Merge "Add more tests for resumption with retry"
This commit is contained in:
@@ -546,7 +546,7 @@ class RetryTest(utils.EngineTestBase):
|
|||||||
engine.storage.inject({'values': values, 'y': 1})
|
engine.storage.inject({'values': values, 'y': 1})
|
||||||
self.assertRaisesRegexp(exc.NotFound, '^No elements left', engine.run)
|
self.assertRaisesRegexp(exc.NotFound, '^No elements left', engine.run)
|
||||||
|
|
||||||
def test_retry_after_failure_before_processing_failure(self):
|
def _pretend_to_run_a_flow_and_crash(self, when):
|
||||||
flow = uf.Flow('flow-1', retry.Times(3, provides='x')).add(
|
flow = uf.Flow('flow-1', retry.Times(3, provides='x')).add(
|
||||||
utils.SaveOrderTask('task1'))
|
utils.SaveOrderTask('task1'))
|
||||||
engine = self._make_engine(flow)
|
engine = self._make_engine(flow)
|
||||||
@@ -557,12 +557,63 @@ class RetryTest(utils.EngineTestBase):
|
|||||||
engine.storage.set_atom_intention('task1', st.EXECUTE)
|
engine.storage.set_atom_intention('task1', st.EXECUTE)
|
||||||
# we execute retry
|
# we execute retry
|
||||||
engine.storage.save('flow-1_retry', 1)
|
engine.storage.save('flow-1_retry', 1)
|
||||||
# task fails (if we comment it out, it works)
|
# task fails
|
||||||
engine.storage.save('task1',
|
fail = misc.Failure.from_exception(RuntimeError('foo')),
|
||||||
misc.Failure.from_exception(RuntimeError('foo')),
|
engine.storage.save('task1', fail, state=st.FAILURE)
|
||||||
state=st.FAILURE)
|
if when == 'task fails':
|
||||||
|
return engine
|
||||||
|
# we save it's failure to retry and ask what to do
|
||||||
|
engine.storage.save_retry_failure('flow-1_retry', 'task1', fail)
|
||||||
|
if when == 'retry queried':
|
||||||
|
return engine
|
||||||
|
# it returned 'RETRY', so we update it's intention
|
||||||
|
engine.storage.set_atom_intention('flow-1_retry', st.RETRY)
|
||||||
|
if when == 'retry updated':
|
||||||
|
return engine
|
||||||
|
# we set task1 intention to REVERT
|
||||||
|
engine.storage.set_atom_intention('task1', st.REVERT)
|
||||||
|
if when == 'task updated':
|
||||||
|
return engine
|
||||||
|
# we schedule task1 for reversion
|
||||||
|
engine.storage.set_task_state('task1', st.REVERTING)
|
||||||
|
if when == 'revert scheduled':
|
||||||
|
return engine
|
||||||
|
raise ValueError('Invalid crash point: %s' % when)
|
||||||
|
|
||||||
|
def test_resumption_on_crash_after_task_failure(self):
|
||||||
|
engine = self._pretend_to_run_a_flow_and_crash('task fails')
|
||||||
# then process die and we resume engine
|
# then process die and we resume engine
|
||||||
engine.run()
|
engine.run()
|
||||||
|
expected = [u'task1 reverted(Failure: RuntimeError: foo)', 'task1']
|
||||||
|
self.assertEqual(self.values, expected)
|
||||||
|
|
||||||
|
def test_resumption_on_crash_after_retry_queried(self):
|
||||||
|
engine = self._pretend_to_run_a_flow_and_crash('retry queried')
|
||||||
|
# then process die and we resume engine
|
||||||
|
engine.run()
|
||||||
|
expected = [u'task1 reverted(Failure: RuntimeError: foo)', 'task1']
|
||||||
|
self.assertEqual(self.values, expected)
|
||||||
|
|
||||||
|
def test_resumption_on_crash_after_retry_updated(self):
|
||||||
|
engine = self._pretend_to_run_a_flow_and_crash('retry updated')
|
||||||
|
# then process die and we resume engine
|
||||||
|
engine.run()
|
||||||
|
expected = [u'task1 reverted(Failure: RuntimeError: foo)', 'task1']
|
||||||
|
self.assertEqual(self.values, expected)
|
||||||
|
|
||||||
|
def test_resumption_on_crash_after_task_updated(self):
|
||||||
|
engine = self._pretend_to_run_a_flow_and_crash('task updated')
|
||||||
|
# then process die and we resume engine
|
||||||
|
engine.run()
|
||||||
|
expected = [u'task1 reverted(Failure: RuntimeError: foo)', 'task1']
|
||||||
|
self.assertEqual(self.values, expected)
|
||||||
|
|
||||||
|
def test_resumption_on_crash_after_revert_scheduled(self):
|
||||||
|
engine = self._pretend_to_run_a_flow_and_crash('revert scheduled')
|
||||||
|
# then process die and we resume engine
|
||||||
|
engine.run()
|
||||||
|
expected = [u'task1 reverted(Failure: RuntimeError: foo)', 'task1']
|
||||||
|
self.assertEqual(self.values, expected)
|
||||||
|
|
||||||
def test_retry_fails(self):
|
def test_retry_fails(self):
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user