From 31d25f25b8af2b07f1f41e98bcfb83c89b88dfd9 Mon Sep 17 00:00:00 2001 From: "Ivan A. Melnikov" Date: Wed, 30 Oct 2013 17:46:28 +0400 Subject: [PATCH] Don't reset tasks to PENDING state while reverting While flow is in REVERTING or REVERTED state, reverted tasks are now left in REVERTED state. They are then reset to PENDING state when flow is run again. With this change: - when task is in REVERTED state, its result (if task were successfully executed before) is removed, but its failure (if its execution or reversion failed) is still available from storage; - when flow is in REVERTED or FAILURE states, you can get all task failures from storage; - if flow is in REVERTED or REVERTING state, but task is in PENDING state, it means this task was never executed; - flow cannot transition from REVERTED to RUNNING state any more -- it should go through PENDING state; on REVERTED -> PENDING transition tasks of the flow are reset. This ensures that failure information is available until it becomes irrelevant. Closes-bug: #1246612 Change-Id: I27c3891f5d412d6de6240638b5014afda94a58a4 --- taskflow/engines/action_engine/engine.py | 13 ++++- taskflow/engines/action_engine/task_action.py | 4 -- taskflow/states.py | 3 +- taskflow/storage.py | 35 +++++++++++-- taskflow/tests/unit/test_action_engine.py | 31 +++++++----- taskflow/tests/unit/test_storage.py | 50 +++++++++++++++++++ taskflow/tests/unit/test_suspend_flow.py | 29 +++++++++++ 7 files changed, 141 insertions(+), 24 deletions(-) diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py index f104dac2..39782766 100644 --- a/taskflow/engines/action_engine/engine.py +++ b/taskflow/engines/action_engine/engine.py @@ -97,12 +97,13 @@ class ActionEngine(base.EngineBase): @lock_utils.locked def run(self): """Runs the flow in the engine to completion.""" + if self.storage.get_flow_state() == states.REVERTED: + self._reset() self.compile() external_provides = set(self.storage.fetch_all().keys()) missing = self._flow.requires - external_provides if missing: raise exc.MissingDependencies(self._flow, sorted(missing)) - if self._failures: self._revert() else: @@ -152,6 +153,16 @@ class ActionEngine(base.EngineBase): result=result) self.task_notifier.notify(state, details) + def _reset(self): + for name, uuid in self.storage.reset_tasks(): + details = dict(engine=self, + task_name=name, + task_uuid=uuid, + result=None) + self.task_notifier.notify(states.PENDING, details) + self._failures = {} + self._change_state(states.PENDING) + @lock_utils.locked def compile(self): """Compiles the contained flow into a structure which the engine can diff --git a/taskflow/engines/action_engine/task_action.py b/taskflow/engines/action_engine/task_action.py index a866b327..e8c97045 100644 --- a/taskflow/engines/action_engine/task_action.py +++ b/taskflow/engines/action_engine/task_action.py @@ -26,7 +26,6 @@ from taskflow.utils import misc LOG = logging.getLogger(__name__) -RESET_TASK_STATES = (states.PENDING,) SAVE_RESULT_STATES = (states.SUCCESS, states.FAILURE) @@ -58,8 +57,6 @@ class TaskAction(base.Action): old_state = engine.storage.get_task_state(self.uuid) if not states.check_task_transition(old_state, state): return False - if state in RESET_TASK_STATES: - engine.storage.reset(self.uuid) if state in SAVE_RESULT_STATES: engine.storage.save(self.uuid, result, state) else: @@ -121,4 +118,3 @@ class TaskAction(base.Action): with excutils.save_and_reraise_exception(): self._change_state(engine, states.FAILURE) self._change_state_update_task(engine, states.REVERTED, 1.0) - self._change_state_update_task(engine, states.PENDING, 0.0) diff --git a/taskflow/states.py b/taskflow/states.py index dcb65165..20798eb2 100644 --- a/taskflow/states.py +++ b/taskflow/states.py @@ -67,7 +67,7 @@ _ALLOWED_FLOW_TRANSITIONS = frozenset(( (REVERTING, FAILURE), # revert failed (REVERTING, SUSPENDING), # engine.suspend was called - (REVERTED, RUNNING), # try again + (REVERTED, PENDING), # try again (SUSPENDING, SUSPENDED), # suspend finished (SUSPENDING, SUCCESS), # all tasks finished while we were waiting @@ -96,7 +96,6 @@ _ALLOWED_FLOW_TRANSITIONS = frozenset(( # successfully while we were waiting, flow can be transitioned from # SUSPENDING to SUCCESS state. - _IGNORED_FLOW_TRANSITIONS = frozenset( (a, b) for a in (PENDING, FAILURE, SUCCESS, SUSPENDED, REVERTED) diff --git a/taskflow/storage.py b/taskflow/storage.py index 5e659558..d72d661e 100644 --- a/taskflow/storage.py +++ b/taskflow/storage.py @@ -223,18 +223,43 @@ class Storage(object): def get(self, uuid): """Get result for task with id 'uuid' to storage""" td = self._taskdetail_by_uuid(uuid) - if td.state not in STATES_WITH_RESULTS: - raise exceptions.NotFound("Result for task %r is not known" % uuid) if td.failure: return td.failure + if td.state not in STATES_WITH_RESULTS: + raise exceptions.NotFound("Result for task %r is not known" % uuid) return td.results + def _reset_task(self, td, state): + if td.name == self.injector_name: + return False + if td.state == state: + return False + td.results = None + td.failure = None + td.state = state + return True + def reset(self, uuid, state=states.PENDING): """Remove result for task with id 'uuid' from storage""" td = self._taskdetail_by_uuid(uuid) - td.results = None - td.state = state - self._with_connection(self._save_task_detail, task_detail=td) + if self._reset_task(td, state): + self._with_connection(self._save_task_detail, task_detail=td) + + def reset_tasks(self): + """Reset all tasks to PENDING state, removing results. + + Returns list of (name, uuid) tuples for all tasks that were reset. + """ + result = [] + + def do_reset_all(connection): + for td in self._flowdetail: + if self._reset_task(td, states.PENDING): + self._save_task_detail(connection, td) + result.append((td.name, td.uuid)) + + self._with_connection(do_reset_all) + return result def inject(self, pairs): """Add values into storage diff --git a/taskflow/tests/unit/test_action_engine.py b/taskflow/tests/unit/test_action_engine.py index 52498741..42189fb4 100644 --- a/taskflow/tests/unit/test_action_engine.py +++ b/taskflow/tests/unit/test_action_engine.py @@ -74,20 +74,25 @@ class EngineTaskTest(utils.EngineTestBase): kwargs={'values': self.values}) engine.task_notifier.register('*', self._callback, kwargs={'values': self.values}) + expected = ['flow RUNNING', + 'fail RUNNING', + 'fail FAILURE', + 'flow FAILURE', + 'flow REVERTING', + 'fail REVERTING', + 'fail reverted(Failure: RuntimeError: Woot!)', + 'fail REVERTED', + 'flow REVERTED'] with self.assertRaisesRegexp(RuntimeError, '^Woot'): engine.run() - self.assertEquals( - self.values, - ['flow RUNNING', - 'fail RUNNING', - 'fail FAILURE', - 'flow FAILURE', - 'flow REVERTING', - 'fail REVERTING', - 'fail reverted(Failure: RuntimeError: Woot!)', - 'fail REVERTED', - 'fail PENDING', - 'flow REVERTED']) + self.assertEquals(self.values, expected) + self.assertEquals(engine.storage.get_flow_state(), states.REVERTED) + + with self.assertRaisesRegexp(RuntimeError, '^Woot'): + engine.run() + now_expected = expected + ['fail PENDING', 'flow PENDING'] + expected + self.assertEquals(self.values, now_expected) + self.assertEquals(engine.storage.get_flow_state(), states.REVERTED) def test_invalid_flow_raises(self): value = 'i am string, not task/flow, sorry' @@ -472,6 +477,7 @@ class EngineGraphFlowTest(utils.EngineTestBase): ['task1', 'task2', 'task3 reverted(Failure: RuntimeError: Woot!)', 'task2 reverted(5)', 'task1 reverted(5)']) + self.assertEquals(engine.storage.get_flow_state(), states.REVERTED) def test_graph_flow_four_tasks_revert_failure(self): flow = gf.Flow('g-3-nasty').add( @@ -482,6 +488,7 @@ class EngineGraphFlowTest(utils.EngineTestBase): engine = self._make_engine(flow) with self.assertRaisesRegexp(RuntimeError, '^Gotcha'): engine.run() + self.assertEquals(engine.storage.get_flow_state(), states.FAILURE) def test_graph_flow_with_multireturn_and_multiargs_tasks(self): flow = gf.Flow('g-3-multi').add( diff --git a/taskflow/tests/unit/test_storage.py b/taskflow/tests/unit/test_storage.py index f1fcefb1..b9760e25 100644 --- a/taskflow/tests/unit/test_storage.py +++ b/taskflow/tests/unit/test_storage.py @@ -95,6 +95,18 @@ class StorageTest(test.TestCase): self.assertEquals(s.get('42'), fail) self.assertEquals(s.get_task_state('42'), states.FAILURE) + def test_get_failure_from_reverted_task(self): + fail = misc.Failure(exc_info=(RuntimeError, RuntimeError(), None)) + s = self._get_storage() + s.add_task('42', 'my task') + s.save('42', fail, states.FAILURE) + + s.set_task_state('42', states.REVERTING) + self.assertEquals(s.get('42'), fail) + + s.set_task_state('42', states.REVERTED) + self.assertEquals(s.get('42'), fail) + def test_get_non_existing_var(self): s = self._get_storage() s.add_task('42', 'my task') @@ -115,6 +127,36 @@ class StorageTest(test.TestCase): s.add_task('42', 'my task') self.assertEquals(s.reset('42'), None) + def test_reset_tasks(self): + s = self._get_storage() + s.add_task('42', 'my task') + s.save('42', 5) + s.add_task('43', 'my other task') + s.save('43', 7) + + s.reset_tasks() + + self.assertEquals(s.get_task_state('42'), states.PENDING) + with self.assertRaises(exceptions.NotFound): + s.get('42') + self.assertEquals(s.get_task_state('43'), states.PENDING) + with self.assertRaises(exceptions.NotFound): + s.get('43') + + def test_reset_tasks_does_not_breaks_inject(self): + s = self._get_storage() + s.inject({'foo': 'bar', 'spam': 'eggs'}) + + # NOTE(imelnikov): injecting is implemented as special task + # so resetting tasks may break it if implemented incorrectly + s.reset_tasks() + + self.assertEquals(s.fetch('spam'), 'eggs') + self.assertEquals(s.fetch_all(), { + 'foo': 'bar', + 'spam': 'eggs', + }) + def test_fetch_by_name(self): s = self._get_storage() s.add_task('42', 'my task') @@ -161,6 +203,14 @@ class StorageTest(test.TestCase): 'details': {'test_data': 17} }) + def test_task_progress_erase(self): + s = self._get_storage() + s.add_task('42', 'my task') + + s.set_task_progress('42', 0.8, {}) + self.assertEquals(s.get_task_progress('42'), 0.8) + self.assertEquals(s.get_task_progress_details('42'), None) + def test_fetch_result_not_ready(self): s = self._get_storage() s.add_task('42', 'my task') diff --git a/taskflow/tests/unit/test_suspend_flow.py b/taskflow/tests/unit/test_suspend_flow.py index a4c8ed5f..8fcc059b 100644 --- a/taskflow/tests/unit/test_suspend_flow.py +++ b/taskflow/tests/unit/test_suspend_flow.py @@ -143,6 +143,35 @@ class SuspendFlowTest(utils.EngineTestBase): 'b reverted(5)', 'a reverted(5)']) + def test_suspend_and_resume_linear_flow_on_revert(self): + flow = lf.Flow('linear').add( + TestTask(self.values, 'a'), + AutoSuspendingTaskOnRevert(self.values, 'b'), + FailingTask(self.values, 'c') + ) + engine = self._make_engine(flow) + engine.storage.inject({'engine': engine}) + engine.run() + self.assertEquals(engine.storage.get_flow_state(), states.SUSPENDED) + self.assertEquals( + self.values, + ['a', 'b', + 'c reverted(Failure: RuntimeError: Woot!)', + 'b reverted(5)']) + + # pretend we are resuming + engine2 = self._make_engine(flow, engine.storage._flowdetail) + with self.assertRaisesRegexp(RuntimeError, '^Woot'): + engine2.run() + self.assertEquals(engine2.storage.get_flow_state(), states.REVERTED) + self.assertEquals( + self.values, + ['a', + 'b', + 'c reverted(Failure: RuntimeError: Woot!)', + 'b reverted(5)', + 'a reverted(5)']) + def test_storage_is_rechecked(self): flow = lf.Flow('linear').add( AutoSuspendingTask(self.values, 'b'),