Clear pipeline change cache at start of refresh

The pipeline change cache is used to avoid repeated cache lookups
for change dependencies, but it is not as robust as the "real" source
change cache at managing the lifetime of change objects.  If it is
used to store change objects in one scheduler which are later
dequeued by a second scheduler, then the next time those changes show
up in that pipeline on the first scheduler, the first scheduler may
use old change objects instead of new ones from the ZooKeeper cache.
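
To make the failure mode concrete, here is a minimal sketch of the
hazard.  PipelineChangeCache, Change, and the change keys below are
hypothetical stand-ins for illustration, not Zuul's actual classes
or API:

# Illustrative sketch only: hypothetical names, not Zuul's real code.

class Change:
    def __init__(self, key, revision):
        self.key = key
        self.revision = revision


class PipelineChangeCache:
    """Per-scheduler, per-pipeline map of change objects."""
    def __init__(self):
        self._cache = {}

    def get(self, key):
        return self._cache.get(key)

    def put(self, key, change):
        self._cache[key] = change


# Scheduler 1 caches a change object while resolving a commit
# dependency.
sched1_pipeline_cache = PipelineChangeCache()
sched1_pipeline_cache.put('gerrit/1,1', Change('gerrit/1,1', revision=1))

# Scheduler 2 dequeues the item; scheduler 1 never observes the
# dequeue, so its per-pipeline entry is never cleaned up.  Meanwhile
# the shared ZooKeeper-backed source cache holds a newer object for
# the same key.
zk_source_cache = {'gerrit/1,1': Change('gerrit/1,1', revision=2)}

# When the change re-enters the pipeline on scheduler 1, the stale
# local hit shadows the current object from the source cache.
stale = sched1_pipeline_cache.get('gerrit/1,1')
fresh = zk_source_cache['gerrit/1,1']
assert stale.revision == 1 and fresh.revision == 2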

To fix this, clear the pipeline change cache before we refresh
the pipeline state.  This will cause some extra ZK ChangeCache
hits to repopulate the cache, but only in the case of commit
dependencies (the pipeline change cache isn't used for git
dependencies, or for the changes in the pipeline themselves unless
they are commit dependencies).
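
A minimal sketch of the fix's shape, again with hypothetical names
and signatures (the real change clears the cache in
PipelineState.deserialize, as shown in the diff below): clear before
rebuilding state, then repopulate lazily on the first miss:

def refresh_pipeline_state(pipeline_cache, rebuild_from_zk):
    # Drop possibly-stale change objects before the state refresh.
    pipeline_cache.clear()
    rebuild_from_zk()


def resolve_commit_dependency(key, pipeline_cache, source_change_cache):
    change = pipeline_cache.get(key)
    if change is None:
        # Miss after the clear: one extra hit on the ZooKeeper-backed
        # ChangeCache, after which the object is cached locally again.
        change = source_change_cache.get(key)
        pipeline_cache.put(key, change)
    return change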

Change-Id: I0a20dc972917440d4f3e8bb59295b77c13913a48
James E. Blair 2022-07-27 09:52:43 -07:00
parent 1636ecd23b
commit a61ec1bb5f
3 changed files with 81 additions and 0 deletions


@@ -55,6 +55,79 @@ class TestScaleOutScheduler(ZuulTestCase):
            dict(name='project-test2', result='SUCCESS', changes='1,1'),
        ], ordered=False)

    def test_pipeline_cache_clear(self):
        # Test that the pipeline cache on a second scheduler isn't
        # holding old change objects.

        # Hold jobs in build
        sched1 = self.scheds.first
        self.executor_server.hold_jobs_in_build = True

        # We need a pair of changes in order to populate the pipeline
        # change cache (a single change doesn't activate the cache,
        # it's for dependencies).
        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
        A.addApproval('Code-Review', 2)
        B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
        B.addApproval('Code-Review', 2)
        B.addApproval('Approved', 1)
        B.setDependsOn(A, 1)

        # Fail a job
        self.executor_server.failJob('project-test1', A)

        # Enqueue into gate with scheduler 1
        self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
        self.waitUntilSettled()

        # Start scheduler 2
        sched2 = self.createScheduler()
        sched2.start()
        self.assertEqual(len(self.scheds), 2)

        # Pause scheduler 1
        with sched1.sched.run_handler_lock:
            # Release jobs
            self.executor_server.hold_jobs_in_build = False
            self.executor_server.release()

            # Wait for scheduler 2 to dequeue
            self.waitUntilSettled(matcher=[sched2])

        # Unpause scheduler 1
        self.assertEqual(A.data['status'], 'NEW')
        self.assertEqual(B.data['status'], 'NEW')

        # Clear zk change cache
        self.fake_gerrit._change_cache.prune([], max_age=0)

        # At this point, scheduler 1 should have a bogus change entry
        # in the pipeline cache because scheduler 2 performed the
        # dequeue so scheduler 1 never cleaned up its cache.

        self.executor_server.fail_tests.clear()
        self.executor_server.hold_jobs_in_build = True

        # Pause scheduler 1
        with sched1.sched.run_handler_lock:
            # Enqueue into gate with scheduler 2
            self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
            self.waitUntilSettled(matcher=[sched2])

        # Pause scheduler 2
        with sched2.sched.run_handler_lock:
            # Make sure that scheduler 1 does some pipeline runs which
            # reconstitute state from ZK.  This gives it the
            # opportunity to use old cache data if we don't clear it.

            # Release job1
            self.executor_server.release()
            self.waitUntilSettled(matcher=[sched1])

            # Release job2
            self.executor_server.hold_jobs_in_build = False
            self.executor_server.release()

            # Wait for scheduler 1 to merge change
            self.waitUntilSettled(matcher=[sched1])

        self.assertEqual(A.data['status'], 'MERGED')
        self.assertEqual(B.data['status'], 'MERGED')

    @simple_layout('layouts/multi-scheduler-status.yaml')
    def test_multi_scheduler_status(self):
        self.hold_merge_jobs_in_queue = True


@@ -235,6 +235,9 @@ class PipelineManager(metaclass=ABCMeta):
            resolved_changes.append(change)
        return resolved_changes

    def clearCache(self):
        self._change_cache.clear()

    def _maintainCache(self):
        active_layout_uuids = set()
        referenced_change_keys = set()


@@ -694,6 +694,11 @@ class PipelineState(zkobject.ZKObject):
        return json.dumps(data, sort_keys=True).encode("utf8")

    def deserialize(self, raw, context):
        # We may have old change objects in the pipeline cache, so
        # make sure they are the same objects we would get from the
        # source change cache.
        self.pipeline.manager.clearCache()

        data = super().deserialize(raw, context)
        existing_queues = {
            q.getPath(): q for q in self.queues + self.old_queues
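
Note on placement: deserialize() runs each time a scheduler refreshes
the pipeline state from ZooKeeper, so clearing the manager's
per-pipeline cache here is what implements "at start of refresh" from
the commit title; any change objects needed afterwards are re-fetched
from the authoritative source change cache.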