Support per branch change queues

We have several large projects that most of the time have long gate
queues. Those projects typically work on master and a few release
branches, where the changes on the release branches are more important
than the changes for master. Currently all of those changes are queued
up in a shared gate queue, which makes getting changes into the
release branches very slow, especially if occasional gate resets are
involved. In order to improve this, allow specifying the change queues
per branch so the changes for each release branch can be queued up
separately.

This is done by adding a new config element 'queue' which can be
configured to operate on a per-branch level.
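
The intended grouping semantics can be summarized with a small sketch. The
helper below is purely hypothetical and only mirrors the behavior described
in this change; it is not Zuul code.

.. code-block:: python

   def queue_key(queue_name, branch, per_branch):
       # With per-branch enabled, each branch gets its own change queue.
       return (queue_name, branch if per_branch else None)

   # Default: master and a release branch share one gate queue.
   assert queue_key('integrated', 'master', False) == queue_key(
       'integrated', 'stable/1.0', False)
   # Per-branch: the release branch gets its own, shorter queue.
   assert queue_key('integrated', 'master', True) != queue_key(
       'integrated', 'stable/1.0', True)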

Change-Id: Ie5c1a2b8f413fd595dbaaeba67251da14c6b4b36
Tobias Henkel 2020-04-08 12:35:52 +02:00
parent 8be15d9aad
commit e5ba72f778
GPG Key ID: 03750DEC158E5FA2
23 changed files with 515 additions and 40 deletions

View File

@ -90,6 +90,7 @@ the YAML files:
pipeline_def
job_def
project_def
queue_def
secret_def
nodeset_def
semaphore_def

View File

@ -176,6 +176,9 @@ pipeline.
changes which break the others. This is a free-form string;
just set the same value for each group of projects.
The name can refer to the name of a :attr:`queue`, which allows
further configuration of the queue.
Each pipeline for a project can only belong to one queue; therefore
Zuul will use the first value that it encounters.
It need not appear in the first instance of a :attr:`project`

View File

@ -0,0 +1,44 @@
.. _queue:
Queue
=====
Projects that interact with each other should share a ``queue``.
This is especially relevant in a :value:`dependent <pipeline.manager.dependent>`
pipeline. The :attr:`project.<pipeline>.queue` can optionally refer
to a specific :attr:`queue` object that can further configure the
behavior of the queue.
Here is an example ``queue`` configuration.
.. code-block:: yaml
- queue:
name: integrated
per-branch: false
.. attr:: queue
The attributes available on a queue are as follows (all are
optional unless otherwise specified):
.. attr:: name
:required:
This is used later in the project definition to refer to this queue.
.. attr:: per-branch
:default: false
By default a queue spans all projects and branches that use it.
This is especially important if projects want to do upgrade tests
between different branches in the :term:`gate`. If a set of projects
doesn't have this use case, it can configure the queue to create a
shared queue per branch for all projects. This can be useful for
large projects to improve the throughput of a gate pipeline, as it
results in shorter queues and thus less impact when a job fails in
the gate. Note that all projects that are gated together must have
aligned branch names when using per-branch queues; otherwise changes
that belong together end up in different queues.
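As a purely hypothetical sketch of the branch-alignment caveat above (the
grouping helper and project names are illustrative only, not part of Zuul):

.. code-block:: python

   def branch_queues(changes, per_branch=True):
       """Group (project, branch) changes into queues keyed by branch."""
       queues = {}
       for project, branch in changes:
           key = branch if per_branch else None
           queues.setdefault(key, []).append((project, branch))
       return queues

   # 'component-c' uses a differently named release branch, so its change
   # can no longer gate together with the other two projects.
   changes = [('component-a', 'stable/1.0'),
              ('component-b', 'stable/1.0'),
              ('component-c', 'stable-1.0')]
   print(branch_queues(changes))
   # {'stable/1.0': [('component-a', 'stable/1.0'), ('component-b', 'stable/1.0')],
   #  'stable-1.0': [('component-c', 'stable-1.0')]}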

View File

@ -0,0 +1,5 @@
---
features:
- |
Projects can now configure change queues to operate per branch.
See :attr:`queue` for more information.

View File

@ -3136,7 +3136,7 @@ class RecordingExecutorServer(zuul.executor.server.ExecutorServer):
cid = None
changes[cid] = data
def release(self, regex=None):
def release(self, regex=None, change=None):
"""Release a held build.
:arg str regex: A regular expression which, if supplied, will
@ -3151,7 +3151,8 @@ class RecordingExecutorServer(zuul.executor.server.ExecutorServer):
self.log.debug("Releasing build %s (%s)" % (regex, len(builds)))
for build in builds:
if not regex or re.match(regex, build.name):
if ((not regex or re.match(regex, build.name)) and
(not change or build.change == change)):
self.log.debug("Releasing build %s" %
(build.parameters['zuul']['build']))
build.release()

View File

@ -0,0 +1,2 @@
- hosts: all
tasks: []

View File

@ -0,0 +1,35 @@
- pipeline:
name: gate
manager: dependent
success-message: Build succeeded (gate).
trigger:
gerrit:
- event: comment-added
approval:
- Approved: 1
success:
gerrit:
Verified: 2
submit: true
failure:
gerrit:
Verified: -2
start:
gerrit:
Verified: 0
precedence: high
- job:
name: base
parent: null
- job:
name: project-test
run: playbooks/project-test.yaml
- project:
name: org/project2
gate:
queue: integrated
jobs:
- project-test

View File

@ -0,0 +1,3 @@
- queue:
name: integrated
per-branch: true

View File

@ -0,0 +1,10 @@
# This queue will be ignored since it is already defined in common-config
- queue:
name: integrated
per-branch: false
- project:
gate:
queue: integrated
jobs:
- project-test

View File

@ -0,0 +1 @@
test

View File

@ -0,0 +1,5 @@
- project:
gate:
queue: integrated-untrusted
jobs:
- project-test

View File

@ -0,0 +1,3 @@
- queue:
name: integrated-untrusted
per-branch: true

View File

@ -0,0 +1,10 @@
- tenant:
name: tenant-one
source:
gerrit:
config-projects:
- common-config
untrusted-projects:
- org/project
- org/project2
- org/project3

View File

@ -26,7 +26,7 @@ class TenantParserTestCase(ZuulTestCase):
create_project_keys = True
CONFIG_SET = set(['pipeline', 'job', 'semaphore', 'project',
'project-template', 'nodeset', 'secret'])
'project-template', 'nodeset', 'secret', 'queue'])
UNTRUSTED_SET = CONFIG_SET - set(['pipeline'])
def setupAllProjectKeys(self, config: ConfigParser):

View File

@ -20,6 +20,7 @@ import os
import shutil
import socket
import time
from collections import namedtuple
from unittest import mock
from unittest import skip
from kazoo.exceptions import NoNodeError
@ -3159,8 +3160,17 @@ class TestScheduler(ZuulTestCase):
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
(trusted, project1) = tenant.getProject('org/project1')
(trusted, project2) = tenant.getProject('org/project2')
q1 = tenant.layout.pipelines['gate'].getQueue(project1)
q2 = tenant.layout.pipelines['gate'].getQueue(project2)
# Change queues are created lazily by the dependent pipeline manager,
# so trigger their creation here without having to actually enqueue
# a change.
gate = tenant.layout.pipelines['gate']
FakeChange = namedtuple('FakeChange', ['project', 'branch'])
fake_a = FakeChange(project1, 'master')
fake_b = FakeChange(project2, 'master')
gate.manager.getChangeQueue(fake_a, None)
gate.manager.getChangeQueue(fake_b, None)
q1 = gate.getQueue(project1, None)
q2 = gate.getQueue(project2, None)
self.assertEqual(q1.name, 'integrated')
self.assertEqual(q2.name, 'integrated')
@ -3170,8 +3180,18 @@ class TestScheduler(ZuulTestCase):
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
(trusted, project1) = tenant.getProject('org/project1')
(trusted, project2) = tenant.getProject('org/project2')
q1 = tenant.layout.pipelines['gate'].getQueue(project1)
q2 = tenant.layout.pipelines['gate'].getQueue(project2)
# Change queues are created lazily by the dependent pipeline manager,
# so trigger their creation here without having to actually enqueue
# a change.
gate = tenant.layout.pipelines['gate']
FakeChange = namedtuple('FakeChange', ['project', 'branch'])
fake_a = FakeChange(project1, 'master')
fake_b = FakeChange(project2, 'master')
gate.manager.getChangeQueue(fake_a, None)
gate.manager.getChangeQueue(fake_b, None)
q1 = gate.getQueue(project1, None)
q2 = gate.getQueue(project2, None)
self.assertEqual(q1.name, 'integrated')
self.assertEqual(q2.name, 'integrated')
@ -3181,8 +3201,17 @@ class TestScheduler(ZuulTestCase):
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
(trusted, project1) = tenant.getProject('org/project1')
(trusted, project2) = tenant.getProject('org/project2')
q1 = tenant.layout.pipelines['gate'].getQueue(project1)
q2 = tenant.layout.pipelines['gate'].getQueue(project2)
# Change queues are created lazily by the dependent pipeline manager,
# so trigger their creation here without having to actually enqueue
# a change.
gate = tenant.layout.pipelines['gate']
FakeChange = namedtuple('FakeChange', ['project', 'branch'])
fake_a = FakeChange(project1, 'master')
fake_b = FakeChange(project2, 'master')
gate.manager.getChangeQueue(fake_a, None)
gate.manager.getChangeQueue(fake_b, None)
q1 = gate.getQueue(project1, None)
q2 = gate.getQueue(project2, None)
self.assertEqual(q1.name, 'integrated')
self.assertEqual(q2.name, 'integrated')
@ -3192,8 +3221,17 @@ class TestScheduler(ZuulTestCase):
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
(trusted, project1) = tenant.getProject('org/project1')
(trusted, project2) = tenant.getProject('org/project2')
q1 = tenant.layout.pipelines['gate'].getQueue(project1)
q2 = tenant.layout.pipelines['gate'].getQueue(project2)
# Change queues are created lazily by the dependent pipeline manager,
# so trigger their creation here without having to actually enqueue
# a change.
gate = tenant.layout.pipelines['gate']
FakeChange = namedtuple('FakeChange', ['project', 'branch'])
fake_a = FakeChange(project1, 'master')
fake_b = FakeChange(project2, 'master')
gate.manager.getChangeQueue(fake_a, None)
gate.manager.getChangeQueue(fake_b, None)
q1 = gate.getQueue(project1, None)
q2 = gate.getQueue(project2, None)
self.assertEqual(q1.name, 'integrated')
self.assertEqual(q2.name, 'integrated')
@ -6250,6 +6288,166 @@ For CI problems and help debugging, contact ci@example.org"""
], ordered=False)
class TestChangeQueues(ZuulTestCase):
tenant_config_file = 'config/change-queues/main.yaml'
def _test_dependent_queues_per_branch(self, project,
queue_name='integrated',
queue_repo='common-config'):
self.create_branch(project, 'stable')
self.fake_gerrit.addEvent(
self.fake_gerrit.getFakeBranchCreatedEvent(project, 'stable'))
self.waitUntilSettled()
self.executor_server.hold_jobs_in_build = True
A = self.fake_gerrit.addFakeChange(project, 'master', 'A')
B = self.fake_gerrit.addFakeChange(project, 'stable', 'B')
A.addApproval('Code-Review', 2)
B.addApproval('Code-Review', 2)
self.executor_server.failJob('project-test', A)
# Let A go into the gate first, then B
self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
self.waitUntilSettled()
self.fake_gerrit.addEvent(B.addApproval('Approved', 1))
self.waitUntilSettled()
# There should be one project-test job at the head of each queue
self.assertBuilds([
dict(name='project-test', changes='1,1'),
dict(name='project-test', changes='2,1'),
])
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
_, p = tenant.getProject(project)
q1 = tenant.layout.pipelines['gate'].getQueue(p, 'master')
q2 = tenant.layout.pipelines['gate'].getQueue(p, 'stable')
self.assertEqual(q1.name, queue_name)
self.assertEqual(q2.name, queue_name)
# Both queues must contain one item
self.assertEqual(len(q1.queue), 1)
self.assertEqual(len(q2.queue), 1)
# Release the builds; the job for the change on master (A) fails
self.executor_server.hold_jobs_in_build = False
self.executor_server.release()
self.waitUntilSettled()
self.assertNotEqual(A.data['status'], 'MERGED')
self.assertEqual(B.data['status'], 'MERGED')
self.assertEqual(A.reported, 2)
self.assertEqual(B.reported, 2)
# Now reconfigure the queue to be non-branched and run the same test
# again.
conf = textwrap.dedent(
"""
- queue:
name: {}
per-branch: false
""").format(queue_name)
file_dict = {'zuul.d/queue.yaml': conf}
C = self.fake_gerrit.addFakeChange(queue_repo, 'master', 'A',
files=file_dict)
C.setMerged()
self.fake_gerrit.addEvent(C.getChangeMergedEvent())
self.waitUntilSettled()
self.executor_server.hold_jobs_in_build = True
D = self.fake_gerrit.addFakeChange(project, 'master', 'D')
E = self.fake_gerrit.addFakeChange(project, 'stable', 'E')
D.addApproval('Code-Review', 2)
E.addApproval('Code-Review', 2)
self.executor_server.failJob('project-test', D)
# Let D go into the gate first, then E
self.fake_gerrit.addEvent(D.addApproval('Approved', 1))
self.waitUntilSettled()
self.fake_gerrit.addEvent(E.addApproval('Approved', 1))
self.waitUntilSettled()
# There should be two project-test jobs in a shared queue
self.assertBuilds([
dict(name='project-test', changes='4,1'),
dict(name='project-test', changes='4,1 5,1'),
])
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
_, p = tenant.getProject(project)
q1 = tenant.layout.pipelines['gate'].getQueue(p, 'master')
q2 = tenant.layout.pipelines['gate'].getQueue(p, 'stable')
q3 = tenant.layout.pipelines['gate'].getQueue(p, None)
# There should be no branch specific queues anymore
self.assertEqual(q1, None)
self.assertEqual(q2, None)
self.assertEqual(q3.name, queue_name)
# The shared queue must contain both items
self.assertEqual(len(q3.queue), 2)
# Release D's project-test to make the history after the test deterministic
self.executor_server.release('project-test', change='4 1')
self.waitUntilSettled()
# Release the remaining builds; the job for the change on master (D) fails
self.executor_server.hold_jobs_in_build = False
self.executor_server.release()
self.waitUntilSettled()
self.assertNotEqual(D.data['status'], 'MERGED')
self.assertEqual(E.data['status'], 'MERGED')
self.assertEqual(D.reported, 2)
self.assertEqual(E.reported, 2)
self.assertHistory([
# Independent runs because of per branch queues
dict(name='project-test', result='FAILURE', changes='1,1'),
dict(name='project-test', result='SUCCESS', changes='2,1'),
# Same queue with gate reset because of 4,1
dict(name='project-test', result='FAILURE', changes='4,1'),
# Result can be anything depending on timing of the gate reset.
dict(name='project-test', changes='4,1 5,1'),
dict(name='project-test', result='SUCCESS', changes='5,1'),
], ordered=False)
def test_dependent_queues_per_branch(self):
"""
Test that change queues can be different for different branches.
In this case the project contains zuul config so the branches are
known upfront and the queues are pre-seeded.
"""
self._test_dependent_queues_per_branch('org/project')
def test_dependent_queues_per_branch_no_config(self):
"""
Test that change queues can be different for different branches.
In this case we create changes for two branches in a repo that
doesn't contain zuul config so the queues are not pre-seeded
in the gate pipeline.
"""
self._test_dependent_queues_per_branch('org/project2')
def test_dependent_queues_per_branch_untrusted(self):
"""
Test that change queues can be different for different branches.
In this case we create changes for two branches in an untrusted repo
that defines its own queue.
"""
self._test_dependent_queues_per_branch(
'org/project3', queue_name='integrated-untrusted',
queue_repo='org/project3')
class TestJobUpdateBrokenConfig(ZuulTestCase):
tenant_config_file = 'config/job-update-broken/main.yaml'

View File

@ -354,7 +354,7 @@ class ZuulMark(object):
class ZuulSafeLoader(yaml.SafeLoader):
zuul_node_types = frozenset(('job', 'nodeset', 'secret', 'pipeline',
'project', 'project-template',
'semaphore', 'pragma'))
'semaphore', 'queue', 'pragma'))
def __init__(self, stream, context):
wrapped_stream = io.StringIO(stream)
@ -1365,6 +1365,29 @@ class SemaphoreParser(object):
return semaphore
class QueueParser:
def __init__(self, pcontext):
self.log = logging.getLogger("zuul.QueueParser")
self.pcontext = pcontext
self.schema = self.getSchema()
def getSchema(self):
queue = {vs.Required('name'): str,
'per-branch': bool,
'_source_context': model.SourceContext,
'_start_mark': ZuulMark,
}
return vs.Schema(queue)
def fromYaml(self, conf):
self.schema(conf)
queue = model.Queue(conf['name'], conf.get('per-branch', False))
queue.source_context = conf.get('_source_context')
queue.start_mark = conf.get('_start_mark')
queue.freeze()
return queue
class AuthorizationRuleParser(object):
def __init__(self):
self.log = logging.getLogger("zuul.AuthorizationRuleParser")
@ -1413,6 +1436,7 @@ class ParseContext(object):
self.secret_parser = SecretParser(self)
self.job_parser = JobParser(self)
self.semaphore_parser = SemaphoreParser(self)
self.queue_parser = QueueParser(self)
self.project_template_parser = ProjectTemplateParser(self)
self.project_parser = ProjectParser(self)
@ -1452,7 +1476,7 @@ class TenantParser(object):
self.keystorage = keystorage
classes = vs.Any('pipeline', 'job', 'semaphore', 'project',
'project-template', 'nodeset', 'secret')
'project-template', 'nodeset', 'secret', 'queue')
project_dict = {str: {
'include': to_list(classes),
@ -1746,7 +1770,8 @@ class TenantParser(object):
untrusted_projects = []
default_include = frozenset(['pipeline', 'job', 'semaphore', 'project',
'secret', 'project-template', 'nodeset'])
'secret', 'project-template', 'nodeset',
'queue'])
for source_name, conf_source in conf_tenant.get('source', {}).items():
source = self.connections.getSource(source_name)
@ -1962,6 +1987,15 @@ class TenantParser(object):
parsed_config.semaphores.append(
pcontext.semaphore_parser.fromYaml(config_semaphore))
for config_queue in unparsed_config.queues:
classes = self._getLoadClasses(tenant, config_queue)
if 'queue' not in classes:
continue
with configuration_exceptions('queue',
config_queue, loading_errors):
parsed_config.queues.append(
pcontext.queue_parser.fromYaml(config_queue))
for config_template in unparsed_config.project_templates:
classes = self._getLoadClasses(tenant, config_template)
if 'project-template' not in classes:
@ -2019,6 +2053,9 @@ class TenantParser(object):
for job in parsed_config.jobs:
_cache('jobs', job)
for queue in parsed_config.queues:
_cache('queues', queue)
for semaphore in parsed_config.semaphores:
_cache('semaphores', semaphore)
@ -2079,6 +2116,10 @@ class TenantParser(object):
'semaphore', semaphore, layout.loading_errors):
semaphore_layout.addSemaphore(semaphore)
for queue in parsed_config.queues:
with reference_exceptions('queue', queue, layout.loading_errors):
layout.addQueue(queue)
for template in parsed_config.project_templates:
with reference_exceptions(
'project-template', template, layout.loading_errors):
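
For reference, a minimal sketch of exercising the new ``QueueParser`` above
directly. The bare stand-in for ``ParseContext`` is hypothetical; in Zuul the
parser is created and driven by the ``TenantParser``, and this assumes a
checkout containing this change.

.. code-block:: python

   from zuul import model
   from zuul.configloader import QueueParser

   class _StubParseContext:
       # Hypothetical stand-in; QueueParser only stores the context here.
       pass

   parser = QueueParser(_StubParseContext())
   queue = parser.fromYaml({'name': 'integrated', 'per-branch': True})
   assert isinstance(queue, model.Queue)
   assert queue.per_branch is True
   # Omitting 'per-branch' keeps the documented default of False.
   assert parser.fromYaml({'name': 'other'}).per_branch is False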

View File

@ -9,11 +9,10 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import textwrap
import urllib
from abc import ABCMeta
from abc import ABCMeta, abstractmethod
from zuul import exceptions
from zuul import model
@ -252,6 +251,10 @@ class PipelineManager(metaclass=ABCMeta):
if item.change.equals(change):
self.removeItem(item)
@abstractmethod
def getChangeQueue(self, change, event, existing=None):
pass
def reEnqueueItem(self, item, last_head, old_item_ahead, item_ahead_valid):
log = get_annotated_logger(self.log, item.event)
with self.getChangeQueue(item.change, item.event,

View File

@ -65,15 +65,17 @@ class DependentPipelineManager(SharedQueuePipelineManager):
# for project in change_queue, project.source get changes, then dedup.
sources = set()
for project in change_queue.projects:
for project, _ in change_queue.project_branches:
sources.add(project.source)
seen = set(change.needed_by_changes)
needed_by_changes = change.needed_by_changes[:]
for source in sources:
log.debug(" Checking source: %s", source)
projects = [project_branch[0]
for project_branch in change_queue.project_branches]
for c in source.getChangesDependingOn(change,
change_queue.projects,
projects,
self.pipeline.tenant):
if c not in seen:
seen.add(c)

View File

@ -31,7 +31,7 @@ class IndependentPipelineManager(PipelineManager):
if existing:
return DynamicChangeQueueContextManager(existing)
change_queue = model.ChangeQueue(self.pipeline)
change_queue.addProject(change.project)
change_queue.addProject(change.project, None)
self.pipeline.addQueue(change_queue)
log.debug("Dynamically created queue %s", change_queue)
return DynamicChangeQueueContextManager(change_queue)

View File

@ -18,6 +18,37 @@ from zuul.manager import PipelineManager, StaticChangeQueueContextManager
from zuul.manager import DynamicChangeQueueContextManager
class ChangeQueueManager:
def __init__(self, pipeline_manager, name=None, per_branch=False):
self.log = pipeline_manager.log
self.pipeline_manager = pipeline_manager
self.name = name
self.per_branch = per_branch
self.projects = []
self.created_for_branches = {}
def addProject(self, project):
self.projects.append(project)
def getOrCreateQueue(self, project, branch):
change_queue = self.created_for_branches.get(branch)
if not change_queue:
p = self.pipeline_manager.pipeline
change_queue = self.pipeline_manager.constructChangeQueue(
self.name)
p.addQueue(change_queue)
self.created_for_branches[branch] = change_queue
if not change_queue.matches(project, branch):
change_queue.addProject(project, branch)
self.log.debug("Added project %s to queue: %s" %
(project, change_queue))
return change_queue
class SharedQueuePipelineManager(PipelineManager, metaclass=ABCMeta):
"""Intermediate class that adds the shared-queue behavior.
@ -28,9 +59,13 @@ class SharedQueuePipelineManager(PipelineManager, metaclass=ABCMeta):
changes_merge = False
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.change_queue_managers = []
def buildChangeQueues(self, layout):
self.log.debug("Building shared change queues")
change_queues = {}
change_queues_managers = {}
tenant = self.pipeline.tenant
layout_project_configs = layout.project_configs
@ -49,21 +84,27 @@ class SharedQueuePipelineManager(PipelineManager, metaclass=ABCMeta):
break
if not project_in_pipeline:
continue
if queue_name and queue_name in change_queues:
change_queue = change_queues[queue_name]
# Check if the queue is global or per branch
queue = layout.queues.get(queue_name)
per_branch = queue and queue.per_branch
if queue_name and queue_name in change_queues_managers:
change_queue_manager = change_queues_managers[queue_name]
else:
change_queue = self.constructChangeQueue(queue_name)
change_queue_manager = ChangeQueueManager(
self, name=queue_name, per_branch=per_branch)
if queue_name:
# If this is a named queue, keep track of it in
# case it is referenced again. Otherwise, it will
# have a name automatically generated from its
# constituent projects.
change_queues[queue_name] = change_queue
self.pipeline.addQueue(change_queue)
self.log.debug("Created queue: %s" % change_queue)
change_queue.addProject(project)
self.log.debug("Added project %s to queue: %s" %
(project, change_queue))
change_queues_managers[queue_name] = change_queue_manager
self.change_queue_managers.append(change_queue_manager)
self.log.debug("Created queue: %s" % change_queue_manager)
change_queue_manager.addProject(project)
self.log.debug("Added project %s to queue managers: %s" %
(project, change_queue_manager))
def getChangeQueue(self, change, event, existing=None):
log = get_annotated_logger(self.log, event)
@ -71,14 +112,35 @@ class SharedQueuePipelineManager(PipelineManager, metaclass=ABCMeta):
# Ignore the existing queue, since we can always get the correct queue
# from the pipeline. This avoids enqueuing changes in a wrong queue
# e.g. during re-configuration.
queue = self.pipeline.getQueue(change.project)
queue = self.pipeline.getQueue(change.project, change.branch)
if queue:
return StaticChangeQueueContextManager(queue)
else:
# Change queues in the dependent pipeline manager are created
# lazily, so first check the managers for the project.
matching_managers = [t for t in self.change_queue_managers
if change.project in t.projects]
if matching_managers:
manager = matching_managers[0]
branch = None
if manager.per_branch:
# The change queue does not exist yet for this branch
branch = change.branch
# We have a queue manager but no queue yet, so create it
return StaticChangeQueueContextManager(
manager.getOrCreateQueue(change.project, branch)
)
# No specific per-branch queue matched so look again with no branch
queue = self.pipeline.getQueue(change.project, None)
if queue:
return StaticChangeQueueContextManager(queue)
# There is no existing queue for this change. Create a
# dynamic one for this one change's use
change_queue = model.ChangeQueue(self.pipeline, dynamic=True)
change_queue.addProject(change.project)
change_queue.addProject(change.project, None)
self.pipeline.addQueue(change_queue)
log.debug("Dynamically created queue %s", change_queue)
return DynamicChangeQueueContextManager(change_queue)
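
The lazy per-branch creation above can be condensed into a self-contained toy.
The classes below are hypothetical and only mirror the behavior of
``ChangeQueueManager.getOrCreateQueue``; they are not Zuul's implementation.

.. code-block:: python

   class ToyQueue:
       def __init__(self, name):
           self.name = name
           self.project_branches = []

   class ToyQueueManager:
       def __init__(self, name, per_branch):
           self.name = name
           self.per_branch = per_branch
           self.created_for_branches = {}

       def get_or_create(self, project, branch):
           # Use a branchless key when the queue is not per-branch.
           key = branch if self.per_branch else None
           queue = self.created_for_branches.get(key)
           if queue is None:
               queue = self.created_for_branches[key] = ToyQueue(self.name)
           if (project, key) not in queue.project_branches:
               queue.project_branches.append((project, key))
           return queue

   m = ToyQueueManager('integrated', per_branch=True)
   # One queue per branch, shared by all projects using that branch.
   assert m.get_or_create('org/project', 'master') is m.get_or_create(
       'org/project2', 'master')
   assert m.get_or_create('org/project', 'master') is not m.get_or_create(
       'org/project', 'stable')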

View File

@ -44,7 +44,7 @@ class SupercedentPipelineManager(PipelineManager):
window_floor=1,
window_increase_type='none',
window_decrease_type='none')
change_queue.addProject(change.project)
change_queue.addProject(change.project, None)
self.pipeline.addQueue(change_queue)
log.debug("Dynamically created queue %s", change_queue)
return DynamicChangeQueueContextManager(change_queue)

View File

@ -272,6 +272,7 @@ class Pipeline(object):
self.queues = []
self.relative_priority_queues = {}
self.precedence = PRECEDENCE_NORMAL
self.supercedes = []
self.triggers = []
self.enqueue_actions = []
self.start_actions = []
@ -329,9 +330,10 @@ class Pipeline(object):
def addQueue(self, queue):
self.queues.append(queue)
def getQueue(self, project):
def getQueue(self, project, branch):
# Queues might be branch-specific, so match on the branch as well
for queue in self.queues:
if project in queue.projects:
if queue.matches(project, branch):
return queue
return None
@ -416,7 +418,7 @@ class ChangeQueue(object):
self.name = name
else:
self.name = ''
self.projects = []
self.project_branches = []
self._jobs = set()
self.queue = []
self.window = window
@ -433,13 +435,24 @@ class ChangeQueue(object):
def getJobs(self):
return self._jobs
def addProject(self, project):
if project not in self.projects:
self.projects.append(project)
def addProject(self, project, branch):
"""
Adds a project-branch combination to the queue.
The queue will match exactly this combination. If the caller doesn't
care about branches it can supply None (but must then also supply
None when matching).
"""
project_branch = (project, branch)
if project_branch not in self.project_branches:
self.project_branches.append(project_branch)
if not self.name:
self.name = project.name
def matches(self, project, branch):
return (project, branch) in self.project_branches
def enqueueChange(self, change, event):
item = QueueItem(self, change, event)
self.enqueueItem(item)
@ -3682,6 +3695,7 @@ class UnparsedConfig(object):
self.nodesets = []
self.secrets = []
self.semaphores = []
self.queues = []
# The list of files/dirs which this represents.
self.files_examined = set()
@ -3695,7 +3709,8 @@ class UnparsedConfig(object):
# project-branch-path so that we can share them across objects
source_contexts = {}
for attr in ['pragmas', 'pipelines', 'jobs', 'project_templates',
'projects', 'nodesets', 'secrets', 'semaphores']:
'projects', 'nodesets', 'secrets', 'semaphores',
'queues']:
# Make a deep copy of each of our attributes
old_objlist = getattr(self, attr)
new_objlist = copy.deepcopy(old_objlist)
@ -3725,6 +3740,7 @@ class UnparsedConfig(object):
self.nodesets.extend(conf.nodesets)
self.secrets.extend(conf.secrets)
self.semaphores.extend(conf.semaphores)
self.queues.extend(conf.queues)
return
if not isinstance(conf, list):
@ -3750,6 +3766,8 @@ class UnparsedConfig(object):
self.secrets.append(value)
elif key == 'semaphore':
self.semaphores.append(value)
elif key == 'queue':
self.queues.append(value)
elif key == 'pragma':
self.pragmas.append(value)
else:
@ -3769,6 +3787,7 @@ class ParsedConfig(object):
self.nodesets = []
self.secrets = []
self.semaphores = []
self.queues = []
def copy(self):
r = ParsedConfig()
@ -3781,6 +3800,7 @@ class ParsedConfig(object):
r.nodesets = self.nodesets[:]
r.secrets = self.secrets[:]
r.semaphores = self.semaphores[:]
r.queues = self.queues[:]
return r
def extend(self, conf):
@ -3793,6 +3813,7 @@ class ParsedConfig(object):
self.nodesets.extend(conf.nodesets)
self.secrets.extend(conf.secrets)
self.semaphores.extend(conf.semaphores)
self.queues.extend(conf.queues)
for regex, projects in conf.projects_by_regex.items():
self.projects_by_regex.setdefault(regex, []).extend(projects)
return
@ -3826,6 +3847,7 @@ class Layout(object):
self.nodesets = {}
self.secrets = {}
self.semaphores = {}
self.queues = {}
self.loading_errors = LoadingErrors()
def getJob(self, name):
@ -3934,6 +3956,13 @@ class Layout(object):
return
self.semaphores[semaphore.name] = semaphore
def addQueue(self, queue):
# Change queues must be unique and cannot be overridden.
if queue.name in self.queues:
raise Exception('Queue %s is already defined' % queue.name)
self.queues[queue.name] = queue
def addPipeline(self, pipeline):
if pipeline.tenant is not self.tenant:
raise Exception("Pipeline created for tenant %s "
@ -4293,6 +4322,22 @@ class Semaphore(ConfigObject):
self.max == other.max)
class Queue(ConfigObject):
def __init__(self, name, per_branch=False):
super().__init__()
self.name = name
self.per_branch = per_branch
def __ne__(self, other):
return not self.__eq__(other)
def __eq__(self, other):
if not isinstance(other, Queue):
return False
return (self.name == other.name and
self.per_branch == other.per_branch)
class SemaphoreHandler(object):
log = logging.getLogger("zuul.SemaphoreHandler")
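
A short sketch against the model additions above, assuming a checkout that
contains this change. Building a real ``ChangeQueue`` requires a full
``Pipeline``, so its exact-tuple matching is restated here with a plain list.

.. code-block:: python

   from zuul.model import Queue

   # Queue equality takes the per-branch flag into account.
   assert Queue('integrated') == Queue('integrated', per_branch=False)
   assert Queue('integrated', per_branch=True) != Queue('integrated')

   # ChangeQueue.matches() is an exact (project, branch) match: a project
   # added with branch=None must also be looked up with branch=None.
   project_branches = [('org/project', None)]
   assert ('org/project', None) in project_branches
   assert ('org/project', 'master') not in project_branches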

View File

@ -1029,7 +1029,8 @@ class Scheduler(threading.Thread):
last_head = None
for shared_queue in old_pipeline.queues:
# Attempt to keep window sizes from shrinking where possible
new_queue = new_pipeline.getQueue(shared_queue.projects[0])
project, branch = shared_queue.project_branches[0]
new_queue = new_pipeline.getQueue(project, branch)
if new_queue and shared_queue.window and (not static_window):
new_queue.window = max(shared_queue.window,
new_queue.window_floor)