zuul/zuul/manager/supercedent.py

91 lines
3.7 KiB
Python

# Copyright 2021, 2023-2024 Acme Gating, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from zuul import model
from zuul.lib.logutil import get_annotated_logger
from zuul.manager import PipelineManager, DynamicChangeQueueContextManager
class SupercedentPipelineManager(PipelineManager):
"""PipelineManager with one queue per project and a window of 1"""
changes_merge = False
type = 'supercedent'
def getChangeQueue(self, change, event, existing=None):
log = get_annotated_logger(self.log, event)
# creates a new change queue for every project-ref
# combination.
if existing:
return DynamicChangeQueueContextManager(existing)
# Don't use Pipeline.getQueue to find an existing queue
# because we're matching project and (branch or ref).
for queue in self.pipeline.queues:
if (queue.queue[-1].changes[0].project == change.project and
((hasattr(change, 'branch') and
hasattr(queue.queue[-1].changes[0], 'branch') and
queue.queue[-1].changes[0].branch == change.branch) or
queue.queue[-1].changes[0].ref == change.ref)):
log.debug("Found existing queue %s", queue)
return DynamicChangeQueueContextManager(queue)
change_queue = model.ChangeQueue.new(
self.pipeline.manager.current_context,
pipeline=self.pipeline,
window=1,
window_floor=1,
window_ceiling=1,
window_increase_type='none',
window_decrease_type='none',
dynamic=True)
change_queue.addProject(change.project, None)
self.pipeline.addQueue(change_queue)
log.debug("Dynamically created queue %s", change_queue)
return DynamicChangeQueueContextManager(change_queue)
def _pruneQueues(self):
# Leave the first item in the queue, as it's running, and the
# last item, as it's the most recent, but remove any items in
# between. This is what causes the last item to "supercede"
# any previously enqueued items (which we know aren't running
# jobs because the window size is 1).
for queue in self.pipeline.queues[:]:
remove = queue.queue[1:-1]
for item in remove:
self.log.debug("Item %s is superceded by %s, removing" %
(item, queue.queue[-1]))
self.removeItem(item)
def cycleForChange(self, *args, **kw):
# Supercedent pipelines ignore circular dependencies and
# individually enqueue each change that matches the trigger.
# This is because they ignore shared queues and instead create
# a virtual queue for each project-ref.
return []
def addChange(self, *args, **kw):
ret = super(SupercedentPipelineManager, self).addChange(
*args, **kw)
if ret:
self._pruneQueues()
return ret
def dequeueItem(self, item, quiet=False):
super(SupercedentPipelineManager, self).dequeueItem(item, quiet)
# A supercedent pipeline manager dynamically removes empty
# queues
if not item.queue.queue:
self.pipeline.removeQueue(item.queue)