Move atom action handlers to there own subfolder/submodule

Since action handlers are atom specific it seems appropriate
to place the handlers under a specific submodule so that the
action engine module/folder layout is more obvious and makes
more sense.

This also adds on a handles(atom) method to those handlers
so that the runtime module/class can use those methods to
determine which handler should handle resetting an atom to
its initial state.

Change-Id: I7067d91347b41613fba1492d81d68a590f22c467
This commit is contained in:
Joshua Harlow
2014-12-04 23:59:54 -08:00
parent cb9bc57aa9
commit 4707ac74e1
4 changed files with 21 additions and 10 deletions

View File

@@ -17,7 +17,7 @@
import logging
from taskflow.engines.action_engine import executor as ex
from taskflow import retry as rt
from taskflow import retry as retry_atom
from taskflow import states
from taskflow.types import failure
from taskflow.types import futures
@@ -28,19 +28,25 @@ SAVE_RESULT_STATES = (states.SUCCESS, states.FAILURE)
class RetryAction(object):
"""An action that handles executing, state changes, ... of retry atoms."""
def __init__(self, storage, notifier, walker_factory):
self._storage = storage
self._notifier = notifier
self._walker_factory = walker_factory
self._executor = futures.SynchronousExecutor()
@staticmethod
def handles(atom):
return isinstance(atom, retry_atom.Retry)
def _get_retry_args(self, retry, addons=None):
scope_walker = self._walker_factory(retry)
kwargs = self._storage.fetch_mapped_args(retry.rebind,
atom_name=retry.name,
scope_walker=scope_walker)
history = self._storage.get_retry_history(retry.name)
kwargs[rt.EXECUTE_REVERT_HISTORY] = history
kwargs[retry_atom.EXECUTE_REVERT_HISTORY] = history
if addons:
kwargs.update(addons)
return kwargs
@@ -103,7 +109,7 @@ class RetryAction(object):
self.change_state(retry, states.REVERTING)
arg_addons = {
rt.REVERT_FLOW_FAILURES: self._storage.get_failures(),
retry_atom.REVERT_FLOW_FAILURES: self._storage.get_failures(),
}
fut = self._executor.submit(_execute_retry,
self._get_retry_args(retry,

View File

@@ -17,6 +17,7 @@
import logging
from taskflow import states
from taskflow import task as task_atom
from taskflow.types import failure
LOG = logging.getLogger(__name__)
@@ -25,6 +26,7 @@ SAVE_RESULT_STATES = (states.SUCCESS, states.FAILURE)
class TaskAction(object):
"""An action that handles scheduling, state changes, ... of task atoms."""
def __init__(self, storage, task_executor, notifier, walker_factory):
self._storage = storage
@@ -32,6 +34,10 @@ class TaskAction(object):
self._notifier = notifier
self._walker_factory = walker_factory
@staticmethod
def handles(atom):
return isinstance(atom, task_atom.BaseTask)
def _is_identity_transition(self, state, task, progress):
if state in SAVE_RESULT_STATES:
# saving result is never identity transition

View File

@@ -14,16 +14,14 @@
# License for the specific language governing permissions and limitations
# under the License.
from taskflow.engines.action_engine.actions import retry as ra
from taskflow.engines.action_engine.actions import task as ta
from taskflow.engines.action_engine import analyzer as an
from taskflow.engines.action_engine import completer as co
from taskflow.engines.action_engine import retry_action as ra
from taskflow.engines.action_engine import runner as ru
from taskflow.engines.action_engine import scheduler as sched
from taskflow.engines.action_engine import scopes as sc
from taskflow.engines.action_engine import task_action as ta
from taskflow import retry as retry_atom
from taskflow import states as st
from taskflow import task as task_atom
from taskflow.utils import misc
@@ -93,9 +91,10 @@ class Runtime(object):
def reset_nodes(self, nodes, state=st.PENDING, intention=st.EXECUTE):
for node in nodes:
if state:
if isinstance(node, task_atom.BaseTask):
self.task_action.change_state(node, state, progress=0.0)
elif isinstance(node, retry_atom.Retry):
if self.task_action.handles(node):
self.task_action.change_state(node, state,
progress=0.0)
elif self.retry_action.handles(node):
self.retry_action.change_state(node, state)
else:
raise TypeError("Unknown how to reset atom '%s' (%s)"