From 75eac61a0689c2c032716e9a1b6429f29fb482b0 Mon Sep 17 00:00:00 2001 From: Anastasia Karpinska Date: Thu, 6 Feb 2014 19:34:39 +0200 Subject: [PATCH] Flow smart revert with retry controller - Remove flow REVERTING state. Now flow can be running and reverting simultaneously. Until flow isn't finished it is in RUNNING state. - Add RETRYING state for the retry controller. - Implement smart revertion and flow retries and retries resumption. - Default retry controllers: Times, ForEach and ParameterizedForEach. - Example and unit tests. Implements: blueprint subgraph-execution Implements: blueprint reversion-strategies Implements: blueprint smart-revert Change-Id: Ifa600bcad1edf2910f02ac36783cd458afbd880c --- taskflow/engines/action_engine/engine.py | 28 +- .../engines/action_engine/graph_action.py | 153 +++-- .../engines/action_engine/graph_analyzer.py | 70 +- .../engines/action_engine/retry_action.py | 32 +- taskflow/engines/action_engine/task_action.py | 20 +- taskflow/examples/retry_flow.out.txt | 6 + taskflow/examples/retry_flow.py | 65 ++ taskflow/persistence/logbook.py | 16 + taskflow/retry.py | 84 +++ taskflow/states.py | 55 +- taskflow/storage.py | 27 +- taskflow/test.py | 7 + taskflow/tests/unit/test_action_engine.py | 12 +- taskflow/tests/unit/test_arguments_passing.py | 6 +- taskflow/tests/unit/test_check_transition.py | 45 +- taskflow/tests/unit/test_flow_dependencies.py | 12 + taskflow/tests/unit/test_retries.py | 608 +++++++++++++++++- taskflow/tests/unit/test_storage.py | 6 +- .../tests/unit/worker_based/test_worker.py | 2 +- taskflow/tests/utils.py | 59 +- taskflow/utils/flow_utils.py | 4 + 21 files changed, 1124 insertions(+), 193 deletions(-) create mode 100644 taskflow/examples/retry_flow.out.txt create mode 100644 taskflow/examples/retry_flow.py diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py index ae3677ff3..75e202a6a 100644 --- a/taskflow/engines/action_engine/engine.py +++ b/taskflow/engines/action_engine/engine.py @@ -65,22 +65,6 @@ class ActionEngine(base.EngineBase): self._task_action = None self._retry_action = None - def _revert(self, current_failure=None): - self._change_state(states.REVERTING) - try: - state = self._root.revert() - except Exception: - with excutils.save_and_reraise_exception(): - self._change_state(states.FAILURE) - - self._change_state(state) - if state == states.SUSPENDED: - return - failures = self.storage.get_failures() - misc.Failure.reraise_if_any(failures.values()) - if current_failure: - current_failure.reraise() - def __str__(self): return "%s: %s" % (reflection.get_class_name(self), id(self)) @@ -107,10 +91,7 @@ class ActionEngine(base.EngineBase): raise exc.MissingDependencies(self._flow, sorted(missing)) self._task_executor.start() try: - if self.storage.has_failures(): - self._revert() - else: - self._run() + self._run() finally: self._task_executor.stop() @@ -119,10 +100,13 @@ class ActionEngine(base.EngineBase): try: state = self._root.execute() except Exception: - self._change_state(states.FAILURE) - self._revert(misc.Failure()) + with excutils.save_and_reraise_exception(): + self._change_state(states.FAILURE) else: self._change_state(state) + if state != states.SUSPENDED and state != states.SUCCESS: + failures = self.storage.get_failures() + misc.Failure.reraise_if_any(failures.values()) @lock_utils.locked(lock='_state_lock') def _change_state(self, state): diff --git a/taskflow/engines/action_engine/graph_action.py b/taskflow/engines/action_engine/graph_action.py index da5a3f25e..a0099478e 100644 --- a/taskflow/engines/action_engine/graph_action.py +++ b/taskflow/engines/action_engine/graph_action.py @@ -14,6 +14,8 @@ # License for the specific language governing permissions and limitations # under the License. +from taskflow.engines.action_engine import executor as ex +from taskflow import retry as r from taskflow import states as st from taskflow import task from taskflow.utils import misc @@ -39,47 +41,39 @@ class FutureGraphAction(object): def is_running(self): return self._storage.get_flow_state() == st.RUNNING - def is_reverting(self): - return self._storage.get_flow_state() == st.REVERTING - def execute(self): - was_suspended = self._run( - self.is_running, - self._task_action.schedule_execution, - self._task_action.complete_execution, - self._retry_action.execute, - self._analyzer.browse_nodes_for_execute) - return st.SUSPENDED if was_suspended else st.SUCCESS + was_suspended = self._run() + if was_suspended: + return st.SUSPENDED + if self._analyzer.is_success(): + return st.SUCCESS + else: + return st.REVERTED - def revert(self): - was_suspended = self._run( - self.is_reverting, - self._task_action.schedule_reversion, - self._task_action.complete_reversion, - self._retry_action.revert, - self._analyzer.browse_nodes_for_revert) - return st.SUSPENDED if was_suspended else st.REVERTED - - def _run(self, running, schedule_task, complete_task, - complete_retry, get_next_nodes): + def _run(self): def schedule(nodes, not_done): for node in nodes: + # Returns schedule function for current atom and + # executes scheduling if isinstance(node, task.BaseTask): - future = schedule_task(node) + future = self._schedule_task(node) + elif isinstance(node, r.Retry): + future = self._schedule_retry(node) else: - # Retry controller is always executed immediately in the - # main thread and it should not be scheduled. - future = complete_retry(node) + raise TypeError("Unknown how to schedule node %s" % node) if future is not None: not_done.append(future) else: - schedule(get_next_nodes(node), not_done) + schedule(self._analyzer.get_next_nodes(node), not_done) - failures = [] not_done = [] - schedule(get_next_nodes(), not_done) + # Prepare flow to be resumed + next_nodes = self._prepare_flow_for_resume() + next_nodes.update(self._analyzer.get_next_nodes()) + schedule(next_nodes, not_done) was_suspended = False + failures = [] while not_done: # NOTE(imelnikov): if timeout occurs before any of futures # completes, done list will be empty and we'll just go @@ -87,20 +81,26 @@ class FutureGraphAction(object): done, not_done = self._task_action.wait_for_any( not_done, _WAITING_TIMEOUT) - next_nodes = [] + next_nodes = set() for future in done: - # NOTE(harlowja): event will be used in the future for smart - # reversion (ignoring it for now). - node, _event, result = future.result() + node, event, result = future.result() if isinstance(node, task.BaseTask): - complete_task(node, result) + self._complete_task(node, event, result) + intention = self._storage.get_atom_intention(node.name) + if event == ex.EXECUTED and intention == st.REVERT: + next_nodes.add(node) if isinstance(result, misc.Failure): - failures.append(result) + if event == ex.EXECUTED: + self._process_atom_failure(node, result) + next_nodes.update( + self._analyzer.browse_nodes_for_revert()) + else: + failures.append(result) else: - next_nodes.extend(get_next_nodes(node)) + next_nodes.update(self._analyzer.get_next_nodes(node)) if next_nodes: - if running() and not failures: + if self.is_running() and not failures: schedule(next_nodes, not_done) else: # NOTE(imelnikov): engine stopped while there were @@ -108,5 +108,84 @@ class FutureGraphAction(object): # or were suspended. was_suspended = True - misc.Failure.reraise_if_any(failures) + if failures: + misc.Failure.reraise_if_any(failures) return was_suspended + + def _schedule_task(self, task): + """Schedules the given task for revert or execute depending + on its intention. + """ + intention = self._storage.get_atom_intention(task.name) + if intention == st.EXECUTE: + return self._task_action.schedule_execution(task) + elif intention == st.REVERT: + return self._task_action.schedule_reversion(task) + + def _complete_task(self, task, event, result): + """Completes the given task, process task failure.""" + if event == ex.EXECUTED: + self._task_action.complete_execution(task, result) + else: + self._task_action.complete_reversion(task, result) + + def _schedule_retry(self, retry): + """Schedules the given retry for revert or execute depending + on its intention. + """ + intention = self._storage.get_atom_intention(retry.name) + if intention == st.EXECUTE: + return self._retry_action.execute(retry) + elif intention == st.REVERT: + return self._retry_action.revert(retry) + elif intention == st.RETRY: + self._retry_action.change_state(retry, st.RETRYING) + self._retry_subflow(retry) + return self._retry_action.execute(retry) + + def _process_atom_failure(self, atom, failure): + """On atom failure find its retry controller, ask for the action to + perform with failed subflow and set proper intention for subflow nodes. + """ + retry = self._analyzer.find_atom_retry(atom) + if retry: + # Ask retry controller what to do in case of failure + action = self._retry_action.on_failure(retry, atom, failure) + if action == r.RETRY: + # Prepare subflow for revert + self._storage.set_atom_intention(retry.name, st.RETRY) + for node in self._analyzer.iterate_subgraph(retry): + self._storage.set_atom_intention(node.name, st.REVERT) + elif action == r.REVERT: + # Ask parent checkpoint + self._process_atom_failure(retry, failure) + elif action == r.REVERT_ALL: + # Prepare all flow for revert + self._revert_all() + else: + self._revert_all() + + def _revert_all(self): + for node in self._analyzer.iterate_all_nodes(): + self._storage.set_atom_intention(node.name, st.REVERT) + + def _prepare_flow_for_resume(self): + for node in self._analyzer.iterate_all_nodes(): + if self._analyzer.get_state(node) == st.FAILURE: + self._process_atom_failure(node, self._storage.get(node.name)) + for retry in self._analyzer.iterate_retries(st.RETRYING): + self._retry_subflow(retry) + next_nodes = set() + for node in self._analyzer.iterate_all_nodes(): + if self._analyzer.get_state(node) in (st.RUNNING, st.REVERTING): + next_nodes.add(node) + return next_nodes + + def _retry_subflow(self, retry): + self._storage.set_atom_intention(retry.name, st.EXECUTE) + for node in self._analyzer.iterate_subgraph(retry): + if isinstance(node, task.BaseTask): + self._task_action.change_state(node, st.PENDING, progress=0.0) + else: + self._retry_action.change_state(node, st.PENDING) + self._storage.set_atom_intention(node.name, st.EXECUTE) diff --git a/taskflow/engines/action_engine/graph_analyzer.py b/taskflow/engines/action_engine/graph_analyzer.py index 0c2a7db86..8e86ba842 100644 --- a/taskflow/engines/action_engine/graph_analyzer.py +++ b/taskflow/engines/action_engine/graph_analyzer.py @@ -16,6 +16,7 @@ import six +from taskflow import retry as r from taskflow import states as st @@ -33,6 +34,11 @@ class GraphAnalyzer(object): def execution_graph(self): return self._graph + def get_next_nodes(self, node=None): + execute = self.browse_nodes_for_execute(node) + revert = self.browse_nodes_for_revert(node) + return execute + revert + def browse_nodes_for_execute(self, node=None): """Browse next nodes to execute for given node if specified and for whole graph otherwise. @@ -66,8 +72,10 @@ class GraphAnalyzer(object): def _is_ready_for_execute(self, task): """Checks if task is ready to be executed.""" - state = self._storage.get_task_state(task.name) - if not st.check_task_transition(state, st.RUNNING): + state = self.get_state(task) + intention = self._storage.get_atom_intention(task.name) + transition = st.check_task_transition(state, st.RUNNING) + if not transition or intention != st.EXECUTE: return False task_names = [] @@ -75,14 +83,16 @@ class GraphAnalyzer(object): task_names.append(prev_task.name) task_states = self._storage.get_tasks_states(task_names) - return all(state == st.SUCCESS - for state in six.itervalues(task_states)) + return all(state == st.SUCCESS and intention == st.EXECUTE + for state, intention in six.itervalues(task_states)) def _is_ready_for_revert(self, task): """Checks if task is ready to be reverted.""" - state = self._storage.get_task_state(task.name) - if not st.check_task_transition(state, st.REVERTING): + state = self.get_state(task) + intention = self._storage.get_atom_intention(task.name) + transition = st.check_task_transition(state, st.REVERTING) + if not transition or intention not in (st.REVERT, st.RETRY): return False task_names = [] @@ -91,4 +101,50 @@ class GraphAnalyzer(object): task_states = self._storage.get_tasks_states(task_names) return all(state in (st.PENDING, st.REVERTED) - for state in six.itervalues(task_states)) + for state, intention in six.itervalues(task_states)) + + def iterate_subgraph(self, retry): + """Iterates a subgraph connected to current retry controller, including + nested retry controllers and its nodes. + """ + visited_nodes = set() + retries_scope = set() + retries_scope.add(retry) + + nodes = self._graph.successors(retry) + while nodes: + next_nodes = [] + for node in nodes: + if node not in visited_nodes: + visited_nodes.add(node) + if self.find_atom_retry(node) in retries_scope: + yield node + if isinstance(node, r.Retry): + retries_scope.add(node) + next_nodes += self._graph.successors(node) + nodes = next_nodes + + def iterate_retries(self, state=None): + """Iterates retry controllers of a graph with given state or all + retries if state is None. + """ + for node in self._graph.nodes_iter(): + if isinstance(node, r.Retry): + if not state or self.get_state(node) == state: + yield node + + def iterate_all_nodes(self): + for node in self._graph.nodes_iter(): + yield node + + def find_atom_retry(self, atom): + return self._graph.node[atom].get('retry') + + def is_success(self): + for node in self._graph.nodes_iter(): + if self.get_state(node) != st.SUCCESS: + return False + return True + + def get_state(self, node): + return self._storage.get_task_state(node.name) diff --git a/taskflow/engines/action_engine/retry_action.py b/taskflow/engines/action_engine/retry_action.py index 68f932829..61b53de29 100644 --- a/taskflow/engines/action_engine/retry_action.py +++ b/taskflow/engines/action_engine/retry_action.py @@ -18,7 +18,9 @@ import logging +from taskflow.engines.action_engine import executor as ex from taskflow import states +from taskflow.utils import async_utils from taskflow.utils import misc LOG = logging.getLogger(__name__) @@ -36,14 +38,14 @@ class RetryAction(object): kwargs['history'] = self._storage.get_retry_history(retry.name) return kwargs - def _change_state(self, retry, state, result=None): + def change_state(self, retry, state, result=None): old_state = self._storage.get_task_state(retry.name) - if not states.check_task_transition(old_state, state): - return False + if old_state == state: + return state != states.PENDING if state in SAVE_RESULT_STATES: self._storage.save(retry.name, result, state) elif state == states.REVERTED: - self.storage.cleanup_retry_history(retry.name, state) + self._storage.cleanup_retry_history(retry.name, state) else: self._storage.set_task_state(retry.name, state) @@ -55,25 +57,33 @@ class RetryAction(object): return True def execute(self, retry): - if not self._change_state(retry, states.RUNNING): + if not self.change_state(retry, states.RUNNING): return kwargs = self._get_retry_args(retry) try: result = retry.execute(**kwargs) except Exception: result = misc.Failure() - self._change_state(retry, states.FAILURE, result=result) + self.change_state(retry, states.FAILURE, result=result) else: - self._change_state(retry, states.SUCCESS, result=result) + self.change_state(retry, states.SUCCESS, result=result) + return async_utils.make_completed_future((retry, ex.EXECUTED, result)) def revert(self, retry): - if not self._change_state(retry, states.REVERTING): + if not self.change_state(retry, states.REVERTING): return kwargs = self._get_retry_args(retry) kwargs['flow_failures'] = self._storage.get_failures() try: - retry.revert(**kwargs) + result = retry.revert(**kwargs) except Exception: - self._change_state(retry, states.FAILURE) + result = misc.Failure() + self.change_state(retry, states.FAILURE) else: - self._change_state(retry, states.REVERTED) + self.change_state(retry, states.REVERTED) + return async_utils.make_completed_future((retry, ex.REVERTED, result)) + + def on_failure(self, retry, atom, last_failure): + self._storage.save_retry_failure(retry.name, atom.name, last_failure) + kwargs = self._get_retry_args(retry) + return retry.on_failure(**kwargs) diff --git a/taskflow/engines/action_engine/task_action.py b/taskflow/engines/action_engine/task_action.py index 650a9f9ed..3f05f6e1e 100644 --- a/taskflow/engines/action_engine/task_action.py +++ b/taskflow/engines/action_engine/task_action.py @@ -31,10 +31,10 @@ class TaskAction(object): self._task_executor = task_executor self._notifier = notifier - def _change_state(self, task, state, result=None, progress=None): + def change_state(self, task, state, result=None, progress=None): old_state = self._storage.get_task_state(task.name) - if not states.check_task_transition(old_state, state): - return False + if old_state == state: + return state != states.PENDING if state in SAVE_RESULT_STATES: self._storage.save(task.name, result, state) else: @@ -62,7 +62,7 @@ class TaskAction(object): task, progress) def schedule_execution(self, task): - if not self._change_state(task, states.RUNNING, progress=0.0): + if not self.change_state(task, states.RUNNING, progress=0.0): return kwargs = self._storage.fetch_mapped_args(task.rebind) task_uuid = self._storage.get_task_uuid(task.name) @@ -71,13 +71,13 @@ class TaskAction(object): def complete_execution(self, task, result): if isinstance(result, misc.Failure): - self._change_state(task, states.FAILURE, result=result) + self.change_state(task, states.FAILURE, result=result) else: - self._change_state(task, states.SUCCESS, - result=result, progress=1.0) + self.change_state(task, states.SUCCESS, + result=result, progress=1.0) def schedule_reversion(self, task): - if not self._change_state(task, states.REVERTING, progress=0.0): + if not self.change_state(task, states.REVERTING, progress=0.0): return kwargs = self._storage.fetch_mapped_args(task.rebind) task_uuid = self._storage.get_task_uuid(task.name) @@ -90,9 +90,9 @@ class TaskAction(object): def complete_reversion(self, task, rev_result): if isinstance(rev_result, misc.Failure): - self._change_state(task, states.FAILURE) + self.change_state(task, states.FAILURE) else: - self._change_state(task, states.REVERTED, progress=1.0) + self.change_state(task, states.REVERTED, progress=1.0) def wait_for_any(self, fs, timeout): return self._task_executor.wait_for_any(fs, timeout) diff --git a/taskflow/examples/retry_flow.out.txt b/taskflow/examples/retry_flow.out.txt new file mode 100644 index 000000000..5064f5bc9 --- /dev/null +++ b/taskflow/examples/retry_flow.out.txt @@ -0,0 +1,6 @@ +Calling jim 333. +Wrong number, apologizing. +Calling jim 444. +Wrong number, apologizing. +Calling jim 555. +Hello Jim! diff --git a/taskflow/examples/retry_flow.py b/taskflow/examples/retry_flow.py new file mode 100644 index 000000000..f632dc586 --- /dev/null +++ b/taskflow/examples/retry_flow.py @@ -0,0 +1,65 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2012-2013 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging +import os +import sys + +logging.basicConfig(level=logging.ERROR) + +top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), + os.pardir, + os.pardir)) +sys.path.insert(0, top_dir) + +import taskflow.engines +from taskflow.patterns import linear_flow as lf +from taskflow import retry +from taskflow import task + +# INTRO: In this example we create a retry controller that receives a phone +# directory and tries different phone numbers. The next task tries to call Jim +# using the given number. If if is not a Jim's number, the tasks raises an +# exception and retry controller takes the next number from the phone +# directory and retries the call. +# +# This example shows a basic usage of retry controllers in a flow. +# Retry controllers allows to revert and retry a failed subflow with new +# parameters. + + +class CallJim(task.Task): + def execute(self, jim_number): + print ("Calling jim %s." % jim_number) + if jim_number != 555: + raise Exception("Wrong number!") + else: + print ("Hello Jim!") + + def revert(self, jim_number, **kwargs): + print ("Wrong number, apologizing.") + + +# Create your flow and associated tasks (the work to be done). +flow = lf.Flow('retrying-linear', + retry=retry.ParameterizedForEach( + rebind=['phone_directory'], + provides='jim_number')).add(CallJim()) + +# Now run that flow using the provided initial data (store below). +taskflow.engines.run(flow, store={'phone_directory': [333, 444, 555, 666]}) diff --git a/taskflow/persistence/logbook.py b/taskflow/persistence/logbook.py index 35f38edc8..d60fdac0e 100644 --- a/taskflow/persistence/logbook.py +++ b/taskflow/persistence/logbook.py @@ -195,6 +195,10 @@ class AtomDetail(object): def atom_type(self): """Identifies atom type represented by this detail.""" + @abc.abstractmethod + def reset(self, state): + """Resets detail results ans failures.""" + class TaskDetail(AtomDetail): """This class represents a task detail for flow task object.""" @@ -205,6 +209,12 @@ class TaskDetail(AtomDetail): def atom_type(self): return TASK_DETAIL + def reset(self, state): + self.results = None + self.failure = None + self.state = state + self.intention = states.EXECUTE + class RetryDetail(AtomDetail): """This class represents a retry detail for retry controller object.""" @@ -216,6 +226,12 @@ class RetryDetail(AtomDetail): def atom_type(self): return RETRY_DETAIL + def reset(self, state): + self.results = [] + self.failure = None + self.state = state + self.intention = states.EXECUTE + def get_atom_detail_class(atom_type): if atom_type == TASK_DETAIL: diff --git a/taskflow/retry.py b/taskflow/retry.py index 9da0c5417..d1ce6b05f 100644 --- a/taskflow/retry.py +++ b/taskflow/retry.py @@ -23,6 +23,7 @@ import logging import six from taskflow import atom +from taskflow import exceptions as exc LOG = logging.getLogger(__name__) @@ -99,3 +100,86 @@ class AlwaysRevert(Retry): def execute(self, *args, **kwargs): pass + + +class AlwaysRevertAll(Retry): + """Retry that always reverts a whole flow.""" + + def on_failure(self, **kwargs): + return REVERT_ALL + + def execute(self, **kwargs): + pass + + +class Times(Retry): + """Retries subflow given number of times. Returns attempt number.""" + + def __init__(self, attempts=1, name=None, provides=None, requires=None, + auto_extract=True, rebind=None): + super(Times, self).__init__(name, provides, requires, + auto_extract, rebind) + self._attempts = attempts + + def on_failure(self, history, *args, **kwargs): + if len(history) < self._attempts: + return RETRY + return REVERT + + def execute(self, history, *args, **kwargs): + return len(history)+1 + + +class ForEachBase(Retry): + """Base class for retries that iterate given collection.""" + + def _get_next_value(self, values, history): + values = list(values) # copy it + for (item, failures) in history: + try: + values.remove(item) # remove exactly one element from item + except ValueError: + # one of the results is not in our list now -- who cares? + pass + if not values: + raise exc.NotFound("No elements left in collection of iterable " + "retry controller %s" % self.name) + return values[0] + + def _on_failure(self, values, history): + try: + self._get_next_value(values, history) + except exc.NotFound: + return REVERT + else: + return RETRY + + +class ForEach(ForEachBase): + """Accepts a collection of values to the constructor. Returns the next + element of the collection on each try. + """ + + def __init__(self, values, name=None, provides=None, requires=None, + auto_extract=True, rebind=None): + super(ForEach, self).__init__(name, provides, requires, + auto_extract, rebind) + self._values = values + + def on_failure(self, history, *args, **kwargs): + return self._on_failure(self._values, history) + + def execute(self, history, *args, **kwargs): + return self._get_next_value(self._values, history) + + +class ParameterizedForEach(ForEachBase): + """Accepts a collection of values from storage as a parameter of execute + method. Returns the next element of the collection on each try. + """ + + def on_failure(self, values, history, *args, **kwargs): + return self._on_failure(values, history) + + def execute(self, values, history, *args, **kwargs): + return self._get_next_value(values, history) diff --git a/taskflow/states.py b/taskflow/states.py index 540319cf7..14893857d 100644 --- a/taskflow/states.py +++ b/taskflow/states.py @@ -36,8 +36,10 @@ RESUMING = 'RESUMING' FAILURE = FAILURE PENDING = PENDING REVERTED = REVERTED -REVERTING = REVERTING +REVERTING = 'REVERTING' SUCCESS = SUCCESS +RUNNING = RUNNING +RETRYING = 'RETRYING' # Atom intentions. EXECUTE = 'EXECUTE' @@ -58,18 +60,13 @@ _ALLOWED_FLOW_TRANSITIONS = frozenset(( (RUNNING, SUCCESS), # all tasks finished successfully (RUNNING, FAILURE), # some of task failed + (RUNNING, REVERTED), # some of task failed and flow has been reverted (RUNNING, SUSPENDING), # engine.suspend was called (RUNNING, RESUMING), # resuming from a previous running (SUCCESS, RUNNING), # see note below (FAILURE, RUNNING), # see note below - (FAILURE, REVERTING), # flow failed, do cleanup now - - (REVERTING, REVERTED), # revert done - (REVERTING, FAILURE), # revert failed - (REVERTING, SUSPENDING), # engine.suspend was called - (REVERTING, RESUMING), # resuming from a previous reverting (REVERTED, PENDING), # try again @@ -80,7 +77,6 @@ _ALLOWED_FLOW_TRANSITIONS = frozenset(( (SUSPENDING, RESUMING), # resuming from a previous suspending (SUSPENDED, RUNNING), # restart from suspended - (SUSPENDED, REVERTING), # revert from suspended (RESUMING, SUSPENDED), # after flow resumed, it is suspended )) @@ -144,52 +140,17 @@ _ALLOWED_TASK_TRANSITIONS = frozenset(( (REVERTED, PENDING), # try again - # NOTE(harlowja): allow the tasks to restart if in the same state - # as a they were in before as a task may be 'killed' while in one of the - # below states and it is permissible to let the task to re-enter that - # same state to try to finish. - (REVERTING, REVERTING), - (RUNNING, RUNNING), - - # NOTE(harlowja): the task was 'killed' while in one of the starting/ending - # states and it is permissible to let the task to start running or - # reverting again (if it really wants too). - (REVERTING, RUNNING), - (RUNNING, REVERTING), + (SUCCESS, RETRYING), # retrying retry controller + (RETRYING, RUNNING), # run retry controller that has been retrying )) -_IGNORED_TASK_TRANSITIONS = [ - (SUCCESS, RUNNING), # already finished - (PENDING, REVERTING), # never ran in the first place - (REVERTED, REVERTING), # the task already reverted -] - -# NOTE(harlowja): ignore transitions to the same state (in these cases). -# -# NOTE(harlowja): the above ALLOWED_TASK_TRANSITIONS does allow -# transitions to certain equivalent states (but only for a few special -# cases). -_IGNORED_TASK_TRANSITIONS.extend( - (a, a) for a in (PENDING, FAILURE, SUCCESS, REVERTED) -) - -_IGNORED_TASK_TRANSITIONS = frozenset(_IGNORED_TASK_TRANSITIONS) - def check_task_transition(old_state, new_state): """Check that task can transition from old_state to new_state. - If transition can be performed, it returns True. If transition - should be ignored, it returns False. If transition is not - valid, it raises InvalidState exception. + If transition can be performed, it returns True, False otherwise. """ pair = (old_state, new_state) if pair in _ALLOWED_TASK_TRANSITIONS: return True - if pair in _IGNORED_TASK_TRANSITIONS: - return False - # TODO(harlowja): Should we check/allow for 3rd party states to be - # triggered during RUNNING by having a concept of a sub-state that we also - # verify against?? - raise exc.InvalidState("Task transition from %s to %s is not allowed" - % pair) + return False diff --git a/taskflow/storage.py b/taskflow/storage.py index a9e943c79..ab641ad01 100644 --- a/taskflow/storage.py +++ b/taskflow/storage.py @@ -215,7 +215,8 @@ class Storage(object): def get_tasks_states(self, task_names): """Gets all task states.""" with self._lock.read_lock(): - return dict((name, self.get_task_state(name)) + return dict((name, (self.get_task_state(name), + self.get_atom_intention(name))) for name in task_names) def update_task_metadata(self, task_name, update_with): @@ -314,6 +315,18 @@ class Storage(object): self._check_all_results_provided(td.name, data) self._with_connection(self._save_task_detail, task_detail=td) + def save_retry_failure(self, retry_name, failed_atom_name, failure): + """Save subflow failure to retry controller history.""" + with self._lock.write_lock(): + td = self._taskdetail_by_name(retry_name) + if td.atom_type != logbook.RETRY_DETAIL: + raise TypeError( + "Atom %s is not a retry controller." % retry_name) + failures = td.results[-1][1] + if failed_atom_name not in failures: + failures[failed_atom_name] = failure + self._with_connection(self._save_task_detail, task_detail=td) + def cleanup_retry_history(self, retry_name, state): """Cleanup history of retry with given name.""" with self._lock.write_lock(): @@ -354,9 +367,7 @@ class Storage(object): return False if td.state == state: return False - td.results = None - td.failure = None - td.state = state + td.reset(state) self._failures.pop(td.name, None) return True @@ -501,6 +512,14 @@ class Storage(object): """Fetch retry results history.""" with self._lock.read_lock(): td = self._taskdetail_by_name(name) + if td.failure is not None: + cached = self._failures.get(name) + history = list(td.results) + if td.failure.matches(cached): + history.append((cached, {})) + else: + history.append((td.failure, {})) + return history return td.results diff --git a/taskflow/test.py b/taskflow/test.py index 6699a3535..2557a132b 100644 --- a/taskflow/test.py +++ b/taskflow/test.py @@ -151,6 +151,13 @@ class TestCase(testcase.TestCase): except exceptions.WrappedFailure as e: self.assertThat(e, FailureRegexpMatcher(exc_class, pattern)) + def assertIsContainsSameElements(self, seq1, seq2, msg=None): + if sorted(seq1) != sorted(seq2): + if msg is None: + msg = ("%r doesn't contain same elements as %r." + % (seq1, seq2)) + self.fail(msg) + class MockTestCase(TestCase): diff --git a/taskflow/tests/unit/test_action_engine.py b/taskflow/tests/unit/test_action_engine.py index e86ae62ea..28429d06b 100644 --- a/taskflow/tests/unit/test_action_engine.py +++ b/taskflow/tests/unit/test_action_engine.py @@ -62,10 +62,7 @@ class EngineTaskTest(utils.EngineTestBase): def test_run_task_with_notifications(self): flow = utils.SaveOrderTask(name='task1') engine = self._make_engine(flow) - engine.notifier.register('*', self._flow_callback, - kwargs={'values': self.values}) - engine.task_notifier.register('*', self._callback, - kwargs={'values': self.values}) + utils.register_notifiers(engine, self.values) engine.run() self.assertEqual(self.values, ['flow RUNNING', @@ -77,15 +74,10 @@ class EngineTaskTest(utils.EngineTestBase): def test_failing_task_with_notifications(self): flow = utils.FailingTask('fail') engine = self._make_engine(flow) - engine.notifier.register('*', self._flow_callback, - kwargs={'values': self.values}) - engine.task_notifier.register('*', self._callback, - kwargs={'values': self.values}) + utils.register_notifiers(engine, self.values) expected = ['flow RUNNING', 'fail RUNNING', 'fail FAILURE', - 'flow FAILURE', - 'flow REVERTING', 'fail REVERTING', 'fail reverted(Failure: RuntimeError: Woot!)', 'fail REVERTED', diff --git a/taskflow/tests/unit/test_arguments_passing.py b/taskflow/tests/unit/test_arguments_passing.py index 79b5a7b91..0a038bd1e 100644 --- a/taskflow/tests/unit/test_arguments_passing.py +++ b/taskflow/tests/unit/test_arguments_passing.py @@ -47,9 +47,9 @@ class ArgumentsPassingTest(utils.EngineTestBase): }) def test_save_dict(self): - flow = utils.TaskMultiDictk(provides=set(['badger', - 'mushroom', - 'snake'])) + flow = utils.TaskMultiDict(provides=set(['badger', + 'mushroom', + 'snake'])) engine = self._make_engine(flow) engine.run() self.assertEqual(engine.storage.fetch_all(), { diff --git a/taskflow/tests/unit/test_check_transition.py b/taskflow/tests/unit/test_check_transition.py index 0d532e4d6..082f46c2e 100644 --- a/taskflow/tests/unit/test_check_transition.py +++ b/taskflow/tests/unit/test_check_transition.py @@ -74,39 +74,46 @@ class CheckTaskTransitionTest(TransitionTest): def test_from_pending_state(self): self.assertTransitions(from_state=states.PENDING, allowed=(states.RUNNING,), - ignored=(states.PENDING, states.REVERTING), - forbidden=(states.SUCCESS, states.FAILURE, - states.REVERTED)) + ignored=(states.PENDING, states.REVERTING, + states.SUCCESS, states.FAILURE, + states.REVERTED)) def test_from_running_state(self): self.assertTransitions(from_state=states.RUNNING, - allowed=(states.RUNNING, states.SUCCESS, - states.FAILURE, states.REVERTING), - forbidden=(states.PENDING, states.REVERTED)) + allowed=(states.SUCCESS, states.FAILURE,), + ignored=(states.REVERTING, states.RUNNING, + states.PENDING, states.REVERTED)) def test_from_success_state(self): self.assertTransitions(from_state=states.SUCCESS, - allowed=(states.REVERTING,), - ignored=(states.RUNNING, states.SUCCESS), - forbidden=(states.PENDING, states.FAILURE, - states.REVERTED)) + allowed=(states.REVERTING, states.RETRYING), + ignored=(states.RUNNING, states.SUCCESS, + states.PENDING, states.FAILURE, + states.REVERTED)) def test_from_failure_state(self): self.assertTransitions(from_state=states.FAILURE, allowed=(states.REVERTING,), - ignored=(states.FAILURE,), - forbidden=(states.PENDING, states.RUNNING, - states.SUCCESS, states.REVERTED)) + ignored=(states.FAILURE, states.RUNNING, + states.PENDING, + states.SUCCESS, states.REVERTED)) def test_from_reverting_state(self): self.assertTransitions(from_state=states.REVERTING, - allowed=(states.RUNNING, states.FAILURE, - states.REVERTING, states.REVERTED), - forbidden=(states.PENDING, states.SUCCESS)) + allowed=(states.FAILURE, states.REVERTED), + ignored=(states.RUNNING, states.REVERTING, + states.PENDING, states.SUCCESS)) def test_from_reverted_state(self): self.assertTransitions(from_state=states.REVERTED, allowed=(states.PENDING,), - ignored=(states.REVERTING, states.REVERTED), - forbidden=(states.RUNNING, states.SUCCESS, - states.FAILURE)) + ignored=(states.REVERTING, states.REVERTED, + states.RUNNING, + states.SUCCESS, states.FAILURE)) + + def test_from_retrying_state(self): + self.assertTransitions(from_state=states.RETRYING, + allowed=(states.RUNNING,), + ignored=(states.RETRYING, states.SUCCESS, + states.PENDING, states.FAILURE, + states.REVERTED)) diff --git a/taskflow/tests/unit/test_flow_dependencies.py b/taskflow/tests/unit/test_flow_dependencies.py index cec2c6dde..9e3a25c86 100644 --- a/taskflow/tests/unit/test_flow_dependencies.py +++ b/taskflow/tests/unit/test_flow_dependencies.py @@ -441,3 +441,15 @@ class FlowDependenciesTest(test.TestCase): flow.add, lf.Flow('lf2', retry.AlwaysRevert('rt2', provides=['x']))) + + def test_builtin_retry_args(self): + + class FullArgsRetry(retry.AlwaysRevert): + def execute(self, history, **kwargs): + pass + + def revert(self, history, **kwargs): + pass + + flow = lf.Flow('lf', retry=FullArgsRetry(requires='a')) + self.assertEqual(flow.requires, set(['a'])) diff --git a/taskflow/tests/unit/test_retries.py b/taskflow/tests/unit/test_retries.py index 81d260d42..45e794eb7 100644 --- a/taskflow/tests/unit/test_retries.py +++ b/taskflow/tests/unit/test_retries.py @@ -16,18 +16,18 @@ # License for the specific language governing permissions and limitations # under the License. -import testtools - from taskflow.patterns import graph_flow as gf from taskflow.patterns import linear_flow as lf from taskflow.patterns import unordered_flow as uf import taskflow.engines +from taskflow import exceptions as exc +from taskflow import retry +from taskflow import states as st from taskflow import test from taskflow.tests import utils - -from taskflow.utils import eventlet_utils as eu +from taskflow.utils import misc class RetryTest(utils.EngineTestBase): @@ -50,6 +50,591 @@ class RetryTest(utils.EngineTestBase): engine.run() self.assertEqual(engine.storage.fetch_all(), {'x': 1}) + def test_states_retry_success_linear_flow(self): + flow = lf.Flow('flow-1', retry.Times(4, 'r1', provides='x')).add( + utils.SaveOrderTask("task1"), + utils.ConditionalTask("task2") + ) + engine = self._make_engine(flow) + utils.register_notifiers(engine, self.values) + engine.storage.inject({'y': 2}) + engine.run() + self.assertEqual(engine.storage.fetch_all(), {'y': 2, 'x': 2}) + expected = ['flow RUNNING', + 'r1 RUNNING', + 'r1 SUCCESS', + 'task1 RUNNING', + 'task1', + 'task1 SUCCESS', + 'task2 RUNNING', + 'task2', + 'task2 FAILURE', + 'task2 REVERTING', + u'task2 reverted(Failure: RuntimeError: Woot!)', + 'task2 REVERTED', + 'task1 REVERTING', + 'task1 reverted(5)', + 'task1 REVERTED', + 'r1 RETRYING', + 'task1 PENDING', + 'task2 PENDING', + 'r1 RUNNING', + 'r1 SUCCESS', + 'task1 RUNNING', + 'task1', + 'task1 SUCCESS', + 'task2 RUNNING', + 'task2', + 'task2 SUCCESS', + 'flow SUCCESS'] + self.assertEqual(self.values, expected) + + def test_states_retry_reverted_linear_flow(self): + flow = lf.Flow('flow-1', retry.Times(2, 'r1', provides='x')).add( + utils.SaveOrderTask("task1"), + utils.ConditionalTask("task2") + ) + engine = self._make_engine(flow) + utils.register_notifiers(engine, self.values) + engine.storage.inject({'y': 4}) + self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run) + self.assertEqual(engine.storage.fetch_all(), {'y': 4}) + expected = ['flow RUNNING', + 'r1 RUNNING', + 'r1 SUCCESS', + 'task1 RUNNING', + 'task1', + 'task1 SUCCESS', + 'task2 RUNNING', + 'task2', + 'task2 FAILURE', + 'task2 REVERTING', + u'task2 reverted(Failure: RuntimeError: Woot!)', + 'task2 REVERTED', + 'task1 REVERTING', + 'task1 reverted(5)', + 'task1 REVERTED', + 'r1 RETRYING', + 'task1 PENDING', + 'task2 PENDING', + 'r1 RUNNING', + 'r1 SUCCESS', + 'task1 RUNNING', + 'task1', + 'task1 SUCCESS', + 'task2 RUNNING', + 'task2', + 'task2 FAILURE', + 'task2 REVERTING', + u'task2 reverted(Failure: RuntimeError: Woot!)', + 'task2 REVERTED', + 'task1 REVERTING', + 'task1 reverted(5)', + 'task1 REVERTED', + 'r1 REVERTING', + 'r1 REVERTED', + 'flow REVERTED'] + self.assertEqual(self.values, expected) + + def test_states_retry_failure_linear_flow(self): + flow = lf.Flow('flow-1', retry.Times(2, 'r1', provides='x')).add( + utils.NastyTask("task1"), + utils.ConditionalTask("task2") + ) + engine = self._make_engine(flow) + utils.register_notifiers(engine, self.values) + engine.storage.inject({'y': 4}) + self.assertRaisesRegexp(RuntimeError, '^Gotcha', engine.run) + self.assertEqual(engine.storage.fetch_all(), {'y': 4, 'x': 1}) + expected = ['flow RUNNING', + 'r1 RUNNING', + 'r1 SUCCESS', + 'task1 RUNNING', + 'task1 SUCCESS', + 'task2 RUNNING', + 'task2', + 'task2 FAILURE', + 'task2 REVERTING', + u'task2 reverted(Failure: RuntimeError: Woot!)', + 'task2 REVERTED', + 'task1 REVERTING', + 'task1 FAILURE', + 'flow FAILURE'] + self.assertEqual(self.values, expected) + + def test_states_retry_failure_nested_flow_fails(self): + flow = lf.Flow('flow-1', utils.retry.AlwaysRevert('r1')).add( + utils.TaskNoRequiresNoReturns("task1"), + lf.Flow('flow-2', retry.Times(3, 'r2', provides='x')).add( + utils.TaskNoRequiresNoReturns("task2"), + utils.ConditionalTask("task3") + ), + utils.TaskNoRequiresNoReturns("task4") + ) + engine = self._make_engine(flow) + utils.register_notifiers(engine, self.values) + engine.storage.inject({'y': 2}) + engine.run() + self.assertEqual(engine.storage.fetch_all(), {'y': 2, 'x': 2}) + expected = ['flow RUNNING', + 'r1 RUNNING', + 'r1 SUCCESS', + 'task1 RUNNING', + 'task1 SUCCESS', + 'r2 RUNNING', + 'r2 SUCCESS', + 'task2 RUNNING', + 'task2 SUCCESS', + 'task3 RUNNING', + 'task3', + 'task3 FAILURE', + 'task3 REVERTING', + u'task3 reverted(Failure: RuntimeError: Woot!)', + 'task3 REVERTED', + 'task2 REVERTING', + 'task2 REVERTED', + 'r2 RETRYING', + 'task2 PENDING', + 'task3 PENDING', + 'r2 RUNNING', + 'r2 SUCCESS', + 'task2 RUNNING', + 'task2 SUCCESS', + 'task3 RUNNING', + 'task3', + 'task3 SUCCESS', + 'task4 RUNNING', + 'task4 SUCCESS', + 'flow SUCCESS'] + self.assertEqual(self.values, expected) + + def test_states_retry_failure_parent_flow_fails(self): + flow = lf.Flow('flow-1', retry.Times(3, 'r1', provides='x1')).add( + utils.TaskNoRequiresNoReturns("task1"), + lf.Flow('flow-2', retry.Times(3, 'r2', provides='x2')).add( + utils.TaskNoRequiresNoReturns("task2"), + utils.TaskNoRequiresNoReturns("task3") + ), + utils.ConditionalTask("task4", rebind={'x': 'x1'}) + ) + engine = self._make_engine(flow) + utils.register_notifiers(engine, self.values) + engine.storage.inject({'y': 2}) + engine.run() + self.assertEqual(engine.storage.fetch_all(), {'y': 2, 'x1': 2, + 'x2': 1}) + expected = ['flow RUNNING', + 'r1 RUNNING', + 'r1 SUCCESS', + 'task1 RUNNING', + 'task1 SUCCESS', + 'r2 RUNNING', + 'r2 SUCCESS', + 'task2 RUNNING', + 'task2 SUCCESS', + 'task3 RUNNING', + 'task3 SUCCESS', + 'task4 RUNNING', + 'task4', + 'task4 FAILURE', + 'task4 REVERTING', + u'task4 reverted(Failure: RuntimeError: Woot!)', + 'task4 REVERTED', + 'task3 REVERTING', + 'task3 REVERTED', + 'task2 REVERTING', + 'task2 REVERTED', + 'r2 REVERTING', + 'r2 REVERTED', + 'task1 REVERTING', + 'task1 REVERTED', + 'r1 RETRYING', + 'task1 PENDING', + 'r2 PENDING', + 'task2 PENDING', + 'task3 PENDING', + 'task4 PENDING', + 'r1 RUNNING', + 'r1 SUCCESS', + 'task1 RUNNING', + 'task1 SUCCESS', + 'r2 RUNNING', + 'r2 SUCCESS', + 'task2 RUNNING', + 'task2 SUCCESS', + 'task3 RUNNING', + 'task3 SUCCESS', + 'task4 RUNNING', + 'task4', + 'task4 SUCCESS', + 'flow SUCCESS'] + self.assertEqual(self.values, expected) + + def test_unordered_flow_task_fails_parallel_tasks_should_be_reverted(self): + flow = uf.Flow('flow-1', retry.Times(3, 'r', provides='x')).add( + utils.SaveOrderTask("task1"), + utils.ConditionalTask("task2") + ) + engine = self._make_engine(flow) + engine.storage.inject({'y': 2}) + engine.run() + self.assertEqual(engine.storage.fetch_all(), {'y': 2, 'x': 2}) + expected = ['task2', + 'task1', + u'task2 reverted(Failure: RuntimeError: Woot!)', + 'task1 reverted(5)', + 'task2', + 'task1'] + self.assertIsContainsSameElements(self.values, expected) + + def test_nested_flow_reverts_parent_retries(self): + retry1 = retry.Times(3, 'r1', provides='x') + retry2 = retry.Times(0, 'r2', provides='x2') + + flow = lf.Flow('flow-1', retry1).add( + utils.SaveOrderTask("task1"), + lf.Flow('flow-2', retry2).add(utils.ConditionalTask("task2")) + ) + engine = self._make_engine(flow) + engine.storage.inject({'y': 2}) + utils.register_notifiers(engine, self.values) + engine.run() + self.assertEqual(engine.storage.fetch_all(), {'y': 2, 'x': 2, 'x2': 1}) + expected = ['flow RUNNING', + 'r1 RUNNING', + 'r1 SUCCESS', + 'task1 RUNNING', + 'task1', + 'task1 SUCCESS', + 'r2 RUNNING', + 'r2 SUCCESS', + 'task2 RUNNING', + 'task2', + 'task2 FAILURE', + 'task2 REVERTING', + u'task2 reverted(Failure: RuntimeError: Woot!)', + 'task2 REVERTED', + 'r2 REVERTING', + 'r2 REVERTED', + 'task1 REVERTING', + 'task1 reverted(5)', + 'task1 REVERTED', + 'r1 RETRYING', + 'task1 PENDING', + 'r2 PENDING', + 'task2 PENDING', + 'r1 RUNNING', + 'r1 SUCCESS', + 'task1 RUNNING', + 'task1', + 'task1 SUCCESS', + 'r2 RUNNING', + 'r2 SUCCESS', + 'task2 RUNNING', + 'task2', + 'task2 SUCCESS', + 'flow SUCCESS'] + self.assertEqual(self.values, expected) + + def test_revert_all_retry(self): + flow = lf.Flow('flow-1', retry.Times(3, 'r1', provides='x')).add( + utils.SaveOrderTask("task1"), + lf.Flow('flow-2', retry.AlwaysRevertAll('r2')).add( + utils.ConditionalTask("task2")) + ) + engine = self._make_engine(flow) + engine.storage.inject({'y': 2}) + utils.register_notifiers(engine, self.values) + self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run) + self.assertEqual(engine.storage.fetch_all(), {'y': 2}) + expected = ['flow RUNNING', + 'r1 RUNNING', + 'r1 SUCCESS', + 'task1 RUNNING', + 'task1', + 'task1 SUCCESS', + 'r2 RUNNING', + 'r2 SUCCESS', + 'task2 RUNNING', + 'task2', + 'task2 FAILURE', + 'task2 REVERTING', + u'task2 reverted(Failure: RuntimeError: Woot!)', + 'task2 REVERTED', + 'r2 REVERTING', + 'r2 REVERTED', + 'task1 REVERTING', + 'task1 reverted(5)', + 'task1 REVERTED', + 'r1 REVERTING', + 'r1 REVERTED', + 'flow REVERTED'] + self.assertEqual(self.values, expected) + + def test_restart_reverted_flow_with_retry(self): + flow = lf.Flow('test', retry=utils.OneReturnRetry(provides='x')).add( + utils.FailingTask('fail')) + engine = self._make_engine(flow) + self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run) + self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run) + + def test_run_just_retry(self): + flow = utils.OneReturnRetry(provides='x') + engine = self._make_engine(flow) + self.assertRaisesRegexp(TypeError, 'Retry controller', engine.run) + + def test_use_retry_as_a_task(self): + flow = lf.Flow('test').add(utils.OneReturnRetry(provides='x')) + engine = self._make_engine(flow) + self.assertRaisesRegexp(TypeError, 'Retry controller', engine.run) + + def test_resume_flow_that_had_been_interrupted_during_retrying(self): + flow = lf.Flow('flow-1', retry.Times(3, 'r1')).add( + utils.SaveOrderTask('t1'), + utils.SaveOrderTask('t2'), + utils.SaveOrderTask('t3') + ) + engine = self._make_engine(flow) + engine.compile() + utils.register_notifiers(engine, self.values) + engine.storage.set_task_state('r1', st.RETRYING) + engine.storage.set_task_state('t1', st.PENDING) + engine.storage.set_task_state('t2', st.REVERTED) + engine.storage.set_task_state('t3', st.REVERTED) + + engine.run() + expected = ['flow RUNNING', + 't2 PENDING', + 't3 PENDING', + 'r1 RUNNING', + 'r1 SUCCESS', + 't1 RUNNING', + 't1', + 't1 SUCCESS', + 't2 RUNNING', + 't2', + 't2 SUCCESS', + 't3 RUNNING', + 't3', + 't3 SUCCESS', + 'flow SUCCESS'] + self.assertEqual(self.values, expected) + + def test_resume_flow_that_should_be_retried(self): + flow = lf.Flow('flow-1', retry.Times(3, 'r1')).add( + utils.SaveOrderTask('t1'), + utils.SaveOrderTask('t2') + ) + engine = self._make_engine(flow) + engine.compile() + utils.register_notifiers(engine, self.values) + engine.storage.set_atom_intention('r1', st.RETRY) + engine.storage.set_task_state('r1', st.SUCCESS) + engine.storage.set_task_state('t1', st.REVERTED) + engine.storage.set_task_state('t2', st.REVERTED) + + engine.run() + expected = ['flow RUNNING', + 'r1 RETRYING', + 't1 PENDING', + 't2 PENDING', + 'r1 RUNNING', + 'r1 SUCCESS', + 't1 RUNNING', + 't1', + 't1 SUCCESS', + 't2 RUNNING', + 't2', + 't2 SUCCESS', + 'flow SUCCESS'] + self.assertEqual(self.values, expected) + + def test_retry_tasks_that_has_not_been_reverted(self): + flow = lf.Flow('flow-1', retry.Times(3, 'r1', provides='x')).add( + utils.ConditionalTask('c'), + utils.SaveOrderTask('t1') + ) + engine = self._make_engine(flow) + engine.storage.inject({'y': 2}) + engine.run() + expected = ['c', + u'c reverted(Failure: RuntimeError: Woot!)', + 'c', + 't1'] + self.assertEqual(self.values, expected) + + def test_default_times_retry(self): + flow = lf.Flow('flow-1', retry.Times(3, 'r1')).add( + utils.SaveOrderTask('t1'), + utils.FailingTask('t2')) + engine = self._make_engine(flow) + + self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run) + expected = ['t1', + u't2 reverted(Failure: RuntimeError: Woot!)', + 't1 reverted(5)', + 't1', + u't2 reverted(Failure: RuntimeError: Woot!)', + 't1 reverted(5)', + 't1', + u't2 reverted(Failure: RuntimeError: Woot!)', + 't1 reverted(5)'] + self.assertEqual(self.values, expected) + + def test_for_each_with_list(self): + collection = [3, 2, 3, 5] + retry1 = retry.ForEach(collection, 'r1', provides='x') + flow = lf.Flow('flow-1', retry1).add(utils.FailingTaskWithOneArg('t1')) + engine = self._make_engine(flow) + + self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run) + expected = [u't1 reverted(Failure: RuntimeError: Woot with 3)', + u't1 reverted(Failure: RuntimeError: Woot with 2)', + u't1 reverted(Failure: RuntimeError: Woot with 3)', + u't1 reverted(Failure: RuntimeError: Woot with 5)'] + self.assertEqual(self.values, expected) + + def test_for_each_with_set(self): + collection = ([3, 2, 5]) + retry1 = retry.ForEach(collection, 'r1', provides='x') + flow = lf.Flow('flow-1', retry1).add(utils.FailingTaskWithOneArg('t1')) + engine = self._make_engine(flow) + + self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run) + expected = [u't1 reverted(Failure: RuntimeError: Woot with 3)', + u't1 reverted(Failure: RuntimeError: Woot with 2)', + u't1 reverted(Failure: RuntimeError: Woot with 5)'] + self.assertIsContainsSameElements(self.values, expected) + + def test_for_each_empty_collection(self): + values = [] + retry1 = retry.ForEach(values, 'r1', provides='x') + flow = lf.Flow('flow-1', retry1).add(utils.ConditionalTask('t1')) + engine = self._make_engine(flow) + engine.storage.inject({'y': 1}) + self.assertRaisesRegexp(exc.NotFound, '^No elements left', engine.run) + + def test_parameterized_for_each_with_list(self): + values = [3, 2, 5] + retry1 = retry.ParameterizedForEach('r1', provides='x') + flow = lf.Flow('flow-1', retry1).add(utils.FailingTaskWithOneArg('t1')) + engine = self._make_engine(flow) + engine.storage.inject({'values': values, 'y': 1}) + + self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run) + expected = [u't1 reverted(Failure: RuntimeError: Woot with 3)', + u't1 reverted(Failure: RuntimeError: Woot with 2)', + u't1 reverted(Failure: RuntimeError: Woot with 5)'] + self.assertEqual(self.values, expected) + + def test_parameterized_for_each_with_set(self): + values = ([3, 2, 5]) + retry1 = retry.ParameterizedForEach('r1', provides='x') + flow = lf.Flow('flow-1', retry1).add(utils.FailingTaskWithOneArg('t1')) + engine = self._make_engine(flow) + engine.storage.inject({'values': values, 'y': 1}) + + self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run) + expected = [u't1 reverted(Failure: RuntimeError: Woot with 3)', + u't1 reverted(Failure: RuntimeError: Woot with 2)', + u't1 reverted(Failure: RuntimeError: Woot with 5)'] + self.assertIsContainsSameElements(self.values, expected) + + def test_parameterized_for_each_empty_collection(self): + values = [] + retry1 = retry.ParameterizedForEach('r1', provides='x') + flow = lf.Flow('flow-1', retry1).add(utils.ConditionalTask('t1')) + engine = self._make_engine(flow) + engine.storage.inject({'values': values, 'y': 1}) + self.assertRaisesRegexp(exc.NotFound, '^No elements left', engine.run) + + def test_retry_after_failure_before_processig_failure(self): + flow = uf.Flow('flow-1', retry.Times(3, provides='x')).add( + utils.SaveOrderTask('task1')) + engine = self._make_engine(flow) + engine.compile() + # imagine we run engine + engine.storage.set_flow_state(st.RUNNING) + engine.storage.set_atom_intention('flow-1_retry', st.EXECUTE) + engine.storage.set_atom_intention('task1', st.EXECUTE) + # we execute retry + engine.storage.save('flow-1_retry', 1) + # task fails (if we comment it out, it works) + engine.storage.save('task1', + misc.Failure.from_exception(RuntimeError('foo')), + state=st.FAILURE) + # then process die and we resume engine + engine.run() + + def test_retry_fails(self): + + class FailingRetry(retry.Retry): + + def execute(self, **kwargs): + raise ValueError('OMG I FAILED') + + def revert(self, history, **kwargs): + self.history = history + + def on_failure(self, **kwargs): + return retry.REVERT + + r = FailingRetry() + flow = lf.Flow('testflow', r) + self.assertRaisesRegexp(ValueError, '^OMG', + self._make_engine(flow).run) + self.assertEqual(len(r.history), 1) + self.assertEqual(r.history[0][1], {}) + self.assertEqual(isinstance(r.history[0][0], misc.Failure), True) + + +class RetryParallelExecutionTest(utils.EngineTestBase): + + def test_when_subflow_fails_revert_running_tasks(self): + waiting_task = utils.WaitForOneFromTask('task1', 'task2', + [st.SUCCESS, st.FAILURE]) + flow = uf.Flow('flow-1', retry.Times(3, 'r', provides='x')).add( + waiting_task, + utils.ConditionalTask('task2') + ) + engine = self._make_engine(flow) + engine.task_notifier.register('*', waiting_task.callback) + engine.storage.inject({'y': 2}) + engine.run() + self.assertEqual(engine.storage.fetch_all(), {'y': 2, 'x': 2}) + expected = ['task2', + 'task1', + u'task2 reverted(Failure: RuntimeError: Woot!)', + 'task1 reverted(5)', + 'task2', + 'task1'] + self.assertIsContainsSameElements(self.values, expected) + + def test_when_subflow_fails_revert_success_tasks(self): + waiting_task = utils.WaitForOneFromTask('task2', 'task1', + [st.SUCCESS, st.FAILURE]) + flow = uf.Flow('flow-1', retry.Times(3, 'r', provides='x')).add( + utils.SaveOrderTask('task1'), + lf.Flow('flow-2').add( + waiting_task, + utils.ConditionalTask('task3')) + ) + engine = self._make_engine(flow) + engine.task_notifier.register('*', waiting_task.callback) + engine.storage.inject({'y': 2}) + engine.run() + self.assertEqual(engine.storage.fetch_all(), {'y': 2, 'x': 2}) + expected = ['task1', + 'task2', + 'task3', + u'task3 reverted(Failure: RuntimeError: Woot!)', + 'task1 reverted(5)', + 'task2 reverted(5)', + 'task1', + 'task2', + 'task3'] + self.assertIsContainsSameElements(self.values, expected) + class SingleThreadedEngineTest(RetryTest, test.TestCase): @@ -61,6 +646,7 @@ class SingleThreadedEngineTest(RetryTest, class MultiThreadedEngineTest(RetryTest, + RetryParallelExecutionTest, test.TestCase): def _make_engine(self, flow, flow_detail=None, executor=None): engine_conf = dict(engine='parallel', @@ -68,17 +654,3 @@ class MultiThreadedEngineTest(RetryTest, return taskflow.engines.load(flow, flow_detail=flow_detail, engine_conf=engine_conf, backend=self.backend) - - -@testtools.skipIf(not eu.EVENTLET_AVAILABLE, 'eventlet is not available') -class ParallelEngineWithEventletTest(RetryTest, - test.TestCase): - - def _make_engine(self, flow, flow_detail=None, executor=None): - if executor is None: - executor = eu.GreenExecutor() - engine_conf = dict(engine='parallel', - executor=executor) - return taskflow.engines.load(flow, flow_detail=flow_detail, - engine_conf=engine_conf, - backend=self.backend) diff --git a/taskflow/tests/unit/test_storage.py b/taskflow/tests/unit/test_storage.py index 3d6bc5bef..d26fcb455 100644 --- a/taskflow/tests/unit/test_storage.py +++ b/taskflow/tests/unit/test_storage.py @@ -81,8 +81,8 @@ class StorageTest(test.TestCase): s.ensure_task('my task2') s.save('my task', 'foo') expected = { - 'my task': states.SUCCESS, - 'my task2': states.PENDING, + 'my task': (states.SUCCESS, states.EXECUTE), + 'my task2': (states.PENDING, states.EXECUTE), } self.assertEqual(s.get_tasks_states(['my task', 'my task2']), expected) @@ -542,7 +542,7 @@ class StorageTest(test.TestCase): s.save('my retry', 'a') s.save('my retry', fail, states.FAILURE) history = s.get_retry_history('my retry') - self.assertEqual(history, [('a', {})]) + self.assertEqual(history, [('a', {}), (fail, {})]) self.assertIs(s.has_failures(), True) self.assertEqual(s.get_failures(), {'my retry': fail}) diff --git a/taskflow/tests/unit/worker_based/test_worker.py b/taskflow/tests/unit/worker_based/test_worker.py index 658c98f4d..4e9d321b3 100644 --- a/taskflow/tests/unit/worker_based/test_worker.py +++ b/taskflow/tests/unit/worker_based/test_worker.py @@ -33,7 +33,7 @@ class TestWorker(test.MockTestCase): self.exchange = 'test-exchange' self.topic = 'test-topic' self.threads_count = 5 - self.endpoint_count = 18 + self.endpoint_count = 21 # patch classes self.executor_mock, self.executor_inst_mock = self._patch_class( diff --git a/taskflow/tests/utils.py b/taskflow/tests/utils.py index d287a4b11..6c4d6ae3a 100644 --- a/taskflow/tests/utils.py +++ b/taskflow/tests/utils.py @@ -16,7 +16,9 @@ import contextlib +import collections import six +import threading from taskflow import exceptions from taskflow.persistence.backends import impl_memory @@ -97,6 +99,23 @@ class ProvidesRequiresTask(task.Task): return dict((k, k) for k in self.provides) +def task_callback(state, values, details): + name = details.get('task_name', None) + if not name: + name = details.get('retry_name', '') + values.append('%s %s' % (name, state)) + + +def flow_callback(state, values, details): + values.append('flow %s' % state) + + +def register_notifiers(engine, values): + engine.notifier.register('*', flow_callback, kwargs={'values': values}) + engine.task_notifier.register('*', task_callback, + kwargs={'values': values}) + + class SaveOrderTask(task.Task): def __init__(self, name=None, *args, **kwargs): @@ -137,6 +156,11 @@ class ProgressingTask(task.Task): return 5 +class FailingTaskWithOneArg(SaveOrderTask): + def execute(self, x, **kwargs): + raise RuntimeError('Woot with %s' % x) + + class NastyTask(task.Task): def execute(self, **kwargs): @@ -223,7 +247,7 @@ class TaskMultiArgMultiReturn(task.Task): pass -class TaskMultiDictk(task.Task): +class TaskMultiDict(task.Task): def execute(self): output = {} @@ -279,3 +303,36 @@ class OneReturnRetry(retry.AlwaysRevert): def revert(self, **kwargs): pass + + +class ConditionalTask(SaveOrderTask): + + def execute(self, x, y): + super(ConditionalTask, self).execute() + if x != y: + raise RuntimeError('Woot!') + + +class WaitForOneFromTask(SaveOrderTask): + + def __init__(self, name, wait_for, wait_states, **kwargs): + super(WaitForOneFromTask, self).__init__(name, **kwargs) + if not isinstance(wait_for, collections.Iterable): + self.wait_for = [wait_for] + else: + self.wait_for = wait_for + if not isinstance(wait_states, collections.Iterable): + self.wait_states = [wait_states] + else: + self.wait_states = wait_states + self.event = threading.Event() + + def execute(self): + self.event.wait() + return super(WaitForOneFromTask, self).execute() + + def callback(self, state, details): + name = details.get('task_name', None) + if name not in self.wait_for or state not in self.wait_states: + return + self.event.set() diff --git a/taskflow/utils/flow_utils.py b/taskflow/utils/flow_utils.py index 47abb53f1..7fdb103df 100644 --- a/taskflow/utils/flow_utils.py +++ b/taskflow/utils/flow_utils.py @@ -24,6 +24,7 @@ from taskflow import flow from taskflow.patterns import graph_flow as gf from taskflow.patterns import linear_flow as lf from taskflow.patterns import unordered_flow as uf +from taskflow import retry from taskflow import task from taskflow.utils import graph_utils as gu from taskflow.utils import lock_utils as lu @@ -85,6 +86,9 @@ class Flattener(object): return self._flatten_graph elif isinstance(item, task.BaseTask): return self._flatten_task + elif isinstance(item, retry.Retry): + raise TypeError("Retry controller %s (%s) is used not as a flow " + "parameter" % (item, type(item))) else: return None