Rename inject_task_args to inject_atom_args

Since storage injection can target atoms and not
just tasks (just one type of atom) the method naming
is more appropriate when named as atom injection (and
associated variable name changes reflect this as well).

Also add more stringent checking around the names of
atoms that are targeted for injection (ensuring that
the name provided actually exists in the storage unit).

Change-Id: I0ad0178240613fda166ea8fcdc441b37290445f8
This commit is contained in:
Joshua Harlow
2014-06-02 23:17:44 -07:00
parent c8a281aa36
commit 2ac96676e4
4 changed files with 23 additions and 10 deletions

View File

@@ -171,7 +171,7 @@ class ActionEngine(base.EngineBase):
else:
self.storage.ensure_task(node.name, version, node.save_as)
if node.inject:
self.storage.inject_task_args(node.name, node.inject)
self.storage.inject_atom_args(node.name, node.inject)
self._change_state(states.SUSPENDED) # does nothing in PENDING state
@lock_utils.locked

View File

@@ -34,7 +34,7 @@ class RetryAction(object):
def _get_retry_args(self, retry):
kwargs = self._storage.fetch_mapped_args(retry.rebind,
task_name=retry.name)
atom_name=retry.name)
kwargs['history'] = self._storage.get_retry_history(retry.name)
return kwargs

View File

@@ -66,7 +66,7 @@ class TaskAction(object):
raise exceptions.InvalidState("Task %s is in invalid state and"
" can't be executed" % task.name)
kwargs = self._storage.fetch_mapped_args(task.rebind,
task_name=task.name)
atom_name=task.name)
task_uuid = self._storage.get_atom_uuid(task.name)
return self._task_executor.execute_task(task, task_uuid, kwargs,
self._on_update_progress)
@@ -83,7 +83,7 @@ class TaskAction(object):
raise exceptions.InvalidState("Task %s is in invalid state and"
" can't be reverted" % task.name)
kwargs = self._storage.fetch_mapped_args(task.rebind,
task_name=task.name)
atom_name=task.name)
task_uuid = self._storage.get_atom_uuid(task.name)
task_result = self._storage.get(task.name)
failures = self._storage.get_failures()

View File

@@ -411,9 +411,22 @@ class Storage(object):
if self._reset_atom(ad, state):
self._with_connection(self._save_atom_detail, ad)
def inject_task_args(self, task_name, injected_args):
self._injected_args.setdefault(task_name, {})
self._injected_args[task_name].update(injected_args)
def inject_atom_args(self, atom_name, pairs):
"""Add *transient* values into storage for a specific atom only.
This method injects a dictionary/pairs of arguments for an atom so that
when that atom is scheduled for execution it will have immediate access
to these arguments.
NOTE(harlowja): injected atom arguments take precedence over arguments
provided by predecessor atoms or arguments provided by injecting into
the flow scope (using the inject() method).
"""
if atom_name not in self._atom_name_to_uuid:
raise exceptions.NotFound("Unknown atom name: %s" % atom_name)
with self._lock.write_lock():
self._injected_args.setdefault(atom_name, {})
self._injected_args[atom_name].update(pairs)
def inject(self, pairs, transient=False):
"""Add values into storage.
@@ -521,12 +534,12 @@ class Storage(object):
pass
return results
def fetch_mapped_args(self, args_mapping, task_name=None):
def fetch_mapped_args(self, args_mapping, atom_name=None):
"""Fetch arguments for an atom using an atoms arguments mapping."""
with self._lock.read_lock():
injected_args = {}
if task_name:
injected_args = self._injected_args.get(task_name, {})
if atom_name:
injected_args = self._injected_args.get(atom_name, {})
mapped_args = {}
for key, name in six.iteritems(args_mapping):
if name in injected_args: