From 3e8eb915b55dfd6577a9cdb313afc71fd6d44b1a Mon Sep 17 00:00:00 2001 From: Dan Krause Date: Tue, 3 Mar 2015 10:57:09 -0600 Subject: [PATCH] Persistence backend refactor Factors lots of duplicate code out of persistence backends Adds get_flows_for_book to all backends Change-Id: I0434bd4931cd9274876f9e9c92909531f244bcac --- taskflow/persistence/backends/impl_dir.py | 404 +++--------------- taskflow/persistence/backends/impl_memory.py | 293 ++++--------- .../persistence/backends/impl_sqlalchemy.py | 21 +- .../persistence/backends/impl_zookeeper.py | 390 +++-------------- taskflow/persistence/base.py | 5 + taskflow/persistence/path_based.py | 244 +++++++++++ 6 files changed, 474 insertions(+), 883 deletions(-) create mode 100644 taskflow/persistence/path_based.py diff --git a/taskflow/persistence/backends/impl_dir.py b/taskflow/persistence/backends/impl_dir.py index 155ffe41..5ee244f7 100644 --- a/taskflow/persistence/backends/impl_dir.py +++ b/taskflow/persistence/backends/impl_dir.py @@ -15,6 +15,7 @@ # License for the specific language governing permissions and limitations # under the License. +import contextlib import errno import os import shutil @@ -23,25 +24,26 @@ from oslo_serialization import jsonutils import six from taskflow import exceptions as exc -from taskflow import logging -from taskflow.persistence import base -from taskflow.persistence import logbook +from taskflow.persistence import path_based from taskflow.utils import lock_utils from taskflow.utils import misc -LOG = logging.getLogger(__name__) + +@contextlib.contextmanager +def _storagefailure_wrapper(): + try: + yield + except exc.TaskFlowException: + raise + except Exception as e: + if isinstance(e, (IOError, OSError)) and e.errno == errno.ENOENT: + raise exc.NotFound('Item not found: %s' % e.filename, e) + raise exc.StorageFailure("Storage backend internal error", e) -class DirBackend(base.Backend): +class DirBackend(path_based.PathBasedBackend): """A directory and file based backend. - This backend writes logbooks, flow details, and atom details to a provided - base path on the local filesystem. It will create and store those objects - in three key directories (one for logbooks, one for flow details and one - for atom details). It creates those associated directories and then - creates files inside those directories that represent the contents of those - objects for later reading and writing. - This backend does *not* provide true transactional semantics. It does guarantee that there will be no interprocess race conditions when writing and reading by using a consistent hierarchy of file based locks. @@ -54,17 +56,10 @@ class DirBackend(base.Backend): """ def __init__(self, conf): super(DirBackend, self).__init__(conf) - self._path = os.path.abspath(conf['path']) - self._lock_path = os.path.join(self._path, 'locks') - self._file_cache = {} - - @property - def lock_path(self): - return self._lock_path - - @property - def base_path(self): - return self._path + self.file_cache = {} + if not self._path: + raise ValueError("Empty path is disallowed") + self._path = os.path.abspath(self._path) def get_connection(self): return Connection(self) @@ -73,33 +68,13 @@ class DirBackend(base.Backend): pass -class Connection(base.Connection): - def __init__(self, backend): - self._backend = backend - self._file_cache = self._backend._file_cache - self._flow_path = os.path.join(self._backend.base_path, 'flows') - self._atom_path = os.path.join(self._backend.base_path, 'atoms') - self._book_path = os.path.join(self._backend.base_path, 'books') - - def validate(self): - # Verify key paths exist. - paths = [ - self._backend.base_path, - self._backend.lock_path, - self._flow_path, - self._atom_path, - self._book_path, - ] - for p in paths: - if not os.path.isdir(p): - raise RuntimeError("Missing required directory: %s" % (p)) - +class Connection(path_based.PathBasedConnection): def _read_from(self, filename): # This is very similar to the oslo-incubator fileutils module, but # tweaked to not depend on a global cache, as well as tweaked to not # pull-in the oslo logging module (which is a huge pile of code). mtime = os.path.getmtime(filename) - cache_info = self._file_cache.setdefault(filename, {}) + cache_info = self.backend.file_cache.setdefault(filename, {}) if not cache_info or mtime > cache_info.get('mtime', 0): with open(filename, 'rb') as fp: cache_info['data'] = fp.read().decode('utf-8') @@ -111,301 +86,56 @@ class Connection(base.Connection): contents = contents.encode('utf-8') with open(filename, 'wb') as fp: fp.write(contents) - self._file_cache.pop(filename, None) + self.backend.file_cache.pop(filename, None) - def _run_with_process_lock(self, lock_name, functor, *args, **kwargs): - lock_path = os.path.join(self.backend.lock_path, lock_name) - with lock_utils.InterProcessLock(lock_path): + @contextlib.contextmanager + def _path_lock(self, path): + lockfile = self._join_path(path, 'lock') + with lock_utils.InterProcessLock(lockfile) as lock: + with _storagefailure_wrapper(): + yield lock + + def _join_path(self, *parts): + return os.path.join(*parts) + + def _get_item(self, path): + with self._path_lock(path): + item_path = self._join_path(path, 'metadata') + return misc.decode_json(self._read_from(item_path)) + + def _set_item(self, path, value, transaction): + with self._path_lock(path): + item_path = self._join_path(path, 'metadata') + self._write_to(item_path, jsonutils.dumps(value)) + + def _del_tree(self, path, transaction): + with self._path_lock(path): + shutil.rmtree(path) + + def _get_children(self, path): + with _storagefailure_wrapper(): + return [link for link in os.listdir(path) + if os.path.islink(self._join_path(path, link))] + + def _ensure_path(self, path): + with _storagefailure_wrapper(): + misc.ensure_tree(path) + + def _create_link(self, src_path, dest_path, transaction): + with _storagefailure_wrapper(): try: - return functor(*args, **kwargs) - except exc.TaskFlowException: - raise - except Exception as e: - LOG.exception("Failed running locking file based session") - # NOTE(harlowja): trap all other errors as storage errors. - raise exc.StorageFailure("Storage backend internal error", e) - - def _get_logbooks(self): - lb_uuids = [] - try: - lb_uuids = [d for d in os.listdir(self._book_path) - if os.path.isdir(os.path.join(self._book_path, d))] - except EnvironmentError as e: - if e.errno != errno.ENOENT: - raise - for lb_uuid in lb_uuids: - try: - yield self._get_logbook(lb_uuid) - except exc.NotFound: - pass - - def get_logbooks(self): - try: - books = list(self._get_logbooks()) - except EnvironmentError as e: - raise exc.StorageFailure("Unable to fetch logbooks", e) - else: - for b in books: - yield b - - @property - def backend(self): - return self._backend - - def close(self): - pass - - def _save_atom_details(self, atom_detail, ignore_missing): - # See if we have an existing atom detail to merge with. - e_ad = None - try: - e_ad = self._get_atom_details(atom_detail.uuid, lock=False) - except EnvironmentError: - if not ignore_missing: - raise exc.NotFound("No atom details found with id: %s" - % atom_detail.uuid) - if e_ad is not None: - atom_detail = e_ad.merge(atom_detail) - ad_path = os.path.join(self._atom_path, atom_detail.uuid) - ad_data = base._format_atom(atom_detail) - self._write_to(ad_path, jsonutils.dumps(ad_data)) - return atom_detail - - def update_atom_details(self, atom_detail): - return self._run_with_process_lock("atom", - self._save_atom_details, - atom_detail, - ignore_missing=False) - - def _get_atom_details(self, uuid, lock=True): - - def _get(): - ad_path = os.path.join(self._atom_path, uuid) - ad_data = misc.decode_json(self._read_from(ad_path)) - ad_cls = logbook.atom_detail_class(ad_data['type']) - return ad_cls.from_dict(ad_data['atom']) - - if lock: - return self._run_with_process_lock('atom', _get) - else: - return _get() - - def _get_flow_details(self, uuid, lock=True): - - def _get(): - fd_path = os.path.join(self._flow_path, uuid) - meta_path = os.path.join(fd_path, 'metadata') - meta = misc.decode_json(self._read_from(meta_path)) - fd = logbook.FlowDetail.from_dict(meta) - ad_to_load = [] - ad_path = os.path.join(fd_path, 'atoms') - try: - ad_to_load = [f for f in os.listdir(ad_path) - if os.path.islink(os.path.join(ad_path, f))] - except EnvironmentError as e: - if e.errno != errno.ENOENT: - raise - for ad_uuid in ad_to_load: - fd.add(self._get_atom_details(ad_uuid)) - return fd - - if lock: - return self._run_with_process_lock('flow', _get) - else: - return _get() - - def _save_atoms_and_link(self, atom_details, local_atom_path): - for atom_detail in atom_details: - self._save_atom_details(atom_detail, ignore_missing=True) - src_ad_path = os.path.join(self._atom_path, atom_detail.uuid) - target_ad_path = os.path.join(local_atom_path, atom_detail.uuid) - try: - os.symlink(src_ad_path, target_ad_path) - except EnvironmentError as e: + os.symlink(src_path, dest_path) + except OSError as e: if e.errno != errno.EEXIST: raise - def _save_flow_details(self, flow_detail, ignore_missing): - # See if we have an existing flow detail to merge with. - e_fd = None - try: - e_fd = self._get_flow_details(flow_detail.uuid, lock=False) - except EnvironmentError: - if not ignore_missing: - raise exc.NotFound("No flow details found with id: %s" - % flow_detail.uuid) - if e_fd is not None: - e_fd = e_fd.merge(flow_detail) - for ad in flow_detail: - if e_fd.find(ad.uuid) is None: - e_fd.add(ad) - flow_detail = e_fd - flow_path = os.path.join(self._flow_path, flow_detail.uuid) - misc.ensure_tree(flow_path) - self._write_to(os.path.join(flow_path, 'metadata'), - jsonutils.dumps(flow_detail.to_dict())) - if len(flow_detail): - atom_path = os.path.join(flow_path, 'atoms') - misc.ensure_tree(atom_path) - self._run_with_process_lock('atom', - self._save_atoms_and_link, - list(flow_detail), atom_path) - return flow_detail + @contextlib.contextmanager + def _transaction(self): + """This backend doesn't support transactions""" + yield - def update_flow_details(self, flow_detail): - return self._run_with_process_lock("flow", - self._save_flow_details, - flow_detail, - ignore_missing=False) - - def _save_flows_and_link(self, flow_details, local_flow_path): - for flow_detail in flow_details: - self._save_flow_details(flow_detail, ignore_missing=True) - src_fd_path = os.path.join(self._flow_path, flow_detail.uuid) - target_fd_path = os.path.join(local_flow_path, flow_detail.uuid) - try: - os.symlink(src_fd_path, target_fd_path) - except EnvironmentError as e: - if e.errno != errno.EEXIST: - raise - - def _save_logbook(self, book): - # See if we have an existing logbook to merge with. - e_lb = None - try: - e_lb = self._get_logbook(book.uuid) - except exc.NotFound: - pass - if e_lb is not None: - e_lb = e_lb.merge(book) - for fd in book: - if e_lb.find(fd.uuid) is None: - e_lb.add(fd) - book = e_lb - book_path = os.path.join(self._book_path, book.uuid) - misc.ensure_tree(book_path) - self._write_to(os.path.join(book_path, 'metadata'), - jsonutils.dumps(book.to_dict(marshal_time=True))) - if len(book): - flow_path = os.path.join(book_path, 'flows') - misc.ensure_tree(flow_path) - self._run_with_process_lock('flow', - self._save_flows_and_link, - list(book), flow_path) - return book - - def save_logbook(self, book): - return self._run_with_process_lock("book", - self._save_logbook, book) - - def upgrade(self): - - def _step_create(): - for path in (self._book_path, self._flow_path, self._atom_path): - try: - misc.ensure_tree(path) - except EnvironmentError as e: - raise exc.StorageFailure("Unable to create logbooks" - " required child path %s" % path, - e) - - for path in (self._backend.base_path, self._backend.lock_path): - try: - misc.ensure_tree(path) - except EnvironmentError as e: - raise exc.StorageFailure("Unable to create logbooks required" - " path %s" % path, e) - - self._run_with_process_lock("init", _step_create) - - def clear_all(self): - - def _step_clear(): - for d in (self._book_path, self._flow_path, self._atom_path): - if os.path.isdir(d): - shutil.rmtree(d) - - def _step_atom(): - self._run_with_process_lock("atom", _step_clear) - - def _step_flow(): - self._run_with_process_lock("flow", _step_atom) - - def _step_book(): - self._run_with_process_lock("book", _step_flow) - - # Acquire all locks by going through this little hierarchy. - self._run_with_process_lock("init", _step_book) - - def destroy_logbook(self, book_uuid): - - def _destroy_atoms(atom_details): - for atom_detail in atom_details: - atom_path = os.path.join(self._atom_path, atom_detail.uuid) - try: - shutil.rmtree(atom_path) - except EnvironmentError as e: - if e.errno != errno.ENOENT: - raise exc.StorageFailure("Unable to remove atom" - " directory %s" % atom_path, - e) - - def _destroy_flows(flow_details): - for flow_detail in flow_details: - flow_path = os.path.join(self._flow_path, flow_detail.uuid) - self._run_with_process_lock("atom", _destroy_atoms, - list(flow_detail)) - try: - shutil.rmtree(flow_path) - except EnvironmentError as e: - if e.errno != errno.ENOENT: - raise exc.StorageFailure("Unable to remove flow" - " directory %s" % flow_path, - e) - - def _destroy_book(): - book = self._get_logbook(book_uuid) - book_path = os.path.join(self._book_path, book.uuid) - self._run_with_process_lock("flow", _destroy_flows, list(book)) - try: - shutil.rmtree(book_path) - except EnvironmentError as e: - if e.errno != errno.ENOENT: - raise exc.StorageFailure("Unable to remove book" - " directory %s" % book_path, e) - - # Acquire all locks by going through this little hierarchy. - self._run_with_process_lock("book", _destroy_book) - - def _get_logbook(self, book_uuid): - book_path = os.path.join(self._book_path, book_uuid) - meta_path = os.path.join(book_path, 'metadata') - try: - meta = misc.decode_json(self._read_from(meta_path)) - except EnvironmentError as e: - if e.errno == errno.ENOENT: - raise exc.NotFound("No logbook found with id: %s" % book_uuid) - else: - raise - lb = logbook.LogBook.from_dict(meta, unmarshal_time=True) - fd_path = os.path.join(book_path, 'flows') - fd_uuids = [] - try: - fd_uuids = [f for f in os.listdir(fd_path) - if os.path.islink(os.path.join(fd_path, f))] - except EnvironmentError as e: - if e.errno != errno.ENOENT: - raise - for fd_uuid in fd_uuids: - lb.add(self._get_flow_details(fd_uuid)) - return lb - - def get_logbook(self, book_uuid): - return self._run_with_process_lock("book", - self._get_logbook, book_uuid) - - def get_flow_details(self, fd_uuid): - return self._get_flow_details(fd_uuid) - - def get_atom_details(self, ad_uuid): - return self._get_atom_details(ad_uuid) + def validate(self): + with _storagefailure_wrapper(): + for p in (self.flow_path, self.atom_path, self.book_path): + if not os.path.isdir(p): + raise RuntimeError("Missing required directory: %s" % (p)) diff --git a/taskflow/persistence/backends/impl_memory.py b/taskflow/persistence/backends/impl_memory.py index 020deff8..7fc22e6d 100644 --- a/taskflow/persistence/backends/impl_memory.py +++ b/taskflow/persistence/backends/impl_memory.py @@ -15,127 +15,32 @@ # License for the specific language governing permissions and limitations # under the License. -import functools - -import six +import contextlib +import copy +import os from taskflow import exceptions as exc -from taskflow import logging -from taskflow.persistence import base -from taskflow.persistence import logbook +from taskflow.persistence import path_based +from taskflow.types import tree from taskflow.utils import lock_utils -LOG = logging.getLogger(__name__) - -class _Memory(object): - """Where the data is really stored.""" - - def __init__(self): - self.log_books = {} - self.flow_details = {} - self.atom_details = {} - - def clear_all(self): - self.log_books.clear() - self.flow_details.clear() - self.atom_details.clear() - - -class _MemoryHelper(object): - """Helper functionality for the memory backends & connections.""" - - def __init__(self, memory): - self._memory = memory - - @staticmethod - def _fetch_clone_args(incoming): - if isinstance(incoming, (logbook.LogBook, logbook.FlowDetail)): - # We keep our own copy of the added contents of the following - # types so we don't need the clone to retain them directly... - return { - 'retain_contents': False, - } - return {} - - def construct(self, uuid, container): - """Reconstructs a object from the given uuid and storage container.""" - source = container[uuid] - clone_kwargs = self._fetch_clone_args(source) - clone = source['object'].copy(**clone_kwargs) - rebuilder = source.get('rebuilder') - if rebuilder: - for component in map(rebuilder, source['components']): - clone.add(component) - return clone - - def merge(self, incoming, saved_info=None): - """Merges the incoming object into the local memories copy.""" - if saved_info is None: - if isinstance(incoming, logbook.LogBook): - saved_info = self._memory.log_books.setdefault( - incoming.uuid, {}) - elif isinstance(incoming, logbook.FlowDetail): - saved_info = self._memory.flow_details.setdefault( - incoming.uuid, {}) - elif isinstance(incoming, logbook.AtomDetail): - saved_info = self._memory.atom_details.setdefault( - incoming.uuid, {}) - else: - raise TypeError("Unknown how to merge '%s' (%s)" - % (incoming, type(incoming))) - try: - saved_info['object'].merge(incoming) - except KeyError: - clone_kwargs = self._fetch_clone_args(incoming) - saved_info['object'] = incoming.copy(**clone_kwargs) - if isinstance(incoming, logbook.LogBook): - flow_details = saved_info.setdefault('components', set()) - if 'rebuilder' not in saved_info: - saved_info['rebuilder'] = functools.partial( - self.construct, container=self._memory.flow_details) - for flow_detail in incoming: - flow_details.add(self.merge(flow_detail)) - elif isinstance(incoming, logbook.FlowDetail): - atom_details = saved_info.setdefault('components', set()) - if 'rebuilder' not in saved_info: - saved_info['rebuilder'] = functools.partial( - self.construct, container=self._memory.atom_details) - for atom_detail in incoming: - atom_details.add(self.merge(atom_detail)) - return incoming.uuid - - -class MemoryBackend(base.Backend): +class MemoryBackend(path_based.PathBasedBackend): """A in-memory (non-persistent) backend. This backend writes logbooks, flow details, and atom details to in-memory dictionaries and retrieves from those dictionaries as needed. + + This backend does *not* provide true transactional semantics. It does + guarantee that there will be no inter-thread race conditions when + writing and reading by using a read/write locks. """ def __init__(self, conf=None): super(MemoryBackend, self).__init__(conf) - self._memory = _Memory() - self._helper = _MemoryHelper(self._memory) - self._lock = lock_utils.ReaderWriterLock() - - def _construct_from(self, container): - return dict((uuid, self._helper.construct(uuid, container)) - for uuid in six.iterkeys(container)) - - @property - def log_books(self): - with self._lock.read_lock(): - return self._construct_from(self._memory.log_books) - - @property - def flow_details(self): - with self._lock.read_lock(): - return self._construct_from(self._memory.flow_details) - - @property - def atom_details(self): - with self._lock.read_lock(): - return self._construct_from(self._memory.atom_details) + if self._path is None: + self._path = os.sep + self.memory = tree.Node(self._path) + self.lock = lock_utils.ReaderWriterLock() def get_connection(self): return Connection(self) @@ -144,107 +49,79 @@ class MemoryBackend(base.Backend): pass -class Connection(base.Connection): - """A connection to an in-memory backend.""" - +class Connection(path_based.PathBasedConnection): def __init__(self, backend): - self._backend = backend - self._helper = backend._helper - self._memory = backend._memory - self._lock = backend._lock + super(Connection, self).__init__(backend) + self.upgrade() - def upgrade(self): - pass + @contextlib.contextmanager + def _memory_lock(self, write=False): + if write: + lock = self.backend.lock.write_lock + else: + lock = self.backend.lock.read_lock + + with lock(): + try: + yield + except exc.TaskFlowException as e: + raise + except Exception as e: + raise exc.StorageFailure("Storage backend internal error", e) + + def _fetch_node(self, path): + node = self.backend.memory.find(path) + if node is None: + raise exc.NotFound("Item not found %s" % path) + return node + + def _join_path(self, *parts): + return os.path.join(*parts) + + def _get_item(self, path): + with self._memory_lock(): + return copy.deepcopy(self._fetch_node(path).metadata['value']) + + def _set_item(self, path, value, transaction): + value = copy.deepcopy(value) + try: + item_node = self._fetch_node(path) + item_node.metadata.update(value=value) + except exc.NotFound: + dirname, basename = os.path.split(path) + parent_node = self._fetch_node(dirname) + parent_node.add(tree.Node(path, name=basename, value=value)) + + def _del_tree(self, path, transaction): + node = self._fetch_node(path) + node.disassociate() + + def _get_children(self, path): + with self._memory_lock(): + return [node.metadata['name'] for node in self._fetch_node(path)] + + def _ensure_path(self, path): + with self._memory_lock(write=True): + path = os.path.normpath(path) + parts = path.split(os.sep) + node = self.backend.memory + for p in range(len(parts) - 1): + node_path = os.sep.join(parts[:p + 2]) + try: + node = self._fetch_node(node_path) + except exc.NotFound: + node.add(tree.Node(node_path, name=parts[p + 1])) + + def _create_link(self, src_path, dest_path, transaction): + dirname, basename = os.path.split(dest_path) + parent_node = self._fetch_node(dirname) + parent_node.add(tree.Node(dest_path, name=basename, target=src_path)) + + @contextlib.contextmanager + def _transaction(self): + """This just wraps a global write-lock""" + with self._memory_lock(write=True): + yield def validate(self): pass - - @property - def backend(self): - return self._backend - - def close(self): - pass - - def clear_all(self): - with self._lock.write_lock(): - self._memory.clear_all() - - def destroy_logbook(self, book_uuid): - with self._lock.write_lock(): - try: - # Do the same cascading delete that the sql layer does. - book_info = self._memory.log_books.pop(book_uuid) - except KeyError: - raise exc.NotFound("No logbook found with uuid '%s'" - % book_uuid) - else: - while book_info['components']: - flow_uuid = book_info['components'].pop() - flow_info = self._memory.flow_details.pop(flow_uuid) - while flow_info['components']: - atom_uuid = flow_info['components'].pop() - self._memory.atom_details.pop(atom_uuid) - - def update_atom_details(self, atom_detail): - with self._lock.write_lock(): - try: - atom_info = self._memory.atom_details[atom_detail.uuid] - return self._helper.construct( - self._helper.merge(atom_detail, saved_info=atom_info), - self._memory.atom_details) - except KeyError: - raise exc.NotFound("No atom details found with uuid '%s'" - % atom_detail.uuid) - - def update_flow_details(self, flow_detail): - with self._lock.write_lock(): - try: - flow_info = self._memory.flow_details[flow_detail.uuid] - return self._helper.construct( - self._helper.merge(flow_detail, saved_info=flow_info), - self._memory.flow_details) - except KeyError: - raise exc.NotFound("No flow details found with uuid '%s'" - % flow_detail.uuid) - - def save_logbook(self, book): - with self._lock.write_lock(): - return self._helper.construct(self._helper.merge(book), - self._memory.log_books) - - def get_logbook(self, book_uuid): - with self._lock.read_lock(): - try: - return self._helper.construct(book_uuid, - self._memory.log_books) - except KeyError: - raise exc.NotFound("No logbook found with uuid '%s'" - % book_uuid) - - def get_logbooks(self): - # Don't hold locks while iterating... - with self._lock.read_lock(): - book_uuids = set(six.iterkeys(self._memory.log_books)) - for book_uuid in book_uuids: - try: - with self._lock.read_lock(): - book = self._helper.construct(book_uuid, - self._memory.log_books) - yield book - except KeyError: - pass - - def get_flow_details(self, fd_uuid): - try: - with self._lock.read_lock(): - return self._memory.flow_details[fd_uuid] - except KeyError: - raise exc.NotFound("No flow details found '%s'" % fd_uuid) - - def get_atom_details(self, ad_uuid): - try: - with self._lock.read_lock(): - return self._memory.atom_details[ad_uuid] - except KeyError: - raise exc.NotFound("No atom details found '%s'" % ad_uuid) diff --git a/taskflow/persistence/backends/impl_sqlalchemy.py b/taskflow/persistence/backends/impl_sqlalchemy.py index a49d2492..4368b78a 100644 --- a/taskflow/persistence/backends/impl_sqlalchemy.py +++ b/taskflow/persistence/backends/impl_sqlalchemy.py @@ -202,25 +202,25 @@ class Alchemist(object): atom_cls = logbook.atom_detail_class(row.pop('atom_type')) return atom_cls.from_dict(row) - def _atom_query_iter(self, conn, parent_uuid): + def atom_query_iter(self, conn, parent_uuid): q = (sql.select([self._tables.atomdetails]). where(self._tables.atomdetails.c.parent_uuid == parent_uuid)) for row in conn.execute(q): yield self.convert_atom_detail(row) - def _flow_query_iter(self, conn, parent_uuid): + def flow_query_iter(self, conn, parent_uuid): q = (sql.select([self._tables.flowdetails]). where(self._tables.flowdetails.c.parent_uuid == parent_uuid)) for row in conn.execute(q): yield self.convert_flow_detail(row) def populate_book(self, conn, book): - for fd in self._flow_query_iter(conn, book.uuid): + for fd in self.flow_query_iter(conn, book.uuid): book.add(fd) self.populate_flow_detail(conn, fd) def populate_flow_detail(self, conn, fd): - for ad in self._atom_query_iter(conn, fd.uuid): + for ad in self.atom_query_iter(conn, fd.uuid): fd.add(ad) @@ -558,6 +558,19 @@ class Connection(base.Connection): for book in gathered: yield book + def get_flows_for_book(self, book_uuid): + gathered = [] + try: + with contextlib.closing(self._engine.connect()) as conn: + for row in self._converter.flow_query_iter(conn, book_uuid): + flow_details = self._converter.populate_flow_detail(conn, + row) + gathered.append(flow_details) + except sa_exc.DBAPIError as e: + raise exc.StorageFailure("Failed getting flow details", e) + for flow_details in gathered: + yield flow_details + def get_flow_details(self, fd_uuid): try: flowdetails = self._tables.flowdetails diff --git a/taskflow/persistence/backends/impl_zookeeper.py b/taskflow/persistence/backends/impl_zookeeper.py index ae8096f1..b60e93bf 100644 --- a/taskflow/persistence/backends/impl_zookeeper.py +++ b/taskflow/persistence/backends/impl_zookeeper.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- # Copyright (C) 2014 AT&T Labs All Rights Reserved. +# Copyright (C) 2015 Rackspace Hosting All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -21,30 +22,18 @@ from kazoo.protocol import paths from oslo_serialization import jsonutils from taskflow import exceptions as exc -from taskflow import logging -from taskflow.persistence import base -from taskflow.persistence import logbook +from taskflow.persistence import path_based from taskflow.utils import kazoo_utils as k_utils from taskflow.utils import misc -LOG = logging.getLogger(__name__) -# Transaction support was added in 3.4.0 MIN_ZK_VERSION = (3, 4, 0) -class ZkBackend(base.Backend): - """A zookeeper backend. - - This backend writes logbooks, flow details, and atom details to a provided - base path in zookeeper. It will create and store those objects in three - key directories (one for logbooks, one for flow details and one for atom - details). It creates those associated directories and then creates files - inside those directories that represent the contents of those objects for - later reading and writing. +class ZkBackend(path_based.PathBasedBackend): + """A zookeeper-backed backend. Example configuration:: - conf = { "hosts": "192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181", "path": "/taskflow", @@ -52,24 +41,18 @@ class ZkBackend(base.Backend): """ def __init__(self, conf, client=None): super(ZkBackend, self).__init__(conf) - path = str(conf.get("path", "/taskflow")) - if not path: - raise ValueError("Empty zookeeper path is disallowed") - if not paths.isabs(path): + if not self._path: + self._path = '/taskflow' + if not paths.isabs(self._path): raise ValueError("Zookeeper path must be absolute") - self._path = path if client is not None: self._client = client self._owned = False else: - self._client = k_utils.make_client(conf) + self._client = k_utils.make_client(self._conf) self._owned = True self._validated = False - @property - def path(self): - return self._path - def get_connection(self): conn = ZkConnection(self, self._client, self._conf) if not self._validated: @@ -87,52 +70,15 @@ class ZkBackend(base.Backend): raise exc.StorageFailure("Unable to finalize client", e) -class ZkConnection(base.Connection): +class ZkConnection(path_based.PathBasedConnection): def __init__(self, backend, client, conf): - self._backend = backend - self._client = client + super(ZkConnection, self).__init__(backend) self._conf = conf - self._book_path = paths.join(self._backend.path, "books") - self._flow_path = paths.join(self._backend.path, "flow_details") - self._atom_path = paths.join(self._backend.path, "atom_details") + self._client = client with self._exc_wrapper(): # NOOP if already started. self._client.start() - def validate(self): - with self._exc_wrapper(): - try: - if self._conf.get('check_compatible', True): - k_utils.check_compatible(self._client, MIN_ZK_VERSION) - except exc.IncompatibleVersion as e: - raise exc.StorageFailure("Backend storage is not a" - " compatible version", e) - - @property - def backend(self): - return self._backend - - @property - def book_path(self): - return self._book_path - - @property - def flow_path(self): - return self._flow_path - - @property - def atom_path(self): - return self._atom_path - - def close(self): - pass - - def upgrade(self): - """Creates the initial paths (if they already don't exist).""" - with self._exc_wrapper(): - for path in (self.book_path, self.flow_path, self.atom_path): - self._client.ensure_path(path) - @contextlib.contextmanager def _exc_wrapper(self): """Exception context-manager which wraps kazoo exceptions. @@ -146,8 +92,7 @@ class ZkConnection(base.Connection): except self._client.handler.timeout_exception as e: raise exc.StorageFailure("Storage backend timeout", e) except k_exc.SessionExpiredError as e: - raise exc.StorageFailure("Storage backend session" - " has expired", e) + raise exc.StorageFailure("Storage backend session has expired", e) except k_exc.NoNodeError as e: raise exc.NotFound("Storage backend node not found: %s" % e) except k_exc.NodeExistsError as e: @@ -155,273 +100,50 @@ class ZkConnection(base.Connection): except (k_exc.KazooException, k_exc.ZookeeperError) as e: raise exc.StorageFailure("Storage backend internal error", e) - def update_atom_details(self, ad): - """Update a atom detail transactionally.""" - with self._exc_wrapper(): - txn = self._client.transaction() - ad = self._update_atom_details(ad, txn) - k_utils.checked_commit(txn) - return ad + def _join_path(self, *parts): + return paths.join(*parts) - def _update_atom_details(self, ad, txn, create_missing=False): - # Determine whether the desired data exists or not. - ad_path = paths.join(self.atom_path, ad.uuid) - e_ad = None - try: - ad_data, _zstat = self._client.get(ad_path) - except k_exc.NoNodeError: - # Not-existent: create or raise exception. - if not create_missing: - raise exc.NotFound("No atom details found with" - " id: %s" % ad.uuid) - else: - txn.create(ad_path) + def _get_item(self, path): + with self._exc_wrapper(): + data, _ = self._client.get(path) + return misc.decode_json(data) + + def _set_item(self, path, value, transaction): + data = misc.binary_encode(jsonutils.dumps(value)) + if not self._client.exists(path): + transaction.create(path, data) else: - # Existent: read it out. + transaction.set_data(path, data) + + def _del_tree(self, path, transaction): + for child in self._get_children(path): + self._del_tree(self._join_path(path, child), transaction) + transaction.delete(path) + + def _get_children(self, path): + with self._exc_wrapper(): + return self._client.get_children(path) + + def _ensure_path(self, path): + with self._exc_wrapper(): + self._client.ensure_path(path) + + def _create_link(self, src_path, dest_path, transaction): + if not self._client.exists(dest_path): + transaction.create(dest_path) + + @contextlib.contextmanager + def _transaction(self): + transaction = self._client.transaction() + with self._exc_wrapper(): + yield transaction + k_utils.checked_commit(transaction) + + def validate(self): + with self._exc_wrapper(): try: - ad_data = misc.decode_json(ad_data) - ad_cls = logbook.atom_detail_class(ad_data['type']) - e_ad = ad_cls.from_dict(ad_data['atom']) - except KeyError: - pass - - # Update and write it back - if e_ad: - e_ad = e_ad.merge(ad) - else: - e_ad = ad - ad_data = base._format_atom(e_ad) - txn.set_data(ad_path, - misc.binary_encode(jsonutils.dumps(ad_data))) - return e_ad - - def get_atom_details(self, ad_uuid): - """Read a atom detail. - - *Read-only*, so no need of zk transaction. - """ - with self._exc_wrapper(): - return self._get_atom_details(ad_uuid) - - def _get_atom_details(self, ad_uuid): - ad_path = paths.join(self.atom_path, ad_uuid) - try: - ad_data, _zstat = self._client.get(ad_path) - except k_exc.NoNodeError: - raise exc.NotFound("No atom details found with id: %s" % ad_uuid) - else: - ad_data = misc.decode_json(ad_data) - ad_cls = logbook.atom_detail_class(ad_data['type']) - return ad_cls.from_dict(ad_data['atom']) - - def update_flow_details(self, fd): - """Update a flow detail transactionally.""" - with self._exc_wrapper(): - txn = self._client.transaction() - fd = self._update_flow_details(fd, txn) - k_utils.checked_commit(txn) - return fd - - def _update_flow_details(self, fd, txn, create_missing=False): - # Determine whether the desired data exists or not - fd_path = paths.join(self.flow_path, fd.uuid) - try: - fd_data, _zstat = self._client.get(fd_path) - except k_exc.NoNodeError: - # Not-existent: create or raise exception - if create_missing: - txn.create(fd_path) - e_fd = logbook.FlowDetail(name=fd.name, uuid=fd.uuid) - else: - raise exc.NotFound("No flow details found with id: %s" - % fd.uuid) - else: - # Existent: read it out - e_fd = logbook.FlowDetail.from_dict(misc.decode_json(fd_data)) - - # Update and write it back - e_fd = e_fd.merge(fd) - fd_data = e_fd.to_dict() - txn.set_data(fd_path, misc.binary_encode(jsonutils.dumps(fd_data))) - for ad in fd: - ad_path = paths.join(fd_path, ad.uuid) - # NOTE(harlowja): create an entry in the flow detail path - # for the provided atom detail so that a reference exists - # from the flow detail to its atom details. - if not self._client.exists(ad_path): - txn.create(ad_path) - e_fd.add(self._update_atom_details(ad, txn, create_missing=True)) - return e_fd - - def get_flow_details(self, fd_uuid): - """Read a flow detail. - - *Read-only*, so no need of zk transaction. - """ - with self._exc_wrapper(): - return self._get_flow_details(fd_uuid) - - def _get_flow_details(self, fd_uuid): - fd_path = paths.join(self.flow_path, fd_uuid) - try: - fd_data, _zstat = self._client.get(fd_path) - except k_exc.NoNodeError: - raise exc.NotFound("No flow details found with id: %s" % fd_uuid) - - fd = logbook.FlowDetail.from_dict(misc.decode_json(fd_data)) - for ad_uuid in self._client.get_children(fd_path): - fd.add(self._get_atom_details(ad_uuid)) - return fd - - def save_logbook(self, lb): - """Save (update) a log_book transactionally.""" - - def _create_logbook(lb_path, txn): - lb_data = lb.to_dict(marshal_time=True) - txn.create(lb_path, misc.binary_encode(jsonutils.dumps(lb_data))) - for fd in lb: - # NOTE(harlowja): create an entry in the logbook path - # for the provided flow detail so that a reference exists - # from the logbook to its flow details. - txn.create(paths.join(lb_path, fd.uuid)) - fd_path = paths.join(self.flow_path, fd.uuid) - fd_data = jsonutils.dumps(fd.to_dict()) - txn.create(fd_path, misc.binary_encode(fd_data)) - for ad in fd: - # NOTE(harlowja): create an entry in the flow detail path - # for the provided atom detail so that a reference exists - # from the flow detail to its atom details. - txn.create(paths.join(fd_path, ad.uuid)) - ad_path = paths.join(self.atom_path, ad.uuid) - ad_data = base._format_atom(ad) - txn.create(ad_path, - misc.binary_encode(jsonutils.dumps(ad_data))) - return lb - - def _update_logbook(lb_path, lb_data, txn): - e_lb = logbook.LogBook.from_dict(misc.decode_json(lb_data), - unmarshal_time=True) - e_lb = e_lb.merge(lb) - lb_data = e_lb.to_dict(marshal_time=True) - txn.set_data(lb_path, misc.binary_encode(jsonutils.dumps(lb_data))) - for fd in lb: - fd_path = paths.join(lb_path, fd.uuid) - if not self._client.exists(fd_path): - # NOTE(harlowja): create an entry in the logbook path - # for the provided flow detail so that a reference exists - # from the logbook to its flow details. - txn.create(fd_path) - e_fd = self._update_flow_details(fd, txn, create_missing=True) - e_lb.add(e_fd) - return e_lb - - with self._exc_wrapper(): - txn = self._client.transaction() - # Determine whether the desired data exists or not. - lb_path = paths.join(self.book_path, lb.uuid) - try: - lb_data, _zstat = self._client.get(lb_path) - except k_exc.NoNodeError: - # Create a new logbook since it doesn't exist. - e_lb = _create_logbook(lb_path, txn) - else: - # Otherwise update the existing logbook instead. - e_lb = _update_logbook(lb_path, lb_data, txn) - k_utils.checked_commit(txn) - return e_lb - - def _get_logbook(self, lb_uuid): - lb_path = paths.join(self.book_path, lb_uuid) - try: - lb_data, _zstat = self._client.get(lb_path) - except k_exc.NoNodeError: - raise exc.NotFound("No logbook found with id: %s" % lb_uuid) - else: - lb = logbook.LogBook.from_dict(misc.decode_json(lb_data), - unmarshal_time=True) - for fd_uuid in self._client.get_children(lb_path): - lb.add(self._get_flow_details(fd_uuid)) - return lb - - def get_logbook(self, lb_uuid): - """Read a logbook. - - *Read-only*, so no need of zk transaction. - """ - with self._exc_wrapper(): - return self._get_logbook(lb_uuid) - - def get_logbooks(self): - """Read all logbooks. - - *Read-only*, so no need of zk transaction. - """ - with self._exc_wrapper(): - for lb_uuid in self._client.get_children(self.book_path): - yield self._get_logbook(lb_uuid) - - def destroy_logbook(self, lb_uuid): - """Destroy (delete) a log_book transactionally.""" - - def _destroy_atom_details(ad_uuid, txn): - ad_path = paths.join(self.atom_path, ad_uuid) - if not self._client.exists(ad_path): - raise exc.NotFound("No atom details found with id: %s" - % ad_uuid) - txn.delete(ad_path) - - def _destroy_flow_details(fd_uuid, txn): - fd_path = paths.join(self.flow_path, fd_uuid) - if not self._client.exists(fd_path): - raise exc.NotFound("No flow details found with id: %s" - % fd_uuid) - for ad_uuid in self._client.get_children(fd_path): - _destroy_atom_details(ad_uuid, txn) - txn.delete(paths.join(fd_path, ad_uuid)) - txn.delete(fd_path) - - def _destroy_logbook(lb_uuid, txn): - lb_path = paths.join(self.book_path, lb_uuid) - if not self._client.exists(lb_path): - raise exc.NotFound("No logbook found with id: %s" % lb_uuid) - for fd_uuid in self._client.get_children(lb_path): - _destroy_flow_details(fd_uuid, txn) - txn.delete(paths.join(lb_path, fd_uuid)) - txn.delete(lb_path) - - with self._exc_wrapper(): - txn = self._client.transaction() - _destroy_logbook(lb_uuid, txn) - k_utils.checked_commit(txn) - - def clear_all(self, delete_dirs=True): - """Delete all data transactionally.""" - with self._exc_wrapper(): - txn = self._client.transaction() - - # Delete all data under logbook path. - for lb_uuid in self._client.get_children(self.book_path): - lb_path = paths.join(self.book_path, lb_uuid) - for fd_uuid in self._client.get_children(lb_path): - txn.delete(paths.join(lb_path, fd_uuid)) - txn.delete(lb_path) - - # Delete all data under flow detail path. - for fd_uuid in self._client.get_children(self.flow_path): - fd_path = paths.join(self.flow_path, fd_uuid) - for ad_uuid in self._client.get_children(fd_path): - txn.delete(paths.join(fd_path, ad_uuid)) - txn.delete(fd_path) - - # Delete all data under atom detail path. - for ad_uuid in self._client.get_children(self.atom_path): - ad_path = paths.join(self.atom_path, ad_uuid) - txn.delete(ad_path) - - # Delete containing directories. - if delete_dirs: - txn.delete(self.book_path) - txn.delete(self.atom_path) - txn.delete(self.flow_path) - - k_utils.checked_commit(txn) + if self._conf.get('check_compatible', True): + k_utils.check_compatible(self._client, MIN_ZK_VERSION) + except exc.IncompatibleVersion as e: + raise exc.StorageFailure("Backend storage is not a" + " compatible version", e) diff --git a/taskflow/persistence/base.py b/taskflow/persistence/base.py index 0ce09259..a1f120df 100644 --- a/taskflow/persistence/base.py +++ b/taskflow/persistence/base.py @@ -118,6 +118,11 @@ class Connection(object): """Return an iterable of logbook objects.""" pass + @abc.abstractmethod + def get_flows_for_book(self, book_uuid): + """Return an iterable of flowdetails for a given logbook uuid.""" + pass + @abc.abstractmethod def get_flow_details(self, fd_uuid): """Fetches a flowdetails object matching the given uuid.""" diff --git a/taskflow/persistence/path_based.py b/taskflow/persistence/path_based.py new file mode 100644 index 00000000..ea080257 --- /dev/null +++ b/taskflow/persistence/path_based.py @@ -0,0 +1,244 @@ +# -*- coding: utf-8 -*- + +# Copyright (C) 2015 Rackspace Hosting All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import abc +import six + +from taskflow import exceptions as exc +from taskflow.persistence import base +from taskflow.persistence import logbook + + +@six.add_metaclass(abc.ABCMeta) +class PathBasedBackend(base.Backend): + """Base class for persistence backends that address data by path + + Subclasses of this backend write logbooks, flow details, and atom details + to a provided base path in some filesystem-like storage. They will create + and store those objects in three key directories (one for logbooks, one + for flow details and one for atom details). They create those associated + directories and then create files inside those directories that represent + the contents of those objects for later reading and writing. + """ + + def __init__(self, conf): + super(PathBasedBackend, self).__init__(conf) + if conf is None: + conf = {} + self._path = conf.get('path', None) + + @property + def path(self): + return self._path + + +@six.add_metaclass(abc.ABCMeta) +class PathBasedConnection(base.Connection): + def __init__(self, backend): + self._backend = backend + self._book_path = self._join_path(backend.path, "books") + self._flow_path = self._join_path(backend.path, "flow_details") + self._atom_path = self._join_path(backend.path, "atom_details") + + @staticmethod + def _serialize(obj): + if isinstance(obj, logbook.LogBook): + return obj.to_dict(marshal_time=True) + elif isinstance(obj, logbook.FlowDetail): + return obj.to_dict() + elif isinstance(obj, logbook.AtomDetail): + return base._format_atom(obj) + else: + raise exc.StorageFailure("Invalid storage class %s" % type(obj)) + + @staticmethod + def _deserialize(cls, data): + if issubclass(cls, logbook.LogBook): + return cls.from_dict(data, unmarshal_time=True) + elif issubclass(cls, logbook.FlowDetail): + return cls.from_dict(data) + elif issubclass(cls, logbook.AtomDetail): + atom_class = logbook.atom_detail_class(data['type']) + return atom_class.from_dict(data['atom']) + else: + raise exc.StorageFailure("Invalid storage class %s" % cls) + + @property + def backend(self): + return self._backend + + @property + def book_path(self): + return self._book_path + + @property + def flow_path(self): + return self._flow_path + + @property + def atom_path(self): + return self._atom_path + + @abc.abstractmethod + def _join_path(self, *parts): + """Accept path parts, and return a joined path""" + + @abc.abstractmethod + def _get_item(self, path): + """Fetch a single item from the backend""" + + @abc.abstractmethod + def _set_item(self, path, value, transaction): + """Write a single item to the backend""" + + @abc.abstractmethod + def _del_tree(self, path, transaction): + """Recursively deletes a folder from the backend.""" + + @abc.abstractmethod + def _get_children(self, path): + """Get a list of child items of a path""" + + @abc.abstractmethod + def _ensure_path(self, path): + """Recursively ensure that a path (folder) in the backend exists""" + + @abc.abstractmethod + def _create_link(self, src_path, dest_path, transaction): + """Create a symlink-like link between two paths""" + + @abc.abstractmethod + def _transaction(self): + """Context manager that yields a transaction""" + + def _get_obj_path(self, obj): + if isinstance(obj, logbook.LogBook): + path = self.book_path + elif isinstance(obj, logbook.FlowDetail): + path = self.flow_path + elif isinstance(obj, logbook.AtomDetail): + path = self.atom_path + else: + raise exc.StorageFailure("Invalid storage class %s" % type(obj)) + return self._join_path(path, obj.uuid) + + def _update_object(self, obj, transaction, ignore_missing=False): + path = self._get_obj_path(obj) + try: + item_data = self._get_item(path) + existing_obj = self._deserialize(type(obj), item_data) + obj = existing_obj.merge(obj) + except exc.NotFound: + if not ignore_missing: + raise + self._set_item(path, self._serialize(obj), transaction) + return obj + + def get_logbooks(self, lazy=False): + for book_uuid in self._get_children(self.book_path): + yield self.get_logbook(book_uuid, lazy) + + def get_logbook(self, book_uuid, lazy=False): + book_path = self._join_path(self.book_path, book_uuid) + book_data = self._get_item(book_path) + book = self._deserialize(logbook.LogBook, book_data) + if not lazy: + for flow_details in self.get_flows_for_book(book_uuid): + book.add(flow_details) + return book + + def save_logbook(self, book): + book_path = self._get_obj_path(book) + with self._transaction() as transaction: + self._update_object(book, transaction, ignore_missing=True) + for flow_details in book: + flow_path = self._get_obj_path(flow_details) + link_path = self._join_path(book_path, flow_details.uuid) + self._do_update_flow_details(flow_details, transaction, + ignore_missing=True) + self._create_link(flow_path, link_path, transaction) + return book + + def get_flows_for_book(self, book_uuid, lazy=False): + book_path = self._join_path(self.book_path, book_uuid) + for flow_uuid in self._get_children(book_path): + yield self.get_flow_details(flow_uuid, lazy) + + def get_flow_details(self, flow_uuid, lazy=False): + flow_path = self._join_path(self.flow_path, flow_uuid) + flow_data = self._get_item(flow_path) + flow_details = self._deserialize(logbook.FlowDetail, flow_data) + if not lazy: + for atom_details in self.get_atoms_for_flow(flow_uuid): + flow_details.add(atom_details) + return flow_details + + def _do_update_flow_details(self, flow_detail, transaction, + ignore_missing=False): + flow_path = self._get_obj_path(flow_detail) + self._update_object(flow_detail, transaction, ignore_missing) + for atom_details in flow_detail: + atom_path = self._get_obj_path(atom_details) + link_path = self._join_path(flow_path, atom_details.uuid) + self._create_link(atom_path, link_path, transaction) + self._update_object(atom_details, transaction, ignore_missing=True) + return flow_detail + + def update_flow_details(self, flow_detail, ignore_missing=False): + with self._transaction() as transaction: + return self._do_update_flow_details(flow_detail, transaction, + ignore_missing) + + def get_atoms_for_flow(self, flow_uuid): + flow_path = self._join_path(self.flow_path, flow_uuid) + for atom_uuid in self._get_children(flow_path): + yield self.get_atom_details(atom_uuid) + + def get_atom_details(self, atom_uuid): + atom_path = self._join_path(self.atom_path, atom_uuid) + atom_data = self._get_item(atom_path) + return self._deserialize(logbook.AtomDetail, atom_data) + + def update_atom_details(self, atom_detail, ignore_missing=False): + with self._transaction() as transaction: + return self._update_object(atom_detail, transaction, + ignore_missing) + + def _do_destroy_logbook(self, book_uuid, transaction): + book_path = self._join_path(self.book_path, book_uuid) + for flow_uuid in self._get_children(book_path): + flow_path = self._join_path(self.flow_path, flow_uuid) + for atom_uuid in self._get_children(flow_path): + atom_path = self._join_path(self.atom_path, atom_uuid) + self._del_tree(atom_path, transaction) + self._del_tree(flow_path, transaction) + self._del_tree(book_path, transaction) + + def destroy_logbook(self, book_uuid): + with self._transaction() as transaction: + return self._do_destroy_logbook(book_uuid, transaction) + + def clear_all(self): + with self._transaction() as transaction: + for path in (self.book_path, self.flow_path, self.atom_path): + self._del_tree(path, transaction) + + def upgrade(self): + for path in (self.book_path, self.flow_path, self.atom_path): + self._ensure_path(path) + + def close(self): + pass