Add DAGPool, a dependency-driven greenthread pool
https://github.com/eventlet/eventlet/pull/347
This commit is contained in:
parent 4d2cdca17a
commit 6d0103298a
@@ -6,6 +6,7 @@ Module Reference
     modules/backdoor
     modules/corolocal
+    modules/dagpool
     modules/debug
     modules/db_pool
     modules/event
doc/modules/dagpool.rst (new file, 6 lines)
@@ -0,0 +1,6 @@
:mod:`dagpool` -- Dependency-Driven Greenthreads
================================================

.. automodule:: eventlet.dagpool
    :members:
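Before the full source listing, a minimal usage sketch may help orient the reader. This is not part of the commit; the keys and the render() helper are invented for illustration, and it relies only on the DAGPool API documented in eventlet/dagpool.py below (a preload dict, spawn(key, depends, function), and waitall()):

    # Illustrative sketch only (not part of this commit): a diamond-shaped
    # DAG. Keys 'a'..'d' and render() are made up for the example.
    from eventlet.dagpool import DAGPool

    def render(key, results):
        # 'results' yields (key, value) pairs as each dependency completes;
        # consuming it blocks until every dependency has a value.
        inputs = dict(results)
        return "{0}({1})".format(key, ",".join(sorted(inputs)))

    pool = DAGPool(dict(a="seed"))  # preload satisfies 'a' immediately
    pool.spawn("b", "a", render)    # b depends on a
    pool.spawn("c", "a", render)    # c depends on a
    pool.spawn("d", "bc", render)   # d depends on both b and c
    print(pool.waitall())           # dict with values for a, b, c and d

A string such as "bc" works as the 'depends' iterable because iterating a string yields its single-character keys; the test file below uses the same idiom.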
eventlet/dagpool.py (new file, 572 lines)
@@ -0,0 +1,572 @@
"""\
@file dagpool.py
@author Nat Goodspeed
@date 2016-08-08
@brief Provide DAGPool class
"""

from eventlet.event import Event
from eventlet import greenthread
from eventlet.support import six
import collections


# value distinguished from any other Python value including None
_MISSING = object()


class Collision(Exception):
    """
    DAGPool raises Collision when you try to launch two greenthreads with the
    same key, or post() a result for a key corresponding to a greenthread, or
    post() twice for the same key. As with KeyError, str(collision) names the
    key in question.
    """
    pass


class PropagateError(Exception):
    """
    When a DAGPool greenthread terminates with an exception instead of
    returning a result, attempting to retrieve its value raises
    PropagateError.

    Attributes:
        key: the key of the greenthread which raised the exception
        exc: the exception object raised by the greenthread
    """
    def __init__(self, key, exc):
        self.key = key
        self.exc = exc

    def __str__(self):
        return "PropagateError({0}): {1}: {2}" \
            .format(self.key, self.exc.__class__.__name__, self.exc)


class DAGPool(object):
    """
    A DAGPool is a pool that constrains greenthreads, not by max concurrency,
    but by data dependencies.

    This is a way to implement general DAG dependencies. A simple dependency
    tree (flowing in either direction) can straightforwardly be implemented
    using recursion and (e.g.) GreenThread.imap(). What gets complicated is
    when a given node depends on several other nodes as well as contributing
    to several other nodes.

    With DAGPool, you concurrently launch all applicable greenthreads; each
    will proceed as soon as it has all required inputs. The DAG is implicit in
    which items are required by each greenthread.

    Each greenthread is launched in a DAGPool with a key: any value that can
    serve as a Python dict key. The caller also specifies an iterable of other
    keys on which this greenthread depends. This iterable may be empty.

    The greenthread callable must accept (key, results), where:

        key is its own key
        results is an iterable of (key, value) pairs.

    A newly-launched DAGPool greenthread is entered immediately, and can
    perform any necessary setup work. At some point it will iterate over the
    (key, value) pairs from the passed 'results' iterable. Doing so blocks the
    greenthread until a value is available for each of the keys specified in
    its initial dependencies iterable. These (key, value) pairs are delivered
    in chronological order, NOT the order in which they are initially
    specified: each value will be delivered as soon as it becomes available.

    The value returned by a DAGPool greenthread becomes the value for its
    key, which unblocks any other greenthreads waiting on that key.

    If a DAGPool greenthread terminates with an exception instead of returning
    a value, attempting to retrieve the value raises PropagateError, which
    binds the key of the original greenthread and the original exception.
    Unless the greenthread attempting to retrieve the value handles
    PropagateError, that exception will in turn be wrapped in a PropagateError
    of its own, and so forth. The code that ultimately handles PropagateError
    can follow the chain of PropagateError.exc attributes to discover the flow
    of that exception through the DAG of greenthreads.

    External greenthreads may also interact with a DAGPool. See wait_each(),
    waitall(), post().

    It is not recommended to constrain external DAGPool producer greenthreads
    in a GreenPool: it may be hard to provably avoid deadlock.
    """

    _Coro = collections.namedtuple("_Coro", ("greenthread", "pending"))

    def __init__(self, preload={}):
        """
        DAGPool can be prepopulated with an initial dict or iterable of (key,
        value) pairs. These (key, value) pairs are of course immediately
        available for any greenthread that depends on any of those keys.
        """
        try:
            # If a dict is passed, copy it. Don't risk a subsequent
            # modification to passed dict affecting our internal state.
            iteritems = six.iteritems(preload)
        except AttributeError:
            # Not a dict, just an iterable of (key, value) pairs
            iteritems = preload

        # Load the initial dict
        self.values = dict(iteritems)

        # track greenthreads
        self.coros = {}

        # The key to blocking greenthreads is the Event.
        self.event = Event()

    def waitall(self):
        """
        waitall() blocks the calling greenthread until there is a value for
        every DAGPool greenthread launched by spawn(). It returns a dict
        containing all preload data, all data from post() and all values
        returned by spawned greenthreads.

        See also wait().
        """
        # waitall() is an alias for compatibility with GreenPool
        return self.wait()

    def wait(self, keys=_MISSING):
        """
        keys is an optional iterable of keys. If you omit the argument, it
        waits for all the keys from preload data, from post() calls and from
        spawn() calls: in other words, all the keys of which this DAGPool is
        aware.

        wait() blocks the calling greenthread until all of the relevant keys
        have values. wait() returns a dict whose keys are the relevant keys,
        and whose values come from the preload data, from values returned by
        DAGPool greenthreads or from post() calls.

        If a greenthread terminates with an exception, wait() will raise
        PropagateError wrapping that exception. If more than one greenthread
        terminates with an exception, it is indeterminate which one wait()
        will raise.

        If a greenthread posts a PropagateError instance, wait() will raise
        that PropagateError. If more than one greenthread posts
        PropagateError, it is indeterminate which one wait() will raise.

        See also wait_each_success(), wait_each_exception().
        """
        # This is mostly redundant with wait_each() functionality.
        return dict(self.wait_each(keys))

    def wait_each(self, keys=_MISSING):
        """
        keys is an optional iterable of keys. If you omit the argument, it
        waits for all the keys from preload data, from post() calls and from
        spawn() calls: in other words, all the keys of which this DAGPool is
        aware.

        wait_each() is a generator producing (key, value) pairs as a value
        becomes available for each requested key. wait_each() blocks the
        calling greenthread until the next value becomes available. If the
        DAGPool was prepopulated with values for any of the relevant keys, of
        course those can be delivered immediately without waiting.

        Delivery order is intentionally decoupled from the initial sequence of
        keys: each value is delivered as soon as it becomes available. If
        multiple keys are available at the same time, wait_each() delivers
        each of the ready ones in arbitrary order before blocking again.

        The DAGPool does not distinguish between a value returned by one of
        its own greenthreads and one provided by a post() call or preload data.

        The wait_each() generator terminates (raises StopIteration) when all
        specified keys have been delivered. Thus, typical usage might be:

        for key, value in dagpool.wait_each(keys):
            # process this ready key and value
        # continue processing now that we've gotten values for all keys

        By implication, if you pass wait_each() an empty iterable of keys, it
        returns immediately without yielding anything.

        If the value to be delivered is a PropagateError exception object, the
        generator raises that PropagateError instead of yielding it.

        See also wait_each_success(), wait_each_exception().
        """
        # Build a local set() and then call _wait_each().
        return self._wait_each(self._get_keyset_for_wait_each(keys))

    def wait_each_success(self, keys=_MISSING):
        """
        wait_each_success() filters results so that only success values are
        yielded. In other words, unlike wait_each(), wait_each_success() will
        not raise PropagateError. Not every provided (or defaulted) key will
        necessarily be represented, though naturally the generator must wait
        until all have completed.

        In all other respects, wait_each_success() behaves like wait_each().
        """
        for key, value in self._wait_each_raw(self._get_keyset_for_wait_each(keys)):
            if not isinstance(value, PropagateError):
                yield key, value

    def wait_each_exception(self, keys=_MISSING):
        """
        wait_each_exception() filters results so that only exceptions are
        yielded. Not every provided (or defaulted) key will necessarily be
        represented, though naturally the generator must wait until all have
        completed.

        Unlike other DAGPool methods, wait_each_exception() simply yields
        PropagateError instances as values rather than raising them.

        In all other respects, wait_each_exception() behaves like wait_each().
        """
        for key, value in self._wait_each_raw(self._get_keyset_for_wait_each(keys)):
            if isinstance(value, PropagateError):
                yield key, value

    def _get_keyset_for_wait_each(self, keys):
        """
        wait_each(), wait_each_success() and wait_each_exception() promise
        that if you pass an iterable of keys, the method will wait for results
        from those keys -- but if you omit the keys argument, the method will
        wait for results from all known keys. This helper implements that
        distinction, returning a set() of the relevant keys.
        """
        if keys is not _MISSING:
            return set(keys)
        else:
            # keys arg omitted -- use all the keys we know about
            return set(six.iterkeys(self.coros)) | set(six.iterkeys(self.values))

    def _wait_each(self, pending):
        """
        When _wait_each() encounters a value of PropagateError, it raises it.

        In all other respects, _wait_each() behaves like _wait_each_raw().
        """
        for key, value in self._wait_each_raw(pending):
            yield key, self._value_or_raise(value)

    @staticmethod
    def _value_or_raise(value):
        # Most methods attempting to deliver PropagateError should raise that
        # instead of simply returning it.
        if isinstance(value, PropagateError):
            raise value
        return value

    def _wait_each_raw(self, pending):
        """
        pending is a set() of keys for which we intend to wait. THIS SET WILL
        BE DESTRUCTIVELY MODIFIED: as each key acquires a value, that key will
        be removed from the passed 'pending' set.

        _wait_each_raw() does not treat a PropagateError instance specially:
        it will be yielded to the caller like any other value.

        In all other respects, _wait_each_raw() behaves like wait_each().
        """
        while True:
            # Before even waiting, show caller any (key, value) pairs that
            # are already available. Copy 'pending' because we want to be able
            # to remove items from the original set while iterating.
            for key in pending.copy():
                value = self.values.get(key, _MISSING)
                if value is not _MISSING:
                    # found one, it's no longer pending
                    pending.remove(key)
                    yield (key, value)

            if not pending:
                # Once we've yielded all the caller's keys, done.
                break

            # There are still more keys pending, so wait.
            self.event.wait()

    def spawn(self, key, depends, function, *args, **kwds):
        """
        Launch the passed function(key, results, ...) as a greenthread,
        passing it:

        - the specified 'key'
        - an iterable of (key, value) pairs
        - whatever other positional args or keywords you specify.

        Iterating over the 'results' iterable behaves like calling
        wait_each(depends).

        Returning from function() behaves like post(key, return_value).

        If function() terminates with an exception, that exception is wrapped
        in PropagateError with the greenthread's key and (effectively) posted
        as the value for that key. Attempting to retrieve that value will
        raise that PropagateError.

        Thus, if the greenthread with key 'a' terminates with an exception,
        and greenthread 'b' depends on 'a', when greenthread 'b' attempts to
        iterate through its 'results' argument, it will encounter
        PropagateError. So by default, an uncaught exception will propagate
        through all the downstream dependencies.

        If you pass spawn() a key already passed to spawn() or post(), spawn()
        raises Collision.
        """
        if key in self.coros or key in self.values:
            raise Collision(key)

        # The order is a bit tricky. First construct the set() of keys.
        pending = set(depends)
        # It's important that we pass to _wait_each() the same 'pending' set()
        # that we store in self.coros for this key. The generator-iterator
        # returned by _wait_each() becomes the function's 'results' iterable.
        newcoro = greenthread.spawn(self._wrapper, function, key,
                                    self._wait_each(pending),
                                    *args, **kwds)
        # Also capture the same (!) set in the new _Coro object for this key.
        # We must be able to observe ready keys being removed from the set.
        self.coros[key] = self._Coro(newcoro, pending)

    def _wrapper(self, function, key, results, *args, **kwds):
        """
        This wrapper runs the top-level function in a DAGPool greenthread,
        posting its return value (or PropagateError) to the DAGPool.
        """
        try:
            # call our passed function
            result = function(key, results, *args, **kwds)
        except Exception as err:
            # Wrap any exception it may raise in a PropagateError.
            result = PropagateError(key, err)
        finally:
            # function() has returned (or terminated with an exception). We no
            # longer need to track this greenthread in self.coros. Remove it
            # first so post() won't complain about a running greenthread.
            del self.coros[key]

        try:
            # as advertised, try to post() our return value
            self.post(key, result)
        except Collision:
            # if we've already post()ed a result, oh well
            pass

        # also, in case anyone cares...
        return result

    def spawn_many(self, depends, function, *args, **kwds):
        """
        spawn_many() accepts a single function whose parameters are the same
        as for spawn().

        The difference is that spawn_many() accepts a dependency dict. A new
        greenthread is spawned for each key in the dict. That dict key's value
        should be an iterable of other keys on which this greenthread depends.
        """
        # Iterate over 'depends' items, relying on self.spawn() not to
        # context-switch so no one can modify 'depends' along the way.
        for key, deps in six.iteritems(depends):
            self.spawn(key, deps, function, *args, **kwds)

    def kill(self, key):
        """
        Kill the greenthread that was spawned with the specified 'key'.

        If no such greenthread was spawned, raise KeyError.
        """
        # let KeyError, if any, propagate
        self.coros[key].greenthread.kill()
        # once killed, remove it
        del self.coros[key]

    def post(self, key, value, replace=False):
        """
        post(key, value) stores the passed value for the passed key. It then
        causes each greenthread blocked on its results iterable, or on
        wait_each(keys), to check for new values. A waiting greenthread might
        not literally resume on every single post() of a relevant key, but the
        first post() of a relevant key ensures that it will resume eventually,
        and when it does it will catch up with all relevant post() calls.

        Calling post(key, value) when there is a running greenthread with that
        same 'key' raises Collision. If you must post(key, value) instead of
        letting the greenthread run to completion, you must first call
        kill(key).

        The DAGPool implicitly post()s the return value from each of its
        greenthreads. But a greenthread may explicitly post() a value for its
        own key, which will cause its return value to be discarded.

        Calling post(key, value, replace=False) (the default 'replace') when a
        value for that key has already been posted, by any means, raises
        Collision.

        Calling post(key, value, replace=True) when a value for that key has
        already been posted, by any means, replaces the previously-stored
        value. However, that may make it complicated to reason about the
        behavior of greenthreads waiting on that key.

        After a post(key, value1) followed by post(key, value2, replace=True),
        it is unspecified which pending wait_each([key...]) calls (or
        greenthreads iterating over 'results' involving that key) will observe
        value1 versus value2. It is guaranteed that subsequent
        wait_each([key...]) calls (or greenthreads spawned after that point)
        will observe value2.

        A successful call to post(key, PropagateError(key, ExceptionSubclass))
        ensures that any subsequent attempt to retrieve that key's value will
        raise that PropagateError instance.
        """
        # First, check if we're trying to post() to a key with a running
        # greenthread.
        # A DAGPool greenthread is explicitly permitted to post() to its
        # OWN key.
        coro = self.coros.get(key, _MISSING)
        if coro is not _MISSING and coro.greenthread is not greenthread.getcurrent():
            # oh oh, trying to post a value for running greenthread from
            # some other greenthread
            raise Collision(key)

        # Here, either we're posting a value for a key with no greenthread or
        # we're posting from that greenthread itself.

        # Has somebody already post()ed a value for this key?
        # Unless replace == True, this is a problem.
        if key in self.values and not replace:
            raise Collision(key)

        # Either we've never before posted a value for this key, or we're
        # posting with replace == True.

        # update our database
        self.values[key] = value
        # and wake up pending waiters
        self.event.send()
        # The comment in Event.reset() says: "it's better to create a new
        # event rather than reset an old one". Okay, fine. We do want to be
        # able to support new waiters, so create a new Event.
        self.event = Event()

    def __getitem__(self, key):
        """
        __getitem__(key) (aka dagpool[key]) blocks until 'key' has a value,
        then delivers that value.
        """
        # This is a degenerate case of wait_each(). Construct a tuple
        # containing only this 'key'. wait_each() will yield exactly one (key,
        # value) pair. Return just its value.
        for _, value in self.wait_each((key,)):
            return value

    def get(self, key, default=None):
        """
        get() returns the value for 'key'. If 'key' does not yet have a value,
        get() returns 'default'.
        """
        return self._value_or_raise(self.values.get(key, default))

    def keys(self):
        """
        Return a tuple of keys for which we currently have values. Explicitly
        return a copy rather than an iterator: don't assume our caller will
        finish iterating before new values are posted.
        """
        return tuple(six.iterkeys(self.values))

    def items(self):
        """
        Return a snapshot tuple of currently-available (key, value) pairs.
        Don't assume our caller will finish iterating before new values are
        posted.
        """
        return tuple((key, self._value_or_raise(value))
                     for key, value in six.iteritems(self.values))

    def running(self):
        """
        Return number of running greenthreads. This includes greenthreads
        blocked while iterating through their 'results' iterable, that is,
        greenthreads waiting on values from other keys.
        """
        return len(self.coros)

    def running_keys(self):
        """
        Return keys for running greenthreads. This includes greenthreads
        blocked while iterating through their 'results' iterable, that is,
        greenthreads waiting on values from other keys.
        """
        # return snapshot; don't assume caller will finish iterating before we
        # next modify self.coros
        return tuple(six.iterkeys(self.coros))

    def waiting(self):
        """
        Return number of waiting greenthreads, that is, greenthreads still
        waiting on values from other keys. This explicitly does NOT include
        external greenthreads waiting on wait(), waitall(), wait_each().
        """
        # n.b. if Event would provide a count of its waiters, we could say
        # something about external greenthreads as well.
        # The logic to determine this count is exactly the same as the general
        # waiting_for() call.
        return len(self.waiting_for())

    # Use _MISSING instead of None as the default 'key' param so we can permit
    # None as a supported key.
    def waiting_for(self, key=_MISSING):
        """
        waiting_for(key) returns a set() of the keys for which the greenthread
        spawned with that key is still waiting. If you pass a key for which no
        greenthread was spawned, waiting_for() raises KeyError.

        waiting_for() without argument returns a dict. Its keys are the keys
        of greenthreads still waiting on one or more values. In the returned
        dict, the value of each such key is the set of other keys for which
        that greenthread is still waiting.

        This method allows diagnosing a 'hung' DAGPool. If certain
        greenthreads are making no progress, it's possible that they are
        waiting on keys for which there is no greenthread and no post() data.
        """
        # We may have greenthreads whose 'pending' entry indicates they're
        # waiting on some keys even though values have now been posted for
        # some or all of those keys, because those greenthreads have not yet
        # regained control since values were posted. So make a point of
        # excluding values that are now available.
        available = set(six.iterkeys(self.values))

        if key is not _MISSING:
            # waiting_for(key) is semantically different than waiting_for().
            # It's just that they both seem to want the same method name.
            coro = self.coros.get(key, _MISSING)
            if coro is _MISSING:
                # Hmm, no running greenthread with this key. But was there
                # EVER a greenthread with this key? If not, let KeyError
                # propagate.
                self.values[key]
                # Oh good, there's a value for this key. Either the
                # greenthread finished, or somebody posted a value. Just say
                # the greenthread isn't waiting for anything.
                return set()
            else:
                # coro is the _Coro for the running greenthread with the
                # specified key.
                return coro.pending - available

        # This is a waiting_for() call, i.e. a general query rather than for a
        # specific key.

        # Start by iterating over (key, coro) pairs in self.coros. Generate
        # (key, pending) pairs in which 'pending' is the set of keys on which
        # the greenthread believes it's waiting, minus the set of keys that
        # are now available. Filter out any pair in which 'pending' is empty,
        # that is, that greenthread will be unblocked next time it resumes.
        # Make a dict from those pairs.
        return dict((key, pending)
                    for key, pending in ((key, (coro.pending - available))
                                         for key, coro in six.iteritems(self.coros))
                    if pending)
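The PropagateError chaining described in the DAGPool class docstring can be traced end to end with a short sketch. This is not part of the commit; the keys and the boom()/passthrough() helpers are invented for illustration:

    # Illustrative sketch: following the PropagateError chain when an
    # upstream greenthread fails.
    from eventlet.dagpool import DAGPool, PropagateError

    def boom(key, results):
        raise RuntimeError("upstream failure")

    def passthrough(key, results):
        # consuming 'results' re-raises the upstream PropagateError, which
        # _wrapper() then wraps in a PropagateError for this key
        return dict(results)

    pool = DAGPool()
    pool.spawn("a", (), boom)
    pool.spawn("b", "a", passthrough)
    try:
        pool["b"]                   # blocks, then raises
    except PropagateError as err:
        # err.key == "b"; err.exc is the PropagateError for "a", whose
        # .exc attribute is the original RuntimeError
        while isinstance(err, PropagateError):
            print(err.key)          # prints "b", then "a"
            err = err.exc

Each wrapping step records one hop through the DAG, so the chain of .exc attributes reconstructs the path the failure took downstream.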
tests/dagpool_test.py (new file, 693 lines)
@@ -0,0 +1,693 @@
"""\
@file dagpool_test.py
@author Nat Goodspeed
@date 2016-08-26
@brief Test DAGPool class
"""

from nose.tools import *
import eventlet
from eventlet.dagpool import DAGPool, Collision, PropagateError
from eventlet.support import six
from contextlib import contextmanager
import itertools


# Not all versions of nose.tools.assert_raises() support the usage in this
# module, but it's straightforward enough to code that explicitly.
@contextmanager
def assert_raises(exc):
    """exc is an exception class"""
    try:
        yield
    except exc:
        pass
    else:
        raise AssertionError("failed to raise expected exception {0}"
                             .format(exc.__class__.__name__))


def assert_in(sought, container):
    assert sought in container, "{0} not in {1}".format(sought, container)


# ****************************************************************************
# Verify that a given operation returns without suspending
# ****************************************************************************
# module-scope counter allows us to verify when the main greenthread running
# the test does or does not suspend
counter = None


def incrementer():
    """
    This function runs as a background greenthread. Every time it regains
    control, it increments 'counter' and relinquishes control again. The point
    is that by testing 'counter' before and after a particular operation, a
    test can determine whether other greenthreads were allowed to run during
    that operation -- in other words, whether that operation suspended.
    """
    global counter
    # suspend_checker() initializes counter to 0, so the first time we get
    # control, set it to 1
    for counter in itertools.count(1):
        eventlet.sleep(0)


@contextmanager
def suspend_checker():
    """
    This context manager enables check_no_suspend() support. It runs the
    incrementer() function as a background greenthread, then kills it off when
    you exit the block.
    """
    global counter
    # make counter not None to enable check_no_suspend()
    counter = 0
    coro = eventlet.spawn(incrementer)
    yield
    coro.kill()
    # set counter back to None to disable check_no_suspend()
    counter = None


@contextmanager
def check_no_suspend():
    """
    Within a 'with suspend_checker()' block, use 'with check_no_suspend()' to
    verify that a particular operation does not suspend the calling
    greenthread. If it does suspend, incrementer() will have regained control
    and incremented the global 'counter'.
    """
    global counter
    # It would be an easy mistake to use check_no_suspend() outside of a
    # suspend_checker() block. Without the incrementer() greenthread running,
    # 'counter' will never be incremented, therefore check_no_suspend() will
    # always be satisfied, possibly masking bugs.
    assert counter is not None, "Use 'with suspend_checker():' to enable check_no_suspend()"
    current = counter
    yield
    assert counter == current, "Operation suspended {0} times".format(counter - current)


def test_check_no_suspend():
    with assert_raises(AssertionError):
        # We WANT this to raise AssertionError because it's outside of a
        # suspend_checker() block -- that is, we have no incrementer()
        # greenthread.
        with check_no_suspend():
            pass

    # Here we use check_no_suspend() the right way, inside 'with
    # suspend_checker()'. Does it really do what we claim it should?
    with suspend_checker():
        with assert_raises(AssertionError):
            with check_no_suspend():
                # suspend, so we know if check_no_suspend() asserts
                eventlet.sleep(0)


# ****************************************************************************
# Verify that the expected things happened in the expected order
# ****************************************************************************
class Capture(object):
    """
    This class is intended to capture a sequence (of string messages) to
    verify that all expected events occurred, and in the expected order. The
    tricky part is that certain subsequences can occur in arbitrary order and
    still be correct.

    Specifically, when posting a particular value to a DAGPool instance
    unblocks several waiting greenthreads, it is indeterminate which
    greenthread will first receive the new value.

    Similarly, when several values for which a particular greenthread is
    waiting become available at (effectively) the same time, it is
    indeterminate in which order they will be delivered.

    This is addressed by building a list of sets. Each set contains messages
    that can occur in indeterminate order, therefore comparing that set to any
    other ordering of the same messages should succeed. However, it's
    important that each set of messages that occur 'at the same time' should
    itself be properly sequenced with respect to all other such sets.
    """
    def __init__(self):
        self.sequence = [set()]

    def add(self, message):
        self.sequence[-1].add(message)

    def step(self):
        self.sequence.append(set())

    def validate(self, sequence):
        # Let caller pass any sequence of grouped items. For comparison
        # purposes, turn them into the specific form we store: a list of sets.
        setlist = []
        for subseq in sequence:
            if isinstance(subseq, six.string_types):
                # If this item is a plain string (which Python regards as an
                # iterable of characters) rather than a list or tuple or set
                # of strings, treat it as atomic. Make a set containing only
                # that string.
                setlist.append(set([subseq]))
            else:
                try:
                    iter(subseq)
                except TypeError:
                    # subseq is a scalar of some other kind. Make a set
                    # containing only that item.
                    setlist.append(set([subseq]))
                else:
                    # subseq is, as we expect, an iterable -- possibly already
                    # a set. Make a set containing its elements.
                    setlist.append(set(subseq))
        # Now that we've massaged 'sequence' into 'setlist', compare.
        assert_equal(self.sequence, setlist)


# ****************************************************************************
# Canonical DAGPool greenthread function
# ****************************************************************************
def observe(key, results, capture, event):
    for k, v in results:
        capture.add("{0} got {1}".format(key, k))
    result = event.wait()
    capture.add("{0} returning {1}".format(key, result))
    return result


# ****************************************************************************
# DAGPool test functions
# ****************************************************************************
def test_init():
    with suspend_checker():
        # no preload data, just so we know it doesn't blow up
        pool = DAGPool()

        # preload dict
        pool = DAGPool(dict(a=1, b=2, c=3))
        # this must not hang
        with check_no_suspend():
            results = pool.waitall()
        # with no spawn() or post(), waitall() returns preload data
        assert_equals(results, dict(a=1, b=2, c=3))

        # preload sequence of pairs
        pool = DAGPool([("d", 4), ("e", 5), ("f", 6)])
        # this must not hang
        with check_no_suspend():
            results = pool.waitall()
        assert_equals(results, dict(d=4, e=5, f=6))


def test_wait_each_empty():
    pool = DAGPool()
    with suspend_checker():
        with check_no_suspend():
            for k, v in pool.wait_each(()):
                # shouldn't yield anything
                raise AssertionError("empty wait_each() returned ({0}, {1})".format(k, v))


def test_wait_each_preload():
    pool = DAGPool(dict(a=1, b=2, c=3))
    with suspend_checker():
        with check_no_suspend():
            # wait_each() may deliver in arbitrary order; collect into a dict
            # for comparison
            assert_equals(dict(pool.wait_each("abc")), dict(a=1, b=2, c=3))

            # while we're at it, test wait() for preloaded keys
            assert_equals(pool.wait("bc"), dict(b=2, c=3))


def post_each(pool, capture):
    # distinguish the results wait_each() can retrieve immediately from those
    # it must wait for us to post()
    eventlet.sleep(0)
    capture.step()
    pool.post('g', 'gval')
    pool.post('f', 'fval')
    eventlet.sleep(0)
    capture.step()
    pool.post('e', 'eval')
    pool.post('d', 'dval')


def test_wait_each_posted():
    capture = Capture()
    pool = DAGPool(dict(a=1, b=2, c=3))
    eventlet.spawn(post_each, pool, capture)
    # use a string as a convenient iterable of single-letter keys
    for k, v in pool.wait_each("bcdefg"):
        capture.add("got ({0}, {1})".format(k, v))

    capture.validate([
        ["got (b, 2)", "got (c, 3)"],
        ["got (f, fval)", "got (g, gval)"],
        ["got (d, dval)", "got (e, eval)"],
    ])


def test_wait_posted():
    # same as test_wait_each_posted(), but calling wait()
    capture = Capture()
    pool = DAGPool(dict(a=1, b=2, c=3))
    eventlet.spawn(post_each, pool, capture)
    gotten = pool.wait("bcdefg")
    capture.add("got all")
    assert_equals(gotten,
                  dict(b=2, c=3,
                       d="dval", e="eval",
                       f="fval", g="gval"))
    capture.validate([
        [],
        [],
        ["got all"],
    ])


def test_spawn_collision_preload():
    pool = DAGPool([("a", 1)])
    with assert_raises(Collision):
        pool.spawn("a", (), lambda key, results: None)


def test_spawn_collision_post():
    pool = DAGPool()
    pool.post("a", "aval")
    with assert_raises(Collision):
        pool.spawn("a", (), lambda key, results: None)


def test_spawn_collision_spawn():
    pool = DAGPool()
    pool.spawn("a", (), lambda key, results: "aval")
    # hasn't yet even started
    assert_equals(pool.get("a"), None)
    with assert_raises(Collision):
        # Attempting to spawn again with same key should collide even if the
        # first spawned greenthread hasn't yet had a chance to run.
        pool.spawn("a", (), lambda key, results: "bad")
    # now let the spawned eventlet run
    eventlet.sleep(0)
    # should have finished
    assert_equals(pool.get("a"), "aval")
    with assert_raises(Collision):
        # Attempting to spawn with same key collides even when the greenthread
        # has completed.
        pool.spawn("a", (), lambda key, results: "badagain")


def spin():
    # Let all pending greenthreads run until they're blocked
    for x in range(10):
        eventlet.sleep(0)


def test_spawn_multiple():
    capture = Capture()
    pool = DAGPool(dict(a=1, b=2, c=3))
    events = {}
    for k in "defg":
        events[k] = eventlet.event.Event()
        pool.spawn(k, (), observe, capture, events[k])
    # Now for a greenthread that depends on ALL the above.
    events["h"] = eventlet.event.Event()
    # trigger the last event right away: we only care about dependencies
    events["h"].send("hval")
    pool.spawn("h", "bcdefg", observe, capture, events["h"])

    # let all the spawned greenthreads get as far as they can
    spin()
    capture.step()
    # but none of them has yet produced a result
    for k in "defgh":
        assert_equals(pool.get(k), None)
    assert_equals(set(pool.keys()), set("abc"))
    assert_equals(dict(pool.items()), dict(a=1, b=2, c=3))
    assert_equals(pool.running(), 5)
    assert_equals(set(pool.running_keys()), set("defgh"))
    assert_equals(pool.waiting(), 1)
    assert_equals(pool.waiting_for(), dict(h=set("defg")))
    assert_equals(pool.waiting_for("d"), set())
    assert_equals(pool.waiting_for("c"), set())
    with assert_raises(KeyError):
        pool.waiting_for("j")
    assert_equals(pool.waiting_for("h"), set("defg"))

    # let one of the upstream greenthreads complete
    events["f"].send("fval")
    spin()
    capture.step()
    assert_equals(pool.get("f"), "fval")
    assert_equals(set(pool.keys()), set("abcf"))
    assert_equals(dict(pool.items()), dict(a=1, b=2, c=3, f="fval"))
    assert_equals(pool.running(), 4)
    assert_equals(set(pool.running_keys()), set("degh"))
    assert_equals(pool.waiting(), 1)
    assert_equals(pool.waiting_for("h"), set("deg"))

    # now two others
    events["e"].send("eval")
    events["g"].send("gval")
    spin()
    capture.step()
    assert_equals(pool.get("e"), "eval")
    assert_equals(pool.get("g"), "gval")
    assert_equals(set(pool.keys()), set("abcefg"))
    assert_equals(dict(pool.items()),
                  dict(a=1, b=2, c=3, e="eval", f="fval", g="gval"))
    assert_equals(pool.running(), 2)
    assert_equals(set(pool.running_keys()), set("dh"))
    assert_equals(pool.waiting(), 1)
    assert_equals(pool.waiting_for("h"), set("d"))

    # last one
    events["d"].send("dval")
    # make sure both pool greenthreads get a chance to run
    spin()
    capture.step()
    assert_equals(pool.get("d"), "dval")
    assert_equals(set(pool.keys()), set("abcdefgh"))
    assert_equals(dict(pool.items()),
                  dict(a=1, b=2, c=3,
                       d="dval", e="eval", f="fval", g="gval", h="hval"))
    assert_equals(pool.running(), 0)
    assert_false(pool.running_keys())
    assert_equals(pool.waiting(), 0)
    assert_equals(pool.waiting_for("h"), set())

    capture.validate([
        ["h got b", "h got c"],
        ["f returning fval", "h got f"],
        ["e returning eval", "g returning gval",
         "h got e", "h got g"],
        ["d returning dval", "h got d", "h returning hval"],
        [],
    ])


def spawn_many_func(key, results, capture, pool):
    for k, v in results:
        # with a capture.step() at each post(), too complicated to predict
        # which results will be delivered when
        pass
    capture.add("{0} done".format(key))
    # use post(key) instead of waiting for implicit post() of return value
    pool.post(key, key)
    capture.step()
    spin()


def waitall_done(capture, pool):
    pool.waitall()
    capture.add("waitall() done")


def test_spawn_many():
    # This dependencies dict sets up a graph like this:
    #     a
    #    / \
    #   b   c
    #    \ /|
    #     d |
    #      \|
    #       e

    deps = dict(e="cd",
                d="bc",
                c="a",
                b="a",
                a="")

    capture = Capture()
    pool = DAGPool()
    # spawn a waitall() waiter externally to our DAGPool, but capture its
    # message in same Capture instance
    eventlet.spawn(waitall_done, capture, pool)
    pool.spawn_many(deps, spawn_many_func, capture, pool)
    # This set of greenthreads should in fact run to completion once spawned.
    spin()
    # verify that e completed (also that post(key) within greenthread
    # overrides implicit post of return value, which would be None)
    assert_equals(pool.get("e"), "e")

    # With the dependency graph shown above, it is not guaranteed whether b or
    # c will complete first. Handle either case.
    sequence = capture.sequence[:]
    sequence[1:3] = [set([sequence[1].pop(), sequence[2].pop()])]
    assert_equals(sequence,
                  [set(["a done"]),
                   set(["b done", "c done"]),
                   set(["d done"]),
                   set(["e done"]),
                   set(["waitall() done"]),
                   ])


# deliberately distinguish this from dagpool._MISSING
_notthere = object()


def test_wait_each_all():
    # set up a simple linear dependency chain
    deps = dict(b="a", c="b", d="c", e="d")
    capture = Capture()
    pool = DAGPool([("a", "a")])
    # capture a different Event for each key
    events = dict((key, eventlet.event.Event()) for key in six.iterkeys(deps))
    # can't use spawn_many() because we need a different event for each
    for key, dep in six.iteritems(deps):
        pool.spawn(key, dep, observe, capture, events[key])
    keys = "abcde"  # this specific order
    each = iter(pool.wait_each())
    for pos in range(len(keys)):
        # next value from wait_each()
        k, v = next(each)
        assert_equals(k, keys[pos])
        # advance every pool greenlet as far as it can go
        spin()
        # everything from keys[:pos+1] should have a value by now
        for k in keys[:pos + 1]:
            assert pool.get(k, _notthere) is not _notthere, \
                "greenlet {0} did not yet produce a value".format(k)
        # everything from keys[pos+1:] should not yet
        for k in keys[pos + 1:]:
            assert pool.get(k, _notthere) is _notthere, \
                "wait_each() delayed value for {0}".format(keys[pos])
        # let next greenthread complete
        if pos < len(keys) - 1:
            k = keys[pos + 1]
            events[k].send(k)


def test_kill():
    pool = DAGPool()
    # nonexistent key raises KeyError
    with assert_raises(KeyError):
        pool.kill("a")
    # spawn a greenthread
    pool.spawn("a", (), lambda key, result: 1)
    # kill it before it can even run
    pool.kill("a")
    # didn't run
    spin()
    assert_equals(pool.get("a"), None)
    # killing it forgets about it
    with assert_raises(KeyError):
        pool.kill("a")
    # so that we can try again
    pool.spawn("a", (), lambda key, result: 2)
    spin()
    # this time it ran to completion, so can no longer be killed
    with assert_raises(KeyError):
        pool.kill("a")
    # verify it ran to completion
    assert_equals(pool.get("a"), 2)


def test_post_collision_preload():
    pool = DAGPool(dict(a=1))
    with assert_raises(Collision):
        pool.post("a", 2)


def test_post_collision_post():
    pool = DAGPool()
    pool.post("a", 1)
    with assert_raises(Collision):
        pool.post("a", 2)


def test_post_collision_spawn():
    pool = DAGPool()
    pool.spawn("a", (), lambda key, result: 1)
    # hasn't yet run
    with assert_raises(Collision):
        # n.b. This exercises the code that tests whether post(key) is or is
        # not coming from that key's greenthread.
        pool.post("a", 2)
    # kill it
    pool.kill("a")
    # now we can post
    pool.post("a", 3)
    assert_equals(pool.get("a"), 3)

    pool = DAGPool()
    pool.spawn("a", (), lambda key, result: 4)
    # run it
    spin()
    with assert_raises(Collision):
        pool.post("a", 5)
    # can't kill it now either
    with assert_raises(KeyError):
        pool.kill("a")
    # still can't post
    with assert_raises(Collision):
        pool.post("a", 6)


def test_post_replace():
    pool = DAGPool()
    pool.post("a", 1)
    pool.post("a", 2, replace=True)
    assert_equals(pool.get("a"), 2)
    assert_equals(dict(pool.wait_each("a")), dict(a=2))
    assert_equals(pool.wait("a"), dict(a=2))
    assert_equals(pool["a"], 2)


def waitfor(capture, pool, key):
    value = pool[key]
    capture.add("got {0}".format(value))


def test_getitem():
    capture = Capture()
    pool = DAGPool()
    eventlet.spawn(waitfor, capture, pool, "a")
    # pool["a"] just waiting
    capture.validate([[]])
    pool.spawn("a", (), lambda key, results: 1)
    # still waiting: hasn't yet run
    capture.validate([[]])
    # run it
    spin()
    capture.validate([["got 1"]])


class BogusError(Exception):
    pass


def raiser(key, results, exc):
    raise exc


def consumer(key, results):
    for k, v in results:
        pass
    return True


def test_waitall_exc():
    pool = DAGPool()
    pool.spawn("a", (), raiser, BogusError("bogus"))
    try:
        pool.waitall()
    except PropagateError as err:
        assert_equals(err.key, "a")
        assert isinstance(err.exc, BogusError), \
            "exc attribute is {0}, not BogusError".format(err.exc)
        assert_equals(str(err.exc), "bogus")
        msg = str(err)
        assert_in("PropagateError(a)", msg)
        assert_in("BogusError", msg)
        assert_in("bogus", msg)


def test_propagate_exc():
    pool = DAGPool()
    pool.spawn("a", (), raiser, BogusError("bogus"))
    pool.spawn("b", "a", consumer)
    pool.spawn("c", "b", consumer)
    try:
        pool["c"]
    except PropagateError as errc:
        assert_equals(errc.key, "c")
        errb = errc.exc
        assert_equals(errb.key, "b")
        erra = errb.exc
        assert_equals(erra.key, "a")
        assert isinstance(erra.exc, BogusError), \
            "exc attribute is {0}, not BogusError".format(erra.exc)
        assert_equals(str(erra.exc), "bogus")
        msg = str(errc)
        assert_in("PropagateError(a)", msg)
        assert_in("PropagateError(b)", msg)
        assert_in("PropagateError(c)", msg)
        assert_in("BogusError", msg)
        assert_in("bogus", msg)


def test_wait_each_exc():
    pool = DAGPool()
    pool.spawn("a", (), raiser, BogusError("bogus"))
    with assert_raises(PropagateError):
        for k, v in pool.wait_each("a"):
            pass

    with assert_raises(PropagateError):
        for k, v in pool.wait_each():
            pass


def test_post_get_exc():
    pool = DAGPool()
    bogua = BogusError("bogua")
    pool.post("a", bogua)
    assert isinstance(pool.get("a"), BogusError), \
        "should have delivered BogusError instead of raising"
    bogub = PropagateError("b", BogusError("bogub"))
    pool.post("b", bogub)
    with assert_raises(PropagateError):
        pool.get("b")

    # Notice that although we have both "a" and "b" keys, items() is
    # guaranteed to raise PropagateError because one of them is
    # PropagateError. Other values don't matter.
    with assert_raises(PropagateError):
        pool.items()

    # Similar remarks about waitall() and wait().
    with assert_raises(PropagateError):
        pool.waitall()
    with assert_raises(PropagateError):
        pool.wait()
    with assert_raises(PropagateError):
        pool.wait("b")
    with assert_raises(PropagateError):
        pool.wait("ab")
    # but if we're only wait()ing for success results, no exception
    assert isinstance(pool.wait("a")["a"], BogusError), \
        "should have delivered BogusError instead of raising"

    # wait_each() is guaranteed to eventually raise PropagateError, though you
    # may obtain valid values before you hit it.
    with assert_raises(PropagateError):
        for k, v in pool.wait_each():
            pass

    # wait_each_success() filters
    assert_equals(dict(pool.wait_each_success()), dict(a=bogua))
    assert_equals(dict(pool.wait_each_success("ab")), dict(a=bogua))
    assert_equals(dict(pool.wait_each_success("a")), dict(a=bogua))
    assert_equals(dict(pool.wait_each_success("b")), {})

    # wait_each_exception() filters the other way
    assert_equals(dict(pool.wait_each_exception()), dict(b=bogub))
    assert_equals(dict(pool.wait_each_exception("ab")), dict(b=bogub))
    assert_equals(dict(pool.wait_each_exception("a")), {})
    assert_equals(dict(pool.wait_each_exception("b")), dict(b=bogub))
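The tests above exercise post() and waiting_for() separately; combining them shows the external-producer pattern the DAGPool docstring mentions. This closing sketch is not part of the commit, and the keys and summarize() helper are invented for illustration:

    # Illustrative sketch: an external caller feeding a DAGPool via post(),
    # with waiting_for() used to diagnose what a stalled greenthread needs.
    import eventlet
    from eventlet.dagpool import DAGPool

    def summarize(key, results):
        return sum(v for _, v in results)

    pool = DAGPool()
    pool.spawn("total", ("x", "y"), summarize)
    eventlet.sleep(0)                  # let 'total' start and block
    print(pool.waiting_for("total"))   # the set of keys still needed: x and y
    pool.post("x", 1)                  # an external caller supplies values
    pool.post("y", 2)
    print(pool.wait())                 # {'x': 1, 'y': 2, 'total': 3}

Because no greenthread owns "x" or "y", post() is the only way those keys acquire values; waiting_for() makes that gap visible, which is exactly the hung-pool diagnosis described in the waiting_for() docstring.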