diff --git a/tests/fixtures/layout-merge-queues.yaml b/tests/fixtures/layout-merge-queues.yaml new file mode 100644 index 0000000000..be39a1c464 --- /dev/null +++ b/tests/fixtures/layout-merge-queues.yaml @@ -0,0 +1,25 @@ +pipelines: + - name: gate + manager: DependentPipelineManager + precedence: low + trigger: + gerrit: + - event: comment-added + approval: + - approved: 1 + +projects: + - name: projectA + gate: + - test-only-a + - common-test1 + + - name: projectB + gate: + - test-only-b + - common-test2 + + - name: projectC + gate: + - common-test1 + - common-test2 diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py index 4832af9424..1be4721aa1 100755 --- a/tests/test_scheduler.py +++ b/tests/test_scheduler.py @@ -2747,6 +2747,13 @@ class TestScheduler(testtools.TestCase): self.assertIn('project-test1', status_jobs) self.assertIn('project-test2', status_jobs) + def test_merging_queues(self): + "Test that transitively-connected change queues are merged" + self.config.set('zuul', 'layout_config', + 'tests/fixtures/layout-merge-queues.yaml') + self.sched.reconfigure(self.config) + self.assertEqual(len(self.sched.layout.pipelines['gate'].queues), 1) + def test_node_label(self): "Test that a job runs on a specific node label" self.worker.registerFunction('build:node-project-test1:debian') diff --git a/zuul/model.py b/zuul/model.py index 0c694301c8..5fc8f6fe62 100644 --- a/zuul/model.py +++ b/zuul/model.py @@ -72,7 +72,7 @@ class Pipeline(object): return job_tree def getProjects(self): - return self.job_trees.keys() + return sorted(self.job_trees.keys(), lambda a, b: cmp(a.name, b.name)) def addQueue(self, queue): self.queues.append(queue) diff --git a/zuul/scheduler.py b/zuul/scheduler.py index 8b6c20c874..308c790159 100644 --- a/zuul/scheduler.py +++ b/zuul/scheduler.py @@ -1368,6 +1368,21 @@ class DependentPipelineManager(BasePipelineManager): change_queues.append(change_queue) self.log.debug("Created queue: %s" % change_queue) + # Iterate over all queues trying to combine them, and keep doing + # so until they can not be combined further. + last_change_queues = change_queues + while True: + new_change_queues = self.combineChangeQueues(last_change_queues) + if len(last_change_queues) == len(new_change_queues): + break + last_change_queues = new_change_queues + + self.log.info(" Shared change queues:") + for queue in new_change_queues: + self.pipeline.addQueue(queue) + self.log.info(" %s" % queue) + + def combineChangeQueues(self, change_queues): self.log.debug("Combining shared queues") new_change_queues = [] for a in change_queues: @@ -1381,11 +1396,7 @@ class DependentPipelineManager(BasePipelineManager): if not merged_a: self.log.debug("Keeping queue %s" % (a)) new_change_queues.append(a) - - self.log.info(" Shared change queues:") - for queue in new_change_queues: - self.pipeline.addQueue(queue) - self.log.info(" %s" % queue) + return new_change_queues def isChangeReadyToBeEnqueued(self, change): if not self.pipeline.trigger.canMerge(change,