Combine shared queues explicitly
In Zuul v2, shared queues were determined automatically by combining queues which shared a job name. In v3, they are explicitly created by configuration by naming them. Update the queue construction to behave this way. Change-Id: I7e328b2170351bc51232c1c6ea48946a36d34e27
This commit is contained in:
parent
3158e288f0
commit
0dcef7a575
|
@ -156,7 +156,7 @@ class ProjectTemplateParser(object):
|
||||||
continue
|
continue
|
||||||
project_pipeline = model.ProjectPipelineConfig()
|
project_pipeline = model.ProjectPipelineConfig()
|
||||||
project_template.pipelines[pipeline.name] = project_pipeline
|
project_template.pipelines[pipeline.name] = project_pipeline
|
||||||
project_pipeline.queue_name = conf.get('queue')
|
project_pipeline.queue_name = conf_pipeline.get('queue')
|
||||||
project_pipeline.job_tree = ProjectTemplateParser._parseJobTree(
|
project_pipeline.job_tree = ProjectTemplateParser._parseJobTree(
|
||||||
layout, conf_pipeline.get('jobs'))
|
layout, conf_pipeline.get('jobs'))
|
||||||
return project_template
|
return project_template
|
||||||
|
|
|
@ -36,51 +36,38 @@ class DependentPipelineManager(PipelineManager):
|
||||||
|
|
||||||
def buildChangeQueues(self):
|
def buildChangeQueues(self):
|
||||||
self.log.debug("Building shared change queues")
|
self.log.debug("Building shared change queues")
|
||||||
change_queues = []
|
change_queues = {}
|
||||||
|
project_configs = self.pipeline.layout.project_configs
|
||||||
|
|
||||||
for project in self.pipeline.getProjects():
|
for project in self.pipeline.getProjects():
|
||||||
|
project_config = project_configs[project.name]
|
||||||
|
project_pipeline_config = project_config.pipelines[
|
||||||
|
self.pipeline.name]
|
||||||
|
queue_name = project_pipeline_config.queue_name
|
||||||
|
if queue_name and queue_name in change_queues:
|
||||||
|
change_queue = change_queues[queue_name]
|
||||||
|
else:
|
||||||
|
p = self.pipeline
|
||||||
change_queue = model.ChangeQueue(
|
change_queue = model.ChangeQueue(
|
||||||
self.pipeline,
|
p,
|
||||||
window=self.pipeline.window,
|
window=p.window,
|
||||||
window_floor=self.pipeline.window_floor,
|
window_floor=p.window_floor,
|
||||||
window_increase_type=self.pipeline.window_increase_type,
|
window_increase_type=p.window_increase_type,
|
||||||
window_increase_factor=self.pipeline.window_increase_factor,
|
window_increase_factor=p.window_increase_factor,
|
||||||
window_decrease_type=self.pipeline.window_decrease_type,
|
window_decrease_type=p.window_decrease_type,
|
||||||
window_decrease_factor=self.pipeline.window_decrease_factor)
|
window_decrease_factor=p.window_decrease_factor,
|
||||||
change_queue.addProject(project)
|
name=queue_name)
|
||||||
change_queues.append(change_queue)
|
if queue_name:
|
||||||
|
# If this is a named queue, keep track of it in
|
||||||
|
# case it is referenced again. Otherwise, it will
|
||||||
|
# have a name automatically generated from its
|
||||||
|
# constituent projects.
|
||||||
|
change_queues[queue_name] = change_queue
|
||||||
|
self.pipeline.addQueue(change_queue)
|
||||||
self.log.debug("Created queue: %s" % change_queue)
|
self.log.debug("Created queue: %s" % change_queue)
|
||||||
|
change_queue.addProject(project)
|
||||||
# Iterate over all queues trying to combine them, and keep doing
|
self.log.debug("Added project %s to queue: %s" %
|
||||||
# so until they can not be combined further.
|
(project, change_queue))
|
||||||
last_change_queues = change_queues
|
|
||||||
while True:
|
|
||||||
new_change_queues = self.combineChangeQueues(last_change_queues)
|
|
||||||
if len(last_change_queues) == len(new_change_queues):
|
|
||||||
break
|
|
||||||
last_change_queues = new_change_queues
|
|
||||||
|
|
||||||
self.log.info(" Shared change queues:")
|
|
||||||
for queue in new_change_queues:
|
|
||||||
self.pipeline.addQueue(queue)
|
|
||||||
self.log.info(" %s containing %s" % (
|
|
||||||
queue, queue.generated_name))
|
|
||||||
|
|
||||||
def combineChangeQueues(self, change_queues):
|
|
||||||
self.log.debug("Combining shared queues")
|
|
||||||
new_change_queues = []
|
|
||||||
for a in change_queues:
|
|
||||||
merged_a = False
|
|
||||||
for b in new_change_queues:
|
|
||||||
if not a.getJobs().isdisjoint(b.getJobs()):
|
|
||||||
self.log.debug("Merging queue %s into %s" % (a, b))
|
|
||||||
b.mergeChangeQueue(a)
|
|
||||||
merged_a = True
|
|
||||||
break # this breaks out of 'for b' and continues 'for a'
|
|
||||||
if not merged_a:
|
|
||||||
self.log.debug("Keeping queue %s" % (a))
|
|
||||||
new_change_queues.append(a)
|
|
||||||
return new_change_queues
|
|
||||||
|
|
||||||
def getChangeQueue(self, change, existing=None):
|
def getChangeQueue(self, change, existing=None):
|
||||||
if existing:
|
if existing:
|
||||||
|
|
|
@ -131,11 +131,6 @@ class Pipeline(object):
|
||||||
def setManager(self, manager):
|
def setManager(self, manager):
|
||||||
self.manager = manager
|
self.manager = manager
|
||||||
|
|
||||||
def addProject(self, project):
|
|
||||||
job_tree = JobTree(None) # Null job == job tree root
|
|
||||||
self.job_trees[project] = job_tree
|
|
||||||
return job_tree
|
|
||||||
|
|
||||||
def getProjects(self):
|
def getProjects(self):
|
||||||
# cmp is not in python3, applied idiom from
|
# cmp is not in python3, applied idiom from
|
||||||
# http://python-future.org/compatible_idioms.html#cmp
|
# http://python-future.org/compatible_idioms.html#cmp
|
||||||
|
@ -219,11 +214,13 @@ class ChangeQueue(object):
|
||||||
"""
|
"""
|
||||||
def __init__(self, pipeline, window=0, window_floor=1,
|
def __init__(self, pipeline, window=0, window_floor=1,
|
||||||
window_increase_type='linear', window_increase_factor=1,
|
window_increase_type='linear', window_increase_factor=1,
|
||||||
window_decrease_type='exponential', window_decrease_factor=2):
|
window_decrease_type='exponential', window_decrease_factor=2,
|
||||||
|
name=None):
|
||||||
self.pipeline = pipeline
|
self.pipeline = pipeline
|
||||||
|
if name:
|
||||||
|
self.name = name
|
||||||
|
else:
|
||||||
self.name = ''
|
self.name = ''
|
||||||
self.assigned_name = None
|
|
||||||
self.generated_name = None
|
|
||||||
self.projects = []
|
self.projects = []
|
||||||
self._jobs = set()
|
self._jobs = set()
|
||||||
self.queue = []
|
self.queue = []
|
||||||
|
@ -244,10 +241,8 @@ class ChangeQueue(object):
|
||||||
if project not in self.projects:
|
if project not in self.projects:
|
||||||
self.projects.append(project)
|
self.projects.append(project)
|
||||||
|
|
||||||
names = [x.name for x in self.projects]
|
if not self.name:
|
||||||
names.sort()
|
self.name = project.name
|
||||||
self.generated_name = ', '.join(names)
|
|
||||||
self.name = self.assigned_name or self.generated_name
|
|
||||||
|
|
||||||
def enqueueChange(self, change):
|
def enqueueChange(self, change):
|
||||||
item = QueueItem(self, change)
|
item = QueueItem(self, change)
|
||||||
|
|
Loading…
Reference in New Issue