diff --git a/taskflow/exceptions.py b/taskflow/exceptions.py
index b205e8ce..698a4465 100644
--- a/taskflow/exceptions.py
+++ b/taskflow/exceptions.py
@@ -22,6 +22,11 @@ class TaskFlowException(Exception):
     pass
 
 
+class ConnectionFailure(TaskFlowException):
+    """Raised when some type of connection cannot be opened or is lost."""
+    pass
+
+
 class Duplicate(TaskFlowException):
     """Raised when a duplicate entry is found."""
     pass
diff --git a/taskflow/persistence/backends/impl_dir.py b/taskflow/persistence/backends/impl_dir.py
index e3e4eb46..7c934435 100644
--- a/taskflow/persistence/backends/impl_dir.py
+++ b/taskflow/persistence/backends/impl_dir.py
@@ -28,9 +28,7 @@ import six
 
 from taskflow import exceptions as exc
 from taskflow.openstack.common import jsonutils
-from taskflow.openstack.common import timeutils
 from taskflow.persistence.backends import base
-from taskflow.persistence import logbook
 from taskflow.utils import lock_utils
 from taskflow.utils import misc
 from taskflow.utils import persistence_utils as p_utils
@@ -169,7 +167,7 @@
         if e_td is not None:
             task_detail = p_utils.task_details_merge(e_td, task_detail)
         td_path = os.path.join(self._task_path, task_detail.uuid)
-        td_data = _format_task_detail(task_detail)
+        td_data = p_utils.format_task_detail(task_detail)
         self._write_to(td_path, jsonutils.dumps(td_data))
         return task_detail
 
@@ -185,7 +183,7 @@
         def _get():
             td_path = os.path.join(self._task_path, uuid)
             td_data = jsonutils.loads(self._read_from(td_path))
-            return _unformat_task_detail(uuid, td_data)
+            return p_utils.unformat_task_detail(uuid, td_data)
 
         if lock:
             return self._run_with_process_lock('task', _get)
@@ -198,7 +196,7 @@
             fd_path = os.path.join(self._flow_path, uuid)
             meta_path = os.path.join(fd_path, 'metadata')
             meta = jsonutils.loads(self._read_from(meta_path))
-            fd = _unformat_flow_detail(uuid, meta)
+            fd = p_utils.unformat_flow_detail(uuid, meta)
             td_to_load = []
             td_path = os.path.join(fd_path, 'tasks')
             try:
@@ -244,8 +242,9 @@
             flow_detail = e_fd
         flow_path = os.path.join(self._flow_path, flow_detail.uuid)
         misc.ensure_tree(flow_path)
-        self._write_to(os.path.join(flow_path, 'metadata'),
-                       jsonutils.dumps(_format_flow_detail(flow_detail)))
+        self._write_to(
+            os.path.join(flow_path, 'metadata'),
+            jsonutils.dumps(p_utils.format_flow_detail(flow_detail)))
         if len(flow_detail):
             task_path = os.path.join(flow_path, 'tasks')
             misc.ensure_tree(task_path)
@@ -290,9 +289,8 @@
         created_at = None
         if e_lb is not None:
             created_at = e_lb.created_at
-        self._write_to(os.path.join(book_path, 'metadata'),
-                       jsonutils.dumps(_format_logbook(book,
-                                                       created_at=created_at)))
+        self._write_to(os.path.join(book_path, 'metadata'), jsonutils.dumps(
+            p_utils.format_logbook(book, created_at=created_at)))
         if len(book):
             flow_path = os.path.join(book_path, 'flows')
             misc.ensure_tree(flow_path)
@@ -382,7 +380,7 @@
                 raise exc.NotFound("No logbook found with id: %s" % book_uuid)
             else:
                 raise
-        lb = _unformat_logbook(book_uuid, meta)
+        lb = p_utils.unformat_logbook(book_uuid, meta)
         fd_path = os.path.join(book_path, 'flows')
         fd_uuids = []
         try:
@@ -399,78 +397,3 @@
     def get_logbook(self, book_uuid):
         return self._run_with_process_lock("book", self._get_logbook,
                                            book_uuid)
-
-
-###
-# Internal <-> external model + other helper functions.
-###
-
-def _str_2_datetime(text):
-    """Converts an iso8601 string/text into a datetime object (or none)."""
-    if text is None:
-        return None
-    if not isinstance(text, six.string_types):
-        raise ValueError("Can only convert strings into a datetime object and"
-                         " not %r" % (text))
-    if not len(text):
-        return None
-    return timeutils.parse_isotime(text)
-
-
-def _format_task_detail(task_detail):
-    return {
-        'failure': p_utils.failure_to_dict(task_detail.failure),
-        'meta': task_detail.meta,
-        'name': task_detail.name,
-        'results': task_detail.results,
-        'state': task_detail.state,
-        'version': task_detail.version,
-    }
-
-
-def _unformat_task_detail(uuid, td_data):
-    td = logbook.TaskDetail(name=td_data['name'], uuid=uuid)
-    td.state = td_data.get('state')
-    td.results = td_data.get('results')
-    td.failure = p_utils.failure_from_dict(td_data.get('failure'))
-    td.meta = td_data.get('meta')
-    td.version = td_data.get('version')
-    return td
-
-
-def _format_flow_detail(flow_detail):
-    return {
-        'name': flow_detail.name,
-        'meta': flow_detail.meta,
-        'state': flow_detail.state,
-    }
-
-
-def _unformat_flow_detail(uuid, fd_data):
-    fd = logbook.FlowDetail(name=fd_data['name'], uuid=uuid)
-    fd.state = fd_data.get('state')
-    fd.meta = fd_data.get('meta')
-    return fd
-
-
-def _format_logbook(book, created_at=None):
-    lb_data = {
-        'name': book.name,
-        'meta': book.meta,
-    }
-    if created_at:
-        lb_data['created_at'] = timeutils.isotime(at=created_at)
-        lb_data['updated_at'] = timeutils.isotime()
-    else:
-        lb_data['created_at'] = timeutils.isotime()
-        lb_data['updated_at'] = None
-    return lb_data
-
-
-def _unformat_logbook(uuid, lb_data):
-    lb = logbook.LogBook(name=lb_data['name'],
-                         uuid=uuid,
-                         updated_at=_str_2_datetime(lb_data['updated_at']),
-                         created_at=_str_2_datetime(lb_data['created_at']))
-    lb.meta = lb_data.get('meta')
-    return lb
diff --git a/taskflow/persistence/backends/impl_zookeeper.py b/taskflow/persistence/backends/impl_zookeeper.py
new file mode 100644
index 00000000..a3b8e35b
--- /dev/null
+++ b/taskflow/persistence/backends/impl_zookeeper.py
@@ -0,0 +1,385 @@
+# -*- coding: utf-8 -*-
+
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (C) 2014 AT&T Labs All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import contextlib
+import logging
+
+from kazoo import client as kazoo_client
+from kazoo import exceptions as k_exc
+from kazoo.protocol import paths
+from zake import fake_client
+
+from taskflow import exceptions as exc
+from taskflow.openstack.common import jsonutils
+from taskflow.persistence.backends import base
+from taskflow.persistence import logbook
+from taskflow.utils import misc
+from taskflow.utils import persistence_utils as p_utils
+
+LOG = logging.getLogger(__name__)
+
+
+class ZkBackend(base.Backend):
+    """ZooKeeper-based backend storage implementation.
+
+    Example conf (using Kazoo):
+
+    conf = {
+        "hosts": "192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181",
+        "path": "/taskflow",
+    }
+
+    Example conf (using Zake):
+
+    conf = {
+        "path": "/taskflow",
+    }
+    """
+    def __init__(self, conf):
+        super(ZkBackend, self).__init__(conf)
+        path = conf.get("path", "/taskflow")
+        self._path = path
+        hosts = conf.get("hosts", None)
+        # If no hosts are specified, use zake to fake a client
+        if hosts is None:
+            self._zk = fake_client.FakeClient()
+        # otherwise use a real kazoo client
+        else:
+            self._zk = kazoo_client.KazooClient(hosts=hosts)
+
+    @property
+    def zk(self):
+        return self._zk
+
+    @property
+    def path(self):
+        return self._path
+
+    def get_connection(self):
+        return ZkConnection(self)
+
+    def close(self):
+        self.zk.stop()
+        self.zk.close()
+
+
+class ZkConnection(base.Connection):
+    def __init__(self, backend):
+        self._backend = backend
+
+        self._log_path = paths.join(self._backend.path, "log_books")
+        self._flow_path = paths.join(self._backend.path, "flow_details")
+        self._task_path = paths.join(self._backend.path, "task_details")
+
+        with self._exc_wrapper():
+            self._backend.zk.start()
+            self._backend.zk.ensure_path(self._backend.path)
+            self._backend.zk.ensure_path(self.log_path)
+            self._backend.zk.ensure_path(self.flow_path)
+            self._backend.zk.ensure_path(self.task_path)
+
+    @property
+    def backend(self):
+        return self._backend
+
+    @property
+    def log_path(self):
+        return self._log_path
+
+    @property
+    def flow_path(self):
+        return self._flow_path
+
+    @property
+    def task_path(self):
+        return self._task_path
+
+    def close(self):
+        pass
+
+    def upgrade(self):
+        pass
+
+    @contextlib.contextmanager
+    def _exc_wrapper(self):
+        """Exception wrapper which catches kazoo exceptions and maps them
+        to the equivalent taskflow exceptions.
+        """
+        try:
+            yield
+        except self._backend.zk.handler.timeout_exception as e:
+            raise exc.ConnectionFailure("Backend error: %s" % e)
+        except k_exc.SessionExpiredError as e:
+            raise exc.ConnectionFailure("Backend error: %s" % e)
+        except k_exc.NoNodeError as e:
+            raise exc.NotFound("Backend error: %s" % e)
+        except k_exc.NodeExistsError as e:
+            raise exc.AlreadyExists("Backend error: %s" % e)
+        except (k_exc.KazooException, k_exc.ZookeeperError) as e:
+            raise exc.TaskFlowException("Backend error: %s" % e)
+
+    def update_task_details(self, td):
+        """Update a task_detail transactionally.
+        """
+        with self._exc_wrapper():
+            with self.backend.zk.transaction() as txn:
+                return self._update_task_details(td, txn)
+
+    def _update_task_details(self, td, txn, create_missing=False):
+        zk_path = paths.join(self.task_path, td.uuid)
+
+        # Determine whether the desired data exists or not.
+        try:
+            zk_data, _zstat = self.backend.zk.get(zk_path)
+        except k_exc.NoNodeError:
+            # Non-existent: create it or raise an exception.
+            if create_missing:
+                txn.create(zk_path)
+                e_td = logbook.TaskDetail(name=td.name, uuid=td.uuid)
+            else:
+                raise exc.NotFound("No task details found with id: %s"
+                                   % td.uuid)
+        # Existent: read it out.
+        else:
+            e_td = p_utils.unformat_task_detail(td.uuid,
+                                                misc.decode_json(zk_data))
+
+        # Update and write it back.
+        e_td = p_utils.task_details_merge(e_td, td)
+        zk_data = misc.binary_encode(jsonutils.dumps(
+            p_utils.format_task_detail(e_td)))
+        txn.set_data(zk_path, zk_data)
+        return e_td
+
+    def get_task_details(self, td_uuid):
+        """Read a task_detail. *Read-only*, so no need for a zk transaction.
+        """
+        with self._exc_wrapper():
+            return self._get_task_details(td_uuid)
+
+    def _get_task_details(self, td_uuid):
+        zk_path = paths.join(self.task_path, td_uuid)
+
+        try:
+            zk_data, _zstat = self.backend.zk.get(zk_path)
+        except k_exc.NoNodeError:
+            raise exc.NotFound("No task details found with id: %s" % td_uuid)
+
+        td = p_utils.unformat_task_detail(td_uuid, misc.decode_json(zk_data))
+        return td
+
+    def update_flow_details(self, fd):
+        """Update a flow_detail transactionally.
+        """
+        with self._exc_wrapper():
+            with self.backend.zk.transaction() as txn:
+                return self._update_flow_details(fd, txn)
+
+    def _update_flow_details(self, fd, txn, create_missing=False):
+        zk_path = paths.join(self.flow_path, fd.uuid)
+
+        # Determine whether the desired data exists or not.
+        try:
+            zk_data, _zstat = self.backend.zk.get(zk_path)
+        except k_exc.NoNodeError:
+            # Non-existent: create it or raise an exception.
+            if create_missing:
+                txn.create(zk_path)
+                e_fd = logbook.FlowDetail(name=fd.name, uuid=fd.uuid)
+            else:
+                raise exc.NotFound("No flow details found with id: %s"
+                                   % fd.uuid)
+        # Existent: read it out.
+        else:
+            e_fd = p_utils.unformat_flow_detail(fd.uuid,
+                                                misc.decode_json(zk_data))
+
+        # Update and write it back.
+        e_fd = p_utils.flow_details_merge(e_fd, fd)
+        zk_data = misc.binary_encode(jsonutils.dumps(
+            p_utils.format_flow_detail(e_fd)))
+        txn.set_data(zk_path, zk_data)
+        for td in fd:
+            zk_path = paths.join(self.flow_path, fd.uuid, td.uuid)
+            if not self.backend.zk.exists(zk_path):
+                txn.create(zk_path)
+            e_td = self._update_task_details(td, txn, create_missing=True)
+            e_fd.add(e_td)
+        return e_fd
+
+    def get_flow_details(self, fd_uuid):
+        """Read a flow_detail. *Read-only*, so no need for a zk transaction.
+        """
+        with self._exc_wrapper():
+            return self._get_flow_details(fd_uuid)
+
+    def _get_flow_details(self, fd_uuid):
+        zk_path = paths.join(self.flow_path, fd_uuid)
+
+        try:
+            zk_data, _zstat = self.backend.zk.get(zk_path)
+        except k_exc.NoNodeError:
+            raise exc.NotFound("No flow details found with id: %s" % fd_uuid)
+
+        fd = p_utils.unformat_flow_detail(fd_uuid, misc.decode_json(zk_data))
+        for td_uuid in self.backend.zk.get_children(zk_path):
+            td = self._get_task_details(td_uuid)
+            fd.add(td)
+        return fd
+
+    def save_logbook(self, lb):
+        """Save (update) a log_book transactionally.
+        """
+        with self._exc_wrapper():
+            with self.backend.zk.transaction() as txn:
+                zk_path = paths.join(self.log_path, lb.uuid)
+
+                # Determine whether the desired data exists or not.
+                try:
+                    zk_data, _zstat = self.backend.zk.get(zk_path)
+                except k_exc.NoNodeError:
+                    # Create it, if it is a new log_book.
+                    e_lb = lb
+                    zk_data = misc.binary_encode(jsonutils.dumps(
+                        p_utils.format_logbook(lb, created_at=None)))
+                    txn.create(zk_path, zk_data)
+                    for fd in lb:
+                        zk_path = paths.join(self.log_path, lb.uuid, fd.uuid)
+                        txn.create(zk_path)
+                        zk_path = paths.join(self.flow_path, fd.uuid)
+                        zk_data = misc.binary_encode(jsonutils.dumps(
+                            p_utils.format_flow_detail(fd)))
+                        txn.create(zk_path, zk_data)
+                        for td in fd:
+                            zk_path = paths.join(
+                                self.flow_path, fd.uuid, td.uuid)
+                            txn.create(zk_path)
+                            zk_path = paths.join(self.task_path, td.uuid)
+                            zk_data = misc.binary_encode(jsonutils.dumps(
+                                p_utils.format_task_detail(td)))
+                            txn.create(zk_path, zk_data)
+                # Otherwise update the existing log_book.
+                else:
+                    e_lb = p_utils.unformat_logbook(lb.uuid,
+                                                    misc.decode_json(zk_data))
+                    e_lb = p_utils.logbook_merge(e_lb, lb)
+                    zk_data = misc.binary_encode(jsonutils.dumps(
+                        p_utils.format_logbook(e_lb,
+                                               created_at=lb.created_at)))
+                    txn.set_data(zk_path, zk_data)
+                    for fd in lb:
+                        zk_path = paths.join(self.log_path, lb.uuid, fd.uuid)
+                        if not self.backend.zk.exists(zk_path):
+                            txn.create(zk_path)
+                        e_fd = self._update_flow_details(fd, txn,
+                                                         create_missing=True)
+                        e_lb.add(e_fd)
+                # Finally return the (updated) log_book.
+                return e_lb
+
+    def get_logbook(self, lb_uuid):
+        """Read a log_book. *Read-only*, so no need for a zk transaction.
+        """
+        with self._exc_wrapper():
+            zk_path = paths.join(self.log_path, lb_uuid)
+
+            try:
+                zk_data, _zstat = self.backend.zk.get(zk_path)
+            except k_exc.NoNodeError:
+                raise exc.NotFound("No logbook found with id: %s" % lb_uuid)
+
+            lb = p_utils.unformat_logbook(lb_uuid, misc.decode_json(zk_data))
+            for fd_uuid in self.backend.zk.get_children(zk_path):
+                fd = self._get_flow_details(fd_uuid)
+                lb.add(fd)
+            return lb
+
+    def get_logbooks(self):
+        """Read all logbooks. *Read-only*, so no need for a zk transaction.
+        """
+        with self._exc_wrapper():
+            for lb_uuid in self.backend.zk.get_children(self.log_path):
+                yield self.get_logbook(lb_uuid)
+
+    def destroy_logbook(self, lb_uuid):
+        """Destroy (delete) a log_book transactionally.
+        """
+
+        def _destroy_task_details(td_uuid, txn):
+            zk_path = paths.join(self.task_path, td_uuid)
+            if not self.backend.zk.exists(zk_path):
+                raise exc.NotFound("No task details found with id: %s"
+                                   % td_uuid)
+            txn.delete(zk_path)
+
+        def _destroy_flow_details(fd_uuid, txn):
+            zk_path = paths.join(self.flow_path, fd_uuid)
+            if not self.backend.zk.exists(zk_path):
+                raise exc.NotFound("No flow details found with id: %s"
+                                   % fd_uuid)
+            for td_uuid in self.backend.zk.get_children(zk_path):
+                _destroy_task_details(td_uuid, txn)
+                txn.delete(paths.join(zk_path, td_uuid))
+            txn.delete(zk_path)
+
+        def _destroy_logbook(lb_uuid, txn):
+            zk_path = paths.join(self.log_path, lb_uuid)
+            if not self.backend.zk.exists(zk_path):
+                raise exc.NotFound("No logbook found with id: %s" % lb_uuid)
+            for fd_uuid in self.backend.zk.get_children(zk_path):
+                _destroy_flow_details(fd_uuid, txn)
+                txn.delete(paths.join(zk_path, fd_uuid))
+            txn.delete(zk_path)
+
+        with self._exc_wrapper():
+            with self.backend.zk.transaction() as txn:
+                _destroy_logbook(lb_uuid, txn)
+
+    def clear_all(self):
+        """Delete all data transactionally.
+        """
+        with self._exc_wrapper():
+            with self.backend.zk.transaction() as txn:
+                # Delete all data under the log_book path.
+                for lb_uuid in self.backend.zk.get_children(self.log_path):
+                    zk_path = paths.join(self.log_path, lb_uuid)
+                    for fd_uuid in self.backend.zk.get_children(zk_path):
+                        txn.delete(paths.join(zk_path, fd_uuid))
+                    txn.delete(zk_path)
+                txn.delete(self.log_path)
+
+                # Delete all data under the flow_detail path.
+                for fd_uuid in self.backend.zk.get_children(self.flow_path):
+                    zk_path = paths.join(self.flow_path, fd_uuid)
+                    for td_uuid in self.backend.zk.get_children(zk_path):
+                        txn.delete(paths.join(zk_path, td_uuid))
+                    txn.delete(zk_path)
+                txn.delete(self.flow_path)
+
+                # Delete all data under the task_detail path.
+                for td_uuid in self.backend.zk.get_children(self.task_path):
+                    zk_path = paths.join(self.task_path, td_uuid)
+                    txn.delete(zk_path)
+                txn.delete(self.task_path)
+
+                # Delete the top-level path.
+                txn.delete(self.backend.path)
diff --git a/taskflow/tests/unit/persistence/test_zake_persistence.py b/taskflow/tests/unit/persistence/test_zake_persistence.py
new file mode 100644
index 00000000..1de5267b
--- /dev/null
+++ b/taskflow/tests/unit/persistence/test_zake_persistence.py
@@ -0,0 +1,40 @@
+# -*- coding: utf-8 -*-
+
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (C) 2014 AT&T Labs All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from taskflow.persistence.backends import impl_zookeeper
+from taskflow import test
+from taskflow.tests.unit.persistence import base
+
+
+class ZkPersistenceTest(test.TestCase, base.PersistenceTestMixin):
+    def _get_connection(self):
+        return self._backend.get_connection()
+
+    def setUp(self):
+        super(ZkPersistenceTest, self).setUp()
+        conf = {
+            "path": "/taskflow",
+        }
+        self._backend = impl_zookeeper.ZkBackend(conf)
+        conn = self._backend.get_connection()
+        conn.upgrade()
+
+    def tearDown(self):
+        super(ZkPersistenceTest, self).tearDown()
+        conn = self._get_connection()
+        conn.clear_all()
diff --git a/taskflow/tests/unit/persistence/test_zk_persistence.py b/taskflow/tests/unit/persistence/test_zk_persistence.py
new file mode 100644
index 00000000..d259cbf4
--- /dev/null
+++ b/taskflow/tests/unit/persistence/test_zk_persistence.py
@@ -0,0 +1,44 @@
+# -*- coding: utf-8 -*-
+
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (C) 2014 AT&T Labs All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import testtools
+
+from taskflow.persistence.backends import impl_zookeeper
+from taskflow import test
+from taskflow.tests.unit.persistence import base
+
+
+@testtools.skipIf(True, 'ZooKeeper is not available in Jenkins')
+class ZkPersistenceTest(test.TestCase, base.PersistenceTestMixin):
+    def _get_connection(self):
+        return self._backend.get_connection()
+
+    def setUp(self):
+        super(ZkPersistenceTest, self).setUp()
+        conf = {
+            'hosts': "192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181",
+            'path': "/taskflow",
+        }
+        self._backend = impl_zookeeper.ZkBackend(conf)
+        conn = self._get_connection()
+        conn.upgrade()
+
+    def tearDown(self):
+        super(ZkPersistenceTest, self).tearDown()
+        conn = self._get_connection()
+        conn.clear_all()
diff --git a/taskflow/utils/persistence_utils.py b/taskflow/utils/persistence_utils.py
index b9bd1940..a1fd26fc 100644
--- a/taskflow/utils/persistence_utils.py
+++ b/taskflow/utils/persistence_utils.py
@@ -20,6 +20,8 @@ import contextlib
 import copy
 import logging
 
+import six
+
 from taskflow.openstack.common import timeutils
 from taskflow.openstack.common import uuidutils
 from taskflow.persistence import logbook
@@ -265,3 +267,74 @@
     for flow_detail in book:
         lines.append(pformat_flow_detail(flow_detail, indent=indent + 1))
     return "\n".join(lines)
+
+
+def _str_2_datetime(text):
+    """Converts an iso8601 string/text into a datetime object (or None)."""
+    if text is None:
+        return None
+    if not isinstance(text, six.string_types):
+        raise ValueError("Can only convert strings into a datetime object and"
+                         " not %r" % (text))
+    if not len(text):
+        return None
+    return timeutils.parse_isotime(text)
+
+
+def format_task_detail(td):
+    return {
+        'failure': failure_to_dict(td.failure),
+        'meta': td.meta,
+        'name': td.name,
+        'results': td.results,
+        'state': td.state,
+        'version': td.version,
+    }
+
+
+def unformat_task_detail(uuid, td_data):
+    td = logbook.TaskDetail(name=td_data['name'], uuid=uuid)
+    td.state = td_data.get('state')
+    td.results = td_data.get('results')
+    td.failure = failure_from_dict(td_data.get('failure'))
+    td.meta = td_data.get('meta')
+    td.version = td_data.get('version')
+    return td
+
+
+def format_flow_detail(fd):
+    return {
+        'name': fd.name,
+        'meta': fd.meta,
+        'state': fd.state,
+    }
+
+
+def unformat_flow_detail(uuid, fd_data):
+    fd = logbook.FlowDetail(name=fd_data['name'], uuid=uuid)
+    fd.state = fd_data.get('state')
+    fd.meta = fd_data.get('meta')
+    return fd
+
+
+def format_logbook(lb, created_at=None):
+    lb_data = {
+        'name': lb.name,
+        'meta': lb.meta,
+    }
+    if created_at:
+        lb_data['created_at'] = timeutils.isotime(at=created_at)
+        lb_data['updated_at'] = timeutils.isotime()
+    else:
+        lb_data['created_at'] = timeutils.isotime()
+        lb_data['updated_at'] = None
+    return lb_data
+
+
+def unformat_logbook(uuid, lb_data):
+    lb = logbook.LogBook(name=lb_data['name'],
+                         uuid=uuid,
+                         updated_at=_str_2_datetime(lb_data['updated_at']),
+                         created_at=_str_2_datetime(lb_data['created_at']))
+    lb.meta = lb_data.get('meta')
+    return lb
diff --git a/test-requirements.txt b/test-requirements.txt
index 70e70145..ff2745ef 100644
--- a/test-requirements.txt
+++ b/test-requirements.txt
@@ -4,3 +4,6 @@ coverage>=3.6
 mock>=1.0
 testrepository>=0.0.17
 testtools>=0.9.32
+# ZooKeeper
+kazoo>=1.3.1
+zake>=0.0.7
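
A minimal usage sketch of the new backend follows, as a reviewer aid (it is not part of the patch). It mirrors what the zake-backed unit test exercises: with no "hosts" entry in the conf, ZkBackend falls back to zake's in-memory FakeClient, so it should run without a real ZooKeeper ensemble. The book/flow names and generated uuids are illustrative only.

    from taskflow.openstack.common import uuidutils
    from taskflow.persistence import logbook
    from taskflow.persistence.backends import impl_zookeeper

    # No "hosts" key -> an in-memory zake FakeClient is used instead of a
    # real kazoo client (see ZkBackend.__init__ above).
    backend = impl_zookeeper.ZkBackend({"path": "/taskflow"})
    conn = backend.get_connection()
    conn.upgrade()  # currently a no-op, mirrors the unit tests

    # Save a logbook containing one (empty) flow detail, then read it back.
    book = logbook.LogBook(name="example-book",
                           uuid=uuidutils.generate_uuid())
    book.add(logbook.FlowDetail(name="example-flow",
                                uuid=uuidutils.generate_uuid()))
    conn.save_logbook(book)

    fetched = conn.get_logbook(book.uuid)
    assert fetched.name == "example-book"
    assert len(fetched) == 1

    # Remove all created znodes and shut the client down.
    conn.clear_all()
    backend.close()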