diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py index 7084f51b..659711b1 100644 --- a/taskflow/engines/action_engine/engine.py +++ b/taskflow/engines/action_engine/engine.py @@ -277,6 +277,13 @@ class ActionEngine(base.Engine): # shop... closed = True self.suspend() + except Exception: + # Capture the failure, and ensure that the + # machine will notice that something externally + # has sent an exception in and that it should + # finish up and reraise. + memory.failures.append(failure.Failure()) + closed = True else: if try_suspend: self.suspend() diff --git a/taskflow/tests/unit/test_engines.py b/taskflow/tests/unit/test_engines.py index cb59a3e0..79db8eae 100644 --- a/taskflow/tests/unit/test_engines.py +++ b/taskflow/tests/unit/test_engines.py @@ -301,6 +301,36 @@ class EngineLinearFlowTest(utils.EngineTestBase): results = engine.storage.fetch_all() self.assertEqual(2, results['result']) + def test_sequential_flow_interrupted_externally(self): + flow = lf.Flow('flow-1').add( + utils.ProgressingTask(name='task1'), + utils.ProgressingTask(name='task2'), + utils.ProgressingTask(name='task3'), + ) + engine = self._make_engine(flow) + + def _run_engine_and_raise(): + engine_states = {} + engine_it = engine.run_iter() + while True: + try: + engine_state = six.next(engine_it) + if engine_state not in engine_states: + engine_states[engine_state] = 1 + else: + engine_states[engine_state] += 1 + if engine_states.get(states.SCHEDULING) == 2: + engine_state = engine_it.throw(IOError("I Broke")) + if engine_state not in engine_states: + engine_states[engine_state] = 1 + else: + engine_states[engine_state] += 1 + except StopIteration: + break + + self.assertRaises(IOError, _run_engine_and_raise) + self.assertEqual(states.FAILURE, engine.storage.get_flow_state()) + def test_sequential_flow_one_task(self): flow = lf.Flow('flow-1').add( utils.ProgressingTask(name='task1')