Merge "Add a engine preparation stage"

This commit is contained in:
Jenkins
2014-04-01 09:50:57 +00:00
committed by Gerrit Code Review
4 changed files with 40 additions and 15 deletions

View File

@@ -64,6 +64,7 @@ class ActionEngine(base.EngineBase):
self._task_executor = None self._task_executor = None
self._task_action = None self._task_action = None
self._retry_action = None self._retry_action = None
self._storage_ensured = False
def __str__(self): def __str__(self):
return "%s: %s" % (reflection.get_class_name(self), id(self)) return "%s: %s" % (reflection.get_class_name(self), id(self))
@@ -89,15 +90,7 @@ class ActionEngine(base.EngineBase):
def run(self): def run(self):
"""Runs the flow in the engine to completion.""" """Runs the flow in the engine to completion."""
self.compile() self.compile()
external_provides = set(self.storage.fetch_all().keys()) self.prepare()
missing = self._flow.requires - external_provides
if missing:
raise exc.MissingDependencies(self._flow, sorted(missing))
if self.storage.get_flow_state() == states.REVERTED:
self._root.reset_all()
self._change_state(states.PENDING)
self._task_executor.start() self._task_executor.start()
try: try:
self._run() self._run()
@@ -151,6 +144,27 @@ class ActionEngine(base.EngineBase):
self.storage.ensure_task(node.name, version, node.save_as) self.storage.ensure_task(node.name, version, node.save_as)
self._change_state(states.SUSPENDED) # does nothing in PENDING state self._change_state(states.SUSPENDED) # does nothing in PENDING state
@lock_utils.locked
def prepare(self):
if not self._compiled:
raise exc.InvalidState("Can not prepare an engine"
" which has not been compiled")
if not self._storage_ensured:
self._ensure_storage_for(self.execution_graph)
self._storage_ensured = True
# At this point we can check to ensure all dependencies are either
# flow/task provided or storage provided, if there are still missing
# dependencies then this flow will fail at runtime (which we can avoid
# by failing at preparation time).
external_provides = set(self.storage.fetch_all().keys())
missing = self._flow.requires - external_provides
if missing:
raise exc.MissingDependencies(self._flow, sorted(missing))
# Reset everything back to pending (if we were previously reverted).
if self.storage.get_flow_state() == states.REVERTED:
self._root.reset_all()
self._change_state(states.PENDING)
@lock_utils.locked @lock_utils.locked
def compile(self): def compile(self):
if self._compiled: if self._compiled:
@@ -173,13 +187,8 @@ class ActionEngine(base.EngineBase):
self.storage, self.storage,
self._task_action, self._task_action,
self._retry_action) self._retry_action)
# NOTE(harlowja): Perform initial state manipulation and setup.
#
# TODO(harlowja): This doesn't seem like it should be in a compilation
# function since compilation seems like it should not modify any
# external state.
self._ensure_storage_for(execution_graph)
self._compiled = True self._compiled = True
return
class SingleThreadedActionEngine(ActionEngine): class SingleThreadedActionEngine(ActionEngine):

View File

@@ -56,6 +56,16 @@ class EngineBase(object):
indicating why this compilation could not be achieved. indicating why this compilation could not be achieved.
""" """
@abc.abstractmethod
def prepare(self):
"""Performs any pre-run, but post-compilation actions.
NOTE(harlowja): During preparation it is currently assumed that the
underlying storage will be initialized, all final dependencies
will be verified, the tasks will be reset and the engine will enter
the PENDING state.
"""
@abc.abstractmethod @abc.abstractmethod
def run(self): def run(self):
"""Runs the flow in the engine to completion (or die trying).""" """Runs the flow in the engine to completion (or die trying)."""

View File

@@ -49,6 +49,7 @@ class TestProgress(test.TestCase):
flow_detail=flow_detail, flow_detail=flow_detail,
backend=backend) backend=backend)
e.compile() e.compile()
e.prepare()
return e return e
def tearDown(self): def tearDown(self):

View File

@@ -394,6 +394,7 @@ class RetryTest(utils.EngineTestBase):
) )
engine = self._make_engine(flow) engine = self._make_engine(flow)
engine.compile() engine.compile()
engine.prepare()
utils.register_notifiers(engine, self.values) utils.register_notifiers(engine, self.values)
engine.storage.set_atom_state('r1', st.RETRYING) engine.storage.set_atom_state('r1', st.RETRYING)
engine.storage.set_atom_state('t1', st.PENDING) engine.storage.set_atom_state('t1', st.PENDING)
@@ -425,6 +426,7 @@ class RetryTest(utils.EngineTestBase):
) )
engine = self._make_engine(flow) engine = self._make_engine(flow)
engine.compile() engine.compile()
engine.prepare()
utils.register_notifiers(engine, self.values) utils.register_notifiers(engine, self.values)
engine.storage.set_atom_intention('r1', st.RETRY) engine.storage.set_atom_intention('r1', st.RETRY)
engine.storage.set_atom_state('r1', st.SUCCESS) engine.storage.set_atom_state('r1', st.SUCCESS)
@@ -551,6 +553,7 @@ class RetryTest(utils.EngineTestBase):
utils.SaveOrderTask('task1')) utils.SaveOrderTask('task1'))
engine = self._make_engine(flow) engine = self._make_engine(flow)
engine.compile() engine.compile()
engine.prepare()
# imagine we run engine # imagine we run engine
engine.storage.set_flow_state(st.RUNNING) engine.storage.set_flow_state(st.RUNNING)
engine.storage.set_atom_intention('flow-1_retry', st.EXECUTE) engine.storage.set_atom_intention('flow-1_retry', st.EXECUTE)
@@ -662,6 +665,7 @@ class RetryTest(utils.EngineTestBase):
utils.FailingTask('c'))) utils.FailingTask('c')))
engine = self._make_engine(flow) engine = self._make_engine(flow)
engine.compile() engine.compile()
engine.prepare()
engine.storage.save('test2_retry', 1) engine.storage.save('test2_retry', 1)
engine.storage.save('b', 11) engine.storage.save('b', 11)
engine.storage.save('a', 10) engine.storage.save('a', 10)
@@ -687,6 +691,7 @@ class RetryTest(utils.EngineTestBase):
utils.SaveOrderTask('c'))) utils.SaveOrderTask('c')))
engine = self._make_engine(flow) engine = self._make_engine(flow)
engine.compile() engine.compile()
engine.prepare()
engine.storage.save('test2_retry', 1) engine.storage.save('test2_retry', 1)
engine.storage.save('b', 11) engine.storage.save('b', 11)
# pretend that 'c' failed # pretend that 'c' failed