diff --git a/nodepool/exceptions.py b/nodepool/exceptions.py
index 9445b0146..93533923b 100644
--- a/nodepool/exceptions.py
+++ b/nodepool/exceptions.py
@@ -43,3 +43,9 @@ class ServerDeleteException(TimeoutException):
 
 class ImageCreateException(TimeoutException):
     statsd_key = 'error.imagetimeout'
+
+class ZKException(Exception):
+    '''Base class for errors raised by the ZooKeeper interface.'''
+
+class ZKLockException(ZKException):
+    '''Raised when a ZooKeeper lock is already held or cannot be obtained.'''
diff --git a/nodepool/tests/test_zk.py b/nodepool/tests/test_zk.py
new file mode 100644
index 000000000..8d6201a0e
--- /dev/null
+++ b/nodepool/tests/test_zk.py
@@ -0,0 +1,173 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import testtools
+
+from nodepool import exceptions as npe
+from nodepool import tests
+from nodepool import zk
+
+
+class TestZooKeeper(tests.ZKTestCase):
+
+    def setUp(self):
+        super(TestZooKeeper, self).setUp()
+        self.zk = zk.ZooKeeper(self.zkclient)
+
+    def test_buildZooKeeperHosts_single(self):
+        hosts = [
+            dict(host='127.0.0.1', port=2181, chroot='/test1')
+        ]
+        self.assertEqual('127.0.0.1:2181/test1',
+                         zk.buildZooKeeperHosts(hosts))
+
+    def test_buildZooKeeperHosts_multiple(self):
+        hosts = [
+            dict(host='127.0.0.1', port=2181, chroot='/test1'),
+            dict(host='127.0.0.2', port=2182, chroot='/test2')
+        ]
+        self.assertEqual('127.0.0.1:2181/test1,127.0.0.2:2182/test2',
+                         zk.buildZooKeeperHosts(hosts))
+
+    def test_getMaxBuildId(self):
+        test_root = self.zk._imageBuildsPath("ubuntu-trusty")
+        self.zk.client.create(test_root, makepath=True)
+        self.zk.client.create(test_root + "/1")
+        self.zk.client.create(test_root + "/10")
+        self.zk.client.create(test_root + "/3")
+        self.zk.client.create(test_root + "/22")
+        self.zk.client.create(test_root + "/lock")
+
+        self.assertEqual(22, self.zk.getMaxBuildId("ubuntu-trusty"))
+
+    def test_getMaxBuildId_not_found(self):
+        with testtools.ExpectedException(
+            npe.ZKException, "Image build path not found for .*"
+        ):
+            self.zk.getMaxBuildId("aaa")
+
+    def test_getMaxImageUploadId(self):
+        image = "ubuntu-trusty"
+        build_number = 1
+        provider = "rax"
+
+        test_root = self.zk._imageUploadPath(image, build_number, provider)
+        self.zk.client.create(test_root, makepath=True)
+        self.zk.client.create(test_root + "/1")
+        self.zk.client.create(test_root + "/10")
+        self.zk.client.create(test_root + "/3")
+        self.zk.client.create(test_root + "/22")
+
+        self.assertEqual(22, self.zk.getMaxImageUploadId(image,
+                                                         build_number,
+                                                         provider))
+
+    def test_getMaxImageUploadId_not_found(self):
+        with testtools.ExpectedException(
+            npe.ZKException, "Image upload path not found for .*"
+        ):
+            self.zk.getMaxImageUploadId("aaa", 1, "xyz")
+
+    def test_imageBuildLock(self):
+        test_root = self.zk._imageBuildsPath("ubuntu-trusty")
+        self.zk.client.create(test_root, makepath=True)
+        self.zk.client.create(test_root + "/10")
+
+        with self.zk.imageBuildLock("ubuntu-trusty", blocking=False) as e:
+            # Highest existing build is 10, so the new build must be 11
+            self.assertEqual(11, e)
+
+    def test_imageBuildLock_exception_nonblocking(self):
+        zk2 = zk.ZooKeeper()
+        zk2.connect([{'host': self.zookeeper_host,
+                      'port': self.zookeeper_port,
+                      'chroot': self.chroot_path}])
+        self.addCleanup(zk2.disconnect)  # close zk2 even if the test fails
+        with zk2.imageBuildLock("ubuntu-trusty", blocking=False):
+            with testtools.ExpectedException(npe.ZKLockException):
+                with self.zk.imageBuildLock("ubuntu-trusty", blocking=False):
+                    pass
+
+    def test_imageBuildLock_exception_blocking(self):
+        zk2 = zk.ZooKeeper()
+        zk2.connect([{'host': self.zookeeper_host,
+                      'port': self.zookeeper_port,
+                      'chroot': self.chroot_path}])
+        self.addCleanup(zk2.disconnect)  # close zk2 even if the test fails
+        with zk2.imageBuildLock("ubuntu-trusty", blocking=False):
+            with testtools.ExpectedException(npe.TimeoutException):
+                with self.zk.imageBuildLock("ubuntu-trusty",
+                                            blocking=True,
+                                            timeout=1):
+                    pass
+
+    def test_storeBuild_not_locked(self):
+        with testtools.ExpectedException(npe.ZKException):
+            self.zk.storeBuild("ubuntu-trusty", 123, "")
+
+    def test_store_and_get_build(self):
+        orig_data = dict(builder="host", filename="file", state="state")
+        with self.zk.imageBuildLock("ubuntu-trusty",
+                                    blocking=True,
+                                    timeout=1) as build_num:
+            self.zk.storeBuild("ubuntu-trusty", build_num, orig_data)
+
+        data = self.zk.getBuild("ubuntu-trusty", build_num)
+        self.assertEqual(orig_data, data)
+
+    def test_getBuild_not_found(self):
+        with testtools.ExpectedException(
+            npe.ZKException, "Cannot find build data .*"
+        ):
+            self.zk.getBuild("ubuntu-trusty", 0)
+
+    def test_getImageUpload_not_found(self):
+        image = "ubuntu-trusty"
+        build_number = 1
+        provider = "rax"
+        test_root = self.zk._imageUploadPath(image, build_number, provider)
+        self.zk.client.create(test_root, makepath=True)
+        self.zk.client.create(test_root + "/1")
+
+        with testtools.ExpectedException(
+            npe.ZKException, "Cannot find upload data .*"
+        ):
+            self.zk.getImageUpload(image, build_number, provider, 2)
+
+    def test_storeImageUpload_invalid_build(self):
+        image = "ubuntu-trusty"
+        build_number = 1
+        provider = "rax"
+        orig_data = dict(external_id="deadbeef", state="READY")
+
+        with testtools.ExpectedException(
+            npe.ZKException, "Cannot find build .*"
+        ):
+            self.zk.storeImageUpload(image, build_number, provider, orig_data)
+
+    def test_store_and_get_image_upload(self):
+        image = "ubuntu-trusty"
+        build_number = 1
+        provider = "rax"
+        orig_data = dict(external_id="deadbeef", state="READY")
+        test_root = self.zk._imageUploadPath(image, build_number, provider)
+        self.zk.client.create(test_root, makepath=True)
+
+        upload_id = self.zk.storeImageUpload(image, build_number, provider,
+                                             orig_data)
+
+        # Should be the first upload
+        self.assertEqual(1, upload_id)
+
+        data = self.zk.getImageUpload(image, build_number, provider, upload_id)
+        self.assertEqual(orig_data, data)
diff --git a/nodepool/zk.py b/nodepool/zk.py
new file mode 100644
index 000000000..e5c6eda80
--- /dev/null
+++ b/nodepool/zk.py
@@ -0,0 +1,403 @@
+#!/usr/bin/env python
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from contextlib import contextmanager
+import json
+from kazoo.client import KazooClient
+from kazoo import exceptions as kze
+from kazoo.recipe.lock import Lock
+
+from nodepool import exceptions as npe
+
+
+def buildZooKeeperHosts(host_list):
+    '''
+    Build the ZK cluster host list for client connections.
+
+    :param list host_list: A list of dicts (one per server) defining
+        the ZooKeeper cluster servers. Keys for 'host', 'port', and
+        'chroot' are expected. Only 'host' is required. E.g.::
+
+            [
+              dict(host='192.168.0.2'),
+              dict(host='192.168.0.3', port=2181, chroot='/junk')
+            ]
+    '''
+    if not isinstance(host_list, list):
+        raise Exception("'host_list' must be a list")
+    hosts = []
+    for host_def in host_list:
+        host = host_def['host']
+        if 'port' in host_def:
+            host = host + ":%s" % host_def['port']
+        else:
+            host = host + ":2181"
+        if 'chroot' in host_def:
+            host = host + host_def['chroot']
+        hosts.append(host)
+    return ",".join(hosts)
+
+
+class ZooKeeper(object):
+    '''
+    Class implementing the ZooKeeper interface.
+
+    This class uses the facade design pattern to keep common interaction
+    with the ZooKeeper API simple and consistent for the caller, and
+    limits coupling between objects. It allows for more complex interactions
+    by providing direct access to the client connection when needed (though
+    that is discouraged). It also provides for a convenient entry point for
+    testing only ZooKeeper interactions.
+
+    Most API calls reference an image name only, as the path for the znode
+    for that image is calculated automatically. And image names are assumed
+    to be unique.
+
+    If you will have multiple threads needing this API, each thread should
+    instantiate its own ZooKeeper object. It should not be shared.
+    '''
+
+    IMAGE_ROOT = "/nodepool/image"
+
+    def __init__(self, client=None):
+        '''
+        Initialize the ZooKeeper object.
+
+        :param client: A pre-connected client. Optionally, you may choose
+            to use the connect() call.
+        '''
+        self.client = client
+        self._current_lock = None
+
+    #========================================================================
+    # Private Methods
+    #========================================================================
+
+    def _imagePath(self, image):
+        return "%s/%s" % (self.IMAGE_ROOT, image)
+
+    def _imageBuildsPath(self, image):
+        return "%s/builds" % self._imagePath(image)
+
+    def _imageLockPath(self, image):
+        return "%s/lock" % self._imageBuildsPath(image)
+
+    def _imageUploadPath(self, image, build_number, provider):
+        return "%s/%s/provider/%s/images" % (self._imageBuildsPath(image),
+                                             build_number,
+                                             provider)
+    def _dictToStr(self, data):
+        return json.dumps(data).encode('utf8')  # Kazoo stores byte strings
+
+    def _strToDict(self, data):
+        return json.loads(data.decode('utf8'))  # znode data is bytes
+
+    def _getImageLock(self, image, blocking=True, timeout=None):
+        # If we don't already have a znode for this image, create it.
+        image_lock = self._imageLockPath(image)
+        try:
+            self.client.ensure_path(self._imagePath(image))
+            self._current_lock = Lock(self.client, image_lock)
+            have_lock = self._current_lock.acquire(blocking, timeout)
+        except kze.LockTimeout:
+            raise npe.TimeoutException(
+                "Timeout trying to acquire lock %s" % image_lock)
+
+        # If we aren't blocking, it's possible we didn't get the lock
+        # because someone else has it.
+        if not have_lock:
+            raise npe.ZKLockException("Did not get lock on %s" % image_lock)
+
+    def _getImageBuildLock(self, image, blocking=True, timeout=None):
+        '''
+        This differs from _getImageLock() in that it creates a new build
+        znode and returns its number to the caller.
+        '''
+        self._getImageLock(image, blocking, timeout)
+
+        # Create new znode with new build_number
+        build_number = self.getMaxBuildId(image) + 1
+        self.client.create(
+            self._imageBuildsPath(image) + "/%s" % build_number
+        )
+
+        return build_number
+
+    #========================================================================
+    # Public Methods
+    #========================================================================
+
+    def connect(self, host_list, read_only=False):
+        '''
+        Establish a connection with ZooKeeper cluster.
+
+        Convenience method if a pre-existing ZooKeeper connection is not
+        supplied to the ZooKeeper object at instantiation time.
+
+        :param list host_list: A list of dicts (one per server) defining
+            the ZooKeeper cluster servers.
+
+        :param bool read_only: If True, establishes a read-only connection.
+        '''
+        if not self.client:
+            hosts = buildZooKeeperHosts(host_list)
+            self.client = KazooClient(hosts=hosts, read_only=read_only)
+            self.client.start()
+
+    def disconnect(self):
+        '''
+        Close the ZooKeeper cluster connection.
+
+        You should call this method if you used connect() to establish a
+        cluster connection.
+        '''
+        if self.client:
+            self.client.stop()
+
+    def getMaxBuildId(self, image):
+        '''
+        Find the highest build number for a given image.
+
+        Image builds are integer znodes, which are children of the 'builds'
+        parent znode.
+
+        :param str image: The image name.
+
+        :returns: An int value for the max existing image build number, or
+            zero if none exist.
+
+        :raises: ZKException if the image build path is not found.
+        '''
+        path = self._imageBuildsPath(image)
+
+        if not self.client.exists(path):
+            raise npe.ZKException(
+                "Image build path not found for image %s" % image
+            )
+
+        max_found = 0
+        children = self.client.get_children(path)
+        if children:
+            for child in children:
+                # There can be a lock znode that we should ignore
+                if child != 'lock':
+                    max_found = max(max_found, int(child))
+        return max_found
+
+    def getMaxImageUploadId(self, image, build_number, provider):
+        '''
+        Find the highest image upload number for a given image for a provider.
+
+        For a given image build, it may have been uploaded one or more times
+        to a provider (with once being the most common case). Each upload is
+        given its own znode, which is an integer increased by one for each
+        upload. This method gets the highest numbered znode.
+
+        :param str image: The image name.
+        :param int build_number: The image build number.
+        :param str provider: The provider name owning the image.
+
+        :returns: An int value for the max existing image upload number, or
+            zero if none exist.
+
+        :raises: ZKException if the image upload path is not found.
+        '''
+        path = self._imageUploadPath(image, build_number, provider)
+
+        if not self.client.exists(path):
+            raise npe.ZKException(
+                "Image upload path not found for build %s of image %s" % (
+                    build_number, image)
+            )
+
+        max_found = 0
+        children = self.client.get_children(path)
+        if children:
+            max_found = max([int(child) for child in children])
+        return max_found
+
+    @contextmanager
+    def imageLock(self, image, blocking=True, timeout=None):
+        '''
+        Context manager to use for locking an image.
+
+        Obtains a write lock for the specified image. A thread of control
+        using this API may have only one image locked at a time. This is
+        different from imageBuildLock() in that a new build node is NOT
+        created and returned.
+
+        :param str image: Name of the image to lock
+        :param bool blocking: Whether or not to block on trying to
+            acquire the lock
+        :param int timeout: When blocking, how long to wait for the lock
+            to get acquired. None, the default, waits forever.
+
+        :raises: TimeoutException if we failed to acquire the lock when
+            blocking with a timeout. ZKLockException if we are not blocking
+            and could not get the lock, or a lock is already held.
+        '''
+        if self._current_lock:
+            raise npe.ZKLockException("A lock is already held.")
+
+        try:
+            yield self._getImageLock(image, blocking, timeout)
+        finally:
+            if self._current_lock:
+                self._current_lock.release()
+                self._current_lock = None
+
+    @contextmanager
+    def imageBuildLock(self, image, blocking=True, timeout=None):
+        '''
+        Context manager to use for locking new image builds.
+
+        Obtains a write lock for the specified image. A thread of control
+        using this API may have only one image locked at a time. A new
+        znode is created with the next highest build number. This build
+        number is returned to the caller.
+
+        :param str image: Name of the image to lock
+        :param bool blocking: Whether or not to block on trying to
+            acquire the lock
+        :param int timeout: When blocking, how long to wait for the lock
+            to get acquired. None, the default, waits forever.
+
+        :returns: An integer to use for the new build id.
+
+        :raises: TimeoutException if we failed to acquire the lock when
+            blocking with a timeout. ZKLockException if we are not blocking
+            and could not get the lock, or a lock is already held.
+        '''
+        if self._current_lock:
+            raise npe.ZKLockException("A lock is already held.")
+
+        try:
+            yield self._getImageBuildLock(image, blocking, timeout)
+        finally:
+            if self._current_lock:
+                self._current_lock.release()
+                self._current_lock = None
+
+    def getBuild(self, image, build_number):
+        '''
+        Retrieve the image build data.
+
+        :param str image: The image name.
+        :param int build_number: The image build number.
+
+        :returns: The dictionary of build data.
+        '''
+        path = self._imageBuildsPath(image) + "/%s" % build_number
+
+        if not self.client.exists(path):
+            raise npe.ZKException(
+                "Cannot find build data (image: %s, build: %s)" % (
+                    image, build_number)
+            )
+
+        data, stat = self.client.get(path)
+        return self._strToDict(data)
+
+    def storeBuild(self, image, build_number, build_data):
+        '''
+        Store the image build data.
+
+        The build data is either created if it does not exist, or it is
+        updated in its entirety if it does. There is no partial updating.
+        The build data is expected to be represented as a dict. This dict may
+        contain any data, as appropriate.
+
+        :param str image: The image name for which we have data.
+        :param int build_number: The image build number.
+        :param dict build_data: The build data.
+
+        :raises: ZKException if the build znode does not exist (it is created
+            with the imageBuildLock() context manager).
+        '''
+        path = self._imageBuildsPath(image) + "/%s" % build_number
+
+        # The build path won't exist until it's created with the build lock
+        if not self.client.exists(path):
+            raise npe.ZKException(
+                "%s does not exist. Did you lock it?" % path)
+
+        self.client.set(path, self._dictToStr(build_data))
+
+    def getImageUpload(self, image, build_number, provider,
+                       upload_number=None):
+        '''
+        Retrieve the image upload data.
+
+        :param str image: The image name.
+        :param int build_number: The image build number.
+        :param str provider: The provider name owning the image.
+        :param int upload_number: The image upload number. If this is None,
+            the most recent upload data is returned.
+
+        :returns: A dict of upload data.
+
+        :raises: ZKException if the image upload path is not found.
+        '''
+        if upload_number is None:
+            upload_number = self.getMaxImageUploadId(image, build_number,
+                                                     provider)
+
+        path = self._imageUploadPath(image, build_number, provider)
+        path = path + "/%s" % upload_number
+
+        if not self.client.exists(path):
+            raise npe.ZKException(
+                "Cannot find upload data "
+                "(image: %s, build: %s, provider: %s, upload: %s)" % (
+                    image, build_number, provider, upload_number)
+            )
+
+        data, stat = self.client.get(path)
+        return self._strToDict(data)
+
+    def storeImageUpload(self, image, build_number, provider, image_data):
+        '''
+        Store the built image's upload data for the given provider.
+
+        :param str image: The image name for which we have data.
+        :param int build_number: The image build number.
+        :param str provider: The provider name owning the image.
+        :param dict image_data: The image data we want to store.
+
+        :returns: An int for the new upload id.
+
+        :raises: ZKException for an invalid image build.
+        '''
+        # We expect the image builds path to already exist.
+        build_path = self._imageBuildsPath(image)
+        if not self.client.exists(build_path):
+            raise npe.ZKException(
+                "Cannot find build %s of image %s" % (build_number, image)
+            )
+
+        # Generate a path for the upload. This doesn't have to exist yet
+        # since we'll create new provider/upload ID znodes automatically.
+        path = self._imageUploadPath(image, build_number, provider)
+
+        # We need to create the provider upload path if it doesn't exist
+        # before we attempt to get the max image upload ID next.
+        self.client.ensure_path(path)
+
+        # Get a new upload ID
+        next_id = self.getMaxImageUploadId(image, build_number, provider) + 1
+
+        path = path + "/%s" % next_id
+        self.client.create(path, self._dictToStr(image_data))
+
+        return next_id