Add second level cache to node requests
When implementing dynamic node request priorities we need to be able to quickly get all requests and reorder them in a priority queue. This won't be efficient if this involves excessive json parsing. So this adds an event based second level cache. Change-Id: I923195de1890fdb74f7e5b33a0165f400dbbf374
This commit is contained in:
parent
35094dbb62
commit
6eb80deb36
|
@ -703,6 +703,7 @@ class ZooKeeper(object):
|
||||||
self._node_cache = None
|
self._node_cache = None
|
||||||
self._request_cache = None
|
self._request_cache = None
|
||||||
self._cached_nodes = {}
|
self._cached_nodes = {}
|
||||||
|
self._cached_node_requests = {}
|
||||||
|
|
||||||
# =======================================================================
|
# =======================================================================
|
||||||
# Private Methods
|
# Private Methods
|
||||||
|
@ -899,6 +900,8 @@ class ZooKeeper(object):
|
||||||
self._node_cache.start()
|
self._node_cache.start()
|
||||||
|
|
||||||
self._request_cache = TreeCache(self.client, self.REQUEST_ROOT)
|
self._request_cache = TreeCache(self.client, self.REQUEST_ROOT)
|
||||||
|
self._request_cache.listen_fault(self.cacheFaultListener)
|
||||||
|
self._request_cache.listen(self.requestCacheListener)
|
||||||
self._request_cache.start()
|
self._request_cache.start()
|
||||||
|
|
||||||
def disconnect(self):
|
def disconnect(self):
|
||||||
|
@ -1569,25 +1572,21 @@ class ZooKeeper(object):
|
||||||
|
|
||||||
:returns: The request data, or None if the request was not found.
|
:returns: The request data, or None if the request was not found.
|
||||||
'''
|
'''
|
||||||
path = self._requestPath(request)
|
|
||||||
data = None
|
|
||||||
stat = None
|
|
||||||
if cached:
|
if cached:
|
||||||
cached_data = self._request_cache.get_data(path)
|
d = self._cached_node_requests.get(request)
|
||||||
if cached_data:
|
if d:
|
||||||
data = cached_data.data
|
return d
|
||||||
stat = cached_data.stat
|
|
||||||
|
|
||||||
# If data is empty we either didn't use the cache or the cache didn't
|
# If we got here we either didn't use the cache or the cache didn't
|
||||||
# have the request (yet). Note that even if we use caching we need to
|
# have the request (yet). Note that even if we use caching we need to
|
||||||
# do a real query if the cached data is empty because the request data
|
# do a real query if the cached data is empty because the request data
|
||||||
# might not be in the cache yet when it's listed by the get_children
|
# might not be in the cache yet when it's listed by the get_children
|
||||||
# call.
|
# call.
|
||||||
if not data:
|
try:
|
||||||
try:
|
path = self._requestPath(request)
|
||||||
data, stat = self.client.get(path)
|
data, stat = self.client.get(path)
|
||||||
except kze.NoNodeError:
|
except kze.NoNodeError:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
d = NodeRequest.fromDict(self._bytesToDict(data), request)
|
d = NodeRequest.fromDict(self._bytesToDict(data), request)
|
||||||
d.stat = stat
|
d.stat = stat
|
||||||
|
@ -2107,3 +2106,49 @@ class ZooKeeper(object):
|
||||||
except KeyError:
|
except KeyError:
|
||||||
# If it's already gone, don't care
|
# If it's already gone, don't care
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
def requestCacheListener(self, event):
|
||||||
|
|
||||||
|
if hasattr(event.event_data, 'path'):
|
||||||
|
# Ignore root node
|
||||||
|
path = event.event_data.path
|
||||||
|
if path == self.REQUEST_ROOT:
|
||||||
|
return
|
||||||
|
|
||||||
|
# Ignore lock nodes
|
||||||
|
if '/lock' in path:
|
||||||
|
return
|
||||||
|
|
||||||
|
# Ignore any non-node related events such as connection events here
|
||||||
|
if event.event_type not in (TreeEvent.NODE_ADDED,
|
||||||
|
TreeEvent.NODE_UPDATED,
|
||||||
|
TreeEvent.NODE_REMOVED):
|
||||||
|
return
|
||||||
|
|
||||||
|
path = event.event_data.path
|
||||||
|
request_id = path.rsplit('/', 1)[1]
|
||||||
|
|
||||||
|
if event.event_type in (TreeEvent.NODE_ADDED, TreeEvent.NODE_UPDATED):
|
||||||
|
# Perform an in-place update of the cached request if possible
|
||||||
|
d = self._bytesToDict(event.event_data.data)
|
||||||
|
old_request = self._cached_node_requests.get(request_id)
|
||||||
|
if old_request:
|
||||||
|
if event.event_data.stat.version <= old_request.stat.version:
|
||||||
|
# Don't update to older data
|
||||||
|
return
|
||||||
|
if old_request.lock:
|
||||||
|
# Don't update a locked node request
|
||||||
|
return
|
||||||
|
old_request.updateFromDict(d)
|
||||||
|
old_request.stat = event.event_data.stat
|
||||||
|
else:
|
||||||
|
request = NodeRequest.fromDict(d, request_id)
|
||||||
|
request.stat = event.event_data.stat
|
||||||
|
self._cached_node_requests[request_id] = request
|
||||||
|
|
||||||
|
elif event.event_type == TreeEvent.NODE_REMOVED:
|
||||||
|
try:
|
||||||
|
del self._cached_node_requests[request_id]
|
||||||
|
except KeyError:
|
||||||
|
# If it's already gone, don't care
|
||||||
|
pass
|
||||||
|
|
Loading…
Reference in New Issue