Merge "Add supercedent pipeline manager"
This commit is contained in:
commit
06df3bb0d1
@ -142,7 +142,7 @@ success, the pipeline reports back to Gerrit with ``Verified`` vote of
|
||||
.. attr:: manager
|
||||
:required:
|
||||
|
||||
There are currently two schemes for managing pipelines:
|
||||
There are three schemes for managing pipelines:
|
||||
|
||||
.. value:: independent
|
||||
|
||||
@ -186,6 +186,23 @@ success, the pipeline reports back to Gerrit with ``Verified`` vote of
|
||||
For more detail on the theory and operation of Zuul's
|
||||
dependent pipeline manager, see: :doc:`gating`.
|
||||
|
||||
.. value:: supercedent
|
||||
|
||||
This is like an independent pipeline, in that every item is
|
||||
distinct, except that items are grouped by project and ref,
|
||||
and only one item for each project-ref is processed at a
|
||||
time. If more than one additional item is enqueued for the
|
||||
project-ref, previously enqueued items which have not started
|
||||
processing are removed.
|
||||
|
||||
In other words, this pipeline manager will only run jobs for
|
||||
the most recent item enqueued for a given project-ref.
|
||||
|
||||
This may be useful for post-merge pipelines which perform
|
||||
artifact builds where only the latest version is of use. In
|
||||
these cases, build resources can be conserved by avoiding
|
||||
building intermediate versions.
|
||||
|
||||
.. attr:: post-review
|
||||
:default: false
|
||||
|
||||
|
@ -0,0 +1,5 @@
|
||||
features:
|
||||
- |
|
||||
The :value:`pipeline.manager.supercedent` pipeline manager has
|
||||
been added. It is designed to make post-merge artifact build
|
||||
pipelines more efficient.
|
@ -1361,6 +1361,7 @@ class RecordingAnsibleJob(zuul.executor.server.AnsibleJob):
|
||||
BuildHistory(name=build.name, result=result, changes=build.changes,
|
||||
node=build.node, uuid=build.unique,
|
||||
ref=build.parameters['zuul']['ref'],
|
||||
newrev=build.parameters['zuul'].get('newrev'),
|
||||
parameters=build.parameters, jobdir=build.jobdir,
|
||||
pipeline=build.parameters['zuul']['pipeline'])
|
||||
)
|
||||
|
21
tests/fixtures/layouts/supercedent.yaml
vendored
Normal file
21
tests/fixtures/layouts/supercedent.yaml
vendored
Normal file
@ -0,0 +1,21 @@
|
||||
- pipeline:
|
||||
name: post
|
||||
manager: supercedent
|
||||
trigger:
|
||||
gerrit:
|
||||
- event: ref-updated
|
||||
ref: ^(?!refs/).*$
|
||||
|
||||
- job:
|
||||
name: base
|
||||
parent: null
|
||||
run: playbooks/base.yaml
|
||||
|
||||
- job:
|
||||
name: post-job
|
||||
|
||||
- project:
|
||||
name: org/project
|
||||
post:
|
||||
jobs:
|
||||
- post-job
|
83
tests/unit/test_supercedent.py
Normal file
83
tests/unit/test_supercedent.py
Normal file
@ -0,0 +1,83 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from tests.base import (
|
||||
ZuulTestCase,
|
||||
simple_layout,
|
||||
)
|
||||
|
||||
|
||||
class TestSupercedent(ZuulTestCase):
|
||||
tenant_config_file = 'config/single-tenant/main.yaml'
|
||||
|
||||
@simple_layout('layouts/supercedent.yaml')
|
||||
def test_supercedent(self):
|
||||
self.executor_server.hold_jobs_in_build = True
|
||||
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
|
||||
arev = A.patchsets[-1]['revision']
|
||||
A.setMerged()
|
||||
self.fake_gerrit.addEvent(A.getRefUpdatedEvent())
|
||||
self.waitUntilSettled()
|
||||
|
||||
# We should never run jobs for more than one change at a time
|
||||
self.assertEqual(len(self.builds), 1)
|
||||
|
||||
# This change should be superceded by the next
|
||||
B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
|
||||
B.setMerged()
|
||||
self.fake_gerrit.addEvent(B.getRefUpdatedEvent())
|
||||
self.waitUntilSettled()
|
||||
|
||||
self.assertEqual(len(self.builds), 1)
|
||||
|
||||
C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C')
|
||||
crev = C.patchsets[-1]['revision']
|
||||
C.setMerged()
|
||||
self.fake_gerrit.addEvent(C.getRefUpdatedEvent())
|
||||
self.waitUntilSettled()
|
||||
|
||||
self.assertEqual(len(self.builds), 1)
|
||||
|
||||
self.executor_server.hold_jobs_in_build = True
|
||||
self.orderedRelease()
|
||||
self.assertHistory([
|
||||
dict(name='post-job', result='SUCCESS', newrev=arev),
|
||||
dict(name='post-job', result='SUCCESS', newrev=crev),
|
||||
], ordered=False)
|
||||
|
||||
@simple_layout('layouts/supercedent.yaml')
|
||||
def test_supercedent_branches(self):
|
||||
self.executor_server.hold_jobs_in_build = True
|
||||
self.create_branch('org/project', 'stable')
|
||||
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
|
||||
arev = A.patchsets[-1]['revision']
|
||||
A.setMerged()
|
||||
self.fake_gerrit.addEvent(A.getRefUpdatedEvent())
|
||||
self.waitUntilSettled()
|
||||
|
||||
self.assertEqual(len(self.builds), 1)
|
||||
|
||||
# This change should not be superceded
|
||||
B = self.fake_gerrit.addFakeChange('org/project', 'stable', 'B')
|
||||
brev = B.patchsets[-1]['revision']
|
||||
B.setMerged()
|
||||
self.fake_gerrit.addEvent(B.getRefUpdatedEvent())
|
||||
self.waitUntilSettled()
|
||||
|
||||
self.assertEqual(len(self.builds), 2)
|
||||
|
||||
self.executor_server.hold_jobs_in_build = True
|
||||
self.orderedRelease()
|
||||
self.assertHistory([
|
||||
dict(name='post-job', result='SUCCESS', newrev=arev),
|
||||
dict(name='post-job', result='SUCCESS', newrev=brev),
|
||||
], ordered=False)
|
@ -28,6 +28,7 @@ from zuul import model
|
||||
from zuul.lib import yamlutil as yaml
|
||||
import zuul.manager.dependent
|
||||
import zuul.manager.independent
|
||||
import zuul.manager.supercedent
|
||||
from zuul import change_matcher
|
||||
from zuul.lib import encryption
|
||||
|
||||
@ -1012,7 +1013,8 @@ class PipelineParser(object):
|
||||
|
||||
def getSchema(self):
|
||||
manager = vs.Any('independent',
|
||||
'dependent')
|
||||
'dependent',
|
||||
'supercedent')
|
||||
|
||||
precedence = vs.Any('normal', 'low', 'high')
|
||||
|
||||
@ -1115,6 +1117,9 @@ class PipelineParser(object):
|
||||
elif manager_name == 'independent':
|
||||
manager = zuul.manager.independent.IndependentPipelineManager(
|
||||
self.pcontext.scheduler, pipeline)
|
||||
elif manager_name == 'supercedent':
|
||||
manager = zuul.manager.supercedent.SupercedentPipelineManager(
|
||||
self.pcontext.scheduler, pipeline)
|
||||
|
||||
pipeline.setManager(manager)
|
||||
|
||||
|
@ -610,7 +610,9 @@ class PipelineManager(object):
|
||||
self.cancelJobs(item, prime=False)
|
||||
else:
|
||||
item_ahead_merged = False
|
||||
if (item_ahead and item_ahead.change.is_merged):
|
||||
if (item_ahead and
|
||||
hasattr(item_ahead.change, 'is_merged') and
|
||||
item_ahead.change.is_merged):
|
||||
item_ahead_merged = True
|
||||
if (item_ahead != nnfi and not item_ahead_merged):
|
||||
# Our current base is different than what we expected,
|
||||
|
71
zuul/manager/supercedent.py
Normal file
71
zuul/manager/supercedent.py
Normal file
@ -0,0 +1,71 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from zuul import model
|
||||
from zuul.manager import PipelineManager, DynamicChangeQueueContextManager
|
||||
|
||||
|
||||
class SupercedentPipelineManager(PipelineManager):
|
||||
"""PipelineManager with one queue per project and a window of 1"""
|
||||
|
||||
changes_merge = False
|
||||
|
||||
def getChangeQueue(self, change, existing=None):
|
||||
# creates a new change queue for every project-ref
|
||||
# combination.
|
||||
if existing:
|
||||
return DynamicChangeQueueContextManager(existing)
|
||||
|
||||
# Don't use Pipeline.getQueue to find an existing queue
|
||||
# because we're matching project and ref.
|
||||
for queue in self.pipeline.queues:
|
||||
if (queue.queue[-1].change.project == change.project and
|
||||
queue.queue[-1].change.ref == change.ref):
|
||||
self.log.debug("Found existing queue %s", queue)
|
||||
return DynamicChangeQueueContextManager(queue)
|
||||
change_queue = model.ChangeQueue(
|
||||
self.pipeline,
|
||||
window=1,
|
||||
window_floor=1,
|
||||
window_increase_type='none',
|
||||
window_decrease_type='none')
|
||||
change_queue.addProject(change.project)
|
||||
self.pipeline.addQueue(change_queue)
|
||||
self.log.debug("Dynamically created queue %s", change_queue)
|
||||
return DynamicChangeQueueContextManager(change_queue)
|
||||
|
||||
def _pruneQueues(self):
|
||||
# Leave the first item in the queue, as it's running, and the
|
||||
# last item, as it's the most recent, but remove any items in
|
||||
# between. This is what causes the last item to "supercede"
|
||||
# any previously enqueued items (which we know aren't running
|
||||
# jobs because the window size is 1).
|
||||
for queue in self.pipeline.queues:
|
||||
remove = queue.queue[1:-1]
|
||||
for item in remove:
|
||||
self.log.debug("Item %s is superceded by %s, removing" %
|
||||
(item, queue.queue[-1]))
|
||||
self.removeItem(item)
|
||||
|
||||
def addChange(self, *args, **kw):
|
||||
ret = super(SupercedentPipelineManager, self).addChange(
|
||||
*args, **kw)
|
||||
if ret:
|
||||
self._pruneQueues()
|
||||
return ret
|
||||
|
||||
def dequeueItem(self, item):
|
||||
super(SupercedentPipelineManager, self).dequeueItem(item)
|
||||
# A supercedent pipeline manager dynamically removes empty
|
||||
# queues
|
||||
if not item.queue.queue:
|
||||
self.pipeline.removeQueue(item.queue)
|
Loading…
Reference in New Issue
Block a user