From ee1d583721934cf6cec5c4d443ef66a39f9f6141 Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Sat, 12 Oct 2013 18:33:04 +0000 Subject: [PATCH] Add task state verification Change-Id: I85abee2a1112ce1b9bb708cb9129f06c794f83b1 --- taskflow/engines/action_engine/task_action.py | 13 ++++- taskflow/states.py | 51 ++++++++++++++++++- tools/state_graph.py | 10 +++- 3 files changed, 70 insertions(+), 4 deletions(-) diff --git a/taskflow/engines/action_engine/task_action.py b/taskflow/engines/action_engine/task_action.py index bee0266f..0bf61878 100644 --- a/taskflow/engines/action_engine/task_action.py +++ b/taskflow/engines/action_engine/task_action.py @@ -55,8 +55,16 @@ class TaskAction(base.Action): def uuid(self): return self._id - def _change_state(self, engine, state, result=None, progress=None): + def _change_state(self, engine, state, + result=None, progress=None, force=False): """Update result and change state.""" + old_state = engine.storage.get_task_state(self.uuid) + state_check = states.check_task_transition(old_state, state) + if not force and not state_check: + # NOTE(harlowja): if we are forcing this state change, we don't + # care if the state transition should be ignored, if it's not being + # forced then we just ignore this state change. + return if state in RESET_TASK_STATES: engine.storage.reset(self.uuid) if state in SAVE_RESULT_STATES: @@ -79,7 +87,8 @@ class TaskAction(base.Action): task, self.uuid, progress) def _force_state(self, engine, state, progress, result=None): - self._change_state(engine, state, result=result, progress=progress) + self._change_state(engine, state, + result=result, progress=progress, force=True) self._task.update_progress(progress) def execute(self, engine): diff --git a/taskflow/states.py b/taskflow/states.py index 07dc6e63..1ffed0b1 100644 --- a/taskflow/states.py +++ b/taskflow/states.py @@ -39,9 +39,10 @@ RESUMING = 'RESUMING' # Task states. FAILURE = FAILURE -SUCCESS = SUCCESS +PENDING = PENDING REVERTED = REVERTED REVERTING = REVERTING +SUCCESS = SUCCESS # TODO(harlowja): use when we can timeout tasks?? TIMED_OUT = 'TIMED_OUT' @@ -122,3 +123,51 @@ def check_flow_transition(old_state, new_state): return True raise exc.InvalidStateException( "Flow transition from %s to %s is not allowed" % pair) + + +## Task state transitions +# https://wiki.openstack.org/wiki/TaskFlow/States_of_Task_and_Flow#Task_States + +_ALLOWED_TASK_TRANSITIONS = [ + (PENDING, RUNNING), # run it! + + (RUNNING, SUCCESS), # the task finished successfully + (RUNNING, FAILURE), # the task failed + + (FAILURE, REVERTING), # task failed, do cleanup now + (SUCCESS, REVERTING), # some other task failed, do cleanup now + + (REVERTING, REVERTED), # revert done + (REVERTING, FAILURE), # revert failed + + (REVERTED, PENDING), # try again +] +_ALLOWED_TASK_TRANSITIONS.extend( + # NOTE(harlowja): the task was 'killed' while in one of the below 'a' + # states and it is permissible to let the task to start running or + # reverting immediately + (a, b) + for a in (REVERTED, REVERTING, RUNNING) + for b in (RUNNING, REVERTING) + if a != b +) +_ALLOWED_TASK_TRANSITIONS = frozenset(_ALLOWED_TASK_TRANSITIONS) + + +def check_task_transition(old_state, new_state): + """Check that task can transition from old_state to new_state. + + If transition can be performed, it returns True. If transition + should be ignored, it returns False. If transition is not + valid, it raises InvalidStateException. + """ + if old_state == new_state: + return False + pair = (old_state, new_state) + if pair in _ALLOWED_TASK_TRANSITIONS: + return True + # TODO(harlowja): Should we check/allow for 3rd party states to be + # triggered during RUNNING by having a concept of a sub-state that we also + # verify against?? + raise exc.InvalidStateException( + "Task transition from %s to %s is not allowed" % pair) diff --git a/tools/state_graph.py b/tools/state_graph.py index 838490d1..35298e2d 100644 --- a/tools/state_graph.py +++ b/tools/state_graph.py @@ -49,9 +49,17 @@ def main(): parser.add_option("-f", "--file", dest="filename", help="write svg to FILE", metavar="FILE", default="states.svg") + parser.add_option("-t", "--tasks", dest="tasks", + action='store_true', + help="use task state transitions", + default=False) (options, args) = parser.parse_args() g = nx.DiGraph(name="State transitions") - for (u, v) in states._ALLOWED_FLOW_TRANSITIONS: + if not options.tasks: + source = states._ALLOWED_FLOW_TRANSITIONS + else: + source = states._ALLOWED_TASK_TRANSITIONS + for (u, v) in source: if not g.has_node(u): g.add_node(u) if not g.has_node(v):