Add a engine preparation stage

Move the final logic of the compilation stage (and pre-run) code
into a new engine stage that is devoted to performing post-compile
but pre-run actions. Do the final validation that was being done
in compile() and run() in this stage instead of being split across
those two stages.

Breaking change: any user expecting storage to be ensured after
compilation will have to adjust their code to call prepare (after
which they can continue to expect storage to be ready).

Change-Id: Ie97bc4a795266dda9d7ae8b395bcaadcd1ada737
This commit is contained in:
Joshua Harlow
2014-03-29 17:36:32 -07:00
parent 5048dfc2a9
commit 1d983f5c31
4 changed files with 40 additions and 15 deletions

View File

@@ -64,6 +64,7 @@ class ActionEngine(base.EngineBase):
self._task_executor = None
self._task_action = None
self._retry_action = None
self._storage_ensured = False
def __str__(self):
return "%s: %s" % (reflection.get_class_name(self), id(self))
@@ -83,15 +84,7 @@ class ActionEngine(base.EngineBase):
def run(self):
"""Runs the flow in the engine to completion."""
self.compile()
external_provides = set(self.storage.fetch_all().keys())
missing = self._flow.requires - external_provides
if missing:
raise exc.MissingDependencies(self._flow, sorted(missing))
if self.storage.get_flow_state() == states.REVERTED:
self._root.reset_all()
self._change_state(states.PENDING)
self.prepare()
self._task_executor.start()
try:
self._run()
@@ -145,6 +138,27 @@ class ActionEngine(base.EngineBase):
self.storage.ensure_task(node.name, version, node.save_as)
self._change_state(states.SUSPENDED) # does nothing in PENDING state
@lock_utils.locked
def prepare(self):
if not self._compiled:
raise exc.InvalidState("Can not prepare an engine"
" which has not been compiled")
if not self._storage_ensured:
self._ensure_storage_for(self.execution_graph)
self._storage_ensured = True
# At this point we can check to ensure all dependencies are either
# flow/task provided or storage provided, if there are still missing
# dependencies then this flow will fail at runtime (which we can avoid
# by failing at preparation time).
external_provides = set(self.storage.fetch_all().keys())
missing = self._flow.requires - external_provides
if missing:
raise exc.MissingDependencies(self._flow, sorted(missing))
# Reset everything back to pending (if we were previously reverted).
if self.storage.get_flow_state() == states.REVERTED:
self._root.reset_all()
self._change_state(states.PENDING)
@lock_utils.locked
def compile(self):
if self._compiled:
@@ -167,13 +181,8 @@ class ActionEngine(base.EngineBase):
self.storage,
self._task_action,
self._retry_action)
# NOTE(harlowja): Perform initial state manipulation and setup.
#
# TODO(harlowja): This doesn't seem like it should be in a compilation
# function since compilation seems like it should not modify any
# external state.
self._ensure_storage_for(execution_graph)
self._compiled = True
return
class SingleThreadedActionEngine(ActionEngine):

View File

@@ -56,6 +56,16 @@ class EngineBase(object):
indicating why this compilation could not be achieved.
"""
@abc.abstractmethod
def prepare(self):
"""Performs any pre-run, but post-compilation actions.
NOTE(harlowja): During preparation it is currently assumed that the
underlying storage will be initialized, all final dependencies
will be verified, the tasks will be reset and the engine will enter
the PENDING state.
"""
@abc.abstractmethod
def run(self):
"""Runs the flow in the engine to completion (or die trying)."""

View File

@@ -49,6 +49,7 @@ class TestProgress(test.TestCase):
flow_detail=flow_detail,
backend=backend)
e.compile()
e.prepare()
return e
def tearDown(self):

View File

@@ -394,6 +394,7 @@ class RetryTest(utils.EngineTestBase):
)
engine = self._make_engine(flow)
engine.compile()
engine.prepare()
utils.register_notifiers(engine, self.values)
engine.storage.set_atom_state('r1', st.RETRYING)
engine.storage.set_atom_state('t1', st.PENDING)
@@ -425,6 +426,7 @@ class RetryTest(utils.EngineTestBase):
)
engine = self._make_engine(flow)
engine.compile()
engine.prepare()
utils.register_notifiers(engine, self.values)
engine.storage.set_atom_intention('r1', st.RETRY)
engine.storage.set_atom_state('r1', st.SUCCESS)
@@ -551,6 +553,7 @@ class RetryTest(utils.EngineTestBase):
utils.SaveOrderTask('task1'))
engine = self._make_engine(flow)
engine.compile()
engine.prepare()
# imagine we run engine
engine.storage.set_flow_state(st.RUNNING)
engine.storage.set_atom_intention('flow-1_retry', st.EXECUTE)
@@ -662,6 +665,7 @@ class RetryTest(utils.EngineTestBase):
utils.FailingTask('c')))
engine = self._make_engine(flow)
engine.compile()
engine.prepare()
engine.storage.save('test2_retry', 1)
engine.storage.save('b', 11)
engine.storage.save('a', 10)
@@ -687,6 +691,7 @@ class RetryTest(utils.EngineTestBase):
utils.SaveOrderTask('c')))
engine = self._make_engine(flow)
engine.compile()
engine.prepare()
engine.storage.save('test2_retry', 1)
engine.storage.save('b', 11)
# pretend that 'c' failed