From aa82d5e64ecefed8764558b14d3b667fd3754b9b Mon Sep 17 00:00:00 2001 From: Denis Bilenko Date: Wed, 17 Jun 2009 18:06:51 +0700 Subject: [PATCH] new libevent hub; works much better now --- eventlet/hubs/libevent.py | 205 ++++++++++++++++++++++++-------------- 1 file changed, 128 insertions(+), 77 deletions(-) diff --git a/eventlet/hubs/libevent.py b/eventlet/hubs/libevent.py index c6377c2..ff05f32 100644 --- a/eventlet/hubs/libevent.py +++ b/eventlet/hubs/libevent.py @@ -1,4 +1,5 @@ # Copyright (c) 2007, Linden Research, Inc. +# Copyright (c) 2009 Denis Bilenko # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights @@ -17,43 +18,114 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. -import signal import sys import time - -from eventlet.hubs import hub - - -try: - # use rel if available - import rel - rel.initialize() - rel.override() -except ImportError: - # don't have rel, but might still have libevent - pass - +import traceback import event +from eventlet import api + + +class event_wrapper(object): + + def __init__(self, impl): + self.impl = impl + + def __repr__(self): + if self.impl is not None: + return repr(self.impl) + else: + return object.__repr__(self) + + def __str__(self): + if self.impl is not None: + return str(self.impl) + else: + return object.__str__(self) + + def cancel(self): + if self.impl is not None: + self.impl.delete() + self.impl = None + + +class LocalTimer(event_wrapper): + + def __init__(self, cb, args, kwargs): + self.tpl = cb, args, kwargs + self.greenlet = api.getcurrent() + # 'impl' attribute must be set to libevent's timeout instance + + def __call__(self): + if self.greenlet: + cb, args, kwargs = self.tpl + cb(*args, **kwargs) + + +class Hub(object): + + SYSTEM_EXCEPTIONS = (KeyboardInterrupt, SystemExit, api.GreenletExit) -class Hub(hub.BaseHub): def __init__(self, clock=time.time): - super(Hub, self).__init__(clock) - self.interrupted = False event.init() + self.clock = clock + self.readers = {} + self.writers = {} + self.excs = {} + self.greenlet = api.Greenlet(self.run) + self.exc_info = None + self.signal(2, lambda signalnum, frame: self.greenlet.parent.throw(KeyboardInterrupt)) - sig = event.signal(signal.SIGINT, self.signal_received, signal.SIGINT) - sig.add() + def switch(self): + cur = api.getcurrent() + assert cur is not self.greenlet, 'Cannot switch to MAINLOOP from MAINLOOP' + switch_out = getattr(cur, 'switch_out', None) + if switch_out is not None: + try: + switch_out() + except: + traceback.print_exception(*sys.exc_info()) + if self.greenlet.dead: + self.greenlet = api.Greenlet(self.run) + try: + api.getcurrent().parent = self.greenlet + except ValueError: + pass + return self.greenlet.switch() + + def run(self): + while True: + try: + event.dispatch() + break + except api.GreenletExit: + break + except self.SYSTEM_EXCEPTIONS: + raise + except: + if self.exc_info is not None: + self.schedule_call_global(0, api.getcurrent().parent.throw, *self.exc_info) + self.exc_info = None + else: + traceback.print_exc() + + def abort(self): + # schedule an exception, because otherwise dispatch() will not exit if + # there are timeouts scheduled + self.schedule_call_global(0, self.greenlet.throw, api.GreenletExit) + event.abort() + + @property + def running(self): + return True if self.greenlet else False def add_descriptor(self, fileno, read=None, write=None, exc=None): if read: evt = event.read(fileno, read, fileno) - evt.add() self.readers[fileno] = evt, read if write: evt = event.write(fileno, write, fileno) - evt.add() self.writers[fileno] = evt, write if exc: @@ -61,6 +133,15 @@ class Hub(hub.BaseHub): return fileno + def signal(self, signalnum, handler): + def wrapper(): + try: + handler(signalnum, None) + except: + self.exc_info = sys.exc_info() + event.abort() + return event_wrapper(event.signal(signalnum, wrapper)) + def remove_descriptor(self, fileno): for queue in (self.readers, self.writers): tpl = queue.pop(fileno, None) @@ -68,65 +149,35 @@ class Hub(hub.BaseHub): tpl[0].delete() self.excs.pop(fileno, None) - def abort(self): - super(Hub, self).abort() - event.abort() + def schedule_call_local(self, seconds, cb, *args, **kwargs): + timer = LocalTimer(cb, args, kwargs) + event_timeout = event.timeout(seconds, timer) + timer.impl = event_timeout + return timer - def signal_received(self, signal): - # can't do more than set this flag here because the pyevent callback - # mechanism swallows exceptions raised here, so we have to raise in - # the 'main' greenlet (in wait()) to kill the program - self.interrupted = True - event.abort() + schedule_call = schedule_call_local - def wait(self, seconds=None): - # this timeout will cause us to return from the dispatch() call - # when we want to - timer = event.timeout(seconds, lambda: None) - timer.add() + def schedule_call_global(self, seconds, cb, *args, **kwargs): + event_timeout = event.timeout(seconds, lambda : cb(*args, **kwargs) and None) + return event_wrapper(event_timeout) - try: - status = event.dispatch() - except self.SYSTEM_EXCEPTIONS: - self.interrupted = True - except: - self.squelch_exception(-1, sys.exc_info()) - - # we are explicitly ignoring the status because in our experience it's - # harmless and there's nothing meaningful we could do with it anyway - - timer.delete() - - # raise any signals that deserve raising - if self.interrupted: - self.interrupted = False - raise KeyboardInterrupt() - - def add_timer(self, timer): - # store the pyevent timer object so that we can cancel later - eventtimer = event.timeout(timer.seconds, timer) - timer.impltimer = eventtimer - eventtimer.add() - - def timer_finished(self, timer): - try: + def exc_descriptor(self, fileno): + exc = self.excs.get(fileno) + if exc is not None: try: - timer.impltimer.delete() - del timer.impltimer - # XXX might this raise other exceptions? double delete? - except (AttributeError, TypeError): - pass - finally: - super(Hub, self).timer_finished(timer) + exc(fileno) + except: + traceback.print_exc() - def timer_canceled(self, timer): - """ Cancels the underlying libevent timer. """ - try: - try: - timer.impltimer.delete() - del timer.impltimer - except (AttributeError, TypeError): - pass - finally: - super(Hub, self).timer_canceled(timer) + def timer_finished(self, t): pass + def timer_canceled(self, t): pass + + def get_readers(self): + return self.readers + + def get_writers(self): + return self.writers + + def get_excs(self): + return self.excs