From cd922d4e466f27a470208ceba4beb8c1f4da2db3 Mon Sep 17 00:00:00 2001 From: Greg Hill Date: Tue, 10 Nov 2015 19:42:37 -0600 Subject: [PATCH] Add optional 'defer_reverts' behavior This makes it possible to REVERT a subflow and have it also revert the parent flow if the parent flow doesn't have its own retry strategy. We will probably want to make this new behavior the default or only behavior in a future release. Change-Id: Iea5ac366380ba7396a87d0185703549fb0c2f825 --- taskflow/engines/action_engine/completer.py | 15 ++++ taskflow/engines/action_engine/engine.py | 4 +- taskflow/engines/action_engine/runtime.py | 7 +- taskflow/engines/base.py | 6 +- taskflow/retry.py | 20 ++++- taskflow/tests/unit/test_retries.py | 98 ++++++++++++++++++--- taskflow/tests/unit/test_utils.py | 24 +++++ taskflow/utils/misc.py | 8 ++ 8 files changed, 160 insertions(+), 22 deletions(-) diff --git a/taskflow/engines/action_engine/completer.py b/taskflow/engines/action_engine/completer.py index 1dcb326b..ee988c4a 100644 --- a/taskflow/engines/action_engine/completer.py +++ b/taskflow/engines/action_engine/completer.py @@ -18,6 +18,7 @@ import abc import weakref from oslo_utils import reflection +from oslo_utils import strutils import six from taskflow.engines.action_engine import compiler as co @@ -178,6 +179,20 @@ class Completer(object): elif strategy == retry_atom.REVERT: # Ask parent retry and figure out what to do... parent_resolver = self._determine_resolution(retry, failure) + + # In the future, this will be the only behavior. REVERT + # should defer to the parent retry if it exists, or use the + # default REVERT_ALL if it doesn't. This lets you safely nest + # flows with retries inside flows without retries and it still + # behave as a user would expect, i.e. if the retry gets + # exhausted it reverts the outer flow unless the outer flow + # has a separate retry behavior. + defer_reverts = strutils.bool_from_string( + self._runtime.options.get('defer_reverts', False) + ) + if defer_reverts: + return parent_resolver + # Ok if the parent resolver says something not REVERT, and # it isn't just using the undefined resolver, assume the # parent knows best. diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py index 74e150c1..845b702a 100644 --- a/taskflow/engines/action_engine/engine.py +++ b/taskflow/engines/action_engine/engine.py @@ -319,11 +319,13 @@ class ActionEngine(base.Engine): if self._compiled: return self._compilation = self._compiler.compile() + self._runtime = runtime.Runtime(self._compilation, self.storage, self.atom_notifier, self._task_executor, - self._retry_executor) + self._retry_executor, + options=self._options) self._runtime.compile() self._compiled = True diff --git a/taskflow/engines/action_engine/runtime.py b/taskflow/engines/action_engine/runtime.py index dc9aa276..719f7563 100644 --- a/taskflow/engines/action_engine/runtime.py +++ b/taskflow/engines/action_engine/runtime.py @@ -42,13 +42,14 @@ class Runtime(object): """ def __init__(self, compilation, storage, atom_notifier, - task_executor, retry_executor): + task_executor, retry_executor, options=None): self._atom_notifier = atom_notifier self._task_executor = task_executor self._retry_executor = retry_executor self._storage = storage self._compilation = compilation self._atom_cache = {} + self._options = misc.ensure_dict(options) @staticmethod def _walk_edge_deciders(graph, atom): @@ -130,6 +131,10 @@ class Runtime(object): def storage(self): return self._storage + @property + def options(self): + return self._options + @misc.cachedproperty def analyzer(self): return an.Analyzer(self) diff --git a/taskflow/engines/base.py b/taskflow/engines/base.py index a500dd47..5330fc18 100644 --- a/taskflow/engines/base.py +++ b/taskflow/engines/base.py @@ -21,6 +21,7 @@ from debtcollector import moves import six from taskflow.types import notifier +from taskflow.utils import misc @six.add_metaclass(abc.ABCMeta) @@ -41,10 +42,7 @@ class Engine(object): self._flow = flow self._flow_detail = flow_detail self._backend = backend - if not options: - self._options = {} - else: - self._options = dict(options) + self._options = misc.ensure_dict(options) self._notifier = notifier.Notifier() self._atom_notifier = notifier.Notifier() diff --git a/taskflow/retry.py b/taskflow/retry.py index 93991326..aa9208e3 100644 --- a/taskflow/retry.py +++ b/taskflow/retry.py @@ -34,13 +34,25 @@ class Decision(misc.StrEnum): This strategy first consults the parent atom before reverting the associated subflow to determine if the parent retry object provides a - different reconciliation strategy (if no parent retry object exists - then reverting will proceed, if one does exist the parent retry may - override this reconciliation strategy with its own). + different reconciliation strategy. This allows for safe nesting of + flows with different retry strategies. + + If the parent flow has no retry strategy, the default behavior is + to just revert the atoms in the associated subflow. This is + generally not the desired behavior, but is left as the default in + order to keep backwards-compatibility. The ``defer_reverts`` + engine option will let you change this behavior. If that is set + to True, a REVERT will always defer to the parent, meaning that + if the parent has no retry strategy, it will be reverted as well. """ - #: Completely reverts the whole flow. REVERT_ALL = "REVERT_ALL" + """Reverts the entire flow, regardless of parent strategy. + + This strategy will revert every atom that has executed thus + far, regardless of whether the parent flow has a separate + retry strategy associated with it. + """ #: Retries the surrounding/associated subflow again. RETRY = "RETRY" diff --git a/taskflow/tests/unit/test_retries.py b/taskflow/tests/unit/test_retries.py index 6dc01851..5f6a22e1 100644 --- a/taskflow/tests/unit/test_retries.py +++ b/taskflow/tests/unit/test_retries.py @@ -202,6 +202,69 @@ class RetryTest(utils.EngineTestBase): 'flow-1.f SUCCESS'] self.assertEqual(expected, capturer.values) + def test_new_revert_vs_old(self): + flow = lf.Flow('flow-1').add( + utils.TaskNoRequiresNoReturns("task1"), + lf.Flow('flow-2', retry.Times(1, 'r1', provides='x')).add( + utils.TaskNoRequiresNoReturns("task2"), + utils.ConditionalTask("task3") + ), + utils.TaskNoRequiresNoReturns("task4") + ) + engine = self._make_engine(flow) + engine.storage.inject({'y': 2}) + with utils.CaptureListener(engine) as capturer: + try: + engine.run() + except Exception: + pass + + expected = ['flow-1.f RUNNING', + 'task1.t RUNNING', + 'task1.t SUCCESS(None)', + 'r1.r RUNNING', + 'r1.r SUCCESS(1)', + 'task2.t RUNNING', + 'task2.t SUCCESS(None)', + 'task3.t RUNNING', + 'task3.t FAILURE(Failure: RuntimeError: Woot!)', + 'task3.t REVERTING', + 'task3.t REVERTED(None)', + 'task2.t REVERTING', + 'task2.t REVERTED(None)', + 'r1.r REVERTING', + 'r1.r REVERTED(None)', + 'flow-1.f REVERTED'] + self.assertEqual(expected, capturer.values) + + engine = self._make_engine(flow, defer_reverts=True) + engine.storage.inject({'y': 2}) + with utils.CaptureListener(engine) as capturer: + try: + engine.run() + except Exception: + pass + + expected = ['flow-1.f RUNNING', + 'task1.t RUNNING', + 'task1.t SUCCESS(None)', + 'r1.r RUNNING', + 'r1.r SUCCESS(1)', + 'task2.t RUNNING', + 'task2.t SUCCESS(None)', + 'task3.t RUNNING', + 'task3.t FAILURE(Failure: RuntimeError: Woot!)', + 'task3.t REVERTING', + 'task3.t REVERTED(None)', + 'task2.t REVERTING', + 'task2.t REVERTED(None)', + 'r1.r REVERTING', + 'r1.r REVERTED(None)', + 'task1.t REVERTING', + 'task1.t REVERTED(None)', + 'flow-1.f REVERTED'] + self.assertEqual(expected, capturer.values) + def test_states_retry_failure_parent_flow_fails(self): flow = lf.Flow('flow-1', retry.Times(3, 'r1', provides='x1')).add( utils.TaskNoRequiresNoReturns("task1"), @@ -1210,11 +1273,12 @@ class RetryParallelExecutionTest(utils.EngineTestBase): class SerialEngineTest(RetryTest, test.TestCase): - def _make_engine(self, flow, flow_detail=None): + def _make_engine(self, flow, defer_reverts=None, flow_detail=None): return taskflow.engines.load(flow, flow_detail=flow_detail, engine='serial', - backend=self.backend) + backend=self.backend, + defer_reverts=defer_reverts) class ParallelEngineWithThreadsTest(RetryTest, @@ -1222,36 +1286,46 @@ class ParallelEngineWithThreadsTest(RetryTest, test.TestCase): _EXECUTOR_WORKERS = 2 - def _make_engine(self, flow, flow_detail=None, executor=None): + def _make_engine(self, flow, defer_reverts=None, flow_detail=None, + executor=None): if executor is None: executor = 'threads' - return taskflow.engines.load(flow, flow_detail=flow_detail, + return taskflow.engines.load(flow, + flow_detail=flow_detail, engine='parallel', backend=self.backend, executor=executor, - max_workers=self._EXECUTOR_WORKERS) + max_workers=self._EXECUTOR_WORKERS, + defer_reverts=defer_reverts) @testtools.skipIf(not eu.EVENTLET_AVAILABLE, 'eventlet is not available') class ParallelEngineWithEventletTest(RetryTest, test.TestCase): - def _make_engine(self, flow, flow_detail=None, executor=None): + def _make_engine(self, flow, defer_reverts=None, flow_detail=None, + executor=None): if executor is None: executor = futurist.GreenThreadPoolExecutor() self.addCleanup(executor.shutdown) - return taskflow.engines.load(flow, flow_detail=flow_detail, - backend=self.backend, engine='parallel', - executor=executor) + return taskflow.engines.load(flow, + flow_detail=flow_detail, + backend=self.backend, + engine='parallel', + executor=executor, + defer_reverts=defer_reverts) class ParallelEngineWithProcessTest(RetryTest, test.TestCase): _EXECUTOR_WORKERS = 2 - def _make_engine(self, flow, flow_detail=None, executor=None): + def _make_engine(self, flow, defer_reverts=None, flow_detail=None, + executor=None): if executor is None: executor = 'processes' - return taskflow.engines.load(flow, flow_detail=flow_detail, + return taskflow.engines.load(flow, + flow_detail=flow_detail, engine='parallel', backend=self.backend, executor=executor, - max_workers=self._EXECUTOR_WORKERS) + max_workers=self._EXECUTOR_WORKERS, + defer_reverts=defer_reverts) diff --git a/taskflow/tests/unit/test_utils.py b/taskflow/tests/unit/test_utils.py index 64bb2330..6ea9f4fb 100644 --- a/taskflow/tests/unit/test_utils.py +++ b/taskflow/tests/unit/test_utils.py @@ -340,3 +340,27 @@ class TestIterable(test.TestCase): def test_dict(self): self.assertTrue(misc.is_iterable(dict())) + + +class TestEnsureDict(testscenarios.TestWithScenarios): + scenarios = [ + ('none', {'original': None, 'expected': {}}), + ('empty_dict', {'original': {}, 'expected': {}}), + ('empty_list', {'original': [], 'expected': {}}), + ('dict', {'original': {'a': 1, 'b': 2}, 'expected': {'a': 1, 'b': 2}}), + ] + + def test_expected(self): + self.assertEqual(self.expected, misc.ensure_dict(self.original)) + self.assertFalse(self.expected is misc.ensure_dict(self.original)) + + +class TestEnsureDictRaises(testscenarios.TestWithScenarios): + scenarios = [ + ('list', {'original': [1, 2], 'exception': TypeError}), + ('tuple', {'original': (1, 2), 'exception': TypeError}), + ('set', {'original': set([1, 2]), 'exception': TypeError}), + ] + + def test_exceptions(self): + self.assertRaises(self.exception, misc.ensure_dict, self.original) diff --git a/taskflow/utils/misc.py b/taskflow/utils/misc.py index ca8faa5e..e837a426 100644 --- a/taskflow/utils/misc.py +++ b/taskflow/utils/misc.py @@ -595,3 +595,11 @@ def is_iterable(obj): """ return (not isinstance(obj, six.string_types) and isinstance(obj, collections.Iterable)) + + +def ensure_dict(obj): + """Copy an existing dictionary or default to empty dict....""" + if not obj: + return {} + # default to a shallow copy to avoid most ownership issues + return dict(obj)