diff --git a/taskflow/engines/action_engine/completer.py b/taskflow/engines/action_engine/completer.py index 1dcb326b..ee988c4a 100644 --- a/taskflow/engines/action_engine/completer.py +++ b/taskflow/engines/action_engine/completer.py @@ -18,6 +18,7 @@ import abc import weakref from oslo_utils import reflection +from oslo_utils import strutils import six from taskflow.engines.action_engine import compiler as co @@ -178,6 +179,20 @@ class Completer(object): elif strategy == retry_atom.REVERT: # Ask parent retry and figure out what to do... parent_resolver = self._determine_resolution(retry, failure) + + # In the future, this will be the only behavior. REVERT + # should defer to the parent retry if it exists, or use the + # default REVERT_ALL if it doesn't. This lets you safely nest + # flows with retries inside flows without retries and it still + # behave as a user would expect, i.e. if the retry gets + # exhausted it reverts the outer flow unless the outer flow + # has a separate retry behavior. + defer_reverts = strutils.bool_from_string( + self._runtime.options.get('defer_reverts', False) + ) + if defer_reverts: + return parent_resolver + # Ok if the parent resolver says something not REVERT, and # it isn't just using the undefined resolver, assume the # parent knows best. diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py index 74e150c1..845b702a 100644 --- a/taskflow/engines/action_engine/engine.py +++ b/taskflow/engines/action_engine/engine.py @@ -319,11 +319,13 @@ class ActionEngine(base.Engine): if self._compiled: return self._compilation = self._compiler.compile() + self._runtime = runtime.Runtime(self._compilation, self.storage, self.atom_notifier, self._task_executor, - self._retry_executor) + self._retry_executor, + options=self._options) self._runtime.compile() self._compiled = True diff --git a/taskflow/engines/action_engine/runtime.py b/taskflow/engines/action_engine/runtime.py index dc9aa276..719f7563 100644 --- a/taskflow/engines/action_engine/runtime.py +++ b/taskflow/engines/action_engine/runtime.py @@ -42,13 +42,14 @@ class Runtime(object): """ def __init__(self, compilation, storage, atom_notifier, - task_executor, retry_executor): + task_executor, retry_executor, options=None): self._atom_notifier = atom_notifier self._task_executor = task_executor self._retry_executor = retry_executor self._storage = storage self._compilation = compilation self._atom_cache = {} + self._options = misc.ensure_dict(options) @staticmethod def _walk_edge_deciders(graph, atom): @@ -130,6 +131,10 @@ class Runtime(object): def storage(self): return self._storage + @property + def options(self): + return self._options + @misc.cachedproperty def analyzer(self): return an.Analyzer(self) diff --git a/taskflow/engines/base.py b/taskflow/engines/base.py index a500dd47..5330fc18 100644 --- a/taskflow/engines/base.py +++ b/taskflow/engines/base.py @@ -21,6 +21,7 @@ from debtcollector import moves import six from taskflow.types import notifier +from taskflow.utils import misc @six.add_metaclass(abc.ABCMeta) @@ -41,10 +42,7 @@ class Engine(object): self._flow = flow self._flow_detail = flow_detail self._backend = backend - if not options: - self._options = {} - else: - self._options = dict(options) + self._options = misc.ensure_dict(options) self._notifier = notifier.Notifier() self._atom_notifier = notifier.Notifier() diff --git a/taskflow/retry.py b/taskflow/retry.py index 93991326..aa9208e3 100644 --- a/taskflow/retry.py +++ b/taskflow/retry.py @@ -34,13 +34,25 @@ class Decision(misc.StrEnum): This strategy first consults the parent atom before reverting the associated subflow to determine if the parent retry object provides a - different reconciliation strategy (if no parent retry object exists - then reverting will proceed, if one does exist the parent retry may - override this reconciliation strategy with its own). + different reconciliation strategy. This allows for safe nesting of + flows with different retry strategies. + + If the parent flow has no retry strategy, the default behavior is + to just revert the atoms in the associated subflow. This is + generally not the desired behavior, but is left as the default in + order to keep backwards-compatibility. The ``defer_reverts`` + engine option will let you change this behavior. If that is set + to True, a REVERT will always defer to the parent, meaning that + if the parent has no retry strategy, it will be reverted as well. """ - #: Completely reverts the whole flow. REVERT_ALL = "REVERT_ALL" + """Reverts the entire flow, regardless of parent strategy. + + This strategy will revert every atom that has executed thus + far, regardless of whether the parent flow has a separate + retry strategy associated with it. + """ #: Retries the surrounding/associated subflow again. RETRY = "RETRY" diff --git a/taskflow/tests/unit/test_retries.py b/taskflow/tests/unit/test_retries.py index 6dc01851..5f6a22e1 100644 --- a/taskflow/tests/unit/test_retries.py +++ b/taskflow/tests/unit/test_retries.py @@ -202,6 +202,69 @@ class RetryTest(utils.EngineTestBase): 'flow-1.f SUCCESS'] self.assertEqual(expected, capturer.values) + def test_new_revert_vs_old(self): + flow = lf.Flow('flow-1').add( + utils.TaskNoRequiresNoReturns("task1"), + lf.Flow('flow-2', retry.Times(1, 'r1', provides='x')).add( + utils.TaskNoRequiresNoReturns("task2"), + utils.ConditionalTask("task3") + ), + utils.TaskNoRequiresNoReturns("task4") + ) + engine = self._make_engine(flow) + engine.storage.inject({'y': 2}) + with utils.CaptureListener(engine) as capturer: + try: + engine.run() + except Exception: + pass + + expected = ['flow-1.f RUNNING', + 'task1.t RUNNING', + 'task1.t SUCCESS(None)', + 'r1.r RUNNING', + 'r1.r SUCCESS(1)', + 'task2.t RUNNING', + 'task2.t SUCCESS(None)', + 'task3.t RUNNING', + 'task3.t FAILURE(Failure: RuntimeError: Woot!)', + 'task3.t REVERTING', + 'task3.t REVERTED(None)', + 'task2.t REVERTING', + 'task2.t REVERTED(None)', + 'r1.r REVERTING', + 'r1.r REVERTED(None)', + 'flow-1.f REVERTED'] + self.assertEqual(expected, capturer.values) + + engine = self._make_engine(flow, defer_reverts=True) + engine.storage.inject({'y': 2}) + with utils.CaptureListener(engine) as capturer: + try: + engine.run() + except Exception: + pass + + expected = ['flow-1.f RUNNING', + 'task1.t RUNNING', + 'task1.t SUCCESS(None)', + 'r1.r RUNNING', + 'r1.r SUCCESS(1)', + 'task2.t RUNNING', + 'task2.t SUCCESS(None)', + 'task3.t RUNNING', + 'task3.t FAILURE(Failure: RuntimeError: Woot!)', + 'task3.t REVERTING', + 'task3.t REVERTED(None)', + 'task2.t REVERTING', + 'task2.t REVERTED(None)', + 'r1.r REVERTING', + 'r1.r REVERTED(None)', + 'task1.t REVERTING', + 'task1.t REVERTED(None)', + 'flow-1.f REVERTED'] + self.assertEqual(expected, capturer.values) + def test_states_retry_failure_parent_flow_fails(self): flow = lf.Flow('flow-1', retry.Times(3, 'r1', provides='x1')).add( utils.TaskNoRequiresNoReturns("task1"), @@ -1210,11 +1273,12 @@ class RetryParallelExecutionTest(utils.EngineTestBase): class SerialEngineTest(RetryTest, test.TestCase): - def _make_engine(self, flow, flow_detail=None): + def _make_engine(self, flow, defer_reverts=None, flow_detail=None): return taskflow.engines.load(flow, flow_detail=flow_detail, engine='serial', - backend=self.backend) + backend=self.backend, + defer_reverts=defer_reverts) class ParallelEngineWithThreadsTest(RetryTest, @@ -1222,36 +1286,46 @@ class ParallelEngineWithThreadsTest(RetryTest, test.TestCase): _EXECUTOR_WORKERS = 2 - def _make_engine(self, flow, flow_detail=None, executor=None): + def _make_engine(self, flow, defer_reverts=None, flow_detail=None, + executor=None): if executor is None: executor = 'threads' - return taskflow.engines.load(flow, flow_detail=flow_detail, + return taskflow.engines.load(flow, + flow_detail=flow_detail, engine='parallel', backend=self.backend, executor=executor, - max_workers=self._EXECUTOR_WORKERS) + max_workers=self._EXECUTOR_WORKERS, + defer_reverts=defer_reverts) @testtools.skipIf(not eu.EVENTLET_AVAILABLE, 'eventlet is not available') class ParallelEngineWithEventletTest(RetryTest, test.TestCase): - def _make_engine(self, flow, flow_detail=None, executor=None): + def _make_engine(self, flow, defer_reverts=None, flow_detail=None, + executor=None): if executor is None: executor = futurist.GreenThreadPoolExecutor() self.addCleanup(executor.shutdown) - return taskflow.engines.load(flow, flow_detail=flow_detail, - backend=self.backend, engine='parallel', - executor=executor) + return taskflow.engines.load(flow, + flow_detail=flow_detail, + backend=self.backend, + engine='parallel', + executor=executor, + defer_reverts=defer_reverts) class ParallelEngineWithProcessTest(RetryTest, test.TestCase): _EXECUTOR_WORKERS = 2 - def _make_engine(self, flow, flow_detail=None, executor=None): + def _make_engine(self, flow, defer_reverts=None, flow_detail=None, + executor=None): if executor is None: executor = 'processes' - return taskflow.engines.load(flow, flow_detail=flow_detail, + return taskflow.engines.load(flow, + flow_detail=flow_detail, engine='parallel', backend=self.backend, executor=executor, - max_workers=self._EXECUTOR_WORKERS) + max_workers=self._EXECUTOR_WORKERS, + defer_reverts=defer_reverts) diff --git a/taskflow/tests/unit/test_utils.py b/taskflow/tests/unit/test_utils.py index 64bb2330..6ea9f4fb 100644 --- a/taskflow/tests/unit/test_utils.py +++ b/taskflow/tests/unit/test_utils.py @@ -340,3 +340,27 @@ class TestIterable(test.TestCase): def test_dict(self): self.assertTrue(misc.is_iterable(dict())) + + +class TestEnsureDict(testscenarios.TestWithScenarios): + scenarios = [ + ('none', {'original': None, 'expected': {}}), + ('empty_dict', {'original': {}, 'expected': {}}), + ('empty_list', {'original': [], 'expected': {}}), + ('dict', {'original': {'a': 1, 'b': 2}, 'expected': {'a': 1, 'b': 2}}), + ] + + def test_expected(self): + self.assertEqual(self.expected, misc.ensure_dict(self.original)) + self.assertFalse(self.expected is misc.ensure_dict(self.original)) + + +class TestEnsureDictRaises(testscenarios.TestWithScenarios): + scenarios = [ + ('list', {'original': [1, 2], 'exception': TypeError}), + ('tuple', {'original': (1, 2), 'exception': TypeError}), + ('set', {'original': set([1, 2]), 'exception': TypeError}), + ] + + def test_exceptions(self): + self.assertRaises(self.exception, misc.ensure_dict, self.original) diff --git a/taskflow/utils/misc.py b/taskflow/utils/misc.py index ca8faa5e..e837a426 100644 --- a/taskflow/utils/misc.py +++ b/taskflow/utils/misc.py @@ -595,3 +595,11 @@ def is_iterable(obj): """ return (not isinstance(obj, six.string_types) and isinstance(obj, collections.Iterable)) + + +def ensure_dict(obj): + """Copy an existing dictionary or default to empty dict....""" + if not obj: + return {} + # default to a shallow copy to avoid most ownership issues + return dict(obj)