From 4b93f2c7a938b451c078e0a5ad608f483ed9aeed Mon Sep 17 00:00:00 2001 From: "Ivan A. Melnikov" Date: Tue, 18 Mar 2014 11:34:36 +0400 Subject: [PATCH] Add more tests for resumption with retry Cover cases where crash happens at different point of atom failure processing. Change-Id: I3ba1cacbcec2a052e91f508debf3fef9b04c3d42 --- taskflow/tests/unit/test_retries.py | 61 ++++++++++++++++++++++++++--- 1 file changed, 56 insertions(+), 5 deletions(-) diff --git a/taskflow/tests/unit/test_retries.py b/taskflow/tests/unit/test_retries.py index 5d475029e..e33b70120 100644 --- a/taskflow/tests/unit/test_retries.py +++ b/taskflow/tests/unit/test_retries.py @@ -546,7 +546,7 @@ class RetryTest(utils.EngineTestBase): engine.storage.inject({'values': values, 'y': 1}) self.assertRaisesRegexp(exc.NotFound, '^No elements left', engine.run) - def test_retry_after_failure_before_processing_failure(self): + def _pretend_to_run_a_flow_and_crash(self, when): flow = uf.Flow('flow-1', retry.Times(3, provides='x')).add( utils.SaveOrderTask('task1')) engine = self._make_engine(flow) @@ -557,12 +557,63 @@ class RetryTest(utils.EngineTestBase): engine.storage.set_atom_intention('task1', st.EXECUTE) # we execute retry engine.storage.save('flow-1_retry', 1) - # task fails (if we comment it out, it works) - engine.storage.save('task1', - misc.Failure.from_exception(RuntimeError('foo')), - state=st.FAILURE) + # task fails + fail = misc.Failure.from_exception(RuntimeError('foo')), + engine.storage.save('task1', fail, state=st.FAILURE) + if when == 'task fails': + return engine + # we save it's failure to retry and ask what to do + engine.storage.save_retry_failure('flow-1_retry', 'task1', fail) + if when == 'retry queried': + return engine + # it returned 'RETRY', so we update it's intention + engine.storage.set_atom_intention('flow-1_retry', st.RETRY) + if when == 'retry updated': + return engine + # we set task1 intention to REVERT + engine.storage.set_atom_intention('task1', st.REVERT) + if when == 'task updated': + return engine + # we schedule task1 for reversion + engine.storage.set_task_state('task1', st.REVERTING) + if when == 'revert scheduled': + return engine + raise ValueError('Invalid crash point: %s' % when) + + def test_resumption_on_crash_after_task_failure(self): + engine = self._pretend_to_run_a_flow_and_crash('task fails') # then process die and we resume engine engine.run() + expected = [u'task1 reverted(Failure: RuntimeError: foo)', 'task1'] + self.assertEqual(self.values, expected) + + def test_resumption_on_crash_after_retry_queried(self): + engine = self._pretend_to_run_a_flow_and_crash('retry queried') + # then process die and we resume engine + engine.run() + expected = [u'task1 reverted(Failure: RuntimeError: foo)', 'task1'] + self.assertEqual(self.values, expected) + + def test_resumption_on_crash_after_retry_updated(self): + engine = self._pretend_to_run_a_flow_and_crash('retry updated') + # then process die and we resume engine + engine.run() + expected = [u'task1 reverted(Failure: RuntimeError: foo)', 'task1'] + self.assertEqual(self.values, expected) + + def test_resumption_on_crash_after_task_updated(self): + engine = self._pretend_to_run_a_flow_and_crash('task updated') + # then process die and we resume engine + engine.run() + expected = [u'task1 reverted(Failure: RuntimeError: foo)', 'task1'] + self.assertEqual(self.values, expected) + + def test_resumption_on_crash_after_revert_scheduled(self): + engine = self._pretend_to_run_a_flow_and_crash('revert scheduled') + # then process die and we resume engine + engine.run() + expected = [u'task1 reverted(Failure: RuntimeError: foo)', 'task1'] + self.assertEqual(self.values, expected) def test_retry_fails(self):