Refactor machine builder + runner into single unit

There is really no need to seperate off the runner into
a unit that builds a state-machine and then provides a tiny
utility function, both of these can just be in the same class
and code so that it is easier to understand/read.

Change-Id: I18b97514e230451ef804a878a0edcea4d0b2ad20
This commit is contained in:
Joshua Harlow 2015-06-03 16:52:16 -07:00
parent 1e6b991b23
commit abf21c7289
3 changed files with 48 additions and 64 deletions

View File

@ -51,38 +51,48 @@ class _MachineMemory(object):
self.done = set() self.done = set()
class _MachineBuilder(object): class Runner(object):
"""State machine *builder* that the runner uses. """State machine *builder* + *runner* that powers the engine components.
NOTE(harlowja): the machine states that this build will for are:: NOTE(harlowja): the machine (states and events that will trigger
transitions) that this builds is represented by the following
table::
+--------------+------------------+------------+----------+---------+ +--------------+------------------+------------+----------+---------+
Start | Event | End | On Enter | On Exit Start | Event | End | On Enter | On Exit
+--------------+------------------+------------+----------+---------+ +--------------+------------------+------------+----------+---------+
ANALYZING | completed | GAME_OVER | | ANALYZING | completed | GAME_OVER | |
ANALYZING | schedule_next | SCHEDULING | | ANALYZING | schedule_next | SCHEDULING | |
ANALYZING | wait_finished | WAITING | | ANALYZING | wait_finished | WAITING | |
FAILURE[$] | | | | FAILURE[$] | | | |
GAME_OVER | failed | FAILURE | | GAME_OVER | failed | FAILURE | |
GAME_OVER | reverted | REVERTED | | GAME_OVER | reverted | REVERTED | |
GAME_OVER | success | SUCCESS | | GAME_OVER | success | SUCCESS | |
GAME_OVER | suspended | SUSPENDED | | GAME_OVER | suspended | SUSPENDED | |
RESUMING | schedule_next | SCHEDULING | | RESUMING | schedule_next | SCHEDULING | |
REVERTED[$] | | | | REVERTED[$] | | | |
SCHEDULING | wait_finished | WAITING | | SCHEDULING | wait_finished | WAITING | |
SUCCESS[$] | | | | SUCCESS[$] | | | |
SUSPENDED[$] | | | | SUSPENDED[$] | | | |
UNDEFINED[^] | start | RESUMING | | UNDEFINED[^] | start | RESUMING | |
WAITING | examine_finished | ANALYZING | | WAITING | examine_finished | ANALYZING | |
+--------------+------------------+------------+----------+---------+ +--------------+------------------+------------+----------+---------+
Between any of these yielded states (minus ``GAME_OVER`` and ``UNDEFINED``) Between any of these yielded states (minus ``GAME_OVER`` and ``UNDEFINED``)
if the engine has been suspended or the engine has failed (due to a if the engine has been suspended or the engine has failed (due to a
non-resolveable task failure or scheduling failure) the machine will stop non-resolveable task failure or scheduling failure) the machine will stop
executing new tasks (currently running tasks will be allowed to complete) executing new tasks (currently running tasks will be allowed to complete)
and this machines run loop will be broken. and this machines run loop will be broken.
NOTE(harlowja): If the runtimes scheduler component is able to schedule
tasks in parallel, this enables parallel running and/or reversion.
""" """
# Informational states this action yields while running, not useful to
# have the engine record but useful to provide to end-users when doing
# execution iterations.
ignorable_states = (st.SCHEDULING, st.WAITING, st.RESUMING, st.ANALYZING)
def __init__(self, runtime, waiter): def __init__(self, runtime, waiter):
self._analyzer = runtime.analyzer self._analyzer = runtime.analyzer
self._completer = runtime.completer self._completer = runtime.completer
@ -91,9 +101,12 @@ class _MachineBuilder(object):
self._waiter = waiter self._waiter = waiter
def runnable(self): def runnable(self):
"""Checks if the storage says the flow is still runnable/running."""
return self._storage.get_flow_state() == st.RUNNING return self._storage.get_flow_state() == st.RUNNING
def build(self, timeout=None): def build(self, timeout=None):
"""Builds a state-machine (that can be/is used during running)."""
memory = _MachineMemory() memory = _MachineMemory()
if timeout is None: if timeout is None:
timeout = _WAITING_TIMEOUT timeout = _WAITING_TIMEOUT
@ -244,38 +257,9 @@ class _MachineBuilder(object):
m.freeze() m.freeze()
return (m, memory) return (m, memory)
class Runner(object):
"""Runner that iterates while executing nodes using the given runtime.
This runner acts as the action engine run loop/state-machine, it resumes
the workflow, schedules all task it can for execution using the runtimes
scheduler and analyzer components, and than waits on returned futures and
then activates the runtimes completion component to finish up those tasks
and so on...
NOTE(harlowja): If the runtimes scheduler component is able to schedule
tasks in parallel, this enables parallel running and/or reversion.
"""
# Informational states this action yields while running, not useful to
# have the engine record but useful to provide to end-users when doing
# execution iterations.
ignorable_states = (st.SCHEDULING, st.WAITING, st.RESUMING, st.ANALYZING)
def __init__(self, runtime, waiter):
self._builder = _MachineBuilder(runtime, waiter)
@property
def builder(self):
return self._builder
def runnable(self):
return self._builder.runnable()
def run_iter(self, timeout=None): def run_iter(self, timeout=None):
"""Runs the nodes using a built state machine.""" """Runs iteratively using a locally built state machine."""
machine, memory = self.builder.build(timeout=timeout) machine, memory = self.build(timeout=timeout)
for (_prior_state, new_state) in machine.run_iter(_START): for (_prior_state, new_state) in machine.run_iter(_START):
# NOTE(harlowja): skip over meta-states. # NOTE(harlowja): skip over meta-states.
if new_state not in _META_STATES: if new_state not in _META_STATES:

View File

@ -174,7 +174,7 @@ class RunnerTest(test.TestCase, _RunnerTestMixin):
rt.storage.get_atom_state(sad_tasks[0].name)) rt.storage.get_atom_state(sad_tasks[0].name))
class RunnerBuilderTest(test.TestCase, _RunnerTestMixin): class RunnerBuildTest(test.TestCase, _RunnerTestMixin):
def test_builder_manual_process(self): def test_builder_manual_process(self):
flow = lf.Flow("root") flow = lf.Flow("root")
tasks = test_utils.make_many( tasks = test_utils.make_many(
@ -182,8 +182,8 @@ class RunnerBuilderTest(test.TestCase, _RunnerTestMixin):
flow.add(*tasks) flow.add(*tasks)
rt = self._make_runtime(flow, initial_state=st.RUNNING) rt = self._make_runtime(flow, initial_state=st.RUNNING)
machine, memory = rt.runner.builder.build() machine, memory = rt.runner.build()
self.assertTrue(rt.runner.builder.runnable()) self.assertTrue(rt.runner.runnable())
self.assertRaises(fsm.NotInitialized, machine.process_event, 'poke') self.assertRaises(fsm.NotInitialized, machine.process_event, 'poke')
# Should now be pending... # Should now be pending...
@ -251,8 +251,8 @@ class RunnerBuilderTest(test.TestCase, _RunnerTestMixin):
flow.add(*tasks) flow.add(*tasks)
rt = self._make_runtime(flow, initial_state=st.RUNNING) rt = self._make_runtime(flow, initial_state=st.RUNNING)
machine, memory = rt.runner.builder.build() machine, memory = rt.runner.build()
self.assertTrue(rt.runner.builder.runnable()) self.assertTrue(rt.runner.runnable())
transitions = list(machine.run_iter('start')) transitions = list(machine.run_iter('start'))
self.assertEqual((runner._UNDEFINED, st.RESUMING), transitions[0]) self.assertEqual((runner._UNDEFINED, st.RESUMING), transitions[0])
@ -265,8 +265,8 @@ class RunnerBuilderTest(test.TestCase, _RunnerTestMixin):
flow.add(*tasks) flow.add(*tasks)
rt = self._make_runtime(flow, initial_state=st.RUNNING) rt = self._make_runtime(flow, initial_state=st.RUNNING)
machine, memory = rt.runner.builder.build() machine, memory = rt.runner.build()
self.assertTrue(rt.runner.builder.runnable()) self.assertTrue(rt.runner.runnable())
transitions = list(machine.run_iter('start')) transitions = list(machine.run_iter('start'))
self.assertEqual((runner._GAME_OVER, st.FAILURE), transitions[-1]) self.assertEqual((runner._GAME_OVER, st.FAILURE), transitions[-1])
@ -278,8 +278,8 @@ class RunnerBuilderTest(test.TestCase, _RunnerTestMixin):
flow.add(*tasks) flow.add(*tasks)
rt = self._make_runtime(flow, initial_state=st.RUNNING) rt = self._make_runtime(flow, initial_state=st.RUNNING)
machine, memory = rt.runner.builder.build() machine, memory = rt.runner.build()
self.assertTrue(rt.runner.builder.runnable()) self.assertTrue(rt.runner.runnable())
transitions = list(machine.run_iter('start')) transitions = list(machine.run_iter('start'))
self.assertEqual((runner._GAME_OVER, st.REVERTED), transitions[-1]) self.assertEqual((runner._GAME_OVER, st.REVERTED), transitions[-1])
@ -292,7 +292,7 @@ class RunnerBuilderTest(test.TestCase, _RunnerTestMixin):
flow.add(*tasks) flow.add(*tasks)
rt = self._make_runtime(flow, initial_state=st.RUNNING) rt = self._make_runtime(flow, initial_state=st.RUNNING)
machine, memory = rt.runner.builder.build() machine, memory = rt.runner.build()
transitions = list(machine.run_iter('start')) transitions = list(machine.run_iter('start'))
occurrences = dict((t, transitions.count(t)) for t in transitions) occurrences = dict((t, transitions.count(t)) for t in transitions)

View File

@ -134,7 +134,7 @@ def main():
elif options.engines: elif options.engines:
source_type = "Engines" source_type = "Engines"
r = runner.Runner(DummyRuntime(), None) r = runner.Runner(DummyRuntime(), None)
source, memory = r.builder.build() source, memory = r.build()
internal_states.extend(runner._META_STATES) internal_states.extend(runner._META_STATES)
ordering = 'out' ordering = 'out'
elif options.wbe_requests: elif options.wbe_requests: