Graph flow, sequential graph action

Change-Id: I07cc820aa2f37d0f9599f34efab07b28cf47ca48
This commit is contained in:
Anastasia Karpinska
2013-09-18 18:21:15 +03:00
parent b07ee63f78
commit 1623dbb01e
6 changed files with 378 additions and 116 deletions

View File

@@ -21,10 +21,12 @@ import threading
from concurrent import futures
from taskflow.engines.action_engine import graph_action
from taskflow.engines.action_engine import parallel_action
from taskflow.engines.action_engine import seq_action
from taskflow.engines.action_engine import task_action
from taskflow.patterns import graph_flow as gf
from taskflow.patterns import linear_flow as lf
from taskflow.patterns import unordered_flow as uf
@@ -137,7 +139,8 @@ class SingleThreadedTranslator(Translator):
def _factory_map(self):
return [(lf.Flow, self._translate_sequential),
(uf.Flow, self._translate_sequential)]
(uf.Flow, self._translate_sequential),
(gf.Flow, self._translate_graph)]
def _translate_sequential(self, pattern):
action = seq_action.SequentialAction()
@@ -145,6 +148,12 @@ class SingleThreadedTranslator(Translator):
action.add(self.translate(p))
return action
def _translate_graph(self, pattern):
action = graph_action.SequentialGraphAction(pattern.graph)
for p in pattern:
action.add(p, self.translate(p))
return action
class SingleThreadedActionEngine(ActionEngine):
translator_cls = SingleThreadedTranslator
@@ -163,7 +172,8 @@ class MultiThreadedTranslator(Translator):
def _factory_map(self):
return [(lf.Flow, self._translate_sequential),
# unordered can be run in parallel
(uf.Flow, self._translate_parallel)]
(uf.Flow, self._translate_parallel),
(gf.Flow, self._translate_graph)]
def _translate_sequential(self, pattern):
action = seq_action.SequentialAction()
@@ -177,6 +187,13 @@ class MultiThreadedTranslator(Translator):
action.add(self.translate(p))
return action
def _translate_graph(self, pattern):
# TODO(akarpinska): replace with parallel graph later
action = graph_action.SequentialGraphAction(pattern.graph)
for p in pattern:
action.add(p, self.translate(p))
return action
class MultiThreadedActionEngine(ActionEngine):
translator_cls = MultiThreadedTranslator

View File

@@ -0,0 +1,81 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from taskflow.engines.action_engine import base_action as base
class GraphAction(base.Action):
    """Base action that tracks per-node sub-actions over a dependency graph.

    Keeps a mapping from graph nodes to the actions that run them and
    provides helpers for counting and resolving node dependencies in both
    the forward (execute) and reverse (revert) directions.
    """

    def __init__(self, graph):
        self._graph = graph
        # node -> action responsible for running that node
        self._action_mapping = {}

    def add(self, node, action):
        """Associate the given action with the given graph node."""
        self._action_mapping[node] = action

    def _succ(self, node):
        return self._graph.successors(node)

    def _pred(self, node):
        return self._graph.predecessors(node)

    def _resolve_dependencies(self, node, deps_counter, revert=False):
        """Decrement neighbor counters; return nodes that became runnable.

        When reverting, dependencies point the other way, so the
        predecessors are walked instead of the successors.
        """
        neighbors = self._pred(node) if revert else self._succ(node)
        ready = []
        for neighbor in neighbors:
            deps_counter[neighbor] -= 1
            if not deps_counter[neighbor]:
                ready.append(neighbor)
        return ready

    def _browse_nodes_to_execute(self, deps_counter):
        """Return every node that currently has zero unmet dependencies."""
        return [node for node, deps in deps_counter.items() if not deps]

    def _get_nodes_dependencies_count(self, revert=False):
        """Map each graph node to its number of unmet dependencies."""
        deps_counter = {}
        for node in self._graph.nodes_iter():
            edges = self._succ(node) if revert else self._pred(node)
            deps_counter[node] = len(edges)
        return deps_counter
class SequentialGraphAction(GraphAction):
    """Runs graph nodes one at a time in dependency order."""

    def execute(self, engine):
        deps_counter = self._get_nodes_dependencies_count()
        ready = self._browse_nodes_to_execute(deps_counter)
        while ready:
            node = ready.pop()
            # Any failure propagates up and aborts the traversal here.
            self._action_mapping[node].execute(engine)
            ready.extend(self._resolve_dependencies(node, deps_counter))

    def revert(self, engine):
        # When reverting, walk the graph against the edge direction.
        deps_counter = self._get_nodes_dependencies_count(True)
        ready = self._browse_nodes_to_execute(deps_counter)
        while ready:
            node = ready.pop()
            # Any failure propagates up and aborts the traversal here.
            self._action_mapping[node].revert(engine)
            ready.extend(self._resolve_dependencies(node, deps_counter, True))

View File

@@ -0,0 +1,75 @@
import logging
import os
import sys
logging.basicConfig(level=logging.ERROR)
my_dir_path = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, os.path.join(os.path.join(my_dir_path, os.pardir),
os.pardir))
from taskflow.engines.action_engine import engine as eng
from taskflow.patterns import graph_flow as gf
from taskflow.patterns import linear_flow as lf
from taskflow import task
# In this example there are complex dependencies between tasks, but the
# user does not need to order them manually: GraphFlow resolves the
# ordering automatically from the values each task requires and
# provides. Flows of any type can be nested inside a graph flow, and
# the subflows' dependencies are resolved as well.
class Adder(task.Task):
    """Task that returns the sum of its two inputs."""

    def execute(self, x, y):
        total = x + y
        return total
# The values seeded into storage before each engine run; both engines
# start from the same inputs (each gets its own copy).
SEED_VALUES = {
    "y1": 1,
    "y2": 3,
    "y3": 5,
    "y4": 7,
    "y5": 9,
}

flow = gf.Flow('root').add(
    lf.Flow('nested_linear').add(
        # x2 = y3+y4 = 12
        Adder("add2", provides='x2', rebind=['y3', 'y4']),
        # x1 = y1+y2 = 4
        Adder("add1", provides='x1', rebind=['y1', 'y2'])
    ),
    # x5 = x1+x3 = 20
    Adder("add5", provides='x5', rebind=['x1', 'x3']),
    # x3 = x1+x2 = 16
    Adder("add3", provides='x3', rebind=['x1', 'x2']),
    # x4 = x2+y5 = 21
    Adder("add4", provides='x4', rebind=['x2', 'y5']),
    # x6 = x5+x4 = 41
    Adder("add6", provides='x6', rebind=['x5', 'x4']),
    # x7 = x6+x6 = 82
    Adder("add7", provides='x7', rebind=['x6', 'x6']))

single_threaded_engine = eng.SingleThreadedActionEngine(flow)
single_threaded_engine.storage.inject(dict(SEED_VALUES))
single_threaded_engine.run()

print ("Single threaded engine result %s" %
       single_threaded_engine.storage.fetch_all())

multi_threaded_engine = eng.MultiThreadedActionEngine(flow)
multi_threaded_engine.storage.inject(dict(SEED_VALUES))
multi_threaded_engine.run()

print ("Multi threaded engine result %s" %
       multi_threaded_engine.storage.fetch_all())

View File

@@ -112,3 +112,8 @@ class MissingDependencies(InvalidStateException):
message = self.message % {'who': who, 'requirements': requirements}
super(MissingDependencies, self).__init__(message)
self.missing_requirements = requirements
class DependencyFailure(TaskFlowException):
    """Raised when a flow can not resolve a (data) dependency."""

View File

@@ -16,132 +16,108 @@
# License for the specific language governing permissions and limitations
# under the License.
import logging
import collections
from networkx.algorithms import dag
from networkx.classes import digraph
from networkx import exception as g_exc
from taskflow import decorators
from taskflow import exceptions as exc
from taskflow.patterns import linear_flow
from taskflow.utils import graph_utils
from taskflow.utils import misc
LOG = logging.getLogger(__name__)
from taskflow import flow
class Flow(linear_flow.Flow):
"""A extension of the linear flow which will run the associated tasks in
a linear topological ordering (and reverse using the same linear
topological order).
class Flow(flow.Flow):
"""Graph flow pattern
Nested flows will be executed according to their dependencies
that will be resolved using data tasks provide and require.
Note: Cyclic dependencies are not allowed.
"""
def __init__(self, name, parents=None, uuid=None):
super(Flow, self).__init__(name, parents, uuid)
def __init__(self, name, uuid=None):
super(Flow, self).__init__(name, uuid)
self._graph = digraph.DiGraph()
@decorators.locked
def add(self, task, infer=True):
# Only insert the node to start, connect all the edges
# together later after all nodes have been added since if we try
# to infer the edges at this stage we likely will fail finding
# dependencies from nodes that don't exist.
r = misc.AOTRunner(task)
self._graph.add_node(r, uuid=r.uuid, infer=infer)
self._reset_internals()
return r.uuid
def link(self, u, v):
if not self._graph.has_node(u):
raise ValueError('Item %s not found to link from' % (u))
if not self._graph.has_node(v):
raise ValueError('Item %s not found to link to' % (v))
self._graph.add_edge(u, v)
def _find_uuid(self, uuid):
runner = None
for r in self._graph.nodes_iter():
if r.uuid == uuid:
runner = r
break
return runner
# Ensure that there is a valid topological ordering.
if not dag.is_directed_acyclic_graph(self._graph):
self._graph.remove_edge(u, v)
raise exc.DependencyFailure("No path through the items in the"
" graph produces an ordering that"
" will allow for correct dependency"
" resolution")
def add(self, *items):
"""Adds a given task/tasks/flow/flows to this flow."""
requirements = collections.defaultdict(list)
provided = {}
def update_requirements(node):
for value in node.requires:
requirements[value].append(node)
for node in self:
update_requirements(node)
for value in node.provides:
provided[value] = node
try:
for item in items:
self._graph.add_node(item)
update_requirements(item)
for value in item.provides:
if value in provided:
raise exc.DependencyFailure(
"%(item)s provides %(value)s but is already being"
" provided by %(flow)s and duplicate producers"
" are disallowed"
% dict(item=item.name,
flow=provided[value].name,
value=value))
provided[value] = item
for value in item.requires:
if value in provided:
self.link(provided[value], item)
for value in item.provides:
if value in requirements:
for node in requirements[value]:
self.link(item, node)
except Exception:
self._graph.remove_nodes_from(items)
raise
return self
def __len__(self):
return len(self._graph)
return self._graph.number_of_nodes()
@decorators.locked
def add_dependency(self, provider_uuid, requirer_uuid):
"""Connects provider to requirer where provider will now be required
to run before requirer does.
"""
if provider_uuid == requirer_uuid:
raise ValueError("Unable to link %s to itself" % provider_uuid)
provider = self._find_uuid(provider_uuid)
if not provider:
raise ValueError("No provider found with uuid %s" % provider_uuid)
requirer = self._find_uuid(requirer_uuid)
if not requirer:
raise ValueError("No requirer found with uuid %s" % requirer_uuid)
self._add_dependency(provider, requirer, reason='manual')
self._reset_internals()
def __iter__(self):
for child in self._graph.nodes_iter():
yield child
def _add_dependency(self, provider, requirer, reason):
self._graph.add_edge(provider, requirer, reason=reason)
@property
def provides(self):
provides = set()
for subflow in self:
provides.update(subflow.provides)
return provides
def __str__(self):
lines = ["GraphFlow: %s" % (self.name)]
lines.append("%s" % (self.uuid))
lines.append("%s" % (self._graph.number_of_nodes()))
lines.append("%s" % (self._graph.number_of_edges()))
lines.append("%s" % (len(self.parents)))
lines.append("%s" % (self.state))
return "; ".join(lines)
@property
def requires(self):
requires = set()
for subflow in self:
requires.update(subflow.requires)
return requires - self.provides
def _reset_internals(self):
super(Flow, self)._reset_internals()
self._runners = []
@decorators.locked
def remove(self, uuid):
runner = self._find_uuid(uuid)
if not runner:
raise ValueError("No uuid %s found" % (uuid))
else:
self._graph.remove_node(runner)
self._reset_internals()
def _ordering(self):
try:
return iter(self._connect())
except g_exc.NetworkXUnfeasible:
raise exc.InvalidStateException("Unable to correctly determine "
"the path through the provided "
"flow which will satisfy the "
"tasks needed inputs and outputs.")
def _connect(self):
"""Connects the nodes & edges of the graph together by examining who
the requirements of each node and finding another node that will
create said dependency.
"""
if len(self._graph) == 0:
return []
if self._connected:
return self._runners
# Clear out all automatically added edges since we want to do a fresh
# connections. Leave the manually connected ones intact so that users
# still retain the dependencies they established themselves.
def discard_edge_func(u, v, e_data):
if e_data and e_data.get('reason') != 'manual':
return True
return False
# Link providers to requirers.
graph_utils.connect(self._graph, discard_func=discard_edge_func)
# Now figure out the order so that we can give the runners there
# optional item providers as well as figure out the topological run
# order.
run_order = dag.topological_sort(self._graph)
run_stack = []
for r in run_order:
r.runs_before = list(reversed(run_stack))
run_stack.append(r)
self._runners = run_order
self._connected = True
return run_order
@property
def graph(self):
return self._graph

View File

@@ -21,6 +21,7 @@ import time
from concurrent import futures
from taskflow.patterns import graph_flow as gf
from taskflow.patterns import linear_flow as lf
from taskflow.patterns import unordered_flow as uf
@@ -36,10 +37,10 @@ from taskflow import test
class TestTask(task.Task):
def __init__(self, values=None, name=None,
sleep=None, provides=None, rebind=None):
def __init__(self, values=None, name=None, sleep=None,
provides=None, rebind=None, requires=None):
super(TestTask, self).__init__(name=name, provides=provides,
rebind=rebind)
rebind=rebind, requires=requires)
if values is None:
self.values = []
else:
@@ -435,9 +436,115 @@ class EngineParallelFlowTest(EngineTestBase):
{'x1': 17, 'x2': 5})
class EngineGraphFlowTest(EngineTestBase):
    """Tests for engines running graph (dependency-ordered) flows.

    Mixed into both the single- and multi-threaded engine test cases.
    """

    def test_graph_flow_one_task(self):
        # A graph flow with a single task simply runs that task.
        flow = gf.Flow('g-1').add(
            TestTask(self.values, name='task1')
        )
        self._make_engine(flow).run()
        self.assertEquals(self.values, ['task1'])

    def test_graph_flow_two_independent_tasks(self):
        # No data dependencies between the tasks, so the run order is
        # unspecified; compare as sets.
        flow = gf.Flow('g-2').add(
            TestTask(self.values, name='task1'),
            TestTask(self.values, name='task2')
        )
        self._make_engine(flow).run()
        self.assertEquals(set(self.values), set(['task1', 'task2']))

    def test_graph_flow_two_tasks(self):
        # task2 requires 'a' which task1 provides, so task1 must run
        # first even though it was added second.
        flow = gf.Flow('g-1-1').add(
            TestTask(self.values, name='task2', requires=['a']),
            TestTask(self.values, name='task1', provides='a')
        )
        self._make_engine(flow).run()
        self.assertEquals(self.values, ['task1', 'task2'])

    def test_graph_flow_four_tasks_added_separately(self):
        # Chained add() calls; run order is still derived from the
        # provides/requires chain a -> b -> c -> d.
        flow = (gf.Flow('g-4')
                .add(TestTask(self.values, name='task4',
                              provides='d', requires=['c']))
                .add(TestTask(self.values, name='task2',
                              provides='b', requires=['a']))
                .add(TestTask(self.values, name='task3',
                              provides='c', requires=['b']))
                .add(TestTask(self.values, name='task1',
                              provides='a'))
                )
        self._make_engine(flow).run()
        self.assertEquals(self.values, ['task1', 'task2', 'task3', 'task4'])

    def test_graph_cyclic_dependency(self):
        # a -> b -> c -> a forms a cycle; rejected at flow build time.
        with self.assertRaisesRegexp(exceptions.DependencyFailure, '^No path'):
            gf.Flow('g-3-cyclic').add(
                TestTask([], name='task1', provides='a', requires=['b']),
                TestTask([], name='task2', provides='b', requires=['c']),
                TestTask([], name='task3', provides='c', requires=['a']))

    def test_graph_two_tasks_returns_same_value(self):
        # Two producers of the same symbol are disallowed.
        with self.assertRaisesRegexp(exceptions.DependencyFailure,
                                     "task2 provides a but is already being"
                                     " provided by task1 and duplicate"
                                     " producers are disallowed"):
            gf.Flow('g-2-same-value').add(
                TestTask([], name='task1', provides='a'),
                TestTask([], name='task2', provides='a'))

    def test_graph_flow_four_tasks_revert(self):
        # task3 fails after task1 and task2 have run; the completed
        # tasks are reverted in reverse dependency order.
        flow = gf.Flow('g-4-failing').add(
            TestTask(self.values, name='task4', provides='d', requires=['c']),
            TestTask(self.values, name='task2', provides='b', requires=['a']),
            FailingTask(self.values, name='task3',
                        provides='c', requires=['b']),
            TestTask(self.values, name='task1', provides='a'))
        engine = self._make_engine(flow)
        with self.assertRaisesRegexp(RuntimeError, '^Woot'):
            engine.run()
        self.assertEquals(self.values,
                          ['task1', 'task2',
                           'task3 reverted(Failure: RuntimeError: Woot!)',
                           'task2 reverted(5)', 'task1 reverted(5)'])

    def test_graph_flow_four_tasks_revert_failure(self):
        # NastyTask raises during revert; that error is what surfaces
        # from the run.
        flow = gf.Flow('g-3-nasty').add(
            NastyTask(name='task2', provides='b', requires=['a']),
            FailingTask(self.values, name='task3', requires=['b']),
            TestTask(self.values, name='task1', provides='a'))
        engine = self._make_engine(flow)
        with self.assertRaisesRegexp(RuntimeError, '^Gotcha'):
            engine.run()

    def test_graph_flow_with_multireturn_and_multiargs_tasks(self):
        # Multi-value producers and consumers wire up through rebind.
        flow = gf.Flow('g-3-multi').add(
            MultiargsTask(name='task1', rebind=['a', 'b', 'y'], provides='z'),
            MultiReturnTask(name='task2', provides=['a', 'b', 'c']),
            MultiargsTask(name='task3', rebind=['c', 'b', 'x'], provides='y'))
        engine = self._make_engine(flow)
        engine.storage.inject({'x': 30})
        engine.run()
        self.assertEquals(engine.storage.fetch_all(), {
            'a': 12,
            'b': 2,
            'c': 1,
            'x': 30,
            'y': 33,
            'z': 47
        })

    def test_one_task_provides_and_requires_same_data(self):
        # A self-dependency is a one-node cycle.
        with self.assertRaisesRegexp(exceptions.DependencyFailure, '^No path'):
            gf.Flow('g-1-req-error').add(
                TestTask([], name='task1', requires=['a'], provides='a'))
class SingleThreadedEngineTest(EngineTaskTest,
EngineLinearFlowTest,
EngineParallelFlowTest,
EngineGraphFlowTest,
test.TestCase):
def _make_engine(self, flow, flow_detail=None):
if flow_detail is None:
@@ -450,6 +557,7 @@ class SingleThreadedEngineTest(EngineTaskTest,
class MultiThreadedEngineTest(EngineTaskTest,
EngineLinearFlowTest,
EngineParallelFlowTest,
EngineGraphFlowTest,
test.TestCase):
def _make_engine(self, flow, flow_detail=None, executor=None):
if flow_detail is None: