Merge "Refactor machine builder + runner into single unit"

This commit is contained in:
Jenkins 2015-06-20 05:36:11 +00:00 committed by Gerrit Code Review
commit ffce3b6b02
3 changed files with 48 additions and 64 deletions

View File

@ -51,38 +51,48 @@ class _MachineMemory(object):
self.done = set() self.done = set()
class _MachineBuilder(object): class Runner(object):
"""State machine *builder* that the runner uses. """State machine *builder* + *runner* that powers the engine components.
NOTE(harlowja): the machine states that this build will for are:: NOTE(harlowja): the machine (states and events that will trigger
transitions) that this builds is represented by the following
table::
+--------------+------------------+------------+----------+---------+ +--------------+------------------+------------+----------+---------+
Start | Event | End | On Enter | On Exit Start | Event | End | On Enter | On Exit
+--------------+------------------+------------+----------+---------+ +--------------+------------------+------------+----------+---------+
ANALYZING | completed | GAME_OVER | | ANALYZING | completed | GAME_OVER | |
ANALYZING | schedule_next | SCHEDULING | | ANALYZING | schedule_next | SCHEDULING | |
ANALYZING | wait_finished | WAITING | | ANALYZING | wait_finished | WAITING | |
FAILURE[$] | | | | FAILURE[$] | | | |
GAME_OVER | failed | FAILURE | | GAME_OVER | failed | FAILURE | |
GAME_OVER | reverted | REVERTED | | GAME_OVER | reverted | REVERTED | |
GAME_OVER | success | SUCCESS | | GAME_OVER | success | SUCCESS | |
GAME_OVER | suspended | SUSPENDED | | GAME_OVER | suspended | SUSPENDED | |
RESUMING | schedule_next | SCHEDULING | | RESUMING | schedule_next | SCHEDULING | |
REVERTED[$] | | | | REVERTED[$] | | | |
SCHEDULING | wait_finished | WAITING | | SCHEDULING | wait_finished | WAITING | |
SUCCESS[$] | | | | SUCCESS[$] | | | |
SUSPENDED[$] | | | | SUSPENDED[$] | | | |
UNDEFINED[^] | start | RESUMING | | UNDEFINED[^] | start | RESUMING | |
WAITING | examine_finished | ANALYZING | | WAITING | examine_finished | ANALYZING | |
+--------------+------------------+------------+----------+---------+ +--------------+------------------+------------+----------+---------+
Between any of these yielded states (minus ``GAME_OVER`` and ``UNDEFINED``) Between any of these yielded states (minus ``GAME_OVER`` and ``UNDEFINED``)
if the engine has been suspended or the engine has failed (due to a if the engine has been suspended or the engine has failed (due to a
non-resolveable task failure or scheduling failure) the machine will stop non-resolveable task failure or scheduling failure) the machine will stop
executing new tasks (currently running tasks will be allowed to complete) executing new tasks (currently running tasks will be allowed to complete)
and this machines run loop will be broken. and this machines run loop will be broken.
NOTE(harlowja): If the runtimes scheduler component is able to schedule
tasks in parallel, this enables parallel running and/or reversion.
""" """
# Informational states this action yields while running, not useful to
# have the engine record but useful to provide to end-users when doing
# execution iterations.
ignorable_states = (st.SCHEDULING, st.WAITING, st.RESUMING, st.ANALYZING)
def __init__(self, runtime, waiter): def __init__(self, runtime, waiter):
self._analyzer = runtime.analyzer self._analyzer = runtime.analyzer
self._completer = runtime.completer self._completer = runtime.completer
@ -91,9 +101,12 @@ class _MachineBuilder(object):
self._waiter = waiter self._waiter = waiter
def runnable(self): def runnable(self):
"""Checks if the storage says the flow is still runnable/running."""
return self._storage.get_flow_state() == st.RUNNING return self._storage.get_flow_state() == st.RUNNING
def build(self, timeout=None): def build(self, timeout=None):
"""Builds a state-machine (that can be/is used during running)."""
memory = _MachineMemory() memory = _MachineMemory()
if timeout is None: if timeout is None:
timeout = _WAITING_TIMEOUT timeout = _WAITING_TIMEOUT
@ -244,38 +257,9 @@ class _MachineBuilder(object):
m.freeze() m.freeze()
return (m, memory) return (m, memory)
class Runner(object):
"""Runner that iterates while executing nodes using the given runtime.
This runner acts as the action engine run loop/state-machine, it resumes
the workflow, schedules all task it can for execution using the runtimes
scheduler and analyzer components, and than waits on returned futures and
then activates the runtimes completion component to finish up those tasks
and so on...
NOTE(harlowja): If the runtimes scheduler component is able to schedule
tasks in parallel, this enables parallel running and/or reversion.
"""
# Informational states this action yields while running, not useful to
# have the engine record but useful to provide to end-users when doing
# execution iterations.
ignorable_states = (st.SCHEDULING, st.WAITING, st.RESUMING, st.ANALYZING)
def __init__(self, runtime, waiter):
self._builder = _MachineBuilder(runtime, waiter)
@property
def builder(self):
return self._builder
def runnable(self):
return self._builder.runnable()
def run_iter(self, timeout=None): def run_iter(self, timeout=None):
"""Runs the nodes using a built state machine.""" """Runs iteratively using a locally built state machine."""
machine, memory = self.builder.build(timeout=timeout) machine, memory = self.build(timeout=timeout)
for (_prior_state, new_state) in machine.run_iter(_START): for (_prior_state, new_state) in machine.run_iter(_START):
# NOTE(harlowja): skip over meta-states. # NOTE(harlowja): skip over meta-states.
if new_state not in _META_STATES: if new_state not in _META_STATES:

View File

@ -176,7 +176,7 @@ class RunnerTest(test.TestCase, _RunnerTestMixin):
rt.storage.get_atom_state(sad_tasks[0].name)) rt.storage.get_atom_state(sad_tasks[0].name))
class RunnerBuilderTest(test.TestCase, _RunnerTestMixin): class RunnerBuildTest(test.TestCase, _RunnerTestMixin):
def test_builder_manual_process(self): def test_builder_manual_process(self):
flow = lf.Flow("root") flow = lf.Flow("root")
tasks = test_utils.make_many( tasks = test_utils.make_many(
@ -184,8 +184,8 @@ class RunnerBuilderTest(test.TestCase, _RunnerTestMixin):
flow.add(*tasks) flow.add(*tasks)
rt = self._make_runtime(flow, initial_state=st.RUNNING) rt = self._make_runtime(flow, initial_state=st.RUNNING)
machine, memory = rt.runner.builder.build() machine, memory = rt.runner.build()
self.assertTrue(rt.runner.builder.runnable()) self.assertTrue(rt.runner.runnable())
self.assertRaises(fsm.NotInitialized, machine.process_event, 'poke') self.assertRaises(fsm.NotInitialized, machine.process_event, 'poke')
# Should now be pending... # Should now be pending...
@ -253,8 +253,8 @@ class RunnerBuilderTest(test.TestCase, _RunnerTestMixin):
flow.add(*tasks) flow.add(*tasks)
rt = self._make_runtime(flow, initial_state=st.RUNNING) rt = self._make_runtime(flow, initial_state=st.RUNNING)
machine, memory = rt.runner.builder.build() machine, memory = rt.runner.build()
self.assertTrue(rt.runner.builder.runnable()) self.assertTrue(rt.runner.runnable())
transitions = list(machine.run_iter('start')) transitions = list(machine.run_iter('start'))
self.assertEqual((runner._UNDEFINED, st.RESUMING), transitions[0]) self.assertEqual((runner._UNDEFINED, st.RESUMING), transitions[0])
@ -267,8 +267,8 @@ class RunnerBuilderTest(test.TestCase, _RunnerTestMixin):
flow.add(*tasks) flow.add(*tasks)
rt = self._make_runtime(flow, initial_state=st.RUNNING) rt = self._make_runtime(flow, initial_state=st.RUNNING)
machine, memory = rt.runner.builder.build() machine, memory = rt.runner.build()
self.assertTrue(rt.runner.builder.runnable()) self.assertTrue(rt.runner.runnable())
transitions = list(machine.run_iter('start')) transitions = list(machine.run_iter('start'))
self.assertEqual((runner._GAME_OVER, st.FAILURE), transitions[-1]) self.assertEqual((runner._GAME_OVER, st.FAILURE), transitions[-1])
@ -280,8 +280,8 @@ class RunnerBuilderTest(test.TestCase, _RunnerTestMixin):
flow.add(*tasks) flow.add(*tasks)
rt = self._make_runtime(flow, initial_state=st.RUNNING) rt = self._make_runtime(flow, initial_state=st.RUNNING)
machine, memory = rt.runner.builder.build() machine, memory = rt.runner.build()
self.assertTrue(rt.runner.builder.runnable()) self.assertTrue(rt.runner.runnable())
transitions = list(machine.run_iter('start')) transitions = list(machine.run_iter('start'))
self.assertEqual((runner._GAME_OVER, st.REVERTED), transitions[-1]) self.assertEqual((runner._GAME_OVER, st.REVERTED), transitions[-1])
@ -294,7 +294,7 @@ class RunnerBuilderTest(test.TestCase, _RunnerTestMixin):
flow.add(*tasks) flow.add(*tasks)
rt = self._make_runtime(flow, initial_state=st.RUNNING) rt = self._make_runtime(flow, initial_state=st.RUNNING)
machine, memory = rt.runner.builder.build() machine, memory = rt.runner.build()
transitions = list(machine.run_iter('start')) transitions = list(machine.run_iter('start'))
occurrences = dict((t, transitions.count(t)) for t in transitions) occurrences = dict((t, transitions.count(t)) for t in transitions)

View File

@ -131,7 +131,7 @@ def main():
elif options.engines: elif options.engines:
source_type = "Engines" source_type = "Engines"
r = runner.Runner(DummyRuntime(), None) r = runner.Runner(DummyRuntime(), None)
source, memory = r.builder.build() source, memory = r.build()
internal_states.extend(runner._META_STATES) internal_states.extend(runner._META_STATES)
ordering = 'out' ordering = 'out'
elif options.wbe_requests: elif options.wbe_requests: