Ensure that the engine finishes up even under sent-in failures
Change-Id: Ic16c854d285398c688f132697c3bb7e637feb9a8
This commit is contained in:
@@ -277,6 +277,13 @@ class ActionEngine(base.Engine):
|
|||||||
# shop...
|
# shop...
|
||||||
closed = True
|
closed = True
|
||||||
self.suspend()
|
self.suspend()
|
||||||
|
except Exception:
|
||||||
|
# Capture the failure, and ensure that the
|
||||||
|
# machine will notice that something externally
|
||||||
|
# has sent an exception in and that it should
|
||||||
|
# finish up and reraise.
|
||||||
|
memory.failures.append(failure.Failure())
|
||||||
|
closed = True
|
||||||
else:
|
else:
|
||||||
if try_suspend:
|
if try_suspend:
|
||||||
self.suspend()
|
self.suspend()
|
||||||
|
|||||||
@@ -301,6 +301,36 @@ class EngineLinearFlowTest(utils.EngineTestBase):
|
|||||||
results = engine.storage.fetch_all()
|
results = engine.storage.fetch_all()
|
||||||
self.assertEqual(2, results['result'])
|
self.assertEqual(2, results['result'])
|
||||||
|
|
||||||
|
def test_sequential_flow_interrupted_externally(self):
|
||||||
|
flow = lf.Flow('flow-1').add(
|
||||||
|
utils.ProgressingTask(name='task1'),
|
||||||
|
utils.ProgressingTask(name='task2'),
|
||||||
|
utils.ProgressingTask(name='task3'),
|
||||||
|
)
|
||||||
|
engine = self._make_engine(flow)
|
||||||
|
|
||||||
|
def _run_engine_and_raise():
|
||||||
|
engine_states = {}
|
||||||
|
engine_it = engine.run_iter()
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
engine_state = six.next(engine_it)
|
||||||
|
if engine_state not in engine_states:
|
||||||
|
engine_states[engine_state] = 1
|
||||||
|
else:
|
||||||
|
engine_states[engine_state] += 1
|
||||||
|
if engine_states.get(states.SCHEDULING) == 2:
|
||||||
|
engine_state = engine_it.throw(IOError("I Broke"))
|
||||||
|
if engine_state not in engine_states:
|
||||||
|
engine_states[engine_state] = 1
|
||||||
|
else:
|
||||||
|
engine_states[engine_state] += 1
|
||||||
|
except StopIteration:
|
||||||
|
break
|
||||||
|
|
||||||
|
self.assertRaises(IOError, _run_engine_and_raise)
|
||||||
|
self.assertEqual(states.FAILURE, engine.storage.get_flow_state())
|
||||||
|
|
||||||
def test_sequential_flow_one_task(self):
|
def test_sequential_flow_one_task(self):
|
||||||
flow = lf.Flow('flow-1').add(
|
flow = lf.Flow('flow-1').add(
|
||||||
utils.ProgressingTask(name='task1')
|
utils.ProgressingTask(name='task1')
|
||||||
|
|||||||
Reference in New Issue
Block a user