Moved trampoline into hubs/__init__, which maybe is the best place for it; open to suggestions.

This commit is contained in:
Ryan Williams
2010-01-24 15:41:53 -05:00
parent d877a490c3
commit dcaf551256
7 changed files with 56 additions and 51 deletions

View File

@@ -7,7 +7,7 @@ import inspect
import warnings
from eventlet.support import greenlets as greenlet
from eventlet.hubs import get_hub as get_hub_, get_default_hub as get_default_hub_, use_hub as use_hub_
from eventlet import hubs
from eventlet import greenthread
from eventlet import debug
@@ -20,16 +20,16 @@ __all__ = [
def get_hub(*a, **kw):
warnings.warn("eventlet.api.get_hub has moved to eventlet.hubs.get_hub",
DeprecationWarning, stacklevel=2)
return get_hub_(*a, **kw)
return hubs.get_hub(*a, **kw)
def get_default_hub(*a, **kw):
warnings.warn("eventlet.api.get_default_hub has moved to"
" eventlet.hubs.get_default_hub",
DeprecationWarning, stacklevel=2)
return get_default_hub_(*a, **kw)
return hubs.get_default_hub(*a, **kw)
def use_hub(*a, **kw):
warnings.warn("eventlet.api.use_hub has moved to eventlet.hubs.use_hub",
DeprecationWarning, stacklevel=2)
return use_hub_(*a, **kw)
return hubs.use_hub(*a, **kw)
def switch(coro, result=None, exc=None):
@@ -86,42 +86,7 @@ def connect_tcp(address, localaddr=None):
TimeoutError = greenthread.TimeoutError
def trampoline(fd, read=None, write=None, timeout=None, timeout_exc=TimeoutError):
"""Suspend the current coroutine until the given socket object or file
descriptor is ready to *read*, ready to *write*, or the specified
*timeout* elapses, depending on arguments specified.
To wait for *fd* to be ready to read, pass *read* ``=True``; ready to
write, pass *write* ``=True``. To specify a timeout, pass the *timeout*
argument in seconds.
If the specified *timeout* elapses before the socket is ready to read or
write, *timeout_exc* will be raised instead of ``trampoline()``
returning normally.
"""
t = None
hub = get_hub_()
current = greenlet.getcurrent()
assert hub.greenlet is not current, 'do not call blocking functions from the mainloop'
assert not (read and write), 'not allowed to trampoline for reading and writing'
fileno = getattr(fd, 'fileno', lambda: fd)()
def cb(d):
current.switch()
if timeout is not None:
t = hub.schedule_call_global(timeout, current.throw, timeout_exc)
try:
if read:
listener = hub.add(hub.READ, fileno, cb)
if write:
listener = hub.add(hub.WRITE, fileno, cb)
try:
return hub.switch()
finally:
hub.remove(listener)
finally:
if t is not None:
t.cancel()
trampoline = hubs.trampoline
spawn = greenthread.spawn
spawn_n = greenthread.spawn_n

View File

@@ -1,7 +1,7 @@
from OpenSSL import SSL as orig_SSL
from OpenSSL.SSL import *
from eventlet import greenio
from eventlet.api import trampoline
from eventlet.hubs import trampoline
import socket
class GreenConnection(greenio.GreenSocket):

View File

@@ -6,7 +6,7 @@ for attr in dir(__ssl):
import errno
import time
from eventlet.api import trampoline, getcurrent
from eventlet.hubs import trampoline
from thread import get_ident
from eventlet.greenio import set_nonblocking, GreenSocket, SOCKET_CLOSED, CONNECT_ERR, CONNECT_SUCCESS
orig_socket = __import__('socket')

View File

@@ -1,4 +1,4 @@
from eventlet.api import trampoline
from eventlet.hubs import trampoline
from eventlet.hubs import get_hub
BUFFER_SIZE = 4096

View File

@@ -8,6 +8,7 @@ from eventlet.support import greenlets as greenlet
__all__ = ['getcurrent', 'sleep', 'spawn', 'spawn_n', 'call_after_global', 'call_after_local', 'GreenThread']
getcurrent = greenlet.getcurrent
TimeoutError = hubs.TimeoutError
def sleep(seconds=0):
"""Yield control to another eligible coroutine until at least *seconds* have
@@ -89,9 +90,6 @@ def call_after_local(seconds, function, *args, **kwargs):
call_after = call_after_local
class TimeoutError(Exception):
"""Exception raised if an asynchronous operation times out"""
pass
def exc_after(seconds, *throw_args):
"""Schedule an exception to be raised into the current coroutine

View File

@@ -1,9 +1,10 @@
import select
import sys
import threading
from eventlet.support import greenlets as greenlet
_threadlocal = threading.local()
__all__ = ["use_hub", "get_hub", "get_default_hub"]
__all__ = ["use_hub", "get_hub", "get_default_hub", "trampoline"]
def get_default_hub():
"""Select the default hub implementation based on what multiplexing
@@ -76,3 +77,44 @@ def get_hub():
use_hub()
hub = _threadlocal.hub = _threadlocal.Hub()
return hub
class TimeoutError(Exception):
"""Exception raised if an asynchronous operation times out"""
pass
def trampoline(fd, read=None, write=None, timeout=None,
timeout_exc=TimeoutError):
"""Suspend the current coroutine until the given socket object or file
descriptor is ready to *read*, ready to *write*, or the specified
*timeout* elapses, depending on arguments specified.
To wait for *fd* to be ready to read, pass *read* ``=True``; ready to
write, pass *write* ``=True``. To specify a timeout, pass the *timeout*
argument in seconds.
If the specified *timeout* elapses before the socket is ready to read or
write, *timeout_exc* will be raised instead of ``trampoline()``
returning normally.
"""
t = None
hub = get_hub()
current = greenlet.getcurrent()
assert hub.greenlet is not current, 'do not call blocking functions from the mainloop'
assert not (read and write), 'not allowed to trampoline for reading and writing'
fileno = getattr(fd, 'fileno', lambda: fd)()
def cb(d):
current.switch()
if timeout is not None:
t = hub.schedule_call_global(timeout, current.throw, timeout_exc)
try:
if read:
listener = hub.add(hub.READ, fileno, cb)
if write:
listener = hub.add(hub.WRITE, fileno, cb)
try:
return hub.switch()
finally:
hub.remove(listener)
finally:
if t is not None:
t.cancel()

View File

@@ -128,9 +128,9 @@ def wrap_pipes_with_coroutine_pipes():
def new_fdopen(*args, **kw):
return greenio.GreenPipe(__original_fdopen__(*args, **kw))
def new_read(fd, *args, **kw):
from eventlet import api
from eventlet import hubs
try:
api.trampoline(fd, read=True)
hubs.trampoline(fd, read=True)
except socket.error, e:
if e[0] == errno.EPIPE:
return ''
@@ -138,8 +138,8 @@ def wrap_pipes_with_coroutine_pipes():
raise
return __original_read__(fd, *args, **kw)
def new_write(fd, *args, **kw):
from eventlet import api
api.trampoline(fd, write=True)
from eventlet import hubs
hubs.trampoline(fd, write=True)
return __original_write__(fd, *args, **kw)
def new_fork(*args, **kwargs):
pid = __original_fork__()