Moved a bunch of stuff around. Event, GreenThread, spawn, spawn_n went to the greenthread module. Some tweaks to the parallel module, including adding a spawn_n method that is faster than spawn.
This commit is contained in:
@@ -1,2 +1,16 @@
|
||||
version_info = (0, 9, '3pre')
|
||||
__version__ = '%s.%s.%s' % version_info
|
||||
|
||||
from eventlet import greenthread
|
||||
from eventlet import parallel
|
||||
|
||||
__all__ = ['sleep', 'spawn', 'spawn_n', 'Event', 'GreenPool', 'GreenPile']
|
||||
|
||||
sleep = greenthread.sleep
|
||||
|
||||
spawn = greenthread.spawn
|
||||
spawn_n = greenthread.spawn_n
|
||||
Event = greenthread.Event
|
||||
|
||||
GreenPool = parallel.GreenPool
|
||||
GreenPile = parallel.GreenPile
|
||||
@@ -119,6 +119,10 @@ def trampoline(fd, read=None, write=None, timeout=None, timeout_exc=TimeoutError
|
||||
t.cancel()
|
||||
|
||||
|
||||
from eventlet import greenthread
|
||||
spawn = greenthread.spawn
|
||||
spawn_n = greenthread.spawn_n
|
||||
|
||||
def _spawn_startup(cb, args, kw, cancel=None):
|
||||
try:
|
||||
greenlet.getcurrent().parent.switch()
|
||||
@@ -128,69 +132,6 @@ def _spawn_startup(cb, args, kw, cancel=None):
|
||||
cancel()
|
||||
return cb(*args, **kw)
|
||||
|
||||
|
||||
class GreenThread(Greenlet):
|
||||
def __init__(self, parent):
|
||||
Greenlet.__init__(self, self.main, parent)
|
||||
from eventlet import coros
|
||||
self._exit_event = coros.Event()
|
||||
|
||||
def wait(self):
|
||||
return self._exit_event.wait()
|
||||
|
||||
def link(self, func, *curried_args, **curried_kwargs):
|
||||
""" Set up a function to be called with the results of the GreenThread.
|
||||
|
||||
The function must have the following signature:
|
||||
def f(result=None, exc=None, [curried args/kwargs]):
|
||||
"""
|
||||
self._exit_funcs = getattr(self, '_exit_funcs', [])
|
||||
self._exit_funcs.append((func, curried_args, curried_kwargs))
|
||||
|
||||
def main(self, function, args, kwargs):
|
||||
try:
|
||||
result = function(*args, **kwargs)
|
||||
except:
|
||||
self._exit_event.send_exception(*sys.exc_info())
|
||||
# ca and ckw are the curried function arguments
|
||||
for f, ca, ckw in getattr(self, '_exit_funcs', []):
|
||||
f(exc=sys.exc_info(), *ca, **ckw)
|
||||
raise
|
||||
else:
|
||||
self._exit_event.send(result)
|
||||
for f, ca, ckw in getattr(self, '_exit_funcs', []):
|
||||
f(result, *ca, **ckw)
|
||||
|
||||
|
||||
def spawn(func, *args, **kwargs):
|
||||
"""Create a green thread to run func(*args, **kwargs). Returns a GreenThread
|
||||
object which you can use to get the results of the call.
|
||||
"""
|
||||
hub = get_hub_()
|
||||
g = GreenThread(hub.greenlet)
|
||||
hub.schedule_call_global(0, g.switch, func, args, kwargs)
|
||||
return g
|
||||
|
||||
|
||||
def _main_wrapper(func, args, kwargs):
|
||||
# function that gets around the fact that greenlet.switch
|
||||
# doesn't accept keyword arguments
|
||||
return func(*args, **kwargs)
|
||||
|
||||
def spawn_n(func, *args, **kwargs):
|
||||
"""Same as spawn, but returns a greenlet object from which it is not possible
|
||||
to retrieve the results. This is slightly faster than spawn; it is fastest
|
||||
if there are no keyword arguments."""
|
||||
hub = get_hub_()
|
||||
if kwargs:
|
||||
g = Greenlet(_main_wrapper, parent=hub.greenlet)
|
||||
hub.schedule_call_global(0, g.switch, func, args, kwargs)
|
||||
else:
|
||||
g = Greenlet(func, parent=hub.greenlet)
|
||||
hub.schedule_call_global(0, g.switch, *args)
|
||||
return g
|
||||
|
||||
|
||||
def kill(g, *throw_args):
|
||||
get_hub_().schedule_call_global(0, g.throw, *throw_args)
|
||||
if getcurrent() is not get_hub_().greenlet:
|
||||
@@ -362,26 +303,9 @@ def exc_after(seconds, *throw_args):
|
||||
timer.cancel()
|
||||
"""
|
||||
return call_after(seconds, getcurrent().throw, *throw_args)
|
||||
|
||||
def sleep(seconds=0):
|
||||
"""Yield control to another eligible coroutine until at least *seconds* have
|
||||
elapsed.
|
||||
|
||||
*seconds* may be specified as an integer, or a float if fractional seconds
|
||||
are desired. Calling :func:`~eventlet.api.sleep` with *seconds* of 0 is the
|
||||
canonical way of expressing a cooperative yield. For example, if one is
|
||||
looping over a large list performing an expensive calculation without
|
||||
calling any socket methods, it's a good idea to call ``sleep(0)``
|
||||
occasionally; otherwise nothing else will run.
|
||||
"""
|
||||
hub = get_hub_()
|
||||
assert hub.greenlet is not greenlet.getcurrent(), 'do not call blocking functions from the mainloop'
|
||||
timer = hub.schedule_call_global(seconds, greenlet.getcurrent().switch)
|
||||
try:
|
||||
hub.switch()
|
||||
finally:
|
||||
timer.cancel()
|
||||
|
||||
|
||||
|
||||
sleep = greenthread.sleep
|
||||
|
||||
getcurrent = greenlet.getcurrent
|
||||
GreenletExit = greenlet.GreenletExit
|
||||
|
||||
@@ -5,11 +5,7 @@ import warnings
|
||||
|
||||
from eventlet import api
|
||||
from eventlet import hubs
|
||||
|
||||
|
||||
class Cancelled(RuntimeError):
|
||||
pass
|
||||
|
||||
from eventlet import greenthread
|
||||
|
||||
class NOT_USED:
|
||||
def __repr__(self):
|
||||
@@ -17,179 +13,19 @@ class NOT_USED:
|
||||
|
||||
NOT_USED = NOT_USED()
|
||||
|
||||
class Event(object):
|
||||
"""An abstraction where an arbitrary number of coroutines
|
||||
can wait for one event from another.
|
||||
def Event(*a, **kw):
|
||||
warnings.warn("The Event class has been moved to the greenthread module! "
|
||||
"Please construct greenthread.Event objects instead.",
|
||||
DeprecationWarning, stacklevel=2)
|
||||
return greenthread.Event(*a, **kw)
|
||||
|
||||
Events differ from channels in two ways:
|
||||
|
||||
1. calling :meth:`send` does not unschedule the current coroutine
|
||||
2. :meth:`send` can only be called once; use :meth:`reset` to prepare the
|
||||
event for another :meth:`send`
|
||||
|
||||
They are ideal for communicating return values between coroutines.
|
||||
|
||||
>>> from eventlet import coros, api
|
||||
>>> evt = coros.Event()
|
||||
>>> def baz(b):
|
||||
... evt.send(b + 1)
|
||||
...
|
||||
>>> _ = api.spawn(baz, 3)
|
||||
>>> evt.wait()
|
||||
4
|
||||
"""
|
||||
_result = None
|
||||
def __init__(self):
|
||||
self._waiters = set()
|
||||
self.reset()
|
||||
|
||||
def __str__(self):
|
||||
params = (self.__class__.__name__, hex(id(self)), self._result, self._exc, len(self._waiters))
|
||||
return '<%s at %s result=%r _exc=%r _waiters[%d]>' % params
|
||||
|
||||
def reset(self):
|
||||
""" Reset this event so it can be used to send again.
|
||||
Can only be called after :meth:`send` has been called.
|
||||
|
||||
>>> from eventlet import coros
|
||||
>>> evt = coros.Event()
|
||||
>>> evt.send(1)
|
||||
>>> evt.reset()
|
||||
>>> evt.send(2)
|
||||
>>> evt.wait()
|
||||
2
|
||||
|
||||
Calling reset multiple times in a row is an error.
|
||||
|
||||
>>> evt.reset()
|
||||
>>> evt.reset()
|
||||
Traceback (most recent call last):
|
||||
...
|
||||
AssertionError: Trying to re-reset() a fresh event.
|
||||
|
||||
"""
|
||||
assert self._result is not NOT_USED, 'Trying to re-reset() a fresh event.'
|
||||
self.epoch = time.time()
|
||||
self._result = NOT_USED
|
||||
self._exc = None
|
||||
|
||||
def ready(self):
|
||||
""" Return true if the :meth:`wait` call will return immediately.
|
||||
Used to avoid waiting for things that might take a while to time out.
|
||||
For example, you can put a bunch of events into a list, and then visit
|
||||
them all repeatedly, calling :meth:`ready` until one returns ``True``,
|
||||
and then you can :meth:`wait` on that one."""
|
||||
return self._result is not NOT_USED
|
||||
|
||||
def has_exception(self):
|
||||
return self._exc is not None
|
||||
|
||||
def has_result(self):
|
||||
return self._result is not NOT_USED and self._exc is None
|
||||
|
||||
def poll(self, notready=None):
|
||||
if self.ready():
|
||||
return self.wait()
|
||||
return notready
|
||||
|
||||
# QQQ make it return tuple (type, value, tb) instead of raising
|
||||
# because
|
||||
# 1) "poll" does not imply raising
|
||||
# 2) it's better not to screw up caller's sys.exc_info() by default
|
||||
# (e.g. if caller wants to calls the function in except or finally)
|
||||
def poll_exception(self, notready=None):
|
||||
if self.has_exception():
|
||||
return self.wait()
|
||||
return notready
|
||||
|
||||
def poll_result(self, notready=None):
|
||||
if self.has_result():
|
||||
return self.wait()
|
||||
return notready
|
||||
|
||||
def wait(self):
|
||||
"""Wait until another coroutine calls :meth:`send`.
|
||||
Returns the value the other coroutine passed to
|
||||
:meth:`send`.
|
||||
|
||||
>>> from eventlet import coros, api
|
||||
>>> evt = coros.Event()
|
||||
>>> def wait_on():
|
||||
... retval = evt.wait()
|
||||
... print "waited for", retval
|
||||
>>> _ = api.spawn(wait_on)
|
||||
>>> evt.send('result')
|
||||
>>> api.sleep(0)
|
||||
waited for result
|
||||
|
||||
Returns immediately if the event has already
|
||||
occured.
|
||||
|
||||
>>> evt.wait()
|
||||
'result'
|
||||
"""
|
||||
if self._result is NOT_USED:
|
||||
self._waiters.add(api.getcurrent())
|
||||
try:
|
||||
return hubs.get_hub().switch()
|
||||
finally:
|
||||
self._waiters.discard(api.getcurrent())
|
||||
if self._exc is not None:
|
||||
api.getcurrent().throw(*self._exc)
|
||||
return self._result
|
||||
|
||||
def send(self, result=None, exc=None):
|
||||
"""Makes arrangements for the waiters to be woken with the
|
||||
result and then returns immediately to the parent.
|
||||
|
||||
>>> from eventlet import coros, api
|
||||
>>> evt = coros.Event()
|
||||
>>> def waiter():
|
||||
... print 'about to wait'
|
||||
... result = evt.wait()
|
||||
... print 'waited for', result
|
||||
>>> _ = api.spawn(waiter)
|
||||
>>> api.sleep(0)
|
||||
about to wait
|
||||
>>> evt.send('a')
|
||||
>>> api.sleep(0)
|
||||
waited for a
|
||||
|
||||
It is an error to call :meth:`send` multiple times on the same event.
|
||||
|
||||
>>> evt.send('whoops')
|
||||
Traceback (most recent call last):
|
||||
...
|
||||
AssertionError: Trying to re-send() an already-triggered event.
|
||||
|
||||
Use :meth:`reset` between :meth:`send` s to reuse an event object.
|
||||
"""
|
||||
assert self._result is NOT_USED, 'Trying to re-send() an already-triggered event.'
|
||||
self._result = result
|
||||
if exc is not None and not isinstance(exc, tuple):
|
||||
exc = (exc, )
|
||||
self._exc = exc
|
||||
hub = hubs.get_hub()
|
||||
if self._waiters:
|
||||
hub.schedule_call_global(0, self._do_send, self._result, self._exc, self._waiters.copy())
|
||||
|
||||
def _do_send(self, result, exc, waiters):
|
||||
while waiters:
|
||||
waiter = waiters.pop()
|
||||
if waiter in self._waiters:
|
||||
if exc is None:
|
||||
waiter.switch(result)
|
||||
else:
|
||||
waiter.throw(*exc)
|
||||
|
||||
def send_exception(self, *args):
|
||||
# the arguments and the same as for greenlet.throw
|
||||
return self.send(None, args)
|
||||
|
||||
def event(*a, **kw):
|
||||
warnings.warn("The event class has been capitalized! Please construct"
|
||||
" Event objects instead.", DeprecationWarning, stacklevel=2)
|
||||
return Event(*a, **kw)
|
||||
warnings.warn("The event class has been capitalized and moved! Please "
|
||||
"construct greenthread.Event objects instead.",
|
||||
DeprecationWarning, stacklevel=2)
|
||||
return greenthread.Event(*a, **kw)
|
||||
|
||||
|
||||
class Semaphore(object):
|
||||
"""An unbounded semaphore.
|
||||
@@ -348,7 +184,7 @@ class metaphore(object):
|
||||
"""
|
||||
def __init__(self):
|
||||
self.counter = 0
|
||||
self.event = Event()
|
||||
self.event = greenthread.Event()
|
||||
# send() right away, else we'd wait on the default 0 count!
|
||||
self.event.send()
|
||||
|
||||
@@ -397,14 +233,14 @@ def execute(func, *args, **kw):
|
||||
>>> evt.wait()
|
||||
('foo', 1)
|
||||
"""
|
||||
evt = Event()
|
||||
def _really_execute():
|
||||
evt.send(func(*args, **kw))
|
||||
api.spawn(_really_execute)
|
||||
return evt
|
||||
warnings.warn("Coros.execute is deprecated. Please use eventlet.spawn "
|
||||
"instead.", DeprecationWarning, stacklevel=2)
|
||||
return greenthread.spawn(func, *args, **kw)
|
||||
|
||||
|
||||
def CoroutinePool(*args, **kwargs):
|
||||
warnings.warn("CoroutinePool is deprecated. Please use "
|
||||
"eventlet.GreenPool instead.", DeprecationWarning, stacklevel=2)
|
||||
from eventlet.pool import Pool
|
||||
return Pool(*args, **kwargs)
|
||||
|
||||
@@ -595,7 +431,7 @@ class Actor(object):
|
||||
serially.
|
||||
"""
|
||||
self._mailbox = collections.deque()
|
||||
self._event = Event()
|
||||
self._event = greenthread.Event()
|
||||
self._killer = api.spawn(self.run_forever)
|
||||
self._pool = CoroutinePool(min_size=0, max_size=concurrency)
|
||||
|
||||
@@ -604,7 +440,7 @@ class Actor(object):
|
||||
while True:
|
||||
if not self._mailbox:
|
||||
self._event.wait()
|
||||
self._event = Event()
|
||||
self._event = greenthread.Event()
|
||||
else:
|
||||
# leave the message in the mailbox until after it's
|
||||
# been processed so the event doesn't get triggered
|
||||
@@ -645,7 +481,7 @@ class Actor(object):
|
||||
coroutine in a predictable manner, but this kinda defeats the point of
|
||||
the :class:`Actor`, so don't do it in a real application.
|
||||
|
||||
>>> evt = Event()
|
||||
>>> evt = greenthread.Event()
|
||||
>>> a.cast( ("message 1", evt) )
|
||||
>>> evt.wait() # force it to run at this exact moment
|
||||
received message 1
|
||||
|
||||
267
eventlet/greenthread.py
Normal file
267
eventlet/greenthread.py
Normal file
@@ -0,0 +1,267 @@
|
||||
import sys
|
||||
|
||||
from eventlet import hubs
|
||||
from eventlet.support import greenlets as greenlet
|
||||
|
||||
__all__ = ['getcurrent', 'sleep', 'spawn', 'spawn_n', 'GreenThread', 'Event']
|
||||
|
||||
getcurrent = greenlet.getcurrent
|
||||
|
||||
def sleep(seconds=0):
|
||||
"""Yield control to another eligible coroutine until at least *seconds* have
|
||||
elapsed.
|
||||
|
||||
*seconds* may be specified as an integer, or a float if fractional seconds
|
||||
are desired. Calling :func:`~eventlet.api.sleep` with *seconds* of 0 is the
|
||||
canonical way of expressing a cooperative yield. For example, if one is
|
||||
looping over a large list performing an expensive calculation without
|
||||
calling any socket methods, it's a good idea to call ``sleep(0)``
|
||||
occasionally; otherwise nothing else will run.
|
||||
"""
|
||||
hub = hubs.get_hub()
|
||||
assert hub.greenlet is not greenlet.getcurrent(), 'do not call blocking functions from the mainloop'
|
||||
timer = hub.schedule_call_global(seconds, greenlet.getcurrent().switch)
|
||||
try:
|
||||
hub.switch()
|
||||
finally:
|
||||
timer.cancel()
|
||||
|
||||
|
||||
def spawn(func, *args, **kwargs):
|
||||
"""Create a green thread to run func(*args, **kwargs). Returns a GreenThread
|
||||
object which you can use to get the results of the call.
|
||||
"""
|
||||
hub = hubs.get_hub()
|
||||
g = GreenThread(hub.greenlet)
|
||||
hub.schedule_call_global(0, g.switch, func, args, kwargs)
|
||||
return g
|
||||
|
||||
|
||||
def _main_wrapper(func, args, kwargs):
|
||||
# function that gets around the fact that greenlet.switch
|
||||
# doesn't accept keyword arguments
|
||||
return func(*args, **kwargs)
|
||||
|
||||
|
||||
def spawn_n(func, *args, **kwargs):
|
||||
"""Same as spawn, but returns a greenlet object from which it is not
|
||||
possible to retrieve the results. This is slightly faster than spawn; it is
|
||||
fastest if there are no keyword arguments."""
|
||||
hub = hubs.get_hub()
|
||||
if kwargs:
|
||||
g = greenlet.greenlet(_main_wrapper, parent=hub.greenlet)
|
||||
hub.schedule_call_global(0, g.switch, func, args, kwargs)
|
||||
else:
|
||||
g = greenlet.greenlet(func, parent=hub.greenlet)
|
||||
hub.schedule_call_global(0, g.switch, *args)
|
||||
return g
|
||||
|
||||
|
||||
class GreenThread(greenlet.greenlet):
|
||||
def __init__(self, parent):
|
||||
greenlet.greenlet.__init__(self, self.main, parent)
|
||||
self._exit_event = Event()
|
||||
|
||||
def wait(self):
|
||||
return self._exit_event.wait()
|
||||
|
||||
def link(self, func, *curried_args, **curried_kwargs):
|
||||
""" Set up a function to be called with the results of the GreenThread.
|
||||
|
||||
The function must have the following signature:
|
||||
def f(result=None, exc=None, [curried args/kwargs]):
|
||||
"""
|
||||
self._exit_funcs = getattr(self, '_exit_funcs', [])
|
||||
self._exit_funcs.append((func, curried_args, curried_kwargs))
|
||||
|
||||
def main(self, function, args, kwargs):
|
||||
try:
|
||||
result = function(*args, **kwargs)
|
||||
except:
|
||||
self._exit_event.send_exception(*sys.exc_info())
|
||||
# ca and ckw are the curried function arguments
|
||||
for f, ca, ckw in getattr(self, '_exit_funcs', []):
|
||||
f(exc=sys.exc_info(), *ca, **ckw)
|
||||
raise
|
||||
else:
|
||||
self._exit_event.send(result)
|
||||
for f, ca, ckw in getattr(self, '_exit_funcs', []):
|
||||
f(result, *ca, **ckw)
|
||||
|
||||
|
||||
|
||||
class NOT_USED:
|
||||
def __repr__(self):
|
||||
return 'NOT_USED'
|
||||
|
||||
NOT_USED = NOT_USED()
|
||||
|
||||
class Event(object):
|
||||
"""An abstraction where an arbitrary number of coroutines
|
||||
can wait for one event from another.
|
||||
|
||||
Events differ from channels in two ways:
|
||||
|
||||
1. calling :meth:`send` does not unschedule the current coroutine
|
||||
2. :meth:`send` can only be called once; use :meth:`reset` to prepare the
|
||||
event for another :meth:`send`
|
||||
|
||||
They are ideal for communicating return values between coroutines.
|
||||
|
||||
>>> from eventlet import coros, api
|
||||
>>> evt = coros.Event()
|
||||
>>> def baz(b):
|
||||
... evt.send(b + 1)
|
||||
...
|
||||
>>> _ = api.spawn(baz, 3)
|
||||
>>> evt.wait()
|
||||
4
|
||||
"""
|
||||
_result = None
|
||||
def __init__(self):
|
||||
self._waiters = set()
|
||||
self.reset()
|
||||
|
||||
def __str__(self):
|
||||
params = (self.__class__.__name__, hex(id(self)), self._result, self._exc, len(self._waiters))
|
||||
return '<%s at %s result=%r _exc=%r _waiters[%d]>' % params
|
||||
|
||||
def reset(self):
|
||||
""" Reset this event so it can be used to send again.
|
||||
Can only be called after :meth:`send` has been called.
|
||||
|
||||
>>> from eventlet import coros
|
||||
>>> evt = coros.Event()
|
||||
>>> evt.send(1)
|
||||
>>> evt.reset()
|
||||
>>> evt.send(2)
|
||||
>>> evt.wait()
|
||||
2
|
||||
|
||||
Calling reset multiple times in a row is an error.
|
||||
|
||||
>>> evt.reset()
|
||||
>>> evt.reset()
|
||||
Traceback (most recent call last):
|
||||
...
|
||||
AssertionError: Trying to re-reset() a fresh event.
|
||||
|
||||
"""
|
||||
assert self._result is not NOT_USED, 'Trying to re-reset() a fresh event.'
|
||||
self._result = NOT_USED
|
||||
self._exc = None
|
||||
|
||||
def ready(self):
|
||||
""" Return true if the :meth:`wait` call will return immediately.
|
||||
Used to avoid waiting for things that might take a while to time out.
|
||||
For example, you can put a bunch of events into a list, and then visit
|
||||
them all repeatedly, calling :meth:`ready` until one returns ``True``,
|
||||
and then you can :meth:`wait` on that one."""
|
||||
return self._result is not NOT_USED
|
||||
|
||||
def has_exception(self):
|
||||
return self._exc is not None
|
||||
|
||||
def has_result(self):
|
||||
return self._result is not NOT_USED and self._exc is None
|
||||
|
||||
def poll(self, notready=None):
|
||||
if self.ready():
|
||||
return self.wait()
|
||||
return notready
|
||||
|
||||
# QQQ make it return tuple (type, value, tb) instead of raising
|
||||
# because
|
||||
# 1) "poll" does not imply raising
|
||||
# 2) it's better not to screw up caller's sys.exc_info() by default
|
||||
# (e.g. if caller wants to calls the function in except or finally)
|
||||
def poll_exception(self, notready=None):
|
||||
if self.has_exception():
|
||||
return self.wait()
|
||||
return notready
|
||||
|
||||
def poll_result(self, notready=None):
|
||||
if self.has_result():
|
||||
return self.wait()
|
||||
return notready
|
||||
|
||||
def wait(self):
|
||||
"""Wait until another coroutine calls :meth:`send`.
|
||||
Returns the value the other coroutine passed to
|
||||
:meth:`send`.
|
||||
|
||||
>>> from eventlet import coros, api
|
||||
>>> evt = coros.Event()
|
||||
>>> def wait_on():
|
||||
... retval = evt.wait()
|
||||
... print "waited for", retval
|
||||
>>> _ = api.spawn(wait_on)
|
||||
>>> evt.send('result')
|
||||
>>> api.sleep(0)
|
||||
waited for result
|
||||
|
||||
Returns immediately if the event has already
|
||||
occured.
|
||||
|
||||
>>> evt.wait()
|
||||
'result'
|
||||
"""
|
||||
current = getcurrent()
|
||||
if self._result is NOT_USED:
|
||||
self._waiters.add(current)
|
||||
try:
|
||||
return hubs.get_hub().switch()
|
||||
finally:
|
||||
self._waiters.discard(current)
|
||||
if self._exc is not None:
|
||||
current.throw(*self._exc)
|
||||
return self._result
|
||||
|
||||
def send(self, result=None, exc=None):
|
||||
"""Makes arrangements for the waiters to be woken with the
|
||||
result and then returns immediately to the parent.
|
||||
|
||||
>>> from eventlet import coros, api
|
||||
>>> evt = coros.Event()
|
||||
>>> def waiter():
|
||||
... print 'about to wait'
|
||||
... result = evt.wait()
|
||||
... print 'waited for', result
|
||||
>>> _ = api.spawn(waiter)
|
||||
>>> api.sleep(0)
|
||||
about to wait
|
||||
>>> evt.send('a')
|
||||
>>> api.sleep(0)
|
||||
waited for a
|
||||
|
||||
It is an error to call :meth:`send` multiple times on the same event.
|
||||
|
||||
>>> evt.send('whoops')
|
||||
Traceback (most recent call last):
|
||||
...
|
||||
AssertionError: Trying to re-send() an already-triggered event.
|
||||
|
||||
Use :meth:`reset` between :meth:`send` s to reuse an event object.
|
||||
"""
|
||||
assert self._result is NOT_USED, 'Trying to re-send() an already-triggered event.'
|
||||
self._result = result
|
||||
if exc is not None and not isinstance(exc, tuple):
|
||||
exc = (exc, )
|
||||
self._exc = exc
|
||||
hub = hubs.get_hub()
|
||||
if self._waiters:
|
||||
hub.schedule_call_global(0, self._do_send, self._result, self._exc, self._waiters.copy())
|
||||
|
||||
def _do_send(self, result, exc, waiters):
|
||||
while waiters:
|
||||
waiter = waiters.pop()
|
||||
if waiter in self._waiters:
|
||||
if exc is None:
|
||||
waiter.switch(result)
|
||||
else:
|
||||
waiter.throw(*exc)
|
||||
|
||||
def send_exception(self, *args):
|
||||
# the arguments and the same as for greenlet.throw
|
||||
return self.send(None, args)
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
from eventlet import api
|
||||
from eventlet import greenthread
|
||||
from eventlet import coros
|
||||
|
||||
__all__ = ['GreenPool', 'GreenPile']
|
||||
@@ -10,7 +10,7 @@ class GreenPool(object):
|
||||
self.size = size
|
||||
self.coroutines_running = set()
|
||||
self.sem = coros.Semaphore(size)
|
||||
self.no_coros_running = coros.Event()
|
||||
self.no_coros_running = greenthread.Event()
|
||||
|
||||
def resize(self, new_size):
|
||||
""" Change the max number of coroutines doing work at any given time.
|
||||
@@ -36,35 +36,58 @@ class GreenPool(object):
|
||||
|
||||
def spawn(self, func, *args, **kwargs):
|
||||
"""Run func(*args, **kwargs) in its own green thread. Returns the
|
||||
GreenThread object that is running the function.
|
||||
GreenThread object that is running the function, which can be used
|
||||
to retrieve the results.
|
||||
"""
|
||||
return self._spawn(func, *args, **kwargs)
|
||||
|
||||
def spawn_n(self, func, *args, **kwargs):
|
||||
""" Create a coroutine to run func(*args, **kwargs).
|
||||
|
||||
Returns None; the results of the function are not retrievable.
|
||||
The results of the function are not put into the results() iterator.
|
||||
"""
|
||||
self._spawn(func, *args, **kwargs)
|
||||
|
||||
def _spawn(self, func, *args, **kwargs):
|
||||
# if reentering an empty pool, don't try to wait on a coroutine freeing
|
||||
# itself -- instead, just execute in the current coroutine
|
||||
current = api.getcurrent()
|
||||
current = greenthread.getcurrent()
|
||||
if self.sem.locked() and current in self.coroutines_running:
|
||||
# a bit hacky to use the GT without switching to it
|
||||
gt = api.GreenThread(current)
|
||||
gt = greenthread.GreenThread(current)
|
||||
gt.main(func, args, kwargs)
|
||||
return gt
|
||||
else:
|
||||
self.sem.acquire()
|
||||
gt = api.spawn(func, *args, **kwargs)
|
||||
gt = greenthread.spawn(func, *args, **kwargs)
|
||||
if not self.coroutines_running:
|
||||
self.no_coros_running = coros.Event()
|
||||
self.no_coros_running = greenthread.Event()
|
||||
self.coroutines_running.add(gt)
|
||||
gt.link(self._spawn_done, coro=gt)
|
||||
return gt
|
||||
|
||||
def _spawn_n_impl(self, func, args, kwargs, coro=None):
|
||||
try:
|
||||
try:
|
||||
func(*args, **kwargs)
|
||||
except (KeyboardInterrupt, SystemExit):
|
||||
raise
|
||||
except:
|
||||
# TODO in debug mode print these
|
||||
pass
|
||||
finally:
|
||||
if coro is None:
|
||||
return
|
||||
else:
|
||||
coro = greenthread.getcurrent()
|
||||
self._spawn_done(coro=coro)
|
||||
|
||||
def spawn_n(self, func, *args, **kwargs):
|
||||
""" Create a coroutine to run func(*args, **kwargs).
|
||||
|
||||
Returns None; the results of the function are not retrievable.
|
||||
"""
|
||||
# if reentering an empty pool, don't try to wait on a coroutine freeing
|
||||
# itself -- instead, just execute in the current coroutine
|
||||
current = greenthread.getcurrent()
|
||||
if self.sem.locked() and current in self.coroutines_running:
|
||||
self._spawn_n_impl(func, args, kwargs)
|
||||
else:
|
||||
self.sem.acquire()
|
||||
g = greenthread.spawn_n(self._spawn_n_impl, func, args, kwargs, coro=True)
|
||||
if not self.coroutines_running:
|
||||
self.no_coros_running = coros.Event()
|
||||
self.coroutines_running.add(g)
|
||||
|
||||
def waitall(self):
|
||||
"""Waits until all coroutines in the pool are finished working."""
|
||||
@@ -72,7 +95,8 @@ class GreenPool(object):
|
||||
|
||||
def _spawn_done(self, result=None, exc=None, coro=None):
|
||||
self.sem.release()
|
||||
self.coroutines_running.remove(coro)
|
||||
if coro is not None:
|
||||
self.coroutines_running.remove(coro)
|
||||
# if done processing (no more work is waiting for processing),
|
||||
# send StopIteration so that the queue knows it's done
|
||||
if self.sem.balance == self.size:
|
||||
@@ -92,7 +116,7 @@ try:
|
||||
except NameError:
|
||||
def next(it):
|
||||
try:
|
||||
it.next()
|
||||
return it.next()
|
||||
except AttributeError:
|
||||
raise TypeError("%s object is not an iterator" % type(it))
|
||||
|
||||
@@ -157,5 +181,5 @@ class GreenPile(object):
|
||||
# iterates over us
|
||||
self.spawn(lambda: next(iter([])))
|
||||
# spin off a coroutine to launch the rest of the items
|
||||
api.spawn(self._do_map, function, it)
|
||||
greenthread.spawn(self._do_map, function, it)
|
||||
return self
|
||||
|
||||
@@ -1,8 +1,11 @@
|
||||
# replacement of CoroutinePool implemented with proc module
|
||||
from eventlet import coros, proc, api
|
||||
|
||||
class Pool(object):
|
||||
import warnings
|
||||
warnings.warn("The pool module is deprecated. Please use the "
|
||||
"eventlet.GreenPool and eventlet.GreenPile classes instead.",
|
||||
DeprecationWarning, stacklevel=2)
|
||||
|
||||
class Pool(object):
|
||||
def __init__(self, min_size=0, max_size=4, track_events=False):
|
||||
if min_size > max_size:
|
||||
raise ValueError('min_size cannot be bigger than max_size')
|
||||
|
||||
@@ -8,7 +8,8 @@ from twisted.python import failure
|
||||
|
||||
from eventlet import proc
|
||||
from eventlet.api import getcurrent
|
||||
from eventlet.coros import Queue, Event
|
||||
from eventlet.coros import Queue
|
||||
from eventlet.greenthread import Event
|
||||
|
||||
|
||||
class ValueQueue(Queue):
|
||||
|
||||
@@ -1,7 +1,10 @@
|
||||
import gc
|
||||
import os
|
||||
import random
|
||||
|
||||
from eventlet import api, hubs, parallel, coros
|
||||
import eventlet
|
||||
from eventlet import api
|
||||
from eventlet import hubs, parallel, coros
|
||||
import tests
|
||||
|
||||
class Spawn(tests.LimitedTestCase):
|
||||
@@ -10,11 +13,11 @@ class Spawn(tests.LimitedTestCase):
|
||||
def f(a, b=None):
|
||||
return (a,b)
|
||||
|
||||
gt = parallel.api. spawn(f, 1, b=2)
|
||||
gt = eventlet.spawn(f, 1, b=2)
|
||||
self.assertEquals(gt.wait(), (1,2))
|
||||
|
||||
def passthru(a):
|
||||
api.sleep(0.01)
|
||||
eventlet.sleep(0.01)
|
||||
return a
|
||||
|
||||
class GreenPool(tests.LimitedTestCase):
|
||||
@@ -30,7 +33,7 @@ class GreenPool(tests.LimitedTestCase):
|
||||
p = parallel.GreenPool(4)
|
||||
results_closure = []
|
||||
def do_something(a):
|
||||
api.sleep(0.01)
|
||||
eventlet.sleep(0.01)
|
||||
results_closure.append(a)
|
||||
for i in xrange(10):
|
||||
p.spawn(do_something, i)
|
||||
@@ -48,14 +51,14 @@ class GreenPool(tests.LimitedTestCase):
|
||||
|
||||
waiters = []
|
||||
self.assertEqual(pool.running(), 0)
|
||||
waiters.append(api.spawn(waiter, pool))
|
||||
api.sleep(0)
|
||||
waiters.append(eventlet.spawn(waiter, pool))
|
||||
eventlet.sleep(0)
|
||||
self.assertEqual(pool.waiting(), 0)
|
||||
waiters.append(api.spawn(waiter, pool))
|
||||
api.sleep(0)
|
||||
waiters.append(eventlet.spawn(waiter, pool))
|
||||
eventlet.sleep(0)
|
||||
self.assertEqual(pool.waiting(), 1)
|
||||
waiters.append(api.spawn(waiter, pool))
|
||||
api.sleep(0)
|
||||
waiters.append(eventlet.spawn(waiter, pool))
|
||||
eventlet.sleep(0)
|
||||
self.assertEqual(pool.waiting(), 2)
|
||||
self.assertEqual(pool.running(), 1)
|
||||
done.send(None)
|
||||
@@ -92,8 +95,8 @@ class GreenPool(tests.LimitedTestCase):
|
||||
pool = parallel.GreenPool(2)
|
||||
worker = pool.spawn(some_work)
|
||||
worker.wait()
|
||||
api.sleep(0)
|
||||
api.sleep(0)
|
||||
eventlet.sleep(0)
|
||||
eventlet.sleep(0)
|
||||
self.assertEquals(timer_fired, [])
|
||||
|
||||
def test_reentrant(self):
|
||||
@@ -136,8 +139,8 @@ class GreenPool(tests.LimitedTestCase):
|
||||
|
||||
# clean up by causing all the wait_long_time functions to return
|
||||
evt.send(None)
|
||||
api.sleep(0)
|
||||
api.sleep(0)
|
||||
eventlet.sleep(0)
|
||||
eventlet.sleep(0)
|
||||
|
||||
def test_resize(self):
|
||||
pool = parallel.GreenPool(2)
|
||||
@@ -156,8 +159,8 @@ class GreenPool(tests.LimitedTestCase):
|
||||
# cause the wait_long_time functions to return, which will
|
||||
# trigger puts to the pool
|
||||
evt.send(None)
|
||||
api.sleep(0)
|
||||
api.sleep(0)
|
||||
eventlet.sleep(0)
|
||||
eventlet.sleep(0)
|
||||
|
||||
self.assertEquals(pool.free(), 1)
|
||||
self.assertEquals(pool.running(), 0)
|
||||
@@ -195,7 +198,7 @@ class GreenPool(tests.LimitedTestCase):
|
||||
# the pool can get some random item back
|
||||
def send_wakeup(tp):
|
||||
tp.put('wakeup')
|
||||
gt = api.spawn(send_wakeup, tp)
|
||||
gt = eventlet.spawn(send_wakeup, tp)
|
||||
|
||||
# now we ask the pool to run something else, which should not
|
||||
# be affected by the previous send at all
|
||||
@@ -218,7 +221,7 @@ class GreenPool(tests.LimitedTestCase):
|
||||
self.assertEqual(p.free(), 1)
|
||||
gt.wait()
|
||||
self.assertEqual(r, [1])
|
||||
api.sleep(0)
|
||||
eventlet.sleep(0)
|
||||
self.assertEqual(p.free(), 2)
|
||||
|
||||
#Once the pool is exhausted, spawning forces a yield.
|
||||
@@ -232,7 +235,7 @@ class GreenPool(tests.LimitedTestCase):
|
||||
|
||||
p.spawn_n(foo, 4)
|
||||
self.assertEqual(set(r), set([1,2,3]))
|
||||
api.sleep(0)
|
||||
eventlet.sleep(0)
|
||||
self.assertEqual(set(r), set([1,2,3,4]))
|
||||
|
||||
class GreenPile(tests.LimitedTestCase):
|
||||
@@ -245,6 +248,11 @@ class GreenPile(tests.LimitedTestCase):
|
||||
p = parallel.GreenPile(4)
|
||||
result_iter = p.imap(passthru, [])
|
||||
self.assertRaises(StopIteration, result_iter.next)
|
||||
|
||||
def test_imap_nonefunc(self):
|
||||
p = parallel.GreenPile(4)
|
||||
result_list = list(p.imap(None, xrange(10)))
|
||||
self.assertEquals(result_list, [(x,) for x in xrange(10)])
|
||||
|
||||
def test_pile(self):
|
||||
p = parallel.GreenPile(4)
|
||||
@@ -273,12 +281,11 @@ class GreenPile(tests.LimitedTestCase):
|
||||
def bunch_of_work(pile, unique):
|
||||
for i in xrange(10):
|
||||
pile.spawn(passthru, i + unique)
|
||||
api.spawn(bunch_of_work, pile1, 0)
|
||||
api.spawn(bunch_of_work, pile2, 100)
|
||||
api.sleep(0)
|
||||
eventlet.spawn(bunch_of_work, pile1, 0)
|
||||
eventlet.spawn(bunch_of_work, pile2, 100)
|
||||
eventlet.sleep(0)
|
||||
self.assertEquals(list(pile2), list(xrange(100,110)))
|
||||
self.assertEquals(list(pile1), list(xrange(10)))
|
||||
|
||||
|
||||
|
||||
class StressException(Exception):
|
||||
@@ -287,16 +294,16 @@ class StressException(Exception):
|
||||
r = random.Random(0)
|
||||
def pressure(arg):
|
||||
while r.random() < 0.5:
|
||||
api.sleep(r.random() * 0.001)
|
||||
eventlet.sleep(r.random() * 0.001)
|
||||
if r.random() < 0.8:
|
||||
return arg
|
||||
else:
|
||||
raise StressException(arg)
|
||||
|
||||
# TODO: skip these unless explicitly demanded by the user
|
||||
class Stress(tests.SilencedTestCase):
|
||||
# tests will take extra-long
|
||||
TEST_TIMEOUT=10
|
||||
@tests.skip_unless(os.environ.get('RUN_STRESS_TESTS') == 'YES')
|
||||
def spawn_memory(self, concurrency):
|
||||
# checks that piles are strictly ordered
|
||||
# and bounded in memory
|
||||
@@ -306,9 +313,9 @@ class Stress(tests.SilencedTestCase):
|
||||
token = (unique, i)
|
||||
p.spawn(pressure, token)
|
||||
|
||||
api.spawn(makework, 1000, 1)
|
||||
api.spawn(makework, 1000, 2)
|
||||
api.spawn(makework, 1000, 3)
|
||||
eventlet.spawn(makework, 1000, 1)
|
||||
eventlet.spawn(makework, 1000, 2)
|
||||
eventlet.spawn(makework, 1000, 3)
|
||||
p.spawn(pressure, (0,0))
|
||||
latest = [-1] * 4
|
||||
received = 0
|
||||
@@ -330,15 +337,19 @@ class Stress(tests.SilencedTestCase):
|
||||
self.assert_(latest[unique] < order)
|
||||
latest[unique] = order
|
||||
|
||||
@tests.skip_unless(os.environ.get('RUN_STRESS_TESTS') == 'YES')
|
||||
def test_memory_5(self):
|
||||
self.spawn_memory(5)
|
||||
|
||||
@tests.skip_unless(os.environ.get('RUN_STRESS_TESTS') == 'YES')
|
||||
def test_memory_50(self):
|
||||
self.spawn_memory(50)
|
||||
|
||||
@tests.skip_unless(os.environ.get('RUN_STRESS_TESTS') == 'YES')
|
||||
def test_memory_500(self):
|
||||
self.spawn_memory(50)
|
||||
|
||||
|
||||
@tests.skip_unless(os.environ.get('RUN_STRESS_TESTS') == 'YES')
|
||||
def test_with_intpool(self):
|
||||
from eventlet import pools
|
||||
class IntPool(pools.Pool):
|
||||
@@ -349,7 +360,7 @@ class Stress(tests.SilencedTestCase):
|
||||
def subtest(intpool_size, pool_size, num_executes):
|
||||
def run(int_pool):
|
||||
token = int_pool.get()
|
||||
api.sleep(0.0001)
|
||||
eventlet.sleep(0.0001)
|
||||
int_pool.put(token)
|
||||
return token
|
||||
|
||||
|
||||
Reference in New Issue
Block a user