From 64583e075f41a5a239de15d6d521ce4bfde69a7d Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Thu, 6 Aug 2015 17:18:45 -0700 Subject: [PATCH] Use graphs as the underlying structure of patterns This unifies all the patterns to be graph based so that they are more uniform and their underlying constraints are easier to understand (taskflow basically processes graphs). Change-Id: Ib2ab07c1c87165cf40a06508128010887f658391 --- taskflow/patterns/graph_flow.py | 2 +- taskflow/patterns/linear_flow.py | 43 ++++++++---- taskflow/patterns/unordered_flow.py | 25 ++++--- .../tests/unit/patterns/test_linear_flow.py | 2 +- taskflow/types/graph.py | 68 +++++++++++++++---- 5 files changed, 96 insertions(+), 44 deletions(-) diff --git a/taskflow/patterns/graph_flow.py b/taskflow/patterns/graph_flow.py index c0745e1e..c769124f 100644 --- a/taskflow/patterns/graph_flow.py +++ b/taskflow/patterns/graph_flow.py @@ -67,7 +67,7 @@ class Flow(flow.Flow): def __init__(self, name, retry=None): super(Flow, self).__init__(name, retry) - self._graph = gr.DiGraph() + self._graph = gr.DiGraph(name=name) self._graph.freeze() #: Extracts the unsatisified symbol requirements of a single node. diff --git a/taskflow/patterns/linear_flow.py b/taskflow/patterns/linear_flow.py index f581ce45..747f4d26 100644 --- a/taskflow/patterns/linear_flow.py +++ b/taskflow/patterns/linear_flow.py @@ -15,9 +15,7 @@ # under the License. from taskflow import flow - - -_LINK_METADATA = {flow.LINK_INVARIANT: True} +from taskflow.types import graph as gr class Flow(flow.Flow): @@ -28,22 +26,37 @@ class Flow(flow.Flow): the reverse order that the *tasks/flows* have been applied in. """ + _no_last_item = object() + """Sentinel object used to denote no last item has been assigned.
+ + This is used to track no last item being added, since at creation there + is no last item, but since the :meth:`.add` routine can take any object + including none, we have to use a different object to be able to + distinguish the lack of any last item... + """ + def __init__(self, name, retry=None): super(Flow, self).__init__(name, retry) - self._children = [] + self._graph = gr.OrderedDiGraph(name=name) + self._last_item = self._no_last_item def add(self, *items): """Adds a given task/tasks/flow/flows to this flow.""" - items = [i for i in items if i not in self._children] - self._children.extend(items) + for item in items: + if not self._graph.has_node(item): + self._graph.add_node(item) + if self._last_item is not self._no_last_item: + self._graph.add_edge(self._last_item, item, + attr_dict={flow.LINK_INVARIANT: True}) + self._last_item = item return self def __len__(self): - return len(self._children) + return len(self._graph) def __iter__(self): - for child in self._children: - yield child + for item in self._graph.nodes_iter(): + yield item @property def requires(self): @@ -57,10 +70,10 @@ class Flow(flow.Flow): prior_provides.update(item.provides) return frozenset(requires) - def iter_links(self): - for src, dst in zip(self._children[:-1], self._children[1:]): - yield (src, dst, _LINK_METADATA.copy()) - def iter_nodes(self): - for n in self._children: - yield (n, {}) + for (n, n_data) in self._graph.nodes_iter(data=True): + yield (n, n_data) + + def iter_links(self): + for (u, v, e_data) in self._graph.edges_iter(data=True): + yield (u, v, e_data) diff --git a/taskflow/patterns/unordered_flow.py b/taskflow/patterns/unordered_flow.py index 036ca2f9..3de005c6 100644 --- a/taskflow/patterns/unordered_flow.py +++ b/taskflow/patterns/unordered_flow.py @@ -15,6 +15,7 @@ # under the License. 
from taskflow import flow +from taskflow.types import graph as gr class Flow(flow.Flow): @@ -26,31 +27,29 @@ class Flow(flow.Flow): def __init__(self, name, retry=None): super(Flow, self).__init__(name, retry) - # NOTE(imelnikov): A unordered flow is unordered, so we use - # set instead of list to save children, children so that - # people using it don't depend on the ordering. - self._children = set() + self._graph = gr.Graph(name=name) def add(self, *items): """Adds a given task/tasks/flow/flows to this flow.""" - self._children.update(items) + for item in items: + if not self._graph.has_node(item): + self._graph.add_node(item) return self def __len__(self): - return len(self._children) + return len(self._graph) def __iter__(self): - for child in self._children: - yield child + for item in self._graph: + yield item def iter_links(self): - # NOTE(imelnikov): children in unordered flow have no dependencies - # between each other due to invariants retained during construction. - return iter(()) + for (u, v, e_data) in self._graph.edges_iter(data=True): + yield (u, v, e_data) def iter_nodes(self): - for n in self._children: - yield (n, {}) + for n, n_data in self._graph.nodes_iter(data=True): + yield (n, n_data) @property def requires(self): diff --git a/taskflow/tests/unit/patterns/test_linear_flow.py b/taskflow/tests/unit/patterns/test_linear_flow.py index fa39e173..05f4253a 100644 --- a/taskflow/tests/unit/patterns/test_linear_flow.py +++ b/taskflow/tests/unit/patterns/test_linear_flow.py @@ -138,4 +138,4 @@ class LinearFlowTest(test.TestCase): for (u, v, data) in f.iter_links(): self.assertTrue(u in tasks) self.assertTrue(v in tasks) - self.assertDictEqual(lf._LINK_METADATA, data) + self.assertDictEqual({'invariant': True}, data) diff --git a/taskflow/types/graph.py b/taskflow/types/graph.py index 53eddba6..7462c9bd 100644 --- a/taskflow/types/graph.py +++ b/taskflow/types/graph.py @@ -21,8 +21,49 @@ import networkx as nx import six +def _common_format(g, 
edge_notation): + lines = [] + lines.append("Name: %s" % g.name) + lines.append("Type: %s" % type(g).__name__) + lines.append("Frozen: %s" % nx.is_frozen(g)) + lines.append("Density: %0.3f" % nx.density(g)) + lines.append("Nodes: %s" % g.number_of_nodes()) + for n in g.nodes_iter(): + lines.append(" - %s" % n) + lines.append("Edges: %s" % g.number_of_edges()) + for (u, v, e_data) in g.edges_iter(data=True): + if e_data: + lines.append(" %s %s %s (%s)" % (u, edge_notation, v, e_data)) + else: + lines.append(" %s %s %s" % (u, edge_notation, v)) + return lines + + +class Graph(nx.Graph): + """A graph subclass with useful utility functions.""" + + def __init__(self, data=None, name=''): + super(Graph, self).__init__(name=name, data=data) + self.frozen = False + + def freeze(self): + """Freezes the graph so that no more mutations can occur.""" + if not self.frozen: + nx.freeze(self) + return self + + def export_to_dot(self): + """Exports the graph to a dot format (requires pydot library).""" + return nx.to_pydot(self).to_string() + + def pformat(self): + """Pretty formats your graph into a string.""" + return os.linesep.join(_common_format(self, "<->")) + + class DiGraph(nx.DiGraph): """A directed graph subclass with useful utility functions.""" + def __init__(self, data=None, name=''): super(DiGraph, self).__init__(name=name, data=data) self.frozen = False @@ -56,20 +97,7 @@ class DiGraph(nx.DiGraph): details about your graph, including; name, type, frozeness, node count, nodes, edge count, edges, graph density and graph cycles (if any). 
""" - lines = [] - lines.append("Name: %s" % self.name) - lines.append("Type: %s" % type(self).__name__) - lines.append("Frozen: %s" % nx.is_frozen(self)) - lines.append("Nodes: %s" % self.number_of_nodes()) - for n in self.nodes_iter(): - lines.append(" - %s" % n) - lines.append("Edges: %s" % self.number_of_edges()) - for (u, v, e_data) in self.edges_iter(data=True): - if e_data: - lines.append(" %s -> %s (%s)" % (u, v, e_data)) - else: - lines.append(" %s -> %s" % (u, v)) - lines.append("Density: %0.3f" % nx.density(self)) + lines = _common_format(self, "->") cycles = list(nx.cycles.recursive_simple_cycles(self)) lines.append("Cycles: %s" % len(cycles)) for cycle in cycles: @@ -122,6 +150,18 @@ class DiGraph(nx.DiGraph): queue.append(pred_pred) +class OrderedDiGraph(DiGraph): + """A directed graph subclass with useful utility functions. + + This derivative retains node, edge, insertation and iteration + ordering (so that the iteration order matches the insertation + order). + """ + node_dict_factory = collections.OrderedDict + adjlist_dict_factory = collections.OrderedDict + edge_attr_dict_factory = collections.OrderedDict + + def merge_graphs(graph, *graphs, **kwargs): """Merges a bunch of graphs into a new graph.