Add missing 'target_obj' when perform policy check

Generally, we have to pass target object to ``authorize``
when enforce policy check,  but this is ignored during
our develop and review process for a long time, and the
potential issue is anyone can handle the target resource
as ``authorize`` will always succeed if rule is defined
``admin_or_owner`` [1]. Luckily, for most of those APIs
this security concern is protected by our database access
code [2] that only project scope resource is allowed.

However, there is one API that do have security issue when
administrator change the rule into "admin_or_owner".

1. "volume reset_status", which cinder will update the
resource directly in the database, procedure to reproduce
bug is described on the launchpad.

This patch intends to correct most of cases which can be
easily figured out in case of future code changes.

[1]:
73e6e3c147/cinder/context.py (L206)
[2]:
73e6e3c147/cinder/db/sqlalchemy/api.py (L3058)
[3]:
73e6e3c147/cinder/api/contrib/admin_actions.py (L161)

Partial-Bug: #1714858
Change-Id: I351b3ddf8dfe29da8d854d4038d64ca7be17390f
This commit is contained in:
TommyLike 2018-02-28 16:14:17 +00:00
parent 4fb3b64549
commit 7391070474
19 changed files with 127 additions and 60 deletions

View File

@ -78,11 +78,11 @@ class AdminController(wsgi.Controller):
explanation=_("Must specify a valid status"))
return update
def authorize(self, context, action_name):
def authorize(self, context, action_name, target_obj=None):
context.authorize(
'volume_extension:%(resource)s_admin_actions:%(action)s' %
{'resource': self.resource_name,
'action': action_name})
'action': action_name}, target_obj=target_obj)
def _remove_worker(self, context, id):
# Remove the cleanup worker from the DB when we change a resource
@ -107,7 +107,6 @@ class AdminController(wsgi.Controller):
'attached_mode')
context = req.environ['cinder.context']
self.authorize(context, 'reset_status')
update = self.validate_update(body['os-reset_status'])
msg = "Updating %(resource)s '%(id)s' with '%(update)r'"
LOG.debug(msg, {'resource': self.resource_name, 'id': id,
@ -132,9 +131,9 @@ class AdminController(wsgi.Controller):
def _force_delete(self, req, id, body):
"""Delete a resource, bypassing the check that it must be available."""
context = req.environ['cinder.context']
self.authorize(context, 'force_delete')
# Not found exception will be handled at the wsgi level
resource = self._get(context, id)
self.authorize(context, 'force_delete', target_obj=resource)
self._delete(context, resource, force=True)
@ -158,6 +157,10 @@ class VolumeAdminController(AdminController):
'none', 'starting',)
def _update(self, *args, **kwargs):
context = args[0]
volume_id = args[1]
volume = objects.Volume.get_by_id(context, volume_id)
self.authorize(context, 'reset_status', target_obj=volume)
db.volume_update(*args, **kwargs)
def _get(self, *args, **kwargs):
@ -204,9 +207,9 @@ class VolumeAdminController(AdminController):
def _force_detach(self, req, id, body):
"""Roll back a bad detach after the volume been disconnected."""
context = req.environ['cinder.context']
self.authorize(context, 'force_detach')
# Not found exception will be handled at the wsgi level
volume = self._get(context, id)
self.authorize(context, 'force_detach', target_obj=volume)
try:
connector = body['os-force_detach'].get('connector', None)
except AttributeError:
@ -242,9 +245,9 @@ class VolumeAdminController(AdminController):
def _migrate_volume(self, req, id, body):
"""Migrate a volume to the specified host."""
context = req.environ['cinder.context']
self.authorize(context, 'migrate_volume')
# Not found exception will be handled at the wsgi level
volume = self._get(context, id)
self.authorize(context, 'migrate_volume', target_obj=volume)
params = body['os-migrate_volume']
cluster_name, host = common.get_cluster_host(req, params,
@ -258,9 +261,9 @@ class VolumeAdminController(AdminController):
def _migrate_volume_completion(self, req, id, body):
"""Complete an in-progress migration."""
context = req.environ['cinder.context']
self.authorize(context, 'migrate_volume_completion')
# Not found exception will be handled at the wsgi level
volume = self._get(context, id)
self.authorize(context, 'migrate_volume_completion', target_obj=volume)
params = body['os-migrate_volume_completion']
try:
new_volume_id = params['new_volume']
@ -286,6 +289,7 @@ class SnapshotAdminController(AdminController):
snapshot_id = args[1]
fields = args[2]
snapshot = objects.Snapshot.get_by_id(context, snapshot_id)
self.authorize(context, 'reset_status', target_obj=snapshot)
snapshot.update(fields)
snapshot.save()
@ -316,7 +320,6 @@ class BackupAdminController(AdminController):
def _reset_status(self, req, id, body):
"""Reset status on the resource."""
context = req.environ['cinder.context']
self.authorize(context, 'reset_status')
update = self.validate_update(body['os-reset_status'])
msg = "Updating %(resource)s '%(id)s' with '%(update)r'"
LOG.debug(msg, {'resource': self.resource_name, 'id': id,

View File

@ -40,8 +40,6 @@ class SnapshotActionsController(wsgi.Controller):
"""
context = req.environ['cinder.context']
context.authorize(policy.UPDATE_STATUS_POLICY)
LOG.debug("body: %s", body)
try:
status = body['os-update_snapshot_status']['status']
@ -59,6 +57,8 @@ class SnapshotActionsController(wsgi.Controller):
fields.SnapshotStatus.ERROR_DELETING]}
current_snapshot = objects.Snapshot.get_by_id(context, id)
context.authorize(policy.UPDATE_STATUS_POLICY,
target_obj=current_snapshot)
if current_snapshot.status not in status_map:
msg = _("Snapshot status %(cur)s not allowed for "

View File

@ -84,13 +84,13 @@ class SnapshotManageController(wsgi.Controller):
"""
context = req.environ['cinder.context']
context.authorize(policy.MANAGE_POLICY)
snapshot = body['snapshot']
# Check whether volume exists
volume_id = snapshot['volume_id']
# Not found exception will be handled at the wsgi level
volume = self.volume_api.get(context, volume_id)
context.authorize(policy.MANAGE_POLICY, target_obj=volume)
LOG.debug('Manage snapshot request body: %s', body)

View File

@ -46,12 +46,12 @@ class SnapshotUnmanageController(wsgi.Controller):
A Not Found error is returned if the specified snapshot does not exist.
"""
context = req.environ['cinder.context']
context.authorize(policy.UNMANAGE_POLICY)
LOG.info("Unmanage snapshot with id: %s", id)
try:
snapshot = self.volume_api.get_snapshot(context, id)
context.authorize(policy.UNMANAGE_POLICY, target_obj=snapshot)
self.volume_api.delete_snapshot(context, snapshot,
unmanage_only=True)
# Not found exception will be handled at the wsgi level

View File

@ -233,7 +233,7 @@ class VolumeActionsController(wsgi.Controller):
# Not found exception will be handled at the wsgi level
volume = self.volume_api.get(context, id)
context.authorize(policy.UPLOAD_IMAGE_POLICY)
context.authorize(policy.UPLOAD_IMAGE_POLICY, target_obj=volume)
# check for valid disk-format
disk_format = params.get("disk_format", "raw")
if not image_utils.validate_disk_format(disk_format):
@ -270,7 +270,8 @@ class VolumeActionsController(wsgi.Controller):
image_metadata['protected'] = params.get('protected', 'False')
if image_metadata['visibility'] == 'public':
context.authorize(policy.UPLOAD_PUBLIC_POLICY)
context.authorize(policy.UPLOAD_PUBLIC_POLICY,
target_obj=volume)
image_metadata['protected'] = (
utils.get_bool_param('protected', image_metadata))

View File

@ -18,6 +18,7 @@
from cinder.api import extensions
from cinder.api.openstack import wsgi
from cinder import db
from cinder import objects
from cinder.policies import volumes as policy
@ -27,7 +28,9 @@ class VolumeEncryptionMetadataController(wsgi.Controller):
def index(self, req, volume_id):
"""Returns the encryption metadata for a given volume."""
context = req.environ['cinder.context']
context.authorize(policy.ENCRYPTION_METADATA_POLICY)
volume = objects.Volume.get_by_id(context, volume_id)
context.authorize(policy.ENCRYPTION_METADATA_POLICY,
target_obj=volume)
return db.volume_encryption_metadata_get(context, volume_id)
def show(self, req, volume_id, id):

View File

@ -25,6 +25,7 @@ from cinder.api.schemas import volume_image_metadata
from cinder.api import validation
from cinder import exception
from cinder.i18n import _
from cinder import objects
from cinder.policies import volume_metadata as policy
from cinder import volume
@ -88,7 +89,9 @@ class VolumeImageMetadataController(wsgi.Controller):
@validation.schema(volume_image_metadata.set_image_metadata)
def create(self, req, id, body):
context = req.environ['cinder.context']
if context.authorize(policy.IMAGE_METADATA_POLICY):
volume = objects.Volume.get_by_id(context, id)
if context.authorize(policy.IMAGE_METADATA_POLICY,
target_obj=volume):
metadata = body['os-set_image_metadata']['metadata']
new_metadata = self._update_volume_image_metadata(context,
id,
@ -128,7 +131,8 @@ class VolumeImageMetadataController(wsgi.Controller):
def delete(self, req, id, body):
"""Deletes an existing image metadata."""
context = req.environ['cinder.context']
if context.authorize(policy.IMAGE_METADATA_POLICY):
volume = objects.Volume.get_by_id(context, id)
if context.authorize(policy.IMAGE_METADATA_POLICY, target_obj=volume):
key = body['os-unset_image_metadata']['key']
vol, metadata = self._get_image_metadata(context, id)

View File

@ -47,12 +47,12 @@ class VolumeUnmanageController(wsgi.Controller):
attached to an instance.
"""
context = req.environ['cinder.context']
context.authorize(policy.UNMANAGE_POLICY)
LOG.info("Unmanage volume with id: %s", id)
# Not found exception will be handled at the wsgi level
vol = self.volume_api.get(context, id)
context.authorize(policy.UNMANAGE_POLICY, target_obj=vol)
self.volume_api.delete(context, vol, unmanage_only=True)
return webob.Response(status_int=http_client.ACCEPTED)

View File

@ -57,7 +57,7 @@ class MessagesController(wsgi.Controller):
# Not found exception will be handled at the wsgi level
message = self.message_api.get(context, id)
context.authorize(policy.GET_POLICY)
context.authorize(policy.GET_POLICY, target_obj=message)
self._build_user_message(message)
return self._view_builder.detail(req, message)

View File

@ -66,11 +66,11 @@ class VolumeController(volumes_v2.VolumeController):
LOG.info("Delete volume with id: %(id)s %(params)s",
{'id': id, 'params': params}, context=context)
if force:
context.authorize(policy.FORCE_DELETE_POLICY)
volume = self.volume_api.get(context, id)
if force:
context.authorize(policy.FORCE_DELETE_POLICY, target_obj=volume)
self.volume_api.delete(context, volume,
cascade=cascade,
force=force)

View File

@ -36,6 +36,7 @@ from cinder import exception
from cinder.i18n import _
from cinder import objects
from cinder.objects import fields
from cinder.policies import backup_actions as backup_action_policy
from cinder.policies import backups as policy
import cinder.policy
from cinder import quota
@ -65,8 +66,9 @@ class API(base.Base):
super(API, self).__init__(db)
def get(self, context, backup_id):
context.authorize(policy.GET_POLICY)
return objects.Backup.get_by_id(context, backup_id)
backup = objects.Backup.get_by_id(context, backup_id)
context.authorize(policy.GET_POLICY, target_obj=backup)
return backup
def _check_support_to_force_delete(self, context, backup_host):
result = self.backup_rpcapi.check_support_to_force_delete(context,
@ -84,7 +86,7 @@ class API(base.Base):
:raises BackupDriverException:
:raises ServiceNotFound:
"""
context.authorize(policy.DELETE_POLICY)
context.authorize(policy.DELETE_POLICY, target_obj=backup)
if not force and backup.status not in [fields.BackupStatus.AVAILABLE,
fields.BackupStatus.ERROR]:
msg = _('Backup status must be available or error')
@ -198,8 +200,8 @@ class API(base.Base):
container, incremental=False, availability_zone=None,
force=False, snapshot_id=None, metadata=None):
"""Make the RPC call to create a volume backup."""
context.authorize(policy.CREATE_POLICY)
volume = self.volume_api.get(context, volume_id)
context.authorize(policy.CREATE_POLICY, target_obj=volume)
snapshot = None
if snapshot_id:
snapshot = self.volume_api.get_snapshot(context, snapshot_id)
@ -336,8 +338,8 @@ class API(base.Base):
def restore(self, context, backup_id, volume_id=None, name=None):
"""Make the RPC call to restore a volume backup."""
context.authorize(policy.RESTORE_POLICY)
backup = self.get(context, backup_id)
context.authorize(policy.RESTORE_POLICY, target_obj=backup)
if backup['status'] != fields.BackupStatus.AVAILABLE:
msg = _('Backup status must be available')
raise exception.InvalidBackup(reason=msg)
@ -421,6 +423,9 @@ class API(base.Base):
"""
# get backup info
backup = self.get(context, backup_id)
context.authorize(
backup_action_policy.BASE_POLICY_NAME % "reset_status",
target_obj=backup)
backup.host = self._get_available_backup_service_host(
backup.host, backup.availability_zone)
backup.save()
@ -439,8 +444,8 @@ class API(base.Base):
:returns: contains 'backup_url' and 'backup_service'
:raises InvalidBackup:
"""
context.authorize(policy.EXPORT_POLICY)
backup = self.get(context, backup_id)
context.authorize(policy.EXPORT_POLICY, target_obj=backup)
if backup['status'] != fields.BackupStatus.AVAILABLE:
msg = (_('Backup status must be available and not %s.') %
backup['status'])
@ -560,8 +565,8 @@ class API(base.Base):
return backup
def update(self, context, backup_id, fields):
context.authorize(policy.UPDATE_POLICY)
backup = self.get(context, backup_id)
context.authorize(policy.UPDATE_POLICY, target_obj=backup)
backup.update(fields)
backup.save()
return backup

View File

@ -877,7 +877,8 @@ class API(base.Base):
return group_snapshot
def delete_group_snapshot(self, context, group_snapshot, force=False):
context.authorize(gsnap_policy.DELETE_POLICY)
context.authorize(gsnap_policy.DELETE_POLICY,
target_obj=group_snapshot)
group_snapshot.assert_not_frozen()
values = {'status': 'deleting'}
expected = {'status': ('available', 'error')}
@ -904,15 +905,18 @@ class API(base.Base):
group_snapshot)
def update_group_snapshot(self, context, group_snapshot, fields):
context.authorize(gsnap_policy.UPDATE_POLICY)
context.authorize(gsnap_policy.UPDATE_POLICY,
target_obj=group_snapshot)
group_snapshot.update(fields)
group_snapshot.save()
def get_group_snapshot(self, context, group_snapshot_id):
context.authorize(gsnap_policy.GET_POLICY)
group_snapshots = objects.GroupSnapshot.get_by_id(context,
group_snapshot_id)
return group_snapshots
group_snapshot = objects.GroupSnapshot.get_by_id(context,
group_snapshot_id)
context.authorize(gsnap_policy.GET_POLICY,
target_obj=group_snapshot)
return group_snapshot
def get_all_group_snapshots(self, context, filters=None, marker=None,
limit=None, offset=None, sort_keys=None,
@ -936,7 +940,8 @@ class API(base.Base):
def reset_group_snapshot_status(self, context, gsnapshot, status):
"""Reset status of group snapshot"""
context.authorize(gsnap_action_policy.RESET_STATUS)
context.authorize(gsnap_action_policy.RESET_STATUS,
target_obj=gsnapshot)
field = {'updated_at': timeutils.utcnow(),
'status': status}
gsnapshot.update(field)

View File

@ -232,11 +232,12 @@ class AdminActionsTest(BaseAdminTest):
volume = db.volume_get(self.ctx, volume['id'])
self.assertEqual('error', volume['status'])
def test_reset_status_as_non_admin(self):
@mock.patch('cinder.objects.volume.Volume.get_by_id')
def test_reset_status_as_non_admin(self, fake_get):
ctx = context.RequestContext(fake.USER_ID, fake.PROJECT_ID)
volume = db.volume_create(self.ctx,
{'status': 'error', 'size': 1})
fake_get.return_value = volume
resp = self._issue_volume_reset(ctx,
volume,
{'status': 'error'})
@ -634,11 +635,13 @@ class AdminActionsTest(BaseAdminTest):
volume = self._migrate_volume_exec(self.ctx, volume, host,
expected_status)
def test_migrate_volume_as_non_admin(self):
@mock.patch("cinder.volume.api.API.get")
def test_migrate_volume_as_non_admin(self, fake_get):
expected_status = http_client.FORBIDDEN
host = 'test2'
ctx = context.RequestContext(fake.USER_ID, fake.PROJECT_ID)
volume = self._migrate_volume_prep()
fake_get.return_value = volume
self._migrate_volume_exec(ctx, volume, host, expected_status)
def test_migrate_volume_without_host_parameter(self):
@ -716,11 +719,13 @@ class AdminActionsTest(BaseAdminTest):
else:
self.assertNotIn('save_volume_id', resp_dict)
def test_migrate_volume_comp_as_non_admin(self):
@mock.patch("cinder.volume.api.API.get")
def test_migrate_volume_comp_as_non_admin(self, fake_get):
volume = db.volume_create(self.ctx, {'id': fake.VOLUME_ID})
new_volume = db.volume_create(self.ctx, {'id': fake.VOLUME2_ID})
expected_status = http_client.FORBIDDEN
expected_id = None
fake_get.return_value = volume
ctx = context.RequestContext(fake.USER_ID, fake.PROJECT_ID)
self._migrate_volume_comp_exec(ctx, volume, new_volume, False,
expected_status, expected_id)

View File

@ -105,8 +105,8 @@ class ExtendedSnapshotAttributesTest(test.TestCase):
self.assertSnapshotAttributes(self._get_snapshot(res.body),
project_id=fake.PROJECT_ID,
progress='0%')
calls = [mock.call(snap_policy.GET_POLICY), mock.call(
snap_policy.EXTEND_ATTRIBUTE, fatal=False)]
calls = [mock.call(snap_policy.GET_POLICY, target_obj=snapshot_obj),
mock.call(snap_policy.EXTEND_ATTRIBUTE, fatal=False)]
mock_authorize.assert_has_calls(calls)
@mock.patch('cinder.context.RequestContext.authorize')

View File

@ -12,6 +12,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import mock
import uuid
from oslo_policy import policy as oslo_policy
@ -201,7 +202,8 @@ class VolumeImageMetadataTest(test.TestCase):
self.assertEqual({'key1': 'value1', 'key2': 'value2'},
self._get_image_metadata_list(res.body)[0])
def test_create_image_metadata(self):
@mock.patch('cinder.objects.Volume.get_by_id')
def test_create_image_metadata(self, fake_get):
self.mock_object(volume.api.API, 'get_volume_image_metadata',
return_empty_image_metadata)
self.mock_object(db, 'volume_metadata_update',
@ -213,6 +215,7 @@ class VolumeImageMetadataTest(test.TestCase):
req.method = "POST"
req.body = jsonutils.dump_as_bytes(body)
req.headers["content-type"] = "application/json"
fake_get.return_value = {}
res = req.get_response(fakes.wsgi_app(
fake_auth_context=self.user_ctxt))
@ -220,12 +223,14 @@ class VolumeImageMetadataTest(test.TestCase):
self.assertEqual(fake_image_metadata,
jsonutils.loads(res.body)["metadata"])
def test_create_image_metadata_policy_not_authorized(self):
@mock.patch('cinder.objects.Volume.get_by_id')
def test_create_image_metadata_policy_not_authorized(self, fake_get):
rules = {
metadata_policy.IMAGE_METADATA_POLICY: base_policy.RULE_ADMIN_API
}
policy.set_rules(oslo_policy.Rules.from_dict(rules))
self.addCleanup(policy.reset)
fake_get.return_value = {}
req = fakes.HTTPRequest.blank('/v2/%s/volumes/%s/action' % (
fake.PROJECT_ID, fake.VOLUME_ID), use_admin_context=False)
@ -241,13 +246,15 @@ class VolumeImageMetadataTest(test.TestCase):
self.controller.create, req, fake.VOLUME_ID,
body=body)
def test_create_with_keys_case_insensitive(self):
@mock.patch('cinder.objects.Volume.get_by_id')
def test_create_with_keys_case_insensitive(self, fake_get):
# If the keys in uppercase_and_lowercase, should return the one
# which server added
self.mock_object(volume.api.API, 'get_volume_image_metadata',
return_empty_image_metadata)
self.mock_object(db, 'volume_metadata_update',
fake_create_volume_metadata)
fake_get.return_value = {}
body = {
"os-set_image_metadata": {
@ -272,11 +279,13 @@ class VolumeImageMetadataTest(test.TestCase):
self.assertEqual(fake_image_metadata,
jsonutils.loads(res.body)["metadata"])
def test_create_empty_body(self):
@mock.patch('cinder.objects.Volume.get_by_id')
def test_create_empty_body(self, fake_get):
req = fakes.HTTPRequest.blank('/v2/%s/volumes/%s/action' % (
fake.PROJECT_ID, fake.VOLUME_ID))
req.method = 'POST'
req.headers["content-type"] = "application/json"
fake_get.return_value = {}
self.assertRaises(exception.ValidationError,
self.controller.create, req, fake.VOLUME_ID,
@ -297,7 +306,8 @@ class VolumeImageMetadataTest(test.TestCase):
self.controller.create, req, fake.VOLUME_ID,
body=body)
def test_invalid_metadata_items_on_create(self):
@mock.patch('cinder.objects.Volume.get_by_id')
def test_invalid_metadata_items_on_create(self, fake_get):
self.mock_object(db, 'volume_metadata_update',
fake_create_volume_metadata)
req = fakes.HTTPRequest.blank('/v2/%s/volumes/%s/action' % (
@ -308,6 +318,7 @@ class VolumeImageMetadataTest(test.TestCase):
data = {"os-set_image_metadata": {
"metadata": {"a" * 260: "value1"}}
}
fake_get.return_value = {}
# Test for long key
req.body = jsonutils.dump_as_bytes(data)
@ -333,7 +344,8 @@ class VolumeImageMetadataTest(test.TestCase):
self.controller.create, req, fake.VOLUME_ID,
body=data)
def test_delete(self):
@mock.patch('cinder.objects.Volume.get_by_id')
def test_delete(self, fake_get):
self.mock_object(db, 'volume_metadata_delete',
volume_metadata_delete)
@ -345,17 +357,20 @@ class VolumeImageMetadataTest(test.TestCase):
req.method = 'POST'
req.body = jsonutils.dump_as_bytes(body)
req.headers["content-type"] = "application/json"
fake_get.return_value = {}
res = req.get_response(fakes.wsgi_app(
fake_auth_context=self.user_ctxt))
self.assertEqual(http_client.OK, res.status_int)
def test_delete_image_metadata_policy_not_authorized(self):
@mock.patch('cinder.objects.Volume.get_by_id')
def test_delete_image_metadata_policy_not_authorized(self, fake_get):
rules = {
metadata_policy.IMAGE_METADATA_POLICY: base_policy.RULE_ADMIN_API
}
policy.set_rules(oslo_policy.Rules.from_dict(rules))
self.addCleanup(policy.reset)
fake_get.return_value = {}
req = fakes.HTTPRequest.blank('/v2/%s/volumes/%s/action' % (
fake.PROJECT_ID, fake.VOLUME_ID), use_admin_context=False)
@ -371,7 +386,8 @@ class VolumeImageMetadataTest(test.TestCase):
self.controller.delete, req, fake.VOLUME_ID,
body=None)
def test_delete_meta_not_found(self):
@mock.patch('cinder.objects.Volume.get_by_id')
def test_delete_meta_not_found(self, fake_get):
data = {"os-unset_image_metadata": {
"key": "invalid_id"}
}
@ -380,12 +396,14 @@ class VolumeImageMetadataTest(test.TestCase):
req.method = 'POST'
req.body = jsonutils.dump_as_bytes(data)
req.headers["content-type"] = "application/json"
fake_get.return_value = {}
self.assertRaises(exception.GlanceMetadataNotFound,
self.controller.delete, req, fake.VOLUME_ID,
body=data)
def test_delete_nonexistent_volume(self):
@mock.patch('cinder.objects.Volume.get_by_id')
def test_delete_nonexistent_volume(self, fake_get):
self.mock_object(db, 'volume_metadata_delete',
return_volume_nonexistent)
@ -397,6 +415,7 @@ class VolumeImageMetadataTest(test.TestCase):
req.method = 'POST'
req.body = jsonutils.dump_as_bytes(body)
req.headers["content-type"] = "application/json"
fake_get.return_value = {}
self.assertRaises(exception.GlanceMetadataNotFound,
self.controller.delete, req, fake.VOLUME_ID,

View File

@ -25,6 +25,7 @@ from cinder import exception
import cinder.group
from cinder import objects
from cinder.objects import fields
from cinder.policies import group_snapshots as g_snap_policies
from cinder import quota
from cinder import test
from cinder.tests.unit import fake_constants as fake
@ -297,12 +298,16 @@ class GroupAPITestCase(test.TestCase):
remove_volumes=vol2.id)
@mock.patch('cinder.objects.GroupSnapshot.get_by_id')
def test_get_group_snapshot(self, mock_group_snap):
@mock.patch('cinder.context.RequestContext.authorize')
def test_get_group_snapshot(self, mock_authorize, mock_group_snap):
fake_group_snap = 'fake_group_snap'
mock_group_snap.return_value = fake_group_snap
grp_snap = self.group_api.get_group_snapshot(
self.ctxt, fake.GROUP_SNAPSHOT_ID)
self.assertEqual(fake_group_snap, grp_snap)
mock_authorize.assert_called_once_with(
g_snap_policies.GET_POLICY,
target_obj=fake_group_snap)
@ddt.data(True, False)
@mock.patch('cinder.objects.GroupSnapshotList.get_all')

View File

@ -17,6 +17,8 @@ from oslo_config import cfg
from cinder import context
from cinder import exception
from cinder import objects
from cinder.policies import volume_actions as vol_action_policies
from cinder.policies import volumes as volume_policies
from cinder import quota
from cinder.tests.unit import fake_constants as fake
from cinder.tests.unit import volume as base
@ -63,8 +65,9 @@ class VolumeRetypeTestCase(base.BaseVolumeTestCase):
else:
return self.default_vol_type
@mock.patch('cinder.context.RequestContext.authorize')
@mock.patch.object(volume_types, 'get_by_name_or_id')
def test_retype_multiattach(self, _mock_get_types):
def test_retype_multiattach(self, _mock_get_types, mock_authorize):
"""Verify multiattach retype restrictions."""
_mock_get_types.side_effect = self.fake_get_vtype
@ -106,3 +109,15 @@ class VolumeRetypeTestCase(base.BaseVolumeTestCase):
self.context,
vol,
'multiattach-type')
mock_authorize.assert_has_calls(
[mock.call(volume_policies.CREATE_POLICY),
mock.call(volume_policies.CREATE_POLICY),
mock.call(vol_action_policies.RETYPE_POLICY, target_obj=mock.ANY),
mock.call(volume_policies.MULTIATTACH_POLICY,
target_obj=mock.ANY),
mock.call(volume_policies.CREATE_POLICY),
mock.call(volume_policies.MULTIATTACH_POLICY),
mock.call(volume_policies.CREATE_POLICY),
mock.call(vol_action_policies.RETYPE_POLICY, target_obj=mock.ANY),
mock.call(vol_action_policies.RETYPE_POLICY, target_obj=mock.ANY),
])

View File

@ -66,10 +66,10 @@ class API(base.Base):
def delete(self, context, transfer_id):
"""Make the RPC call to delete a volume transfer."""
context.authorize(policy.DELETE_POLICY)
transfer = self.db.transfer_get(context, transfer_id)
volume_ref = self.db.volume_get(context, transfer.volume_id)
context.authorize(policy.DELETE_POLICY, target_obj=volume_ref)
volume_utils.notify_about_volume_usage(context, volume_ref,
"transfer.delete.start")
if volume_ref['status'] != 'awaiting-transfer':
@ -115,9 +115,9 @@ class API(base.Base):
def create(self, context, volume_id, display_name):
"""Creates an entry in the transfers table."""
context.authorize(policy.CREATE_POLICY)
LOG.info("Generating transfer record for volume %s", volume_id)
volume_ref = self.db.volume_get(context, volume_id)
context.authorize(policy.CREATE_POLICY, target_obj=volume_ref)
if volume_ref['status'] != "available":
raise exception.InvalidVolume(reason=_("status must be available"))
if volume_ref['encryption_key_id'] is not None:

View File

@ -645,8 +645,8 @@ class API(base.Base):
return volumes
def get_snapshot(self, context, snapshot_id):
context.authorize(snapshot_policy.GET_POLICY)
snapshot = objects.Snapshot.get_by_id(context, snapshot_id)
context.authorize(snapshot_policy.GET_POLICY, target_obj=snapshot)
# FIXME(jdg): The objects don't have the db name entries
# so build the resource tag manually for now.
@ -656,8 +656,8 @@ class API(base.Base):
return snapshot
def get_volume(self, context, volume_id):
context.authorize(vol_policy.GET_POLICY)
volume = objects.Volume.get_by_id(context, volume_id)
context.authorize(vol_policy.GET_POLICY, target_obj=volume)
LOG.info("Volume retrieved successfully.", resource=volume)
return volume
@ -863,7 +863,7 @@ class API(base.Base):
cgsnapshot_id,
commit_quota=True,
group_snapshot_id=None):
context.authorize(snapshot_policy.CREATE_POLICY)
context.authorize(snapshot_policy.CREATE_POLICY, target_obj=volume)
utils.check_metadata_properties(metadata)
if not volume.host:
@ -1656,7 +1656,8 @@ class API(base.Base):
# If they are retyping to a multiattach capable, make sure they
# are allowed to do so.
if tgt_is_multiattach:
context.authorize(vol_policy.MULTIATTACH_POLICY)
context.authorize(vol_policy.MULTIATTACH_POLICY,
target_obj=volume)
# We're checking here in so that we can report any quota issues as
# early as possible, but won't commit until we change the type. We
@ -2070,7 +2071,8 @@ class API(base.Base):
vref.status == 'in-use' and
vref.bootable):
ctxt.authorize(
attachment_policy.MULTIATTACH_BOOTABLE_VOLUME_POLICY)
attachment_policy.MULTIATTACH_BOOTABLE_VOLUME_POLICY,
target_obj=vref)
# FIXME(JDG): We want to be able to do things here like reserve a
# volume for Nova to do BFV WHILE the volume may be in the process of