Add initial ZooKeeper API

This implements the API necessary to perform the ZooKeeper functionality
outlined in the "Nodepool: Use ZooKeeper for Workers" spec:

   http://specs.openstack.org/openstack-infra/infra-specs/specs/nodepool-zookeeper-workers.html

This API is not used yet, but will be used and modified where necessary
in upcoming reviews based on this work.

Change-Id: I681722a1f2dc3fe13efa2baa3a1a7acd1cbe50ee
This commit is contained in:
David Shrewsbury 2016-07-11 23:12:16 -04:00
parent 0cf4484eea
commit 8cbe6bb4ca
3 changed files with 582 additions and 0 deletions

View File

@ -43,3 +43,9 @@ class ServerDeleteException(TimeoutException):
class ImageCreateException(TimeoutException):
statsd_key = 'error.imagetimeout'
class ZKException(Exception):
pass
class ZKLockException(ZKException):
pass

173
nodepool/tests/test_zk.py Normal file
View File

@ -0,0 +1,173 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import testtools
from nodepool import exceptions as npe
from nodepool import tests
from nodepool import zk
class TestZooKeeper(tests.ZKTestCase):
def setUp(self):
super(TestZooKeeper, self).setUp()
self.zk = zk.ZooKeeper(self.zkclient)
def test_buildZooKeeperHosts_single(self):
hosts = [
dict(host='127.0.0.1', port=2181, chroot='/test1')
]
self.assertEqual('127.0.0.1:2181/test1',
zk.buildZooKeeperHosts(hosts))
def test_buildZooKeeperHosts_multiple(self):
hosts = [
dict(host='127.0.0.1', port=2181, chroot='/test1'),
dict(host='127.0.0.2', port=2182, chroot='/test2')
]
self.assertEqual('127.0.0.1:2181/test1,127.0.0.2:2182/test2',
zk.buildZooKeeperHosts(hosts))
def test_getMaxBuildId(self):
test_root = self.zk._imageBuildsPath("ubuntu-trusty")
self.zk.client.create(test_root, makepath=True)
self.zk.client.create(test_root + "/1")
self.zk.client.create(test_root + "/10")
self.zk.client.create(test_root + "/3")
self.zk.client.create(test_root + "/22")
self.zk.client.create(test_root + "/lock")
self.assertEqual(22, self.zk.getMaxBuildId("ubuntu-trusty"))
def test_getMaxBuildId_not_found(self):
with testtools.ExpectedException(
npe.ZKException, "Image build path not found for .*"
):
self.zk.getMaxBuildId("aaa")
def test_getMaxImageUploadId(self):
image = "ubuntu-trusty"
build_number = 1
provider = "rax"
test_root = self.zk._imageUploadPath(image, build_number, provider)
self.zk.client.create(test_root, makepath=True)
self.zk.client.create(test_root + "/1")
self.zk.client.create(test_root + "/10")
self.zk.client.create(test_root + "/3")
self.zk.client.create(test_root + "/22")
self.assertEqual(22, self.zk.getMaxImageUploadId(image,
build_number,
provider))
def test_getMaxImageUploadId_not_found(self):
with testtools.ExpectedException(
npe.ZKException, "Image upload path not found for .*"
):
self.zk.getMaxImageUploadId("aaa", 1, "xyz")
def test_imageBuildLock(self):
test_root = self.zk._imageBuildsPath("ubuntu-trusty")
self.zk.client.create(test_root, makepath=True)
self.zk.client.create(test_root + "/10")
with self.zk.imageBuildLock("ubuntu-trusty", blocking=False) as e:
# Make sure the volume goes to 11
self.assertEqual(11, e)
def test_imageBuildLock_exception_nonblocking(self):
zk2 = zk.ZooKeeper()
zk2.connect([{'host': self.zookeeper_host,
'port': self.zookeeper_port,
'chroot': self.chroot_path}])
with zk2.imageBuildLock("ubuntu-trusty", blocking=False):
with testtools.ExpectedException(npe.ZKLockException):
with self.zk.imageBuildLock("ubuntu-trusty", blocking=False):
pass
zk2.disconnect()
def test_imageBuildLock_exception_blocking(self):
zk2 = zk.ZooKeeper()
zk2.connect([{'host': self.zookeeper_host,
'port': self.zookeeper_port,
'chroot': self.chroot_path}])
with zk2.imageBuildLock("ubuntu-trusty", blocking=False):
with testtools.ExpectedException(npe.TimeoutException):
with self.zk.imageBuildLock("ubuntu-trusty",
blocking=True,
timeout=1):
pass
zk2.disconnect()
def test_storeBuild_not_locked(self):
with testtools.ExpectedException(npe.ZKException):
self.zk.storeBuild("ubuntu-trusty", 123, "")
def test_store_and_get_build(self):
orig_data = dict(builder="host", filename="file", state="state")
with self.zk.imageBuildLock("ubuntu-trusty",
blocking=True,
timeout=1) as build_num:
self.zk.storeBuild("ubuntu-trusty", build_num, orig_data)
data = self.zk.getBuild("ubuntu-trusty", build_num)
self.assertEqual(orig_data, data)
def test_getBuild_not_found(self):
with testtools.ExpectedException(
npe.ZKException, "Cannot find build data .*"
):
self.zk.getBuild("ubuntu-trusty", 0)
def test_getImageUpload_not_found(self):
image = "ubuntu-trusty"
build_number = 1
provider = "rax"
test_root = self.zk._imageUploadPath(image, build_number, provider)
self.zk.client.create(test_root, makepath=True)
self.zk.client.create(test_root + "/1")
with testtools.ExpectedException(
npe.ZKException, "Cannot find upload data .*"
):
self.zk.getImageUpload(image, build_number, provider, 2)
def test_storeImageUpload_invalid_build(self):
image = "ubuntu-trusty"
build_number = 1
provider = "rax"
orig_data = dict(external_id="deadbeef", state="READY")
with testtools.ExpectedException(
npe.ZKException, "Cannot find build .*"
):
self.zk.storeImageUpload(image, build_number, provider, orig_data)
def test_store_and_get_image_upload(self):
image = "ubuntu-trusty"
build_number = 1
provider = "rax"
orig_data = dict(external_id="deadbeef", state="READY")
test_root = self.zk._imageUploadPath(image, build_number, provider)
self.zk.client.create(test_root, makepath=True)
upload_id = self.zk.storeImageUpload(image, build_number, provider,
orig_data)
# Should be the first upload
self.assertEqual(1, upload_id)
data = self.zk.getImageUpload(image, build_number, provider, upload_id)
self.assertEqual(orig_data, data)

403
nodepool/zk.py Normal file
View File

@ -0,0 +1,403 @@
#!/usr/bin/env python
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from contextlib import contextmanager
import json
from kazoo.client import KazooClient
from kazoo import exceptions as kze
from kazoo.recipe.lock import Lock
from nodepool import exceptions as npe
def buildZooKeeperHosts(host_list):
'''
Build the ZK cluster host list for client connections.
:param list host_list: A list of dicts (one per server) defining
the ZooKeeper cluster servers. Keys for 'host', 'port', and
'chroot' are expected. Only 'host' is required.'. E.g.::
[
dict(host='192.168.0.2'),
dict(host='192.168.0.3', port=2181, chroot='/junk')
]
'''
if not isinstance(host_list, list):
raise Exception("'host_list' must be a list")
hosts = []
for host_def in host_list:
host = host_def['host']
if 'port' in host_def:
host = host + ":%s" % host_def['port']
else:
host = host + ":2181"
if 'chroot' in host_def:
host = host + host_def['chroot']
hosts.append(host)
return ",".join(hosts)
class ZooKeeper(object):
'''
Class implementing the ZooKeeper interface.
This class uses the facade design pattern to keep common interaction
with the ZooKeeper API simple and consistent for the caller, and
limits coupling between objects. It allows for more complex interactions
by providing direct access to the client connection when needed (though
that is discouraged). It also provides for a convenient entry point for
testing only ZooKeeper interactions.
Most API calls reference an image name only, as the path for the znode
for that image is calculated automatically. And image names are assumed
to be unique.
If you will have multiple threads needing this API, each thread should
instantiate their own ZooKeeper object. It should not be shared.
'''
IMAGE_ROOT = "/nodepool/image"
def __init__(self, client=None):
'''
Initialize the ZooKeeper object.
:param client: A pre-connected client. Optionally, you may choose
to use the connect() call.
'''
self.client = client
self._current_lock = None
#========================================================================
# Private Methods
#========================================================================
def _imagePath(self, image):
return "%s/%s" % (self.IMAGE_ROOT, image)
def _imageBuildsPath(self, image):
return "%s/builds" % self._imagePath(image)
def _imageLockPath(self, image):
return "%s/lock" % self._imageBuildsPath(image)
def _imageUploadPath(self, image, build_number, provider):
return "%s/%s/provider/%s/images" % (self._imageBuildsPath(image),
build_number,
provider)
def _dictToStr(self, data):
return json.dumps(data)
def _strToDict(self, data):
return json.loads(data)
def _getImageLock(self, image, blocking=True, timeout=None):
# If we don't already have a znode for this image, create it.
image_lock = self._imageLockPath(image)
try:
self.client.ensure_path(self._imagePath(image))
self._current_lock = Lock(self.client, image_lock)
have_lock = self._current_lock.acquire(blocking, timeout)
except kze.LockTimeout:
raise npe.TimeoutException(
"Timeout trying to acquire lock %s" % image_lock)
# If we aren't blocking, it's possible we didn't get the lock
# because someone else has it.
if not have_lock:
raise npe.ZKLockException("Did not get lock on %s" % image_lock)
def _getImageBuildLock(self, image, blocking=True, timeout=None):
'''
This differs from _get_image_lock() in that it creates a new build
znode and returns its name to the caller.
'''
self._getImageLock(image, blocking, timeout)
# Create new znode with new build_number
build_number = self.getMaxBuildId(image) + 1
self.client.create(
self._imageBuildsPath(image) + "/%s" % build_number
)
return build_number
#========================================================================
# Public Methods
#========================================================================
def connect(self, host_list, read_only=False):
'''
Establish a connection with ZooKeeper cluster.
Convenience method if a pre-existing ZooKeeper connection is not
supplied to the ZooKeeper object at instantiation time.
:param list host_list: A list of dicts (one per server) defining
the ZooKeeper cluster servers.
:param bool read_only: If True, establishes a read-only connection.
'''
if not self.client:
hosts = buildZooKeeperHosts(host_list)
self.client = KazooClient(hosts=hosts, read_only=read_only)
self.client.start()
def disconnect(self):
'''
Close the ZooKeeper cluster connection.
You should call this method if you used connect() to establish a
cluster connection.
'''
if self.client:
self.client.stop()
def getMaxBuildId(self, image):
'''
Find the highest build number for a given image.
Image builds are integer znodes, which are children of the 'builds'
parent znode.
:param str image: The image name.
:returns: An int value for the max existing image build number, or
zero if none exist.
:raises: ZKException if the image build path is not found.
'''
path = self._imageBuildsPath(image)
if not self.client.exists(path):
raise npe.ZKException(
"Image build path not found for image %s" % image
)
max_found = 0
children = self.client.get_children(path)
if children:
for child in children:
# There can be a lock znode that we should ignore
if child != 'lock':
max_found = max(max_found, int(child))
return max_found
def getMaxImageUploadId(self, image, build_number, provider):
'''
Find the highest image upload number for a given image for a provider.
For a given image build, it may have been uploaded one or more times
to a provider (with once being the most common case). Each upload is
given its own znode, which is a integer increased by one for each
upload. This method gets the highest numbered znode.
:param str image: The image name.
:param int build_number: The image build number.
:param str provider: The provider name owning the image.
:returns: An int value for the max existing image upload number, or
zero if none exist.
:raises: ZKException if the image upload path is not found.
'''
path = self._imageUploadPath(image, build_number, provider)
if not self.client.exists(path):
raise npe.ZKException(
"Image upload path not found for build %s of image %s" % (
build_number, provider)
)
max_found = 0
children = self.client.get_children(path )
if children:
max_found = max([int(child) for child in children])
return max_found
@contextmanager
def imageLock(self, image, blocking=True, timeout=None):
'''
Context manager to use for locking an image.
Obtains a write lock for the specified image. A thread of control
using this API may have only one image locked at a time. This is
different from image_build_lock() in that a new build node is NOT
created and returned.
:param str image: Name of the image to lock
:param bool blocking: Whether or not to block on trying to
acquire the lock
:param int timeout: When blocking, how long to wait for the lock
to get acquired. None, the default, waits forever.
:raises: TimeoutException if we failed to acquire the lock when
blocking with a timeout. ZKLockException if we are not blocking
and could not get the lock, or a lock is already held.
'''
if self._current_lock:
raise npe.ZKLockException("A lock is already held.")
try:
yield self._getImageLock(image, blocking, timeout)
finally:
if self._current_lock:
self._current_lock.release()
self._current_lock = None
@contextmanager
def imageBuildLock(self, image, blocking=True, timeout=None):
'''
Context manager to use for locking new image builds.
Obtains a write lock for the specified image. A thread of control
using this API may have only one image locked at a time. A new
znode is created with the next highest build number. This build
number is returned to the caller.
:param str image: Name of the image to lock
:param bool blocking: Whether or not to block on trying to
acquire the lock
:param int timeout: When blocking, how long to wait for the lock
to get acquired. None, the default, waits forever.
:returns: A integer to use for the new build id.
:raises: TimeoutException if we failed to acquire the lock when
blocking with a timeout. ZKLockException if we are not blocking
and could not get the lock, or a lock is already held.
'''
if self._current_lock:
raise npe.ZKLockException("A lock is already held.")
try:
yield self._getImageBuildLock(image, blocking, timeout)
finally:
if self._current_lock:
self._current_lock.release()
self._current_lock = None
def getBuild(self, image, build_number):
'''
Retrieve the image build data.
:param str image: The image name.
:param int build_number: The image build number.
:returns: The dictionary of build data.
'''
path = self._imageBuildsPath(image) + "/%s" % build_number
if not self.client.exists(path):
raise npe.ZKException(
"Cannot find build data (image: %s, build: %s)" % (
image, build_number)
)
data, stat = self.client.get(path)
return self._strToDict(data)
def storeBuild(self, image, build_number, build_data):
'''
Store the image build data.
The build data is either created if it does not exist, or it is
updated in its entirety if it does not. There is no partial updating.
The build data is expected to be represented as a dict. This dict may
contain any data, as appropriate.
:param str image: The image name for which we have data.
:param int build_number: The image build number.
:param dict build_data: The build data.
:raises: ZKException if the build znode does not exist (it is created
with the image_build_lock() context manager).
'''
path = self._imageBuildsPath(image) + "/%s" % build_number
# The build path won't exist until it's created with the build lock
if not self.client.exists(path):
raise npe.ZKException(
"%s does not exist. Did you lock it?" % path)
self.client.set(path, self._dictToStr(build_data))
def getImageUpload(self, image, build_number, provider,
upload_number=None):
'''
Retrieve the image upload data.
:param str image: The image name.
:param int build_number: The image build number.
:param str provider: The provider name owning the image.
:param int build_number: The image upload number. If this is None,
the most recent upload data is returned.
:returns: A dict of upload data.
:raises: ZKException if the image upload path is not found.
'''
if upload_number is None:
upload_number = self.getMaxImageUploadId(image, build_number,
provider)
path = self._imageUploadPath(image, build_number, provider)
path = path + "/%s" % upload_number
if not self.client.exists(path):
raise npe.ZKException(
"Cannot find upload data "
"(image: %s, build: %s, provider: %s, upload: %s)" % (
image, build_number, provider, upload_number)
)
data, stat = self.client.get(path)
return self._strToDict(data)
def storeImageUpload(self, image, build_number, provider, image_data):
'''
Store the built image's upload data for the given provider.
:param str image: The image name for which we have data.
:param int build_number: The image build number.
:param str provider: The provider name owning the image.
:param dict image_data: The image data we want to store.
:returns: An int for the new upload id.
:raises: ZKException for an invalid image build.
'''
# We expect the image builds path to already exist.
build_path = self._imageBuildsPath(image)
if not self.client.exists(build_path):
raise npe.ZKException(
"Cannot find build %s of image %s" % (build_number, provider)
)
# Generate a path for the upload. This doesn't have to exist yet
# since we'll create new provider/upload ID znodes automatically.
path = self._imageUploadPath(image, build_number, provider)
# We need to create the provider upload path if it doesn't exist
# before we attempt to get the max image upload ID next.
self.client.ensure_path(path)
# Get a new upload ID
next_id = self.getMaxImageUploadId(image, build_number, provider) + 1
path = path + "/%s" % next_id
self.client.create(path, self._dictToStr(image_data))
return next_id