Merge "Test case for base action module"

This commit is contained in:
Jenkins 2015-08-11 14:11:09 +00:00 committed by Gerrit Code Review
commit 963fdf6163
6 changed files with 867 additions and 34 deletions

View File

@ -1461,7 +1461,7 @@ def action_delete(context, action_id, force=False):
action = query.get(action_id)
if not action:
raise exception.ActionNotFound(action=action_id)
return
# TODO(liuh): Need check if and how an action can be safety deleted
action.delete()

View File

@ -94,13 +94,17 @@ class Action(object):
# context will be persisted into database so that any worker thread
# can pick the action up and execute it on behalf of the initiator
self.id = kwargs.get('id', '')
self.id = kwargs.get('id', None)
self.name = kwargs.get('name', '')
# TODO(Qiming): rework the context initialization logic
self.context = req_context.RequestContext.from_dict(context.to_dict())
# TODO(Qiming): make description a db column
self.description = kwargs.get('description', '')
# Target is the ID of a cluster, a node, a profile
# TODO(Qiming): make this a positional argument
# Target is the ID of a cluster, a node or a policy
self.target = kwargs.get('target')
self.action = action
@ -151,6 +155,8 @@ class Action(object):
def store(self, context):
'''Store the action record into database table.'''
timestamp = timeutils.utcnow()
values = {
'name': self.name,
'context': self.context.to_dict(),
@ -168,17 +174,19 @@ class Action(object):
'outputs': self.outputs,
'depends_on': self.depends_on,
'depended_by': self.depended_by,
'created_time': timeutils.utcnow(),
'created_time': self.created_time,
'updated_time': self.updated_time,
'deleted_time': self.deleted_time,
'data': self.data,
}
if self.id:
values['updated_time'] = timeutils.utcnow()
self.updated_time = timestamp
values['updated_time'] = timestamp
db_api.action_update(context, self.id, values)
else:
values['created_time'] = timeutils.utcnow()
self.created_time = timestamp
values['created_time'] = timestamp
action = db_api.action_create(context, values)
self.id = action.id
@ -208,6 +216,8 @@ class Action(object):
'outputs': record.outputs,
'depends_on': record.depends_on,
'depended_by': record.depended_by,
'created_time': record.created_time,
'updated_time': record.updated_time,
'deleted_time': record.deleted_time,
'data': record.data,
}
@ -215,15 +225,15 @@ class Action(object):
return cls(context, record.action, **kwargs)
@classmethod
def load(cls, context, action_id=None, action=None):
def load(cls, context, action_id=None, db_action=None):
'''Retrieve an action from database.'''
if action is None:
action = db_api.action_get(context, action_id)
if db_action is None:
db_action = db_api.action_get(context, action_id)
if action is None:
if db_action is None:
raise exception.ActionNotFound(action=action_id)
return cls._from_db_record(action)
return cls._from_db_record(db_action)
@classmethod
def load_all(cls, context, filters=None, limit=None, marker=None,
@ -248,12 +258,12 @@ class Action(object):
if cmd not in self.COMMANDS:
return
if cmd == self.CANCEL:
if cmd == self.SIG_CANCEL:
expected_statuses = (self.INIT, self.WAITING, self.READY,
self.RUNNING)
elif cmd == self.SUSPEND:
elif cmd == self.SIG_SUSPEND:
expected_statuses = (self.RUNNING)
else: # RESUME
else: # SIG_RESUME
expected_statuses = (self.SUSPENDED)
if self.status not in expected_statuses:
@ -354,7 +364,6 @@ class Action(object):
if target not in ['BEFORE', 'AFTER']:
return
self.data['status'] = policy_mod.CHECK_OK
# TODO(Anyone): This could use the cluster's runtime data
bindings = cp_mod.ClusterPolicy.load_all(self.context, cluster_id,
sort_keys=['priority'],
@ -380,16 +389,20 @@ class Action(object):
self.data['status'] = policy_mod.CHECK_ERROR
self.data['reason'] = _('Policy %(id)s cooldown is still '
'in progress.') % {'id': policy.id}
break
return
method(cluster_id, self)
# Abort policy checking if failures found
if self.data['status'] == policy_mod.CHECK_ERROR:
if ('status' in self.data and
self.data['status'] == policy_mod.CHECK_ERROR):
LOG.warning(_('Failed policy checking: %s'),
self.data['reason'])
self.data['reason'] = _('Policy checking failed at policy '
'%(id)s.') % {'id': policy.id}
return
self.data['status'] = policy_mod.CHECK_OK
return
def to_dict(self):
@ -412,34 +425,32 @@ class Action(object):
'outputs': self.outputs,
'depends_on': self.depends_on,
'depended_by': self.depended_by,
'created_time': self.created_time,
'updated_time': self.updated_time,
'deleted_time': self.deleted_time,
'data': self.data,
}
return action_dict
@classmethod
def from_dict(cls, context=None, **kwargs):
action = kwargs.pop('action')
return cls(context, action, **kwargs)
def ActionProc(context, action_id, worker_id):
'''Action process.'''
# Step 1: lock the action for execution
timestamp = wallclock()
result = db_api.action_acquire(context, action_id, worker_id, timestamp)
if result is None:
db_action = db_api.action_acquire(context, action_id, worker_id, timestamp)
if db_action is None:
LOG.debug(_('Failed locking action "%s" for execution'), action_id)
return False
# Step 2: materialize the action object
action = Action.load(context, action_id=action_id)
action = Action.load(context, db_action=db_action)
LOG.info(_LI('Action %(name)s [%(id)s] started'),
{'name': six.text_type(action.action), 'id': action.id})
reason = 'Action completed'
success = True
try:
# Step 3: execute the action
result, reason = action.execute()
@ -453,6 +464,9 @@ def ActionProc(context, action_id, worker_id):
'%(action)s (%(id)s) execution: %(reason)s'),
{'action': action.action, 'id': action.id,
'reason': reason})
success = False
finally:
# NOTE: locks on action is eventually released here by status update
action.set_status(result, reason)
return success

View File

@ -1481,7 +1481,7 @@ class EngineService(service.Service):
@request_context
def action_get(self, context, identity):
db_action = self.action_find(context, identity)
action = action_mod.Action.load(context, action=db_action)
action = action_mod.Action.load(context, db_action=db_action)
return action.to_dict()
def event_find(self, context, identity, show_deleted=False):

View File

@ -0,0 +1,815 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
import mock
from oslo_config import cfg
import six
from senlin.common import exception
from senlin.db.sqlalchemy import api as db_api
from senlin.engine.actions import base as action_base
from senlin.engine import cluster_policy as cp_mod
from senlin.engine import environment
from senlin.engine import event
from senlin.policies import base as policy_mod
from senlin.tests.unit.common import base
from senlin.tests.unit.common import utils
from senlin.tests.unit import fakes
class DummyAction(action_base.Action):
def __init__(self, context, action, **kwargs):
super(DummyAction, self).__init__(context, action, **kwargs)
class ActionBaseTest(base.SenlinTestCase):
def setUp(self):
super(ActionBaseTest, self).setUp()
self.ctx = utils.dummy_context()
self.action_values = {
'name': 'FAKE_NAME',
'target': 'FAKE_TARGET',
'cause': 'FAKE_CAUSE',
'owner': 'FAKE_OWNER',
'interval': 60,
'start_time': 0,
'end_time': 0,
'timeout': 120,
'status': 'FAKE_STATUS',
'status_reason': 'FAKE_STATUS_REASON',
'inputs': {'param': 'value'},
'outputs': {'key': 'output_value'},
'depends_on': ['ACTION_1'],
'depended_by': ['ACTION_2'],
'created_time': None,
'updated_time': None,
'deleted_time': None,
'data': {'data_key': 'data_value'},
}
def _verify_new_action(self, obj, action):
self.assertIsNone(obj.id)
self.assertEqual('', obj.name)
self.assertEqual('', obj.description)
self.assertIsNone(obj.target)
self.assertEqual(action, obj.action)
self.assertEqual('', obj.cause)
self.assertIsNone(obj.owner)
self.assertEqual(-1, obj.interval)
self.assertIsNone(obj.start_time)
self.assertIsNone(obj.end_time)
self.assertEqual(cfg.CONF.default_action_timeout, obj.timeout)
self.assertEqual('INIT', obj.status)
self.assertEqual('', obj.status_reason)
self.assertEqual({}, obj.inputs)
self.assertEqual({}, obj.outputs)
self.assertEqual([], obj.depends_on)
self.assertEqual([], obj.depended_by)
self.assertIsNone(obj.created_time)
self.assertIsNone(obj.updated_time)
self.assertIsNone(obj.deleted_time)
self.assertEqual({}, obj.data)
def test_action_new(self):
for action in ['CLUSTER_CREATE', 'NODE_CREATE', 'POLICY_ATTACH',
'WHAT_EVER']:
obj = action_base.Action(self.ctx, 'CLUSTER_CREATE')
self._verify_new_action(obj, 'CLUSTER_CREATE')
def test_action_init_with_values(self):
values = copy.deepcopy(self.action_values)
values['id'] = 'FAKE_ID'
values['description'] = 'FAKE_DESC'
values['created_time'] = 'FAKE_CREATED_TIME'
values['updated_time'] = 'FAKE_UPDATED_TIME'
values['deleted_time'] = 'FAKE_DELETED_TIME'
obj = action_base.Action(self.ctx, 'OBJECT_ACTION', **values)
self.assertEqual('FAKE_ID', obj.id)
self.assertEqual('FAKE_NAME', obj.name)
self.assertEqual('FAKE_DESC', obj.description)
self.assertEqual('FAKE_TARGET', obj.target)
self.assertEqual('FAKE_CAUSE', obj.cause)
self.assertEqual('FAKE_OWNER', obj.owner)
self.assertEqual(60, obj.interval)
self.assertEqual(0, obj.start_time)
self.assertEqual(0, obj.end_time)
self.assertEqual(120, obj.timeout)
self.assertEqual('FAKE_STATUS', obj.status)
self.assertEqual('FAKE_STATUS_REASON', obj.status_reason)
self.assertEqual({'param': 'value'}, obj.inputs)
self.assertEqual({'key': 'output_value'}, obj.outputs)
self.assertEqual(['ACTION_1'], obj.depends_on)
self.assertEqual(['ACTION_2'], obj.depended_by)
self.assertEqual('FAKE_CREATED_TIME', obj.created_time)
self.assertEqual('FAKE_UPDATED_TIME', obj.updated_time)
self.assertEqual('FAKE_DELETED_TIME', obj.deleted_time)
self.assertEqual({'data_key': 'data_value'}, obj.data)
def test_action_store_for_create(self):
values = copy.deepcopy(self.action_values)
obj = action_base.Action(self.ctx, 'OBJECT_ACTION', **values)
self.assertIsNone(obj.created_time)
self.assertIsNone(obj.updated_time)
self.assertIsNone(obj.deleted_time)
# store for creation
res = obj.store(self.ctx)
self.assertIsNotNone(res)
self.assertEqual(obj.id, res)
self.assertIsNotNone(obj.created_time)
self.assertIsNone(obj.updated_time)
self.assertIsNone(obj.deleted_time)
def test_action_store_for_update(self):
values = copy.deepcopy(self.action_values)
obj = action_base.Action(self.ctx, 'OBJECT_ACTION', **values)
obj_id = obj.store(self.ctx)
self.assertIsNotNone(obj_id)
self.assertIsNotNone(obj.created_time)
self.assertIsNone(obj.updated_time)
self.assertIsNone(obj.deleted_time)
# store for creation
res = obj.store(self.ctx)
self.assertIsNotNone(res)
self.assertEqual(obj_id, res)
self.assertEqual(obj.id, res)
self.assertIsNotNone(obj.created_time)
self.assertIsNotNone(obj.updated_time)
self.assertIsNone(obj.deleted_time)
def test_from_db_record(self):
values = copy.deepcopy(self.action_values)
obj = action_base.Action(self.ctx, 'OBJECT_ACTION', **values)
obj.store(self.ctx)
record = db_api.action_get(self.ctx, obj.id)
action_obj = action_base.Action._from_db_record(record)
self.assertIsInstance(action_obj, action_base.Action)
self.assertEqual(obj.id, action_obj.id)
self.assertEqual(obj.action, action_obj.action)
self.assertEqual(obj.name, action_obj.name)
self.assertEqual(obj.target, action_obj.target)
self.assertEqual(obj.cause, action_obj.cause)
self.assertEqual(obj.owner, action_obj.owner)
self.assertEqual(obj.interval, action_obj.interval)
self.assertEqual(obj.start_time, action_obj.start_time)
self.assertEqual(obj.end_time, action_obj.end_time)
self.assertEqual(obj.timeout, action_obj.timeout)
self.assertEqual(obj.status, action_obj.status)
self.assertEqual(obj.status_reason, action_obj.status_reason)
self.assertEqual(obj.inputs, action_obj.inputs)
self.assertEqual(obj.outputs, action_obj.outputs)
self.assertEqual(obj.depends_on, action_obj.depends_on)
self.assertEqual(obj.depended_by, action_obj.depended_by)
self.assertEqual(obj.created_time, action_obj.created_time)
self.assertEqual(obj.updated_time, action_obj.updated_time)
self.assertEqual(obj.deleted_time, action_obj.deleted_time)
self.assertEqual(obj.data, action_obj.data)
def test_load(self):
ex = self.assertRaises(exception.ActionNotFound,
action_base.Action.load,
self.ctx, 'non-existent', None)
self.assertEqual('The action (non-existent) could not be found.',
six.text_type(ex))
values = copy.deepcopy(self.action_values)
obj = action_base.Action(self.ctx, 'OBJECT_ACTION', **values)
obj.store(self.ctx)
result = action_base.Action.load(self.ctx, obj.id, None)
# no need to do a thorough test here
self.assertEqual(obj.id, result.id)
self.assertEqual(obj.action, result.action)
db_action = db_api.action_get(self.ctx, obj.id)
result = action_base.Action.load(self.ctx, None, db_action)
# no need to do a thorough test here
self.assertEqual(obj.id, result.id)
self.assertEqual(obj.action, result.action)
def test_load_all(self):
result = action_base.Action.load_all(self.ctx)
self.assertEqual([], [c for c in result])
values = copy.deepcopy(self.action_values)
action1 = action_base.Action(self.ctx, 'OBJECT_ACTION', **values)
action1.store(self.ctx)
action2 = action_base.Action(self.ctx, 'OBJECT_ACTION', **values)
action2.store(self.ctx)
# NOTE: we don't test all other parameters because the db api tests
# already covered that
results = list(action_base.Action.load_all(self.ctx))
actions = [a.id for a in results]
self.assertEqual(2, len(actions))
self.assertIn(action1.id, actions)
self.assertIn(action2.id, actions)
@mock.patch.object(db_api, 'action_get_all')
def test_load_all_with_params(self, mock_call):
mock_call.return_value = []
results = action_base.Action.load_all(
self.ctx, filters='FAKE_FILTER', limit='FAKE_LIMIT',
marker='FAKE_MARKER', sort_keys='FAKE_KEYS', sort_dir='FAKE_DIR',
show_deleted='FAKE_SHOW_DELETED')
# the following line is important, or else the generator won't get
# called.
self.assertEqual([], list(results))
mock_call.assert_called_once_with(
self.ctx, filters='FAKE_FILTER', limit='FAKE_LIMIT',
marker='FAKE_MARKER', sort_keys='FAKE_KEYS', sort_dir='FAKE_DIR',
show_deleted='FAKE_SHOW_DELETED')
def test_action_delete(self):
result = action_base.Action.delete(self.ctx, 'non-existent')
self.assertIsNone(result)
values = copy.deepcopy(self.action_values)
action1 = action_base.Action(self.ctx, 'OBJECT_ACTION', **values)
action1.store(self.ctx)
result = action_base.Action.delete(self.ctx, action1.id)
self.assertIsNone(result)
@mock.patch.object(db_api, 'action_delete')
def test_action_delete_db_call(self, mock_call):
# test db api call
action_base.Action.delete(self.ctx, 'FAKE_ID')
mock_call.assert_called_once_with(self.ctx, 'FAKE_ID', False)
@mock.patch.object(db_api, 'action_signal')
def test_action_signal_bad_command(self, mock_call):
values = copy.deepcopy(self.action_values)
action1 = action_base.Action(self.ctx, 'OBJECT_ACTION', **values)
action1.store(self.ctx)
result = action1.signal(self.ctx, 'BOGUS')
self.assertIsNone(result)
self.assertEqual(0, mock_call.call_count)
@mock.patch.object(db_api, 'action_signal')
@mock.patch.object(event, 'error')
def test_action_signal_cancel(self, mock_error, mock_call):
values = copy.deepcopy(self.action_values)
action = action_base.Action(self.ctx, 'OBJECT_ACTION', **values)
action.store(self.ctx)
expected = [action.INIT, action.WAITING, action.READY, action.RUNNING]
for status in expected:
action.status = status
result = action.signal(self.ctx, action.SIG_CANCEL)
self.assertIsNone(result)
self.assertEqual(1, mock_call.call_count)
mock_call.reset_mock()
invalid = [action.SUSPENDED, action.SUCCEEDED, action.CANCELLED,
action.FAILED]
for status in invalid:
action.status = status
result = action.signal(self.ctx, action.SIG_CANCEL)
self.assertIsNone(result)
self.assertEqual(0, mock_call.call_count)
mock_call.reset_mock()
self.assertEqual(1, mock_error.call_count)
mock_error.reset_mock()
@mock.patch.object(db_api, 'action_signal')
@mock.patch.object(event, 'error')
def test_action_signal_suspend(self, mock_error, mock_call):
action = action_base.Action(self.ctx, 'OBJECT_ACTION')
expected = [action.RUNNING]
for status in expected:
action.status = status
result = action.signal(self.ctx, action.SIG_SUSPEND)
self.assertIsNone(result)
self.assertEqual(1, mock_call.call_count)
mock_call.reset_mock()
invalid = [action.INIT, action.WAITING, action.READY, action.SUSPENDED,
action.SUCCEEDED, action.CANCELLED, action.FAILED]
for status in invalid:
action.status = status
result = action.signal(self.ctx, action.SIG_SUSPEND)
self.assertIsNone(result)
self.assertEqual(0, mock_call.call_count)
mock_call.reset_mock()
self.assertEqual(1, mock_error.call_count)
mock_error.reset_mock()
@mock.patch.object(db_api, 'action_signal')
@mock.patch.object(event, 'error')
def test_action_signal_resume(self, mock_error, mock_call):
action = action_base.Action(self.ctx, 'OBJECT_ACTION')
expected = [action.SUSPENDED]
for status in expected:
action.status = status
result = action.signal(self.ctx, action.SIG_RESUME)
self.assertIsNone(result)
self.assertEqual(1, mock_call.call_count)
mock_call.reset_mock()
invalid = [action.INIT, action.WAITING, action.READY, action.RUNNING,
action.SUCCEEDED, action.CANCELLED, action.FAILED]
for status in invalid:
action.status = status
result = action.signal(self.ctx, action.SIG_RESUME)
self.assertIsNone(result)
self.assertEqual(0, mock_call.call_count)
mock_call.reset_mock()
self.assertEqual(1, mock_error.call_count)
mock_error.reset_mock()
def test_execute_default(self):
action = action_base.Action.__new__(DummyAction, self.ctx, 'BOOM')
res = action.execute()
self.assertEqual(NotImplemented, res)
@mock.patch.object(db_api, 'action_mark_succeeded')
@mock.patch.object(db_api, 'action_mark_failed')
@mock.patch.object(db_api, 'action_mark_cancelled')
@mock.patch.object(db_api, 'action_abandon')
def test_set_status(self, mock_abandon, mark_cancel, mark_fail,
mark_succeed):
action = action_base.Action(self.ctx, 'OBJECT_ACTION')
action.id = 'FAKE_ID'
action.set_status(action.RES_OK, 'FAKE_REASON')
self.assertEqual(action.SUCCEEDED, action.status)
self.assertEqual('FAKE_REASON', action.status_reason)
mark_succeed.assert_called_once_with(action.context, 'FAKE_ID',
mock.ANY)
action.set_status(action.RES_ERROR, 'FAKE_ERROR')
self.assertEqual(action.FAILED, action.status)
self.assertEqual('FAKE_ERROR', action.status_reason)
mark_fail.assert_called_once_with(action.context, 'FAKE_ID', mock.ANY,
'FAKE_ERROR')
mark_fail.reset_mock()
action.set_status(action.RES_TIMEOUT, 'TIMEOUT_ERROR')
self.assertEqual(action.FAILED, action.status)
self.assertEqual('TIMEOUT_ERROR', action.status_reason)
mark_fail.assert_called_once_with(action.context, 'FAKE_ID', mock.ANY,
'TIMEOUT_ERROR')
mark_fail.reset_mock()
action.set_status(action.RES_CANCEL, 'CANCELLED')
self.assertEqual(action.CANCELLED, action.status)
self.assertEqual('CANCELLED', action.status_reason)
mark_cancel.assert_called_once_with(action.context, 'FAKE_ID',
mock.ANY)
mark_fail.reset_mock()
action.set_status(action.RES_RETRY, 'BUSY')
self.assertEqual(action.READY, action.status)
self.assertEqual('BUSY', action.status_reason)
mock_abandon.assert_called_once_with(action.context, 'FAKE_ID')
@mock.patch.object(db_api, 'action_get')
def test_get_status(self, mock_get):
obj = mock.Mock()
obj.status = 'FAKE_STATUS'
mock_get.return_value = obj
action = action_base.Action(self.ctx, 'OBJECT_ACTION')
action.id = 'FAKE_ID'
res = action.get_status()
self.assertEqual('FAKE_STATUS', res)
self.assertEqual('FAKE_STATUS', action.status)
mock_get.assert_called_once_with(action.context, 'FAKE_ID')
@mock.patch.object(action_base, 'wallclock')
def test_is_timeout(self, mock_time):
action = action_base.Action.__new__(DummyAction, self.ctx, 'BOOM')
action.start_time = 1
action.timeout = 10
mock_time.return_value = 9
self.assertFalse(action.is_timeout())
mock_time.return_value = 10
self.assertFalse(action.is_timeout())
mock_time.return_value = 11
self.assertFalse(action.is_timeout())
mock_time.return_value = 12
self.assertTrue(action.is_timeout())
def test_check_signal_timeout(self):
action = action_base.Action(self.ctx, 'OBJECT_ACTION')
action.id = 'FAKE_ID'
action.timeout = 10
self.patchobject(action, 'is_timeout', return_value=True)
res = action._check_signal()
self.assertEqual(action.RES_TIMEOUT, res)
@mock.patch.object(db_api, 'action_signal_query')
def test_check_signal_signals_caught(self, mock_query):
action = action_base.Action(self.ctx, 'OBJECT_ACTION')
action.id = 'FAKE_ID'
action.timeout = 100
self.patchobject(action, 'is_timeout', return_value=False)
sig_cmd = mock.Mock()
mock_query.return_value = sig_cmd
res = action._check_signal()
self.assertEqual(sig_cmd, res)
mock_query.assert_called_once_with(action.context, 'FAKE_ID')
@mock.patch.object(db_api, 'action_signal_query')
def test_is_cancelled(self, mock_query):
action = action_base.Action(self.ctx, 'OBJECT_ACTION')
action.id = 'FAKE_ID'
action.timeout = 100
self.patchobject(action, 'is_timeout', return_value=False)
mock_query.return_value = action.SIG_CANCEL
res = action.is_cancelled()
self.assertTrue(res)
mock_query.assert_called_once_with(action.context, 'FAKE_ID')
mock_query.reset_mock()
mock_query.return_value = None
res = action.is_cancelled()
self.assertFalse(res)
mock_query.assert_called_once_with(action.context, 'FAKE_ID')
@mock.patch.object(db_api, 'action_signal_query')
def test_is_suspended(self, mock_query):
action = action_base.Action(self.ctx, 'OBJECT_ACTION')
action.id = 'FAKE_ID'
action.timeout = 100
self.patchobject(action, 'is_timeout', return_value=False)
mock_query.return_value = action.SIG_SUSPEND
res = action.is_suspended()
self.assertTrue(res)
mock_query.assert_called_once_with(action.context, 'FAKE_ID')
mock_query.reset_mock()
mock_query.return_value = 'OTHERS'
res = action.is_suspended()
self.assertFalse(res)
mock_query.assert_called_once_with(action.context, 'FAKE_ID')
@mock.patch.object(db_api, 'action_signal_query')
def test_is_resumed(self, mock_query):
action = action_base.Action(self.ctx, 'OBJECT_ACTION')
action.id = 'FAKE_ID'
action.timeout = 100
self.patchobject(action, 'is_timeout', return_value=False)
mock_query.return_value = action.SIG_RESUME
res = action.is_resumed()
self.assertTrue(res)
mock_query.assert_called_once_with(action.context, 'FAKE_ID')
mock_query.reset_mock()
mock_query.return_value = 'OTHERS'
res = action.is_resumed()
self.assertFalse(res)
mock_query.assert_called_once_with(action.context, 'FAKE_ID')
@mock.patch.object(cp_mod.ClusterPolicy, 'load_all')
def test_policy_check_target_invalid(self, mock_load):
action = action_base.Action(self.ctx, 'OBJECT_ACTION')
res = action.policy_check('FAKE_CLUSTER', 'WHEN')
self.assertIsNone(res)
self.assertEqual(0, mock_load.call_count)
@mock.patch.object(cp_mod.ClusterPolicy, 'load_all')
def test_policy_check_no_bindings(self, mock_load):
action = action_base.Action(self.ctx, 'OBJECT_ACTION')
mock_load.return_value = []
res = action.policy_check('FAKE_CLUSTER', 'BEFORE')
self.assertIsNone(res)
self.assertEqual(policy_mod.CHECK_OK, action.data['status'])
mock_load.assert_called_once_with(action.context, 'FAKE_CLUSTER',
sort_keys=['priority'],
filters={'enabled': True})
def test_action_to_dict(self):
action = action_base.Action(self.ctx, 'OBJECT_ACTION',
**self.action_values)
action.id = 'FAKE_ID'
expected = {
'id': 'FAKE_ID',
'name': 'FAKE_NAME',
'action': 'OBJECT_ACTION',
'context': self.ctx.to_dict(),
'target': 'FAKE_TARGET',
'cause': 'FAKE_CAUSE',
'owner': 'FAKE_OWNER',
'interval': 60,
'start_time': 0,
'end_time': 0,
'timeout': 120,
'status': 'FAKE_STATUS',
'status_reason': 'FAKE_STATUS_REASON',
'inputs': {'param': 'value'},
'outputs': {'key': 'output_value'},
'depends_on': ['ACTION_1'],
'depended_by': ['ACTION_2'],
'created_time': None,
'updated_time': None,
'deleted_time': None,
'data': {'data_key': 'data_value'},
}
res = action.to_dict()
self.assertEqual(expected, res)
class ActionPolicyCheckTest(base.SenlinTestCase):
def setUp(self):
super(ActionPolicyCheckTest, self).setUp()
self.ctx = utils.dummy_context()
environment.global_env().register_policy('DummyPolicy',
fakes.TestPolicy)
def _create_policy(self):
values = {
'user': self.ctx.user,
'project': self.ctx.project,
}
policy = fakes.TestPolicy('DummyPolicy', 'test-policy', **values)
policy.store(self.ctx)
return policy
def _create_cp_binding(self, cluster_id, policy_id):
values = {
'priority': 30,
'cooldown': 60,
'level': 50,
'enabled': True,
}
pb = cp_mod.ClusterPolicy(cluster_id, policy_id, **values)
pb.id = 'FAKE_BINDING_ID'
return pb
@mock.patch.object(cp_mod.ClusterPolicy, 'load_all')
@mock.patch.object(policy_mod.Policy, 'load')
def test_policy_check_missing_target(self, mock_load, mock_load_all):
cluster_id = 'FAKE_CLUSTER_ID'
# Note: policy is mocked
policy = mock.Mock()
policy.id = 'FAKE_POLICY_ID'
policy.TARGET = [('BEFORE', 'OBJECT_ACTION')]
# Note: policy binding is created but not stored
pb = self._create_cp_binding(cluster_id, policy.id)
self.assertIsNone(pb.last_op)
mock_load_all.return_value = [pb]
mock_load.return_value = policy
action = action_base.Action(self.ctx, 'OBJECT_ACTION_1')
res = action.policy_check(cluster_id, 'AFTER')
self.assertIsNone(res)
self.assertEqual(policy_mod.CHECK_OK, action.data['status'])
mock_load_all.assert_called_once_with(
action.context, cluster_id,
sort_keys=['priority'], filters={'enabled': True})
mock_load.assert_called_once_with(action.context, policy.id)
# last_op was updated anyway
self.assertIsNotNone(pb.last_op)
# neither pre_op nor post_op was called, because target not match
self.assertEqual(0, policy.pre_op.call_count)
self.assertEqual(0, policy.post_op.call_count)
@mock.patch.object(cp_mod.ClusterPolicy, 'load_all')
@mock.patch.object(policy_mod.Policy, 'load')
def test_policy_check_pre_op(self, mock_load, mock_load_all):
cluster_id = 'FAKE_CLUSTER_ID'
# Note: policy is mocked
policy = mock.Mock()
policy.id = 'FAKE_POLICY_ID'
policy.TARGET = [('BEFORE', 'OBJECT_ACTION')]
# Note: policy binding is created but not stored
pb = self._create_cp_binding(cluster_id, policy.id)
self.assertIsNone(pb.last_op)
mock_load_all.return_value = [pb]
mock_load.return_value = policy
action = action_base.Action(self.ctx, 'OBJECT_ACTION')
res = action.policy_check(cluster_id, 'BEFORE')
self.assertIsNone(res)
self.assertEqual(policy_mod.CHECK_OK, action.data['status'])
mock_load_all.assert_called_once_with(
action.context, cluster_id,
sort_keys=['priority'], filters={'enabled': True})
mock_load.assert_called_once_with(action.context, policy.id)
# last_op was not updated
self.assertIsNone(pb.last_op)
# pre_op is called, but post_op was not called
policy.pre_op.assert_called_once_with(cluster_id, action)
self.assertEqual(0, policy.post_op.call_count)
@mock.patch.object(cp_mod.ClusterPolicy, 'load_all')
@mock.patch.object(policy_mod.Policy, 'load')
def test_policy_check_post_op(self, mock_load, mock_load_all):
cluster_id = 'FAKE_CLUSTER_ID'
# Note: policy is mocked
policy = mock.Mock()
policy.id = 'FAKE_POLICY_ID'
policy.TARGET = [('AFTER', 'OBJECT_ACTION')]
# Note: policy binding is created but not stored
pb = self._create_cp_binding(cluster_id, policy.id)
self.assertIsNone(pb.last_op)
mock_load_all.return_value = [pb]
mock_load.return_value = policy
action = action_base.Action(self.ctx, 'OBJECT_ACTION')
res = action.policy_check('FAKE_CLUSTER_ID', 'AFTER')
self.assertIsNone(res)
self.assertEqual(policy_mod.CHECK_OK, action.data['status'])
mock_load_all.assert_called_once_with(
action.context, cluster_id,
sort_keys=['priority'], filters={'enabled': True})
mock_load.assert_called_once_with(action.context, policy.id)
# last_op was updated for POST check
self.assertIsNotNone(pb.last_op)
# pre_op is called, but post_op was not called
self.assertEqual(0, policy.pre_op.call_count)
policy.post_op.assert_called_once_with(cluster_id, action)
@mock.patch.object(cp_mod.ClusterPolicy, 'load_all')
@mock.patch.object(policy_mod.Policy, 'load')
def test_policy_check_cooldown_inprogress(self, mock_load, mock_load_all):
cluster_id = 'FAKE_CLUSTER_ID'
# Note: policy is mocked
policy = mock.Mock()
policy.id = 'FAKE_POLICY_ID'
policy.TARGET = [('AFTER', 'OBJECT_ACTION')]
# Note: policy binding is created but not stored
pb = self._create_cp_binding(cluster_id, policy.id)
self.patchobject(pb, 'cooldown_inprogress', return_value=True)
self.assertIsNone(pb.last_op)
mock_load_all.return_value = [pb]
mock_load.return_value = policy
action = action_base.Action(self.ctx, 'OBJECT_ACTION')
res = action.policy_check('FAKE_CLUSTER_ID', 'AFTER')
self.assertIsNone(res)
self.assertEqual(policy_mod.CHECK_ERROR, action.data['status'])
self.assertEqual('Policy FAKE_POLICY_ID cooldown is still in '
'progress.', six.text_type(action.data['reason']))
mock_load_all.assert_called_once_with(
action.context, cluster_id,
sort_keys=['priority'], filters={'enabled': True})
mock_load.assert_called_once_with(action.context, policy.id)
# last_op was updated for POST check
self.assertIsNotNone(pb.last_op)
# neither pre_op nor post_op was not called, due to cooldown
self.assertEqual(0, policy.pre_op.call_count)
self.assertEqual(0, policy.post_op.call_count)
@mock.patch.object(cp_mod.ClusterPolicy, 'load_all')
@mock.patch.object(policy_mod.Policy, 'load')
def test_policy_check_abort_in_middle(self, mock_load, mock_load_all):
def fake_post_op(action):
action.data['reason'] = 'BAD THINGS HAPPEN'
action.data['status'] = policy_mod.CHECK_ERROR
return
cluster_id = 'FAKE_CLUSTER_ID'
# Note: both policies are mocked
policy1 = mock.Mock()
policy1.id = 'FAKE_POLICY_ID_1'
policy1.cooldown = 0
policy1.TARGET = [('AFTER', 'OBJECT_ACTION')]
policy2 = mock.Mock()
policy2.id = 'FAKE_POLICY_ID_2'
policy2.cooldown = 0
policy2.TARGET = [('AFTER', 'OBJECT_ACTION')]
action = action_base.Action(self.ctx, 'OBJECT_ACTION')
# Make policy1.post_op a failure
policy1.post_op = mock.MagicMock()
policy1.post_op.side_effect = fake_post_op(action)
# Note: policy binding is created but not stored
pb1 = self._create_cp_binding(cluster_id, policy1.id)
pb2 = self._create_cp_binding(cluster_id, policy2.id)
mock_load_all.return_value = [pb1, pb2]
# mock return value for two calls
mock_load.side_effect = [policy1, policy2]
res = action.policy_check(cluster_id, 'AFTER')
self.assertIsNone(res)
# post_op from policy1 was called, but post_op from policy2 was not
policy1.post_op.assert_called_once_with(cluster_id, action)
self.assertEqual(0, policy2.post_op.call_count)
self.assertEqual(policy_mod.CHECK_ERROR, action.data['status'])
self.assertEqual('Policy checking failed at policy FAKE_POLICY_ID_1.',
six.text_type(action.data['reason']))
mock_load_all.assert_called_once_with(
action.context, cluster_id,
sort_keys=['priority'], filters={'enabled': True})
calls = [mock.call(action.context, policy1.id)]
mock_load.assert_has_calls(calls)
class ActionProcTest(base.SenlinTestCase):
def setUp(self):
super(ActionProcTest, self).setUp()
self.ctx = utils.dummy_context()
@mock.patch.object(action_base, 'wallclock')
@mock.patch.object(db_api, 'action_acquire')
def test_action_proc_fail_acquire(self, mock_acquire, mock_clock):
mock_clock.return_value = 'TIMESTAMP'
mock_acquire.return_value = None
res = action_base.ActionProc(self.ctx, 'ACTION', 'WORKER')
self.assertFalse(res)
mock_clock.assert_called_once_with()
mock_acquire.assert_called_once_with(
self.ctx, 'ACTION', 'WORKER', 'TIMESTAMP')
@mock.patch.object(action_base, 'wallclock')
@mock.patch.object(db_api, 'action_acquire')
@mock.patch.object(action_base.Action, 'load')
@mock.patch.object(db_api, 'action_mark_succeeded')
def test_action_proc_successful(self, mock_mark, mock_load, mock_acquire,
mock_clock):
action = action_base.Action(self.ctx, 'OBJECT_ACTION')
self.patchobject(action, 'execute',
return_value=(action.RES_OK, 'BIG SUCCESS'))
mock_clock.return_value = 'TIMESTAMP'
mock_db_action = mock.Mock()
mock_acquire.return_value = mock_db_action
mock_load.return_value = action
res = action_base.ActionProc(self.ctx, 'ACTION', 'WORKER')
self.assertTrue(res)
mock_acquire.assert_called_once_with(
self.ctx, 'ACTION', 'WORKER', 'TIMESTAMP')
mock_load.assert_called_once_with(self.ctx, db_action=mock_db_action)
self.assertEqual(action.SUCCEEDED, action.status)
self.assertEqual('BIG SUCCESS', action.status_reason)
@mock.patch.object(action_base, 'wallclock')
@mock.patch.object(db_api, 'action_acquire')
@mock.patch.object(action_base.Action, 'load')
@mock.patch.object(db_api, 'action_mark_failed')
def test_action_proc_failed_error(self, mock_mark, mock_load, mock_acquire,
mock_clock):
action = action_base.Action(self.ctx, 'OBJECT_ACTION')
self.patchobject(action, 'execute', side_effect=Exception('Boom!'))
mock_clock.return_value = 'TIMESTAMP'
mock_db_action = mock.Mock()
mock_acquire.return_value = mock_db_action
mock_load.return_value = action
res = action_base.ActionProc(self.ctx, 'ACTION', 'WORKER')
self.assertFalse(res)
mock_acquire.assert_called_once_with(
self.ctx, 'ACTION', 'WORKER', 'TIMESTAMP')
mock_load.assert_called_once_with(self.ctx, db_action=mock_db_action)
self.assertEqual(action.FAILED, action.status)
self.assertEqual('Boom!', action.status_reason)

View File

@ -60,21 +60,25 @@ class TestPolicy(policy_base.Policy):
'KEY2': schema.Integer('key2', default=1),
}
TARGET = [
('BEFORE', 'CLUSTER_ADD_NODES')
]
def __init__(self, type_name, name, **kwargs):
super(TestPolicy, self).__init__(type_name, name, **kwargs)
def attach(self, context, cluster, policy_data):
def attach(self, cluster):
return True, {}
def detach(self, cluster):
return True, 'OK'
def pre_op(self, cluster_id, action):
return
def detach(self, context, cluster, policy_data):
def post_op(self, cluster_id, action):
return
def pre_op(self, cluster_id, action, policy_data):
return policy_data
def post_op(self, cluster_id, action, policy_data):
return policy_data
class TestTrigger(trigger_base.Trigger):
rule_schema = {