Merge "Factor out the on_failure to a mixin type"
This commit is contained in:
@@ -16,9 +16,9 @@
|
|||||||
|
|
||||||
from taskflow.engines.action_engine import executor as ex
|
from taskflow.engines.action_engine import executor as ex
|
||||||
from taskflow import exceptions as excp
|
from taskflow import exceptions as excp
|
||||||
from taskflow import retry as r
|
from taskflow import retry as retry_atom
|
||||||
from taskflow import states as st
|
from taskflow import states as st
|
||||||
from taskflow import task
|
from taskflow import task as task_atom
|
||||||
from taskflow.utils import misc
|
from taskflow.utils import misc
|
||||||
|
|
||||||
|
|
||||||
@@ -49,9 +49,9 @@ class FutureGraphAction(object):
|
|||||||
|
|
||||||
def _schedule_node(self, node):
|
def _schedule_node(self, node):
|
||||||
"""Schedule a single node for execution."""
|
"""Schedule a single node for execution."""
|
||||||
if isinstance(node, task.BaseTask):
|
if isinstance(node, task_atom.BaseTask):
|
||||||
return self._schedule_task(node)
|
return self._schedule_task(node)
|
||||||
elif isinstance(node, r.Retry):
|
elif isinstance(node, retry_atom.Retry):
|
||||||
return self._schedule_retry(node)
|
return self._schedule_retry(node)
|
||||||
else:
|
else:
|
||||||
raise TypeError("Unknown how to schedule node %s" % node)
|
raise TypeError("Unknown how to schedule node %s" % node)
|
||||||
@@ -108,7 +108,7 @@ class FutureGraphAction(object):
|
|||||||
for future in done:
|
for future in done:
|
||||||
try:
|
try:
|
||||||
node, event, result = future.result()
|
node, event, result = future.result()
|
||||||
if isinstance(node, task.BaseTask):
|
if isinstance(node, task_atom.BaseTask):
|
||||||
self._complete_task(node, event, result)
|
self._complete_task(node, event, result)
|
||||||
if isinstance(result, misc.Failure):
|
if isinstance(result, misc.Failure):
|
||||||
if event == ex.EXECUTED:
|
if event == ex.EXECUTED:
|
||||||
@@ -185,15 +185,15 @@ class FutureGraphAction(object):
|
|||||||
if retry:
|
if retry:
|
||||||
# Ask retry controller what to do in case of failure
|
# Ask retry controller what to do in case of failure
|
||||||
action = self._retry_action.on_failure(retry, atom, failure)
|
action = self._retry_action.on_failure(retry, atom, failure)
|
||||||
if action == r.RETRY:
|
if action == retry_atom.RETRY:
|
||||||
# Prepare subflow for revert
|
# Prepare subflow for revert
|
||||||
self._storage.set_atom_intention(retry.name, st.RETRY)
|
self._storage.set_atom_intention(retry.name, st.RETRY)
|
||||||
for node in self._analyzer.iterate_subgraph(retry):
|
for node in self._analyzer.iterate_subgraph(retry):
|
||||||
self._storage.set_atom_intention(node.name, st.REVERT)
|
self._storage.set_atom_intention(node.name, st.REVERT)
|
||||||
elif action == r.REVERT:
|
elif action == retry_atom.REVERT:
|
||||||
# Ask parent checkpoint
|
# Ask parent checkpoint
|
||||||
self._process_atom_failure(retry, failure)
|
self._process_atom_failure(retry, failure)
|
||||||
elif action == r.REVERT_ALL:
|
elif action == retry_atom.REVERT_ALL:
|
||||||
# Prepare all flow for revert
|
# Prepare all flow for revert
|
||||||
self._revert_all()
|
self._revert_all()
|
||||||
else:
|
else:
|
||||||
@@ -217,9 +217,9 @@ class FutureGraphAction(object):
|
|||||||
|
|
||||||
def _reset_nodes(self, nodes_iter, intention=st.EXECUTE):
|
def _reset_nodes(self, nodes_iter, intention=st.EXECUTE):
|
||||||
for node in nodes_iter:
|
for node in nodes_iter:
|
||||||
if isinstance(node, task.BaseTask):
|
if isinstance(node, task_atom.BaseTask):
|
||||||
self._task_action.change_state(node, st.PENDING, progress=0.0)
|
self._task_action.change_state(node, st.PENDING, progress=0.0)
|
||||||
elif isinstance(node, r.Retry):
|
elif isinstance(node, retry_atom.Retry):
|
||||||
self._retry_action.change_state(node, st.PENDING)
|
self._retry_action.change_state(node, st.PENDING)
|
||||||
else:
|
else:
|
||||||
raise TypeError("Unknown how to reset node %s" % node)
|
raise TypeError("Unknown how to reset node %s" % node)
|
||||||
|
|||||||
@@ -26,18 +26,45 @@ from taskflow.utils import misc
|
|||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
# Retry actions
|
# Decision results.
|
||||||
REVERT = "REVERT"
|
REVERT = "REVERT"
|
||||||
REVERT_ALL = "REVERT_ALL"
|
REVERT_ALL = "REVERT_ALL"
|
||||||
RETRY = "RETRY"
|
RETRY = "RETRY"
|
||||||
|
|
||||||
|
|
||||||
@six.add_metaclass(abc.ABCMeta)
|
@six.add_metaclass(abc.ABCMeta)
|
||||||
class Retry(atom.Atom):
|
class Decider(object):
|
||||||
"""A base class for retry that controls subflow execution.
|
"""A base class or mixin for an object that can decide how to resolve
|
||||||
Retry can be executed multiple times and reverted. On subflow
|
execution failures.
|
||||||
failure it makes a decision about what should be done with the flow
|
|
||||||
(retry, revert to the previous retry, revert the whole flow, etc.).
|
A decider may be executed multiple times on subflow or other atom
|
||||||
|
failure and it is expected to make a decision about what should be done
|
||||||
|
to resolve the failure (retry, revert to the previous retry, revert
|
||||||
|
the whole flow, etc.).
|
||||||
|
"""
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def on_failure(self, history, *args, **kwargs):
|
||||||
|
"""On subflow failure makes a decision about the future flow
|
||||||
|
execution using information about prior previous failures (if this
|
||||||
|
historical failure information is not available or was not persisted
|
||||||
|
this history will be empty).
|
||||||
|
|
||||||
|
Returns retry action constant:
|
||||||
|
* 'RETRY' when subflow must be reverted and restarted again (maybe
|
||||||
|
with new parameters).
|
||||||
|
* 'REVERT' when this subflow must be completely reverted and parent
|
||||||
|
subflow should make a decision about the flow execution.
|
||||||
|
* 'REVERT_ALL' in a case when the whole flow must be reverted and
|
||||||
|
marked as FAILURE.
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
@six.add_metaclass(abc.ABCMeta)
|
||||||
|
class Retry(atom.Atom, Decider):
|
||||||
|
"""A base class for a retry object that decides how to resolve subflow
|
||||||
|
execution failures and may also provide execute and revert methods to alter
|
||||||
|
the inputs of subflow atoms.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
default_provides = None
|
default_provides = None
|
||||||
@@ -78,19 +105,6 @@ class Retry(atom.Atom):
|
|||||||
all subflow's tasks will be reverted before the retry.
|
all subflow's tasks will be reverted before the retry.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
@abc.abstractmethod
|
|
||||||
def on_failure(self, history, *args, **kwargs):
|
|
||||||
"""On subflow failure makes a decision about the future flow
|
|
||||||
execution using information about all previous failures.
|
|
||||||
Returns retry action constant:
|
|
||||||
'RETRY' when subflow must be reverted and restarted again (maybe
|
|
||||||
with new parameters).
|
|
||||||
'REVERT' when this subflow must be completely reverted and parent
|
|
||||||
subflow should make a decision about the flow execution.
|
|
||||||
'REVERT_ALL' in a case when the whole flow must be reverted and
|
|
||||||
marked as FAILURE.
|
|
||||||
"""
|
|
||||||
|
|
||||||
|
|
||||||
class AlwaysRevert(Retry):
|
class AlwaysRevert(Retry):
|
||||||
"""Retry that always reverts subflow."""
|
"""Retry that always reverts subflow."""
|
||||||
|
|||||||
Reference in New Issue
Block a user