diff --git a/taskflow/flow.py b/taskflow/flow.py index 98b3c49f5..26d2dcfa5 100644 --- a/taskflow/flow.py +++ b/taskflow/flow.py @@ -43,16 +43,10 @@ class Flow(object): def __init__(self, name, retry=None): self._name = six.text_type(name) self._retry = retry - # If retry doesn't have a name, + # NOTE(akarpinska): if retry doesn't have a name, # the name of its owner will be assigned - if self._retry: - self._retry_provides = self.retry.provides - self._retry_requires = self.retry.requires - if not self._retry.name: + if self._retry and self._retry.name is None: self._retry.set_name(self.name + "_retry") - else: - self._retry_provides = set() - self._retry_requires = set() @property def name(self): @@ -66,6 +60,10 @@ class Flow(object): """ return self._retry + @abc.abstractmethod + def add(self, *items): + """Adds a given item/items to this flow.""" + @abc.abstractmethod def __len__(self): """Returns how many items are in this flow.""" @@ -90,14 +88,29 @@ class Flow(object): lines.append("%s" % (len(self))) return "; ".join(lines) - @abc.abstractmethod - def add(self, *items): - """Adds a given item/items to this flow.""" - - @abc.abstractproperty - def requires(self): - """Browse argument requirement names this flow requires to run.""" - - @abc.abstractproperty + @property def provides(self): - """Browse argument names provided by the flow.""" + """Set of result names provided by the flow. + + Includes names of all the outputs provided by atoms of this flow. + """ + provides = set() + if self._retry: + provides.update(self._retry.provides) + for subflow in self: + provides.update(subflow.provides) + return provides + + @property + def requires(self): + """Set of argument names required by the flow. + + Includes names of all the inputs required by atoms of this + flow, but not provided within the flow itself. + """ + requires = set() + if self._retry: + requires.update(self._retry.requires) + for subflow in self: + requires.update(subflow.requires) + return requires - self.provides diff --git a/taskflow/patterns/graph_flow.py b/taskflow/patterns/graph_flow.py index 68691996d..0ed74c75a 100644 --- a/taskflow/patterns/graph_flow.py +++ b/taskflow/patterns/graph_flow.py @@ -104,8 +104,8 @@ class Flow(flow.Flow): if self.retry: update_requirements(self.retry) - provided.update(dict((k, - self.retry) for k in self._retry_provides)) + provided.update(dict((k, self.retry) + for k in self.retry.provides)) # NOTE(harlowja): Add items and edges to a temporary copy of the # underlying graph and only if that is successful added to do we then @@ -123,7 +123,7 @@ class Flow(flow.Flow): % dict(item=item.name, flow=provided[value].name, value=value)) - if value in self._retry_requires: + if self.retry and value in self.retry.requires: raise exc.DependencyFailure( "Flows retry controller %(retry)s requires %(value)s " "but item %(item)s being added to the flow produces " @@ -167,22 +167,6 @@ class Flow(flow.Flow): for (u, v, e_data) in self._get_subgraph().edges_iter(data=True): yield (u, v, e_data) - @property - def provides(self): - provides = set() - provides.update(self._retry_provides) - for subflow in self: - provides.update(subflow.provides) - return provides - - @property - def requires(self): - requires = set() - requires.update(self._retry_requires) - for subflow in self: - requires.update(subflow.requires) - return requires - self.provides - class TargetedFlow(Flow): """Graph flow with a target. diff --git a/taskflow/patterns/linear_flow.py b/taskflow/patterns/linear_flow.py index d7cbb549c..48b4d3cbe 100644 --- a/taskflow/patterns/linear_flow.py +++ b/taskflow/patterns/linear_flow.py @@ -78,22 +78,3 @@ class Flow(flow.Flow): for src, dst in zip(self._children[:-1], self._children[1:]): yield (src, dst, _LINK_METADATA.copy()) - - @property - def provides(self): - provides = set() - provides.update(self._retry_provides) - for subflow in self._children: - provides.update(subflow.provides) - return provides - - @property - def requires(self): - requires = set() - provides = set() - requires.update(self._retry_requires) - provides.update(self._retry_provides) - for subflow in self._children: - requires.update(subflow.requires - provides) - provides.update(subflow.provides) - return requires diff --git a/taskflow/patterns/unordered_flow.py b/taskflow/patterns/unordered_flow.py index 2890e80ef..a8377960b 100644 --- a/taskflow/patterns/unordered_flow.py +++ b/taskflow/patterns/unordered_flow.py @@ -41,12 +41,9 @@ class Flow(flow.Flow): if not items: return self - # NOTE(harlowja): check that items to be added are actually - # independent. - provides = set() - for subflow in self: - provides.update(subflow.provides) - + # check that items don't provide anything that other + # part of flow provides or requires + provides = self.provides old_requires = self.requires for item in items: item_provides = item.provides @@ -57,7 +54,7 @@ class Flow(flow.Flow): "by other item(s) of unordered flow %(flow)s" % dict(item=item.name, flow=self.name, oo=sorted(bad_provs))) - same_provides = (provides | self._retry_provides) & item.provides + same_provides = provides & item.provides if same_provides: raise exceptions.DependencyFailure( "%(item)s provides %(value)s but is already being" @@ -67,6 +64,11 @@ class Flow(flow.Flow): value=sorted(same_provides))) provides |= item.provides + # check that items don't require anything other children provides + if self.retry: + # NOTE(imelnikov): it is allowed to depend on value provided + # by retry controller of the flow + provides -= self.retry.provides for item in items: bad_reqs = provides & item.requires if bad_reqs: @@ -79,23 +81,6 @@ class Flow(flow.Flow): self._children.update(items) return self - @property - def provides(self): - provides = set() - provides.update(self._retry_provides) - for subflow in self: - provides.update(subflow.provides) - return provides - - @property - def requires(self): - requires = set() - for subflow in self: - requires.update(subflow.requires) - requires.update(self._retry_requires) - requires -= self._retry_provides - return requires - def __len__(self): return len(self._children)