Fix merging transitively-connected change queues.

When combining change queues, given 3 projects that were transitively
connected by shared jobs, depending on the order of processing, it
was possible for them not to be combined.  To correct this, repeat
the combining operation until the resulting set can be combined no
further.

In order to make the test (and actual usage) behavior more deterministic,
the list of projects returned by the pipeline is now sorted by name.

A test is added for this.

Change-Id: If1386cad4118257efee9aa9918ad12a626927038
This commit is contained in:
James E. Blair 2013-12-03 15:06:48 -08:00
parent b1b010d393
commit c3d428ee89
4 changed files with 49 additions and 6 deletions

25
tests/fixtures/layout-merge-queues.yaml vendored Normal file
View File

@ -0,0 +1,25 @@
pipelines:
- name: gate
manager: DependentPipelineManager
precedence: low
trigger:
gerrit:
- event: comment-added
approval:
- approved: 1
projects:
- name: projectA
gate:
- test-only-a
- common-test1
- name: projectB
gate:
- test-only-b
- common-test2
- name: projectC
gate:
- common-test1
- common-test2

View File

@ -2747,6 +2747,13 @@ class TestScheduler(testtools.TestCase):
self.assertIn('project-test1', status_jobs)
self.assertIn('project-test2', status_jobs)
def test_merging_queues(self):
"Test that transitively-connected change queues are merged"
self.config.set('zuul', 'layout_config',
'tests/fixtures/layout-merge-queues.yaml')
self.sched.reconfigure(self.config)
self.assertEqual(len(self.sched.layout.pipelines['gate'].queues), 1)
def test_node_label(self):
"Test that a job runs on a specific node label"
self.worker.registerFunction('build:node-project-test1:debian')

View File

@ -72,7 +72,7 @@ class Pipeline(object):
return job_tree
def getProjects(self):
return self.job_trees.keys()
return sorted(self.job_trees.keys(), lambda a, b: cmp(a.name, b.name))
def addQueue(self, queue):
self.queues.append(queue)

View File

@ -1368,6 +1368,21 @@ class DependentPipelineManager(BasePipelineManager):
change_queues.append(change_queue)
self.log.debug("Created queue: %s" % change_queue)
# Iterate over all queues trying to combine them, and keep doing
# so until they can not be combined further.
last_change_queues = change_queues
while True:
new_change_queues = self.combineChangeQueues(last_change_queues)
if len(last_change_queues) == len(new_change_queues):
break
last_change_queues = new_change_queues
self.log.info(" Shared change queues:")
for queue in new_change_queues:
self.pipeline.addQueue(queue)
self.log.info(" %s" % queue)
def combineChangeQueues(self, change_queues):
self.log.debug("Combining shared queues")
new_change_queues = []
for a in change_queues:
@ -1381,11 +1396,7 @@ class DependentPipelineManager(BasePipelineManager):
if not merged_a:
self.log.debug("Keeping queue %s" % (a))
new_change_queues.append(a)
self.log.info(" Shared change queues:")
for queue in new_change_queues:
self.pipeline.addQueue(queue)
self.log.info(" %s" % queue)
return new_change_queues
def isChangeReadyToBeEnqueued(self, change):
if not self.pipeline.trigger.canMerge(change,