Merge "Rename logbook module -> models module"
This commit is contained in:
commit
5fb62f3864
@ -246,7 +246,7 @@ Preparation
|
||||
|
||||
This stage (see :py:func:`~taskflow.engines.base.Engine.prepare`) starts by
|
||||
setting up the storage needed for all atoms in the compiled graph, ensuring
|
||||
that corresponding :py:class:`~taskflow.persistence.logbook.AtomDetail` (or
|
||||
that corresponding :py:class:`~taskflow.persistence.models.AtomDetail` (or
|
||||
subclass of) objects are created for each node in the graph.
|
||||
|
||||
Validation
|
||||
@ -293,7 +293,7 @@ for things like retry atom which can influence what a tasks intention should be
|
||||
:py:class:`~taskflow.engines.action_engine.analyzer.Analyzer` helper
|
||||
object which was designed to provide helper methods for this analysis). Once
|
||||
these intentions are determined and associated with each task (the intention is
|
||||
also stored in the :py:class:`~taskflow.persistence.logbook.AtomDetail` object)
|
||||
also stored in the :py:class:`~taskflow.persistence.models.AtomDetail` object)
|
||||
the :ref:`scheduling <scheduling>` stage starts.
|
||||
|
||||
.. _scheduling:
|
||||
@ -323,8 +323,8 @@ submitted to complete. Once one of the future objects completes (or fails) that
|
||||
atoms result will be examined and finalized using a
|
||||
:py:class:`~taskflow.engines.action_engine.completer.Completer` implementation.
|
||||
It typically will persist results to a provided persistence backend (saved
|
||||
into the corresponding :py:class:`~taskflow.persistence.logbook.AtomDetail`
|
||||
and :py:class:`~taskflow.persistence.logbook.FlowDetail` objects via the
|
||||
into the corresponding :py:class:`~taskflow.persistence.models.AtomDetail`
|
||||
and :py:class:`~taskflow.persistence.models.FlowDetail` objects via the
|
||||
:py:class:`~taskflow.storage.Storage` helper) and reflect
|
||||
the new state of the atom. At this point what typically happens falls into two
|
||||
categories, one for if that atom failed and one for if it did not. If the atom
|
||||
|
@ -30,7 +30,7 @@ Definitions
|
||||
Jobs
|
||||
A :py:class:`job <taskflow.jobs.base.Job>` consists of a unique identifier,
|
||||
name, and a reference to a :py:class:`logbook
|
||||
<taskflow.persistence.logbook.LogBook>` which contains the details of the
|
||||
<taskflow.persistence.models.LogBook>` which contains the details of the
|
||||
work that has been or should be/will be completed to finish the work that has
|
||||
been created for that job.
|
||||
|
||||
|
@ -40,38 +40,38 @@ On :doc:`engine <engines>` construction typically a backend (it can be
|
||||
optional) will be provided which satisfies the
|
||||
:py:class:`~taskflow.persistence.base.Backend` abstraction. Along with
|
||||
providing a backend object a
|
||||
:py:class:`~taskflow.persistence.logbook.FlowDetail` object will also be
|
||||
:py:class:`~taskflow.persistence.models.FlowDetail` object will also be
|
||||
created and provided (this object will contain the details about the flow to be
|
||||
ran) to the engine constructor (or associated :py:meth:`load()
|
||||
<taskflow.engines.helpers.load>` helper functions). Typically a
|
||||
:py:class:`~taskflow.persistence.logbook.FlowDetail` object is created from a
|
||||
:py:class:`~taskflow.persistence.logbook.LogBook` object (the book object acts
|
||||
as a type of container for :py:class:`~taskflow.persistence.logbook.FlowDetail`
|
||||
and :py:class:`~taskflow.persistence.logbook.AtomDetail` objects).
|
||||
:py:class:`~taskflow.persistence.models.FlowDetail` object is created from a
|
||||
:py:class:`~taskflow.persistence.models.LogBook` object (the book object acts
|
||||
as a type of container for :py:class:`~taskflow.persistence.models.FlowDetail`
|
||||
and :py:class:`~taskflow.persistence.models.AtomDetail` objects).
|
||||
|
||||
**Preparation**: Once an engine starts to run it will create a
|
||||
:py:class:`~taskflow.storage.Storage` object which will act as the engines
|
||||
interface to the underlying backend storage objects (it provides helper
|
||||
functions that are commonly used by the engine, avoiding repeating code when
|
||||
interacting with the provided
|
||||
:py:class:`~taskflow.persistence.logbook.FlowDetail` and
|
||||
:py:class:`~taskflow.persistence.models.FlowDetail` and
|
||||
:py:class:`~taskflow.persistence.base.Backend` objects). As an engine
|
||||
initializes it will extract (or create)
|
||||
:py:class:`~taskflow.persistence.logbook.AtomDetail` objects for each atom in
|
||||
:py:class:`~taskflow.persistence.models.AtomDetail` objects for each atom in
|
||||
the workflow the engine will be executing.
|
||||
|
||||
**Execution:** When an engine beings to execute (see :doc:`engine <engines>`
|
||||
for more of the details about how an engine goes about this process) it will
|
||||
examine any previously existing
|
||||
:py:class:`~taskflow.persistence.logbook.AtomDetail` objects to see if they can
|
||||
:py:class:`~taskflow.persistence.models.AtomDetail` objects to see if they can
|
||||
be used for resuming; see :doc:`resumption <resumption>` for more details on
|
||||
this subject. For atoms which have not finished (or did not finish correctly
|
||||
from a previous run) they will begin executing only after any dependent inputs
|
||||
are ready. This is done by analyzing the execution graph and looking at
|
||||
predecessor :py:class:`~taskflow.persistence.logbook.AtomDetail` outputs and
|
||||
predecessor :py:class:`~taskflow.persistence.models.AtomDetail` outputs and
|
||||
states (which may have been persisted in a past run). This will result in
|
||||
either using their previous information or by running those predecessors and
|
||||
saving their output to the :py:class:`~taskflow.persistence.logbook.FlowDetail`
|
||||
saving their output to the :py:class:`~taskflow.persistence.models.FlowDetail`
|
||||
and :py:class:`~taskflow.persistence.base.Backend` objects. This
|
||||
execution, analysis and interaction with the storage objects continues (what is
|
||||
described here is a simplification of what really happens; which is quite a bit
|
||||
@ -288,7 +288,7 @@ Interfaces
|
||||
Models
|
||||
======
|
||||
|
||||
.. automodule:: taskflow.persistence.logbook
|
||||
.. automodule:: taskflow.persistence.models
|
||||
|
||||
Implementations
|
||||
===============
|
||||
|
@ -46,7 +46,7 @@ name serves a special purpose in the resumption process (as well as serving a
|
||||
useful purpose when running, allowing for atom identification in the
|
||||
:doc:`notification <notifications>` process). The reason for having names is
|
||||
that an atom in a flow needs to be somehow matched with (a potentially)
|
||||
existing :py:class:`~taskflow.persistence.logbook.AtomDetail` during engine
|
||||
existing :py:class:`~taskflow.persistence.models.AtomDetail` during engine
|
||||
resumption & subsequent running.
|
||||
|
||||
The match should be:
|
||||
@ -71,9 +71,9 @@ Scenarios
|
||||
=========
|
||||
|
||||
When new flow is loaded into engine, there is no persisted data for it yet, so
|
||||
a corresponding :py:class:`~taskflow.persistence.logbook.FlowDetail` object
|
||||
a corresponding :py:class:`~taskflow.persistence.models.FlowDetail` object
|
||||
will be created, as well as a
|
||||
:py:class:`~taskflow.persistence.logbook.AtomDetail` object for each atom that
|
||||
:py:class:`~taskflow.persistence.models.AtomDetail` object for each atom that
|
||||
is contained in it. These will be immediately saved into the persistence
|
||||
backend that is configured. If no persistence backend is configured, then as
|
||||
expected nothing will be saved and the atoms and flow will be ran in a
|
||||
@ -94,7 +94,7 @@ When the factory function mentioned above returns the exact same the flow and
|
||||
atoms (no changes are performed).
|
||||
|
||||
**Runtime change:** Nothing should be done -- the engine will re-associate
|
||||
atoms with :py:class:`~taskflow.persistence.logbook.AtomDetail` objects by name
|
||||
atoms with :py:class:`~taskflow.persistence.models.AtomDetail` objects by name
|
||||
and then the engine resumes.
|
||||
|
||||
Atom was added
|
||||
@ -105,7 +105,7 @@ in (for example for changing the runtime structure of what was previously ran
|
||||
in the first run).
|
||||
|
||||
**Runtime change:** By default when the engine resumes it will notice that a
|
||||
corresponding :py:class:`~taskflow.persistence.logbook.AtomDetail` does not
|
||||
corresponding :py:class:`~taskflow.persistence.models.AtomDetail` does not
|
||||
exist and one will be created and associated.
|
||||
|
||||
Atom was removed
|
||||
@ -134,7 +134,7 @@ factory should replace this name where it was being used previously.
|
||||
exist when a new atom is added. In the future TaskFlow could make this easier
|
||||
by providing a ``upgrade()`` function that can be used to give users the
|
||||
ability to upgrade atoms before running (manual introspection & modification of
|
||||
a :py:class:`~taskflow.persistence.logbook.LogBook` can be done before engine
|
||||
a :py:class:`~taskflow.persistence.models.LogBook` can be done before engine
|
||||
loading and running to accomplish this in the meantime).
|
||||
|
||||
Atom was split in two atoms or merged
|
||||
@ -150,7 +150,7 @@ exist when a new atom is added or removed. In the future TaskFlow could make
|
||||
this easier by providing a ``migrate()`` function that can be used to give
|
||||
users the ability to migrate atoms previous data before running (manual
|
||||
introspection & modification of a
|
||||
:py:class:`~taskflow.persistence.logbook.LogBook` can be done before engine
|
||||
:py:class:`~taskflow.persistence.models.LogBook` can be done before engine
|
||||
loading and running to accomplish this in the meantime).
|
||||
|
||||
Flow structure was changed
|
||||
|
@ -31,7 +31,7 @@ sys.path.insert(0, self_dir)
|
||||
|
||||
from taskflow import engines
|
||||
from taskflow.patterns import linear_flow as lf
|
||||
from taskflow.persistence import logbook
|
||||
from taskflow.persistence import models
|
||||
from taskflow import task
|
||||
from taskflow.utils import persistence_utils as p_utils
|
||||
|
||||
@ -94,7 +94,7 @@ with eu.get_backend(backend_uri) as backend:
|
||||
# Make a flow that will blow up if the file didn't exist previously, if it
|
||||
# did exist, assume we won't blow up (and therefore this shows the undo
|
||||
# and redo that a flow will go through).
|
||||
book = logbook.LogBook("my-test")
|
||||
book = models.LogBook("my-test")
|
||||
flow = make_flow(blowup=blowup)
|
||||
eu.print_wrapped("Running")
|
||||
try:
|
||||
|
@ -42,7 +42,7 @@ from taskflow import engines
|
||||
from taskflow.jobs import backends as boards
|
||||
from taskflow.patterns import linear_flow
|
||||
from taskflow.persistence import backends as persistence
|
||||
from taskflow.persistence import logbook
|
||||
from taskflow.persistence import models
|
||||
from taskflow import task
|
||||
from taskflow.utils import threading_utils
|
||||
|
||||
@ -145,9 +145,9 @@ def generate_reviewer(client, saver, name=NAME):
|
||||
|
||||
def make_save_book(saver, review_id):
|
||||
# Record what we want to happen (sometime in the future).
|
||||
book = logbook.LogBook("book_%s" % review_id)
|
||||
detail = logbook.FlowDetail("flow_%s" % review_id,
|
||||
uuidutils.generate_uuid())
|
||||
book = models.LogBook("book_%s" % review_id)
|
||||
detail = models.FlowDetail("flow_%s" % review_id,
|
||||
uuidutils.generate_uuid())
|
||||
book.add(detail)
|
||||
# Associate the factory method we want to be called (in the future)
|
||||
# with the book, so that the conductor will be able to call into
|
||||
|
@ -34,7 +34,7 @@ from taskflow import logging
|
||||
from taskflow.persistence.backends.sqlalchemy import migration
|
||||
from taskflow.persistence.backends.sqlalchemy import tables
|
||||
from taskflow.persistence import base
|
||||
from taskflow.persistence import logbook
|
||||
from taskflow.persistence import models
|
||||
from taskflow.types import failure
|
||||
from taskflow.utils import eventlet_utils
|
||||
from taskflow.utils import misc
|
||||
@ -194,16 +194,16 @@ class _Alchemist(object):
|
||||
|
||||
@staticmethod
|
||||
def convert_flow_detail(row):
|
||||
return logbook.FlowDetail.from_dict(dict(row.items()))
|
||||
return models.FlowDetail.from_dict(dict(row.items()))
|
||||
|
||||
@staticmethod
|
||||
def convert_book(row):
|
||||
return logbook.LogBook.from_dict(dict(row.items()))
|
||||
return models.LogBook.from_dict(dict(row.items()))
|
||||
|
||||
@staticmethod
|
||||
def convert_atom_detail(row):
|
||||
row = dict(row.items())
|
||||
atom_cls = logbook.atom_detail_class(row.pop('atom_type'))
|
||||
atom_cls = models.atom_detail_class(row.pop('atom_type'))
|
||||
return atom_cls.from_dict(row)
|
||||
|
||||
def atom_query_iter(self, conn, parent_uuid):
|
||||
@ -457,7 +457,7 @@ class Connection(base.Connection):
|
||||
def _insert_atom_details(self, conn, ad, parent_uuid):
|
||||
value = ad.to_dict()
|
||||
value['parent_uuid'] = parent_uuid
|
||||
value['atom_type'] = logbook.atom_detail_type(ad)
|
||||
value['atom_type'] = models.atom_detail_type(ad)
|
||||
conn.execute(sql.insert(self._tables.atomdetails, value))
|
||||
|
||||
def _update_atom_details(self, conn, ad, e_ad):
|
||||
|
@ -29,11 +29,11 @@ down_revision = '1c783c0c2875'
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
from taskflow.persistence import logbook
|
||||
from taskflow.persistence import models
|
||||
|
||||
|
||||
def upgrade():
|
||||
atom_types = sa.Enum(*logbook.ATOM_TYPES, name='atom_types')
|
||||
atom_types = sa.Enum(*models.ATOM_TYPES, name='atom_types')
|
||||
column = sa.Column('atom_type', atom_types)
|
||||
bind = op.get_bind()
|
||||
impl = atom_types.dialect_impl(bind.dialect)
|
||||
|
@ -22,7 +22,7 @@ from oslo_utils import uuidutils
|
||||
from sqlalchemy import Table, Column, String, ForeignKey, DateTime, Enum
|
||||
from sqlalchemy import types
|
||||
|
||||
from taskflow.persistence import logbook
|
||||
from taskflow.persistence import models
|
||||
from taskflow import states
|
||||
|
||||
Tables = collections.namedtuple('Tables',
|
||||
@ -92,7 +92,7 @@ def fetch(metadata):
|
||||
default=uuidutils.generate_uuid),
|
||||
Column('failure', Json),
|
||||
Column('results', Json),
|
||||
Column('atom_type', Enum(*logbook.ATOM_TYPES,
|
||||
Column('atom_type', Enum(*models.ATOM_TYPES,
|
||||
name='atom_types')),
|
||||
Column('intention', Enum(*states.INTENTIONS,
|
||||
name='intentions')))
|
||||
|
@ -18,7 +18,7 @@ import abc
|
||||
|
||||
import six
|
||||
|
||||
from taskflow.persistence import logbook
|
||||
from taskflow.persistence import models
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
@ -125,5 +125,5 @@ class Connection(object):
|
||||
def _format_atom(atom_detail):
|
||||
return {
|
||||
'atom': atom_detail.to_dict(),
|
||||
'type': logbook.atom_detail_type(atom_detail),
|
||||
'type': models.atom_detail_type(atom_detail),
|
||||
}
|
||||
|
@ -1,7 +1,6 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
|
||||
# Copyright (C) 2013 Rackspace Hosting All Rights Reserved.
|
||||
# Copyright (C) 2015 Yahoo! Inc. All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
@ -15,878 +14,24 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import abc
|
||||
import copy
|
||||
from debtcollector import removals
|
||||
|
||||
from oslo_utils import timeutils
|
||||
from oslo_utils import uuidutils
|
||||
import six
|
||||
from taskflow.persistence import models
|
||||
|
||||
from taskflow import exceptions as exc
|
||||
from taskflow import logging
|
||||
from taskflow import states
|
||||
from taskflow.types import failure as ft
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
# TODO(harlowja): remove me in a future version, since the models
|
||||
# module is more appropriately named to what the objects in it are used for...
|
||||
removals.removed_module(__name__, replacement="'%s'" % models.__name__,
|
||||
version="0.11", removal_version='?',
|
||||
stacklevel=4)
|
||||
|
||||
|
||||
# Internal helpers...
|
||||
|
||||
|
||||
def _copy_function(deep_copy):
|
||||
if deep_copy:
|
||||
return copy.deepcopy
|
||||
else:
|
||||
return lambda x: x
|
||||
|
||||
|
||||
def _safe_marshal_time(when):
|
||||
if not when:
|
||||
return None
|
||||
return timeutils.marshall_now(now=when)
|
||||
|
||||
|
||||
def _safe_unmarshal_time(when):
|
||||
if not when:
|
||||
return None
|
||||
return timeutils.unmarshall_time(when)
|
||||
|
||||
|
||||
def _fix_meta(data):
|
||||
# Handle the case where older schemas allowed this to be non-dict by
|
||||
# correcting this case by replacing it with a dictionary when a non-dict
|
||||
# is found.
|
||||
meta = data.get('meta')
|
||||
if not isinstance(meta, dict):
|
||||
meta = {}
|
||||
return meta
|
||||
|
||||
|
||||
class LogBook(object):
|
||||
"""A collection of flow details and associated metadata.
|
||||
|
||||
Typically this class contains a collection of flow detail entries
|
||||
for a given engine (or job) so that those entities can track what 'work'
|
||||
has been completed for resumption, reverting and miscellaneous tracking
|
||||
purposes.
|
||||
|
||||
The data contained within this class need **not** be persisted to the
|
||||
backend storage in real time. The data in this class will only be
|
||||
guaranteed to be persisted when a save occurs via some backend
|
||||
connection.
|
||||
|
||||
NOTE(harlowja): the naming of this class is analogous to a ship's log or a
|
||||
similar type of record used in detailing work that has been completed (or
|
||||
work that has not been completed).
|
||||
|
||||
:ivar created_at: A ``datetime.datetime`` object of when this logbook
|
||||
was created.
|
||||
:ivar updated_at: A ``datetime.datetime`` object of when this logbook
|
||||
was last updated at.
|
||||
:ivar meta: A dictionary of meta-data associated with this logbook.
|
||||
"""
|
||||
def __init__(self, name, uuid=None):
|
||||
if uuid:
|
||||
self._uuid = uuid
|
||||
else:
|
||||
self._uuid = uuidutils.generate_uuid()
|
||||
self._name = name
|
||||
self._flowdetails_by_id = {}
|
||||
self.created_at = timeutils.utcnow()
|
||||
self.updated_at = None
|
||||
self.meta = {}
|
||||
|
||||
def add(self, fd):
|
||||
"""Adds a new flow detail into this logbook.
|
||||
|
||||
NOTE(harlowja): if an existing flow detail exists with the same
|
||||
uuid the existing one will be overwritten with the newly provided
|
||||
one.
|
||||
|
||||
Does not *guarantee* that the details will be immediately saved.
|
||||
"""
|
||||
self._flowdetails_by_id[fd.uuid] = fd
|
||||
self.updated_at = timeutils.utcnow()
|
||||
|
||||
def find(self, flow_uuid):
|
||||
"""Locate the flow detail corresponding to the given uuid.
|
||||
|
||||
:returns: the flow detail with that uuid
|
||||
:rtype: :py:class:`.FlowDetail` (or ``None`` if not found)
|
||||
"""
|
||||
return self._flowdetails_by_id.get(flow_uuid, None)
|
||||
|
||||
def merge(self, lb, deep_copy=False):
|
||||
"""Merges the current object state with the given ones state.
|
||||
|
||||
If ``deep_copy`` is provided as truthy then the
|
||||
local object will use ``copy.deepcopy`` to replace this objects
|
||||
local attributes with the provided objects attributes (**only** if
|
||||
there is a difference between this objects attributes and the
|
||||
provided attributes). If ``deep_copy`` is falsey (the default) then a
|
||||
reference copy will occur instead when a difference is detected.
|
||||
|
||||
NOTE(harlowja): If the provided object is this object itself
|
||||
then **no** merging is done. Also note that this does **not** merge
|
||||
the flow details contained in either.
|
||||
|
||||
:returns: this logbook (freshly merged with the incoming object)
|
||||
:rtype: :py:class:`.LogBook`
|
||||
"""
|
||||
if lb is self:
|
||||
return self
|
||||
copy_fn = _copy_function(deep_copy)
|
||||
if self.meta != lb.meta:
|
||||
self.meta = copy_fn(lb.meta)
|
||||
if lb.created_at != self.created_at:
|
||||
self.created_at = copy_fn(lb.created_at)
|
||||
if lb.updated_at != self.updated_at:
|
||||
self.updated_at = copy_fn(lb.updated_at)
|
||||
return self
|
||||
|
||||
def to_dict(self, marshal_time=False):
|
||||
"""Translates the internal state of this object to a ``dict``.
|
||||
|
||||
NOTE(harlowja): The returned ``dict`` does **not** include any
|
||||
contained flow details.
|
||||
|
||||
:returns: this logbook in ``dict`` form
|
||||
"""
|
||||
if not marshal_time:
|
||||
marshal_fn = lambda x: x
|
||||
else:
|
||||
marshal_fn = _safe_marshal_time
|
||||
return {
|
||||
'name': self.name,
|
||||
'meta': self.meta,
|
||||
'uuid': self.uuid,
|
||||
'updated_at': marshal_fn(self.updated_at),
|
||||
'created_at': marshal_fn(self.created_at),
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, data, unmarshal_time=False):
|
||||
"""Translates the given ``dict`` into an instance of this class.
|
||||
|
||||
NOTE(harlowja): the ``dict`` provided should come from a prior
|
||||
call to :meth:`.to_dict`.
|
||||
|
||||
:returns: a new logbook
|
||||
:rtype: :py:class:`.LogBook`
|
||||
"""
|
||||
if not unmarshal_time:
|
||||
unmarshal_fn = lambda x: x
|
||||
else:
|
||||
unmarshal_fn = _safe_unmarshal_time
|
||||
obj = cls(data['name'], uuid=data['uuid'])
|
||||
obj.updated_at = unmarshal_fn(data['updated_at'])
|
||||
obj.created_at = unmarshal_fn(data['created_at'])
|
||||
obj.meta = _fix_meta(data)
|
||||
return obj
|
||||
|
||||
@property
|
||||
def uuid(self):
|
||||
"""The unique identifer of this logbook."""
|
||||
return self._uuid
|
||||
|
||||
@property
|
||||
def name(self):
|
||||
"""The name of this logbook."""
|
||||
return self._name
|
||||
|
||||
def __iter__(self):
|
||||
for fd in six.itervalues(self._flowdetails_by_id):
|
||||
yield fd
|
||||
|
||||
def __len__(self):
|
||||
return len(self._flowdetails_by_id)
|
||||
|
||||
def copy(self, retain_contents=True):
|
||||
"""Copies this logbook.
|
||||
|
||||
Creates a shallow copy of this logbook. If this logbook contains
|
||||
flow details and ``retain_contents`` is truthy (the default) then
|
||||
the flow details container will be shallow copied (the flow details
|
||||
contained there-in will **not** be copied). If ``retain_contents`` is
|
||||
falsey then the copied logbook will have **no** contained flow
|
||||
details (but it will have the rest of the local objects attributes
|
||||
copied).
|
||||
|
||||
:returns: a new logbook
|
||||
:rtype: :py:class:`.LogBook`
|
||||
"""
|
||||
clone = copy.copy(self)
|
||||
if not retain_contents:
|
||||
clone._flowdetails_by_id = {}
|
||||
else:
|
||||
clone._flowdetails_by_id = self._flowdetails_by_id.copy()
|
||||
if self.meta:
|
||||
clone.meta = self.meta.copy()
|
||||
return clone
|
||||
|
||||
|
||||
class FlowDetail(object):
|
||||
"""A collection of atom details and associated metadata.
|
||||
|
||||
Typically this class contains a collection of atom detail entries that
|
||||
represent the atoms in a given flow structure (along with any other needed
|
||||
metadata relevant to that flow).
|
||||
|
||||
The data contained within this class need **not** be persisted to the
|
||||
backend storage in real time. The data in this class will only be
|
||||
guaranteed to be persisted when a save (or update) occurs via some backend
|
||||
connection.
|
||||
|
||||
:ivar state: The state of the flow associated with this flow detail.
|
||||
:ivar meta: A dictionary of meta-data associated with this flow detail.
|
||||
"""
|
||||
def __init__(self, name, uuid):
|
||||
self._uuid = uuid
|
||||
self._name = name
|
||||
self._atomdetails_by_id = {}
|
||||
self.state = None
|
||||
self.meta = {}
|
||||
|
||||
def update(self, fd):
|
||||
"""Updates the objects state to be the same as the given one.
|
||||
|
||||
This will assign the private and public attributes of the given
|
||||
flow detail directly to this object (replacing any existing
|
||||
attributes in this object; even if they are the **same**).
|
||||
|
||||
NOTE(harlowja): If the provided object is this object itself
|
||||
then **no** update is done.
|
||||
|
||||
:returns: this flow detail
|
||||
:rtype: :py:class:`.FlowDetail`
|
||||
"""
|
||||
if fd is self:
|
||||
return self
|
||||
self._atomdetails_by_id = fd._atomdetails_by_id
|
||||
self.state = fd.state
|
||||
self.meta = fd.meta
|
||||
return self
|
||||
|
||||
def merge(self, fd, deep_copy=False):
|
||||
"""Merges the current object state with the given one's state.
|
||||
|
||||
If ``deep_copy`` is provided as truthy then the
|
||||
local object will use ``copy.deepcopy`` to replace this objects
|
||||
local attributes with the provided objects attributes (**only** if
|
||||
there is a difference between this objects attributes and the
|
||||
provided attributes). If ``deep_copy`` is falsey (the default) then a
|
||||
reference copy will occur instead when a difference is detected.
|
||||
|
||||
NOTE(harlowja): If the provided object is this object itself
|
||||
then **no** merging is done. Also this does **not** merge the atom
|
||||
details contained in either.
|
||||
|
||||
:returns: this flow detail (freshly merged with the incoming object)
|
||||
:rtype: :py:class:`.FlowDetail`
|
||||
"""
|
||||
if fd is self:
|
||||
return self
|
||||
copy_fn = _copy_function(deep_copy)
|
||||
if self.meta != fd.meta:
|
||||
self.meta = copy_fn(fd.meta)
|
||||
if self.state != fd.state:
|
||||
# NOTE(imelnikov): states are just strings, no need to copy.
|
||||
self.state = fd.state
|
||||
return self
|
||||
|
||||
def copy(self, retain_contents=True):
|
||||
"""Copies this flow detail.
|
||||
|
||||
Creates a shallow copy of this flow detail. If this detail contains
|
||||
flow details and ``retain_contents`` is truthy (the default) then
|
||||
the atom details container will be shallow copied (the atom details
|
||||
contained there-in will **not** be copied). If ``retain_contents`` is
|
||||
falsey then the copied flow detail will have **no** contained atom
|
||||
details (but it will have the rest of the local objects attributes
|
||||
copied).
|
||||
|
||||
:returns: a new flow detail
|
||||
:rtype: :py:class:`.FlowDetail`
|
||||
"""
|
||||
clone = copy.copy(self)
|
||||
if not retain_contents:
|
||||
clone._atomdetails_by_id = {}
|
||||
else:
|
||||
clone._atomdetails_by_id = self._atomdetails_by_id.copy()
|
||||
if self.meta:
|
||||
clone.meta = self.meta.copy()
|
||||
return clone
|
||||
|
||||
def to_dict(self):
|
||||
"""Translates the internal state of this object to a ``dict``.
|
||||
|
||||
NOTE(harlowja): The returned ``dict`` does **not** include any
|
||||
contained atom details.
|
||||
|
||||
:returns: this flow detail in ``dict`` form
|
||||
"""
|
||||
return {
|
||||
'name': self.name,
|
||||
'meta': self.meta,
|
||||
'state': self.state,
|
||||
'uuid': self.uuid,
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, data):
|
||||
"""Translates the given ``dict`` into an instance of this class.
|
||||
|
||||
NOTE(harlowja): the ``dict`` provided should come from a prior
|
||||
call to :meth:`.to_dict`.
|
||||
|
||||
:returns: a new flow detail
|
||||
:rtype: :py:class:`.FlowDetail`
|
||||
"""
|
||||
obj = cls(data['name'], data['uuid'])
|
||||
obj.state = data.get('state')
|
||||
obj.meta = _fix_meta(data)
|
||||
return obj
|
||||
|
||||
def add(self, ad):
|
||||
"""Adds a new atom detail into this flow detail.
|
||||
|
||||
NOTE(harlowja): if an existing atom detail exists with the same
|
||||
uuid the existing one will be overwritten with the newly provided
|
||||
one.
|
||||
|
||||
Does not *guarantee* that the details will be immediately saved.
|
||||
"""
|
||||
self._atomdetails_by_id[ad.uuid] = ad
|
||||
|
||||
def find(self, ad_uuid):
|
||||
"""Locate the atom detail corresponding to the given uuid.
|
||||
|
||||
:returns: the atom detail with that uuid
|
||||
:rtype: :py:class:`.AtomDetail` (or ``None`` if not found)
|
||||
"""
|
||||
return self._atomdetails_by_id.get(ad_uuid)
|
||||
|
||||
@property
|
||||
def uuid(self):
|
||||
"""The unique identifer of this flow detail."""
|
||||
return self._uuid
|
||||
|
||||
@property
|
||||
def name(self):
|
||||
"""The name of this flow detail."""
|
||||
return self._name
|
||||
|
||||
def __iter__(self):
|
||||
for ad in six.itervalues(self._atomdetails_by_id):
|
||||
yield ad
|
||||
|
||||
def __len__(self):
|
||||
return len(self._atomdetails_by_id)
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class AtomDetail(object):
|
||||
"""A collection of atom specific runtime information and metadata.
|
||||
|
||||
This is a base **abstract** class that contains attributes that are used
|
||||
to connect a atom to the persistence layer before, during, or after it is
|
||||
running. It includes any results it may have produced, any state that it
|
||||
may be in (for example ``FAILURE``), any exception that occurred when
|
||||
running, and any associated stacktrace that may have occurring during an
|
||||
exception being thrown. It may also contain any other metadata that
|
||||
should also be stored along-side the details about the connected atom.
|
||||
|
||||
The data contained within this class need **not** be persisted to the
|
||||
backend storage in real time. The data in this class will only be
|
||||
guaranteed to be persisted when a save (or update) occurs via some backend
|
||||
connection.
|
||||
|
||||
:ivar state: The state of the atom associated with this atom detail.
|
||||
:ivar intention: The execution strategy of the atom associated
|
||||
with this atom detail (used by an engine/others to
|
||||
determine if the associated atom needs to be
|
||||
executed, reverted, retried and so-on).
|
||||
:ivar meta: A dictionary of meta-data associated with this atom detail.
|
||||
:ivar version: A version tuple or string that represents the
|
||||
atom version this atom detail is associated with (typically
|
||||
used for introspection and any data migration
|
||||
strategies).
|
||||
:ivar results: Any results the atom produced from either its
|
||||
``execute`` method or from other sources.
|
||||
:ivar failure: If the atom failed (possibly due to its ``execute``
|
||||
method raising) this will be a
|
||||
:py:class:`~taskflow.types.failure.Failure` object that
|
||||
represents that failure (if there was no failure this
|
||||
will be set to none).
|
||||
"""
|
||||
|
||||
def __init__(self, name, uuid):
|
||||
self._uuid = uuid
|
||||
self._name = name
|
||||
self.state = None
|
||||
self.intention = states.EXECUTE
|
||||
self.results = None
|
||||
self.failure = None
|
||||
self.meta = {}
|
||||
self.version = None
|
||||
|
||||
@staticmethod
|
||||
def _was_failure(state, result):
|
||||
# Internal helper method...
|
||||
return state == states.FAILURE and isinstance(result, ft.Failure)
|
||||
|
||||
@property
|
||||
def last_results(self):
|
||||
"""Gets the atoms last result.
|
||||
|
||||
If the atom has produced many results (for example if it has been
|
||||
retried, reverted, executed and ...) this returns the last one of
|
||||
many results.
|
||||
"""
|
||||
return self.results
|
||||
|
||||
def update(self, ad):
|
||||
"""Updates the object's state to be the same as the given one.
|
||||
|
||||
This will assign the private and public attributes of the given
|
||||
atom detail directly to this object (replacing any existing
|
||||
attributes in this object; even if they are the **same**).
|
||||
|
||||
NOTE(harlowja): If the provided object is this object itself
|
||||
then **no** update is done.
|
||||
|
||||
:returns: this atom detail
|
||||
:rtype: :py:class:`.AtomDetail`
|
||||
"""
|
||||
if ad is self:
|
||||
return self
|
||||
self.state = ad.state
|
||||
self.intention = ad.intention
|
||||
self.meta = ad.meta
|
||||
self.failure = ad.failure
|
||||
self.results = ad.results
|
||||
self.version = ad.version
|
||||
return self
|
||||
|
||||
@abc.abstractmethod
|
||||
def merge(self, other, deep_copy=False):
|
||||
"""Merges the current object state with the given ones state.
|
||||
|
||||
If ``deep_copy`` is provided as truthy then the
|
||||
local object will use ``copy.deepcopy`` to replace this objects
|
||||
local attributes with the provided objects attributes (**only** if
|
||||
there is a difference between this objects attributes and the
|
||||
provided attributes). If ``deep_copy`` is falsey (the default) then a
|
||||
reference copy will occur instead when a difference is detected.
|
||||
|
||||
NOTE(harlowja): If the provided object is this object itself
|
||||
then **no** merging is done. Do note that **no** results are merged
|
||||
in this method. That operation **must** to be the responsibilty of
|
||||
subclasses to implement and override this abstract method
|
||||
and provide that merging themselves as they see fit.
|
||||
|
||||
:returns: this atom detail (freshly merged with the incoming object)
|
||||
:rtype: :py:class:`.AtomDetail`
|
||||
"""
|
||||
copy_fn = _copy_function(deep_copy)
|
||||
# NOTE(imelnikov): states and intentions are just strings,
|
||||
# so there is no need to copy them (strings are immutable in python).
|
||||
self.state = other.state
|
||||
self.intention = other.intention
|
||||
if self.failure != other.failure:
|
||||
# NOTE(imelnikov): we can't just deep copy Failures, as they
|
||||
# contain tracebacks, which are not copyable.
|
||||
if other.failure:
|
||||
if deep_copy:
|
||||
self.failure = other.failure.copy()
|
||||
else:
|
||||
self.failure = other.failure
|
||||
else:
|
||||
self.failure = None
|
||||
if self.meta != other.meta:
|
||||
self.meta = copy_fn(other.meta)
|
||||
if self.version != other.version:
|
||||
self.version = copy_fn(other.version)
|
||||
return self
|
||||
|
||||
@abc.abstractmethod
|
||||
def put(self, state, result):
|
||||
"""Puts a result (acquired in the given state) into this detail."""
|
||||
|
||||
def to_dict(self):
|
||||
"""Translates the internal state of this object to a ``dict``.
|
||||
|
||||
:returns: this atom detail in ``dict`` form
|
||||
"""
|
||||
if self.failure:
|
||||
failure = self.failure.to_dict()
|
||||
else:
|
||||
failure = None
|
||||
return {
|
||||
'failure': failure,
|
||||
'meta': self.meta,
|
||||
'name': self.name,
|
||||
'results': self.results,
|
||||
'state': self.state,
|
||||
'version': self.version,
|
||||
'intention': self.intention,
|
||||
'uuid': self.uuid,
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, data):
|
||||
"""Translates the given ``dict`` into an instance of this class.
|
||||
|
||||
NOTE(harlowja): the ``dict`` provided should come from a prior
|
||||
call to :meth:`.to_dict`.
|
||||
|
||||
:returns: a new atom detail
|
||||
:rtype: :py:class:`.AtomDetail`
|
||||
"""
|
||||
obj = cls(data['name'], data['uuid'])
|
||||
obj.state = data.get('state')
|
||||
obj.intention = data.get('intention')
|
||||
obj.results = data.get('results')
|
||||
obj.version = data.get('version')
|
||||
obj.meta = _fix_meta(data)
|
||||
failure = data.get('failure')
|
||||
if failure:
|
||||
obj.failure = ft.Failure.from_dict(failure)
|
||||
return obj
|
||||
|
||||
@property
|
||||
def uuid(self):
|
||||
"""The unique identifer of this atom detail."""
|
||||
return self._uuid
|
||||
|
||||
@property
|
||||
def name(self):
|
||||
"""The name of this atom detail."""
|
||||
return self._name
|
||||
|
||||
@abc.abstractmethod
|
||||
def reset(self, state):
|
||||
"""Resets this atom detail and sets ``state`` attribute value."""
|
||||
|
||||
@abc.abstractmethod
|
||||
def copy(self):
|
||||
"""Copies this atom detail."""
|
||||
|
||||
|
||||
class TaskDetail(AtomDetail):
|
||||
"""A task detail (an atom detail typically associated with a |tt| atom).
|
||||
|
||||
.. |tt| replace:: :py:class:`~taskflow.task.BaseTask`
|
||||
"""
|
||||
|
||||
def reset(self, state):
|
||||
"""Resets this task detail and sets ``state`` attribute value.
|
||||
|
||||
This sets any previously set ``results`` and ``failure`` attributes
|
||||
back to ``None`` and sets the state to the provided one, as well as
|
||||
setting this task details ``intention`` attribute to ``EXECUTE``.
|
||||
"""
|
||||
self.results = None
|
||||
self.failure = None
|
||||
self.state = state
|
||||
self.intention = states.EXECUTE
|
||||
|
||||
def put(self, state, result):
|
||||
"""Puts a result (acquired in the given state) into this detail.
|
||||
|
||||
If the result is a :py:class:`~taskflow.types.failure.Failure` object
|
||||
then the ``failure`` attribute will be set (and the ``results``
|
||||
attribute will be set to ``None``); if the result is not a
|
||||
:py:class:`~taskflow.types.failure.Failure` object then the
|
||||
``results`` attribute will be set (and the ``failure`` attribute
|
||||
will be set to ``None``). In either case the ``state``
|
||||
attribute will be set to the provided state.
|
||||
"""
|
||||
was_altered = False
|
||||
if self.state != state:
|
||||
self.state = state
|
||||
was_altered = True
|
||||
if self._was_failure(state, result):
|
||||
if self.failure != result:
|
||||
self.failure = result
|
||||
was_altered = True
|
||||
if self.results is not None:
|
||||
self.results = None
|
||||
was_altered = True
|
||||
else:
|
||||
# We don't really have the ability to determine equality of
|
||||
# task (user) results at the current time, without making
|
||||
# potentially bad guesses, so assume the task detail always needs
|
||||
# to be saved if they are not exactly equivalent...
|
||||
if self.results is not result:
|
||||
self.results = result
|
||||
was_altered = True
|
||||
if self.failure is not None:
|
||||
self.failure = None
|
||||
was_altered = True
|
||||
return was_altered
|
||||
|
||||
def merge(self, other, deep_copy=False):
|
||||
"""Merges the current task detail with the given one.
|
||||
|
||||
NOTE(harlowja): This merge does **not** copy and replace
|
||||
the ``results`` attribute if it differs. Instead the current
|
||||
objects ``results`` attribute directly becomes (via assignment) the
|
||||
other objects ``results`` attribute. Also note that if the provided
|
||||
object is this object itself then **no** merging is done.
|
||||
|
||||
See: https://bugs.launchpad.net/taskflow/+bug/1452978 for
|
||||
what happens if this is copied at a deeper level (for example by
|
||||
using ``copy.deepcopy`` or by using ``copy.copy``).
|
||||
|
||||
:returns: this task detail (freshly merged with the incoming object)
|
||||
:rtype: :py:class:`.TaskDetail`
|
||||
"""
|
||||
if not isinstance(other, TaskDetail):
|
||||
raise exc.NotImplementedError("Can only merge with other"
|
||||
" task details")
|
||||
if other is self:
|
||||
return self
|
||||
super(TaskDetail, self).merge(other, deep_copy=deep_copy)
|
||||
if self.results != other.results:
|
||||
self.results = other.results
|
||||
return self
|
||||
|
||||
def copy(self):
|
||||
"""Copies this task detail.
|
||||
|
||||
Creates a shallow copy of this task detail (any meta-data and
|
||||
version information that this object maintains is shallow
|
||||
copied via ``copy.copy``).
|
||||
|
||||
NOTE(harlowja): This copy does **not** perform ``copy.copy`` on
|
||||
the ``results`` attribute of this object (before assigning to the
|
||||
copy). Instead the current objects ``results`` attribute directly
|
||||
becomes (via assignment) the copied objects ``results`` attribute.
|
||||
|
||||
See: https://bugs.launchpad.net/taskflow/+bug/1452978 for
|
||||
what happens if this is copied at a deeper level (for example by
|
||||
using ``copy.deepcopy`` or by using ``copy.copy``).
|
||||
|
||||
:returns: a new task detail
|
||||
:rtype: :py:class:`.TaskDetail`
|
||||
"""
|
||||
clone = copy.copy(self)
|
||||
clone.results = self.results
|
||||
if self.meta:
|
||||
clone.meta = self.meta.copy()
|
||||
if self.version:
|
||||
clone.version = copy.copy(self.version)
|
||||
return clone
|
||||
|
||||
|
||||
class RetryDetail(AtomDetail):
|
||||
"""A retry detail (an atom detail typically associated with a |rt| atom).
|
||||
|
||||
.. |rt| replace:: :py:class:`~taskflow.retry.Retry`
|
||||
"""
|
||||
|
||||
def __init__(self, name, uuid):
|
||||
super(RetryDetail, self).__init__(name, uuid)
|
||||
self.results = []
|
||||
|
||||
def reset(self, state):
|
||||
"""Resets this retry detail and sets ``state`` attribute value.
|
||||
|
||||
This sets any previously added ``results`` back to an empty list
|
||||
and resets the ``failure`` attribute back to ``None`` and sets the
|
||||
state to the provided one, as well as setting this atom
|
||||
details ``intention`` attribute to ``EXECUTE``.
|
||||
"""
|
||||
self.results = []
|
||||
self.failure = None
|
||||
self.state = state
|
||||
self.intention = states.EXECUTE
|
||||
|
||||
def copy(self):
|
||||
"""Copies this retry detail.
|
||||
|
||||
Creates a shallow copy of this retry detail (any meta-data and
|
||||
version information that this object maintains is shallow
|
||||
copied via ``copy.copy``).
|
||||
|
||||
NOTE(harlowja): This copy does **not** copy
|
||||
the incoming objects ``results`` attribute. Instead this
|
||||
objects ``results`` attribute list is iterated over and a new list
|
||||
is constructed with each ``(data, failures)`` element in that list
|
||||
having its ``failures`` (a dictionary of each named
|
||||
:py:class:`~taskflow.types.failure.Failure` object that
|
||||
occured) copied but its ``data`` is left untouched. After
|
||||
this is done that new list becomes (via assignment) the cloned
|
||||
objects ``results`` attribute.
|
||||
|
||||
See: https://bugs.launchpad.net/taskflow/+bug/1452978 for
|
||||
what happens if the ``data`` in ``results`` is copied at a
|
||||
deeper level (for example by using ``copy.deepcopy`` or by
|
||||
using ``copy.copy``).
|
||||
|
||||
:returns: a new retry detail
|
||||
:rtype: :py:class:`.RetryDetail`
|
||||
"""
|
||||
clone = copy.copy(self)
|
||||
results = []
|
||||
# NOTE(imelnikov): we can't just deep copy Failures, as they
|
||||
# contain tracebacks, which are not copyable.
|
||||
for (data, failures) in self.results:
|
||||
copied_failures = {}
|
||||
for (key, failure) in six.iteritems(failures):
|
||||
copied_failures[key] = failure
|
||||
results.append((data, copied_failures))
|
||||
clone.results = results
|
||||
if self.meta:
|
||||
clone.meta = self.meta.copy()
|
||||
if self.version:
|
||||
clone.version = copy.copy(self.version)
|
||||
return clone
|
||||
|
||||
@property
|
||||
def last_results(self):
|
||||
"""The last result that was produced."""
|
||||
try:
|
||||
return self.results[-1][0]
|
||||
except IndexError:
|
||||
exc.raise_with_cause(exc.NotFound, "Last results not found")
|
||||
|
||||
@property
|
||||
def last_failures(self):
|
||||
"""The last failure dictionary that was produced.
|
||||
|
||||
NOTE(harlowja): This is **not** the same as the
|
||||
local ``failure`` attribute as the obtained failure dictionary in
|
||||
the ``results`` attribute (which is what this returns) is from
|
||||
associated atom failures (which is different from the directly
|
||||
related failure of the retry unit associated with this
|
||||
atom detail).
|
||||
"""
|
||||
try:
|
||||
return self.results[-1][1]
|
||||
except IndexError:
|
||||
exc.raise_with_cause(exc.NotFound, "Last failures not found")
|
||||
|
||||
def put(self, state, result):
|
||||
"""Puts a result (acquired in the given state) into this detail.
|
||||
|
||||
If the result is a :py:class:`~taskflow.types.failure.Failure` object
|
||||
then the ``failure`` attribute will be set; if the result is not a
|
||||
:py:class:`~taskflow.types.failure.Failure` object then the
|
||||
``results`` attribute will be appended to (and the ``failure``
|
||||
attribute will be set to ``None``). In either case the ``state``
|
||||
attribute will be set to the provided state.
|
||||
"""
|
||||
# Do not clean retry history (only on reset does this happen).
|
||||
self.state = state
|
||||
if self._was_failure(state, result):
|
||||
self.failure = result
|
||||
else:
|
||||
self.results.append((result, {}))
|
||||
self.failure = None
|
||||
return True
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, data):
|
||||
"""Translates the given ``dict`` into an instance of this class."""
|
||||
|
||||
def decode_results(results):
|
||||
if not results:
|
||||
return []
|
||||
new_results = []
|
||||
for (data, failures) in results:
|
||||
new_failures = {}
|
||||
for (key, data) in six.iteritems(failures):
|
||||
new_failures[key] = ft.Failure.from_dict(data)
|
||||
new_results.append((data, new_failures))
|
||||
return new_results
|
||||
|
||||
obj = super(RetryDetail, cls).from_dict(data)
|
||||
obj.results = decode_results(obj.results)
|
||||
return obj
|
||||
|
||||
def to_dict(self):
|
||||
"""Translates the internal state of this object to a ``dict``."""
|
||||
|
||||
def encode_results(results):
|
||||
if not results:
|
||||
return []
|
||||
new_results = []
|
||||
for (data, failures) in results:
|
||||
new_failures = {}
|
||||
for (key, failure) in six.iteritems(failures):
|
||||
new_failures[key] = failure.to_dict()
|
||||
new_results.append((data, new_failures))
|
||||
return new_results
|
||||
|
||||
base = super(RetryDetail, self).to_dict()
|
||||
base['results'] = encode_results(base.get('results'))
|
||||
return base
|
||||
|
||||
def merge(self, other, deep_copy=False):
|
||||
"""Merges the current retry detail with the given one.
|
||||
|
||||
NOTE(harlowja): This merge does **not** deep copy
|
||||
the incoming objects ``results`` attribute (if it differs). Instead
|
||||
the incoming objects ``results`` attribute list is **always** iterated
|
||||
over and a new list is constructed with
|
||||
each ``(data, failures)`` element in that list having
|
||||
its ``failures`` (a dictionary of each named
|
||||
:py:class:`~taskflow.types.failure.Failure` objects that
|
||||
occurred) copied but its ``data`` is left untouched. After
|
||||
this is done that new list becomes (via assignment) this
|
||||
objects ``results`` attribute. Also note that if the provided object
|
||||
is this object itself then **no** merging is done.
|
||||
|
||||
See: https://bugs.launchpad.net/taskflow/+bug/1452978 for
|
||||
what happens if the ``data`` in ``results`` is copied at a
|
||||
deeper level (for example by using ``copy.deepcopy`` or by
|
||||
using ``copy.copy``).
|
||||
|
||||
:returns: this retry detail (freshly merged with the incoming object)
|
||||
:rtype: :py:class:`.RetryDetail`
|
||||
"""
|
||||
if not isinstance(other, RetryDetail):
|
||||
raise exc.NotImplementedError("Can only merge with other"
|
||||
" retry details")
|
||||
if other is self:
|
||||
return self
|
||||
super(RetryDetail, self).merge(other, deep_copy=deep_copy)
|
||||
results = []
|
||||
# NOTE(imelnikov): we can't just deep copy Failures, as they
|
||||
# contain tracebacks, which are not copyable.
|
||||
for (data, failures) in other.results:
|
||||
copied_failures = {}
|
||||
for (key, failure) in six.iteritems(failures):
|
||||
if deep_copy:
|
||||
copied_failures[key] = failure.copy()
|
||||
else:
|
||||
copied_failures[key] = failure
|
||||
results.append((data, copied_failures))
|
||||
self.results = results
|
||||
return self
|
||||
|
||||
|
||||
_DETAIL_TO_NAME = {
|
||||
RetryDetail: 'RETRY_DETAIL',
|
||||
TaskDetail: 'TASK_DETAIL',
|
||||
}
|
||||
_NAME_TO_DETAIL = dict((name, cls)
|
||||
for (cls, name) in six.iteritems(_DETAIL_TO_NAME))
|
||||
ATOM_TYPES = list(six.iterkeys(_NAME_TO_DETAIL))
|
||||
|
||||
|
||||
def atom_detail_class(atom_type):
|
||||
try:
|
||||
return _NAME_TO_DETAIL[atom_type]
|
||||
except KeyError:
|
||||
raise TypeError("Unknown atom type '%s'" % (atom_type))
|
||||
|
||||
|
||||
def atom_detail_type(atom_detail):
|
||||
try:
|
||||
return _DETAIL_TO_NAME[type(atom_detail)]
|
||||
except KeyError:
|
||||
raise TypeError("Unknown atom '%s' (%s)"
|
||||
% (atom_detail, type(atom_detail)))
|
||||
# Keep alias classes/functions... around until this module is removed.
|
||||
LogBook = models.LogBook
|
||||
FlowDetail = models.FlowDetail
|
||||
AtomDetail = models.AtomDetail
|
||||
TaskDetail = models.TaskDetail
|
||||
RetryDetail = models.RetryDetail
|
||||
atom_detail_type = models.atom_detail_type
|
||||
atom_detail_class = models.atom_detail_class
|
||||
ATOM_TYPES = models.ATOM_TYPES
|
||||
|
892
taskflow/persistence/models.py
Normal file
892
taskflow/persistence/models.py
Normal file
@ -0,0 +1,892 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
|
||||
# Copyright (C) 2013 Rackspace Hosting All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import abc
|
||||
import copy
|
||||
|
||||
from oslo_utils import timeutils
|
||||
from oslo_utils import uuidutils
|
||||
import six
|
||||
|
||||
from taskflow import exceptions as exc
|
||||
from taskflow import logging
|
||||
from taskflow import states
|
||||
from taskflow.types import failure as ft
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# Internal helpers...
|
||||
|
||||
|
||||
def _copy_function(deep_copy):
|
||||
if deep_copy:
|
||||
return copy.deepcopy
|
||||
else:
|
||||
return lambda x: x
|
||||
|
||||
|
||||
def _safe_marshal_time(when):
|
||||
if not when:
|
||||
return None
|
||||
return timeutils.marshall_now(now=when)
|
||||
|
||||
|
||||
def _safe_unmarshal_time(when):
|
||||
if not when:
|
||||
return None
|
||||
return timeutils.unmarshall_time(when)
|
||||
|
||||
|
||||
def _fix_meta(data):
|
||||
# Handle the case where older schemas allowed this to be non-dict by
|
||||
# correcting this case by replacing it with a dictionary when a non-dict
|
||||
# is found.
|
||||
meta = data.get('meta')
|
||||
if not isinstance(meta, dict):
|
||||
meta = {}
|
||||
return meta
|
||||
|
||||
|
||||
class LogBook(object):
|
||||
"""A collection of flow details and associated metadata.
|
||||
|
||||
Typically this class contains a collection of flow detail entries
|
||||
for a given engine (or job) so that those entities can track what 'work'
|
||||
has been completed for resumption, reverting and miscellaneous tracking
|
||||
purposes.
|
||||
|
||||
The data contained within this class need **not** be persisted to the
|
||||
backend storage in real time. The data in this class will only be
|
||||
guaranteed to be persisted when a save occurs via some backend
|
||||
connection.
|
||||
|
||||
NOTE(harlowja): the naming of this class is analogous to a ship's log or a
|
||||
similar type of record used in detailing work that has been completed (or
|
||||
work that has not been completed).
|
||||
|
||||
:ivar created_at: A ``datetime.datetime`` object of when this logbook
|
||||
was created.
|
||||
:ivar updated_at: A ``datetime.datetime`` object of when this logbook
|
||||
was last updated at.
|
||||
:ivar meta: A dictionary of meta-data associated with this logbook.
|
||||
"""
|
||||
def __init__(self, name, uuid=None):
|
||||
if uuid:
|
||||
self._uuid = uuid
|
||||
else:
|
||||
self._uuid = uuidutils.generate_uuid()
|
||||
self._name = name
|
||||
self._flowdetails_by_id = {}
|
||||
self.created_at = timeutils.utcnow()
|
||||
self.updated_at = None
|
||||
self.meta = {}
|
||||
|
||||
def add(self, fd):
|
||||
"""Adds a new flow detail into this logbook.
|
||||
|
||||
NOTE(harlowja): if an existing flow detail exists with the same
|
||||
uuid the existing one will be overwritten with the newly provided
|
||||
one.
|
||||
|
||||
Does not *guarantee* that the details will be immediately saved.
|
||||
"""
|
||||
self._flowdetails_by_id[fd.uuid] = fd
|
||||
self.updated_at = timeutils.utcnow()
|
||||
|
||||
def find(self, flow_uuid):
|
||||
"""Locate the flow detail corresponding to the given uuid.
|
||||
|
||||
:returns: the flow detail with that uuid
|
||||
:rtype: :py:class:`.FlowDetail` (or ``None`` if not found)
|
||||
"""
|
||||
return self._flowdetails_by_id.get(flow_uuid, None)
|
||||
|
||||
def merge(self, lb, deep_copy=False):
|
||||
"""Merges the current object state with the given ones state.
|
||||
|
||||
If ``deep_copy`` is provided as truthy then the
|
||||
local object will use ``copy.deepcopy`` to replace this objects
|
||||
local attributes with the provided objects attributes (**only** if
|
||||
there is a difference between this objects attributes and the
|
||||
provided attributes). If ``deep_copy`` is falsey (the default) then a
|
||||
reference copy will occur instead when a difference is detected.
|
||||
|
||||
NOTE(harlowja): If the provided object is this object itself
|
||||
then **no** merging is done. Also note that this does **not** merge
|
||||
the flow details contained in either.
|
||||
|
||||
:returns: this logbook (freshly merged with the incoming object)
|
||||
:rtype: :py:class:`.LogBook`
|
||||
"""
|
||||
if lb is self:
|
||||
return self
|
||||
copy_fn = _copy_function(deep_copy)
|
||||
if self.meta != lb.meta:
|
||||
self.meta = copy_fn(lb.meta)
|
||||
if lb.created_at != self.created_at:
|
||||
self.created_at = copy_fn(lb.created_at)
|
||||
if lb.updated_at != self.updated_at:
|
||||
self.updated_at = copy_fn(lb.updated_at)
|
||||
return self
|
||||
|
||||
def to_dict(self, marshal_time=False):
|
||||
"""Translates the internal state of this object to a ``dict``.
|
||||
|
||||
NOTE(harlowja): The returned ``dict`` does **not** include any
|
||||
contained flow details.
|
||||
|
||||
:returns: this logbook in ``dict`` form
|
||||
"""
|
||||
if not marshal_time:
|
||||
marshal_fn = lambda x: x
|
||||
else:
|
||||
marshal_fn = _safe_marshal_time
|
||||
return {
|
||||
'name': self.name,
|
||||
'meta': self.meta,
|
||||
'uuid': self.uuid,
|
||||
'updated_at': marshal_fn(self.updated_at),
|
||||
'created_at': marshal_fn(self.created_at),
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, data, unmarshal_time=False):
|
||||
"""Translates the given ``dict`` into an instance of this class.
|
||||
|
||||
NOTE(harlowja): the ``dict`` provided should come from a prior
|
||||
call to :meth:`.to_dict`.
|
||||
|
||||
:returns: a new logbook
|
||||
:rtype: :py:class:`.LogBook`
|
||||
"""
|
||||
if not unmarshal_time:
|
||||
unmarshal_fn = lambda x: x
|
||||
else:
|
||||
unmarshal_fn = _safe_unmarshal_time
|
||||
obj = cls(data['name'], uuid=data['uuid'])
|
||||
obj.updated_at = unmarshal_fn(data['updated_at'])
|
||||
obj.created_at = unmarshal_fn(data['created_at'])
|
||||
obj.meta = _fix_meta(data)
|
||||
return obj
|
||||
|
||||
@property
|
||||
def uuid(self):
|
||||
"""The unique identifer of this logbook."""
|
||||
return self._uuid
|
||||
|
||||
@property
|
||||
def name(self):
|
||||
"""The name of this logbook."""
|
||||
return self._name
|
||||
|
||||
def __iter__(self):
|
||||
for fd in six.itervalues(self._flowdetails_by_id):
|
||||
yield fd
|
||||
|
||||
def __len__(self):
|
||||
return len(self._flowdetails_by_id)
|
||||
|
||||
def copy(self, retain_contents=True):
|
||||
"""Copies this logbook.
|
||||
|
||||
Creates a shallow copy of this logbook. If this logbook contains
|
||||
flow details and ``retain_contents`` is truthy (the default) then
|
||||
the flow details container will be shallow copied (the flow details
|
||||
contained there-in will **not** be copied). If ``retain_contents`` is
|
||||
falsey then the copied logbook will have **no** contained flow
|
||||
details (but it will have the rest of the local objects attributes
|
||||
copied).
|
||||
|
||||
:returns: a new logbook
|
||||
:rtype: :py:class:`.LogBook`
|
||||
"""
|
||||
clone = copy.copy(self)
|
||||
if not retain_contents:
|
||||
clone._flowdetails_by_id = {}
|
||||
else:
|
||||
clone._flowdetails_by_id = self._flowdetails_by_id.copy()
|
||||
if self.meta:
|
||||
clone.meta = self.meta.copy()
|
||||
return clone
|
||||
|
||||
|
||||
class FlowDetail(object):
|
||||
"""A collection of atom details and associated metadata.
|
||||
|
||||
Typically this class contains a collection of atom detail entries that
|
||||
represent the atoms in a given flow structure (along with any other needed
|
||||
metadata relevant to that flow).
|
||||
|
||||
The data contained within this class need **not** be persisted to the
|
||||
backend storage in real time. The data in this class will only be
|
||||
guaranteed to be persisted when a save (or update) occurs via some backend
|
||||
connection.
|
||||
|
||||
:ivar state: The state of the flow associated with this flow detail.
|
||||
:ivar meta: A dictionary of meta-data associated with this flow detail.
|
||||
"""
|
||||
def __init__(self, name, uuid):
|
||||
self._uuid = uuid
|
||||
self._name = name
|
||||
self._atomdetails_by_id = {}
|
||||
self.state = None
|
||||
self.meta = {}
|
||||
|
||||
def update(self, fd):
|
||||
"""Updates the objects state to be the same as the given one.
|
||||
|
||||
This will assign the private and public attributes of the given
|
||||
flow detail directly to this object (replacing any existing
|
||||
attributes in this object; even if they are the **same**).
|
||||
|
||||
NOTE(harlowja): If the provided object is this object itself
|
||||
then **no** update is done.
|
||||
|
||||
:returns: this flow detail
|
||||
:rtype: :py:class:`.FlowDetail`
|
||||
"""
|
||||
if fd is self:
|
||||
return self
|
||||
self._atomdetails_by_id = fd._atomdetails_by_id
|
||||
self.state = fd.state
|
||||
self.meta = fd.meta
|
||||
return self
|
||||
|
||||
def merge(self, fd, deep_copy=False):
|
||||
"""Merges the current object state with the given one's state.
|
||||
|
||||
If ``deep_copy`` is provided as truthy then the
|
||||
local object will use ``copy.deepcopy`` to replace this objects
|
||||
local attributes with the provided objects attributes (**only** if
|
||||
there is a difference between this objects attributes and the
|
||||
provided attributes). If ``deep_copy`` is falsey (the default) then a
|
||||
reference copy will occur instead when a difference is detected.
|
||||
|
||||
NOTE(harlowja): If the provided object is this object itself
|
||||
then **no** merging is done. Also this does **not** merge the atom
|
||||
details contained in either.
|
||||
|
||||
:returns: this flow detail (freshly merged with the incoming object)
|
||||
:rtype: :py:class:`.FlowDetail`
|
||||
"""
|
||||
if fd is self:
|
||||
return self
|
||||
copy_fn = _copy_function(deep_copy)
|
||||
if self.meta != fd.meta:
|
||||
self.meta = copy_fn(fd.meta)
|
||||
if self.state != fd.state:
|
||||
# NOTE(imelnikov): states are just strings, no need to copy.
|
||||
self.state = fd.state
|
||||
return self
|
||||
|
||||
def copy(self, retain_contents=True):
|
||||
"""Copies this flow detail.
|
||||
|
||||
Creates a shallow copy of this flow detail. If this detail contains
|
||||
flow details and ``retain_contents`` is truthy (the default) then
|
||||
the atom details container will be shallow copied (the atom details
|
||||
contained there-in will **not** be copied). If ``retain_contents`` is
|
||||
falsey then the copied flow detail will have **no** contained atom
|
||||
details (but it will have the rest of the local objects attributes
|
||||
copied).
|
||||
|
||||
:returns: a new flow detail
|
||||
:rtype: :py:class:`.FlowDetail`
|
||||
"""
|
||||
clone = copy.copy(self)
|
||||
if not retain_contents:
|
||||
clone._atomdetails_by_id = {}
|
||||
else:
|
||||
clone._atomdetails_by_id = self._atomdetails_by_id.copy()
|
||||
if self.meta:
|
||||
clone.meta = self.meta.copy()
|
||||
return clone
|
||||
|
||||
def to_dict(self):
|
||||
"""Translates the internal state of this object to a ``dict``.
|
||||
|
||||
NOTE(harlowja): The returned ``dict`` does **not** include any
|
||||
contained atom details.
|
||||
|
||||
:returns: this flow detail in ``dict`` form
|
||||
"""
|
||||
return {
|
||||
'name': self.name,
|
||||
'meta': self.meta,
|
||||
'state': self.state,
|
||||
'uuid': self.uuid,
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, data):
|
||||
"""Translates the given ``dict`` into an instance of this class.
|
||||
|
||||
NOTE(harlowja): the ``dict`` provided should come from a prior
|
||||
call to :meth:`.to_dict`.
|
||||
|
||||
:returns: a new flow detail
|
||||
:rtype: :py:class:`.FlowDetail`
|
||||
"""
|
||||
obj = cls(data['name'], data['uuid'])
|
||||
obj.state = data.get('state')
|
||||
obj.meta = _fix_meta(data)
|
||||
return obj
|
||||
|
||||
def add(self, ad):
|
||||
"""Adds a new atom detail into this flow detail.
|
||||
|
||||
NOTE(harlowja): if an existing atom detail exists with the same
|
||||
uuid the existing one will be overwritten with the newly provided
|
||||
one.
|
||||
|
||||
Does not *guarantee* that the details will be immediately saved.
|
||||
"""
|
||||
self._atomdetails_by_id[ad.uuid] = ad
|
||||
|
||||
def find(self, ad_uuid):
|
||||
"""Locate the atom detail corresponding to the given uuid.
|
||||
|
||||
:returns: the atom detail with that uuid
|
||||
:rtype: :py:class:`.AtomDetail` (or ``None`` if not found)
|
||||
"""
|
||||
return self._atomdetails_by_id.get(ad_uuid)
|
||||
|
||||
@property
|
||||
def uuid(self):
|
||||
"""The unique identifer of this flow detail."""
|
||||
return self._uuid
|
||||
|
||||
@property
|
||||
def name(self):
|
||||
"""The name of this flow detail."""
|
||||
return self._name
|
||||
|
||||
def __iter__(self):
|
||||
for ad in six.itervalues(self._atomdetails_by_id):
|
||||
yield ad
|
||||
|
||||
def __len__(self):
|
||||
return len(self._atomdetails_by_id)
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class AtomDetail(object):
|
||||
"""A collection of atom specific runtime information and metadata.
|
||||
|
||||
This is a base **abstract** class that contains attributes that are used
|
||||
to connect a atom to the persistence layer before, during, or after it is
|
||||
running. It includes any results it may have produced, any state that it
|
||||
may be in (for example ``FAILURE``), any exception that occurred when
|
||||
running, and any associated stacktrace that may have occurring during an
|
||||
exception being thrown. It may also contain any other metadata that
|
||||
should also be stored along-side the details about the connected atom.
|
||||
|
||||
The data contained within this class need **not** be persisted to the
|
||||
backend storage in real time. The data in this class will only be
|
||||
guaranteed to be persisted when a save (or update) occurs via some backend
|
||||
connection.
|
||||
|
||||
:ivar state: The state of the atom associated with this atom detail.
|
||||
:ivar intention: The execution strategy of the atom associated
|
||||
with this atom detail (used by an engine/others to
|
||||
determine if the associated atom needs to be
|
||||
executed, reverted, retried and so-on).
|
||||
:ivar meta: A dictionary of meta-data associated with this atom detail.
|
||||
:ivar version: A version tuple or string that represents the
|
||||
atom version this atom detail is associated with (typically
|
||||
used for introspection and any data migration
|
||||
strategies).
|
||||
:ivar results: Any results the atom produced from either its
|
||||
``execute`` method or from other sources.
|
||||
:ivar failure: If the atom failed (possibly due to its ``execute``
|
||||
method raising) this will be a
|
||||
:py:class:`~taskflow.types.failure.Failure` object that
|
||||
represents that failure (if there was no failure this
|
||||
will be set to none).
|
||||
"""
|
||||
|
||||
def __init__(self, name, uuid):
|
||||
self._uuid = uuid
|
||||
self._name = name
|
||||
self.state = None
|
||||
self.intention = states.EXECUTE
|
||||
self.results = None
|
||||
self.failure = None
|
||||
self.meta = {}
|
||||
self.version = None
|
||||
|
||||
@staticmethod
|
||||
def _was_failure(state, result):
|
||||
# Internal helper method...
|
||||
return state == states.FAILURE and isinstance(result, ft.Failure)
|
||||
|
||||
@property
|
||||
def last_results(self):
|
||||
"""Gets the atoms last result.
|
||||
|
||||
If the atom has produced many results (for example if it has been
|
||||
retried, reverted, executed and ...) this returns the last one of
|
||||
many results.
|
||||
"""
|
||||
return self.results
|
||||
|
||||
def update(self, ad):
|
||||
"""Updates the object's state to be the same as the given one.
|
||||
|
||||
This will assign the private and public attributes of the given
|
||||
atom detail directly to this object (replacing any existing
|
||||
attributes in this object; even if they are the **same**).
|
||||
|
||||
NOTE(harlowja): If the provided object is this object itself
|
||||
then **no** update is done.
|
||||
|
||||
:returns: this atom detail
|
||||
:rtype: :py:class:`.AtomDetail`
|
||||
"""
|
||||
if ad is self:
|
||||
return self
|
||||
self.state = ad.state
|
||||
self.intention = ad.intention
|
||||
self.meta = ad.meta
|
||||
self.failure = ad.failure
|
||||
self.results = ad.results
|
||||
self.version = ad.version
|
||||
return self
|
||||
|
||||
@abc.abstractmethod
|
||||
def merge(self, other, deep_copy=False):
|
||||
"""Merges the current object state with the given ones state.
|
||||
|
||||
If ``deep_copy`` is provided as truthy then the
|
||||
local object will use ``copy.deepcopy`` to replace this objects
|
||||
local attributes with the provided objects attributes (**only** if
|
||||
there is a difference between this objects attributes and the
|
||||
provided attributes). If ``deep_copy`` is falsey (the default) then a
|
||||
reference copy will occur instead when a difference is detected.
|
||||
|
||||
NOTE(harlowja): If the provided object is this object itself
|
||||
then **no** merging is done. Do note that **no** results are merged
|
||||
in this method. That operation **must** to be the responsibilty of
|
||||
subclasses to implement and override this abstract method
|
||||
and provide that merging themselves as they see fit.
|
||||
|
||||
:returns: this atom detail (freshly merged with the incoming object)
|
||||
:rtype: :py:class:`.AtomDetail`
|
||||
"""
|
||||
copy_fn = _copy_function(deep_copy)
|
||||
# NOTE(imelnikov): states and intentions are just strings,
|
||||
# so there is no need to copy them (strings are immutable in python).
|
||||
self.state = other.state
|
||||
self.intention = other.intention
|
||||
if self.failure != other.failure:
|
||||
# NOTE(imelnikov): we can't just deep copy Failures, as they
|
||||
# contain tracebacks, which are not copyable.
|
||||
if other.failure:
|
||||
if deep_copy:
|
||||
self.failure = other.failure.copy()
|
||||
else:
|
||||
self.failure = other.failure
|
||||
else:
|
||||
self.failure = None
|
||||
if self.meta != other.meta:
|
||||
self.meta = copy_fn(other.meta)
|
||||
if self.version != other.version:
|
||||
self.version = copy_fn(other.version)
|
||||
return self
|
||||
|
||||
@abc.abstractmethod
|
||||
def put(self, state, result):
|
||||
"""Puts a result (acquired in the given state) into this detail."""
|
||||
|
||||
def to_dict(self):
|
||||
"""Translates the internal state of this object to a ``dict``.
|
||||
|
||||
:returns: this atom detail in ``dict`` form
|
||||
"""
|
||||
if self.failure:
|
||||
failure = self.failure.to_dict()
|
||||
else:
|
||||
failure = None
|
||||
return {
|
||||
'failure': failure,
|
||||
'meta': self.meta,
|
||||
'name': self.name,
|
||||
'results': self.results,
|
||||
'state': self.state,
|
||||
'version': self.version,
|
||||
'intention': self.intention,
|
||||
'uuid': self.uuid,
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, data):
|
||||
"""Translates the given ``dict`` into an instance of this class.
|
||||
|
||||
NOTE(harlowja): the ``dict`` provided should come from a prior
|
||||
call to :meth:`.to_dict`.
|
||||
|
||||
:returns: a new atom detail
|
||||
:rtype: :py:class:`.AtomDetail`
|
||||
"""
|
||||
obj = cls(data['name'], data['uuid'])
|
||||
obj.state = data.get('state')
|
||||
obj.intention = data.get('intention')
|
||||
obj.results = data.get('results')
|
||||
obj.version = data.get('version')
|
||||
obj.meta = _fix_meta(data)
|
||||
failure = data.get('failure')
|
||||
if failure:
|
||||
obj.failure = ft.Failure.from_dict(failure)
|
||||
return obj
|
||||
|
||||
@property
|
||||
def uuid(self):
|
||||
"""The unique identifer of this atom detail."""
|
||||
return self._uuid
|
||||
|
||||
@property
|
||||
def name(self):
|
||||
"""The name of this atom detail."""
|
||||
return self._name
|
||||
|
||||
@abc.abstractmethod
|
||||
def reset(self, state):
|
||||
"""Resets this atom detail and sets ``state`` attribute value."""
|
||||
|
||||
@abc.abstractmethod
|
||||
def copy(self):
|
||||
"""Copies this atom detail."""
|
||||
|
||||
|
||||
class TaskDetail(AtomDetail):
|
||||
"""A task detail (an atom detail typically associated with a |tt| atom).
|
||||
|
||||
.. |tt| replace:: :py:class:`~taskflow.task.BaseTask`
|
||||
"""
|
||||
|
||||
def reset(self, state):
|
||||
"""Resets this task detail and sets ``state`` attribute value.
|
||||
|
||||
This sets any previously set ``results`` and ``failure`` attributes
|
||||
back to ``None`` and sets the state to the provided one, as well as
|
||||
setting this task details ``intention`` attribute to ``EXECUTE``.
|
||||
"""
|
||||
self.results = None
|
||||
self.failure = None
|
||||
self.state = state
|
||||
self.intention = states.EXECUTE
|
||||
|
||||
def put(self, state, result):
|
||||
"""Puts a result (acquired in the given state) into this detail.
|
||||
|
||||
If the result is a :py:class:`~taskflow.types.failure.Failure` object
|
||||
then the ``failure`` attribute will be set (and the ``results``
|
||||
attribute will be set to ``None``); if the result is not a
|
||||
:py:class:`~taskflow.types.failure.Failure` object then the
|
||||
``results`` attribute will be set (and the ``failure`` attribute
|
||||
will be set to ``None``). In either case the ``state``
|
||||
attribute will be set to the provided state.
|
||||
"""
|
||||
was_altered = False
|
||||
if self.state != state:
|
||||
self.state = state
|
||||
was_altered = True
|
||||
if self._was_failure(state, result):
|
||||
if self.failure != result:
|
||||
self.failure = result
|
||||
was_altered = True
|
||||
if self.results is not None:
|
||||
self.results = None
|
||||
was_altered = True
|
||||
else:
|
||||
# We don't really have the ability to determine equality of
|
||||
# task (user) results at the current time, without making
|
||||
# potentially bad guesses, so assume the task detail always needs
|
||||
# to be saved if they are not exactly equivalent...
|
||||
if self.results is not result:
|
||||
self.results = result
|
||||
was_altered = True
|
||||
if self.failure is not None:
|
||||
self.failure = None
|
||||
was_altered = True
|
||||
return was_altered
|
||||
|
||||
def merge(self, other, deep_copy=False):
|
||||
"""Merges the current task detail with the given one.
|
||||
|
||||
NOTE(harlowja): This merge does **not** copy and replace
|
||||
the ``results`` attribute if it differs. Instead the current
|
||||
objects ``results`` attribute directly becomes (via assignment) the
|
||||
other objects ``results`` attribute. Also note that if the provided
|
||||
object is this object itself then **no** merging is done.
|
||||
|
||||
See: https://bugs.launchpad.net/taskflow/+bug/1452978 for
|
||||
what happens if this is copied at a deeper level (for example by
|
||||
using ``copy.deepcopy`` or by using ``copy.copy``).
|
||||
|
||||
:returns: this task detail (freshly merged with the incoming object)
|
||||
:rtype: :py:class:`.TaskDetail`
|
||||
"""
|
||||
if not isinstance(other, TaskDetail):
|
||||
raise exc.NotImplementedError("Can only merge with other"
|
||||
" task details")
|
||||
if other is self:
|
||||
return self
|
||||
super(TaskDetail, self).merge(other, deep_copy=deep_copy)
|
||||
if self.results != other.results:
|
||||
self.results = other.results
|
||||
return self
|
||||
|
||||
def copy(self):
|
||||
"""Copies this task detail.
|
||||
|
||||
Creates a shallow copy of this task detail (any meta-data and
|
||||
version information that this object maintains is shallow
|
||||
copied via ``copy.copy``).
|
||||
|
||||
NOTE(harlowja): This copy does **not** perform ``copy.copy`` on
|
||||
the ``results`` attribute of this object (before assigning to the
|
||||
copy). Instead the current objects ``results`` attribute directly
|
||||
becomes (via assignment) the copied objects ``results`` attribute.
|
||||
|
||||
See: https://bugs.launchpad.net/taskflow/+bug/1452978 for
|
||||
what happens if this is copied at a deeper level (for example by
|
||||
using ``copy.deepcopy`` or by using ``copy.copy``).
|
||||
|
||||
:returns: a new task detail
|
||||
:rtype: :py:class:`.TaskDetail`
|
||||
"""
|
||||
clone = copy.copy(self)
|
||||
clone.results = self.results
|
||||
if self.meta:
|
||||
clone.meta = self.meta.copy()
|
||||
if self.version:
|
||||
clone.version = copy.copy(self.version)
|
||||
return clone
|
||||
|
||||
|
||||
class RetryDetail(AtomDetail):
|
||||
"""A retry detail (an atom detail typically associated with a |rt| atom).
|
||||
|
||||
.. |rt| replace:: :py:class:`~taskflow.retry.Retry`
|
||||
"""
|
||||
|
||||
def __init__(self, name, uuid):
|
||||
super(RetryDetail, self).__init__(name, uuid)
|
||||
self.results = []
|
||||
|
||||
def reset(self, state):
|
||||
"""Resets this retry detail and sets ``state`` attribute value.
|
||||
|
||||
This sets any previously added ``results`` back to an empty list
|
||||
and resets the ``failure`` attribute back to ``None`` and sets the
|
||||
state to the provided one, as well as setting this atom
|
||||
details ``intention`` attribute to ``EXECUTE``.
|
||||
"""
|
||||
self.results = []
|
||||
self.failure = None
|
||||
self.state = state
|
||||
self.intention = states.EXECUTE
|
||||
|
||||
def copy(self):
|
||||
"""Copies this retry detail.
|
||||
|
||||
Creates a shallow copy of this retry detail (any meta-data and
|
||||
version information that this object maintains is shallow
|
||||
copied via ``copy.copy``).
|
||||
|
||||
NOTE(harlowja): This copy does **not** copy
|
||||
the incoming objects ``results`` attribute. Instead this
|
||||
objects ``results`` attribute list is iterated over and a new list
|
||||
is constructed with each ``(data, failures)`` element in that list
|
||||
having its ``failures`` (a dictionary of each named
|
||||
:py:class:`~taskflow.types.failure.Failure` object that
|
||||
occured) copied but its ``data`` is left untouched. After
|
||||
this is done that new list becomes (via assignment) the cloned
|
||||
objects ``results`` attribute.
|
||||
|
||||
See: https://bugs.launchpad.net/taskflow/+bug/1452978 for
|
||||
what happens if the ``data`` in ``results`` is copied at a
|
||||
deeper level (for example by using ``copy.deepcopy`` or by
|
||||
using ``copy.copy``).
|
||||
|
||||
:returns: a new retry detail
|
||||
:rtype: :py:class:`.RetryDetail`
|
||||
"""
|
||||
clone = copy.copy(self)
|
||||
results = []
|
||||
# NOTE(imelnikov): we can't just deep copy Failures, as they
|
||||
# contain tracebacks, which are not copyable.
|
||||
for (data, failures) in self.results:
|
||||
copied_failures = {}
|
||||
for (key, failure) in six.iteritems(failures):
|
||||
copied_failures[key] = failure
|
||||
results.append((data, copied_failures))
|
||||
clone.results = results
|
||||
if self.meta:
|
||||
clone.meta = self.meta.copy()
|
||||
if self.version:
|
||||
clone.version = copy.copy(self.version)
|
||||
return clone
|
||||
|
||||
@property
|
||||
def last_results(self):
|
||||
"""The last result that was produced."""
|
||||
try:
|
||||
return self.results[-1][0]
|
||||
except IndexError:
|
||||
exc.raise_with_cause(exc.NotFound, "Last results not found")
|
||||
|
||||
@property
|
||||
def last_failures(self):
|
||||
"""The last failure dictionary that was produced.
|
||||
|
||||
NOTE(harlowja): This is **not** the same as the
|
||||
local ``failure`` attribute as the obtained failure dictionary in
|
||||
the ``results`` attribute (which is what this returns) is from
|
||||
associated atom failures (which is different from the directly
|
||||
related failure of the retry unit associated with this
|
||||
atom detail).
|
||||
"""
|
||||
try:
|
||||
return self.results[-1][1]
|
||||
except IndexError:
|
||||
exc.raise_with_cause(exc.NotFound, "Last failures not found")
|
||||
|
||||
def put(self, state, result):
|
||||
"""Puts a result (acquired in the given state) into this detail.
|
||||
|
||||
If the result is a :py:class:`~taskflow.types.failure.Failure` object
|
||||
then the ``failure`` attribute will be set; if the result is not a
|
||||
:py:class:`~taskflow.types.failure.Failure` object then the
|
||||
``results`` attribute will be appended to (and the ``failure``
|
||||
attribute will be set to ``None``). In either case the ``state``
|
||||
attribute will be set to the provided state.
|
||||
"""
|
||||
# Do not clean retry history (only on reset does this happen).
|
||||
self.state = state
|
||||
if self._was_failure(state, result):
|
||||
self.failure = result
|
||||
else:
|
||||
self.results.append((result, {}))
|
||||
self.failure = None
|
||||
return True
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, data):
|
||||
"""Translates the given ``dict`` into an instance of this class."""
|
||||
|
||||
def decode_results(results):
|
||||
if not results:
|
||||
return []
|
||||
new_results = []
|
||||
for (data, failures) in results:
|
||||
new_failures = {}
|
||||
for (key, data) in six.iteritems(failures):
|
||||
new_failures[key] = ft.Failure.from_dict(data)
|
||||
new_results.append((data, new_failures))
|
||||
return new_results
|
||||
|
||||
obj = super(RetryDetail, cls).from_dict(data)
|
||||
obj.results = decode_results(obj.results)
|
||||
return obj
|
||||
|
||||
def to_dict(self):
|
||||
"""Translates the internal state of this object to a ``dict``."""
|
||||
|
||||
def encode_results(results):
|
||||
if not results:
|
||||
return []
|
||||
new_results = []
|
||||
for (data, failures) in results:
|
||||
new_failures = {}
|
||||
for (key, failure) in six.iteritems(failures):
|
||||
new_failures[key] = failure.to_dict()
|
||||
new_results.append((data, new_failures))
|
||||
return new_results
|
||||
|
||||
base = super(RetryDetail, self).to_dict()
|
||||
base['results'] = encode_results(base.get('results'))
|
||||
return base
|
||||
|
||||
def merge(self, other, deep_copy=False):
|
||||
"""Merges the current retry detail with the given one.
|
||||
|
||||
NOTE(harlowja): This merge does **not** deep copy
|
||||
the incoming objects ``results`` attribute (if it differs). Instead
|
||||
the incoming objects ``results`` attribute list is **always** iterated
|
||||
over and a new list is constructed with
|
||||
each ``(data, failures)`` element in that list having
|
||||
its ``failures`` (a dictionary of each named
|
||||
:py:class:`~taskflow.types.failure.Failure` objects that
|
||||
occurred) copied but its ``data`` is left untouched. After
|
||||
this is done that new list becomes (via assignment) this
|
||||
objects ``results`` attribute. Also note that if the provided object
|
||||
is this object itself then **no** merging is done.
|
||||
|
||||
See: https://bugs.launchpad.net/taskflow/+bug/1452978 for
|
||||
what happens if the ``data`` in ``results`` is copied at a
|
||||
deeper level (for example by using ``copy.deepcopy`` or by
|
||||
using ``copy.copy``).
|
||||
|
||||
:returns: this retry detail (freshly merged with the incoming object)
|
||||
:rtype: :py:class:`.RetryDetail`
|
||||
"""
|
||||
if not isinstance(other, RetryDetail):
|
||||
raise exc.NotImplementedError("Can only merge with other"
|
||||
" retry details")
|
||||
if other is self:
|
||||
return self
|
||||
super(RetryDetail, self).merge(other, deep_copy=deep_copy)
|
||||
results = []
|
||||
# NOTE(imelnikov): we can't just deep copy Failures, as they
|
||||
# contain tracebacks, which are not copyable.
|
||||
for (data, failures) in other.results:
|
||||
copied_failures = {}
|
||||
for (key, failure) in six.iteritems(failures):
|
||||
if deep_copy:
|
||||
copied_failures[key] = failure.copy()
|
||||
else:
|
||||
copied_failures[key] = failure
|
||||
results.append((data, copied_failures))
|
||||
self.results = results
|
||||
return self
|
||||
|
||||
|
||||
_DETAIL_TO_NAME = {
|
||||
RetryDetail: 'RETRY_DETAIL',
|
||||
TaskDetail: 'TASK_DETAIL',
|
||||
}
|
||||
_NAME_TO_DETAIL = dict((name, cls)
|
||||
for (cls, name) in six.iteritems(_DETAIL_TO_NAME))
|
||||
ATOM_TYPES = list(six.iterkeys(_NAME_TO_DETAIL))
|
||||
|
||||
|
||||
def atom_detail_class(atom_type):
|
||||
try:
|
||||
return _NAME_TO_DETAIL[atom_type]
|
||||
except KeyError:
|
||||
raise TypeError("Unknown atom type '%s'" % (atom_type))
|
||||
|
||||
|
||||
def atom_detail_type(atom_detail):
|
||||
try:
|
||||
return _DETAIL_TO_NAME[type(atom_detail)]
|
||||
except KeyError:
|
||||
raise TypeError("Unknown atom '%s' (%s)"
|
||||
% (atom_detail, type(atom_detail)))
|
@ -19,7 +19,7 @@ import six
|
||||
|
||||
from taskflow import exceptions as exc
|
||||
from taskflow.persistence import base
|
||||
from taskflow.persistence import logbook
|
||||
from taskflow.persistence import models
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
@ -60,23 +60,23 @@ class PathBasedConnection(base.Connection):
|
||||
|
||||
@staticmethod
|
||||
def _serialize(obj):
|
||||
if isinstance(obj, logbook.LogBook):
|
||||
if isinstance(obj, models.LogBook):
|
||||
return obj.to_dict(marshal_time=True)
|
||||
elif isinstance(obj, logbook.FlowDetail):
|
||||
elif isinstance(obj, models.FlowDetail):
|
||||
return obj.to_dict()
|
||||
elif isinstance(obj, logbook.AtomDetail):
|
||||
elif isinstance(obj, models.AtomDetail):
|
||||
return base._format_atom(obj)
|
||||
else:
|
||||
raise exc.StorageFailure("Invalid storage class %s" % type(obj))
|
||||
|
||||
@staticmethod
|
||||
def _deserialize(cls, data):
|
||||
if issubclass(cls, logbook.LogBook):
|
||||
if issubclass(cls, models.LogBook):
|
||||
return cls.from_dict(data, unmarshal_time=True)
|
||||
elif issubclass(cls, logbook.FlowDetail):
|
||||
elif issubclass(cls, models.FlowDetail):
|
||||
return cls.from_dict(data)
|
||||
elif issubclass(cls, logbook.AtomDetail):
|
||||
atom_class = logbook.atom_detail_class(data['type'])
|
||||
elif issubclass(cls, models.AtomDetail):
|
||||
atom_class = models.atom_detail_class(data['type'])
|
||||
return atom_class.from_dict(data['atom'])
|
||||
else:
|
||||
raise exc.StorageFailure("Invalid storage class %s" % cls)
|
||||
@ -130,11 +130,11 @@ class PathBasedConnection(base.Connection):
|
||||
"""Context manager that yields a transaction"""
|
||||
|
||||
def _get_obj_path(self, obj):
|
||||
if isinstance(obj, logbook.LogBook):
|
||||
if isinstance(obj, models.LogBook):
|
||||
path = self.book_path
|
||||
elif isinstance(obj, logbook.FlowDetail):
|
||||
elif isinstance(obj, models.FlowDetail):
|
||||
path = self.flow_path
|
||||
elif isinstance(obj, logbook.AtomDetail):
|
||||
elif isinstance(obj, models.AtomDetail):
|
||||
path = self.atom_path
|
||||
else:
|
||||
raise exc.StorageFailure("Invalid storage class %s" % type(obj))
|
||||
@ -159,7 +159,7 @@ class PathBasedConnection(base.Connection):
|
||||
def get_logbook(self, book_uuid, lazy=False):
|
||||
book_path = self._join_path(self.book_path, book_uuid)
|
||||
book_data = self._get_item(book_path)
|
||||
book = self._deserialize(logbook.LogBook, book_data)
|
||||
book = self._deserialize(models.LogBook, book_data)
|
||||
if not lazy:
|
||||
for flow_details in self.get_flows_for_book(book_uuid):
|
||||
book.add(flow_details)
|
||||
@ -185,7 +185,7 @@ class PathBasedConnection(base.Connection):
|
||||
def get_flow_details(self, flow_uuid, lazy=False):
|
||||
flow_path = self._join_path(self.flow_path, flow_uuid)
|
||||
flow_data = self._get_item(flow_path)
|
||||
flow_details = self._deserialize(logbook.FlowDetail, flow_data)
|
||||
flow_details = self._deserialize(models.FlowDetail, flow_data)
|
||||
if not lazy:
|
||||
for atom_details in self.get_atoms_for_flow(flow_uuid):
|
||||
flow_details.add(atom_details)
|
||||
@ -216,7 +216,7 @@ class PathBasedConnection(base.Connection):
|
||||
def get_atom_details(self, atom_uuid):
|
||||
atom_path = self._join_path(self.atom_path, atom_uuid)
|
||||
atom_data = self._get_item(atom_path)
|
||||
return self._deserialize(logbook.AtomDetail, atom_data)
|
||||
return self._deserialize(models.AtomDetail, atom_data)
|
||||
|
||||
def update_atom_details(self, atom_detail, ignore_missing=False):
|
||||
with self._transaction() as transaction:
|
||||
|
@ -24,7 +24,7 @@ import six
|
||||
from taskflow import exceptions
|
||||
from taskflow import logging
|
||||
from taskflow.persistence.backends import impl_memory
|
||||
from taskflow.persistence import logbook
|
||||
from taskflow.persistence import models
|
||||
from taskflow import retry
|
||||
from taskflow import states
|
||||
from taskflow import task
|
||||
@ -153,8 +153,8 @@ class Storage(object):
|
||||
self._injected_args = {}
|
||||
self._lock = fasteners.ReaderWriterLock()
|
||||
self._ensure_matchers = [
|
||||
((task.BaseTask,), (logbook.TaskDetail, 'Task')),
|
||||
((retry.Retry,), (logbook.RetryDetail, 'Retry')),
|
||||
((task.BaseTask,), (models.TaskDetail, 'Task')),
|
||||
((retry.Retry,), (models.RetryDetail, 'Retry')),
|
||||
]
|
||||
if scope_fetcher is None:
|
||||
scope_fetcher = lambda atom_name: None
|
||||
@ -171,7 +171,7 @@ class Storage(object):
|
||||
for ad in self._flowdetail)
|
||||
try:
|
||||
source, _clone = self._atomdetail_by_name(
|
||||
self.injector_name, expected_type=logbook.TaskDetail)
|
||||
self.injector_name, expected_type=models.TaskDetail)
|
||||
except exceptions.NotFound:
|
||||
pass
|
||||
else:
|
||||
@ -399,7 +399,7 @@ class Storage(object):
|
||||
else:
|
||||
update_with[META_PROGRESS_DETAILS] = None
|
||||
self._update_atom_metadata(task_name, update_with,
|
||||
expected_type=logbook.TaskDetail)
|
||||
expected_type=models.TaskDetail)
|
||||
|
||||
@fasteners.read_locked
|
||||
def get_task_progress(self, task_name):
|
||||
@ -409,7 +409,7 @@ class Storage(object):
|
||||
:returns: current task progress value
|
||||
"""
|
||||
source, _clone = self._atomdetail_by_name(
|
||||
task_name, expected_type=logbook.TaskDetail)
|
||||
task_name, expected_type=models.TaskDetail)
|
||||
try:
|
||||
return source.meta[META_PROGRESS]
|
||||
except KeyError:
|
||||
@ -424,7 +424,7 @@ class Storage(object):
|
||||
dict
|
||||
"""
|
||||
source, _clone = self._atomdetail_by_name(
|
||||
task_name, expected_type=logbook.TaskDetail)
|
||||
task_name, expected_type=models.TaskDetail)
|
||||
try:
|
||||
return source.meta[META_PROGRESS_DETAILS]
|
||||
except KeyError:
|
||||
@ -468,7 +468,7 @@ class Storage(object):
|
||||
def save_retry_failure(self, retry_name, failed_atom_name, failure):
|
||||
"""Save subflow failure to retry controller history."""
|
||||
source, clone = self._atomdetail_by_name(
|
||||
retry_name, expected_type=logbook.RetryDetail, clone=True)
|
||||
retry_name, expected_type=models.RetryDetail, clone=True)
|
||||
try:
|
||||
failures = clone.last_failures
|
||||
except exceptions.NotFound:
|
||||
@ -485,7 +485,7 @@ class Storage(object):
|
||||
def cleanup_retry_history(self, retry_name, state):
|
||||
"""Cleanup history of retry atom with given name."""
|
||||
source, clone = self._atomdetail_by_name(
|
||||
retry_name, expected_type=logbook.RetryDetail, clone=True)
|
||||
retry_name, expected_type=models.RetryDetail, clone=True)
|
||||
clone.state = state
|
||||
clone.results = []
|
||||
self._with_connection(self._save_atom_detail, source, clone)
|
||||
@ -625,7 +625,7 @@ class Storage(object):
|
||||
try:
|
||||
source, clone = self._atomdetail_by_name(
|
||||
self.injector_name,
|
||||
expected_type=logbook.TaskDetail,
|
||||
expected_type=models.TaskDetail,
|
||||
clone=True)
|
||||
except exceptions.NotFound:
|
||||
# Ensure we have our special task detail...
|
||||
@ -633,7 +633,7 @@ class Storage(object):
|
||||
# TODO(harlowja): get this removed when
|
||||
# https://review.openstack.org/#/c/165645/ merges.
|
||||
source = self._create_atom_detail(self.injector_name,
|
||||
logbook.TaskDetail,
|
||||
models.TaskDetail,
|
||||
atom_state=None)
|
||||
fd_source, fd_clone = self._fetch_flowdetail(clone=True)
|
||||
fd_clone.add(source)
|
||||
@ -974,7 +974,7 @@ class Storage(object):
|
||||
def get_retry_history(self, retry_name):
|
||||
"""Fetch a single retrys history."""
|
||||
source, _clone = self._atomdetail_by_name(
|
||||
retry_name, expected_type=logbook.RetryDetail)
|
||||
retry_name, expected_type=models.RetryDetail)
|
||||
return self._translate_into_history(source)
|
||||
|
||||
@fasteners.read_locked
|
||||
@ -982,7 +982,7 @@ class Storage(object):
|
||||
"""Fetch all retrys histories."""
|
||||
histories = []
|
||||
for ad in self._flowdetail:
|
||||
if isinstance(ad, logbook.RetryDetail):
|
||||
if isinstance(ad, models.RetryDetail):
|
||||
histories.append((ad.name,
|
||||
self._translate_into_history(ad)))
|
||||
return histories
|
||||
|
@ -19,7 +19,7 @@ import contextlib
|
||||
from oslo_utils import uuidutils
|
||||
|
||||
from taskflow import exceptions as exc
|
||||
from taskflow.persistence import logbook
|
||||
from taskflow.persistence import models
|
||||
from taskflow import states
|
||||
from taskflow.types import failure
|
||||
|
||||
@ -31,15 +31,15 @@ class PersistenceTestMixin(object):
|
||||
def test_task_detail_update_not_existing(self):
|
||||
lb_id = uuidutils.generate_uuid()
|
||||
lb_name = 'lb-%s' % (lb_id)
|
||||
lb = logbook.LogBook(name=lb_name, uuid=lb_id)
|
||||
fd = logbook.FlowDetail('test', uuid=uuidutils.generate_uuid())
|
||||
lb = models.LogBook(name=lb_name, uuid=lb_id)
|
||||
fd = models.FlowDetail('test', uuid=uuidutils.generate_uuid())
|
||||
lb.add(fd)
|
||||
td = logbook.TaskDetail("detail-1", uuid=uuidutils.generate_uuid())
|
||||
td = models.TaskDetail("detail-1", uuid=uuidutils.generate_uuid())
|
||||
fd.add(td)
|
||||
with contextlib.closing(self._get_connection()) as conn:
|
||||
conn.save_logbook(lb)
|
||||
|
||||
td2 = logbook.TaskDetail("detail-1", uuid=uuidutils.generate_uuid())
|
||||
td2 = models.TaskDetail("detail-1", uuid=uuidutils.generate_uuid())
|
||||
fd.add(td2)
|
||||
with contextlib.closing(self._get_connection()) as conn:
|
||||
conn.update_flow_details(fd)
|
||||
@ -53,13 +53,13 @@ class PersistenceTestMixin(object):
|
||||
def test_flow_detail_update_not_existing(self):
|
||||
lb_id = uuidutils.generate_uuid()
|
||||
lb_name = 'lb-%s' % (lb_id)
|
||||
lb = logbook.LogBook(name=lb_name, uuid=lb_id)
|
||||
fd = logbook.FlowDetail('test', uuid=uuidutils.generate_uuid())
|
||||
lb = models.LogBook(name=lb_name, uuid=lb_id)
|
||||
fd = models.FlowDetail('test', uuid=uuidutils.generate_uuid())
|
||||
lb.add(fd)
|
||||
with contextlib.closing(self._get_connection()) as conn:
|
||||
conn.save_logbook(lb)
|
||||
|
||||
fd2 = logbook.FlowDetail('test-2', uuid=uuidutils.generate_uuid())
|
||||
fd2 = models.FlowDetail('test-2', uuid=uuidutils.generate_uuid())
|
||||
lb.add(fd2)
|
||||
with contextlib.closing(self._get_connection()) as conn:
|
||||
conn.save_logbook(lb)
|
||||
@ -73,7 +73,7 @@ class PersistenceTestMixin(object):
|
||||
lb_id = uuidutils.generate_uuid()
|
||||
lb_meta = {'1': 2}
|
||||
lb_name = 'lb-%s' % (lb_id)
|
||||
lb = logbook.LogBook(name=lb_name, uuid=lb_id)
|
||||
lb = models.LogBook(name=lb_name, uuid=lb_id)
|
||||
lb.meta = lb_meta
|
||||
|
||||
# Should not already exist
|
||||
@ -94,8 +94,8 @@ class PersistenceTestMixin(object):
|
||||
def test_flow_detail_save(self):
|
||||
lb_id = uuidutils.generate_uuid()
|
||||
lb_name = 'lb-%s' % (lb_id)
|
||||
lb = logbook.LogBook(name=lb_name, uuid=lb_id)
|
||||
fd = logbook.FlowDetail('test', uuid=uuidutils.generate_uuid())
|
||||
lb = models.LogBook(name=lb_name, uuid=lb_id)
|
||||
fd = models.FlowDetail('test', uuid=uuidutils.generate_uuid())
|
||||
lb.add(fd)
|
||||
|
||||
# Ensure we can't save it since its owning logbook hasn't been
|
||||
@ -113,8 +113,8 @@ class PersistenceTestMixin(object):
|
||||
def test_flow_detail_meta_update(self):
|
||||
lb_id = uuidutils.generate_uuid()
|
||||
lb_name = 'lb-%s' % (lb_id)
|
||||
lb = logbook.LogBook(name=lb_name, uuid=lb_id)
|
||||
fd = logbook.FlowDetail('test', uuid=uuidutils.generate_uuid())
|
||||
lb = models.LogBook(name=lb_name, uuid=lb_id)
|
||||
fd = models.FlowDetail('test', uuid=uuidutils.generate_uuid())
|
||||
fd.meta = {'test': 42}
|
||||
lb.add(fd)
|
||||
|
||||
@ -133,9 +133,9 @@ class PersistenceTestMixin(object):
|
||||
def test_flow_detail_lazy_fetch(self):
|
||||
lb_id = uuidutils.generate_uuid()
|
||||
lb_name = 'lb-%s' % (lb_id)
|
||||
lb = logbook.LogBook(name=lb_name, uuid=lb_id)
|
||||
fd = logbook.FlowDetail('test', uuid=uuidutils.generate_uuid())
|
||||
td = logbook.TaskDetail("detail-1", uuid=uuidutils.generate_uuid())
|
||||
lb = models.LogBook(name=lb_name, uuid=lb_id)
|
||||
fd = models.FlowDetail('test', uuid=uuidutils.generate_uuid())
|
||||
td = models.TaskDetail("detail-1", uuid=uuidutils.generate_uuid())
|
||||
td.version = '4.2'
|
||||
fd.add(td)
|
||||
lb.add(fd)
|
||||
@ -149,10 +149,10 @@ class PersistenceTestMixin(object):
|
||||
def test_task_detail_save(self):
|
||||
lb_id = uuidutils.generate_uuid()
|
||||
lb_name = 'lb-%s' % (lb_id)
|
||||
lb = logbook.LogBook(name=lb_name, uuid=lb_id)
|
||||
fd = logbook.FlowDetail('test', uuid=uuidutils.generate_uuid())
|
||||
lb = models.LogBook(name=lb_name, uuid=lb_id)
|
||||
fd = models.FlowDetail('test', uuid=uuidutils.generate_uuid())
|
||||
lb.add(fd)
|
||||
td = logbook.TaskDetail("detail-1", uuid=uuidutils.generate_uuid())
|
||||
td = models.TaskDetail("detail-1", uuid=uuidutils.generate_uuid())
|
||||
fd.add(td)
|
||||
|
||||
# Ensure we can't save it since its owning logbook hasn't been
|
||||
@ -171,10 +171,10 @@ class PersistenceTestMixin(object):
|
||||
def test_task_detail_meta_update(self):
|
||||
lb_id = uuidutils.generate_uuid()
|
||||
lb_name = 'lb-%s' % (lb_id)
|
||||
lb = logbook.LogBook(name=lb_name, uuid=lb_id)
|
||||
fd = logbook.FlowDetail('test', uuid=uuidutils.generate_uuid())
|
||||
lb = models.LogBook(name=lb_name, uuid=lb_id)
|
||||
fd = models.FlowDetail('test', uuid=uuidutils.generate_uuid())
|
||||
lb.add(fd)
|
||||
td = logbook.TaskDetail("detail-1", uuid=uuidutils.generate_uuid())
|
||||
td = models.TaskDetail("detail-1", uuid=uuidutils.generate_uuid())
|
||||
td.meta = {'test': 42}
|
||||
fd.add(td)
|
||||
|
||||
@ -192,15 +192,15 @@ class PersistenceTestMixin(object):
|
||||
fd2 = lb2.find(fd.uuid)
|
||||
td2 = fd2.find(td.uuid)
|
||||
self.assertEqual(td2.meta.get('test'), 43)
|
||||
self.assertIsInstance(td2, logbook.TaskDetail)
|
||||
self.assertIsInstance(td2, models.TaskDetail)
|
||||
|
||||
def test_task_detail_with_failure(self):
|
||||
lb_id = uuidutils.generate_uuid()
|
||||
lb_name = 'lb-%s' % (lb_id)
|
||||
lb = logbook.LogBook(name=lb_name, uuid=lb_id)
|
||||
fd = logbook.FlowDetail('test', uuid=uuidutils.generate_uuid())
|
||||
lb = models.LogBook(name=lb_name, uuid=lb_id)
|
||||
fd = models.FlowDetail('test', uuid=uuidutils.generate_uuid())
|
||||
lb.add(fd)
|
||||
td = logbook.TaskDetail("detail-1", uuid=uuidutils.generate_uuid())
|
||||
td = models.TaskDetail("detail-1", uuid=uuidutils.generate_uuid())
|
||||
|
||||
try:
|
||||
raise RuntimeError('Woot!')
|
||||
@ -222,18 +222,18 @@ class PersistenceTestMixin(object):
|
||||
self.assertEqual(td2.failure.exception_str, 'Woot!')
|
||||
self.assertIs(td2.failure.check(RuntimeError), RuntimeError)
|
||||
self.assertEqual(td2.failure.traceback_str, td.failure.traceback_str)
|
||||
self.assertIsInstance(td2, logbook.TaskDetail)
|
||||
self.assertIsInstance(td2, models.TaskDetail)
|
||||
|
||||
def test_logbook_merge_flow_detail(self):
|
||||
lb_id = uuidutils.generate_uuid()
|
||||
lb_name = 'lb-%s' % (lb_id)
|
||||
lb = logbook.LogBook(name=lb_name, uuid=lb_id)
|
||||
fd = logbook.FlowDetail('test', uuid=uuidutils.generate_uuid())
|
||||
lb = models.LogBook(name=lb_name, uuid=lb_id)
|
||||
fd = models.FlowDetail('test', uuid=uuidutils.generate_uuid())
|
||||
lb.add(fd)
|
||||
with contextlib.closing(self._get_connection()) as conn:
|
||||
conn.save_logbook(lb)
|
||||
lb2 = logbook.LogBook(name=lb_name, uuid=lb_id)
|
||||
fd2 = logbook.FlowDetail('test2', uuid=uuidutils.generate_uuid())
|
||||
lb2 = models.LogBook(name=lb_name, uuid=lb_id)
|
||||
fd2 = models.FlowDetail('test2', uuid=uuidutils.generate_uuid())
|
||||
lb2.add(fd2)
|
||||
with contextlib.closing(self._get_connection()) as conn:
|
||||
conn.save_logbook(lb2)
|
||||
@ -244,8 +244,8 @@ class PersistenceTestMixin(object):
|
||||
def test_logbook_add_flow_detail(self):
|
||||
lb_id = uuidutils.generate_uuid()
|
||||
lb_name = 'lb-%s' % (lb_id)
|
||||
lb = logbook.LogBook(name=lb_name, uuid=lb_id)
|
||||
fd = logbook.FlowDetail('test', uuid=uuidutils.generate_uuid())
|
||||
lb = models.LogBook(name=lb_name, uuid=lb_id)
|
||||
fd = models.FlowDetail('test', uuid=uuidutils.generate_uuid())
|
||||
lb.add(fd)
|
||||
with contextlib.closing(self._get_connection()) as conn:
|
||||
conn.save_logbook(lb)
|
||||
@ -258,8 +258,8 @@ class PersistenceTestMixin(object):
|
||||
def test_logbook_lazy_fetch(self):
|
||||
lb_id = uuidutils.generate_uuid()
|
||||
lb_name = 'lb-%s' % (lb_id)
|
||||
lb = logbook.LogBook(name=lb_name, uuid=lb_id)
|
||||
fd = logbook.FlowDetail('test', uuid=uuidutils.generate_uuid())
|
||||
lb = models.LogBook(name=lb_name, uuid=lb_id)
|
||||
fd = models.FlowDetail('test', uuid=uuidutils.generate_uuid())
|
||||
lb.add(fd)
|
||||
with contextlib.closing(self._get_connection()) as conn:
|
||||
conn.save_logbook(lb)
|
||||
@ -271,9 +271,9 @@ class PersistenceTestMixin(object):
|
||||
def test_logbook_add_task_detail(self):
|
||||
lb_id = uuidutils.generate_uuid()
|
||||
lb_name = 'lb-%s' % (lb_id)
|
||||
lb = logbook.LogBook(name=lb_name, uuid=lb_id)
|
||||
fd = logbook.FlowDetail('test', uuid=uuidutils.generate_uuid())
|
||||
td = logbook.TaskDetail("detail-1", uuid=uuidutils.generate_uuid())
|
||||
lb = models.LogBook(name=lb_name, uuid=lb_id)
|
||||
fd = models.FlowDetail('test', uuid=uuidutils.generate_uuid())
|
||||
td = models.TaskDetail("detail-1", uuid=uuidutils.generate_uuid())
|
||||
td.version = '4.2'
|
||||
fd.add(td)
|
||||
lb.add(fd)
|
||||
@ -298,7 +298,7 @@ class PersistenceTestMixin(object):
|
||||
def test_logbook_delete(self):
|
||||
lb_id = uuidutils.generate_uuid()
|
||||
lb_name = 'lb-%s' % (lb_id)
|
||||
lb = logbook.LogBook(name=lb_name, uuid=lb_id)
|
||||
lb = models.LogBook(name=lb_name, uuid=lb_id)
|
||||
with contextlib.closing(self._get_connection()) as conn:
|
||||
self.assertRaises(exc.NotFound, conn.destroy_logbook, lb_id)
|
||||
with contextlib.closing(self._get_connection()) as conn:
|
||||
@ -313,10 +313,10 @@ class PersistenceTestMixin(object):
|
||||
def test_task_detail_retry_type_(self):
|
||||
lb_id = uuidutils.generate_uuid()
|
||||
lb_name = 'lb-%s' % (lb_id)
|
||||
lb = logbook.LogBook(name=lb_name, uuid=lb_id)
|
||||
fd = logbook.FlowDetail('test', uuid=uuidutils.generate_uuid())
|
||||
lb = models.LogBook(name=lb_name, uuid=lb_id)
|
||||
fd = models.FlowDetail('test', uuid=uuidutils.generate_uuid())
|
||||
lb.add(fd)
|
||||
rd = logbook.RetryDetail("detail-1", uuid=uuidutils.generate_uuid())
|
||||
rd = models.RetryDetail("detail-1", uuid=uuidutils.generate_uuid())
|
||||
rd.intention = states.REVERT
|
||||
fd.add(rd)
|
||||
|
||||
@ -330,15 +330,15 @@ class PersistenceTestMixin(object):
|
||||
fd2 = lb2.find(fd.uuid)
|
||||
rd2 = fd2.find(rd.uuid)
|
||||
self.assertEqual(rd2.intention, states.REVERT)
|
||||
self.assertIsInstance(rd2, logbook.RetryDetail)
|
||||
self.assertIsInstance(rd2, models.RetryDetail)
|
||||
|
||||
def test_retry_detail_save_with_task_failure(self):
|
||||
lb_id = uuidutils.generate_uuid()
|
||||
lb_name = 'lb-%s' % (lb_id)
|
||||
lb = logbook.LogBook(name=lb_name, uuid=lb_id)
|
||||
fd = logbook.FlowDetail('test', uuid=uuidutils.generate_uuid())
|
||||
lb = models.LogBook(name=lb_name, uuid=lb_id)
|
||||
fd = models.FlowDetail('test', uuid=uuidutils.generate_uuid())
|
||||
lb.add(fd)
|
||||
rd = logbook.RetryDetail("retry-1", uuid=uuidutils.generate_uuid())
|
||||
rd = models.RetryDetail("retry-1", uuid=uuidutils.generate_uuid())
|
||||
fail = failure.Failure.from_exception(RuntimeError('fail'))
|
||||
rd.results.append((42, {'some-task': fail}))
|
||||
fd.add(rd)
|
||||
@ -354,7 +354,7 @@ class PersistenceTestMixin(object):
|
||||
lb2 = conn.get_logbook(lb_id)
|
||||
fd2 = lb2.find(fd.uuid)
|
||||
rd2 = fd2.find(rd.uuid)
|
||||
self.assertIsInstance(rd2, logbook.RetryDetail)
|
||||
self.assertIsInstance(rd2, models.RetryDetail)
|
||||
fail2 = rd2.results[0][1].get('some-task')
|
||||
self.assertIsInstance(fail2, failure.Failure)
|
||||
self.assertTrue(fail.matches(fail2))
|
||||
@ -362,10 +362,10 @@ class PersistenceTestMixin(object):
|
||||
def test_retry_detail_save_intention(self):
|
||||
lb_id = uuidutils.generate_uuid()
|
||||
lb_name = 'lb-%s' % (lb_id)
|
||||
lb = logbook.LogBook(name=lb_name, uuid=lb_id)
|
||||
fd = logbook.FlowDetail('test', uuid=uuidutils.generate_uuid())
|
||||
lb = models.LogBook(name=lb_name, uuid=lb_id)
|
||||
fd = models.FlowDetail('test', uuid=uuidutils.generate_uuid())
|
||||
lb.add(fd)
|
||||
rd = logbook.RetryDetail("retry-1", uuid=uuidutils.generate_uuid())
|
||||
rd = models.RetryDetail("retry-1", uuid=uuidutils.generate_uuid())
|
||||
fd.add(rd)
|
||||
|
||||
# save it
|
||||
@ -385,4 +385,4 @@ class PersistenceTestMixin(object):
|
||||
fd2 = lb2.find(fd.uuid)
|
||||
rd2 = fd2.find(rd.uuid)
|
||||
self.assertEqual(rd2.intention, states.REVERT)
|
||||
self.assertIsInstance(rd2, logbook.RetryDetail)
|
||||
self.assertIsInstance(rd2, models.RetryDetail)
|
||||
|
@ -28,7 +28,7 @@ from taskflow import exceptions as exc
|
||||
from taskflow.patterns import graph_flow as gf
|
||||
from taskflow.patterns import linear_flow as lf
|
||||
from taskflow.patterns import unordered_flow as uf
|
||||
from taskflow.persistence import logbook
|
||||
from taskflow.persistence import models
|
||||
from taskflow import states
|
||||
from taskflow import task
|
||||
from taskflow import test
|
||||
@ -495,7 +495,7 @@ class EngineParallelFlowTest(utils.EngineTestBase):
|
||||
|
||||
# Create FlowDetail as if we already run task1
|
||||
lb, fd = p_utils.temporary_flow_detail(self.backend)
|
||||
td = logbook.TaskDetail(name='task1', uuid='42')
|
||||
td = models.TaskDetail(name='task1', uuid='42')
|
||||
td.state = states.SUCCESS
|
||||
td.results = 17
|
||||
fd.add(td)
|
||||
|
@ -21,7 +21,7 @@ from oslo_utils import uuidutils
|
||||
|
||||
from taskflow import exceptions
|
||||
from taskflow.persistence import backends
|
||||
from taskflow.persistence import logbook
|
||||
from taskflow.persistence import models
|
||||
from taskflow import states
|
||||
from taskflow import storage
|
||||
from taskflow import test
|
||||
@ -61,7 +61,7 @@ class StorageTestMixin(object):
|
||||
self.assertTrue(uuidutils.is_uuid_like(s.get_atom_uuid('my_task')))
|
||||
|
||||
def test_flow_name_and_uuid(self):
|
||||
flow_detail = logbook.FlowDetail(name='test-fd', uuid='aaaa')
|
||||
flow_detail = models.FlowDetail(name='test-fd', uuid='aaaa')
|
||||
s = self._get_storage(flow_detail)
|
||||
self.assertEqual(s.flow_name, 'test-fd')
|
||||
self.assertEqual(s.flow_uuid, 'aaaa')
|
||||
@ -97,14 +97,14 @@ class StorageTestMixin(object):
|
||||
|
||||
def test_get_without_save(self):
|
||||
_lb, flow_detail = p_utils.temporary_flow_detail(self.backend)
|
||||
td = logbook.TaskDetail(name='my_task', uuid='42')
|
||||
td = models.TaskDetail(name='my_task', uuid='42')
|
||||
flow_detail.add(td)
|
||||
s = self._get_storage(flow_detail)
|
||||
self.assertEqual('42', s.get_atom_uuid('my_task'))
|
||||
|
||||
def test_ensure_existing_task(self):
|
||||
_lb, flow_detail = p_utils.temporary_flow_detail(self.backend)
|
||||
td = logbook.TaskDetail(name='my_task', uuid='42')
|
||||
td = models.TaskDetail(name='my_task', uuid='42')
|
||||
flow_detail.add(td)
|
||||
s = self._get_storage(flow_detail)
|
||||
s.ensure_atom(test_utils.NoopTask('my_task'))
|
||||
@ -523,7 +523,7 @@ class StorageTestMixin(object):
|
||||
def test_logbook_get_unknown_atom_type(self):
|
||||
self.assertRaisesRegexp(TypeError,
|
||||
'Unknown atom',
|
||||
logbook.atom_detail_class, 'some_detail')
|
||||
models.atom_detail_class, 'some_detail')
|
||||
|
||||
def test_save_task_intention(self):
|
||||
s = self._get_storage()
|
||||
|
@ -21,7 +21,7 @@ from oslo_utils import timeutils
|
||||
from oslo_utils import uuidutils
|
||||
|
||||
from taskflow import logging
|
||||
from taskflow.persistence import logbook
|
||||
from taskflow.persistence import models
|
||||
from taskflow.utils import misc
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
@ -33,7 +33,7 @@ def temporary_log_book(backend=None):
|
||||
Mainly useful for tests and other use cases where a temporary logbook
|
||||
is needed for a short-period of time.
|
||||
"""
|
||||
book = logbook.LogBook('tmp')
|
||||
book = models.LogBook('tmp')
|
||||
if backend is not None:
|
||||
with contextlib.closing(backend.get_connection()) as conn:
|
||||
conn.save_logbook(book)
|
||||
@ -48,7 +48,7 @@ def temporary_flow_detail(backend=None):
|
||||
"""
|
||||
flow_id = uuidutils.generate_uuid()
|
||||
book = temporary_log_book(backend)
|
||||
book.add(logbook.FlowDetail(name='tmp-flow-detail', uuid=flow_id))
|
||||
book.add(models.FlowDetail(name='tmp-flow-detail', uuid=flow_id))
|
||||
if backend is not None:
|
||||
with contextlib.closing(backend.get_connection()) as conn:
|
||||
conn.save_logbook(book)
|
||||
@ -77,7 +77,7 @@ def create_flow_detail(flow, book=None, backend=None, meta=None):
|
||||
LOG.warn("No name provided for flow %s (id %s)", flow, flow_id)
|
||||
flow_name = flow_id
|
||||
|
||||
flow_detail = logbook.FlowDetail(name=flow_name, uuid=flow_id)
|
||||
flow_detail = models.FlowDetail(name=flow_name, uuid=flow_id)
|
||||
if meta is not None:
|
||||
if flow_detail.meta is None:
|
||||
flow_detail.meta = {}
|
||||
@ -130,7 +130,7 @@ def _format_shared(obj, indent):
|
||||
|
||||
def pformat_atom_detail(atom_detail, indent=0):
|
||||
"""Pretty formats a atom detail."""
|
||||
detail_type = logbook.atom_detail_type(atom_detail)
|
||||
detail_type = models.atom_detail_type(atom_detail)
|
||||
lines = ["%s%s: '%s'" % (" " * (indent), detail_type, atom_detail.name)]
|
||||
lines.extend(_format_shared(atom_detail, indent=indent + 1))
|
||||
lines.append("%s- version = %s"
|
||||
|
Loading…
Reference in New Issue
Block a user