diff --git a/nodepool/tests/unit/test_zk.py b/nodepool/tests/unit/test_zk.py index 3cc9b6b93..bd40b90f7 100644 --- a/nodepool/tests/unit/test_zk.py +++ b/nodepool/tests/unit/test_zk.py @@ -16,6 +16,8 @@ import time import uuid import socket +from kazoo.protocol.states import KazooState + from nodepool import exceptions as npe from nodepool import tests from nodepool.zk import zookeeper as zk @@ -1141,3 +1143,86 @@ class TestZKModel(tests.BaseTestCase): d = n.toDict() self.assertEqual(d["connection_port"], 22022, "Custom ssh port not set") + + +class SimpleTreeCacheObject: + def __init__(self, key, data): + self.key = key + self.data = data + self.path = '/'.join(key) + + def updateFromDict(self, data): + self.data = data + + +class SimpleTreeCache(zk.NodepoolTreeCache): + def objectFromDict(self, d, key): + return SimpleTreeCacheObject(key, d) + + def parsePath(self, path): + return tuple(path.split('/')) + + +class TestTreeCache(tests.DBTestCase): + # A very simple smoke test of the tree cache + + def waitForCache(self, cache, contents): + paths = set(contents.keys()) + for _ in iterate_timeout(10, Exception, 'cache to sync', interval=0.1): + cached_paths = cache._cached_paths.copy() + cached_paths.discard(cache.root) + object_paths = set( + [x.path for x in cache._cached_objects.values()]) + if paths == cached_paths == object_paths: + found = True + for obj in cache._cached_objects.values(): + if contents[obj.path] != obj.data: + found = False + if found: + return + + def test_tree_cache(self): + client = self.zk.kazoo_client + data = b'{}' + client.create('/test', data) + client.create('/test/foo', data) + cache = SimpleTreeCache(self.zk, "/test") + self.waitForCache(cache, { + '/test/foo': {}, + }) + client.create('/test/bar', data) + self.waitForCache(cache, { + '/test/foo': {}, + '/test/bar': {}, + }) + client.set('/test/bar', b'{"value":1}') + self.waitForCache(cache, { + '/test/foo': {}, + '/test/bar': {'value': 1}, + }) + client.delete('/test/bar') + self.waitForCache(cache, { + '/test/foo': {}, + }) + + # Simulate a change happening while the state was lost + cache._cached_paths.add('/test/bar') + cache._sessionListener(KazooState.LOST) + cache._sessionListener(KazooState.CONNECTED) + self.waitForCache(cache, { + '/test/foo': {}, + }) + + def test_tree_cache_root(self): + client = self.zk.kazoo_client + data = b'{}' + client.create('/foo', data) + cache = SimpleTreeCache(self.zk, "/") + for _ in iterate_timeout(10, Exception, 'cache to sync', interval=0.1): + cached_paths = cache._cached_paths.copy() + cached_paths.discard(cache.root) + object_paths = set( + [x.path for x in cache._cached_objects.values()]) + if ('/foo' in cached_paths and + '/foo' in object_paths): + break diff --git a/nodepool/zk/zookeeper.py b/nodepool/zk/zookeeper.py index ecf7777a6..bdbe1ddb1 100644 --- a/nodepool/zk/zookeeper.py +++ b/nodepool/zk/zookeeper.py @@ -15,19 +15,25 @@ from copy import copy import abc import json import logging +import queue import threading import time import uuid from kazoo import exceptions as kze from kazoo.recipe.lock import Lock -from kazoo.recipe.cache import TreeCache, TreeEvent from kazoo.recipe.election import Election +from kazoo.protocol.states import ( + EventType, + WatchedEvent, + KazooState, +) from nodepool import exceptions as npe from nodepool.logconfig import get_annotated_logger from nodepool.zk.components import COMPONENT_REGISTRY from nodepool.zk import ZooKeeperBase +from nodepool.zk.vendor.states import AddWatchMode from nodepool.nodeutils import Attributes # States: @@ -725,7 +731,7 @@ class Node(BaseModel): class NodepoolTreeCache(abc.ABC): ''' - Use a ZK TreeCache to keep a cache of local Nodepool objects up to date. + Use watchers to keep a cache of local Nodepool objects up to date. ''' log = logging.getLogger("nodepool.zk.ZooKeeper") @@ -734,92 +740,181 @@ class NodepoolTreeCache(abc.ABC): self.zk = zk self.root = root self._cached_objects = {} - self._tree_cache = TreeCache(zk.kazoo_client, - self.root) - self._tree_cache.listen_fault(self.cacheFaultListener) - self._tree_cache.listen(self.cacheListener) - self._tree_cache.start() + self._cached_paths = set() + self._started = False + self._stopped = False + self._queue = queue.Queue() + self._background_thread = threading.Thread( + target=self._backgroundWorker) + self._background_thread.daemon = True + self._background_thread.start() + zk.kazoo_client.add_listener(self._sessionListener) + self._start() - def cacheFaultListener(self, e): - self.log.exception(e) + def _backgroundWorker(self): + while not self._stopped: + event = self._queue.get() + if event is None: + continue + try: + self._handleCacheEvent(event) + except Exception: + self.log.exception("Error handling event %s:", event) - def cacheListener(self, event): - try: - self._cacheListener(event) - except Exception: - self.log.exception( - "Exception in cache update for event: %s", - event) + def _sessionListener(self, state): + if state == KazooState.LOST: + self._started = False + elif (state == KazooState.CONNECTED and + not self._started and not self._stopped): + self.zk.kazoo_client.handler.short_spawn(self._start) def _cacheListener(self, event): - if hasattr(event.event_data, 'path'): - # Ignore root node - path = event.event_data.path - if path == self.root: - return + self._queue.put(event) - # Ignore any non-node related events such as connection events here - if event.event_type not in (TreeEvent.NODE_ADDED, - TreeEvent.NODE_UPDATED, - TreeEvent.NODE_REMOVED): + def _start(self): + self.log.debug("Initialize cache") + self.zk.kazoo_client.add_watch(self.root, self._cacheListener, + AddWatchMode.PERSISTENT_RECURSIVE) + self._walkTree() + self._started = True + + def stop(self): + self._stopped = True + self._queue.put(None) + + def _walkTree(self, root=None, seen_paths=None): + # Recursively walk the tree and emit fake changed events for + # every item in zk and fake deleted events for every item in + # the cache that is not in zk + exists = True + am_root = False + if root is None: + am_root = True + root = self.root + seen_paths = set() + if not self.zk.kazoo_client.exists(root): + exists = False + if exists: + seen_paths.add(root) + event = WatchedEvent(EventType.NONE, + self.zk.kazoo_client._state, + root) + self._cacheListener(event) + for child in self.zk.kazoo_client.get_children(root): + safe_root = root + if safe_root == '/': + safe_root = '' + new_path = '/'.join([safe_root, child]) + self._walkTree(new_path, seen_paths) + if am_root: + for path in self._cached_paths: + if path not in seen_paths: + event = WatchedEvent( + EventType.NONE, + self.zk.kazoo_client._state, + path) + self._cacheListener(event) + + def _handleCacheEvent(self, event): + self.log.debug("Cache event %s", event) + + data, stat = None, None + # The cache is being (re-)initialized. Since this happens out + # of sequence with the normal watch events, we can't be sure + # whether the node still exists or not by the time we process + # it. Later cache watch events may supercede this (for + # example, we may process a NONE event here which we interpret + # as a delete which may be followed by a normal delete event. + # That case, and any other variations should be anticipated. + if event.type == EventType.NONE: + try: + data, stat = self.zk.kazoo_client.get(event.path) + exists = True + except kze.NoNodeError: + exists = False + elif (event.type in (EventType.CREATED, EventType.CHANGED)): + exists = True + elif (event.type == EventType.DELETED): + exists = False + + # Keep the cached paths up to date + if exists: + self._cached_paths.add(event.path) + else: + self._cached_paths.discard(event.path) + + # Ignore root node since we don't maintain a cached object for + # it (all cached objects are under the root in our tree + # caches). + if event.path == self.root: return # Some caches have special handling for certain sub-objects - if self.preCacheHook(event): + if self.preCacheHook(event, exists): return - path = event.event_data.path - key = self.parsePath(path) + key = self.parsePath(event.path) if key is None: return - if event.event_type in (TreeEvent.NODE_ADDED, TreeEvent.NODE_UPDATED): - # Images with empty data are invalid so skip add or update these. - if not event.event_data.data: - return - data = self.zk._bytesToDict(event.event_data.data) + # If we didn't get the data above, and we expect the node to + # exist, fetch the data now. + if data is None and exists: + try: + data, stat = self.zk.kazoo_client.get(event.path) + except kze.NoNodeError: + pass - # Perform an in-place update of the cached image if possible + if data: + data = self.zk._bytesToDict(data) + + # Perform an in-place update of the cached object if possible old_obj = self._cached_objects.get(key) if old_obj: - if event.event_data.stat.version <= old_obj.stat.version: + if stat.mzxid <= old_obj.stat.mzxid: # Don't update to older data return if getattr(old_obj, 'lock', None): # Don't update a locked object return old_obj.updateFromDict(data) - old_obj.stat = event.event_data.stat + old_obj.stat = stat else: obj = self.objectFromDict(data, key) - obj.stat = event.event_data.stat + obj.stat = stat self._cached_objects[key] = obj - self.postCacheHook(event) - elif event.event_type == TreeEvent.NODE_REMOVED: + else: try: del self._cached_objects[key] except KeyError: # If it's already gone, don't care pass - self.postCacheHook(event) - - def close(self): - self._tree_cache.close() + self.postCacheHook(event, data, stat) # Methods for subclasses: - def preCacheHook(self, event): + def preCacheHook(self, event, exists): """Called before the cache is updated This is called for any add/update/remove event under the root, even for paths that are ignored, so users much test the relevance of the path in this method. + The ``exists`` argument is provided in all cases. In the case + of EventType.NONE events, it indicates whether the cache has + seen the node in ZK immediately before calling this method. + Otherwise, it indicates whether or not the EventType would + cause the node to exist in ZK. + If True is returned, then the cache will stop processing the event. Other return values are ignored. + + :param EventType event: The event. + :param bool exists: Whether the object exists in ZK. + """ return None - def postCacheHook(self, event): + def postCacheHook(self, event, data, stat): """Called after the cache has been updated""" return None @@ -860,8 +955,8 @@ class ImageCache(NodepoolTreeCache): r = self.zk._parseImagePath(path) return r - def preCacheHook(self, event): - key = self.zk._parseImagePausePath(event.event_data.path) + def preCacheHook(self, event, exists): + key = self.zk._parseImagePausePath(event.path) if key is None: return # A pause flag is being added or removed @@ -869,10 +964,9 @@ class ImageCache(NodepoolTreeCache): image = self._cached_objects.get(key) if not image: return - if event.event_type in (TreeEvent.NODE_ADDED, - TreeEvent.NODE_UPDATED): + if exists: image.paused = True - elif event.event_type == TreeEvent.NODE_REMOVED: + else: image.paused = False # This event was for a paused path; no further handling necessary return True @@ -899,8 +993,8 @@ class NodeCache(NodepoolTreeCache): def parsePath(self, path): return self.zk._parseNodePath(path) - def preCacheHook(self, event): - key = self.zk._parseNodeLockPath(event.event_data.path) + def preCacheHook(self, event, exists): + key = self.zk._parseNodeLockPath(event.path) if key is None: return # A lock contender is being added or removed @@ -910,15 +1004,14 @@ class NodeCache(NodepoolTreeCache): node = self._cached_objects.get(obj_key) if not node: return - if event.event_type in (TreeEvent.NODE_ADDED, - TreeEvent.NODE_UPDATED): + if exists: node.lock_contenders.add(contender) - elif event.event_type == TreeEvent.NODE_REMOVED: + else: node.lock_contenders.discard(contender) # This event was for a lock path; no further handling necessary return True - def postCacheHook(self, event): + def postCacheHook(self, event, data, stat): # set the stats event so the stats reporting thread can act upon it if self.zk.node_stats_event is not None: self.zk.node_stats_event.set() @@ -1002,24 +1095,11 @@ class ZooKeeper(ZooKeeperBase): # Private Methods # ======================================================================= def _onConnect(self): - if self.enable_cache: + if self.enable_cache and self._node_cache is None: self._node_cache = NodeCache(self, self.NODE_ROOT) self._request_cache = RequestCache(self, self.REQUEST_ROOT) self._image_cache = ImageCache(self, self.IMAGE_ROOT) - def _onDisconnect(self): - if self._node_cache is not None: - self._node_cache.close() - self._node_cache = None - - if self._request_cache is not None: - self._request_cache.close() - self._request_cache = None - - if self._image_cache is not None: - self._image_cache.close() - self._image_cache = None - def _electionPath(self, election): return "%s/%s" % (self.ELECTION_ROOT, election)