Move executor api cleanup to apscheduler

We don't need to clean up leaked build parameters every time we
clean up lost build requests.  The latter happens every 60 seconds
because it's performance critical, but the param cleanup can happen
much less frequently.

Move it into the scheduled tasks that the scheduler runs, but rather
than continuing to add new apsched events, combine this with the
config cache cleanup.  We can also add the similar cleanup task which
will be needed by the merger api.  These can all run infrequently
(every 60 minutes).

The only downside to this is that we will have one scheduler running
all of these combined tasks, but they are relatively low impact so
it's probably not important to load-balance them.

Change-Id: I76ce9df242ebdaf9f1b7113530775bd702ccaaa0
This commit is contained in:
James E. Blair 2021-08-02 14:03:38 -07:00
parent a99b2dace3
commit c237772ac8
3 changed files with 27 additions and 5 deletions

View File

@ -229,7 +229,6 @@ class ExecutorClient(object):
self.cleanupLostBuildRequest(build_request)
except Exception:
self.log.exception("Exception cleaning up lost build request:")
self.executor_api.cleanup()
def cleanupLostBuildRequest(self, build_request):
result = {"result": "ABORTED"}

View File

@ -70,7 +70,11 @@ from zuul.model import (
STATE_FAILED,
)
from zuul.zk import ZooKeeperClient
from zuul.zk.cleanup import SemaphoreCleanupLock, BuildRequestCleanupLock
from zuul.zk.cleanup import (
SemaphoreCleanupLock,
BuildRequestCleanupLock,
GeneralCleanupLock,
)
from zuul.zk.components import (
BaseComponent, ComponentRegistry, SchedulerComponent
)
@ -122,7 +126,7 @@ class Scheduler(threading.Thread):
log = logging.getLogger("zuul.Scheduler")
_stats_interval = 30
_semaphore_cleanup_interval = IntervalTrigger(minutes=60, jitter=60)
_config_cache_cleanup_interval = IntervalTrigger(minutes=60, jitter=60)
_general_cleanup_interval = IntervalTrigger(minutes=60, jitter=60)
_build_request_cleanup_interval = IntervalTrigger(seconds=60, jitter=5)
_merger_client_class = MergeClient
_executor_client_class = ExecutorClient
@ -196,6 +200,7 @@ class Scheduler(threading.Thread):
self.zk_client
)
self.general_cleanup_lock = GeneralCleanupLock(self.zk_client)
self.semaphore_cleanup_lock = SemaphoreCleanupLock(self.zk_client)
self.build_request_cleanup_lock = BuildRequestCleanupLock(
self.zk_client)
@ -464,8 +469,8 @@ class Scheduler(threading.Thread):
trigger=self._semaphore_cleanup_interval)
self.apsched.add_job(self._runBuildRequestCleanup,
trigger=self._build_request_cleanup_interval)
self.apsched.add_job(self._runConfigCacheCleanup,
trigger=self._config_cache_cleanup_interval)
self.apsched.add_job(self._runGeneralCleanup,
trigger=self._general_cleanup_interval)
return
def _runSemaphoreCleanup(self):
@ -483,6 +488,11 @@ class Scheduler(threading.Thread):
finally:
self.semaphore_cleanup_lock.release()
def _runGeneralCleanup(self):
if self.general_cleanup_lock.acquire(blocking=False):
self._runConfigCacheCleanup()
self._runExecutorApiCleanup()
def _runConfigCacheCleanup(self):
with self.layout_lock:
try:
@ -497,6 +507,12 @@ class Scheduler(threading.Thread):
except Exception:
self.log.exception("Error in config cache cleanup:")
def _runExecutorApiCleanup(self):
try:
self.executor.executor_api.cleanup()
except Exception:
self.log.exception("Error in executor API cleanup:")
def _runBuildRequestCleanup(self):
# If someone else is running the cleanup, skip it.
if self.build_request_cleanup_lock.acquire(blocking=False):

View File

@ -27,3 +27,10 @@ class BuildRequestCleanupLock(kazoo.recipe.lock.Lock):
def __init__(self, client):
super().__init__(client.client, self._path)
class GeneralCleanupLock(kazoo.recipe.lock.Lock):
_path = '/zuul/cleanup/general'
def __init__(self, client):
super().__init__(client.client, self._path)