Merge "Support per branch change queues"
This commit is contained in:
commit
27084136ff
|
@ -92,6 +92,7 @@ the YAML files:
|
|||
pipeline_def
|
||||
job_def
|
||||
project_def
|
||||
queue_def
|
||||
secret_def
|
||||
nodeset_def
|
||||
semaphore_def
|
||||
|
|
|
@ -176,6 +176,9 @@ pipeline.
|
|||
changes which break the others. This is a free-form string;
|
||||
just set the same value for each group of projects.
|
||||
|
||||
The name can refer to the name of a :attr:`queue` which allows
|
||||
further configuration of the queue.
|
||||
|
||||
Each pipeline for a project can only belong to one queue,
|
||||
therefore Zuul will use the first value that it encounters.
|
||||
It need not appear in the first instance of a :attr:`project`
|
||||
|
|
|
@ -0,0 +1,44 @@
|
|||
.. _queue:
|
||||
|
||||
Queue
|
||||
=====
|
||||
|
||||
Projects that interact with each other should share a ``queue``.
|
||||
This is especially used in a :value:`dependent <pipeline.manager.dependent>`
|
||||
pipeline. The :attr:`project.<pipeline>.queue` can optionally refer
|
||||
to a specific :attr:`queue` object that can further configure the
|
||||
behavior of the queue.
|
||||
|
||||
Here is an example ``queue`` configuration.
|
||||
|
||||
.. code-block:: yaml
|
||||
|
||||
- queue:
|
||||
name: integrated
|
||||
per-branch: false
|
||||
|
||||
|
||||
.. attr:: queue
|
||||
|
||||
The attributes available on a queue are as follows (all are
|
||||
optional unless otherwise specified):
|
||||
|
||||
.. attr:: name
|
||||
:required:
|
||||
|
||||
This is used later in the project definition to refer to this queue.
|
||||
|
||||
.. attr:: per-branch
|
||||
:default: false
|
||||
|
||||
Queues by default define a single queue for all projects and
|
||||
branches that use it. This is especially important if projects
|
||||
want to do upgrade tests between different branches in
|
||||
the :term:`gate`. If a set of projects doesn't have this use case
|
||||
it can configure the queue to create a shared queue per branch for
|
||||
all projects. This can be useful for large projects to improve the
|
||||
throughput of a gate pipeline as this results in shorter queues
|
||||
and thus less impact when a job fails in the gate. Note that this
|
||||
means that all projects that should be gated must have aligned branch
|
||||
names when using per branch queues. Otherwise changes that belong
|
||||
together end up in different queues.
|
|
@ -0,0 +1,5 @@
|
|||
---
|
||||
features:
|
||||
- |
|
||||
Projects can now configure change queues to queue per branch.
|
||||
See :attr:`queue` for more information.
|
|
@ -3178,7 +3178,7 @@ class RecordingExecutorServer(zuul.executor.server.ExecutorServer):
|
|||
cid = None
|
||||
changes[cid] = data
|
||||
|
||||
def release(self, regex=None):
|
||||
def release(self, regex=None, change=None):
|
||||
"""Release a held build.
|
||||
|
||||
:arg str regex: A regular expression which, if supplied, will
|
||||
|
@ -3193,7 +3193,8 @@ class RecordingExecutorServer(zuul.executor.server.ExecutorServer):
|
|||
|
||||
self.log.debug("Releasing build %s (%s)" % (regex, len(builds)))
|
||||
for build in builds:
|
||||
if not regex or re.match(regex, build.name):
|
||||
if (not regex or re.match(regex, build.name) and
|
||||
not change or build.change == change):
|
||||
self.log.debug("Releasing build %s" %
|
||||
(build.parameters['zuul']['build']))
|
||||
build.release()
|
||||
|
|
2
tests/fixtures/config/change-queues/git/common-config/playbooks/project-test.yaml
vendored
Normal file
2
tests/fixtures/config/change-queues/git/common-config/playbooks/project-test.yaml
vendored
Normal file
|
@ -0,0 +1,2 @@
|
|||
- hosts: all
|
||||
tasks: []
|
|
@ -0,0 +1,35 @@
|
|||
- pipeline:
|
||||
name: gate
|
||||
manager: dependent
|
||||
success-message: Build succeeded (gate).
|
||||
trigger:
|
||||
gerrit:
|
||||
- event: comment-added
|
||||
approval:
|
||||
- Approved: 1
|
||||
success:
|
||||
gerrit:
|
||||
Verified: 2
|
||||
submit: true
|
||||
failure:
|
||||
gerrit:
|
||||
Verified: -2
|
||||
start:
|
||||
gerrit:
|
||||
Verified: 0
|
||||
precedence: high
|
||||
|
||||
- job:
|
||||
name: base
|
||||
parent: null
|
||||
|
||||
- job:
|
||||
name: project-test
|
||||
run: playbooks/project-test.yaml
|
||||
|
||||
- project:
|
||||
name: org/project2
|
||||
gate:
|
||||
queue: integrated
|
||||
jobs:
|
||||
- project-test
|
|
@ -0,0 +1,3 @@
|
|||
- queue:
|
||||
name: integrated
|
||||
per-branch: true
|
|
@ -0,0 +1,10 @@
|
|||
# This queue will be ignored since it is already defined in common-config
|
||||
- queue:
|
||||
name: integrated
|
||||
per-branch: false
|
||||
|
||||
- project:
|
||||
gate:
|
||||
queue: integrated
|
||||
jobs:
|
||||
- project-test
|
|
@ -0,0 +1 @@
|
|||
test
|
|
@ -0,0 +1,5 @@
|
|||
- project:
|
||||
gate:
|
||||
queue: integrated-untrusted
|
||||
jobs:
|
||||
- project-test
|
|
@ -0,0 +1,3 @@
|
|||
- queue:
|
||||
name: integrated-untrusted
|
||||
per-branch: true
|
|
@ -0,0 +1,10 @@
|
|||
- tenant:
|
||||
name: tenant-one
|
||||
source:
|
||||
gerrit:
|
||||
config-projects:
|
||||
- common-config
|
||||
untrusted-projects:
|
||||
- org/project
|
||||
- org/project2
|
||||
- org/project3
|
|
@ -26,7 +26,7 @@ class TenantParserTestCase(ZuulTestCase):
|
|||
create_project_keys = True
|
||||
|
||||
CONFIG_SET = set(['pipeline', 'job', 'semaphore', 'project',
|
||||
'project-template', 'nodeset', 'secret'])
|
||||
'project-template', 'nodeset', 'secret', 'queue'])
|
||||
UNTRUSTED_SET = CONFIG_SET - set(['pipeline'])
|
||||
|
||||
def setupAllProjectKeys(self, config: ConfigParser):
|
||||
|
|
|
@ -20,6 +20,7 @@ import os
|
|||
import shutil
|
||||
import socket
|
||||
import time
|
||||
from collections import namedtuple
|
||||
from unittest import mock
|
||||
from unittest import skip
|
||||
from kazoo.exceptions import NoNodeError
|
||||
|
@ -3167,8 +3168,17 @@ class TestScheduler(ZuulTestCase):
|
|||
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
|
||||
(trusted, project1) = tenant.getProject('org/project1')
|
||||
(trusted, project2) = tenant.getProject('org/project2')
|
||||
q1 = tenant.layout.pipelines['gate'].getQueue(project1)
|
||||
q2 = tenant.layout.pipelines['gate'].getQueue(project2)
|
||||
# Change queues are created lazy by the dependent pipeline manager
|
||||
# so retrieve the queue first without having to really enqueue a
|
||||
# change first.
|
||||
gate = tenant.layout.pipelines['gate']
|
||||
FakeChange = namedtuple('FakeChange', ['project', 'branch'])
|
||||
fake_a = FakeChange(project1, 'master')
|
||||
fake_b = FakeChange(project2, 'master')
|
||||
gate.manager.getChangeQueue(fake_a, None)
|
||||
gate.manager.getChangeQueue(fake_b, None)
|
||||
q1 = gate.getQueue(project1, None)
|
||||
q2 = gate.getQueue(project2, None)
|
||||
self.assertEqual(q1.name, 'integrated')
|
||||
self.assertEqual(q2.name, 'integrated')
|
||||
|
||||
|
@ -3178,8 +3188,18 @@ class TestScheduler(ZuulTestCase):
|
|||
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
|
||||
(trusted, project1) = tenant.getProject('org/project1')
|
||||
(trusted, project2) = tenant.getProject('org/project2')
|
||||
q1 = tenant.layout.pipelines['gate'].getQueue(project1)
|
||||
q2 = tenant.layout.pipelines['gate'].getQueue(project2)
|
||||
|
||||
# Change queues are created lazy by the dependent pipeline manager
|
||||
# so retrieve the queue first without having to really enqueue a
|
||||
# change first.
|
||||
gate = tenant.layout.pipelines['gate']
|
||||
FakeChange = namedtuple('FakeChange', ['project', 'branch'])
|
||||
fake_a = FakeChange(project1, 'master')
|
||||
fake_b = FakeChange(project2, 'master')
|
||||
gate.manager.getChangeQueue(fake_a, None)
|
||||
gate.manager.getChangeQueue(fake_b, None)
|
||||
q1 = gate.getQueue(project1, None)
|
||||
q2 = gate.getQueue(project2, None)
|
||||
self.assertEqual(q1.name, 'integrated')
|
||||
self.assertEqual(q2.name, 'integrated')
|
||||
|
||||
|
@ -3189,8 +3209,17 @@ class TestScheduler(ZuulTestCase):
|
|||
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
|
||||
(trusted, project1) = tenant.getProject('org/project1')
|
||||
(trusted, project2) = tenant.getProject('org/project2')
|
||||
q1 = tenant.layout.pipelines['gate'].getQueue(project1)
|
||||
q2 = tenant.layout.pipelines['gate'].getQueue(project2)
|
||||
# Change queues are created lazy by the dependent pipeline manager
|
||||
# so retrieve the queue first without having to really enqueue a
|
||||
# change first.
|
||||
gate = tenant.layout.pipelines['gate']
|
||||
FakeChange = namedtuple('FakeChange', ['project', 'branch'])
|
||||
fake_a = FakeChange(project1, 'master')
|
||||
fake_b = FakeChange(project2, 'master')
|
||||
gate.manager.getChangeQueue(fake_a, None)
|
||||
gate.manager.getChangeQueue(fake_b, None)
|
||||
q1 = gate.getQueue(project1, None)
|
||||
q2 = gate.getQueue(project2, None)
|
||||
self.assertEqual(q1.name, 'integrated')
|
||||
self.assertEqual(q2.name, 'integrated')
|
||||
|
||||
|
@ -3200,8 +3229,17 @@ class TestScheduler(ZuulTestCase):
|
|||
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
|
||||
(trusted, project1) = tenant.getProject('org/project1')
|
||||
(trusted, project2) = tenant.getProject('org/project2')
|
||||
q1 = tenant.layout.pipelines['gate'].getQueue(project1)
|
||||
q2 = tenant.layout.pipelines['gate'].getQueue(project2)
|
||||
# Change queues are created lazy by the dependent pipeline manager
|
||||
# so retrieve the queue first without having to really enqueue a
|
||||
# change first.
|
||||
gate = tenant.layout.pipelines['gate']
|
||||
FakeChange = namedtuple('FakeChange', ['project', 'branch'])
|
||||
fake_a = FakeChange(project1, 'master')
|
||||
fake_b = FakeChange(project2, 'master')
|
||||
gate.manager.getChangeQueue(fake_a, None)
|
||||
gate.manager.getChangeQueue(fake_b, None)
|
||||
q1 = gate.getQueue(project1, None)
|
||||
q2 = gate.getQueue(project2, None)
|
||||
self.assertEqual(q1.name, 'integrated')
|
||||
self.assertEqual(q2.name, 'integrated')
|
||||
|
||||
|
@ -6264,6 +6302,166 @@ For CI problems and help debugging, contact ci@example.org"""
|
|||
], ordered=False)
|
||||
|
||||
|
||||
class TestChangeQueues(ZuulTestCase):
|
||||
tenant_config_file = 'config/change-queues/main.yaml'
|
||||
|
||||
def _test_dependent_queues_per_branch(self, project,
|
||||
queue_name='integrated',
|
||||
queue_repo='common-config'):
|
||||
self.create_branch(project, 'stable')
|
||||
self.fake_gerrit.addEvent(
|
||||
self.fake_gerrit.getFakeBranchCreatedEvent(project, 'stable'))
|
||||
self.waitUntilSettled()
|
||||
|
||||
self.executor_server.hold_jobs_in_build = True
|
||||
A = self.fake_gerrit.addFakeChange(project, 'master', 'A')
|
||||
B = self.fake_gerrit.addFakeChange(project, 'stable', 'B')
|
||||
A.addApproval('Code-Review', 2)
|
||||
B.addApproval('Code-Review', 2)
|
||||
|
||||
self.executor_server.failJob('project-test', A)
|
||||
|
||||
# Let first go A into gate then B
|
||||
self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
|
||||
self.waitUntilSettled()
|
||||
|
||||
self.fake_gerrit.addEvent(B.addApproval('Approved', 1))
|
||||
self.waitUntilSettled()
|
||||
|
||||
# There should be one project-test job at the head of each queue
|
||||
self.assertBuilds([
|
||||
dict(name='project-test', changes='1,1'),
|
||||
dict(name='project-test', changes='2,1'),
|
||||
])
|
||||
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
|
||||
_, p = tenant.getProject(project)
|
||||
q1 = tenant.layout.pipelines['gate'].getQueue(p, 'master')
|
||||
q2 = tenant.layout.pipelines['gate'].getQueue(p, 'stable')
|
||||
self.assertEqual(q1.name, queue_name)
|
||||
self.assertEqual(q2.name, queue_name)
|
||||
|
||||
# Both queues must contain one item
|
||||
self.assertEqual(len(q1.queue), 1)
|
||||
self.assertEqual(len(q2.queue), 1)
|
||||
|
||||
# Fail job on the change on master
|
||||
self.executor_server.hold_jobs_in_build = False
|
||||
self.executor_server.release()
|
||||
self.waitUntilSettled()
|
||||
|
||||
self.assertNotEqual(A.data['status'], 'MERGED')
|
||||
self.assertEqual(B.data['status'], 'MERGED')
|
||||
self.assertEqual(A.reported, 2)
|
||||
self.assertEqual(B.reported, 2)
|
||||
|
||||
# Now reconfigure the queue to be non-branched and run the same test
|
||||
# again.
|
||||
conf = textwrap.dedent(
|
||||
"""
|
||||
- queue:
|
||||
name: {}
|
||||
per-branch: false
|
||||
""").format(queue_name)
|
||||
|
||||
file_dict = {'zuul.d/queue.yaml': conf}
|
||||
C = self.fake_gerrit.addFakeChange(queue_repo, 'master', 'A',
|
||||
files=file_dict)
|
||||
C.setMerged()
|
||||
self.fake_gerrit.addEvent(C.getChangeMergedEvent())
|
||||
self.waitUntilSettled()
|
||||
|
||||
self.executor_server.hold_jobs_in_build = True
|
||||
D = self.fake_gerrit.addFakeChange(project, 'master', 'D')
|
||||
E = self.fake_gerrit.addFakeChange(project, 'stable', 'E')
|
||||
D.addApproval('Code-Review', 2)
|
||||
E.addApproval('Code-Review', 2)
|
||||
|
||||
self.executor_server.failJob('project-test', D)
|
||||
|
||||
# Let first go A into gate then B
|
||||
self.fake_gerrit.addEvent(D.addApproval('Approved', 1))
|
||||
self.waitUntilSettled()
|
||||
|
||||
self.fake_gerrit.addEvent(E.addApproval('Approved', 1))
|
||||
self.waitUntilSettled()
|
||||
|
||||
# There should be two project-test jobs in a shared queue
|
||||
self.assertBuilds([
|
||||
dict(name='project-test', changes='4,1'),
|
||||
dict(name='project-test', changes='4,1 5,1'),
|
||||
])
|
||||
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
|
||||
_, p = tenant.getProject(project)
|
||||
q1 = tenant.layout.pipelines['gate'].getQueue(p, 'master')
|
||||
q2 = tenant.layout.pipelines['gate'].getQueue(p, 'stable')
|
||||
q3 = tenant.layout.pipelines['gate'].getQueue(p, None)
|
||||
|
||||
# There should be no branch specific queues anymore
|
||||
self.assertEqual(q1, None)
|
||||
self.assertEqual(q2, None)
|
||||
self.assertEqual(q3.name, queue_name)
|
||||
|
||||
# Both queues must contain one item
|
||||
self.assertEqual(len(q3.queue), 2)
|
||||
|
||||
# Release project-test of D to make history after test deterministic
|
||||
self.executor_server.release('project-test', change='4 1')
|
||||
self.waitUntilSettled()
|
||||
|
||||
# Fail job on the change on master
|
||||
self.executor_server.hold_jobs_in_build = False
|
||||
self.executor_server.release()
|
||||
self.waitUntilSettled()
|
||||
|
||||
self.assertNotEqual(D.data['status'], 'MERGED')
|
||||
self.assertEqual(E.data['status'], 'MERGED')
|
||||
self.assertEqual(D.reported, 2)
|
||||
self.assertEqual(E.reported, 2)
|
||||
|
||||
self.assertHistory([
|
||||
# Independent runs because of per branch queues
|
||||
dict(name='project-test', result='FAILURE', changes='1,1'),
|
||||
dict(name='project-test', result='SUCCESS', changes='2,1'),
|
||||
|
||||
# Same queue with gate reset because of 4,1
|
||||
dict(name='project-test', result='FAILURE', changes='4,1'),
|
||||
|
||||
# Result can be anything depending on timing of the gate reset.
|
||||
dict(name='project-test', changes='4,1 5,1'),
|
||||
dict(name='project-test', result='SUCCESS', changes='5,1'),
|
||||
], ordered=False)
|
||||
|
||||
def test_dependent_queues_per_branch(self):
|
||||
"""
|
||||
Test that change queues can be different for different branches.
|
||||
|
||||
In this case the project contains zuul config so the branches are
|
||||
known upfront and the queues are pre-seeded.
|
||||
"""
|
||||
self._test_dependent_queues_per_branch('org/project')
|
||||
|
||||
def test_dependent_queues_per_branch_no_config(self):
|
||||
"""
|
||||
Test that change queues can be different for different branches.
|
||||
|
||||
In this case we create changes for two branches in a repo that
|
||||
doesn't contain zuul config so the queues are not pre-seeded
|
||||
in the gate pipeline.
|
||||
"""
|
||||
self._test_dependent_queues_per_branch('org/project2')
|
||||
|
||||
def test_dependent_queues_per_branch_untrusted(self):
|
||||
"""
|
||||
Test that change queues can be different for different branches.
|
||||
|
||||
In this case we create changes for two branches in an untrusted repo
|
||||
that defines its own queue.
|
||||
"""
|
||||
self._test_dependent_queues_per_branch(
|
||||
'org/project3', queue_name='integrated-untrusted',
|
||||
queue_repo='org/project3')
|
||||
|
||||
|
||||
class TestJobUpdateBrokenConfig(ZuulTestCase):
|
||||
tenant_config_file = 'config/job-update-broken/main.yaml'
|
||||
|
||||
|
|
|
@ -355,7 +355,7 @@ class ZuulMark(object):
|
|||
class ZuulSafeLoader(yaml.SafeLoader):
|
||||
zuul_node_types = frozenset(('job', 'nodeset', 'secret', 'pipeline',
|
||||
'project', 'project-template',
|
||||
'semaphore', 'pragma'))
|
||||
'semaphore', 'queue', 'pragma'))
|
||||
|
||||
def __init__(self, stream, context):
|
||||
wrapped_stream = io.StringIO(stream)
|
||||
|
@ -1391,6 +1391,29 @@ class SemaphoreParser(object):
|
|||
return semaphore
|
||||
|
||||
|
||||
class QueueParser:
|
||||
def __init__(self, pcontext):
|
||||
self.log = logging.getLogger("zuul.QueueParser")
|
||||
self.pcontext = pcontext
|
||||
self.schema = self.getSchema()
|
||||
|
||||
def getSchema(self):
|
||||
queue = {vs.Required('name'): str,
|
||||
'per-branch': bool,
|
||||
'_source_context': model.SourceContext,
|
||||
'_start_mark': ZuulMark,
|
||||
}
|
||||
return vs.Schema(queue)
|
||||
|
||||
def fromYaml(self, conf):
|
||||
self.schema(conf)
|
||||
queue = model.Queue(conf['name'], conf.get('per-branch', False))
|
||||
queue.source_context = conf.get('_source_context')
|
||||
queue.start_mark = conf.get('_start_mark')
|
||||
queue.freeze()
|
||||
return queue
|
||||
|
||||
|
||||
class AuthorizationRuleParser(object):
|
||||
def __init__(self):
|
||||
self.log = logging.getLogger("zuul.AuthorizationRuleParser")
|
||||
|
@ -1439,6 +1462,7 @@ class ParseContext(object):
|
|||
self.secret_parser = SecretParser(self)
|
||||
self.job_parser = JobParser(self)
|
||||
self.semaphore_parser = SemaphoreParser(self)
|
||||
self.queue_parser = QueueParser(self)
|
||||
self.project_template_parser = ProjectTemplateParser(self)
|
||||
self.project_parser = ProjectParser(self)
|
||||
|
||||
|
@ -1478,7 +1502,7 @@ class TenantParser(object):
|
|||
self.keystorage = keystorage
|
||||
|
||||
classes = vs.Any('pipeline', 'job', 'semaphore', 'project',
|
||||
'project-template', 'nodeset', 'secret')
|
||||
'project-template', 'nodeset', 'secret', 'queue')
|
||||
|
||||
project_dict = {str: {
|
||||
'include': to_list(classes),
|
||||
|
@ -1772,7 +1796,8 @@ class TenantParser(object):
|
|||
untrusted_projects = []
|
||||
|
||||
default_include = frozenset(['pipeline', 'job', 'semaphore', 'project',
|
||||
'secret', 'project-template', 'nodeset'])
|
||||
'secret', 'project-template', 'nodeset',
|
||||
'queue'])
|
||||
|
||||
for source_name, conf_source in conf_tenant.get('source', {}).items():
|
||||
source = self.connections.getSource(source_name)
|
||||
|
@ -1988,6 +2013,15 @@ class TenantParser(object):
|
|||
parsed_config.semaphores.append(
|
||||
pcontext.semaphore_parser.fromYaml(config_semaphore))
|
||||
|
||||
for config_queue in unparsed_config.queues:
|
||||
classes = self._getLoadClasses(tenant, config_queue)
|
||||
if 'queue' not in classes:
|
||||
continue
|
||||
with configuration_exceptions('queue',
|
||||
config_queue, loading_errors):
|
||||
parsed_config.queues.append(
|
||||
pcontext.queue_parser.fromYaml(config_queue))
|
||||
|
||||
for config_template in unparsed_config.project_templates:
|
||||
classes = self._getLoadClasses(tenant, config_template)
|
||||
if 'project-template' not in classes:
|
||||
|
@ -2045,6 +2079,9 @@ class TenantParser(object):
|
|||
for job in parsed_config.jobs:
|
||||
_cache('jobs', job)
|
||||
|
||||
for queue in parsed_config.queues:
|
||||
_cache('queues', queue)
|
||||
|
||||
for semaphore in parsed_config.semaphores:
|
||||
_cache('semaphores', semaphore)
|
||||
|
||||
|
@ -2105,6 +2142,10 @@ class TenantParser(object):
|
|||
'semaphore', semaphore, layout.loading_errors):
|
||||
semaphore_layout.addSemaphore(semaphore)
|
||||
|
||||
for queue in parsed_config.queues:
|
||||
with reference_exceptions('queue', queue, layout.loading_errors):
|
||||
layout.addQueue(queue)
|
||||
|
||||
for template in parsed_config.project_templates:
|
||||
with reference_exceptions(
|
||||
'project-template', template, layout.loading_errors):
|
||||
|
|
|
@ -9,11 +9,10 @@
|
|||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
import textwrap
|
||||
import urllib
|
||||
from abc import ABCMeta
|
||||
from abc import ABCMeta, abstractmethod
|
||||
|
||||
from zuul import exceptions
|
||||
from zuul import model
|
||||
|
@ -252,6 +251,10 @@ class PipelineManager(metaclass=ABCMeta):
|
|||
if item.change.equals(change):
|
||||
self.removeItem(item)
|
||||
|
||||
@abstractmethod
|
||||
def getChangeQueue(self, change, event, existing=None):
|
||||
pass
|
||||
|
||||
def reEnqueueItem(self, item, last_head, old_item_ahead, item_ahead_valid):
|
||||
log = get_annotated_logger(self.log, item.event)
|
||||
with self.getChangeQueue(item.change, item.event,
|
||||
|
|
|
@ -65,15 +65,17 @@ class DependentPipelineManager(SharedQueuePipelineManager):
|
|||
|
||||
# for project in change_queue, project.source get changes, then dedup.
|
||||
sources = set()
|
||||
for project in change_queue.projects:
|
||||
for project, _ in change_queue.project_branches:
|
||||
sources.add(project.source)
|
||||
|
||||
seen = set(change.needed_by_changes)
|
||||
needed_by_changes = change.needed_by_changes[:]
|
||||
for source in sources:
|
||||
log.debug(" Checking source: %s", source)
|
||||
projects = [project_branch[0]
|
||||
for project_branch in change_queue.project_branches]
|
||||
for c in source.getChangesDependingOn(change,
|
||||
change_queue.projects,
|
||||
projects,
|
||||
self.pipeline.tenant):
|
||||
if c not in seen:
|
||||
seen.add(c)
|
||||
|
|
|
@ -31,7 +31,7 @@ class IndependentPipelineManager(PipelineManager):
|
|||
if existing:
|
||||
return DynamicChangeQueueContextManager(existing)
|
||||
change_queue = model.ChangeQueue(self.pipeline)
|
||||
change_queue.addProject(change.project)
|
||||
change_queue.addProject(change.project, None)
|
||||
self.pipeline.addQueue(change_queue)
|
||||
log.debug("Dynamically created queue %s", change_queue)
|
||||
return DynamicChangeQueueContextManager(change_queue)
|
||||
|
|
|
@ -18,6 +18,37 @@ from zuul.manager import PipelineManager, StaticChangeQueueContextManager
|
|||
from zuul.manager import DynamicChangeQueueContextManager
|
||||
|
||||
|
||||
class ChangeQueueManager:
|
||||
|
||||
def __init__(self, pipeline_manager, name=None, per_branch=False):
|
||||
self.log = pipeline_manager.log
|
||||
self.pipeline_manager = pipeline_manager
|
||||
self.name = name
|
||||
self.per_branch = per_branch
|
||||
self.projects = []
|
||||
self.created_for_branches = {}
|
||||
|
||||
def addProject(self, project):
|
||||
self.projects.append(project)
|
||||
|
||||
def getOrCreateQueue(self, project, branch):
|
||||
change_queue = self.created_for_branches.get(branch)
|
||||
|
||||
if not change_queue:
|
||||
p = self.pipeline_manager.pipeline
|
||||
change_queue = self.pipeline_manager.constructChangeQueue(
|
||||
self.name)
|
||||
p.addQueue(change_queue)
|
||||
self.created_for_branches[branch] = change_queue
|
||||
|
||||
if not change_queue.matches(project, branch):
|
||||
change_queue.addProject(project, branch)
|
||||
self.log.debug("Added project %s to queue: %s" %
|
||||
(project, change_queue))
|
||||
|
||||
return change_queue
|
||||
|
||||
|
||||
class SharedQueuePipelineManager(PipelineManager, metaclass=ABCMeta):
|
||||
"""Intermediate class that adds the shared-queue behavior.
|
||||
|
||||
|
@ -28,9 +59,13 @@ class SharedQueuePipelineManager(PipelineManager, metaclass=ABCMeta):
|
|||
|
||||
changes_merge = False
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self.change_queue_managers = []
|
||||
|
||||
def buildChangeQueues(self, layout):
|
||||
self.log.debug("Building shared change queues")
|
||||
change_queues = {}
|
||||
change_queues_managers = {}
|
||||
tenant = self.pipeline.tenant
|
||||
layout_project_configs = layout.project_configs
|
||||
|
||||
|
@ -49,21 +84,27 @@ class SharedQueuePipelineManager(PipelineManager, metaclass=ABCMeta):
|
|||
break
|
||||
if not project_in_pipeline:
|
||||
continue
|
||||
if queue_name and queue_name in change_queues:
|
||||
change_queue = change_queues[queue_name]
|
||||
|
||||
# Check if the queue is global or per branch
|
||||
queue = layout.queues.get(queue_name)
|
||||
per_branch = queue and queue.per_branch
|
||||
|
||||
if queue_name and queue_name in change_queues_managers:
|
||||
change_queue_manager = change_queues_managers[queue_name]
|
||||
else:
|
||||
change_queue = self.constructChangeQueue(queue_name)
|
||||
change_queue_manager = ChangeQueueManager(
|
||||
self, name=queue_name, per_branch=per_branch)
|
||||
if queue_name:
|
||||
# If this is a named queue, keep track of it in
|
||||
# case it is referenced again. Otherwise, it will
|
||||
# have a name automatically generated from its
|
||||
# constituent projects.
|
||||
change_queues[queue_name] = change_queue
|
||||
self.pipeline.addQueue(change_queue)
|
||||
self.log.debug("Created queue: %s" % change_queue)
|
||||
change_queue.addProject(project)
|
||||
self.log.debug("Added project %s to queue: %s" %
|
||||
(project, change_queue))
|
||||
change_queues_managers[queue_name] = change_queue_manager
|
||||
self.change_queue_managers.append(change_queue_manager)
|
||||
self.log.debug("Created queue: %s" % change_queue_manager)
|
||||
change_queue_manager.addProject(project)
|
||||
self.log.debug("Added project %s to queue managers: %s" %
|
||||
(project, change_queue_manager))
|
||||
|
||||
def getChangeQueue(self, change, event, existing=None):
|
||||
log = get_annotated_logger(self.log, event)
|
||||
|
@ -71,14 +112,35 @@ class SharedQueuePipelineManager(PipelineManager, metaclass=ABCMeta):
|
|||
# Ignore the existing queue, since we can always get the correct queue
|
||||
# from the pipeline. This avoids enqueuing changes in a wrong queue
|
||||
# e.g. during re-configuration.
|
||||
queue = self.pipeline.getQueue(change.project)
|
||||
queue = self.pipeline.getQueue(change.project, change.branch)
|
||||
if queue:
|
||||
return StaticChangeQueueContextManager(queue)
|
||||
else:
|
||||
# Change queues in the dependent pipeline manager are created
|
||||
# lazy so first check the managers for the project.
|
||||
matching_managers = [t for t in self.change_queue_managers
|
||||
if change.project in t.projects]
|
||||
if matching_managers:
|
||||
manager = matching_managers[0]
|
||||
branch = None
|
||||
if manager.per_branch:
|
||||
# The change queue is not existing yet for this branch
|
||||
branch = change.branch
|
||||
|
||||
# We have a queue manager but no queue yet, so create it
|
||||
return StaticChangeQueueContextManager(
|
||||
manager.getOrCreateQueue(change.project, branch)
|
||||
)
|
||||
|
||||
# No specific per-branch queue matched so look again with no branch
|
||||
queue = self.pipeline.getQueue(change.project, None)
|
||||
if queue:
|
||||
return StaticChangeQueueContextManager(queue)
|
||||
|
||||
# There is no existing queue for this change. Create a
|
||||
# dynamic one for this one change's use
|
||||
change_queue = model.ChangeQueue(self.pipeline, dynamic=True)
|
||||
change_queue.addProject(change.project)
|
||||
change_queue.addProject(change.project, None)
|
||||
self.pipeline.addQueue(change_queue)
|
||||
log.debug("Dynamically created queue %s", change_queue)
|
||||
return DynamicChangeQueueContextManager(change_queue)
|
||||
|
|
|
@ -44,7 +44,7 @@ class SupercedentPipelineManager(PipelineManager):
|
|||
window_floor=1,
|
||||
window_increase_type='none',
|
||||
window_decrease_type='none')
|
||||
change_queue.addProject(change.project)
|
||||
change_queue.addProject(change.project, None)
|
||||
self.pipeline.addQueue(change_queue)
|
||||
log.debug("Dynamically created queue %s", change_queue)
|
||||
return DynamicChangeQueueContextManager(change_queue)
|
||||
|
|
|
@ -273,6 +273,7 @@ class Pipeline(object):
|
|||
self.queues = []
|
||||
self.relative_priority_queues = {}
|
||||
self.precedence = PRECEDENCE_NORMAL
|
||||
self.supercedes = []
|
||||
self.triggers = []
|
||||
self.enqueue_actions = []
|
||||
self.start_actions = []
|
||||
|
@ -330,9 +331,10 @@ class Pipeline(object):
|
|||
def addQueue(self, queue):
|
||||
self.queues.append(queue)
|
||||
|
||||
def getQueue(self, project):
|
||||
def getQueue(self, project, branch):
|
||||
# Queues might be branch specific so match with branch
|
||||
for queue in self.queues:
|
||||
if project in queue.projects:
|
||||
if queue.matches(project, branch):
|
||||
return queue
|
||||
return None
|
||||
|
||||
|
@ -417,7 +419,7 @@ class ChangeQueue(object):
|
|||
self.name = name
|
||||
else:
|
||||
self.name = ''
|
||||
self.projects = []
|
||||
self.project_branches = []
|
||||
self._jobs = set()
|
||||
self.queue = []
|
||||
self.window = window
|
||||
|
@ -434,13 +436,24 @@ class ChangeQueue(object):
|
|||
def getJobs(self):
|
||||
return self._jobs
|
||||
|
||||
def addProject(self, project):
|
||||
if project not in self.projects:
|
||||
self.projects.append(project)
|
||||
def addProject(self, project, branch):
|
||||
"""
|
||||
Adds a project branch combination to the queue.
|
||||
|
||||
The queue will match exactly this combination. If the caller doesn't
|
||||
care about branches it can supply None (but must supply None as well
|
||||
when matching)
|
||||
"""
|
||||
project_branch = (project, branch)
|
||||
if project_branch not in self.project_branches:
|
||||
self.project_branches.append(project_branch)
|
||||
|
||||
if not self.name:
|
||||
self.name = project.name
|
||||
|
||||
def matches(self, project, branch):
|
||||
return (project, branch) in self.project_branches
|
||||
|
||||
def enqueueChange(self, change, event):
|
||||
item = QueueItem(self, change, event)
|
||||
self.enqueueItem(item)
|
||||
|
@ -3715,6 +3728,7 @@ class UnparsedConfig(object):
|
|||
self.nodesets = []
|
||||
self.secrets = []
|
||||
self.semaphores = []
|
||||
self.queues = []
|
||||
|
||||
# The list of files/dirs which this represents.
|
||||
self.files_examined = set()
|
||||
|
@ -3728,7 +3742,8 @@ class UnparsedConfig(object):
|
|||
# project-branch-path so that we can share them across objects
|
||||
source_contexts = {}
|
||||
for attr in ['pragmas', 'pipelines', 'jobs', 'project_templates',
|
||||
'projects', 'nodesets', 'secrets', 'semaphores']:
|
||||
'projects', 'nodesets', 'secrets', 'semaphores',
|
||||
'queues']:
|
||||
# Make a deep copy of each of our attributes
|
||||
old_objlist = getattr(self, attr)
|
||||
new_objlist = copy.deepcopy(old_objlist)
|
||||
|
@ -3758,6 +3773,7 @@ class UnparsedConfig(object):
|
|||
self.nodesets.extend(conf.nodesets)
|
||||
self.secrets.extend(conf.secrets)
|
||||
self.semaphores.extend(conf.semaphores)
|
||||
self.queues.extend(conf.queues)
|
||||
return
|
||||
|
||||
if not isinstance(conf, list):
|
||||
|
@ -3783,6 +3799,8 @@ class UnparsedConfig(object):
|
|||
self.secrets.append(value)
|
||||
elif key == 'semaphore':
|
||||
self.semaphores.append(value)
|
||||
elif key == 'queue':
|
||||
self.queues.append(value)
|
||||
elif key == 'pragma':
|
||||
self.pragmas.append(value)
|
||||
else:
|
||||
|
@ -3802,6 +3820,7 @@ class ParsedConfig(object):
|
|||
self.nodesets = []
|
||||
self.secrets = []
|
||||
self.semaphores = []
|
||||
self.queues = []
|
||||
|
||||
def copy(self):
|
||||
r = ParsedConfig()
|
||||
|
@ -3814,6 +3833,7 @@ class ParsedConfig(object):
|
|||
r.nodesets = self.nodesets[:]
|
||||
r.secrets = self.secrets[:]
|
||||
r.semaphores = self.semaphores[:]
|
||||
r.queues = self.queues[:]
|
||||
return r
|
||||
|
||||
def extend(self, conf):
|
||||
|
@ -3826,6 +3846,7 @@ class ParsedConfig(object):
|
|||
self.nodesets.extend(conf.nodesets)
|
||||
self.secrets.extend(conf.secrets)
|
||||
self.semaphores.extend(conf.semaphores)
|
||||
self.queues.extend(conf.queues)
|
||||
for regex, projects in conf.projects_by_regex.items():
|
||||
self.projects_by_regex.setdefault(regex, []).extend(projects)
|
||||
return
|
||||
|
@ -3859,6 +3880,7 @@ class Layout(object):
|
|||
self.nodesets = {}
|
||||
self.secrets = {}
|
||||
self.semaphores = {}
|
||||
self.queues = {}
|
||||
self.loading_errors = LoadingErrors()
|
||||
|
||||
def getJob(self, name):
|
||||
|
@ -3967,6 +3989,13 @@ class Layout(object):
|
|||
return
|
||||
self.semaphores[semaphore.name] = semaphore
|
||||
|
||||
def addQueue(self, queue):
|
||||
# Change queues must be unique and cannot be overridden.
|
||||
if queue.name in self.queues:
|
||||
raise Exception('Queue %s is already defined' % queue.name)
|
||||
|
||||
self.queues[queue.name] = queue
|
||||
|
||||
def addPipeline(self, pipeline):
|
||||
if pipeline.tenant is not self.tenant:
|
||||
raise Exception("Pipeline created for tenant %s "
|
||||
|
@ -4326,6 +4355,22 @@ class Semaphore(ConfigObject):
|
|||
self.max == other.max)
|
||||
|
||||
|
||||
class Queue(ConfigObject):
|
||||
def __init__(self, name, per_branch=False):
|
||||
super().__init__()
|
||||
self.name = name
|
||||
self.per_branch = per_branch
|
||||
|
||||
def __ne__(self, other):
|
||||
return not self.__eq__(other)
|
||||
|
||||
def __eq__(self, other):
|
||||
if not isinstance(other, Queue):
|
||||
return False
|
||||
return (self.name == other.name and
|
||||
self.per_branch == other.per_branch)
|
||||
|
||||
|
||||
class SemaphoreHandler(object):
|
||||
log = logging.getLogger("zuul.SemaphoreHandler")
|
||||
|
||||
|
|
|
@ -1032,7 +1032,8 @@ class Scheduler(threading.Thread):
|
|||
last_head = None
|
||||
for shared_queue in old_pipeline.queues:
|
||||
# Attempt to keep window sizes from shrinking where possible
|
||||
new_queue = new_pipeline.getQueue(shared_queue.projects[0])
|
||||
project, branch = shared_queue.project_branches[0]
|
||||
new_queue = new_pipeline.getQueue(project, branch)
|
||||
if new_queue and shared_queue.window and (not static_window):
|
||||
new_queue.window = max(shared_queue.window,
|
||||
new_queue.window_floor)
|
||||
|
|
Loading…
Reference in New Issue