From 1e3dc09453e7e179fba613b61317ebad1556b77b Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Tue, 9 Jun 2015 18:19:11 -0700 Subject: [PATCH] Rename logbook module -> models module Since this module contains more than the logbook class and really is a our generic models that are used to hold the runtime structure it is more appropriate to place it under a models module and deprecate the usage of the old module by placing a warning there (so that when it is imported that warning is triggered). Change-Id: I79def5ee08f560d38f2c9dcefd0b33becc2a4d36 --- doc/source/engines.rst | 8 +- doc/source/jobs.rst | 2 +- doc/source/persistence.rst | 22 +- doc/source/resumption.rst | 14 +- taskflow/examples/persistence_example.py | 4 +- taskflow/examples/tox_conductor.py | 8 +- .../persistence/backends/impl_sqlalchemy.py | 10 +- .../84d6e888850_add_task_detail_type.py | 4 +- .../persistence/backends/sqlalchemy/tables.py | 4 +- taskflow/persistence/base.py | 4 +- taskflow/persistence/logbook.py | 889 +---------------- taskflow/persistence/models.py | 892 ++++++++++++++++++ taskflow/persistence/path_based.py | 28 +- taskflow/storage.py | 26 +- taskflow/tests/unit/persistence/base.py | 102 +- taskflow/tests/unit/test_engines.py | 4 +- taskflow/tests/unit/test_storage.py | 10 +- taskflow/utils/persistence_utils.py | 10 +- 18 files changed, 1039 insertions(+), 1002 deletions(-) create mode 100644 taskflow/persistence/models.py diff --git a/doc/source/engines.rst b/doc/source/engines.rst index 2f162c891..fa37274ec 100644 --- a/doc/source/engines.rst +++ b/doc/source/engines.rst @@ -246,7 +246,7 @@ Preparation This stage (see :py:func:`~taskflow.engines.base.Engine.prepare`) starts by setting up the storage needed for all atoms in the compiled graph, ensuring -that corresponding :py:class:`~taskflow.persistence.logbook.AtomDetail` (or +that corresponding :py:class:`~taskflow.persistence.models.AtomDetail` (or subclass of) objects are created for each node in the graph. Validation @@ -293,7 +293,7 @@ for things like retry atom which can influence what a tasks intention should be :py:class:`~taskflow.engines.action_engine.analyzer.Analyzer` helper object which was designed to provide helper methods for this analysis). Once these intentions are determined and associated with each task (the intention is -also stored in the :py:class:`~taskflow.persistence.logbook.AtomDetail` object) +also stored in the :py:class:`~taskflow.persistence.models.AtomDetail` object) the :ref:`scheduling ` stage starts. .. _scheduling: @@ -323,8 +323,8 @@ submitted to complete. Once one of the future objects completes (or fails) that atoms result will be examined and finalized using a :py:class:`~taskflow.engines.action_engine.completer.Completer` implementation. It typically will persist results to a provided persistence backend (saved -into the corresponding :py:class:`~taskflow.persistence.logbook.AtomDetail` -and :py:class:`~taskflow.persistence.logbook.FlowDetail` objects via the +into the corresponding :py:class:`~taskflow.persistence.models.AtomDetail` +and :py:class:`~taskflow.persistence.models.FlowDetail` objects via the :py:class:`~taskflow.storage.Storage` helper) and reflect the new state of the atom. At this point what typically happens falls into two categories, one for if that atom failed and one for if it did not. If the atom diff --git a/doc/source/jobs.rst b/doc/source/jobs.rst index 2b826da43..3cd7f95fb 100644 --- a/doc/source/jobs.rst +++ b/doc/source/jobs.rst @@ -30,7 +30,7 @@ Definitions Jobs A :py:class:`job ` consists of a unique identifier, name, and a reference to a :py:class:`logbook - ` which contains the details of the + ` which contains the details of the work that has been or should be/will be completed to finish the work that has been created for that job. diff --git a/doc/source/persistence.rst b/doc/source/persistence.rst index 70d6b7d77..8b451d42d 100644 --- a/doc/source/persistence.rst +++ b/doc/source/persistence.rst @@ -40,38 +40,38 @@ On :doc:`engine ` construction typically a backend (it can be optional) will be provided which satisfies the :py:class:`~taskflow.persistence.base.Backend` abstraction. Along with providing a backend object a -:py:class:`~taskflow.persistence.logbook.FlowDetail` object will also be +:py:class:`~taskflow.persistence.models.FlowDetail` object will also be created and provided (this object will contain the details about the flow to be ran) to the engine constructor (or associated :py:meth:`load() ` helper functions). Typically a -:py:class:`~taskflow.persistence.logbook.FlowDetail` object is created from a -:py:class:`~taskflow.persistence.logbook.LogBook` object (the book object acts -as a type of container for :py:class:`~taskflow.persistence.logbook.FlowDetail` -and :py:class:`~taskflow.persistence.logbook.AtomDetail` objects). +:py:class:`~taskflow.persistence.models.FlowDetail` object is created from a +:py:class:`~taskflow.persistence.models.LogBook` object (the book object acts +as a type of container for :py:class:`~taskflow.persistence.models.FlowDetail` +and :py:class:`~taskflow.persistence.models.AtomDetail` objects). **Preparation**: Once an engine starts to run it will create a :py:class:`~taskflow.storage.Storage` object which will act as the engines interface to the underlying backend storage objects (it provides helper functions that are commonly used by the engine, avoiding repeating code when interacting with the provided -:py:class:`~taskflow.persistence.logbook.FlowDetail` and +:py:class:`~taskflow.persistence.models.FlowDetail` and :py:class:`~taskflow.persistence.base.Backend` objects). As an engine initializes it will extract (or create) -:py:class:`~taskflow.persistence.logbook.AtomDetail` objects for each atom in +:py:class:`~taskflow.persistence.models.AtomDetail` objects for each atom in the workflow the engine will be executing. **Execution:** When an engine beings to execute (see :doc:`engine ` for more of the details about how an engine goes about this process) it will examine any previously existing -:py:class:`~taskflow.persistence.logbook.AtomDetail` objects to see if they can +:py:class:`~taskflow.persistence.models.AtomDetail` objects to see if they can be used for resuming; see :doc:`resumption ` for more details on this subject. For atoms which have not finished (or did not finish correctly from a previous run) they will begin executing only after any dependent inputs are ready. This is done by analyzing the execution graph and looking at -predecessor :py:class:`~taskflow.persistence.logbook.AtomDetail` outputs and +predecessor :py:class:`~taskflow.persistence.models.AtomDetail` outputs and states (which may have been persisted in a past run). This will result in either using their previous information or by running those predecessors and -saving their output to the :py:class:`~taskflow.persistence.logbook.FlowDetail` +saving their output to the :py:class:`~taskflow.persistence.models.FlowDetail` and :py:class:`~taskflow.persistence.base.Backend` objects. This execution, analysis and interaction with the storage objects continues (what is described here is a simplification of what really happens; which is quite a bit @@ -288,7 +288,7 @@ Interfaces Models ====== -.. automodule:: taskflow.persistence.logbook +.. automodule:: taskflow.persistence.models Implementations =============== diff --git a/doc/source/resumption.rst b/doc/source/resumption.rst index 3be864f69..4a85ab6ac 100644 --- a/doc/source/resumption.rst +++ b/doc/source/resumption.rst @@ -46,7 +46,7 @@ name serves a special purpose in the resumption process (as well as serving a useful purpose when running, allowing for atom identification in the :doc:`notification ` process). The reason for having names is that an atom in a flow needs to be somehow matched with (a potentially) -existing :py:class:`~taskflow.persistence.logbook.AtomDetail` during engine +existing :py:class:`~taskflow.persistence.models.AtomDetail` during engine resumption & subsequent running. The match should be: @@ -71,9 +71,9 @@ Scenarios ========= When new flow is loaded into engine, there is no persisted data for it yet, so -a corresponding :py:class:`~taskflow.persistence.logbook.FlowDetail` object +a corresponding :py:class:`~taskflow.persistence.models.FlowDetail` object will be created, as well as a -:py:class:`~taskflow.persistence.logbook.AtomDetail` object for each atom that +:py:class:`~taskflow.persistence.models.AtomDetail` object for each atom that is contained in it. These will be immediately saved into the persistence backend that is configured. If no persistence backend is configured, then as expected nothing will be saved and the atoms and flow will be ran in a @@ -94,7 +94,7 @@ When the factory function mentioned above returns the exact same the flow and atoms (no changes are performed). **Runtime change:** Nothing should be done -- the engine will re-associate -atoms with :py:class:`~taskflow.persistence.logbook.AtomDetail` objects by name +atoms with :py:class:`~taskflow.persistence.models.AtomDetail` objects by name and then the engine resumes. Atom was added @@ -105,7 +105,7 @@ in (for example for changing the runtime structure of what was previously ran in the first run). **Runtime change:** By default when the engine resumes it will notice that a -corresponding :py:class:`~taskflow.persistence.logbook.AtomDetail` does not +corresponding :py:class:`~taskflow.persistence.models.AtomDetail` does not exist and one will be created and associated. Atom was removed @@ -134,7 +134,7 @@ factory should replace this name where it was being used previously. exist when a new atom is added. In the future TaskFlow could make this easier by providing a ``upgrade()`` function that can be used to give users the ability to upgrade atoms before running (manual introspection & modification of -a :py:class:`~taskflow.persistence.logbook.LogBook` can be done before engine +a :py:class:`~taskflow.persistence.models.LogBook` can be done before engine loading and running to accomplish this in the meantime). Atom was split in two atoms or merged @@ -150,7 +150,7 @@ exist when a new atom is added or removed. In the future TaskFlow could make this easier by providing a ``migrate()`` function that can be used to give users the ability to migrate atoms previous data before running (manual introspection & modification of a -:py:class:`~taskflow.persistence.logbook.LogBook` can be done before engine +:py:class:`~taskflow.persistence.models.LogBook` can be done before engine loading and running to accomplish this in the meantime). Flow structure was changed diff --git a/taskflow/examples/persistence_example.py b/taskflow/examples/persistence_example.py index c911c2f18..de9b42746 100644 --- a/taskflow/examples/persistence_example.py +++ b/taskflow/examples/persistence_example.py @@ -31,7 +31,7 @@ sys.path.insert(0, self_dir) from taskflow import engines from taskflow.patterns import linear_flow as lf -from taskflow.persistence import logbook +from taskflow.persistence import models from taskflow import task from taskflow.utils import persistence_utils as p_utils @@ -94,7 +94,7 @@ with eu.get_backend(backend_uri) as backend: # Make a flow that will blow up if the file didn't exist previously, if it # did exist, assume we won't blow up (and therefore this shows the undo # and redo that a flow will go through). - book = logbook.LogBook("my-test") + book = models.LogBook("my-test") flow = make_flow(blowup=blowup) eu.print_wrapped("Running") try: diff --git a/taskflow/examples/tox_conductor.py b/taskflow/examples/tox_conductor.py index feff42453..66e575b55 100644 --- a/taskflow/examples/tox_conductor.py +++ b/taskflow/examples/tox_conductor.py @@ -42,7 +42,7 @@ from taskflow import engines from taskflow.jobs import backends as boards from taskflow.patterns import linear_flow from taskflow.persistence import backends as persistence -from taskflow.persistence import logbook +from taskflow.persistence import models from taskflow import task from taskflow.utils import threading_utils @@ -145,9 +145,9 @@ def generate_reviewer(client, saver, name=NAME): def make_save_book(saver, review_id): # Record what we want to happen (sometime in the future). - book = logbook.LogBook("book_%s" % review_id) - detail = logbook.FlowDetail("flow_%s" % review_id, - uuidutils.generate_uuid()) + book = models.LogBook("book_%s" % review_id) + detail = models.FlowDetail("flow_%s" % review_id, + uuidutils.generate_uuid()) book.add(detail) # Associate the factory method we want to be called (in the future) # with the book, so that the conductor will be able to call into diff --git a/taskflow/persistence/backends/impl_sqlalchemy.py b/taskflow/persistence/backends/impl_sqlalchemy.py index d5342018b..b9aef44cc 100644 --- a/taskflow/persistence/backends/impl_sqlalchemy.py +++ b/taskflow/persistence/backends/impl_sqlalchemy.py @@ -34,7 +34,7 @@ from taskflow import logging from taskflow.persistence.backends.sqlalchemy import migration from taskflow.persistence.backends.sqlalchemy import tables from taskflow.persistence import base -from taskflow.persistence import logbook +from taskflow.persistence import models from taskflow.types import failure from taskflow.utils import eventlet_utils from taskflow.utils import misc @@ -194,16 +194,16 @@ class _Alchemist(object): @staticmethod def convert_flow_detail(row): - return logbook.FlowDetail.from_dict(dict(row.items())) + return models.FlowDetail.from_dict(dict(row.items())) @staticmethod def convert_book(row): - return logbook.LogBook.from_dict(dict(row.items())) + return models.LogBook.from_dict(dict(row.items())) @staticmethod def convert_atom_detail(row): row = dict(row.items()) - atom_cls = logbook.atom_detail_class(row.pop('atom_type')) + atom_cls = models.atom_detail_class(row.pop('atom_type')) return atom_cls.from_dict(row) def atom_query_iter(self, conn, parent_uuid): @@ -457,7 +457,7 @@ class Connection(base.Connection): def _insert_atom_details(self, conn, ad, parent_uuid): value = ad.to_dict() value['parent_uuid'] = parent_uuid - value['atom_type'] = logbook.atom_detail_type(ad) + value['atom_type'] = models.atom_detail_type(ad) conn.execute(sql.insert(self._tables.atomdetails, value)) def _update_atom_details(self, conn, ad, e_ad): diff --git a/taskflow/persistence/backends/sqlalchemy/alembic/versions/84d6e888850_add_task_detail_type.py b/taskflow/persistence/backends/sqlalchemy/alembic/versions/84d6e888850_add_task_detail_type.py index 756cf93ae..47441dcf2 100644 --- a/taskflow/persistence/backends/sqlalchemy/alembic/versions/84d6e888850_add_task_detail_type.py +++ b/taskflow/persistence/backends/sqlalchemy/alembic/versions/84d6e888850_add_task_detail_type.py @@ -29,11 +29,11 @@ down_revision = '1c783c0c2875' from alembic import op import sqlalchemy as sa -from taskflow.persistence import logbook +from taskflow.persistence import models def upgrade(): - atom_types = sa.Enum(*logbook.ATOM_TYPES, name='atom_types') + atom_types = sa.Enum(*models.ATOM_TYPES, name='atom_types') column = sa.Column('atom_type', atom_types) bind = op.get_bind() impl = atom_types.dialect_impl(bind.dialect) diff --git a/taskflow/persistence/backends/sqlalchemy/tables.py b/taskflow/persistence/backends/sqlalchemy/tables.py index 47306b9c0..28acca1a0 100644 --- a/taskflow/persistence/backends/sqlalchemy/tables.py +++ b/taskflow/persistence/backends/sqlalchemy/tables.py @@ -22,7 +22,7 @@ from oslo_utils import uuidutils from sqlalchemy import Table, Column, String, ForeignKey, DateTime, Enum from sqlalchemy import types -from taskflow.persistence import logbook +from taskflow.persistence import models from taskflow import states Tables = collections.namedtuple('Tables', @@ -92,7 +92,7 @@ def fetch(metadata): default=uuidutils.generate_uuid), Column('failure', Json), Column('results', Json), - Column('atom_type', Enum(*logbook.ATOM_TYPES, + Column('atom_type', Enum(*models.ATOM_TYPES, name='atom_types')), Column('intention', Enum(*states.INTENTIONS, name='intentions'))) diff --git a/taskflow/persistence/base.py b/taskflow/persistence/base.py index 91fd095e9..7f08c9253 100644 --- a/taskflow/persistence/base.py +++ b/taskflow/persistence/base.py @@ -18,7 +18,7 @@ import abc import six -from taskflow.persistence import logbook +from taskflow.persistence import models @six.add_metaclass(abc.ABCMeta) @@ -125,5 +125,5 @@ class Connection(object): def _format_atom(atom_detail): return { 'atom': atom_detail.to_dict(), - 'type': logbook.atom_detail_type(atom_detail), + 'type': models.atom_detail_type(atom_detail), } diff --git a/taskflow/persistence/logbook.py b/taskflow/persistence/logbook.py index c7a6eae51..162730317 100644 --- a/taskflow/persistence/logbook.py +++ b/taskflow/persistence/logbook.py @@ -1,7 +1,6 @@ # -*- coding: utf-8 -*- -# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. -# Copyright (C) 2013 Rackspace Hosting All Rights Reserved. +# Copyright (C) 2015 Yahoo! Inc. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -15,878 +14,24 @@ # License for the specific language governing permissions and limitations # under the License. -import abc -import copy +from debtcollector import removals -from oslo_utils import timeutils -from oslo_utils import uuidutils -import six +from taskflow.persistence import models -from taskflow import exceptions as exc -from taskflow import logging -from taskflow import states -from taskflow.types import failure as ft -LOG = logging.getLogger(__name__) +# TODO(harlowja): remove me in a future version, since the models +# module is more appropriately named to what the objects in it are used for... +removals.removed_module(__name__, replacement="'%s'" % models.__name__, + version="0.11", removal_version='?', + stacklevel=4) -# Internal helpers... - - -def _copy_function(deep_copy): - if deep_copy: - return copy.deepcopy - else: - return lambda x: x - - -def _safe_marshal_time(when): - if not when: - return None - return timeutils.marshall_now(now=when) - - -def _safe_unmarshal_time(when): - if not when: - return None - return timeutils.unmarshall_time(when) - - -def _fix_meta(data): - # Handle the case where older schemas allowed this to be non-dict by - # correcting this case by replacing it with a dictionary when a non-dict - # is found. - meta = data.get('meta') - if not isinstance(meta, dict): - meta = {} - return meta - - -class LogBook(object): - """A collection of flow details and associated metadata. - - Typically this class contains a collection of flow detail entries - for a given engine (or job) so that those entities can track what 'work' - has been completed for resumption, reverting and miscellaneous tracking - purposes. - - The data contained within this class need **not** be persisted to the - backend storage in real time. The data in this class will only be - guaranteed to be persisted when a save occurs via some backend - connection. - - NOTE(harlowja): the naming of this class is analogous to a ship's log or a - similar type of record used in detailing work that has been completed (or - work that has not been completed). - - :ivar created_at: A ``datetime.datetime`` object of when this logbook - was created. - :ivar updated_at: A ``datetime.datetime`` object of when this logbook - was last updated at. - :ivar meta: A dictionary of meta-data associated with this logbook. - """ - def __init__(self, name, uuid=None): - if uuid: - self._uuid = uuid - else: - self._uuid = uuidutils.generate_uuid() - self._name = name - self._flowdetails_by_id = {} - self.created_at = timeutils.utcnow() - self.updated_at = None - self.meta = {} - - def add(self, fd): - """Adds a new flow detail into this logbook. - - NOTE(harlowja): if an existing flow detail exists with the same - uuid the existing one will be overwritten with the newly provided - one. - - Does not *guarantee* that the details will be immediately saved. - """ - self._flowdetails_by_id[fd.uuid] = fd - self.updated_at = timeutils.utcnow() - - def find(self, flow_uuid): - """Locate the flow detail corresponding to the given uuid. - - :returns: the flow detail with that uuid - :rtype: :py:class:`.FlowDetail` (or ``None`` if not found) - """ - return self._flowdetails_by_id.get(flow_uuid, None) - - def merge(self, lb, deep_copy=False): - """Merges the current object state with the given ones state. - - If ``deep_copy`` is provided as truthy then the - local object will use ``copy.deepcopy`` to replace this objects - local attributes with the provided objects attributes (**only** if - there is a difference between this objects attributes and the - provided attributes). If ``deep_copy`` is falsey (the default) then a - reference copy will occur instead when a difference is detected. - - NOTE(harlowja): If the provided object is this object itself - then **no** merging is done. Also note that this does **not** merge - the flow details contained in either. - - :returns: this logbook (freshly merged with the incoming object) - :rtype: :py:class:`.LogBook` - """ - if lb is self: - return self - copy_fn = _copy_function(deep_copy) - if self.meta != lb.meta: - self.meta = copy_fn(lb.meta) - if lb.created_at != self.created_at: - self.created_at = copy_fn(lb.created_at) - if lb.updated_at != self.updated_at: - self.updated_at = copy_fn(lb.updated_at) - return self - - def to_dict(self, marshal_time=False): - """Translates the internal state of this object to a ``dict``. - - NOTE(harlowja): The returned ``dict`` does **not** include any - contained flow details. - - :returns: this logbook in ``dict`` form - """ - if not marshal_time: - marshal_fn = lambda x: x - else: - marshal_fn = _safe_marshal_time - return { - 'name': self.name, - 'meta': self.meta, - 'uuid': self.uuid, - 'updated_at': marshal_fn(self.updated_at), - 'created_at': marshal_fn(self.created_at), - } - - @classmethod - def from_dict(cls, data, unmarshal_time=False): - """Translates the given ``dict`` into an instance of this class. - - NOTE(harlowja): the ``dict`` provided should come from a prior - call to :meth:`.to_dict`. - - :returns: a new logbook - :rtype: :py:class:`.LogBook` - """ - if not unmarshal_time: - unmarshal_fn = lambda x: x - else: - unmarshal_fn = _safe_unmarshal_time - obj = cls(data['name'], uuid=data['uuid']) - obj.updated_at = unmarshal_fn(data['updated_at']) - obj.created_at = unmarshal_fn(data['created_at']) - obj.meta = _fix_meta(data) - return obj - - @property - def uuid(self): - """The unique identifer of this logbook.""" - return self._uuid - - @property - def name(self): - """The name of this logbook.""" - return self._name - - def __iter__(self): - for fd in six.itervalues(self._flowdetails_by_id): - yield fd - - def __len__(self): - return len(self._flowdetails_by_id) - - def copy(self, retain_contents=True): - """Copies this logbook. - - Creates a shallow copy of this logbook. If this logbook contains - flow details and ``retain_contents`` is truthy (the default) then - the flow details container will be shallow copied (the flow details - contained there-in will **not** be copied). If ``retain_contents`` is - falsey then the copied logbook will have **no** contained flow - details (but it will have the rest of the local objects attributes - copied). - - :returns: a new logbook - :rtype: :py:class:`.LogBook` - """ - clone = copy.copy(self) - if not retain_contents: - clone._flowdetails_by_id = {} - else: - clone._flowdetails_by_id = self._flowdetails_by_id.copy() - if self.meta: - clone.meta = self.meta.copy() - return clone - - -class FlowDetail(object): - """A collection of atom details and associated metadata. - - Typically this class contains a collection of atom detail entries that - represent the atoms in a given flow structure (along with any other needed - metadata relevant to that flow). - - The data contained within this class need **not** be persisted to the - backend storage in real time. The data in this class will only be - guaranteed to be persisted when a save (or update) occurs via some backend - connection. - - :ivar state: The state of the flow associated with this flow detail. - :ivar meta: A dictionary of meta-data associated with this flow detail. - """ - def __init__(self, name, uuid): - self._uuid = uuid - self._name = name - self._atomdetails_by_id = {} - self.state = None - self.meta = {} - - def update(self, fd): - """Updates the objects state to be the same as the given one. - - This will assign the private and public attributes of the given - flow detail directly to this object (replacing any existing - attributes in this object; even if they are the **same**). - - NOTE(harlowja): If the provided object is this object itself - then **no** update is done. - - :returns: this flow detail - :rtype: :py:class:`.FlowDetail` - """ - if fd is self: - return self - self._atomdetails_by_id = fd._atomdetails_by_id - self.state = fd.state - self.meta = fd.meta - return self - - def merge(self, fd, deep_copy=False): - """Merges the current object state with the given one's state. - - If ``deep_copy`` is provided as truthy then the - local object will use ``copy.deepcopy`` to replace this objects - local attributes with the provided objects attributes (**only** if - there is a difference between this objects attributes and the - provided attributes). If ``deep_copy`` is falsey (the default) then a - reference copy will occur instead when a difference is detected. - - NOTE(harlowja): If the provided object is this object itself - then **no** merging is done. Also this does **not** merge the atom - details contained in either. - - :returns: this flow detail (freshly merged with the incoming object) - :rtype: :py:class:`.FlowDetail` - """ - if fd is self: - return self - copy_fn = _copy_function(deep_copy) - if self.meta != fd.meta: - self.meta = copy_fn(fd.meta) - if self.state != fd.state: - # NOTE(imelnikov): states are just strings, no need to copy. - self.state = fd.state - return self - - def copy(self, retain_contents=True): - """Copies this flow detail. - - Creates a shallow copy of this flow detail. If this detail contains - flow details and ``retain_contents`` is truthy (the default) then - the atom details container will be shallow copied (the atom details - contained there-in will **not** be copied). If ``retain_contents`` is - falsey then the copied flow detail will have **no** contained atom - details (but it will have the rest of the local objects attributes - copied). - - :returns: a new flow detail - :rtype: :py:class:`.FlowDetail` - """ - clone = copy.copy(self) - if not retain_contents: - clone._atomdetails_by_id = {} - else: - clone._atomdetails_by_id = self._atomdetails_by_id.copy() - if self.meta: - clone.meta = self.meta.copy() - return clone - - def to_dict(self): - """Translates the internal state of this object to a ``dict``. - - NOTE(harlowja): The returned ``dict`` does **not** include any - contained atom details. - - :returns: this flow detail in ``dict`` form - """ - return { - 'name': self.name, - 'meta': self.meta, - 'state': self.state, - 'uuid': self.uuid, - } - - @classmethod - def from_dict(cls, data): - """Translates the given ``dict`` into an instance of this class. - - NOTE(harlowja): the ``dict`` provided should come from a prior - call to :meth:`.to_dict`. - - :returns: a new flow detail - :rtype: :py:class:`.FlowDetail` - """ - obj = cls(data['name'], data['uuid']) - obj.state = data.get('state') - obj.meta = _fix_meta(data) - return obj - - def add(self, ad): - """Adds a new atom detail into this flow detail. - - NOTE(harlowja): if an existing atom detail exists with the same - uuid the existing one will be overwritten with the newly provided - one. - - Does not *guarantee* that the details will be immediately saved. - """ - self._atomdetails_by_id[ad.uuid] = ad - - def find(self, ad_uuid): - """Locate the atom detail corresponding to the given uuid. - - :returns: the atom detail with that uuid - :rtype: :py:class:`.AtomDetail` (or ``None`` if not found) - """ - return self._atomdetails_by_id.get(ad_uuid) - - @property - def uuid(self): - """The unique identifer of this flow detail.""" - return self._uuid - - @property - def name(self): - """The name of this flow detail.""" - return self._name - - def __iter__(self): - for ad in six.itervalues(self._atomdetails_by_id): - yield ad - - def __len__(self): - return len(self._atomdetails_by_id) - - -@six.add_metaclass(abc.ABCMeta) -class AtomDetail(object): - """A collection of atom specific runtime information and metadata. - - This is a base **abstract** class that contains attributes that are used - to connect a atom to the persistence layer before, during, or after it is - running. It includes any results it may have produced, any state that it - may be in (for example ``FAILURE``), any exception that occurred when - running, and any associated stacktrace that may have occurring during an - exception being thrown. It may also contain any other metadata that - should also be stored along-side the details about the connected atom. - - The data contained within this class need **not** be persisted to the - backend storage in real time. The data in this class will only be - guaranteed to be persisted when a save (or update) occurs via some backend - connection. - - :ivar state: The state of the atom associated with this atom detail. - :ivar intention: The execution strategy of the atom associated - with this atom detail (used by an engine/others to - determine if the associated atom needs to be - executed, reverted, retried and so-on). - :ivar meta: A dictionary of meta-data associated with this atom detail. - :ivar version: A version tuple or string that represents the - atom version this atom detail is associated with (typically - used for introspection and any data migration - strategies). - :ivar results: Any results the atom produced from either its - ``execute`` method or from other sources. - :ivar failure: If the atom failed (possibly due to its ``execute`` - method raising) this will be a - :py:class:`~taskflow.types.failure.Failure` object that - represents that failure (if there was no failure this - will be set to none). - """ - - def __init__(self, name, uuid): - self._uuid = uuid - self._name = name - self.state = None - self.intention = states.EXECUTE - self.results = None - self.failure = None - self.meta = {} - self.version = None - - @staticmethod - def _was_failure(state, result): - # Internal helper method... - return state == states.FAILURE and isinstance(result, ft.Failure) - - @property - def last_results(self): - """Gets the atoms last result. - - If the atom has produced many results (for example if it has been - retried, reverted, executed and ...) this returns the last one of - many results. - """ - return self.results - - def update(self, ad): - """Updates the object's state to be the same as the given one. - - This will assign the private and public attributes of the given - atom detail directly to this object (replacing any existing - attributes in this object; even if they are the **same**). - - NOTE(harlowja): If the provided object is this object itself - then **no** update is done. - - :returns: this atom detail - :rtype: :py:class:`.AtomDetail` - """ - if ad is self: - return self - self.state = ad.state - self.intention = ad.intention - self.meta = ad.meta - self.failure = ad.failure - self.results = ad.results - self.version = ad.version - return self - - @abc.abstractmethod - def merge(self, other, deep_copy=False): - """Merges the current object state with the given ones state. - - If ``deep_copy`` is provided as truthy then the - local object will use ``copy.deepcopy`` to replace this objects - local attributes with the provided objects attributes (**only** if - there is a difference between this objects attributes and the - provided attributes). If ``deep_copy`` is falsey (the default) then a - reference copy will occur instead when a difference is detected. - - NOTE(harlowja): If the provided object is this object itself - then **no** merging is done. Do note that **no** results are merged - in this method. That operation **must** to be the responsibilty of - subclasses to implement and override this abstract method - and provide that merging themselves as they see fit. - - :returns: this atom detail (freshly merged with the incoming object) - :rtype: :py:class:`.AtomDetail` - """ - copy_fn = _copy_function(deep_copy) - # NOTE(imelnikov): states and intentions are just strings, - # so there is no need to copy them (strings are immutable in python). - self.state = other.state - self.intention = other.intention - if self.failure != other.failure: - # NOTE(imelnikov): we can't just deep copy Failures, as they - # contain tracebacks, which are not copyable. - if other.failure: - if deep_copy: - self.failure = other.failure.copy() - else: - self.failure = other.failure - else: - self.failure = None - if self.meta != other.meta: - self.meta = copy_fn(other.meta) - if self.version != other.version: - self.version = copy_fn(other.version) - return self - - @abc.abstractmethod - def put(self, state, result): - """Puts a result (acquired in the given state) into this detail.""" - - def to_dict(self): - """Translates the internal state of this object to a ``dict``. - - :returns: this atom detail in ``dict`` form - """ - if self.failure: - failure = self.failure.to_dict() - else: - failure = None - return { - 'failure': failure, - 'meta': self.meta, - 'name': self.name, - 'results': self.results, - 'state': self.state, - 'version': self.version, - 'intention': self.intention, - 'uuid': self.uuid, - } - - @classmethod - def from_dict(cls, data): - """Translates the given ``dict`` into an instance of this class. - - NOTE(harlowja): the ``dict`` provided should come from a prior - call to :meth:`.to_dict`. - - :returns: a new atom detail - :rtype: :py:class:`.AtomDetail` - """ - obj = cls(data['name'], data['uuid']) - obj.state = data.get('state') - obj.intention = data.get('intention') - obj.results = data.get('results') - obj.version = data.get('version') - obj.meta = _fix_meta(data) - failure = data.get('failure') - if failure: - obj.failure = ft.Failure.from_dict(failure) - return obj - - @property - def uuid(self): - """The unique identifer of this atom detail.""" - return self._uuid - - @property - def name(self): - """The name of this atom detail.""" - return self._name - - @abc.abstractmethod - def reset(self, state): - """Resets this atom detail and sets ``state`` attribute value.""" - - @abc.abstractmethod - def copy(self): - """Copies this atom detail.""" - - -class TaskDetail(AtomDetail): - """A task detail (an atom detail typically associated with a |tt| atom). - - .. |tt| replace:: :py:class:`~taskflow.task.BaseTask` - """ - - def reset(self, state): - """Resets this task detail and sets ``state`` attribute value. - - This sets any previously set ``results`` and ``failure`` attributes - back to ``None`` and sets the state to the provided one, as well as - setting this task details ``intention`` attribute to ``EXECUTE``. - """ - self.results = None - self.failure = None - self.state = state - self.intention = states.EXECUTE - - def put(self, state, result): - """Puts a result (acquired in the given state) into this detail. - - If the result is a :py:class:`~taskflow.types.failure.Failure` object - then the ``failure`` attribute will be set (and the ``results`` - attribute will be set to ``None``); if the result is not a - :py:class:`~taskflow.types.failure.Failure` object then the - ``results`` attribute will be set (and the ``failure`` attribute - will be set to ``None``). In either case the ``state`` - attribute will be set to the provided state. - """ - was_altered = False - if self.state != state: - self.state = state - was_altered = True - if self._was_failure(state, result): - if self.failure != result: - self.failure = result - was_altered = True - if self.results is not None: - self.results = None - was_altered = True - else: - # We don't really have the ability to determine equality of - # task (user) results at the current time, without making - # potentially bad guesses, so assume the task detail always needs - # to be saved if they are not exactly equivalent... - if self.results is not result: - self.results = result - was_altered = True - if self.failure is not None: - self.failure = None - was_altered = True - return was_altered - - def merge(self, other, deep_copy=False): - """Merges the current task detail with the given one. - - NOTE(harlowja): This merge does **not** copy and replace - the ``results`` attribute if it differs. Instead the current - objects ``results`` attribute directly becomes (via assignment) the - other objects ``results`` attribute. Also note that if the provided - object is this object itself then **no** merging is done. - - See: https://bugs.launchpad.net/taskflow/+bug/1452978 for - what happens if this is copied at a deeper level (for example by - using ``copy.deepcopy`` or by using ``copy.copy``). - - :returns: this task detail (freshly merged with the incoming object) - :rtype: :py:class:`.TaskDetail` - """ - if not isinstance(other, TaskDetail): - raise exc.NotImplementedError("Can only merge with other" - " task details") - if other is self: - return self - super(TaskDetail, self).merge(other, deep_copy=deep_copy) - if self.results != other.results: - self.results = other.results - return self - - def copy(self): - """Copies this task detail. - - Creates a shallow copy of this task detail (any meta-data and - version information that this object maintains is shallow - copied via ``copy.copy``). - - NOTE(harlowja): This copy does **not** perform ``copy.copy`` on - the ``results`` attribute of this object (before assigning to the - copy). Instead the current objects ``results`` attribute directly - becomes (via assignment) the copied objects ``results`` attribute. - - See: https://bugs.launchpad.net/taskflow/+bug/1452978 for - what happens if this is copied at a deeper level (for example by - using ``copy.deepcopy`` or by using ``copy.copy``). - - :returns: a new task detail - :rtype: :py:class:`.TaskDetail` - """ - clone = copy.copy(self) - clone.results = self.results - if self.meta: - clone.meta = self.meta.copy() - if self.version: - clone.version = copy.copy(self.version) - return clone - - -class RetryDetail(AtomDetail): - """A retry detail (an atom detail typically associated with a |rt| atom). - - .. |rt| replace:: :py:class:`~taskflow.retry.Retry` - """ - - def __init__(self, name, uuid): - super(RetryDetail, self).__init__(name, uuid) - self.results = [] - - def reset(self, state): - """Resets this retry detail and sets ``state`` attribute value. - - This sets any previously added ``results`` back to an empty list - and resets the ``failure`` attribute back to ``None`` and sets the - state to the provided one, as well as setting this atom - details ``intention`` attribute to ``EXECUTE``. - """ - self.results = [] - self.failure = None - self.state = state - self.intention = states.EXECUTE - - def copy(self): - """Copies this retry detail. - - Creates a shallow copy of this retry detail (any meta-data and - version information that this object maintains is shallow - copied via ``copy.copy``). - - NOTE(harlowja): This copy does **not** copy - the incoming objects ``results`` attribute. Instead this - objects ``results`` attribute list is iterated over and a new list - is constructed with each ``(data, failures)`` element in that list - having its ``failures`` (a dictionary of each named - :py:class:`~taskflow.types.failure.Failure` object that - occured) copied but its ``data`` is left untouched. After - this is done that new list becomes (via assignment) the cloned - objects ``results`` attribute. - - See: https://bugs.launchpad.net/taskflow/+bug/1452978 for - what happens if the ``data`` in ``results`` is copied at a - deeper level (for example by using ``copy.deepcopy`` or by - using ``copy.copy``). - - :returns: a new retry detail - :rtype: :py:class:`.RetryDetail` - """ - clone = copy.copy(self) - results = [] - # NOTE(imelnikov): we can't just deep copy Failures, as they - # contain tracebacks, which are not copyable. - for (data, failures) in self.results: - copied_failures = {} - for (key, failure) in six.iteritems(failures): - copied_failures[key] = failure - results.append((data, copied_failures)) - clone.results = results - if self.meta: - clone.meta = self.meta.copy() - if self.version: - clone.version = copy.copy(self.version) - return clone - - @property - def last_results(self): - """The last result that was produced.""" - try: - return self.results[-1][0] - except IndexError: - exc.raise_with_cause(exc.NotFound, "Last results not found") - - @property - def last_failures(self): - """The last failure dictionary that was produced. - - NOTE(harlowja): This is **not** the same as the - local ``failure`` attribute as the obtained failure dictionary in - the ``results`` attribute (which is what this returns) is from - associated atom failures (which is different from the directly - related failure of the retry unit associated with this - atom detail). - """ - try: - return self.results[-1][1] - except IndexError: - exc.raise_with_cause(exc.NotFound, "Last failures not found") - - def put(self, state, result): - """Puts a result (acquired in the given state) into this detail. - - If the result is a :py:class:`~taskflow.types.failure.Failure` object - then the ``failure`` attribute will be set; if the result is not a - :py:class:`~taskflow.types.failure.Failure` object then the - ``results`` attribute will be appended to (and the ``failure`` - attribute will be set to ``None``). In either case the ``state`` - attribute will be set to the provided state. - """ - # Do not clean retry history (only on reset does this happen). - self.state = state - if self._was_failure(state, result): - self.failure = result - else: - self.results.append((result, {})) - self.failure = None - return True - - @classmethod - def from_dict(cls, data): - """Translates the given ``dict`` into an instance of this class.""" - - def decode_results(results): - if not results: - return [] - new_results = [] - for (data, failures) in results: - new_failures = {} - for (key, data) in six.iteritems(failures): - new_failures[key] = ft.Failure.from_dict(data) - new_results.append((data, new_failures)) - return new_results - - obj = super(RetryDetail, cls).from_dict(data) - obj.results = decode_results(obj.results) - return obj - - def to_dict(self): - """Translates the internal state of this object to a ``dict``.""" - - def encode_results(results): - if not results: - return [] - new_results = [] - for (data, failures) in results: - new_failures = {} - for (key, failure) in six.iteritems(failures): - new_failures[key] = failure.to_dict() - new_results.append((data, new_failures)) - return new_results - - base = super(RetryDetail, self).to_dict() - base['results'] = encode_results(base.get('results')) - return base - - def merge(self, other, deep_copy=False): - """Merges the current retry detail with the given one. - - NOTE(harlowja): This merge does **not** deep copy - the incoming objects ``results`` attribute (if it differs). Instead - the incoming objects ``results`` attribute list is **always** iterated - over and a new list is constructed with - each ``(data, failures)`` element in that list having - its ``failures`` (a dictionary of each named - :py:class:`~taskflow.types.failure.Failure` objects that - occurred) copied but its ``data`` is left untouched. After - this is done that new list becomes (via assignment) this - objects ``results`` attribute. Also note that if the provided object - is this object itself then **no** merging is done. - - See: https://bugs.launchpad.net/taskflow/+bug/1452978 for - what happens if the ``data`` in ``results`` is copied at a - deeper level (for example by using ``copy.deepcopy`` or by - using ``copy.copy``). - - :returns: this retry detail (freshly merged with the incoming object) - :rtype: :py:class:`.RetryDetail` - """ - if not isinstance(other, RetryDetail): - raise exc.NotImplementedError("Can only merge with other" - " retry details") - if other is self: - return self - super(RetryDetail, self).merge(other, deep_copy=deep_copy) - results = [] - # NOTE(imelnikov): we can't just deep copy Failures, as they - # contain tracebacks, which are not copyable. - for (data, failures) in other.results: - copied_failures = {} - for (key, failure) in six.iteritems(failures): - if deep_copy: - copied_failures[key] = failure.copy() - else: - copied_failures[key] = failure - results.append((data, copied_failures)) - self.results = results - return self - - -_DETAIL_TO_NAME = { - RetryDetail: 'RETRY_DETAIL', - TaskDetail: 'TASK_DETAIL', -} -_NAME_TO_DETAIL = dict((name, cls) - for (cls, name) in six.iteritems(_DETAIL_TO_NAME)) -ATOM_TYPES = list(six.iterkeys(_NAME_TO_DETAIL)) - - -def atom_detail_class(atom_type): - try: - return _NAME_TO_DETAIL[atom_type] - except KeyError: - raise TypeError("Unknown atom type '%s'" % (atom_type)) - - -def atom_detail_type(atom_detail): - try: - return _DETAIL_TO_NAME[type(atom_detail)] - except KeyError: - raise TypeError("Unknown atom '%s' (%s)" - % (atom_detail, type(atom_detail))) +# Keep alias classes/functions... around until this module is removed. +LogBook = models.LogBook +FlowDetail = models.FlowDetail +AtomDetail = models.AtomDetail +TaskDetail = models.TaskDetail +RetryDetail = models.RetryDetail +atom_detail_type = models.atom_detail_type +atom_detail_class = models.atom_detail_class +ATOM_TYPES = models.ATOM_TYPES diff --git a/taskflow/persistence/models.py b/taskflow/persistence/models.py new file mode 100644 index 000000000..c7a6eae51 --- /dev/null +++ b/taskflow/persistence/models.py @@ -0,0 +1,892 @@ +# -*- coding: utf-8 -*- + +# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. +# Copyright (C) 2013 Rackspace Hosting All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import abc +import copy + +from oslo_utils import timeutils +from oslo_utils import uuidutils +import six + +from taskflow import exceptions as exc +from taskflow import logging +from taskflow import states +from taskflow.types import failure as ft + +LOG = logging.getLogger(__name__) + + +# Internal helpers... + + +def _copy_function(deep_copy): + if deep_copy: + return copy.deepcopy + else: + return lambda x: x + + +def _safe_marshal_time(when): + if not when: + return None + return timeutils.marshall_now(now=when) + + +def _safe_unmarshal_time(when): + if not when: + return None + return timeutils.unmarshall_time(when) + + +def _fix_meta(data): + # Handle the case where older schemas allowed this to be non-dict by + # correcting this case by replacing it with a dictionary when a non-dict + # is found. + meta = data.get('meta') + if not isinstance(meta, dict): + meta = {} + return meta + + +class LogBook(object): + """A collection of flow details and associated metadata. + + Typically this class contains a collection of flow detail entries + for a given engine (or job) so that those entities can track what 'work' + has been completed for resumption, reverting and miscellaneous tracking + purposes. + + The data contained within this class need **not** be persisted to the + backend storage in real time. The data in this class will only be + guaranteed to be persisted when a save occurs via some backend + connection. + + NOTE(harlowja): the naming of this class is analogous to a ship's log or a + similar type of record used in detailing work that has been completed (or + work that has not been completed). + + :ivar created_at: A ``datetime.datetime`` object of when this logbook + was created. + :ivar updated_at: A ``datetime.datetime`` object of when this logbook + was last updated at. + :ivar meta: A dictionary of meta-data associated with this logbook. + """ + def __init__(self, name, uuid=None): + if uuid: + self._uuid = uuid + else: + self._uuid = uuidutils.generate_uuid() + self._name = name + self._flowdetails_by_id = {} + self.created_at = timeutils.utcnow() + self.updated_at = None + self.meta = {} + + def add(self, fd): + """Adds a new flow detail into this logbook. + + NOTE(harlowja): if an existing flow detail exists with the same + uuid the existing one will be overwritten with the newly provided + one. + + Does not *guarantee* that the details will be immediately saved. + """ + self._flowdetails_by_id[fd.uuid] = fd + self.updated_at = timeutils.utcnow() + + def find(self, flow_uuid): + """Locate the flow detail corresponding to the given uuid. + + :returns: the flow detail with that uuid + :rtype: :py:class:`.FlowDetail` (or ``None`` if not found) + """ + return self._flowdetails_by_id.get(flow_uuid, None) + + def merge(self, lb, deep_copy=False): + """Merges the current object state with the given ones state. + + If ``deep_copy`` is provided as truthy then the + local object will use ``copy.deepcopy`` to replace this objects + local attributes with the provided objects attributes (**only** if + there is a difference between this objects attributes and the + provided attributes). If ``deep_copy`` is falsey (the default) then a + reference copy will occur instead when a difference is detected. + + NOTE(harlowja): If the provided object is this object itself + then **no** merging is done. Also note that this does **not** merge + the flow details contained in either. + + :returns: this logbook (freshly merged with the incoming object) + :rtype: :py:class:`.LogBook` + """ + if lb is self: + return self + copy_fn = _copy_function(deep_copy) + if self.meta != lb.meta: + self.meta = copy_fn(lb.meta) + if lb.created_at != self.created_at: + self.created_at = copy_fn(lb.created_at) + if lb.updated_at != self.updated_at: + self.updated_at = copy_fn(lb.updated_at) + return self + + def to_dict(self, marshal_time=False): + """Translates the internal state of this object to a ``dict``. + + NOTE(harlowja): The returned ``dict`` does **not** include any + contained flow details. + + :returns: this logbook in ``dict`` form + """ + if not marshal_time: + marshal_fn = lambda x: x + else: + marshal_fn = _safe_marshal_time + return { + 'name': self.name, + 'meta': self.meta, + 'uuid': self.uuid, + 'updated_at': marshal_fn(self.updated_at), + 'created_at': marshal_fn(self.created_at), + } + + @classmethod + def from_dict(cls, data, unmarshal_time=False): + """Translates the given ``dict`` into an instance of this class. + + NOTE(harlowja): the ``dict`` provided should come from a prior + call to :meth:`.to_dict`. + + :returns: a new logbook + :rtype: :py:class:`.LogBook` + """ + if not unmarshal_time: + unmarshal_fn = lambda x: x + else: + unmarshal_fn = _safe_unmarshal_time + obj = cls(data['name'], uuid=data['uuid']) + obj.updated_at = unmarshal_fn(data['updated_at']) + obj.created_at = unmarshal_fn(data['created_at']) + obj.meta = _fix_meta(data) + return obj + + @property + def uuid(self): + """The unique identifer of this logbook.""" + return self._uuid + + @property + def name(self): + """The name of this logbook.""" + return self._name + + def __iter__(self): + for fd in six.itervalues(self._flowdetails_by_id): + yield fd + + def __len__(self): + return len(self._flowdetails_by_id) + + def copy(self, retain_contents=True): + """Copies this logbook. + + Creates a shallow copy of this logbook. If this logbook contains + flow details and ``retain_contents`` is truthy (the default) then + the flow details container will be shallow copied (the flow details + contained there-in will **not** be copied). If ``retain_contents`` is + falsey then the copied logbook will have **no** contained flow + details (but it will have the rest of the local objects attributes + copied). + + :returns: a new logbook + :rtype: :py:class:`.LogBook` + """ + clone = copy.copy(self) + if not retain_contents: + clone._flowdetails_by_id = {} + else: + clone._flowdetails_by_id = self._flowdetails_by_id.copy() + if self.meta: + clone.meta = self.meta.copy() + return clone + + +class FlowDetail(object): + """A collection of atom details and associated metadata. + + Typically this class contains a collection of atom detail entries that + represent the atoms in a given flow structure (along with any other needed + metadata relevant to that flow). + + The data contained within this class need **not** be persisted to the + backend storage in real time. The data in this class will only be + guaranteed to be persisted when a save (or update) occurs via some backend + connection. + + :ivar state: The state of the flow associated with this flow detail. + :ivar meta: A dictionary of meta-data associated with this flow detail. + """ + def __init__(self, name, uuid): + self._uuid = uuid + self._name = name + self._atomdetails_by_id = {} + self.state = None + self.meta = {} + + def update(self, fd): + """Updates the objects state to be the same as the given one. + + This will assign the private and public attributes of the given + flow detail directly to this object (replacing any existing + attributes in this object; even if they are the **same**). + + NOTE(harlowja): If the provided object is this object itself + then **no** update is done. + + :returns: this flow detail + :rtype: :py:class:`.FlowDetail` + """ + if fd is self: + return self + self._atomdetails_by_id = fd._atomdetails_by_id + self.state = fd.state + self.meta = fd.meta + return self + + def merge(self, fd, deep_copy=False): + """Merges the current object state with the given one's state. + + If ``deep_copy`` is provided as truthy then the + local object will use ``copy.deepcopy`` to replace this objects + local attributes with the provided objects attributes (**only** if + there is a difference between this objects attributes and the + provided attributes). If ``deep_copy`` is falsey (the default) then a + reference copy will occur instead when a difference is detected. + + NOTE(harlowja): If the provided object is this object itself + then **no** merging is done. Also this does **not** merge the atom + details contained in either. + + :returns: this flow detail (freshly merged with the incoming object) + :rtype: :py:class:`.FlowDetail` + """ + if fd is self: + return self + copy_fn = _copy_function(deep_copy) + if self.meta != fd.meta: + self.meta = copy_fn(fd.meta) + if self.state != fd.state: + # NOTE(imelnikov): states are just strings, no need to copy. + self.state = fd.state + return self + + def copy(self, retain_contents=True): + """Copies this flow detail. + + Creates a shallow copy of this flow detail. If this detail contains + flow details and ``retain_contents`` is truthy (the default) then + the atom details container will be shallow copied (the atom details + contained there-in will **not** be copied). If ``retain_contents`` is + falsey then the copied flow detail will have **no** contained atom + details (but it will have the rest of the local objects attributes + copied). + + :returns: a new flow detail + :rtype: :py:class:`.FlowDetail` + """ + clone = copy.copy(self) + if not retain_contents: + clone._atomdetails_by_id = {} + else: + clone._atomdetails_by_id = self._atomdetails_by_id.copy() + if self.meta: + clone.meta = self.meta.copy() + return clone + + def to_dict(self): + """Translates the internal state of this object to a ``dict``. + + NOTE(harlowja): The returned ``dict`` does **not** include any + contained atom details. + + :returns: this flow detail in ``dict`` form + """ + return { + 'name': self.name, + 'meta': self.meta, + 'state': self.state, + 'uuid': self.uuid, + } + + @classmethod + def from_dict(cls, data): + """Translates the given ``dict`` into an instance of this class. + + NOTE(harlowja): the ``dict`` provided should come from a prior + call to :meth:`.to_dict`. + + :returns: a new flow detail + :rtype: :py:class:`.FlowDetail` + """ + obj = cls(data['name'], data['uuid']) + obj.state = data.get('state') + obj.meta = _fix_meta(data) + return obj + + def add(self, ad): + """Adds a new atom detail into this flow detail. + + NOTE(harlowja): if an existing atom detail exists with the same + uuid the existing one will be overwritten with the newly provided + one. + + Does not *guarantee* that the details will be immediately saved. + """ + self._atomdetails_by_id[ad.uuid] = ad + + def find(self, ad_uuid): + """Locate the atom detail corresponding to the given uuid. + + :returns: the atom detail with that uuid + :rtype: :py:class:`.AtomDetail` (or ``None`` if not found) + """ + return self._atomdetails_by_id.get(ad_uuid) + + @property + def uuid(self): + """The unique identifer of this flow detail.""" + return self._uuid + + @property + def name(self): + """The name of this flow detail.""" + return self._name + + def __iter__(self): + for ad in six.itervalues(self._atomdetails_by_id): + yield ad + + def __len__(self): + return len(self._atomdetails_by_id) + + +@six.add_metaclass(abc.ABCMeta) +class AtomDetail(object): + """A collection of atom specific runtime information and metadata. + + This is a base **abstract** class that contains attributes that are used + to connect a atom to the persistence layer before, during, or after it is + running. It includes any results it may have produced, any state that it + may be in (for example ``FAILURE``), any exception that occurred when + running, and any associated stacktrace that may have occurring during an + exception being thrown. It may also contain any other metadata that + should also be stored along-side the details about the connected atom. + + The data contained within this class need **not** be persisted to the + backend storage in real time. The data in this class will only be + guaranteed to be persisted when a save (or update) occurs via some backend + connection. + + :ivar state: The state of the atom associated with this atom detail. + :ivar intention: The execution strategy of the atom associated + with this atom detail (used by an engine/others to + determine if the associated atom needs to be + executed, reverted, retried and so-on). + :ivar meta: A dictionary of meta-data associated with this atom detail. + :ivar version: A version tuple or string that represents the + atom version this atom detail is associated with (typically + used for introspection and any data migration + strategies). + :ivar results: Any results the atom produced from either its + ``execute`` method or from other sources. + :ivar failure: If the atom failed (possibly due to its ``execute`` + method raising) this will be a + :py:class:`~taskflow.types.failure.Failure` object that + represents that failure (if there was no failure this + will be set to none). + """ + + def __init__(self, name, uuid): + self._uuid = uuid + self._name = name + self.state = None + self.intention = states.EXECUTE + self.results = None + self.failure = None + self.meta = {} + self.version = None + + @staticmethod + def _was_failure(state, result): + # Internal helper method... + return state == states.FAILURE and isinstance(result, ft.Failure) + + @property + def last_results(self): + """Gets the atoms last result. + + If the atom has produced many results (for example if it has been + retried, reverted, executed and ...) this returns the last one of + many results. + """ + return self.results + + def update(self, ad): + """Updates the object's state to be the same as the given one. + + This will assign the private and public attributes of the given + atom detail directly to this object (replacing any existing + attributes in this object; even if they are the **same**). + + NOTE(harlowja): If the provided object is this object itself + then **no** update is done. + + :returns: this atom detail + :rtype: :py:class:`.AtomDetail` + """ + if ad is self: + return self + self.state = ad.state + self.intention = ad.intention + self.meta = ad.meta + self.failure = ad.failure + self.results = ad.results + self.version = ad.version + return self + + @abc.abstractmethod + def merge(self, other, deep_copy=False): + """Merges the current object state with the given ones state. + + If ``deep_copy`` is provided as truthy then the + local object will use ``copy.deepcopy`` to replace this objects + local attributes with the provided objects attributes (**only** if + there is a difference between this objects attributes and the + provided attributes). If ``deep_copy`` is falsey (the default) then a + reference copy will occur instead when a difference is detected. + + NOTE(harlowja): If the provided object is this object itself + then **no** merging is done. Do note that **no** results are merged + in this method. That operation **must** to be the responsibilty of + subclasses to implement and override this abstract method + and provide that merging themselves as they see fit. + + :returns: this atom detail (freshly merged with the incoming object) + :rtype: :py:class:`.AtomDetail` + """ + copy_fn = _copy_function(deep_copy) + # NOTE(imelnikov): states and intentions are just strings, + # so there is no need to copy them (strings are immutable in python). + self.state = other.state + self.intention = other.intention + if self.failure != other.failure: + # NOTE(imelnikov): we can't just deep copy Failures, as they + # contain tracebacks, which are not copyable. + if other.failure: + if deep_copy: + self.failure = other.failure.copy() + else: + self.failure = other.failure + else: + self.failure = None + if self.meta != other.meta: + self.meta = copy_fn(other.meta) + if self.version != other.version: + self.version = copy_fn(other.version) + return self + + @abc.abstractmethod + def put(self, state, result): + """Puts a result (acquired in the given state) into this detail.""" + + def to_dict(self): + """Translates the internal state of this object to a ``dict``. + + :returns: this atom detail in ``dict`` form + """ + if self.failure: + failure = self.failure.to_dict() + else: + failure = None + return { + 'failure': failure, + 'meta': self.meta, + 'name': self.name, + 'results': self.results, + 'state': self.state, + 'version': self.version, + 'intention': self.intention, + 'uuid': self.uuid, + } + + @classmethod + def from_dict(cls, data): + """Translates the given ``dict`` into an instance of this class. + + NOTE(harlowja): the ``dict`` provided should come from a prior + call to :meth:`.to_dict`. + + :returns: a new atom detail + :rtype: :py:class:`.AtomDetail` + """ + obj = cls(data['name'], data['uuid']) + obj.state = data.get('state') + obj.intention = data.get('intention') + obj.results = data.get('results') + obj.version = data.get('version') + obj.meta = _fix_meta(data) + failure = data.get('failure') + if failure: + obj.failure = ft.Failure.from_dict(failure) + return obj + + @property + def uuid(self): + """The unique identifer of this atom detail.""" + return self._uuid + + @property + def name(self): + """The name of this atom detail.""" + return self._name + + @abc.abstractmethod + def reset(self, state): + """Resets this atom detail and sets ``state`` attribute value.""" + + @abc.abstractmethod + def copy(self): + """Copies this atom detail.""" + + +class TaskDetail(AtomDetail): + """A task detail (an atom detail typically associated with a |tt| atom). + + .. |tt| replace:: :py:class:`~taskflow.task.BaseTask` + """ + + def reset(self, state): + """Resets this task detail and sets ``state`` attribute value. + + This sets any previously set ``results`` and ``failure`` attributes + back to ``None`` and sets the state to the provided one, as well as + setting this task details ``intention`` attribute to ``EXECUTE``. + """ + self.results = None + self.failure = None + self.state = state + self.intention = states.EXECUTE + + def put(self, state, result): + """Puts a result (acquired in the given state) into this detail. + + If the result is a :py:class:`~taskflow.types.failure.Failure` object + then the ``failure`` attribute will be set (and the ``results`` + attribute will be set to ``None``); if the result is not a + :py:class:`~taskflow.types.failure.Failure` object then the + ``results`` attribute will be set (and the ``failure`` attribute + will be set to ``None``). In either case the ``state`` + attribute will be set to the provided state. + """ + was_altered = False + if self.state != state: + self.state = state + was_altered = True + if self._was_failure(state, result): + if self.failure != result: + self.failure = result + was_altered = True + if self.results is not None: + self.results = None + was_altered = True + else: + # We don't really have the ability to determine equality of + # task (user) results at the current time, without making + # potentially bad guesses, so assume the task detail always needs + # to be saved if they are not exactly equivalent... + if self.results is not result: + self.results = result + was_altered = True + if self.failure is not None: + self.failure = None + was_altered = True + return was_altered + + def merge(self, other, deep_copy=False): + """Merges the current task detail with the given one. + + NOTE(harlowja): This merge does **not** copy and replace + the ``results`` attribute if it differs. Instead the current + objects ``results`` attribute directly becomes (via assignment) the + other objects ``results`` attribute. Also note that if the provided + object is this object itself then **no** merging is done. + + See: https://bugs.launchpad.net/taskflow/+bug/1452978 for + what happens if this is copied at a deeper level (for example by + using ``copy.deepcopy`` or by using ``copy.copy``). + + :returns: this task detail (freshly merged with the incoming object) + :rtype: :py:class:`.TaskDetail` + """ + if not isinstance(other, TaskDetail): + raise exc.NotImplementedError("Can only merge with other" + " task details") + if other is self: + return self + super(TaskDetail, self).merge(other, deep_copy=deep_copy) + if self.results != other.results: + self.results = other.results + return self + + def copy(self): + """Copies this task detail. + + Creates a shallow copy of this task detail (any meta-data and + version information that this object maintains is shallow + copied via ``copy.copy``). + + NOTE(harlowja): This copy does **not** perform ``copy.copy`` on + the ``results`` attribute of this object (before assigning to the + copy). Instead the current objects ``results`` attribute directly + becomes (via assignment) the copied objects ``results`` attribute. + + See: https://bugs.launchpad.net/taskflow/+bug/1452978 for + what happens if this is copied at a deeper level (for example by + using ``copy.deepcopy`` or by using ``copy.copy``). + + :returns: a new task detail + :rtype: :py:class:`.TaskDetail` + """ + clone = copy.copy(self) + clone.results = self.results + if self.meta: + clone.meta = self.meta.copy() + if self.version: + clone.version = copy.copy(self.version) + return clone + + +class RetryDetail(AtomDetail): + """A retry detail (an atom detail typically associated with a |rt| atom). + + .. |rt| replace:: :py:class:`~taskflow.retry.Retry` + """ + + def __init__(self, name, uuid): + super(RetryDetail, self).__init__(name, uuid) + self.results = [] + + def reset(self, state): + """Resets this retry detail and sets ``state`` attribute value. + + This sets any previously added ``results`` back to an empty list + and resets the ``failure`` attribute back to ``None`` and sets the + state to the provided one, as well as setting this atom + details ``intention`` attribute to ``EXECUTE``. + """ + self.results = [] + self.failure = None + self.state = state + self.intention = states.EXECUTE + + def copy(self): + """Copies this retry detail. + + Creates a shallow copy of this retry detail (any meta-data and + version information that this object maintains is shallow + copied via ``copy.copy``). + + NOTE(harlowja): This copy does **not** copy + the incoming objects ``results`` attribute. Instead this + objects ``results`` attribute list is iterated over and a new list + is constructed with each ``(data, failures)`` element in that list + having its ``failures`` (a dictionary of each named + :py:class:`~taskflow.types.failure.Failure` object that + occured) copied but its ``data`` is left untouched. After + this is done that new list becomes (via assignment) the cloned + objects ``results`` attribute. + + See: https://bugs.launchpad.net/taskflow/+bug/1452978 for + what happens if the ``data`` in ``results`` is copied at a + deeper level (for example by using ``copy.deepcopy`` or by + using ``copy.copy``). + + :returns: a new retry detail + :rtype: :py:class:`.RetryDetail` + """ + clone = copy.copy(self) + results = [] + # NOTE(imelnikov): we can't just deep copy Failures, as they + # contain tracebacks, which are not copyable. + for (data, failures) in self.results: + copied_failures = {} + for (key, failure) in six.iteritems(failures): + copied_failures[key] = failure + results.append((data, copied_failures)) + clone.results = results + if self.meta: + clone.meta = self.meta.copy() + if self.version: + clone.version = copy.copy(self.version) + return clone + + @property + def last_results(self): + """The last result that was produced.""" + try: + return self.results[-1][0] + except IndexError: + exc.raise_with_cause(exc.NotFound, "Last results not found") + + @property + def last_failures(self): + """The last failure dictionary that was produced. + + NOTE(harlowja): This is **not** the same as the + local ``failure`` attribute as the obtained failure dictionary in + the ``results`` attribute (which is what this returns) is from + associated atom failures (which is different from the directly + related failure of the retry unit associated with this + atom detail). + """ + try: + return self.results[-1][1] + except IndexError: + exc.raise_with_cause(exc.NotFound, "Last failures not found") + + def put(self, state, result): + """Puts a result (acquired in the given state) into this detail. + + If the result is a :py:class:`~taskflow.types.failure.Failure` object + then the ``failure`` attribute will be set; if the result is not a + :py:class:`~taskflow.types.failure.Failure` object then the + ``results`` attribute will be appended to (and the ``failure`` + attribute will be set to ``None``). In either case the ``state`` + attribute will be set to the provided state. + """ + # Do not clean retry history (only on reset does this happen). + self.state = state + if self._was_failure(state, result): + self.failure = result + else: + self.results.append((result, {})) + self.failure = None + return True + + @classmethod + def from_dict(cls, data): + """Translates the given ``dict`` into an instance of this class.""" + + def decode_results(results): + if not results: + return [] + new_results = [] + for (data, failures) in results: + new_failures = {} + for (key, data) in six.iteritems(failures): + new_failures[key] = ft.Failure.from_dict(data) + new_results.append((data, new_failures)) + return new_results + + obj = super(RetryDetail, cls).from_dict(data) + obj.results = decode_results(obj.results) + return obj + + def to_dict(self): + """Translates the internal state of this object to a ``dict``.""" + + def encode_results(results): + if not results: + return [] + new_results = [] + for (data, failures) in results: + new_failures = {} + for (key, failure) in six.iteritems(failures): + new_failures[key] = failure.to_dict() + new_results.append((data, new_failures)) + return new_results + + base = super(RetryDetail, self).to_dict() + base['results'] = encode_results(base.get('results')) + return base + + def merge(self, other, deep_copy=False): + """Merges the current retry detail with the given one. + + NOTE(harlowja): This merge does **not** deep copy + the incoming objects ``results`` attribute (if it differs). Instead + the incoming objects ``results`` attribute list is **always** iterated + over and a new list is constructed with + each ``(data, failures)`` element in that list having + its ``failures`` (a dictionary of each named + :py:class:`~taskflow.types.failure.Failure` objects that + occurred) copied but its ``data`` is left untouched. After + this is done that new list becomes (via assignment) this + objects ``results`` attribute. Also note that if the provided object + is this object itself then **no** merging is done. + + See: https://bugs.launchpad.net/taskflow/+bug/1452978 for + what happens if the ``data`` in ``results`` is copied at a + deeper level (for example by using ``copy.deepcopy`` or by + using ``copy.copy``). + + :returns: this retry detail (freshly merged with the incoming object) + :rtype: :py:class:`.RetryDetail` + """ + if not isinstance(other, RetryDetail): + raise exc.NotImplementedError("Can only merge with other" + " retry details") + if other is self: + return self + super(RetryDetail, self).merge(other, deep_copy=deep_copy) + results = [] + # NOTE(imelnikov): we can't just deep copy Failures, as they + # contain tracebacks, which are not copyable. + for (data, failures) in other.results: + copied_failures = {} + for (key, failure) in six.iteritems(failures): + if deep_copy: + copied_failures[key] = failure.copy() + else: + copied_failures[key] = failure + results.append((data, copied_failures)) + self.results = results + return self + + +_DETAIL_TO_NAME = { + RetryDetail: 'RETRY_DETAIL', + TaskDetail: 'TASK_DETAIL', +} +_NAME_TO_DETAIL = dict((name, cls) + for (cls, name) in six.iteritems(_DETAIL_TO_NAME)) +ATOM_TYPES = list(six.iterkeys(_NAME_TO_DETAIL)) + + +def atom_detail_class(atom_type): + try: + return _NAME_TO_DETAIL[atom_type] + except KeyError: + raise TypeError("Unknown atom type '%s'" % (atom_type)) + + +def atom_detail_type(atom_detail): + try: + return _DETAIL_TO_NAME[type(atom_detail)] + except KeyError: + raise TypeError("Unknown atom '%s' (%s)" + % (atom_detail, type(atom_detail))) diff --git a/taskflow/persistence/path_based.py b/taskflow/persistence/path_based.py index 0d729978d..f2d411b2a 100644 --- a/taskflow/persistence/path_based.py +++ b/taskflow/persistence/path_based.py @@ -19,7 +19,7 @@ import six from taskflow import exceptions as exc from taskflow.persistence import base -from taskflow.persistence import logbook +from taskflow.persistence import models @six.add_metaclass(abc.ABCMeta) @@ -60,23 +60,23 @@ class PathBasedConnection(base.Connection): @staticmethod def _serialize(obj): - if isinstance(obj, logbook.LogBook): + if isinstance(obj, models.LogBook): return obj.to_dict(marshal_time=True) - elif isinstance(obj, logbook.FlowDetail): + elif isinstance(obj, models.FlowDetail): return obj.to_dict() - elif isinstance(obj, logbook.AtomDetail): + elif isinstance(obj, models.AtomDetail): return base._format_atom(obj) else: raise exc.StorageFailure("Invalid storage class %s" % type(obj)) @staticmethod def _deserialize(cls, data): - if issubclass(cls, logbook.LogBook): + if issubclass(cls, models.LogBook): return cls.from_dict(data, unmarshal_time=True) - elif issubclass(cls, logbook.FlowDetail): + elif issubclass(cls, models.FlowDetail): return cls.from_dict(data) - elif issubclass(cls, logbook.AtomDetail): - atom_class = logbook.atom_detail_class(data['type']) + elif issubclass(cls, models.AtomDetail): + atom_class = models.atom_detail_class(data['type']) return atom_class.from_dict(data['atom']) else: raise exc.StorageFailure("Invalid storage class %s" % cls) @@ -130,11 +130,11 @@ class PathBasedConnection(base.Connection): """Context manager that yields a transaction""" def _get_obj_path(self, obj): - if isinstance(obj, logbook.LogBook): + if isinstance(obj, models.LogBook): path = self.book_path - elif isinstance(obj, logbook.FlowDetail): + elif isinstance(obj, models.FlowDetail): path = self.flow_path - elif isinstance(obj, logbook.AtomDetail): + elif isinstance(obj, models.AtomDetail): path = self.atom_path else: raise exc.StorageFailure("Invalid storage class %s" % type(obj)) @@ -159,7 +159,7 @@ class PathBasedConnection(base.Connection): def get_logbook(self, book_uuid, lazy=False): book_path = self._join_path(self.book_path, book_uuid) book_data = self._get_item(book_path) - book = self._deserialize(logbook.LogBook, book_data) + book = self._deserialize(models.LogBook, book_data) if not lazy: for flow_details in self.get_flows_for_book(book_uuid): book.add(flow_details) @@ -185,7 +185,7 @@ class PathBasedConnection(base.Connection): def get_flow_details(self, flow_uuid, lazy=False): flow_path = self._join_path(self.flow_path, flow_uuid) flow_data = self._get_item(flow_path) - flow_details = self._deserialize(logbook.FlowDetail, flow_data) + flow_details = self._deserialize(models.FlowDetail, flow_data) if not lazy: for atom_details in self.get_atoms_for_flow(flow_uuid): flow_details.add(atom_details) @@ -216,7 +216,7 @@ class PathBasedConnection(base.Connection): def get_atom_details(self, atom_uuid): atom_path = self._join_path(self.atom_path, atom_uuid) atom_data = self._get_item(atom_path) - return self._deserialize(logbook.AtomDetail, atom_data) + return self._deserialize(models.AtomDetail, atom_data) def update_atom_details(self, atom_detail, ignore_missing=False): with self._transaction() as transaction: diff --git a/taskflow/storage.py b/taskflow/storage.py index cb1fbaad0..05b489993 100644 --- a/taskflow/storage.py +++ b/taskflow/storage.py @@ -24,7 +24,7 @@ import six from taskflow import exceptions from taskflow import logging from taskflow.persistence.backends import impl_memory -from taskflow.persistence import logbook +from taskflow.persistence import models from taskflow import retry from taskflow import states from taskflow import task @@ -153,8 +153,8 @@ class Storage(object): self._injected_args = {} self._lock = fasteners.ReaderWriterLock() self._ensure_matchers = [ - ((task.BaseTask,), (logbook.TaskDetail, 'Task')), - ((retry.Retry,), (logbook.RetryDetail, 'Retry')), + ((task.BaseTask,), (models.TaskDetail, 'Task')), + ((retry.Retry,), (models.RetryDetail, 'Retry')), ] if scope_fetcher is None: scope_fetcher = lambda atom_name: None @@ -171,7 +171,7 @@ class Storage(object): for ad in self._flowdetail) try: source, _clone = self._atomdetail_by_name( - self.injector_name, expected_type=logbook.TaskDetail) + self.injector_name, expected_type=models.TaskDetail) except exceptions.NotFound: pass else: @@ -399,7 +399,7 @@ class Storage(object): else: update_with[META_PROGRESS_DETAILS] = None self._update_atom_metadata(task_name, update_with, - expected_type=logbook.TaskDetail) + expected_type=models.TaskDetail) @fasteners.read_locked def get_task_progress(self, task_name): @@ -409,7 +409,7 @@ class Storage(object): :returns: current task progress value """ source, _clone = self._atomdetail_by_name( - task_name, expected_type=logbook.TaskDetail) + task_name, expected_type=models.TaskDetail) try: return source.meta[META_PROGRESS] except KeyError: @@ -424,7 +424,7 @@ class Storage(object): dict """ source, _clone = self._atomdetail_by_name( - task_name, expected_type=logbook.TaskDetail) + task_name, expected_type=models.TaskDetail) try: return source.meta[META_PROGRESS_DETAILS] except KeyError: @@ -468,7 +468,7 @@ class Storage(object): def save_retry_failure(self, retry_name, failed_atom_name, failure): """Save subflow failure to retry controller history.""" source, clone = self._atomdetail_by_name( - retry_name, expected_type=logbook.RetryDetail, clone=True) + retry_name, expected_type=models.RetryDetail, clone=True) try: failures = clone.last_failures except exceptions.NotFound: @@ -485,7 +485,7 @@ class Storage(object): def cleanup_retry_history(self, retry_name, state): """Cleanup history of retry atom with given name.""" source, clone = self._atomdetail_by_name( - retry_name, expected_type=logbook.RetryDetail, clone=True) + retry_name, expected_type=models.RetryDetail, clone=True) clone.state = state clone.results = [] self._with_connection(self._save_atom_detail, source, clone) @@ -625,7 +625,7 @@ class Storage(object): try: source, clone = self._atomdetail_by_name( self.injector_name, - expected_type=logbook.TaskDetail, + expected_type=models.TaskDetail, clone=True) except exceptions.NotFound: # Ensure we have our special task detail... @@ -633,7 +633,7 @@ class Storage(object): # TODO(harlowja): get this removed when # https://review.openstack.org/#/c/165645/ merges. source = self._create_atom_detail(self.injector_name, - logbook.TaskDetail, + models.TaskDetail, atom_state=None) fd_source, fd_clone = self._fetch_flowdetail(clone=True) fd_clone.add(source) @@ -974,7 +974,7 @@ class Storage(object): def get_retry_history(self, retry_name): """Fetch a single retrys history.""" source, _clone = self._atomdetail_by_name( - retry_name, expected_type=logbook.RetryDetail) + retry_name, expected_type=models.RetryDetail) return self._translate_into_history(source) @fasteners.read_locked @@ -982,7 +982,7 @@ class Storage(object): """Fetch all retrys histories.""" histories = [] for ad in self._flowdetail: - if isinstance(ad, logbook.RetryDetail): + if isinstance(ad, models.RetryDetail): histories.append((ad.name, self._translate_into_history(ad))) return histories diff --git a/taskflow/tests/unit/persistence/base.py b/taskflow/tests/unit/persistence/base.py index 924e62b23..0b56617fe 100644 --- a/taskflow/tests/unit/persistence/base.py +++ b/taskflow/tests/unit/persistence/base.py @@ -19,7 +19,7 @@ import contextlib from oslo_utils import uuidutils from taskflow import exceptions as exc -from taskflow.persistence import logbook +from taskflow.persistence import models from taskflow import states from taskflow.types import failure @@ -31,15 +31,15 @@ class PersistenceTestMixin(object): def test_task_detail_update_not_existing(self): lb_id = uuidutils.generate_uuid() lb_name = 'lb-%s' % (lb_id) - lb = logbook.LogBook(name=lb_name, uuid=lb_id) - fd = logbook.FlowDetail('test', uuid=uuidutils.generate_uuid()) + lb = models.LogBook(name=lb_name, uuid=lb_id) + fd = models.FlowDetail('test', uuid=uuidutils.generate_uuid()) lb.add(fd) - td = logbook.TaskDetail("detail-1", uuid=uuidutils.generate_uuid()) + td = models.TaskDetail("detail-1", uuid=uuidutils.generate_uuid()) fd.add(td) with contextlib.closing(self._get_connection()) as conn: conn.save_logbook(lb) - td2 = logbook.TaskDetail("detail-1", uuid=uuidutils.generate_uuid()) + td2 = models.TaskDetail("detail-1", uuid=uuidutils.generate_uuid()) fd.add(td2) with contextlib.closing(self._get_connection()) as conn: conn.update_flow_details(fd) @@ -53,13 +53,13 @@ class PersistenceTestMixin(object): def test_flow_detail_update_not_existing(self): lb_id = uuidutils.generate_uuid() lb_name = 'lb-%s' % (lb_id) - lb = logbook.LogBook(name=lb_name, uuid=lb_id) - fd = logbook.FlowDetail('test', uuid=uuidutils.generate_uuid()) + lb = models.LogBook(name=lb_name, uuid=lb_id) + fd = models.FlowDetail('test', uuid=uuidutils.generate_uuid()) lb.add(fd) with contextlib.closing(self._get_connection()) as conn: conn.save_logbook(lb) - fd2 = logbook.FlowDetail('test-2', uuid=uuidutils.generate_uuid()) + fd2 = models.FlowDetail('test-2', uuid=uuidutils.generate_uuid()) lb.add(fd2) with contextlib.closing(self._get_connection()) as conn: conn.save_logbook(lb) @@ -73,7 +73,7 @@ class PersistenceTestMixin(object): lb_id = uuidutils.generate_uuid() lb_meta = {'1': 2} lb_name = 'lb-%s' % (lb_id) - lb = logbook.LogBook(name=lb_name, uuid=lb_id) + lb = models.LogBook(name=lb_name, uuid=lb_id) lb.meta = lb_meta # Should not already exist @@ -94,8 +94,8 @@ class PersistenceTestMixin(object): def test_flow_detail_save(self): lb_id = uuidutils.generate_uuid() lb_name = 'lb-%s' % (lb_id) - lb = logbook.LogBook(name=lb_name, uuid=lb_id) - fd = logbook.FlowDetail('test', uuid=uuidutils.generate_uuid()) + lb = models.LogBook(name=lb_name, uuid=lb_id) + fd = models.FlowDetail('test', uuid=uuidutils.generate_uuid()) lb.add(fd) # Ensure we can't save it since its owning logbook hasn't been @@ -113,8 +113,8 @@ class PersistenceTestMixin(object): def test_flow_detail_meta_update(self): lb_id = uuidutils.generate_uuid() lb_name = 'lb-%s' % (lb_id) - lb = logbook.LogBook(name=lb_name, uuid=lb_id) - fd = logbook.FlowDetail('test', uuid=uuidutils.generate_uuid()) + lb = models.LogBook(name=lb_name, uuid=lb_id) + fd = models.FlowDetail('test', uuid=uuidutils.generate_uuid()) fd.meta = {'test': 42} lb.add(fd) @@ -133,9 +133,9 @@ class PersistenceTestMixin(object): def test_flow_detail_lazy_fetch(self): lb_id = uuidutils.generate_uuid() lb_name = 'lb-%s' % (lb_id) - lb = logbook.LogBook(name=lb_name, uuid=lb_id) - fd = logbook.FlowDetail('test', uuid=uuidutils.generate_uuid()) - td = logbook.TaskDetail("detail-1", uuid=uuidutils.generate_uuid()) + lb = models.LogBook(name=lb_name, uuid=lb_id) + fd = models.FlowDetail('test', uuid=uuidutils.generate_uuid()) + td = models.TaskDetail("detail-1", uuid=uuidutils.generate_uuid()) td.version = '4.2' fd.add(td) lb.add(fd) @@ -149,10 +149,10 @@ class PersistenceTestMixin(object): def test_task_detail_save(self): lb_id = uuidutils.generate_uuid() lb_name = 'lb-%s' % (lb_id) - lb = logbook.LogBook(name=lb_name, uuid=lb_id) - fd = logbook.FlowDetail('test', uuid=uuidutils.generate_uuid()) + lb = models.LogBook(name=lb_name, uuid=lb_id) + fd = models.FlowDetail('test', uuid=uuidutils.generate_uuid()) lb.add(fd) - td = logbook.TaskDetail("detail-1", uuid=uuidutils.generate_uuid()) + td = models.TaskDetail("detail-1", uuid=uuidutils.generate_uuid()) fd.add(td) # Ensure we can't save it since its owning logbook hasn't been @@ -171,10 +171,10 @@ class PersistenceTestMixin(object): def test_task_detail_meta_update(self): lb_id = uuidutils.generate_uuid() lb_name = 'lb-%s' % (lb_id) - lb = logbook.LogBook(name=lb_name, uuid=lb_id) - fd = logbook.FlowDetail('test', uuid=uuidutils.generate_uuid()) + lb = models.LogBook(name=lb_name, uuid=lb_id) + fd = models.FlowDetail('test', uuid=uuidutils.generate_uuid()) lb.add(fd) - td = logbook.TaskDetail("detail-1", uuid=uuidutils.generate_uuid()) + td = models.TaskDetail("detail-1", uuid=uuidutils.generate_uuid()) td.meta = {'test': 42} fd.add(td) @@ -192,15 +192,15 @@ class PersistenceTestMixin(object): fd2 = lb2.find(fd.uuid) td2 = fd2.find(td.uuid) self.assertEqual(td2.meta.get('test'), 43) - self.assertIsInstance(td2, logbook.TaskDetail) + self.assertIsInstance(td2, models.TaskDetail) def test_task_detail_with_failure(self): lb_id = uuidutils.generate_uuid() lb_name = 'lb-%s' % (lb_id) - lb = logbook.LogBook(name=lb_name, uuid=lb_id) - fd = logbook.FlowDetail('test', uuid=uuidutils.generate_uuid()) + lb = models.LogBook(name=lb_name, uuid=lb_id) + fd = models.FlowDetail('test', uuid=uuidutils.generate_uuid()) lb.add(fd) - td = logbook.TaskDetail("detail-1", uuid=uuidutils.generate_uuid()) + td = models.TaskDetail("detail-1", uuid=uuidutils.generate_uuid()) try: raise RuntimeError('Woot!') @@ -222,18 +222,18 @@ class PersistenceTestMixin(object): self.assertEqual(td2.failure.exception_str, 'Woot!') self.assertIs(td2.failure.check(RuntimeError), RuntimeError) self.assertEqual(td2.failure.traceback_str, td.failure.traceback_str) - self.assertIsInstance(td2, logbook.TaskDetail) + self.assertIsInstance(td2, models.TaskDetail) def test_logbook_merge_flow_detail(self): lb_id = uuidutils.generate_uuid() lb_name = 'lb-%s' % (lb_id) - lb = logbook.LogBook(name=lb_name, uuid=lb_id) - fd = logbook.FlowDetail('test', uuid=uuidutils.generate_uuid()) + lb = models.LogBook(name=lb_name, uuid=lb_id) + fd = models.FlowDetail('test', uuid=uuidutils.generate_uuid()) lb.add(fd) with contextlib.closing(self._get_connection()) as conn: conn.save_logbook(lb) - lb2 = logbook.LogBook(name=lb_name, uuid=lb_id) - fd2 = logbook.FlowDetail('test2', uuid=uuidutils.generate_uuid()) + lb2 = models.LogBook(name=lb_name, uuid=lb_id) + fd2 = models.FlowDetail('test2', uuid=uuidutils.generate_uuid()) lb2.add(fd2) with contextlib.closing(self._get_connection()) as conn: conn.save_logbook(lb2) @@ -244,8 +244,8 @@ class PersistenceTestMixin(object): def test_logbook_add_flow_detail(self): lb_id = uuidutils.generate_uuid() lb_name = 'lb-%s' % (lb_id) - lb = logbook.LogBook(name=lb_name, uuid=lb_id) - fd = logbook.FlowDetail('test', uuid=uuidutils.generate_uuid()) + lb = models.LogBook(name=lb_name, uuid=lb_id) + fd = models.FlowDetail('test', uuid=uuidutils.generate_uuid()) lb.add(fd) with contextlib.closing(self._get_connection()) as conn: conn.save_logbook(lb) @@ -258,8 +258,8 @@ class PersistenceTestMixin(object): def test_logbook_lazy_fetch(self): lb_id = uuidutils.generate_uuid() lb_name = 'lb-%s' % (lb_id) - lb = logbook.LogBook(name=lb_name, uuid=lb_id) - fd = logbook.FlowDetail('test', uuid=uuidutils.generate_uuid()) + lb = models.LogBook(name=lb_name, uuid=lb_id) + fd = models.FlowDetail('test', uuid=uuidutils.generate_uuid()) lb.add(fd) with contextlib.closing(self._get_connection()) as conn: conn.save_logbook(lb) @@ -271,9 +271,9 @@ class PersistenceTestMixin(object): def test_logbook_add_task_detail(self): lb_id = uuidutils.generate_uuid() lb_name = 'lb-%s' % (lb_id) - lb = logbook.LogBook(name=lb_name, uuid=lb_id) - fd = logbook.FlowDetail('test', uuid=uuidutils.generate_uuid()) - td = logbook.TaskDetail("detail-1", uuid=uuidutils.generate_uuid()) + lb = models.LogBook(name=lb_name, uuid=lb_id) + fd = models.FlowDetail('test', uuid=uuidutils.generate_uuid()) + td = models.TaskDetail("detail-1", uuid=uuidutils.generate_uuid()) td.version = '4.2' fd.add(td) lb.add(fd) @@ -298,7 +298,7 @@ class PersistenceTestMixin(object): def test_logbook_delete(self): lb_id = uuidutils.generate_uuid() lb_name = 'lb-%s' % (lb_id) - lb = logbook.LogBook(name=lb_name, uuid=lb_id) + lb = models.LogBook(name=lb_name, uuid=lb_id) with contextlib.closing(self._get_connection()) as conn: self.assertRaises(exc.NotFound, conn.destroy_logbook, lb_id) with contextlib.closing(self._get_connection()) as conn: @@ -313,10 +313,10 @@ class PersistenceTestMixin(object): def test_task_detail_retry_type_(self): lb_id = uuidutils.generate_uuid() lb_name = 'lb-%s' % (lb_id) - lb = logbook.LogBook(name=lb_name, uuid=lb_id) - fd = logbook.FlowDetail('test', uuid=uuidutils.generate_uuid()) + lb = models.LogBook(name=lb_name, uuid=lb_id) + fd = models.FlowDetail('test', uuid=uuidutils.generate_uuid()) lb.add(fd) - rd = logbook.RetryDetail("detail-1", uuid=uuidutils.generate_uuid()) + rd = models.RetryDetail("detail-1", uuid=uuidutils.generate_uuid()) rd.intention = states.REVERT fd.add(rd) @@ -330,15 +330,15 @@ class PersistenceTestMixin(object): fd2 = lb2.find(fd.uuid) rd2 = fd2.find(rd.uuid) self.assertEqual(rd2.intention, states.REVERT) - self.assertIsInstance(rd2, logbook.RetryDetail) + self.assertIsInstance(rd2, models.RetryDetail) def test_retry_detail_save_with_task_failure(self): lb_id = uuidutils.generate_uuid() lb_name = 'lb-%s' % (lb_id) - lb = logbook.LogBook(name=lb_name, uuid=lb_id) - fd = logbook.FlowDetail('test', uuid=uuidutils.generate_uuid()) + lb = models.LogBook(name=lb_name, uuid=lb_id) + fd = models.FlowDetail('test', uuid=uuidutils.generate_uuid()) lb.add(fd) - rd = logbook.RetryDetail("retry-1", uuid=uuidutils.generate_uuid()) + rd = models.RetryDetail("retry-1", uuid=uuidutils.generate_uuid()) fail = failure.Failure.from_exception(RuntimeError('fail')) rd.results.append((42, {'some-task': fail})) fd.add(rd) @@ -354,7 +354,7 @@ class PersistenceTestMixin(object): lb2 = conn.get_logbook(lb_id) fd2 = lb2.find(fd.uuid) rd2 = fd2.find(rd.uuid) - self.assertIsInstance(rd2, logbook.RetryDetail) + self.assertIsInstance(rd2, models.RetryDetail) fail2 = rd2.results[0][1].get('some-task') self.assertIsInstance(fail2, failure.Failure) self.assertTrue(fail.matches(fail2)) @@ -362,10 +362,10 @@ class PersistenceTestMixin(object): def test_retry_detail_save_intention(self): lb_id = uuidutils.generate_uuid() lb_name = 'lb-%s' % (lb_id) - lb = logbook.LogBook(name=lb_name, uuid=lb_id) - fd = logbook.FlowDetail('test', uuid=uuidutils.generate_uuid()) + lb = models.LogBook(name=lb_name, uuid=lb_id) + fd = models.FlowDetail('test', uuid=uuidutils.generate_uuid()) lb.add(fd) - rd = logbook.RetryDetail("retry-1", uuid=uuidutils.generate_uuid()) + rd = models.RetryDetail("retry-1", uuid=uuidutils.generate_uuid()) fd.add(rd) # save it @@ -385,4 +385,4 @@ class PersistenceTestMixin(object): fd2 = lb2.find(fd.uuid) rd2 = fd2.find(rd.uuid) self.assertEqual(rd2.intention, states.REVERT) - self.assertIsInstance(rd2, logbook.RetryDetail) + self.assertIsInstance(rd2, models.RetryDetail) diff --git a/taskflow/tests/unit/test_engines.py b/taskflow/tests/unit/test_engines.py index 4e38dfa54..c49804dd8 100644 --- a/taskflow/tests/unit/test_engines.py +++ b/taskflow/tests/unit/test_engines.py @@ -26,7 +26,7 @@ from taskflow import exceptions as exc from taskflow.patterns import graph_flow as gf from taskflow.patterns import linear_flow as lf from taskflow.patterns import unordered_flow as uf -from taskflow.persistence import logbook +from taskflow.persistence import models from taskflow import states from taskflow import task from taskflow import test @@ -493,7 +493,7 @@ class EngineParallelFlowTest(utils.EngineTestBase): # Create FlowDetail as if we already run task1 lb, fd = p_utils.temporary_flow_detail(self.backend) - td = logbook.TaskDetail(name='task1', uuid='42') + td = models.TaskDetail(name='task1', uuid='42') td.state = states.SUCCESS td.results = 17 fd.add(td) diff --git a/taskflow/tests/unit/test_storage.py b/taskflow/tests/unit/test_storage.py index 8793f0ffb..958d5a53c 100644 --- a/taskflow/tests/unit/test_storage.py +++ b/taskflow/tests/unit/test_storage.py @@ -21,7 +21,7 @@ from oslo_utils import uuidutils from taskflow import exceptions from taskflow.persistence import backends -from taskflow.persistence import logbook +from taskflow.persistence import models from taskflow import states from taskflow import storage from taskflow import test @@ -61,7 +61,7 @@ class StorageTestMixin(object): self.assertTrue(uuidutils.is_uuid_like(s.get_atom_uuid('my_task'))) def test_flow_name_and_uuid(self): - flow_detail = logbook.FlowDetail(name='test-fd', uuid='aaaa') + flow_detail = models.FlowDetail(name='test-fd', uuid='aaaa') s = self._get_storage(flow_detail) self.assertEqual(s.flow_name, 'test-fd') self.assertEqual(s.flow_uuid, 'aaaa') @@ -97,14 +97,14 @@ class StorageTestMixin(object): def test_get_without_save(self): _lb, flow_detail = p_utils.temporary_flow_detail(self.backend) - td = logbook.TaskDetail(name='my_task', uuid='42') + td = models.TaskDetail(name='my_task', uuid='42') flow_detail.add(td) s = self._get_storage(flow_detail) self.assertEqual('42', s.get_atom_uuid('my_task')) def test_ensure_existing_task(self): _lb, flow_detail = p_utils.temporary_flow_detail(self.backend) - td = logbook.TaskDetail(name='my_task', uuid='42') + td = models.TaskDetail(name='my_task', uuid='42') flow_detail.add(td) s = self._get_storage(flow_detail) s.ensure_atom(test_utils.NoopTask('my_task')) @@ -523,7 +523,7 @@ class StorageTestMixin(object): def test_logbook_get_unknown_atom_type(self): self.assertRaisesRegexp(TypeError, 'Unknown atom', - logbook.atom_detail_class, 'some_detail') + models.atom_detail_class, 'some_detail') def test_save_task_intention(self): s = self._get_storage() diff --git a/taskflow/utils/persistence_utils.py b/taskflow/utils/persistence_utils.py index dd304bc6e..0837afb9f 100644 --- a/taskflow/utils/persistence_utils.py +++ b/taskflow/utils/persistence_utils.py @@ -21,7 +21,7 @@ from oslo_utils import timeutils from oslo_utils import uuidutils from taskflow import logging -from taskflow.persistence import logbook +from taskflow.persistence import models from taskflow.utils import misc LOG = logging.getLogger(__name__) @@ -33,7 +33,7 @@ def temporary_log_book(backend=None): Mainly useful for tests and other use cases where a temporary logbook is needed for a short-period of time. """ - book = logbook.LogBook('tmp') + book = models.LogBook('tmp') if backend is not None: with contextlib.closing(backend.get_connection()) as conn: conn.save_logbook(book) @@ -48,7 +48,7 @@ def temporary_flow_detail(backend=None): """ flow_id = uuidutils.generate_uuid() book = temporary_log_book(backend) - book.add(logbook.FlowDetail(name='tmp-flow-detail', uuid=flow_id)) + book.add(models.FlowDetail(name='tmp-flow-detail', uuid=flow_id)) if backend is not None: with contextlib.closing(backend.get_connection()) as conn: conn.save_logbook(book) @@ -77,7 +77,7 @@ def create_flow_detail(flow, book=None, backend=None, meta=None): LOG.warn("No name provided for flow %s (id %s)", flow, flow_id) flow_name = flow_id - flow_detail = logbook.FlowDetail(name=flow_name, uuid=flow_id) + flow_detail = models.FlowDetail(name=flow_name, uuid=flow_id) if meta is not None: if flow_detail.meta is None: flow_detail.meta = {} @@ -130,7 +130,7 @@ def _format_shared(obj, indent): def pformat_atom_detail(atom_detail, indent=0): """Pretty formats a atom detail.""" - detail_type = logbook.atom_detail_type(atom_detail) + detail_type = models.atom_detail_type(atom_detail) lines = ["%s%s: '%s'" % (" " * (indent), detail_type, atom_detail.name)] lines.extend(_format_shared(atom_detail, indent=indent + 1)) lines.append("%s- version = %s"