Merge "Allow injected atom args to be persisted"
@@ -366,9 +366,10 @@ and that class interacts with the a
 and the :py:class:`~taskflow.storage.Storage` class uses the following
 lookup order to find (or fail) an atom's requirement lookup/request:
 
-#. Injected atom specific arguments.
-#. Transient injected arguments.
-#. Non-transient injected arguments.
+#. Transient injected atom specific arguments.
+#. Non-transient injected atom specific arguments.
+#. Transient injected arguments (flow specific).
+#. Non-transient injected arguments (flow specific).
 #. First scope visited provider that produces the named result; note that
    if multiple providers are found in the same scope the *first* (the scope
    walker's yielded ordering defines what *first* means) that produced that

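For context, a minimal sketch of how that lookup order plays out once this change is in place. The `Echo` task, the flow name and the injected values are illustrative only, and the keyword-free `taskflow.engines.load()` call is assumed to pick the default in-memory setup:

    import taskflow.engines
    from taskflow.patterns import linear_flow
    from taskflow import task

    class Echo(task.Task):
        def execute(self, x):
            print(x)

    flow = linear_flow.Flow('lookup-demo').add(Echo('echo'))
    engine = taskflow.engines.load(flow)
    engine.compile()
    engine.prepare()
    # Flow-level (non-transient) injection.
    engine.storage.inject({'x': 'from-flow'})
    # Atom specific injection; per the order above this wins over the
    # flow-level value when the 'echo' atom's arguments are resolved.
    engine.storage.inject_atom_args('echo', {'x': 'for-echo-only'})
    engine.run()  # expected to print 'for-echo-only'
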
@@ -20,6 +20,7 @@ import threading
 
 from concurrent import futures
 from oslo_utils import excutils
+from oslo_utils import strutils
 import six
 
 from taskflow.engines.action_engine import compiler

@@ -119,6 +120,7 @@ class ActionEngine(base.Engine):
         """
         self.compile()
         self.prepare()
+        self.validate()
         runner = self._runtime.runner
         last_state = None
         with _start_stop(self._task_executor):

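The run() path above now validates after preparing. A minimal sketch of driving those steps explicitly; the task and flow names are illustrative, and calling validate() before prepare() is expected to raise InvalidState per the engine hunk further down:

    import taskflow.engines
    from taskflow import exceptions as exc
    from taskflow.patterns import linear_flow
    from taskflow import task

    class Noop(task.Task):
        def execute(self):
            pass

    engine = taskflow.engines.load(linear_flow.Flow('lifecycle').add(Noop('noop')))
    engine.compile()
    try:
        # Storage has not been populated yet, so this should not be allowed.
        engine.validate()
    except exc.InvalidState:
        pass
    engine.prepare()   # populates storage (and marks it as ensured)
    engine.validate()  # now allowed; checks that arguments have providers
    engine.run()       # run() also performs these steps itself if skipped
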
@@ -168,10 +170,34 @@ class ActionEngine(base.Engine):
 
     def _ensure_storage(self):
         """Ensure all contained atoms exist in the storage unit."""
+        transient = strutils.bool_from_string(
+            self._options.get('inject_transient', True))
         for node in self._compilation.execution_graph.nodes_iter():
             self.storage.ensure_atom(node)
             if node.inject:
-                self.storage.inject_atom_args(node.name, node.inject)
+                self.storage.inject_atom_args(node.name,
+                                              node.inject,
+                                              transient=transient)
 
     @lock_utils.locked
+    def validate(self):
+        if not self._storage_ensured:
+            raise exc.InvalidState("Can not validate an engine"
+                                   " which has not had its storage"
+                                   " populated")
+        # At this point we can check to ensure all dependencies are either
+        # flow/task provided or storage provided, if there are still missing
+        # dependencies then this flow will fail at runtime (which we can avoid
+        # by failing at validation time).
+        missing = set()
+        fetch = self.storage.fetch_unsatisfied_args
+        for node in self._compilation.execution_graph.nodes_iter():
+            scope_walker = self._runtime.fetch_scopes_for(node)
+            missing.update(fetch(node.name, node.rebind,
+                                 scope_walker=scope_walker,
+                                 optional_args=node.optional))
+        if missing:
+            raise exc.MissingDependencies(self._flow, sorted(missing))
+
+    @lock_utils.locked
     def prepare(self):

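The hunk above makes the engine honour a new 'inject_transient' option when it copies an atom's declared inject values into storage. A minimal sketch of opting into persistence for those declared values; the task, flow and value names are illustrative, the task constructor accepting an inject keyword is assumed from the engine's use of node.inject, and passing the option as a keyword to taskflow.engines.load() is assumed to route it into the engine's options:

    import taskflow.engines
    from taskflow.patterns import linear_flow
    from taskflow import task

    class Echo(task.Task):
        def execute(self, x):
            print(x)

    # The atom declares its own injected argument ...
    flow = linear_flow.Flow('f').add(Echo('echo', inject={'x': 1}))
    # ... and with inject_transient=False that declaration is written into
    # the atom detail's metadata (the 'injected' key added below) instead of
    # being kept only in this process's memory.
    engine = taskflow.engines.load(flow, inject_transient=False)
    engine.run()
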
@@ -186,14 +212,6 @@ class ActionEngine(base.Engine):
         self._ensure_storage()
         self._change_state(states.SUSPENDED)
+        self._storage_ensured = True
-        # At this point we can check to ensure all dependencies are either
-        # flow/task provided or storage provided, if there are still missing
-        # dependencies then this flow will fail at runtime (which we can avoid
-        # by failing at preparation time).
-        external_provides = set(self.storage.fetch_all().keys())
-        missing = self._flow.requires - external_provides
-        if missing:
-            raise exc.MissingDependencies(self._flow, sorted(missing))
         # Reset everything back to pending (if we were previously reverted).
         if self.storage.get_flow_state() == states.REVERTED:
             self._runtime.reset_all()

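With the dependency check moved out of prepare(), a flow with an unsatisfied requirement no longer fails until validate() (or run(), which calls it). A self-contained sketch of the expected behaviour; `NeedsX` and the flow name are illustrative:

    import taskflow.engines
    from taskflow import exceptions as exc
    from taskflow.patterns import linear_flow
    from taskflow import task

    class NeedsX(task.Task):
        def execute(self, x):
            return x

    engine = taskflow.engines.load(linear_flow.Flow('f').add(NeedsX('needs-x')))
    engine.compile()
    engine.prepare()             # no longer fails on the missing 'x'
    try:
        engine.validate()        # the dependency check now happens here
    except exc.MissingDependencies:
        engine.storage.inject({'x': 42})
        engine.validate()        # satisfied once 'x' has a provider
    engine.run()
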
@@ -67,15 +67,15 @@ class Runtime(object):
     @misc.cachedproperty
     def retry_action(self):
         return ra.RetryAction(self._storage, self._atom_notifier,
-                              self._fetch_scopes_for)
+                              self.fetch_scopes_for)
 
     @misc.cachedproperty
     def task_action(self):
         return ta.TaskAction(self._storage,
-                             self._atom_notifier, self._fetch_scopes_for,
+                             self._atom_notifier, self.fetch_scopes_for,
                              self._task_executor)
 
-    def _fetch_scopes_for(self, atom):
+    def fetch_scopes_for(self, atom):
         """Fetches a tuple of the visible scopes for the given atom."""
         try:
             return self._scopes[atom]

@@ -92,9 +92,18 @@ class Engine(object):
         """Performs any pre-run, but post-compilation actions.
 
         NOTE(harlowja): During preparation it is currently assumed that the
-        underlying storage will be initialized, all final dependencies
-        will be verified, the tasks will be reset and the engine will enter
-        the PENDING state.
+        underlying storage will be initialized, the atoms will be reset and
+        the engine will enter the PENDING state.
         """
 
+    @abc.abstractmethod
+    def validate(self):
+        """Performs any pre-run, post-prepare validation actions.
+
+        NOTE(harlowja): During validation all final dependencies
+        will be verified and ensured. This will by default check that all
+        atoms have satisfiable requirements (satisfied by some other
+        provider).
+        """
+
     @abc.abstractmethod

@@ -105,8 +114,8 @@ class Engine(object):
     def suspend(self):
         """Attempts to suspend the engine.
 
-        If the engine is currently running tasks then this will attempt to
-        suspend future work from being started (currently active tasks can
+        If the engine is currently running atoms then this will attempt to
+        suspend future work from being started (currently active atoms can
         not currently be preempted) and move the engine into a suspend state
         which can then later be resumed from.
         """

@@ -47,6 +47,13 @@ _TRANSIENT_PROVIDER = object()
 # can fail extraction during lookup or emit warning on result reception...
 _EXTRACTION_EXCEPTIONS = (IndexError, KeyError, ValueError, TypeError)
 
+# Atom detail metadata key used to hold an atom's non-transient injected args.
+META_INJECTED = 'injected'
+
+# Atom detail metadata key(s) used to set atom progress (with any details).
+META_PROGRESS = 'progress'
+META_PROGRESS_DETAILS = 'progress_details'
+
 
 class _Provider(object):
     """A named symbol provider that produces an output at the given index."""

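Purely illustrative: with those keys in place, the metadata carried by an atom detail after a persistent injection of {'x': 2} and a progress update would look roughly like the dict below (the concrete values are made up):

    meta = {
        'injected': {'x': 2},                  # META_INJECTED
        'progress': 0.5,                       # META_PROGRESS
        'progress_details': {                  # META_PROGRESS_DETAILS
            'at_progress': 0.5,
            'details': {'msg': 'half way there'},
        },
    }
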
@@ -356,19 +363,19 @@ class Storage(object):
         :param details: any task specific progress details
         """
         update_with = {
-            'progress': progress,
+            META_PROGRESS: progress,
         }
         if details is not None:
             # NOTE(imelnikov): as we can update progress without
             # updating details (e.g. automatically from engine)
             # we save progress value with details, too.
             if details:
-                update_with['progress_details'] = {
+                update_with[META_PROGRESS_DETAILS] = {
                     'at_progress': progress,
                     'details': details,
                 }
             else:
-                update_with['progress_details'] = None
+                update_with[META_PROGRESS_DETAILS] = None
         self._update_atom_metadata(task_name, update_with,
                                    expected_type=logbook.TaskDetail)

@@ -382,7 +389,7 @@ class Storage(object):
         ad = self._atomdetail_by_name(task_name,
                                       expected_type=logbook.TaskDetail)
         try:
-            return ad.meta['progress']
+            return ad.meta[META_PROGRESS]
         except KeyError:
             return 0.0

@@ -397,7 +404,7 @@ class Storage(object):
         ad = self._atomdetail_by_name(task_name,
                                       expected_type=logbook.TaskDetail)
         try:
-            return ad.meta['progress_details']
+            return ad.meta[META_PROGRESS_DETAILS]
         except KeyError:
             return None

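A small usage sketch of the progress accessors touched above. The task name 'echo' and the details dict are illustrative, 'engine' is assumed to be an already prepared engine whose flow contains that task, and the companion getter is assumed to be named get_task_progress_details (its def line is outside this hunk's context):

    engine.storage.set_task_progress('echo', 0.5, details={'msg': 'half way'})
    print(engine.storage.get_task_progress('echo'))          # -> 0.5
    print(engine.storage.get_task_progress_details('echo'))
    # -> {'at_progress': 0.5, 'details': {'msg': 'half way'}}
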
@@ -504,8 +511,12 @@ class Storage(object):
         if self._reset_atom(ad, state):
             self._with_connection(self._save_atom_detail, ad)
 
-    def inject_atom_args(self, atom_name, pairs):
-        """Add **transient** values into storage for a specific atom only.
+    def inject_atom_args(self, atom_name, pairs, transient=True):
+        """Add values into storage for a specific atom only.
 
+        :param transient: save the data in-memory only instead of persisting
+            the data to backend storage (useful for resource-like objects
+            or similar objects which can **not** be persisted)
+
         This method injects a dictionary/pairs of arguments for an atom so that
         when that atom is scheduled for execution it will have immediate access

@@ -536,10 +547,26 @@ class Storage(object):
         """
         if atom_name not in self._atom_name_to_uuid:
             raise exceptions.NotFound("Unknown atom name: %s" % atom_name)
-        with self._lock.write_lock():
+
+        def save_transient():
             self._injected_args.setdefault(atom_name, {})
             self._injected_args[atom_name].update(pairs)
+
+        def save_persistent():
+            ad = self._atomdetail_by_name(atom_name)
+            injected = ad.meta.get(META_INJECTED)
+            if not injected:
+                injected = {}
+            injected.update(pairs)
+            ad.meta[META_INJECTED] = injected
+            self._with_connection(self._save_atom_detail, ad)
+
+        with self._lock.write_lock():
+            if transient:
+                save_transient()
+            else:
+                save_persistent()
 
     @lock_utils.write_locked
     def inject(self, pairs, transient=False):
         """Add values into storage.

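In short, save_transient() only touches this Storage instance's in-memory dict, while save_persistent() writes through to the atom detail's metadata and saves it via the backend connection. A hedged sketch of when each fits; the atom name 'worker' and the values are illustrative, and 'engine' is assumed to have been loaded against a real persistence backend:

    engine.compile()
    engine.prepare()
    # Transient: kept only in this process's Storage object; suitable for
    # live resources (connections, locks, ...) that cannot be serialized.
    engine.storage.inject_atom_args('worker', {'session': object()},
                                    transient=True)
    # Persistent: merged into the atom detail's 'injected' metadata and saved
    # to the backend, so it survives stopping and resuming the flow.
    engine.storage.inject_atom_args('worker', {'retries': 3},
                                    transient=False)
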
@@ -648,11 +675,91 @@ class Storage(object):
         return many_handler(values)
 
     @lock_utils.read_locked
-    def fetch_all(self):
-        """Fetch all named results known so far.
+    def fetch_unsatisfied_args(self, atom_name, args_mapping,
+                               scope_walker=None, optional_args=None):
+        """Fetch unsatisfied atom arguments using an atom's argument mapping.
 
-        NOTE(harlowja): should be used for debugging and testing purposes.
+        NOTE(harlowja): this takes into account the provided scope walker
+        (the atoms which should produce the required value at runtime), as
+        well as the transient/persistent flow and atom specific injected
+        arguments. It does **not** check if the providers actually have
+        produced the needed values; it just checks that they are registered
+        to produce it in the future.
         """
+
+        def _fetch_providers(name):
+            """Fetches a pair of (default providers, non-default providers)."""
+            default_providers = []
+            non_default_providers = []
+            for p in self._reverse_mapping.get(name, []):
+                if p.name in (_TRANSIENT_PROVIDER, self.injector_name):
+                    default_providers.append(p)
+                else:
+                    non_default_providers.append(p)
+            return default_providers, non_default_providers
+
+        def _locate_providers(name):
+            """Finds the accessible *potential* providers."""
+            default_providers, non_default_providers = _fetch_providers(name)
+            providers = []
+            if non_default_providers:
+                if scope_walker is not None:
+                    scope_iter = iter(scope_walker)
+                else:
+                    scope_iter = iter([])
+                for names in scope_iter:
+                    for p in non_default_providers:
+                        if p.name in names:
+                            providers.append(p)
+            for p in default_providers:
+                if p.name is _TRANSIENT_PROVIDER:
+                    results = self._transients
+                else:
+                    try:
+                        results = self._get(p.name, only_last=True)
+                    except exceptions.NotFound:
+                        results = {}
+                try:
+                    _item_from_single(p, results, name)
+                except exceptions.NotFound:
+                    pass
+                else:
+                    providers.append(p)
+            return providers
+
+        ad = self._atomdetail_by_name(atom_name)
+        if optional_args is None:
+            optional_args = []
+        injected_sources = [
+            self._injected_args.get(atom_name, {}),
+            ad.meta.get(META_INJECTED, {}),
+        ]
+        missing = set(six.iterkeys(args_mapping))
+        for (bound_name, name) in six.iteritems(args_mapping):
+            if LOG.isEnabledFor(logging.BLATHER):
+                LOG.blather("Looking for %r <= %r for atom named: %s",
+                            bound_name, name, atom_name)
+            if bound_name in optional_args:
+                LOG.blather("Argument %r is optional, skipping", bound_name)
+                missing.discard(bound_name)
+                continue
+            maybe_providers = 0
+            for source in injected_sources:
+                if not source:
+                    continue
+                if name in source:
+                    maybe_providers += 1
+            maybe_providers += len(_locate_providers(name))
+            if maybe_providers:
+                LOG.blather("Atom %s will have %s potential providers"
+                            " of %r <= %r", atom_name, maybe_providers,
+                            bound_name, name)
+                missing.discard(bound_name)
+        return missing
+
+    @lock_utils.read_locked
+    def fetch_all(self):
+        """Fetch all named results known so far."""
         def many_handler(values):
             if len(values) > 1:
                 return values

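A small usage sketch of the new fetch_unsatisfied_args(); the task and flow names are illustrative, the engine supplies the storage, and the rebind mapping comes straight from the atom, mirroring the new unit tests at the end of this change:

    import taskflow.engines
    from taskflow.patterns import linear_flow
    from taskflow import task

    class NeedsX(task.Task):
        def execute(self, x):
            return x

    t = NeedsX('needs-x')
    engine = taskflow.engines.load(linear_flow.Flow('f').add(t))
    engine.compile()
    engine.prepare()
    s = engine.storage
    print(s.fetch_unsatisfied_args(t.name, t.rebind))   # set(['x'])
    s.inject_atom_args(t.name, {'x': 2}, transient=False)
    print(s.fetch_unsatisfied_args(t.name, t.rebind))   # set()
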
@@ -671,6 +778,15 @@ class Storage(object):
                           optional_args=None):
         """Fetch arguments for an atom using an atom's argument mapping."""
 
+        def _extract_first_from(name, sources):
+            """Extracts/returns first occurrence of a key in a list of dicts."""
+            for i, source in enumerate(sources):
+                if not source:
+                    continue
+                if name in source:
+                    return (i, source[name])
+            raise KeyError(name)
+
         def _get_results(looking_for, provider):
             """Gets the results saved for a given provider."""
             try:

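Standalone illustration of the precedence _extract_first_from gives the two injected sources (index 0 is the transient, in-memory per-atom dict; index 1 is the persistent metadata dict), independent of TaskFlow itself:

    def extract_first_from(name, sources):
        # Same logic as the helper above: the first dict that has the key wins.
        for i, source in enumerate(sources):
            if source and name in source:
                return (i, source[name])
        raise KeyError(name)

    transient_args = {'x': 3}       # injected with transient=True
    persistent_args = {'x': 2}      # injected with transient=False
    print(extract_first_from('x', [transient_args, persistent_args]))  # (0, 3)
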
@@ -710,14 +826,16 @@ class Storage(object):
 
         if optional_args is None:
             optional_args = []
         if atom_name and atom_name not in self._atom_name_to_uuid:
             raise exceptions.NotFound("Unknown atom name: %s" % atom_name)
+        if atom_name:
+            ad = self._atomdetail_by_name(atom_name)
+            injected_sources = [
+                self._injected_args.get(atom_name, {}),
+                ad.meta.get(META_INJECTED, {}),
+            ]
+        else:
+            injected_sources = []
         if not args_mapping:
             return {}
-        if atom_name:
-            injected_args = self._injected_args.get(atom_name, {})
-        else:
-            injected_args = {}
         mapped_args = {}
         for (bound_name, name) in six.iteritems(args_mapping):
             if LOG.isEnabledFor(logging.BLATHER):

@@ -726,16 +844,26 @@ class Storage(object):
                             bound_name, name, atom_name)
             else:
                 LOG.blather("Looking for %r <= %r", bound_name, name)
-            if name in injected_args:
-                value = injected_args[name]
+            try:
+                source_index, value = _extract_first_from(name,
+                                                          injected_sources)
                 mapped_args[bound_name] = value
-                LOG.blather("Matched %r <= %r to %r (from injected"
-                            " values)", bound_name, name, value)
-            else:
+                if LOG.isEnabledFor(logging.BLATHER):
+                    if source_index == 0:
+                        LOG.blather("Matched %r <= %r to %r (from injected"
+                                    " atom-specific transient"
+                                    " values)", bound_name, name, value)
+                    else:
+                        LOG.blather("Matched %r <= %r to %r (from injected"
+                                    " atom-specific persistent"
+                                    " values)", bound_name, name, value)
+            except KeyError:
                 try:
                     possible_providers = self._reverse_mapping[name]
                 except KeyError:
                     if bound_name in optional_args:
                         LOG.blather("Argument %r is optional, skipping",
                                     bound_name)
                         continue
                     raise exceptions.NotFound("Name %r is not mapped as a"
                                               " produced output by any"

@@ -539,6 +539,31 @@ class StorageTestMixin(object):
         intention = s.get_atom_intention('my retry')
         self.assertEqual(intention, states.RETRY)
 
+    def test_inject_persistent_missing(self):
+        t = test_utils.ProgressingTask('my retry', requires=['x'])
+        s = self._get_storage()
+        s.ensure_atom(t)
+        missing = s.fetch_unsatisfied_args(t.name, t.rebind)
+        self.assertEqual(set(['x']), missing)
+        s.inject_atom_args(t.name, {'x': 2}, transient=False)
+        missing = s.fetch_unsatisfied_args(t.name, t.rebind)
+        self.assertEqual(set(), missing)
+        args = s.fetch_mapped_args(t.rebind, atom_name=t.name)
+        self.assertEqual(2, args['x'])
+
+    def test_inject_persistent_and_transient_missing(self):
+        t = test_utils.ProgressingTask('my retry', requires=['x'])
+        s = self._get_storage()
+        s.ensure_atom(t)
+        missing = s.fetch_unsatisfied_args(t.name, t.rebind)
+        self.assertEqual(set(['x']), missing)
+        s.inject_atom_args(t.name, {'x': 2}, transient=False)
+        s.inject_atom_args(t.name, {'x': 3}, transient=True)
+        missing = s.fetch_unsatisfied_args(t.name, t.rebind)
+        self.assertEqual(set(), missing)
+        args = s.fetch_mapped_args(t.rebind, atom_name=t.name)
+        self.assertEqual(3, args['x'])
+
 
 class StorageMemoryTest(StorageTestMixin, test.TestCase):
     def setUp(self):

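To tie the change together, a rough end-to-end sketch of the behaviour those tests exercise, here across two engine instances that share one persistence backend. All names are illustrative, and the persistence helpers (backends.fetch, temporary_log_book, create_flow_detail) are the ones used by the standard TaskFlow examples, assumed unchanged by this commit:

    import taskflow.engines
    from taskflow.patterns import linear_flow
    from taskflow.persistence import backends
    from taskflow import task
    from taskflow.utils import persistence_utils as p_utils

    class Echo(task.Task):
        def execute(self, x):
            print(x)

    flow = linear_flow.Flow('persisted').add(Echo('echo'))
    backend = backends.fetch({'connection': 'memory://'})
    book = p_utils.temporary_log_book(backend)
    flow_detail = p_utils.create_flow_detail(flow, book, backend)

    # First engine: inject persistently, but do not run.
    e1 = taskflow.engines.load(flow, flow_detail=flow_detail, book=book,
                               backend=backend)
    e1.compile()
    e1.prepare()
    e1.storage.inject_atom_args('echo', {'x': 'kept'}, transient=False)

    # Second engine over the same flow detail: the persisted injection is
    # still visible because it lives in the atom detail's metadata.
    e2 = taskflow.engines.load(flow, flow_detail=flow_detail, book=book,
                               backend=backend)
    e2.run()   # expected to print 'kept'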