Merge "Handle retry last_results/last_failure better"
This commit is contained in:
@@ -21,6 +21,7 @@ import logging
|
||||
|
||||
import six
|
||||
|
||||
from taskflow import exceptions as exc
|
||||
from taskflow.openstack.common import timeutils
|
||||
from taskflow.openstack.common import uuidutils
|
||||
from taskflow import states
|
||||
@@ -269,6 +270,13 @@ class AtomDetail(object):
|
||||
# information can be associated with.
|
||||
self.version = None
|
||||
|
||||
@property
|
||||
def last_results(self):
|
||||
"""Gets the atoms last result (if it has many results it should then
|
||||
return the last one of many).
|
||||
"""
|
||||
return self.results
|
||||
|
||||
def update(self, ad):
|
||||
"""Updates the objects state to be the same as the given one."""
|
||||
if ad is self:
|
||||
@@ -394,6 +402,20 @@ class RetryDetail(AtomDetail):
|
||||
self.state = state
|
||||
self.intention = states.EXECUTE
|
||||
|
||||
@property
|
||||
def last_results(self):
|
||||
try:
|
||||
return self.results[-1][0]
|
||||
except IndexError as e:
|
||||
raise exc.NotFound("Last results not found", e)
|
||||
|
||||
@property
|
||||
def last_failures(self):
|
||||
try:
|
||||
return self.results[-1][1]
|
||||
except IndexError as e:
|
||||
raise exc.NotFound("Last failures not found", e)
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, data):
|
||||
"""Translates the given data into an instance of this class."""
|
||||
|
||||
@@ -350,10 +350,16 @@ class Storage(object):
|
||||
with self._lock.write_lock():
|
||||
ad = self._atomdetail_by_name(retry_name,
|
||||
expected_type=logbook.RetryDetail)
|
||||
failures = ad.results[-1][1]
|
||||
if failed_atom_name not in failures:
|
||||
failures[failed_atom_name] = failure
|
||||
self._with_connection(self._save_atom_detail, ad)
|
||||
try:
|
||||
failures = ad.last_failures
|
||||
except exceptions.NotFound as e:
|
||||
raise exceptions.StorageFailure("Unable to fetch most recent"
|
||||
" retry failures so new retry"
|
||||
" failure can be inserted", e)
|
||||
else:
|
||||
if failed_atom_name not in failures:
|
||||
failures[failed_atom_name] = failure
|
||||
self._with_connection(self._save_atom_detail, ad)
|
||||
|
||||
def cleanup_retry_history(self, retry_name, state):
|
||||
"""Cleanup history of retry atom with given name."""
|
||||
@@ -364,8 +370,7 @@ class Storage(object):
|
||||
ad.results = []
|
||||
self._with_connection(self._save_atom_detail, ad)
|
||||
|
||||
def get(self, atom_name):
|
||||
"""Gets the result for an atom with a given name from storage."""
|
||||
def _get(self, atom_name, only_last=False):
|
||||
with self._lock.read_lock():
|
||||
ad = self._atomdetail_by_name(atom_name)
|
||||
if ad.failure is not None:
|
||||
@@ -376,7 +381,14 @@ class Storage(object):
|
||||
if ad.state not in STATES_WITH_RESULTS:
|
||||
raise exceptions.NotFound("Result for atom %s is not currently"
|
||||
" known" % atom_name)
|
||||
return ad.results
|
||||
if only_last:
|
||||
return ad.last_results
|
||||
else:
|
||||
return ad.results
|
||||
|
||||
def get(self, atom_name):
|
||||
"""Gets the results for an atom with a given name from storage."""
|
||||
return self._get(atom_name)
|
||||
|
||||
def get_failures(self):
|
||||
"""Get list of failures that happened with this flow.
|
||||
@@ -473,17 +485,8 @@ class Storage(object):
|
||||
# Return the first one that is found.
|
||||
for (atom_name, index) in reversed(indexes):
|
||||
try:
|
||||
result = self.get(atom_name)
|
||||
ad = self._atomdetail_by_name(atom_name)
|
||||
|
||||
# If it is a retry's result then fetch values from the
|
||||
# latest retry run only.
|
||||
if isinstance(ad, logbook.RetryDetail):
|
||||
if result:
|
||||
result = result[-1][0]
|
||||
else:
|
||||
result = None
|
||||
return misc.item_from(result, index, name)
|
||||
results = self._get(atom_name, only_last=True)
|
||||
return misc.item_from(results, index, name)
|
||||
except exceptions.NotFound:
|
||||
pass
|
||||
raise exceptions.NotFound("Unable to find result %r" % name)
|
||||
|
||||
Reference in New Issue
Block a user