From a3fe3eb698e7bfa20b0b7fddd91c37a44c092f2c Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Wed, 17 Jun 2015 11:28:57 -0700 Subject: [PATCH] Retain atom 'revert' result (or failure) When a atom is reverted it can be useful to retain the result of that 'revert' method being called, so that it can be later analyzed (or used for various purposes) so adjust the storage, and actions to enable it to be stored. Change-Id: I38a9a5f3bf7550e924468bb4a86652cb8beb306c --- doc/source/img/retry_states.svg | 4 +- doc/source/img/task_states.svg | 4 +- doc/source/states.rst | 34 +++- .../engines/action_engine/actions/base.py | 16 +- .../engines/action_engine/actions/retry.py | 20 +- .../engines/action_engine/actions/task.py | 22 +- taskflow/engines/action_engine/completer.py | 1 + taskflow/engines/action_engine/engine.py | 7 +- ..._add_revert_results_and_revert_failure_.py | 42 ++++ .../persistence/backends/sqlalchemy/tables.py | 2 + taskflow/persistence/models.py | 189 +++++++++++++----- taskflow/states.py | 15 +- taskflow/storage.py | 179 ++++++++++++----- .../tests/unit/action_engine/test_runner.py | 3 +- taskflow/tests/unit/test_check_transition.py | 12 +- taskflow/tests/unit/test_engines.py | 56 ++++-- taskflow/tests/unit/test_retries.py | 182 ++++++++--------- taskflow/tests/unit/test_storage.py | 36 +++- taskflow/tests/unit/test_suspend.py | 18 +- .../tests/unit/worker_based/test_worker.py | 2 +- taskflow/tests/utils.py | 9 + taskflow/types/failure.py | 12 +- tools/state_graph.py | 2 +- 23 files changed, 582 insertions(+), 285 deletions(-) create mode 100644 taskflow/persistence/backends/sqlalchemy/alembic/versions/3162c0f3f8e4_add_revert_results_and_revert_failure_.py diff --git a/doc/source/img/retry_states.svg b/doc/source/img/retry_states.svg index d6801b19..abf8498e 100644 --- a/doc/source/img/retry_states.svg +++ b/doc/source/img/retry_states.svg @@ -3,6 +3,6 @@ - -Retries statesPENDINGIGNORERUNNINGSUCCESSFAILURERETRYINGREVERTINGREVERTEDstart + +Retries statesPENDINGIGNORERUNNINGSUCCESSFAILURERETRYINGREVERTINGREVERTEDREVERT_FAILUREstart diff --git a/doc/source/img/task_states.svg b/doc/source/img/task_states.svg index 9c27c843..a9368e31 100644 --- a/doc/source/img/task_states.svg +++ b/doc/source/img/task_states.svg @@ -3,6 +3,6 @@ - -Tasks statesPENDINGIGNORERUNNINGFAILURESUCCESSREVERTINGREVERTEDstart + +Tasks statesPENDINGIGNORERUNNINGFAILURESUCCESSREVERTINGREVERTEDREVERT_FAILUREstart diff --git a/doc/source/states.rst b/doc/source/states.rst index 01e9da59..3d42bad1 100644 --- a/doc/source/states.rst +++ b/doc/source/states.rst @@ -136,19 +136,25 @@ method returns. **SUCCESS** - The engine running the task transitions the task to this state after the task has finished successfully (ie no exception/s were raised during -execution). +running its :py:meth:`~taskflow.task.BaseTask.execute` method). **FAILURE** - The engine running the task transitions the task to this state -after it has finished with an error. +after it has finished with an error (ie exception/s were raised during +running its :py:meth:`~taskflow.task.BaseTask.execute` method). + +**REVERT_FAILURE** - The engine running the task transitions the task to this +state after it has finished with an error (ie exception/s were raised during +running its :py:meth:`~taskflow.task.BaseTask.revert` method). **REVERTING** - The engine running a task transitions the task to this state when the containing flow the engine is running starts to revert and its :py:meth:`~taskflow.task.BaseTask.revert` method is called. Only tasks in -the ``SUCCESS`` or ``FAILURE`` state can be reverted. If this method fails (ie -raises an exception), the task goes to the ``FAILURE`` state (if it was already -in the ``FAILURE`` state then this is a no-op). +the ``SUCCESS`` or ``FAILURE`` state can be reverted. If this method fails (ie +raises an exception), the task goes to the ``REVERT_FAILURE`` state. -**REVERTED** - A task that has been reverted appears in this state. +**REVERTED** - The engine running the task transitions the task to this state +after it has successfully reverted the task (ie no exception/s were raised +during running its :py:meth:`~taskflow.task.BaseTask.revert` method). Retry ===== @@ -188,17 +194,23 @@ state until its :py:meth:`~taskflow.retry.Retry.execute` method returns. it was finished successfully (ie no exception/s were raised during execution). -**FAILURE** - The engine running the retry transitions it to this state after -it has finished with an error. +**FAILURE** - The engine running the retry transitions the retry to this state +after it has finished with an error (ie exception/s were raised during +running its :py:meth:`~taskflow.retry.Retry.execute` method). + +**REVERT_FAILURE** - The engine running the retry transitions the retry to +this state after it has finished with an error (ie exception/s were raised +during its :py:meth:`~taskflow.retry.Retry.revert` method). **REVERTING** - The engine running the retry transitions to this state when the associated flow the engine is running starts to revert it and its :py:meth:`~taskflow.retry.Retry.revert` method is called. Only retries in ``SUCCESS`` or ``FAILURE`` state can be reverted. If this method fails (ie -raises an exception), the retry goes to the ``FAILURE`` state (if it was -already in the ``FAILURE`` state then this is a no-op). +raises an exception), the retry goes to the ``REVERT_FAILURE`` state. -**REVERTED** - A retry that has been reverted appears in this state. +**REVERTED** - The engine running the retry transitions the retry to this state +after it has successfully reverted the retry (ie no exception/s were raised +during running its :py:meth:`~taskflow.retry.Retry.revert` method). **RETRYING** - If flow that is associated with the current retry was failed and reverted, the engine prepares the flow for the next run and transitions the diff --git a/taskflow/engines/action_engine/actions/base.py b/taskflow/engines/action_engine/actions/base.py index 369a6c66..48846746 100644 --- a/taskflow/engines/action_engine/actions/base.py +++ b/taskflow/engines/action_engine/actions/base.py @@ -21,17 +21,19 @@ import six from taskflow import states -#: Sentinel use to represent no-result (none can be a valid result...) -NO_RESULT = object() - -#: States that are expected to/may have a result to save... -SAVE_RESULT_STATES = (states.SUCCESS, states.FAILURE) - - @six.add_metaclass(abc.ABCMeta) class Action(object): """An action that handles executing, state changes, ... of atoms.""" + NO_RESULT = object() + """ + Sentinel use to represent lack of any result (none can be a valid result) + """ + + #: States that are expected to have a result to save... + SAVE_RESULT_STATES = (states.SUCCESS, states.FAILURE, + states.REVERTED, states.REVERT_FAILURE) + def __init__(self, storage, notifier): self._storage = storage self._notifier = notifier diff --git a/taskflow/engines/action_engine/actions/retry.py b/taskflow/engines/action_engine/actions/retry.py index c8cad50a..e8b076b4 100644 --- a/taskflow/engines/action_engine/actions/retry.py +++ b/taskflow/engines/action_engine/actions/retry.py @@ -60,19 +60,21 @@ class RetryAction(base.Action): arguments.update(addons) return arguments - def change_state(self, retry, state, result=base.NO_RESULT): + def change_state(self, retry, state, result=base.Action.NO_RESULT): old_state = self._storage.get_atom_state(retry.name) - if state in base.SAVE_RESULT_STATES: + if state in self.SAVE_RESULT_STATES: save_result = None - if result is not base.NO_RESULT: + if result is not self.NO_RESULT: save_result = result self._storage.save(retry.name, save_result, state) - elif state == states.REVERTED: - self._storage.cleanup_retry_history(retry.name, state) + # TODO(harlowja): combine this with the save to avoid a call + # back into the persistence layer... + if state == states.REVERTED: + self._storage.cleanup_retry_history(retry.name, state) else: if state == old_state: # NOTE(imelnikov): nothing really changed, so we should not - # write anything to storage and run notifications + # write anything to storage and run notifications. return self._storage.set_atom_state(retry.name, state) retry_uuid = self._storage.get_atom_uuid(retry.name) @@ -81,7 +83,7 @@ class RetryAction(base.Action): 'retry_uuid': retry_uuid, 'old_state': old_state, } - if result is not base.NO_RESULT: + if result is not self.NO_RESULT: details['result'] = result self._notifier.notify(state, details) @@ -106,9 +108,9 @@ class RetryAction(base.Action): def _on_done_callback(fut): result = fut.result()[-1] if isinstance(result, failure.Failure): - self.change_state(retry, states.FAILURE) + self.change_state(retry, states.REVERT_FAILURE, result=result) else: - self.change_state(retry, states.REVERTED) + self.change_state(retry, states.REVERTED, result=result) self.change_state(retry, states.REVERTING) arg_addons = { diff --git a/taskflow/engines/action_engine/actions/task.py b/taskflow/engines/action_engine/actions/task.py index ab4b50d9..7ae6b55f 100644 --- a/taskflow/engines/action_engine/actions/task.py +++ b/taskflow/engines/action_engine/actions/task.py @@ -32,8 +32,8 @@ class TaskAction(base.Action): super(TaskAction, self).__init__(storage, notifier) self._task_executor = task_executor - def _is_identity_transition(self, old_state, state, task, progress): - if state in base.SAVE_RESULT_STATES: + def _is_identity_transition(self, old_state, state, task, progress=None): + if state in self.SAVE_RESULT_STATES: # saving result is never identity transition return False if state != old_state: @@ -50,16 +50,17 @@ class TaskAction(base.Action): return True def change_state(self, task, state, - result=base.NO_RESULT, progress=None): + progress=None, result=base.Action.NO_RESULT): old_state = self._storage.get_atom_state(task.name) - if self._is_identity_transition(old_state, state, task, progress): + if self._is_identity_transition(old_state, state, task, + progress=progress): # NOTE(imelnikov): ignore identity transitions in order # to avoid extra write to storage backend and, what's - # more important, extra notifications + # more important, extra notifications. return - if state in base.SAVE_RESULT_STATES: + if state in self.SAVE_RESULT_STATES: save_result = None - if result is not base.NO_RESULT: + if result is not self.NO_RESULT: save_result = result self._storage.save(task.name, save_result, state) else: @@ -72,7 +73,7 @@ class TaskAction(base.Action): 'task_uuid': task_uuid, 'old_state': old_state, } - if result is not base.NO_RESULT: + if result is not self.NO_RESULT: details['result'] = result self._notifier.notify(state, details) if progress is not None: @@ -140,9 +141,10 @@ class TaskAction(base.Action): def complete_reversion(self, task, result): if isinstance(result, failure.Failure): - self.change_state(task, states.FAILURE) + self.change_state(task, states.REVERT_FAILURE, result=result) else: - self.change_state(task, states.REVERTED, progress=1.0) + self.change_state(task, states.REVERTED, progress=1.0, + result=result) def wait_for_any(self, fs, timeout): return self._task_executor.wait_for_any(fs, timeout) diff --git a/taskflow/engines/action_engine/completer.py b/taskflow/engines/action_engine/completer.py index 318e3bc0..47300a46 100644 --- a/taskflow/engines/action_engine/completer.py +++ b/taskflow/engines/action_engine/completer.py @@ -152,6 +152,7 @@ class Completer(object): if event == ex.EXECUTED: self._process_atom_failure(node, result) else: + # Reverting failed, always retain the failure... return True return False diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py index 9da9ae9d..fed8d4ce 100644 --- a/taskflow/engines/action_engine/engine.py +++ b/taskflow/engines/action_engine/engine.py @@ -16,6 +16,7 @@ import collections import contextlib +import itertools import threading from concurrent import futures @@ -194,8 +195,10 @@ class ActionEngine(base.Engine): if last_state and last_state not in ignorable_states: self._change_state(last_state) if last_state not in self.NO_RERAISING_STATES: - failures = self.storage.get_failures() - failure.Failure.reraise_if_any(failures.values()) + it = itertools.chain( + six.itervalues(self.storage.get_failures()), + six.itervalues(self.storage.get_revert_failures())) + failure.Failure.reraise_if_any(it) def _change_state(self, state): with self._state_lock: diff --git a/taskflow/persistence/backends/sqlalchemy/alembic/versions/3162c0f3f8e4_add_revert_results_and_revert_failure_.py b/taskflow/persistence/backends/sqlalchemy/alembic/versions/3162c0f3f8e4_add_revert_results_and_revert_failure_.py new file mode 100644 index 00000000..dd54dff3 --- /dev/null +++ b/taskflow/persistence/backends/sqlalchemy/alembic/versions/3162c0f3f8e4_add_revert_results_and_revert_failure_.py @@ -0,0 +1,42 @@ +# -*- coding: utf-8 -*- + +# Copyright (C) 2015 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Add 'revert_results' and 'revert_failure' atom detail column. + +Revision ID: 3162c0f3f8e4 +Revises: 589dccdf2b6e +Create Date: 2015-06-17 15:52:56.575245 + +""" + +# revision identifiers, used by Alembic. +revision = '3162c0f3f8e4' +down_revision = '589dccdf2b6e' + +from alembic import op +import sqlalchemy as sa + + +def upgrade(): + op.add_column('atomdetails', + sa.Column('revert_results', sa.Text(), nullable=True)) + op.add_column('atomdetails', + sa.Column('revert_failure', sa.Text(), nullable=True)) + + +def downgrade(): + op.drop_column('atomdetails', 'revert_results') + op.drop_column('atomdetails', 'revert_failure') diff --git a/taskflow/persistence/backends/sqlalchemy/tables.py b/taskflow/persistence/backends/sqlalchemy/tables.py index 28acca1a..65969fb2 100644 --- a/taskflow/persistence/backends/sqlalchemy/tables.py +++ b/taskflow/persistence/backends/sqlalchemy/tables.py @@ -92,6 +92,8 @@ def fetch(metadata): default=uuidutils.generate_uuid), Column('failure', Json), Column('results', Json), + Column('revert_results', Json), + Column('revert_failure', Json), Column('atom_type', Enum(*models.ATOM_TYPES, name='atom_types')), Column('intention', Enum(*states.INTENTIONS, diff --git a/taskflow/persistence/models.py b/taskflow/persistence/models.py index c7a6eae5..e41d6d79 100644 --- a/taskflow/persistence/models.py +++ b/taskflow/persistence/models.py @@ -32,6 +32,14 @@ LOG = logging.getLogger(__name__) # Internal helpers... +def _is_all_none(arg, *args): + if arg is not None: + return False + for more_arg in args: + if more_arg is not None: + return False + return True + def _copy_function(deep_copy): if deep_copy: @@ -413,11 +421,18 @@ class AtomDetail(object): strategies). :ivar results: Any results the atom produced from either its ``execute`` method or from other sources. - :ivar failure: If the atom failed (possibly due to its ``execute`` - method raising) this will be a + :ivar revert_results: Any results the atom produced from either its + ``revert`` method or from other sources. + :ivar failure: If the atom failed (due to its ``execute`` method + raising) this will be a :py:class:`~taskflow.types.failure.Failure` object that represents that failure (if there was no failure this will be set to none). + :ivar revert_failure: If the atom failed (possibly due to its ``revert`` + method raising) this will be a + :py:class:`~taskflow.types.failure.Failure` object + that represents that failure (if there was no + failure this will be set to none). """ def __init__(self, name, uuid): @@ -427,6 +442,8 @@ class AtomDetail(object): self.intention = states.EXECUTE self.results = None self.failure = None + self.revert_results = None + self.revert_failure = None self.meta = {} self.version = None @@ -465,6 +482,8 @@ class AtomDetail(object): self.meta = ad.meta self.failure = ad.failure self.results = ad.results + self.revert_results = ad.revert_results + self.revert_failure = ad.revert_failure self.version = ad.version return self @@ -503,6 +522,16 @@ class AtomDetail(object): self.failure = other.failure else: self.failure = None + if self.revert_failure != other.revert_failure: + # NOTE(imelnikov): we can't just deep copy Failures, as they + # contain tracebacks, which are not copyable. + if other.revert_failure: + if deep_copy: + self.revert_failure = other.revert_failure.copy() + else: + self.revert_failure = other.revert_failure + else: + self.revert_failure = None if self.meta != other.meta: self.meta = copy_fn(other.meta) if self.version != other.version: @@ -522,11 +551,17 @@ class AtomDetail(object): failure = self.failure.to_dict() else: failure = None + if self.revert_failure: + revert_failure = self.revert_failure.to_dict() + else: + revert_failure = None return { 'failure': failure, + 'revert_failure': revert_failure, 'meta': self.meta, 'name': self.name, 'results': self.results, + 'revert_results': self.revert_results, 'state': self.state, 'version': self.version, 'intention': self.intention, @@ -547,11 +582,15 @@ class AtomDetail(object): obj.state = data.get('state') obj.intention = data.get('intention') obj.results = data.get('results') + obj.revert_results = data.get('revert_results') obj.version = data.get('version') obj.meta = _fix_meta(data) failure = data.get('failure') if failure: obj.failure = ft.Failure.from_dict(failure) + revert_failure = data.get('revert_failure') + if revert_failure: + obj.revert_failure = ft.Failure.from_dict(revert_failure) return obj @property @@ -582,47 +621,65 @@ class TaskDetail(AtomDetail): def reset(self, state): """Resets this task detail and sets ``state`` attribute value. - This sets any previously set ``results`` and ``failure`` attributes - back to ``None`` and sets the state to the provided one, as well as - setting this task details ``intention`` attribute to ``EXECUTE``. + This sets any previously set ``results``, ``failure``, + and ``revert_results`` attributes back to ``None`` and sets the + state to the provided one, as well as setting this task + details ``intention`` attribute to ``EXECUTE``. """ self.results = None self.failure = None + self.revert_results = None + self.revert_failure = None self.state = state self.intention = states.EXECUTE def put(self, state, result): """Puts a result (acquired in the given state) into this detail. - If the result is a :py:class:`~taskflow.types.failure.Failure` object - then the ``failure`` attribute will be set (and the ``results`` - attribute will be set to ``None``); if the result is not a - :py:class:`~taskflow.types.failure.Failure` object then the - ``results`` attribute will be set (and the ``failure`` attribute - will be set to ``None``). In either case the ``state`` - attribute will be set to the provided state. + Returns whether this object was modified (or whether it was not). """ was_altered = False - if self.state != state: + if state != self.state: self.state = state was_altered = True - if self._was_failure(state, result): + if state == states.REVERT_FAILURE: + if self.revert_failure != result: + self.revert_failure = result + was_altered = True + if not _is_all_none(self.results, self.revert_results): + self.results = None + self.revert_results = None + was_altered = True + elif state == states.FAILURE: if self.failure != result: self.failure = result was_altered = True - if self.results is not None: + if not _is_all_none(self.results, self.revert_results, + self.revert_failure): self.results = None + self.revert_results = None + self.revert_failure = None + was_altered = True + elif state == states.SUCCESS: + if not _is_all_none(self.revert_results, self.revert_failure, + self.failure): + self.revert_results = None + self.revert_failure = None + self.failure = None was_altered = True - else: # We don't really have the ability to determine equality of # task (user) results at the current time, without making # potentially bad guesses, so assume the task detail always needs # to be saved if they are not exactly equivalent... - if self.results is not result: + if result is not self.results: self.results = result was_altered = True - if self.failure is not None: - self.failure = None + elif state == states.REVERTED: + if not _is_all_none(self.revert_failure): + self.revert_failure = None + was_altered = True + if result is not self.revert_results: + self.revert_results = result was_altered = True return was_altered @@ -630,10 +687,11 @@ class TaskDetail(AtomDetail): """Merges the current task detail with the given one. NOTE(harlowja): This merge does **not** copy and replace - the ``results`` attribute if it differs. Instead the current - objects ``results`` attribute directly becomes (via assignment) the - other objects ``results`` attribute. Also note that if the provided - object is this object itself then **no** merging is done. + the ``results`` or ``revert_results`` if it differs. Instead the + current objects ``results`` and ``revert_results`` attributes directly + becomes (via assignment) the other objects attributes. Also note that + if the provided object is this object itself then **no** merging is + done. See: https://bugs.launchpad.net/taskflow/+bug/1452978 for what happens if this is copied at a deeper level (for example by @@ -648,8 +706,8 @@ class TaskDetail(AtomDetail): if other is self: return self super(TaskDetail, self).merge(other, deep_copy=deep_copy) - if self.results != other.results: - self.results = other.results + self.results = other.results + self.revert_results = other.revert_results return self def copy(self): @@ -659,10 +717,10 @@ class TaskDetail(AtomDetail): version information that this object maintains is shallow copied via ``copy.copy``). - NOTE(harlowja): This copy does **not** perform ``copy.copy`` on - the ``results`` attribute of this object (before assigning to the - copy). Instead the current objects ``results`` attribute directly - becomes (via assignment) the copied objects ``results`` attribute. + NOTE(harlowja): This copy does **not** copy and replace + the ``results`` or ``revert_results`` attribute if it differs. Instead + the current objects ``results`` and ``revert_results`` attributes + directly becomes (via assignment) the cloned objects attributes. See: https://bugs.launchpad.net/taskflow/+bug/1452978 for what happens if this is copied at a deeper level (for example by @@ -673,6 +731,7 @@ class TaskDetail(AtomDetail): """ clone = copy.copy(self) clone.results = self.results + clone.revert_results = self.revert_results if self.meta: clone.meta = self.meta.copy() if self.version: @@ -694,12 +753,15 @@ class RetryDetail(AtomDetail): """Resets this retry detail and sets ``state`` attribute value. This sets any previously added ``results`` back to an empty list - and resets the ``failure`` attribute back to ``None`` and sets the - state to the provided one, as well as setting this atom + and resets the ``failure`` and ``revert_failure`` and + ``revert_results`` attributes back to ``None`` and sets the state + to the provided one, as well as setting this retry details ``intention`` attribute to ``EXECUTE``. """ self.results = [] + self.revert_results = None self.failure = None + self.revert_failure = None self.state = state self.intention = states.EXECUTE @@ -711,14 +773,15 @@ class RetryDetail(AtomDetail): copied via ``copy.copy``). NOTE(harlowja): This copy does **not** copy - the incoming objects ``results`` attribute. Instead this - objects ``results`` attribute list is iterated over and a new list - is constructed with each ``(data, failures)`` element in that list - having its ``failures`` (a dictionary of each named + the incoming objects ``results`` or ``revert_results`` attributes. + Instead this objects ``results`` attribute list is iterated over and + a new list is constructed with each ``(data, failures)`` element in + that list having its ``failures`` (a dictionary of each named :py:class:`~taskflow.types.failure.Failure` object that occured) copied but its ``data`` is left untouched. After this is done that new list becomes (via assignment) the cloned - objects ``results`` attribute. + objects ``results`` attribute. The ``revert_results`` is directly + assigned to the cloned objects ``revert_results`` attribute. See: https://bugs.launchpad.net/taskflow/+bug/1452978 for what happens if the ``data`` in ``results`` is copied at a @@ -738,6 +801,7 @@ class RetryDetail(AtomDetail): copied_failures[key] = failure results.append((data, copied_failures)) clone.results = results + clone.revert_results = self.revert_results if self.meta: clone.meta = self.meta.copy() if self.version: @@ -771,21 +835,50 @@ class RetryDetail(AtomDetail): def put(self, state, result): """Puts a result (acquired in the given state) into this detail. - If the result is a :py:class:`~taskflow.types.failure.Failure` object - then the ``failure`` attribute will be set; if the result is not a - :py:class:`~taskflow.types.failure.Failure` object then the - ``results`` attribute will be appended to (and the ``failure`` - attribute will be set to ``None``). In either case the ``state`` - attribute will be set to the provided state. + Returns whether this object was modified (or whether it was not). """ # Do not clean retry history (only on reset does this happen). - self.state = state - if self._was_failure(state, result): - self.failure = result - else: + was_altered = False + if state != self.state: + self.state = state + was_altered = True + if state == states.REVERT_FAILURE: + if result != self.revert_failure: + self.revert_failure = result + was_altered = True + if not _is_all_none(self.revert_results): + self.revert_results = None + was_altered = True + elif state == states.FAILURE: + if result != self.failure: + self.failure = result + was_altered = True + if not _is_all_none(self.revert_results, self.revert_failure): + self.revert_results = None + self.revert_failure = None + was_altered = True + elif state == states.SUCCESS: + if not _is_all_none(self.failure, self.revert_failure, + self.revert_results): + self.failure = None + self.revert_failure = None + self.revert_results = None + # Track what we produced, so that we can examine it (or avoid + # using it again). self.results.append((result, {})) - self.failure = None - return True + was_altered = True + elif state == states.REVERTED: + # We don't really have the ability to determine equality of + # task (user) results at the current time, without making + # potentially bad guesses, so assume the retry detail always needs + # to be saved if they are not exactly equivalent... + if result is not self.revert_results: + self.revert_results = result + was_altered = True + if not _is_all_none(self.revert_failure): + self.revert_failure = None + was_altered = True + return was_altered @classmethod def from_dict(cls, data): diff --git a/taskflow/states.py b/taskflow/states.py index cbef58c7..07e70dd1 100644 --- a/taskflow/states.py +++ b/taskflow/states.py @@ -41,6 +41,7 @@ SUCCESS = SUCCESS RUNNING = RUNNING RETRYING = 'RETRYING' IGNORE = 'IGNORE' +REVERT_FAILURE = 'REVERT_FAILURE' # Atom intentions. EXECUTE = 'EXECUTE' @@ -157,20 +158,20 @@ def check_flow_transition(old_state, new_state): # Task state transitions -# See: http://docs.openstack.org/developer/taskflow/states.html +# See: http://docs.openstack.org/developer/taskflow/states.html#task _ALLOWED_TASK_TRANSITIONS = frozenset(( (PENDING, RUNNING), # run it! (PENDING, IGNORE), # skip it! - (RUNNING, SUCCESS), # the task finished successfully - (RUNNING, FAILURE), # the task failed + (RUNNING, SUCCESS), # the task executed successfully + (RUNNING, FAILURE), # the task execution failed - (FAILURE, REVERTING), # task failed, do cleanup now - (SUCCESS, REVERTING), # some other task failed, do cleanup now + (FAILURE, REVERTING), # task execution failed, try reverting... + (SUCCESS, REVERTING), # some other task failed, try reverting... - (REVERTING, REVERTED), # revert done - (REVERTING, FAILURE), # revert failed + (REVERTING, REVERTED), # the task reverted successfully + (REVERTING, REVERT_FAILURE), # the task failed reverting (terminal!) (REVERTED, PENDING), # try again (IGNORE, PENDING), # try again diff --git a/taskflow/storage.py b/taskflow/storage.py index 05b48999..cab68f6d 100644 --- a/taskflow/storage.py +++ b/taskflow/storage.py @@ -28,15 +28,43 @@ from taskflow.persistence import models from taskflow import retry from taskflow import states from taskflow import task -from taskflow.types import failure from taskflow.utils import misc LOG = logging.getLogger(__name__) -STATES_WITH_RESULTS = (states.SUCCESS, states.REVERTING, states.FAILURE) + + +_EXECUTE_STATES_WITH_RESULTS = ( + # The atom ``execute`` worked out :) + states.SUCCESS, + # The atom ``execute`` didn't work out :( + states.FAILURE, + # In this state we will still have access to prior SUCCESS (or FAILURE) + # results, so make sure extraction is still allowed in this state... + states.REVERTING, +) + +_REVERT_STATES_WITH_RESULTS = ( + # The atom ``revert`` worked out :) + states.REVERTED, + # The atom ``revert`` didn't work out :( + states.REVERT_FAILURE, + # In this state we will still have access to prior SUCCESS (or FAILURE) + # results, so make sure extraction is still allowed in this state... + states.REVERTING, +) + +# Atom states that may have results... +STATES_WITH_RESULTS = set() +STATES_WITH_RESULTS.update(_REVERT_STATES_WITH_RESULTS) +STATES_WITH_RESULTS.update(_EXECUTE_STATES_WITH_RESULTS) +STATES_WITH_RESULTS = tuple(sorted(STATES_WITH_RESULTS)) # TODO(harlowja): do this better (via a singleton or something else...) _TRANSIENT_PROVIDER = object() +# Only for these intentions will we cache any failures that happened... +_SAVE_FAILURE_INTENTIONS = (states.EXECUTE, states.REVERT) + # NOTE(harlowja): Perhaps the container is a dictionary-like object and that # key does not exist (key error), or the container is a tuple/list and a # non-numeric key is being requested (index error), or there was no container @@ -164,8 +192,12 @@ class Storage(object): # so we cache failures here, in atom name -> failure mapping. self._failures = {} for ad in self._flowdetail: + fail_cache = {} if ad.failure is not None: - self._failures[ad.name] = ad.failure + fail_cache[states.EXECUTE] = ad.failure + if ad.revert_failure is not None: + fail_cache[states.REVERT] = ad.revert_failure + self._failures[ad.name] = fail_cache self._atom_name_to_uuid = dict((ad.name, ad.uuid) for ad in self._flowdetail) @@ -247,6 +279,7 @@ class Storage(object): atom_ids[i] = ad.uuid self._atom_name_to_uuid[atom_name] = ad.uuid self._set_result_mapping(atom_name, atom.save_as) + self._failures.setdefault(atom_name, {}) return atom_ids def ensure_atom(self, atom): @@ -448,21 +481,23 @@ class Storage(object): "with index %r (name %s)", atom_name, index, name) @fasteners.write_locked - def save(self, atom_name, data, state=states.SUCCESS): - """Save result for named atom into storage with given state.""" + def save(self, atom_name, result, state=states.SUCCESS): + """Put result for atom with provided name to storage.""" source, clone = self._atomdetail_by_name(atom_name, clone=True) - if clone.put(state, data): - result = self._with_connection(self._save_atom_detail, - source, clone) - else: - result = clone - if state == states.FAILURE and isinstance(data, failure.Failure): + if clone.put(state, result): + self._with_connection(self._save_atom_detail, source, clone) + # We need to somehow place more of this responsibility on the atom + # detail class itself, vs doing it here; since it ties those two + # together (which is bad)... + if state in (states.FAILURE, states.REVERT_FAILURE): # NOTE(imelnikov): failure serialization looses information, # so we cache failures here, in atom name -> failure mapping so # that we can later use the better version on fetch/get. - self._failures[result.name] = data - else: - self._check_all_results_provided(result.name, data) + if clone.intention in _SAVE_FAILURE_INTENTIONS: + fail_cache = self._failures[clone.name] + fail_cache[clone.intention] = result + if state == states.SUCCESS and clone.intention == states.EXECUTE: + self._check_all_results_provided(clone.name, result) @fasteners.write_locked def save_retry_failure(self, retry_name, failed_atom_name, failure): @@ -491,39 +526,69 @@ class Storage(object): self._with_connection(self._save_atom_detail, source, clone) @fasteners.read_locked - def _get(self, atom_name, only_last=False): + def _get(self, atom_name, + results_attr_name, fail_attr_name, + allowed_states, fail_cache_key): source, _clone = self._atomdetail_by_name(atom_name) - if source.failure is not None: - cached = self._failures.get(atom_name) - if source.failure.matches(cached): - # Try to give the version back that should have the backtrace - # instead of one that has it stripped (since backtraces are not - # serializable). - return cached - return source.failure - if source.state not in STATES_WITH_RESULTS: + failure = getattr(source, fail_attr_name) + if failure is not None: + fail_cache = self._failures[atom_name] + try: + fail = fail_cache[fail_cache_key] + if failure.matches(fail): + # Try to give the version back that should have the + # backtrace instead of one that has it + # stripped (since backtraces are not serializable). + failure = fail + except KeyError: + pass + return failure + # TODO(harlowja): this seems like it should be checked before fetching + # the potential failure, instead of after, fix this soon... + if source.state not in allowed_states: raise exceptions.NotFound("Result for atom %s is not currently" " known" % atom_name) - if only_last: - return source.last_results - else: - return source.results + return getattr(source, results_attr_name) - def get(self, atom_name): - """Gets the results for an atom with a given name from storage.""" - return self._get(atom_name) + def get_execute_result(self, atom_name): + """Gets the ``execute`` results for an atom from storage.""" + return self._get(atom_name, 'results', 'failure', + _EXECUTE_STATES_WITH_RESULTS, states.EXECUTE) @fasteners.read_locked - def get_failures(self): - """Get list of failures that happened with this flow. + def _get_failures(self, fail_cache_key): + failures = {} + for atom_name, fail_cache in six.iteritems(self._failures): + try: + failures[atom_name] = fail_cache[fail_cache_key] + except KeyError: + pass + return failures - No order guaranteed. - """ - return self._failures.copy() + def get_execute_failures(self): + """Get all ``execute`` failures that happened with this flow.""" + return self._get_failures(states.EXECUTE) + # TODO(harlowja): remove these in the future? + get = get_execute_result + get_failures = get_execute_failures + + def get_revert_result(self, atom_name): + """Gets the ``revert`` results for an atom from storage.""" + return self._get(atom_name, 'revert_results', 'revert_failure', + _REVERT_STATES_WITH_RESULTS, states.REVERT) + + def get_revert_failures(self): + """Get all ``revert`` failures that happened with this flow.""" + return self._get_failures(states.REVERT) + + @fasteners.read_locked def has_failures(self): - """Returns True if there are failed tasks in the storage.""" - return bool(self._failures) + """Returns true if there are **any** failures in storage.""" + for fail_cache in six.itervalues(self._failures): + if fail_cache: + return True + return False @fasteners.write_locked def reset(self, atom_name, state=states.PENDING): @@ -534,8 +599,8 @@ class Storage(object): if source.state == state: return clone.reset(state) - result = self._with_connection(self._save_atom_detail, source, clone) - self._failures.pop(result.name, None) + self._with_connection(self._save_atom_detail, source, clone) + self._failures[clone.name].clear() def inject_atom_args(self, atom_name, pairs, transient=True): """Add values into storage for a specific atom only. @@ -681,7 +746,7 @@ class Storage(object): @fasteners.read_locked def fetch(self, name, many_handler=None): - """Fetch a named result.""" + """Fetch a named ``execute`` result.""" def _many_handler(values): # By default we just return the first of many (unless provided # a different callback that can translate many results into @@ -702,7 +767,10 @@ class Storage(object): self._transients, name)) else: try: - container = self._get(provider.name, only_last=True) + container = self._get(provider.name, + 'last_results', 'failure', + _EXECUTE_STATES_WITH_RESULTS, + states.EXECUTE) except exceptions.NotFound: pass else: @@ -717,7 +785,7 @@ class Storage(object): @fasteners.read_locked def fetch_unsatisfied_args(self, atom_name, args_mapping, scope_walker=None, optional_args=None): - """Fetch unsatisfied atom arguments using an atoms argument mapping. + """Fetch unsatisfied ``execute`` arguments using an atoms args mapping. NOTE(harlowja): this takes into account the provided scope walker atoms who should produce the required value at runtime, as well as @@ -756,7 +824,9 @@ class Storage(object): results = self._transients else: try: - results = self._get(p.name, only_last=True) + results = self._get(p.name, 'last_results', 'failure', + _EXECUTE_STATES_WITH_RESULTS, + states.EXECUTE) except exceptions.NotFound: results = {} try: @@ -802,7 +872,7 @@ class Storage(object): @fasteners.read_locked def fetch_all(self, many_handler=None): - """Fetch all named results known so far.""" + """Fetch all named ``execute`` results known so far.""" def _many_handler(values): if len(values) > 1: return values @@ -821,7 +891,7 @@ class Storage(object): def fetch_mapped_args(self, args_mapping, atom_name=None, scope_walker=None, optional_args=None): - """Fetch arguments for an atom using an atoms argument mapping.""" + """Fetch ``execute`` arguments for an atom using its args mapping.""" def _extract_first_from(name, sources): """Extracts/returns first occurence of key in list of dicts.""" @@ -835,7 +905,9 @@ class Storage(object): def _get_results(looking_for, provider): """Gets the results saved for a given provider.""" try: - return self._get(provider.name, only_last=True) + return self._get(provider.name, 'last_results', 'failure', + _EXECUTE_STATES_WITH_RESULTS, + states.EXECUTE) except exceptions.NotFound: exceptions.raise_with_cause(exceptions.NotFound, "Expected to be able to find" @@ -963,11 +1035,14 @@ class Storage(object): # NOTE(harlowja): Try to use our local cache to get a more # complete failure object that has a traceback (instead of the # one that is saved which will *typically* not have one)... - cached = self._failures.get(ad.name) - if ad.failure.matches(cached): - failure = cached - else: - failure = ad.failure + failure = ad.failure + fail_cache = self._failures[ad.name] + try: + fail = fail_cache[states.EXECUTE] + if failure.matches(fail): + failure = fail + except KeyError: + pass return retry.History(ad.results, failure=failure) @fasteners.read_locked diff --git a/taskflow/tests/unit/action_engine/test_runner.py b/taskflow/tests/unit/action_engine/test_runner.py index 401cf50d..9d43f312 100644 --- a/taskflow/tests/unit/action_engine/test_runner.py +++ b/taskflow/tests/unit/action_engine/test_runner.py @@ -126,7 +126,8 @@ class RunnerTest(test.TestCase, _RunnerTestMixin): failure = failures[0] self.assertTrue(failure.check(RuntimeError)) - self.assertEqual(st.FAILURE, rt.storage.get_atom_state(tasks[0].name)) + self.assertEqual(st.REVERT_FAILURE, + rt.storage.get_atom_state(tasks[0].name)) def test_run_iterations_suspended(self): flow = lf.Flow("root") diff --git a/taskflow/tests/unit/test_check_transition.py b/taskflow/tests/unit/test_check_transition.py index 7c820fd9..a8b5a7c3 100644 --- a/taskflow/tests/unit/test_check_transition.py +++ b/taskflow/tests/unit/test_check_transition.py @@ -21,11 +21,16 @@ from taskflow import test class TransitionTest(test.TestCase): + _DISALLOWED_TPL = "Transition from '%s' to '%s' was found to be disallowed" + _NOT_IGNORED_TPL = "Transition from '%s' to '%s' was not ignored" + def assertTransitionAllowed(self, from_state, to_state): - self.assertTrue(self.check_transition(from_state, to_state)) + msg = self._DISALLOWED_TPL % (from_state, to_state) + self.assertTrue(self.check_transition(from_state, to_state), msg=msg) def assertTransitionIgnored(self, from_state, to_state): - self.assertFalse(self.check_transition(from_state, to_state)) + msg = self._NOT_IGNORED_TPL % (from_state, to_state) + self.assertFalse(self.check_transition(from_state, to_state), msg=msg) def assertTransitionForbidden(self, from_state, to_state): self.assertRaisesRegexp(exc.InvalidState, @@ -101,7 +106,8 @@ class CheckTaskTransitionTest(TransitionTest): def test_from_reverting_state(self): self.assertTransitions(from_state=states.REVERTING, - allowed=(states.FAILURE, states.REVERTED), + allowed=(states.REVERT_FAILURE, + states.REVERTED), ignored=(states.RUNNING, states.REVERTING, states.PENDING, states.SUCCESS)) diff --git a/taskflow/tests/unit/test_engines.py b/taskflow/tests/unit/test_engines.py index 255ac4fd..5cc242cb 100644 --- a/taskflow/tests/unit/test_engines.py +++ b/taskflow/tests/unit/test_engines.py @@ -66,7 +66,7 @@ class EngineTaskTest(object): engine = self._make_engine(flow) expected = ['fail.f RUNNING', 'fail.t RUNNING', 'fail.t FAILURE(Failure: RuntimeError: Woot!)', - 'fail.t REVERTING', 'fail.t REVERTED', + 'fail.t REVERTING', 'fail.t REVERTED(None)', 'fail.f REVERTED'] with utils.CaptureListener(engine, values=values) as capturer: self.assertFailuresRegexp(RuntimeError, '^Woot', engine.run) @@ -374,6 +374,29 @@ class EngineLinearFlowTest(utils.EngineTestBase): self.assertFailuresRegexp(RuntimeError, '^Woot', engine.run) self.assertEqual(engine.storage.fetch_all(), {}) + def test_revert_provided(self): + flow = lf.Flow('revert').add( + utils.GiveBackRevert('giver'), + utils.FailingTask(name='fail') + ) + engine = self._make_engine(flow, store={'value': 0}) + self.assertFailuresRegexp(RuntimeError, '^Woot', engine.run) + self.assertEqual(engine.storage.get_revert_result('giver'), 2) + + def test_nasty_revert(self): + flow = lf.Flow('revert').add( + utils.NastyTask('nasty'), + utils.FailingTask(name='fail') + ) + engine = self._make_engine(flow) + self.assertFailuresRegexp(RuntimeError, '^Gotcha', engine.run) + fail = engine.storage.get_revert_result('nasty') + self.assertIsNotNone(fail.check(RuntimeError)) + exec_failures = engine.storage.get_execute_failures() + self.assertIn('fail', exec_failures) + rev_failures = engine.storage.get_revert_failures() + self.assertIn('nasty', rev_failures) + def test_sequential_flow_nested_blocks(self): flow = lf.Flow('nested-1').add( utils.ProgressingTask('task1'), @@ -406,7 +429,7 @@ class EngineLinearFlowTest(utils.EngineTestBase): self.assertFailuresRegexp(RuntimeError, '^Woot', engine.run) expected = ['fail.t RUNNING', 'fail.t FAILURE(Failure: RuntimeError: Woot!)', - 'fail.t REVERTING', 'fail.t REVERTED'] + 'fail.t REVERTING', 'fail.t REVERTED(None)'] self.assertEqual(expected, capturer.values) def test_correctly_reverts_children(self): @@ -424,9 +447,9 @@ class EngineLinearFlowTest(utils.EngineTestBase): 'task2.t RUNNING', 'task2.t SUCCESS(5)', 'fail.t RUNNING', 'fail.t FAILURE(Failure: RuntimeError: Woot!)', - 'fail.t REVERTING', 'fail.t REVERTED', - 'task2.t REVERTING', 'task2.t REVERTED', - 'task1.t REVERTING', 'task1.t REVERTED'] + 'fail.t REVERTING', 'fail.t REVERTED(None)', + 'task2.t REVERTING', 'task2.t REVERTED(None)', + 'task1.t REVERTING', 'task1.t REVERTED(None)'] self.assertEqual(expected, capturer.values) @@ -529,18 +552,19 @@ class EngineLinearAndUnorderedExceptionsTest(utils.EngineTestBase): self.assertFailuresRegexp(RuntimeError, '^Woot', engine.run) # NOTE(imelnikov): we don't know if task 3 was run, but if it was, - # it should have been reverted in correct order. + # it should have been REVERTED(None) in correct order. possible_values_no_task3 = [ 'task1.t RUNNING', 'task2.t RUNNING', 'fail.t FAILURE(Failure: RuntimeError: Woot!)', - 'task2.t REVERTED', 'task1.t REVERTED' + 'task2.t REVERTED(None)', 'task1.t REVERTED(None)' ] self.assertIsSuperAndSubsequence(capturer.values, possible_values_no_task3) if 'task3' in capturer.values: possible_values_task3 = [ 'task1.t RUNNING', 'task2.t RUNNING', 'task3.t RUNNING', - 'task3.t REVERTED', 'task2.t REVERTED', 'task1.t REVERTED' + 'task3.t REVERTED(None)', 'task2.t REVERTED(None)', + 'task1.t REVERTED(None)' ] self.assertIsSuperAndSubsequence(capturer.values, possible_values_task3) @@ -561,12 +585,12 @@ class EngineLinearAndUnorderedExceptionsTest(utils.EngineTestBase): self.assertFailuresRegexp(RuntimeError, '^Gotcha', engine.run) # NOTE(imelnikov): we don't know if task 3 was run, but if it was, - # it should have been reverted in correct order. + # it should have been REVERTED(None) in correct order. possible_values = ['task1.t RUNNING', 'task1.t SUCCESS(5)', 'task2.t RUNNING', 'task2.t SUCCESS(5)', 'task3.t RUNNING', 'task3.t SUCCESS(5)', 'task3.t REVERTING', - 'task3.t REVERTED'] + 'task3.t REVERTED(None)'] self.assertIsSuperAndSubsequence(possible_values, capturer.values) possible_values_no_task3 = ['task1.t RUNNING', 'task2.t RUNNING'] self.assertIsSuperAndSubsequence(capturer.values, @@ -589,12 +613,12 @@ class EngineLinearAndUnorderedExceptionsTest(utils.EngineTestBase): # NOTE(imelnikov): if task1 was run, it should have been reverted. if 'task1' in capturer.values: task1_story = ['task1.t RUNNING', 'task1.t SUCCESS(5)', - 'task1.t REVERTED'] + 'task1.t REVERTED(None)'] self.assertIsSuperAndSubsequence(capturer.values, task1_story) # NOTE(imelnikov): task2 should have been run and reverted task2_story = ['task2.t RUNNING', 'task2.t SUCCESS(5)', - 'task2.t REVERTED'] + 'task2.t REVERTED(None)'] self.assertIsSuperAndSubsequence(capturer.values, task2_story) def test_revert_raises_for_linear_in_unordered(self): @@ -608,7 +632,7 @@ class EngineLinearAndUnorderedExceptionsTest(utils.EngineTestBase): engine = self._make_engine(flow) with utils.CaptureListener(engine, capture_flow=False) as capturer: self.assertFailuresRegexp(RuntimeError, '^Gotcha', engine.run) - self.assertNotIn('task2.t REVERTED', capturer.values) + self.assertNotIn('task2.t REVERTED(None)', capturer.values) class EngineGraphFlowTest(utils.EngineTestBase): @@ -697,11 +721,11 @@ class EngineGraphFlowTest(utils.EngineTestBase): 'task3.t RUNNING', 'task3.t FAILURE(Failure: RuntimeError: Woot!)', 'task3.t REVERTING', - 'task3.t REVERTED', + 'task3.t REVERTED(None)', 'task2.t REVERTING', - 'task2.t REVERTED', + 'task2.t REVERTED(None)', 'task1.t REVERTING', - 'task1.t REVERTED'] + 'task1.t REVERTED(None)'] self.assertEqual(expected, capturer.values) self.assertEqual(engine.storage.get_flow_state(), states.REVERTED) diff --git a/taskflow/tests/unit/test_retries.py b/taskflow/tests/unit/test_retries.py index ddb256b0..54f51bbf 100644 --- a/taskflow/tests/unit/test_retries.py +++ b/taskflow/tests/unit/test_retries.py @@ -82,8 +82,8 @@ class RetryTest(utils.EngineTestBase): 'task1.t RUNNING', 'task1.t SUCCESS(5)', 'task2.t RUNNING', 'task2.t FAILURE(Failure: RuntimeError: Woot!)', - 'task2.t REVERTING', 'task2.t REVERTED', - 'task1.t REVERTING', 'task1.t REVERTED', + 'task2.t REVERTING', 'task2.t REVERTED(None)', + 'task1.t REVERTING', 'task1.t REVERTED(None)', 'r1.r RETRYING', 'task1.t PENDING', 'task2.t PENDING', @@ -114,9 +114,9 @@ class RetryTest(utils.EngineTestBase): 'task2.t RUNNING', 'task2.t FAILURE(Failure: RuntimeError: Woot!)', 'task2.t REVERTING', - 'task2.t REVERTED', + 'task2.t REVERTED(None)', 'task1.t REVERTING', - 'task1.t REVERTED', + 'task1.t REVERTED(None)', 'r1.r RETRYING', 'task1.t PENDING', 'task2.t PENDING', @@ -127,11 +127,11 @@ class RetryTest(utils.EngineTestBase): 'task2.t RUNNING', 'task2.t FAILURE(Failure: RuntimeError: Woot!)', 'task2.t REVERTING', - 'task2.t REVERTED', + 'task2.t REVERTED(None)', 'task1.t REVERTING', - 'task1.t REVERTED', + 'task1.t REVERTED(None)', 'r1.r REVERTING', - 'r1.r REVERTED', + 'r1.r REVERTED(None)', 'flow-1.f REVERTED'] self.assertEqual(expected, capturer.values) @@ -153,9 +153,9 @@ class RetryTest(utils.EngineTestBase): 'task2.t RUNNING', 'task2.t FAILURE(Failure: RuntimeError: Woot!)', 'task2.t REVERTING', - 'task2.t REVERTED', + 'task2.t REVERTED(None)', 'task1.t REVERTING', - 'task1.t FAILURE', + 'task1.t REVERT_FAILURE(Failure: RuntimeError: Gotcha!)', 'flow-1.f FAILURE'] self.assertEqual(expected, capturer.values) @@ -185,9 +185,9 @@ class RetryTest(utils.EngineTestBase): 'task3.t RUNNING', 'task3.t FAILURE(Failure: RuntimeError: Woot!)', 'task3.t REVERTING', - 'task3.t REVERTED', + 'task3.t REVERTED(None)', 'task2.t REVERTING', - 'task2.t REVERTED', + 'task2.t REVERTED(None)', 'r2.r RETRYING', 'task2.t PENDING', 'task3.t PENDING', @@ -231,15 +231,15 @@ class RetryTest(utils.EngineTestBase): 'task4.t RUNNING', 'task4.t FAILURE(Failure: RuntimeError: Woot!)', 'task4.t REVERTING', - 'task4.t REVERTED', + 'task4.t REVERTED(None)', 'task3.t REVERTING', - 'task3.t REVERTED', + 'task3.t REVERTED(None)', 'task2.t REVERTING', - 'task2.t REVERTED', + 'task2.t REVERTED(None)', 'r2.r REVERTING', - 'r2.r REVERTED', + 'r2.r REVERTED(None)', 'task1.t REVERTING', - 'task1.t REVERTED', + 'task1.t REVERTED(None)', 'r1.r RETRYING', 'task1.t PENDING', 'r2.r PENDING', @@ -280,8 +280,8 @@ class RetryTest(utils.EngineTestBase): 'task2.t FAILURE(Failure: RuntimeError: Woot!)', 'task2.t REVERTING', 'task1.t REVERTING', - 'task2.t REVERTED', - 'task1.t REVERTED', + 'task2.t REVERTED(None)', + 'task1.t REVERTED(None)', 'r.r RETRYING', 'task1.t PENDING', 'task2.t PENDING', @@ -316,11 +316,11 @@ class RetryTest(utils.EngineTestBase): 'task2.t RUNNING', 'task2.t FAILURE(Failure: RuntimeError: Woot!)', 'task2.t REVERTING', - 'task2.t REVERTED', + 'task2.t REVERTED(None)', 'r2.r REVERTING', - 'r2.r REVERTED', + 'r2.r REVERTED(None)', 'task1.t REVERTING', - 'task1.t REVERTED', + 'task1.t REVERTED(None)', 'r1.r RETRYING', 'task1.t PENDING', 'r2.r PENDING', @@ -359,9 +359,9 @@ class RetryTest(utils.EngineTestBase): 'task2.t RUNNING', 'task2.t FAILURE(Failure: RuntimeError: Woot!)', 'task2.t REVERTING', - 'task2.t REVERTED', + 'task2.t REVERTED(None)', 'r1.r REVERTING', - 'r1.r REVERTED', + 'r1.r REVERTED(None)', 'flow-1.f REVERTED'] self.assertEqual(expected, capturer.values) @@ -388,11 +388,11 @@ class RetryTest(utils.EngineTestBase): 'task2.t RUNNING', 'task2.t FAILURE(Failure: RuntimeError: Woot!)', 'task2.t REVERTING', - 'task2.t REVERTED', + 'task2.t REVERTED(None)', 'r1.r REVERTING', - 'r1.r REVERTED', + 'r1.r REVERTED(None)', 'task1.t REVERTING', - 'task1.t REVERTED', + 'task1.t REVERTED(None)', 'flow-1.f REVERTED'] self.assertEqual(expected, capturer.values) @@ -417,13 +417,13 @@ class RetryTest(utils.EngineTestBase): 'task2.t RUNNING', 'task2.t FAILURE(Failure: RuntimeError: Woot!)', 'task2.t REVERTING', - 'task2.t REVERTED', + 'task2.t REVERTED(None)', 'r2.r REVERTING', - 'r2.r REVERTED', + 'r2.r REVERTED(None)', 'task1.t REVERTING', - 'task1.t REVERTED', + 'task1.t REVERTED(None)', 'r1.r REVERTING', - 'r1.r REVERTED', + 'r1.r REVERTED(None)', 'flow-1.f REVERTED'] self.assertEqual(expected, capturer.values) @@ -515,7 +515,7 @@ class RetryTest(utils.EngineTestBase): 'c.t RUNNING', 'c.t FAILURE(Failure: RuntimeError: Woot!)', 'c.t REVERTING', - 'c.t REVERTED', + 'c.t REVERTED(None)', 'r1.r RETRYING', 'c.t PENDING', 'r1.r RUNNING', @@ -542,9 +542,9 @@ class RetryTest(utils.EngineTestBase): 't2.t RUNNING', 't2.t FAILURE(Failure: RuntimeError: Woot!)', 't2.t REVERTING', - 't2.t REVERTED', + 't2.t REVERTED(None)', 't1.t REVERTING', - 't1.t REVERTED', + 't1.t REVERTED(None)', 'r1.r RETRYING', 't1.t PENDING', 't2.t PENDING', @@ -555,9 +555,9 @@ class RetryTest(utils.EngineTestBase): 't2.t RUNNING', 't2.t FAILURE(Failure: RuntimeError: Woot!)', 't2.t REVERTING', - 't2.t REVERTED', + 't2.t REVERTED(None)', 't1.t REVERTING', - 't1.t REVERTED', + 't1.t REVERTED(None)', 'r1.r RETRYING', 't1.t PENDING', 't2.t PENDING', @@ -568,11 +568,11 @@ class RetryTest(utils.EngineTestBase): 't2.t RUNNING', 't2.t FAILURE(Failure: RuntimeError: Woot!)', 't2.t REVERTING', - 't2.t REVERTED', + 't2.t REVERTED(None)', 't1.t REVERTING', - 't1.t REVERTED', + 't1.t REVERTED(None)', 'r1.r REVERTING', - 'r1.r REVERTED', + 'r1.r REVERTED(None)', 'flow-1.f REVERTED'] self.assertEqual(expected, capturer.values) @@ -589,7 +589,7 @@ class RetryTest(utils.EngineTestBase): 't1.t RUNNING', 't1.t FAILURE(Failure: RuntimeError: Woot with 3)', 't1.t REVERTING', - 't1.t REVERTED', + 't1.t REVERTED(None)', 'r1.r RETRYING', 't1.t PENDING', 'r1.r RUNNING', @@ -597,7 +597,7 @@ class RetryTest(utils.EngineTestBase): 't1.t RUNNING', 't1.t FAILURE(Failure: RuntimeError: Woot with 2)', 't1.t REVERTING', - 't1.t REVERTED', + 't1.t REVERTED(None)', 'r1.r RETRYING', 't1.t PENDING', 'r1.r RUNNING', @@ -605,7 +605,7 @@ class RetryTest(utils.EngineTestBase): 't1.t RUNNING', 't1.t FAILURE(Failure: RuntimeError: Woot with 3)', 't1.t REVERTING', - 't1.t REVERTED', + 't1.t REVERTED(None)', 'r1.r RETRYING', 't1.t PENDING', 'r1.r RUNNING', @@ -613,9 +613,9 @@ class RetryTest(utils.EngineTestBase): 't1.t RUNNING', 't1.t FAILURE(Failure: RuntimeError: Woot with 5)', 't1.t REVERTING', - 't1.t REVERTED', + 't1.t REVERTED(None)', 'r1.r REVERTING', - 'r1.r REVERTED', + 'r1.r REVERTED(None)', 'flow-1.f REVERTED'] self.assertEqual(expected, capturer.values) @@ -632,7 +632,7 @@ class RetryTest(utils.EngineTestBase): 't1.t RUNNING', 't1.t FAILURE(Failure: RuntimeError: Woot with 2)', 't1.t REVERTING', - 't1.t REVERTED', + 't1.t REVERTED(None)', 'r1.r RETRYING', 't1.t PENDING', 'r1.r RUNNING', @@ -640,7 +640,7 @@ class RetryTest(utils.EngineTestBase): 't1.t RUNNING', 't1.t FAILURE(Failure: RuntimeError: Woot with 3)', 't1.t REVERTING', - 't1.t REVERTED', + 't1.t REVERTED(None)', 'r1.r RETRYING', 't1.t PENDING', 'r1.r RUNNING', @@ -648,9 +648,9 @@ class RetryTest(utils.EngineTestBase): 't1.t RUNNING', 't1.t FAILURE(Failure: RuntimeError: Woot with 5)', 't1.t REVERTING', - 't1.t REVERTED', + 't1.t REVERTED(None)', 'r1.r REVERTING', - 'r1.r REVERTED', + 'r1.r REVERTED(None)', 'flow-1.f REVERTED'] self.assertItemsEqual(capturer.values, expected) @@ -674,7 +674,7 @@ class RetryTest(utils.EngineTestBase): 'task2.t RUNNING', 'task2.t FAILURE(Failure: RuntimeError: Woot with 3)', 'task2.t REVERTING', - 'task2.t REVERTED', + 'task2.t REVERTED(None)', 'r1.r RETRYING', 'task2.t PENDING', 'r1.r RUNNING', @@ -682,7 +682,7 @@ class RetryTest(utils.EngineTestBase): 'task2.t RUNNING', 'task2.t FAILURE(Failure: RuntimeError: Woot with 2)', 'task2.t REVERTING', - 'task2.t REVERTED', + 'task2.t REVERTED(None)', 'r1.r RETRYING', 'task2.t PENDING', 'r1.r RUNNING', @@ -690,7 +690,7 @@ class RetryTest(utils.EngineTestBase): 'task2.t RUNNING', 'task2.t FAILURE(Failure: RuntimeError: Woot with 3)', 'task2.t REVERTING', - 'task2.t REVERTED', + 'task2.t REVERTED(None)', 'r1.r RETRYING', 'task2.t PENDING', 'r1.r RUNNING', @@ -698,9 +698,9 @@ class RetryTest(utils.EngineTestBase): 'task2.t RUNNING', 'task2.t FAILURE(Failure: RuntimeError: Woot with 5)', 'task2.t REVERTING', - 'task2.t REVERTED', + 'task2.t REVERTED(None)', 'r1.r REVERTING', - 'r1.r REVERTED', + 'r1.r REVERTED(None)', 'flow-1.f REVERTED'] self.assertEqual(expected, capturer.values) @@ -724,7 +724,7 @@ class RetryTest(utils.EngineTestBase): 'task2.t RUNNING', 'task2.t FAILURE(Failure: RuntimeError: Woot with 3)', 'task2.t REVERTING', - 'task2.t REVERTED', + 'task2.t REVERTED(None)', 'r1.r RETRYING', 'task2.t PENDING', 'r1.r RUNNING', @@ -732,7 +732,7 @@ class RetryTest(utils.EngineTestBase): 'task2.t RUNNING', 'task2.t FAILURE(Failure: RuntimeError: Woot with 2)', 'task2.t REVERTING', - 'task2.t REVERTED', + 'task2.t REVERTED(None)', 'r1.r RETRYING', 'task2.t PENDING', 'r1.r RUNNING', @@ -740,7 +740,7 @@ class RetryTest(utils.EngineTestBase): 'task2.t RUNNING', 'task2.t FAILURE(Failure: RuntimeError: Woot with 3)', 'task2.t REVERTING', - 'task2.t REVERTED', + 'task2.t REVERTED(None)', 'r1.r RETRYING', 'task2.t PENDING', 'r1.r RUNNING', @@ -748,11 +748,11 @@ class RetryTest(utils.EngineTestBase): 'task2.t RUNNING', 'task2.t FAILURE(Failure: RuntimeError: Woot with 5)', 'task2.t REVERTING', - 'task2.t REVERTED', + 'task2.t REVERTED(None)', 'r1.r REVERTING', - 'r1.r REVERTED', + 'r1.r REVERTED(None)', 'task1.t REVERTING', - 'task1.t REVERTED', + 'task1.t REVERTED(None)', 'flow-1.f REVERTED'] self.assertEqual(expected, capturer.values) @@ -778,7 +778,7 @@ class RetryTest(utils.EngineTestBase): 't1.t RUNNING', 't1.t FAILURE(Failure: RuntimeError: Woot with 3)', 't1.t REVERTING', - 't1.t REVERTED', + 't1.t REVERTED(None)', 'r1.r RETRYING', 't1.t PENDING', 'r1.r RUNNING', @@ -786,7 +786,7 @@ class RetryTest(utils.EngineTestBase): 't1.t RUNNING', 't1.t FAILURE(Failure: RuntimeError: Woot with 2)', 't1.t REVERTING', - 't1.t REVERTED', + 't1.t REVERTED(None)', 'r1.r RETRYING', 't1.t PENDING', 'r1.r RUNNING', @@ -794,9 +794,9 @@ class RetryTest(utils.EngineTestBase): 't1.t RUNNING', 't1.t FAILURE(Failure: RuntimeError: Woot with 5)', 't1.t REVERTING', - 't1.t REVERTED', + 't1.t REVERTED(None)', 'r1.r REVERTING', - 'r1.r REVERTED', + 'r1.r REVERTED(None)', 'flow-1.f REVERTED'] self.assertEqual(expected, capturer.values) @@ -814,7 +814,7 @@ class RetryTest(utils.EngineTestBase): 't1.t RUNNING', 't1.t FAILURE(Failure: RuntimeError: Woot with 3)', 't1.t REVERTING', - 't1.t REVERTED', + 't1.t REVERTED(None)', 'r1.r RETRYING', 't1.t PENDING', 'r1.r RUNNING', @@ -822,7 +822,7 @@ class RetryTest(utils.EngineTestBase): 't1.t RUNNING', 't1.t FAILURE(Failure: RuntimeError: Woot with 2)', 't1.t REVERTING', - 't1.t REVERTED', + 't1.t REVERTED(None)', 'r1.r RETRYING', 't1.t PENDING', 'r1.r RUNNING', @@ -830,9 +830,9 @@ class RetryTest(utils.EngineTestBase): 't1.t RUNNING', 't1.t FAILURE(Failure: RuntimeError: Woot with 5)', 't1.t REVERTING', - 't1.t REVERTED', + 't1.t REVERTED(None)', 'r1.r REVERTING', - 'r1.r REVERTED', + 'r1.r REVERTED(None)', 'flow-1.f REVERTED'] self.assertItemsEqual(capturer.values, expected) @@ -857,7 +857,7 @@ class RetryTest(utils.EngineTestBase): 'task-2.t RUNNING', 'task-2.t FAILURE(Failure: RuntimeError: Woot with 3)', 'task-2.t REVERTING', - 'task-2.t REVERTED', + 'task-2.t REVERTED(None)', 'r1.r RETRYING', 'task-2.t PENDING', 'r1.r RUNNING', @@ -865,7 +865,7 @@ class RetryTest(utils.EngineTestBase): 'task-2.t RUNNING', 'task-2.t FAILURE(Failure: RuntimeError: Woot with 2)', 'task-2.t REVERTING', - 'task-2.t REVERTED', + 'task-2.t REVERTED(None)', 'r1.r RETRYING', 'task-2.t PENDING', 'r1.r RUNNING', @@ -873,9 +873,9 @@ class RetryTest(utils.EngineTestBase): 'task-2.t RUNNING', 'task-2.t FAILURE(Failure: RuntimeError: Woot with 5)', 'task-2.t REVERTING', - 'task-2.t REVERTED', + 'task-2.t REVERTED(None)', 'r1.r REVERTING', - 'r1.r REVERTED', + 'r1.r REVERTED(None)', 'flow-1.f REVERTED'] self.assertEqual(expected, capturer.values) @@ -901,7 +901,7 @@ class RetryTest(utils.EngineTestBase): 'task-2.t RUNNING', 'task-2.t FAILURE(Failure: RuntimeError: Woot with 3)', 'task-2.t REVERTING', - 'task-2.t REVERTED', + 'task-2.t REVERTED(None)', 'r1.r RETRYING', 'task-2.t PENDING', 'r1.r RUNNING', @@ -909,7 +909,7 @@ class RetryTest(utils.EngineTestBase): 'task-2.t RUNNING', 'task-2.t FAILURE(Failure: RuntimeError: Woot with 2)', 'task-2.t REVERTING', - 'task-2.t REVERTED', + 'task-2.t REVERTED(None)', 'r1.r RETRYING', 'task-2.t PENDING', 'r1.r RUNNING', @@ -917,11 +917,11 @@ class RetryTest(utils.EngineTestBase): 'task-2.t RUNNING', 'task-2.t FAILURE(Failure: RuntimeError: Woot with 5)', 'task-2.t REVERTING', - 'task-2.t REVERTED', + 'task-2.t REVERTED(None)', 'r1.r REVERTING', - 'r1.r REVERTED', + 'r1.r REVERTED(None)', 'task-1.t REVERTING', - 'task-1.t REVERTED', + 'task-1.t REVERTED(None)', 'flow-1.f REVERTED'] self.assertEqual(expected, capturer.values) @@ -973,7 +973,7 @@ class RetryTest(utils.EngineTestBase): with utils.CaptureListener(engine) as capturer: engine.run() expected = ['task1.t REVERTING', - 'task1.t REVERTED', + 'task1.t REVERTED(None)', 'flow-1_retry.r RETRYING', 'task1.t PENDING', 'flow-1_retry.r RUNNING', @@ -988,7 +988,7 @@ class RetryTest(utils.EngineTestBase): with utils.CaptureListener(engine) as capturer: engine.run() expected = ['task1.t REVERTING', - 'task1.t REVERTED', + 'task1.t REVERTED(None)', 'flow-1_retry.r RETRYING', 'task1.t PENDING', 'flow-1_retry.r RUNNING', @@ -1003,7 +1003,7 @@ class RetryTest(utils.EngineTestBase): with utils.CaptureListener(engine) as capturer: engine.run() expected = ['task1.t REVERTING', - 'task1.t REVERTED', + 'task1.t REVERTED(None)', 'flow-1_retry.r RETRYING', 'task1.t PENDING', 'flow-1_retry.r RUNNING', @@ -1018,7 +1018,7 @@ class RetryTest(utils.EngineTestBase): with utils.CaptureListener(engine) as capturer: engine.run() expected = ['task1.t REVERTING', - 'task1.t REVERTED', + 'task1.t REVERTED(None)', 'flow-1_retry.r RETRYING', 'task1.t PENDING', 'flow-1_retry.r RUNNING', @@ -1032,7 +1032,7 @@ class RetryTest(utils.EngineTestBase): engine = self._pretend_to_run_a_flow_and_crash('revert scheduled') with utils.CaptureListener(engine) as capturer: engine.run() - expected = ['task1.t REVERTED', + expected = ['task1.t REVERTED(None)', 'flow-1_retry.r RETRYING', 'task1.t PENDING', 'flow-1_retry.r RUNNING', @@ -1077,16 +1077,16 @@ class RetryTest(utils.EngineTestBase): 'c.t FAILURE(Failure: RuntimeError: Woot!)', 'a.t REVERTING', 'c.t REVERTING', - 'a.t REVERTED', - 'c.t REVERTED', + 'a.t REVERTED(None)', + 'c.t REVERTED(None)', 'b.t REVERTING', - 'b.t REVERTED'] + 'b.t REVERTED(None)'] self.assertItemsEqual(capturer.values[:8], expected) # Task 'a' was or was not executed again, both cases are ok. self.assertIsSuperAndSubsequence(capturer.values[8:], [ 'b.t RUNNING', 'c.t FAILURE(Failure: RuntimeError: Woot!)', - 'b.t REVERTED', + 'b.t REVERTED(None)', ]) self.assertEqual(engine.storage.get_flow_state(), st.REVERTED) @@ -1107,9 +1107,9 @@ class RetryTest(utils.EngineTestBase): with utils.CaptureListener(engine, capture_flow=False) as capturer: engine.run() expected = ['c.t REVERTING', - 'c.t REVERTED', + 'c.t REVERTED(None)', 'b.t REVERTING', - 'b.t REVERTED'] + 'b.t REVERTED(None)'] self.assertItemsEqual(capturer.values[:4], expected) expected = ['test2_retry.r RETRYING', 'b.t PENDING', @@ -1149,10 +1149,10 @@ class RetryParallelExecutionTest(utils.EngineTestBase): 'task2.t RUNNING', 'task2.t FAILURE(Failure: RuntimeError: Woot!)', 'task2.t REVERTING', - 'task2.t REVERTED', + 'task2.t REVERTED(None)', 'task1.t SUCCESS(5)', 'task1.t REVERTING', - 'task1.t REVERTED', + 'task1.t REVERTED(None)', 'r.r RETRYING', 'task1.t PENDING', 'task2.t PENDING', @@ -1189,10 +1189,10 @@ class RetryParallelExecutionTest(utils.EngineTestBase): 'task3.t FAILURE(Failure: RuntimeError: Woot!)', 'task3.t REVERTING', 'task1.t REVERTING', - 'task3.t REVERTED', - 'task1.t REVERTED', + 'task3.t REVERTED(None)', + 'task1.t REVERTED(None)', 'task2.t REVERTING', - 'task2.t REVERTED', + 'task2.t REVERTED(None)', 'r.r RETRYING', 'task1.t PENDING', 'task2.t PENDING', diff --git a/taskflow/tests/unit/test_storage.py b/taskflow/tests/unit/test_storage.py index 958d5a53..0e1c47fc 100644 --- a/taskflow/tests/unit/test_storage.py +++ b/taskflow/tests/unit/test_storage.py @@ -118,13 +118,6 @@ class StorageTestMixin(object): self.assertEqual(s.fetch_all(), {}) self.assertEqual(s.get_atom_state('my task'), states.SUCCESS) - def test_save_and_get_other_state(self): - s = self._get_storage() - s.ensure_atom(test_utils.NoopTask('my task')) - s.save('my task', 5, states.FAILURE) - self.assertEqual(s.get('my task'), 5) - self.assertEqual(s.get_atom_state('my task'), states.FAILURE) - def test_save_and_get_cached_failure(self): a_failure = failure.Failure.from_exception(RuntimeError('Woot!')) s = self._get_storage() @@ -141,7 +134,7 @@ class StorageTestMixin(object): s.ensure_atom(test_utils.NoopTask('my task')) s.save('my task', a_failure, states.FAILURE) self.assertEqual(s.get('my task'), a_failure) - s._failures['my task'] = None + s._failures['my task'] = {} self.assertTrue(a_failure.matches(s.get('my task'))) def test_get_failure_from_reverted_task(self): @@ -564,6 +557,33 @@ class StorageTestMixin(object): args = s.fetch_mapped_args(t.rebind, atom_name=t.name) self.assertEqual(3, args['x']) + def test_save_fetch(self): + t = test_utils.GiveBackRevert('my task') + s = self._get_storage() + s.ensure_atom(t) + s.save('my task', 2) + self.assertEqual(2, s.get('my task')) + self.assertRaises(exceptions.NotFound, + s.get_revert_result, 'my task') + + def test_save_fetch_revert(self): + t = test_utils.GiveBackRevert('my task') + s = self._get_storage() + s.ensure_atom(t) + s.set_atom_intention('my task', states.REVERT) + s.save('my task', 2, state=states.REVERTED) + self.assertRaises(exceptions.NotFound, s.get, 'my task') + self.assertEqual(2, s.get_revert_result('my task')) + + def test_save_fail_fetch_revert(self): + t = test_utils.GiveBackRevert('my task') + s = self._get_storage() + s.ensure_atom(t) + s.set_atom_intention('my task', states.REVERT) + a_failure = failure.Failure.from_exception(RuntimeError('Woot!')) + s.save('my task', a_failure, state=states.REVERT_FAILURE) + self.assertEqual(a_failure, s.get_revert_result('my task')) + class StorageMemoryTest(StorageTestMixin, test.TestCase): def setUp(self): diff --git a/taskflow/tests/unit/test_suspend.py b/taskflow/tests/unit/test_suspend.py index e5d0288f..1b358acc 100644 --- a/taskflow/tests/unit/test_suspend.py +++ b/taskflow/tests/unit/test_suspend.py @@ -97,14 +97,14 @@ class SuspendTest(utils.EngineTestBase): 'c.t RUNNING', 'c.t FAILURE(Failure: RuntimeError: Woot!)', 'c.t REVERTING', - 'c.t REVERTED', + 'c.t REVERTED(None)', 'b.t REVERTING', - 'b.t REVERTED'] + 'b.t REVERTED(None)'] self.assertEqual(expected, capturer.values) with utils.CaptureListener(engine, capture_flow=False) as capturer: self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run) self.assertEqual(engine.storage.get_flow_state(), states.REVERTED) - expected = ['a.t REVERTING', 'a.t REVERTED'] + expected = ['a.t REVERTING', 'a.t REVERTED(None)'] self.assertEqual(expected, capturer.values) def test_suspend_and_resume_linear_flow_on_revert(self): @@ -124,9 +124,9 @@ class SuspendTest(utils.EngineTestBase): 'c.t RUNNING', 'c.t FAILURE(Failure: RuntimeError: Woot!)', 'c.t REVERTING', - 'c.t REVERTED', + 'c.t REVERTED(None)', 'b.t REVERTING', - 'b.t REVERTED'] + 'b.t REVERTED(None)'] self.assertEqual(expected, capturer.values) # pretend we are resuming @@ -135,7 +135,7 @@ class SuspendTest(utils.EngineTestBase): self.assertRaisesRegexp(RuntimeError, '^Woot', engine2.run) self.assertEqual(engine2.storage.get_flow_state(), states.REVERTED) expected = ['a.t REVERTING', - 'a.t REVERTED'] + 'a.t REVERTED(None)'] self.assertEqual(expected, capturer2.values) def test_suspend_and_revert_even_if_task_is_gone(self): @@ -157,9 +157,9 @@ class SuspendTest(utils.EngineTestBase): 'c.t RUNNING', 'c.t FAILURE(Failure: RuntimeError: Woot!)', 'c.t REVERTING', - 'c.t REVERTED', + 'c.t REVERTED(None)', 'b.t REVERTING', - 'b.t REVERTED'] + 'b.t REVERTED(None)'] self.assertEqual(expected, capturer.values) # pretend we are resuming, but task 'c' gone when flow got updated @@ -171,7 +171,7 @@ class SuspendTest(utils.EngineTestBase): with utils.CaptureListener(engine2, capture_flow=False) as capturer2: self.assertRaisesRegexp(RuntimeError, '^Woot', engine2.run) self.assertEqual(engine2.storage.get_flow_state(), states.REVERTED) - expected = ['a.t REVERTING', 'a.t REVERTED'] + expected = ['a.t REVERTING', 'a.t REVERTED(None)'] self.assertEqual(capturer2.values, expected) def test_storage_is_rechecked(self): diff --git a/taskflow/tests/unit/worker_based/test_worker.py b/taskflow/tests/unit/worker_based/test_worker.py index 3acf245b..c5986739 100644 --- a/taskflow/tests/unit/worker_based/test_worker.py +++ b/taskflow/tests/unit/worker_based/test_worker.py @@ -33,7 +33,7 @@ class TestWorker(test.MockTestCase): self.broker_url = 'test-url' self.exchange = 'test-exchange' self.topic = 'test-topic' - self.endpoint_count = 25 + self.endpoint_count = 26 # patch classes self.executor_mock, self.executor_inst_mock = self.patchClass( diff --git a/taskflow/tests/utils.py b/taskflow/tests/utils.py index b295fc2a..266a0e8c 100644 --- a/taskflow/tests/utils.py +++ b/taskflow/tests/utils.py @@ -117,6 +117,15 @@ class AddOne(task.Task): return source + 1 +class GiveBackRevert(task.Task): + + def execute(self, value): + return value + 1 + + def revert(self, *args, **kwargs): + return kwargs.get('result') + 1 + + class FakeTask(object): def execute(self, **kwargs): diff --git a/taskflow/types/failure.py b/taskflow/types/failure.py index d713098d..dafe73e6 100644 --- a/taskflow/types/failure.py +++ b/taskflow/types/failure.py @@ -291,13 +291,15 @@ class Failure(object): def reraise_if_any(failures): """Re-raise exceptions if argument is not empty. - If argument is empty list, this method returns None. If - argument is a list with a single ``Failure`` object in it, - that failure is reraised. Else, a + If argument is empty list/tuple/iterator, this method returns + None. If argument is coverted into a list with a + single ``Failure`` object in it, that failure is reraised. Else, a :class:`~taskflow.exceptions.WrappedFailure` exception - is raised with a failure list as causes. + is raised with the failure list as causes. """ - failures = list(failures) + if not isinstance(failures, (list, tuple)): + # Convert generators/other into a list... + failures = list(failures) if len(failures) == 1: failures[0].reraise() elif len(failures) > 1: diff --git a/tools/state_graph.py b/tools/state_graph.py index c37cd703..635ec687 100755 --- a/tools/state_graph.py +++ b/tools/state_graph.py @@ -68,7 +68,7 @@ def make_machine(start_state, transitions): def map_color(internal_states, state): if state in internal_states: return 'blue' - if state == states.FAILURE: + if state in (states.FAILURE, states.REVERT_FAILURE): return 'red' if state == states.REVERTED: return 'darkorange'