From 261d69a75915f869b2e7bf351c47f202c725837c Mon Sep 17 00:00:00 2001 From: "Ivan A. Melnikov" Date: Fri, 21 Mar 2014 17:07:03 +0400 Subject: [PATCH] Rework graph flow unit tests This commit adds unit tests that check graph flow methods without executing or flattening it. Now-redundant tests from other test suites are deleted. Change-Id: I8dafe0f9b295428831eddb3f9fd48f042d2f1ffc --- taskflow/patterns/graph_flow.py | 16 +- .../tests/unit/patterns/test_graph_flow.py | 259 ++++++++++++++++++ taskflow/tests/unit/test_flow_dependencies.py | 7 - taskflow/tests/unit/test_graph_flow.py | 215 --------------- 4 files changed, 264 insertions(+), 233 deletions(-) create mode 100644 taskflow/tests/unit/patterns/test_graph_flow.py delete mode 100644 taskflow/tests/unit/test_graph_flow.py diff --git a/taskflow/patterns/graph_flow.py b/taskflow/patterns/graph_flow.py index a835e0b3..decf3562 100644 --- a/taskflow/patterns/graph_flow.py +++ b/taskflow/patterns/graph_flow.py @@ -41,16 +41,6 @@ class Flow(flow.Flow): super(Flow, self).__init__(name, retry) self._graph = nx.freeze(nx.DiGraph()) - def _validate(self, graph=None): - if graph is None: - graph = self._graph - # Ensure that there is a valid topological ordering. - if not nx.is_directed_acyclic_graph(graph): - raise exc.DependencyFailure("No path through the items in the" - " graph produces an ordering that" - " will allow for correct dependency" - " resolution") - def link(self, u, v): """Link existing node u as a runtime dependency of existing node v.""" if not self._graph.has_node(u): @@ -86,7 +76,11 @@ class Flow(flow.Flow): with a frozen version of the replacement graph (this maintains the invariant that the underlying graph is immutable). """ - self._validate(replacement_graph) + if not nx.is_directed_acyclic_graph(replacement_graph): + raise exc.DependencyFailure("No path through the items in the" + " graph produces an ordering that" + " will allow for correct dependency" + " resolution") self._graph = nx.freeze(replacement_graph) def add(self, *items): diff --git a/taskflow/tests/unit/patterns/test_graph_flow.py b/taskflow/tests/unit/patterns/test_graph_flow.py new file mode 100644 index 00000000..2a95ad2d --- /dev/null +++ b/taskflow/tests/unit/patterns/test_graph_flow.py @@ -0,0 +1,259 @@ +# -*- coding: utf-8 -*- + +# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from taskflow import exceptions as exc +from taskflow.patterns import graph_flow as gf +from taskflow import retry + +from taskflow import test +from taskflow.tests import utils + + +def _task(name, provides=None, requires=None): + return utils.ProvidesRequiresTask(name, provides, requires) + + +class GraphFlowTest(test.TestCase): + + def test_graph_flow_starts_as_empty(self): + f = gf.Flow('test') + + self.assertEqual(len(f), 0) + self.assertEqual(list(f), []) + self.assertEqual(list(f.iter_links()), []) + + self.assertEqual(f.requires, set()) + self.assertEqual(f.provides, set()) + + expected = 'taskflow.patterns.graph_flow.Flow: test; 0' + self.assertEqual(str(f), expected) + + def test_graph_flow_add_nothing(self): + f = gf.Flow('test') + result = f.add() + self.assertIs(f, result) + self.assertEqual(len(f), 0) + + def test_graph_flow_one_task(self): + f = gf.Flow('test') + task = _task(name='task1', requires=['a', 'b'], provides=['c', 'd']) + result = f.add(task) + + self.assertIs(f, result) + + self.assertEqual(len(f), 1) + self.assertEqual(list(f), [task]) + self.assertEqual(list(f.iter_links()), []) + self.assertEqual(f.requires, set(['a', 'b'])) + self.assertEqual(f.provides, set(['c', 'd'])) + + def test_graph_flow_two_independent_tasks(self): + task1 = _task(name='task1') + task2 = _task(name='task2') + f = gf.Flow('test').add(task1, task2) + + self.assertEqual(len(f), 2) + self.assertItemsEqual(f, [task1, task2]) + self.assertEqual(list(f.iter_links()), []) + + def test_graph_flow_two_dependent_tasks(self): + task1 = _task(name='task1', provides=['a']) + task2 = _task(name='task2', requires=['a']) + f = gf.Flow('test').add(task1, task2) + + self.assertEqual(len(f), 2) + self.assertItemsEqual(f, [task1, task2]) + self.assertEqual(list(f.iter_links()), [ + (task1, task2, {'reasons': set(['a'])}) + ]) + + self.assertEqual(f.requires, set()) + self.assertEqual(f.provides, set(['a'])) + + def test_graph_flow_two_dependent_tasks_two_different_calls(self): + task1 = _task(name='task1', provides=['a']) + task2 = _task(name='task2', requires=['a']) + f = gf.Flow('test').add(task1).add(task2) + + self.assertEqual(len(f), 2) + self.assertItemsEqual(f, [task1, task2]) + self.assertEqual(list(f.iter_links()), [ + (task1, task2, {'reasons': set(['a'])}) + ]) + + def test_graph_flow_two_task_same_provide(self): + task1 = _task(name='task1', provides=['a', 'b']) + task2 = _task(name='task2', provides=['a', 'c']) + f = gf.Flow('test') + self.assertRaises(exc.DependencyFailure, f.add, task2, task1) + + def test_graph_flow_with_retry(self): + ret = retry.AlwaysRevert(requires=['a'], provides=['b']) + f = gf.Flow('test', ret) + self.assertIs(f.retry, ret) + self.assertEqual(ret.name, 'test_retry') + + self.assertEqual(f.requires, set(['a'])) + self.assertEqual(f.provides, set(['b'])) + + def test_graph_flow_ordering(self): + task1 = _task('task1', provides=set(['a', 'b'])) + task2 = _task('task2', provides=['c'], requires=['a', 'b']) + task3 = _task('task3', provides=[], requires=['c']) + f = gf.Flow('test').add(task1, task2, task3) + + self.assertEqual(3, len(f)) + + self.assertItemsEqual(list(f.iter_links()), [ + (task1, task2, {'reasons': set(['a', 'b'])}), + (task2, task3, {'reasons': set(['c'])}) + ]) + + def test_graph_flow_links(self): + task1 = _task('task1') + task2 = _task('task2') + f = gf.Flow('test').add(task1, task2) + linked = f.link(task1, task2) + self.assertIs(linked, f) + self.assertItemsEqual(list(f.iter_links()), [ + (task1, task2, {'manual': True}) + ]) + + def test_graph_flow_links_and_dependencies(self): + task1 = _task('task1', provides=['a']) + task2 = _task('task2', requires=['a']) + f = gf.Flow('test').add(task1, task2) + linked = f.link(task1, task2) + self.assertIs(linked, f) + expected_meta = { + 'manual': True, + 'reasons': set(['a']) + } + self.assertItemsEqual(list(f.iter_links()), [ + (task1, task2, expected_meta) + ]) + + def test_graph_flow_link_from_unknown_node(self): + task1 = _task('task1') + task2 = _task('task2') + f = gf.Flow('test').add(task2) + self.assertRaisesRegexp(ValueError, 'Item .* not found to link from', + f.link, task1, task2) + + def test_graph_flow_link_to_unknown_node(self): + task1 = _task('task1') + task2 = _task('task2') + f = gf.Flow('test').add(task1) + self.assertRaisesRegexp(ValueError, 'Item .* not found to link to', + f.link, task1, task2) + + def test_graph_flow_link_raises_on_cycle(self): + task1 = _task('task1', provides=['a']) + task2 = _task('task2', requires=['a']) + f = gf.Flow('test').add(task1, task2) + self.assertRaises(exc.DependencyFailure, f.link, task2, task1) + + def test_graph_flow_link_raises_on_link_cycle(self): + task1 = _task('task1') + task2 = _task('task2') + f = gf.Flow('test').add(task1, task2) + f.link(task1, task2) + self.assertRaises(exc.DependencyFailure, f.link, task2, task1) + + def test_graph_flow_dependency_cycle(self): + task1 = _task('task1', provides=['a'], requires=['c']) + task2 = _task('task2', provides=['b'], requires=['a']) + task3 = _task('task3', provides=['c'], requires=['b']) + f = gf.Flow('test').add(task1, task2) + self.assertRaises(exc.DependencyFailure, f.add, task3) + + +class TargetedGraphFlowTest(test.TestCase): + + def test_targeted_flow_restricts(self): + f = gf.TargetedFlow("test") + task1 = _task('task1', provides=['a'], requires=[]) + task2 = _task('task2', provides=['b'], requires=['a']) + task3 = _task('task3', provides=[], requires=['b']) + task4 = _task('task4', provides=[], requires=['b']) + f.add(task1, task2, task3, task4) + f.set_target(task3) + self.assertEqual(len(f), 3) + self.assertItemsEqual(f, [task1, task2, task3]) + self.assertNotIn('c', f.provides) + + def test_targeted_flow_reset(self): + f = gf.TargetedFlow("test") + task1 = _task('task1', provides=['a'], requires=[]) + task2 = _task('task2', provides=['b'], requires=['a']) + task3 = _task('task3', provides=[], requires=['b']) + task4 = _task('task4', provides=['c'], requires=['b']) + f.add(task1, task2, task3, task4) + f.set_target(task3) + f.reset_target() + self.assertEqual(len(f), 4) + self.assertItemsEqual(f, [task1, task2, task3, task4]) + self.assertIn('c', f.provides) + + def test_targeted_flow_bad_target(self): + f = gf.TargetedFlow("test") + task1 = _task('task1', provides=['a'], requires=[]) + task2 = _task('task2', provides=['b'], requires=['a']) + f.add(task1) + self.assertRaisesRegexp(ValueError, '^Item .* not found', + f.set_target, task2) + + def test_targeted_flow_one_node(self): + f = gf.TargetedFlow("test") + task1 = _task('task1', provides=['a'], requires=[]) + f.add(task1) + f.set_target(task1) + self.assertEqual(len(f), 1) + self.assertItemsEqual(f, [task1]) + + def test_recache_on_add(self): + f = gf.TargetedFlow("test") + task1 = _task('task1', provides=[], requires=['a']) + f.add(task1) + f.set_target(task1) + self.assertEqual(1, len(f)) + task2 = _task('task2', provides=['a'], requires=[]) + f.add(task2) + self.assertEqual(2, len(f)) + + def test_recache_on_add_no_deps(self): + f = gf.TargetedFlow("test") + task1 = _task('task1', provides=[], requires=[]) + f.add(task1) + f.set_target(task1) + self.assertEqual(1, len(f)) + task2 = _task('task2', provides=[], requires=[]) + f.add(task2) + self.assertEqual(1, len(f)) + + def test_recache_on_link(self): + f = gf.TargetedFlow("test") + task1 = _task('task1', provides=[], requires=[]) + task2 = _task('task2', provides=[], requires=[]) + f.add(task1, task2) + f.set_target(task1) + self.assertEqual(1, len(f)) + + f.link(task2, task1) + self.assertEqual(2, len(f)) + self.assertEqual(list(f.iter_links()), [ + (task2, task1, {'manual': True}) + ]) diff --git a/taskflow/tests/unit/test_flow_dependencies.py b/taskflow/tests/unit/test_flow_dependencies.py index 8f5d680f..3d02939a 100644 --- a/taskflow/tests/unit/test_flow_dependencies.py +++ b/taskflow/tests/unit/test_flow_dependencies.py @@ -194,13 +194,6 @@ class FlowDependenciesTest(test.TestCase): flow.add, gf.Flow('gf').add(utils.TaskOneReturn(provides='x'))) - def test_graph_flow_without_dependencies(self): - flow = gf.Flow('gf').add( - utils.TaskNoRequiresNoReturns('task1'), - utils.TaskNoRequiresNoReturns('task2')) - self.assertEqual(flow.requires, set()) - self.assertEqual(flow.provides, set()) - def test_graph_flow_requires_values(self): flow = gf.Flow('gf').add( utils.TaskOneArg('task1'), diff --git a/taskflow/tests/unit/test_graph_flow.py b/taskflow/tests/unit/test_graph_flow.py deleted file mode 100644 index 4618c24d..00000000 --- a/taskflow/tests/unit/test_graph_flow.py +++ /dev/null @@ -1,215 +0,0 @@ -# -*- coding: utf-8 -*- - -# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import collections - -import taskflow.engines - -from taskflow.patterns import graph_flow as gw -from taskflow.utils import flow_utils as fu -from taskflow.utils import graph_utils as gu - -from taskflow import test -from taskflow.tests import utils - - -class GraphFlowTest(test.TestCase): - def _make_engine(self, flow): - return taskflow.engines.load(flow, store={}) - - def _capture_states(self): - # TODO(harlowja): move function to shared helper. - capture_where = collections.defaultdict(list) - - def do_capture(state, details): - task_uuid = details.get('task_uuid') - if not task_uuid: - return - capture_where[task_uuid].append(state) - - return (do_capture, capture_where) - - def test_ordering(self): - wf = gw.Flow("the-test-action") - test_1 = utils.ProvidesRequiresTask('test-1', - requires=[], - provides=set(['a', 'b'])) - test_2 = utils.ProvidesRequiresTask('test-2', - provides=['c'], - requires=['a', 'b']) - test_3 = utils.ProvidesRequiresTask('test-3', - provides=[], - requires=['c']) - wf.add(test_1, test_2, test_3) - self.assertEqual(3, len(wf)) - - edges = [(src, dst) for src, dst, _meta in wf.iter_links()] - self.assertIn((test_1, test_2), edges) - self.assertIn((test_2, test_3), edges) - self.assertEqual(2, len(edges)) - - def test_basic_edge_reasons(self): - wf = gw.Flow("the-test-action") - test_1 = utils.ProvidesRequiresTask('test-1', - requires=[], - provides=set(['a', 'b'])) - test_2 = utils.ProvidesRequiresTask('test-2', - provides=['c'], - requires=['a', 'b']) - wf.add(test_1, test_2) - edges = list(wf.iter_links()) - self.assertEqual(len(edges), 1) - - from_task, to_task, edge_attrs = edges[0] - self.assertIs(from_task, test_1) - self.assertIs(to_task, test_2) - - self.assertTrue(len(edge_attrs) > 0) - self.assertIn('reasons', edge_attrs) - self.assertEqual(set(['a', 'b']), edge_attrs['reasons']) - - def test_linked_edge_reasons(self): - wf = gw.Flow("the-test-action") - test_1 = utils.ProvidesRequiresTask('test-1', - requires=[], - provides=[]) - test_2 = utils.ProvidesRequiresTask('test-2', - provides=[], - requires=[]) - wf.add(test_1, test_2) - self.assertEqual(len(list(wf.iter_links())), 0) - - wf.link(test_1, test_2) - edges = list(wf.iter_links()) - self.assertEqual(len(edges), 1) - - from_task, to_task, edge_attrs = edges[0] - self.assertIs(from_task, test_1) - self.assertIs(to_task, test_2) - - self.assertTrue(len(edge_attrs) > 0) - self.assertTrue(edge_attrs.get('manual')) - - def test_flatten_attribute(self): - wf = gw.Flow("the-test-action") - test_1 = utils.ProvidesRequiresTask('test-1', - requires=[], - provides=[]) - test_2 = utils.ProvidesRequiresTask('test-2', - provides=[], - requires=[]) - wf.add(test_1, test_2) - wf.link(test_1, test_2) - g = fu.flatten(wf) - self.assertEqual(2, len(g)) - edge_attrs = gu.get_edge_attrs(g, test_1, test_2) - self.assertTrue(edge_attrs.get('manual')) - self.assertTrue(edge_attrs.get('flatten')) - - -class TargetedGraphFlowTest(test.TestCase): - - def test_targeted_flow(self): - wf = gw.TargetedFlow("test") - test_1 = utils.ProvidesRequiresTask('test-1', - provides=['a'], requires=[]) - test_2 = utils.ProvidesRequiresTask('test-2', - provides=['b'], requires=['a']) - test_3 = utils.ProvidesRequiresTask('test-3', - provides=[], requires=['b']) - test_4 = utils.ProvidesRequiresTask('test-4', - provides=[], requires=['b']) - wf.add(test_1, test_2, test_3, test_4) - wf.set_target(test_3) - g = fu.flatten(wf) - self.assertEqual(3, len(g)) - self.assertFalse(g.has_node(test_4)) - self.assertFalse('c' in wf.provides) - - def test_targeted_flow_reset(self): - wf = gw.TargetedFlow("test") - test_1 = utils.ProvidesRequiresTask('test-1', - provides=['a'], requires=[]) - test_2 = utils.ProvidesRequiresTask('test-2', - provides=['b'], requires=['a']) - test_3 = utils.ProvidesRequiresTask('test-3', - provides=[], requires=['b']) - test_4 = utils.ProvidesRequiresTask('test-4', - provides=['c'], requires=['b']) - wf.add(test_1, test_2, test_3, test_4) - wf.set_target(test_3) - wf.reset_target() - g = fu.flatten(wf) - self.assertEqual(4, len(g)) - self.assertTrue(g.has_node(test_4)) - - def test_targeted_flow_bad_target(self): - wf = gw.TargetedFlow("test") - test_1 = utils.ProvidesRequiresTask('test-1', - provides=['a'], requires=[]) - test_2 = utils.ProvidesRequiresTask('test-2', - provides=['b'], requires=['a']) - wf.add(test_1) - self.assertRaisesRegexp(ValueError, '^Item .* not found', - wf.set_target, test_2) - - def test_targeted_flow_one_node(self): - wf = gw.TargetedFlow("test") - test_1 = utils.ProvidesRequiresTask('test-1', - provides=['a'], requires=[]) - wf.add(test_1) - wf.set_target(test_1) - g = fu.flatten(wf) - self.assertEqual(1, len(g)) - self.assertTrue(g.has_node(test_1)) - - def test_recache_on_add(self): - wf = gw.TargetedFlow("test") - test_1 = utils.ProvidesRequiresTask('test-1', - provides=[], requires=['a']) - wf.add(test_1) - wf.set_target(test_1) - self.assertEqual(1, len(wf)) - test_2 = utils.ProvidesRequiresTask('test-2', - provides=['a'], requires=[]) - wf.add(test_2) - self.assertEqual(2, len(wf)) - - def test_recache_on_add_no_deps(self): - wf = gw.TargetedFlow("test") - test_1 = utils.ProvidesRequiresTask('test-1', - provides=[], requires=[]) - wf.add(test_1) - wf.set_target(test_1) - self.assertEqual(1, len(wf)) - test_2 = utils.ProvidesRequiresTask('test-2', - provides=[], requires=[]) - wf.add(test_2) - self.assertEqual(1, len(wf)) - - def test_recache_on_link(self): - wf = gw.TargetedFlow("test") - test_1 = utils.ProvidesRequiresTask('test-1', - provides=[], requires=[]) - test_2 = utils.ProvidesRequiresTask('test-2', - provides=[], requires=[]) - wf.add(test_1, test_2) - wf.set_target(test_1) - self.assertEqual(1, len(wf)) - wf.link(test_2, test_1) - self.assertEqual(2, len(wf)) - edges = [(src, dst) for src, dst, _meta in wf.iter_links()] - self.assertEqual([(test_2, test_1)], edges)