Merge "Periodically clean up leaked semaphores"

Zuul 2021-04-07 16:38:30 +00:00 committed by Gerrit Code Review
commit fbbcff4ef1
3 changed files with 145 additions and 13 deletions
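
In brief: the scheduler gains a daemon cleanup thread that wakes up once an hour (_cleanup_interval = 60 * 60) and asks each tenant's semaphore handler to clean up leaked semaphores, i.e. holders recorded in ZooKeeper whose jobs are no longer present in any pipeline. To stay safe while jobs start and finish mid-scan (including on other schedulers), the handler reads the holder list twice and only releases holders that are present in both reads and match no running job.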

tests/unit/test_scheduler.py

@@ -8125,6 +8125,56 @@ class TestSemaphore(ZuulTestCase):
         self.executor_server.release()
         self.waitUntilSettled()
 
+    def test_semaphore_handler_cleanup(self):
+        "Test the semaphore handler leak cleanup"
+        self.executor_server.hold_jobs_in_build = True
+        tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+
+        self.assertEqual(
+            len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
+            0)
+
+        self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+        self.waitUntilSettled()
+        self.assertEqual(
+            len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
+            1)
+
+        # Save some variables for later use while the job is running
+        check_pipeline = tenant.layout.pipelines['check']
+        item = check_pipeline.getAllItems()[0]
+        job = item.getJob('semaphore-one-test1')
+
+        tenant.semaphore_handler.cleanupLeaks()
+
+        # Nothing has leaked; our handle should be present.
+        self.assertEqual(
+            len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
+            1)
+
+        self.executor_server.hold_jobs_in_build = False
+        self.executor_server.release()
+        self.waitUntilSettled()
+
+        # Make sure the semaphore is released normally
+        self.assertEqual(
+            len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
+            0)
+
+        # Use our previously saved data to simulate a leaked semaphore
+        tenant.semaphore_handler.acquire(item, job, False)
+        self.assertEqual(
+            len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
+            1)
+
+        tenant.semaphore_handler.cleanupLeaks()
+
+        # Make sure the leaked semaphore is cleaned up
+        self.assertEqual(
+            len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
+            0)
+
 
 class TestSemaphoreMultiTenant(ZuulTestCase):
     tenant_config_file = 'config/multi-tenant-semaphore/main.yaml'
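
A note on the leak simulation above: semaphore holders are identified by a handle of the form f"{item.uuid}-{job.name}" (see zuul/zk/semaphore.py below). Once the build finishes and the item leaves the check pipeline, re-acquiring with the saved item and job records a holder whose handle matches no running job, which is exactly the condition cleanupLeaks() detects. A toy illustration with a made-up handle:

    # Made-up handle for illustration; real handles embed the queue
    # item's UUID.
    held_in_zk = {"1234abcd-semaphore-one-test1"}  # holder left behind
    running = set()          # the item is no longer in any pipeline
    assert held_in_zk - running == {"1234abcd-semaphore-one-test1"}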

zuul/scheduler.py

@@ -99,6 +99,7 @@ class Scheduler(threading.Thread):
     log = logging.getLogger("zuul.Scheduler")
     _stats_interval = 30
+    _cleanup_interval = 60 * 60
     _merger_client_class = MergeClient
 
     # Number of seconds past node expiration a hold request will remain

@@ -129,6 +130,9 @@ class Scheduler(threading.Thread):
         self.stats_thread = threading.Thread(target=self.runStats)
         self.stats_thread.daemon = True
         self.stats_stop = threading.Event()
+        self.cleanup_thread = threading.Thread(target=self.runCleanup)
+        self.cleanup_thread.daemon = True
+        self.cleanup_stop = threading.Event()
         # TODO(jeblair): fix this
         # Despite triggers being part of the pipeline, there is one trigger set
         # per scheduler. The pipeline handles the trigger filters but since

@@ -219,15 +223,18 @@ class Scheduler(threading.Thread):
         self.rpc.start()
         self.rpc_slow.start()
         self.stats_thread.start()
+        self.cleanup_thread.start()
         self.zk_component.set('state', self.zk_component.RUNNING)
 
     def stop(self):
         self._stopped = True
         self.zk_component.set('state', self.zk_component.STOPPED)
         self.stats_stop.set()
+        self.cleanup_stop.set()
         self.stopConnections()
         self.wake_event.set()
         self.stats_thread.join()
+        self.cleanup_thread.join()
         self.rpc.stop()
         self.rpc.join()
         self.rpc_slow.stop()

@@ -326,6 +333,31 @@ class Scheduler(threading.Thread):
             self.statsd.gauge('zuul.scheduler.eventqueues.management',
                               len(self.management_events))
 
+    def runCleanup(self):
+        # Run the first cleanup immediately after the first
+        # reconfiguration.
+        interval = 0
+        while not self.cleanup_stop.wait(interval):
+            try:
+                if not self.last_reconfigured:
+                    time.sleep(1)
+                    continue
+                self._runCleanup()
+            except Exception:
+                self.log.exception("Error in periodic cleanup:")
+            interval = self._cleanup_interval
+
+    def _runCleanup(self):
+        # Get the layout lock to make sure the abide doesn't change
+        # under us.
+        with self.layout_lock:
+            self.log.debug("Starting semaphore cleanup")
+            for tenant in self.abide.tenants.values():
+                try:
+                    tenant.semaphore_handler.cleanupLeaks()
+                except Exception:
+                    self.log.exception("Error in semaphore cleanup:")
+
     def addTriggerEvent(self, driver_name, event):
         event.arrived_at_scheduler_timestamp = time.time()
         self.trigger_events.put(driver_name, event)
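
The runCleanup() loop above is a standard stoppable-timer pattern: cleanup_stop.wait(interval) serves as both the sleep and the shutdown check, so cleanup_stop.set() in stop() wakes the thread immediately instead of letting it sleep out the remaining hour, and the initial interval of 0 runs the first pass as soon as a reconfiguration has happened. A minimal self-contained sketch of the same pattern (the class and names here are illustrative, not Zuul's):

    import threading

    class PeriodicWorker:
        """Calls work() every `interval` seconds until stop() is called."""

        def __init__(self, work, interval):
            self._work = work
            self._interval = interval
            self._stop = threading.Event()
            self._thread = threading.Thread(target=self._run, daemon=True)

        def start(self):
            self._thread.start()

        def stop(self):
            self._stop.set()    # wait() returns True right away
            self._thread.join()

        def _run(self):
            interval = 0        # first pass runs immediately,
            while not self._stop.wait(interval):  # as in runCleanup()
                try:
                    self._work()
                except Exception:
                    pass        # a real implementation would log here
                interval = self._interval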

zuul/zk/semaphore.py

@@ -1,4 +1,5 @@
 # Copyright 2021 BMW Group
+# Copyright 2021 Acme Gating, LLC
 #
 # Licensed under the Apache License, Version 2.0 (the "License"); you may
 # not use this file except in compliance with the License. You may obtain

@@ -93,22 +94,21 @@ class SemaphoreHandler(ZooKeeperSimpleBase):
         data, zstat = self.kazoo_client.get(semaphore_path)
         return holdersFromData(data), zstat
 
-    def release(self, item, job):
-        if not job.semaphore:
-            return
-
-        log = get_annotated_logger(self.log, item.event)
-        semaphore_key = quote_plus(job.semaphore.name)
-        semaphore_path = f"{self.tenant_root}/{semaphore_key}"
-        semaphore_handle = f"{item.uuid}-{job.name}"
+    def getSemaphores(self):
+        try:
+            return self.kazoo_client.get_children(self.tenant_root)
+        except NoNodeError:
+            return []
 
+    def _release(self, log, semaphore_path, semaphore_handle):
         while True:
             try:
                 semaphore_holders, zstat = self.getHolders(semaphore_path)
                 semaphore_holders.remove(semaphore_handle)
             except (ValueError, NoNodeError):
-                log.error("Semaphore can not be released for %s "
-                          "because the semaphore is not held", item)
+                log.error("Semaphore %s can not be released for %s "
+                          "because the semaphore is not held", semaphore_path,
+                          semaphore_handle)
                 break
 
             try:

@@ -118,13 +118,24 @@ class SemaphoreHandler(ZooKeeperSimpleBase):
             except BadVersionError:
                 log.debug(
                     "Retrying semaphore %s release due to concurrent update",
-                    job.semaphore.name)
+                    semaphore_path)
                 continue
 
-            log.debug("Semaphore %s released: job %s, item %s",
-                      job.semaphore.name, job.name, item)
+            log.debug("Semaphore %s released for %s",
+                      semaphore_path, semaphore_handle)
             break
 
+    def release(self, item, job):
+        if not job.semaphore:
+            return
+
+        log = get_annotated_logger(self.log, item.event)
+        semaphore_key = quote_plus(job.semaphore.name)
+        semaphore_path = f"{self.tenant_root}/{semaphore_key}"
+        semaphore_handle = f"{item.uuid}-{job.name}"
+
+        self._release(log, semaphore_path, semaphore_handle)
+
     def semaphoreHolders(self, semaphore_name):
         semaphore_key = quote_plus(semaphore_name)
         semaphore_path = f"{self.tenant_root}/{semaphore_key}"
@@ -137,3 +148,42 @@ class SemaphoreHandler(ZooKeeperSimpleBase):
     def _max_count(self, semaphore_name: str) -> int:
         semaphore = self.layout.semaphores.get(semaphore_name)
         return 1 if semaphore is None else semaphore.max
+
+    def cleanupLeaks(self):
+        # This is designed to account for jobs starting and stopping
+        # while this runs, and should therefore be safe to run outside
+        # of the scheduler main loop (and across multiple schedulers).
+
+        first_semaphores_by_holder = {}
+        for semaphore in self.getSemaphores():
+            for holder in self.semaphoreHolders(semaphore):
+                first_semaphores_by_holder[holder] = semaphore
+        first_holders = set(first_semaphores_by_holder.keys())
+
+        running_handles = set()
+        for pipeline in self.layout.pipelines.values():
+            for item in pipeline.getAllItems():
+                for job in item.getJobs():
+                    running_handles.add(f"{item.uuid}-{job.name}")
+
+        second_semaphores_by_holder = {}
+        for semaphore in self.getSemaphores():
+            for holder in self.semaphoreHolders(semaphore):
+                second_semaphores_by_holder[holder] = semaphore
+        second_holders = set(second_semaphores_by_holder.keys())
+
+        # The stable set of holders; avoids race conditions with
+        # scheduler(s) starting jobs.
+        holders = first_holders.intersection(second_holders)
+        semaphores_by_holder = first_semaphores_by_holder
+        semaphores_by_holder.update(second_semaphores_by_holder)
+
+        for holder in holders:
+            if holder not in running_handles:
+                semaphore_name = semaphores_by_holder[holder]
+                semaphore_key = quote_plus(semaphore_name)
+                semaphore_path = f"{self.tenant_root}/{semaphore_key}"
+                self.log.error("Releasing leaked semaphore %s held by %s",
+                               semaphore_path, holder)
+                self._release(self.log, semaphore_path, holder)
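
The core of cleanupLeaks() is the double-read intersection: only holders that appear in two successive reads of ZooKeeper, with the running-job snapshot taken in between, are candidates for release. A holder acquired during the scan shows up in at most one read and is therefore never touched, while a genuinely leaked holder is stable across both reads and matches no running handle. A distilled sketch of that decision logic over plain sets (the function and parameter names are illustrative, not Zuul's API):

    def find_leaked_holders(read_holders, read_running_handles):
        """Return holder handles that are safe to treat as leaked.

        read_holders: callable returning the current holder handles,
        e.g. read from ZooKeeper.
        read_running_handles: callable returning the
        "{item.uuid}-{job.name}" handles of all jobs currently in any
        pipeline.
        """
        first = set(read_holders())
        running = set(read_running_handles())  # snapshot between reads
        second = set(read_holders())
        stable = first & second    # holders present in both reads
        return stable - running

    # "c-job3" is acquired between the two reads, so it is ignored;
    # "b-job2" is stable and not running, so it is reported as leaked.
    reads = iter([{"a-job1", "b-job2"}, {"a-job1", "b-job2", "c-job3"}])
    leaked = find_leaked_holders(lambda: next(reads), lambda: {"a-job1"})
    assert leaked == {"b-job2"}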