Remove policy checkpoints from ComputeAPI

As the legacy v2 API has been removed and the v2.1 API checks policy at the
API layer, the policy checks below the API layer are useless. This patch
cleans them up.

This patch also moves the lock-override unit test from the compute-layer
tests to the API-layer tests, because the override checks are now only
done at the API layer for the v2.1 API.

Partially implements blueprint remove-legacy-v2-api-code

Change-Id: Ide865dabc3cd9d2cf4cbab7642ffa90f1a4d3fe2
This commit is contained in:
He Jie Xu 2016-06-05 17:06:19 +08:00
parent 545d8d8666
commit 54a5e73bb9
7 changed files with 24 additions and 373 deletions

View File

@ -170,7 +170,7 @@ def policy_decorator(scope):
return wrapped
return outer
wrap_check_policy = policy_decorator(scope='compute')
wrap_check_security_groups_policy = policy_decorator(
scope='compute:security_groups')
@ -1455,25 +1455,6 @@ class API(base.Base):
return instance
def _check_create_policies(self, context, availability_zone,
                           requested_networks, block_device_mapping,
                           forced_host, forced_node):
    """Check policies for create().

    Enforces the base ``create`` policy and, depending on which optional
    arguments were supplied, the finer-grained ``create:attach_network``,
    ``create:attach_volume`` and ``create:forced_host`` policies. All
    checks are skipped when the API was built with skip_policy_check.
    """
    target = {'project_id': context.project_id,
              'user_id': context.user_id,
              'availability_zone': availability_zone}
    # NOTE(review): indentation reconstructed from a diff rendering; the
    # sub-checks are assumed to sit under the skip_policy_check guard —
    # confirm against the original file.
    if not self.skip_policy_check:
        check_policy(context, 'create', target)

        if requested_networks and len(requested_networks):
            check_policy(context, 'create:attach_network', target)

        if block_device_mapping:
            check_policy(context, 'create:attach_volume', target)

        if forced_host or forced_node:
            check_policy(context, 'create:forced_host', {})
def _check_multiple_instances_with_neutron_ports(self,
requested_networks):
"""Check whether multiple instances are created from port id(s)."""
@ -1512,11 +1493,6 @@ class API(base.Base):
Returns a tuple of (instances, reservation_id)
"""
# Check policies up front to fail before performing more expensive work
self._check_create_policies(context, availability_zone,
requested_networks, block_device_mapping, forced_host,
forced_node)
if requested_networks and max_count > 1:
self._check_multiple_instances_with_specified_ip(
requested_networks)
@ -1873,7 +1849,6 @@ class API(base.Base):
reservations=reservations)
# NOTE(maoy): we allow delete to be called no matter what vm_state says.
@wrap_check_policy
@check_instance_lock
@check_instance_cell
@check_instance_state(vm_state=None, task_state=None,
@ -1891,7 +1866,6 @@ class API(base.Base):
self._delete(context, instance, 'delete', self._do_delete,
task_state=task_states.DELETING)
@wrap_check_policy
@check_instance_lock
@check_instance_cell
@check_instance_state(vm_state=None, task_state=None,
@ -1901,7 +1875,6 @@ class API(base.Base):
LOG.debug("Going to try to terminate instance", instance=instance)
self._delete_instance(context, instance)
@wrap_check_policy
@check_instance_lock
@check_instance_state(vm_state=[vm_states.SOFT_DELETED])
def restore(self, context, instance):
@ -1932,7 +1905,6 @@ class API(base.Base):
with excutils.save_and_reraise_exception():
quotas.rollback()
@wrap_check_policy
@check_instance_lock
@check_instance_state(must_have_launched=False)
def force_delete(self, context, instance):
@ -2011,9 +1983,6 @@ class API(base.Base):
LOG.debug("Invalid instance id %s", instance_id)
raise exception.InstanceNotFound(instance_id=instance_id)
if not self.skip_policy_check:
check_policy(context, 'get', instance)
if not want_objects:
instance = obj_base.obj_to_primitive(instance)
return instance
@ -2035,16 +2004,6 @@ class API(base.Base):
direction is based on the list of sort directions in the 'sort_dirs'
parameter.
"""
# TODO(bcwaldon): determine the best argument for target here
target = {
'project_id': context.project_id,
'user_id': context.user_id,
}
if not self.skip_policy_check:
check_policy(context, "get_all", target)
if search_opts is None:
search_opts = {}
@ -2169,7 +2128,6 @@ class API(base.Base):
# NOTE(melwitt): We don't check instance lock for backup because lock is
# intended to prevent accidental change/delete of instances
@wrap_check_policy
@check_instance_cell
@check_instance_state(vm_state=[vm_states.ACTIVE, vm_states.STOPPED,
vm_states.PAUSED, vm_states.SUSPENDED])
@ -2211,7 +2169,6 @@ class API(base.Base):
# NOTE(melwitt): We don't check instance lock for snapshot because lock is
# intended to prevent accidental change/delete of instances
@wrap_check_policy
@check_instance_cell
@check_instance_state(vm_state=[vm_states.ACTIVE, vm_states.STOPPED,
vm_states.PAUSED, vm_states.SUSPENDED])
@ -2375,7 +2332,6 @@ class API(base.Base):
return self.image_api.create(context, image_meta)
@wrap_check_policy
@check_instance_lock
def reboot(self, context, instance, reboot_type):
"""Reboot the given instance."""
@ -2417,7 +2373,6 @@ class API(base.Base):
block_device_info=None,
reboot_type='HARD')
@wrap_check_policy
@check_instance_lock
@check_instance_cell
@check_instance_state(vm_state=[vm_states.ACTIVE, vm_states.STOPPED,
@ -2512,7 +2467,6 @@ class API(base.Base):
request_spec=request_spec,
kwargs=kwargs)
@wrap_check_policy
@check_instance_lock
@check_instance_cell
@check_instance_state(vm_state=[vm_states.RESIZED])
@ -2548,7 +2502,6 @@ class API(base.Base):
migration.dest_compute,
quotas.reservations or [])
@wrap_check_policy
@check_instance_lock
@check_instance_cell
@check_instance_state(vm_state=[vm_states.RESIZED])
@ -2602,7 +2555,6 @@ class API(base.Base):
'resize' or 'migration')
mig.create()
@wrap_check_policy
@check_instance_lock
@check_instance_cell
@check_instance_state(vm_state=[vm_states.ACTIVE, vm_states.STOPPED])
@ -2725,7 +2677,6 @@ class API(base.Base):
clean_shutdown=clean_shutdown,
request_spec=request_spec)
@wrap_check_policy
@check_instance_lock
@check_instance_state(vm_state=[vm_states.ACTIVE, vm_states.STOPPED,
vm_states.PAUSED, vm_states.SUSPENDED])
@ -2751,7 +2702,6 @@ class API(base.Base):
self.compute_rpcapi.shelve_offload_instance(context,
instance=instance, clean_shutdown=clean_shutdown)
@wrap_check_policy
@check_instance_lock
@check_instance_state(vm_state=[vm_states.SHELVED])
def shelve_offload(self, context, instance, clean_shutdown=True):
@ -2762,7 +2712,6 @@ class API(base.Base):
self.compute_rpcapi.shelve_offload_instance(context, instance=instance,
clean_shutdown=clean_shutdown)
@wrap_check_policy
@check_instance_lock
@check_instance_state(vm_state=[vm_states.SHELVED,
vm_states.SHELVED_OFFLOADED])
@ -2783,21 +2732,18 @@ class API(base.Base):
self.compute_task_api.unshelve_instance(context, instance,
request_spec)
@wrap_check_policy
@check_instance_lock
def add_fixed_ip(self, context, instance, network_id):
"""Add fixed_ip from specified network to given instance."""
self.compute_rpcapi.add_fixed_ip_to_instance(context,
instance=instance, network_id=network_id)
@wrap_check_policy
@check_instance_lock
def remove_fixed_ip(self, context, instance, address):
"""Remove fixed_ip from specified network to given instance."""
self.compute_rpcapi.remove_fixed_ip_from_instance(context,
instance=instance, address=address)
@wrap_check_policy
@check_instance_lock
@check_instance_cell
@check_instance_state(vm_state=[vm_states.ACTIVE])
@ -2808,7 +2754,6 @@ class API(base.Base):
self._record_action_start(context, instance, instance_actions.PAUSE)
self.compute_rpcapi.pause_instance(context, instance)
@wrap_check_policy
@check_instance_lock
@check_instance_cell
@check_instance_state(vm_state=[vm_states.PAUSED])
@ -2819,18 +2764,15 @@ class API(base.Base):
self._record_action_start(context, instance, instance_actions.UNPAUSE)
self.compute_rpcapi.unpause_instance(context, instance)
@wrap_check_policy
def get_diagnostics(self, context, instance):
"""Retrieve diagnostics for the given instance."""
return self.compute_rpcapi.get_diagnostics(context, instance=instance)
@wrap_check_policy
def get_instance_diagnostics(self, context, instance):
"""Retrieve diagnostics for the given instance."""
return self.compute_rpcapi.get_instance_diagnostics(context,
instance=instance)
@wrap_check_policy
@check_instance_lock
@check_instance_cell
@check_instance_state(vm_state=[vm_states.ACTIVE])
@ -2841,7 +2783,6 @@ class API(base.Base):
self._record_action_start(context, instance, instance_actions.SUSPEND)
self.compute_rpcapi.suspend_instance(context, instance)
@wrap_check_policy
@check_instance_lock
@check_instance_cell
@check_instance_state(vm_state=[vm_states.SUSPENDED])
@ -2852,7 +2793,6 @@ class API(base.Base):
self._record_action_start(context, instance, instance_actions.RESUME)
self.compute_rpcapi.resume_instance(context, instance)
@wrap_check_policy
@check_instance_lock
@check_instance_state(vm_state=[vm_states.ACTIVE, vm_states.STOPPED,
vm_states.ERROR])
@ -2880,7 +2820,6 @@ class API(base.Base):
rescue_password=rescue_password, rescue_image_ref=rescue_image_ref,
clean_shutdown=clean_shutdown)
@wrap_check_policy
@check_instance_lock
@check_instance_state(vm_state=[vm_states.RESCUED])
def unrescue(self, context, instance):
@ -2892,7 +2831,6 @@ class API(base.Base):
self.compute_rpcapi.unrescue_instance(context, instance=instance)
@wrap_check_policy
@check_instance_lock
@check_instance_cell
@check_instance_state(vm_state=[vm_states.ACTIVE])
@ -2913,7 +2851,6 @@ class API(base.Base):
instance=instance,
new_pass=password)
@wrap_check_policy
@check_instance_host
def get_vnc_console(self, context, instance, console_type):
"""Get a url to an instance Console."""
@ -2935,7 +2872,6 @@ class API(base.Base):
instance=instance, console_type=console_type)
return connect_info
@wrap_check_policy
@check_instance_host
def get_spice_console(self, context, instance, console_type):
"""Get a url to an instance Console."""
@ -2956,7 +2892,6 @@ class API(base.Base):
instance=instance, console_type=console_type)
return connect_info
@wrap_check_policy
@check_instance_host
def get_rdp_console(self, context, instance, console_type):
"""Get a url to an instance Console."""
@ -2977,7 +2912,6 @@ class API(base.Base):
instance=instance, console_type=console_type)
return connect_info
@wrap_check_policy
@check_instance_host
def get_serial_console(self, context, instance, console_type):
"""Get a url to a serial console."""
@ -2998,7 +2932,6 @@ class API(base.Base):
instance=instance, console_type=console_type)
return connect_info
@wrap_check_policy
@check_instance_host
def get_mks_console(self, context, instance, console_type):
"""Get a url to a MKS console."""
@ -3011,14 +2944,12 @@ class API(base.Base):
access_url=connect_info['access_url'])
return {'url': connect_info['access_url']}
@wrap_check_policy
@check_instance_host
def get_console_output(self, context, instance, tail_length=None):
"""Get console output for an instance."""
return self.compute_rpcapi.get_console_output(context,
instance=instance, tail_length=tail_length)
@wrap_check_policy
def lock(self, context, instance):
"""Lock the given instance."""
# Only update the lock if we are an admin (non-owner)
@ -3040,29 +2971,20 @@ class API(base.Base):
return False
return True
@wrap_check_policy
def unlock(self, context, instance):
    """Unlock the given instance.

    If the instance was locked by a different user, the caller must pass
    the ``unlock_override`` policy check before the lock is cleared.
    """
    # If the instance was locked by someone else, check
    # that we're allowed to override the lock
    if not self.skip_policy_check and not self.is_expected_locked_by(
            context, instance):
        check_policy(context, 'unlock_override', instance)
    # Elevate so the DB update succeeds regardless of the caller's role.
    context = context.elevated()
    LOG.debug('Unlocking', context=context, instance=instance)
    instance.locked = False
    instance.locked_by = None
    instance.save()
@wrap_check_policy
@check_instance_lock
@check_instance_cell
def reset_network(self, context, instance):
"""Reset networking on the instance."""
self.compute_rpcapi.reset_network(context, instance=instance)
@wrap_check_policy
@check_instance_lock
@check_instance_cell
def inject_network_info(self, context, instance):
@ -3148,7 +3070,6 @@ class API(base.Base):
return volume_bdm.device_name
@wrap_check_policy
@check_instance_lock
@check_instance_state(vm_state=[vm_states.ACTIVE, vm_states.PAUSED,
vm_states.STOPPED, vm_states.RESIZED,
@ -3208,7 +3129,6 @@ class API(base.Base):
context, volume['id'], instance.uuid)]
self._local_cleanup_bdm_volumes(bdms, instance, context)
@wrap_check_policy
@check_instance_lock
@check_instance_state(vm_state=[vm_states.ACTIVE, vm_states.PAUSED,
vm_states.STOPPED, vm_states.RESIZED,
@ -3221,7 +3141,6 @@ class API(base.Base):
else:
self._detach_volume(context, instance, volume)
@wrap_check_policy
@check_instance_lock
@check_instance_state(vm_state=[vm_states.ACTIVE, vm_states.PAUSED,
vm_states.SUSPENDED, vm_states.STOPPED,
@ -3255,7 +3174,6 @@ class API(base.Base):
self.volume_api.roll_detaching(context, old_volume['id'])
self.volume_api.unreserve_volume(context, new_volume['id'])
@wrap_check_policy
@check_instance_lock
@check_instance_state(vm_state=[vm_states.ACTIVE, vm_states.PAUSED,
vm_states.STOPPED],
@ -3267,7 +3185,6 @@ class API(base.Base):
instance=instance, network_id=network_id, port_id=port_id,
requested_ip=requested_ip)
@wrap_check_policy
@check_instance_lock
@check_instance_state(vm_state=[vm_states.ACTIVE, vm_states.PAUSED,
vm_states.STOPPED],
@ -3277,7 +3194,6 @@ class API(base.Base):
self.compute_rpcapi.detach_interface(context, instance=instance,
port_id=port_id)
@wrap_check_policy
def get_instance_metadata(self, context, instance):
"""Get all metadata associated with an instance."""
return self.db.instance_metadata_get(context, instance.uuid)
@ -3295,19 +3211,9 @@ class API(base.Base):
instances = self._get_instances_by_filters(context, filters={},
sort_keys=['created_at'],
sort_dirs=['desc'])
for instance in instances:
try:
check_policy(context, 'get_all_instance_%s' % metadata_type,
instance)
except exception.PolicyNotAuthorized:
# failed policy check - not allowed to
# read this metadata
continue
return utils.filter_and_format_resource_metadata('instance', instances,
search_filts, metadata_type)
@wrap_check_policy
@check_instance_lock
@check_instance_state(vm_state=[vm_states.ACTIVE, vm_states.PAUSED,
vm_states.SUSPENDED, vm_states.STOPPED],
@ -3319,7 +3225,6 @@ class API(base.Base):
instance=instance,
diff={key: ['-']})
@wrap_check_policy
@check_instance_lock
@check_instance_state(vm_state=[vm_states.ACTIVE, vm_states.PAUSED,
vm_states.SUSPENDED, vm_states.STOPPED],
@ -3546,7 +3451,6 @@ class API(base.Base):
return objects.Migration.get_by_id_and_instance(
context, migration_id, instance_uuid)
@wrap_check_policy
def volume_snapshot_create(self, context, volume_id, create_info):
bdm = objects.BlockDeviceMapping.get_by_volume(
context, volume_id, expected_attrs=['instance'])
@ -3560,7 +3464,6 @@ class API(base.Base):
}
return snapshot
@wrap_check_policy
def volume_snapshot_delete(self, context, volume_id, snapshot_id,
delete_info):
bdm = objects.BlockDeviceMapping.get_by_volume(

View File

@ -31,8 +31,6 @@ from nova import rpc
check_instance_state = compute_api.check_instance_state
wrap_check_policy = compute_api.wrap_check_policy
check_policy = compute_api.check_policy
check_instance_lock = compute_api.check_instance_lock
check_instance_cell = compute_api.check_instance_cell
@ -319,7 +317,6 @@ class ComputeCellsAPI(compute_api.API):
super(ComputeCellsAPI, self).unrescue(context, instance)
self._cast_to_cells(context, instance, 'unrescue')
@wrap_check_policy
@check_instance_cell
def shelve(self, context, instance, clean_shutdown=True):
"""Shelve the given instance."""
@ -328,7 +325,6 @@ class ComputeCellsAPI(compute_api.API):
self._cast_to_cells(context, instance, 'shelve',
clean_shutdown=clean_shutdown)
@wrap_check_policy
@check_instance_cell
def shelve_offload(self, context, instance, clean_shutdown=True):
"""Offload the shelved instance."""
@ -337,14 +333,12 @@ class ComputeCellsAPI(compute_api.API):
self._cast_to_cells(context, instance, 'shelve_offload',
clean_shutdown=clean_shutdown)
@wrap_check_policy
@check_instance_cell
def unshelve(self, context, instance):
"""Unshelve the given instance."""
super(ComputeCellsAPI, self).unshelve(context, instance)
self._cast_to_cells(context, instance, 'unshelve')
@wrap_check_policy
@check_instance_cell
def get_vnc_console(self, context, instance, console_type):
"""Get a url to a VNC Console."""
@ -360,7 +354,6 @@ class ComputeCellsAPI(compute_api.API):
instance.uuid, access_url=connect_info['access_url'])
return {'url': connect_info['access_url']}
@wrap_check_policy
@check_instance_cell
def get_spice_console(self, context, instance, console_type):
"""Get a url to a SPICE Console."""
@ -376,7 +369,6 @@ class ComputeCellsAPI(compute_api.API):
instance.uuid, access_url=connect_info['access_url'])
return {'url': connect_info['access_url']}
@wrap_check_policy
@check_instance_cell
def get_rdp_console(self, context, instance, console_type):
"""Get a url to a RDP Console."""
@ -392,7 +384,6 @@ class ComputeCellsAPI(compute_api.API):
instance.uuid, access_url=connect_info['access_url'])
return {'url': connect_info['access_url']}
@wrap_check_policy
@check_instance_cell
def get_serial_console(self, context, instance, console_type):
"""Get a url to a serial console."""
@ -434,7 +425,6 @@ class ComputeCellsAPI(compute_api.API):
self._cast_to_cells(context, instance, 'detach_volume',
volume)
@wrap_check_policy
@check_instance_cell
def associate_floating_ip(self, context, instance, address):
"""Makes calls to network_api to associate_floating_ip.
@ -452,7 +442,6 @@ class ComputeCellsAPI(compute_api.API):
self._cast_to_cells(context, instance, 'delete_instance_metadata',
key)
@wrap_check_policy
@check_instance_cell
def update_instance_metadata(self, context, instance,
metadata, delete=False):

View File

@ -63,6 +63,28 @@ class LockServerTestsV21(admin_only_action_common.CommonTests):
self.controller._unlock,
self.req, instance.uuid, body)
@mock.patch.object(common, 'get_instance')
def test_unlock_override_not_authorized_with_non_admin_user(
        self, mock_get_instance):
    """A non-admin may not unlock an instance locked by its owner."""
    instance = fake_instance.fake_instance_obj(self.context)
    # Locked by "owner" while the request context is a different,
    # non-admin user, so the unlock_override policy must reject it.
    instance.locked_by = "owner"
    mock_get_instance.return_value = instance
    self.assertRaises(self.authorization_error,
                      self.controller._unlock, self.req,
                      instance.uuid,
                      {'unlock': None})
@mock.patch.object(common, 'get_instance')
def test_unlock_override_with_admin_user(self, mock_get_instance):
    """An admin may override an owner's lock and unlock the instance."""
    admin_req = fakes.HTTPRequest.blank('', use_admin_context=True)
    admin_ctxt = admin_req.environ['nova.context']
    instance = fake_instance.fake_instance_obj(admin_ctxt)
    instance.locked_by = "owner"
    mock_get_instance.return_value = instance
    with mock.patch.object(self.compute_api, 'unlock') as mock_unlock:
        self.controller._unlock(admin_req, instance.uuid, {'unlock': None})
        # The controller must delegate to compute_api.unlock with the
        # admin context once the override policy check passes.
        mock_unlock.assert_called_once_with(admin_ctxt, instance)
class LockServerPolicyEnforcementV21(test.NoDBTestCase):

View File

@ -70,7 +70,6 @@ from nova.objects import block_device as block_device_obj
from nova.objects import fields as obj_fields
from nova.objects import instance as instance_obj
from nova.objects import migrate_data as migrate_data_obj
from nova import policy
from nova import quota
from nova.scheduler import client as scheduler_client
from nova import test
@ -4173,108 +4172,6 @@ class ComputeTestCase(BaseTestCase):
self.compute.terminate_instance(self.context, instance, [], [])
def test_lock(self):
    # FIXME(comstud): This test is such crap. This is testing
    # compute API lock functionality in a test class for the compute
    # manager by running an instance. Hello? We should just have
    # unit tests in test_compute_api that test the check_instance_lock
    # decorator and make sure that appropriate compute_api methods
    # have the decorator.
    instance = self._create_fake_instance_obj()
    instance_uuid = instance['uuid']
    self.compute.build_and_run_instance(self.context, instance, {}, {}, {},
                                        block_device_mapping=[])

    non_admin_context = context.RequestContext(None,
                                               None,
                                               is_admin=False)

    # Helper: re-reads the instance from the DB and asserts task_state.
    def check_task_state(task_state):
        instance = db.instance_get_by_uuid(self.context, instance_uuid)
        self.assertEqual(instance['task_state'], task_state)

    instance.refresh()

    # should fail with locked nonadmin context
    self.compute_api.lock(self.context, instance)
    self.assertRaises(exception.InstanceIsLocked,
                      self.compute_api.reboot,
                      non_admin_context, instance, 'SOFT')
    check_task_state(None)

    # should fail with invalid task state
    self.compute_api.unlock(self.context, instance)
    instance.task_state = task_states.REBOOTING
    instance.save()
    self.assertRaises(exception.InstanceInvalidState,
                      self.compute_api.reboot,
                      non_admin_context, instance, 'SOFT')
    check_task_state(task_states.REBOOTING)

    # should succeed with admin context
    instance.task_state = None
    instance.save()
    self.compute_api.reboot(self.context, instance, 'SOFT')
    check_task_state(task_states.REBOOTING)

    self.compute.terminate_instance(self.context, instance, [], [])
def _check_locked_by(self, instance_uuid, locked_by):
    """Assert the DB row's locked/locked_by state; return the row.

    ``locked_by`` of None means the instance must be unlocked.
    """
    instance = db.instance_get_by_uuid(self.context, instance_uuid)
    self.assertEqual(instance['locked'], locked_by is not None)
    self.assertEqual(instance['locked_by'], locked_by)
    return instance
def test_override_owner_lock(self):
    # FIXME(comstud): This test is such crap. This is testing
    # compute API lock functionality in a test class for the compute
    # manager by running an instance. Hello? We should just have
    # unit tests in test_compute_api that test the check_instance_lock
    # decorator and make sure that appropriate compute_api methods
    # have the decorator.
    admin_context = context.RequestContext('admin-user',
                                           'admin-project',
                                           is_admin=True)

    instance = self._create_fake_instance_obj()
    instance_uuid = instance['uuid']
    self.compute.build_and_run_instance(self.context, instance, {}, {}, {},
                                        block_device_mapping=[])

    # Ensure that an admin can override the owner lock
    self.compute_api.lock(self.context, instance)
    self._check_locked_by(instance_uuid, 'owner')
    self.compute_api.unlock(admin_context, instance)
    self._check_locked_by(instance_uuid, None)
def test_upgrade_owner_lock(self):
    # FIXME(comstud): This test is such crap. This is testing
    # compute API lock functionality in a test class for the compute
    # manager by running an instance. Hello? We should just have
    # unit tests in test_compute_api that test the check_instance_lock
    # decorator and make sure that appropriate compute_api methods
    # have the decorator.
    admin_context = context.RequestContext('admin-user',
                                           'admin-project',
                                           is_admin=True)

    instance = self._create_fake_instance_obj()
    instance_uuid = instance['uuid']
    self.compute.build_and_run_instance(self.context, instance, {}, {}, {},
                                        block_device_mapping=[])

    # Ensure that an admin can upgrade the lock and that
    # the owner can no longer unlock
    self.compute_api.lock(self.context, instance)
    self.compute_api.lock(admin_context, instance)
    self._check_locked_by(instance_uuid, 'admin')
    instance.refresh()
    # Owner's unlock attempt must fail the unlock_override policy.
    self.assertRaises(exception.PolicyNotAuthorized,
                      self.compute_api.unlock,
                      self.context, instance)
    self._check_locked_by(instance_uuid, 'admin')
    self.compute_api.unlock(admin_context, instance)
    self._check_locked_by(instance_uuid, None)
def _test_state_revert(self, instance, operation, pre_task_state,
kwargs=None, vm_state=None):
if kwargs is None:
@ -11079,112 +10976,6 @@ class ComputeAggrTestCase(BaseTestCase):
slave_info="SLAVE_INFO")
class ComputePolicyTestCase(BaseTestCase):
    """Tests that compute.API methods enforce the 'compute'-scoped policy
    rules (create, delete, get, get_all and the create sub-policies).
    """

    def setUp(self):
        super(ComputePolicyTestCase, self).setUp()
        self.compute_api = compute.API()

    def test_actions_are_prefixed(self):
        # check_policy() must prepend the 'compute:' scope to the action
        # name before handing it to the policy engine.
        self.mox.StubOutWithMock(policy, 'enforce')
        nova.policy.enforce(self.context, 'compute:reboot', {})
        self.mox.ReplayAll()
        compute_api.check_policy(self.context, 'reboot', {})

    def test_wrapped_method(self):
        instance = self._create_fake_instance_obj(params={'host': None,
                                                          'cell_name': 'foo'})

        # force delete to fail
        rules = {"compute:delete": [["false:false"]]}
        self.policy.set_rules(rules)

        self.assertRaises(exception.PolicyNotAuthorized,
                          self.compute_api.delete, self.context, instance)

        # reset rules to allow deletion
        rules = {"compute:delete": []}
        self.policy.set_rules(rules)

        self.compute_api.delete(self.context, instance)

    def test_create_fail(self):
        rules = {"compute:create": [["false:false"]]}
        self.policy.set_rules(rules)

        self.assertRaises(exception.PolicyNotAuthorized,
                          self.compute_api.create, self.context, '1', '1')

    def test_create_attach_volume_fail(self):
        # NOTE(review): the denied rule here is create:attach_network,
        # which looks swapped relative to the test name — confirm against
        # the original file.
        rules = {
            "compute:create": [],
            "compute:create:attach_network": [["false:false"]],
            "compute:create:attach_volume": [],
        }
        self.policy.set_rules(rules)

        self.assertRaises(exception.PolicyNotAuthorized,
                          self.compute_api.create, self.context, '1', '1',
                          requested_networks='blah',
                          block_device_mapping='blah')

    def test_create_attach_network_fail(self):
        # NOTE(review): the denied rule here is create:attach_volume,
        # which looks swapped relative to the test name — confirm against
        # the original file.
        rules = {
            "compute:create": [],
            "compute:create:attach_network": [],
            "compute:create:attach_volume": [["false:false"]],
        }
        self.policy.set_rules(rules)

        self.assertRaises(exception.PolicyNotAuthorized,
                          self.compute_api.create, self.context, '1', '1',
                          requested_networks='blah',
                          block_device_mapping='blah')

    def test_get_fail(self):
        instance = self._create_fake_instance_obj()

        rules = {
            "compute:get": [["false:false"]],
        }
        self.policy.set_rules(rules)

        self.assertRaises(exception.PolicyNotAuthorized,
                          self.compute_api.get, self.context, instance['uuid'])

    def test_get_all_fail(self):
        rules = {
            "compute:get_all": [["false:false"]],
        }
        self.policy.set_rules(rules)

        self.assertRaises(exception.PolicyNotAuthorized,
                          self.compute_api.get_all, self.context)

    def test_force_host_fail(self):
        rules = {"compute:create": [],
                 "compute:create:forced_host": [["role:fake"]],
                 "network:validate_networks": []}
        self.policy.set_rules(rules)

        self.assertRaises(exception.PolicyNotAuthorized,
                          self.compute_api.create, self.context, None, '1',
                          availability_zone='1', forced_host='1')

    def test_force_host_pass(self):
        rules = {"compute:create": [],
                 "compute:create:forced_host": [],
                 "network:validate_networks": []}
        self.policy.set_rules(rules)

        self.compute_api.create(self.context,
            objects.Flavor(id=1, disabled=False, memory_mb=256, vcpus=1,
                           root_gb=1, ephemeral_gb=1, swap=0),
            image_href=uuids.host_instance, availability_zone='1',
            forced_host='1')
class DisabledInstanceTypesTestCase(BaseTestCase):
"""Some instance-types are marked 'disabled' which means that they will not
show up in customer-facing listings. We do, however, want those

View File

@ -19,7 +19,6 @@ import iso8601
import mock
from mox3 import mox
from oslo_messaging import exceptions as oslo_exceptions
from oslo_policy import policy as oslo_policy
from oslo_serialization import jsonutils
from oslo_utils import fixture as utils_fixture
from oslo_utils import timeutils
@ -43,7 +42,6 @@ from nova import objects
from nova.objects import base as obj_base
from nova.objects import fields as fields_obj
from nova.objects import quotas as quotas_obj
from nova import policy
from nova import quota
from nova import test
from nova.tests.unit import fake_block_device
@ -3508,44 +3506,6 @@ class _ComputeAPIUnitTestMixIn(object):
'volume_id': 'volume_id'}]
self._test_check_and_transform_bdm(block_device_mapping)
@mock.patch.object(objects.Instance, 'save')
@mock.patch.object(objects.InstanceAction, 'action_start')
@mock.patch.object(compute_rpcapi.ComputeAPI, 'pause_instance')
@mock.patch.object(objects.Instance, 'get_by_uuid')
@mock.patch.object(compute_api.API, '_get_instances_by_filters',
                   return_value=[])
@mock.patch.object(compute_api.API, '_create_instance')
def test_skip_policy_check(self, mock_create, mock_get_ins_by_filters,
                           mock_get, mock_pause, mock_action, mock_save):
    """API(skip_policy_check=True) bypasses the compute-layer policy
    checks that would otherwise reject pause/get/get_all/create.
    """
    policy.reset()
    # '!' denies each action unconditionally for the normal API.
    rules = {'compute:pause': '!',
             'compute:get': '!',
             'compute:get_all': '!',
             'compute:create': '!'}
    policy.set_rules(oslo_policy.Rules.from_dict(rules))
    instance = self._create_instance_obj()
    mock_get.return_value = instance

    self.assertRaises(exception.PolicyNotAuthorized,
                      self.compute_api.pause, self.context, instance)
    api = compute_api.API(skip_policy_check=True)
    api.pause(self.context, instance)

    self.assertRaises(exception.PolicyNotAuthorized,
                      self.compute_api.get, self.context, instance.uuid)
    api = compute_api.API(skip_policy_check=True)
    api.get(self.context, instance.uuid)

    self.assertRaises(exception.PolicyNotAuthorized,
                      self.compute_api.get_all, self.context)
    api = compute_api.API(skip_policy_check=True)
    api.get_all(self.context)

    self.assertRaises(exception.PolicyNotAuthorized,
                      self.compute_api.create, self.context, None, None)
    api = compute_api.API(skip_policy_check=True)
    api.create(self.context, None, None)
@mock.patch.object(compute_api.API, '_get_instances_by_filters')
def test_tenant_to_project_conversion(self, mock_get):
mock_get.return_value = []

View File

@ -436,17 +436,3 @@ class CellsConductorAPIRPCRedirect(test.NoDBTestCase):
tests.add(name[5:])
if tests != set(task_api.cells_compatible):
self.fail("Testcases not equivalent to cells_compatible list")
class CellsComputePolicyTestCase(test_compute.ComputePolicyTestCase):
    """Re-runs the compute policy tests against ComputeCellsAPI.

    Swaps the compute API for the cells variant for the duration of each
    test, stashing the original in a module-level global.
    """

    def setUp(self):
        super(CellsComputePolicyTestCase, self).setUp()
        global ORIG_COMPUTE_API
        ORIG_COMPUTE_API = self.compute_api
        self.compute_api = compute_cells_api.ComputeCellsAPI()
        deploy_stubs(self.stubs, self.compute_api)

    def tearDown(self):
        # Restore the plain compute API so later tests are unaffected.
        global ORIG_COMPUTE_API
        self.compute_api = ORIG_COMPUTE_API
        super(CellsComputePolicyTestCase, self).tearDown()

View File

@ -280,7 +280,7 @@ policy_data = """
"rule:admin_api or user_id:%(user_id)s",
"os_compute_api:os-lock-server:lock": "",
"os_compute_api:os-lock-server:unlock": "",
"os_compute_api:os-lock-server:unlock:unlock_override": "",
"os_compute_api:os-lock-server:unlock:unlock_override": "rule:admin_api",
"os_compute_api:os-migrate-server:migrate": "",
"os_compute_api:os-migrate-server:migrate_live": "",
"compute_extension:multinic": "",