From 4b4f94a65a7488db37d0295107a701c4909d927f Mon Sep 17 00:00:00 2001 From: "Ivan A. Melnikov" Date: Tue, 6 May 2014 15:26:46 +0400 Subject: [PATCH] Put provides and requires code to basic Flow Code that calculates provides and requires for flow is almost identical for all patterns, so this change makes it completely identical and puts it to the base class. Other patterns are still allowed to override these properties for sake of customization or optimization. Change-Id: I6e875e863047b5287ec727fc9a491f252f144ecf --- taskflow/flow.py | 49 ++++++++++++++++++----------- taskflow/patterns/graph_flow.py | 22 ++----------- taskflow/patterns/linear_flow.py | 19 ----------- taskflow/patterns/unordered_flow.py | 33 ++++++------------- 4 files changed, 43 insertions(+), 80 deletions(-) diff --git a/taskflow/flow.py b/taskflow/flow.py index 98b3c49f..26d2dcfa 100644 --- a/taskflow/flow.py +++ b/taskflow/flow.py @@ -43,16 +43,10 @@ class Flow(object): def __init__(self, name, retry=None): self._name = six.text_type(name) self._retry = retry - # If retry doesn't have a name, + # NOTE(akarpinska): if retry doesn't have a name, # the name of its owner will be assigned - if self._retry: - self._retry_provides = self.retry.provides - self._retry_requires = self.retry.requires - if not self._retry.name: + if self._retry and self._retry.name is None: self._retry.set_name(self.name + "_retry") - else: - self._retry_provides = set() - self._retry_requires = set() @property def name(self): @@ -66,6 +60,10 @@ class Flow(object): """ return self._retry + @abc.abstractmethod + def add(self, *items): + """Adds a given item/items to this flow.""" + @abc.abstractmethod def __len__(self): """Returns how many items are in this flow.""" @@ -90,14 +88,29 @@ class Flow(object): lines.append("%s" % (len(self))) return "; ".join(lines) - @abc.abstractmethod - def add(self, *items): - """Adds a given item/items to this flow.""" - - @abc.abstractproperty - def requires(self): - """Browse argument requirement names this flow requires to run.""" - - @abc.abstractproperty + @property def provides(self): - """Browse argument names provided by the flow.""" + """Set of result names provided by the flow. + + Includes names of all the outputs provided by atoms of this flow. + """ + provides = set() + if self._retry: + provides.update(self._retry.provides) + for subflow in self: + provides.update(subflow.provides) + return provides + + @property + def requires(self): + """Set of argument names required by the flow. + + Includes names of all the inputs required by atoms of this + flow, but not provided within the flow itself. + """ + requires = set() + if self._retry: + requires.update(self._retry.requires) + for subflow in self: + requires.update(subflow.requires) + return requires - self.provides diff --git a/taskflow/patterns/graph_flow.py b/taskflow/patterns/graph_flow.py index 68691996..0ed74c75 100644 --- a/taskflow/patterns/graph_flow.py +++ b/taskflow/patterns/graph_flow.py @@ -104,8 +104,8 @@ class Flow(flow.Flow): if self.retry: update_requirements(self.retry) - provided.update(dict((k, - self.retry) for k in self._retry_provides)) + provided.update(dict((k, self.retry) + for k in self.retry.provides)) # NOTE(harlowja): Add items and edges to a temporary copy of the # underlying graph and only if that is successful added to do we then @@ -123,7 +123,7 @@ class Flow(flow.Flow): % dict(item=item.name, flow=provided[value].name, value=value)) - if value in self._retry_requires: + if self.retry and value in self.retry.requires: raise exc.DependencyFailure( "Flows retry controller %(retry)s requires %(value)s " "but item %(item)s being added to the flow produces " @@ -167,22 +167,6 @@ class Flow(flow.Flow): for (u, v, e_data) in self._get_subgraph().edges_iter(data=True): yield (u, v, e_data) - @property - def provides(self): - provides = set() - provides.update(self._retry_provides) - for subflow in self: - provides.update(subflow.provides) - return provides - - @property - def requires(self): - requires = set() - requires.update(self._retry_requires) - for subflow in self: - requires.update(subflow.requires) - return requires - self.provides - class TargetedFlow(Flow): """Graph flow with a target. diff --git a/taskflow/patterns/linear_flow.py b/taskflow/patterns/linear_flow.py index d7cbb549..48b4d3cb 100644 --- a/taskflow/patterns/linear_flow.py +++ b/taskflow/patterns/linear_flow.py @@ -78,22 +78,3 @@ class Flow(flow.Flow): for src, dst in zip(self._children[:-1], self._children[1:]): yield (src, dst, _LINK_METADATA.copy()) - - @property - def provides(self): - provides = set() - provides.update(self._retry_provides) - for subflow in self._children: - provides.update(subflow.provides) - return provides - - @property - def requires(self): - requires = set() - provides = set() - requires.update(self._retry_requires) - provides.update(self._retry_provides) - for subflow in self._children: - requires.update(subflow.requires - provides) - provides.update(subflow.provides) - return requires diff --git a/taskflow/patterns/unordered_flow.py b/taskflow/patterns/unordered_flow.py index 2890e80e..a8377960 100644 --- a/taskflow/patterns/unordered_flow.py +++ b/taskflow/patterns/unordered_flow.py @@ -41,12 +41,9 @@ class Flow(flow.Flow): if not items: return self - # NOTE(harlowja): check that items to be added are actually - # independent. - provides = set() - for subflow in self: - provides.update(subflow.provides) - + # check that items don't provide anything that other + # part of flow provides or requires + provides = self.provides old_requires = self.requires for item in items: item_provides = item.provides @@ -57,7 +54,7 @@ class Flow(flow.Flow): "by other item(s) of unordered flow %(flow)s" % dict(item=item.name, flow=self.name, oo=sorted(bad_provs))) - same_provides = (provides | self._retry_provides) & item.provides + same_provides = provides & item.provides if same_provides: raise exceptions.DependencyFailure( "%(item)s provides %(value)s but is already being" @@ -67,6 +64,11 @@ class Flow(flow.Flow): value=sorted(same_provides))) provides |= item.provides + # check that items don't require anything other children provides + if self.retry: + # NOTE(imelnikov): it is allowed to depend on value provided + # by retry controller of the flow + provides -= self.retry.provides for item in items: bad_reqs = provides & item.requires if bad_reqs: @@ -79,23 +81,6 @@ class Flow(flow.Flow): self._children.update(items) return self - @property - def provides(self): - provides = set() - provides.update(self._retry_provides) - for subflow in self: - provides.update(subflow.provides) - return provides - - @property - def requires(self): - requires = set() - for subflow in self: - requires.update(subflow.requires) - requires.update(self._retry_requires) - requires -= self._retry_provides - return requires - def __len__(self): return len(self._children)