Targeted graph flow pattern

This change adds graph flow like pattern that is able to ignore graph
nodes (not even add them to flow) if they are not needed to run certain
task.

Implements blueprint targeted-graph-flow
Change-Id: I57d5a1c0598bd032d77e2a262b2d9644418ce3f6
This commit is contained in:
Ivan A. Melnikov
2013-12-02 19:00:51 +04:00
parent 06bd21f8c2
commit db556f9f67
3 changed files with 267 additions and 2 deletions

View File

@@ -0,0 +1,110 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012-2013 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import os
import sys
logging.basicConfig(level=logging.ERROR)
top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
os.pardir,
os.pardir))
sys.path.insert(0, top_dir)
import taskflow.engines
from taskflow.patterns import graph_flow as gf
from taskflow import task
# In this example we demonstrate use of TargetedFlow to make oversimplified
# build system. It pretends to compile all sources to object files and
# link them into an executable. It also can build docs, but this can be
# "switched off" via targeted flow special power -- ability to ignore
# all tasks not needed by its target.
class CompileTask(task.Task):
"""Pretends to take a source and make object file."""
default_provides = 'object_filename'
def execute(self, source_filename):
object_filename = '%s.o' % os.path.splitext(source_filename)[0]
print('Compiling %s into %s'
% (source_filename, object_filename))
return object_filename
class LinkTask(task.Task):
"""Pretends to link executable form several object files."""
default_provides = 'executable'
def __init__(self, executable_path, *args, **kwargs):
super(LinkTask, self).__init__(*args, **kwargs)
self._executable_path = executable_path
def execute(self, **kwargs):
object_filenames = list(kwargs.values())
print('Linking executable %s from files %s'
% (self._executable_path,
', '.join(object_filenames)))
return self._executable_path
class BuildDocsTask(task.Task):
"""Pretends to build docs from sources."""
default_provides = 'docs'
def execute(self, **kwargs):
for source_filename in kwargs.values():
print("Building docs for %s" % source_filename)
return 'docs'
def make_flow_and_store(source_files, executable_only=False):
flow = gf.TargetedFlow('build flow')
object_targets = []
store = {}
for source in source_files:
source_stored = '%s-source' % source
object_stored = '%s-object' % source
store[source_stored] = source
object_targets.append(object_stored)
flow.add(CompileTask(name='compile-%s' % source,
rebind={'source_filename': source_stored},
provides=object_stored))
flow.add(BuildDocsTask(requires=list(store.keys())))
# Try this to see executable_only switch broken:
object_targets.append('docs')
link_task = LinkTask('build/executable', requires=object_targets)
flow.add(link_task)
if executable_only:
flow.set_target(link_task)
return flow, store
SOURCE_FILES = ['first.c', 'second.cpp', 'main.cpp']
print('Running all tasks:')
flow, store = make_flow_and_store(SOURCE_FILES)
taskflow.engines.run(flow, store=store)
print('\nBuilding executable, no docs:')
flow, store = make_flow_and_store(SOURCE_FILES, executable_only=True)
taskflow.engines.run(flow, store=store)

View File

@@ -19,6 +19,7 @@
import collections
import networkx as nx
from networkx.algorithms import traversal
from taskflow import exceptions as exc
from taskflow import flow
@@ -32,6 +33,9 @@ class Flow(flow.Flow):
which will be resolved by using the *flows/tasks* provides and requires
mappings or by following manually created dependency links.
From dependencies directed graph is build. If it has edge A -> B, this
means B depends on A.
Note: Cyclic dependencies are not allowed.
"""
@@ -138,10 +142,10 @@ class Flow(flow.Flow):
return self
def __len__(self):
return self._graph.number_of_nodes()
return self.graph.number_of_nodes()
def __iter__(self):
for n in self._graph.nodes_iter():
for n in self.graph.nodes_iter():
yield n
@property
@@ -161,3 +165,60 @@ class Flow(flow.Flow):
@property
def graph(self):
return self._graph
class TargetedFlow(Flow):
"""Graph flow with a target.
Adds possibility to execute a flow up to certain graph node
(task or subflow).
"""
def __init__(self, *args, **kwargs):
super(TargetedFlow, self).__init__(*args, **kwargs)
self._subgraph = None
self._target = None
def set_target(self, target_item):
"""Set target for the flow.
Any items (tasks or subflows) not needed for the target
item will not be executed.
"""
if not self._graph.has_node(target_item):
raise ValueError('Item %s not found' % target_item)
self._target = target_item
self._subgraph = None
def reset_target(self):
"""Reset target for the flow.
All items of the flow will be executed.
"""
self._target = None
self._subgraph = None
def add(self, *items):
"""Adds a given task/tasks/flow/flows to this flow."""
super(TargetedFlow, self).add(*items)
# reset cached subgraph, in case it was affected
self._subgraph = None
def link(self, u, v):
"""Link existing node u as a runtime dependency of existing node v."""
super(TargetedFlow, self).link(u, v)
# reset cached subgraph, in case it was affected
self._subgraph = None
@property
def graph(self):
if self._subgraph is not None:
return self._subgraph
if self._target is None:
return self._graph
nodes = [self._target]
nodes.extend(dst for _src, dst in
traversal.dfs_edges(self._graph.reverse(), self._target))
self._subgraph = nx.freeze(self._graph.subgraph(nodes))
return self._subgraph

View File

@@ -114,3 +114,97 @@ class GraphFlowTest(test.TestCase):
edge_attrs = gu.get_edge_attrs(g, test_1, test_2)
self.assertTrue(edge_attrs.get('manual'))
self.assertTrue(edge_attrs.get('flatten'))
class TargetedGraphFlowTest(test.TestCase):
def test_targeted_flow(self):
wf = gw.TargetedFlow("test")
test_1 = utils.ProvidesRequiresTask('test-1',
provides=['a'], requires=[])
test_2 = utils.ProvidesRequiresTask('test-2',
provides=['b'], requires=['a'])
test_3 = utils.ProvidesRequiresTask('test-3',
provides=[], requires=['b'])
test_4 = utils.ProvidesRequiresTask('test-4',
provides=[], requires=['b'])
wf.add(test_1, test_2, test_3, test_4)
wf.set_target(test_3)
g = fu.flatten(wf)
self.assertEqual(3, len(g))
self.assertFalse(g.has_node(test_4))
self.assertFalse('c' in wf.provides)
def test_targeted_flow_reset(self):
wf = gw.TargetedFlow("test")
test_1 = utils.ProvidesRequiresTask('test-1',
provides=['a'], requires=[])
test_2 = utils.ProvidesRequiresTask('test-2',
provides=['b'], requires=['a'])
test_3 = utils.ProvidesRequiresTask('test-3',
provides=[], requires=['b'])
test_4 = utils.ProvidesRequiresTask('test-4',
provides=['c'], requires=['b'])
wf.add(test_1, test_2, test_3, test_4)
wf.set_target(test_3)
wf.reset_target()
g = fu.flatten(wf)
self.assertEqual(4, len(g))
self.assertTrue(g.has_node(test_4))
def test_targeted_flow_bad_target(self):
wf = gw.TargetedFlow("test")
test_1 = utils.ProvidesRequiresTask('test-1',
provides=['a'], requires=[])
test_2 = utils.ProvidesRequiresTask('test-2',
provides=['b'], requires=['a'])
wf.add(test_1)
self.assertRaisesRegexp(ValueError, '^Item .* not found',
wf.set_target, test_2)
def test_targeted_flow_one_node(self):
wf = gw.TargetedFlow("test")
test_1 = utils.ProvidesRequiresTask('test-1',
provides=['a'], requires=[])
wf.add(test_1)
wf.set_target(test_1)
g = fu.flatten(wf)
self.assertEqual(1, len(g))
self.assertTrue(g.has_node(test_1))
def test_recache_on_add(self):
wf = gw.TargetedFlow("test")
test_1 = utils.ProvidesRequiresTask('test-1',
provides=[], requires=['a'])
wf.add(test_1)
wf.set_target(test_1)
self.assertEqual(1, len(wf.graph))
test_2 = utils.ProvidesRequiresTask('test-2',
provides=['a'], requires=[])
wf.add(test_2)
self.assertEqual(2, len(wf.graph))
def test_recache_on_add_no_deps(self):
wf = gw.TargetedFlow("test")
test_1 = utils.ProvidesRequiresTask('test-1',
provides=[], requires=[])
wf.add(test_1)
wf.set_target(test_1)
self.assertEqual(1, len(wf.graph))
test_2 = utils.ProvidesRequiresTask('test-2',
provides=[], requires=[])
wf.add(test_2)
self.assertEqual(1, len(wf.graph))
def test_recache_on_link(self):
wf = gw.TargetedFlow("test")
test_1 = utils.ProvidesRequiresTask('test-1',
provides=[], requires=[])
test_2 = utils.ProvidesRequiresTask('test-2',
provides=[], requires=[])
wf.add(test_1, test_2)
wf.set_target(test_1)
self.assertEqual(1, len(wf.graph))
wf.link(test_2, test_1)
self.assertEqual(2, len(wf.graph))
self.assertEqual([(test_2, test_1)], list(wf.graph.edges()))