Fix second simultaneous read (parallel paramiko issue)
https://github.com/eventlet/eventlet/issues/94
Because paramiko uses a client thread to manage its communication, it has not been compatible with eventlet when run in parallel, and it is not the only place these problems arise. The root cause is the reuse of a fileno by the underlying OS: listeners are registered against that descriptor, so stale listeners could receive events destined for the newer descriptor, and when other code then used the new descriptor from a different greenlet, the 'second simultaneous read' error appeared. Whenever a Python object is created to wrap one of these filenos, we now signal the hub so that it can obsolete any extant listeners against that fileno. This is a fairly tricky operation, because listeners' greenlets are interleaved with the hub's operation; a number of small fixes here defend against one listener effectively obsoleting another while an event is still pending against it.
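The mechanism is easiest to see in miniature: every wrapper that obtains a new OS-level descriptor reports it to the hub, and if the hub still has listeners registered against that fileno, they must belong to an older, silently closed object and are obsoleted. The following is a minimal, self-contained sketch of that bookkeeping; `ToyHub` and `ToyListener` are hypothetical names for illustration only, not eventlet's API.

    class ToyListener(object):
        """One waiter parked on one fileno."""
        def __init__(self, fileno, mark_as_closed):
            self.fileno = fileno
            self.mark_as_closed = mark_as_closed  # tells the owning wrapper it is defunct
            self.spent = False

        def defang(self):
            # Stop this listener from firing again and notify its owner.
            if self.mark_as_closed is not None:
                self.mark_as_closed()
            self.spent = True


    class ToyHub(object):
        """Illustrative bookkeeping: obsolete stale listeners when a fileno is reused."""
        def __init__(self):
            self.listeners = {}  # fileno -> ToyListener

        def add(self, listener):
            self.listeners[listener.fileno] = listener

        def mark_as_reopened(self, fileno):
            # The OS just handed this fileno back from open()/socket(); any listener
            # still registered against it belongs to the old, recycled descriptor.
            stale = self.listeners.pop(fileno, None)
            if stale is not None:
                stale.defang()


    # Usage sketch: wrappers report each freshly acquired descriptor, mirroring
    # the notify_opened() calls added in the diff below.
    closed_flags = []
    hub = ToyHub()
    hub.add(ToyListener(5, mark_as_closed=lambda: closed_flags.append(5)))
    hub.mark_as_reopened(5)      # fileno 5 was recycled by the OS
    assert closed_flags == [5]   # the stale listener was defanged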
commit da87716714 (parent 2f49f860b8), committed by Sergey Shepelev
eventlet/green/builtin.py (new file, 43 lines)
@@ -0,0 +1,43 @@
+"""
+In order to detect a filehandle that's been closed, our only clue may be
+the operating system returning the same filehandle in response to some
+other operation.
+
+The builtins 'file' and 'open' are patched to collaborate with the
+notify_opened protocol.
+"""
+
+builtins_orig = __builtins__
+
+from eventlet import hubs
+from eventlet.hubs import hub
+from eventlet.patcher import slurp_properties
+import sys
+
+__all__ = dir(builtins_orig)
+__patched__ = ['file', 'open']
+
+slurp_properties(builtins_orig, globals(),
+    ignore=__patched__, srckeys=dir(builtins_orig))
+
+hubs.get_hub()
+
+__original_file = file
+class file(__original_file):
+    def __init__(self, *args, **kwargs):
+        super(file, self).__init__(*args, **kwargs)
+        hubs.notify_opened(self.fileno())
+
+__original_open = open
+__opening = False
+def open(*args):
+    global __opening
+    result = __original_open(*args)
+    if not __opening:
+        # This is incredibly ugly. 'open' is used under the hood by
+        # the import process. So, ensure we don't wind up in an
+        # infinite loop.
+        __opening = True
+        hubs.notify_opened(result.fileno())
+        __opening = False
+    return result
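The re-entrancy guard in the patched open above matters because notify_opened() may itself touch the hub and trigger imports, which call open again. A hypothetical illustration of the same guard pattern (not eventlet code, just the idea):

    _inside = [False]  # module-level re-entrancy flag, analogous to __opening above

    def guarded_call(fn):
        """Run fn() unless we are already inside a guarded call."""
        if _inside[0]:
            return None
        _inside[0] = True
        try:
            return fn()
        finally:
            _inside[0] = False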
@@ -9,7 +9,7 @@ from eventlet import hubs
 from eventlet.patcher import slurp_properties

 __all__ = os_orig.__all__
-__patched__ = ['fdopen', 'read', 'write', 'wait', 'waitpid']
+__patched__ = ['fdopen', 'read', 'write', 'wait', 'waitpid', 'open']

 slurp_properties(os_orig, globals(),
     ignore=__patched__, srckeys=dir(os_orig))
@@ -40,7 +40,10 @@ def read(fd, n):
             if get_errno(e) == errno.EPIPE:
                 return ''
             raise
-        hubs.trampoline(fd, read=True)
+        try:
+            hubs.trampoline(fd, read=True)
+        except hubs.IOClosed:
+            return ''

 __original_write__ = os_orig.write
 def write(fd, st):
@@ -81,4 +84,12 @@ def waitpid(pid, options):
                 return rpid, status
             greenthread.sleep(0.01)

-# TODO: open
+__original_open__ = os_orig.open
+def open(file, flags, mode=0777):
+    """ Wrap os.open
+      This behaves identically, but collaborates with
+      the hub's notify_opened protocol.
+    """
+    fd = __original_open__(file, flags, mode)
+    hubs.notify_opened(fd)
+    return fd
@@ -8,7 +8,7 @@ import errno
 time = __import__('time')

 from eventlet.support import get_errno
-from eventlet.hubs import trampoline
+from eventlet.hubs import trampoline, IOClosed
 from eventlet.greenio import set_nonblocking, GreenSocket, SOCKET_CLOSED, CONNECT_ERR, CONNECT_SUCCESS
 orig_socket = __import__('socket')
 socket = orig_socket.socket
@@ -98,8 +98,11 @@ class GreenSSLSocket(__ssl.SSLSocket):
     def read(self, len=1024):
         """Read up to LEN bytes and return them.
         Return zero-length string on EOF."""
-        return self._call_trampolining(
-            super(GreenSSLSocket, self).read, len)
+        try:
+            return self._call_trampolining(
+                super(GreenSSLSocket, self).read, len)
+        except IOClosed:
+            return ''

     def send (self, data, flags=0):
         if self._sslobj:
@@ -164,8 +167,11 @@ class GreenSSLSocket(__ssl.SSLSocket):
                     if self.act_non_blocking:
                         raise
                     if get_errno(e) == errno.EWOULDBLOCK:
-                        trampoline(self, read=True,
-                            timeout=self.gettimeout(), timeout_exc=timeout_exc('timed out'))
+                        try:
+                            trampoline(self, read=True,
+                                timeout=self.gettimeout(), timeout_exc=timeout_exc('timed out'))
+                        except IOClosed:
+                            return ''
                     if get_errno(e) in SOCKET_CLOSED:
                         return ''
                     raise
@@ -7,7 +7,7 @@ import time
 import warnings

 from eventlet.support import get_errno, six
-from eventlet.hubs import trampoline
+from eventlet.hubs import trampoline, notify_close, notify_opened, IOClosed

 __all__ = ['GreenSocket', 'GreenPipe', 'shutdown_safe']

@@ -122,6 +122,8 @@ class GreenSocket(object):
         should_set_nonblocking = kwargs.pop('set_nonblocking', True)
         if isinstance(family_or_realsock, six.integer_types):
             fd = _original_socket(family_or_realsock, *args, **kwargs)
+            # Notify the hub that this is a newly-opened socket.
+            notify_opened(fd.fileno())
         else:
             fd = family_or_realsock

@@ -151,6 +153,7 @@ class GreenSocket(object):
         self.listen = fd.listen
         self.setsockopt = fd.setsockopt
         self.shutdown = fd.shutdown
+        self._closed = False

     @property
     def _sock(self):
@@ -166,6 +169,25 @@ class GreenSocket(object):
             setattr(self, name, attr)
             return attr

+    def _trampoline(self, fd, read=False, write=False, timeout=None, timeout_exc=None):
+        """ We need to trampoline via the event hub.
+            We catch any signal back from the hub indicating that the operation we
+            were waiting on was associated with a filehandle that's since been
+            invalidated.
+        """
+        if self._closed:
+            # If we did any logging, alerting to a second trampoline attempt on a closed
+            # socket here would be useful.
+            raise IOClosed()
+        try:
+            return trampoline(fd, read=True, timeout=self.gettimeout(),
+                              timeout_exc=socket.timeout("timed out"),
+                              mark_as_closed=self._mark_as_closed)
+        except IOClosed:
+            # This socket's been obsoleted. De-fang it.
+            self._mark_as_closed()
+            raise
+
     def accept(self):
         if self.act_non_blocking:
             return self.fd.accept()
@@ -176,16 +198,37 @@ class GreenSocket(object):
                 client, addr = res
                 set_nonblocking(client)
                 return type(self)(client), addr
-            trampoline(fd, read=True, timeout=self.gettimeout(),
+            self._trampoline(fd, read=True, timeout=self.gettimeout(),
                 timeout_exc=socket.timeout("timed out"))

+    def _defanged_close(self):
+        # Already closed the once
+        pass
+
+
+    def _mark_as_closed(self):
+        """ Mark this socket as being closed """
+        self.close = self._defanged_close
+        self._closed = True
+
+    def close(self):
+        notify_close(self.fd)
+        self._mark_as_closed() # Don't do this twice.
+        return self.fd.close()
+
+    def __del__(self):
+        self.close()
+
     def connect(self, address):
         if self.act_non_blocking:
             return self.fd.connect(address)
         fd = self.fd
         if self.gettimeout() is None:
             while not socket_connect(fd, address):
-                trampoline(fd, write=True)
+                try:
+                    self._trampoline(fd, write=True)
+                except IOClosed:
+                    raise socket.error(errno.EBADFD)
                 socket_checkerr(fd)
         else:
             end = time.time() + self.gettimeout()
@@ -194,8 +237,12 @@ class GreenSocket(object):
                     return
                 if time.time() >= end:
                     raise socket.timeout("timed out")
-                trampoline(fd, write=True, timeout=end - time.time(),
+                try:
+                    self._trampoline(fd, write=True, timeout=end - time.time(),
                         timeout_exc=socket.timeout("timed out"))
+                except IOClosed:
+                    # ... we need some workable errno here.
+                    raise socket.error(errno.EBADFD)
                 socket_checkerr(fd)

     def connect_ex(self, address):
@@ -205,10 +252,12 @@ class GreenSocket(object):
         if self.gettimeout() is None:
             while not socket_connect(fd, address):
                 try:
-                    trampoline(fd, write=True)
+                    self._trampoline(fd, write=True)
                     socket_checkerr(fd)
                 except socket.error as ex:
                     return get_errno(ex)
+                except IOClosed:
+                    return errno.EBADFD
         else:
             end = time.time() + self.gettimeout()
             while True:
@@ -217,11 +266,13 @@ class GreenSocket(object):
                         return 0
                     if time.time() >= end:
                         raise socket.timeout(errno.EAGAIN)
-                    trampoline(fd, write=True, timeout=end - time.time(),
+                    self._trampoline(fd, write=True, timeout=end - time.time(),
                         timeout_exc=socket.timeout(errno.EAGAIN))
                     socket_checkerr(fd)
                 except socket.error as ex:
                     return get_errno(ex)
+                except IOClosed:
+                    return errno.EBADFD

     def dup(self, *args, **kw):
         sock = self.fd.dup(*args, **kw)
@@ -255,27 +306,31 @@ class GreenSocket(object):
                     return ''
                 else:
                     raise
-            trampoline(
-                fd,
-                read=True,
-                timeout=self.gettimeout(),
-                timeout_exc=socket.timeout("timed out"))
+            try:
+                self._trampoline(
+                    fd,
+                    read=True,
+                    timeout=self.gettimeout(),
+                    timeout_exc=socket.timeout("timed out"))
+            except IOClosed as e:
+                # Perhaps we should return '' instead?
+                raise EOFError()

     def recvfrom(self, *args):
         if not self.act_non_blocking:
-            trampoline(self.fd, read=True, timeout=self.gettimeout(),
+            self._trampoline(self.fd, read=True, timeout=self.gettimeout(),
                 timeout_exc=socket.timeout("timed out"))
         return self.fd.recvfrom(*args)

     def recvfrom_into(self, *args):
         if not self.act_non_blocking:
-            trampoline(self.fd, read=True, timeout=self.gettimeout(),
+            self._trampoline(self.fd, read=True, timeout=self.gettimeout(),
                 timeout_exc=socket.timeout("timed out"))
         return self.fd.recvfrom_into(*args)

     def recv_into(self, *args):
         if not self.act_non_blocking:
-            trampoline(self.fd, read=True, timeout=self.gettimeout(),
+            self._trampoline(self.fd, read=True, timeout=self.gettimeout(),
                 timeout_exc=socket.timeout("timed out"))
         return self.fd.recv_into(*args)

@@ -297,8 +352,11 @@ class GreenSocket(object):
             if total_sent == len_data:
                 break

-            trampoline(self.fd, write=True, timeout=self.gettimeout(),
-                timeout_exc=socket.timeout("timed out"))
+            try:
+                self._trampoline(self.fd, write=True, timeout=self.gettimeout(),
+                    timeout_exc=socket.timeout("timed out"))
+            except IOClosed:
+                raise socket.error(errno.ECONNRESET, 'Connection closed by another thread')

         return total_sent

@@ -309,7 +367,7 @@ class GreenSocket(object):
             tail += self.send(data[tail:], flags)

     def sendto(self, *args):
-        trampoline(self.fd, write=True)
+        self._trampoline(self.fd, write=True)
         return self.fd.sendto(*args)

     def setblocking(self, flag):
@@ -357,6 +415,30 @@ class _SocketDuckForFd(object):

     def __init__(self, fileno):
         self._fileno = fileno
+        notify_opened(fileno)
+        self._closed = False
+
+    def _trampoline(self, fd, read=False, write=False, timeout=None, timeout_exc=None):
+        if self._closed:
+            # Don't trampoline if we're already closed.
+            raise IOClosed()
+        try:
+            return trampoline(fd, read=True, timeout=self.gettimeout(),
+                              timeout_exc=socket.timeout("timed out"),
+                              mark_as_closed=self.mark_as_closed)
+        except IOClosed:
+            # Our fileno has been obsoleted. Defang ourselves to
+            # prevent spurious closes.
+            self._mark_as_closed()
+            raise
+
+    def _defanged_close(self):
+        # Don't let anything close the wrong filehandle.
+        pass
+
+    def _mark_as_closed(self):
+        self.close = self._close = self._defanged_close
+        self._closed = True

     @property
     def _sock(self):
@@ -373,7 +455,7 @@ class _SocketDuckForFd(object):
             except OSError as e:
                 if get_errno(e) not in SOCKET_BLOCKING:
                     raise IOError(*e.args)
-            trampoline(self, read=True)
+            self._trampoline(self, read=True)

     def recv_into(self, buf, nbytes=0, flags=0):
         if nbytes == 0:
@@ -402,7 +484,7 @@ class _SocketDuckForFd(object):
                 raise IOError(*e.args)
             total_sent = 0
         while total_sent < len_data:
-            trampoline(self, write=True)
+            self._trampoline(self, write=True)
             try:
                 total_sent += os_write(fileno, data[total_sent:])
             except OSError as e:
@@ -413,6 +495,8 @@ class _SocketDuckForFd(object):
         self._close()

     def _close(self):
+        notify_close(self._fileno)
+        self._mark_as_closed()
         try:
             os.close(self._fileno)
         except:
@@ -484,7 +568,14 @@ class GreenPipe(_fileobject):
            self.mode,
            (id(self) < 0) and (sys.maxint + id(self)) or id(self))

+    def _defanged_close(self):
+        pass
+
+    def _mark_as_closed(self):
+        self.close = self._defanged_close
+
     def close(self):
+        self._mark_as_closed()
         super(GreenPipe, self).close()
         for method in [
             'fileno', 'flush', 'isatty', 'next', 'read', 'readinto',
@@ -119,7 +119,8 @@ from eventlet import timeout


 def trampoline(fd, read=None, write=None, timeout=None,
-               timeout_exc=timeout.Timeout):
+               timeout_exc=timeout.Timeout,
+               mark_as_closed = None):
     """Suspend the current coroutine until the given socket object or file
     descriptor is ready to *read*, ready to *write*, or the specified
     *timeout* elapses, depending on arguments specified.
@@ -145,12 +146,15 @@ def trampoline(fd, read=None, write=None, timeout=None,
     except AttributeError:
         fileno = fd
     if timeout is not None:
-        t = hub.schedule_call_global(timeout, current.throw, timeout_exc)
+        def _timeout(exc):
+            # This is only useful to insert debugging
+            current.throw(exc)
+        t = hub.schedule_call_global(timeout, _timeout, timeout_exc)
     try:
         if read:
-            listener = hub.add(hub.READ, fileno, current.switch)
+            listener = hub.add(hub.READ, fileno, current.switch, current.throw, mark_as_closed)
         elif write:
-            listener = hub.add(hub.WRITE, fileno, current.switch)
+            listener = hub.add(hub.WRITE, fileno, current.switch, current.throw, mark_as_closed)
         try:
             return hub.switch()
         finally:
@@ -158,3 +162,27 @@ def trampoline(fd, read=None, write=None, timeout=None,
     finally:
         if t is not None:
             t.cancel()
+
+def notify_close(fd):
+    """
+    A particular file descriptor has been explicitly closed. Register for any
+    waiting listeners to be notified on the next run loop.
+    """
+    hub = get_hub()
+    hub.notify_close(fd)
+
+def notify_opened(fd):
+    """
+    Some file descriptors may be closed 'silently' - that is, by the garbage
+    collector, by an external library, etc. When the OS returns a file descriptor
+    from an open call (or something similar), this may be the only indication we
+    have that the FD has been closed and then recycled.
+    We let the hub know that the old file descriptor is dead; any stuck listeners
+    will be disabled and notified in turn.
+    """
+    hub = get_hub()
+    hub.mark_as_reopened(fd)
+
+
+class IOClosed(IOError):
+    pass
@@ -42,10 +42,10 @@ class Hub(poll.Hub):
         except AttributeError:
             self.modify = self.poll.register

-    def add(self, evtype, fileno, cb):
+    def add(self, evtype, fileno, cb, tb, mac):
         oldlisteners = bool(self.listeners[READ].get(fileno) or
                             self.listeners[WRITE].get(fileno))
-        listener = BaseHub.add(self, evtype, fileno, cb)
+        listener = BaseHub.add(self, evtype, fileno, cb, tb, mac)
         try:
             if not oldlisteners:
                 # Means we've added a new listener
@@ -1,3 +1,4 @@
+import errno
 import heapq
 import math
 import traceback
@@ -20,7 +21,7 @@ else:
     arm_alarm = alarm_signal

 from eventlet.support import greenlets as greenlet, clear_sys_exc_info
-from eventlet.hubs import timer
+from eventlet.hubs import timer, IOClosed
 from eventlet import patcher
 time = patcher.original('time')

@@ -29,30 +30,64 @@ g_prevent_multiple_readers = True
 READ="read"
 WRITE="write"


+def closed_callback(fileno):
+    """ Used to de-fang a callback that may be triggered by a loop in BaseHub.wait
+    """
+    # No-op.
+    pass
+
+
 class FdListener(object):
-    def __init__(self, evtype, fileno, cb):
+    def __init__(self, evtype, fileno, cb, tb, mark_as_closed):
+        """ The following are required:
+        cb - the standard callback, which will switch into the
+            listening greenlet to indicate that the event waited upon
+            is ready
+        tb - a 'throwback'. This is typically greenlet.throw, used
+            to raise a signal into the target greenlet indicating that
+            an event was obsoleted by its underlying filehandle being
+            repurposed.
+        mark_as_closed - if any listener is obsoleted, this is called
+            (in the context of some other client greenlet) to alert
+            underlying filehandle-wrapping objects that they've been
+            closed.
+        """
         assert (evtype is READ or evtype is WRITE)
         self.evtype = evtype
         self.fileno = fileno
         self.cb = cb
+        self.tb = tb
+        self.mark_as_closed = mark_as_closed
+        self.spent = False
+        self.greenlet = greenlet.getcurrent()
     def __repr__(self):
-        return "%s(%r, %r, %r)" % (type(self).__name__, self.evtype, self.fileno, self.cb)
+        return "%s(%r, %r, %r, %r)" % (type(self).__name__, self.evtype, self.fileno,
+                                       self.cb, self.tb)
     __str__ = __repr__

-noop = FdListener(READ, 0, lambda x: None)
+    def defang(self):
+        self.cb = closed_callback
+        if self.mark_as_closed is not None:
+            self.mark_as_closed()
+        self.spent = True
+
+
+noop = FdListener(READ, 0, lambda x: None, lambda x: None, None)

 # in debug mode, track the call site that created the listener
 class DebugListener(FdListener):
-    def __init__(self, evtype, fileno, cb):
+    def __init__(self, evtype, fileno, cb, tb, mark_as_closed):
         self.where_called = traceback.format_stack()
         self.greenlet = greenlet.getcurrent()
-        super(DebugListener, self).__init__(evtype, fileno, cb)
+        super(DebugListener, self).__init__(evtype, fileno, cb, tb, mark_as_closed)
     def __repr__(self):
-        return "DebugListener(%r, %r, %r, %r)\n%sEndDebugFdListener" % (
+        return "DebugListener(%r, %r, %r, %r, %r, %r)\n%sEndDebugFdListener" % (
             self.evtype,
             self.fileno,
             self.cb,
+            self.tb,
+            self.mark_as_closed,
             self.greenlet,
             ''.join(self.where_called))
     __str__ = __repr__
@@ -75,6 +110,7 @@ class BaseHub(object):
     def __init__(self, clock=time.time):
        self.listeners = {READ:{}, WRITE:{}}
        self.secondaries = {READ:{}, WRITE:{}}
+        self.closed = []

        self.clock = clock
        self.greenlet = greenlet.greenlet(self.run)
@@ -102,7 +138,7 @@ class BaseHub(object):
            signal.signal(signal.SIGALRM, self._old_signal_handler)
            signal.alarm(0)

-    def add(self, evtype, fileno, cb):
+    def add(self, evtype, fileno, cb, tb, mark_as_closed):
        """ Signals an intent to or write a particular file descriptor.

        The *evtype* argument is either the constant READ or WRITE.
@@ -111,8 +147,15 @@ class BaseHub(object):

        The *cb* argument is the callback which will be called when the file
        is ready for reading/writing.
+
+        The *tb* argument is the throwback used to signal (into the greenlet)
+        that the file was closed.
+
+        The *mark_as_closed* is used in the context of the event hub to
+        prepare a Python object as being closed, pre-empting further
+        close operations from accidentally shutting down the wrong OS thread.
        """
-        listener = self.lclass(evtype, fileno, cb)
+        listener = self.lclass(evtype, fileno, cb, tb, mark_as_closed)
        bucket = self.listeners[evtype]
        if fileno in bucket:
            if g_prevent_multiple_readers:
@@ -122,15 +165,51 @@ class BaseHub(object):
                    "particular socket. Consider using a pools.Pool. "\
                    "If you do know what you're doing and want to disable "\
                    "this error, call "\
-                    "eventlet.debug.hub_prevent_multiple_readers(False)" % (
-                    evtype, fileno, evtype))
+                    "eventlet.debug.hub_prevent_multiple_readers(False) - MY THREAD=%s; THAT THREAD=%s" % (
+                    evtype, fileno, evtype, cb, bucket[fileno]))
            # store off the second listener in another structure
            self.secondaries[evtype].setdefault(fileno, []).append(listener)
        else:
            bucket[fileno] = listener
        return listener

+    def _obsolete(self, fileno):
+        """ We've received an indication that 'fileno' has been obsoleted.
+            Any current listeners must be defanged, and notifications to
+            their greenlets queued up to send.
+        """
+        found = False
+        for evtype, bucket in self.secondaries.items():
+            if fileno in bucket:
+                for listener in bucket[fileno]:
+                    found = True
+                    self.closed.append(listener)
+                    listener.defang()
+                del bucket[fileno]
+
+        # For the primary listeners, we actually need to call remove,
+        # which may modify the underlying OS polling objects.
+        for evtype, bucket in self.listeners.items():
+            if fileno in bucket:
+                listener = bucket[fileno]
+                found = True
+                listener.defang()
+                self.closed.append(bucket[fileno])
+                self.remove(listener)
+
+        return found
+
+    def notify_close(self, fileno):
+        """ We might want to do something when a fileno is closed.
+            However, currently it suffices to obsolete listeners only
+            when we detect an old fileno being recycled, on open.
+        """
+        pass
+
    def remove(self, listener):
+        if listener.spent:
+            # trampoline may trigger this in its finally section.
+            return
        fileno = listener.fileno
        evtype = listener.evtype
        self.listeners[evtype].pop(fileno, None)
@@ -143,6 +222,16 @@ class BaseHub(object):
            if not sec:
                del self.secondaries[evtype][fileno]

+    def mark_as_reopened(self, fileno):
+        """ If a file descriptor is returned by the OS as the result of some
+            open call (or equivalent), that signals that it might be being
+            recycled.
+
+            Catch the case where the fd was previously in use.
+        """
+        self._obsolete(fileno)
+
+
    def remove_descriptor(self, fileno):
        """ Completely remove all listeners for this fileno. For internal use
        only."""
@@ -157,6 +246,16 @@ class BaseHub(object):
            except Exception as e:
                self.squelch_generic_exception(sys.exc_info())

+    def close_one(self):
+        """ Triggered from the main run loop. If a listener's underlying FD was
+            closed somehow, throw an exception back to the trampoline, which should
+            be able to manage it appropriately.
+        """
+        listener = self.closed.pop()
+        if not listener.greenlet.dead:
+            # There's no point signalling a greenlet that's already dead.
+            listener.tb(IOClosed(errno.ENOTCONN, "Operation on closed file"))
+
    def ensure_greenlet(self):
        if self.greenlet.dead:
            # create new greenlet sharing same parent as original
@@ -220,6 +319,9 @@ class BaseHub(object):
        self.running = True
        self.stopping = False
        while not self.stopping:
+            while self.closed:
+                # We ditch all of these first.
+                self.close_one()
            self.prepare_timers()
            if self.debug_blocking:
                self.block_detect_pre()
@@ -48,8 +48,8 @@ class Hub(BaseHub):
                return self.kqueue.control(events, max_events, timeout)
            raise

-    def add(self, evtype, fileno, cb):
-        listener = super(Hub, self).add(evtype, fileno, cb)
+    def add(self, evtype, fileno, cb, tb, mac):
+        listener = super(Hub, self).add(evtype, fileno, cb, tb, mac)
        events = self._events.setdefault(fileno, {})
        if evtype not in events:
            try:
@@ -24,8 +24,8 @@ class Hub(BaseHub):
        except AttributeError:
            self.modify = self.poll.register

-    def add(self, evtype, fileno, cb):
-        listener = super(Hub, self).add(evtype, fileno, cb)
+    def add(self, evtype, fileno, cb, tb, mac):
+        listener = super(Hub, self).add(evtype, fileno, cb, tb, mac)
        self.register(fileno, new=True)
        return listener

@@ -92,18 +92,27 @@ class Hub(BaseHub):
            if self.debug_blocking:
                self.block_detect_pre()

+            # Accumulate the listeners to call back to prior to
+            # triggering any of them. This is to keep the set
+            # of callbacks in sync with the events we've just
+            # polled for. It prevents one handler from invalidating
+            # another.
+            callbacks = set()
            for fileno, event in presult:
+                if event & READ_MASK:
+                    callbacks.add((readers.get(fileno, noop), fileno))
+                if event & WRITE_MASK:
+                    callbacks.add((writers.get(fileno, noop), fileno))
+                if event & select.POLLNVAL:
+                    self.remove_descriptor(fileno)
+                    continue
+                if event & EXC_MASK:
+                    callbacks.add((readers.get(fileno, noop), fileno))
+                    callbacks.add((writers.get(fileno, noop), fileno))
+
+            for listener, fileno in callbacks:
                try:
-                    if event & READ_MASK:
-                        readers.get(fileno, noop).cb(fileno)
-                    if event & WRITE_MASK:
-                        writers.get(fileno, noop).cb(fileno)
-                    if event & select.POLLNVAL:
-                        self.remove_descriptor(fileno)
-                        continue
-                    if event & EXC_MASK:
-                        readers.get(fileno, noop).cb(fileno)
-                        writers.get(fileno, noop).cb(fileno)
+                    listener.cb(fileno)
                except SYSTEM_EXCEPTIONS:
                    raise
                except:
@@ -98,7 +98,7 @@ class Hub(BaseHub):
        pass # exists for compatibility with BaseHub
    running = property(_getrunning, _setrunning)

-    def add(self, evtype, fileno, real_cb):
+    def add(self, evtype, fileno, real_cb, real_tb, mac):
        # this is stupid: pyevent won't call a callback unless it's a function,
        # so we have to force it to be one here
        if isinstance(real_cb, types.BuiltinMethodType):
@@ -112,7 +112,7 @@ class Hub(BaseHub):
        elif evtype is WRITE:
            evt = event.write(fileno, cb, fileno)

-        return super(Hub,self).add(evtype, fileno, evt)
+        return super(Hub,self).add(evtype, fileno, evt, real_tb, mac)

    def signal(self, signalnum, handler):
        def wrapper():
@@ -215,7 +215,7 @@ def monkey_patch(**on):
    It's safe to call monkey_patch multiple times.
    """
    accepted_args = set(('os', 'select', 'socket',
-                         'thread', 'time', 'psycopg', 'MySQLdb'))
+                         'thread', 'time', 'psycopg', 'MySQLdb', '__builtin__'))
    default_on = on.pop("all",None)
    for k in six.iterkeys(on):
        if k not in accepted_args:
@@ -248,6 +248,9 @@ def monkey_patch(**on):
    if on.get('MySQLdb') and not already_patched.get('MySQLdb'):
        modules_to_patch += _green_MySQLdb()
        already_patched['MySQLdb'] = True
+    if on.get('__builtin__') and not already_patched.get('__builtin__'):
+        modules_to_patch += _green_builtins()
+        already_patched['__builtin__'] = True
    if on['psycopg'] and not already_patched.get('psycopg'):
        try:
            from eventlet.support import psycopg2_patcher
@@ -321,6 +324,13 @@ def _green_MySQLdb():
    except ImportError:
        return []

+def _green_builtins():
+    try:
+        from eventlet.green import builtin
+        return [('__builtin__', builtin)]
+    except ImportError:
+        return []
+

 def slurp_properties(source, destination, ignore=[], srckeys=None):
    """Copy properties from *source* (assumed to be a module) to
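With the patcher changes above, the new builtin patching is opt-in by name. A minimal usage sketch, assuming an application entry point and the '__builtin__' flag spelled exactly as in the hunk above:

    import eventlet

    # Select the green builtin module alongside the usual targets; passing
    # __builtin__=True only works once accepted_args includes it, as above.
    eventlet.monkey_patch(os=True, socket=True, select=True, __builtin__=True)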