diff --git a/nodepool/builder.py b/nodepool/builder.py index e4aee0338..e712a8e43 100644 --- a/nodepool/builder.py +++ b/nodepool/builder.py @@ -599,6 +599,11 @@ class BuildWorker(BaseWorker): interval, zk) self.log = logging.getLogger("nodepool.builder.BuildWorker.%s" % name) self.name = 'BuildWorker.%s' % name + self._lost_zk_connection = False + zk.client.on_connection_lost_listeners.append(self._onConnectionLost) + + def _onConnectionLost(self): + self._lost_zk_connection = True def _getBuildLogRoot(self, name): log_dir = self._config.build_log_dir @@ -699,6 +704,7 @@ class BuildWorker(BaseWorker): :returns: The updated ImageBuild data structure. ''' + self._lost_zk_connection = False data = zk.ImageBuild() data.state = zk.BUILDING data.builder_id = self._builder_id @@ -953,9 +959,8 @@ class BuildWorker(BaseWorker): if self._statsd: pipeline = self._statsd.pipeline() - if self._zk.didLoseConnection: + if self._lost_zk_connection: self.log.info("ZooKeeper lost while building %s" % diskimage.name) - self._zk.resetLostFlag() build_data.state = zk.FAILED elif p.returncode or did_timeout: self.log.info( diff --git a/nodepool/launcher.py b/nodepool/launcher.py index ba8a9ec97..ed73d3e86 100644 --- a/nodepool/launcher.py +++ b/nodepool/launcher.py @@ -381,7 +381,7 @@ class PoolWorker(threading.Thread, stats.StatsReporter): # Make sure we're always registered with ZK hostname = socket.gethostname() self.component_info = PoolComponent( - self.zk.zk_client, hostname, + self.zk.client, hostname, version=get_version_string()) labels = set() for prov_cfg in self.nodepool.config.providers.values(): diff --git a/nodepool/tests/__init__.py b/nodepool/tests/__init__.py index 720bdf1a5..fed7001e4 100644 --- a/nodepool/tests/__init__.py +++ b/nodepool/tests/__init__.py @@ -689,21 +689,21 @@ class DBTestCase(BaseTestCase): return a + b return a + '/' + b - data, stat = self.zk.client.get(node) + data, stat = self.zk.kazoo_client.get(node) self.log.debug("Node: %s" % (node,)) if data: self.log.debug(data) - for child in self.zk.client.get_children(node): + for child in self.zk.kazoo_client.get_children(node): self.printZKTree(join(node, child)) def getZKTree(self, path, ret=None): """Return the contents of a ZK tree as a dictionary""" if ret is None: ret = {} - for key in self.zk.client.get_children(path): + for key in self.zk.kazoo_client.get_children(path): subpath = os.path.join(path, key) - ret[subpath] = self.zk.client.get( + ret[subpath] = self.zk.kazoo_client.get( os.path.join(path, key))[0] self.getZKTree(subpath, ret) return ret diff --git a/nodepool/tests/unit/test_commands.py b/nodepool/tests/unit/test_commands.py index 446ce909f..5db3d44c1 100644 --- a/nodepool/tests/unit/test_commands.py +++ b/nodepool/tests/unit/test_commands.py @@ -488,7 +488,7 @@ class TestNodepoolCMD(tests.DBTestCase): "-c", configfile, 'export-image-data', tf.name) nodepoolcmd.main() # Delete data from ZK - self.zk.client.delete('/nodepool', recursive=True) + self.zk.kazoo_client.delete('/nodepool', recursive=True) self.patch_argv( "-c", configfile, 'import-image-data', tf.name) diff --git a/nodepool/tests/unit/test_launcher.py b/nodepool/tests/unit/test_launcher.py index 1f1d5f665..39888194b 100644 --- a/nodepool/tests/unit/test_launcher.py +++ b/nodepool/tests/unit/test_launcher.py @@ -91,7 +91,7 @@ class TestLauncher(tests.DBTestCase): # Verify the cleanup thread removed the lock self.assertIsNotNone( - self.zk.client.exists(self.zk._requestLockPath(req.id)) + self.zk.kazoo_client.exists(self.zk._requestLockPath(req.id)) ) self.zk.deleteNodeRequest(req) self.waitForNodeRequestLockDeletion(req.id) @@ -766,7 +766,7 @@ class TestLauncher(tests.DBTestCase): # than what we are going to request. hostname = socket.gethostname() dummy_component = PoolComponent( - self.zk.zk_client, hostname, + self.zk.client, hostname, version=get_version_string()) dummy_component.content.update({ 'id': 'dummy', @@ -2426,14 +2426,14 @@ class TestLauncher(tests.DBTestCase): # Create empty node path = "%s" % self.zk._nodePath("12345") self.log.debug("node path %s", path) - self.zk.client.create(path, makepath=True) - self.assertTrue(self.zk.client.exists(path)) + self.zk.kazoo_client.create(path, makepath=True) + self.assertTrue(self.zk.kazoo_client.exists(path)) pool = self.useNodepool(configfile, watermark_sleep=1) pool.cleanup_interval = .1 pool.start() - while self.zk.client.exists(path): + while self.zk.kazoo_client.exists(path): time.sleep(.1) def test_leaked_port_cleanup(self): diff --git a/nodepool/tests/unit/test_zk.py b/nodepool/tests/unit/test_zk.py index 07e43877a..e232a2802 100644 --- a/nodepool/tests/unit/test_zk.py +++ b/nodepool/tests/unit/test_zk.py @@ -30,7 +30,7 @@ class TestComponentRegistry(tests.DBTestCase): def test_pool_component(self): hostname = socket.gethostname() launcher = PoolComponent( - self.zk.zk_client, hostname, + self.zk.client, hostname, version=get_version_string()) launcher.content.update({ 'id': "launcher-Poolworker.provider-main-" + uuid.uuid4().hex, @@ -92,7 +92,7 @@ class TestZooKeeper(tests.DBTestCase): def test_imageBuildLock(self): path = self.zk._imageBuildLockPath("ubuntu-trusty") with self.zk.imageBuildLock("ubuntu-trusty", blocking=False): - self.assertIsNotNone(self.zk.client.exists(path)) + self.assertIsNotNone(self.zk.kazoo_client.exists(path)) def test_imageBuildLock_exception_nonblocking(self): image = "ubuntu-trusty" @@ -115,7 +115,7 @@ class TestZooKeeper(tests.DBTestCase): with self.zk.imageBuildNumberLock( "ubuntu-trusty", "0000", blocking=False ): - self.assertIsNotNone(self.zk.client.exists(path)) + self.assertIsNotNone(self.zk.kazoo_client.exists(path)) def test_imageBuildNumberLock_exception_nonblocking(self): image = "ubuntu-trusty" @@ -152,12 +152,12 @@ class TestZooKeeper(tests.DBTestCase): upload.provider_name, upload.id) with self.zk.imageUploadNumberLock(upload, blocking=False): - self.assertIsNotNone(self.zk.client.exists(path)) + self.assertIsNotNone(self.zk.kazoo_client.exists(path)) self.zk.deleteUpload(upload.image_name, upload.build_id, upload.provider_name, upload.id) - self.assertIsNone(self.zk.client.exists(path)) + self.assertIsNone(self.zk.kazoo_client.exists(path)) def test_imageUploadNumberLock_orphan(self): upload = zk.ImageUpload() @@ -171,7 +171,7 @@ class TestZooKeeper(tests.DBTestCase): upload.id) with self.zk.imageUploadNumberLock(upload, blocking=False): - self.assertIsNotNone(self.zk.client.exists(path)) + self.assertIsNotNone(self.zk.kazoo_client.exists(path)) self.zk.deleteUpload(upload.image_name, upload.build_id, upload.provider_name, @@ -183,7 +183,7 @@ class TestZooKeeper(tests.DBTestCase): # We now recreated an empty image upload number node pass - self.assertIsNotNone(self.zk.client.exists(path)) + self.assertIsNotNone(self.zk.kazoo_client.exists(path)) # Should not throw an exception because of the empty upload self.zk.getImageUpload(upload.image_name, upload.build_id, @@ -200,7 +200,7 @@ class TestZooKeeper(tests.DBTestCase): upload.provider_name, upload.id) with self.zk.imageUploadNumberLock(upload, blocking=False): - self.assertIsNotNone(self.zk.client.exists(path)) + self.assertIsNotNone(self.zk.kazoo_client.exists(path)) def test_imageUploadNumberLock_exception_nonblocking(self): upload = zk.ImageUpload() @@ -232,7 +232,7 @@ class TestZooKeeper(tests.DBTestCase): path = self.zk._imageUploadLockPath("ubuntu-trusty", "0000", "prov1") with self.zk.imageUploadLock("ubuntu-trusty", "0000", "prov1", blocking=False): - self.assertIsNotNone(self.zk.client.exists(path)) + self.assertIsNotNone(self.zk.kazoo_client.exists(path)) def test_imageUploadLock_exception_nonblocking(self): image = "ubuntu-trusty" @@ -372,7 +372,7 @@ class TestZooKeeper(tests.DBTestCase): # We now created an empty build number node pass - self.assertIsNotNone(self.zk.client.exists(path)) + self.assertIsNotNone(self.zk.kazoo_client.exists(path)) # Should not throw an exception because of the empty upload self.assertIsNone(self.zk.getBuild(image, build_number)) @@ -501,11 +501,15 @@ class TestZooKeeper(tests.DBTestCase): v3.state = zk.FAILED v4 = zk.ImageBuild() v4.state = zk.DELETING - self.zk.client.create(path + "/1", value=v1.serialize(), makepath=True) - self.zk.client.create(path + "/2", value=v2.serialize(), makepath=True) - self.zk.client.create(path + "/3", value=v3.serialize(), makepath=True) - self.zk.client.create(path + "/4", value=v4.serialize(), makepath=True) - self.zk.client.create(path + "/lock", makepath=True) + self.zk.kazoo_client.create(path + "/1", value=v1.serialize(), + makepath=True) + self.zk.kazoo_client.create(path + "/2", value=v2.serialize(), + makepath=True) + self.zk.kazoo_client.create(path + "/3", value=v3.serialize(), + makepath=True) + self.zk.kazoo_client.create(path + "/4", value=v4.serialize(), + makepath=True) + self.zk.kazoo_client.create(path + "/lock", makepath=True) matches = self.zk.getBuilds(image, None) self.assertEqual(4, len(matches)) @@ -521,11 +525,15 @@ class TestZooKeeper(tests.DBTestCase): v3.state = zk.FAILED v4 = zk.ImageBuild() v4.state = zk.DELETING - self.zk.client.create(path + "/1", value=v1.serialize(), makepath=True) - self.zk.client.create(path + "/2", value=v2.serialize(), makepath=True) - self.zk.client.create(path + "/3", value=v3.serialize(), makepath=True) - self.zk.client.create(path + "/4", value=v4.serialize(), makepath=True) - self.zk.client.create(path + "/lock", makepath=True) + self.zk.kazoo_client.create(path + "/1", value=v1.serialize(), + makepath=True) + self.zk.kazoo_client.create(path + "/2", value=v2.serialize(), + makepath=True) + self.zk.kazoo_client.create(path + "/3", value=v3.serialize(), + makepath=True) + self.zk.kazoo_client.create(path + "/4", value=v4.serialize(), + makepath=True) + self.zk.kazoo_client.create(path + "/lock", makepath=True) matches = self.zk.getBuilds(image, [zk.DELETING, zk.FAILED]) self.assertEqual(2, len(matches)) @@ -540,11 +548,15 @@ class TestZooKeeper(tests.DBTestCase): v3.state = zk.FAILED v4 = zk.ImageUpload() v4.state = zk.DELETING - self.zk.client.create(path + "/1", value=v1.serialize(), makepath=True) - self.zk.client.create(path + "/2", value=v2.serialize(), makepath=True) - self.zk.client.create(path + "/3", value=v3.serialize(), makepath=True) - self.zk.client.create(path + "/4", value=v4.serialize(), makepath=True) - self.zk.client.create(path + "/lock", makepath=True) + self.zk.kazoo_client.create(path + "/1", value=v1.serialize(), + makepath=True) + self.zk.kazoo_client.create(path + "/2", value=v2.serialize(), + makepath=True) + self.zk.kazoo_client.create(path + "/3", value=v3.serialize(), + makepath=True) + self.zk.kazoo_client.create(path + "/4", value=v4.serialize(), + makepath=True) + self.zk.kazoo_client.create(path + "/lock", makepath=True) matches = self.zk.getUploads("trusty", "000", "rax", [zk.DELETING, zk.FAILED]) @@ -560,11 +572,15 @@ class TestZooKeeper(tests.DBTestCase): v3.state = zk.FAILED v4 = zk.ImageUpload() v4.state = zk.DELETING - self.zk.client.create(path + "/1", value=v1.serialize(), makepath=True) - self.zk.client.create(path + "/2", value=v2.serialize(), makepath=True) - self.zk.client.create(path + "/3", value=v3.serialize(), makepath=True) - self.zk.client.create(path + "/4", value=v4.serialize(), makepath=True) - self.zk.client.create(path + "/lock", makepath=True) + self.zk.kazoo_client.create(path + "/1", value=v1.serialize(), + makepath=True) + self.zk.kazoo_client.create(path + "/2", value=v2.serialize(), + makepath=True) + self.zk.kazoo_client.create(path + "/3", value=v3.serialize(), + makepath=True) + self.zk.kazoo_client.create(path + "/4", value=v4.serialize(), + makepath=True) + self.zk.kazoo_client.create(path + "/lock", makepath=True) matches = self.zk.getUploads("trusty", "000", "rax", None) self.assertEqual(4, len(matches)) @@ -592,9 +608,9 @@ class TestZooKeeper(tests.DBTestCase): def test_deleteUpload(self): path = self.zk._imageUploadPath("trusty", "000", "rax") + "/000001" - self.zk.client.create(path, makepath=True) + self.zk.kazoo_client.create(path, makepath=True) self.zk.deleteUpload("trusty", "000", "rax", "000001") - self.assertIsNone(self.zk.client.exists(path)) + self.assertIsNone(self.zk.kazoo_client.exists(path)) def test_getNodeRequests_empty(self): self.assertEqual([], self.zk.getNodeRequests()) @@ -604,10 +620,10 @@ class TestZooKeeper(tests.DBTestCase): r2 = self.zk._requestPath("100-456") r3 = self.zk._requestPath("100-123") r4 = self.zk._requestPath("400-123") - self.zk.client.create(r1, makepath=True, ephemeral=True) - self.zk.client.create(r2, makepath=True, ephemeral=True) - self.zk.client.create(r3, makepath=True, ephemeral=True) - self.zk.client.create(r4, makepath=True, ephemeral=True) + self.zk.kazoo_client.create(r1, makepath=True, ephemeral=True) + self.zk.kazoo_client.create(r2, makepath=True, ephemeral=True) + self.zk.kazoo_client.create(r3, makepath=True, ephemeral=True) + self.zk.kazoo_client.create(r4, makepath=True, ephemeral=True) self.assertEqual( ["100-123", "100-456", "400-123", "500-123"], @@ -618,8 +634,8 @@ class TestZooKeeper(tests.DBTestCase): r = zk.NodeRequest("500-123") r.state = zk.REQUESTED path = self.zk._requestPath(r.id) - self.zk.client.create(path, value=r.serialize(), - makepath=True, ephemeral=True) + self.zk.kazoo_client.create(path, value=r.serialize(), + makepath=True, ephemeral=True) o = self.zk.getNodeRequest(r.id) self.assertIsInstance(o, zk.NodeRequest) self.assertEqual(r.id, o.id) @@ -628,8 +644,8 @@ class TestZooKeeper(tests.DBTestCase): self.assertIsNone(self.zk.getNodeRequest("invalid")) def test_getNodes(self): - self.zk.client.create(self.zk._nodePath('100'), makepath=True) - self.zk.client.create(self.zk._nodePath('200'), makepath=True) + self.zk.kazoo_client.create(self.zk._nodePath('100'), makepath=True) + self.zk.kazoo_client.create(self.zk._nodePath('200'), makepath=True) nodes = self.zk.getNodes() self.assertIn('100', nodes) self.assertIn('200', nodes) @@ -638,7 +654,7 @@ class TestZooKeeper(tests.DBTestCase): n = zk.Node('100') n.state = zk.BUILDING path = self.zk._nodePath(n.id) - self.zk.client.create(path, value=n.serialize(), makepath=True) + self.zk.kazoo_client.create(path, value=n.serialize(), makepath=True) o = self.zk.getNode(n.id) self.assertIsInstance(o, zk.Node) self.assertEqual(n.id, o.id) @@ -659,7 +675,7 @@ class TestZooKeeper(tests.DBTestCase): self.zk.lockNode(node) self.assertIsNotNone(node.lock) self.assertIsNotNone( - self.zk.client.exists(self.zk._nodeLockPath(node.id)) + self.zk.kazoo_client.exists(self.zk._nodeLockPath(node.id)) ) self.zk.unlockNode(node) self.assertIsNone(node.lock) @@ -678,7 +694,7 @@ class TestZooKeeper(tests.DBTestCase): self.zk.storeNode(node) self.assertIsNotNone(node.id) self.assertIsNotNone( - self.zk.client.exists(self.zk._nodePath(node.id)) + self.zk.kazoo_client.exists(self.zk._nodePath(node.id)) ) return node @@ -700,7 +716,7 @@ class TestZooKeeper(tests.DBTestCase): req.node_types.append('label1') self.zk.storeNodeRequest(req) self.assertIsNotNone( - self.zk.client.exists(self.zk._requestPath(req.id)) + self.zk.kazoo_client.exists(self.zk._requestPath(req.id)) ) return req @@ -722,14 +738,14 @@ class TestZooKeeper(tests.DBTestCase): req = self._create_node_request() self.zk.deleteNodeRequest(req) self.assertIsNone( - self.zk.client.exists(self.zk._requestPath(req.id)) + self.zk.kazoo_client.exists(self.zk._requestPath(req.id)) ) def test_deleteNode(self): n1 = self._create_node() self.zk.deleteNode(n1) self.assertIsNone( - self.zk.client.exists(self.zk._nodePath(n1.id)) + self.zk.kazoo_client.exists(self.zk._nodePath(n1.id)) ) def test_getReadyNodesOfTypes(self): diff --git a/nodepool/zk/zookeeper.py b/nodepool/zk/zookeeper.py index 38d8cf08c..4eaf8ca44 100644 --- a/nodepool/zk/zookeeper.py +++ b/nodepool/zk/zookeeper.py @@ -18,7 +18,6 @@ import logging import time import uuid -from kazoo.client import KazooState from kazoo import exceptions as kze from kazoo.recipe.lock import Lock from kazoo.recipe.cache import TreeCache, TreeEvent @@ -27,6 +26,7 @@ from kazoo.recipe.election import Election from nodepool import exceptions as npe from nodepool.logconfig import get_annotated_logger from nodepool.zk.components import COMPONENT_REGISTRY +from nodepool.zk import ZooKeeperBase # States: # We are building this image (or node) but it is not ready for use. @@ -676,7 +676,7 @@ class Node(BaseModel): self.requestor = d.get('requestor') -class ZooKeeper(object): +class ZooKeeper(ZooKeeperBase): ''' Class implementing the ZooKeeper interface. @@ -704,13 +704,11 @@ class ZooKeeper(object): # Log zookeeper retry every 10 seconds retry_log_rate = 10 - def __init__(self, zk_client, enable_cache=True): + def __init__(self, client, enable_cache=True): ''' Initialize the ZooKeeper object. ''' - self.zk_client = zk_client # nodepool.zk.ZooKeeperClient - self.client = zk_client.client # KazooClient - self._became_lost = False + super().__init__(client) self._last_retry_log = 0 self._node_cache = None self._request_cache = None @@ -719,22 +717,35 @@ class ZooKeeper(object): self.enable_cache = enable_cache self.node_stats_event = None - if self.enable_cache: - self._node_cache = TreeCache(self.client, self.NODE_ROOT) - self._node_cache.listen_fault(self.cacheFaultListener) - self._node_cache.listen(self.nodeCacheListener) - self._node_cache.start() + if self.client.connected: + self._onConnect() - self._request_cache = TreeCache(self.client, self.REQUEST_ROOT) - self._request_cache.listen_fault(self.cacheFaultListener) - self._request_cache.listen(self.requestCacheListener) - self._request_cache.start() - - COMPONENT_REGISTRY.create(self.zk_client) + COMPONENT_REGISTRY.create(self.client) # ======================================================================= # Private Methods # ======================================================================= + def _onConnect(self): + if self.enable_cache: + self._node_cache = TreeCache(self.kazoo_client, self.NODE_ROOT) + self._node_cache.listen_fault(self.cacheFaultListener) + self._node_cache.listen(self.nodeCacheListener) + self._node_cache.start() + + self._request_cache = TreeCache(self.kazoo_client, + self.REQUEST_ROOT) + self._request_cache.listen_fault(self.cacheFaultListener) + self._request_cache.listen(self.requestCacheListener) + self._request_cache.start() + + def _onDisconnect(self): + if self._node_cache is not None: + self._node_cache.close() + self._node_cache = None + + if self._request_cache is not None: + self._request_cache.close() + self._request_cache = None def _electionPath(self, election): return "%s/%s" % (self.ELECTION_ROOT, election) @@ -800,7 +811,7 @@ class ZooKeeper(object): def _getImageBuildLock(self, image, blocking=True, timeout=None): lock_path = self._imageBuildLockPath(image) try: - lock = Lock(self.client, lock_path) + lock = Lock(self.kazoo_client, lock_path) have_lock = lock.acquire(blocking, timeout) except kze.LockTimeout: raise npe.TimeoutException( @@ -820,7 +831,7 @@ class ZooKeeper(object): blocking=True, timeout=None): lock_path = self._imageBuildNumberLockPath(image, build_number) try: - lock = Lock(self.client, lock_path) + lock = Lock(self.kazoo_client, lock_path) have_lock = lock.acquire(blocking, timeout) except kze.LockTimeout: raise npe.TimeoutException( @@ -842,7 +853,7 @@ class ZooKeeper(object): lock_path = self._imageUploadNumberLockPath(image, build_number, provider, upload_number) try: - lock = Lock(self.client, lock_path) + lock = Lock(self.kazoo_client, lock_path) have_lock = lock.acquire(blocking, timeout) except kze.LockTimeout: raise npe.TimeoutException( @@ -864,7 +875,7 @@ class ZooKeeper(object): blocking=True, timeout=None): lock_path = self._imageUploadLockPath(image, build_number, provider) try: - lock = Lock(self.client, lock_path) + lock = Lock(self.kazoo_client, lock_path) have_lock = lock.acquire(blocking, timeout) except kze.LockTimeout: raise npe.TimeoutException( @@ -881,20 +892,6 @@ class ZooKeeper(object): return lock - def _connection_listener(self, state): - ''' - Listener method for Kazoo connection state changes. - - .. warning:: This method must not block. - ''' - if state == KazooState.LOST: - self.log.debug("ZooKeeper connection: LOST") - self._became_lost = True - elif state == KazooState.SUSPENDED: - self.log.debug("ZooKeeper connection: SUSPENDED") - else: - self.log.debug("ZooKeeper connection: CONNECTED") - def logConnectionRetryEvent(self): ''' Kazoo retry callback @@ -910,28 +907,15 @@ class ZooKeeper(object): @property def connected(self): - if self.client is None: - return False - return self.client.state == KazooState.CONNECTED + return self.client.connected @property def suspended(self): - if self.client is None: - return True - return self.client.state == KazooState.SUSPENDED + return self.client.suspended @property def lost(self): - if self.client is None: - return True - return self.client.state == KazooState.LOST - - @property - def didLoseConnection(self): - return self._became_lost - - def resetLostFlag(self): - self._became_lost = False + return self.client.lost def disconnect(self): ''' @@ -941,18 +925,7 @@ class ZooKeeper(object): cluster connection. ''' - if self._node_cache is not None: - self._node_cache.close() - self._node_cache = None - - if self._request_cache is not None: - self._request_cache.close() - self._request_cache = None - - if self.client is not None and self.client.connected: - self.client.stop() - self.client.close() - self.client = None + self.client.disconnect() def resetHosts(self, hosts): ''' @@ -960,8 +933,7 @@ class ZooKeeper(object): :param str host_list: A ZK host list ''' - if self.client is not None: - self.client.set_hosts(hosts=hosts) + self.client.resetHosts(hosts) @contextmanager def imageBuildLock(self, image, blocking=True, timeout=None): @@ -1086,7 +1058,7 @@ class ZooKeeper(object): path = self.IMAGE_ROOT try: - images = self.client.get_children(path) + images = self.kazoo_client.get_children(path) except kze.NoNodeError: return [] return sorted(images) @@ -1100,7 +1072,7 @@ class ZooKeeper(object): path = self._imagePausePath(image) try: - data, stat = self.client.get(path) + data, stat = self.kazoo_client.get(path) except kze.NoNodeError: return False return True @@ -1113,12 +1085,12 @@ class ZooKeeper(object): if paused: try: - self.client.create(path) + self.kazoo_client.create(path) except kze.NodeExistsError: pass else: try: - self.client.delete(path) + self.kazoo_client.delete(path) except kze.NoNodeError: pass @@ -1133,7 +1105,7 @@ class ZooKeeper(object): path = self._imageBuildsPath(image) try: - builds = self.client.get_children(path) + builds = self.kazoo_client.get_children(path) except kze.NoNodeError: return [] builds = [x for x in builds if x != 'lock'] @@ -1152,7 +1124,7 @@ class ZooKeeper(object): path = self._imageProviderPath(image, build_number) try: - providers = self.client.get_children(path) + providers = self.kazoo_client.get_children(path) except kze.NoNodeError: return [] @@ -1172,7 +1144,7 @@ class ZooKeeper(object): path = self._imageUploadPath(image, build_number, provider) try: - uploads = self.client.get_children(path) + uploads = self.kazoo_client.get_children(path) except kze.NoNodeError: return [] @@ -1191,7 +1163,7 @@ class ZooKeeper(object): path = self._imageBuildsPath(image) + "/%s" % build_number try: - data, stat = self.client.get(path) + data, stat = self.kazoo_client.get(path) except kze.NoNodeError: return None @@ -1218,7 +1190,7 @@ class ZooKeeper(object): path = self._imageBuildsPath(image) try: - builds = self.client.get_children(path) + builds = self.kazoo_client.get_children(path) except kze.NoNodeError: return [] @@ -1285,7 +1257,7 @@ class ZooKeeper(object): build_path = self._imageBuildsPath(image) + "/" if build_number is None: - path = self.client.create( + path = self.kazoo_client.create( build_path, value=build_data.serialize(), sequence=True, @@ -1293,7 +1265,7 @@ class ZooKeeper(object): build_number = path.split("/")[-1] else: path = build_path + build_number - self.client.set(path, build_data.serialize()) + self.kazoo_client.set(path, build_data.serialize()) return build_number @@ -1314,7 +1286,7 @@ class ZooKeeper(object): path = path + "/%s" % upload_number try: - data, stat = self.client.get(path) + data, stat = self.kazoo_client.get(path) except kze.NoNodeError: return None @@ -1347,7 +1319,7 @@ class ZooKeeper(object): path = self._imageUploadPath(image, build_number, provider) try: - uploads = self.client.get_children(path) + uploads = self.kazoo_client.get_children(path) except kze.NoNodeError: return [] @@ -1411,7 +1383,7 @@ class ZooKeeper(object): path = self._imageUploadPath(image, build_number, provider) try: - uploads = self.client.get_children(path) + uploads = self.kazoo_client.get_children(path) except kze.NoNodeError: uploads = [] @@ -1454,7 +1426,7 @@ class ZooKeeper(object): ''' # We expect the image builds path to already exist. build_path = self._imageBuildsPath(image) - if not self.client.exists(build_path): + if not self.kazoo_client.exists(build_path): raise npe.ZKException( "Cannot find build %s of image %s" % (build_number, image) ) @@ -1466,7 +1438,7 @@ class ZooKeeper(object): image, build_number, provider) + "/" if upload_number is None: - path = self.client.create( + path = self.kazoo_client.create( upload_path, value=image_data.serialize(), sequence=True, @@ -1474,7 +1446,7 @@ class ZooKeeper(object): upload_number = path.split("/")[-1] else: path = upload_path + upload_number - self.client.set(path, image_data.serialize()) + self.kazoo_client.set(path, image_data.serialize()) return upload_number @@ -1487,7 +1459,7 @@ class ZooKeeper(object): :returns: True if request is pending, False otherwise ''' path = self._imageBuildRequestPath(image) - if self.client.exists(path) is not None: + if self.kazoo_client.exists(path) is not None: return True return False @@ -1498,7 +1470,7 @@ class ZooKeeper(object): latest_build, *_ = builds builds_path = self._imageBuildNumberPath(image, latest_build) - return self.client.exists(builds_path) + return self.kazoo_client.exists(builds_path) def getBuildRequest(self, image): """Get a build request for the given image. @@ -1509,13 +1481,13 @@ class ZooKeeper(object): """ path = self._imageBuildRequestPath(image) try: - _, stat = self.client.get(path) + _, stat = self.kazoo_client.get(path) except kze.NoNodeError: return pending = True lock_path = self._imageBuildLockPath(image) - lock_stat = self.client.exists(lock_path) + lock_stat = self.kazoo_client.exists(lock_path) if lock_stat and lock_stat.children_count: build_stat = self._latestImageBuildStat(image) # If there is a lock, but no build we assume that the build @@ -1534,7 +1506,7 @@ class ZooKeeper(object): :param str image: The image name. ''' path = self._imageBuildRequestPath(image) - self.client.ensure_path(path) + self.kazoo_client.ensure_path(path) def removeBuildRequest(self, image): ''' @@ -1544,7 +1516,7 @@ class ZooKeeper(object): ''' path = self._imageBuildRequestPath(image) try: - self.client.delete(path) + self.kazoo_client.delete(path) except kze.NoNodeError: pass @@ -1571,7 +1543,7 @@ class ZooKeeper(object): try: # NOTE: Need to do recursively to remove lock znodes - self.client.delete(path, recursive=True) + self.kazoo_client.delete(path, recursive=True) except kze.NoNodeError: pass @@ -1590,7 +1562,7 @@ class ZooKeeper(object): path = path + "/%s" % upload_number try: # NOTE: Need to do recursively to remove lock znodes - self.client.delete(path, recursive=True) + self.kazoo_client.delete(path, recursive=True) except kze.NoNodeError: pass @@ -1610,7 +1582,7 @@ class ZooKeeper(object): :returns: A list of request nodes. ''' try: - requests = self.client.get_children(self.REQUEST_ROOT) + requests = self.kazoo_client.get_children(self.REQUEST_ROOT) except kze.NoNodeError: return [] @@ -1621,7 +1593,7 @@ class ZooKeeper(object): Get the current list of all node request lock ids. ''' try: - lock_ids = self.client.get_children(self.REQUEST_LOCK_ROOT) + lock_ids = self.kazoo_client.get_children(self.REQUEST_LOCK_ROOT) except kze.NoNodeError: return [] return lock_ids @@ -1640,7 +1612,7 @@ class ZooKeeper(object): ''' path = self._requestLockPath(lock_id) try: - data, stat = self.client.get(path) + data, stat = self.kazoo_client.get(path) except kze.NoNodeError: return None d = NodeRequestLockStats(lock_id) @@ -1655,7 +1627,7 @@ class ZooKeeper(object): ''' path = self._requestLockPath(lock_id) try: - self.client.delete(path, recursive=True) + self.kazoo_client.delete(path, recursive=True) except kze.NoNodeError: pass @@ -1680,7 +1652,7 @@ class ZooKeeper(object): # call. try: path = self._requestPath(request) - data, stat = self.client.get(path) + data, stat = self.kazoo_client.get(path) except kze.NoNodeError: return None @@ -1696,7 +1668,7 @@ class ZooKeeper(object): ''' path = self._requestPath(request.id) - data, stat = self.client.get(path) + data, stat = self.kazoo_client.get(path) if data: d = self._bytesToDict(data) @@ -1717,7 +1689,7 @@ class ZooKeeper(object): if not request.event_id: request.event_id = uuid.uuid4().hex path = "%s/%s-" % (self.REQUEST_ROOT, priority) - path = self.client.create( + path = self.kazoo_client.create( path, value=request.serialize(), ephemeral=True, @@ -1732,7 +1704,7 @@ class ZooKeeper(object): "Attempt to update non-existing request %s" % request) path = self._requestPath(request.id) - self.client.set(path, request.serialize()) + self.kazoo_client.set(path, request.serialize()) def deleteNodeRequest(self, request): ''' @@ -1745,7 +1717,7 @@ class ZooKeeper(object): path = self._requestPath(request.id) try: - self.client.delete(path) + self.kazoo_client.delete(path) except kze.NoNodeError: pass @@ -1772,7 +1744,7 @@ class ZooKeeper(object): node_request_id=request.id) path = self._requestLockPath(request.id) try: - lock = Lock(self.client, path) + lock = Lock(self.kazoo_client, path) have_lock = lock.acquire(blocking, timeout) except kze.LockTimeout: raise npe.TimeoutException( @@ -1833,7 +1805,7 @@ class ZooKeeper(object): ''' path = self._nodeLockPath(node.id) try: - lock = Lock(self.client, path, identifier) + lock = Lock(self.kazoo_client, path, identifier) have_lock = lock.acquire(blocking, timeout, ephemeral) except kze.LockTimeout: raise npe.TimeoutException( @@ -1875,7 +1847,7 @@ class ZooKeeper(object): ''' path = self._nodeLockPath(node.id) try: - self.client.delete(path, recursive=True) + self.kazoo_client.delete(path, recursive=True) except kze.NoNodeError: pass @@ -1884,7 +1856,7 @@ class ZooKeeper(object): Return the contenders for a node lock. ''' path = self._nodeLockPath(node.id) - lock = Lock(self.client, path) + lock = Lock(self.kazoo_client, path) return lock.contenders() def getNodes(self): @@ -1894,7 +1866,7 @@ class ZooKeeper(object): :returns: A list of nodes. ''' try: - return self.client.get_children(self.NODE_ROOT) + return self.kazoo_client.get_children(self.NODE_ROOT) except kze.NoNodeError: return [] @@ -1919,7 +1891,7 @@ class ZooKeeper(object): # call. try: path = self._nodePath(node) - data, stat = self.client.get(path) + data, stat = self.kazoo_client.get(path) except kze.NoNodeError: return None @@ -1939,7 +1911,7 @@ class ZooKeeper(object): ''' path = self._nodePath(node.id) - data, stat = self.client.get(path) + data, stat = self.kazoo_client.get(path) if data: d = self._bytesToDict(data) @@ -1971,7 +1943,7 @@ class ZooKeeper(object): else: node.created_time = time.time() - path = self.client.create( + path = self.kazoo_client.create( node_path, value=node.serialize(), sequence=True, @@ -1979,7 +1951,7 @@ class ZooKeeper(object): node.id = path.split("/")[-1] else: path = self._nodePath(node.id) - self.client.set(path, node.serialize()) + self.kazoo_client.set(path, node.serialize()) def watchNode(self, node, callback): '''Watch an existing node for changes. @@ -2000,7 +1972,7 @@ class ZooKeeper(object): return callback(node, deleted) path = self._nodePath(node.id) - self.client.DataWatch(path, _callback_wrapper) + self.kazoo_client.DataWatch(path, _callback_wrapper) def deleteRawNode(self, node_id): ''' @@ -2012,7 +1984,7 @@ class ZooKeeper(object): ''' path = self._nodePath(node_id) try: - self.client.delete(path, recursive=True) + self.kazoo_client.delete(path, recursive=True) except kze.NoNodeError: pass @@ -2031,7 +2003,7 @@ class ZooKeeper(object): # anything so that we can detect a race condition where the # lock is removed before the node deletion occurs. node.state = DELETED - self.client.set(path, node.serialize()) + self.kazoo_client.set(path, node.serialize()) self.deleteRawNode(node.id) def getReadyNodesOfTypes(self, labels, cached=True): @@ -2248,7 +2220,7 @@ class ZooKeeper(object): path = self._imageProviderPath(image, build) path = "%s/%s" % (path, provider_name) try: - self.client.delete(path, recursive=True) + self.kazoo_client.delete(path, recursive=True) except kze.NoNodeError: pass @@ -2389,7 +2361,7 @@ class ZooKeeper(object): def getStatsElection(self, identifier): path = self._electionPath('stats') - return Election(self.client, path, identifier) + return Election(self.kazoo_client, path, identifier) def exportImageData(self): ''' @@ -2404,7 +2376,7 @@ class ZooKeeper(object): for build_no in self.getBuildNumbers(image_name): build_path = self._imageBuildsPath(image_name) + "/" + build_no try: - build_data, stat = self.client.get(build_path) + build_data, stat = self.kazoo_client.get(build_path) except kze.NoNodeError: continue ret[build_path] = build_data.decode('utf8') @@ -2416,7 +2388,8 @@ class ZooKeeper(object): image_name, build_no, provider_name) + "/" upload_path += upload_no try: - upload_data, stat = self.client.get(upload_path) + upload_data, stat = self.kazoo_client.get( + upload_path) except kze.NoNodeError: continue ret[upload_path] = upload_data.decode('utf8') @@ -2466,19 +2439,19 @@ class ZooKeeper(object): highest_num[key] = max(highest_num.get(key, num), num) for path, num in highest_num.items(): for x in range(num): - node = self.client.create( + node = self.kazoo_client.create( path + '/', makepath=True, sequence=True) # If this node isn't in our import data, go ahead and # delete it. if node not in import_data: - self.client.delete(node) + self.kazoo_client.delete(node) for path, data in import_data.items(): # We may have already created a node above; in that # case, just set the data on it. - if self.client.exists(path): - self.client.set(path, - value=data.encode('utf8')) + if self.kazoo_client.exists(path): + self.kazoo_client.set(path, + value=data.encode('utf8')) else: - self.client.create(path, - value=data.encode('utf8'), - makepath=True) + self.kazoo_client.create(path, + value=data.encode('utf8'), + makepath=True) diff --git a/tools/print-zk.py b/tools/print-zk.py index c7d2d2c5b..03e90168a 100755 --- a/tools/print-zk.py +++ b/tools/print-zk.py @@ -47,12 +47,12 @@ def join(a, b): return a+'/'+b def print_tree(node): - data, stat = zk.client.get(node) + data, stat = zk.kazoo_client.get(node) print("Node: %s %s" % (node, stat)) if data: print(data) - for child in zk.client.get_children(node): + for child in zk.kazoo_client.get_children(node): print() print_tree(join(node, child))