remove CoroutinePool from pools
@@ -213,462 +213,6 @@ class ExceptionWrapper(object):
        self.e = e


class CoroutinePool(Pool):
    """ Like a thread pool, but with coroutines.

    Coroutine pools are useful for splitting up tasks or globally controlling
    concurrency. You don't retrieve the coroutines directly with get() --
    instead use the execute() and execute_async() methods to run code.

    >>> from eventlet import coros, api
    >>> p = coros.CoroutinePool(max_size=2)
    >>> def foo(a):
    ...     print "foo", a
    ...
    >>> evt = p.execute(foo, 1)
    >>> evt.wait()
    foo 1

    Once the pool is exhausted, calling an execute forces a yield.

    >>> p.execute_async(foo, 2)
    >>> p.execute_async(foo, 3)
    >>> p.free()
    0
    >>> p.execute_async(foo, 4)
    foo 2
    foo 3

    >>> api.sleep(0)
    foo 4
    """

    def __init__(self, min_size=0, max_size=4, track_events=False):
        self._greenlets = set()
        if track_events:
            self._tracked_events = []
            self._next_event = None
        else:
            self._tracked_events = None
        self.requested = coros.metaphore()
        super(CoroutinePool, self).__init__(min_size, max_size)

    ## This doesn't yet pass its own doctest -- but I'm not even sure it's a
    ## wonderful idea.
    ## def __del__(self):
    ##     """Experimental: try to prevent the calling script from exiting until
    ##     all coroutines in this pool have run to completion.
    ##
    ##     >>> from eventlet import coros
    ##     >>> pool = coros.CoroutinePool()
    ##     >>> def saw(x): print "I saw %s!"
    ##     ...
    ##     >>> pool.launch_all(saw, "GHI")
    ##     >>> del pool
    ##     I saw G!
    ##     I saw H!
    ##     I saw I!
    ##     """
    ##     self.wait_all()

    def _main_loop(self, sender):
        """ Private, infinite loop run by a pooled coroutine. """
        try:
            while True:
                recvd = sender.wait()
                # Delete the sender's result here because the very
                # first event through the loop is referenced by
                # spawn_startup, and therefore is not itself deleted.
                # This means that we have to free up its argument
                # because otherwise said argument persists in memory
                # forever. This is generally only a problem in unit
                # tests.
                sender._result = coros.NOT_USED

                sender = coros.event()
                (evt, func, args, kw) = recvd
                self._safe_apply(evt, func, args, kw)
                #api.get_hub().cancel_timers(api.getcurrent())
                # Likewise, delete these variables or else they will
                # be referenced by this frame until replaced by the
                # next recvd, which may or may not be a long time from
                # now.
                del evt, func, args, kw, recvd

                self.put(sender)
        finally:
            # if we get here, something broke badly, and all we can really
            # do is try to keep the pool from leaking items.
            # Shouldn't even try to print the exception.
            self.put(self.create())

    def _safe_apply(self, evt, func, args, kw):
        """ Private method that runs the function, catches exceptions, and
        passes back the return value in the event."""
        try:
            result = func(*args, **kw)
            if evt is not None:
                evt.send(result)
            if self._tracked_events is not None:
                if self._next_event is None:
                    self._tracked_events.append(result)
                else:
                    ne = self._next_event
                    self._next_event = None
                    ne.send(result)
        except api.GreenletExit, e:
            # we're printing this out to see if it ever happens
            # in practice
            print "GreenletExit raised in coroutine pool", e
            if evt is not None:
                evt.send(e)  # sent as a return value, not an exception
        except KeyboardInterrupt:
            raise  # allow program to exit
        except Exception, e:
            traceback.print_exc()
            if evt is not None:
                evt.send(exc=e)
            if self._tracked_events is not None:
                if self._next_event is None:
                    self._tracked_events.append(ExceptionWrapper(e))
                else:
                    ne = self._next_event
                    self._next_event = None
                    ne.send(exc=e)

    def _execute(self, evt, func, args, kw):
        """ Private implementation of the execute methods.
        """
        # if reentering an empty pool, don't try to wait on a coroutine freeing
        # itself -- instead, just execute in the current coroutine
        if self.free() == 0 and api.getcurrent() in self._greenlets:
            self._safe_apply(evt, func, args, kw)
        else:
            sender = self.get()
            sender.send((evt, func, args, kw))

    def create(self):
        """Private implementation of eventlet.pools.Pool
        interface. Creates an event and spawns the
        _main_loop coroutine, passing the event.
        The event is used to send a callable into the
        new coroutine, to be executed.
        """
        sender = coros.event()
        self._greenlets.add(api.spawn(self._main_loop, sender))
        return sender

    def get(self):
        """Override of eventlet.pools.Pool interface"""
        # Track the number of requested CoroutinePool coroutines
        self.requested.inc()
        # forward call to base class
        return super(CoroutinePool, self).get()

    def put(self, item):
        """Override of eventlet.pools.Pool interface"""
        # forward call to base class
        super(CoroutinePool, self).put(item)
        # Track the number of outstanding CoroutinePool coroutines
        self.requested.dec()

    def execute(self, func, *args, **kw):
        """Execute func in one of the coroutines maintained
        by the pool, when one is free.

        Immediately returns an eventlet.coros.event object which
        func's result will be sent to when it is available.

        >>> from eventlet import coros
        >>> p = coros.CoroutinePool()
        >>> evt = p.execute(lambda a: ('foo', a), 1)
        >>> evt.wait()
        ('foo', 1)
        """
        receiver = coros.event()
        self._execute(receiver, func, args, kw)
        return receiver

    def execute_async(self, func, *args, **kw):
        """Execute func in one of the coroutines maintained
        by the pool, when one is free.

        No return value is provided.

        >>> from eventlet import coros, api
        >>> p = coros.CoroutinePool()
        >>> def foo(a):
        ...     print "foo", a
        ...
        >>> p.execute_async(foo, 1)
        >>> api.sleep(0)
        foo 1
        """
        self._execute(None, func, args, kw)

    def wait(self):
        """Wait for the next execute in the pool to complete,
        and return the result.

        You must pass track_events=True to the CoroutinePool constructor
        in order to use this method.
        """
        assert self._tracked_events is not None, (
            "Must pass track_events=True to the constructor to use CoroutinePool.wait()")
        if self._next_event is not None:
            return self._next_event.wait()

        if not self._tracked_events:
            self._next_event = coros.event()
            return self._next_event.wait()

        result = self._tracked_events.pop(0)
        if isinstance(result, ExceptionWrapper):
            raise result.e

        if not self._tracked_events:
            self._next_event = coros.event()
        return result

    def killall(self):
        for g in self._greenlets:
            api.kill(g)

    def wait_all(self):
        """Wait until all coroutines started either by execute() or
        execute_async() have completed. If you kept the event objects returned
        by execute(), you can then call their individual wait() methods to
        retrieve results with no further actual waiting.

        >>> from eventlet import coros
        >>> pool = coros.CoroutinePool()
        >>> pool.wait_all()
        >>> def hi(name):
        ...     print "Hello, %s!" % name
        ...     return name
        ...
        >>> evt = pool.execute(hi, "world")
        >>> pool.execute_async(hi, "darkness, my old friend")
        >>> pool.wait_all()
        Hello, world!
        Hello, darkness, my old friend!
        >>> evt.wait()
        'world'
        >>> pool.wait_all()
        """
        self.requested.wait()

    def launch_all(self, function, iterable):
        """For each tuple (sequence) in iterable, launch function(*tuple) in
        its own coroutine -- like itertools.starmap(), but in parallel.
        Discard values returned by function(). You should call wait_all() to
        wait for all coroutines, newly-launched plus any previously-submitted
        execute() or execute_async() calls, to complete.

        >>> from eventlet import coros
        >>> pool = coros.CoroutinePool()
        >>> def saw(x):
        ...     print "I saw %s!" % x
        ...
        >>> pool.launch_all(saw, "ABC")
        >>> pool.wait_all()
        I saw A!
        I saw B!
        I saw C!
        """
        for tup in iterable:
            self.execute_async(function, *tup)

    def process_all(self, function, iterable):
        """For each tuple (sequence) in iterable, launch function(*tuple) in
        its own coroutine -- like itertools.starmap(), but in parallel.
        Discard values returned by function(). Don't return until all
        coroutines, newly-launched plus any previously-submitted execute() or
        execute_async() calls, have completed.

        >>> from eventlet import coros
        >>> pool = coros.CoroutinePool()
        >>> def saw(x): print "I saw %s!" % x
        ...
        >>> pool.process_all(saw, "DEF")
        I saw D!
        I saw E!
        I saw F!
        """
        self.launch_all(function, iterable)
        self.wait_all()

    def generate_results(self, function, iterable, qsize=None):
        """For each tuple (sequence) in iterable, launch function(*tuple) in
        its own coroutine -- like itertools.starmap(), but in parallel.
        Yield each of the values returned by function(), in the order they're
        completed rather than the order the coroutines were launched.

        Iteration stops when we've yielded results for each arguments tuple in
        iterable. Unlike wait_all() and process_all(), this function does not
        wait for any previously-submitted execute() or execute_async() calls.

        Results are temporarily buffered in a queue. If you pass qsize=, this
        value is used to limit the max size of the queue: an attempt to buffer
        too many results will suspend the completed CoroutinePool coroutine
        until the requesting coroutine (the caller of generate_results()) has
        retrieved one or more results by calling this generator-iterator's
        next().

        If any coroutine raises an uncaught exception, that exception will
        propagate to the requesting coroutine via the corresponding next() call.

        What I particularly want these tests to illustrate is that using this
        generator function:

        for result in generate_results(function, iterable):
            # ... do something with result ...

        executes coroutines at least as aggressively as the classic eventlet
        idiom:

        events = [pool.execute(function, *args) for args in iterable]
        for event in events:
            result = event.wait()
            # ... do something with result ...

        even without a distinct event object for every arg tuple in iterable,
        and despite the funny flow control from interleaving launches of new
        coroutines with yields of completed coroutines' results.

        (The use case that makes this function preferable to the classic idiom
        above is when the iterable, which may itself be a generator, produces
        millions of items.)

        >>> from eventlet import coros
        >>> import string
        >>> pool = coros.CoroutinePool(max_size=5)
        >>> pausers = [coros.event() for x in xrange(2)]
        >>> def longtask(evt, desc):
        ...     print "%s woke up with %s" % (desc, evt.wait())
        ...
        >>> pool.launch_all(longtask, zip(pausers, "AB"))
        >>> def quicktask(desc):
        ...     print "returning %s" % desc
        ...     return desc
        ...

        (Instead of using a for loop, step through generate_results()
        items individually to illustrate timing)

        >>> step = iter(pool.generate_results(quicktask, string.ascii_lowercase))
        >>> print step.next()
        returning a
        returning b
        returning c
        a
        >>> print step.next()
        b
        >>> print step.next()
        c
        >>> print step.next()
        returning d
        returning e
        returning f
        d
        >>> pausers[0].send("A")
        >>> print step.next()
        e
        >>> print step.next()
        f
        >>> print step.next()
        A woke up with A
        returning g
        returning h
        returning i
        g
        >>> print "".join([step.next() for x in xrange(3)])
        returning j
        returning k
        returning l
        returning m
        hij
        >>> pausers[1].send("B")
        >>> print "".join([step.next() for x in xrange(4)])
        B woke up with B
        returning n
        returning o
        returning p
        returning q
        klmn
        """
        # Get an iterator because of our funny nested loop below. Wrap the
        # iterable in enumerate() so we count items that come through.
        tuples = iter(enumerate(iterable))
        # If the iterable is empty, this whole function is a no-op, and we can
        # save ourselves some grief by just quitting out. In particular, once
        # we enter the outer loop below, we're going to wait on the queue --
        # but if we launched no coroutines with that queue as the destination,
        # we could end up waiting a very long time.
        try:
            index, args = tuples.next()
        except StopIteration:
            return
        # From this point forward, 'args' is the current arguments tuple and
        # 'index+1' counts how many such tuples we've seen.
        # This implementation relies on the fact that _execute() accepts an
        # event-like object, and -- unless it's None -- the completed
        # coroutine calls send(result). We slyly pass a queue rather than an
        # event -- the same queue instance for all coroutines. This is why our
        # queue interface intentionally resembles the event interface.
        q = coros.queue(max_size=qsize)
        # How many results have we yielded so far?
        finished = 0
        # This first loop is only until we've launched all the coroutines. Its
        # complexity is because if iterable contains more args tuples than the
        # size of our pool, attempting to _execute() the (poolsize+1)th
        # coroutine would suspend until something completes and send()s its
        # result to our queue. But to keep down queue overhead and to maximize
        # responsiveness to our caller, we'd rather suspend on reading the
        # queue. So we stuff the pool as full as we can, then wait for
        # something to finish, then stuff more coroutines into the pool.
        try:
            while True:
                # Before each yield, start as many new coroutines as we can fit.
                # (The self.free() test isn't 100% accurate: if we happen to be
                # executing in one of the pool's coroutines, we could _execute()
                # without waiting even if self.free() reports 0. See _execute().)
                # The point is that we don't want to wait in the _execute() call,
                # we want to wait in the q.wait() call.
                # IMPORTANT: at start, and whenever we've caught up with all
                # coroutines we've launched so far, we MUST iterate this inner
                # loop at least once, regardless of self.free() -- otherwise the
                # q.wait() call below will deadlock!
                # Recall that index is the index of the NEXT args tuple that we
                # haven't yet launched. Therefore it counts how many args tuples
                # we've launched so far.
                while self.free() > 0 or finished == index:
                    # Just like the implementation of execute_async(), save that
                    # we're passing our queue instead of None as the "event" to
                    # which to send() the result.
                    self._execute(q, function, args, {})
                    # We've consumed that args tuple, advance to next.
                    index, args = tuples.next()
                # Okay, we've filled up the pool again, yield a result -- which
                # will probably wait for a coroutine to complete. Although we do
                # have q.ready(), so we could iterate without waiting, we avoid
                # that because every yield could involve considerable real time.
                # We don't know how long it takes to return from yield, so every
                # time we do, take the opportunity to stuff more requests into the
                # pool before yielding again.
                yield q.wait()
                # Be sure to count results so we know when to stop!
                finished += 1
        except StopIteration:
            pass
        # Here we've exhausted the input iterable. index+1 is the total number
        # of coroutines we've launched. We probably haven't yielded that many
        # results yet. Wait for the rest of the results, yielding them as they
        # arrive.
        while finished < index + 1:
            yield q.wait()
            finished += 1

if __name__=='__main__':
    import doctest
    doctest.testmod()

@@ -240,190 +240,6 @@ class TestFan(tests.TestCase):
        self.assertRaises(pools.SomeFailed, self.pool.fan, my_failing_callable, range(4))


class TestCoroutinePool(tests.TestCase):
    mode = 'static'
    def setUp(self):
        # raise an exception if we're waiting forever
        self._cancel_timeout = api.exc_after(1, TestTookTooLong())

    def tearDown(self):
        self._cancel_timeout.cancel()

    def test_execute_async(self):
        done = coros.event()
        def some_work():
            done.send()
        pool = pools.CoroutinePool(0, 2)
        pool.execute_async(some_work)
        done.wait()

    def test_execute(self):
        value = 'return value'
        def some_work():
            return value
        pool = pools.CoroutinePool(0, 2)
        worker = pool.execute(some_work)
        self.assertEqual(value, worker.wait())

    def test_multiple_coros(self):
        evt = coros.event()
        results = []
        def producer():
            results.append('prod')
            evt.send()

        def consumer():
            results.append('cons1')
            evt.wait()
            results.append('cons2')

        pool = pools.CoroutinePool(0, 2)
        done = pool.execute(consumer)
        pool.execute_async(producer)
        done.wait()
        self.assertEquals(['cons1', 'prod', 'cons2'], results)

    def test_timer_cancel(self):
        def some_work():
            t = timer.Timer(5, lambda: None)
            t.autocancellable = True
            t.schedule()
            return t
        pool = pools.CoroutinePool(0, 2)
        worker = pool.execute(some_work)
        t = worker.wait()
        api.sleep(0)
        self.assertEquals(t.cancelled, True)

    def test_reentrant(self):
        pool = pools.CoroutinePool(0, 1)
        def reenter():
            waiter = pool.execute(lambda a: a, 'reenter')
            self.assertEqual('reenter', waiter.wait())

        outer_waiter = pool.execute(reenter)
        outer_waiter.wait()

        evt = coros.event()
        def reenter_async():
            pool.execute_async(lambda a: a, 'reenter')
            evt.send('done')

        pool.execute_async(reenter_async)
        evt.wait()

    def test_horrible_main_loop_death(self):
        # testing the case that causes the run_forever
        # method to exit unwantedly
        pool = pools.CoroutinePool(min_size=1, max_size=1)
        def crash(*args, **kw):
            raise RuntimeError("Whoa")
        class FakeFile(object):
            write = crash

        # we're going to do this by causing the traceback.print_exc in
        # safe_apply to raise an exception and thus exit _main_loop
        normal_err = sys.stderr
        try:
            sys.stderr = FakeFile()
            waiter = pool.execute(crash)
            self.assertRaises(RuntimeError, waiter.wait)
            # the pool should have something free at this point since the
            # waiter returned
            self.assertEqual(pool.free(), 1)
            # shouldn't block when trying to get
            t = api.exc_after(0.1, api.TimeoutError)
            self.assert_(pool.get())
            t.cancel()
        finally:
            sys.stderr = normal_err

    def test_track_events(self):
        pool = pools.CoroutinePool(track_events=True)
        for x in range(6):
            pool.execute(lambda n: n, x)
        for y in range(6):
            pool.wait()

    def test_track_slow_event(self):
        pool = pools.CoroutinePool(track_events=True)
        def slow():
            api.sleep(0.1)
            return 'ok'
        pool.execute(slow)
        self.assertEquals(pool.wait(), 'ok')

    def test_channel_smash(self):
        # The premise is that the coroutine in the pool exhibits an
        # interest in receiving data from the channel, but then times
        # out and gets recycled, so it ceases to care about what gets
        # sent over the channel. The pool should be able to tell the
        # channel about the sudden change of heart, or else, when we
        # eventually do send something into the channel it will catch
        # the coroutine pool's coroutine in an awkward place, losing
        # the data that we're sending.
        from eventlet import pools
        pool = pools.CoroutinePool(min_size=1, max_size=1)
        tp = pools.TokenPool(max_size=1)
        token = tp.get()  # empty pool
        def do_receive(tp):
            api.exc_after(0, RuntimeError())
            try:
                t = tp.get()
                self.fail("Shouldn't have received anything from the pool")
            except RuntimeError:
                return 'timed out'

        # the execute makes the pool expect that coroutine, but then
        # immediately cuts bait
        e1 = pool.execute(do_receive, tp)
        self.assertEquals(e1.wait(), 'timed out')

        # the pool can get some random item back
        def send_wakeup(tp):
            tp.put('wakeup')
        api.spawn(send_wakeup, tp)

        # now we ask the pool to run something else, which should not
        # be affected by the previous send at all
        def resume():
            return 'resumed'
        e2 = pool.execute(resume)
        self.assertEquals(e2.wait(), 'resumed')

        # we should be able to get out the thing we put in there, too
        self.assertEquals(tp.get(), 'wakeup')

    def test_channel_death(self):
        # In here, we have a coroutine trying to receive data from a
        # channel, but timing out immediately and dying. The channel
        # should be smart enough to not try to send data to a dead
        # coroutine, because if it tries to, it'll lose the data.
        from eventlet import pools
        tp = pools.TokenPool(max_size=1)
        token = tp.get()
        e1 = coros.event()
        def do_receive(evt, tp):
            api.exc_after(0, RuntimeError())
            try:
                t = tp.get()
                evt.send(t)
            except RuntimeError:
                evt.send('timed out')

        # the execute gets the pool to add a waiter, but then kills
        # itself off
        api.spawn(do_receive, e1, tp)
        self.assertEquals(e1.wait(), 'timed out')

        def send_wakeup(tp):
            tp.put('wakeup')
        api.spawn(send_wakeup, tp)

        # should be able to retrieve the message
        self.assertEquals(tp.get(), 'wakeup')

if __name__ == '__main__':
    tests.main()