diff --git a/nodepool/launcher.py b/nodepool/launcher.py index f2aeaff14..6ec189e17 100755 --- a/nodepool/launcher.py +++ b/nodepool/launcher.py @@ -900,7 +900,9 @@ class NodePool(threading.Thread): requested_labels = list(self._submittedRequests.keys()) needed_labels = list(set(label_names) - set(requested_labels)) - ready_nodes = self.zk.getReadyNodesOfTypes(needed_labels) + # Note we explicitly don't use the cache here because otherwise we can + # end up creating more min-ready nodes than we want. + ready_nodes = self.zk.getReadyNodesOfTypes(needed_labels, cached=False) for label in self.config.labels.values(): if label.name not in needed_labels: diff --git a/nodepool/stats.py b/nodepool/stats.py index 7bdeee555..7cbf32e10 100755 --- a/nodepool/stats.py +++ b/nodepool/stats.py @@ -110,7 +110,14 @@ class StatsReporter(object): key = 'nodepool.label.%s.nodes.%s' % (label, state) states[key] = 0 - for node in zk_conn.nodeIterator(): + # Note that we intentionally don't use caching here because we don't + # know when the next update will happen and thus need to report the + # correct most recent state. Otherwise we can end up in reporting + # a gauge with a node in state deleting = 1 and never update this for + # a long time. + # TODO(tobiash): Changing updateNodeStats to just run periodically will + # resolve this and we can operate on cached data. + for node in zk_conn.nodeIterator(cached=False): # nodepool.nodes.STATE key = 'nodepool.nodes.%s' % node.state states[key] += 1 diff --git a/nodepool/tests/unit/test_commands.py b/nodepool/tests/unit/test_commands.py index dbabd12e0..986052757 100644 --- a/nodepool/tests/unit/test_commands.py +++ b/nodepool/tests/unit/test_commands.py @@ -24,6 +24,7 @@ import testtools from nodepool.cmd import nodepoolcmd from nodepool import tests from nodepool import zk +from nodepool.nodeutils import iterate_timeout class TestNodepoolCMD(tests.DBTestCase): @@ -124,8 +125,15 @@ class TestNodepoolCMD(tests.DBTestCase): pool.start() self.waitForImage('fake-provider', 'fake-image') self.waitForNodes('fake-label') - self.assert_nodes_listed(configfile, 1, detail=False, - validate_col_count=True) + + for _ in iterate_timeout(10, Exception, "assert nodes are listed"): + try: + self.assert_nodes_listed(configfile, 1, detail=False, + validate_col_count=True) + break + except AssertionError: + # node is not listed yet, retry later + pass def test_list_nodes_detail(self): configfile = self.setup_config('node.yaml') @@ -134,8 +142,14 @@ class TestNodepoolCMD(tests.DBTestCase): pool.start() self.waitForImage('fake-provider', 'fake-image') self.waitForNodes('fake-label') - self.assert_nodes_listed(configfile, 1, detail=True, - validate_col_count=True) + for _ in iterate_timeout(10, Exception, "assert nodes are listed"): + try: + self.assert_nodes_listed(configfile, 1, detail=True, + validate_col_count=True) + break + except AssertionError: + # node is not listed yet, retry later + pass def test_config_validate(self): config = os.path.join(os.path.dirname(tests.__file__), diff --git a/nodepool/tests/unit/test_zk.py b/nodepool/tests/unit/test_zk.py index 76867c446..8f1dca523 100644 --- a/nodepool/tests/unit/test_zk.py +++ b/nodepool/tests/unit/test_zk.py @@ -17,6 +17,7 @@ import time from nodepool import exceptions as npe from nodepool import tests from nodepool import zk +from nodepool.nodeutils import iterate_timeout class TestZooKeeper(tests.DBTestCase): @@ -583,7 +584,7 @@ class TestZooKeeper(tests.DBTestCase): n3.type = 'label2' self.zk.storeNode(n3) - r = self.zk.getReadyNodesOfTypes(['label1']) + r = self.zk.getReadyNodesOfTypes(['label1'], cached=False) self.assertIn('label1', r) self.assertEqual(2, len(r['label1'])) self.assertIn(n1, r['label1']) @@ -603,7 +604,7 @@ class TestZooKeeper(tests.DBTestCase): n3.type = 'label2' self.zk.storeNode(n3) - r = self.zk.getReadyNodesOfTypes(['label1', 'label3']) + r = self.zk.getReadyNodesOfTypes(['label1', 'label3'], cached=False) self.assertIn('label1', r) self.assertIn('label3', r) self.assertEqual(2, len(r['label1'])) @@ -614,7 +615,7 @@ class TestZooKeeper(tests.DBTestCase): def test_nodeIterator(self): n1 = self._create_node() - i = self.zk.nodeIterator() + i = self.zk.nodeIterator(cached=False) self.assertEqual(n1, next(i)) with testtools.ExpectedException(StopIteration): next(i) @@ -670,6 +671,40 @@ class TestZooKeeper(tests.DBTestCase): self.zk.deleteNodeRequestLock(lock_ids[0]) self.assertEqual([], self.zk.getNodeRequestLockIDs()) + def test_node_caching(self): + ''' + Test that node iteration using both cached and uncached calls + produces identical results. + ''' + # Test new node in node set + n1 = self._create_node() + + # uncached + a1 = self.zk.nodeIterator(cached=False) + self.assertEqual(n1, next(a1)) + + # cached + a2 = self.zk.nodeIterator(cached=True) + self.assertEqual(n1, next(a2)) + with testtools.ExpectedException(StopIteration): + next(a2) + + # Test modification of existing node set + n1.state = zk.HOLD + n1.label = "oompaloompa" + self.zk.storeNode(n1) + + # uncached + b1 = self.zk.nodeIterator(cached=False) + self.assertEqual(n1, next(b1)) + + # cached + for _ in iterate_timeout(10, Exception, + "cached node equals original node"): + b2 = self.zk.nodeIterator(cached=True) + if n1 == next(b2): + break + class TestZKModel(tests.BaseTestCase): diff --git a/nodepool/zk.py b/nodepool/zk.py index dcb6fec62..c0dd2a489 100755 --- a/nodepool/zk.py +++ b/nodepool/zk.py @@ -22,6 +22,7 @@ from kazoo.client import KazooClient, KazooState from kazoo import exceptions as kze from kazoo.handlers.threading import KazooTimeoutError from kazoo.recipe.lock import Lock +from kazoo.recipe.cache import TreeCache from nodepool import exceptions as npe @@ -681,6 +682,7 @@ class ZooKeeper(object): self.client = None self._became_lost = False self._last_retry_log = 0 + self._node_cache = None # ======================================================================= # Private Methods @@ -871,6 +873,9 @@ class ZooKeeper(object): except KazooTimeoutError: self.logConnectionRetryEvent() + self._node_cache = TreeCache(self.client, self.NODE_ROOT) + self._node_cache.start() + def disconnect(self): ''' Close the ZooKeeper cluster connection. @@ -878,6 +883,11 @@ class ZooKeeper(object): You should call this method if you used connect() to establish a cluster connection. ''' + + if self._node_cache is not None: + self._node_cache.close() + self._node_cache = None + if self.client is not None and self.client.connected: self.client.stop() self.client.close() @@ -1692,19 +1702,35 @@ class ZooKeeper(object): except kze.NoNodeError: return [] - def getNode(self, node): + def getNode(self, node, cached=False): ''' Get the data for a specific node. :param str node: The node ID. + :param bool cached: True if the data should be taken from the cache. :returns: The node data, or None if the node was not found. ''' path = self._nodePath(node) - try: - data, stat = self.client.get(path) - except kze.NoNodeError: - return None + data = None + stat = None + if cached: + cached_data = self._node_cache.get_data(path) + if cached_data: + data = cached_data.data + stat = cached_data.stat + + # If data is empty we either didn't use the cache or the cache didn't + # have the node (yet). Note that even if we use caching we need to + # do a real query if the cached data is empty because the node data + # might not be in the cache yet when it's listed by the get_children + # call. + if not data: + try: + data, stat = self.client.get(path) + except kze.NoNodeError: + return None + if not data: return None @@ -1759,7 +1785,7 @@ class ZooKeeper(object): except kze.NoNodeError: pass - def getReadyNodesOfTypes(self, labels): + def getReadyNodesOfTypes(self, labels, cached=True): ''' Query ZooKeeper for unused/ready nodes. @@ -1771,7 +1797,7 @@ class ZooKeeper(object): those labels. ''' ret = {} - for node in self.nodeIterator(): + for node in self.nodeIterator(cached=cached): if node.state != READY or node.allocated_to: continue for label in labels: @@ -1848,12 +1874,14 @@ class ZooKeeper(object): return False - def nodeIterator(self): + def nodeIterator(self, cached=True): ''' Utility generator method for iterating through all nodes. + + :param bool cached: True if the data should be taken from the cache. ''' for node_id in self.getNodes(): - node = self.getNode(node_id) + node = self.getNode(node_id, cached=cached) if node: yield node