diff --git a/taskflow/storage.py b/taskflow/storage.py index 4207a662..449aafb6 100644 --- a/taskflow/storage.py +++ b/taskflow/storage.py @@ -290,53 +290,53 @@ class Storage(object): # do this update. atom_detail.update(conn.update_atom_details(atom_detail)) + @lock_utils.read_locked def get_atom_uuid(self, atom_name): """Gets an atoms uuid given a atoms name.""" - with self._lock.read_lock(): - ad = self._atomdetail_by_name(atom_name) - return ad.uuid + ad = self._atomdetail_by_name(atom_name) + return ad.uuid + @lock_utils.write_locked def set_atom_state(self, atom_name, state): """Sets an atoms state.""" - with self._lock.write_lock(): - ad = self._atomdetail_by_name(atom_name) - ad.state = state - self._with_connection(self._save_atom_detail, ad) + ad = self._atomdetail_by_name(atom_name) + ad.state = state + self._with_connection(self._save_atom_detail, ad) + @lock_utils.read_locked def get_atom_state(self, atom_name): """Gets the state of an atom given an atoms name.""" - with self._lock.read_lock(): - ad = self._atomdetail_by_name(atom_name) - return ad.state + ad = self._atomdetail_by_name(atom_name) + return ad.state + @lock_utils.write_locked def set_atom_intention(self, atom_name, intention): """Sets the intention of an atom given an atoms name.""" - with self._lock.write_lock(): - ad = self._atomdetail_by_name(atom_name) - ad.intention = intention - self._with_connection(self._save_atom_detail, ad) + ad = self._atomdetail_by_name(atom_name) + ad.intention = intention + self._with_connection(self._save_atom_detail, ad) + @lock_utils.read_locked def get_atom_intention(self, atom_name): """Gets the intention of an atom given an atoms name.""" - with self._lock.read_lock(): - ad = self._atomdetail_by_name(atom_name) - return ad.intention + ad = self._atomdetail_by_name(atom_name) + return ad.intention + @lock_utils.read_locked def get_atoms_states(self, atom_names): """Gets all atoms states given a set of names.""" - with self._lock.read_lock(): - return dict((name, (self.get_atom_state(name), - self.get_atom_intention(name))) - for name in atom_names) + return dict((name, (self.get_atom_state(name), + self.get_atom_intention(name))) + for name in atom_names) + @lock_utils.write_locked def _update_atom_metadata(self, atom_name, update_with, expected_type=None): - with self._lock.write_lock(): - ad = self._atomdetail_by_name(atom_name, - expected_type=expected_type) - if update_with: - ad.meta.update(update_with) - self._with_connection(self._save_atom_detail, ad) + ad = self._atomdetail_by_name(atom_name, + expected_type=expected_type) + if update_with: + ad.meta.update(update_with) + self._with_connection(self._save_atom_detail, ad) def update_atom_metadata(self, atom_name, update_with): """Updates a atoms associated metadata. @@ -372,20 +372,21 @@ class Storage(object): self._update_atom_metadata(task_name, update_with, expected_type=logbook.TaskDetail) + @lock_utils.read_locked def get_task_progress(self, task_name): """Get the progress of a task given a tasks name. :param task_name: tasks name :returns: current task progress value """ - with self._lock.read_lock(): - ad = self._atomdetail_by_name(task_name, - expected_type=logbook.TaskDetail) - try: - return ad.meta['progress'] - except KeyError: - return 0.0 + ad = self._atomdetail_by_name(task_name, + expected_type=logbook.TaskDetail) + try: + return ad.meta['progress'] + except KeyError: + return 0.0 + @lock_utils.read_locked def get_task_progress_details(self, task_name): """Get the progress details of a task given a tasks name. @@ -393,13 +394,12 @@ class Storage(object): :returns: None if progress_details not defined, else progress_details dict """ - with self._lock.read_lock(): - ad = self._atomdetail_by_name(task_name, - expected_type=logbook.TaskDetail) - try: - return ad.meta['progress_details'] - except KeyError: - return None + ad = self._atomdetail_by_name(task_name, + expected_type=logbook.TaskDetail) + try: + return ad.meta['progress_details'] + except KeyError: + return None def _check_all_results_provided(self, atom_name, container): """Warn if an atom did not provide some of its expected results. @@ -418,76 +418,75 @@ class Storage(object): LOG.warning("Atom %s did not supply result " "with index %r (name %s)", atom_name, index, name) + @lock_utils.write_locked def save(self, atom_name, data, state=states.SUCCESS): """Put result for atom with id 'uuid' to storage.""" - with self._lock.write_lock(): - ad = self._atomdetail_by_name(atom_name) - ad.put(state, data) - if state == states.FAILURE and isinstance(data, failure.Failure): - # NOTE(imelnikov): failure serialization looses information, - # so we cache failures here, in atom name -> failure mapping. - self._failures[ad.name] = data - else: - self._check_all_results_provided(ad.name, data) - self._with_connection(self._save_atom_detail, ad) + ad = self._atomdetail_by_name(atom_name) + ad.put(state, data) + if state == states.FAILURE and isinstance(data, failure.Failure): + # NOTE(imelnikov): failure serialization looses information, + # so we cache failures here, in atom name -> failure mapping. + self._failures[ad.name] = data + else: + self._check_all_results_provided(ad.name, data) + self._with_connection(self._save_atom_detail, ad) + @lock_utils.write_locked def save_retry_failure(self, retry_name, failed_atom_name, failure): """Save subflow failure to retry controller history.""" - with self._lock.write_lock(): - ad = self._atomdetail_by_name(retry_name, - expected_type=logbook.RetryDetail) - try: - failures = ad.last_failures - except exceptions.NotFound as e: - raise exceptions.StorageFailure("Unable to fetch most recent" - " retry failures so new retry" - " failure can be inserted", e) - else: - if failed_atom_name not in failures: - failures[failed_atom_name] = failure - self._with_connection(self._save_atom_detail, ad) + ad = self._atomdetail_by_name(retry_name, + expected_type=logbook.RetryDetail) + try: + failures = ad.last_failures + except exceptions.NotFound as e: + raise exceptions.StorageFailure("Unable to fetch most recent" + " retry failures so new retry" + " failure can be inserted", e) + else: + if failed_atom_name not in failures: + failures[failed_atom_name] = failure + self._with_connection(self._save_atom_detail, ad) + @lock_utils.write_locked def cleanup_retry_history(self, retry_name, state): """Cleanup history of retry atom with given name.""" - with self._lock.write_lock(): - ad = self._atomdetail_by_name(retry_name, - expected_type=logbook.RetryDetail) - ad.state = state - ad.results = [] - self._with_connection(self._save_atom_detail, ad) + ad = self._atomdetail_by_name(retry_name, + expected_type=logbook.RetryDetail) + ad.state = state + ad.results = [] + self._with_connection(self._save_atom_detail, ad) + @lock_utils.read_locked def _get(self, atom_name, only_last=False): - with self._lock.read_lock(): - ad = self._atomdetail_by_name(atom_name) - if ad.failure is not None: - cached = self._failures.get(atom_name) - if ad.failure.matches(cached): - return cached - return ad.failure - if ad.state not in STATES_WITH_RESULTS: - raise exceptions.NotFound("Result for atom %s is not currently" - " known" % atom_name) - if only_last: - return ad.last_results - else: - return ad.results + ad = self._atomdetail_by_name(atom_name) + if ad.failure is not None: + cached = self._failures.get(atom_name) + if ad.failure.matches(cached): + return cached + return ad.failure + if ad.state not in STATES_WITH_RESULTS: + raise exceptions.NotFound("Result for atom %s is not currently" + " known" % atom_name) + if only_last: + return ad.last_results + else: + return ad.results def get(self, atom_name): """Gets the results for an atom with a given name from storage.""" return self._get(atom_name) + @lock_utils.read_locked def get_failures(self): """Get list of failures that happened with this flow. No order guaranteed. """ - with self._lock.read_lock(): - return self._failures.copy() + return self._failures.copy() def has_failures(self): """Returns True if there are failed tasks in the storage.""" - with self._lock.read_lock(): - return bool(self._failures) + return bool(self._failures) def _reset_atom(self, ad, state): if ad.name == self.injector_name: @@ -498,12 +497,12 @@ class Storage(object): self._failures.pop(ad.name, None) return True + @lock_utils.write_locked def reset(self, atom_name, state=states.PENDING): """Reset atom with given name (if the task is in a given state).""" - with self._lock.write_lock(): - ad = self._atomdetail_by_name(atom_name) - if self._reset_atom(ad, state): - self._with_connection(self._save_atom_detail, ad) + ad = self._atomdetail_by_name(atom_name) + if self._reset_atom(ad, state): + self._with_connection(self._save_atom_detail, ad) def inject_atom_args(self, atom_name, pairs): """Add **transient** values into storage for a specific atom only. @@ -541,6 +540,7 @@ class Storage(object): self._injected_args.setdefault(atom_name, {}) self._injected_args[atom_name].update(pairs) + @lock_utils.write_locked def inject(self, pairs, transient=False): """Add values into storage. @@ -589,13 +589,12 @@ class Storage(object): self._transients.update(pairs) return (_TRANSIENT_PROVIDER, six.iterkeys(self._transients)) - with self._lock.write_lock(): - if transient: - provider_name, names = save_transient() - else: - provider_name, names = save_persistent() - self._set_result_mapping(provider_name, - dict((name, name) for name in names)) + if transient: + provider_name, names = save_transient() + else: + provider_name, names = save_persistent() + self._set_result_mapping(provider_name, + dict((name, name) for name in names)) def _set_result_mapping(self, provider_name, mapping): """Sets the result mapping for a given producer. @@ -615,6 +614,7 @@ class Storage(object): if provider not in entries: entries.append(provider) + @lock_utils.read_locked def fetch(self, name, many_handler=None): """Fetch a named result.""" # By default we just return the first of many (unless provided @@ -622,32 +622,32 @@ class Storage(object): # more meaningful). if many_handler is None: many_handler = lambda values: values[0] - with self._lock.read_lock(): - try: - providers = self._reverse_mapping[name] - except KeyError: - raise exceptions.NotFound("Name %r is not mapped as a" - " produced output by any" - " providers" % name) - values = [] - for provider in providers: - if provider.name is _TRANSIENT_PROVIDER: - values.append(_item_from_single(provider, - self._transients, name)) - else: - try: - container = self._get(provider.name, only_last=True) - except exceptions.NotFound: - pass - else: - values.append(_item_from_single(provider, - container, name)) - if not values: - raise exceptions.NotFound("Unable to find result %r," - " searched %s" % (name, providers)) + try: + providers = self._reverse_mapping[name] + except KeyError: + raise exceptions.NotFound("Name %r is not mapped as a" + " produced output by any" + " providers" % name) + values = [] + for provider in providers: + if provider.name is _TRANSIENT_PROVIDER: + values.append(_item_from_single(provider, + self._transients, name)) else: - return many_handler(values) + try: + container = self._get(provider.name, only_last=True) + except exceptions.NotFound: + pass + else: + values.append(_item_from_single(provider, + container, name)) + if not values: + raise exceptions.NotFound("Unable to find result %r," + " searched %s" % (name, providers)) + else: + return many_handler(values) + @lock_utils.read_locked def fetch_all(self): """Fetch all named results known so far. @@ -657,15 +657,15 @@ class Storage(object): if len(values) > 1: return values return values[0] - with self._lock.read_lock(): - results = {} - for name in six.iterkeys(self._reverse_mapping): - try: - results[name] = self.fetch(name, many_handler=many_handler) - except exceptions.NotFound: - pass - return results + results = {} + for name in six.iterkeys(self._reverse_mapping): + try: + results[name] = self.fetch(name, many_handler=many_handler) + except exceptions.NotFound: + pass + return results + @lock_utils.read_locked def fetch_mapped_args(self, args_mapping, atom_name=None, scope_walker=None, optional_args=None): @@ -708,66 +708,65 @@ class Storage(object): for p in providers] return [] - with self._lock.read_lock(): - if optional_args is None: - optional_args = [] - if atom_name and atom_name not in self._atom_name_to_uuid: - raise exceptions.NotFound("Unknown atom name: %s" % atom_name) - if not args_mapping: - return {} - if atom_name: - injected_args = self._injected_args.get(atom_name, {}) - else: - injected_args = {} - mapped_args = {} - for (bound_name, name) in six.iteritems(args_mapping): - if LOG.isEnabledFor(logging.BLATHER): - if atom_name: - LOG.blather("Looking for %r <= %r for atom named: %s", - bound_name, name, atom_name) - else: - LOG.blather("Looking for %r <= %r", bound_name, name) - if name in injected_args: - value = injected_args[name] - mapped_args[bound_name] = value - LOG.blather("Matched %r <= %r to %r (from injected" - " values)", bound_name, name, value) + if optional_args is None: + optional_args = [] + if atom_name and atom_name not in self._atom_name_to_uuid: + raise exceptions.NotFound("Unknown atom name: %s" % atom_name) + if not args_mapping: + return {} + if atom_name: + injected_args = self._injected_args.get(atom_name, {}) + else: + injected_args = {} + mapped_args = {} + for (bound_name, name) in six.iteritems(args_mapping): + if LOG.isEnabledFor(logging.BLATHER): + if atom_name: + LOG.blather("Looking for %r <= %r for atom named: %s", + bound_name, name, atom_name) else: - try: - possible_providers = self._reverse_mapping[name] - except KeyError: - if bound_name in optional_args: - continue - raise exceptions.NotFound("Name %r is not mapped as a" - " produced output by any" - " providers" % name) - # Reduce the possible providers to one that are allowed. - providers = _locate_providers(name, possible_providers) - if not providers: - raise exceptions.NotFound( - "Mapped argument %r <= %r was not produced" - " by any accessible provider (%s possible" - " providers were scanned)" - % (bound_name, name, len(possible_providers))) - provider, value = _item_from_first_of(providers, name) - mapped_args[bound_name] = value - LOG.blather("Matched %r <= %r to %r (from %s)", - bound_name, name, value, provider) - return mapped_args + LOG.blather("Looking for %r <= %r", bound_name, name) + if name in injected_args: + value = injected_args[name] + mapped_args[bound_name] = value + LOG.blather("Matched %r <= %r to %r (from injected" + " values)", bound_name, name, value) + else: + try: + possible_providers = self._reverse_mapping[name] + except KeyError: + if bound_name in optional_args: + continue + raise exceptions.NotFound("Name %r is not mapped as a" + " produced output by any" + " providers" % name) + # Reduce the possible providers to one that are allowed. + providers = _locate_providers(name, possible_providers) + if not providers: + raise exceptions.NotFound( + "Mapped argument %r <= %r was not produced" + " by any accessible provider (%s possible" + " providers were scanned)" + % (bound_name, name, len(possible_providers))) + provider, value = _item_from_first_of(providers, name) + mapped_args[bound_name] = value + LOG.blather("Matched %r <= %r to %r (from %s)", + bound_name, name, value, provider) + return mapped_args + @lock_utils.write_locked def set_flow_state(self, state): """Set flow details state and save it.""" - with self._lock.write_lock(): - self._flowdetail.state = state - self._with_connection(self._save_flow_detail) + self._flowdetail.state = state + self._with_connection(self._save_flow_detail) + @lock_utils.read_locked def get_flow_state(self): """Get state from flow details.""" - with self._lock.read_lock(): - state = self._flowdetail.state - if state is None: - state = states.PENDING - return state + state = self._flowdetail.state + if state is None: + state = states.PENDING + return state def _translate_into_history(self, ad): failure = None @@ -782,19 +781,19 @@ class Storage(object): failure = ad.failure return retry.History(ad.results, failure=failure) + @lock_utils.read_locked def get_retry_history(self, retry_name): """Fetch a single retrys history.""" - with self._lock.read_lock(): - ad = self._atomdetail_by_name(retry_name, - expected_type=logbook.RetryDetail) - return self._translate_into_history(ad) + ad = self._atomdetail_by_name(retry_name, + expected_type=logbook.RetryDetail) + return self._translate_into_history(ad) + @lock_utils.read_locked def get_retry_histories(self): """Fetch all retrys histories.""" histories = [] - with self._lock.read_lock(): - for ad in self._flowdetail: - if isinstance(ad, logbook.RetryDetail): - histories.append((ad.name, - self._translate_into_history(ad))) + for ad in self._flowdetail: + if isinstance(ad, logbook.RetryDetail): + histories.append((ad.name, + self._translate_into_history(ad))) return histories diff --git a/taskflow/utils/lock_utils.py b/taskflow/utils/lock_utils.py index d7312088..726e10f9 100644 --- a/taskflow/utils/lock_utils.py +++ b/taskflow/utils/lock_utils.py @@ -64,15 +64,10 @@ def locked(*args, **kwargs): activates the given lock or list of locks as a context manager, automatically releasing that lock on exit. - NOTE(harlowja): if no attribute is provided then by default the attribute - named '_lock' is looked for in the instance object this decorator is - attached to. - - NOTE(harlowja): when we get the wrapt module approved we can address the - correctness of this decorator with regards to classmethods, to keep sanity - and correctness it is recommended to avoid using this on classmethods, once - https://review.openstack.org/#/c/94754/ is merged this will be refactored - and that use-case can be provided in a correct manner. + NOTE(harlowja): if no attribute name is provided then by default the + attribute named '_lock' is looked for (this attribute is expected to be + the lock/list of locks object/s) in the instance object this decorator + is attached to. """ def decorator(f): @@ -101,6 +96,66 @@ def locked(*args, **kwargs): return decorator +def read_locked(*args, **kwargs): + """Acquires & releases a read lock around call into decorated method. + + NOTE(harlowja): if no attribute name is provided then by default the + attribute named '_lock' is looked for (this attribute is expected to be + the rw-lock object) in the instance object this decorator is attached to. + """ + + def decorator(f): + attr_name = kwargs.get('lock', '_lock') + + @six.wraps(f) + def wrapper(self, *args, **kwargs): + rw_lock = getattr(self, attr_name) + with rw_lock.read_lock(): + return f(self, *args, **kwargs) + + return wrapper + + # This is needed to handle when the decorator has args or the decorator + # doesn't have args, python is rather weird here... + if kwargs or not args: + return decorator + else: + if len(args) == 1: + return decorator(args[0]) + else: + return decorator + + +def write_locked(*args, **kwargs): + """Acquires & releases a write lock around call into decorated method. + + NOTE(harlowja): if no attribute name is provided then by default the + attribute named '_lock' is looked for (this attribute is expected to be + the rw-lock object) in the instance object this decorator is attached to. + """ + + def decorator(f): + attr_name = kwargs.get('lock', '_lock') + + @six.wraps(f) + def wrapper(self, *args, **kwargs): + rw_lock = getattr(self, attr_name) + with rw_lock.write_lock(): + return f(self, *args, **kwargs) + + return wrapper + + # This is needed to handle when the decorator has args or the decorator + # doesn't have args, python is rather weird here... + if kwargs or not args: + return decorator + else: + if len(args) == 1: + return decorator(args[0]) + else: + return decorator + + class ReaderWriterLock(object): """A reader/writer lock.