Periodically clean up leaked semaphores

If there is a bug in Zuul, or the scheduler crashes, we may leak
semaphores, and now that they are held in ZK, that leak is permanent.
To avoid requiring operators to manually edit or remove ZK nodes,
introduce a periodic semaphore cleanup.

This identifies any semaphores which are held by jobs that are not
running and may therefore be released.  It runs once an hour, outside
the main scheduler thread.  It also runs immediately after the first
reconfiguration, so that if a scheduler crashes and is restarted,
leaked semaphores are cleaned up quickly.
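
For reference, the scheduling pattern boils down to a stop-event wait
loop whose first wait returns immediately; the following is a minimal
standalone sketch of that idea only (the names here are illustrative,
not Zuul's):

    import threading
    import time

    CLEANUP_INTERVAL = 60 * 60  # one hour between passes

    def run_cleanup_loop(stop_event, is_configured, do_cleanup):
        # The first pass runs as soon as the initial configuration is
        # complete; later passes run once per interval.
        interval = 0
        while not stop_event.wait(interval):
            try:
                if not is_configured():
                    time.sleep(1)  # not configured yet; check again soon
                    continue
                do_cleanup()
            except Exception:
                pass  # the real loop logs the traceback and keeps going
            interval = CLEANUP_INTERVAL

    stop = threading.Event()
    t = threading.Thread(
        target=run_cleanup_loop,
        args=(stop, lambda: True, lambda: print("cleanup pass")),
        daemon=True)
    t.start()
    time.sleep(0.1)  # let the first pass run
    stop.set()
    t.join()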

The general algorithm should be safe with multiple schedulers, though
the details of how we should traverse the entire abide are unclear at
this point, so this may need to change.  But it is as
forward-compatible as I can make it right now.  We may also decide
that once all running builds are in ZK, we no longer need this.
Regardless, it is needed now, and should be adaptable to future
situations if we want it.
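
The race-avoidance idea, roughly: list the semaphore holders, snapshot
the handles of running jobs, list the holders again, and only treat
handles present in both holder listings as candidates; anything that
appeared or vanished between the two reads is assumed to belong to a
job that just started or finished.  A toy illustration of that set
logic (hypothetical helper, not part of this change):

    def find_leaked(holders_before, running_handles, holders_after):
        # Only handles seen in both reads are considered stable; of
        # those, the ones with no running job have leaked.
        stable = set(holders_before) & set(holders_after)
        return stable - set(running_handles)

    # "b" finished between the two reads and is skipped; "c" is held
    # in both reads with no running job, so it is reported as leaked.
    assert find_leaked({"a", "b", "c"}, {"a"}, {"a", "c"}) == {"c"}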

Change-Id: I023ddabb13047a1ed95a26a8f5793de36817c191
James E. Blair 2021-04-01 18:54:59 -07:00
parent 991d8280ac
commit c10c46a8ac
3 changed files with 145 additions and 13 deletions


@@ -8125,6 +8125,56 @@ class TestSemaphore(ZuulTestCase):
         self.executor_server.release()
         self.waitUntilSettled()
 
+    def test_semaphore_handler_cleanup(self):
+        "Test the semaphore handler leak cleanup"
+        self.executor_server.hold_jobs_in_build = True
+        tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
+
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+        self.assertEqual(
+            len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
+            0)
+
+        self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+        self.waitUntilSettled()
+        self.assertEqual(
+            len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
+            1)
+
+        # Save some variables for later use while the job is running
+        check_pipeline = tenant.layout.pipelines['check']
+        item = check_pipeline.getAllItems()[0]
+        job = item.getJob('semaphore-one-test1')
+
+        tenant.semaphore_handler.cleanupLeaks()
+        # Nothing has leaked; our handle should be present.
+        self.assertEqual(
+            len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
+            1)
+
+        self.executor_server.hold_jobs_in_build = False
+        self.executor_server.release()
+        self.waitUntilSettled()
+
+        # Make sure the semaphore is released normally
+        self.assertEqual(
+            len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
+            0)
+
+        # Use our previously saved data to simulate a leaked semaphore
+        tenant.semaphore_handler.acquire(item, job, False)
+        self.assertEqual(
+            len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
+            1)
+
+        tenant.semaphore_handler.cleanupLeaks()
+        # Make sure the leaked semaphore is cleaned up
+        self.assertEqual(
+            len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
+            0)
+
+
 class TestSemaphoreMultiTenant(ZuulTestCase):
     tenant_config_file = 'config/multi-tenant-semaphore/main.yaml'


@@ -99,6 +99,7 @@ class Scheduler(threading.Thread):
     log = logging.getLogger("zuul.Scheduler")
 
     _stats_interval = 30
+    _cleanup_interval = 60 * 60
     _merger_client_class = MergeClient
 
     # Number of seconds past node expiration a hold request will remain
@@ -129,6 +130,9 @@ class Scheduler(threading.Thread):
         self.stats_thread = threading.Thread(target=self.runStats)
         self.stats_thread.daemon = True
         self.stats_stop = threading.Event()
+        self.cleanup_thread = threading.Thread(target=self.runCleanup)
+        self.cleanup_thread.daemon = True
+        self.cleanup_stop = threading.Event()
         # TODO(jeblair): fix this
         # Despite triggers being part of the pipeline, there is one trigger set
         # per scheduler. The pipeline handles the trigger filters but since
@@ -219,15 +223,18 @@
         self.rpc.start()
         self.rpc_slow.start()
         self.stats_thread.start()
+        self.cleanup_thread.start()
         self.zk_component.set('state', self.zk_component.RUNNING)
 
     def stop(self):
         self._stopped = True
         self.zk_component.set('state', self.zk_component.STOPPED)
         self.stats_stop.set()
+        self.cleanup_stop.set()
         self.stopConnections()
         self.wake_event.set()
         self.stats_thread.join()
+        self.cleanup_thread.join()
         self.rpc.stop()
         self.rpc.join()
         self.rpc_slow.stop()
@@ -326,6 +333,31 @@
             self.statsd.gauge('zuul.scheduler.eventqueues.management',
                               len(self.management_events))
 
+    def runCleanup(self):
+        # Run the first cleanup immediately after the first
+        # reconfiguration.
+        interval = 0
+        while not self.cleanup_stop.wait(interval):
+            try:
+                if not self.last_reconfigured:
+                    time.sleep(1)
+                    continue
+                self._runCleanup()
+            except Exception:
+                self.log.exception("Error in periodic cleanup:")
+            interval = self._cleanup_interval
+
+    def _runCleanup(self):
+        # Get the layout lock to make sure the abide doesn't change
+        # under us.
+        with self.layout_lock:
+            self.log.debug("Starting semaphore cleanup")
+            for tenant in self.abide.tenants.values():
+                try:
+                    tenant.semaphore_handler.cleanupLeaks()
+                except Exception:
+                    self.log.exception("Error in semaphore cleanup:")
+
     def addTriggerEvent(self, driver_name, event):
         event.arrived_at_scheduler_timestamp = time.time()
         self.trigger_events.put(driver_name, event)


@@ -1,4 +1,5 @@
 # Copyright 2021 BMW Group
+# Copyright 2021 Acme Gating, LLC
 #
 # Licensed under the Apache License, Version 2.0 (the "License"); you may
 # not use this file except in compliance with the License. You may obtain
@@ -93,22 +94,21 @@ class SemaphoreHandler(ZooKeeperSimpleBase):
         data, zstat = self.kazoo_client.get(semaphore_path)
         return holdersFromData(data), zstat
 
-    def release(self, item, job):
-        if not job.semaphore:
-            return
-
-        log = get_annotated_logger(self.log, item.event)
-        semaphore_key = quote_plus(job.semaphore.name)
-        semaphore_path = f"{self.tenant_root}/{semaphore_key}"
-        semaphore_handle = f"{item.uuid}-{job.name}"
+    def getSemaphores(self):
+        try:
+            return self.kazoo_client.get_children(self.tenant_root)
+        except NoNodeError:
+            return []
 
+    def _release(self, log, semaphore_path, semaphore_handle):
         while True:
             try:
                 semaphore_holders, zstat = self.getHolders(semaphore_path)
                 semaphore_holders.remove(semaphore_handle)
             except (ValueError, NoNodeError):
-                log.error("Semaphore can not be released for %s "
-                          "because the semaphore is not held", item)
+                log.error("Semaphore %s can not be released for %s "
+                          "because the semaphore is not held", semaphore_path,
+                          semaphore_handle)
                 break
 
             try:
@@ -118,13 +118,24 @@ class SemaphoreHandler(ZooKeeperSimpleBase):
             except BadVersionError:
                 log.debug(
                     "Retrying semaphore %s release due to concurrent update",
-                    job.semaphore.name)
+                    semaphore_path)
                 continue
 
-            log.debug("Semaphore %s released: job %s, item %s",
-                      job.semaphore.name, job.name, item)
+            log.debug("Semaphore %s released for %s",
+                      semaphore_path, semaphore_handle)
             break
 
+    def release(self, item, job):
+        if not job.semaphore:
+            return
+
+        log = get_annotated_logger(self.log, item.event)
+        semaphore_key = quote_plus(job.semaphore.name)
+        semaphore_path = f"{self.tenant_root}/{semaphore_key}"
+        semaphore_handle = f"{item.uuid}-{job.name}"
+
+        self._release(log, semaphore_path, semaphore_handle)
+
     def semaphoreHolders(self, semaphore_name):
         semaphore_key = quote_plus(semaphore_name)
         semaphore_path = f"{self.tenant_root}/{semaphore_key}"
@@ -137,3 +148,42 @@ class SemaphoreHandler(ZooKeeperSimpleBase):
     def _max_count(self, semaphore_name: str) -> int:
         semaphore = self.layout.semaphores.get(semaphore_name)
         return 1 if semaphore is None else semaphore.max
+
+    def cleanupLeaks(self):
+        # This is designed to account for jobs starting and stopping
+        # while this runs, and should therefore be safe to run outside
+        # of the scheduler main loop (and across multiple
+        # schedulers).
+
+        first_semaphores_by_holder = {}
+        for semaphore in self.getSemaphores():
+            for holder in self.semaphoreHolders(semaphore):
+                first_semaphores_by_holder[holder] = semaphore
+        first_holders = set(first_semaphores_by_holder.keys())
+
+        running_handles = set()
+        for pipeline in self.layout.pipelines.values():
+            for item in pipeline.getAllItems():
+                for job in item.getJobs():
+                    running_handles.add(f"{item.uuid}-{job.name}")
+
+        second_semaphores_by_holder = {}
+        for semaphore in self.getSemaphores():
+            for holder in self.semaphoreHolders(semaphore):
+                second_semaphores_by_holder[holder] = semaphore
+        second_holders = set(second_semaphores_by_holder.keys())
+
+        # The stable set of holders; avoids race conditions with
+        # scheduler(s) starting jobs.
+        holders = first_holders.intersection(second_holders)
+        semaphores_by_holder = first_semaphores_by_holder
+        semaphores_by_holder.update(second_semaphores_by_holder)
+
+        for holder in holders:
+            if holder not in running_handles:
+                semaphore_name = semaphores_by_holder[holder]
+                semaphore_key = quote_plus(semaphore_name)
+                semaphore_path = f"{self.tenant_root}/{semaphore_key}"
+                self.log.error("Releasing leaked semaphore %s held by %s",
+                               semaphore_path, holder)
+                self._release(self.log, semaphore_path, holder)