diff --git a/senlin/db/sqlalchemy/api.py b/senlin/db/sqlalchemy/api.py index b1e6d11a9..59a4c4f71 100644 --- a/senlin/db/sqlalchemy/api.py +++ b/senlin/db/sqlalchemy/api.py @@ -1461,7 +1461,7 @@ def action_delete(context, action_id, force=False): action = query.get(action_id) if not action: - raise exception.ActionNotFound(action=action_id) + return # TODO(liuh): Need check if and how an action can be safety deleted action.delete() diff --git a/senlin/engine/actions/base.py b/senlin/engine/actions/base.py index 7c5d46197..600f8a517 100644 --- a/senlin/engine/actions/base.py +++ b/senlin/engine/actions/base.py @@ -94,13 +94,17 @@ class Action(object): # context will be persisted into database so that any worker thread # can pick the action up and execute it on behalf of the initiator - self.id = kwargs.get('id', '') + self.id = kwargs.get('id', None) self.name = kwargs.get('name', '') + + # TODO(Qiming): rework the context initialization logic self.context = req_context.RequestContext.from_dict(context.to_dict()) + # TODO(Qiming): make description a db column self.description = kwargs.get('description', '') - # Target is the ID of a cluster, a node, a profile + # TODO(Qiming): make this a positional argument + # Target is the ID of a cluster, a node or a policy self.target = kwargs.get('target') self.action = action @@ -151,6 +155,8 @@ class Action(object): def store(self, context): '''Store the action record into database table.''' + timestamp = timeutils.utcnow() + values = { 'name': self.name, 'context': self.context.to_dict(), @@ -168,17 +174,19 @@ class Action(object): 'outputs': self.outputs, 'depends_on': self.depends_on, 'depended_by': self.depended_by, - 'created_time': timeutils.utcnow(), + 'created_time': self.created_time, 'updated_time': self.updated_time, 'deleted_time': self.deleted_time, 'data': self.data, } if self.id: - values['updated_time'] = timeutils.utcnow() + self.updated_time = timestamp + values['updated_time'] = timestamp db_api.action_update(context, self.id, values) else: - values['created_time'] = timeutils.utcnow() + self.created_time = timestamp + values['created_time'] = timestamp action = db_api.action_create(context, values) self.id = action.id @@ -208,6 +216,8 @@ class Action(object): 'outputs': record.outputs, 'depends_on': record.depends_on, 'depended_by': record.depended_by, + 'created_time': record.created_time, + 'updated_time': record.updated_time, 'deleted_time': record.deleted_time, 'data': record.data, } @@ -215,15 +225,15 @@ class Action(object): return cls(context, record.action, **kwargs) @classmethod - def load(cls, context, action_id=None, action=None): + def load(cls, context, action_id=None, db_action=None): '''Retrieve an action from database.''' - if action is None: - action = db_api.action_get(context, action_id) + if db_action is None: + db_action = db_api.action_get(context, action_id) - if action is None: + if db_action is None: raise exception.ActionNotFound(action=action_id) - return cls._from_db_record(action) + return cls._from_db_record(db_action) @classmethod def load_all(cls, context, filters=None, limit=None, marker=None, @@ -248,12 +258,12 @@ class Action(object): if cmd not in self.COMMANDS: return - if cmd == self.CANCEL: + if cmd == self.SIG_CANCEL: expected_statuses = (self.INIT, self.WAITING, self.READY, self.RUNNING) - elif cmd == self.SUSPEND: + elif cmd == self.SIG_SUSPEND: expected_statuses = (self.RUNNING) - else: # RESUME + else: # SIG_RESUME expected_statuses = (self.SUSPENDED) if self.status not in expected_statuses: @@ -354,7 +364,6 @@ class Action(object): if target not in ['BEFORE', 'AFTER']: return - self.data['status'] = policy_mod.CHECK_OK # TODO(Anyone): This could use the cluster's runtime data bindings = cp_mod.ClusterPolicy.load_all(self.context, cluster_id, sort_keys=['priority'], @@ -380,16 +389,20 @@ class Action(object): self.data['status'] = policy_mod.CHECK_ERROR self.data['reason'] = _('Policy %(id)s cooldown is still ' 'in progress.') % {'id': policy.id} - break + return method(cluster_id, self) # Abort policy checking if failures found - if self.data['status'] == policy_mod.CHECK_ERROR: + if ('status' in self.data and + self.data['status'] == policy_mod.CHECK_ERROR): LOG.warning(_('Failed policy checking: %s'), self.data['reason']) + self.data['reason'] = _('Policy checking failed at policy ' + '%(id)s.') % {'id': policy.id} return + self.data['status'] = policy_mod.CHECK_OK return def to_dict(self): @@ -412,34 +425,32 @@ class Action(object): 'outputs': self.outputs, 'depends_on': self.depends_on, 'depended_by': self.depended_by, + 'created_time': self.created_time, + 'updated_time': self.updated_time, 'deleted_time': self.deleted_time, 'data': self.data, } return action_dict - @classmethod - def from_dict(cls, context=None, **kwargs): - action = kwargs.pop('action') - return cls(context, action, **kwargs) - def ActionProc(context, action_id, worker_id): '''Action process.''' # Step 1: lock the action for execution timestamp = wallclock() - result = db_api.action_acquire(context, action_id, worker_id, timestamp) - if result is None: + db_action = db_api.action_acquire(context, action_id, worker_id, timestamp) + if db_action is None: LOG.debug(_('Failed locking action "%s" for execution'), action_id) return False # Step 2: materialize the action object - action = Action.load(context, action_id=action_id) + action = Action.load(context, db_action=db_action) LOG.info(_LI('Action %(name)s [%(id)s] started'), {'name': six.text_type(action.action), 'id': action.id}) reason = 'Action completed' + success = True try: # Step 3: execute the action result, reason = action.execute() @@ -453,6 +464,9 @@ def ActionProc(context, action_id, worker_id): '%(action)s (%(id)s) execution: %(reason)s'), {'action': action.action, 'id': action.id, 'reason': reason}) + success = False finally: # NOTE: locks on action is eventually released here by status update action.set_status(result, reason) + + return success diff --git a/senlin/engine/service.py b/senlin/engine/service.py index 558912f4d..2f7f95ed4 100644 --- a/senlin/engine/service.py +++ b/senlin/engine/service.py @@ -1481,7 +1481,7 @@ class EngineService(service.Service): @request_context def action_get(self, context, identity): db_action = self.action_find(context, identity) - action = action_mod.Action.load(context, action=db_action) + action = action_mod.Action.load(context, db_action=db_action) return action.to_dict() def event_find(self, context, identity, show_deleted=False): diff --git a/senlin/tests/unit/engine/actions/__init__.py b/senlin/tests/unit/engine/actions/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/senlin/tests/unit/engine/actions/test_action_base.py b/senlin/tests/unit/engine/actions/test_action_base.py new file mode 100644 index 000000000..113424af8 --- /dev/null +++ b/senlin/tests/unit/engine/actions/test_action_base.py @@ -0,0 +1,815 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import copy + +import mock +from oslo_config import cfg +import six + +from senlin.common import exception +from senlin.db.sqlalchemy import api as db_api +from senlin.engine.actions import base as action_base +from senlin.engine import cluster_policy as cp_mod +from senlin.engine import environment +from senlin.engine import event +from senlin.policies import base as policy_mod +from senlin.tests.unit.common import base +from senlin.tests.unit.common import utils +from senlin.tests.unit import fakes + + +class DummyAction(action_base.Action): + + def __init__(self, context, action, **kwargs): + super(DummyAction, self).__init__(context, action, **kwargs) + + +class ActionBaseTest(base.SenlinTestCase): + + def setUp(self): + super(ActionBaseTest, self).setUp() + + self.ctx = utils.dummy_context() + self.action_values = { + 'name': 'FAKE_NAME', + 'target': 'FAKE_TARGET', + 'cause': 'FAKE_CAUSE', + 'owner': 'FAKE_OWNER', + 'interval': 60, + 'start_time': 0, + 'end_time': 0, + 'timeout': 120, + 'status': 'FAKE_STATUS', + 'status_reason': 'FAKE_STATUS_REASON', + 'inputs': {'param': 'value'}, + 'outputs': {'key': 'output_value'}, + 'depends_on': ['ACTION_1'], + 'depended_by': ['ACTION_2'], + 'created_time': None, + 'updated_time': None, + 'deleted_time': None, + 'data': {'data_key': 'data_value'}, + } + + def _verify_new_action(self, obj, action): + self.assertIsNone(obj.id) + self.assertEqual('', obj.name) + self.assertEqual('', obj.description) + self.assertIsNone(obj.target) + self.assertEqual(action, obj.action) + self.assertEqual('', obj.cause) + self.assertIsNone(obj.owner) + self.assertEqual(-1, obj.interval) + self.assertIsNone(obj.start_time) + self.assertIsNone(obj.end_time) + self.assertEqual(cfg.CONF.default_action_timeout, obj.timeout) + self.assertEqual('INIT', obj.status) + self.assertEqual('', obj.status_reason) + self.assertEqual({}, obj.inputs) + self.assertEqual({}, obj.outputs) + self.assertEqual([], obj.depends_on) + self.assertEqual([], obj.depended_by) + self.assertIsNone(obj.created_time) + self.assertIsNone(obj.updated_time) + self.assertIsNone(obj.deleted_time) + self.assertEqual({}, obj.data) + + def test_action_new(self): + for action in ['CLUSTER_CREATE', 'NODE_CREATE', 'POLICY_ATTACH', + 'WHAT_EVER']: + obj = action_base.Action(self.ctx, 'CLUSTER_CREATE') + self._verify_new_action(obj, 'CLUSTER_CREATE') + + def test_action_init_with_values(self): + values = copy.deepcopy(self.action_values) + values['id'] = 'FAKE_ID' + values['description'] = 'FAKE_DESC' + values['created_time'] = 'FAKE_CREATED_TIME' + values['updated_time'] = 'FAKE_UPDATED_TIME' + values['deleted_time'] = 'FAKE_DELETED_TIME' + + obj = action_base.Action(self.ctx, 'OBJECT_ACTION', **values) + + self.assertEqual('FAKE_ID', obj.id) + self.assertEqual('FAKE_NAME', obj.name) + self.assertEqual('FAKE_DESC', obj.description) + self.assertEqual('FAKE_TARGET', obj.target) + self.assertEqual('FAKE_CAUSE', obj.cause) + self.assertEqual('FAKE_OWNER', obj.owner) + self.assertEqual(60, obj.interval) + self.assertEqual(0, obj.start_time) + self.assertEqual(0, obj.end_time) + self.assertEqual(120, obj.timeout) + self.assertEqual('FAKE_STATUS', obj.status) + self.assertEqual('FAKE_STATUS_REASON', obj.status_reason) + self.assertEqual({'param': 'value'}, obj.inputs) + self.assertEqual({'key': 'output_value'}, obj.outputs) + self.assertEqual(['ACTION_1'], obj.depends_on) + self.assertEqual(['ACTION_2'], obj.depended_by) + self.assertEqual('FAKE_CREATED_TIME', obj.created_time) + self.assertEqual('FAKE_UPDATED_TIME', obj.updated_time) + self.assertEqual('FAKE_DELETED_TIME', obj.deleted_time) + self.assertEqual({'data_key': 'data_value'}, obj.data) + + def test_action_store_for_create(self): + values = copy.deepcopy(self.action_values) + obj = action_base.Action(self.ctx, 'OBJECT_ACTION', **values) + + self.assertIsNone(obj.created_time) + self.assertIsNone(obj.updated_time) + self.assertIsNone(obj.deleted_time) + + # store for creation + res = obj.store(self.ctx) + self.assertIsNotNone(res) + self.assertEqual(obj.id, res) + self.assertIsNotNone(obj.created_time) + self.assertIsNone(obj.updated_time) + self.assertIsNone(obj.deleted_time) + + def test_action_store_for_update(self): + values = copy.deepcopy(self.action_values) + + obj = action_base.Action(self.ctx, 'OBJECT_ACTION', **values) + obj_id = obj.store(self.ctx) + self.assertIsNotNone(obj_id) + self.assertIsNotNone(obj.created_time) + self.assertIsNone(obj.updated_time) + self.assertIsNone(obj.deleted_time) + + # store for creation + res = obj.store(self.ctx) + self.assertIsNotNone(res) + self.assertEqual(obj_id, res) + self.assertEqual(obj.id, res) + self.assertIsNotNone(obj.created_time) + self.assertIsNotNone(obj.updated_time) + self.assertIsNone(obj.deleted_time) + + def test_from_db_record(self): + values = copy.deepcopy(self.action_values) + obj = action_base.Action(self.ctx, 'OBJECT_ACTION', **values) + obj.store(self.ctx) + + record = db_api.action_get(self.ctx, obj.id) + + action_obj = action_base.Action._from_db_record(record) + self.assertIsInstance(action_obj, action_base.Action) + self.assertEqual(obj.id, action_obj.id) + self.assertEqual(obj.action, action_obj.action) + self.assertEqual(obj.name, action_obj.name) + self.assertEqual(obj.target, action_obj.target) + self.assertEqual(obj.cause, action_obj.cause) + self.assertEqual(obj.owner, action_obj.owner) + self.assertEqual(obj.interval, action_obj.interval) + self.assertEqual(obj.start_time, action_obj.start_time) + self.assertEqual(obj.end_time, action_obj.end_time) + self.assertEqual(obj.timeout, action_obj.timeout) + self.assertEqual(obj.status, action_obj.status) + self.assertEqual(obj.status_reason, action_obj.status_reason) + self.assertEqual(obj.inputs, action_obj.inputs) + self.assertEqual(obj.outputs, action_obj.outputs) + self.assertEqual(obj.depends_on, action_obj.depends_on) + self.assertEqual(obj.depended_by, action_obj.depended_by) + self.assertEqual(obj.created_time, action_obj.created_time) + self.assertEqual(obj.updated_time, action_obj.updated_time) + self.assertEqual(obj.deleted_time, action_obj.deleted_time) + self.assertEqual(obj.data, action_obj.data) + + def test_load(self): + ex = self.assertRaises(exception.ActionNotFound, + action_base.Action.load, + self.ctx, 'non-existent', None) + self.assertEqual('The action (non-existent) could not be found.', + six.text_type(ex)) + + values = copy.deepcopy(self.action_values) + obj = action_base.Action(self.ctx, 'OBJECT_ACTION', **values) + obj.store(self.ctx) + + result = action_base.Action.load(self.ctx, obj.id, None) + # no need to do a thorough test here + self.assertEqual(obj.id, result.id) + self.assertEqual(obj.action, result.action) + + db_action = db_api.action_get(self.ctx, obj.id) + result = action_base.Action.load(self.ctx, None, db_action) + # no need to do a thorough test here + self.assertEqual(obj.id, result.id) + self.assertEqual(obj.action, result.action) + + def test_load_all(self): + result = action_base.Action.load_all(self.ctx) + self.assertEqual([], [c for c in result]) + + values = copy.deepcopy(self.action_values) + action1 = action_base.Action(self.ctx, 'OBJECT_ACTION', **values) + action1.store(self.ctx) + action2 = action_base.Action(self.ctx, 'OBJECT_ACTION', **values) + action2.store(self.ctx) + + # NOTE: we don't test all other parameters because the db api tests + # already covered that + results = list(action_base.Action.load_all(self.ctx)) + actions = [a.id for a in results] + self.assertEqual(2, len(actions)) + self.assertIn(action1.id, actions) + self.assertIn(action2.id, actions) + + @mock.patch.object(db_api, 'action_get_all') + def test_load_all_with_params(self, mock_call): + mock_call.return_value = [] + + results = action_base.Action.load_all( + self.ctx, filters='FAKE_FILTER', limit='FAKE_LIMIT', + marker='FAKE_MARKER', sort_keys='FAKE_KEYS', sort_dir='FAKE_DIR', + show_deleted='FAKE_SHOW_DELETED') + + # the following line is important, or else the generator won't get + # called. + self.assertEqual([], list(results)) + mock_call.assert_called_once_with( + self.ctx, filters='FAKE_FILTER', limit='FAKE_LIMIT', + marker='FAKE_MARKER', sort_keys='FAKE_KEYS', sort_dir='FAKE_DIR', + show_deleted='FAKE_SHOW_DELETED') + + def test_action_delete(self): + result = action_base.Action.delete(self.ctx, 'non-existent') + self.assertIsNone(result) + + values = copy.deepcopy(self.action_values) + action1 = action_base.Action(self.ctx, 'OBJECT_ACTION', **values) + action1.store(self.ctx) + + result = action_base.Action.delete(self.ctx, action1.id) + self.assertIsNone(result) + + @mock.patch.object(db_api, 'action_delete') + def test_action_delete_db_call(self, mock_call): + # test db api call + action_base.Action.delete(self.ctx, 'FAKE_ID') + mock_call.assert_called_once_with(self.ctx, 'FAKE_ID', False) + + @mock.patch.object(db_api, 'action_signal') + def test_action_signal_bad_command(self, mock_call): + values = copy.deepcopy(self.action_values) + action1 = action_base.Action(self.ctx, 'OBJECT_ACTION', **values) + action1.store(self.ctx) + + result = action1.signal(self.ctx, 'BOGUS') + self.assertIsNone(result) + self.assertEqual(0, mock_call.call_count) + + @mock.patch.object(db_api, 'action_signal') + @mock.patch.object(event, 'error') + def test_action_signal_cancel(self, mock_error, mock_call): + values = copy.deepcopy(self.action_values) + action = action_base.Action(self.ctx, 'OBJECT_ACTION', **values) + action.store(self.ctx) + + expected = [action.INIT, action.WAITING, action.READY, action.RUNNING] + for status in expected: + action.status = status + result = action.signal(self.ctx, action.SIG_CANCEL) + self.assertIsNone(result) + self.assertEqual(1, mock_call.call_count) + mock_call.reset_mock() + + invalid = [action.SUSPENDED, action.SUCCEEDED, action.CANCELLED, + action.FAILED] + for status in invalid: + action.status = status + result = action.signal(self.ctx, action.SIG_CANCEL) + self.assertIsNone(result) + self.assertEqual(0, mock_call.call_count) + mock_call.reset_mock() + self.assertEqual(1, mock_error.call_count) + mock_error.reset_mock() + + @mock.patch.object(db_api, 'action_signal') + @mock.patch.object(event, 'error') + def test_action_signal_suspend(self, mock_error, mock_call): + action = action_base.Action(self.ctx, 'OBJECT_ACTION') + + expected = [action.RUNNING] + for status in expected: + action.status = status + result = action.signal(self.ctx, action.SIG_SUSPEND) + self.assertIsNone(result) + self.assertEqual(1, mock_call.call_count) + mock_call.reset_mock() + + invalid = [action.INIT, action.WAITING, action.READY, action.SUSPENDED, + action.SUCCEEDED, action.CANCELLED, action.FAILED] + for status in invalid: + action.status = status + result = action.signal(self.ctx, action.SIG_SUSPEND) + self.assertIsNone(result) + self.assertEqual(0, mock_call.call_count) + mock_call.reset_mock() + self.assertEqual(1, mock_error.call_count) + mock_error.reset_mock() + + @mock.patch.object(db_api, 'action_signal') + @mock.patch.object(event, 'error') + def test_action_signal_resume(self, mock_error, mock_call): + action = action_base.Action(self.ctx, 'OBJECT_ACTION') + + expected = [action.SUSPENDED] + for status in expected: + action.status = status + result = action.signal(self.ctx, action.SIG_RESUME) + self.assertIsNone(result) + self.assertEqual(1, mock_call.call_count) + mock_call.reset_mock() + + invalid = [action.INIT, action.WAITING, action.READY, action.RUNNING, + action.SUCCEEDED, action.CANCELLED, action.FAILED] + for status in invalid: + action.status = status + result = action.signal(self.ctx, action.SIG_RESUME) + self.assertIsNone(result) + self.assertEqual(0, mock_call.call_count) + mock_call.reset_mock() + self.assertEqual(1, mock_error.call_count) + mock_error.reset_mock() + + def test_execute_default(self): + action = action_base.Action.__new__(DummyAction, self.ctx, 'BOOM') + res = action.execute() + self.assertEqual(NotImplemented, res) + + @mock.patch.object(db_api, 'action_mark_succeeded') + @mock.patch.object(db_api, 'action_mark_failed') + @mock.patch.object(db_api, 'action_mark_cancelled') + @mock.patch.object(db_api, 'action_abandon') + def test_set_status(self, mock_abandon, mark_cancel, mark_fail, + mark_succeed): + action = action_base.Action(self.ctx, 'OBJECT_ACTION') + action.id = 'FAKE_ID' + + action.set_status(action.RES_OK, 'FAKE_REASON') + self.assertEqual(action.SUCCEEDED, action.status) + self.assertEqual('FAKE_REASON', action.status_reason) + mark_succeed.assert_called_once_with(action.context, 'FAKE_ID', + mock.ANY) + + action.set_status(action.RES_ERROR, 'FAKE_ERROR') + self.assertEqual(action.FAILED, action.status) + self.assertEqual('FAKE_ERROR', action.status_reason) + mark_fail.assert_called_once_with(action.context, 'FAKE_ID', mock.ANY, + 'FAKE_ERROR') + + mark_fail.reset_mock() + action.set_status(action.RES_TIMEOUT, 'TIMEOUT_ERROR') + self.assertEqual(action.FAILED, action.status) + self.assertEqual('TIMEOUT_ERROR', action.status_reason) + mark_fail.assert_called_once_with(action.context, 'FAKE_ID', mock.ANY, + 'TIMEOUT_ERROR') + + mark_fail.reset_mock() + action.set_status(action.RES_CANCEL, 'CANCELLED') + self.assertEqual(action.CANCELLED, action.status) + self.assertEqual('CANCELLED', action.status_reason) + mark_cancel.assert_called_once_with(action.context, 'FAKE_ID', + mock.ANY) + + mark_fail.reset_mock() + action.set_status(action.RES_RETRY, 'BUSY') + self.assertEqual(action.READY, action.status) + self.assertEqual('BUSY', action.status_reason) + mock_abandon.assert_called_once_with(action.context, 'FAKE_ID') + + @mock.patch.object(db_api, 'action_get') + def test_get_status(self, mock_get): + obj = mock.Mock() + obj.status = 'FAKE_STATUS' + mock_get.return_value = obj + + action = action_base.Action(self.ctx, 'OBJECT_ACTION') + action.id = 'FAKE_ID' + + res = action.get_status() + + self.assertEqual('FAKE_STATUS', res) + self.assertEqual('FAKE_STATUS', action.status) + mock_get.assert_called_once_with(action.context, 'FAKE_ID') + + @mock.patch.object(action_base, 'wallclock') + def test_is_timeout(self, mock_time): + action = action_base.Action.__new__(DummyAction, self.ctx, 'BOOM') + action.start_time = 1 + action.timeout = 10 + + mock_time.return_value = 9 + self.assertFalse(action.is_timeout()) + + mock_time.return_value = 10 + self.assertFalse(action.is_timeout()) + + mock_time.return_value = 11 + self.assertFalse(action.is_timeout()) + + mock_time.return_value = 12 + self.assertTrue(action.is_timeout()) + + def test_check_signal_timeout(self): + action = action_base.Action(self.ctx, 'OBJECT_ACTION') + action.id = 'FAKE_ID' + action.timeout = 10 + self.patchobject(action, 'is_timeout', return_value=True) + + res = action._check_signal() + self.assertEqual(action.RES_TIMEOUT, res) + + @mock.patch.object(db_api, 'action_signal_query') + def test_check_signal_signals_caught(self, mock_query): + action = action_base.Action(self.ctx, 'OBJECT_ACTION') + action.id = 'FAKE_ID' + action.timeout = 100 + self.patchobject(action, 'is_timeout', return_value=False) + sig_cmd = mock.Mock() + mock_query.return_value = sig_cmd + + res = action._check_signal() + self.assertEqual(sig_cmd, res) + mock_query.assert_called_once_with(action.context, 'FAKE_ID') + + @mock.patch.object(db_api, 'action_signal_query') + def test_is_cancelled(self, mock_query): + action = action_base.Action(self.ctx, 'OBJECT_ACTION') + action.id = 'FAKE_ID' + action.timeout = 100 + self.patchobject(action, 'is_timeout', return_value=False) + + mock_query.return_value = action.SIG_CANCEL + res = action.is_cancelled() + self.assertTrue(res) + mock_query.assert_called_once_with(action.context, 'FAKE_ID') + mock_query.reset_mock() + + mock_query.return_value = None + res = action.is_cancelled() + self.assertFalse(res) + mock_query.assert_called_once_with(action.context, 'FAKE_ID') + + @mock.patch.object(db_api, 'action_signal_query') + def test_is_suspended(self, mock_query): + action = action_base.Action(self.ctx, 'OBJECT_ACTION') + action.id = 'FAKE_ID' + action.timeout = 100 + self.patchobject(action, 'is_timeout', return_value=False) + + mock_query.return_value = action.SIG_SUSPEND + res = action.is_suspended() + self.assertTrue(res) + mock_query.assert_called_once_with(action.context, 'FAKE_ID') + mock_query.reset_mock() + + mock_query.return_value = 'OTHERS' + res = action.is_suspended() + self.assertFalse(res) + mock_query.assert_called_once_with(action.context, 'FAKE_ID') + + @mock.patch.object(db_api, 'action_signal_query') + def test_is_resumed(self, mock_query): + action = action_base.Action(self.ctx, 'OBJECT_ACTION') + action.id = 'FAKE_ID' + action.timeout = 100 + self.patchobject(action, 'is_timeout', return_value=False) + + mock_query.return_value = action.SIG_RESUME + res = action.is_resumed() + self.assertTrue(res) + mock_query.assert_called_once_with(action.context, 'FAKE_ID') + mock_query.reset_mock() + + mock_query.return_value = 'OTHERS' + res = action.is_resumed() + self.assertFalse(res) + mock_query.assert_called_once_with(action.context, 'FAKE_ID') + + @mock.patch.object(cp_mod.ClusterPolicy, 'load_all') + def test_policy_check_target_invalid(self, mock_load): + action = action_base.Action(self.ctx, 'OBJECT_ACTION') + res = action.policy_check('FAKE_CLUSTER', 'WHEN') + self.assertIsNone(res) + self.assertEqual(0, mock_load.call_count) + + @mock.patch.object(cp_mod.ClusterPolicy, 'load_all') + def test_policy_check_no_bindings(self, mock_load): + action = action_base.Action(self.ctx, 'OBJECT_ACTION') + mock_load.return_value = [] + res = action.policy_check('FAKE_CLUSTER', 'BEFORE') + self.assertIsNone(res) + self.assertEqual(policy_mod.CHECK_OK, action.data['status']) + mock_load.assert_called_once_with(action.context, 'FAKE_CLUSTER', + sort_keys=['priority'], + filters={'enabled': True}) + + def test_action_to_dict(self): + action = action_base.Action(self.ctx, 'OBJECT_ACTION', + **self.action_values) + action.id = 'FAKE_ID' + expected = { + 'id': 'FAKE_ID', + 'name': 'FAKE_NAME', + 'action': 'OBJECT_ACTION', + 'context': self.ctx.to_dict(), + 'target': 'FAKE_TARGET', + 'cause': 'FAKE_CAUSE', + 'owner': 'FAKE_OWNER', + 'interval': 60, + 'start_time': 0, + 'end_time': 0, + 'timeout': 120, + 'status': 'FAKE_STATUS', + 'status_reason': 'FAKE_STATUS_REASON', + 'inputs': {'param': 'value'}, + 'outputs': {'key': 'output_value'}, + 'depends_on': ['ACTION_1'], + 'depended_by': ['ACTION_2'], + 'created_time': None, + 'updated_time': None, + 'deleted_time': None, + 'data': {'data_key': 'data_value'}, + } + + res = action.to_dict() + self.assertEqual(expected, res) + + +class ActionPolicyCheckTest(base.SenlinTestCase): + + def setUp(self): + super(ActionPolicyCheckTest, self).setUp() + + self.ctx = utils.dummy_context() + environment.global_env().register_policy('DummyPolicy', + fakes.TestPolicy) + + def _create_policy(self): + values = { + 'user': self.ctx.user, + 'project': self.ctx.project, + } + policy = fakes.TestPolicy('DummyPolicy', 'test-policy', **values) + policy.store(self.ctx) + return policy + + def _create_cp_binding(self, cluster_id, policy_id): + values = { + 'priority': 30, + 'cooldown': 60, + 'level': 50, + 'enabled': True, + } + + pb = cp_mod.ClusterPolicy(cluster_id, policy_id, **values) + pb.id = 'FAKE_BINDING_ID' + return pb + + @mock.patch.object(cp_mod.ClusterPolicy, 'load_all') + @mock.patch.object(policy_mod.Policy, 'load') + def test_policy_check_missing_target(self, mock_load, mock_load_all): + cluster_id = 'FAKE_CLUSTER_ID' + # Note: policy is mocked + policy = mock.Mock() + policy.id = 'FAKE_POLICY_ID' + policy.TARGET = [('BEFORE', 'OBJECT_ACTION')] + # Note: policy binding is created but not stored + pb = self._create_cp_binding(cluster_id, policy.id) + self.assertIsNone(pb.last_op) + mock_load_all.return_value = [pb] + mock_load.return_value = policy + action = action_base.Action(self.ctx, 'OBJECT_ACTION_1') + + res = action.policy_check(cluster_id, 'AFTER') + + self.assertIsNone(res) + self.assertEqual(policy_mod.CHECK_OK, action.data['status']) + mock_load_all.assert_called_once_with( + action.context, cluster_id, + sort_keys=['priority'], filters={'enabled': True}) + mock_load.assert_called_once_with(action.context, policy.id) + # last_op was updated anyway + self.assertIsNotNone(pb.last_op) + # neither pre_op nor post_op was called, because target not match + self.assertEqual(0, policy.pre_op.call_count) + self.assertEqual(0, policy.post_op.call_count) + + @mock.patch.object(cp_mod.ClusterPolicy, 'load_all') + @mock.patch.object(policy_mod.Policy, 'load') + def test_policy_check_pre_op(self, mock_load, mock_load_all): + cluster_id = 'FAKE_CLUSTER_ID' + # Note: policy is mocked + policy = mock.Mock() + policy.id = 'FAKE_POLICY_ID' + policy.TARGET = [('BEFORE', 'OBJECT_ACTION')] + # Note: policy binding is created but not stored + pb = self._create_cp_binding(cluster_id, policy.id) + self.assertIsNone(pb.last_op) + mock_load_all.return_value = [pb] + mock_load.return_value = policy + action = action_base.Action(self.ctx, 'OBJECT_ACTION') + + res = action.policy_check(cluster_id, 'BEFORE') + + self.assertIsNone(res) + self.assertEqual(policy_mod.CHECK_OK, action.data['status']) + mock_load_all.assert_called_once_with( + action.context, cluster_id, + sort_keys=['priority'], filters={'enabled': True}) + mock_load.assert_called_once_with(action.context, policy.id) + # last_op was not updated + self.assertIsNone(pb.last_op) + # pre_op is called, but post_op was not called + policy.pre_op.assert_called_once_with(cluster_id, action) + self.assertEqual(0, policy.post_op.call_count) + + @mock.patch.object(cp_mod.ClusterPolicy, 'load_all') + @mock.patch.object(policy_mod.Policy, 'load') + def test_policy_check_post_op(self, mock_load, mock_load_all): + cluster_id = 'FAKE_CLUSTER_ID' + # Note: policy is mocked + policy = mock.Mock() + policy.id = 'FAKE_POLICY_ID' + policy.TARGET = [('AFTER', 'OBJECT_ACTION')] + # Note: policy binding is created but not stored + pb = self._create_cp_binding(cluster_id, policy.id) + self.assertIsNone(pb.last_op) + mock_load_all.return_value = [pb] + mock_load.return_value = policy + action = action_base.Action(self.ctx, 'OBJECT_ACTION') + + res = action.policy_check('FAKE_CLUSTER_ID', 'AFTER') + + self.assertIsNone(res) + self.assertEqual(policy_mod.CHECK_OK, action.data['status']) + mock_load_all.assert_called_once_with( + action.context, cluster_id, + sort_keys=['priority'], filters={'enabled': True}) + mock_load.assert_called_once_with(action.context, policy.id) + # last_op was updated for POST check + self.assertIsNotNone(pb.last_op) + # pre_op is called, but post_op was not called + self.assertEqual(0, policy.pre_op.call_count) + policy.post_op.assert_called_once_with(cluster_id, action) + + @mock.patch.object(cp_mod.ClusterPolicy, 'load_all') + @mock.patch.object(policy_mod.Policy, 'load') + def test_policy_check_cooldown_inprogress(self, mock_load, mock_load_all): + cluster_id = 'FAKE_CLUSTER_ID' + # Note: policy is mocked + policy = mock.Mock() + policy.id = 'FAKE_POLICY_ID' + policy.TARGET = [('AFTER', 'OBJECT_ACTION')] + # Note: policy binding is created but not stored + pb = self._create_cp_binding(cluster_id, policy.id) + self.patchobject(pb, 'cooldown_inprogress', return_value=True) + self.assertIsNone(pb.last_op) + mock_load_all.return_value = [pb] + mock_load.return_value = policy + action = action_base.Action(self.ctx, 'OBJECT_ACTION') + + res = action.policy_check('FAKE_CLUSTER_ID', 'AFTER') + + self.assertIsNone(res) + self.assertEqual(policy_mod.CHECK_ERROR, action.data['status']) + self.assertEqual('Policy FAKE_POLICY_ID cooldown is still in ' + 'progress.', six.text_type(action.data['reason'])) + mock_load_all.assert_called_once_with( + action.context, cluster_id, + sort_keys=['priority'], filters={'enabled': True}) + mock_load.assert_called_once_with(action.context, policy.id) + # last_op was updated for POST check + self.assertIsNotNone(pb.last_op) + # neither pre_op nor post_op was not called, due to cooldown + self.assertEqual(0, policy.pre_op.call_count) + self.assertEqual(0, policy.post_op.call_count) + + @mock.patch.object(cp_mod.ClusterPolicy, 'load_all') + @mock.patch.object(policy_mod.Policy, 'load') + def test_policy_check_abort_in_middle(self, mock_load, mock_load_all): + def fake_post_op(action): + action.data['reason'] = 'BAD THINGS HAPPEN' + action.data['status'] = policy_mod.CHECK_ERROR + return + + cluster_id = 'FAKE_CLUSTER_ID' + # Note: both policies are mocked + policy1 = mock.Mock() + policy1.id = 'FAKE_POLICY_ID_1' + policy1.cooldown = 0 + policy1.TARGET = [('AFTER', 'OBJECT_ACTION')] + policy2 = mock.Mock() + policy2.id = 'FAKE_POLICY_ID_2' + policy2.cooldown = 0 + policy2.TARGET = [('AFTER', 'OBJECT_ACTION')] + + action = action_base.Action(self.ctx, 'OBJECT_ACTION') + # Make policy1.post_op a failure + policy1.post_op = mock.MagicMock() + policy1.post_op.side_effect = fake_post_op(action) + + # Note: policy binding is created but not stored + pb1 = self._create_cp_binding(cluster_id, policy1.id) + pb2 = self._create_cp_binding(cluster_id, policy2.id) + mock_load_all.return_value = [pb1, pb2] + # mock return value for two calls + mock_load.side_effect = [policy1, policy2] + + res = action.policy_check(cluster_id, 'AFTER') + + self.assertIsNone(res) + + # post_op from policy1 was called, but post_op from policy2 was not + policy1.post_op.assert_called_once_with(cluster_id, action) + self.assertEqual(0, policy2.post_op.call_count) + + self.assertEqual(policy_mod.CHECK_ERROR, action.data['status']) + self.assertEqual('Policy checking failed at policy FAKE_POLICY_ID_1.', + six.text_type(action.data['reason'])) + mock_load_all.assert_called_once_with( + action.context, cluster_id, + sort_keys=['priority'], filters={'enabled': True}) + calls = [mock.call(action.context, policy1.id)] + mock_load.assert_has_calls(calls) + + +class ActionProcTest(base.SenlinTestCase): + + def setUp(self): + super(ActionProcTest, self).setUp() + + self.ctx = utils.dummy_context() + + @mock.patch.object(action_base, 'wallclock') + @mock.patch.object(db_api, 'action_acquire') + def test_action_proc_fail_acquire(self, mock_acquire, mock_clock): + mock_clock.return_value = 'TIMESTAMP' + mock_acquire.return_value = None + + res = action_base.ActionProc(self.ctx, 'ACTION', 'WORKER') + self.assertFalse(res) + + mock_clock.assert_called_once_with() + mock_acquire.assert_called_once_with( + self.ctx, 'ACTION', 'WORKER', 'TIMESTAMP') + + @mock.patch.object(action_base, 'wallclock') + @mock.patch.object(db_api, 'action_acquire') + @mock.patch.object(action_base.Action, 'load') + @mock.patch.object(db_api, 'action_mark_succeeded') + def test_action_proc_successful(self, mock_mark, mock_load, mock_acquire, + mock_clock): + action = action_base.Action(self.ctx, 'OBJECT_ACTION') + self.patchobject(action, 'execute', + return_value=(action.RES_OK, 'BIG SUCCESS')) + mock_clock.return_value = 'TIMESTAMP' + mock_db_action = mock.Mock() + mock_acquire.return_value = mock_db_action + mock_load.return_value = action + + res = action_base.ActionProc(self.ctx, 'ACTION', 'WORKER') + self.assertTrue(res) + + mock_acquire.assert_called_once_with( + self.ctx, 'ACTION', 'WORKER', 'TIMESTAMP') + + mock_load.assert_called_once_with(self.ctx, db_action=mock_db_action) + self.assertEqual(action.SUCCEEDED, action.status) + self.assertEqual('BIG SUCCESS', action.status_reason) + + @mock.patch.object(action_base, 'wallclock') + @mock.patch.object(db_api, 'action_acquire') + @mock.patch.object(action_base.Action, 'load') + @mock.patch.object(db_api, 'action_mark_failed') + def test_action_proc_failed_error(self, mock_mark, mock_load, mock_acquire, + mock_clock): + action = action_base.Action(self.ctx, 'OBJECT_ACTION') + self.patchobject(action, 'execute', side_effect=Exception('Boom!')) + mock_clock.return_value = 'TIMESTAMP' + mock_db_action = mock.Mock() + mock_acquire.return_value = mock_db_action + mock_load.return_value = action + + res = action_base.ActionProc(self.ctx, 'ACTION', 'WORKER') + self.assertFalse(res) + + mock_acquire.assert_called_once_with( + self.ctx, 'ACTION', 'WORKER', 'TIMESTAMP') + + mock_load.assert_called_once_with(self.ctx, db_action=mock_db_action) + self.assertEqual(action.FAILED, action.status) + self.assertEqual('Boom!', action.status_reason) diff --git a/senlin/tests/unit/fakes.py b/senlin/tests/unit/fakes.py index 130e09df0..b7ce19f1b 100644 --- a/senlin/tests/unit/fakes.py +++ b/senlin/tests/unit/fakes.py @@ -60,21 +60,25 @@ class TestPolicy(policy_base.Policy): 'KEY2': schema.Integer('key2', default=1), } + TARGET = [ + ('BEFORE', 'CLUSTER_ADD_NODES') + ] + def __init__(self, type_name, name, **kwargs): super(TestPolicy, self).__init__(type_name, name, **kwargs) - def attach(self, context, cluster, policy_data): + def attach(self, cluster): + return True, {} + + def detach(self, cluster): + return True, 'OK' + + def pre_op(self, cluster_id, action): return - def detach(self, context, cluster, policy_data): + def post_op(self, cluster_id, action): return - def pre_op(self, cluster_id, action, policy_data): - return policy_data - - def post_op(self, cluster_id, action, policy_data): - return policy_data - class TestTrigger(trigger_base.Trigger): rule_schema = {