Cache atom name -> actions and provide accessor function

Instead of doing repeated type checks in the completer engine
internal component/object just use the runtime compilation routine
to cache the mapping of atom names to actions and then use an exposed
function to fetch the needed action in the completer object
as needed.

Change-Id: I07161e7956d039cf89d057b8082e12b82adcd82f
This commit is contained in:
Joshua Harlow
2015-10-07 17:20:15 -07:00
committed by Thomas Goirand
parent 73d38dce76
commit 96f52eccc1
2 changed files with 17 additions and 21 deletions

View File

@@ -26,7 +26,6 @@ from taskflow.engines.action_engine import executor as ex
from taskflow import logging from taskflow import logging
from taskflow import retry as retry_atom from taskflow import retry as retry_atom
from taskflow import states as st from taskflow import states as st
from taskflow import task as task_atom
from taskflow.types import failure from taskflow.types import failure
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@@ -110,26 +109,10 @@ class Completer(object):
self._runtime = weakref.proxy(runtime) self._runtime = weakref.proxy(runtime)
self._analyzer = runtime.analyzer self._analyzer = runtime.analyzer
self._storage = runtime.storage self._storage = runtime.storage
self._task_action = runtime.task_action
self._retry_action = runtime.retry_action
self._undefined_resolver = RevertAll(self._runtime) self._undefined_resolver = RevertAll(self._runtime)
self._defer_reverts = strutils.bool_from_string( self._defer_reverts = strutils.bool_from_string(
self._runtime.options.get('defer_reverts', False)) self._runtime.options.get('defer_reverts', False))
def _complete_task(self, task, outcome, result):
"""Completes the given task, processes task failure."""
if outcome == ex.EXECUTED:
self._task_action.complete_execution(task, result)
else:
self._task_action.complete_reversion(task, result)
def _complete_retry(self, retry, outcome, result):
"""Completes the given retry, processes retry failure."""
if outcome == ex.EXECUTED:
self._retry_action.complete_execution(retry, result)
else:
self._retry_action.complete_reversion(retry, result)
def resume(self): def resume(self):
"""Resumes atoms in the contained graph. """Resumes atoms in the contained graph.
@@ -165,10 +148,11 @@ class Completer(object):
Returns whether the result should be saved into an accumulator of Returns whether the result should be saved into an accumulator of
failures or whether this should not be done. failures or whether this should not be done.
""" """
if isinstance(node, task_atom.BaseTask): handler = self._runtime.fetch_action(node)
self._complete_task(node, outcome, result) if outcome == ex.EXECUTED:
handler.complete_execution(node, result)
else: else:
self._complete_retry(node, outcome, result) handler.complete_reversion(node, result)
if isinstance(result, failure.Failure): if isinstance(result, failure.Failure):
if outcome == ex.EXECUTED: if outcome == ex.EXECUTED:
self._process_atom_failure(node, result) self._process_atom_failure(node, result)
@@ -182,7 +166,8 @@ class Completer(object):
retry = self._analyzer.find_retry(atom) retry = self._analyzer.find_retry(atom)
if retry is not None: if retry is not None:
# Ask retry controller what to do in case of failure. # Ask retry controller what to do in case of failure.
strategy = self._retry_action.on_failure(retry, atom, failure) handler = self._runtime.fetch_action(retry)
strategy = handler.on_failure(retry, atom, failure)
if strategy == retry_atom.RETRY: if strategy == retry_atom.RETRY:
return RevertAndRetry(self._runtime, retry) return RevertAndRetry(self._runtime, retry)
elif strategy == retry_atom.REVERT: elif strategy == retry_atom.REVERT:

View File

@@ -101,6 +101,10 @@ class Runtime(object):
com.TASK: st.check_task_transition, com.TASK: st.check_task_transition,
com.RETRY: st.check_retry_transition, com.RETRY: st.check_retry_transition,
} }
actions = {
com.TASK: self.task_action,
com.RETRY: self.retry_action,
}
graph = self._compilation.execution_graph graph = self._compilation.execution_graph
for node, node_data in graph.nodes_iter(data=True): for node, node_data in graph.nodes_iter(data=True):
node_kind = node_data['kind'] node_kind = node_data['kind']
@@ -110,6 +114,7 @@ class Runtime(object):
check_transition_handler = check_transition_handlers[node_kind] check_transition_handler = check_transition_handlers[node_kind]
change_state_handler = change_state_handlers[node_kind] change_state_handler = change_state_handlers[node_kind]
scheduler = schedulers[node_kind] scheduler = schedulers[node_kind]
action = actions[node_kind]
else: else:
raise exc.CompilationFailure("Unknown node kind '%s'" raise exc.CompilationFailure("Unknown node kind '%s'"
" encountered" % node_kind) " encountered" % node_kind)
@@ -121,6 +126,7 @@ class Runtime(object):
metadata['change_state_handler'] = change_state_handler metadata['change_state_handler'] = change_state_handler
metadata['scheduler'] = scheduler metadata['scheduler'] = scheduler
metadata['edge_deciders'] = tuple(deciders_it) metadata['edge_deciders'] = tuple(deciders_it)
metadata['action'] = action
self._atom_cache[node.name] = metadata self._atom_cache[node.name] = metadata
@property @property
@@ -197,6 +203,11 @@ class Runtime(object):
# not exist and therefore doesn't need to handle that case). # not exist and therefore doesn't need to handle that case).
return self._fetch_atom_metadata_entry(atom.name, 'scheduler') return self._fetch_atom_metadata_entry(atom.name, 'scheduler')
def fetch_action(self, atom):
"""Fetches the cached action handler for the given atom."""
metadata = self._atom_cache[atom.name]
return metadata['action']
def fetch_scopes_for(self, atom_name): def fetch_scopes_for(self, atom_name):
"""Fetches a walker of the visible scopes for the given atom.""" """Fetches a walker of the visible scopes for the given atom."""
try: try: