Relax the unordered flow symbol constraints

In order to make it possible to have a symbol
tree we need to relax and remove the constraints
that are being imposed by the unordered constraints
and later move those constraint checks and validations
into the engines compilation stage.

Part of blueprint taskflow-improved-scoping

Change-Id: I80718b4bc01fbf0dce6a95cd2fac7e6e2e1814d1
This commit is contained in:
Joshua Harlow
2014-07-18 12:58:18 -07:00
parent 2339bacaf7
commit 76641d86b8
3 changed files with 51 additions and 95 deletions

View File

@@ -14,70 +14,25 @@
# License for the specific language governing permissions and limitations
# under the License.
from taskflow import exceptions
from taskflow import flow
class Flow(flow.Flow):
"""Unordered Flow pattern.
"""Unordered flow pattern.
A unordered (potentially nested) flow of *tasks/flows* that can be
executed in any order as one unit and rolled back as one unit.
NOTE(harlowja): Since the flow is unordered there can *not* be any
dependency between task/flow inputs (requirements) and
task/flow outputs (provided names/values).
"""
def __init__(self, name, retry=None):
super(Flow, self).__init__(name, retry)
# NOTE(imelnikov): A unordered flow is unordered, so we use
# set instead of list to save children, children so that
# people using it don't depend on the ordering
# people using it don't depend on the ordering.
self._children = set()
def add(self, *items):
"""Adds a given task/tasks/flow/flows to this flow."""
if not items:
return self
# check that items don't provide anything that other
# part of flow provides or requires
provides = self.provides
old_requires = self.requires
for item in items:
item_provides = item.provides
bad_provs = item_provides & old_requires
if bad_provs:
raise exceptions.DependencyFailure(
"%(item)s provides %(oo)s that are required "
"by other item(s) of unordered flow %(flow)s"
% dict(item=item.name, flow=self.name,
oo=sorted(bad_provs)))
same_provides = provides & item.provides
if same_provides:
raise exceptions.DependencyFailure(
"%(item)s provides %(value)s but is already being"
" provided by %(flow)s and duplicate producers"
" are disallowed"
% dict(item=item.name, flow=self.name,
value=sorted(same_provides)))
provides |= item.provides
# check that items don't require anything other children provides
if self.retry:
# NOTE(imelnikov): it is allowed to depend on value provided
# by retry controller of the flow
provides -= self.retry.provides
for item in items:
bad_reqs = provides & item.requires
if bad_reqs:
raise exceptions.DependencyFailure(
"%(item)s requires %(oo)s that are provided "
"by other item(s) of unordered flow %(flow)s"
% dict(item=item.name, flow=self.name,
oo=sorted(bad_reqs)))
self._children.update(items)
return self

View File

@@ -14,7 +14,6 @@
# License for the specific language governing permissions and limitations
# under the License.
from taskflow import exceptions as exc
from taskflow.patterns import unordered_flow as uf
from taskflow import retry
from taskflow import test
@@ -59,7 +58,7 @@ class UnorderedFlowTest(test.TestCase):
self.assertEqual(f.requires, set(['a', 'b']))
self.assertEqual(f.provides, set(['c', 'd']))
def test_unordered_flow_two_independent_tasks(self):
def test_unordered_flow_two_tasks(self):
task1 = _task(name='task1')
task2 = _task(name='task2')
f = uf.Flow('test').add(task1, task2)
@@ -68,35 +67,29 @@ class UnorderedFlowTest(test.TestCase):
self.assertEqual(set(f), set([task1, task2]))
self.assertEqual(list(f.iter_links()), [])
def test_unordered_flow_two_dependent_tasks(self):
task1 = _task(name='task1', provides=['a'])
task2 = _task(name='task2', requires=['a'])
f = uf.Flow('test')
self.assertRaises(exc.DependencyFailure, f.add, task1, task2)
def test_unordered_flow_two_dependent_tasks_two_different_calls(self):
def test_unordered_flow_two_tasks_two_different_calls(self):
task1 = _task(name='task1', provides=['a'])
task2 = _task(name='task2', requires=['a'])
f = uf.Flow('test').add(task1)
self.assertRaises(exc.DependencyFailure, f.add, task2)
f.add(task2)
self.assertEqual(len(f), 2)
self.assertEqual(set(['a']), f.requires)
self.assertEqual(set(['a']), f.provides)
def test_unordered_flow_two_dependent_tasks_reverse_order(self):
def test_unordered_flow_two_tasks_reverse_order(self):
task1 = _task(name='task1', provides=['a'])
task2 = _task(name='task2', requires=['a'])
f = uf.Flow('test')
self.assertRaises(exc.DependencyFailure, f.add, task2, task1)
def test_unordered_flow_two_dependent_tasks_reverse_order2(self):
task1 = _task(name='task1', provides=['a'])
task2 = _task(name='task2', requires=['a'])
f = uf.Flow('test').add(task2)
self.assertRaises(exc.DependencyFailure, f.add, task1)
f = uf.Flow('test').add(task2).add(task1)
self.assertEqual(len(f), 2)
self.assertEqual(set(['a']), f.requires)
self.assertEqual(set(['a']), f.provides)
def test_unordered_flow_two_task_same_provide(self):
task1 = _task(name='task1', provides=['a', 'b'])
task2 = _task(name='task2', provides=['a', 'c'])
f = uf.Flow('test')
self.assertRaises(exc.DependencyFailure, f.add, task2, task1)
f.add(task2, task1)
self.assertEqual(len(f), 2)
def test_unordered_flow_with_retry(self):
ret = retry.AlwaysRevert(requires=['a'], provides=['b'])
@@ -106,3 +99,12 @@ class UnorderedFlowTest(test.TestCase):
self.assertEqual(f.requires, set(['a']))
self.assertEqual(f.provides, set(['b']))
def test_unordered_flow_with_retry_fully_satisfies(self):
ret = retry.AlwaysRevert(provides=['b', 'a'])
f = uf.Flow('test', ret)
f.add(_task(name='task1', requires=['a']))
self.assertIs(f.retry, ret)
self.assertEqual(ret.name, 'test_retry')
self.assertEqual(f.requires, set([]))
self.assertEqual(f.provides, set(['b', 'a']))

View File

@@ -130,24 +130,27 @@ class FlowDependenciesTest(test.TestCase):
def test_unordered_flow_provides_required_values(self):
flow = uf.Flow('uf')
self.assertRaises(exceptions.DependencyFailure,
flow.add,
utils.TaskOneReturn('task1', provides='x'),
utils.TaskOneArg('task2'))
flow.add(utils.TaskOneReturn('task1', provides='x'),
utils.TaskOneArg('task2'))
flow.add(utils.TaskOneReturn('task1', provides='x'),
utils.TaskOneArg('task2'))
self.assertEqual(set(['x']), flow.provides)
self.assertEqual(set(['x']), flow.requires)
def test_unordered_flow_requires_provided_value_other_call(self):
flow = uf.Flow('uf')
flow.add(utils.TaskOneReturn('task1', provides='x'))
self.assertRaises(exceptions.DependencyFailure,
flow.add,
utils.TaskOneArg('task2'))
flow.add(utils.TaskOneArg('task2'))
self.assertEqual(set(['x']), flow.provides)
self.assertEqual(set(['x']), flow.requires)
def test_unordered_flow_provides_required_value_other_call(self):
flow = uf.Flow('uf')
flow.add(utils.TaskOneArg('task2'))
self.assertRaises(exceptions.DependencyFailure,
flow.add,
utils.TaskOneReturn('task1', provides='x'))
flow.add(utils.TaskOneReturn('task1', provides='x'))
self.assertEqual(2, len(flow))
self.assertEqual(set(['x']), flow.provides)
self.assertEqual(set(['x']), flow.requires)
def test_unordered_flow_multi_provides_and_requires_values(self):
flow = uf.Flow('uf').add(
@@ -161,16 +164,14 @@ class FlowDependenciesTest(test.TestCase):
def test_unordered_flow_provides_same_values(self):
flow = uf.Flow('uf').add(utils.TaskOneReturn(provides='x'))
self.assertRaises(exceptions.DependencyFailure,
flow.add,
utils.TaskOneReturn(provides='x'))
flow.add(utils.TaskOneReturn(provides='x'))
self.assertEqual(set(['x']), flow.provides)
def test_unordered_flow_provides_same_values_one_add(self):
flow = uf.Flow('uf')
self.assertRaises(exceptions.DependencyFailure,
flow.add,
utils.TaskOneReturn(provides='x'),
utils.TaskOneReturn(provides='x'))
flow.add(utils.TaskOneReturn(provides='x'),
utils.TaskOneReturn(provides='x'))
self.assertEqual(set(['x']), flow.provides)
def test_nested_flows_requirements(self):
flow = uf.Flow('uf').add(
@@ -339,24 +340,22 @@ class FlowDependenciesTest(test.TestCase):
self.assertEqual(flow.requires, set(['x', 'y', 'c']))
self.assertEqual(flow.provides, set(['a', 'b', 'z']))
def test_unordered_flow_retry_and_task_dependency_conflict(self):
def test_unordered_flow_retry_and_task_same_requires_provides(self):
flow = uf.Flow('uf', retry.AlwaysRevert('rt', requires=['x']))
self.assertRaises(exceptions.DependencyFailure,
flow.add,
utils.TaskOneReturn(provides=['x']))
flow.add(utils.TaskOneReturn(provides=['x']))
self.assertEqual(set(['x']), flow.requires)
self.assertEqual(set(['x']), flow.provides)
def test_unordered_flow_retry_and_task_provide_same_value(self):
flow = uf.Flow('uf', retry.AlwaysRevert('rt', provides=['x']))
self.assertRaises(exceptions.DependencyFailure,
flow.add,
utils.TaskOneReturn('t1', provides=['x']))
flow.add(utils.TaskOneReturn('t1', provides=['x']))
self.assertEqual(set(['x']), flow.provides)
def test_unordered_flow_retry_two_tasks_provide_same_value(self):
flow = uf.Flow('uf', retry.AlwaysRevert('rt', provides=['y']))
self.assertRaises(exceptions.DependencyFailure,
flow.add,
utils.TaskOneReturn('t1', provides=['x']),
utils.TaskOneReturn('t2', provides=['x']))
flow.add(utils.TaskOneReturn('t1', provides=['x']),
utils.TaskOneReturn('t2', provides=['x']))
self.assertEqual(set(['x', 'y']), flow.provides)
def test_graph_flow_retry_and_task(self):
flow = gf.Flow('gf', retry.AlwaysRevert('rt',