Split-off the additional retry states from the task states

Split the states that are not task states (but are retry
states) into there own additional set and then use that set
and a new function to validate the transition at other locations
in the code-base.

This makes the transitions that are valid for tasks/retries
easily viewable, more easy to read and understand, and
more correct (instead of being a mix of task + retry atom
transitions and states).

Change-Id: I9515c19daf59a21e581f51e757ece2050f348214
This commit is contained in:
Joshua Harlow 2015-06-17 23:34:31 -07:00
parent caf37be345
commit 2fa4af7a24
12 changed files with 142 additions and 97 deletions

File diff suppressed because one or more lines are too long

Before

Width:  |  Height:  |  Size: 20 KiB

After

Width:  |  Height:  |  Size: 20 KiB

File diff suppressed because one or more lines are too long

Before

Width:  |  Height:  |  Size: 18 KiB

After

Width:  |  Height:  |  Size: 18 KiB

View File

@ -35,7 +35,3 @@ class Action(object):
def __init__(self, storage, notifier):
self._storage = storage
self._notifier = notifier
@abc.abstractmethod
def handles(self, atom):
"""Checks if this action handles the provided atom."""

View File

@ -48,10 +48,6 @@ class RetryAction(base.Action):
super(RetryAction, self).__init__(storage, notifier)
self._executor = futures.SynchronousExecutor()
@staticmethod
def handles(atom):
return isinstance(atom, retry_atom.Retry)
def _get_retry_args(self, retry, addons=None):
arguments = self._storage.fetch_mapped_args(
retry.rebind,

View File

@ -32,10 +32,6 @@ class TaskAction(base.Action):
super(TaskAction, self).__init__(storage, notifier)
self._task_executor = task_executor
@staticmethod
def handles(atom):
return isinstance(atom, task_atom.BaseTask)
def _is_identity_transition(self, old_state, state, task, progress):
if state in base.SAVE_RESULT_STATES:
# saving result is never identity transition

View File

@ -34,6 +34,7 @@ class Analyzer(object):
def __init__(self, runtime):
self._storage = runtime.storage
self._execution_graph = runtime.compilation.execution_graph
self._check_atom_transition = runtime.check_atom_transition
def get_next_nodes(self, node=None):
if node is None:
@ -93,37 +94,37 @@ class Analyzer(object):
available_nodes.append(node)
return available_nodes
def _is_ready_for_execute(self, task):
"""Checks if task is ready to be executed."""
state = self.get_state(task)
intention = self._storage.get_atom_intention(task.name)
transition = st.check_task_transition(state, st.RUNNING)
def _is_ready_for_execute(self, atom):
"""Checks if atom is ready to be executed."""
state = self.get_state(atom)
intention = self._storage.get_atom_intention(atom.name)
transition = self._check_atom_transition(atom, state, st.RUNNING)
if not transition or intention != st.EXECUTE:
return False
task_names = []
for prev_task in self._execution_graph.predecessors(task):
task_names.append(prev_task.name)
atom_names = []
for prev_atom in self._execution_graph.predecessors(atom):
atom_names.append(prev_atom.name)
task_states = self._storage.get_atoms_states(task_names)
atom_states = self._storage.get_atoms_states(atom_names)
return all(state == st.SUCCESS and intention == st.EXECUTE
for state, intention in six.itervalues(task_states))
for state, intention in six.itervalues(atom_states))
def _is_ready_for_revert(self, task):
"""Checks if task is ready to be reverted."""
state = self.get_state(task)
intention = self._storage.get_atom_intention(task.name)
transition = st.check_task_transition(state, st.REVERTING)
def _is_ready_for_revert(self, atom):
"""Checks if atom is ready to be reverted."""
state = self.get_state(atom)
intention = self._storage.get_atom_intention(atom.name)
transition = self._check_atom_transition(atom, state, st.REVERTING)
if not transition or intention not in (st.REVERT, st.RETRY):
return False
task_names = []
for prev_task in self._execution_graph.successors(task):
task_names.append(prev_task.name)
atom_names = []
for prev_atom in self._execution_graph.successors(atom):
atom_names.append(prev_atom.name)
task_states = self._storage.get_atoms_states(task_names)
atom_states = self._storage.get_atoms_states(atom_names)
return all(state in (st.PENDING, st.REVERTED)
for state, intention in six.itervalues(task_states))
for state, intention in six.itervalues(atom_states))
def iterate_subgraph(self, atom):
"""Iterates a subgraph connected to given atom."""
@ -148,10 +149,10 @@ class Analyzer(object):
return self._execution_graph.node[atom].get('retry')
def is_success(self):
for node in self._execution_graph.nodes_iter():
if self.get_state(node) != st.SUCCESS:
for atom in self.iterate_all_nodes():
if self.get_state(atom) != st.SUCCESS:
return False
return True
def get_state(self, node):
return self._storage.get_atom_state(node.name)
def get_state(self, atom):
return self._storage.get_atom_state(atom.name)

View File

@ -295,6 +295,7 @@ class ActionEngine(base.Engine):
self.storage,
self.atom_notifier,
self._task_executor)
self._runtime.compile()
self._compiled = True

View File

@ -14,6 +14,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import functools
from taskflow.engines.action_engine.actions import retry as ra
from taskflow.engines.action_engine.actions import task as ta
from taskflow.engines.action_engine import analyzer as an
@ -22,6 +24,7 @@ from taskflow.engines.action_engine import runner as ru
from taskflow.engines.action_engine import scheduler as sched
from taskflow.engines.action_engine import scopes as sc
from taskflow import states as st
from taskflow import task
from taskflow.utils import misc
@ -38,7 +41,30 @@ class Runtime(object):
self._task_executor = task_executor
self._storage = storage
self._compilation = compilation
self._walkers_to_names = {}
self._atom_cache = {}
def compile(self):
# Build out a cache of commonly used item that are associated
# with the contained atoms (by name), and are useful to have for
# quick lookup on...
change_state_handlers = {
'task': functools.partial(self.task_action.change_state,
progress=0.0),
'retry': self.retry_action.change_state,
}
for atom in self.analyzer.iterate_all_nodes():
metadata = {}
walker = sc.ScopeWalker(self.compilation, atom, names_only=True)
if isinstance(atom, task.BaseTask):
check_transition_handler = st.check_task_transition
change_state_handler = change_state_handlers['task']
else:
check_transition_handler = st.check_retry_transition
change_state_handler = change_state_handlers['retry']
metadata['scope_walker'] = walker
metadata['check_transition_handler'] = check_transition_handler
metadata['change_state_handler'] = change_state_handler
self._atom_cache[atom.name] = metadata
@property
def compilation(self):
@ -75,56 +101,52 @@ class Runtime(object):
self._atom_notifier,
self._task_executor)
def check_atom_transition(self, atom, current_state, target_state):
"""Checks if the atom can transition to the provided target state."""
# This does not check if the name exists (since this is only used
# internally to the engine, and is not exposed to atoms that will
# not exist and therefore doesn't need to handle that case).
metadata = self._atom_cache[atom.name]
check_transition_handler = metadata['check_transition_handler']
return check_transition_handler(current_state, target_state)
def fetch_scopes_for(self, atom_name):
"""Fetches a walker of the visible scopes for the given atom."""
try:
return self._walkers_to_names[atom_name]
metadata = self._atom_cache[atom_name]
except KeyError:
atom = None
for node in self.analyzer.iterate_all_nodes():
if node.name == atom_name:
atom = node
break
if atom is not None:
walker = sc.ScopeWalker(self.compilation, atom,
names_only=True)
self._walkers_to_names[atom_name] = walker
# This signals to the caller that there is no walker for whatever
# atom name was given that doesn't really have any associated atom
# known to be named with that name; this is done since the storage
# layer will call into this layer to fetch a scope for a named
# atom and users can provide random names that do not actually
# exist...
return None
else:
walker = None
return walker
return metadata['scope_walker']
# Various helper methods used by the runtime components; not for public
# consumption...
def reset_nodes(self, nodes, state=st.PENDING, intention=st.EXECUTE):
def reset_nodes(self, atoms, state=st.PENDING, intention=st.EXECUTE):
tweaked = []
node_state_handlers = [
(self.task_action, {'progress': 0.0}),
(self.retry_action, {}),
]
for node in nodes:
for atom in atoms:
metadata = self._atom_cache[atom.name]
if state or intention:
tweaked.append((node, state, intention))
tweaked.append((atom, state, intention))
if state:
handled = False
for h, kwargs in node_state_handlers:
if h.handles(node):
h.change_state(node, state, **kwargs)
handled = True
break
if not handled:
raise TypeError("Unknown how to reset state of"
" node '%s' (%s)" % (node, type(node)))
change_state_handler = metadata['change_state_handler']
change_state_handler(atom, state)
if intention:
self.storage.set_atom_intention(node.name, intention)
self.storage.set_atom_intention(atom.name, intention)
return tweaked
def reset_all(self, state=st.PENDING, intention=st.EXECUTE):
return self.reset_nodes(self.analyzer.iterate_all_nodes(),
state=state, intention=intention)
def reset_subgraph(self, node, state=st.PENDING, intention=st.EXECUTE):
return self.reset_nodes(self.analyzer.iterate_subgraph(node),
def reset_subgraph(self, atom, state=st.PENDING, intention=st.EXECUTE):
return self.reset_nodes(self.analyzer.iterate_subgraph(atom),
state=state, intention=intention)
def retry_subflow(self, retry):

View File

@ -69,10 +69,10 @@ _ALLOWED_JOB_TRANSITIONS = frozenset((
def check_job_transition(old_state, new_state):
"""Check that job can transition from old_state to new_state.
"""Check that job can transition from from ``old_state`` to ``new_state``.
If transition can be performed, it returns True. If transition
should be ignored, it returns False. If transition is not
If transition can be performed, it returns true. If transition
should be ignored, it returns false. If transition is not
valid, it raises an InvalidState exception.
"""
if old_state == new_state:
@ -138,10 +138,10 @@ _IGNORED_FLOW_TRANSITIONS = frozenset(
def check_flow_transition(old_state, new_state):
"""Check that flow can transition from old_state to new_state.
"""Check that flow can transition from ``old_state`` to ``new_state``.
If transition can be performed, it returns True. If transition
should be ignored, it returns False. If transition is not
If transition can be performed, it returns true. If transition
should be ignored, it returns false. If transition is not
valid, it raises an InvalidState exception.
"""
if old_state == new_state:
@ -171,18 +171,37 @@ _ALLOWED_TASK_TRANSITIONS = frozenset((
(REVERTING, FAILURE), # revert failed
(REVERTED, PENDING), # try again
(SUCCESS, RETRYING), # retrying retry controller
(RETRYING, RUNNING), # run retry controller that has been retrying
))
def check_task_transition(old_state, new_state):
"""Check that task can transition from old_state to new_state.
"""Check that task can transition from ``old_state`` to ``new_state``.
If transition can be performed, it returns True, False otherwise.
If transition can be performed, it returns true, false otherwise.
"""
pair = (old_state, new_state)
if pair in _ALLOWED_TASK_TRANSITIONS:
return True
return False
# Retry state transitions
# See: http://docs.openstack.org/developer/taskflow/states.html#retry
_ALLOWED_RETRY_TRANSITIONS = list(_ALLOWED_TASK_TRANSITIONS)
_ALLOWED_RETRY_TRANSITIONS.extend([
(SUCCESS, RETRYING), # retrying retry controller
(RETRYING, RUNNING), # run retry controller that has been retrying
])
_ALLOWED_RETRY_TRANSITIONS = frozenset(_ALLOWED_RETRY_TRANSITIONS)
def check_retry_transition(old_state, new_state):
"""Check that retry can transition from ``old_state`` to ``new_state``.
If transition can be performed, it returns true, false otherwise.
"""
pair = (old_state, new_state)
if pair in _ALLOWED_RETRY_TRANSITIONS:
return True
return False

View File

@ -45,8 +45,10 @@ class _RunnerTestMixin(object):
task_executor = executor.SerialTaskExecutor()
task_executor.start()
self.addCleanup(task_executor.stop)
return runtime.Runtime(compilation, store,
r = runtime.Runtime(compilation, store,
task_notifier, task_executor)
r.compile()
return r
class RunnerTest(test.TestCase, _RunnerTestMixin):

View File

@ -87,7 +87,7 @@ class CheckTaskTransitionTest(TransitionTest):
def test_from_success_state(self):
self.assertTransitions(from_state=states.SUCCESS,
allowed=(states.REVERTING, states.RETRYING),
allowed=(states.REVERTING,),
ignored=(states.RUNNING, states.SUCCESS,
states.PENDING, states.FAILURE,
states.REVERTED))
@ -112,6 +112,21 @@ class CheckTaskTransitionTest(TransitionTest):
states.RUNNING,
states.SUCCESS, states.FAILURE))
class CheckRetryTransitionTest(CheckTaskTransitionTest):
def setUp(self):
super(CheckRetryTransitionTest, self).setUp()
self.check_transition = states.check_retry_transition
self.transition_exc_regexp = '^Retry transition.*not allowed'
def test_from_success_state(self):
self.assertTransitions(from_state=states.SUCCESS,
allowed=(states.REVERTING, states.RETRYING),
ignored=(states.RUNNING, states.SUCCESS,
states.PENDING, states.FAILURE,
states.REVERTED))
def test_from_retrying_state(self):
self.assertTransitions(from_state=states.RETRYING,
allowed=(states.RUNNING,),

View File

@ -49,12 +49,10 @@ def clean_event(name):
return name
def make_machine(start_state, transitions, disallowed):
def make_machine(start_state, transitions):
machine = fsm.FSM(start_state)
machine.add_state(start_state)
for (start_state, end_state) in transitions:
if start_state in disallowed or end_state in disallowed:
continue
if start_state not in machine:
machine.add_state(start_state)
if end_state not in machine:
@ -125,12 +123,11 @@ def main():
if options.tasks:
source_type = "Tasks"
source = make_machine(states.PENDING,
list(states._ALLOWED_TASK_TRANSITIONS),
[states.RETRYING])
list(states._ALLOWED_TASK_TRANSITIONS))
elif options.retries:
source_type = "Retries"
source = make_machine(states.PENDING,
list(states._ALLOWED_TASK_TRANSITIONS), [])
list(states._ALLOWED_RETRY_TRANSITIONS))
elif options.engines:
source_type = "Engines"
r = runner.Runner(DummyRuntime(), None)
@ -140,15 +137,15 @@ def main():
elif options.wbe_requests:
source_type = "WBE requests"
source = make_machine(protocol.WAITING,
list(protocol._ALLOWED_TRANSITIONS), [])
list(protocol._ALLOWED_TRANSITIONS))
elif options.jobs:
source_type = "Jobs"
source = make_machine(states.UNCLAIMED,
list(states._ALLOWED_JOB_TRANSITIONS), [])
list(states._ALLOWED_JOB_TRANSITIONS))
else:
source_type = "Flow"
source = make_machine(states.PENDING,
list(states._ALLOWED_FLOW_TRANSITIONS), [])
list(states._ALLOWED_FLOW_TRANSITIONS))
graph_name = "%s states" % source_type
g = pydot.Dot(graph_name=graph_name, rankdir='LR',