Add optional 'defer_reverts' behavior

This makes it possible to REVERT a subflow
and have it also revert the parent flow if
the parent flow doesn't have its own retry
strategy. We will probably want to make this
new behavior the default or only behavior
in a future release.

Change-Id: Iea5ac366380ba7396a87d0185703549fb0c2f825
This commit is contained in:
Greg Hill 2015-11-10 19:42:37 -06:00 committed by Joshua Harlow
parent 805ce7e909
commit cd922d4e46
8 changed files with 160 additions and 22 deletions

View File

@ -18,6 +18,7 @@ import abc
import weakref
from oslo_utils import reflection
from oslo_utils import strutils
import six
from taskflow.engines.action_engine import compiler as co
@ -178,6 +179,20 @@ class Completer(object):
elif strategy == retry_atom.REVERT:
# Ask parent retry and figure out what to do...
parent_resolver = self._determine_resolution(retry, failure)
# In the future, this will be the only behavior. REVERT
# should defer to the parent retry if it exists, or use the
# default REVERT_ALL if it doesn't. This lets you safely nest
# flows with retries inside flows without retries and it still
# behave as a user would expect, i.e. if the retry gets
# exhausted it reverts the outer flow unless the outer flow
# has a separate retry behavior.
defer_reverts = strutils.bool_from_string(
self._runtime.options.get('defer_reverts', False)
)
if defer_reverts:
return parent_resolver
# Ok if the parent resolver says something not REVERT, and
# it isn't just using the undefined resolver, assume the
# parent knows best.

View File

@ -319,11 +319,13 @@ class ActionEngine(base.Engine):
if self._compiled:
return
self._compilation = self._compiler.compile()
self._runtime = runtime.Runtime(self._compilation,
self.storage,
self.atom_notifier,
self._task_executor,
self._retry_executor)
self._retry_executor,
options=self._options)
self._runtime.compile()
self._compiled = True

View File

@ -42,13 +42,14 @@ class Runtime(object):
"""
def __init__(self, compilation, storage, atom_notifier,
task_executor, retry_executor):
task_executor, retry_executor, options=None):
self._atom_notifier = atom_notifier
self._task_executor = task_executor
self._retry_executor = retry_executor
self._storage = storage
self._compilation = compilation
self._atom_cache = {}
self._options = misc.ensure_dict(options)
@staticmethod
def _walk_edge_deciders(graph, atom):
@ -130,6 +131,10 @@ class Runtime(object):
def storage(self):
return self._storage
@property
def options(self):
return self._options
@misc.cachedproperty
def analyzer(self):
return an.Analyzer(self)

View File

@ -21,6 +21,7 @@ from debtcollector import moves
import six
from taskflow.types import notifier
from taskflow.utils import misc
@six.add_metaclass(abc.ABCMeta)
@ -41,10 +42,7 @@ class Engine(object):
self._flow = flow
self._flow_detail = flow_detail
self._backend = backend
if not options:
self._options = {}
else:
self._options = dict(options)
self._options = misc.ensure_dict(options)
self._notifier = notifier.Notifier()
self._atom_notifier = notifier.Notifier()

View File

@ -34,13 +34,25 @@ class Decision(misc.StrEnum):
This strategy first consults the parent atom before reverting the
associated subflow to determine if the parent retry object provides a
different reconciliation strategy (if no parent retry object exists
then reverting will proceed, if one does exist the parent retry may
override this reconciliation strategy with its own).
different reconciliation strategy. This allows for safe nesting of
flows with different retry strategies.
If the parent flow has no retry strategy, the default behavior is
to just revert the atoms in the associated subflow. This is
generally not the desired behavior, but is left as the default in
order to keep backwards-compatibility. The ``defer_reverts``
engine option will let you change this behavior. If that is set
to True, a REVERT will always defer to the parent, meaning that
if the parent has no retry strategy, it will be reverted as well.
"""
#: Completely reverts the whole flow.
REVERT_ALL = "REVERT_ALL"
"""Reverts the entire flow, regardless of parent strategy.
This strategy will revert every atom that has executed thus
far, regardless of whether the parent flow has a separate
retry strategy associated with it.
"""
#: Retries the surrounding/associated subflow again.
RETRY = "RETRY"

View File

@ -202,6 +202,69 @@ class RetryTest(utils.EngineTestBase):
'flow-1.f SUCCESS']
self.assertEqual(expected, capturer.values)
def test_new_revert_vs_old(self):
flow = lf.Flow('flow-1').add(
utils.TaskNoRequiresNoReturns("task1"),
lf.Flow('flow-2', retry.Times(1, 'r1', provides='x')).add(
utils.TaskNoRequiresNoReturns("task2"),
utils.ConditionalTask("task3")
),
utils.TaskNoRequiresNoReturns("task4")
)
engine = self._make_engine(flow)
engine.storage.inject({'y': 2})
with utils.CaptureListener(engine) as capturer:
try:
engine.run()
except Exception:
pass
expected = ['flow-1.f RUNNING',
'task1.t RUNNING',
'task1.t SUCCESS(None)',
'r1.r RUNNING',
'r1.r SUCCESS(1)',
'task2.t RUNNING',
'task2.t SUCCESS(None)',
'task3.t RUNNING',
'task3.t FAILURE(Failure: RuntimeError: Woot!)',
'task3.t REVERTING',
'task3.t REVERTED(None)',
'task2.t REVERTING',
'task2.t REVERTED(None)',
'r1.r REVERTING',
'r1.r REVERTED(None)',
'flow-1.f REVERTED']
self.assertEqual(expected, capturer.values)
engine = self._make_engine(flow, defer_reverts=True)
engine.storage.inject({'y': 2})
with utils.CaptureListener(engine) as capturer:
try:
engine.run()
except Exception:
pass
expected = ['flow-1.f RUNNING',
'task1.t RUNNING',
'task1.t SUCCESS(None)',
'r1.r RUNNING',
'r1.r SUCCESS(1)',
'task2.t RUNNING',
'task2.t SUCCESS(None)',
'task3.t RUNNING',
'task3.t FAILURE(Failure: RuntimeError: Woot!)',
'task3.t REVERTING',
'task3.t REVERTED(None)',
'task2.t REVERTING',
'task2.t REVERTED(None)',
'r1.r REVERTING',
'r1.r REVERTED(None)',
'task1.t REVERTING',
'task1.t REVERTED(None)',
'flow-1.f REVERTED']
self.assertEqual(expected, capturer.values)
def test_states_retry_failure_parent_flow_fails(self):
flow = lf.Flow('flow-1', retry.Times(3, 'r1', provides='x1')).add(
utils.TaskNoRequiresNoReturns("task1"),
@ -1210,11 +1273,12 @@ class RetryParallelExecutionTest(utils.EngineTestBase):
class SerialEngineTest(RetryTest, test.TestCase):
def _make_engine(self, flow, flow_detail=None):
def _make_engine(self, flow, defer_reverts=None, flow_detail=None):
return taskflow.engines.load(flow,
flow_detail=flow_detail,
engine='serial',
backend=self.backend)
backend=self.backend,
defer_reverts=defer_reverts)
class ParallelEngineWithThreadsTest(RetryTest,
@ -1222,36 +1286,46 @@ class ParallelEngineWithThreadsTest(RetryTest,
test.TestCase):
_EXECUTOR_WORKERS = 2
def _make_engine(self, flow, flow_detail=None, executor=None):
def _make_engine(self, flow, defer_reverts=None, flow_detail=None,
executor=None):
if executor is None:
executor = 'threads'
return taskflow.engines.load(flow, flow_detail=flow_detail,
return taskflow.engines.load(flow,
flow_detail=flow_detail,
engine='parallel',
backend=self.backend,
executor=executor,
max_workers=self._EXECUTOR_WORKERS)
max_workers=self._EXECUTOR_WORKERS,
defer_reverts=defer_reverts)
@testtools.skipIf(not eu.EVENTLET_AVAILABLE, 'eventlet is not available')
class ParallelEngineWithEventletTest(RetryTest, test.TestCase):
def _make_engine(self, flow, flow_detail=None, executor=None):
def _make_engine(self, flow, defer_reverts=None, flow_detail=None,
executor=None):
if executor is None:
executor = futurist.GreenThreadPoolExecutor()
self.addCleanup(executor.shutdown)
return taskflow.engines.load(flow, flow_detail=flow_detail,
backend=self.backend, engine='parallel',
executor=executor)
return taskflow.engines.load(flow,
flow_detail=flow_detail,
backend=self.backend,
engine='parallel',
executor=executor,
defer_reverts=defer_reverts)
class ParallelEngineWithProcessTest(RetryTest, test.TestCase):
_EXECUTOR_WORKERS = 2
def _make_engine(self, flow, flow_detail=None, executor=None):
def _make_engine(self, flow, defer_reverts=None, flow_detail=None,
executor=None):
if executor is None:
executor = 'processes'
return taskflow.engines.load(flow, flow_detail=flow_detail,
return taskflow.engines.load(flow,
flow_detail=flow_detail,
engine='parallel',
backend=self.backend,
executor=executor,
max_workers=self._EXECUTOR_WORKERS)
max_workers=self._EXECUTOR_WORKERS,
defer_reverts=defer_reverts)

View File

@ -340,3 +340,27 @@ class TestIterable(test.TestCase):
def test_dict(self):
self.assertTrue(misc.is_iterable(dict()))
class TestEnsureDict(testscenarios.TestWithScenarios):
scenarios = [
('none', {'original': None, 'expected': {}}),
('empty_dict', {'original': {}, 'expected': {}}),
('empty_list', {'original': [], 'expected': {}}),
('dict', {'original': {'a': 1, 'b': 2}, 'expected': {'a': 1, 'b': 2}}),
]
def test_expected(self):
self.assertEqual(self.expected, misc.ensure_dict(self.original))
self.assertFalse(self.expected is misc.ensure_dict(self.original))
class TestEnsureDictRaises(testscenarios.TestWithScenarios):
scenarios = [
('list', {'original': [1, 2], 'exception': TypeError}),
('tuple', {'original': (1, 2), 'exception': TypeError}),
('set', {'original': set([1, 2]), 'exception': TypeError}),
]
def test_exceptions(self):
self.assertRaises(self.exception, misc.ensure_dict, self.original)

View File

@ -595,3 +595,11 @@ def is_iterable(obj):
"""
return (not isinstance(obj, six.string_types) and
isinstance(obj, collections.Iterable))
def ensure_dict(obj):
"""Copy an existing dictionary or default to empty dict...."""
if not obj:
return {}
# default to a shallow copy to avoid most ownership issues
return dict(obj)