Add + use read/write lock decorators

When a whole function should be guarded by a read or
write lock, it is quite useful to have decorators that
acquire the provided lock, invoke the wrapped function,
and then release the lock on exit.

This change switches the storage module to use
these decorators where appropriate (mainly in
places where the read or write lock is held
for the duration of the function's call, in which
case it is clearer to just use the new decorators
instead).

Change-Id: I70d2c1ab478e9b7da9446482a4ffb28f6f5227b7
This commit is contained in:
Joshua Harlow
2015-03-08 18:28:44 -07:00
parent bd1dfce36a
commit 20fdbba141
2 changed files with 256 additions and 202 deletions

View File

@@ -290,53 +290,53 @@ class Storage(object):
# do this update.
atom_detail.update(conn.update_atom_details(atom_detail))
@lock_utils.read_locked
def get_atom_uuid(self, atom_name):
    """Gets an atom's uuid given an atom's name.

    :param atom_name: name of the atom to look up
    :returns: uuid of the matching atom detail
    :raises: NotFound (from ``_atomdetail_by_name``) if no such atom exists
    """
    ad = self._atomdetail_by_name(atom_name)
    return ad.uuid
@lock_utils.write_locked
def set_atom_state(self, atom_name, state):
    """Sets an atom's state and persists the change.

    :param atom_name: name of the atom to update
    :param state: new state to record for that atom
    """
    ad = self._atomdetail_by_name(atom_name)
    ad.state = state
    self._with_connection(self._save_atom_detail, ad)
@lock_utils.read_locked
def get_atom_state(self, atom_name):
    """Gets the state of an atom given an atom's name.

    :param atom_name: name of the atom to look up
    :returns: the currently recorded state of that atom
    """
    ad = self._atomdetail_by_name(atom_name)
    return ad.state
@lock_utils.write_locked
def set_atom_intention(self, atom_name, intention):
    """Sets the intention of an atom given an atom's name.

    :param atom_name: name of the atom to update
    :param intention: new intention to record (and persist) for that atom
    """
    ad = self._atomdetail_by_name(atom_name)
    ad.intention = intention
    self._with_connection(self._save_atom_detail, ad)
@lock_utils.read_locked
def get_atom_intention(self, atom_name):
    """Gets the intention of an atom given an atom's name.

    :param atom_name: name of the atom to look up
    :returns: the currently recorded intention of that atom
    """
    ad = self._atomdetail_by_name(atom_name)
    return ad.intention
@lock_utils.read_locked
def get_atoms_states(self, atom_names):
    """Gets all atoms states given a set of names.

    :param atom_names: iterable of atom names to look up
    :returns: dict mapping each name to a ``(state, intention)`` tuple
    """
    # NOTE(review): the per-atom getters also take the read lock; this
    # assumes the rw-lock supports reentrant read acquisition.
    return dict((name, (self.get_atom_state(name),
                        self.get_atom_intention(name)))
                for name in atom_names)
@lock_utils.write_locked
def _update_atom_metadata(self, atom_name, update_with,
                          expected_type=None):
    # Merges ``update_with`` into the atom's metadata and persists the
    # atom detail; a falsy/empty ``update_with`` is a no-op (nothing is
    # saved in that case).
    ad = self._atomdetail_by_name(atom_name,
                                  expected_type=expected_type)
    if update_with:
        ad.meta.update(update_with)
        self._with_connection(self._save_atom_detail, ad)
def update_atom_metadata(self, atom_name, update_with):
"""Updates a atoms associated metadata.
@@ -372,20 +372,21 @@ class Storage(object):
self._update_atom_metadata(task_name, update_with,
expected_type=logbook.TaskDetail)
@lock_utils.read_locked
def get_task_progress(self, task_name):
    """Get the progress of a task given a task's name.

    :param task_name: task's name
    :returns: current task progress value (0.0 when none recorded yet)
    """
    ad = self._atomdetail_by_name(task_name,
                                  expected_type=logbook.TaskDetail)
    try:
        return ad.meta['progress']
    except KeyError:
        # No progress has been reported yet; default to the start value.
        return 0.0
@lock_utils.read_locked
def get_task_progress_details(self, task_name):
    """Get the progress details of a task given a task's name.

    :param task_name: task's name
    :returns: None if progress_details not defined, else progress_details
              dict
    """
    ad = self._atomdetail_by_name(task_name,
                                  expected_type=logbook.TaskDetail)
    try:
        return ad.meta['progress_details']
    except KeyError:
        return None
def _check_all_results_provided(self, atom_name, container):
"""Warn if an atom did not provide some of its expected results.
@@ -418,76 +418,75 @@ class Storage(object):
LOG.warning("Atom %s did not supply result "
"with index %r (name %s)", atom_name, index, name)
@lock_utils.write_locked
def save(self, atom_name, data, state=states.SUCCESS):
    """Put result for atom with id 'uuid' to storage.

    :param atom_name: name of the atom the result belongs to
    :param data: the result (or a ``failure.Failure``) to record
    :param state: state to record alongside the result
    """
    ad = self._atomdetail_by_name(atom_name)
    ad.put(state, data)
    if state == states.FAILURE and isinstance(data, failure.Failure):
        # NOTE(imelnikov): failure serialization loses information,
        # so we cache failures here, in atom name -> failure mapping.
        self._failures[ad.name] = data
    else:
        self._check_all_results_provided(ad.name, data)
    self._with_connection(self._save_atom_detail, ad)
@lock_utils.write_locked
def save_retry_failure(self, retry_name, failed_atom_name, failure):
    """Save subflow failure to retry controller history.

    :param retry_name: name of the retry atom that owns the history
    :param failed_atom_name: name of the atom that failed
    :param failure: the failure object to insert (only inserted and
                    persisted if not already present for that atom)
    """
    ad = self._atomdetail_by_name(retry_name,
                                  expected_type=logbook.RetryDetail)
    try:
        failures = ad.last_failures
    except exceptions.NotFound as e:
        raise exceptions.StorageFailure("Unable to fetch most recent"
                                        " retry failures so new retry"
                                        " failure can be inserted", e)
    else:
        if failed_atom_name not in failures:
            failures[failed_atom_name] = failure
            self._with_connection(self._save_atom_detail, ad)
@lock_utils.write_locked
def cleanup_retry_history(self, retry_name, state):
    """Cleanup history of retry atom with given name.

    :param retry_name: name of the retry atom whose history is cleared
    :param state: state to leave the retry atom in after clearing
    """
    ad = self._atomdetail_by_name(retry_name,
                                  expected_type=logbook.RetryDetail)
    ad.state = state
    ad.results = []
    self._with_connection(self._save_atom_detail, ad)
@lock_utils.read_locked
def _get(self, atom_name, only_last=False):
    # Fetch the result (or failure) recorded for the named atom.
    ad = self._atomdetail_by_name(atom_name)
    if ad.failure is not None:
        cached = self._failures.get(atom_name)
        if ad.failure.matches(cached):
            # Prefer the in-memory copy since serialization may have
            # dropped information from the persisted failure.
            return cached
        return ad.failure
    if ad.state not in STATES_WITH_RESULTS:
        raise exceptions.NotFound("Result for atom %s is not currently"
                                  " known" % atom_name)
    if only_last:
        return ad.last_results
    else:
        return ad.results
def get(self, atom_name):
    """Gets the results for an atom with a given name from storage."""
    # Thin public wrapper; all the lookup logic lives in _get().
    results = self._get(atom_name)
    return results
@lock_utils.read_locked
def get_failures(self):
    """Get list of failures that happened with this flow.

    No order guaranteed.

    :returns: a copy of the atom name -> failure mapping
    """
    # Copy so that callers cannot mutate our internal failure mapping.
    return self._failures.copy()
@lock_utils.read_locked
def has_failures(self):
    """Returns True if there are failed tasks in the storage."""
    # Decorated read lock replaces the prior inline read_lock() usage,
    # matching the locking style of the sibling accessors.
    return bool(self._failures)
def _reset_atom(self, ad, state):
if ad.name == self.injector_name:
@@ -498,12 +497,12 @@ class Storage(object):
self._failures.pop(ad.name, None)
return True
@lock_utils.write_locked
def reset(self, atom_name, state=states.PENDING):
    """Reset atom with given name (if the task is in a given state).

    :param atom_name: name of the atom to reset
    :param state: state the atom must be in for the reset to occur
    """
    ad = self._atomdetail_by_name(atom_name)
    # Only persist if the reset actually changed something.
    if self._reset_atom(ad, state):
        self._with_connection(self._save_atom_detail, ad)
def inject_atom_args(self, atom_name, pairs):
"""Add **transient** values into storage for a specific atom only.
@@ -541,6 +540,7 @@ class Storage(object):
self._injected_args.setdefault(atom_name, {})
self._injected_args[atom_name].update(pairs)
@lock_utils.write_locked
def inject(self, pairs, transient=False):
"""Add values into storage.
@@ -589,13 +589,12 @@ class Storage(object):
self._transients.update(pairs)
return (_TRANSIENT_PROVIDER, six.iterkeys(self._transients))
with self._lock.write_lock():
if transient:
provider_name, names = save_transient()
else:
provider_name, names = save_persistent()
self._set_result_mapping(provider_name,
dict((name, name) for name in names))
if transient:
provider_name, names = save_transient()
else:
provider_name, names = save_persistent()
self._set_result_mapping(provider_name,
dict((name, name) for name in names))
def _set_result_mapping(self, provider_name, mapping):
"""Sets the result mapping for a given producer.
@@ -615,6 +614,7 @@ class Storage(object):
if provider not in entries:
entries.append(provider)
@lock_utils.read_locked
def fetch(self, name, many_handler=None):
    """Fetch a named result.

    :param name: name the result was produced/mapped under
    :param many_handler: optional callable given the list of located
                         values; by default the first value is returned
    :raises: NotFound when no provider maps the name or no provider has
             yet produced a value for it
    """
    # By default we just return the first of many (unless provided
    # a different callback that can translate many results into
    # something more meaningful).
    if many_handler is None:
        many_handler = lambda values: values[0]
    try:
        providers = self._reverse_mapping[name]
    except KeyError:
        raise exceptions.NotFound("Name %r is not mapped as a"
                                  " produced output by any"
                                  " providers" % name)
    values = []
    for provider in providers:
        if provider.name is _TRANSIENT_PROVIDER:
            values.append(_item_from_single(provider,
                                            self._transients, name))
        else:
            try:
                container = self._get(provider.name, only_last=True)
            except exceptions.NotFound:
                # Provider has not produced anything yet; keep looking.
                pass
            else:
                values.append(_item_from_single(provider,
                                                container, name))
    if not values:
        raise exceptions.NotFound("Unable to find result %r,"
                                  " searched %s" % (name, providers))
    else:
        return many_handler(values)
@lock_utils.read_locked
def fetch_all(self):
    """Fetch all named results known so far.

    :returns: dict of name -> result for every name that currently has
              at least one produced value (names without results are
              silently skipped)
    """
    def many_handler(values):
        # Collapse single values; keep lists when many were produced.
        if len(values) > 1:
            return values
        return values[0]

    results = {}
    for name in six.iterkeys(self._reverse_mapping):
        try:
            results[name] = self.fetch(name, many_handler=many_handler)
        except exceptions.NotFound:
            pass
    return results
@lock_utils.read_locked
def fetch_mapped_args(self, args_mapping,
atom_name=None, scope_walker=None,
optional_args=None):
@@ -708,66 +708,65 @@ class Storage(object):
for p in providers]
return []
with self._lock.read_lock():
if optional_args is None:
optional_args = []
if atom_name and atom_name not in self._atom_name_to_uuid:
raise exceptions.NotFound("Unknown atom name: %s" % atom_name)
if not args_mapping:
return {}
if atom_name:
injected_args = self._injected_args.get(atom_name, {})
else:
injected_args = {}
mapped_args = {}
for (bound_name, name) in six.iteritems(args_mapping):
if LOG.isEnabledFor(logging.BLATHER):
if atom_name:
LOG.blather("Looking for %r <= %r for atom named: %s",
bound_name, name, atom_name)
else:
LOG.blather("Looking for %r <= %r", bound_name, name)
if name in injected_args:
value = injected_args[name]
mapped_args[bound_name] = value
LOG.blather("Matched %r <= %r to %r (from injected"
" values)", bound_name, name, value)
if optional_args is None:
optional_args = []
if atom_name and atom_name not in self._atom_name_to_uuid:
raise exceptions.NotFound("Unknown atom name: %s" % atom_name)
if not args_mapping:
return {}
if atom_name:
injected_args = self._injected_args.get(atom_name, {})
else:
injected_args = {}
mapped_args = {}
for (bound_name, name) in six.iteritems(args_mapping):
if LOG.isEnabledFor(logging.BLATHER):
if atom_name:
LOG.blather("Looking for %r <= %r for atom named: %s",
bound_name, name, atom_name)
else:
try:
possible_providers = self._reverse_mapping[name]
except KeyError:
if bound_name in optional_args:
continue
raise exceptions.NotFound("Name %r is not mapped as a"
" produced output by any"
" providers" % name)
# Reduce the possible providers to one that are allowed.
providers = _locate_providers(name, possible_providers)
if not providers:
raise exceptions.NotFound(
"Mapped argument %r <= %r was not produced"
" by any accessible provider (%s possible"
" providers were scanned)"
% (bound_name, name, len(possible_providers)))
provider, value = _item_from_first_of(providers, name)
mapped_args[bound_name] = value
LOG.blather("Matched %r <= %r to %r (from %s)",
bound_name, name, value, provider)
return mapped_args
LOG.blather("Looking for %r <= %r", bound_name, name)
if name in injected_args:
value = injected_args[name]
mapped_args[bound_name] = value
LOG.blather("Matched %r <= %r to %r (from injected"
" values)", bound_name, name, value)
else:
try:
possible_providers = self._reverse_mapping[name]
except KeyError:
if bound_name in optional_args:
continue
raise exceptions.NotFound("Name %r is not mapped as a"
" produced output by any"
" providers" % name)
# Reduce the possible providers to one that are allowed.
providers = _locate_providers(name, possible_providers)
if not providers:
raise exceptions.NotFound(
"Mapped argument %r <= %r was not produced"
" by any accessible provider (%s possible"
" providers were scanned)"
% (bound_name, name, len(possible_providers)))
provider, value = _item_from_first_of(providers, name)
mapped_args[bound_name] = value
LOG.blather("Matched %r <= %r to %r (from %s)",
bound_name, name, value, provider)
return mapped_args
@lock_utils.write_locked
def set_flow_state(self, state):
    """Set flow details state and save it.

    :param state: new state to record on the flow detail
    """
    self._flowdetail.state = state
    self._with_connection(self._save_flow_detail)
@lock_utils.read_locked
def get_flow_state(self):
    """Get state from flow details.

    :returns: the stored flow state, or PENDING when none has been set
    """
    state = self._flowdetail.state
    if state is None:
        state = states.PENDING
    return state
def _translate_into_history(self, ad):
failure = None
@@ -782,19 +781,19 @@ class Storage(object):
failure = ad.failure
return retry.History(ad.results, failure=failure)
@lock_utils.read_locked
def get_retry_history(self, retry_name):
    """Fetch a single retry's history.

    :param retry_name: name of the retry atom
    :returns: history object built from that atom's detail
    """
    ad = self._atomdetail_by_name(retry_name,
                                  expected_type=logbook.RetryDetail)
    return self._translate_into_history(ad)
@lock_utils.read_locked
def get_retry_histories(self):
    """Fetch all retrys' histories.

    :returns: list of ``(name, history)`` tuples, one per retry detail
              found in the flow detail
    """
    histories = []
    for ad in self._flowdetail:
        if isinstance(ad, logbook.RetryDetail):
            histories.append((ad.name,
                              self._translate_into_history(ad)))
    return histories

View File

@@ -64,15 +64,10 @@ def locked(*args, **kwargs):
activates the given lock or list of locks as a context manager,
automatically releasing that lock on exit.
NOTE(harlowja): if no attribute is provided then by default the attribute
named '_lock' is looked for in the instance object this decorator is
attached to.
NOTE(harlowja): when we get the wrapt module approved we can address the
correctness of this decorator with regards to classmethods, to keep sanity
and correctness it is recommended to avoid using this on classmethods, once
https://review.openstack.org/#/c/94754/ is merged this will be refactored
and that use-case can be provided in a correct manner.
NOTE(harlowja): if no attribute name is provided then by default the
attribute named '_lock' is looked for (this attribute is expected to be
the lock/list of locks object/s) in the instance object this decorator
is attached to.
"""
def decorator(f):
@@ -101,6 +96,66 @@ def locked(*args, **kwargs):
return decorator
def read_locked(*args, **kwargs):
    """Acquires & releases a read lock around call into decorated method.

    NOTE(harlowja): if no attribute name is provided then by default the
    attribute named '_lock' is looked for (this attribute is expected to be
    the rw-lock object) in the instance object this decorator is attached to.
    """
    def decorator(f):
        attr_name = kwargs.get('lock', '_lock')
        # NOTE: functools.wraps is sufficient for plain functions; the
        # six.wraps shim is only needed for py2 partial-object quirks.
        import functools

        @functools.wraps(f)
        def wrapper(self, *args, **kwargs):
            rw_lock = getattr(self, attr_name)
            with rw_lock.read_lock():
                return f(self, *args, **kwargs)

        return wrapper

    # This is needed to handle when the decorator has args or the decorator
    # doesn't have args, python is rather weird here...
    if kwargs or not args:
        return decorator
    else:
        if len(args) == 1:
            return decorator(args[0])
        else:
            return decorator
def write_locked(*args, **kwargs):
    """Acquires & releases a write lock around call into decorated method.

    NOTE(harlowja): if no attribute name is provided then by default the
    attribute named '_lock' is looked for (this attribute is expected to be
    the rw-lock object) in the instance object this decorator is attached to.
    """
    def decorator(f):
        attr_name = kwargs.get('lock', '_lock')
        # NOTE: functools.wraps is sufficient for plain functions; the
        # six.wraps shim is only needed for py2 partial-object quirks.
        import functools

        @functools.wraps(f)
        def wrapper(self, *args, **kwargs):
            rw_lock = getattr(self, attr_name)
            with rw_lock.write_lock():
                return f(self, *args, **kwargs)

        return wrapper

    # This is needed to handle when the decorator has args or the decorator
    # doesn't have args, python is rather weird here...
    if kwargs or not args:
        return decorator
    else:
        if len(args) == 1:
            return decorator(args[0])
        else:
            return decorator
class ReaderWriterLock(object):
"""A reader/writer lock.