Retain atom 'revert' result (or failure)

When a atom is reverted it can be useful to retain the
result of that 'revert' method being called, so that it
can be later analyzed (or used for various purposes) so
adjust the storage, and actions to enable it to be stored.

Change-Id: I38a9a5f3bf7550e924468bb4a86652cb8beb306c
This commit is contained in:
Joshua Harlow 2015-06-17 11:28:57 -07:00
parent 5fb62f3864
commit a3fe3eb698
23 changed files with 582 additions and 285 deletions

File diff suppressed because one or more lines are too long

Before

Width:  |  Height:  |  Size: 22 KiB

After

Width:  |  Height:  |  Size: 22 KiB

File diff suppressed because one or more lines are too long

Before

Width:  |  Height:  |  Size: 20 KiB

After

Width:  |  Height:  |  Size: 20 KiB

View File

@ -136,19 +136,25 @@ method returns.
**SUCCESS** - The engine running the task transitions the task to this state
after the task has finished successfully (ie no exception/s were raised during
execution).
running its :py:meth:`~taskflow.task.BaseTask.execute` method).
**FAILURE** - The engine running the task transitions the task to this state
after it has finished with an error.
after it has finished with an error (ie exception/s were raised during
running its :py:meth:`~taskflow.task.BaseTask.execute` method).
**REVERT_FAILURE** - The engine running the task transitions the task to this
state after it has finished with an error (ie exception/s were raised during
running its :py:meth:`~taskflow.task.BaseTask.revert` method).
**REVERTING** - The engine running a task transitions the task to this state
when the containing flow the engine is running starts to revert and
its :py:meth:`~taskflow.task.BaseTask.revert` method is called. Only tasks in
the ``SUCCESS`` or ``FAILURE`` state can be reverted. If this method fails (ie
raises an exception), the task goes to the ``FAILURE`` state (if it was already
in the ``FAILURE`` state then this is a no-op).
the ``SUCCESS`` or ``FAILURE`` state can be reverted. If this method fails (ie
raises an exception), the task goes to the ``REVERT_FAILURE`` state.
**REVERTED** - A task that has been reverted appears in this state.
**REVERTED** - The engine running the task transitions the task to this state
after it has successfully reverted the task (ie no exception/s were raised
during running its :py:meth:`~taskflow.task.BaseTask.revert` method).
Retry
=====
@ -188,17 +194,23 @@ state until its :py:meth:`~taskflow.retry.Retry.execute` method returns.
it was finished successfully (ie no exception/s were raised during
execution).
**FAILURE** - The engine running the retry transitions it to this state after
it has finished with an error.
**FAILURE** - The engine running the retry transitions the retry to this state
after it has finished with an error (ie exception/s were raised during
running its :py:meth:`~taskflow.retry.Retry.execute` method).
**REVERT_FAILURE** - The engine running the retry transitions the retry to
this state after it has finished with an error (ie exception/s were raised
during its :py:meth:`~taskflow.retry.Retry.revert` method).
**REVERTING** - The engine running the retry transitions to this state when
the associated flow the engine is running starts to revert it and its
:py:meth:`~taskflow.retry.Retry.revert` method is called. Only retries
in ``SUCCESS`` or ``FAILURE`` state can be reverted. If this method fails (ie
raises an exception), the retry goes to the ``FAILURE`` state (if it was
already in the ``FAILURE`` state then this is a no-op).
raises an exception), the retry goes to the ``REVERT_FAILURE`` state.
**REVERTED** - A retry that has been reverted appears in this state.
**REVERTED** - The engine running the retry transitions the retry to this state
after it has successfully reverted the retry (ie no exception/s were raised
during running its :py:meth:`~taskflow.retry.Retry.revert` method).
**RETRYING** - If flow that is associated with the current retry was failed and
reverted, the engine prepares the flow for the next run and transitions the

View File

@ -21,17 +21,19 @@ import six
from taskflow import states
#: Sentinel use to represent no-result (none can be a valid result...)
NO_RESULT = object()
#: States that are expected to/may have a result to save...
SAVE_RESULT_STATES = (states.SUCCESS, states.FAILURE)
@six.add_metaclass(abc.ABCMeta)
class Action(object):
"""An action that handles executing, state changes, ... of atoms."""
NO_RESULT = object()
"""
Sentinel use to represent lack of any result (none can be a valid result)
"""
#: States that are expected to have a result to save...
SAVE_RESULT_STATES = (states.SUCCESS, states.FAILURE,
states.REVERTED, states.REVERT_FAILURE)
def __init__(self, storage, notifier):
self._storage = storage
self._notifier = notifier

View File

@ -60,19 +60,21 @@ class RetryAction(base.Action):
arguments.update(addons)
return arguments
def change_state(self, retry, state, result=base.NO_RESULT):
def change_state(self, retry, state, result=base.Action.NO_RESULT):
old_state = self._storage.get_atom_state(retry.name)
if state in base.SAVE_RESULT_STATES:
if state in self.SAVE_RESULT_STATES:
save_result = None
if result is not base.NO_RESULT:
if result is not self.NO_RESULT:
save_result = result
self._storage.save(retry.name, save_result, state)
elif state == states.REVERTED:
self._storage.cleanup_retry_history(retry.name, state)
# TODO(harlowja): combine this with the save to avoid a call
# back into the persistence layer...
if state == states.REVERTED:
self._storage.cleanup_retry_history(retry.name, state)
else:
if state == old_state:
# NOTE(imelnikov): nothing really changed, so we should not
# write anything to storage and run notifications
# write anything to storage and run notifications.
return
self._storage.set_atom_state(retry.name, state)
retry_uuid = self._storage.get_atom_uuid(retry.name)
@ -81,7 +83,7 @@ class RetryAction(base.Action):
'retry_uuid': retry_uuid,
'old_state': old_state,
}
if result is not base.NO_RESULT:
if result is not self.NO_RESULT:
details['result'] = result
self._notifier.notify(state, details)
@ -106,9 +108,9 @@ class RetryAction(base.Action):
def _on_done_callback(fut):
result = fut.result()[-1]
if isinstance(result, failure.Failure):
self.change_state(retry, states.FAILURE)
self.change_state(retry, states.REVERT_FAILURE, result=result)
else:
self.change_state(retry, states.REVERTED)
self.change_state(retry, states.REVERTED, result=result)
self.change_state(retry, states.REVERTING)
arg_addons = {

View File

@ -32,8 +32,8 @@ class TaskAction(base.Action):
super(TaskAction, self).__init__(storage, notifier)
self._task_executor = task_executor
def _is_identity_transition(self, old_state, state, task, progress):
if state in base.SAVE_RESULT_STATES:
def _is_identity_transition(self, old_state, state, task, progress=None):
if state in self.SAVE_RESULT_STATES:
# saving result is never identity transition
return False
if state != old_state:
@ -50,16 +50,17 @@ class TaskAction(base.Action):
return True
def change_state(self, task, state,
result=base.NO_RESULT, progress=None):
progress=None, result=base.Action.NO_RESULT):
old_state = self._storage.get_atom_state(task.name)
if self._is_identity_transition(old_state, state, task, progress):
if self._is_identity_transition(old_state, state, task,
progress=progress):
# NOTE(imelnikov): ignore identity transitions in order
# to avoid extra write to storage backend and, what's
# more important, extra notifications
# more important, extra notifications.
return
if state in base.SAVE_RESULT_STATES:
if state in self.SAVE_RESULT_STATES:
save_result = None
if result is not base.NO_RESULT:
if result is not self.NO_RESULT:
save_result = result
self._storage.save(task.name, save_result, state)
else:
@ -72,7 +73,7 @@ class TaskAction(base.Action):
'task_uuid': task_uuid,
'old_state': old_state,
}
if result is not base.NO_RESULT:
if result is not self.NO_RESULT:
details['result'] = result
self._notifier.notify(state, details)
if progress is not None:
@ -140,9 +141,10 @@ class TaskAction(base.Action):
def complete_reversion(self, task, result):
if isinstance(result, failure.Failure):
self.change_state(task, states.FAILURE)
self.change_state(task, states.REVERT_FAILURE, result=result)
else:
self.change_state(task, states.REVERTED, progress=1.0)
self.change_state(task, states.REVERTED, progress=1.0,
result=result)
def wait_for_any(self, fs, timeout):
return self._task_executor.wait_for_any(fs, timeout)

View File

@ -152,6 +152,7 @@ class Completer(object):
if event == ex.EXECUTED:
self._process_atom_failure(node, result)
else:
# Reverting failed, always retain the failure...
return True
return False

View File

@ -16,6 +16,7 @@
import collections
import contextlib
import itertools
import threading
from concurrent import futures
@ -194,8 +195,10 @@ class ActionEngine(base.Engine):
if last_state and last_state not in ignorable_states:
self._change_state(last_state)
if last_state not in self.NO_RERAISING_STATES:
failures = self.storage.get_failures()
failure.Failure.reraise_if_any(failures.values())
it = itertools.chain(
six.itervalues(self.storage.get_failures()),
six.itervalues(self.storage.get_revert_failures()))
failure.Failure.reraise_if_any(it)
def _change_state(self, state):
with self._state_lock:

View File

@ -0,0 +1,42 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2015 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Add 'revert_results' and 'revert_failure' atom detail column.
Revision ID: 3162c0f3f8e4
Revises: 589dccdf2b6e
Create Date: 2015-06-17 15:52:56.575245
"""
# revision identifiers, used by Alembic.
revision = '3162c0f3f8e4'
down_revision = '589dccdf2b6e'
from alembic import op
import sqlalchemy as sa
def upgrade():
op.add_column('atomdetails',
sa.Column('revert_results', sa.Text(), nullable=True))
op.add_column('atomdetails',
sa.Column('revert_failure', sa.Text(), nullable=True))
def downgrade():
op.drop_column('atomdetails', 'revert_results')
op.drop_column('atomdetails', 'revert_failure')

View File

@ -92,6 +92,8 @@ def fetch(metadata):
default=uuidutils.generate_uuid),
Column('failure', Json),
Column('results', Json),
Column('revert_results', Json),
Column('revert_failure', Json),
Column('atom_type', Enum(*models.ATOM_TYPES,
name='atom_types')),
Column('intention', Enum(*states.INTENTIONS,

View File

@ -32,6 +32,14 @@ LOG = logging.getLogger(__name__)
# Internal helpers...
def _is_all_none(arg, *args):
if arg is not None:
return False
for more_arg in args:
if more_arg is not None:
return False
return True
def _copy_function(deep_copy):
if deep_copy:
@ -413,11 +421,18 @@ class AtomDetail(object):
strategies).
:ivar results: Any results the atom produced from either its
``execute`` method or from other sources.
:ivar failure: If the atom failed (possibly due to its ``execute``
method raising) this will be a
:ivar revert_results: Any results the atom produced from either its
``revert`` method or from other sources.
:ivar failure: If the atom failed (due to its ``execute`` method
raising) this will be a
:py:class:`~taskflow.types.failure.Failure` object that
represents that failure (if there was no failure this
will be set to none).
:ivar revert_failure: If the atom failed (possibly due to its ``revert``
method raising) this will be a
:py:class:`~taskflow.types.failure.Failure` object
that represents that failure (if there was no
failure this will be set to none).
"""
def __init__(self, name, uuid):
@ -427,6 +442,8 @@ class AtomDetail(object):
self.intention = states.EXECUTE
self.results = None
self.failure = None
self.revert_results = None
self.revert_failure = None
self.meta = {}
self.version = None
@ -465,6 +482,8 @@ class AtomDetail(object):
self.meta = ad.meta
self.failure = ad.failure
self.results = ad.results
self.revert_results = ad.revert_results
self.revert_failure = ad.revert_failure
self.version = ad.version
return self
@ -503,6 +522,16 @@ class AtomDetail(object):
self.failure = other.failure
else:
self.failure = None
if self.revert_failure != other.revert_failure:
# NOTE(imelnikov): we can't just deep copy Failures, as they
# contain tracebacks, which are not copyable.
if other.revert_failure:
if deep_copy:
self.revert_failure = other.revert_failure.copy()
else:
self.revert_failure = other.revert_failure
else:
self.revert_failure = None
if self.meta != other.meta:
self.meta = copy_fn(other.meta)
if self.version != other.version:
@ -522,11 +551,17 @@ class AtomDetail(object):
failure = self.failure.to_dict()
else:
failure = None
if self.revert_failure:
revert_failure = self.revert_failure.to_dict()
else:
revert_failure = None
return {
'failure': failure,
'revert_failure': revert_failure,
'meta': self.meta,
'name': self.name,
'results': self.results,
'revert_results': self.revert_results,
'state': self.state,
'version': self.version,
'intention': self.intention,
@ -547,11 +582,15 @@ class AtomDetail(object):
obj.state = data.get('state')
obj.intention = data.get('intention')
obj.results = data.get('results')
obj.revert_results = data.get('revert_results')
obj.version = data.get('version')
obj.meta = _fix_meta(data)
failure = data.get('failure')
if failure:
obj.failure = ft.Failure.from_dict(failure)
revert_failure = data.get('revert_failure')
if revert_failure:
obj.revert_failure = ft.Failure.from_dict(revert_failure)
return obj
@property
@ -582,47 +621,65 @@ class TaskDetail(AtomDetail):
def reset(self, state):
"""Resets this task detail and sets ``state`` attribute value.
This sets any previously set ``results`` and ``failure`` attributes
back to ``None`` and sets the state to the provided one, as well as
setting this task details ``intention`` attribute to ``EXECUTE``.
This sets any previously set ``results``, ``failure``,
and ``revert_results`` attributes back to ``None`` and sets the
state to the provided one, as well as setting this task
details ``intention`` attribute to ``EXECUTE``.
"""
self.results = None
self.failure = None
self.revert_results = None
self.revert_failure = None
self.state = state
self.intention = states.EXECUTE
def put(self, state, result):
"""Puts a result (acquired in the given state) into this detail.
If the result is a :py:class:`~taskflow.types.failure.Failure` object
then the ``failure`` attribute will be set (and the ``results``
attribute will be set to ``None``); if the result is not a
:py:class:`~taskflow.types.failure.Failure` object then the
``results`` attribute will be set (and the ``failure`` attribute
will be set to ``None``). In either case the ``state``
attribute will be set to the provided state.
Returns whether this object was modified (or whether it was not).
"""
was_altered = False
if self.state != state:
if state != self.state:
self.state = state
was_altered = True
if self._was_failure(state, result):
if state == states.REVERT_FAILURE:
if self.revert_failure != result:
self.revert_failure = result
was_altered = True
if not _is_all_none(self.results, self.revert_results):
self.results = None
self.revert_results = None
was_altered = True
elif state == states.FAILURE:
if self.failure != result:
self.failure = result
was_altered = True
if self.results is not None:
if not _is_all_none(self.results, self.revert_results,
self.revert_failure):
self.results = None
self.revert_results = None
self.revert_failure = None
was_altered = True
elif state == states.SUCCESS:
if not _is_all_none(self.revert_results, self.revert_failure,
self.failure):
self.revert_results = None
self.revert_failure = None
self.failure = None
was_altered = True
else:
# We don't really have the ability to determine equality of
# task (user) results at the current time, without making
# potentially bad guesses, so assume the task detail always needs
# to be saved if they are not exactly equivalent...
if self.results is not result:
if result is not self.results:
self.results = result
was_altered = True
if self.failure is not None:
self.failure = None
elif state == states.REVERTED:
if not _is_all_none(self.revert_failure):
self.revert_failure = None
was_altered = True
if result is not self.revert_results:
self.revert_results = result
was_altered = True
return was_altered
@ -630,10 +687,11 @@ class TaskDetail(AtomDetail):
"""Merges the current task detail with the given one.
NOTE(harlowja): This merge does **not** copy and replace
the ``results`` attribute if it differs. Instead the current
objects ``results`` attribute directly becomes (via assignment) the
other objects ``results`` attribute. Also note that if the provided
object is this object itself then **no** merging is done.
the ``results`` or ``revert_results`` if it differs. Instead the
current objects ``results`` and ``revert_results`` attributes directly
becomes (via assignment) the other objects attributes. Also note that
if the provided object is this object itself then **no** merging is
done.
See: https://bugs.launchpad.net/taskflow/+bug/1452978 for
what happens if this is copied at a deeper level (for example by
@ -648,8 +706,8 @@ class TaskDetail(AtomDetail):
if other is self:
return self
super(TaskDetail, self).merge(other, deep_copy=deep_copy)
if self.results != other.results:
self.results = other.results
self.results = other.results
self.revert_results = other.revert_results
return self
def copy(self):
@ -659,10 +717,10 @@ class TaskDetail(AtomDetail):
version information that this object maintains is shallow
copied via ``copy.copy``).
NOTE(harlowja): This copy does **not** perform ``copy.copy`` on
the ``results`` attribute of this object (before assigning to the
copy). Instead the current objects ``results`` attribute directly
becomes (via assignment) the copied objects ``results`` attribute.
NOTE(harlowja): This copy does **not** copy and replace
the ``results`` or ``revert_results`` attribute if it differs. Instead
the current objects ``results`` and ``revert_results`` attributes
directly becomes (via assignment) the cloned objects attributes.
See: https://bugs.launchpad.net/taskflow/+bug/1452978 for
what happens if this is copied at a deeper level (for example by
@ -673,6 +731,7 @@ class TaskDetail(AtomDetail):
"""
clone = copy.copy(self)
clone.results = self.results
clone.revert_results = self.revert_results
if self.meta:
clone.meta = self.meta.copy()
if self.version:
@ -694,12 +753,15 @@ class RetryDetail(AtomDetail):
"""Resets this retry detail and sets ``state`` attribute value.
This sets any previously added ``results`` back to an empty list
and resets the ``failure`` attribute back to ``None`` and sets the
state to the provided one, as well as setting this atom
and resets the ``failure`` and ``revert_failure`` and
``revert_results`` attributes back to ``None`` and sets the state
to the provided one, as well as setting this retry
details ``intention`` attribute to ``EXECUTE``.
"""
self.results = []
self.revert_results = None
self.failure = None
self.revert_failure = None
self.state = state
self.intention = states.EXECUTE
@ -711,14 +773,15 @@ class RetryDetail(AtomDetail):
copied via ``copy.copy``).
NOTE(harlowja): This copy does **not** copy
the incoming objects ``results`` attribute. Instead this
objects ``results`` attribute list is iterated over and a new list
is constructed with each ``(data, failures)`` element in that list
having its ``failures`` (a dictionary of each named
the incoming objects ``results`` or ``revert_results`` attributes.
Instead this objects ``results`` attribute list is iterated over and
a new list is constructed with each ``(data, failures)`` element in
that list having its ``failures`` (a dictionary of each named
:py:class:`~taskflow.types.failure.Failure` object that
occured) copied but its ``data`` is left untouched. After
this is done that new list becomes (via assignment) the cloned
objects ``results`` attribute.
objects ``results`` attribute. The ``revert_results`` is directly
assigned to the cloned objects ``revert_results`` attribute.
See: https://bugs.launchpad.net/taskflow/+bug/1452978 for
what happens if the ``data`` in ``results`` is copied at a
@ -738,6 +801,7 @@ class RetryDetail(AtomDetail):
copied_failures[key] = failure
results.append((data, copied_failures))
clone.results = results
clone.revert_results = self.revert_results
if self.meta:
clone.meta = self.meta.copy()
if self.version:
@ -771,21 +835,50 @@ class RetryDetail(AtomDetail):
def put(self, state, result):
"""Puts a result (acquired in the given state) into this detail.
If the result is a :py:class:`~taskflow.types.failure.Failure` object
then the ``failure`` attribute will be set; if the result is not a
:py:class:`~taskflow.types.failure.Failure` object then the
``results`` attribute will be appended to (and the ``failure``
attribute will be set to ``None``). In either case the ``state``
attribute will be set to the provided state.
Returns whether this object was modified (or whether it was not).
"""
# Do not clean retry history (only on reset does this happen).
self.state = state
if self._was_failure(state, result):
self.failure = result
else:
was_altered = False
if state != self.state:
self.state = state
was_altered = True
if state == states.REVERT_FAILURE:
if result != self.revert_failure:
self.revert_failure = result
was_altered = True
if not _is_all_none(self.revert_results):
self.revert_results = None
was_altered = True
elif state == states.FAILURE:
if result != self.failure:
self.failure = result
was_altered = True
if not _is_all_none(self.revert_results, self.revert_failure):
self.revert_results = None
self.revert_failure = None
was_altered = True
elif state == states.SUCCESS:
if not _is_all_none(self.failure, self.revert_failure,
self.revert_results):
self.failure = None
self.revert_failure = None
self.revert_results = None
# Track what we produced, so that we can examine it (or avoid
# using it again).
self.results.append((result, {}))
self.failure = None
return True
was_altered = True
elif state == states.REVERTED:
# We don't really have the ability to determine equality of
# task (user) results at the current time, without making
# potentially bad guesses, so assume the retry detail always needs
# to be saved if they are not exactly equivalent...
if result is not self.revert_results:
self.revert_results = result
was_altered = True
if not _is_all_none(self.revert_failure):
self.revert_failure = None
was_altered = True
return was_altered
@classmethod
def from_dict(cls, data):

View File

@ -41,6 +41,7 @@ SUCCESS = SUCCESS
RUNNING = RUNNING
RETRYING = 'RETRYING'
IGNORE = 'IGNORE'
REVERT_FAILURE = 'REVERT_FAILURE'
# Atom intentions.
EXECUTE = 'EXECUTE'
@ -157,20 +158,20 @@ def check_flow_transition(old_state, new_state):
# Task state transitions
# See: http://docs.openstack.org/developer/taskflow/states.html
# See: http://docs.openstack.org/developer/taskflow/states.html#task
_ALLOWED_TASK_TRANSITIONS = frozenset((
(PENDING, RUNNING), # run it!
(PENDING, IGNORE), # skip it!
(RUNNING, SUCCESS), # the task finished successfully
(RUNNING, FAILURE), # the task failed
(RUNNING, SUCCESS), # the task executed successfully
(RUNNING, FAILURE), # the task execution failed
(FAILURE, REVERTING), # task failed, do cleanup now
(SUCCESS, REVERTING), # some other task failed, do cleanup now
(FAILURE, REVERTING), # task execution failed, try reverting...
(SUCCESS, REVERTING), # some other task failed, try reverting...
(REVERTING, REVERTED), # revert done
(REVERTING, FAILURE), # revert failed
(REVERTING, REVERTED), # the task reverted successfully
(REVERTING, REVERT_FAILURE), # the task failed reverting (terminal!)
(REVERTED, PENDING), # try again
(IGNORE, PENDING), # try again

View File

@ -28,15 +28,43 @@ from taskflow.persistence import models
from taskflow import retry
from taskflow import states
from taskflow import task
from taskflow.types import failure
from taskflow.utils import misc
LOG = logging.getLogger(__name__)
STATES_WITH_RESULTS = (states.SUCCESS, states.REVERTING, states.FAILURE)
_EXECUTE_STATES_WITH_RESULTS = (
# The atom ``execute`` worked out :)
states.SUCCESS,
# The atom ``execute`` didn't work out :(
states.FAILURE,
# In this state we will still have access to prior SUCCESS (or FAILURE)
# results, so make sure extraction is still allowed in this state...
states.REVERTING,
)
_REVERT_STATES_WITH_RESULTS = (
# The atom ``revert`` worked out :)
states.REVERTED,
# The atom ``revert`` didn't work out :(
states.REVERT_FAILURE,
# In this state we will still have access to prior SUCCESS (or FAILURE)
# results, so make sure extraction is still allowed in this state...
states.REVERTING,
)
# Atom states that may have results...
STATES_WITH_RESULTS = set()
STATES_WITH_RESULTS.update(_REVERT_STATES_WITH_RESULTS)
STATES_WITH_RESULTS.update(_EXECUTE_STATES_WITH_RESULTS)
STATES_WITH_RESULTS = tuple(sorted(STATES_WITH_RESULTS))
# TODO(harlowja): do this better (via a singleton or something else...)
_TRANSIENT_PROVIDER = object()
# Only for these intentions will we cache any failures that happened...
_SAVE_FAILURE_INTENTIONS = (states.EXECUTE, states.REVERT)
# NOTE(harlowja): Perhaps the container is a dictionary-like object and that
# key does not exist (key error), or the container is a tuple/list and a
# non-numeric key is being requested (index error), or there was no container
@ -164,8 +192,12 @@ class Storage(object):
# so we cache failures here, in atom name -> failure mapping.
self._failures = {}
for ad in self._flowdetail:
fail_cache = {}
if ad.failure is not None:
self._failures[ad.name] = ad.failure
fail_cache[states.EXECUTE] = ad.failure
if ad.revert_failure is not None:
fail_cache[states.REVERT] = ad.revert_failure
self._failures[ad.name] = fail_cache
self._atom_name_to_uuid = dict((ad.name, ad.uuid)
for ad in self._flowdetail)
@ -247,6 +279,7 @@ class Storage(object):
atom_ids[i] = ad.uuid
self._atom_name_to_uuid[atom_name] = ad.uuid
self._set_result_mapping(atom_name, atom.save_as)
self._failures.setdefault(atom_name, {})
return atom_ids
def ensure_atom(self, atom):
@ -448,21 +481,23 @@ class Storage(object):
"with index %r (name %s)", atom_name, index, name)
@fasteners.write_locked
def save(self, atom_name, data, state=states.SUCCESS):
"""Save result for named atom into storage with given state."""
def save(self, atom_name, result, state=states.SUCCESS):
"""Put result for atom with provided name to storage."""
source, clone = self._atomdetail_by_name(atom_name, clone=True)
if clone.put(state, data):
result = self._with_connection(self._save_atom_detail,
source, clone)
else:
result = clone
if state == states.FAILURE and isinstance(data, failure.Failure):
if clone.put(state, result):
self._with_connection(self._save_atom_detail, source, clone)
# We need to somehow place more of this responsibility on the atom
# detail class itself, vs doing it here; since it ties those two
# together (which is bad)...
if state in (states.FAILURE, states.REVERT_FAILURE):
# NOTE(imelnikov): failure serialization looses information,
# so we cache failures here, in atom name -> failure mapping so
# that we can later use the better version on fetch/get.
self._failures[result.name] = data
else:
self._check_all_results_provided(result.name, data)
if clone.intention in _SAVE_FAILURE_INTENTIONS:
fail_cache = self._failures[clone.name]
fail_cache[clone.intention] = result
if state == states.SUCCESS and clone.intention == states.EXECUTE:
self._check_all_results_provided(clone.name, result)
@fasteners.write_locked
def save_retry_failure(self, retry_name, failed_atom_name, failure):
@ -491,39 +526,69 @@ class Storage(object):
self._with_connection(self._save_atom_detail, source, clone)
@fasteners.read_locked
def _get(self, atom_name, only_last=False):
def _get(self, atom_name,
results_attr_name, fail_attr_name,
allowed_states, fail_cache_key):
source, _clone = self._atomdetail_by_name(atom_name)
if source.failure is not None:
cached = self._failures.get(atom_name)
if source.failure.matches(cached):
# Try to give the version back that should have the backtrace
# instead of one that has it stripped (since backtraces are not
# serializable).
return cached
return source.failure
if source.state not in STATES_WITH_RESULTS:
failure = getattr(source, fail_attr_name)
if failure is not None:
fail_cache = self._failures[atom_name]
try:
fail = fail_cache[fail_cache_key]
if failure.matches(fail):
# Try to give the version back that should have the
# backtrace instead of one that has it
# stripped (since backtraces are not serializable).
failure = fail
except KeyError:
pass
return failure
# TODO(harlowja): this seems like it should be checked before fetching
# the potential failure, instead of after, fix this soon...
if source.state not in allowed_states:
raise exceptions.NotFound("Result for atom %s is not currently"
" known" % atom_name)
if only_last:
return source.last_results
else:
return source.results
return getattr(source, results_attr_name)
def get(self, atom_name):
"""Gets the results for an atom with a given name from storage."""
return self._get(atom_name)
def get_execute_result(self, atom_name):
"""Gets the ``execute`` results for an atom from storage."""
return self._get(atom_name, 'results', 'failure',
_EXECUTE_STATES_WITH_RESULTS, states.EXECUTE)
@fasteners.read_locked
def get_failures(self):
"""Get list of failures that happened with this flow.
def _get_failures(self, fail_cache_key):
failures = {}
for atom_name, fail_cache in six.iteritems(self._failures):
try:
failures[atom_name] = fail_cache[fail_cache_key]
except KeyError:
pass
return failures
No order guaranteed.
"""
return self._failures.copy()
def get_execute_failures(self):
"""Get all ``execute`` failures that happened with this flow."""
return self._get_failures(states.EXECUTE)
# TODO(harlowja): remove these in the future?
get = get_execute_result
get_failures = get_execute_failures
def get_revert_result(self, atom_name):
"""Gets the ``revert`` results for an atom from storage."""
return self._get(atom_name, 'revert_results', 'revert_failure',
_REVERT_STATES_WITH_RESULTS, states.REVERT)
def get_revert_failures(self):
"""Get all ``revert`` failures that happened with this flow."""
return self._get_failures(states.REVERT)
@fasteners.read_locked
def has_failures(self):
"""Returns True if there are failed tasks in the storage."""
return bool(self._failures)
"""Returns true if there are **any** failures in storage."""
for fail_cache in six.itervalues(self._failures):
if fail_cache:
return True
return False
@fasteners.write_locked
def reset(self, atom_name, state=states.PENDING):
@ -534,8 +599,8 @@ class Storage(object):
if source.state == state:
return
clone.reset(state)
result = self._with_connection(self._save_atom_detail, source, clone)
self._failures.pop(result.name, None)
self._with_connection(self._save_atom_detail, source, clone)
self._failures[clone.name].clear()
def inject_atom_args(self, atom_name, pairs, transient=True):
"""Add values into storage for a specific atom only.
@ -681,7 +746,7 @@ class Storage(object):
@fasteners.read_locked
def fetch(self, name, many_handler=None):
"""Fetch a named result."""
"""Fetch a named ``execute`` result."""
def _many_handler(values):
# By default we just return the first of many (unless provided
# a different callback that can translate many results into
@ -702,7 +767,10 @@ class Storage(object):
self._transients, name))
else:
try:
container = self._get(provider.name, only_last=True)
container = self._get(provider.name,
'last_results', 'failure',
_EXECUTE_STATES_WITH_RESULTS,
states.EXECUTE)
except exceptions.NotFound:
pass
else:
@ -717,7 +785,7 @@ class Storage(object):
@fasteners.read_locked
def fetch_unsatisfied_args(self, atom_name, args_mapping,
scope_walker=None, optional_args=None):
"""Fetch unsatisfied atom arguments using an atoms argument mapping.
"""Fetch unsatisfied ``execute`` arguments using an atoms args mapping.
NOTE(harlowja): this takes into account the provided scope walker
atoms who should produce the required value at runtime, as well as
@ -756,7 +824,9 @@ class Storage(object):
results = self._transients
else:
try:
results = self._get(p.name, only_last=True)
results = self._get(p.name, 'last_results', 'failure',
_EXECUTE_STATES_WITH_RESULTS,
states.EXECUTE)
except exceptions.NotFound:
results = {}
try:
@ -802,7 +872,7 @@ class Storage(object):
@fasteners.read_locked
def fetch_all(self, many_handler=None):
"""Fetch all named results known so far."""
"""Fetch all named ``execute`` results known so far."""
def _many_handler(values):
if len(values) > 1:
return values
@ -821,7 +891,7 @@ class Storage(object):
def fetch_mapped_args(self, args_mapping,
atom_name=None, scope_walker=None,
optional_args=None):
"""Fetch arguments for an atom using an atoms argument mapping."""
"""Fetch ``execute`` arguments for an atom using its args mapping."""
def _extract_first_from(name, sources):
"""Extracts/returns first occurence of key in list of dicts."""
@ -835,7 +905,9 @@ class Storage(object):
def _get_results(looking_for, provider):
"""Gets the results saved for a given provider."""
try:
return self._get(provider.name, only_last=True)
return self._get(provider.name, 'last_results', 'failure',
_EXECUTE_STATES_WITH_RESULTS,
states.EXECUTE)
except exceptions.NotFound:
exceptions.raise_with_cause(exceptions.NotFound,
"Expected to be able to find"
@ -963,11 +1035,14 @@ class Storage(object):
# NOTE(harlowja): Try to use our local cache to get a more
# complete failure object that has a traceback (instead of the
# one that is saved which will *typically* not have one)...
cached = self._failures.get(ad.name)
if ad.failure.matches(cached):
failure = cached
else:
failure = ad.failure
failure = ad.failure
fail_cache = self._failures[ad.name]
try:
fail = fail_cache[states.EXECUTE]
if failure.matches(fail):
failure = fail
except KeyError:
pass
return retry.History(ad.results, failure=failure)
@fasteners.read_locked

View File

@ -126,7 +126,8 @@ class RunnerTest(test.TestCase, _RunnerTestMixin):
failure = failures[0]
self.assertTrue(failure.check(RuntimeError))
self.assertEqual(st.FAILURE, rt.storage.get_atom_state(tasks[0].name))
self.assertEqual(st.REVERT_FAILURE,
rt.storage.get_atom_state(tasks[0].name))
def test_run_iterations_suspended(self):
flow = lf.Flow("root")

View File

@ -21,11 +21,16 @@ from taskflow import test
class TransitionTest(test.TestCase):
_DISALLOWED_TPL = "Transition from '%s' to '%s' was found to be disallowed"
_NOT_IGNORED_TPL = "Transition from '%s' to '%s' was not ignored"
def assertTransitionAllowed(self, from_state, to_state):
self.assertTrue(self.check_transition(from_state, to_state))
msg = self._DISALLOWED_TPL % (from_state, to_state)
self.assertTrue(self.check_transition(from_state, to_state), msg=msg)
def assertTransitionIgnored(self, from_state, to_state):
self.assertFalse(self.check_transition(from_state, to_state))
msg = self._NOT_IGNORED_TPL % (from_state, to_state)
self.assertFalse(self.check_transition(from_state, to_state), msg=msg)
def assertTransitionForbidden(self, from_state, to_state):
self.assertRaisesRegexp(exc.InvalidState,
@ -101,7 +106,8 @@ class CheckTaskTransitionTest(TransitionTest):
def test_from_reverting_state(self):
self.assertTransitions(from_state=states.REVERTING,
allowed=(states.FAILURE, states.REVERTED),
allowed=(states.REVERT_FAILURE,
states.REVERTED),
ignored=(states.RUNNING, states.REVERTING,
states.PENDING, states.SUCCESS))

View File

@ -66,7 +66,7 @@ class EngineTaskTest(object):
engine = self._make_engine(flow)
expected = ['fail.f RUNNING', 'fail.t RUNNING',
'fail.t FAILURE(Failure: RuntimeError: Woot!)',
'fail.t REVERTING', 'fail.t REVERTED',
'fail.t REVERTING', 'fail.t REVERTED(None)',
'fail.f REVERTED']
with utils.CaptureListener(engine, values=values) as capturer:
self.assertFailuresRegexp(RuntimeError, '^Woot', engine.run)
@ -374,6 +374,29 @@ class EngineLinearFlowTest(utils.EngineTestBase):
self.assertFailuresRegexp(RuntimeError, '^Woot', engine.run)
self.assertEqual(engine.storage.fetch_all(), {})
def test_revert_provided(self):
flow = lf.Flow('revert').add(
utils.GiveBackRevert('giver'),
utils.FailingTask(name='fail')
)
engine = self._make_engine(flow, store={'value': 0})
self.assertFailuresRegexp(RuntimeError, '^Woot', engine.run)
self.assertEqual(engine.storage.get_revert_result('giver'), 2)
def test_nasty_revert(self):
flow = lf.Flow('revert').add(
utils.NastyTask('nasty'),
utils.FailingTask(name='fail')
)
engine = self._make_engine(flow)
self.assertFailuresRegexp(RuntimeError, '^Gotcha', engine.run)
fail = engine.storage.get_revert_result('nasty')
self.assertIsNotNone(fail.check(RuntimeError))
exec_failures = engine.storage.get_execute_failures()
self.assertIn('fail', exec_failures)
rev_failures = engine.storage.get_revert_failures()
self.assertIn('nasty', rev_failures)
def test_sequential_flow_nested_blocks(self):
flow = lf.Flow('nested-1').add(
utils.ProgressingTask('task1'),
@ -406,7 +429,7 @@ class EngineLinearFlowTest(utils.EngineTestBase):
self.assertFailuresRegexp(RuntimeError, '^Woot', engine.run)
expected = ['fail.t RUNNING',
'fail.t FAILURE(Failure: RuntimeError: Woot!)',
'fail.t REVERTING', 'fail.t REVERTED']
'fail.t REVERTING', 'fail.t REVERTED(None)']
self.assertEqual(expected, capturer.values)
def test_correctly_reverts_children(self):
@ -424,9 +447,9 @@ class EngineLinearFlowTest(utils.EngineTestBase):
'task2.t RUNNING', 'task2.t SUCCESS(5)',
'fail.t RUNNING',
'fail.t FAILURE(Failure: RuntimeError: Woot!)',
'fail.t REVERTING', 'fail.t REVERTED',
'task2.t REVERTING', 'task2.t REVERTED',
'task1.t REVERTING', 'task1.t REVERTED']
'fail.t REVERTING', 'fail.t REVERTED(None)',
'task2.t REVERTING', 'task2.t REVERTED(None)',
'task1.t REVERTING', 'task1.t REVERTED(None)']
self.assertEqual(expected, capturer.values)
@ -529,18 +552,19 @@ class EngineLinearAndUnorderedExceptionsTest(utils.EngineTestBase):
self.assertFailuresRegexp(RuntimeError, '^Woot', engine.run)
# NOTE(imelnikov): we don't know if task 3 was run, but if it was,
# it should have been reverted in correct order.
# it should have been REVERTED(None) in correct order.
possible_values_no_task3 = [
'task1.t RUNNING', 'task2.t RUNNING',
'fail.t FAILURE(Failure: RuntimeError: Woot!)',
'task2.t REVERTED', 'task1.t REVERTED'
'task2.t REVERTED(None)', 'task1.t REVERTED(None)'
]
self.assertIsSuperAndSubsequence(capturer.values,
possible_values_no_task3)
if 'task3' in capturer.values:
possible_values_task3 = [
'task1.t RUNNING', 'task2.t RUNNING', 'task3.t RUNNING',
'task3.t REVERTED', 'task2.t REVERTED', 'task1.t REVERTED'
'task3.t REVERTED(None)', 'task2.t REVERTED(None)',
'task1.t REVERTED(None)'
]
self.assertIsSuperAndSubsequence(capturer.values,
possible_values_task3)
@ -561,12 +585,12 @@ class EngineLinearAndUnorderedExceptionsTest(utils.EngineTestBase):
self.assertFailuresRegexp(RuntimeError, '^Gotcha', engine.run)
# NOTE(imelnikov): we don't know if task 3 was run, but if it was,
# it should have been reverted in correct order.
# it should have been REVERTED(None) in correct order.
possible_values = ['task1.t RUNNING', 'task1.t SUCCESS(5)',
'task2.t RUNNING', 'task2.t SUCCESS(5)',
'task3.t RUNNING', 'task3.t SUCCESS(5)',
'task3.t REVERTING',
'task3.t REVERTED']
'task3.t REVERTED(None)']
self.assertIsSuperAndSubsequence(possible_values, capturer.values)
possible_values_no_task3 = ['task1.t RUNNING', 'task2.t RUNNING']
self.assertIsSuperAndSubsequence(capturer.values,
@ -589,12 +613,12 @@ class EngineLinearAndUnorderedExceptionsTest(utils.EngineTestBase):
# NOTE(imelnikov): if task1 was run, it should have been reverted.
if 'task1' in capturer.values:
task1_story = ['task1.t RUNNING', 'task1.t SUCCESS(5)',
'task1.t REVERTED']
'task1.t REVERTED(None)']
self.assertIsSuperAndSubsequence(capturer.values, task1_story)
# NOTE(imelnikov): task2 should have been run and reverted
task2_story = ['task2.t RUNNING', 'task2.t SUCCESS(5)',
'task2.t REVERTED']
'task2.t REVERTED(None)']
self.assertIsSuperAndSubsequence(capturer.values, task2_story)
def test_revert_raises_for_linear_in_unordered(self):
@ -608,7 +632,7 @@ class EngineLinearAndUnorderedExceptionsTest(utils.EngineTestBase):
engine = self._make_engine(flow)
with utils.CaptureListener(engine, capture_flow=False) as capturer:
self.assertFailuresRegexp(RuntimeError, '^Gotcha', engine.run)
self.assertNotIn('task2.t REVERTED', capturer.values)
self.assertNotIn('task2.t REVERTED(None)', capturer.values)
class EngineGraphFlowTest(utils.EngineTestBase):
@ -697,11 +721,11 @@ class EngineGraphFlowTest(utils.EngineTestBase):
'task3.t RUNNING',
'task3.t FAILURE(Failure: RuntimeError: Woot!)',
'task3.t REVERTING',
'task3.t REVERTED',
'task3.t REVERTED(None)',
'task2.t REVERTING',
'task2.t REVERTED',
'task2.t REVERTED(None)',
'task1.t REVERTING',
'task1.t REVERTED']
'task1.t REVERTED(None)']
self.assertEqual(expected, capturer.values)
self.assertEqual(engine.storage.get_flow_state(), states.REVERTED)

View File

@ -82,8 +82,8 @@ class RetryTest(utils.EngineTestBase):
'task1.t RUNNING', 'task1.t SUCCESS(5)',
'task2.t RUNNING',
'task2.t FAILURE(Failure: RuntimeError: Woot!)',
'task2.t REVERTING', 'task2.t REVERTED',
'task1.t REVERTING', 'task1.t REVERTED',
'task2.t REVERTING', 'task2.t REVERTED(None)',
'task1.t REVERTING', 'task1.t REVERTED(None)',
'r1.r RETRYING',
'task1.t PENDING',
'task2.t PENDING',
@ -114,9 +114,9 @@ class RetryTest(utils.EngineTestBase):
'task2.t RUNNING',
'task2.t FAILURE(Failure: RuntimeError: Woot!)',
'task2.t REVERTING',
'task2.t REVERTED',
'task2.t REVERTED(None)',
'task1.t REVERTING',
'task1.t REVERTED',
'task1.t REVERTED(None)',
'r1.r RETRYING',
'task1.t PENDING',
'task2.t PENDING',
@ -127,11 +127,11 @@ class RetryTest(utils.EngineTestBase):
'task2.t RUNNING',
'task2.t FAILURE(Failure: RuntimeError: Woot!)',
'task2.t REVERTING',
'task2.t REVERTED',
'task2.t REVERTED(None)',
'task1.t REVERTING',
'task1.t REVERTED',
'task1.t REVERTED(None)',
'r1.r REVERTING',
'r1.r REVERTED',
'r1.r REVERTED(None)',
'flow-1.f REVERTED']
self.assertEqual(expected, capturer.values)
@ -153,9 +153,9 @@ class RetryTest(utils.EngineTestBase):
'task2.t RUNNING',
'task2.t FAILURE(Failure: RuntimeError: Woot!)',
'task2.t REVERTING',
'task2.t REVERTED',
'task2.t REVERTED(None)',
'task1.t REVERTING',
'task1.t FAILURE',
'task1.t REVERT_FAILURE(Failure: RuntimeError: Gotcha!)',
'flow-1.f FAILURE']
self.assertEqual(expected, capturer.values)
@ -185,9 +185,9 @@ class RetryTest(utils.EngineTestBase):
'task3.t RUNNING',
'task3.t FAILURE(Failure: RuntimeError: Woot!)',
'task3.t REVERTING',
'task3.t REVERTED',
'task3.t REVERTED(None)',
'task2.t REVERTING',
'task2.t REVERTED',
'task2.t REVERTED(None)',
'r2.r RETRYING',
'task2.t PENDING',
'task3.t PENDING',
@ -231,15 +231,15 @@ class RetryTest(utils.EngineTestBase):
'task4.t RUNNING',
'task4.t FAILURE(Failure: RuntimeError: Woot!)',
'task4.t REVERTING',
'task4.t REVERTED',
'task4.t REVERTED(None)',
'task3.t REVERTING',
'task3.t REVERTED',
'task3.t REVERTED(None)',
'task2.t REVERTING',
'task2.t REVERTED',
'task2.t REVERTED(None)',
'r2.r REVERTING',
'r2.r REVERTED',
'r2.r REVERTED(None)',
'task1.t REVERTING',
'task1.t REVERTED',
'task1.t REVERTED(None)',
'r1.r RETRYING',
'task1.t PENDING',
'r2.r PENDING',
@ -280,8 +280,8 @@ class RetryTest(utils.EngineTestBase):
'task2.t FAILURE(Failure: RuntimeError: Woot!)',
'task2.t REVERTING',
'task1.t REVERTING',
'task2.t REVERTED',
'task1.t REVERTED',
'task2.t REVERTED(None)',
'task1.t REVERTED(None)',
'r.r RETRYING',
'task1.t PENDING',
'task2.t PENDING',
@ -316,11 +316,11 @@ class RetryTest(utils.EngineTestBase):
'task2.t RUNNING',
'task2.t FAILURE(Failure: RuntimeError: Woot!)',
'task2.t REVERTING',
'task2.t REVERTED',
'task2.t REVERTED(None)',
'r2.r REVERTING',
'r2.r REVERTED',
'r2.r REVERTED(None)',
'task1.t REVERTING',
'task1.t REVERTED',
'task1.t REVERTED(None)',
'r1.r RETRYING',
'task1.t PENDING',
'r2.r PENDING',
@ -359,9 +359,9 @@ class RetryTest(utils.EngineTestBase):
'task2.t RUNNING',
'task2.t FAILURE(Failure: RuntimeError: Woot!)',
'task2.t REVERTING',
'task2.t REVERTED',
'task2.t REVERTED(None)',
'r1.r REVERTING',
'r1.r REVERTED',
'r1.r REVERTED(None)',
'flow-1.f REVERTED']
self.assertEqual(expected, capturer.values)
@ -388,11 +388,11 @@ class RetryTest(utils.EngineTestBase):
'task2.t RUNNING',
'task2.t FAILURE(Failure: RuntimeError: Woot!)',
'task2.t REVERTING',
'task2.t REVERTED',
'task2.t REVERTED(None)',
'r1.r REVERTING',
'r1.r REVERTED',
'r1.r REVERTED(None)',
'task1.t REVERTING',
'task1.t REVERTED',
'task1.t REVERTED(None)',
'flow-1.f REVERTED']
self.assertEqual(expected, capturer.values)
@ -417,13 +417,13 @@ class RetryTest(utils.EngineTestBase):
'task2.t RUNNING',
'task2.t FAILURE(Failure: RuntimeError: Woot!)',
'task2.t REVERTING',
'task2.t REVERTED',
'task2.t REVERTED(None)',
'r2.r REVERTING',
'r2.r REVERTED',
'r2.r REVERTED(None)',
'task1.t REVERTING',
'task1.t REVERTED',
'task1.t REVERTED(None)',
'r1.r REVERTING',
'r1.r REVERTED',
'r1.r REVERTED(None)',
'flow-1.f REVERTED']
self.assertEqual(expected, capturer.values)
@ -515,7 +515,7 @@ class RetryTest(utils.EngineTestBase):
'c.t RUNNING',
'c.t FAILURE(Failure: RuntimeError: Woot!)',
'c.t REVERTING',
'c.t REVERTED',
'c.t REVERTED(None)',
'r1.r RETRYING',
'c.t PENDING',
'r1.r RUNNING',
@ -542,9 +542,9 @@ class RetryTest(utils.EngineTestBase):
't2.t RUNNING',
't2.t FAILURE(Failure: RuntimeError: Woot!)',
't2.t REVERTING',
't2.t REVERTED',
't2.t REVERTED(None)',
't1.t REVERTING',
't1.t REVERTED',
't1.t REVERTED(None)',
'r1.r RETRYING',
't1.t PENDING',
't2.t PENDING',
@ -555,9 +555,9 @@ class RetryTest(utils.EngineTestBase):
't2.t RUNNING',
't2.t FAILURE(Failure: RuntimeError: Woot!)',
't2.t REVERTING',
't2.t REVERTED',
't2.t REVERTED(None)',
't1.t REVERTING',
't1.t REVERTED',
't1.t REVERTED(None)',
'r1.r RETRYING',
't1.t PENDING',
't2.t PENDING',
@ -568,11 +568,11 @@ class RetryTest(utils.EngineTestBase):
't2.t RUNNING',
't2.t FAILURE(Failure: RuntimeError: Woot!)',
't2.t REVERTING',
't2.t REVERTED',
't2.t REVERTED(None)',
't1.t REVERTING',
't1.t REVERTED',
't1.t REVERTED(None)',
'r1.r REVERTING',
'r1.r REVERTED',
'r1.r REVERTED(None)',
'flow-1.f REVERTED']
self.assertEqual(expected, capturer.values)
@ -589,7 +589,7 @@ class RetryTest(utils.EngineTestBase):
't1.t RUNNING',
't1.t FAILURE(Failure: RuntimeError: Woot with 3)',
't1.t REVERTING',
't1.t REVERTED',
't1.t REVERTED(None)',
'r1.r RETRYING',
't1.t PENDING',
'r1.r RUNNING',
@ -597,7 +597,7 @@ class RetryTest(utils.EngineTestBase):
't1.t RUNNING',
't1.t FAILURE(Failure: RuntimeError: Woot with 2)',
't1.t REVERTING',
't1.t REVERTED',
't1.t REVERTED(None)',
'r1.r RETRYING',
't1.t PENDING',
'r1.r RUNNING',
@ -605,7 +605,7 @@ class RetryTest(utils.EngineTestBase):
't1.t RUNNING',
't1.t FAILURE(Failure: RuntimeError: Woot with 3)',
't1.t REVERTING',
't1.t REVERTED',
't1.t REVERTED(None)',
'r1.r RETRYING',
't1.t PENDING',
'r1.r RUNNING',
@ -613,9 +613,9 @@ class RetryTest(utils.EngineTestBase):
't1.t RUNNING',
't1.t FAILURE(Failure: RuntimeError: Woot with 5)',
't1.t REVERTING',
't1.t REVERTED',
't1.t REVERTED(None)',
'r1.r REVERTING',
'r1.r REVERTED',
'r1.r REVERTED(None)',
'flow-1.f REVERTED']
self.assertEqual(expected, capturer.values)
@ -632,7 +632,7 @@ class RetryTest(utils.EngineTestBase):
't1.t RUNNING',
't1.t FAILURE(Failure: RuntimeError: Woot with 2)',
't1.t REVERTING',
't1.t REVERTED',
't1.t REVERTED(None)',
'r1.r RETRYING',
't1.t PENDING',
'r1.r RUNNING',
@ -640,7 +640,7 @@ class RetryTest(utils.EngineTestBase):
't1.t RUNNING',
't1.t FAILURE(Failure: RuntimeError: Woot with 3)',
't1.t REVERTING',
't1.t REVERTED',
't1.t REVERTED(None)',
'r1.r RETRYING',
't1.t PENDING',
'r1.r RUNNING',
@ -648,9 +648,9 @@ class RetryTest(utils.EngineTestBase):
't1.t RUNNING',
't1.t FAILURE(Failure: RuntimeError: Woot with 5)',
't1.t REVERTING',
't1.t REVERTED',
't1.t REVERTED(None)',
'r1.r REVERTING',
'r1.r REVERTED',
'r1.r REVERTED(None)',
'flow-1.f REVERTED']
self.assertItemsEqual(capturer.values, expected)
@ -674,7 +674,7 @@ class RetryTest(utils.EngineTestBase):
'task2.t RUNNING',
'task2.t FAILURE(Failure: RuntimeError: Woot with 3)',
'task2.t REVERTING',
'task2.t REVERTED',
'task2.t REVERTED(None)',
'r1.r RETRYING',
'task2.t PENDING',
'r1.r RUNNING',
@ -682,7 +682,7 @@ class RetryTest(utils.EngineTestBase):
'task2.t RUNNING',
'task2.t FAILURE(Failure: RuntimeError: Woot with 2)',
'task2.t REVERTING',
'task2.t REVERTED',
'task2.t REVERTED(None)',
'r1.r RETRYING',
'task2.t PENDING',
'r1.r RUNNING',
@ -690,7 +690,7 @@ class RetryTest(utils.EngineTestBase):
'task2.t RUNNING',
'task2.t FAILURE(Failure: RuntimeError: Woot with 3)',
'task2.t REVERTING',
'task2.t REVERTED',
'task2.t REVERTED(None)',
'r1.r RETRYING',
'task2.t PENDING',
'r1.r RUNNING',
@ -698,9 +698,9 @@ class RetryTest(utils.EngineTestBase):
'task2.t RUNNING',
'task2.t FAILURE(Failure: RuntimeError: Woot with 5)',
'task2.t REVERTING',
'task2.t REVERTED',
'task2.t REVERTED(None)',
'r1.r REVERTING',
'r1.r REVERTED',
'r1.r REVERTED(None)',
'flow-1.f REVERTED']
self.assertEqual(expected, capturer.values)
@ -724,7 +724,7 @@ class RetryTest(utils.EngineTestBase):
'task2.t RUNNING',
'task2.t FAILURE(Failure: RuntimeError: Woot with 3)',
'task2.t REVERTING',
'task2.t REVERTED',
'task2.t REVERTED(None)',
'r1.r RETRYING',
'task2.t PENDING',
'r1.r RUNNING',
@ -732,7 +732,7 @@ class RetryTest(utils.EngineTestBase):
'task2.t RUNNING',
'task2.t FAILURE(Failure: RuntimeError: Woot with 2)',
'task2.t REVERTING',
'task2.t REVERTED',
'task2.t REVERTED(None)',
'r1.r RETRYING',
'task2.t PENDING',
'r1.r RUNNING',
@ -740,7 +740,7 @@ class RetryTest(utils.EngineTestBase):
'task2.t RUNNING',
'task2.t FAILURE(Failure: RuntimeError: Woot with 3)',
'task2.t REVERTING',
'task2.t REVERTED',
'task2.t REVERTED(None)',
'r1.r RETRYING',
'task2.t PENDING',
'r1.r RUNNING',
@ -748,11 +748,11 @@ class RetryTest(utils.EngineTestBase):
'task2.t RUNNING',
'task2.t FAILURE(Failure: RuntimeError: Woot with 5)',
'task2.t REVERTING',
'task2.t REVERTED',
'task2.t REVERTED(None)',
'r1.r REVERTING',
'r1.r REVERTED',
'r1.r REVERTED(None)',
'task1.t REVERTING',
'task1.t REVERTED',
'task1.t REVERTED(None)',
'flow-1.f REVERTED']
self.assertEqual(expected, capturer.values)
@ -778,7 +778,7 @@ class RetryTest(utils.EngineTestBase):
't1.t RUNNING',
't1.t FAILURE(Failure: RuntimeError: Woot with 3)',
't1.t REVERTING',
't1.t REVERTED',
't1.t REVERTED(None)',
'r1.r RETRYING',
't1.t PENDING',
'r1.r RUNNING',
@ -786,7 +786,7 @@ class RetryTest(utils.EngineTestBase):
't1.t RUNNING',
't1.t FAILURE(Failure: RuntimeError: Woot with 2)',
't1.t REVERTING',
't1.t REVERTED',
't1.t REVERTED(None)',
'r1.r RETRYING',
't1.t PENDING',
'r1.r RUNNING',
@ -794,9 +794,9 @@ class RetryTest(utils.EngineTestBase):
't1.t RUNNING',
't1.t FAILURE(Failure: RuntimeError: Woot with 5)',
't1.t REVERTING',
't1.t REVERTED',
't1.t REVERTED(None)',
'r1.r REVERTING',
'r1.r REVERTED',
'r1.r REVERTED(None)',
'flow-1.f REVERTED']
self.assertEqual(expected, capturer.values)
@ -814,7 +814,7 @@ class RetryTest(utils.EngineTestBase):
't1.t RUNNING',
't1.t FAILURE(Failure: RuntimeError: Woot with 3)',
't1.t REVERTING',
't1.t REVERTED',
't1.t REVERTED(None)',
'r1.r RETRYING',
't1.t PENDING',
'r1.r RUNNING',
@ -822,7 +822,7 @@ class RetryTest(utils.EngineTestBase):
't1.t RUNNING',
't1.t FAILURE(Failure: RuntimeError: Woot with 2)',
't1.t REVERTING',
't1.t REVERTED',
't1.t REVERTED(None)',
'r1.r RETRYING',
't1.t PENDING',
'r1.r RUNNING',
@ -830,9 +830,9 @@ class RetryTest(utils.EngineTestBase):
't1.t RUNNING',
't1.t FAILURE(Failure: RuntimeError: Woot with 5)',
't1.t REVERTING',
't1.t REVERTED',
't1.t REVERTED(None)',
'r1.r REVERTING',
'r1.r REVERTED',
'r1.r REVERTED(None)',
'flow-1.f REVERTED']
self.assertItemsEqual(capturer.values, expected)
@ -857,7 +857,7 @@ class RetryTest(utils.EngineTestBase):
'task-2.t RUNNING',
'task-2.t FAILURE(Failure: RuntimeError: Woot with 3)',
'task-2.t REVERTING',
'task-2.t REVERTED',
'task-2.t REVERTED(None)',
'r1.r RETRYING',
'task-2.t PENDING',
'r1.r RUNNING',
@ -865,7 +865,7 @@ class RetryTest(utils.EngineTestBase):
'task-2.t RUNNING',
'task-2.t FAILURE(Failure: RuntimeError: Woot with 2)',
'task-2.t REVERTING',
'task-2.t REVERTED',
'task-2.t REVERTED(None)',
'r1.r RETRYING',
'task-2.t PENDING',
'r1.r RUNNING',
@ -873,9 +873,9 @@ class RetryTest(utils.EngineTestBase):
'task-2.t RUNNING',
'task-2.t FAILURE(Failure: RuntimeError: Woot with 5)',
'task-2.t REVERTING',
'task-2.t REVERTED',
'task-2.t REVERTED(None)',
'r1.r REVERTING',
'r1.r REVERTED',
'r1.r REVERTED(None)',
'flow-1.f REVERTED']
self.assertEqual(expected, capturer.values)
@ -901,7 +901,7 @@ class RetryTest(utils.EngineTestBase):
'task-2.t RUNNING',
'task-2.t FAILURE(Failure: RuntimeError: Woot with 3)',
'task-2.t REVERTING',
'task-2.t REVERTED',
'task-2.t REVERTED(None)',
'r1.r RETRYING',
'task-2.t PENDING',
'r1.r RUNNING',
@ -909,7 +909,7 @@ class RetryTest(utils.EngineTestBase):
'task-2.t RUNNING',
'task-2.t FAILURE(Failure: RuntimeError: Woot with 2)',
'task-2.t REVERTING',
'task-2.t REVERTED',
'task-2.t REVERTED(None)',
'r1.r RETRYING',
'task-2.t PENDING',
'r1.r RUNNING',
@ -917,11 +917,11 @@ class RetryTest(utils.EngineTestBase):
'task-2.t RUNNING',
'task-2.t FAILURE(Failure: RuntimeError: Woot with 5)',
'task-2.t REVERTING',
'task-2.t REVERTED',
'task-2.t REVERTED(None)',
'r1.r REVERTING',
'r1.r REVERTED',
'r1.r REVERTED(None)',
'task-1.t REVERTING',
'task-1.t REVERTED',
'task-1.t REVERTED(None)',
'flow-1.f REVERTED']
self.assertEqual(expected, capturer.values)
@ -973,7 +973,7 @@ class RetryTest(utils.EngineTestBase):
with utils.CaptureListener(engine) as capturer:
engine.run()
expected = ['task1.t REVERTING',
'task1.t REVERTED',
'task1.t REVERTED(None)',
'flow-1_retry.r RETRYING',
'task1.t PENDING',
'flow-1_retry.r RUNNING',
@ -988,7 +988,7 @@ class RetryTest(utils.EngineTestBase):
with utils.CaptureListener(engine) as capturer:
engine.run()
expected = ['task1.t REVERTING',
'task1.t REVERTED',
'task1.t REVERTED(None)',
'flow-1_retry.r RETRYING',
'task1.t PENDING',
'flow-1_retry.r RUNNING',
@ -1003,7 +1003,7 @@ class RetryTest(utils.EngineTestBase):
with utils.CaptureListener(engine) as capturer:
engine.run()
expected = ['task1.t REVERTING',
'task1.t REVERTED',
'task1.t REVERTED(None)',
'flow-1_retry.r RETRYING',
'task1.t PENDING',
'flow-1_retry.r RUNNING',
@ -1018,7 +1018,7 @@ class RetryTest(utils.EngineTestBase):
with utils.CaptureListener(engine) as capturer:
engine.run()
expected = ['task1.t REVERTING',
'task1.t REVERTED',
'task1.t REVERTED(None)',
'flow-1_retry.r RETRYING',
'task1.t PENDING',
'flow-1_retry.r RUNNING',
@ -1032,7 +1032,7 @@ class RetryTest(utils.EngineTestBase):
engine = self._pretend_to_run_a_flow_and_crash('revert scheduled')
with utils.CaptureListener(engine) as capturer:
engine.run()
expected = ['task1.t REVERTED',
expected = ['task1.t REVERTED(None)',
'flow-1_retry.r RETRYING',
'task1.t PENDING',
'flow-1_retry.r RUNNING',
@ -1077,16 +1077,16 @@ class RetryTest(utils.EngineTestBase):
'c.t FAILURE(Failure: RuntimeError: Woot!)',
'a.t REVERTING',
'c.t REVERTING',
'a.t REVERTED',
'c.t REVERTED',
'a.t REVERTED(None)',
'c.t REVERTED(None)',
'b.t REVERTING',
'b.t REVERTED']
'b.t REVERTED(None)']
self.assertItemsEqual(capturer.values[:8], expected)
# Task 'a' was or was not executed again, both cases are ok.
self.assertIsSuperAndSubsequence(capturer.values[8:], [
'b.t RUNNING',
'c.t FAILURE(Failure: RuntimeError: Woot!)',
'b.t REVERTED',
'b.t REVERTED(None)',
])
self.assertEqual(engine.storage.get_flow_state(), st.REVERTED)
@ -1107,9 +1107,9 @@ class RetryTest(utils.EngineTestBase):
with utils.CaptureListener(engine, capture_flow=False) as capturer:
engine.run()
expected = ['c.t REVERTING',
'c.t REVERTED',
'c.t REVERTED(None)',
'b.t REVERTING',
'b.t REVERTED']
'b.t REVERTED(None)']
self.assertItemsEqual(capturer.values[:4], expected)
expected = ['test2_retry.r RETRYING',
'b.t PENDING',
@ -1149,10 +1149,10 @@ class RetryParallelExecutionTest(utils.EngineTestBase):
'task2.t RUNNING',
'task2.t FAILURE(Failure: RuntimeError: Woot!)',
'task2.t REVERTING',
'task2.t REVERTED',
'task2.t REVERTED(None)',
'task1.t SUCCESS(5)',
'task1.t REVERTING',
'task1.t REVERTED',
'task1.t REVERTED(None)',
'r.r RETRYING',
'task1.t PENDING',
'task2.t PENDING',
@ -1189,10 +1189,10 @@ class RetryParallelExecutionTest(utils.EngineTestBase):
'task3.t FAILURE(Failure: RuntimeError: Woot!)',
'task3.t REVERTING',
'task1.t REVERTING',
'task3.t REVERTED',
'task1.t REVERTED',
'task3.t REVERTED(None)',
'task1.t REVERTED(None)',
'task2.t REVERTING',
'task2.t REVERTED',
'task2.t REVERTED(None)',
'r.r RETRYING',
'task1.t PENDING',
'task2.t PENDING',

View File

@ -118,13 +118,6 @@ class StorageTestMixin(object):
self.assertEqual(s.fetch_all(), {})
self.assertEqual(s.get_atom_state('my task'), states.SUCCESS)
def test_save_and_get_other_state(self):
s = self._get_storage()
s.ensure_atom(test_utils.NoopTask('my task'))
s.save('my task', 5, states.FAILURE)
self.assertEqual(s.get('my task'), 5)
self.assertEqual(s.get_atom_state('my task'), states.FAILURE)
def test_save_and_get_cached_failure(self):
a_failure = failure.Failure.from_exception(RuntimeError('Woot!'))
s = self._get_storage()
@ -141,7 +134,7 @@ class StorageTestMixin(object):
s.ensure_atom(test_utils.NoopTask('my task'))
s.save('my task', a_failure, states.FAILURE)
self.assertEqual(s.get('my task'), a_failure)
s._failures['my task'] = None
s._failures['my task'] = {}
self.assertTrue(a_failure.matches(s.get('my task')))
def test_get_failure_from_reverted_task(self):
@ -564,6 +557,33 @@ class StorageTestMixin(object):
args = s.fetch_mapped_args(t.rebind, atom_name=t.name)
self.assertEqual(3, args['x'])
def test_save_fetch(self):
t = test_utils.GiveBackRevert('my task')
s = self._get_storage()
s.ensure_atom(t)
s.save('my task', 2)
self.assertEqual(2, s.get('my task'))
self.assertRaises(exceptions.NotFound,
s.get_revert_result, 'my task')
def test_save_fetch_revert(self):
t = test_utils.GiveBackRevert('my task')
s = self._get_storage()
s.ensure_atom(t)
s.set_atom_intention('my task', states.REVERT)
s.save('my task', 2, state=states.REVERTED)
self.assertRaises(exceptions.NotFound, s.get, 'my task')
self.assertEqual(2, s.get_revert_result('my task'))
def test_save_fail_fetch_revert(self):
t = test_utils.GiveBackRevert('my task')
s = self._get_storage()
s.ensure_atom(t)
s.set_atom_intention('my task', states.REVERT)
a_failure = failure.Failure.from_exception(RuntimeError('Woot!'))
s.save('my task', a_failure, state=states.REVERT_FAILURE)
self.assertEqual(a_failure, s.get_revert_result('my task'))
class StorageMemoryTest(StorageTestMixin, test.TestCase):
def setUp(self):

View File

@ -97,14 +97,14 @@ class SuspendTest(utils.EngineTestBase):
'c.t RUNNING',
'c.t FAILURE(Failure: RuntimeError: Woot!)',
'c.t REVERTING',
'c.t REVERTED',
'c.t REVERTED(None)',
'b.t REVERTING',
'b.t REVERTED']
'b.t REVERTED(None)']
self.assertEqual(expected, capturer.values)
with utils.CaptureListener(engine, capture_flow=False) as capturer:
self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run)
self.assertEqual(engine.storage.get_flow_state(), states.REVERTED)
expected = ['a.t REVERTING', 'a.t REVERTED']
expected = ['a.t REVERTING', 'a.t REVERTED(None)']
self.assertEqual(expected, capturer.values)
def test_suspend_and_resume_linear_flow_on_revert(self):
@ -124,9 +124,9 @@ class SuspendTest(utils.EngineTestBase):
'c.t RUNNING',
'c.t FAILURE(Failure: RuntimeError: Woot!)',
'c.t REVERTING',
'c.t REVERTED',
'c.t REVERTED(None)',
'b.t REVERTING',
'b.t REVERTED']
'b.t REVERTED(None)']
self.assertEqual(expected, capturer.values)
# pretend we are resuming
@ -135,7 +135,7 @@ class SuspendTest(utils.EngineTestBase):
self.assertRaisesRegexp(RuntimeError, '^Woot', engine2.run)
self.assertEqual(engine2.storage.get_flow_state(), states.REVERTED)
expected = ['a.t REVERTING',
'a.t REVERTED']
'a.t REVERTED(None)']
self.assertEqual(expected, capturer2.values)
def test_suspend_and_revert_even_if_task_is_gone(self):
@ -157,9 +157,9 @@ class SuspendTest(utils.EngineTestBase):
'c.t RUNNING',
'c.t FAILURE(Failure: RuntimeError: Woot!)',
'c.t REVERTING',
'c.t REVERTED',
'c.t REVERTED(None)',
'b.t REVERTING',
'b.t REVERTED']
'b.t REVERTED(None)']
self.assertEqual(expected, capturer.values)
# pretend we are resuming, but task 'c' gone when flow got updated
@ -171,7 +171,7 @@ class SuspendTest(utils.EngineTestBase):
with utils.CaptureListener(engine2, capture_flow=False) as capturer2:
self.assertRaisesRegexp(RuntimeError, '^Woot', engine2.run)
self.assertEqual(engine2.storage.get_flow_state(), states.REVERTED)
expected = ['a.t REVERTING', 'a.t REVERTED']
expected = ['a.t REVERTING', 'a.t REVERTED(None)']
self.assertEqual(capturer2.values, expected)
def test_storage_is_rechecked(self):

View File

@ -33,7 +33,7 @@ class TestWorker(test.MockTestCase):
self.broker_url = 'test-url'
self.exchange = 'test-exchange'
self.topic = 'test-topic'
self.endpoint_count = 25
self.endpoint_count = 26
# patch classes
self.executor_mock, self.executor_inst_mock = self.patchClass(

View File

@ -117,6 +117,15 @@ class AddOne(task.Task):
return source + 1
class GiveBackRevert(task.Task):
def execute(self, value):
return value + 1
def revert(self, *args, **kwargs):
return kwargs.get('result') + 1
class FakeTask(object):
def execute(self, **kwargs):

View File

@ -291,13 +291,15 @@ class Failure(object):
def reraise_if_any(failures):
"""Re-raise exceptions if argument is not empty.
If argument is empty list, this method returns None. If
argument is a list with a single ``Failure`` object in it,
that failure is reraised. Else, a
If argument is empty list/tuple/iterator, this method returns
None. If argument is coverted into a list with a
single ``Failure`` object in it, that failure is reraised. Else, a
:class:`~taskflow.exceptions.WrappedFailure` exception
is raised with a failure list as causes.
is raised with the failure list as causes.
"""
failures = list(failures)
if not isinstance(failures, (list, tuple)):
# Convert generators/other into a list...
failures = list(failures)
if len(failures) == 1:
failures[0].reraise()
elif len(failures) > 1:

View File

@ -68,7 +68,7 @@ def make_machine(start_state, transitions):
def map_color(internal_states, state):
if state in internal_states:
return 'blue'
if state == states.FAILURE:
if state in (states.FAILURE, states.REVERT_FAILURE):
return 'red'
if state == states.REVERTED:
return 'darkorange'