From 227cf52319f14306ddd89a2bef42ac264210ac2e Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Mon, 23 Mar 2015 12:39:15 -0700 Subject: [PATCH] Make the storage layer more resilient to failures When a storage failure happens the internally maintained flow detail or atom details should not be updated; to ensure this happens update a cloned atom/flow detail first and only if the save succeeds do we update the internal version (and perform any further work). Change-Id: Ib8da476b6be2c71b663e0fc659b5df24e6b9f095 --- taskflow/storage.py | 227 ++++++++++++++++++++++++-------------------- 1 file changed, 125 insertions(+), 102 deletions(-) diff --git a/taskflow/storage.py b/taskflow/storage.py index fe601c9a..83311e92 100644 --- a/taskflow/storage.py +++ b/taskflow/storage.py @@ -170,22 +170,21 @@ class Storage(object): for ad in self._flowdetail) try: - injector_td = self._atomdetail_by_name( - self.injector_name, - expected_type=logbook.TaskDetail) + source, _clone = self._atomdetail_by_name( + self.injector_name, expected_type=logbook.TaskDetail) except exceptions.NotFound: pass else: - names = six.iterkeys(injector_td.results) - self._set_result_mapping(injector_td.name, - dict((name, name) for name in names)) + names_iter = six.iterkeys(source.results) + self._set_result_mapping(source.name, + dict((name, name) for name in names_iter)) def _with_connection(self, functor, *args, **kwargs): # Run the given functor with a backend connection as its first # argument (providing the additional positional arguments and keyword # arguments as subsequent arguments). with contextlib.closing(self._backend.get_connection()) as conn: - functor(conn, *args, **kwargs) + return functor(conn, *args, **kwargs) def ensure_atom(self, atom): """Ensure that there is an atomdetail in storage for the given atom. 
@@ -258,18 +257,23 @@ class Storage(object): self._set_result_mapping(retry_name, result_mapping) return retry_id - def _create_atom_detail(self, _detail_cls, name, uuid, task_version=None): + def _create_atom_detail(self, detail_cls, name, uuid, task_version=None): """Add the atom detail to flow detail. Atom becomes known to storage by that name and uuid. Atom state is set to PENDING. """ - ad = _detail_cls(name, uuid) + ad = detail_cls(name, uuid) ad.state = states.PENDING ad.version = task_version - self._flowdetail.add(ad) - self._with_connection(self._save_flow_detail) + # Add the atom detail to the clone, which upon success will be + # updated into the contained flow detail; if it does not get saved + # then no update will happen. + source, clone = self._fetch_flowdetail(clone=True) + clone.add(ad) + self._with_connection(self._save_flow_detail, source, clone) self._atom_name_to_uuid[ad.name] = ad.uuid + return ad @property def flow_name(self): @@ -289,13 +293,21 @@ class Storage(object): # This never changes (so no read locking needed). return self._backend - def _save_flow_detail(self, conn): + def _save_flow_detail(self, conn, original_flow_detail, flow_detail): # NOTE(harlowja): we need to update our contained flow detail if # the result of the update actually added more (aka another process # added item to the flow detail). 
- self._flowdetail.update(conn.update_flow_details(self._flowdetail)) + original_flow_detail.update(conn.update_flow_details(flow_detail)) + return original_flow_detail - def _atomdetail_by_name(self, atom_name, expected_type=None): + def _fetch_flowdetail(self, clone=False): + source = self._flowdetail + if clone: + return (source, source.copy()) + else: + return (source, source) + + def _atomdetail_by_name(self, atom_name, expected_type=None, clone=False): try: ad = self._flowdetail.find(self._atom_name_to_uuid[atom_name]) except KeyError: @@ -308,47 +320,51 @@ class Storage(object): raise TypeError("Atom %s is not of the expected type: %s" % (atom_name, reflection.get_class_name(expected_type))) - return ad + if clone: + return (ad, ad.copy()) + else: + return (ad, ad) - def _save_atom_detail(self, conn, atom_detail): + def _save_atom_detail(self, conn, original_atom_detail, atom_detail): # NOTE(harlowja): we need to update our contained atom detail if # the result of the update actually added more (aka another process # is also modifying the task detail), since python is by reference # and the contained atom detail will reflect the old state if we don't # do this update. 
- atom_detail.update(conn.update_atom_details(atom_detail)) + original_atom_detail.update(conn.update_atom_details(atom_detail)) + return original_atom_detail @lock_utils.read_locked def get_atom_uuid(self, atom_name): """Gets an atoms uuid given a atoms name.""" - ad = self._atomdetail_by_name(atom_name) - return ad.uuid + source, _clone = self._atomdetail_by_name(atom_name) + return source.uuid @lock_utils.write_locked def set_atom_state(self, atom_name, state): """Sets an atoms state.""" - ad = self._atomdetail_by_name(atom_name) - ad.state = state - self._with_connection(self._save_atom_detail, ad) + source, clone = self._atomdetail_by_name(atom_name, clone=True) + clone.state = state + self._with_connection(self._save_atom_detail, source, clone) @lock_utils.read_locked def get_atom_state(self, atom_name): """Gets the state of an atom given an atoms name.""" - ad = self._atomdetail_by_name(atom_name) - return ad.state + source, _clone = self._atomdetail_by_name(atom_name) + return source.state @lock_utils.write_locked def set_atom_intention(self, atom_name, intention): """Sets the intention of an atom given an atoms name.""" - ad = self._atomdetail_by_name(atom_name) - ad.intention = intention - self._with_connection(self._save_atom_detail, ad) + source, clone = self._atomdetail_by_name(atom_name, clone=True) + clone.intention = intention + self._with_connection(self._save_atom_detail, source, clone) @lock_utils.read_locked def get_atom_intention(self, atom_name): """Gets the intention of an atom given an atoms name.""" - ad = self._atomdetail_by_name(atom_name) - return ad.intention + source, _clone = self._atomdetail_by_name(atom_name) + return source.intention @lock_utils.read_locked def get_atoms_states(self, atom_names): @@ -360,11 +376,12 @@ class Storage(object): @lock_utils.write_locked def _update_atom_metadata(self, atom_name, update_with, expected_type=None): - ad = self._atomdetail_by_name(atom_name, - expected_type=expected_type) + source, clone = 
self._atomdetail_by_name(atom_name, + expected_type=expected_type, + clone=True) if update_with: - ad.meta.update(update_with) - self._with_connection(self._save_atom_detail, ad) + clone.meta.update(update_with) + self._with_connection(self._save_atom_detail, source, clone) def update_atom_metadata(self, atom_name, update_with): """Updates a atoms associated metadata. @@ -407,10 +424,10 @@ class Storage(object): :param task_name: tasks name :returns: current task progress value """ - ad = self._atomdetail_by_name(task_name, - expected_type=logbook.TaskDetail) + source, _clone = self._atomdetail_by_name( + task_name, expected_type=logbook.TaskDetail) try: - return ad.meta[META_PROGRESS] + return source.meta[META_PROGRESS] except KeyError: return 0.0 @@ -422,10 +439,10 @@ class Storage(object): :returns: None if progress_details not defined, else progress_details dict """ - ad = self._atomdetail_by_name(task_name, - expected_type=logbook.TaskDetail) + source, _clone = self._atomdetail_by_name( + task_name, expected_type=logbook.TaskDetail) try: - return ad.meta[META_PROGRESS_DETAILS] + return source.meta[META_PROGRESS_DETAILS] except KeyError: return None @@ -449,23 +466,24 @@ class Storage(object): @lock_utils.write_locked def save(self, atom_name, data, state=states.SUCCESS): """Put result for atom with id 'uuid' to storage.""" - ad = self._atomdetail_by_name(atom_name) - ad.put(state, data) + source, clone = self._atomdetail_by_name(atom_name, clone=True) + clone.put(state, data) + result = self._with_connection(self._save_atom_detail, source, clone) if state == states.FAILURE and isinstance(data, failure.Failure): # NOTE(imelnikov): failure serialization looses information, - # so we cache failures here, in atom name -> failure mapping. - self._failures[ad.name] = data + # so we cache failures here, in atom name -> failure mapping so + # that we can later use the better version on fetch/get. 
+ self._failures[result.name] = data else: - self._check_all_results_provided(ad.name, data) - self._with_connection(self._save_atom_detail, ad) + self._check_all_results_provided(result.name, data) @lock_utils.write_locked def save_retry_failure(self, retry_name, failed_atom_name, failure): """Save subflow failure to retry controller history.""" - ad = self._atomdetail_by_name(retry_name, - expected_type=logbook.RetryDetail) + source, clone = self._atomdetail_by_name( + retry_name, expected_type=logbook.RetryDetail, clone=True) try: - failures = ad.last_failures + failures = clone.last_failures except exceptions.NotFound as e: raise exceptions.StorageFailure("Unable to fetch most recent" " retry failures so new retry" @@ -473,32 +491,35 @@ class Storage(object): else: if failed_atom_name not in failures: failures[failed_atom_name] = failure - self._with_connection(self._save_atom_detail, ad) + self._with_connection(self._save_atom_detail, source, clone) @lock_utils.write_locked def cleanup_retry_history(self, retry_name, state): """Cleanup history of retry atom with given name.""" - ad = self._atomdetail_by_name(retry_name, - expected_type=logbook.RetryDetail) - ad.state = state - ad.results = [] - self._with_connection(self._save_atom_detail, ad) + source, clone = self._atomdetail_by_name( + retry_name, expected_type=logbook.RetryDetail, clone=True) + clone.state = state + clone.results = [] + self._with_connection(self._save_atom_detail, source, clone) @lock_utils.read_locked def _get(self, atom_name, only_last=False): - ad = self._atomdetail_by_name(atom_name) - if ad.failure is not None: + source, _clone = self._atomdetail_by_name(atom_name) + if source.failure is not None: cached = self._failures.get(atom_name) - if ad.failure.matches(cached): + if source.failure.matches(cached): + # Try to give the version back that should have the backtrace + # instead of one that has it stripped (since backtraces are not + # serializable). 
return cached - return ad.failure - if ad.state not in STATES_WITH_RESULTS: + return source.failure + if source.state not in STATES_WITH_RESULTS: raise exceptions.NotFound("Result for atom %s is not currently" " known" % atom_name) if only_last: - return ad.last_results + return source.last_results else: - return ad.results + return source.results def get(self, atom_name): """Gets the results for an atom with a given name from storage.""" @@ -516,21 +537,17 @@ class Storage(object): """Returns True if there are failed tasks in the storage.""" return bool(self._failures) - def _reset_atom(self, ad, state): - if ad.name == self.injector_name: - return False - if ad.state == state: - return False - ad.reset(state) - self._failures.pop(ad.name, None) - return True - @lock_utils.write_locked def reset(self, atom_name, state=states.PENDING): - """Reset atom with given name (if the task is in a given state).""" - ad = self._atomdetail_by_name(atom_name) - if self._reset_atom(ad, state): - self._with_connection(self._save_atom_detail, ad) + """Reset atom with given name (if the atom is not in a given state).""" + if atom_name == self.injector_name: + return + source, clone = self._atomdetail_by_name(atom_name, clone=True) + if source.state == state: + return + clone.reset(state) + result = self._with_connection(self._save_atom_detail, source, clone) + self._failures.pop(result.name, None) def inject_atom_args(self, atom_name, pairs, transient=True): """Add values into storage for a specific atom only. 
@@ -574,13 +591,13 @@ class Storage(object): self._injected_args[atom_name].update(pairs) def save_persistent(): - ad = self._atomdetail_by_name(atom_name) - injected = ad.meta.get(META_INJECTED) + source, clone = self._atomdetail_by_name(atom_name, clone=True) + injected = source.meta.get(META_INJECTED) if not injected: injected = {} injected.update(pairs) - ad.meta[META_INJECTED] = injected - self._with_connection(self._save_atom_detail, ad) + clone.meta[META_INJECTED] = injected + self._with_connection(self._save_atom_detail, source, clone) with self._lock.write_lock(): if transient: @@ -618,20 +635,22 @@ class Storage(object): def save_persistent(): try: - ad = self._atomdetail_by_name(self.injector_name, - expected_type=logbook.TaskDetail) + source, clone = self._atomdetail_by_name( + self.injector_name, + expected_type=logbook.TaskDetail, + clone=True) except exceptions.NotFound: - uuid = uuidutils.generate_uuid() - self._create_atom_detail(logbook.TaskDetail, - self.injector_name, uuid) - ad = self._atomdetail_by_name(self.injector_name, - expected_type=logbook.TaskDetail) - ad.results = dict(pairs) - ad.state = states.SUCCESS + source = self._create_atom_detail(logbook.TaskDetail, + self.injector_name, + uuidutils.generate_uuid()) + clone = source + clone.results = dict(pairs) + clone.state = states.SUCCESS else: - ad.results.update(pairs) - self._with_connection(self._save_atom_detail, ad) - return (self.injector_name, six.iterkeys(ad.results)) + clone.results.update(pairs) + result = self._with_connection(self._save_atom_detail, + source, clone) + return (self.injector_name, six.iterkeys(result.results)) def save_transient(): self._transients.update(pairs) @@ -641,6 +660,7 @@ class Storage(object): provider_name, names = save_transient() else: provider_name, names = save_persistent() + self._set_result_mapping(provider_name, dict((name, name) for name in names)) @@ -750,14 +770,14 @@ class Storage(object): providers.append(p) return providers - ad = 
self._atomdetail_by_name(atom_name) + source, _clone = self._atomdetail_by_name(atom_name) if scope_walker is None: scope_walker = self._scope_fetcher(atom_name) if optional_args is None: optional_args = [] injected_sources = [ self._injected_args.get(atom_name, {}), - ad.meta.get(META_INJECTED, {}), + source.meta.get(META_INJECTED, {}), ] missing = set(six.iterkeys(args_mapping)) for (bound_name, name) in six.iteritems(args_mapping): @@ -856,10 +876,10 @@ class Storage(object): if optional_args is None: optional_args = [] if atom_name: - ad = self._atomdetail_by_name(atom_name) + source, _clone = self._atomdetail_by_name(atom_name) injected_sources = [ self._injected_args.get(atom_name, {}), - ad.meta.get(META_INJECTED, {}), + source.meta.get(META_INJECTED, {}), ] if scope_walker is None: scope_walker = self._scope_fetcher(atom_name) @@ -917,20 +937,23 @@ class Storage(object): @lock_utils.write_locked def set_flow_state(self, state): """Set flow details state and save it.""" - self._flowdetail.state = state - self._with_connection(self._save_flow_detail) + source, clone = self._fetch_flowdetail(clone=True) + clone.state = state + self._with_connection(self._save_flow_detail, source, clone) @lock_utils.write_locked def update_flow_metadata(self, update_with): """Update flowdetails metadata and save it.""" if update_with: - self._flowdetail.meta.update(update_with) - self._with_connection(self._save_flow_detail) + source, clone = self._fetch_flowdetail(clone=True) + clone.meta.update(update_with) + self._with_connection(self._save_flow_detail, source, clone) @lock_utils.read_locked def get_flow_state(self): """Get state from flow details.""" - state = self._flowdetail.state + source = self._flowdetail + state = source.state if state is None: state = states.PENDING return state @@ -951,9 +974,9 @@ class Storage(object): @lock_utils.read_locked def get_retry_history(self, retry_name): """Fetch a single retrys history.""" - ad = self._atomdetail_by_name(retry_name, - 
expected_type=logbook.RetryDetail) - return self._translate_into_history(ad) + source, _clone = self._atomdetail_by_name( + retry_name, expected_type=logbook.RetryDetail) + return self._translate_into_history(source) @lock_utils.read_locked def get_retry_histories(self):