Linear flow: mark links and rework unit tests

- add 'invariant': True to all links produced by linear flow;
- write new unit tests for basic functionality of linear flow
  pattern;
- remove some simple now-redundant unit tests from
  test_flow_dependencies.

Change-Id: I1f4fa8a1dfc61485555a10e8d0629a67aab1997f
This commit is contained in:
Ivan A. Melnikov
2014-03-21 14:26:24 +04:00
parent 1011df951e
commit 3c18637399
4 changed files with 141 additions and 22 deletions

View File

@@ -18,8 +18,7 @@ from taskflow import exceptions
from taskflow import flow
# TODO(imelnikov): add metadata describing link here
_LINK_METADATA = dict()
_LINK_METADATA = {'invariant': True}
class Flow(flow.Flow):

View File

View File

@@ -0,0 +1,140 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from taskflow import exceptions as exc
from taskflow.patterns import linear_flow as lf
from taskflow import retry
from taskflow import test
from taskflow.tests import utils
def _task(name, provides=None, requires=None):
return utils.ProvidesRequiresTask(name, provides, requires)
class LinearFlowTest(test.TestCase):
def test_linear_flow_starts_as_empty(self):
f = lf.Flow('test')
self.assertEqual(len(f), 0)
self.assertEqual(list(f), [])
self.assertEqual(list(f.iter_links()), [])
self.assertEqual(f.requires, set())
self.assertEqual(f.provides, set())
expected = 'taskflow.patterns.linear_flow.Flow: test; 0'
self.assertEqual(str(f), expected)
def test_linear_flow_add_nothing(self):
f = lf.Flow('test')
result = f.add()
self.assertIs(f, result)
self.assertEqual(len(f), 0)
def test_linear_flow_one_task(self):
f = lf.Flow('test')
task = _task(name='task1', requires=['a', 'b'], provides=['c', 'd'])
result = f.add(task)
self.assertIs(f, result)
self.assertEqual(len(f), 1)
self.assertEqual(list(f), [task])
self.assertEqual(list(f.iter_links()), [])
self.assertEqual(f.requires, set(['a', 'b']))
self.assertEqual(f.provides, set(['c', 'd']))
def test_linear_flow_two_independent_tasks(self):
task1 = _task(name='task1')
task2 = _task(name='task2')
f = lf.Flow('test').add(task1, task2)
self.assertEqual(len(f), 2)
self.assertEqual(list(f), [task1, task2])
self.assertEqual(list(f.iter_links()), [
(task1, task2, {'invariant': True})
])
def test_linear_flow_two_dependent_tasks(self):
task1 = _task(name='task1', provides=['a'])
task2 = _task(name='task2', requires=['a'])
f = lf.Flow('test').add(task1, task2)
self.assertEqual(len(f), 2)
self.assertEqual(list(f), [task1, task2])
self.assertEqual(list(f.iter_links()), [
(task1, task2, {'invariant': True})
])
self.assertEqual(f.requires, set())
self.assertEqual(f.provides, set(['a']))
def test_linear_flow_two_dependent_tasks_two_different_calls(self):
task1 = _task(name='task1', provides=['a'])
task2 = _task(name='task2', requires=['a'])
f = lf.Flow('test').add(task1).add(task2)
self.assertEqual(len(f), 2)
self.assertEqual(list(f), [task1, task2])
self.assertEqual(list(f.iter_links()), [
(task1, task2, {'invariant': True})
])
def test_linear_flow_two_dependent_tasks_reverse_order(self):
task1 = _task(name='task1', provides=['a'])
task2 = _task(name='task2', requires=['a'])
f = lf.Flow('test')
self.assertRaises(exc.InvariantViolation, f.add, task2, task1)
def test_linear_flow_two_dependent_tasks_reverse_order2(self):
task1 = _task(name='task1', provides=['a'])
task2 = _task(name='task2', requires=['a'])
f = lf.Flow('test').add(task2)
self.assertRaises(exc.InvariantViolation, f.add, task1)
def test_linear_flow_two_task_same_provide(self):
task1 = _task(name='task1', provides=['a', 'b'])
task2 = _task(name='task2', provides=['a', 'c'])
f = lf.Flow('test')
self.assertRaises(exc.DependencyFailure, f.add, task2, task1)
def test_linear_flow_three_tasks(self):
task1 = _task(name='task1')
task2 = _task(name='task2')
task3 = _task(name='task3')
f = lf.Flow('test').add(task1, task2, task3)
self.assertEqual(len(f), 3)
self.assertEqual(list(f), [task1, task2, task3])
self.assertEqual(list(f.iter_links()), [
(task1, task2, {'invariant': True}),
(task2, task3, {'invariant': True})
])
expected = 'taskflow.patterns.linear_flow.Flow: test; 3'
self.assertEqual(str(f), expected)
def test_linear_flow_with_retry(self):
ret = retry.AlwaysRevert(requires=['a'], provides=['b'])
f = lf.Flow('test', ret)
self.assertIs(f.retry, ret)
self.assertEqual(ret.name, 'test_retry')
self.assertEqual(f.requires, set(['a']))
self.assertEqual(f.provides, set(['b']))

View File

@@ -84,13 +84,6 @@ class FlowDependenciesTest(test.TestCase):
self.assertEqual(flow.requires, set())
self.assertEqual(flow.provides, set(['x', 'a', 'b', 'c']))
def test_linear_flow_provides_out_of_order(self):
flow = lf.Flow('lf')
self.assertRaises(exceptions.InvariantViolation,
flow.add,
utils.TaskOneArg('task2'),
utils.TaskOneReturn('task1', provides='x'))
def test_linear_flow_provides_required_values(self):
flow = lf.Flow('lf').add(
utils.TaskOneReturn('task1', provides='x'),
@@ -108,19 +101,6 @@ class FlowDependenciesTest(test.TestCase):
self.assertEqual(flow.requires, set(['a', 'b', 'c', 'z']))
self.assertEqual(flow.provides, set(['x', 'y', 'q', 'i', 'j', 'k']))
def test_linear_flow_provides_same_values(self):
flow = lf.Flow('lf').add(utils.TaskOneReturn(provides='x'))
self.assertRaises(exceptions.DependencyFailure,
flow.add,
utils.TaskOneReturn(provides='x'))
def test_linear_flow_provides_same_values_one_add(self):
flow = lf.Flow('lf')
self.assertRaises(exceptions.DependencyFailure,
flow.add,
utils.TaskOneReturn(provides='x'),
utils.TaskOneReturn(provides='x'))
def test_unordered_flow_without_dependencies(self):
flow = uf.Flow('uf').add(
utils.TaskNoRequiresNoReturns('task1'),