From 4d9ce3d845c4d39667a6b6cd00dfb7599171152f Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Wed, 5 Feb 2014 00:12:43 -0800 Subject: [PATCH] Move some common/to be shared kazoo utils to kazoo_utils Stopping and closing in a way that will avoid the current kazoo bug will be a common utility function so move it to kazoo_utils. The other common utility function will be version checking so move that as well to kazoo_utils (and improve it to allow for checking against a max version). Change-Id: I81dfd9c01a00d7491b973d65d3b38f61b29dd298 --- taskflow/exceptions.py | 5 +++ .../persistence/backends/impl_zookeeper.py | 19 ++------- taskflow/utils/kazoo_utils.py | 42 +++++++++++++++++++ 3 files changed, 50 insertions(+), 16 deletions(-) diff --git a/taskflow/exceptions.py b/taskflow/exceptions.py index 1fa17675..2f106ef3 100644 --- a/taskflow/exceptions.py +++ b/taskflow/exceptions.py @@ -24,6 +24,11 @@ class TaskFlowException(Exception): pass +class IncompatibleVersion(TaskFlowException): + """Raised when some type of version incompatibility occurs.""" + pass + + class ConnectionFailure(TaskFlowException): """Raised when some type of connection can not be opened or is lost.""" pass diff --git a/taskflow/persistence/backends/impl_zookeeper.py b/taskflow/persistence/backends/impl_zookeeper.py index 3ede5938..7f8dea2a 100644 --- a/taskflow/persistence/backends/impl_zookeeper.py +++ b/taskflow/persistence/backends/impl_zookeeper.py @@ -78,16 +78,9 @@ class ZkBackend(base.Backend): if not self._owned: return try: - self._client.stop() + k_utils.finalize_client(self._client) except (k_exc.KazooException, k_exc.ZookeeperError) as e: - raise exc.StorageError("Unable to stop client: %s" % e) - try: - self._client.close() - except TypeError: - # NOTE(harlowja): https://github.com/python-zk/kazoo/issues/167 - pass - except (k_exc.KazooException, k_exc.ZookeeperError) as e: - raise exc.StorageError("Unable to close client: %s" % e) + raise exc.StorageError("Unable to finalize client", e) class ZkConnection(base.Connection): @@ -103,13 +96,7 @@ class ZkConnection(base.Connection): def validate(self): with self._exc_wrapper(): - zk_ver = self._client.server_version() - if tuple(zk_ver) < MIN_ZK_VERSION: - given_zk_ver = ".".join([str(a) for a in zk_ver]) - desired_zk_ver = ".".join([str(a) for a in MIN_ZK_VERSION]) - raise exc.StorageError("Incompatible zookeeper version" - " %s detected, zookeeper >= %s required" - % (given_zk_ver, desired_zk_ver)) + k_utils.check_compatible(self._client, MIN_ZK_VERSION) @property def backend(self): diff --git a/taskflow/utils/kazoo_utils.py b/taskflow/utils/kazoo_utils.py index 0bd91402..afc4921e 100644 --- a/taskflow/utils/kazoo_utils.py +++ b/taskflow/utils/kazoo_utils.py @@ -19,6 +19,8 @@ from kazoo import client import six +from taskflow import exceptions as exc + def _parse_hosts(hosts): if isinstance(hosts, six.string_types): @@ -33,6 +35,46 @@ def _parse_hosts(hosts): return hosts +def finalize_client(client): + """Stops and closes a client, even if it wasn't started.""" + client.stop() + try: + client.close() + except TypeError: + # NOTE(harlowja): https://github.com/python-zk/kazoo/issues/167 + # + # This can be removed after that one is fixed/merged. + pass + + +def check_compatible(client, min_version=None, max_version=None): + """Checks if a kazook client is backed by a zookeeper server version + that satisfies a given min (inclusive) and max (inclusive) version range. + """ + server_version = None + if min_version: + server_version = tuple((int(a) for a in client.server_version())) + min_version = tuple((int(a) for a in min_version)) + if server_version < min_version: + pretty_server_version = ".".join([str(a) for a in server_version]) + min_version = ".".join([str(a) for a in min_version]) + raise exc.IncompatibleVersion("Incompatible zookeeper version" + " %s detected, zookeeper >= %s" + " required" % (pretty_server_version, + min_version)) + if max_version: + if server_version is None: + server_version = tuple((int(a) for a in client.server_version())) + max_version = tuple((int(a) for a in max_version)) + if server_version > max_version: + pretty_server_version = ".".join([str(a) for a in server_version]) + max_version = ".".join([str(a) for a in max_version]) + raise exc.IncompatibleVersion("Incompatible zookeeper version" + " %s detected, zookeeper <= %s" + " required" % (pretty_server_version, + max_version)) + + def make_client(conf): """Creates a kazoo client given a configuration dictionary.""" client_kwargs = {