dagpool: improved documentation

committed by Sergey Shepelev

parent 9a13405598
commit 7343f2e817
@@ -1,6 +1,493 @@
:mod:`dagpool` -- Dependency-Driven Greenthreads
================================================

Rationale
*********

The dagpool module provides the :class:`DAGPool <eventlet.dagpool.DAGPool>`
class, which addresses situations in which the value produced by one
greenthread might be consumed by several others -- while at the same time a
consuming greenthread might depend on the output from several different
greenthreads.

If you have a tree with strict many-to-one dependencies -- each producer
greenthread provides results to exactly one consumer, though a given consumer
may depend on multiple producers -- that could be addressed by recursively
constructing a :class:`GreenPool <eventlet.greenpool.GreenPool>` of producers
for each consumer, then :meth:`waiting <eventlet.greenpool.GreenPool.waitall>`
for all producers.
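
For instance, a rough sketch of that recursive many-to-one approach, assuming
each producer is a plain callable returning its result (the ``run_consumer``
helper here is hypothetical, not part of eventlet):

::

    import eventlet

    def run_consumer(consumer, producers):
        # spawn every producer this consumer needs, wait for all of them,
        # then hand their collected results to the consumer
        pool = eventlet.GreenPool()
        threads = [pool.spawn(p) for p in producers]
        pool.waitall()
        return consumer([t.wait() for t in threads])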

If you have a tree with strict one-to-many dependencies -- each consumer
greenthread depends on exactly one producer, though a given producer may
provide results to multiple consumers -- that could be addressed by causing
each producer to finish by launching a :class:`GreenPool
<eventlet.greenpool.GreenPool>` of consumers.

But when you have many-to-many dependencies, a tree doesn't suffice. This is
known as a
`Directed Acyclic Graph <https://en.wikipedia.org/wiki/Directed_acyclic_graph>`_,
or DAG.

You might consider sorting the greenthreads into dependency order
(`topological sort <https://en.wikipedia.org/wiki/Topological_sorting>`_) and
launching them in a GreenPool. But the concurrency of the GreenPool must be
strictly constrained to ensure that no greenthread is launched before all its
upstream producers have completed -- and the appropriate pool size is
data-dependent. Only a pool of size 1 (serializing all the greenthreads)
guarantees that a topological sort will produce correct results.

Even if you do serialize all the greenthreads, how do you pass results from
each producer to all its consumers, which might start at very different points
in time?

One answer is to associate each greenthread with a distinct key, and store its
result in a common dict. Then each consumer greenthread can identify its
direct upstream producers by their keys, and find their results in that dict.

This is the essence of DAGPool.

A DAGPool instance owns a dict, and stores greenthread results in that dict.
You :meth:`spawn <eventlet.dagpool.DAGPool.spawn>` *all* greenthreads in the
DAG, specifying for each its own key -- the key with which its result will be
stored on completion -- plus the keys of the upstream producer greenthreads on
whose results it directly depends.

Keys need only be unique within the DAGPool instance; they need not be UUIDs.
A key can be any type that can be used as a dict key. String keys make it
easier to reason about a DAGPool's behavior, but are by no means required.
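
For instance, once you have a DAGPool instance (see the tutorial below), a
composite tuple key works just as well as a string (a sketch; the keys here
are purely illustrative):

::

    pool.spawn(("program", "d"), (("lib", "b"), ("lib", "c")), builder)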

The DAGPool passes to each greenthread an iterable of (key, value) pairs.
The key in each pair is the key of one of the greenthread's specified upstream
producers; the value is the value returned by that producer greenthread. Pairs
are delivered in the order results become available; the consuming greenthread
blocks until the next result can be delivered.

Tutorial
********

Example
-------

Consider a couple of programs in some compiled language that depend on a set
of precompiled libraries. Suppose every such build requires as input the
specific set of library builds on which it directly depends.

::

    a    zlib
    |   /   |
    |  /    |
    b       c
    |      /|
    |     / |
    |    /  |
    |   /   |
    d       e

We can't run the build for program d until we have the build results for both
b and c. We can't run the build for library b until we have build results for
a and zlib. We can, however, immediately run the builds for a and zlib.

So we can use a DAGPool instance to spawn greenthreads running a function such
as this:

::

    def builder(key, upstream):
        for libname, product in upstream:
            # ... configure build for 'key' to use 'product' for 'libname'
            pass
        # all upstream builds have completed
        # ... run build for 'key'
        return build_product_for_key

:meth:`spawn <eventlet.dagpool.DAGPool.spawn>` all these greenthreads:

::

    pool = DAGPool()
    # the upstream producer keys passed to spawn() can be from any iterable,
    # including a generator
    pool.spawn("d", ("b", "c"), builder)
    pool.spawn("e", ["c"], builder)
    pool.spawn("b", ("a", "zlib"), builder)
    pool.spawn("c", ["zlib"], builder)
    pool.spawn("a", (), builder)

As with :func:`eventlet.spawn() <eventlet.spawn>`, if you need to pass special
build flags to some set of builds, these can be passed as either positional or
keyword arguments:

::

    def builder(key, upstream, cflags="", linkflags=""):
        ...

    pool.spawn("d", ("b", "c"), builder, "-O2")
    pool.spawn("e", ["c"], builder, linkflags="-pie")

However, if the arguments to each builder() call are uniform (as in the
original example), you could alternatively build a dict of the dependencies
and call :meth:`spawn_many() <eventlet.dagpool.DAGPool.spawn_many>`:

::

    deps = dict(d=("b", "c"),
                e=["c"],
                b=("a", "zlib"),
                c=["zlib"],
                a=())
    pool.spawn_many(deps, builder)

From outside the DAGPool, you can obtain the results for d and e (or in fact
for any of the build greenthreads) in any of several ways.

:meth:`pool.waitall() <eventlet.dagpool.DAGPool.waitall>` waits until the last
of the spawned greenthreads has completed, and returns a dict containing
results for *all* of them:

::

    final = pool.waitall()
    print("for d: {0}".format(final["d"]))
    print("for e: {0}".format(final["e"]))

waitall() is an alias for :meth:`wait() <eventlet.dagpool.DAGPool.wait>` with
no arguments:

::

    final = pool.wait()
    print("for d: {0}".format(final["d"]))
    print("for e: {0}".format(final["e"]))

Or you can specifically wait for only the final programs:

::

    final = pool.wait(["d", "e"])

The returned dict will contain only the specified keys. The keys may be passed
into wait() from any iterable, including a generator.
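
For instance, this is equivalent to the previous call (a sketch using a
generator expression):

::

    final = pool.wait(key for key in ("d", "e"))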

You can wait for any specified set of greenthreads; they need not be
topologically last:

::

    # returns as soon as both a and zlib have returned results, regardless of
    # what else is still running
    leaves = pool.wait(["a", "zlib"])

Suppose you want to wait specifically for just *one* of the final programs:

::

    final = pool.wait(["d"])
    dprog = final["d"]

The above wait() call will return as soon as greenthread d returns a result --
regardless of whether greenthread e has finished.

:meth:`__getitem__() <eventlet.dagpool.DAGPool.__getitem__>` is shorthand for
obtaining a single result:

::

    # waits until greenthread d returns its result
    dprog = pool["d"]

In contrast, :meth:`get() <eventlet.dagpool.DAGPool.get>` returns immediately,
whether or not a result is ready:

::

    # returns immediately
    if pool.get("d") is None:
        ...

Of course, your greenthread might not include an explicit return statement and
hence might implicitly return None. You might have to test some other value.

::

    # returns immediately
    if pool.get("d", "notdone") == "notdone":
        ...

Suppose you want to process each of the final programs in some way (upload
it?), but you don't want to have to wait until they've both finished. You
don't have to poll get() calls -- use :meth:`wait_each()
<eventlet.dagpool.DAGPool.wait_each>`:

::

    for key, result in pool.wait_each(["d", "e"]):
        # key will be d or e, in completion order
        # process result...

As with :meth:`wait() <eventlet.dagpool.DAGPool.wait>`, if you omit the
argument to wait_each(), it delivers results for all the greenthreads of which
it's aware:

::

    for key, result in pool.wait_each():
        # key will be a, zlib, b, c, d, e, in whatever order each completes
        # process its result...

Introspection
-------------

Let's say you have set up a :class:`DAGPool <eventlet.dagpool.DAGPool>` with
the dependencies shown above. To your consternation, your :meth:`waitall()
<eventlet.dagpool.DAGPool.waitall>` call does not return! The DAGPool instance
is stuck!

You could change waitall() to :meth:`wait_each()
<eventlet.dagpool.DAGPool.wait_each>`, and print each key as it becomes
available:

::

    for key, result in pool.wait_each():
        print("got result for {0}".format(key))
        # ... process ...

Once the build for a has completed, this produces:

::

    got result for a

and then stops. Hmm!

You can check the number of :meth:`running <eventlet.dagpool.DAGPool.running>`
greenthreads:

::

    >>> print(pool.running())
    4

and the number of :meth:`waiting <eventlet.dagpool.DAGPool.waiting>`
greenthreads:

::

    >>> print(pool.waiting())
    4

It's often more informative to ask *which* greenthreads are :meth:`still
running <eventlet.dagpool.DAGPool.running_keys>`:

::

    >>> print(pool.running_keys())
    ('c', 'b', 'e', 'd')

but in this case, we already know a has completed.

We can ask for all available results:

::

    >>> print(pool.keys())
    ('a',)
    >>> print(pool.items())
    (('a', result_from_a),)

The :meth:`keys() <eventlet.dagpool.DAGPool.keys>` and :meth:`items()
<eventlet.dagpool.DAGPool.items>` methods only return keys and items for
which results are actually available, reflecting the underlying dict.

But what's blocking the works? What are we :meth:`waiting for
<eventlet.dagpool.DAGPool.waiting_for>`?

::

    >>> print(pool.waiting_for("d"))
    set(['c', 'b'])

(waiting_for()'s optional argument is a *single* key.)

That doesn't help much yet...

::

    >>> print(pool.waiting_for("b"))
    set(['zlib'])
    >>> print(pool.waiting_for("zlib"))
    KeyError: 'zlib'

Aha! We forgot to even include the zlib build when we were originally
configuring this DAGPool!

(For non-interactive use, it would be more informative to omit waiting_for()'s
argument. This usage returns a dict indicating, for each greenthread key,
which other keys it's waiting for.)

::

    from pprint import pprint
    pprint(pool.waiting_for())

    {'b': set(['zlib']), 'c': set(['zlib']), 'd': set(['b', 'c']), 'e': set(['c'])}

In this case, a reasonable fix would be to spawn the zlib greenthread:

::

    pool.spawn("zlib", (), builder)

Even if this is the last method call on this DAGPool instance, it should
unblock all the rest of the DAGPool greenthreads.
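
So a subsequent wait should now run to completion, for instance:

::

    final = pool.waitall()
    # final now contains entries for a, zlib, b, c, d and e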

Posting
-------

If we happen to have zlib build results in hand already, though, we could
simply :meth:`post() <eventlet.dagpool.DAGPool.post>` that result instead of
rebuilding the library:

::

    pool.post("zlib", result_from_zlib)

This, too, should unblock the rest of the DAGPool greenthreads.

Preloading
----------

If rebuilding takes nontrivial realtime, it might be useful to record partial
results, so that in case of interruption you can restart from where you left
off rather than having to rebuild everything prior to that point.

You could iteratively :meth:`post() <eventlet.dagpool.DAGPool.post>` those
prior results into a new DAGPool instance; alternatively you can
:meth:`preload <eventlet.dagpool.DAGPool.__init__>` the :class:`DAGPool
<eventlet.dagpool.DAGPool>` from an existing dict:

::

    pool = DAGPool(dict(a=result_from_a, zlib=result_from_zlib))

Any DAGPool greenthreads that depend on either a or zlib can immediately
consume those results.

It also works to construct DAGPool with an iterable of (key, result) pairs.
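
For instance (a sketch equivalent to the dict form above):

::

    pool = DAGPool([("a", result_from_a), ("zlib", result_from_zlib)])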

Exception Propagation
---------------------

But what if the zlib build fails -- that is, suppose the zlib greenthread
terminates with an exception? In that case none of b, c, d or e can proceed!
Nor do we want to wait forever for them.

::

    dprog = pool["d"]
    eventlet.dagpool.PropagateError: PropagateError(d): PropagateError: PropagateError(c): PropagateError: PropagateError(zlib): OriginalError

DAGPool provides a :class:`PropagateError <eventlet.dagpool.PropagateError>`
exception specifically to wrap such failures. If a DAGPool greenthread
terminates with an Exception subclass, the DAGPool wraps that exception in a
PropagateError instance whose *key* attribute is the key of the failing
greenthread and whose *exc* attribute is the exception that terminated it.
This PropagateError is stored as the result from that greenthread.

Attempting to consume the result from a greenthread for which a PropagateError
was stored raises that PropagateError.

::

    pool["zlib"]
    eventlet.dagpool.PropagateError: PropagateError(zlib): OriginalError

Thus, when greenthread c attempts to consume the result from zlib, the
PropagateError for zlib is raised. Unless the builder function for greenthread
c handles that PropagateError exception, that greenthread will itself
terminate. That PropagateError will be wrapped in another PropagateError whose
*key* attribute is c and whose *exc* attribute is the PropagateError for zlib.

Similarly, when greenthread d attempts to consume the result from c, the
PropagateError for c is raised. This in turn is wrapped in a PropagateError
whose *key* is d and whose *exc* is the PropagateError for c.

When someone attempts to consume the result from d, as shown above, the
PropagateError for d is raised.

You can programmatically chase the failure path to determine the original
failure if desired:

::

    orig_err = err
    key = "unknown"
    while isinstance(orig_err, PropagateError):
        key = orig_err.key
        orig_err = orig_err.exc
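
Here ``err`` is a PropagateError you have already caught. Putting it together,
a sketch (``OriginalError`` stands in for whatever exception the zlib build
actually raised):

::

    from eventlet.dagpool import PropagateError

    try:
        dprog = pool["d"]
    except PropagateError as err:
        # walk the chain of wrapped exceptions down to the root cause
        orig_err = err
        key = "unknown"
        while isinstance(orig_err, PropagateError):
            key = orig_err.key
            orig_err = orig_err.exc
        print("original failure was in {0}: {1!r}".format(key, orig_err))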

Scanning for Success / Exceptions
---------------------------------

Exception propagation means that we neither perform useless builds nor wait
for results that will never arrive.

However, it does make it difficult to obtain *partial* results for builds that
*did* succeed.

For that you can call :meth:`wait_each_success()
<eventlet.dagpool.DAGPool.wait_each_success>`:

::

    for key, result in pool.wait_each_success():
        print("{0} succeeded".format(key))
        # ... process result ...

    a succeeded

Another problem is that although five different greenthreads failed in the
example, we only see one chain of failures. You can enumerate the bad news
with :meth:`wait_each_exception() <eventlet.dagpool.DAGPool.wait_each_exception>`:

::

    for key, err in pool.wait_each_exception():
        print("{0} failed with {1}".format(key, err.exc.__class__.__name__))

    c failed with PropagateError
    b failed with PropagateError
    e failed with PropagateError
    d failed with PropagateError
    zlib failed with OriginalError

wait_each_exception() yields each PropagateError wrapper as if it were the
result, rather than raising it as an exception.

Notice that we print :code:`err.exc.__class__.__name__` because
:code:`err.__class__.__name__` is always PropagateError.

Both wait_each_success() and wait_each_exception() can accept an iterable of
keys to report:

::

    for key, result in pool.wait_each_success(["d", "e"]):
        print("{0} succeeded".format(key))

    (no output)

    for key, err in pool.wait_each_exception(["d", "e"]):
        print("{0} failed with {1}".format(key, err.exc.__class__.__name__))

    e failed with PropagateError
    d failed with PropagateError

Both wait_each_success() and wait_each_exception() must wait until the
greenthreads for all specified keys (or all keys) have terminated, one way or
the other, because of course we can't know until then how to categorize each.
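
Since between them these two generators partition the pool's keys into
successes and failures, one way to capture both views is (a sketch):

::

    successes = dict(pool.wait_each_success())
    failures = dict(pool.wait_each_exception())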

Module Contents
===============

.. automodule:: eventlet.dagpool
    :members:

@@ -1,9 +1,7 @@
-"""\
-@file dagpool.py
-@author Nat Goodspeed
-@date 2016-08-08
-@brief Provide DAGPool class
-"""
+# @file dagpool.py
+# @author Nat Goodspeed
+# @date 2016-08-08
+# @brief Provide DAGPool class
 
 from eventlet.event import Event
 from eventlet import greenthread
@@ -32,8 +30,12 @@ class PropagateError(Exception):
     PropagateError.
 
     Attributes:
-    key: the key of the greenthread which raised the exception
-    exc: the exception object raised by the greenthread
+
+    key
+        the key of the greenthread which raised the exception
+
+    exc
+        the exception object raised by the greenthread
     """
     def __init__(self, key, exc):
         self.key = key
@@ -51,9 +53,10 @@ class DAGPool(object):
 
     This is a way to implement general DAG dependencies. A simple dependency
     tree (flowing in either direction) can straightforwardly be implemented
-    using recursion and (e.g.) GreenThread.imap(). What gets complicated is
-    when a given node depends on several other nodes as well as contributing
-    to several other nodes.
+    using recursion and (e.g.)
+    :meth:`GreenThread.imap() <eventlet.greenthread.GreenThread.imap>`.
+    What gets complicated is when a given node depends on several other nodes
+    as well as contributing to several other nodes.
 
     With DAGPool, you concurrently launch all applicable greenthreads; each
     will proceed as soon as it has all required inputs. The DAG is implicit in
@@ -65,34 +68,41 @@ class DAGPool(object):
 
     The greenthread callable must accept (key, results), where:
 
-    key is its own key
-    results is an iterable of (key, value) pairs.
+    key
+        is its own key
+
+    results
+        is an iterable of (key, value) pairs.
 
     A newly-launched DAGPool greenthread is entered immediately, and can
     perform any necessary setup work. At some point it will iterate over the
     (key, value) pairs from the passed 'results' iterable. Doing so blocks the
     greenthread until a value is available for each of the keys specified in
     its initial dependencies iterable. These (key, value) pairs are delivered
-    in chronological order, NOT the order in which they are initially
+    in chronological order, *not* the order in which they are initially
     specified: each value will be delivered as soon as it becomes available.
 
     The value returned by a DAGPool greenthread becomes the value for its
     key, which unblocks any other greenthreads waiting on that key.
 
     If a DAGPool greenthread terminates with an exception instead of returning
-    a value, attempting to retrieve the value raises PropagateError, which
-    binds the key of the original greenthread and the original exception.
-    Unless the greenthread attempting to retrieve the value handles
+    a value, attempting to retrieve the value raises :class:`PropagateError`,
+    which binds the key of the original greenthread and the original
+    exception. Unless the greenthread attempting to retrieve the value handles
     PropagateError, that exception will in turn be wrapped in a PropagateError
     of its own, and so forth. The code that ultimately handles PropagateError
     can follow the chain of PropagateError.exc attributes to discover the flow
     of that exception through the DAG of greenthreads.
 
-    External greenthreads may also interact with a DAGPool. See wait_each(),
-    waitall(), post().
+    External greenthreads may also interact with a DAGPool. See :meth:`wait_each`,
+    :meth:`waitall`, :meth:`post`.
 
     It is not recommended to constrain external DAGPool producer greenthreads
-    in a GreenPool: it may be hard to provably avoid deadlock.
+    in a :class:`GreenPool <eventlet.greenpool.GreenPool>`: it may be hard to
+    provably avoid deadlock.
+
+    .. automethod:: __init__
+    .. automethod:: __getitem__
     """
 
     _Coro = collections.namedtuple("_Coro", ("greenthread", "pending"))
@@ -123,47 +133,47 @@ class DAGPool(object):
     def waitall(self):
         """
         waitall() blocks the calling greenthread until there is a value for
-        every DAGPool greenthread launched by spawn(). It returns a dict
-        containing all preload data, all data from post() and all values
-        returned by spawned greenthreads.
+        every DAGPool greenthread launched by :meth:`spawn`. It returns a dict
+        containing all :class:`preload data <DAGPool>`, all data from
+        :meth:`post` and all values returned by spawned greenthreads.
 
-        See also wait().
+        See also :meth:`wait`.
         """
         # waitall() is an alias for compatibility with GreenPool
         return self.wait()
 
     def wait(self, keys=_MISSING):
         """
-        keys is an optional iterable of keys. If you omit the argument, it
-        waits for all the keys from preload data, from post() calls and from
-        spawn() calls: in other words, all the keys of which this DAGPool is
-        aware.
+        *keys* is an optional iterable of keys. If you omit the argument, it
+        waits for all the keys from :class:`preload data <DAGPool>`, from
+        :meth:`post` calls and from :meth:`spawn` calls: in other words, all
+        the keys of which this DAGPool is aware.
 
         wait() blocks the calling greenthread until all of the relevant keys
         have values. wait() returns a dict whose keys are the relevant keys,
-        and whose values come from the preload data, from values returned by
-        DAGPool greenthreads or from post() calls.
+        and whose values come from the *preload* data, from values returned by
+        DAGPool greenthreads or from :meth:`post` calls.
 
-        If a greenthread terminates with an exception, wait() will raise
-        PropagateError wrapping that exception. If more than one greenthread
-        terminates with an exception, it is indeterminate which one wait()
-        will raise.
+        If a DAGPool greenthread terminates with an exception, wait() will
+        raise :class:`PropagateError` wrapping that exception. If more than
+        one greenthread terminates with an exception, it is indeterminate
+        which one wait() will raise.
 
-        If a greenthread posts a PropagateError instance, wait() will raise
-        that PropagateError. If more than one greenthread posts
-        PropagateError, it is indeterminate which one wait() will raise.
+        If an external greenthread posts a :class:`PropagateError` instance,
+        wait() will raise that PropagateError. If more than one greenthread
+        posts PropagateError, it is indeterminate which one wait() will raise.
 
-        See also wait_each_success(), wait_each_exception().
+        See also :meth:`wait_each_success`, :meth:`wait_each_exception`.
         """
         # This is mostly redundant with wait_each() functionality.
         return dict(self.wait_each(keys))
 
     def wait_each(self, keys=_MISSING):
         """
-        keys is an optional iterable of keys. If you omit the argument, it
-        waits for all the keys from preload data, from post() calls and from
-        spawn() calls: in other words, all the keys of which this DAGPool is
-        aware.
+        *keys* is an optional iterable of keys. If you omit the argument, it
+        waits for all the keys from :class:`preload data <DAGPool>`, from
+        :meth:`post` calls and from :meth:`spawn` calls: in other words, all
+        the keys of which this DAGPool is aware.
 
         wait_each() is a generator producing (key, value) pairs as a value
         becomes available for each requested key. wait_each() blocks the
@@ -177,11 +187,13 @@ class DAGPool(object):
         each of the ready ones in arbitrary order before blocking again.
 
         The DAGPool does not distinguish between a value returned by one of
-        its own greenthreads and one provided by a post() call or preload data.
+        its own greenthreads and one provided by a :meth:`post` call or *preload* data.
 
         The wait_each() generator terminates (raises StopIteration) when all
         specified keys have been delivered. Thus, typical usage might be:
 
+        ::
+
             for key, value in dagpool.wait_each(keys):
                 # process this ready key and value
             # continue processing now that we've gotten values for all keys
@@ -189,10 +201,10 @@ class DAGPool(object):
         By implication, if you pass wait_each() an empty iterable of keys, it
         returns immediately without yielding anything.
 
-        If the value to be delivered is a PropagateError exception object, the
+        If the value to be delivered is a :class:`PropagateError` exception object, the
         generator raises that PropagateError instead of yielding it.
 
-        See also wait_each_success(), wait_each_exception().
+        See also :meth:`wait_each_success`, :meth:`wait_each_exception`.
         """
         # Build a local set() and then call _wait_each().
         return self._wait_each(self._get_keyset_for_wait_each(keys))
@@ -200,12 +212,12 @@ class DAGPool(object):
     def wait_each_success(self, keys=_MISSING):
         """
         wait_each_success() filters results so that only success values are
-        yielded. In other words, unlike wait_each(), wait_each_success() will
-        not raise PropagateError. Not every provided (or defaulted) key will
-        necessarily be represented, though naturally the generator must wait
-        until all have completed.
+        yielded. In other words, unlike :meth:`wait_each`, wait_each_success()
+        will not raise :class:`PropagateError`. Not every provided (or
+        defaulted) key will necessarily be represented, though naturally the
+        generator will not finish until all have completed.
 
-        In all other respects, wait_each_success() behaves like wait_each().
+        In all other respects, wait_each_success() behaves like :meth:`wait_each`.
         """
         for key, value in self._wait_each_raw(self._get_keyset_for_wait_each(keys)):
             if not isinstance(value, PropagateError):
@@ -215,13 +227,13 @@ class DAGPool(object):
         """
         wait_each_exception() filters results so that only exceptions are
         yielded. Not every provided (or defaulted) key will necessarily be
-        represented, though naturally the generator must wait until all have
-        completed.
+        represented, though naturally the generator will not finish until
+        all have completed.
 
         Unlike other DAGPool methods, wait_each_exception() simply yields
-        PropagateError instances as values rather than raising them.
+        :class:`PropagateError` instances as values rather than raising them.
 
-        In all other respects, wait_each_exception() behaves like wait_each().
+        In all other respects, wait_each_exception() behaves like :meth:`wait_each`.
         """
         for key, value in self._wait_each_raw(self._get_keyset_for_wait_each(keys)):
             if isinstance(value, PropagateError):
@@ -289,31 +301,32 @@ class DAGPool(object):
 
     def spawn(self, key, depends, function, *args, **kwds):
         """
-        Launch the passed function(key, results, ...) as a greenthread,
+        Launch the passed *function(key, results, ...)* as a greenthread,
         passing it:
 
-        - the specified 'key'
+        - the specified *key*
         - an iterable of (key, value) pairs
         - whatever other positional args or keywords you specify.
 
-        Iterating over the 'results' iterable behaves like calling
-        wait_each(depends).
+        Iterating over the *results* iterable behaves like calling
+        :meth:`wait_each(depends) <DAGPool.wait_each>`.
 
-        Returning from function() behaves like post(key, return_value).
+        Returning from *function()* behaves like
+        :meth:`post(key, return_value) <DAGPool.post>`.
 
-        If function() terminates with an exception, that exception is wrapped
-        in PropagateError with the greenthread's key and (effectively) posted
+        If *function()* terminates with an exception, that exception is wrapped
+        in :class:`PropagateError` with the greenthread's *key* and (effectively) posted
         as the value for that key. Attempting to retrieve that value will
         raise that PropagateError.
 
         Thus, if the greenthread with key 'a' terminates with an exception,
         and greenthread 'b' depends on 'a', when greenthread 'b' attempts to
-        iterate through its 'results' argument, it will encounter
+        iterate through its *results* argument, it will encounter
         PropagateError. So by default, an uncaught exception will propagate
         through all the downstream dependencies.
 
-        If you pass spawn() a key already passed to spawn() or post(), spawn()
-        raises Collision.
+        If you pass :meth:`spawn` a key already passed to spawn() or :meth:`post`, spawn()
+        raises :class:`Collision`.
         """
         if key in self.coros or key in self.values:
             raise Collision(key)
@@ -359,12 +372,18 @@ class DAGPool(object):
 
     def spawn_many(self, depends, function, *args, **kwds):
         """
-        spawn_many() accepts a single function whose parameters are the same
-        as for spawn().
+        spawn_many() accepts a single *function* whose parameters are the same
+        as for :meth:`spawn`.
 
-        The difference is that spawn_many() accepts a dependency dict. A new
-        greenthread is spawned for each key in the dict. That dict key's value
-        should be an iterable of other keys on which this greenthread depends.
+        The difference is that spawn_many() accepts a dependency dict
+        *depends*. A new greenthread is spawned for each key in the dict. That
+        dict key's value should be an iterable of other keys on which this
+        greenthread depends.
+
+        If the *depends* dict contains any key already passed to :meth:`spawn`
+        or :meth:`post`, spawn_many() raises :class:`Collision`. It is
+        indeterminate how many of the other keys in *depends* will have
+        successfully spawned greenthreads.
         """
         # Iterate over 'depends' items, relying on self.spawn() not to
         # context-switch so no one can modify 'depends' along the way.
@@ -373,7 +392,7 @@ class DAGPool(object):
 
     def kill(self, key):
         """
-        Kill the greenthread that was spawned with the specified 'key'.
+        Kill the greenthread that was spawned with the specified *key*.
 
         If no such greenthread was spawned, raise KeyError.
         """
@@ -384,25 +403,26 @@ class DAGPool(object):
 
     def post(self, key, value, replace=False):
         """
-        post(key, value) stores the passed value for the passed key. It then
-        causes each greenthread blocked on its results iterable, or on
-        wait_each(keys), to check for new values. A waiting greenthread might
-        not literally resume on every single post() of a relevant key, but the
-        first post() of a relevant key ensures that it will resume eventually,
-        and when it does it will catch up with all relevant post() calls.
+        post(key, value) stores the passed *value* for the passed *key*. It
+        then causes each greenthread blocked on its results iterable, or on
+        :meth:`wait_each(keys) <DAGPool.wait_each>`, to check for new values.
+        A waiting greenthread might not literally resume on every single
+        post() of a relevant key, but the first post() of a relevant key
+        ensures that it will resume eventually, and when it does it will catch
+        up with all relevant post() calls.
 
         Calling post(key, value) when there is a running greenthread with that
-        same 'key' raises Collision. If you must post(key, value) instead of
+        same *key* raises :class:`Collision`. If you must post(key, value) instead of
         letting the greenthread run to completion, you must first call
-        kill(key).
+        :meth:`kill(key) <DAGPool.kill>`.
 
         The DAGPool implicitly post()s the return value from each of its
         greenthreads. But a greenthread may explicitly post() a value for its
         own key, which will cause its return value to be discarded.
 
-        Calling post(key, value, replace=False) (the default 'replace') when a
+        Calling post(key, value, replace=False) (the default *replace*) when a
         value for that key has already been posted, by any means, raises
-        Collision.
+        :class:`Collision`.
 
         Calling post(key, value, replace=True) when a value for that key has
         already been posted, by any means, replaces the previously-stored
@@ -410,13 +430,14 @@ class DAGPool(object):
         behavior of greenthreads waiting on that key.
 
         After a post(key, value1) followed by post(key, value2, replace=True),
-        it is unspecified which pending wait_each([key...]) calls (or
-        greenthreads iterating over 'results' involving that key) will observe
-        value1 versus value2. It is guaranteed that subsequent
-        wait_each([key...]) calls (or greenthreads spawned after that point)
-        will observe value2.
+        it is unspecified which pending :meth:`wait_each([key...]) <DAGPool.wait_each>`
+        calls (or greenthreads iterating over *results* involving that key)
+        will observe *value1* versus *value2*. It is guaranteed that
+        subsequent wait_each([key...]) calls (or greenthreads spawned after
+        that point) will observe *value2*.
 
-        A successful call to post(key, PropagateError(key, ExceptionSubclass))
+        A successful call to
+        post(key, :class:`PropagateError(key, ExceptionSubclass) <PropagateError>`)
         ensures that any subsequent attempt to retrieve that key's value will
         raise that PropagateError instance.
         """
@@ -452,7 +473,7 @@ class DAGPool(object):
 
     def __getitem__(self, key):
         """
-        __getitem__(key) (aka dagpool[key]) blocks until 'key' has a value,
+        __getitem__(key) (aka dagpool[key]) blocks until *key* has a value,
         then delivers that value.
         """
         # This is a degenerate case of wait_each(). Construct a tuple
@@ -463,41 +484,41 @@ class DAGPool(object):
 
     def get(self, key, default=None):
         """
-        get() returns the value for 'key'. If 'key' does not yet have a value,
-        get() returns 'default'.
+        get() returns the value for *key*. If *key* does not yet have a value,
+        get() returns *default*.
         """
         return self._value_or_raise(self.values.get(key, default))
 
     def keys(self):
         """
-        Return a tuple of keys for which we currently have values. Explicitly
-        return a copy rather than an iterator: don't assume our caller will
-        finish iterating before new values are posted.
+        Return a snapshot tuple of keys for which we currently have values.
         """
+        # Explicitly return a copy rather than an iterator: don't assume our
+        # caller will finish iterating before new values are posted.
         return tuple(six.iterkeys(self.values))
 
     def items(self):
         """
         Return a snapshot tuple of currently-available (key, value) pairs.
-        Don't assume our caller will finish iterating before new values are
-        posted.
         """
+        # Don't assume our caller will finish iterating before new values are
+        # posted.
         return tuple((key, self._value_or_raise(value))
                      for key, value in six.iteritems(self.values))
 
     def running(self):
         """
-        Return number of running greenthreads. This includes greenthreads
-        blocked while iterating through their 'results' iterable, that is,
-        greenthreads waiting on values from other keys.
+        Return number of running DAGPool greenthreads. This includes
+        greenthreads blocked while iterating through their *results* iterable,
+        that is, greenthreads waiting on values from other keys.
         """
         return len(self.coros)
 
     def running_keys(self):
         """
-        Return keys for running greenthreads. This includes greenthreads
-        blocked while iterating through their 'results' iterable, that is,
-        greenthreads waiting on values from other keys.
+        Return keys for running DAGPool greenthreads. This includes
+        greenthreads blocked while iterating through their *results* iterable,
+        that is, greenthreads waiting on values from other keys.
         """
         # return snapshot; don't assume caller will finish iterating before we
         # next modify self.coros
@@ -505,9 +526,10 @@ class DAGPool(object):
 
     def waiting(self):
         """
-        Return number of waiting greenthreads, that is, greenthreads still
-        waiting on values from other keys. This explicitly does NOT include
-        external greenthreads waiting on wait(), waitall(), wait_each().
+        Return number of waiting DAGPool greenthreads, that is, greenthreads
+        still waiting on values from other keys. This explicitly does *not*
+        include external greenthreads waiting on :meth:`wait`,
+        :meth:`waitall`, :meth:`wait_each`.
         """
         # n.b. if Event would provide a count of its waiters, we could say
         # something about external greenthreads as well.
@@ -519,18 +541,19 @@ class DAGPool(object):
     # None as a supported key.
     def waiting_for(self, key=_MISSING):
         """
-        waiting_for(key) returns a set() of the keys for which the greenthread
-        spawned with that key is still waiting. If you pass a key for which no
-        greenthread was spawned, waiting_for() raises KeyError.
+        waiting_for(key) returns a set() of the keys for which the DAGPool
+        greenthread spawned with that *key* is still waiting. If you pass a
+        *key* for which no greenthread was spawned, waiting_for() raises
+        KeyError.
 
         waiting_for() without argument returns a dict. Its keys are the keys
-        of greenthreads still waiting on one or more values. In the returned
-        dict, the value of each such key is the set of other keys for which
-        that greenthread is still waiting.
+        of DAGPool greenthreads still waiting on one or more values. In the
+        returned dict, the value of each such key is the set of other keys for
+        which that greenthread is still waiting.
 
-        This method allows diagnosing a 'hung' DAGPool. If certain
+        This method allows diagnosing a "hung" DAGPool. If certain
         greenthreads are making no progress, it's possible that they are
-        waiting on keys for which there is no greenthread and no post() data.
+        waiting on keys for which there is no greenthread and no :meth:`post` data.
         """
         # We may have greenthreads whose 'pending' entry indicates they're
         # waiting on some keys even though values have now been posted for