Permit volume type operations for policy authorized users

Currently, following volume type operations are not
permitted for non admin users because these db operations
require admin context.

* create
* update
* delete
* type-access-add
* type-access-remove

In order to allow a cloud operator to use the policy based
user access control for these operations, a context during
these operations should be elevated before db operations.

After applying this change, the cloud operator can manage
policy for volume type operations like this.

1. To permit volume type operations for specific user,
   add "storage_type_admin" role.

2. Add "admin_or_storage_type_admin" rule to policy.json.
   "admin_or_storage_type_admin":
       "is_admin:True or role:storage_type_admin",

3. Modify rule for types_manage.
   "volume_extension:types_manage":
       "rule:admin_or_storage_type_admin",

Change-Id: I1e91ad6573f78cfa35c36209944ea1d074a17604
Closes-Bug: #1538305
This commit is contained in:
Mitsuhiro Tanino 2016-01-29 12:48:33 -05:00
parent b043410f39
commit 4ccd1bd151
4 changed files with 185 additions and 9 deletions

View File

@ -169,6 +169,33 @@ class VolumeTypesManageApiTest(test.TestCase):
self.controller._delete(req, 1)
self.assertEqual(1, len(self.notifier.notifications))
@mock.patch('cinder.volume.volume_types.destroy')
@mock.patch('cinder.volume.volume_types.get_volume_type')
@mock.patch('cinder.policy.enforce')
def test_volume_types_delete_with_non_admin(self, mock_policy_enforce,
mock_get, mock_destroy):
# allow policy authorized user to delete type
mock_policy_enforce.return_value = None
mock_get.return_value = \
{'extra_specs': {"key1": "value1"},
'id': 1,
'name': u'vol_type_1',
'description': u'vol_type_desc_1'}
mock_destroy.side_effect = return_volume_types_destroy
req = fakes.HTTPRequest.blank('/v2/fake/types/1',
use_admin_context=False)
self.assertEqual(0, len(self.notifier.notifications))
self.controller._delete(req, 1)
self.assertEqual(1, len(self.notifier.notifications))
# non policy authorized user fails to delete type
mock_policy_enforce.side_effect = (
exception.PolicyNotAuthorized(action='type_delete'))
self.assertRaises(exception.PolicyNotAuthorized,
self.controller._delete,
req, 1)
def test_create(self):
self.stubs.Set(volume_types, 'create',
return_volume_types_create)
@ -263,6 +290,41 @@ class VolumeTypesManageApiTest(test.TestCase):
body = {'volume_type': 'string'}
self._create_volume_type_bad_body(body=body)
@mock.patch('cinder.volume.volume_types.create')
@mock.patch('cinder.volume.volume_types.get_volume_type_by_name')
@mock.patch('cinder.policy.enforce')
def test_create_with_none_admin(self, mock_policy_enforce,
mock_get_volume_type_by_name,
mock_create_type):
# allow policy authorized user to create type
mock_policy_enforce.return_value = None
mock_get_volume_type_by_name.return_value = \
{'extra_specs': {"key1": "value1"},
'id': 1,
'name': u'vol_type_1',
'description': u'vol_type_desc_1'}
body = {"volume_type": {"name": "vol_type_1",
"os-volume-type-access:is_public": True,
"extra_specs": {"key1": "value1"}}}
req = fakes.HTTPRequest.blank('/v2/fake/types',
use_admin_context=False)
self.assertEqual(0, len(self.notifier.notifications))
res_dict = self.controller._create(req, body)
self.assertEqual(1, len(self.notifier.notifications))
self._check_test_results(res_dict, {
'expected_name': 'vol_type_1', 'expected_desc': 'vol_type_desc_1'})
# non policy authorized user fails to create type
mock_policy_enforce.side_effect = (
exception.PolicyNotAuthorized(action='type_create'))
self.assertRaises(exception.PolicyNotAuthorized,
self.controller._create,
req, body)
@mock.patch('cinder.volume.volume_types.update')
@mock.patch('cinder.volume.volume_types.get_volume_type')
def test_update(self, mock_get, mock_update):
@ -477,6 +539,39 @@ class VolumeTypesManageApiTest(test.TestCase):
'expected_desc': 'vol_type_desc_666'})
self.assertEqual(1, len(self.notifier.notifications))
@mock.patch('cinder.volume.volume_types.update')
@mock.patch('cinder.volume.volume_types.get_volume_type')
@mock.patch('cinder.policy.enforce')
def test_update_with_non_admin(self, mock_policy_enforce, mock_get,
mock_update):
# allow policy authorized user to update type
mock_policy_enforce.return_value = None
mock_get.return_value = return_volume_types_get_volume_type_updated(
'1', is_public=False)
body = {"volume_type": {"name": "vol_type_1_1",
"description": "vol_type_desc_1_1",
"is_public": False}}
req = fakes.HTTPRequest.blank('/v2/fake/types/1',
use_admin_context=False)
req.method = 'PUT'
self.assertEqual(0, len(self.notifier.notifications))
res_dict = self.controller._update(req, '1', body)
self.assertEqual(1, len(self.notifier.notifications))
self._check_test_results(res_dict,
{'expected_desc': 'vol_type_desc_1_1',
'expected_name': 'vol_type_1_1',
'is_public': False})
# non policy authorized user fails to update type
mock_policy_enforce.side_effect = (
exception.PolicyNotAuthorized(action='type_update'))
self.assertRaises(exception.PolicyNotAuthorized,
self.controller._update,
req, '1', body)
def _check_test_results(self, results, expected_results):
self.assertEqual(1, len(results))
self.assertEqual(expected_results['expected_desc'],

View File

@ -102,6 +102,45 @@ class VolumeTypeTestCase(test.TestCase):
new_type_name)
volume_types.destroy(self.ctxt, type_ref.id)
def test_volume_type_create_then_destroy_with_non_admin(self):
"""Ensure volume types can be created and deleted by non-admin user.
If a non-admn user is authorized at API, volume type operations
should be permitted.
"""
prev_all_vtypes = volume_types.get_all_types(self.ctxt)
self.ctxt = context.RequestContext('fake', 'fake', is_admin=False)
# create
type_ref = volume_types.create(self.ctxt,
self.vol_type1_name,
self.vol_type1_specs,
description=self.vol_type1_description)
new = volume_types.get_volume_type_by_name(self.ctxt,
self.vol_type1_name)
self.assertEqual(self.vol_type1_description, new['description'])
new_all_vtypes = volume_types.get_all_types(self.ctxt)
self.assertEqual(len(prev_all_vtypes) + 1,
len(new_all_vtypes),
'drive type was not created')
# update
new_type_name = self.vol_type1_name + '_updated'
new_type_desc = self.vol_type1_description + '_updated'
type_ref_updated = volume_types.update(self.ctxt,
type_ref.id,
new_type_name,
new_type_desc)
self.assertEqual(new_type_name, type_ref_updated['name'])
self.assertEqual(new_type_desc, type_ref_updated['description'])
# destroy
volume_types.destroy(self.ctxt, type_ref['id'])
new_all_vtypes = volume_types.get_all_types(self.ctxt)
self.assertEqual(prev_all_vtypes,
new_all_vtypes,
'drive type was not deleted')
def test_create_volume_type_with_invalid_params(self):
"""Ensure exception will be returned."""
vol_type_invalid_specs = "invalid_extra_specs"
@ -269,6 +308,29 @@ class VolumeTypeTestCase(test.TestCase):
vtype_access = db.volume_type_access_get_all(self.ctxt, vtype_id)
self.assertNotIn(project_id, vtype_access)
def test_add_access_with_non_admin(self):
self.ctxt = context.RequestContext('fake', 'fake', is_admin=False)
project_id = '456'
vtype = volume_types.create(self.ctxt, 'type1', is_public=False)
vtype_id = vtype.get('id')
volume_types.add_volume_type_access(self.ctxt, vtype_id, project_id)
vtype_access = db.volume_type_access_get_all(self.ctxt.elevated(),
vtype_id)
self.assertIn(project_id, [a.project_id for a in vtype_access])
def test_remove_access_with_non_admin(self):
self.ctxt = context.RequestContext('fake', 'fake', is_admin=False)
project_id = '456'
vtype = volume_types.create(self.ctxt, 'type1', projects=['456'],
is_public=False)
vtype_id = vtype.get('id')
volume_types.remove_volume_type_access(self.ctxt, vtype_id, project_id)
vtype_access = db.volume_type_access_get_all(self.ctxt.elevated(),
vtype_id)
self.assertNotIn(project_id, vtype_access)
def test_get_volume_type_qos_specs(self):
qos_ref = qos_specs.create(self.ctxt, 'qos-specs-1', {'k1': 'v1',
'k2': 'v2',

View File

@ -44,8 +44,9 @@ def create(context,
"""Creates volume types."""
extra_specs = extra_specs or {}
projects = projects or []
elevated = context if context.is_admin else context.elevated()
try:
type_ref = db.volume_type_create(context,
type_ref = db.volume_type_create(elevated,
dict(name=name,
extra_specs=extra_specs,
is_public=is_public,
@ -63,9 +64,10 @@ def update(context, id, name, description, is_public=None):
if id is None:
msg = _("id cannot be None")
raise exception.InvalidVolumeType(reason=msg)
old_volume_type = get_volume_type(context, id)
elevated = context if context.is_admin else context.elevated()
old_volume_type = get_volume_type(elevated, id)
try:
type_updated = db.volume_type_update(context,
type_updated = db.volume_type_update(elevated,
id,
dict(name=name,
description=description,
@ -74,7 +76,7 @@ def update(context, id, name, description, is_public=None):
if name:
old_type_name = old_volume_type.get('name')
if old_type_name != name:
QUOTAS.update_quota_resource(context,
QUOTAS.update_quota_resource(elevated,
old_type_name,
name)
except db_exc.DBError:
@ -89,7 +91,8 @@ def destroy(context, id):
msg = _("id cannot be None")
raise exception.InvalidVolumeType(reason=msg)
else:
db.volume_type_destroy(context, id)
elevated = context if context.is_admin else context.elevated()
db.volume_type_destroy(elevated, id)
def get_all_types(context, inactive=0, filters=None, marker=None,
@ -172,11 +175,12 @@ def add_volume_type_access(context, volume_type_id, project_id):
if volume_type_id is None:
msg = _("volume_type_id cannot be None")
raise exception.InvalidVolumeType(reason=msg)
if is_public_volume_type(context, volume_type_id):
elevated = context if context.is_admin else context.elevated()
if is_public_volume_type(elevated, volume_type_id):
msg = _("Type access modification is not applicable to public volume "
"type.")
raise exception.InvalidVolumeType(reason=msg)
return db.volume_type_access_add(context, volume_type_id, project_id)
return db.volume_type_access_add(elevated, volume_type_id, project_id)
def remove_volume_type_access(context, volume_type_id, project_id):
@ -184,11 +188,12 @@ def remove_volume_type_access(context, volume_type_id, project_id):
if volume_type_id is None:
msg = _("volume_type_id cannot be None")
raise exception.InvalidVolumeType(reason=msg)
if is_public_volume_type(context, volume_type_id):
elevated = context if context.is_admin else context.elevated()
if is_public_volume_type(elevated, volume_type_id):
msg = _("Type access modification is not applicable to public volume "
"type.")
raise exception.InvalidVolumeType(reason=msg)
return db.volume_type_access_remove(context, volume_type_id, project_id)
return db.volume_type_access_remove(elevated, volume_type_id, project_id)
def is_encrypted(context, volume_type_id):

View File

@ -0,0 +1,14 @@
---
fixes:
- |
Enabled a cloud operator to correctly manage policy for
volume type operations. To permit volume type operations
for specific user, you can for example do as follows.
* Add ``storage_type_admin`` role.
* Add ``admin_or_storage_type_admin`` rule to ``policy.json``, e.g.
``"admin_or_storage_type_admin": "is_admin:True or role:storage_type_admin",``
* Modify rule for types_manage and volume_type_access, e.g.
``"volume_extension:types_manage": "rule:admin_or_storage_type_admin",
"volume_extension:volume_type_access:addProjectAccess": "rule:admin_or_storage_type_admin",
"volume_extension:volume_type_access:removeProjectAccess": "rule:admin_or_storage_type_admin",``