Add second level cache of nodes
An earlier change introduced zNode caching of Nodes. This first version still required frequent re-parsing of the node JSON data. This change introduces a second-level cache that updates a dict of the current nodes based on the events emitted by the TreeCache. Using this we can reduce JSON parsing and make node caching more effective. Change-Id: I4834a7ea722cf2ac7df79455ce077832ae966e63
This commit is contained in:
parent
1d278ffaee
commit
35094dbb62
|
@ -39,18 +39,25 @@ class TestNodepoolCMD(tests.DBTestCase):
|
||||||
def assert_listed(self, configfile, cmd, col, val, count, col_count=0):
    """Run a nodepool CLI command and assert on the rows it would print.

    Patches PrettyTable.add_row to capture the rows the command emits,
    then counts the rows whose column ``col`` equals ``val`` and asserts
    that exactly ``count`` such rows were produced.  Because the node
    cache is updated asynchronously, the whole check is retried inside
    ``iterate_timeout`` until it passes or 10 seconds elapse.

    :param str configfile: Path to the nodepool config file.
    :param list cmd: CLI argument list (e.g. ['node-list']).
    :param int col: Index of the row column to compare.
    :param val: Expected value in column ``col``.
    :param int count: Expected number of matching rows.
    :param int col_count: If non-zero, also assert each row has exactly
        this many columns.
    """
    log = logging.getLogger("tests.PrettyTableMock")
    self.patch_argv("-c", configfile, *cmd)
    # Retry until the (asynchronously updated) cache reflects the
    # expected state, or the timeout raises AssertionError for us.
    for _ in iterate_timeout(10, AssertionError, 'assert listed'):
        try:
            with mock.patch('prettytable.PrettyTable.add_row') as \
                    m_add_row:
                nodepoolcmd.main()
                rows_with_val = 0
                # Find add_rows with the status we're looking for
                for args, kwargs in m_add_row.call_args_list:
                    row = args[0]
                    if col_count:
                        self.assertEquals(len(row), col_count)
                    log.debug(row)
                    if row[col] == val:
                        rows_with_val += 1
                self.assertEquals(rows_with_val, count)
            break
        except AssertionError:
            # The expected state was not reached yet; loop and retry
            # until iterate_timeout gives up.
            pass
|
||||||
def assert_alien_images_listed(self, configfile, image_cnt, image_id):
|
def assert_alien_images_listed(self, configfile, image_cnt, image_id):
|
||||||
self.assert_listed(configfile, ['alien-image-list'], 2, image_id,
|
self.assert_listed(configfile, ['alien-image-list'], 2, image_id,
|
||||||
|
|
|
@ -22,7 +22,7 @@ from kazoo.client import KazooClient, KazooState
|
||||||
from kazoo import exceptions as kze
|
from kazoo import exceptions as kze
|
||||||
from kazoo.handlers.threading import KazooTimeoutError
|
from kazoo.handlers.threading import KazooTimeoutError
|
||||||
from kazoo.recipe.lock import Lock
|
from kazoo.recipe.lock import Lock
|
||||||
from kazoo.recipe.cache import TreeCache
|
from kazoo.recipe.cache import TreeCache, TreeEvent
|
||||||
|
|
||||||
from nodepool import exceptions as npe
|
from nodepool import exceptions as npe
|
||||||
|
|
||||||
|
@ -702,6 +702,7 @@ class ZooKeeper(object):
|
||||||
self._last_retry_log = 0
|
self._last_retry_log = 0
|
||||||
self._node_cache = None
|
self._node_cache = None
|
||||||
self._request_cache = None
|
self._request_cache = None
|
||||||
|
self._cached_nodes = {}
|
||||||
|
|
||||||
# =======================================================================
|
# =======================================================================
|
||||||
# Private Methods
|
# Private Methods
|
||||||
|
@ -893,6 +894,8 @@ class ZooKeeper(object):
|
||||||
self.logConnectionRetryEvent()
|
self.logConnectionRetryEvent()
|
||||||
|
|
||||||
self._node_cache = TreeCache(self.client, self.NODE_ROOT)
|
self._node_cache = TreeCache(self.client, self.NODE_ROOT)
|
||||||
|
self._node_cache.listen_fault(self.cacheFaultListener)
|
||||||
|
self._node_cache.listen(self.nodeCacheListener)
|
||||||
self._node_cache.start()
|
self._node_cache.start()
|
||||||
|
|
||||||
self._request_cache = TreeCache(self.client, self.REQUEST_ROOT)
|
self._request_cache = TreeCache(self.client, self.REQUEST_ROOT)
|
||||||
|
@ -1780,25 +1783,21 @@ class ZooKeeper(object):
|
||||||
|
|
||||||
:returns: The node data, or None if the node was not found.
|
:returns: The node data, or None if the node was not found.
|
||||||
'''
|
'''
|
||||||
path = self._nodePath(node)
|
|
||||||
data = None
|
|
||||||
stat = None
|
|
||||||
if cached:
|
if cached:
|
||||||
cached_data = self._node_cache.get_data(path)
|
d = self._cached_nodes.get(node)
|
||||||
if cached_data:
|
if d:
|
||||||
data = cached_data.data
|
return d
|
||||||
stat = cached_data.stat
|
|
||||||
|
|
||||||
# If data is empty we either didn't use the cache or the cache didn't
|
# We got here we either didn't use the cache or the cache didn't
|
||||||
# have the node (yet). Note that even if we use caching we need to
|
# have the node (yet). Note that even if we use caching we need to
|
||||||
# do a real query if the cached data is empty because the node data
|
# do a real query if the cached data is empty because the node data
|
||||||
# might not be in the cache yet when it's listed by the get_children
|
# might not be in the cache yet when it's listed by the get_children
|
||||||
# call.
|
# call.
|
||||||
if not data:
|
try:
|
||||||
try:
|
path = self._nodePath(node)
|
||||||
data, stat = self.client.get(path)
|
data, stat = self.client.get(path)
|
||||||
except kze.NoNodeError:
|
except kze.NoNodeError:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
if not data:
|
if not data:
|
||||||
return None
|
return None
|
||||||
|
@ -2060,3 +2059,51 @@ class ZooKeeper(object):
|
||||||
'''
|
'''
|
||||||
for node in provider_nodes:
|
for node in provider_nodes:
|
||||||
self.deleteNode(node)
|
self.deleteNode(node)
|
||||||
|
|
||||||
def cacheFaultListener(self, e):
    """Log exceptions raised inside the kazoo TreeCache worker.

    Registered via ``TreeCache.listen_fault`` so cache-internal errors
    are surfaced in the nodepool log instead of being silently dropped.

    :param Exception e: The exception raised by the cache.
    """
    self.log.exception(e)
def nodeCacheListener(self, event):
    """Keep the second-level node dict in sync with TreeCache events.

    Registered via ``TreeCache.listen`` on the node cache.  Maintains
    ``self._cached_nodes`` (a dict of node-id -> Node) so that node
    lookups can avoid re-parsing the raw JSON on every access.

    :param TreeEvent event: The event emitted by the TreeCache.
    """

    if hasattr(event.event_data, 'path'):
        # Ignore root node
        path = event.event_data.path
        if path == self.NODE_ROOT:
            return

        # Ignore lock nodes
        if '/lock' in path:
            return

    # Ignore any non-node related events such as connection events here
    if event.event_type not in (TreeEvent.NODE_ADDED,
                                TreeEvent.NODE_UPDATED,
                                TreeEvent.NODE_REMOVED):
        return

    path = event.event_data.path
    node_id = path.rsplit('/', 1)[1]

    if event.event_type in (TreeEvent.NODE_ADDED, TreeEvent.NODE_UPDATED):
        # Perform an in-place update of the already cached node if possible
        d = self._bytesToDict(event.event_data.data)
        old_node = self._cached_nodes.get(node_id)
        if old_node:
            if event.event_data.stat.version <= old_node.stat.version:
                # Don't update to older data
                return
            if old_node.lock:
                # Don't update a locked node
                return
            old_node.updateFromDict(d)
            old_node.stat = event.event_data.stat
        else:
            node = Node.fromDict(d, node_id)
            node.stat = event.event_data.stat
            self._cached_nodes[node_id] = node
    elif event.event_type == TreeEvent.NODE_REMOVED:
        try:
            del self._cached_nodes[node_id]
        except KeyError:
            # If it's already gone, don't care
            pass
|
Loading…
Reference in New Issue