Add more tests for resumption with retry

Cover cases where crash happens at different point of atom
failure processing.

Change-Id: I3ba1cacbcec2a052e91f508debf3fef9b04c3d42
This commit is contained in:
Ivan A. Melnikov
2014-03-18 11:34:36 +04:00
parent bb6cf4cd2c
commit 4b93f2c7a9

View File

@@ -546,7 +546,7 @@ class RetryTest(utils.EngineTestBase):
engine.storage.inject({'values': values, 'y': 1})
self.assertRaisesRegexp(exc.NotFound, '^No elements left', engine.run)
def test_retry_after_failure_before_processing_failure(self):
def _pretend_to_run_a_flow_and_crash(self, when):
flow = uf.Flow('flow-1', retry.Times(3, provides='x')).add(
utils.SaveOrderTask('task1'))
engine = self._make_engine(flow)
@@ -557,12 +557,63 @@ class RetryTest(utils.EngineTestBase):
engine.storage.set_atom_intention('task1', st.EXECUTE)
# we execute retry
engine.storage.save('flow-1_retry', 1)
# task fails (if we comment it out, it works)
engine.storage.save('task1',
misc.Failure.from_exception(RuntimeError('foo')),
state=st.FAILURE)
# task fails
fail = misc.Failure.from_exception(RuntimeError('foo')),
engine.storage.save('task1', fail, state=st.FAILURE)
if when == 'task fails':
return engine
# we save it's failure to retry and ask what to do
engine.storage.save_retry_failure('flow-1_retry', 'task1', fail)
if when == 'retry queried':
return engine
# it returned 'RETRY', so we update it's intention
engine.storage.set_atom_intention('flow-1_retry', st.RETRY)
if when == 'retry updated':
return engine
# we set task1 intention to REVERT
engine.storage.set_atom_intention('task1', st.REVERT)
if when == 'task updated':
return engine
# we schedule task1 for reversion
engine.storage.set_task_state('task1', st.REVERTING)
if when == 'revert scheduled':
return engine
raise ValueError('Invalid crash point: %s' % when)
def test_resumption_on_crash_after_task_failure(self):
engine = self._pretend_to_run_a_flow_and_crash('task fails')
# then process die and we resume engine
engine.run()
expected = [u'task1 reverted(Failure: RuntimeError: foo)', 'task1']
self.assertEqual(self.values, expected)
def test_resumption_on_crash_after_retry_queried(self):
engine = self._pretend_to_run_a_flow_and_crash('retry queried')
# then process die and we resume engine
engine.run()
expected = [u'task1 reverted(Failure: RuntimeError: foo)', 'task1']
self.assertEqual(self.values, expected)
def test_resumption_on_crash_after_retry_updated(self):
engine = self._pretend_to_run_a_flow_and_crash('retry updated')
# then process die and we resume engine
engine.run()
expected = [u'task1 reverted(Failure: RuntimeError: foo)', 'task1']
self.assertEqual(self.values, expected)
def test_resumption_on_crash_after_task_updated(self):
engine = self._pretend_to_run_a_flow_and_crash('task updated')
# then process die and we resume engine
engine.run()
expected = [u'task1 reverted(Failure: RuntimeError: foo)', 'task1']
self.assertEqual(self.values, expected)
def test_resumption_on_crash_after_revert_scheduled(self):
engine = self._pretend_to_run_a_flow_and_crash('revert scheduled')
# then process die and we resume engine
engine.run()
expected = [u'task1 reverted(Failure: RuntimeError: foo)', 'task1']
self.assertEqual(self.values, expected)
def test_retry_fails(self):