From 20fdbba14188c384dc0365d6451834b0887a6835 Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Sun, 8 Mar 2015 18:28:44 -0700 Subject: [PATCH] Add + use read/write lock decorators For when a whole function should be locked by a read or write lock it is quite useful to have decorators that acquire and release and wrap the target function with the provided lock. This change switches the storage module to use these decorators where appropriate (mainly in places where the read or write lock is activated for the duration of the functions call, at which point it is clearer to just use the new decorators instead). Change-Id: I70d2c1ab478e9b7da9446482a4ffb28f6f5227b7 --- taskflow/storage.py | 385 +++++++++++++++++------------------ taskflow/utils/lock_utils.py | 73 ++++++- 2 files changed, 256 insertions(+), 202 deletions(-) diff --git a/taskflow/storage.py b/taskflow/storage.py index 4207a662..449aafb6 100644 --- a/taskflow/storage.py +++ b/taskflow/storage.py @@ -290,53 +290,53 @@ class Storage(object): # do this update. atom_detail.update(conn.update_atom_details(atom_detail)) + @lock_utils.read_locked def get_atom_uuid(self, atom_name): """Gets an atoms uuid given a atoms name.""" - with self._lock.read_lock(): - ad = self._atomdetail_by_name(atom_name) - return ad.uuid + ad = self._atomdetail_by_name(atom_name) + return ad.uuid + @lock_utils.write_locked def set_atom_state(self, atom_name, state): """Sets an atoms state.""" - with self._lock.write_lock(): - ad = self._atomdetail_by_name(atom_name) - ad.state = state - self._with_connection(self._save_atom_detail, ad) + ad = self._atomdetail_by_name(atom_name) + ad.state = state + self._with_connection(self._save_atom_detail, ad) + @lock_utils.read_locked def get_atom_state(self, atom_name): """Gets the state of an atom given an atoms name.""" - with self._lock.read_lock(): - ad = self._atomdetail_by_name(atom_name) - return ad.state + ad = self._atomdetail_by_name(atom_name) + return ad.state + @lock_utils.write_locked def set_atom_intention(self, atom_name, intention): """Sets the intention of an atom given an atoms name.""" - with self._lock.write_lock(): - ad = self._atomdetail_by_name(atom_name) - ad.intention = intention - self._with_connection(self._save_atom_detail, ad) + ad = self._atomdetail_by_name(atom_name) + ad.intention = intention + self._with_connection(self._save_atom_detail, ad) + @lock_utils.read_locked def get_atom_intention(self, atom_name): """Gets the intention of an atom given an atoms name.""" - with self._lock.read_lock(): - ad = self._atomdetail_by_name(atom_name) - return ad.intention + ad = self._atomdetail_by_name(atom_name) + return ad.intention + @lock_utils.read_locked def get_atoms_states(self, atom_names): """Gets all atoms states given a set of names.""" - with self._lock.read_lock(): - return dict((name, (self.get_atom_state(name), - self.get_atom_intention(name))) - for name in atom_names) + return dict((name, (self.get_atom_state(name), + self.get_atom_intention(name))) + for name in atom_names) + @lock_utils.write_locked def _update_atom_metadata(self, atom_name, update_with, expected_type=None): - with self._lock.write_lock(): - ad = self._atomdetail_by_name(atom_name, - expected_type=expected_type) - if update_with: - ad.meta.update(update_with) - self._with_connection(self._save_atom_detail, ad) + ad = self._atomdetail_by_name(atom_name, + expected_type=expected_type) + if update_with: + ad.meta.update(update_with) + self._with_connection(self._save_atom_detail, ad) def update_atom_metadata(self, atom_name, update_with): """Updates a atoms associated metadata. @@ -372,20 +372,21 @@ class Storage(object): self._update_atom_metadata(task_name, update_with, expected_type=logbook.TaskDetail) + @lock_utils.read_locked def get_task_progress(self, task_name): """Get the progress of a task given a tasks name. :param task_name: tasks name :returns: current task progress value """ - with self._lock.read_lock(): - ad = self._atomdetail_by_name(task_name, - expected_type=logbook.TaskDetail) - try: - return ad.meta['progress'] - except KeyError: - return 0.0 + ad = self._atomdetail_by_name(task_name, + expected_type=logbook.TaskDetail) + try: + return ad.meta['progress'] + except KeyError: + return 0.0 + @lock_utils.read_locked def get_task_progress_details(self, task_name): """Get the progress details of a task given a tasks name. @@ -393,13 +394,12 @@ class Storage(object): :returns: None if progress_details not defined, else progress_details dict """ - with self._lock.read_lock(): - ad = self._atomdetail_by_name(task_name, - expected_type=logbook.TaskDetail) - try: - return ad.meta['progress_details'] - except KeyError: - return None + ad = self._atomdetail_by_name(task_name, + expected_type=logbook.TaskDetail) + try: + return ad.meta['progress_details'] + except KeyError: + return None def _check_all_results_provided(self, atom_name, container): """Warn if an atom did not provide some of its expected results. @@ -418,76 +418,75 @@ class Storage(object): LOG.warning("Atom %s did not supply result " "with index %r (name %s)", atom_name, index, name) + @lock_utils.write_locked def save(self, atom_name, data, state=states.SUCCESS): """Put result for atom with id 'uuid' to storage.""" - with self._lock.write_lock(): - ad = self._atomdetail_by_name(atom_name) - ad.put(state, data) - if state == states.FAILURE and isinstance(data, failure.Failure): - # NOTE(imelnikov): failure serialization looses information, - # so we cache failures here, in atom name -> failure mapping. - self._failures[ad.name] = data - else: - self._check_all_results_provided(ad.name, data) - self._with_connection(self._save_atom_detail, ad) + ad = self._atomdetail_by_name(atom_name) + ad.put(state, data) + if state == states.FAILURE and isinstance(data, failure.Failure): + # NOTE(imelnikov): failure serialization looses information, + # so we cache failures here, in atom name -> failure mapping. + self._failures[ad.name] = data + else: + self._check_all_results_provided(ad.name, data) + self._with_connection(self._save_atom_detail, ad) + @lock_utils.write_locked def save_retry_failure(self, retry_name, failed_atom_name, failure): """Save subflow failure to retry controller history.""" - with self._lock.write_lock(): - ad = self._atomdetail_by_name(retry_name, - expected_type=logbook.RetryDetail) - try: - failures = ad.last_failures - except exceptions.NotFound as e: - raise exceptions.StorageFailure("Unable to fetch most recent" - " retry failures so new retry" - " failure can be inserted", e) - else: - if failed_atom_name not in failures: - failures[failed_atom_name] = failure - self._with_connection(self._save_atom_detail, ad) + ad = self._atomdetail_by_name(retry_name, + expected_type=logbook.RetryDetail) + try: + failures = ad.last_failures + except exceptions.NotFound as e: + raise exceptions.StorageFailure("Unable to fetch most recent" + " retry failures so new retry" + " failure can be inserted", e) + else: + if failed_atom_name not in failures: + failures[failed_atom_name] = failure + self._with_connection(self._save_atom_detail, ad) + @lock_utils.write_locked def cleanup_retry_history(self, retry_name, state): """Cleanup history of retry atom with given name.""" - with self._lock.write_lock(): - ad = self._atomdetail_by_name(retry_name, - expected_type=logbook.RetryDetail) - ad.state = state - ad.results = [] - self._with_connection(self._save_atom_detail, ad) + ad = self._atomdetail_by_name(retry_name, + expected_type=logbook.RetryDetail) + ad.state = state + ad.results = [] + self._with_connection(self._save_atom_detail, ad) + @lock_utils.read_locked def _get(self, atom_name, only_last=False): - with self._lock.read_lock(): - ad = self._atomdetail_by_name(atom_name) - if ad.failure is not None: - cached = self._failures.get(atom_name) - if ad.failure.matches(cached): - return cached - return ad.failure - if ad.state not in STATES_WITH_RESULTS: - raise exceptions.NotFound("Result for atom %s is not currently" - " known" % atom_name) - if only_last: - return ad.last_results - else: - return ad.results + ad = self._atomdetail_by_name(atom_name) + if ad.failure is not None: + cached = self._failures.get(atom_name) + if ad.failure.matches(cached): + return cached + return ad.failure + if ad.state not in STATES_WITH_RESULTS: + raise exceptions.NotFound("Result for atom %s is not currently" + " known" % atom_name) + if only_last: + return ad.last_results + else: + return ad.results def get(self, atom_name): """Gets the results for an atom with a given name from storage.""" return self._get(atom_name) + @lock_utils.read_locked def get_failures(self): """Get list of failures that happened with this flow. No order guaranteed. """ - with self._lock.read_lock(): - return self._failures.copy() + return self._failures.copy() def has_failures(self): """Returns True if there are failed tasks in the storage.""" - with self._lock.read_lock(): - return bool(self._failures) + return bool(self._failures) def _reset_atom(self, ad, state): if ad.name == self.injector_name: @@ -498,12 +497,12 @@ class Storage(object): self._failures.pop(ad.name, None) return True + @lock_utils.write_locked def reset(self, atom_name, state=states.PENDING): """Reset atom with given name (if the task is in a given state).""" - with self._lock.write_lock(): - ad = self._atomdetail_by_name(atom_name) - if self._reset_atom(ad, state): - self._with_connection(self._save_atom_detail, ad) + ad = self._atomdetail_by_name(atom_name) + if self._reset_atom(ad, state): + self._with_connection(self._save_atom_detail, ad) def inject_atom_args(self, atom_name, pairs): """Add **transient** values into storage for a specific atom only. @@ -541,6 +540,7 @@ class Storage(object): self._injected_args.setdefault(atom_name, {}) self._injected_args[atom_name].update(pairs) + @lock_utils.write_locked def inject(self, pairs, transient=False): """Add values into storage. @@ -589,13 +589,12 @@ class Storage(object): self._transients.update(pairs) return (_TRANSIENT_PROVIDER, six.iterkeys(self._transients)) - with self._lock.write_lock(): - if transient: - provider_name, names = save_transient() - else: - provider_name, names = save_persistent() - self._set_result_mapping(provider_name, - dict((name, name) for name in names)) + if transient: + provider_name, names = save_transient() + else: + provider_name, names = save_persistent() + self._set_result_mapping(provider_name, + dict((name, name) for name in names)) def _set_result_mapping(self, provider_name, mapping): """Sets the result mapping for a given producer. @@ -615,6 +614,7 @@ class Storage(object): if provider not in entries: entries.append(provider) + @lock_utils.read_locked def fetch(self, name, many_handler=None): """Fetch a named result.""" # By default we just return the first of many (unless provided @@ -622,32 +622,32 @@ class Storage(object): # more meaningful). if many_handler is None: many_handler = lambda values: values[0] - with self._lock.read_lock(): - try: - providers = self._reverse_mapping[name] - except KeyError: - raise exceptions.NotFound("Name %r is not mapped as a" - " produced output by any" - " providers" % name) - values = [] - for provider in providers: - if provider.name is _TRANSIENT_PROVIDER: - values.append(_item_from_single(provider, - self._transients, name)) - else: - try: - container = self._get(provider.name, only_last=True) - except exceptions.NotFound: - pass - else: - values.append(_item_from_single(provider, - container, name)) - if not values: - raise exceptions.NotFound("Unable to find result %r," - " searched %s" % (name, providers)) + try: + providers = self._reverse_mapping[name] + except KeyError: + raise exceptions.NotFound("Name %r is not mapped as a" + " produced output by any" + " providers" % name) + values = [] + for provider in providers: + if provider.name is _TRANSIENT_PROVIDER: + values.append(_item_from_single(provider, + self._transients, name)) else: - return many_handler(values) + try: + container = self._get(provider.name, only_last=True) + except exceptions.NotFound: + pass + else: + values.append(_item_from_single(provider, + container, name)) + if not values: + raise exceptions.NotFound("Unable to find result %r," + " searched %s" % (name, providers)) + else: + return many_handler(values) + @lock_utils.read_locked def fetch_all(self): """Fetch all named results known so far. @@ -657,15 +657,15 @@ class Storage(object): if len(values) > 1: return values return values[0] - with self._lock.read_lock(): - results = {} - for name in six.iterkeys(self._reverse_mapping): - try: - results[name] = self.fetch(name, many_handler=many_handler) - except exceptions.NotFound: - pass - return results + results = {} + for name in six.iterkeys(self._reverse_mapping): + try: + results[name] = self.fetch(name, many_handler=many_handler) + except exceptions.NotFound: + pass + return results + @lock_utils.read_locked def fetch_mapped_args(self, args_mapping, atom_name=None, scope_walker=None, optional_args=None): @@ -708,66 +708,65 @@ class Storage(object): for p in providers] return [] - with self._lock.read_lock(): - if optional_args is None: - optional_args = [] - if atom_name and atom_name not in self._atom_name_to_uuid: - raise exceptions.NotFound("Unknown atom name: %s" % atom_name) - if not args_mapping: - return {} - if atom_name: - injected_args = self._injected_args.get(atom_name, {}) - else: - injected_args = {} - mapped_args = {} - for (bound_name, name) in six.iteritems(args_mapping): - if LOG.isEnabledFor(logging.BLATHER): - if atom_name: - LOG.blather("Looking for %r <= %r for atom named: %s", - bound_name, name, atom_name) - else: - LOG.blather("Looking for %r <= %r", bound_name, name) - if name in injected_args: - value = injected_args[name] - mapped_args[bound_name] = value - LOG.blather("Matched %r <= %r to %r (from injected" - " values)", bound_name, name, value) + if optional_args is None: + optional_args = [] + if atom_name and atom_name not in self._atom_name_to_uuid: + raise exceptions.NotFound("Unknown atom name: %s" % atom_name) + if not args_mapping: + return {} + if atom_name: + injected_args = self._injected_args.get(atom_name, {}) + else: + injected_args = {} + mapped_args = {} + for (bound_name, name) in six.iteritems(args_mapping): + if LOG.isEnabledFor(logging.BLATHER): + if atom_name: + LOG.blather("Looking for %r <= %r for atom named: %s", + bound_name, name, atom_name) else: - try: - possible_providers = self._reverse_mapping[name] - except KeyError: - if bound_name in optional_args: - continue - raise exceptions.NotFound("Name %r is not mapped as a" - " produced output by any" - " providers" % name) - # Reduce the possible providers to one that are allowed. - providers = _locate_providers(name, possible_providers) - if not providers: - raise exceptions.NotFound( - "Mapped argument %r <= %r was not produced" - " by any accessible provider (%s possible" - " providers were scanned)" - % (bound_name, name, len(possible_providers))) - provider, value = _item_from_first_of(providers, name) - mapped_args[bound_name] = value - LOG.blather("Matched %r <= %r to %r (from %s)", - bound_name, name, value, provider) - return mapped_args + LOG.blather("Looking for %r <= %r", bound_name, name) + if name in injected_args: + value = injected_args[name] + mapped_args[bound_name] = value + LOG.blather("Matched %r <= %r to %r (from injected" + " values)", bound_name, name, value) + else: + try: + possible_providers = self._reverse_mapping[name] + except KeyError: + if bound_name in optional_args: + continue + raise exceptions.NotFound("Name %r is not mapped as a" + " produced output by any" + " providers" % name) + # Reduce the possible providers to one that are allowed. + providers = _locate_providers(name, possible_providers) + if not providers: + raise exceptions.NotFound( + "Mapped argument %r <= %r was not produced" + " by any accessible provider (%s possible" + " providers were scanned)" + % (bound_name, name, len(possible_providers))) + provider, value = _item_from_first_of(providers, name) + mapped_args[bound_name] = value + LOG.blather("Matched %r <= %r to %r (from %s)", + bound_name, name, value, provider) + return mapped_args + @lock_utils.write_locked def set_flow_state(self, state): """Set flow details state and save it.""" - with self._lock.write_lock(): - self._flowdetail.state = state - self._with_connection(self._save_flow_detail) + self._flowdetail.state = state + self._with_connection(self._save_flow_detail) + @lock_utils.read_locked def get_flow_state(self): """Get state from flow details.""" - with self._lock.read_lock(): - state = self._flowdetail.state - if state is None: - state = states.PENDING - return state + state = self._flowdetail.state + if state is None: + state = states.PENDING + return state def _translate_into_history(self, ad): failure = None @@ -782,19 +781,19 @@ class Storage(object): failure = ad.failure return retry.History(ad.results, failure=failure) + @lock_utils.read_locked def get_retry_history(self, retry_name): """Fetch a single retrys history.""" - with self._lock.read_lock(): - ad = self._atomdetail_by_name(retry_name, - expected_type=logbook.RetryDetail) - return self._translate_into_history(ad) + ad = self._atomdetail_by_name(retry_name, + expected_type=logbook.RetryDetail) + return self._translate_into_history(ad) + @lock_utils.read_locked def get_retry_histories(self): """Fetch all retrys histories.""" histories = [] - with self._lock.read_lock(): - for ad in self._flowdetail: - if isinstance(ad, logbook.RetryDetail): - histories.append((ad.name, - self._translate_into_history(ad))) + for ad in self._flowdetail: + if isinstance(ad, logbook.RetryDetail): + histories.append((ad.name, + self._translate_into_history(ad))) return histories diff --git a/taskflow/utils/lock_utils.py b/taskflow/utils/lock_utils.py index d7312088..726e10f9 100644 --- a/taskflow/utils/lock_utils.py +++ b/taskflow/utils/lock_utils.py @@ -64,15 +64,10 @@ def locked(*args, **kwargs): activates the given lock or list of locks as a context manager, automatically releasing that lock on exit. - NOTE(harlowja): if no attribute is provided then by default the attribute - named '_lock' is looked for in the instance object this decorator is - attached to. - - NOTE(harlowja): when we get the wrapt module approved we can address the - correctness of this decorator with regards to classmethods, to keep sanity - and correctness it is recommended to avoid using this on classmethods, once - https://review.openstack.org/#/c/94754/ is merged this will be refactored - and that use-case can be provided in a correct manner. + NOTE(harlowja): if no attribute name is provided then by default the + attribute named '_lock' is looked for (this attribute is expected to be + the lock/list of locks object/s) in the instance object this decorator + is attached to. """ def decorator(f): @@ -101,6 +96,66 @@ def locked(*args, **kwargs): return decorator +def read_locked(*args, **kwargs): + """Acquires & releases a read lock around call into decorated method. + + NOTE(harlowja): if no attribute name is provided then by default the + attribute named '_lock' is looked for (this attribute is expected to be + the rw-lock object) in the instance object this decorator is attached to. + """ + + def decorator(f): + attr_name = kwargs.get('lock', '_lock') + + @six.wraps(f) + def wrapper(self, *args, **kwargs): + rw_lock = getattr(self, attr_name) + with rw_lock.read_lock(): + return f(self, *args, **kwargs) + + return wrapper + + # This is needed to handle when the decorator has args or the decorator + # doesn't have args, python is rather weird here... + if kwargs or not args: + return decorator + else: + if len(args) == 1: + return decorator(args[0]) + else: + return decorator + + +def write_locked(*args, **kwargs): + """Acquires & releases a write lock around call into decorated method. + + NOTE(harlowja): if no attribute name is provided then by default the + attribute named '_lock' is looked for (this attribute is expected to be + the rw-lock object) in the instance object this decorator is attached to. + """ + + def decorator(f): + attr_name = kwargs.get('lock', '_lock') + + @six.wraps(f) + def wrapper(self, *args, **kwargs): + rw_lock = getattr(self, attr_name) + with rw_lock.write_lock(): + return f(self, *args, **kwargs) + + return wrapper + + # This is needed to handle when the decorator has args or the decorator + # doesn't have args, python is rather weird here... + if kwargs or not args: + return decorator + else: + if len(args) == 1: + return decorator(args[0]) + else: + return decorator + + class ReaderWriterLock(object): """A reader/writer lock.