diff --git a/taskflow/exceptions.py b/taskflow/exceptions.py index 23759869..a6a05082 100644 --- a/taskflow/exceptions.py +++ b/taskflow/exceptions.py @@ -22,6 +22,11 @@ class TaskFlowException(Exception): pass +class IncompatibleVersion(TaskFlowException): + """Raised when some type of version incompatibility occurs.""" + pass + + class ConnectionFailure(TaskFlowException): """Raised when some type of connection can not be opened or is lost.""" pass diff --git a/taskflow/persistence/backends/impl_zookeeper.py b/taskflow/persistence/backends/impl_zookeeper.py index 56a4ad5a..382bbdff 100644 --- a/taskflow/persistence/backends/impl_zookeeper.py +++ b/taskflow/persistence/backends/impl_zookeeper.py @@ -76,16 +76,9 @@ class ZkBackend(base.Backend): if not self._owned: return try: - self._client.stop() + k_utils.finalize_client(self._client) except (k_exc.KazooException, k_exc.ZookeeperError) as e: - raise exc.StorageError("Unable to stop client: %s" % e) - try: - self._client.close() - except TypeError: - # NOTE(harlowja): https://github.com/python-zk/kazoo/issues/167 - pass - except (k_exc.KazooException, k_exc.ZookeeperError) as e: - raise exc.StorageError("Unable to close client: %s" % e) + raise exc.StorageError("Unable to finalize client", e) class ZkConnection(base.Connection): @@ -101,13 +94,7 @@ class ZkConnection(base.Connection): def validate(self): with self._exc_wrapper(): - zk_ver = self._client.server_version() - if tuple(zk_ver) < MIN_ZK_VERSION: - given_zk_ver = ".".join([str(a) for a in zk_ver]) - desired_zk_ver = ".".join([str(a) for a in MIN_ZK_VERSION]) - raise exc.StorageError("Incompatible zookeeper version" - " %s detected, zookeeper >= %s required" - % (given_zk_ver, desired_zk_ver)) + k_utils.check_compatible(self._client, MIN_ZK_VERSION) @property def backend(self): diff --git a/taskflow/utils/kazoo_utils.py b/taskflow/utils/kazoo_utils.py index bf30b5d7..ddf6bf62 100644 --- a/taskflow/utils/kazoo_utils.py +++ b/taskflow/utils/kazoo_utils.py @@ -17,6 +17,8 @@ from kazoo import client import six +from taskflow import exceptions as exc + def _parse_hosts(hosts): if isinstance(hosts, six.string_types): @@ -31,6 +33,46 @@ def _parse_hosts(hosts): return hosts +def finalize_client(client): + """Stops and closes a client, even if it wasn't started.""" + client.stop() + try: + client.close() + except TypeError: + # NOTE(harlowja): https://github.com/python-zk/kazoo/issues/167 + # + # This can be removed after that one is fixed/merged. + pass + + +def check_compatible(client, min_version=None, max_version=None): + """Checks if a kazook client is backed by a zookeeper server version + that satisfies a given min (inclusive) and max (inclusive) version range. + """ + server_version = None + if min_version: + server_version = tuple((int(a) for a in client.server_version())) + min_version = tuple((int(a) for a in min_version)) + if server_version < min_version: + pretty_server_version = ".".join([str(a) for a in server_version]) + min_version = ".".join([str(a) for a in min_version]) + raise exc.IncompatibleVersion("Incompatible zookeeper version" + " %s detected, zookeeper >= %s" + " required" % (pretty_server_version, + min_version)) + if max_version: + if server_version is None: + server_version = tuple((int(a) for a in client.server_version())) + max_version = tuple((int(a) for a in max_version)) + if server_version > max_version: + pretty_server_version = ".".join([str(a) for a in server_version]) + max_version = ".".join([str(a) for a in max_version]) + raise exc.IncompatibleVersion("Incompatible zookeeper version" + " %s detected, zookeeper <= %s" + " required" % (pretty_server_version, + max_version)) + + def make_client(conf): """Creates a kazoo client given a configuration dictionary.""" client_kwargs = {