From db556f9f6721dfa21786fb242408ae2a2041ae3b Mon Sep 17 00:00:00 2001 From: "Ivan A. Melnikov" Date: Mon, 2 Dec 2013 19:00:51 +0400 Subject: [PATCH] Targeted graph flow pattern This change adds a graph-flow-like pattern that is able to ignore graph nodes (not even add them to the flow) if they are not needed to run a certain task. Implements blueprint targeted-graph-flow Change-Id: I57d5a1c0598bd032d77e2a262b2d9644418ce3f6 --- taskflow/examples/buildsystem.py | 110 +++++++++++++++++++++++++ taskflow/patterns/graph_flow.py | 65 ++++++++++++++- taskflow/tests/unit/test_graph_flow.py | 94 +++++++++++++++++++++ 3 files changed, 267 insertions(+), 2 deletions(-) create mode 100644 taskflow/examples/buildsystem.py diff --git a/taskflow/examples/buildsystem.py b/taskflow/examples/buildsystem.py new file mode 100644 index 00000000..d717056d --- /dev/null +++ b/taskflow/examples/buildsystem.py @@ -0,0 +1,110 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2012-2013 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+ +import logging +import os +import sys + +logging.basicConfig(level=logging.ERROR) + +top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), + os.pardir, + os.pardir)) +sys.path.insert(0, top_dir) + +import taskflow.engines +from taskflow.patterns import graph_flow as gf +from taskflow import task + + +# In this example we demonstrate use of TargetedFlow to make oversimplified +# build system. It pretends to compile all sources to object files and +# link them into an executable. It also can build docs, but this can be +# "switched off" via targeted flow special power -- ability to ignore +# all tasks not needed by its target. + + +class CompileTask(task.Task): + """Pretends to take a source and make object file.""" + default_provides = 'object_filename' + + def execute(self, source_filename): + object_filename = '%s.o' % os.path.splitext(source_filename)[0] + print('Compiling %s into %s' + % (source_filename, object_filename)) + return object_filename + + +class LinkTask(task.Task): + """Pretends to link executable form several object files.""" + default_provides = 'executable' + + def __init__(self, executable_path, *args, **kwargs): + super(LinkTask, self).__init__(*args, **kwargs) + self._executable_path = executable_path + + def execute(self, **kwargs): + object_filenames = list(kwargs.values()) + print('Linking executable %s from files %s' + % (self._executable_path, + ', '.join(object_filenames))) + return self._executable_path + + +class BuildDocsTask(task.Task): + """Pretends to build docs from sources.""" + default_provides = 'docs' + + def execute(self, **kwargs): + for source_filename in kwargs.values(): + print("Building docs for %s" % source_filename) + return 'docs' + + +def make_flow_and_store(source_files, executable_only=False): + flow = gf.TargetedFlow('build flow') + object_targets = [] + store = {} + for source in source_files: + source_stored = '%s-source' % source + object_stored = '%s-object' % source + store[source_stored] = 
source + object_targets.append(object_stored) + flow.add(CompileTask(name='compile-%s' % source, + rebind={'source_filename': source_stored}, + provides=object_stored)) + flow.add(BuildDocsTask(requires=list(store.keys()))) + + # Try this to see executable_only switch broken: + object_targets.append('docs') + link_task = LinkTask('build/executable', requires=object_targets) + flow.add(link_task) + if executable_only: + flow.set_target(link_task) + return flow, store + + +SOURCE_FILES = ['first.c', 'second.cpp', 'main.cpp'] + +print('Running all tasks:') +flow, store = make_flow_and_store(SOURCE_FILES) +taskflow.engines.run(flow, store=store) + +print('\nBuilding executable, no docs:') +flow, store = make_flow_and_store(SOURCE_FILES, executable_only=True) +taskflow.engines.run(flow, store=store) diff --git a/taskflow/patterns/graph_flow.py b/taskflow/patterns/graph_flow.py index a9f76ab2..db56ec6d 100644 --- a/taskflow/patterns/graph_flow.py +++ b/taskflow/patterns/graph_flow.py @@ -19,6 +19,7 @@ import collections import networkx as nx +from networkx.algorithms import traversal from taskflow import exceptions as exc from taskflow import flow @@ -32,6 +33,9 @@ class Flow(flow.Flow): which will be resolved by using the *flows/tasks* provides and requires mappings or by following manually created dependency links. + From dependencies directed graph is build. If it has edge A -> B, this + means B depends on A. + Note: Cyclic dependencies are not allowed. """ @@ -138,10 +142,10 @@ class Flow(flow.Flow): return self def __len__(self): - return self._graph.number_of_nodes() + return self.graph.number_of_nodes() def __iter__(self): - for n in self._graph.nodes_iter(): + for n in self.graph.nodes_iter(): yield n @property @@ -161,3 +165,60 @@ class Flow(flow.Flow): @property def graph(self): return self._graph + + +class TargetedFlow(Flow): + """Graph flow with a target. + + Adds possibility to execute a flow up to certain graph node + (task or subflow). 
+ """ + + def __init__(self, *args, **kwargs): + super(TargetedFlow, self).__init__(*args, **kwargs) + self._subgraph = None + self._target = None + + def set_target(self, target_item): + """Set target for the flow. + + Any items (tasks or subflows) not needed for the target + item will not be executed. + """ + if not self._graph.has_node(target_item): + raise ValueError('Item %s not found' % target_item) + self._target = target_item + self._subgraph = None + + def reset_target(self): + """Reset target for the flow. + + All items of the flow will be executed. + """ + + self._target = None + self._subgraph = None + + def add(self, *items): + """Adds a given task/tasks/flow/flows to this flow.""" + super(TargetedFlow, self).add(*items) + # reset cached subgraph, in case it was affected + self._subgraph = None + + def link(self, u, v): + """Link existing node u as a runtime dependency of existing node v.""" + super(TargetedFlow, self).link(u, v) + # reset cached subgraph, in case it was affected + self._subgraph = None + + @property + def graph(self): + if self._subgraph is not None: + return self._subgraph + if self._target is None: + return self._graph + nodes = [self._target] + nodes.extend(dst for _src, dst in + traversal.dfs_edges(self._graph.reverse(), self._target)) + self._subgraph = nx.freeze(self._graph.subgraph(nodes)) + return self._subgraph diff --git a/taskflow/tests/unit/test_graph_flow.py b/taskflow/tests/unit/test_graph_flow.py index 2c9cf1c1..319330b8 100644 --- a/taskflow/tests/unit/test_graph_flow.py +++ b/taskflow/tests/unit/test_graph_flow.py @@ -114,3 +114,97 @@ class GraphFlowTest(test.TestCase): edge_attrs = gu.get_edge_attrs(g, test_1, test_2) self.assertTrue(edge_attrs.get('manual')) self.assertTrue(edge_attrs.get('flatten')) + + +class TargetedGraphFlowTest(test.TestCase): + + def test_targeted_flow(self): + wf = gw.TargetedFlow("test") + test_1 = utils.ProvidesRequiresTask('test-1', + provides=['a'], requires=[]) + test_2 = 
utils.ProvidesRequiresTask('test-2', + provides=['b'], requires=['a']) + test_3 = utils.ProvidesRequiresTask('test-3', + provides=[], requires=['b']) + test_4 = utils.ProvidesRequiresTask('test-4', + provides=[], requires=['b']) + wf.add(test_1, test_2, test_3, test_4) + wf.set_target(test_3) + g = fu.flatten(wf) + self.assertEqual(3, len(g)) + self.assertFalse(g.has_node(test_4)) + self.assertFalse('c' in wf.provides) + + def test_targeted_flow_reset(self): + wf = gw.TargetedFlow("test") + test_1 = utils.ProvidesRequiresTask('test-1', + provides=['a'], requires=[]) + test_2 = utils.ProvidesRequiresTask('test-2', + provides=['b'], requires=['a']) + test_3 = utils.ProvidesRequiresTask('test-3', + provides=[], requires=['b']) + test_4 = utils.ProvidesRequiresTask('test-4', + provides=['c'], requires=['b']) + wf.add(test_1, test_2, test_3, test_4) + wf.set_target(test_3) + wf.reset_target() + g = fu.flatten(wf) + self.assertEqual(4, len(g)) + self.assertTrue(g.has_node(test_4)) + + def test_targeted_flow_bad_target(self): + wf = gw.TargetedFlow("test") + test_1 = utils.ProvidesRequiresTask('test-1', + provides=['a'], requires=[]) + test_2 = utils.ProvidesRequiresTask('test-2', + provides=['b'], requires=['a']) + wf.add(test_1) + self.assertRaisesRegexp(ValueError, '^Item .* not found', + wf.set_target, test_2) + + def test_targeted_flow_one_node(self): + wf = gw.TargetedFlow("test") + test_1 = utils.ProvidesRequiresTask('test-1', + provides=['a'], requires=[]) + wf.add(test_1) + wf.set_target(test_1) + g = fu.flatten(wf) + self.assertEqual(1, len(g)) + self.assertTrue(g.has_node(test_1)) + + def test_recache_on_add(self): + wf = gw.TargetedFlow("test") + test_1 = utils.ProvidesRequiresTask('test-1', + provides=[], requires=['a']) + wf.add(test_1) + wf.set_target(test_1) + self.assertEqual(1, len(wf.graph)) + test_2 = utils.ProvidesRequiresTask('test-2', + provides=['a'], requires=[]) + wf.add(test_2) + self.assertEqual(2, len(wf.graph)) + + def 
test_recache_on_add_no_deps(self): + wf = gw.TargetedFlow("test") + test_1 = utils.ProvidesRequiresTask('test-1', + provides=[], requires=[]) + wf.add(test_1) + wf.set_target(test_1) + self.assertEqual(1, len(wf.graph)) + test_2 = utils.ProvidesRequiresTask('test-2', + provides=[], requires=[]) + wf.add(test_2) + self.assertEqual(1, len(wf.graph)) + + def test_recache_on_link(self): + wf = gw.TargetedFlow("test") + test_1 = utils.ProvidesRequiresTask('test-1', + provides=[], requires=[]) + test_2 = utils.ProvidesRequiresTask('test-2', + provides=[], requires=[]) + wf.add(test_1, test_2) + wf.set_target(test_1) + self.assertEqual(1, len(wf.graph)) + wf.link(test_2, test_1) + self.assertEqual(2, len(wf.graph)) + self.assertEqual([(test_2, test_1)], list(wf.graph.edges()))