Avoid iterating over the same atoms in a graph
When iterating over a "diamond" graph, some atoms may be processed multiple times. Depending on the size of the graph, it has a huge impact on the complexity of the algorithm. The patch ensures that each node is processed only once. Closes-Bug: #2139228 Closes-Bug: #2086453 Change-Id: Iced8a1fd02ef5766f4017bb1b6c6d48b4c061b5c Signed-off-by: Gregory Thiemonge <gthiemon@redhat.com>
This commit is contained in:
@@ -60,8 +60,13 @@ def breadth_first_iterate(execution_graph, starting_node, direction,
|
||||
through_flows=through_flows, through_retries=through_retries,
|
||||
through_tasks=through_tasks)
|
||||
q = collections.deque(initial_nodes_iter)
|
||||
visited_nodes = set()
|
||||
while q:
|
||||
node = q.popleft()
|
||||
if node in visited_nodes:
|
||||
continue
|
||||
visited_nodes.add(node)
|
||||
|
||||
node_attrs = execution_graph.nodes[node]
|
||||
if not node_attrs.get('noop'):
|
||||
yield node
|
||||
@@ -88,8 +93,13 @@ def depth_first_iterate(execution_graph, starting_node, direction,
|
||||
through_flows=through_flows, through_retries=through_retries,
|
||||
through_tasks=through_tasks)
|
||||
stack = list(initial_nodes_iter)
|
||||
visited_nodes = set()
|
||||
while stack:
|
||||
node = stack.pop()
|
||||
if node in visited_nodes:
|
||||
continue
|
||||
visited_nodes.add(node)
|
||||
|
||||
node_attrs = execution_graph.nodes[node]
|
||||
if not node_attrs.get('noop'):
|
||||
yield node
|
||||
|
||||
@@ -632,6 +632,20 @@ class EngineParallelFlowTest(utils.EngineTestBase):
|
||||
self.assertEqual({'x1': 17, 'x2': 5},
|
||||
engine.storage.fetch_all())
|
||||
|
||||
# Reproducer for #2139228 and #2086453
|
||||
def test_many_unordered_flows_in_linear_flow(self):
|
||||
flow = lf.Flow("root")
|
||||
for i in range(10):
|
||||
sf = uf.Flow(f'subflow {i}')
|
||||
for j in range(10):
|
||||
sf.add(utils.ProgressingTask(name=f"task {i}:{j}"))
|
||||
flow.add(sf)
|
||||
|
||||
engine = self._make_engine(flow)
|
||||
with utils.CaptureListener(engine, capture_flow=False) as capturer:
|
||||
engine.run()
|
||||
self.assertIn("task 9:9.t SUCCESS(5)", capturer.values)
|
||||
|
||||
|
||||
class EngineLinearAndUnorderedExceptionsTest(utils.EngineTestBase):
|
||||
|
||||
|
||||
Reference in New Issue
Block a user