Add strict job state transition checking

Also adds basic unit tests to test that the states module
functions have some level of testing to ensure they don't
break.

Change-Id: Icedd6e70820bcf484c0dace76f2acf01d9bc967e
This commit is contained in:
Joshua Harlow 2015-04-16 11:42:42 -07:00
parent a33c533f70
commit 50eb38707a
3 changed files with 112 additions and 1 deletions

View File

@ -93,6 +93,7 @@ class ZookeeperJob(base.Job):
basename = k_paths.basename(self._path)
self._root = self._path[0:-len(basename)]
self._sequence = int(basename[len(JOB_PREFIX):])
self._last_state = None
@property
def lock_path(self):
@ -182,6 +183,13 @@ class ZookeeperJob(base.Job):
@property
def state(self):
current_state = self._fetch_state()
if self._last_state is not None:
states.check_job_transition(self._last_state, current_state)
self._last_state = current_state
return current_state
def _fetch_state(self):
owner = self.board.find_owner(self)
job_data = {}
try:

View File

@ -68,6 +68,22 @@ _ALLOWED_JOB_TRANSITIONS = frozenset((
))
def check_job_transition(old_state, new_state):
"""Check that job can transition from old_state to new_state.
If transition can be performed, it returns True. If transition
should be ignored, it returns False. If transition is not
valid, it raises an InvalidState exception.
"""
if old_state == new_state:
return False
pair = (old_state, new_state)
if pair in _ALLOWED_JOB_TRANSITIONS:
return True
raise exc.InvalidState("Job transition from '%s' to '%s' is not allowed"
% pair)
# Flow state transitions
# See: http://docs.openstack.org/developer/taskflow/states.html
@ -135,7 +151,7 @@ def check_flow_transition(old_state, new_state):
return True
if pair in _IGNORED_FLOW_TRANSITIONS:
return False
raise exc.InvalidState("Flow transition from %s to %s is not allowed"
raise exc.InvalidState("Flow transition from '%s' to '%s' is not allowed"
% pair)

View File

@ -0,0 +1,87 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2015 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from taskflow import exceptions as excp
from taskflow import states
from taskflow import test
class TestStates(test.TestCase):
def test_valid_flow_states(self):
for start_state, end_state in states._ALLOWED_FLOW_TRANSITIONS:
self.assertTrue(states.check_flow_transition(start_state,
end_state))
def test_ignored_flow_states(self):
for start_state, end_state in states._IGNORED_FLOW_TRANSITIONS:
self.assertFalse(states.check_flow_transition(start_state,
end_state))
def test_invalid_flow_states(self):
invalids = [
# Not a comprhensive set/listing...
(states.RUNNING, states.PENDING),
(states.REVERTED, states.RUNNING),
(states.RESUMING, states.RUNNING),
]
for start_state, end_state in invalids:
self.assertRaises(excp.InvalidState,
states.check_flow_transition,
start_state, end_state)
def test_valid_job_states(self):
for start_state, end_state in states._ALLOWED_JOB_TRANSITIONS:
self.assertTrue(states.check_job_transition(start_state,
end_state))
def test_ignored_job_states(self):
ignored = []
for start_state, end_state in states._ALLOWED_JOB_TRANSITIONS:
ignored.append((start_state, start_state))
ignored.append((end_state, end_state))
for start_state, end_state in ignored:
self.assertFalse(states.check_job_transition(start_state,
end_state))
def test_invalid_job_states(self):
invalids = [
(states.COMPLETE, states.UNCLAIMED),
(states.UNCLAIMED, states.COMPLETE),
]
for start_state, end_state in invalids:
self.assertRaises(excp.InvalidState,
states.check_job_transition,
start_state, end_state)
def test_valid_task_states(self):
for start_state, end_state in states._ALLOWED_TASK_TRANSITIONS:
self.assertTrue(states.check_task_transition(start_state,
end_state))
def test_invalid_task_states(self):
invalids = [
# Not a comprhensive set/listing...
(states.RUNNING, states.PENDING),
(states.PENDING, states.REVERTED),
(states.PENDING, states.SUCCESS),
(states.PENDING, states.FAILURE),
(states.RETRYING, states.PENDING),
]
for start_state, end_state in invalids:
# TODO(harlowja): fix this so that it raises instead of
# returning false...
self.assertFalse(
states.check_task_transition(start_state, end_state))