diff --git a/doc/source/arguments_and_results.rst b/doc/source/arguments_and_results.rst index 3082e2fc3..cb2c87610 100644 --- a/doc/source/arguments_and_results.rst +++ b/doc/source/arguments_and_results.rst @@ -385,11 +385,13 @@ representation of the ``do_something`` result. Retry arguments =============== -A |Retry| controller works with arguments in the same way as a |Task|. But -it has an additional parameter ``'history'`` that is a list of tuples. Each -tuple contains a result of the previous retry run and a table where the key -is a failed task and the value is a -:py:class:`~taskflow.types.failure.Failure` object. +A |Retry| controller works with arguments in the same way as a |Task|. But it +has an additional parameter ``'history'`` that is itself a +:py:class:`~taskflow.retry.History` object that contains what failed over all +the engine's attempts (aka the outcomes). The history object can be +viewed as a list of tuples, each containing the result of a previous retry's +run and a table/dict where each key is a failed atom's name and each value is +a :py:class:`~taskflow.types.failure.Failure` object. Consider the following implementation:: @@ -398,19 +400,19 @@ Consider the following implementation:: default_provides = 'value' def on_failure(self, history, *args, **kwargs): - print history + print(list(history)) return RETRY def execute(self, history, *args, **kwargs): - print history + print(list(history)) return 5 def revert(self, history, *args, **kwargs): - print history + print(list(history)) Imagine the above retry had returned a value ``'5'`` and then some task ``'A'`` -failed with some exception. In this case the above retrys ``on_failure`` -method will receive the following history:: +failed with some exception. 
In this case the ``on_failure`` method will receive +the following history (printed as a list):: [('5', {'A': failure.Failure()})] @@ -419,12 +421,10 @@ At this point (since the implementation returned ``RETRY``) the history and it can then return a value that subseqent tasks can use to alter there behavior. -If instead the |retry.execute| method raises an exception, the |retry.revert| -method of the implementation will be called and +If instead the |retry.execute| method itself raises an exception, +the |retry.revert| method of the implementation will be called and a :py:class:`~taskflow.types.failure.Failure` object will be present in the -history instead of the typical result:: - - [('5', {'A': failure.Failure()}), (failure.Failure(), {})] +history object instead of the typical result. .. note:: diff --git a/taskflow/retry.py b/taskflow/retry.py index 6897e3b7c..4dab636ff 100644 --- a/taskflow/retry.py +++ b/taskflow/retry.py @@ -41,6 +41,70 @@ EXECUTE_REVERT_HISTORY = 'history' REVERT_FLOW_FAILURES = 'flow_failures' +class History(object): + """Helper that simplifies interactions with retry historical contents.""" + + def __init__(self, contents, failure=None): + self._contents = contents + self._failure = failure + + @property + def failure(self): + """Returns the retries own failure or none if not existent.""" + return self._failure + + def outcomes_iter(self, index=None): + """Iterates over the contained failure outcomes. + + If the index is not provided, then all outcomes are iterated over. + + NOTE(harlowja): if the retry itself failed, this will **not** include + those types of failures. Use the :py:attr:`.failure` attribute to + access that instead (if it exists, aka, non-none). 
+ """ + if index is None: + contents = self._contents + else: + contents = [ + self._contents[index], + ] + for (provided, outcomes) in contents: + for (owner, outcome) in six.iteritems(outcomes): + yield (owner, outcome) + + def __len__(self): + return len(self._contents) + + def provided_iter(self): + """Iterates over all the values the retry has attempted (in order).""" + for (provided, outcomes) in self._contents: + yield provided + + def __getitem__(self, index): + return self._contents[index] + + def caused_by(self, exception_cls, index=None, include_retry=False): + """Checks if the exception class provided caused the failures. + + If the index is not provided, then all outcomes are iterated over. + + NOTE(harlowja): only if ``include_retry`` is provided as true (defaults + to false) will the potential retries own failure be + checked against as well. + """ + for (name, failure) in self.outcomes_iter(index=index): + if failure.check(exception_cls): + return True + if include_retry and self._failure is not None: + if self._failure.check(exception_cls): + return True + return False + + def __iter__(self): + """Iterates over the raw contents of this history object.""" + return iter(self._contents) + + @six.add_metaclass(abc.ABCMeta) class Retry(atom.Atom): """A class that can decide how to resolve execution failures. @@ -177,8 +241,7 @@ class ForEachBase(Retry): # Fetches the next resolution result to try, removes overlapping # entries with what has already been tried and then returns the first # resolution strategy remaining. 
- items = (item for item, _failures in history) - remaining = misc.sequence_minus(values, items) + remaining = misc.sequence_minus(values, history.provided_iter()) if not remaining: raise exc.NotFound("No elements left in collection of iterable " "retry controller %s" % self.name) diff --git a/taskflow/storage.py b/taskflow/storage.py index c667509b4..30b12951f 100644 --- a/taskflow/storage.py +++ b/taskflow/storage.py @@ -745,20 +745,35 @@ class Storage(object): state = states.PENDING return state + def _translate_into_history(self, ad): + failure = None + if ad.failure is not None: + # NOTE(harlowja): Try to use our local cache to get a more + # complete failure object that has a traceback (instead of the + # one that is saved which will *typically* not have one)... + cached = self._failures.get(ad.name) + if ad.failure.matches(cached): + failure = cached + else: + failure = ad.failure + return retry.History(ad.results, failure=failure) + def get_retry_history(self, retry_name): - """Fetch retry results history.""" + """Fetch a single retrys history.""" with self._lock.read_lock(): ad = self._atomdetail_by_name(retry_name, expected_type=logbook.RetryDetail) - if ad.failure is not None: - cached = self._failures.get(retry_name) - history = list(ad.results) - if ad.failure.matches(cached): - history.append((cached, {})) - else: - history.append((ad.failure, {})) - return history - return ad.results + return self._translate_into_history(ad) + + def get_retry_histories(self): + """Fetch all retrys histories.""" + histories = [] + with self._lock.read_lock(): + for ad in self._flowdetail: + if isinstance(ad, logbook.RetryDetail): + histories.append((ad.name, + self._translate_into_history(ad))) + return histories class MultiThreadedStorage(Storage): diff --git a/taskflow/tests/unit/test_retries.py b/taskflow/tests/unit/test_retries.py index 544004351..106bfaedb 100644 --- a/taskflow/tests/unit/test_retries.py +++ b/taskflow/tests/unit/test_retries.py @@ -631,11 
+631,13 @@ class RetryTest(utils.EngineTestBase): r = FailingRetry() flow = lf.Flow('testflow', r) - self.assertRaisesRegexp(ValueError, '^OMG', - self._make_engine(flow).run) - self.assertEqual(len(r.history), 1) - self.assertEqual(r.history[0][1], {}) - self.assertEqual(isinstance(r.history[0][0], failure.Failure), True) + engine = self._make_engine(flow) + self.assertRaisesRegexp(ValueError, '^OMG', engine.run) + self.assertEqual(1, len(engine.storage.get_retry_histories())) + self.assertEqual(len(r.history), 0) + self.assertEqual([], list(r.history.outcomes_iter())) + self.assertIsNotNone(r.history.failure) + self.assertTrue(r.history.caused_by(ValueError, include_retry=True)) def test_retry_revert_fails(self): diff --git a/taskflow/tests/unit/test_storage.py b/taskflow/tests/unit/test_storage.py index deb4db4fa..f774993ca 100644 --- a/taskflow/tests/unit/test_storage.py +++ b/taskflow/tests/unit/test_storage.py @@ -448,7 +448,7 @@ class StorageTestMixin(object): s = self._get_storage() s.ensure_atom(test_utils.NoopRetry('my retry')) history = s.get_retry_history('my retry') - self.assertEqual(history, []) + self.assertEqual([], list(history)) def test_ensure_retry_and_task_with_same_name(self): s = self._get_storage() @@ -463,7 +463,8 @@ class StorageTestMixin(object): s.save('my retry', 'a') s.save('my retry', 'b') history = s.get_retry_history('my retry') - self.assertEqual(history, [('a', {}), ('b', {})]) + self.assertEqual([('a', {}), ('b', {})], list(history)) + self.assertEqual(['a', 'b'], list(history.provided_iter())) def test_save_retry_results_with_mapping(self): s = self._get_storage() @@ -471,9 +472,10 @@ class StorageTestMixin(object): s.save('my retry', 'a') s.save('my retry', 'b') history = s.get_retry_history('my retry') - self.assertEqual(history, [('a', {}), ('b', {})]) - self.assertEqual(s.fetch_all(), {'x': 'b'}) - self.assertEqual(s.fetch('x'), 'b') + self.assertEqual([('a', {}), ('b', {})], list(history)) + self.assertEqual(['a', 'b'], 
list(history.provided_iter())) + self.assertEqual({'x': 'b'}, s.fetch_all()) + self.assertEqual('b', s.fetch('x')) def test_cleanup_retry_history(self): s = self._get_storage() @@ -482,7 +484,8 @@ class StorageTestMixin(object): s.save('my retry', 'b') s.cleanup_retry_history('my retry', states.REVERTED) history = s.get_retry_history('my retry') - self.assertEqual(history, []) + self.assertEqual(list(history), []) + self.assertEqual(0, len(history)) self.assertEqual(s.fetch_all(), {}) def test_cached_retry_failure(self): @@ -492,8 +495,11 @@ class StorageTestMixin(object): s.save('my retry', 'a') s.save('my retry', a_failure, states.FAILURE) history = s.get_retry_history('my retry') - self.assertEqual(history, [('a', {}), (a_failure, {})]) - self.assertIs(s.has_failures(), True) + self.assertEqual([('a', {})], list(history)) + self.assertTrue(history.caused_by(RuntimeError, include_retry=True)) + self.assertIsNotNone(history.failure) + self.assertEqual(1, len(history)) + self.assertTrue(s.has_failures()) self.assertEqual(s.get_failures(), {'my retry': a_failure}) def test_logbook_get_unknown_atom_type(self):