diff --git a/.gitignore b/.gitignore index 3cf05e0c..b645cf69 100644 --- a/.gitignore +++ b/.gitignore @@ -22,7 +22,8 @@ lib64 pip-log.txt # Unit test / coverage reports -.coverage +.coverage* +.diagram-tools/* .tox nosetests.xml .venv diff --git a/.testr.conf b/.testr.conf index 8aa2cd00..339bf3e7 100644 --- a/.testr.conf +++ b/.testr.conf @@ -2,6 +2,7 @@ test_command=OS_STDOUT_CAPTURE=${OS_STDOUT_CAPTURE:-1} \ OS_STDERR_CAPTURE=${OS_STDERR_CAPTURE:-1} \ OS_TEST_TIMEOUT=${OS_TEST_TIMEOUT:-160} \ + OS_DEBUG=${OS_DEBUG:-TRACE} \ ${PYTHON:-python} -m subunit.run discover -t ./ ./taskflow/tests $LISTOPT $IDOPTION test_id_option=--load-list $IDFILE diff --git a/doc/diagrams/tasks.graffle.tgz b/doc/diagrams/tasks.graffle.tgz new file mode 100644 index 00000000..c014c0c1 Binary files /dev/null and b/doc/diagrams/tasks.graffle.tgz differ diff --git a/doc/source/arguments_and_results.rst b/doc/source/arguments_and_results.rst index a8cc2e57..619a3c41 100644 --- a/doc/source/arguments_and_results.rst +++ b/doc/source/arguments_and_results.rst @@ -87,7 +87,7 @@ Rebinding stored with a name other than the corresponding arguments name. That's when the ``rebind`` constructor parameter comes in handy. Using it the flow author can instruct the engine to fetch a value from storage by one name, but pass it -to a tasks/retrys ``execute`` method with another name. There are two possible +to a tasks/retries ``execute`` method with another name. There are two possible ways of accomplishing this. The first is to pass a dictionary that maps the argument name to the name @@ -303,7 +303,7 @@ Default provides ++++++++++++++++ As mentioned above, the default base class provides nothing, which means -results are not accessible to other tasks/retrys in the flow. +results are not accessible to other tasks/retries in the flow. The author can override this and specify default value for provides using the ``default_provides`` class/instance variable: @@ -386,7 +386,7 @@ A |Retry| controller works with arguments in the same way as a |Task|. But it has an additional parameter ``'history'`` that is itself a :py:class:`~taskflow.retry.History` object that contains what failed over all the engines attempts (aka the outcomes). The history object can be -viewed as a tuple that contains a result of the previous retrys run and a +viewed as a tuple that contains a result of the previous retries run and a table/dict where each key is a failed atoms name and each value is a :py:class:`~taskflow.types.failure.Failure` object. @@ -415,7 +415,7 @@ the following history (printed as a list):: At this point (since the implementation returned ``RETRY``) the |retry.execute| method will be called again and it will receive the same -history and it can then return a value that subseqent tasks can use to alter +history and it can then return a value that subsequent tasks can use to alter their behavior. If instead the |retry.execute| method itself raises an exception, diff --git a/doc/source/atoms.rst b/doc/source/atoms.rst index 2b7c9d95..f71fa601 100644 --- a/doc/source/atoms.rst +++ b/doc/source/atoms.rst @@ -29,6 +29,13 @@ it (they are *nearly* analogous to functions). These task objects all derive from :py:class:`~taskflow.task.BaseTask` which defines what a task must provide in terms of properties and methods. +**For example:** + +.. image:: img/tasks.png + :width: 525px + :align: left + :alt: Task outline. + Currently the following *provided* types of task subclasses are: * :py:class:`~taskflow.task.Task`: useful for inheriting from and creating your diff --git a/doc/source/engines.rst b/doc/source/engines.rst index 6f163231..04a4bb86 100644 --- a/doc/source/engines.rst +++ b/doc/source/engines.rst @@ -85,7 +85,7 @@ Of course these kind of features can come with some drawbacks: away from (and this is likely a mindset change for programmers used to the imperative model). We have worked to make this less of a concern by creating and encouraging the usage of :doc:`persistence `, to help make - it possible to have state and tranfer that state via a argument input and + it possible to have state and transfer that state via a argument input and output mechanism. * Depending on how much imperative code exists (and state inside that code) there *may* be *significant* rework of that code and converting or @@ -258,9 +258,10 @@ Execution The graph (and helper objects) previously created are now used for guiding further execution (see :py:func:`~taskflow.engines.base.Engine.run`). The flow is put into the ``RUNNING`` :doc:`state ` and a -:py:class:`~taskflow.engines.action_engine.runner.Runner` implementation -object starts to take over and begins going through the stages listed -below (for a more visual diagram/representation see +:py:class:`~taskflow.engines.action_engine.builder.MachineBuilder` state +machine object and runner object are built (using the `automaton`_ library). +That machine and associated runner then starts to take over and begins going +through the stages listed below (for a more visual diagram/representation see the :ref:`engine state diagram `). .. note:: @@ -338,8 +339,8 @@ above stages will be restarted and resuming will occur). Finishing --------- -At this point the -:py:class:`~taskflow.engines.action_engine.runner.Runner` has +At this point the machine (and runner) that was built using the +:py:class:`~taskflow.engines.action_engine.builder.MachineBuilder` class has now finished successfully, failed, or the execution was suspended. Depending on which one of these occurs will cause the flow to enter a new state (typically one of ``FAILURE``, ``SUSPENDED``, ``SUCCESS`` or ``REVERTED``). @@ -365,9 +366,9 @@ this performs is a transition of the flow state from ``RUNNING`` into a ``SUSPENDING`` state (which will later transition into a ``SUSPENDED`` state). Since an engine may be remotely executing atoms (or locally executing them) and there is currently no preemption what occurs is that the engines -:py:class:`~taskflow.engines.action_engine.runner.Runner` state machine will -detect this transition into ``SUSPENDING`` has occurred and the state -machine will avoid scheduling new work (it will though let active work +:py:class:`~taskflow.engines.action_engine.builder.MachineBuilder` state +machine will detect this transition into ``SUSPENDING`` has occurred and the +state machine will avoid scheduling new work (it will though let active work continue). After the current work has finished the engine will transition from ``SUSPENDING`` into ``SUSPENDED`` and return from its :py:func:`~taskflow.engines.base.Engine.run` method. @@ -444,10 +445,10 @@ Components cycle). .. automodule:: taskflow.engines.action_engine.analyzer +.. automodule:: taskflow.engines.action_engine.builder .. automodule:: taskflow.engines.action_engine.compiler .. automodule:: taskflow.engines.action_engine.completer .. automodule:: taskflow.engines.action_engine.executor -.. automodule:: taskflow.engines.action_engine.runner .. automodule:: taskflow.engines.action_engine.runtime .. automodule:: taskflow.engines.action_engine.scheduler .. autoclass:: taskflow.engines.action_engine.scopes.ScopeWalker @@ -462,6 +463,7 @@ Hierarchy taskflow.engines.worker_based.engine.WorkerBasedActionEngine :parts: 1 +.. _automaton: http://docs.openstack.org/developer/automaton/ .. _multiprocessing: https://docs.python.org/2/library/multiprocessing.html .. _future: https://docs.python.org/dev/library/concurrent.futures.html#future-objects .. _executor: https://docs.python.org/dev/library/concurrent.futures.html#concurrent.futures.Executor diff --git a/doc/source/img/tasks.png b/doc/source/img/tasks.png new file mode 100644 index 00000000..cca91f99 Binary files /dev/null and b/doc/source/img/tasks.png differ diff --git a/doc/source/jobs.rst b/doc/source/jobs.rst index 3cd7f95f..f1f457a1 100644 --- a/doc/source/jobs.rst +++ b/doc/source/jobs.rst @@ -215,7 +215,7 @@ Redis **Board type**: ``'redis'`` Uses `redis`_ to provide the jobboard capabilities and semantics by using -a redis hash datastructure and individual job ownership keys (that can +a redis hash data structure and individual job ownership keys (that can optionally expire after a given amount of time). .. note:: @@ -303,5 +303,5 @@ Hierarchy .. _paradigm shift: https://wiki.openstack.org/wiki/TaskFlow/Paradigm_shifts#Workflow_ownership_transfer .. _zookeeper: http://zookeeper.apache.org/ .. _kazoo: http://kazoo.readthedocs.org/ -.. _stevedore: http://stevedore.readthedocs.org/ +.. _stevedore: http://docs.openstack.org/developer/stevedore/ .. _redis: http://redis.io/ diff --git a/doc/source/persistence.rst b/doc/source/persistence.rst index 8b451d42..905ff746 100644 --- a/doc/source/persistence.rst +++ b/doc/source/persistence.rst @@ -31,7 +31,7 @@ This abstraction serves the following *major* purposes: vs. stop. * *Something you create...* -.. _stevedore: http://stevedore.readthedocs.org/ +.. _stevedore: http://docs.openstack.org/developer/stevedore/ How it is used ============== diff --git a/doc/source/utils.rst b/doc/source/utils.rst index d8da6c0c..7878cbcb 100644 --- a/doc/source/utils.rst +++ b/doc/source/utils.rst @@ -23,6 +23,11 @@ Eventlet .. automodule:: taskflow.utils.eventlet_utils +Iterators +~~~~~~~~~ + +.. automodule:: taskflow.utils.iter_utils + Kazoo ~~~~~ diff --git a/doc/source/workers.rst b/doc/source/workers.rst index 95c15598..a2dc941d 100644 --- a/doc/source/workers.rst +++ b/doc/source/workers.rst @@ -286,10 +286,10 @@ but not *yet* consumed. **PENDING** - Worker accepted request and is pending to run using its executor (threads, processes, or other). -**FAILURE** - Worker failed after running request (due to task exeception) or +**FAILURE** - Worker failed after running request (due to task exception) or no worker moved/started executing (by placing the request into ``RUNNING`` state) with-in specified time span (this defaults to 60 seconds unless -overriden). +overridden). **RUNNING** - Workers executor (using threads, processes...) has started to run requested task (once this state is transitioned to any request timeout no diff --git a/requirements.txt b/requirements.txt index 7ae90991..760dfcfa 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,7 +3,7 @@ # process, which may cause wedges in the gate later. # See: https://bugs.launchpad.net/pbr/+bug/1384919 for why this is here... -pbr<2.0,>=1.3 +pbr<2.0,>=1.6 # Packages needed for using this library. @@ -20,7 +20,7 @@ futurist>=0.1.2 # Apache-2.0 fasteners>=0.7 # Apache-2.0 # Very nice graph library -networkx>=1.8 +networkx>=1.10 # For contextlib new additions/compatibility for <= python 3.3 contextlib2>=0.4.0 # PSF License @@ -32,16 +32,16 @@ stevedore>=1.5.0 # Apache-2.0 futures>=3.0;python_version=='2.7' or python_version=='2.6' # Backport for time.monotonic which is in 3.3+ -monotonic>=0.1 # Apache-2.0 +monotonic>=0.3 # Apache-2.0 # Used for structured input validation jsonschema!=2.5.0,<3.0.0,>=2.0.0 # For the state machine we run with -automaton>=0.2.0 # Apache-2.0 +automaton>=0.5.0 # Apache-2.0 # For common utilities -oslo.utils>=1.9.0 # Apache-2.0 +oslo.utils>=2.0.0 # Apache-2.0 oslo.serialization>=1.4.0 # Apache-2.0 # For lru caches and such diff --git a/taskflow/engines/action_engine/analyzer.py b/taskflow/engines/action_engine/analyzer.py index 3a88a461..78d4c29f 100644 --- a/taskflow/engines/action_engine/analyzer.py +++ b/taskflow/engines/action_engine/analyzer.py @@ -14,8 +14,9 @@ # License for the specific language governing permissions and limitations # under the License. -import functools +import abc import itertools +import weakref from networkx.algorithms import traversal import six @@ -23,7 +24,33 @@ import six from taskflow import states as st -class IgnoreDecider(object): +@six.add_metaclass(abc.ABCMeta) +class Decider(object): + """Base class for deciders. + + Provides interface to be implemented by sub-classes + Decider checks whether next atom in flow should be executed or not + """ + + @abc.abstractmethod + def check(self, runtime): + """Returns bool of whether this decider should allow running.""" + + @abc.abstractmethod + def affect(self, runtime): + """If the :py:func:`~.check` returns false, affects associated atoms. + + """ + + def check_and_affect(self, runtime): + """Handles :py:func:`~.check` + :py:func:`~.affect` in right order.""" + proceed = self.check(runtime) + if not proceed: + self.affect(runtime) + return proceed + + +class IgnoreDecider(Decider): """Checks any provided edge-deciders and determines if ok to run.""" def __init__(self, atom, edge_deciders): @@ -51,15 +78,8 @@ class IgnoreDecider(object): runtime.reset_nodes(itertools.chain([self._atom], successors_iter), state=st.IGNORE, intention=st.IGNORE) - def check_and_affect(self, runtime): - """Handles :py:func:`~.check` + :py:func:`~.affect` in right order.""" - proceed = self.check(runtime) - if not proceed: - self.affect(runtime) - return proceed - -class NoOpDecider(object): +class NoOpDecider(Decider): """No-op decider that says it is always ok to run & has no effect(s).""" def check(self, runtime): @@ -69,13 +89,6 @@ class NoOpDecider(object): def affect(self, runtime): """Does nothing.""" - def check_and_affect(self, runtime): - """Handles :py:func:`~.check` + :py:func:`~.affect` in right order. - - Does nothing. - """ - return self.check(runtime) - class Analyzer(object): """Analyzes a compilation and aids in execution processes. @@ -88,12 +101,9 @@ class Analyzer(object): """ def __init__(self, runtime): + self._runtime = weakref.proxy(runtime) self._storage = runtime.storage self._execution_graph = runtime.compilation.execution_graph - self._check_atom_transition = runtime.check_atom_transition - self._fetch_edge_deciders = runtime.fetch_edge_deciders - self._fetch_retries = functools.partial( - runtime.fetch_atoms_by_kind, 'retry') def get_next_nodes(self, node=None): """Get next nodes to run (originating from node or all nodes).""" @@ -161,7 +171,8 @@ class Analyzer(object): state = self.get_state(atom) intention = self._storage.get_atom_intention(atom.name) - transition = self._check_atom_transition(atom, state, st.RUNNING) + transition = self._runtime.check_atom_transition(atom, state, + st.RUNNING) if not transition or intention != st.EXECUTE: return (False, None) @@ -177,7 +188,7 @@ class Analyzer(object): if not ok_to_run: return (False, None) else: - edge_deciders = self._fetch_edge_deciders(atom) + edge_deciders = self._runtime.fetch_edge_deciders(atom) return (True, IgnoreDecider(atom, edge_deciders)) def _get_maybe_ready_for_revert(self, atom): @@ -185,7 +196,8 @@ class Analyzer(object): state = self.get_state(atom) intention = self._storage.get_atom_intention(atom.name) - transition = self._check_atom_transition(atom, state, st.REVERTING) + transition = self._runtime.check_atom_transition(atom, state, + st.REVERTING) if not transition or intention not in (st.REVERT, st.RETRY): return (False, None) @@ -213,7 +225,7 @@ class Analyzer(object): If no state is provided it will yield back all retry atoms. """ - for atom in self._fetch_retries(): + for atom in self._runtime.fetch_atoms_by_kind('retry'): if not state or self.get_state(atom) == state: yield atom diff --git a/taskflow/engines/action_engine/runner.py b/taskflow/engines/action_engine/builder.py similarity index 66% rename from taskflow/engines/action_engine/runner.py rename to taskflow/engines/action_engine/builder.py index f02f3f09..9ab26d4a 100644 --- a/taskflow/engines/action_engine/runner.py +++ b/taskflow/engines/action_engine/builder.py @@ -14,37 +14,37 @@ # License for the specific language governing permissions and limitations # under the License. +import weakref from automaton import machines -from automaton import runners from taskflow import logging from taskflow import states as st from taskflow.types import failure -# Waiting state timeout (in seconds). -_WAITING_TIMEOUT = 60 +# Default waiting state timeout (in seconds). +WAITING_TIMEOUT = 60 # Meta states the state machine uses. -_UNDEFINED = 'UNDEFINED' -_GAME_OVER = 'GAME_OVER' -_META_STATES = (_GAME_OVER, _UNDEFINED) +UNDEFINED = 'UNDEFINED' +GAME_OVER = 'GAME_OVER' +META_STATES = (GAME_OVER, UNDEFINED) # Event name constants the state machine uses. -_SCHEDULE = 'schedule_next' -_WAIT = 'wait_finished' -_ANALYZE = 'examine_finished' -_FINISH = 'completed' -_FAILED = 'failed' -_SUSPENDED = 'suspended' -_SUCCESS = 'success' -_REVERTED = 'reverted' -_START = 'start' +SCHEDULE = 'schedule_next' +WAIT = 'wait_finished' +ANALYZE = 'examine_finished' +FINISH = 'completed' +FAILED = 'failed' +SUSPENDED = 'suspended' +SUCCESS = 'success' +REVERTED = 'reverted' +START = 'start' LOG = logging.getLogger(__name__) -class _MachineMemory(object): +class MachineMemory(object): """State machine memory.""" def __init__(self): @@ -54,31 +54,31 @@ class _MachineMemory(object): self.done = set() -class Runner(object): - """State machine *builder* + *runner* that powers the engine components. +class MachineBuilder(object): + """State machine *builder* that powers the engine components. NOTE(harlowja): the machine (states and events that will trigger transitions) that this builds is represented by the following table:: +--------------+------------------+------------+----------+---------+ - Start | Event | End | On Enter | On Exit + | Start | Event | End | On Enter | On Exit | +--------------+------------------+------------+----------+---------+ - ANALYZING | completed | GAME_OVER | | - ANALYZING | schedule_next | SCHEDULING | | - ANALYZING | wait_finished | WAITING | | - FAILURE[$] | | | | - GAME_OVER | failed | FAILURE | | - GAME_OVER | reverted | REVERTED | | - GAME_OVER | success | SUCCESS | | - GAME_OVER | suspended | SUSPENDED | | - RESUMING | schedule_next | SCHEDULING | | - REVERTED[$] | | | | - SCHEDULING | wait_finished | WAITING | | - SUCCESS[$] | | | | - SUSPENDED[$] | | | | - UNDEFINED[^] | start | RESUMING | | - WAITING | examine_finished | ANALYZING | | + | ANALYZING | completed | GAME_OVER | . | . | + | ANALYZING | schedule_next | SCHEDULING | . | . | + | ANALYZING | wait_finished | WAITING | . | . | + | FAILURE[$] | . | . | . | . | + | GAME_OVER | failed | FAILURE | . | . | + | GAME_OVER | reverted | REVERTED | . | . | + | GAME_OVER | success | SUCCESS | . | . | + | GAME_OVER | suspended | SUSPENDED | . | . | + | RESUMING | schedule_next | SCHEDULING | . | . | + | REVERTED[$] | . | . | . | . | + | SCHEDULING | wait_finished | WAITING | . | . | + | SUCCESS[$] | . | . | . | . | + | SUSPENDED[$] | . | . | . | . | + | UNDEFINED[^] | start | RESUMING | . | . | + | WAITING | examine_finished | ANALYZING | . | . | +--------------+------------------+------------+----------+---------+ Between any of these yielded states (minus ``GAME_OVER`` and ``UNDEFINED``) @@ -91,34 +91,29 @@ class Runner(object): tasks in parallel, this enables parallel running and/or reversion. """ - # Informational states this action yields while running, not useful to - # have the engine record but useful to provide to end-users when doing - # execution iterations. - ignorable_states = (st.SCHEDULING, st.WAITING, st.RESUMING, st.ANALYZING) - def __init__(self, runtime, waiter): - self._runtime = runtime + self._runtime = weakref.proxy(runtime) self._analyzer = runtime.analyzer self._completer = runtime.completer self._scheduler = runtime.scheduler self._storage = runtime.storage self._waiter = waiter - def runnable(self): - """Checks if the storage says the flow is still runnable/running.""" - return self._storage.get_flow_state() == st.RUNNING - def build(self, timeout=None): - """Builds a state-machine (that can be/is used during running).""" + """Builds a state-machine (that is used during running).""" - memory = _MachineMemory() + memory = MachineMemory() if timeout is None: - timeout = _WAITING_TIMEOUT + timeout = WAITING_TIMEOUT # Cache some local functions/methods... do_schedule = self._scheduler.schedule do_complete = self._completer.complete + def is_runnable(): + # Checks if the storage says the flow is still runnable... + return self._storage.get_flow_state() == st.RUNNING + def iter_next_nodes(target_node=None): # Yields and filters and tweaks the next nodes to execute... maybe_nodes = self._analyzer.get_next_nodes(node=target_node) @@ -134,7 +129,7 @@ class Runner(object): # that are now ready to be ran. memory.next_nodes.update(self._completer.resume()) memory.next_nodes.update(iter_next_nodes()) - return _SCHEDULE + return SCHEDULE def game_over(old_state, new_state, event): # This reaction function is mainly a intermediary delegation @@ -142,13 +137,13 @@ class Runner(object): # the appropriate handler that will deal with the memory values, # it is *always* called before the final state is entered. if memory.failures: - return _FAILED + return FAILED if any(1 for node in iter_next_nodes()): - return _SUSPENDED + return SUSPENDED elif self._analyzer.is_success(): - return _SUCCESS + return SUCCESS else: - return _REVERTED + return REVERTED def schedule(old_state, new_state, event): # This reaction function starts to schedule the memory's next @@ -156,14 +151,14 @@ class Runner(object): # if the user of this engine has requested the engine/storage # that holds this information to stop or suspend); handles failures # that occur during this process safely... - if self.runnable() and memory.next_nodes: + if is_runnable() and memory.next_nodes: not_done, failures = do_schedule(memory.next_nodes) if not_done: memory.not_done.update(not_done) if failures: memory.failures.extend(failures) memory.next_nodes.clear() - return _WAIT + return WAIT def wait(old_state, new_state, event): # TODO(harlowja): maybe we should start doing 'yield from' this @@ -173,7 +168,7 @@ class Runner(object): done, not_done = self._waiter(memory.not_done, timeout=timeout) memory.done.update(done) memory.not_done = not_done - return _ANALYZE + return ANALYZE def analyze(old_state, new_state, event): # This reaction function is responsible for analyzing all nodes @@ -215,13 +210,13 @@ class Runner(object): memory.failures.append(failure.Failure()) else: next_nodes.update(more_nodes) - if self.runnable() and next_nodes and not memory.failures: + if is_runnable() and next_nodes and not memory.failures: memory.next_nodes.update(next_nodes) - return _SCHEDULE + return SCHEDULE elif memory.not_done: - return _WAIT + return WAIT else: - return _FINISH + return FINISH def on_exit(old_state, event): LOG.debug("Exiting old state '%s' in response to event '%s'", @@ -239,8 +234,8 @@ class Runner(object): watchers['on_enter'] = on_enter m = machines.FiniteMachine() - m.add_state(_GAME_OVER, **watchers) - m.add_state(_UNDEFINED, **watchers) + m.add_state(GAME_OVER, **watchers) + m.add_state(UNDEFINED, **watchers) m.add_state(st.ANALYZING, **watchers) m.add_state(st.RESUMING, **watchers) m.add_state(st.REVERTED, terminal=True, **watchers) @@ -249,38 +244,25 @@ class Runner(object): m.add_state(st.SUSPENDED, terminal=True, **watchers) m.add_state(st.WAITING, **watchers) m.add_state(st.FAILURE, terminal=True, **watchers) - m.default_start_state = _UNDEFINED + m.default_start_state = UNDEFINED - m.add_transition(_GAME_OVER, st.REVERTED, _REVERTED) - m.add_transition(_GAME_OVER, st.SUCCESS, _SUCCESS) - m.add_transition(_GAME_OVER, st.SUSPENDED, _SUSPENDED) - m.add_transition(_GAME_OVER, st.FAILURE, _FAILED) - m.add_transition(_UNDEFINED, st.RESUMING, _START) - m.add_transition(st.ANALYZING, _GAME_OVER, _FINISH) - m.add_transition(st.ANALYZING, st.SCHEDULING, _SCHEDULE) - m.add_transition(st.ANALYZING, st.WAITING, _WAIT) - m.add_transition(st.RESUMING, st.SCHEDULING, _SCHEDULE) - m.add_transition(st.SCHEDULING, st.WAITING, _WAIT) - m.add_transition(st.WAITING, st.ANALYZING, _ANALYZE) + m.add_transition(GAME_OVER, st.REVERTED, REVERTED) + m.add_transition(GAME_OVER, st.SUCCESS, SUCCESS) + m.add_transition(GAME_OVER, st.SUSPENDED, SUSPENDED) + m.add_transition(GAME_OVER, st.FAILURE, FAILED) + m.add_transition(UNDEFINED, st.RESUMING, START) + m.add_transition(st.ANALYZING, GAME_OVER, FINISH) + m.add_transition(st.ANALYZING, st.SCHEDULING, SCHEDULE) + m.add_transition(st.ANALYZING, st.WAITING, WAIT) + m.add_transition(st.RESUMING, st.SCHEDULING, SCHEDULE) + m.add_transition(st.SCHEDULING, st.WAITING, WAIT) + m.add_transition(st.WAITING, st.ANALYZING, ANALYZE) - m.add_reaction(_GAME_OVER, _FINISH, game_over) - m.add_reaction(st.ANALYZING, _ANALYZE, analyze) - m.add_reaction(st.RESUMING, _START, resume) - m.add_reaction(st.SCHEDULING, _SCHEDULE, schedule) - m.add_reaction(st.WAITING, _WAIT, wait) + m.add_reaction(GAME_OVER, FINISH, game_over) + m.add_reaction(st.ANALYZING, ANALYZE, analyze) + m.add_reaction(st.RESUMING, START, resume) + m.add_reaction(st.SCHEDULING, SCHEDULE, schedule) + m.add_reaction(st.WAITING, WAIT, wait) m.freeze() - - r = runners.FiniteRunner(m) - return (m, r, memory) - - def run_iter(self, timeout=None): - """Runs iteratively using a locally built state machine.""" - machine, runner, memory = self.build(timeout=timeout) - for (_prior_state, new_state) in runner.run_iter(_START): - # NOTE(harlowja): skip over meta-states. - if new_state not in _META_STATES: - if new_state == st.FAILURE: - yield (new_state, memory.failures) - else: - yield (new_state, []) + return (m, memory) diff --git a/taskflow/engines/action_engine/compiler.py b/taskflow/engines/action_engine/compiler.py index dc6c24e1..50ce4eb1 100644 --- a/taskflow/engines/action_engine/compiler.py +++ b/taskflow/engines/action_engine/compiler.py @@ -25,6 +25,7 @@ from taskflow import logging from taskflow import task from taskflow.types import graph as gr from taskflow.types import tree as tr +from taskflow.utils import iter_utils from taskflow.utils import misc LOG = logging.getLogger(__name__) @@ -232,8 +233,8 @@ class _FlowCompiler(object): @staticmethod def _occurence_detector(to_graph, from_graph): - return sum(1 for node in from_graph.nodes_iter() - if node in to_graph) + return iter_utils.count(node for node in from_graph.nodes_iter() + if node in to_graph) def _decompose_flow(self, flow, parent=None): """Decomposes a flow into a graph, tree node + decomposed subgraphs.""" @@ -371,6 +372,7 @@ class PatternCompiler(object): _FlowCompiler(self._compile, self._linker), _TaskCompiler(), ] + self._level = 0 def _compile(self, item, parent=None): """Compiles a item (pattern, task) into a graph + tree node.""" @@ -391,13 +393,28 @@ class PatternCompiler(object): " and/or recursive compiling is not" " supported" % (item, type(item))) self._history.add(item) + if LOG.isEnabledFor(logging.BLATHER): + LOG.blather("%sCompiling '%s'", " " * self._level, item) + self._level += 1 def _post_item_compile(self, item, graph, node): """Called after a item is compiled; doing post-compilation actions.""" + self._level -= 1 + if LOG.isEnabledFor(logging.BLATHER): + prefix = ' ' * self._level + LOG.blather("%sDecomposed '%s' into:", prefix, item) + prefix = ' ' * (self._level + 1) + LOG.blather("%sGraph:", prefix) + for line in graph.pformat().splitlines(): + LOG.blather("%s %s", prefix, line) + LOG.blather("%sHierarchy:", prefix) + for line in node.pformat().splitlines(): + LOG.blather("%s %s", prefix, line) def _pre_compile(self): """Called before the compilation of the root starts.""" self._history.clear() + self._level = 0 def _post_compile(self, graph, node): """Called after the compilation of the root finishes successfully.""" @@ -410,19 +427,6 @@ class PatternCompiler(object): raise exc.Empty("Root container '%s' (%s) is empty" % (self._root, type(self._root))) self._history.clear() - # NOTE(harlowja): this one can be expensive to calculate (especially - # the cycle detection), so only do it if we know BLATHER is enabled - # and not under all cases. - if LOG.isEnabledFor(logging.BLATHER): - LOG.blather("Translated '%s'", self._root) - LOG.blather("Graph:") - for line in graph.pformat().splitlines(): - # Indent it so that it's slightly offset from the above line. - LOG.blather(" %s", line) - LOG.blather("Hierarchy:") - for line in node.pformat().splitlines(): - # Indent it so that it's slightly offset from the above line. - LOG.blather(" %s", line) @fasteners.locked def compile(self): diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py index ef557f93..cc6b1ac4 100644 --- a/taskflow/engines/action_engine/engine.py +++ b/taskflow/engines/action_engine/engine.py @@ -19,6 +19,7 @@ import contextlib import itertools import threading +from automaton import runners from concurrent import futures import fasteners import networkx as nx @@ -26,6 +27,7 @@ from oslo_utils import excutils from oslo_utils import strutils import six +from taskflow.engines.action_engine import builder from taskflow.engines.action_engine import compiler from taskflow.engines.action_engine import executor from taskflow.engines.action_engine import runtime @@ -59,9 +61,9 @@ class ActionEngine(base.Engine): This engine compiles the flow (and any subflows) into a compilation unit which contains the full runtime definition to be executed and then uses - this compilation unit in combination with the executor, runtime, runner - and storage classes to attempt to run your flow (and any subflows & - contained atoms) to completion. + this compilation unit in combination with the executor, runtime, machine + builder and storage classes to attempt to run your flow (and any + subflows & contained atoms) to completion. NOTE(harlowja): during this process it is permissible and valid to have a task or multiple tasks in the execution graph fail (at the same time even), @@ -77,6 +79,15 @@ class ActionEngine(base.Engine): failure/s that were captured (if any) to get reraised. """ + IGNORABLE_STATES = frozenset( + itertools.chain([states.SCHEDULING, states.WAITING, states.RESUMING, + states.ANALYZING], builder.META_STATES)) + """ + Informational states this engines internal machine yields back while + running, not useful to have the engine record but useful to provide to + end-users when doing execution iterations via :py:meth:`.run_iter`. + """ + def __init__(self, flow, flow_detail, backend, options): super(ActionEngine, self).__init__(flow, flow_detail, backend, options) self._runtime = None @@ -151,20 +162,20 @@ class ActionEngine(base.Engine): def run_iter(self, timeout=None): """Runs the engine using iteration (or die trying). - :param timeout: timeout to wait for any tasks to complete (this timeout + :param timeout: timeout to wait for any atoms to complete (this timeout will be used during the waiting period that occurs after the - waiting state is yielded when unfinished tasks are being waited - for). + waiting state is yielded when unfinished atoms are being waited + on). Instead of running to completion in a blocking manner, this will return a generator which will yield back the various states that the engine is going through (and can be used to run multiple engines at - once using a generator per engine). the iterator returned also - responds to the send() method from pep-0342 and will attempt to suspend - itself if a truthy value is sent in (the suspend may be delayed until - all active tasks have finished). + once using a generator per engine). The iterator returned also + responds to the ``send()`` method from :pep:`0342` and will attempt to + suspend itself if a truthy value is sent in (the suspend may be + delayed until all active atoms have finished). - NOTE(harlowja): using the run_iter method will **not** retain the + NOTE(harlowja): using the ``run_iter`` method will **not** retain the engine lock while executing so the user should ensure that there is only one entity using a returned engine iterator (one per engine) at a given time. @@ -172,19 +183,24 @@ class ActionEngine(base.Engine): self.compile() self.prepare() self.validate() - runner = self._runtime.runner last_state = None with _start_stop(self._task_executor, self._retry_executor): self._change_state(states.RUNNING) try: closed = False - for (last_state, failures) in runner.run_iter(timeout=timeout): - if failures: - failure.Failure.reraise_if_any(failures) + machine, memory = self._runtime.builder.build(timeout=timeout) + r = runners.FiniteRunner(machine) + for (_prior_state, new_state) in r.run_iter(builder.START): + last_state = new_state + # NOTE(harlowja): skip over meta-states. + if new_state in builder.META_STATES: + continue + if new_state == states.FAILURE: + failure.Failure.reraise_if_any(memory.failures) if closed: continue try: - try_suspend = yield last_state + try_suspend = yield new_state except GeneratorExit: # The generator was closed, attempt to suspend and # continue looping until we have cleanly closed up @@ -198,9 +214,8 @@ class ActionEngine(base.Engine): with excutils.save_and_reraise_exception(): self._change_state(states.FAILURE) else: - ignorable_states = getattr(runner, 'ignorable_states', []) - if last_state and last_state not in ignorable_states: - self._change_state(last_state) + if last_state and last_state not in self.IGNORABLE_STATES: + self._change_state(new_state) if last_state not in self.NO_RERAISING_STATES: it = itertools.chain( six.itervalues(self.storage.get_failures()), @@ -294,10 +309,7 @@ class ActionEngine(base.Engine): @fasteners.locked def reset(self): - if not self._storage_ensured: - raise exc.InvalidState("Can not reset an engine" - " which has not has its storage" - " populated") + self._check('reset', True, True) # This transitions *all* contained atoms back into the PENDING state # with an intention to EXECUTE (or dies trying to do that) and then # changes the state of the flow to PENDING so that it can then run... diff --git a/taskflow/engines/action_engine/runtime.py b/taskflow/engines/action_engine/runtime.py index 38998b8c..232a6917 100644 --- a/taskflow/engines/action_engine/runtime.py +++ b/taskflow/engines/action_engine/runtime.py @@ -16,14 +16,16 @@ import functools +from futurist import waiters + from taskflow.engines.action_engine.actions import retry as ra from taskflow.engines.action_engine.actions import task as ta from taskflow.engines.action_engine import analyzer as an +from taskflow.engines.action_engine import builder as bu from taskflow.engines.action_engine import completer as co -from taskflow.engines.action_engine import runner as ru from taskflow.engines.action_engine import scheduler as sched from taskflow.engines.action_engine import scopes as sc -from taskflow import flow as flow_type +from taskflow import flow from taskflow import states as st from taskflow import task from taskflow.utils import async_utils @@ -88,7 +90,7 @@ class Runtime(object): # is able to run (or should not) ensure we retain it and use # it later as needed. u_v_data = execution_graph.adj[previous_atom][atom] - u_v_decider = u_v_data.get(flow_type.LINK_DECIDER) + u_v_decider = u_v_data.get(flow.LINK_DECIDER) if u_v_decider is not None: edge_deciders[previous_atom.name] = u_v_decider metadata['scope_walker'] = walker @@ -113,8 +115,8 @@ class Runtime(object): return an.Analyzer(self) @misc.cachedproperty - def runner(self): - return ru.Runner(self, async_utils.wait_for_any) + def builder(self): + return bu.MachineBuilder(self, waiters.wait_for_any) @misc.cachedproperty def completer(self): diff --git a/taskflow/engines/action_engine/scheduler.py b/taskflow/engines/action_engine/scheduler.py index 404781e6..5fdc1995 100644 --- a/taskflow/engines/action_engine/scheduler.py +++ b/taskflow/engines/action_engine/scheduler.py @@ -76,7 +76,7 @@ class Scheduler(object): """Safely schedules atoms using a runtime ``fetch_scheduler`` routine.""" def __init__(self, runtime): - self._fetch_scheduler = runtime.fetch_scheduler + self._runtime = weakref.proxy(runtime) def schedule(self, atoms): """Schedules the provided atoms for *future* completion. @@ -89,7 +89,7 @@ class Scheduler(object): """ futures = set() for atom in atoms: - scheduler = self._fetch_scheduler(atom) + scheduler = self._runtime.fetch_scheduler(atom) try: futures.add(scheduler.schedule(atom)) except Exception: diff --git a/taskflow/logging.py b/taskflow/logging.py index 0ededf7f..823f8b0c 100644 --- a/taskflow/logging.py +++ b/taskflow/logging.py @@ -17,7 +17,6 @@ from __future__ import absolute_import import logging -import sys _BASE = __name__.split(".", 1)[0] @@ -49,45 +48,8 @@ class _BlatherLoggerAdapter(logging.LoggerAdapter): self.warning(msg, *args, **kwargs) -# TODO(harlowja): we should remove when we no longer have to support 2.6... -if sys.version_info[0:2] == (2, 6): - - class _FixedBlatherLoggerAdapter(_BlatherLoggerAdapter): - """Ensures isEnabledFor() exists on adapters that are created.""" - - def isEnabledFor(self, level): - return self.logger.isEnabledFor(level) - - _BlatherLoggerAdapter = _FixedBlatherLoggerAdapter - - # Taken from python2.7 (same in python3.4)... - class _NullHandler(logging.Handler): - """This handler does nothing. - - It's intended to be used to avoid the - "No handlers could be found for logger XXX" one-off warning. This is - important for library code, which may contain code to log events. If a - user of the library does not configure logging, the one-off warning - might be produced; to avoid this, the library developer simply needs - to instantiate a _NullHandler and add it to the top-level logger of the - library module or package. - """ - - def handle(self, record): - """Stub.""" - - def emit(self, record): - """Stub.""" - - def createLock(self): - self.lock = None - -else: - _NullHandler = logging.NullHandler - - def getLogger(name=_BASE, extra=None): logger = logging.getLogger(name) if not logger.handlers: - logger.addHandler(_NullHandler()) + logger.addHandler(logging.NullHandler()) return _BlatherLoggerAdapter(logger, extra=extra) diff --git a/taskflow/patterns/graph_flow.py b/taskflow/patterns/graph_flow.py index 9e255871..37da34a6 100644 --- a/taskflow/patterns/graph_flow.py +++ b/taskflow/patterns/graph_flow.py @@ -54,10 +54,15 @@ class Flow(flow.Flow): which will be resolved by using the *flows/tasks* provides and requires mappings or by following manually created dependency links. - From dependencies directed graph is build. If it has edge A -> B, this - means B depends on A. + From dependencies a `directed graph`_ is built. If it has edge ``A -> B``, + this means ``B`` depends on ``A`` (and that the execution of ``B`` must + wait until ``A`` has finished executing, on reverting this means that the + reverting of ``A`` must wait until ``B`` has finished reverting). - Note: Cyclic dependencies are not allowed. + Note: `cyclic`_ dependencies are not allowed. + + .. _directed graph: https://en.wikipedia.org/wiki/Directed_graph + .. _cyclic: https://en.wikipedia.org/wiki/Cycle_graph """ def __init__(self, name, retry=None): @@ -71,6 +76,12 @@ class Flow(flow.Flow): def link(self, u, v, decider=None): """Link existing node u as a runtime dependency of existing node v. + Note that if the addition of these edges creates a `cyclic`_ graph + then a :class:`~taskflow.exceptions.DependencyFailure` will be + raised and the provided changes will be discarded. If the nodes + that are being requested to link do not exist in this graph than a + :class:`ValueError` will be raised. + :param u: task or flow to create a link from (must exist already) :param v: task or flow to create a link to (must exist already) :param decider: A callback function that will be expected to decide @@ -82,6 +93,8 @@ class Flow(flow.Flow): links that have ``v`` as a target. It is expected to return a single boolean (``True`` to allow ``v`` execution or ``False`` to not). + + .. _cyclic: https://en.wikipedia.org/wiki/Cycle_graph """ if not self._graph.has_node(u): raise ValueError("Node '%s' not found to link from" % (u)) @@ -135,6 +148,11 @@ class Flow(flow.Flow): def add(self, *nodes, **kwargs): """Adds a given task/tasks/flow/flows to this flow. + Note that if the addition of these nodes (and any edges) creates + a `cyclic`_ graph then + a :class:`~taskflow.exceptions.DependencyFailure` will be + raised and the applied changes will be discarded. + :param nodes: node(s) to add to the flow :param kwargs: keyword arguments, the two keyword arguments currently processed are: @@ -144,13 +162,18 @@ class Flow(flow.Flow): symbol requirements will be matched to existing node(s) and links will be automatically made to those providers. If multiple possible providers exist - then a AmbiguousDependency exception will be raised. + then a + :class:`~taskflow.exceptions.AmbiguousDependency` + exception will be raised and the provided additions + will be discarded. * ``resolve_existing``, a boolean that when true (the default) implies that on addition of a new node that existing node(s) will have their requirements scanned for symbols that this newly added node can provide. If a match is found a link is automatically created from the newly added node to the requiree. + + .. _cyclic: https://en.wikipedia.org/wiki/Cycle_graph """ # Let's try to avoid doing any work if we can; since the below code diff --git a/taskflow/persistence/models.py b/taskflow/persistence/models.py index a1d04bbd..fd245d57 100644 --- a/taskflow/persistence/models.py +++ b/taskflow/persistence/models.py @@ -527,11 +527,6 @@ class AtomDetail(object): self.meta = {} self.version = None - @staticmethod - def _was_failure(state, result): - # Internal helper method... - return state == states.FAILURE and isinstance(result, ft.Failure) - @property def last_results(self): """Gets the atoms last result. diff --git a/taskflow/tests/unit/action_engine/test_runner.py b/taskflow/tests/unit/action_engine/test_builder.py similarity index 57% rename from taskflow/tests/unit/action_engine/test_runner.py rename to taskflow/tests/unit/action_engine/test_builder.py index bff74cb9..b4067449 100644 --- a/taskflow/tests/unit/action_engine/test_runner.py +++ b/taskflow/tests/unit/action_engine/test_builder.py @@ -15,11 +15,12 @@ # under the License. from automaton import exceptions as excp +from automaton import runners import six +from taskflow.engines.action_engine import builder from taskflow.engines.action_engine import compiler from taskflow.engines.action_engine import executor -from taskflow.engines.action_engine import runner from taskflow.engines.action_engine import runtime from taskflow.patterns import linear_flow as lf from taskflow import states as st @@ -30,7 +31,8 @@ from taskflow.types import notifier from taskflow.utils import persistence_utils as pu -class _RunnerTestMixin(object): +class BuildersTest(test.TestCase): + def _make_runtime(self, flow, initial_state=None): compilation = compiler.PatternCompiler(flow).compile() flow_detail = pu.create_flow_detail(flow) @@ -51,17 +53,11 @@ class _RunnerTestMixin(object): r.compile() return r - -class RunnerTest(test.TestCase, _RunnerTestMixin): - def test_running(self): - flow = lf.Flow("root") - flow.add(*test_utils.make_many(1)) - - rt = self._make_runtime(flow, initial_state=st.RUNNING) - self.assertTrue(rt.runner.runnable()) - - rt = self._make_runtime(flow, initial_state=st.SUSPENDED) - self.assertFalse(rt.runner.runnable()) + def _make_machine(self, flow, initial_state=None): + runtime = self._make_runtime(flow, initial_state=initial_state) + machine, memory = runtime.builder.build() + machine_runner = runners.FiniteRunner(machine) + return (runtime, machine, memory, machine_runner) def test_run_iterations(self): flow = lf.Flow("root") @@ -69,29 +65,32 @@ class RunnerTest(test.TestCase, _RunnerTestMixin): 1, task_cls=test_utils.TaskNoRequiresNoReturns) flow.add(*tasks) - rt = self._make_runtime(flow, initial_state=st.RUNNING) - self.assertTrue(rt.runner.runnable()) + runtime, machine, memory, machine_runner = self._make_machine( + flow, initial_state=st.RUNNING) - it = rt.runner.run_iter() - state, failures = six.next(it) - self.assertEqual(st.RESUMING, state) - self.assertEqual(0, len(failures)) + it = machine_runner.run_iter(builder.START) + prior_state, new_state = six.next(it) + self.assertEqual(st.RESUMING, new_state) + self.assertEqual(0, len(memory.failures)) - state, failures = six.next(it) - self.assertEqual(st.SCHEDULING, state) - self.assertEqual(0, len(failures)) + prior_state, new_state = six.next(it) + self.assertEqual(st.SCHEDULING, new_state) + self.assertEqual(0, len(memory.failures)) - state, failures = six.next(it) - self.assertEqual(st.WAITING, state) - self.assertEqual(0, len(failures)) + prior_state, new_state = six.next(it) + self.assertEqual(st.WAITING, new_state) + self.assertEqual(0, len(memory.failures)) - state, failures = six.next(it) - self.assertEqual(st.ANALYZING, state) - self.assertEqual(0, len(failures)) + prior_state, new_state = six.next(it) + self.assertEqual(st.ANALYZING, new_state) + self.assertEqual(0, len(memory.failures)) - state, failures = six.next(it) - self.assertEqual(st.SUCCESS, state) - self.assertEqual(0, len(failures)) + prior_state, new_state = six.next(it) + self.assertEqual(builder.GAME_OVER, new_state) + self.assertEqual(0, len(memory.failures)) + prior_state, new_state = six.next(it) + self.assertEqual(st.SUCCESS, new_state) + self.assertEqual(0, len(memory.failures)) self.assertRaises(StopIteration, six.next, it) @@ -101,15 +100,15 @@ class RunnerTest(test.TestCase, _RunnerTestMixin): 1, task_cls=test_utils.TaskWithFailure) flow.add(*tasks) - rt = self._make_runtime(flow, initial_state=st.RUNNING) - self.assertTrue(rt.runner.runnable()) + runtime, machine, memory, machine_runner = self._make_machine( + flow, initial_state=st.RUNNING) - transitions = list(rt.runner.run_iter()) - state, failures = transitions[-1] - self.assertEqual(st.REVERTED, state) - self.assertEqual([], failures) - - self.assertEqual(st.REVERTED, rt.storage.get_atom_state(tasks[0].name)) + transitions = list(machine_runner.run_iter(builder.START)) + prior_state, new_state = transitions[-1] + self.assertEqual(st.REVERTED, new_state) + self.assertEqual([], memory.failures) + self.assertEqual(st.REVERTED, + runtime.storage.get_atom_state(tasks[0].name)) def test_run_iterations_failure(self): flow = lf.Flow("root") @@ -117,18 +116,17 @@ class RunnerTest(test.TestCase, _RunnerTestMixin): 1, task_cls=test_utils.NastyFailingTask) flow.add(*tasks) - rt = self._make_runtime(flow, initial_state=st.RUNNING) - self.assertTrue(rt.runner.runnable()) + runtime, machine, memory, machine_runner = self._make_machine( + flow, initial_state=st.RUNNING) - transitions = list(rt.runner.run_iter()) - state, failures = transitions[-1] - self.assertEqual(st.FAILURE, state) - self.assertEqual(1, len(failures)) - failure = failures[0] + transitions = list(machine_runner.run_iter(builder.START)) + prior_state, new_state = transitions[-1] + self.assertEqual(st.FAILURE, new_state) + self.assertEqual(1, len(memory.failures)) + failure = memory.failures[0] self.assertTrue(failure.check(RuntimeError)) - self.assertEqual(st.REVERT_FAILURE, - rt.storage.get_atom_state(tasks[0].name)) + runtime.storage.get_atom_state(tasks[0].name)) def test_run_iterations_suspended(self): flow = lf.Flow("root") @@ -136,20 +134,22 @@ class RunnerTest(test.TestCase, _RunnerTestMixin): 2, task_cls=test_utils.TaskNoRequiresNoReturns) flow.add(*tasks) - rt = self._make_runtime(flow, initial_state=st.RUNNING) - self.assertTrue(rt.runner.runnable()) + runtime, machine, memory, machine_runner = self._make_machine( + flow, initial_state=st.RUNNING) transitions = [] - for state, failures in rt.runner.run_iter(): - transitions.append((state, failures)) - if state == st.ANALYZING: - rt.storage.set_flow_state(st.SUSPENDED) + for prior_state, new_state in machine_runner.run_iter(builder.START): + transitions.append((new_state, memory.failures)) + if new_state == st.ANALYZING: + runtime.storage.set_flow_state(st.SUSPENDED) state, failures = transitions[-1] self.assertEqual(st.SUSPENDED, state) self.assertEqual([], failures) - self.assertEqual(st.SUCCESS, rt.storage.get_atom_state(tasks[0].name)) - self.assertEqual(st.PENDING, rt.storage.get_atom_state(tasks[1].name)) + self.assertEqual(st.SUCCESS, + runtime.storage.get_atom_state(tasks[0].name)) + self.assertEqual(st.PENDING, + runtime.storage.get_atom_state(tasks[1].name)) def test_run_iterations_suspended_failure(self): flow = lf.Flow("root") @@ -160,46 +160,44 @@ class RunnerTest(test.TestCase, _RunnerTestMixin): 1, task_cls=test_utils.TaskNoRequiresNoReturns, offset=1) flow.add(*happy_tasks) - rt = self._make_runtime(flow, initial_state=st.RUNNING) - self.assertTrue(rt.runner.runnable()) + runtime, machine, memory, machine_runner = self._make_machine( + flow, initial_state=st.RUNNING) transitions = [] - for state, failures in rt.runner.run_iter(): - transitions.append((state, failures)) - if state == st.ANALYZING: - rt.storage.set_flow_state(st.SUSPENDED) + for prior_state, new_state in machine_runner.run_iter(builder.START): + transitions.append((new_state, memory.failures)) + if new_state == st.ANALYZING: + runtime.storage.set_flow_state(st.SUSPENDED) state, failures = transitions[-1] self.assertEqual(st.SUSPENDED, state) self.assertEqual([], failures) self.assertEqual(st.PENDING, - rt.storage.get_atom_state(happy_tasks[0].name)) + runtime.storage.get_atom_state(happy_tasks[0].name)) self.assertEqual(st.FAILURE, - rt.storage.get_atom_state(sad_tasks[0].name)) + runtime.storage.get_atom_state(sad_tasks[0].name)) - -class RunnerBuildTest(test.TestCase, _RunnerTestMixin): def test_builder_manual_process(self): flow = lf.Flow("root") tasks = test_utils.make_many( 1, task_cls=test_utils.TaskNoRequiresNoReturns) flow.add(*tasks) - rt = self._make_runtime(flow, initial_state=st.RUNNING) - machine, machine_runner, memory = rt.runner.build() - self.assertTrue(rt.runner.runnable()) + runtime, machine, memory, machine_runner = self._make_machine( + flow, initial_state=st.RUNNING) self.assertRaises(excp.NotInitialized, machine.process_event, 'poke') # Should now be pending... - self.assertEqual(st.PENDING, rt.storage.get_atom_state(tasks[0].name)) + self.assertEqual(st.PENDING, + runtime.storage.get_atom_state(tasks[0].name)) machine.initialize() - self.assertEqual(runner._UNDEFINED, machine.current_state) + self.assertEqual(builder.UNDEFINED, machine.current_state) self.assertFalse(machine.terminated) self.assertRaises(excp.NotFound, machine.process_event, 'poke') last_state = machine.current_state - reaction, terminal = machine.process_event('start') + reaction, terminal = machine.process_event(builder.START) self.assertFalse(terminal) self.assertIsNotNone(reaction) self.assertEqual(st.RESUMING, machine.current_state) @@ -208,7 +206,7 @@ class RunnerBuildTest(test.TestCase, _RunnerTestMixin): last_state = machine.current_state cb, args, kwargs = reaction next_event = cb(last_state, machine.current_state, - 'start', *args, **kwargs) + builder.START, *args, **kwargs) reaction, terminal = machine.process_event(next_event) self.assertFalse(terminal) self.assertIsNotNone(reaction) @@ -225,7 +223,8 @@ class RunnerBuildTest(test.TestCase, _RunnerTestMixin): self.assertRaises(excp.NotFound, machine.process_event, 'poke') # Should now be running... - self.assertEqual(st.RUNNING, rt.storage.get_atom_state(tasks[0].name)) + self.assertEqual(st.RUNNING, + runtime.storage.get_atom_state(tasks[0].name)) last_state = machine.current_state cb, args, kwargs = reaction @@ -243,10 +242,11 @@ class RunnerBuildTest(test.TestCase, _RunnerTestMixin): next_event, *args, **kwargs) reaction, terminal = machine.process_event(next_event) self.assertFalse(terminal) - self.assertEqual(runner._GAME_OVER, machine.current_state) + self.assertEqual(builder.GAME_OVER, machine.current_state) # Should now be done... - self.assertEqual(st.SUCCESS, rt.storage.get_atom_state(tasks[0].name)) + self.assertEqual(st.SUCCESS, + runtime.storage.get_atom_state(tasks[0].name)) def test_builder_automatic_process(self): flow = lf.Flow("root") @@ -254,26 +254,25 @@ class RunnerBuildTest(test.TestCase, _RunnerTestMixin): 1, task_cls=test_utils.TaskNoRequiresNoReturns) flow.add(*tasks) - rt = self._make_runtime(flow, initial_state=st.RUNNING) - machine, machine_runner, memory = rt.runner.build() - self.assertTrue(rt.runner.runnable()) + runtime, machine, memory, machine_runner = self._make_machine( + flow, initial_state=st.RUNNING) - transitions = list(machine_runner.run_iter('start')) - self.assertEqual((runner._UNDEFINED, st.RESUMING), transitions[0]) - self.assertEqual((runner._GAME_OVER, st.SUCCESS), transitions[-1]) - self.assertEqual(st.SUCCESS, rt.storage.get_atom_state(tasks[0].name)) + transitions = list(machine_runner.run_iter(builder.START)) + self.assertEqual((builder.UNDEFINED, st.RESUMING), transitions[0]) + self.assertEqual((builder.GAME_OVER, st.SUCCESS), transitions[-1]) + self.assertEqual(st.SUCCESS, + runtime.storage.get_atom_state(tasks[0].name)) def test_builder_automatic_process_failure(self): flow = lf.Flow("root") tasks = test_utils.make_many(1, task_cls=test_utils.NastyFailingTask) flow.add(*tasks) - rt = self._make_runtime(flow, initial_state=st.RUNNING) - machine, machine_runner, memory = rt.runner.build() - self.assertTrue(rt.runner.runnable()) + runtime, machine, memory, machine_runner = self._make_machine( + flow, initial_state=st.RUNNING) - transitions = list(machine_runner.run_iter('start')) - self.assertEqual((runner._GAME_OVER, st.FAILURE), transitions[-1]) + transitions = list(machine_runner.run_iter(builder.START)) + self.assertEqual((builder.GAME_OVER, st.FAILURE), transitions[-1]) self.assertEqual(1, len(memory.failures)) def test_builder_automatic_process_reverted(self): @@ -281,13 +280,13 @@ class RunnerBuildTest(test.TestCase, _RunnerTestMixin): tasks = test_utils.make_many(1, task_cls=test_utils.TaskWithFailure) flow.add(*tasks) - rt = self._make_runtime(flow, initial_state=st.RUNNING) - machine, machine_runner, memory = rt.runner.build() - self.assertTrue(rt.runner.runnable()) + runtime, machine, memory, machine_runner = self._make_machine( + flow, initial_state=st.RUNNING) - transitions = list(machine_runner.run_iter('start')) - self.assertEqual((runner._GAME_OVER, st.REVERTED), transitions[-1]) - self.assertEqual(st.REVERTED, rt.storage.get_atom_state(tasks[0].name)) + transitions = list(machine_runner.run_iter(builder.START)) + self.assertEqual((builder.GAME_OVER, st.REVERTED), transitions[-1]) + self.assertEqual(st.REVERTED, + runtime.storage.get_atom_state(tasks[0].name)) def test_builder_expected_transition_occurrences(self): flow = lf.Flow("root") @@ -295,16 +294,16 @@ class RunnerBuildTest(test.TestCase, _RunnerTestMixin): 10, task_cls=test_utils.TaskNoRequiresNoReturns) flow.add(*tasks) - rt = self._make_runtime(flow, initial_state=st.RUNNING) - machine, machine_runner, memory = rt.runner.build() - transitions = list(machine_runner.run_iter('start')) + runtime, machine, memory, machine_runner = self._make_machine( + flow, initial_state=st.RUNNING) + transitions = list(machine_runner.run_iter(builder.START)) occurrences = dict((t, transitions.count(t)) for t in transitions) self.assertEqual(10, occurrences.get((st.SCHEDULING, st.WAITING))) self.assertEqual(10, occurrences.get((st.WAITING, st.ANALYZING))) self.assertEqual(9, occurrences.get((st.ANALYZING, st.SCHEDULING))) - self.assertEqual(1, occurrences.get((runner._GAME_OVER, st.SUCCESS))) - self.assertEqual(1, occurrences.get((runner._UNDEFINED, st.RESUMING))) + self.assertEqual(1, occurrences.get((builder.GAME_OVER, st.SUCCESS))) + self.assertEqual(1, occurrences.get((builder.UNDEFINED, st.RESUMING))) self.assertEqual(0, len(memory.next_nodes)) self.assertEqual(0, len(memory.not_done)) diff --git a/taskflow/tests/unit/test_types.py b/taskflow/tests/unit/test_types.py index 1d3f5410..79044923 100644 --- a/taskflow/tests/unit/test_types.py +++ b/taskflow/tests/unit/test_types.py @@ -140,6 +140,245 @@ class TreeTest(test.TestCase): p.add(tree.Node("human")) return a + def test_pformat_species(self): + root = self._make_species() + expected = """ +animal +|__mammal +| |__horse +| |__primate +| |__monkey +| |__human +|__reptile +""" + self.assertEqual(expected.strip(), root.pformat()) + + def test_pformat_flat(self): + root = tree.Node("josh") + root.add(tree.Node("josh.1")) + expected = """ +josh +|__josh.1 +""" + self.assertEqual(expected.strip(), root.pformat()) + + root[0].add(tree.Node("josh.1.1")) + expected = """ +josh +|__josh.1 + |__josh.1.1 +""" + self.assertEqual(expected.strip(), root.pformat()) + + root[0][0].add(tree.Node("josh.1.1.1")) + expected = """ +josh +|__josh.1 + |__josh.1.1 + |__josh.1.1.1 +""" + self.assertEqual(expected.strip(), root.pformat()) + + root[0][0][0].add(tree.Node("josh.1.1.1.1")) + expected = """ +josh +|__josh.1 + |__josh.1.1 + |__josh.1.1.1 + |__josh.1.1.1.1 +""" + self.assertEqual(expected.strip(), root.pformat()) + + def test_pformat_partial_species(self): + root = self._make_species() + + expected = """ +reptile +""" + self.assertEqual(expected.strip(), root[1].pformat()) + + expected = """ +mammal +|__horse +|__primate + |__monkey + |__human +""" + self.assertEqual(expected.strip(), root[0].pformat()) + + expected = """ +primate +|__monkey +|__human +""" + self.assertEqual(expected.strip(), root[0][1].pformat()) + + expected = """ +monkey +""" + self.assertEqual(expected.strip(), root[0][1][0].pformat()) + + def test_pformat(self): + + root = tree.Node("CEO") + + expected = """ +CEO +""" + + self.assertEqual(expected.strip(), root.pformat()) + + root.add(tree.Node("Infra")) + + expected = """ +CEO +|__Infra +""" + self.assertEqual(expected.strip(), root.pformat()) + + root[0].add(tree.Node("Infra.1")) + expected = """ +CEO +|__Infra + |__Infra.1 +""" + self.assertEqual(expected.strip(), root.pformat()) + + root.add(tree.Node("Mail")) + expected = """ +CEO +|__Infra +| |__Infra.1 +|__Mail +""" + self.assertEqual(expected.strip(), root.pformat()) + + root.add(tree.Node("Search")) + expected = """ +CEO +|__Infra +| |__Infra.1 +|__Mail +|__Search +""" + self.assertEqual(expected.strip(), root.pformat()) + + root[-1].add(tree.Node("Search.1")) + expected = """ +CEO +|__Infra +| |__Infra.1 +|__Mail +|__Search + |__Search.1 +""" + self.assertEqual(expected.strip(), root.pformat()) + + root[-1].add(tree.Node("Search.2")) + expected = """ +CEO +|__Infra +| |__Infra.1 +|__Mail +|__Search + |__Search.1 + |__Search.2 +""" + self.assertEqual(expected.strip(), root.pformat()) + + root[0].add(tree.Node("Infra.2")) + expected = """ +CEO +|__Infra +| |__Infra.1 +| |__Infra.2 +|__Mail +|__Search + |__Search.1 + |__Search.2 +""" + self.assertEqual(expected.strip(), root.pformat()) + + root[0].add(tree.Node("Infra.3")) + expected = """ +CEO +|__Infra +| |__Infra.1 +| |__Infra.2 +| |__Infra.3 +|__Mail +|__Search + |__Search.1 + |__Search.2 +""" + self.assertEqual(expected.strip(), root.pformat()) + + root[0][-1].add(tree.Node("Infra.3.1")) + expected = """ +CEO +|__Infra +| |__Infra.1 +| |__Infra.2 +| |__Infra.3 +| |__Infra.3.1 +|__Mail +|__Search + |__Search.1 + |__Search.2 +""" + self.assertEqual(expected.strip(), root.pformat()) + + root[-1][0].add(tree.Node("Search.1.1")) + expected = """ +CEO +|__Infra +| |__Infra.1 +| |__Infra.2 +| |__Infra.3 +| |__Infra.3.1 +|__Mail +|__Search + |__Search.1 + | |__Search.1.1 + |__Search.2 +""" + self.assertEqual(expected.strip(), root.pformat()) + + root[1].add(tree.Node("Mail.1")) + expected = """ +CEO +|__Infra +| |__Infra.1 +| |__Infra.2 +| |__Infra.3 +| |__Infra.3.1 +|__Mail +| |__Mail.1 +|__Search + |__Search.1 + | |__Search.1.1 + |__Search.2 +""" + self.assertEqual(expected.strip(), root.pformat()) + + root[1][0].add(tree.Node("Mail.1.1")) + expected = """ +CEO +|__Infra +| |__Infra.1 +| |__Infra.2 +| |__Infra.3 +| |__Infra.3.1 +|__Mail +| |__Mail.1 +| |__Mail.1.1 +|__Search + |__Search.1 + | |__Search.1.1 + |__Search.2 +""" + self.assertEqual(expected.strip(), root.pformat()) + def test_path(self): root = self._make_species() human = root.find("human") diff --git a/taskflow/tests/unit/test_utils_async_utils.py b/taskflow/tests/unit/test_utils_async_utils.py index 1f8b0119..bd8b9a6b 100644 --- a/taskflow/tests/unit/test_utils_async_utils.py +++ b/taskflow/tests/unit/test_utils_async_utils.py @@ -14,56 +14,8 @@ # License for the specific language governing permissions and limitations # under the License. -import futurist -import testtools - from taskflow import test from taskflow.utils import async_utils as au -from taskflow.utils import eventlet_utils as eu - - -class WaitForAnyTestsMixin(object): - timeout = 0.001 - - def test_waits_and_finishes(self): - def foo(): - pass - - with self._make_executor(2) as e: - fs = [e.submit(foo), e.submit(foo)] - # this test assumes that our foo will end within 10 seconds - done, not_done = au.wait_for_any(fs, 10) - self.assertIn(len(done), (1, 2)) - self.assertTrue(any(f in done for f in fs)) - - def test_not_done_futures(self): - fs = [futurist.Future(), futurist.Future()] - done, not_done = au.wait_for_any(fs, self.timeout) - self.assertEqual(len(done), 0) - self.assertEqual(len(not_done), 2) - - def test_mixed_futures(self): - f1 = futurist.Future() - f2 = futurist.Future() - f2.set_result(1) - done, not_done = au.wait_for_any([f1, f2], self.timeout) - self.assertEqual(len(done), 1) - self.assertEqual(len(not_done), 1) - self.assertIs(not_done.pop(), f1) - self.assertIs(done.pop(), f2) - - -@testtools.skipIf(not eu.EVENTLET_AVAILABLE, 'eventlet is not available') -class AsyncUtilsEventletTest(test.TestCase, - WaitForAnyTestsMixin): - def _make_executor(self, max_workers): - return futurist.GreenThreadPoolExecutor(max_workers=max_workers) - - -class AsyncUtilsThreadedTest(test.TestCase, - WaitForAnyTestsMixin): - def _make_executor(self, max_workers): - return futurist.ThreadPoolExecutor(max_workers=max_workers) class MakeCompletedFutureTest(test.TestCase): @@ -73,9 +25,3 @@ class MakeCompletedFutureTest(test.TestCase): future = au.make_completed_future(result) self.assertTrue(future.done()) self.assertIs(future.result(), result) - - -class AsyncUtilsSynchronousTest(test.TestCase, - WaitForAnyTestsMixin): - def _make_executor(self, max_workers): - return futurist.SynchronousExecutor() diff --git a/taskflow/tests/unit/test_utils_iter_utils.py b/taskflow/tests/unit/test_utils_iter_utils.py new file mode 100644 index 00000000..82d470f3 --- /dev/null +++ b/taskflow/tests/unit/test_utils_iter_utils.py @@ -0,0 +1,62 @@ +# -*- coding: utf-8 -*- + +# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import string + +from six.moves import range as compat_range + +from taskflow import test +from taskflow.utils import iter_utils + + +def forever_it(): + i = 0 + while True: + yield i + i += 1 + + +class IterUtilsTest(test.TestCase): + def test_find_first_match(self): + it = forever_it() + self.assertEqual(100, iter_utils.find_first_match(it, + lambda v: v == 100)) + + def test_find_first_match_not_found(self): + it = iter(string.ascii_lowercase) + self.assertIsNone(iter_utils.find_first_match(it, + lambda v: v == '')) + + def test_count(self): + self.assertEqual(0, iter_utils.count([])) + self.assertEqual(1, iter_utils.count(['a'])) + self.assertEqual(10, iter_utils.count(compat_range(0, 10))) + self.assertEqual(1000, iter_utils.count(compat_range(0, 1000))) + self.assertEqual(0, iter_utils.count(compat_range(0))) + self.assertEqual(0, iter_utils.count(compat_range(-1))) + + def test_while_is_not(self): + it = iter(string.ascii_lowercase) + self.assertEqual(['a'], + list(iter_utils.while_is_not(it, 'a'))) + it = iter(string.ascii_lowercase) + self.assertEqual(['a', 'b'], + list(iter_utils.while_is_not(it, 'b'))) + self.assertEqual(list(string.ascii_lowercase[2:]), + list(iter_utils.while_is_not(it, 'zzz'))) + it = iter(string.ascii_lowercase) + self.assertEqual(list(string.ascii_lowercase), + list(iter_utils.while_is_not(it, ''))) diff --git a/taskflow/tests/unit/worker_based/test_pipeline.py b/taskflow/tests/unit/worker_based/test_pipeline.py index a2075763..723bf6e4 100644 --- a/taskflow/tests/unit/worker_based/test_pipeline.py +++ b/taskflow/tests/unit/worker_based/test_pipeline.py @@ -15,6 +15,7 @@ # under the License. import futurist +from futurist import waiters from oslo_utils import uuidutils from taskflow.engines.action_engine import executor as base_executor @@ -78,7 +79,7 @@ class TestPipeline(test.TestCase): progress_callback = lambda *args, **kwargs: None f = executor.execute_task(t, uuidutils.generate_uuid(), {}, progress_callback=progress_callback) - async_utils.wait_for_any([f]) + waiters.wait_for_any([f]) event, result = f.result() self.assertEqual(1, result) @@ -94,7 +95,7 @@ class TestPipeline(test.TestCase): progress_callback = lambda *args, **kwargs: None f = executor.execute_task(t, uuidutils.generate_uuid(), {}, progress_callback=progress_callback) - async_utils.wait_for_any([f]) + waiters.wait_for_any([f]) action, result = f.result() self.assertIsInstance(result, failure.Failure) diff --git a/taskflow/tests/utils.py b/taskflow/tests/utils.py index 266a0e8c..f4654676 100644 --- a/taskflow/tests/utils.py +++ b/taskflow/tests/utils.py @@ -123,7 +123,11 @@ class GiveBackRevert(task.Task): return value + 1 def revert(self, *args, **kwargs): - return kwargs.get('result') + 1 + result = kwargs.get('result') + # If this somehow fails, timeout, or other don't send back a + # valid result... + if isinstance(result, six.integer_types): + return result + 1 class FakeTask(object): diff --git a/taskflow/types/tree.py b/taskflow/types/tree.py index 4faa8291..94c009e1 100644 --- a/taskflow/types/tree.py +++ b/taskflow/types/tree.py @@ -22,6 +22,7 @@ import os import six +from taskflow.utils import iter_utils from taskflow.utils import misc @@ -77,11 +78,25 @@ class _BFSIter(object): class Node(object): """A n-ary node class that can be used to create tree structures.""" - # Constants used when pretty formatting the node (and its children). + #: Default string prefix used in :py:meth:`.pformat`. STARTING_PREFIX = "" + + #: Default string used to create empty space used in :py:meth:`.pformat`. EMPTY_SPACE_SEP = " " + HORIZONTAL_CONN = "__" + """ + Default string used to horizontally connect a node to its + parent (used in :py:meth:`.pformat`.). + """ + VERTICAL_CONN = "|" + """ + Default string used to vertically connect a node to its + parent (used in :py:meth:`.pformat`). + """ + + #: Default line separator used in :py:meth:`.pformat`. LINE_SEP = os.linesep def __init__(self, item, **kwargs): @@ -124,18 +139,22 @@ class Node(object): yield node node = node.parent - def find(self, item, only_direct=False, include_self=True): - """Returns the node for an item if it exists in this node. + def find_first_match(self, matcher, only_direct=False, include_self=True): + """Finds the *first* node that matching callback returns true. - This will search not only this node but also any children nodes and - finally if nothing is found then None is returned instead of a node - object. + This will search not only this node but also any children nodes (in + depth first order, from right to left) and finally if nothing is + matched then ``None`` is returned instead of a node object. - :param item: item to lookup. - :param only_direct: only look at current node and its direct children. + :param matcher: callback that takes one positional argument (a node) + and returns true if it matches desired node or false + if not. + :param only_direct: only look at current node and its + direct children (implies that this does not + search using depth first). :param include_self: include the current node during searching. - :returns: the node for an item if it exists in this node + :returns: the node that matched (or ``None``) """ if only_direct: if include_self: @@ -144,10 +163,26 @@ class Node(object): it = self.reverse_iter() else: it = self.dfs_iter(include_self=include_self) - for n in it: - if n.item == item: - return n - return None + return iter_utils.find_first_match(it, matcher) + + def find(self, item, only_direct=False, include_self=True): + """Returns the *first* node for an item if it exists in this node. + + This will search not only this node but also any children nodes (in + depth first order, from right to left) and finally if nothing is + matched then ``None`` is returned instead of a node object. + + :param item: item to look for. + :param only_direct: only look at current node and its + direct children (implies that this does not + search using depth first). + :param include_self: include the current node during searching. + + :returns: the node that matched provided item (or ``None``) + """ + return self.find_first_match(lambda n: n.item == item, + only_direct=only_direct, + include_self=include_self) def disassociate(self): """Removes this node from its parent (if any). @@ -176,7 +211,9 @@ class Node(object): the normally returned *removed* node object. :param item: item to lookup. - :param only_direct: only look at current node and its direct children. + :param only_direct: only look at current node and its + direct children (implies that this does not + search using depth first). :param include_self: include the current node during searching. """ node = self.find(item, only_direct=only_direct, @@ -200,8 +237,11 @@ class Node(object): # NOTE(harlowja): 0 is the right most index, len - 1 is the left most return self._children[index] - def pformat(self, stringify_node=None): - """Recursively formats a node into a nice string representation. + def pformat(self, stringify_node=None, + linesep=LINE_SEP, vertical_conn=VERTICAL_CONN, + horizontal_conn=HORIZONTAL_CONN, empty_space=EMPTY_SPACE_SEP, + starting_prefix=STARTING_PREFIX): + """Formats this node + children into a nice string representation. **Example**:: @@ -220,33 +260,73 @@ class Node(object): |__Mobile |__Mail """ - def _inner_pformat(node, level, stringify_node): - if level == 0: - yield stringify_node(node) - prefix = self.STARTING_PREFIX - else: - yield self.HORIZONTAL_CONN + stringify_node(node) - prefix = self.EMPTY_SPACE_SEP * len(self.HORIZONTAL_CONN) - child_count = node.child_count() - for (i, child) in enumerate(node): - for (j, text) in enumerate(_inner_pformat(child, - level + 1, - stringify_node)): - if j == 0 or i + 1 < child_count: - text = prefix + self.VERTICAL_CONN + text - else: - text = prefix + self.EMPTY_SPACE_SEP + text - yield text if stringify_node is None: # Default to making a unicode string out of the nodes item... stringify_node = lambda node: six.text_type(node.item) - expected_lines = self.child_count(only_direct=False) - accumulator = six.StringIO() - for i, line in enumerate(_inner_pformat(self, 0, stringify_node)): - accumulator.write(line) - if i < expected_lines: - accumulator.write(self.LINE_SEP) - return accumulator.getvalue() + expected_lines = self.child_count(only_direct=False) + 1 + buff = six.StringIO() + conn = vertical_conn + horizontal_conn + stop_at_parent = self + for i, node in enumerate(self.dfs_iter(include_self=True), 1): + prefix = [] + connected_to_parent = False + last_node = node + # Walk through *most* of this nodes parents, and form the expected + # prefix that each parent should require, repeat this until we + # hit the root node (self) and use that as our nodes prefix + # string... + parent_node_it = iter_utils.while_is_not( + node.path_iter(include_self=True), stop_at_parent) + for j, parent_node in enumerate(parent_node_it): + if parent_node is stop_at_parent: + if j > 0: + if not connected_to_parent: + prefix.append(conn) + connected_to_parent = True + else: + # If the node was connected already then it must + # have had more than one parent, so we want to put + # the right final starting prefix on (which may be + # a empty space or another vertical connector)... + last_node = self._children[-1] + m = last_node.find_first_match(lambda n: n is node, + include_self=False, + only_direct=False) + if m is not None: + prefix.append(empty_space) + else: + prefix.append(vertical_conn) + elif parent_node is node: + # Skip ourself... (we only include ourself so that + # we can use the 'j' variable to determine if the only + # node requested is ourself in the first place); used + # in the first conditional here... + pass + else: + if not connected_to_parent: + prefix.append(conn) + spaces = len(horizontal_conn) + connected_to_parent = True + else: + # If we have already been connected to our parent + # then determine if this current node is the last + # node of its parent (and in that case just put + # on more spaces), otherwise put a vertical connector + # on and less spaces... + if parent_node[-1] is not last_node: + prefix.append(vertical_conn) + spaces = len(horizontal_conn) + else: + spaces = len(conn) + prefix.append(empty_space * spaces) + last_node = parent_node + prefix.append(starting_prefix) + for prefix_piece in reversed(prefix): + buff.write(prefix_piece) + buff.write(stringify_node(node)) + if i != expected_lines: + buff.write(linesep) + return buff.getvalue() def child_count(self, only_direct=True): """Returns how many children this node has. @@ -257,10 +337,7 @@ class Node(object): NOTE(harlowja): it does not account for the current node in this count. """ if not only_direct: - count = 0 - for _node in self.dfs_iter(): - count += 1 - return count + return iter_utils.count(self.dfs_iter()) return len(self._children) def __iter__(self): diff --git a/taskflow/utils/async_utils.py b/taskflow/utils/async_utils.py index c4b114b8..cc24d215 100644 --- a/taskflow/utils/async_utils.py +++ b/taskflow/utils/async_utils.py @@ -14,20 +14,7 @@ # License for the specific language governing permissions and limitations # under the License. -from concurrent import futures as _futures -from concurrent.futures import _base import futurist -from oslo_utils import importutils - -greenthreading = importutils.try_import('eventlet.green.threading') - -from taskflow.utils import eventlet_utils as eu - - -_DONE_STATES = frozenset([ - _base.CANCELLED_AND_NOTIFIED, - _base.FINISHED, -]) def make_completed_future(result): @@ -35,78 +22,3 @@ def make_completed_future(result): future = futurist.Future() future.set_result(result) return future - - -def wait_for_any(fs, timeout=None): - """Wait for one of the futures to complete. - - Works correctly with both green and non-green futures (but not both - together, since this can't be guaranteed to avoid dead-lock due to how - the waiting implementations are different when green threads are being - used). - - Returns pair (done futures, not done futures). - """ - # TODO(harlowja): remove this when - # https://review.openstack.org/#/c/196269/ is merged and is made - # available. - green_fs = sum(1 for f in fs if isinstance(f, futurist.GreenFuture)) - if not green_fs: - return _futures.wait(fs, - timeout=timeout, - return_when=_futures.FIRST_COMPLETED) - else: - non_green_fs = len(fs) - green_fs - if non_green_fs: - raise RuntimeError("Can not wait on %s green futures and %s" - " non-green futures in the same `wait_for_any`" - " call" % (green_fs, non_green_fs)) - else: - return _wait_for_any_green(fs, timeout=timeout) - - -class _GreenWaiter(object): - """Provides the event that wait_for_any() blocks on.""" - def __init__(self): - self.event = greenthreading.Event() - - def add_result(self, future): - self.event.set() - - def add_exception(self, future): - self.event.set() - - def add_cancelled(self, future): - self.event.set() - - -def _partition_futures(fs): - done = set() - not_done = set() - for f in fs: - if f._state in _DONE_STATES: - done.add(f) - else: - not_done.add(f) - return done, not_done - - -def _wait_for_any_green(fs, timeout=None): - eu.check_for_eventlet(RuntimeError('Eventlet is needed to wait on' - ' green futures')) - - with _base._AcquireFutures(fs): - done, not_done = _partition_futures(fs) - if done: - return _base.DoneAndNotDoneFutures(done, not_done) - waiter = _GreenWaiter() - for f in fs: - f._waiters.append(waiter) - - waiter.event.wait(timeout) - for f in fs: - f._waiters.remove(waiter) - - with _base._AcquireFutures(fs): - done, not_done = _partition_futures(fs) - return _base.DoneAndNotDoneFutures(done, not_done) diff --git a/taskflow/utils/iter_utils.py b/taskflow/utils/iter_utils.py new file mode 100644 index 00000000..68810e8c --- /dev/null +++ b/taskflow/utils/iter_utils.py @@ -0,0 +1,42 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2015 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +def count(it): + """Returns how many values in the iterator (depletes the iterator).""" + return sum(1 for _value in it) + + +def find_first_match(it, matcher, not_found_value=None): + """Searches iterator for first value that matcher callback returns true.""" + for value in it: + if matcher(value): + return value + return not_found_value + + +def while_is_not(it, stop_value): + """Yields given values from iterator until stop value is passed. + + This uses the ``is`` operator to determine equivalency (and not the + ``==`` operator). + """ + for value in it: + yield value + if value is stop_value: + break diff --git a/test-requirements.txt b/test-requirements.txt index 5a06c57d..e9bca9bc 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -3,7 +3,7 @@ # process, which may cause wedges in the gate later. hacking<0.11,>=0.10.0 -oslotest>=1.7.0 # Apache-2.0 +oslotest>=1.10.0 # Apache-2.0 mock>=1.2 testtools>=1.4.0 testscenarios>=0.4 @@ -22,9 +22,9 @@ kazoo>=2.2 redis>=2.10.0 # Used for testing database persistence backends. -SQLAlchemy<1.1.0,>=0.9.7 -alembic>=0.7.2 -psycopg2 +SQLAlchemy<1.1.0,>=0.9.9 +alembic>=0.8.0 +psycopg2>=2.5 PyMySQL>=0.6.2 # MIT License # Used for making sure we still work with eventlet. diff --git a/tools/state_graph.py b/tools/state_graph.py index c9bdd0b1..5530a469 100755 --- a/tools/state_graph.py +++ b/tools/state_graph.py @@ -31,12 +31,12 @@ import pydot from automaton import machines -from taskflow.engines.action_engine import runner +from taskflow.engines.action_engine import builder from taskflow.engines.worker_based import protocol from taskflow import states -# This is just needed to get at the runner builder object (we will not +# This is just needed to get at the machine object (we will not # actually be running it...). class DummyRuntime(object): def __init__(self): @@ -134,9 +134,9 @@ def main(): list(states._ALLOWED_RETRY_TRANSITIONS)) elif options.engines: source_type = "Engines" - r = runner.Runner(DummyRuntime(), mock.MagicMock()) - source, memory = r.build() - internal_states.extend(runner._META_STATES) + b = builder.MachineBuilder(DummyRuntime(), mock.MagicMock()) + source, memory = b.build() + internal_states.extend(builder.META_STATES) ordering = 'out' elif options.wbe_requests: source_type = "WBE requests"