From 2fa4af7a24584b211ca549f2df715c2d126360c8 Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Wed, 17 Jun 2015 23:34:31 -0700 Subject: [PATCH] Split-off the additional retry states from the task states Split the states that are not task states (but are retry states) into there own additional set and then use that set and a new function to validate the transition at other locations in the code-base. This makes the transitions that are valid for tasks/retries easily viewable, more easy to read and understand, and more correct (instead of being a mix of task + retry atom transitions and states). Change-Id: I9515c19daf59a21e581f51e757ece2050f348214 --- doc/source/img/retry_states.svg | 6 +- doc/source/img/task_states.svg | 4 +- .../engines/action_engine/actions/base.py | 4 - .../engines/action_engine/actions/retry.py | 4 - .../engines/action_engine/actions/task.py | 4 - taskflow/engines/action_engine/analyzer.py | 49 ++++++----- taskflow/engines/action_engine/engine.py | 1 + taskflow/engines/action_engine/runtime.py | 88 ++++++++++++------- taskflow/states.py | 41 ++++++--- .../tests/unit/action_engine/test_runner.py | 6 +- taskflow/tests/unit/test_check_transition.py | 17 +++- tools/state_graph.py | 15 ++-- 12 files changed, 142 insertions(+), 97 deletions(-) diff --git a/doc/source/img/retry_states.svg b/doc/source/img/retry_states.svg index 8b0c63573..1a25bda30 100644 --- a/doc/source/img/retry_states.svg +++ b/doc/source/img/retry_states.svg @@ -1,8 +1,8 @@ - - -Retries statesPENDINGRUNNINGSUCCESSFAILURERETRYINGREVERTINGREVERTEDstart + +Retries statesPENDINGRUNNINGSUCCESSFAILURERETRYINGREVERTINGREVERTEDstart diff --git a/doc/source/img/task_states.svg b/doc/source/img/task_states.svg index 14a1f098f..dbb48c602 100644 --- a/doc/source/img/task_states.svg +++ b/doc/source/img/task_states.svg @@ -1,8 +1,8 @@ - -Tasks statesPENDINGRUNNINGSUCCESSFAILUREREVERTINGREVERTEDstart +Tasks statesPENDINGRUNNINGSUCCESSFAILUREREVERTINGREVERTEDstart diff --git a/taskflow/engines/action_engine/actions/base.py b/taskflow/engines/action_engine/actions/base.py index 869ef228e..369a6c66b 100644 --- a/taskflow/engines/action_engine/actions/base.py +++ b/taskflow/engines/action_engine/actions/base.py @@ -35,7 +35,3 @@ class Action(object): def __init__(self, storage, notifier): self._storage = storage self._notifier = notifier - - @abc.abstractmethod - def handles(self, atom): - """Checks if this action handles the provided atom.""" diff --git a/taskflow/engines/action_engine/actions/retry.py b/taskflow/engines/action_engine/actions/retry.py index f69d5a5b8..c8cad50ad 100644 --- a/taskflow/engines/action_engine/actions/retry.py +++ b/taskflow/engines/action_engine/actions/retry.py @@ -48,10 +48,6 @@ class RetryAction(base.Action): super(RetryAction, self).__init__(storage, notifier) self._executor = futures.SynchronousExecutor() - @staticmethod - def handles(atom): - return isinstance(atom, retry_atom.Retry) - def _get_retry_args(self, retry, addons=None): arguments = self._storage.fetch_mapped_args( retry.rebind, diff --git a/taskflow/engines/action_engine/actions/task.py b/taskflow/engines/action_engine/actions/task.py index 2a11bf8df..ab4b50d90 100644 --- a/taskflow/engines/action_engine/actions/task.py +++ b/taskflow/engines/action_engine/actions/task.py @@ -32,10 +32,6 @@ class TaskAction(base.Action): super(TaskAction, self).__init__(storage, notifier) self._task_executor = task_executor - @staticmethod - def handles(atom): - return isinstance(atom, task_atom.BaseTask) - def _is_identity_transition(self, old_state, state, task, progress): if state in base.SAVE_RESULT_STATES: # saving result is never identity transition diff --git a/taskflow/engines/action_engine/analyzer.py b/taskflow/engines/action_engine/analyzer.py index a8f20c0dd..bef7b8b5f 100644 --- a/taskflow/engines/action_engine/analyzer.py +++ b/taskflow/engines/action_engine/analyzer.py @@ -34,6 +34,7 @@ class Analyzer(object): def __init__(self, runtime): self._storage = runtime.storage self._execution_graph = runtime.compilation.execution_graph + self._check_atom_transition = runtime.check_atom_transition def get_next_nodes(self, node=None): if node is None: @@ -93,37 +94,37 @@ class Analyzer(object): available_nodes.append(node) return available_nodes - def _is_ready_for_execute(self, task): - """Checks if task is ready to be executed.""" - state = self.get_state(task) - intention = self._storage.get_atom_intention(task.name) - transition = st.check_task_transition(state, st.RUNNING) + def _is_ready_for_execute(self, atom): + """Checks if atom is ready to be executed.""" + state = self.get_state(atom) + intention = self._storage.get_atom_intention(atom.name) + transition = self._check_atom_transition(atom, state, st.RUNNING) if not transition or intention != st.EXECUTE: return False - task_names = [] - for prev_task in self._execution_graph.predecessors(task): - task_names.append(prev_task.name) + atom_names = [] + for prev_atom in self._execution_graph.predecessors(atom): + atom_names.append(prev_atom.name) - task_states = self._storage.get_atoms_states(task_names) + atom_states = self._storage.get_atoms_states(atom_names) return all(state == st.SUCCESS and intention == st.EXECUTE - for state, intention in six.itervalues(task_states)) + for state, intention in six.itervalues(atom_states)) - def _is_ready_for_revert(self, task): - """Checks if task is ready to be reverted.""" - state = self.get_state(task) - intention = self._storage.get_atom_intention(task.name) - transition = st.check_task_transition(state, st.REVERTING) + def _is_ready_for_revert(self, atom): + """Checks if atom is ready to be reverted.""" + state = self.get_state(atom) + intention = self._storage.get_atom_intention(atom.name) + transition = self._check_atom_transition(atom, state, st.REVERTING) if not transition or intention not in (st.REVERT, st.RETRY): return False - task_names = [] - for prev_task in self._execution_graph.successors(task): - task_names.append(prev_task.name) + atom_names = [] + for prev_atom in self._execution_graph.successors(atom): + atom_names.append(prev_atom.name) - task_states = self._storage.get_atoms_states(task_names) + atom_states = self._storage.get_atoms_states(atom_names) return all(state in (st.PENDING, st.REVERTED) - for state, intention in six.itervalues(task_states)) + for state, intention in six.itervalues(atom_states)) def iterate_subgraph(self, atom): """Iterates a subgraph connected to given atom.""" @@ -148,10 +149,10 @@ class Analyzer(object): return self._execution_graph.node[atom].get('retry') def is_success(self): - for node in self._execution_graph.nodes_iter(): - if self.get_state(node) != st.SUCCESS: + for atom in self.iterate_all_nodes(): + if self.get_state(atom) != st.SUCCESS: return False return True - def get_state(self, node): - return self._storage.get_atom_state(node.name) + def get_state(self, atom): + return self._storage.get_atom_state(atom.name) diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py index 124b8a56b..df8d1d3d9 100644 --- a/taskflow/engines/action_engine/engine.py +++ b/taskflow/engines/action_engine/engine.py @@ -295,6 +295,7 @@ class ActionEngine(base.Engine): self.storage, self.atom_notifier, self._task_executor) + self._runtime.compile() self._compiled = True diff --git a/taskflow/engines/action_engine/runtime.py b/taskflow/engines/action_engine/runtime.py index fc16fd9d9..0fba861d3 100644 --- a/taskflow/engines/action_engine/runtime.py +++ b/taskflow/engines/action_engine/runtime.py @@ -14,6 +14,8 @@ # License for the specific language governing permissions and limitations # under the License. +import functools + from taskflow.engines.action_engine.actions import retry as ra from taskflow.engines.action_engine.actions import task as ta from taskflow.engines.action_engine import analyzer as an @@ -22,6 +24,7 @@ from taskflow.engines.action_engine import runner as ru from taskflow.engines.action_engine import scheduler as sched from taskflow.engines.action_engine import scopes as sc from taskflow import states as st +from taskflow import task from taskflow.utils import misc @@ -38,7 +41,30 @@ class Runtime(object): self._task_executor = task_executor self._storage = storage self._compilation = compilation - self._walkers_to_names = {} + self._atom_cache = {} + + def compile(self): + # Build out a cache of commonly used item that are associated + # with the contained atoms (by name), and are useful to have for + # quick lookup on... + change_state_handlers = { + 'task': functools.partial(self.task_action.change_state, + progress=0.0), + 'retry': self.retry_action.change_state, + } + for atom in self.analyzer.iterate_all_nodes(): + metadata = {} + walker = sc.ScopeWalker(self.compilation, atom, names_only=True) + if isinstance(atom, task.BaseTask): + check_transition_handler = st.check_task_transition + change_state_handler = change_state_handlers['task'] + else: + check_transition_handler = st.check_retry_transition + change_state_handler = change_state_handlers['retry'] + metadata['scope_walker'] = walker + metadata['check_transition_handler'] = check_transition_handler + metadata['change_state_handler'] = change_state_handler + self._atom_cache[atom.name] = metadata @property def compilation(self): @@ -75,56 +101,52 @@ class Runtime(object): self._atom_notifier, self._task_executor) + def check_atom_transition(self, atom, current_state, target_state): + """Checks if the atom can transition to the provided target state.""" + # This does not check if the name exists (since this is only used + # internally to the engine, and is not exposed to atoms that will + # not exist and therefore doesn't need to handle that case). + metadata = self._atom_cache[atom.name] + check_transition_handler = metadata['check_transition_handler'] + return check_transition_handler(current_state, target_state) + def fetch_scopes_for(self, atom_name): """Fetches a walker of the visible scopes for the given atom.""" try: - return self._walkers_to_names[atom_name] + metadata = self._atom_cache[atom_name] except KeyError: - atom = None - for node in self.analyzer.iterate_all_nodes(): - if node.name == atom_name: - atom = node - break - if atom is not None: - walker = sc.ScopeWalker(self.compilation, atom, - names_only=True) - self._walkers_to_names[atom_name] = walker - else: - walker = None - return walker + # This signals to the caller that there is no walker for whatever + # atom name was given that doesn't really have any associated atom + # known to be named with that name; this is done since the storage + # layer will call into this layer to fetch a scope for a named + # atom and users can provide random names that do not actually + # exist... + return None + else: + return metadata['scope_walker'] # Various helper methods used by the runtime components; not for public # consumption... - def reset_nodes(self, nodes, state=st.PENDING, intention=st.EXECUTE): + def reset_nodes(self, atoms, state=st.PENDING, intention=st.EXECUTE): tweaked = [] - node_state_handlers = [ - (self.task_action, {'progress': 0.0}), - (self.retry_action, {}), - ] - for node in nodes: + for atom in atoms: + metadata = self._atom_cache[atom.name] if state or intention: - tweaked.append((node, state, intention)) + tweaked.append((atom, state, intention)) if state: - handled = False - for h, kwargs in node_state_handlers: - if h.handles(node): - h.change_state(node, state, **kwargs) - handled = True - break - if not handled: - raise TypeError("Unknown how to reset state of" - " node '%s' (%s)" % (node, type(node))) + change_state_handler = metadata['change_state_handler'] + change_state_handler(atom, state) if intention: - self.storage.set_atom_intention(node.name, intention) + self.storage.set_atom_intention(atom.name, intention) return tweaked def reset_all(self, state=st.PENDING, intention=st.EXECUTE): return self.reset_nodes(self.analyzer.iterate_all_nodes(), state=state, intention=intention) - def reset_subgraph(self, node, state=st.PENDING, intention=st.EXECUTE): - return self.reset_nodes(self.analyzer.iterate_subgraph(node), + def reset_subgraph(self, atom, state=st.PENDING, intention=st.EXECUTE): + return self.reset_nodes(self.analyzer.iterate_subgraph(atom), state=state, intention=intention) def retry_subflow(self, retry): diff --git a/taskflow/states.py b/taskflow/states.py index c5ea579e8..265d6b2c9 100644 --- a/taskflow/states.py +++ b/taskflow/states.py @@ -69,10 +69,10 @@ _ALLOWED_JOB_TRANSITIONS = frozenset(( def check_job_transition(old_state, new_state): - """Check that job can transition from old_state to new_state. + """Check that job can transition from from ``old_state`` to ``new_state``. - If transition can be performed, it returns True. If transition - should be ignored, it returns False. If transition is not + If transition can be performed, it returns true. If transition + should be ignored, it returns false. If transition is not valid, it raises an InvalidState exception. """ if old_state == new_state: @@ -138,10 +138,10 @@ _IGNORED_FLOW_TRANSITIONS = frozenset( def check_flow_transition(old_state, new_state): - """Check that flow can transition from old_state to new_state. + """Check that flow can transition from ``old_state`` to ``new_state``. - If transition can be performed, it returns True. If transition - should be ignored, it returns False. If transition is not + If transition can be performed, it returns true. If transition + should be ignored, it returns false. If transition is not valid, it raises an InvalidState exception. """ if old_state == new_state: @@ -171,18 +171,37 @@ _ALLOWED_TASK_TRANSITIONS = frozenset(( (REVERTING, FAILURE), # revert failed (REVERTED, PENDING), # try again - - (SUCCESS, RETRYING), # retrying retry controller - (RETRYING, RUNNING), # run retry controller that has been retrying )) def check_task_transition(old_state, new_state): - """Check that task can transition from old_state to new_state. + """Check that task can transition from ``old_state`` to ``new_state``. - If transition can be performed, it returns True, False otherwise. + If transition can be performed, it returns true, false otherwise. """ pair = (old_state, new_state) if pair in _ALLOWED_TASK_TRANSITIONS: return True return False + + +# Retry state transitions +# See: http://docs.openstack.org/developer/taskflow/states.html#retry + +_ALLOWED_RETRY_TRANSITIONS = list(_ALLOWED_TASK_TRANSITIONS) +_ALLOWED_RETRY_TRANSITIONS.extend([ + (SUCCESS, RETRYING), # retrying retry controller + (RETRYING, RUNNING), # run retry controller that has been retrying +]) +_ALLOWED_RETRY_TRANSITIONS = frozenset(_ALLOWED_RETRY_TRANSITIONS) + + +def check_retry_transition(old_state, new_state): + """Check that retry can transition from ``old_state`` to ``new_state``. + + If transition can be performed, it returns true, false otherwise. + """ + pair = (old_state, new_state) + if pair in _ALLOWED_RETRY_TRANSITIONS: + return True + return False diff --git a/taskflow/tests/unit/action_engine/test_runner.py b/taskflow/tests/unit/action_engine/test_runner.py index 98ae0e287..eb0f0a2c8 100644 --- a/taskflow/tests/unit/action_engine/test_runner.py +++ b/taskflow/tests/unit/action_engine/test_runner.py @@ -45,8 +45,10 @@ class _RunnerTestMixin(object): task_executor = executor.SerialTaskExecutor() task_executor.start() self.addCleanup(task_executor.stop) - return runtime.Runtime(compilation, store, - task_notifier, task_executor) + r = runtime.Runtime(compilation, store, + task_notifier, task_executor) + r.compile() + return r class RunnerTest(test.TestCase, _RunnerTestMixin): diff --git a/taskflow/tests/unit/test_check_transition.py b/taskflow/tests/unit/test_check_transition.py index bed7bc995..7c820fd9b 100644 --- a/taskflow/tests/unit/test_check_transition.py +++ b/taskflow/tests/unit/test_check_transition.py @@ -87,7 +87,7 @@ class CheckTaskTransitionTest(TransitionTest): def test_from_success_state(self): self.assertTransitions(from_state=states.SUCCESS, - allowed=(states.REVERTING, states.RETRYING), + allowed=(states.REVERTING,), ignored=(states.RUNNING, states.SUCCESS, states.PENDING, states.FAILURE, states.REVERTED)) @@ -112,6 +112,21 @@ class CheckTaskTransitionTest(TransitionTest): states.RUNNING, states.SUCCESS, states.FAILURE)) + +class CheckRetryTransitionTest(CheckTaskTransitionTest): + + def setUp(self): + super(CheckRetryTransitionTest, self).setUp() + self.check_transition = states.check_retry_transition + self.transition_exc_regexp = '^Retry transition.*not allowed' + + def test_from_success_state(self): + self.assertTransitions(from_state=states.SUCCESS, + allowed=(states.REVERTING, states.RETRYING), + ignored=(states.RUNNING, states.SUCCESS, + states.PENDING, states.FAILURE, + states.REVERTED)) + def test_from_retrying_state(self): self.assertTransitions(from_state=states.RETRYING, allowed=(states.RUNNING,), diff --git a/tools/state_graph.py b/tools/state_graph.py index 319614041..253ce6068 100755 --- a/tools/state_graph.py +++ b/tools/state_graph.py @@ -49,12 +49,10 @@ def clean_event(name): return name -def make_machine(start_state, transitions, disallowed): +def make_machine(start_state, transitions): machine = fsm.FSM(start_state) machine.add_state(start_state) for (start_state, end_state) in transitions: - if start_state in disallowed or end_state in disallowed: - continue if start_state not in machine: machine.add_state(start_state) if end_state not in machine: @@ -125,12 +123,11 @@ def main(): if options.tasks: source_type = "Tasks" source = make_machine(states.PENDING, - list(states._ALLOWED_TASK_TRANSITIONS), - [states.RETRYING]) + list(states._ALLOWED_TASK_TRANSITIONS)) elif options.retries: source_type = "Retries" source = make_machine(states.PENDING, - list(states._ALLOWED_TASK_TRANSITIONS), []) + list(states._ALLOWED_RETRY_TRANSITIONS)) elif options.engines: source_type = "Engines" r = runner.Runner(DummyRuntime(), None) @@ -140,15 +137,15 @@ def main(): elif options.wbe_requests: source_type = "WBE requests" source = make_machine(protocol.WAITING, - list(protocol._ALLOWED_TRANSITIONS), []) + list(protocol._ALLOWED_TRANSITIONS)) elif options.jobs: source_type = "Jobs" source = make_machine(states.UNCLAIMED, - list(states._ALLOWED_JOB_TRANSITIONS), []) + list(states._ALLOWED_JOB_TRANSITIONS)) else: source_type = "Flow" source = make_machine(states.PENDING, - list(states._ALLOWED_FLOW_TRANSITIONS), []) + list(states._ALLOWED_FLOW_TRANSITIONS)) graph_name = "%s states" % source_type g = pydot.Dot(graph_name=graph_name, rankdir='LR',