From 6957eec3fd6b96fb76a562f20d3b57995e547d8f Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Wed, 20 Jan 2016 16:51:31 -0800 Subject: [PATCH] Ensure that the engine finishes up even under sent-in failures Change-Id: Ic16c854d285398c688f132697c3bb7e637feb9a8 --- taskflow/engines/action_engine/engine.py | 7 ++++++ taskflow/tests/unit/test_engines.py | 30 ++++++++++++++++++++++++ 2 files changed, 37 insertions(+) diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py index 7084f51b..659711b1 100644 --- a/taskflow/engines/action_engine/engine.py +++ b/taskflow/engines/action_engine/engine.py @@ -277,6 +277,13 @@ class ActionEngine(base.Engine): # shop... closed = True self.suspend() + except Exception: + # Capture the failure, and ensure that the + # machine will notice that something externally + # has sent an exception in and that it should + # finish up and reraise. + memory.failures.append(failure.Failure()) + closed = True else: if try_suspend: self.suspend() diff --git a/taskflow/tests/unit/test_engines.py b/taskflow/tests/unit/test_engines.py index cb59a3e0..79db8eae 100644 --- a/taskflow/tests/unit/test_engines.py +++ b/taskflow/tests/unit/test_engines.py @@ -301,6 +301,36 @@ class EngineLinearFlowTest(utils.EngineTestBase): results = engine.storage.fetch_all() self.assertEqual(2, results['result']) + def test_sequential_flow_interrupted_externally(self): + flow = lf.Flow('flow-1').add( + utils.ProgressingTask(name='task1'), + utils.ProgressingTask(name='task2'), + utils.ProgressingTask(name='task3'), + ) + engine = self._make_engine(flow) + + def _run_engine_and_raise(): + engine_states = {} + engine_it = engine.run_iter() + while True: + try: + engine_state = six.next(engine_it) + if engine_state not in engine_states: + engine_states[engine_state] = 1 + else: + engine_states[engine_state] += 1 + if engine_states.get(states.SCHEDULING) == 2: + engine_state = engine_it.throw(IOError("I Broke")) + if engine_state not in engine_states: + engine_states[engine_state] = 1 + else: + engine_states[engine_state] += 1 + except StopIteration: + break + + self.assertRaises(IOError, _run_engine_and_raise) + self.assertEqual(states.FAILURE, engine.storage.get_flow_state()) + def test_sequential_flow_one_task(self): flow = lf.Flow('flow-1').add( utils.ProgressingTask(name='task1')