Merge tag '1.21.0' into debian/liberty

taskflow 1.21.0 release
Ivan Udovichenko
2015-09-29 10:23:55 +03:00
32 changed files with 822 additions and 532 deletions

.gitignore

@@ -22,7 +22,8 @@ lib64
pip-log.txt
# Unit test / coverage reports
.coverage
.coverage*
.diagram-tools/*
.tox
nosetests.xml
.venv

@@ -2,6 +2,7 @@
test_command=OS_STDOUT_CAPTURE=${OS_STDOUT_CAPTURE:-1} \
OS_STDERR_CAPTURE=${OS_STDERR_CAPTURE:-1} \
OS_TEST_TIMEOUT=${OS_TEST_TIMEOUT:-160} \
OS_DEBUG=${OS_DEBUG:-TRACE} \
${PYTHON:-python} -m subunit.run discover -t ./ ./taskflow/tests $LISTOPT $IDOPTION
test_id_option=--load-list $IDFILE

@@ -87,7 +87,7 @@ Rebinding
stored with a name other than the corresponding arguments name. That's when the
``rebind`` constructor parameter comes in handy. Using it the flow author
can instruct the engine to fetch a value from storage by one name, but pass it
to a tasks/retrys ``execute`` method with another name. There are two possible
to a tasks/retries ``execute`` method with another name. There are two possible
ways of accomplishing this.
The first is to pass a dictionary that maps the argument name to the name
@@ -303,7 +303,7 @@ Default provides
++++++++++++++++
As mentioned above, the default base class provides nothing, which means
results are not accessible to other tasks/retrys in the flow.
results are not accessible to other tasks/retries in the flow.
The author can override this and specify default value for provides using
the ``default_provides`` class/instance variable:
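A minimal sketch of that pattern (the class and returned values here are illustrative, not taken from this commit)::

    from taskflow import task

    class BitsAndPiecesTask(task.Task):

        default_provides = ('bits', 'pieces')

        def execute(self):
            # The returned values get saved under the names
            # declared in default_provides above.
            return 'BITs', 'PIECEs'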
@@ -386,7 +386,7 @@ A |Retry| controller works with arguments in the same way as a |Task|. But it
has an additional parameter ``'history'`` that is itself a
:py:class:`~taskflow.retry.History` object that contains what failed over all
the engines attempts (aka the outcomes). The history object can be
viewed as a tuple that contains a result of the previous retrys run and a
viewed as a tuple that contains a result of the previous retries run and a
table/dict where each key is a failed atoms name and each value is
a :py:class:`~taskflow.types.failure.Failure` object.
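For illustration, a retry controller that consumes this history might look
like the following sketch (``ExampleRetry`` and its give-up threshold are
invented for the example)::

    from taskflow import retry

    class ExampleRetry(retry.Retry):

        def on_failure(self, history, *args, **kwargs):
            # Stop retrying once two failures have been recorded.
            if len(history) >= 2:
                return retry.REVERT
            return retry.RETRY

        def execute(self, history, *args, **kwargs):
            # Hand the attempt count to any subsequent tasks.
            return len(history)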
@@ -415,7 +415,7 @@ the following history (printed as a list)::
At this point (since the implementation returned ``RETRY``) the
|retry.execute| method will be called again and it will receive the same
history and it can then return a value that subseqent tasks can use to alter
history and it can then return a value that subsequent tasks can use to alter
their behavior.
If instead the |retry.execute| method itself raises an exception,

@@ -29,6 +29,13 @@ it (they are *nearly* analogous to functions). These task objects all derive
from :py:class:`~taskflow.task.BaseTask` which defines what a task must
provide in terms of properties and methods.
**For example:**
.. image:: img/tasks.png
:width: 525px
:align: left
:alt: Task outline.
Currently the following *provided* types of task subclasses are:
* :py:class:`~taskflow.task.Task`: useful for inheriting from and creating your

@@ -85,7 +85,7 @@ Of course these kind of features can come with some drawbacks:
away from (and this is likely a mindset change for programmers used to the
imperative model). We have worked to make this less of a concern by creating
and encouraging the usage of :doc:`persistence <persistence>`, to help make
it possible to have state and tranfer that state via a argument input and
it possible to have state and transfer that state via a argument input and
output mechanism.
* Depending on how much imperative code exists (and state inside that code)
there *may* be *significant* rework of that code and converting or
@@ -258,9 +258,10 @@ Execution
The graph (and helper objects) previously created are now used for guiding
further execution (see :py:func:`~taskflow.engines.base.Engine.run`). The
flow is put into the ``RUNNING`` :doc:`state <states>` and a
:py:class:`~taskflow.engines.action_engine.runner.Runner` implementation
object starts to take over and begins going through the stages listed
below (for a more visual diagram/representation see
:py:class:`~taskflow.engines.action_engine.builder.MachineBuilder` state
machine object and runner object are built (using the `automaton`_ library).
That machine and associated runner then starts to take over and begins going
through the stages listed below (for a more visual diagram/representation see
the :ref:`engine state diagram <engine states>`).
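Driving the built machine by hand looks roughly like the following sketch
(assuming ``runtime`` is an already-prepared
:py:class:`~taskflow.engines.action_engine.runtime.Runtime` instance; this
mirrors the builder tests updated below)::

    from automaton import runners

    from taskflow.engines.action_engine import builder

    machine, memory = runtime.builder.build()
    machine_runner = runners.FiniteRunner(machine)
    for prior_state, new_state in machine_runner.run_iter(builder.START):
        print(prior_state, new_state)  # observe each transition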
.. note::
@@ -338,8 +339,8 @@ above stages will be restarted and resuming will occur).
Finishing
---------
At this point the
:py:class:`~taskflow.engines.action_engine.runner.Runner` has
At this point the machine (and runner) that was built using the
:py:class:`~taskflow.engines.action_engine.builder.MachineBuilder` class has
now finished successfully, failed, or the execution was suspended. Depending on
which one of these occurs will cause the flow to enter a new state (typically
one of ``FAILURE``, ``SUSPENDED``, ``SUCCESS`` or ``REVERTED``).
@@ -365,9 +366,9 @@ this performs is a transition of the flow state from ``RUNNING`` into a
``SUSPENDING`` state (which will later transition into a ``SUSPENDED`` state).
Since an engine may be remotely executing atoms (or locally executing them)
and there is currently no preemption what occurs is that the engines
:py:class:`~taskflow.engines.action_engine.runner.Runner` state machine will
detect this transition into ``SUSPENDING`` has occurred and the state
machine will avoid scheduling new work (it will though let active work
:py:class:`~taskflow.engines.action_engine.builder.MachineBuilder` state
machine will detect this transition into ``SUSPENDING`` has occurred and the
state machine will avoid scheduling new work (it will though let active work
continue). After the current work has finished the engine will
transition from ``SUSPENDING`` into ``SUSPENDED`` and return from its
:py:func:`~taskflow.engines.base.Engine.run` method.
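For example (a sketch; constructing ``flow`` and picking a sensible delay
are elided), a suspension request can be made while ``run()`` is blocking::

    import threading

    from taskflow import engines

    eng = engines.load(flow, engine='serial')
    # Ask the engine to suspend shortly after it starts running.
    threading.Timer(0.5, eng.suspend).start()
    eng.run()  # returns once SUSPENDED (or a finished state) is reached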
@@ -444,10 +445,10 @@ Components
cycle).
.. automodule:: taskflow.engines.action_engine.analyzer
.. automodule:: taskflow.engines.action_engine.builder
.. automodule:: taskflow.engines.action_engine.compiler
.. automodule:: taskflow.engines.action_engine.completer
.. automodule:: taskflow.engines.action_engine.executor
.. automodule:: taskflow.engines.action_engine.runner
.. automodule:: taskflow.engines.action_engine.runtime
.. automodule:: taskflow.engines.action_engine.scheduler
.. autoclass:: taskflow.engines.action_engine.scopes.ScopeWalker
@@ -462,6 +463,7 @@ Hierarchy
taskflow.engines.worker_based.engine.WorkerBasedActionEngine
:parts: 1
.. _automaton: http://docs.openstack.org/developer/automaton/
.. _multiprocessing: https://docs.python.org/2/library/multiprocessing.html
.. _future: https://docs.python.org/dev/library/concurrent.futures.html#future-objects
.. _executor: https://docs.python.org/dev/library/concurrent.futures.html#concurrent.futures.Executor

doc/source/img/tasks.png (new binary file, 236 KiB)

@@ -215,7 +215,7 @@ Redis
**Board type**: ``'redis'``
Uses `redis`_ to provide the jobboard capabilities and semantics by using
a redis hash datastructure and individual job ownership keys (that can
a redis hash data structure and individual job ownership keys (that can
optionally expire after a given amount of time).
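Creating and posting to such a board might look like this sketch (the
connection details and job name are illustrative)::

    from taskflow.jobs import backends as board_backends

    conf = {
        'board': 'redis',
        'host': '127.0.0.1',
        'port': 6379,
    }
    with board_backends.backend('my-board', conf.copy()) as board:
        board.post('my-first-job', book=None, details={})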
.. note::
@@ -303,5 +303,5 @@ Hierarchy
.. _paradigm shift: https://wiki.openstack.org/wiki/TaskFlow/Paradigm_shifts#Workflow_ownership_transfer
.. _zookeeper: http://zookeeper.apache.org/
.. _kazoo: http://kazoo.readthedocs.org/
.. _stevedore: http://stevedore.readthedocs.org/
.. _stevedore: http://docs.openstack.org/developer/stevedore/
.. _redis: http://redis.io/

@@ -31,7 +31,7 @@ This abstraction serves the following *major* purposes:
vs. stop.
* *Something you create...*
.. _stevedore: http://stevedore.readthedocs.org/
.. _stevedore: http://docs.openstack.org/developer/stevedore/
How it is used
==============

@@ -23,6 +23,11 @@ Eventlet
.. automodule:: taskflow.utils.eventlet_utils
Iterators
~~~~~~~~~
.. automodule:: taskflow.utils.iter_utils
Kazoo
~~~~~

@@ -286,10 +286,10 @@ but not *yet* consumed.
**PENDING** - Worker accepted request and is pending to run using its
executor (threads, processes, or other).
**FAILURE** - Worker failed after running request (due to task exeception) or
**FAILURE** - Worker failed after running request (due to task exception) or
no worker moved/started executing (by placing the request into ``RUNNING``
state) with-in specified time span (this defaults to 60 seconds unless
overriden).
overridden).
**RUNNING** - Workers executor (using threads, processes...) has started to
run requested task (once this state is transitioned to any request timeout no

@@ -3,7 +3,7 @@
# process, which may cause wedges in the gate later.
# See: https://bugs.launchpad.net/pbr/+bug/1384919 for why this is here...
pbr<2.0,>=1.3
pbr<2.0,>=1.6
# Packages needed for using this library.
@@ -20,7 +20,7 @@ futurist>=0.1.2 # Apache-2.0
fasteners>=0.7 # Apache-2.0
# Very nice graph library
networkx>=1.8
networkx>=1.10
# For contextlib new additions/compatibility for <= python 3.3
contextlib2>=0.4.0 # PSF License
@@ -32,16 +32,16 @@ stevedore>=1.5.0 # Apache-2.0
futures>=3.0;python_version=='2.7' or python_version=='2.6'
# Backport for time.monotonic which is in 3.3+
monotonic>=0.1 # Apache-2.0
monotonic>=0.3 # Apache-2.0
# Used for structured input validation
jsonschema!=2.5.0,<3.0.0,>=2.0.0
# For the state machine we run with
automaton>=0.2.0 # Apache-2.0
automaton>=0.5.0 # Apache-2.0
# For common utilities
oslo.utils>=1.9.0 # Apache-2.0
oslo.utils>=2.0.0 # Apache-2.0
oslo.serialization>=1.4.0 # Apache-2.0
# For lru caches and such

@@ -14,8 +14,9 @@
# License for the specific language governing permissions and limitations
# under the License.
import functools
import abc
import itertools
import weakref
from networkx.algorithms import traversal
import six
@@ -23,7 +24,33 @@ import six
from taskflow import states as st
class IgnoreDecider(object):
@six.add_metaclass(abc.ABCMeta)
class Decider(object):
"""Base class for deciders.
Provides an interface (to be implemented by sub-classes) for checking
whether the next atom in a flow should be executed or not.
"""
@abc.abstractmethod
def check(self, runtime):
"""Returns bool of whether this decider should allow running."""
@abc.abstractmethod
def affect(self, runtime):
"""If the :py:func:`~.check` returns false, affects associated atoms.
"""
def check_and_affect(self, runtime):
"""Handles :py:func:`~.check` + :py:func:`~.affect` in right order."""
proceed = self.check(runtime)
if not proceed:
self.affect(runtime)
return proceed
class IgnoreDecider(Decider):
"""Checks any provided edge-deciders and determines if ok to run."""
def __init__(self, atom, edge_deciders):
@@ -51,15 +78,8 @@ class IgnoreDecider(object):
runtime.reset_nodes(itertools.chain([self._atom], successors_iter),
state=st.IGNORE, intention=st.IGNORE)
def check_and_affect(self, runtime):
"""Handles :py:func:`~.check` + :py:func:`~.affect` in right order."""
proceed = self.check(runtime)
if not proceed:
self.affect(runtime)
return proceed
class NoOpDecider(object):
class NoOpDecider(Decider):
"""No-op decider that says it is always ok to run & has no effect(s)."""
def check(self, runtime):
@@ -69,13 +89,6 @@ class NoOpDecider(object):
def affect(self, runtime):
"""Does nothing."""
def check_and_affect(self, runtime):
"""Handles :py:func:`~.check` + :py:func:`~.affect` in right order.
Does nothing.
"""
return self.check(runtime)
class Analyzer(object):
"""Analyzes a compilation and aids in execution processes.
@@ -88,12 +101,9 @@ class Analyzer(object):
"""
def __init__(self, runtime):
self._runtime = weakref.proxy(runtime)
self._storage = runtime.storage
self._execution_graph = runtime.compilation.execution_graph
self._check_atom_transition = runtime.check_atom_transition
self._fetch_edge_deciders = runtime.fetch_edge_deciders
self._fetch_retries = functools.partial(
runtime.fetch_atoms_by_kind, 'retry')
def get_next_nodes(self, node=None):
"""Get next nodes to run (originating from node or all nodes)."""
@@ -161,7 +171,8 @@ class Analyzer(object):
state = self.get_state(atom)
intention = self._storage.get_atom_intention(atom.name)
transition = self._check_atom_transition(atom, state, st.RUNNING)
transition = self._runtime.check_atom_transition(atom, state,
st.RUNNING)
if not transition or intention != st.EXECUTE:
return (False, None)
@@ -177,7 +188,7 @@ class Analyzer(object):
if not ok_to_run:
return (False, None)
else:
edge_deciders = self._fetch_edge_deciders(atom)
edge_deciders = self._runtime.fetch_edge_deciders(atom)
return (True, IgnoreDecider(atom, edge_deciders))
def _get_maybe_ready_for_revert(self, atom):
@@ -185,7 +196,8 @@ class Analyzer(object):
state = self.get_state(atom)
intention = self._storage.get_atom_intention(atom.name)
transition = self._check_atom_transition(atom, state, st.REVERTING)
transition = self._runtime.check_atom_transition(atom, state,
st.REVERTING)
if not transition or intention not in (st.REVERT, st.RETRY):
return (False, None)
@@ -213,7 +225,7 @@ class Analyzer(object):
If no state is provided it will yield back all retry atoms.
"""
for atom in self._fetch_retries():
for atom in self._runtime.fetch_atoms_by_kind('retry'):
if not state or self.get_state(atom) == state:
yield atom

@@ -14,37 +14,37 @@
# License for the specific language governing permissions and limitations
# under the License.
import weakref
from automaton import machines
from automaton import runners
from taskflow import logging
from taskflow import states as st
from taskflow.types import failure
# Waiting state timeout (in seconds).
_WAITING_TIMEOUT = 60
# Default waiting state timeout (in seconds).
WAITING_TIMEOUT = 60
# Meta states the state machine uses.
_UNDEFINED = 'UNDEFINED'
_GAME_OVER = 'GAME_OVER'
_META_STATES = (_GAME_OVER, _UNDEFINED)
UNDEFINED = 'UNDEFINED'
GAME_OVER = 'GAME_OVER'
META_STATES = (GAME_OVER, UNDEFINED)
# Event name constants the state machine uses.
_SCHEDULE = 'schedule_next'
_WAIT = 'wait_finished'
_ANALYZE = 'examine_finished'
_FINISH = 'completed'
_FAILED = 'failed'
_SUSPENDED = 'suspended'
_SUCCESS = 'success'
_REVERTED = 'reverted'
_START = 'start'
SCHEDULE = 'schedule_next'
WAIT = 'wait_finished'
ANALYZE = 'examine_finished'
FINISH = 'completed'
FAILED = 'failed'
SUSPENDED = 'suspended'
SUCCESS = 'success'
REVERTED = 'reverted'
START = 'start'
LOG = logging.getLogger(__name__)
class _MachineMemory(object):
class MachineMemory(object):
"""State machine memory."""
def __init__(self):
@@ -54,31 +54,31 @@ class _MachineMemory(object):
self.done = set()
class Runner(object):
"""State machine *builder* + *runner* that powers the engine components.
class MachineBuilder(object):
"""State machine *builder* that powers the engine components.
NOTE(harlowja): the machine (states and events that will trigger
transitions) that this builds is represented by the following
table::
+--------------+------------------+------------+----------+---------+
Start | Event | End | On Enter | On Exit
| Start | Event | End | On Enter | On Exit |
+--------------+------------------+------------+----------+---------+
ANALYZING | completed | GAME_OVER | |
ANALYZING | schedule_next | SCHEDULING | |
ANALYZING | wait_finished | WAITING | |
FAILURE[$] | | | |
GAME_OVER | failed | FAILURE | |
GAME_OVER | reverted | REVERTED | |
GAME_OVER | success | SUCCESS | |
GAME_OVER | suspended | SUSPENDED | |
RESUMING | schedule_next | SCHEDULING | |
REVERTED[$] | | | |
SCHEDULING | wait_finished | WAITING | |
SUCCESS[$] | | | |
SUSPENDED[$] | | | |
UNDEFINED[^] | start | RESUMING | |
WAITING | examine_finished | ANALYZING | |
| ANALYZING | completed | GAME_OVER | . | . |
| ANALYZING | schedule_next | SCHEDULING | . | . |
| ANALYZING | wait_finished | WAITING | . | . |
| FAILURE[$] | . | . | . | . |
| GAME_OVER | failed | FAILURE | . | . |
| GAME_OVER | reverted | REVERTED | . | . |
| GAME_OVER | success | SUCCESS | . | . |
| GAME_OVER | suspended | SUSPENDED | . | . |
| RESUMING | schedule_next | SCHEDULING | . | . |
| REVERTED[$] | . | . | . | . |
| SCHEDULING | wait_finished | WAITING | . | . |
| SUCCESS[$] | . | . | . | . |
| SUSPENDED[$] | . | . | . | . |
| UNDEFINED[^] | start | RESUMING | . | . |
| WAITING | examine_finished | ANALYZING | . | . |
+--------------+------------------+------------+----------+---------+
Between any of these yielded states (minus ``GAME_OVER`` and ``UNDEFINED``)
@@ -91,34 +91,29 @@ class Runner(object):
tasks in parallel, this enables parallel running and/or reversion.
"""
# Informational states this action yields while running, not useful to
# have the engine record but useful to provide to end-users when doing
# execution iterations.
ignorable_states = (st.SCHEDULING, st.WAITING, st.RESUMING, st.ANALYZING)
def __init__(self, runtime, waiter):
self._runtime = runtime
self._runtime = weakref.proxy(runtime)
self._analyzer = runtime.analyzer
self._completer = runtime.completer
self._scheduler = runtime.scheduler
self._storage = runtime.storage
self._waiter = waiter
def runnable(self):
"""Checks if the storage says the flow is still runnable/running."""
return self._storage.get_flow_state() == st.RUNNING
def build(self, timeout=None):
"""Builds a state-machine (that can be/is used during running)."""
"""Builds a state-machine (that is used during running)."""
memory = _MachineMemory()
memory = MachineMemory()
if timeout is None:
timeout = _WAITING_TIMEOUT
timeout = WAITING_TIMEOUT
# Cache some local functions/methods...
do_schedule = self._scheduler.schedule
do_complete = self._completer.complete
def is_runnable():
# Checks if the storage says the flow is still runnable...
return self._storage.get_flow_state() == st.RUNNING
def iter_next_nodes(target_node=None):
# Yields and filters and tweaks the next nodes to execute...
maybe_nodes = self._analyzer.get_next_nodes(node=target_node)
@@ -134,7 +129,7 @@ class Runner(object):
# that are now ready to be ran.
memory.next_nodes.update(self._completer.resume())
memory.next_nodes.update(iter_next_nodes())
return _SCHEDULE
return SCHEDULE
def game_over(old_state, new_state, event):
# This reaction function is mainly a intermediary delegation
@@ -142,13 +137,13 @@ class Runner(object):
# the appropriate handler that will deal with the memory values,
# it is *always* called before the final state is entered.
if memory.failures:
return _FAILED
return FAILED
if any(1 for node in iter_next_nodes()):
return _SUSPENDED
return SUSPENDED
elif self._analyzer.is_success():
return _SUCCESS
return SUCCESS
else:
return _REVERTED
return REVERTED
def schedule(old_state, new_state, event):
# This reaction function starts to schedule the memory's next
@@ -156,14 +151,14 @@ class Runner(object):
# if the user of this engine has requested the engine/storage
# that holds this information to stop or suspend); handles failures
# that occur during this process safely...
if self.runnable() and memory.next_nodes:
if is_runnable() and memory.next_nodes:
not_done, failures = do_schedule(memory.next_nodes)
if not_done:
memory.not_done.update(not_done)
if failures:
memory.failures.extend(failures)
memory.next_nodes.clear()
return _WAIT
return WAIT
def wait(old_state, new_state, event):
# TODO(harlowja): maybe we should start doing 'yield from' this
@@ -173,7 +168,7 @@ class Runner(object):
done, not_done = self._waiter(memory.not_done, timeout=timeout)
memory.done.update(done)
memory.not_done = not_done
return _ANALYZE
return ANALYZE
def analyze(old_state, new_state, event):
# This reaction function is responsible for analyzing all nodes
@@ -215,13 +210,13 @@ class Runner(object):
memory.failures.append(failure.Failure())
else:
next_nodes.update(more_nodes)
if self.runnable() and next_nodes and not memory.failures:
if is_runnable() and next_nodes and not memory.failures:
memory.next_nodes.update(next_nodes)
return _SCHEDULE
return SCHEDULE
elif memory.not_done:
return _WAIT
return WAIT
else:
return _FINISH
return FINISH
def on_exit(old_state, event):
LOG.debug("Exiting old state '%s' in response to event '%s'",
@@ -239,8 +234,8 @@ class Runner(object):
watchers['on_enter'] = on_enter
m = machines.FiniteMachine()
m.add_state(_GAME_OVER, **watchers)
m.add_state(_UNDEFINED, **watchers)
m.add_state(GAME_OVER, **watchers)
m.add_state(UNDEFINED, **watchers)
m.add_state(st.ANALYZING, **watchers)
m.add_state(st.RESUMING, **watchers)
m.add_state(st.REVERTED, terminal=True, **watchers)
@@ -249,38 +244,25 @@ class Runner(object):
m.add_state(st.SUSPENDED, terminal=True, **watchers)
m.add_state(st.WAITING, **watchers)
m.add_state(st.FAILURE, terminal=True, **watchers)
m.default_start_state = _UNDEFINED
m.default_start_state = UNDEFINED
m.add_transition(_GAME_OVER, st.REVERTED, _REVERTED)
m.add_transition(_GAME_OVER, st.SUCCESS, _SUCCESS)
m.add_transition(_GAME_OVER, st.SUSPENDED, _SUSPENDED)
m.add_transition(_GAME_OVER, st.FAILURE, _FAILED)
m.add_transition(_UNDEFINED, st.RESUMING, _START)
m.add_transition(st.ANALYZING, _GAME_OVER, _FINISH)
m.add_transition(st.ANALYZING, st.SCHEDULING, _SCHEDULE)
m.add_transition(st.ANALYZING, st.WAITING, _WAIT)
m.add_transition(st.RESUMING, st.SCHEDULING, _SCHEDULE)
m.add_transition(st.SCHEDULING, st.WAITING, _WAIT)
m.add_transition(st.WAITING, st.ANALYZING, _ANALYZE)
m.add_transition(GAME_OVER, st.REVERTED, REVERTED)
m.add_transition(GAME_OVER, st.SUCCESS, SUCCESS)
m.add_transition(GAME_OVER, st.SUSPENDED, SUSPENDED)
m.add_transition(GAME_OVER, st.FAILURE, FAILED)
m.add_transition(UNDEFINED, st.RESUMING, START)
m.add_transition(st.ANALYZING, GAME_OVER, FINISH)
m.add_transition(st.ANALYZING, st.SCHEDULING, SCHEDULE)
m.add_transition(st.ANALYZING, st.WAITING, WAIT)
m.add_transition(st.RESUMING, st.SCHEDULING, SCHEDULE)
m.add_transition(st.SCHEDULING, st.WAITING, WAIT)
m.add_transition(st.WAITING, st.ANALYZING, ANALYZE)
m.add_reaction(_GAME_OVER, _FINISH, game_over)
m.add_reaction(st.ANALYZING, _ANALYZE, analyze)
m.add_reaction(st.RESUMING, _START, resume)
m.add_reaction(st.SCHEDULING, _SCHEDULE, schedule)
m.add_reaction(st.WAITING, _WAIT, wait)
m.add_reaction(GAME_OVER, FINISH, game_over)
m.add_reaction(st.ANALYZING, ANALYZE, analyze)
m.add_reaction(st.RESUMING, START, resume)
m.add_reaction(st.SCHEDULING, SCHEDULE, schedule)
m.add_reaction(st.WAITING, WAIT, wait)
m.freeze()
r = runners.FiniteRunner(m)
return (m, r, memory)
def run_iter(self, timeout=None):
"""Runs iteratively using a locally built state machine."""
machine, runner, memory = self.build(timeout=timeout)
for (_prior_state, new_state) in runner.run_iter(_START):
# NOTE(harlowja): skip over meta-states.
if new_state not in _META_STATES:
if new_state == st.FAILURE:
yield (new_state, memory.failures)
else:
yield (new_state, [])
return (m, memory)

@@ -25,6 +25,7 @@ from taskflow import logging
from taskflow import task
from taskflow.types import graph as gr
from taskflow.types import tree as tr
from taskflow.utils import iter_utils
from taskflow.utils import misc
LOG = logging.getLogger(__name__)
@@ -232,8 +233,8 @@ class _FlowCompiler(object):
@staticmethod
def _occurence_detector(to_graph, from_graph):
return sum(1 for node in from_graph.nodes_iter()
if node in to_graph)
return iter_utils.count(node for node in from_graph.nodes_iter()
if node in to_graph)
def _decompose_flow(self, flow, parent=None):
"""Decomposes a flow into a graph, tree node + decomposed subgraphs."""
@@ -371,6 +372,7 @@ class PatternCompiler(object):
_FlowCompiler(self._compile, self._linker),
_TaskCompiler(),
]
self._level = 0
def _compile(self, item, parent=None):
"""Compiles a item (pattern, task) into a graph + tree node."""
@@ -391,13 +393,28 @@ class PatternCompiler(object):
" and/or recursive compiling is not"
" supported" % (item, type(item)))
self._history.add(item)
if LOG.isEnabledFor(logging.BLATHER):
LOG.blather("%sCompiling '%s'", " " * self._level, item)
self._level += 1
def _post_item_compile(self, item, graph, node):
"""Called after a item is compiled; doing post-compilation actions."""
self._level -= 1
if LOG.isEnabledFor(logging.BLATHER):
prefix = ' ' * self._level
LOG.blather("%sDecomposed '%s' into:", prefix, item)
prefix = ' ' * (self._level + 1)
LOG.blather("%sGraph:", prefix)
for line in graph.pformat().splitlines():
LOG.blather("%s %s", prefix, line)
LOG.blather("%sHierarchy:", prefix)
for line in node.pformat().splitlines():
LOG.blather("%s %s", prefix, line)
def _pre_compile(self):
"""Called before the compilation of the root starts."""
self._history.clear()
self._level = 0
def _post_compile(self, graph, node):
"""Called after the compilation of the root finishes successfully."""
@@ -410,19 +427,6 @@ class PatternCompiler(object):
raise exc.Empty("Root container '%s' (%s) is empty"
% (self._root, type(self._root)))
self._history.clear()
# NOTE(harlowja): this one can be expensive to calculate (especially
# the cycle detection), so only do it if we know BLATHER is enabled
# and not under all cases.
if LOG.isEnabledFor(logging.BLATHER):
LOG.blather("Translated '%s'", self._root)
LOG.blather("Graph:")
for line in graph.pformat().splitlines():
# Indent it so that it's slightly offset from the above line.
LOG.blather(" %s", line)
LOG.blather("Hierarchy:")
for line in node.pformat().splitlines():
# Indent it so that it's slightly offset from the above line.
LOG.blather(" %s", line)
@fasteners.locked
def compile(self):

@@ -19,6 +19,7 @@ import contextlib
import itertools
import threading
from automaton import runners
from concurrent import futures
import fasteners
import networkx as nx
@@ -26,6 +27,7 @@ from oslo_utils import excutils
from oslo_utils import strutils
import six
from taskflow.engines.action_engine import builder
from taskflow.engines.action_engine import compiler
from taskflow.engines.action_engine import executor
from taskflow.engines.action_engine import runtime
@@ -59,9 +61,9 @@ class ActionEngine(base.Engine):
This engine compiles the flow (and any subflows) into a compilation unit
which contains the full runtime definition to be executed and then uses
this compilation unit in combination with the executor, runtime, runner
and storage classes to attempt to run your flow (and any subflows &
contained atoms) to completion.
this compilation unit in combination with the executor, runtime, machine
builder and storage classes to attempt to run your flow (and any
subflows & contained atoms) to completion.
NOTE(harlowja): during this process it is permissible and valid to have a
task or multiple tasks in the execution graph fail (at the same time even),
@@ -77,6 +79,15 @@ class ActionEngine(base.Engine):
failure/s that were captured (if any) to get reraised.
"""
IGNORABLE_STATES = frozenset(
itertools.chain([states.SCHEDULING, states.WAITING, states.RESUMING,
states.ANALYZING], builder.META_STATES))
"""
Informational states this engine's internal machine yields back while
running, not useful to have the engine record but useful to provide to
end-users when doing execution iterations via :py:meth:`.run_iter`.
"""
def __init__(self, flow, flow_detail, backend, options):
super(ActionEngine, self).__init__(flow, flow_detail, backend, options)
self._runtime = None
@@ -151,20 +162,20 @@ class ActionEngine(base.Engine):
def run_iter(self, timeout=None):
"""Runs the engine using iteration (or die trying).
:param timeout: timeout to wait for any tasks to complete (this timeout
:param timeout: timeout to wait for any atoms to complete (this timeout
will be used during the waiting period that occurs after the
waiting state is yielded when unfinished tasks are being waited
for).
waiting state is yielded when unfinished atoms are being waited
on).
Instead of running to completion in a blocking manner, this will
return a generator which will yield back the various states that the
engine is going through (and can be used to run multiple engines at
once using a generator per engine). the iterator returned also
responds to the send() method from pep-0342 and will attempt to suspend
itself if a truthy value is sent in (the suspend may be delayed until
all active tasks have finished).
once using a generator per engine). The iterator returned also
responds to the ``send()`` method from :pep:`0342` and will attempt to
suspend itself if a truthy value is sent in (the suspend may be
delayed until all active atoms have finished).
NOTE(harlowja): using the run_iter method will **not** retain the
NOTE(harlowja): using the ``run_iter`` method will **not** retain the
engine lock while executing so the user should ensure that there is
only one entity using a returned engine iterator (one per engine) at a
given time.
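A usage sketch (``flow`` already exists and ``should_suspend`` is a
hypothetical predicate)::

    from taskflow import engines

    eng = engines.load(flow)
    it = eng.run_iter()
    try:
        state = next(it)
        while True:
            if should_suspend(state):
                # send() both requests suspension and advances the
                # iterator to the next yielded state.
                state = it.send(True)
            else:
                state = next(it)
    except StopIteration:
        pass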
@@ -172,19 +183,24 @@ class ActionEngine(base.Engine):
self.compile()
self.prepare()
self.validate()
runner = self._runtime.runner
last_state = None
with _start_stop(self._task_executor, self._retry_executor):
self._change_state(states.RUNNING)
try:
closed = False
for (last_state, failures) in runner.run_iter(timeout=timeout):
if failures:
failure.Failure.reraise_if_any(failures)
machine, memory = self._runtime.builder.build(timeout=timeout)
r = runners.FiniteRunner(machine)
for (_prior_state, new_state) in r.run_iter(builder.START):
last_state = new_state
# NOTE(harlowja): skip over meta-states.
if new_state in builder.META_STATES:
continue
if new_state == states.FAILURE:
failure.Failure.reraise_if_any(memory.failures)
if closed:
continue
try:
try_suspend = yield last_state
try_suspend = yield new_state
except GeneratorExit:
# The generator was closed, attempt to suspend and
# continue looping until we have cleanly closed up
@@ -198,9 +214,8 @@ class ActionEngine(base.Engine):
with excutils.save_and_reraise_exception():
self._change_state(states.FAILURE)
else:
ignorable_states = getattr(runner, 'ignorable_states', [])
if last_state and last_state not in ignorable_states:
self._change_state(last_state)
if last_state and last_state not in self.IGNORABLE_STATES:
self._change_state(new_state)
if last_state not in self.NO_RERAISING_STATES:
it = itertools.chain(
six.itervalues(self.storage.get_failures()),
@@ -294,10 +309,7 @@ class ActionEngine(base.Engine):
@fasteners.locked
def reset(self):
if not self._storage_ensured:
raise exc.InvalidState("Can not reset an engine"
" which has not has its storage"
" populated")
self._check('reset', True, True)
# This transitions *all* contained atoms back into the PENDING state
# with an intention to EXECUTE (or dies trying to do that) and then
# changes the state of the flow to PENDING so that it can then run...

@@ -16,14 +16,16 @@
import functools
from futurist import waiters
from taskflow.engines.action_engine.actions import retry as ra
from taskflow.engines.action_engine.actions import task as ta
from taskflow.engines.action_engine import analyzer as an
from taskflow.engines.action_engine import builder as bu
from taskflow.engines.action_engine import completer as co
from taskflow.engines.action_engine import runner as ru
from taskflow.engines.action_engine import scheduler as sched
from taskflow.engines.action_engine import scopes as sc
from taskflow import flow as flow_type
from taskflow import flow
from taskflow import states as st
from taskflow import task
from taskflow.utils import async_utils
@@ -88,7 +90,7 @@ class Runtime(object):
# is able to run (or should not) ensure we retain it and use
# it later as needed.
u_v_data = execution_graph.adj[previous_atom][atom]
u_v_decider = u_v_data.get(flow_type.LINK_DECIDER)
u_v_decider = u_v_data.get(flow.LINK_DECIDER)
if u_v_decider is not None:
edge_deciders[previous_atom.name] = u_v_decider
metadata['scope_walker'] = walker
@@ -113,8 +115,8 @@ class Runtime(object):
return an.Analyzer(self)
@misc.cachedproperty
def runner(self):
return ru.Runner(self, async_utils.wait_for_any)
def builder(self):
return bu.MachineBuilder(self, waiters.wait_for_any)
@misc.cachedproperty
def completer(self):

@@ -76,7 +76,7 @@ class Scheduler(object):
"""Safely schedules atoms using a runtime ``fetch_scheduler`` routine."""
def __init__(self, runtime):
self._fetch_scheduler = runtime.fetch_scheduler
self._runtime = weakref.proxy(runtime)
def schedule(self, atoms):
"""Schedules the provided atoms for *future* completion.
@@ -89,7 +89,7 @@ class Scheduler(object):
"""
futures = set()
for atom in atoms:
scheduler = self._fetch_scheduler(atom)
scheduler = self._runtime.fetch_scheduler(atom)
try:
futures.add(scheduler.schedule(atom))
except Exception:

@@ -17,7 +17,6 @@
from __future__ import absolute_import
import logging
import sys
_BASE = __name__.split(".", 1)[0]
@@ -49,45 +48,8 @@ class _BlatherLoggerAdapter(logging.LoggerAdapter):
self.warning(msg, *args, **kwargs)
# TODO(harlowja): we should remove when we no longer have to support 2.6...
if sys.version_info[0:2] == (2, 6):
class _FixedBlatherLoggerAdapter(_BlatherLoggerAdapter):
"""Ensures isEnabledFor() exists on adapters that are created."""
def isEnabledFor(self, level):
return self.logger.isEnabledFor(level)
_BlatherLoggerAdapter = _FixedBlatherLoggerAdapter
# Taken from python2.7 (same in python3.4)...
class _NullHandler(logging.Handler):
"""This handler does nothing.
It's intended to be used to avoid the
"No handlers could be found for logger XXX" one-off warning. This is
important for library code, which may contain code to log events. If a
user of the library does not configure logging, the one-off warning
might be produced; to avoid this, the library developer simply needs
to instantiate a _NullHandler and add it to the top-level logger of the
library module or package.
"""
def handle(self, record):
"""Stub."""
def emit(self, record):
"""Stub."""
def createLock(self):
self.lock = None
else:
_NullHandler = logging.NullHandler
def getLogger(name=_BASE, extra=None):
logger = logging.getLogger(name)
if not logger.handlers:
logger.addHandler(_NullHandler())
logger.addHandler(logging.NullHandler())
return _BlatherLoggerAdapter(logger, extra=extra)
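Caller usage is unchanged by this cleanup; modules keep doing roughly::

    from taskflow import logging

    LOG = logging.getLogger(__name__)
    LOG.blather("only emitted when the BLATHER level is enabled")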

@@ -54,10 +54,15 @@ class Flow(flow.Flow):
which will be resolved by using the *flows/tasks* provides and requires
mappings or by following manually created dependency links.
From dependencies directed graph is build. If it has edge A -> B, this
means B depends on A.
From dependencies a `directed graph`_ is built. If it has edge ``A -> B``,
this means ``B`` depends on ``A`` (and that the execution of ``B`` must
wait until ``A`` has finished executing, on reverting this means that the
reverting of ``A`` must wait until ``B`` has finished reverting).
Note: Cyclic dependencies are not allowed.
Note: `cyclic`_ dependencies are not allowed.
.. _directed graph: https://en.wikipedia.org/wiki/Directed_graph
.. _cyclic: https://en.wikipedia.org/wiki/Cycle_graph
"""
def __init__(self, name, retry=None):
@@ -71,6 +76,12 @@ class Flow(flow.Flow):
def link(self, u, v, decider=None):
"""Link existing node u as a runtime dependency of existing node v.
Note that if the addition of these edges creates a `cyclic`_ graph
then a :class:`~taskflow.exceptions.DependencyFailure` will be
raised and the provided changes will be discarded. If the nodes
that are being requested to link do not exist in this graph then a
:class:`ValueError` will be raised.
:param u: task or flow to create a link from (must exist already)
:param v: task or flow to create a link to (must exist already)
:param decider: A callback function that will be expected to decide
@@ -82,6 +93,8 @@ class Flow(flow.Flow):
links that have ``v`` as a target. It is expected to
return a single boolean (``True`` to allow ``v``
execution or ``False`` to not).
.. _cyclic: https://en.wikipedia.org/wiki/Cycle_graph
"""
if not self._graph.has_node(u):
raise ValueError("Node '%s' not found to link from" % (u))
@@ -135,6 +148,11 @@ class Flow(flow.Flow):
def add(self, *nodes, **kwargs):
"""Adds a given task/tasks/flow/flows to this flow.
Note that if the addition of these nodes (and any edges) creates
a `cyclic`_ graph then
a :class:`~taskflow.exceptions.DependencyFailure` will be
raised and the applied changes will be discarded.
:param nodes: node(s) to add to the flow
:param kwargs: keyword arguments, the two keyword arguments
currently processed are:
@@ -144,13 +162,18 @@ class Flow(flow.Flow):
symbol requirements will be matched to existing
node(s) and links will be automatically made to those
providers. If multiple possible providers exist
then a AmbiguousDependency exception will be raised.
then a
:class:`~taskflow.exceptions.AmbiguousDependency`
exception will be raised and the provided additions
will be discarded.
* ``resolve_existing``, a boolean that when true (the
default) implies that on addition of a new node that
existing node(s) will have their requirements scanned
for symbols that this newly added node can provide.
If a match is found a link is automatically created
from the newly added node to the requiree.
.. _cyclic: https://en.wikipedia.org/wiki/Cycle_graph
"""
# Let's try to avoid doing any work if we can; since the below code

@@ -527,11 +527,6 @@ class AtomDetail(object):
self.meta = {}
self.version = None
@staticmethod
def _was_failure(state, result):
# Internal helper method...
return state == states.FAILURE and isinstance(result, ft.Failure)
@property
def last_results(self):
"""Gets the atoms last result.

@@ -15,11 +15,12 @@
# under the License.
from automaton import exceptions as excp
from automaton import runners
import six
from taskflow.engines.action_engine import builder
from taskflow.engines.action_engine import compiler
from taskflow.engines.action_engine import executor
from taskflow.engines.action_engine import runner
from taskflow.engines.action_engine import runtime
from taskflow.patterns import linear_flow as lf
from taskflow import states as st
@@ -30,7 +31,8 @@ from taskflow.types import notifier
from taskflow.utils import persistence_utils as pu
class _RunnerTestMixin(object):
class BuildersTest(test.TestCase):
def _make_runtime(self, flow, initial_state=None):
compilation = compiler.PatternCompiler(flow).compile()
flow_detail = pu.create_flow_detail(flow)
@@ -51,17 +53,11 @@ class _RunnerTestMixin(object):
r.compile()
return r
class RunnerTest(test.TestCase, _RunnerTestMixin):
def test_running(self):
flow = lf.Flow("root")
flow.add(*test_utils.make_many(1))
rt = self._make_runtime(flow, initial_state=st.RUNNING)
self.assertTrue(rt.runner.runnable())
rt = self._make_runtime(flow, initial_state=st.SUSPENDED)
self.assertFalse(rt.runner.runnable())
def _make_machine(self, flow, initial_state=None):
runtime = self._make_runtime(flow, initial_state=initial_state)
machine, memory = runtime.builder.build()
machine_runner = runners.FiniteRunner(machine)
return (runtime, machine, memory, machine_runner)
def test_run_iterations(self):
flow = lf.Flow("root")
@@ -69,29 +65,32 @@ class RunnerTest(test.TestCase, _RunnerTestMixin):
1, task_cls=test_utils.TaskNoRequiresNoReturns)
flow.add(*tasks)
rt = self._make_runtime(flow, initial_state=st.RUNNING)
self.assertTrue(rt.runner.runnable())
runtime, machine, memory, machine_runner = self._make_machine(
flow, initial_state=st.RUNNING)
it = rt.runner.run_iter()
state, failures = six.next(it)
self.assertEqual(st.RESUMING, state)
self.assertEqual(0, len(failures))
it = machine_runner.run_iter(builder.START)
prior_state, new_state = six.next(it)
self.assertEqual(st.RESUMING, new_state)
self.assertEqual(0, len(memory.failures))
state, failures = six.next(it)
self.assertEqual(st.SCHEDULING, state)
self.assertEqual(0, len(failures))
prior_state, new_state = six.next(it)
self.assertEqual(st.SCHEDULING, new_state)
self.assertEqual(0, len(memory.failures))
state, failures = six.next(it)
self.assertEqual(st.WAITING, state)
self.assertEqual(0, len(failures))
prior_state, new_state = six.next(it)
self.assertEqual(st.WAITING, new_state)
self.assertEqual(0, len(memory.failures))
state, failures = six.next(it)
self.assertEqual(st.ANALYZING, state)
self.assertEqual(0, len(failures))
prior_state, new_state = six.next(it)
self.assertEqual(st.ANALYZING, new_state)
self.assertEqual(0, len(memory.failures))
state, failures = six.next(it)
self.assertEqual(st.SUCCESS, state)
self.assertEqual(0, len(failures))
prior_state, new_state = six.next(it)
self.assertEqual(builder.GAME_OVER, new_state)
self.assertEqual(0, len(memory.failures))
prior_state, new_state = six.next(it)
self.assertEqual(st.SUCCESS, new_state)
self.assertEqual(0, len(memory.failures))
self.assertRaises(StopIteration, six.next, it)
@@ -101,15 +100,15 @@ class RunnerTest(test.TestCase, _RunnerTestMixin):
1, task_cls=test_utils.TaskWithFailure)
flow.add(*tasks)
rt = self._make_runtime(flow, initial_state=st.RUNNING)
self.assertTrue(rt.runner.runnable())
runtime, machine, memory, machine_runner = self._make_machine(
flow, initial_state=st.RUNNING)
transitions = list(rt.runner.run_iter())
state, failures = transitions[-1]
self.assertEqual(st.REVERTED, state)
self.assertEqual([], failures)
self.assertEqual(st.REVERTED, rt.storage.get_atom_state(tasks[0].name))
transitions = list(machine_runner.run_iter(builder.START))
prior_state, new_state = transitions[-1]
self.assertEqual(st.REVERTED, new_state)
self.assertEqual([], memory.failures)
self.assertEqual(st.REVERTED,
runtime.storage.get_atom_state(tasks[0].name))
def test_run_iterations_failure(self):
flow = lf.Flow("root")
@@ -117,18 +116,17 @@ class RunnerTest(test.TestCase, _RunnerTestMixin):
1, task_cls=test_utils.NastyFailingTask)
flow.add(*tasks)
rt = self._make_runtime(flow, initial_state=st.RUNNING)
self.assertTrue(rt.runner.runnable())
runtime, machine, memory, machine_runner = self._make_machine(
flow, initial_state=st.RUNNING)
transitions = list(rt.runner.run_iter())
state, failures = transitions[-1]
self.assertEqual(st.FAILURE, state)
self.assertEqual(1, len(failures))
failure = failures[0]
transitions = list(machine_runner.run_iter(builder.START))
prior_state, new_state = transitions[-1]
self.assertEqual(st.FAILURE, new_state)
self.assertEqual(1, len(memory.failures))
failure = memory.failures[0]
self.assertTrue(failure.check(RuntimeError))
self.assertEqual(st.REVERT_FAILURE,
rt.storage.get_atom_state(tasks[0].name))
runtime.storage.get_atom_state(tasks[0].name))
def test_run_iterations_suspended(self):
flow = lf.Flow("root")
@@ -136,20 +134,22 @@ class RunnerTest(test.TestCase, _RunnerTestMixin):
2, task_cls=test_utils.TaskNoRequiresNoReturns)
flow.add(*tasks)
rt = self._make_runtime(flow, initial_state=st.RUNNING)
self.assertTrue(rt.runner.runnable())
runtime, machine, memory, machine_runner = self._make_machine(
flow, initial_state=st.RUNNING)
transitions = []
for state, failures in rt.runner.run_iter():
transitions.append((state, failures))
if state == st.ANALYZING:
rt.storage.set_flow_state(st.SUSPENDED)
for prior_state, new_state in machine_runner.run_iter(builder.START):
transitions.append((new_state, memory.failures))
if new_state == st.ANALYZING:
runtime.storage.set_flow_state(st.SUSPENDED)
state, failures = transitions[-1]
self.assertEqual(st.SUSPENDED, state)
self.assertEqual([], failures)
self.assertEqual(st.SUCCESS, rt.storage.get_atom_state(tasks[0].name))
self.assertEqual(st.PENDING, rt.storage.get_atom_state(tasks[1].name))
self.assertEqual(st.SUCCESS,
runtime.storage.get_atom_state(tasks[0].name))
self.assertEqual(st.PENDING,
runtime.storage.get_atom_state(tasks[1].name))
def test_run_iterations_suspended_failure(self):
flow = lf.Flow("root")
@@ -160,46 +160,44 @@ class RunnerTest(test.TestCase, _RunnerTestMixin):
1, task_cls=test_utils.TaskNoRequiresNoReturns, offset=1)
flow.add(*happy_tasks)
rt = self._make_runtime(flow, initial_state=st.RUNNING)
self.assertTrue(rt.runner.runnable())
runtime, machine, memory, machine_runner = self._make_machine(
flow, initial_state=st.RUNNING)
transitions = []
for state, failures in rt.runner.run_iter():
transitions.append((state, failures))
if state == st.ANALYZING:
rt.storage.set_flow_state(st.SUSPENDED)
for prior_state, new_state in machine_runner.run_iter(builder.START):
transitions.append((new_state, memory.failures))
if new_state == st.ANALYZING:
runtime.storage.set_flow_state(st.SUSPENDED)
state, failures = transitions[-1]
self.assertEqual(st.SUSPENDED, state)
self.assertEqual([], failures)
self.assertEqual(st.PENDING,
rt.storage.get_atom_state(happy_tasks[0].name))
runtime.storage.get_atom_state(happy_tasks[0].name))
self.assertEqual(st.FAILURE,
rt.storage.get_atom_state(sad_tasks[0].name))
runtime.storage.get_atom_state(sad_tasks[0].name))
class RunnerBuildTest(test.TestCase, _RunnerTestMixin):
def test_builder_manual_process(self):
flow = lf.Flow("root")
tasks = test_utils.make_many(
1, task_cls=test_utils.TaskNoRequiresNoReturns)
flow.add(*tasks)
rt = self._make_runtime(flow, initial_state=st.RUNNING)
machine, machine_runner, memory = rt.runner.build()
self.assertTrue(rt.runner.runnable())
runtime, machine, memory, machine_runner = self._make_machine(
flow, initial_state=st.RUNNING)
self.assertRaises(excp.NotInitialized, machine.process_event, 'poke')
# Should now be pending...
self.assertEqual(st.PENDING, rt.storage.get_atom_state(tasks[0].name))
self.assertEqual(st.PENDING,
runtime.storage.get_atom_state(tasks[0].name))
machine.initialize()
self.assertEqual(runner._UNDEFINED, machine.current_state)
self.assertEqual(builder.UNDEFINED, machine.current_state)
self.assertFalse(machine.terminated)
self.assertRaises(excp.NotFound, machine.process_event, 'poke')
last_state = machine.current_state
reaction, terminal = machine.process_event('start')
reaction, terminal = machine.process_event(builder.START)
self.assertFalse(terminal)
self.assertIsNotNone(reaction)
self.assertEqual(st.RESUMING, machine.current_state)
@@ -208,7 +206,7 @@ class RunnerBuildTest(test.TestCase, _RunnerTestMixin):
last_state = machine.current_state
cb, args, kwargs = reaction
next_event = cb(last_state, machine.current_state,
'start', *args, **kwargs)
builder.START, *args, **kwargs)
reaction, terminal = machine.process_event(next_event)
self.assertFalse(terminal)
self.assertIsNotNone(reaction)
@@ -225,7 +223,8 @@ class RunnerBuildTest(test.TestCase, _RunnerTestMixin):
self.assertRaises(excp.NotFound, machine.process_event, 'poke')
# Should now be running...
self.assertEqual(st.RUNNING, rt.storage.get_atom_state(tasks[0].name))
self.assertEqual(st.RUNNING,
runtime.storage.get_atom_state(tasks[0].name))
last_state = machine.current_state
cb, args, kwargs = reaction
@@ -243,10 +242,11 @@ class RunnerBuildTest(test.TestCase, _RunnerTestMixin):
next_event, *args, **kwargs)
reaction, terminal = machine.process_event(next_event)
self.assertFalse(terminal)
self.assertEqual(runner._GAME_OVER, machine.current_state)
self.assertEqual(builder.GAME_OVER, machine.current_state)
# Should now be done...
self.assertEqual(st.SUCCESS, rt.storage.get_atom_state(tasks[0].name))
self.assertEqual(st.SUCCESS,
runtime.storage.get_atom_state(tasks[0].name))
def test_builder_automatic_process(self):
flow = lf.Flow("root")
@@ -254,26 +254,25 @@ class RunnerBuildTest(test.TestCase, _RunnerTestMixin):
1, task_cls=test_utils.TaskNoRequiresNoReturns)
flow.add(*tasks)
rt = self._make_runtime(flow, initial_state=st.RUNNING)
machine, machine_runner, memory = rt.runner.build()
self.assertTrue(rt.runner.runnable())
runtime, machine, memory, machine_runner = self._make_machine(
flow, initial_state=st.RUNNING)
transitions = list(machine_runner.run_iter('start'))
self.assertEqual((runner._UNDEFINED, st.RESUMING), transitions[0])
self.assertEqual((runner._GAME_OVER, st.SUCCESS), transitions[-1])
self.assertEqual(st.SUCCESS, rt.storage.get_atom_state(tasks[0].name))
transitions = list(machine_runner.run_iter(builder.START))
self.assertEqual((builder.UNDEFINED, st.RESUMING), transitions[0])
self.assertEqual((builder.GAME_OVER, st.SUCCESS), transitions[-1])
self.assertEqual(st.SUCCESS,
runtime.storage.get_atom_state(tasks[0].name))
def test_builder_automatic_process_failure(self):
flow = lf.Flow("root")
tasks = test_utils.make_many(1, task_cls=test_utils.NastyFailingTask)
flow.add(*tasks)
rt = self._make_runtime(flow, initial_state=st.RUNNING)
machine, machine_runner, memory = rt.runner.build()
self.assertTrue(rt.runner.runnable())
runtime, machine, memory, machine_runner = self._make_machine(
flow, initial_state=st.RUNNING)
transitions = list(machine_runner.run_iter('start'))
self.assertEqual((runner._GAME_OVER, st.FAILURE), transitions[-1])
transitions = list(machine_runner.run_iter(builder.START))
self.assertEqual((builder.GAME_OVER, st.FAILURE), transitions[-1])
self.assertEqual(1, len(memory.failures))
def test_builder_automatic_process_reverted(self):
@@ -281,13 +280,13 @@ class RunnerBuildTest(test.TestCase, _RunnerTestMixin):
tasks = test_utils.make_many(1, task_cls=test_utils.TaskWithFailure)
flow.add(*tasks)
rt = self._make_runtime(flow, initial_state=st.RUNNING)
machine, machine_runner, memory = rt.runner.build()
self.assertTrue(rt.runner.runnable())
runtime, machine, memory, machine_runner = self._make_machine(
flow, initial_state=st.RUNNING)
transitions = list(machine_runner.run_iter('start'))
self.assertEqual((runner._GAME_OVER, st.REVERTED), transitions[-1])
self.assertEqual(st.REVERTED, rt.storage.get_atom_state(tasks[0].name))
transitions = list(machine_runner.run_iter(builder.START))
self.assertEqual((builder.GAME_OVER, st.REVERTED), transitions[-1])
self.assertEqual(st.REVERTED,
runtime.storage.get_atom_state(tasks[0].name))
def test_builder_expected_transition_occurrences(self):
flow = lf.Flow("root")
@@ -295,16 +294,16 @@ class RunnerBuildTest(test.TestCase, _RunnerTestMixin):
10, task_cls=test_utils.TaskNoRequiresNoReturns)
flow.add(*tasks)
rt = self._make_runtime(flow, initial_state=st.RUNNING)
machine, machine_runner, memory = rt.runner.build()
transitions = list(machine_runner.run_iter('start'))
runtime, machine, memory, machine_runner = self._make_machine(
flow, initial_state=st.RUNNING)
transitions = list(machine_runner.run_iter(builder.START))
occurrences = dict((t, transitions.count(t)) for t in transitions)
self.assertEqual(10, occurrences.get((st.SCHEDULING, st.WAITING)))
self.assertEqual(10, occurrences.get((st.WAITING, st.ANALYZING)))
self.assertEqual(9, occurrences.get((st.ANALYZING, st.SCHEDULING)))
self.assertEqual(1, occurrences.get((runner._GAME_OVER, st.SUCCESS)))
self.assertEqual(1, occurrences.get((runner._UNDEFINED, st.RESUMING)))
self.assertEqual(1, occurrences.get((builder.GAME_OVER, st.SUCCESS)))
self.assertEqual(1, occurrences.get((builder.UNDEFINED, st.RESUMING)))
self.assertEqual(0, len(memory.next_nodes))
self.assertEqual(0, len(memory.not_done))

@@ -140,6 +140,245 @@ class TreeTest(test.TestCase):
p.add(tree.Node("human"))
return a
def test_pformat_species(self):
root = self._make_species()
expected = """
animal
|__mammal
| |__horse
| |__primate
| |__monkey
| |__human
|__reptile
"""
self.assertEqual(expected.strip(), root.pformat())
def test_pformat_flat(self):
root = tree.Node("josh")
root.add(tree.Node("josh.1"))
expected = """
josh
|__josh.1
"""
self.assertEqual(expected.strip(), root.pformat())
root[0].add(tree.Node("josh.1.1"))
expected = """
josh
|__josh.1
|__josh.1.1
"""
self.assertEqual(expected.strip(), root.pformat())
root[0][0].add(tree.Node("josh.1.1.1"))
expected = """
josh
|__josh.1
|__josh.1.1
|__josh.1.1.1
"""
self.assertEqual(expected.strip(), root.pformat())
root[0][0][0].add(tree.Node("josh.1.1.1.1"))
expected = """
josh
|__josh.1
|__josh.1.1
|__josh.1.1.1
|__josh.1.1.1.1
"""
self.assertEqual(expected.strip(), root.pformat())
def test_pformat_partial_species(self):
root = self._make_species()
expected = """
reptile
"""
self.assertEqual(expected.strip(), root[1].pformat())
expected = """
mammal
|__horse
|__primate
|__monkey
|__human
"""
self.assertEqual(expected.strip(), root[0].pformat())
expected = """
primate
|__monkey
|__human
"""
self.assertEqual(expected.strip(), root[0][1].pformat())
expected = """
monkey
"""
self.assertEqual(expected.strip(), root[0][1][0].pformat())
def test_pformat(self):
root = tree.Node("CEO")
expected = """
CEO
"""
self.assertEqual(expected.strip(), root.pformat())
root.add(tree.Node("Infra"))
expected = """
CEO
|__Infra
"""
self.assertEqual(expected.strip(), root.pformat())
root[0].add(tree.Node("Infra.1"))
expected = """
CEO
|__Infra
|__Infra.1
"""
self.assertEqual(expected.strip(), root.pformat())
root.add(tree.Node("Mail"))
expected = """
CEO
|__Infra
| |__Infra.1
|__Mail
"""
self.assertEqual(expected.strip(), root.pformat())
root.add(tree.Node("Search"))
expected = """
CEO
|__Infra
| |__Infra.1
|__Mail
|__Search
"""
self.assertEqual(expected.strip(), root.pformat())
root[-1].add(tree.Node("Search.1"))
expected = """
CEO
|__Infra
| |__Infra.1
|__Mail
|__Search
|__Search.1
"""
self.assertEqual(expected.strip(), root.pformat())
root[-1].add(tree.Node("Search.2"))
expected = """
CEO
|__Infra
| |__Infra.1
|__Mail
|__Search
|__Search.1
|__Search.2
"""
self.assertEqual(expected.strip(), root.pformat())
root[0].add(tree.Node("Infra.2"))
expected = """
CEO
|__Infra
| |__Infra.1
| |__Infra.2
|__Mail
|__Search
|__Search.1
|__Search.2
"""
self.assertEqual(expected.strip(), root.pformat())
root[0].add(tree.Node("Infra.3"))
expected = """
CEO
|__Infra
| |__Infra.1
| |__Infra.2
| |__Infra.3
|__Mail
|__Search
|__Search.1
|__Search.2
"""
self.assertEqual(expected.strip(), root.pformat())
root[0][-1].add(tree.Node("Infra.3.1"))
expected = """
CEO
|__Infra
| |__Infra.1
| |__Infra.2
| |__Infra.3
| |__Infra.3.1
|__Mail
|__Search
|__Search.1
|__Search.2
"""
self.assertEqual(expected.strip(), root.pformat())
root[-1][0].add(tree.Node("Search.1.1"))
expected = """
CEO
|__Infra
| |__Infra.1
| |__Infra.2
| |__Infra.3
| |__Infra.3.1
|__Mail
|__Search
|__Search.1
| |__Search.1.1
|__Search.2
"""
self.assertEqual(expected.strip(), root.pformat())
root[1].add(tree.Node("Mail.1"))
expected = """
CEO
|__Infra
| |__Infra.1
| |__Infra.2
| |__Infra.3
| |__Infra.3.1
|__Mail
| |__Mail.1
|__Search
|__Search.1
| |__Search.1.1
|__Search.2
"""
self.assertEqual(expected.strip(), root.pformat())
root[1][0].add(tree.Node("Mail.1.1"))
expected = """
CEO
|__Infra
| |__Infra.1
| |__Infra.2
| |__Infra.3
| |__Infra.3.1
|__Mail
| |__Mail.1
| |__Mail.1.1
|__Search
|__Search.1
| |__Search.1.1
|__Search.2
"""
self.assertEqual(expected.strip(), root.pformat())
def test_path(self):
root = self._make_species()
human = root.find("human")

@@ -14,56 +14,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import futurist
import testtools
from taskflow import test
from taskflow.utils import async_utils as au
from taskflow.utils import eventlet_utils as eu
class WaitForAnyTestsMixin(object):
timeout = 0.001
def test_waits_and_finishes(self):
def foo():
pass
with self._make_executor(2) as e:
fs = [e.submit(foo), e.submit(foo)]
# this test assumes that our foo will end within 10 seconds
done, not_done = au.wait_for_any(fs, 10)
self.assertIn(len(done), (1, 2))
self.assertTrue(any(f in done for f in fs))
def test_not_done_futures(self):
fs = [futurist.Future(), futurist.Future()]
done, not_done = au.wait_for_any(fs, self.timeout)
self.assertEqual(len(done), 0)
self.assertEqual(len(not_done), 2)
def test_mixed_futures(self):
f1 = futurist.Future()
f2 = futurist.Future()
f2.set_result(1)
done, not_done = au.wait_for_any([f1, f2], self.timeout)
self.assertEqual(len(done), 1)
self.assertEqual(len(not_done), 1)
self.assertIs(not_done.pop(), f1)
self.assertIs(done.pop(), f2)
@testtools.skipIf(not eu.EVENTLET_AVAILABLE, 'eventlet is not available')
class AsyncUtilsEventletTest(test.TestCase,
WaitForAnyTestsMixin):
def _make_executor(self, max_workers):
return futurist.GreenThreadPoolExecutor(max_workers=max_workers)
class AsyncUtilsThreadedTest(test.TestCase,
WaitForAnyTestsMixin):
def _make_executor(self, max_workers):
return futurist.ThreadPoolExecutor(max_workers=max_workers)
class MakeCompletedFutureTest(test.TestCase):
@@ -73,9 +25,3 @@ class MakeCompletedFutureTest(test.TestCase):
future = au.make_completed_future(result)
self.assertTrue(future.done())
self.assertIs(future.result(), result)
class AsyncUtilsSynchronousTest(test.TestCase,
WaitForAnyTestsMixin):
def _make_executor(self, max_workers):
return futurist.SynchronousExecutor()
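The removed wait helpers are now provided by ``futurist`` itself; the
equivalent usage (consistent with the test updates elsewhere in this
commit) is roughly::

    from futurist import waiters

    # 'futures' is assumed to be a collection of futurist futures.
    done, not_done = waiters.wait_for_any(futures, timeout=10)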

@@ -0,0 +1,62 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import string
from six.moves import range as compat_range
from taskflow import test
from taskflow.utils import iter_utils
def forever_it():
i = 0
while True:
yield i
i += 1
class IterUtilsTest(test.TestCase):
def test_find_first_match(self):
it = forever_it()
self.assertEqual(100, iter_utils.find_first_match(it,
lambda v: v == 100))
def test_find_first_match_not_found(self):
it = iter(string.ascii_lowercase)
self.assertIsNone(iter_utils.find_first_match(it,
lambda v: v == ''))
def test_count(self):
self.assertEqual(0, iter_utils.count([]))
self.assertEqual(1, iter_utils.count(['a']))
self.assertEqual(10, iter_utils.count(compat_range(0, 10)))
self.assertEqual(1000, iter_utils.count(compat_range(0, 1000)))
self.assertEqual(0, iter_utils.count(compat_range(0)))
self.assertEqual(0, iter_utils.count(compat_range(-1)))
def test_while_is_not(self):
it = iter(string.ascii_lowercase)
self.assertEqual(['a'],
list(iter_utils.while_is_not(it, 'a')))
it = iter(string.ascii_lowercase)
self.assertEqual(['a', 'b'],
list(iter_utils.while_is_not(it, 'b')))
self.assertEqual(list(string.ascii_lowercase[2:]),
list(iter_utils.while_is_not(it, 'zzz')))
it = iter(string.ascii_lowercase)
self.assertEqual(list(string.ascii_lowercase),
list(iter_utils.while_is_not(it, '')))

View File

@@ -15,6 +15,7 @@
# under the License.
import futurist
from futurist import waiters
from oslo_utils import uuidutils
from taskflow.engines.action_engine import executor as base_executor
@@ -78,7 +79,7 @@ class TestPipeline(test.TestCase):
progress_callback = lambda *args, **kwargs: None
f = executor.execute_task(t, uuidutils.generate_uuid(), {},
progress_callback=progress_callback)
async_utils.wait_for_any([f])
waiters.wait_for_any([f])
event, result = f.result()
self.assertEqual(1, result)
@@ -94,7 +95,7 @@ class TestPipeline(test.TestCase):
progress_callback = lambda *args, **kwargs: None
f = executor.execute_task(t, uuidutils.generate_uuid(), {},
progress_callback=progress_callback)
async_utils.wait_for_any([f])
waiters.wait_for_any([f])
action, result = f.result()
self.assertIsInstance(result, failure.Failure)

View File

@@ -123,7 +123,11 @@ class GiveBackRevert(task.Task):
return value + 1
def revert(self, *args, **kwargs):
return kwargs.get('result') + 1
result = kwargs.get('result')
# If execute somehow failed, timed out, or otherwise didn't send
# back a valid result...
if isinstance(result, six.integer_types):
return result + 1
class FakeTask(object):
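The guard added above matters because, on failure paths, the ``result`` handed to ``revert`` is a ``taskflow.types.failure.Failure`` object (or missing entirely) rather than a normal return value, so unconditional arithmetic such as ``result + 1`` can itself raise during reversion. A minimal sketch of the same defensive pattern (the ``AddOne`` task below is illustrative, not part of this changeset):

import six

from taskflow import task


class AddOne(task.Task):
    def execute(self, value):
        return value + 1

    def revert(self, *args, **kwargs):
        # On failure/timeout paths 'result' may be a Failure object (or
        # missing entirely) instead of an integer, so guard the arithmetic.
        result = kwargs.get('result')
        if isinstance(result, six.integer_types):
            return result - 1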

View File

@@ -22,6 +22,7 @@ import os
import six
from taskflow.utils import iter_utils
from taskflow.utils import misc
@@ -77,11 +78,25 @@ class _BFSIter(object):
class Node(object):
"""A n-ary node class that can be used to create tree structures."""
# Constants used when pretty formatting the node (and its children).
#: Default string prefix used in :py:meth:`.pformat`.
STARTING_PREFIX = ""
#: Default string used to create empty space used in :py:meth:`.pformat`.
EMPTY_SPACE_SEP = " "
HORIZONTAL_CONN = "__"
"""
Default string used to horizontally connect a node to its
parent (used in :py:meth:`.pformat`.).
"""
VERTICAL_CONN = "|"
"""
Default string used to vertically connect a node to its
parent (used in :py:meth:`.pformat`).
"""
#: Default line separator used in :py:meth:`.pformat`.
LINE_SEP = os.linesep
def __init__(self, item, **kwargs):
@@ -124,18 +139,22 @@ class Node(object):
yield node
node = node.parent
def find(self, item, only_direct=False, include_self=True):
"""Returns the node for an item if it exists in this node.
def find_first_match(self, matcher, only_direct=False, include_self=True):
"""Finds the *first* node that matching callback returns true.
This will search not only this node but also any children nodes and
finally if nothing is found then None is returned instead of a node
object.
This will search not only this node but also any child nodes (in
depth-first order, from right to left) and finally if nothing is
matched then ``None`` is returned instead of a node object.
:param item: item to lookup.
:param only_direct: only look at current node and its direct children.
:param matcher: callback that takes one positional argument (a node)
and returns true if it matches the desired node or
false if not.
:param only_direct: only look at the current node and its
direct children (implies that this does not
search using depth-first traversal).
:param include_self: include the current node during searching.
:returns: the node for an item if it exists in this node
:returns: the node that matched (or ``None``)
"""
if only_direct:
if include_self:
@@ -144,10 +163,26 @@ class Node(object):
it = self.reverse_iter()
else:
it = self.dfs_iter(include_self=include_self)
for n in it:
if n.item == item:
return n
return None
return iter_utils.find_first_match(it, matcher)
def find(self, item, only_direct=False, include_self=True):
"""Returns the *first* node for an item if it exists in this node.
This will search not only this node but also any child nodes (in
depth-first order, from right to left) and finally if nothing is
matched then ``None`` is returned instead of a node object.
:param item: item to look for.
:param only_direct: only look at the current node and its
direct children (implies that this does not
search using depth-first traversal).
:param include_self: include the current node during searching.
:returns: the node that matched provided item (or ``None``)
"""
return self.find_first_match(lambda n: n.item == item,
only_direct=only_direct,
include_self=include_self)
def disassociate(self):
"""Removes this node from its parent (if any).
@@ -176,7 +211,9 @@ class Node(object):
the normally returned *removed* node object.
:param item: item to look up.
:param only_direct: only look at current node and its direct children.
:param only_direct: only look at the current node and its
direct children (implies that this does not
search using depth-first traversal).
:param include_self: include the current node during searching.
"""
node = self.find(item, only_direct=only_direct,
@@ -200,8 +237,11 @@ class Node(object):
# NOTE(harlowja): 0 is the rightmost index, len - 1 is the leftmost
return self._children[index]
def pformat(self, stringify_node=None):
"""Recursively formats a node into a nice string representation.
def pformat(self, stringify_node=None,
linesep=LINE_SEP, vertical_conn=VERTICAL_CONN,
horizontal_conn=HORIZONTAL_CONN, empty_space=EMPTY_SPACE_SEP,
starting_prefix=STARTING_PREFIX):
"""Formats this node + children into a nice string representation.
**Example**::
@@ -220,33 +260,73 @@ class Node(object):
|__Mobile
|__Mail
"""
def _inner_pformat(node, level, stringify_node):
if level == 0:
yield stringify_node(node)
prefix = self.STARTING_PREFIX
else:
yield self.HORIZONTAL_CONN + stringify_node(node)
prefix = self.EMPTY_SPACE_SEP * len(self.HORIZONTAL_CONN)
child_count = node.child_count()
for (i, child) in enumerate(node):
for (j, text) in enumerate(_inner_pformat(child,
level + 1,
stringify_node)):
if j == 0 or i + 1 < child_count:
text = prefix + self.VERTICAL_CONN + text
else:
text = prefix + self.EMPTY_SPACE_SEP + text
yield text
if stringify_node is None:
# Default to making a unicode string out of the node's item...
stringify_node = lambda node: six.text_type(node.item)
expected_lines = self.child_count(only_direct=False)
accumulator = six.StringIO()
for i, line in enumerate(_inner_pformat(self, 0, stringify_node)):
accumulator.write(line)
if i < expected_lines:
accumulator.write(self.LINE_SEP)
return accumulator.getvalue()
expected_lines = self.child_count(only_direct=False) + 1
buff = six.StringIO()
conn = vertical_conn + horizontal_conn
stop_at_parent = self
for i, node in enumerate(self.dfs_iter(include_self=True), 1):
prefix = []
connected_to_parent = False
last_node = node
# Walk through *most* of this node's parents, and form the expected
# prefix that each parent should require, repeating this until we
# hit the root node (self) and use that as our node's prefix
# string...
parent_node_it = iter_utils.while_is_not(
node.path_iter(include_self=True), stop_at_parent)
for j, parent_node in enumerate(parent_node_it):
if parent_node is stop_at_parent:
if j > 0:
if not connected_to_parent:
prefix.append(conn)
connected_to_parent = True
else:
# If the node was connected already then it must
# have had more than one parent, so we want to put
# the right final starting prefix on (which may be
# an empty space or another vertical connector)...
last_node = self._children[-1]
m = last_node.find_first_match(lambda n: n is node,
include_self=False,
only_direct=False)
if m is not None:
prefix.append(empty_space)
else:
prefix.append(vertical_conn)
elif parent_node is node:
# Skip ourself... (we only include ourself so that
# we can use the 'j' variable to determine if the only
# node requested is ourself in the first place); used
# in the first conditional here...
pass
else:
if not connected_to_parent:
prefix.append(conn)
spaces = len(horizontal_conn)
connected_to_parent = True
else:
# If we have already been connected to our parent
# then determine if this current node is the last
# node of its parent (and in that case just put
# on more spaces), otherwise put a vertical connector
# on and fewer spaces...
if parent_node[-1] is not last_node:
prefix.append(vertical_conn)
spaces = len(horizontal_conn)
else:
spaces = len(conn)
prefix.append(empty_space * spaces)
last_node = parent_node
prefix.append(starting_prefix)
for prefix_piece in reversed(prefix):
buff.write(prefix_piece)
buff.write(stringify_node(node))
if i != expected_lines:
buff.write(linesep)
return buff.getvalue()
def child_count(self, only_direct=True):
"""Returns how many children this node has.
@@ -257,10 +337,7 @@ class Node(object):
NOTE(harlowja): it does not account for the current node in this count.
"""
if not only_direct:
count = 0
for _node in self.dfs_iter():
count += 1
return count
return iter_utils.count(self.dfs_iter())
return len(self._children)
def __iter__(self):
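Taken together, these changes turn ``find`` into a thin wrapper over the new predicate-based ``find_first_match`` and make every connector string used by ``pformat`` overridable. A small usage sketch (the tree contents below are made up for illustration):

from taskflow.types import tree

root = tree.Node("CEO")
root.add(tree.Node("Infra"))
root.add(tree.Node("Mail"))

# find() is now just find_first_match() with an item-equality predicate.
mail_node = root.find("Mail")

# find_first_match() allows any predicate over nodes, not just equality.
infra_node = root.find_first_match(
    lambda node: str(node.item).startswith("Inf"))

# pformat() renders with the default connectors, or with custom ones.
print(root.pformat())
print(root.pformat(vertical_conn="+", horizontal_conn="--"))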

View File

@@ -14,20 +14,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from concurrent import futures as _futures
from concurrent.futures import _base
import futurist
from oslo_utils import importutils
greenthreading = importutils.try_import('eventlet.green.threading')
from taskflow.utils import eventlet_utils as eu
_DONE_STATES = frozenset([
_base.CANCELLED_AND_NOTIFIED,
_base.FINISHED,
])
def make_completed_future(result):
@@ -35,78 +22,3 @@ def make_completed_future(result):
future = futurist.Future()
future.set_result(result)
return future
def wait_for_any(fs, timeout=None):
"""Wait for one of the futures to complete.
Works correctly with both green and non-green futures (but not both
together, since deadlock avoidance can't be guaranteed given how
different the waiting implementations are when green threads are being
used).
Returns pair (done futures, not done futures).
"""
# TODO(harlowja): remove this when
# https://review.openstack.org/#/c/196269/ is merged and is made
# available.
green_fs = sum(1 for f in fs if isinstance(f, futurist.GreenFuture))
if not green_fs:
return _futures.wait(fs,
timeout=timeout,
return_when=_futures.FIRST_COMPLETED)
else:
non_green_fs = len(fs) - green_fs
if non_green_fs:
raise RuntimeError("Can not wait on %s green futures and %s"
" non-green futures in the same `wait_for_any`"
" call" % (green_fs, non_green_fs))
else:
return _wait_for_any_green(fs, timeout=timeout)
class _GreenWaiter(object):
"""Provides the event that wait_for_any() blocks on."""
def __init__(self):
self.event = greenthreading.Event()
def add_result(self, future):
self.event.set()
def add_exception(self, future):
self.event.set()
def add_cancelled(self, future):
self.event.set()
def _partition_futures(fs):
done = set()
not_done = set()
for f in fs:
if f._state in _DONE_STATES:
done.add(f)
else:
not_done.add(f)
return done, not_done
def _wait_for_any_green(fs, timeout=None):
eu.check_for_eventlet(RuntimeError('Eventlet is needed to wait on'
' green futures'))
with _base._AcquireFutures(fs):
done, not_done = _partition_futures(fs)
if done:
return _base.DoneAndNotDoneFutures(done, not_done)
waiter = _GreenWaiter()
for f in fs:
f._waiters.append(waiter)
waiter.event.wait(timeout)
for f in fs:
f._waiters.remove(waiter)
with _base._AcquireFutures(fs):
done, not_done = _partition_futures(fs)
return _base.DoneAndNotDoneFutures(done, not_done)
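All of the waiting machinery removed here now lives upstream in the ``futurist`` library, whose ``waiters.wait_for_any`` is exactly what the test updates earlier in this diff switch to. A minimal sketch of the replacement call, assuming a plain (non-green) thread pool:

import futurist
from futurist import waiters


def work():
    return 42

with futurist.ThreadPoolExecutor(max_workers=2) as executor:
    futs = [executor.submit(work), executor.submit(work)]
    # Same contract as the removed helper: a (done, not_done) pair of
    # future sets, returned once at least one future finishes (or the
    # timeout expires).
    done, not_done = waiters.wait_for_any(futs, timeout=10)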

View File

@@ -0,0 +1,42 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2015 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
def count(it):
"""Returns how many values in the iterator (depletes the iterator)."""
return sum(1 for _value in it)
def find_first_match(it, matcher, not_found_value=None):
"""Searches iterator for first value that matcher callback returns true."""
for value in it:
if matcher(value):
return value
return not_found_value
def while_is_not(it, stop_value):
"""Yields given values from iterator until stop value is passed.
This uses the ``is`` operator to determine equivalency (and not the
``==`` operator).
"""
for value in it:
yield value
if value is stop_value:
break
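Since this module is brand new, a quick sketch of how the three helpers behave (inputs below are chosen arbitrarily for illustration):

import string

from taskflow.utils import iter_utils

# count() drains the iterator and reports how many values it produced.
assert iter_utils.count(iter("abc")) == 3

# find_first_match() short-circuits on the first accepted value.
assert iter_utils.find_first_match(iter(range(10)), lambda v: v > 5) == 6

# while_is_not() yields up to (and including) the first value that is
# identical ('is', not '==') to the stop value; single-character strings
# are interned by CPython, which is why this comparison works here (the
# unit tests above rely on the same behavior).
letters = iter(string.ascii_lowercase)
assert list(iter_utils.while_is_not(letters, 'c')) == ['a', 'b', 'c']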

View File

@@ -3,7 +3,7 @@
# process, which may cause wedges in the gate later.
hacking<0.11,>=0.10.0
oslotest>=1.7.0 # Apache-2.0
oslotest>=1.10.0 # Apache-2.0
mock>=1.2
testtools>=1.4.0
testscenarios>=0.4
@@ -22,9 +22,9 @@ kazoo>=2.2
redis>=2.10.0
# Used for testing database persistence backends.
SQLAlchemy<1.1.0,>=0.9.7
alembic>=0.7.2
psycopg2
SQLAlchemy<1.1.0,>=0.9.9
alembic>=0.8.0
psycopg2>=2.5
PyMySQL>=0.6.2 # MIT License
# Used for making sure we still work with eventlet.

View File

@@ -31,12 +31,12 @@ import pydot
from automaton import machines
from taskflow.engines.action_engine import runner
from taskflow.engines.action_engine import builder
from taskflow.engines.worker_based import protocol
from taskflow import states
# This is just needed to get at the runner builder object (we will not
# This is just needed to get at the machine object (we will not
# actually be running it...).
class DummyRuntime(object):
def __init__(self):
@@ -134,9 +134,9 @@ def main():
list(states._ALLOWED_RETRY_TRANSITIONS))
elif options.engines:
source_type = "Engines"
r = runner.Runner(DummyRuntime(), mock.MagicMock())
source, memory = r.build()
internal_states.extend(runner._META_STATES)
b = builder.MachineBuilder(DummyRuntime(), mock.MagicMock())
source, memory = b.build()
internal_states.extend(builder.META_STATES)
ordering = 'out'
elif options.wbe_requests:
source_type = "WBE requests"