Fix mutation while iterating over queues

When we process a pipeline, we iterate over every shared change
queue, then iterate over every item in each queue.  When the last
item in a queue is complete and removed from an independent
pipeline, the queue itself is also removed.  Since we iterate over
the list without making a copy first, we might then skip over the
next queue.  We didn't notice this because any time we change a
pipeline, we process it again immediately afterwords.  Nonetheless,
this could lead to starting jobs which we immediately cancel and
other inefficiencies.

What did make us take notice is that we accumulate the list of
changes in a pipeline and then use that to prune the change cache.
If we skip an item we would omit that change from the list of
relevant changes and delete it from the cache.  The timing for this
has to be just right on two schedulers, but it happened in OpenDev.

To correct this, make a copy of the queue before iterating.

It is not feasible to reasonably test this because of the fact that
we immediately run the pipeline again after changing it -- there is
no opportunity for the test framework to interrupt processing between
the two runs.

Change-Id: Iaf0bb998c069ad6e4f7c53ca5fb05bd9675410a0
This commit is contained in:
James E. Blair 2021-12-01 15:22:25 -08:00
parent ac9b62e4b5
commit da9df89591
2 changed files with 2 additions and 2 deletions

View File

@ -1483,7 +1483,7 @@ class PipelineManager(metaclass=ABCMeta):
self.log.debug("Starting queue processor: %s" % self.pipeline.name)
changed = False
change_keys = set()
for queue in self.pipeline.queues:
for queue in self.pipeline.queues[:]:
queue_changed = False
nnfi = None # Nearest non-failing item
for item in queue.queue[:]:

View File

@ -58,7 +58,7 @@ class SupercedentPipelineManager(PipelineManager):
# between. This is what causes the last item to "supercede"
# any previously enqueued items (which we know aren't running
# jobs because the window size is 1).
for queue in self.pipeline.queues:
for queue in self.pipeline.queues[:]:
remove = queue.queue[1:-1]
for item in remove:
self.log.debug("Item %s is superceded by %s, removing" %