From 25af998d649fe0908bfe037458ca65ce5ae35f60 Mon Sep 17 00:00:00 2001 From: "James E. Blair" Date: Tue, 6 Jun 2023 17:18:04 -0700 Subject: [PATCH] Clear tree cache queues on disconnect If a TreeCache encounters a ZK disconnect, then any events in its event queue are no longer useful and possibly counter-productive. If we process events from before the disconnection after a reconnection, the cache could get into an erroneous state. The cache is designed so that on initial or re-connection, we first establish the watch, and then walk the tree to look for previously existing data. All events we process (whether generated by the watch or by walking the tree) must be in order after the watch was established. To ensure this, we clear the event queues on reconnect and restart the event queue threads. Additionally, this change cleans up handling of the case where the cache key is None (the parser says we're not interested in the object) but the preCacheHook didn't tell us to exit early. In that case, we would previously most likely try to pop the None object from the cache, which would generally be harmless. But we shouldn't be doing that anyway, so add a check for whether we have an object key in the main cache method and ignore the return values from the preCacheHook. 
Change-Id: I4bbedb19364c894a2033ef8c1d2e439299971a83 --- nodepool/tests/unit/test_zk.py | 5 +-- nodepool/zk/zookeeper.py | 63 ++++++++++++++++++++++++---------- 2 files changed, 48 insertions(+), 20 deletions(-) diff --git a/nodepool/tests/unit/test_zk.py b/nodepool/tests/unit/test_zk.py index fabc9050f..fbbc8409d 100644 --- a/nodepool/tests/unit/test_zk.py +++ b/nodepool/tests/unit/test_zk.py @@ -1416,8 +1416,9 @@ class TestTreeCache(tests.DBTestCase): break self.assertEqual(len(cached_node.lock_contenders), 1) - my_zk._node_cache._ready.clear() - my_zk._node_cache._start() + my_zk._node_cache._sessionListener(KazooState.LOST) + my_zk._node_cache._sessionListener(KazooState.CONNECTED) + my_zk._node_cache.ensureReady() my_zk._node_cache._event_queue.join() my_zk._node_cache._playback_queue.join() diff --git a/nodepool/zk/zookeeper.py b/nodepool/zk/zookeeper.py index 2530d4c4b..3af823b0b 100644 --- a/nodepool/zk/zookeeper.py +++ b/nodepool/zk/zookeeper.py @@ -748,24 +748,25 @@ class NodepoolTreeCache(abc.ABC): self._ready = threading.Event() self._init_lock = threading.Lock() self._stopped = False + self._stop_workers = False self._event_queue = queue.Queue() self._playback_queue = queue.Queue() - self._event_worker = threading.Thread( - target=self._eventWorker) - self._event_worker.daemon = True - self._event_worker.start() - self._playback_worker = threading.Thread( - target=self._playbackWorker) - self._playback_worker.daemon = True - self._playback_worker.start() + self._event_worker = None + self._playback_worker = None zk.kazoo_client.add_listener(self._sessionListener) self._start() def _sessionListener(self, state): if state == KazooState.LOST: self._ready.clear() + self._stop_workers = True + self._event_queue.put(None) + self._playback_queue.put(None) elif state == KazooState.CONNECTED and not self._stopped: self._ready.clear() + self._stop_workers = True + self._event_queue.put(None) + self._playback_queue.put(None) 
self.zk.kazoo_client.handler.short_spawn(self._start) def _cacheListener(self, event): @@ -775,6 +776,34 @@ class NodepoolTreeCache(abc.ABC): locked = self._init_lock.acquire(blocking=False) if locked: self.log.debug("Initialize cache at %s", self.root) + + # If we have an event worker (this is a re-init), then wait + # for it to finish stopping (the session listener should + # have told it to stop). + if self._event_worker: + self._event_worker.join() + # Replace the queue since any events from the previous + # session aren't valid. + self._event_queue = queue.Queue() + # Prepare (but don't start) the new worker. + self._event_worker = threading.Thread( + target=self._eventWorker) + self._event_worker.daemon = True + + if self._playback_worker: + self._playback_worker.join() + self._playback_queue = queue.Queue() + self._playback_worker = threading.Thread( + target=self._playbackWorker) + self._playback_worker.daemon = True + + # Clear the stop flag and start the workers now that we + # are sure that both have stopped and we have cleared the + # queues. 
+ self._stop_workers = False + self._event_worker.start() + self._playback_worker.start() + try: self.zk.kazoo_client.add_watch( self.root, self._cacheListener, @@ -832,7 +861,7 @@ class NodepoolTreeCache(abc.ABC): self._cacheListener(event) def _eventWorker(self): - while not self._stopped: + while not (self._stopped or self._stop_workers): event = self._event_queue.get() if event is None: self._event_queue.task_done() @@ -885,7 +914,7 @@ class NodepoolTreeCache(abc.ABC): self._playback_queue.put((event, future, key)) def _playbackWorker(self): - while not self._stopped: + while not (self._stopped or self._stop_workers): item = self._playback_queue.get() if item is None: self._playback_queue.task_done() @@ -942,7 +971,10 @@ class NodepoolTreeCache(abc.ABC): self._cached_paths.discard(event.path) # Some caches have special handling for certain sub-objects - if self.preCacheHook(event, exists): + self.preCacheHook(event, exists) + + # If we don't actually cache this kind of object, return now + if key is None: return if data: @@ -988,9 +1020,6 @@ class NodepoolTreeCache(abc.ABC): Otherwise, it indicates whether or not the EventType would cause the node to exist in ZK. - If True is returned, then the cache will stop processing the event. - Other return values are ignored. - :param EventType event: The event. :param bool exists: Whether the object exists in ZK. @@ -1051,8 +1080,7 @@ class ImageCache(NodepoolTreeCache): image.paused = True else: image.paused = False - # This event was for a paused path; no further handling necessary - return True + return def objectFromDict(self, d, key): if len(key) == 4: @@ -1112,8 +1140,7 @@ class NodeCache(NodepoolTreeCache): node.lock_contenders.add(contender) else: node.lock_contenders.discard(contender) - # This event was for a lock path; no further handling necessary - return True + return def postCacheHook(self, event, data, stat): # set the stats event so the stats reporting thread can act upon it