Fix servers policy for admin_or_owner

servers API policy is default to admin_or_owner[1] but API
is allowed for everyone.

We can see the test trying with other project context can access the API
- https://review.opendev.org/#/c/717204

This is because API does not pass the server project_id in policy target[2]
and if no target is passed then, policy.py add the default targets which is
nothing but context.project_id (allow for everyone who try to access)[3]

This commit fix this policy by passing the server's project_id in policy
target.

Closes-bug: #1871665
Partial implement blueprint policy-defaults-refresh

[1] cd16ae25c8/nova/policies/servers.py (L285)
[2] cd16ae25c8/nova/api/openstack/compute/servers.py (L872)
[3] c16315165c/nova/policy.py (L191)

Change-Id: Ia8234fd9f4ee1871d6f225c8bd4e4adc5289d605
This commit is contained in:
Ghanshyam Mann 2020-04-08 10:38:49 -05:00
parent 9b3bc4817e
commit 2a4a716209
3 changed files with 161 additions and 79 deletions

View File

@ -448,7 +448,6 @@ class ServersController(wsgi.Controller):
def show(self, req, id):
"""Returns server details by server id."""
context = req.environ['nova.context']
context.can(server_policies.SERVERS % 'show')
cell_down_support = api_version_request.is_supported(
req, min_version=PARTIAL_CONSTRUCT_FOR_CELL_DOWN_MIN_VERSION)
show_server_groups = api_version_request.is_supported(
@ -457,6 +456,9 @@ class ServersController(wsgi.Controller):
instance = self._get_server(
context, req, id, is_detail=True,
cell_down_support=cell_down_support)
context.can(server_policies.SERVERS % 'show',
target={'project_id': instance.project_id})
return self._view_builder.show(
req, instance, cell_down_support=cell_down_support,
show_server_groups=show_server_groups)
@ -869,8 +871,9 @@ class ServersController(wsgi.Controller):
@wsgi.action('confirmResize')
def _action_confirm_resize(self, req, id, body):
context = req.environ['nova.context']
context.can(server_policies.SERVERS % 'confirm_resize')
instance = self._get_server(context, req, id)
context.can(server_policies.SERVERS % 'confirm_resize',
target={'project_id': instance.project_id})
try:
self.compute_api.confirm_resize(context, instance)
except exception.MigrationNotFound:
@ -887,8 +890,9 @@ class ServersController(wsgi.Controller):
@wsgi.action('revertResize')
def _action_revert_resize(self, req, id, body):
context = req.environ['nova.context']
context.can(server_policies.SERVERS % 'revert_resize')
instance = self._get_server(context, req, id)
context.can(server_policies.SERVERS % 'revert_resize',
target={'project_id': instance.project_id})
try:
self.compute_api.revert_resize(context, instance)
except exception.MigrationNotFound:
@ -911,8 +915,9 @@ class ServersController(wsgi.Controller):
reboot_type = body['reboot']['type'].upper()
context = req.environ['nova.context']
context.can(server_policies.SERVERS % 'reboot')
instance = self._get_server(context, req, id)
context.can(server_policies.SERVERS % 'reboot',
target={'project_id': instance.project_id})
try:
self.compute_api.reboot(context, instance, reboot_type)
@ -1193,7 +1198,10 @@ class ServersController(wsgi.Controller):
def _action_create_image(self, req, id, body):
"""Snapshot a server instance."""
context = req.environ['nova.context']
context.can(server_policies.SERVERS % 'create_image')
instance = self._get_server(context, req, id)
target = {'project_id': instance.project_id}
context.can(server_policies.SERVERS % 'create_image',
target=target)
entity = body["createImage"]
image_name = common.normalize_name(entity["name"])
@ -1205,8 +1213,6 @@ class ServersController(wsgi.Controller):
api_version_request.MAX_IMAGE_META_PROXY_API_VERSION):
common.check_img_metadata_properties_quota(context, metadata)
instance = self._get_server(context, req, id)
bdms = objects.BlockDeviceMappingList.get_by_instance_uuid(
context, instance.uuid)
@ -1214,7 +1220,7 @@ class ServersController(wsgi.Controller):
if compute_utils.is_volume_backed_instance(context, instance,
bdms):
context.can(server_policies.SERVERS %
'create_image:allow_volume_backed')
'create_image:allow_volume_backed', target=target)
image = self.compute_api.snapshot_volume_backed(
context,
instance,
@ -1298,7 +1304,9 @@ class ServersController(wsgi.Controller):
"""Start an instance."""
context = req.environ['nova.context']
instance = self._get_instance(context, id)
context.can(server_policies.SERVERS % 'start', instance)
context.can(server_policies.SERVERS % 'start',
target={'user_id': instance.user_id,
'project_id': instance.project_id})
try:
self.compute_api.start(context, instance)
except (exception.InstanceNotReady, exception.InstanceIsLocked) as e:

View File

@ -276,6 +276,22 @@ class ControllerTest(test.TestCase):
class ServersControllerTest(ControllerTest):
wsgi_api_version = os_wsgi.DEFAULT_API_VERSION
def setUp(self):
super(ServersControllerTest, self).setUp()
self.request = fakes.HTTPRequest.blank(
self.path_with_id_v2 % FAKE_UUID,
use_admin_context=False,
version=self.wsgi_api_version)
return_server = fakes.fake_compute_get(
id=2, availability_zone='nova',
launched_at=None,
terminated_at=None,
task_state=None,
vm_state=vm_states.ACTIVE,
power_state=1,
project_id=self.request.environ['nova.context'].project_id)
self.mock_get.side_effect = return_server
def req(self, url, use_admin_context=False):
return fakes.HTTPRequest.blank(url,
use_admin_context=use_admin_context,
@ -348,8 +364,7 @@ class ServersControllerTest(ControllerTest):
self.assertEqual([(None, None, port, None)], res.as_tuples())
def test_get_server_by_uuid(self):
req = self.req(self.path_with_id % FAKE_UUID)
res_dict = self.controller.show(req, FAKE_UUID)
res_dict = self.controller.show(self.request, FAKE_UUID)
self.assertEqual(res_dict['server']['id'], FAKE_UUID)
def test_get_server_joins(self):
@ -359,27 +374,31 @@ class ServersControllerTest(ControllerTest):
'numa_topology'], expected_attrs)
ctxt = context.RequestContext('fake', self.project_id)
return fake_instance.fake_instance_obj(
ctxt, expected_attrs=expected_attrs)
ctxt, expected_attrs=expected_attrs,
project_id=self.request.environ['nova.context'].project_id)
self.mock_get.side_effect = fake_get
req = self.req(self.path_with_id % FAKE_UUID)
self.controller.show(req, FAKE_UUID)
self.controller.show(self.request, FAKE_UUID)
def test_unique_host_id(self):
"""Create two servers with the same host and different
project_ids and check that the host_id's are unique.
"""
def return_instance_with_host(context, *args, **kwargs):
project_id = uuidutils.generate_uuid()
project_id = context.project_id
return fakes.stub_instance_obj(context, id=1, uuid=FAKE_UUID,
project_id=project_id,
host='fake_host')
req = self.req(self.path_with_id % FAKE_UUID)
req1 = self.req(self.path_with_id % FAKE_UUID)
project_id = uuidutils.generate_uuid()
req2 = fakes.HTTPRequest.blank(self.path_with_id % FAKE_UUID,
version=self.wsgi_api_version,
project_id=project_id)
self.mock_get.side_effect = return_instance_with_host
server1 = self.controller.show(req, FAKE_UUID)
server2 = self.controller.show(req, FAKE_UUID)
server1 = self.controller.show(req1, FAKE_UUID)
server2 = self.controller.show(req2, FAKE_UUID)
self.assertNotEqual(server1['server']['hostId'],
server2['server']['hostId'])
@ -390,9 +409,8 @@ class ServersControllerTest(ControllerTest):
"server": {
"id": uuid,
"user_id": "fake_user",
"tenant_id": "fake_project",
"updated": "2010-11-11T11:00:00Z",
"created": "2010-10-10T12:00:00Z",
"updated": "2010-11-11T11:00:00Z",
"progress": progress,
"name": "server2",
"status": status,
@ -456,7 +474,8 @@ class ServersControllerTest(ControllerTest):
"os-extended-volumes:volumes_attached": [
{'id': 'some_volume_1'},
{'id': 'some_volume_2'},
]
],
"tenant_id": self.request.environ['nova.context'].project_id
}
}
@ -465,20 +484,23 @@ class ServersControllerTest(ControllerTest):
flavor_bookmark = "http://localhost/%s/flavors/2" % self.project_id
uuid = FAKE_UUID
req = self.req(self.path_with_id_v2, uuid)
res_dict = self.controller.show(req, uuid)
res_dict = self.controller.show(self.request, uuid)
expected_server = self._get_server_data_dict(uuid,
image_bookmark,
flavor_bookmark,
progress=0)
expected_server['server']['tenant_id'] = self.request.environ[
'nova.context'].project_id
self.assertThat(res_dict, matchers.DictMatches(expected_server))
def test_get_server_empty_az(self):
self.mock_get.side_effect = fakes.fake_compute_get(
availability_zone='')
uuid = FAKE_UUID
req = self.req(self.path_with_id_v2 % uuid)
self.mock_get.side_effect = fakes.fake_compute_get(
availability_zone='',
project_id=req.environ['nova.context'].project_id)
res_dict = self.controller.show(req, uuid)
self.assertEqual(res_dict['server']['OS-EXT-AZ:availability_zone'], '')
@ -486,15 +508,17 @@ class ServersControllerTest(ControllerTest):
image_bookmark = "http://localhost/%s/images/10" % self.project_id
flavor_bookmark = "http://localhost/%s/flavors/2" % self.project_id
req = self.req(self.path_with_id % FAKE_UUID)
res_dict = self.controller.show(req, FAKE_UUID)
res_dict = self.controller.show(self.request, FAKE_UUID)
expected_server = self._get_server_data_dict(FAKE_UUID,
image_bookmark,
flavor_bookmark,
progress=0)
expected_server['server']['tenant_id'] = self.request.environ[
'nova.context'].project_id
self.assertThat(res_dict, matchers.DictMatches(expected_server))
self.mock_get.assert_called_once_with(
req.environ['nova.context'], FAKE_UUID,
self.request.environ['nova.context'], FAKE_UUID,
expected_attrs=['flavor', 'info_cache', 'metadata',
'numa_topology'], cell_down_support=False)
@ -502,16 +526,17 @@ class ServersControllerTest(ControllerTest):
image_bookmark = "http://localhost/%s/images/10" % self.project_id
flavor_bookmark = "http://localhost/%s/flavors/2" % self.project_id
req = self.req(self.path_with_id % FAKE_UUID)
res_dict = self.controller.show(req, FAKE_UUID)
res_dict = self.controller.show(self.request, FAKE_UUID)
expected_server = self._get_server_data_dict(FAKE_UUID,
image_bookmark,
flavor_bookmark,
progress=0)
expected_server['server']['tenant_id'] = self.request.environ[
'nova.context'].project_id
self.assertThat(res_dict, matchers.DictMatches(expected_server))
self.mock_get.assert_called_once_with(
req.environ['nova.context'], FAKE_UUID,
self.request.environ['nova.context'], FAKE_UUID,
expected_attrs=['flavor', 'info_cache', 'metadata',
'numa_topology'], cell_down_support=False)
@ -595,9 +620,10 @@ class ServersControllerTest(ControllerTest):
def test_show_server_hide_addresses_in_building(self):
uuid = FAKE_UUID
self.mock_get.side_effect = fakes.fake_compute_get(
uuid=uuid, vm_state=vm_states.BUILDING)
req = self.req(self.path_with_id_v2 % uuid)
self.mock_get.side_effect = fakes.fake_compute_get(
uuid=uuid, vm_state=vm_states.BUILDING,
project_id=req.environ['nova.context'].project_id)
res_dict = self.controller.show(req, uuid)
self.assertEqual({}, res_dict['server']['addresses'])
@ -630,9 +656,10 @@ class ServersControllerTest(ControllerTest):
],
},
}
self.mock_get.side_effect = fakes.fake_compute_get(
nw_cache=nw_cache, uuid=uuid, vm_state=vm_states.ACTIVE)
req = self.req(self.path_with_id_v2 % uuid)
self.mock_get.side_effect = fakes.fake_compute_get(
nw_cache=nw_cache, uuid=uuid, vm_state=vm_states.ACTIVE,
project_id=req.environ['nova.context'].project_id)
res_dict = self.controller.show(req, uuid)
self.assertThat(res_dict['server']['addresses'],
matchers.DictMatches(expected['addresses']))
@ -1635,11 +1662,12 @@ class ServersControllerTest(ControllerTest):
def test_show_server_usage(self):
DATE1 = datetime.datetime(year=2013, month=4, day=5, hour=12)
DATE2 = datetime.datetime(year=2013, month=4, day=5, hour=13)
self.mock_get.side_effect = fakes.fake_compute_get(
id=1, uuid=FAKE_UUID, launched_at=DATE1, terminated_at=DATE2)
req = self.req(self.path_with_id % FAKE_UUID)
req.accept = 'application/json'
req.method = 'GET'
self.mock_get.side_effect = fakes.fake_compute_get(
id=1, uuid=FAKE_UUID, launched_at=DATE1, terminated_at=DATE2,
project_id=req.environ['nova.context'].project_id)
res = req.get_response(compute.APIRouterV21())
self.assertEqual(res.status_int, 200)
self.useFixture(utils_fixture.TimeFixture())
@ -1770,6 +1798,8 @@ class ServersControllerTestV23(ServersControllerTest):
def setUp(self):
super(ServersControllerTestV23, self).setUp()
self.request = self.req(self.path_with_id % FAKE_UUID)
self.project_id = self.request.environ['nova.context'].project_id
self.mock_get.side_effect = fakes.fake_compute_get(
id=2, uuid=FAKE_UUID,
node="node-fake",
@ -1784,7 +1814,8 @@ class ServersControllerTestV23(ServersControllerTest):
terminated_at=None,
task_state=None,
vm_state=vm_states.ACTIVE,
power_state=1)
power_state=1,
project_id=self.project_id)
def _get_server_data_dict(self, uuid, image_bookmark, flavor_bookmark,
status="ACTIVE", progress=100):
@ -1809,14 +1840,14 @@ class ServersControllerTestV23(ServersControllerTest):
server_dict['server']["os-extended-volumes:volumes_attached"] = [
{'id': 'some_volume_1', 'delete_on_termination': True},
{'id': 'some_volume_2', 'delete_on_termination': False}]
server_dict['server']["tenant_id"] = self.project_id
return server_dict
def test_show(self):
image_bookmark = "http://localhost/%s/images/10" % self.project_id
flavor_bookmark = "http://localhost/%s/flavors/2" % self.project_id
req = self.req(self.path_with_id % FAKE_UUID)
res_dict = self.controller.show(req, FAKE_UUID)
res_dict = self.controller.show(self.request, FAKE_UUID)
expected_server = self._get_server_data_dict(FAKE_UUID,
image_bookmark,
@ -1844,14 +1875,16 @@ class ServersControllerTestV23(ServersControllerTest):
terminated_at=None,
task_state=None,
vm_state=vm_states.ACTIVE,
power_state=1)
power_state=1,
project_id=context.project_id)
obj_list.append(server)
return objects.InstanceList(objects=obj_list)
self.mock_get_all.side_effect = None
self.mock_get_all.return_value = fake_get_all(context)
req = self.req(self.path_detail)
self.mock_get_all.return_value = fake_get_all(
req.environ['nova.context'])
servers_list = self.controller.detail(req)
image_bookmark = "http://localhost/%s/images/10" % self.project_id
flavor_bookmark = "http://localhost/%s/flavors/2" % self.project_id
@ -1882,7 +1915,8 @@ class ServersControllerTestV29(ServersControllerTest):
terminated_at=None,
task_state=None,
vm_state=vm_states.ACTIVE,
power_state=1)
power_state=1,
project_id=self.request.environ['nova.context'].project_id)
def _get_server_data_dict(self, uuid, image_bookmark, flavor_bookmark,
status="ACTIVE", progress=100):
@ -1908,11 +1942,15 @@ class ServersControllerTestV29(ServersControllerTest):
server_dict['server']["os-extended-volumes:volumes_attached"] = [
{'id': 'some_volume_1', 'delete_on_termination': True},
{'id': 'some_volume_2', 'delete_on_termination': False}]
server_dict['server']["tenant_id"] = self.request.environ[
'nova.context'].project_id
return server_dict
def _test_get_server_with_lock(self, locked_by):
image_bookmark = "http://localhost/%s/images/10" % self.project_id
flavor_bookmark = "http://localhost/%s/flavors/2" % self.project_id
req = self.req(self.path_with_id % FAKE_UUID)
project_id = req.environ['nova.context'].project_id
self.mock_get.side_effect = fakes.fake_compute_get(
id=2, locked_by=locked_by, uuid=FAKE_UUID,
node="node-fake",
@ -1927,9 +1965,9 @@ class ServersControllerTestV29(ServersControllerTest):
terminated_at=None,
task_state=None,
vm_state=vm_states.ACTIVE,
power_state=1)
power_state=1,
project_id=project_id)
req = self.req(self.path_with_id % FAKE_UUID)
res_dict = self.controller.show(req, FAKE_UUID)
expected_server = self._get_server_data_dict(FAKE_UUID,
@ -1937,6 +1975,7 @@ class ServersControllerTestV29(ServersControllerTest):
flavor_bookmark,
progress=0)
expected_server['server']['locked'] = True if locked_by else False
expected_server['server']['tenant_id'] = project_id
self.assertThat(res_dict, matchers.DictMatches(expected_server))
return res_dict
@ -2032,7 +2071,8 @@ class ServersControllerTestV216(ServersControllerTest):
terminated_at=None,
task_state=None,
vm_state=vm_states.ACTIVE,
power_state=1)
power_state=1,
project_id=self.request.environ['nova.context'].project_id)
self.mock_get_instance_host_status = self.useFixture(
fixtures.MockPatchObject(
compute_api.API, 'get_instance_host_status',
@ -2066,6 +2106,8 @@ class ServersControllerTestV216(ServersControllerTest):
server_dict['server']["os-extended-volumes:volumes_attached"] = [
{'id': 'some_volume_1', 'delete_on_termination': True},
{'id': 'some_volume_2', 'delete_on_termination': False}]
server_dict['server']['tenant_id'] = self.request.environ[
'nova.context'].project_id
return server_dict
@ -2087,14 +2129,14 @@ class ServersControllerTestV216(ServersControllerTest):
def test_show(self):
image_bookmark = "http://localhost/%s/images/10" % self.project_id
flavor_bookmark = "http://localhost/%s/flavors/2" % self.project_id
req = self.req(self.path_with_id % FAKE_UUID)
res_dict = self.controller.show(req, FAKE_UUID)
res_dict = self.controller.show(self.request, FAKE_UUID)
expected_server = self._get_server_data_dict(FAKE_UUID,
image_bookmark,
flavor_bookmark,
progress=0)
self.assertThat(res_dict, matchers.DictMatches(expected_server))
func = functools.partial(self.controller.show, req, FAKE_UUID)
func = functools.partial(self.controller.show, self.request,
FAKE_UUID)
self._verify_host_status_policy_behavior(func)
def test_detail(self):
@ -2118,14 +2160,16 @@ class ServersControllerTestV216(ServersControllerTest):
terminated_at=None,
task_state=None,
vm_state=vm_states.ACTIVE,
power_state=1)
power_state=1,
project_id=context.project_id)
obj_list.append(server)
return objects.InstanceList(objects=obj_list)
self.mock_get_all.side_effect = None
self.mock_get_all.return_value = fake_get_all(context)
req = self.req(self.path_detail)
self.mock_get_all.return_value = fake_get_all(
req.environ['nova.context'])
servers_list = self.controller.detail(req)
self.assertEqual(2, len(servers_list['servers']))
image_bookmark = "http://localhost/%s/images/10" % self.project_id
@ -2163,7 +2207,8 @@ class ServersControllerTestV219(ServersControllerTest):
terminated_at=None,
task_state=None,
vm_state=vm_states.ACTIVE,
power_state=1)
power_state=1,
project_id=self.request.environ['nova.context'].project_id)
self.useFixture(fixtures.MockPatchObject(
compute_api.API, 'get_instance_host_status',
return_value='UP')).mock
@ -2200,6 +2245,8 @@ class ServersControllerTestV219(ServersControllerTest):
def _test_get_server_with_description(self, description):
image_bookmark = "http://localhost/%s/images/10" % self.project_id
flavor_bookmark = "http://localhost/%s/flavors/2" % self.project_id
req = self.req(self.path_with_id % FAKE_UUID)
project_id = req.environ['nova.context'].project_id
self.mock_get.side_effect = fakes.fake_compute_get(
id=2, display_description=description, uuid=FAKE_UUID,
node="node-fake",
@ -2214,9 +2261,9 @@ class ServersControllerTestV219(ServersControllerTest):
terminated_at=None,
task_state=None,
vm_state=vm_states.ACTIVE,
power_state=1)
power_state=1,
project_id=project_id)
req = self.req(self.path_with_id % FAKE_UUID)
res_dict = self.controller.show(req, FAKE_UUID)
expected_server = self._get_server_data_dict(FAKE_UUID,
@ -2224,6 +2271,7 @@ class ServersControllerTestV219(ServersControllerTest):
flavor_bookmark,
progress=0,
description=description)
expected_server['server']['tenant_id'] = project_id
self.assertThat(res_dict, matchers.DictMatches(expected_server))
return res_dict
@ -2265,7 +2313,8 @@ class ServersControllerTestV226(ControllerTest):
def fake_get(*args, **kwargs):
self.assertIn('tags', kwargs['expected_attrs'])
fake_server = fakes.stub_instance_obj(
ctxt, id=2, vm_state=vm_states.ACTIVE, progress=100)
ctxt, id=2, vm_state=vm_states.ACTIVE, progress=100,
project_id=ctxt.project_id)
tag_list = objects.TagList(objects=[
objects.Tag(resource_id=FAKE_UUID, tag=tag)
@ -2512,6 +2561,10 @@ class ServersControllerTestV271(ControllerTest):
def test_show_server_group_not_exist(self):
req = self.req(self.path_with_id % FAKE_UUID)
return_server = fakes.fake_compute_get(
id=2, vm_state=vm_states.ACTIVE,
project_id=req.environ['nova.context'].project_id)
self.mock_get.side_effect = return_server
servers = self.controller.show(req, FAKE_UUID)
expect_sg = []
self.assertEqual(expect_sg, servers['server']['server_groups'])
@ -4040,11 +4093,13 @@ class ServerStatusTest(test.TestCase):
self.controller = servers.ServersController()
def _get_with_state(self, vm_state, task_state=None):
self.stub_out('nova.compute.api.API.get',
fakes.fake_compute_get(vm_state=vm_state,
task_state=task_state))
request = fakes.HTTPRequestV21.blank(self.path_with_id % FAKE_UUID)
self.stub_out('nova.compute.api.API.get',
fakes.fake_compute_get(
vm_state=vm_state,
task_state=task_state,
project_id=request.environ['nova.context'].project_id))
return self.controller.show(request, FAKE_UUID)
def test_active(self):
@ -4065,6 +4120,11 @@ class ServerStatusTest(test.TestCase):
rule = {'compute:reboot': 'role:admin'}
policy.set_rules(oslo_policy.Rules.from_dict(rule))
req = fakes.HTTPRequestV21.blank(self.path_action % '1234')
self.stub_out('nova.compute.api.API.get',
fakes.fake_compute_get(
vm_state='ACTIVE',
task_state=None,
project_id=req.environ['nova.context'].project_id))
self.assertRaises(exception.PolicyNotAuthorized,
self.controller._action_reboot, req, '1234',
body={'reboot': {'type': 'HARD'}})
@ -4087,6 +4147,12 @@ class ServerStatusTest(test.TestCase):
rule = {'compute:confirm_resize': 'role:admin'}
policy.set_rules(oslo_policy.Rules.from_dict(rule))
req = fakes.HTTPRequestV21.blank(self.path_action % '1234')
self.stub_out('nova.compute.api.API.get',
fakes.fake_compute_get(
vm_state='ACTIVE',
task_state=None,
project_id=req.environ['nova.context'].project_id))
self.assertRaises(exception.PolicyNotAuthorized,
self.controller._action_confirm_resize, req, '1234', {})
@ -4103,6 +4169,12 @@ class ServerStatusTest(test.TestCase):
rule = {'compute:revert_resize': 'role:admin'}
policy.set_rules(oslo_policy.Rules.from_dict(rule))
req = fakes.HTTPRequestV21.blank(self.path_action % '1234')
self.stub_out('nova.compute.api.API.get',
fakes.fake_compute_get(
vm_state='ACTIVE',
task_state=None,
project_id=req.environ['nova.context'].project_id))
self.assertRaises(exception.PolicyNotAuthorized,
self.controller._action_revert_resize, req, '1234', {})
@ -8107,7 +8179,20 @@ class ServersPolicyEnforcementV21(test.NoDBTestCase):
self.useFixture(nova_fixtures.AllServicesCurrent())
self.controller = servers.ServersController()
self.req = fakes.HTTPRequest.blank('')
user_id = self.req.environ['nova.context'].user_id
project_id = self.req.environ['nova.context'].project_id
self.image_uuid = '76fa36fc-c930-4bf3-8c8a-ea2a2420deb6'
self.instance = fake_instance.fake_instance_obj(
self.req.environ['nova.context'],
id=1, uuid=uuids.fake_id, project_id=project_id,
user_id=user_id, vm_state=vm_states.ACTIVE)
self.mock_get = self.useFixture(
fixtures.MockPatch('nova.api.openstack.common.get_instance')).mock
self.mock_get.return_value = self.instance
self.mock_get_instance = self.useFixture(fixtures.MockPatchObject(
self.controller, '_get_instance')).mock
self.mock_get_instance.return_value = self.instance
def _common_policy_check(self, rules, rule_name, func, *arg, **kwarg):
self.policy.set_rules(rules)
@ -8142,13 +8227,8 @@ class ServersPolicyEnforcementV21(test.NoDBTestCase):
self.req, FAKE_UUID, body={'trigger_crash_dump': None})
@mock.patch('nova.compute.api.API.trigger_crash_dump')
@mock.patch.object(servers.ServersController, '_get_instance')
def test_trigger_crash_dump_overridden_policy_pass_with_same_project(
self, _get_instance_mock, trigger_crash_dump_mock):
instance = fake_instance.fake_instance_obj(
self.req.environ['nova.context'],
project_id=self.req.environ['nova.context'].project_id)
_get_instance_mock.return_value = instance
self, trigger_crash_dump_mock):
rule_name = "os_compute_api:servers:trigger_crash_dump"
self.policy.set_rules({rule_name: "project_id:%(project_id)s"})
self.req.api_version_request = (
@ -8156,7 +8236,7 @@ class ServersPolicyEnforcementV21(test.NoDBTestCase):
self.controller._action_trigger_crash_dump(
self.req, fakes.FAKE_UUID, body={'trigger_crash_dump': None})
trigger_crash_dump_mock.assert_called_once_with(
self.req.environ['nova.context'], instance)
self.req.environ['nova.context'], self.instance)
@mock.patch.object(servers.ServersController, '_get_instance')
def test_trigger_crash_dump_overridden_policy_failed_with_other_user(
@ -8179,13 +8259,8 @@ class ServersPolicyEnforcementV21(test.NoDBTestCase):
exc.format_message())
@mock.patch('nova.compute.api.API.trigger_crash_dump')
@mock.patch.object(servers.ServersController, '_get_instance')
def test_trigger_crash_dump_overridden_policy_pass_with_same_user(
self, _get_instance_mock, trigger_crash_dump_mock):
instance = fake_instance.fake_instance_obj(
self.req.environ['nova.context'],
user_id=self.req.environ['nova.context'].user_id)
_get_instance_mock.return_value = instance
self, trigger_crash_dump_mock):
rule_name = "os_compute_api:servers:trigger_crash_dump"
self.policy.set_rules({rule_name: "user_id:%(user_id)s"})
self.req.api_version_request = (
@ -8193,7 +8268,7 @@ class ServersPolicyEnforcementV21(test.NoDBTestCase):
self.controller._action_trigger_crash_dump(
self.req, fakes.FAKE_UUID, body={'trigger_crash_dump': None})
trigger_crash_dump_mock.assert_called_once_with(
self.req.environ['nova.context'], instance)
self.req.environ['nova.context'], self.instance)
def test_index_policy_failed(self):
rule_name = "os_compute_api:servers:index"
@ -8223,9 +8298,7 @@ class ServersPolicyEnforcementV21(test.NoDBTestCase):
self._common_policy_check(
rule, rule_name, self.controller._get_servers, req, False)
@mock.patch.object(common, 'get_instance')
def test_show_policy_failed(self, get_instance_mock):
get_instance_mock.return_value = None
def test_show_policy_failed(self):
rule_name = "os_compute_api:servers:show"
rule = {rule_name: "project:non_fake"}
self._common_policy_check(

View File

@ -18,6 +18,7 @@ policy_data = """
"context_is_admin": "role:admin or role:administrator",
"os_compute_api:servers:show:host_status": "",
"os_compute_api:servers:show": "",
"os_compute_api:servers:show:host_status:unknown-only": "",
"os_compute_api:servers:allow_all_filters": "",
"os_compute_api:servers:migrations:force_complete": "",