diff --git a/taskflow/exceptions.py b/taskflow/exceptions.py index d50e30d9..cc7362c7 100644 --- a/taskflow/exceptions.py +++ b/taskflow/exceptions.py @@ -27,6 +27,14 @@ class Duplicate(TaskFlowException): pass +class StorageError(TaskFlowException): + """Raised when logbook can not be read/saved/deleted.""" + + def __init__(self, message, cause=None): + super(StorageError, self).__init__(message) + self.cause = cause + + class NotFound(TaskFlowException): """Raised when some entry in some object doesn't exist.""" pass diff --git a/taskflow/persistence/backends/api.py b/taskflow/persistence/backends/api.py index f7879fc8..8845e022 100644 --- a/taskflow/persistence/backends/api.py +++ b/taskflow/persistence/backends/api.py @@ -2,7 +2,7 @@ # vim: tabstop=4 shiftwidth=4 softtabstop=4 -# Copyright (C) 2013 Rackspace Hosting All Rights Reserved. +# Copyright (C) 2013 Rackspace Hosting Inc. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -16,479 +16,47 @@ # License for the specific language governing permissions and limitations # under the License. -"""Backend persistence API""" +import threading -from oslo.config import cfg - -from taskflow.common import config -from taskflow import utils - -db_opts = [ - cfg.StrOpt('db_backend', - default='sqlalchemy', - help='The backend to use for db')] -mem_opts = [ - cfg.StrOpt('mem_backend', - default='memory', - help='The backend to use for in-memory')] +from taskflow import exceptions as exc +from taskflow.openstack.common import importutils -CONF = cfg.CONF -CONF.register_opts(db_opts) -CONF.register_opts(mem_opts) - -IMPL = utils.LazyPluggable('mem_backend', - memory='taskflow.persistence.backends.memory.api', - sqlalchemy='taskflow.persistence.backends' - '.sqlalchemy.api') +_BACKEND_MAPPING = { + 'memory': 'taskflow.persistence.backends.memory.api', + 'sqlalchemy': 'taskflow.persistence.backends.sqlalchemy.api', + # TODO(harlowja): we likely need to allow more customization here so that + # its easier for a user of this library to alter the impl to there own + # choicing, aka, cinder has its own DB, or heat may want to write this + # information into swift, we need a way to accomodate that. +} +_BACKEND_MAPPING_LOCK = threading.RLock() +_BACKENDS = {} +_BACKEND_LOCK = threading.RLock() -def configure(pivot='mem_backend'): - IMPL.set_pivot(pivot) +def register(backend, module): + """Register a new (or override an old) backend type. - if pivot == 'db_backend': - global SQL_CONNECTION - global SQL_IDLE_TIMEOUT - config.register_db_opts() - SQL_CONNECTION = cfg.CONF.sql_connection - SQL_IDLE_TIMEOUT = cfg.CONF.sql_idle_timeout - - -""" -LOGBOOK -""" - - -def logbook_create(name, lb_id=None): - """Create a logbook - - This method creates a logbook representation in the currently selected - backend to be persisted. - - PARAMETERS: - name: str - - Specifies the name of the logbook represented. - - lb_id: str - - Specifies the uuid of the logbook represented. If this element is - left as None, the table will generate a uuid for the logbook. This - field must be unique for all logbooks stored in the backend. In the - event that a logbook with this uuid already exists, the backend - will raise an exception. + Instead of being restricted to the existing types that are pre-registered + in taskflow it is useful to allow others to either override those types + or add new ones (since all backend types can not be predicted ahead of + time). """ - return IMPL.logbook_create(name, lb_id) - - -def logbook_destroy(lb_id): - """Destroy a logbook - - This method will destroy a logbook representation with a uuid matching - lb_id from the currently selected backend. - - PARAMETERS: - lb_id: str - - Specifies the uuid of the logbook representation to be destroyed - from the currently selected backend. If a logbook with matching - uuid is not present, the backend will raise an exception. - """ - return IMPL.logbook_destroy(lb_id) - - -def logbook_save(lb): - """Save a logbook - - This method creates a logbook representation in the currently selected - backend if one does not already exists and updates it with attributes from - the generic LogBook passed in as a parameter. - - PARAMETERS: - lb: taskflow.generics.LogBook - - The generic type LogBook to be persisted in the backend. If a - representation does not yet exist, one will be created. The - representation will be updated to match the attributes of this - object. - """ - return IMPL.logbook_save(lb) - - -def logbook_delete(lb): - """Delete a logbook - - This method destroys a logbook representation in the curretly selected - backend. - - PARAMETERS: - lb: taskflow.generics.LogBook - - The generic type LogBook whose representation is to be destroyed - from the backend. If a representation does not exist, the backend - will raise an exception. - """ - return IMPL.logbook_delete(lb) - - -def logbook_get(lb_id): - """Get a logbook - - This method returns a generic type LogBook based on its representation in - the currently selected backend. - - PARAMETERS: - lb_id: str - - Specifies the uuid of the logbook representation to be used. If a - logbook with this uuid does not exist, the backend will raise an - exception. - - RETURNS: - a generic type LogBook that reflects what is stored by a logbook - representation in the backend. - """ - return IMPL.logbook_get(lb_id) - - -def logbook_add_flow_detail(lb_id, fd_id): - """Add a flowdetail - - This method adds a flowdetail in the backend to the list of flowdetails - contained by the specified logbook representation. - - PARAMETERS: - lb_id: str - - Specifies the uuid of the logbook representation to which the - flowdetail will be added. If a logbook with this uuid does not - exist, the backend will raise an exception. - - fd_id: str - - Specifies the uuid of the flowdetail representation to be added - to the specified logbook representation. If a flowdetail with this - uuid does not exist, the backend will raise an exception. - """ - return IMPL.logbook_add_flow_detail(lb_id, fd_id) - - -def logbook_remove_flowdetail(lb_id, fd_id): - """Remove a flowdetail - - This method removes a flowdetail from the list of flowdetails contained by - the specified logbook representation. - - PARAMETERS: - lb_id: str - - Specifies the uuid of the logbook representation from which the - flowdetail will be removed. If a logbook with this uuid does not - exist, the backend will raise an exception. - - fd_id: str - Specifies the uuid of the flowdetail representation to be removed - from the specified logbook representation. - """ - return IMPL.logbook_remove_flowdetail(lb_id, fd_id) - - -def logbook_get_ids_names(): - """Get the ids and names of all logbooks - - This method returns a dict of uuids and names of all logbook - representations stored in the backend. - - RETURNS: - a dict of uuids and names of all logbook representations - """ - return IMPL.logbook_get_ids_names() - - -""" -FLOWDETAIL -""" - - -def flowdetail_create(name, wf, fd_id=None): - """Create a flowdetail - - This method creates a flowdetail representation in the currently selected - backend to be persisted. - - PARAMETERS: - name: str - - Specifies the name of the flowdetail represented. - - wf: taskflow.generics.Flow - - The workflow object this flowdetail is to represent. - - fd_id: str - - Specifies the uuid of the flowdetail represented. If this element - is left as None, the table will generate a uuid for the flowdetail. - This field must be unique for all flowdetails stored in the - backend. In the event that a flowdetail with this uuid already - exists, the backend will raise an exception. - """ - return IMPL.flowdetail_create(name, wf, fd_id) - - -def flowdetail_destroy(fd_id): - """Destroy a flowdetail - - This method will destroy a flowdetail representation with a uuid matching - fd_id from the currently selected backend. - - PARAMETERS: - fd_id: str - - Specifices the uuid of the flowdetail representation to be - destroyed from the currently selected backend. If a flowdetail with - matching uuid is not present, the backend will raise an exception. - """ - return IMPL.flowdetail_destroy(fd_id) - - -def flowdetail_save(fd): - """Save a flowdetail - - This method creates a flowdetail representation in the currently selected - backend if one does not already exist and updates it with attributes from - the generic FlowDetail passed in as a parameter. - - PARAMETERS: - fd: taskflow.generics.FlowDetail - - The generic type FlowDetail to be persisted in the backend. If a - representation does not yet exist, one will be created. The - representation will be updated to match the attributes of this - object. - """ - return IMPL.flowdetail_save(fd) - - -def flowdetail_delete(fd): - """Delete a flowdetail - - This method destroys a flowdetail representation in the currently selected - backend. - - PARAMETERS: - fd: taskflow.generics.FlowDetail - - The generic type FlowDetail whose representation is to be destroyed - from the backend. If a representation does not exist, the backend - will raise an exception. - """ - return IMPL.flowdetail_delete(fd) - - -def flowdetail_get(fd_id): - """Get a flowdetail - - This method returns a generic type FlowDetail based on its representation - in the currently selected backend. - - PARAMETERS: - fd_id: str - - Specifies the uuid of the flowdetail representation to be used. If - a flowdetail with this uuid does not exist, the backend will raise - an exception. - - RETURNS: - a generic type FlowDetail that reflects what is stored by a flowdetail - representation in the backend. - """ - return IMPL.flowdetail_get(fd_id) - - -def flowdetail_add_task_detail(fd_id, td_id): - """Add a taskdetail - - This method adds a taskdetail in the backend to the list of taskdetails - contained by the specified flowdetail representation. - - PARAMETERS: - fd_id: str - - Specifies the uuid of the flowdetail representation to which the - taskdetail will be added. If a flowdetail with this uuid does not - exist, the backend will raise an exception. - - td_id: str - - Specifies the uuid of the taskdetail representation to be added - to the specified flowdetail representation. If a flowdetail with - this uuid does not exist, the backend will raise an exception. - """ - return IMPL.flowdetail_add_task_detail(fd_id, td_id) - - -def flowdetail_remove_taskdetail(fd_id, td_id): - """Remove a taskdetail - - This method removes a taskdetail from the list of taskdetails contained by - the specified flowdetail representation. - - PARAMETERS: - fd_id: str - - Specifies the uuid of the flowdetail representation from which the - taskdetail will be removed. If a flowdetail with this uuid does not - exist, the backend will raise an exception. - - td_id: str - Specifies the uuid of the taskdetail representation to be removed - from the specified flowdetail representation. - """ - return IMPL.flowdetail_remove_taskdetail(fd_id, td_id) - - -def flowdetail_get_ids_names(): - """Get the ids and names of all flowdetails - - This method returns a dict of the uuids and names of all flowdetail - representations stored in the backend. - - RETURNS: - a dict of uuids and names of all flowdetail representations - """ - return IMPL.flowdetail_get_ids_names() - - -""" -TASKDETAIL -""" - - -def taskdetail_create(name, tsk, td_id=None): - """Create a taskdetail - - This method creates a taskdetail representation in the current selected - backend to be persisted. - - PARAMETERS: - name: str - - Specifies the name of the taskdetail represented. - - tsk: taskflow.generics.Task - - The task object this taskdetail is to represent. - - td_id: str - - Specifies the uuid of the taskdetail represented. If this element - is left as None, the table will generate a uuid for the taskdetail. - This field must be unique for all taskdetails stored in the - backend. In the event that a taskdetail with this uuid already - exists, the backend will raise an exception. - """ - return IMPL.taskdetail_create(name, tsk, td_id) - - -def taskdetail_destroy(td_id): - """Destroy a taskdetail - - This method will destroy a taskdetail representation with a uuid matching - td_id from the currently selected backend. - - PARAMETERS: - td_id: str - - Specifies the uuid of the taskdetail representation to be - destroyed from the currently selected backend. If a taskdetail with - matching uuid is not present, the backend will raise an exception. - """ - return IMPL.taskdetail_destroy(td_id) - - -def taskdetail_save(td): - """Save a taskdetail - - This method creates a taskdetail representation in the currently selected - backend if one does not already exist and updates it with attributes from - the generic TaskDetail passed in as a parameter. - - PARAMETERS: - td: taskflow.generics.TaskDetail - - The generic type TaskDetail to be persisted in the backend. If a - representation does not yet exist, one will be created. The - representation will be updated to match the attributes of this - object. - """ - return IMPL.taskdetail_save(td) - - -def taskdetail_delete(td): - """Delete a taskdetail - - This method destroys a taskdetail representation in the currently selected - backend. - - PARAMETERS: - td: taskdetail.generics.TaskDetail - - The generic type TaskDetail whose representation is to be destroyed - from the backend. If a representation does not exist, the backend - will raise an exception. - """ - return IMPL.taskdetail_delete(td) - - -def taskdetail_get(td_id): - """Get a taskdetail - - This method returns a generic type TaskDetail based on its representation - in the currently selected backend. - - PARAMETERS: - td_id: str - - Specifies the uuid of the taskdetail representation to be used. If - a taskdetail with this uuid does not exist, the backend will raise - an exception. - - RETURNS: - a generic type TaskDetail that reflects what is stored by a taskdetail - representation in the backend. - """ - return IMPL.taskdetail_get(td_id) - - -def taskdetail_update(td_id, values): - """Update a taskdetail - - This method updates the attributes of a taskdetail representation in the - currently selected backend. - - PARAMETERS: - td_id: str - - Specifies the uuid of the taskdetail representation to be updated. - If a taskdetail with this uuid does not exist, the backend will - raise an execption. - - values: dict - - Specifies the values to be updated and the values to which they are - to be updated. - """ - return IMPL.taskdetail_update(td_id, values) - - -def taskdetail_get_ids_names(): - """Gets the ids and names of all taskdetails - - This method returns a dict of the uuids and names of all taskdetail - representations stored in the backend. - - RETURNS: - a dict of uuids and names of all taskdetail representations - """ - return IMPL.taskdetail_get_ids_names() + with _BACKEND_MAPPING_LOCK: + _BACKEND_MAPPING[backend] = str(module) + + +def fetch(backend): + """Fetch a backend impl. for a given backend type.""" + with _BACKEND_MAPPING_LOCK: + if backend not in _BACKEND_MAPPING: + raise exc.NotFound("Unknown backend %s requested" % (backend)) + mod = _BACKEND_MAPPING.get(backend, backend) + with _BACKEND_LOCK: + if mod in _BACKENDS: + return _BACKENDS[mod] + backend_mod = importutils.import_module(mod) + backend_impl = backend_mod.get_backend() + _BACKENDS[mod] = backend_impl + return backend_impl diff --git a/taskflow/persistence/backends/memory/api.py b/taskflow/persistence/backends/memory/api.py index fdaa02fe..fdf615f7 100644 --- a/taskflow/persistence/backends/memory/api.py +++ b/taskflow/persistence/backends/memory/api.py @@ -19,435 +19,148 @@ """Implementation of in-memory backend.""" +import copy import logging +import sys +import threading -from taskflow import exceptions as exception -from taskflow.persistence.backends.memory import memory -from taskflow.persistence import flowdetail -from taskflow.persistence import logbook -from taskflow.persistence import taskdetail -from taskflow.utils import LockingDict +from taskflow import exceptions as exc +from taskflow.openstack.common import timeutils +from taskflow import utils LOG = logging.getLogger(__name__) -logbooks = LockingDict() -flowdetails = LockingDict() -taskdetails = LockingDict() +# TODO(harlowja): we likely need to figure out a better place to put these +# rather than globals. +LOG_BOOKS = {} +FLOW_DETAILS = {} +TASK_DETAILS = {} + +# For now this will be a pretty big lock, since it is not expected that saves +# will be that frequent this seems ok for the time being. I imagine that this +# can be done better but it will require much more careful usage of a dict as +# a key/value map. Aka I wish python had a concurrent dict that was safe and +# known good to use. +SAVE_LOCK = threading.RLock() +READ_LOCK = threading.RLock() +READ_SAVE_ORDER = (READ_LOCK, SAVE_LOCK,) -""" -LOGBOOK -""" +def get_backend(): + """The backend is this module itself.""" + return sys.modules[__name__] -def logbook_create(name, lb_id=None): - """Creates a new LogBook model with matching lb_id""" - # Create the LogBook model - lb = memory.MemoryLogBook(name, lb_id) - - # Store it in the LockingDict for LogBooks - logbooks[lb_id] = lb +def _taskdetails_merge(td_e, td_new): + """Merges an existing taskdetails with a new one.""" + if td_e.state != td_new.state: + td_e.state = td_new.state + if td_e.results != td_new.results: + td_e.results = td_new.results + if td_e.exception != td_new.exception: + td_e.exception = td_new.exception + if td_e.stacktrace != td_new.stacktrace: + td_e.stacktrace = td_new.stacktrace + if td_e.meta != td_new.meta: + td_e.meta = td_new.meta + return td_e -def logbook_destroy(lb_id): - """Deletes the LogBook model with matching lb_id""" - # Try deleting the LogBook +def taskdetails_save(td): + with utils.MultiLock(READ_SAVE_ORDER): + try: + return _taskdetails_merge(TASK_DETAILS[td.uuid], td) + except KeyError: + raise exc.NotFound("No task details found with id: %s" % td.uuid) + + +def flowdetails_save(fd): try: - del logbooks[lb_id] - # Raise a NotFound error if the LogBook doesn't exist + with utils.MultiLock(READ_SAVE_ORDER): + e_fd = FLOW_DETAILS[fd.uuid] + if e_fd.meta != fd.meta: + e_fd.meta = fd.meta + if e_fd.state != fd.state: + e_fd.state = fd.state + for td in fd: + if td not in e_fd: + td = copy.deepcopy(td) + TASK_DETAILS[td.uuid] = td + e_fd.add(td) + else: + # Previously added but not saved into the taskdetails + # 'permanent' storage. + if td.uuid not in TASK_DETAILS: + TASK_DETAILS[td.uuid] = copy.deepcopy(td) + taskdetails_save(td) + return e_fd except KeyError: - raise exception.NotFound("No Logbook found with id " - "%s." % (lb_id,)) + raise exc.NotFound("No flow details found with id: %s" % fd.uuid) -def logbook_save(lb): - """Saves a generic LogBook object to the db""" - # Create a LogBook model if one doesn't exist - if not _logbook_exists(lb.uuid): - logbook_create(lb.name, lb.uuid) - - # Get a copy of the LogBook model in the LockingDict - ld_lb = logbook_get(lb.uuid) - - for fd in lb: - # Save each FlowDetail the LogBook to save has - fd.save() - - # Add the FlowDetail to the LogBook model if not already there - if fd not in ld_lb: - logbook_add_flow_detail(lb.uuid, fd.uuid) - - -def logbook_delete(lb): - """Deletes a LogBook from db based on a generic type""" - # Try to get the LogBook - try: - ld_lb = logbooks[lb.uuid] - # Raise a NotFound exception if the LogBook cannot be found - except KeyError: - raise exception.NotFound("No Logbook found with id " - "%s." % (lb.uuid,)) - - # Raise an error if the LogBook still has FlowDetails - if len(ld_lb): - raise exception.Error("Logbook <%s> still has " - "dependents." % (lb.uuid,)) - # Destroy the LogBook model if it is safe - else: - logbook_destroy(lb.uuid) +def clear_all(): + with utils.MultiLock(READ_SAVE_ORDER): + count = 0 + for lb_id in list(LOG_BOOKS.iterkeys()): + logbook_destroy(lb_id) + count += 1 + return count def logbook_get(lb_id): - """Gets a LogBook with matching lb_id, if it exists""" - # Try to get the LogBook try: - ld_lb = logbooks[lb_id] - # Raise a NotFound exception if the LogBook is not there + with READ_LOCK: + return LOG_BOOKS[lb_id] except KeyError: - raise exception.NotFound("No Logbook found with id " - "%s." % (lb_id,)) - - # Acquire a read lock on the LogBook - with ld_lb.acquire_lock(read=True): - # Create a generic type LogBook to return - retVal = logbook.LogBook(ld_lb.name, ld_lb.uuid) - - # Attach the appropriate generic FlowDetails to the generic LogBook - for fd in ld_lb: - retVal.add_flow_detail(flowdetail_get(fd.uuid)) - - return retVal + raise exc.NotFound("No logbook found with id: %s" % lb_id) -def logbook_add_flow_detail(lb_id, fd_id): - """Adds a FlowDetail with id fd_id to a LogBook with id lb_id""" - # Try to get the LogBook +def logbook_destroy(lb_id): try: - ld_lb = logbooks[lb_id] - # Raise a NotFound exception if the LogBook is not there + with utils.MultiLock(READ_SAVE_ORDER): + # Do the same cascading delete that the sql layer does. + lb = LOG_BOOKS.pop(lb_id) + for fd in lb: + FLOW_DETAILS.pop(fd.uuid, None) + for td in fd: + TASK_DETAILS.pop(td.uuid, None) except KeyError: - raise exception.NotFound("No Logbook found with id " - "%s." % (lb_id,)) - - # Try to get the FlowDetail to add - try: - ld_fd = flowdetails[fd_id] - # Raise a NotFound exception if the FlowDetail is not there - except KeyError: - raise exception.NotFound("No FlowDetail found with id " - "%s." % (fd_id,)) - - # Acquire a write lock on the LogBook - with ld_lb.acquire_lock(read=False): - # Add the FlowDetail model to the LogBook model - ld_lb.add_flow_detail(ld_fd) - - -def logbook_remove_flowdetail(lb_id, fd_id): - """Removes a FlowDetail with id fd_id from a LogBook with id lb_id""" - # Try to get the LogBook - try: - ld_lb = logbooks[lb_id] - # Raise a NotFound exception if it is not there - except KeyError: - raise exception.NotFound("No Logbook found with id " - "%s." % (lb_id,)) - - # Try to get the FlowDetail to remove - try: - ld_fd = flowdetails[fd_id] - # Raise a NotFound exception if it is not there - except KeyError: - raise exception.NotFound("No FlowDetail found with id " - "%s." % (fd_id,)) - - # Acquire a write lock on the LogBook model - with ld_lb.acquire_lock(read=False): - # Remove the FlowDetail from the LogBook model - ld_lb.remove_flow_detail(ld_fd) - - -def logbook_get_ids_names(): - """Returns all LogBook ids and names""" - lb_ids = [] - lb_names = [] - - # Iterate through the LockingDict and append to the Lists - for (k, v) in logbooks.items(): - lb_ids.append(k) - lb_names.append(v.name) - - # Return a dict of the ids and names - return dict(zip(lb_ids, lb_names)) - - -def _logbook_exists(lb_id, session=None): - """Check if a LogBook with lb_id exists""" - return lb_id in logbooks.keys() - - -""" -FLOWDETAIL -""" - - -def flowdetail_create(name, wf, fd_id=None): - """Create a new FlowDetail model with matching fd_id""" - # Create a FlowDetail model to save - fd = memory.MemoryFlowDetail(name, wf, fd_id) - - #Save the FlowDetail model to the LockingDict - flowdetails[fd_id] = fd - - -def flowdetail_destroy(fd_id): - """Deletes the FlowDetail model with matching fd_id""" - # Try to delete the FlowDetail model - try: - del flowdetails[fd_id] - # Raise a NotFound exception if the FlowDetail is not there - except KeyError: - raise exception.NotFound("No FlowDetail found with id " - "%s." % (fd_id,)) - - -def flowdetail_save(fd): - """Saves a generic FlowDetail object to the db""" - # Create a FlowDetail model if one does not exist - if not _flowdetail_exists(fd.uuid): - flowdetail_create(fd.name, fd.flow, fd.uuid) - - # Get a copy of the FlowDetail model in the LockingDict - ld_fd = flowdetail_get(fd.uuid) - - for td in fd: - # Save the TaskDetails in the FlowDetail to save - td.save() - - # Add the TaskDetail model to the FlowDetail model if it is not there - if td not in ld_fd: - flowdetail_add_task_detail(fd.uuid, td.uuid) - - -def flowdetail_delete(fd): - """Deletes a FlowDetail from db based on a generic type""" - # Try to get the FlowDetails - try: - ld_fd = flowdetails[fd.uuid] - # Raise a NotFound exception if the FlowDetail is not there - except KeyError: - raise exception.NotFound("No FlowDetail found with id " - "%s." % (fd.uuid,)) - - # Raise an error if the FlowDetail still has TaskDetails - if len(ld_fd): - raise exception.Error("FlowDetail <%s> still has " - "dependents." % (fd.uuid,)) - # If it is safe, delete the FlowDetail model - else: - flowdetail_destroy(fd.uuid) - - -def flowdetail_get(fd_id): - """Gets a FlowDetail with matching fd_id, if it exists""" - # Try to get the FlowDetail - try: - fd = flowdetails[fd_id] - # Raise a NotFound exception if it is not there - except KeyError: - raise exception.NotFound("No FlowDetail found with id " - "%s." % (fd_id,)) - - # Acquire a read lock on the FlowDetail - with fd.acquire_lock(read=True): - # Get the Flow this FlowDetail represents - wf = fd.flow - - # Create a FlowDetail to return - retVal = flowdetail.FlowDetail(fd.name, wf, fd.uuid) - - # Change updated_at to reflect the current data - retVal.updated_at = fd.updated_at - - # Add the generic TaskDetails to the FlowDetail to return - for td in fd: - retVal.add_task_detail(taskdetail_get(td.uuid)) - - return retVal - - -def flowdetail_add_task_detail(fd_id, td_id): - """Adds a TaskDetail with id td_id to a Flowdetail with id fd_id""" - # Try to get the FlowDetail - try: - fd = flowdetails[fd_id] - # Raise a NotFound exception if it is not there - except KeyError: - raise exception.NotFound("No FlowDetail found with id " - "%s." % (fd_id,)) - - # Try to get the TaskDetail to add - try: - td = taskdetails[td_id] - # Raise a NotFound exception if it is not there - except KeyError: - raise exception.NotFound("No TaskDetail found with id " - "%s." % (td_id,)) - - # Acquire a write lock on the FlowDetail model - with fd.acquire_lock(read=False): - # Add the TaskDetail to the FlowDetail model - fd.add_task_detail(td) - - -def flowdetail_remove_taskdetail(fd_id, td_id): - """Removes a TaskDetail with id td_id from a FlowDetail with id fd_id""" - # Try to get the FlowDetail - try: - fd = flowdetails[fd_id] - # Raise a NotFound exception if it is not there - except KeyError: - raise exception.NotFound("No FlowDetail found with id " - "%s." % (fd_id,)) - - # Try to get the TaskDetail to remove - try: - td = taskdetails[td_id] - # Raise a NotFound exception if it is not there - except KeyError: - raise exception.NotFound("No TaskDetail found with id " - "%s." % (td_id,)) - - # Acquire a write lock on the FlowDetail - with fd.acquire_lock(read=False): - # Remove the TaskDetail model from the FlowDetail model - fd.remove_task_detail(td) - - -def flowdetail_get_ids_names(): - """Returns all FlowDetail ids and names""" - fd_ids = [] - fd_names = [] - - # Iterate through the LockingDict and append to lists - for (k, v) in flowdetails.items(): - fd_ids.append(k) - fd_names.append(v.name) - - # Return a dict of the uuids and names - return dict(zip(fd_ids, fd_names)) - - -def _flowdetail_exists(fd_id): - """Checks if a FlowDetail with fd_id exists""" - return fd_id in flowdetails.keys() - - -""" -TASKDETAIL -""" - - -def taskdetail_create(name, tsk, td_id=None): - """Create a new TaskDetail model with matching td_id""" - # Create a TaskDetail model to save - td = memory.MemoryTaskDetail(name, tsk, td_id) - # Save the TaskDetail model to the LockingDict - taskdetails[td_id] = td - - -def taskdetail_destroy(td_id): - """Deletes the TaskDetail model with matching td_id""" - # Try to delete the TaskDetails - try: - del taskdetails[td_id] - # Raise a NotFound exception if it is not there - except KeyError: - raise exception.NotFound("No TaskDetail found with id " - "%s." % (td_id,)) - - -def taskdetail_save(td): - """Saves a generic TaskDetail object to the db""" - # Create a TaskDetail model if it doesn't exist - if not _taskdetail_exists(td.uuid): - taskdetail_create(td.name, td.task, td.uuid) - - # Prepare values for updating - values = dict(state=td.state, - results=td.results, - exception=td.exception, - stacktrace=td.stacktrace, - meta=td.meta) - - # Update the values of the TaskDetail model - taskdetail_update(td.uuid, values) - - -def taskdetail_delete(td): - """Deletes a TaskDetail from db based on a generic type""" - # Destroy the TaskDetail model - taskdetail_destroy(td.uuid) - - -def taskdetail_get(td_id): - """Gets a TaskDetail with matching td_id, if it exists""" - # Try to get the TaskDetail - try: - ld_td = taskdetails[td_id] - # Raise NotFound exception if it is not there - except KeyError: - raise exception.NotFound("No TaskDetail found with id " - "%s." % (td_id,)) - - # Acquire a read lock - with ld_td.acquire_lock(read=True): - # Get the Task this TaskDetail represents - tsk = ld_td.task - - # Update TaskDetail to return - retVal = taskdetail.TaskDetail(ld_td.name, tsk, ld_td.uuid) - retVal.updated_at = ld_td.updated_at - retVal.state = ld_td.state - retVal.results = ld_td.results - retVal.exception = ld_td.exception - retVal.stacktrace = ld_td.stacktrace - retVal.meta = ld_td.meta - - return retVal - - -def taskdetail_update(td_id, values): - """Updates a TaskDetail with matching td_id""" - # Try to get the TaskDetail - try: - ld_td = taskdetails[td_id] - # Raise a NotFound exception if it is not there - except KeyError: - raise exception.NotFound("No TaskDetail found with id " - "%s." % (td_id,)) - - # Acquire a write lock for the TaskDetail - with ld_td.acquire_lock(read=False): - # Write the values to the TaskDetail - for k, v in values.iteritems(): - setattr(ld_td, k, v) - - -def taskdetail_get_ids_names(): - """Returns all TaskDetail ids and names""" - td_ids = [] - td_names = [] - - # Iterate through the LockingDict and append to Lists - for (k, v) in taskdetails.items(): - td_ids.append(k) - td_names.append(v.name) - - # Return a dict of uuids and names - return dict(zip(td_ids, td_names)) - - -def _taskdetail_exists(td_id, session=None): - """Check if a TaskDetail with td_id exists""" - return td_id in taskdetails.keys() + raise exc.NotFound("No logbook found with id: %s" % lb_id) + + +def logbook_save(lb): + # Acquire all the locks that will be needed to perform this operation with + # out being affected by other threads doing it at the same time. + with utils.MultiLock(READ_SAVE_ORDER): + # Get a existing logbook model (or create it if it isn't there). + try: + backing_lb = LOG_BOOKS[lb.uuid] + if backing_lb.meta != lb.meta: + backing_lb.meta = lb.meta + # Add anything on to the existing loaded logbook that isn't already + # in the existing logbook. + for fd in lb: + if fd not in backing_lb: + FLOW_DETAILS[fd.uuid] = copy.deepcopy(fd) + backing_lb.add(flowdetails_save(fd)) + else: + # Previously added but not saved into the flowdetails + # 'permanent' storage. + if fd.uuid not in FLOW_DETAILS: + FLOW_DETAILS[fd.uuid] = copy.deepcopy(fd) + flowdetails_save(fd) + # TODO(harlowja): figure out a better way to set this property + # without actually letting others set it external. + backing_lb._updated_at = timeutils.utcnow() + except KeyError: + backing_lb = copy.deepcopy(lb) + # TODO(harlowja): figure out a better way to set this property + # without actually letting others set it external. + backing_lb._created_at = timeutils.utcnow() + # Record all the pieces as being saved. + LOG_BOOKS[lb.uuid] = backing_lb + for fd in backing_lb: + FLOW_DETAILS[fd.uuid] = fd + for td in fd: + TASK_DETAILS[td.uuid] = td + return backing_lb diff --git a/taskflow/persistence/backends/memory/memory.py b/taskflow/persistence/backends/memory/memory.py deleted file mode 100644 index 805861d8..00000000 --- a/taskflow/persistence/backends/memory/memory.py +++ /dev/null @@ -1,128 +0,0 @@ -# -*- coding: utf-8 -*- - -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import contextlib - -from taskflow import flow -from taskflow.jobs import job -from taskflow.jobs import jobboard -from taskflow.persistence import flowdetail -from taskflow.persistence import logbook -from taskflow.persistence import taskdetail -from taskflow import task -from taskflow import utils - - -class MemoryJobBoard(jobboard.JobBoard): - def __init__(self, name, jb_id=None): - super(MemoryJobBoard, self).__init__(name, jb_id) - self._lock = utils.ReaderWriterLock() - - @contextlib.contextmanager - def acquire_lock(self, read=True): - try: - self._lock.acquire(read) - yield self._lock - finally: - self._lock.release() - - -class MemoryJob(job.Job): - def __init__(self, name, job_id=None): - super(MemoryJob, self).__init__(name, job_id) - self._lock = utils.ReaderWriterLock() - - @contextlib.contextmanager - def acquire_lock(self, read=True): - try: - self._lock.acquire(read) - yield self._lock - finally: - self._lock.release() - - -class MemoryLogBook(logbook.LogBook): - def __init__(self, name, lb_id=None): - super(MemoryLogBook, self).__init__(name, lb_id) - self._lock = utils.ReaderWriterLock() - - @contextlib.contextmanager - def acquire_lock(self, read=True): - try: - self._lock.acquire(read) - yield self._lock - finally: - self._lock.release() - - -class MemoryFlow(flow.Flow): - def __init__(self, name, wf_id=None): - super(MemoryFlow, self).__init__(name, wf_id) - self._lock = utils.ReaderWriterLock() - self.flowdetails = {} - - @contextlib.contextmanager - def acquire_lock(self, read=True): - try: - self._lock.acquire(read) - yield self._lock - finally: - self._lock.release() - - -class MemoryFlowDetail(flowdetail.FlowDetail): - def __init__(self, name, wf, fd_id=None): - super(MemoryFlowDetail, self).__init__(name, wf, fd_id) - self._lock = utils.ReaderWriterLock() - - @contextlib.contextmanager - def acquire_lock(self, read=True): - try: - self._lock.acquire(read) - yield self._lock - finally: - self._lock.release() - - -class MemoryTask(task.Task): - def __init__(self, name, task_id=None): - super(MemoryTask, self).__init__(name, task_id) - self._lock = utils.ReaderWriterLock() - self.taskdetails = {} - - @contextlib.contextmanager - def acquire_lock(self, read=True): - try: - self._lock.acquire(read) - yield self._lock - finally: - self._lock.release() - - -class MemoryTaskDetail(taskdetail.TaskDetail): - def __init__(self, name, task, td_id=None): - super(MemoryTaskDetail, self).__init__(name, task, td_id) - self._lock = utils.ReaderWriterLock() - - @contextlib.contextmanager - def acquire_lock(self, read=True): - try: - self._lock.acquire(read) - yield self._lock - finally: - self._lock.release() diff --git a/taskflow/persistence/backends/sqlalchemy/alembic/README b/taskflow/persistence/backends/sqlalchemy/alembic/README new file mode 100644 index 00000000..27497ba5 --- /dev/null +++ b/taskflow/persistence/backends/sqlalchemy/alembic/README @@ -0,0 +1,9 @@ +Please see https://alembic.readthedocs.org/en/latest/index.html for general documentation + +To create alembic migrations you need to have alembic installed and available in PATH: +# pip install alembic +$ cd ./taskflow/persistence/backends/sqlalchemy/alembic +$ alembic revision -m "migration_description" + +See Operation Reference https://alembic.readthedocs.org/en/latest/ops.html#ops +for a short list of commands diff --git a/taskflow/persistence/backends/sqlalchemy/alembic/alembic.ini b/taskflow/persistence/backends/sqlalchemy/alembic/alembic.ini new file mode 100644 index 00000000..b63e922f --- /dev/null +++ b/taskflow/persistence/backends/sqlalchemy/alembic/alembic.ini @@ -0,0 +1,15 @@ +# A generic, single database configuration. + +[alembic] +# path to migration scripts +script_location = %(here)s + +# template used to generate migration files +# file_template = %%(rev)s_%%(slug)s + +# set to 'true' to run the environment during +# the 'revision' command, regardless of autogenerate +# revision_environment = false + +# This is set inside of migration script +# sqlalchemy.url = driver://user:pass@localhost/dbname diff --git a/taskflow/persistence/backends/sqlalchemy/alembic/env.py b/taskflow/persistence/backends/sqlalchemy/alembic/env.py new file mode 100644 index 00000000..c47b8720 --- /dev/null +++ b/taskflow/persistence/backends/sqlalchemy/alembic/env.py @@ -0,0 +1,64 @@ +from __future__ import with_statement +from alembic import context +from sqlalchemy import engine_from_config, pool + +# this is the Alembic Config object, which provides +# access to the values within the .ini file in use. +config = context.config + +# add your model's MetaData object here +# for 'autogenerate' support +# from myapp import mymodel +# target_metadata = mymodel.Base.metadata +target_metadata = None + +# other values from the config, defined by the needs of env.py, +# can be acquired: +# my_important_option = config.get_main_option("my_important_option") +# ... etc. + + +def run_migrations_offline(): + """Run migrations in 'offline' mode. + + This configures the context with just a URL + and not an Engine, though an Engine is acceptable + here as well. By skipping the Engine creation + we don't even need a DBAPI to be available. + + Calls to context.execute() here emit the given string to the + script output. + + """ + url = config.get_main_option("sqlalchemy.url") + context.configure(url=url) + + with context.begin_transaction(): + context.run_migrations() + + +def run_migrations_online(): + """Run migrations in 'online' mode. + + In this scenario we need to create an Engine + and associate a connection with the context. + + """ + engine = engine_from_config(config.get_section(config.config_ini_section), + prefix='sqlalchemy.', + poolclass=pool.NullPool) + + connection = engine.connect() + context.configure(connection=connection, target_metadata=target_metadata) + + try: + with context.begin_transaction(): + context.run_migrations() + finally: + connection.close() + + +if context.is_offline_mode(): + run_migrations_offline() +else: + run_migrations_online() diff --git a/taskflow/persistence/backends/sqlalchemy/alembic/script.py.mako b/taskflow/persistence/backends/sqlalchemy/alembic/script.py.mako new file mode 100644 index 00000000..95702017 --- /dev/null +++ b/taskflow/persistence/backends/sqlalchemy/alembic/script.py.mako @@ -0,0 +1,22 @@ +"""${message} + +Revision ID: ${up_revision} +Revises: ${down_revision} +Create Date: ${create_date} + +""" + +# revision identifiers, used by Alembic. +revision = ${repr(up_revision)} +down_revision = ${repr(down_revision)} + +from alembic import op +import sqlalchemy as sa +${imports if imports else ""} + +def upgrade(): + ${upgrades if upgrades else "pass"} + + +def downgrade(): + ${downgrades if downgrades else "pass"} diff --git a/taskflow/persistence/backends/sqlalchemy/alembic/versions/1cea328f0f65_initial_logbook_deta.py b/taskflow/persistence/backends/sqlalchemy/alembic/versions/1cea328f0f65_initial_logbook_deta.py new file mode 100644 index 00000000..0bb45db6 --- /dev/null +++ b/taskflow/persistence/backends/sqlalchemy/alembic/versions/1cea328f0f65_initial_logbook_deta.py @@ -0,0 +1,119 @@ +"""initial_logbook_details_tables + +Revision ID: 1cea328f0f65 +Revises: None +Create Date: 2013-08-23 11:41:49.207087 + +""" + +# revision identifiers, used by Alembic. +revision = '1cea328f0f65' +down_revision = None + +import logging + +from alembic import op +import sqlalchemy as sa + + +LOG = logging.getLogger(__name__) + + +def _get_indexes(): + # Ensure all uuids are indexed since they are what is typically looked + # up and fetched, so attempt to ensure that that is done quickly. + indexes = [ + { + 'name': 'logbook_uuid_idx', + 'table_name': 'logbooks', + 'columns': ['uuid'], + }, + { + 'name': 'flowdetails_uuid_idx', + 'table_name': 'flowdetails', + 'columns': ['uuid'], + }, + { + 'name': 'taskdetails_uuid_idx', + 'table_name': 'taskdetails', + 'columns': ['uuid'], + }, + ] + return indexes + + +def _get_foreign_keys(): + f_keys = [ + # Flow details uuid -> logbook parent uuid + { + 'name': 'flowdetails_ibfk_1', + 'source': 'flowdetails', + 'referent': 'logbooks', + 'local_cols': ['parent_uuid'], + 'remote_cols': ['uuid'], + 'ondelete': 'CASCADE', + }, + # Task details uuid -> flow details parent uuid + { + 'name': 'taskdetails_ibfk_1', + 'source': 'taskdetails', + 'referent': 'flowdetails', + 'local_cols': ['parent_uuid'], + 'remote_cols': ['uuid'], + 'ondelete': 'CASCADE', + }, + ] + return f_keys + + +def upgrade(): + op.create_table('logbooks', + sa.Column('created_at', sa.DateTime), + sa.Column('updated_at', sa.DateTime), + sa.Column('meta', sa.Text(), nullable=True), + sa.Column('name', sa.String(length=255), nullable=True), + sa.Column('uuid', sa.String(length=64), primary_key=True, + nullable=False), + mysql_engine='InnoDB', + mysql_charset='utf8') + op.create_table('flowdetails', + sa.Column('created_at', sa.DateTime), + sa.Column('updated_at', sa.DateTime), + sa.Column('parent_uuid', sa.String(length=64)), + sa.Column('meta', sa.Text(), nullable=True), + sa.Column('state', sa.String(length=255), nullable=True), + sa.Column('name', sa.String(length=255), nullable=True), + sa.Column('uuid', sa.String(length=64), primary_key=True, + nullable=False), + mysql_engine='InnoDB', + mysql_charset='utf8') + op.create_table('taskdetails', + sa.Column('created_at', sa.DateTime), + sa.Column('updated_at', sa.DateTime), + sa.Column('parent_uuid', sa.String(length=64)), + sa.Column('meta', sa.Text(), nullable=True), + sa.Column('name', sa.String(length=255), nullable=True), + sa.Column('results', sa.Text(), nullable=True), + sa.Column('version', sa.String(length=64), nullable=True), + sa.Column('stacktrace', sa.Text(), nullable=True), + sa.Column('exception', sa.Text(), nullable=True), + sa.Column('state', sa.String(length=255), nullable=True), + sa.Column('uuid', sa.String(length=64), primary_key=True, + nullable=False), + mysql_engine='InnoDB', + mysql_charset='utf8') + try: + for fkey_descriptor in _get_foreign_keys(): + op.create_foreign_key(**fkey_descriptor) + except NotImplementedError as e: + LOG.warn("Foreign keys are not supported: %s", e) + try: + for index_descriptor in _get_indexes(): + op.create_index(**index_descriptor) + except NotImplementedError as e: + LOG.warn("Indexes are not supported: %s", e) + + +def downgrade(): + for table in ['logbooks', 'flowdetails', 'taskdetails']: + op.drop_table(table) diff --git a/taskflow/persistence/backends/sqlalchemy/alembic/versions/README b/taskflow/persistence/backends/sqlalchemy/alembic/versions/README new file mode 100644 index 00000000..9df3b2db --- /dev/null +++ b/taskflow/persistence/backends/sqlalchemy/alembic/versions/README @@ -0,0 +1 @@ +Directory for alembic migration files diff --git a/taskflow/persistence/backends/sqlalchemy/api.py b/taskflow/persistence/backends/sqlalchemy/api.py index 6a378074..1dc04376 100644 --- a/taskflow/persistence/backends/sqlalchemy/api.py +++ b/taskflow/persistence/backends/sqlalchemy/api.py @@ -17,14 +17,16 @@ # License for the specific language governing permissions and limitations # under the License. -"""Implementation of SQLAlchemy backend.""" +"""Implementation of a SQLAlchemy storage backend.""" import logging +import sys -from sqlalchemy import exc -from taskflow import exceptions as exception +from sqlalchemy import exceptions as sql_exc + +from taskflow import exceptions as exc +from taskflow.openstack.common.db.sqlalchemy import session as db_session from taskflow.persistence.backends.sqlalchemy import models -from taskflow.persistence.backends.sqlalchemy import session as sql_session from taskflow.persistence import flowdetail from taskflow.persistence import logbook from taskflow.persistence import taskdetail @@ -33,452 +35,212 @@ from taskflow.persistence import taskdetail LOG = logging.getLogger(__name__) -def model_query(*args, **kwargs): - session = kwargs.get('session') or sql_session.get_session() - query = session.query(*args) - - return query +def get_backend(): + """The backend is this module itself.""" + return sys.modules[__name__] -""" -LOGBOOK -""" +def _convert_fd_to_external(fd): + fd_c = flowdetail.FlowDetail(fd.name, uuid=fd.uuid, backend='sqlalchemy') + fd_c.meta = fd.meta + fd_c.state = fd.state + for td in fd.taskdetails: + fd_c.add(_convert_td_to_external(td)) + return fd_c -def logbook_create(name, lb_id=None): - """Creates a new LogBook model with matching lb_id""" - # Create a LogBook model to save - lb_ref = models.LogBook() - # Update attributes of the LogBook model - lb_ref.name = name - if lb_id: - lb_ref.logbook_id = lb_id - # Save the LogBook to the database - lb_ref.save() +def _convert_fd_to_internal(fd, lb_uuid): + fd_m = models.FlowDetail(name=fd.name, uuid=fd.uuid, parent_uuid=lb_uuid, + meta=fd.meta, state=fd.state) + fd_m.taskdetails = [] + for td in fd: + fd_m.taskdetails.append(_convert_td_to_internal(td, fd_m.uuid)) + return fd_m + + +def _convert_td_to_internal(td, parent_uuid): + return models.TaskDetail(name=td.name, uuid=td.uuid, + state=td.state, results=td.results, + exception=td.exception, meta=td.meta, + stacktrace=td.stacktrace, + version=td.version, parent_uuid=parent_uuid) + + +def _convert_td_to_external(td): + # Convert from sqlalchemy model -> external model, this allows us + # to change the internal sqlalchemy model easily by forcing a defined + # interface (that isn't the sqlalchemy model itself). + td_c = taskdetail.TaskDetail(td.name, uuid=td.uuid, backend='sqlalchemy') + td_c.state = td.state + td_c.results = td.results + td_c.exception = td.exception + td_c.stacktrace = td.stacktrace + td_c.meta = td.meta + td_c.version = td.version + return td_c + + +def _convert_lb_to_external(lb_m): + """Don't expose the internal sqlalchemy ORM model to the external api.""" + lb_c = logbook.LogBook(lb_m.name, lb_m.uuid, + updated_at=lb_m.updated_at, + created_at=lb_m.created_at, + backend='sqlalchemy') + lb_c.meta = lb_m.meta + for fd_m in lb_m.flowdetails: + lb_c.add(_convert_fd_to_external(fd_m)) + return lb_c + + +def _convert_lb_to_internal(lb_c): + """Don't expose the external model to the sqlalchemy ORM model.""" + lb_m = models.LogBook(uuid=lb_c.uuid, meta=lb_c.meta, name=lb_c.name) + lb_m.flowdetails = [] + for fd_c in lb_c: + lb_m.flowdetails.append(_convert_fd_to_internal(fd_c, lb_c.uuid)) + return lb_m + + +def _logbook_get_model(lb_id, session): + entry = session.query(models.LogBook).filter_by(uuid=lb_id).first() + if entry is None: + raise exc.NotFound("No logbook found with id: %s" % lb_id) + return entry + + +def _flow_details_get_model(f_id, session): + entry = session.query(models.FlowDetail).filter_by(uuid=f_id).first() + if entry is None: + raise exc.NotFound("No flow details found with id: %s" % f_id) + return entry + + +def _task_details_get_model(t_id, session): + entry = session.query(models.TaskDetail).filter_by(uuid=t_id).first() + if entry is None: + raise exc.NotFound("No task details found with id: %s" % t_id) + return entry + + +def _taskdetails_merge(td_m, td): + if td_m.state != td.state: + td_m.state = td.state + if td_m.results != td.results: + td_m.results = td.results + if td_m.exception != td.exception: + td_m.exception = td.exception + if td_m.stacktrace != td.stacktrace: + td_m.stacktrace = td.stacktrace + if td_m.meta != td.meta: + td_m.meta = td.meta + return td_m + + +def clear_all(): + session = db_session.get_session() + with session.begin(): + # NOTE(harlowja): due to how we have our relationship setup and + # cascading deletes are enabled, this will cause all associated task + # details and flow details to automatically be purged. + try: + return session.query(models.LogBook).delete() + except sql_exc.DBAPIError as e: + raise exc.StorageError("Failed clearing all entries: %s" % e, e) + + +def taskdetails_save(td): + # Must already exist since a tasks details has a strong connection to + # a flow details, and tasks details can not be saved on there own since + # they *must* have a connection to an existing flow details. + session = db_session.get_session() + with session.begin(): + td_m = _task_details_get_model(td.uuid, session=session) + td_m = _taskdetails_merge(td_m, td) + td_m = session.merge(td_m) + return _convert_td_to_external(td_m) + + +def flowdetails_save(fd): + # Must already exist since a flow details has a strong connection to + # a logbook, and flow details can not be saved on there own since they + # *must* have a connection to an existing logbook. + session = db_session.get_session() + with session.begin(): + fd_m = _flow_details_get_model(fd.uuid, session=session) + if fd_m.meta != fd.meta: + fd_m.meta = fd.meta + if fd_m.state != fd.state: + fd_m.state = fd.state + for td in fd: + updated = False + for td_m in fd_m.taskdetails: + if td_m.uuid == td.uuid: + updated = True + td_m = _taskdetails_merge(td_m, td) + break + if not updated: + fd_m.taskdetails.append(_convert_td_to_internal(td, fd_m.uuid)) + fd_m = session.merge(fd_m) + return _convert_fd_to_external(fd_m) def logbook_destroy(lb_id): - """Deletes the LogBook model with matching lb_id""" - # Get the session to interact with the database - session = sql_session.get_session() + session = db_session.get_session() with session.begin(): - # Get the LogBook model - lb = _logbook_get_model(lb_id, session=session) - # Delete the LogBook model from the database - lb.delete(session=session) + try: + lb = _logbook_get_model(lb_id, session=session) + session.delete(lb) + except sql_exc.DBAPIError as e: + raise exc.StorageError("Failed destroying" + " logbook %s: %s" % (lb_id, e), e) def logbook_save(lb): - """Saves a generic LogBook object to the db""" - # Try to create the LogBook model - try: - logbook_create(lb.name, lb.uuid) - # Do nothing if it is already there - except exc.IntegrityError: - pass - - # Get a copy of the LogBook in the database - db_lb = logbook_get(lb.uuid) - - for fd in lb: - # Save each FlowDetail - flowdetail_save(fd) - - # Add the FlowDetail model to the LogBook model if it is not there - if fd not in db_lb: - logbook_add_flow_detail(lb.uuid, fd.uuid) - - -def logbook_delete(lb): - """Deletes a LogBook from db based on a generic type""" - # Get a session to interact with the database - session = sql_session.get_session() + session = db_session.get_session() with session.begin(): - # Get the LogBook model - lb_model = _logbook_get_model(lb.uuid, session=session) - - # Raise an error if the LogBook model still has FlowDetails - if lb_model.flowdetails: - raise exception.Error("Logbook <%s> still has " - "dependents." % (lb.uuid,)) - # Destroy the model if it is safe - else: - logbook_destroy(lb.uuid) + try: + lb_m = _logbook_get_model(lb.uuid, session=session) + # NOTE(harlowja): Merge them (note that this doesn't provide 100% + # correct update semantics due to how databases have MVCC). This + # is where a stored procedure or a better backing store would + # handle this better (something more suited to this type of data). + for fd in lb: + existing_fd = False + for fd_m in lb_m.flowdetails: + if fd_m.uuid == fd.uuid: + existing_fd = True + if fd_m.meta != fd.meta: + fd_m.meta = fd.meta + if fd_m.state != fd.state: + fd_m.state = fd.state + for td in fd: + existing_td = False + for td_m in fd_m.taskdetails: + if td_m.uuid == td.uuid: + existing_td = True + td_m = _taskdetails_merge(td_m, td) + break + if not existing_td: + td_m = _convert_td_to_internal(td, fd_m.uuid) + fd_m.taskdetails.append(td_m) + if not existing_fd: + lb_m.flowdetails.append(_convert_fd_to_internal(fd, + lb_m.uuid)) + except exc.NotFound: + lb_m = _convert_lb_to_internal(lb) + try: + lb_m = session.merge(lb_m) + return _convert_lb_to_external(lb_m) + except sql_exc.DBAPIError as e: + raise exc.StorageError("Failed saving" + " logbook %s: %s" % (lb.uuid, e), e) def logbook_get(lb_id): - """Gets a LogBook with matching lb_id, if it exists""" - # Get a session to interact with the database - session = sql_session.get_session() - with session.begin(): - # Get the LogBook model from the database - lb = _logbook_get_model(lb_id, session=session) - - # Create a generic LogBook to return - retVal = logbook.LogBook(lb.name, lb.logbook_id) - - # Add the generic FlowDetails associated with this LogBook - for fd in lb.flowdetails: - retVal.add_flow_detail(flowdetail_get(fd.flowdetail_id)) - - return retVal - - -def logbook_add_flow_detail(lb_id, fd_id): - """Adds a FlowDetail with id fd_id to a LogBook with id lb_id""" - # Get a session to interact with the database - session = sql_session.get_session() - with session.begin(): - # Get the LogBook model from the database - lb = _logbook_get_model(lb_id, session=session) - # Get the FlowDetail model from the database - fd = _flowdetail_get_model(fd_id, session=session) - # Add the FlowDetail model to the LogBook model - lb.flowdetails.append(fd) - - -def logbook_remove_flowdetail(lb_id, fd_id): - """Removes a FlowDetail with id fd_id from a LogBook with id lb_id""" - # Get a session to interact with the database - session = sql_session.get_session() - with session.begin(): - # Get the LogBook model - lb = _logbook_get_model(lb_id, session=session) - # Remove the FlowDetail model from the LogBook model - lb.flowdetails = [fd for fd in lb.flowdetails - if fd.flowdetail_id != fd_id] - - -def logbook_get_ids_names(): - """Returns all LogBook ids and names""" - # Get a List of all LogBook models - lbs = model_query(models.LogBook).all() - - # Get all of the LogBook uuids - lb_ids = [lb.logbook_id for lb in lbs] - # Get all of the LogBook names - names = [lb.name for lb in lbs] - - # Return a dict with uuids and names - return dict(zip(lb_ids, names)) - - -def _logbook_get_model(lb_id, session=None): - """Gets a LogBook model with matching lb_id, if it exists""" - # Get a query of LogBooks by uuid - query = model_query(models.LogBook, session=session).\ - filter_by(logbook_id=lb_id) - - # If there are no elements in the Query, raise a NotFound exception - if not query.first(): - raise exception.NotFound("No LogBook found with id " - "%s." % (lb_id,)) - - # Return the first item in the Query - return query.first() - - -def _logbook_exists(lb_id, session=None): - """Check if a LogBook with lb_id exists""" - # Gets a Query of all LogBook models - query = model_query(models.LogBook, session=session).\ - filter_by(logbook_id=lb_id) - - # Return False if the query is empty - if not query.first(): - return False - - # Return True if there is something in the query - return True - - -""" -FLOWDETAIL -""" - - -def flowdetail_create(name, wf, fd_id=None): - """Create a new FlowDetail model with matching fd_id""" - # Create a FlowDetail model to be saved - fd_ref = models.FlowDetail() - # Update attributes of FlowDetail model to be saved - fd_ref.name = name - if fd_id: - fd_ref.flowdetail_id = fd_id - # Save FlowDetail model to database - fd_ref.save() - - -def flowdetail_destroy(fd_id): - """Deletes the FlowDetail model with matching fd_id""" - # Get a session for interaction with the database - session = sql_session.get_session() - with session.begin(): - # Get the FlowDetail model - fd = _flowdetail_get_model(fd_id, session=session) - # Delete the FlowDetail from the database - fd.delete(session=session) - - -def flowdetail_save(fd): - """Saves a generic FlowDetail object to the db""" - # Try to create the FlowDetail model + session = db_session.get_session() try: - flowdetail_create(fd.name, fd.flow, fd.uuid) - # Do nothing if it is already there - except exc.IntegrityError: - pass - - # Get a copy of the FlowDetail in the database - db_fd = flowdetail_get(fd.uuid) - - for td in fd: - # Save each TaskDetail - taskdetail_save(td) - - # Add the TaskDetail model to the FlowDetail model if it is not there - if td not in db_fd: - flowdetail_add_task_detail(fd.uuid, td.uuid) - - -def flowdetail_delete(fd): - """Deletes a FlowDetail from db based on a generic type""" - # Get a session to interact with the database - session = sql_session.get_session() - with session.begin(): - # Get the FlowDetail model - fd_model = _flowdetail_get_model(fd.uuid, session=session) - - # Raise an error if the FlowDetail model still has TaskDetails - if fd_model.taskdetails: - raise exception.Error("FlowDetail <%s> still has " - "dependents." % (fd.uuid,)) - # If it is safe, destroy the FlowDetail model from the database - else: - flowdetail_destroy(fd.uuid) - - -def flowdetail_get(fd_id): - """Gets a FlowDetail with matching fd_id, if it exists""" - # Get a session for interaction with the database - session = sql_session.get_session() - with session.begin(): - # Get the FlowDetail model from the database - fd = _flowdetail_get_model(fd_id, session=session) - - # Create a generic FlowDetail to return - retVal = flowdetail.FlowDetail(fd.name, None, fd.flowdetail_id) - - # Update attributes to match - retVal.updated_at = fd.updated_at - - # Add the TaskDetails belonging to this FlowDetail to itself - for td in fd.taskdetails: - retVal.add_task_detail(taskdetail_get(td.taskdetail_id)) - - return retVal - - -def flowdetail_add_task_detail(fd_id, td_id): - """Adds a TaskDetail with id td_id to a Flowdetail with id fd_id""" - # Get a session for interaction with the database - session = sql_session.get_session() - with session.begin(): - # Get the FlowDetail model - fd = _flowdetail_get_model(fd_id, session=session) - # Get the TaskDetail model - td = _taskdetail_get_model(td_id, session=session) - # Add the TaskDetail model to the FlowDetail model - fd.taskdetails.append(td) - - -def flowdetail_remove_taskdetail(fd_id, td_id): - """Removes a TaskDetail with id td_id from a FlowDetail with id fd_id""" - # Get a session for interaction with the database - session = sql_session.get_session() - with session.begin(): - # Get the FlowDetail model - fd = _flowdetail_get_model(fd_id, session=session) - # Remove the TaskDetail from the FlowDetail model - fd.taskdetails = [td for td in fd.taskdetails - if td.taskdetail_id != td_id] - - -def flowdetail_get_ids_names(): - """Returns all FlowDetail ids and names""" - # Get all FlowDetail models - fds = model_query(models.FlowDetail).all() - - # Get the uuids of all FlowDetail models - fd_ids = [fd.flowdetail_id for fd in fds] - # Get the names of all FlowDetail models - names = [fd.name for fd in fds] - - # Return a dict of uuids and names - return dict(zip(fd_ids, names)) - - -def _flowdetail_get_model(fd_id, session=None): - """Gets a FlowDetail model with matching fd_id, if it exists""" - # Get a query of FlowDetails by uuid - query = model_query(models.FlowDetail, session=session).\ - filter_by(flowdetail_id=fd_id) - - # Raise a NotFound exception if the query is empty - if not query.first(): - raise exception.NotFound("No FlowDetail found with id " - "%s." % (fd_id,)) - - # Return the first entry in the query - return query.first() - - -def _flowdetail_exists(fd_id, session=None): - """Checks if a FlowDetail with fd_id exists""" - # Get a query of FlowDetails by uuid - query = model_query(models.FlowDetail, session=session).\ - filter_by(flowdetail_id=fd_id) - - # Return False if the query is empty - if not query.first(): - return False - - # Return True if there is something in the query - return True - - -""" -TASKDETAIL -""" - - -def taskdetail_create(name, tsk, td_id=None): - """Create a new TaskDetail model with matching td_id""" - # Create a TaskDetail model to add - td_ref = models.TaskDetail() - # Update the attributes of the TaskDetail model to add - td_ref.name = name - if td_id: - td_ref.taskdetail_id = td_id - - td_ref.task_id = tsk.uuid - td_ref.task_name = tsk.name - td_ref.task_provides = list(tsk.provides) - td_ref.task_requires = list(tsk.requires) - td_ref.task_optional = list(tsk.optional) - # Save the TaskDetail model to the database - td_ref.save() - - -def taskdetail_destroy(td_id): - """Deletes the TaskDetail model with matching td_id""" - # Get a session for interaction with the database - session = sql_session.get_session() - with session.begin(): - # Get the TaskDetail model to delete - td = _taskdetail_get_model(td_id, session=session) - # Delete the TaskDetail model from the database - td.delete(session=session) - - -def taskdetail_save(td): - """Saves a generic TaskDetail object to the db""" - # Create a TaskDetail model if it does not already exist - if not _taskdetail_exists(td.uuid): - taskdetail_create(td.name, td.task, td.uuid) - - # Prepare values to be saved to the TaskDetail model - values = dict(state=td.state, - results=td.results, - exception=td.exception, - stacktrace=td.stacktrace, - meta=td.meta) - - # Update the TaskDetail model with the values of the generic TaskDetail - taskdetail_update(td.uuid, values) - - -def taskdetail_delete(td): - """Deletes a TaskDetail from db based on a generic type""" - # Destroy the TaskDetail if it exists - taskdetail_destroy(td.uuid) - - -def taskdetail_get(td_id): - """Gets a TaskDetail with matching td_id, if it exists""" - # Get a session for interaction with the database - session = sql_session.get_session() - with session.begin(): - # Get the TaskDetail model - td = _taskdetail_get_model(td_id, session=session) - - # Create a generic type Task to return as part of the TaskDetail - tsk = None - - # Create a generic type TaskDetail to return - retVal = taskdetail.TaskDetail(td.name, tsk, td.taskdetail_id) - # Update the TaskDetail to reflect the data in the database - retVal.updated_at = td.updated_at - retVal.state = td.state - retVal.results = td.results - retVal.exception = td.exception - retVal.stacktrace = td.stacktrace - retVal.meta = td.meta - - return retVal - - -def taskdetail_update(td_id, values): - """Updates a TaskDetail with matching td_id""" - # Get a session for interaction with the database - session = sql_session.get_session() - with session.begin(): - # Get the TaskDetail model - td = _taskdetail_get_model(td_id, session=session) - - # Update the TaskDetail model with values - td.update(values) - # Write the TaskDetail model changes to the database - td.save(session=session) - - -def taskdetail_get_ids_names(): - """Returns all TaskDetail ids and names""" - # Get all TaskDetail models - tds = model_query(models.TaskDetail).all() - - # Get the list of TaskDetail uuids - td_ids = [td.taskdetail_id for td in tds] - # Get the list of TaskDetail names - names = [td.name for td in tds] - - #Return a dict of uuids and names - return dict(zip(td_ids, names)) - - -def _taskdetail_get_model(td_id, session=None): - """Gets a TaskDetail model with matching td_id, if it exists""" - # Get a query of TaskDetails by uuid - query = model_query(models.TaskDetail, session=session).\ - filter_by(taskdetail_id=td_id) - - # Raise a NotFound exception if the query is empty - if not query.first(): - raise exception.NotFound("No TaskDetail found with id " - "%s." % (td_id,)) - - return query.first() - - -def _taskdetail_exists(td_id, session=None): - """Check if a TaskDetail with td_id exists""" - # Get a query of TaskDetails by uuid - query = model_query(models.TaskDetail, session=session).\ - filter_by(taskdetail_id=td_id) - - # Return False if the query is empty - if not query.first(): - return False - - # Return True if there is something in the query - return True + lb_m = _logbook_get_model(lb_id, session=session) + return _convert_lb_to_external(lb_m) + except sql_exc.DBAPIError as e: + raise exc.StorageError("Failed getting" + " logbook %s: %s" % (lb_id, e), e) diff --git a/taskflow/persistence/backends/sqlalchemy/base.py b/taskflow/persistence/backends/sqlalchemy/base.py deleted file mode 100644 index 3ecfce17..00000000 --- a/taskflow/persistence/backends/sqlalchemy/base.py +++ /dev/null @@ -1,39 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2010 United States Government as represented by the -# Administrator of the National Aeronautics and Space Administration. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -"""Base class for classes that need modular database access.""" - -from oslo.config import cfg - -from taskflow.openstack.common import importutils - -db_driver_opt = cfg.StrOpt('db_driver', - default='taskflow.db', - help='driver to use for database access') - -CONF = cfg.CONF -CONF.register_opt(db_driver_opt) - - -class Base(object): - """DB driver is injected in the init method.""" - - def __init__(self, db_driver=None): - if not db_driver: - db_driver = CONF.db_driver - self.db = importutils.import_module(db_driver) # pylint: disable=C0103 diff --git a/taskflow/persistence/backends/sqlalchemy/migration.py b/taskflow/persistence/backends/sqlalchemy/migration.py new file mode 100644 index 00000000..89c27c29 --- /dev/null +++ b/taskflow/persistence/backends/sqlalchemy/migration.py @@ -0,0 +1,44 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Database setup and migration commands.""" + +import os + +from oslo.config import cfg + +import alembic +from alembic import config as alembic_config + +CONF = cfg.CONF +CONF.import_opt('connection', + 'taskflow.openstack.common.db.sqlalchemy.session', + group='database') + + +def _alembic_config(): + path = os.path.join(os.path.dirname(__file__), 'alembic', 'alembic.ini') + config = alembic_config.Config(path) + if not config.get_main_option('url'): + config.set_main_option('sqlalchemy.url', CONF.database.connection) + return config + + +def db_sync(): + config = _alembic_config() + alembic.command.upgrade(config, "head") diff --git a/taskflow/persistence/backends/sqlalchemy/models.py b/taskflow/persistence/backends/sqlalchemy/models.py index f307978b..bc3e4818 100644 --- a/taskflow/persistence/backends/sqlalchemy/models.py +++ b/taskflow/persistence/backends/sqlalchemy/models.py @@ -3,7 +3,7 @@ # vim: tabstop=4 shiftwidth=4 softtabstop=4 # Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. -# Copyright (C) 2013 Rackspace Hosting All Rights Reserved. +# Copyright (C) 2013 Rackspace Hosting Inc. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -16,25 +16,18 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. -""" -SQLAlchemy models for taskflow data. -""" -import json -from oslo.config import cfg -from sqlalchemy import Column, Integer, String -from sqlalchemy.exc import IntegrityError +from sqlalchemy import Column, String from sqlalchemy.ext.declarative import declarative_base -from sqlalchemy.orm import object_mapper, relationship -from sqlalchemy import DateTime, ForeignKey +from sqlalchemy import ForeignKey +from sqlalchemy.orm import backref +from sqlalchemy.orm import relationship from sqlalchemy import types as types -from taskflow import exceptions as exception -from taskflow.openstack.common import timeutils +from taskflow.openstack.common.db.sqlalchemy import models as c_models +from taskflow.openstack.common import jsonutils from taskflow.openstack.common import uuidutils -from taskflow.persistence.backends.sqlalchemy import session as sql_session -CONF = cfg.CONF BASE = declarative_base() @@ -42,133 +35,57 @@ class Json(types.TypeDecorator, types.MutableType): impl = types.Text def process_bind_param(self, value, dialect): - return json.dumps(value) + return jsonutils.dumps(value) def process_result_value(self, value, dialect): - return json.loads(value) + return jsonutils.loads(value) -class TaskFlowBase(object): - """Base class for TaskFlow Models.""" - __table_args__ = {'mysql_engine': 'InnoDB'} - __table_initialized = False - created_at = Column(DateTime, default=timeutils.utcnow) - updated_at = Column(DateTime, default=timeutils.utcnow) - - def save(self, session=None): - """Save this object.""" - if not session: - session = sql_session.get_session() - session.add(self) - try: - session.flush() - except IntegrityError, e: - if str(e).endswith('is not unique'): - raise exception.Duplicate(str(e)) - else: - raise - - def delete(self, session=None): - """Delete this object.""" - self.deleted = True - self.deleted_at = timeutils.utcnow() - if not session: - session = sql_session.get_session() - session.delete(self) - session.flush() - - def __setitem__(self, key, value): - setattr(self, key, value) - - def __getitem__(self, key): - return getattr(self, key) - - def get(self, key, default=None): - return getattr(self, key, default) - - def __iter__(self): - self._i = iter(object_mapper(self).columns) - return self - - def next(self): - n = self._i.next().name - return n, getattr(self, n) - - def update(self, values): - """Make the model object behave like a dict""" - for k, v in values.iteritems(): - setattr(self, k, v) - - def iteritems(self): - """Make the model object behave like a dict - - Includes attributes from joins. - """ - local = dict(self) - joined = dict([k, v] for k, v in self.__dict__.iteritems() - if not k[0] == '_') - local.update(joined) - return local.iteritems() +class ModelBase(c_models.ModelBase, + c_models.TimestampMixin): + """Base model for all taskflow objects""" + uuid = Column(String, default=uuidutils.generate_uuid, + primary_key=True, nullable=False, unique=True) + name = Column(String, nullable=True) + meta = Column(Json, nullable=True) -class LogBook(BASE, TaskFlowBase): +class LogBook(BASE, ModelBase): """Represents a logbook for a set of flows""" - - __tablename__ = 'logbook' - - # Member variables - id = Column(Integer, primary_key=True) - logbook_id = Column(String, default=uuidutils.generate_uuid, - unique=True) - name = Column(String) + __tablename__ = 'logbooks' # Relationships - flowdetails = relationship("FlowDetail", backref="logbook") + flowdetails = relationship("FlowDetail", + single_parent=True, + backref=backref("logbooks", + cascade="save-update, delete, " + "merge")) -class FlowDetail(BASE, TaskFlowBase): - """Represent FlowDetail objects""" - - __tablename__ = 'flowdetail' +class FlowDetail(BASE, ModelBase): + __tablename__ = 'flowdetails' # Member variables - id = Column(Integer, primary_key=True) - flowdetail_id = Column(String, default=uuidutils.generate_uuid, - unique=True) - name = Column(String) - flow_id = Column(String) - flow_type = Column(String) + state = Column(String) # Relationships - logbook_id = Column(Integer, ForeignKey('logbook.logbook_id')) - taskdetails = relationship("TaskDetail", backref="flowdetail") + parent_uuid = Column(String, ForeignKey('logbooks.uuid')) + taskdetails = relationship("TaskDetail", + single_parent=True, + backref=backref("flowdetails", + cascade="save-update, delete, " + "merge")) -class TaskDetail(BASE, TaskFlowBase): - """Represents TaskDetail objects""" - - __tablename__ = 'taskdetail' +class TaskDetail(BASE, ModelBase): + __tablename__ = 'taskdetails' # Member variables - id = Column(Integer, primary_key=True) - taskdetail_id = Column(String, default=uuidutils.generate_uuid, - unique=True) - name = Column(String) state = Column(String) results = Column(Json) - exception = Column(String) - stacktrace = Column(String) - meta = Column(String) - - task_id = Column(String) - task_name = Column(String) - task_provides = Column(Json) - task_requires = Column(Json) - task_optional = Column(Json) + exception = Column(Json) + stacktrace = Column(Json) + version = Column(String) # Relationships - flowdetail_id = Column(String, ForeignKey('flowdetail.flowdetail_id')) - - -def create_tables(): - BASE.metadata.create_all(sql_session.get_engine()) + parent_uuid = Column(String, ForeignKey('flowdetails.uuid')) diff --git a/taskflow/persistence/backends/sqlalchemy/session.py b/taskflow/persistence/backends/sqlalchemy/session.py deleted file mode 100644 index 39120a5a..00000000 --- a/taskflow/persistence/backends/sqlalchemy/session.py +++ /dev/null @@ -1,111 +0,0 @@ -# -*- coding: utf-8 -*- - -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. -# Copyright (C) 2013 Rackspace Hosting All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -"""Session Handling for SQLAlchemy backend.""" - -import logging - -import sqlalchemy -import sqlalchemy.engine -import sqlalchemy.interfaces -import sqlalchemy.orm -from sqlalchemy.pool import NullPool - -from sqlalchemy import exc - -from taskflow.persistence.backends import api as b_api - -LOG = logging.getLogger(__name__) - -_ENGINE = None -_MAKER = None - - -def get_session(autocommit=True, expire_on_commit=True): - """Return a SQLAlchemy session.""" - global _MAKER - - if _MAKER is None: - _MAKER = get_maker(get_engine(), autocommit, expire_on_commit) - return _MAKER() - - -def synchronous_switch_listener(dbapi_conn, connection_rec): - """Switch sqlite connections to non-synchronous mode""" - dbapi_conn.execute("PRAGMA synchronous = OFF") - - -def ping_listener(dbapi_conn, connection_rec, connection_proxy): - """Ensures that MySQL connections checked out of the -pool are alive. - -Borrowed from: -http://groups.google.com/group/sqlalchemy/msg/a4ce563d802c929f -""" - try: - dbapi_conn.cursor().execute('select 1') - except dbapi_conn.OperationalError, ex: - if ex.args[0] in (2006, 2013, 2014, 2045, 2055): - LOG.warn(_('Got mysql server has gone away: %s'), ex) - raise exc.DisconnectionError("Database server went away") - else: - raise - - -def get_engine(): - """Return a SQLAlchemy engine.""" - global _ENGINE - if _ENGINE is None: - connection_dict = sqlalchemy.engine.url.make_url(_get_sql_connection()) - engine_args = { - "pool_recycle": _get_sql_idle_timeout(), - "echo": False, - "convert_unicode": True - } - - if "sqlite" in connection_dict.drivername: - engine_args['poolclass'] = NullPool - - _ENGINE = sqlalchemy.create_engine(_get_sql_connection(), - **engine_args) - - if 'mysql' in connection_dict.drivername: - sqlalchemy.event.listen(_ENGINE, 'checkout', ping_listener) - if 'sqlite' in connection_dict.drivername: - sqlalchemy.event.listen(_ENGINE, 'connect', - synchronous_switch_listener) - - # TODO(jharlow): Check to make sure engine connected - - return _ENGINE - - -def get_maker(engine, autocommit=True, expire_on_commit=False): - "Return a SQLAlchemy sessionmaker using the given engine.""" - return sqlalchemy.orm.sessionmaker(bind=engine, - autocommit=autocommit, - expire_on_commit=expire_on_commit) - - -def _get_sql_connection(): - return b_api.SQL_CONNECTION - - -def _get_sql_idle_timeout(): - return b_api.SQL_IDLE_TIMEOUT diff --git a/taskflow/persistence/flowdetail.py b/taskflow/persistence/flowdetail.py index ebcc2051..7e5c8eee 100644 --- a/taskflow/persistence/flowdetail.py +++ b/taskflow/persistence/flowdetail.py @@ -16,47 +16,71 @@ # License for the specific language governing permissions and limitations # under the License. -from datetime import datetime - from taskflow.openstack.common import uuidutils from taskflow.persistence.backends import api as b_api class FlowDetail(object): - """Generic backend class that contains TaskDetail snapshots from when - the FlowDetail is gotten. The data contained within this class is - not backed by the backend storage in real time. - The data in this class will only be persisted by the backend when - the save() method is called. + """This class contains an append-only list of task detail entries for a + given flow along with any metadata associated with that flow. + + The data contained within this class need *not* backed by the backend + storage in real time. The data in this class will only be guaranteed to be + persisted when the logbook that contains this flow detail is saved or when + the save() method is called directly. """ - def __init__(self, name, wf, fd_id=None): - if fd_id: - self._uuid = fd_id + def __init__(self, name, uuid=None, backend='memory'): + if uuid: + self._uuid = uuid else: self._uuid = uuidutils.generate_uuid() self._name = name - self._flow = wf - self.updated_at = datetime.now() self._taskdetails = [] + self.state = None + # Any other metadata to include about this flow while storing. For + # example timing information could be stored here, other misc. flow + # related items (edge connections)... + self.meta = None + self.backend = backend + + def _get_backend(self): + if not self.backend: + return None + return b_api.fetch(self.backend) + + def add(self, td): + self._taskdetails.append(td) + # When added the backend that the task details is using will be + # automatically switched to whatever backend this flow details is + # using. + if td.backend != self.backend: + td.backend = self.backend + + def find(self, td_uuid): + for self_td in self: + if self_td.uuid == td_uuid: + return self_td + return None def save(self): - b_api.flowdetail_save(self) + """Saves *most* of the components of this given object. - def delete(self): - b_api.flowdetail_delete(self) + This will immediately and atomically save the attributes of this flow + details object to a backing store providing by the backing api. - def add_task_detail(self, td): - self._taskdetails.append(td) - - def remove_task_detail(self, td): - self._taskdetails = [d for d in self._taskdetails - if d.uuid != td.uuid] - - def __contains__(self, td): - for self_td in self: - if self_td.taskdetail_id == td.taskdetail_id: - return True - return False + The underlying storage must contain an existing flow details that this + save operation will merge with and then reflect those changes in this + object. + """ + backend = self._get_backend() + if not backend: + raise NotImplementedError("No saving backend provided") + fd_u = backend.flowdetails_save(self) + if fd_u is self: + return + self.meta = fd_u.meta + self.state = fd_u.state + self._taskdetails = fd_u._taskdetails @property def uuid(self): @@ -66,10 +90,6 @@ class FlowDetail(object): def name(self): return self._name - @property - def flow(self): - return self._flow - def __iter__(self): for td in self._taskdetails: yield td diff --git a/taskflow/persistence/logbook.py b/taskflow/persistence/logbook.py index 82a38860..dcced943 100644 --- a/taskflow/persistence/logbook.py +++ b/taskflow/persistence/logbook.py @@ -16,46 +16,104 @@ # License for the specific language governing permissions and limitations # under the License. -from datetime import datetime - from taskflow.openstack.common import uuidutils from taskflow.persistence.backends import api as b_api class LogBook(object): - """Generic backend class that contains TaskDetail snapshots from when - the FlowDetail is gotten. The data contained within this class is - not backed by the backend storage in real time. - The data in this class will only be persisted by the backend when - the save method is called. + """This class that contains an append-only list of flow detail + entries for a given *job* so that the job can track what 'work' has been + completed for resumption/reverting and miscellaneous tracking purposes. + + The data contained within this class need *not* backed by the backend + storage in real time. The data in this class will only be guaranteed to be + persisted when the logbook is saved. """ - def __init__(self, name, lb_id=None): - if lb_id: - self._uuid = lb_id + def __init__(self, name, uuid=None, updated_at=None, created_at=None, + backend='memory'): + if uuid: + self._uuid = uuid else: self._uuid = uuidutils.generate_uuid() self._name = name - self.updated_at = datetime.now() self._flowdetails = [] + self._updated_at = updated_at + self._created_at = created_at + self.backend = backend + self.meta = None + + def _get_backend(self): + if not self.backend: + return None + return b_api.fetch(self.backend) + + @property + def created_at(self): + return self._created_at + + @property + def updated_at(self): + return self._updated_at def save(self): - b_api.logbook_save(self) + """Saves all the components of the given logbook. + + This will immediately and atomically save all the entries of the given + logbook, including any flow details, and any associated task details, + that may be contained in this logbook to a backing store providing by + the backing api. + + If the logbook is the underlying storage contains an existing logbook + then this save operation will merge the different flow details objects + to that logbook and then reflect those changes in this logbook. + """ + backend = self._get_backend() + if not backend: + raise NotImplementedError("No saving backend provided") + s_book = backend.logbook_save(self) + if s_book is self: + return + # Alter the internal variables to reflect what was saved (which may + # have new additions if there was a merge with pre-existing data). + self._name = s_book._name + self._flowdetails = s_book._flowdetails + self._updated_at = s_book._updated_at + self._created_at = s_book._created_at + self.meta = s_book.meta def delete(self): - b_api.logbook_delete(self) + """Deletes all traces of the given logbook. - def add_flow_detail(self, fd): - self._flowdetails.append(fd) + This will delete the logbook entry, any associated flow detail entries + and any associated task detail entries associated with those flow + detail entries immediately via the backing api (using a atomic + transaction). + """ + backend = self._get_backend() + if not backend: + raise NotImplementedError("No deleting backend provided") + backend.logbook_destroy(self.uuid) - def remove_flow_detail(self, fd): - self._flowdetails = [d for d in self._flowdetails - if d.uuid != fd.uuid] + def add(self, flow_detail): + """Adds a new entry to the underlying logbook. - def __contains__(self, fd): - for self_fd in self: - if self_fd.flowdetail_id == fd.flowdetail_id: - return True - return False + Does not *guarantee* that the details will be immediatly saved. + """ + self._flowdetails.append(flow_detail) + # When added the backend that the flow details (and any owned task + # details) is using will be automatically switched to whatever backend + # this logbook is using. + if flow_detail.backend != self.backend: + flow_detail.backend = self.backend + for task_detail in flow_detail: + if task_detail.backend != self.backend: + task_detail.backend = self.backend + + def find(self, flow_uuid): + for fd in self._flowdetails: + if fd.uuid == flow_uuid: + return fd + return None @property def uuid(self): @@ -71,3 +129,8 @@ class LogBook(object): def __len__(self): return len(self._flowdetails) + + +def load(lb_id, backend='memory'): + """Loads a given logbook (if it exists) from the given backend type.""" + return b_api.fetch(backend).logbook_get(lb_id) diff --git a/taskflow/persistence/taskdetail.py b/taskflow/persistence/taskdetail.py index f182eff6..836a8c16 100644 --- a/taskflow/persistence/taskdetail.py +++ b/taskflow/persistence/taskdetail.py @@ -16,33 +16,83 @@ # License for the specific language governing permissions and limitations # under the License. -from datetime import datetime - from taskflow.openstack.common import uuidutils from taskflow.persistence.backends import api as b_api class TaskDetail(object): - def __init__(self, name, task, td_id=None): - if td_id: - self._uuid = td_id + """This class contains an entry that contains the persistance of a task + after or before (or during) it is running including any results it may have + produced, any state that it may be in (failed for example), any exception + that occured when running and any associated stacktrace that may have + occuring during that exception being thrown and any other metadata that + should be stored along-side the details about this task. + + The data contained within this class need *not* backed by the backend + storage in real time. The data in this class will only be guaranteed to be + persisted when the logbook that contains this task detail is saved or when + the save() method is called directly. + """ + def __init__(self, name, task=None, uuid=None, backend='memory'): + if uuid: + self._uuid = uuid else: self._uuid = uuidutils.generate_uuid() - self._name = name - self.updated_at = datetime.now() + # TODO(harlowja): decide if these should be passed in and therefore + # immutable or let them be assigned? + # + # The state the task was last in. self.state = None + # The results it may have produced (useful for reverting). self.results = None + # An exception that it may have thrown (or part of it), useful for + # knowing what failed. self.exception = None + # Any stack trace the exception may have had, useful for debugging or + # examining the failure in more depth. self.stacktrace = None + # Any other metadata to include about this task while storing. For + # example timing information could be stored here, other misc. task + # related items. self.meta = None - self.task = task + # The version of the task this task details was associated with which + # is quite useful for determining what versions of tasks this detail + # information can be associated with. + self.version = None + if task and task.version: + if isinstance(task.version, basestring): + self.version = str(task.version) + elif isinstance(task.version, (tuple, list)): + self.version = '.'.join([str(p) for p in task.version]) + self.backend = backend def save(self): - b_api.taskdetail_save(self) + """Saves *most* of the components of this given object. - def delete(self): - b_api.taskdetail_delete(self) + This will immediately and atomically save the attributes of this task + details object to a backing store providing by the backing api. + + The underlying storage must contain an existing task details that this + save operation will merge with and then reflect those changes in this + object. + """ + backend = self._get_backend() + if not backend: + raise NotImplementedError("No saving backend provided") + td_u = backend.taskdetails_save(self) + if td_u is self: + return + self.meta = td_u.meta + self.exception = td_u.exception + self.results = td_u.results + self.stacktrace = td_u.stacktrace + self.state = td_u.state + + def _get_backend(self): + if not self.backend: + return None + return b_api.fetch(self.backend) @property def uuid(self): diff --git a/taskflow/tests/unit/memory_api/test_flowdetail_api.py b/taskflow/tests/unit/memory_api/test_flowdetail_api.py deleted file mode 100644 index 24fd0972..00000000 --- a/taskflow/tests/unit/memory_api/test_flowdetail_api.py +++ /dev/null @@ -1,212 +0,0 @@ -# -*- coding: utf-8 -*- - -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright (C) 2013 Rackspace Hosting All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -"""Import required libraries""" -import unittest2 - -from taskflow import exceptions as exception -from taskflow.openstack.common import uuidutils -from taskflow.patterns import graph_flow as flow -from taskflow.persistence.backends import api as b_api -from taskflow.persistence import flowdetail -from taskflow.tests import utils - - -class FlowDetailTest(unittest2.TestCase): - """This class is to test the functionality of the backend API's flowdetail - methods. - """ - wfs = [] - fd_names = [] - fd_ids = [] - tsks = [] - td_names = [] - td_ids = [] - - @classmethod - def setUpClass(cls): - # Create a workflow for flowdetails to use - wf_id = uuidutils.generate_uuid() - wf_name = 'wf-%s' % (wf_id) - - wf = flow.Flow(wf_name, None, wf_id) - cls.wfs.append(wf) - - # Create a task for taskdetails to use - task_id = uuidutils.generate_uuid() - task_name = 'task-%s' % (task_id) - - tsk = utils.DummyTask(task_name, task_id) - cls.tsks.append(tsk) - - @classmethod - def tearDownClass(cls): - # Clear out the lists of workflows and tasks - utils.drain(cls.wfs) - utils.drain(cls.tsks) - - def setUp(self): - # Create a flowdetail and record its uuid and name - fd_id = uuidutils.generate_uuid() - fd_name = 'fd-%s' % (fd_id) - - b_api.flowdetail_create(fd_name, self.wfs[0], fd_id) - self.fd_names.append(fd_name) - self.fd_ids.append(fd_id) - - # Create a taskdetail and record its uuid and name - td_id = uuidutils.generate_uuid() - td_name = 'td-%s' % (td_id) - - b_api.taskdetail_create(td_name, self.tsks[0], td_id) - self.td_names.append(td_name) - self.td_ids.append(td_id) - - def tearDown(self): - # Destroy all taskdetails and flowdetails in the backend - for id in self.td_ids: - b_api.taskdetail_destroy(id) - for id in self.fd_ids: - b_api.flowdetail_destroy(id) - - # Drain the lists of taskdetail and flowdetail uuids and names - utils.drain(self.fd_names) - utils.drain(self.fd_ids) - utils.drain(self.td_names) - utils.drain(self.td_ids) - - def test_flowdetail_create(self): - # Create a flowdetail and record its uuid and name - fd_id = uuidutils.generate_uuid() - fd_name = 'fd-%s' % (fd_id) - - b_api.flowdetail_create(fd_name, self.wfs[0], fd_id) - self.fd_names.append(fd_name) - self.fd_ids.append(fd_id) - - # Check to see that the created flowdetail is there - actual = b_api.flowdetail_get(fd_id) - - self.assertIsNotNone(actual) - - def test_flowdetail_destroy(self): - # Destroy the last added flowdetail - id = self.fd_ids.pop() - b_api.flowdetail_destroy(id) - self.fd_names.pop() - - # Check to make sure the removed flowdetail is no longer there - self.assertRaises(exception.NotFound, b_api.flowdetail_get, - id) - - def test_flowdetail_save(self): - # Create a generic flowdetail to save - fd_id = uuidutils.generate_uuid() - fd_name = 'fd-%s' % (fd_id) - wf = self.wfs[0] - fd = flowdetail.FlowDetail(fd_name, wf, fd_id) - - # Save the generic flowdetail to the backend and record its uuid/name - b_api.flowdetail_save(fd) - self.fd_names.append(fd_name) - self.fd_ids.append(fd_id) - - # Check that the saved flowdetail is in the backend - actual = b_api.flowdetail_get(fd_id) - - self.assertIsNotNone(actual) - # Check that the saved flowdetail has no taskdetails - self.assertEquals(len(actual), 0) - - # Add a generic taskdetail to the flowdetail - td = b_api.taskdetail_get(self.td_ids[0]) - fd.add_task_detail(td) - - # Save the updated flowdetail - b_api.flowdetail_save(fd) - - # Check that the saved flowdetail is still there - actual = b_api.flowdetail_get(fd_id) - - self.assertIsNotNone(actual) - # Check that the addition of a taskdetail was recorded - self.assertEquals(len(actual), 1) - - def test_flowdetail_delete(self): - # Get the flowdetail to delete - id = self.fd_ids.pop() - fd = b_api.flowdetail_get(id) - # Delete the flowdetail - b_api.flowdetail_delete(fd) - self.fd_names.pop() - - # Make sure it is not there anymore - self.assertRaises(exception.NotFound, b_api.flowdetail_get, - id) - - def test_flowdetail_get(self): - # Get the first flowdetail - actual = b_api.flowdetail_get(self.fd_ids[0]) - - # Check that it is a flowdetail - self.assertIsInstance(actual, flowdetail.FlowDetail) - # Check that its name matches what is expected - self.assertEquals(actual.name, self.fd_names[0]) - - def test_flowdetail_add_task_detail(self): - # Get the first flowdetail - actual = b_api.flowdetail_get(self.fd_ids[0]) - - # Make sure it has no taskdetails - self.assertEquals(len(actual), 0) - - # Add a taskdetail to the flowdetail - b_api.flowdetail_add_task_detail(self.fd_ids[0], self.td_ids[0]) - - # Get the flowdetail again - actual = b_api.flowdetail_get(self.fd_ids[0]) - - # Check that the flowdetail has one taskdetail - self.assertEquals(len(actual), 1) - - def test_flowdetail_remove_taskdetail(self): - # Add a taskdetail to the first flowdetail - b_api.flowdetail_add_task_detail(self.fd_ids[0], self.td_ids[0]) - - # Get the first flowdetail - actual = b_api.flowdetail_get(self.fd_ids[0]) - - # Check that the first flowdetail has exactly one taskdetail - self.assertEquals(len(actual), 1) - - # Remove the taskdetail from the first flowdetail - b_api.flowdetail_remove_taskdetail(self.fd_ids[0], self.td_ids[0]) - - # Get the first flowdetail - actual = b_api.flowdetail_get(self.fd_ids[0]) - - # Check that the first flowdetail no longer has any taskdetails - self.assertEquals(len(actual), 0) - - def test_flowdetail_get_ids_names(self): - # Get a list of all uuids and names for flowdetails - actual = b_api.flowdetail_get_ids_names() - - # Match it to our in-memory records - self.assertEquals(actual.values(), self.fd_names) - self.assertEquals(actual.keys(), self.fd_ids) diff --git a/taskflow/tests/unit/memory_api/test_logbook_api.py b/taskflow/tests/unit/memory_api/test_logbook_api.py deleted file mode 100644 index 136027ff..00000000 --- a/taskflow/tests/unit/memory_api/test_logbook_api.py +++ /dev/null @@ -1,202 +0,0 @@ -# -*- coding: utf-8 -*- - -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright (C) 2013 Rackspace Hosting All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -"""Import required libraries""" -import unittest2 - -from taskflow import exceptions as exception -from taskflow.openstack.common import uuidutils -from taskflow.patterns import graph_flow as flow -from taskflow.persistence.backends import api as b_api -from taskflow.persistence import logbook -from taskflow.tests import utils - - -class LogBookTest(unittest2.TestCase): - """This class is designed to test the functionality of the backend API's - logbook methods - """ - lb_names = [] - lb_ids = [] - wfs = [] - fd_names = [] - fd_ids = [] - - @classmethod - def setUpClass(cls): - # Create a workflow to create flowdetails with - wf_id = uuidutils.generate_uuid() - wf_name = 'wf-%s' % (wf_id) - - wf = flow.Flow(wf_name, None, wf_id) - cls.wfs.append(wf) - - @classmethod - def tearDownClass(cls): - # Empty the list of workflows - utils.drain(cls.wfs) - - def setUp(self): - # Create a logbook and record its uuid and name - lb_id = uuidutils.generate_uuid() - lb_name = 'lb-%s' % (lb_id) - - b_api.logbook_create(lb_name, lb_id) - self.lb_names.append(lb_name) - self.lb_ids.append(lb_id) - - # Create a flowdetail and record its uuid and name - fd_id = uuidutils.generate_uuid() - fd_name = 'fd-%s' % (fd_id) - - b_api.flowdetail_create(fd_name, self.wfs[0], fd_id) - self.fd_names.append(fd_name) - self.fd_ids.append(fd_id) - - def tearDown(self): - # Destroy all flowdetails and logbooks in the backend - for id in self.fd_ids: - b_api.flowdetail_destroy(id) - for id in self.lb_ids: - b_api.logbook_destroy(id) - - # Clear the lists of logbook and flowdetail uuids and names - utils.drain(self.lb_names) - utils.drain(self.lb_ids) - utils.drain(self.fd_names) - utils.drain(self.fd_ids) - - def test_logbook_create(self): - # Create a logbook and record its uuid and name - lb_id = uuidutils.generate_uuid() - lb_name = 'lb-%s' % (lb_id) - - b_api.logbook_create(lb_name, lb_id) - self.lb_names.append(lb_name) - self.lb_ids.append(lb_id) - - # Check that the created logbook exists in the backend - actual = b_api.logbook_get(lb_id) - - self.assertIsNotNone(actual) - - def test_logbook_destroy(self): - # Delete the last added logbook - id = self.lb_ids.pop() - b_api.logbook_destroy(id) - self.lb_names.pop() - - # Check that the deleted logbook is no longer there - self.assertRaises(exception.NotFound, b_api.logbook_get, - id) - - def test_logbook_save(self): - # Create a generic logbook to save - lb_id = uuidutils.generate_uuid() - lb_name = 'lb-%s' % (lb_id) - lb = logbook.LogBook(lb_name, lb_id) - - # Save the logbook and record its uuid and name - b_api.logbook_save(lb) - self.lb_names.append(lb_name) - self.lb_ids.append(lb_id) - - # Check that the saved logbook exists in the backend - actual = b_api.logbook_get(lb_id) - - self.assertIsNotNone(actual) - # Check that the saved logbook has no flowdetails - self.assertEquals(len(actual), 0) - - # Add a flowdetail to the logbook - fd = b_api.flowdetail_get(self.fd_ids[0]) - lb.add_flow_detail(fd) - - # Save the updated logbook - b_api.logbook_save(lb) - - # Check that the updated logbook is still in the backend - actual = b_api.logbook_get(lb_id) - - self.assertIsNotNone(actual) - # Check that the added flowdetail was recorded - self.assertEquals(len(actual), 1) - - def test_logbook_delete(self): - # Get the logbook to delete - id = self.lb_ids.pop() - lb = b_api.logbook_get(id) - # Delete the logbook from the backend - b_api.logbook_delete(lb) - self.lb_names.pop() - - # Check that the deleted logbook is no longer present - self.assertRaises(exception.NotFound, b_api.logbook_get, - id) - - def test_logbook_get(self): - # Get the logbook from the backend - actual = b_api.logbook_get(self.lb_ids[0]) - - # Check that it is actually a logbook - self.assertIsInstance(actual, logbook.LogBook) - # Check that the name is correct - self.assertEquals(actual.name, self.lb_names[0]) - - def test_logbook_add_flow_detail(self): - # Get the logbook from the backend - actual = b_api.logbook_get(self.lb_ids[0]) - - # Check that it has no flowdetails - self.assertEquals(len(actual), 0) - - # Add a flowdetail to the logbook - b_api.logbook_add_flow_detail(self.lb_ids[0], self.fd_ids[0]) - - # Get the logbook again - actual = b_api.logbook_get(self.lb_ids[0]) - - # Check that the logbook has exactly one flowdetail - self.assertEquals(len(actual), 1) - - def test_logbook_remove_flowdetail(self): - # Add a flowdetail to the first logbook - b_api.logbook_add_flow_detail(self.lb_ids[0], self.fd_ids[0]) - - # Get the first logbook - actual = b_api.logbook_get(self.lb_ids[0]) - - # Check that it has exactly one flowdetail - self.assertEquals(len(actual), 1) - - # Remove the flowdetail from the logbook - b_api.logbook_remove_flowdetail(self.lb_ids[0], self.fd_ids[0]) - - # Get the logbook again - actual = b_api.logbook_get(self.lb_ids[0]) - - # Check that the logbook now has no flowdetails - self.assertEquals(len(actual), 0) - - def test_logbook_get_ids_names(self): - # Get the dict of uuids and names - actual = b_api.logbook_get_ids_names() - - # Check that it matches our in-memory list - self.assertEquals(actual.values(), self.lb_names) - self.assertEquals(actual.keys(), self.lb_ids) diff --git a/taskflow/tests/unit/memory_api/test_taskdetail_api.py b/taskflow/tests/unit/memory_api/test_taskdetail_api.py deleted file mode 100644 index bb49afc3..00000000 --- a/taskflow/tests/unit/memory_api/test_taskdetail_api.py +++ /dev/null @@ -1,190 +0,0 @@ -# -*- coding: utf-8 -*- - -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright (C) 2013 Rackspace Hosting All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -"""Import required libraries""" -import unittest2 - -from taskflow import exceptions as exception -from taskflow.openstack.common import uuidutils -from taskflow.persistence.backends import api as b_api -from taskflow.persistence import taskdetail -from taskflow.tests import utils - - -class TaskDetailTest(unittest2.TestCase): - """This class is designed to test the functionality of the backend API's - taskdetail methods - """ - tsks = [] - td_names = [] - td_ids = [] - - @classmethod - def setUpClass(cls): - # Create a task for taskdetails to be made from - task_id = uuidutils.generate_uuid() - task_name = 'task-%s' % (task_id) - - tsk = utils.DummyTask(task_name, task_id) - tsk.requires.update('r') - tsk.optional.update('o') - tsk.provides.update('p') - - cls.tsks.append(tsk) - - @classmethod - def tearDownClass(cls): - # Clear the tasks - utils.drain(cls.tsks) - - def setUp(self): - # Create a taskdetail and record its uuid and name - td_id = uuidutils.generate_uuid() - td_name = 'td-%s' % (td_id) - - b_api.taskdetail_create(td_name, self.tsks[0], td_id) - self.td_names.append(td_name) - self.td_ids.append(td_id) - - def tearDown(self): - # Destroy all taskdetails from the backend - for id in self.td_ids: - b_api.taskdetail_destroy(id) - - # Clear the list of taskdetail names and uuids - utils.drain(self.td_names) - utils.drain(self.td_ids) - - def test_taskdetail_create(self): - # Create a taskdetail and record its uuid and name - td_id = uuidutils.generate_uuid() - td_name = 'td-%s' % (td_id) - - b_api.taskdetail_create(td_name, self.tsks[0], td_id) - self.td_names.append(td_name) - self.td_ids.append(td_id) - - # Check that the taskdetail is there - actual = b_api.taskdetail_get(td_id) - - self.assertIsNotNone(actual) - - def test_taskdetail_destroy(self): - # Destroy the last added taskdetail - id = self.td_ids.pop() - b_api.taskdetail_destroy(id) - self.td_names.pop() - - # Check that the deleted taskdetail is no longer there - self.assertRaises(exception.NotFound, b_api.taskdetail_get, - id) - - def test_taskdetail_save(self): - # Create a generic taskdetail to save - td_id = uuidutils.generate_uuid() - td_name = 'td-%s' % (td_id) - tsk = self.tsks[0] - td = taskdetail.TaskDetail(td_name, tsk, td_id) - - # Save the generic taskdetail to the backend and record uuid/name - b_api.taskdetail_save(td) - self.td_names.append(td_name) - self.td_ids.append(td_id) - - # Get the created taskdetail and check for default attributes - actual = b_api.taskdetail_get(td_id) - - self.assertIsNotNone(actual) - self.assertIsNone(actual.state) - self.assertIsNone(actual.results) - self.assertIsNone(actual.exception) - self.assertIsNone(actual.stacktrace) - self.assertIsNone(actual.meta) - - # Change the generic taskdetail's attributes - td.state = 'SUCCESS' - td.exception = 'ERROR' - td.stacktrace = 'STACKTRACE' - td.meta = 'META' - - # Save the changed taskdetail - b_api.taskdetail_save(td) - - # Get the updated taskdetail and check for updated attributes - actual = b_api.taskdetail_get(td_id) - - self.assertEquals(actual.state, 'SUCCESS') - self.assertIsNone(actual.results) - self.assertEquals(actual.exception, 'ERROR') - self.assertEquals(actual.stacktrace, 'STACKTRACE') - self.assertEquals(actual.meta, 'META') - - def test_taskdetail_delete(self): - # Get the taskdetail to delete - id = self.td_ids.pop() - td = b_api.taskdetail_get(id) - # Delete the desired taskdetail - b_api.taskdetail_delete(td) - self.td_names.pop() - - # Check that the deleted taskdetail is no longer there - self.assertRaises(exception.NotFound, b_api.taskdetail_get, - id) - - def test_taskdetail_get(self): - # Get the first taskdetail - actual = b_api.taskdetail_get(self.td_ids[0]) - - # Check that it is actually a taskdetail - self.assertIsInstance(actual, taskdetail.TaskDetail) - # Check that its name is what we expect - self.assertEquals(actual.name, self.td_names[0]) - - def test_taskdetail_update(self): - # Get the first taskdetail and check for default attributes - actual = b_api.taskdetail_get(self.td_ids[0]) - - self.assertIsNone(actual.state) - self.assertIsNone(actual.results) - self.assertIsNone(actual.exception) - self.assertIsNone(actual.stacktrace) - self.assertIsNone(actual.meta) - - # Prepare attributes for updating - values = dict(state='SUCCESS', exception='ERROR', - stacktrace='STACKTRACE', meta='META') - - # Update attributes - b_api.taskdetail_update(self.td_ids[0], values) - - # Get the updated taskdetila and check for updated attributes - actual = b_api.taskdetail_get(self.td_ids[0]) - - self.assertEquals(actual.state, 'SUCCESS') - self.assertIsNone(actual.results) - self.assertEquals(actual.exception, 'ERROR') - self.assertEquals(actual.stacktrace, 'STACKTRACE') - self.assertEquals(actual.meta, 'META') - - def test_taskdetail_get_ids_names(self): - # Get dict of uuids and names - actual = b_api.taskdetail_get_ids_names() - - # Check that it matches our in-memory records - self.assertEquals(actual.values(), self.td_names) - self.assertEquals(actual.keys(), self.td_ids) diff --git a/taskflow/tests/unit/memory_api/__init__.py b/taskflow/tests/unit/persistence/__init__.py similarity index 77% rename from taskflow/tests/unit/memory_api/__init__.py rename to taskflow/tests/unit/persistence/__init__.py index 44fa8d98..830dd2e7 100644 --- a/taskflow/tests/unit/memory_api/__init__.py +++ b/taskflow/tests/unit/persistence/__init__.py @@ -3,7 +3,6 @@ # vim: tabstop=4 shiftwidth=4 softtabstop=4 # Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. -# Copyright (C) 2013 Rackspace Hosting All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -16,13 +15,3 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. - -from taskflow.persistence.backends import api as b_api - - -def setUpModule(): - b_api.configure('mem_backend') - - -def tearDownModule(): - pass diff --git a/taskflow/tests/unit/persistence/base.py b/taskflow/tests/unit/persistence/base.py new file mode 100644 index 00000000..270b1b30 --- /dev/null +++ b/taskflow/tests/unit/persistence/base.py @@ -0,0 +1,161 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2013 Rackspace Hosting All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from taskflow import exceptions as exc +from taskflow.openstack.common import uuidutils +from taskflow.persistence import flowdetail +from taskflow.persistence import logbook +from taskflow.persistence import taskdetail + + +class PersistenceTestMixin(object): + def _get_backend(): + raise NotImplementedError() + + def test_logbook_simple_save(self): + lb_id = uuidutils.generate_uuid() + lb_meta = {'1': 2} + lb_name = 'lb-%s' % (lb_id) + lb = logbook.LogBook(name=lb_name, uuid=lb_id, + backend=self._get_backend()) + lb.meta = lb_meta + + # Should not already exist + self.assertRaises(exc.NotFound, logbook.load, lb_id, + backend=self._get_backend()) + + lb.save() + del lb + lb = None + + lb = logbook.load(lb_id, backend=self._get_backend()) + self.assertEquals(lb_name, lb.name) + self.assertEquals(0, len(lb)) + self.assertEquals(lb_meta, lb.meta) + self.assertIsNone(lb.updated_at) + self.assertIsNotNone(lb.created_at) + + def test_flow_detail_save(self): + lb_id = uuidutils.generate_uuid() + lb_name = 'lb-%s' % (lb_id) + lb = logbook.LogBook(name=lb_name, uuid=lb_id, + backend=self._get_backend()) + + fd = flowdetail.FlowDetail('test') + lb.add(fd) + + # Ensure we can't save it since its owning logbook hasn't been + # saved. + self.assertRaises(exc.NotFound, fd.save) + + # Ok now we should be able to save it + lb.save() + fd.save() + + def test_task_detail_save(self): + lb_id = uuidutils.generate_uuid() + lb_name = 'lb-%s' % (lb_id) + lb = logbook.LogBook(name=lb_name, uuid=lb_id, + backend=self._get_backend()) + + fd = flowdetail.FlowDetail('test') + lb.add(fd) + td = taskdetail.TaskDetail("detail-1") + fd.add(td) + + # Ensure we can't save it since its owning logbook hasn't been + # saved. + self.assertRaises(exc.NotFound, fd.save) + self.assertRaises(exc.NotFound, td.save) + + # Ok now we should be able to save it + lb.save() + fd.save() + td.save() + + def test_logbook_merge_flow_detail(self): + lb_id = uuidutils.generate_uuid() + lb_name = 'lb-%s' % (lb_id) + lb = logbook.LogBook(name=lb_name, uuid=lb_id, + backend=self._get_backend()) + + fd = flowdetail.FlowDetail('test') + lb.add(fd) + lb.save() + + lb2 = logbook.LogBook(name=lb_name, uuid=lb_id, + backend=self._get_backend()) + fd = flowdetail.FlowDetail('test2') + lb2.add(fd) + lb2.save() + + lb3 = logbook.load(lb_id, backend=self._get_backend()) + self.assertEquals(2, len(lb3)) + + def test_logbook_add_flow_detail(self): + lb_id = uuidutils.generate_uuid() + lb_name = 'lb-%s' % (lb_id) + lb = logbook.LogBook(name=lb_name, uuid=lb_id, + backend=self._get_backend()) + + fd = flowdetail.FlowDetail('test') + lb.add(fd) + lb.save() + + lb2 = logbook.load(lb_id, backend=self._get_backend()) + self.assertEquals(1, len(lb2)) + self.assertEquals(1, len(lb)) + + self.assertEquals(fd.name, lb2.find(fd.uuid).name) + + def test_logbook_add_task_detail(self): + lb_id = uuidutils.generate_uuid() + lb_name = 'lb-%s' % (lb_id) + lb = logbook.LogBook(name=lb_name, uuid=lb_id, + backend=self._get_backend()) + + fd = flowdetail.FlowDetail('test') + td = taskdetail.TaskDetail("detail-1") + fd.add(td) + lb.add(fd) + lb.save() + + lb2 = logbook.load(lb_id, backend=self._get_backend()) + self.assertEquals(1, len(lb2)) + tasks = 0 + for fd in lb: + tasks += len(fd) + self.assertEquals(1, tasks) + + def test_logbook_delete(self): + lb_id = uuidutils.generate_uuid() + lb_name = 'lb-%s' % (lb_id) + lb = logbook.LogBook(name=lb_name, uuid=lb_id, + backend=self._get_backend()) + + # Ensure we can't delete it since it hasn't been saved + self.assertRaises(exc.NotFound, lb.delete) + + lb.save() + + lb2 = logbook.load(lb_id, backend=self._get_backend()) + self.assertIsNotNone(lb2) + + lb.delete() + + self.assertRaises(exc.NotFound, lb.delete) diff --git a/taskflow/tests/unit/sql_db_api/__init__.py b/taskflow/tests/unit/persistence/test_memory_persistence.py similarity index 67% rename from taskflow/tests/unit/sql_db_api/__init__.py rename to taskflow/tests/unit/persistence/test_memory_persistence.py index 9ca097bb..37aaa366 100644 --- a/taskflow/tests/unit/sql_db_api/__init__.py +++ b/taskflow/tests/unit/persistence/test_memory_persistence.py @@ -2,7 +2,6 @@ # vim: tabstop=4 shiftwidth=4 softtabstop=4 -# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. # Copyright (C) 2013 Rackspace Hosting All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may @@ -17,20 +16,16 @@ # License for the specific language governing permissions and limitations # under the License. -import os - -from os import path from taskflow.persistence.backends import api as b_api -from taskflow.persistence.backends.sqlalchemy import models +from taskflow.tests.unit.persistence import base + +import unittest2 -def setUpModule(): - b_api.configure('db_backend') - b_api.SQL_CONNECTION = 'sqlite:///test.db' +class MemoryPersistenceTest(unittest2.TestCase, base.PersistenceTestMixin): + def _get_backend(self): + return 'memory' - if not path.isfile('test.db'): - models.create_tables() - - -def tearDownModule(): - os.remove('test.db') + def tearDown(self): + b_api.fetch(self._get_backend()).clear_all() + super(MemoryPersistenceTest, self).tearDown() diff --git a/taskflow/tests/unit/persistence/test_sql_persistence.py b/taskflow/tests/unit/persistence/test_sql_persistence.py new file mode 100644 index 00000000..1a67f6a6 --- /dev/null +++ b/taskflow/tests/unit/persistence/test_sql_persistence.py @@ -0,0 +1,51 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2013 Rackspace Hosting All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import os +import tempfile + +from taskflow.openstack.common.db.sqlalchemy import session +from taskflow.persistence.backends import api as b_api +from taskflow.persistence.backends.sqlalchemy import migration +from taskflow.tests.unit.persistence import base + +import unittest2 + + +class SqlPersistenceTest(unittest2.TestCase, base.PersistenceTestMixin): + """Inherits from the base test and sets up a sqlite temporary db.""" + def _get_backend(self): + return 'sqlalchemy' + + def setupDatabase(self): + _handle, db_location = tempfile.mkstemp() + db_uri = "sqlite:///%s" % (db_location) + session.set_defaults(db_uri, db_location) + migration.db_sync() + return db_location + + def setUp(self): + super(SqlPersistenceTest, self).setUp() + self.db_location = self.setupDatabase() + + def tearDown(self): + b_api.fetch(self._get_backend()).clear_all() + super(SqlPersistenceTest, self).tearDown() + if self.db_location and os.path.isfile(self.db_location): + os.unlink(self.db_location) + self.db_location = None diff --git a/taskflow/tests/unit/sql_db_api/test_flowdetail_api.py b/taskflow/tests/unit/sql_db_api/test_flowdetail_api.py deleted file mode 100644 index 24fd0972..00000000 --- a/taskflow/tests/unit/sql_db_api/test_flowdetail_api.py +++ /dev/null @@ -1,212 +0,0 @@ -# -*- coding: utf-8 -*- - -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright (C) 2013 Rackspace Hosting All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -"""Import required libraries""" -import unittest2 - -from taskflow import exceptions as exception -from taskflow.openstack.common import uuidutils -from taskflow.patterns import graph_flow as flow -from taskflow.persistence.backends import api as b_api -from taskflow.persistence import flowdetail -from taskflow.tests import utils - - -class FlowDetailTest(unittest2.TestCase): - """This class is to test the functionality of the backend API's flowdetail - methods. - """ - wfs = [] - fd_names = [] - fd_ids = [] - tsks = [] - td_names = [] - td_ids = [] - - @classmethod - def setUpClass(cls): - # Create a workflow for flowdetails to use - wf_id = uuidutils.generate_uuid() - wf_name = 'wf-%s' % (wf_id) - - wf = flow.Flow(wf_name, None, wf_id) - cls.wfs.append(wf) - - # Create a task for taskdetails to use - task_id = uuidutils.generate_uuid() - task_name = 'task-%s' % (task_id) - - tsk = utils.DummyTask(task_name, task_id) - cls.tsks.append(tsk) - - @classmethod - def tearDownClass(cls): - # Clear out the lists of workflows and tasks - utils.drain(cls.wfs) - utils.drain(cls.tsks) - - def setUp(self): - # Create a flowdetail and record its uuid and name - fd_id = uuidutils.generate_uuid() - fd_name = 'fd-%s' % (fd_id) - - b_api.flowdetail_create(fd_name, self.wfs[0], fd_id) - self.fd_names.append(fd_name) - self.fd_ids.append(fd_id) - - # Create a taskdetail and record its uuid and name - td_id = uuidutils.generate_uuid() - td_name = 'td-%s' % (td_id) - - b_api.taskdetail_create(td_name, self.tsks[0], td_id) - self.td_names.append(td_name) - self.td_ids.append(td_id) - - def tearDown(self): - # Destroy all taskdetails and flowdetails in the backend - for id in self.td_ids: - b_api.taskdetail_destroy(id) - for id in self.fd_ids: - b_api.flowdetail_destroy(id) - - # Drain the lists of taskdetail and flowdetail uuids and names - utils.drain(self.fd_names) - utils.drain(self.fd_ids) - utils.drain(self.td_names) - utils.drain(self.td_ids) - - def test_flowdetail_create(self): - # Create a flowdetail and record its uuid and name - fd_id = uuidutils.generate_uuid() - fd_name = 'fd-%s' % (fd_id) - - b_api.flowdetail_create(fd_name, self.wfs[0], fd_id) - self.fd_names.append(fd_name) - self.fd_ids.append(fd_id) - - # Check to see that the created flowdetail is there - actual = b_api.flowdetail_get(fd_id) - - self.assertIsNotNone(actual) - - def test_flowdetail_destroy(self): - # Destroy the last added flowdetail - id = self.fd_ids.pop() - b_api.flowdetail_destroy(id) - self.fd_names.pop() - - # Check to make sure the removed flowdetail is no longer there - self.assertRaises(exception.NotFound, b_api.flowdetail_get, - id) - - def test_flowdetail_save(self): - # Create a generic flowdetail to save - fd_id = uuidutils.generate_uuid() - fd_name = 'fd-%s' % (fd_id) - wf = self.wfs[0] - fd = flowdetail.FlowDetail(fd_name, wf, fd_id) - - # Save the generic flowdetail to the backend and record its uuid/name - b_api.flowdetail_save(fd) - self.fd_names.append(fd_name) - self.fd_ids.append(fd_id) - - # Check that the saved flowdetail is in the backend - actual = b_api.flowdetail_get(fd_id) - - self.assertIsNotNone(actual) - # Check that the saved flowdetail has no taskdetails - self.assertEquals(len(actual), 0) - - # Add a generic taskdetail to the flowdetail - td = b_api.taskdetail_get(self.td_ids[0]) - fd.add_task_detail(td) - - # Save the updated flowdetail - b_api.flowdetail_save(fd) - - # Check that the saved flowdetail is still there - actual = b_api.flowdetail_get(fd_id) - - self.assertIsNotNone(actual) - # Check that the addition of a taskdetail was recorded - self.assertEquals(len(actual), 1) - - def test_flowdetail_delete(self): - # Get the flowdetail to delete - id = self.fd_ids.pop() - fd = b_api.flowdetail_get(id) - # Delete the flowdetail - b_api.flowdetail_delete(fd) - self.fd_names.pop() - - # Make sure it is not there anymore - self.assertRaises(exception.NotFound, b_api.flowdetail_get, - id) - - def test_flowdetail_get(self): - # Get the first flowdetail - actual = b_api.flowdetail_get(self.fd_ids[0]) - - # Check that it is a flowdetail - self.assertIsInstance(actual, flowdetail.FlowDetail) - # Check that its name matches what is expected - self.assertEquals(actual.name, self.fd_names[0]) - - def test_flowdetail_add_task_detail(self): - # Get the first flowdetail - actual = b_api.flowdetail_get(self.fd_ids[0]) - - # Make sure it has no taskdetails - self.assertEquals(len(actual), 0) - - # Add a taskdetail to the flowdetail - b_api.flowdetail_add_task_detail(self.fd_ids[0], self.td_ids[0]) - - # Get the flowdetail again - actual = b_api.flowdetail_get(self.fd_ids[0]) - - # Check that the flowdetail has one taskdetail - self.assertEquals(len(actual), 1) - - def test_flowdetail_remove_taskdetail(self): - # Add a taskdetail to the first flowdetail - b_api.flowdetail_add_task_detail(self.fd_ids[0], self.td_ids[0]) - - # Get the first flowdetail - actual = b_api.flowdetail_get(self.fd_ids[0]) - - # Check that the first flowdetail has exactly one taskdetail - self.assertEquals(len(actual), 1) - - # Remove the taskdetail from the first flowdetail - b_api.flowdetail_remove_taskdetail(self.fd_ids[0], self.td_ids[0]) - - # Get the first flowdetail - actual = b_api.flowdetail_get(self.fd_ids[0]) - - # Check that the first flowdetail no longer has any taskdetails - self.assertEquals(len(actual), 0) - - def test_flowdetail_get_ids_names(self): - # Get a list of all uuids and names for flowdetails - actual = b_api.flowdetail_get_ids_names() - - # Match it to our in-memory records - self.assertEquals(actual.values(), self.fd_names) - self.assertEquals(actual.keys(), self.fd_ids) diff --git a/taskflow/tests/unit/sql_db_api/test_logbook_api.py b/taskflow/tests/unit/sql_db_api/test_logbook_api.py deleted file mode 100644 index 136027ff..00000000 --- a/taskflow/tests/unit/sql_db_api/test_logbook_api.py +++ /dev/null @@ -1,202 +0,0 @@ -# -*- coding: utf-8 -*- - -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright (C) 2013 Rackspace Hosting All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -"""Import required libraries""" -import unittest2 - -from taskflow import exceptions as exception -from taskflow.openstack.common import uuidutils -from taskflow.patterns import graph_flow as flow -from taskflow.persistence.backends import api as b_api -from taskflow.persistence import logbook -from taskflow.tests import utils - - -class LogBookTest(unittest2.TestCase): - """This class is designed to test the functionality of the backend API's - logbook methods - """ - lb_names = [] - lb_ids = [] - wfs = [] - fd_names = [] - fd_ids = [] - - @classmethod - def setUpClass(cls): - # Create a workflow to create flowdetails with - wf_id = uuidutils.generate_uuid() - wf_name = 'wf-%s' % (wf_id) - - wf = flow.Flow(wf_name, None, wf_id) - cls.wfs.append(wf) - - @classmethod - def tearDownClass(cls): - # Empty the list of workflows - utils.drain(cls.wfs) - - def setUp(self): - # Create a logbook and record its uuid and name - lb_id = uuidutils.generate_uuid() - lb_name = 'lb-%s' % (lb_id) - - b_api.logbook_create(lb_name, lb_id) - self.lb_names.append(lb_name) - self.lb_ids.append(lb_id) - - # Create a flowdetail and record its uuid and name - fd_id = uuidutils.generate_uuid() - fd_name = 'fd-%s' % (fd_id) - - b_api.flowdetail_create(fd_name, self.wfs[0], fd_id) - self.fd_names.append(fd_name) - self.fd_ids.append(fd_id) - - def tearDown(self): - # Destroy all flowdetails and logbooks in the backend - for id in self.fd_ids: - b_api.flowdetail_destroy(id) - for id in self.lb_ids: - b_api.logbook_destroy(id) - - # Clear the lists of logbook and flowdetail uuids and names - utils.drain(self.lb_names) - utils.drain(self.lb_ids) - utils.drain(self.fd_names) - utils.drain(self.fd_ids) - - def test_logbook_create(self): - # Create a logbook and record its uuid and name - lb_id = uuidutils.generate_uuid() - lb_name = 'lb-%s' % (lb_id) - - b_api.logbook_create(lb_name, lb_id) - self.lb_names.append(lb_name) - self.lb_ids.append(lb_id) - - # Check that the created logbook exists in the backend - actual = b_api.logbook_get(lb_id) - - self.assertIsNotNone(actual) - - def test_logbook_destroy(self): - # Delete the last added logbook - id = self.lb_ids.pop() - b_api.logbook_destroy(id) - self.lb_names.pop() - - # Check that the deleted logbook is no longer there - self.assertRaises(exception.NotFound, b_api.logbook_get, - id) - - def test_logbook_save(self): - # Create a generic logbook to save - lb_id = uuidutils.generate_uuid() - lb_name = 'lb-%s' % (lb_id) - lb = logbook.LogBook(lb_name, lb_id) - - # Save the logbook and record its uuid and name - b_api.logbook_save(lb) - self.lb_names.append(lb_name) - self.lb_ids.append(lb_id) - - # Check that the saved logbook exists in the backend - actual = b_api.logbook_get(lb_id) - - self.assertIsNotNone(actual) - # Check that the saved logbook has no flowdetails - self.assertEquals(len(actual), 0) - - # Add a flowdetail to the logbook - fd = b_api.flowdetail_get(self.fd_ids[0]) - lb.add_flow_detail(fd) - - # Save the updated logbook - b_api.logbook_save(lb) - - # Check that the updated logbook is still in the backend - actual = b_api.logbook_get(lb_id) - - self.assertIsNotNone(actual) - # Check that the added flowdetail was recorded - self.assertEquals(len(actual), 1) - - def test_logbook_delete(self): - # Get the logbook to delete - id = self.lb_ids.pop() - lb = b_api.logbook_get(id) - # Delete the logbook from the backend - b_api.logbook_delete(lb) - self.lb_names.pop() - - # Check that the deleted logbook is no longer present - self.assertRaises(exception.NotFound, b_api.logbook_get, - id) - - def test_logbook_get(self): - # Get the logbook from the backend - actual = b_api.logbook_get(self.lb_ids[0]) - - # Check that it is actually a logbook - self.assertIsInstance(actual, logbook.LogBook) - # Check that the name is correct - self.assertEquals(actual.name, self.lb_names[0]) - - def test_logbook_add_flow_detail(self): - # Get the logbook from the backend - actual = b_api.logbook_get(self.lb_ids[0]) - - # Check that it has no flowdetails - self.assertEquals(len(actual), 0) - - # Add a flowdetail to the logbook - b_api.logbook_add_flow_detail(self.lb_ids[0], self.fd_ids[0]) - - # Get the logbook again - actual = b_api.logbook_get(self.lb_ids[0]) - - # Check that the logbook has exactly one flowdetail - self.assertEquals(len(actual), 1) - - def test_logbook_remove_flowdetail(self): - # Add a flowdetail to the first logbook - b_api.logbook_add_flow_detail(self.lb_ids[0], self.fd_ids[0]) - - # Get the first logbook - actual = b_api.logbook_get(self.lb_ids[0]) - - # Check that it has exactly one flowdetail - self.assertEquals(len(actual), 1) - - # Remove the flowdetail from the logbook - b_api.logbook_remove_flowdetail(self.lb_ids[0], self.fd_ids[0]) - - # Get the logbook again - actual = b_api.logbook_get(self.lb_ids[0]) - - # Check that the logbook now has no flowdetails - self.assertEquals(len(actual), 0) - - def test_logbook_get_ids_names(self): - # Get the dict of uuids and names - actual = b_api.logbook_get_ids_names() - - # Check that it matches our in-memory list - self.assertEquals(actual.values(), self.lb_names) - self.assertEquals(actual.keys(), self.lb_ids) diff --git a/taskflow/tests/unit/sql_db_api/test_taskdetail_api.py b/taskflow/tests/unit/sql_db_api/test_taskdetail_api.py deleted file mode 100644 index bb49afc3..00000000 --- a/taskflow/tests/unit/sql_db_api/test_taskdetail_api.py +++ /dev/null @@ -1,190 +0,0 @@ -# -*- coding: utf-8 -*- - -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright (C) 2013 Rackspace Hosting All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -"""Import required libraries""" -import unittest2 - -from taskflow import exceptions as exception -from taskflow.openstack.common import uuidutils -from taskflow.persistence.backends import api as b_api -from taskflow.persistence import taskdetail -from taskflow.tests import utils - - -class TaskDetailTest(unittest2.TestCase): - """This class is designed to test the functionality of the backend API's - taskdetail methods - """ - tsks = [] - td_names = [] - td_ids = [] - - @classmethod - def setUpClass(cls): - # Create a task for taskdetails to be made from - task_id = uuidutils.generate_uuid() - task_name = 'task-%s' % (task_id) - - tsk = utils.DummyTask(task_name, task_id) - tsk.requires.update('r') - tsk.optional.update('o') - tsk.provides.update('p') - - cls.tsks.append(tsk) - - @classmethod - def tearDownClass(cls): - # Clear the tasks - utils.drain(cls.tsks) - - def setUp(self): - # Create a taskdetail and record its uuid and name - td_id = uuidutils.generate_uuid() - td_name = 'td-%s' % (td_id) - - b_api.taskdetail_create(td_name, self.tsks[0], td_id) - self.td_names.append(td_name) - self.td_ids.append(td_id) - - def tearDown(self): - # Destroy all taskdetails from the backend - for id in self.td_ids: - b_api.taskdetail_destroy(id) - - # Clear the list of taskdetail names and uuids - utils.drain(self.td_names) - utils.drain(self.td_ids) - - def test_taskdetail_create(self): - # Create a taskdetail and record its uuid and name - td_id = uuidutils.generate_uuid() - td_name = 'td-%s' % (td_id) - - b_api.taskdetail_create(td_name, self.tsks[0], td_id) - self.td_names.append(td_name) - self.td_ids.append(td_id) - - # Check that the taskdetail is there - actual = b_api.taskdetail_get(td_id) - - self.assertIsNotNone(actual) - - def test_taskdetail_destroy(self): - # Destroy the last added taskdetail - id = self.td_ids.pop() - b_api.taskdetail_destroy(id) - self.td_names.pop() - - # Check that the deleted taskdetail is no longer there - self.assertRaises(exception.NotFound, b_api.taskdetail_get, - id) - - def test_taskdetail_save(self): - # Create a generic taskdetail to save - td_id = uuidutils.generate_uuid() - td_name = 'td-%s' % (td_id) - tsk = self.tsks[0] - td = taskdetail.TaskDetail(td_name, tsk, td_id) - - # Save the generic taskdetail to the backend and record uuid/name - b_api.taskdetail_save(td) - self.td_names.append(td_name) - self.td_ids.append(td_id) - - # Get the created taskdetail and check for default attributes - actual = b_api.taskdetail_get(td_id) - - self.assertIsNotNone(actual) - self.assertIsNone(actual.state) - self.assertIsNone(actual.results) - self.assertIsNone(actual.exception) - self.assertIsNone(actual.stacktrace) - self.assertIsNone(actual.meta) - - # Change the generic taskdetail's attributes - td.state = 'SUCCESS' - td.exception = 'ERROR' - td.stacktrace = 'STACKTRACE' - td.meta = 'META' - - # Save the changed taskdetail - b_api.taskdetail_save(td) - - # Get the updated taskdetail and check for updated attributes - actual = b_api.taskdetail_get(td_id) - - self.assertEquals(actual.state, 'SUCCESS') - self.assertIsNone(actual.results) - self.assertEquals(actual.exception, 'ERROR') - self.assertEquals(actual.stacktrace, 'STACKTRACE') - self.assertEquals(actual.meta, 'META') - - def test_taskdetail_delete(self): - # Get the taskdetail to delete - id = self.td_ids.pop() - td = b_api.taskdetail_get(id) - # Delete the desired taskdetail - b_api.taskdetail_delete(td) - self.td_names.pop() - - # Check that the deleted taskdetail is no longer there - self.assertRaises(exception.NotFound, b_api.taskdetail_get, - id) - - def test_taskdetail_get(self): - # Get the first taskdetail - actual = b_api.taskdetail_get(self.td_ids[0]) - - # Check that it is actually a taskdetail - self.assertIsInstance(actual, taskdetail.TaskDetail) - # Check that its name is what we expect - self.assertEquals(actual.name, self.td_names[0]) - - def test_taskdetail_update(self): - # Get the first taskdetail and check for default attributes - actual = b_api.taskdetail_get(self.td_ids[0]) - - self.assertIsNone(actual.state) - self.assertIsNone(actual.results) - self.assertIsNone(actual.exception) - self.assertIsNone(actual.stacktrace) - self.assertIsNone(actual.meta) - - # Prepare attributes for updating - values = dict(state='SUCCESS', exception='ERROR', - stacktrace='STACKTRACE', meta='META') - - # Update attributes - b_api.taskdetail_update(self.td_ids[0], values) - - # Get the updated taskdetila and check for updated attributes - actual = b_api.taskdetail_get(self.td_ids[0]) - - self.assertEquals(actual.state, 'SUCCESS') - self.assertIsNone(actual.results) - self.assertEquals(actual.exception, 'ERROR') - self.assertEquals(actual.stacktrace, 'STACKTRACE') - self.assertEquals(actual.meta, 'META') - - def test_taskdetail_get_ids_names(self): - # Get dict of uuids and names - actual = b_api.taskdetail_get_ids_names() - - # Check that it matches our in-memory records - self.assertEquals(actual.values(), self.td_names) - self.assertEquals(actual.keys(), self.td_ids) diff --git a/taskflow/utils.py b/taskflow/utils.py index 560f6c51..8919056b 100644 --- a/taskflow/utils.py +++ b/taskflow/utils.py @@ -399,6 +399,45 @@ class ReaderWriterLock(object): self.readers_ok.release() +class MultiLock(object): + """A class which can attempt to obtain many locks at once and release + said locks when exiting. + + Useful as a context manager around many locks (instead of having to nest + said individual context managers). + """ + + def __init__(self, locks): + self._locks = locks + self._locked = list([False] * len(locks)) + + def __enter__(self): + + def is_locked(lock): + # NOTE(harlowja): the threading2 lock doesn't seem to have this + # attribute, so thats why we are checking it existing first. + if hasattr(lock, 'locked'): + return lock.locked() + return False + + for i in xrange(0, len(self._locked)): + if self._locked[i] or is_locked(self._locks[i]): + raise threading.ThreadError("Lock %s not previously released" + % (i + 1)) + self._locked[i] = False + for (i, lock) in enumerate(self._locks): + self._locked[i] = lock.acquire() + + def __exit__(self, type, value, traceback): + for (i, locked) in enumerate(self._locked): + try: + if locked: + self._locks[i].release() + self._locked[i] = False + except threading.ThreadError: + LOG.exception("Unable to release lock %s", i + 1) + + class LazyPluggable(object): """A pluggable backend loaded lazily based on some value.""" @@ -430,61 +469,3 @@ class LazyPluggable(object): def __getattr__(self, key): backend = self.__get_backend() return getattr(backend, key) - - -class LockingDict(object): - """This class is designed to provide threadsafe element access in the form - of a dictionary. - """ - - def __init__(self): - """Constructor""" - self._container = {} - self._lock = ReaderWriterLock() - - def __getitem__(self, key): - """Return one item referenced by key""" - with self._lock.acquire(read=True): - retVal = self._container[key] - return retVal - - def __setitem__(self, key, value): - """Set one item referenced by key to value""" - with self._lock.acquire(read=False): - self._container[key] = value - - def __delitem__(self, key): - """Delete the item referenced by key""" - with self._lock.acquire(read=False): - del self._container[key] - - def __contains__(self, item): - """Check if the item is contained by this dict""" - with self._lock.acquire(read=True): - return item in self._container - - def keys(self): - """Return a list of the keys in a threadsafe manner""" - retVal = [] - with self._lock.acquire(read=True): - return list(self._container.iterkeys()) - - return retVal - - def values(self): - """Return a list of the values in a threadsafe manner""" - retVal = [] - with self._lock.acquire(read=True): - return list(self._container.itervalues()) - - return retVal - - def items(self): - """Return a threadsafe list of the items""" - retVal = [] - - with self._lock.acquire(read=True): - for item in self._container.items(): - retVal.append(item) - - return retVal diff --git a/tools/pip-requires b/tools/pip-requires index 072424ca..6587da87 100644 --- a/tools/pip-requires +++ b/tools/pip-requires @@ -1,13 +1,16 @@ # Packages needed for using this library. anyjson>=0.2.4 -oslo.config>=1.1.0 eventlet>=0.9.17 iso8601 netaddr>=0.7.6 # Very nice graph library networkx>=1.5 +# Need currently the newest version of oslo.config for the DeprecatedOpt +# used inside oslo database layer. +-f http://tarballs.openstack.org/oslo.config/oslo.config-1.2.0a3.tar.gz#egg=oslo.config-1.2.0a3 +oslo.config>=1.2.0a3 six # Only needed if database backend used. sqlalchemy>=0.7,<=0.7.99 -sqlalchemy-migrate>=0.7 +alembic>=0.4.1 Babel>=0.9.6