Fix erroneous semaphore leak cleanup
When a scheduler starts, we run some cleanup routines in case this is the first scheduler start after an unclean shutdown. One of the routines detects and releases leaked semaphores. There were two issues with this which caused us to erroneously detect a semaphore as leaked and delete it. Fixing either alone would have been sufficient to avoid the issue; fixing both is correct. 1) The pipeline state object was not serializing its layout uuid. This means that every time a new pipeline state object is created, it is assumed that its layout is newer than what is in ZooKeeper, and a re-enqueue must be performed. All existing queue items are moved to the old_queues attribute, and a scheduler (possibly a different one) will re-enqueue them. 2) The semaphore leak detection method did not ask for "old" queue items. This means that if a re-enqueue is in progress, it will only see the queue items which have already been re-enqueued. These two issues combined caused us to fairly reliably move all of the queue items out of the active queue and then look at only the active queue to determine which semaphores are in use (which is to say, none) and then release those semaphores. This patch corrects both issues. Change-Id: Ibf5fca03bb3bd33fefdf1982b8245be0e09df567
This commit is contained in:
parent
38fb68ef50
commit
793a3de2b8
|
@ -0,0 +1,37 @@
|
|||
- pipeline:
|
||||
name: check
|
||||
manager: independent
|
||||
trigger:
|
||||
gerrit:
|
||||
- event: patchset-created
|
||||
success:
|
||||
gerrit:
|
||||
Verified: 1
|
||||
failure:
|
||||
gerrit:
|
||||
Verified: -1
|
||||
|
||||
- job:
|
||||
name: base
|
||||
parent: null
|
||||
run: playbooks/base.yaml
|
||||
|
||||
- job:
|
||||
name: test1
|
||||
run: playbooks/check.yaml
|
||||
semaphore: test-semaphore
|
||||
|
||||
- job:
|
||||
name: test2
|
||||
run: playbooks/check.yaml
|
||||
semaphore: test-semaphore
|
||||
|
||||
- semaphore:
|
||||
name: test-semaphore
|
||||
|
||||
- project:
|
||||
name: org/project
|
||||
check:
|
||||
jobs:
|
||||
- test1
|
||||
- test2
|
|
@ -15,7 +15,7 @@
|
|||
|
||||
import zuul.model
|
||||
|
||||
from tests.base import iterate_timeout, ZuulTestCase
|
||||
from tests.base import iterate_timeout, ZuulTestCase, simple_layout
|
||||
|
||||
|
||||
class TestScaleOutScheduler(ZuulTestCase):
|
||||
|
@ -209,3 +209,55 @@ class TestScaleOutScheduler(ZuulTestCase):
|
|||
# second object can read now.
|
||||
summary2.refresh(context)
|
||||
self.assertNotEqual(summary2.status, {})
|
||||
|
||||
@simple_layout('layouts/semaphore.yaml')
|
||||
def test_semaphore(self):
|
||||
self.executor_server.hold_jobs_in_build = True
|
||||
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
|
||||
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
|
||||
self.waitUntilSettled()
|
||||
self.assertEqual(len(self.builds), 1)
|
||||
self.assertEqual(self.builds[0].name, 'test1')
|
||||
self.assertHistory([])
|
||||
|
||||
tenant = self.scheds.first.sched.abide.tenants['tenant-one']
|
||||
semaphore = tenant.semaphore_handler.getSemaphores()[0]
|
||||
holders = tenant.semaphore_handler.semaphoreHolders(semaphore)
|
||||
self.assertEqual(len(holders), 1)
|
||||
|
||||
# Start a second scheduler so that it runs through the initial
|
||||
# cleanup processes.
|
||||
app = self.createScheduler()
|
||||
# Hold the lock on the second scheduler so that if any events
|
||||
# happen, they are processed by the first scheduler (this lets
|
||||
# them be as out of sync as possible).
|
||||
with app.sched.run_handler_lock:
|
||||
app.start()
|
||||
self.assertEqual(len(self.scheds), 2)
|
||||
self.waitUntilSettled(matcher=[self.scheds.first])
|
||||
# Wait until initial cleanup is run
|
||||
app.sched.start_cleanup_thread.join()
|
||||
# We should not have released the semaphore
|
||||
holders = tenant.semaphore_handler.semaphoreHolders(semaphore)
|
||||
self.assertEqual(len(holders), 1)
|
||||
|
||||
self.executor_server.release()
|
||||
self.waitUntilSettled()
|
||||
self.assertEqual(len(self.builds), 1)
|
||||
self.assertEqual(self.builds[0].name, 'test2')
|
||||
self.assertHistory([
|
||||
dict(name='test1', result='SUCCESS', changes='1,1'),
|
||||
], ordered=False)
|
||||
holders = tenant.semaphore_handler.semaphoreHolders(semaphore)
|
||||
self.assertEqual(len(holders), 1)
|
||||
|
||||
self.executor_server.release()
|
||||
self.waitUntilSettled()
|
||||
self.assertEqual(len(self.builds), 0)
|
||||
self.assertHistory([
|
||||
dict(name='test1', result='SUCCESS', changes='1,1'),
|
||||
dict(name='test2', result='SUCCESS', changes='1,1'),
|
||||
], ordered=False)
|
||||
|
||||
holders = tenant.semaphore_handler.semaphoreHolders(semaphore)
|
||||
self.assertEqual(len(holders), 0)
|
||||
|
|
|
@ -656,6 +656,7 @@ class PipelineState(zkobject.ZKObject):
|
|||
"disabled": self.disabled,
|
||||
"queues": [q.getPath() for q in self.queues],
|
||||
"old_queues": [q.getPath() for q in self.old_queues],
|
||||
"layout_uuid": self.layout_uuid,
|
||||
}
|
||||
return json.dumps(data, sort_keys=True).encode("utf8")
|
||||
|
||||
|
|
|
@ -203,7 +203,7 @@ class SemaphoreHandler(ZooKeeperSimpleBase):
|
|||
|
||||
running_handles = set()
|
||||
for pipeline in self.layout.pipelines.values():
|
||||
for item in pipeline.getAllItems():
|
||||
for item in pipeline.getAllItems(include_old=True):
|
||||
for job in item.getJobs():
|
||||
running_handles.add(f"{item.uuid}-{job.name}")
|
||||
|
||||
|
|
Loading…
Reference in New Issue