From 6eb80deb36ba3f1c6975ce321ef1c391905b661e Mon Sep 17 00:00:00 2001
From: Tobias Henkel
Date: Tue, 20 Nov 2018 20:21:54 +0100
Subject: [PATCH] Add second level cache to node requests

When implementing dynamic node request priorities we need to be able
to quickly get all requests and reorder them in a priority queue. This
won't be efficient if it involves excessive JSON parsing, so this
change adds an event-based second level cache.

Change-Id: I923195de1890fdb74f7e5b33a0165f400dbbf374
---
 nodepool/zk.py | 71 +++++++++++++++++++++++++++++++++++++++++---------
 1 file changed, 58 insertions(+), 13 deletions(-)

diff --git a/nodepool/zk.py b/nodepool/zk.py
index 6ce42e4b8..d67342cb0 100755
--- a/nodepool/zk.py
+++ b/nodepool/zk.py
@@ -703,6 +703,7 @@ class ZooKeeper(object):
         self._node_cache = None
         self._request_cache = None
         self._cached_nodes = {}
+        self._cached_node_requests = {}

    # =======================================================================
    # Private Methods
@@ -899,6 +900,8 @@ class ZooKeeper(object):
         self._node_cache.start()

         self._request_cache = TreeCache(self.client, self.REQUEST_ROOT)
+        self._request_cache.listen_fault(self.cacheFaultListener)
+        self._request_cache.listen(self.requestCacheListener)
         self._request_cache.start()

     def disconnect(self):
@@ -1569,25 +1572,21 @@ class ZooKeeper(object):

         :returns: The request data, or None if the request was not found.
         '''
-        path = self._requestPath(request)
-        data = None
-        stat = None
         if cached:
-            cached_data = self._request_cache.get_data(path)
-            if cached_data:
-                data = cached_data.data
-                stat = cached_data.stat
+            d = self._cached_node_requests.get(request)
+            if d:
+                return d

-        # If data is empty we either didn't use the cache or the cache didn't
+        # If we got here we either didn't use the cache or the cache didn't
         # have the request (yet). Note that even if we use caching we need to
         # do a real query if the cached data is empty because the request data
         # might not be in the cache yet when it's listed by the get_children
         # call.
-        if not data:
-            try:
-                data, stat = self.client.get(path)
-            except kze.NoNodeError:
-                return None
+        try:
+            path = self._requestPath(request)
+            data, stat = self.client.get(path)
+        except kze.NoNodeError:
+            return None

         d = NodeRequest.fromDict(self._bytesToDict(data), request)
         d.stat = stat
@@ -2107,3 +2106,49 @@ class ZooKeeper(object):
             except KeyError:
                 # If it's already gone, don't care
                 pass
+
+    def requestCacheListener(self, event):
+
+        if hasattr(event.event_data, 'path'):
+            # Ignore root node
+            path = event.event_data.path
+            if path == self.REQUEST_ROOT:
+                return
+
+            # Ignore lock nodes
+            if '/lock' in path:
+                return
+
+        # Ignore any non-node related events such as connection events here
+        if event.event_type not in (TreeEvent.NODE_ADDED,
+                                    TreeEvent.NODE_UPDATED,
+                                    TreeEvent.NODE_REMOVED):
+            return
+
+        path = event.event_data.path
+        request_id = path.rsplit('/', 1)[1]
+
+        if event.event_type in (TreeEvent.NODE_ADDED, TreeEvent.NODE_UPDATED):
+            # Perform an in-place update of the cached request if possible
+            d = self._bytesToDict(event.event_data.data)
+            old_request = self._cached_node_requests.get(request_id)
+            if old_request:
+                if event.event_data.stat.version <= old_request.stat.version:
+                    # Don't update to older data
+                    return
+                if old_request.lock:
+                    # Don't update a locked node request
+                    return
+                old_request.updateFromDict(d)
+                old_request.stat = event.event_data.stat
+            else:
+                request = NodeRequest.fromDict(d, request_id)
+                request.stat = event.event_data.stat
+                self._cached_node_requests[request_id] = request
+
+        elif event.event_type == TreeEvent.NODE_REMOVED:
+            try:
+                del self._cached_node_requests[request_id]
+            except KeyError:
+                # If it's already gone, don't care
+                pass
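
Note (not part of the patch): below is a minimal sketch of how a consumer
could use this second level cache for the priority reordering mentioned in
the commit message. Only getNodeRequests() and getNodeRequest(..., cached=True)
come from the existing nodepool.zk API; the helper name prioritized_requests
and the '<priority>-<sequence>' request-id sort key are illustrative
assumptions, not something this change introduces.

    def prioritized_requests(zk):
        """Return cached NodeRequest objects ordered by priority.

        Assumes ``zk`` is a connected nodepool.zk.ZooKeeper instance and that
        request znode names look like '<priority>-<sequence>'; the sort key
        is illustrative only.
        """
        requests = []
        for request_id in zk.getNodeRequests():
            # Served from the second level cache; falls back to a real
            # ZooKeeper read if the request is not cached yet.
            req = zk.getNodeRequest(request_id, cached=True)
            if req is None:
                continue
            requests.append(req)
        # Reorder without any further ZooKeeper round trips or JSON parsing.
        return sorted(requests, key=lambda r: r.id.split('-', 1)[0])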