Refresh pipeline change list and status when refreshing the state
The ZKObject uses an internal hash to determine whether it has been updated since the last time it was read so that we can avoid unnecessary ZK writes. Therefore we must ensure that we read the latest data before we make any updates. However, we were not doing that for the pipeline summary. That could permit the following sequence: * Scheduler A initializes the pipeline summary (empty); * Scheduler B enqueues an item in the pipeline and updates the summary to include the item; * Scheduler A removes the item and updates the summary to exclude the item; but that is the same content as it had previously so the ZK write is skipped. We are left with a summary with the item included in the pipeline when it actually is not. Note that the attached test runs no jobs -- that's because in order for this case to hit, the change must be added and removed from the pipeline in only two cycles, which is only possible with projects that do not participate in the pipeline. To correct this, refresh the pipeline summary when we refresh the pipeline state. Additionally, do the same for the change list; though the problem is much less likely to occur in this case since the change list is refreshed during tenant trigger event forwarding which occurs immediately prior to pipeline processing. But there is still a window where we could refresh the change list, and another scheduler could process a pipeline in the interim. This is more defensive. Change-Id: I365c19ccb567a53f35c68bfd03acf42e8739345a
This commit is contained in:
parent
7b93424cc8
commit
72e6234157
|
@ -0,0 +1,20 @@
|
|||
- pipeline:
|
||||
name: check
|
||||
manager: independent
|
||||
trigger:
|
||||
gerrit:
|
||||
- event: patchset-created
|
||||
success:
|
||||
gerrit:
|
||||
Verified: 1
|
||||
failure:
|
||||
gerrit:
|
||||
Verified: -1
|
||||
|
||||
- job:
|
||||
name: base
|
||||
parent: null
|
||||
run: playbooks/base.yaml
|
||||
|
||||
- project:
|
||||
name: org/project
|
|
@ -55,6 +55,45 @@ class TestScaleOutScheduler(ZuulTestCase):
|
|||
dict(name='project-test2', result='SUCCESS', changes='1,1'),
|
||||
], ordered=False)
|
||||
|
||||
@simple_layout('layouts/multi-scheduler-status.yaml')
|
||||
def test_multi_scheduler_status(self):
|
||||
self.hold_merge_jobs_in_queue = True
|
||||
|
||||
first = self.scheds.first
|
||||
second = self.createScheduler()
|
||||
second.start()
|
||||
self.assertEqual(len(self.scheds), 2)
|
||||
self.waitUntilSettled()
|
||||
|
||||
self.log.debug("Force second scheduler to process check")
|
||||
with first.sched.run_handler_lock:
|
||||
event = zuul.model.PipelinePostConfigEvent()
|
||||
first.sched.pipeline_management_events[
|
||||
'tenant-one']['check'].put(event, needs_result=False)
|
||||
self.waitUntilSettled(matcher=[second])
|
||||
|
||||
self.log.debug("Add change in first scheduler")
|
||||
with second.sched.run_handler_lock:
|
||||
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
|
||||
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
|
||||
self.waitUntilSettled(matcher=[first])
|
||||
|
||||
self.log.debug("Finish change in second scheduler")
|
||||
with first.sched.run_handler_lock:
|
||||
self.hold_merge_jobs_in_queue = False
|
||||
self.merger_api.release()
|
||||
self.waitUntilSettled(matcher=[second])
|
||||
|
||||
self.assertHistory([])
|
||||
|
||||
tenant = first.sched.abide.tenants['tenant-one']
|
||||
pipeline = tenant.layout.pipelines['check']
|
||||
summary = zuul.model.PipelineSummary()
|
||||
summary._set(pipeline=pipeline)
|
||||
context = self.createZKContext()
|
||||
summary.refresh(context)
|
||||
self.assertEqual(summary.status['change_queues'], [])
|
||||
|
||||
def test_config_priming(self):
|
||||
# Wait until scheduler is primed
|
||||
self.waitUntilSettled()
|
||||
|
|
|
@ -1961,7 +1961,10 @@ class Scheduler(threading.Thread):
|
|||
stats_key = f'zuul.tenant.{tenant.name}.pipeline.{pipeline.name}'
|
||||
ctx = pipeline.manager.current_context
|
||||
with self.statsd_timer(f'{stats_key}.refresh'):
|
||||
pipeline.change_list.refresh(ctx)
|
||||
pipeline.summary.refresh(ctx)
|
||||
pipeline.state.refresh(ctx)
|
||||
|
||||
pipeline.state.setDirty(self.zk_client.client)
|
||||
if pipeline.state.old_queues:
|
||||
self._reenqueuePipeline(tenant, pipeline, ctx)
|
||||
|
|
Loading…
Reference in New Issue