diff --git a/taskflow/tests/unit/test_suspend_flow.py b/taskflow/tests/unit/test_suspend_flow.py index 47e31efd..145e627d 100644 --- a/taskflow/tests/unit/test_suspend_flow.py +++ b/taskflow/tests/unit/test_suspend_flow.py @@ -20,6 +20,7 @@ import testtools import taskflow.engines from taskflow import exceptions as exc +from taskflow.listeners import base as lbase from taskflow.patterns import linear_flow as lf from taskflow import states from taskflow import test @@ -27,32 +28,26 @@ from taskflow.tests import utils from taskflow.utils import eventlet_utils as eu -class AutoSuspendingTask(utils.SaveOrderTask): +class SuspendingListener(lbase.ListenerBase): - def execute(self, engine): - result = super(AutoSuspendingTask, self).execute() - engine.suspend() - return result + def __init__(self, engine, task_name, task_state): + super(SuspendingListener, self).__init__( + engine, task_listen_for=(task_state,)) + self._task_name = task_name - -class AutoSuspendingTaskOnRevert(utils.SaveOrderTask): - - def execute(self, engine): - return super(AutoSuspendingTaskOnRevert, self).execute() - - def revert(self, engine, result, flow_failures): - super(AutoSuspendingTaskOnRevert, self).revert( - result=result, flow_failures=flow_failures) - engine.suspend() + def _task_receiver(self, state, details): + if details['task_name'] == self._task_name: + self._engine.suspend() class SuspendFlowTest(utils.EngineTestBase): def test_suspend_one_task(self): - flow = AutoSuspendingTask('a') + flow = utils.SaveOrderTask('a') engine = self._make_engine(flow) - engine.storage.inject({'engine': engine}) - engine.run() + with SuspendingListener(engine, task_name='b', + task_state=states.SUCCESS): + engine.run() self.assertEqual(engine.storage.get_flow_state(), states.SUCCESS) self.assertEqual(self.values, ['a']) engine.run() @@ -62,12 +57,13 @@ class SuspendFlowTest(utils.EngineTestBase): def test_suspend_linear_flow(self): flow = lf.Flow('linear').add( utils.SaveOrderTask('a'), - AutoSuspendingTask('b'), + utils.SaveOrderTask('b'), utils.SaveOrderTask('c') ) engine = self._make_engine(flow) - engine.storage.inject({'engine': engine}) - engine.run() + with SuspendingListener(engine, task_name='b', + task_state=states.SUCCESS): + engine.run() self.assertEqual(engine.storage.get_flow_state(), states.SUSPENDED) self.assertEqual(self.values, ['a', 'b']) engine.run() @@ -77,12 +73,13 @@ class SuspendFlowTest(utils.EngineTestBase): def test_suspend_linear_flow_on_revert(self): flow = lf.Flow('linear').add( utils.SaveOrderTask('a'), - AutoSuspendingTaskOnRevert('b'), + utils.SaveOrderTask('b'), utils.FailingTask('c') ) engine = self._make_engine(flow) - engine.storage.inject({'engine': engine}) - engine.run() + with SuspendingListener(engine, task_name='b', + task_state=states.REVERTED): + engine.run() self.assertEqual(engine.storage.get_flow_state(), states.SUSPENDED) self.assertEqual( self.values, @@ -102,12 +99,14 @@ class SuspendFlowTest(utils.EngineTestBase): def test_suspend_and_resume_linear_flow_on_revert(self): flow = lf.Flow('linear').add( utils.SaveOrderTask('a'), - AutoSuspendingTaskOnRevert('b'), + utils.SaveOrderTask('b'), utils.FailingTask('c') ) engine = self._make_engine(flow) - engine.storage.inject({'engine': engine}) - engine.run() + + with SuspendingListener(engine, task_name='b', + task_state=states.REVERTED): + engine.run() # pretend we are resuming engine2 = self._make_engine(flow, engine.storage._flowdetail) @@ -124,41 +123,45 @@ class SuspendFlowTest(utils.EngineTestBase): def test_suspend_and_revert_even_if_task_is_gone(self): flow = lf.Flow('linear').add( utils.SaveOrderTask('a'), - AutoSuspendingTaskOnRevert('b'), + utils.SaveOrderTask('b'), utils.FailingTask('c') ) engine = self._make_engine(flow) - engine.storage.inject({'engine': engine}) - engine.run() + + with SuspendingListener(engine, task_name='b', + task_state=states.REVERTED): + engine.run() + + expected_values = ['a', 'b', + 'c reverted(Failure: RuntimeError: Woot!)', + 'b reverted(5)'] + self.assertEqual(self.values, expected_values) # pretend we are resuming, but task 'c' gone when flow got updated flow2 = lf.Flow('linear').add( utils.SaveOrderTask('a'), - AutoSuspendingTaskOnRevert('b') + utils.SaveOrderTask('b') ) engine2 = self._make_engine(flow2, engine.storage._flowdetail) self.assertRaisesRegexp(RuntimeError, '^Woot', engine2.run) self.assertEqual(engine2.storage.get_flow_state(), states.REVERTED) - self.assertEqual( - self.values, - ['a', - 'b', - 'c reverted(Failure: RuntimeError: Woot!)', - 'b reverted(5)', - 'a reverted(5)']) + expected_values.append('a reverted(5)') + self.assertEqual(self.values, expected_values) def test_storage_is_rechecked(self): flow = lf.Flow('linear').add( - AutoSuspendingTask('b'), - utils.SaveOrderTask(name='c') + utils.SaveOrderTask('b', requires=['foo']), + utils.SaveOrderTask('c') ) engine = self._make_engine(flow) - engine.storage.inject({'engine': engine, 'boo': True}) - engine.run() + engine.storage.inject({'foo': 'bar'}) + with SuspendingListener(engine, task_name='b', + task_state=states.SUCCESS): + engine.run() self.assertEqual(engine.storage.get_flow_state(), states.SUSPENDED) - # uninject engine + # uninject everything: engine.storage.save(engine.storage.injector_name, - None, states.FAILURE) + {}, states.SUCCESS) self.assertRaises(exc.MissingDependencies, engine.run)