diff --git a/eventlet/api.py b/eventlet/api.py index 82df92e..af65cff 100644 --- a/eventlet/api.py +++ b/eventlet/api.py @@ -31,7 +31,7 @@ import inspect import traceback from eventlet.support import greenlet -from eventlet import greenlib, tls +from eventlet import tls __all__ = [ 'call_after', 'exc_after', 'getcurrent', 'get_default_hub', 'get_hub', @@ -135,15 +135,15 @@ def trampoline(fd, read=None, write=None, timeout=None, timeout_exc=TimeoutError """ t = None hub = get_hub() - self = greenlet.getcurrent() - assert hub.greenlet is not self, 'do not call blocking functions from the mainloop' + current = greenlet.getcurrent() + assert hub.greenlet is not current, 'do not call blocking functions from the mainloop' fileno = getattr(fd, 'fileno', lambda: fd)() def _do_close(_d, error=None): - greenlib.switch(self, exc=getattr(error, 'value', None)) # convert to socket.error + current.throw(getattr(error, 'value', None)) # convert to socket.error def _do_timeout(): - greenlib.switch(self, exc=timeout_exc()) + current.throw(timeout_exc()) def cb(d): - greenlib.switch(self) + current.switch() # with TwistedHub, descriptor actually an object (socket_rwdescriptor) which stores # this callback. If this callback stores a reference to the socket instance (fd) # then descriptor has a reference to that instance. This makes socket not collected @@ -173,10 +173,10 @@ def get_fileno(obj): return f() def select(read_list, write_list, error_list, timeout=None): - self = get_hub() + hub = get_hub() t = None current = greenlet.getcurrent() - assert self.greenlet is not current, 'do not call blocking functions from the mainloop' + assert hub.greenlet is not current, 'do not call blocking functions from the mainloop' ds = {} for r in read_list: ds[get_fileno(r)] = {'read' : r} @@ -189,33 +189,33 @@ def select(read_list, write_list, error_list, timeout=None): def on_read(d): original = ds[get_fileno(d)]['read'] - greenlib.switch(current, ([original], [], [])) + current.switch(([original], [], [])) def on_write(d): original = ds[get_fileno(d)]['write'] - greenlib.switch(current, ([], [original], [])) + current.switch(([], [original], [])) def on_error(d, _err=None): original = ds[get_fileno(d)]['error'] - greenlib.switch(current, ([], [], [original])) + current.switch(([], [], [original])) def on_timeout(): - greenlib.switch(current, ([], [], [])) + current.switch(([], [], [])) if timeout is not None: - t = self.schedule_call(timeout, on_timeout) + t = hub.schedule_call(timeout, on_timeout) try: for k, v in ds.iteritems(): - d = self.add_descriptor(k, - v.get('read') is not None and on_read, - v.get('write') is not None and on_write, - v.get('error') is not None and on_error) + d = hub.add_descriptor(k, + v.get('read') is not None and on_read, + v.get('write') is not None and on_write, + v.get('error') is not None and on_error) descriptors.append(d) try: - return self.switch() + return hub.switch() finally: for d in descriptors: - self.remove_descriptor(d) + hub.remove_descriptor(d) finally: if t is not None: t.cancel() @@ -223,7 +223,7 @@ def select(read_list, write_list, error_list, timeout=None): def _spawn_startup(cb, args, kw, cancel=None): try: - greenlib.switch(greenlet.getcurrent().parent) + greenlet.getcurrent().parent.switch() cancel = None finally: if cancel is not None: @@ -232,7 +232,25 @@ def _spawn_startup(cb, args, kw, cancel=None): def _spawn(g): g.parent = greenlet.getcurrent() - greenlib.switch(g) + g.switch() + + +class CancellingTimersGreenlet(greenlet.greenlet): + + def __init__(self, run=None, parent=None, hub=None): + self._run = run + if parent is None: + parent = greenlet.getcurrent() + if hub is None: + hub = get_hub() + self.hub = hub + greenlet.greenlet.__init__(self, None, parent) + + def run(self, *args, **kwargs): + try: + return self._run(*args, **kwargs) + finally: + self.hub.cancel_timers(self, quiet=True) def spawn(function, *args, **kwds): @@ -251,17 +269,18 @@ def spawn(function, *args, **kwds): """ # killable t = None - g = greenlib.tracked_greenlet() - t = get_hub().schedule_call(0, _spawn, g) - greenlib.switch(g, (_spawn_startup, function, args, kwds, t.cancel)) + g = CancellingTimersGreenlet(_spawn_startup) + t = get_hub().schedule_call_global(0, _spawn, g) + g.switch(function, args, kwds, t.cancel) return g def kill(g): - get_hub().schedule_call(0, greenlib.kill, g) - sleep(0) + get_hub().schedule_call(0, g.throw) + sleep(0) -def call_after(seconds, function, *args, **kwds): +def call_after_global(seconds, function, *args, **kwds): """Schedule *function* to be called after *seconds* have elapsed. + The function will be scheduled even if the current greenlet has exited. *seconds* may be specified as an integer, or a float if fractional seconds are desired. The *function* will be called with the given *args* and @@ -272,12 +291,35 @@ def call_after(seconds, function, *args, **kwds): """ # cancellable def startup(): - g = greenlib.tracked_greenlet() - greenlib.switch(g, (_spawn_startup, function, args, kwds)) - greenlib.switch(g) - return get_hub().schedule_call(seconds, startup) + g = CancellingTimersGreenlet(_spawn_startup) + g.switch(function, args, kwds) + g.switch() + t = get_hub().schedule_call_global(seconds, startup) + return t +def call_after_local(seconds, function, *args, **kwds): + """Schedule *function* to be called after *seconds* have elapsed. + The function will NOT be called if the current greenlet has exited. + + *seconds* may be specified as an integer, or a float if fractional seconds + are desired. The *function* will be called with the given *args* and + keyword arguments *kwds*, and will be executed within the main loop's + coroutine. + + Its return value is discarded. Any uncaught exception will be logged. + """ + # cancellable + def startup(): + g = CancellingTimersGreenlet(_spawn_startup) + g.switch(function, args, kwds) + g.switch() + t = get_hub().schedule_call_local(seconds, startup) + return t + +# for compatibility with original eventlet API +call_after = call_after_local + def with_timeout(seconds, func, *args, **kwds): """Wrap a call to some (yielding) function with a timeout; if the called function fails to return before the timeout, cancel it and return a flag @@ -422,15 +464,13 @@ def sleep(seconds=0): """ hub = get_hub() assert hub.greenlet is not greenlet.getcurrent(), 'do not call blocking functions from the mainloop' - timer = hub.schedule_call(seconds, greenlib.switch, greenlet.getcurrent()) + timer = hub.schedule_call(seconds, greenlet.getcurrent().switch) try: hub.switch() finally: timer.cancel() -switch = greenlib.switch -local_dict = greenlib.greenlet_dict getcurrent = greenlet.getcurrent GreenletExit = greenlet.GreenletExit diff --git a/eventlet/channel.py b/eventlet/channel.py index a8be44e..32be5ae 100644 --- a/eventlet/channel.py +++ b/eventlet/channel.py @@ -25,7 +25,7 @@ THE SOFTWARE. import collections -from eventlet import api, greenlib +from eventlet import api from eventlet.support import greenlet __all__ = ['channel'] @@ -49,8 +49,13 @@ class channel(object): def _tasklet_loop(self): deque = self.deque = collections.deque() hub = api.get_hub() - switch = greenlib.switch - direction, caller, args = switch() + current = greenlet.getcurrent() + def switch(g, value=None, exc=None): + if exc is None: + return g.switch(value) + else: + return g.throw(exc) + direction, caller, args = switch(current.parent or current) try: while True: if direction == -1: @@ -80,12 +85,12 @@ class channel(object): try: t = self._tasklet except AttributeError: - t = self._tasklet = greenlib.tracked_greenlet() - greenlib.switch(t, (self._tasklet_loop,)) + t = self._tasklet = greenlet.greenlet(self._tasklet_loop) + t.switch() if args: - return greenlib.switch(t, (1, greenlet.getcurrent(), args)) + return t.switch((1, greenlet.getcurrent(), args)) else: - return greenlib.switch(t, (-1, greenlet.getcurrent(), args)) + return t.switch((-1, greenlet.getcurrent(), args)) def receive(self): return self._send_tasklet() diff --git a/eventlet/coros.py b/eventlet/coros.py index e1b77b4..da6d690 100644 --- a/eventlet/coros.py +++ b/eventlet/coros.py @@ -31,7 +31,6 @@ import traceback from eventlet import api from eventlet import channel from eventlet import pools -from eventlet import greenlib try: @@ -182,7 +181,7 @@ class event(object): if waiter in self._waiters: del self._waiters[waiter] api.get_hub().schedule_call( - 0, greenlib.switch, waiter, None, Cancelled()) + 0, waiter.throw, Cancelled()) def send(self, result=None, exc=None): """Makes arrangements for the waiters to be woken with the @@ -215,8 +214,7 @@ class event(object): self._exc = exc hub = api.get_hub() for waiter in self._waiters: - hub.schedule_call(0, greenlib.switch, waiter, self._result) - + hub.schedule_call(0, waiter.switch, self._result) class semaphore(object): """Classic semaphore implemented with a counter and an event. diff --git a/eventlet/green/thread.py b/eventlet/green/thread.py index 31f220b..cd7176a 100644 --- a/eventlet/green/thread.py +++ b/eventlet/green/thread.py @@ -1,16 +1,21 @@ """implements standard module 'thread' with greenlets""" from __future__ import absolute_import import thread as thread_module -from eventlet.greenlib import greenlet_id as get_ident from eventlet.support import greenlet from eventlet.api import spawn from eventlet.coros import semaphore as LockType error = thread_module.error +def get_ident(gr=None): + if gr is None: + return id(greenlet.getcurrent()) + else: + return id(gr) + def start_new_thread(function, args=(), kwargs={}): g = spawn(function, *args, **kwargs) - return get_ident(g) or 0 # XXX 0 only for main greenlet, None for the rest untracked + return get_ident(g) def allocate_lock(): return LockType(1) diff --git a/eventlet/hubs/hub.py b/eventlet/hubs/hub.py index 4e93000..9109ecb 100644 --- a/eventlet/hubs/hub.py +++ b/eventlet/hubs/hub.py @@ -30,8 +30,6 @@ import traceback import time from eventlet.support import greenlet - -from eventlet import greenlib from eventlet.timer import Timer _g_debug = True @@ -49,7 +47,7 @@ class BaseHub(object): self.waiters_by_greenlet = {} self.clock = clock - self.greenlet = None + self.greenlet = greenlet.greenlet(self.run) self.stopping = False self.running = False self.timers = [] @@ -106,7 +104,7 @@ class BaseHub(object): fileno = self.waiters_by_greenlet.pop(gr, None) if fileno is not None: self.remove_descriptor(fileno) - greenlib.switch(gr, None, exception_object) + gr.throw(exception_object) def exc_descriptor(self, fileno): exc = self.excs.get(fileno) @@ -122,16 +120,13 @@ class BaseHub(object): self.switch() def switch(self): - if not self.greenlet: - self.greenlet = greenlib.tracked_greenlet() - args = ((self.run,),) - else: - args = () + if self.greenlet.dead: + self.greenlet = greenlet.greenlet(self.run) try: greenlet.getcurrent().parent = self.greenlet except ValueError: pass - return greenlib.switch(self.greenlet, *args) + return self.greenlet.switch() def squelch_exception(self, fileno, exc_info): traceback.print_exception(*exc_info) diff --git a/eventlet/hubs/libev.py b/eventlet/hubs/libev.py index 3baf741..d839ecd 100644 --- a/eventlet/hubs/libev.py +++ b/eventlet/hubs/libev.py @@ -29,7 +29,6 @@ import errno import traceback import time -from eventlet import greenlib from eventlet.timer import Timer from eventlet.hubs import hub diff --git a/eventlet/hubs/nginx.py b/eventlet/hubs/nginx.py index 3d5a905..0914df0 100644 --- a/eventlet/hubs/nginx.py +++ b/eventlet/hubs/nginx.py @@ -36,7 +36,6 @@ if mydir not in sys.path: from eventlet import api -from eventlet import greenlib from eventlet import httpc from eventlet.hubs import hub from eventlet import util @@ -112,7 +111,7 @@ class Hub(hub.BaseHub): result = to_call[0](to_call[1]) del self.to_call return result - greenlib.switch(self.current_application, self.poll(int(seconds*1000))) + self.current_application.switch(self.poll(int(seconds*1000))) def application(self, env, start_response): print "ENV",env @@ -151,8 +150,8 @@ class Hub(hub.BaseHub): yield x return result = self.switch() - if not isinstance(result, tuple): - result = (result, None) ## TODO Fix greenlib's return values + #if not isinstance(result, tuple): + # result = (result, None) ## TODO Fix greenlib's return values def application(env, start_response): diff --git a/eventlet/hubs/poll.py b/eventlet/hubs/poll.py index 981c153..a6ec709 100644 --- a/eventlet/hubs/poll.py +++ b/eventlet/hubs/poll.py @@ -31,7 +31,6 @@ import traceback from time import sleep import time -from eventlet import greenlib from eventlet.hubs import hub EXC_MASK = select.POLLERR | select.POLLHUP | select.POLLNVAL diff --git a/eventlet/hubs/twistedr.py b/eventlet/hubs/twistedr.py index 29073d2..22ad203 100644 --- a/eventlet/hubs/twistedr.py +++ b/eventlet/hubs/twistedr.py @@ -3,7 +3,6 @@ import threading import weakref from twisted.internet.base import DelayedCall as TwistedDelayedCall from eventlet.hubs.hub import _g_debug -from eventlet import greenlib from eventlet.support.greenlet import greenlet import traceback @@ -78,7 +77,7 @@ class BaseTwistedHub(object): greenlet.getcurrent().parent = self.greenlet except ValueError, ex: pass - return greenlib.switch(self.greenlet) + return self.greenlet.switch() def stop(self): from twisted.internet import reactor @@ -105,7 +104,7 @@ class BaseTwistedHub(object): fileno = self.waiters_by_greenlet.pop(gr, None) if fileno is not None: self.remove_descriptor(fileno) - greenlib.switch(gr, None, exception_object) + gr.throw(exception_object) # required by GreenSocket def exc_descriptor(self, _fileno): @@ -212,7 +211,6 @@ class BaseTwistedHub(object): return len(reactor.getDelayedCalls()) - class TwistedHub(BaseTwistedHub): # wrapper around reactor that runs reactor's main loop in a separate greenlet. # whenever you need to wait, i.e. inside a call that must appear @@ -235,19 +233,17 @@ class TwistedHub(BaseTwistedHub): assert Hub.state==0, ('This hub can only be instantiated once', Hub.state) Hub.state = 1 make_twisted_threadpool_daemonic() # otherwise the program would hang after the main greenlet exited - BaseTwistedHub.__init__(self, None) + g = greenlet(self.run) + BaseTwistedHub.__init__(self, g) def switch(self): - if not self.greenlet: - self.greenlet = greenlib.tracked_greenlet() - args = ((self.run,),) - else: - args = () + if self.greenlet.dead: + self.greenlet = greenlet(self.run) try: greenlet.getcurrent().parent = self.greenlet except ValueError, ex: pass - return greenlib.switch(self.greenlet, *args) + return self.greenlet.switch() def run(self, installSignalHandlers=None): if installSignalHandlers is None: diff --git a/eventlet/twistedutil/__init__.py b/eventlet/twistedutil/__init__.py index 1441b55..43d01e2 100644 --- a/eventlet/twistedutil/__init__.py +++ b/eventlet/twistedutil/__init__.py @@ -1,18 +1,17 @@ from twisted.internet import defer from twisted.python import failure from eventlet.support.greenlet import greenlet -from eventlet import greenlib from eventlet.api import get_hub, spawn def block_on(deferred): cur = [greenlet.getcurrent()] def cb(value): if cur: - greenlib.switch(cur[0], value) + cur[0].switch(value) return value def eb(err): if cur: - greenlib.switch(cur[0], exc=(err.type, err.value, err.tb)) + err.throwExceptionIntoGenerator(cur[0]) return err deferred.addCallback(cb) deferred.addErrback(eb) @@ -41,7 +40,10 @@ def callInGreenThread(func, *args, **kwargs): if __name__=='__main__': import sys - num = int(sys.argv[1]) + try: + num = int(sys.argv[1]) + except: + sys.exit('Supply number of test as an argument, 0, 1, 2 or 3') from twisted.internet import reactor def test(): print block_on(reactor.resolver.getHostByName('www.google.com')) @@ -51,7 +53,9 @@ if __name__=='__main__': elif num==1: spawn(test) from eventlet.api import sleep + print 'sleeping..' sleep(5) + print 'done sleeping..' elif num==2: from eventlet.twistedutil import join_reactor spawn(test)