diff --git a/taskflow/engines/action_engine/traversal.py b/taskflow/engines/action_engine/traversal.py index abd14db53..7885d0d36 100644 --- a/taskflow/engines/action_engine/traversal.py +++ b/taskflow/engines/action_engine/traversal.py @@ -60,8 +60,13 @@ def breadth_first_iterate(execution_graph, starting_node, direction, through_flows=through_flows, through_retries=through_retries, through_tasks=through_tasks) q = collections.deque(initial_nodes_iter) + visited_nodes = set() while q: node = q.popleft() + if node in visited_nodes: + continue + visited_nodes.add(node) + node_attrs = execution_graph.nodes[node] if not node_attrs.get('noop'): yield node @@ -88,8 +93,13 @@ def depth_first_iterate(execution_graph, starting_node, direction, through_flows=through_flows, through_retries=through_retries, through_tasks=through_tasks) stack = list(initial_nodes_iter) + visited_nodes = set() while stack: node = stack.pop() + if node in visited_nodes: + continue + visited_nodes.add(node) + node_attrs = execution_graph.nodes[node] if not node_attrs.get('noop'): yield node diff --git a/taskflow/tests/unit/test_engines.py b/taskflow/tests/unit/test_engines.py index cd60c970e..215c2cfb0 100644 --- a/taskflow/tests/unit/test_engines.py +++ b/taskflow/tests/unit/test_engines.py @@ -632,6 +632,20 @@ class EngineParallelFlowTest(utils.EngineTestBase): self.assertEqual({'x1': 17, 'x2': 5}, engine.storage.fetch_all()) + # Reproducer for #2139228 and #2086453 + def test_many_unordered_flows_in_linear_flow(self): + flow = lf.Flow("root") + for i in range(10): + sf = uf.Flow(f'subflow {i}') + for j in range(10): + sf.add(utils.ProgressingTask(name=f"task {i}:{j}")) + flow.add(sf) + + engine = self._make_engine(flow) + with utils.CaptureListener(engine, capture_flow=False) as capturer: + engine.run() + self.assertIn("task 9:9.t SUCCESS(5)", capturer.values) + class EngineLinearAndUnorderedExceptionsTest(utils.EngineTestBase):