diff --git a/doc/source/engines.rst b/doc/source/engines.rst index e2908174d..04a4bb86b 100644 --- a/doc/source/engines.rst +++ b/doc/source/engines.rst @@ -258,9 +258,10 @@ Execution The graph (and helper objects) previously created are now used for guiding further execution (see :py:func:`~taskflow.engines.base.Engine.run`). The flow is put into the ``RUNNING`` :doc:`state ` and a -:py:class:`~taskflow.engines.action_engine.runner.Runner` implementation -object starts to take over and begins going through the stages listed -below (for a more visual diagram/representation see +:py:class:`~taskflow.engines.action_engine.builder.MachineBuilder` state +machine object and runner object are built (using the `automaton`_ library). +That machine and associated runner then starts to take over and begins going +through the stages listed below (for a more visual diagram/representation see the :ref:`engine state diagram `). .. note:: @@ -338,8 +339,8 @@ above stages will be restarted and resuming will occur). Finishing --------- -At this point the -:py:class:`~taskflow.engines.action_engine.runner.Runner` has +At this point the machine (and runner) that was built using the +:py:class:`~taskflow.engines.action_engine.builder.MachineBuilder` class has now finished successfully, failed, or the execution was suspended. Depending on which one of these occurs will cause the flow to enter a new state (typically one of ``FAILURE``, ``SUSPENDED``, ``SUCCESS`` or ``REVERTED``). @@ -365,9 +366,9 @@ this performs is a transition of the flow state from ``RUNNING`` into a ``SUSPENDING`` state (which will later transition into a ``SUSPENDED`` state). Since an engine may be remotely executing atoms (or locally executing them) and there is currently no preemption what occurs is that the engines -:py:class:`~taskflow.engines.action_engine.runner.Runner` state machine will -detect this transition into ``SUSPENDING`` has occurred and the state -machine will avoid scheduling new work (it will though let active work +:py:class:`~taskflow.engines.action_engine.builder.MachineBuilder` state +machine will detect this transition into ``SUSPENDING`` has occurred and the +state machine will avoid scheduling new work (it will though let active work continue). After the current work has finished the engine will transition from ``SUSPENDING`` into ``SUSPENDED`` and return from its :py:func:`~taskflow.engines.base.Engine.run` method. @@ -444,10 +445,10 @@ Components cycle). .. automodule:: taskflow.engines.action_engine.analyzer +.. automodule:: taskflow.engines.action_engine.builder .. automodule:: taskflow.engines.action_engine.compiler .. automodule:: taskflow.engines.action_engine.completer .. automodule:: taskflow.engines.action_engine.executor -.. automodule:: taskflow.engines.action_engine.runner .. automodule:: taskflow.engines.action_engine.runtime .. automodule:: taskflow.engines.action_engine.scheduler .. autoclass:: taskflow.engines.action_engine.scopes.ScopeWalker @@ -462,6 +463,7 @@ Hierarchy taskflow.engines.worker_based.engine.WorkerBasedActionEngine :parts: 1 +.. _automaton: http://docs.openstack.org/developer/automaton/ .. _multiprocessing: https://docs.python.org/2/library/multiprocessing.html .. _future: https://docs.python.org/dev/library/concurrent.futures.html#future-objects .. _executor: https://docs.python.org/dev/library/concurrent.futures.html#concurrent.futures.Executor diff --git a/taskflow/engines/action_engine/runner.py b/taskflow/engines/action_engine/builder.py similarity index 66% rename from taskflow/engines/action_engine/runner.py rename to taskflow/engines/action_engine/builder.py index f02f3f09b..f46d4a1f3 100644 --- a/taskflow/engines/action_engine/runner.py +++ b/taskflow/engines/action_engine/builder.py @@ -16,35 +16,34 @@ from automaton import machines -from automaton import runners from taskflow import logging from taskflow import states as st from taskflow.types import failure -# Waiting state timeout (in seconds). -_WAITING_TIMEOUT = 60 +# Default waiting state timeout (in seconds). +WAITING_TIMEOUT = 60 # Meta states the state machine uses. -_UNDEFINED = 'UNDEFINED' -_GAME_OVER = 'GAME_OVER' -_META_STATES = (_GAME_OVER, _UNDEFINED) +UNDEFINED = 'UNDEFINED' +GAME_OVER = 'GAME_OVER' +META_STATES = (GAME_OVER, UNDEFINED) # Event name constants the state machine uses. -_SCHEDULE = 'schedule_next' -_WAIT = 'wait_finished' -_ANALYZE = 'examine_finished' -_FINISH = 'completed' -_FAILED = 'failed' -_SUSPENDED = 'suspended' -_SUCCESS = 'success' -_REVERTED = 'reverted' -_START = 'start' +SCHEDULE = 'schedule_next' +WAIT = 'wait_finished' +ANALYZE = 'examine_finished' +FINISH = 'completed' +FAILED = 'failed' +SUSPENDED = 'suspended' +SUCCESS = 'success' +REVERTED = 'reverted' +START = 'start' LOG = logging.getLogger(__name__) -class _MachineMemory(object): +class MachineMemory(object): """State machine memory.""" def __init__(self): @@ -54,31 +53,31 @@ class _MachineMemory(object): self.done = set() -class Runner(object): - """State machine *builder* + *runner* that powers the engine components. +class MachineBuilder(object): + """State machine *builder* that powers the engine components. NOTE(harlowja): the machine (states and events that will trigger transitions) that this builds is represented by the following table:: +--------------+------------------+------------+----------+---------+ - Start | Event | End | On Enter | On Exit + | Start | Event | End | On Enter | On Exit | +--------------+------------------+------------+----------+---------+ - ANALYZING | completed | GAME_OVER | | - ANALYZING | schedule_next | SCHEDULING | | - ANALYZING | wait_finished | WAITING | | - FAILURE[$] | | | | - GAME_OVER | failed | FAILURE | | - GAME_OVER | reverted | REVERTED | | - GAME_OVER | success | SUCCESS | | - GAME_OVER | suspended | SUSPENDED | | - RESUMING | schedule_next | SCHEDULING | | - REVERTED[$] | | | | - SCHEDULING | wait_finished | WAITING | | - SUCCESS[$] | | | | - SUSPENDED[$] | | | | - UNDEFINED[^] | start | RESUMING | | - WAITING | examine_finished | ANALYZING | | + | ANALYZING | completed | GAME_OVER | . | . | + | ANALYZING | schedule_next | SCHEDULING | . | . | + | ANALYZING | wait_finished | WAITING | . | . | + | FAILURE[$] | . | . | . | . | + | GAME_OVER | failed | FAILURE | . | . | + | GAME_OVER | reverted | REVERTED | . | . | + | GAME_OVER | success | SUCCESS | . | . | + | GAME_OVER | suspended | SUSPENDED | . | . | + | RESUMING | schedule_next | SCHEDULING | . | . | + | REVERTED[$] | . | . | . | . | + | SCHEDULING | wait_finished | WAITING | . | . | + | SUCCESS[$] | . | . | . | . | + | SUSPENDED[$] | . | . | . | . | + | UNDEFINED[^] | start | RESUMING | . | . | + | WAITING | examine_finished | ANALYZING | . | . | +--------------+------------------+------------+----------+---------+ Between any of these yielded states (minus ``GAME_OVER`` and ``UNDEFINED``) @@ -91,11 +90,6 @@ class Runner(object): tasks in parallel, this enables parallel running and/or reversion. """ - # Informational states this action yields while running, not useful to - # have the engine record but useful to provide to end-users when doing - # execution iterations. - ignorable_states = (st.SCHEDULING, st.WAITING, st.RESUMING, st.ANALYZING) - def __init__(self, runtime, waiter): self._runtime = runtime self._analyzer = runtime.analyzer @@ -104,21 +98,21 @@ class Runner(object): self._storage = runtime.storage self._waiter = waiter - def runnable(self): - """Checks if the storage says the flow is still runnable/running.""" - return self._storage.get_flow_state() == st.RUNNING - def build(self, timeout=None): - """Builds a state-machine (that can be/is used during running).""" + """Builds a state-machine (that is used during running).""" - memory = _MachineMemory() + memory = MachineMemory() if timeout is None: - timeout = _WAITING_TIMEOUT + timeout = WAITING_TIMEOUT # Cache some local functions/methods... do_schedule = self._scheduler.schedule do_complete = self._completer.complete + def is_runnable(): + # Checks if the storage says the flow is still runnable... + return self._storage.get_flow_state() == st.RUNNING + def iter_next_nodes(target_node=None): # Yields and filters and tweaks the next nodes to execute... maybe_nodes = self._analyzer.get_next_nodes(node=target_node) @@ -134,7 +128,7 @@ class Runner(object): # that are now ready to be ran. memory.next_nodes.update(self._completer.resume()) memory.next_nodes.update(iter_next_nodes()) - return _SCHEDULE + return SCHEDULE def game_over(old_state, new_state, event): # This reaction function is mainly a intermediary delegation @@ -142,13 +136,13 @@ class Runner(object): # the appropriate handler that will deal with the memory values, # it is *always* called before the final state is entered. if memory.failures: - return _FAILED + return FAILED if any(1 for node in iter_next_nodes()): - return _SUSPENDED + return SUSPENDED elif self._analyzer.is_success(): - return _SUCCESS + return SUCCESS else: - return _REVERTED + return REVERTED def schedule(old_state, new_state, event): # This reaction function starts to schedule the memory's next @@ -156,14 +150,14 @@ class Runner(object): # if the user of this engine has requested the engine/storage # that holds this information to stop or suspend); handles failures # that occur during this process safely... - if self.runnable() and memory.next_nodes: + if is_runnable() and memory.next_nodes: not_done, failures = do_schedule(memory.next_nodes) if not_done: memory.not_done.update(not_done) if failures: memory.failures.extend(failures) memory.next_nodes.clear() - return _WAIT + return WAIT def wait(old_state, new_state, event): # TODO(harlowja): maybe we should start doing 'yield from' this @@ -173,7 +167,7 @@ class Runner(object): done, not_done = self._waiter(memory.not_done, timeout=timeout) memory.done.update(done) memory.not_done = not_done - return _ANALYZE + return ANALYZE def analyze(old_state, new_state, event): # This reaction function is responsible for analyzing all nodes @@ -215,13 +209,13 @@ class Runner(object): memory.failures.append(failure.Failure()) else: next_nodes.update(more_nodes) - if self.runnable() and next_nodes and not memory.failures: + if is_runnable() and next_nodes and not memory.failures: memory.next_nodes.update(next_nodes) - return _SCHEDULE + return SCHEDULE elif memory.not_done: - return _WAIT + return WAIT else: - return _FINISH + return FINISH def on_exit(old_state, event): LOG.debug("Exiting old state '%s' in response to event '%s'", @@ -239,8 +233,8 @@ class Runner(object): watchers['on_enter'] = on_enter m = machines.FiniteMachine() - m.add_state(_GAME_OVER, **watchers) - m.add_state(_UNDEFINED, **watchers) + m.add_state(GAME_OVER, **watchers) + m.add_state(UNDEFINED, **watchers) m.add_state(st.ANALYZING, **watchers) m.add_state(st.RESUMING, **watchers) m.add_state(st.REVERTED, terminal=True, **watchers) @@ -249,38 +243,25 @@ class Runner(object): m.add_state(st.SUSPENDED, terminal=True, **watchers) m.add_state(st.WAITING, **watchers) m.add_state(st.FAILURE, terminal=True, **watchers) - m.default_start_state = _UNDEFINED + m.default_start_state = UNDEFINED - m.add_transition(_GAME_OVER, st.REVERTED, _REVERTED) - m.add_transition(_GAME_OVER, st.SUCCESS, _SUCCESS) - m.add_transition(_GAME_OVER, st.SUSPENDED, _SUSPENDED) - m.add_transition(_GAME_OVER, st.FAILURE, _FAILED) - m.add_transition(_UNDEFINED, st.RESUMING, _START) - m.add_transition(st.ANALYZING, _GAME_OVER, _FINISH) - m.add_transition(st.ANALYZING, st.SCHEDULING, _SCHEDULE) - m.add_transition(st.ANALYZING, st.WAITING, _WAIT) - m.add_transition(st.RESUMING, st.SCHEDULING, _SCHEDULE) - m.add_transition(st.SCHEDULING, st.WAITING, _WAIT) - m.add_transition(st.WAITING, st.ANALYZING, _ANALYZE) + m.add_transition(GAME_OVER, st.REVERTED, REVERTED) + m.add_transition(GAME_OVER, st.SUCCESS, SUCCESS) + m.add_transition(GAME_OVER, st.SUSPENDED, SUSPENDED) + m.add_transition(GAME_OVER, st.FAILURE, FAILED) + m.add_transition(UNDEFINED, st.RESUMING, START) + m.add_transition(st.ANALYZING, GAME_OVER, FINISH) + m.add_transition(st.ANALYZING, st.SCHEDULING, SCHEDULE) + m.add_transition(st.ANALYZING, st.WAITING, WAIT) + m.add_transition(st.RESUMING, st.SCHEDULING, SCHEDULE) + m.add_transition(st.SCHEDULING, st.WAITING, WAIT) + m.add_transition(st.WAITING, st.ANALYZING, ANALYZE) - m.add_reaction(_GAME_OVER, _FINISH, game_over) - m.add_reaction(st.ANALYZING, _ANALYZE, analyze) - m.add_reaction(st.RESUMING, _START, resume) - m.add_reaction(st.SCHEDULING, _SCHEDULE, schedule) - m.add_reaction(st.WAITING, _WAIT, wait) + m.add_reaction(GAME_OVER, FINISH, game_over) + m.add_reaction(st.ANALYZING, ANALYZE, analyze) + m.add_reaction(st.RESUMING, START, resume) + m.add_reaction(st.SCHEDULING, SCHEDULE, schedule) + m.add_reaction(st.WAITING, WAIT, wait) m.freeze() - - r = runners.FiniteRunner(m) - return (m, r, memory) - - def run_iter(self, timeout=None): - """Runs iteratively using a locally built state machine.""" - machine, runner, memory = self.build(timeout=timeout) - for (_prior_state, new_state) in runner.run_iter(_START): - # NOTE(harlowja): skip over meta-states. - if new_state not in _META_STATES: - if new_state == st.FAILURE: - yield (new_state, memory.failures) - else: - yield (new_state, []) + return (m, memory) diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py index ef557f93c..309e1a32b 100644 --- a/taskflow/engines/action_engine/engine.py +++ b/taskflow/engines/action_engine/engine.py @@ -19,6 +19,7 @@ import contextlib import itertools import threading +from automaton import runners from concurrent import futures import fasteners import networkx as nx @@ -26,6 +27,7 @@ from oslo_utils import excutils from oslo_utils import strutils import six +from taskflow.engines.action_engine import builder from taskflow.engines.action_engine import compiler from taskflow.engines.action_engine import executor from taskflow.engines.action_engine import runtime @@ -59,9 +61,9 @@ class ActionEngine(base.Engine): This engine compiles the flow (and any subflows) into a compilation unit which contains the full runtime definition to be executed and then uses - this compilation unit in combination with the executor, runtime, runner - and storage classes to attempt to run your flow (and any subflows & - contained atoms) to completion. + this compilation unit in combination with the executor, runtime, machine + builder and storage classes to attempt to run your flow (and any + subflows & contained atoms) to completion. NOTE(harlowja): during this process it is permissible and valid to have a task or multiple tasks in the execution graph fail (at the same time even), @@ -77,6 +79,15 @@ class ActionEngine(base.Engine): failure/s that were captured (if any) to get reraised. """ + IGNORABLE_STATES = frozenset( + itertools.chain([states.SCHEDULING, states.WAITING, states.RESUMING, + states.ANALYZING], builder.META_STATES)) + """ + Informational states this engines internal machine yields back while + running, not useful to have the engine record but useful to provide to + end-users when doing execution iterations via :py:meth:`.run_iter`. + """ + def __init__(self, flow, flow_detail, backend, options): super(ActionEngine, self).__init__(flow, flow_detail, backend, options) self._runtime = None @@ -151,20 +162,20 @@ class ActionEngine(base.Engine): def run_iter(self, timeout=None): """Runs the engine using iteration (or die trying). - :param timeout: timeout to wait for any tasks to complete (this timeout + :param timeout: timeout to wait for any atoms to complete (this timeout will be used during the waiting period that occurs after the - waiting state is yielded when unfinished tasks are being waited - for). + waiting state is yielded when unfinished atoms are being waited + on). Instead of running to completion in a blocking manner, this will return a generator which will yield back the various states that the engine is going through (and can be used to run multiple engines at - once using a generator per engine). the iterator returned also - responds to the send() method from pep-0342 and will attempt to suspend - itself if a truthy value is sent in (the suspend may be delayed until - all active tasks have finished). + once using a generator per engine). The iterator returned also + responds to the ``send()`` method from :pep:`0342` and will attempt to + suspend itself if a truthy value is sent in (the suspend may be + delayed until all active atoms have finished). - NOTE(harlowja): using the run_iter method will **not** retain the + NOTE(harlowja): using the ``run_iter`` method will **not** retain the engine lock while executing so the user should ensure that there is only one entity using a returned engine iterator (one per engine) at a given time. @@ -172,19 +183,24 @@ class ActionEngine(base.Engine): self.compile() self.prepare() self.validate() - runner = self._runtime.runner last_state = None with _start_stop(self._task_executor, self._retry_executor): self._change_state(states.RUNNING) try: closed = False - for (last_state, failures) in runner.run_iter(timeout=timeout): - if failures: - failure.Failure.reraise_if_any(failures) + machine, memory = self._runtime.builder.build(timeout=timeout) + r = runners.FiniteRunner(machine) + for (_prior_state, new_state) in r.run_iter(builder.START): + last_state = new_state + # NOTE(harlowja): skip over meta-states. + if new_state in builder.META_STATES: + continue + if new_state == states.FAILURE: + failure.Failure.reraise_if_any(memory.failures) if closed: continue try: - try_suspend = yield last_state + try_suspend = yield new_state except GeneratorExit: # The generator was closed, attempt to suspend and # continue looping until we have cleanly closed up @@ -198,9 +214,8 @@ class ActionEngine(base.Engine): with excutils.save_and_reraise_exception(): self._change_state(states.FAILURE) else: - ignorable_states = getattr(runner, 'ignorable_states', []) - if last_state and last_state not in ignorable_states: - self._change_state(last_state) + if last_state and last_state not in self.IGNORABLE_STATES: + self._change_state(new_state) if last_state not in self.NO_RERAISING_STATES: it = itertools.chain( six.itervalues(self.storage.get_failures()), diff --git a/taskflow/engines/action_engine/runtime.py b/taskflow/engines/action_engine/runtime.py index 2841968ae..d97ba9679 100644 --- a/taskflow/engines/action_engine/runtime.py +++ b/taskflow/engines/action_engine/runtime.py @@ -21,11 +21,11 @@ from futurist import waiters from taskflow.engines.action_engine.actions import retry as ra from taskflow.engines.action_engine.actions import task as ta from taskflow.engines.action_engine import analyzer as an +from taskflow.engines.action_engine import builder as bu from taskflow.engines.action_engine import completer as co -from taskflow.engines.action_engine import runner as ru from taskflow.engines.action_engine import scheduler as sched from taskflow.engines.action_engine import scopes as sc -from taskflow import flow as flow_type +from taskflow import flow from taskflow import states as st from taskflow import task from taskflow.utils import misc @@ -89,7 +89,7 @@ class Runtime(object): # is able to run (or should not) ensure we retain it and use # it later as needed. u_v_data = execution_graph.adj[previous_atom][atom] - u_v_decider = u_v_data.get(flow_type.LINK_DECIDER) + u_v_decider = u_v_data.get(flow.LINK_DECIDER) if u_v_decider is not None: edge_deciders[previous_atom.name] = u_v_decider metadata['scope_walker'] = walker @@ -114,8 +114,8 @@ class Runtime(object): return an.Analyzer(self) @misc.cachedproperty - def runner(self): - return ru.Runner(self, waiters.wait_for_any) + def builder(self): + return bu.MachineBuilder(self, waiters.wait_for_any) @misc.cachedproperty def completer(self): diff --git a/taskflow/tests/unit/action_engine/test_runner.py b/taskflow/tests/unit/action_engine/test_builder.py similarity index 57% rename from taskflow/tests/unit/action_engine/test_runner.py rename to taskflow/tests/unit/action_engine/test_builder.py index bff74cb9e..b4067449f 100644 --- a/taskflow/tests/unit/action_engine/test_runner.py +++ b/taskflow/tests/unit/action_engine/test_builder.py @@ -15,11 +15,12 @@ # under the License. from automaton import exceptions as excp +from automaton import runners import six +from taskflow.engines.action_engine import builder from taskflow.engines.action_engine import compiler from taskflow.engines.action_engine import executor -from taskflow.engines.action_engine import runner from taskflow.engines.action_engine import runtime from taskflow.patterns import linear_flow as lf from taskflow import states as st @@ -30,7 +31,8 @@ from taskflow.types import notifier from taskflow.utils import persistence_utils as pu -class _RunnerTestMixin(object): +class BuildersTest(test.TestCase): + def _make_runtime(self, flow, initial_state=None): compilation = compiler.PatternCompiler(flow).compile() flow_detail = pu.create_flow_detail(flow) @@ -51,17 +53,11 @@ class _RunnerTestMixin(object): r.compile() return r - -class RunnerTest(test.TestCase, _RunnerTestMixin): - def test_running(self): - flow = lf.Flow("root") - flow.add(*test_utils.make_many(1)) - - rt = self._make_runtime(flow, initial_state=st.RUNNING) - self.assertTrue(rt.runner.runnable()) - - rt = self._make_runtime(flow, initial_state=st.SUSPENDED) - self.assertFalse(rt.runner.runnable()) + def _make_machine(self, flow, initial_state=None): + runtime = self._make_runtime(flow, initial_state=initial_state) + machine, memory = runtime.builder.build() + machine_runner = runners.FiniteRunner(machine) + return (runtime, machine, memory, machine_runner) def test_run_iterations(self): flow = lf.Flow("root") @@ -69,29 +65,32 @@ class RunnerTest(test.TestCase, _RunnerTestMixin): 1, task_cls=test_utils.TaskNoRequiresNoReturns) flow.add(*tasks) - rt = self._make_runtime(flow, initial_state=st.RUNNING) - self.assertTrue(rt.runner.runnable()) + runtime, machine, memory, machine_runner = self._make_machine( + flow, initial_state=st.RUNNING) - it = rt.runner.run_iter() - state, failures = six.next(it) - self.assertEqual(st.RESUMING, state) - self.assertEqual(0, len(failures)) + it = machine_runner.run_iter(builder.START) + prior_state, new_state = six.next(it) + self.assertEqual(st.RESUMING, new_state) + self.assertEqual(0, len(memory.failures)) - state, failures = six.next(it) - self.assertEqual(st.SCHEDULING, state) - self.assertEqual(0, len(failures)) + prior_state, new_state = six.next(it) + self.assertEqual(st.SCHEDULING, new_state) + self.assertEqual(0, len(memory.failures)) - state, failures = six.next(it) - self.assertEqual(st.WAITING, state) - self.assertEqual(0, len(failures)) + prior_state, new_state = six.next(it) + self.assertEqual(st.WAITING, new_state) + self.assertEqual(0, len(memory.failures)) - state, failures = six.next(it) - self.assertEqual(st.ANALYZING, state) - self.assertEqual(0, len(failures)) + prior_state, new_state = six.next(it) + self.assertEqual(st.ANALYZING, new_state) + self.assertEqual(0, len(memory.failures)) - state, failures = six.next(it) - self.assertEqual(st.SUCCESS, state) - self.assertEqual(0, len(failures)) + prior_state, new_state = six.next(it) + self.assertEqual(builder.GAME_OVER, new_state) + self.assertEqual(0, len(memory.failures)) + prior_state, new_state = six.next(it) + self.assertEqual(st.SUCCESS, new_state) + self.assertEqual(0, len(memory.failures)) self.assertRaises(StopIteration, six.next, it) @@ -101,15 +100,15 @@ class RunnerTest(test.TestCase, _RunnerTestMixin): 1, task_cls=test_utils.TaskWithFailure) flow.add(*tasks) - rt = self._make_runtime(flow, initial_state=st.RUNNING) - self.assertTrue(rt.runner.runnable()) + runtime, machine, memory, machine_runner = self._make_machine( + flow, initial_state=st.RUNNING) - transitions = list(rt.runner.run_iter()) - state, failures = transitions[-1] - self.assertEqual(st.REVERTED, state) - self.assertEqual([], failures) - - self.assertEqual(st.REVERTED, rt.storage.get_atom_state(tasks[0].name)) + transitions = list(machine_runner.run_iter(builder.START)) + prior_state, new_state = transitions[-1] + self.assertEqual(st.REVERTED, new_state) + self.assertEqual([], memory.failures) + self.assertEqual(st.REVERTED, + runtime.storage.get_atom_state(tasks[0].name)) def test_run_iterations_failure(self): flow = lf.Flow("root") @@ -117,18 +116,17 @@ class RunnerTest(test.TestCase, _RunnerTestMixin): 1, task_cls=test_utils.NastyFailingTask) flow.add(*tasks) - rt = self._make_runtime(flow, initial_state=st.RUNNING) - self.assertTrue(rt.runner.runnable()) + runtime, machine, memory, machine_runner = self._make_machine( + flow, initial_state=st.RUNNING) - transitions = list(rt.runner.run_iter()) - state, failures = transitions[-1] - self.assertEqual(st.FAILURE, state) - self.assertEqual(1, len(failures)) - failure = failures[0] + transitions = list(machine_runner.run_iter(builder.START)) + prior_state, new_state = transitions[-1] + self.assertEqual(st.FAILURE, new_state) + self.assertEqual(1, len(memory.failures)) + failure = memory.failures[0] self.assertTrue(failure.check(RuntimeError)) - self.assertEqual(st.REVERT_FAILURE, - rt.storage.get_atom_state(tasks[0].name)) + runtime.storage.get_atom_state(tasks[0].name)) def test_run_iterations_suspended(self): flow = lf.Flow("root") @@ -136,20 +134,22 @@ class RunnerTest(test.TestCase, _RunnerTestMixin): 2, task_cls=test_utils.TaskNoRequiresNoReturns) flow.add(*tasks) - rt = self._make_runtime(flow, initial_state=st.RUNNING) - self.assertTrue(rt.runner.runnable()) + runtime, machine, memory, machine_runner = self._make_machine( + flow, initial_state=st.RUNNING) transitions = [] - for state, failures in rt.runner.run_iter(): - transitions.append((state, failures)) - if state == st.ANALYZING: - rt.storage.set_flow_state(st.SUSPENDED) + for prior_state, new_state in machine_runner.run_iter(builder.START): + transitions.append((new_state, memory.failures)) + if new_state == st.ANALYZING: + runtime.storage.set_flow_state(st.SUSPENDED) state, failures = transitions[-1] self.assertEqual(st.SUSPENDED, state) self.assertEqual([], failures) - self.assertEqual(st.SUCCESS, rt.storage.get_atom_state(tasks[0].name)) - self.assertEqual(st.PENDING, rt.storage.get_atom_state(tasks[1].name)) + self.assertEqual(st.SUCCESS, + runtime.storage.get_atom_state(tasks[0].name)) + self.assertEqual(st.PENDING, + runtime.storage.get_atom_state(tasks[1].name)) def test_run_iterations_suspended_failure(self): flow = lf.Flow("root") @@ -160,46 +160,44 @@ class RunnerTest(test.TestCase, _RunnerTestMixin): 1, task_cls=test_utils.TaskNoRequiresNoReturns, offset=1) flow.add(*happy_tasks) - rt = self._make_runtime(flow, initial_state=st.RUNNING) - self.assertTrue(rt.runner.runnable()) + runtime, machine, memory, machine_runner = self._make_machine( + flow, initial_state=st.RUNNING) transitions = [] - for state, failures in rt.runner.run_iter(): - transitions.append((state, failures)) - if state == st.ANALYZING: - rt.storage.set_flow_state(st.SUSPENDED) + for prior_state, new_state in machine_runner.run_iter(builder.START): + transitions.append((new_state, memory.failures)) + if new_state == st.ANALYZING: + runtime.storage.set_flow_state(st.SUSPENDED) state, failures = transitions[-1] self.assertEqual(st.SUSPENDED, state) self.assertEqual([], failures) self.assertEqual(st.PENDING, - rt.storage.get_atom_state(happy_tasks[0].name)) + runtime.storage.get_atom_state(happy_tasks[0].name)) self.assertEqual(st.FAILURE, - rt.storage.get_atom_state(sad_tasks[0].name)) + runtime.storage.get_atom_state(sad_tasks[0].name)) - -class RunnerBuildTest(test.TestCase, _RunnerTestMixin): def test_builder_manual_process(self): flow = lf.Flow("root") tasks = test_utils.make_many( 1, task_cls=test_utils.TaskNoRequiresNoReturns) flow.add(*tasks) - rt = self._make_runtime(flow, initial_state=st.RUNNING) - machine, machine_runner, memory = rt.runner.build() - self.assertTrue(rt.runner.runnable()) + runtime, machine, memory, machine_runner = self._make_machine( + flow, initial_state=st.RUNNING) self.assertRaises(excp.NotInitialized, machine.process_event, 'poke') # Should now be pending... - self.assertEqual(st.PENDING, rt.storage.get_atom_state(tasks[0].name)) + self.assertEqual(st.PENDING, + runtime.storage.get_atom_state(tasks[0].name)) machine.initialize() - self.assertEqual(runner._UNDEFINED, machine.current_state) + self.assertEqual(builder.UNDEFINED, machine.current_state) self.assertFalse(machine.terminated) self.assertRaises(excp.NotFound, machine.process_event, 'poke') last_state = machine.current_state - reaction, terminal = machine.process_event('start') + reaction, terminal = machine.process_event(builder.START) self.assertFalse(terminal) self.assertIsNotNone(reaction) self.assertEqual(st.RESUMING, machine.current_state) @@ -208,7 +206,7 @@ class RunnerBuildTest(test.TestCase, _RunnerTestMixin): last_state = machine.current_state cb, args, kwargs = reaction next_event = cb(last_state, machine.current_state, - 'start', *args, **kwargs) + builder.START, *args, **kwargs) reaction, terminal = machine.process_event(next_event) self.assertFalse(terminal) self.assertIsNotNone(reaction) @@ -225,7 +223,8 @@ class RunnerBuildTest(test.TestCase, _RunnerTestMixin): self.assertRaises(excp.NotFound, machine.process_event, 'poke') # Should now be running... - self.assertEqual(st.RUNNING, rt.storage.get_atom_state(tasks[0].name)) + self.assertEqual(st.RUNNING, + runtime.storage.get_atom_state(tasks[0].name)) last_state = machine.current_state cb, args, kwargs = reaction @@ -243,10 +242,11 @@ class RunnerBuildTest(test.TestCase, _RunnerTestMixin): next_event, *args, **kwargs) reaction, terminal = machine.process_event(next_event) self.assertFalse(terminal) - self.assertEqual(runner._GAME_OVER, machine.current_state) + self.assertEqual(builder.GAME_OVER, machine.current_state) # Should now be done... - self.assertEqual(st.SUCCESS, rt.storage.get_atom_state(tasks[0].name)) + self.assertEqual(st.SUCCESS, + runtime.storage.get_atom_state(tasks[0].name)) def test_builder_automatic_process(self): flow = lf.Flow("root") @@ -254,26 +254,25 @@ class RunnerBuildTest(test.TestCase, _RunnerTestMixin): 1, task_cls=test_utils.TaskNoRequiresNoReturns) flow.add(*tasks) - rt = self._make_runtime(flow, initial_state=st.RUNNING) - machine, machine_runner, memory = rt.runner.build() - self.assertTrue(rt.runner.runnable()) + runtime, machine, memory, machine_runner = self._make_machine( + flow, initial_state=st.RUNNING) - transitions = list(machine_runner.run_iter('start')) - self.assertEqual((runner._UNDEFINED, st.RESUMING), transitions[0]) - self.assertEqual((runner._GAME_OVER, st.SUCCESS), transitions[-1]) - self.assertEqual(st.SUCCESS, rt.storage.get_atom_state(tasks[0].name)) + transitions = list(machine_runner.run_iter(builder.START)) + self.assertEqual((builder.UNDEFINED, st.RESUMING), transitions[0]) + self.assertEqual((builder.GAME_OVER, st.SUCCESS), transitions[-1]) + self.assertEqual(st.SUCCESS, + runtime.storage.get_atom_state(tasks[0].name)) def test_builder_automatic_process_failure(self): flow = lf.Flow("root") tasks = test_utils.make_many(1, task_cls=test_utils.NastyFailingTask) flow.add(*tasks) - rt = self._make_runtime(flow, initial_state=st.RUNNING) - machine, machine_runner, memory = rt.runner.build() - self.assertTrue(rt.runner.runnable()) + runtime, machine, memory, machine_runner = self._make_machine( + flow, initial_state=st.RUNNING) - transitions = list(machine_runner.run_iter('start')) - self.assertEqual((runner._GAME_OVER, st.FAILURE), transitions[-1]) + transitions = list(machine_runner.run_iter(builder.START)) + self.assertEqual((builder.GAME_OVER, st.FAILURE), transitions[-1]) self.assertEqual(1, len(memory.failures)) def test_builder_automatic_process_reverted(self): @@ -281,13 +280,13 @@ class RunnerBuildTest(test.TestCase, _RunnerTestMixin): tasks = test_utils.make_many(1, task_cls=test_utils.TaskWithFailure) flow.add(*tasks) - rt = self._make_runtime(flow, initial_state=st.RUNNING) - machine, machine_runner, memory = rt.runner.build() - self.assertTrue(rt.runner.runnable()) + runtime, machine, memory, machine_runner = self._make_machine( + flow, initial_state=st.RUNNING) - transitions = list(machine_runner.run_iter('start')) - self.assertEqual((runner._GAME_OVER, st.REVERTED), transitions[-1]) - self.assertEqual(st.REVERTED, rt.storage.get_atom_state(tasks[0].name)) + transitions = list(machine_runner.run_iter(builder.START)) + self.assertEqual((builder.GAME_OVER, st.REVERTED), transitions[-1]) + self.assertEqual(st.REVERTED, + runtime.storage.get_atom_state(tasks[0].name)) def test_builder_expected_transition_occurrences(self): flow = lf.Flow("root") @@ -295,16 +294,16 @@ class RunnerBuildTest(test.TestCase, _RunnerTestMixin): 10, task_cls=test_utils.TaskNoRequiresNoReturns) flow.add(*tasks) - rt = self._make_runtime(flow, initial_state=st.RUNNING) - machine, machine_runner, memory = rt.runner.build() - transitions = list(machine_runner.run_iter('start')) + runtime, machine, memory, machine_runner = self._make_machine( + flow, initial_state=st.RUNNING) + transitions = list(machine_runner.run_iter(builder.START)) occurrences = dict((t, transitions.count(t)) for t in transitions) self.assertEqual(10, occurrences.get((st.SCHEDULING, st.WAITING))) self.assertEqual(10, occurrences.get((st.WAITING, st.ANALYZING))) self.assertEqual(9, occurrences.get((st.ANALYZING, st.SCHEDULING))) - self.assertEqual(1, occurrences.get((runner._GAME_OVER, st.SUCCESS))) - self.assertEqual(1, occurrences.get((runner._UNDEFINED, st.RESUMING))) + self.assertEqual(1, occurrences.get((builder.GAME_OVER, st.SUCCESS))) + self.assertEqual(1, occurrences.get((builder.UNDEFINED, st.RESUMING))) self.assertEqual(0, len(memory.next_nodes)) self.assertEqual(0, len(memory.not_done)) diff --git a/tools/state_graph.py b/tools/state_graph.py index c9bdd0b10..5530a4692 100755 --- a/tools/state_graph.py +++ b/tools/state_graph.py @@ -31,12 +31,12 @@ import pydot from automaton import machines -from taskflow.engines.action_engine import runner +from taskflow.engines.action_engine import builder from taskflow.engines.worker_based import protocol from taskflow import states -# This is just needed to get at the runner builder object (we will not +# This is just needed to get at the machine object (we will not # actually be running it...). class DummyRuntime(object): def __init__(self): @@ -134,9 +134,9 @@ def main(): list(states._ALLOWED_RETRY_TRANSITIONS)) elif options.engines: source_type = "Engines" - r = runner.Runner(DummyRuntime(), mock.MagicMock()) - source, memory = r.build() - internal_states.extend(runner._META_STATES) + b = builder.MachineBuilder(DummyRuntime(), mock.MagicMock()) + source, memory = b.build() + internal_states.extend(builder.META_STATES) ordering = 'out' elif options.wbe_requests: source_type = "WBE requests"