Merge "Suspend single and multi threaded engines"

This commit is contained in:
Jenkins
2013-09-30 06:29:56 +00:00
committed by Gerrit Code Review
4 changed files with 141 additions and 21 deletions

View File

@@ -46,14 +46,17 @@ class ActionEngine(object):
self._root = None
self._flow = flow
self._lock = threading.RLock()
self._state_lock = threading.RLock()
self.notifier = misc.TransitionNotifier()
self.task_notifier = misc.TransitionNotifier()
self.storage = storage
def _revert(self, current_failure):
self._change_state(states.REVERTING)
self._root.revert(self)
self._change_state(states.REVERTED)
state = self._root.revert(self)
self._change_state(state)
if state == states.SUSPENDED:
return
self._change_state(states.FAILURE)
if self._failures:
if len(self._failures) == 1:
@@ -67,29 +70,43 @@ class ActionEngine(object):
def _reset(self):
self._failures = []
def suspend(self):
self._change_state(states.SUSPENDING)
def get_graph(self):
self.compile()
return self._root.graph
@decorators.locked
def run(self):
self.compile()
self._reset()
if self.storage.get_flow_state() != states.SUSPENDED:
self.compile()
self._reset()
external_provides = set(self.storage.fetch_all().keys())
missing = self._flow.requires - external_provides
if missing:
raise exc.MissingDependencies(self._flow, sorted(missing))
external_provides = set(self.storage.fetch_all().keys())
missing = self._flow.requires - external_provides
if missing:
raise exc.MissingDependencies(self._flow, sorted(missing))
self._run()
elif self._failures:
self._revert(self._failures[-1])
else:
self._run()
def _run(self):
self._change_state(states.RUNNING)
try:
self._root.execute(self)
state = self._root.execute(self)
except Exception:
self._revert(misc.Failure())
else:
self._change_state(states.SUCCESS)
self._change_state(state)
@decorators.locked(lock='_state_lock')
def _change_state(self, state):
if (state == states.SUSPENDING and not (self.is_running or
self.is_reverting)):
return
self.storage.set_flow_state(state)
details = dict(engine=self)
self.notifier.notify(state, details)
@@ -116,6 +133,14 @@ class ActionEngine(object):
if self._root is None:
self._root = self._translate_flow_to_action()
@property
def is_running(self):
return self.storage.get_flow_state() == states.RUNNING
@property
def is_reverting(self):
return self.storage.get_flow_state() == states.REVERTING
class SingleThreadedActionEngine(ActionEngine):
# This one attempts to run in a serial manner.

View File

@@ -24,6 +24,7 @@ from concurrent import futures
from taskflow.engines.action_engine import base_action as base
from taskflow import exceptions as exc
from taskflow import states as st
from taskflow.utils import misc
LOG = logging.getLogger(__name__)
@@ -78,27 +79,35 @@ class SequentialGraphAction(GraphAction):
deps_counter = self._get_nodes_dependencies_count()
to_execute = self._browse_nodes_to_execute(deps_counter)
while to_execute:
while to_execute and engine.is_running:
node = to_execute.pop()
action = self._action_mapping[node]
action.execute(engine) # raises on failure
to_execute += self._resolve_dependencies(node, deps_counter)
if to_execute:
return st.SUSPENDED
return st.SUCCESS
def revert(self, engine):
deps_counter = self._get_nodes_dependencies_count(True)
to_revert = self._browse_nodes_to_execute(deps_counter)
while to_revert:
while to_revert and engine.is_reverting:
node = to_revert.pop()
action = self._action_mapping[node]
action.revert(engine) # raises on failure
to_revert += self._resolve_dependencies(node, deps_counter, True)
if to_revert:
return st.SUSPENDED
return st.REVERTED
class ParallelGraphAction(SequentialGraphAction):
def execute(self, engine):
"""This action executes the provided graph in parallel by selecting
nodes which can run (those which have there dependencies satisified
nodes which can run (those which have there dependencies satisfied
or those with no dependencies) and submitting them to the executor
to be ran, and then after running this process will be repeated until
no more nodes can be ran (or a failure has a occured and all nodes
@@ -110,6 +119,7 @@ class ParallelGraphAction(SequentialGraphAction):
has_failed = threading.Event()
deps_lock = threading.RLock()
deps_counter = self._get_nodes_dependencies_count()
self._future_flow_state = st.SUCCESS
def submit_followups(node):
# Mutating the deps_counter isn't thread safe.
@@ -133,7 +143,11 @@ class ParallelGraphAction(SequentialGraphAction):
return
action = self._action_mapping[node]
try:
action.execute(engine)
if engine.is_running:
action.execute(engine)
else:
self._future_flow_state = st.SUSPENDED
return
except Exception:
# Make sure others don't continue working (although they may
# be already actively working, but u can't stop that anyway).
@@ -189,3 +203,5 @@ class ParallelGraphAction(SequentialGraphAction):
for fail in failures])
elif len(failures) == 1:
failures[0].reraise()
return self._future_flow_state

View File

@@ -36,6 +36,8 @@ STARTED = 'STARTED'
SUCCESS = SUCCESS
CANCELLED = 'CANCELLED'
INCOMPLETE = 'INCOMPLETE'
SUSPENDING = 'SUSPENDING'
SUSPENDED = 'SUSPENDED'
# Task states.
FAILURE = FAILURE

View File

@@ -27,7 +27,7 @@ from taskflow.patterns import linear_flow as lf
from taskflow.patterns import unordered_flow as uf
from taskflow.engines.action_engine import engine as eng
from taskflow import exceptions
from taskflow import exceptions as exc
from taskflow.persistence.backends import impl_memory
from taskflow.persistence import logbook
from taskflow import states
@@ -113,6 +113,27 @@ class MultiDictTask(task.Task):
return output
class AutoSuspendingTask(TestTask):
def execute(self, engine):
result = super(AutoSuspendingTask, self).execute()
engine.suspend()
return result
def revert(self, egnine, result):
super(AutoSuspendingTask, self).revert(**{'result': result})
class AutoSuspendingTaskOnRevert(TestTask):
def execute(self, engine):
return super(AutoSuspendingTaskOnRevert, self).execute()
def revert(self, engine, result):
super(AutoSuspendingTaskOnRevert, self).revert(**{'result': result})
engine.suspend()
class EngineTestBase(object):
def setUp(self):
super(EngineTestBase, self).setUp()
@@ -251,7 +272,7 @@ class EngineTaskTest(EngineTestBase):
flow = MultiargsTask(provides='result')
engine = self._make_engine(flow)
engine.storage.inject({'a': 1, 'b': 4, 'x': 17})
with self.assertRaises(exceptions.MissingDependencies):
with self.assertRaises(exc.MissingDependencies):
engine.run()
def test_partial_arguments_mapping(self):
@@ -285,7 +306,7 @@ class EngineTaskTest(EngineTestBase):
rebind={'b': 'z'})
engine = self._make_engine(flow)
engine.storage.inject({'a': 1, 'b': 4, 'c': 9, 'x': 17})
with self.assertRaises(exceptions.MissingDependencies):
with self.assertRaises(exc.MissingDependencies):
engine.run()
def test_invalid_argument_name_list(self):
@@ -294,7 +315,7 @@ class EngineTaskTest(EngineTestBase):
rebind=['a', 'z', 'b'])
engine = self._make_engine(flow)
engine.storage.inject({'a': 1, 'b': 4, 'c': 9, 'x': 17})
with self.assertRaises(exceptions.MissingDependencies):
with self.assertRaises(exc.MissingDependencies):
engine.run()
def test_bad_rebind_args_value(self):
@@ -487,14 +508,14 @@ class EngineGraphFlowTest(EngineTestBase):
self.assertEquals(self.values, ['task1', 'task2', 'task3', 'task4'])
def test_graph_cyclic_dependency(self):
with self.assertRaisesRegexp(exceptions.DependencyFailure, '^No path'):
with self.assertRaisesRegexp(exc.DependencyFailure, '^No path'):
gf.Flow('g-3-cyclic').add(
TestTask([], name='task1', provides='a', requires=['b']),
TestTask([], name='task2', provides='b', requires=['c']),
TestTask([], name='task3', provides='c', requires=['a']))
def test_graph_two_tasks_returns_same_value(self):
with self.assertRaisesRegexp(exceptions.DependencyFailure,
with self.assertRaisesRegexp(exc.DependencyFailure,
"task2 provides a but is already being"
" provided by task1 and duplicate"
" producers are disallowed"):
@@ -547,7 +568,7 @@ class EngineGraphFlowTest(EngineTestBase):
})
def test_one_task_provides_and_requires_same_data(self):
with self.assertRaisesRegexp(exceptions.DependencyFailure, '^No path'):
with self.assertRaisesRegexp(exc.DependencyFailure, '^No path'):
gf.Flow('g-1-req-error').add(
TestTask([], name='task1', requires=['a'], provides='a'))
@@ -568,10 +589,65 @@ class EngineGraphFlowTest(EngineTestBase):
self.assertTrue(isinstance(graph, networkx.DiGraph))
class SuspendFlowTest(EngineTestBase):
def test_suspend_one_task(self):
flow = AutoSuspendingTask(self.values, 'a')
engine = self._make_engine(flow)
engine.storage.inject({'engine': engine})
engine.run()
self.assertEquals(engine.storage.get_flow_state(), states.SUCCESS)
self.assertEquals(self.values, ['a'])
engine.run()
self.assertEquals(engine.storage.get_flow_state(), states.SUCCESS)
self.assertEquals(self.values, ['a'])
def test_suspend_linear_flow(self):
flow = lf.Flow('linear').add(
TestTask(self.values, 'a'),
AutoSuspendingTask(self.values, 'b'),
TestTask(self.values, 'c')
)
engine = self._make_engine(flow)
engine.storage.inject({'engine': engine})
engine.run()
self.assertEquals(engine.storage.get_flow_state(), states.SUSPENDED)
self.assertEquals(self.values, ['a', 'b'])
engine.run()
self.assertEquals(engine.storage.get_flow_state(), states.SUCCESS)
self.assertEquals(self.values, ['a', 'b', 'c'])
def test_suspend_linear_flow_on_revert(self):
flow = lf.Flow('linear').add(
TestTask(self.values, 'a'),
AutoSuspendingTaskOnRevert(self.values, 'b'),
FailingTask(self.values, 'c')
)
engine = self._make_engine(flow)
engine.storage.inject({'engine': engine})
engine.run()
self.assertEquals(engine.storage.get_flow_state(), states.SUSPENDED)
self.assertEquals(self.values,
['a',
'b',
'c reverted(Failure: RuntimeError: Woot!)',
'b reverted(5)'])
with self.assertRaisesRegexp(RuntimeError, '^Woot'):
engine.run()
self.assertEquals(engine.storage.get_flow_state(), states.FAILURE)
self.assertEquals(self.values,
['a',
'b',
'c reverted(Failure: RuntimeError: Woot!)',
'b reverted(5)',
'a reverted(5)'])
class SingleThreadedEngineTest(EngineTaskTest,
EngineLinearFlowTest,
EngineParallelFlowTest,
EngineGraphFlowTest,
SuspendFlowTest,
test.TestCase):
def _make_engine(self, flow, flow_detail=None):
if flow_detail is None:
@@ -585,6 +661,7 @@ class MultiThreadedEngineTest(EngineTaskTest,
EngineLinearFlowTest,
EngineParallelFlowTest,
EngineGraphFlowTest,
SuspendFlowTest,
test.TestCase):
def _make_engine(self, flow, flow_detail=None, executor=None):
if flow_detail is None: