From 2ac96676e473faa6452d070d6bc73a74f131b1d4 Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Mon, 2 Jun 2014 23:17:44 -0700 Subject: [PATCH] Rename inject_task_args to inject_atom_args Since storage injection can target atoms and not just tasks (just one type of atom) the method naming is more appropriate when named as atom injection (and associated variable name changes reflect this as well). Also add more stringent checking around the names of atoms that are targeted for injection (ensuring that the name provided actually exists in the storage unit). Change-Id: I0ad0178240613fda166ea8fcdc441b37290445f8 --- taskflow/engines/action_engine/engine.py | 2 +- .../engines/action_engine/retry_action.py | 2 +- taskflow/engines/action_engine/task_action.py | 4 +-- taskflow/storage.py | 25 ++++++++++++++----- 4 files changed, 23 insertions(+), 10 deletions(-) diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py index e63aeb31..bfc90834 100644 --- a/taskflow/engines/action_engine/engine.py +++ b/taskflow/engines/action_engine/engine.py @@ -171,7 +171,7 @@ class ActionEngine(base.EngineBase): else: self.storage.ensure_task(node.name, version, node.save_as) if node.inject: - self.storage.inject_task_args(node.name, node.inject) + self.storage.inject_atom_args(node.name, node.inject) self._change_state(states.SUSPENDED) # does nothing in PENDING state @lock_utils.locked diff --git a/taskflow/engines/action_engine/retry_action.py b/taskflow/engines/action_engine/retry_action.py index eaedf04b..a1ca3abb 100644 --- a/taskflow/engines/action_engine/retry_action.py +++ b/taskflow/engines/action_engine/retry_action.py @@ -34,7 +34,7 @@ class RetryAction(object): def _get_retry_args(self, retry): kwargs = self._storage.fetch_mapped_args(retry.rebind, - task_name=retry.name) + atom_name=retry.name) kwargs['history'] = self._storage.get_retry_history(retry.name) return kwargs diff --git a/taskflow/engines/action_engine/task_action.py b/taskflow/engines/action_engine/task_action.py index 9ab8c460..c0d1daa5 100644 --- a/taskflow/engines/action_engine/task_action.py +++ b/taskflow/engines/action_engine/task_action.py @@ -66,7 +66,7 @@ class TaskAction(object): raise exceptions.InvalidState("Task %s is in invalid state and" " can't be executed" % task.name) kwargs = self._storage.fetch_mapped_args(task.rebind, - task_name=task.name) + atom_name=task.name) task_uuid = self._storage.get_atom_uuid(task.name) return self._task_executor.execute_task(task, task_uuid, kwargs, self._on_update_progress) @@ -83,7 +83,7 @@ class TaskAction(object): raise exceptions.InvalidState("Task %s is in invalid state and" " can't be reverted" % task.name) kwargs = self._storage.fetch_mapped_args(task.rebind, - task_name=task.name) + atom_name=task.name) task_uuid = self._storage.get_atom_uuid(task.name) task_result = self._storage.get(task.name) failures = self._storage.get_failures() diff --git a/taskflow/storage.py b/taskflow/storage.py index e3a208a4..353d44f3 100644 --- a/taskflow/storage.py +++ b/taskflow/storage.py @@ -411,9 +411,22 @@ class Storage(object): if self._reset_atom(ad, state): self._with_connection(self._save_atom_detail, ad) - def inject_task_args(self, task_name, injected_args): - self._injected_args.setdefault(task_name, {}) - self._injected_args[task_name].update(injected_args) + def inject_atom_args(self, atom_name, pairs): + """Add *transient* values into storage for a specific atom only. + + This method injects a dictionary/pairs of arguments for an atom so that + when that atom is scheduled for execution it will have immediate access + to these arguments. + + NOTE(harlowja): injected atom arguments take precedence over arguments + provided by predecessor atoms or arguments provided by injecting into + the flow scope (using the inject() method). + """ + if atom_name not in self._atom_name_to_uuid: + raise exceptions.NotFound("Unknown atom name: %s" % atom_name) + with self._lock.write_lock(): + self._injected_args.setdefault(atom_name, {}) + self._injected_args[atom_name].update(pairs) def inject(self, pairs, transient=False): """Add values into storage. @@ -521,12 +534,12 @@ class Storage(object): pass return results - def fetch_mapped_args(self, args_mapping, task_name=None): + def fetch_mapped_args(self, args_mapping, atom_name=None): """Fetch arguments for an atom using an atoms arguments mapping.""" with self._lock.read_lock(): injected_args = {} - if task_name: - injected_args = self._injected_args.get(task_name, {}) + if atom_name: + injected_args = self._injected_args.get(atom_name, {}) mapped_args = {} for key, name in six.iteritems(args_mapping): if name in injected_args: