From 6fc3a9b0f1868e3a6092007bb0fd8e79df6f5d95 Mon Sep 17 00:00:00 2001 From: Gregory Thiemonge Date: Wed, 28 Jan 2026 13:31:33 +0100 Subject: [PATCH] Avoid iterating over the same atoms in a graph When iterating over a "diamond" graph, some atoms may be processed multiple times. Depending on the size of the graph, it has a huge impact on the complexity of the algorithm. The patch ensures that each node is processed only once. Closes-Bug: #2139228 Closes-Bug: #2086453 Change-Id: Iced8a1fd02ef5766f4017bb1b6c6d48b4c061b5c Signed-off-by: Gregory Thiemonge --- taskflow/engines/action_engine/traversal.py | 10 ++++++++++ taskflow/tests/unit/test_engines.py | 14 ++++++++++++++ 2 files changed, 24 insertions(+) diff --git a/taskflow/engines/action_engine/traversal.py b/taskflow/engines/action_engine/traversal.py index abd14db53..7885d0d36 100644 --- a/taskflow/engines/action_engine/traversal.py +++ b/taskflow/engines/action_engine/traversal.py @@ -60,8 +60,13 @@ def breadth_first_iterate(execution_graph, starting_node, direction, through_flows=through_flows, through_retries=through_retries, through_tasks=through_tasks) q = collections.deque(initial_nodes_iter) + visited_nodes = set() while q: node = q.popleft() + if node in visited_nodes: + continue + visited_nodes.add(node) + node_attrs = execution_graph.nodes[node] if not node_attrs.get('noop'): yield node @@ -88,8 +93,13 @@ def depth_first_iterate(execution_graph, starting_node, direction, through_flows=through_flows, through_retries=through_retries, through_tasks=through_tasks) stack = list(initial_nodes_iter) + visited_nodes = set() while stack: node = stack.pop() + if node in visited_nodes: + continue + visited_nodes.add(node) + node_attrs = execution_graph.nodes[node] if not node_attrs.get('noop'): yield node diff --git a/taskflow/tests/unit/test_engines.py b/taskflow/tests/unit/test_engines.py index cd60c970e..215c2cfb0 100644 --- a/taskflow/tests/unit/test_engines.py +++ b/taskflow/tests/unit/test_engines.py @@ -632,6 +632,20 @@ class EngineParallelFlowTest(utils.EngineTestBase): self.assertEqual({'x1': 17, 'x2': 5}, engine.storage.fetch_all()) + # Reproducer for #2139228 and #2086453 + def test_many_unordered_flows_in_linear_flow(self): + flow = lf.Flow("root") + for i in range(10): + sf = uf.Flow(f'subflow {i}') + for j in range(10): + sf.add(utils.ProgressingTask(name=f"task {i}:{j}")) + flow.add(sf) + + engine = self._make_engine(flow) + with utils.CaptureListener(engine, capture_flow=False) as capturer: + engine.run() + self.assertIn("task 9:9.t SUCCESS(5)", capturer.values) + class EngineLinearAndUnorderedExceptionsTest(utils.EngineTestBase):