diff --git a/taskflow/engines/action_engine/graph_action.py b/taskflow/engines/action_engine/graph_action.py index d0ed8bbb..c7bad3b7 100644 --- a/taskflow/engines/action_engine/graph_action.py +++ b/taskflow/engines/action_engine/graph_action.py @@ -16,9 +16,9 @@ from taskflow.engines.action_engine import executor as ex from taskflow import exceptions as excp -from taskflow import retry as r +from taskflow import retry as retry_atom from taskflow import states as st -from taskflow import task +from taskflow import task as task_atom from taskflow.utils import misc @@ -49,9 +49,9 @@ class FutureGraphAction(object): def _schedule_node(self, node): """Schedule a single node for execution.""" - if isinstance(node, task.BaseTask): + if isinstance(node, task_atom.BaseTask): return self._schedule_task(node) - elif isinstance(node, r.Retry): + elif isinstance(node, retry_atom.Retry): return self._schedule_retry(node) else: raise TypeError("Unknown how to schedule node %s" % node) @@ -108,7 +108,7 @@ class FutureGraphAction(object): for future in done: try: node, event, result = future.result() - if isinstance(node, task.BaseTask): + if isinstance(node, task_atom.BaseTask): self._complete_task(node, event, result) if isinstance(result, misc.Failure): if event == ex.EXECUTED: @@ -185,15 +185,15 @@ class FutureGraphAction(object): if retry: # Ask retry controller what to do in case of failure action = self._retry_action.on_failure(retry, atom, failure) - if action == r.RETRY: + if action == retry_atom.RETRY: # Prepare subflow for revert self._storage.set_atom_intention(retry.name, st.RETRY) for node in self._analyzer.iterate_subgraph(retry): self._storage.set_atom_intention(node.name, st.REVERT) - elif action == r.REVERT: + elif action == retry_atom.REVERT: # Ask parent checkpoint self._process_atom_failure(retry, failure) - elif action == r.REVERT_ALL: + elif action == retry_atom.REVERT_ALL: # Prepare all flow for revert self._revert_all() else: @@ -217,9 +217,9 @@ class FutureGraphAction(object): def _reset_nodes(self, nodes_iter, intention=st.EXECUTE): for node in nodes_iter: - if isinstance(node, task.BaseTask): + if isinstance(node, task_atom.BaseTask): self._task_action.change_state(node, st.PENDING, progress=0.0) - elif isinstance(node, r.Retry): + elif isinstance(node, retry_atom.Retry): self._retry_action.change_state(node, st.PENDING) else: raise TypeError("Unknown how to reset node %s" % node) diff --git a/taskflow/retry.py b/taskflow/retry.py index b02279b3..b3d435b3 100644 --- a/taskflow/retry.py +++ b/taskflow/retry.py @@ -26,18 +26,45 @@ from taskflow.utils import misc LOG = logging.getLogger(__name__) -# Retry actions +# Decision results. REVERT = "REVERT" REVERT_ALL = "REVERT_ALL" RETRY = "RETRY" @six.add_metaclass(abc.ABCMeta) -class Retry(atom.Atom): - """A base class for retry that controls subflow execution. - Retry can be executed multiple times and reverted. On subflow - failure it makes a decision about what should be done with the flow - (retry, revert to the previous retry, revert the whole flow, etc.). +class Decider(object): + """A base class or mixin for an object that can decide how to resolve + execution failures. + + A decider may be executed multiple times on subflow or other atom + failure and it is expected to make a decision about what should be done + to resolve the failure (retry, revert to the previous retry, revert + the whole flow, etc.). + """ + + @abc.abstractmethod + def on_failure(self, history, *args, **kwargs): + """On subflow failure makes a decision about the future flow + execution using information about prior previous failures (if this + historical failure information is not available or was not persisted + this history will be empty). + + Returns retry action constant: + * 'RETRY' when subflow must be reverted and restarted again (maybe + with new parameters). + * 'REVERT' when this subflow must be completely reverted and parent + subflow should make a decision about the flow execution. + * 'REVERT_ALL' in a case when the whole flow must be reverted and + marked as FAILURE. + """ + + +@six.add_metaclass(abc.ABCMeta) +class Retry(atom.Atom, Decider): + """A base class for a retry object that decides how to resolve subflow + execution failures and may also provide execute and revert methods to alter + the inputs of subflow atoms. """ default_provides = None @@ -78,19 +105,6 @@ class Retry(atom.Atom): all subflow's tasks will be reverted before the retry. """ - @abc.abstractmethod - def on_failure(self, history, *args, **kwargs): - """On subflow failure makes a decision about the future flow - execution using information about all previous failures. - Returns retry action constant: - 'RETRY' when subflow must be reverted and restarted again (maybe - with new parameters). - 'REVERT' when this subflow must be completely reverted and parent - subflow should make a decision about the flow execution. - 'REVERT_ALL' in a case when the whole flow must be reverted and - marked as FAILURE. - """ - class AlwaysRevert(Retry): """Retry that always reverts subflow."""