diff --git a/taskflow/persistence/logbook.py b/taskflow/persistence/logbook.py index f378b677..d3b0a7a4 100644 --- a/taskflow/persistence/logbook.py +++ b/taskflow/persistence/logbook.py @@ -21,6 +21,7 @@ import logging import six +from taskflow import exceptions as exc from taskflow.openstack.common import timeutils from taskflow.openstack.common import uuidutils from taskflow import states @@ -269,6 +270,13 @@ class AtomDetail(object): # information can be associated with. self.version = None + @property + def last_results(self): + """Gets the atoms last result (if it has many results it should then + return the last one of many). + """ + return self.results + def update(self, ad): """Updates the objects state to be the same as the given one.""" if ad is self: @@ -394,6 +402,20 @@ class RetryDetail(AtomDetail): self.state = state self.intention = states.EXECUTE + @property + def last_results(self): + try: + return self.results[-1][0] + except IndexError as e: + raise exc.NotFound("Last results not found", e) + + @property + def last_failures(self): + try: + return self.results[-1][1] + except IndexError as e: + raise exc.NotFound("Last failures not found", e) + @classmethod def from_dict(cls, data): """Translates the given data into an instance of this class.""" diff --git a/taskflow/storage.py b/taskflow/storage.py index dca870f3..bc70cde5 100644 --- a/taskflow/storage.py +++ b/taskflow/storage.py @@ -350,10 +350,16 @@ class Storage(object): with self._lock.write_lock(): ad = self._atomdetail_by_name(retry_name, expected_type=logbook.RetryDetail) - failures = ad.results[-1][1] - if failed_atom_name not in failures: - failures[failed_atom_name] = failure - self._with_connection(self._save_atom_detail, ad) + try: + failures = ad.last_failures + except exceptions.NotFound as e: + raise exceptions.StorageFailure("Unable to fetch most recent" + " retry failures so new retry" + " failure can be inserted", e) + else: + if failed_atom_name not in failures: + failures[failed_atom_name] = failure + self._with_connection(self._save_atom_detail, ad) def cleanup_retry_history(self, retry_name, state): """Cleanup history of retry atom with given name.""" @@ -364,8 +370,7 @@ class Storage(object): ad.results = [] self._with_connection(self._save_atom_detail, ad) - def get(self, atom_name): - """Gets the result for an atom with a given name from storage.""" + def _get(self, atom_name, only_last=False): with self._lock.read_lock(): ad = self._atomdetail_by_name(atom_name) if ad.failure is not None: @@ -376,7 +381,14 @@ class Storage(object): if ad.state not in STATES_WITH_RESULTS: raise exceptions.NotFound("Result for atom %s is not currently" " known" % atom_name) - return ad.results + if only_last: + return ad.last_results + else: + return ad.results + + def get(self, atom_name): + """Gets the results for an atom with a given name from storage.""" + return self._get(atom_name) def get_failures(self): """Get list of failures that happened with this flow. @@ -473,17 +485,8 @@ class Storage(object): # Return the first one that is found. for (atom_name, index) in reversed(indexes): try: - result = self.get(atom_name) - ad = self._atomdetail_by_name(atom_name) - - # If it is a retry's result then fetch values from the - # latest retry run only. - if isinstance(ad, logbook.RetryDetail): - if result: - result = result[-1][0] - else: - result = None - return misc.item_from(result, index, name) + results = self._get(atom_name, only_last=True) + return misc.item_from(results, index, name) except exceptions.NotFound: pass raise exceptions.NotFound("Unable to find result %r" % name)