nova/nova/tests/unit/policies/test_instance_actions.py

306 lines
14 KiB
Python

# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
import fixtures
import mock
from nova.api.openstack import api_version_request
from oslo_policy import policy as oslo_policy
from oslo_utils.fixture import uuidsentinel as uuids
from oslo_utils import timeutils
from nova.api.openstack.compute import instance_actions as instance_actions_v21
from nova.compute import vm_states
from nova import exception
from nova.policies import base as base_policy
from nova.policies import instance_actions as ia_policies
from nova import policy
from nova.tests import fixtures as nova_fixtures
from nova.tests.unit.api.openstack.compute import test_instance_actions
from nova.tests.unit.api.openstack import fakes
from nova.tests.unit import fake_instance
from nova.tests.unit import fake_server_actions
from nova.tests.unit.policies import base
FAKE_UUID = fake_server_actions.FAKE_UUID
FAKE_REQUEST_ID = fake_server_actions.FAKE_REQUEST_ID1
class InstanceActionsPolicyTest(base.BasePolicyTest):
"""Test os-instance-actions APIs policies with all possible context.
This class defines the set of context with different roles
which are allowed and not allowed to pass the policy checks.
With those set of context, it will call the API operation and
verify the expected behaviour.
"""
def setUp(self):
super(InstanceActionsPolicyTest, self).setUp()
self.controller = instance_actions_v21.InstanceActionsController()
self.req = fakes.HTTPRequest.blank('')
self.fake_actions = copy.deepcopy(fake_server_actions.FAKE_ACTIONS)
self.fake_events = copy.deepcopy(fake_server_actions.FAKE_EVENTS)
self.mock_get = self.useFixture(
fixtures.MockPatch('nova.api.openstack.common.get_instance')).mock
uuid = uuids.fake_id
self.instance = fake_instance.fake_instance_obj(
self.project_member_context,
id=1, uuid=uuid, project_id=self.project_id,
vm_state=vm_states.ACTIVE,
task_state=None, launched_at=timeutils.utcnow())
self.mock_get.return_value = self.instance
# With legacy rule and no scope checks, any role in project can
# get server action and all admin is able to get server action
# with event details.
self.project_admin_authorized_contexts = [
self.legacy_admin_context, self.system_admin_context,
self.project_admin_context]
# and project reader can get their server topology without host info.
self.project_reader_authorized_contexts = [
self.legacy_admin_context, self.system_admin_context,
self.project_admin_context, self.project_member_context,
self.project_reader_context, self.project_foo_context]
def _set_policy_rules(self, overwrite=True):
rules = {ia_policies.BASE_POLICY_NAME % 'show': '@'}
policy.set_rules(oslo_policy.Rules.from_dict(rules),
overwrite=overwrite)
def test_index_instance_action_policy(self):
rule_name = ia_policies.BASE_POLICY_NAME % "list"
self.common_policy_auth(
self.project_reader_authorized_contexts,
rule_name, self.controller.index,
self.req, self.instance['uuid'])
@mock.patch('nova.compute.api.InstanceActionAPI.action_get_by_request_id')
def test_show_instance_action_policy(self, mock_action_get):
fake_action = self.fake_actions[FAKE_UUID][FAKE_REQUEST_ID]
mock_action_get.return_value = fake_action
rule_name = ia_policies.BASE_POLICY_NAME % "show"
self.common_policy_auth(
self.project_reader_authorized_contexts,
rule_name, self.controller.show,
self.req, self.instance['uuid'], fake_action['request_id'])
@mock.patch('nova.objects.InstanceActionEventList.get_by_action')
@mock.patch('nova.objects.InstanceAction.get_by_request_id')
def test_show_instance_action_policy_with_events(
self, mock_get_action, mock_get_events):
"""Test to ensure skip checking policy rule
'os_compute_api:os-instance-actions:show'.
"""
fake_action = self.fake_actions[FAKE_UUID][FAKE_REQUEST_ID]
mock_get_action.return_value = fake_action
fake_events = self.fake_events[fake_action['id']]
fake_action['events'] = fake_events
mock_get_events.return_value = fake_events
fake_action_fmt = test_instance_actions.format_action(
copy.deepcopy(fake_action))
self._set_policy_rules(overwrite=False)
rule_name = ia_policies.BASE_POLICY_NAME % "events"
authorize_res, unauthorize_res = self.common_policy_auth(
self.project_admin_authorized_contexts,
rule_name, self.controller.show,
self.req, self.instance['uuid'],
fake_action['request_id'], fatal=False)
for action in authorize_res:
# In order to unify the display forms of 'start_time' and
# 'finish_time', format the results returned by the show api.
res_fmt = test_instance_actions.format_action(
action['instanceAction'])
self.assertEqual(fake_action_fmt['events'], res_fmt['events'])
for action in unauthorize_res:
self.assertNotIn('events', action['instanceAction'])
class InstanceActionsNoLegacyNoScopePolicyTest(InstanceActionsPolicyTest):
"""Test os-instance-actions APIs policies with no legacy deprecated rules
and no scope checks.
"""
without_deprecated_rules = True
rules_without_deprecation = {
ia_policies.BASE_POLICY_NAME % 'list':
base_policy.PROJECT_READER,
ia_policies.BASE_POLICY_NAME % 'show':
base_policy.PROJECT_READER,
ia_policies.BASE_POLICY_NAME % 'events':
base_policy.PROJECT_ADMIN,
}
def setUp(self):
super(InstanceActionsNoLegacyNoScopePolicyTest, self).setUp()
# With no legacy rule, legacy admin loose power.
self.project_admin_authorized_contexts = [self.project_admin_context]
self.project_reader_authorized_contexts = [
self.project_admin_context, self.project_member_context,
self.project_reader_context]
class InstanceActionsDeprecatedPolicyTest(base.BasePolicyTest):
"""Test os-instance-actions APIs Deprecated policies.
This class checks if deprecated policy rules are overridden
by user on policy.yaml file then they still work because
oslo.policy add deprecated rules in logical OR condition
and enforces them for policy checks if overridden.
"""
def setUp(self):
super(InstanceActionsDeprecatedPolicyTest, self).setUp()
self.controller = instance_actions_v21.InstanceActionsController()
self.admin_or_owner_req = fakes.HTTPRequest.blank('')
self.admin_or_owner_req.environ[
'nova.context'] = self.project_admin_context
self.reader_req = fakes.HTTPRequest.blank('')
self.reader_req.environ['nova.context'] = self.project_reader_context
self.deprecated_policy = ia_policies.ROOT_POLICY
# Overridde rule with different checks than defaults so that we can
# verify the rule overridden case.
override_rules = {
self.deprecated_policy: base_policy.RULE_ADMIN_OR_OWNER,
}
# NOTE(brinzhang): Only override the deprecated rule in policy file
# so that we can verify if overridden checks are considered by
# oslo.policy.
# Oslo.policy will consider the overridden rules if:
# 1. overridden deprecated rule's checks are different than defaults
# 2. new rules are not present in policy file
self.policy = self.useFixture(nova_fixtures.OverridePolicyFixture(
rules_in_file=override_rules))
@mock.patch('nova.compute.api.InstanceActionAPI.actions_get')
@mock.patch('nova.api.openstack.common.get_instance')
def test_deprecated_policy_overridden_rule_is_checked(
self, mock_instance_get, mock_actions_get):
# Test to verify if deprecated overridden policy is working.
instance = fake_instance.fake_instance_obj(
self.admin_or_owner_req.environ['nova.context'])
# Check for success as admin_or_owner role. Deprecated rule
# has been overridden with admin checks in policy.yaml
# If admin role pass it means overridden rule is enforced by
# oslo.policy because new default is system reader and the old
# default is admin.
self.controller.index(self.admin_or_owner_req, instance['uuid'])
# check for failure with reader context.
exc = self.assertRaises(exception.PolicyNotAuthorized,
self.controller.index,
self.reader_req,
instance['uuid'])
self.assertEqual(
"Policy doesn't allow os_compute_api:os-instance-actions:list "
"to be performed.", exc.format_message())
class InstanceActionsScopeTypePolicyTest(InstanceActionsPolicyTest):
"""Test os-instance-actions APIs policies with system scope enabled.
This class set the nova.conf [oslo_policy] enforce_scope to True,
so that we can switch on the scope checking on oslo policy side.
It defines the set of context with scoped token which are allowed
and not allowed to pass the policy checks.
With those set of context, it will run the API operation and
verify the expected behaviour.
"""
def setUp(self):
super(InstanceActionsScopeTypePolicyTest, self).setUp()
self.flags(enforce_scope=True, group="oslo_policy")
# With Scope enable, system users no longer allowed.
self.project_admin_authorized_contexts = [
self.legacy_admin_context, self.project_admin_context]
self.project_reader_authorized_contexts = [
self.legacy_admin_context,
self.project_admin_context, self.project_member_context,
self.project_reader_context, self.project_foo_context]
@mock.patch('nova.objects.InstanceActionEventList.get_by_action')
@mock.patch('nova.objects.InstanceAction.get_by_request_id')
def test_show_instance_action_policy_with_show_details(
self, mock_get_action, mock_get_events):
"""Test to ensure skip checking policy rule
'os_compute_api:os-instance-actions:show'.
"""
self.req.api_version_request = api_version_request.APIVersionRequest(
'2.84')
fake_action = self.fake_actions[FAKE_UUID][FAKE_REQUEST_ID]
mock_get_action.return_value = fake_action
fake_events = self.fake_events[fake_action['id']]
fake_action['events'] = fake_events
mock_get_events.return_value = fake_events
fake_action_fmt = test_instance_actions.format_action(
copy.deepcopy(fake_action))
self._set_policy_rules(overwrite=False)
rule_name = ia_policies.BASE_POLICY_NAME % "events:details"
authorize_res, unauthorize_res = self.common_policy_auth(
self.project_admin_authorized_contexts,
rule_name, self.controller.show,
self.req, self.instance['uuid'],
fake_action['request_id'], fatal=False)
for action in authorize_res:
# Ensure the 'details' field in the action events
for event in action['instanceAction']['events']:
self.assertIn('details', event)
# In order to unify the display forms of 'start_time' and
# 'finish_time', format the results returned by the show api.
res_fmt = test_instance_actions.format_action(
action['instanceAction'])
self.assertEqual(fake_action_fmt['events'], res_fmt['events'])
# Because of the microversion > '2.51', that will be contain
# 'events' in the os-instance-actions show api response, but the
# 'details' should not contain in the action events.
for action in unauthorize_res:
# Ensure the 'details' field not in the action events
for event in action['instanceAction']['events']:
self.assertNotIn('details', event)
class InstanceActionsScopeTypeNoLegacyPolicyTest(InstanceActionsPolicyTest):
"""Test os-instance-actions APIs policies with system scope enabled,
and no more deprecated rules.
"""
without_deprecated_rules = True
rules_without_deprecation = {
ia_policies.BASE_POLICY_NAME % 'list':
base_policy.PROJECT_READER,
ia_policies.BASE_POLICY_NAME % 'show':
base_policy.PROJECT_READER,
ia_policies.BASE_POLICY_NAME % 'events':
base_policy.PROJECT_ADMIN,
}
def setUp(self):
super(InstanceActionsScopeTypeNoLegacyPolicyTest, self).setUp()
self.flags(enforce_scope=True, group="oslo_policy")
# With no legacy and scope enable, only project admin, member,
# and reader will be able to get server action and only admin
# with event details.
self.project_admin_authorized_contexts = [self.project_admin_context]
self.project_reader_authorized_contexts = [
self.project_admin_context, self.project_member_context,
self.project_reader_context]