From 2d3eb6348894c5abda9ebe32b510e3800b22695d Mon Sep 17 00:00:00 2001 From: radix Date: Wed, 19 Mar 2008 20:38:10 -0500 Subject: [PATCH] Add gthreadless (greenlet / deferred integration) and fix some module names that conflicted with toplevel modules. --- eventlet/api.py | 6 +- eventlet/gthreadless.py | 135 ++++++++++++++++++ eventlet/hubs/hub.py | 2 +- eventlet/hubs/poll.py | 3 +- eventlet/hubs/{select.py => selects.py} | 0 .../support/{stackless.py => stacklesss.py} | 0 eventlet/support/{twisted.py => twisteds.py} | 12 +- 7 files changed, 151 insertions(+), 7 deletions(-) create mode 100644 eventlet/gthreadless.py rename eventlet/hubs/{select.py => selects.py} (100%) rename eventlet/support/{stackless.py => stacklesss.py} (100%) rename eventlet/support/{twisted.py => twisteds.py} (89%) diff --git a/eventlet/api.py b/eventlet/api.py index facc834..de19143 100644 --- a/eventlet/api.py +++ b/eventlet/api.py @@ -200,12 +200,12 @@ def get_default_hub(): pass import select - if hasattr(select, 'poll'): + if 0:#hasattr(select, 'poll'): import eventlet.hubs.poll return eventlet.hubs.poll else: - import eventlet.hubs.select - return eventlet.hubs.select + import eventlet.hubs.selects + return eventlet.hubs.selects def use_hub(mod=None): diff --git a/eventlet/gthreadless.py b/eventlet/gthreadless.py new file mode 100644 index 0000000..382bc4d --- /dev/null +++ b/eventlet/gthreadless.py @@ -0,0 +1,135 @@ +import greenlet +greenlet.main = greenlet.getcurrent() # WTF did greenlet.main go? +from twisted.internet import defer, reactor + +def _desc(g): + if isinstance(g, DebugGreenlet): + if hasattr(g, 'name'): + desc = "<%s %s" % (g.name, hex(id(g))) + else: + desc = " %s" % (_desc(current), _desc(self)) + return super(DebugGreenlet, self).switch(*args, **kwargs) + +def deferredGreenlet(func): + """ + I am a function decorator for functions that call blockOn. The + function I return will call the original function inside of a + greenlet, and return a Deferred. + + TODO: Do a hack so the name of 'replacement' is the name of 'func'. + """ + def replacement(*args, **kwargs): + d = defer.Deferred() + def greenfunc(*args, **kwargs): + try: + d.callback(func(*args, **kwargs)) + except: + d.errback() + g = greenlet.greenlet(greenfunc) + crap = g.switch(*args, **kwargs) + return d + return replacement + +class CalledFromMain(Exception): + pass + +class _IAmAnException(object): + def __init__(self, f): + self.f = f + +def blockOn(d, desc=None): + """ + Use me in non-main greenlets to wait for a Deferred to fire. + """ + g = greenlet.getcurrent() + if g is greenlet.main: + raise CalledFromMain("You cannot call blockOn from the main greenlet.") + + ## Note ## + # Notice that this code catches and ignores GreenletExit. The + # greenlet mechanism sends a GreenletExit at a blocking greenlet if + # there is no chance that the greenlet will be fired by anyone + # else -- that is, no other greenlets have a reference to the one + # that's blocking. + + # This is often the case with blockOn. When someone blocks on a + # Deferred, these callbacks are added to it. When the deferred + # fires, we make the blockOn() call finish -- we resume the + # blocker. At that point, the Deferred chain is irrelevant; it + # makes no sense for any other callbacks to be called. The + # Deferred, then, will likely be garbage collected and thus all + # references to our greenlet will be lost -- and thus it will have + # GreenletExit fired. + + def cb(r): + try: + # This callback might be fired immediately when added + # and switching to the current greenlet seems to do nothing + # (ie. we will never actually return to the function we called + # blockOn from), so we make the call happen later in the main greenlet + # instead, if the current greenlet is the same as the one we are swithcing + # to. + + if g == greenlet.getcurrent(): + reactor.callLater(0, g.switch, r) + else: + g.switch(r) + except greenlet.GreenletExit: + pass + def eb(f): + try: + g.switch(_IAmAnException(f)) + except greenlet.GreenletExit: + pass + + d.addCallbacks(cb, eb) + + x = g.parent.switch() + if isinstance(x, _IAmAnException): + x.f.raiseException() + return x + + +class GreenletWrapper(object): + """Wrap an object which presents an asynchronous interface (via Deferreds). + + The wrapped object will present the same interface, but all methods will + return results, rather than Deferreds. + + When a Deferred would otherwise be returned, a greenlet is created and then + control is switched back to the main greenlet. When the Deferred fires, + control is switched back to the created greenlet and execution resumes with + the result. + """ + + def __init__(self, wrappee): + self.wrappee = wrappee + + def __getattribute__(self, name): + wrappee = super(GreenletWrapper, self).__getattribute__('wrappee') + original = getattr(wrappee, name) + if callable(original): + def wrapper(*a, **kw): + result = original(*a, **kw) + if isinstance(result, defer.Deferred): + return blockOn(result) + return result + return wrapper + return original + diff --git a/eventlet/hubs/hub.py b/eventlet/hubs/hub.py index b9c0d2f..5cc382b 100644 --- a/eventlet/hubs/hub.py +++ b/eventlet/hubs/hub.py @@ -203,7 +203,7 @@ class BaseHub(object): def add_timer(self, timer): scheduled_time = self.clock() + timer.seconds self._add_absolute_timer(scheduled_time, timer) - timer.greenlet = current_greenlet + timer.greenlet = greenlet.getcurrent() self.track_timer(timer) return scheduled_time diff --git a/eventlet/hubs/poll.py b/eventlet/hubs/poll.py index 065f31b..8d34ff0 100644 --- a/eventlet/hubs/poll.py +++ b/eventlet/hubs/poll.py @@ -29,6 +29,7 @@ import socket import errno import traceback from time import sleep +import time from eventlet import greenlib from eventlet.hubs import hub @@ -42,7 +43,7 @@ class Hub(hub.BaseHub): super(Hub, self).__init__(clock) self.poll = select.poll() - def add_descriptor(self, fileno, read=None, write=None, exc=None): + def add_descriptor(self, fileno, read=None, write=None, exc=None): super(Hub, self).add_descriptor(fileno, read, write, exc) mask = self.get_fn_mask(read, write) diff --git a/eventlet/hubs/select.py b/eventlet/hubs/selects.py similarity index 100% rename from eventlet/hubs/select.py rename to eventlet/hubs/selects.py diff --git a/eventlet/support/stackless.py b/eventlet/support/stacklesss.py similarity index 100% rename from eventlet/support/stackless.py rename to eventlet/support/stacklesss.py diff --git a/eventlet/support/twisted.py b/eventlet/support/twisteds.py similarity index 89% rename from eventlet/support/twisted.py rename to eventlet/support/twisteds.py index 2776882..cd6fdb3 100644 --- a/eventlet/support/twisted.py +++ b/eventlet/support/twisteds.py @@ -91,28 +91,36 @@ class EventletReactor(posixbase.PosixReactorBase): api.get_hub().abort() def addReader(self, reader): + print "NEW READER", reader.fileno() fileno = reader.fileno() self._readers[fileno] = reader api.get_hub().add_descriptor(fileno, read=self._got_read) def _got_read(self, fileno): + print "got read on", fileno, self._readers[fileno] + api.get_hub().add_descriptor(fileno, read=self._got_read) self._readers[fileno].doRead() def addWriter(self, writer): + print "NEW WRITER", writer.fileno() fileno = writer.fileno() self._writers[fileno] = writer api.get_hub().add_descriptor(fileno, write=self._got_write) def _got_write(self, fileno): + print "got write on", fileno, self._writers[fileno] + api.get_hub().add_descriptor(fileno, write=self._got_write) self._writers[fileno].doWrite() def removeReader(self, reader): + print "removing reader", reader.fileno() fileno = reader.fileno() if fileno in self._readers: self._readers.pop(fileno) api.get_hub().remove_descriptor(fileno) def removeWriter(self, writer): + print "removing writer", writer.fileno() fileno = writer.fileno() if fileno in self._writers: self._writers.pop(fileno) @@ -122,7 +130,7 @@ class EventletReactor(posixbase.PosixReactorBase): return self._removeAll(self._readers.values(), self._writers.values()) -def emulate(): +def install(): if not _working: raise RuntimeError, "Can't use support.twisted because zope.interface is not installed." reactor = EventletReactor() @@ -130,5 +138,5 @@ def emulate(): installReactor(reactor) -__all__ = ['emulate'] +__all__ = ['install']