diff --git a/eventlet/api.py b/eventlet/api.py index d902fe8..d511a5f 100644 --- a/eventlet/api.py +++ b/eventlet/api.py @@ -7,7 +7,7 @@ import inspect import warnings from eventlet.support import greenlets as greenlet -from eventlet.hubs import get_hub as get_hub_, get_default_hub as get_default_hub_, use_hub as use_hub_ +from eventlet import hubs from eventlet import greenthread from eventlet import debug @@ -20,16 +20,16 @@ __all__ = [ def get_hub(*a, **kw): warnings.warn("eventlet.api.get_hub has moved to eventlet.hubs.get_hub", DeprecationWarning, stacklevel=2) - return get_hub_(*a, **kw) + return hubs.get_hub(*a, **kw) def get_default_hub(*a, **kw): warnings.warn("eventlet.api.get_default_hub has moved to" " eventlet.hubs.get_default_hub", DeprecationWarning, stacklevel=2) - return get_default_hub_(*a, **kw) + return hubs.get_default_hub(*a, **kw) def use_hub(*a, **kw): warnings.warn("eventlet.api.use_hub has moved to eventlet.hubs.use_hub", DeprecationWarning, stacklevel=2) - return use_hub_(*a, **kw) + return hubs.use_hub(*a, **kw) def switch(coro, result=None, exc=None): @@ -86,42 +86,7 @@ def connect_tcp(address, localaddr=None): TimeoutError = greenthread.TimeoutError -def trampoline(fd, read=None, write=None, timeout=None, timeout_exc=TimeoutError): - """Suspend the current coroutine until the given socket object or file - descriptor is ready to *read*, ready to *write*, or the specified - *timeout* elapses, depending on arguments specified. - - To wait for *fd* to be ready to read, pass *read* ``=True``; ready to - write, pass *write* ``=True``. To specify a timeout, pass the *timeout* - argument in seconds. - - If the specified *timeout* elapses before the socket is ready to read or - write, *timeout_exc* will be raised instead of ``trampoline()`` - returning normally. - """ - t = None - hub = get_hub_() - current = greenlet.getcurrent() - assert hub.greenlet is not current, 'do not call blocking functions from the mainloop' - assert not (read and write), 'not allowed to trampoline for reading and writing' - fileno = getattr(fd, 'fileno', lambda: fd)() - def cb(d): - current.switch() - if timeout is not None: - t = hub.schedule_call_global(timeout, current.throw, timeout_exc) - try: - if read: - listener = hub.add(hub.READ, fileno, cb) - if write: - listener = hub.add(hub.WRITE, fileno, cb) - try: - return hub.switch() - finally: - hub.remove(listener) - finally: - if t is not None: - t.cancel() - +trampoline = hubs.trampoline spawn = greenthread.spawn spawn_n = greenthread.spawn_n diff --git a/eventlet/green/OpenSSL/SSL.py b/eventlet/green/OpenSSL/SSL.py index 23b8bbe..d0a0b42 100644 --- a/eventlet/green/OpenSSL/SSL.py +++ b/eventlet/green/OpenSSL/SSL.py @@ -1,7 +1,7 @@ from OpenSSL import SSL as orig_SSL from OpenSSL.SSL import * from eventlet import greenio -from eventlet.api import trampoline +from eventlet.hubs import trampoline import socket class GreenConnection(greenio.GreenSocket): diff --git a/eventlet/green/ssl.py b/eventlet/green/ssl.py index 9bdedec..de2dac3 100644 --- a/eventlet/green/ssl.py +++ b/eventlet/green/ssl.py @@ -6,7 +6,7 @@ for attr in dir(__ssl): import errno import time -from eventlet.api import trampoline, getcurrent +from eventlet.hubs import trampoline from thread import get_ident from eventlet.greenio import set_nonblocking, GreenSocket, SOCKET_CLOSED, CONNECT_ERR, CONNECT_SUCCESS orig_socket = __import__('socket') diff --git a/eventlet/greenio.py b/eventlet/greenio.py index c58ef36..f24315e 100644 --- a/eventlet/greenio.py +++ b/eventlet/greenio.py @@ -1,4 +1,4 @@ -from eventlet.api import trampoline +from eventlet.hubs import trampoline from eventlet.hubs import get_hub BUFFER_SIZE = 4096 diff --git a/eventlet/greenthread.py b/eventlet/greenthread.py index aaf3e95..975fb26 100644 --- a/eventlet/greenthread.py +++ b/eventlet/greenthread.py @@ -8,6 +8,7 @@ from eventlet.support import greenlets as greenlet __all__ = ['getcurrent', 'sleep', 'spawn', 'spawn_n', 'call_after_global', 'call_after_local', 'GreenThread'] getcurrent = greenlet.getcurrent +TimeoutError = hubs.TimeoutError def sleep(seconds=0): """Yield control to another eligible coroutine until at least *seconds* have @@ -89,9 +90,6 @@ def call_after_local(seconds, function, *args, **kwargs): call_after = call_after_local -class TimeoutError(Exception): - """Exception raised if an asynchronous operation times out""" - pass def exc_after(seconds, *throw_args): """Schedule an exception to be raised into the current coroutine diff --git a/eventlet/hubs/__init__.py b/eventlet/hubs/__init__.py index 52f6be9..9512a16 100644 --- a/eventlet/hubs/__init__.py +++ b/eventlet/hubs/__init__.py @@ -1,9 +1,10 @@ import select import sys import threading +from eventlet.support import greenlets as greenlet _threadlocal = threading.local() -__all__ = ["use_hub", "get_hub", "get_default_hub"] +__all__ = ["use_hub", "get_hub", "get_default_hub", "trampoline"] def get_default_hub(): """Select the default hub implementation based on what multiplexing @@ -76,3 +77,44 @@ def get_hub(): use_hub() hub = _threadlocal.hub = _threadlocal.Hub() return hub + +class TimeoutError(Exception): + """Exception raised if an asynchronous operation times out""" + pass + +def trampoline(fd, read=None, write=None, timeout=None, + timeout_exc=TimeoutError): + """Suspend the current coroutine until the given socket object or file + descriptor is ready to *read*, ready to *write*, or the specified + *timeout* elapses, depending on arguments specified. + + To wait for *fd* to be ready to read, pass *read* ``=True``; ready to + write, pass *write* ``=True``. To specify a timeout, pass the *timeout* + argument in seconds. + + If the specified *timeout* elapses before the socket is ready to read or + write, *timeout_exc* will be raised instead of ``trampoline()`` + returning normally. + """ + t = None + hub = get_hub() + current = greenlet.getcurrent() + assert hub.greenlet is not current, 'do not call blocking functions from the mainloop' + assert not (read and write), 'not allowed to trampoline for reading and writing' + fileno = getattr(fd, 'fileno', lambda: fd)() + def cb(d): + current.switch() + if timeout is not None: + t = hub.schedule_call_global(timeout, current.throw, timeout_exc) + try: + if read: + listener = hub.add(hub.READ, fileno, cb) + if write: + listener = hub.add(hub.WRITE, fileno, cb) + try: + return hub.switch() + finally: + hub.remove(listener) + finally: + if t is not None: + t.cancel() diff --git a/eventlet/util.py b/eventlet/util.py index 6aedcf6..efe516e 100644 --- a/eventlet/util.py +++ b/eventlet/util.py @@ -128,9 +128,9 @@ def wrap_pipes_with_coroutine_pipes(): def new_fdopen(*args, **kw): return greenio.GreenPipe(__original_fdopen__(*args, **kw)) def new_read(fd, *args, **kw): - from eventlet import api + from eventlet import hubs try: - api.trampoline(fd, read=True) + hubs.trampoline(fd, read=True) except socket.error, e: if e[0] == errno.EPIPE: return '' @@ -138,8 +138,8 @@ def wrap_pipes_with_coroutine_pipes(): raise return __original_read__(fd, *args, **kw) def new_write(fd, *args, **kw): - from eventlet import api - api.trampoline(fd, write=True) + from eventlet import hubs + hubs.trampoline(fd, write=True) return __original_write__(fd, *args, **kw) def new_fork(*args, **kwargs): pid = __original_fork__()