Allow injected atom args to be persisted

Instead of only storing injected atom arguments in memory
allow for specifying those to be persisted; so that users
who desire this feature can persist them (it defaults to
being transient to retain the old API behavior).

This also reworks the validating of engine dependencies
to be more correct. It removes the validation of these
dependencies from the prepare() method and moves them to a
new engine validate() method; this allows users to prepare()
the engine, then inject there atom non-transient arguments
and then validate(); the validation would fail prior to this
at preparation time since no injected arguments would
exist and the user would not have the ability to inject any
that target a specific atom, since the atom detail would
not have been created yet (since that is populated in the
prepartion method).

Change-Id: I2846d0334db32a115592f850d85b206d9e6a3f07
This commit is contained in:
Joshua Harlow
2015-03-10 12:54:00 -07:00
parent f9e520679a
commit f0de22c18a
6 changed files with 223 additions and 42 deletions

View File

@@ -366,9 +366,10 @@ and that class interacts with the a
and the :py:class:`~taskflow.storage.Storage` class uses the following
lookup order to find (or fail) a atoms requirement lookup/request:
#. Injected atom specific arguments.
#. Transient injected arguments.
#. Non-transient injected arguments.
#. Transient injected atom specific arguments.
#. Non-transient injected atom specific arguments.
#. Transient injected arguments (flow specific).
#. Non-transient injected arguments (flow specific).
#. First scope visited provider that produces the named result; note that
if multiple providers are found in the same scope the *first* (the scope
walkers yielded ordering defines what *first* means) that produced that

View File

@@ -20,6 +20,7 @@ import threading
from concurrent import futures
from oslo_utils import excutils
from oslo_utils import strutils
import six
from taskflow.engines.action_engine import compiler
@@ -119,6 +120,7 @@ class ActionEngine(base.Engine):
"""
self.compile()
self.prepare()
self.validate()
runner = self._runtime.runner
last_state = None
with _start_stop(self._task_executor):
@@ -168,10 +170,34 @@ class ActionEngine(base.Engine):
def _ensure_storage(self):
"""Ensure all contained atoms exist in the storage unit."""
transient = strutils.bool_from_string(
self._options.get('inject_transient', True))
for node in self._compilation.execution_graph.nodes_iter():
self.storage.ensure_atom(node)
if node.inject:
self.storage.inject_atom_args(node.name, node.inject)
self.storage.inject_atom_args(node.name,
node.inject,
transient=transient)
@lock_utils.locked
def validate(self):
if not self._storage_ensured:
raise exc.InvalidState("Can not validate an engine"
" which has not has its storage"
" populated")
# At this point we can check to ensure all dependencies are either
# flow/task provided or storage provided, if there are still missing
# dependencies then this flow will fail at runtime (which we can avoid
# by failing at validation time).
missing = set()
fetch = self.storage.fetch_unsatisfied_args
for node in self._compilation.execution_graph.nodes_iter():
scope_walker = self._runtime.fetch_scopes_for(node)
missing.update(fetch(node.name, node.rebind,
scope_walker=scope_walker,
optional_args=node.optional))
if missing:
raise exc.MissingDependencies(self._flow, sorted(missing))
@lock_utils.locked
def prepare(self):
@@ -186,14 +212,6 @@ class ActionEngine(base.Engine):
self._ensure_storage()
self._change_state(states.SUSPENDED)
self._storage_ensured = True
# At this point we can check to ensure all dependencies are either
# flow/task provided or storage provided, if there are still missing
# dependencies then this flow will fail at runtime (which we can avoid
# by failing at preparation time).
external_provides = set(self.storage.fetch_all().keys())
missing = self._flow.requires - external_provides
if missing:
raise exc.MissingDependencies(self._flow, sorted(missing))
# Reset everything back to pending (if we were previously reverted).
if self.storage.get_flow_state() == states.REVERTED:
self._runtime.reset_all()

View File

@@ -67,15 +67,15 @@ class Runtime(object):
@misc.cachedproperty
def retry_action(self):
return ra.RetryAction(self._storage, self._atom_notifier,
self._fetch_scopes_for)
self.fetch_scopes_for)
@misc.cachedproperty
def task_action(self):
return ta.TaskAction(self._storage,
self._atom_notifier, self._fetch_scopes_for,
self._atom_notifier, self.fetch_scopes_for,
self._task_executor)
def _fetch_scopes_for(self, atom):
def fetch_scopes_for(self, atom):
"""Fetches a tuple of the visible scopes for the given atom."""
try:
return self._scopes[atom]

View File

@@ -92,9 +92,18 @@ class Engine(object):
"""Performs any pre-run, but post-compilation actions.
NOTE(harlowja): During preparation it is currently assumed that the
underlying storage will be initialized, all final dependencies
will be verified, the tasks will be reset and the engine will enter
the PENDING state.
underlying storage will be initialized, the atoms will be reset and
the engine will enter the PENDING state.
"""
@abc.abstractmethod
def validate(self):
"""Performs any pre-run, post-prepare validation actions.
NOTE(harlowja): During validation all final dependencies
will be verified and ensured. This will by default check that all
atoms have satisfiable requirements (satisfied by some other
provider).
"""
@abc.abstractmethod
@@ -105,8 +114,8 @@ class Engine(object):
def suspend(self):
"""Attempts to suspend the engine.
If the engine is currently running tasks then this will attempt to
suspend future work from being started (currently active tasks can
If the engine is currently running atoms then this will attempt to
suspend future work from being started (currently active atoms can
not currently be preempted) and move the engine into a suspend state
which can then later be resumed from.
"""

View File

@@ -47,6 +47,13 @@ _TRANSIENT_PROVIDER = object()
# can fail extraction during lookup or emit warning on result reception...
_EXTRACTION_EXCEPTIONS = (IndexError, KeyError, ValueError, TypeError)
# Atom detail metadata key used to inject atom non-transient injected args.
META_INJECTED = 'injected'
# Atom detail metadata key(s) used to set atom progress (with any details).
META_PROGRESS = 'progress'
META_PROGRESS_DETAILS = 'progress_details'
class _Provider(object):
"""A named symbol provider that produces a output at the given index."""
@@ -356,19 +363,19 @@ class Storage(object):
:param details: any task specific progress details
"""
update_with = {
'progress': progress,
META_PROGRESS: progress,
}
if details is not None:
# NOTE(imelnikov): as we can update progress without
# updating details (e.g. automatically from engine)
# we save progress value with details, too.
if details:
update_with['progress_details'] = {
update_with[META_PROGRESS_DETAILS] = {
'at_progress': progress,
'details': details,
}
else:
update_with['progress_details'] = None
update_with[META_PROGRESS_DETAILS] = None
self._update_atom_metadata(task_name, update_with,
expected_type=logbook.TaskDetail)
@@ -382,7 +389,7 @@ class Storage(object):
ad = self._atomdetail_by_name(task_name,
expected_type=logbook.TaskDetail)
try:
return ad.meta['progress']
return ad.meta[META_PROGRESS]
except KeyError:
return 0.0
@@ -397,7 +404,7 @@ class Storage(object):
ad = self._atomdetail_by_name(task_name,
expected_type=logbook.TaskDetail)
try:
return ad.meta['progress_details']
return ad.meta[META_PROGRESS_DETAILS]
except KeyError:
return None
@@ -504,8 +511,12 @@ class Storage(object):
if self._reset_atom(ad, state):
self._with_connection(self._save_atom_detail, ad)
def inject_atom_args(self, atom_name, pairs):
"""Add **transient** values into storage for a specific atom only.
def inject_atom_args(self, atom_name, pairs, transient=True):
"""Add values into storage for a specific atom only.
:param transient: save the data in-memory only instead of persisting
the data to backend storage (useful for resource-like objects
or similar objects which can **not** be persisted)
This method injects a dictionary/pairs of arguments for an atom so that
when that atom is scheduled for execution it will have immediate access
@@ -536,10 +547,26 @@ class Storage(object):
"""
if atom_name not in self._atom_name_to_uuid:
raise exceptions.NotFound("Unknown atom name: %s" % atom_name)
with self._lock.write_lock():
def save_transient():
self._injected_args.setdefault(atom_name, {})
self._injected_args[atom_name].update(pairs)
def save_persistent():
ad = self._atomdetail_by_name(atom_name)
injected = ad.meta.get(META_INJECTED)
if not injected:
injected = {}
injected.update(pairs)
ad.meta[META_INJECTED] = injected
self._with_connection(self._save_atom_detail, ad)
with self._lock.write_lock():
if transient:
save_transient()
else:
save_persistent()
@lock_utils.write_locked
def inject(self, pairs, transient=False):
"""Add values into storage.
@@ -648,11 +675,91 @@ class Storage(object):
return many_handler(values)
@lock_utils.read_locked
def fetch_all(self):
"""Fetch all named results known so far.
def fetch_unsatisfied_args(self, atom_name, args_mapping,
scope_walker=None, optional_args=None):
"""Fetch unsatisfied atom arguments using an atoms argument mapping.
NOTE(harlowja): should be used for debugging and testing purposes.
NOTE(harlowja): this takes into account the provided scope walker
atoms who should produce the required value at runtime, as well as
the transient/persistent flow and atom specific injected arguments.
It does **not** check if the providers actually have produced the
needed values; it just checks that they are registered to produce
it in the future.
"""
def _fetch_providers(name):
"""Fetchs pair of (default providers, non-default providers)."""
default_providers = []
non_default_providers = []
for p in self._reverse_mapping.get(name, []):
if p.name in (_TRANSIENT_PROVIDER, self.injector_name):
default_providers.append(p)
else:
non_default_providers.append(p)
return default_providers, non_default_providers
def _locate_providers(name):
"""Finds the accessible *potential* providers."""
default_providers, non_default_providers = _fetch_providers(name)
providers = []
if non_default_providers:
if scope_walker is not None:
scope_iter = iter(scope_walker)
else:
scope_iter = iter([])
for names in scope_iter:
for p in non_default_providers:
if p.name in names:
providers.append(p)
for p in default_providers:
if p.name is _TRANSIENT_PROVIDER:
results = self._transients
else:
try:
results = self._get(p.name, only_last=True)
except exceptions.NotFound:
results = {}
try:
_item_from_single(p, results, name)
except exceptions.NotFound:
pass
else:
providers.append(p)
return providers
ad = self._atomdetail_by_name(atom_name)
if optional_args is None:
optional_args = []
injected_sources = [
self._injected_args.get(atom_name, {}),
ad.meta.get(META_INJECTED, {}),
]
missing = set(six.iterkeys(args_mapping))
for (bound_name, name) in six.iteritems(args_mapping):
if LOG.isEnabledFor(logging.BLATHER):
LOG.blather("Looking for %r <= %r for atom named: %s",
bound_name, name, atom_name)
if bound_name in optional_args:
LOG.blather("Argument %r is optional, skipping", bound_name)
missing.discard(bound_name)
continue
maybe_providers = 0
for source in injected_sources:
if not source:
continue
if name in source:
maybe_providers += 1
maybe_providers += len(_locate_providers(name))
if maybe_providers:
LOG.blather("Atom %s will have %s potential providers"
" of %r <= %r", atom_name, maybe_providers,
bound_name, name)
missing.discard(bound_name)
return missing
@lock_utils.read_locked
def fetch_all(self):
"""Fetch all named results known so far."""
def many_handler(values):
if len(values) > 1:
return values
@@ -671,6 +778,15 @@ class Storage(object):
optional_args=None):
"""Fetch arguments for an atom using an atoms argument mapping."""
def _extract_first_from(name, sources):
"""Extracts/returns first occurence of key in list of dicts."""
for i, source in enumerate(sources):
if not source:
continue
if name in source:
return (i, source[name])
raise KeyError(name)
def _get_results(looking_for, provider):
"""Gets the results saved for a given provider."""
try:
@@ -710,14 +826,16 @@ class Storage(object):
if optional_args is None:
optional_args = []
if atom_name and atom_name not in self._atom_name_to_uuid:
raise exceptions.NotFound("Unknown atom name: %s" % atom_name)
if atom_name:
ad = self._atomdetail_by_name(atom_name)
injected_sources = [
self._injected_args.get(atom_name, {}),
ad.meta.get(META_INJECTED, {}),
]
else:
injected_sources = []
if not args_mapping:
return {}
if atom_name:
injected_args = self._injected_args.get(atom_name, {})
else:
injected_args = {}
mapped_args = {}
for (bound_name, name) in six.iteritems(args_mapping):
if LOG.isEnabledFor(logging.BLATHER):
@@ -726,16 +844,26 @@ class Storage(object):
bound_name, name, atom_name)
else:
LOG.blather("Looking for %r <= %r", bound_name, name)
if name in injected_args:
value = injected_args[name]
try:
source_index, value = _extract_first_from(name,
injected_sources)
mapped_args[bound_name] = value
LOG.blather("Matched %r <= %r to %r (from injected"
" values)", bound_name, name, value)
else:
if LOG.isEnabledFor(logging.BLATHER):
if source_index == 0:
LOG.blather("Matched %r <= %r to %r (from injected"
" atom-specific transient"
" values)", bound_name, name, value)
else:
LOG.blather("Matched %r <= %r to %r (from injected"
" atom-specific persistent"
" values)", bound_name, name, value)
except KeyError:
try:
possible_providers = self._reverse_mapping[name]
except KeyError:
if bound_name in optional_args:
LOG.blather("Argument %r is optional, skipping",
bound_name)
continue
raise exceptions.NotFound("Name %r is not mapped as a"
" produced output by any"

View File

@@ -533,6 +533,31 @@ class StorageTestMixin(object):
intention = s.get_atom_intention('my retry')
self.assertEqual(intention, states.RETRY)
def test_inject_persistent_missing(self):
t = test_utils.ProgressingTask('my retry', requires=['x'])
s = self._get_storage()
s.ensure_atom(t)
missing = s.fetch_unsatisfied_args(t.name, t.rebind)
self.assertEqual(set(['x']), missing)
s.inject_atom_args(t.name, {'x': 2}, transient=False)
missing = s.fetch_unsatisfied_args(t.name, t.rebind)
self.assertEqual(set(), missing)
args = s.fetch_mapped_args(t.rebind, atom_name=t.name)
self.assertEqual(2, args['x'])
def test_inject_persistent_and_transient_missing(self):
t = test_utils.ProgressingTask('my retry', requires=['x'])
s = self._get_storage()
s.ensure_atom(t)
missing = s.fetch_unsatisfied_args(t.name, t.rebind)
self.assertEqual(set(['x']), missing)
s.inject_atom_args(t.name, {'x': 2}, transient=False)
s.inject_atom_args(t.name, {'x': 3}, transient=True)
missing = s.fetch_unsatisfied_args(t.name, t.rebind)
self.assertEqual(set(), missing)
args = s.fetch_mapped_args(t.rebind, atom_name=t.name)
self.assertEqual(3, args['x'])
class StorageMemoryTest(StorageTestMixin, test.TestCase):
def setUp(self):