diff --git a/taskflow/examples/buildsystem.py b/taskflow/examples/buildsystem.py
new file mode 100644
index 00000000..d717056d
--- /dev/null
+++ b/taskflow/examples/buildsystem.py
@@ -0,0 +1,122 @@
+# -*- coding: utf-8 -*-
+
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+#    Copyright (C) 2012-2013 Yahoo! Inc. All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import logging
+import os
+import sys
+
+logging.basicConfig(level=logging.ERROR)
+
+top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
+                                       os.pardir,
+                                       os.pardir))
+sys.path.insert(0, top_dir)
+
+import taskflow.engines
+from taskflow.patterns import graph_flow as gf
+from taskflow import task
+
+
+# In this example we demonstrate the use of TargetedFlow to make an
+# oversimplified build system. It pretends to compile all sources to object
+# files and link them into an executable. It can also build docs, but this
+# can be "switched off" via the targeted flow's special power -- the ability
+# to ignore all tasks not needed by its target.
+
+
+class CompileTask(task.Task):
+    """Pretends to take a source file and make an object file."""
+    default_provides = 'object_filename'
+
+    def execute(self, source_filename):
+        """Prints a fake compile step and returns the object file name."""
+        object_filename = '%s.o' % os.path.splitext(source_filename)[0]
+        print('Compiling %s into %s'
+              % (source_filename, object_filename))
+        return object_filename
+
+
+class LinkTask(task.Task):
+    """Pretends to link an executable from several object files."""
+    default_provides = 'executable'
+
+    def __init__(self, executable_path, *args, **kwargs):
+        super(LinkTask, self).__init__(*args, **kwargs)
+        self._executable_path = executable_path
+
+    def execute(self, **kwargs):
+        """Prints a fake link step and returns the executable path."""
+        object_filenames = list(kwargs.values())
+        print('Linking executable %s from files %s'
+              % (self._executable_path,
+                 ', '.join(object_filenames)))
+        return self._executable_path
+
+
+class BuildDocsTask(task.Task):
+    """Pretends to build docs from sources."""
+    default_provides = 'docs'
+
+    def execute(self, **kwargs):
+        """Prints a fake docs build step per source and returns 'docs'."""
+        for source_filename in kwargs.values():
+            print("Building docs for %s" % source_filename)
+        return 'docs'
+
+
+def make_flow_and_store(source_files, executable_only=False):
+    """Builds the example flow together with its initial store.
+
+    A CompileTask is added for every entry of ``source_files``, followed by
+    one BuildDocsTask and one LinkTask. When ``executable_only`` is true the
+    LinkTask is set as the flow target.
+
+    Returns a ``(flow, store)`` tuple.
+    """
+    flow = gf.TargetedFlow('build flow')
+    object_targets = []
+    store = {}
+    for source in source_files:
+        source_stored = '%s-source' % source
+        object_stored = '%s-object' % source
+        store[source_stored] = source
+        object_targets.append(object_stored)
+        flow.add(CompileTask(name='compile-%s' % source,
+                             rebind={'source_filename': source_stored},
+                             provides=object_stored))
+    flow.add(BuildDocsTask(requires=list(store.keys())))
+
+    # Uncomment the next line to see the executable_only switch broken:
+    # linking then requires 'docs', so docs are built for the target too.
+    # object_targets.append('docs')
+    link_task = LinkTask('build/executable', requires=object_targets)
+    flow.add(link_task)
+    if executable_only:
+        flow.set_target(link_task)
+    return flow, store
+
+
+SOURCE_FILES = ['first.c', 'second.cpp', 'main.cpp']
+
+print('Running all tasks:')
+flow, store = make_flow_and_store(SOURCE_FILES)
+taskflow.engines.run(flow, store=store)
+
+print('\nBuilding executable, no docs:')
+flow, store = make_flow_and_store(SOURCE_FILES, executable_only=True)
+taskflow.engines.run(flow, store=store)
diff --git a/taskflow/patterns/graph_flow.py b/taskflow/patterns/graph_flow.py
index a9f76ab2..db56ec6d 100644
--- a/taskflow/patterns/graph_flow.py
+++ b/taskflow/patterns/graph_flow.py
@@ -19,6 +19,7 @@
 import collections
 
 import networkx as nx
+from networkx.algorithms import traversal
 
 from taskflow import exceptions as exc
 from taskflow import flow
@@ -32,6 +33,9 @@ class Flow(flow.Flow):
     which will be resolved by using the *flows/tasks* provides and requires
     mappings or by following manually created dependency links.
 
+    A directed graph is built from the dependencies. If it has an edge
+    A -> B, this means B depends on A.
+
     Note: Cyclic dependencies are not allowed.
     """
 
@@ -138,10 +142,10 @@ class Flow(flow.Flow):
         return self
 
     def __len__(self):
-        return self._graph.number_of_nodes()
+        return self.graph.number_of_nodes()
 
     def __iter__(self):
-        for n in self._graph.nodes_iter():
+        for n in self.graph.nodes_iter():
             yield n
 
     @property
@@ -161,3 +165,72 @@ class Flow(flow.Flow):
     @property
     def graph(self):
         return self._graph
+
+
+class TargetedFlow(Flow):
+    """Graph flow with a target.
+
+    Adds the possibility to execute a flow up to a certain graph node
+    (task or subflow).
+    """
+
+    def __init__(self, *args, **kwargs):
+        super(TargetedFlow, self).__init__(*args, **kwargs)
+        self._subgraph = None
+        self._target = None
+
+    def set_target(self, target_item):
+        """Set target for the flow.
+
+        Any items (tasks or subflows) not needed for the target
+        item will not be executed.
+
+        Raises ValueError if target_item is not a node of this flow.
+        """
+        if not self._graph.has_node(target_item):
+            raise ValueError('Item %s not found' % target_item)
+        self._target = target_item
+        self._subgraph = None
+
+    def reset_target(self):
+        """Reset target for the flow.
+
+        All items of the flow will be executed.
+        """
+
+        self._target = None
+        self._subgraph = None
+
+    def add(self, *items):
+        """Adds a given task/tasks/flow/flows to this flow."""
+        result = super(TargetedFlow, self).add(*items)
+        # reset cached subgraph, in case it was affected
+        self._subgraph = None
+        return result
+
+    def link(self, u, v):
+        """Link existing node u as a runtime dependency of existing node v."""
+        result = super(TargetedFlow, self).link(u, v)
+        # reset cached subgraph, in case it was affected
+        self._subgraph = None
+        return result
+
+    @property
+    def graph(self):
+        """Graph to execute: the whole graph, or the target's subgraph.
+
+        When a target is set only the target and the nodes it (transitively)
+        depends on are included; the result is frozen and cached until the
+        flow or its target changes.
+        """
+        if self._subgraph is not None:
+            return self._subgraph
+        if self._target is None:
+            return self._graph
+        # Edge A -> B means B depends on A, so walking the reversed graph
+        # from the target visits everything the target needs.
+        nodes = [self._target]
+        nodes.extend(dst for _src, dst in
+                     traversal.dfs_edges(self._graph.reverse(), self._target))
+        self._subgraph = nx.freeze(self._graph.subgraph(nodes))
+        return self._subgraph
diff --git a/taskflow/tests/unit/test_graph_flow.py b/taskflow/tests/unit/test_graph_flow.py
index 2c9cf1c1..319330b8 100644
--- a/taskflow/tests/unit/test_graph_flow.py
+++ b/taskflow/tests/unit/test_graph_flow.py
@@ -114,3 +114,97 @@ class GraphFlowTest(test.TestCase):
         edge_attrs = gu.get_edge_attrs(g, test_1, test_2)
         self.assertTrue(edge_attrs.get('manual'))
         self.assertTrue(edge_attrs.get('flatten'))
+
+
+class TargetedGraphFlowTest(test.TestCase):
+
+    def test_targeted_flow(self):
+        wf = gw.TargetedFlow("test")
+        test_1 = utils.ProvidesRequiresTask('test-1',
+                                            provides=['a'], requires=[])
+        test_2 = utils.ProvidesRequiresTask('test-2',
+                                            provides=['b'], requires=['a'])
+        test_3 = utils.ProvidesRequiresTask('test-3',
+                                            provides=[], requires=['b'])
+        test_4 = utils.ProvidesRequiresTask('test-4',
+                                            provides=['c'], requires=['b'])
+        wf.add(test_1, test_2, test_3, test_4)
+        wf.set_target(test_3)
+        g = fu.flatten(wf)
+        self.assertEqual(3, len(g))
+        self.assertFalse(g.has_node(test_4))
+        self.assertFalse('c' in wf.provides)
+
+    def test_targeted_flow_reset(self):
+        wf = gw.TargetedFlow("test")
+        test_1 = utils.ProvidesRequiresTask('test-1',
+                                            provides=['a'], requires=[])
+        test_2 = utils.ProvidesRequiresTask('test-2',
+                                            provides=['b'], requires=['a'])
+        test_3 = utils.ProvidesRequiresTask('test-3',
+                                            provides=[], requires=['b'])
+        test_4 = utils.ProvidesRequiresTask('test-4',
+                                            provides=['c'], requires=['b'])
+        wf.add(test_1, test_2, test_3, test_4)
+        wf.set_target(test_3)
+        wf.reset_target()
+        g = fu.flatten(wf)
+        self.assertEqual(4, len(g))
+        self.assertTrue(g.has_node(test_4))
+
+    def test_targeted_flow_bad_target(self):
+        wf = gw.TargetedFlow("test")
+        test_1 = utils.ProvidesRequiresTask('test-1',
+                                            provides=['a'], requires=[])
+        test_2 = utils.ProvidesRequiresTask('test-2',
+                                            provides=['b'], requires=['a'])
+        wf.add(test_1)
+        self.assertRaisesRegexp(ValueError, '^Item .* not found',
+                                wf.set_target, test_2)
+
+    def test_targeted_flow_one_node(self):
+        wf = gw.TargetedFlow("test")
+        test_1 = utils.ProvidesRequiresTask('test-1',
+                                            provides=['a'], requires=[])
+        wf.add(test_1)
+        wf.set_target(test_1)
+        g = fu.flatten(wf)
+        self.assertEqual(1, len(g))
+        self.assertTrue(g.has_node(test_1))
+
+    def test_recache_on_add(self):
+        wf = gw.TargetedFlow("test")
+        test_1 = utils.ProvidesRequiresTask('test-1',
+                                            provides=[], requires=['a'])
+        wf.add(test_1)
+        wf.set_target(test_1)
+        self.assertEqual(1, len(wf.graph))
+        test_2 = utils.ProvidesRequiresTask('test-2',
+                                            provides=['a'], requires=[])
+        wf.add(test_2)
+        self.assertEqual(2, len(wf.graph))
+
+    def test_recache_on_add_no_deps(self):
+        wf = gw.TargetedFlow("test")
+        test_1 = utils.ProvidesRequiresTask('test-1',
+                                            provides=[], requires=[])
+        wf.add(test_1)
+        wf.set_target(test_1)
+        self.assertEqual(1, len(wf.graph))
+        test_2 = utils.ProvidesRequiresTask('test-2',
+                                            provides=[], requires=[])
+        wf.add(test_2)
+        self.assertEqual(1, len(wf.graph))
+
+    def test_recache_on_link(self):
+        wf = gw.TargetedFlow("test")
+        test_1 = utils.ProvidesRequiresTask('test-1',
+                                            provides=[], requires=[])
+        test_2 = utils.ProvidesRequiresTask('test-2',
+                                            provides=[], requires=[])
+        wf.add(test_1, test_2)
+        wf.set_target(test_1)
+        self.assertEqual(1, len(wf.graph))
+        wf.link(test_2, test_1)
+        self.assertEqual(2, len(wf.graph))
+        self.assertEqual([(test_2, test_1)], list(wf.graph.edges()))