From 7bbdfdc9fd1f4e816bbb66a96c1c758a3cf92d45 Mon Sep 17 00:00:00 2001 From: "James E. Blair" Date: Fri, 17 Jun 2022 12:50:37 -0700 Subject: [PATCH] Update ZooKeeper class connection methods This updates the ZooKeeper class to inherit from ZooKeeperBase and utilize its connection methods. It also moves the connection loss detection used by the builder to be more localized and removes unused methods. Change-Id: I6c9dbe17976560bc024f74cd31bdb6305d51168d --- nodepool/builder.py | 9 +- nodepool/launcher.py | 2 +- nodepool/tests/__init__.py | 8 +- nodepool/tests/unit/test_commands.py | 2 +- nodepool/tests/unit/test_launcher.py | 10 +- nodepool/tests/unit/test_zk.py | 108 +++++++------ nodepool/zk/zookeeper.py | 219 ++++++++++++--------------- tools/print-zk.py | 4 +- 8 files changed, 178 insertions(+), 184 deletions(-) diff --git a/nodepool/builder.py b/nodepool/builder.py index de8f2077e..9b4505c50 100644 --- a/nodepool/builder.py +++ b/nodepool/builder.py @@ -597,6 +597,11 @@ class BuildWorker(BaseWorker): interval, zk) self.log = logging.getLogger("nodepool.builder.BuildWorker.%s" % name) self.name = 'BuildWorker.%s' % name + self._lost_zk_connection = False + zk.client.on_connection_lost_listeners.append(self._onConnectionLost) + + def _onConnectionLost(self): + self._lost_zk_connection = True def _getBuildLogRoot(self, name): log_dir = self._config.build_log_dir @@ -697,6 +702,7 @@ class BuildWorker(BaseWorker): :returns: The updated ImageBuild data structure. ''' + self._lost_zk_connection = False data = zk.ImageBuild() data.state = zk.BUILDING data.builder_id = self._builder_id @@ -951,9 +957,8 @@ class BuildWorker(BaseWorker): if self._statsd: pipeline = self._statsd.pipeline() - if self._zk.didLoseConnection: + if self._lost_zk_connection: self.log.info("ZooKeeper lost while building %s" % diskimage.name) - self._zk.resetLostFlag() build_data.state = zk.FAILED elif p.returncode or did_timeout: self.log.info( diff --git a/nodepool/launcher.py b/nodepool/launcher.py index ba8a9ec97..ed73d3e86 100644 --- a/nodepool/launcher.py +++ b/nodepool/launcher.py @@ -381,7 +381,7 @@ class PoolWorker(threading.Thread, stats.StatsReporter): # Make sure we're always registered with ZK hostname = socket.gethostname() self.component_info = PoolComponent( - self.zk.zk_client, hostname, + self.zk.client, hostname, version=get_version_string()) labels = set() for prov_cfg in self.nodepool.config.providers.values(): diff --git a/nodepool/tests/__init__.py b/nodepool/tests/__init__.py index 720bdf1a5..fed7001e4 100644 --- a/nodepool/tests/__init__.py +++ b/nodepool/tests/__init__.py @@ -689,21 +689,21 @@ class DBTestCase(BaseTestCase): return a + b return a + '/' + b - data, stat = self.zk.client.get(node) + data, stat = self.zk.kazoo_client.get(node) self.log.debug("Node: %s" % (node,)) if data: self.log.debug(data) - for child in self.zk.client.get_children(node): + for child in self.zk.kazoo_client.get_children(node): self.printZKTree(join(node, child)) def getZKTree(self, path, ret=None): """Return the contents of a ZK tree as a dictionary""" if ret is None: ret = {} - for key in self.zk.client.get_children(path): + for key in self.zk.kazoo_client.get_children(path): subpath = os.path.join(path, key) - ret[subpath] = self.zk.client.get( + ret[subpath] = self.zk.kazoo_client.get( os.path.join(path, key))[0] self.getZKTree(subpath, ret) return ret diff --git a/nodepool/tests/unit/test_commands.py b/nodepool/tests/unit/test_commands.py index bcb16771b..ee699f398 100644 --- a/nodepool/tests/unit/test_commands.py +++ b/nodepool/tests/unit/test_commands.py @@ -448,7 +448,7 @@ class TestNodepoolCMD(tests.DBTestCase): "-c", configfile, 'export-image-data', tf.name) nodepoolcmd.main() # Delete data from ZK - self.zk.client.delete('/nodepool', recursive=True) + self.zk.kazoo_client.delete('/nodepool', recursive=True) self.patch_argv( "-c", configfile, 'import-image-data', tf.name) diff --git a/nodepool/tests/unit/test_launcher.py b/nodepool/tests/unit/test_launcher.py index 1f1d5f665..39888194b 100644 --- a/nodepool/tests/unit/test_launcher.py +++ b/nodepool/tests/unit/test_launcher.py @@ -91,7 +91,7 @@ class TestLauncher(tests.DBTestCase): # Verify the cleanup thread removed the lock self.assertIsNotNone( - self.zk.client.exists(self.zk._requestLockPath(req.id)) + self.zk.kazoo_client.exists(self.zk._requestLockPath(req.id)) ) self.zk.deleteNodeRequest(req) self.waitForNodeRequestLockDeletion(req.id) @@ -766,7 +766,7 @@ class TestLauncher(tests.DBTestCase): # than what we are going to request. hostname = socket.gethostname() dummy_component = PoolComponent( - self.zk.zk_client, hostname, + self.zk.client, hostname, version=get_version_string()) dummy_component.content.update({ 'id': 'dummy', @@ -2426,14 +2426,14 @@ class TestLauncher(tests.DBTestCase): # Create empty node path = "%s" % self.zk._nodePath("12345") self.log.debug("node path %s", path) - self.zk.client.create(path, makepath=True) - self.assertTrue(self.zk.client.exists(path)) + self.zk.kazoo_client.create(path, makepath=True) + self.assertTrue(self.zk.kazoo_client.exists(path)) pool = self.useNodepool(configfile, watermark_sleep=1) pool.cleanup_interval = .1 pool.start() - while self.zk.client.exists(path): + while self.zk.kazoo_client.exists(path): time.sleep(.1) def test_leaked_port_cleanup(self): diff --git a/nodepool/tests/unit/test_zk.py b/nodepool/tests/unit/test_zk.py index 07e43877a..e232a2802 100644 --- a/nodepool/tests/unit/test_zk.py +++ b/nodepool/tests/unit/test_zk.py @@ -30,7 +30,7 @@ class TestComponentRegistry(tests.DBTestCase): def test_pool_component(self): hostname = socket.gethostname() launcher = PoolComponent( - self.zk.zk_client, hostname, + self.zk.client, hostname, version=get_version_string()) launcher.content.update({ 'id': "launcher-Poolworker.provider-main-" + uuid.uuid4().hex, @@ -92,7 +92,7 @@ class TestZooKeeper(tests.DBTestCase): def test_imageBuildLock(self): path = self.zk._imageBuildLockPath("ubuntu-trusty") with self.zk.imageBuildLock("ubuntu-trusty", blocking=False): - self.assertIsNotNone(self.zk.client.exists(path)) + self.assertIsNotNone(self.zk.kazoo_client.exists(path)) def test_imageBuildLock_exception_nonblocking(self): image = "ubuntu-trusty" @@ -115,7 +115,7 @@ class TestZooKeeper(tests.DBTestCase): with self.zk.imageBuildNumberLock( "ubuntu-trusty", "0000", blocking=False ): - self.assertIsNotNone(self.zk.client.exists(path)) + self.assertIsNotNone(self.zk.kazoo_client.exists(path)) def test_imageBuildNumberLock_exception_nonblocking(self): image = "ubuntu-trusty" @@ -152,12 +152,12 @@ class TestZooKeeper(tests.DBTestCase): upload.provider_name, upload.id) with self.zk.imageUploadNumberLock(upload, blocking=False): - self.assertIsNotNone(self.zk.client.exists(path)) + self.assertIsNotNone(self.zk.kazoo_client.exists(path)) self.zk.deleteUpload(upload.image_name, upload.build_id, upload.provider_name, upload.id) - self.assertIsNone(self.zk.client.exists(path)) + self.assertIsNone(self.zk.kazoo_client.exists(path)) def test_imageUploadNumberLock_orphan(self): upload = zk.ImageUpload() @@ -171,7 +171,7 @@ class TestZooKeeper(tests.DBTestCase): upload.id) with self.zk.imageUploadNumberLock(upload, blocking=False): - self.assertIsNotNone(self.zk.client.exists(path)) + self.assertIsNotNone(self.zk.kazoo_client.exists(path)) self.zk.deleteUpload(upload.image_name, upload.build_id, upload.provider_name, @@ -183,7 +183,7 @@ class TestZooKeeper(tests.DBTestCase): # We now recreated an empty image upload number node pass - self.assertIsNotNone(self.zk.client.exists(path)) + self.assertIsNotNone(self.zk.kazoo_client.exists(path)) # Should not throw an exception because of the empty upload self.zk.getImageUpload(upload.image_name, upload.build_id, @@ -200,7 +200,7 @@ class TestZooKeeper(tests.DBTestCase): upload.provider_name, upload.id) with self.zk.imageUploadNumberLock(upload, blocking=False): - self.assertIsNotNone(self.zk.client.exists(path)) + self.assertIsNotNone(self.zk.kazoo_client.exists(path)) def test_imageUploadNumberLock_exception_nonblocking(self): upload = zk.ImageUpload() @@ -232,7 +232,7 @@ class TestZooKeeper(tests.DBTestCase): path = self.zk._imageUploadLockPath("ubuntu-trusty", "0000", "prov1") with self.zk.imageUploadLock("ubuntu-trusty", "0000", "prov1", blocking=False): - self.assertIsNotNone(self.zk.client.exists(path)) + self.assertIsNotNone(self.zk.kazoo_client.exists(path)) def test_imageUploadLock_exception_nonblocking(self): image = "ubuntu-trusty" @@ -372,7 +372,7 @@ class TestZooKeeper(tests.DBTestCase): # We now created an empty build number node pass - self.assertIsNotNone(self.zk.client.exists(path)) + self.assertIsNotNone(self.zk.kazoo_client.exists(path)) # Should not throw an exception because of the empty upload self.assertIsNone(self.zk.getBuild(image, build_number)) @@ -501,11 +501,15 @@ class TestZooKeeper(tests.DBTestCase): v3.state = zk.FAILED v4 = zk.ImageBuild() v4.state = zk.DELETING - self.zk.client.create(path + "/1", value=v1.serialize(), makepath=True) - self.zk.client.create(path + "/2", value=v2.serialize(), makepath=True) - self.zk.client.create(path + "/3", value=v3.serialize(), makepath=True) - self.zk.client.create(path + "/4", value=v4.serialize(), makepath=True) - self.zk.client.create(path + "/lock", makepath=True) + self.zk.kazoo_client.create(path + "/1", value=v1.serialize(), + makepath=True) + self.zk.kazoo_client.create(path + "/2", value=v2.serialize(), + makepath=True) + self.zk.kazoo_client.create(path + "/3", value=v3.serialize(), + makepath=True) + self.zk.kazoo_client.create(path + "/4", value=v4.serialize(), + makepath=True) + self.zk.kazoo_client.create(path + "/lock", makepath=True) matches = self.zk.getBuilds(image, None) self.assertEqual(4, len(matches)) @@ -521,11 +525,15 @@ class TestZooKeeper(tests.DBTestCase): v3.state = zk.FAILED v4 = zk.ImageBuild() v4.state = zk.DELETING - self.zk.client.create(path + "/1", value=v1.serialize(), makepath=True) - self.zk.client.create(path + "/2", value=v2.serialize(), makepath=True) - self.zk.client.create(path + "/3", value=v3.serialize(), makepath=True) - self.zk.client.create(path + "/4", value=v4.serialize(), makepath=True) - self.zk.client.create(path + "/lock", makepath=True) + self.zk.kazoo_client.create(path + "/1", value=v1.serialize(), + makepath=True) + self.zk.kazoo_client.create(path + "/2", value=v2.serialize(), + makepath=True) + self.zk.kazoo_client.create(path + "/3", value=v3.serialize(), + makepath=True) + self.zk.kazoo_client.create(path + "/4", value=v4.serialize(), + makepath=True) + self.zk.kazoo_client.create(path + "/lock", makepath=True) matches = self.zk.getBuilds(image, [zk.DELETING, zk.FAILED]) self.assertEqual(2, len(matches)) @@ -540,11 +548,15 @@ class TestZooKeeper(tests.DBTestCase): v3.state = zk.FAILED v4 = zk.ImageUpload() v4.state = zk.DELETING - self.zk.client.create(path + "/1", value=v1.serialize(), makepath=True) - self.zk.client.create(path + "/2", value=v2.serialize(), makepath=True) - self.zk.client.create(path + "/3", value=v3.serialize(), makepath=True) - self.zk.client.create(path + "/4", value=v4.serialize(), makepath=True) - self.zk.client.create(path + "/lock", makepath=True) + self.zk.kazoo_client.create(path + "/1", value=v1.serialize(), + makepath=True) + self.zk.kazoo_client.create(path + "/2", value=v2.serialize(), + makepath=True) + self.zk.kazoo_client.create(path + "/3", value=v3.serialize(), + makepath=True) + self.zk.kazoo_client.create(path + "/4", value=v4.serialize(), + makepath=True) + self.zk.kazoo_client.create(path + "/lock", makepath=True) matches = self.zk.getUploads("trusty", "000", "rax", [zk.DELETING, zk.FAILED]) @@ -560,11 +572,15 @@ class TestZooKeeper(tests.DBTestCase): v3.state = zk.FAILED v4 = zk.ImageUpload() v4.state = zk.DELETING - self.zk.client.create(path + "/1", value=v1.serialize(), makepath=True) - self.zk.client.create(path + "/2", value=v2.serialize(), makepath=True) - self.zk.client.create(path + "/3", value=v3.serialize(), makepath=True) - self.zk.client.create(path + "/4", value=v4.serialize(), makepath=True) - self.zk.client.create(path + "/lock", makepath=True) + self.zk.kazoo_client.create(path + "/1", value=v1.serialize(), + makepath=True) + self.zk.kazoo_client.create(path + "/2", value=v2.serialize(), + makepath=True) + self.zk.kazoo_client.create(path + "/3", value=v3.serialize(), + makepath=True) + self.zk.kazoo_client.create(path + "/4", value=v4.serialize(), + makepath=True) + self.zk.kazoo_client.create(path + "/lock", makepath=True) matches = self.zk.getUploads("trusty", "000", "rax", None) self.assertEqual(4, len(matches)) @@ -592,9 +608,9 @@ class TestZooKeeper(tests.DBTestCase): def test_deleteUpload(self): path = self.zk._imageUploadPath("trusty", "000", "rax") + "/000001" - self.zk.client.create(path, makepath=True) + self.zk.kazoo_client.create(path, makepath=True) self.zk.deleteUpload("trusty", "000", "rax", "000001") - self.assertIsNone(self.zk.client.exists(path)) + self.assertIsNone(self.zk.kazoo_client.exists(path)) def test_getNodeRequests_empty(self): self.assertEqual([], self.zk.getNodeRequests()) @@ -604,10 +620,10 @@ class TestZooKeeper(tests.DBTestCase): r2 = self.zk._requestPath("100-456") r3 = self.zk._requestPath("100-123") r4 = self.zk._requestPath("400-123") - self.zk.client.create(r1, makepath=True, ephemeral=True) - self.zk.client.create(r2, makepath=True, ephemeral=True) - self.zk.client.create(r3, makepath=True, ephemeral=True) - self.zk.client.create(r4, makepath=True, ephemeral=True) + self.zk.kazoo_client.create(r1, makepath=True, ephemeral=True) + self.zk.kazoo_client.create(r2, makepath=True, ephemeral=True) + self.zk.kazoo_client.create(r3, makepath=True, ephemeral=True) + self.zk.kazoo_client.create(r4, makepath=True, ephemeral=True) self.assertEqual( ["100-123", "100-456", "400-123", "500-123"], @@ -618,8 +634,8 @@ class TestZooKeeper(tests.DBTestCase): r = zk.NodeRequest("500-123") r.state = zk.REQUESTED path = self.zk._requestPath(r.id) - self.zk.client.create(path, value=r.serialize(), - makepath=True, ephemeral=True) + self.zk.kazoo_client.create(path, value=r.serialize(), + makepath=True, ephemeral=True) o = self.zk.getNodeRequest(r.id) self.assertIsInstance(o, zk.NodeRequest) self.assertEqual(r.id, o.id) @@ -628,8 +644,8 @@ class TestZooKeeper(tests.DBTestCase): self.assertIsNone(self.zk.getNodeRequest("invalid")) def test_getNodes(self): - self.zk.client.create(self.zk._nodePath('100'), makepath=True) - self.zk.client.create(self.zk._nodePath('200'), makepath=True) + self.zk.kazoo_client.create(self.zk._nodePath('100'), makepath=True) + self.zk.kazoo_client.create(self.zk._nodePath('200'), makepath=True) nodes = self.zk.getNodes() self.assertIn('100', nodes) self.assertIn('200', nodes) @@ -638,7 +654,7 @@ class TestZooKeeper(tests.DBTestCase): n = zk.Node('100') n.state = zk.BUILDING path = self.zk._nodePath(n.id) - self.zk.client.create(path, value=n.serialize(), makepath=True) + self.zk.kazoo_client.create(path, value=n.serialize(), makepath=True) o = self.zk.getNode(n.id) self.assertIsInstance(o, zk.Node) self.assertEqual(n.id, o.id) @@ -659,7 +675,7 @@ class TestZooKeeper(tests.DBTestCase): self.zk.lockNode(node) self.assertIsNotNone(node.lock) self.assertIsNotNone( - self.zk.client.exists(self.zk._nodeLockPath(node.id)) + self.zk.kazoo_client.exists(self.zk._nodeLockPath(node.id)) ) self.zk.unlockNode(node) self.assertIsNone(node.lock) @@ -678,7 +694,7 @@ class TestZooKeeper(tests.DBTestCase): self.zk.storeNode(node) self.assertIsNotNone(node.id) self.assertIsNotNone( - self.zk.client.exists(self.zk._nodePath(node.id)) + self.zk.kazoo_client.exists(self.zk._nodePath(node.id)) ) return node @@ -700,7 +716,7 @@ class TestZooKeeper(tests.DBTestCase): req.node_types.append('label1') self.zk.storeNodeRequest(req) self.assertIsNotNone( - self.zk.client.exists(self.zk._requestPath(req.id)) + self.zk.kazoo_client.exists(self.zk._requestPath(req.id)) ) return req @@ -722,14 +738,14 @@ class TestZooKeeper(tests.DBTestCase): req = self._create_node_request() self.zk.deleteNodeRequest(req) self.assertIsNone( - self.zk.client.exists(self.zk._requestPath(req.id)) + self.zk.kazoo_client.exists(self.zk._requestPath(req.id)) ) def test_deleteNode(self): n1 = self._create_node() self.zk.deleteNode(n1) self.assertIsNone( - self.zk.client.exists(self.zk._nodePath(n1.id)) + self.zk.kazoo_client.exists(self.zk._nodePath(n1.id)) ) def test_getReadyNodesOfTypes(self): diff --git a/nodepool/zk/zookeeper.py b/nodepool/zk/zookeeper.py index 38d8cf08c..4eaf8ca44 100644 --- a/nodepool/zk/zookeeper.py +++ b/nodepool/zk/zookeeper.py @@ -18,7 +18,6 @@ import logging import time import uuid -from kazoo.client import KazooState from kazoo import exceptions as kze from kazoo.recipe.lock import Lock from kazoo.recipe.cache import TreeCache, TreeEvent @@ -27,6 +26,7 @@ from kazoo.recipe.election import Election from nodepool import exceptions as npe from nodepool.logconfig import get_annotated_logger from nodepool.zk.components import COMPONENT_REGISTRY +from nodepool.zk import ZooKeeperBase # States: # We are building this image (or node) but it is not ready for use. @@ -676,7 +676,7 @@ class Node(BaseModel): self.requestor = d.get('requestor') -class ZooKeeper(object): +class ZooKeeper(ZooKeeperBase): ''' Class implementing the ZooKeeper interface. @@ -704,13 +704,11 @@ class ZooKeeper(object): # Log zookeeper retry every 10 seconds retry_log_rate = 10 - def __init__(self, zk_client, enable_cache=True): + def __init__(self, client, enable_cache=True): ''' Initialize the ZooKeeper object. ''' - self.zk_client = zk_client # nodepool.zk.ZooKeeperClient - self.client = zk_client.client # KazooClient - self._became_lost = False + super().__init__(client) self._last_retry_log = 0 self._node_cache = None self._request_cache = None @@ -719,22 +717,35 @@ class ZooKeeper(object): self.enable_cache = enable_cache self.node_stats_event = None - if self.enable_cache: - self._node_cache = TreeCache(self.client, self.NODE_ROOT) - self._node_cache.listen_fault(self.cacheFaultListener) - self._node_cache.listen(self.nodeCacheListener) - self._node_cache.start() + if self.client.connected: + self._onConnect() - self._request_cache = TreeCache(self.client, self.REQUEST_ROOT) - self._request_cache.listen_fault(self.cacheFaultListener) - self._request_cache.listen(self.requestCacheListener) - self._request_cache.start() - - COMPONENT_REGISTRY.create(self.zk_client) + COMPONENT_REGISTRY.create(self.client) # ======================================================================= # Private Methods # ======================================================================= + def _onConnect(self): + if self.enable_cache: + self._node_cache = TreeCache(self.kazoo_client, self.NODE_ROOT) + self._node_cache.listen_fault(self.cacheFaultListener) + self._node_cache.listen(self.nodeCacheListener) + self._node_cache.start() + + self._request_cache = TreeCache(self.kazoo_client, + self.REQUEST_ROOT) + self._request_cache.listen_fault(self.cacheFaultListener) + self._request_cache.listen(self.requestCacheListener) + self._request_cache.start() + + def _onDisconnect(self): + if self._node_cache is not None: + self._node_cache.close() + self._node_cache = None + + if self._request_cache is not None: + self._request_cache.close() + self._request_cache = None def _electionPath(self, election): return "%s/%s" % (self.ELECTION_ROOT, election) @@ -800,7 +811,7 @@ class ZooKeeper(object): def _getImageBuildLock(self, image, blocking=True, timeout=None): lock_path = self._imageBuildLockPath(image) try: - lock = Lock(self.client, lock_path) + lock = Lock(self.kazoo_client, lock_path) have_lock = lock.acquire(blocking, timeout) except kze.LockTimeout: raise npe.TimeoutException( @@ -820,7 +831,7 @@ class ZooKeeper(object): blocking=True, timeout=None): lock_path = self._imageBuildNumberLockPath(image, build_number) try: - lock = Lock(self.client, lock_path) + lock = Lock(self.kazoo_client, lock_path) have_lock = lock.acquire(blocking, timeout) except kze.LockTimeout: raise npe.TimeoutException( @@ -842,7 +853,7 @@ class ZooKeeper(object): lock_path = self._imageUploadNumberLockPath(image, build_number, provider, upload_number) try: - lock = Lock(self.client, lock_path) + lock = Lock(self.kazoo_client, lock_path) have_lock = lock.acquire(blocking, timeout) except kze.LockTimeout: raise npe.TimeoutException( @@ -864,7 +875,7 @@ class ZooKeeper(object): blocking=True, timeout=None): lock_path = self._imageUploadLockPath(image, build_number, provider) try: - lock = Lock(self.client, lock_path) + lock = Lock(self.kazoo_client, lock_path) have_lock = lock.acquire(blocking, timeout) except kze.LockTimeout: raise npe.TimeoutException( @@ -881,20 +892,6 @@ class ZooKeeper(object): return lock - def _connection_listener(self, state): - ''' - Listener method for Kazoo connection state changes. - - .. warning:: This method must not block. - ''' - if state == KazooState.LOST: - self.log.debug("ZooKeeper connection: LOST") - self._became_lost = True - elif state == KazooState.SUSPENDED: - self.log.debug("ZooKeeper connection: SUSPENDED") - else: - self.log.debug("ZooKeeper connection: CONNECTED") - def logConnectionRetryEvent(self): ''' Kazoo retry callback @@ -910,28 +907,15 @@ class ZooKeeper(object): @property def connected(self): - if self.client is None: - return False - return self.client.state == KazooState.CONNECTED + return self.client.connected @property def suspended(self): - if self.client is None: - return True - return self.client.state == KazooState.SUSPENDED + return self.client.suspended @property def lost(self): - if self.client is None: - return True - return self.client.state == KazooState.LOST - - @property - def didLoseConnection(self): - return self._became_lost - - def resetLostFlag(self): - self._became_lost = False + return self.client.lost def disconnect(self): ''' @@ -941,18 +925,7 @@ class ZooKeeper(object): cluster connection. ''' - if self._node_cache is not None: - self._node_cache.close() - self._node_cache = None - - if self._request_cache is not None: - self._request_cache.close() - self._request_cache = None - - if self.client is not None and self.client.connected: - self.client.stop() - self.client.close() - self.client = None + self.client.disconnect() def resetHosts(self, hosts): ''' @@ -960,8 +933,7 @@ class ZooKeeper(object): :param str host_list: A ZK host list ''' - if self.client is not None: - self.client.set_hosts(hosts=hosts) + self.client.resetHosts(hosts) @contextmanager def imageBuildLock(self, image, blocking=True, timeout=None): @@ -1086,7 +1058,7 @@ class ZooKeeper(object): path = self.IMAGE_ROOT try: - images = self.client.get_children(path) + images = self.kazoo_client.get_children(path) except kze.NoNodeError: return [] return sorted(images) @@ -1100,7 +1072,7 @@ class ZooKeeper(object): path = self._imagePausePath(image) try: - data, stat = self.client.get(path) + data, stat = self.kazoo_client.get(path) except kze.NoNodeError: return False return True @@ -1113,12 +1085,12 @@ class ZooKeeper(object): if paused: try: - self.client.create(path) + self.kazoo_client.create(path) except kze.NodeExistsError: pass else: try: - self.client.delete(path) + self.kazoo_client.delete(path) except kze.NoNodeError: pass @@ -1133,7 +1105,7 @@ class ZooKeeper(object): path = self._imageBuildsPath(image) try: - builds = self.client.get_children(path) + builds = self.kazoo_client.get_children(path) except kze.NoNodeError: return [] builds = [x for x in builds if x != 'lock'] @@ -1152,7 +1124,7 @@ class ZooKeeper(object): path = self._imageProviderPath(image, build_number) try: - providers = self.client.get_children(path) + providers = self.kazoo_client.get_children(path) except kze.NoNodeError: return [] @@ -1172,7 +1144,7 @@ class ZooKeeper(object): path = self._imageUploadPath(image, build_number, provider) try: - uploads = self.client.get_children(path) + uploads = self.kazoo_client.get_children(path) except kze.NoNodeError: return [] @@ -1191,7 +1163,7 @@ class ZooKeeper(object): path = self._imageBuildsPath(image) + "/%s" % build_number try: - data, stat = self.client.get(path) + data, stat = self.kazoo_client.get(path) except kze.NoNodeError: return None @@ -1218,7 +1190,7 @@ class ZooKeeper(object): path = self._imageBuildsPath(image) try: - builds = self.client.get_children(path) + builds = self.kazoo_client.get_children(path) except kze.NoNodeError: return [] @@ -1285,7 +1257,7 @@ class ZooKeeper(object): build_path = self._imageBuildsPath(image) + "/" if build_number is None: - path = self.client.create( + path = self.kazoo_client.create( build_path, value=build_data.serialize(), sequence=True, @@ -1293,7 +1265,7 @@ class ZooKeeper(object): build_number = path.split("/")[-1] else: path = build_path + build_number - self.client.set(path, build_data.serialize()) + self.kazoo_client.set(path, build_data.serialize()) return build_number @@ -1314,7 +1286,7 @@ class ZooKeeper(object): path = path + "/%s" % upload_number try: - data, stat = self.client.get(path) + data, stat = self.kazoo_client.get(path) except kze.NoNodeError: return None @@ -1347,7 +1319,7 @@ class ZooKeeper(object): path = self._imageUploadPath(image, build_number, provider) try: - uploads = self.client.get_children(path) + uploads = self.kazoo_client.get_children(path) except kze.NoNodeError: return [] @@ -1411,7 +1383,7 @@ class ZooKeeper(object): path = self._imageUploadPath(image, build_number, provider) try: - uploads = self.client.get_children(path) + uploads = self.kazoo_client.get_children(path) except kze.NoNodeError: uploads = [] @@ -1454,7 +1426,7 @@ class ZooKeeper(object): ''' # We expect the image builds path to already exist. build_path = self._imageBuildsPath(image) - if not self.client.exists(build_path): + if not self.kazoo_client.exists(build_path): raise npe.ZKException( "Cannot find build %s of image %s" % (build_number, image) ) @@ -1466,7 +1438,7 @@ class ZooKeeper(object): image, build_number, provider) + "/" if upload_number is None: - path = self.client.create( + path = self.kazoo_client.create( upload_path, value=image_data.serialize(), sequence=True, @@ -1474,7 +1446,7 @@ class ZooKeeper(object): upload_number = path.split("/")[-1] else: path = upload_path + upload_number - self.client.set(path, image_data.serialize()) + self.kazoo_client.set(path, image_data.serialize()) return upload_number @@ -1487,7 +1459,7 @@ class ZooKeeper(object): :returns: True if request is pending, False otherwise ''' path = self._imageBuildRequestPath(image) - if self.client.exists(path) is not None: + if self.kazoo_client.exists(path) is not None: return True return False @@ -1498,7 +1470,7 @@ class ZooKeeper(object): latest_build, *_ = builds builds_path = self._imageBuildNumberPath(image, latest_build) - return self.client.exists(builds_path) + return self.kazoo_client.exists(builds_path) def getBuildRequest(self, image): """Get a build request for the given image. @@ -1509,13 +1481,13 @@ class ZooKeeper(object): """ path = self._imageBuildRequestPath(image) try: - _, stat = self.client.get(path) + _, stat = self.kazoo_client.get(path) except kze.NoNodeError: return pending = True lock_path = self._imageBuildLockPath(image) - lock_stat = self.client.exists(lock_path) + lock_stat = self.kazoo_client.exists(lock_path) if lock_stat and lock_stat.children_count: build_stat = self._latestImageBuildStat(image) # If there is a lock, but no build we assume that the build @@ -1534,7 +1506,7 @@ class ZooKeeper(object): :param str image: The image name. ''' path = self._imageBuildRequestPath(image) - self.client.ensure_path(path) + self.kazoo_client.ensure_path(path) def removeBuildRequest(self, image): ''' @@ -1544,7 +1516,7 @@ class ZooKeeper(object): ''' path = self._imageBuildRequestPath(image) try: - self.client.delete(path) + self.kazoo_client.delete(path) except kze.NoNodeError: pass @@ -1571,7 +1543,7 @@ class ZooKeeper(object): try: # NOTE: Need to do recursively to remove lock znodes - self.client.delete(path, recursive=True) + self.kazoo_client.delete(path, recursive=True) except kze.NoNodeError: pass @@ -1590,7 +1562,7 @@ class ZooKeeper(object): path = path + "/%s" % upload_number try: # NOTE: Need to do recursively to remove lock znodes - self.client.delete(path, recursive=True) + self.kazoo_client.delete(path, recursive=True) except kze.NoNodeError: pass @@ -1610,7 +1582,7 @@ class ZooKeeper(object): :returns: A list of request nodes. ''' try: - requests = self.client.get_children(self.REQUEST_ROOT) + requests = self.kazoo_client.get_children(self.REQUEST_ROOT) except kze.NoNodeError: return [] @@ -1621,7 +1593,7 @@ class ZooKeeper(object): Get the current list of all node request lock ids. ''' try: - lock_ids = self.client.get_children(self.REQUEST_LOCK_ROOT) + lock_ids = self.kazoo_client.get_children(self.REQUEST_LOCK_ROOT) except kze.NoNodeError: return [] return lock_ids @@ -1640,7 +1612,7 @@ class ZooKeeper(object): ''' path = self._requestLockPath(lock_id) try: - data, stat = self.client.get(path) + data, stat = self.kazoo_client.get(path) except kze.NoNodeError: return None d = NodeRequestLockStats(lock_id) @@ -1655,7 +1627,7 @@ class ZooKeeper(object): ''' path = self._requestLockPath(lock_id) try: - self.client.delete(path, recursive=True) + self.kazoo_client.delete(path, recursive=True) except kze.NoNodeError: pass @@ -1680,7 +1652,7 @@ class ZooKeeper(object): # call. try: path = self._requestPath(request) - data, stat = self.client.get(path) + data, stat = self.kazoo_client.get(path) except kze.NoNodeError: return None @@ -1696,7 +1668,7 @@ class ZooKeeper(object): ''' path = self._requestPath(request.id) - data, stat = self.client.get(path) + data, stat = self.kazoo_client.get(path) if data: d = self._bytesToDict(data) @@ -1717,7 +1689,7 @@ class ZooKeeper(object): if not request.event_id: request.event_id = uuid.uuid4().hex path = "%s/%s-" % (self.REQUEST_ROOT, priority) - path = self.client.create( + path = self.kazoo_client.create( path, value=request.serialize(), ephemeral=True, @@ -1732,7 +1704,7 @@ class ZooKeeper(object): "Attempt to update non-existing request %s" % request) path = self._requestPath(request.id) - self.client.set(path, request.serialize()) + self.kazoo_client.set(path, request.serialize()) def deleteNodeRequest(self, request): ''' @@ -1745,7 +1717,7 @@ class ZooKeeper(object): path = self._requestPath(request.id) try: - self.client.delete(path) + self.kazoo_client.delete(path) except kze.NoNodeError: pass @@ -1772,7 +1744,7 @@ class ZooKeeper(object): node_request_id=request.id) path = self._requestLockPath(request.id) try: - lock = Lock(self.client, path) + lock = Lock(self.kazoo_client, path) have_lock = lock.acquire(blocking, timeout) except kze.LockTimeout: raise npe.TimeoutException( @@ -1833,7 +1805,7 @@ class ZooKeeper(object): ''' path = self._nodeLockPath(node.id) try: - lock = Lock(self.client, path, identifier) + lock = Lock(self.kazoo_client, path, identifier) have_lock = lock.acquire(blocking, timeout, ephemeral) except kze.LockTimeout: raise npe.TimeoutException( @@ -1875,7 +1847,7 @@ class ZooKeeper(object): ''' path = self._nodeLockPath(node.id) try: - self.client.delete(path, recursive=True) + self.kazoo_client.delete(path, recursive=True) except kze.NoNodeError: pass @@ -1884,7 +1856,7 @@ class ZooKeeper(object): Return the contenders for a node lock. ''' path = self._nodeLockPath(node.id) - lock = Lock(self.client, path) + lock = Lock(self.kazoo_client, path) return lock.contenders() def getNodes(self): @@ -1894,7 +1866,7 @@ class ZooKeeper(object): :returns: A list of nodes. ''' try: - return self.client.get_children(self.NODE_ROOT) + return self.kazoo_client.get_children(self.NODE_ROOT) except kze.NoNodeError: return [] @@ -1919,7 +1891,7 @@ class ZooKeeper(object): # call. try: path = self._nodePath(node) - data, stat = self.client.get(path) + data, stat = self.kazoo_client.get(path) except kze.NoNodeError: return None @@ -1939,7 +1911,7 @@ class ZooKeeper(object): ''' path = self._nodePath(node.id) - data, stat = self.client.get(path) + data, stat = self.kazoo_client.get(path) if data: d = self._bytesToDict(data) @@ -1971,7 +1943,7 @@ class ZooKeeper(object): else: node.created_time = time.time() - path = self.client.create( + path = self.kazoo_client.create( node_path, value=node.serialize(), sequence=True, @@ -1979,7 +1951,7 @@ class ZooKeeper(object): node.id = path.split("/")[-1] else: path = self._nodePath(node.id) - self.client.set(path, node.serialize()) + self.kazoo_client.set(path, node.serialize()) def watchNode(self, node, callback): '''Watch an existing node for changes. @@ -2000,7 +1972,7 @@ class ZooKeeper(object): return callback(node, deleted) path = self._nodePath(node.id) - self.client.DataWatch(path, _callback_wrapper) + self.kazoo_client.DataWatch(path, _callback_wrapper) def deleteRawNode(self, node_id): ''' @@ -2012,7 +1984,7 @@ class ZooKeeper(object): ''' path = self._nodePath(node_id) try: - self.client.delete(path, recursive=True) + self.kazoo_client.delete(path, recursive=True) except kze.NoNodeError: pass @@ -2031,7 +2003,7 @@ class ZooKeeper(object): # anything so that we can detect a race condition where the # lock is removed before the node deletion occurs. node.state = DELETED - self.client.set(path, node.serialize()) + self.kazoo_client.set(path, node.serialize()) self.deleteRawNode(node.id) def getReadyNodesOfTypes(self, labels, cached=True): @@ -2248,7 +2220,7 @@ class ZooKeeper(object): path = self._imageProviderPath(image, build) path = "%s/%s" % (path, provider_name) try: - self.client.delete(path, recursive=True) + self.kazoo_client.delete(path, recursive=True) except kze.NoNodeError: pass @@ -2389,7 +2361,7 @@ class ZooKeeper(object): def getStatsElection(self, identifier): path = self._electionPath('stats') - return Election(self.client, path, identifier) + return Election(self.kazoo_client, path, identifier) def exportImageData(self): ''' @@ -2404,7 +2376,7 @@ class ZooKeeper(object): for build_no in self.getBuildNumbers(image_name): build_path = self._imageBuildsPath(image_name) + "/" + build_no try: - build_data, stat = self.client.get(build_path) + build_data, stat = self.kazoo_client.get(build_path) except kze.NoNodeError: continue ret[build_path] = build_data.decode('utf8') @@ -2416,7 +2388,8 @@ class ZooKeeper(object): image_name, build_no, provider_name) + "/" upload_path += upload_no try: - upload_data, stat = self.client.get(upload_path) + upload_data, stat = self.kazoo_client.get( + upload_path) except kze.NoNodeError: continue ret[upload_path] = upload_data.decode('utf8') @@ -2466,19 +2439,19 @@ class ZooKeeper(object): highest_num[key] = max(highest_num.get(key, num), num) for path, num in highest_num.items(): for x in range(num): - node = self.client.create( + node = self.kazoo_client.create( path + '/', makepath=True, sequence=True) # If this node isn't in our import data, go ahead and # delete it. if node not in import_data: - self.client.delete(node) + self.kazoo_client.delete(node) for path, data in import_data.items(): # We may have already created a node above; in that # case, just set the data on it. - if self.client.exists(path): - self.client.set(path, - value=data.encode('utf8')) + if self.kazoo_client.exists(path): + self.kazoo_client.set(path, + value=data.encode('utf8')) else: - self.client.create(path, - value=data.encode('utf8'), - makepath=True) + self.kazoo_client.create(path, + value=data.encode('utf8'), + makepath=True) diff --git a/tools/print-zk.py b/tools/print-zk.py index c7d2d2c5b..03e90168a 100755 --- a/tools/print-zk.py +++ b/tools/print-zk.py @@ -47,12 +47,12 @@ def join(a, b): return a+'/'+b def print_tree(node): - data, stat = zk.client.get(node) + data, stat = zk.kazoo_client.get(node) print("Node: %s %s" % (node, stat)) if data: print(data) - for child in zk.client.get_children(node): + for child in zk.kazoo_client.get_children(node): print() print_tree(join(node, child))