From abf21c728914ac2c2390efcce7dec4f19eb926a5 Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Wed, 3 Jun 2015 16:52:16 -0700 Subject: [PATCH] Refactor machine builder + runner into single unit There is really no need to seperate off the runner into a unit that builds a state-machine and then provides a tiny utility function, both of these can just be in the same class and code so that it is easier to understand/read. Change-Id: I18b97514e230451ef804a878a0edcea4d0b2ad20 --- taskflow/engines/action_engine/runner.py | 90 ++++++++----------- .../tests/unit/action_engine/test_runner.py | 20 ++--- tools/state_graph.py | 2 +- 3 files changed, 48 insertions(+), 64 deletions(-) diff --git a/taskflow/engines/action_engine/runner.py b/taskflow/engines/action_engine/runner.py index d50de157e..8d637c1ce 100644 --- a/taskflow/engines/action_engine/runner.py +++ b/taskflow/engines/action_engine/runner.py @@ -51,38 +51,48 @@ class _MachineMemory(object): self.done = set() -class _MachineBuilder(object): - """State machine *builder* that the runner uses. +class Runner(object): + """State machine *builder* + *runner* that powers the engine components. - NOTE(harlowja): the machine states that this build will for are:: + NOTE(harlowja): the machine (states and events that will trigger + transitions) that this builds is represented by the following + table:: - +--------------+------------------+------------+----------+---------+ - Start | Event | End | On Enter | On Exit - +--------------+------------------+------------+----------+---------+ - ANALYZING | completed | GAME_OVER | | - ANALYZING | schedule_next | SCHEDULING | | - ANALYZING | wait_finished | WAITING | | - FAILURE[$] | | | | - GAME_OVER | failed | FAILURE | | - GAME_OVER | reverted | REVERTED | | - GAME_OVER | success | SUCCESS | | - GAME_OVER | suspended | SUSPENDED | | - RESUMING | schedule_next | SCHEDULING | | - REVERTED[$] | | | | - SCHEDULING | wait_finished | WAITING | | - SUCCESS[$] | | | | - SUSPENDED[$] | | | | - UNDEFINED[^] | start | RESUMING | | - WAITING | examine_finished | ANALYZING | | - +--------------+------------------+------------+----------+---------+ + +--------------+------------------+------------+----------+---------+ + Start | Event | End | On Enter | On Exit + +--------------+------------------+------------+----------+---------+ + ANALYZING | completed | GAME_OVER | | + ANALYZING | schedule_next | SCHEDULING | | + ANALYZING | wait_finished | WAITING | | + FAILURE[$] | | | | + GAME_OVER | failed | FAILURE | | + GAME_OVER | reverted | REVERTED | | + GAME_OVER | success | SUCCESS | | + GAME_OVER | suspended | SUSPENDED | | + RESUMING | schedule_next | SCHEDULING | | + REVERTED[$] | | | | + SCHEDULING | wait_finished | WAITING | | + SUCCESS[$] | | | | + SUSPENDED[$] | | | | + UNDEFINED[^] | start | RESUMING | | + WAITING | examine_finished | ANALYZING | | + +--------------+------------------+------------+----------+---------+ Between any of these yielded states (minus ``GAME_OVER`` and ``UNDEFINED``) if the engine has been suspended or the engine has failed (due to a non-resolveable task failure or scheduling failure) the machine will stop executing new tasks (currently running tasks will be allowed to complete) and this machines run loop will be broken. + + NOTE(harlowja): If the runtimes scheduler component is able to schedule + tasks in parallel, this enables parallel running and/or reversion. """ + # Informational states this action yields while running, not useful to + # have the engine record but useful to provide to end-users when doing + # execution iterations. + ignorable_states = (st.SCHEDULING, st.WAITING, st.RESUMING, st.ANALYZING) + def __init__(self, runtime, waiter): self._analyzer = runtime.analyzer self._completer = runtime.completer @@ -91,9 +101,12 @@ class _MachineBuilder(object): self._waiter = waiter def runnable(self): + """Checks if the storage says the flow is still runnable/running.""" return self._storage.get_flow_state() == st.RUNNING def build(self, timeout=None): + """Builds a state-machine (that can be/is used during running).""" + memory = _MachineMemory() if timeout is None: timeout = _WAITING_TIMEOUT @@ -244,38 +257,9 @@ class _MachineBuilder(object): m.freeze() return (m, memory) - -class Runner(object): - """Runner that iterates while executing nodes using the given runtime. - - This runner acts as the action engine run loop/state-machine, it resumes - the workflow, schedules all task it can for execution using the runtimes - scheduler and analyzer components, and than waits on returned futures and - then activates the runtimes completion component to finish up those tasks - and so on... - - NOTE(harlowja): If the runtimes scheduler component is able to schedule - tasks in parallel, this enables parallel running and/or reversion. - """ - - # Informational states this action yields while running, not useful to - # have the engine record but useful to provide to end-users when doing - # execution iterations. - ignorable_states = (st.SCHEDULING, st.WAITING, st.RESUMING, st.ANALYZING) - - def __init__(self, runtime, waiter): - self._builder = _MachineBuilder(runtime, waiter) - - @property - def builder(self): - return self._builder - - def runnable(self): - return self._builder.runnable() - def run_iter(self, timeout=None): - """Runs the nodes using a built state machine.""" - machine, memory = self.builder.build(timeout=timeout) + """Runs iteratively using a locally built state machine.""" + machine, memory = self.build(timeout=timeout) for (_prior_state, new_state) in machine.run_iter(_START): # NOTE(harlowja): skip over meta-states. if new_state not in _META_STATES: diff --git a/taskflow/tests/unit/action_engine/test_runner.py b/taskflow/tests/unit/action_engine/test_runner.py index 98ae0e287..cbc1e2e93 100644 --- a/taskflow/tests/unit/action_engine/test_runner.py +++ b/taskflow/tests/unit/action_engine/test_runner.py @@ -174,7 +174,7 @@ class RunnerTest(test.TestCase, _RunnerTestMixin): rt.storage.get_atom_state(sad_tasks[0].name)) -class RunnerBuilderTest(test.TestCase, _RunnerTestMixin): +class RunnerBuildTest(test.TestCase, _RunnerTestMixin): def test_builder_manual_process(self): flow = lf.Flow("root") tasks = test_utils.make_many( @@ -182,8 +182,8 @@ class RunnerBuilderTest(test.TestCase, _RunnerTestMixin): flow.add(*tasks) rt = self._make_runtime(flow, initial_state=st.RUNNING) - machine, memory = rt.runner.builder.build() - self.assertTrue(rt.runner.builder.runnable()) + machine, memory = rt.runner.build() + self.assertTrue(rt.runner.runnable()) self.assertRaises(fsm.NotInitialized, machine.process_event, 'poke') # Should now be pending... @@ -251,8 +251,8 @@ class RunnerBuilderTest(test.TestCase, _RunnerTestMixin): flow.add(*tasks) rt = self._make_runtime(flow, initial_state=st.RUNNING) - machine, memory = rt.runner.builder.build() - self.assertTrue(rt.runner.builder.runnable()) + machine, memory = rt.runner.build() + self.assertTrue(rt.runner.runnable()) transitions = list(machine.run_iter('start')) self.assertEqual((runner._UNDEFINED, st.RESUMING), transitions[0]) @@ -265,8 +265,8 @@ class RunnerBuilderTest(test.TestCase, _RunnerTestMixin): flow.add(*tasks) rt = self._make_runtime(flow, initial_state=st.RUNNING) - machine, memory = rt.runner.builder.build() - self.assertTrue(rt.runner.builder.runnable()) + machine, memory = rt.runner.build() + self.assertTrue(rt.runner.runnable()) transitions = list(machine.run_iter('start')) self.assertEqual((runner._GAME_OVER, st.FAILURE), transitions[-1]) @@ -278,8 +278,8 @@ class RunnerBuilderTest(test.TestCase, _RunnerTestMixin): flow.add(*tasks) rt = self._make_runtime(flow, initial_state=st.RUNNING) - machine, memory = rt.runner.builder.build() - self.assertTrue(rt.runner.builder.runnable()) + machine, memory = rt.runner.build() + self.assertTrue(rt.runner.runnable()) transitions = list(machine.run_iter('start')) self.assertEqual((runner._GAME_OVER, st.REVERTED), transitions[-1]) @@ -292,7 +292,7 @@ class RunnerBuilderTest(test.TestCase, _RunnerTestMixin): flow.add(*tasks) rt = self._make_runtime(flow, initial_state=st.RUNNING) - machine, memory = rt.runner.builder.build() + machine, memory = rt.runner.build() transitions = list(machine.run_iter('start')) occurrences = dict((t, transitions.count(t)) for t in transitions) diff --git a/tools/state_graph.py b/tools/state_graph.py index 319614041..38bd6d91e 100755 --- a/tools/state_graph.py +++ b/tools/state_graph.py @@ -134,7 +134,7 @@ def main(): elif options.engines: source_type = "Engines" r = runner.Runner(DummyRuntime(), None) - source, memory = r.builder.build() + source, memory = r.build() internal_states.extend(runner._META_STATES) ordering = 'out' elif options.wbe_requests: