Fix policy.py

Because inside check_policy() there is hardcoded
'share' target prepended to all policies, any
policy we check will be checked against 'share'
policy. Change check_policy() to use explicit
target and action instead of just action.
Change wrap_check_policy decorator to be a
decorator maker which accepts resource name
as an argument.
Closes-Bug: #1274951
Partial-Bug: #1271943

Change-Id: I85c184035619d78107d56ea94918f608d8d7c282
This commit is contained in:
Aleks Chirko 2014-01-31 18:37:11 +02:00
parent 9312341359
commit 826b15692e
5 changed files with 69 additions and 56 deletions

View File

@ -18,5 +18,9 @@
"share_extension:share_admin_actions:reset_status": [["rule:admin_api"]],
"share_extension:snapshot_admin_actions:reset_status": [["rule:admin_api"]],
"share_extension:services": [["rule:admin_api"]]
"share_extension:services": [["rule:admin_api"]],
"security_service:create": [["rule:admin_api"]],
"security_service:delete": [["rule:admin_api"]],
"security_service:update": [["rule:admin_api"]]
}

View File

@ -31,6 +31,7 @@ from manila.openstack.common import log as logging
from manila import policy
RESOURCE_NAME = 'security_service'
LOG = logging.getLogger(__name__)
@ -69,7 +70,8 @@ class SecurityServiceController(wsgi.Controller):
context = req.environ['manila.context']
try:
security_service = db.security_service_get(context, id)
policy.check_policy(context, 'show', security_service)
policy.check_policy(context, RESOURCE_NAME, 'show',
security_service)
except exception.NotFound:
raise exc.HTTPNotFound()
@ -84,7 +86,8 @@ class SecurityServiceController(wsgi.Controller):
try:
security_service = db.security_service_get(context, id)
policy.check_policy(context, 'show', security_service)
policy.check_policy(context, RESOURCE_NAME,
'delete', security_service)
db.security_service_delete(context, id)
except exception.NotFound:
raise exc.HTTPNotFound()
@ -108,7 +111,8 @@ class SecurityServiceController(wsgi.Controller):
builder.
"""
context = req.environ['manila.context']
policy.check_policy(context, 'get_all_security_services')
policy.check_policy(context, RESOURCE_NAME,
'get_all_security_services')
search_opts = {}
search_opts.update(req.GET)
@ -168,7 +172,8 @@ class SecurityServiceController(wsgi.Controller):
try:
security_service = db.security_service_get(context, id)
policy.check_policy(context, 'show', security_service)
policy.check_policy(context, RESOURCE_NAME, 'show',
security_service)
except exception.NotFound:
raise exc.HTTPNotFound()
@ -179,6 +184,7 @@ class SecurityServiceController(wsgi.Controller):
for key in valid_update_keys
if key in security_service_data])
policy.check_policy(context, RESOURCE_NAME, 'update', security_service)
security_service = db.security_service_update(context, id, update_dict)
return self._view_builder.detail(req, security_service)
@ -186,7 +192,7 @@ class SecurityServiceController(wsgi.Controller):
def create(self, req, body):
"""Creates a new security service."""
context = req.environ['manila.context']
policy.check_policy(context, 'create')
policy.check_policy(context, RESOURCE_NAME, 'create')
if not self.is_valid_body(body, 'security_service'):
raise exc.HTTPUnprocessableEntity()

View File

@ -106,25 +106,24 @@ def check_is_admin(roles):
return policy.enforce(match_list, target, credentials)
def wrap_check_policy(func):
def wrap_check_policy(resource):
"""Check policy corresponding to the wrapped methods prior to execution.
This decorator requires the first 3 args of the wrapped function
to be (self, context, share).
"""
@functools.wraps(func)
def wrapped(self, context, target_obj, *args, **kwargs):
check_policy(context, func.__name__, target_obj)
return func(self, context, target_obj, *args, **kwargs)
def check_policy_wraper(func):
@functools.wraps(func)
def wrapped(self, context, target_obj, *args, **kwargs):
check_policy(context, resource, func.__name__, target_obj)
return func(self, context, target_obj, *args, **kwargs)
return wrapped
return wrapped
return check_policy_wraper
def check_policy(context, action, target_obj=None):
def check_policy(context, resource, action, target_obj=None):
target = {
'project_id': context.project_id,
'user_id': context.user_id,
}
target.update(target_obj or {})
_action = 'share:%s' % action
_action = '%s:%s' % (resource, action)
enforce(context, _action, target)

View File

@ -52,7 +52,7 @@ class API(base.Base):
snapshot=None, availability_zone=None, metadata=None,
share_network_id=None):
"""Create new share."""
policy.check_policy(context, 'create')
policy.check_policy(context, 'share', 'create')
self._check_metadata_properties(context, metadata)
@ -164,7 +164,7 @@ class API(base.Base):
return share
@policy.wrap_check_policy
@policy.wrap_check_policy('share')
def delete(self, context, share):
"""Delete share."""
if context.is_admin and context.project_id != share['project_id']:
@ -205,7 +205,7 @@ class API(base.Base):
def create_snapshot(self, context, share, name, description,
force=False):
policy.check_policy(context, 'create_snapshot', share)
policy.check_policy(context, 'share', 'create_snapshot', share)
if ((not force) and (share['status'] != "available")):
msg = _("must be available")
@ -265,7 +265,7 @@ class API(base.Base):
self.share_rpcapi.create_snapshot(context, share, snapshot)
return snapshot
@policy.wrap_check_policy
@policy.wrap_check_policy('share')
def delete_snapshot(self, context, snapshot, force=False):
if not force and snapshot['status'] not in ["available", "error"]:
msg = _("Share Snapshot status must be available or ")
@ -276,21 +276,21 @@ class API(base.Base):
share = self.db.share_get(context, snapshot['share_id'])
self.share_rpcapi.delete_snapshot(context, snapshot, share['host'])
@policy.wrap_check_policy
@policy.wrap_check_policy('share')
def update(self, context, share, fields):
return self.db.share_update(context, share['id'], fields)
@policy.wrap_check_policy
@policy.wrap_check_policy('share')
def snapshot_update(self, context, snapshot, fields):
return self.db.share_snapshot_update(context, snapshot['id'], fields)
def get(self, context, share_id):
rv = self.db.share_get(context, share_id)
policy.check_policy(context, 'get', rv)
policy.check_policy(context, 'share', 'get', rv)
return rv
def get_all(self, context, search_opts={}):
policy.check_policy(context, 'get_all')
policy.check_policy(context, 'share', 'get_all')
search_opts = search_opts or {}
@ -317,12 +317,12 @@ class API(base.Base):
return shares
def get_snapshot(self, context, snapshot_id):
policy.check_policy(context, 'get_snapshot')
policy.check_policy(context, 'share', 'get_snapshot')
rv = self.db.share_snapshot_get(context, snapshot_id)
return dict(rv.iteritems())
def get_all_snapshots(self, context, search_opts=None):
policy.check_policy(context, 'get_all_snapshots')
policy.check_policy(context, 'share', 'get_all_snapshots')
search_opts = search_opts or {}
@ -356,7 +356,7 @@ class API(base.Base):
if share['status'] not in ["available"]:
msg = _("Share status must be available")
raise exception.InvalidShare(reason=msg)
policy.check_policy(ctx, 'allow_access')
policy.check_policy(ctx, 'share', 'allow_access')
values = {'share_id': share['id'],
'access_type': access_type,
'access_to': access_to}
@ -371,7 +371,7 @@ class API(base.Base):
def deny_access(self, ctx, share, access):
"""Deny access to share."""
policy.check_policy(ctx, 'deny_access')
policy.check_policy(ctx, 'share', 'deny_access')
#First check state of the target share
if not share['host']:
msg = _("Share host is None")
@ -394,7 +394,7 @@ class API(base.Base):
def access_get_all(self, context, share):
"""Returns all access rules for share."""
policy.check_policy(context, 'access_get_all')
policy.check_policy(context, 'share', 'access_get_all')
rules = self.db.share_access_get_all_for_share(context, share['id'])
return [{'id': rule.id,
'access_type': rule.access_type,
@ -403,17 +403,17 @@ class API(base.Base):
def access_get(self, context, access_id):
"""Returns access rule with the id."""
policy.check_policy(context, 'access_get')
policy.check_policy(context, 'share', 'access_get')
rule = self.db.share_access_get(context, access_id)
return rule
@policy.wrap_check_policy
@policy.wrap_check_policy('share')
def get_share_metadata(self, context, share):
"""Get all metadata associated with a share."""
rv = self.db.share_metadata_get(context, share['id'])
return dict(rv.iteritems())
@policy.wrap_check_policy
@policy.wrap_check_policy('share')
def delete_share_metadata(self, context, share, key):
"""Delete the given metadata item from a share."""
self.db.share_metadata_delete(context, share['id'], key)
@ -437,7 +437,7 @@ class API(base.Base):
LOG.warn(msg)
raise exception.InvalidShareMetadataSize(message=msg)
@policy.wrap_check_policy
@policy.wrap_check_policy('share')
def update_share_metadata(self, context, share, metadata, delete=False):
"""Updates or creates share metadata.

View File

@ -164,7 +164,8 @@ class ShareAPITestCase(test.TestCase):
'export_location': share['export_location']}
self.mox.StubOutWithMock(share_api.policy, 'check_policy')
share_api.policy.check_policy(self.context, 'create_snapshot', share)
share_api.policy.check_policy(self.context, 'share',
'create_snapshot', share)
self.mox.StubOutWithMock(quota.QUOTAS, 'reserve')
quota.QUOTAS.reserve(self.context, snapshots=1, gigabytes=1).\
AndReturn('reservation')
@ -185,7 +186,7 @@ class ShareAPITestCase(test.TestCase):
status='available')
self.mox.StubOutWithMock(share_api.policy, 'check_policy')
share_api.policy.check_policy(
self.context, 'delete_snapshot', snapshot)
self.context, 'share', 'delete_snapshot', snapshot)
self.mox.StubOutWithMock(db_driver, 'share_snapshot_update')
db_driver.share_snapshot_update(self.context, snapshot['id'],
{'status': 'deleting'})
@ -202,7 +203,7 @@ class ShareAPITestCase(test.TestCase):
status='creating')
self.mox.StubOutWithMock(share_api.policy, 'check_policy')
share_api.policy.check_policy(
self.context, 'delete_snapshot', snapshot)
self.context, 'share', 'delete_snapshot', snapshot)
self.mox.ReplayAll()
self.assertRaises(exception.InvalidShareSnapshot,
self.api.delete_snapshot, self.context, snapshot)
@ -211,7 +212,8 @@ class ShareAPITestCase(test.TestCase):
share = fake_share('fakeid',
status='error')
self.mox.StubOutWithMock(share_api.policy, 'check_policy')
share_api.policy.check_policy(self.context, 'create_snapshot', share)
share_api.policy.check_policy(self.context, 'share',
'create_snapshot', share)
self.mox.ReplayAll()
self.assertRaises(exception.InvalidShare, self.api.create_snapshot,
self.context, share, 'fakename', 'fakedesc')
@ -250,7 +252,7 @@ class ShareAPITestCase(test.TestCase):
def test_get_snapshot(self):
fake_get_snap = {'fake_key': 'fake_val'}
self.mox.StubOutWithMock(share_api.policy, 'check_policy')
share_api.policy.check_policy(self.context, 'get_snapshot')
share_api.policy.check_policy(self.context, 'share', 'get_snapshot')
self.mox.StubOutWithMock(db_driver, 'share_snapshot_get')
db_driver.share_snapshot_get(self.context,
'fakeid').AndReturn(fake_get_snap)
@ -342,7 +344,8 @@ class ShareAPITestCase(test.TestCase):
self.mox.StubOutWithMock(db_driver, 'share_get')
db_driver.share_get(self.context, 'fakeid').AndReturn('fakeshare')
self.mox.StubOutWithMock(share_api.policy, 'check_policy')
share_api.policy.check_policy(self.context, 'get', 'fakeshare')
share_api.policy.check_policy(self.context, 'share', 'get',
'fakeshare')
self.mox.ReplayAll()
result = self.api.get(self.context, 'fakeid')
self.assertEqual(result, 'fakeshare')
@ -350,7 +353,7 @@ class ShareAPITestCase(test.TestCase):
def test_get_all_admin_not_all_tenants(self):
ctx = context.RequestContext('fakeuid', 'fakepid', id_admin=True)
self.mox.StubOutWithMock(share_api.policy, 'check_policy')
share_api.policy.check_policy(ctx, 'get_all')
share_api.policy.check_policy(ctx, 'share', 'get_all')
self.mox.StubOutWithMock(db_driver, 'share_get_all_by_project')
db_driver.share_get_all_by_project(ctx, 'fakepid')
self.mox.ReplayAll()
@ -358,7 +361,7 @@ class ShareAPITestCase(test.TestCase):
def test_get_all_admin_all_tenants(self):
self.mox.StubOutWithMock(share_api.policy, 'check_policy')
share_api.policy.check_policy(self.context, 'get_all')
share_api.policy.check_policy(self.context, 'share', 'get_all')
self.mox.StubOutWithMock(db_driver, 'share_get_all')
db_driver.share_get_all(self.context)
self.mox.ReplayAll()
@ -367,7 +370,7 @@ class ShareAPITestCase(test.TestCase):
def test_get_all_not_admin(self):
ctx = context.RequestContext('fakeuid', 'fakepid', id_admin=False)
self.mox.StubOutWithMock(share_api.policy, 'check_policy')
share_api.policy.check_policy(ctx, 'get_all')
share_api.policy.check_policy(ctx, 'share', 'get_all')
self.mox.StubOutWithMock(db_driver, 'share_get_all_by_project')
db_driver.share_get_all_by_project(ctx, 'fakepid')
self.mox.ReplayAll()
@ -378,7 +381,7 @@ class ShareAPITestCase(test.TestCase):
fake_objs = [{'name': 'fakename1'}, search_opts]
ctx = context.RequestContext('fakeuid', 'fakepid', id_admin=False)
self.mox.StubOutWithMock(share_api.policy, 'check_policy')
share_api.policy.check_policy(ctx, 'get_all')
share_api.policy.check_policy(ctx, 'share', 'get_all')
self.mox.StubOutWithMock(db_driver, 'share_get_all_by_project')
db_driver.share_get_all_by_project(ctx,
'fakepid').AndReturn(fake_objs)
@ -389,7 +392,7 @@ class ShareAPITestCase(test.TestCase):
def test_get_all_snapshots_admin_not_all_tenants(self):
ctx = context.RequestContext('fakeuid', 'fakepid', id_admin=True)
self.mox.StubOutWithMock(share_api.policy, 'check_policy')
share_api.policy.check_policy(ctx, 'get_all_snapshots')
share_api.policy.check_policy(ctx, 'share', 'get_all_snapshots')
self.mox.StubOutWithMock(db_driver,
'share_snapshot_get_all_by_project')
db_driver.share_snapshot_get_all_by_project(ctx, 'fakepid')
@ -398,7 +401,8 @@ class ShareAPITestCase(test.TestCase):
def test_get_all_snapshots_admin_all_tenants(self):
self.mox.StubOutWithMock(share_api.policy, 'check_policy')
share_api.policy.check_policy(self.context, 'get_all_snapshots')
share_api.policy.check_policy(self.context, 'share',
'get_all_snapshots')
self.mox.StubOutWithMock(db_driver, 'share_snapshot_get_all')
db_driver.share_snapshot_get_all(self.context)
self.mox.ReplayAll()
@ -408,7 +412,7 @@ class ShareAPITestCase(test.TestCase):
def test_get_all_snapshots_not_admin(self):
ctx = context.RequestContext('fakeuid', 'fakepid', id_admin=False)
self.mox.StubOutWithMock(share_api.policy, 'check_policy')
share_api.policy.check_policy(ctx, 'get_all_snapshots')
share_api.policy.check_policy(ctx, 'share', 'get_all_snapshots')
self.mox.StubOutWithMock(db_driver,
'share_snapshot_get_all_by_project')
db_driver.share_snapshot_get_all_by_project(ctx, 'fakepid')
@ -420,7 +424,7 @@ class ShareAPITestCase(test.TestCase):
fake_objs = [{'name': 'fakename1'}, search_opts]
ctx = context.RequestContext('fakeuid', 'fakepid', id_admin=False)
self.mox.StubOutWithMock(share_api.policy, 'check_policy')
share_api.policy.check_policy(ctx, 'get_all_snapshots')
share_api.policy.check_policy(ctx, 'share', 'get_all_snapshots')
self.mox.StubOutWithMock(db_driver,
'share_snapshot_get_all_by_project')
db_driver.share_snapshot_get_all_by_project(ctx, 'fakepid').\
@ -435,7 +439,7 @@ class ShareAPITestCase(test.TestCase):
'access_type': 'fakeacctype',
'access_to': 'fakeaccto'}
self.mox.StubOutWithMock(share_api.policy, 'check_policy')
share_api.policy.check_policy(self.context, 'allow_access')
share_api.policy.check_policy(self.context, 'share', 'allow_access')
self.mox.StubOutWithMock(db_driver, 'share_access_create')
db_driver.share_access_create(self.context, values).\
AndReturn('fakeacc')
@ -461,7 +465,7 @@ class ShareAPITestCase(test.TestCase):
share = fake_share('fakeid', status='available')
access = fake_access('fakaccid', state='fakeerror')
self.mox.StubOutWithMock(share_api.policy, 'check_policy')
share_api.policy.check_policy(self.context, 'deny_access')
share_api.policy.check_policy(self.context, 'share', 'deny_access')
self.mox.StubOutWithMock(db_driver, 'share_access_delete')
db_driver.share_access_delete(self.context, access['id'])
self.mox.ReplayAll()
@ -471,7 +475,7 @@ class ShareAPITestCase(test.TestCase):
share = fake_share('fakeid', status='available')
access = fake_access('fakaccid', state='fakeactive')
self.mox.StubOutWithMock(share_api.policy, 'check_policy')
share_api.policy.check_policy(self.context, 'deny_access')
share_api.policy.check_policy(self.context, 'share', 'deny_access')
self.mox.StubOutWithMock(db_driver, 'share_access_update')
db_driver.share_access_update(self.context, access['id'],
{'state': 'fakedeleting'})
@ -483,7 +487,7 @@ class ShareAPITestCase(test.TestCase):
share = fake_share('fakeid', status='available')
access = fake_access('fakaccid', state='fakenew')
self.mox.StubOutWithMock(share_api.policy, 'check_policy')
share_api.policy.check_policy(self.context, 'deny_access')
share_api.policy.check_policy(self.context, 'share', 'deny_access')
self.mox.ReplayAll()
self.assertRaises(exception.InvalidShareAccess, self.api.deny_access,
self.context, share, access)
@ -491,7 +495,7 @@ class ShareAPITestCase(test.TestCase):
def test_deny_access_status_not_available(self):
share = fake_share('fakeid', status='error')
self.mox.StubOutWithMock(share_api.policy, 'check_policy')
share_api.policy.check_policy(self.context, 'deny_access')
share_api.policy.check_policy(self.context, 'share', 'deny_access')
self.mox.ReplayAll()
self.assertRaises(exception.InvalidShare, self.api.deny_access,
self.context, share, 'fakeacc')
@ -499,14 +503,14 @@ class ShareAPITestCase(test.TestCase):
def test_deny_access_no_host(self):
share = fake_share('fakeid', host=None)
self.mox.StubOutWithMock(share_api.policy, 'check_policy')
share_api.policy.check_policy(self.context, 'deny_access')
share_api.policy.check_policy(self.context, 'share', 'deny_access')
self.mox.ReplayAll()
self.assertRaises(exception.InvalidShare, self.api.deny_access,
self.context, share, 'fakeacc')
def test_access_get(self):
self.mox.StubOutWithMock(share_api.policy, 'check_policy')
share_api.policy.check_policy(self.context, 'access_get')
share_api.policy.check_policy(self.context, 'share', 'access_get')
self.mox.StubOutWithMock(db_driver, 'share_access_get')
db_driver.share_access_get(self.context, 'fakeid').AndReturn('fake')
self.mox.ReplayAll()
@ -516,7 +520,7 @@ class ShareAPITestCase(test.TestCase):
def test_access_get_all(self):
share = fake_share('fakeid')
self.mox.StubOutWithMock(share_api.policy, 'check_policy')
share_api.policy.check_policy(self.context, 'access_get_all')
share_api.policy.check_policy(self.context, 'share', 'access_get_all')
self.mox.StubOutWithMock(db_driver, 'share_access_get_all_for_share')
db_driver.share_access_get_all_for_share(self.context, 'fakeid').\
AndReturn([fake_access('fakeacc0id', state='fakenew'),