From 23a62fef9fb7f67467aaa714257be7fa451364c4 Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Thu, 11 Jun 2015 14:38:34 -0700 Subject: [PATCH] Perform a few optimizations to decrease persistence interactions To reduce the amount of *unneeded* persistence backend interactions we can optimize a few cases to avoid saving anything when nothing has changed; this should help in a few cases, and is easy low hanging fruit. Part of blueprint make-things-speedy Change-Id: I4fe958c94ef308919395345fd5c0d85f181446fb --- taskflow/persistence/logbook.py | 40 ++++++++++++++++++++++++--------- taskflow/storage.py | 20 ++++++++++------- 2 files changed, 41 insertions(+), 19 deletions(-) diff --git a/taskflow/persistence/logbook.py b/taskflow/persistence/logbook.py index be254ea1..c7a6eae5 100644 --- a/taskflow/persistence/logbook.py +++ b/taskflow/persistence/logbook.py @@ -52,10 +52,6 @@ def _safe_unmarshal_time(when): return timeutils.unmarshall_time(when) -def _was_failure(state, result): - return state == states.FAILURE and isinstance(result, ft.Failure) - - def _fix_meta(data): # Handle the case where older schemas allowed this to be non-dict by # correcting this case by replacing it with a dictionary when a non-dict @@ -434,6 +430,11 @@ class AtomDetail(object): self.meta = {} self.version = None + @staticmethod + def _was_failure(state, result): + # Internal helper method... + return state == states.FAILURE and isinstance(result, ft.Failure) + @property def last_results(self): """Gets the atoms last result. @@ -601,13 +602,29 @@ class TaskDetail(AtomDetail): will be set to ``None``). In either case the ``state`` attribute will be set to the provided state. """ - self.state = state - if _was_failure(state, result): - self.failure = result - self.results = None + was_altered = False + if self.state != state: + self.state = state + was_altered = True + if self._was_failure(state, result): + if self.failure != result: + self.failure = result + was_altered = True + if self.results is not None: + self.results = None + was_altered = True else: - self.results = result - self.failure = None + # We don't really have the ability to determine equality of + # task (user) results at the current time, without making + # potentially bad guesses, so assume the task detail always needs + # to be saved if they are not exactly equivalent... + if self.results is not result: + self.results = result + was_altered = True + if self.failure is not None: + self.failure = None + was_altered = True + return was_altered def merge(self, other, deep_copy=False): """Merges the current task detail with the given one. @@ -763,11 +780,12 @@ class RetryDetail(AtomDetail): """ # Do not clean retry history (only on reset does this happen). self.state = state - if _was_failure(state, result): + if self._was_failure(state, result): self.failure = result else: self.results.append((result, {})) self.failure = None + return True @classmethod def from_dict(cls, data): diff --git a/taskflow/storage.py b/taskflow/storage.py index 6be4b26a..d50d8771 100644 --- a/taskflow/storage.py +++ b/taskflow/storage.py @@ -169,7 +169,6 @@ class Storage(object): self._atom_name_to_uuid = dict((ad.name, ad.uuid) for ad in self._flowdetail) - try: source, _clone = self._atomdetail_by_name( self.injector_name, expected_type=logbook.TaskDetail) @@ -320,8 +319,9 @@ class Storage(object): def set_atom_state(self, atom_name, state): """Sets an atoms state.""" source, clone = self._atomdetail_by_name(atom_name, clone=True) - clone.state = state - self._with_connection(self._save_atom_detail, source, clone) + if source.state != state: + clone.state = state + self._with_connection(self._save_atom_detail, source, clone) @fasteners.read_locked def get_atom_state(self, atom_name): @@ -333,8 +333,9 @@ class Storage(object): def set_atom_intention(self, atom_name, intention): """Sets the intention of an atom given an atoms name.""" source, clone = self._atomdetail_by_name(atom_name, clone=True) - clone.intention = intention - self._with_connection(self._save_atom_detail, source, clone) + if source.intention != intention: + clone.intention = intention + self._with_connection(self._save_atom_detail, source, clone) @fasteners.read_locked def get_atom_intention(self, atom_name): @@ -441,10 +442,13 @@ class Storage(object): @fasteners.write_locked def save(self, atom_name, data, state=states.SUCCESS): - """Put result for atom with id 'uuid' to storage.""" + """Save result for named atom into storage with given state.""" source, clone = self._atomdetail_by_name(atom_name, clone=True) - clone.put(state, data) - result = self._with_connection(self._save_atom_detail, source, clone) + if clone.put(state, data): + result = self._with_connection(self._save_atom_detail, + source, clone) + else: + result = clone if state == states.FAILURE and isinstance(data, failure.Failure): # NOTE(imelnikov): failure serialization looses information, # so we cache failures here, in atom name -> failure mapping so