diff --git a/doc/source/img/retry_states.svg b/doc/source/img/retry_states.svg index d6801b19c..abf8498e2 100644 --- a/doc/source/img/retry_states.svg +++ b/doc/source/img/retry_states.svg @@ -3,6 +3,6 @@ - -Retries statesPENDINGIGNORERUNNINGSUCCESSFAILURERETRYINGREVERTINGREVERTEDstart + +Retries statesPENDINGIGNORERUNNINGSUCCESSFAILURERETRYINGREVERTINGREVERTEDREVERT_FAILUREstart diff --git a/doc/source/img/task_states.svg b/doc/source/img/task_states.svg index 9c27c8432..a9368e317 100644 --- a/doc/source/img/task_states.svg +++ b/doc/source/img/task_states.svg @@ -3,6 +3,6 @@ - -Tasks statesPENDINGIGNORERUNNINGFAILURESUCCESSREVERTINGREVERTEDstart + +Tasks statesPENDINGIGNORERUNNINGFAILURESUCCESSREVERTINGREVERTEDREVERT_FAILUREstart diff --git a/doc/source/states.rst b/doc/source/states.rst index 01e9da597..3d42bad1d 100644 --- a/doc/source/states.rst +++ b/doc/source/states.rst @@ -136,19 +136,25 @@ method returns. **SUCCESS** - The engine running the task transitions the task to this state after the task has finished successfully (ie no exception/s were raised during -execution). +running its :py:meth:`~taskflow.task.BaseTask.execute` method). **FAILURE** - The engine running the task transitions the task to this state -after it has finished with an error. +after it has finished with an error (ie exception/s were raised during +running its :py:meth:`~taskflow.task.BaseTask.execute` method). + +**REVERT_FAILURE** - The engine running the task transitions the task to this +state after it has finished with an error (ie exception/s were raised during +running its :py:meth:`~taskflow.task.BaseTask.revert` method). **REVERTING** - The engine running a task transitions the task to this state when the containing flow the engine is running starts to revert and its :py:meth:`~taskflow.task.BaseTask.revert` method is called. Only tasks in -the ``SUCCESS`` or ``FAILURE`` state can be reverted. If this method fails (ie -raises an exception), the task goes to the ``FAILURE`` state (if it was already -in the ``FAILURE`` state then this is a no-op). +the ``SUCCESS`` or ``FAILURE`` state can be reverted. If this method fails (ie +raises an exception), the task goes to the ``REVERT_FAILURE`` state. -**REVERTED** - A task that has been reverted appears in this state. +**REVERTED** - The engine running the task transitions the task to this state +after it has successfully reverted the task (ie no exception/s were raised +during running its :py:meth:`~taskflow.task.BaseTask.revert` method). Retry ===== @@ -188,17 +194,23 @@ state until its :py:meth:`~taskflow.retry.Retry.execute` method returns. it was finished successfully (ie no exception/s were raised during execution). -**FAILURE** - The engine running the retry transitions it to this state after -it has finished with an error. +**FAILURE** - The engine running the retry transitions the retry to this state +after it has finished with an error (ie exception/s were raised during +running its :py:meth:`~taskflow.retry.Retry.execute` method). + +**REVERT_FAILURE** - The engine running the retry transitions the retry to +this state after it has finished with an error (ie exception/s were raised +during its :py:meth:`~taskflow.retry.Retry.revert` method). **REVERTING** - The engine running the retry transitions to this state when the associated flow the engine is running starts to revert it and its :py:meth:`~taskflow.retry.Retry.revert` method is called. Only retries in ``SUCCESS`` or ``FAILURE`` state can be reverted. If this method fails (ie -raises an exception), the retry goes to the ``FAILURE`` state (if it was -already in the ``FAILURE`` state then this is a no-op). +raises an exception), the retry goes to the ``REVERT_FAILURE`` state. -**REVERTED** - A retry that has been reverted appears in this state. +**REVERTED** - The engine running the retry transitions the retry to this state +after it has successfully reverted the retry (ie no exception/s were raised +during running its :py:meth:`~taskflow.retry.Retry.revert` method). **RETRYING** - If flow that is associated with the current retry was failed and reverted, the engine prepares the flow for the next run and transitions the diff --git a/taskflow/engines/action_engine/actions/base.py b/taskflow/engines/action_engine/actions/base.py index 369a6c66b..488467467 100644 --- a/taskflow/engines/action_engine/actions/base.py +++ b/taskflow/engines/action_engine/actions/base.py @@ -21,17 +21,19 @@ import six from taskflow import states -#: Sentinel use to represent no-result (none can be a valid result...) -NO_RESULT = object() - -#: States that are expected to/may have a result to save... -SAVE_RESULT_STATES = (states.SUCCESS, states.FAILURE) - - @six.add_metaclass(abc.ABCMeta) class Action(object): """An action that handles executing, state changes, ... of atoms.""" + NO_RESULT = object() + """ + Sentinel use to represent lack of any result (none can be a valid result) + """ + + #: States that are expected to have a result to save... + SAVE_RESULT_STATES = (states.SUCCESS, states.FAILURE, + states.REVERTED, states.REVERT_FAILURE) + def __init__(self, storage, notifier): self._storage = storage self._notifier = notifier diff --git a/taskflow/engines/action_engine/actions/retry.py b/taskflow/engines/action_engine/actions/retry.py index c8cad50ad..e8b076b4c 100644 --- a/taskflow/engines/action_engine/actions/retry.py +++ b/taskflow/engines/action_engine/actions/retry.py @@ -60,19 +60,21 @@ class RetryAction(base.Action): arguments.update(addons) return arguments - def change_state(self, retry, state, result=base.NO_RESULT): + def change_state(self, retry, state, result=base.Action.NO_RESULT): old_state = self._storage.get_atom_state(retry.name) - if state in base.SAVE_RESULT_STATES: + if state in self.SAVE_RESULT_STATES: save_result = None - if result is not base.NO_RESULT: + if result is not self.NO_RESULT: save_result = result self._storage.save(retry.name, save_result, state) - elif state == states.REVERTED: - self._storage.cleanup_retry_history(retry.name, state) + # TODO(harlowja): combine this with the save to avoid a call + # back into the persistence layer... + if state == states.REVERTED: + self._storage.cleanup_retry_history(retry.name, state) else: if state == old_state: # NOTE(imelnikov): nothing really changed, so we should not - # write anything to storage and run notifications + # write anything to storage and run notifications. return self._storage.set_atom_state(retry.name, state) retry_uuid = self._storage.get_atom_uuid(retry.name) @@ -81,7 +83,7 @@ class RetryAction(base.Action): 'retry_uuid': retry_uuid, 'old_state': old_state, } - if result is not base.NO_RESULT: + if result is not self.NO_RESULT: details['result'] = result self._notifier.notify(state, details) @@ -106,9 +108,9 @@ class RetryAction(base.Action): def _on_done_callback(fut): result = fut.result()[-1] if isinstance(result, failure.Failure): - self.change_state(retry, states.FAILURE) + self.change_state(retry, states.REVERT_FAILURE, result=result) else: - self.change_state(retry, states.REVERTED) + self.change_state(retry, states.REVERTED, result=result) self.change_state(retry, states.REVERTING) arg_addons = { diff --git a/taskflow/engines/action_engine/actions/task.py b/taskflow/engines/action_engine/actions/task.py index ab4b50d90..7ae6b55f8 100644 --- a/taskflow/engines/action_engine/actions/task.py +++ b/taskflow/engines/action_engine/actions/task.py @@ -32,8 +32,8 @@ class TaskAction(base.Action): super(TaskAction, self).__init__(storage, notifier) self._task_executor = task_executor - def _is_identity_transition(self, old_state, state, task, progress): - if state in base.SAVE_RESULT_STATES: + def _is_identity_transition(self, old_state, state, task, progress=None): + if state in self.SAVE_RESULT_STATES: # saving result is never identity transition return False if state != old_state: @@ -50,16 +50,17 @@ class TaskAction(base.Action): return True def change_state(self, task, state, - result=base.NO_RESULT, progress=None): + progress=None, result=base.Action.NO_RESULT): old_state = self._storage.get_atom_state(task.name) - if self._is_identity_transition(old_state, state, task, progress): + if self._is_identity_transition(old_state, state, task, + progress=progress): # NOTE(imelnikov): ignore identity transitions in order # to avoid extra write to storage backend and, what's - # more important, extra notifications + # more important, extra notifications. return - if state in base.SAVE_RESULT_STATES: + if state in self.SAVE_RESULT_STATES: save_result = None - if result is not base.NO_RESULT: + if result is not self.NO_RESULT: save_result = result self._storage.save(task.name, save_result, state) else: @@ -72,7 +73,7 @@ class TaskAction(base.Action): 'task_uuid': task_uuid, 'old_state': old_state, } - if result is not base.NO_RESULT: + if result is not self.NO_RESULT: details['result'] = result self._notifier.notify(state, details) if progress is not None: @@ -140,9 +141,10 @@ class TaskAction(base.Action): def complete_reversion(self, task, result): if isinstance(result, failure.Failure): - self.change_state(task, states.FAILURE) + self.change_state(task, states.REVERT_FAILURE, result=result) else: - self.change_state(task, states.REVERTED, progress=1.0) + self.change_state(task, states.REVERTED, progress=1.0, + result=result) def wait_for_any(self, fs, timeout): return self._task_executor.wait_for_any(fs, timeout) diff --git a/taskflow/engines/action_engine/completer.py b/taskflow/engines/action_engine/completer.py index 318e3bc07..47300a466 100644 --- a/taskflow/engines/action_engine/completer.py +++ b/taskflow/engines/action_engine/completer.py @@ -152,6 +152,7 @@ class Completer(object): if event == ex.EXECUTED: self._process_atom_failure(node, result) else: + # Reverting failed, always retain the failure... return True return False diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py index 9da9ae9db..fed8d4ce2 100644 --- a/taskflow/engines/action_engine/engine.py +++ b/taskflow/engines/action_engine/engine.py @@ -16,6 +16,7 @@ import collections import contextlib +import itertools import threading from concurrent import futures @@ -194,8 +195,10 @@ class ActionEngine(base.Engine): if last_state and last_state not in ignorable_states: self._change_state(last_state) if last_state not in self.NO_RERAISING_STATES: - failures = self.storage.get_failures() - failure.Failure.reraise_if_any(failures.values()) + it = itertools.chain( + six.itervalues(self.storage.get_failures()), + six.itervalues(self.storage.get_revert_failures())) + failure.Failure.reraise_if_any(it) def _change_state(self, state): with self._state_lock: diff --git a/taskflow/persistence/backends/sqlalchemy/alembic/versions/3162c0f3f8e4_add_revert_results_and_revert_failure_.py b/taskflow/persistence/backends/sqlalchemy/alembic/versions/3162c0f3f8e4_add_revert_results_and_revert_failure_.py new file mode 100644 index 000000000..dd54dff3d --- /dev/null +++ b/taskflow/persistence/backends/sqlalchemy/alembic/versions/3162c0f3f8e4_add_revert_results_and_revert_failure_.py @@ -0,0 +1,42 @@ +# -*- coding: utf-8 -*- + +# Copyright (C) 2015 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Add 'revert_results' and 'revert_failure' atom detail column. + +Revision ID: 3162c0f3f8e4 +Revises: 589dccdf2b6e +Create Date: 2015-06-17 15:52:56.575245 + +""" + +# revision identifiers, used by Alembic. +revision = '3162c0f3f8e4' +down_revision = '589dccdf2b6e' + +from alembic import op +import sqlalchemy as sa + + +def upgrade(): + op.add_column('atomdetails', + sa.Column('revert_results', sa.Text(), nullable=True)) + op.add_column('atomdetails', + sa.Column('revert_failure', sa.Text(), nullable=True)) + + +def downgrade(): + op.drop_column('atomdetails', 'revert_results') + op.drop_column('atomdetails', 'revert_failure') diff --git a/taskflow/persistence/backends/sqlalchemy/tables.py b/taskflow/persistence/backends/sqlalchemy/tables.py index 28acca1a0..65969fb24 100644 --- a/taskflow/persistence/backends/sqlalchemy/tables.py +++ b/taskflow/persistence/backends/sqlalchemy/tables.py @@ -92,6 +92,8 @@ def fetch(metadata): default=uuidutils.generate_uuid), Column('failure', Json), Column('results', Json), + Column('revert_results', Json), + Column('revert_failure', Json), Column('atom_type', Enum(*models.ATOM_TYPES, name='atom_types')), Column('intention', Enum(*states.INTENTIONS, diff --git a/taskflow/persistence/models.py b/taskflow/persistence/models.py index c7a6eae51..e41d6d79c 100644 --- a/taskflow/persistence/models.py +++ b/taskflow/persistence/models.py @@ -32,6 +32,14 @@ LOG = logging.getLogger(__name__) # Internal helpers... +def _is_all_none(arg, *args): + if arg is not None: + return False + for more_arg in args: + if more_arg is not None: + return False + return True + def _copy_function(deep_copy): if deep_copy: @@ -413,11 +421,18 @@ class AtomDetail(object): strategies). :ivar results: Any results the atom produced from either its ``execute`` method or from other sources. - :ivar failure: If the atom failed (possibly due to its ``execute`` - method raising) this will be a + :ivar revert_results: Any results the atom produced from either its + ``revert`` method or from other sources. + :ivar failure: If the atom failed (due to its ``execute`` method + raising) this will be a :py:class:`~taskflow.types.failure.Failure` object that represents that failure (if there was no failure this will be set to none). + :ivar revert_failure: If the atom failed (possibly due to its ``revert`` + method raising) this will be a + :py:class:`~taskflow.types.failure.Failure` object + that represents that failure (if there was no + failure this will be set to none). """ def __init__(self, name, uuid): @@ -427,6 +442,8 @@ class AtomDetail(object): self.intention = states.EXECUTE self.results = None self.failure = None + self.revert_results = None + self.revert_failure = None self.meta = {} self.version = None @@ -465,6 +482,8 @@ class AtomDetail(object): self.meta = ad.meta self.failure = ad.failure self.results = ad.results + self.revert_results = ad.revert_results + self.revert_failure = ad.revert_failure self.version = ad.version return self @@ -503,6 +522,16 @@ class AtomDetail(object): self.failure = other.failure else: self.failure = None + if self.revert_failure != other.revert_failure: + # NOTE(imelnikov): we can't just deep copy Failures, as they + # contain tracebacks, which are not copyable. + if other.revert_failure: + if deep_copy: + self.revert_failure = other.revert_failure.copy() + else: + self.revert_failure = other.revert_failure + else: + self.revert_failure = None if self.meta != other.meta: self.meta = copy_fn(other.meta) if self.version != other.version: @@ -522,11 +551,17 @@ class AtomDetail(object): failure = self.failure.to_dict() else: failure = None + if self.revert_failure: + revert_failure = self.revert_failure.to_dict() + else: + revert_failure = None return { 'failure': failure, + 'revert_failure': revert_failure, 'meta': self.meta, 'name': self.name, 'results': self.results, + 'revert_results': self.revert_results, 'state': self.state, 'version': self.version, 'intention': self.intention, @@ -547,11 +582,15 @@ class AtomDetail(object): obj.state = data.get('state') obj.intention = data.get('intention') obj.results = data.get('results') + obj.revert_results = data.get('revert_results') obj.version = data.get('version') obj.meta = _fix_meta(data) failure = data.get('failure') if failure: obj.failure = ft.Failure.from_dict(failure) + revert_failure = data.get('revert_failure') + if revert_failure: + obj.revert_failure = ft.Failure.from_dict(revert_failure) return obj @property @@ -582,47 +621,65 @@ class TaskDetail(AtomDetail): def reset(self, state): """Resets this task detail and sets ``state`` attribute value. - This sets any previously set ``results`` and ``failure`` attributes - back to ``None`` and sets the state to the provided one, as well as - setting this task details ``intention`` attribute to ``EXECUTE``. + This sets any previously set ``results``, ``failure``, + and ``revert_results`` attributes back to ``None`` and sets the + state to the provided one, as well as setting this task + details ``intention`` attribute to ``EXECUTE``. """ self.results = None self.failure = None + self.revert_results = None + self.revert_failure = None self.state = state self.intention = states.EXECUTE def put(self, state, result): """Puts a result (acquired in the given state) into this detail. - If the result is a :py:class:`~taskflow.types.failure.Failure` object - then the ``failure`` attribute will be set (and the ``results`` - attribute will be set to ``None``); if the result is not a - :py:class:`~taskflow.types.failure.Failure` object then the - ``results`` attribute will be set (and the ``failure`` attribute - will be set to ``None``). In either case the ``state`` - attribute will be set to the provided state. + Returns whether this object was modified (or whether it was not). """ was_altered = False - if self.state != state: + if state != self.state: self.state = state was_altered = True - if self._was_failure(state, result): + if state == states.REVERT_FAILURE: + if self.revert_failure != result: + self.revert_failure = result + was_altered = True + if not _is_all_none(self.results, self.revert_results): + self.results = None + self.revert_results = None + was_altered = True + elif state == states.FAILURE: if self.failure != result: self.failure = result was_altered = True - if self.results is not None: + if not _is_all_none(self.results, self.revert_results, + self.revert_failure): self.results = None + self.revert_results = None + self.revert_failure = None + was_altered = True + elif state == states.SUCCESS: + if not _is_all_none(self.revert_results, self.revert_failure, + self.failure): + self.revert_results = None + self.revert_failure = None + self.failure = None was_altered = True - else: # We don't really have the ability to determine equality of # task (user) results at the current time, without making # potentially bad guesses, so assume the task detail always needs # to be saved if they are not exactly equivalent... - if self.results is not result: + if result is not self.results: self.results = result was_altered = True - if self.failure is not None: - self.failure = None + elif state == states.REVERTED: + if not _is_all_none(self.revert_failure): + self.revert_failure = None + was_altered = True + if result is not self.revert_results: + self.revert_results = result was_altered = True return was_altered @@ -630,10 +687,11 @@ class TaskDetail(AtomDetail): """Merges the current task detail with the given one. NOTE(harlowja): This merge does **not** copy and replace - the ``results`` attribute if it differs. Instead the current - objects ``results`` attribute directly becomes (via assignment) the - other objects ``results`` attribute. Also note that if the provided - object is this object itself then **no** merging is done. + the ``results`` or ``revert_results`` if it differs. Instead the + current objects ``results`` and ``revert_results`` attributes directly + becomes (via assignment) the other objects attributes. Also note that + if the provided object is this object itself then **no** merging is + done. See: https://bugs.launchpad.net/taskflow/+bug/1452978 for what happens if this is copied at a deeper level (for example by @@ -648,8 +706,8 @@ class TaskDetail(AtomDetail): if other is self: return self super(TaskDetail, self).merge(other, deep_copy=deep_copy) - if self.results != other.results: - self.results = other.results + self.results = other.results + self.revert_results = other.revert_results return self def copy(self): @@ -659,10 +717,10 @@ class TaskDetail(AtomDetail): version information that this object maintains is shallow copied via ``copy.copy``). - NOTE(harlowja): This copy does **not** perform ``copy.copy`` on - the ``results`` attribute of this object (before assigning to the - copy). Instead the current objects ``results`` attribute directly - becomes (via assignment) the copied objects ``results`` attribute. + NOTE(harlowja): This copy does **not** copy and replace + the ``results`` or ``revert_results`` attribute if it differs. Instead + the current objects ``results`` and ``revert_results`` attributes + directly becomes (via assignment) the cloned objects attributes. See: https://bugs.launchpad.net/taskflow/+bug/1452978 for what happens if this is copied at a deeper level (for example by @@ -673,6 +731,7 @@ class TaskDetail(AtomDetail): """ clone = copy.copy(self) clone.results = self.results + clone.revert_results = self.revert_results if self.meta: clone.meta = self.meta.copy() if self.version: @@ -694,12 +753,15 @@ class RetryDetail(AtomDetail): """Resets this retry detail and sets ``state`` attribute value. This sets any previously added ``results`` back to an empty list - and resets the ``failure`` attribute back to ``None`` and sets the - state to the provided one, as well as setting this atom + and resets the ``failure`` and ``revert_failure`` and + ``revert_results`` attributes back to ``None`` and sets the state + to the provided one, as well as setting this retry details ``intention`` attribute to ``EXECUTE``. """ self.results = [] + self.revert_results = None self.failure = None + self.revert_failure = None self.state = state self.intention = states.EXECUTE @@ -711,14 +773,15 @@ class RetryDetail(AtomDetail): copied via ``copy.copy``). NOTE(harlowja): This copy does **not** copy - the incoming objects ``results`` attribute. Instead this - objects ``results`` attribute list is iterated over and a new list - is constructed with each ``(data, failures)`` element in that list - having its ``failures`` (a dictionary of each named + the incoming objects ``results`` or ``revert_results`` attributes. + Instead this objects ``results`` attribute list is iterated over and + a new list is constructed with each ``(data, failures)`` element in + that list having its ``failures`` (a dictionary of each named :py:class:`~taskflow.types.failure.Failure` object that occured) copied but its ``data`` is left untouched. After this is done that new list becomes (via assignment) the cloned - objects ``results`` attribute. + objects ``results`` attribute. The ``revert_results`` is directly + assigned to the cloned objects ``revert_results`` attribute. See: https://bugs.launchpad.net/taskflow/+bug/1452978 for what happens if the ``data`` in ``results`` is copied at a @@ -738,6 +801,7 @@ class RetryDetail(AtomDetail): copied_failures[key] = failure results.append((data, copied_failures)) clone.results = results + clone.revert_results = self.revert_results if self.meta: clone.meta = self.meta.copy() if self.version: @@ -771,21 +835,50 @@ class RetryDetail(AtomDetail): def put(self, state, result): """Puts a result (acquired in the given state) into this detail. - If the result is a :py:class:`~taskflow.types.failure.Failure` object - then the ``failure`` attribute will be set; if the result is not a - :py:class:`~taskflow.types.failure.Failure` object then the - ``results`` attribute will be appended to (and the ``failure`` - attribute will be set to ``None``). In either case the ``state`` - attribute will be set to the provided state. + Returns whether this object was modified (or whether it was not). """ # Do not clean retry history (only on reset does this happen). - self.state = state - if self._was_failure(state, result): - self.failure = result - else: + was_altered = False + if state != self.state: + self.state = state + was_altered = True + if state == states.REVERT_FAILURE: + if result != self.revert_failure: + self.revert_failure = result + was_altered = True + if not _is_all_none(self.revert_results): + self.revert_results = None + was_altered = True + elif state == states.FAILURE: + if result != self.failure: + self.failure = result + was_altered = True + if not _is_all_none(self.revert_results, self.revert_failure): + self.revert_results = None + self.revert_failure = None + was_altered = True + elif state == states.SUCCESS: + if not _is_all_none(self.failure, self.revert_failure, + self.revert_results): + self.failure = None + self.revert_failure = None + self.revert_results = None + # Track what we produced, so that we can examine it (or avoid + # using it again). self.results.append((result, {})) - self.failure = None - return True + was_altered = True + elif state == states.REVERTED: + # We don't really have the ability to determine equality of + # task (user) results at the current time, without making + # potentially bad guesses, so assume the retry detail always needs + # to be saved if they are not exactly equivalent... + if result is not self.revert_results: + self.revert_results = result + was_altered = True + if not _is_all_none(self.revert_failure): + self.revert_failure = None + was_altered = True + return was_altered @classmethod def from_dict(cls, data): diff --git a/taskflow/states.py b/taskflow/states.py index cbef58c79..07e70dd1a 100644 --- a/taskflow/states.py +++ b/taskflow/states.py @@ -41,6 +41,7 @@ SUCCESS = SUCCESS RUNNING = RUNNING RETRYING = 'RETRYING' IGNORE = 'IGNORE' +REVERT_FAILURE = 'REVERT_FAILURE' # Atom intentions. EXECUTE = 'EXECUTE' @@ -157,20 +158,20 @@ def check_flow_transition(old_state, new_state): # Task state transitions -# See: http://docs.openstack.org/developer/taskflow/states.html +# See: http://docs.openstack.org/developer/taskflow/states.html#task _ALLOWED_TASK_TRANSITIONS = frozenset(( (PENDING, RUNNING), # run it! (PENDING, IGNORE), # skip it! - (RUNNING, SUCCESS), # the task finished successfully - (RUNNING, FAILURE), # the task failed + (RUNNING, SUCCESS), # the task executed successfully + (RUNNING, FAILURE), # the task execution failed - (FAILURE, REVERTING), # task failed, do cleanup now - (SUCCESS, REVERTING), # some other task failed, do cleanup now + (FAILURE, REVERTING), # task execution failed, try reverting... + (SUCCESS, REVERTING), # some other task failed, try reverting... - (REVERTING, REVERTED), # revert done - (REVERTING, FAILURE), # revert failed + (REVERTING, REVERTED), # the task reverted successfully + (REVERTING, REVERT_FAILURE), # the task failed reverting (terminal!) (REVERTED, PENDING), # try again (IGNORE, PENDING), # try again diff --git a/taskflow/storage.py b/taskflow/storage.py index 05b489993..cab68f6d4 100644 --- a/taskflow/storage.py +++ b/taskflow/storage.py @@ -28,15 +28,43 @@ from taskflow.persistence import models from taskflow import retry from taskflow import states from taskflow import task -from taskflow.types import failure from taskflow.utils import misc LOG = logging.getLogger(__name__) -STATES_WITH_RESULTS = (states.SUCCESS, states.REVERTING, states.FAILURE) + + +_EXECUTE_STATES_WITH_RESULTS = ( + # The atom ``execute`` worked out :) + states.SUCCESS, + # The atom ``execute`` didn't work out :( + states.FAILURE, + # In this state we will still have access to prior SUCCESS (or FAILURE) + # results, so make sure extraction is still allowed in this state... + states.REVERTING, +) + +_REVERT_STATES_WITH_RESULTS = ( + # The atom ``revert`` worked out :) + states.REVERTED, + # The atom ``revert`` didn't work out :( + states.REVERT_FAILURE, + # In this state we will still have access to prior SUCCESS (or FAILURE) + # results, so make sure extraction is still allowed in this state... + states.REVERTING, +) + +# Atom states that may have results... +STATES_WITH_RESULTS = set() +STATES_WITH_RESULTS.update(_REVERT_STATES_WITH_RESULTS) +STATES_WITH_RESULTS.update(_EXECUTE_STATES_WITH_RESULTS) +STATES_WITH_RESULTS = tuple(sorted(STATES_WITH_RESULTS)) # TODO(harlowja): do this better (via a singleton or something else...) _TRANSIENT_PROVIDER = object() +# Only for these intentions will we cache any failures that happened... +_SAVE_FAILURE_INTENTIONS = (states.EXECUTE, states.REVERT) + # NOTE(harlowja): Perhaps the container is a dictionary-like object and that # key does not exist (key error), or the container is a tuple/list and a # non-numeric key is being requested (index error), or there was no container @@ -164,8 +192,12 @@ class Storage(object): # so we cache failures here, in atom name -> failure mapping. self._failures = {} for ad in self._flowdetail: + fail_cache = {} if ad.failure is not None: - self._failures[ad.name] = ad.failure + fail_cache[states.EXECUTE] = ad.failure + if ad.revert_failure is not None: + fail_cache[states.REVERT] = ad.revert_failure + self._failures[ad.name] = fail_cache self._atom_name_to_uuid = dict((ad.name, ad.uuid) for ad in self._flowdetail) @@ -247,6 +279,7 @@ class Storage(object): atom_ids[i] = ad.uuid self._atom_name_to_uuid[atom_name] = ad.uuid self._set_result_mapping(atom_name, atom.save_as) + self._failures.setdefault(atom_name, {}) return atom_ids def ensure_atom(self, atom): @@ -448,21 +481,23 @@ class Storage(object): "with index %r (name %s)", atom_name, index, name) @fasteners.write_locked - def save(self, atom_name, data, state=states.SUCCESS): - """Save result for named atom into storage with given state.""" + def save(self, atom_name, result, state=states.SUCCESS): + """Put result for atom with provided name to storage.""" source, clone = self._atomdetail_by_name(atom_name, clone=True) - if clone.put(state, data): - result = self._with_connection(self._save_atom_detail, - source, clone) - else: - result = clone - if state == states.FAILURE and isinstance(data, failure.Failure): + if clone.put(state, result): + self._with_connection(self._save_atom_detail, source, clone) + # We need to somehow place more of this responsibility on the atom + # detail class itself, vs doing it here; since it ties those two + # together (which is bad)... + if state in (states.FAILURE, states.REVERT_FAILURE): # NOTE(imelnikov): failure serialization looses information, # so we cache failures here, in atom name -> failure mapping so # that we can later use the better version on fetch/get. - self._failures[result.name] = data - else: - self._check_all_results_provided(result.name, data) + if clone.intention in _SAVE_FAILURE_INTENTIONS: + fail_cache = self._failures[clone.name] + fail_cache[clone.intention] = result + if state == states.SUCCESS and clone.intention == states.EXECUTE: + self._check_all_results_provided(clone.name, result) @fasteners.write_locked def save_retry_failure(self, retry_name, failed_atom_name, failure): @@ -491,39 +526,69 @@ class Storage(object): self._with_connection(self._save_atom_detail, source, clone) @fasteners.read_locked - def _get(self, atom_name, only_last=False): + def _get(self, atom_name, + results_attr_name, fail_attr_name, + allowed_states, fail_cache_key): source, _clone = self._atomdetail_by_name(atom_name) - if source.failure is not None: - cached = self._failures.get(atom_name) - if source.failure.matches(cached): - # Try to give the version back that should have the backtrace - # instead of one that has it stripped (since backtraces are not - # serializable). - return cached - return source.failure - if source.state not in STATES_WITH_RESULTS: + failure = getattr(source, fail_attr_name) + if failure is not None: + fail_cache = self._failures[atom_name] + try: + fail = fail_cache[fail_cache_key] + if failure.matches(fail): + # Try to give the version back that should have the + # backtrace instead of one that has it + # stripped (since backtraces are not serializable). + failure = fail + except KeyError: + pass + return failure + # TODO(harlowja): this seems like it should be checked before fetching + # the potential failure, instead of after, fix this soon... + if source.state not in allowed_states: raise exceptions.NotFound("Result for atom %s is not currently" " known" % atom_name) - if only_last: - return source.last_results - else: - return source.results + return getattr(source, results_attr_name) - def get(self, atom_name): - """Gets the results for an atom with a given name from storage.""" - return self._get(atom_name) + def get_execute_result(self, atom_name): + """Gets the ``execute`` results for an atom from storage.""" + return self._get(atom_name, 'results', 'failure', + _EXECUTE_STATES_WITH_RESULTS, states.EXECUTE) @fasteners.read_locked - def get_failures(self): - """Get list of failures that happened with this flow. + def _get_failures(self, fail_cache_key): + failures = {} + for atom_name, fail_cache in six.iteritems(self._failures): + try: + failures[atom_name] = fail_cache[fail_cache_key] + except KeyError: + pass + return failures - No order guaranteed. - """ - return self._failures.copy() + def get_execute_failures(self): + """Get all ``execute`` failures that happened with this flow.""" + return self._get_failures(states.EXECUTE) + # TODO(harlowja): remove these in the future? + get = get_execute_result + get_failures = get_execute_failures + + def get_revert_result(self, atom_name): + """Gets the ``revert`` results for an atom from storage.""" + return self._get(atom_name, 'revert_results', 'revert_failure', + _REVERT_STATES_WITH_RESULTS, states.REVERT) + + def get_revert_failures(self): + """Get all ``revert`` failures that happened with this flow.""" + return self._get_failures(states.REVERT) + + @fasteners.read_locked def has_failures(self): - """Returns True if there are failed tasks in the storage.""" - return bool(self._failures) + """Returns true if there are **any** failures in storage.""" + for fail_cache in six.itervalues(self._failures): + if fail_cache: + return True + return False @fasteners.write_locked def reset(self, atom_name, state=states.PENDING): @@ -534,8 +599,8 @@ class Storage(object): if source.state == state: return clone.reset(state) - result = self._with_connection(self._save_atom_detail, source, clone) - self._failures.pop(result.name, None) + self._with_connection(self._save_atom_detail, source, clone) + self._failures[clone.name].clear() def inject_atom_args(self, atom_name, pairs, transient=True): """Add values into storage for a specific atom only. @@ -681,7 +746,7 @@ class Storage(object): @fasteners.read_locked def fetch(self, name, many_handler=None): - """Fetch a named result.""" + """Fetch a named ``execute`` result.""" def _many_handler(values): # By default we just return the first of many (unless provided # a different callback that can translate many results into @@ -702,7 +767,10 @@ class Storage(object): self._transients, name)) else: try: - container = self._get(provider.name, only_last=True) + container = self._get(provider.name, + 'last_results', 'failure', + _EXECUTE_STATES_WITH_RESULTS, + states.EXECUTE) except exceptions.NotFound: pass else: @@ -717,7 +785,7 @@ class Storage(object): @fasteners.read_locked def fetch_unsatisfied_args(self, atom_name, args_mapping, scope_walker=None, optional_args=None): - """Fetch unsatisfied atom arguments using an atoms argument mapping. + """Fetch unsatisfied ``execute`` arguments using an atoms args mapping. NOTE(harlowja): this takes into account the provided scope walker atoms who should produce the required value at runtime, as well as @@ -756,7 +824,9 @@ class Storage(object): results = self._transients else: try: - results = self._get(p.name, only_last=True) + results = self._get(p.name, 'last_results', 'failure', + _EXECUTE_STATES_WITH_RESULTS, + states.EXECUTE) except exceptions.NotFound: results = {} try: @@ -802,7 +872,7 @@ class Storage(object): @fasteners.read_locked def fetch_all(self, many_handler=None): - """Fetch all named results known so far.""" + """Fetch all named ``execute`` results known so far.""" def _many_handler(values): if len(values) > 1: return values @@ -821,7 +891,7 @@ class Storage(object): def fetch_mapped_args(self, args_mapping, atom_name=None, scope_walker=None, optional_args=None): - """Fetch arguments for an atom using an atoms argument mapping.""" + """Fetch ``execute`` arguments for an atom using its args mapping.""" def _extract_first_from(name, sources): """Extracts/returns first occurence of key in list of dicts.""" @@ -835,7 +905,9 @@ class Storage(object): def _get_results(looking_for, provider): """Gets the results saved for a given provider.""" try: - return self._get(provider.name, only_last=True) + return self._get(provider.name, 'last_results', 'failure', + _EXECUTE_STATES_WITH_RESULTS, + states.EXECUTE) except exceptions.NotFound: exceptions.raise_with_cause(exceptions.NotFound, "Expected to be able to find" @@ -963,11 +1035,14 @@ class Storage(object): # NOTE(harlowja): Try to use our local cache to get a more # complete failure object that has a traceback (instead of the # one that is saved which will *typically* not have one)... - cached = self._failures.get(ad.name) - if ad.failure.matches(cached): - failure = cached - else: - failure = ad.failure + failure = ad.failure + fail_cache = self._failures[ad.name] + try: + fail = fail_cache[states.EXECUTE] + if failure.matches(fail): + failure = fail + except KeyError: + pass return retry.History(ad.results, failure=failure) @fasteners.read_locked diff --git a/taskflow/tests/unit/action_engine/test_runner.py b/taskflow/tests/unit/action_engine/test_runner.py index 401cf50dc..9d43f312f 100644 --- a/taskflow/tests/unit/action_engine/test_runner.py +++ b/taskflow/tests/unit/action_engine/test_runner.py @@ -126,7 +126,8 @@ class RunnerTest(test.TestCase, _RunnerTestMixin): failure = failures[0] self.assertTrue(failure.check(RuntimeError)) - self.assertEqual(st.FAILURE, rt.storage.get_atom_state(tasks[0].name)) + self.assertEqual(st.REVERT_FAILURE, + rt.storage.get_atom_state(tasks[0].name)) def test_run_iterations_suspended(self): flow = lf.Flow("root") diff --git a/taskflow/tests/unit/test_check_transition.py b/taskflow/tests/unit/test_check_transition.py index 7c820fd9b..a8b5a7c32 100644 --- a/taskflow/tests/unit/test_check_transition.py +++ b/taskflow/tests/unit/test_check_transition.py @@ -21,11 +21,16 @@ from taskflow import test class TransitionTest(test.TestCase): + _DISALLOWED_TPL = "Transition from '%s' to '%s' was found to be disallowed" + _NOT_IGNORED_TPL = "Transition from '%s' to '%s' was not ignored" + def assertTransitionAllowed(self, from_state, to_state): - self.assertTrue(self.check_transition(from_state, to_state)) + msg = self._DISALLOWED_TPL % (from_state, to_state) + self.assertTrue(self.check_transition(from_state, to_state), msg=msg) def assertTransitionIgnored(self, from_state, to_state): - self.assertFalse(self.check_transition(from_state, to_state)) + msg = self._NOT_IGNORED_TPL % (from_state, to_state) + self.assertFalse(self.check_transition(from_state, to_state), msg=msg) def assertTransitionForbidden(self, from_state, to_state): self.assertRaisesRegexp(exc.InvalidState, @@ -101,7 +106,8 @@ class CheckTaskTransitionTest(TransitionTest): def test_from_reverting_state(self): self.assertTransitions(from_state=states.REVERTING, - allowed=(states.FAILURE, states.REVERTED), + allowed=(states.REVERT_FAILURE, + states.REVERTED), ignored=(states.RUNNING, states.REVERTING, states.PENDING, states.SUCCESS)) diff --git a/taskflow/tests/unit/test_engines.py b/taskflow/tests/unit/test_engines.py index 255ac4fd9..5cc242cb3 100644 --- a/taskflow/tests/unit/test_engines.py +++ b/taskflow/tests/unit/test_engines.py @@ -66,7 +66,7 @@ class EngineTaskTest(object): engine = self._make_engine(flow) expected = ['fail.f RUNNING', 'fail.t RUNNING', 'fail.t FAILURE(Failure: RuntimeError: Woot!)', - 'fail.t REVERTING', 'fail.t REVERTED', + 'fail.t REVERTING', 'fail.t REVERTED(None)', 'fail.f REVERTED'] with utils.CaptureListener(engine, values=values) as capturer: self.assertFailuresRegexp(RuntimeError, '^Woot', engine.run) @@ -374,6 +374,29 @@ class EngineLinearFlowTest(utils.EngineTestBase): self.assertFailuresRegexp(RuntimeError, '^Woot', engine.run) self.assertEqual(engine.storage.fetch_all(), {}) + def test_revert_provided(self): + flow = lf.Flow('revert').add( + utils.GiveBackRevert('giver'), + utils.FailingTask(name='fail') + ) + engine = self._make_engine(flow, store={'value': 0}) + self.assertFailuresRegexp(RuntimeError, '^Woot', engine.run) + self.assertEqual(engine.storage.get_revert_result('giver'), 2) + + def test_nasty_revert(self): + flow = lf.Flow('revert').add( + utils.NastyTask('nasty'), + utils.FailingTask(name='fail') + ) + engine = self._make_engine(flow) + self.assertFailuresRegexp(RuntimeError, '^Gotcha', engine.run) + fail = engine.storage.get_revert_result('nasty') + self.assertIsNotNone(fail.check(RuntimeError)) + exec_failures = engine.storage.get_execute_failures() + self.assertIn('fail', exec_failures) + rev_failures = engine.storage.get_revert_failures() + self.assertIn('nasty', rev_failures) + def test_sequential_flow_nested_blocks(self): flow = lf.Flow('nested-1').add( utils.ProgressingTask('task1'), @@ -406,7 +429,7 @@ class EngineLinearFlowTest(utils.EngineTestBase): self.assertFailuresRegexp(RuntimeError, '^Woot', engine.run) expected = ['fail.t RUNNING', 'fail.t FAILURE(Failure: RuntimeError: Woot!)', - 'fail.t REVERTING', 'fail.t REVERTED'] + 'fail.t REVERTING', 'fail.t REVERTED(None)'] self.assertEqual(expected, capturer.values) def test_correctly_reverts_children(self): @@ -424,9 +447,9 @@ class EngineLinearFlowTest(utils.EngineTestBase): 'task2.t RUNNING', 'task2.t SUCCESS(5)', 'fail.t RUNNING', 'fail.t FAILURE(Failure: RuntimeError: Woot!)', - 'fail.t REVERTING', 'fail.t REVERTED', - 'task2.t REVERTING', 'task2.t REVERTED', - 'task1.t REVERTING', 'task1.t REVERTED'] + 'fail.t REVERTING', 'fail.t REVERTED(None)', + 'task2.t REVERTING', 'task2.t REVERTED(None)', + 'task1.t REVERTING', 'task1.t REVERTED(None)'] self.assertEqual(expected, capturer.values) @@ -529,18 +552,19 @@ class EngineLinearAndUnorderedExceptionsTest(utils.EngineTestBase): self.assertFailuresRegexp(RuntimeError, '^Woot', engine.run) # NOTE(imelnikov): we don't know if task 3 was run, but if it was, - # it should have been reverted in correct order. + # it should have been REVERTED(None) in correct order. possible_values_no_task3 = [ 'task1.t RUNNING', 'task2.t RUNNING', 'fail.t FAILURE(Failure: RuntimeError: Woot!)', - 'task2.t REVERTED', 'task1.t REVERTED' + 'task2.t REVERTED(None)', 'task1.t REVERTED(None)' ] self.assertIsSuperAndSubsequence(capturer.values, possible_values_no_task3) if 'task3' in capturer.values: possible_values_task3 = [ 'task1.t RUNNING', 'task2.t RUNNING', 'task3.t RUNNING', - 'task3.t REVERTED', 'task2.t REVERTED', 'task1.t REVERTED' + 'task3.t REVERTED(None)', 'task2.t REVERTED(None)', + 'task1.t REVERTED(None)' ] self.assertIsSuperAndSubsequence(capturer.values, possible_values_task3) @@ -561,12 +585,12 @@ class EngineLinearAndUnorderedExceptionsTest(utils.EngineTestBase): self.assertFailuresRegexp(RuntimeError, '^Gotcha', engine.run) # NOTE(imelnikov): we don't know if task 3 was run, but if it was, - # it should have been reverted in correct order. + # it should have been REVERTED(None) in correct order. possible_values = ['task1.t RUNNING', 'task1.t SUCCESS(5)', 'task2.t RUNNING', 'task2.t SUCCESS(5)', 'task3.t RUNNING', 'task3.t SUCCESS(5)', 'task3.t REVERTING', - 'task3.t REVERTED'] + 'task3.t REVERTED(None)'] self.assertIsSuperAndSubsequence(possible_values, capturer.values) possible_values_no_task3 = ['task1.t RUNNING', 'task2.t RUNNING'] self.assertIsSuperAndSubsequence(capturer.values, @@ -589,12 +613,12 @@ class EngineLinearAndUnorderedExceptionsTest(utils.EngineTestBase): # NOTE(imelnikov): if task1 was run, it should have been reverted. if 'task1' in capturer.values: task1_story = ['task1.t RUNNING', 'task1.t SUCCESS(5)', - 'task1.t REVERTED'] + 'task1.t REVERTED(None)'] self.assertIsSuperAndSubsequence(capturer.values, task1_story) # NOTE(imelnikov): task2 should have been run and reverted task2_story = ['task2.t RUNNING', 'task2.t SUCCESS(5)', - 'task2.t REVERTED'] + 'task2.t REVERTED(None)'] self.assertIsSuperAndSubsequence(capturer.values, task2_story) def test_revert_raises_for_linear_in_unordered(self): @@ -608,7 +632,7 @@ class EngineLinearAndUnorderedExceptionsTest(utils.EngineTestBase): engine = self._make_engine(flow) with utils.CaptureListener(engine, capture_flow=False) as capturer: self.assertFailuresRegexp(RuntimeError, '^Gotcha', engine.run) - self.assertNotIn('task2.t REVERTED', capturer.values) + self.assertNotIn('task2.t REVERTED(None)', capturer.values) class EngineGraphFlowTest(utils.EngineTestBase): @@ -697,11 +721,11 @@ class EngineGraphFlowTest(utils.EngineTestBase): 'task3.t RUNNING', 'task3.t FAILURE(Failure: RuntimeError: Woot!)', 'task3.t REVERTING', - 'task3.t REVERTED', + 'task3.t REVERTED(None)', 'task2.t REVERTING', - 'task2.t REVERTED', + 'task2.t REVERTED(None)', 'task1.t REVERTING', - 'task1.t REVERTED'] + 'task1.t REVERTED(None)'] self.assertEqual(expected, capturer.values) self.assertEqual(engine.storage.get_flow_state(), states.REVERTED) diff --git a/taskflow/tests/unit/test_retries.py b/taskflow/tests/unit/test_retries.py index ddb256b0e..54f51bbf1 100644 --- a/taskflow/tests/unit/test_retries.py +++ b/taskflow/tests/unit/test_retries.py @@ -82,8 +82,8 @@ class RetryTest(utils.EngineTestBase): 'task1.t RUNNING', 'task1.t SUCCESS(5)', 'task2.t RUNNING', 'task2.t FAILURE(Failure: RuntimeError: Woot!)', - 'task2.t REVERTING', 'task2.t REVERTED', - 'task1.t REVERTING', 'task1.t REVERTED', + 'task2.t REVERTING', 'task2.t REVERTED(None)', + 'task1.t REVERTING', 'task1.t REVERTED(None)', 'r1.r RETRYING', 'task1.t PENDING', 'task2.t PENDING', @@ -114,9 +114,9 @@ class RetryTest(utils.EngineTestBase): 'task2.t RUNNING', 'task2.t FAILURE(Failure: RuntimeError: Woot!)', 'task2.t REVERTING', - 'task2.t REVERTED', + 'task2.t REVERTED(None)', 'task1.t REVERTING', - 'task1.t REVERTED', + 'task1.t REVERTED(None)', 'r1.r RETRYING', 'task1.t PENDING', 'task2.t PENDING', @@ -127,11 +127,11 @@ class RetryTest(utils.EngineTestBase): 'task2.t RUNNING', 'task2.t FAILURE(Failure: RuntimeError: Woot!)', 'task2.t REVERTING', - 'task2.t REVERTED', + 'task2.t REVERTED(None)', 'task1.t REVERTING', - 'task1.t REVERTED', + 'task1.t REVERTED(None)', 'r1.r REVERTING', - 'r1.r REVERTED', + 'r1.r REVERTED(None)', 'flow-1.f REVERTED'] self.assertEqual(expected, capturer.values) @@ -153,9 +153,9 @@ class RetryTest(utils.EngineTestBase): 'task2.t RUNNING', 'task2.t FAILURE(Failure: RuntimeError: Woot!)', 'task2.t REVERTING', - 'task2.t REVERTED', + 'task2.t REVERTED(None)', 'task1.t REVERTING', - 'task1.t FAILURE', + 'task1.t REVERT_FAILURE(Failure: RuntimeError: Gotcha!)', 'flow-1.f FAILURE'] self.assertEqual(expected, capturer.values) @@ -185,9 +185,9 @@ class RetryTest(utils.EngineTestBase): 'task3.t RUNNING', 'task3.t FAILURE(Failure: RuntimeError: Woot!)', 'task3.t REVERTING', - 'task3.t REVERTED', + 'task3.t REVERTED(None)', 'task2.t REVERTING', - 'task2.t REVERTED', + 'task2.t REVERTED(None)', 'r2.r RETRYING', 'task2.t PENDING', 'task3.t PENDING', @@ -231,15 +231,15 @@ class RetryTest(utils.EngineTestBase): 'task4.t RUNNING', 'task4.t FAILURE(Failure: RuntimeError: Woot!)', 'task4.t REVERTING', - 'task4.t REVERTED', + 'task4.t REVERTED(None)', 'task3.t REVERTING', - 'task3.t REVERTED', + 'task3.t REVERTED(None)', 'task2.t REVERTING', - 'task2.t REVERTED', + 'task2.t REVERTED(None)', 'r2.r REVERTING', - 'r2.r REVERTED', + 'r2.r REVERTED(None)', 'task1.t REVERTING', - 'task1.t REVERTED', + 'task1.t REVERTED(None)', 'r1.r RETRYING', 'task1.t PENDING', 'r2.r PENDING', @@ -280,8 +280,8 @@ class RetryTest(utils.EngineTestBase): 'task2.t FAILURE(Failure: RuntimeError: Woot!)', 'task2.t REVERTING', 'task1.t REVERTING', - 'task2.t REVERTED', - 'task1.t REVERTED', + 'task2.t REVERTED(None)', + 'task1.t REVERTED(None)', 'r.r RETRYING', 'task1.t PENDING', 'task2.t PENDING', @@ -316,11 +316,11 @@ class RetryTest(utils.EngineTestBase): 'task2.t RUNNING', 'task2.t FAILURE(Failure: RuntimeError: Woot!)', 'task2.t REVERTING', - 'task2.t REVERTED', + 'task2.t REVERTED(None)', 'r2.r REVERTING', - 'r2.r REVERTED', + 'r2.r REVERTED(None)', 'task1.t REVERTING', - 'task1.t REVERTED', + 'task1.t REVERTED(None)', 'r1.r RETRYING', 'task1.t PENDING', 'r2.r PENDING', @@ -359,9 +359,9 @@ class RetryTest(utils.EngineTestBase): 'task2.t RUNNING', 'task2.t FAILURE(Failure: RuntimeError: Woot!)', 'task2.t REVERTING', - 'task2.t REVERTED', + 'task2.t REVERTED(None)', 'r1.r REVERTING', - 'r1.r REVERTED', + 'r1.r REVERTED(None)', 'flow-1.f REVERTED'] self.assertEqual(expected, capturer.values) @@ -388,11 +388,11 @@ class RetryTest(utils.EngineTestBase): 'task2.t RUNNING', 'task2.t FAILURE(Failure: RuntimeError: Woot!)', 'task2.t REVERTING', - 'task2.t REVERTED', + 'task2.t REVERTED(None)', 'r1.r REVERTING', - 'r1.r REVERTED', + 'r1.r REVERTED(None)', 'task1.t REVERTING', - 'task1.t REVERTED', + 'task1.t REVERTED(None)', 'flow-1.f REVERTED'] self.assertEqual(expected, capturer.values) @@ -417,13 +417,13 @@ class RetryTest(utils.EngineTestBase): 'task2.t RUNNING', 'task2.t FAILURE(Failure: RuntimeError: Woot!)', 'task2.t REVERTING', - 'task2.t REVERTED', + 'task2.t REVERTED(None)', 'r2.r REVERTING', - 'r2.r REVERTED', + 'r2.r REVERTED(None)', 'task1.t REVERTING', - 'task1.t REVERTED', + 'task1.t REVERTED(None)', 'r1.r REVERTING', - 'r1.r REVERTED', + 'r1.r REVERTED(None)', 'flow-1.f REVERTED'] self.assertEqual(expected, capturer.values) @@ -515,7 +515,7 @@ class RetryTest(utils.EngineTestBase): 'c.t RUNNING', 'c.t FAILURE(Failure: RuntimeError: Woot!)', 'c.t REVERTING', - 'c.t REVERTED', + 'c.t REVERTED(None)', 'r1.r RETRYING', 'c.t PENDING', 'r1.r RUNNING', @@ -542,9 +542,9 @@ class RetryTest(utils.EngineTestBase): 't2.t RUNNING', 't2.t FAILURE(Failure: RuntimeError: Woot!)', 't2.t REVERTING', - 't2.t REVERTED', + 't2.t REVERTED(None)', 't1.t REVERTING', - 't1.t REVERTED', + 't1.t REVERTED(None)', 'r1.r RETRYING', 't1.t PENDING', 't2.t PENDING', @@ -555,9 +555,9 @@ class RetryTest(utils.EngineTestBase): 't2.t RUNNING', 't2.t FAILURE(Failure: RuntimeError: Woot!)', 't2.t REVERTING', - 't2.t REVERTED', + 't2.t REVERTED(None)', 't1.t REVERTING', - 't1.t REVERTED', + 't1.t REVERTED(None)', 'r1.r RETRYING', 't1.t PENDING', 't2.t PENDING', @@ -568,11 +568,11 @@ class RetryTest(utils.EngineTestBase): 't2.t RUNNING', 't2.t FAILURE(Failure: RuntimeError: Woot!)', 't2.t REVERTING', - 't2.t REVERTED', + 't2.t REVERTED(None)', 't1.t REVERTING', - 't1.t REVERTED', + 't1.t REVERTED(None)', 'r1.r REVERTING', - 'r1.r REVERTED', + 'r1.r REVERTED(None)', 'flow-1.f REVERTED'] self.assertEqual(expected, capturer.values) @@ -589,7 +589,7 @@ class RetryTest(utils.EngineTestBase): 't1.t RUNNING', 't1.t FAILURE(Failure: RuntimeError: Woot with 3)', 't1.t REVERTING', - 't1.t REVERTED', + 't1.t REVERTED(None)', 'r1.r RETRYING', 't1.t PENDING', 'r1.r RUNNING', @@ -597,7 +597,7 @@ class RetryTest(utils.EngineTestBase): 't1.t RUNNING', 't1.t FAILURE(Failure: RuntimeError: Woot with 2)', 't1.t REVERTING', - 't1.t REVERTED', + 't1.t REVERTED(None)', 'r1.r RETRYING', 't1.t PENDING', 'r1.r RUNNING', @@ -605,7 +605,7 @@ class RetryTest(utils.EngineTestBase): 't1.t RUNNING', 't1.t FAILURE(Failure: RuntimeError: Woot with 3)', 't1.t REVERTING', - 't1.t REVERTED', + 't1.t REVERTED(None)', 'r1.r RETRYING', 't1.t PENDING', 'r1.r RUNNING', @@ -613,9 +613,9 @@ class RetryTest(utils.EngineTestBase): 't1.t RUNNING', 't1.t FAILURE(Failure: RuntimeError: Woot with 5)', 't1.t REVERTING', - 't1.t REVERTED', + 't1.t REVERTED(None)', 'r1.r REVERTING', - 'r1.r REVERTED', + 'r1.r REVERTED(None)', 'flow-1.f REVERTED'] self.assertEqual(expected, capturer.values) @@ -632,7 +632,7 @@ class RetryTest(utils.EngineTestBase): 't1.t RUNNING', 't1.t FAILURE(Failure: RuntimeError: Woot with 2)', 't1.t REVERTING', - 't1.t REVERTED', + 't1.t REVERTED(None)', 'r1.r RETRYING', 't1.t PENDING', 'r1.r RUNNING', @@ -640,7 +640,7 @@ class RetryTest(utils.EngineTestBase): 't1.t RUNNING', 't1.t FAILURE(Failure: RuntimeError: Woot with 3)', 't1.t REVERTING', - 't1.t REVERTED', + 't1.t REVERTED(None)', 'r1.r RETRYING', 't1.t PENDING', 'r1.r RUNNING', @@ -648,9 +648,9 @@ class RetryTest(utils.EngineTestBase): 't1.t RUNNING', 't1.t FAILURE(Failure: RuntimeError: Woot with 5)', 't1.t REVERTING', - 't1.t REVERTED', + 't1.t REVERTED(None)', 'r1.r REVERTING', - 'r1.r REVERTED', + 'r1.r REVERTED(None)', 'flow-1.f REVERTED'] self.assertItemsEqual(capturer.values, expected) @@ -674,7 +674,7 @@ class RetryTest(utils.EngineTestBase): 'task2.t RUNNING', 'task2.t FAILURE(Failure: RuntimeError: Woot with 3)', 'task2.t REVERTING', - 'task2.t REVERTED', + 'task2.t REVERTED(None)', 'r1.r RETRYING', 'task2.t PENDING', 'r1.r RUNNING', @@ -682,7 +682,7 @@ class RetryTest(utils.EngineTestBase): 'task2.t RUNNING', 'task2.t FAILURE(Failure: RuntimeError: Woot with 2)', 'task2.t REVERTING', - 'task2.t REVERTED', + 'task2.t REVERTED(None)', 'r1.r RETRYING', 'task2.t PENDING', 'r1.r RUNNING', @@ -690,7 +690,7 @@ class RetryTest(utils.EngineTestBase): 'task2.t RUNNING', 'task2.t FAILURE(Failure: RuntimeError: Woot with 3)', 'task2.t REVERTING', - 'task2.t REVERTED', + 'task2.t REVERTED(None)', 'r1.r RETRYING', 'task2.t PENDING', 'r1.r RUNNING', @@ -698,9 +698,9 @@ class RetryTest(utils.EngineTestBase): 'task2.t RUNNING', 'task2.t FAILURE(Failure: RuntimeError: Woot with 5)', 'task2.t REVERTING', - 'task2.t REVERTED', + 'task2.t REVERTED(None)', 'r1.r REVERTING', - 'r1.r REVERTED', + 'r1.r REVERTED(None)', 'flow-1.f REVERTED'] self.assertEqual(expected, capturer.values) @@ -724,7 +724,7 @@ class RetryTest(utils.EngineTestBase): 'task2.t RUNNING', 'task2.t FAILURE(Failure: RuntimeError: Woot with 3)', 'task2.t REVERTING', - 'task2.t REVERTED', + 'task2.t REVERTED(None)', 'r1.r RETRYING', 'task2.t PENDING', 'r1.r RUNNING', @@ -732,7 +732,7 @@ class RetryTest(utils.EngineTestBase): 'task2.t RUNNING', 'task2.t FAILURE(Failure: RuntimeError: Woot with 2)', 'task2.t REVERTING', - 'task2.t REVERTED', + 'task2.t REVERTED(None)', 'r1.r RETRYING', 'task2.t PENDING', 'r1.r RUNNING', @@ -740,7 +740,7 @@ class RetryTest(utils.EngineTestBase): 'task2.t RUNNING', 'task2.t FAILURE(Failure: RuntimeError: Woot with 3)', 'task2.t REVERTING', - 'task2.t REVERTED', + 'task2.t REVERTED(None)', 'r1.r RETRYING', 'task2.t PENDING', 'r1.r RUNNING', @@ -748,11 +748,11 @@ class RetryTest(utils.EngineTestBase): 'task2.t RUNNING', 'task2.t FAILURE(Failure: RuntimeError: Woot with 5)', 'task2.t REVERTING', - 'task2.t REVERTED', + 'task2.t REVERTED(None)', 'r1.r REVERTING', - 'r1.r REVERTED', + 'r1.r REVERTED(None)', 'task1.t REVERTING', - 'task1.t REVERTED', + 'task1.t REVERTED(None)', 'flow-1.f REVERTED'] self.assertEqual(expected, capturer.values) @@ -778,7 +778,7 @@ class RetryTest(utils.EngineTestBase): 't1.t RUNNING', 't1.t FAILURE(Failure: RuntimeError: Woot with 3)', 't1.t REVERTING', - 't1.t REVERTED', + 't1.t REVERTED(None)', 'r1.r RETRYING', 't1.t PENDING', 'r1.r RUNNING', @@ -786,7 +786,7 @@ class RetryTest(utils.EngineTestBase): 't1.t RUNNING', 't1.t FAILURE(Failure: RuntimeError: Woot with 2)', 't1.t REVERTING', - 't1.t REVERTED', + 't1.t REVERTED(None)', 'r1.r RETRYING', 't1.t PENDING', 'r1.r RUNNING', @@ -794,9 +794,9 @@ class RetryTest(utils.EngineTestBase): 't1.t RUNNING', 't1.t FAILURE(Failure: RuntimeError: Woot with 5)', 't1.t REVERTING', - 't1.t REVERTED', + 't1.t REVERTED(None)', 'r1.r REVERTING', - 'r1.r REVERTED', + 'r1.r REVERTED(None)', 'flow-1.f REVERTED'] self.assertEqual(expected, capturer.values) @@ -814,7 +814,7 @@ class RetryTest(utils.EngineTestBase): 't1.t RUNNING', 't1.t FAILURE(Failure: RuntimeError: Woot with 3)', 't1.t REVERTING', - 't1.t REVERTED', + 't1.t REVERTED(None)', 'r1.r RETRYING', 't1.t PENDING', 'r1.r RUNNING', @@ -822,7 +822,7 @@ class RetryTest(utils.EngineTestBase): 't1.t RUNNING', 't1.t FAILURE(Failure: RuntimeError: Woot with 2)', 't1.t REVERTING', - 't1.t REVERTED', + 't1.t REVERTED(None)', 'r1.r RETRYING', 't1.t PENDING', 'r1.r RUNNING', @@ -830,9 +830,9 @@ class RetryTest(utils.EngineTestBase): 't1.t RUNNING', 't1.t FAILURE(Failure: RuntimeError: Woot with 5)', 't1.t REVERTING', - 't1.t REVERTED', + 't1.t REVERTED(None)', 'r1.r REVERTING', - 'r1.r REVERTED', + 'r1.r REVERTED(None)', 'flow-1.f REVERTED'] self.assertItemsEqual(capturer.values, expected) @@ -857,7 +857,7 @@ class RetryTest(utils.EngineTestBase): 'task-2.t RUNNING', 'task-2.t FAILURE(Failure: RuntimeError: Woot with 3)', 'task-2.t REVERTING', - 'task-2.t REVERTED', + 'task-2.t REVERTED(None)', 'r1.r RETRYING', 'task-2.t PENDING', 'r1.r RUNNING', @@ -865,7 +865,7 @@ class RetryTest(utils.EngineTestBase): 'task-2.t RUNNING', 'task-2.t FAILURE(Failure: RuntimeError: Woot with 2)', 'task-2.t REVERTING', - 'task-2.t REVERTED', + 'task-2.t REVERTED(None)', 'r1.r RETRYING', 'task-2.t PENDING', 'r1.r RUNNING', @@ -873,9 +873,9 @@ class RetryTest(utils.EngineTestBase): 'task-2.t RUNNING', 'task-2.t FAILURE(Failure: RuntimeError: Woot with 5)', 'task-2.t REVERTING', - 'task-2.t REVERTED', + 'task-2.t REVERTED(None)', 'r1.r REVERTING', - 'r1.r REVERTED', + 'r1.r REVERTED(None)', 'flow-1.f REVERTED'] self.assertEqual(expected, capturer.values) @@ -901,7 +901,7 @@ class RetryTest(utils.EngineTestBase): 'task-2.t RUNNING', 'task-2.t FAILURE(Failure: RuntimeError: Woot with 3)', 'task-2.t REVERTING', - 'task-2.t REVERTED', + 'task-2.t REVERTED(None)', 'r1.r RETRYING', 'task-2.t PENDING', 'r1.r RUNNING', @@ -909,7 +909,7 @@ class RetryTest(utils.EngineTestBase): 'task-2.t RUNNING', 'task-2.t FAILURE(Failure: RuntimeError: Woot with 2)', 'task-2.t REVERTING', - 'task-2.t REVERTED', + 'task-2.t REVERTED(None)', 'r1.r RETRYING', 'task-2.t PENDING', 'r1.r RUNNING', @@ -917,11 +917,11 @@ class RetryTest(utils.EngineTestBase): 'task-2.t RUNNING', 'task-2.t FAILURE(Failure: RuntimeError: Woot with 5)', 'task-2.t REVERTING', - 'task-2.t REVERTED', + 'task-2.t REVERTED(None)', 'r1.r REVERTING', - 'r1.r REVERTED', + 'r1.r REVERTED(None)', 'task-1.t REVERTING', - 'task-1.t REVERTED', + 'task-1.t REVERTED(None)', 'flow-1.f REVERTED'] self.assertEqual(expected, capturer.values) @@ -973,7 +973,7 @@ class RetryTest(utils.EngineTestBase): with utils.CaptureListener(engine) as capturer: engine.run() expected = ['task1.t REVERTING', - 'task1.t REVERTED', + 'task1.t REVERTED(None)', 'flow-1_retry.r RETRYING', 'task1.t PENDING', 'flow-1_retry.r RUNNING', @@ -988,7 +988,7 @@ class RetryTest(utils.EngineTestBase): with utils.CaptureListener(engine) as capturer: engine.run() expected = ['task1.t REVERTING', - 'task1.t REVERTED', + 'task1.t REVERTED(None)', 'flow-1_retry.r RETRYING', 'task1.t PENDING', 'flow-1_retry.r RUNNING', @@ -1003,7 +1003,7 @@ class RetryTest(utils.EngineTestBase): with utils.CaptureListener(engine) as capturer: engine.run() expected = ['task1.t REVERTING', - 'task1.t REVERTED', + 'task1.t REVERTED(None)', 'flow-1_retry.r RETRYING', 'task1.t PENDING', 'flow-1_retry.r RUNNING', @@ -1018,7 +1018,7 @@ class RetryTest(utils.EngineTestBase): with utils.CaptureListener(engine) as capturer: engine.run() expected = ['task1.t REVERTING', - 'task1.t REVERTED', + 'task1.t REVERTED(None)', 'flow-1_retry.r RETRYING', 'task1.t PENDING', 'flow-1_retry.r RUNNING', @@ -1032,7 +1032,7 @@ class RetryTest(utils.EngineTestBase): engine = self._pretend_to_run_a_flow_and_crash('revert scheduled') with utils.CaptureListener(engine) as capturer: engine.run() - expected = ['task1.t REVERTED', + expected = ['task1.t REVERTED(None)', 'flow-1_retry.r RETRYING', 'task1.t PENDING', 'flow-1_retry.r RUNNING', @@ -1077,16 +1077,16 @@ class RetryTest(utils.EngineTestBase): 'c.t FAILURE(Failure: RuntimeError: Woot!)', 'a.t REVERTING', 'c.t REVERTING', - 'a.t REVERTED', - 'c.t REVERTED', + 'a.t REVERTED(None)', + 'c.t REVERTED(None)', 'b.t REVERTING', - 'b.t REVERTED'] + 'b.t REVERTED(None)'] self.assertItemsEqual(capturer.values[:8], expected) # Task 'a' was or was not executed again, both cases are ok. self.assertIsSuperAndSubsequence(capturer.values[8:], [ 'b.t RUNNING', 'c.t FAILURE(Failure: RuntimeError: Woot!)', - 'b.t REVERTED', + 'b.t REVERTED(None)', ]) self.assertEqual(engine.storage.get_flow_state(), st.REVERTED) @@ -1107,9 +1107,9 @@ class RetryTest(utils.EngineTestBase): with utils.CaptureListener(engine, capture_flow=False) as capturer: engine.run() expected = ['c.t REVERTING', - 'c.t REVERTED', + 'c.t REVERTED(None)', 'b.t REVERTING', - 'b.t REVERTED'] + 'b.t REVERTED(None)'] self.assertItemsEqual(capturer.values[:4], expected) expected = ['test2_retry.r RETRYING', 'b.t PENDING', @@ -1149,10 +1149,10 @@ class RetryParallelExecutionTest(utils.EngineTestBase): 'task2.t RUNNING', 'task2.t FAILURE(Failure: RuntimeError: Woot!)', 'task2.t REVERTING', - 'task2.t REVERTED', + 'task2.t REVERTED(None)', 'task1.t SUCCESS(5)', 'task1.t REVERTING', - 'task1.t REVERTED', + 'task1.t REVERTED(None)', 'r.r RETRYING', 'task1.t PENDING', 'task2.t PENDING', @@ -1189,10 +1189,10 @@ class RetryParallelExecutionTest(utils.EngineTestBase): 'task3.t FAILURE(Failure: RuntimeError: Woot!)', 'task3.t REVERTING', 'task1.t REVERTING', - 'task3.t REVERTED', - 'task1.t REVERTED', + 'task3.t REVERTED(None)', + 'task1.t REVERTED(None)', 'task2.t REVERTING', - 'task2.t REVERTED', + 'task2.t REVERTED(None)', 'r.r RETRYING', 'task1.t PENDING', 'task2.t PENDING', diff --git a/taskflow/tests/unit/test_storage.py b/taskflow/tests/unit/test_storage.py index 958d5a53c..0e1c47fc3 100644 --- a/taskflow/tests/unit/test_storage.py +++ b/taskflow/tests/unit/test_storage.py @@ -118,13 +118,6 @@ class StorageTestMixin(object): self.assertEqual(s.fetch_all(), {}) self.assertEqual(s.get_atom_state('my task'), states.SUCCESS) - def test_save_and_get_other_state(self): - s = self._get_storage() - s.ensure_atom(test_utils.NoopTask('my task')) - s.save('my task', 5, states.FAILURE) - self.assertEqual(s.get('my task'), 5) - self.assertEqual(s.get_atom_state('my task'), states.FAILURE) - def test_save_and_get_cached_failure(self): a_failure = failure.Failure.from_exception(RuntimeError('Woot!')) s = self._get_storage() @@ -141,7 +134,7 @@ class StorageTestMixin(object): s.ensure_atom(test_utils.NoopTask('my task')) s.save('my task', a_failure, states.FAILURE) self.assertEqual(s.get('my task'), a_failure) - s._failures['my task'] = None + s._failures['my task'] = {} self.assertTrue(a_failure.matches(s.get('my task'))) def test_get_failure_from_reverted_task(self): @@ -564,6 +557,33 @@ class StorageTestMixin(object): args = s.fetch_mapped_args(t.rebind, atom_name=t.name) self.assertEqual(3, args['x']) + def test_save_fetch(self): + t = test_utils.GiveBackRevert('my task') + s = self._get_storage() + s.ensure_atom(t) + s.save('my task', 2) + self.assertEqual(2, s.get('my task')) + self.assertRaises(exceptions.NotFound, + s.get_revert_result, 'my task') + + def test_save_fetch_revert(self): + t = test_utils.GiveBackRevert('my task') + s = self._get_storage() + s.ensure_atom(t) + s.set_atom_intention('my task', states.REVERT) + s.save('my task', 2, state=states.REVERTED) + self.assertRaises(exceptions.NotFound, s.get, 'my task') + self.assertEqual(2, s.get_revert_result('my task')) + + def test_save_fail_fetch_revert(self): + t = test_utils.GiveBackRevert('my task') + s = self._get_storage() + s.ensure_atom(t) + s.set_atom_intention('my task', states.REVERT) + a_failure = failure.Failure.from_exception(RuntimeError('Woot!')) + s.save('my task', a_failure, state=states.REVERT_FAILURE) + self.assertEqual(a_failure, s.get_revert_result('my task')) + class StorageMemoryTest(StorageTestMixin, test.TestCase): def setUp(self): diff --git a/taskflow/tests/unit/test_suspend.py b/taskflow/tests/unit/test_suspend.py index e5d0288f6..1b358acce 100644 --- a/taskflow/tests/unit/test_suspend.py +++ b/taskflow/tests/unit/test_suspend.py @@ -97,14 +97,14 @@ class SuspendTest(utils.EngineTestBase): 'c.t RUNNING', 'c.t FAILURE(Failure: RuntimeError: Woot!)', 'c.t REVERTING', - 'c.t REVERTED', + 'c.t REVERTED(None)', 'b.t REVERTING', - 'b.t REVERTED'] + 'b.t REVERTED(None)'] self.assertEqual(expected, capturer.values) with utils.CaptureListener(engine, capture_flow=False) as capturer: self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run) self.assertEqual(engine.storage.get_flow_state(), states.REVERTED) - expected = ['a.t REVERTING', 'a.t REVERTED'] + expected = ['a.t REVERTING', 'a.t REVERTED(None)'] self.assertEqual(expected, capturer.values) def test_suspend_and_resume_linear_flow_on_revert(self): @@ -124,9 +124,9 @@ class SuspendTest(utils.EngineTestBase): 'c.t RUNNING', 'c.t FAILURE(Failure: RuntimeError: Woot!)', 'c.t REVERTING', - 'c.t REVERTED', + 'c.t REVERTED(None)', 'b.t REVERTING', - 'b.t REVERTED'] + 'b.t REVERTED(None)'] self.assertEqual(expected, capturer.values) # pretend we are resuming @@ -135,7 +135,7 @@ class SuspendTest(utils.EngineTestBase): self.assertRaisesRegexp(RuntimeError, '^Woot', engine2.run) self.assertEqual(engine2.storage.get_flow_state(), states.REVERTED) expected = ['a.t REVERTING', - 'a.t REVERTED'] + 'a.t REVERTED(None)'] self.assertEqual(expected, capturer2.values) def test_suspend_and_revert_even_if_task_is_gone(self): @@ -157,9 +157,9 @@ class SuspendTest(utils.EngineTestBase): 'c.t RUNNING', 'c.t FAILURE(Failure: RuntimeError: Woot!)', 'c.t REVERTING', - 'c.t REVERTED', + 'c.t REVERTED(None)', 'b.t REVERTING', - 'b.t REVERTED'] + 'b.t REVERTED(None)'] self.assertEqual(expected, capturer.values) # pretend we are resuming, but task 'c' gone when flow got updated @@ -171,7 +171,7 @@ class SuspendTest(utils.EngineTestBase): with utils.CaptureListener(engine2, capture_flow=False) as capturer2: self.assertRaisesRegexp(RuntimeError, '^Woot', engine2.run) self.assertEqual(engine2.storage.get_flow_state(), states.REVERTED) - expected = ['a.t REVERTING', 'a.t REVERTED'] + expected = ['a.t REVERTING', 'a.t REVERTED(None)'] self.assertEqual(capturer2.values, expected) def test_storage_is_rechecked(self): diff --git a/taskflow/tests/unit/worker_based/test_worker.py b/taskflow/tests/unit/worker_based/test_worker.py index 3acf245b2..c5986739e 100644 --- a/taskflow/tests/unit/worker_based/test_worker.py +++ b/taskflow/tests/unit/worker_based/test_worker.py @@ -33,7 +33,7 @@ class TestWorker(test.MockTestCase): self.broker_url = 'test-url' self.exchange = 'test-exchange' self.topic = 'test-topic' - self.endpoint_count = 25 + self.endpoint_count = 26 # patch classes self.executor_mock, self.executor_inst_mock = self.patchClass( diff --git a/taskflow/tests/utils.py b/taskflow/tests/utils.py index b295fc2a2..266a0e8c0 100644 --- a/taskflow/tests/utils.py +++ b/taskflow/tests/utils.py @@ -117,6 +117,15 @@ class AddOne(task.Task): return source + 1 +class GiveBackRevert(task.Task): + + def execute(self, value): + return value + 1 + + def revert(self, *args, **kwargs): + return kwargs.get('result') + 1 + + class FakeTask(object): def execute(self, **kwargs): diff --git a/taskflow/types/failure.py b/taskflow/types/failure.py index d713098df..dafe73e66 100644 --- a/taskflow/types/failure.py +++ b/taskflow/types/failure.py @@ -291,13 +291,15 @@ class Failure(object): def reraise_if_any(failures): """Re-raise exceptions if argument is not empty. - If argument is empty list, this method returns None. If - argument is a list with a single ``Failure`` object in it, - that failure is reraised. Else, a + If argument is empty list/tuple/iterator, this method returns + None. If argument is coverted into a list with a + single ``Failure`` object in it, that failure is reraised. Else, a :class:`~taskflow.exceptions.WrappedFailure` exception - is raised with a failure list as causes. + is raised with the failure list as causes. """ - failures = list(failures) + if not isinstance(failures, (list, tuple)): + # Convert generators/other into a list... + failures = list(failures) if len(failures) == 1: failures[0].reraise() elif len(failures) > 1: diff --git a/tools/state_graph.py b/tools/state_graph.py index c37cd703f..635ec687b 100755 --- a/tools/state_graph.py +++ b/tools/state_graph.py @@ -68,7 +68,7 @@ def make_machine(start_state, transitions): def map_color(internal_states, state): if state in internal_states: return 'blue' - if state == states.FAILURE: + if state in (states.FAILURE, states.REVERT_FAILURE): return 'red' if state == states.REVERTED: return 'darkorange'