From 50eb38707a03473351aa38608a88df079fb18e1a Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Thu, 16 Apr 2015 11:42:42 -0700 Subject: [PATCH] Add strict job state transition checking Also adds basic unit tests to test that the states module functions have some level of testing to ensure they don't break. Change-Id: Icedd6e70820bcf484c0dace76f2acf01d9bc967e --- taskflow/jobs/backends/impl_zookeeper.py | 8 +++ taskflow/states.py | 18 ++++- taskflow/tests/unit/test_states.py | 87 ++++++++++++++++++++++++ 3 files changed, 112 insertions(+), 1 deletion(-) create mode 100644 taskflow/tests/unit/test_states.py diff --git a/taskflow/jobs/backends/impl_zookeeper.py b/taskflow/jobs/backends/impl_zookeeper.py index 34ec5b42..b587925e 100644 --- a/taskflow/jobs/backends/impl_zookeeper.py +++ b/taskflow/jobs/backends/impl_zookeeper.py @@ -93,6 +93,7 @@ class ZookeeperJob(base.Job): basename = k_paths.basename(self._path) self._root = self._path[0:-len(basename)] self._sequence = int(basename[len(JOB_PREFIX):]) + self._last_state = None @property def lock_path(self): @@ -182,6 +183,13 @@ class ZookeeperJob(base.Job): @property def state(self): + current_state = self._fetch_state() + if self._last_state is not None: + states.check_job_transition(self._last_state, current_state) + self._last_state = current_state + return current_state + + def _fetch_state(self): owner = self.board.find_owner(self) job_data = {} try: diff --git a/taskflow/states.py b/taskflow/states.py index 85cda2d0..c5ea579e 100644 --- a/taskflow/states.py +++ b/taskflow/states.py @@ -68,6 +68,22 @@ _ALLOWED_JOB_TRANSITIONS = frozenset(( )) +def check_job_transition(old_state, new_state): + """Check that job can transition from old_state to new_state. + + If transition can be performed, it returns True. If transition + should be ignored, it returns False. If transition is not + valid, it raises an InvalidState exception. + """ + if old_state == new_state: + return False + pair = (old_state, new_state) + if pair in _ALLOWED_JOB_TRANSITIONS: + return True + raise exc.InvalidState("Job transition from '%s' to '%s' is not allowed" + % pair) + + # Flow state transitions # See: http://docs.openstack.org/developer/taskflow/states.html @@ -135,7 +151,7 @@ def check_flow_transition(old_state, new_state): return True if pair in _IGNORED_FLOW_TRANSITIONS: return False - raise exc.InvalidState("Flow transition from %s to %s is not allowed" + raise exc.InvalidState("Flow transition from '%s' to '%s' is not allowed" % pair) diff --git a/taskflow/tests/unit/test_states.py b/taskflow/tests/unit/test_states.py new file mode 100644 index 00000000..701a6888 --- /dev/null +++ b/taskflow/tests/unit/test_states.py @@ -0,0 +1,87 @@ +# -*- coding: utf-8 -*- + +# Copyright (C) 2015 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from taskflow import exceptions as excp +from taskflow import states +from taskflow import test + + +class TestStates(test.TestCase): + def test_valid_flow_states(self): + for start_state, end_state in states._ALLOWED_FLOW_TRANSITIONS: + self.assertTrue(states.check_flow_transition(start_state, + end_state)) + + def test_ignored_flow_states(self): + for start_state, end_state in states._IGNORED_FLOW_TRANSITIONS: + self.assertFalse(states.check_flow_transition(start_state, + end_state)) + + def test_invalid_flow_states(self): + invalids = [ + # Not a comprhensive set/listing... + (states.RUNNING, states.PENDING), + (states.REVERTED, states.RUNNING), + (states.RESUMING, states.RUNNING), + ] + for start_state, end_state in invalids: + self.assertRaises(excp.InvalidState, + states.check_flow_transition, + start_state, end_state) + + def test_valid_job_states(self): + for start_state, end_state in states._ALLOWED_JOB_TRANSITIONS: + self.assertTrue(states.check_job_transition(start_state, + end_state)) + + def test_ignored_job_states(self): + ignored = [] + for start_state, end_state in states._ALLOWED_JOB_TRANSITIONS: + ignored.append((start_state, start_state)) + ignored.append((end_state, end_state)) + for start_state, end_state in ignored: + self.assertFalse(states.check_job_transition(start_state, + end_state)) + + def test_invalid_job_states(self): + invalids = [ + (states.COMPLETE, states.UNCLAIMED), + (states.UNCLAIMED, states.COMPLETE), + ] + for start_state, end_state in invalids: + self.assertRaises(excp.InvalidState, + states.check_job_transition, + start_state, end_state) + + def test_valid_task_states(self): + for start_state, end_state in states._ALLOWED_TASK_TRANSITIONS: + self.assertTrue(states.check_task_transition(start_state, + end_state)) + + def test_invalid_task_states(self): + invalids = [ + # Not a comprhensive set/listing... + (states.RUNNING, states.PENDING), + (states.PENDING, states.REVERTED), + (states.PENDING, states.SUCCESS), + (states.PENDING, states.FAILURE), + (states.RETRYING, states.PENDING), + ] + for start_state, end_state in invalids: + # TODO(harlowja): fix this so that it raises instead of + # returning false... + self.assertFalse( + states.check_task_transition(start_state, end_state))