diff --git a/taskflow/persistence/backends/impl_zookeeper.py b/taskflow/persistence/backends/impl_zookeeper.py
index a3b8e35b..11b41de6 100644
--- a/taskflow/persistence/backends/impl_zookeeper.py
+++ b/taskflow/persistence/backends/impl_zookeeper.py
@@ -19,20 +19,22 @@
 import contextlib
 import logging
 
-from kazoo import client as kazoo_client
 from kazoo import exceptions as k_exc
 from kazoo.protocol import paths
-from zake import fake_client
 
 from taskflow import exceptions as exc
 from taskflow.openstack.common import jsonutils
 from taskflow.persistence.backends import base
 from taskflow.persistence import logbook
+from taskflow.utils import kazoo_utils as k_utils
 from taskflow.utils import misc
 from taskflow.utils import persistence_utils as p_utils
 
 LOG = logging.getLogger(__name__)
 
+# Transaction support was added in 3.4.0
+MIN_ZK_VERSION = (3, 4, 0)
+
 
 class ZkBackend(base.Backend):
     """ZooKeeper as backend storage implementation
@@ -43,63 +45,79 @@ class ZkBackend(base.Backend):
         "hosts": "192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181",
         "path": "/taskflow",
     }
-
-    Example conf (use Zake):
-
-    conf = {
-        "path": "/taskflow",
-    }
     """
 
-    def __init__(self, conf):
+    def __init__(self, conf, client=None):
         super(ZkBackend, self).__init__(conf)
-        path = conf.get("path", "/taskflow")
+        path = str(conf.get("path", "/taskflow"))
+        if not path:
+            raise ValueError("Empty zookeeper path is disallowed")
+        if not paths.isabs(path):
+            raise ValueError("Zookeeper path must be absolute")
         self._path = path
-        hosts = conf.get("hosts", None)
-        # if no specified hosts, use zake to fake
-        if hosts is None:
-            self._zk = fake_client.FakeClient()
-        # otherwise use Kazoo
+        if client is not None:
+            self._zk = client
+            self._owned = False
         else:
-            self._zk = kazoo_client.KazooClient(hosts=hosts)
-
-    @property
-    def zk(self):
-        return self._zk
+            self._zk = k_utils.make_client(conf)
+            self._owned = True
+        self._validated = False
 
     @property
     def path(self):
         return self._path
 
     def get_connection(self):
-        return ZkConnection(self)
+        conn = ZkConnection(self, self._zk)
+        if not self._validated:
+            conn.validate()
+            self._validated = True
+        return conn
 
     def close(self):
-        self.zk.stop()
-        self.zk.close()
+        self._validated = False
+        if not self._owned:
+            return
+        try:
+            self._zk.stop()
+        except (k_exc.KazooException, k_exc.ZookeeperError) as e:
+            raise exc.StorageError("Unable to stop client: %s" % e)
+        try:
+            self._zk.close()
+        except TypeError:
+            # NOTE(harlowja): https://github.com/python-zk/kazoo/issues/167
+            pass
+        except (k_exc.KazooException, k_exc.ZookeeperError) as e:
+            raise exc.StorageError("Unable to close client: %s" % e)
 
 
 class ZkConnection(base.Connection):
-    def __init__(self, backend):
+    def __init__(self, backend, client):
         self._backend = backend
-
-        self._log_path = paths.join(self._backend.path, "log_books")
+        self._client = client
+        self._book_path = paths.join(self._backend.path, "books")
         self._flow_path = paths.join(self._backend.path, "flow_details")
         self._task_path = paths.join(self._backend.path, "task_details")
-        with self._exc_wrapper():
-            self._backend.zk.start()
-            self._backend.zk.ensure_path(self._backend.path)
-            self._backend.zk.ensure_path(self.log_path)
-            self._backend.zk.ensure_path(self.flow_path)
-            self._backend.zk.ensure_path(self.task_path)
+        # NOOP if already started.
+        self._client.start()
+
+    def validate(self):
+        with self._exc_wrapper():
+            zk_ver = self._client.server_version()
+            if tuple(zk_ver) < MIN_ZK_VERSION:
+                given_zk_ver = ".".join([str(a) for a in zk_ver])
+                desired_zk_ver = ".".join([str(a) for a in MIN_ZK_VERSION])
+                raise exc.StorageError("Incompatible zookeeper version"
+                                       " %s detected, zookeeper >= %s required"
+                                       % (given_zk_ver, desired_zk_ver))
 
     @property
     def backend(self):
         return self._backend
 
     @property
-    def log_path(self):
-        return self._log_path
+    def book_path(self):
+        return self._book_path
 
     @property
     def flow_path(self):
@@ -113,7 +131,10 @@ class ZkConnection(base.Connection):
         pass
 
     def upgrade(self):
-        pass
+        """Creates the initial paths (if they already don't exist)."""
+        with self._exc_wrapper():
+            for path in (self.book_path, self.flow_path, self.task_path):
+                self._client.ensure_path(path)
 
     @contextlib.contextmanager
     def _exc_wrapper(self):
@@ -122,264 +143,262 @@ class ZkConnection(base.Connection):
         """
         try:
             yield
-        except self._backend.zk.handler.timeout_exception as e:
-            raise exc.ConnectionFailure("Backend error: %s" % e)
+        except self._client.handler.timeout_exception as e:
+            raise exc.ConnectionFailure("Storage backend timeout: %s" % e)
         except k_exc.SessionExpiredError as e:
-            raise exc.ConnectionFailure("Backend error: %s" % e)
+            raise exc.ConnectionFailure("Storage backend session"
+                                        " has expired: %s" % e)
         except k_exc.NoNodeError as e:
-            raise exc.NotFound("Backend error: %s" % e)
+            raise exc.NotFound("Storage backend node not found: %s" % e)
         except k_exc.NodeExistsError as e:
-            raise exc.AlreadyExists("Backend error: %s" % e)
+            raise exc.AlreadyExists("Storage backend duplicate node: %s" % e)
         except (k_exc.KazooException, k_exc.ZookeeperError) as e:
-            raise exc.TaskFlowException("Backend error: %s" % e)
+            raise exc.StorageError("Storage backend internal error: %s" % e)
 
     def update_task_details(self, td):
-        """Update a task_detail transactionally.
-        """
+        """Update a task_detail transactionally."""
         with self._exc_wrapper():
-            with self.backend.zk.transaction() as txn:
+            with self._client.transaction() as txn:
                 return self._update_task_details(td, txn)
 
     def _update_task_details(self, td, txn, create_missing=False):
-        zk_path = paths.join(self.task_path, td.uuid)
-
-        # determine whether the desired data exists or not
+        # Determine whether the desired data exists or not.
+        td_path = paths.join(self.task_path, td.uuid)
         try:
-            zk_data, _zstat = self.backend.zk.get(zk_path)
+            td_data, _zstat = self._client.get(td_path)
         except k_exc.NoNodeError:
-            # Not-existent: create or raise exception
+            # Not-existent: create or raise exception.
             if create_missing:
-                txn.create(zk_path)
+                txn.create(td_path)
                 e_td = logbook.TaskDetail(name=td.name, uuid=td.uuid)
             else:
                 raise exc.NotFound("No task details found with id: %s"
                                    % td.uuid)
-        # Existent: read it out
         else:
+            # Existent: read it out.
             e_td = p_utils.unformat_task_detail(td.uuid,
-                                                misc.decode_json(zk_data))
+                                                misc.decode_json(td_data))
 
         # Update and write it back
         e_td = p_utils.task_details_merge(e_td, td)
-        zk_data = misc.binary_encode(jsonutils.dumps(
-            p_utils.format_task_detail(e_td)))
-        txn.set_data(zk_path, zk_data)
+        td_data = p_utils.format_task_detail(e_td)
+        txn.set_data(td_path, misc.binary_encode(jsonutils.dumps(td_data)))
        return e_td
 
     def get_task_details(self, td_uuid):
-        """Read a task_detail. *Read-only*, so no need of zk transaction.
+        """Read a taskdetail.
+
+        *Read-only*, so no need of zk transaction.
         """
         with self._exc_wrapper():
             return self._get_task_details(td_uuid)
 
     def _get_task_details(self, td_uuid):
-        zk_path = paths.join(self.task_path, td_uuid)
-
+        td_path = paths.join(self.task_path, td_uuid)
         try:
-            zk_data, _zstat = self.backend.zk.get(zk_path)
+            td_data, _zstat = self._client.get(td_path)
         except k_exc.NoNodeError:
             raise exc.NotFound("No task details found with id: %s" % td_uuid)
-
-        td = p_utils.unformat_task_detail(td_uuid, misc.decode_json(zk_data))
-        return td
+        else:
+            return p_utils.unformat_task_detail(td_uuid,
+                                                misc.decode_json(td_data))
 
     def update_flow_details(self, fd):
-        """Update a flow_detail transactionally.
-        """
+        """Update a flowdetail transactionally."""
         with self._exc_wrapper():
-            with self.backend.zk.transaction() as txn:
+            with self._client.transaction() as txn:
                 return self._update_flow_details(fd, txn)
 
     def _update_flow_details(self, fd, txn, create_missing=False):
-        zk_path = paths.join(self.flow_path, fd.uuid)
-
-        # determine whether the desired data exists or not
+        # Determine whether the desired data exists or not
+        fd_path = paths.join(self.flow_path, fd.uuid)
         try:
-            zk_data, _zstat = self.backend.zk.get(zk_path)
+            fd_data, _zstat = self._client.get(fd_path)
        except k_exc.NoNodeError:
-            # Not-existent: create or raise exception
+            # Not-existent: create or raise exception
             if create_missing:
-                txn.create(zk_path)
+                txn.create(fd_path)
                 e_fd = logbook.FlowDetail(name=fd.name, uuid=fd.uuid)
             else:
                 raise exc.NotFound("No flow details found with id: %s"
                                    % fd.uuid)
-        # Existent: read it out
         else:
+            # Existent: read it out
             e_fd = p_utils.unformat_flow_detail(fd.uuid,
-                                                misc.decode_json(zk_data))
+                                                misc.decode_json(fd_data))
 
         # Update and write it back
         e_fd = p_utils.flow_details_merge(e_fd, fd)
-        zk_data = misc.binary_encode(jsonutils.dumps(
-            p_utils.format_flow_detail(e_fd)))
-        txn.set_data(zk_path, zk_data)
+        fd_data = p_utils.format_flow_detail(e_fd)
+        txn.set_data(fd_path, misc.binary_encode(jsonutils.dumps(fd_data)))
         for td in fd:
-            zk_path = paths.join(self.flow_path, fd.uuid, td.uuid)
-            if not self.backend.zk.exists(zk_path):
-                txn.create(zk_path)
-            e_td = self._update_task_details(td, txn, create_missing=True)
-            e_fd.add(e_td)
+            td_path = paths.join(fd_path, td.uuid)
+            # NOTE(harlowja): create an entry in the flow detail path
+            # for the provided task detail so that a reference exists
+            # from the flow detail to its task details.
+            if not self._client.exists(td_path):
+                txn.create(td_path)
+            e_fd.add(self._update_task_details(td, txn, create_missing=True))
         return e_fd
 
     def get_flow_details(self, fd_uuid):
-        """Read a flow_detail. *Read-only*, so no need of zk transaction.
+        """Read a flowdetail.
+
+        *Read-only*, so no need of zk transaction.
         """
         with self._exc_wrapper():
             return self._get_flow_details(fd_uuid)
 
     def _get_flow_details(self, fd_uuid):
-        zk_path = paths.join(self.flow_path, fd_uuid)
-
+        fd_path = paths.join(self.flow_path, fd_uuid)
         try:
-            zk_data, _zstat = self.backend.zk.get(zk_path)
+            fd_data, _zstat = self._client.get(fd_path)
         except k_exc.NoNodeError:
             raise exc.NotFound("No flow details found with id: %s" % fd_uuid)
 
-        fd = p_utils.unformat_flow_detail(fd_uuid, misc.decode_json(zk_data))
-        for td_uuid in self.backend.zk.get_children(zk_path):
-            td = self._get_task_details(td_uuid)
-            fd.add(td)
+        fd = p_utils.unformat_flow_detail(fd_uuid, misc.decode_json(fd_data))
+        for td_uuid in self._client.get_children(fd_path):
+            fd.add(self._get_task_details(td_uuid))
         return fd
 
     def save_logbook(self, lb):
-        """Save (update) a log_book transactionally.
-        """
+        """Save (update) a log_book transactionally."""
+
+        def create_logbook(lb_path, txn):
+            lb_data = p_utils.format_logbook(lb, created_at=None)
+            txn.create(lb_path, misc.binary_encode(jsonutils.dumps(lb_data)))
+            for fd in lb:
+                # NOTE(harlowja): create an entry in the logbook path
+                # for the provided flow detail so that a reference exists
+                # from the logbook to its flow details.
+                txn.create(paths.join(lb_path, fd.uuid))
+                fd_path = paths.join(self.flow_path, fd.uuid)
+                fd_data = jsonutils.dumps(p_utils.format_flow_detail(fd))
+                txn.create(fd_path, misc.binary_encode(fd_data))
+                for td in fd:
+                    # NOTE(harlowja): create an entry in the flow detail path
+                    # for the provided task detail so that a reference exists
+                    # from the flow detail to its task details.
+                    txn.create(paths.join(fd_path, td.uuid))
+                    td_path = paths.join(self.task_path, td.uuid)
+                    td_data = jsonutils.dumps(p_utils.format_task_detail(td))
+                    txn.create(td_path, misc.binary_encode(td_data))
+            return lb
+
+        def update_logbook(lb_path, lb_data, txn):
+            e_lb = p_utils.unformat_logbook(lb.uuid, misc.decode_json(lb_data))
+            e_lb = p_utils.logbook_merge(e_lb, lb)
+            lb_data = p_utils.format_logbook(e_lb, created_at=lb.created_at)
+            txn.set_data(lb_path, misc.binary_encode(jsonutils.dumps(lb_data)))
+            for fd in lb:
+                fd_path = paths.join(lb_path, fd.uuid)
+                if not self._client.exists(fd_path):
+                    # NOTE(harlowja): create an entry in the logbook path
+                    # for the provided flow detail so that a reference exists
+                    # from the logbook to its flow details.
+                    txn.create(fd_path)
+                e_fd = self._update_flow_details(fd, txn, create_missing=True)
+                e_lb.add(e_fd)
+            return e_lb
+
         with self._exc_wrapper():
-            with self.backend.zk.transaction() as txn:
-                zk_path = paths.join(self.log_path, lb.uuid)
-
-                # determine whether the desired data exists or not
+            with self._client.transaction() as txn:
+                # Determine whether the desired data exists or not.
+                lb_path = paths.join(self.book_path, lb.uuid)
                 try:
-                    zk_data, _zstat = self.backend.zk.get(zk_path)
+                    lb_data, _zstat = self._client.get(lb_path)
                 except k_exc.NoNodeError:
-                    # Create if a new log_book
-                    e_lb = lb
-                    zk_data = misc.binary_encode(jsonutils.dumps(
-                        p_utils.format_logbook(lb, created_at=None)))
-                    txn.create(zk_path, zk_data)
-                    for fd in lb:
-                        zk_path = paths.join(self.log_path, lb.uuid, fd.uuid)
-                        txn.create(zk_path)
-                        zk_path = paths.join(self.flow_path, fd.uuid)
-                        zk_data = misc.binary_encode(jsonutils.dumps(
-                            p_utils.format_flow_detail(fd)))
-                        txn.create(zk_path, zk_data)
-                        for td in fd:
-                            zk_path = paths.join(
-                                self.flow_path, fd.uuid, td.uuid)
-                            txn.create(zk_path)
-                            zk_path = paths.join(self.task_path, td.uuid)
-                            zk_data = misc.binary_encode(jsonutils.dumps(
-                                p_utils.format_task_detail(td)))
-                            txn.create(zk_path, zk_data)
-
-                # Otherwise update the existing log_book
+                    # Create a new logbook since it doesn't exist.
+                    e_lb = create_logbook(lb_path, txn)
                 else:
-                    e_lb = p_utils.unformat_logbook(lb.uuid,
-                                                    misc.decode_json(zk_data))
-                    e_lb = p_utils.logbook_merge(e_lb, lb)
-                    zk_data = misc.binary_encode((jsonutils.dumps(
-                        p_utils.format_logbook(e_lb,
-                                               created_at=lb.created_at))))
-                    txn.set_data(zk_path, zk_data)
-                    for fd in lb:
-                        zk_path = paths.join(self.log_path, lb.uuid, fd.uuid)
-                        if not self.backend.zk.exists(zk_path):
-                            txn.create(zk_path)
-                        e_fd = self._update_flow_details(fd, txn,
-                                                         create_missing=True)
-                        e_lb.add(e_fd)
-                # finally return (updated) log_book
+                    # Otherwise update the existing logbook instead.
+                    e_lb = update_logbook(lb_path, lb_data, txn)
+                # Finally return (updated) logbook.
                 return e_lb
 
     def get_logbook(self, lb_uuid):
-        """Read a log_book. *Read-only*, so no need of zk transaction.
+        """Read a logbook.
+
+        *Read-only*, so no need of zk transaction.
         """
         with self._exc_wrapper():
-            zk_path = paths.join(self.log_path, lb_uuid)
-
+            lb_path = paths.join(self.book_path, lb_uuid)
             try:
-                zk_data, _zstat = self.backend.zk.get(zk_path)
+                lb_data, _zstat = self._client.get(lb_path)
             except k_exc.NoNodeError:
                 raise exc.NotFound("No logbook found with id: %s" % lb_uuid)
-
-            lb = p_utils.unformat_logbook(lb_uuid, misc.decode_json(zk_data))
-            for fd_uuid in self.backend.zk.get_children(zk_path):
-                fd = self._get_flow_details(fd_uuid)
-                lb.add(fd)
-            return lb
+            else:
+                lb = p_utils.unformat_logbook(lb_uuid,
+                                              misc.decode_json(lb_data))
+                for fd_uuid in self._client.get_children(lb_path):
+                    lb.add(self._get_flow_details(fd_uuid))
+                return lb
 
     def get_logbooks(self):
-        """Read all logbooks. *Read-only*, so no need of zk transaction
-        """
+        """Read all logbooks. *Read-only*, so no need of zk transaction."""
         with self._exc_wrapper():
-            for lb_uuid in self.backend.zk.get_children(self.log_path):
+            for lb_uuid in self._client.get_children(self.book_path):
                 yield self.get_logbook(lb_uuid)
 
     def destroy_logbook(self, lb_uuid):
-        """Detroy (delete) a log_book transactionally.
-        """
+        """Destroy (delete) a log_book transactionally."""
         def _destroy_task_details(td_uuid, txn):
-            zk_path = paths.join(self.task_path, td_uuid)
-            if not self.backend.zk.exists(zk_path):
+            td_path = paths.join(self.task_path, td_uuid)
+            if not self._client.exists(td_path):
                 raise exc.NotFound("No task details found with id: %s"
                                    % td_uuid)
-
-            txn.delete(zk_path)
+            txn.delete(td_path)
 
         def _destroy_flow_details(fd_uuid, txn):
-            zk_path = paths.join(self.flow_path, fd_uuid)
-            if not self.backend.zk.exists(zk_path):
+            fd_path = paths.join(self.flow_path, fd_uuid)
+            if not self._client.exists(fd_path):
                 raise exc.NotFound("No flow details found with id: %s"
                                    % fd_uuid)
-
-            for td_uuid in self.backend.zk.get_children(zk_path):
+            for td_uuid in self._client.get_children(fd_path):
                 _destroy_task_details(td_uuid, txn)
-                txn.delete(paths.join(zk_path, td_uuid))
-            txn.delete(zk_path)
+                txn.delete(paths.join(fd_path, td_uuid))
+            txn.delete(fd_path)
 
         def _destroy_logbook(lb_uuid, txn):
-            zk_path = paths.join(self.log_path, lb_uuid)
-            if not self.backend.zk.exists(zk_path):
+            lb_path = paths.join(self.book_path, lb_uuid)
+            if not self._client.exists(lb_path):
                 raise exc.NotFound("No logbook found with id: %s" % lb_uuid)
-
-            for fd_uuid in self.backend.zk.get_children(zk_path):
+            for fd_uuid in self._client.get_children(lb_path):
                 _destroy_flow_details(fd_uuid, txn)
-                txn.delete(paths.join(zk_path, fd_uuid))
-            txn.delete(zk_path)
+                txn.delete(paths.join(lb_path, fd_uuid))
+            txn.delete(lb_path)
 
         with self._exc_wrapper():
-            with self.backend.zk.transaction() as txn:
+            with self._client.transaction() as txn:
                 _destroy_logbook(lb_uuid, txn)
 
-    def clear_all(self):
-        """Delete all data transactioanlly.
-        """
+    def clear_all(self, delete_dirs=True):
+        """Delete all data transactionally."""
         with self._exc_wrapper():
-            with self.backend.zk.transaction() as txn:
-                # delete all data under log_book path
-                for lb_uuid in self.backend.zk.get_children(self.log_path):
-                    zk_path = paths.join(self.log_path, lb_uuid)
-                    for fd_uuid in self.backend.zk.get_children(zk_path):
-                        txn.delete(paths.join(zk_path, fd_uuid))
-                    txn.delete(zk_path)
-                txn.delete(self.log_path)
+            with self._client.transaction() as txn:
 
-                # delete all data under flow_detail path
-                for fd_uuid in self.backend.zk.get_children(self.flow_path):
-                    zk_path = paths.join(self.flow_path, fd_uuid)
-                    for td_uuid in self.backend.zk.get_children(zk_path):
-                        txn.delete(paths.join(zk_path, td_uuid))
-                    txn.delete(zk_path)
-                txn.delete(self.flow_path)
+                # Delete all data under logbook path.
+                for lb_uuid in self._client.get_children(self.book_path):
+                    lb_path = paths.join(self.book_path, lb_uuid)
+                    for fd_uuid in self._client.get_children(lb_path):
+                        txn.delete(paths.join(lb_path, fd_uuid))
+                    txn.delete(lb_path)
 
-                # delete all data under task_detail path
-                for td_uuid in self.backend.zk.get_children(self.task_path):
-                    zk_path = paths.join(self.task_path, td_uuid)
-                    txn.delete(zk_path)
-                txn.delete(self.task_path)
+                # Delete all data under flowdetail path.
+                for fd_uuid in self._client.get_children(self.flow_path):
+                    fd_path = paths.join(self.flow_path, fd_uuid)
+                    for td_uuid in self._client.get_children(fd_path):
+                        txn.delete(paths.join(fd_path, td_uuid))
+                    txn.delete(fd_path)
 
-                # delete top-level path
-                txn.delete(self.backend.path)
+                # Delete all data under taskdetail path.
+                for td_uuid in self._client.get_children(self.task_path):
+                    td_path = paths.join(self.task_path, td_uuid)
+                    txn.delete(td_path)
+
+                # Delete containing directories.
+                if delete_dirs:
+                    txn.delete(self.book_path)
+                    txn.delete(self.task_path)
+                    txn.delete(self.flow_path)
diff --git a/taskflow/tests/unit/persistence/test_zake_persistence.py b/taskflow/tests/unit/persistence/test_zake_persistence.py
index 22e8ab5e..2be24ca8 100644
--- a/taskflow/tests/unit/persistence/test_zake_persistence.py
+++ b/taskflow/tests/unit/persistence/test_zake_persistence.py
@@ -18,30 +18,29 @@
 
 import contextlib
 
+from zake import fake_client
+
 from taskflow.persistence import backends
 from taskflow.persistence.backends import impl_zookeeper
 from taskflow import test
 from taskflow.tests.unit.persistence import base
 
 
-class ZkPersistenceTest(test.TestCase, base.PersistenceTestMixin):
+class ZakePersistenceTest(test.TestCase, base.PersistenceTestMixin):
     def _get_connection(self):
         return self._backend.get_connection()
 
     def setUp(self):
-        super(ZkPersistenceTest, self).setUp()
+        super(ZakePersistenceTest, self).setUp()
         conf = {
             "path": "/taskflow",
         }
-        self._backend = impl_zookeeper.ZkBackend(conf)
+        client = fake_client.FakeClient()
+        client.start()
+        self._backend = impl_zookeeper.ZkBackend(conf, client=client)
         conn = self._backend.get_connection()
         conn.upgrade()
 
-    def tearDown(self):
-        super(ZkPersistenceTest, self).tearDown()
-        conn = self._get_connection()
-        conn.clear_all()
-
     def test_zk_persistence_entry_point(self):
         conf = {'connection': 'zookeeper:'}
         with contextlib.closing(backends.fetch(conf)) as be:
diff --git a/taskflow/utils/kazoo_utils.py b/taskflow/utils/kazoo_utils.py
new file mode 100644
index 00000000..0bd91402
--- /dev/null
+++ b/taskflow/utils/kazoo_utils.py
@@ -0,0 +1,54 @@
+# -*- coding: utf-8 -*-
+
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+#    Copyright (C) 2014 Yahoo! Inc. All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+from kazoo import client
+import six
+
+
+def _parse_hosts(hosts):
+    if isinstance(hosts, six.string_types):
+        return hosts.strip()
+    if isinstance(hosts, (dict)):
+        host_ports = []
+        for (k, v) in six.iteritems(hosts):
+            host_ports.append("%s:%s" % (k, v))
+        hosts = host_ports
+    if isinstance(hosts, (list, set, tuple)):
+        return ",".join([str(h) for h in hosts])
+    return hosts
+
+
+def make_client(conf):
+    """Creates a kazoo client given a configuration dictionary."""
+    client_kwargs = {
+        'read_only': bool(conf.get('read_only')),
+        'randomize_hosts': bool(conf.get('randomize_hosts')),
+    }
+    hosts = _parse_hosts(conf.get("hosts", "localhost:2181"))
+    if not hosts or not isinstance(hosts, six.string_types):
+        raise TypeError("Invalid hosts format, expected "
+                        "non-empty string/list, not %s" % type(hosts))
+    client_kwargs['hosts'] = hosts
+    if 'timeout' in conf:
+        client_kwargs['timeout'] = float(conf['timeout'])
+    # Kazoo supports various handlers, gevent, threading, eventlet...
+    # allow the user of this client object to optionally specify one to be
+    # used.
+    if 'handler' in conf:
+        client_kwargs['handler'] = conf['handler']
+    return client.KazooClient(**client_kwargs)
diff --git a/test-requirements.txt b/test-requirements.txt
index 3c40bcf5..9a2c7591 100644
--- a/test-requirements.txt
+++ b/test-requirements.txt
@@ -6,4 +6,4 @@ testrepository>=0.0.17
 testtools>=0.9.32,<0.9.35
 # ZooKeeper
 kazoo>=1.3.1
-zake>=0.0.7
+zake>=0.0.13
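
Usage sketch (not part of the patch; the host addresses and variable names below are illustrative). With this change a real deployment passes only a conf dictionary and lets the backend build and own its kazoo client through kazoo_utils.make_client(), while tests inject an already-started zake FakeClient that the backend does not own and therefore never stops or closes, mirroring the updated unit test:

    from zake import fake_client

    from taskflow.persistence.backends import impl_zookeeper

    # Against a real ensemble: the backend creates (and owns) the client.
    conf = {
        "hosts": "zk1:2181,zk2:2181,zk3:2181",  # placeholder addresses
        "path": "/taskflow",
    }
    backend = impl_zookeeper.ZkBackend(conf)
    try:
        # get_connection() starts the client and validates that the server
        # is at least 3.4.0 (transactions are required).
        conn = backend.get_connection()
        # upgrade() ensures the books/flow_details/task_details paths exist.
        conn.upgrade()
    finally:
        backend.close()  # stops and closes the owned client

    # Against zake (as the updated test does): the injected client is not
    # owned, so backend.close() leaves it running.
    client = fake_client.FakeClient()
    client.start()
    backend = impl_zookeeper.ZkBackend({"path": "/taskflow"}, client=client)
    backend.get_connection().upgrade()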