From c10c46a8ac0daf8dc334a462ad1e2f337893167f Mon Sep 17 00:00:00 2001 From: "James E. Blair" Date: Thu, 1 Apr 2021 18:54:59 -0700 Subject: [PATCH] Periodically clean up leaked semaphores If there happens to be a bug in Zuul, or if the scheduler crashes, then we may leak semaphores, which, now that they are held in ZK, is a permanent situation. To avoid the necessity of operators manually editing or removing ZK nodes, introduce a periodic semaphore cleanup. This identifies any semaphores which are held by jobs which are not running and therefore may be removed. It runs once an hour, outside the main scheduler thread. It also runs immediately after the first reconfiguration, so that if a scheduler crashes and is restarted, leaked semaphores are cleaned up quickly. The general algorithm should be safe with multiple schedulers, though the details about how we should traverse the entire abide are unclear at this point, so this may need to change. But it is as forward- compatible as I can make it right now. We may also decide that once all running builds are in ZK, we no longer need this. Regardless, it is needed now, and should be adaptible to future situtaions if we want it. Change-Id: I023ddabb13047a1ed95a26a8f5793de36817c191 --- tests/unit/test_scheduler.py | 50 ++++++++++++++++++++++++ zuul/scheduler.py | 32 +++++++++++++++ zuul/zk/semaphore.py | 76 ++++++++++++++++++++++++++++++------ 3 files changed, 145 insertions(+), 13 deletions(-) diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py index 99d166ee96..b563afb5e6 100644 --- a/tests/unit/test_scheduler.py +++ b/tests/unit/test_scheduler.py @@ -8125,6 +8125,56 @@ class TestSemaphore(ZuulTestCase): self.executor_server.release() self.waitUntilSettled() + def test_semaphore_handler_cleanup(self): + "Test the semaphore handler leak cleanup" + self.executor_server.hold_jobs_in_build = True + tenant = self.scheds.first.sched.abide.tenants.get('tenant-one') + + A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') + self.assertEqual( + len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")), + 0) + + self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1)) + self.waitUntilSettled() + + self.assertEqual( + len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")), + 1) + + # Save some variables for later use while the job is running + check_pipeline = tenant.layout.pipelines['check'] + item = check_pipeline.getAllItems()[0] + job = item.getJob('semaphore-one-test1') + + tenant.semaphore_handler.cleanupLeaks() + + # Nothing has leaked; our handle should be present. + self.assertEqual( + len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")), + 1) + + self.executor_server.hold_jobs_in_build = False + self.executor_server.release() + self.waitUntilSettled() + + # Make sure the semaphore is released normally + self.assertEqual( + len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")), + 0) + + # Use our previously saved data to simulate a leaked semaphore + tenant.semaphore_handler.acquire(item, job, False) + self.assertEqual( + len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")), + 1) + + tenant.semaphore_handler.cleanupLeaks() + # Make sure the leaked semaphore is cleaned up + self.assertEqual( + len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")), + 0) + class TestSemaphoreMultiTenant(ZuulTestCase): tenant_config_file = 'config/multi-tenant-semaphore/main.yaml' diff --git a/zuul/scheduler.py b/zuul/scheduler.py index 1b01e4a488..0e0ab6eaa0 100644 --- a/zuul/scheduler.py +++ b/zuul/scheduler.py @@ -99,6 +99,7 @@ class Scheduler(threading.Thread): log = logging.getLogger("zuul.Scheduler") _stats_interval = 30 + _cleanup_interval = 60 * 60 _merger_client_class = MergeClient # Number of seconds past node expiration a hold request will remain @@ -129,6 +130,9 @@ class Scheduler(threading.Thread): self.stats_thread = threading.Thread(target=self.runStats) self.stats_thread.daemon = True self.stats_stop = threading.Event() + self.cleanup_thread = threading.Thread(target=self.runCleanup) + self.cleanup_thread.daemon = True + self.cleanup_stop = threading.Event() # TODO(jeblair): fix this # Despite triggers being part of the pipeline, there is one trigger set # per scheduler. The pipeline handles the trigger filters but since @@ -219,15 +223,18 @@ class Scheduler(threading.Thread): self.rpc.start() self.rpc_slow.start() self.stats_thread.start() + self.cleanup_thread.start() self.zk_component.set('state', self.zk_component.RUNNING) def stop(self): self._stopped = True self.zk_component.set('state', self.zk_component.STOPPED) self.stats_stop.set() + self.cleanup_stop.set() self.stopConnections() self.wake_event.set() self.stats_thread.join() + self.cleanup_thread.join() self.rpc.stop() self.rpc.join() self.rpc_slow.stop() @@ -326,6 +333,31 @@ class Scheduler(threading.Thread): self.statsd.gauge('zuul.scheduler.eventqueues.management', len(self.management_events)) + def runCleanup(self): + # Run the first cleanup immediately after the first + # reconfiguration. + interval = 0 + while not self.cleanup_stop.wait(interval): + try: + if not self.last_reconfigured: + time.sleep(1) + continue + self._runCleanup() + except Exception: + self.log.exception("Error in periodic cleanup:") + interval = self._cleanup_interval + + def _runCleanup(self): + # Get the layout lock to make sure the abide doesn't change + # under us. + with self.layout_lock: + self.log.debug("Starting semaphore cleanup") + for tenant in self.abide.tenants.values(): + try: + tenant.semaphore_handler.cleanupLeaks() + except Exception: + self.log.exception("Error in semaphore cleanup:") + def addTriggerEvent(self, driver_name, event): event.arrived_at_scheduler_timestamp = time.time() self.trigger_events.put(driver_name, event) diff --git a/zuul/zk/semaphore.py b/zuul/zk/semaphore.py index 0f986e2f90..c5eb150c3d 100644 --- a/zuul/zk/semaphore.py +++ b/zuul/zk/semaphore.py @@ -1,4 +1,5 @@ # Copyright 2021 BMW Group +# Copyright 2021 Acme Gating, LLC # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -93,22 +94,21 @@ class SemaphoreHandler(ZooKeeperSimpleBase): data, zstat = self.kazoo_client.get(semaphore_path) return holdersFromData(data), zstat - def release(self, item, job): - if not job.semaphore: - return - - log = get_annotated_logger(self.log, item.event) - semaphore_key = quote_plus(job.semaphore.name) - semaphore_path = f"{self.tenant_root}/{semaphore_key}" - semaphore_handle = f"{item.uuid}-{job.name}" + def getSemaphores(self): + try: + return self.kazoo_client.get_children(self.tenant_root) + except NoNodeError: + return [] + def _release(self, log, semaphore_path, semaphore_handle): while True: try: semaphore_holders, zstat = self.getHolders(semaphore_path) semaphore_holders.remove(semaphore_handle) except (ValueError, NoNodeError): - log.error("Semaphore can not be released for %s " - "because the semaphore is not held", item) + log.error("Semaphore %s can not be released for %s " + "because the semaphore is not held", semaphore_path, + semaphore_handle) break try: @@ -118,13 +118,24 @@ class SemaphoreHandler(ZooKeeperSimpleBase): except BadVersionError: log.debug( "Retrying semaphore %s release due to concurrent update", - job.semaphore.name) + semaphore_path) continue - log.debug("Semaphore %s released: job %s, item %s", - job.semaphore.name, job.name, item) + log.debug("Semaphore %s released for %s", + semaphore_path, semaphore_handle) break + def release(self, item, job): + if not job.semaphore: + return + + log = get_annotated_logger(self.log, item.event) + semaphore_key = quote_plus(job.semaphore.name) + semaphore_path = f"{self.tenant_root}/{semaphore_key}" + semaphore_handle = f"{item.uuid}-{job.name}" + + self._release(log, semaphore_path, semaphore_handle) + def semaphoreHolders(self, semaphore_name): semaphore_key = quote_plus(semaphore_name) semaphore_path = f"{self.tenant_root}/{semaphore_key}" @@ -137,3 +148,42 @@ class SemaphoreHandler(ZooKeeperSimpleBase): def _max_count(self, semaphore_name: str) -> int: semaphore = self.layout.semaphores.get(semaphore_name) return 1 if semaphore is None else semaphore.max + + def cleanupLeaks(self): + # This is designed to account for jobs starting and stopping + # while this runs, and should therefore be safe to run outside + # of the scheduler main loop (and accross multiple + # schedulers). + + first_semaphores_by_holder = {} + for semaphore in self.getSemaphores(): + for holder in self.semaphoreHolders(semaphore): + first_semaphores_by_holder[holder] = semaphore + first_holders = set(first_semaphores_by_holder.keys()) + + running_handles = set() + for pipeline in self.layout.pipelines.values(): + for item in pipeline.getAllItems(): + for job in item.getJobs(): + running_handles.add(f"{item.uuid}-{job.name}") + + second_semaphores_by_holder = {} + for semaphore in self.getSemaphores(): + for holder in self.semaphoreHolders(semaphore): + second_semaphores_by_holder[holder] = semaphore + second_holders = set(second_semaphores_by_holder.keys()) + + # The stable set of holders; avoids race conditions with + # scheduler(s) starting jobs. + holders = first_holders.intersection(second_holders) + semaphores_by_holder = first_semaphores_by_holder + semaphores_by_holder.update(second_semaphores_by_holder) + + for holder in holders: + if holder not in running_handles: + semaphore_name = semaphores_by_holder[holder] + semaphore_key = quote_plus(semaphore_name) + semaphore_path = f"{self.tenant_root}/{semaphore_key}" + self.log.error("Releasing leaked semaphore %s held by %s", + semaphore_path, holder) + self._release(self.log, semaphore_path, holder)