Support node caching in the nodeIterator

This adds support for returning cached data from the nodeIterator. This can
be done easily by utilizing kazoo's TreeCache recipe.

Depends-On: https://review.openstack.org/616398
Change-Id: I23a992417d186b712864f2b00e79bc88bbfca967
Tobias Henkel 2018-09-23 20:27:13 +02:00
parent e0ebaa9799
commit 56bac6e9cb
5 changed files with 104 additions and 18 deletions
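For reference, below is a minimal, self-contained sketch of the read pattern this change builds on: a kazoo TreeCache mirrors the node subtree in memory and serves reads, with a fallback to a direct query. It assumes a ZooKeeper server at localhost:2181 and the /nodepool/nodes znode root; the get_node_data() helper is purely illustrative and not part of nodepool's API.

    # Illustrative sketch only; not nodepool code.
    from kazoo.client import KazooClient
    from kazoo import exceptions as kze
    from kazoo.recipe.cache import TreeCache

    NODE_ROOT = '/nodepool/nodes'

    client = KazooClient(hosts='localhost:2181')
    client.start()

    # TreeCache keeps an in-memory copy of the znode subtree up to date via
    # watches, so repeated reads don't round-trip to ZooKeeper each time.
    node_cache = TreeCache(client, NODE_ROOT)
    node_cache.start()

    def get_node_data(node_id, cached=True):
        path = '%s/%s' % (NODE_ROOT, node_id)
        if cached:
            cached_data = node_cache.get_data(path)
            if cached_data and cached_data.data:
                return cached_data.data
        # Fall back to a direct query when the cache hasn't seen the node yet.
        try:
            data, _stat = client.get(path)
            return data
        except kze.NoNodeError:
            return None

The commit applies the same idea inside ZooKeeper.getNode(): cached reads are served from the TreeCache, with a fallback to client.get() when the node is not in the cache yet.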


@@ -900,7 +900,9 @@ class NodePool(threading.Thread):
         requested_labels = list(self._submittedRequests.keys())
         needed_labels = list(set(label_names) - set(requested_labels))
-        ready_nodes = self.zk.getReadyNodesOfTypes(needed_labels)
+        # Note we explicitly don't use the cache here because otherwise we can
+        # end up creating more min-ready nodes than we want.
+        ready_nodes = self.zk.getReadyNodesOfTypes(needed_labels, cached=False)

         for label in self.config.labels.values():
             if label.name not in needed_labels:


@@ -110,7 +110,14 @@ class StatsReporter(object):
                 key = 'nodepool.label.%s.nodes.%s' % (label, state)
                 states[key] = 0

-        for node in zk_conn.nodeIterator():
+        # Note that we intentionally don't use caching here because we don't
+        # know when the next update will happen and thus need to report the
+        # correct most recent state. Otherwise we can end up reporting a
+        # gauge with a node in state deleting = 1 and never update it for a
+        # long time.
+        # TODO(tobiash): Changing updateNodeStats to just run periodically will
+        # resolve this and we can operate on cached data.
+        for node in zk_conn.nodeIterator(cached=False):
             # nodepool.nodes.STATE
             key = 'nodepool.nodes.%s' % node.state
             states[key] += 1


@@ -24,6 +24,7 @@ import testtools
 from nodepool.cmd import nodepoolcmd
 from nodepool import tests
 from nodepool import zk
+from nodepool.nodeutils import iterate_timeout


 class TestNodepoolCMD(tests.DBTestCase):
@@ -124,8 +125,15 @@
         pool.start()
         self.waitForImage('fake-provider', 'fake-image')
         self.waitForNodes('fake-label')
-        self.assert_nodes_listed(configfile, 1, detail=False,
-                                 validate_col_count=True)
+
+        for _ in iterate_timeout(10, Exception, "assert nodes are listed"):
+            try:
+                self.assert_nodes_listed(configfile, 1, detail=False,
+                                         validate_col_count=True)
+                break
+            except AssertionError:
+                # node is not listed yet, retry later
+                pass

     def test_list_nodes_detail(self):
         configfile = self.setup_config('node.yaml')
@@ -134,8 +142,14 @@
         pool.start()
         self.waitForImage('fake-provider', 'fake-image')
         self.waitForNodes('fake-label')
-        self.assert_nodes_listed(configfile, 1, detail=True,
-                                 validate_col_count=True)
+        for _ in iterate_timeout(10, Exception, "assert nodes are listed"):
+            try:
+                self.assert_nodes_listed(configfile, 1, detail=True,
+                                         validate_col_count=True)
+                break
+            except AssertionError:
+                # node is not listed yet, retry later
+                pass

     def test_config_validate(self):
         config = os.path.join(os.path.dirname(tests.__file__),


@@ -17,6 +17,7 @@ import time
 from nodepool import exceptions as npe
 from nodepool import tests
 from nodepool import zk
+from nodepool.nodeutils import iterate_timeout


 class TestZooKeeper(tests.DBTestCase):
@@ -583,7 +584,7 @@
         n3.type = 'label2'
         self.zk.storeNode(n3)

-        r = self.zk.getReadyNodesOfTypes(['label1'])
+        r = self.zk.getReadyNodesOfTypes(['label1'], cached=False)
         self.assertIn('label1', r)
         self.assertEqual(2, len(r['label1']))
         self.assertIn(n1, r['label1'])
@@ -603,7 +604,7 @@
         n3.type = 'label2'
         self.zk.storeNode(n3)

-        r = self.zk.getReadyNodesOfTypes(['label1', 'label3'])
+        r = self.zk.getReadyNodesOfTypes(['label1', 'label3'], cached=False)
         self.assertIn('label1', r)
         self.assertIn('label3', r)
         self.assertEqual(2, len(r['label1']))
@@ -614,7 +615,7 @@
     def test_nodeIterator(self):
         n1 = self._create_node()
-        i = self.zk.nodeIterator()
+        i = self.zk.nodeIterator(cached=False)
         self.assertEqual(n1, next(i))
         with testtools.ExpectedException(StopIteration):
             next(i)
@@ -670,6 +671,40 @@
         self.zk.deleteNodeRequestLock(lock_ids[0])
         self.assertEqual([], self.zk.getNodeRequestLockIDs())

+    def test_node_caching(self):
+        '''
+        Test that node iteration using both cached and uncached calls
+        produces identical results.
+        '''
+        # Test new node in node set
+        n1 = self._create_node()
+
+        # uncached
+        a1 = self.zk.nodeIterator(cached=False)
+        self.assertEqual(n1, next(a1))
+
+        # cached
+        a2 = self.zk.nodeIterator(cached=True)
+        self.assertEqual(n1, next(a2))
+        with testtools.ExpectedException(StopIteration):
+            next(a2)
+
+        # Test modification of existing node set
+        n1.state = zk.HOLD
+        n1.label = "oompaloompa"
+        self.zk.storeNode(n1)
+
+        # uncached
+        b1 = self.zk.nodeIterator(cached=False)
+        self.assertEqual(n1, next(b1))
+
+        # cached
+        for _ in iterate_timeout(10, Exception,
+                                 "cached node equals original node"):
+            b2 = self.zk.nodeIterator(cached=True)
+            if n1 == next(b2):
+                break
+

 class TestZKModel(tests.BaseTestCase):


@@ -22,6 +22,7 @@ from kazoo.client import KazooClient, KazooState
 from kazoo import exceptions as kze
 from kazoo.handlers.threading import KazooTimeoutError
 from kazoo.recipe.lock import Lock
+from kazoo.recipe.cache import TreeCache

 from nodepool import exceptions as npe
@@ -681,6 +682,7 @@ class ZooKeeper(object):
         self.client = None
         self._became_lost = False
         self._last_retry_log = 0
+        self._node_cache = None

     # =======================================================================
     # Private Methods
@@ -871,6 +873,9 @@ class ZooKeeper(object):
             except KazooTimeoutError:
                 self.logConnectionRetryEvent()

+            self._node_cache = TreeCache(self.client, self.NODE_ROOT)
+            self._node_cache.start()
+
     def disconnect(self):
         '''
         Close the ZooKeeper cluster connection.
@@ -878,6 +883,11 @@
         You should call this method if you used connect() to establish a
         cluster connection.
         '''
+
+        if self._node_cache is not None:
+            self._node_cache.close()
+            self._node_cache = None
+
         if self.client is not None and self.client.connected:
             self.client.stop()
             self.client.close()
@@ -1692,19 +1702,35 @@ class ZooKeeper(object):
         except kze.NoNodeError:
             return []

-    def getNode(self, node):
+    def getNode(self, node, cached=False):
         '''
         Get the data for a specific node.

         :param str node: The node ID.
+        :param bool cached: True if the data should be taken from the cache.

         :returns: The node data, or None if the node was not found.
         '''
         path = self._nodePath(node)
-        try:
-            data, stat = self.client.get(path)
-        except kze.NoNodeError:
-            return None
+        data = None
+        stat = None
+        if cached:
+            cached_data = self._node_cache.get_data(path)
+            if cached_data:
+                data = cached_data.data
+                stat = cached_data.stat
+
+        # If data is empty we either didn't use the cache or the cache didn't
+        # have the node (yet). Note that even if we use caching we need to
+        # do a real query if the cached data is empty because the node data
+        # might not be in the cache yet when it's listed by the get_children
+        # call.
+        if not data:
+            try:
+                data, stat = self.client.get(path)
+            except kze.NoNodeError:
+                return None

         if not data:
             return None
@@ -1759,7 +1785,7 @@
         except kze.NoNodeError:
             pass

-    def getReadyNodesOfTypes(self, labels):
+    def getReadyNodesOfTypes(self, labels, cached=True):
         '''
         Query ZooKeeper for unused/ready nodes.
@@ -1771,7 +1797,7 @@
                   those labels.
         '''
         ret = {}
-        for node in self.nodeIterator():
+        for node in self.nodeIterator(cached=cached):
             if node.state != READY or node.allocated_to:
                 continue
             for label in labels:
@@ -1848,12 +1874,14 @@
         return False

-    def nodeIterator(self):
+    def nodeIterator(self, cached=True):
         '''
         Utility generator method for iterating through all nodes.
+
+        :param bool cached: True if the data should be taken from the cache.
         '''
         for node_id in self.getNodes():
-            node = self.getNode(node_id)
+            node = self.getNode(node_id, cached=cached)
             if node:
                 yield node
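
The new keyword arguments leave the choice to the caller: nodeIterator() and getReadyNodesOfTypes() default to cached reads, while getNode() defaults to uncached. A rough usage sketch, assuming zk is a connected nodepool ZooKeeper instance and 'fake-label' an existing label (both illustrative):

    # Default: iterate over nodes using the TreeCache-backed data.
    for node in zk.nodeIterator():
        print(node.id, node.state)

    # Callers that must see the most recent state bypass the cache explicitly,
    # as the launcher and the stats reporter do in this change.
    ready = zk.getReadyNodesOfTypes(['fake-label'], cached=False)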