From ea272ee743bfcd06bcb371afc54c1e37b35d8c10 Mon Sep 17 00:00:00 2001 From: "Ivan A. Melnikov" Date: Fri, 4 Oct 2013 18:21:29 +0400 Subject: [PATCH] Validate each flow state change When engine changing state, the transition is now validated. This helps to avoid complex and error prone checks in engine._change_state and make validating code more reusable in different engines. Change-Id: I2a06823c532926bb3bd034f7252b14bdbbc1fa1d Implements: bp:transition-control --- taskflow/engines/action_engine/engine.py | 6 +- taskflow/exceptions.py | 2 +- taskflow/states.py | 82 +++++++++++++++++++- taskflow/storage.py | 5 +- taskflow/tests/unit/test_check_transition.py | 45 +++++++++++ taskflow/tests/unit/test_storage.py | 4 + 6 files changed, 138 insertions(+), 6 deletions(-) create mode 100644 taskflow/tests/unit/test_check_transition.py diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py index 63d6bfd4..b9915664 100644 --- a/taskflow/engines/action_engine/engine.py +++ b/taskflow/engines/action_engine/engine.py @@ -104,11 +104,11 @@ class ActionEngine(base.EngineBase): @decorators.locked(lock='_state_lock') def _change_state(self, state): - if (state == states.SUSPENDING and not (self.is_running or - self.is_reverting)): + old_state = self.storage.get_flow_state() + if not states.check_flow_transition(old_state, state): return self.storage.set_flow_state(state) - details = dict(engine=self) + details = dict(engine=self, old_state=old_state) self.notifier.notify(state, details) def on_task_state_change(self, task_action, state, result=None): diff --git a/taskflow/exceptions.py b/taskflow/exceptions.py index 626ef007..2105ab5e 100644 --- a/taskflow/exceptions.py +++ b/taskflow/exceptions.py @@ -131,4 +131,4 @@ class WrappedFailure(TaskFlowException): return None def __str__(self): - return 'WrappedFailure: %s' % self._causes + return 'WrappedFailure: %s' % [str(cause) for cause in self._causes] diff --git a/taskflow/states.py b/taskflow/states.py index d35b48f3..5e4aa34e 100644 --- a/taskflow/states.py +++ b/taskflow/states.py @@ -2,7 +2,7 @@ # vim: tabstop=4 shiftwidth=4 softtabstop=4 -# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. +# Copyright (C) 2012-2013 Yahoo! Inc. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -16,6 +16,8 @@ # License for the specific language governing permissions and limitations # under the License. +from taskflow import exceptions as exc + # Job states. CLAIMED = 'CLAIMED' FAILURE = 'FAILURE' @@ -33,6 +35,7 @@ RUNNING = RUNNING SUCCESS = SUCCESS SUSPENDING = 'SUSPENDING' SUSPENDED = 'SUSPENDED' +RESUMING = 'RESUMING' # Task states. FAILURE = FAILURE @@ -42,3 +45,80 @@ REVERTING = REVERTING # TODO(harlowja): use when we can timeout tasks?? TIMED_OUT = 'TIMED_OUT' + + +## Flow state transitions +# https://wiki.openstack.org/wiki/TaskFlow/States_of_Task_and_Flow#Flow_States + +_ALLOWED_FLOW_TRANSITIONS = frozenset(( + (PENDING, RUNNING), # run it! + + (RUNNING, SUCCESS), # all tasks finished successfully + (RUNNING, FAILURE), # some of task failed + (RUNNING, SUSPENDING), # engine.suspend was called + + (SUCCESS, RUNNING), # see note below + + (FAILURE, RUNNING), # see note below + (FAILURE, REVERTING), # flow failed, do cleanup now + + (REVERTING, REVERTED), # revert done + (REVERTING, FAILURE), # revert failed + (REVERTING, SUSPENDING), # engine.suspend was called + + (REVERTED, RUNNING), # try again + + (SUSPENDING, SUSPENDED), # suspend finished + (SUSPENDING, SUCCESS), # all tasks finished while we were waiting + (SUSPENDING, FAILURE), # some tasks failed while we were waiting + (SUSPENDING, REVERTED), # all tasks were reverted while we were waiting + + (SUSPENDED, RUNNING), # restart from suspended + (SUSPENDED, REVERTING), # revert from suspended + + (RESUMING, SUSPENDED), # after flow resumed, it is suspended +)) + + +# NOTE(imelnikov) SUCCESS->RUNNING and FAILURE->RUNNING transitions are +# useful when flow or flowdetails baciking it were altered after the flow +# was finished; then, client code may want to run through flow again +# to ensure all tasks from updated flow had a chance to run. + + +# NOTE(imelnikov): Engine cannot transition flow from SUSPENDING to +# SUSPENDED while some tasks from the flow are running and some results +# from them are not retrieved and saved properly, so while flow is +# in SUSPENDING state it may wait for some of the tasks to stop. Then, +# flow can go to SUSPENDED, SUCCESS, FAILURE or REVERTED state depending +# of actual state of the tasks -- e.g. if all tasks were finished +# successfully while we were waiting, flow can be transitioned from +# SUSPENDING to SUCCESS state. + + +_IGNORED_FLOW_TRANSITIONS = frozenset( + (a, b) + for a in (PENDING, FAILURE, SUCCESS, SUSPENDED, REVERTED) + for b in (SUSPENDING, SUSPENDED, RESUMING) + if a != b +) + + +def check_flow_transition(old_state, new_state): + """Check that flow can transition from old_state to new_state. + + If transition can be performed, it returns True. If transition + should be ignored, it returns False. If transition is not + invalid, it raises InvalidStateException. + """ + if old_state == new_state: + return False + pair = (old_state, new_state) + if pair in _ALLOWED_FLOW_TRANSITIONS: + return True + if pair in _IGNORED_FLOW_TRANSITIONS: + return False + if new_state == RESUMING: + return True + raise exc.InvalidStateException( + "Flow transition from %s to %s is not allowed" % pair) diff --git a/taskflow/storage.py b/taskflow/storage.py index 923c0517..15bfa537 100644 --- a/taskflow/storage.py +++ b/taskflow/storage.py @@ -278,7 +278,10 @@ class Storage(object): def get_flow_state(self): """Set state from flowdetails""" - return self._flowdetail.state + state = self._flowdetail.state + if state is None: + state = states.PENDING + return state class ThreadSafeStorage(Storage): diff --git a/taskflow/tests/unit/test_check_transition.py b/taskflow/tests/unit/test_check_transition.py new file mode 100644 index 00000000..d587fa8f --- /dev/null +++ b/taskflow/tests/unit/test_check_transition.py @@ -0,0 +1,45 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from taskflow import exceptions as exc +from taskflow import states +from taskflow import test + + +class CheckFlowTransitionTest(test.TestCase): + + def test_same_state(self): + self.assertFalse( + states.check_flow_transition(states.SUCCESS, states.SUCCESS)) + + def test_rerunning_allowed(self): + self.assertTrue( + states.check_flow_transition(states.SUCCESS, states.RUNNING)) + + def test_no_resuming_from_pending(self): + self.assertFalse( + states.check_flow_transition(states.PENDING, states.RESUMING)) + + def test_resuming_from_running(self): + self.assertTrue( + states.check_flow_transition(states.RUNNING, states.RESUMING)) + + def test_bad_transition_raises(self): + with self.assertRaisesRegexp(exc.InvalidStateException, + '^Flow transition.*not allowed'): + states.check_flow_transition(states.FAILURE, states.SUCCESS) diff --git a/taskflow/tests/unit/test_storage.py b/taskflow/tests/unit/test_storage.py index a1e79b1f..014fd20a 100644 --- a/taskflow/tests/unit/test_storage.py +++ b/taskflow/tests/unit/test_storage.py @@ -185,6 +185,10 @@ class StorageTest(test.TestCase): '^Unknown task name:'): s.get_uuid_by_name('42') + def test_initial_flow_state(self): + s = self._get_storage() + self.assertEquals(s.get_flow_state(), states.PENDING) + def test_get_flow_state(self): _lb, fd = p_utils.temporary_flow_detail(backend=self.backend) fd.state = states.FAILURE