Clean up dangling cache data nodes more often

We need to periodically remove change data nodes that are not referenced
by any cache entry. This needs to happen more often than the periodic
cache maintenance (every 5min vs. every hour).

For this we need to introduce a new method `cleanupCache()` in the
connection interface.

In each run of the change cache's cleanup method we will identify
candidate dangling data nodes to be cleaned up on the next run. The
reason for this delayed cleanup is that we don't want to remove data
nodes for which a driver is still in the process of creating or
updating a cache entry.

Change-Id: I9abf7ac6fa117fe4a093ae7a0db1df0da5ae3ff3
This commit is contained in:
Simon Westphahl 2021-09-02 11:08:37 +02:00 committed by James E. Blair
parent 0d635181f8
commit 9cbcf56e14
8 changed files with 45 additions and 5 deletions

View File

@ -77,6 +77,14 @@ class BaseConnection(object, metaclass=abc.ABCMeta):
def registerScheduler(self, sched) -> None: def registerScheduler(self, sched) -> None:
self.sched = sched self.sched = sched
def cleanupCache(self):
"""Clean up the connection cache.
This allows a connection to perform periodic cleanup actions of
the cache, e.g. garbage collection.
"""
pass
def maintainCache(self, relevant, max_age): def maintainCache(self, relevant, max_age):
"""Remove stale changes from the cache. """Remove stale changes from the cache.

View File

@ -745,9 +745,11 @@ class GerritConnection(BaseConnection):
except KeyError: except KeyError:
pass pass
def cleanupCache(self):
self._change_cache.cleanup()
def maintainCache(self, relevant, max_age): def maintainCache(self, relevant, max_age):
self._change_cache.prune(relevant, max_age) self._change_cache.prune(relevant, max_age)
self._change_cache.cleanup()
def updateChangeAttributes(self, change, **attrs): def updateChangeAttributes(self, change, **attrs):
def _update_attrs(c): def _update_attrs(c):

View File

@ -103,9 +103,11 @@ class GitConnection(BaseConnection):
refs[ref] = sha refs[ref] = sha
return refs return refs
def cleanupCache(self):
self._change_cache.cleanup()
def maintainCache(self, relevant, max_age): def maintainCache(self, relevant, max_age):
self._change_cache.prune(relevant, max_age) self._change_cache.prune(relevant, max_age)
self._change_cache.cleanup()
def getChange(self, event, refresh=False): def getChange(self, event, refresh=False):
key = str((event.project_name, event.ref, event.newrev)) key = str((event.project_name, event.ref, event.newrev))

View File

@ -1262,9 +1262,11 @@ class GithubConnection(CachedBranchConnection):
return self._github_client_manager.getGithubClient( return self._github_client_manager.getGithubClient(
project_name=project_name, zuul_event_id=zuul_event_id) project_name=project_name, zuul_event_id=zuul_event_id)
def cleanupCache(self):
self._change_cache.cleanup()
def maintainCache(self, relevant, max_age): def maintainCache(self, relevant, max_age):
self._change_cache.prune(relevant, max_age) self._change_cache.prune(relevant, max_age)
self._change_cache.cleanup()
def updateChangeAttributes(self, change, **attrs): def updateChangeAttributes(self, change, **attrs):
def _update_attrs(c): def _update_attrs(c):

View File

@ -459,9 +459,11 @@ class GitlabConnection(CachedBranchConnection):
if hasattr(self, 'gitlab_event_connector'): if hasattr(self, 'gitlab_event_connector'):
self._stop_event_connector() self._stop_event_connector()
def cleanupCache(self):
self._change_cache.cleanup()
def maintainCache(self, relevant, max_age): def maintainCache(self, relevant, max_age):
self._change_cache.prune(relevant, max_age) self._change_cache.prune(relevant, max_age)
self._change_cache.cleanup()
def updateChangeAttributes(self, change, **attrs): def updateChangeAttributes(self, change, **attrs):
def _update_attrs(c): def _update_attrs(c):

View File

@ -545,9 +545,11 @@ class PagureConnection(BaseConnection):
"Fetching project %s webhook token from API" % project) "Fetching project %s webhook token from API" % project)
return token return token
def cleanupCache(self):
self._change_cache.cleanup()
def maintainCache(self, relevant, max_age): def maintainCache(self, relevant, max_age):
self._change_cache.prune(relevant, max_age) self._change_cache.prune(relevant, max_age)
self._change_cache.cleanup()
def updateChangeAttributes(self, change, **attrs): def updateChangeAttributes(self, change, **attrs):
def _update_attrs(c): def _update_attrs(c):

View File

@ -74,6 +74,7 @@ from zuul.zk import ZooKeeperClient
from zuul.zk.cleanup import ( from zuul.zk.cleanup import (
SemaphoreCleanupLock, SemaphoreCleanupLock,
BuildRequestCleanupLock, BuildRequestCleanupLock,
ConnectionCleanupLock,
GeneralCleanupLock, GeneralCleanupLock,
MergeRequestCleanupLock, MergeRequestCleanupLock,
NodeRequestCleanupLock, NodeRequestCleanupLock,
@ -133,6 +134,7 @@ class Scheduler(threading.Thread):
_general_cleanup_interval = IntervalTrigger(minutes=60, jitter=60) _general_cleanup_interval = IntervalTrigger(minutes=60, jitter=60)
_build_request_cleanup_interval = IntervalTrigger(seconds=60, jitter=5) _build_request_cleanup_interval = IntervalTrigger(seconds=60, jitter=5)
_merge_request_cleanup_interval = IntervalTrigger(seconds=60, jitter=5) _merge_request_cleanup_interval = IntervalTrigger(seconds=60, jitter=5)
_connection_cleanup_interval = IntervalTrigger(minutes=5, jitter=10)
_merger_client_class = MergeClient _merger_client_class = MergeClient
_executor_client_class = ExecutorClient _executor_client_class = ExecutorClient
@ -214,6 +216,7 @@ class Scheduler(threading.Thread):
self.zk_client) self.zk_client)
self.merge_request_cleanup_lock = MergeRequestCleanupLock( self.merge_request_cleanup_lock = MergeRequestCleanupLock(
self.zk_client) self.zk_client)
self.connection_cleanup_lock = ConnectionCleanupLock(self.zk_client)
self.node_request_cleanup_lock = NodeRequestCleanupLock(self.zk_client) self.node_request_cleanup_lock = NodeRequestCleanupLock(self.zk_client)
self.abide = Abide() self.abide = Abide()
@ -502,6 +505,8 @@ class Scheduler(threading.Thread):
trigger=self._build_request_cleanup_interval) trigger=self._build_request_cleanup_interval)
self.apsched.add_job(self._runMergeRequestCleanup, self.apsched.add_job(self._runMergeRequestCleanup,
trigger=self._merge_request_cleanup_interval) trigger=self._merge_request_cleanup_interval)
self.apsched.add_job(self._runConnectionCleanup,
trigger=self._connection_cleanup_interval)
self.apsched.add_job(self._runGeneralCleanup, self.apsched.add_job(self._runGeneralCleanup,
trigger=self._general_cleanup_interval) trigger=self._general_cleanup_interval)
return return
@ -615,6 +620,16 @@ class Scheduler(threading.Thread):
finally: finally:
self.merge_request_cleanup_lock.release() self.merge_request_cleanup_lock.release()
def _runConnectionCleanup(self):
if self.connection_cleanup_lock.acquire(blocking=False):
try:
for connection in self.connections.connections.values():
self.log.debug("Cleaning up connection cache for: %s",
connection)
connection.cleanupCache()
finally:
self.connection_cleanup_lock.release()
def addTriggerEvent(self, driver_name, event): def addTriggerEvent(self, driver_name, event):
event.arrived_at_scheduler_timestamp = time.time() event.arrived_at_scheduler_timestamp = time.time()
for tenant in self.abide.tenants.values(): for tenant in self.abide.tenants.values():

View File

@ -36,6 +36,13 @@ class MergeRequestCleanupLock(kazoo.recipe.lock.Lock):
super().__init__(client.client, self._path) super().__init__(client.client, self._path)
class ConnectionCleanupLock(kazoo.recipe.lock.Lock):
_path = '/zuul/cleanup/connection'
def __init__(self, client):
super().__init__(client.client, self._path)
class GeneralCleanupLock(kazoo.recipe.lock.Lock): class GeneralCleanupLock(kazoo.recipe.lock.Lock):
_path = '/zuul/cleanup/general' _path = '/zuul/cleanup/general'