Merge "Add provides and requires properties to Flow"
This commit is contained in:
@@ -28,8 +28,8 @@ class TaskAction(base.Action):
|
||||
|
||||
def __init__(self, task, engine):
|
||||
self._task = task
|
||||
self._result_mapping = task.provides
|
||||
self._args_mapping = task.requires
|
||||
self._result_mapping = task.save_as
|
||||
self._args_mapping = task.rebind
|
||||
try:
|
||||
self._id = engine.storage.get_uuid_by_name(self._task.name)
|
||||
except exceptions.NotFound:
|
||||
|
||||
@@ -81,3 +81,11 @@ class Flow(object):
|
||||
def add(self, *items):
|
||||
"""Adds a given item/items to this flow."""
|
||||
raise NotImplementedError()
|
||||
|
||||
@abc.abstractproperty
|
||||
def requires(self):
|
||||
"""Browse flow requirements."""
|
||||
|
||||
@abc.abstractproperty
|
||||
def provides(self):
|
||||
"""Browse values provided by the flow."""
|
||||
|
||||
@@ -45,3 +45,19 @@ class Flow(flow.Flow):
|
||||
def __iter__(self):
|
||||
for child in self._children:
|
||||
yield child
|
||||
|
||||
@property
|
||||
def provides(self):
|
||||
provides = set()
|
||||
for subflow in self._children:
|
||||
provides.update(subflow.provides)
|
||||
return provides
|
||||
|
||||
@property
|
||||
def requires(self):
|
||||
requires = set()
|
||||
provides = set()
|
||||
for subflow in self._children:
|
||||
requires.update(subflow.requires - provides)
|
||||
provides.update(subflow.provides)
|
||||
return requires
|
||||
|
||||
@@ -46,6 +46,20 @@ class Flow(flow.Flow):
|
||||
self._count += 1
|
||||
return self
|
||||
|
||||
@property
|
||||
def provides(self):
|
||||
provides = set()
|
||||
for subflow in self:
|
||||
provides.update(subflow.provides)
|
||||
return provides
|
||||
|
||||
@property
|
||||
def requires(self):
|
||||
requires = set()
|
||||
for subflow in self:
|
||||
requires.update(subflow.requires)
|
||||
return requires
|
||||
|
||||
def __len__(self):
|
||||
return self._count
|
||||
|
||||
|
||||
@@ -105,12 +105,12 @@ class BaseTask(object):
|
||||
# on existing before this task can be applied.
|
||||
#
|
||||
# Format is input_name:arg_name
|
||||
self.requires = {}
|
||||
self.rebind = {}
|
||||
# An *immutable* output 'resource' name dict this task
|
||||
# produces that other tasks may depend on this task providing.
|
||||
#
|
||||
# Format is output index:arg_name
|
||||
self.provides = _save_as_to_mapping(provides)
|
||||
self.save_as = _save_as_to_mapping(provides)
|
||||
# This identifies the version of the task to be ran which
|
||||
# can be useful in resuming older versions of tasks. Standard
|
||||
# major, minor version semantics apply.
|
||||
@@ -139,6 +139,14 @@ class BaseTask(object):
|
||||
said reversion.
|
||||
"""
|
||||
|
||||
@property
|
||||
def provides(self):
|
||||
return set(self.save_as)
|
||||
|
||||
@property
|
||||
def requires(self):
|
||||
return set(self.rebind.values())
|
||||
|
||||
|
||||
class Task(BaseTask):
|
||||
"""Base class for user-defined tasks
|
||||
@@ -155,8 +163,8 @@ class Task(BaseTask):
|
||||
name = reflection.get_callable_name(self)
|
||||
super(Task, self).__init__(name,
|
||||
provides=provides)
|
||||
self.requires = _build_arg_mapping(self.name, requires, rebind,
|
||||
self.execute, auto_extract)
|
||||
self.rebind = _build_arg_mapping(self.name, requires, rebind,
|
||||
self.execute, auto_extract)
|
||||
|
||||
|
||||
class FunctorTask(BaseTask):
|
||||
@@ -176,8 +184,8 @@ class FunctorTask(BaseTask):
|
||||
self._revert = revert
|
||||
if version is not None:
|
||||
self.version = version
|
||||
self.requires = _build_arg_mapping(self.name, requires, rebind,
|
||||
execute, auto_extract)
|
||||
self.rebind = _build_arg_mapping(self.name, requires, rebind,
|
||||
execute, auto_extract)
|
||||
|
||||
def execute(self, *args, **kwargs):
|
||||
return self._execute(*args, **kwargs)
|
||||
|
||||
@@ -94,7 +94,7 @@ class MultiargsTask(task.Task):
|
||||
class MultiDictTask(task.Task):
|
||||
def execute(self):
|
||||
output = {}
|
||||
for i, k in enumerate(sorted(self.provides.keys())):
|
||||
for i, k in enumerate(sorted(self.provides)):
|
||||
output[k] = i
|
||||
return output
|
||||
|
||||
|
||||
220
taskflow/tests/unit/test_flow_dependencies.py
Normal file
220
taskflow/tests/unit/test_flow_dependencies.py
Normal file
@@ -0,0 +1,220 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from taskflow.patterns import linear_flow as lf
|
||||
from taskflow.patterns import unordered_flow as uf
|
||||
|
||||
from taskflow import task
|
||||
from taskflow import test
|
||||
|
||||
|
||||
class TaskNoRequiresNoReturns(task.Task):
|
||||
|
||||
def execute(self, **kwargs):
|
||||
pass
|
||||
|
||||
def revert(self, **kwargs):
|
||||
pass
|
||||
|
||||
|
||||
class TaskOneArg(task.Task):
|
||||
|
||||
def execute(self, x, **kwargs):
|
||||
pass
|
||||
|
||||
def revert(self, x, **kwargs):
|
||||
pass
|
||||
|
||||
|
||||
class TaskMultiArg(task.Task):
|
||||
|
||||
def execute(self, x, y, z, **kwargs):
|
||||
pass
|
||||
|
||||
def revert(self, x, y, z, **kwargs):
|
||||
pass
|
||||
|
||||
|
||||
class TaskOneReturn(task.Task):
|
||||
|
||||
def execute(self, **kwargs):
|
||||
return 1
|
||||
|
||||
def revert(self, **kwargs):
|
||||
pass
|
||||
|
||||
|
||||
class TaskMultiReturn(task.Task):
|
||||
|
||||
def execute(self, **kwargs):
|
||||
return 1, 3, 5
|
||||
|
||||
def revert(self, **kwargs):
|
||||
pass
|
||||
|
||||
|
||||
class TaskOneArgOneReturn(task.Task):
|
||||
|
||||
def execute(self, x, **kwargs):
|
||||
return 1
|
||||
|
||||
def revert(self, x, **kwargs):
|
||||
pass
|
||||
|
||||
|
||||
class TaskMultiArgMultiReturn(task.Task):
|
||||
|
||||
def execute(self, x, y, z, **kwargs):
|
||||
return 1, 3, 5
|
||||
|
||||
def revert(self, x, y, z, **kwargs):
|
||||
pass
|
||||
|
||||
|
||||
class FlowDependenciesTest(test.TestCase):
|
||||
|
||||
def test_task_without_dependencies(self):
|
||||
flow = TaskNoRequiresNoReturns()
|
||||
self.assertEquals(flow.requires, set())
|
||||
self.assertEquals(flow.provides, set())
|
||||
|
||||
def test_task_requires_default_values(self):
|
||||
flow = TaskMultiArg()
|
||||
self.assertEquals(flow.requires, set(['x', 'y', 'z']))
|
||||
self.assertEquals(flow.provides, set())
|
||||
|
||||
def test_task_requires_rebinded_mapped(self):
|
||||
flow = TaskMultiArg(rebind={'x': 'a', 'y': 'b', 'z': 'c'})
|
||||
self.assertEquals(flow.requires, set(['a', 'b', 'c']))
|
||||
self.assertEquals(flow.provides, set())
|
||||
|
||||
def test_task_requires_additional_values(self):
|
||||
flow = TaskMultiArg(requires=['a', 'b'])
|
||||
self.assertEquals(flow.requires, set(['a', 'b', 'x', 'y', 'z']))
|
||||
self.assertEquals(flow.provides, set())
|
||||
|
||||
def test_task_provides_values(self):
|
||||
flow = TaskMultiReturn(provides=['a', 'b', 'c'])
|
||||
self.assertEquals(flow.requires, set())
|
||||
self.assertEquals(flow.provides, set(['a', 'b', 'c']))
|
||||
|
||||
def test_task_provides_and_requires_values(self):
|
||||
flow = TaskMultiArgMultiReturn(provides=['a', 'b', 'c'])
|
||||
self.assertEquals(flow.requires, set(['x', 'y', 'z']))
|
||||
self.assertEquals(flow.provides, set(['a', 'b', 'c']))
|
||||
|
||||
def test_linear_flow_without_dependencies(self):
|
||||
flow = lf.Flow('lf').add(
|
||||
TaskNoRequiresNoReturns('task1'),
|
||||
TaskNoRequiresNoReturns('task2'))
|
||||
self.assertEquals(flow.requires, set())
|
||||
self.assertEquals(flow.provides, set())
|
||||
|
||||
def test_linear_flow_reuires_values(self):
|
||||
flow = lf.Flow('lf').add(
|
||||
TaskOneArg('task1'),
|
||||
TaskMultiArg('task2'))
|
||||
self.assertEquals(flow.requires, set(['x', 'y', 'z']))
|
||||
self.assertEquals(flow.provides, set())
|
||||
|
||||
def test_linear_flow_reuires_rebind_values(self):
|
||||
flow = lf.Flow('lf').add(
|
||||
TaskOneArg('task1', rebind=['q']),
|
||||
TaskMultiArg('task2'))
|
||||
self.assertEquals(flow.requires, set(['x', 'y', 'z', 'q']))
|
||||
self.assertEquals(flow.provides, set())
|
||||
|
||||
def test_linear_flow_provides_values(self):
|
||||
flow = lf.Flow('lf').add(
|
||||
TaskOneReturn('task1', provides='x'),
|
||||
TaskMultiReturn('task2', provides=['a', 'b', 'c']))
|
||||
self.assertEquals(flow.requires, set())
|
||||
self.assertEquals(flow.provides, set(['x', 'a', 'b', 'c']))
|
||||
|
||||
def test_linear_flow_provides_required_values(self):
|
||||
flow = lf.Flow('lf').add(
|
||||
TaskOneReturn('task1', provides='x'),
|
||||
TaskOneArg('task2'))
|
||||
self.assertEquals(flow.requires, set())
|
||||
self.assertEquals(flow.provides, set(['x']))
|
||||
|
||||
def test_linear_flow_multi_provides_and_requires_values(self):
|
||||
flow = lf.Flow('lf').add(
|
||||
TaskMultiArgMultiReturn('task1',
|
||||
rebind=['a', 'b', 'c'],
|
||||
provides=['x', 'y', 'q']),
|
||||
TaskMultiArgMultiReturn('task2',
|
||||
provides=['i', 'j', 'k']))
|
||||
self.assertEquals(flow.requires, set(['a', 'b', 'c', 'z']))
|
||||
self.assertEquals(flow.provides, set(['x', 'y', 'q', 'i', 'j', 'k']))
|
||||
|
||||
def test_unordered_flow_without_dependencies(self):
|
||||
flow = uf.Flow('uf').add(
|
||||
TaskNoRequiresNoReturns('task1'),
|
||||
TaskNoRequiresNoReturns('task2'))
|
||||
self.assertEquals(flow.requires, set())
|
||||
self.assertEquals(flow.provides, set())
|
||||
|
||||
def test_unordered_flow_reuires_values(self):
|
||||
flow = uf.Flow('uf').add(
|
||||
TaskOneArg('task1'),
|
||||
TaskMultiArg('task2'))
|
||||
self.assertEquals(flow.requires, set(['x', 'y', 'z']))
|
||||
self.assertEquals(flow.provides, set())
|
||||
|
||||
def test_unordered_flow_reuires_rebind_values(self):
|
||||
flow = uf.Flow('uf').add(
|
||||
TaskOneArg('task1', rebind=['q']),
|
||||
TaskMultiArg('task2'))
|
||||
self.assertEquals(flow.requires, set(['x', 'y', 'z', 'q']))
|
||||
self.assertEquals(flow.provides, set())
|
||||
|
||||
def test_unordered_flow_provides_values(self):
|
||||
flow = uf.Flow('uf').add(
|
||||
TaskOneReturn('task1', provides='x'),
|
||||
TaskMultiReturn('task2', provides=['a', 'b', 'c']))
|
||||
self.assertEquals(flow.requires, set())
|
||||
self.assertEquals(flow.provides, set(['x', 'a', 'b', 'c']))
|
||||
|
||||
def test_unordered_flow_provides_required_values(self):
|
||||
flow = uf.Flow('uf').add(
|
||||
TaskOneReturn('task1', provides='x'),
|
||||
TaskOneArg('task2'))
|
||||
self.assertEquals(flow.requires, set(['x']))
|
||||
self.assertEquals(flow.provides, set(['x']))
|
||||
|
||||
def test_unordered_flow_multi_provides_and_requires_values(self):
|
||||
flow = uf.Flow('uf').add(
|
||||
TaskMultiArgMultiReturn('task1',
|
||||
rebind=['a', 'b', 'c'],
|
||||
provides=['x', 'y', 'q']),
|
||||
TaskMultiArgMultiReturn('task2',
|
||||
provides=['i', 'j', 'k']))
|
||||
self.assertEquals(flow.requires, set(['a', 'b', 'c', 'x', 'y', 'z']))
|
||||
self.assertEquals(flow.provides, set(['x', 'y', 'q', 'i', 'j', 'k']))
|
||||
|
||||
def test_nested_flows_requirements(self):
|
||||
flow = uf.Flow('uf').add(
|
||||
lf.Flow('lf').add(
|
||||
TaskOneArgOneReturn('task1', rebind=['a'], provides=['x']),
|
||||
TaskOneArgOneReturn('task2', provides=['y'])),
|
||||
uf.Flow('uf').add(
|
||||
TaskOneArgOneReturn('task3', rebind=['b'], provides=['z']),
|
||||
TaskOneArgOneReturn('task4', rebind=['c'], provides=['q'])))
|
||||
self.assertEquals(flow.requires, set(['a', 'b', 'c']))
|
||||
self.assertEquals(flow.provides, set(['x', 'y', 'z', 'q']))
|
||||
@@ -44,19 +44,19 @@ class TaskTestCase(test.TestCase):
|
||||
|
||||
def test_no_provides(self):
|
||||
my_task = MyTask()
|
||||
self.assertEquals(my_task.provides, {})
|
||||
self.assertEquals(my_task.save_as, {})
|
||||
|
||||
def test_provides(self):
|
||||
my_task = MyTask(provides='food')
|
||||
self.assertEquals(my_task.provides, {'food': None})
|
||||
self.assertEquals(my_task.save_as, {'food': None})
|
||||
|
||||
def test_multi_provides(self):
|
||||
my_task = MyTask(provides=('food', 'water'))
|
||||
self.assertEquals(my_task.provides, {'food': 0, 'water': 1})
|
||||
self.assertEquals(my_task.save_as, {'food': 0, 'water': 1})
|
||||
|
||||
def test_unpack(self):
|
||||
my_task = MyTask(provides=('food',))
|
||||
self.assertEquals(my_task.provides, {'food': 0})
|
||||
self.assertEquals(my_task.save_as, {'food': 0})
|
||||
|
||||
def test_bad_provides(self):
|
||||
with self.assertRaisesRegexp(TypeError, '^Task provides'):
|
||||
@@ -64,7 +64,7 @@ class TaskTestCase(test.TestCase):
|
||||
|
||||
def test_requires_by_default(self):
|
||||
my_task = MyTask()
|
||||
self.assertEquals(my_task.requires, {
|
||||
self.assertEquals(my_task.rebind, {
|
||||
'spam': 'spam',
|
||||
'eggs': 'eggs',
|
||||
'context': 'context'
|
||||
@@ -72,7 +72,7 @@ class TaskTestCase(test.TestCase):
|
||||
|
||||
def test_requires_amended(self):
|
||||
my_task = MyTask(requires=('spam', 'eggs'))
|
||||
self.assertEquals(my_task.requires, {
|
||||
self.assertEquals(my_task.rebind, {
|
||||
'spam': 'spam',
|
||||
'eggs': 'eggs',
|
||||
'context': 'context'
|
||||
@@ -81,7 +81,7 @@ class TaskTestCase(test.TestCase):
|
||||
def test_requires_explicit(self):
|
||||
my_task = MyTask(auto_extract=False,
|
||||
requires=('spam', 'eggs', 'context'))
|
||||
self.assertEquals(my_task.requires, {
|
||||
self.assertEquals(my_task.rebind, {
|
||||
'spam': 'spam',
|
||||
'eggs': 'eggs',
|
||||
'context': 'context'
|
||||
@@ -93,7 +93,7 @@ class TaskTestCase(test.TestCase):
|
||||
|
||||
def test_rebind_all_args(self):
|
||||
my_task = MyTask(rebind={'spam': 'a', 'eggs': 'b', 'context': 'c'})
|
||||
self.assertEquals(my_task.requires, {
|
||||
self.assertEquals(my_task.rebind, {
|
||||
'spam': 'a',
|
||||
'eggs': 'b',
|
||||
'context': 'c'
|
||||
@@ -101,7 +101,7 @@ class TaskTestCase(test.TestCase):
|
||||
|
||||
def test_rebind_partial(self):
|
||||
my_task = MyTask(rebind={'spam': 'a', 'eggs': 'b'})
|
||||
self.assertEquals(my_task.requires, {
|
||||
self.assertEquals(my_task.rebind, {
|
||||
'spam': 'a',
|
||||
'eggs': 'b',
|
||||
'context': 'context'
|
||||
@@ -113,14 +113,14 @@ class TaskTestCase(test.TestCase):
|
||||
|
||||
def test_rebind_unknown_kwargs(self):
|
||||
task = KwargsTask(rebind={'foo': 'bar'})
|
||||
self.assertEquals(task.requires, {
|
||||
self.assertEquals(task.rebind, {
|
||||
'foo': 'bar',
|
||||
'spam': 'spam'
|
||||
})
|
||||
|
||||
def test_rebind_list_all(self):
|
||||
my_task = MyTask(rebind=('a', 'b', 'c'))
|
||||
self.assertEquals(my_task.requires, {
|
||||
self.assertEquals(my_task.rebind, {
|
||||
'context': 'a',
|
||||
'spam': 'b',
|
||||
'eggs': 'c'
|
||||
@@ -128,7 +128,7 @@ class TaskTestCase(test.TestCase):
|
||||
|
||||
def test_rebind_list_partial(self):
|
||||
my_task = MyTask(rebind=('a', 'b'))
|
||||
self.assertEquals(my_task.requires, {
|
||||
self.assertEquals(my_task.rebind, {
|
||||
'context': 'a',
|
||||
'spam': 'b',
|
||||
'eggs': 'eggs'
|
||||
@@ -140,7 +140,7 @@ class TaskTestCase(test.TestCase):
|
||||
|
||||
def test_rebind_list_more_kwargs(self):
|
||||
task = KwargsTask(rebind=('a', 'b', 'c'))
|
||||
self.assertEquals(task.requires, {
|
||||
self.assertEquals(task.rebind, {
|
||||
'spam': 'a',
|
||||
'b': 'b',
|
||||
'c': 'c'
|
||||
|
||||
@@ -61,7 +61,7 @@ class ProvidesRequiresTask(task.Task):
|
||||
return tuple(outs)
|
||||
else:
|
||||
outs = {}
|
||||
for k in self.provides.keys():
|
||||
for k in self.provides:
|
||||
outs[k] = k
|
||||
return outs
|
||||
|
||||
|
||||
Reference in New Issue
Block a user