Use 'node' terminology instead of 'item' terminology

Since graphs are composed of 'nodes' it seems more
appropriate to use that terminology (which is known
to people with graph experience) instead of the non
descriptive and/or standard 'item' terminology.

Change-Id: I04a4521386d3cdf7e58fb9fa8cf26c00443c2cf6
This commit is contained in:
Joshua Harlow
2015-03-30 12:27:24 -07:00
parent d24eebf194
commit bb0af4f81d
2 changed files with 50 additions and 47 deletions

View File

@@ -69,9 +69,9 @@ class Flow(flow.Flow):
def link(self, u, v):
"""Link existing node u as a runtime dependency of existing node v."""
if not self._graph.has_node(u):
raise ValueError('Item %s not found to link from' % (u))
raise ValueError("Node '%s' not found to link from" % (u))
if not self._graph.has_node(v):
raise ValueError('Item %s not found to link to' % (v))
raise ValueError("Node '%s' not found to link to" % (v))
self._swap(self._link(u, v, manual=True))
return self
@@ -105,34 +105,38 @@ class Flow(flow.Flow):
direct access to the underlying graph).
"""
if not graph.is_directed_acyclic():
raise exc.DependencyFailure("No path through the items in the"
raise exc.DependencyFailure("No path through the node(s) in the"
" graph produces an ordering that"
" will allow for logical"
" edge traversal")
self._graph = graph.freeze()
def add(self, *items, **kwargs):
def add(self, *nodes, **kwargs):
"""Adds a given task/tasks/flow/flows to this flow.
:param items: items to add to the flow
:param nodes: node(s) to add to the flow
:param kwargs: keyword arguments, the two keyword arguments
currently processed are:
* ``resolve_requires`` a boolean that when true (the
default) implies that when items are added their
symbol requirements will be matched to existing items
and links will be automatically made to those
default) implies that when node(s) are added their
symbol requirements will be matched to existing
node(s) and links will be automatically made to those
providers. If multiple possible providers exist
then a AmbiguousDependency exception will be raised.
* ``resolve_existing``, a boolean that when true (the
default) implies that on addition of a new item that
existing items will have their requirements scanned
for symbols that this newly added item can provide.
default) implies that on addition of a new node that
existing node(s) will have their requirements scanned
for symbols that this newly added node can provide.
If a match is found a link is automatically created
from the newly added item to the requiree.
from the newly added node to the requiree.
"""
items = [i for i in items if not self._graph.has_node(i)]
if not items:
# Let's try to avoid doing any work if we can; since the below code
# after this filter can create more temporary graphs that aren't needed
# if the nodes already exist...
nodes = [i for i in nodes if not self._graph.has_node(i)]
if not nodes:
return self
# This syntax will *hopefully* be better in future versions of python.
@@ -154,23 +158,23 @@ class Flow(flow.Flow):
retry_provides.add(value)
provided[value].append(self._retry)
for item in self._graph.nodes_iter():
for value in self._unsatisfied_requires(item, self._graph,
for node in self._graph.nodes_iter():
for value in self._unsatisfied_requires(node, self._graph,
retry_provides):
required[value].append(item)
for value in item.provides:
provided[value].append(item)
required[value].append(node)
for value in node.provides:
provided[value].append(node)
# NOTE(harlowja): Add items and edges to a temporary copy of the
# NOTE(harlowja): Add node(s) and edge(s) to a temporary copy of the
# underlying graph and only if that is successful added to do we then
# swap with the underlying graph.
tmp_graph = gr.DiGraph(self._graph)
for item in items:
tmp_graph.add_node(item)
for node in nodes:
tmp_graph.add_node(node)
# Try to find a valid provider.
if resolve_requires:
for value in self._unsatisfied_requires(item, tmp_graph,
for value in self._unsatisfied_requires(node, tmp_graph,
retry_provides):
if value in provided:
providers = provided[value]
@@ -178,28 +182,28 @@ class Flow(flow.Flow):
provider_names = [n.name for n in providers]
raise exc.AmbiguousDependency(
"Resolution error detected when"
" adding %(item)s, multiple"
" adding '%(node)s', multiple"
" providers %(providers)s found for"
" required symbol '%(value)s'"
% dict(item=item.name,
% dict(node=node.name,
providers=sorted(provider_names),
value=value))
else:
self._link(providers[0], item,
self._link(providers[0], node,
graph=tmp_graph, reason=value)
else:
required[value].append(item)
required[value].append(node)
for value in item.provides:
provided[value].append(item)
for value in node.provides:
provided[value].append(node)
# See if what we provide fulfills any existing requiree.
if resolve_existing:
for value in item.provides:
for value in node.provides:
if value in required:
for requiree in list(required[value]):
if requiree is not item:
self._link(item, requiree,
if requiree is not node:
self._link(node, requiree,
graph=tmp_graph, reason=value)
required[value].remove(requiree)
@@ -233,8 +237,8 @@ class Flow(flow.Flow):
requires.update(self._retry.requires)
retry_provides.update(self._retry.provides)
g = self._get_subgraph()
for item in g.nodes_iter():
requires.update(self._unsatisfied_requires(item, g,
for node in g.nodes_iter():
requires.update(self._unsatisfied_requires(node, g,
retry_provides))
return frozenset(requires)
@@ -251,29 +255,28 @@ class TargetedFlow(Flow):
self._subgraph = None
self._target = None
def set_target(self, target_item):
def set_target(self, target_node):
"""Set target for the flow.
Any items (tasks or subflows) not needed for the target
item will not be executed.
Any node(s) (tasks or subflows) not needed for the target
node will not be executed.
"""
if not self._graph.has_node(target_item):
raise ValueError('Item %s not found' % target_item)
self._target = target_item
if not self._graph.has_node(target_node):
raise ValueError("Node '%s' not found" % target_node)
self._target = target_node
self._subgraph = None
def reset_target(self):
"""Reset target for the flow.
All items of the flow will be executed.
All node(s) of the flow will be executed.
"""
self._target = None
self._subgraph = None
def add(self, *items):
def add(self, *nodes):
"""Adds a given task/tasks/flow/flows to this flow."""
super(TargetedFlow, self).add(*items)
super(TargetedFlow, self).add(*nodes)
# reset cached subgraph, in case it was affected
self._subgraph = None
return self

View File

@@ -182,14 +182,14 @@ class GraphFlowTest(test.TestCase):
task1 = _task('task1')
task2 = _task('task2')
f = gf.Flow('test').add(task2)
self.assertRaisesRegexp(ValueError, 'Item .* not found to link from',
self.assertRaisesRegexp(ValueError, 'Node .* not found to link from',
f.link, task1, task2)
def test_graph_flow_link_to_unknown_node(self):
task1 = _task('task1')
task2 = _task('task2')
f = gf.Flow('test').add(task1)
self.assertRaisesRegexp(ValueError, 'Item .* not found to link to',
self.assertRaisesRegexp(ValueError, 'Node .* not found to link to',
f.link, task1, task2)
def test_graph_flow_link_raises_on_cycle(self):
@@ -245,7 +245,7 @@ class TargetedGraphFlowTest(test.TestCase):
task1 = _task('task1', provides=['a'], requires=[])
task2 = _task('task2', provides=['b'], requires=['a'])
f.add(task1)
self.assertRaisesRegexp(ValueError, '^Item .* not found',
self.assertRaisesRegexp(ValueError, '^Node .* not found',
f.set_target, task2)
def test_targeted_flow_one_node(self):