Move toward using a backend+connection model

Instead of having a fairly restrictive module-based
API for saving logbook objects, it is much more
friendly and extensible to move toward a
ceilometer-influenced engine and connection-based
storage backend that uses stevedore to do the backend
loading instead of a custom registration/fetching
mechanism. This allows us to provide a base
object-oriented backend API that can easily be
inherited from to allow for customized & pluggable
backend storage modules.

Implements blueprint stevedore-based-backends
Implements blueprint ceilometer-influenced-backends

Change-Id: Ib5868d3d9018b7aa1a3354858dcb90ca1a04055d
Joshua Harlow 2013-09-07 16:49:40 -07:00 committed by Ivan A. Melnikov
parent 9effbd6016
commit 45d350e80d
28 changed files with 1370 additions and 1003 deletions
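A rough sketch of the usage pattern this change moves toward (the module path of the fetch() helper and the memory:// connection URL are assumptions based on the hunks below):

from taskflow.persistence import backends  # assumed home of fetch()
from taskflow.persistence import logbook

# The backend is picked from the URL scheme of the 'connection' value via
# the stevedore entry points registered under 'taskflow.persistence'.
backend = backends.fetch({'connection': 'memory://'})
conn = backend.get_connection()
conn.upgrade()

# Logbooks (and the flow/task details inside them) are now saved through a
# connection instead of through save() methods on the objects themselves.
book = logbook.LogBook('example-book')
conn.save_logbook(book)
conn.close()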

@@ -27,6 +27,13 @@ setup-hooks =
packages =
taskflow
[entry_points]
taskflow.persistence =
memory = taskflow.persistence.backends.impl_memory:MemoryBackend
mysql = taskflow.persistence.backends.impl_sqlalchemy:SQLAlchemyBackend
postgresql = taskflow.persistence.backends.impl_sqlalchemy:SQLAlchemyBackend
sqlite = taskflow.persistence.backends.impl_sqlalchemy:SQLAlchemyBackend
[nosetests]
cover-erase = true
verbosity = 2
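These entry points are what stevedore resolves at runtime. A small sketch of listing the registered persistence backends (assumes taskflow is installed so its entry points are discoverable):

from stevedore import extension

mgr = extension.ExtensionManager('taskflow.persistence', invoke_on_load=False)
print(sorted(mgr.names()))  # e.g. ['memory', 'mysql', 'postgresql', 'sqlite']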

@@ -27,6 +27,8 @@ from taskflow.engines.action_engine import task_action
from taskflow.patterns import linear_flow as lf
from taskflow.patterns import unordered_flow as uf
from taskflow.persistence import utils as p_utils
from taskflow import decorators
from taskflow import exceptions as exc
from taskflow import states
@@ -140,9 +142,13 @@ class SingleThreadedTranslator(Translator):
class SingleThreadedActionEngine(ActionEngine):
translator_cls = SingleThreadedTranslator
def __init__(self, flow, flow_detail=None):
def __init__(self, flow, flow_detail=None, book=None, backend=None):
if flow_detail is None:
flow_detail = p_utils.create_flow_detail(flow,
book=book,
backend=backend)
ActionEngine.__init__(self, flow,
storage=t_storage.Storage(flow_detail))
storage=t_storage.Storage(flow_detail, backend))
class MultiThreadedTranslator(Translator):
@@ -168,9 +174,15 @@ class MultiThreadedTranslator(Translator):
class MultiThreadedActionEngine(ActionEngine):
translator_cls = MultiThreadedTranslator
def __init__(self, flow, flow_detail=None, thread_pool=None):
def __init__(self, flow, flow_detail=None, book=None, backend=None,
thread_pool=None):
if flow_detail is None:
flow_detail = p_utils.create_flow_detail(flow,
book=book,
backend=backend)
ActionEngine.__init__(self, flow,
storage=t_storage.ThreadSafeStorage(flow_detail))
storage=t_storage.ThreadSafeStorage(flow_detail,
backend))
if thread_pool:
self._thread_pool = thread_pool
self._owns_thread_pool = False
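A sketch of how the new book/backend constructor arguments might be used (the engine module path is an assumption; the MemoryBackend import path comes from the entry points above):

from taskflow.engines.action_engine import engine as eng  # assumed module
from taskflow.patterns import linear_flow as lf
from taskflow.persistence.backends import impl_memory

flow = lf.Flow('demo-flow')
backend = impl_memory.MemoryBackend({})
# With no flow_detail given, one is created via p_utils.create_flow_detail()
# (in a temporary logbook when no book is passed) and saved to the backend.
engine = eng.SingleThreadedActionEngine(flow, backend=backend)
# engine.run() would then execute the (currently empty) flow.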

@@ -2,7 +2,7 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2013 Rackspace Hosting All Rights Reserved.
# Copyright (C) 2013 Rackspace Hosting Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain

@@ -2,7 +2,7 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2013 Rackspace Hosting All Rights Reserved.
# Copyright (C) 2013 Rackspace Hosting Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@@ -15,3 +15,27 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import urlparse
from stevedore import driver
from taskflow import exceptions as exc
# NOTE(harlowja): this is the entrypoint namespace, not the module namespace.
BACKEND_NAMESPACE = 'taskflow.persistence'
LOG = logging.getLogger(__name__)
def fetch(conf, namespace=BACKEND_NAMESPACE):
backend_name = urlparse.urlparse(conf['connection']).scheme
LOG.debug('Looking for %r backend driver in %r', backend_name, namespace)
try:
mgr = driver.DriverManager(namespace, backend_name,
invoke_on_load=True,
invoke_kwds={'conf': conf})
return mgr.driver
except RuntimeError as e:
raise exc.NotFound("Could not find backend %s: %s" % (backend_name, e))

@@ -1,62 +0,0 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2013 Rackspace Hosting Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import threading
from taskflow import exceptions as exc
from taskflow.openstack.common import importutils
_BACKEND_MAPPING = {
'memory': 'taskflow.persistence.backends.memory.api',
'sqlalchemy': 'taskflow.persistence.backends.sqlalchemy.api',
# TODO(harlowja): we likely need to allow more customization here so that
# its easier for a user of this library to alter the impl to there own
# choicing, aka, cinder has its own DB, or heat may want to write this
# information into swift, we need a way to accomodate that.
}
_BACKEND_MAPPING_LOCK = threading.RLock()
_BACKENDS = {}
_BACKEND_LOCK = threading.RLock()
def register(backend, module):
"""Register a new (or override an old) backend type.
Instead of being restricted to the existing types that are pre-registered
in taskflow it is useful to allow others to either override those types
or add new ones (since all backend types can not be predicted ahead of
time).
"""
with _BACKEND_MAPPING_LOCK:
_BACKEND_MAPPING[backend] = str(module)
def fetch(backend):
"""Fetch a backend impl. for a given backend type."""
with _BACKEND_MAPPING_LOCK:
if backend not in _BACKEND_MAPPING:
raise exc.NotFound("Unknown backend %s requested" % (backend))
mod = _BACKEND_MAPPING.get(backend, backend)
with _BACKEND_LOCK:
if mod in _BACKENDS:
return _BACKENDS[mod]
backend_mod = importutils.import_module(mod)
backend_impl = backend_mod.get_backend()
_BACKENDS[mod] = backend_impl
return backend_impl

@@ -0,0 +1,104 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2013 Rackspace Hosting Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
class Backend(object):
"""Base class for persistence backends."""
__metaclass__ = abc.ABCMeta
def __init__(self, conf):
self._conf = conf
@abc.abstractmethod
def get_connection(self):
"""Return a Connection instance based on the configuration settings."""
pass
@abc.abstractmethod
def close(self):
"""Closes any resources this backend has open."""
pass
class Connection(object):
"""Base class for backend connections."""
__metaclass__ = abc.ABCMeta
@abc.abstractproperty
def backend(self):
"""Returns the backend this connection is associated with."""
pass
@abc.abstractmethod
def close(self):
"""Closes any resources this connection has open."""
pass
@abc.abstractmethod
def upgrade(self):
"""Migrate the persistence backend to the most recent version."""
pass
@abc.abstractmethod
def clear_all(self):
"""Clear all entries from this backend."""
pass
@abc.abstractmethod
def update_task_details(self, task_detail):
"""Updates a given task details and returns the updated version.
NOTE(harlowja): the details that are to be updated must already have
been created by saving a flow details with the given task detail inside
of it.
"""
pass
@abc.abstractmethod
def update_flow_details(self, flow_detail):
"""Updates a given flow details and returns the updated version.
NOTE(harlowja): the details that are to be updated must already have
been created by saving a logbook with the given flow detail inside
of it.
"""
pass
@abc.abstractmethod
def save_logbook(self, book):
"""Saves a logbook, and all its contained information."""
pass
@abc.abstractmethod
def destroy_logbook(self, book_uuid):
"""Deletes/destroys a logbook matching the given uuid."""
pass
@abc.abstractmethod
def get_logbook(self, book_uuid):
"""Fetches a logbook object matching the given uuid."""
pass
@abc.abstractmethod
def get_logbooks(self):
"""Return an iterable of logbook objects."""
pass
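A minimal sketch of a custom pluggable backend built on these base classes (the Null* names are hypothetical and only illustrate the interface; a real implementation would persist the objects somewhere):

from taskflow import exceptions as exc
from taskflow.persistence.backends import base


class NullBackend(base.Backend):
    """Hypothetical backend that accepts and then discards everything."""

    def get_connection(self):
        return NullConnection(self)

    def close(self):
        pass


class NullConnection(base.Connection):
    def __init__(self, backend):
        self._backend = backend

    @property
    def backend(self):
        return self._backend

    def close(self):
        pass

    def upgrade(self):
        pass

    def clear_all(self):
        pass

    def update_task_details(self, task_detail):
        return task_detail

    def update_flow_details(self, flow_detail):
        return flow_detail

    def save_logbook(self, book):
        return book

    def destroy_logbook(self, book_uuid):
        pass

    def get_logbook(self, book_uuid):
        raise exc.NotFound("No logbook found with id: %s" % book_uuid)

    def get_logbooks(self):
        return iter([])

A third-party package could then expose such a class under the taskflow.persistence entry point namespace so it becomes selectable by connection URL scheme.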

@@ -0,0 +1,204 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
# Copyright (C) 2013 Rackspace Hosting All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Implementation of in-memory backend."""
import copy
import logging
import threading
import weakref
from taskflow import decorators
from taskflow import exceptions as exc
from taskflow.openstack.common import timeutils
from taskflow.persistence.backends import base
LOG = logging.getLogger(__name__)
# TODO(harlowja): we likely need to figure out a better place to put these
# rather than globals.
_LOG_BOOKS = {}
_FLOW_DETAILS = {}
_TASK_DETAILS = {}
# For now this will be a pretty big lock; since it is not expected that saves
# will be that frequent, this seems ok for the time being. I imagine that this
# can be done better, but it will require much more careful usage of a dict as
# a key/value map. Aka I wish python had a concurrent dict that was safe and
# known good to use.
_SAVE_LOCK = threading.RLock()
_READ_LOCK = threading.RLock()
_READ_SAVE_ORDER = (_READ_LOCK, _SAVE_LOCK)
def _copy(obj):
return copy.deepcopy(obj)
class MemoryBackend(base.Backend):
def get_connection(self):
return Connection(self)
def close(self):
pass
class Connection(base.Connection):
def __init__(self, backend):
self._read_lock = _READ_LOCK
self._save_locks = _READ_SAVE_ORDER
self._backend = weakref.proxy(backend)
def upgrade(self):
pass
@property
def backend(self):
return self._backend
def close(self):
pass
@decorators.locked(lock="_save_locks")
def clear_all(self):
count = 0
for uuid in list(_LOG_BOOKS.iterkeys()):
self.destroy_logbook(uuid)
count += 1
return count
@decorators.locked(lock="_save_locks")
def destroy_logbook(self, book_uuid):
try:
# Do the same cascading delete that the sql layer does.
lb = _LOG_BOOKS.pop(book_uuid)
for fd in lb:
_FLOW_DETAILS.pop(fd.uuid, None)
for td in fd:
_TASK_DETAILS.pop(td.uuid, None)
except KeyError:
raise exc.NotFound("No logbook found with id: %s" % book_uuid)
@decorators.locked(lock="_save_locks")
def update_task_details(self, task_detail):
try:
return _task_details_merge(_TASK_DETAILS[task_detail.uuid],
task_detail)
except KeyError:
raise exc.NotFound("No task details found with id: %s"
% task_detail.uuid)
@decorators.locked(lock="_save_locks")
def update_flow_details(self, flow_detail):
try:
e_fd = _flow_details_merge(_FLOW_DETAILS[flow_detail.uuid],
flow_detail)
for task_detail in flow_detail:
if e_fd.find(task_detail.uuid) is None:
_TASK_DETAILS[task_detail.uuid] = _copy(task_detail)
e_fd.add(task_detail)
if task_detail.uuid not in _TASK_DETAILS:
_TASK_DETAILS[task_detail.uuid] = _copy(task_detail)
task_detail.update(self.update_task_details(task_detail))
return e_fd
except KeyError:
raise exc.NotFound("No flow details found with id: %s"
% flow_detail.uuid)
@decorators.locked(lock="_save_locks")
def save_logbook(self, book):
# Get an existing logbook model (or create it if it isn't there).
try:
e_lb = _logbook_merge(_LOG_BOOKS[book.uuid], book)
# Add anything in to the new logbook that isn't already
# in the existing logbook.
for flow_detail in book:
if e_lb.find(flow_detail.uuid) is None:
_FLOW_DETAILS[flow_detail.uuid] = _copy(flow_detail)
e_lb.add(flow_detail)
if flow_detail.uuid not in _FLOW_DETAILS:
_FLOW_DETAILS[flow_detail.uuid] = _copy(flow_detail)
flow_detail.update(self.update_flow_details(flow_detail))
# TODO(harlowja): figure out a better way to set this property
# without actually setting a 'private' property.
e_lb._updated_at = timeutils.utcnow()
except KeyError:
# Ok the one given is now the one we will save
e_lb = _copy(book)
# TODO(harlowja): figure out a better way to set this property
# without actually setting a 'private' property.
e_lb._created_at = timeutils.utcnow()
# Record all the pieces as being saved.
_LOG_BOOKS[e_lb.uuid] = e_lb
for flow_detail in e_lb:
_FLOW_DETAILS[flow_detail.uuid] = _copy(flow_detail)
flow_detail.update(self.update_flow_details(flow_detail))
return e_lb
@decorators.locked(lock='_read_lock')
def get_logbook(self, book_uuid):
try:
return _LOG_BOOKS[book_uuid]
except KeyError:
raise exc.NotFound("No logbook found with id: %s" % book_uuid)
def get_logbooks(self):
# NOTE(harlowja): don't hold the lock while iterating
with self._read_lock:
books = list(_LOG_BOOKS.values())
for lb in books:
yield lb
###
# Merging + other helper functions.
###
def _task_details_merge(td_e, td_new):
if td_e is td_new:
return td_e
if td_e.state != td_new.state:
td_e.state = td_new.state
if td_e.results != td_new.results:
td_e.results = td_new.results
if td_e.exception != td_new.exception:
td_e.exception = td_new.exception
if td_e.stacktrace != td_new.stacktrace:
td_e.stacktrace = td_new.stacktrace
if td_e.meta != td_new.meta:
td_e.meta = td_new.meta
return td_e
def _flow_details_merge(fd_e, fd_new):
if fd_e is fd_new:
return fd_e
if fd_e.meta != fd_new.meta:
fd_e.meta = fd_new.meta
if fd_e.state != fd_new.state:
fd_e.state = fd_new.state
return fd_e
def _logbook_merge(lb_e, lb_new):
if lb_e is lb_new:
return lb_e
if lb_e.meta != lb_new.meta:
lb_e.meta = lb_new.meta
return lb_e
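A small usage sketch showing the merge-on-save behavior of this in-memory connection (purely illustrative):

from taskflow.persistence.backends import impl_memory
from taskflow.persistence import logbook

backend = impl_memory.MemoryBackend({})
conn = backend.get_connection()

book = logbook.LogBook('merge-demo')
conn.save_logbook(book)

book.meta = {'owner': 'example'}
# Saving again merges into the stored copy instead of replacing it.
saved = conn.save_logbook(book)
assert saved.meta == {'owner': 'example'}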

@@ -0,0 +1,519 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
# Copyright (C) 2013 Rackspace Hosting All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Implementation of a SQLAlchemy storage backend."""
from __future__ import absolute_import
import contextlib
import copy
import logging
import time
import weakref
import sqlalchemy as sa
from sqlalchemy import exceptions as sa_exc
from sqlalchemy import orm as sa_orm
from sqlalchemy import pool as sa_pool
from taskflow import exceptions as exc
from taskflow.persistence.backends import base
from taskflow.persistence.backends.sqlalchemy import migration
from taskflow.persistence.backends.sqlalchemy import models
from taskflow.persistence import logbook
from taskflow.utils import misc
LOG = logging.getLogger(__name__)
# NOTE(harlowja): This is all very similar to what oslo-incubator uses but is
# not based on using oslo.cfg and its global configuration (which should not be
# used in libraries such as taskflow).
#
# TODO(harlowja): once oslo.db appears we should be able to use that instead
# since it's not supposed to have any usage of oslo.cfg in it when it
# materializes as a library.
# See: http://dev.mysql.com/doc/refman/5.0/en/error-messages-client.html
MY_SQL_CONN_ERRORS = (
# Lost connection to MySQL server at '%s', system error: %d
'2006',
# Can't connect to MySQL server on '%s' (%d)
'2003',
# Can't connect to local MySQL server through socket '%s' (%d)
'2002',
)
MY_SQL_GONE_WAY_AWAY_ERRORS = (
# Lost connection to MySQL server at '%s', system error: %d
'2006',
# Lost connection to MySQL server during query
'2013',
# Commands out of sync; you can't run this command now
'2014',
# Can't open shared memory; no answer from server (%lu)
'2045',
# Lost connection to MySQL server at '%s', system error: %d
'2055',
)
# See: http://www.postgresql.org/docs/9.1/static/errcodes-appendix.html
POSTGRES_CONN_ERRORS = (
# connection_exception
'08000',
# connection_does_not_exist
'08003',
# connection_failure
'08006',
# sqlclient_unable_to_establish_sqlconnection
'08001',
# sqlserver_rejected_establishment_of_sqlconnection
'08004',
# Just couldn't connect (postgres errors are pretty weird)
'could not connect to server',
)
POSTGRES_GONE_WAY_AWAY_ERRORS = (
# Server terminated while in progress (postgres errors are pretty weird)
'server closed the connection unexpectedly',
'terminating connection due to administrator command',
)
# These connection urls mean sqlite is being used as an in-memory DB
SQLITE_IN_MEMORY = ("sqlite://", 'sqlite:///:memory:')
def _in_any(reason, err_haystack):
"""Checks if any elements of the haystack are in the given reason"""
for err in err_haystack:
if reason.find(str(err)) != -1:
return True
return False
def _is_db_connection_error(reason):
return _in_any(reason, list(MY_SQL_CONN_ERRORS + POSTGRES_CONN_ERRORS))
def _thread_yield(dbapi_con, con_record):
"""Ensure other greenthreads get a chance to be executed.
If we use eventlet.monkey_patch(), eventlet.greenthread.sleep(0) will
execute instead of time.sleep(0).
Force a context switch. With common database backends (eg MySQLdb and
sqlite), there is no implicit yield caused by network I/O since they are
implemented by C libraries that eventlet cannot monkey patch.
"""
time.sleep(0)
def _ping_listener(dbapi_conn, connection_rec, connection_proxy):
"""Ensures that MySQL connections checked out of the pool are alive.
Modified + borrowed from: http://bit.ly/14BYaW6
"""
try:
dbapi_conn.cursor().execute('select 1')
except dbapi_conn.OperationalError as ex:
if _in_any(str(ex.args[0]), MY_SQL_GONE_WAY_AWAY_ERRORS):
LOG.warn('Got mysql server has gone away: %s', ex)
raise sa_exc.DisconnectionError("Database server went away")
elif _in_any(str(ex.args[0]), POSTGRES_GONE_WAY_AWAY_ERRORS):
LOG.warn('Got postgres server has gone away: %s', ex)
raise sa_exc.DisconnectionError("Database server went away")
else:
raise
class SQLAlchemyBackend(base.Backend):
def __init__(self, conf):
super(SQLAlchemyBackend, self).__init__(conf)
self._engine = None
self._session_maker = None
def _test_connected(self, engine, max_retries=0):
def test_connect(failures):
try:
# See if we can make a connection happen.
#
# NOTE(harlowja): note that even though we are connecting
# once it does not mean that we will be able to connect in
# the future, so this is more of a sanity test and is not
# complete connection insurance.
with contextlib.closing(engine.connect()):
pass
except sa_exc.OperationalError as ex:
if _is_db_connection_error(str(ex.args[0])):
failures.append(misc.Failure())
return False
return True
failures = []
if test_connect(failures):
return engine
# Sorry it didn't work out...
if max_retries <= 0:
failures[-1].reraise()
# Go through the exponential backoff loop and see if we can connect
# after a given number of backoffs (with a backoff sleeping period
# between each attempt)...
attempts_left = max_retries
for sleepy_secs in misc.ExponentialBackoff(attempts=max_retries):
LOG.warn("SQL connection failed due to '%s', %s attempts left.",
failures[-1].exc, attempts_left)
LOG.info("Attempting to test the connection again in %s seconds.",
sleepy_secs)
time.sleep(sleepy_secs)
if test_connect(failures):
return engine
attempts_left -= 1
# Sorry it didn't work out...
failures[-1].reraise()
def _create_engine(self):
# NOTE(harlowja): copy the internal one so that we don't modify it via
# all the popping that will happen below.
conf = copy.deepcopy(self._conf)
engine_args = {
'echo': misc.as_bool(conf.pop('echo', False)),
'convert_unicode': misc.as_bool(conf.pop('convert_unicode', True)),
'pool_recycle': 3600,
}
try:
idle_timeout = misc.as_int(conf.pop('idle_timeout', None))
engine_args['pool_recycle'] = idle_timeout
except TypeError:
pass
sql_connection = conf.pop('connection')
e_url = sa.engine.url.make_url(sql_connection)
if 'sqlite' in e_url.drivername:
engine_args["poolclass"] = sa_pool.NullPool
# Adjustments for in-memory sqlite usage
if sql_connection.lower().strip() in SQLITE_IN_MEMORY:
engine_args["poolclass"] = sa_pool.StaticPool
engine_args["connect_args"] = {'check_same_thread': False}
else:
for (k, lookup_key) in [('pool_size', 'max_pool_size'),
('max_overflow', 'max_overflow'),
('pool_timeout', 'pool_timeout')]:
try:
engine_args[k] = misc.as_int(conf.pop(lookup_key, None))
except TypeError:
pass
# If the configuration dict specifies any additional engine args
# or engine arg overrides make sure we merge them in.
engine_args.update(conf.pop('engine_args', {}))
engine = sa.create_engine(sql_connection, **engine_args)
if misc.as_bool(conf.pop('checkin_yield', True)):
sa.event.listen(engine, 'checkin', _thread_yield)
if 'mysql' in e_url.drivername:
if misc.as_bool(conf.pop('checkout_ping', True)):
sa.event.listen(engine, 'checkout', _ping_listener)
try:
max_retries = misc.as_int(conf.pop('max_retries', None))
except TypeError:
max_retries = 0
return self._test_connected(engine, max_retries=max_retries)
@property
def engine(self):
if self._engine is None:
self._engine = self._create_engine()
return self._engine
def _get_session_maker(self):
if self._session_maker is None:
self._session_maker = sa_orm.sessionmaker(bind=self.engine,
autocommit=True)
return self._session_maker
def get_connection(self):
return Connection(self, self._get_session_maker())
def close(self):
if self._session_maker is not None:
self._session_maker.close_all()
if self._engine is not None:
self._engine.dispose()
self._engine = None
self._session_maker = None
class Connection(base.Connection):
def __init__(self, backend, session_maker):
self._backend = weakref.proxy(backend)
self._session_maker = session_maker
self._engine = backend.engine
@property
def backend(self):
return self._backend
def _run_in_session(self, functor, *args, **kwargs):
"""Runs a function in a session and makes sure that sqlalchemy
exceptions aren't emitted from that session's actions (as that would
expose the underlying backend's exception model).
"""
try:
session = self._make_session()
with session.begin():
return functor(session, *args, **kwargs)
except sa_exc.SQLAlchemyError as e:
raise exc.StorageError("Failed running database session: %s" % e,
e)
def _make_session(self):
try:
return self._session_maker()
except sa_exc.SQLAlchemyError as e:
raise exc.StorageError("Failed creating database session: %s"
% e, e)
def upgrade(self):
try:
with contextlib.closing(self._engine.connect()) as conn:
migration.db_sync(conn)
except sa_exc.SQLAlchemyError as e:
raise exc.StorageError("Failed upgrading database version: %s" % e,
e)
def _clear_all(self, session):
# NOTE(harlowja): due to how we have our relationship setup and
# cascading deletes are enabled, this will cause all associated
# task details and flow details to automatically be purged.
try:
return session.query(models.LogBook).delete()
except sa_exc.DBAPIError as e:
raise exc.StorageError("Failed clearing all entries: %s" % e, e)
def clear_all(self):
return self._run_in_session(self._clear_all)
def _update_task_details(self, session, td):
# Must already exist since task details have a strong connection to
# flow details, and task details cannot be saved on their own since
# they *must* have a connection to an existing flow details.
td_m = _task_details_get_model(td.uuid, session=session)
td_m = _taskdetails_merge(td_m, td)
td_m = session.merge(td_m)
return _convert_td_to_external(td_m)
def update_task_details(self, task_detail):
return self._run_in_session(self._update_task_details, td=task_detail)
def _update_flow_details(self, session, fd):
# Must already exist since flow details have a strong connection to
# a logbook, and flow details cannot be saved on their own since they
# *must* have a connection to an existing logbook.
fd_m = _flow_details_get_model(fd.uuid, session=session)
fd_m = _flowdetails_merge(fd_m, fd)
fd_m = session.merge(fd_m)
return _convert_fd_to_external(fd_m)
def update_flow_details(self, flow_detail):
return self._run_in_session(self._update_flow_details, fd=flow_detail)
def _destroy_logbook(self, session, lb_id):
try:
lb = _logbook_get_model(lb_id, session=session)
session.delete(lb)
except sa_exc.DBAPIError as e:
raise exc.StorageError("Failed destroying"
" logbook %s: %s" % (lb_id, e), e)
def destroy_logbook(self, book_uuid):
return self._run_in_session(self._destroy_logbook, lb_id=book_uuid)
def _save_logbook(self, session, lb):
try:
lb_m = _logbook_get_model(lb.uuid, session=session)
# NOTE(harlowja): Merge them (note that this doesn't provide
# 100% correct update semantics due to how databases have
# MVCC). This is where a stored procedure or a better backing
# store would handle this better by allowing this merge logic
# to exist in the database itself.
lb_m = _logbook_merge(lb_m, lb)
except exc.NotFound:
lb_m = _convert_lb_to_internal(lb)
try:
lb_m = session.merge(lb_m)
return _convert_lb_to_external(lb_m)
except sa_exc.DBAPIError as e:
raise exc.StorageError("Failed saving logbook %s: %s" %
(lb.uuid, e), e)
def save_logbook(self, book):
return self._run_in_session(self._save_logbook, lb=book)
def get_logbook(self, book_uuid):
session = self._make_session()
try:
lb = _logbook_get_model(book_uuid, session=session)
return _convert_lb_to_external(lb)
except sa_exc.DBAPIError as e:
raise exc.StorageError("Failed getting logbook %s: %s"
% (book_uuid, e), e)
def get_logbooks(self):
session = self._make_session()
try:
raw_books = session.query(models.LogBook).all()
books = [_convert_lb_to_external(lb) for lb in raw_books]
except sa_exc.DBAPIError as e:
raise exc.StorageError("Failed getting logbooks: %s" % e, e)
for lb in books:
yield lb
def close(self):
pass
###
# Internal <-> external model + merging + other helper functions.
###
def _convert_fd_to_external(fd):
fd_c = logbook.FlowDetail(fd.name, uuid=fd.uuid)
fd_c.meta = fd.meta
fd_c.state = fd.state
for td in fd.taskdetails:
fd_c.add(_convert_td_to_external(td))
return fd_c
def _convert_fd_to_internal(fd, parent_uuid):
fd_m = models.FlowDetail(name=fd.name, uuid=fd.uuid,
parent_uuid=parent_uuid, meta=fd.meta,
state=fd.state)
fd_m.taskdetails = []
for td in fd:
fd_m.taskdetails.append(_convert_td_to_internal(td, fd_m.uuid))
return fd_m
def _convert_td_to_internal(td, parent_uuid):
return models.TaskDetail(name=td.name, uuid=td.uuid,
state=td.state, results=td.results,
exception=td.exception, meta=td.meta,
stacktrace=td.stacktrace,
version=td.version, parent_uuid=parent_uuid)
def _convert_td_to_external(td):
# Convert from sqlalchemy model -> external model, this allows us
# to change the internal sqlalchemy model easily by forcing a defined
# interface (that isn't the sqlalchemy model itself).
td_c = logbook.TaskDetail(td.name, uuid=td.uuid)
td_c.state = td.state
td_c.results = td.results
td_c.exception = td.exception
td_c.stacktrace = td.stacktrace
td_c.meta = td.meta
td_c.version = td.version
return td_c
def _convert_lb_to_external(lb_m):
"""Don't expose the internal sqlalchemy ORM model to the external api."""
lb_c = logbook.LogBook(lb_m.name, lb_m.uuid,
updated_at=lb_m.updated_at,
created_at=lb_m.created_at)
lb_c.meta = lb_m.meta
for fd_m in lb_m.flowdetails:
lb_c.add(_convert_fd_to_external(fd_m))
return lb_c
def _convert_lb_to_internal(lb_c):
"""Don't expose the external model to the sqlalchemy ORM model."""
lb_m = models.LogBook(uuid=lb_c.uuid, meta=lb_c.meta, name=lb_c.name)
lb_m.flowdetails = []
for fd_c in lb_c:
lb_m.flowdetails.append(_convert_fd_to_internal(fd_c, lb_c.uuid))
return lb_m
def _logbook_get_model(lb_id, session):
entry = session.query(models.LogBook).filter_by(uuid=lb_id).first()
if entry is None:
raise exc.NotFound("No logbook found with id: %s" % lb_id)
return entry
def _flow_details_get_model(f_id, session):
entry = session.query(models.FlowDetail).filter_by(uuid=f_id).first()
if entry is None:
raise exc.NotFound("No flow details found with id: %s" % f_id)
return entry
def _task_details_get_model(t_id, session):
entry = session.query(models.TaskDetail).filter_by(uuid=t_id).first()
if entry is None:
raise exc.NotFound("No task details found with id: %s" % t_id)
return entry
def _logbook_merge(lb_m, lb):
if lb_m.meta != lb.meta:
lb_m.meta = lb.meta
for fd in lb:
existing_fd = False
for fd_m in lb_m.flowdetails:
if fd_m.uuid == fd.uuid:
existing_fd = True
fd_m = _flowdetails_merge(fd_m, fd)
if not existing_fd:
lb_m.flowdetails.append(_convert_fd_to_internal(fd, lb_m.uuid))
return lb_m
def _flowdetails_merge(fd_m, fd):
if fd_m.meta != fd.meta:
fd_m.meta = fd.meta
if fd_m.state != fd.state:
fd_m.state = fd.state
for td in fd:
existing_td = False
for td_m in fd_m.taskdetails:
if td_m.uuid == td.uuid:
existing_td = True
td_m = _taskdetails_merge(td_m, td)
break
if not existing_td:
td_m = _convert_td_to_internal(td, fd_m.uuid)
fd_m.taskdetails.append(td_m)
return fd_m
def _taskdetails_merge(td_m, td):
if td_m.state != td.state:
td_m.state = td.state
if td_m.results != td.results:
td_m.results = td.results
if td_m.exception != td.exception:
td_m.exception = td.exception
if td_m.stacktrace != td.stacktrace:
td_m.stacktrace = td.stacktrace
if td_m.meta != td.meta:
td_m.meta = td.meta
return td_m
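A hypothetical configuration for this backend (the keys mirror what _create_engine() above understands; the values are illustrative):

import contextlib

from taskflow.persistence.backends import impl_sqlalchemy

conf = {
    'connection': 'sqlite:////tmp/taskflow.db',
    'echo': False,          # engine SQL echo/logging
    'idle_timeout': 3600,   # becomes pool_recycle
    'max_retries': 3,       # connection test retries (with backoff)
    # For mysql:// or postgresql:// connections the pool can also be tuned
    # via max_pool_size, max_overflow and pool_timeout.
}

backend = impl_sqlalchemy.SQLAlchemyBackend(conf)
with contextlib.closing(backend.get_connection()) as conn:
    conn.upgrade()  # run the alembic migrations up to 'head'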

@@ -1,18 +0,0 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
# Copyright (C) 2013 Rackspace Hosting All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

@@ -1,166 +0,0 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
# Copyright (C) 2013 Rackspace Hosting All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Implementation of in-memory backend."""
import copy
import logging
import sys
import threading
from taskflow import exceptions as exc
from taskflow.openstack.common import timeutils
from taskflow.utils import threading_utils
LOG = logging.getLogger(__name__)
# TODO(harlowja): we likely need to figure out a better place to put these
# rather than globals.
LOG_BOOKS = {}
FLOW_DETAILS = {}
TASK_DETAILS = {}
# For now this will be a pretty big lock, since it is not expected that saves
# will be that frequent this seems ok for the time being. I imagine that this
# can be done better but it will require much more careful usage of a dict as
# a key/value map. Aka I wish python had a concurrent dict that was safe and
# known good to use.
SAVE_LOCK = threading.RLock()
READ_LOCK = threading.RLock()
READ_SAVE_ORDER = (READ_LOCK, SAVE_LOCK,)
def get_backend():
"""The backend is this module itself."""
return sys.modules[__name__]
def _taskdetails_merge(td_e, td_new):
"""Merges an existing taskdetails with a new one."""
if td_e.state != td_new.state:
td_e.state = td_new.state
if td_e.results != td_new.results:
td_e.results = td_new.results
if td_e.exception != td_new.exception:
td_e.exception = td_new.exception
if td_e.stacktrace != td_new.stacktrace:
td_e.stacktrace = td_new.stacktrace
if td_e.meta != td_new.meta:
td_e.meta = td_new.meta
return td_e
def taskdetails_save(td):
with threading_utils.MultiLock(READ_SAVE_ORDER):
try:
return _taskdetails_merge(TASK_DETAILS[td.uuid], td)
except KeyError:
raise exc.NotFound("No task details found with id: %s" % td.uuid)
def flowdetails_save(fd):
try:
with threading_utils.MultiLock(READ_SAVE_ORDER):
e_fd = FLOW_DETAILS[fd.uuid]
if e_fd.meta != fd.meta:
e_fd.meta = fd.meta
if e_fd.state != fd.state:
e_fd.state = fd.state
for td in fd:
if td not in e_fd:
td = copy.deepcopy(td)
TASK_DETAILS[td.uuid] = td
e_fd.add(td)
else:
# Previously added but not saved into the taskdetails
# 'permanent' storage.
if td.uuid not in TASK_DETAILS:
TASK_DETAILS[td.uuid] = copy.deepcopy(td)
taskdetails_save(td)
return e_fd
except KeyError:
raise exc.NotFound("No flow details found with id: %s" % fd.uuid)
def clear_all():
with threading_utils.MultiLock(READ_SAVE_ORDER):
count = 0
for lb_id in list(LOG_BOOKS.iterkeys()):
logbook_destroy(lb_id)
count += 1
return count
def logbook_get(lb_id):
try:
with READ_LOCK:
return LOG_BOOKS[lb_id]
except KeyError:
raise exc.NotFound("No logbook found with id: %s" % lb_id)
def logbook_destroy(lb_id):
try:
with threading_utils.MultiLock(READ_SAVE_ORDER):
# Do the same cascading delete that the sql layer does.
lb = LOG_BOOKS.pop(lb_id)
for fd in lb:
FLOW_DETAILS.pop(fd.uuid, None)
for td in fd:
TASK_DETAILS.pop(td.uuid, None)
except KeyError:
raise exc.NotFound("No logbook found with id: %s" % lb_id)
def logbook_save(lb):
# Acquire all the locks that will be needed to perform this operation with
# out being affected by other threads doing it at the same time.
with threading_utils.MultiLock(READ_SAVE_ORDER):
# Get a existing logbook model (or create it if it isn't there).
try:
backing_lb = LOG_BOOKS[lb.uuid]
if backing_lb.meta != lb.meta:
backing_lb.meta = lb.meta
# Add anything on to the existing loaded logbook that isn't already
# in the existing logbook.
for fd in lb:
if fd not in backing_lb:
FLOW_DETAILS[fd.uuid] = copy.deepcopy(fd)
backing_lb.add(flowdetails_save(fd))
else:
# Previously added but not saved into the flowdetails
# 'permanent' storage.
if fd.uuid not in FLOW_DETAILS:
FLOW_DETAILS[fd.uuid] = copy.deepcopy(fd)
flowdetails_save(fd)
# TODO(harlowja): figure out a better way to set this property
# without actually letting others set it external.
backing_lb._updated_at = timeutils.utcnow()
except KeyError:
backing_lb = copy.deepcopy(lb)
# TODO(harlowja): figure out a better way to set this property
# without actually letting others set it external.
backing_lb._created_at = timeutils.utcnow()
# Record all the pieces as being saved.
LOG_BOOKS[lb.uuid] = backing_lb
for fd in backing_lb:
FLOW_DETAILS[fd.uuid] = fd
for td in fd:
TASK_DETAILS[td.uuid] = td
return backing_lb

@@ -1,246 +0,0 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
# Copyright (C) 2013 Rackspace Hosting All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Implementation of a SQLAlchemy storage backend."""
import logging
import sys
from sqlalchemy import exceptions as sql_exc
from taskflow import exceptions as exc
from taskflow.openstack.common.db.sqlalchemy import session as db_session
from taskflow.persistence.backends.sqlalchemy import models
from taskflow.persistence import flowdetail
from taskflow.persistence import logbook
from taskflow.persistence import taskdetail
LOG = logging.getLogger(__name__)
def get_backend():
"""The backend is this module itself."""
return sys.modules[__name__]
def _convert_fd_to_external(fd):
fd_c = flowdetail.FlowDetail(fd.name, uuid=fd.uuid, backend='sqlalchemy')
fd_c.meta = fd.meta
fd_c.state = fd.state
for td in fd.taskdetails:
fd_c.add(_convert_td_to_external(td))
return fd_c
def _convert_fd_to_internal(fd, lb_uuid):
fd_m = models.FlowDetail(name=fd.name, uuid=fd.uuid, parent_uuid=lb_uuid,
meta=fd.meta, state=fd.state)
fd_m.taskdetails = []
for td in fd:
fd_m.taskdetails.append(_convert_td_to_internal(td, fd_m.uuid))
return fd_m
def _convert_td_to_internal(td, parent_uuid):
return models.TaskDetail(name=td.name, uuid=td.uuid,
state=td.state, results=td.results,
exception=td.exception, meta=td.meta,
stacktrace=td.stacktrace,
version=td.version, parent_uuid=parent_uuid)
def _convert_td_to_external(td):
# Convert from sqlalchemy model -> external model, this allows us
# to change the internal sqlalchemy model easily by forcing a defined
# interface (that isn't the sqlalchemy model itself).
td_c = taskdetail.TaskDetail(td.name, uuid=td.uuid, backend='sqlalchemy')
td_c.state = td.state
td_c.results = td.results
td_c.exception = td.exception
td_c.stacktrace = td.stacktrace
td_c.meta = td.meta
td_c.version = td.version
return td_c
def _convert_lb_to_external(lb_m):
"""Don't expose the internal sqlalchemy ORM model to the external api."""
lb_c = logbook.LogBook(lb_m.name, lb_m.uuid,
updated_at=lb_m.updated_at,
created_at=lb_m.created_at,
backend='sqlalchemy')
lb_c.meta = lb_m.meta
for fd_m in lb_m.flowdetails:
lb_c.add(_convert_fd_to_external(fd_m))
return lb_c
def _convert_lb_to_internal(lb_c):
"""Don't expose the external model to the sqlalchemy ORM model."""
lb_m = models.LogBook(uuid=lb_c.uuid, meta=lb_c.meta, name=lb_c.name)
lb_m.flowdetails = []
for fd_c in lb_c:
lb_m.flowdetails.append(_convert_fd_to_internal(fd_c, lb_c.uuid))
return lb_m
def _logbook_get_model(lb_id, session):
entry = session.query(models.LogBook).filter_by(uuid=lb_id).first()
if entry is None:
raise exc.NotFound("No logbook found with id: %s" % lb_id)
return entry
def _flow_details_get_model(f_id, session):
entry = session.query(models.FlowDetail).filter_by(uuid=f_id).first()
if entry is None:
raise exc.NotFound("No flow details found with id: %s" % f_id)
return entry
def _task_details_get_model(t_id, session):
entry = session.query(models.TaskDetail).filter_by(uuid=t_id).first()
if entry is None:
raise exc.NotFound("No task details found with id: %s" % t_id)
return entry
def _taskdetails_merge(td_m, td):
if td_m.state != td.state:
td_m.state = td.state
if td_m.results != td.results:
td_m.results = td.results
if td_m.exception != td.exception:
td_m.exception = td.exception
if td_m.stacktrace != td.stacktrace:
td_m.stacktrace = td.stacktrace
if td_m.meta != td.meta:
td_m.meta = td.meta
return td_m
def clear_all():
session = db_session.get_session()
with session.begin():
# NOTE(harlowja): due to how we have our relationship setup and
# cascading deletes are enabled, this will cause all associated task
# details and flow details to automatically be purged.
try:
return session.query(models.LogBook).delete()
except sql_exc.DBAPIError as e:
raise exc.StorageError("Failed clearing all entries: %s" % e, e)
def taskdetails_save(td):
# Must already exist since a tasks details has a strong connection to
# a flow details, and tasks details can not be saved on there own since
# they *must* have a connection to an existing flow details.
session = db_session.get_session()
with session.begin():
td_m = _task_details_get_model(td.uuid, session=session)
td_m = _taskdetails_merge(td_m, td)
td_m = session.merge(td_m)
return _convert_td_to_external(td_m)
def flowdetails_save(fd):
# Must already exist since a flow details has a strong connection to
# a logbook, and flow details can not be saved on there own since they
# *must* have a connection to an existing logbook.
session = db_session.get_session()
with session.begin():
fd_m = _flow_details_get_model(fd.uuid, session=session)
if fd_m.meta != fd.meta:
fd_m.meta = fd.meta
if fd_m.state != fd.state:
fd_m.state = fd.state
for td in fd:
updated = False
for td_m in fd_m.taskdetails:
if td_m.uuid == td.uuid:
updated = True
td_m = _taskdetails_merge(td_m, td)
break
if not updated:
fd_m.taskdetails.append(_convert_td_to_internal(td, fd_m.uuid))
fd_m = session.merge(fd_m)
return _convert_fd_to_external(fd_m)
def logbook_destroy(lb_id):
session = db_session.get_session()
with session.begin():
try:
lb = _logbook_get_model(lb_id, session=session)
session.delete(lb)
except sql_exc.DBAPIError as e:
raise exc.StorageError("Failed destroying"
" logbook %s: %s" % (lb_id, e), e)
def logbook_save(lb):
session = db_session.get_session()
with session.begin():
try:
lb_m = _logbook_get_model(lb.uuid, session=session)
# NOTE(harlowja): Merge them (note that this doesn't provide 100%
# correct update semantics due to how databases have MVCC). This
# is where a stored procedure or a better backing store would
# handle this better (something more suited to this type of data).
for fd in lb:
existing_fd = False
for fd_m in lb_m.flowdetails:
if fd_m.uuid == fd.uuid:
existing_fd = True
if fd_m.meta != fd.meta:
fd_m.meta = fd.meta
if fd_m.state != fd.state:
fd_m.state = fd.state
for td in fd:
existing_td = False
for td_m in fd_m.taskdetails:
if td_m.uuid == td.uuid:
existing_td = True
td_m = _taskdetails_merge(td_m, td)
break
if not existing_td:
td_m = _convert_td_to_internal(td, fd_m.uuid)
fd_m.taskdetails.append(td_m)
if not existing_fd:
lb_m.flowdetails.append(_convert_fd_to_internal(fd,
lb_m.uuid))
except exc.NotFound:
lb_m = _convert_lb_to_internal(lb)
try:
lb_m = session.merge(lb_m)
return _convert_lb_to_external(lb_m)
except sql_exc.DBAPIError as e:
raise exc.StorageError("Failed saving"
" logbook %s: %s" % (lb.uuid, e), e)
def logbook_get(lb_id):
session = db_session.get_session()
try:
lb_m = _logbook_get_model(lb_id, session=session)
return _convert_lb_to_external(lb_m)
except sql_exc.DBAPIError as e:
raise exc.StorageError("Failed getting"
" logbook %s: %s" % (lb_id, e), e)

@@ -20,25 +20,25 @@
import os
from oslo.config import cfg
import alembic
from alembic import config as alembic_config
CONF = cfg.CONF
CONF.import_opt('connection',
'taskflow.openstack.common.db.sqlalchemy.session',
group='database')
from alembic import config as a_config
from alembic import environment as a_env
from alembic import script as a_script
def _alembic_config():
path = os.path.join(os.path.dirname(__file__), 'alembic', 'alembic.ini')
config = alembic_config.Config(path)
if not config.get_main_option('url'):
config.set_main_option('sqlalchemy.url', CONF.database.connection)
return config
return a_config.Config(path)
def db_sync():
def db_sync(connection, revision='head'):
script = a_script.ScriptDirectory.from_config(_alembic_config())
def upgrade(rev, context):
return script._upgrade_revs(revision, rev)
config = _alembic_config()
alembic.command.upgrade(config, "head")
with a_env.EnvironmentContext(config, script, fn=upgrade, as_sql=False,
starting_rev=None, destination_rev=revision,
tag=None) as context:
context.configure(connection=connection)
context.run_migrations()

@@ -25,6 +25,7 @@ from sqlalchemy.orm import relationship
from sqlalchemy import types as types
from taskflow.openstack.common.db.sqlalchemy import models as c_models
from taskflow.openstack.common import jsonutils
from taskflow.openstack.common import uuidutils
@@ -41,8 +42,7 @@ class Json(types.TypeDecorator, types.MutableType):
return jsonutils.loads(value)
class ModelBase(c_models.ModelBase,
c_models.TimestampMixin):
class ModelBase(c_models.ModelBase, c_models.TimestampMixin):
"""Base model for all taskflow objects"""
uuid = Column(String, default=uuidutils.generate_uuid,
primary_key=True, nullable=False, unique=True)

@@ -1,100 +0,0 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2013 Rackspace Hosting Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from taskflow.persistence.backends import api as b_api
class FlowDetail(object):
"""This class contains an append-only list of task detail entries for a
given flow along with any metadata associated with that flow.
The data contained within this class need *not* backed by the backend
storage in real time. The data in this class will only be guaranteed to be
persisted when the logbook that contains this flow detail is saved or when
the save() method is called directly.
"""
def __init__(self, name, uuid, backend='memory'):
self._uuid = uuid
self._name = name
self._taskdetails = []
self.state = None
# Any other metadata to include about this flow while storing. For
# example timing information could be stored here, other misc. flow
# related items (edge connections)...
self.meta = None
self.backend = backend
def _get_backend(self):
if not self.backend:
return None
return b_api.fetch(self.backend)
def add(self, td):
self._taskdetails.append(td)
# When added the backend that the task details is using will be
# automatically switched to whatever backend this flow details is
# using.
if td.backend != self.backend:
td.backend = self.backend
def find(self, td_uuid):
for self_td in self:
if self_td.uuid == td_uuid:
return self_td
return None
def find_by_name(self, td_name):
for self_td in self:
if self_td.name == td_name:
return self_td
return None
def save(self):
"""Saves *most* of the components of this given object.
This will immediately and atomically save the attributes of this flow
details object to a backing store providing by the backing api.
The underlying storage must contain an existing flow details that this
save operation will merge with and then reflect those changes in this
object.
"""
backend = self._get_backend()
if not backend:
raise NotImplementedError("No saving backend provided")
fd_u = backend.flowdetails_save(self)
if fd_u is self:
return
self.meta = fd_u.meta
self.state = fd_u.state
self._taskdetails = fd_u._taskdetails
@property
def uuid(self):
return self._uuid
@property
def name(self):
return self._name
def __iter__(self):
for td in self._taskdetails:
yield td
def __len__(self):
return len(self._taskdetails)

@@ -2,7 +2,8 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2013 Rackspace Hosting Inc. All Rights Reserved.
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
# Copyright (C) 2013 Rackspace Hosting All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@@ -16,8 +17,11 @@
# License for the specific language governing permissions and limitations
# under the License.
import logging
from taskflow.openstack.common import uuidutils
from taskflow.persistence.backends import api as b_api
LOG = logging.getLogger(__name__)
class LogBook(object):
@@ -27,10 +31,9 @@ class LogBook(object):
The data contained within this class need *not* be backed by the backend
storage in real time. The data in this class will only be guaranteed to be
persisted when the logbook is saved.
persisted when a save occurs via some backend connection.
"""
def __init__(self, name, uuid=None, updated_at=None, created_at=None,
backend='memory'):
def __init__(self, name, uuid=None, updated_at=None, created_at=None):
if uuid:
self._uuid = uuid
else:
@@ -39,14 +42,8 @@ class LogBook(object):
self._flowdetails = []
self._updated_at = updated_at
self._created_at = created_at
self.backend = backend
self.meta = None
def _get_backend(self):
if not self.backend:
return None
return b_api.fetch(self.backend)
@property
def created_at(self):
return self._created_at
@@ -55,59 +52,12 @@
def updated_at(self):
return self._updated_at
def save(self):
"""Saves all the components of the given logbook.
This will immediately and atomically save all the entries of the given
logbook, including any flow details, and any associated task details,
that may be contained in this logbook to a backing store providing by
the backing api.
If the logbook is the underlying storage contains an existing logbook
then this save operation will merge the different flow details objects
to that logbook and then reflect those changes in this logbook.
"""
backend = self._get_backend()
if not backend:
raise NotImplementedError("No saving backend provided")
s_book = backend.logbook_save(self)
if s_book is self:
return
# Alter the internal variables to reflect what was saved (which may
# have new additions if there was a merge with pre-existing data).
self._name = s_book._name
self._flowdetails = s_book._flowdetails
self._updated_at = s_book._updated_at
self._created_at = s_book._created_at
self.meta = s_book.meta
def delete(self):
"""Deletes all traces of the given logbook.
This will delete the logbook entry, any associated flow detail entries
and any associated task detail entries associated with those flow
detail entries immediately via the backing api (using a atomic
transaction).
"""
backend = self._get_backend()
if not backend:
raise NotImplementedError("No deleting backend provided")
backend.logbook_destroy(self.uuid)
def add(self, flow_detail):
"""Adds a new entry to the underlying logbook.
Does not *guarantee* that the details will be immediately saved.
"""
self._flowdetails.append(flow_detail)
# When added the backend that the flow details (and any owned task
# details) is using will be automatically switched to whatever backend
# this logbook is using.
if flow_detail.backend != self.backend:
flow_detail.backend = self.backend
for task_detail in flow_detail:
if task_detail.backend != self.backend:
task_detail.backend = self.backend
def find(self, flow_uuid):
for fd in self._flowdetails:
@@ -131,6 +81,115 @@ class LogBook(object):
return len(self._flowdetails)
def load(lb_id, backend='memory'):
"""Loads a given logbook (if it exists) from the given backend type."""
return b_api.fetch(backend).logbook_get(lb_id)
class FlowDetail(object):
"""This class contains an append-only list of task detail entries for a
given flow along with any metadata associated with that flow.
The data contained within this class need *not* be backed by the backend
storage in real time. The data in this class will only be guaranteed to be
persisted when a save/update occurs via some backend connection.
"""
def __init__(self, name, uuid):
self._uuid = uuid
self._name = name
self._taskdetails = []
self.state = None
# Any other metadata to include about this flow while storing. For
# example timing information could be stored here, other misc. flow
# related items (edge connections)...
self.meta = None
def update(self, fd):
"""Updates the objects state to be the same as the given one."""
if fd is self:
return
self._taskdetails = list(fd._taskdetails)
self.state = fd.state
self.meta = fd.meta
def add(self, td):
self._taskdetails.append(td)
def find(self, td_uuid):
for self_td in self:
if self_td.uuid == td_uuid:
return self_td
return None
def find_by_name(self, td_name):
for self_td in self:
if self_td.name == td_name:
return self_td
return None
@property
def uuid(self):
return self._uuid
@property
def name(self):
return self._name
def __iter__(self):
for td in self._taskdetails:
yield td
def __len__(self):
return len(self._taskdetails)
class TaskDetail(object):
"""This class contains an entry that contains the persistance of a task
after or before (or during) it is running including any results it may have
produced, any state that it may be in (failed for example), any exception
that occured when running and any associated stacktrace that may have
occuring during that exception being thrown and any other metadata that
should be stored along-side the details about this task.
The data contained within this class need *not* backed by the backend
storage in real time. The data in this class will only be guaranteed to be
persisted when a save/update occurs via some backend connection.
"""
def __init__(self, name, uuid):
self._uuid = uuid
self._name = name
# TODO(harlowja): decide if these should be passed in and therefore
# immutable or let them be assigned?
#
# The state the task was last in.
self.state = None
# The results it may have produced (useful for reverting).
self.results = None
# An exception that it may have thrown (or part of it), useful for
# knowing what failed.
self.exception = None
# Any stack trace the exception may have had, useful for debugging or
# examining the failure in more depth.
self.stacktrace = None
# Any other metadata to include about this task while storing. For
# example timing information could be stored here, other misc. task
# related items.
self.meta = None
# The version of the task this task details was associated with which
# is quite useful for determining what versions of tasks this detail
# information can be associated with.
self.version = None
def update(self, td):
"""Updates the objects state to be the same as the given one."""
if td is self:
return
self.state = td.state
self.meta = td.meta
self.stacktrace = td.stacktrace
self.exception = td.exception
self.results = td.results
self.version = td.version
@property
def uuid(self):
return self._uuid
@property
def name(self):
return self._name
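A short sketch of assembling these objects and persisting them through a backend connection (the uuids and the save call are illustrative):

from taskflow.openstack.common import uuidutils
from taskflow.persistence import logbook

book = logbook.LogBook('my-book')
fd = logbook.FlowDetail('my-flow', uuid=uuidutils.generate_uuid())
td = logbook.TaskDetail('my-task', uuid=uuidutils.generate_uuid())
fd.add(td)
book.add(fd)
# Nothing is persisted until a connection saves it, e.g.:
#   conn.save_logbook(book)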

@@ -1,94 +0,0 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2013 Rackspace Hosting Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from taskflow.persistence.backends import api as b_api
class TaskDetail(object):
"""This class contains an entry that contains the persistance of a task
after or before (or during) it is running including any results it may have
produced, any state that it may be in (failed for example), any exception
that occured when running and any associated stacktrace that may have
occuring during that exception being thrown and any other metadata that
should be stored along-side the details about this task.
The data contained within this class need *not* backed by the backend
storage in real time. The data in this class will only be guaranteed to be
persisted when the logbook that contains this task detail is saved or when
the save() method is called directly.
"""
def __init__(self, name, uuid, backend='memory'):
self._uuid = uuid
self._name = name
self.backend = backend
# TODO(harlowja): decide if these should be passed in and therefore
# immutable or let them be assigned?
#
# The state the task was last in.
self.state = None
# The results it may have produced (useful for reverting).
self.results = None
# An exception that it may have thrown (or part of it), useful for
# knowing what failed.
self.exception = None
# Any stack trace the exception may have had, useful for debugging or
# examining the failure in more depth.
self.stacktrace = None
# Any other metadata to include about this task while storing. For
# example, timing information could be stored here, along with other
# misc. task-related items.
self.meta = None
# The version of the task this task detail was associated with, which
# is quite useful for determining what versions of tasks this detail
# information can be associated with.
self.version = None
def save(self):
"""Saves *most* of the components of this given object.
This will immediately and atomically save the attributes of this task
details object to a backing store provided by the backing api.
The underlying storage must contain an existing task detail that this
save operation will merge with and then reflect those changes in this
object.
"""
backend = self._get_backend()
if not backend:
raise NotImplementedError("No saving backend provided")
td_u = backend.taskdetails_save(self)
if td_u is self:
return
self.meta = td_u.meta
self.exception = td_u.exception
self.results = td_u.results
self.stacktrace = td_u.stacktrace
self.state = td_u.state
def _get_backend(self):
if not self.backend:
return None
return b_api.fetch(self.backend)
@property
def uuid(self):
return self._uuid
@property
def name(self):
return self._name

@ -0,0 +1,84 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import logging
from taskflow.openstack.common import uuidutils
from taskflow.persistence import logbook
LOG = logging.getLogger(__name__)
def temporary_log_book(backend):
"""Creates a temporary logbook for temporary usage in the given backend.
Mainly useful for tests and other use cases where a temporary logbook
is needed for a short-period of time.
"""
book = logbook.LogBook('tmp')
with contextlib.closing(backend.get_connection()) as conn:
conn.save_logbook(book)
return book
def temporary_flow_detail(backend):
"""Creates a temporary flow detail and logbook for temporary usage in
the given backend.
Mainly useful for tests and other use cases where a temporary flow detail
is needed for a short-period of time.
"""
flow_id = uuidutils.generate_uuid()
book = temporary_log_book(backend)
with contextlib.closing(backend.get_connection()) as conn:
book.add(logbook.FlowDetail(name='tmp-flow-detail', uuid=flow_id))
conn.save_logbook(book)
# Return the one from the saved logbook instead of the local one so
# that the freshest version is given back.
return (book, book.find(flow_id))
def create_flow_detail(flow, book=None, backend=None):
"""Creates a flow detail for the given flow and adds it to the provided
logbook (if provided) and then uses the given backend (if provided) to
save the logbook then returns the created flow detail.
"""
try:
flow_name = getattr(flow, 'name')
except AttributeError:
LOG.warn("Flow %s does not have a name attribute, creating one.", flow)
flow_name = uuidutils.generate_uuid()
try:
flow_id = getattr(flow, 'uuid')
except AttributeError:
LOG.warn("Flow %s does not have a uuid attribute, creating one.", flow)
flow_id = uuidutils.generate_uuid()
flow_detail = logbook.FlowDetail(name=flow_name, uuid=flow_id)
if book is not None:
book.add(flow_detail)
if backend is not None:
with contextlib.closing(backend.get_connection()) as conn:
conn.save_logbook(book)
# Return the one from the saved logbook instead of the local one so
# that the freshest version is given back
return book.find(flow_id)
else:
if backend is not None:
LOG.warn("Can not save %s without a provided logbook", flow)
return flow_detail
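
Taken together, a rough usage sketch of these helpers (the backend choice and flow name are illustrative; this mirrors how the engine tests below wire things up):

    from taskflow.patterns import linear_flow as lf
    from taskflow.persistence.backends import impl_memory
    from taskflow.persistence import utils as p_utils

    backend = impl_memory.MemoryBackend(conf={})
    book = p_utils.temporary_log_book(backend)
    flow = lf.Flow('my-flow')
    # Adds a flow detail to the logbook, saves the logbook through the backend
    # and returns the freshest copy of the flow detail for use by an engine.
    flow_detail = p_utils.create_flow_detail(flow, book=book, backend=backend)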

@ -16,32 +16,14 @@
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
from taskflow import exceptions
from taskflow.openstack.common import uuidutils
from taskflow.persistence import flowdetail
from taskflow.persistence import logbook
from taskflow.persistence import taskdetail
from taskflow import states
from taskflow.utils import threading_utils
def temporary_flow_detail():
"""Creates flow detail class for temporary usage
Creates in-memory logbook and flow detail in it. Should
be useful for tests and other use cases where persistence
is not needed
"""
lb = logbook.LogBook('tmp', backend='memory')
fd = flowdetail.FlowDetail(
name='tmp', uuid=uuidutils.generate_uuid(),
backend='memory')
lb.add(fd)
lb.save()
fd.save()
return fd
STATES_WITH_RESULTS = (states.SUCCESS, states.REVERTING, states.FAILURE)
@ -54,16 +36,17 @@ class Storage(object):
injector_name = '_TaskFlow_INJECTOR'
def __init__(self, flow_detail=None):
def __init__(self, flow_detail, backend=None):
self._result_mappings = {}
self._reverse_mapping = {}
self._backend = backend
self._flowdetail = flow_detail
if flow_detail is None:
# TODO(imelnikov): this is useful mainly for tests;
# maybe we should make flow_detail required parameter?
self._flowdetail = temporary_flow_detail()
else:
self._flowdetail = flow_detail
def _with_connection(self, functor, *args, **kwargs):
if self._backend is None:
return
with contextlib.closing(self._backend.get_connection()) as conn:
functor(conn, *args, **kwargs)
def add_task(self, uuid, task_name):
"""Add the task to storage
@ -72,12 +55,18 @@ class Storage(object):
Task state is set to PENDING.
"""
# TODO(imelnikov): check that task with same uuid or
# task name does not exist
td = taskdetail.TaskDetail(name=task_name, uuid=uuid)
# task name does not exist
td = logbook.TaskDetail(name=task_name, uuid=uuid)
td.state = states.PENDING
self._flowdetail.add(td)
self._flowdetail.save()
td.save()
self._with_connection(self._save_flow_detail)
self._with_connection(self._save_task_detail, task_detail=td)
def _save_flow_detail(self, conn):
# NOTE(harlowja): we need to update our contained flow detail if
# the result of the update actually added more (aka another process
# added an item to the flow detail).
self._flowdetail.update(conn.update_flow_details(self._flowdetail))
def get_uuid_by_name(self, task_name):
"""Get uuid of task with given name"""
@ -93,11 +82,17 @@ class Storage(object):
raise exceptions.NotFound("Unknown task: %r" % uuid)
return td
def _save_task_detail(self, conn, task_detail):
# NOTE(harlowja): we need to update our contained task detail if
# the result of the update actually added more (aka another process
# is also modifying the task detail).
task_detail.update(conn.update_task_details(task_detail))
def set_task_state(self, uuid, state):
"""Set task state"""
td = self._taskdetail_by_uuid(uuid)
td.state = state
td.save()
self._with_connection(self._save_task_detail, task_detail=td)
def get_task_state(self, uuid):
"""Get state of task with given uuid"""
@ -108,7 +103,7 @@ class Storage(object):
td = self._taskdetail_by_uuid(uuid)
td.state = state
td.results = data
td.save()
self._with_connection(self._save_task_detail, task_detail=td)
def get(self, uuid):
"""Get result for task with id 'uuid' to storage"""
@ -122,7 +117,7 @@ class Storage(object):
td = self._taskdetail_by_uuid(uuid)
td.results = None
td.state = state
td.save()
self._with_connection(self._save_task_detail, task_detail=td)
def inject(self, pairs):
"""Add values into storage
@ -202,7 +197,7 @@ class Storage(object):
def set_flow_state(self, state):
"""Set flowdetails state and save it"""
self._flowdetail.state = state
self._flowdetail.save()
self._with_connection(self._save_flow_detail)
def get_flow_state(self):
"""Set state from flowdetails"""

@ -18,38 +18,12 @@
import unittest2
from oslo.config import cfg
CONF = cfg.CONF
class TestCase(unittest2.TestCase):
"""Test case base class for all unit tests."""
def setUp(self):
"""Run before each test method to initialize test environment."""
super(TestCase, self).setUp()
self.overriden = []
self.addCleanup(self._clear_attrs)
def tearDown(self):
super(TestCase, self).tearDown()
self._reset_flags()
def _reset_flags(self):
for k, group in self.overriden:
CONF.clear_override(k, group=group)
def _clear_attrs(self):
# Delete attributes that don't start with _ so they don't pin
# memory around unnecessarily for the duration of the test
# suite
for key in [k for k in self.__dict__.keys() if k[0] != '_']:
del self.__dict__[key]
def flags(self, **kw):
"""Override flag variables for a test."""
group = kw.pop('group', None)
for k, v in kw.iteritems():
CONF.set_override(k, v, group)
self.overriden.append((k, group))

@ -16,34 +16,33 @@
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
from taskflow import exceptions as exc
from taskflow.openstack.common import uuidutils
from taskflow.persistence import flowdetail
from taskflow.persistence import logbook
from taskflow.persistence import taskdetail
class PersistenceTestMixin(object):
def _get_backend():
def _get_connection(self):
raise NotImplementedError()
def test_logbook_simple_save(self):
def test_logbook_save_retrieve(self):
lb_id = uuidutils.generate_uuid()
lb_meta = {'1': 2}
lb_name = 'lb-%s' % (lb_id)
lb = logbook.LogBook(name=lb_name, uuid=lb_id,
backend=self._get_backend())
lb = logbook.LogBook(name=lb_name, uuid=lb_id)
lb.meta = lb_meta
# Should not already exist
self.assertRaises(exc.NotFound, logbook.load, lb_id,
backend=self._get_backend())
with contextlib.closing(self._get_connection()) as conn:
self.assertRaises(exc.NotFound, conn.get_logbook, lb_id)
conn.save_logbook(lb)
lb.save()
del lb
lb = None
lb = logbook.load(lb_id, backend=self._get_backend())
# Make sure we can reload it (and all of its attributes are what
# we expect them to be).
with contextlib.closing(self._get_connection()) as conn:
lb = conn.get_logbook(lb_id)
self.assertEquals(lb_name, lb.name)
self.assertEquals(0, len(lb))
self.assertEquals(lb_meta, lb.meta)
@ -53,109 +52,104 @@ class PersistenceTestMixin(object):
def test_flow_detail_save(self):
lb_id = uuidutils.generate_uuid()
lb_name = 'lb-%s' % (lb_id)
lb = logbook.LogBook(name=lb_name, uuid=lb_id,
backend=self._get_backend())
fd = flowdetail.FlowDetail('test', uuid=uuidutils.generate_uuid())
lb = logbook.LogBook(name=lb_name, uuid=lb_id)
fd = logbook.FlowDetail('test', uuid=uuidutils.generate_uuid())
lb.add(fd)
# Ensure we can't save it since its owning logbook hasn't been
# saved.
self.assertRaises(exc.NotFound, fd.save)
# saved (flow details can not exist on their own without a connection
# to a logbook).
with contextlib.closing(self._get_connection()) as conn:
self.assertRaises(exc.NotFound, conn.get_logbook, lb_id)
self.assertRaises(exc.NotFound, conn.update_flow_details, fd)
# Ok now we should be able to save it
lb.save()
fd.save()
# Ok now we should be able to save both.
with contextlib.closing(self._get_connection()) as conn:
conn.save_logbook(lb)
conn.update_flow_details(fd)
def test_task_detail_save(self):
lb_id = uuidutils.generate_uuid()
lb_name = 'lb-%s' % (lb_id)
lb = logbook.LogBook(name=lb_name, uuid=lb_id,
backend=self._get_backend())
fd = flowdetail.FlowDetail('test', uuid=uuidutils.generate_uuid())
lb = logbook.LogBook(name=lb_name, uuid=lb_id)
fd = logbook.FlowDetail('test', uuid=uuidutils.generate_uuid())
lb.add(fd)
td = taskdetail.TaskDetail("detail-1", uuid=uuidutils.generate_uuid())
td = logbook.TaskDetail("detail-1", uuid=uuidutils.generate_uuid())
fd.add(td)
# Ensure we can't save it since its owning logbook hasn't been
# saved.
self.assertRaises(exc.NotFound, fd.save)
self.assertRaises(exc.NotFound, td.save)
# saved (flow details/task details can not exist on their own without
# their parent existing).
with contextlib.closing(self._get_connection()) as conn:
self.assertRaises(exc.NotFound, conn.update_flow_details, fd)
self.assertRaises(exc.NotFound, conn.update_task_details, td)
# Ok now we should be able to save it
lb.save()
fd.save()
td.save()
# Ok now we should be able to save them.
with contextlib.closing(self._get_connection()) as conn:
conn.save_logbook(lb)
conn.update_flow_details(fd)
conn.update_task_details(td)
def test_logbook_merge_flow_detail(self):
lb_id = uuidutils.generate_uuid()
lb_name = 'lb-%s' % (lb_id)
lb = logbook.LogBook(name=lb_name, uuid=lb_id,
backend=self._get_backend())
fd = flowdetail.FlowDetail('test', uuid=uuidutils.generate_uuid())
lb = logbook.LogBook(name=lb_name, uuid=lb_id)
fd = logbook.FlowDetail('test', uuid=uuidutils.generate_uuid())
lb.add(fd)
lb.save()
lb2 = logbook.LogBook(name=lb_name, uuid=lb_id,
backend=self._get_backend())
fd = flowdetail.FlowDetail('test2', uuid=uuidutils.generate_uuid())
lb2.add(fd)
lb2.save()
lb3 = logbook.load(lb_id, backend=self._get_backend())
self.assertEquals(2, len(lb3))
with contextlib.closing(self._get_connection()) as conn:
conn.save_logbook(lb)
lb2 = logbook.LogBook(name=lb_name, uuid=lb_id)
fd2 = logbook.FlowDetail('test2', uuid=uuidutils.generate_uuid())
lb2.add(fd2)
with contextlib.closing(self._get_connection()) as conn:
conn.save_logbook(lb2)
with contextlib.closing(self._get_connection()) as conn:
lb3 = conn.get_logbook(lb_id)
self.assertEquals(2, len(lb3))
def test_logbook_add_flow_detail(self):
lb_id = uuidutils.generate_uuid()
lb_name = 'lb-%s' % (lb_id)
lb = logbook.LogBook(name=lb_name, uuid=lb_id,
backend=self._get_backend())
fd = flowdetail.FlowDetail('test', uuid=uuidutils.generate_uuid())
lb = logbook.LogBook(name=lb_name, uuid=lb_id)
fd = logbook.FlowDetail('test', uuid=uuidutils.generate_uuid())
lb.add(fd)
lb.save()
lb2 = logbook.load(lb_id, backend=self._get_backend())
self.assertEquals(1, len(lb2))
self.assertEquals(1, len(lb))
self.assertEquals(fd.name, lb2.find(fd.uuid).name)
with contextlib.closing(self._get_connection()) as conn:
conn.save_logbook(lb)
with contextlib.closing(self._get_connection()) as conn:
lb2 = conn.get_logbook(lb_id)
self.assertEquals(1, len(lb2))
self.assertEquals(1, len(lb))
self.assertEquals(fd.name, lb2.find(fd.uuid).name)
def test_logbook_add_task_detail(self):
lb_id = uuidutils.generate_uuid()
lb_name = 'lb-%s' % (lb_id)
lb = logbook.LogBook(name=lb_name, uuid=lb_id,
backend=self._get_backend())
fd = flowdetail.FlowDetail('test', uuid=uuidutils.generate_uuid())
td = taskdetail.TaskDetail("detail-1", uuid=uuidutils.generate_uuid())
lb = logbook.LogBook(name=lb_name, uuid=lb_id)
fd = logbook.FlowDetail('test', uuid=uuidutils.generate_uuid())
td = logbook.TaskDetail("detail-1", uuid=uuidutils.generate_uuid())
fd.add(td)
lb.add(fd)
lb.save()
lb2 = logbook.load(lb_id, backend=self._get_backend())
self.assertEquals(1, len(lb2))
tasks = 0
for fd in lb:
tasks += len(fd)
self.assertEquals(1, tasks)
with contextlib.closing(self._get_connection()) as conn:
conn.save_logbook(lb)
with contextlib.closing(self._get_connection()) as conn:
lb2 = conn.get_logbook(lb_id)
self.assertEquals(1, len(lb2))
tasks = 0
for fd in lb:
tasks += len(fd)
self.assertEquals(1, tasks)
def test_logbook_delete(self):
lb_id = uuidutils.generate_uuid()
lb_name = 'lb-%s' % (lb_id)
lb = logbook.LogBook(name=lb_name, uuid=lb_id,
backend=self._get_backend())
# Ensure we can't delete it since it hasn't been saved
self.assertRaises(exc.NotFound, lb.delete)
lb.save()
lb2 = logbook.load(lb_id, backend=self._get_backend())
self.assertIsNotNone(lb2)
lb.delete()
self.assertRaises(exc.NotFound, lb.delete)
lb = logbook.LogBook(name=lb_name, uuid=lb_id)
with contextlib.closing(self._get_connection()) as conn:
self.assertRaises(exc.NotFound, conn.destroy_logbook, lb_id)
with contextlib.closing(self._get_connection()) as conn:
conn.save_logbook(lb)
with contextlib.closing(self._get_connection()) as conn:
lb2 = conn.get_logbook(lb_id)
self.assertIsNotNone(lb2)
with contextlib.closing(self._get_connection()) as conn:
conn.destroy_logbook(lb_id)
self.assertRaises(exc.NotFound, conn.destroy_logbook, lb_id)

@ -16,15 +16,16 @@
# License for the specific language governing permissions and limitations
# under the License.
from taskflow.persistence.backends import api as b_api
from taskflow.persistence.backends import impl_memory
from taskflow import test
from taskflow.tests.unit.persistence import base
class MemoryPersistenceTest(test.TestCase, base.PersistenceTestMixin):
def _get_backend(self):
return 'memory'
def _get_connection(self):
return impl_memory.MemoryBackend({}).get_connection()
def tearDown(self):
b_api.fetch(self._get_backend()).clear_all()
conn = self._get_connection()
conn.clear_all()
super(MemoryPersistenceTest, self).tearDown()

@ -19,31 +19,29 @@
import os
import tempfile
from taskflow.openstack.common.db.sqlalchemy import session
from taskflow.persistence.backends import api as b_api
from taskflow.persistence.backends.sqlalchemy import migration
from taskflow.persistence.backends import impl_sqlalchemy
from taskflow import test
from taskflow.tests.unit.persistence import base
class SqlPersistenceTest(test.TestCase, base.PersistenceTestMixin):
"""Inherits from the base test and sets up a sqlite temporary db."""
def _get_backend(self):
return 'sqlalchemy'
def setupDatabase(self):
_handle, db_location = tempfile.mkstemp()
db_uri = "sqlite:///%s" % (db_location)
session.set_defaults(db_uri, db_location)
migration.db_sync()
return db_location
def _get_connection(self):
conf = {
'connection': self.db_uri,
}
conn = impl_sqlalchemy.SQLAlchemyBackend(conf).get_connection()
return conn
def setUp(self):
super(SqlPersistenceTest, self).setUp()
self.db_location = self.setupDatabase()
self.db_location = tempfile.mktemp(suffix='.db')
self.db_uri = "sqlite:///%s" % (self.db_location)
# Ensure upgraded to the right schema
conn = self._get_connection()
conn.upgrade()
def tearDown(self):
b_api.fetch(self._get_backend()).clear_all()
super(SqlPersistenceTest, self).tearDown()
if self.db_location and os.path.isfile(self.db_location):
os.unlink(self.db_location)

@ -16,21 +16,22 @@
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
from multiprocessing import pool
import time
from taskflow.patterns import linear_flow as lf
from taskflow.patterns import unordered_flow as uf
from taskflow.engines.action_engine import engine as eng
from taskflow import exceptions
from taskflow.persistence import taskdetail
from taskflow.persistence.backends import impl_memory
from taskflow.persistence import logbook
from taskflow.persistence import utils as p_utils
from taskflow import states
from taskflow import storage
from taskflow import task
from taskflow import test
from taskflow.engines.action_engine import engine as eng
class TestTask(task.Task):
@ -103,8 +104,17 @@ class EngineTestBase(object):
def setUp(self):
super(EngineTestBase, self).setUp()
self.values = []
self.backend = impl_memory.MemoryBackend(conf={})
self.book = p_utils.temporary_log_book(self.backend)
def _make_engine(self, _flow, _flow_detail=None):
def tearDown(self):
super(EngineTestBase, self).tearDown()
with contextlib.closing(self.backend) as be:
with contextlib.closing(be.get_connection()) as conn:
conn.clear_all()
self.book = None
def _make_engine(self, flow, flow_detail=None):
raise NotImplementedError()
@ -114,7 +124,6 @@ class EngineTaskTest(EngineTestBase):
flow = lf.Flow('test-1')
flow.add(TestTask(self.values, name='task1'))
engine = self._make_engine(flow)
engine.compile()
engine.run()
self.assertEquals(self.values, ['task1'])
@ -405,13 +414,15 @@ class EngineParallelFlowTest(EngineTestBase):
)
# Create a FlowDetail as if we had already run task1
fd = storage.temporary_flow_detail()
td = taskdetail.TaskDetail(name='task1', uuid='42')
_lb, fd = p_utils.temporary_flow_detail(self.backend)
td = logbook.TaskDetail(name='task1', uuid='42')
td.state = states.SUCCESS
td.results = 17
fd.add(td)
fd.save()
td.save()
with contextlib.closing(self.backend.get_connection()) as conn:
fd.update(conn.update_flow_details(fd))
td.update(conn.update_task_details(td))
engine = self._make_engine(flow, fd)
engine.run()
@ -425,22 +436,30 @@ class SingleThreadedEngineTest(EngineTaskTest,
EngineParallelFlowTest,
test.TestCase):
def _make_engine(self, flow, flow_detail=None):
return eng.SingleThreadedActionEngine(flow, flow_detail=flow_detail)
if flow_detail is None:
flow_detail = p_utils.create_flow_detail(flow, self.book,
self.backend)
return eng.SingleThreadedActionEngine(flow, backend=self.backend,
flow_detail=flow_detail)
class MultiThreadedEngineTest(EngineTaskTest,
EngineLinearFlowTest,
EngineParallelFlowTest,
test.TestCase):
def _make_engine(self, flow, flow_detail=None):
return eng.MultiThreadedActionEngine(flow, flow_detail=flow_detail)
def _make_engine(self, flow, flow_detail=None, thread_pool=None):
if flow_detail is None:
flow_detail = p_utils.create_flow_detail(flow, self.book,
self.backend)
return eng.MultiThreadedActionEngine(flow, backend=self.backend,
flow_detail=flow_detail,
thread_pool=thread_pool)
def test_using_common_pool(self):
flow = TestTask(self.values, name='task1')
thread_pool = pool.ThreadPool()
e1 = eng.MultiThreadedActionEngine(flow, thread_pool=thread_pool)
e2 = eng.MultiThreadedActionEngine(flow, thread_pool=thread_pool)
e1 = self._make_engine(flow, thread_pool=thread_pool)
e2 = self._make_engine(flow, thread_pool=thread_pool)
self.assertIs(e1.thread_pool, e2.thread_pool)
def test_parallel_revert_specific(self):

@ -23,13 +23,11 @@ from taskflow import test
from taskflow.engines.action_engine import engine as eng
def _make_engine(flow):
e = eng.SingleThreadedActionEngine(flow)
e.compile()
return e
class WrapableObjectsTest(test.TestCase):
def _make_engine(self, flow):
e = eng.SingleThreadedActionEngine(flow)
e.compile()
return e
def test_simple_function(self):
values = []
@ -52,7 +50,7 @@ class WrapableObjectsTest(test.TestCase):
run_fail
)
with self.assertRaisesRegexp(RuntimeError, '^Woot'):
e = _make_engine(flow)
e = self._make_engine(flow)
e.run()
self.assertEquals(values, ['one', 'fail', 'revert one'])
@ -80,7 +78,7 @@ class WrapableObjectsTest(test.TestCase):
tasks.run_fail
)
with self.assertRaisesRegexp(RuntimeError, '^Woot'):
e = _make_engine(flow)
e = self._make_engine(flow)
e.run()
self.assertEquals(tasks.values, ['one', 'fail'])
@ -106,7 +104,7 @@ class WrapableObjectsTest(test.TestCase):
MyTasks.run_fail
)
with self.assertRaisesRegexp(RuntimeError, '^Woot'):
e = _make_engine(flow)
e = self._make_engine(flow)
e.run()
self.assertEquals(values, ['one', 'fail'])
@ -133,6 +131,6 @@ class WrapableObjectsTest(test.TestCase):
MyTasks.run_fail
)
with self.assertRaisesRegexp(RuntimeError, '^Woot'):
e = _make_engine(flow)
e = self._make_engine(flow)
e.run()
self.assertEquals(MyTasks.values, ['one', 'fail'])

@ -19,24 +19,22 @@
import collections
from taskflow import decorators
from taskflow.engines.action_engine import engine as eng
from taskflow import exceptions as exc
from taskflow.patterns import linear_flow as lw
from taskflow import states
from taskflow import test
from taskflow.patterns import linear_flow as lw
from taskflow.tests import utils
from taskflow.engines.action_engine import engine as eng
def _make_engine(flow):
e = eng.SingleThreadedActionEngine(flow)
e.compile()
e.storage.inject([('context', {})])
return e
class LinearFlowTest(test.TestCase):
def _make_engine(self, flow):
e = eng.SingleThreadedActionEngine(flow)
e.compile()
e.storage.inject([('context', {})])
return e
def make_reverting_task(self, token, blowup=False):
def do_revert(context, *args, **kwargs):
@ -67,7 +65,7 @@ class LinearFlowTest(test.TestCase):
wf = lw.Flow("the-test-action")
wf.add(do_apply1)
e = _make_engine(wf)
e = self._make_engine(wf)
e.run()
data = e.storage.fetch_all()
self.assertIn('a', data)
@ -92,7 +90,7 @@ class LinearFlowTest(test.TestCase):
wf.add(do_apply1)
wf.add(do_apply2)
e = _make_engine(wf)
e = self._make_engine(wf)
e.run()
self.assertEquals(2, len(e.storage.fetch('context')))
@ -111,7 +109,7 @@ class LinearFlowTest(test.TestCase):
wf.add(self.make_reverting_task(2, False))
wf.add(self.make_reverting_task(1, True))
e = _make_engine(wf)
e = self._make_engine(wf)
e.notifier.register('*', listener)
e.task_notifier.register('*', task_listener)
self.assertRaises(Exception, e.run)
@ -148,7 +146,7 @@ class LinearFlowTest(test.TestCase):
wf = lw.Flow("the-test-action")
wf.add(self.make_reverting_task(1))
e = _make_engine(wf)
e = self._make_engine(wf)
e.notifier.register('*', listener)
e.run()
@ -159,7 +157,7 @@ class LinearFlowTest(test.TestCase):
for i in range(0, 10):
wf.add(self.make_reverting_task(i))
e = _make_engine(wf)
e = self._make_engine(wf)
capture_func, captured = self._capture_states()
e.task_notifier.register('*', capture_func)
e.run()
@ -189,7 +187,7 @@ class LinearFlowTest(test.TestCase):
wf.add(self.make_reverting_task(2, True))
capture_func, captured = self._capture_states()
e = _make_engine(wf)
e = self._make_engine(wf)
e.task_notifier.register('*', capture_func)
self.assertRaises(Exception, e.run)
@ -225,7 +223,7 @@ class LinearFlowTest(test.TestCase):
wf = lw.Flow("the-test-action")
wf.add(task_a)
wf.add(task_b)
e = _make_engine(wf)
e = self._make_engine(wf)
self.assertRaises(exc.NotFound, e.run)
def test_flow_bad_order(self):
@ -239,7 +237,7 @@ class LinearFlowTest(test.TestCase):
no_req_task = utils.ProvidesRequiresTask('test-2', requires=['c'],
provides=[])
wf.add(no_req_task)
e = _make_engine(wf)
e = self._make_engine(wf)
self.assertRaises(exc.NotFound, e.run)
def test_flow_set_order(self):
@ -250,7 +248,7 @@ class LinearFlowTest(test.TestCase):
wf.add(utils.ProvidesRequiresTask('test-2',
requires=set(['a', 'b']),
provides=set([])))
e = _make_engine(wf)
e = self._make_engine(wf)
e.run()
run_context = e.storage.fetch('context')
ordering = run_context[utils.ORDER_KEY]
@ -283,7 +281,7 @@ class LinearFlowTest(test.TestCase):
requires=['d'],
provides=[]))
e = _make_engine(wf)
e = self._make_engine(wf)
e.run()
run_context = e.storage.fetch('context')
ordering = run_context[utils.ORDER_KEY]

@ -16,21 +16,38 @@
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
from taskflow import exceptions
from taskflow.persistence.backends import impl_memory
from taskflow.persistence import utils as p_utils
from taskflow import states
from taskflow import storage
from taskflow import test
class StorageTest(test.TestCase):
def setUp(self):
super(StorageTest, self).setUp()
self.backend = impl_memory.MemoryBackend(conf={})
def _get_storage(self):
lb, flow_detail = p_utils.temporary_flow_detail(self.backend)
return storage.Storage(backend=self.backend, flow_detail=flow_detail)
def tearDown(self):
super(StorageTest, self).tearDown()
with contextlib.closing(self.backend) as be:
with contextlib.closing(be.get_connection()) as conn:
conn.clear_all()
def test_add_task(self):
s = storage.Storage()
s = self._get_storage()
s.add_task('42', 'my task')
self.assertEquals(s.get_task_state('42'), states.PENDING)
def test_save_and_get(self):
s = storage.Storage()
s = self._get_storage()
s.add_task('42', 'my task')
s.save('42', 5)
self.assertEquals(s.get('42'), 5)
@ -38,20 +55,20 @@ class StorageTest(test.TestCase):
self.assertEquals(s.get_task_state('42'), states.SUCCESS)
def test_save_and_get_other_state(self):
s = storage.Storage()
s = self._get_storage()
s.add_task('42', 'my task')
s.save('42', 5, states.FAILURE)
self.assertEquals(s.get('42'), 5)
self.assertEquals(s.get_task_state('42'), states.FAILURE)
def test_get_non_existing_var(self):
s = storage.Storage()
s = self._get_storage()
s.add_task('42', 'my task')
with self.assertRaises(exceptions.NotFound):
s.get('42')
def test_reset(self):
s = storage.Storage()
s = self._get_storage()
s.add_task('42', 'my task')
s.save('42', 5)
s.reset('42')
@ -60,12 +77,12 @@ class StorageTest(test.TestCase):
s.get('42')
def test_reset_unknown_task(self):
s = storage.Storage()
s = self._get_storage()
s.add_task('42', 'my task')
self.assertEquals(s.reset('42'), None)
def test_fetch_by_name(self):
s = storage.Storage()
s = self._get_storage()
s.add_task('42', 'my task')
name = 'my result'
s.set_result_mapping('42', {name: None})
@ -74,13 +91,13 @@ class StorageTest(test.TestCase):
self.assertEquals(s.fetch_all(), {name: 5})
def test_fetch_unknown_name(self):
s = storage.Storage()
s = self._get_storage()
with self.assertRaisesRegexp(exceptions.NotFound,
"^Name 'xxx' is not mapped"):
s.fetch('xxx')
def test_fetch_result_not_ready(self):
s = storage.Storage()
s = self._get_storage()
s.add_task('42', 'my task')
name = 'my result'
s.set_result_mapping('42', {name: None})
@ -89,7 +106,7 @@ class StorageTest(test.TestCase):
self.assertEquals(s.fetch_all(), {})
def test_save_multiple_results(self):
s = storage.Storage()
s = self._get_storage()
s.add_task('42', 'my task')
s.set_result_mapping('42', {'foo': 0, 'bar': 1, 'whole': None})
s.save('42', ('spam', 'eggs'))
@ -100,14 +117,14 @@ class StorageTest(test.TestCase):
})
def test_mapping_none(self):
s = storage.Storage()
s = self._get_storage()
s.add_task('42', 'my task')
s.set_result_mapping('42', None)
s.save('42', 5)
self.assertEquals(s.fetch_all(), {})
def test_inject(self):
s = storage.Storage()
s = self._get_storage()
s.inject({'foo': 'bar', 'spam': 'eggs'})
self.assertEquals(s.fetch('spam'), 'eggs')
self.assertEquals(s.fetch_all(), {
@ -116,48 +133,49 @@ class StorageTest(test.TestCase):
})
def test_fetch_mapped_args(self):
s = storage.Storage()
s = self._get_storage()
s.inject({'foo': 'bar', 'spam': 'eggs'})
self.assertEquals(s.fetch_mapped_args({'viking': 'spam'}),
{'viking': 'eggs'})
def test_fetch_not_found_args(self):
s = storage.Storage()
s = self._get_storage()
s.inject({'foo': 'bar', 'spam': 'eggs'})
with self.assertRaises(exceptions.NotFound):
s.fetch_mapped_args({'viking': 'helmet'})
def test_set_and_get_task_state(self):
s = storage.Storage()
s = self._get_storage()
state = states.PENDING
s.add_task('42', 'my task')
s.set_task_state('42', state)
self.assertEquals(s.get_task_state('42'), state)
def test_get_state_of_unknown_task(self):
s = storage.Storage()
s = self._get_storage()
with self.assertRaisesRegexp(exceptions.NotFound, '^Unknown'):
s.get_task_state('42')
def test_task_by_name(self):
s = storage.Storage()
s = self._get_storage()
s.add_task('42', 'my task')
self.assertEquals(s.get_uuid_by_name('my task'), '42')
def test_unknown_task_by_name(self):
s = storage.Storage()
s = self._get_storage()
with self.assertRaisesRegexp(exceptions.NotFound,
'^Unknown task name:'):
s.get_uuid_by_name('42')
def test_get_flow_state(self):
fd = storage.temporary_flow_detail()
lb, fd = p_utils.temporary_flow_detail(backend=self.backend)
fd.state = states.INTERRUPTED
fd.save()
s = storage.Storage(fd)
with contextlib.closing(self.backend.get_connection()) as conn:
fd.update(conn.update_flow_details(fd))
s = storage.Storage(flow_detail=fd, backend=self.backend)
self.assertEquals(s.get_flow_state(), states.INTERRUPTED)
def test_set_and_get_flow_state(self):
s = storage.Storage()
s = self._get_storage()
s.set_flow_state(states.SUCCESS)
self.assertEquals(s.get_flow_state(), states.SUCCESS)

@ -38,6 +38,49 @@ def get_task_version(task):
return task_version
class ExponentialBackoff(object):
def __init__(self, attempts, exponent=2):
self.attempts = int(attempts)
self.exponent = exponent
def __iter__(self):
if self.attempts <= 0:
raise StopIteration()
for i in xrange(0, self.attempts):
yield self.exponent ** i
def __str__(self):
return "ExponentialBackoff: %s" % ([str(v) for v in self])
def as_bool(val):
if isinstance(val, bool):
return val
if isinstance(val, basestring):
if val.lower() in ('f', 'false', '0', 'n', 'no'):
return False
if val.lower() in ('t', 'true', '1', 'y', 'yes'):
return True
return bool(val)
def as_int(obj, quiet=False):
# Try "2" -> 2
try:
return int(obj)
except (ValueError, TypeError):
pass
# Try "2.5" -> 2
try:
return int(float(obj))
except (ValueError, TypeError):
pass
# Eck, not sure what this is then.
if not quiet:
raise TypeError("Can not translate %s to an integer." % (obj))
return obj
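
A quick illustration of how these helpers behave (values are arbitrary):

    backoff = ExponentialBackoff(attempts=3)
    print(list(backoff))             # -> [1, 2, 4]
    print(as_bool('no'))             # -> False
    print(as_int('2.5'))             # -> 2
    print(as_int(None, quiet=True))  # -> None (returned as-is when quiet)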
def is_version_compatible(version_1, version_2):
"""Checks for major version compatibility of two *string" versions."""
try:

@ -15,3 +15,5 @@ alembic>=0.4.1
networkx>=1.8.1
threading2
Babel>=0.9.6
# Used for backend storage engine loading
stevedore>=0.10
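
Not part of this hunk, but for context a hedged sketch of how a backend could be resolved by name through stevedore, assuming the entry-point namespace registered for these drivers is 'taskflow.persistence' and that each driver accepts a single configuration dict (both assumptions, not taken from this change's text):

    import contextlib

    from stevedore import driver

    conf = {'connection': 'sqlite://'}  # illustrative configuration only
    mgr = driver.DriverManager('taskflow.persistence', 'sqlite',
                               invoke_on_load=True, invoke_args=(conf,))
    backend = mgr.driver
    with contextlib.closing(backend.get_connection()) as conn:
        conn.upgrade()  # ensure the schema exists before first use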