Merge "Relax the unordered flow symbol constraints"

This commit is contained in:
Jenkins
2014-10-18 18:25:17 +00:00
committed by Gerrit Code Review
3 changed files with 51 additions and 95 deletions

View File

@@ -14,70 +14,25 @@
# License for the specific language governing permissions and limitations
# under the License.
from taskflow import exceptions
from taskflow import flow
class Flow(flow.Flow):
"""Unordered Flow pattern.
"""Unordered flow pattern.
A unordered (potentially nested) flow of *tasks/flows* that can be
executed in any order as one unit and rolled back as one unit.
NOTE(harlowja): Since the flow is unordered there can *not* be any
dependency between task/flow inputs (requirements) and
task/flow outputs (provided names/values).
"""
def __init__(self, name, retry=None):
super(Flow, self).__init__(name, retry)
# NOTE(imelnikov): A unordered flow is unordered, so we use
# set instead of list to save children, children so that
# people using it don't depend on the ordering
# people using it don't depend on the ordering.
self._children = set()
def add(self, *items):
"""Adds a given task/tasks/flow/flows to this flow."""
if not items:
return self
# check that items don't provide anything that other
# part of flow provides or requires
provides = self.provides
old_requires = self.requires
for item in items:
item_provides = item.provides
bad_provs = item_provides & old_requires
if bad_provs:
raise exceptions.DependencyFailure(
"%(item)s provides %(oo)s that are required "
"by other item(s) of unordered flow %(flow)s"
% dict(item=item.name, flow=self.name,
oo=sorted(bad_provs)))
same_provides = provides & item.provides
if same_provides:
raise exceptions.DependencyFailure(
"%(item)s provides %(value)s but is already being"
" provided by %(flow)s and duplicate producers"
" are disallowed"
% dict(item=item.name, flow=self.name,
value=sorted(same_provides)))
provides |= item.provides
# check that items don't require anything other children provides
if self.retry:
# NOTE(imelnikov): it is allowed to depend on value provided
# by retry controller of the flow
provides -= self.retry.provides
for item in items:
bad_reqs = provides & item.requires
if bad_reqs:
raise exceptions.DependencyFailure(
"%(item)s requires %(oo)s that are provided "
"by other item(s) of unordered flow %(flow)s"
% dict(item=item.name, flow=self.name,
oo=sorted(bad_reqs)))
self._children.update(items)
return self

View File

@@ -14,7 +14,6 @@
# License for the specific language governing permissions and limitations
# under the License.
from taskflow import exceptions as exc
from taskflow.patterns import unordered_flow as uf
from taskflow import retry
from taskflow import test
@@ -59,7 +58,7 @@ class UnorderedFlowTest(test.TestCase):
self.assertEqual(f.requires, set(['a', 'b']))
self.assertEqual(f.provides, set(['c', 'd']))
def test_unordered_flow_two_independent_tasks(self):
def test_unordered_flow_two_tasks(self):
task1 = _task(name='task1')
task2 = _task(name='task2')
f = uf.Flow('test').add(task1, task2)
@@ -68,35 +67,29 @@ class UnorderedFlowTest(test.TestCase):
self.assertEqual(set(f), set([task1, task2]))
self.assertEqual(list(f.iter_links()), [])
def test_unordered_flow_two_dependent_tasks(self):
task1 = _task(name='task1', provides=['a'])
task2 = _task(name='task2', requires=['a'])
f = uf.Flow('test')
self.assertRaises(exc.DependencyFailure, f.add, task1, task2)
def test_unordered_flow_two_dependent_tasks_two_different_calls(self):
def test_unordered_flow_two_tasks_two_different_calls(self):
task1 = _task(name='task1', provides=['a'])
task2 = _task(name='task2', requires=['a'])
f = uf.Flow('test').add(task1)
self.assertRaises(exc.DependencyFailure, f.add, task2)
f.add(task2)
self.assertEqual(len(f), 2)
self.assertEqual(set(['a']), f.requires)
self.assertEqual(set(['a']), f.provides)
def test_unordered_flow_two_dependent_tasks_reverse_order(self):
def test_unordered_flow_two_tasks_reverse_order(self):
task1 = _task(name='task1', provides=['a'])
task2 = _task(name='task2', requires=['a'])
f = uf.Flow('test')
self.assertRaises(exc.DependencyFailure, f.add, task2, task1)
def test_unordered_flow_two_dependent_tasks_reverse_order2(self):
task1 = _task(name='task1', provides=['a'])
task2 = _task(name='task2', requires=['a'])
f = uf.Flow('test').add(task2)
self.assertRaises(exc.DependencyFailure, f.add, task1)
f = uf.Flow('test').add(task2).add(task1)
self.assertEqual(len(f), 2)
self.assertEqual(set(['a']), f.requires)
self.assertEqual(set(['a']), f.provides)
def test_unordered_flow_two_task_same_provide(self):
task1 = _task(name='task1', provides=['a', 'b'])
task2 = _task(name='task2', provides=['a', 'c'])
f = uf.Flow('test')
self.assertRaises(exc.DependencyFailure, f.add, task2, task1)
f.add(task2, task1)
self.assertEqual(len(f), 2)
def test_unordered_flow_with_retry(self):
ret = retry.AlwaysRevert(requires=['a'], provides=['b'])
@@ -106,3 +99,12 @@ class UnorderedFlowTest(test.TestCase):
self.assertEqual(f.requires, set(['a']))
self.assertEqual(f.provides, set(['b']))
def test_unordered_flow_with_retry_fully_satisfies(self):
ret = retry.AlwaysRevert(provides=['b', 'a'])
f = uf.Flow('test', ret)
f.add(_task(name='task1', requires=['a']))
self.assertIs(f.retry, ret)
self.assertEqual(ret.name, 'test_retry')
self.assertEqual(f.requires, set([]))
self.assertEqual(f.provides, set(['b', 'a']))

View File

@@ -130,24 +130,27 @@ class FlowDependenciesTest(test.TestCase):
def test_unordered_flow_provides_required_values(self):
flow = uf.Flow('uf')
self.assertRaises(exceptions.DependencyFailure,
flow.add,
utils.TaskOneReturn('task1', provides='x'),
utils.TaskOneArg('task2'))
flow.add(utils.TaskOneReturn('task1', provides='x'),
utils.TaskOneArg('task2'))
flow.add(utils.TaskOneReturn('task1', provides='x'),
utils.TaskOneArg('task2'))
self.assertEqual(set(['x']), flow.provides)
self.assertEqual(set(['x']), flow.requires)
def test_unordered_flow_requires_provided_value_other_call(self):
flow = uf.Flow('uf')
flow.add(utils.TaskOneReturn('task1', provides='x'))
self.assertRaises(exceptions.DependencyFailure,
flow.add,
utils.TaskOneArg('task2'))
flow.add(utils.TaskOneArg('task2'))
self.assertEqual(set(['x']), flow.provides)
self.assertEqual(set(['x']), flow.requires)
def test_unordered_flow_provides_required_value_other_call(self):
flow = uf.Flow('uf')
flow.add(utils.TaskOneArg('task2'))
self.assertRaises(exceptions.DependencyFailure,
flow.add,
utils.TaskOneReturn('task1', provides='x'))
flow.add(utils.TaskOneReturn('task1', provides='x'))
self.assertEqual(2, len(flow))
self.assertEqual(set(['x']), flow.provides)
self.assertEqual(set(['x']), flow.requires)
def test_unordered_flow_multi_provides_and_requires_values(self):
flow = uf.Flow('uf').add(
@@ -161,16 +164,14 @@ class FlowDependenciesTest(test.TestCase):
def test_unordered_flow_provides_same_values(self):
flow = uf.Flow('uf').add(utils.TaskOneReturn(provides='x'))
self.assertRaises(exceptions.DependencyFailure,
flow.add,
utils.TaskOneReturn(provides='x'))
flow.add(utils.TaskOneReturn(provides='x'))
self.assertEqual(set(['x']), flow.provides)
def test_unordered_flow_provides_same_values_one_add(self):
flow = uf.Flow('uf')
self.assertRaises(exceptions.DependencyFailure,
flow.add,
utils.TaskOneReturn(provides='x'),
utils.TaskOneReturn(provides='x'))
flow.add(utils.TaskOneReturn(provides='x'),
utils.TaskOneReturn(provides='x'))
self.assertEqual(set(['x']), flow.provides)
def test_nested_flows_requirements(self):
flow = uf.Flow('uf').add(
@@ -339,24 +340,22 @@ class FlowDependenciesTest(test.TestCase):
self.assertEqual(flow.requires, set(['x', 'y', 'c']))
self.assertEqual(flow.provides, set(['a', 'b', 'z']))
def test_unordered_flow_retry_and_task_dependency_conflict(self):
def test_unordered_flow_retry_and_task_same_requires_provides(self):
flow = uf.Flow('uf', retry.AlwaysRevert('rt', requires=['x']))
self.assertRaises(exceptions.DependencyFailure,
flow.add,
utils.TaskOneReturn(provides=['x']))
flow.add(utils.TaskOneReturn(provides=['x']))
self.assertEqual(set(['x']), flow.requires)
self.assertEqual(set(['x']), flow.provides)
def test_unordered_flow_retry_and_task_provide_same_value(self):
flow = uf.Flow('uf', retry.AlwaysRevert('rt', provides=['x']))
self.assertRaises(exceptions.DependencyFailure,
flow.add,
utils.TaskOneReturn('t1', provides=['x']))
flow.add(utils.TaskOneReturn('t1', provides=['x']))
self.assertEqual(set(['x']), flow.provides)
def test_unordered_flow_retry_two_tasks_provide_same_value(self):
flow = uf.Flow('uf', retry.AlwaysRevert('rt', provides=['y']))
self.assertRaises(exceptions.DependencyFailure,
flow.add,
utils.TaskOneReturn('t1', provides=['x']),
utils.TaskOneReturn('t2', provides=['x']))
flow.add(utils.TaskOneReturn('t1', provides=['x']),
utils.TaskOneReturn('t2', provides=['x']))
self.assertEqual(set(['x', 'y']), flow.provides)
def test_graph_flow_retry_and_task(self):
flow = gf.Flow('gf', retry.AlwaysRevert('rt',