Add provides and requires properties to Flow

Provides and requires properties are used to browse all
required and provided values for the whole Flow.

The same properties were added to the Task.

Appropriate Task properties were renamed to rebind and save_as.

Change-Id: I02eb02303a9701a13f1a54f06f20bbf9aebd1d04
This commit is contained in:
Anastasia Karpinska
2013-09-10 13:34:29 +03:00
committed by anastasia-karpinska
parent 7c2aeaed36
commit c73c097560
9 changed files with 289 additions and 23 deletions

View File

@@ -28,8 +28,8 @@ class TaskAction(base.Action):
def __init__(self, task, engine):
self._task = task
self._result_mapping = task.provides
self._args_mapping = task.requires
self._result_mapping = task.save_as
self._args_mapping = task.rebind
try:
self._id = engine.storage.get_uuid_by_name(self._task.name)
except exceptions.NotFound:

View File

@@ -81,3 +81,11 @@ class Flow(object):
def add(self, *items):
"""Adds a given item/items to this flow."""
raise NotImplementedError()
@abc.abstractproperty
def requires(self):
"""Browse flow requirements."""
@abc.abstractproperty
def provides(self):
"""Browse values provided by the flow."""

View File

@@ -45,3 +45,19 @@ class Flow(flow.Flow):
def __iter__(self):
for child in self._children:
yield child
@property
def provides(self):
provides = set()
for subflow in self._children:
provides.update(subflow.provides)
return provides
@property
def requires(self):
requires = set()
provides = set()
for subflow in self._children:
requires.update(subflow.requires - provides)
provides.update(subflow.provides)
return requires

View File

@@ -46,6 +46,20 @@ class Flow(flow.Flow):
self._count += 1
return self
@property
def provides(self):
provides = set()
for subflow in self:
provides.update(subflow.provides)
return provides
@property
def requires(self):
requires = set()
for subflow in self:
requires.update(subflow.requires)
return requires
def __len__(self):
return self._count

View File

@@ -105,12 +105,12 @@ class BaseTask(object):
# on existing before this task can be applied.
#
# Format is input_name:arg_name
self.requires = {}
self.rebind = {}
# An *immutable* output 'resource' name dict this task
# produces that other tasks may depend on this task providing.
#
# Format is output index:arg_name
self.provides = _save_as_to_mapping(provides)
self.save_as = _save_as_to_mapping(provides)
# This identifies the version of the task to be ran which
# can be useful in resuming older versions of tasks. Standard
# major, minor version semantics apply.
@@ -139,6 +139,14 @@ class BaseTask(object):
said reversion.
"""
@property
def provides(self):
return set(self.save_as)
@property
def requires(self):
return set(self.rebind.values())
class Task(BaseTask):
"""Base class for user-defined tasks
@@ -155,8 +163,8 @@ class Task(BaseTask):
name = reflection.get_callable_name(self)
super(Task, self).__init__(name,
provides=provides)
self.requires = _build_arg_mapping(self.name, requires, rebind,
self.execute, auto_extract)
self.rebind = _build_arg_mapping(self.name, requires, rebind,
self.execute, auto_extract)
class FunctorTask(BaseTask):
@@ -176,8 +184,8 @@ class FunctorTask(BaseTask):
self._revert = revert
if version is not None:
self.version = version
self.requires = _build_arg_mapping(self.name, requires, rebind,
execute, auto_extract)
self.rebind = _build_arg_mapping(self.name, requires, rebind,
execute, auto_extract)
def execute(self, *args, **kwargs):
return self._execute(*args, **kwargs)

View File

@@ -94,7 +94,7 @@ class MultiargsTask(task.Task):
class MultiDictTask(task.Task):
def execute(self):
output = {}
for i, k in enumerate(sorted(self.provides.keys())):
for i, k in enumerate(sorted(self.provides)):
output[k] = i
return output

View File

@@ -0,0 +1,220 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from taskflow.patterns import linear_flow as lf
from taskflow.patterns import unordered_flow as uf
from taskflow import task
from taskflow import test
class TaskNoRequiresNoReturns(task.Task):
def execute(self, **kwargs):
pass
def revert(self, **kwargs):
pass
class TaskOneArg(task.Task):
def execute(self, x, **kwargs):
pass
def revert(self, x, **kwargs):
pass
class TaskMultiArg(task.Task):
def execute(self, x, y, z, **kwargs):
pass
def revert(self, x, y, z, **kwargs):
pass
class TaskOneReturn(task.Task):
def execute(self, **kwargs):
return 1
def revert(self, **kwargs):
pass
class TaskMultiReturn(task.Task):
def execute(self, **kwargs):
return 1, 3, 5
def revert(self, **kwargs):
pass
class TaskOneArgOneReturn(task.Task):
def execute(self, x, **kwargs):
return 1
def revert(self, x, **kwargs):
pass
class TaskMultiArgMultiReturn(task.Task):
def execute(self, x, y, z, **kwargs):
return 1, 3, 5
def revert(self, x, y, z, **kwargs):
pass
class FlowDependenciesTest(test.TestCase):
def test_task_without_dependencies(self):
flow = TaskNoRequiresNoReturns()
self.assertEquals(flow.requires, set())
self.assertEquals(flow.provides, set())
def test_task_requires_default_values(self):
flow = TaskMultiArg()
self.assertEquals(flow.requires, set(['x', 'y', 'z']))
self.assertEquals(flow.provides, set())
def test_task_requires_rebinded_mapped(self):
flow = TaskMultiArg(rebind={'x': 'a', 'y': 'b', 'z': 'c'})
self.assertEquals(flow.requires, set(['a', 'b', 'c']))
self.assertEquals(flow.provides, set())
def test_task_requires_additional_values(self):
flow = TaskMultiArg(requires=['a', 'b'])
self.assertEquals(flow.requires, set(['a', 'b', 'x', 'y', 'z']))
self.assertEquals(flow.provides, set())
def test_task_provides_values(self):
flow = TaskMultiReturn(provides=['a', 'b', 'c'])
self.assertEquals(flow.requires, set())
self.assertEquals(flow.provides, set(['a', 'b', 'c']))
def test_task_provides_and_requires_values(self):
flow = TaskMultiArgMultiReturn(provides=['a', 'b', 'c'])
self.assertEquals(flow.requires, set(['x', 'y', 'z']))
self.assertEquals(flow.provides, set(['a', 'b', 'c']))
def test_linear_flow_without_dependencies(self):
flow = lf.Flow('lf').add(
TaskNoRequiresNoReturns('task1'),
TaskNoRequiresNoReturns('task2'))
self.assertEquals(flow.requires, set())
self.assertEquals(flow.provides, set())
def test_linear_flow_reuires_values(self):
flow = lf.Flow('lf').add(
TaskOneArg('task1'),
TaskMultiArg('task2'))
self.assertEquals(flow.requires, set(['x', 'y', 'z']))
self.assertEquals(flow.provides, set())
def test_linear_flow_reuires_rebind_values(self):
flow = lf.Flow('lf').add(
TaskOneArg('task1', rebind=['q']),
TaskMultiArg('task2'))
self.assertEquals(flow.requires, set(['x', 'y', 'z', 'q']))
self.assertEquals(flow.provides, set())
def test_linear_flow_provides_values(self):
flow = lf.Flow('lf').add(
TaskOneReturn('task1', provides='x'),
TaskMultiReturn('task2', provides=['a', 'b', 'c']))
self.assertEquals(flow.requires, set())
self.assertEquals(flow.provides, set(['x', 'a', 'b', 'c']))
def test_linear_flow_provides_required_values(self):
flow = lf.Flow('lf').add(
TaskOneReturn('task1', provides='x'),
TaskOneArg('task2'))
self.assertEquals(flow.requires, set())
self.assertEquals(flow.provides, set(['x']))
def test_linear_flow_multi_provides_and_requires_values(self):
flow = lf.Flow('lf').add(
TaskMultiArgMultiReturn('task1',
rebind=['a', 'b', 'c'],
provides=['x', 'y', 'q']),
TaskMultiArgMultiReturn('task2',
provides=['i', 'j', 'k']))
self.assertEquals(flow.requires, set(['a', 'b', 'c', 'z']))
self.assertEquals(flow.provides, set(['x', 'y', 'q', 'i', 'j', 'k']))
def test_unordered_flow_without_dependencies(self):
flow = uf.Flow('uf').add(
TaskNoRequiresNoReturns('task1'),
TaskNoRequiresNoReturns('task2'))
self.assertEquals(flow.requires, set())
self.assertEquals(flow.provides, set())
def test_unordered_flow_reuires_values(self):
flow = uf.Flow('uf').add(
TaskOneArg('task1'),
TaskMultiArg('task2'))
self.assertEquals(flow.requires, set(['x', 'y', 'z']))
self.assertEquals(flow.provides, set())
def test_unordered_flow_reuires_rebind_values(self):
flow = uf.Flow('uf').add(
TaskOneArg('task1', rebind=['q']),
TaskMultiArg('task2'))
self.assertEquals(flow.requires, set(['x', 'y', 'z', 'q']))
self.assertEquals(flow.provides, set())
def test_unordered_flow_provides_values(self):
flow = uf.Flow('uf').add(
TaskOneReturn('task1', provides='x'),
TaskMultiReturn('task2', provides=['a', 'b', 'c']))
self.assertEquals(flow.requires, set())
self.assertEquals(flow.provides, set(['x', 'a', 'b', 'c']))
def test_unordered_flow_provides_required_values(self):
flow = uf.Flow('uf').add(
TaskOneReturn('task1', provides='x'),
TaskOneArg('task2'))
self.assertEquals(flow.requires, set(['x']))
self.assertEquals(flow.provides, set(['x']))
def test_unordered_flow_multi_provides_and_requires_values(self):
flow = uf.Flow('uf').add(
TaskMultiArgMultiReturn('task1',
rebind=['a', 'b', 'c'],
provides=['x', 'y', 'q']),
TaskMultiArgMultiReturn('task2',
provides=['i', 'j', 'k']))
self.assertEquals(flow.requires, set(['a', 'b', 'c', 'x', 'y', 'z']))
self.assertEquals(flow.provides, set(['x', 'y', 'q', 'i', 'j', 'k']))
def test_nested_flows_requirements(self):
flow = uf.Flow('uf').add(
lf.Flow('lf').add(
TaskOneArgOneReturn('task1', rebind=['a'], provides=['x']),
TaskOneArgOneReturn('task2', provides=['y'])),
uf.Flow('uf').add(
TaskOneArgOneReturn('task3', rebind=['b'], provides=['z']),
TaskOneArgOneReturn('task4', rebind=['c'], provides=['q'])))
self.assertEquals(flow.requires, set(['a', 'b', 'c']))
self.assertEquals(flow.provides, set(['x', 'y', 'z', 'q']))

View File

@@ -44,19 +44,19 @@ class TaskTestCase(test.TestCase):
def test_no_provides(self):
my_task = MyTask()
self.assertEquals(my_task.provides, {})
self.assertEquals(my_task.save_as, {})
def test_provides(self):
my_task = MyTask(provides='food')
self.assertEquals(my_task.provides, {'food': None})
self.assertEquals(my_task.save_as, {'food': None})
def test_multi_provides(self):
my_task = MyTask(provides=('food', 'water'))
self.assertEquals(my_task.provides, {'food': 0, 'water': 1})
self.assertEquals(my_task.save_as, {'food': 0, 'water': 1})
def test_unpack(self):
my_task = MyTask(provides=('food',))
self.assertEquals(my_task.provides, {'food': 0})
self.assertEquals(my_task.save_as, {'food': 0})
def test_bad_provides(self):
with self.assertRaisesRegexp(TypeError, '^Task provides'):
@@ -64,7 +64,7 @@ class TaskTestCase(test.TestCase):
def test_requires_by_default(self):
my_task = MyTask()
self.assertEquals(my_task.requires, {
self.assertEquals(my_task.rebind, {
'spam': 'spam',
'eggs': 'eggs',
'context': 'context'
@@ -72,7 +72,7 @@ class TaskTestCase(test.TestCase):
def test_requires_amended(self):
my_task = MyTask(requires=('spam', 'eggs'))
self.assertEquals(my_task.requires, {
self.assertEquals(my_task.rebind, {
'spam': 'spam',
'eggs': 'eggs',
'context': 'context'
@@ -81,7 +81,7 @@ class TaskTestCase(test.TestCase):
def test_requires_explicit(self):
my_task = MyTask(auto_extract=False,
requires=('spam', 'eggs', 'context'))
self.assertEquals(my_task.requires, {
self.assertEquals(my_task.rebind, {
'spam': 'spam',
'eggs': 'eggs',
'context': 'context'
@@ -93,7 +93,7 @@ class TaskTestCase(test.TestCase):
def test_rebind_all_args(self):
my_task = MyTask(rebind={'spam': 'a', 'eggs': 'b', 'context': 'c'})
self.assertEquals(my_task.requires, {
self.assertEquals(my_task.rebind, {
'spam': 'a',
'eggs': 'b',
'context': 'c'
@@ -101,7 +101,7 @@ class TaskTestCase(test.TestCase):
def test_rebind_partial(self):
my_task = MyTask(rebind={'spam': 'a', 'eggs': 'b'})
self.assertEquals(my_task.requires, {
self.assertEquals(my_task.rebind, {
'spam': 'a',
'eggs': 'b',
'context': 'context'
@@ -113,14 +113,14 @@ class TaskTestCase(test.TestCase):
def test_rebind_unknown_kwargs(self):
task = KwargsTask(rebind={'foo': 'bar'})
self.assertEquals(task.requires, {
self.assertEquals(task.rebind, {
'foo': 'bar',
'spam': 'spam'
})
def test_rebind_list_all(self):
my_task = MyTask(rebind=('a', 'b', 'c'))
self.assertEquals(my_task.requires, {
self.assertEquals(my_task.rebind, {
'context': 'a',
'spam': 'b',
'eggs': 'c'
@@ -128,7 +128,7 @@ class TaskTestCase(test.TestCase):
def test_rebind_list_partial(self):
my_task = MyTask(rebind=('a', 'b'))
self.assertEquals(my_task.requires, {
self.assertEquals(my_task.rebind, {
'context': 'a',
'spam': 'b',
'eggs': 'eggs'
@@ -140,7 +140,7 @@ class TaskTestCase(test.TestCase):
def test_rebind_list_more_kwargs(self):
task = KwargsTask(rebind=('a', 'b', 'c'))
self.assertEquals(task.requires, {
self.assertEquals(task.rebind, {
'spam': 'a',
'b': 'b',
'c': 'c'

View File

@@ -61,7 +61,7 @@ class ProvidesRequiresTask(task.Task):
return tuple(outs)
else:
outs = {}
for k in self.provides.keys():
for k in self.provides:
outs[k] = k
return outs