From daf341ad819987f9d7ec3e198793274688597dd7 Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Tue, 21 May 2013 13:51:13 -0700 Subject: [PATCH] Add a few more graph ordering test cases. --- taskflow/tests/unit/test_graph_flow.py | 78 ++++++++++++++++++++++++++ 1 file changed, 78 insertions(+) diff --git a/taskflow/tests/unit/test_graph_flow.py b/taskflow/tests/unit/test_graph_flow.py index ea20a14f..845cd37d 100644 --- a/taskflow/tests/unit/test_graph_flow.py +++ b/taskflow/tests/unit/test_graph_flow.py @@ -16,6 +16,7 @@ # License for the specific language governing permissions and limitations # under the License. +import collections import functools import unittest @@ -31,6 +32,28 @@ def null_functor(*args, **kwargs): return None +class ProvidesRequiresTask(task.Task): + def __init__(self, name, provides, requires): + super(ProvidesRequiresTask, self).__init__(name) + self._provides = provides + self._requires = requires + + def requires(self): + return self._requires + + def provides(self): + return self._provides + + def apply(self, context, *args, **kwargs): + outs = { + '__inputs__': dict(kwargs), + } + context['__order__'].append(self.name) + for v in self.provides(): + outs[v] = True + return outs + + class GraphFlowTest(unittest.TestCase): def testRevertPath(self): flo = gw.Flow("test-flow") @@ -61,6 +84,61 @@ class GraphFlowTest(unittest.TestCase): self.assertEquals(states.FAILURE, flo.state) self.assertEquals(['run1'], reverted) + def testNoProvider(self): + flo = gw.Flow("test-flow") + flo.add(ProvidesRequiresTask('test1', + provides=['a', 'b'], + requires=['c', 'd'])) + self.assertEquals(states.PENDING, flo.state) + self.assertRaises(excp.InvalidStateException, flo.run, {}) + self.assertEquals(states.FAILURE, flo.state) + + def testLoopFlow(self): + flo = gw.Flow("test-flow") + flo.add(ProvidesRequiresTask('test1', + provides=['a', 'b'], + requires=['c', 'd', 'e'])) + flo.add(ProvidesRequiresTask('test2', + provides=['c', 'd', 'e'], + requires=['a', 'b'])) + ctx = collections.defaultdict(list) + self.assertEquals(states.PENDING, flo.state) + self.assertRaises(excp.InvalidStateException, flo.run, ctx) + self.assertEquals(states.FAILURE, flo.state) + + def testComplicatedInputsOutputs(self): + flo = gw.Flow("test-flow") + flo.add(ProvidesRequiresTask('test1', + provides=['a', 'b'], + requires=['c', 'd', 'e'])) + flo.add(ProvidesRequiresTask('test2', + provides=['c', 'd', 'e'], + requires=[])) + flo.add(ProvidesRequiresTask('test3', + provides=['c', 'd'], + requires=[])) + flo.add(ProvidesRequiresTask('test4', + provides=['z'], + requires=['a', 'b', 'c', 'd', 'e'])) + flo.add(ProvidesRequiresTask('test5', + provides=['y'], + requires=['z'])) + flo.add(ProvidesRequiresTask('test6', + provides=[], + requires=['y'])) + + self.assertEquals(states.PENDING, flo.state) + ctx = collections.defaultdict(list) + flo.run(ctx) + self.assertEquals(states.SUCCESS, flo.state) + run_order = ctx['__order__'] + + # Order isn't deterministic so that's why we sort it + self.assertEquals(['test2', 'test3'], sorted(run_order[0:2])) + + # This order is deterministic + self.assertEquals(['test1', 'test4', 'test5', 'test6'], run_order[2:]) + def testConnectRequirementFailure(self): def run1(context, *args, **kwargs):