Allow atoms to save their own state/result
Instead of having storage adjust the internals of the atom detail types (which requires type checking), remove the type checking and let the types themselves decide where to put their own states and results. Change-Id: I397954dc746e9dacb2b65e352d11d8f7f36cdac4
This commit is contained in:
@@ -49,6 +49,10 @@ def _safe_unmarshal_time(when):
|
|||||||
return timeutils.unmarshall_time(when)
|
return timeutils.unmarshall_time(when)
|
||||||
|
|
||||||
|
|
||||||
|
def _was_failure(state, result):
|
||||||
|
return state == states.FAILURE and isinstance(result, misc.Failure)
|
||||||
|
|
||||||
|
|
||||||
def _fix_meta(data):
|
def _fix_meta(data):
|
||||||
# Handle the case where older schemas allowed this to be non-dict by
|
# Handle the case where older schemas allowed this to be non-dict by
|
||||||
# correcting this case by replacing it with a dictionary when a non-dict
|
# correcting this case by replacing it with a dictionary when a non-dict
|
||||||
@@ -317,6 +321,10 @@ class AtomDetail(object):
|
|||||||
def to_dict(self):
|
def to_dict(self):
|
||||||
"""Translates the internal state of this object to a dictionary."""
|
"""Translates the internal state of this object to a dictionary."""
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def put(self, state, result):
|
||||||
|
"""Puts a result (acquired in the given state) into this detail."""
|
||||||
|
|
||||||
def _to_dict_shared(self):
|
def _to_dict_shared(self):
|
||||||
if self.failure:
|
if self.failure:
|
||||||
failure = self.failure.to_dict()
|
failure = self.failure.to_dict()
|
||||||
@@ -367,6 +375,15 @@ class TaskDetail(AtomDetail):
|
|||||||
self.state = state
|
self.state = state
|
||||||
self.intention = states.EXECUTE
|
self.intention = states.EXECUTE
|
||||||
|
|
||||||
|
def put(self, state, result):
|
||||||
|
self.state = state
|
||||||
|
if _was_failure(state, result):
|
||||||
|
self.failure = result
|
||||||
|
self.results = None
|
||||||
|
else:
|
||||||
|
self.results = result
|
||||||
|
self.failure = None
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def from_dict(cls, data):
|
def from_dict(cls, data):
|
||||||
"""Translates the given data into an instance of this class."""
|
"""Translates the given data into an instance of this class."""
|
||||||
@@ -416,6 +433,15 @@ class RetryDetail(AtomDetail):
|
|||||||
except IndexError as e:
|
except IndexError as e:
|
||||||
raise exc.NotFound("Last failures not found", e)
|
raise exc.NotFound("Last failures not found", e)
|
||||||
|
|
||||||
|
def put(self, state, result):
|
||||||
|
# Do not clean retry history (only on reset does this happen).
|
||||||
|
self.state = state
|
||||||
|
if _was_failure(state, result):
|
||||||
|
self.failure = result
|
||||||
|
else:
|
||||||
|
self.results.append((result, {}))
|
||||||
|
self.failure = None
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def from_dict(cls, data):
|
def from_dict(cls, data):
|
||||||
"""Translates the given data into an instance of this class."""
|
"""Translates the given data into an instance of this class."""
|
||||||
|
|||||||
@@ -53,7 +53,7 @@ class Storage(object):
|
|||||||
self._lock = self._lock_cls()
|
self._lock = self._lock_cls()
|
||||||
|
|
||||||
# NOTE(imelnikov): failure serialization looses information,
|
# NOTE(imelnikov): failure serialization looses information,
|
||||||
# so we cache failures here, in task name -> misc.Failure mapping.
|
# so we cache failures here, in atom name -> failure mapping.
|
||||||
self._failures = {}
|
self._failures = {}
|
||||||
for ad in self._flowdetail:
|
for ad in self._flowdetail:
|
||||||
if ad.failure is not None:
|
if ad.failure is not None:
|
||||||
@@ -322,24 +322,12 @@ class Storage(object):
|
|||||||
"""Put result for atom with id 'uuid' to storage."""
|
"""Put result for atom with id 'uuid' to storage."""
|
||||||
with self._lock.write_lock():
|
with self._lock.write_lock():
|
||||||
ad = self._atomdetail_by_name(atom_name)
|
ad = self._atomdetail_by_name(atom_name)
|
||||||
ad.state = state
|
ad.put(state, data)
|
||||||
if state == states.FAILURE and isinstance(data, misc.Failure):
|
if state == states.FAILURE and isinstance(data, misc.Failure):
|
||||||
# FIXME(harlowja): this seems like it should be internal logic
|
# NOTE(imelnikov): failure serialization looses information,
|
||||||
# in the atom detail object and not in here. Fix that soon...
|
# so we cache failures here, in atom name -> failure mapping.
|
||||||
#
|
|
||||||
# Do not clean retry history
|
|
||||||
if not isinstance(ad, logbook.RetryDetail):
|
|
||||||
ad.results = None
|
|
||||||
ad.failure = data
|
|
||||||
self._failures[ad.name] = data
|
self._failures[ad.name] = data
|
||||||
else:
|
else:
|
||||||
# FIXME(harlowja): this seems like it should be internal logic
|
|
||||||
# in the atom detail object and not in here. Fix that soon...
|
|
||||||
if isinstance(ad, logbook.RetryDetail):
|
|
||||||
ad.results.append((data, {}))
|
|
||||||
else:
|
|
||||||
ad.results = data
|
|
||||||
ad.failure = None
|
|
||||||
self._check_all_results_provided(ad.name, data)
|
self._check_all_results_provided(ad.name, data)
|
||||||
self._with_connection(self._save_atom_detail, ad)
|
self._with_connection(self._save_atom_detail, ad)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user