Merge "tests: Remove duplicate policy tests"

This commit is contained in:
Zuul 2021-06-16 15:49:27 +00:00 committed by Gerrit Code Review
commit ead497692b
5 changed files with 2 additions and 317 deletions

View File

@ -24,7 +24,6 @@ from nova.api.openstack.compute import server_groups as sg_v21
from nova import context
from nova import exception
from nova import objects
from nova.policies import server_groups as sg_policies
from nova import test
from nova.tests import fixtures
from nova.tests.unit.api.openstack import fakes
@ -164,26 +163,6 @@ class ServerGroupTestV21(test.NoDBTestCase):
# test as non-admin
self.controller.create(self.req, body={'server_group': sgroup})
def test_create_server_group_rbac_admin_only(self):
sgroup = server_group_template()
sgroup['policies'] = ['affinity']
# override policy to restrict to admin
rule_name = sg_policies.POLICY_ROOT % 'create'
rules = {rule_name: 'is_admin:True'}
self.policy.set_rules(rules, overwrite=False)
# check for success as admin
self.controller.create(self.admin_req, body={'server_group': sgroup})
# check for failure as non-admin
exc = self.assertRaises(exception.PolicyNotAuthorized,
self.controller.create, self.req,
body={'server_group': sgroup})
self.assertEqual(
"Policy doesn't allow %s to be performed." % rule_name,
exc.format_message())
def _create_instance(self, ctx, cell):
with context.target_cell(ctx, cell) as cctx:
instance = objects.Instance(context=cctx,
@ -419,25 +398,6 @@ class ServerGroupTestV21(test.NoDBTestCase):
self.assertRaises(webob.exc.HTTPNotFound,
self.controller.show, self.foo_req, ig_uuid)
def test_display_members_rbac_admin_only(self):
ctx = context.RequestContext('fake_user', fakes.FAKE_PROJECT_ID)
ig_uuid = self._create_groups_and_instances(ctx)[0]
# override policy to restrict to admin
rule_name = sg_policies.POLICY_ROOT % 'show'
rules = {rule_name: 'is_admin:True'}
self.policy.set_rules(rules, overwrite=False)
# check for success as admin
self.controller.show(self.admin_req, ig_uuid)
# check for failure as non-admin
exc = self.assertRaises(exception.PolicyNotAuthorized,
self.controller.show, self.req, ig_uuid)
self.assertEqual(
"Policy doesn't allow %s to be performed." % rule_name,
exc.format_message())
def test_create_server_group_with_non_alphanumeric_in_name(self):
# The fix for bug #1434335 expanded the allowable character set
# for server group names to include non-alphanumeric characters
@ -626,22 +586,6 @@ class ServerGroupTestV21(test.NoDBTestCase):
limited='&limit=dummy&limit=1',
path='/os-server-groups?all_projects=1')
def test_list_server_groups_rbac_admin_only(self):
# override policy to restrict to admin
rule_name = sg_policies.POLICY_ROOT % 'index'
rules = {rule_name: 'is_admin:True'}
self.policy.set_rules(rules, overwrite=False)
# check for success as admin
self.controller.index(self.admin_req)
# check for failure as non-admin
exc = self.assertRaises(exception.PolicyNotAuthorized,
self.controller.index, self.req)
self.assertEqual(
"Policy doesn't allow %s to be performed." % rule_name,
exc.format_message())
@mock.patch('nova.objects.InstanceGroup.destroy')
def test_delete_server_group_by_id(self, mock_destroy):
sg = server_group_template(id=uuidsentinel.sg1_id)
@ -679,26 +623,6 @@ class ServerGroupTestV21(test.NoDBTestCase):
ig_uuid = self._create_groups_and_instances(ctx)[0]
self.controller.delete(self.req, ig_uuid)
def test_delete_server_group_rbac_admin_only(self):
ctx = context.RequestContext('fake_user', fakes.FAKE_PROJECT_ID)
# override policy to restrict to admin
rule_name = sg_policies.POLICY_ROOT % 'delete'
rules = {rule_name: 'is_admin:True'}
self.policy.set_rules(rules, overwrite=False)
# check for success as admin
ig_uuid = self._create_groups_and_instances(ctx)[0]
self.controller.delete(self.admin_req, ig_uuid)
# check for failure as non-admin
ig_uuid = self._create_groups_and_instances(ctx)[0]
exc = self.assertRaises(exception.PolicyNotAuthorized,
self.controller.delete, self.req, ig_uuid)
self.assertEqual(
"Policy doesn't allow %s to be performed." % rule_name,
exc.format_message())
class ServerGroupTestV213(ServerGroupTestV21):
wsgi_api_version = '2.13'

View File

@ -13,16 +13,13 @@
# under the License.
import mock
from oslo_policy import policy as oslo_policy
from oslo_utils.fixture import uuidsentinel as uuids
import webob
from nova.api.openstack.compute import servers \
as server_v21
from nova.api.openstack.compute import servers
from nova.compute import api as compute_api
from nova.db import api as db
from nova import exception
from nova import policy
from nova import test
from nova.tests import fixtures as nova_fixtures
from nova.tests.unit.api.openstack import fakes
@ -40,7 +37,7 @@ class ServerStartStopTestV21(test.TestCase):
project_id=fakes.FAKE_PROJECT_ID))
def _setup_controller(self):
self.controller = server_v21.ServersController()
self.controller = servers.ServersController()
@mock.patch.object(compute_api.API, 'start')
def test_start(self, start_mock):
@ -119,100 +116,3 @@ class ServerStartStopTestV21(test.TestCase):
body = dict(stop="")
self.assertRaises(webob.exc.HTTPNotFound,
self.controller._stop_server, self.req, uuids.instance, body)
class ServerStartStopPolicyEnforcementV21(test.TestCase):
start_policy = "os_compute_api:servers:start"
stop_policy = "os_compute_api:servers:stop"
def setUp(self):
super(ServerStartStopPolicyEnforcementV21, self).setUp()
self.controller = server_v21.ServersController()
self.req = fakes.HTTPRequest.blank('')
self.useFixture(nova_fixtures.SingleCellSimple())
self.stub_out(
'nova.db.api.instance_get_by_uuid',
fakes.fake_instance_get(
project_id=self.req.environ['nova.context'].project_id))
def test_start_policy_failed(self):
rules = {
self.start_policy: "project_id:non_fake"
}
policy.set_rules(oslo_policy.Rules.from_dict(rules))
body = dict(start="")
exc = self.assertRaises(exception.PolicyNotAuthorized,
self.controller._start_server,
self.req, uuids.instance, body)
self.assertIn(self.start_policy, exc.format_message())
def test_start_overridden_policy_failed_with_other_user_in_same_project(
self):
rules = {
self.start_policy: "user_id:%(user_id)s"
}
policy.set_rules(oslo_policy.Rules.from_dict(rules))
# Change the user_id in request context.
self.req.environ['nova.context'].user_id = 'other-user'
body = dict(start="")
exc = self.assertRaises(exception.PolicyNotAuthorized,
self.controller._start_server,
self.req, uuids.instance, body)
self.assertIn(self.start_policy, exc.format_message())
@mock.patch('nova.compute.api.API.start')
def test_start_overridden_policy_pass_with_same_user(self, start_mock):
rules = {
self.start_policy: "user_id:%(user_id)s"
}
policy.set_rules(oslo_policy.Rules.from_dict(rules))
body = dict(start="")
self.controller._start_server(self.req, uuids.instance, body)
start_mock.assert_called_once_with(mock.ANY, mock.ANY)
def test_stop_policy_failed_with_other_project(self):
rules = {
self.stop_policy: "project_id:%(project_id)s"
}
policy.set_rules(oslo_policy.Rules.from_dict(rules))
body = dict(stop="")
# Change the project_id in request context.
self.req.environ['nova.context'].project_id = 'other-project'
exc = self.assertRaises(exception.PolicyNotAuthorized,
self.controller._stop_server,
self.req, uuids.instance, body)
self.assertIn(self.stop_policy, exc.format_message())
@mock.patch('nova.compute.api.API.stop')
def test_stop_overridden_policy_pass_with_same_project(self, stop_mock):
rules = {
self.stop_policy: "project_id:%(project_id)s"
}
policy.set_rules(oslo_policy.Rules.from_dict(rules))
body = dict(stop="")
self.controller._stop_server(self.req, uuids.instance, body)
stop_mock.assert_called_once_with(mock.ANY, mock.ANY)
def test_stop_overridden_policy_failed_with_other_user_in_same_project(
self):
rules = {
self.stop_policy: "user_id:%(user_id)s"
}
policy.set_rules(oslo_policy.Rules.from_dict(rules))
# Change the user_id in request context.
self.req.environ['nova.context'].user_id = 'other-user'
body = dict(stop="")
exc = self.assertRaises(exception.PolicyNotAuthorized,
self.controller._stop_server,
self.req, uuids.instance, body)
self.assertIn(self.stop_policy, exc.format_message())
@mock.patch('nova.compute.api.API.stop')
def test_stop_overridden_policy_pass_with_same_user(self, stop_mock):
rules = {
self.stop_policy: "user_id:%(user_id)s"
}
policy.set_rules(oslo_policy.Rules.from_dict(rules))
body = dict(stop="")
self.controller._stop_server(self.req, uuids.instance, body)
stop_mock.assert_called_once_with(mock.ANY, mock.ANY)

View File

@ -1176,25 +1176,6 @@ class ServersControllerTest(ControllerTest):
servers = self.controller.index(req)['servers']
self.assertEqual(1, len(servers))
def test_all_tenants_fail_policy(self):
def fake_get_all(context, search_opts=None, **kwargs):
self.assertIsNotNone(search_opts)
return [fakes.stub_instance_obj(100)]
rules = {
"os_compute_api:servers:index:get_all_tenants":
"project_id:non_fake",
"os_compute_api:servers:get_all":
"project_id:%s" % self.project_id,
}
policy.set_rules(oslo_policy.Rules.from_dict(rules))
self.mock_get_all.side_effect = fake_get_all
req = self.req(self.path_with_query % 'all_tenants=1')
self.assertRaises(exception.PolicyNotAuthorized,
self.controller.index, req)
def test_get_servers_allows_flavor(self):
def fake_get_all(context, search_opts=None,
limit=None, marker=None,
@ -3726,18 +3707,6 @@ class ServersControllerRebuildTestV263(ControllerTest):
self.req, FAKE_UUID, body=self.body)
self.assertIn('Additional properties are not allowed', str(ex))
def test_rebuild_server_with_trusted_certs_policy_failed(self):
rule_name = "os_compute_api:servers:rebuild:trusted_certs"
rules = {"os_compute_api:servers:rebuild": "@",
rule_name: "project:%s" % fakes.FAKE_PROJECT_ID}
self.policy.set_rules(rules)
exc = self.assertRaises(exception.PolicyNotAuthorized,
self._rebuild_server,
certs=['0b5d2c72-12cc-4ba6-a8d7-3ff5cc1d8cb8'])
self.assertEqual(
"Policy doesn't allow %s to be performed." % rule_name,
exc.format_message())
@mock.patch.object(compute_api.API, 'rebuild')
def test_rebuild_server_with_cert_validation_error(
self, mock_rebuild):
@ -3913,14 +3882,6 @@ class ServersControllerUpdateTest(ControllerTest):
self.assertRaises(webob.exc.HTTPNotFound, self.controller.update,
req, FAKE_UUID, body=body)
def test_update_server_policy_fail(self):
rule = {'compute:update': 'role:admin'}
policy.set_rules(oslo_policy.Rules.from_dict(rule))
body = {'server': {'name': 'server_test'}}
req = self._get_request(body)
self.assertRaises(exception.PolicyNotAuthorized,
self.controller.update, req, FAKE_UUID, body=body)
class ServersControllerTriggerCrashDumpTest(ControllerTest):
@ -3953,15 +3914,6 @@ class ServersControllerTriggerCrashDumpTest(ControllerTest):
body=self.body)
mock_trigger_crash_dump.assert_called_with(ctxt, self.instance)
def test_trigger_crash_dump_policy_failed(self):
rule_name = "os_compute_api:servers:trigger_crash_dump"
self.policy.set_rules({rule_name: "project_id:non_fake"})
exc = self.assertRaises(exception.PolicyNotAuthorized,
self.controller._action_trigger_crash_dump,
self.req, FAKE_UUID, body=self.body)
self.assertIn("os_compute_api:servers:trigger_crash_dump",
exc.format_message())
@mock.patch.object(compute_api.API, 'trigger_crash_dump',
fake_start_stop_not_ready)
def test_trigger_crash_dump_not_ready(self):
@ -4117,19 +4069,6 @@ class ServerStatusTest(test.TestCase):
task_states.REBOOTING_HARD)
self.assertEqual(response['server']['status'], 'HARD_REBOOT')
def test_reboot_resize_policy_fail(self):
rule = {'compute:reboot': 'role:admin'}
policy.set_rules(oslo_policy.Rules.from_dict(rule))
req = fakes.HTTPRequestV21.blank(self.path_action % '1234')
self.stub_out('nova.compute.api.API.get',
fakes.fake_compute_get(
vm_state='ACTIVE',
task_state=None,
project_id=req.environ['nova.context'].project_id))
self.assertRaises(exception.PolicyNotAuthorized,
self.controller._action_reboot, req, '1234',
body={'reboot': {'type': 'HARD'}})
def test_rebuild(self):
response = self._get_with_state(vm_states.ACTIVE,
task_states.REBUILDING)
@ -4144,19 +4083,6 @@ class ServerStatusTest(test.TestCase):
task_states.RESIZE_PREP)
self.assertEqual(response['server']['status'], 'RESIZE')
def test_confirm_resize_policy_fail(self):
rule = {'compute:confirm_resize': 'role:admin'}
policy.set_rules(oslo_policy.Rules.from_dict(rule))
req = fakes.HTTPRequestV21.blank(self.path_action % '1234')
self.stub_out('nova.compute.api.API.get',
fakes.fake_compute_get(
vm_state='ACTIVE',
task_state=None,
project_id=req.environ['nova.context'].project_id))
self.assertRaises(exception.PolicyNotAuthorized,
self.controller._action_confirm_resize, req, '1234', {})
def test_verify_resize(self):
response = self._get_with_state(vm_states.RESIZED, None)
self.assertEqual(response['server']['status'], 'VERIFY_RESIZE')
@ -4166,19 +4092,6 @@ class ServerStatusTest(test.TestCase):
task_states.RESIZE_REVERTING)
self.assertEqual(response['server']['status'], 'REVERT_RESIZE')
def test_revert_resize_policy_fail(self):
rule = {'compute:revert_resize': 'role:admin'}
policy.set_rules(oslo_policy.Rules.from_dict(rule))
req = fakes.HTTPRequestV21.blank(self.path_action % '1234')
self.stub_out('nova.compute.api.API.get',
fakes.fake_compute_get(
vm_state='ACTIVE',
task_state=None,
project_id=req.environ['nova.context'].project_id))
self.assertRaises(exception.PolicyNotAuthorized,
self.controller._action_revert_resize, req, '1234', {})
def test_password_update(self):
response = self._get_with_state(vm_states.ACTIVE,
task_states.UPDATING_PASSWORD)
@ -6791,22 +6704,6 @@ class ServersControllerCreateTestV263(ServersControllerCreateTest):
body=self.body)
self.assertIn('Additional properties are not allowed', str(ex))
def test_create_server_with_trusted_certs_policy_failed(self):
rule_name = "os_compute_api:servers:create:trusted_certs"
rules = {"os_compute_api:servers:create": "@",
"os_compute_api:servers:create:forced_host": "@",
"os_compute_api:servers:create:attach_volume": "@",
"os_compute_api:servers:create:attach_network": "@",
rule_name: "project:fake"}
self._create_instance_req(['0b5d2c72-12cc-4ba6-a8d7-3ff5cc1d8cb8'])
self.policy.set_rules(rules)
exc = self.assertRaises(exception.PolicyNotAuthorized,
self.controller.create, self.req,
body=self.body)
self.assertEqual(
"Policy doesn't allow %s to be performed." % rule_name,
exc.format_message())
@mock.patch.object(compute_api.API, 'create')
def test_create_server_with_cert_validation_error(
self, mock_create):

View File

@ -12,7 +12,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import copy
import datetime
@ -1198,18 +1197,6 @@ class ServicesTestV253(test.TestCase):
self.controller.update, self.req, 1234, body={})
self.assertIn('Invalid uuid', str(ex))
def test_update_policy_failed(self):
"""Tests that policy is checked with microversion 2.53."""
rule_name = "os_compute_api:os-services:update"
self.policy.set_rules({rule_name: "project_id:non_fake"})
exc = self.assertRaises(
exception.PolicyNotAuthorized,
self.controller.update, self.req, uuidsentinel.service_uuid,
body={})
self.assertEqual(
"Policy doesn't allow %s to be performed." % rule_name,
exc.format_message())
def test_update_service_not_found(self):
"""Tests that we get a 404 response if the service is not found by
the given uuid when handling a PUT request.

View File

@ -16,7 +16,6 @@
import datetime
import mock
from oslo_policy import policy as oslo_policy
from oslo_utils.fixture import uuidsentinel as uuids
from oslo_utils import timeutils
import webob
@ -28,14 +27,11 @@ import nova.conf
from nova import context
from nova import exception
from nova import objects
from nova import policy
from nova import test
from nova.tests.unit.api.openstack import fakes
CONF = nova.conf.CONF
SERVERS = 5
TENANTS = 2
HOURS = 24
@ -47,7 +43,6 @@ NOW = timeutils.utcnow()
START = NOW - datetime.timedelta(hours=HOURS)
STOP = NOW
FAKE_FLAVOR = {
'id': 1,
'vcpus': VCPUS,
@ -318,24 +313,6 @@ class SimpleTenantUsageTestV21(test.TestCase):
else:
self.assertNotIn('tenant_usage_links', res_dict)
def test_verify_show_cannot_view_other_tenant(self):
req = fakes.HTTPRequest.blank('?start=%s&end=%s' %
(START.isoformat(), STOP.isoformat()),
version=self.version)
req.environ['nova.context'] = self.alt_user_context
rules = {
self.policy_rule_prefix + ":show": [
["role:admin"], ["project_id:%(project_id)s"]]
}
policy.set_rules(oslo_policy.Rules.from_dict(rules))
try:
self.assertRaises(exception.PolicyNotAuthorized,
self.controller.show, req, 'faketenant_0')
finally:
policy.reset()
def test_get_tenants_usage_with_bad_start_date(self):
future = NOW + datetime.timedelta(hours=HOURS)
req = fakes.HTTPRequest.blank('?start=%s&end=%s' %