diff --git a/taskflow/engines/action_engine/analyzer.py b/taskflow/engines/action_engine/analyzer.py index bdde8975..6f9aa669 100644 --- a/taskflow/engines/action_engine/analyzer.py +++ b/taskflow/engines/action_engine/analyzer.py @@ -80,10 +80,14 @@ class IgnoreDecider(Decider): def check(self, runtime): """Returns bool of whether this decider should allow running.""" + # Gather all atoms results so that those results can be used + # by the decider(s) that are making a decision as to pass or + # not pass... results = {} - for name in six.iterkeys(self._edge_deciders): - results[name] = runtime.storage.get(name) - for local_decider in six.itervalues(self._edge_deciders): + for node, node_kind, _local_decider in self._edge_deciders: + if node_kind in co.ATOMS: + results[node.name] = runtime.storage.get(node.name) + for _node, _node_kind, local_decider in self._edge_deciders: if not local_decider(history=results): return False return True diff --git a/taskflow/engines/action_engine/runtime.py b/taskflow/engines/action_engine/runtime.py index 6780e931..dc9aa276 100644 --- a/taskflow/engines/action_engine/runtime.py +++ b/taskflow/engines/action_engine/runtime.py @@ -14,6 +14,7 @@ # License for the specific language governing permissions and limitations # under the License. +import collections import functools from futurist import waiters @@ -49,6 +50,34 @@ class Runtime(object): self._compilation = compilation self._atom_cache = {} + @staticmethod + def _walk_edge_deciders(graph, atom): + """Iterates through all nodes, deciders that alter atoms execution.""" + # This is basically a reverse breadth first exploration, with + # special logic to further traverse down flow nodes... + predecessors_iter = graph.predecessors_iter + nodes = collections.deque((u_node, atom) + for u_node in predecessors_iter(atom)) + visited = set() + while nodes: + u_node, v_node = nodes.popleft() + u_node_kind = graph.node[u_node]['kind'] + try: + yield (u_node, u_node_kind, + graph.adj[u_node][v_node][LINK_DECIDER]) + except KeyError: + pass + if u_node_kind == com.FLOW and u_node not in visited: + # Avoid re-exploring the same flow if we get to this + # same flow by a different *future* path... + visited.add(u_node) + # Since we *currently* jump over flow node(s), we need to make + # sure that any prior decider that was directed at this flow + # node also gets used during future decisions about this + # atom node. + nodes.extend((u_u_node, u_node) + for u_u_node in predecessors_iter(u_node)) + def compile(self): """Compiles & caches frequently used execution helper objects. @@ -84,21 +113,13 @@ class Runtime(object): raise exc.CompilationFailure("Unknown node kind '%s'" " encountered" % node_kind) metadata = {} + deciders_it = self._walk_edge_deciders(graph, node) walker = sc.ScopeWalker(self.compilation, node, names_only=True) - edge_deciders = {} - for prev_node in graph.predecessors_iter(node): - # If there is any link function that says if this connection - # is able to run (or should not) ensure we retain it and use - # it later as needed. - u_v_data = graph.adj[prev_node][node] - u_v_decider = u_v_data.get(LINK_DECIDER) - if u_v_decider is not None: - edge_deciders[prev_node.name] = u_v_decider metadata['scope_walker'] = walker metadata['check_transition_handler'] = check_transition_handler metadata['change_state_handler'] = change_state_handler metadata['scheduler'] = scheduler - metadata['edge_deciders'] = edge_deciders + metadata['edge_deciders'] = tuple(deciders_it) self._atom_cache[node.name] = metadata @property diff --git a/taskflow/tests/unit/test_engines.py b/taskflow/tests/unit/test_engines.py index ba003aec..7c8b6015 100644 --- a/taskflow/tests/unit/test_engines.py +++ b/taskflow/tests/unit/test_engines.py @@ -933,6 +933,87 @@ class EngineResetTests(utils.EngineTestBase): class EngineGraphConditionalFlowTest(utils.EngineTestBase): + def test_graph_flow_conditional_jumps_across_2(self): + histories = [] + + def should_go(history): + histories.append(history) + return False + + task1 = utils.ProgressingTask(name='task1') + task2 = utils.ProgressingTask(name='task2') + task3 = utils.ProgressingTask(name='task3') + task4 = utils.ProgressingTask(name='task4') + + subflow = lf.Flow("more-work") + subsub_flow = lf.Flow("more-more-work") + subsub_flow.add(task3, task4) + subflow.add(subsub_flow) + + flow = gf.Flow("main-work") + flow.add(task1, task2) + flow.link(task1, task2) + flow.add(subflow) + flow.link(task2, subflow, decider=should_go) + + engine = self._make_engine(flow) + with utils.CaptureListener(engine, capture_flow=False) as capturer: + engine.run() + + expected = [ + 'task1.t RUNNING', + 'task1.t SUCCESS(5)', + + 'task2.t RUNNING', + 'task2.t SUCCESS(5)', + + 'task3.t IGNORE', + 'task4.t IGNORE', + ] + self.assertEqual(expected, capturer.values) + self.assertEqual(1, len(histories)) + self.assertIn('task2', histories[0]) + + def test_graph_flow_conditional_jumps_across(self): + histories = [] + + def should_go(history): + histories.append(history) + return False + + task1 = utils.ProgressingTask(name='task1') + task2 = utils.ProgressingTask(name='task2') + task3 = utils.ProgressingTask(name='task3') + task4 = utils.ProgressingTask(name='task4') + + subflow = lf.Flow("more-work") + subflow.add(task3, task4) + flow = gf.Flow("main-work") + flow.add(task1, task2) + flow.link(task1, task2) + flow.add(subflow) + flow.link(task2, subflow, decider=should_go) + flow.link(task1, subflow, decider=should_go) + + engine = self._make_engine(flow) + with utils.CaptureListener(engine, capture_flow=False) as capturer: + engine.run() + + expected = [ + 'task1.t RUNNING', + 'task1.t SUCCESS(5)', + + 'task2.t RUNNING', + 'task2.t SUCCESS(5)', + + 'task3.t IGNORE', + 'task4.t IGNORE', + ] + self.assertEqual(expected, capturer.values) + self.assertEqual(1, len(histories)) + self.assertIn('task1', histories[0]) + self.assertIn('task2', histories[0]) + def test_graph_flow_conditional(self): flow = gf.Flow('root')