Update ZooKeeper class connection methods

This updates the ZooKeeper class to inherit from ZooKeeperBase
and utilize its connection methods.

It also moves the connection loss detection used by the builder
to be more localized and removes unused methods.

Change-Id: I6c9dbe17976560bc024f74cd31bdb6305d51168d
This commit is contained in:
James E. Blair 2022-06-17 12:50:37 -07:00
parent d01f19d762
commit 7bbdfdc9fd
8 changed files with 178 additions and 184 deletions

View File

@ -597,6 +597,11 @@ class BuildWorker(BaseWorker):
interval, zk)
self.log = logging.getLogger("nodepool.builder.BuildWorker.%s" % name)
self.name = 'BuildWorker.%s' % name
self._lost_zk_connection = False
zk.client.on_connection_lost_listeners.append(self._onConnectionLost)
def _onConnectionLost(self):
self._lost_zk_connection = True
def _getBuildLogRoot(self, name):
log_dir = self._config.build_log_dir
@ -697,6 +702,7 @@ class BuildWorker(BaseWorker):
:returns: The updated ImageBuild data structure.
'''
self._lost_zk_connection = False
data = zk.ImageBuild()
data.state = zk.BUILDING
data.builder_id = self._builder_id
@ -951,9 +957,8 @@ class BuildWorker(BaseWorker):
if self._statsd:
pipeline = self._statsd.pipeline()
if self._zk.didLoseConnection:
if self._lost_zk_connection:
self.log.info("ZooKeeper lost while building %s" % diskimage.name)
self._zk.resetLostFlag()
build_data.state = zk.FAILED
elif p.returncode or did_timeout:
self.log.info(

View File

@ -381,7 +381,7 @@ class PoolWorker(threading.Thread, stats.StatsReporter):
# Make sure we're always registered with ZK
hostname = socket.gethostname()
self.component_info = PoolComponent(
self.zk.zk_client, hostname,
self.zk.client, hostname,
version=get_version_string())
labels = set()
for prov_cfg in self.nodepool.config.providers.values():

View File

@ -689,21 +689,21 @@ class DBTestCase(BaseTestCase):
return a + b
return a + '/' + b
data, stat = self.zk.client.get(node)
data, stat = self.zk.kazoo_client.get(node)
self.log.debug("Node: %s" % (node,))
if data:
self.log.debug(data)
for child in self.zk.client.get_children(node):
for child in self.zk.kazoo_client.get_children(node):
self.printZKTree(join(node, child))
def getZKTree(self, path, ret=None):
"""Return the contents of a ZK tree as a dictionary"""
if ret is None:
ret = {}
for key in self.zk.client.get_children(path):
for key in self.zk.kazoo_client.get_children(path):
subpath = os.path.join(path, key)
ret[subpath] = self.zk.client.get(
ret[subpath] = self.zk.kazoo_client.get(
os.path.join(path, key))[0]
self.getZKTree(subpath, ret)
return ret

View File

@ -448,7 +448,7 @@ class TestNodepoolCMD(tests.DBTestCase):
"-c", configfile, 'export-image-data', tf.name)
nodepoolcmd.main()
# Delete data from ZK
self.zk.client.delete('/nodepool', recursive=True)
self.zk.kazoo_client.delete('/nodepool', recursive=True)
self.patch_argv(
"-c", configfile, 'import-image-data', tf.name)

View File

@ -91,7 +91,7 @@ class TestLauncher(tests.DBTestCase):
# Verify the cleanup thread removed the lock
self.assertIsNotNone(
self.zk.client.exists(self.zk._requestLockPath(req.id))
self.zk.kazoo_client.exists(self.zk._requestLockPath(req.id))
)
self.zk.deleteNodeRequest(req)
self.waitForNodeRequestLockDeletion(req.id)
@ -766,7 +766,7 @@ class TestLauncher(tests.DBTestCase):
# than what we are going to request.
hostname = socket.gethostname()
dummy_component = PoolComponent(
self.zk.zk_client, hostname,
self.zk.client, hostname,
version=get_version_string())
dummy_component.content.update({
'id': 'dummy',
@ -2426,14 +2426,14 @@ class TestLauncher(tests.DBTestCase):
# Create empty node
path = "%s" % self.zk._nodePath("12345")
self.log.debug("node path %s", path)
self.zk.client.create(path, makepath=True)
self.assertTrue(self.zk.client.exists(path))
self.zk.kazoo_client.create(path, makepath=True)
self.assertTrue(self.zk.kazoo_client.exists(path))
pool = self.useNodepool(configfile, watermark_sleep=1)
pool.cleanup_interval = .1
pool.start()
while self.zk.client.exists(path):
while self.zk.kazoo_client.exists(path):
time.sleep(.1)
def test_leaked_port_cleanup(self):

View File

@ -30,7 +30,7 @@ class TestComponentRegistry(tests.DBTestCase):
def test_pool_component(self):
hostname = socket.gethostname()
launcher = PoolComponent(
self.zk.zk_client, hostname,
self.zk.client, hostname,
version=get_version_string())
launcher.content.update({
'id': "launcher-Poolworker.provider-main-" + uuid.uuid4().hex,
@ -92,7 +92,7 @@ class TestZooKeeper(tests.DBTestCase):
def test_imageBuildLock(self):
path = self.zk._imageBuildLockPath("ubuntu-trusty")
with self.zk.imageBuildLock("ubuntu-trusty", blocking=False):
self.assertIsNotNone(self.zk.client.exists(path))
self.assertIsNotNone(self.zk.kazoo_client.exists(path))
def test_imageBuildLock_exception_nonblocking(self):
image = "ubuntu-trusty"
@ -115,7 +115,7 @@ class TestZooKeeper(tests.DBTestCase):
with self.zk.imageBuildNumberLock(
"ubuntu-trusty", "0000", blocking=False
):
self.assertIsNotNone(self.zk.client.exists(path))
self.assertIsNotNone(self.zk.kazoo_client.exists(path))
def test_imageBuildNumberLock_exception_nonblocking(self):
image = "ubuntu-trusty"
@ -152,12 +152,12 @@ class TestZooKeeper(tests.DBTestCase):
upload.provider_name,
upload.id)
with self.zk.imageUploadNumberLock(upload, blocking=False):
self.assertIsNotNone(self.zk.client.exists(path))
self.assertIsNotNone(self.zk.kazoo_client.exists(path))
self.zk.deleteUpload(upload.image_name,
upload.build_id,
upload.provider_name,
upload.id)
self.assertIsNone(self.zk.client.exists(path))
self.assertIsNone(self.zk.kazoo_client.exists(path))
def test_imageUploadNumberLock_orphan(self):
upload = zk.ImageUpload()
@ -171,7 +171,7 @@ class TestZooKeeper(tests.DBTestCase):
upload.id)
with self.zk.imageUploadNumberLock(upload, blocking=False):
self.assertIsNotNone(self.zk.client.exists(path))
self.assertIsNotNone(self.zk.kazoo_client.exists(path))
self.zk.deleteUpload(upload.image_name,
upload.build_id,
upload.provider_name,
@ -183,7 +183,7 @@ class TestZooKeeper(tests.DBTestCase):
# We now recreated an empty image upload number node
pass
self.assertIsNotNone(self.zk.client.exists(path))
self.assertIsNotNone(self.zk.kazoo_client.exists(path))
# Should not throw an exception because of the empty upload
self.zk.getImageUpload(upload.image_name, upload.build_id,
@ -200,7 +200,7 @@ class TestZooKeeper(tests.DBTestCase):
upload.provider_name,
upload.id)
with self.zk.imageUploadNumberLock(upload, blocking=False):
self.assertIsNotNone(self.zk.client.exists(path))
self.assertIsNotNone(self.zk.kazoo_client.exists(path))
def test_imageUploadNumberLock_exception_nonblocking(self):
upload = zk.ImageUpload()
@ -232,7 +232,7 @@ class TestZooKeeper(tests.DBTestCase):
path = self.zk._imageUploadLockPath("ubuntu-trusty", "0000", "prov1")
with self.zk.imageUploadLock("ubuntu-trusty", "0000", "prov1",
blocking=False):
self.assertIsNotNone(self.zk.client.exists(path))
self.assertIsNotNone(self.zk.kazoo_client.exists(path))
def test_imageUploadLock_exception_nonblocking(self):
image = "ubuntu-trusty"
@ -372,7 +372,7 @@ class TestZooKeeper(tests.DBTestCase):
# We now created an empty build number node
pass
self.assertIsNotNone(self.zk.client.exists(path))
self.assertIsNotNone(self.zk.kazoo_client.exists(path))
# Should not throw an exception because of the empty upload
self.assertIsNone(self.zk.getBuild(image, build_number))
@ -501,11 +501,15 @@ class TestZooKeeper(tests.DBTestCase):
v3.state = zk.FAILED
v4 = zk.ImageBuild()
v4.state = zk.DELETING
self.zk.client.create(path + "/1", value=v1.serialize(), makepath=True)
self.zk.client.create(path + "/2", value=v2.serialize(), makepath=True)
self.zk.client.create(path + "/3", value=v3.serialize(), makepath=True)
self.zk.client.create(path + "/4", value=v4.serialize(), makepath=True)
self.zk.client.create(path + "/lock", makepath=True)
self.zk.kazoo_client.create(path + "/1", value=v1.serialize(),
makepath=True)
self.zk.kazoo_client.create(path + "/2", value=v2.serialize(),
makepath=True)
self.zk.kazoo_client.create(path + "/3", value=v3.serialize(),
makepath=True)
self.zk.kazoo_client.create(path + "/4", value=v4.serialize(),
makepath=True)
self.zk.kazoo_client.create(path + "/lock", makepath=True)
matches = self.zk.getBuilds(image, None)
self.assertEqual(4, len(matches))
@ -521,11 +525,15 @@ class TestZooKeeper(tests.DBTestCase):
v3.state = zk.FAILED
v4 = zk.ImageBuild()
v4.state = zk.DELETING
self.zk.client.create(path + "/1", value=v1.serialize(), makepath=True)
self.zk.client.create(path + "/2", value=v2.serialize(), makepath=True)
self.zk.client.create(path + "/3", value=v3.serialize(), makepath=True)
self.zk.client.create(path + "/4", value=v4.serialize(), makepath=True)
self.zk.client.create(path + "/lock", makepath=True)
self.zk.kazoo_client.create(path + "/1", value=v1.serialize(),
makepath=True)
self.zk.kazoo_client.create(path + "/2", value=v2.serialize(),
makepath=True)
self.zk.kazoo_client.create(path + "/3", value=v3.serialize(),
makepath=True)
self.zk.kazoo_client.create(path + "/4", value=v4.serialize(),
makepath=True)
self.zk.kazoo_client.create(path + "/lock", makepath=True)
matches = self.zk.getBuilds(image, [zk.DELETING, zk.FAILED])
self.assertEqual(2, len(matches))
@ -540,11 +548,15 @@ class TestZooKeeper(tests.DBTestCase):
v3.state = zk.FAILED
v4 = zk.ImageUpload()
v4.state = zk.DELETING
self.zk.client.create(path + "/1", value=v1.serialize(), makepath=True)
self.zk.client.create(path + "/2", value=v2.serialize(), makepath=True)
self.zk.client.create(path + "/3", value=v3.serialize(), makepath=True)
self.zk.client.create(path + "/4", value=v4.serialize(), makepath=True)
self.zk.client.create(path + "/lock", makepath=True)
self.zk.kazoo_client.create(path + "/1", value=v1.serialize(),
makepath=True)
self.zk.kazoo_client.create(path + "/2", value=v2.serialize(),
makepath=True)
self.zk.kazoo_client.create(path + "/3", value=v3.serialize(),
makepath=True)
self.zk.kazoo_client.create(path + "/4", value=v4.serialize(),
makepath=True)
self.zk.kazoo_client.create(path + "/lock", makepath=True)
matches = self.zk.getUploads("trusty", "000", "rax",
[zk.DELETING, zk.FAILED])
@ -560,11 +572,15 @@ class TestZooKeeper(tests.DBTestCase):
v3.state = zk.FAILED
v4 = zk.ImageUpload()
v4.state = zk.DELETING
self.zk.client.create(path + "/1", value=v1.serialize(), makepath=True)
self.zk.client.create(path + "/2", value=v2.serialize(), makepath=True)
self.zk.client.create(path + "/3", value=v3.serialize(), makepath=True)
self.zk.client.create(path + "/4", value=v4.serialize(), makepath=True)
self.zk.client.create(path + "/lock", makepath=True)
self.zk.kazoo_client.create(path + "/1", value=v1.serialize(),
makepath=True)
self.zk.kazoo_client.create(path + "/2", value=v2.serialize(),
makepath=True)
self.zk.kazoo_client.create(path + "/3", value=v3.serialize(),
makepath=True)
self.zk.kazoo_client.create(path + "/4", value=v4.serialize(),
makepath=True)
self.zk.kazoo_client.create(path + "/lock", makepath=True)
matches = self.zk.getUploads("trusty", "000", "rax", None)
self.assertEqual(4, len(matches))
@ -592,9 +608,9 @@ class TestZooKeeper(tests.DBTestCase):
def test_deleteUpload(self):
path = self.zk._imageUploadPath("trusty", "000", "rax") + "/000001"
self.zk.client.create(path, makepath=True)
self.zk.kazoo_client.create(path, makepath=True)
self.zk.deleteUpload("trusty", "000", "rax", "000001")
self.assertIsNone(self.zk.client.exists(path))
self.assertIsNone(self.zk.kazoo_client.exists(path))
def test_getNodeRequests_empty(self):
self.assertEqual([], self.zk.getNodeRequests())
@ -604,10 +620,10 @@ class TestZooKeeper(tests.DBTestCase):
r2 = self.zk._requestPath("100-456")
r3 = self.zk._requestPath("100-123")
r4 = self.zk._requestPath("400-123")
self.zk.client.create(r1, makepath=True, ephemeral=True)
self.zk.client.create(r2, makepath=True, ephemeral=True)
self.zk.client.create(r3, makepath=True, ephemeral=True)
self.zk.client.create(r4, makepath=True, ephemeral=True)
self.zk.kazoo_client.create(r1, makepath=True, ephemeral=True)
self.zk.kazoo_client.create(r2, makepath=True, ephemeral=True)
self.zk.kazoo_client.create(r3, makepath=True, ephemeral=True)
self.zk.kazoo_client.create(r4, makepath=True, ephemeral=True)
self.assertEqual(
["100-123", "100-456", "400-123", "500-123"],
@ -618,7 +634,7 @@ class TestZooKeeper(tests.DBTestCase):
r = zk.NodeRequest("500-123")
r.state = zk.REQUESTED
path = self.zk._requestPath(r.id)
self.zk.client.create(path, value=r.serialize(),
self.zk.kazoo_client.create(path, value=r.serialize(),
makepath=True, ephemeral=True)
o = self.zk.getNodeRequest(r.id)
self.assertIsInstance(o, zk.NodeRequest)
@ -628,8 +644,8 @@ class TestZooKeeper(tests.DBTestCase):
self.assertIsNone(self.zk.getNodeRequest("invalid"))
def test_getNodes(self):
self.zk.client.create(self.zk._nodePath('100'), makepath=True)
self.zk.client.create(self.zk._nodePath('200'), makepath=True)
self.zk.kazoo_client.create(self.zk._nodePath('100'), makepath=True)
self.zk.kazoo_client.create(self.zk._nodePath('200'), makepath=True)
nodes = self.zk.getNodes()
self.assertIn('100', nodes)
self.assertIn('200', nodes)
@ -638,7 +654,7 @@ class TestZooKeeper(tests.DBTestCase):
n = zk.Node('100')
n.state = zk.BUILDING
path = self.zk._nodePath(n.id)
self.zk.client.create(path, value=n.serialize(), makepath=True)
self.zk.kazoo_client.create(path, value=n.serialize(), makepath=True)
o = self.zk.getNode(n.id)
self.assertIsInstance(o, zk.Node)
self.assertEqual(n.id, o.id)
@ -659,7 +675,7 @@ class TestZooKeeper(tests.DBTestCase):
self.zk.lockNode(node)
self.assertIsNotNone(node.lock)
self.assertIsNotNone(
self.zk.client.exists(self.zk._nodeLockPath(node.id))
self.zk.kazoo_client.exists(self.zk._nodeLockPath(node.id))
)
self.zk.unlockNode(node)
self.assertIsNone(node.lock)
@ -678,7 +694,7 @@ class TestZooKeeper(tests.DBTestCase):
self.zk.storeNode(node)
self.assertIsNotNone(node.id)
self.assertIsNotNone(
self.zk.client.exists(self.zk._nodePath(node.id))
self.zk.kazoo_client.exists(self.zk._nodePath(node.id))
)
return node
@ -700,7 +716,7 @@ class TestZooKeeper(tests.DBTestCase):
req.node_types.append('label1')
self.zk.storeNodeRequest(req)
self.assertIsNotNone(
self.zk.client.exists(self.zk._requestPath(req.id))
self.zk.kazoo_client.exists(self.zk._requestPath(req.id))
)
return req
@ -722,14 +738,14 @@ class TestZooKeeper(tests.DBTestCase):
req = self._create_node_request()
self.zk.deleteNodeRequest(req)
self.assertIsNone(
self.zk.client.exists(self.zk._requestPath(req.id))
self.zk.kazoo_client.exists(self.zk._requestPath(req.id))
)
def test_deleteNode(self):
n1 = self._create_node()
self.zk.deleteNode(n1)
self.assertIsNone(
self.zk.client.exists(self.zk._nodePath(n1.id))
self.zk.kazoo_client.exists(self.zk._nodePath(n1.id))
)
def test_getReadyNodesOfTypes(self):

View File

@ -18,7 +18,6 @@ import logging
import time
import uuid
from kazoo.client import KazooState
from kazoo import exceptions as kze
from kazoo.recipe.lock import Lock
from kazoo.recipe.cache import TreeCache, TreeEvent
@ -27,6 +26,7 @@ from kazoo.recipe.election import Election
from nodepool import exceptions as npe
from nodepool.logconfig import get_annotated_logger
from nodepool.zk.components import COMPONENT_REGISTRY
from nodepool.zk import ZooKeeperBase
# States:
# We are building this image (or node) but it is not ready for use.
@ -676,7 +676,7 @@ class Node(BaseModel):
self.requestor = d.get('requestor')
class ZooKeeper(object):
class ZooKeeper(ZooKeeperBase):
'''
Class implementing the ZooKeeper interface.
@ -704,13 +704,11 @@ class ZooKeeper(object):
# Log zookeeper retry every 10 seconds
retry_log_rate = 10
def __init__(self, zk_client, enable_cache=True):
def __init__(self, client, enable_cache=True):
'''
Initialize the ZooKeeper object.
'''
self.zk_client = zk_client # nodepool.zk.ZooKeeperClient
self.client = zk_client.client # KazooClient
self._became_lost = False
super().__init__(client)
self._last_retry_log = 0
self._node_cache = None
self._request_cache = None
@ -719,22 +717,35 @@ class ZooKeeper(object):
self.enable_cache = enable_cache
self.node_stats_event = None
if self.enable_cache:
self._node_cache = TreeCache(self.client, self.NODE_ROOT)
self._node_cache.listen_fault(self.cacheFaultListener)
self._node_cache.listen(self.nodeCacheListener)
self._node_cache.start()
if self.client.connected:
self._onConnect()
self._request_cache = TreeCache(self.client, self.REQUEST_ROOT)
self._request_cache.listen_fault(self.cacheFaultListener)
self._request_cache.listen(self.requestCacheListener)
self._request_cache.start()
COMPONENT_REGISTRY.create(self.zk_client)
COMPONENT_REGISTRY.create(self.client)
# =======================================================================
# Private Methods
# =======================================================================
def _onConnect(self):
if self.enable_cache:
self._node_cache = TreeCache(self.kazoo_client, self.NODE_ROOT)
self._node_cache.listen_fault(self.cacheFaultListener)
self._node_cache.listen(self.nodeCacheListener)
self._node_cache.start()
self._request_cache = TreeCache(self.kazoo_client,
self.REQUEST_ROOT)
self._request_cache.listen_fault(self.cacheFaultListener)
self._request_cache.listen(self.requestCacheListener)
self._request_cache.start()
def _onDisconnect(self):
if self._node_cache is not None:
self._node_cache.close()
self._node_cache = None
if self._request_cache is not None:
self._request_cache.close()
self._request_cache = None
def _electionPath(self, election):
return "%s/%s" % (self.ELECTION_ROOT, election)
@ -800,7 +811,7 @@ class ZooKeeper(object):
def _getImageBuildLock(self, image, blocking=True, timeout=None):
lock_path = self._imageBuildLockPath(image)
try:
lock = Lock(self.client, lock_path)
lock = Lock(self.kazoo_client, lock_path)
have_lock = lock.acquire(blocking, timeout)
except kze.LockTimeout:
raise npe.TimeoutException(
@ -820,7 +831,7 @@ class ZooKeeper(object):
blocking=True, timeout=None):
lock_path = self._imageBuildNumberLockPath(image, build_number)
try:
lock = Lock(self.client, lock_path)
lock = Lock(self.kazoo_client, lock_path)
have_lock = lock.acquire(blocking, timeout)
except kze.LockTimeout:
raise npe.TimeoutException(
@ -842,7 +853,7 @@ class ZooKeeper(object):
lock_path = self._imageUploadNumberLockPath(image, build_number,
provider, upload_number)
try:
lock = Lock(self.client, lock_path)
lock = Lock(self.kazoo_client, lock_path)
have_lock = lock.acquire(blocking, timeout)
except kze.LockTimeout:
raise npe.TimeoutException(
@ -864,7 +875,7 @@ class ZooKeeper(object):
blocking=True, timeout=None):
lock_path = self._imageUploadLockPath(image, build_number, provider)
try:
lock = Lock(self.client, lock_path)
lock = Lock(self.kazoo_client, lock_path)
have_lock = lock.acquire(blocking, timeout)
except kze.LockTimeout:
raise npe.TimeoutException(
@ -881,20 +892,6 @@ class ZooKeeper(object):
return lock
def _connection_listener(self, state):
'''
Listener method for Kazoo connection state changes.
.. warning:: This method must not block.
'''
if state == KazooState.LOST:
self.log.debug("ZooKeeper connection: LOST")
self._became_lost = True
elif state == KazooState.SUSPENDED:
self.log.debug("ZooKeeper connection: SUSPENDED")
else:
self.log.debug("ZooKeeper connection: CONNECTED")
def logConnectionRetryEvent(self):
'''
Kazoo retry callback
@ -910,28 +907,15 @@ class ZooKeeper(object):
@property
def connected(self):
if self.client is None:
return False
return self.client.state == KazooState.CONNECTED
return self.client.connected
@property
def suspended(self):
if self.client is None:
return True
return self.client.state == KazooState.SUSPENDED
return self.client.suspended
@property
def lost(self):
if self.client is None:
return True
return self.client.state == KazooState.LOST
@property
def didLoseConnection(self):
return self._became_lost
def resetLostFlag(self):
self._became_lost = False
return self.client.lost
def disconnect(self):
'''
@ -941,18 +925,7 @@ class ZooKeeper(object):
cluster connection.
'''
if self._node_cache is not None:
self._node_cache.close()
self._node_cache = None
if self._request_cache is not None:
self._request_cache.close()
self._request_cache = None
if self.client is not None and self.client.connected:
self.client.stop()
self.client.close()
self.client = None
self.client.disconnect()
def resetHosts(self, hosts):
'''
@ -960,8 +933,7 @@ class ZooKeeper(object):
:param str host_list: A ZK host list
'''
if self.client is not None:
self.client.set_hosts(hosts=hosts)
self.client.resetHosts(hosts)
@contextmanager
def imageBuildLock(self, image, blocking=True, timeout=None):
@ -1086,7 +1058,7 @@ class ZooKeeper(object):
path = self.IMAGE_ROOT
try:
images = self.client.get_children(path)
images = self.kazoo_client.get_children(path)
except kze.NoNodeError:
return []
return sorted(images)
@ -1100,7 +1072,7 @@ class ZooKeeper(object):
path = self._imagePausePath(image)
try:
data, stat = self.client.get(path)
data, stat = self.kazoo_client.get(path)
except kze.NoNodeError:
return False
return True
@ -1113,12 +1085,12 @@ class ZooKeeper(object):
if paused:
try:
self.client.create(path)
self.kazoo_client.create(path)
except kze.NodeExistsError:
pass
else:
try:
self.client.delete(path)
self.kazoo_client.delete(path)
except kze.NoNodeError:
pass
@ -1133,7 +1105,7 @@ class ZooKeeper(object):
path = self._imageBuildsPath(image)
try:
builds = self.client.get_children(path)
builds = self.kazoo_client.get_children(path)
except kze.NoNodeError:
return []
builds = [x for x in builds if x != 'lock']
@ -1152,7 +1124,7 @@ class ZooKeeper(object):
path = self._imageProviderPath(image, build_number)
try:
providers = self.client.get_children(path)
providers = self.kazoo_client.get_children(path)
except kze.NoNodeError:
return []
@ -1172,7 +1144,7 @@ class ZooKeeper(object):
path = self._imageUploadPath(image, build_number, provider)
try:
uploads = self.client.get_children(path)
uploads = self.kazoo_client.get_children(path)
except kze.NoNodeError:
return []
@ -1191,7 +1163,7 @@ class ZooKeeper(object):
path = self._imageBuildsPath(image) + "/%s" % build_number
try:
data, stat = self.client.get(path)
data, stat = self.kazoo_client.get(path)
except kze.NoNodeError:
return None
@ -1218,7 +1190,7 @@ class ZooKeeper(object):
path = self._imageBuildsPath(image)
try:
builds = self.client.get_children(path)
builds = self.kazoo_client.get_children(path)
except kze.NoNodeError:
return []
@ -1285,7 +1257,7 @@ class ZooKeeper(object):
build_path = self._imageBuildsPath(image) + "/"
if build_number is None:
path = self.client.create(
path = self.kazoo_client.create(
build_path,
value=build_data.serialize(),
sequence=True,
@ -1293,7 +1265,7 @@ class ZooKeeper(object):
build_number = path.split("/")[-1]
else:
path = build_path + build_number
self.client.set(path, build_data.serialize())
self.kazoo_client.set(path, build_data.serialize())
return build_number
@ -1314,7 +1286,7 @@ class ZooKeeper(object):
path = path + "/%s" % upload_number
try:
data, stat = self.client.get(path)
data, stat = self.kazoo_client.get(path)
except kze.NoNodeError:
return None
@ -1347,7 +1319,7 @@ class ZooKeeper(object):
path = self._imageUploadPath(image, build_number, provider)
try:
uploads = self.client.get_children(path)
uploads = self.kazoo_client.get_children(path)
except kze.NoNodeError:
return []
@ -1411,7 +1383,7 @@ class ZooKeeper(object):
path = self._imageUploadPath(image, build_number, provider)
try:
uploads = self.client.get_children(path)
uploads = self.kazoo_client.get_children(path)
except kze.NoNodeError:
uploads = []
@ -1454,7 +1426,7 @@ class ZooKeeper(object):
'''
# We expect the image builds path to already exist.
build_path = self._imageBuildsPath(image)
if not self.client.exists(build_path):
if not self.kazoo_client.exists(build_path):
raise npe.ZKException(
"Cannot find build %s of image %s" % (build_number, image)
)
@ -1466,7 +1438,7 @@ class ZooKeeper(object):
image, build_number, provider) + "/"
if upload_number is None:
path = self.client.create(
path = self.kazoo_client.create(
upload_path,
value=image_data.serialize(),
sequence=True,
@ -1474,7 +1446,7 @@ class ZooKeeper(object):
upload_number = path.split("/")[-1]
else:
path = upload_path + upload_number
self.client.set(path, image_data.serialize())
self.kazoo_client.set(path, image_data.serialize())
return upload_number
@ -1487,7 +1459,7 @@ class ZooKeeper(object):
:returns: True if request is pending, False otherwise
'''
path = self._imageBuildRequestPath(image)
if self.client.exists(path) is not None:
if self.kazoo_client.exists(path) is not None:
return True
return False
@ -1498,7 +1470,7 @@ class ZooKeeper(object):
latest_build, *_ = builds
builds_path = self._imageBuildNumberPath(image, latest_build)
return self.client.exists(builds_path)
return self.kazoo_client.exists(builds_path)
def getBuildRequest(self, image):
"""Get a build request for the given image.
@ -1509,13 +1481,13 @@ class ZooKeeper(object):
"""
path = self._imageBuildRequestPath(image)
try:
_, stat = self.client.get(path)
_, stat = self.kazoo_client.get(path)
except kze.NoNodeError:
return
pending = True
lock_path = self._imageBuildLockPath(image)
lock_stat = self.client.exists(lock_path)
lock_stat = self.kazoo_client.exists(lock_path)
if lock_stat and lock_stat.children_count:
build_stat = self._latestImageBuildStat(image)
# If there is a lock, but no build we assume that the build
@ -1534,7 +1506,7 @@ class ZooKeeper(object):
:param str image: The image name.
'''
path = self._imageBuildRequestPath(image)
self.client.ensure_path(path)
self.kazoo_client.ensure_path(path)
def removeBuildRequest(self, image):
'''
@ -1544,7 +1516,7 @@ class ZooKeeper(object):
'''
path = self._imageBuildRequestPath(image)
try:
self.client.delete(path)
self.kazoo_client.delete(path)
except kze.NoNodeError:
pass
@ -1571,7 +1543,7 @@ class ZooKeeper(object):
try:
# NOTE: Need to do recursively to remove lock znodes
self.client.delete(path, recursive=True)
self.kazoo_client.delete(path, recursive=True)
except kze.NoNodeError:
pass
@ -1590,7 +1562,7 @@ class ZooKeeper(object):
path = path + "/%s" % upload_number
try:
# NOTE: Need to do recursively to remove lock znodes
self.client.delete(path, recursive=True)
self.kazoo_client.delete(path, recursive=True)
except kze.NoNodeError:
pass
@ -1610,7 +1582,7 @@ class ZooKeeper(object):
:returns: A list of request nodes.
'''
try:
requests = self.client.get_children(self.REQUEST_ROOT)
requests = self.kazoo_client.get_children(self.REQUEST_ROOT)
except kze.NoNodeError:
return []
@ -1621,7 +1593,7 @@ class ZooKeeper(object):
Get the current list of all node request lock ids.
'''
try:
lock_ids = self.client.get_children(self.REQUEST_LOCK_ROOT)
lock_ids = self.kazoo_client.get_children(self.REQUEST_LOCK_ROOT)
except kze.NoNodeError:
return []
return lock_ids
@ -1640,7 +1612,7 @@ class ZooKeeper(object):
'''
path = self._requestLockPath(lock_id)
try:
data, stat = self.client.get(path)
data, stat = self.kazoo_client.get(path)
except kze.NoNodeError:
return None
d = NodeRequestLockStats(lock_id)
@ -1655,7 +1627,7 @@ class ZooKeeper(object):
'''
path = self._requestLockPath(lock_id)
try:
self.client.delete(path, recursive=True)
self.kazoo_client.delete(path, recursive=True)
except kze.NoNodeError:
pass
@ -1680,7 +1652,7 @@ class ZooKeeper(object):
# call.
try:
path = self._requestPath(request)
data, stat = self.client.get(path)
data, stat = self.kazoo_client.get(path)
except kze.NoNodeError:
return None
@ -1696,7 +1668,7 @@ class ZooKeeper(object):
'''
path = self._requestPath(request.id)
data, stat = self.client.get(path)
data, stat = self.kazoo_client.get(path)
if data:
d = self._bytesToDict(data)
@ -1717,7 +1689,7 @@ class ZooKeeper(object):
if not request.event_id:
request.event_id = uuid.uuid4().hex
path = "%s/%s-" % (self.REQUEST_ROOT, priority)
path = self.client.create(
path = self.kazoo_client.create(
path,
value=request.serialize(),
ephemeral=True,
@ -1732,7 +1704,7 @@ class ZooKeeper(object):
"Attempt to update non-existing request %s" % request)
path = self._requestPath(request.id)
self.client.set(path, request.serialize())
self.kazoo_client.set(path, request.serialize())
def deleteNodeRequest(self, request):
'''
@ -1745,7 +1717,7 @@ class ZooKeeper(object):
path = self._requestPath(request.id)
try:
self.client.delete(path)
self.kazoo_client.delete(path)
except kze.NoNodeError:
pass
@ -1772,7 +1744,7 @@ class ZooKeeper(object):
node_request_id=request.id)
path = self._requestLockPath(request.id)
try:
lock = Lock(self.client, path)
lock = Lock(self.kazoo_client, path)
have_lock = lock.acquire(blocking, timeout)
except kze.LockTimeout:
raise npe.TimeoutException(
@ -1833,7 +1805,7 @@ class ZooKeeper(object):
'''
path = self._nodeLockPath(node.id)
try:
lock = Lock(self.client, path, identifier)
lock = Lock(self.kazoo_client, path, identifier)
have_lock = lock.acquire(blocking, timeout, ephemeral)
except kze.LockTimeout:
raise npe.TimeoutException(
@ -1875,7 +1847,7 @@ class ZooKeeper(object):
'''
path = self._nodeLockPath(node.id)
try:
self.client.delete(path, recursive=True)
self.kazoo_client.delete(path, recursive=True)
except kze.NoNodeError:
pass
@ -1884,7 +1856,7 @@ class ZooKeeper(object):
Return the contenders for a node lock.
'''
path = self._nodeLockPath(node.id)
lock = Lock(self.client, path)
lock = Lock(self.kazoo_client, path)
return lock.contenders()
def getNodes(self):
@ -1894,7 +1866,7 @@ class ZooKeeper(object):
:returns: A list of nodes.
'''
try:
return self.client.get_children(self.NODE_ROOT)
return self.kazoo_client.get_children(self.NODE_ROOT)
except kze.NoNodeError:
return []
@ -1919,7 +1891,7 @@ class ZooKeeper(object):
# call.
try:
path = self._nodePath(node)
data, stat = self.client.get(path)
data, stat = self.kazoo_client.get(path)
except kze.NoNodeError:
return None
@ -1939,7 +1911,7 @@ class ZooKeeper(object):
'''
path = self._nodePath(node.id)
data, stat = self.client.get(path)
data, stat = self.kazoo_client.get(path)
if data:
d = self._bytesToDict(data)
@ -1971,7 +1943,7 @@ class ZooKeeper(object):
else:
node.created_time = time.time()
path = self.client.create(
path = self.kazoo_client.create(
node_path,
value=node.serialize(),
sequence=True,
@ -1979,7 +1951,7 @@ class ZooKeeper(object):
node.id = path.split("/")[-1]
else:
path = self._nodePath(node.id)
self.client.set(path, node.serialize())
self.kazoo_client.set(path, node.serialize())
def watchNode(self, node, callback):
'''Watch an existing node for changes.
@ -2000,7 +1972,7 @@ class ZooKeeper(object):
return callback(node, deleted)
path = self._nodePath(node.id)
self.client.DataWatch(path, _callback_wrapper)
self.kazoo_client.DataWatch(path, _callback_wrapper)
def deleteRawNode(self, node_id):
'''
@ -2012,7 +1984,7 @@ class ZooKeeper(object):
'''
path = self._nodePath(node_id)
try:
self.client.delete(path, recursive=True)
self.kazoo_client.delete(path, recursive=True)
except kze.NoNodeError:
pass
@ -2031,7 +2003,7 @@ class ZooKeeper(object):
# anything so that we can detect a race condition where the
# lock is removed before the node deletion occurs.
node.state = DELETED
self.client.set(path, node.serialize())
self.kazoo_client.set(path, node.serialize())
self.deleteRawNode(node.id)
def getReadyNodesOfTypes(self, labels, cached=True):
@ -2248,7 +2220,7 @@ class ZooKeeper(object):
path = self._imageProviderPath(image, build)
path = "%s/%s" % (path, provider_name)
try:
self.client.delete(path, recursive=True)
self.kazoo_client.delete(path, recursive=True)
except kze.NoNodeError:
pass
@ -2389,7 +2361,7 @@ class ZooKeeper(object):
def getStatsElection(self, identifier):
path = self._electionPath('stats')
return Election(self.client, path, identifier)
return Election(self.kazoo_client, path, identifier)
def exportImageData(self):
'''
@ -2404,7 +2376,7 @@ class ZooKeeper(object):
for build_no in self.getBuildNumbers(image_name):
build_path = self._imageBuildsPath(image_name) + "/" + build_no
try:
build_data, stat = self.client.get(build_path)
build_data, stat = self.kazoo_client.get(build_path)
except kze.NoNodeError:
continue
ret[build_path] = build_data.decode('utf8')
@ -2416,7 +2388,8 @@ class ZooKeeper(object):
image_name, build_no, provider_name) + "/"
upload_path += upload_no
try:
upload_data, stat = self.client.get(upload_path)
upload_data, stat = self.kazoo_client.get(
upload_path)
except kze.NoNodeError:
continue
ret[upload_path] = upload_data.decode('utf8')
@ -2466,19 +2439,19 @@ class ZooKeeper(object):
highest_num[key] = max(highest_num.get(key, num), num)
for path, num in highest_num.items():
for x in range(num):
node = self.client.create(
node = self.kazoo_client.create(
path + '/', makepath=True, sequence=True)
# If this node isn't in our import data, go ahead and
# delete it.
if node not in import_data:
self.client.delete(node)
self.kazoo_client.delete(node)
for path, data in import_data.items():
# We may have already created a node above; in that
# case, just set the data on it.
if self.client.exists(path):
self.client.set(path,
if self.kazoo_client.exists(path):
self.kazoo_client.set(path,
value=data.encode('utf8'))
else:
self.client.create(path,
self.kazoo_client.create(path,
value=data.encode('utf8'),
makepath=True)

View File

@ -47,12 +47,12 @@ def join(a, b):
return a+'/'+b
def print_tree(node):
data, stat = zk.client.get(node)
data, stat = zk.kazoo_client.get(node)
print("Node: %s %s" % (node, stat))
if data:
print(data)
for child in zk.client.get_children(node):
for child in zk.kazoo_client.get_children(node):
print()
print_tree(join(node, child))