diff --git a/eventlet/green/builtin.py b/eventlet/green/builtin.py new file mode 100644 index 0000000..2ea2e61 --- /dev/null +++ b/eventlet/green/builtin.py @@ -0,0 +1,43 @@ +""" +In order to detect a filehandle that's been closed, our only clue may be +the operating system returning the same filehandle in response to some +other operation. + +The builtins 'file' and 'open' are patched to collaborate with the +notify_opened protocol. +""" + +builtins_orig = __builtins__ + +from eventlet import hubs +from eventlet.hubs import hub +from eventlet.patcher import slurp_properties +import sys + +__all__ = dir(builtins_orig) +__patched__ = ['file', 'open'] + +slurp_properties(builtins_orig, globals(), + ignore=__patched__, srckeys=dir(builtins_orig)) + +hubs.get_hub() + +__original_file = file +class file(__original_file): + def __init__(self, *args, **kwargs): + super(file, self).__init__(*args, **kwargs) + hubs.notify_opened(self.fileno()) + +__original_open = open +__opening = False +def open(*args): + global __opening + result = __original_open(*args) + if not __opening: + # This is incredibly ugly. 'open' is used under the hood by + # the import process. So, ensure we don't wind up in an + # infinite loop. + __opening = True + hubs.notify_opened(result.fileno()) + __opening = False + return result \ No newline at end of file diff --git a/eventlet/green/os.py b/eventlet/green/os.py index e16041a..f18ecac 100644 --- a/eventlet/green/os.py +++ b/eventlet/green/os.py @@ -9,7 +9,7 @@ from eventlet import hubs from eventlet.patcher import slurp_properties __all__ = os_orig.__all__ -__patched__ = ['fdopen', 'read', 'write', 'wait', 'waitpid'] +__patched__ = ['fdopen', 'read', 'write', 'wait', 'waitpid', 'open'] slurp_properties(os_orig, globals(), ignore=__patched__, srckeys=dir(os_orig)) @@ -40,7 +40,10 @@ def read(fd, n): if get_errno(e) == errno.EPIPE: return '' raise - hubs.trampoline(fd, read=True) + try: + hubs.trampoline(fd, read=True) + except hubs.IOClosed: + return '' __original_write__ = os_orig.write def write(fd, st): @@ -81,4 +84,12 @@ def waitpid(pid, options): return rpid, status greenthread.sleep(0.01) -# TODO: open +__original_open__ = os_orig.open +def open(file, flags, mode=0777): + """ Wrap os.open + This behaves identically, but collaborates with + the hub's notify_opened protocol. + """ + fd = __original_open__(file, flags, mode) + hubs.notify_opened(fd) + return fd \ No newline at end of file diff --git a/eventlet/green/ssl.py b/eventlet/green/ssl.py index 62fec77..8d9d072 100644 --- a/eventlet/green/ssl.py +++ b/eventlet/green/ssl.py @@ -8,7 +8,7 @@ import errno time = __import__('time') from eventlet.support import get_errno -from eventlet.hubs import trampoline +from eventlet.hubs import trampoline, IOClosed from eventlet.greenio import set_nonblocking, GreenSocket, SOCKET_CLOSED, CONNECT_ERR, CONNECT_SUCCESS orig_socket = __import__('socket') socket = orig_socket.socket @@ -98,8 +98,11 @@ class GreenSSLSocket(__ssl.SSLSocket): def read(self, len=1024): """Read up to LEN bytes and return them. Return zero-length string on EOF.""" - return self._call_trampolining( - super(GreenSSLSocket, self).read, len) + try: + return self._call_trampolining( + super(GreenSSLSocket, self).read, len) + except IOClosed: + return '' def send (self, data, flags=0): if self._sslobj: @@ -164,8 +167,11 @@ class GreenSSLSocket(__ssl.SSLSocket): if self.act_non_blocking: raise if get_errno(e) == errno.EWOULDBLOCK: - trampoline(self, read=True, - timeout=self.gettimeout(), timeout_exc=timeout_exc('timed out')) + try: + trampoline(self, read=True, + timeout=self.gettimeout(), timeout_exc=timeout_exc('timed out')) + except IOClosed: + return '' if get_errno(e) in SOCKET_CLOSED: return '' raise diff --git a/eventlet/greenio.py b/eventlet/greenio.py index 51336bd..f099826 100644 --- a/eventlet/greenio.py +++ b/eventlet/greenio.py @@ -7,7 +7,7 @@ import time import warnings from eventlet.support import get_errno, six -from eventlet.hubs import trampoline +from eventlet.hubs import trampoline, notify_close, notify_opened, IOClosed __all__ = ['GreenSocket', 'GreenPipe', 'shutdown_safe'] @@ -122,6 +122,8 @@ class GreenSocket(object): should_set_nonblocking = kwargs.pop('set_nonblocking', True) if isinstance(family_or_realsock, six.integer_types): fd = _original_socket(family_or_realsock, *args, **kwargs) + # Notify the hub that this is a newly-opened socket. + notify_opened(fd.fileno()) else: fd = family_or_realsock @@ -151,6 +153,7 @@ class GreenSocket(object): self.listen = fd.listen self.setsockopt = fd.setsockopt self.shutdown = fd.shutdown + self._closed = False @property def _sock(self): @@ -166,6 +169,25 @@ class GreenSocket(object): setattr(self, name, attr) return attr + def _trampoline(self, fd, read=False, write=False, timeout=None, timeout_exc=None): + """ We need to trampoline via the event hub. + We catch any signal back from the hub indicating that the operation we + were waiting on was associated with a filehandle that's since been + invalidated. + """ + if self._closed: + # If we did any logging, alerting to a second trampoline attempt on a closed + # socket here would be useful. + raise IOClosed() + try: + return trampoline(fd, read=True, timeout=self.gettimeout(), + timeout_exc=socket.timeout("timed out"), + mark_as_closed=self._mark_as_closed) + except IOClosed: + # This socket's been obsoleted. De-fang it. + self._mark_as_closed() + raise + def accept(self): if self.act_non_blocking: return self.fd.accept() @@ -176,16 +198,37 @@ class GreenSocket(object): client, addr = res set_nonblocking(client) return type(self)(client), addr - trampoline(fd, read=True, timeout=self.gettimeout(), + self._trampoline(fd, read=True, timeout=self.gettimeout(), timeout_exc=socket.timeout("timed out")) + def _defanged_close(self): + # Already closed the once + pass + + + def _mark_as_closed(self): + """ Mark this socket as being closed """ + self.close = self._defanged_close + self._closed = True + + def close(self): + notify_close(self.fd) + self._mark_as_closed() # Don't do this twice. + return self.fd.close() + + def __del__(self): + self.close() + def connect(self, address): if self.act_non_blocking: return self.fd.connect(address) fd = self.fd if self.gettimeout() is None: while not socket_connect(fd, address): - trampoline(fd, write=True) + try: + self._trampoline(fd, write=True) + except IOClosed: + raise socket.error(errno.EBADFD) socket_checkerr(fd) else: end = time.time() + self.gettimeout() @@ -194,8 +237,12 @@ class GreenSocket(object): return if time.time() >= end: raise socket.timeout("timed out") - trampoline(fd, write=True, timeout=end - time.time(), + try: + self._trampoline(fd, write=True, timeout=end - time.time(), timeout_exc=socket.timeout("timed out")) + except IOClosed: + # ... we need some workable errno here. + raise socket.error(errno.EBADFD) socket_checkerr(fd) def connect_ex(self, address): @@ -205,10 +252,12 @@ class GreenSocket(object): if self.gettimeout() is None: while not socket_connect(fd, address): try: - trampoline(fd, write=True) + self._trampoline(fd, write=True) socket_checkerr(fd) except socket.error as ex: return get_errno(ex) + except IOClosed: + return errno.EBADFD else: end = time.time() + self.gettimeout() while True: @@ -217,11 +266,13 @@ class GreenSocket(object): return 0 if time.time() >= end: raise socket.timeout(errno.EAGAIN) - trampoline(fd, write=True, timeout=end - time.time(), + self._trampoline(fd, write=True, timeout=end - time.time(), timeout_exc=socket.timeout(errno.EAGAIN)) socket_checkerr(fd) except socket.error as ex: return get_errno(ex) + except IOClosed: + return errno.EBADFD def dup(self, *args, **kw): sock = self.fd.dup(*args, **kw) @@ -255,27 +306,31 @@ class GreenSocket(object): return '' else: raise - trampoline( - fd, - read=True, - timeout=self.gettimeout(), - timeout_exc=socket.timeout("timed out")) + try: + self._trampoline( + fd, + read=True, + timeout=self.gettimeout(), + timeout_exc=socket.timeout("timed out")) + except IOClosed as e: + # Perhaps we should return '' instead? + raise EOFError() def recvfrom(self, *args): if not self.act_non_blocking: - trampoline(self.fd, read=True, timeout=self.gettimeout(), + self._trampoline(self.fd, read=True, timeout=self.gettimeout(), timeout_exc=socket.timeout("timed out")) return self.fd.recvfrom(*args) def recvfrom_into(self, *args): if not self.act_non_blocking: - trampoline(self.fd, read=True, timeout=self.gettimeout(), + self._trampoline(self.fd, read=True, timeout=self.gettimeout(), timeout_exc=socket.timeout("timed out")) return self.fd.recvfrom_into(*args) def recv_into(self, *args): if not self.act_non_blocking: - trampoline(self.fd, read=True, timeout=self.gettimeout(), + self._trampoline(self.fd, read=True, timeout=self.gettimeout(), timeout_exc=socket.timeout("timed out")) return self.fd.recv_into(*args) @@ -297,8 +352,11 @@ class GreenSocket(object): if total_sent == len_data: break - trampoline(self.fd, write=True, timeout=self.gettimeout(), - timeout_exc=socket.timeout("timed out")) + try: + self._trampoline(self.fd, write=True, timeout=self.gettimeout(), + timeout_exc=socket.timeout("timed out")) + except IOClosed: + raise socket.error(errno.ECONNRESET, 'Connection closed by another thread') return total_sent @@ -309,7 +367,7 @@ class GreenSocket(object): tail += self.send(data[tail:], flags) def sendto(self, *args): - trampoline(self.fd, write=True) + self._trampoline(self.fd, write=True) return self.fd.sendto(*args) def setblocking(self, flag): @@ -357,6 +415,30 @@ class _SocketDuckForFd(object): def __init__(self, fileno): self._fileno = fileno + notify_opened(fileno) + self._closed = False + + def _trampoline(self, fd, read=False, write=False, timeout=None, timeout_exc=None): + if self._closed: + # Don't trampoline if we're already closed. + raise IOClosed() + try: + return trampoline(fd, read=True, timeout=self.gettimeout(), + timeout_exc=socket.timeout("timed out"), + mark_as_closed=self.mark_as_closed) + except IOClosed: + # Our fileno has been obsoleted. Defang ourselves to + # prevent spurious closes. + self._mark_as_closed() + raise + + def _defanged_close(self): + # Don't let anything close the wrong filehandle. + pass + + def _mark_as_closed(self): + self.close = self._close = self._defanged_close + self._closed = True @property def _sock(self): @@ -373,7 +455,7 @@ class _SocketDuckForFd(object): except OSError as e: if get_errno(e) not in SOCKET_BLOCKING: raise IOError(*e.args) - trampoline(self, read=True) + self._trampoline(self, read=True) def recv_into(self, buf, nbytes=0, flags=0): if nbytes == 0: @@ -402,7 +484,7 @@ class _SocketDuckForFd(object): raise IOError(*e.args) total_sent = 0 while total_sent < len_data: - trampoline(self, write=True) + self._trampoline(self, write=True) try: total_sent += os_write(fileno, data[total_sent:]) except OSError as e: @@ -413,6 +495,8 @@ class _SocketDuckForFd(object): self._close() def _close(self): + notify_close(self._fileno) + self._mark_as_closed() try: os.close(self._fileno) except: @@ -484,7 +568,14 @@ class GreenPipe(_fileobject): self.mode, (id(self) < 0) and (sys.maxint + id(self)) or id(self)) + def _defanged_close(self): + pass + + def _mark_as_closed(self): + self.close = self._defanged_close + def close(self): + self._mark_as_closed() super(GreenPipe, self).close() for method in [ 'fileno', 'flush', 'isatty', 'next', 'read', 'readinto', diff --git a/eventlet/hubs/__init__.py b/eventlet/hubs/__init__.py index b07a219..4662ec3 100644 --- a/eventlet/hubs/__init__.py +++ b/eventlet/hubs/__init__.py @@ -119,7 +119,8 @@ from eventlet import timeout def trampoline(fd, read=None, write=None, timeout=None, - timeout_exc=timeout.Timeout): + timeout_exc=timeout.Timeout, + mark_as_closed = None): """Suspend the current coroutine until the given socket object or file descriptor is ready to *read*, ready to *write*, or the specified *timeout* elapses, depending on arguments specified. @@ -145,12 +146,15 @@ def trampoline(fd, read=None, write=None, timeout=None, except AttributeError: fileno = fd if timeout is not None: - t = hub.schedule_call_global(timeout, current.throw, timeout_exc) + def _timeout(exc): + # This is only useful to insert debugging + current.throw(exc) + t = hub.schedule_call_global(timeout, _timeout, timeout_exc) try: if read: - listener = hub.add(hub.READ, fileno, current.switch) + listener = hub.add(hub.READ, fileno, current.switch, current.throw, mark_as_closed) elif write: - listener = hub.add(hub.WRITE, fileno, current.switch) + listener = hub.add(hub.WRITE, fileno, current.switch, current.throw, mark_as_closed) try: return hub.switch() finally: @@ -158,3 +162,27 @@ def trampoline(fd, read=None, write=None, timeout=None, finally: if t is not None: t.cancel() + +def notify_close(fd): + """ + A particular file descriptor has been explicitly closed. Register for any + waiting listeners to be notified on the next run loop. + """ + hub = get_hub() + hub.notify_close(fd) + +def notify_opened(fd): + """ + Some file descriptors may be closed 'silently' - that is, by the garbage + collector, by an external library, etc. When the OS returns a file descriptor + from an open call (or something similar), this may be the only indication we + have that the FD has been closed and then recycled. + We let the hub know that the old file descriptor is dead; any stuck listeners + will be disabled and notified in turn. + """ + hub = get_hub() + hub.mark_as_reopened(fd) + + +class IOClosed(IOError): + pass diff --git a/eventlet/hubs/epolls.py b/eventlet/hubs/epolls.py index a45d460..e59502c 100644 --- a/eventlet/hubs/epolls.py +++ b/eventlet/hubs/epolls.py @@ -42,10 +42,10 @@ class Hub(poll.Hub): except AttributeError: self.modify = self.poll.register - def add(self, evtype, fileno, cb): + def add(self, evtype, fileno, cb, tb, mac): oldlisteners = bool(self.listeners[READ].get(fileno) or self.listeners[WRITE].get(fileno)) - listener = BaseHub.add(self, evtype, fileno, cb) + listener = BaseHub.add(self, evtype, fileno, cb, tb, mac) try: if not oldlisteners: # Means we've added a new listener diff --git a/eventlet/hubs/hub.py b/eventlet/hubs/hub.py index e25c752..c0ecc13 100644 --- a/eventlet/hubs/hub.py +++ b/eventlet/hubs/hub.py @@ -1,3 +1,4 @@ +import errno import heapq import math import traceback @@ -20,7 +21,7 @@ else: arm_alarm = alarm_signal from eventlet.support import greenlets as greenlet, clear_sys_exc_info -from eventlet.hubs import timer +from eventlet.hubs import timer, IOClosed from eventlet import patcher time = patcher.original('time') @@ -29,30 +30,64 @@ g_prevent_multiple_readers = True READ="read" WRITE="write" + +def closed_callback(fileno): + """ Used to de-fang a callback that may be triggered by a loop in BaseHub.wait + """ + # No-op. + pass + + class FdListener(object): - def __init__(self, evtype, fileno, cb): + def __init__(self, evtype, fileno, cb, tb, mark_as_closed): + """ The following are required: + cb - the standard callback, which will switch into the + listening greenlet to indicate that the event waited upon + is ready + tb - a 'throwback'. This is typically greenlet.throw, used + to raise a signal into the target greenlet indicating that + an event was obsoleted by its underlying filehandle being + repurposed. + mark_as_closed - if any listener is obsoleted, this is called + (in the context of some other client greenlet) to alert + underlying filehandle-wrapping objects that they've been + closed. + """ assert (evtype is READ or evtype is WRITE) self.evtype = evtype self.fileno = fileno self.cb = cb + self.tb = tb + self.mark_as_closed = mark_as_closed + self.spent = False + self.greenlet = greenlet.getcurrent() def __repr__(self): - return "%s(%r, %r, %r)" % (type(self).__name__, self.evtype, self.fileno, self.cb) + return "%s(%r, %r, %r, %r)" % (type(self).__name__, self.evtype, self.fileno, + self.cb, self.tb) __str__ = __repr__ + def defang(self): + self.cb = closed_callback + if self.mark_as_closed is not None: + self.mark_as_closed() + self.spent = True + + +noop = FdListener(READ, 0, lambda x: None, lambda x: None, None) -noop = FdListener(READ, 0, lambda x: None) # in debug mode, track the call site that created the listener class DebugListener(FdListener): - def __init__(self, evtype, fileno, cb): + def __init__(self, evtype, fileno, cb, tb, mark_as_closed): self.where_called = traceback.format_stack() - self.greenlet = greenlet.getcurrent() - super(DebugListener, self).__init__(evtype, fileno, cb) + super(DebugListener, self).__init__(evtype, fileno, cb, tb, mark_as_closed) def __repr__(self): - return "DebugListener(%r, %r, %r, %r)\n%sEndDebugFdListener" % ( + return "DebugListener(%r, %r, %r, %r, %r, %r)\n%sEndDebugFdListener" % ( self.evtype, self.fileno, self.cb, + self.tb, + self.mark_as_closed, self.greenlet, ''.join(self.where_called)) __str__ = __repr__ @@ -75,6 +110,7 @@ class BaseHub(object): def __init__(self, clock=time.time): self.listeners = {READ:{}, WRITE:{}} self.secondaries = {READ:{}, WRITE:{}} + self.closed = [] self.clock = clock self.greenlet = greenlet.greenlet(self.run) @@ -102,7 +138,7 @@ class BaseHub(object): signal.signal(signal.SIGALRM, self._old_signal_handler) signal.alarm(0) - def add(self, evtype, fileno, cb): + def add(self, evtype, fileno, cb, tb, mark_as_closed): """ Signals an intent to or write a particular file descriptor. The *evtype* argument is either the constant READ or WRITE. @@ -111,8 +147,15 @@ class BaseHub(object): The *cb* argument is the callback which will be called when the file is ready for reading/writing. + + The *tb* argument is the throwback used to signal (into the greenlet) + that the file was closed. + + The *mark_as_closed* is used in the context of the event hub to + prepare a Python object as being closed, pre-empting further + close operations from accidentally shutting down the wrong OS thread. """ - listener = self.lclass(evtype, fileno, cb) + listener = self.lclass(evtype, fileno, cb, tb, mark_as_closed) bucket = self.listeners[evtype] if fileno in bucket: if g_prevent_multiple_readers: @@ -122,15 +165,51 @@ class BaseHub(object): "particular socket. Consider using a pools.Pool. "\ "If you do know what you're doing and want to disable "\ "this error, call "\ - "eventlet.debug.hub_prevent_multiple_readers(False)" % ( - evtype, fileno, evtype)) + "eventlet.debug.hub_prevent_multiple_readers(False) - MY THREAD=%s; THAT THREAD=%s" % ( + evtype, fileno, evtype, cb, bucket[fileno])) # store off the second listener in another structure self.secondaries[evtype].setdefault(fileno, []).append(listener) else: bucket[fileno] = listener return listener + def _obsolete(self, fileno): + """ We've received an indication that 'fileno' has been obsoleted. + Any current listeners must be defanged, and notifications to + their greenlets queued up to send. + """ + found = False + for evtype, bucket in self.secondaries.items(): + if fileno in bucket: + for listener in bucket[fileno]: + found = True + self.closed.append(listener) + listener.defang() + del bucket[fileno] + + # For the primary listeners, we actually need to call remove, + # which may modify the underlying OS polling objects. + for evtype, bucket in self.listeners.items(): + if fileno in bucket: + listener = bucket[fileno] + found = True + listener.defang() + self.closed.append(bucket[fileno]) + self.remove(listener) + + return found + + def notify_close(self, fileno): + """ We might want to do something when a fileno is closed. + However, currently it suffices to obsolete listeners only + when we detect an old fileno being recycled, on open. + """ + pass + def remove(self, listener): + if listener.spent: + # trampoline may trigger this in its finally section. + return fileno = listener.fileno evtype = listener.evtype self.listeners[evtype].pop(fileno, None) @@ -143,6 +222,16 @@ class BaseHub(object): if not sec: del self.secondaries[evtype][fileno] + def mark_as_reopened(self, fileno): + """ If a file descriptor is returned by the OS as the result of some + open call (or equivalent), that signals that it might be being + recycled. + + Catch the case where the fd was previously in use. + """ + self._obsolete(fileno) + + def remove_descriptor(self, fileno): """ Completely remove all listeners for this fileno. For internal use only.""" @@ -157,6 +246,16 @@ class BaseHub(object): except Exception as e: self.squelch_generic_exception(sys.exc_info()) + def close_one(self): + """ Triggered from the main run loop. If a listener's underlying FD was + closed somehow, throw an exception back to the trampoline, which should + be able to manage it appropriately. + """ + listener = self.closed.pop() + if not listener.greenlet.dead: + # There's no point signalling a greenlet that's already dead. + listener.tb(IOClosed(errno.ENOTCONN, "Operation on closed file")) + def ensure_greenlet(self): if self.greenlet.dead: # create new greenlet sharing same parent as original @@ -220,6 +319,9 @@ class BaseHub(object): self.running = True self.stopping = False while not self.stopping: + while self.closed: + # We ditch all of these first. + self.close_one() self.prepare_timers() if self.debug_blocking: self.block_detect_pre() diff --git a/eventlet/hubs/kqueue.py b/eventlet/hubs/kqueue.py index 1a6f65d..a3e81b1 100644 --- a/eventlet/hubs/kqueue.py +++ b/eventlet/hubs/kqueue.py @@ -48,8 +48,8 @@ class Hub(BaseHub): return self.kqueue.control(events, max_events, timeout) raise - def add(self, evtype, fileno, cb): - listener = super(Hub, self).add(evtype, fileno, cb) + def add(self, evtype, fileno, cb, tb, mac): + listener = super(Hub, self).add(evtype, fileno, cb, tb, mac) events = self._events.setdefault(fileno, {}) if evtype not in events: try: diff --git a/eventlet/hubs/poll.py b/eventlet/hubs/poll.py index 233986e..caaaecf 100644 --- a/eventlet/hubs/poll.py +++ b/eventlet/hubs/poll.py @@ -24,8 +24,8 @@ class Hub(BaseHub): except AttributeError: self.modify = self.poll.register - def add(self, evtype, fileno, cb): - listener = super(Hub, self).add(evtype, fileno, cb) + def add(self, evtype, fileno, cb, tb, mac): + listener = super(Hub, self).add(evtype, fileno, cb, tb, mac) self.register(fileno, new=True) return listener @@ -92,18 +92,27 @@ class Hub(BaseHub): if self.debug_blocking: self.block_detect_pre() + # Accumulate the listeners to call back to prior to + # triggering any of them. This is to keep the set + # of callbacks in sync with the events we've just + # polled for. It prevents one handler from invalidating + # another. + callbacks = set() for fileno, event in presult: + if event & READ_MASK: + callbacks.add((readers.get(fileno, noop), fileno)) + if event & WRITE_MASK: + callbacks.add((writers.get(fileno, noop), fileno)) + if event & select.POLLNVAL: + self.remove_descriptor(fileno) + continue + if event & EXC_MASK: + callbacks.add((readers.get(fileno, noop), fileno)) + callbacks.add((writers.get(fileno, noop), fileno)) + + for listener, fileno in callbacks: try: - if event & READ_MASK: - readers.get(fileno, noop).cb(fileno) - if event & WRITE_MASK: - writers.get(fileno, noop).cb(fileno) - if event & select.POLLNVAL: - self.remove_descriptor(fileno) - continue - if event & EXC_MASK: - readers.get(fileno, noop).cb(fileno) - writers.get(fileno, noop).cb(fileno) + listener.cb(fileno) except SYSTEM_EXCEPTIONS: raise except: diff --git a/eventlet/hubs/pyevent.py b/eventlet/hubs/pyevent.py index a8afa51..fd277e7 100644 --- a/eventlet/hubs/pyevent.py +++ b/eventlet/hubs/pyevent.py @@ -98,7 +98,7 @@ class Hub(BaseHub): pass # exists for compatibility with BaseHub running = property(_getrunning, _setrunning) - def add(self, evtype, fileno, real_cb): + def add(self, evtype, fileno, real_cb, real_tb, mac): # this is stupid: pyevent won't call a callback unless it's a function, # so we have to force it to be one here if isinstance(real_cb, types.BuiltinMethodType): @@ -112,7 +112,7 @@ class Hub(BaseHub): elif evtype is WRITE: evt = event.write(fileno, cb, fileno) - return super(Hub,self).add(evtype, fileno, evt) + return super(Hub,self).add(evtype, fileno, evt, real_tb, mac) def signal(self, signalnum, handler): def wrapper(): diff --git a/eventlet/patcher.py b/eventlet/patcher.py index ee8832f..1ba921e 100644 --- a/eventlet/patcher.py +++ b/eventlet/patcher.py @@ -215,7 +215,7 @@ def monkey_patch(**on): It's safe to call monkey_patch multiple times. """ accepted_args = set(('os', 'select', 'socket', - 'thread', 'time', 'psycopg', 'MySQLdb')) + 'thread', 'time', 'psycopg', 'MySQLdb', '__builtin__')) default_on = on.pop("all",None) for k in six.iterkeys(on): if k not in accepted_args: @@ -248,6 +248,9 @@ def monkey_patch(**on): if on.get('MySQLdb') and not already_patched.get('MySQLdb'): modules_to_patch += _green_MySQLdb() already_patched['MySQLdb'] = True + if on.get('__builtin__') and not already_patched.get('__builtin__'): + modules_to_patch += _green_builtins() + already_patched['__builtin__'] = True if on['psycopg'] and not already_patched.get('psycopg'): try: from eventlet.support import psycopg2_patcher @@ -321,6 +324,13 @@ def _green_MySQLdb(): except ImportError: return [] +def _green_builtins(): + try: + from eventlet.green import builtin + return [('__builtin__', builtin)] + except ImportError: + return [] + def slurp_properties(source, destination, ignore=[], srckeys=None): """Copy properties from *source* (assumed to be a module) to