diff --git a/doc/source/engines.rst b/doc/source/engines.rst
index e2b85fbd..3ce19030 100644
--- a/doc/source/engines.rst
+++ b/doc/source/engines.rst
@@ -366,9 +366,10 @@ and that class interacts with the a
 and the :py:class:`~taskflow.storage.Storage` class uses the following
 lookup order to find (or fail) a atoms requirement lookup/request:
-#. Injected atom specific arguments.
-#. Transient injected arguments.
-#. Non-transient injected arguments.
+#. Transient injected atom specific arguments.
+#. Non-transient injected atom specific arguments.
+#. Transient injected arguments (flow specific).
+#. Non-transient injected arguments (flow specific).
 #. First scope visited provider that produces the named result; note that
    if multiple providers are found in the same scope the *first* (the
    scope walkers yielded ordering defines what *first* means) that produced
    that
diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py
index b40c5fd5..9fa0f5a5 100644
--- a/taskflow/engines/action_engine/engine.py
+++ b/taskflow/engines/action_engine/engine.py
@@ -20,6 +20,7 @@ import threading
 
 from concurrent import futures
 from oslo_utils import excutils
+from oslo_utils import strutils
 import six
 
 from taskflow.engines.action_engine import compiler
@@ -119,6 +120,7 @@ class ActionEngine(base.Engine):
         """
         self.compile()
         self.prepare()
+        self.validate()
         runner = self._runtime.runner
         last_state = None
         with _start_stop(self._task_executor):
@@ -168,10 +170,34 @@ class ActionEngine(base.Engine):
 
     def _ensure_storage(self):
         """Ensure all contained atoms exist in the storage unit."""
+        transient = strutils.bool_from_string(
+            self._options.get('inject_transient', True))
         for node in self._compilation.execution_graph.nodes_iter():
             self.storage.ensure_atom(node)
             if node.inject:
-                self.storage.inject_atom_args(node.name, node.inject)
+                self.storage.inject_atom_args(node.name,
+                                              node.inject,
+                                              transient=transient)
+
+    @lock_utils.locked
+    def validate(self):
+        if not self._storage_ensured:
+            raise exc.InvalidState("Can not validate an engine"
+                                   " which has not had its storage"
+                                   " populated")
+        # At this point we can check to ensure all dependencies are either
+        # flow/task provided or storage provided, if there are still missing
+        # dependencies then this flow will fail at runtime (which we can avoid
+        # by failing at validation time).
+        missing = set()
+        fetch = self.storage.fetch_unsatisfied_args
+        for node in self._compilation.execution_graph.nodes_iter():
+            scope_walker = self._runtime.fetch_scopes_for(node)
+            missing.update(fetch(node.name, node.rebind,
+                                 scope_walker=scope_walker,
+                                 optional_args=node.optional))
+        if missing:
+            raise exc.MissingDependencies(self._flow, sorted(missing))
 
     @lock_utils.locked
     def prepare(self):
@@ -186,14 +212,6 @@ class ActionEngine(base.Engine):
         self._ensure_storage()
         self._change_state(states.SUSPENDED)
         self._storage_ensured = True
-        # At this point we can check to ensure all dependencies are either
-        # flow/task provided or storage provided, if there are still missing
-        # dependencies then this flow will fail at runtime (which we can avoid
-        # by failing at preparation time).
-        external_provides = set(self.storage.fetch_all().keys())
-        missing = self._flow.requires - external_provides
-        if missing:
-            raise exc.MissingDependencies(self._flow, sorted(missing))
         # Reset everything back to pending (if we were previously reverted).
         if self.storage.get_flow_state() == states.REVERTED:
             self._runtime.reset_all()
diff --git a/taskflow/engines/action_engine/runtime.py b/taskflow/engines/action_engine/runtime.py
index 169a6415..d71ff65c 100644
--- a/taskflow/engines/action_engine/runtime.py
+++ b/taskflow/engines/action_engine/runtime.py
@@ -67,15 +67,15 @@ class Runtime(object):
     @misc.cachedproperty
     def retry_action(self):
         return ra.RetryAction(self._storage, self._atom_notifier,
-                              self._fetch_scopes_for)
+                              self.fetch_scopes_for)
 
     @misc.cachedproperty
     def task_action(self):
         return ta.TaskAction(self._storage,
-                             self._atom_notifier, self._fetch_scopes_for,
+                             self._atom_notifier, self.fetch_scopes_for,
                              self._task_executor)
 
-    def _fetch_scopes_for(self, atom):
+    def fetch_scopes_for(self, atom):
         """Fetches a tuple of the visible scopes for the given atom."""
         try:
             return self._scopes[atom]
diff --git a/taskflow/engines/base.py b/taskflow/engines/base.py
index 5e2263eb..632f626f 100644
--- a/taskflow/engines/base.py
+++ b/taskflow/engines/base.py
@@ -92,9 +92,18 @@ class Engine(object):
         """Performs any pre-run, but post-compilation actions.
 
         NOTE(harlowja): During preparation it is currently assumed that the
-        underlying storage will be initialized, all final dependencies
-        will be verified, the tasks will be reset and the engine will enter
-        the PENDING state.
+        underlying storage will be initialized, the atoms will be reset and
+        the engine will enter the PENDING state.
+        """
+
+    @abc.abstractmethod
+    def validate(self):
+        """Performs any pre-run, post-prepare validation actions.
+
+        NOTE(harlowja): During validation all final dependencies
+        will be verified and ensured. This will by default check that all
+        atoms have satisfiable requirements (satisfied by some other
+        provider).
         """
 
     @abc.abstractmethod
@@ -105,8 +114,8 @@ class Engine(object):
     def suspend(self):
         """Attempts to suspend the engine.
 
-        If the engine is currently running tasks then this will attempt to
-        suspend future work from being started (currently active tasks can
+        If the engine is currently running atoms then this will attempt to
+        suspend future work from being started (currently active atoms can
         not currently be preempted) and move the engine into a suspend state
         which can then later be resumed from.
         """
diff --git a/taskflow/storage.py b/taskflow/storage.py
index 449aafb6..d1f67c62 100644
--- a/taskflow/storage.py
+++ b/taskflow/storage.py
@@ -47,6 +47,13 @@ _TRANSIENT_PROVIDER = object()
 # can fail extraction during lookup or emit warning on result reception...
 _EXTRACTION_EXCEPTIONS = (IndexError, KeyError, ValueError, TypeError)
 
+# Atom detail metadata key used to inject atom non-transient injected args.
+META_INJECTED = 'injected'
+
+# Atom detail metadata key(s) used to set atom progress (with any details).
+META_PROGRESS = 'progress'
+META_PROGRESS_DETAILS = 'progress_details'
+
 
 class _Provider(object):
     """A named symbol provider that produces a output at the given index."""
@@ -356,19 +363,19 @@ class Storage(object):
         :param details: any task specific progress details
         """
         update_with = {
-            'progress': progress,
+            META_PROGRESS: progress,
         }
         if details is not None:
             # NOTE(imelnikov): as we can update progress without
             # updating details (e.g. automatically from engine)
             # we save progress value with details, too.
             if details:
-                update_with['progress_details'] = {
+                update_with[META_PROGRESS_DETAILS] = {
                     'at_progress': progress,
                     'details': details,
                 }
             else:
-                update_with['progress_details'] = None
+                update_with[META_PROGRESS_DETAILS] = None
         self._update_atom_metadata(task_name, update_with,
                                    expected_type=logbook.TaskDetail)
 
@@ -382,7 +389,7 @@
         ad = self._atomdetail_by_name(task_name,
                                       expected_type=logbook.TaskDetail)
         try:
-            return ad.meta['progress']
+            return ad.meta[META_PROGRESS]
         except KeyError:
             return 0.0
 
@@ -397,7 +404,7 @@
         ad = self._atomdetail_by_name(task_name,
                                       expected_type=logbook.TaskDetail)
         try:
-            return ad.meta['progress_details']
+            return ad.meta[META_PROGRESS_DETAILS]
         except KeyError:
             return None
 
@@ -504,8 +511,12 @@
             if self._reset_atom(ad, state):
                 self._with_connection(self._save_atom_detail, ad)
 
-    def inject_atom_args(self, atom_name, pairs):
-        """Add **transient** values into storage for a specific atom only.
+    def inject_atom_args(self, atom_name, pairs, transient=True):
+        """Add values into storage for a specific atom only.
+
+        :param transient: save the data in-memory only instead of persisting
+            the data to backend storage (useful for resource-like objects
+            or similar objects which can **not** be persisted)
 
         This method injects a dictionary/pairs of arguments for an atom so that
         when that atom is scheduled for execution it will have immediate access
@@ -536,10 +547,26 @@
         if atom_name not in self._atom_name_to_uuid:
             raise exceptions.NotFound("Unknown atom name: %s" % atom_name)
-        with self._lock.write_lock():
+
+        def save_transient():
             self._injected_args.setdefault(atom_name, {})
             self._injected_args[atom_name].update(pairs)
 
+        def save_persistent():
+            ad = self._atomdetail_by_name(atom_name)
+            injected = ad.meta.get(META_INJECTED)
+            if not injected:
+                injected = {}
+            injected.update(pairs)
+            ad.meta[META_INJECTED] = injected
+            self._with_connection(self._save_atom_detail, ad)
+
+        with self._lock.write_lock():
+            if transient:
+                save_transient()
+            else:
+                save_persistent()
+
     @lock_utils.write_locked
     def inject(self, pairs, transient=False):
         """Add values into storage.
 
@@ -648,11 +675,91 @@
         return many_handler(values)
 
     @lock_utils.read_locked
-    def fetch_all(self):
-        """Fetch all named results known so far.
+    def fetch_unsatisfied_args(self, atom_name, args_mapping,
+                               scope_walker=None, optional_args=None):
+        """Fetch unsatisfied atom arguments using an atoms argument mapping.
 
-        NOTE(harlowja): should be used for debugging and testing purposes.
+        NOTE(harlowja): this takes into account the provided scope walker
+        atoms who should produce the required value at runtime, as well as
+        the transient/persistent flow and atom specific injected arguments.
+        It does **not** check if the providers actually have produced the
+        needed values; it just checks that they are registered to produce
+        it in the future.
""" + + def _fetch_providers(name): + """Fetchs pair of (default providers, non-default providers).""" + default_providers = [] + non_default_providers = [] + for p in self._reverse_mapping.get(name, []): + if p.name in (_TRANSIENT_PROVIDER, self.injector_name): + default_providers.append(p) + else: + non_default_providers.append(p) + return default_providers, non_default_providers + + def _locate_providers(name): + """Finds the accessible *potential* providers.""" + default_providers, non_default_providers = _fetch_providers(name) + providers = [] + if non_default_providers: + if scope_walker is not None: + scope_iter = iter(scope_walker) + else: + scope_iter = iter([]) + for names in scope_iter: + for p in non_default_providers: + if p.name in names: + providers.append(p) + for p in default_providers: + if p.name is _TRANSIENT_PROVIDER: + results = self._transients + else: + try: + results = self._get(p.name, only_last=True) + except exceptions.NotFound: + results = {} + try: + _item_from_single(p, results, name) + except exceptions.NotFound: + pass + else: + providers.append(p) + return providers + + ad = self._atomdetail_by_name(atom_name) + if optional_args is None: + optional_args = [] + injected_sources = [ + self._injected_args.get(atom_name, {}), + ad.meta.get(META_INJECTED, {}), + ] + missing = set(six.iterkeys(args_mapping)) + for (bound_name, name) in six.iteritems(args_mapping): + if LOG.isEnabledFor(logging.BLATHER): + LOG.blather("Looking for %r <= %r for atom named: %s", + bound_name, name, atom_name) + if bound_name in optional_args: + LOG.blather("Argument %r is optional, skipping", bound_name) + missing.discard(bound_name) + continue + maybe_providers = 0 + for source in injected_sources: + if not source: + continue + if name in source: + maybe_providers += 1 + maybe_providers += len(_locate_providers(name)) + if maybe_providers: + LOG.blather("Atom %s will have %s potential providers" + " of %r <= %r", atom_name, maybe_providers, + bound_name, name) + missing.discard(bound_name) + return missing + + @lock_utils.read_locked + def fetch_all(self): + """Fetch all named results known so far.""" def many_handler(values): if len(values) > 1: return values @@ -671,6 +778,15 @@ class Storage(object): optional_args=None): """Fetch arguments for an atom using an atoms argument mapping.""" + def _extract_first_from(name, sources): + """Extracts/returns first occurence of key in list of dicts.""" + for i, source in enumerate(sources): + if not source: + continue + if name in source: + return (i, source[name]) + raise KeyError(name) + def _get_results(looking_for, provider): """Gets the results saved for a given provider.""" try: @@ -710,14 +826,16 @@ class Storage(object): if optional_args is None: optional_args = [] - if atom_name and atom_name not in self._atom_name_to_uuid: - raise exceptions.NotFound("Unknown atom name: %s" % atom_name) + if atom_name: + ad = self._atomdetail_by_name(atom_name) + injected_sources = [ + self._injected_args.get(atom_name, {}), + ad.meta.get(META_INJECTED, {}), + ] + else: + injected_sources = [] if not args_mapping: return {} - if atom_name: - injected_args = self._injected_args.get(atom_name, {}) - else: - injected_args = {} mapped_args = {} for (bound_name, name) in six.iteritems(args_mapping): if LOG.isEnabledFor(logging.BLATHER): @@ -726,16 +844,26 @@ class Storage(object): bound_name, name, atom_name) else: LOG.blather("Looking for %r <= %r", bound_name, name) - if name in injected_args: - value = injected_args[name] + try: + source_index, 
value = _extract_first_from(name, + injected_sources) mapped_args[bound_name] = value - LOG.blather("Matched %r <= %r to %r (from injected" - " values)", bound_name, name, value) - else: + if LOG.isEnabledFor(logging.BLATHER): + if source_index == 0: + LOG.blather("Matched %r <= %r to %r (from injected" + " atom-specific transient" + " values)", bound_name, name, value) + else: + LOG.blather("Matched %r <= %r to %r (from injected" + " atom-specific persistent" + " values)", bound_name, name, value) + except KeyError: try: possible_providers = self._reverse_mapping[name] except KeyError: if bound_name in optional_args: + LOG.blather("Argument %r is optional, skipping", + bound_name) continue raise exceptions.NotFound("Name %r is not mapped as a" " produced output by any" diff --git a/taskflow/tests/unit/test_storage.py b/taskflow/tests/unit/test_storage.py index a27a811c..7401282b 100644 --- a/taskflow/tests/unit/test_storage.py +++ b/taskflow/tests/unit/test_storage.py @@ -533,6 +533,31 @@ class StorageTestMixin(object): intention = s.get_atom_intention('my retry') self.assertEqual(intention, states.RETRY) + def test_inject_persistent_missing(self): + t = test_utils.ProgressingTask('my retry', requires=['x']) + s = self._get_storage() + s.ensure_atom(t) + missing = s.fetch_unsatisfied_args(t.name, t.rebind) + self.assertEqual(set(['x']), missing) + s.inject_atom_args(t.name, {'x': 2}, transient=False) + missing = s.fetch_unsatisfied_args(t.name, t.rebind) + self.assertEqual(set(), missing) + args = s.fetch_mapped_args(t.rebind, atom_name=t.name) + self.assertEqual(2, args['x']) + + def test_inject_persistent_and_transient_missing(self): + t = test_utils.ProgressingTask('my retry', requires=['x']) + s = self._get_storage() + s.ensure_atom(t) + missing = s.fetch_unsatisfied_args(t.name, t.rebind) + self.assertEqual(set(['x']), missing) + s.inject_atom_args(t.name, {'x': 2}, transient=False) + s.inject_atom_args(t.name, {'x': 3}, transient=True) + missing = s.fetch_unsatisfied_args(t.name, t.rebind) + self.assertEqual(set(), missing) + args = s.fetch_mapped_args(t.rebind, atom_name=t.name) + self.assertEqual(3, args['x']) + class StorageMemoryTest(StorageTestMixin, test.TestCase): def setUp(self):
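
Reviewer note (not part of the patch): below is a rough usage sketch of how the new validate() step and persistent (non-transient) atom argument injection are intended to fit together from a caller's point of view. The flow, task and atom names are made up for illustration, and the sketch assumes the usual taskflow.engines.load() helper with the default in-memory backend; it is not a definitive example from the tree.

    import taskflow.engines
    from taskflow import exceptions as exc
    from taskflow import task
    from taskflow.patterns import linear_flow


    class Consumer(task.Task):
        def execute(self, x):
            print(x)


    flow = linear_flow.Flow('demo').add(Consumer('consumer'))
    engine = taskflow.engines.load(flow)
    engine.compile()
    engine.prepare()

    # Nothing provides 'x' yet, so the new validate() step should fail
    # up-front instead of deferring the failure to run() time.
    try:
        engine.validate()
    except exc.MissingDependencies as e:
        print(e)

    # Injecting a persistent (non-transient) atom-specific argument makes
    # the requirement satisfiable; validate() should now pass and run()
    # will re-validate internally before executing.
    engine.storage.inject_atom_args('consumer', {'x': 2}, transient=False)
    engine.validate()
    engine.run()

The patch also threads an 'inject_transient' engine option through _ensure_storage(), so values declared via Atom.inject can be persisted to the backend (instead of being held only in memory) when that option is set to false.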