diff --git a/nodepool/tests/unit/test_zk.py b/nodepool/tests/unit/test_zk.py index fabc9050f..fbbc8409d 100644 --- a/nodepool/tests/unit/test_zk.py +++ b/nodepool/tests/unit/test_zk.py @@ -1416,8 +1416,9 @@ class TestTreeCache(tests.DBTestCase): break self.assertEqual(len(cached_node.lock_contenders), 1) - my_zk._node_cache._ready.clear() - my_zk._node_cache._start() + my_zk._node_cache._sessionListener(KazooState.LOST) + my_zk._node_cache._sessionListener(KazooState.CONNECTED) + my_zk._node_cache.ensureReady() my_zk._node_cache._event_queue.join() my_zk._node_cache._playback_queue.join() diff --git a/nodepool/zk/zookeeper.py b/nodepool/zk/zookeeper.py index 2530d4c4b..3af823b0b 100644 --- a/nodepool/zk/zookeeper.py +++ b/nodepool/zk/zookeeper.py @@ -748,24 +748,25 @@ class NodepoolTreeCache(abc.ABC): self._ready = threading.Event() self._init_lock = threading.Lock() self._stopped = False + self._stop_workers = False self._event_queue = queue.Queue() self._playback_queue = queue.Queue() - self._event_worker = threading.Thread( - target=self._eventWorker) - self._event_worker.daemon = True - self._event_worker.start() - self._playback_worker = threading.Thread( - target=self._playbackWorker) - self._playback_worker.daemon = True - self._playback_worker.start() + self._event_worker = None + self._playback_worker = None zk.kazoo_client.add_listener(self._sessionListener) self._start() def _sessionListener(self, state): if state == KazooState.LOST: self._ready.clear() + self._stop_workers = True + self._event_queue.put(None) + self._playback_queue.put(None) elif state == KazooState.CONNECTED and not self._stopped: self._ready.clear() + self._stop_workers = True + self._event_queue.put(None) + self._playback_queue.put(None) self.zk.kazoo_client.handler.short_spawn(self._start) def _cacheListener(self, event): @@ -775,6 +776,34 @@ class NodepoolTreeCache(abc.ABC): locked = self._init_lock.acquire(blocking=False) if locked: self.log.debug("Initialize cache at %s", self.root) + + # If we have an event worker (this is a re-init), then way + # for it to finish stopping (the session listener should + # have told it to stop). + if self._event_worker: + self._event_worker.join() + # Replace the queue since any events from the previous + # session aren't valid. + self._event_queue = queue.Queue() + # Prepare (but don't start) the new worker. + self._event_worker = threading.Thread( + target=self._eventWorker) + self._event_worker.daemon = True + + if self._playback_worker: + self._playback_worker.join() + self._playback_queue = queue.Queue() + self._playback_worker = threading.Thread( + target=self._playbackWorker) + self._playback_worker.daemon = True + + # Clear the stop flag and start the workers now that we + # are sure that both have stopped and we have cleared the + # queues. + self._stop_workers = False + self._event_worker.start() + self._playback_worker.start() + try: self.zk.kazoo_client.add_watch( self.root, self._cacheListener, @@ -832,7 +861,7 @@ class NodepoolTreeCache(abc.ABC): self._cacheListener(event) def _eventWorker(self): - while not self._stopped: + while not (self._stopped or self._stop_workers): event = self._event_queue.get() if event is None: self._event_queue.task_done() @@ -885,7 +914,7 @@ class NodepoolTreeCache(abc.ABC): self._playback_queue.put((event, future, key)) def _playbackWorker(self): - while not self._stopped: + while not (self._stopped or self._stop_workers): item = self._playback_queue.get() if item is None: self._playback_queue.task_done() @@ -942,7 +971,10 @@ class NodepoolTreeCache(abc.ABC): self._cached_paths.discard(event.path) # Some caches have special handling for certain sub-objects - if self.preCacheHook(event, exists): + self.preCacheHook(event, exists) + + # If we don't actually cache this kind of object, return now + if key is None: return if data: @@ -988,9 +1020,6 @@ class NodepoolTreeCache(abc.ABC): Otherwise, it indicates whether or not the EventType would cause the node to exist in ZK. - If True is returned, then the cache will stop processing the event. - Other return values are ignored. - :param EventType event: The event. :param bool exists: Whether the object exists in ZK. @@ -1051,8 +1080,7 @@ class ImageCache(NodepoolTreeCache): image.paused = True else: image.paused = False - # This event was for a paused path; no further handling necessary - return True + return def objectFromDict(self, d, key): if len(key) == 4: @@ -1112,8 +1140,7 @@ class NodeCache(NodepoolTreeCache): node.lock_contenders.add(contender) else: node.lock_contenders.discard(contender) - # This event was for a lock path; no further handling necessary - return True + return def postCacheHook(self, event, data, stat): # set the stats event so the stats reporting thread can act upon it