Add a history retry object, makes retry histories easier to use

When a retry object is asked to make a decision about the atoms
it controls it is currently provided a complex object that contains
what has failed and why and what was provided to try to resolve this
failure as raw types. To make it easier to interact with that history
provide and use a more easier to interact with helper object that
provides useful functionality built ontop of the raw types.

Part of blueprint intuitive-retries

Change-Id: I93f86552f5a0c26b269319e4de6d9b8fb3b3b219
This commit is contained in:
Joshua Harlow 2014-09-23 16:55:05 -07:00
parent 2fcf67d6b9
commit 2a7ca47967
5 changed files with 126 additions and 40 deletions

View File

@ -385,11 +385,13 @@ representation of the ``do_something`` result.
Retry arguments Retry arguments
=============== ===============
A |Retry| controller works with arguments in the same way as a |Task|. But A |Retry| controller works with arguments in the same way as a |Task|. But it
it has an additional parameter ``'history'`` that is a list of tuples. Each has an additional parameter ``'history'`` that is itself a
tuple contains a result of the previous retry run and a table where the key :py:class:`~taskflow.retry.History` object that contains what failed over all
is a failed task and the value is a the engines attempts (aka the outcomes). The history object can be
:py:class:`~taskflow.types.failure.Failure` object. viewed as a tuple that contains a result of the previous retrys run and a
table/dict where each key is a failed atoms name and each value is
a :py:class:`~taskflow.types.failure.Failure` object.
Consider the following implementation:: Consider the following implementation::
@ -398,19 +400,19 @@ Consider the following implementation::
default_provides = 'value' default_provides = 'value'
def on_failure(self, history, *args, **kwargs): def on_failure(self, history, *args, **kwargs):
print history print(list(history))
return RETRY return RETRY
def execute(self, history, *args, **kwargs): def execute(self, history, *args, **kwargs):
print history print(list(history))
return 5 return 5
def revert(self, history, *args, **kwargs): def revert(self, history, *args, **kwargs):
print history print(list(history))
Imagine the above retry had returned a value ``'5'`` and then some task ``'A'`` Imagine the above retry had returned a value ``'5'`` and then some task ``'A'``
failed with some exception. In this case the above retrys ``on_failure`` failed with some exception. In this case ``on_failure`` method will receive
method will receive the following history:: the following history (printed as a list)::
[('5', {'A': failure.Failure()})] [('5', {'A': failure.Failure()})]
@ -419,12 +421,10 @@ At this point (since the implementation returned ``RETRY``) the
history and it can then return a value that subseqent tasks can use to alter history and it can then return a value that subseqent tasks can use to alter
there behavior. there behavior.
If instead the |retry.execute| method raises an exception, the |retry.revert| If instead the |retry.execute| method itself raises an exception,
method of the implementation will be called and the |retry.revert| method of the implementation will be called and
a :py:class:`~taskflow.types.failure.Failure` object will be present in the a :py:class:`~taskflow.types.failure.Failure` object will be present in the
history instead of the typical result:: history object instead of the typical result.
[('5', {'A': failure.Failure()}), (failure.Failure(), {})]
.. note:: .. note::

View File

@ -41,6 +41,70 @@ EXECUTE_REVERT_HISTORY = 'history'
REVERT_FLOW_FAILURES = 'flow_failures' REVERT_FLOW_FAILURES = 'flow_failures'
class History(object):
"""Helper that simplifies interactions with retry historical contents."""
def __init__(self, contents, failure=None):
self._contents = contents
self._failure = failure
@property
def failure(self):
"""Returns the retries own failure or none if not existent."""
return self._failure
def outcomes_iter(self, index=None):
"""Iterates over the contained failure outcomes.
If the index is not provided, then all outcomes are iterated over.
NOTE(harlowja): if the retry itself failed, this will **not** include
those types of failures. Use the :py:attr:`.failure` attribute to
access that instead (if it exists, aka, non-none).
"""
if index is None:
contents = self._contents
else:
contents = [
self._contents[index],
]
for (provided, outcomes) in contents:
for (owner, outcome) in six.iteritems(outcomes):
yield (owner, outcome)
def __len__(self):
return len(self._contents)
def provided_iter(self):
"""Iterates over all the values the retry has attempted (in order)."""
for (provided, outcomes) in self._contents:
yield provided
def __getitem__(self, index):
return self._contents[index]
def caused_by(self, exception_cls, index=None, include_retry=False):
"""Checks if the exception class provided caused the failures.
If the index is not provided, then all outcomes are iterated over.
NOTE(harlowja): only if ``include_retry`` is provided as true (defaults
to false) will the potential retries own failure be
checked against as well.
"""
for (name, failure) in self.outcomes_iter(index=index):
if failure.check(exception_cls):
return True
if include_retry and self._failure is not None:
if self._failure.check(exception_cls):
return True
return False
def __iter__(self):
"""Iterates over the raw contents of this history object."""
return iter(self._contents)
@six.add_metaclass(abc.ABCMeta) @six.add_metaclass(abc.ABCMeta)
class Retry(atom.Atom): class Retry(atom.Atom):
"""A class that can decide how to resolve execution failures. """A class that can decide how to resolve execution failures.
@ -177,8 +241,7 @@ class ForEachBase(Retry):
# Fetches the next resolution result to try, removes overlapping # Fetches the next resolution result to try, removes overlapping
# entries with what has already been tried and then returns the first # entries with what has already been tried and then returns the first
# resolution strategy remaining. # resolution strategy remaining.
items = (item for item, _failures in history) remaining = misc.sequence_minus(values, history.provided_iter())
remaining = misc.sequence_minus(values, items)
if not remaining: if not remaining:
raise exc.NotFound("No elements left in collection of iterable " raise exc.NotFound("No elements left in collection of iterable "
"retry controller %s" % self.name) "retry controller %s" % self.name)

View File

@ -745,20 +745,35 @@ class Storage(object):
state = states.PENDING state = states.PENDING
return state return state
def _translate_into_history(self, ad):
failure = None
if ad.failure is not None:
# NOTE(harlowja): Try to use our local cache to get a more
# complete failure object that has a traceback (instead of the
# one that is saved which will *typically* not have one)...
cached = self._failures.get(ad.name)
if ad.failure.matches(cached):
failure = cached
else:
failure = ad.failure
return retry.History(ad.results, failure=failure)
def get_retry_history(self, retry_name): def get_retry_history(self, retry_name):
"""Fetch retry results history.""" """Fetch a single retrys history."""
with self._lock.read_lock(): with self._lock.read_lock():
ad = self._atomdetail_by_name(retry_name, ad = self._atomdetail_by_name(retry_name,
expected_type=logbook.RetryDetail) expected_type=logbook.RetryDetail)
if ad.failure is not None: return self._translate_into_history(ad)
cached = self._failures.get(retry_name)
history = list(ad.results) def get_retry_histories(self):
if ad.failure.matches(cached): """Fetch all retrys histories."""
history.append((cached, {})) histories = []
else: with self._lock.read_lock():
history.append((ad.failure, {})) for ad in self._flowdetail:
return history if isinstance(ad, logbook.RetryDetail):
return ad.results histories.append((ad.name,
self._translate_into_history(ad)))
return histories
class MultiThreadedStorage(Storage): class MultiThreadedStorage(Storage):

View File

@ -631,11 +631,13 @@ class RetryTest(utils.EngineTestBase):
r = FailingRetry() r = FailingRetry()
flow = lf.Flow('testflow', r) flow = lf.Flow('testflow', r)
self.assertRaisesRegexp(ValueError, '^OMG', engine = self._make_engine(flow)
self._make_engine(flow).run) self.assertRaisesRegexp(ValueError, '^OMG', engine.run)
self.assertEqual(len(r.history), 1) self.assertEqual(1, len(engine.storage.get_retry_histories()))
self.assertEqual(r.history[0][1], {}) self.assertEqual(len(r.history), 0)
self.assertEqual(isinstance(r.history[0][0], failure.Failure), True) self.assertEqual([], list(r.history.outcomes_iter()))
self.assertIsNotNone(r.history.failure)
self.assertTrue(r.history.caused_by(ValueError, include_retry=True))
def test_retry_revert_fails(self): def test_retry_revert_fails(self):

View File

@ -448,7 +448,7 @@ class StorageTestMixin(object):
s = self._get_storage() s = self._get_storage()
s.ensure_atom(test_utils.NoopRetry('my retry')) s.ensure_atom(test_utils.NoopRetry('my retry'))
history = s.get_retry_history('my retry') history = s.get_retry_history('my retry')
self.assertEqual(history, []) self.assertEqual([], list(history))
def test_ensure_retry_and_task_with_same_name(self): def test_ensure_retry_and_task_with_same_name(self):
s = self._get_storage() s = self._get_storage()
@ -463,7 +463,8 @@ class StorageTestMixin(object):
s.save('my retry', 'a') s.save('my retry', 'a')
s.save('my retry', 'b') s.save('my retry', 'b')
history = s.get_retry_history('my retry') history = s.get_retry_history('my retry')
self.assertEqual(history, [('a', {}), ('b', {})]) self.assertEqual([('a', {}), ('b', {})], list(history))
self.assertEqual(['a', 'b'], list(history.provided_iter()))
def test_save_retry_results_with_mapping(self): def test_save_retry_results_with_mapping(self):
s = self._get_storage() s = self._get_storage()
@ -471,9 +472,10 @@ class StorageTestMixin(object):
s.save('my retry', 'a') s.save('my retry', 'a')
s.save('my retry', 'b') s.save('my retry', 'b')
history = s.get_retry_history('my retry') history = s.get_retry_history('my retry')
self.assertEqual(history, [('a', {}), ('b', {})]) self.assertEqual([('a', {}), ('b', {})], list(history))
self.assertEqual(s.fetch_all(), {'x': 'b'}) self.assertEqual(['a', 'b'], list(history.provided_iter()))
self.assertEqual(s.fetch('x'), 'b') self.assertEqual({'x': 'b'}, s.fetch_all())
self.assertEqual('b', s.fetch('x'))
def test_cleanup_retry_history(self): def test_cleanup_retry_history(self):
s = self._get_storage() s = self._get_storage()
@ -482,7 +484,8 @@ class StorageTestMixin(object):
s.save('my retry', 'b') s.save('my retry', 'b')
s.cleanup_retry_history('my retry', states.REVERTED) s.cleanup_retry_history('my retry', states.REVERTED)
history = s.get_retry_history('my retry') history = s.get_retry_history('my retry')
self.assertEqual(history, []) self.assertEqual(list(history), [])
self.assertEqual(0, len(history))
self.assertEqual(s.fetch_all(), {}) self.assertEqual(s.fetch_all(), {})
def test_cached_retry_failure(self): def test_cached_retry_failure(self):
@ -492,8 +495,11 @@ class StorageTestMixin(object):
s.save('my retry', 'a') s.save('my retry', 'a')
s.save('my retry', a_failure, states.FAILURE) s.save('my retry', a_failure, states.FAILURE)
history = s.get_retry_history('my retry') history = s.get_retry_history('my retry')
self.assertEqual(history, [('a', {}), (a_failure, {})]) self.assertEqual([('a', {})], list(history))
self.assertIs(s.has_failures(), True) self.assertTrue(history.caused_by(RuntimeError, include_retry=True))
self.assertIsNotNone(history.failure)
self.assertEqual(1, len(history))
self.assertTrue(s.has_failures())
self.assertEqual(s.get_failures(), {'my retry': a_failure}) self.assertEqual(s.get_failures(), {'my retry': a_failure})
def test_logbook_get_unknown_atom_type(self): def test_logbook_get_unknown_atom_type(self):