Merge "Make the storage layer more resilent to failures"

This commit is contained in:
Jenkins
2015-04-25 16:51:06 +00:00
committed by Gerrit Code Review

View File

@@ -170,22 +170,21 @@ class Storage(object):
for ad in self._flowdetail)
try:
injector_td = self._atomdetail_by_name(
self.injector_name,
expected_type=logbook.TaskDetail)
source, _clone = self._atomdetail_by_name(
self.injector_name, expected_type=logbook.TaskDetail)
except exceptions.NotFound:
pass
else:
names = six.iterkeys(injector_td.results)
self._set_result_mapping(injector_td.name,
dict((name, name) for name in names))
names_iter = six.iterkeys(source.results)
self._set_result_mapping(source.name,
dict((name, name) for name in names_iter))
def _with_connection(self, functor, *args, **kwargs):
# Run the given functor with a backend connection as its first
# argument (providing the additional positional arguments and keyword
# arguments as subsequent arguments).
with contextlib.closing(self._backend.get_connection()) as conn:
functor(conn, *args, **kwargs)
return functor(conn, *args, **kwargs)
def ensure_atom(self, atom):
"""Ensure that there is an atomdetail in storage for the given atom.
@@ -258,18 +257,23 @@ class Storage(object):
self._set_result_mapping(retry_name, result_mapping)
return retry_id
def _create_atom_detail(self, _detail_cls, name, uuid, task_version=None):
def _create_atom_detail(self, detail_cls, name, uuid, task_version=None):
"""Add the atom detail to flow detail.
Atom becomes known to storage by that name and uuid.
Atom state is set to PENDING.
"""
ad = _detail_cls(name, uuid)
ad = detail_cls(name, uuid)
ad.state = states.PENDING
ad.version = task_version
self._flowdetail.add(ad)
self._with_connection(self._save_flow_detail)
# Add the atom detail to the clone, which upon success will be
# updated into the contained flow detail; if it does not get saved
# then no update will happen.
source, clone = self._fetch_flowdetail(clone=True)
clone.add(ad)
self._with_connection(self._save_flow_detail, source, clone)
self._atom_name_to_uuid[ad.name] = ad.uuid
return ad
@property
def flow_name(self):
@@ -289,13 +293,21 @@ class Storage(object):
# This never changes (so no read locking needed).
return self._backend
def _save_flow_detail(self, conn):
def _save_flow_detail(self, conn, original_flow_detail, flow_detail):
# NOTE(harlowja): we need to update our contained flow detail if
# the result of the update actually added more (aka another process
# added item to the flow detail).
self._flowdetail.update(conn.update_flow_details(self._flowdetail))
original_flow_detail.update(conn.update_flow_details(flow_detail))
return original_flow_detail
def _atomdetail_by_name(self, atom_name, expected_type=None):
def _fetch_flowdetail(self, clone=False):
source = self._flowdetail
if clone:
return (source, source.copy())
else:
return (source, source)
def _atomdetail_by_name(self, atom_name, expected_type=None, clone=False):
try:
ad = self._flowdetail.find(self._atom_name_to_uuid[atom_name])
except KeyError:
@@ -308,47 +320,51 @@ class Storage(object):
raise TypeError("Atom %s is not of the expected type: %s"
% (atom_name,
reflection.get_class_name(expected_type)))
return ad
if clone:
return (ad, ad.copy())
else:
return (ad, ad)
def _save_atom_detail(self, conn, atom_detail):
def _save_atom_detail(self, conn, original_atom_detail, atom_detail):
# NOTE(harlowja): we need to update our contained atom detail if
# the result of the update actually added more (aka another process
# is also modifying the task detail), since python is by reference
# and the contained atom detail will reflect the old state if we don't
# do this update.
atom_detail.update(conn.update_atom_details(atom_detail))
original_atom_detail.update(conn.update_atom_details(atom_detail))
return original_atom_detail
@lock_utils.read_locked
def get_atom_uuid(self, atom_name):
"""Gets an atoms uuid given a atoms name."""
ad = self._atomdetail_by_name(atom_name)
return ad.uuid
source, _clone = self._atomdetail_by_name(atom_name)
return source.uuid
@lock_utils.write_locked
def set_atom_state(self, atom_name, state):
"""Sets an atoms state."""
ad = self._atomdetail_by_name(atom_name)
ad.state = state
self._with_connection(self._save_atom_detail, ad)
source, clone = self._atomdetail_by_name(atom_name, clone=True)
clone.state = state
self._with_connection(self._save_atom_detail, source, clone)
@lock_utils.read_locked
def get_atom_state(self, atom_name):
"""Gets the state of an atom given an atoms name."""
ad = self._atomdetail_by_name(atom_name)
return ad.state
source, _clone = self._atomdetail_by_name(atom_name)
return source.state
@lock_utils.write_locked
def set_atom_intention(self, atom_name, intention):
"""Sets the intention of an atom given an atoms name."""
ad = self._atomdetail_by_name(atom_name)
ad.intention = intention
self._with_connection(self._save_atom_detail, ad)
source, clone = self._atomdetail_by_name(atom_name, clone=True)
clone.intention = intention
self._with_connection(self._save_atom_detail, source, clone)
@lock_utils.read_locked
def get_atom_intention(self, atom_name):
"""Gets the intention of an atom given an atoms name."""
ad = self._atomdetail_by_name(atom_name)
return ad.intention
source, _clone = self._atomdetail_by_name(atom_name)
return source.intention
@lock_utils.read_locked
def get_atoms_states(self, atom_names):
@@ -360,11 +376,12 @@ class Storage(object):
@lock_utils.write_locked
def _update_atom_metadata(self, atom_name, update_with,
expected_type=None):
ad = self._atomdetail_by_name(atom_name,
expected_type=expected_type)
source, clone = self._atomdetail_by_name(atom_name,
expected_type=expected_type,
clone=True)
if update_with:
ad.meta.update(update_with)
self._with_connection(self._save_atom_detail, ad)
clone.meta.update(update_with)
self._with_connection(self._save_atom_detail, source, clone)
def update_atom_metadata(self, atom_name, update_with):
"""Updates a atoms associated metadata.
@@ -407,10 +424,10 @@ class Storage(object):
:param task_name: tasks name
:returns: current task progress value
"""
ad = self._atomdetail_by_name(task_name,
expected_type=logbook.TaskDetail)
source, _clone = self._atomdetail_by_name(
task_name, expected_type=logbook.TaskDetail)
try:
return ad.meta[META_PROGRESS]
return source.meta[META_PROGRESS]
except KeyError:
return 0.0
@@ -422,10 +439,10 @@ class Storage(object):
:returns: None if progress_details not defined, else progress_details
dict
"""
ad = self._atomdetail_by_name(task_name,
expected_type=logbook.TaskDetail)
source, _clone = self._atomdetail_by_name(
task_name, expected_type=logbook.TaskDetail)
try:
return ad.meta[META_PROGRESS_DETAILS]
return source.meta[META_PROGRESS_DETAILS]
except KeyError:
return None
@@ -449,23 +466,24 @@ class Storage(object):
@lock_utils.write_locked
def save(self, atom_name, data, state=states.SUCCESS):
"""Put result for atom with id 'uuid' to storage."""
ad = self._atomdetail_by_name(atom_name)
ad.put(state, data)
source, clone = self._atomdetail_by_name(atom_name, clone=True)
clone.put(state, data)
result = self._with_connection(self._save_atom_detail, source, clone)
if state == states.FAILURE and isinstance(data, failure.Failure):
# NOTE(imelnikov): failure serialization looses information,
# so we cache failures here, in atom name -> failure mapping.
self._failures[ad.name] = data
# so we cache failures here, in atom name -> failure mapping so
# that we can later use the better version on fetch/get.
self._failures[result.name] = data
else:
self._check_all_results_provided(ad.name, data)
self._with_connection(self._save_atom_detail, ad)
self._check_all_results_provided(result.name, data)
@lock_utils.write_locked
def save_retry_failure(self, retry_name, failed_atom_name, failure):
"""Save subflow failure to retry controller history."""
ad = self._atomdetail_by_name(retry_name,
expected_type=logbook.RetryDetail)
source, clone = self._atomdetail_by_name(
retry_name, expected_type=logbook.RetryDetail, clone=True)
try:
failures = ad.last_failures
failures = clone.last_failures
except exceptions.NotFound as e:
raise exceptions.StorageFailure("Unable to fetch most recent"
" retry failures so new retry"
@@ -473,32 +491,35 @@ class Storage(object):
else:
if failed_atom_name not in failures:
failures[failed_atom_name] = failure
self._with_connection(self._save_atom_detail, ad)
self._with_connection(self._save_atom_detail, source, clone)
@lock_utils.write_locked
def cleanup_retry_history(self, retry_name, state):
"""Cleanup history of retry atom with given name."""
ad = self._atomdetail_by_name(retry_name,
expected_type=logbook.RetryDetail)
ad.state = state
ad.results = []
self._with_connection(self._save_atom_detail, ad)
source, clone = self._atomdetail_by_name(
retry_name, expected_type=logbook.RetryDetail, clone=True)
clone.state = state
clone.results = []
self._with_connection(self._save_atom_detail, source, clone)
@lock_utils.read_locked
def _get(self, atom_name, only_last=False):
ad = self._atomdetail_by_name(atom_name)
if ad.failure is not None:
source, _clone = self._atomdetail_by_name(atom_name)
if source.failure is not None:
cached = self._failures.get(atom_name)
if ad.failure.matches(cached):
if source.failure.matches(cached):
# Try to give the version back that should have the backtrace
# instead of one that has it stripped (since backtraces are not
# serializable).
return cached
return ad.failure
if ad.state not in STATES_WITH_RESULTS:
return source.failure
if source.state not in STATES_WITH_RESULTS:
raise exceptions.NotFound("Result for atom %s is not currently"
" known" % atom_name)
if only_last:
return ad.last_results
return source.last_results
else:
return ad.results
return source.results
def get(self, atom_name):
"""Gets the results for an atom with a given name from storage."""
@@ -516,21 +537,17 @@ class Storage(object):
"""Returns True if there are failed tasks in the storage."""
return bool(self._failures)
def _reset_atom(self, ad, state):
if ad.name == self.injector_name:
return False
if ad.state == state:
return False
ad.reset(state)
self._failures.pop(ad.name, None)
return True
@lock_utils.write_locked
def reset(self, atom_name, state=states.PENDING):
"""Reset atom with given name (if the task is in a given state)."""
ad = self._atomdetail_by_name(atom_name)
if self._reset_atom(ad, state):
self._with_connection(self._save_atom_detail, ad)
"""Reset atom with given name (if the atom is not in a given state)."""
if atom_name == self.injector_name:
return
source, clone = self._atomdetail_by_name(atom_name, clone=True)
if source.state == state:
return
clone.reset(state)
result = self._with_connection(self._save_atom_detail, source, clone)
self._failures.pop(result.name, None)
def inject_atom_args(self, atom_name, pairs, transient=True):
"""Add values into storage for a specific atom only.
@@ -574,13 +591,13 @@ class Storage(object):
self._injected_args[atom_name].update(pairs)
def save_persistent():
ad = self._atomdetail_by_name(atom_name)
injected = ad.meta.get(META_INJECTED)
source, clone = self._atomdetail_by_name(atom_name, clone=True)
injected = source.meta.get(META_INJECTED)
if not injected:
injected = {}
injected.update(pairs)
ad.meta[META_INJECTED] = injected
self._with_connection(self._save_atom_detail, ad)
clone.meta[META_INJECTED] = injected
self._with_connection(self._save_atom_detail, source, clone)
with self._lock.write_lock():
if transient:
@@ -618,20 +635,22 @@ class Storage(object):
def save_persistent():
try:
ad = self._atomdetail_by_name(self.injector_name,
expected_type=logbook.TaskDetail)
source, clone = self._atomdetail_by_name(
self.injector_name,
expected_type=logbook.TaskDetail,
clone=True)
except exceptions.NotFound:
uuid = uuidutils.generate_uuid()
self._create_atom_detail(logbook.TaskDetail,
self.injector_name, uuid)
ad = self._atomdetail_by_name(self.injector_name,
expected_type=logbook.TaskDetail)
ad.results = dict(pairs)
ad.state = states.SUCCESS
source = self._create_atom_detail(logbook.TaskDetail,
self.injector_name,
uuidutils.generate_uuid())
clone = source
clone.results = dict(pairs)
clone.state = states.SUCCESS
else:
ad.results.update(pairs)
self._with_connection(self._save_atom_detail, ad)
return (self.injector_name, six.iterkeys(ad.results))
clone.results.update(pairs)
result = self._with_connection(self._save_atom_detail,
source, clone)
return (self.injector_name, six.iterkeys(result.results))
def save_transient():
self._transients.update(pairs)
@@ -641,6 +660,7 @@ class Storage(object):
provider_name, names = save_transient()
else:
provider_name, names = save_persistent()
self._set_result_mapping(provider_name,
dict((name, name) for name in names))
@@ -750,14 +770,14 @@ class Storage(object):
providers.append(p)
return providers
ad = self._atomdetail_by_name(atom_name)
source, _clone = self._atomdetail_by_name(atom_name)
if scope_walker is None:
scope_walker = self._scope_fetcher(atom_name)
if optional_args is None:
optional_args = []
injected_sources = [
self._injected_args.get(atom_name, {}),
ad.meta.get(META_INJECTED, {}),
source.meta.get(META_INJECTED, {}),
]
missing = set(six.iterkeys(args_mapping))
for (bound_name, name) in six.iteritems(args_mapping):
@@ -856,10 +876,10 @@ class Storage(object):
if optional_args is None:
optional_args = []
if atom_name:
ad = self._atomdetail_by_name(atom_name)
source, _clone = self._atomdetail_by_name(atom_name)
injected_sources = [
self._injected_args.get(atom_name, {}),
ad.meta.get(META_INJECTED, {}),
source.meta.get(META_INJECTED, {}),
]
if scope_walker is None:
scope_walker = self._scope_fetcher(atom_name)
@@ -917,20 +937,23 @@ class Storage(object):
@lock_utils.write_locked
def set_flow_state(self, state):
"""Set flow details state and save it."""
self._flowdetail.state = state
self._with_connection(self._save_flow_detail)
source, clone = self._fetch_flowdetail(clone=True)
clone.state = state
self._with_connection(self._save_flow_detail, source, clone)
@lock_utils.write_locked
def update_flow_metadata(self, update_with):
"""Update flowdetails metadata and save it."""
if update_with:
self._flowdetail.meta.update(update_with)
self._with_connection(self._save_flow_detail)
source, clone = self._fetch_flowdetail(clone=True)
clone.meta.update(update_with)
self._with_connection(self._save_flow_detail, source, clone)
@lock_utils.read_locked
def get_flow_state(self):
"""Get state from flow details."""
state = self._flowdetail.state
source = self._flowdetail
state = source.state
if state is None:
state = states.PENDING
return state
@@ -951,9 +974,9 @@ class Storage(object):
@lock_utils.read_locked
def get_retry_history(self, retry_name):
"""Fetch a single retrys history."""
ad = self._atomdetail_by_name(retry_name,
expected_type=logbook.RetryDetail)
return self._translate_into_history(ad)
source, _clone = self._atomdetail_by_name(
retry_name, expected_type=logbook.RetryDetail)
return self._translate_into_history(source)
@lock_utils.read_locked
def get_retry_histories(self):