diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py index f104dac2..39782766 100644 --- a/taskflow/engines/action_engine/engine.py +++ b/taskflow/engines/action_engine/engine.py @@ -97,12 +97,13 @@ class ActionEngine(base.EngineBase): @lock_utils.locked def run(self): """Runs the flow in the engine to completion.""" + if self.storage.get_flow_state() == states.REVERTED: + self._reset() self.compile() external_provides = set(self.storage.fetch_all().keys()) missing = self._flow.requires - external_provides if missing: raise exc.MissingDependencies(self._flow, sorted(missing)) - if self._failures: self._revert() else: @@ -152,6 +153,16 @@ class ActionEngine(base.EngineBase): result=result) self.task_notifier.notify(state, details) + def _reset(self): + for name, uuid in self.storage.reset_tasks(): + details = dict(engine=self, + task_name=name, + task_uuid=uuid, + result=None) + self.task_notifier.notify(states.PENDING, details) + self._failures = {} + self._change_state(states.PENDING) + @lock_utils.locked def compile(self): """Compiles the contained flow into a structure which the engine can diff --git a/taskflow/engines/action_engine/task_action.py b/taskflow/engines/action_engine/task_action.py index a866b327..e8c97045 100644 --- a/taskflow/engines/action_engine/task_action.py +++ b/taskflow/engines/action_engine/task_action.py @@ -26,7 +26,6 @@ from taskflow.utils import misc LOG = logging.getLogger(__name__) -RESET_TASK_STATES = (states.PENDING,) SAVE_RESULT_STATES = (states.SUCCESS, states.FAILURE) @@ -58,8 +57,6 @@ class TaskAction(base.Action): old_state = engine.storage.get_task_state(self.uuid) if not states.check_task_transition(old_state, state): return False - if state in RESET_TASK_STATES: - engine.storage.reset(self.uuid) if state in SAVE_RESULT_STATES: engine.storage.save(self.uuid, result, state) else: @@ -121,4 +118,3 @@ class TaskAction(base.Action): with excutils.save_and_reraise_exception(): self._change_state(engine, states.FAILURE) self._change_state_update_task(engine, states.REVERTED, 1.0) - self._change_state_update_task(engine, states.PENDING, 0.0) diff --git a/taskflow/states.py b/taskflow/states.py index dcb65165..20798eb2 100644 --- a/taskflow/states.py +++ b/taskflow/states.py @@ -67,7 +67,7 @@ _ALLOWED_FLOW_TRANSITIONS = frozenset(( (REVERTING, FAILURE), # revert failed (REVERTING, SUSPENDING), # engine.suspend was called - (REVERTED, RUNNING), # try again + (REVERTED, PENDING), # try again (SUSPENDING, SUSPENDED), # suspend finished (SUSPENDING, SUCCESS), # all tasks finished while we were waiting @@ -96,7 +96,6 @@ _ALLOWED_FLOW_TRANSITIONS = frozenset(( # successfully while we were waiting, flow can be transitioned from # SUSPENDING to SUCCESS state. - _IGNORED_FLOW_TRANSITIONS = frozenset( (a, b) for a in (PENDING, FAILURE, SUCCESS, SUSPENDED, REVERTED) diff --git a/taskflow/storage.py b/taskflow/storage.py index 5e659558..d72d661e 100644 --- a/taskflow/storage.py +++ b/taskflow/storage.py @@ -223,18 +223,43 @@ class Storage(object): def get(self, uuid): """Get result for task with id 'uuid' to storage""" td = self._taskdetail_by_uuid(uuid) - if td.state not in STATES_WITH_RESULTS: - raise exceptions.NotFound("Result for task %r is not known" % uuid) if td.failure: return td.failure + if td.state not in STATES_WITH_RESULTS: + raise exceptions.NotFound("Result for task %r is not known" % uuid) return td.results + def _reset_task(self, td, state): + if td.name == self.injector_name: + return False + if td.state == state: + return False + td.results = None + td.failure = None + td.state = state + return True + def reset(self, uuid, state=states.PENDING): """Remove result for task with id 'uuid' from storage""" td = self._taskdetail_by_uuid(uuid) - td.results = None - td.state = state - self._with_connection(self._save_task_detail, task_detail=td) + if self._reset_task(td, state): + self._with_connection(self._save_task_detail, task_detail=td) + + def reset_tasks(self): + """Reset all tasks to PENDING state, removing results. + + Returns list of (name, uuid) tuples for all tasks that were reset. + """ + result = [] + + def do_reset_all(connection): + for td in self._flowdetail: + if self._reset_task(td, states.PENDING): + self._save_task_detail(connection, td) + result.append((td.name, td.uuid)) + + self._with_connection(do_reset_all) + return result def inject(self, pairs): """Add values into storage diff --git a/taskflow/tests/unit/test_action_engine.py b/taskflow/tests/unit/test_action_engine.py index 52498741..42189fb4 100644 --- a/taskflow/tests/unit/test_action_engine.py +++ b/taskflow/tests/unit/test_action_engine.py @@ -74,20 +74,25 @@ class EngineTaskTest(utils.EngineTestBase): kwargs={'values': self.values}) engine.task_notifier.register('*', self._callback, kwargs={'values': self.values}) + expected = ['flow RUNNING', + 'fail RUNNING', + 'fail FAILURE', + 'flow FAILURE', + 'flow REVERTING', + 'fail REVERTING', + 'fail reverted(Failure: RuntimeError: Woot!)', + 'fail REVERTED', + 'flow REVERTED'] with self.assertRaisesRegexp(RuntimeError, '^Woot'): engine.run() - self.assertEquals( - self.values, - ['flow RUNNING', - 'fail RUNNING', - 'fail FAILURE', - 'flow FAILURE', - 'flow REVERTING', - 'fail REVERTING', - 'fail reverted(Failure: RuntimeError: Woot!)', - 'fail REVERTED', - 'fail PENDING', - 'flow REVERTED']) + self.assertEquals(self.values, expected) + self.assertEquals(engine.storage.get_flow_state(), states.REVERTED) + + with self.assertRaisesRegexp(RuntimeError, '^Woot'): + engine.run() + now_expected = expected + ['fail PENDING', 'flow PENDING'] + expected + self.assertEquals(self.values, now_expected) + self.assertEquals(engine.storage.get_flow_state(), states.REVERTED) def test_invalid_flow_raises(self): value = 'i am string, not task/flow, sorry' @@ -472,6 +477,7 @@ class EngineGraphFlowTest(utils.EngineTestBase): ['task1', 'task2', 'task3 reverted(Failure: RuntimeError: Woot!)', 'task2 reverted(5)', 'task1 reverted(5)']) + self.assertEquals(engine.storage.get_flow_state(), states.REVERTED) def test_graph_flow_four_tasks_revert_failure(self): flow = gf.Flow('g-3-nasty').add( @@ -482,6 +488,7 @@ class EngineGraphFlowTest(utils.EngineTestBase): engine = self._make_engine(flow) with self.assertRaisesRegexp(RuntimeError, '^Gotcha'): engine.run() + self.assertEquals(engine.storage.get_flow_state(), states.FAILURE) def test_graph_flow_with_multireturn_and_multiargs_tasks(self): flow = gf.Flow('g-3-multi').add( diff --git a/taskflow/tests/unit/test_storage.py b/taskflow/tests/unit/test_storage.py index f1fcefb1..b9760e25 100644 --- a/taskflow/tests/unit/test_storage.py +++ b/taskflow/tests/unit/test_storage.py @@ -95,6 +95,18 @@ class StorageTest(test.TestCase): self.assertEquals(s.get('42'), fail) self.assertEquals(s.get_task_state('42'), states.FAILURE) + def test_get_failure_from_reverted_task(self): + fail = misc.Failure(exc_info=(RuntimeError, RuntimeError(), None)) + s = self._get_storage() + s.add_task('42', 'my task') + s.save('42', fail, states.FAILURE) + + s.set_task_state('42', states.REVERTING) + self.assertEquals(s.get('42'), fail) + + s.set_task_state('42', states.REVERTED) + self.assertEquals(s.get('42'), fail) + def test_get_non_existing_var(self): s = self._get_storage() s.add_task('42', 'my task') @@ -115,6 +127,36 @@ class StorageTest(test.TestCase): s.add_task('42', 'my task') self.assertEquals(s.reset('42'), None) + def test_reset_tasks(self): + s = self._get_storage() + s.add_task('42', 'my task') + s.save('42', 5) + s.add_task('43', 'my other task') + s.save('43', 7) + + s.reset_tasks() + + self.assertEquals(s.get_task_state('42'), states.PENDING) + with self.assertRaises(exceptions.NotFound): + s.get('42') + self.assertEquals(s.get_task_state('43'), states.PENDING) + with self.assertRaises(exceptions.NotFound): + s.get('43') + + def test_reset_tasks_does_not_breaks_inject(self): + s = self._get_storage() + s.inject({'foo': 'bar', 'spam': 'eggs'}) + + # NOTE(imelnikov): injecting is implemented as special task + # so resetting tasks may break it if implemented incorrectly + s.reset_tasks() + + self.assertEquals(s.fetch('spam'), 'eggs') + self.assertEquals(s.fetch_all(), { + 'foo': 'bar', + 'spam': 'eggs', + }) + def test_fetch_by_name(self): s = self._get_storage() s.add_task('42', 'my task') @@ -161,6 +203,14 @@ class StorageTest(test.TestCase): 'details': {'test_data': 17} }) + def test_task_progress_erase(self): + s = self._get_storage() + s.add_task('42', 'my task') + + s.set_task_progress('42', 0.8, {}) + self.assertEquals(s.get_task_progress('42'), 0.8) + self.assertEquals(s.get_task_progress_details('42'), None) + def test_fetch_result_not_ready(self): s = self._get_storage() s.add_task('42', 'my task') diff --git a/taskflow/tests/unit/test_suspend_flow.py b/taskflow/tests/unit/test_suspend_flow.py index a4c8ed5f..8fcc059b 100644 --- a/taskflow/tests/unit/test_suspend_flow.py +++ b/taskflow/tests/unit/test_suspend_flow.py @@ -143,6 +143,35 @@ class SuspendFlowTest(utils.EngineTestBase): 'b reverted(5)', 'a reverted(5)']) + def test_suspend_and_resume_linear_flow_on_revert(self): + flow = lf.Flow('linear').add( + TestTask(self.values, 'a'), + AutoSuspendingTaskOnRevert(self.values, 'b'), + FailingTask(self.values, 'c') + ) + engine = self._make_engine(flow) + engine.storage.inject({'engine': engine}) + engine.run() + self.assertEquals(engine.storage.get_flow_state(), states.SUSPENDED) + self.assertEquals( + self.values, + ['a', 'b', + 'c reverted(Failure: RuntimeError: Woot!)', + 'b reverted(5)']) + + # pretend we are resuming + engine2 = self._make_engine(flow, engine.storage._flowdetail) + with self.assertRaisesRegexp(RuntimeError, '^Woot'): + engine2.run() + self.assertEquals(engine2.storage.get_flow_state(), states.REVERTED) + self.assertEquals( + self.values, + ['a', + 'b', + 'c reverted(Failure: RuntimeError: Woot!)', + 'b reverted(5)', + 'a reverted(5)']) + def test_storage_is_rechecked(self): flow = lf.Flow('linear').add( AutoSuspendingTask(self.values, 'b'),