
When we process a pipeline, we iterate over every shared change queue, then iterate over every item in each queue. When the last item in a queue is complete and removed from an independent pipeline, the queue itself is also removed. Since we iterate over the list without making a copy first, we might then skip over the next queue. We didn't notice this because any time we change a pipeline, we process it again immediately afterwords. Nonetheless, this could lead to starting jobs which we immediately cancel and other inefficiencies. What did make us take notice is that we accumulate the list of changes in a pipeline and then use that to prune the change cache. If we skip an item we would omit that change from the list of relevant changes and delete it from the cache. The timing for this has to be just right on two schedulers, but it happened in OpenDev. To correct this, make a copy of the queue before iterating. It is not feasible to reasonably test this because of the fact that we immediately run the pipeline again after changing it -- there is no opportunity for the test framework to interrupt processing between the two runs. Change-Id: Iaf0bb998c069ad6e4f7c53ca5fb05bd9675410a0
81 lines
3.3 KiB
Python
81 lines
3.3 KiB
Python
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
from zuul import model
|
|
from zuul.lib.logutil import get_annotated_logger
|
|
from zuul.manager import PipelineManager, DynamicChangeQueueContextManager
|
|
|
|
|
|
class SupercedentPipelineManager(PipelineManager):
|
|
"""PipelineManager with one queue per project and a window of 1"""
|
|
|
|
changes_merge = False
|
|
type = 'supercedent'
|
|
|
|
def getChangeQueue(self, change, event, existing=None):
|
|
log = get_annotated_logger(self.log, event)
|
|
|
|
# creates a new change queue for every project-ref
|
|
# combination.
|
|
if existing:
|
|
return DynamicChangeQueueContextManager(existing)
|
|
|
|
# Don't use Pipeline.getQueue to find an existing queue
|
|
# because we're matching project and (branch or ref).
|
|
for queue in self.pipeline.queues:
|
|
if (queue.queue[-1].change.project == change.project and
|
|
((hasattr(change, 'branch') and
|
|
hasattr(queue.queue[-1].change, 'branch') and
|
|
queue.queue[-1].change.branch == change.branch) or
|
|
queue.queue[-1].change.ref == change.ref)):
|
|
log.debug("Found existing queue %s", queue)
|
|
return DynamicChangeQueueContextManager(queue)
|
|
change_queue = model.ChangeQueue.new(
|
|
self.pipeline.manager.current_context,
|
|
pipeline=self.pipeline,
|
|
window=1,
|
|
window_floor=1,
|
|
window_increase_type='none',
|
|
window_decrease_type='none',
|
|
dynamic=True)
|
|
change_queue.addProject(change.project, None)
|
|
self.pipeline.addQueue(change_queue)
|
|
log.debug("Dynamically created queue %s", change_queue)
|
|
return DynamicChangeQueueContextManager(change_queue)
|
|
|
|
def _pruneQueues(self):
|
|
# Leave the first item in the queue, as it's running, and the
|
|
# last item, as it's the most recent, but remove any items in
|
|
# between. This is what causes the last item to "supercede"
|
|
# any previously enqueued items (which we know aren't running
|
|
# jobs because the window size is 1).
|
|
for queue in self.pipeline.queues[:]:
|
|
remove = queue.queue[1:-1]
|
|
for item in remove:
|
|
self.log.debug("Item %s is superceded by %s, removing" %
|
|
(item, queue.queue[-1]))
|
|
self.removeItem(item)
|
|
|
|
def addChange(self, *args, **kw):
|
|
ret = super(SupercedentPipelineManager, self).addChange(
|
|
*args, **kw)
|
|
if ret:
|
|
self._pruneQueues()
|
|
return ret
|
|
|
|
def dequeueItem(self, item):
|
|
super(SupercedentPipelineManager, self).dequeueItem(item)
|
|
# A supercedent pipeline manager dynamically removes empty
|
|
# queues
|
|
if not item.queue.queue:
|
|
self.pipeline.removeQueue(item.queue)
|