From e5ba72f77803519c99014da70f3ab97a13121912 Mon Sep 17 00:00:00 2001 From: Tobias Henkel Date: Wed, 8 Apr 2020 12:35:52 +0200 Subject: [PATCH] Support per branch change queues We have several large projects with most of the time long gate queues. Those projects typically work on master and few release branches where the changes in the release branches are more important to the changes for master. Currently all of those changes are queued up in a shared gate queue which makes the process of getting changes into the release branches very slow especially if occasional gate resets are involved. In order to improve this allow specifying the change queues per branch so we can queue up the changes for each release branch in a separate queue. This is done by adding a new config element 'queue' which can be configured to work on a per branch level. Change-Id: Ie5c1a2b8f413fd595dbaaeba67251da14c6b4b36 --- doc/source/reference/config.rst | 1 + doc/source/reference/project_def.rst | 3 + doc/source/reference/queue_def.rst | 44 ++++ ...nge-queue-per-branch-8f7f352fdf0aec3b.yaml | 5 + tests/base.py | 5 +- .../common-config/playbooks/project-test.yaml | 2 + .../git/common-config/zuul.d/config.yaml | 35 +++ .../git/common-config/zuul.d/queue.yaml | 3 + .../change-queues/git/org_project/.zuul.yaml | 10 + .../change-queues/git/org_project2/readme | 1 + .../git/org_project3/zuul.d/project.yaml | 5 + .../git/org_project3/zuul.d/queue.yaml | 3 + tests/fixtures/config/change-queues/main.yaml | 10 + tests/unit/test_configloader.py | 2 +- tests/unit/test_scheduler.py | 214 +++++++++++++++++- zuul/configloader.py | 47 +++- zuul/manager/__init__.py | 7 +- zuul/manager/dependent.py | 6 +- zuul/manager/independent.py | 2 +- zuul/manager/shared.py | 86 ++++++- zuul/manager/supercedent.py | 2 +- zuul/model.py | 59 ++++- zuul/scheduler.py | 3 +- 23 files changed, 515 insertions(+), 40 deletions(-) create mode 100644 doc/source/reference/queue_def.rst create mode 100644 releasenotes/notes/change-queue-per-branch-8f7f352fdf0aec3b.yaml create mode 100644 tests/fixtures/config/change-queues/git/common-config/playbooks/project-test.yaml create mode 100644 tests/fixtures/config/change-queues/git/common-config/zuul.d/config.yaml create mode 100644 tests/fixtures/config/change-queues/git/common-config/zuul.d/queue.yaml create mode 100644 tests/fixtures/config/change-queues/git/org_project/.zuul.yaml create mode 100644 tests/fixtures/config/change-queues/git/org_project2/readme create mode 100644 tests/fixtures/config/change-queues/git/org_project3/zuul.d/project.yaml create mode 100644 tests/fixtures/config/change-queues/git/org_project3/zuul.d/queue.yaml create mode 100644 tests/fixtures/config/change-queues/main.yaml diff --git a/doc/source/reference/config.rst b/doc/source/reference/config.rst index 036980ca81..0646f9c882 100644 --- a/doc/source/reference/config.rst +++ b/doc/source/reference/config.rst @@ -90,6 +90,7 @@ the YAML files: pipeline_def job_def project_def + queue_def secret_def nodeset_def semaphore_def diff --git a/doc/source/reference/project_def.rst b/doc/source/reference/project_def.rst index 55a63fb358..6f663d09e1 100644 --- a/doc/source/reference/project_def.rst +++ b/doc/source/reference/project_def.rst @@ -176,6 +176,9 @@ pipeline. changes which break the others. This is a free-form string; just set the same value for each group of projects. + The name can refer to the name of a :attr:`queue` which allows + further configuration of the queue. + Each pipeline for a project can only belong to one queue, therefore Zuul will use the first value that it encounters. It need not appear in the first instance of a :attr:`project` diff --git a/doc/source/reference/queue_def.rst b/doc/source/reference/queue_def.rst new file mode 100644 index 0000000000..d5f106a0b6 --- /dev/null +++ b/doc/source/reference/queue_def.rst @@ -0,0 +1,44 @@ +.. _queue: + +Queue +===== + +Projects that interact with each other should share a ``queue``. +This is especially used in a :value:`dependent ` +pipeline. The :attr:`project..queue` can optionally refer +to a specific :attr:`queue` object that can further configure the +behavior of the queue. + +Here is an example ``queue`` configuration. + +.. code-block:: yaml + + - queue: + name: integrated + per-branch: false + + +.. attr:: queue + + The attributes available on a queue are as follows (all are + optional unless otherwise specified): + + .. attr:: name + :required: + + This is used later in the project definition to refer to this queue. + + .. attr:: per-branch + :default: false + + Queues by default define a single queue for all projects and + branches that use it. This is especially important if projects + want to do upgrade tests between different branches in + the :term:`gate`. If a set of projects doesn't have this use case + it can configure the queue to create a shared queue per branch for + all projects. This can be useful for large projects to improve the + throughput of a gate pipeline as this results in shorter queues + and thus less impact when a job fails in the gate. Note that this + means that all projects that should be gated must have aligned branch + names when using per branch queues. Otherwise changes that belong + together end up in different queues. diff --git a/releasenotes/notes/change-queue-per-branch-8f7f352fdf0aec3b.yaml b/releasenotes/notes/change-queue-per-branch-8f7f352fdf0aec3b.yaml new file mode 100644 index 0000000000..7b7647234b --- /dev/null +++ b/releasenotes/notes/change-queue-per-branch-8f7f352fdf0aec3b.yaml @@ -0,0 +1,5 @@ +--- +features: + - | + Projects can now configure change queues to queue per branch. + See :attr:`queue` for more information. diff --git a/tests/base.py b/tests/base.py index d5d4a009b1..1c74bc978a 100644 --- a/tests/base.py +++ b/tests/base.py @@ -3136,7 +3136,7 @@ class RecordingExecutorServer(zuul.executor.server.ExecutorServer): cid = None changes[cid] = data - def release(self, regex=None): + def release(self, regex=None, change=None): """Release a held build. :arg str regex: A regular expression which, if supplied, will @@ -3151,7 +3151,8 @@ class RecordingExecutorServer(zuul.executor.server.ExecutorServer): self.log.debug("Releasing build %s (%s)" % (regex, len(builds))) for build in builds: - if not regex or re.match(regex, build.name): + if (not regex or re.match(regex, build.name) and + not change or build.change == change): self.log.debug("Releasing build %s" % (build.parameters['zuul']['build'])) build.release() diff --git a/tests/fixtures/config/change-queues/git/common-config/playbooks/project-test.yaml b/tests/fixtures/config/change-queues/git/common-config/playbooks/project-test.yaml new file mode 100644 index 0000000000..f679dceaef --- /dev/null +++ b/tests/fixtures/config/change-queues/git/common-config/playbooks/project-test.yaml @@ -0,0 +1,2 @@ +- hosts: all + tasks: [] diff --git a/tests/fixtures/config/change-queues/git/common-config/zuul.d/config.yaml b/tests/fixtures/config/change-queues/git/common-config/zuul.d/config.yaml new file mode 100644 index 0000000000..0946ca3f95 --- /dev/null +++ b/tests/fixtures/config/change-queues/git/common-config/zuul.d/config.yaml @@ -0,0 +1,35 @@ +- pipeline: + name: gate + manager: dependent + success-message: Build succeeded (gate). + trigger: + gerrit: + - event: comment-added + approval: + - Approved: 1 + success: + gerrit: + Verified: 2 + submit: true + failure: + gerrit: + Verified: -2 + start: + gerrit: + Verified: 0 + precedence: high + +- job: + name: base + parent: null + +- job: + name: project-test + run: playbooks/project-test.yaml + +- project: + name: org/project2 + gate: + queue: integrated + jobs: + - project-test diff --git a/tests/fixtures/config/change-queues/git/common-config/zuul.d/queue.yaml b/tests/fixtures/config/change-queues/git/common-config/zuul.d/queue.yaml new file mode 100644 index 0000000000..91badeb8f3 --- /dev/null +++ b/tests/fixtures/config/change-queues/git/common-config/zuul.d/queue.yaml @@ -0,0 +1,3 @@ +- queue: + name: integrated + per-branch: true diff --git a/tests/fixtures/config/change-queues/git/org_project/.zuul.yaml b/tests/fixtures/config/change-queues/git/org_project/.zuul.yaml new file mode 100644 index 0000000000..b62c7ee67d --- /dev/null +++ b/tests/fixtures/config/change-queues/git/org_project/.zuul.yaml @@ -0,0 +1,10 @@ +# This queue will be ignored since it is already defined in common-config +- queue: + name: integrated + per-branch: false + +- project: + gate: + queue: integrated + jobs: + - project-test diff --git a/tests/fixtures/config/change-queues/git/org_project2/readme b/tests/fixtures/config/change-queues/git/org_project2/readme new file mode 100644 index 0000000000..9daeafb986 --- /dev/null +++ b/tests/fixtures/config/change-queues/git/org_project2/readme @@ -0,0 +1 @@ +test diff --git a/tests/fixtures/config/change-queues/git/org_project3/zuul.d/project.yaml b/tests/fixtures/config/change-queues/git/org_project3/zuul.d/project.yaml new file mode 100644 index 0000000000..e1a297bd58 --- /dev/null +++ b/tests/fixtures/config/change-queues/git/org_project3/zuul.d/project.yaml @@ -0,0 +1,5 @@ +- project: + gate: + queue: integrated-untrusted + jobs: + - project-test diff --git a/tests/fixtures/config/change-queues/git/org_project3/zuul.d/queue.yaml b/tests/fixtures/config/change-queues/git/org_project3/zuul.d/queue.yaml new file mode 100644 index 0000000000..ac7074a0eb --- /dev/null +++ b/tests/fixtures/config/change-queues/git/org_project3/zuul.d/queue.yaml @@ -0,0 +1,3 @@ +- queue: + name: integrated-untrusted + per-branch: true diff --git a/tests/fixtures/config/change-queues/main.yaml b/tests/fixtures/config/change-queues/main.yaml new file mode 100644 index 0000000000..7eacee534d --- /dev/null +++ b/tests/fixtures/config/change-queues/main.yaml @@ -0,0 +1,10 @@ +- tenant: + name: tenant-one + source: + gerrit: + config-projects: + - common-config + untrusted-projects: + - org/project + - org/project2 + - org/project3 diff --git a/tests/unit/test_configloader.py b/tests/unit/test_configloader.py index 924ce05ddb..191c95dc8e 100644 --- a/tests/unit/test_configloader.py +++ b/tests/unit/test_configloader.py @@ -26,7 +26,7 @@ class TenantParserTestCase(ZuulTestCase): create_project_keys = True CONFIG_SET = set(['pipeline', 'job', 'semaphore', 'project', - 'project-template', 'nodeset', 'secret']) + 'project-template', 'nodeset', 'secret', 'queue']) UNTRUSTED_SET = CONFIG_SET - set(['pipeline']) def setupAllProjectKeys(self, config: ConfigParser): diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py index 3756db16f4..1fa9ff9f32 100644 --- a/tests/unit/test_scheduler.py +++ b/tests/unit/test_scheduler.py @@ -20,6 +20,7 @@ import os import shutil import socket import time +from collections import namedtuple from unittest import mock from unittest import skip from kazoo.exceptions import NoNodeError @@ -3159,8 +3160,17 @@ class TestScheduler(ZuulTestCase): tenant = self.scheds.first.sched.abide.tenants.get('tenant-one') (trusted, project1) = tenant.getProject('org/project1') (trusted, project2) = tenant.getProject('org/project2') - q1 = tenant.layout.pipelines['gate'].getQueue(project1) - q2 = tenant.layout.pipelines['gate'].getQueue(project2) + # Change queues are created lazy by the dependent pipeline manager + # so retrieve the queue first without having to really enqueue a + # change first. + gate = tenant.layout.pipelines['gate'] + FakeChange = namedtuple('FakeChange', ['project', 'branch']) + fake_a = FakeChange(project1, 'master') + fake_b = FakeChange(project2, 'master') + gate.manager.getChangeQueue(fake_a, None) + gate.manager.getChangeQueue(fake_b, None) + q1 = gate.getQueue(project1, None) + q2 = gate.getQueue(project2, None) self.assertEqual(q1.name, 'integrated') self.assertEqual(q2.name, 'integrated') @@ -3170,8 +3180,18 @@ class TestScheduler(ZuulTestCase): tenant = self.scheds.first.sched.abide.tenants.get('tenant-one') (trusted, project1) = tenant.getProject('org/project1') (trusted, project2) = tenant.getProject('org/project2') - q1 = tenant.layout.pipelines['gate'].getQueue(project1) - q2 = tenant.layout.pipelines['gate'].getQueue(project2) + + # Change queues are created lazy by the dependent pipeline manager + # so retrieve the queue first without having to really enqueue a + # change first. + gate = tenant.layout.pipelines['gate'] + FakeChange = namedtuple('FakeChange', ['project', 'branch']) + fake_a = FakeChange(project1, 'master') + fake_b = FakeChange(project2, 'master') + gate.manager.getChangeQueue(fake_a, None) + gate.manager.getChangeQueue(fake_b, None) + q1 = gate.getQueue(project1, None) + q2 = gate.getQueue(project2, None) self.assertEqual(q1.name, 'integrated') self.assertEqual(q2.name, 'integrated') @@ -3181,8 +3201,17 @@ class TestScheduler(ZuulTestCase): tenant = self.scheds.first.sched.abide.tenants.get('tenant-one') (trusted, project1) = tenant.getProject('org/project1') (trusted, project2) = tenant.getProject('org/project2') - q1 = tenant.layout.pipelines['gate'].getQueue(project1) - q2 = tenant.layout.pipelines['gate'].getQueue(project2) + # Change queues are created lazy by the dependent pipeline manager + # so retrieve the queue first without having to really enqueue a + # change first. + gate = tenant.layout.pipelines['gate'] + FakeChange = namedtuple('FakeChange', ['project', 'branch']) + fake_a = FakeChange(project1, 'master') + fake_b = FakeChange(project2, 'master') + gate.manager.getChangeQueue(fake_a, None) + gate.manager.getChangeQueue(fake_b, None) + q1 = gate.getQueue(project1, None) + q2 = gate.getQueue(project2, None) self.assertEqual(q1.name, 'integrated') self.assertEqual(q2.name, 'integrated') @@ -3192,8 +3221,17 @@ class TestScheduler(ZuulTestCase): tenant = self.scheds.first.sched.abide.tenants.get('tenant-one') (trusted, project1) = tenant.getProject('org/project1') (trusted, project2) = tenant.getProject('org/project2') - q1 = tenant.layout.pipelines['gate'].getQueue(project1) - q2 = tenant.layout.pipelines['gate'].getQueue(project2) + # Change queues are created lazy by the dependent pipeline manager + # so retrieve the queue first without having to really enqueue a + # change first. + gate = tenant.layout.pipelines['gate'] + FakeChange = namedtuple('FakeChange', ['project', 'branch']) + fake_a = FakeChange(project1, 'master') + fake_b = FakeChange(project2, 'master') + gate.manager.getChangeQueue(fake_a, None) + gate.manager.getChangeQueue(fake_b, None) + q1 = gate.getQueue(project1, None) + q2 = gate.getQueue(project2, None) self.assertEqual(q1.name, 'integrated') self.assertEqual(q2.name, 'integrated') @@ -6250,6 +6288,166 @@ For CI problems and help debugging, contact ci@example.org""" ], ordered=False) +class TestChangeQueues(ZuulTestCase): + tenant_config_file = 'config/change-queues/main.yaml' + + def _test_dependent_queues_per_branch(self, project, + queue_name='integrated', + queue_repo='common-config'): + self.create_branch(project, 'stable') + self.fake_gerrit.addEvent( + self.fake_gerrit.getFakeBranchCreatedEvent(project, 'stable')) + self.waitUntilSettled() + + self.executor_server.hold_jobs_in_build = True + A = self.fake_gerrit.addFakeChange(project, 'master', 'A') + B = self.fake_gerrit.addFakeChange(project, 'stable', 'B') + A.addApproval('Code-Review', 2) + B.addApproval('Code-Review', 2) + + self.executor_server.failJob('project-test', A) + + # Let first go A into gate then B + self.fake_gerrit.addEvent(A.addApproval('Approved', 1)) + self.waitUntilSettled() + + self.fake_gerrit.addEvent(B.addApproval('Approved', 1)) + self.waitUntilSettled() + + # There should be one project-test job at the head of each queue + self.assertBuilds([ + dict(name='project-test', changes='1,1'), + dict(name='project-test', changes='2,1'), + ]) + tenant = self.scheds.first.sched.abide.tenants.get('tenant-one') + _, p = tenant.getProject(project) + q1 = tenant.layout.pipelines['gate'].getQueue(p, 'master') + q2 = tenant.layout.pipelines['gate'].getQueue(p, 'stable') + self.assertEqual(q1.name, queue_name) + self.assertEqual(q2.name, queue_name) + + # Both queues must contain one item + self.assertEqual(len(q1.queue), 1) + self.assertEqual(len(q2.queue), 1) + + # Fail job on the change on master + self.executor_server.hold_jobs_in_build = False + self.executor_server.release() + self.waitUntilSettled() + + self.assertNotEqual(A.data['status'], 'MERGED') + self.assertEqual(B.data['status'], 'MERGED') + self.assertEqual(A.reported, 2) + self.assertEqual(B.reported, 2) + + # Now reconfigure the queue to be non-branched and run the same test + # again. + conf = textwrap.dedent( + """ + - queue: + name: {} + per-branch: false + """).format(queue_name) + + file_dict = {'zuul.d/queue.yaml': conf} + C = self.fake_gerrit.addFakeChange(queue_repo, 'master', 'A', + files=file_dict) + C.setMerged() + self.fake_gerrit.addEvent(C.getChangeMergedEvent()) + self.waitUntilSettled() + + self.executor_server.hold_jobs_in_build = True + D = self.fake_gerrit.addFakeChange(project, 'master', 'D') + E = self.fake_gerrit.addFakeChange(project, 'stable', 'E') + D.addApproval('Code-Review', 2) + E.addApproval('Code-Review', 2) + + self.executor_server.failJob('project-test', D) + + # Let first go A into gate then B + self.fake_gerrit.addEvent(D.addApproval('Approved', 1)) + self.waitUntilSettled() + + self.fake_gerrit.addEvent(E.addApproval('Approved', 1)) + self.waitUntilSettled() + + # There should be two project-test jobs in a shared queue + self.assertBuilds([ + dict(name='project-test', changes='4,1'), + dict(name='project-test', changes='4,1 5,1'), + ]) + tenant = self.scheds.first.sched.abide.tenants.get('tenant-one') + _, p = tenant.getProject(project) + q1 = tenant.layout.pipelines['gate'].getQueue(p, 'master') + q2 = tenant.layout.pipelines['gate'].getQueue(p, 'stable') + q3 = tenant.layout.pipelines['gate'].getQueue(p, None) + + # There should be no branch specific queues anymore + self.assertEqual(q1, None) + self.assertEqual(q2, None) + self.assertEqual(q3.name, queue_name) + + # Both queues must contain one item + self.assertEqual(len(q3.queue), 2) + + # Release project-test of D to make history after test deterministic + self.executor_server.release('project-test', change='4 1') + self.waitUntilSettled() + + # Fail job on the change on master + self.executor_server.hold_jobs_in_build = False + self.executor_server.release() + self.waitUntilSettled() + + self.assertNotEqual(D.data['status'], 'MERGED') + self.assertEqual(E.data['status'], 'MERGED') + self.assertEqual(D.reported, 2) + self.assertEqual(E.reported, 2) + + self.assertHistory([ + # Independent runs because of per branch queues + dict(name='project-test', result='FAILURE', changes='1,1'), + dict(name='project-test', result='SUCCESS', changes='2,1'), + + # Same queue with gate reset because of 4,1 + dict(name='project-test', result='FAILURE', changes='4,1'), + + # Result can be anything depending on timing of the gate reset. + dict(name='project-test', changes='4,1 5,1'), + dict(name='project-test', result='SUCCESS', changes='5,1'), + ], ordered=False) + + def test_dependent_queues_per_branch(self): + """ + Test that change queues can be different for different branches. + + In this case the project contains zuul config so the branches are + known upfront and the queues are pre-seeded. + """ + self._test_dependent_queues_per_branch('org/project') + + def test_dependent_queues_per_branch_no_config(self): + """ + Test that change queues can be different for different branches. + + In this case we create changes for two branches in a repo that + doesn't contain zuul config so the queues are not pre-seeded + in the gate pipeline. + """ + self._test_dependent_queues_per_branch('org/project2') + + def test_dependent_queues_per_branch_untrusted(self): + """ + Test that change queues can be different for different branches. + + In this case we create changes for two branches in an untrusted repo + that defines its own queue. + """ + self._test_dependent_queues_per_branch( + 'org/project3', queue_name='integrated-untrusted', + queue_repo='org/project3') + + class TestJobUpdateBrokenConfig(ZuulTestCase): tenant_config_file = 'config/job-update-broken/main.yaml' diff --git a/zuul/configloader.py b/zuul/configloader.py index 748fd90373..cf2fd296ab 100644 --- a/zuul/configloader.py +++ b/zuul/configloader.py @@ -354,7 +354,7 @@ class ZuulMark(object): class ZuulSafeLoader(yaml.SafeLoader): zuul_node_types = frozenset(('job', 'nodeset', 'secret', 'pipeline', 'project', 'project-template', - 'semaphore', 'pragma')) + 'semaphore', 'queue', 'pragma')) def __init__(self, stream, context): wrapped_stream = io.StringIO(stream) @@ -1365,6 +1365,29 @@ class SemaphoreParser(object): return semaphore +class QueueParser: + def __init__(self, pcontext): + self.log = logging.getLogger("zuul.QueueParser") + self.pcontext = pcontext + self.schema = self.getSchema() + + def getSchema(self): + queue = {vs.Required('name'): str, + 'per-branch': bool, + '_source_context': model.SourceContext, + '_start_mark': ZuulMark, + } + return vs.Schema(queue) + + def fromYaml(self, conf): + self.schema(conf) + queue = model.Queue(conf['name'], conf.get('per-branch', False)) + queue.source_context = conf.get('_source_context') + queue.start_mark = conf.get('_start_mark') + queue.freeze() + return queue + + class AuthorizationRuleParser(object): def __init__(self): self.log = logging.getLogger("zuul.AuthorizationRuleParser") @@ -1413,6 +1436,7 @@ class ParseContext(object): self.secret_parser = SecretParser(self) self.job_parser = JobParser(self) self.semaphore_parser = SemaphoreParser(self) + self.queue_parser = QueueParser(self) self.project_template_parser = ProjectTemplateParser(self) self.project_parser = ProjectParser(self) @@ -1452,7 +1476,7 @@ class TenantParser(object): self.keystorage = keystorage classes = vs.Any('pipeline', 'job', 'semaphore', 'project', - 'project-template', 'nodeset', 'secret') + 'project-template', 'nodeset', 'secret', 'queue') project_dict = {str: { 'include': to_list(classes), @@ -1746,7 +1770,8 @@ class TenantParser(object): untrusted_projects = [] default_include = frozenset(['pipeline', 'job', 'semaphore', 'project', - 'secret', 'project-template', 'nodeset']) + 'secret', 'project-template', 'nodeset', + 'queue']) for source_name, conf_source in conf_tenant.get('source', {}).items(): source = self.connections.getSource(source_name) @@ -1962,6 +1987,15 @@ class TenantParser(object): parsed_config.semaphores.append( pcontext.semaphore_parser.fromYaml(config_semaphore)) + for config_queue in unparsed_config.queues: + classes = self._getLoadClasses(tenant, config_queue) + if 'queue' not in classes: + continue + with configuration_exceptions('queue', + config_queue, loading_errors): + parsed_config.queues.append( + pcontext.queue_parser.fromYaml(config_queue)) + for config_template in unparsed_config.project_templates: classes = self._getLoadClasses(tenant, config_template) if 'project-template' not in classes: @@ -2019,6 +2053,9 @@ class TenantParser(object): for job in parsed_config.jobs: _cache('jobs', job) + for queue in parsed_config.queues: + _cache('queues', queue) + for semaphore in parsed_config.semaphores: _cache('semaphores', semaphore) @@ -2079,6 +2116,10 @@ class TenantParser(object): 'semaphore', semaphore, layout.loading_errors): semaphore_layout.addSemaphore(semaphore) + for queue in parsed_config.queues: + with reference_exceptions('queue', queue, layout.loading_errors): + layout.addQueue(queue) + for template in parsed_config.project_templates: with reference_exceptions( 'project-template', template, layout.loading_errors): diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py index 7144633c94..f511012642 100644 --- a/zuul/manager/__init__.py +++ b/zuul/manager/__init__.py @@ -9,11 +9,10 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. - import logging import textwrap import urllib -from abc import ABCMeta +from abc import ABCMeta, abstractmethod from zuul import exceptions from zuul import model @@ -252,6 +251,10 @@ class PipelineManager(metaclass=ABCMeta): if item.change.equals(change): self.removeItem(item) + @abstractmethod + def getChangeQueue(self, change, event, existing=None): + pass + def reEnqueueItem(self, item, last_head, old_item_ahead, item_ahead_valid): log = get_annotated_logger(self.log, item.event) with self.getChangeQueue(item.change, item.event, diff --git a/zuul/manager/dependent.py b/zuul/manager/dependent.py index c219182336..5d9450fc4b 100644 --- a/zuul/manager/dependent.py +++ b/zuul/manager/dependent.py @@ -65,15 +65,17 @@ class DependentPipelineManager(SharedQueuePipelineManager): # for project in change_queue, project.source get changes, then dedup. sources = set() - for project in change_queue.projects: + for project, _ in change_queue.project_branches: sources.add(project.source) seen = set(change.needed_by_changes) needed_by_changes = change.needed_by_changes[:] for source in sources: log.debug(" Checking source: %s", source) + projects = [project_branch[0] + for project_branch in change_queue.project_branches] for c in source.getChangesDependingOn(change, - change_queue.projects, + projects, self.pipeline.tenant): if c not in seen: seen.add(c) diff --git a/zuul/manager/independent.py b/zuul/manager/independent.py index 682020a029..b57f7a0780 100644 --- a/zuul/manager/independent.py +++ b/zuul/manager/independent.py @@ -31,7 +31,7 @@ class IndependentPipelineManager(PipelineManager): if existing: return DynamicChangeQueueContextManager(existing) change_queue = model.ChangeQueue(self.pipeline) - change_queue.addProject(change.project) + change_queue.addProject(change.project, None) self.pipeline.addQueue(change_queue) log.debug("Dynamically created queue %s", change_queue) return DynamicChangeQueueContextManager(change_queue) diff --git a/zuul/manager/shared.py b/zuul/manager/shared.py index 4b286f4f15..f2ea53b61f 100644 --- a/zuul/manager/shared.py +++ b/zuul/manager/shared.py @@ -18,6 +18,37 @@ from zuul.manager import PipelineManager, StaticChangeQueueContextManager from zuul.manager import DynamicChangeQueueContextManager +class ChangeQueueManager: + + def __init__(self, pipeline_manager, name=None, per_branch=False): + self.log = pipeline_manager.log + self.pipeline_manager = pipeline_manager + self.name = name + self.per_branch = per_branch + self.projects = [] + self.created_for_branches = {} + + def addProject(self, project): + self.projects.append(project) + + def getOrCreateQueue(self, project, branch): + change_queue = self.created_for_branches.get(branch) + + if not change_queue: + p = self.pipeline_manager.pipeline + change_queue = self.pipeline_manager.constructChangeQueue( + self.name) + p.addQueue(change_queue) + self.created_for_branches[branch] = change_queue + + if not change_queue.matches(project, branch): + change_queue.addProject(project, branch) + self.log.debug("Added project %s to queue: %s" % + (project, change_queue)) + + return change_queue + + class SharedQueuePipelineManager(PipelineManager, metaclass=ABCMeta): """Intermediate class that adds the shared-queue behavior. @@ -28,9 +59,13 @@ class SharedQueuePipelineManager(PipelineManager, metaclass=ABCMeta): changes_merge = False + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.change_queue_managers = [] + def buildChangeQueues(self, layout): self.log.debug("Building shared change queues") - change_queues = {} + change_queues_managers = {} tenant = self.pipeline.tenant layout_project_configs = layout.project_configs @@ -49,21 +84,27 @@ class SharedQueuePipelineManager(PipelineManager, metaclass=ABCMeta): break if not project_in_pipeline: continue - if queue_name and queue_name in change_queues: - change_queue = change_queues[queue_name] + + # Check if the queue is global or per branch + queue = layout.queues.get(queue_name) + per_branch = queue and queue.per_branch + + if queue_name and queue_name in change_queues_managers: + change_queue_manager = change_queues_managers[queue_name] else: - change_queue = self.constructChangeQueue(queue_name) + change_queue_manager = ChangeQueueManager( + self, name=queue_name, per_branch=per_branch) if queue_name: # If this is a named queue, keep track of it in # case it is referenced again. Otherwise, it will # have a name automatically generated from its # constituent projects. - change_queues[queue_name] = change_queue - self.pipeline.addQueue(change_queue) - self.log.debug("Created queue: %s" % change_queue) - change_queue.addProject(project) - self.log.debug("Added project %s to queue: %s" % - (project, change_queue)) + change_queues_managers[queue_name] = change_queue_manager + self.change_queue_managers.append(change_queue_manager) + self.log.debug("Created queue: %s" % change_queue_manager) + change_queue_manager.addProject(project) + self.log.debug("Added project %s to queue managers: %s" % + (project, change_queue_manager)) def getChangeQueue(self, change, event, existing=None): log = get_annotated_logger(self.log, event) @@ -71,14 +112,35 @@ class SharedQueuePipelineManager(PipelineManager, metaclass=ABCMeta): # Ignore the existing queue, since we can always get the correct queue # from the pipeline. This avoids enqueuing changes in a wrong queue # e.g. during re-configuration. - queue = self.pipeline.getQueue(change.project) + queue = self.pipeline.getQueue(change.project, change.branch) if queue: return StaticChangeQueueContextManager(queue) else: + # Change queues in the dependent pipeline manager are created + # lazy so first check the managers for the project. + matching_managers = [t for t in self.change_queue_managers + if change.project in t.projects] + if matching_managers: + manager = matching_managers[0] + branch = None + if manager.per_branch: + # The change queue is not existing yet for this branch + branch = change.branch + + # We have a queue manager but no queue yet, so create it + return StaticChangeQueueContextManager( + manager.getOrCreateQueue(change.project, branch) + ) + + # No specific per-branch queue matched so look again with no branch + queue = self.pipeline.getQueue(change.project, None) + if queue: + return StaticChangeQueueContextManager(queue) + # There is no existing queue for this change. Create a # dynamic one for this one change's use change_queue = model.ChangeQueue(self.pipeline, dynamic=True) - change_queue.addProject(change.project) + change_queue.addProject(change.project, None) self.pipeline.addQueue(change_queue) log.debug("Dynamically created queue %s", change_queue) return DynamicChangeQueueContextManager(change_queue) diff --git a/zuul/manager/supercedent.py b/zuul/manager/supercedent.py index ec0174e2f5..05238b960f 100644 --- a/zuul/manager/supercedent.py +++ b/zuul/manager/supercedent.py @@ -44,7 +44,7 @@ class SupercedentPipelineManager(PipelineManager): window_floor=1, window_increase_type='none', window_decrease_type='none') - change_queue.addProject(change.project) + change_queue.addProject(change.project, None) self.pipeline.addQueue(change_queue) log.debug("Dynamically created queue %s", change_queue) return DynamicChangeQueueContextManager(change_queue) diff --git a/zuul/model.py b/zuul/model.py index 77fdf337d7..2ff3ff3ab8 100644 --- a/zuul/model.py +++ b/zuul/model.py @@ -272,6 +272,7 @@ class Pipeline(object): self.queues = [] self.relative_priority_queues = {} self.precedence = PRECEDENCE_NORMAL + self.supercedes = [] self.triggers = [] self.enqueue_actions = [] self.start_actions = [] @@ -329,9 +330,10 @@ class Pipeline(object): def addQueue(self, queue): self.queues.append(queue) - def getQueue(self, project): + def getQueue(self, project, branch): + # Queues might be branch specific so match with branch for queue in self.queues: - if project in queue.projects: + if queue.matches(project, branch): return queue return None @@ -416,7 +418,7 @@ class ChangeQueue(object): self.name = name else: self.name = '' - self.projects = [] + self.project_branches = [] self._jobs = set() self.queue = [] self.window = window @@ -433,13 +435,24 @@ class ChangeQueue(object): def getJobs(self): return self._jobs - def addProject(self, project): - if project not in self.projects: - self.projects.append(project) + def addProject(self, project, branch): + """ + Adds a project branch combination to the queue. + + The queue will match exactly this combination. If the caller doesn't + care about branches it can supply None (but must supply None as well + when matching) + """ + project_branch = (project, branch) + if project_branch not in self.project_branches: + self.project_branches.append(project_branch) if not self.name: self.name = project.name + def matches(self, project, branch): + return (project, branch) in self.project_branches + def enqueueChange(self, change, event): item = QueueItem(self, change, event) self.enqueueItem(item) @@ -3682,6 +3695,7 @@ class UnparsedConfig(object): self.nodesets = [] self.secrets = [] self.semaphores = [] + self.queues = [] # The list of files/dirs which this represents. self.files_examined = set() @@ -3695,7 +3709,8 @@ class UnparsedConfig(object): # project-branch-path so that we can share them across objects source_contexts = {} for attr in ['pragmas', 'pipelines', 'jobs', 'project_templates', - 'projects', 'nodesets', 'secrets', 'semaphores']: + 'projects', 'nodesets', 'secrets', 'semaphores', + 'queues']: # Make a deep copy of each of our attributes old_objlist = getattr(self, attr) new_objlist = copy.deepcopy(old_objlist) @@ -3725,6 +3740,7 @@ class UnparsedConfig(object): self.nodesets.extend(conf.nodesets) self.secrets.extend(conf.secrets) self.semaphores.extend(conf.semaphores) + self.queues.extend(conf.queues) return if not isinstance(conf, list): @@ -3750,6 +3766,8 @@ class UnparsedConfig(object): self.secrets.append(value) elif key == 'semaphore': self.semaphores.append(value) + elif key == 'queue': + self.queues.append(value) elif key == 'pragma': self.pragmas.append(value) else: @@ -3769,6 +3787,7 @@ class ParsedConfig(object): self.nodesets = [] self.secrets = [] self.semaphores = [] + self.queues = [] def copy(self): r = ParsedConfig() @@ -3781,6 +3800,7 @@ class ParsedConfig(object): r.nodesets = self.nodesets[:] r.secrets = self.secrets[:] r.semaphores = self.semaphores[:] + r.queues = self.queues[:] return r def extend(self, conf): @@ -3793,6 +3813,7 @@ class ParsedConfig(object): self.nodesets.extend(conf.nodesets) self.secrets.extend(conf.secrets) self.semaphores.extend(conf.semaphores) + self.queues.extend(conf.queues) for regex, projects in conf.projects_by_regex.items(): self.projects_by_regex.setdefault(regex, []).extend(projects) return @@ -3826,6 +3847,7 @@ class Layout(object): self.nodesets = {} self.secrets = {} self.semaphores = {} + self.queues = {} self.loading_errors = LoadingErrors() def getJob(self, name): @@ -3934,6 +3956,13 @@ class Layout(object): return self.semaphores[semaphore.name] = semaphore + def addQueue(self, queue): + # Change queues must be unique and cannot be overridden. + if queue.name in self.queues: + raise Exception('Queue %s is already defined' % queue.name) + + self.queues[queue.name] = queue + def addPipeline(self, pipeline): if pipeline.tenant is not self.tenant: raise Exception("Pipeline created for tenant %s " @@ -4293,6 +4322,22 @@ class Semaphore(ConfigObject): self.max == other.max) +class Queue(ConfigObject): + def __init__(self, name, per_branch=False): + super().__init__() + self.name = name + self.per_branch = per_branch + + def __ne__(self, other): + return not self.__eq__(other) + + def __eq__(self, other): + if not isinstance(other, Queue): + return False + return (self.name == other.name and + self.per_branch == other.per_branch) + + class SemaphoreHandler(object): log = logging.getLogger("zuul.SemaphoreHandler") diff --git a/zuul/scheduler.py b/zuul/scheduler.py index ad2baa7cb5..006dee495b 100644 --- a/zuul/scheduler.py +++ b/zuul/scheduler.py @@ -1029,7 +1029,8 @@ class Scheduler(threading.Thread): last_head = None for shared_queue in old_pipeline.queues: # Attempt to keep window sizes from shrinking where possible - new_queue = new_pipeline.getQueue(shared_queue.projects[0]) + project, branch = shared_queue.project_branches[0] + new_queue = new_pipeline.getQueue(project, branch) if new_queue and shared_queue.window and (not static_window): new_queue.window = max(shared_queue.window, new_queue.window_floor)