Put provides and requires code to basic Flow

Code that calculates provides and requires for flow is almost
identical for all patterns, so this change makes it completely
identical and puts it to the base class. Other patterns are
still allowed to override these properties for sake of customization
or optimization.

Change-Id: I6e875e863047b5287ec727fc9a491f252f144ecf
This commit is contained in:
Ivan A. Melnikov
2014-05-06 15:26:46 +04:00
parent ee1e96d87d
commit 095650653d
4 changed files with 43 additions and 80 deletions

View File

@@ -43,16 +43,10 @@ class Flow(object):
def __init__(self, name, retry=None):
self._name = six.text_type(name)
self._retry = retry
# If retry doesn't have a name,
# NOTE(akarpinska): if retry doesn't have a name,
# the name of its owner will be assigned
if self._retry:
self._retry_provides = self.retry.provides
self._retry_requires = self.retry.requires
if not self._retry.name:
if self._retry and self._retry.name is None:
self._retry.set_name(self.name + "_retry")
else:
self._retry_provides = set()
self._retry_requires = set()
@property
def name(self):
@@ -66,6 +60,10 @@ class Flow(object):
"""
return self._retry
@abc.abstractmethod
def add(self, *items):
"""Adds a given item/items to this flow."""
@abc.abstractmethod
def __len__(self):
"""Returns how many items are in this flow."""
@@ -90,14 +88,29 @@ class Flow(object):
lines.append("%s" % (len(self)))
return "; ".join(lines)
@abc.abstractmethod
def add(self, *items):
"""Adds a given item/items to this flow."""
@abc.abstractproperty
def requires(self):
"""Browse argument requirement names this flow requires to run."""
@abc.abstractproperty
@property
def provides(self):
"""Browse argument names provided by the flow."""
"""Set of result names provided by the flow.
Includes names of all the outputs provided by atoms of this flow.
"""
provides = set()
if self._retry:
provides.update(self._retry.provides)
for subflow in self:
provides.update(subflow.provides)
return provides
@property
def requires(self):
"""Set of argument names required by the flow.
Includes names of all the inputs required by atoms of this
flow, but not provided within the flow itself.
"""
requires = set()
if self._retry:
requires.update(self._retry.requires)
for subflow in self:
requires.update(subflow.requires)
return requires - self.provides

View File

@@ -104,8 +104,8 @@ class Flow(flow.Flow):
if self.retry:
update_requirements(self.retry)
provided.update(dict((k,
self.retry) for k in self._retry_provides))
provided.update(dict((k, self.retry)
for k in self.retry.provides))
# NOTE(harlowja): Add items and edges to a temporary copy of the
# underlying graph and only if that is successful added to do we then
@@ -123,7 +123,7 @@ class Flow(flow.Flow):
% dict(item=item.name,
flow=provided[value].name,
value=value))
if value in self._retry_requires:
if self.retry and value in self.retry.requires:
raise exc.DependencyFailure(
"Flows retry controller %(retry)s requires %(value)s "
"but item %(item)s being added to the flow produces "
@@ -167,22 +167,6 @@ class Flow(flow.Flow):
for (u, v, e_data) in self._get_subgraph().edges_iter(data=True):
yield (u, v, e_data)
@property
def provides(self):
provides = set()
provides.update(self._retry_provides)
for subflow in self:
provides.update(subflow.provides)
return provides
@property
def requires(self):
requires = set()
requires.update(self._retry_requires)
for subflow in self:
requires.update(subflow.requires)
return requires - self.provides
class TargetedFlow(Flow):
"""Graph flow with a target.

View File

@@ -78,22 +78,3 @@ class Flow(flow.Flow):
for src, dst in zip(self._children[:-1],
self._children[1:]):
yield (src, dst, _LINK_METADATA.copy())
@property
def provides(self):
provides = set()
provides.update(self._retry_provides)
for subflow in self._children:
provides.update(subflow.provides)
return provides
@property
def requires(self):
requires = set()
provides = set()
requires.update(self._retry_requires)
provides.update(self._retry_provides)
for subflow in self._children:
requires.update(subflow.requires - provides)
provides.update(subflow.provides)
return requires

View File

@@ -41,12 +41,9 @@ class Flow(flow.Flow):
if not items:
return self
# NOTE(harlowja): check that items to be added are actually
# independent.
provides = set()
for subflow in self:
provides.update(subflow.provides)
# check that items don't provide anything that other
# part of flow provides or requires
provides = self.provides
old_requires = self.requires
for item in items:
item_provides = item.provides
@@ -57,7 +54,7 @@ class Flow(flow.Flow):
"by other item(s) of unordered flow %(flow)s"
% dict(item=item.name, flow=self.name,
oo=sorted(bad_provs)))
same_provides = (provides | self._retry_provides) & item.provides
same_provides = provides & item.provides
if same_provides:
raise exceptions.DependencyFailure(
"%(item)s provides %(value)s but is already being"
@@ -67,6 +64,11 @@ class Flow(flow.Flow):
value=sorted(same_provides)))
provides |= item.provides
# check that items don't require anything other children provides
if self.retry:
# NOTE(imelnikov): it is allowed to depend on value provided
# by retry controller of the flow
provides -= self.retry.provides
for item in items:
bad_reqs = provides & item.requires
if bad_reqs:
@@ -79,23 +81,6 @@ class Flow(flow.Flow):
self._children.update(items)
return self
@property
def provides(self):
provides = set()
provides.update(self._retry_provides)
for subflow in self:
provides.update(subflow.provides)
return provides
@property
def requires(self):
requires = set()
for subflow in self:
requires.update(subflow.requires)
requires.update(self._retry_requires)
requires -= self._retry_provides
return requires
def __len__(self):
return len(self._children)