From e7508eb8e53217d889fe8af24a92cdd0dbb28068 Mon Sep 17 00:00:00 2001 From: Anastasia Karpinska Date: Thu, 17 Oct 2013 13:16:06 +0300 Subject: [PATCH] Unit tests refactoring * duplicated tests were removed * common tasks moved to utils Change-Id: I69c91a264ec668b1333db8fd907298262af098cb --- taskflow/tests/unit/test_action_engine.py | 748 ++++++------------ taskflow/tests/unit/test_arguments_passing.py | 143 ++++ taskflow/tests/unit/test_flow_dependencies.py | 228 +++--- taskflow/tests/unit/test_graph_flow.py | 49 +- taskflow/tests/unit/test_linear_flow.py | 260 ------ taskflow/tests/unit/test_suspend_flow.py | 180 +++++ taskflow/tests/utils.py | 189 ++++- 7 files changed, 828 insertions(+), 969 deletions(-) create mode 100644 taskflow/tests/unit/test_arguments_passing.py delete mode 100644 taskflow/tests/unit/test_linear_flow.py create mode 100644 taskflow/tests/unit/test_suspend_flow.py diff --git a/taskflow/tests/unit/test_action_engine.py b/taskflow/tests/unit/test_action_engine.py index 1fcafe773..524987411 100644 --- a/taskflow/tests/unit/test_action_engine.py +++ b/taskflow/tests/unit/test_action_engine.py @@ -18,7 +18,6 @@ import contextlib import networkx -import time from concurrent import futures @@ -29,134 +28,17 @@ from taskflow.patterns import unordered_flow as uf import taskflow.engines from taskflow.engines.action_engine import engine as eng -from taskflow import exceptions as exc -from taskflow.persistence.backends import impl_memory from taskflow.persistence import logbook from taskflow import states -from taskflow import task from taskflow import test +from taskflow.tests import utils from taskflow.utils import persistence_utils as p_utils -class TestTask(task.Task): - - def __init__(self, values=None, name=None, sleep=None, - provides=None, rebind=None, requires=None): - super(TestTask, self).__init__(name=name, provides=provides, - rebind=rebind, requires=requires) - if values is None: - self.values = [] - else: - self.values = values - self._sleep = sleep - - def execute(self, **kwargs): - self.update_progress(0.0) - if self._sleep: - time.sleep(self._sleep) - self.values.append(self.name) - self.update_progress(1.0) - return 5 - - def revert(self, **kwargs): - self.update_progress(0) - if self._sleep: - time.sleep(self._sleep) - self.values.append(self.name + ' reverted(%s)' - % kwargs.get('result')) - self.update_progress(1.0) - - -class FailingTask(TestTask): - - def execute(self, **kwargs): - self.update_progress(0) - if self._sleep: - time.sleep(self._sleep) - self.update_progress(0.99) - raise RuntimeError('Woot!') - - -class NeverRunningTask(task.Task): - def execute(self, **kwargs): - assert False, 'This method should not be called' - - def revert(self, **kwargs): - assert False, 'This method should not be called' - - -class NastyTask(task.Task): - def execute(self, **kwargs): - pass - - def revert(self, **kwargs): - raise RuntimeError('Gotcha!') - - -class MultiReturnTask(task.Task): - def execute(self, **kwargs): - return 12, 2, 1 - - -class MultiargsTask(task.Task): - def execute(self, a, b, c): - return a + b + c - - -class MultiDictTask(task.Task): - def execute(self): - self.update_progress(0) - output = {} - total = len(sorted(self.provides)) - for i, k in enumerate(sorted(self.provides)): - output[k] = i - self.update_progress(i / total) - self.update_progress(1.0) - return output - - -class AutoSuspendingTask(TestTask): - - def execute(self, engine): - result = super(AutoSuspendingTask, self).execute() - engine.suspend() - return result - - def revert(self, engine, result): - super(AutoSuspendingTask, self).revert(**{'result': result}) - - -class AutoSuspendingTaskOnRevert(TestTask): - - def execute(self, engine): - return super(AutoSuspendingTaskOnRevert, self).execute() - - def revert(self, engine, result): - super(AutoSuspendingTaskOnRevert, self).revert(**{'result': result}) - engine.suspend() - - -class EngineTestBase(object): - def setUp(self): - super(EngineTestBase, self).setUp() - self.values = [] - self.backend = impl_memory.MemoryBackend(conf={}) - - def tearDown(self): - super(EngineTestBase, self).tearDown() - with contextlib.closing(self.backend) as be: - with contextlib.closing(be.get_connection()) as conn: - conn.clear_all() - - def _make_engine(self, flow, flow_detail=None): - raise NotImplementedError() - - -class EngineTaskTest(EngineTestBase): +class EngineTaskTest(utils.EngineTestBase): def test_run_task_as_flow(self): - flow = lf.Flow('test-1') - flow.add(TestTask(self.values, name='task1')) + flow = utils.SaveOrderTask(self.values, name='task1') engine = self._make_engine(flow) engine.run() self.assertEquals(self.values, ['task1']) @@ -171,7 +53,7 @@ class EngineTaskTest(EngineTestBase): values.append('flow %s' % state) def test_run_task_with_notifications(self): - flow = TestTask(self.values, name='task1') + flow = utils.SaveOrderTask(self.values, name='task1') engine = self._make_engine(flow) engine.notifier.register('*', self._flow_callback, kwargs={'values': self.values}) @@ -186,7 +68,7 @@ class EngineTaskTest(EngineTestBase): 'flow SUCCESS']) def test_failing_task_with_notifications(self): - flow = FailingTask(self.values, 'fail') + flow = utils.FailingTask(self.values, 'fail') engine = self._make_engine(flow) engine.notifier.register('*', self._flow_callback, kwargs={'values': self.values}) @@ -221,132 +103,30 @@ class EngineTaskTest(EngineTestBase): engine.run() self.assertIn(value, str(err.exception)) - def test_save_as(self): - flow = TestTask(self.values, name='task1', provides='first_data') - engine = self._make_engine(flow) - engine.run() - self.assertEquals(self.values, ['task1']) - self.assertEquals(engine.storage.fetch_all(), {'first_data': 5}) - def test_save_all_in_one(self): - flow = MultiReturnTask(provides='all_data') - engine = self._make_engine(flow) - engine.run() - self.assertEquals(engine.storage.fetch_all(), - {'all_data': (12, 2, 1)}) - - def test_save_several_values(self): - flow = MultiReturnTask(provides=('badger', 'mushroom', 'snake')) - engine = self._make_engine(flow) - engine.run() - self.assertEquals(engine.storage.fetch_all(), { - 'badger': 12, - 'mushroom': 2, - 'snake': 1 - }) - - def test_save_dict(self): - flow = MultiDictTask(provides=set(['badger', 'mushroom', 'snake'])) - engine = self._make_engine(flow) - engine.run() - self.assertEquals(engine.storage.fetch_all(), { - 'badger': 0, - 'mushroom': 1, - 'snake': 2, - }) - - def test_bad_save_as_value(self): - with self.assertRaises(TypeError): - TestTask(name='task1', provides=object()) - - def test_arguments_passing(self): - flow = MultiargsTask(provides='result') - engine = self._make_engine(flow) - engine.storage.inject({'a': 1, 'b': 4, 'c': 9, 'x': 17}) - engine.run() - self.assertEquals(engine.storage.fetch_all(), { - 'a': 1, 'b': 4, 'c': 9, 'x': 17, - 'result': 14, - }) - - def test_arguments_missing(self): - flow = MultiargsTask(provides='result') - engine = self._make_engine(flow) - engine.storage.inject({'a': 1, 'b': 4, 'x': 17}) - with self.assertRaises(exc.MissingDependencies): - engine.run() - - def test_partial_arguments_mapping(self): - flow = MultiargsTask(name='task1', - provides='result', - rebind={'b': 'x'}) - engine = self._make_engine(flow) - engine.storage.inject({'a': 1, 'b': 4, 'c': 9, 'x': 17}) - engine.run() - self.assertEquals(engine.storage.fetch_all(), { - 'a': 1, 'b': 4, 'c': 9, 'x': 17, - 'result': 27, - }) - - def test_all_arguments_mapping(self): - flow = MultiargsTask(name='task1', - provides='result', - rebind=['x', 'y', 'z']) - engine = self._make_engine(flow) - engine.storage.inject({ - 'a': 1, 'b': 2, 'c': 3, 'x': 4, 'y': 5, 'z': 6 - }) - engine.run() - self.assertEquals(engine.storage.fetch_all(), { - 'a': 1, 'b': 2, 'c': 3, 'x': 4, 'y': 5, 'z': 6, - 'result': 15, - }) - - def test_invalid_argument_name_map(self): - flow = MultiargsTask(name='task1', provides='result', - rebind={'b': 'z'}) - engine = self._make_engine(flow) - engine.storage.inject({'a': 1, 'b': 4, 'c': 9, 'x': 17}) - with self.assertRaises(exc.MissingDependencies): - engine.run() - - def test_invalid_argument_name_list(self): - flow = MultiargsTask(name='task1', - provides='result', - rebind=['a', 'z', 'b']) - engine = self._make_engine(flow) - engine.storage.inject({'a': 1, 'b': 4, 'c': 9, 'x': 17}) - with self.assertRaises(exc.MissingDependencies): - engine.run() - - def test_bad_rebind_args_value(self): - with self.assertRaises(TypeError): - TestTask(name='task1', - rebind=object()) - - -class EngineLinearFlowTest(EngineTestBase): +class EngineLinearFlowTest(utils.EngineTestBase): def test_sequential_flow_one_task(self): flow = lf.Flow('flow-1').add( - TestTask(self.values, name='task1') + utils.SaveOrderTask(self.values, name='task1') ) self._make_engine(flow).run() self.assertEquals(self.values, ['task1']) def test_sequential_flow_two_tasks(self): flow = lf.Flow('flow-2').add( - TestTask(self.values, name='task1'), - TestTask(self.values, name='task2') + utils.SaveOrderTask(self.values, name='task1'), + utils.SaveOrderTask(self.values, name='task2') ) self._make_engine(flow).run() self.assertEquals(self.values, ['task1', 'task2']) + self.assertEquals(len(flow), 2) def test_revert_removes_data(self): flow = lf.Flow('revert-removes').add( - TestTask(provides='one'), - MultiReturnTask(provides=('a', 'b', 'c')), - FailingTask(name='fail') + utils.TaskOneReturn(provides='one'), + utils.TaskMultiReturn(provides=('a', 'b', 'c')), + utils.FailingTask(name='fail') ) engine = self._make_engine(flow) with self.assertRaisesRegexp(RuntimeError, '^Woot'): @@ -355,9 +135,9 @@ class EngineLinearFlowTest(EngineTestBase): def test_sequential_flow_nested_blocks(self): flow = lf.Flow('nested-1').add( - TestTask(self.values, 'task1'), + utils.SaveOrderTask(self.values, 'task1'), lf.Flow('inner-1').add( - TestTask(self.values, 'task2') + utils.SaveOrderTask(self.values, 'task2') ) ) self._make_engine(flow).run() @@ -365,8 +145,8 @@ class EngineLinearFlowTest(EngineTestBase): def test_revert_exception_is_reraised(self): flow = lf.Flow('revert-1').add( - NastyTask(), - FailingTask(name='fail') + utils.NastyTask(), + utils.FailingTask(name='fail') ) engine = self._make_engine(flow) with self.assertRaisesRegexp(RuntimeError, '^Gotcha'): @@ -374,8 +154,8 @@ class EngineLinearFlowTest(EngineTestBase): def test_revert_not_run_task_is_not_reverted(self): flow = lf.Flow('revert-not-run').add( - FailingTask(self.values, 'fail'), - NeverRunningTask(), + utils.FailingTask(self.values, 'fail'), + utils.NeverRunningTask(), ) engine = self._make_engine(flow) with self.assertRaisesRegexp(RuntimeError, '^Woot'): @@ -386,10 +166,10 @@ class EngineLinearFlowTest(EngineTestBase): def test_correctly_reverts_children(self): flow = lf.Flow('root-1').add( - TestTask(self.values, 'task1'), + utils.SaveOrderTask(self.values, 'task1'), lf.Flow('child-1').add( - TestTask(self.values, 'task2'), - FailingTask(self.values, 'fail') + utils.SaveOrderTask(self.values, 'task2'), + utils.FailingTask(self.values, 'fail') ) ) engine = self._make_engine(flow) @@ -402,30 +182,31 @@ class EngineLinearFlowTest(EngineTestBase): 'task2 reverted(5)', 'task1 reverted(5)']) -class EngineParallelFlowTest(EngineTestBase): +class EngineParallelFlowTest(utils.EngineTestBase): def test_parallel_flow_one_task(self): flow = uf.Flow('p-1').add( - TestTask(self.values, name='task1', sleep=0.01) + utils.SaveOrderTask(self.values, name='task1', sleep=0.01) ) self._make_engine(flow).run() self.assertEquals(self.values, ['task1']) def test_parallel_flow_two_tasks(self): flow = uf.Flow('p-2').add( - TestTask(self.values, name='task1', sleep=0.01), - TestTask(self.values, name='task2', sleep=0.01) + utils.SaveOrderTask(self.values, name='task1', sleep=0.01), + utils.SaveOrderTask(self.values, name='task2', sleep=0.01) ) self._make_engine(flow).run() result = set(self.values) self.assertEquals(result, set(['task1', 'task2'])) + self.assertEquals(len(flow), 2) def test_parallel_revert_common(self): flow = uf.Flow('p-r-3').add( - TestTask(self.values, name='task1'), - FailingTask(self.values, sleep=0.01), - TestTask(self.values, name='task2') + utils.TaskNoRequiresNoReturns(name='task1'), + utils.FailingTask(sleep=0.01), + utils.TaskNoRequiresNoReturns(name='task2') ) engine = self._make_engine(flow) with self.assertRaisesRegexp(RuntimeError, '^Woot'): @@ -438,10 +219,10 @@ class EngineParallelFlowTest(EngineTestBase): # FailingTask fails. flow = lf.Flow('p-r-r-l').add( uf.Flow('p-r-r').add( - TestTask(self.values, name='task1'), - NastyTask() + utils.TaskNoRequiresNoReturns(name='task1'), + utils.NastyTask() ), - FailingTask(self.values, sleep=0.1) + utils.FailingTask(self.values, sleep=0.1) ) engine = self._make_engine(flow) with self.assertRaisesRegexp(RuntimeError, '^Gotcha'): @@ -449,8 +230,8 @@ class EngineParallelFlowTest(EngineTestBase): def test_sequential_flow_two_tasks_with_resumption(self): flow = lf.Flow('lf-2-r').add( - TestTask(self.values, name='task1', provides='x1'), - TestTask(self.values, name='task2', provides='x2') + utils.SaveOrderTask(self.values, name='task1', provides='x1'), + utils.SaveOrderTask(self.values, name='task2', provides='x2') ) # Create FlowDetail as if we already run task1 @@ -470,255 +251,11 @@ class EngineParallelFlowTest(EngineTestBase): self.assertEquals(engine.storage.fetch_all(), {'x1': 17, 'x2': 5}) - -class EngineGraphFlowTest(EngineTestBase): - - def test_graph_flow_one_task(self): - flow = gf.Flow('g-1').add( - TestTask(self.values, name='task1') - ) - self._make_engine(flow).run() - self.assertEquals(self.values, ['task1']) - - def test_graph_flow_two_independent_tasks(self): - flow = gf.Flow('g-2').add( - TestTask(self.values, name='task1'), - TestTask(self.values, name='task2') - ) - self._make_engine(flow).run() - self.assertEquals(set(self.values), set(['task1', 'task2'])) - - def test_graph_flow_two_tasks(self): - flow = gf.Flow('g-1-1').add( - TestTask(self.values, name='task2', requires=['a']), - TestTask(self.values, name='task1', provides='a') - ) - self._make_engine(flow).run() - self.assertEquals(self.values, ['task1', 'task2']) - - def test_graph_flow_four_tasks_added_separately(self): - flow = (gf.Flow('g-4') - .add(TestTask(self.values, name='task4', - provides='d', requires=['c'])) - .add(TestTask(self.values, name='task2', - provides='b', requires=['a'])) - .add(TestTask(self.values, name='task3', - provides='c', requires=['b'])) - .add(TestTask(self.values, name='task1', - provides='a')) - ) - self._make_engine(flow).run() - self.assertEquals(self.values, ['task1', 'task2', 'task3', 'task4']) - - def test_graph_cyclic_dependency(self): - with self.assertRaisesRegexp(exc.DependencyFailure, '^No path'): - gf.Flow('g-3-cyclic').add( - TestTask([], name='task1', provides='a', requires=['b']), - TestTask([], name='task2', provides='b', requires=['c']), - TestTask([], name='task3', provides='c', requires=['a'])) - - def test_graph_two_tasks_returns_same_value(self): - with self.assertRaisesRegexp(exc.DependencyFailure, - "task2 provides a but is already being" - " provided by task1 and duplicate" - " producers are disallowed"): - gf.Flow('g-2-same-value').add( - TestTask([], name='task1', provides='a'), - TestTask([], name='task2', provides='a')) - - def test_graph_flow_four_tasks_revert(self): - flow = gf.Flow('g-4-failing').add( - TestTask(self.values, name='task4', provides='d', requires=['c']), - TestTask(self.values, name='task2', provides='b', requires=['a']), - FailingTask(self.values, name='task3', - provides='c', requires=['b']), - TestTask(self.values, name='task1', provides='a')) - - engine = self._make_engine(flow) - with self.assertRaisesRegexp(RuntimeError, '^Woot'): - engine.run() - self.assertEquals( - self.values, - ['task1', 'task2', - 'task3 reverted(Failure: RuntimeError: Woot!)', - 'task2 reverted(5)', 'task1 reverted(5)']) - - def test_graph_flow_four_tasks_revert_failure(self): - flow = gf.Flow('g-3-nasty').add( - NastyTask(name='task2', provides='b', requires=['a']), - FailingTask(self.values, name='task3', requires=['b']), - TestTask(self.values, name='task1', provides='a')) - - engine = self._make_engine(flow) - with self.assertRaisesRegexp(RuntimeError, '^Gotcha'): - engine.run() - - def test_graph_flow_with_multireturn_and_multiargs_tasks(self): - flow = gf.Flow('g-3-multi').add( - MultiargsTask(name='task1', rebind=['a', 'b', 'y'], provides='z'), - MultiReturnTask(name='task2', provides=['a', 'b', 'c']), - MultiargsTask(name='task3', rebind=['c', 'b', 'x'], provides='y')) - - engine = self._make_engine(flow) - engine.storage.inject({'x': 30}) - engine.run() - self.assertEquals(engine.storage.fetch_all(), { - 'a': 12, - 'b': 2, - 'c': 1, - 'x': 30, - 'y': 33, - 'z': 47 - }) - - def test_one_task_provides_and_requires_same_data(self): - with self.assertRaisesRegexp(exc.DependencyFailure, '^No path'): - gf.Flow('g-1-req-error').add( - TestTask([], name='task1', requires=['a'], provides='a')) - - def test_task_graph_property(self): - flow = gf.Flow('test').add( - TestTask(name='task1'), - TestTask(name='task2')) - - engine = self._make_engine(flow) - graph = engine.execution_graph - self.assertTrue(isinstance(graph, networkx.DiGraph)) - - def test_task_graph_property_for_one_task(self): - flow = TestTask(name='task1') - - engine = self._make_engine(flow) - graph = engine.execution_graph - self.assertTrue(isinstance(graph, networkx.DiGraph)) - - -class SuspendFlowTest(EngineTestBase): - - def test_suspend_one_task(self): - flow = AutoSuspendingTask(self.values, 'a') - engine = self._make_engine(flow) - engine.storage.inject({'engine': engine}) - engine.run() - self.assertEquals(engine.storage.get_flow_state(), states.SUCCESS) - self.assertEquals(self.values, ['a']) - engine.run() - self.assertEquals(engine.storage.get_flow_state(), states.SUCCESS) - self.assertEquals(self.values, ['a']) - - def test_suspend_linear_flow(self): - flow = lf.Flow('linear').add( - TestTask(self.values, 'a'), - AutoSuspendingTask(self.values, 'b'), - TestTask(self.values, 'c') - ) - engine = self._make_engine(flow) - engine.storage.inject({'engine': engine}) - engine.run() - self.assertEquals(engine.storage.get_flow_state(), states.SUSPENDED) - self.assertEquals(self.values, ['a', 'b']) - engine.run() - self.assertEquals(engine.storage.get_flow_state(), states.SUCCESS) - self.assertEquals(self.values, ['a', 'b', 'c']) - - def test_suspend_linear_flow_on_revert(self): - flow = lf.Flow('linear').add( - TestTask(self.values, 'a'), - AutoSuspendingTaskOnRevert(self.values, 'b'), - FailingTask(self.values, 'c') - ) - engine = self._make_engine(flow) - engine.storage.inject({'engine': engine}) - engine.run() - self.assertEquals(engine.storage.get_flow_state(), states.SUSPENDED) - self.assertEquals( - self.values, - ['a', 'b', - 'c reverted(Failure: RuntimeError: Woot!)', - 'b reverted(5)']) - with self.assertRaisesRegexp(RuntimeError, '^Woot'): - engine.run() - self.assertEquals(engine.storage.get_flow_state(), states.REVERTED) - self.assertEquals( - self.values, - ['a', - 'b', - 'c reverted(Failure: RuntimeError: Woot!)', - 'b reverted(5)', - 'a reverted(5)']) - - def test_storage_is_rechecked(self): - flow = lf.Flow('linear').add( - AutoSuspendingTask(self.values, 'b'), - TestTask(self.values, name='c') - ) - engine = self._make_engine(flow) - engine.storage.inject({'engine': engine, 'boo': True}) - engine.run() - self.assertEquals(engine.storage.get_flow_state(), states.SUSPENDED) - # uninject engine - engine.storage.save( - engine.storage.get_uuid_by_name(engine.storage.injector_name), - None, - states.FAILURE) - with self.assertRaises(exc.MissingDependencies): - engine.run() - - -class SingleThreadedEngineTest(EngineTaskTest, - EngineLinearFlowTest, - EngineParallelFlowTest, - EngineGraphFlowTest, - SuspendFlowTest, - test.TestCase): - def _make_engine(self, flow, flow_detail=None): - return taskflow.engines.load(flow, - flow_detail=flow_detail, - engine_conf='serial', - backend=self.backend) - - def test_correct_load(self): - engine = self._make_engine(TestTask) - self.assertIsInstance(engine, eng.SingleThreadedActionEngine) - - def test_singlethreaded_is_the_default(self): - engine = taskflow.engines.load(TestTask) - self.assertIsInstance(engine, eng.SingleThreadedActionEngine) - - -class MultiThreadedEngineTest(EngineTaskTest, - EngineLinearFlowTest, - EngineParallelFlowTest, - EngineGraphFlowTest, - SuspendFlowTest, - test.TestCase): - def _make_engine(self, flow, flow_detail=None, executor=None): - engine_conf = dict(engine='parallel', - executor=executor) - return taskflow.engines.load(flow, flow_detail=flow_detail, - engine_conf=engine_conf, - backend=self.backend) - - def test_correct_load(self): - engine = self._make_engine(TestTask) - self.assertIsInstance(engine, eng.MultiThreadedActionEngine) - self.assertIs(engine.executor, None) - - def test_using_common_executor(self): - flow = TestTask(self.values, name='task1') - executor = futures.ThreadPoolExecutor(2) - try: - e1 = self._make_engine(flow, executor=executor) - e2 = self._make_engine(flow, executor=executor) - self.assertIs(e1.executor, e2.executor) - finally: - executor.shutdown(wait=True) - def test_parallel_revert_specific(self): flow = uf.Flow('p-r-r').add( - TestTask(self.values, name='task1', sleep=0.01), - FailingTask(sleep=0.01), - TestTask(self.values, name='task2', sleep=0.01) + utils.SaveOrderTask(self.values, name='task1', sleep=0.01), + utils.FailingTask(sleep=0.01), + utils.SaveOrderTask(self.values, name='task2', sleep=0.01) ) engine = self._make_engine(flow) with self.assertRaisesRegexp(RuntimeError, '^Woot'): @@ -732,10 +269,11 @@ class MultiThreadedEngineTest(EngineTaskTest, def test_parallel_revert_exception_is_reraised_(self): flow = lf.Flow('p-r-reraise').add( - TestTask(self.values, name='task1', sleep=0.01), - NastyTask(), - FailingTask(sleep=0.01), - TestTask() # this should not get reverted + utils.SaveOrderTask(self.values, name='task1', sleep=0.01), + utils.NastyTask(), + utils.FailingTask(sleep=0.01), + utils.SaveOrderTask(self.values, + name='task2') # this should not get reverted ) engine = self._make_engine(flow) with self.assertRaisesRegexp(RuntimeError, '^Gotcha'): @@ -745,12 +283,12 @@ class MultiThreadedEngineTest(EngineTaskTest, def test_nested_parallel_revert_exception_is_reraised(self): flow = uf.Flow('p-root').add( - TestTask(self.values, name='task1'), - TestTask(self.values, name='task2'), + utils.SaveOrderTask(self.values, name='task1'), + utils.SaveOrderTask(self.values, name='task2'), lf.Flow('p-inner').add( - TestTask(self.values, name='task3', sleep=0.1), - NastyTask(), - FailingTask(sleep=0.01) + utils.SaveOrderTask(self.values, name='task3', sleep=0.1), + utils.NastyTask(), + utils.FailingTask(sleep=0.01) ) ) engine = self._make_engine(flow) @@ -767,12 +305,12 @@ class MultiThreadedEngineTest(EngineTaskTest, def test_parallel_revert_exception_do_not_revert_linear_tasks(self): flow = lf.Flow('l-root').add( - TestTask(self.values, name='task1'), - TestTask(self.values, name='task2'), + utils.SaveOrderTask(self.values, name='task1'), + utils.SaveOrderTask(self.values, name='task2'), uf.Flow('p-inner').add( - TestTask(self.values, name='task3', sleep=0.1), - NastyTask(), - FailingTask(sleep=0.01) + utils.SaveOrderTask(self.values, name='task3', sleep=0.1), + utils.NastyTask(), + utils.FailingTask(sleep=0.01) ) ) engine = self._make_engine(flow) @@ -808,11 +346,11 @@ class MultiThreadedEngineTest(EngineTaskTest, def test_parallel_nested_to_linear_revert(self): flow = lf.Flow('l-root').add( - TestTask(self.values, name='task1'), - TestTask(self.values, name='task2'), + utils.SaveOrderTask(self.values, name='task1'), + utils.SaveOrderTask(self.values, name='task2'), uf.Flow('p-inner').add( - TestTask(self.values, name='task3', sleep=0.1), - FailingTask(sleep=0.01) + utils.SaveOrderTask(self.values, name='task3', sleep=0.1), + utils.FailingTask(sleep=0.01) ) ) engine = self._make_engine(flow) @@ -834,11 +372,11 @@ class MultiThreadedEngineTest(EngineTaskTest, def test_linear_nested_to_parallel_revert(self): flow = uf.Flow('p-root').add( - TestTask(self.values, name='task1'), - TestTask(self.values, name='task2'), + utils.SaveOrderTask(self.values, name='task1'), + utils.SaveOrderTask(self.values, name='task2'), lf.Flow('l-inner').add( - TestTask(self.values, name='task3', sleep=0.1), - FailingTask(self.values, name='fail', sleep=0.01) + utils.SaveOrderTask(self.values, name='task3', sleep=0.1), + utils.FailingTask(self.values, name='fail', sleep=0.01) ) ) engine = self._make_engine(flow) @@ -858,12 +396,12 @@ class MultiThreadedEngineTest(EngineTaskTest, def test_linear_nested_to_parallel_revert_exception(self): flow = uf.Flow('p-root').add( - TestTask(self.values, name='task1', sleep=0.01), - TestTask(self.values, name='task2', sleep=0.01), + utils.SaveOrderTask(self.values, name='task1', sleep=0.01), + utils.SaveOrderTask(self.values, name='task2', sleep=0.01), lf.Flow('l-inner').add( - TestTask(self.values, name='task3'), - NastyTask(), - FailingTask(sleep=0.01) + utils.SaveOrderTask(self.values, name='task3'), + utils.NastyTask(), + utils.FailingTask(sleep=0.01) ) ) engine = self._make_engine(flow) @@ -874,3 +412,157 @@ class MultiThreadedEngineTest(EngineTaskTest, 'task2', 'task2 reverted(5)', 'task3']) self.assertIsSubset(possible_result, result) + + +class EngineGraphFlowTest(utils.EngineTestBase): + + def test_graph_flow_one_task(self): + flow = gf.Flow('g-1').add( + utils.SaveOrderTask(self.values, name='task1') + ) + self._make_engine(flow).run() + self.assertEquals(self.values, ['task1']) + + def test_graph_flow_two_independent_tasks(self): + flow = gf.Flow('g-2').add( + utils.SaveOrderTask(self.values, name='task1'), + utils.SaveOrderTask(self.values, name='task2') + ) + self._make_engine(flow).run() + self.assertEquals(set(self.values), set(['task1', 'task2'])) + self.assertEquals(len(flow), 2) + + def test_graph_flow_two_tasks(self): + flow = gf.Flow('g-1-1').add( + utils.SaveOrderTask(self.values, name='task2', requires=['a']), + utils.SaveOrderTask(self.values, name='task1', provides='a') + ) + self._make_engine(flow).run() + self.assertEquals(self.values, ['task1', 'task2']) + + def test_graph_flow_four_tasks_added_separately(self): + flow = (gf.Flow('g-4') + .add(utils.SaveOrderTask(self.values, name='task4', + provides='d', requires=['c'])) + .add(utils.SaveOrderTask(self.values, name='task2', + provides='b', requires=['a'])) + .add(utils.SaveOrderTask(self.values, name='task3', + provides='c', requires=['b'])) + .add(utils.SaveOrderTask(self.values, name='task1', + provides='a')) + ) + self._make_engine(flow).run() + self.assertEquals(self.values, ['task1', 'task2', 'task3', 'task4']) + + def test_graph_flow_four_tasks_revert(self): + flow = gf.Flow('g-4-failing').add( + utils.SaveOrderTask(self.values, name='task4', + provides='d', requires=['c']), + utils.SaveOrderTask(self.values, name='task2', + provides='b', requires=['a']), + utils.FailingTask(self.values, name='task3', + provides='c', requires=['b']), + utils.SaveOrderTask(self.values, name='task1', provides='a')) + + engine = self._make_engine(flow) + with self.assertRaisesRegexp(RuntimeError, '^Woot'): + engine.run() + self.assertEquals( + self.values, + ['task1', 'task2', + 'task3 reverted(Failure: RuntimeError: Woot!)', + 'task2 reverted(5)', 'task1 reverted(5)']) + + def test_graph_flow_four_tasks_revert_failure(self): + flow = gf.Flow('g-3-nasty').add( + utils.NastyTask(name='task2', provides='b', requires=['a']), + utils.FailingTask(self.values, name='task3', requires=['b']), + utils.SaveOrderTask(self.values, name='task1', provides='a')) + + engine = self._make_engine(flow) + with self.assertRaisesRegexp(RuntimeError, '^Gotcha'): + engine.run() + + def test_graph_flow_with_multireturn_and_multiargs_tasks(self): + flow = gf.Flow('g-3-multi').add( + utils.TaskMultiArgOneReturn(name='task1', + rebind=['a', 'b', 'y'], provides='z'), + utils.TaskMultiReturn(name='task2', provides=['a', 'b', 'c']), + utils.TaskMultiArgOneReturn(name='task3', + rebind=['c', 'b', 'x'], provides='y')) + + engine = self._make_engine(flow) + engine.storage.inject({'x': 30}) + engine.run() + self.assertEquals(engine.storage.fetch_all(), { + 'a': 1, + 'b': 3, + 'c': 5, + 'x': 30, + 'y': 38, + 'z': 42 + }) + + def test_task_graph_property(self): + flow = gf.Flow('test').add( + utils.TaskNoRequiresNoReturns(name='task1'), + utils.TaskNoRequiresNoReturns(name='task2')) + + engine = self._make_engine(flow) + graph = engine.execution_graph + self.assertTrue(isinstance(graph, networkx.DiGraph)) + + def test_task_graph_property_for_one_task(self): + flow = utils.TaskNoRequiresNoReturns(name='task1') + + engine = self._make_engine(flow) + graph = engine.execution_graph + self.assertTrue(isinstance(graph, networkx.DiGraph)) + + +class SingleThreadedEngineTest(EngineTaskTest, + EngineLinearFlowTest, + EngineParallelFlowTest, + EngineGraphFlowTest, + test.TestCase): + def _make_engine(self, flow, flow_detail=None): + return taskflow.engines.load(flow, + flow_detail=flow_detail, + engine_conf='serial', + backend=self.backend) + + def test_correct_load(self): + engine = self._make_engine(utils.TaskNoRequiresNoReturns) + self.assertIsInstance(engine, eng.SingleThreadedActionEngine) + + def test_singlethreaded_is_the_default(self): + engine = taskflow.engines.load(utils.TaskNoRequiresNoReturns) + self.assertIsInstance(engine, eng.SingleThreadedActionEngine) + + +class MultiThreadedEngineTest(EngineTaskTest, + EngineLinearFlowTest, + EngineParallelFlowTest, + EngineGraphFlowTest, + test.TestCase): + def _make_engine(self, flow, flow_detail=None, executor=None): + engine_conf = dict(engine='parallel', + executor=executor) + return taskflow.engines.load(flow, flow_detail=flow_detail, + engine_conf=engine_conf, + backend=self.backend) + + def test_correct_load(self): + engine = self._make_engine(utils.TaskNoRequiresNoReturns) + self.assertIsInstance(engine, eng.MultiThreadedActionEngine) + self.assertIs(engine.executor, None) + + def test_using_common_executor(self): + flow = utils.TaskNoRequiresNoReturns(name='task1') + executor = futures.ThreadPoolExecutor(2) + try: + e1 = self._make_engine(flow, executor=executor) + e2 = self._make_engine(flow, executor=executor) + self.assertIs(e1.executor, e2.executor) + finally: + executor.shutdown(wait=True) diff --git a/taskflow/tests/unit/test_arguments_passing.py b/taskflow/tests/unit/test_arguments_passing.py new file mode 100644 index 000000000..9865301a5 --- /dev/null +++ b/taskflow/tests/unit/test_arguments_passing.py @@ -0,0 +1,143 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import taskflow.engines + +from taskflow import exceptions as exc +from taskflow import test +from taskflow.tests import utils + + +class ArgumentsPassingTest(utils.EngineTestBase): + + def test_save_as(self): + flow = utils.TaskOneReturn(name='task1', provides='first_data') + engine = self._make_engine(flow) + engine.run() + self.assertEquals(engine.storage.fetch_all(), {'first_data': 1}) + + def test_save_all_in_one(self): + flow = utils.TaskMultiReturn(provides='all_data') + engine = self._make_engine(flow) + engine.run() + self.assertEquals(engine.storage.fetch_all(), + {'all_data': (1, 3, 5)}) + + def test_save_several_values(self): + flow = utils.TaskMultiReturn(provides=('badger', 'mushroom', 'snake')) + engine = self._make_engine(flow) + engine.run() + self.assertEquals(engine.storage.fetch_all(), { + 'badger': 1, + 'mushroom': 3, + 'snake': 5 + }) + + def test_save_dict(self): + flow = utils.TaskMultiDictk(provides=set(['badger', + 'mushroom', + 'snake'])) + engine = self._make_engine(flow) + engine.run() + self.assertEquals(engine.storage.fetch_all(), { + 'badger': 0, + 'mushroom': 1, + 'snake': 2, + }) + + def test_bad_save_as_value(self): + with self.assertRaises(TypeError): + utils.TaskOneReturn(name='task1', provides=object()) + + def test_arguments_passing(self): + flow = utils.TaskMultiArgOneReturn(provides='result') + engine = self._make_engine(flow) + engine.storage.inject({'x': 1, 'y': 4, 'z': 9, 'a': 17}) + engine.run() + self.assertEquals(engine.storage.fetch_all(), { + 'x': 1, 'y': 4, 'z': 9, 'a': 17, + 'result': 14, + }) + + def test_arguments_missing(self): + flow = utils.TaskMultiArg() + engine = self._make_engine(flow) + engine.storage.inject({'a': 1, 'b': 4, 'x': 17}) + with self.assertRaises(exc.MissingDependencies): + engine.run() + + def test_partial_arguments_mapping(self): + flow = utils.TaskMultiArgOneReturn(provides='result', + rebind={'x': 'a'}) + engine = self._make_engine(flow) + engine.storage.inject({'x': 1, 'y': 4, 'z': 9, 'a': 17}) + engine.run() + self.assertEquals(engine.storage.fetch_all(), { + 'x': 1, 'y': 4, 'z': 9, 'a': 17, + 'result': 30, + }) + + def test_all_arguments_mapping(self): + flow = utils.TaskMultiArgOneReturn(provides='result', + rebind=['a', 'b', 'c']) + engine = self._make_engine(flow) + engine.storage.inject({ + 'a': 1, 'b': 2, 'c': 3, 'x': 4, 'y': 5, 'z': 6 + }) + engine.run() + self.assertEquals(engine.storage.fetch_all(), { + 'a': 1, 'b': 2, 'c': 3, 'x': 4, 'y': 5, 'z': 6, + 'result': 6, + }) + + def test_invalid_argument_name_map(self): + flow = utils.TaskMultiArg(rebind={'z': 'b'}) + engine = self._make_engine(flow) + engine.storage.inject({'a': 1, 'y': 4, 'c': 9, 'x': 17}) + with self.assertRaises(exc.MissingDependencies): + engine.run() + + def test_invalid_argument_name_list(self): + flow = utils.TaskMultiArg(rebind=['a', 'z', 'b']) + engine = self._make_engine(flow) + engine.storage.inject({'a': 1, 'b': 4, 'c': 9, 'x': 17}) + with self.assertRaises(exc.MissingDependencies): + engine.run() + + def test_bad_rebind_args_value(self): + with self.assertRaises(TypeError): + utils.TaskOneArg(rebind=object()) + + +class SingleThreadedEngineTest(ArgumentsPassingTest, + test.TestCase): + def _make_engine(self, flow, flow_detail=None): + return taskflow.engines.load(flow, + flow_detail=flow_detail, + engine_conf='serial', + backend=self.backend) + + +class MultiThreadedEngineTest(ArgumentsPassingTest, + test.TestCase): + def _make_engine(self, flow, flow_detail=None, executor=None): + engine_conf = dict(engine='parallel', + executor=executor) + return taskflow.engines.load(flow, flow_detail=flow_detail, + engine_conf=engine_conf, + backend=self.backend) diff --git a/taskflow/tests/unit/test_flow_dependencies.py b/taskflow/tests/unit/test_flow_dependencies.py index ffa14f634..97aa41323 100644 --- a/taskflow/tests/unit/test_flow_dependencies.py +++ b/taskflow/tests/unit/test_flow_dependencies.py @@ -16,233 +16,237 @@ # License for the specific language governing permissions and limitations # under the License. +from taskflow.patterns import graph_flow as gf from taskflow.patterns import linear_flow as lf from taskflow.patterns import unordered_flow as uf from taskflow import exceptions -from taskflow import task from taskflow import test - - -class TaskNoRequiresNoReturns(task.Task): - - def execute(self, **kwargs): - pass - - def revert(self, **kwargs): - pass - - -class TaskOneArg(task.Task): - - def execute(self, x, **kwargs): - pass - - def revert(self, x, **kwargs): - pass - - -class TaskMultiArg(task.Task): - - def execute(self, x, y, z, **kwargs): - pass - - def revert(self, x, y, z, **kwargs): - pass - - -class TaskOneReturn(task.Task): - - def execute(self, **kwargs): - return 1 - - def revert(self, **kwargs): - pass - - -class TaskMultiReturn(task.Task): - - def execute(self, **kwargs): - return 1, 3, 5 - - def revert(self, **kwargs): - pass - - -class TaskOneArgOneReturn(task.Task): - - def execute(self, x, **kwargs): - return 1 - - def revert(self, x, **kwargs): - pass - - -class TaskMultiArgMultiReturn(task.Task): - - def execute(self, x, y, z, **kwargs): - return 1, 3, 5 - - def revert(self, x, y, z, **kwargs): - pass +from taskflow.tests import utils class FlowDependenciesTest(test.TestCase): def test_task_without_dependencies(self): - flow = TaskNoRequiresNoReturns() + flow = utils.TaskNoRequiresNoReturns() self.assertEquals(flow.requires, set()) self.assertEquals(flow.provides, set()) def test_task_requires_default_values(self): - flow = TaskMultiArg() + flow = utils.TaskMultiArg() self.assertEquals(flow.requires, set(['x', 'y', 'z'])) self.assertEquals(flow.provides, set()) def test_task_requires_rebinded_mapped(self): - flow = TaskMultiArg(rebind={'x': 'a', 'y': 'b', 'z': 'c'}) + flow = utils.TaskMultiArg(rebind={'x': 'a', 'y': 'b', 'z': 'c'}) self.assertEquals(flow.requires, set(['a', 'b', 'c'])) self.assertEquals(flow.provides, set()) def test_task_requires_additional_values(self): - flow = TaskMultiArg(requires=['a', 'b']) + flow = utils.TaskMultiArg(requires=['a', 'b']) self.assertEquals(flow.requires, set(['a', 'b', 'x', 'y', 'z'])) self.assertEquals(flow.provides, set()) def test_task_provides_values(self): - flow = TaskMultiReturn(provides=['a', 'b', 'c']) + flow = utils.TaskMultiReturn(provides=['a', 'b', 'c']) self.assertEquals(flow.requires, set()) self.assertEquals(flow.provides, set(['a', 'b', 'c'])) def test_task_provides_and_requires_values(self): - flow = TaskMultiArgMultiReturn(provides=['a', 'b', 'c']) + flow = utils.TaskMultiArgMultiReturn(provides=['a', 'b', 'c']) self.assertEquals(flow.requires, set(['x', 'y', 'z'])) self.assertEquals(flow.provides, set(['a', 'b', 'c'])) def test_linear_flow_without_dependencies(self): flow = lf.Flow('lf').add( - TaskNoRequiresNoReturns('task1'), - TaskNoRequiresNoReturns('task2')) + utils.TaskNoRequiresNoReturns('task1'), + utils.TaskNoRequiresNoReturns('task2')) self.assertEquals(flow.requires, set()) self.assertEquals(flow.provides, set()) def test_linear_flow_reuires_values(self): flow = lf.Flow('lf').add( - TaskOneArg('task1'), - TaskMultiArg('task2')) + utils.TaskOneArg('task1'), + utils.TaskMultiArg('task2')) self.assertEquals(flow.requires, set(['x', 'y', 'z'])) self.assertEquals(flow.provides, set()) def test_linear_flow_reuires_rebind_values(self): flow = lf.Flow('lf').add( - TaskOneArg('task1', rebind=['q']), - TaskMultiArg('task2')) + utils.TaskOneArg('task1', rebind=['q']), + utils.TaskMultiArg('task2')) self.assertEquals(flow.requires, set(['x', 'y', 'z', 'q'])) self.assertEquals(flow.provides, set()) def test_linear_flow_provides_values(self): flow = lf.Flow('lf').add( - TaskOneReturn('task1', provides='x'), - TaskMultiReturn('task2', provides=['a', 'b', 'c'])) + utils.TaskOneReturn('task1', provides='x'), + utils.TaskMultiReturn('task2', provides=['a', 'b', 'c'])) self.assertEquals(flow.requires, set()) self.assertEquals(flow.provides, set(['x', 'a', 'b', 'c'])) def test_linear_flow_provides_out_of_order(self): with self.assertRaises(exceptions.InvariantViolationException): lf.Flow('lf').add( - TaskOneArg('task2'), - TaskOneReturn('task1', provides='x')) + utils.TaskOneArg('task2'), + utils.TaskOneReturn('task1', provides='x')) def test_linear_flow_provides_required_values(self): flow = lf.Flow('lf').add( - TaskOneReturn('task1', provides='x'), - TaskOneArg('task2')) + utils.TaskOneReturn('task1', provides='x'), + utils.TaskOneArg('task2')) self.assertEquals(flow.requires, set()) self.assertEquals(flow.provides, set(['x'])) def test_linear_flow_multi_provides_and_requires_values(self): flow = lf.Flow('lf').add( - TaskMultiArgMultiReturn('task1', - rebind=['a', 'b', 'c'], - provides=['x', 'y', 'q']), - TaskMultiArgMultiReturn('task2', - provides=['i', 'j', 'k'])) + utils.TaskMultiArgMultiReturn('task1', + rebind=['a', 'b', 'c'], + provides=['x', 'y', 'q']), + utils.TaskMultiArgMultiReturn('task2', + provides=['i', 'j', 'k'])) self.assertEquals(flow.requires, set(['a', 'b', 'c', 'z'])) self.assertEquals(flow.provides, set(['x', 'y', 'q', 'i', 'j', 'k'])) def test_linear_flow_self_requires(self): flow = lf.Flow('uf') with self.assertRaises(exceptions.InvariantViolationException): - flow.add(TaskNoRequiresNoReturns(rebind=['x'], provides='x')) + flow.add(utils.TaskNoRequiresNoReturns(rebind=['x'], provides='x')) def test_unordered_flow_without_dependencies(self): flow = uf.Flow('uf').add( - TaskNoRequiresNoReturns('task1'), - TaskNoRequiresNoReturns('task2')) + utils.TaskNoRequiresNoReturns('task1'), + utils.TaskNoRequiresNoReturns('task2')) self.assertEquals(flow.requires, set()) self.assertEquals(flow.provides, set()) def test_unordered_flow_self_requires(self): flow = uf.Flow('uf') with self.assertRaises(exceptions.InvariantViolationException): - flow.add(TaskNoRequiresNoReturns(rebind=['x'], provides='x')) + flow.add(utils.TaskNoRequiresNoReturns(rebind=['x'], provides='x')) def test_unordered_flow_reuires_values(self): flow = uf.Flow('uf').add( - TaskOneArg('task1'), - TaskMultiArg('task2')) + utils.TaskOneArg('task1'), + utils.TaskMultiArg('task2')) self.assertEquals(flow.requires, set(['x', 'y', 'z'])) self.assertEquals(flow.provides, set()) def test_unordered_flow_reuires_rebind_values(self): flow = uf.Flow('uf').add( - TaskOneArg('task1', rebind=['q']), - TaskMultiArg('task2')) + utils.TaskOneArg('task1', rebind=['q']), + utils.TaskMultiArg('task2')) self.assertEquals(flow.requires, set(['x', 'y', 'z', 'q'])) self.assertEquals(flow.provides, set()) def test_unordered_flow_provides_values(self): flow = uf.Flow('uf').add( - TaskOneReturn('task1', provides='x'), - TaskMultiReturn('task2', provides=['a', 'b', 'c'])) + utils.TaskOneReturn('task1', provides='x'), + utils.TaskMultiReturn('task2', provides=['a', 'b', 'c'])) self.assertEquals(flow.requires, set()) self.assertEquals(flow.provides, set(['x', 'a', 'b', 'c'])) def test_unordered_flow_provides_required_values(self): with self.assertRaises(exceptions.InvariantViolationException): uf.Flow('uf').add( - TaskOneReturn('task1', provides='x'), - TaskOneArg('task2')) + utils.TaskOneReturn('task1', provides='x'), + utils.TaskOneArg('task2')) def test_unordered_flow_requires_provided_value_other_call(self): flow = uf.Flow('uf') - flow.add(TaskOneReturn('task1', provides='x')) + flow.add(utils.TaskOneReturn('task1', provides='x')) with self.assertRaises(exceptions.InvariantViolationException): - flow.add(TaskOneArg('task2')) + flow.add(utils.TaskOneArg('task2')) def test_unordered_flow_provides_required_value_other_call(self): flow = uf.Flow('uf') - flow.add(TaskOneArg('task2')) + flow.add(utils.TaskOneArg('task2')) with self.assertRaises(exceptions.InvariantViolationException): - flow.add(TaskOneReturn('task1', provides='x')) + flow.add(utils.TaskOneReturn('task1', provides='x')) def test_unordered_flow_multi_provides_and_requires_values(self): flow = uf.Flow('uf').add( - TaskMultiArgMultiReturn('task1', - rebind=['a', 'b', 'c'], - provides=['d', 'e', 'f']), - TaskMultiArgMultiReturn('task2', - provides=['i', 'j', 'k'])) + utils.TaskMultiArgMultiReturn('task1', + rebind=['a', 'b', 'c'], + provides=['d', 'e', 'f']), + utils.TaskMultiArgMultiReturn('task2', + provides=['i', 'j', 'k'])) self.assertEquals(flow.requires, set(['a', 'b', 'c', 'x', 'y', 'z'])) self.assertEquals(flow.provides, set(['d', 'e', 'f', 'i', 'j', 'k'])) def test_nested_flows_requirements(self): flow = uf.Flow('uf').add( lf.Flow('lf').add( - TaskOneArgOneReturn('task1', rebind=['a'], provides=['x']), - TaskOneArgOneReturn('task2', provides=['y'])), + utils.TaskOneArgOneReturn('task1', + rebind=['a'], provides=['x']), + utils.TaskOneArgOneReturn('task2', provides=['y'])), uf.Flow('uf').add( - TaskOneArgOneReturn('task3', rebind=['b'], provides=['z']), - TaskOneArgOneReturn('task4', rebind=['c'], provides=['q']))) + utils.TaskOneArgOneReturn('task3', + rebind=['b'], provides=['z']), + utils.TaskOneArgOneReturn('task4', rebind=['c'], + provides=['q']))) self.assertEquals(flow.requires, set(['a', 'b', 'c'])) self.assertEquals(flow.provides, set(['x', 'y', 'z', 'q'])) + + def test_graph_flow_without_dependencies(self): + flow = gf.Flow('gf').add( + utils.TaskNoRequiresNoReturns('task1'), + utils.TaskNoRequiresNoReturns('task2')) + self.assertEquals(flow.requires, set()) + self.assertEquals(flow.provides, set()) + + def test_graph_flow_self_requires(self): + with self.assertRaisesRegexp(exceptions.DependencyFailure, '^No path'): + gf.Flow('g-1-req-error').add( + utils.TaskOneArgOneReturn(requires=['a'], provides='a')) + + def test_graph_flow_reuires_values(self): + flow = gf.Flow('gf').add( + utils.TaskOneArg('task1'), + utils.TaskMultiArg('task2')) + self.assertEquals(flow.requires, set(['x', 'y', 'z'])) + self.assertEquals(flow.provides, set()) + + def test_graph_flow_reuires_rebind_values(self): + flow = gf.Flow('gf').add( + utils.TaskOneArg('task1', rebind=['q']), + utils.TaskMultiArg('task2')) + self.assertEquals(flow.requires, set(['x', 'y', 'z', 'q'])) + self.assertEquals(flow.provides, set()) + + def test_graph_flow_provides_values(self): + flow = gf.Flow('gf').add( + utils.TaskOneReturn('task1', provides='x'), + utils.TaskMultiReturn('task2', provides=['a', 'b', 'c'])) + self.assertEquals(flow.requires, set()) + self.assertEquals(flow.provides, set(['x', 'a', 'b', 'c'])) + + def test_graph_flow_provides_required_values(self): + flow = gf.Flow('gf').add( + utils.TaskOneReturn('task1', provides='x'), + utils.TaskOneArg('task2')) + self.assertEquals(flow.requires, set()) + self.assertEquals(flow.provides, set(['x'])) + + def test_graph_flow_provides_provided_value_other_call(self): + flow = gf.Flow('gf') + flow.add(utils.TaskOneReturn('task1', provides='x')) + with self.assertRaises(exceptions.DependencyFailure): + flow.add(utils.TaskOneReturn('task2', provides='x')) + + def test_graph_flow_multi_provides_and_requires_values(self): + flow = gf.Flow('gf').add( + utils.TaskMultiArgMultiReturn('task1', + rebind=['a', 'b', 'c'], + provides=['d', 'e', 'f']), + utils.TaskMultiArgMultiReturn('task2', + provides=['i', 'j', 'k'])) + self.assertEquals(flow.requires, set(['a', 'b', 'c', 'x', 'y', 'z'])) + self.assertEquals(flow.provides, set(['d', 'e', 'f', 'i', 'j', 'k'])) + + def test_graph_cyclic_dependency(self): + with self.assertRaisesRegexp(exceptions.DependencyFailure, '^No path'): + gf.Flow('g-3-cyclic').add( + utils.TaskOneArgOneReturn(provides='a', requires=['b']), + utils.TaskOneArgOneReturn(provides='b', requires=['c']), + utils.TaskOneArgOneReturn(provides='c', requires=['a'])) diff --git a/taskflow/tests/unit/test_graph_flow.py b/taskflow/tests/unit/test_graph_flow.py index 04d3fba26..85508bb2d 100644 --- a/taskflow/tests/unit/test_graph_flow.py +++ b/taskflow/tests/unit/test_graph_flow.py @@ -20,9 +20,7 @@ import collections import taskflow.engines -from taskflow import exceptions as exc from taskflow.patterns import graph_flow as gw -from taskflow import states from taskflow.utils import flow_utils as fu from taskflow.utils import graph_utils as gu @@ -32,7 +30,7 @@ from taskflow.tests import utils class GraphFlowTest(test.TestCase): def _make_engine(self, flow): - return taskflow.engines.load(flow, store={'context': {}}) + return taskflow.engines.load(flow, store={}) def _capture_states(self): # TODO(harlowja): move function to shared helper @@ -64,26 +62,6 @@ class GraphFlowTest(test.TestCase): self.assertEquals([test_1], list(gu.get_no_predecessors(wf.graph))) self.assertEquals([test_3], list(gu.get_no_successors(wf.graph))) - def test_invalid_add_simple(self): - wf = gw.Flow("the-test-action") - test_1 = utils.ProvidesRequiresTask('test-1', - requires=['a'], - provides=set(['a', 'b'])) - self.assertRaises(exc.DependencyFailure, wf.add, test_1) - self.assertEquals(0, len(wf)) - - def test_invalid_add_loop(self): - wf = gw.Flow("the-test-action") - test_1 = utils.ProvidesRequiresTask('test-1', - requires=['c'], - provides=set(['a', 'b'])) - test_2 = utils.ProvidesRequiresTask('test-2', - requires=['a', 'b'], - provides=set(['c'])) - wf.add(test_1) - self.assertRaises(exc.DependencyFailure, wf.add, test_2) - self.assertEquals(1, len(wf)) - def test_basic_edge_reasons(self): wf = gw.Flow("the-test-action") test_1 = utils.ProvidesRequiresTask('test-1', @@ -136,28 +114,3 @@ class GraphFlowTest(test.TestCase): edge_attrs = gu.get_edge_attrs(g, test_1, test_2) self.assertTrue(edge_attrs.get('manual')) self.assertTrue(edge_attrs.get('flatten')) - - def test_graph_run(self): - wf = gw.Flow("the-test-action") - test_1 = utils.ProvidesRequiresTask('test-1', - requires=[], - provides=[]) - test_2 = utils.ProvidesRequiresTask('test-2', - provides=[], - requires=[]) - wf.add(test_1, test_2) - wf.link(test_1, test_2) - self.assertEquals(2, len(wf)) - - e = self._make_engine(wf) - capture_func, captured = self._capture_states() - e.task_notifier.register('*', capture_func) - e.run() - - self.assertEquals(2, len(captured)) - for (_uuid, t_states) in captured.items(): - self.assertEquals([states.RUNNING, states.SUCCESS], t_states) - - run_context = e.storage.fetch('context') - ordering = [o['name'] for o in run_context[utils.ORDER_KEY]] - self.assertEquals(['test-1', 'test-2'], ordering) diff --git a/taskflow/tests/unit/test_linear_flow.py b/taskflow/tests/unit/test_linear_flow.py deleted file mode 100644 index 159b2556e..000000000 --- a/taskflow/tests/unit/test_linear_flow.py +++ /dev/null @@ -1,260 +0,0 @@ -# -*- coding: utf-8 -*- - -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import collections - -import taskflow.engines -from taskflow import exceptions as exc -from taskflow.patterns import linear_flow as lw -from taskflow import states -from taskflow import task -from taskflow import test - -from taskflow.tests import utils - - -class LinearFlowTest(test.TestCase): - def _make_engine(self, flow): - return taskflow.engines.load(flow, store={'context': {}}) - - def test_result_access(self): - - def do_apply1(context): - return [1, 2] - - wf = lw.Flow("the-test-action") - wf.add(task.FunctorTask(do_apply1, provides=['a', 'b'])) - - e = self._make_engine(wf) - e.run() - data = e.storage.fetch_all() - self.assertIn('a', data) - self.assertIn('b', data) - self.assertEquals(2, data['b']) - self.assertEquals(1, data['a']) - - def test_functor_flow(self): - wf = lw.Flow("the-test-action") - - def do_apply1(context): - context['1'] = True - return ['a', 'b', 'c'] - - def do_apply2(context, a, **kwargs): - self.assertTrue('c' in kwargs) - self.assertEquals('a', a) - context['2'] = True - - wf.add(task.FunctorTask(do_apply1, provides=['a', 'b', 'c'])) - wf.add(task.FunctorTask(do_apply2, requires=set(['c']))) - - e = self._make_engine(wf) - e.run() - self.assertEquals(2, len(e.storage.fetch('context'))) - - def test_sad_flow_state_changes(self): - changes = [] - task_changes = [] - - def listener(state, details): - changes.append(state) - - def task_listener(state, details): - if details.get('task_name') == 'blowup_1': - task_changes.append(state) - - wf = lw.Flow("the-test-action") - wf.add(utils.make_reverting_task(2, False)) - wf.add(utils.make_reverting_task(1, True)) - - e = self._make_engine(wf) - e.notifier.register('*', listener) - e.task_notifier.register('*', task_listener) - self.assertRaises(Exception, e.run) - - expected_states = [ - states.RUNNING, - states.FAILURE, - states.REVERTING, - states.REVERTED, - ] - self.assertEquals(expected_states, changes) - expected_states = [ - states.RUNNING, - states.FAILURE, - states.REVERTING, - states.REVERTED, - states.PENDING, - ] - self.assertEquals(expected_states, task_changes) - context = e.storage.fetch('context') - - # Only 2 should have been reverted (which should have been - # marked in the context as occuring). - self.assertIn(2, context) - self.assertEquals('reverted', context[2]) - self.assertNotIn(1, context) - - def test_happy_flow_state_changes(self): - changes = [] - - def listener(state, details): - changes.append(state) - - wf = lw.Flow("the-test-action") - wf.add(utils.make_reverting_task(1)) - - e = self._make_engine(wf) - e.notifier.register('*', listener) - e.run() - - self.assertEquals([states.RUNNING, states.SUCCESS], changes) - - def test_happy_flow(self): - wf = lw.Flow("the-test-action") - for i in range(0, 10): - wf.add(utils.make_reverting_task(i)) - - e = self._make_engine(wf) - capture_func, captured = self._capture_states() - e.task_notifier.register('*', capture_func) - e.run() - - context = e.storage.fetch('context') - self.assertEquals(10, len(context)) - self.assertEquals(10, len(captured)) - for _k, v in context.items(): - self.assertEquals('passed', v) - for _uuid, u_states in captured.items(): - self.assertEquals([states.RUNNING, states.SUCCESS], u_states) - - def _capture_states(self): - capture_where = collections.defaultdict(list) - - def do_capture(state, details): - task_uuid = details.get('task_uuid') - if not task_uuid: - return - capture_where[task_uuid].append(state) - - return (do_capture, capture_where) - - def test_reverting_flow(self): - wf = lw.Flow("the-test-action") - wf.add(utils.make_reverting_task(1)) - wf.add(utils.make_reverting_task(2, True)) - - capture_func, captured = self._capture_states() - e = self._make_engine(wf) - e.task_notifier.register('*', capture_func) - - self.assertRaises(Exception, e.run) - - run_context = e.storage.fetch('context') - self.assertEquals('reverted', run_context[1]) - self.assertEquals(1, len(run_context)) - - blowup_id = e.storage.get_uuid_by_name('blowup_2') - happy_id = e.storage.get_uuid_by_name('do_apply_1') - self.assertEquals(2, len(captured)) - self.assertIn(blowup_id, captured) - - expected_states = [states.RUNNING, states.FAILURE, states.REVERTING, - states.REVERTED, states.PENDING] - self.assertEquals(expected_states, captured[blowup_id]) - - expected_states = [states.RUNNING, states.SUCCESS, states.REVERTING, - states.REVERTED, states.PENDING] - self.assertIn(happy_id, captured) - self.assertEquals(expected_states, captured[happy_id]) - - def test_not_satisfied_inputs(self): - - def task_a(context, *args, **kwargs): - pass - - def task_b(context, c, *args, **kwargs): - pass - - wf = lw.Flow("the-test-action") - wf.add(task.FunctorTask(task_a)) - wf.add(task.FunctorTask(task_b)) - e = self._make_engine(wf) - self.assertRaises(exc.MissingDependencies, e.run) - - def test_flow_bad_order(self): - wf = lw.Flow("the-test-action") - - wf.add(utils.ProvidesRequiresTask('test-1', - requires=set(), - provides=['a', 'b'])) - - # This one should fail to add since it requires 'c' - no_req_task = utils.ProvidesRequiresTask('test-2', requires=['c'], - provides=[]) - wf.add(no_req_task) - e = self._make_engine(wf) - self.assertRaises(exc.MissingDependencies, e.run) - - def test_flow_set_order(self): - wf = lw.Flow("the-test-action") - wf.add(utils.ProvidesRequiresTask('test-1', - requires=[], - provides=set(['a', 'b']))) - wf.add(utils.ProvidesRequiresTask('test-2', - requires=set(['a', 'b']), - provides=set([]))) - e = self._make_engine(wf) - e.run() - run_context = e.storage.fetch('context') - ordering = run_context[utils.ORDER_KEY] - self.assertEquals(2, len(ordering)) - self.assertEquals('test-1', ordering[0]['name']) - self.assertEquals('test-2', ordering[1]['name']) - self.assertEquals({'a': 'a', 'b': 'b'}, - ordering[1][utils.KWARGS_KEY]) - self.assertEquals({}, - ordering[0][utils.KWARGS_KEY]) - - def test_flow_list_order(self): - wf = lw.Flow("the-test-action") - wf.add(utils.ProvidesRequiresTask('test-1', - requires=[], - provides=['a', 'b'])) - wf.add(utils.ProvidesRequiresTask('test-2', - requires=['a', 'b'], - provides=['c', 'd'])) - wf.add(utils.ProvidesRequiresTask('test-3', - requires=['c', 'd'], - provides=[])) - wf.add(utils.ProvidesRequiresTask('test-4', - requires=[], - provides=['d'])) - wf.add(utils.ProvidesRequiresTask('test-5', - requires=[], - provides=['d'])) - wf.add(utils.ProvidesRequiresTask('test-6', - requires=['d'], - provides=[])) - - e = self._make_engine(wf) - e.run() - run_context = e.storage.fetch('context') - ordering = run_context[utils.ORDER_KEY] - for i, entry in enumerate(ordering): - self.assertEquals('test-%s' % (i + 1), entry['name']) diff --git a/taskflow/tests/unit/test_suspend_flow.py b/taskflow/tests/unit/test_suspend_flow.py new file mode 100644 index 000000000..a4c8ed5f9 --- /dev/null +++ b/taskflow/tests/unit/test_suspend_flow.py @@ -0,0 +1,180 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import time + +from taskflow.patterns import linear_flow as lf + +import taskflow.engines + +from taskflow import exceptions as exc +from taskflow import states +from taskflow import task +from taskflow import test +from taskflow.tests import utils + + +class TestTask(task.Task): + + def __init__(self, values=None, name=None, sleep=None, + provides=None, rebind=None, requires=None): + super(TestTask, self).__init__(name=name, provides=provides, + rebind=rebind, requires=requires) + if values is None: + self.values = [] + else: + self.values = values + self._sleep = sleep + + def execute(self, **kwargs): + self.update_progress(0.0) + if self._sleep: + time.sleep(self._sleep) + self.values.append(self.name) + self.update_progress(1.0) + return 5 + + def revert(self, **kwargs): + self.update_progress(0) + if self._sleep: + time.sleep(self._sleep) + self.values.append(self.name + ' reverted(%s)' + % kwargs.get('result')) + self.update_progress(1.0) + + +class FailingTask(TestTask): + + def execute(self, **kwargs): + self.update_progress(0) + if self._sleep: + time.sleep(self._sleep) + self.update_progress(0.99) + raise RuntimeError('Woot!') + + +class AutoSuspendingTask(TestTask): + + def execute(self, engine): + result = super(AutoSuspendingTask, self).execute() + engine.suspend() + return result + + def revert(self, engine, result): + super(AutoSuspendingTask, self).revert(**{'result': result}) + + +class AutoSuspendingTaskOnRevert(TestTask): + + def execute(self, engine): + return super(AutoSuspendingTaskOnRevert, self).execute() + + def revert(self, engine, result): + super(AutoSuspendingTaskOnRevert, self).revert(**{'result': result}) + engine.suspend() + + +class SuspendFlowTest(utils.EngineTestBase): + + def test_suspend_one_task(self): + flow = AutoSuspendingTask(self.values, 'a') + engine = self._make_engine(flow) + engine.storage.inject({'engine': engine}) + engine.run() + self.assertEquals(engine.storage.get_flow_state(), states.SUCCESS) + self.assertEquals(self.values, ['a']) + engine.run() + self.assertEquals(engine.storage.get_flow_state(), states.SUCCESS) + self.assertEquals(self.values, ['a']) + + def test_suspend_linear_flow(self): + flow = lf.Flow('linear').add( + TestTask(self.values, 'a'), + AutoSuspendingTask(self.values, 'b'), + TestTask(self.values, 'c') + ) + engine = self._make_engine(flow) + engine.storage.inject({'engine': engine}) + engine.run() + self.assertEquals(engine.storage.get_flow_state(), states.SUSPENDED) + self.assertEquals(self.values, ['a', 'b']) + engine.run() + self.assertEquals(engine.storage.get_flow_state(), states.SUCCESS) + self.assertEquals(self.values, ['a', 'b', 'c']) + + def test_suspend_linear_flow_on_revert(self): + flow = lf.Flow('linear').add( + TestTask(self.values, 'a'), + AutoSuspendingTaskOnRevert(self.values, 'b'), + FailingTask(self.values, 'c') + ) + engine = self._make_engine(flow) + engine.storage.inject({'engine': engine}) + engine.run() + self.assertEquals(engine.storage.get_flow_state(), states.SUSPENDED) + self.assertEquals( + self.values, + ['a', 'b', + 'c reverted(Failure: RuntimeError: Woot!)', + 'b reverted(5)']) + with self.assertRaisesRegexp(RuntimeError, '^Woot'): + engine.run() + self.assertEquals(engine.storage.get_flow_state(), states.REVERTED) + self.assertEquals( + self.values, + ['a', + 'b', + 'c reverted(Failure: RuntimeError: Woot!)', + 'b reverted(5)', + 'a reverted(5)']) + + def test_storage_is_rechecked(self): + flow = lf.Flow('linear').add( + AutoSuspendingTask(self.values, 'b'), + TestTask(self.values, name='c') + ) + engine = self._make_engine(flow) + engine.storage.inject({'engine': engine, 'boo': True}) + engine.run() + self.assertEquals(engine.storage.get_flow_state(), states.SUSPENDED) + # uninject engine + engine.storage.save( + engine.storage.get_uuid_by_name(engine.storage.injector_name), + None, + states.FAILURE) + with self.assertRaises(exc.MissingDependencies): + engine.run() + + +class SingleThreadedEngineTest(SuspendFlowTest, + test.TestCase): + def _make_engine(self, flow, flow_detail=None): + return taskflow.engines.load(flow, + flow_detail=flow_detail, + engine_conf='serial', + backend=self.backend) + + +class MultiThreadedEngineTest(SuspendFlowTest, + test.TestCase): + def _make_engine(self, flow, flow_detail=None, executor=None): + engine_conf = dict(engine='parallel', + executor=executor) + return taskflow.engines.load(flow, flow_detail=flow_detail, + engine_conf=engine_conf, + backend=self.backend) diff --git a/taskflow/tests/utils.py b/taskflow/tests/utils.py index db31a2a5b..81107ae80 100644 --- a/taskflow/tests/utils.py +++ b/taskflow/tests/utils.py @@ -16,8 +16,11 @@ # License for the specific language governing permissions and limitations # under the License. +import contextlib import six +import time +from taskflow.persistence.backends import impl_memory from taskflow import task ARGS_KEY = '__args__' @@ -45,27 +48,6 @@ def make_reverting_task(token, blowup=False): name='do_apply_%s' % token) -class ProvidesRequiresTask(task.Task): - def __init__(self, name, provides, requires, return_tuple=True): - super(ProvidesRequiresTask, self).__init__(name=name, - provides=provides, - requires=requires) - self.return_tuple = isinstance(provides, (tuple, list)) - - def execute(self, context, *args, **kwargs): - if ORDER_KEY not in context: - context[ORDER_KEY] = [] - context[ORDER_KEY].append({ - 'name': self.name, - KWARGS_KEY: kwargs, - ARGS_KEY: args, - }) - if self.return_tuple: - return tuple(range(len(self.provides))) - else: - return dict((k, k) for k in self.provides) - - class DummyTask(task.Task): def execute(self, context, *args, **kwargs): pass @@ -77,3 +59,168 @@ if six.PY3: else: RUNTIME_ERROR_CLASSES = ['RuntimeError', 'StandardError', 'Exception', 'BaseException', 'object'] + + +class ProvidesRequiresTask(task.Task): + def __init__(self, name, provides, requires, return_tuple=True): + super(ProvidesRequiresTask, self).__init__(name=name, + provides=provides, + requires=requires) + self.return_tuple = isinstance(provides, (tuple, list)) + + def execute(self, *args, **kwargs): + if self.return_tuple: + return tuple(range(len(self.provides))) + else: + return dict((k, k) for k in self.provides) + + +class SaveOrderTask(task.Task): + + def __init__(self, values=None, name=None, sleep=None, + *args, **kwargs): + super(SaveOrderTask, self).__init__(name=name, *args, **kwargs) + if values is None: + self.values = [] + else: + self.values = values + self._sleep = sleep + + def execute(self, **kwargs): + self.update_progress(0.0) + if self._sleep: + time.sleep(self._sleep) + self.values.append(self.name) + self.update_progress(1.0) + return 5 + + def revert(self, **kwargs): + self.update_progress(0) + if self._sleep: + time.sleep(self._sleep) + self.values.append(self.name + ' reverted(%s)' + % kwargs.get('result')) + self.update_progress(1.0) + + +class FailingTask(SaveOrderTask): + + def execute(self, **kwargs): + self.update_progress(0) + if self._sleep: + time.sleep(self._sleep) + self.update_progress(0.99) + raise RuntimeError('Woot!') + + +class NastyTask(task.Task): + def execute(self, **kwargs): + pass + + def revert(self, **kwargs): + raise RuntimeError('Gotcha!') + + +class TaskNoRequiresNoReturns(task.Task): + + def execute(self, **kwargs): + pass + + def revert(self, **kwargs): + pass + + +class TaskOneArg(task.Task): + + def execute(self, x, **kwargs): + pass + + def revert(self, x, **kwargs): + pass + + +class TaskMultiArg(task.Task): + + def execute(self, x, y, z, **kwargs): + pass + + def revert(self, x, y, z, **kwargs): + pass + + +class TaskOneReturn(task.Task): + + def execute(self, **kwargs): + return 1 + + def revert(self, **kwargs): + pass + + +class TaskMultiReturn(task.Task): + + def execute(self, **kwargs): + return 1, 3, 5 + + def revert(self, **kwargs): + pass + + +class TaskOneArgOneReturn(task.Task): + + def execute(self, x, **kwargs): + return 1 + + def revert(self, x, **kwargs): + pass + + +class TaskMultiArgOneReturn(task.Task): + + def execute(self, x, y, z, **kwargs): + return x + y + z + + def revert(self, x, y, z, **kwargs): + pass + + +class TaskMultiArgMultiReturn(task.Task): + + def execute(self, x, y, z, **kwargs): + return 1, 3, 5 + + def revert(self, x, y, z, **kwargs): + pass + + +class TaskMultiDictk(task.Task): + + def execute(self): + output = {} + for i, k in enumerate(sorted(self.provides)): + output[k] = i + return output + + +class NeverRunningTask(task.Task): + def execute(self, **kwargs): + assert False, 'This method should not be called' + + def revert(self, **kwargs): + assert False, 'This method should not be called' + + +class EngineTestBase(object): + def setUp(self): + super(EngineTestBase, self).setUp() + self.values = [] + self.backend = impl_memory.MemoryBackend(conf={}) + + def tearDown(self): + super(EngineTestBase, self).tearDown() + with contextlib.closing(self.backend) as be: + with contextlib.closing(be.get_connection()) as conn: + conn.clear_all() + + def _make_engine(self, flow, flow_detail=None): + raise NotImplementedError()