Merge "Unit tests refactoring"

This commit is contained in:
Jenkins 2013-10-18 19:58:12 +00:00 committed by Gerrit Code Review
commit ba129e2201
7 changed files with 828 additions and 969 deletions

View File

@ -18,7 +18,6 @@
import contextlib import contextlib
import networkx import networkx
import time
from concurrent import futures from concurrent import futures
@ -29,134 +28,17 @@ from taskflow.patterns import unordered_flow as uf
import taskflow.engines import taskflow.engines
from taskflow.engines.action_engine import engine as eng from taskflow.engines.action_engine import engine as eng
from taskflow import exceptions as exc
from taskflow.persistence.backends import impl_memory
from taskflow.persistence import logbook from taskflow.persistence import logbook
from taskflow import states from taskflow import states
from taskflow import task
from taskflow import test from taskflow import test
from taskflow.tests import utils
from taskflow.utils import persistence_utils as p_utils from taskflow.utils import persistence_utils as p_utils
class TestTask(task.Task): class EngineTaskTest(utils.EngineTestBase):
def __init__(self, values=None, name=None, sleep=None,
provides=None, rebind=None, requires=None):
super(TestTask, self).__init__(name=name, provides=provides,
rebind=rebind, requires=requires)
if values is None:
self.values = []
else:
self.values = values
self._sleep = sleep
def execute(self, **kwargs):
self.update_progress(0.0)
if self._sleep:
time.sleep(self._sleep)
self.values.append(self.name)
self.update_progress(1.0)
return 5
def revert(self, **kwargs):
self.update_progress(0)
if self._sleep:
time.sleep(self._sleep)
self.values.append(self.name + ' reverted(%s)'
% kwargs.get('result'))
self.update_progress(1.0)
class FailingTask(TestTask):
def execute(self, **kwargs):
self.update_progress(0)
if self._sleep:
time.sleep(self._sleep)
self.update_progress(0.99)
raise RuntimeError('Woot!')
class NeverRunningTask(task.Task):
def execute(self, **kwargs):
assert False, 'This method should not be called'
def revert(self, **kwargs):
assert False, 'This method should not be called'
class NastyTask(task.Task):
def execute(self, **kwargs):
pass
def revert(self, **kwargs):
raise RuntimeError('Gotcha!')
class MultiReturnTask(task.Task):
def execute(self, **kwargs):
return 12, 2, 1
class MultiargsTask(task.Task):
def execute(self, a, b, c):
return a + b + c
class MultiDictTask(task.Task):
def execute(self):
self.update_progress(0)
output = {}
total = len(sorted(self.provides))
for i, k in enumerate(sorted(self.provides)):
output[k] = i
self.update_progress(i / total)
self.update_progress(1.0)
return output
class AutoSuspendingTask(TestTask):
def execute(self, engine):
result = super(AutoSuspendingTask, self).execute()
engine.suspend()
return result
def revert(self, engine, result):
super(AutoSuspendingTask, self).revert(**{'result': result})
class AutoSuspendingTaskOnRevert(TestTask):
def execute(self, engine):
return super(AutoSuspendingTaskOnRevert, self).execute()
def revert(self, engine, result):
super(AutoSuspendingTaskOnRevert, self).revert(**{'result': result})
engine.suspend()
class EngineTestBase(object):
def setUp(self):
super(EngineTestBase, self).setUp()
self.values = []
self.backend = impl_memory.MemoryBackend(conf={})
def tearDown(self):
super(EngineTestBase, self).tearDown()
with contextlib.closing(self.backend) as be:
with contextlib.closing(be.get_connection()) as conn:
conn.clear_all()
def _make_engine(self, flow, flow_detail=None):
raise NotImplementedError()
class EngineTaskTest(EngineTestBase):
def test_run_task_as_flow(self): def test_run_task_as_flow(self):
flow = lf.Flow('test-1') flow = utils.SaveOrderTask(self.values, name='task1')
flow.add(TestTask(self.values, name='task1'))
engine = self._make_engine(flow) engine = self._make_engine(flow)
engine.run() engine.run()
self.assertEquals(self.values, ['task1']) self.assertEquals(self.values, ['task1'])
@ -171,7 +53,7 @@ class EngineTaskTest(EngineTestBase):
values.append('flow %s' % state) values.append('flow %s' % state)
def test_run_task_with_notifications(self): def test_run_task_with_notifications(self):
flow = TestTask(self.values, name='task1') flow = utils.SaveOrderTask(self.values, name='task1')
engine = self._make_engine(flow) engine = self._make_engine(flow)
engine.notifier.register('*', self._flow_callback, engine.notifier.register('*', self._flow_callback,
kwargs={'values': self.values}) kwargs={'values': self.values})
@ -186,7 +68,7 @@ class EngineTaskTest(EngineTestBase):
'flow SUCCESS']) 'flow SUCCESS'])
def test_failing_task_with_notifications(self): def test_failing_task_with_notifications(self):
flow = FailingTask(self.values, 'fail') flow = utils.FailingTask(self.values, 'fail')
engine = self._make_engine(flow) engine = self._make_engine(flow)
engine.notifier.register('*', self._flow_callback, engine.notifier.register('*', self._flow_callback,
kwargs={'values': self.values}) kwargs={'values': self.values})
@ -221,132 +103,30 @@ class EngineTaskTest(EngineTestBase):
engine.run() engine.run()
self.assertIn(value, str(err.exception)) self.assertIn(value, str(err.exception))
def test_save_as(self):
flow = TestTask(self.values, name='task1', provides='first_data')
engine = self._make_engine(flow)
engine.run()
self.assertEquals(self.values, ['task1'])
self.assertEquals(engine.storage.fetch_all(), {'first_data': 5})
def test_save_all_in_one(self): class EngineLinearFlowTest(utils.EngineTestBase):
flow = MultiReturnTask(provides='all_data')
engine = self._make_engine(flow)
engine.run()
self.assertEquals(engine.storage.fetch_all(),
{'all_data': (12, 2, 1)})
def test_save_several_values(self):
flow = MultiReturnTask(provides=('badger', 'mushroom', 'snake'))
engine = self._make_engine(flow)
engine.run()
self.assertEquals(engine.storage.fetch_all(), {
'badger': 12,
'mushroom': 2,
'snake': 1
})
def test_save_dict(self):
flow = MultiDictTask(provides=set(['badger', 'mushroom', 'snake']))
engine = self._make_engine(flow)
engine.run()
self.assertEquals(engine.storage.fetch_all(), {
'badger': 0,
'mushroom': 1,
'snake': 2,
})
def test_bad_save_as_value(self):
with self.assertRaises(TypeError):
TestTask(name='task1', provides=object())
def test_arguments_passing(self):
flow = MultiargsTask(provides='result')
engine = self._make_engine(flow)
engine.storage.inject({'a': 1, 'b': 4, 'c': 9, 'x': 17})
engine.run()
self.assertEquals(engine.storage.fetch_all(), {
'a': 1, 'b': 4, 'c': 9, 'x': 17,
'result': 14,
})
def test_arguments_missing(self):
flow = MultiargsTask(provides='result')
engine = self._make_engine(flow)
engine.storage.inject({'a': 1, 'b': 4, 'x': 17})
with self.assertRaises(exc.MissingDependencies):
engine.run()
def test_partial_arguments_mapping(self):
flow = MultiargsTask(name='task1',
provides='result',
rebind={'b': 'x'})
engine = self._make_engine(flow)
engine.storage.inject({'a': 1, 'b': 4, 'c': 9, 'x': 17})
engine.run()
self.assertEquals(engine.storage.fetch_all(), {
'a': 1, 'b': 4, 'c': 9, 'x': 17,
'result': 27,
})
def test_all_arguments_mapping(self):
flow = MultiargsTask(name='task1',
provides='result',
rebind=['x', 'y', 'z'])
engine = self._make_engine(flow)
engine.storage.inject({
'a': 1, 'b': 2, 'c': 3, 'x': 4, 'y': 5, 'z': 6
})
engine.run()
self.assertEquals(engine.storage.fetch_all(), {
'a': 1, 'b': 2, 'c': 3, 'x': 4, 'y': 5, 'z': 6,
'result': 15,
})
def test_invalid_argument_name_map(self):
flow = MultiargsTask(name='task1', provides='result',
rebind={'b': 'z'})
engine = self._make_engine(flow)
engine.storage.inject({'a': 1, 'b': 4, 'c': 9, 'x': 17})
with self.assertRaises(exc.MissingDependencies):
engine.run()
def test_invalid_argument_name_list(self):
flow = MultiargsTask(name='task1',
provides='result',
rebind=['a', 'z', 'b'])
engine = self._make_engine(flow)
engine.storage.inject({'a': 1, 'b': 4, 'c': 9, 'x': 17})
with self.assertRaises(exc.MissingDependencies):
engine.run()
def test_bad_rebind_args_value(self):
with self.assertRaises(TypeError):
TestTask(name='task1',
rebind=object())
class EngineLinearFlowTest(EngineTestBase):
def test_sequential_flow_one_task(self): def test_sequential_flow_one_task(self):
flow = lf.Flow('flow-1').add( flow = lf.Flow('flow-1').add(
TestTask(self.values, name='task1') utils.SaveOrderTask(self.values, name='task1')
) )
self._make_engine(flow).run() self._make_engine(flow).run()
self.assertEquals(self.values, ['task1']) self.assertEquals(self.values, ['task1'])
def test_sequential_flow_two_tasks(self): def test_sequential_flow_two_tasks(self):
flow = lf.Flow('flow-2').add( flow = lf.Flow('flow-2').add(
TestTask(self.values, name='task1'), utils.SaveOrderTask(self.values, name='task1'),
TestTask(self.values, name='task2') utils.SaveOrderTask(self.values, name='task2')
) )
self._make_engine(flow).run() self._make_engine(flow).run()
self.assertEquals(self.values, ['task1', 'task2']) self.assertEquals(self.values, ['task1', 'task2'])
self.assertEquals(len(flow), 2)
def test_revert_removes_data(self): def test_revert_removes_data(self):
flow = lf.Flow('revert-removes').add( flow = lf.Flow('revert-removes').add(
TestTask(provides='one'), utils.TaskOneReturn(provides='one'),
MultiReturnTask(provides=('a', 'b', 'c')), utils.TaskMultiReturn(provides=('a', 'b', 'c')),
FailingTask(name='fail') utils.FailingTask(name='fail')
) )
engine = self._make_engine(flow) engine = self._make_engine(flow)
with self.assertRaisesRegexp(RuntimeError, '^Woot'): with self.assertRaisesRegexp(RuntimeError, '^Woot'):
@ -355,9 +135,9 @@ class EngineLinearFlowTest(EngineTestBase):
def test_sequential_flow_nested_blocks(self): def test_sequential_flow_nested_blocks(self):
flow = lf.Flow('nested-1').add( flow = lf.Flow('nested-1').add(
TestTask(self.values, 'task1'), utils.SaveOrderTask(self.values, 'task1'),
lf.Flow('inner-1').add( lf.Flow('inner-1').add(
TestTask(self.values, 'task2') utils.SaveOrderTask(self.values, 'task2')
) )
) )
self._make_engine(flow).run() self._make_engine(flow).run()
@ -365,8 +145,8 @@ class EngineLinearFlowTest(EngineTestBase):
def test_revert_exception_is_reraised(self): def test_revert_exception_is_reraised(self):
flow = lf.Flow('revert-1').add( flow = lf.Flow('revert-1').add(
NastyTask(), utils.NastyTask(),
FailingTask(name='fail') utils.FailingTask(name='fail')
) )
engine = self._make_engine(flow) engine = self._make_engine(flow)
with self.assertRaisesRegexp(RuntimeError, '^Gotcha'): with self.assertRaisesRegexp(RuntimeError, '^Gotcha'):
@ -374,8 +154,8 @@ class EngineLinearFlowTest(EngineTestBase):
def test_revert_not_run_task_is_not_reverted(self): def test_revert_not_run_task_is_not_reverted(self):
flow = lf.Flow('revert-not-run').add( flow = lf.Flow('revert-not-run').add(
FailingTask(self.values, 'fail'), utils.FailingTask(self.values, 'fail'),
NeverRunningTask(), utils.NeverRunningTask(),
) )
engine = self._make_engine(flow) engine = self._make_engine(flow)
with self.assertRaisesRegexp(RuntimeError, '^Woot'): with self.assertRaisesRegexp(RuntimeError, '^Woot'):
@ -386,10 +166,10 @@ class EngineLinearFlowTest(EngineTestBase):
def test_correctly_reverts_children(self): def test_correctly_reverts_children(self):
flow = lf.Flow('root-1').add( flow = lf.Flow('root-1').add(
TestTask(self.values, 'task1'), utils.SaveOrderTask(self.values, 'task1'),
lf.Flow('child-1').add( lf.Flow('child-1').add(
TestTask(self.values, 'task2'), utils.SaveOrderTask(self.values, 'task2'),
FailingTask(self.values, 'fail') utils.FailingTask(self.values, 'fail')
) )
) )
engine = self._make_engine(flow) engine = self._make_engine(flow)
@ -402,30 +182,31 @@ class EngineLinearFlowTest(EngineTestBase):
'task2 reverted(5)', 'task1 reverted(5)']) 'task2 reverted(5)', 'task1 reverted(5)'])
class EngineParallelFlowTest(EngineTestBase): class EngineParallelFlowTest(utils.EngineTestBase):
def test_parallel_flow_one_task(self): def test_parallel_flow_one_task(self):
flow = uf.Flow('p-1').add( flow = uf.Flow('p-1').add(
TestTask(self.values, name='task1', sleep=0.01) utils.SaveOrderTask(self.values, name='task1', sleep=0.01)
) )
self._make_engine(flow).run() self._make_engine(flow).run()
self.assertEquals(self.values, ['task1']) self.assertEquals(self.values, ['task1'])
def test_parallel_flow_two_tasks(self): def test_parallel_flow_two_tasks(self):
flow = uf.Flow('p-2').add( flow = uf.Flow('p-2').add(
TestTask(self.values, name='task1', sleep=0.01), utils.SaveOrderTask(self.values, name='task1', sleep=0.01),
TestTask(self.values, name='task2', sleep=0.01) utils.SaveOrderTask(self.values, name='task2', sleep=0.01)
) )
self._make_engine(flow).run() self._make_engine(flow).run()
result = set(self.values) result = set(self.values)
self.assertEquals(result, set(['task1', 'task2'])) self.assertEquals(result, set(['task1', 'task2']))
self.assertEquals(len(flow), 2)
def test_parallel_revert_common(self): def test_parallel_revert_common(self):
flow = uf.Flow('p-r-3').add( flow = uf.Flow('p-r-3').add(
TestTask(self.values, name='task1'), utils.TaskNoRequiresNoReturns(name='task1'),
FailingTask(self.values, sleep=0.01), utils.FailingTask(sleep=0.01),
TestTask(self.values, name='task2') utils.TaskNoRequiresNoReturns(name='task2')
) )
engine = self._make_engine(flow) engine = self._make_engine(flow)
with self.assertRaisesRegexp(RuntimeError, '^Woot'): with self.assertRaisesRegexp(RuntimeError, '^Woot'):
@ -438,10 +219,10 @@ class EngineParallelFlowTest(EngineTestBase):
# FailingTask fails. # FailingTask fails.
flow = lf.Flow('p-r-r-l').add( flow = lf.Flow('p-r-r-l').add(
uf.Flow('p-r-r').add( uf.Flow('p-r-r').add(
TestTask(self.values, name='task1'), utils.TaskNoRequiresNoReturns(name='task1'),
NastyTask() utils.NastyTask()
), ),
FailingTask(self.values, sleep=0.1) utils.FailingTask(self.values, sleep=0.1)
) )
engine = self._make_engine(flow) engine = self._make_engine(flow)
with self.assertRaisesRegexp(RuntimeError, '^Gotcha'): with self.assertRaisesRegexp(RuntimeError, '^Gotcha'):
@ -449,8 +230,8 @@ class EngineParallelFlowTest(EngineTestBase):
def test_sequential_flow_two_tasks_with_resumption(self): def test_sequential_flow_two_tasks_with_resumption(self):
flow = lf.Flow('lf-2-r').add( flow = lf.Flow('lf-2-r').add(
TestTask(self.values, name='task1', provides='x1'), utils.SaveOrderTask(self.values, name='task1', provides='x1'),
TestTask(self.values, name='task2', provides='x2') utils.SaveOrderTask(self.values, name='task2', provides='x2')
) )
# Create FlowDetail as if we already run task1 # Create FlowDetail as if we already run task1
@ -470,255 +251,11 @@ class EngineParallelFlowTest(EngineTestBase):
self.assertEquals(engine.storage.fetch_all(), self.assertEquals(engine.storage.fetch_all(),
{'x1': 17, 'x2': 5}) {'x1': 17, 'x2': 5})
class EngineGraphFlowTest(EngineTestBase):
def test_graph_flow_one_task(self):
flow = gf.Flow('g-1').add(
TestTask(self.values, name='task1')
)
self._make_engine(flow).run()
self.assertEquals(self.values, ['task1'])
def test_graph_flow_two_independent_tasks(self):
flow = gf.Flow('g-2').add(
TestTask(self.values, name='task1'),
TestTask(self.values, name='task2')
)
self._make_engine(flow).run()
self.assertEquals(set(self.values), set(['task1', 'task2']))
def test_graph_flow_two_tasks(self):
flow = gf.Flow('g-1-1').add(
TestTask(self.values, name='task2', requires=['a']),
TestTask(self.values, name='task1', provides='a')
)
self._make_engine(flow).run()
self.assertEquals(self.values, ['task1', 'task2'])
def test_graph_flow_four_tasks_added_separately(self):
flow = (gf.Flow('g-4')
.add(TestTask(self.values, name='task4',
provides='d', requires=['c']))
.add(TestTask(self.values, name='task2',
provides='b', requires=['a']))
.add(TestTask(self.values, name='task3',
provides='c', requires=['b']))
.add(TestTask(self.values, name='task1',
provides='a'))
)
self._make_engine(flow).run()
self.assertEquals(self.values, ['task1', 'task2', 'task3', 'task4'])
def test_graph_cyclic_dependency(self):
with self.assertRaisesRegexp(exc.DependencyFailure, '^No path'):
gf.Flow('g-3-cyclic').add(
TestTask([], name='task1', provides='a', requires=['b']),
TestTask([], name='task2', provides='b', requires=['c']),
TestTask([], name='task3', provides='c', requires=['a']))
def test_graph_two_tasks_returns_same_value(self):
with self.assertRaisesRegexp(exc.DependencyFailure,
"task2 provides a but is already being"
" provided by task1 and duplicate"
" producers are disallowed"):
gf.Flow('g-2-same-value').add(
TestTask([], name='task1', provides='a'),
TestTask([], name='task2', provides='a'))
def test_graph_flow_four_tasks_revert(self):
flow = gf.Flow('g-4-failing').add(
TestTask(self.values, name='task4', provides='d', requires=['c']),
TestTask(self.values, name='task2', provides='b', requires=['a']),
FailingTask(self.values, name='task3',
provides='c', requires=['b']),
TestTask(self.values, name='task1', provides='a'))
engine = self._make_engine(flow)
with self.assertRaisesRegexp(RuntimeError, '^Woot'):
engine.run()
self.assertEquals(
self.values,
['task1', 'task2',
'task3 reverted(Failure: RuntimeError: Woot!)',
'task2 reverted(5)', 'task1 reverted(5)'])
def test_graph_flow_four_tasks_revert_failure(self):
flow = gf.Flow('g-3-nasty').add(
NastyTask(name='task2', provides='b', requires=['a']),
FailingTask(self.values, name='task3', requires=['b']),
TestTask(self.values, name='task1', provides='a'))
engine = self._make_engine(flow)
with self.assertRaisesRegexp(RuntimeError, '^Gotcha'):
engine.run()
def test_graph_flow_with_multireturn_and_multiargs_tasks(self):
flow = gf.Flow('g-3-multi').add(
MultiargsTask(name='task1', rebind=['a', 'b', 'y'], provides='z'),
MultiReturnTask(name='task2', provides=['a', 'b', 'c']),
MultiargsTask(name='task3', rebind=['c', 'b', 'x'], provides='y'))
engine = self._make_engine(flow)
engine.storage.inject({'x': 30})
engine.run()
self.assertEquals(engine.storage.fetch_all(), {
'a': 12,
'b': 2,
'c': 1,
'x': 30,
'y': 33,
'z': 47
})
def test_one_task_provides_and_requires_same_data(self):
with self.assertRaisesRegexp(exc.DependencyFailure, '^No path'):
gf.Flow('g-1-req-error').add(
TestTask([], name='task1', requires=['a'], provides='a'))
def test_task_graph_property(self):
flow = gf.Flow('test').add(
TestTask(name='task1'),
TestTask(name='task2'))
engine = self._make_engine(flow)
graph = engine.execution_graph
self.assertTrue(isinstance(graph, networkx.DiGraph))
def test_task_graph_property_for_one_task(self):
flow = TestTask(name='task1')
engine = self._make_engine(flow)
graph = engine.execution_graph
self.assertTrue(isinstance(graph, networkx.DiGraph))
class SuspendFlowTest(EngineTestBase):
def test_suspend_one_task(self):
flow = AutoSuspendingTask(self.values, 'a')
engine = self._make_engine(flow)
engine.storage.inject({'engine': engine})
engine.run()
self.assertEquals(engine.storage.get_flow_state(), states.SUCCESS)
self.assertEquals(self.values, ['a'])
engine.run()
self.assertEquals(engine.storage.get_flow_state(), states.SUCCESS)
self.assertEquals(self.values, ['a'])
def test_suspend_linear_flow(self):
flow = lf.Flow('linear').add(
TestTask(self.values, 'a'),
AutoSuspendingTask(self.values, 'b'),
TestTask(self.values, 'c')
)
engine = self._make_engine(flow)
engine.storage.inject({'engine': engine})
engine.run()
self.assertEquals(engine.storage.get_flow_state(), states.SUSPENDED)
self.assertEquals(self.values, ['a', 'b'])
engine.run()
self.assertEquals(engine.storage.get_flow_state(), states.SUCCESS)
self.assertEquals(self.values, ['a', 'b', 'c'])
def test_suspend_linear_flow_on_revert(self):
flow = lf.Flow('linear').add(
TestTask(self.values, 'a'),
AutoSuspendingTaskOnRevert(self.values, 'b'),
FailingTask(self.values, 'c')
)
engine = self._make_engine(flow)
engine.storage.inject({'engine': engine})
engine.run()
self.assertEquals(engine.storage.get_flow_state(), states.SUSPENDED)
self.assertEquals(
self.values,
['a', 'b',
'c reverted(Failure: RuntimeError: Woot!)',
'b reverted(5)'])
with self.assertRaisesRegexp(RuntimeError, '^Woot'):
engine.run()
self.assertEquals(engine.storage.get_flow_state(), states.REVERTED)
self.assertEquals(
self.values,
['a',
'b',
'c reverted(Failure: RuntimeError: Woot!)',
'b reverted(5)',
'a reverted(5)'])
def test_storage_is_rechecked(self):
flow = lf.Flow('linear').add(
AutoSuspendingTask(self.values, 'b'),
TestTask(self.values, name='c')
)
engine = self._make_engine(flow)
engine.storage.inject({'engine': engine, 'boo': True})
engine.run()
self.assertEquals(engine.storage.get_flow_state(), states.SUSPENDED)
# uninject engine
engine.storage.save(
engine.storage.get_uuid_by_name(engine.storage.injector_name),
None,
states.FAILURE)
with self.assertRaises(exc.MissingDependencies):
engine.run()
class SingleThreadedEngineTest(EngineTaskTest,
EngineLinearFlowTest,
EngineParallelFlowTest,
EngineGraphFlowTest,
SuspendFlowTest,
test.TestCase):
def _make_engine(self, flow, flow_detail=None):
return taskflow.engines.load(flow,
flow_detail=flow_detail,
engine_conf='serial',
backend=self.backend)
def test_correct_load(self):
engine = self._make_engine(TestTask)
self.assertIsInstance(engine, eng.SingleThreadedActionEngine)
def test_singlethreaded_is_the_default(self):
engine = taskflow.engines.load(TestTask)
self.assertIsInstance(engine, eng.SingleThreadedActionEngine)
class MultiThreadedEngineTest(EngineTaskTest,
EngineLinearFlowTest,
EngineParallelFlowTest,
EngineGraphFlowTest,
SuspendFlowTest,
test.TestCase):
def _make_engine(self, flow, flow_detail=None, executor=None):
engine_conf = dict(engine='parallel',
executor=executor)
return taskflow.engines.load(flow, flow_detail=flow_detail,
engine_conf=engine_conf,
backend=self.backend)
def test_correct_load(self):
engine = self._make_engine(TestTask)
self.assertIsInstance(engine, eng.MultiThreadedActionEngine)
self.assertIs(engine.executor, None)
def test_using_common_executor(self):
flow = TestTask(self.values, name='task1')
executor = futures.ThreadPoolExecutor(2)
try:
e1 = self._make_engine(flow, executor=executor)
e2 = self._make_engine(flow, executor=executor)
self.assertIs(e1.executor, e2.executor)
finally:
executor.shutdown(wait=True)
def test_parallel_revert_specific(self): def test_parallel_revert_specific(self):
flow = uf.Flow('p-r-r').add( flow = uf.Flow('p-r-r').add(
TestTask(self.values, name='task1', sleep=0.01), utils.SaveOrderTask(self.values, name='task1', sleep=0.01),
FailingTask(sleep=0.01), utils.FailingTask(sleep=0.01),
TestTask(self.values, name='task2', sleep=0.01) utils.SaveOrderTask(self.values, name='task2', sleep=0.01)
) )
engine = self._make_engine(flow) engine = self._make_engine(flow)
with self.assertRaisesRegexp(RuntimeError, '^Woot'): with self.assertRaisesRegexp(RuntimeError, '^Woot'):
@ -732,10 +269,11 @@ class MultiThreadedEngineTest(EngineTaskTest,
def test_parallel_revert_exception_is_reraised_(self): def test_parallel_revert_exception_is_reraised_(self):
flow = lf.Flow('p-r-reraise').add( flow = lf.Flow('p-r-reraise').add(
TestTask(self.values, name='task1', sleep=0.01), utils.SaveOrderTask(self.values, name='task1', sleep=0.01),
NastyTask(), utils.NastyTask(),
FailingTask(sleep=0.01), utils.FailingTask(sleep=0.01),
TestTask() # this should not get reverted utils.SaveOrderTask(self.values,
name='task2') # this should not get reverted
) )
engine = self._make_engine(flow) engine = self._make_engine(flow)
with self.assertRaisesRegexp(RuntimeError, '^Gotcha'): with self.assertRaisesRegexp(RuntimeError, '^Gotcha'):
@ -745,12 +283,12 @@ class MultiThreadedEngineTest(EngineTaskTest,
def test_nested_parallel_revert_exception_is_reraised(self): def test_nested_parallel_revert_exception_is_reraised(self):
flow = uf.Flow('p-root').add( flow = uf.Flow('p-root').add(
TestTask(self.values, name='task1'), utils.SaveOrderTask(self.values, name='task1'),
TestTask(self.values, name='task2'), utils.SaveOrderTask(self.values, name='task2'),
lf.Flow('p-inner').add( lf.Flow('p-inner').add(
TestTask(self.values, name='task3', sleep=0.1), utils.SaveOrderTask(self.values, name='task3', sleep=0.1),
NastyTask(), utils.NastyTask(),
FailingTask(sleep=0.01) utils.FailingTask(sleep=0.01)
) )
) )
engine = self._make_engine(flow) engine = self._make_engine(flow)
@ -767,12 +305,12 @@ class MultiThreadedEngineTest(EngineTaskTest,
def test_parallel_revert_exception_do_not_revert_linear_tasks(self): def test_parallel_revert_exception_do_not_revert_linear_tasks(self):
flow = lf.Flow('l-root').add( flow = lf.Flow('l-root').add(
TestTask(self.values, name='task1'), utils.SaveOrderTask(self.values, name='task1'),
TestTask(self.values, name='task2'), utils.SaveOrderTask(self.values, name='task2'),
uf.Flow('p-inner').add( uf.Flow('p-inner').add(
TestTask(self.values, name='task3', sleep=0.1), utils.SaveOrderTask(self.values, name='task3', sleep=0.1),
NastyTask(), utils.NastyTask(),
FailingTask(sleep=0.01) utils.FailingTask(sleep=0.01)
) )
) )
engine = self._make_engine(flow) engine = self._make_engine(flow)
@ -808,11 +346,11 @@ class MultiThreadedEngineTest(EngineTaskTest,
def test_parallel_nested_to_linear_revert(self): def test_parallel_nested_to_linear_revert(self):
flow = lf.Flow('l-root').add( flow = lf.Flow('l-root').add(
TestTask(self.values, name='task1'), utils.SaveOrderTask(self.values, name='task1'),
TestTask(self.values, name='task2'), utils.SaveOrderTask(self.values, name='task2'),
uf.Flow('p-inner').add( uf.Flow('p-inner').add(
TestTask(self.values, name='task3', sleep=0.1), utils.SaveOrderTask(self.values, name='task3', sleep=0.1),
FailingTask(sleep=0.01) utils.FailingTask(sleep=0.01)
) )
) )
engine = self._make_engine(flow) engine = self._make_engine(flow)
@ -834,11 +372,11 @@ class MultiThreadedEngineTest(EngineTaskTest,
def test_linear_nested_to_parallel_revert(self): def test_linear_nested_to_parallel_revert(self):
flow = uf.Flow('p-root').add( flow = uf.Flow('p-root').add(
TestTask(self.values, name='task1'), utils.SaveOrderTask(self.values, name='task1'),
TestTask(self.values, name='task2'), utils.SaveOrderTask(self.values, name='task2'),
lf.Flow('l-inner').add( lf.Flow('l-inner').add(
TestTask(self.values, name='task3', sleep=0.1), utils.SaveOrderTask(self.values, name='task3', sleep=0.1),
FailingTask(self.values, name='fail', sleep=0.01) utils.FailingTask(self.values, name='fail', sleep=0.01)
) )
) )
engine = self._make_engine(flow) engine = self._make_engine(flow)
@ -858,12 +396,12 @@ class MultiThreadedEngineTest(EngineTaskTest,
def test_linear_nested_to_parallel_revert_exception(self): def test_linear_nested_to_parallel_revert_exception(self):
flow = uf.Flow('p-root').add( flow = uf.Flow('p-root').add(
TestTask(self.values, name='task1', sleep=0.01), utils.SaveOrderTask(self.values, name='task1', sleep=0.01),
TestTask(self.values, name='task2', sleep=0.01), utils.SaveOrderTask(self.values, name='task2', sleep=0.01),
lf.Flow('l-inner').add( lf.Flow('l-inner').add(
TestTask(self.values, name='task3'), utils.SaveOrderTask(self.values, name='task3'),
NastyTask(), utils.NastyTask(),
FailingTask(sleep=0.01) utils.FailingTask(sleep=0.01)
) )
) )
engine = self._make_engine(flow) engine = self._make_engine(flow)
@ -874,3 +412,157 @@ class MultiThreadedEngineTest(EngineTaskTest,
'task2', 'task2 reverted(5)', 'task2', 'task2 reverted(5)',
'task3']) 'task3'])
self.assertIsSubset(possible_result, result) self.assertIsSubset(possible_result, result)
class EngineGraphFlowTest(utils.EngineTestBase):
def test_graph_flow_one_task(self):
flow = gf.Flow('g-1').add(
utils.SaveOrderTask(self.values, name='task1')
)
self._make_engine(flow).run()
self.assertEquals(self.values, ['task1'])
def test_graph_flow_two_independent_tasks(self):
flow = gf.Flow('g-2').add(
utils.SaveOrderTask(self.values, name='task1'),
utils.SaveOrderTask(self.values, name='task2')
)
self._make_engine(flow).run()
self.assertEquals(set(self.values), set(['task1', 'task2']))
self.assertEquals(len(flow), 2)
def test_graph_flow_two_tasks(self):
flow = gf.Flow('g-1-1').add(
utils.SaveOrderTask(self.values, name='task2', requires=['a']),
utils.SaveOrderTask(self.values, name='task1', provides='a')
)
self._make_engine(flow).run()
self.assertEquals(self.values, ['task1', 'task2'])
def test_graph_flow_four_tasks_added_separately(self):
flow = (gf.Flow('g-4')
.add(utils.SaveOrderTask(self.values, name='task4',
provides='d', requires=['c']))
.add(utils.SaveOrderTask(self.values, name='task2',
provides='b', requires=['a']))
.add(utils.SaveOrderTask(self.values, name='task3',
provides='c', requires=['b']))
.add(utils.SaveOrderTask(self.values, name='task1',
provides='a'))
)
self._make_engine(flow).run()
self.assertEquals(self.values, ['task1', 'task2', 'task3', 'task4'])
def test_graph_flow_four_tasks_revert(self):
flow = gf.Flow('g-4-failing').add(
utils.SaveOrderTask(self.values, name='task4',
provides='d', requires=['c']),
utils.SaveOrderTask(self.values, name='task2',
provides='b', requires=['a']),
utils.FailingTask(self.values, name='task3',
provides='c', requires=['b']),
utils.SaveOrderTask(self.values, name='task1', provides='a'))
engine = self._make_engine(flow)
with self.assertRaisesRegexp(RuntimeError, '^Woot'):
engine.run()
self.assertEquals(
self.values,
['task1', 'task2',
'task3 reverted(Failure: RuntimeError: Woot!)',
'task2 reverted(5)', 'task1 reverted(5)'])
def test_graph_flow_four_tasks_revert_failure(self):
flow = gf.Flow('g-3-nasty').add(
utils.NastyTask(name='task2', provides='b', requires=['a']),
utils.FailingTask(self.values, name='task3', requires=['b']),
utils.SaveOrderTask(self.values, name='task1', provides='a'))
engine = self._make_engine(flow)
with self.assertRaisesRegexp(RuntimeError, '^Gotcha'):
engine.run()
def test_graph_flow_with_multireturn_and_multiargs_tasks(self):
flow = gf.Flow('g-3-multi').add(
utils.TaskMultiArgOneReturn(name='task1',
rebind=['a', 'b', 'y'], provides='z'),
utils.TaskMultiReturn(name='task2', provides=['a', 'b', 'c']),
utils.TaskMultiArgOneReturn(name='task3',
rebind=['c', 'b', 'x'], provides='y'))
engine = self._make_engine(flow)
engine.storage.inject({'x': 30})
engine.run()
self.assertEquals(engine.storage.fetch_all(), {
'a': 1,
'b': 3,
'c': 5,
'x': 30,
'y': 38,
'z': 42
})
def test_task_graph_property(self):
flow = gf.Flow('test').add(
utils.TaskNoRequiresNoReturns(name='task1'),
utils.TaskNoRequiresNoReturns(name='task2'))
engine = self._make_engine(flow)
graph = engine.execution_graph
self.assertTrue(isinstance(graph, networkx.DiGraph))
def test_task_graph_property_for_one_task(self):
flow = utils.TaskNoRequiresNoReturns(name='task1')
engine = self._make_engine(flow)
graph = engine.execution_graph
self.assertTrue(isinstance(graph, networkx.DiGraph))
class SingleThreadedEngineTest(EngineTaskTest,
EngineLinearFlowTest,
EngineParallelFlowTest,
EngineGraphFlowTest,
test.TestCase):
def _make_engine(self, flow, flow_detail=None):
return taskflow.engines.load(flow,
flow_detail=flow_detail,
engine_conf='serial',
backend=self.backend)
def test_correct_load(self):
engine = self._make_engine(utils.TaskNoRequiresNoReturns)
self.assertIsInstance(engine, eng.SingleThreadedActionEngine)
def test_singlethreaded_is_the_default(self):
engine = taskflow.engines.load(utils.TaskNoRequiresNoReturns)
self.assertIsInstance(engine, eng.SingleThreadedActionEngine)
class MultiThreadedEngineTest(EngineTaskTest,
EngineLinearFlowTest,
EngineParallelFlowTest,
EngineGraphFlowTest,
test.TestCase):
def _make_engine(self, flow, flow_detail=None, executor=None):
engine_conf = dict(engine='parallel',
executor=executor)
return taskflow.engines.load(flow, flow_detail=flow_detail,
engine_conf=engine_conf,
backend=self.backend)
def test_correct_load(self):
engine = self._make_engine(utils.TaskNoRequiresNoReturns)
self.assertIsInstance(engine, eng.MultiThreadedActionEngine)
self.assertIs(engine.executor, None)
def test_using_common_executor(self):
flow = utils.TaskNoRequiresNoReturns(name='task1')
executor = futures.ThreadPoolExecutor(2)
try:
e1 = self._make_engine(flow, executor=executor)
e2 = self._make_engine(flow, executor=executor)
self.assertIs(e1.executor, e2.executor)
finally:
executor.shutdown(wait=True)

View File

@ -0,0 +1,143 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import taskflow.engines
from taskflow import exceptions as exc
from taskflow import test
from taskflow.tests import utils
class ArgumentsPassingTest(utils.EngineTestBase):
def test_save_as(self):
flow = utils.TaskOneReturn(name='task1', provides='first_data')
engine = self._make_engine(flow)
engine.run()
self.assertEquals(engine.storage.fetch_all(), {'first_data': 1})
def test_save_all_in_one(self):
flow = utils.TaskMultiReturn(provides='all_data')
engine = self._make_engine(flow)
engine.run()
self.assertEquals(engine.storage.fetch_all(),
{'all_data': (1, 3, 5)})
def test_save_several_values(self):
flow = utils.TaskMultiReturn(provides=('badger', 'mushroom', 'snake'))
engine = self._make_engine(flow)
engine.run()
self.assertEquals(engine.storage.fetch_all(), {
'badger': 1,
'mushroom': 3,
'snake': 5
})
def test_save_dict(self):
flow = utils.TaskMultiDictk(provides=set(['badger',
'mushroom',
'snake']))
engine = self._make_engine(flow)
engine.run()
self.assertEquals(engine.storage.fetch_all(), {
'badger': 0,
'mushroom': 1,
'snake': 2,
})
def test_bad_save_as_value(self):
with self.assertRaises(TypeError):
utils.TaskOneReturn(name='task1', provides=object())
def test_arguments_passing(self):
flow = utils.TaskMultiArgOneReturn(provides='result')
engine = self._make_engine(flow)
engine.storage.inject({'x': 1, 'y': 4, 'z': 9, 'a': 17})
engine.run()
self.assertEquals(engine.storage.fetch_all(), {
'x': 1, 'y': 4, 'z': 9, 'a': 17,
'result': 14,
})
def test_arguments_missing(self):
flow = utils.TaskMultiArg()
engine = self._make_engine(flow)
engine.storage.inject({'a': 1, 'b': 4, 'x': 17})
with self.assertRaises(exc.MissingDependencies):
engine.run()
def test_partial_arguments_mapping(self):
flow = utils.TaskMultiArgOneReturn(provides='result',
rebind={'x': 'a'})
engine = self._make_engine(flow)
engine.storage.inject({'x': 1, 'y': 4, 'z': 9, 'a': 17})
engine.run()
self.assertEquals(engine.storage.fetch_all(), {
'x': 1, 'y': 4, 'z': 9, 'a': 17,
'result': 30,
})
def test_all_arguments_mapping(self):
flow = utils.TaskMultiArgOneReturn(provides='result',
rebind=['a', 'b', 'c'])
engine = self._make_engine(flow)
engine.storage.inject({
'a': 1, 'b': 2, 'c': 3, 'x': 4, 'y': 5, 'z': 6
})
engine.run()
self.assertEquals(engine.storage.fetch_all(), {
'a': 1, 'b': 2, 'c': 3, 'x': 4, 'y': 5, 'z': 6,
'result': 6,
})
def test_invalid_argument_name_map(self):
flow = utils.TaskMultiArg(rebind={'z': 'b'})
engine = self._make_engine(flow)
engine.storage.inject({'a': 1, 'y': 4, 'c': 9, 'x': 17})
with self.assertRaises(exc.MissingDependencies):
engine.run()
def test_invalid_argument_name_list(self):
flow = utils.TaskMultiArg(rebind=['a', 'z', 'b'])
engine = self._make_engine(flow)
engine.storage.inject({'a': 1, 'b': 4, 'c': 9, 'x': 17})
with self.assertRaises(exc.MissingDependencies):
engine.run()
def test_bad_rebind_args_value(self):
with self.assertRaises(TypeError):
utils.TaskOneArg(rebind=object())
class SingleThreadedEngineTest(ArgumentsPassingTest,
test.TestCase):
def _make_engine(self, flow, flow_detail=None):
return taskflow.engines.load(flow,
flow_detail=flow_detail,
engine_conf='serial',
backend=self.backend)
class MultiThreadedEngineTest(ArgumentsPassingTest,
test.TestCase):
def _make_engine(self, flow, flow_detail=None, executor=None):
engine_conf = dict(engine='parallel',
executor=executor)
return taskflow.engines.load(flow, flow_detail=flow_detail,
engine_conf=engine_conf,
backend=self.backend)

View File

@ -16,156 +16,94 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
from taskflow.patterns import graph_flow as gf
from taskflow.patterns import linear_flow as lf from taskflow.patterns import linear_flow as lf
from taskflow.patterns import unordered_flow as uf from taskflow.patterns import unordered_flow as uf
from taskflow import exceptions from taskflow import exceptions
from taskflow import task
from taskflow import test from taskflow import test
from taskflow.tests import utils
class TaskNoRequiresNoReturns(task.Task):
def execute(self, **kwargs):
pass
def revert(self, **kwargs):
pass
class TaskOneArg(task.Task):
def execute(self, x, **kwargs):
pass
def revert(self, x, **kwargs):
pass
class TaskMultiArg(task.Task):
def execute(self, x, y, z, **kwargs):
pass
def revert(self, x, y, z, **kwargs):
pass
class TaskOneReturn(task.Task):
def execute(self, **kwargs):
return 1
def revert(self, **kwargs):
pass
class TaskMultiReturn(task.Task):
def execute(self, **kwargs):
return 1, 3, 5
def revert(self, **kwargs):
pass
class TaskOneArgOneReturn(task.Task):
def execute(self, x, **kwargs):
return 1
def revert(self, x, **kwargs):
pass
class TaskMultiArgMultiReturn(task.Task):
def execute(self, x, y, z, **kwargs):
return 1, 3, 5
def revert(self, x, y, z, **kwargs):
pass
class FlowDependenciesTest(test.TestCase): class FlowDependenciesTest(test.TestCase):
def test_task_without_dependencies(self): def test_task_without_dependencies(self):
flow = TaskNoRequiresNoReturns() flow = utils.TaskNoRequiresNoReturns()
self.assertEquals(flow.requires, set()) self.assertEquals(flow.requires, set())
self.assertEquals(flow.provides, set()) self.assertEquals(flow.provides, set())
def test_task_requires_default_values(self): def test_task_requires_default_values(self):
flow = TaskMultiArg() flow = utils.TaskMultiArg()
self.assertEquals(flow.requires, set(['x', 'y', 'z'])) self.assertEquals(flow.requires, set(['x', 'y', 'z']))
self.assertEquals(flow.provides, set()) self.assertEquals(flow.provides, set())
def test_task_requires_rebinded_mapped(self): def test_task_requires_rebinded_mapped(self):
flow = TaskMultiArg(rebind={'x': 'a', 'y': 'b', 'z': 'c'}) flow = utils.TaskMultiArg(rebind={'x': 'a', 'y': 'b', 'z': 'c'})
self.assertEquals(flow.requires, set(['a', 'b', 'c'])) self.assertEquals(flow.requires, set(['a', 'b', 'c']))
self.assertEquals(flow.provides, set()) self.assertEquals(flow.provides, set())
def test_task_requires_additional_values(self): def test_task_requires_additional_values(self):
flow = TaskMultiArg(requires=['a', 'b']) flow = utils.TaskMultiArg(requires=['a', 'b'])
self.assertEquals(flow.requires, set(['a', 'b', 'x', 'y', 'z'])) self.assertEquals(flow.requires, set(['a', 'b', 'x', 'y', 'z']))
self.assertEquals(flow.provides, set()) self.assertEquals(flow.provides, set())
def test_task_provides_values(self): def test_task_provides_values(self):
flow = TaskMultiReturn(provides=['a', 'b', 'c']) flow = utils.TaskMultiReturn(provides=['a', 'b', 'c'])
self.assertEquals(flow.requires, set()) self.assertEquals(flow.requires, set())
self.assertEquals(flow.provides, set(['a', 'b', 'c'])) self.assertEquals(flow.provides, set(['a', 'b', 'c']))
def test_task_provides_and_requires_values(self): def test_task_provides_and_requires_values(self):
flow = TaskMultiArgMultiReturn(provides=['a', 'b', 'c']) flow = utils.TaskMultiArgMultiReturn(provides=['a', 'b', 'c'])
self.assertEquals(flow.requires, set(['x', 'y', 'z'])) self.assertEquals(flow.requires, set(['x', 'y', 'z']))
self.assertEquals(flow.provides, set(['a', 'b', 'c'])) self.assertEquals(flow.provides, set(['a', 'b', 'c']))
def test_linear_flow_without_dependencies(self): def test_linear_flow_without_dependencies(self):
flow = lf.Flow('lf').add( flow = lf.Flow('lf').add(
TaskNoRequiresNoReturns('task1'), utils.TaskNoRequiresNoReturns('task1'),
TaskNoRequiresNoReturns('task2')) utils.TaskNoRequiresNoReturns('task2'))
self.assertEquals(flow.requires, set()) self.assertEquals(flow.requires, set())
self.assertEquals(flow.provides, set()) self.assertEquals(flow.provides, set())
def test_linear_flow_reuires_values(self): def test_linear_flow_reuires_values(self):
flow = lf.Flow('lf').add( flow = lf.Flow('lf').add(
TaskOneArg('task1'), utils.TaskOneArg('task1'),
TaskMultiArg('task2')) utils.TaskMultiArg('task2'))
self.assertEquals(flow.requires, set(['x', 'y', 'z'])) self.assertEquals(flow.requires, set(['x', 'y', 'z']))
self.assertEquals(flow.provides, set()) self.assertEquals(flow.provides, set())
def test_linear_flow_reuires_rebind_values(self): def test_linear_flow_reuires_rebind_values(self):
flow = lf.Flow('lf').add( flow = lf.Flow('lf').add(
TaskOneArg('task1', rebind=['q']), utils.TaskOneArg('task1', rebind=['q']),
TaskMultiArg('task2')) utils.TaskMultiArg('task2'))
self.assertEquals(flow.requires, set(['x', 'y', 'z', 'q'])) self.assertEquals(flow.requires, set(['x', 'y', 'z', 'q']))
self.assertEquals(flow.provides, set()) self.assertEquals(flow.provides, set())
def test_linear_flow_provides_values(self): def test_linear_flow_provides_values(self):
flow = lf.Flow('lf').add( flow = lf.Flow('lf').add(
TaskOneReturn('task1', provides='x'), utils.TaskOneReturn('task1', provides='x'),
TaskMultiReturn('task2', provides=['a', 'b', 'c'])) utils.TaskMultiReturn('task2', provides=['a', 'b', 'c']))
self.assertEquals(flow.requires, set()) self.assertEquals(flow.requires, set())
self.assertEquals(flow.provides, set(['x', 'a', 'b', 'c'])) self.assertEquals(flow.provides, set(['x', 'a', 'b', 'c']))
def test_linear_flow_provides_out_of_order(self): def test_linear_flow_provides_out_of_order(self):
with self.assertRaises(exceptions.InvariantViolationException): with self.assertRaises(exceptions.InvariantViolationException):
lf.Flow('lf').add( lf.Flow('lf').add(
TaskOneArg('task2'), utils.TaskOneArg('task2'),
TaskOneReturn('task1', provides='x')) utils.TaskOneReturn('task1', provides='x'))
def test_linear_flow_provides_required_values(self): def test_linear_flow_provides_required_values(self):
flow = lf.Flow('lf').add( flow = lf.Flow('lf').add(
TaskOneReturn('task1', provides='x'), utils.TaskOneReturn('task1', provides='x'),
TaskOneArg('task2')) utils.TaskOneArg('task2'))
self.assertEquals(flow.requires, set()) self.assertEquals(flow.requires, set())
self.assertEquals(flow.provides, set(['x'])) self.assertEquals(flow.provides, set(['x']))
def test_linear_flow_multi_provides_and_requires_values(self): def test_linear_flow_multi_provides_and_requires_values(self):
flow = lf.Flow('lf').add( flow = lf.Flow('lf').add(
TaskMultiArgMultiReturn('task1', utils.TaskMultiArgMultiReturn('task1',
rebind=['a', 'b', 'c'], rebind=['a', 'b', 'c'],
provides=['x', 'y', 'q']), provides=['x', 'y', 'q']),
TaskMultiArgMultiReturn('task2', utils.TaskMultiArgMultiReturn('task2',
provides=['i', 'j', 'k'])) provides=['i', 'j', 'k']))
self.assertEquals(flow.requires, set(['a', 'b', 'c', 'z'])) self.assertEquals(flow.requires, set(['a', 'b', 'c', 'z']))
self.assertEquals(flow.provides, set(['x', 'y', 'q', 'i', 'j', 'k'])) self.assertEquals(flow.provides, set(['x', 'y', 'q', 'i', 'j', 'k']))
@ -173,65 +111,65 @@ class FlowDependenciesTest(test.TestCase):
def test_linear_flow_self_requires(self): def test_linear_flow_self_requires(self):
flow = lf.Flow('uf') flow = lf.Flow('uf')
with self.assertRaises(exceptions.InvariantViolationException): with self.assertRaises(exceptions.InvariantViolationException):
flow.add(TaskNoRequiresNoReturns(rebind=['x'], provides='x')) flow.add(utils.TaskNoRequiresNoReturns(rebind=['x'], provides='x'))
def test_unordered_flow_without_dependencies(self): def test_unordered_flow_without_dependencies(self):
flow = uf.Flow('uf').add( flow = uf.Flow('uf').add(
TaskNoRequiresNoReturns('task1'), utils.TaskNoRequiresNoReturns('task1'),
TaskNoRequiresNoReturns('task2')) utils.TaskNoRequiresNoReturns('task2'))
self.assertEquals(flow.requires, set()) self.assertEquals(flow.requires, set())
self.assertEquals(flow.provides, set()) self.assertEquals(flow.provides, set())
def test_unordered_flow_self_requires(self): def test_unordered_flow_self_requires(self):
flow = uf.Flow('uf') flow = uf.Flow('uf')
with self.assertRaises(exceptions.InvariantViolationException): with self.assertRaises(exceptions.InvariantViolationException):
flow.add(TaskNoRequiresNoReturns(rebind=['x'], provides='x')) flow.add(utils.TaskNoRequiresNoReturns(rebind=['x'], provides='x'))
def test_unordered_flow_reuires_values(self): def test_unordered_flow_reuires_values(self):
flow = uf.Flow('uf').add( flow = uf.Flow('uf').add(
TaskOneArg('task1'), utils.TaskOneArg('task1'),
TaskMultiArg('task2')) utils.TaskMultiArg('task2'))
self.assertEquals(flow.requires, set(['x', 'y', 'z'])) self.assertEquals(flow.requires, set(['x', 'y', 'z']))
self.assertEquals(flow.provides, set()) self.assertEquals(flow.provides, set())
def test_unordered_flow_reuires_rebind_values(self): def test_unordered_flow_reuires_rebind_values(self):
flow = uf.Flow('uf').add( flow = uf.Flow('uf').add(
TaskOneArg('task1', rebind=['q']), utils.TaskOneArg('task1', rebind=['q']),
TaskMultiArg('task2')) utils.TaskMultiArg('task2'))
self.assertEquals(flow.requires, set(['x', 'y', 'z', 'q'])) self.assertEquals(flow.requires, set(['x', 'y', 'z', 'q']))
self.assertEquals(flow.provides, set()) self.assertEquals(flow.provides, set())
def test_unordered_flow_provides_values(self): def test_unordered_flow_provides_values(self):
flow = uf.Flow('uf').add( flow = uf.Flow('uf').add(
TaskOneReturn('task1', provides='x'), utils.TaskOneReturn('task1', provides='x'),
TaskMultiReturn('task2', provides=['a', 'b', 'c'])) utils.TaskMultiReturn('task2', provides=['a', 'b', 'c']))
self.assertEquals(flow.requires, set()) self.assertEquals(flow.requires, set())
self.assertEquals(flow.provides, set(['x', 'a', 'b', 'c'])) self.assertEquals(flow.provides, set(['x', 'a', 'b', 'c']))
def test_unordered_flow_provides_required_values(self): def test_unordered_flow_provides_required_values(self):
with self.assertRaises(exceptions.InvariantViolationException): with self.assertRaises(exceptions.InvariantViolationException):
uf.Flow('uf').add( uf.Flow('uf').add(
TaskOneReturn('task1', provides='x'), utils.TaskOneReturn('task1', provides='x'),
TaskOneArg('task2')) utils.TaskOneArg('task2'))
def test_unordered_flow_requires_provided_value_other_call(self): def test_unordered_flow_requires_provided_value_other_call(self):
flow = uf.Flow('uf') flow = uf.Flow('uf')
flow.add(TaskOneReturn('task1', provides='x')) flow.add(utils.TaskOneReturn('task1', provides='x'))
with self.assertRaises(exceptions.InvariantViolationException): with self.assertRaises(exceptions.InvariantViolationException):
flow.add(TaskOneArg('task2')) flow.add(utils.TaskOneArg('task2'))
def test_unordered_flow_provides_required_value_other_call(self): def test_unordered_flow_provides_required_value_other_call(self):
flow = uf.Flow('uf') flow = uf.Flow('uf')
flow.add(TaskOneArg('task2')) flow.add(utils.TaskOneArg('task2'))
with self.assertRaises(exceptions.InvariantViolationException): with self.assertRaises(exceptions.InvariantViolationException):
flow.add(TaskOneReturn('task1', provides='x')) flow.add(utils.TaskOneReturn('task1', provides='x'))
def test_unordered_flow_multi_provides_and_requires_values(self): def test_unordered_flow_multi_provides_and_requires_values(self):
flow = uf.Flow('uf').add( flow = uf.Flow('uf').add(
TaskMultiArgMultiReturn('task1', utils.TaskMultiArgMultiReturn('task1',
rebind=['a', 'b', 'c'], rebind=['a', 'b', 'c'],
provides=['d', 'e', 'f']), provides=['d', 'e', 'f']),
TaskMultiArgMultiReturn('task2', utils.TaskMultiArgMultiReturn('task2',
provides=['i', 'j', 'k'])) provides=['i', 'j', 'k']))
self.assertEquals(flow.requires, set(['a', 'b', 'c', 'x', 'y', 'z'])) self.assertEquals(flow.requires, set(['a', 'b', 'c', 'x', 'y', 'z']))
self.assertEquals(flow.provides, set(['d', 'e', 'f', 'i', 'j', 'k'])) self.assertEquals(flow.provides, set(['d', 'e', 'f', 'i', 'j', 'k']))
@ -239,10 +177,76 @@ class FlowDependenciesTest(test.TestCase):
def test_nested_flows_requirements(self): def test_nested_flows_requirements(self):
flow = uf.Flow('uf').add( flow = uf.Flow('uf').add(
lf.Flow('lf').add( lf.Flow('lf').add(
TaskOneArgOneReturn('task1', rebind=['a'], provides=['x']), utils.TaskOneArgOneReturn('task1',
TaskOneArgOneReturn('task2', provides=['y'])), rebind=['a'], provides=['x']),
utils.TaskOneArgOneReturn('task2', provides=['y'])),
uf.Flow('uf').add( uf.Flow('uf').add(
TaskOneArgOneReturn('task3', rebind=['b'], provides=['z']), utils.TaskOneArgOneReturn('task3',
TaskOneArgOneReturn('task4', rebind=['c'], provides=['q']))) rebind=['b'], provides=['z']),
utils.TaskOneArgOneReturn('task4', rebind=['c'],
provides=['q'])))
self.assertEquals(flow.requires, set(['a', 'b', 'c'])) self.assertEquals(flow.requires, set(['a', 'b', 'c']))
self.assertEquals(flow.provides, set(['x', 'y', 'z', 'q'])) self.assertEquals(flow.provides, set(['x', 'y', 'z', 'q']))
def test_graph_flow_without_dependencies(self):
flow = gf.Flow('gf').add(
utils.TaskNoRequiresNoReturns('task1'),
utils.TaskNoRequiresNoReturns('task2'))
self.assertEquals(flow.requires, set())
self.assertEquals(flow.provides, set())
def test_graph_flow_self_requires(self):
with self.assertRaisesRegexp(exceptions.DependencyFailure, '^No path'):
gf.Flow('g-1-req-error').add(
utils.TaskOneArgOneReturn(requires=['a'], provides='a'))
def test_graph_flow_reuires_values(self):
flow = gf.Flow('gf').add(
utils.TaskOneArg('task1'),
utils.TaskMultiArg('task2'))
self.assertEquals(flow.requires, set(['x', 'y', 'z']))
self.assertEquals(flow.provides, set())
def test_graph_flow_reuires_rebind_values(self):
flow = gf.Flow('gf').add(
utils.TaskOneArg('task1', rebind=['q']),
utils.TaskMultiArg('task2'))
self.assertEquals(flow.requires, set(['x', 'y', 'z', 'q']))
self.assertEquals(flow.provides, set())
def test_graph_flow_provides_values(self):
flow = gf.Flow('gf').add(
utils.TaskOneReturn('task1', provides='x'),
utils.TaskMultiReturn('task2', provides=['a', 'b', 'c']))
self.assertEquals(flow.requires, set())
self.assertEquals(flow.provides, set(['x', 'a', 'b', 'c']))
def test_graph_flow_provides_required_values(self):
flow = gf.Flow('gf').add(
utils.TaskOneReturn('task1', provides='x'),
utils.TaskOneArg('task2'))
self.assertEquals(flow.requires, set())
self.assertEquals(flow.provides, set(['x']))
def test_graph_flow_provides_provided_value_other_call(self):
flow = gf.Flow('gf')
flow.add(utils.TaskOneReturn('task1', provides='x'))
with self.assertRaises(exceptions.DependencyFailure):
flow.add(utils.TaskOneReturn('task2', provides='x'))
def test_graph_flow_multi_provides_and_requires_values(self):
flow = gf.Flow('gf').add(
utils.TaskMultiArgMultiReturn('task1',
rebind=['a', 'b', 'c'],
provides=['d', 'e', 'f']),
utils.TaskMultiArgMultiReturn('task2',
provides=['i', 'j', 'k']))
self.assertEquals(flow.requires, set(['a', 'b', 'c', 'x', 'y', 'z']))
self.assertEquals(flow.provides, set(['d', 'e', 'f', 'i', 'j', 'k']))
def test_graph_cyclic_dependency(self):
with self.assertRaisesRegexp(exceptions.DependencyFailure, '^No path'):
gf.Flow('g-3-cyclic').add(
utils.TaskOneArgOneReturn(provides='a', requires=['b']),
utils.TaskOneArgOneReturn(provides='b', requires=['c']),
utils.TaskOneArgOneReturn(provides='c', requires=['a']))

View File

@ -20,9 +20,7 @@ import collections
import taskflow.engines import taskflow.engines
from taskflow import exceptions as exc
from taskflow.patterns import graph_flow as gw from taskflow.patterns import graph_flow as gw
from taskflow import states
from taskflow.utils import flow_utils as fu from taskflow.utils import flow_utils as fu
from taskflow.utils import graph_utils as gu from taskflow.utils import graph_utils as gu
@ -32,7 +30,7 @@ from taskflow.tests import utils
class GraphFlowTest(test.TestCase): class GraphFlowTest(test.TestCase):
def _make_engine(self, flow): def _make_engine(self, flow):
return taskflow.engines.load(flow, store={'context': {}}) return taskflow.engines.load(flow, store={})
def _capture_states(self): def _capture_states(self):
# TODO(harlowja): move function to shared helper # TODO(harlowja): move function to shared helper
@ -64,26 +62,6 @@ class GraphFlowTest(test.TestCase):
self.assertEquals([test_1], list(gu.get_no_predecessors(wf.graph))) self.assertEquals([test_1], list(gu.get_no_predecessors(wf.graph)))
self.assertEquals([test_3], list(gu.get_no_successors(wf.graph))) self.assertEquals([test_3], list(gu.get_no_successors(wf.graph)))
def test_invalid_add_simple(self):
wf = gw.Flow("the-test-action")
test_1 = utils.ProvidesRequiresTask('test-1',
requires=['a'],
provides=set(['a', 'b']))
self.assertRaises(exc.DependencyFailure, wf.add, test_1)
self.assertEquals(0, len(wf))
def test_invalid_add_loop(self):
wf = gw.Flow("the-test-action")
test_1 = utils.ProvidesRequiresTask('test-1',
requires=['c'],
provides=set(['a', 'b']))
test_2 = utils.ProvidesRequiresTask('test-2',
requires=['a', 'b'],
provides=set(['c']))
wf.add(test_1)
self.assertRaises(exc.DependencyFailure, wf.add, test_2)
self.assertEquals(1, len(wf))
def test_basic_edge_reasons(self): def test_basic_edge_reasons(self):
wf = gw.Flow("the-test-action") wf = gw.Flow("the-test-action")
test_1 = utils.ProvidesRequiresTask('test-1', test_1 = utils.ProvidesRequiresTask('test-1',
@ -136,28 +114,3 @@ class GraphFlowTest(test.TestCase):
edge_attrs = gu.get_edge_attrs(g, test_1, test_2) edge_attrs = gu.get_edge_attrs(g, test_1, test_2)
self.assertTrue(edge_attrs.get('manual')) self.assertTrue(edge_attrs.get('manual'))
self.assertTrue(edge_attrs.get('flatten')) self.assertTrue(edge_attrs.get('flatten'))
def test_graph_run(self):
wf = gw.Flow("the-test-action")
test_1 = utils.ProvidesRequiresTask('test-1',
requires=[],
provides=[])
test_2 = utils.ProvidesRequiresTask('test-2',
provides=[],
requires=[])
wf.add(test_1, test_2)
wf.link(test_1, test_2)
self.assertEquals(2, len(wf))
e = self._make_engine(wf)
capture_func, captured = self._capture_states()
e.task_notifier.register('*', capture_func)
e.run()
self.assertEquals(2, len(captured))
for (_uuid, t_states) in captured.items():
self.assertEquals([states.RUNNING, states.SUCCESS], t_states)
run_context = e.storage.fetch('context')
ordering = [o['name'] for o in run_context[utils.ORDER_KEY]]
self.assertEquals(['test-1', 'test-2'], ordering)

View File

@ -1,260 +0,0 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import taskflow.engines
from taskflow import exceptions as exc
from taskflow.patterns import linear_flow as lw
from taskflow import states
from taskflow import task
from taskflow import test
from taskflow.tests import utils
class LinearFlowTest(test.TestCase):
def _make_engine(self, flow):
return taskflow.engines.load(flow, store={'context': {}})
def test_result_access(self):
def do_apply1(context):
return [1, 2]
wf = lw.Flow("the-test-action")
wf.add(task.FunctorTask(do_apply1, provides=['a', 'b']))
e = self._make_engine(wf)
e.run()
data = e.storage.fetch_all()
self.assertIn('a', data)
self.assertIn('b', data)
self.assertEquals(2, data['b'])
self.assertEquals(1, data['a'])
def test_functor_flow(self):
wf = lw.Flow("the-test-action")
def do_apply1(context):
context['1'] = True
return ['a', 'b', 'c']
def do_apply2(context, a, **kwargs):
self.assertTrue('c' in kwargs)
self.assertEquals('a', a)
context['2'] = True
wf.add(task.FunctorTask(do_apply1, provides=['a', 'b', 'c']))
wf.add(task.FunctorTask(do_apply2, requires=set(['c'])))
e = self._make_engine(wf)
e.run()
self.assertEquals(2, len(e.storage.fetch('context')))
def test_sad_flow_state_changes(self):
changes = []
task_changes = []
def listener(state, details):
changes.append(state)
def task_listener(state, details):
if details.get('task_name') == 'blowup_1':
task_changes.append(state)
wf = lw.Flow("the-test-action")
wf.add(utils.make_reverting_task(2, False))
wf.add(utils.make_reverting_task(1, True))
e = self._make_engine(wf)
e.notifier.register('*', listener)
e.task_notifier.register('*', task_listener)
self.assertRaises(Exception, e.run)
expected_states = [
states.RUNNING,
states.FAILURE,
states.REVERTING,
states.REVERTED,
]
self.assertEquals(expected_states, changes)
expected_states = [
states.RUNNING,
states.FAILURE,
states.REVERTING,
states.REVERTED,
states.PENDING,
]
self.assertEquals(expected_states, task_changes)
context = e.storage.fetch('context')
# Only 2 should have been reverted (which should have been
# marked in the context as occuring).
self.assertIn(2, context)
self.assertEquals('reverted', context[2])
self.assertNotIn(1, context)
def test_happy_flow_state_changes(self):
changes = []
def listener(state, details):
changes.append(state)
wf = lw.Flow("the-test-action")
wf.add(utils.make_reverting_task(1))
e = self._make_engine(wf)
e.notifier.register('*', listener)
e.run()
self.assertEquals([states.RUNNING, states.SUCCESS], changes)
def test_happy_flow(self):
wf = lw.Flow("the-test-action")
for i in range(0, 10):
wf.add(utils.make_reverting_task(i))
e = self._make_engine(wf)
capture_func, captured = self._capture_states()
e.task_notifier.register('*', capture_func)
e.run()
context = e.storage.fetch('context')
self.assertEquals(10, len(context))
self.assertEquals(10, len(captured))
for _k, v in context.items():
self.assertEquals('passed', v)
for _uuid, u_states in captured.items():
self.assertEquals([states.RUNNING, states.SUCCESS], u_states)
def _capture_states(self):
capture_where = collections.defaultdict(list)
def do_capture(state, details):
task_uuid = details.get('task_uuid')
if not task_uuid:
return
capture_where[task_uuid].append(state)
return (do_capture, capture_where)
def test_reverting_flow(self):
wf = lw.Flow("the-test-action")
wf.add(utils.make_reverting_task(1))
wf.add(utils.make_reverting_task(2, True))
capture_func, captured = self._capture_states()
e = self._make_engine(wf)
e.task_notifier.register('*', capture_func)
self.assertRaises(Exception, e.run)
run_context = e.storage.fetch('context')
self.assertEquals('reverted', run_context[1])
self.assertEquals(1, len(run_context))
blowup_id = e.storage.get_uuid_by_name('blowup_2')
happy_id = e.storage.get_uuid_by_name('do_apply_1')
self.assertEquals(2, len(captured))
self.assertIn(blowup_id, captured)
expected_states = [states.RUNNING, states.FAILURE, states.REVERTING,
states.REVERTED, states.PENDING]
self.assertEquals(expected_states, captured[blowup_id])
expected_states = [states.RUNNING, states.SUCCESS, states.REVERTING,
states.REVERTED, states.PENDING]
self.assertIn(happy_id, captured)
self.assertEquals(expected_states, captured[happy_id])
def test_not_satisfied_inputs(self):
def task_a(context, *args, **kwargs):
pass
def task_b(context, c, *args, **kwargs):
pass
wf = lw.Flow("the-test-action")
wf.add(task.FunctorTask(task_a))
wf.add(task.FunctorTask(task_b))
e = self._make_engine(wf)
self.assertRaises(exc.MissingDependencies, e.run)
def test_flow_bad_order(self):
wf = lw.Flow("the-test-action")
wf.add(utils.ProvidesRequiresTask('test-1',
requires=set(),
provides=['a', 'b']))
# This one should fail to add since it requires 'c'
no_req_task = utils.ProvidesRequiresTask('test-2', requires=['c'],
provides=[])
wf.add(no_req_task)
e = self._make_engine(wf)
self.assertRaises(exc.MissingDependencies, e.run)
def test_flow_set_order(self):
wf = lw.Flow("the-test-action")
wf.add(utils.ProvidesRequiresTask('test-1',
requires=[],
provides=set(['a', 'b'])))
wf.add(utils.ProvidesRequiresTask('test-2',
requires=set(['a', 'b']),
provides=set([])))
e = self._make_engine(wf)
e.run()
run_context = e.storage.fetch('context')
ordering = run_context[utils.ORDER_KEY]
self.assertEquals(2, len(ordering))
self.assertEquals('test-1', ordering[0]['name'])
self.assertEquals('test-2', ordering[1]['name'])
self.assertEquals({'a': 'a', 'b': 'b'},
ordering[1][utils.KWARGS_KEY])
self.assertEquals({},
ordering[0][utils.KWARGS_KEY])
def test_flow_list_order(self):
wf = lw.Flow("the-test-action")
wf.add(utils.ProvidesRequiresTask('test-1',
requires=[],
provides=['a', 'b']))
wf.add(utils.ProvidesRequiresTask('test-2',
requires=['a', 'b'],
provides=['c', 'd']))
wf.add(utils.ProvidesRequiresTask('test-3',
requires=['c', 'd'],
provides=[]))
wf.add(utils.ProvidesRequiresTask('test-4',
requires=[],
provides=['d']))
wf.add(utils.ProvidesRequiresTask('test-5',
requires=[],
provides=['d']))
wf.add(utils.ProvidesRequiresTask('test-6',
requires=['d'],
provides=[]))
e = self._make_engine(wf)
e.run()
run_context = e.storage.fetch('context')
ordering = run_context[utils.ORDER_KEY]
for i, entry in enumerate(ordering):
self.assertEquals('test-%s' % (i + 1), entry['name'])

View File

@ -0,0 +1,180 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import time
from taskflow.patterns import linear_flow as lf
import taskflow.engines
from taskflow import exceptions as exc
from taskflow import states
from taskflow import task
from taskflow import test
from taskflow.tests import utils
class TestTask(task.Task):
def __init__(self, values=None, name=None, sleep=None,
provides=None, rebind=None, requires=None):
super(TestTask, self).__init__(name=name, provides=provides,
rebind=rebind, requires=requires)
if values is None:
self.values = []
else:
self.values = values
self._sleep = sleep
def execute(self, **kwargs):
self.update_progress(0.0)
if self._sleep:
time.sleep(self._sleep)
self.values.append(self.name)
self.update_progress(1.0)
return 5
def revert(self, **kwargs):
self.update_progress(0)
if self._sleep:
time.sleep(self._sleep)
self.values.append(self.name + ' reverted(%s)'
% kwargs.get('result'))
self.update_progress(1.0)
class FailingTask(TestTask):
def execute(self, **kwargs):
self.update_progress(0)
if self._sleep:
time.sleep(self._sleep)
self.update_progress(0.99)
raise RuntimeError('Woot!')
class AutoSuspendingTask(TestTask):
def execute(self, engine):
result = super(AutoSuspendingTask, self).execute()
engine.suspend()
return result
def revert(self, engine, result):
super(AutoSuspendingTask, self).revert(**{'result': result})
class AutoSuspendingTaskOnRevert(TestTask):
def execute(self, engine):
return super(AutoSuspendingTaskOnRevert, self).execute()
def revert(self, engine, result):
super(AutoSuspendingTaskOnRevert, self).revert(**{'result': result})
engine.suspend()
class SuspendFlowTest(utils.EngineTestBase):
def test_suspend_one_task(self):
flow = AutoSuspendingTask(self.values, 'a')
engine = self._make_engine(flow)
engine.storage.inject({'engine': engine})
engine.run()
self.assertEquals(engine.storage.get_flow_state(), states.SUCCESS)
self.assertEquals(self.values, ['a'])
engine.run()
self.assertEquals(engine.storage.get_flow_state(), states.SUCCESS)
self.assertEquals(self.values, ['a'])
def test_suspend_linear_flow(self):
flow = lf.Flow('linear').add(
TestTask(self.values, 'a'),
AutoSuspendingTask(self.values, 'b'),
TestTask(self.values, 'c')
)
engine = self._make_engine(flow)
engine.storage.inject({'engine': engine})
engine.run()
self.assertEquals(engine.storage.get_flow_state(), states.SUSPENDED)
self.assertEquals(self.values, ['a', 'b'])
engine.run()
self.assertEquals(engine.storage.get_flow_state(), states.SUCCESS)
self.assertEquals(self.values, ['a', 'b', 'c'])
def test_suspend_linear_flow_on_revert(self):
flow = lf.Flow('linear').add(
TestTask(self.values, 'a'),
AutoSuspendingTaskOnRevert(self.values, 'b'),
FailingTask(self.values, 'c')
)
engine = self._make_engine(flow)
engine.storage.inject({'engine': engine})
engine.run()
self.assertEquals(engine.storage.get_flow_state(), states.SUSPENDED)
self.assertEquals(
self.values,
['a', 'b',
'c reverted(Failure: RuntimeError: Woot!)',
'b reverted(5)'])
with self.assertRaisesRegexp(RuntimeError, '^Woot'):
engine.run()
self.assertEquals(engine.storage.get_flow_state(), states.REVERTED)
self.assertEquals(
self.values,
['a',
'b',
'c reverted(Failure: RuntimeError: Woot!)',
'b reverted(5)',
'a reverted(5)'])
def test_storage_is_rechecked(self):
flow = lf.Flow('linear').add(
AutoSuspendingTask(self.values, 'b'),
TestTask(self.values, name='c')
)
engine = self._make_engine(flow)
engine.storage.inject({'engine': engine, 'boo': True})
engine.run()
self.assertEquals(engine.storage.get_flow_state(), states.SUSPENDED)
# uninject engine
engine.storage.save(
engine.storage.get_uuid_by_name(engine.storage.injector_name),
None,
states.FAILURE)
with self.assertRaises(exc.MissingDependencies):
engine.run()
class SingleThreadedEngineTest(SuspendFlowTest,
test.TestCase):
def _make_engine(self, flow, flow_detail=None):
return taskflow.engines.load(flow,
flow_detail=flow_detail,
engine_conf='serial',
backend=self.backend)
class MultiThreadedEngineTest(SuspendFlowTest,
test.TestCase):
def _make_engine(self, flow, flow_detail=None, executor=None):
engine_conf = dict(engine='parallel',
executor=executor)
return taskflow.engines.load(flow, flow_detail=flow_detail,
engine_conf=engine_conf,
backend=self.backend)

View File

@ -16,8 +16,11 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import contextlib
import six import six
import time
from taskflow.persistence.backends import impl_memory
from taskflow import task from taskflow import task
ARGS_KEY = '__args__' ARGS_KEY = '__args__'
@ -45,27 +48,6 @@ def make_reverting_task(token, blowup=False):
name='do_apply_%s' % token) name='do_apply_%s' % token)
class ProvidesRequiresTask(task.Task):
def __init__(self, name, provides, requires, return_tuple=True):
super(ProvidesRequiresTask, self).__init__(name=name,
provides=provides,
requires=requires)
self.return_tuple = isinstance(provides, (tuple, list))
def execute(self, context, *args, **kwargs):
if ORDER_KEY not in context:
context[ORDER_KEY] = []
context[ORDER_KEY].append({
'name': self.name,
KWARGS_KEY: kwargs,
ARGS_KEY: args,
})
if self.return_tuple:
return tuple(range(len(self.provides)))
else:
return dict((k, k) for k in self.provides)
class DummyTask(task.Task): class DummyTask(task.Task):
def execute(self, context, *args, **kwargs): def execute(self, context, *args, **kwargs):
pass pass
@ -77,3 +59,168 @@ if six.PY3:
else: else:
RUNTIME_ERROR_CLASSES = ['RuntimeError', 'StandardError', 'Exception', RUNTIME_ERROR_CLASSES = ['RuntimeError', 'StandardError', 'Exception',
'BaseException', 'object'] 'BaseException', 'object']
class ProvidesRequiresTask(task.Task):
def __init__(self, name, provides, requires, return_tuple=True):
super(ProvidesRequiresTask, self).__init__(name=name,
provides=provides,
requires=requires)
self.return_tuple = isinstance(provides, (tuple, list))
def execute(self, *args, **kwargs):
if self.return_tuple:
return tuple(range(len(self.provides)))
else:
return dict((k, k) for k in self.provides)
class SaveOrderTask(task.Task):
def __init__(self, values=None, name=None, sleep=None,
*args, **kwargs):
super(SaveOrderTask, self).__init__(name=name, *args, **kwargs)
if values is None:
self.values = []
else:
self.values = values
self._sleep = sleep
def execute(self, **kwargs):
self.update_progress(0.0)
if self._sleep:
time.sleep(self._sleep)
self.values.append(self.name)
self.update_progress(1.0)
return 5
def revert(self, **kwargs):
self.update_progress(0)
if self._sleep:
time.sleep(self._sleep)
self.values.append(self.name + ' reverted(%s)'
% kwargs.get('result'))
self.update_progress(1.0)
class FailingTask(SaveOrderTask):
def execute(self, **kwargs):
self.update_progress(0)
if self._sleep:
time.sleep(self._sleep)
self.update_progress(0.99)
raise RuntimeError('Woot!')
class NastyTask(task.Task):
def execute(self, **kwargs):
pass
def revert(self, **kwargs):
raise RuntimeError('Gotcha!')
class TaskNoRequiresNoReturns(task.Task):
def execute(self, **kwargs):
pass
def revert(self, **kwargs):
pass
class TaskOneArg(task.Task):
def execute(self, x, **kwargs):
pass
def revert(self, x, **kwargs):
pass
class TaskMultiArg(task.Task):
def execute(self, x, y, z, **kwargs):
pass
def revert(self, x, y, z, **kwargs):
pass
class TaskOneReturn(task.Task):
def execute(self, **kwargs):
return 1
def revert(self, **kwargs):
pass
class TaskMultiReturn(task.Task):
def execute(self, **kwargs):
return 1, 3, 5
def revert(self, **kwargs):
pass
class TaskOneArgOneReturn(task.Task):
def execute(self, x, **kwargs):
return 1
def revert(self, x, **kwargs):
pass
class TaskMultiArgOneReturn(task.Task):
def execute(self, x, y, z, **kwargs):
return x + y + z
def revert(self, x, y, z, **kwargs):
pass
class TaskMultiArgMultiReturn(task.Task):
def execute(self, x, y, z, **kwargs):
return 1, 3, 5
def revert(self, x, y, z, **kwargs):
pass
class TaskMultiDictk(task.Task):
def execute(self):
output = {}
for i, k in enumerate(sorted(self.provides)):
output[k] = i
return output
class NeverRunningTask(task.Task):
def execute(self, **kwargs):
assert False, 'This method should not be called'
def revert(self, **kwargs):
assert False, 'This method should not be called'
class EngineTestBase(object):
def setUp(self):
super(EngineTestBase, self).setUp()
self.values = []
self.backend = impl_memory.MemoryBackend(conf={})
def tearDown(self):
super(EngineTestBase, self).tearDown()
with contextlib.closing(self.backend) as be:
with contextlib.closing(be.get_connection()) as conn:
conn.clear_all()
def _make_engine(self, flow, flow_detail=None):
raise NotImplementedError()