Moved get_hub, use_hub, get_default_hub to eventlet.hubs. This is a step in the direction of better factoring and fewer circular-ish dependencies.

This commit is contained in:
Ryan Williams
2009-12-17 10:37:59 -08:00
parent c7d3432b29
commit 441f72f408
20 changed files with 207 additions and 168 deletions

1
NEWS
View File

@@ -1,6 +1,7 @@
0.9.3 0.9.3
===== =====
* Moved get_hub, use_hub, get_default_hub to eventlet.hubs
* Renamed libevent hub to pyevent. * Renamed libevent hub to pyevent.
* Renamed coros.event to coros.Event * Renamed coros.event to coros.Event
* Removed previously-deprecated features tcp_server, GreenSSL, erpc, and trap_errors. * Removed previously-deprecated features tcp_server, GreenSSL, erpc, and trap_errors.

View File

@@ -4,11 +4,9 @@ import socket
import string import string
import linecache import linecache
import inspect import inspect
import threading
from eventlet.support import greenlets as greenlet from eventlet.support import greenlets as greenlet
from eventlet.hubs import get_hub as get_hub_, get_default_hub as get_default_hub_, use_hub as use_hub_
import warnings
__all__ = [ __all__ = [
'call_after', 'exc_after', 'getcurrent', 'get_default_hub', 'get_hub', 'call_after', 'exc_after', 'getcurrent', 'get_default_hub', 'get_hub',
@@ -17,6 +15,22 @@ __all__ = [
'unspew', 'use_hub', 'with_timeout', 'timeout'] 'unspew', 'use_hub', 'with_timeout', 'timeout']
import warnings
def get_hub(*a, **kw):
warnings.warn("eventlet.api.get_hub has moved to eventlet.hubs.get_hub",
DeprecationWarning, stacklevel=2)
return get_hub_(*a, **kw)
def get_default_hub(*a, **kw):
warnings.warn("eventlet.api.get_default_hub has moved to"
" eventlet.hubs.get_default_hub",
DeprecationWarning, stacklevel=2)
return get_default_hub_(*a, **kw)
def use_hub(*a, **kw):
warnings.warn("eventlet.api.use_hub has moved to eventlet.hubs.use_hub",
DeprecationWarning, stacklevel=2)
return use_hub_(*a, **kw)
def switch(coro, result=None, exc=None): def switch(coro, result=None, exc=None):
if exc is not None: if exc is not None:
return coro.throw(exc) return coro.throw(exc)
@@ -28,7 +42,6 @@ class TimeoutError(Exception):
"""Exception raised if an asynchronous operation times out""" """Exception raised if an asynchronous operation times out"""
pass pass
_threadlocal = threading.local()
def tcp_listener(address, backlog=50): def tcp_listener(address, backlog=50):
""" """
@@ -83,7 +96,7 @@ def trampoline(fd, read=None, write=None, timeout=None, timeout_exc=TimeoutError
returning normally. returning normally.
""" """
t = None t = None
hub = get_hub() hub = get_hub_()
current = greenlet.getcurrent() current = greenlet.getcurrent()
assert hub.greenlet is not current, 'do not call blocking functions from the mainloop' assert hub.greenlet is not current, 'do not call blocking functions from the mainloop'
assert not (read and write), 'not allowed to trampoline for reading and writing' assert not (read and write), 'not allowed to trampoline for reading and writing'
@@ -137,13 +150,13 @@ def spawn(function, *args, **kwds):
# killable # killable
t = None t = None
g = Greenlet(_spawn_startup) g = Greenlet(_spawn_startup)
t = get_hub().schedule_call_global(0, _spawn, g) t = get_hub_().schedule_call_global(0, _spawn, g)
g.switch(function, args, kwds, t.cancel) g.switch(function, args, kwds, t.cancel)
return g return g
def kill(g, *throw_args): def kill(g, *throw_args):
get_hub().schedule_call_global(0, g.throw, *throw_args) get_hub_().schedule_call_global(0, g.throw, *throw_args)
if getcurrent() is not get_hub().greenlet: if getcurrent() is not get_hub_().greenlet:
sleep(0) sleep(0)
def call_after_global(seconds, function, *args, **kwds): def call_after_global(seconds, function, *args, **kwds):
@@ -162,7 +175,7 @@ def call_after_global(seconds, function, *args, **kwds):
g = Greenlet(_spawn_startup) g = Greenlet(_spawn_startup)
g.switch(function, args, kwds) g.switch(function, args, kwds)
g.switch() g.switch()
t = get_hub().schedule_call_global(seconds, startup) t = get_hub_().schedule_call_global(seconds, startup)
return t return t
def call_after_local(seconds, function, *args, **kwds): def call_after_local(seconds, function, *args, **kwds):
@@ -181,7 +194,7 @@ def call_after_local(seconds, function, *args, **kwds):
g = Greenlet(_spawn_startup) g = Greenlet(_spawn_startup)
g.switch(function, args, kwds) g.switch(function, args, kwds)
g.switch() g.switch()
t = get_hub().schedule_call_local(seconds, startup) t = get_hub_().schedule_call_local(seconds, startup)
return t return t
# for compatibility with original eventlet API # for compatibility with original eventlet API
@@ -313,66 +326,6 @@ def exc_after(seconds, *throw_args):
""" """
return call_after(seconds, getcurrent().throw, *throw_args) return call_after(seconds, getcurrent().throw, *throw_args)
def get_default_hub():
"""Select the default hub implementation based on what multiplexing
libraries are installed. Tries twistedr if a twisted reactor is imported,
then poll, then select.
"""
# pyevent hub disabled for now because it is not thread-safe
#try:
# import eventlet.hubs.pyevent
# return eventlet.hubs.pyevent
#except:
# pass
if 'twisted.internet.reactor' in sys.modules:
from eventlet.hubs import twistedr
return twistedr
try:
import eventlet.hubs.epolls
return eventlet.hubs.epolls
except ImportError:
import select
if hasattr(select, 'poll'):
import eventlet.hubs.poll
return eventlet.hubs.poll
else:
import eventlet.hubs.selects
return eventlet.hubs.selects
def use_hub(mod=None):
"""Use the module *mod*, containing a class called Hub, as the
event hub. Usually not required; the default hub is usually fine.
"""
if mod is None:
mod = get_default_hub()
if hasattr(_threadlocal, 'hub'):
del _threadlocal.hub
if isinstance(mod, str):
mod = __import__('eventlet.hubs.' + mod, globals(), locals(), ['Hub'])
if hasattr(mod, 'Hub'):
_threadlocal.Hub = mod.Hub
else:
_threadlocal.Hub = mod
def get_hub():
"""Get the current event hub singleton object.
"""
try:
hub = _threadlocal.hub
except AttributeError:
try:
_threadlocal.Hub
except AttributeError:
use_hub()
hub = _threadlocal.hub = _threadlocal.Hub()
return hub
def sleep(seconds=0): def sleep(seconds=0):
"""Yield control to another eligible coroutine until at least *seconds* have """Yield control to another eligible coroutine until at least *seconds* have
elapsed. elapsed.
@@ -384,7 +337,7 @@ def sleep(seconds=0):
calling any socket methods, it's a good idea to call ``sleep(0)`` calling any socket methods, it's a good idea to call ``sleep(0)``
occasionally; otherwise nothing else will run. occasionally; otherwise nothing else will run.
""" """
hub = get_hub() hub = get_hub_()
assert hub.greenlet is not greenlet.getcurrent(), 'do not call blocking functions from the mainloop' assert hub.greenlet is not greenlet.getcurrent(), 'do not call blocking functions from the mainloop'
timer = hub.schedule_call_global(seconds, greenlet.getcurrent().switch) timer = hub.schedule_call_global(seconds, greenlet.getcurrent().switch)
try: try:

View File

@@ -96,7 +96,7 @@ def backdoor((conn, addr), locals=None):
fl = conn.makeGreenFile("rw") fl = conn.makeGreenFile("rw")
fl.newlines = '\n' fl.newlines = '\n'
greenlet = SocketConsole(fl, (host, port), locals) greenlet = SocketConsole(fl, (host, port), locals)
hub = api.get_hub() hub = hubs.get_hub()
hub.schedule_call_global(0, greenlet.switch) hub.schedule_call_global(0, greenlet.switch)

View File

@@ -4,6 +4,7 @@ import traceback
import warnings import warnings
from eventlet import api from eventlet import api
from eventlet import hubs
class Cancelled(RuntimeError): class Cancelled(RuntimeError):
@@ -130,7 +131,7 @@ class Event(object):
if self._result is NOT_USED: if self._result is NOT_USED:
self._waiters.add(api.getcurrent()) self._waiters.add(api.getcurrent())
try: try:
return api.get_hub().switch() return hubs.get_hub().switch()
finally: finally:
self._waiters.discard(api.getcurrent()) self._waiters.discard(api.getcurrent())
if self._exc is not None: if self._exc is not None:
@@ -168,7 +169,7 @@ class Event(object):
if exc is not None and not isinstance(exc, tuple): if exc is not None and not isinstance(exc, tuple):
exc = (exc, ) exc = (exc, )
self._exc = exc self._exc = exc
hub = api.get_hub() hub = hubs.get_hub()
if self._waiters: if self._waiters:
hub.schedule_call_global(0, self._do_send, self._result, self._exc, self._waiters.copy()) hub.schedule_call_global(0, self._do_send, self._result, self._exc, self._waiters.copy())
@@ -224,7 +225,7 @@ class Semaphore(object):
self._waiters.add(api.getcurrent()) self._waiters.add(api.getcurrent())
try: try:
while self.counter <= 0: while self.counter <= 0:
api.get_hub().switch() hubs.get_hub().switch()
finally: finally:
self._waiters.discard(api.getcurrent()) self._waiters.discard(api.getcurrent())
self.counter -= 1 self.counter -= 1
@@ -237,7 +238,7 @@ class Semaphore(object):
# `blocking' parameter is for consistency with BoundedSemaphore and is ignored # `blocking' parameter is for consistency with BoundedSemaphore and is ignored
self.counter += 1 self.counter += 1
if self._waiters: if self._waiters:
api.get_hub().schedule_call_global(0, self._do_acquire) hubs.get_hub().schedule_call_global(0, self._do_acquire)
return True return True
def _do_acquire(self): def _do_acquire(self):
@@ -429,7 +430,7 @@ class Queue(object):
exc = (exc, ) exc = (exc, )
self.items.append((result, exc)) self.items.append((result, exc))
if self._waiters: if self._waiters:
api.get_hub().schedule_call_global(0, self._do_send) hubs.get_hub().schedule_call_global(0, self._do_send)
def send_exception(self, *args): def send_exception(self, *args):
# the arguments are the same as for greenlet.throw # the arguments are the same as for greenlet.throw
@@ -451,7 +452,7 @@ class Queue(object):
else: else:
self._waiters.add(api.getcurrent()) self._waiters.add(api.getcurrent())
try: try:
result, exc = api.get_hub().switch() result, exc = hubs.get_hub().switch()
if exc is None: if exc is None:
return result return result
else: else:
@@ -491,20 +492,20 @@ class Channel(object):
def send(self, result=None, exc=None): def send(self, result=None, exc=None):
if exc is not None and not isinstance(exc, tuple): if exc is not None and not isinstance(exc, tuple):
exc = (exc, ) exc = (exc, )
if api.getcurrent() is api.get_hub().greenlet: if api.getcurrent() is hubs.get_hub().greenlet:
self.items.append((result, exc)) self.items.append((result, exc))
if self._waiters: if self._waiters:
api.get_hub().schedule_call_global(0, self._do_switch) hubs.get_hub().schedule_call_global(0, self._do_switch)
else: else:
self.items.append((result, exc)) self.items.append((result, exc))
# note that send() does not work well with timeouts. if your timeout fires # note that send() does not work well with timeouts. if your timeout fires
# after this point, the item will remain in the queue # after this point, the item will remain in the queue
if self._waiters: if self._waiters:
api.get_hub().schedule_call_global(0, self._do_switch) hubs.get_hub().schedule_call_global(0, self._do_switch)
if len(self.items) > self.max_size: if len(self.items) > self.max_size:
self._senders.add(api.getcurrent()) self._senders.add(api.getcurrent())
try: try:
api.get_hub().switch() hubs.get_hub().switch()
finally: finally:
self._senders.discard(api.getcurrent()) self._senders.discard(api.getcurrent())
@@ -534,17 +535,17 @@ class Channel(object):
if self.items: if self.items:
result, exc = self.items.popleft() result, exc = self.items.popleft()
if len(self.items) <= self.max_size: if len(self.items) <= self.max_size:
api.get_hub().schedule_call_global(0, self._do_switch) hubs.get_hub().schedule_call_global(0, self._do_switch)
if exc is None: if exc is None:
return result return result
else: else:
api.getcurrent().throw(*exc) api.getcurrent().throw(*exc)
else: else:
if self._senders: if self._senders:
api.get_hub().schedule_call_global(0, self._do_switch) hubs.get_hub().schedule_call_global(0, self._do_switch)
self._waiters.add(api.getcurrent()) self._waiters.add(api.getcurrent())
try: try:
result, exc = api.get_hub().switch() result, exc = hubs.get_hub().switch()
if exc is None: if exc is None:
return result return result
else: else:

View File

@@ -1,6 +1,7 @@
__select = __import__('select') __select = __import__('select')
error = __select.error error = __select.error
from eventlet.api import get_hub, getcurrent from eventlet.api import getcurrent
from eventlet.hubs import get_hub
def get_fileno(obj): def get_fileno(obj):
try: try:

View File

@@ -3,7 +3,7 @@ for var in __socket.__all__:
exec "%s = __socket.%s" % (var, var) exec "%s = __socket.%s" % (var, var)
_fileobject = __socket._fileobject _fileobject = __socket._fileobject
from eventlet.api import get_hub from eventlet.hubs import get_hub
from eventlet.greenio import GreenSocket as socket from eventlet.greenio import GreenSocket as socket
from eventlet.greenio import SSL as _SSL # for exceptions from eventlet.greenio import SSL as _SSL # for exceptions
import warnings import warnings

View File

@@ -1,4 +1,5 @@
from eventlet.api import trampoline, get_hub from eventlet.api import trampoline
from eventlet.hubs import get_hub
BUFFER_SIZE = 4096 BUFFER_SIZE = 4096

View File

@@ -1 +1,74 @@
import sys
import threading
_threadlocal = threading.local()
def get_default_hub():
"""Select the default hub implementation based on what multiplexing
libraries are installed. The order that the hubs are tried is:
* twistedr
* epoll
* poll
* select
It won't ever automatically select the pyevent hub, because it's not
python-thread-safe.
"""
# pyevent hub disabled for now because it is not thread-safe
#try:
# import eventlet.hubs.pyevent
# return eventlet.hubs.pyevent
#except:
# pass
if 'twisted.internet.reactor' in sys.modules:
from eventlet.hubs import twistedr
return twistedr
try:
import eventlet.hubs.epolls
return eventlet.hubs.epolls
except ImportError:
import select
if hasattr(select, 'poll'):
import eventlet.hubs.poll
return eventlet.hubs.poll
else:
import eventlet.hubs.selects
return eventlet.hubs.selects
def use_hub(mod=None):
"""Use the module *mod*, containing a class called Hub, as the
event hub. Usually not required; the default hub is usually fine.
Mod can be an actual module, a string, or None. If *mod* is a module,
it uses it directly. If *mod* is a string, use_hub tries to import
`eventlet.hubs.mod` and use that as the hub module. If *mod* is None,
use_hub uses the default hub. Only call use_hub during application
initialization, because it resets the hub's state and any existing
timers or listeners will never be resumed.
"""
if mod is None:
mod = get_default_hub()
if hasattr(_threadlocal, 'hub'):
del _threadlocal.hub
if isinstance(mod, str):
mod = __import__('eventlet.hubs.' + mod, globals(), locals(), ['Hub'])
if hasattr(mod, 'Hub'):
_threadlocal.Hub = mod.Hub
else:
_threadlocal.Hub = mod
def get_hub():
"""Get the current event hub singleton object.
"""
try:
hub = _threadlocal.hub
except AttributeError:
try:
_threadlocal.Hub
except AttributeError:
use_hub()
hub = _threadlocal.hub = _threadlocal.Hub()
return hub

View File

@@ -57,7 +57,7 @@ coroutines and wait for all them to complete. Such a function is provided by
this module. this module.
""" """
import sys import sys
from eventlet import api, coros from eventlet import api, coros, hubs
__all__ = ['LinkedExited', __all__ = ['LinkedExited',
'LinkedFailed', 'LinkedFailed',
@@ -202,8 +202,8 @@ def killall(procs, *throw_args, **kwargs):
raise TypeError('Invalid keyword argument for proc.killall(): %s' % ', '.join(kwargs.keys())) raise TypeError('Invalid keyword argument for proc.killall(): %s' % ', '.join(kwargs.keys()))
for g in procs: for g in procs:
if not g.dead: if not g.dead:
api.get_hub().schedule_call_global(0, g.throw, *throw_args) hubs.get_hub().schedule_call_global(0, g.throw, *throw_args)
if wait and api.getcurrent() is not api.get_hub().greenlet: if wait and api.getcurrent() is not hubs.get_hub().greenlet:
api.sleep(0) api.sleep(0)
@@ -223,8 +223,8 @@ def spawn_greenlet(function, *args):
supported (limitation of greenlet), use :func:`spawn` to work around that. supported (limitation of greenlet), use :func:`spawn` to work around that.
""" """
g = api.Greenlet(function) g = api.Greenlet(function)
g.parent = api.get_hub().greenlet g.parent = hubs.get_hub().greenlet
api.get_hub().schedule_call_global(0, g.switch, *args) hubs.get_hub().schedule_call_global(0, g.switch, *args)
return g return g
@@ -395,7 +395,7 @@ class Source(object):
self._start_send() self._start_send()
def _start_send(self): def _start_send(self):
api.get_hub().schedule_call_global(0, self._do_send, self._value_links.items(), self._value_links) hubs.get_hub().schedule_call_global(0, self._do_send, self._value_links.items(), self._value_links)
def send_exception(self, *throw_args): def send_exception(self, *throw_args):
assert not self.ready(), "%s has been fired already" % self assert not self.ready(), "%s has been fired already" % self
@@ -404,7 +404,7 @@ class Source(object):
self._start_send_exception() self._start_send_exception()
def _start_send_exception(self): def _start_send_exception(self):
api.get_hub().schedule_call_global(0, self._do_send, self._exception_links.items(), self._exception_links) hubs.get_hub().schedule_call_global(0, self._do_send, self._exception_links.items(), self._exception_links)
def _do_send(self, links, consult): def _do_send(self, links, consult):
while links: while links:
@@ -416,7 +416,7 @@ class Source(object):
finally: finally:
consult.pop(listener, None) consult.pop(listener, None)
except: except:
api.get_hub().schedule_call_global(0, self._do_send, links, consult) hubs.get_hub().schedule_call_global(0, self._do_send, links, consult)
raise raise
def wait(self, timeout=None, *throw_args): def wait(self, timeout=None, *throw_args):
@@ -474,7 +474,7 @@ class Waiter(object):
"""Wake up the greenlet that is calling wait() currently (if there is one). """Wake up the greenlet that is calling wait() currently (if there is one).
Can only be called from get_hub().greenlet. Can only be called from get_hub().greenlet.
""" """
assert api.getcurrent() is api.get_hub().greenlet assert api.getcurrent() is hubs.get_hub().greenlet
if self.greenlet is not None: if self.greenlet is not None:
self.greenlet.switch(value) self.greenlet.switch(value)
@@ -482,7 +482,7 @@ class Waiter(object):
"""Make greenlet calling wait() wake up (if there is a wait()). """Make greenlet calling wait() wake up (if there is a wait()).
Can only be called from get_hub().greenlet. Can only be called from get_hub().greenlet.
""" """
assert api.getcurrent() is api.get_hub().greenlet assert api.getcurrent() is hubs.get_hub().greenlet
if self.greenlet is not None: if self.greenlet is not None:
self.greenlet.throw(*throw_args) self.greenlet.throw(*throw_args)
@@ -492,10 +492,10 @@ class Waiter(object):
""" """
assert self.greenlet is None assert self.greenlet is None
current = api.getcurrent() current = api.getcurrent()
assert current is not api.get_hub().greenlet assert current is not hubs.get_hub().greenlet
self.greenlet = current self.greenlet = current
try: try:
return api.get_hub().switch() return hubs.get_hub().switch()
finally: finally:
self.greenlet = None self.greenlet = None
@@ -587,8 +587,8 @@ class Proc(Source):
if not self.dead: if not self.dead:
if not throw_args: if not throw_args:
throw_args = (ProcExit, ) throw_args = (ProcExit, )
api.get_hub().schedule_call_global(0, self.greenlet.throw, *throw_args) hubs.get_hub().schedule_call_global(0, self.greenlet.throw, *throw_args)
if api.getcurrent() is not api.get_hub().greenlet: if api.getcurrent() is not hubs.get_hub().greenlet:
api.sleep(0) api.sleep(0)
# QQQ maybe Proc should not inherit from Source (because its send() and send_exception() # QQQ maybe Proc should not inherit from Source (because its send() and send_exception()

View File

@@ -1,4 +1,5 @@
from eventlet.api import get_hub, getcurrent from eventlet.api import getcurrent
from eventlet.hubs import get_hub
""" If true, captures a stack trace for each timer when constructed. This is """ If true, captures a stack trace for each timer when constructed. This is
useful for debugging leaking timers, to find out where the timer was set up. """ useful for debugging leaking timers, to find out where the timer was set up. """

View File

@@ -1,6 +1,7 @@
from twisted.internet import defer from twisted.internet import defer
from twisted.python import failure from twisted.python import failure
from eventlet.api import get_hub, spawn, getcurrent from eventlet.api import spawn, getcurrent
from eventlet.hubs import get_hub
def block_on(deferred): def block_on(deferred):
cur = [getcurrent()] cur = [getcurrent()]

View File

@@ -4,7 +4,7 @@ import os
import errno import errno
import unittest import unittest
# convenience # convenience for importers
main = unittest.main main = unittest.main
def skipped(func): def skipped(func):
@@ -63,7 +63,7 @@ def skip_unless(condition):
def requires_twisted(func): def requires_twisted(func):
""" Decorator that skips a test if Twisted is not present.""" """ Decorator that skips a test if Twisted is not present."""
def requirement(_f): def requirement(_f):
from eventlet.api import get_hub from eventlet.hubs import get_hub
try: try:
return 'Twisted' in type(get_hub()).__name__ return 'Twisted' in type(get_hub()).__name__
except Exception: except Exception:
@@ -74,12 +74,13 @@ def requires_twisted(func):
def skip_with_pyevent(func): def skip_with_pyevent(func):
""" Decorator that skips a test if we're using the pyevent hub.""" """ Decorator that skips a test if we're using the pyevent hub."""
def using_pyevent(_f): def using_pyevent(_f):
from eventlet.api import get_hub from eventlet.hubs import get_hub
return 'pyevent' in type(get_hub()).__module__ return 'pyevent' in type(get_hub()).__module__
return skip_if(using_pyevent)(func) return skip_if(using_pyevent)(func)
def skip_on_windows(func): def skip_on_windows(func):
""" Decorator that skips a test on Windows."""
import sys import sys
return skip_if(sys.platform.startswith('win'))(func) return skip_if(sys.platform.startswith('win'))(func)
@@ -109,14 +110,14 @@ class SilencedTestCase(LimitedTestCase):
""" Subclass of LimitedTestCase that also silences the printing of timer """ Subclass of LimitedTestCase that also silences the printing of timer
exceptions.""" exceptions."""
def setUp(self): def setUp(self):
from eventlet import api from eventlet import hubs
super(SilencedTestCase, self).setUp() super(SilencedTestCase, self).setUp()
api.get_hub().silent_timer_exceptions = True hubs.get_hub().silent_timer_exceptions = True
def tearDown(self): def tearDown(self):
from eventlet import api from eventlet import hubs
super(SilencedTestCase, self).tearDown() super(SilencedTestCase, self).tearDown()
api.get_hub().silent_timer_exceptions = False hubs.get_hub().silent_timer_exceptions = False
def find_command(command): def find_command(command):

View File

@@ -6,21 +6,21 @@ from unittest import TestCase, main
from eventlet import api from eventlet import api
from eventlet import greenio from eventlet import greenio
from eventlet import util from eventlet import util
from eventlet import hubs
def check_hub(): def check_hub():
# Clear through the descriptor queue # Clear through the descriptor queue
api.sleep(0) api.sleep(0)
api.sleep(0) api.sleep(0)
hub = api.get_hub() hub = hubs.get_hub()
for nm in 'get_readers', 'get_writers': for nm in 'get_readers', 'get_writers':
dct = getattr(hub, nm)() dct = getattr(hub, nm)()
assert not dct, "hub.%s not empty: %s" % (nm, dct) assert not dct, "hub.%s not empty: %s" % (nm, dct)
# Stop the runloop (unless it's twistedhub which does not support that) # Stop the runloop (unless it's twistedhub which does not support that)
if not getattr(api.get_hub(), 'uses_twisted_reactor', None): if not getattr(hub, 'uses_twisted_reactor', None):
api.get_hub().abort() hub.abort()
api.sleep(0) api.sleep(0)
### ??? assert not api.get_hub().running ### ??? assert not hubs.get_hub().running
class TestApi(TestCase): class TestApi(TestCase):
@@ -145,16 +145,6 @@ class TestApi(TestCase):
check_hub() check_hub()
if not getattr(api.get_hub(), 'uses_twisted_reactor', None):
def test_explicit_hub(self):
oldhub = api.get_hub()
try:
api.use_hub(Foo)
assert isinstance(api.get_hub(), Foo), api.get_hub()
finally:
api._threadlocal.hub = oldhub
check_hub()
def test_named(self): def test_named(self):
named_foo = api.named('tests.api_test.Foo') named_foo = api.named('tests.api_test.Foo')
self.assertEquals( self.assertEquals(
@@ -233,8 +223,6 @@ class TestApi(TestCase):
self.assertRaises(api.TimeoutError, api.with_timeout, 0.1, func) self.assertRaises(api.TimeoutError, api.with_timeout, 0.1, func)
class Foo(object): class Foo(object):
pass pass

View File

@@ -1,7 +1,7 @@
import logging import logging
from nose.plugins.base import Plugin from nose.plugins.base import Plugin
from eventlet import api from eventlet import hubs
log = logging.getLogger('nose.plugins.eventlethub') log = logging.getLogger('nose.plugins.eventlethub')
@@ -56,13 +56,13 @@ class EventletHub(Plugin):
if self.hub_name is None: if self.hub_name is None:
log.warn('Using default eventlet hub: %s, did you mean '\ log.warn('Using default eventlet hub: %s, did you mean '\
'to supply --hub command line argument?', 'to supply --hub command line argument?',
api.get_hub().__module__) hubs.get_hub().__module__)
else: else:
if self.hub_name == 'twistedr': if self.hub_name == 'twistedr':
if self.twisted_already_used: if self.twisted_already_used:
return return
else: else:
self.twisted_already_used = True self.twisted_already_used = True
api.use_hub(self.hub_name) hubs.use_hub(self.hub_name)
log.info('using hub %s', api.get_hub()) log.info('using hub %s', hubs.get_hub())

View File

@@ -3,8 +3,8 @@ from eventlet.green import thread
from eventlet.green import time from eventlet.green import time
# necessary to initialize the hub before running on 2.5 # necessary to initialize the hub before running on 2.5
from eventlet import api from eventlet import hubs
api.get_hub() hubs.get_hub()
patcher.inject('test.test_thread', patcher.inject('test.test_thread',
globals(), globals(),

View File

@@ -4,8 +4,8 @@ from eventlet.green import threading
from eventlet.green import time from eventlet.green import time
# hub requires initialization before test can run # hub requires initialization before test can run
from eventlet import api from eventlet import hubs
api.get_hub() hubs.get_hub()
patcher.inject('test.test_threading_local', patcher.inject('test.test_threading_local',
globals(), globals(),

View File

@@ -1,37 +1,45 @@
import unittest from tests import LimitedTestCase, SilencedTestCase, main
from tests import SilencedTestCase
import time import time
from eventlet import api from eventlet import api
from eventlet import hubs
from eventlet.green import socket from eventlet.green import socket
DELAY = 0.1 DELAY = 0.001
class TestScheduleCall(unittest.TestCase):
class TestScheduleCall(LimitedTestCase):
def test_local(self): def test_local(self):
lst = [1] lst = [1]
api.spawn(api.get_hub().schedule_call_local, DELAY, lst.pop) api.spawn(hubs.get_hub().schedule_call_local, DELAY, lst.pop)
api.sleep(0)
api.sleep(DELAY*2) api.sleep(DELAY*2)
assert lst == [1], lst assert lst == [1], lst
def test_global(self): def test_global(self):
lst = [1] lst = [1]
api.spawn(api.get_hub().schedule_call_global, DELAY, lst.pop) api.spawn(hubs.get_hub().schedule_call_global, DELAY, lst.pop)
api.sleep(0)
api.sleep(DELAY*2) api.sleep(DELAY*2)
assert lst == [], lst assert lst == [], lst
def test_ordering(self):
lst = []
hubs.get_hub().schedule_call_global(DELAY*2, lst.append, 3)
hubs.get_hub().schedule_call_global(DELAY, lst.append, 1)
hubs.get_hub().schedule_call_global(DELAY, lst.append, 2)
while len(lst) < 3:
api.sleep(DELAY)
self.assertEquals(lst, [1,2,3])
class TestDebug(unittest.TestCase): class TestDebug(LimitedTestCase):
def test_debug(self): def test_debug(self):
api.get_hub().debug = True hubs.get_hub().debug = True
self.assert_(api.get_hub().debug) self.assert_(hubs.get_hub().debug)
api.get_hub().debug = False hubs.get_hub().debug = False
self.assert_(not api.get_hub().debug) self.assert_(not hubs.get_hub().debug)
class TestExceptionInMainloop(SilencedTestCase): class TestExceptionInMainloop(SilencedTestCase):
def test_sleep(self): def test_sleep(self):
# even if there was an error in the mainloop, the hub should continue to work # even if there was an error in the mainloop, the hub should continue to work
start = time.time() start = time.time()
@@ -43,7 +51,7 @@ class TestExceptionInMainloop(SilencedTestCase):
def fail(): def fail():
1/0 1/0
api.get_hub().schedule_call_global(0, fail) hubs.get_hub().schedule_call_global(0, fail)
start = time.time() start = time.time()
api.sleep(DELAY) api.sleep(DELAY)
@@ -52,6 +60,23 @@ class TestExceptionInMainloop(SilencedTestCase):
assert delay >= DELAY*0.9, 'sleep returned after %s seconds (was scheduled for %s)' % (delay, DELAY) assert delay >= DELAY*0.9, 'sleep returned after %s seconds (was scheduled for %s)' % (delay, DELAY)
if __name__=='__main__': class TestHubSelection(LimitedTestCase):
unittest.main() def test_explicit_hub(self):
if getattr(hubs.get_hub(), 'uses_twisted_reactor', None):
# doesn't work with twisted
return
oldhub = hubs.get_hub()
try:
hubs.use_hub(Foo)
self.assert_(isinstance(hubs.get_hub(), Foo), hubs.get_hub())
finally:
hubs._threadlocal.hub = oldhub
class Foo(object):
pass
if __name__=='__main__':
main()

View File

@@ -1,4 +1,4 @@
from eventlet import pool, coros, api from eventlet import pool, coros, api, hubs
from tests import LimitedTestCase from tests import LimitedTestCase
from unittest import main from unittest import main
@@ -70,7 +70,7 @@ class TestCoroutinePool(LimitedTestCase):
def fire_timer(): def fire_timer():
timer_fired.append(True) timer_fired.append(True)
def some_work(): def some_work():
api.get_hub().schedule_call_local(0, fire_timer) hubs.get_hub().schedule_call_local(0, fire_timer)
pool = self.klass(0, 2) pool = self.klass(0, 2)
worker = pool.execute(some_work) worker = pool.execute(some_work)
worker.wait() worker.wait()

View File

@@ -1,13 +1,8 @@
import unittest import unittest
from eventlet import api from eventlet import api
from eventlet.green import socket
if hasattr(api._threadlocal, 'hub'): class TestSocketErrors(unittest.TestCase):
from eventlet.green import socket
else:
import socket
class TestSocketErrors(unittest.TestCase):
def test_connection_refused(self): def test_connection_refused(self):
# open and close a dummy server to find an unused port # open and close a dummy server to find an unused port
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

View File

@@ -1,10 +1,8 @@
from unittest import TestCase, main from unittest import TestCase, main
from eventlet import api, timer from eventlet import api, timer, hubs
class TestTimer(TestCase): class TestTimer(TestCase):
mode = 'static'
def test_copy(self): def test_copy(self):
t = timer.Timer(0, lambda: None) t = timer.Timer(0, lambda: None)
t2 = t.copy() t2 = t.copy()
@@ -24,7 +22,7 @@ class TestTimer(TestCase):
## assert not r.running ## assert not r.running
def test_schedule(self): def test_schedule(self):
hub = api.get_hub() hub = hubs.get_hub()
# clean up the runloop, preventing side effects from previous tests # clean up the runloop, preventing side effects from previous tests
# on this thread # on this thread
if hub.running: if hub.running:
@@ -36,8 +34,8 @@ class TestTimer(TestCase):
# let's have a timer somewhere in the future; make sure abort() still works # let's have a timer somewhere in the future; make sure abort() still works
# (for pyevent, its dispatcher() does not exit if there is something scheduled) # (for pyevent, its dispatcher() does not exit if there is something scheduled)
# XXX pyevent handles this, other hubs do not # XXX pyevent handles this, other hubs do not
#api.get_hub().schedule_call_global(10000, lambda: (called.append(True), hub.abort())) #hubs.get_hub().schedule_call_global(10000, lambda: (called.append(True), hub.abort()))
api.get_hub().schedule_call_global(0, lambda: (called.append(True), hub.abort())) hubs.get_hub().schedule_call_global(0, lambda: (called.append(True), hub.abort()))
hub.default_sleep = lambda: 0.0 hub.default_sleep = lambda: 0.0
hub.switch() hub.switch()
assert called assert called