Persistence backend refactor

Factors lots of duplicate code out of persistence backends
Adds get_flows_for_book to all backends

Change-Id: I0434bd4931cd9274876f9e9c92909531f244bcac
Author: Dan Krause
Date: 2015-03-03 10:57:09 -06:00
parent 00ab6289ab
commit 3e8eb915b5
6 changed files with 474 additions and 883 deletions
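
For illustration, a minimal sketch (not part of this change) of how the new get_flows_for_book call can be driven through a backend connection; the memory:// configuration and the book uuid are placeholders, and backends.fetch is the usual helper for building a backend from a conf dict:

    import contextlib

    from taskflow.persistence import backends

    backend = backends.fetch({'connection': 'memory://'})  # placeholder conf
    with contextlib.closing(backend.get_connection()) as conn:
        conn.upgrade()
        book_uuid = '...'  # uuid of a previously saved logbook (placeholder)
        for flow_detail in conn.get_flows_for_book(book_uuid):
            print(flow_detail.uuid)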


@@ -15,6 +15,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import errno
import os
import shutil
@@ -23,25 +24,26 @@ from oslo_serialization import jsonutils
import six
from taskflow import exceptions as exc
from taskflow import logging
from taskflow.persistence import base
from taskflow.persistence import logbook
from taskflow.persistence import path_based
from taskflow.utils import lock_utils
from taskflow.utils import misc
LOG = logging.getLogger(__name__)
@contextlib.contextmanager
def _storagefailure_wrapper():
try:
yield
except exc.TaskFlowException:
raise
except Exception as e:
if isinstance(e, (IOError, OSError)) and e.errno == errno.ENOENT:
raise exc.NotFound('Item not found: %s' % e.filename, e)
raise exc.StorageFailure("Storage backend internal error", e)
class DirBackend(base.Backend):
class DirBackend(path_based.PathBasedBackend):
"""A directory and file based backend.
This backend writes logbooks, flow details, and atom details to a provided
base path on the local filesystem. It will create and store those objects
in three key directories (one for logbooks, one for flow details and one
for atom details). It creates those associated directories and then
creates files inside those directories that represent the contents of those
objects for later reading and writing.
This backend does *not* provide true transactional semantics. It does
guarantee that there will be no interprocess race conditions when
writing and reading by using a consistent hierarchy of file based locks.
@@ -54,17 +56,10 @@ class DirBackend(base.Backend):
"""
def __init__(self, conf):
super(DirBackend, self).__init__(conf)
self._path = os.path.abspath(conf['path'])
self._lock_path = os.path.join(self._path, 'locks')
self._file_cache = {}
@property
def lock_path(self):
return self._lock_path
@property
def base_path(self):
return self._path
self.file_cache = {}
if not self._path:
raise ValueError("Empty path is disallowed")
self._path = os.path.abspath(self._path)
def get_connection(self):
return Connection(self)
@@ -73,33 +68,13 @@ class DirBackend(base.Backend):
pass
class Connection(base.Connection):
def __init__(self, backend):
self._backend = backend
self._file_cache = self._backend._file_cache
self._flow_path = os.path.join(self._backend.base_path, 'flows')
self._atom_path = os.path.join(self._backend.base_path, 'atoms')
self._book_path = os.path.join(self._backend.base_path, 'books')
def validate(self):
# Verify key paths exist.
paths = [
self._backend.base_path,
self._backend.lock_path,
self._flow_path,
self._atom_path,
self._book_path,
]
for p in paths:
if not os.path.isdir(p):
raise RuntimeError("Missing required directory: %s" % (p))
class Connection(path_based.PathBasedConnection):
def _read_from(self, filename):
# This is very similar to the oslo-incubator fileutils module, but
# tweaked to not depend on a global cache, as well as tweaked to not
# pull-in the oslo logging module (which is a huge pile of code).
mtime = os.path.getmtime(filename)
cache_info = self._file_cache.setdefault(filename, {})
cache_info = self.backend.file_cache.setdefault(filename, {})
if not cache_info or mtime > cache_info.get('mtime', 0):
with open(filename, 'rb') as fp:
cache_info['data'] = fp.read().decode('utf-8')
@@ -111,301 +86,56 @@ class Connection(base.Connection):
contents = contents.encode('utf-8')
with open(filename, 'wb') as fp:
fp.write(contents)
self._file_cache.pop(filename, None)
self.backend.file_cache.pop(filename, None)
def _run_with_process_lock(self, lock_name, functor, *args, **kwargs):
lock_path = os.path.join(self.backend.lock_path, lock_name)
with lock_utils.InterProcessLock(lock_path):
@contextlib.contextmanager
def _path_lock(self, path):
lockfile = self._join_path(path, 'lock')
with lock_utils.InterProcessLock(lockfile) as lock:
with _storagefailure_wrapper():
yield lock
def _join_path(self, *parts):
return os.path.join(*parts)
def _get_item(self, path):
with self._path_lock(path):
item_path = self._join_path(path, 'metadata')
return misc.decode_json(self._read_from(item_path))
def _set_item(self, path, value, transaction):
with self._path_lock(path):
item_path = self._join_path(path, 'metadata')
self._write_to(item_path, jsonutils.dumps(value))
def _del_tree(self, path, transaction):
with self._path_lock(path):
shutil.rmtree(path)
def _get_children(self, path):
with _storagefailure_wrapper():
return [link for link in os.listdir(path)
if os.path.islink(self._join_path(path, link))]
def _ensure_path(self, path):
with _storagefailure_wrapper():
misc.ensure_tree(path)
def _create_link(self, src_path, dest_path, transaction):
with _storagefailure_wrapper():
try:
return functor(*args, **kwargs)
except exc.TaskFlowException:
raise
except Exception as e:
LOG.exception("Failed running locking file based session")
# NOTE(harlowja): trap all other errors as storage errors.
raise exc.StorageFailure("Storage backend internal error", e)
def _get_logbooks(self):
lb_uuids = []
try:
lb_uuids = [d for d in os.listdir(self._book_path)
if os.path.isdir(os.path.join(self._book_path, d))]
except EnvironmentError as e:
if e.errno != errno.ENOENT:
raise
for lb_uuid in lb_uuids:
try:
yield self._get_logbook(lb_uuid)
except exc.NotFound:
pass
def get_logbooks(self):
try:
books = list(self._get_logbooks())
except EnvironmentError as e:
raise exc.StorageFailure("Unable to fetch logbooks", e)
else:
for b in books:
yield b
@property
def backend(self):
return self._backend
def close(self):
pass
def _save_atom_details(self, atom_detail, ignore_missing):
# See if we have an existing atom detail to merge with.
e_ad = None
try:
e_ad = self._get_atom_details(atom_detail.uuid, lock=False)
except EnvironmentError:
if not ignore_missing:
raise exc.NotFound("No atom details found with id: %s"
% atom_detail.uuid)
if e_ad is not None:
atom_detail = e_ad.merge(atom_detail)
ad_path = os.path.join(self._atom_path, atom_detail.uuid)
ad_data = base._format_atom(atom_detail)
self._write_to(ad_path, jsonutils.dumps(ad_data))
return atom_detail
def update_atom_details(self, atom_detail):
return self._run_with_process_lock("atom",
self._save_atom_details,
atom_detail,
ignore_missing=False)
def _get_atom_details(self, uuid, lock=True):
def _get():
ad_path = os.path.join(self._atom_path, uuid)
ad_data = misc.decode_json(self._read_from(ad_path))
ad_cls = logbook.atom_detail_class(ad_data['type'])
return ad_cls.from_dict(ad_data['atom'])
if lock:
return self._run_with_process_lock('atom', _get)
else:
return _get()
def _get_flow_details(self, uuid, lock=True):
def _get():
fd_path = os.path.join(self._flow_path, uuid)
meta_path = os.path.join(fd_path, 'metadata')
meta = misc.decode_json(self._read_from(meta_path))
fd = logbook.FlowDetail.from_dict(meta)
ad_to_load = []
ad_path = os.path.join(fd_path, 'atoms')
try:
ad_to_load = [f for f in os.listdir(ad_path)
if os.path.islink(os.path.join(ad_path, f))]
except EnvironmentError as e:
if e.errno != errno.ENOENT:
raise
for ad_uuid in ad_to_load:
fd.add(self._get_atom_details(ad_uuid))
return fd
if lock:
return self._run_with_process_lock('flow', _get)
else:
return _get()
def _save_atoms_and_link(self, atom_details, local_atom_path):
for atom_detail in atom_details:
self._save_atom_details(atom_detail, ignore_missing=True)
src_ad_path = os.path.join(self._atom_path, atom_detail.uuid)
target_ad_path = os.path.join(local_atom_path, atom_detail.uuid)
try:
os.symlink(src_ad_path, target_ad_path)
except EnvironmentError as e:
os.symlink(src_path, dest_path)
except OSError as e:
if e.errno != errno.EEXIST:
raise
def _save_flow_details(self, flow_detail, ignore_missing):
# See if we have an existing flow detail to merge with.
e_fd = None
try:
e_fd = self._get_flow_details(flow_detail.uuid, lock=False)
except EnvironmentError:
if not ignore_missing:
raise exc.NotFound("No flow details found with id: %s"
% flow_detail.uuid)
if e_fd is not None:
e_fd = e_fd.merge(flow_detail)
for ad in flow_detail:
if e_fd.find(ad.uuid) is None:
e_fd.add(ad)
flow_detail = e_fd
flow_path = os.path.join(self._flow_path, flow_detail.uuid)
misc.ensure_tree(flow_path)
self._write_to(os.path.join(flow_path, 'metadata'),
jsonutils.dumps(flow_detail.to_dict()))
if len(flow_detail):
atom_path = os.path.join(flow_path, 'atoms')
misc.ensure_tree(atom_path)
self._run_with_process_lock('atom',
self._save_atoms_and_link,
list(flow_detail), atom_path)
return flow_detail
@contextlib.contextmanager
def _transaction(self):
"""This backend doesn't support transactions"""
yield
def update_flow_details(self, flow_detail):
return self._run_with_process_lock("flow",
self._save_flow_details,
flow_detail,
ignore_missing=False)
def _save_flows_and_link(self, flow_details, local_flow_path):
for flow_detail in flow_details:
self._save_flow_details(flow_detail, ignore_missing=True)
src_fd_path = os.path.join(self._flow_path, flow_detail.uuid)
target_fd_path = os.path.join(local_flow_path, flow_detail.uuid)
try:
os.symlink(src_fd_path, target_fd_path)
except EnvironmentError as e:
if e.errno != errno.EEXIST:
raise
def _save_logbook(self, book):
# See if we have an existing logbook to merge with.
e_lb = None
try:
e_lb = self._get_logbook(book.uuid)
except exc.NotFound:
pass
if e_lb is not None:
e_lb = e_lb.merge(book)
for fd in book:
if e_lb.find(fd.uuid) is None:
e_lb.add(fd)
book = e_lb
book_path = os.path.join(self._book_path, book.uuid)
misc.ensure_tree(book_path)
self._write_to(os.path.join(book_path, 'metadata'),
jsonutils.dumps(book.to_dict(marshal_time=True)))
if len(book):
flow_path = os.path.join(book_path, 'flows')
misc.ensure_tree(flow_path)
self._run_with_process_lock('flow',
self._save_flows_and_link,
list(book), flow_path)
return book
def save_logbook(self, book):
return self._run_with_process_lock("book",
self._save_logbook, book)
def upgrade(self):
def _step_create():
for path in (self._book_path, self._flow_path, self._atom_path):
try:
misc.ensure_tree(path)
except EnvironmentError as e:
raise exc.StorageFailure("Unable to create logbooks"
" required child path %s" % path,
e)
for path in (self._backend.base_path, self._backend.lock_path):
try:
misc.ensure_tree(path)
except EnvironmentError as e:
raise exc.StorageFailure("Unable to create logbooks required"
" path %s" % path, e)
self._run_with_process_lock("init", _step_create)
def clear_all(self):
def _step_clear():
for d in (self._book_path, self._flow_path, self._atom_path):
if os.path.isdir(d):
shutil.rmtree(d)
def _step_atom():
self._run_with_process_lock("atom", _step_clear)
def _step_flow():
self._run_with_process_lock("flow", _step_atom)
def _step_book():
self._run_with_process_lock("book", _step_flow)
# Acquire all locks by going through this little hierarchy.
self._run_with_process_lock("init", _step_book)
def destroy_logbook(self, book_uuid):
def _destroy_atoms(atom_details):
for atom_detail in atom_details:
atom_path = os.path.join(self._atom_path, atom_detail.uuid)
try:
shutil.rmtree(atom_path)
except EnvironmentError as e:
if e.errno != errno.ENOENT:
raise exc.StorageFailure("Unable to remove atom"
" directory %s" % atom_path,
e)
def _destroy_flows(flow_details):
for flow_detail in flow_details:
flow_path = os.path.join(self._flow_path, flow_detail.uuid)
self._run_with_process_lock("atom", _destroy_atoms,
list(flow_detail))
try:
shutil.rmtree(flow_path)
except EnvironmentError as e:
if e.errno != errno.ENOENT:
raise exc.StorageFailure("Unable to remove flow"
" directory %s" % flow_path,
e)
def _destroy_book():
book = self._get_logbook(book_uuid)
book_path = os.path.join(self._book_path, book.uuid)
self._run_with_process_lock("flow", _destroy_flows, list(book))
try:
shutil.rmtree(book_path)
except EnvironmentError as e:
if e.errno != errno.ENOENT:
raise exc.StorageFailure("Unable to remove book"
" directory %s" % book_path, e)
# Acquire all locks by going through this little hierarchy.
self._run_with_process_lock("book", _destroy_book)
def _get_logbook(self, book_uuid):
book_path = os.path.join(self._book_path, book_uuid)
meta_path = os.path.join(book_path, 'metadata')
try:
meta = misc.decode_json(self._read_from(meta_path))
except EnvironmentError as e:
if e.errno == errno.ENOENT:
raise exc.NotFound("No logbook found with id: %s" % book_uuid)
else:
raise
lb = logbook.LogBook.from_dict(meta, unmarshal_time=True)
fd_path = os.path.join(book_path, 'flows')
fd_uuids = []
try:
fd_uuids = [f for f in os.listdir(fd_path)
if os.path.islink(os.path.join(fd_path, f))]
except EnvironmentError as e:
if e.errno != errno.ENOENT:
raise
for fd_uuid in fd_uuids:
lb.add(self._get_flow_details(fd_uuid))
return lb
def get_logbook(self, book_uuid):
return self._run_with_process_lock("book",
self._get_logbook, book_uuid)
def get_flow_details(self, fd_uuid):
return self._get_flow_details(fd_uuid)
def get_atom_details(self, ad_uuid):
return self._get_atom_details(ad_uuid)
def validate(self):
with _storagefailure_wrapper():
for p in (self.flow_path, self.atom_path, self.book_path):
if not os.path.isdir(p):
raise RuntimeError("Missing required directory: %s" % (p))
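
A minimal usage sketch of the refactored directory backend, assuming the layout above (the path value is a placeholder):

    import contextlib

    from taskflow.persistence.backends import impl_dir

    backend = impl_dir.DirBackend({'path': '/tmp/taskflow-example'})
    with contextlib.closing(backend.get_connection()) as conn:
        conn.upgrade()   # creates the books, flow_details and atom_details dirs
        conn.validate()  # verifies those key directories now exist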


@@ -15,127 +15,32 @@
# License for the specific language governing permissions and limitations
# under the License.
import functools
import six
import contextlib
import copy
import os
from taskflow import exceptions as exc
from taskflow import logging
from taskflow.persistence import base
from taskflow.persistence import logbook
from taskflow.persistence import path_based
from taskflow.types import tree
from taskflow.utils import lock_utils
LOG = logging.getLogger(__name__)
class _Memory(object):
"""Where the data is really stored."""
def __init__(self):
self.log_books = {}
self.flow_details = {}
self.atom_details = {}
def clear_all(self):
self.log_books.clear()
self.flow_details.clear()
self.atom_details.clear()
class _MemoryHelper(object):
"""Helper functionality for the memory backends & connections."""
def __init__(self, memory):
self._memory = memory
@staticmethod
def _fetch_clone_args(incoming):
if isinstance(incoming, (logbook.LogBook, logbook.FlowDetail)):
# We keep our own copy of the added contents of the following
# types so we don't need the clone to retain them directly...
return {
'retain_contents': False,
}
return {}
def construct(self, uuid, container):
"""Reconstructs a object from the given uuid and storage container."""
source = container[uuid]
clone_kwargs = self._fetch_clone_args(source)
clone = source['object'].copy(**clone_kwargs)
rebuilder = source.get('rebuilder')
if rebuilder:
for component in map(rebuilder, source['components']):
clone.add(component)
return clone
def merge(self, incoming, saved_info=None):
"""Merges the incoming object into the local memories copy."""
if saved_info is None:
if isinstance(incoming, logbook.LogBook):
saved_info = self._memory.log_books.setdefault(
incoming.uuid, {})
elif isinstance(incoming, logbook.FlowDetail):
saved_info = self._memory.flow_details.setdefault(
incoming.uuid, {})
elif isinstance(incoming, logbook.AtomDetail):
saved_info = self._memory.atom_details.setdefault(
incoming.uuid, {})
else:
raise TypeError("Unknown how to merge '%s' (%s)"
% (incoming, type(incoming)))
try:
saved_info['object'].merge(incoming)
except KeyError:
clone_kwargs = self._fetch_clone_args(incoming)
saved_info['object'] = incoming.copy(**clone_kwargs)
if isinstance(incoming, logbook.LogBook):
flow_details = saved_info.setdefault('components', set())
if 'rebuilder' not in saved_info:
saved_info['rebuilder'] = functools.partial(
self.construct, container=self._memory.flow_details)
for flow_detail in incoming:
flow_details.add(self.merge(flow_detail))
elif isinstance(incoming, logbook.FlowDetail):
atom_details = saved_info.setdefault('components', set())
if 'rebuilder' not in saved_info:
saved_info['rebuilder'] = functools.partial(
self.construct, container=self._memory.atom_details)
for atom_detail in incoming:
atom_details.add(self.merge(atom_detail))
return incoming.uuid
class MemoryBackend(base.Backend):
class MemoryBackend(path_based.PathBasedBackend):
"""A in-memory (non-persistent) backend.
This backend writes logbooks, flow details, and atom details to in-memory
dictionaries and retrieves from those dictionaries as needed.
This backend does *not* provide true transactional semantics. It does
guarantee that there will be no inter-thread race conditions when
writing and reading by using read/write locks.
"""
def __init__(self, conf=None):
super(MemoryBackend, self).__init__(conf)
self._memory = _Memory()
self._helper = _MemoryHelper(self._memory)
self._lock = lock_utils.ReaderWriterLock()
def _construct_from(self, container):
return dict((uuid, self._helper.construct(uuid, container))
for uuid in six.iterkeys(container))
@property
def log_books(self):
with self._lock.read_lock():
return self._construct_from(self._memory.log_books)
@property
def flow_details(self):
with self._lock.read_lock():
return self._construct_from(self._memory.flow_details)
@property
def atom_details(self):
with self._lock.read_lock():
return self._construct_from(self._memory.atom_details)
if self._path is None:
self._path = os.sep
self.memory = tree.Node(self._path)
self.lock = lock_utils.ReaderWriterLock()
def get_connection(self):
return Connection(self)
@@ -144,107 +49,79 @@ class MemoryBackend(base.Backend):
pass
class Connection(base.Connection):
"""A connection to an in-memory backend."""
class Connection(path_based.PathBasedConnection):
def __init__(self, backend):
self._backend = backend
self._helper = backend._helper
self._memory = backend._memory
self._lock = backend._lock
super(Connection, self).__init__(backend)
self.upgrade()
def upgrade(self):
pass
@contextlib.contextmanager
def _memory_lock(self, write=False):
if write:
lock = self.backend.lock.write_lock
else:
lock = self.backend.lock.read_lock
with lock():
try:
yield
except exc.TaskFlowException as e:
raise
except Exception as e:
raise exc.StorageFailure("Storage backend internal error", e)
def _fetch_node(self, path):
node = self.backend.memory.find(path)
if node is None:
raise exc.NotFound("Item not found %s" % path)
return node
def _join_path(self, *parts):
return os.path.join(*parts)
def _get_item(self, path):
with self._memory_lock():
return copy.deepcopy(self._fetch_node(path).metadata['value'])
def _set_item(self, path, value, transaction):
value = copy.deepcopy(value)
try:
item_node = self._fetch_node(path)
item_node.metadata.update(value=value)
except exc.NotFound:
dirname, basename = os.path.split(path)
parent_node = self._fetch_node(dirname)
parent_node.add(tree.Node(path, name=basename, value=value))
def _del_tree(self, path, transaction):
node = self._fetch_node(path)
node.disassociate()
def _get_children(self, path):
with self._memory_lock():
return [node.metadata['name'] for node in self._fetch_node(path)]
def _ensure_path(self, path):
with self._memory_lock(write=True):
path = os.path.normpath(path)
parts = path.split(os.sep)
node = self.backend.memory
for p in range(len(parts) - 1):
node_path = os.sep.join(parts[:p + 2])
try:
node = self._fetch_node(node_path)
except exc.NotFound:
node.add(tree.Node(node_path, name=parts[p + 1]))
def _create_link(self, src_path, dest_path, transaction):
dirname, basename = os.path.split(dest_path)
parent_node = self._fetch_node(dirname)
parent_node.add(tree.Node(dest_path, name=basename, target=src_path))
@contextlib.contextmanager
def _transaction(self):
"""This just wraps a global write-lock"""
with self._memory_lock(write=True):
yield
def validate(self):
pass
@property
def backend(self):
return self._backend
def close(self):
pass
def clear_all(self):
with self._lock.write_lock():
self._memory.clear_all()
def destroy_logbook(self, book_uuid):
with self._lock.write_lock():
try:
# Do the same cascading delete that the sql layer does.
book_info = self._memory.log_books.pop(book_uuid)
except KeyError:
raise exc.NotFound("No logbook found with uuid '%s'"
% book_uuid)
else:
while book_info['components']:
flow_uuid = book_info['components'].pop()
flow_info = self._memory.flow_details.pop(flow_uuid)
while flow_info['components']:
atom_uuid = flow_info['components'].pop()
self._memory.atom_details.pop(atom_uuid)
def update_atom_details(self, atom_detail):
with self._lock.write_lock():
try:
atom_info = self._memory.atom_details[atom_detail.uuid]
return self._helper.construct(
self._helper.merge(atom_detail, saved_info=atom_info),
self._memory.atom_details)
except KeyError:
raise exc.NotFound("No atom details found with uuid '%s'"
% atom_detail.uuid)
def update_flow_details(self, flow_detail):
with self._lock.write_lock():
try:
flow_info = self._memory.flow_details[flow_detail.uuid]
return self._helper.construct(
self._helper.merge(flow_detail, saved_info=flow_info),
self._memory.flow_details)
except KeyError:
raise exc.NotFound("No flow details found with uuid '%s'"
% flow_detail.uuid)
def save_logbook(self, book):
with self._lock.write_lock():
return self._helper.construct(self._helper.merge(book),
self._memory.log_books)
def get_logbook(self, book_uuid):
with self._lock.read_lock():
try:
return self._helper.construct(book_uuid,
self._memory.log_books)
except KeyError:
raise exc.NotFound("No logbook found with uuid '%s'"
% book_uuid)
def get_logbooks(self):
# Don't hold locks while iterating...
with self._lock.read_lock():
book_uuids = set(six.iterkeys(self._memory.log_books))
for book_uuid in book_uuids:
try:
with self._lock.read_lock():
book = self._helper.construct(book_uuid,
self._memory.log_books)
yield book
except KeyError:
pass
def get_flow_details(self, fd_uuid):
try:
with self._lock.read_lock():
return self._memory.flow_details[fd_uuid]
except KeyError:
raise exc.NotFound("No flow details found '%s'" % fd_uuid)
def get_atom_details(self, ad_uuid):
try:
with self._lock.read_lock():
return self._memory.atom_details[ad_uuid]
except KeyError:
raise exc.NotFound("No atom details found '%s'" % ad_uuid)
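
Likewise, a minimal sketch of exercising the tree-backed memory backend shown above (the logbook name is arbitrary); note that the memory connection calls upgrade() in its __init__, so the root paths already exist:

    import contextlib

    from taskflow.persistence.backends import impl_memory
    from taskflow.persistence import logbook

    backend = impl_memory.MemoryBackend()
    with contextlib.closing(backend.get_connection()) as conn:
        book = logbook.LogBook('example-book')  # arbitrary name
        conn.save_logbook(book)
        fetched = conn.get_logbook(book.uuid)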


@@ -202,25 +202,25 @@ class Alchemist(object):
atom_cls = logbook.atom_detail_class(row.pop('atom_type'))
return atom_cls.from_dict(row)
def _atom_query_iter(self, conn, parent_uuid):
def atom_query_iter(self, conn, parent_uuid):
q = (sql.select([self._tables.atomdetails]).
where(self._tables.atomdetails.c.parent_uuid == parent_uuid))
for row in conn.execute(q):
yield self.convert_atom_detail(row)
def _flow_query_iter(self, conn, parent_uuid):
def flow_query_iter(self, conn, parent_uuid):
q = (sql.select([self._tables.flowdetails]).
where(self._tables.flowdetails.c.parent_uuid == parent_uuid))
for row in conn.execute(q):
yield self.convert_flow_detail(row)
def populate_book(self, conn, book):
for fd in self._flow_query_iter(conn, book.uuid):
for fd in self.flow_query_iter(conn, book.uuid):
book.add(fd)
self.populate_flow_detail(conn, fd)
def populate_flow_detail(self, conn, fd):
for ad in self._atom_query_iter(conn, fd.uuid):
for ad in self.atom_query_iter(conn, fd.uuid):
fd.add(ad)
@@ -558,6 +558,19 @@ class Connection(base.Connection):
for book in gathered:
yield book
def get_flows_for_book(self, book_uuid):
gathered = []
try:
with contextlib.closing(self._engine.connect()) as conn:
for row in self._converter.flow_query_iter(conn, book_uuid):
flow_details = self._converter.populate_flow_detail(conn,
row)
gathered.append(flow_details)
except sa_exc.DBAPIError as e:
raise exc.StorageFailure("Failed getting flow details", e)
for flow_details in gathered:
yield flow_details
def get_flow_details(self, fd_uuid):
try:
flowdetails = self._tables.flowdetails
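
For reference, a self-contained sketch of the query that the new get_flows_for_book performs, written against a hypothetical, trimmed-down flowdetails table (column names follow the diff above; the real schema has more columns):

    import contextlib

    import sqlalchemy as sa

    metadata = sa.MetaData()
    flowdetails = sa.Table('flowdetails', metadata,
                           sa.Column('uuid', sa.String(36), primary_key=True),
                           sa.Column('name', sa.String(255)),
                           sa.Column('parent_uuid', sa.String(36)))

    def flows_for_book(engine, book_uuid):
        q = (sa.select([flowdetails])
             .where(flowdetails.c.parent_uuid == book_uuid))
        with contextlib.closing(engine.connect()) as conn:
            for row in conn.execute(q):
                yield dict(row)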


@@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2014 AT&T Labs All Rights Reserved.
# Copyright (C) 2015 Rackspace Hosting All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@@ -21,30 +22,18 @@ from kazoo.protocol import paths
from oslo_serialization import jsonutils
from taskflow import exceptions as exc
from taskflow import logging
from taskflow.persistence import base
from taskflow.persistence import logbook
from taskflow.persistence import path_based
from taskflow.utils import kazoo_utils as k_utils
from taskflow.utils import misc
LOG = logging.getLogger(__name__)
# Transaction support was added in 3.4.0
MIN_ZK_VERSION = (3, 4, 0)
class ZkBackend(base.Backend):
"""A zookeeper backend.
This backend writes logbooks, flow details, and atom details to a provided
base path in zookeeper. It will create and store those objects in three
key directories (one for logbooks, one for flow details and one for atom
details). It creates those associated directories and then creates files
inside those directories that represent the contents of those objects for
later reading and writing.
class ZkBackend(path_based.PathBasedBackend):
"""A zookeeper-backed backend.
Example configuration::
conf = {
"hosts": "192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181",
"path": "/taskflow",
@@ -52,24 +41,18 @@ class ZkBackend(base.Backend):
"""
def __init__(self, conf, client=None):
super(ZkBackend, self).__init__(conf)
path = str(conf.get("path", "/taskflow"))
if not path:
raise ValueError("Empty zookeeper path is disallowed")
if not paths.isabs(path):
if not self._path:
self._path = '/taskflow'
if not paths.isabs(self._path):
raise ValueError("Zookeeper path must be absolute")
self._path = path
if client is not None:
self._client = client
self._owned = False
else:
self._client = k_utils.make_client(conf)
self._client = k_utils.make_client(self._conf)
self._owned = True
self._validated = False
@property
def path(self):
return self._path
def get_connection(self):
conn = ZkConnection(self, self._client, self._conf)
if not self._validated:
@@ -87,52 +70,15 @@ class ZkBackend(base.Backend):
raise exc.StorageFailure("Unable to finalize client", e)
class ZkConnection(base.Connection):
class ZkConnection(path_based.PathBasedConnection):
def __init__(self, backend, client, conf):
self._backend = backend
self._client = client
super(ZkConnection, self).__init__(backend)
self._conf = conf
self._book_path = paths.join(self._backend.path, "books")
self._flow_path = paths.join(self._backend.path, "flow_details")
self._atom_path = paths.join(self._backend.path, "atom_details")
self._client = client
with self._exc_wrapper():
# NOOP if already started.
self._client.start()
def validate(self):
with self._exc_wrapper():
try:
if self._conf.get('check_compatible', True):
k_utils.check_compatible(self._client, MIN_ZK_VERSION)
except exc.IncompatibleVersion as e:
raise exc.StorageFailure("Backend storage is not a"
" compatible version", e)
@property
def backend(self):
return self._backend
@property
def book_path(self):
return self._book_path
@property
def flow_path(self):
return self._flow_path
@property
def atom_path(self):
return self._atom_path
def close(self):
pass
def upgrade(self):
"""Creates the initial paths (if they already don't exist)."""
with self._exc_wrapper():
for path in (self.book_path, self.flow_path, self.atom_path):
self._client.ensure_path(path)
@contextlib.contextmanager
def _exc_wrapper(self):
"""Exception context-manager which wraps kazoo exceptions.
@@ -146,8 +92,7 @@ class ZkConnection(base.Connection):
except self._client.handler.timeout_exception as e:
raise exc.StorageFailure("Storage backend timeout", e)
except k_exc.SessionExpiredError as e:
raise exc.StorageFailure("Storage backend session"
" has expired", e)
raise exc.StorageFailure("Storage backend session has expired", e)
except k_exc.NoNodeError as e:
raise exc.NotFound("Storage backend node not found: %s" % e)
except k_exc.NodeExistsError as e:
@@ -155,273 +100,50 @@ class ZkConnection(base.Connection):
except (k_exc.KazooException, k_exc.ZookeeperError) as e:
raise exc.StorageFailure("Storage backend internal error", e)
def update_atom_details(self, ad):
"""Update a atom detail transactionally."""
with self._exc_wrapper():
txn = self._client.transaction()
ad = self._update_atom_details(ad, txn)
k_utils.checked_commit(txn)
return ad
def _join_path(self, *parts):
return paths.join(*parts)
def _update_atom_details(self, ad, txn, create_missing=False):
# Determine whether the desired data exists or not.
ad_path = paths.join(self.atom_path, ad.uuid)
e_ad = None
try:
ad_data, _zstat = self._client.get(ad_path)
except k_exc.NoNodeError:
# Not-existent: create or raise exception.
if not create_missing:
raise exc.NotFound("No atom details found with"
" id: %s" % ad.uuid)
else:
txn.create(ad_path)
def _get_item(self, path):
with self._exc_wrapper():
data, _ = self._client.get(path)
return misc.decode_json(data)
def _set_item(self, path, value, transaction):
data = misc.binary_encode(jsonutils.dumps(value))
if not self._client.exists(path):
transaction.create(path, data)
else:
# Existent: read it out.
transaction.set_data(path, data)
def _del_tree(self, path, transaction):
for child in self._get_children(path):
self._del_tree(self._join_path(path, child), transaction)
transaction.delete(path)
def _get_children(self, path):
with self._exc_wrapper():
return self._client.get_children(path)
def _ensure_path(self, path):
with self._exc_wrapper():
self._client.ensure_path(path)
def _create_link(self, src_path, dest_path, transaction):
if not self._client.exists(dest_path):
transaction.create(dest_path)
@contextlib.contextmanager
def _transaction(self):
transaction = self._client.transaction()
with self._exc_wrapper():
yield transaction
k_utils.checked_commit(transaction)
def validate(self):
with self._exc_wrapper():
try:
ad_data = misc.decode_json(ad_data)
ad_cls = logbook.atom_detail_class(ad_data['type'])
e_ad = ad_cls.from_dict(ad_data['atom'])
except KeyError:
pass
# Update and write it back
if e_ad:
e_ad = e_ad.merge(ad)
else:
e_ad = ad
ad_data = base._format_atom(e_ad)
txn.set_data(ad_path,
misc.binary_encode(jsonutils.dumps(ad_data)))
return e_ad
def get_atom_details(self, ad_uuid):
"""Read a atom detail.
*Read-only*, so no need of zk transaction.
"""
with self._exc_wrapper():
return self._get_atom_details(ad_uuid)
def _get_atom_details(self, ad_uuid):
ad_path = paths.join(self.atom_path, ad_uuid)
try:
ad_data, _zstat = self._client.get(ad_path)
except k_exc.NoNodeError:
raise exc.NotFound("No atom details found with id: %s" % ad_uuid)
else:
ad_data = misc.decode_json(ad_data)
ad_cls = logbook.atom_detail_class(ad_data['type'])
return ad_cls.from_dict(ad_data['atom'])
def update_flow_details(self, fd):
"""Update a flow detail transactionally."""
with self._exc_wrapper():
txn = self._client.transaction()
fd = self._update_flow_details(fd, txn)
k_utils.checked_commit(txn)
return fd
def _update_flow_details(self, fd, txn, create_missing=False):
# Determine whether the desired data exists or not
fd_path = paths.join(self.flow_path, fd.uuid)
try:
fd_data, _zstat = self._client.get(fd_path)
except k_exc.NoNodeError:
# Not-existent: create or raise exception
if create_missing:
txn.create(fd_path)
e_fd = logbook.FlowDetail(name=fd.name, uuid=fd.uuid)
else:
raise exc.NotFound("No flow details found with id: %s"
% fd.uuid)
else:
# Existent: read it out
e_fd = logbook.FlowDetail.from_dict(misc.decode_json(fd_data))
# Update and write it back
e_fd = e_fd.merge(fd)
fd_data = e_fd.to_dict()
txn.set_data(fd_path, misc.binary_encode(jsonutils.dumps(fd_data)))
for ad in fd:
ad_path = paths.join(fd_path, ad.uuid)
# NOTE(harlowja): create an entry in the flow detail path
# for the provided atom detail so that a reference exists
# from the flow detail to its atom details.
if not self._client.exists(ad_path):
txn.create(ad_path)
e_fd.add(self._update_atom_details(ad, txn, create_missing=True))
return e_fd
def get_flow_details(self, fd_uuid):
"""Read a flow detail.
*Read-only*, so no need of zk transaction.
"""
with self._exc_wrapper():
return self._get_flow_details(fd_uuid)
def _get_flow_details(self, fd_uuid):
fd_path = paths.join(self.flow_path, fd_uuid)
try:
fd_data, _zstat = self._client.get(fd_path)
except k_exc.NoNodeError:
raise exc.NotFound("No flow details found with id: %s" % fd_uuid)
fd = logbook.FlowDetail.from_dict(misc.decode_json(fd_data))
for ad_uuid in self._client.get_children(fd_path):
fd.add(self._get_atom_details(ad_uuid))
return fd
def save_logbook(self, lb):
"""Save (update) a log_book transactionally."""
def _create_logbook(lb_path, txn):
lb_data = lb.to_dict(marshal_time=True)
txn.create(lb_path, misc.binary_encode(jsonutils.dumps(lb_data)))
for fd in lb:
# NOTE(harlowja): create an entry in the logbook path
# for the provided flow detail so that a reference exists
# from the logbook to its flow details.
txn.create(paths.join(lb_path, fd.uuid))
fd_path = paths.join(self.flow_path, fd.uuid)
fd_data = jsonutils.dumps(fd.to_dict())
txn.create(fd_path, misc.binary_encode(fd_data))
for ad in fd:
# NOTE(harlowja): create an entry in the flow detail path
# for the provided atom detail so that a reference exists
# from the flow detail to its atom details.
txn.create(paths.join(fd_path, ad.uuid))
ad_path = paths.join(self.atom_path, ad.uuid)
ad_data = base._format_atom(ad)
txn.create(ad_path,
misc.binary_encode(jsonutils.dumps(ad_data)))
return lb
def _update_logbook(lb_path, lb_data, txn):
e_lb = logbook.LogBook.from_dict(misc.decode_json(lb_data),
unmarshal_time=True)
e_lb = e_lb.merge(lb)
lb_data = e_lb.to_dict(marshal_time=True)
txn.set_data(lb_path, misc.binary_encode(jsonutils.dumps(lb_data)))
for fd in lb:
fd_path = paths.join(lb_path, fd.uuid)
if not self._client.exists(fd_path):
# NOTE(harlowja): create an entry in the logbook path
# for the provided flow detail so that a reference exists
# from the logbook to its flow details.
txn.create(fd_path)
e_fd = self._update_flow_details(fd, txn, create_missing=True)
e_lb.add(e_fd)
return e_lb
with self._exc_wrapper():
txn = self._client.transaction()
# Determine whether the desired data exists or not.
lb_path = paths.join(self.book_path, lb.uuid)
try:
lb_data, _zstat = self._client.get(lb_path)
except k_exc.NoNodeError:
# Create a new logbook since it doesn't exist.
e_lb = _create_logbook(lb_path, txn)
else:
# Otherwise update the existing logbook instead.
e_lb = _update_logbook(lb_path, lb_data, txn)
k_utils.checked_commit(txn)
return e_lb
def _get_logbook(self, lb_uuid):
lb_path = paths.join(self.book_path, lb_uuid)
try:
lb_data, _zstat = self._client.get(lb_path)
except k_exc.NoNodeError:
raise exc.NotFound("No logbook found with id: %s" % lb_uuid)
else:
lb = logbook.LogBook.from_dict(misc.decode_json(lb_data),
unmarshal_time=True)
for fd_uuid in self._client.get_children(lb_path):
lb.add(self._get_flow_details(fd_uuid))
return lb
def get_logbook(self, lb_uuid):
"""Read a logbook.
*Read-only*, so no need of zk transaction.
"""
with self._exc_wrapper():
return self._get_logbook(lb_uuid)
def get_logbooks(self):
"""Read all logbooks.
*Read-only*, so no need of zk transaction.
"""
with self._exc_wrapper():
for lb_uuid in self._client.get_children(self.book_path):
yield self._get_logbook(lb_uuid)
def destroy_logbook(self, lb_uuid):
"""Destroy (delete) a log_book transactionally."""
def _destroy_atom_details(ad_uuid, txn):
ad_path = paths.join(self.atom_path, ad_uuid)
if not self._client.exists(ad_path):
raise exc.NotFound("No atom details found with id: %s"
% ad_uuid)
txn.delete(ad_path)
def _destroy_flow_details(fd_uuid, txn):
fd_path = paths.join(self.flow_path, fd_uuid)
if not self._client.exists(fd_path):
raise exc.NotFound("No flow details found with id: %s"
% fd_uuid)
for ad_uuid in self._client.get_children(fd_path):
_destroy_atom_details(ad_uuid, txn)
txn.delete(paths.join(fd_path, ad_uuid))
txn.delete(fd_path)
def _destroy_logbook(lb_uuid, txn):
lb_path = paths.join(self.book_path, lb_uuid)
if not self._client.exists(lb_path):
raise exc.NotFound("No logbook found with id: %s" % lb_uuid)
for fd_uuid in self._client.get_children(lb_path):
_destroy_flow_details(fd_uuid, txn)
txn.delete(paths.join(lb_path, fd_uuid))
txn.delete(lb_path)
with self._exc_wrapper():
txn = self._client.transaction()
_destroy_logbook(lb_uuid, txn)
k_utils.checked_commit(txn)
def clear_all(self, delete_dirs=True):
"""Delete all data transactionally."""
with self._exc_wrapper():
txn = self._client.transaction()
# Delete all data under logbook path.
for lb_uuid in self._client.get_children(self.book_path):
lb_path = paths.join(self.book_path, lb_uuid)
for fd_uuid in self._client.get_children(lb_path):
txn.delete(paths.join(lb_path, fd_uuid))
txn.delete(lb_path)
# Delete all data under flow detail path.
for fd_uuid in self._client.get_children(self.flow_path):
fd_path = paths.join(self.flow_path, fd_uuid)
for ad_uuid in self._client.get_children(fd_path):
txn.delete(paths.join(fd_path, ad_uuid))
txn.delete(fd_path)
# Delete all data under atom detail path.
for ad_uuid in self._client.get_children(self.atom_path):
ad_path = paths.join(self.atom_path, ad_uuid)
txn.delete(ad_path)
# Delete containing directories.
if delete_dirs:
txn.delete(self.book_path)
txn.delete(self.atom_path)
txn.delete(self.flow_path)
k_utils.checked_commit(txn)
if self._conf.get('check_compatible', True):
k_utils.check_compatible(self._client, MIN_ZK_VERSION)
except exc.IncompatibleVersion as e:
raise exc.StorageFailure("Backend storage is not a"
" compatible version", e)


@@ -118,6 +118,11 @@ class Connection(object):
"""Return an iterable of logbook objects."""
pass
@abc.abstractmethod
def get_flows_for_book(self, book_uuid):
"""Return an iterable of flowdetails for a given logbook uuid."""
pass
@abc.abstractmethod
def get_flow_details(self, fd_uuid):
"""Fetches a flowdetails object matching the given uuid."""


@@ -0,0 +1,244 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2015 Rackspace Hosting All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import six
from taskflow import exceptions as exc
from taskflow.persistence import base
from taskflow.persistence import logbook
@six.add_metaclass(abc.ABCMeta)
class PathBasedBackend(base.Backend):
"""Base class for persistence backends that address data by path
Subclasses of this backend write logbooks, flow details, and atom details
to a provided base path in some filesystem-like storage. They will create
and store those objects in three key directories (one for logbooks, one
for flow details and one for atom details). They create those associated
directories and then create files inside those directories that represent
the contents of those objects for later reading and writing.
"""
def __init__(self, conf):
super(PathBasedBackend, self).__init__(conf)
if conf is None:
conf = {}
self._path = conf.get('path', None)
@property
def path(self):
return self._path
@six.add_metaclass(abc.ABCMeta)
class PathBasedConnection(base.Connection):
def __init__(self, backend):
self._backend = backend
self._book_path = self._join_path(backend.path, "books")
self._flow_path = self._join_path(backend.path, "flow_details")
self._atom_path = self._join_path(backend.path, "atom_details")
@staticmethod
def _serialize(obj):
if isinstance(obj, logbook.LogBook):
return obj.to_dict(marshal_time=True)
elif isinstance(obj, logbook.FlowDetail):
return obj.to_dict()
elif isinstance(obj, logbook.AtomDetail):
return base._format_atom(obj)
else:
raise exc.StorageFailure("Invalid storage class %s" % type(obj))
@staticmethod
def _deserialize(cls, data):
if issubclass(cls, logbook.LogBook):
return cls.from_dict(data, unmarshal_time=True)
elif issubclass(cls, logbook.FlowDetail):
return cls.from_dict(data)
elif issubclass(cls, logbook.AtomDetail):
atom_class = logbook.atom_detail_class(data['type'])
return atom_class.from_dict(data['atom'])
else:
raise exc.StorageFailure("Invalid storage class %s" % cls)
@property
def backend(self):
return self._backend
@property
def book_path(self):
return self._book_path
@property
def flow_path(self):
return self._flow_path
@property
def atom_path(self):
return self._atom_path
@abc.abstractmethod
def _join_path(self, *parts):
"""Accept path parts, and return a joined path"""
@abc.abstractmethod
def _get_item(self, path):
"""Fetch a single item from the backend"""
@abc.abstractmethod
def _set_item(self, path, value, transaction):
"""Write a single item to the backend"""
@abc.abstractmethod
def _del_tree(self, path, transaction):
"""Recursively deletes a folder from the backend."""
@abc.abstractmethod
def _get_children(self, path):
"""Get a list of child items of a path"""
@abc.abstractmethod
def _ensure_path(self, path):
"""Recursively ensure that a path (folder) in the backend exists"""
@abc.abstractmethod
def _create_link(self, src_path, dest_path, transaction):
"""Create a symlink-like link between two paths"""
@abc.abstractmethod
def _transaction(self):
"""Context manager that yields a transaction"""
def _get_obj_path(self, obj):
if isinstance(obj, logbook.LogBook):
path = self.book_path
elif isinstance(obj, logbook.FlowDetail):
path = self.flow_path
elif isinstance(obj, logbook.AtomDetail):
path = self.atom_path
else:
raise exc.StorageFailure("Invalid storage class %s" % type(obj))
return self._join_path(path, obj.uuid)
def _update_object(self, obj, transaction, ignore_missing=False):
path = self._get_obj_path(obj)
try:
item_data = self._get_item(path)
existing_obj = self._deserialize(type(obj), item_data)
obj = existing_obj.merge(obj)
except exc.NotFound:
if not ignore_missing:
raise
self._set_item(path, self._serialize(obj), transaction)
return obj
def get_logbooks(self, lazy=False):
for book_uuid in self._get_children(self.book_path):
yield self.get_logbook(book_uuid, lazy)
def get_logbook(self, book_uuid, lazy=False):
book_path = self._join_path(self.book_path, book_uuid)
book_data = self._get_item(book_path)
book = self._deserialize(logbook.LogBook, book_data)
if not lazy:
for flow_details in self.get_flows_for_book(book_uuid):
book.add(flow_details)
return book
def save_logbook(self, book):
book_path = self._get_obj_path(book)
with self._transaction() as transaction:
self._update_object(book, transaction, ignore_missing=True)
for flow_details in book:
flow_path = self._get_obj_path(flow_details)
link_path = self._join_path(book_path, flow_details.uuid)
self._do_update_flow_details(flow_details, transaction,
ignore_missing=True)
self._create_link(flow_path, link_path, transaction)
return book
def get_flows_for_book(self, book_uuid, lazy=False):
book_path = self._join_path(self.book_path, book_uuid)
for flow_uuid in self._get_children(book_path):
yield self.get_flow_details(flow_uuid, lazy)
def get_flow_details(self, flow_uuid, lazy=False):
flow_path = self._join_path(self.flow_path, flow_uuid)
flow_data = self._get_item(flow_path)
flow_details = self._deserialize(logbook.FlowDetail, flow_data)
if not lazy:
for atom_details in self.get_atoms_for_flow(flow_uuid):
flow_details.add(atom_details)
return flow_details
def _do_update_flow_details(self, flow_detail, transaction,
ignore_missing=False):
flow_path = self._get_obj_path(flow_detail)
self._update_object(flow_detail, transaction, ignore_missing)
for atom_details in flow_detail:
atom_path = self._get_obj_path(atom_details)
link_path = self._join_path(flow_path, atom_details.uuid)
self._create_link(atom_path, link_path, transaction)
self._update_object(atom_details, transaction, ignore_missing=True)
return flow_detail
def update_flow_details(self, flow_detail, ignore_missing=False):
with self._transaction() as transaction:
return self._do_update_flow_details(flow_detail, transaction,
ignore_missing)
def get_atoms_for_flow(self, flow_uuid):
flow_path = self._join_path(self.flow_path, flow_uuid)
for atom_uuid in self._get_children(flow_path):
yield self.get_atom_details(atom_uuid)
def get_atom_details(self, atom_uuid):
atom_path = self._join_path(self.atom_path, atom_uuid)
atom_data = self._get_item(atom_path)
return self._deserialize(logbook.AtomDetail, atom_data)
def update_atom_details(self, atom_detail, ignore_missing=False):
with self._transaction() as transaction:
return self._update_object(atom_detail, transaction,
ignore_missing)
def _do_destroy_logbook(self, book_uuid, transaction):
book_path = self._join_path(self.book_path, book_uuid)
for flow_uuid in self._get_children(book_path):
flow_path = self._join_path(self.flow_path, flow_uuid)
for atom_uuid in self._get_children(flow_path):
atom_path = self._join_path(self.atom_path, atom_uuid)
self._del_tree(atom_path, transaction)
self._del_tree(flow_path, transaction)
self._del_tree(book_path, transaction)
def destroy_logbook(self, book_uuid):
with self._transaction() as transaction:
return self._do_destroy_logbook(book_uuid, transaction)
def clear_all(self):
with self._transaction() as transaction:
for path in (self.book_path, self.flow_path, self.atom_path):
self._del_tree(path, transaction)
def upgrade(self):
for path in (self.book_path, self.flow_path, self.atom_path):
self._ensure_path(path)
def close(self):
pass
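
To make the new abstract surface concrete, a toy, dict-backed pair of subclasses (purely illustrative, not part of this change) showing which primitives a real backend has to supply; everything else (save_logbook, get_flows_for_book, destroy_logbook, clear_all, upgrade) is inherited from PathBasedConnection above:

    import contextlib
    import posixpath

    from taskflow import exceptions as exc
    from taskflow.persistence import path_based

    class DictBackend(path_based.PathBasedBackend):
        """Toy backend keeping everything in plain dicts (illustration only)."""
        def __init__(self, conf=None):
            super(DictBackend, self).__init__(conf if conf is not None else {})
            if self._path is None:
                self._path = '/'
            self.items = {}    # full path -> stored value (None for bare links)
            self.dirs = set()  # paths known to exist as "folders"

        def get_connection(self):
            return DictConnection(self)

        def close(self):
            pass

    class DictConnection(path_based.PathBasedConnection):
        def _join_path(self, *parts):
            return posixpath.join(*parts)

        def _get_item(self, path):
            try:
                return self.backend.items[path]
            except KeyError:
                raise exc.NotFound("Item not found: %s" % path)

        def _set_item(self, path, value, transaction):
            self.backend.items[path] = value

        def _del_tree(self, path, transaction):
            doomed = [p for p in self.backend.items
                      if p == path or p.startswith(path + '/')]
            for p in doomed:
                del self.backend.items[p]
            self.backend.dirs.discard(path)

        def _get_children(self, path):
            children = set()
            for candidate in set(self.backend.items) | self.backend.dirs:
                head, tail = posixpath.split(candidate)
                if head == path:
                    children.add(tail)
            return list(children)

        def _ensure_path(self, path):
            self.backend.dirs.add(path)

        def _create_link(self, src_path, dest_path, transaction):
            # Links are bare markers; children are discovered by path prefix.
            self.backend.items.setdefault(dest_path, None)

        @contextlib.contextmanager
        def _transaction(self):
            # No real transactional semantics in this toy example.
            yield

        def validate(self):
            pass

Which can then be driven exactly like the real backends:

    backend = DictBackend()
    with contextlib.closing(backend.get_connection()) as conn:
        conn.upgrade()
        print(list(conn.get_logbooks()))  # empty to start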