Move metadef resource type association policy checks in the API

Moved policy checks for metadef resource typpe associations in
the API layer. Added additional functional tests for coverage.

Partially implements: blueprint policy-refactor

Change-Id: Icff756606429ecfd9183bdceba33aaff347bb0b7
Depends-On: https://review.opendev.org/c/openstack/grenade/+/803317
This commit is contained in:
Abhishek Kekane 2021-07-19 17:54:54 +00:00
parent 6ffd80f9c0
commit e92c664df1
5 changed files with 354 additions and 25 deletions

View File

@ -15,7 +15,6 @@
from oslo_log import log as logging
from oslo_serialization import jsonutils
from oslo_utils import encodeutils
import six
from six.moves import http_client as http
import webob.exc
@ -26,6 +25,7 @@ from glance.api.v2.model.metadef_resource_type import ResourceType
from glance.api.v2.model.metadef_resource_type import ResourceTypeAssociation
from glance.api.v2.model.metadef_resource_type import ResourceTypeAssociations
from glance.api.v2.model.metadef_resource_type import ResourceTypes
from glance.api.v2 import policy as api_policy
from glance.common import exception
from glance.common import wsgi
import glance.db
@ -50,7 +50,17 @@ class ResourceTypeController(object):
try:
filters = {'namespace': None}
rs_type_repo = self.gateway.get_metadef_resource_type_repo(
req.context)
req.context, authorization_layer=False)
# NOTE(abhishekk): Here we are just checking if user is
# authorized to view/list metadef resource types or not.
# Also there is no relation between list_metadef_resource_types
# and get_metadef_resource_type policies so can not enforce
# get_metadef_resource_type policy on individual resource
# type here.
api_policy.MetadefAPIPolicy(
req.context,
enforcer=self.policy).list_metadef_resource_types()
db_resource_type_list = rs_type_repo.list(filters=filters)
resource_type_list = [ResourceType.to_wsme_model(
resource_type) for resource_type in db_resource_type_list]
@ -62,37 +72,76 @@ class ResourceTypeController(object):
raise webob.exc.HTTPForbidden(explanation=e.msg)
except exception.NotFound as e:
raise webob.exc.HTTPNotFound(explanation=e.msg)
except Exception as e:
LOG.error(encodeutils.exception_to_unicode(e))
raise webob.exc.HTTPInternalServerError(e)
return resource_types
def show(self, req, namespace):
ns_repo = self.gateway.get_metadef_namespace_repo(
req.context, authorization_layer=False)
try:
namespace_obj = ns_repo.get(namespace)
except (exception.Forbidden, exception.NotFound):
# NOTE (abhishekk): Returning 404 Not Found as the
# namespace is outside of this user's project
msg = _("Namespace %s not found") % namespace
raise webob.exc.HTTPNotFound(explanation=msg)
try:
# NOTE(abhishekk): Here we are just checking if user is
# authorized to view/list metadef resource types or not.
# Each resource_type is checked against
# get_metadef_resource_type below.
api_policy.MetadefAPIPolicy(
req.context,
md_resource=namespace_obj,
enforcer=self.policy).list_metadef_resource_types()
filters = {'namespace': namespace}
rs_type_repo = self.gateway.get_metadef_resource_type_repo(
req.context)
db_resource_type_list = rs_type_repo.list(filters=filters)
resource_type_list = [ResourceTypeAssociation.to_wsme_model(
resource_type) for resource_type in db_resource_type_list]
req.context, authorization_layer=False)
db_type_list = rs_type_repo.list(filters=filters)
rs_type_list = [
ResourceTypeAssociation.to_wsme_model(
rs_type
) for rs_type in db_type_list if api_policy.MetadefAPIPolicy(
req.context, md_resource=rs_type.namespace,
enforcer=self.policy
).check('get_metadef_resource_type')]
resource_types = ResourceTypeAssociations()
resource_types.resource_type_associations = resource_type_list
resource_types.resource_type_associations = rs_type_list
except exception.Forbidden as e:
LOG.debug("User not permitted to retrieve metadata resource types "
"within '%s' namespace", namespace)
raise webob.exc.HTTPForbidden(explanation=e.msg)
except exception.NotFound as e:
raise webob.exc.HTTPNotFound(explanation=e.msg)
except Exception as e:
LOG.error(encodeutils.exception_to_unicode(e))
raise webob.exc.HTTPInternalServerError(e)
return resource_types
def create(self, req, resource_type, namespace):
rs_type_factory = self.gateway.get_metadef_resource_type_factory(
req.context)
rs_type_repo = self.gateway.get_metadef_resource_type_repo(req.context)
req.context, authorization_layer=False)
rs_type_repo = self.gateway.get_metadef_resource_type_repo(
req.context, authorization_layer=False)
ns_repo = self.gateway.get_metadef_namespace_repo(
req.context, authorization_layer=False)
try:
namespace_obj = ns_repo.get(namespace)
except (exception.Forbidden, exception.NotFound):
# NOTE (abhishekk): Returning 404 Not Found as the
# namespace is outside of this user's project
msg = _("Namespace %s not found") % namespace
raise webob.exc.HTTPNotFound(explanation=msg)
try:
# NOTE(abhishekk): Metadef resource type is created for Metadef
# namespaces. Here we are just checking if user is authorized
# to create metadef resource types or not.
api_policy.MetadefAPIPolicy(
req.context,
md_resource=namespace_obj,
enforcer=self.policy).add_metadef_resource_type_association()
new_resource_type = rs_type_factory.new_resource_type(
namespace=namespace, **resource_type.to_dict())
rs_type_repo.add(new_resource_type)
@ -105,14 +154,31 @@ class ResourceTypeController(object):
raise webob.exc.HTTPNotFound(explanation=e.msg)
except exception.Duplicate as e:
raise webob.exc.HTTPConflict(explanation=e.msg)
except Exception as e:
LOG.error(encodeutils.exception_to_unicode(e))
raise webob.exc.HTTPInternalServerError()
return ResourceTypeAssociation.to_wsme_model(new_resource_type)
def delete(self, req, namespace, resource_type):
rs_type_repo = self.gateway.get_metadef_resource_type_repo(req.context)
rs_type_repo = self.gateway.get_metadef_resource_type_repo(
req.context, authorization_layer=False)
ns_repo = self.gateway.get_metadef_namespace_repo(
req.context, authorization_layer=False)
try:
namespace_obj = ns_repo.get(namespace)
except (exception.Forbidden, exception.NotFound):
# NOTE (abhishekk): Returning 404 Not Found as the
# namespace is outside of this user's project
msg = _("Namespace %s not found") % namespace
raise webob.exc.HTTPNotFound(explanation=msg)
try:
# NOTE(abhishekk): Metadef resource type is created for Metadef
# namespaces. Here we are just checking if user is authorized
# to delete metadef resource types or not.
api_policy.MetadefAPIPolicy(
req.context,
md_resource=namespace_obj,
enforcer=self.policy
).remove_metadef_resource_type_association()
filters = {}
found = False
filters['namespace'] = namespace
@ -133,9 +199,6 @@ class ResourceTypeController(object):
"delete") % {'resourcetype': resource_type})
LOG.error(msg)
raise webob.exc.HTTPNotFound(explanation=msg)
except Exception as e:
LOG.error(encodeutils.exception_to_unicode(e))
raise webob.exc.HTTPInternalServerError()
class RequestDeserializer(wsgi.JSONRequestDeserializer):

View File

@ -257,6 +257,12 @@ class MetadefAPIPolicy(APIPolicyBase):
def list_metadef_resource_types(self):
self._enforce('list_metadef_resource_types')
def get_metadef_resource_type(self):
self._enforce('get_metadef_resource_type')
def remove_metadef_resource_type_association(self):
self._enforce('remove_metadef_resource_type_association')
class MemberAPIPolicy(APIPolicyBase):
def __init__(self, context, image, target=None, enforcer=None):

View File

@ -0,0 +1,250 @@
# Copyright 2021 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
import oslo_policy.policy
from glance.api import policy
from glance.tests import functional
NAME_SPACE1 = {
"namespace": "MyNamespace",
"display_name": "My User Friendly Namespace",
"description": "My description"
}
RESOURCETYPE_1 = {
"name": "MyResourceType",
"prefix": "prefix_",
"properties_target": "temp"
}
RESOURCETYPE_2 = {
"name": "MySecondResourceType",
"prefix": "temp_prefix_",
"properties_target": "temp_2"
}
class TestMetadefResourceTypesPolicy(functional.SynchronousAPIBase):
def setUp(self):
super(TestMetadefResourceTypesPolicy, self).setUp()
self.policy = policy.Enforcer(suppress_deprecation_warnings=True)
def load_data(self, create_resourcetypes=False):
path = '/v2/metadefs/namespaces'
md_resource = self._create_metadef_resource(path=path,
data=NAME_SPACE1)
self.assertEqual('MyNamespace', md_resource['namespace'])
if create_resourcetypes:
namespace = md_resource['namespace']
path = '/v2/metadefs/namespaces/%s/resource_types' % namespace
for resource in [RESOURCETYPE_1, RESOURCETYPE_2]:
md_resource = self._create_metadef_resource(path=path,
data=resource)
self.assertEqual(resource['name'], md_resource['name'])
def set_policy_rules(self, rules):
self.policy.set_rules(
oslo_policy.policy.Rules.from_dict(rules),
overwrite=True)
def start_server(self):
with mock.patch.object(policy, 'Enforcer') as mock_enf:
mock_enf.return_value = self.policy
super(TestMetadefResourceTypesPolicy, self).start_server()
def _verify_forbidden_converted_to_not_found(self, path, method,
json=None):
# Note for other reviewers, these tests runs by default using
# admin role, to test this scenario we need private namespace
# of current project to be accessed by other projects non-admin
# user.
headers = self._headers({
'X-Tenant-Id': 'fake-tenant-id',
'X-Roles': 'member',
})
resp = self.api_request(method, path, headers=headers, json=json)
self.assertEqual(404, resp.status_code)
def test_namespace_resourcetypes_list_basic(self):
self.start_server()
# Create namespace and resourcetypes
self.load_data(create_resourcetypes=True)
# First make sure list resourcetypes works with default policy
namespace = NAME_SPACE1['namespace']
path = '/v2/metadefs/namespaces/%s/resource_types' % namespace
resp = self.api_get(path)
md_resource = resp.json
self.assertEqual(2, len(md_resource['resource_type_associations']))
# Now disable list_metadef_resource_types permissions and make
# sure any other attempts fail
self.set_policy_rules({
'list_metadef_resource_types': '!',
'get_metadef_namespace': '@',
})
resp = self.api_get(path)
self.assertEqual(403, resp.status_code)
# Now disable both permissions and make sure you will get
# 404 Not Found
self.set_policy_rules({
'list_metadef_resource_types': '!',
'get_metadef_namespace': '!',
})
resp = self.api_get(path)
# Note for reviewers, this causes our "check get if list fails"
# logic to return 404 as we expect, but not related to the latest
# rev that checks the namespace get operation first.
self.assertEqual(404, resp.status_code)
# Now enable list_metadef_resource_types and disable
# get_metadef_resource_type permission to make sure that you will get
# empty list as a response
self.set_policy_rules({
'list_metadef_resource_types': '@',
'get_metadef_resource_type': '!',
'get_metadef_namespace': '@',
})
resp = self.api_get(path)
md_resource = resp.json
self.assertEqual(0, len(md_resource['resource_type_associations']))
# Ensure accessing non visible namespace will catch 403 and
# return 404 to user
self.set_policy_rules({
'list_metadef_resource_types': '@',
'get_metadef_resource_type': '@',
'get_metadef_namespace': '@',
})
self._verify_forbidden_converted_to_not_found(path, 'GET')
def test_resourcetypes_list_basic(self):
self.start_server()
# Create namespace and resourcetypes
self.load_data(create_resourcetypes=True)
# First make sure list resourcetypes works with default policy
path = '/v2/metadefs/resource_types'
resp = self.api_get(path)
md_resource = resp.json
# NOTE(abhishekk): /v2/metadefs/resource_types returns list which
# contains all resource_types in a dictionary, so the length will
# always be 1 here.
self.assertEqual(1, len(md_resource))
# Now disable get_metadef_resource_type permissions and make
# sure any other attempts fail
self.set_policy_rules({
'list_metadef_resource_types': '!',
'get_metadef_namespace': '@',
})
resp = self.api_get(path)
self.assertEqual(403, resp.status_code)
def test_resourcetype_create_basic(self):
self.start_server()
# Create namespace
self.load_data()
# First make sure create resourcetype works with default policy
namespace = NAME_SPACE1['namespace']
path = '/v2/metadefs/namespaces/%s/resource_types' % namespace
md_resource = self._create_metadef_resource(path=path,
data=RESOURCETYPE_1)
self.assertEqual('MyResourceType', md_resource['name'])
# Now disable add_metadef_resource_type_association permissions
# and make sure any other attempts fail
self.set_policy_rules({
'add_metadef_resource_type_association': '!',
'get_metadef_namespace': '@',
})
resp = self.api_post(path, json=RESOURCETYPE_2)
self.assertEqual(403, resp.status_code)
# Now disable both permissions and make sure you will get
# 404 Not Found
self.set_policy_rules({
'add_metadef_resource_type_association': '!',
'get_metadef_namespace': '!',
})
resp = self.api_post(path, json=RESOURCETYPE_2)
# Note for reviewers, this causes our "check get if create fails"
# logic to return 404 as we expect, but not related to the latest
# rev that checks the namespace get operation first.
self.assertEqual(404, resp.status_code)
# Ensure accessing non visible namespace will catch 403 and
# return 404 to user
self.set_policy_rules({
'add_metadef_resource_type_association': '@',
'get_metadef_namespace': '@',
})
self._verify_forbidden_converted_to_not_found(path, 'POST',
json=RESOURCETYPE_2)
def test_object_delete_basic(self):
self.start_server()
# Create namespace and objects
self.load_data(create_resourcetypes=True)
# Now ensure you are able to delete the resource_types
path = '/v2/metadefs/namespaces/%s/resource_types/%s' % (
NAME_SPACE1['namespace'], RESOURCETYPE_1['name'])
resp = self.api_delete(path)
self.assertEqual(204, resp.status_code)
# Verify that resource_type is deleted
namespace = NAME_SPACE1['namespace']
path = '/v2/metadefs/namespaces/%s/resource_types' % namespace
resp = self.api_get(path)
md_resource = resp.json
# assert namespace has only one resource type association
self.assertEqual(1, len(md_resource['resource_type_associations']))
# assert deleted association is not present in response
for resource in md_resource['resource_type_associations']:
self.assertNotEqual(RESOURCETYPE_1['name'], resource['name'])
# Now disable remove_metadef_resource_type_association permissions
# and make sure any other attempts fail
path = '/v2/metadefs/namespaces/%s/resource_types/%s' % (
NAME_SPACE1['namespace'], RESOURCETYPE_2['name'])
self.set_policy_rules({
'remove_metadef_resource_type_association': '!',
'get_metadef_namespace': '@',
})
resp = self.api_delete(path)
self.assertEqual(403, resp.status_code)
# Now disable both permissions and make sure you will get
# 404 Not Found
self.set_policy_rules({
'remove_metadef_resource_type_association': '!',
'get_metadef_namespace': '!',
})
resp = self.api_delete(path)
# Note for reviewers, this causes our "check get if delete fails"
# logic to return 404 as we expect, but not related to the latest
# rev that checks the namespace get operation first.
self.assertEqual(404, resp.status_code)
# Ensure accessing non visible namespace will catch 403 and
# return 404 to user
self.set_policy_rules({
'remove_metadef_resource_type_association': '@',
'get_metadef_namespace': '@',
})
self._verify_forbidden_converted_to_not_found(path, 'DELETE')

View File

@ -1687,7 +1687,7 @@ class TestMetadefsControllers(base.IsolatedUnitTest):
def test_resource_type_show_non_visible(self):
request = unit_test_utils.get_fake_request()
self.assertRaises(webob.exc.HTTPForbidden, self.rt_controller.show,
self.assertRaises(webob.exc.HTTPNotFound, self.rt_controller.show,
request, NAMESPACE2)
def test_resource_type_show_non_visible_admin(self):
@ -1754,7 +1754,7 @@ class TestMetadefsControllers(base.IsolatedUnitTest):
def test_resource_type_association_delete_non_visible(self):
request = unit_test_utils.get_fake_request(tenant=TENANT3)
self.assertRaises(webob.exc.HTTPForbidden, self.rt_controller.delete,
self.assertRaises(webob.exc.HTTPNotFound, self.rt_controller.delete,
request, NAMESPACE1, RESOURCE_TYPE1)
self.assertNotificationsLog([])
@ -1827,7 +1827,7 @@ class TestMetadefsControllers(base.IsolatedUnitTest):
rt = resource_types.ResourceTypeAssociation()
rt.name = RESOURCE_TYPE2
rt.prefix = 'pref'
self.assertRaises(webob.exc.HTTPForbidden, self.rt_controller.create,
self.assertRaises(webob.exc.HTTPNotFound, self.rt_controller.create,
request, rt, NAMESPACE1)
self.assertNotificationsLog([])

View File

@ -413,6 +413,16 @@ class TestMetadefAPIPolicy(APIPolicyBase):
mock.call(mock.ANY, 'modify_metadef_namespace', mock.ANY),
mock.call(mock.ANY, 'get_metadef_namespace', mock.ANY)])
def test_get_metadef_resource_type(self):
self.policy.get_metadef_resource_type()
self.enforcer.enforce.assert_called_once_with(
self.context, 'get_metadef_resource_type', mock.ANY)
def test_remove_metadef_resource_type_association(self):
self.policy.remove_metadef_resource_type_association()
self.enforcer.enforce.assert_called_once_with(
self.context, 'remove_metadef_resource_type_association', mock.ANY)
class TestMemberAPIPolicy(utils.BaseTestCase):
def setUp(self):