Merge "Remove task_action state checks"

This commit is contained in:
Jenkins
2013-10-17 04:21:36 +00:00
committed by Gerrit Code Review
2 changed files with 48 additions and 33 deletions

View File

@@ -28,8 +28,6 @@ LOG = logging.getLogger(__name__)
RESET_TASK_STATES = (states.PENDING,) RESET_TASK_STATES = (states.PENDING,)
SAVE_RESULT_STATES = (states.SUCCESS, states.FAILURE) SAVE_RESULT_STATES = (states.SUCCESS, states.FAILURE)
ALREADY_FINISHED_STATES = (states.SUCCESS,)
NEVER_RAN_STATES = (states.PENDING,)
@contextlib.contextmanager @contextlib.contextmanager
@@ -55,16 +53,11 @@ class TaskAction(base.Action):
def uuid(self): def uuid(self):
return self._id return self._id
def _change_state(self, engine, state, def _change_state(self, engine, state, result=None, progress=None):
result=None, progress=None, force=False):
"""Update result and change state.""" """Update result and change state."""
old_state = engine.storage.get_task_state(self.uuid) old_state = engine.storage.get_task_state(self.uuid)
state_check = states.check_task_transition(old_state, state) if not states.check_task_transition(old_state, state):
if not force and not state_check: return False
# NOTE(harlowja): if we are forcing this state change, we don't
# care if the state transition should be ignored, if it's not being
# forced then we just ignore this state change.
return
if state in RESET_TASK_STATES: if state in RESET_TASK_STATES:
engine.storage.reset(self.uuid) engine.storage.reset(self.uuid)
if state in SAVE_RESULT_STATES: if state in SAVE_RESULT_STATES:
@@ -74,6 +67,7 @@ class TaskAction(base.Action):
if progress is not None: if progress is not None:
engine.storage.set_task_progress(self.uuid, progress) engine.storage.set_task_progress(self.uuid, progress)
engine._on_task_state_change(self, state, result=result) engine._on_task_state_change(self, state, result=result)
return True
def _on_update_progress(self, task, event_data, progress, **kwargs): def _on_update_progress(self, task, event_data, progress, **kwargs):
"""Update task progress value that stored in engine.""" """Update task progress value that stored in engine."""
@@ -86,16 +80,17 @@ class TaskAction(base.Action):
LOG.exception("Failed setting task progress for %s (%s) to %0.3f", LOG.exception("Failed setting task progress for %s (%s) to %0.3f",
task, self.uuid, progress) task, self.uuid, progress)
def _force_state(self, engine, state, progress, result=None): def _change_state_update_task(self, engine, state, progress, result=None):
self._change_state(engine, state, stated_changed = self._change_state(engine, state,
result=result, progress=progress, force=True) result=result, progress=progress)
if not stated_changed:
return False
self._task.update_progress(progress) self._task.update_progress(progress)
return True
def execute(self, engine): def execute(self, engine):
if engine.storage.get_task_state(self.uuid) in ALREADY_FINISHED_STATES: if not self._change_state_update_task(engine, states.RUNNING, 0.0):
# Skip tasks that already finished.
return return
self._force_state(engine, states.RUNNING, 0.0)
with _autobind(self._task, with _autobind(self._task,
'update_progress', self._on_update_progress, 'update_progress', self._on_update_progress,
engine=engine): engine=engine):
@@ -106,15 +101,15 @@ class TaskAction(base.Action):
failure = misc.Failure() failure = misc.Failure()
self._change_state(engine, states.FAILURE, result=failure) self._change_state(engine, states.FAILURE, result=failure)
failure.reraise() failure.reraise()
self._force_state(engine, states.SUCCESS, 1.0, result=result) self._change_state_update_task(engine, states.SUCCESS, 1.0,
result=result)
def revert(self, engine): def revert(self, engine):
if engine.storage.get_task_state(self.uuid) in NEVER_RAN_STATES: if not self._change_state_update_task(engine, states.REVERTING, 0.0):
# NOTE(imelnikov): in all the other states, the task # NOTE(imelnikov): in all the other states, the task
# execution was at least attempted, so we should give # execution was at least attempted, so we should give
# task a chance for cleanup # task a chance for cleanup
return return
self._force_state(engine, states.REVERTING, 0.0)
with _autobind(self._task, with _autobind(self._task,
'update_progress', self._on_update_progress, 'update_progress', self._on_update_progress,
engine=engine): engine=engine):
@@ -125,5 +120,5 @@ class TaskAction(base.Action):
except Exception: except Exception:
with excutils.save_and_reraise_exception(): with excutils.save_and_reraise_exception():
self._change_state(engine, states.FAILURE) self._change_state(engine, states.FAILURE)
self._force_state(engine, states.REVERTED, 1.0) self._change_state_update_task(engine, states.REVERTED, 1.0)
self._force_state(engine, states.PENDING, 0.0) self._change_state_update_task(engine, states.PENDING, 0.0)

View File

@@ -128,7 +128,7 @@ def check_flow_transition(old_state, new_state):
## Task state transitions ## Task state transitions
# https://wiki.openstack.org/wiki/TaskFlow/States_of_Task_and_Flow#Task_States # https://wiki.openstack.org/wiki/TaskFlow/States_of_Task_and_Flow#Task_States
_ALLOWED_TASK_TRANSITIONS = [ _ALLOWED_TASK_TRANSITIONS = frozenset((
(PENDING, RUNNING), # run it! (PENDING, RUNNING), # run it!
(RUNNING, SUCCESS), # the task finished successfully (RUNNING, SUCCESS), # the task finished successfully
@@ -141,17 +141,37 @@ _ALLOWED_TASK_TRANSITIONS = [
(REVERTING, FAILURE), # revert failed (REVERTING, FAILURE), # revert failed
(REVERTED, PENDING), # try again (REVERTED, PENDING), # try again
]
_ALLOWED_TASK_TRANSITIONS.extend( # NOTE(harlowja): allow the tasks to restart if in the same state
# NOTE(harlowja): the task was 'killed' while in one of the below 'a' # as a they were in before as a task may be 'killed' while in one of the
# below states and it is permissible to let the task to re-enter that
# same state to try to finish
(REVERTING, REVERTING),
(RUNNING, RUNNING),
# NOTE(harlowja): the task was 'killed' while in one of the starting/ending
# states and it is permissible to let the task to start running or # states and it is permissible to let the task to start running or
# reverting immediately # reverting again (if it really wants too)
(a, b) (REVERTING, RUNNING),
for a in (REVERTED, REVERTING, RUNNING) (RUNNING, REVERTING),
for b in (RUNNING, REVERTING) ))
if a != b
_IGNORED_TASK_TRANSITIONS = [
(SUCCESS, RUNNING), # already finished
(PENDING, REVERTING), # never ran in the first place
(REVERTED, REVERTING), # the task already reverted
]
# NOTE(harlowja): ignore transitions to the same state (in these cases).
#
# NOTE(harlowja): the above ALLOWED_TASK_TRANSITIONS does allow
# transitions to certain equivalent states (but only for a few special
# cases)
_IGNORED_TASK_TRANSITIONS.extend(
(a, a) for a in (PENDING, FAILURE, SUCCESS, REVERTED)
) )
_ALLOWED_TASK_TRANSITIONS = frozenset(_ALLOWED_TASK_TRANSITIONS)
_IGNORED_TASK_TRANSITIONS = frozenset(_IGNORED_TASK_TRANSITIONS)
def check_task_transition(old_state, new_state): def check_task_transition(old_state, new_state):
@@ -161,11 +181,11 @@ def check_task_transition(old_state, new_state):
should be ignored, it returns False. If transition is not should be ignored, it returns False. If transition is not
valid, it raises InvalidStateException. valid, it raises InvalidStateException.
""" """
if old_state == new_state:
return False
pair = (old_state, new_state) pair = (old_state, new_state)
if pair in _ALLOWED_TASK_TRANSITIONS: if pair in _ALLOWED_TASK_TRANSITIONS:
return True return True
if pair in _IGNORED_TASK_TRANSITIONS:
return False
# TODO(harlowja): Should we check/allow for 3rd party states to be # TODO(harlowja): Should we check/allow for 3rd party states to be
# triggered during RUNNING by having a concept of a sub-state that we also # triggered during RUNNING by having a concept of a sub-state that we also
# verify against?? # verify against??