From 1623dbb01ed5f1f9e6e6c595a8993f3776f285ef Mon Sep 17 00:00:00 2001 From: Anastasia Karpinska Date: Wed, 18 Sep 2013 18:21:15 +0300 Subject: [PATCH] Graph flow, sequential graph action Change-Id: I07cc820aa2f37d0f9599f34efab07b28cf47ca48 --- taskflow/engines/action_engine/engine.py | 21 +- .../engines/action_engine/graph_action.py | 81 +++++++ taskflow/examples/graph_flow.py | 75 +++++++ taskflow/exceptions.py | 5 + taskflow/patterns/graph_flow.py | 198 ++++++++---------- taskflow/tests/unit/test_action_engine.py | 114 +++++++++- 6 files changed, 378 insertions(+), 116 deletions(-) create mode 100644 taskflow/engines/action_engine/graph_action.py create mode 100644 taskflow/examples/graph_flow.py diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py index 3bdb4361..d60c974c 100644 --- a/taskflow/engines/action_engine/engine.py +++ b/taskflow/engines/action_engine/engine.py @@ -21,10 +21,12 @@ import threading from concurrent import futures +from taskflow.engines.action_engine import graph_action from taskflow.engines.action_engine import parallel_action from taskflow.engines.action_engine import seq_action from taskflow.engines.action_engine import task_action +from taskflow.patterns import graph_flow as gf from taskflow.patterns import linear_flow as lf from taskflow.patterns import unordered_flow as uf @@ -137,7 +139,8 @@ class SingleThreadedTranslator(Translator): def _factory_map(self): return [(lf.Flow, self._translate_sequential), - (uf.Flow, self._translate_sequential)] + (uf.Flow, self._translate_sequential), + (gf.Flow, self._translate_graph)] def _translate_sequential(self, pattern): action = seq_action.SequentialAction() @@ -145,6 +148,12 @@ class SingleThreadedTranslator(Translator): action.add(self.translate(p)) return action + def _translate_graph(self, pattern): + action = graph_action.SequentialGraphAction(pattern.graph) + for p in pattern: + action.add(p, self.translate(p)) + return action + class SingleThreadedActionEngine(ActionEngine): translator_cls = SingleThreadedTranslator @@ -163,7 +172,8 @@ class MultiThreadedTranslator(Translator): def _factory_map(self): return [(lf.Flow, self._translate_sequential), # unordered can be run in parallel - (uf.Flow, self._translate_parallel)] + (uf.Flow, self._translate_parallel), + (gf.Flow, self._translate_graph)] def _translate_sequential(self, pattern): action = seq_action.SequentialAction() @@ -177,6 +187,13 @@ class MultiThreadedTranslator(Translator): action.add(self.translate(p)) return action + def _translate_graph(self, pattern): + # TODO(akarpinska): replace with parallel graph later + action = graph_action.SequentialGraphAction(pattern.graph) + for p in pattern: + action.add(p, self.translate(p)) + return action + class MultiThreadedActionEngine(ActionEngine): translator_cls = MultiThreadedTranslator diff --git a/taskflow/engines/action_engine/graph_action.py b/taskflow/engines/action_engine/graph_action.py new file mode 100644 index 00000000..ac4d7f58 --- /dev/null +++ b/taskflow/engines/action_engine/graph_action.py @@ -0,0 +1,81 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. 
You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from taskflow.engines.action_engine import base_action as base + + +class GraphAction(base.Action): + + def __init__(self, graph): + self._graph = graph + self._action_mapping = {} + + def add(self, node, action): + self._action_mapping[node] = action + + def _succ(self, node): + return self._graph.successors(node) + + def _pred(self, node): + return self._graph.predecessors(node) + + def _resolve_dependencies(self, node, deps_counter, revert=False): + to_execute = [] + nodes = self._pred(node) if revert else self._succ(node) + for next_node in nodes: + deps_counter[next_node] -= 1 + if not deps_counter[next_node]: + to_execute.append(next_node) + return to_execute + + def _browse_nodes_to_execute(self, deps_counter): + to_execute = [] + for node, deps in deps_counter.items(): + if not deps: + to_execute.append(node) + return to_execute + + def _get_nodes_dependencies_count(self, revert=False): + deps_counter = {} + for node in self._graph.nodes_iter(): + nodes = self._succ(node) if revert else self._pred(node) + deps_counter[node] = len(nodes) + return deps_counter + + +class SequentialGraphAction(GraphAction): + + def execute(self, engine): + deps_counter = self._get_nodes_dependencies_count() + to_execute = self._browse_nodes_to_execute(deps_counter) + + while to_execute: + node = to_execute.pop() + action = self._action_mapping[node] + action.execute(engine) # raises on failure + to_execute += self._resolve_dependencies(node, deps_counter) + + def revert(self, engine): + deps_counter = self._get_nodes_dependencies_count(True) + to_revert = self._browse_nodes_to_execute(deps_counter) + + while to_revert: + node = to_revert.pop() + action = self._action_mapping[node] + action.revert(engine) # raises on failure + to_revert += self._resolve_dependencies(node, deps_counter, True) diff --git a/taskflow/examples/graph_flow.py b/taskflow/examples/graph_flow.py new file mode 100644 index 00000000..5c2b68f4 --- /dev/null +++ b/taskflow/examples/graph_flow.py @@ -0,0 +1,75 @@ +import logging +import os +import sys + +logging.basicConfig(level=logging.ERROR) + +my_dir_path = os.path.dirname(os.path.abspath(__file__)) +sys.path.insert(0, os.path.join(os.path.join(my_dir_path, os.pardir), + os.pardir)) + +from taskflow.engines.action_engine import engine as eng +from taskflow.patterns import graph_flow as gf +from taskflow.patterns import linear_flow as lf +from taskflow import task + + +# In this example there are complex dependencies between +# tasks. User shouldn't care about ordering the Tasks. +# GraphFlow resolves dependencies automatically using tasks' +# requirements and provided values. +# Flows of any types can be nested into Graph flow. Subflows +# dependencies will be resolved too. 
+ + +class Adder(task.Task): + + def execute(self, x, y): + return x + y + + +flow = gf.Flow('root').add( + lf.Flow('nested_linear').add( + # x2 = y3+y4 = 12 + Adder("add2", provides='x2', rebind=['y3', 'y4']), + # x1 = y1+y2 = 4 + Adder("add1", provides='x1', rebind=['y1', 'y2']) + ), + # x5 = x1+x3 = 20 + Adder("add5", provides='x5', rebind=['x1', 'x3']), + # x3 = x1+x2 = 16 + Adder("add3", provides='x3', rebind=['x1', 'x2']), + # x4 = x2+y5 = 21 + Adder("add4", provides='x4', rebind=['x2', 'y5']), + # x6 = x5+x4 = 41 + Adder("add6", provides='x6', rebind=['x5', 'x4']), + # x7 = x6+x6 = 82 + Adder("add7", provides='x7', rebind=['x6', 'x6'])) + +single_threaded_engine = eng.SingleThreadedActionEngine(flow) +single_threaded_engine.storage.inject({ + "y1": 1, + "y2": 3, + "y3": 5, + "y4": 7, + "y5": 9, +}) + +single_threaded_engine.run() + +print ("Single threaded engine result %s" % + single_threaded_engine.storage.fetch_all()) + +multi_threaded_engine = eng.MultiThreadedActionEngine(flow) +multi_threaded_engine.storage.inject({ + "y1": 1, + "y2": 3, + "y3": 5, + "y4": 7, + "y5": 9, +}) + +multi_threaded_engine.run() + +print ("Multi threaded engine result %s" % + multi_threaded_engine.storage.fetch_all()) diff --git a/taskflow/exceptions.py b/taskflow/exceptions.py index 31fd1ff0..1fcb1c34 100644 --- a/taskflow/exceptions.py +++ b/taskflow/exceptions.py @@ -112,3 +112,8 @@ class MissingDependencies(InvalidStateException): message = self.message % {'who': who, 'requirements': requirements} super(MissingDependencies, self).__init__(message) self.missing_requirements = requirements + + +class DependencyFailure(TaskFlowException): + """Raised when flow can't resolve dependency.""" + pass diff --git a/taskflow/patterns/graph_flow.py b/taskflow/patterns/graph_flow.py index 0cdcf048..40b6e685 100644 --- a/taskflow/patterns/graph_flow.py +++ b/taskflow/patterns/graph_flow.py @@ -16,132 +16,108 @@ # License for the specific language governing permissions and limitations # under the License. -import logging +import collections from networkx.algorithms import dag from networkx.classes import digraph -from networkx import exception as g_exc -from taskflow import decorators from taskflow import exceptions as exc -from taskflow.patterns import linear_flow -from taskflow.utils import graph_utils -from taskflow.utils import misc - -LOG = logging.getLogger(__name__) +from taskflow import flow -class Flow(linear_flow.Flow): - """A extension of the linear flow which will run the associated tasks in - a linear topological ordering (and reverse using the same linear - topological order). +class Flow(flow.Flow): + """Graph flow pattern + + Nested flows will be executed according to their dependencies + that will be resolved using data tasks provide and require. + + Note: Cyclic dependencies are not allowed. """ - def __init__(self, name, parents=None, uuid=None): - super(Flow, self).__init__(name, parents, uuid) + def __init__(self, name, uuid=None): + super(Flow, self).__init__(name, uuid) self._graph = digraph.DiGraph() - @decorators.locked - def add(self, task, infer=True): - # Only insert the node to start, connect all the edges - # together later after all nodes have been added since if we try - # to infer the edges at this stage we likely will fail finding - # dependencies from nodes that don't exist. 
- r = misc.AOTRunner(task) - self._graph.add_node(r, uuid=r.uuid, infer=infer) - self._reset_internals() - return r.uuid + def link(self, u, v): + if not self._graph.has_node(u): + raise ValueError('Item %s not found to link from' % (u)) + if not self._graph.has_node(v): + raise ValueError('Item %s not found to link to' % (v)) + self._graph.add_edge(u, v) - def _find_uuid(self, uuid): - runner = None - for r in self._graph.nodes_iter(): - if r.uuid == uuid: - runner = r - break - return runner + # Ensure that there is a valid topological ordering. + if not dag.is_directed_acyclic_graph(self._graph): + self._graph.remove_edge(u, v) + raise exc.DependencyFailure("No path through the items in the" + " graph produces an ordering that" + " will allow for correct dependency" + " resolution") + + def add(self, *items): + """Adds a given task/tasks/flow/flows to this flow.""" + requirements = collections.defaultdict(list) + provided = {} + + def update_requirements(node): + for value in node.requires: + requirements[value].append(node) + + for node in self: + update_requirements(node) + for value in node.provides: + provided[value] = node + + try: + for item in items: + self._graph.add_node(item) + update_requirements(item) + for value in item.provides: + if value in provided: + raise exc.DependencyFailure( + "%(item)s provides %(value)s but is already being" + " provided by %(flow)s and duplicate producers" + " are disallowed" + % dict(item=item.name, + flow=provided[value].name, + value=value)) + provided[value] = item + + for value in item.requires: + if value in provided: + self.link(provided[value], item) + + for value in item.provides: + if value in requirements: + for node in requirements[value]: + self.link(item, node) + + except Exception: + self._graph.remove_nodes_from(items) + raise + + return self def __len__(self): - return len(self._graph) + return self._graph.number_of_nodes() - @decorators.locked - def add_dependency(self, provider_uuid, requirer_uuid): - """Connects provider to requirer where provider will now be required - to run before requirer does. 
- """ - if provider_uuid == requirer_uuid: - raise ValueError("Unable to link %s to itself" % provider_uuid) - provider = self._find_uuid(provider_uuid) - if not provider: - raise ValueError("No provider found with uuid %s" % provider_uuid) - requirer = self._find_uuid(requirer_uuid) - if not requirer: - raise ValueError("No requirer found with uuid %s" % requirer_uuid) - self._add_dependency(provider, requirer, reason='manual') - self._reset_internals() + def __iter__(self): + for child in self._graph.nodes_iter(): + yield child - def _add_dependency(self, provider, requirer, reason): - self._graph.add_edge(provider, requirer, reason=reason) + @property + def provides(self): + provides = set() + for subflow in self: + provides.update(subflow.provides) + return provides - def __str__(self): - lines = ["GraphFlow: %s" % (self.name)] - lines.append("%s" % (self.uuid)) - lines.append("%s" % (self._graph.number_of_nodes())) - lines.append("%s" % (self._graph.number_of_edges())) - lines.append("%s" % (len(self.parents))) - lines.append("%s" % (self.state)) - return "; ".join(lines) + @property + def requires(self): + requires = set() + for subflow in self: + requires.update(subflow.requires) + return requires - self.provides - def _reset_internals(self): - super(Flow, self)._reset_internals() - self._runners = [] - - @decorators.locked - def remove(self, uuid): - runner = self._find_uuid(uuid) - if not runner: - raise ValueError("No uuid %s found" % (uuid)) - else: - self._graph.remove_node(runner) - self._reset_internals() - - def _ordering(self): - try: - return iter(self._connect()) - except g_exc.NetworkXUnfeasible: - raise exc.InvalidStateException("Unable to correctly determine " - "the path through the provided " - "flow which will satisfy the " - "tasks needed inputs and outputs.") - - def _connect(self): - """Connects the nodes & edges of the graph together by examining who - the requirements of each node and finding another node that will - create said dependency. - """ - if len(self._graph) == 0: - return [] - if self._connected: - return self._runners - - # Clear out all automatically added edges since we want to do a fresh - # connections. Leave the manually connected ones intact so that users - # still retain the dependencies they established themselves. - def discard_edge_func(u, v, e_data): - if e_data and e_data.get('reason') != 'manual': - return True - return False - - # Link providers to requirers. - graph_utils.connect(self._graph, discard_func=discard_edge_func) - - # Now figure out the order so that we can give the runners there - # optional item providers as well as figure out the topological run - # order. 
- run_order = dag.topological_sort(self._graph) - run_stack = [] - for r in run_order: - r.runs_before = list(reversed(run_stack)) - run_stack.append(r) - self._runners = run_order - self._connected = True - return run_order + @property + def graph(self): + return self._graph diff --git a/taskflow/tests/unit/test_action_engine.py b/taskflow/tests/unit/test_action_engine.py index f8e5cd71..eaf9b76e 100644 --- a/taskflow/tests/unit/test_action_engine.py +++ b/taskflow/tests/unit/test_action_engine.py @@ -21,6 +21,7 @@ import time from concurrent import futures +from taskflow.patterns import graph_flow as gf from taskflow.patterns import linear_flow as lf from taskflow.patterns import unordered_flow as uf @@ -36,10 +37,10 @@ from taskflow import test class TestTask(task.Task): - def __init__(self, values=None, name=None, - sleep=None, provides=None, rebind=None): + def __init__(self, values=None, name=None, sleep=None, + provides=None, rebind=None, requires=None): super(TestTask, self).__init__(name=name, provides=provides, - rebind=rebind) + rebind=rebind, requires=requires) if values is None: self.values = [] else: @@ -435,9 +436,115 @@ class EngineParallelFlowTest(EngineTestBase): {'x1': 17, 'x2': 5}) +class EngineGraphFlowTest(EngineTestBase): + + def test_graph_flow_one_task(self): + flow = gf.Flow('g-1').add( + TestTask(self.values, name='task1') + ) + self._make_engine(flow).run() + self.assertEquals(self.values, ['task1']) + + def test_graph_flow_two_independent_tasks(self): + flow = gf.Flow('g-2').add( + TestTask(self.values, name='task1'), + TestTask(self.values, name='task2') + ) + self._make_engine(flow).run() + self.assertEquals(set(self.values), set(['task1', 'task2'])) + + def test_graph_flow_two_tasks(self): + flow = gf.Flow('g-1-1').add( + TestTask(self.values, name='task2', requires=['a']), + TestTask(self.values, name='task1', provides='a') + ) + self._make_engine(flow).run() + self.assertEquals(self.values, ['task1', 'task2']) + + def test_graph_flow_four_tasks_added_separately(self): + flow = (gf.Flow('g-4') + .add(TestTask(self.values, name='task4', + provides='d', requires=['c'])) + .add(TestTask(self.values, name='task2', + provides='b', requires=['a'])) + .add(TestTask(self.values, name='task3', + provides='c', requires=['b'])) + .add(TestTask(self.values, name='task1', + provides='a')) + ) + self._make_engine(flow).run() + self.assertEquals(self.values, ['task1', 'task2', 'task3', 'task4']) + + def test_graph_cyclic_dependency(self): + with self.assertRaisesRegexp(exceptions.DependencyFailure, '^No path'): + gf.Flow('g-3-cyclic').add( + TestTask([], name='task1', provides='a', requires=['b']), + TestTask([], name='task2', provides='b', requires=['c']), + TestTask([], name='task3', provides='c', requires=['a'])) + + def test_graph_two_tasks_returns_same_value(self): + with self.assertRaisesRegexp(exceptions.DependencyFailure, + "task2 provides a but is already being" + " provided by task1 and duplicate" + " producers are disallowed"): + gf.Flow('g-2-same-value').add( + TestTask([], name='task1', provides='a'), + TestTask([], name='task2', provides='a')) + + def test_graph_flow_four_tasks_revert(self): + flow = gf.Flow('g-4-failing').add( + TestTask(self.values, name='task4', provides='d', requires=['c']), + TestTask(self.values, name='task2', provides='b', requires=['a']), + FailingTask(self.values, name='task3', + provides='c', requires=['b']), + TestTask(self.values, name='task1', provides='a')) + + engine = self._make_engine(flow) + with 
self.assertRaisesRegexp(RuntimeError, '^Woot'): + engine.run() + self.assertEquals(self.values, + ['task1', 'task2', + 'task3 reverted(Failure: RuntimeError: Woot!)', + 'task2 reverted(5)', 'task1 reverted(5)']) + + def test_graph_flow_four_tasks_revert_failure(self): + flow = gf.Flow('g-3-nasty').add( + NastyTask(name='task2', provides='b', requires=['a']), + FailingTask(self.values, name='task3', requires=['b']), + TestTask(self.values, name='task1', provides='a')) + + engine = self._make_engine(flow) + with self.assertRaisesRegexp(RuntimeError, '^Gotcha'): + engine.run() + + def test_graph_flow_with_multireturn_and_multiargs_tasks(self): + flow = gf.Flow('g-3-multi').add( + MultiargsTask(name='task1', rebind=['a', 'b', 'y'], provides='z'), + MultiReturnTask(name='task2', provides=['a', 'b', 'c']), + MultiargsTask(name='task3', rebind=['c', 'b', 'x'], provides='y')) + + engine = self._make_engine(flow) + engine.storage.inject({'x': 30}) + engine.run() + self.assertEquals(engine.storage.fetch_all(), { + 'a': 12, + 'b': 2, + 'c': 1, + 'x': 30, + 'y': 33, + 'z': 47 + }) + + def test_one_task_provides_and_requires_same_data(self): + with self.assertRaisesRegexp(exceptions.DependencyFailure, '^No path'): + gf.Flow('g-1-req-error').add( + TestTask([], name='task1', requires=['a'], provides='a')) + + class SingleThreadedEngineTest(EngineTaskTest, EngineLinearFlowTest, EngineParallelFlowTest, + EngineGraphFlowTest, test.TestCase): def _make_engine(self, flow, flow_detail=None): if flow_detail is None: @@ -450,6 +557,7 @@ class SingleThreadedEngineTest(EngineTaskTest, class MultiThreadedEngineTest(EngineTaskTest, EngineLinearFlowTest, EngineParallelFlowTest, + EngineGraphFlowTest, test.TestCase): def _make_engine(self, flow, flow_detail=None, executor=None): if flow_detail is None:
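
Note for reviewers (not part of the change itself): the execute()/revert() loops in
SequentialGraphAction above are a predecessor-count (Kahn-style) traversal of the
flow graph. The standalone sketch below reproduces that traversal over a plain
adjacency dict, with made-up node names, purely to illustrate what
_get_nodes_dependencies_count(), _browse_nodes_to_execute() and
_resolve_dependencies() do together; it is an illustration only, not code from
this patch.

    def sequential_order(successors):
        """Return one valid execution order for a DAG given as {node: [successors]}."""
        # Count unsatisfied predecessors per node, mirroring
        # _get_nodes_dependencies_count(revert=False).
        deps_counter = dict((node, 0) for node in successors)
        for node in successors:
            for next_node in successors[node]:
                deps_counter[next_node] += 1
        # Nodes with no unsatisfied dependencies can run immediately,
        # mirroring _browse_nodes_to_execute().
        to_execute = [n for n, deps in deps_counter.items() if not deps]
        order = []
        while to_execute:
            node = to_execute.pop()
            order.append(node)  # the real action calls action.execute(engine) here
            # Finishing a node satisfies one dependency of each successor,
            # mirroring _resolve_dependencies(); successors that reach zero
            # unmet dependencies become runnable.
            for next_node in successors[node]:
                deps_counter[next_node] -= 1
                if not deps_counter[next_node]:
                    to_execute.append(next_node)
        return order

    # task1 -> task2 -> task3, task1 -> task4; prints one valid order,
    # e.g. ['task1', 'task4', 'task2', 'task3'].
    print(sequential_order({'task1': ['task2', 'task4'],
                            'task2': ['task3'],
                            'task3': [],
                            'task4': []}))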