diff --git a/eventlet/hubs/twistedr.py b/eventlet/hubs/twistedr.py index 26dd43b..b5017d8 100644 --- a/eventlet/hubs/twistedr.py +++ b/eventlet/hubs/twistedr.py @@ -1,7 +1,12 @@ import threading +import weakref +from eventlet.hubs.hub import _g_debug from eventlet import greenlib from eventlet.support.greenlet import greenlet +from functools import wraps +import traceback + class socket_rwdescriptor: #implements(IReadWriteDescriptor) @@ -42,6 +47,7 @@ class BaseTwistedHub(object): def __init__(self, mainloop_greenlet): self.greenlet = mainloop_greenlet self.waiters_by_greenlet = {} + self.timers_by_greenlet = {} def switch(self): assert greenlet.getcurrent() is not self.greenlet, 'Impossible to switch() from the mainloop greenlet' @@ -61,9 +67,8 @@ class BaseTwistedHub(object): self.switch() def add_descriptor(self, fileno, read=None, write=None, exc=None): - #print 'add_descriptor', fileno, read, write, exc - descriptor = socket_rwdescriptor(fileno, read, write, exc) from twisted.internet import reactor + descriptor = socket_rwdescriptor(fileno, read, write, exc) if read: reactor.addReader(descriptor) if write: @@ -88,13 +93,96 @@ class BaseTwistedHub(object): def exc_descriptor(self, _fileno): pass # XXX do something sensible here - # required by greenlet_body - def cancel_timers(self, greenlet, quiet=False): - pass # XXX do something sensible here - def schedule_call(self, seconds, func, *args, **kwargs): from twisted.internet import reactor - return reactor.callLater(seconds, func, *args, **kwargs) + @wraps(func) + def wrap(*args, **kwargs): + try: + return func(*args, **kwargs) + finally: + self.timer_finished(timer) + timer = reactor.callLater(seconds, wrap, *args, **kwargs) + if seconds: + self.track_timer(timer) + return timer + + def track_timer(self, timer): + try: + current_greenlet = greenlet.getcurrent() + timer.greenlet = current_greenlet + self.timers_by_greenlet.setdefault( + current_greenlet, + weakref.WeakKeyDictionary())[timer] = True + except: + print 'track_timer failed' + traceback.print_exc() + raise + + def timer_finished(self, timer): + try: + greenlet = timer.greenlet + del self.timers_by_greenlet[greenlet][timer] + if not self.timers_by_greenlet[greenlet]: + del self.timers_by_greenlet[greenlet] + except (AttributeError, KeyError): + pass + except: + print 'timer_finished failed' + traceback.print_exc() + raise + + def cancel_timers(self, greenlet, quiet=False): + try: + if greenlet not in self.timers_by_greenlet: + return + for timer in self.timers_by_greenlet[greenlet].keys(): + if not timer.cancelled and not timer.called and hasattr(timer, 'greenlet'): + ## If timer.seconds is 0, this isn't a timer, it's + ## actually eventlet's silly way of specifying whether + ## a coroutine is "ready to run" or not. + ## TwistedHub: I do the same, by not attaching 'greenlet' attribute to zero-timers QQQ + try: + # this might be None due to weirdness with weakrefs + timer.cancel() + except TypeError: + pass + if _g_debug and not quiet: + print 'Hub cancelling left-over timer %s' % timer + try: + del self.timers_by_greenlet[greenlet] + except KeyError: + pass + except: + print 'cancel_timers failed' + import traceback + traceback.print_exc() + if not quiet: + raise + + def abort(self): + from twisted.internet import reactor + reactor.crash() + + # for test cases: + + def get_readers(self): + from twisted.internet import reactor + readers = reactor.getReaders() + readers.remove(getattr(reactor, 'waker')) + return readers + + def get_writers(self): + from twisted.internet import reactor + return reactor.getWriters() + + def get_excs(self): + return [] + + @property + def running(self): + from twisted.internet import reactor + return reactor.running + class TwistedHub(BaseTwistedHub): # wrapper around reactor that runs reactor's main loop in a separate greenlet.