From c73c0975600121fda89039a851bd44bc1280016b Mon Sep 17 00:00:00 2001 From: Anastasia Karpinska Date: Tue, 10 Sep 2013 13:34:29 +0300 Subject: [PATCH] Add provides and requires properties to Flow Provides and requires properties are used to browse all required and provided values for the whole Flow. The same properties were added to the Task. Appropriate Task properties were renamed to rebind and save_as. Change-Id: I02eb02303a9701a13f1a54f06f20bbf9aebd1d04 --- taskflow/engines/action_engine/task_action.py | 4 +- taskflow/flow.py | 8 + taskflow/patterns/linear_flow.py | 16 ++ taskflow/patterns/unordered_flow.py | 14 ++ taskflow/task.py | 20 +- taskflow/tests/unit/test_action_engine.py | 2 +- taskflow/tests/unit/test_flow_dependencies.py | 220 ++++++++++++++++++ taskflow/tests/unit/test_task.py | 26 +-- taskflow/tests/utils.py | 2 +- 9 files changed, 289 insertions(+), 23 deletions(-) create mode 100644 taskflow/tests/unit/test_flow_dependencies.py diff --git a/taskflow/engines/action_engine/task_action.py b/taskflow/engines/action_engine/task_action.py index 80be36b3..31341c0b 100644 --- a/taskflow/engines/action_engine/task_action.py +++ b/taskflow/engines/action_engine/task_action.py @@ -28,8 +28,8 @@ class TaskAction(base.Action): def __init__(self, task, engine): self._task = task - self._result_mapping = task.provides - self._args_mapping = task.requires + self._result_mapping = task.save_as + self._args_mapping = task.rebind try: self._id = engine.storage.get_uuid_by_name(self._task.name) except exceptions.NotFound: diff --git a/taskflow/flow.py b/taskflow/flow.py index 9ba6f6f3..92acb57f 100644 --- a/taskflow/flow.py +++ b/taskflow/flow.py @@ -81,3 +81,11 @@ class Flow(object): def add(self, *items): """Adds a given item/items to this flow.""" raise NotImplementedError() + + @abc.abstractproperty + def requires(self): + """Browse flow requirements.""" + + @abc.abstractproperty + def provides(self): + """Browse values provided by the flow.""" diff --git a/taskflow/patterns/linear_flow.py b/taskflow/patterns/linear_flow.py index 8997fb8b..f7cf16e0 100644 --- a/taskflow/patterns/linear_flow.py +++ b/taskflow/patterns/linear_flow.py @@ -45,3 +45,19 @@ class Flow(flow.Flow): def __iter__(self): for child in self._children: yield child + + @property + def provides(self): + provides = set() + for subflow in self._children: + provides.update(subflow.provides) + return provides + + @property + def requires(self): + requires = set() + provides = set() + for subflow in self._children: + requires.update(subflow.requires - provides) + provides.update(subflow.provides) + return requires diff --git a/taskflow/patterns/unordered_flow.py b/taskflow/patterns/unordered_flow.py index 65434b55..acf616ce 100644 --- a/taskflow/patterns/unordered_flow.py +++ b/taskflow/patterns/unordered_flow.py @@ -46,6 +46,20 @@ class Flow(flow.Flow): self._count += 1 return self + @property + def provides(self): + provides = set() + for subflow in self: + provides.update(subflow.provides) + return provides + + @property + def requires(self): + requires = set() + for subflow in self: + requires.update(subflow.requires) + return requires + def __len__(self): return self._count diff --git a/taskflow/task.py b/taskflow/task.py index 20db472f..fb699645 100644 --- a/taskflow/task.py +++ b/taskflow/task.py @@ -105,12 +105,12 @@ class BaseTask(object): # on existing before this task can be applied. # # Format is input_name:arg_name - self.requires = {} + self.rebind = {} # An *immutable* output 'resource' name dict this task # produces that other tasks may depend on this task providing. # # Format is output index:arg_name - self.provides = _save_as_to_mapping(provides) + self.save_as = _save_as_to_mapping(provides) # This identifies the version of the task to be ran which # can be useful in resuming older versions of tasks. Standard # major, minor version semantics apply. @@ -139,6 +139,14 @@ class BaseTask(object): said reversion. """ + @property + def provides(self): + return set(self.save_as) + + @property + def requires(self): + return set(self.rebind.values()) + class Task(BaseTask): """Base class for user-defined tasks @@ -155,8 +163,8 @@ class Task(BaseTask): name = reflection.get_callable_name(self) super(Task, self).__init__(name, provides=provides) - self.requires = _build_arg_mapping(self.name, requires, rebind, - self.execute, auto_extract) + self.rebind = _build_arg_mapping(self.name, requires, rebind, + self.execute, auto_extract) class FunctorTask(BaseTask): @@ -176,8 +184,8 @@ class FunctorTask(BaseTask): self._revert = revert if version is not None: self.version = version - self.requires = _build_arg_mapping(self.name, requires, rebind, - execute, auto_extract) + self.rebind = _build_arg_mapping(self.name, requires, rebind, + execute, auto_extract) def execute(self, *args, **kwargs): return self._execute(*args, **kwargs) diff --git a/taskflow/tests/unit/test_action_engine.py b/taskflow/tests/unit/test_action_engine.py index 538c826c..9d6697ff 100644 --- a/taskflow/tests/unit/test_action_engine.py +++ b/taskflow/tests/unit/test_action_engine.py @@ -94,7 +94,7 @@ class MultiargsTask(task.Task): class MultiDictTask(task.Task): def execute(self): output = {} - for i, k in enumerate(sorted(self.provides.keys())): + for i, k in enumerate(sorted(self.provides)): output[k] = i return output diff --git a/taskflow/tests/unit/test_flow_dependencies.py b/taskflow/tests/unit/test_flow_dependencies.py new file mode 100644 index 00000000..68805f2f --- /dev/null +++ b/taskflow/tests/unit/test_flow_dependencies.py @@ -0,0 +1,220 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from taskflow.patterns import linear_flow as lf +from taskflow.patterns import unordered_flow as uf + +from taskflow import task +from taskflow import test + + +class TaskNoRequiresNoReturns(task.Task): + + def execute(self, **kwargs): + pass + + def revert(self, **kwargs): + pass + + +class TaskOneArg(task.Task): + + def execute(self, x, **kwargs): + pass + + def revert(self, x, **kwargs): + pass + + +class TaskMultiArg(task.Task): + + def execute(self, x, y, z, **kwargs): + pass + + def revert(self, x, y, z, **kwargs): + pass + + +class TaskOneReturn(task.Task): + + def execute(self, **kwargs): + return 1 + + def revert(self, **kwargs): + pass + + +class TaskMultiReturn(task.Task): + + def execute(self, **kwargs): + return 1, 3, 5 + + def revert(self, **kwargs): + pass + + +class TaskOneArgOneReturn(task.Task): + + def execute(self, x, **kwargs): + return 1 + + def revert(self, x, **kwargs): + pass + + +class TaskMultiArgMultiReturn(task.Task): + + def execute(self, x, y, z, **kwargs): + return 1, 3, 5 + + def revert(self, x, y, z, **kwargs): + pass + + +class FlowDependenciesTest(test.TestCase): + + def test_task_without_dependencies(self): + flow = TaskNoRequiresNoReturns() + self.assertEquals(flow.requires, set()) + self.assertEquals(flow.provides, set()) + + def test_task_requires_default_values(self): + flow = TaskMultiArg() + self.assertEquals(flow.requires, set(['x', 'y', 'z'])) + self.assertEquals(flow.provides, set()) + + def test_task_requires_rebinded_mapped(self): + flow = TaskMultiArg(rebind={'x': 'a', 'y': 'b', 'z': 'c'}) + self.assertEquals(flow.requires, set(['a', 'b', 'c'])) + self.assertEquals(flow.provides, set()) + + def test_task_requires_additional_values(self): + flow = TaskMultiArg(requires=['a', 'b']) + self.assertEquals(flow.requires, set(['a', 'b', 'x', 'y', 'z'])) + self.assertEquals(flow.provides, set()) + + def test_task_provides_values(self): + flow = TaskMultiReturn(provides=['a', 'b', 'c']) + self.assertEquals(flow.requires, set()) + self.assertEquals(flow.provides, set(['a', 'b', 'c'])) + + def test_task_provides_and_requires_values(self): + flow = TaskMultiArgMultiReturn(provides=['a', 'b', 'c']) + self.assertEquals(flow.requires, set(['x', 'y', 'z'])) + self.assertEquals(flow.provides, set(['a', 'b', 'c'])) + + def test_linear_flow_without_dependencies(self): + flow = lf.Flow('lf').add( + TaskNoRequiresNoReturns('task1'), + TaskNoRequiresNoReturns('task2')) + self.assertEquals(flow.requires, set()) + self.assertEquals(flow.provides, set()) + + def test_linear_flow_reuires_values(self): + flow = lf.Flow('lf').add( + TaskOneArg('task1'), + TaskMultiArg('task2')) + self.assertEquals(flow.requires, set(['x', 'y', 'z'])) + self.assertEquals(flow.provides, set()) + + def test_linear_flow_reuires_rebind_values(self): + flow = lf.Flow('lf').add( + TaskOneArg('task1', rebind=['q']), + TaskMultiArg('task2')) + self.assertEquals(flow.requires, set(['x', 'y', 'z', 'q'])) + self.assertEquals(flow.provides, set()) + + def test_linear_flow_provides_values(self): + flow = lf.Flow('lf').add( + TaskOneReturn('task1', provides='x'), + TaskMultiReturn('task2', provides=['a', 'b', 'c'])) + self.assertEquals(flow.requires, set()) + self.assertEquals(flow.provides, set(['x', 'a', 'b', 'c'])) + + def test_linear_flow_provides_required_values(self): + flow = lf.Flow('lf').add( + TaskOneReturn('task1', provides='x'), + TaskOneArg('task2')) + self.assertEquals(flow.requires, set()) + self.assertEquals(flow.provides, set(['x'])) + + def test_linear_flow_multi_provides_and_requires_values(self): + flow = lf.Flow('lf').add( + TaskMultiArgMultiReturn('task1', + rebind=['a', 'b', 'c'], + provides=['x', 'y', 'q']), + TaskMultiArgMultiReturn('task2', + provides=['i', 'j', 'k'])) + self.assertEquals(flow.requires, set(['a', 'b', 'c', 'z'])) + self.assertEquals(flow.provides, set(['x', 'y', 'q', 'i', 'j', 'k'])) + + def test_unordered_flow_without_dependencies(self): + flow = uf.Flow('uf').add( + TaskNoRequiresNoReturns('task1'), + TaskNoRequiresNoReturns('task2')) + self.assertEquals(flow.requires, set()) + self.assertEquals(flow.provides, set()) + + def test_unordered_flow_reuires_values(self): + flow = uf.Flow('uf').add( + TaskOneArg('task1'), + TaskMultiArg('task2')) + self.assertEquals(flow.requires, set(['x', 'y', 'z'])) + self.assertEquals(flow.provides, set()) + + def test_unordered_flow_reuires_rebind_values(self): + flow = uf.Flow('uf').add( + TaskOneArg('task1', rebind=['q']), + TaskMultiArg('task2')) + self.assertEquals(flow.requires, set(['x', 'y', 'z', 'q'])) + self.assertEquals(flow.provides, set()) + + def test_unordered_flow_provides_values(self): + flow = uf.Flow('uf').add( + TaskOneReturn('task1', provides='x'), + TaskMultiReturn('task2', provides=['a', 'b', 'c'])) + self.assertEquals(flow.requires, set()) + self.assertEquals(flow.provides, set(['x', 'a', 'b', 'c'])) + + def test_unordered_flow_provides_required_values(self): + flow = uf.Flow('uf').add( + TaskOneReturn('task1', provides='x'), + TaskOneArg('task2')) + self.assertEquals(flow.requires, set(['x'])) + self.assertEquals(flow.provides, set(['x'])) + + def test_unordered_flow_multi_provides_and_requires_values(self): + flow = uf.Flow('uf').add( + TaskMultiArgMultiReturn('task1', + rebind=['a', 'b', 'c'], + provides=['x', 'y', 'q']), + TaskMultiArgMultiReturn('task2', + provides=['i', 'j', 'k'])) + self.assertEquals(flow.requires, set(['a', 'b', 'c', 'x', 'y', 'z'])) + self.assertEquals(flow.provides, set(['x', 'y', 'q', 'i', 'j', 'k'])) + + def test_nested_flows_requirements(self): + flow = uf.Flow('uf').add( + lf.Flow('lf').add( + TaskOneArgOneReturn('task1', rebind=['a'], provides=['x']), + TaskOneArgOneReturn('task2', provides=['y'])), + uf.Flow('uf').add( + TaskOneArgOneReturn('task3', rebind=['b'], provides=['z']), + TaskOneArgOneReturn('task4', rebind=['c'], provides=['q']))) + self.assertEquals(flow.requires, set(['a', 'b', 'c'])) + self.assertEquals(flow.provides, set(['x', 'y', 'z', 'q'])) diff --git a/taskflow/tests/unit/test_task.py b/taskflow/tests/unit/test_task.py index 0ee3e263..5c45659e 100644 --- a/taskflow/tests/unit/test_task.py +++ b/taskflow/tests/unit/test_task.py @@ -44,19 +44,19 @@ class TaskTestCase(test.TestCase): def test_no_provides(self): my_task = MyTask() - self.assertEquals(my_task.provides, {}) + self.assertEquals(my_task.save_as, {}) def test_provides(self): my_task = MyTask(provides='food') - self.assertEquals(my_task.provides, {'food': None}) + self.assertEquals(my_task.save_as, {'food': None}) def test_multi_provides(self): my_task = MyTask(provides=('food', 'water')) - self.assertEquals(my_task.provides, {'food': 0, 'water': 1}) + self.assertEquals(my_task.save_as, {'food': 0, 'water': 1}) def test_unpack(self): my_task = MyTask(provides=('food',)) - self.assertEquals(my_task.provides, {'food': 0}) + self.assertEquals(my_task.save_as, {'food': 0}) def test_bad_provides(self): with self.assertRaisesRegexp(TypeError, '^Task provides'): @@ -64,7 +64,7 @@ class TaskTestCase(test.TestCase): def test_requires_by_default(self): my_task = MyTask() - self.assertEquals(my_task.requires, { + self.assertEquals(my_task.rebind, { 'spam': 'spam', 'eggs': 'eggs', 'context': 'context' @@ -72,7 +72,7 @@ class TaskTestCase(test.TestCase): def test_requires_amended(self): my_task = MyTask(requires=('spam', 'eggs')) - self.assertEquals(my_task.requires, { + self.assertEquals(my_task.rebind, { 'spam': 'spam', 'eggs': 'eggs', 'context': 'context' @@ -81,7 +81,7 @@ class TaskTestCase(test.TestCase): def test_requires_explicit(self): my_task = MyTask(auto_extract=False, requires=('spam', 'eggs', 'context')) - self.assertEquals(my_task.requires, { + self.assertEquals(my_task.rebind, { 'spam': 'spam', 'eggs': 'eggs', 'context': 'context' @@ -93,7 +93,7 @@ class TaskTestCase(test.TestCase): def test_rebind_all_args(self): my_task = MyTask(rebind={'spam': 'a', 'eggs': 'b', 'context': 'c'}) - self.assertEquals(my_task.requires, { + self.assertEquals(my_task.rebind, { 'spam': 'a', 'eggs': 'b', 'context': 'c' @@ -101,7 +101,7 @@ class TaskTestCase(test.TestCase): def test_rebind_partial(self): my_task = MyTask(rebind={'spam': 'a', 'eggs': 'b'}) - self.assertEquals(my_task.requires, { + self.assertEquals(my_task.rebind, { 'spam': 'a', 'eggs': 'b', 'context': 'context' @@ -113,14 +113,14 @@ class TaskTestCase(test.TestCase): def test_rebind_unknown_kwargs(self): task = KwargsTask(rebind={'foo': 'bar'}) - self.assertEquals(task.requires, { + self.assertEquals(task.rebind, { 'foo': 'bar', 'spam': 'spam' }) def test_rebind_list_all(self): my_task = MyTask(rebind=('a', 'b', 'c')) - self.assertEquals(my_task.requires, { + self.assertEquals(my_task.rebind, { 'context': 'a', 'spam': 'b', 'eggs': 'c' @@ -128,7 +128,7 @@ class TaskTestCase(test.TestCase): def test_rebind_list_partial(self): my_task = MyTask(rebind=('a', 'b')) - self.assertEquals(my_task.requires, { + self.assertEquals(my_task.rebind, { 'context': 'a', 'spam': 'b', 'eggs': 'eggs' @@ -140,7 +140,7 @@ class TaskTestCase(test.TestCase): def test_rebind_list_more_kwargs(self): task = KwargsTask(rebind=('a', 'b', 'c')) - self.assertEquals(task.requires, { + self.assertEquals(task.rebind, { 'spam': 'a', 'b': 'b', 'c': 'c' diff --git a/taskflow/tests/utils.py b/taskflow/tests/utils.py index e34ff6a5..4bdd165f 100644 --- a/taskflow/tests/utils.py +++ b/taskflow/tests/utils.py @@ -61,7 +61,7 @@ class ProvidesRequiresTask(task.Task): return tuple(outs) else: outs = {} - for k in self.provides.keys(): + for k in self.provides: outs[k] = k return outs