Move metadef namespace policy checks in the API

This change also includes change in index method for 'GET' all
namespaces to not include properties, objects, tags, associations
etc. in the response if policies of either one is disabled.

The MetadefAPIPolicy object will raise Forbidden for cases where an
operation is forbidden, but the namespace would be otherwise visible to
the user, and NotFound otherwise to obscure the existence of namespaces
that the user can not otherwise see.

Partially implements: blueprint policy-refactor

Change-Id: I59cd3bbd98065457bf0c9cfaf6888b7f56f7617c
Depends-On: https://review.opendev.org/c/openstack/grenade/+/803317
This commit is contained in:
Abhishek Kekane
2021-07-16 19:44:29 +00:00
parent d9b8023410
commit 99331d7efa
6 changed files with 1182 additions and 59 deletions

View File

@@ -30,6 +30,7 @@ from glance.api.v2.model.metadef_object import MetadefObject
from glance.api.v2.model.metadef_property_type import PropertyType
from glance.api.v2.model.metadef_resource_type import ResourceTypeAssociation
from glance.api.v2.model.metadef_tag import MetadefTag
from glance.api.v2 import policy as api_policy
from glance.common import exception
from glance.common import utils
from glance.common import wsgi
@@ -60,7 +61,21 @@ class NamespaceController(object):
def index(self, req, marker=None, limit=None, sort_key='created_at',
sort_dir='desc', filters=None):
try:
ns_repo = self.gateway.get_metadef_namespace_repo(req.context)
ns_repo = self.gateway.get_metadef_namespace_repo(
req.context, authorization_layer=False)
policy_check = api_policy.MetadefAPIPolicy(
req.context,
enforcer=self.policy)
# NOTE(abhishekk): This is just a "do you have permission to
# list namespace" check. Each namespace is checked against
# get_metadef_namespace below.
policy_check.get_metadef_namespaces()
# NOTE(abhishekk): We also need to fetch resource_types associated
# with namespaces, so better to check we have permission for the
# same in advance.
policy_check.list_metadef_resource_types()
# Get namespace id
if marker:
@@ -70,15 +85,25 @@ class NamespaceController(object):
database_ns_list = ns_repo.list(
marker=marker, limit=limit, sort_key=sort_key,
sort_dir=sort_dir, filters=filters)
for db_namespace in database_ns_list:
ns_list = [
ns for ns in database_ns_list if api_policy.MetadefAPIPolicy(
req.context, md_resource=ns, enforcer=self.policy).check(
'get_metadef_namespace')]
rs_repo = (
self.gateway.get_metadef_resource_type_repo(
req.context, authorization_layer=False))
for db_namespace in ns_list:
# Get resource type associations
filters = dict()
filters['namespace'] = db_namespace.namespace
rs_repo = (
self.gateway.get_metadef_resource_type_repo(req.context))
repo_rs_type_list = rs_repo.list(filters=filters)
resource_type_list = [ResourceTypeAssociation.to_wsme_model(
resource_type) for resource_type in repo_rs_type_list]
resource_type_list = [
ResourceTypeAssociation.to_wsme_model(
resource_type
) for resource_type in repo_rs_type_list]
if resource_type_list:
db_namespace.resource_type_associations = (
resource_type_list)
@@ -86,11 +111,11 @@ class NamespaceController(object):
namespace_list = [Namespace.to_wsme_model(
db_namespace,
get_namespace_href(db_namespace),
self.ns_schema_link) for db_namespace in database_ns_list]
self.ns_schema_link) for db_namespace in ns_list]
namespaces = Namespaces()
namespaces.namespaces = namespace_list
if len(namespace_list) != 0 and len(namespace_list) == limit:
namespaces.next = namespace_list[-1].namespace
namespaces.next = ns_list[-1].namespace
except exception.Forbidden as e:
LOG.debug("User not permitted to retrieve metadata namespaces "
@@ -98,9 +123,6 @@ class NamespaceController(object):
raise webob.exc.HTTPForbidden(explanation=e.msg)
except exception.NotFound as e:
raise webob.exc.HTTPNotFound(explanation=e.msg)
except Exception as e:
LOG.error(encodeutils.exception_to_unicode(e))
raise webob.exc.HTTPInternalServerError()
return namespaces
@utils.mutating
@@ -109,18 +131,42 @@ class NamespaceController(object):
namespace_created = False
# Create Namespace
ns_factory = self.gateway.get_metadef_namespace_factory(
req.context)
ns_repo = self.gateway.get_metadef_namespace_repo(req.context)
new_namespace = ns_factory.new_namespace(**namespace.to_dict())
req.context, authorization_layer=False)
ns_repo = self.gateway.get_metadef_namespace_repo(
req.context, authorization_layer=False)
# NOTE(abhishekk): Here we are going to check if user is authorized
# to create namespace, resource_types, objects, properties etc.
policy_check = api_policy.MetadefAPIPolicy(
req.context, enforcer=self.policy)
policy_check.add_metadef_namespace()
if namespace.resource_type_associations:
policy_check.add_metadef_resource_type_association()
if namespace.objects:
policy_check.add_metadef_object()
if namespace.properties:
policy_check.add_metadef_property()
if namespace.tags:
policy_check.add_metadef_tag()
# NOTE(abhishekk): As we are getting rid of auth layer, this
# is the place where we should add owner if it is not specified
# in request.
kwargs = namespace.to_dict()
if 'owner' not in kwargs:
kwargs.update({'owner': req.context.owner})
new_namespace = ns_factory.new_namespace(**kwargs)
ns_repo.add(new_namespace)
namespace_created = True
# Create Resource Types
if namespace.resource_type_associations:
rs_factory = (self.gateway.get_metadef_resource_type_factory(
req.context))
req.context, authorization_layer=False))
rs_repo = self.gateway.get_metadef_resource_type_repo(
req.context)
req.context, authorization_layer=False)
for resource_type in namespace.resource_type_associations:
new_resource = rs_factory.new_resource_type(
namespace=namespace.namespace,
@@ -130,9 +176,9 @@ class NamespaceController(object):
# Create Objects
if namespace.objects:
object_factory = self.gateway.get_metadef_object_factory(
req.context)
req.context, authorization_layer=False)
object_repo = self.gateway.get_metadef_object_repo(
req.context)
req.context, authorization_layer=False)
for metadata_object in namespace.objects:
new_meta_object = object_factory.new_object(
namespace=namespace.namespace,
@@ -142,8 +188,9 @@ class NamespaceController(object):
# Create Tags
if namespace.tags:
tag_factory = self.gateway.get_metadef_tag_factory(
req.context)
tag_repo = self.gateway.get_metadef_tag_repo(req.context)
req.context, authorization_layer=False)
tag_repo = self.gateway.get_metadef_tag_repo(
req.context, authorization_layer=False)
for metadata_tag in namespace.tags:
new_meta_tag = tag_factory.new_tag(
namespace=namespace.namespace,
@@ -153,9 +200,9 @@ class NamespaceController(object):
# Create Namespace Properties
if namespace.properties:
prop_factory = (self.gateway.get_metadef_property_factory(
req.context))
req.context, authorization_layer=False))
prop_repo = self.gateway.get_metadef_property_repo(
req.context)
req.context, authorization_layer=False)
for (name, value) in namespace.properties.items():
new_property_type = (
prop_factory.new_namespace_property(
@@ -177,9 +224,6 @@ class NamespaceController(object):
except exception.Duplicate as e:
self._cleanup_namespace(ns_repo, namespace, namespace_created)
raise webob.exc.HTTPConflict(explanation=e.msg)
except Exception as e:
LOG.error(encodeutils.exception_to_unicode(e))
raise webob.exc.HTTPInternalServerError()
# Return the user namespace as we don't expose the id to user
new_namespace.properties = namespace.properties
@@ -216,8 +260,30 @@ class NamespaceController(object):
def show(self, req, namespace, filters=None):
try:
# Get namespace
ns_repo = self.gateway.get_metadef_namespace_repo(req.context)
namespace_obj = ns_repo.get(namespace)
ns_repo = self.gateway.get_metadef_namespace_repo(
req.context, authorization_layer=False)
try:
namespace_obj = ns_repo.get(namespace)
policy_check = api_policy.MetadefAPIPolicy(
req.context,
md_resource=namespace_obj,
enforcer=self.policy)
policy_check.get_metadef_namespace()
except webob.exc.HTTPForbidden:
LOG.debug("User not permitted to show namespace '%s'",
namespace)
# NOTE (abhishekk): Returning 404 Not Found as the
# namespace is outside of this user's project
raise webob.exc.HTTPNotFound()
# NOTE(abhishekk): We also need to fetch resource_types, objects,
# properties, tags associated with namespace, so better to check
# whether user has permissions for the same.
policy_check.list_metadef_resource_types()
policy_check.get_metadef_objects()
policy_check.get_metadef_properties()
policy_check.get_metadef_tags()
namespace_detail = Namespace.to_wsme_model(
namespace_obj,
get_namespace_href(namespace_obj),
@@ -226,7 +292,8 @@ class NamespaceController(object):
ns_filters['namespace'] = namespace
# Get objects
object_repo = self.gateway.get_metadef_object_repo(req.context)
object_repo = self.gateway.get_metadef_object_repo(
req.context, authorization_layer=False)
db_metaobject_list = object_repo.list(filters=ns_filters)
object_list = [MetadefObject.to_wsme_model(
db_metaobject,
@@ -236,7 +303,8 @@ class NamespaceController(object):
namespace_detail.objects = object_list
# Get resource type associations
rs_repo = self.gateway.get_metadef_resource_type_repo(req.context)
rs_repo = self.gateway.get_metadef_resource_type_repo(
req.context, authorization_layer=False)
db_resource_type_list = rs_repo.list(filters=ns_filters)
resource_type_list = [ResourceTypeAssociation.to_wsme_model(
resource_type) for resource_type in db_resource_type_list]
@@ -245,7 +313,8 @@ class NamespaceController(object):
resource_type_list)
# Get properties
prop_repo = self.gateway.get_metadef_property_repo(req.context)
prop_repo = self.gateway.get_metadef_property_repo(
req.context, authorization_layer=False)
db_properties = prop_repo.list(filters=ns_filters)
property_list = Namespace.to_model_properties(db_properties)
if property_list:
@@ -256,7 +325,8 @@ class NamespaceController(object):
namespace_detail, filters['resource_type'])
# Get tags
tag_repo = self.gateway.get_metadef_tag_repo(req.context)
tag_repo = self.gateway.get_metadef_tag_repo(
req.context, authorization_layer=False)
db_metatag_list = tag_repo.list(filters=ns_filters)
tag_list = [MetadefTag(**{'name': db_metatag.name})
for db_metatag in db_metatag_list]
@@ -269,15 +339,27 @@ class NamespaceController(object):
raise webob.exc.HTTPForbidden(explanation=e.msg)
except exception.NotFound as e:
raise webob.exc.HTTPNotFound(explanation=e.msg)
except Exception as e:
LOG.error(encodeutils.exception_to_unicode(e))
raise webob.exc.HTTPInternalServerError()
return namespace_detail
def update(self, req, user_ns, namespace):
namespace_repo = self.gateway.get_metadef_namespace_repo(req.context)
namespace_repo = self.gateway.get_metadef_namespace_repo(
req.context, authorization_layer=False)
try:
ns_obj = namespace_repo.get(namespace)
except (exception.Forbidden, exception.NotFound):
# NOTE (abhishekk): Returning 404 Not Found as the
# namespace is outside of this user's project
msg = _("Namespace %s not found") % namespace
raise webob.exc.HTTPNotFound(explanation=msg)
try:
# NOTE(abhishekk): Here we are just checking if use is authorized
# to modify the namespace or not
api_policy.MetadefAPIPolicy(
req.context,
md_resource=ns_obj,
enforcer=self.policy).modify_metadef_namespace()
ns_obj._old_namespace = ns_obj.namespace
ns_obj.namespace = wsme_utils._get_value(user_ns.namespace)
ns_obj.display_name = wsme_utils._get_value(user_ns.display_name)
@@ -303,18 +385,29 @@ class NamespaceController(object):
raise webob.exc.HTTPNotFound(explanation=e.msg)
except exception.Duplicate as e:
raise webob.exc.HTTPConflict(explanation=e.msg)
except Exception as e:
LOG.error(encodeutils.exception_to_unicode(e))
raise webob.exc.HTTPInternalServerError()
return Namespace.to_wsme_model(updated_namespace,
get_namespace_href(updated_namespace),
self.ns_schema_link)
def delete(self, req, namespace):
namespace_repo = self.gateway.get_metadef_namespace_repo(req.context)
namespace_repo = self.gateway.get_metadef_namespace_repo(
req.context, authorization_layer=False)
try:
namespace_obj = namespace_repo.get(namespace)
except (exception.Forbidden, exception.NotFound):
# NOTE (abhishekk): Returning 404 Not Found as the
# namespace is outside of this user's project
msg = _("Namespace %s not found") % namespace
raise webob.exc.HTTPNotFound(explanation=msg)
try:
# NOTE(abhishekk): Here we are just checking user is authorized to
# delete the namespace or not.
api_policy.MetadefAPIPolicy(
req.context,
md_resource=namespace_obj,
enforcer=self.policy).delete_metadef_namespace()
namespace_obj.delete()
namespace_repo.remove(namespace_obj)
except exception.Forbidden as e:
@@ -323,14 +416,27 @@ class NamespaceController(object):
raise webob.exc.HTTPForbidden(explanation=e.msg)
except exception.NotFound as e:
raise webob.exc.HTTPNotFound(explanation=e.msg)
except Exception as e:
LOG.error(encodeutils.exception_to_unicode(e))
raise webob.exc.HTTPInternalServerError()
def delete_objects(self, req, namespace):
ns_repo = self.gateway.get_metadef_namespace_repo(req.context)
ns_repo = self.gateway.get_metadef_namespace_repo(
req.context, authorization_layer=False)
try:
namespace_obj = ns_repo.get(namespace)
except (exception.Forbidden, exception.NotFound):
# NOTE (abhishekk): Returning 404 Not Found as the
# namespace is outside of this user's project
msg = _("Namespace %s not found") % namespace
raise webob.exc.HTTPNotFound(explanation=msg)
try:
# NOTE(abhishekk): This call currently checks whether user
# has permission to delete the namespace or not before deleting
# the objects associated with it.
api_policy.MetadefAPIPolicy(
req.context,
md_resource=namespace_obj,
enforcer=self.policy).delete_metadef_namespace()
namespace_obj.delete()
ns_repo.remove_objects(namespace_obj)
except exception.Forbidden as e:
@@ -339,14 +445,31 @@ class NamespaceController(object):
raise webob.exc.HTTPForbidden(explanation=e.msg)
except exception.NotFound as e:
raise webob.exc.HTTPNotFound(explanation=e.msg)
except Exception as e:
LOG.error(encodeutils.exception_to_unicode(e))
raise webob.exc.HTTPInternalServerError()
def delete_tags(self, req, namespace):
ns_repo = self.gateway.get_metadef_namespace_repo(req.context)
ns_repo = self.gateway.get_metadef_namespace_repo(
req.context, authorization_layer=False)
try:
namespace_obj = ns_repo.get(namespace)
except (exception.Forbidden, exception.NotFound):
# NOTE (abhishekk): Returning 404 Not Found as the
# namespace is outside of this user's project
msg = _("Namespace %s not found") % namespace
raise webob.exc.HTTPNotFound(explanation=msg)
try:
# NOTE(abhishekk): This call currently checks whether user
# has permission to delete the namespace or not before deleting
# the objects associated with it.
policy_check = api_policy.MetadefAPIPolicy(
req.context,
md_resource=namespace_obj,
enforcer=self.policy)
policy_check.delete_metadef_namespace()
# NOTE(abhishekk): This call checks whether user
# has permission to delete the tags or not.
policy_check.delete_metadef_tags()
namespace_obj.delete()
ns_repo.remove_tags(namespace_obj)
except exception.Forbidden as e:
@@ -355,14 +478,26 @@ class NamespaceController(object):
raise webob.exc.HTTPForbidden(explanation=e.msg)
except exception.NotFound as e:
raise webob.exc.HTTPNotFound(explanation=e.msg)
except Exception as e:
LOG.error(encodeutils.exception_to_unicode(e))
raise webob.exc.HTTPInternalServerError()
def delete_properties(self, req, namespace):
ns_repo = self.gateway.get_metadef_namespace_repo(req.context)
ns_repo = self.gateway.get_metadef_namespace_repo(
req.context, authorization_layer=False)
try:
namespace_obj = ns_repo.get(namespace)
except (exception.Forbidden, exception.NotFound):
# NOTE (abhishekk): Returning 404 Not Found as the
# namespace is outside of this user's project
msg = _("Namespace %s not found") % namespace
raise webob.exc.HTTPNotFound(explanation=msg)
try:
# NOTE(abhishekk): This call currently checks whether user
# has permission to delete the namespace or not before deleting
# the objects associated with it.
api_policy.MetadefAPIPolicy(
req.context,
md_resource=namespace_obj,
enforcer=self.policy).delete_metadef_namespace()
namespace_obj.delete()
ns_repo.remove_properties(namespace_obj)
except exception.Forbidden as e:
@@ -371,9 +506,6 @@ class NamespaceController(object):
raise webob.exc.HTTPForbidden(explanation=e.msg)
except exception.NotFound as e:
raise webob.exc.HTTPNotFound(explanation=e.msg)
except Exception as e:
LOG.error(encodeutils.exception_to_unicode(e))
raise webob.exc.HTTPInternalServerError()
def _prefix_property_name(self, namespace_detail, user_resource_type):
prefix = None

View File

@@ -148,3 +148,92 @@ class ImageAPIPolicy(APIPolicyBase):
# replaces the legacy policy.
if not CONF.enforce_secure_rbac:
check_is_image_mutable(self._context, self._image)
class MetadefAPIPolicy(APIPolicyBase):
def __init__(self, context, md_resource=None, target=None, enforcer=None):
self._context = context
self._md_resource = md_resource
if not target:
self._target = self._build_target()
else:
self._target = target
self.enforcer = enforcer or policy.Enforcer()
super(MetadefAPIPolicy, self).__init__(context, target=self._target,
enforcer=self.enforcer)
def _build_target(self):
target = {
"project_id": self._context.project_id
}
if self._md_resource:
target['project_id'] = self._md_resource.owner
target['visibility'] = self._md_resource.visibility
return target
def _enforce(self, rule_name):
"""Translate Forbidden->NotFound for images."""
try:
super(MetadefAPIPolicy, self)._enforce(rule_name)
except webob.exc.HTTPForbidden:
# If we are checking get_metadef_namespace, then Forbidden means
# the user cannot see this namespace, so raise NotFound. If we are
# checking anything else and get Forbidden, then raise
# NotFound in that case as well to avoid exposing namespaces
# the user can not see, while preserving the Forbidden
# behavior for the ones they can see.
if rule_name == 'get_metadef_namespace' or not self.check(
'get_metadef_namespace'):
raise webob.exc.HTTPNotFound()
raise
def check(self, name, *args):
try:
return super(MetadefAPIPolicy, self).check(name, *args)
except webob.exc.HTTPNotFound:
# NOTE(danms): Since our _enforce can raise NotFound, that
# too means a False check response.
return False
def get_metadef_namespace(self):
self._enforce('get_metadef_namespace')
def get_metadef_namespaces(self):
self._enforce('get_metadef_namespaces')
def add_metadef_namespace(self):
self._enforce('add_metadef_namespace')
def modify_metadef_namespace(self):
self._enforce('modify_metadef_namespace')
def delete_metadef_namespace(self):
self._enforce('delete_metadef_namespace')
def get_metadef_objects(self):
self._enforce('get_metadef_objects')
def add_metadef_object(self):
self._enforce('add_metadef_object')
def add_metadef_tag(self):
self._enforce('add_metadef_tag')
def get_metadef_tags(self):
self._enforce('get_metadef_tags')
def delete_metadef_tags(self):
self._enforce('delete_metadef_tags')
def add_metadef_property(self):
self._enforce('add_metadef_property')
def get_metadef_properties(self):
self._enforce('get_metadef_properties')
def add_metadef_resource_type_association(self):
self._enforce('add_metadef_resource_type_association')
def list_metadef_resource_types(self):
self._enforce('list_metadef_resource_types')

View File

@@ -1828,3 +1828,11 @@ class SynchronousAPIBase(test_utils.BaseTestCase):
json={'name': 'foo',
'container_format': 'bare',
'disk_format': 'raw'})
def _create_metadef_resource(self, path=None, data=None,
expected_code=201):
resp = self.api_post(path,
json=data)
md_resource = jsonutils.loads(resp.text)
self.assertEqual(expected_code, resp.status_code)
return md_resource

View File

@@ -0,0 +1,765 @@
# Copyright 2021 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
import oslo_policy.policy
from glance.api import policy
from glance.tests import functional
GLOBAL_NAMESPACE_DATA = {
"namespace": "MyNamespace",
"display_name": "My User Friendly Namespace",
"description": "My description",
"resource_type_associations": [{
"name": "MyResourceType",
"prefix": "prefix_",
"properties_target": "temp"
}],
"objects": [{
"name": "MyObject",
"description": "My object for My namespace",
"properties": {
"test_property": {
"title": "test_property",
"description": "Test property for My object",
"type": "string"
},
}
}],
"tags": [{
"name": "MyTag",
}],
"properties": {
"TestProperty": {
"title": "MyTestProperty",
"description": "Test Property for My namespace",
"type": "string"
},
},
}
NAME_SPACE1 = {
"namespace": "MyNamespace",
"display_name": "My User Friendly Namespace",
"description": "My description"
}
NAME_SPACE2 = {
"namespace": "MySecondNamespace",
"display_name": "My User Friendly Namespace",
"description": "My description"
}
class TestMetadefNamespacesPolicy(functional.SynchronousAPIBase):
def setUp(self):
super(TestMetadefNamespacesPolicy, self).setUp()
self.policy = policy.Enforcer()
def set_policy_rules(self, rules):
self.policy.set_rules(
oslo_policy.policy.Rules.from_dict(rules),
overwrite=True)
def start_server(self):
with mock.patch.object(policy, 'Enforcer') as mock_enf:
mock_enf.return_value = self.policy
super(TestMetadefNamespacesPolicy, self).start_server()
def _verify_forbidden_converted_to_not_found(self, path, method,
json=None):
# Note for other reviewers, these tests runs by default using
# admin role, to test this scenario we need private namespace
# of current project to be accessed by other projects non-admin
# user.
headers = self._headers({
'X-Tenant-Id': 'fake-tenant-id',
'X-Roles': 'member',
})
resp = self.api_request(method, path, headers=headers, json=json)
self.assertEqual(404, resp.status_code)
def test_namespace_list_basic(self):
self.start_server()
# First make sure create private namespace works with default policy
path = '/v2/metadefs/namespaces'
md_resource = self._create_metadef_resource(path=path,
data=NAME_SPACE1)
self.assertEqual('MyNamespace', md_resource['namespace'])
# First make sure create public namespace works with default policy
path = '/v2/metadefs/namespaces'
NAME_SPACE2["visibility"] = 'public'
md_resource = self._create_metadef_resource(path=path,
data=NAME_SPACE2)
self.assertEqual('MySecondNamespace', md_resource['namespace'])
# Now make sure 'get_metadef_namespaces' allows user to get all the
# namespaces
resp = self.api_get(path)
md_resource = resp.json
self.assertEqual(2, len(md_resource['namespaces']))
# Now disable get_metadef_namespaces permissions and make sure any
# other attempts fail
self.set_policy_rules({
'get_metadef_namespaces': '!',
'get_metadef_namespace': '@',
})
resp = self.api_get(path)
self.assertEqual(403, resp.status_code)
def test_namespace_list_with_resource_types(self):
self.start_server()
# First make sure create namespace works with default policy
path = '/v2/metadefs/namespaces'
md_resource = self._create_metadef_resource(path=path,
data=GLOBAL_NAMESPACE_DATA)
self.assertEqual('MyNamespace', md_resource['namespace'])
# Now make sure 'get_metadef_namespaces' allows user to get all the
# namespaces with associated resource types
resp = self.api_get(path)
md_resource = resp.json
self.assertEqual(1, len(md_resource['namespaces']))
# Verify that response includes associated resource types as well
for namespace_obj in md_resource['namespaces']:
self.assertIn('resource_type_associations', namespace_obj)
# Now disable list_metadef_resource_types permissions and make sure
# you get forbidden response
self.set_policy_rules({
'get_metadef_namespaces': '@',
'get_metadef_namespace': '@',
'list_metadef_resource_types': '!'
})
resp = self.api_get(path)
self.assertEqual(403, resp.status_code)
# Now enable list_metadef_resource_types and get_metadef_namespaces
# permissions and disable get_metadef_namespace permission to make sure
# you will get empty list as a response
self.set_policy_rules({
'get_metadef_namespaces': '@',
'get_metadef_namespace': '!',
'list_metadef_resource_types': '@'
})
resp = self.api_get(path)
md_resource = resp.json
self.assertEqual(0, len(md_resource['namespaces']))
# Verify that response does not includes associated resource types
for namespace_obj in md_resource['namespaces']:
self.assertNotIn('resource_type_associations', namespace_obj)
def test_namespace_create_basic(self):
self.start_server()
# First make sure create namespace works with default policy
path = '/v2/metadefs/namespaces'
md_resource = self._create_metadef_resource(path=path,
data=NAME_SPACE1)
self.assertEqual('MyNamespace', md_resource['namespace'])
# Now disable add_metadef_namespace permissions and make sure any other
# attempts fail
self.set_policy_rules({
'add_metadef_namespace': '!',
'get_metadef_namespace': '@'
})
resp = self.api_post(path, json=NAME_SPACE2)
self.assertEqual(403, resp.status_code)
def test_namespace_create_with_resource_type_associations(self):
self.start_server()
# First make sure you can create namespace and resource type
# associations with default policy
path = '/v2/metadefs/namespaces'
data = {
"resource_type_associations": [{
"name": "MyResourceType",
"prefix": "prefix_",
"properties_target": "temp"
}],
}
data.update(NAME_SPACE1)
md_resource = self._create_metadef_resource(path=path,
data=data)
self.assertEqual('MyNamespace', md_resource['namespace'])
self.assertEqual(
'MyResourceType',
md_resource['resource_type_associations'][0]['name'])
# Now disable add_metadef_resource_type_association permissions and
# make sure that even you have permission to create namespace the
# request will fail
self.set_policy_rules({
'add_metadef_resource_type_association': '!',
'get_metadef_namespace': '@'
})
data.update(NAME_SPACE2)
resp = self.api_post(path, json=data)
self.assertEqual(403, resp.status_code)
def test_namespace_create_with_objects(self):
self.start_server()
# First make sure you can create namespace and objects
# with default policy
path = '/v2/metadefs/namespaces'
data = {
"objects": [{
"name": "MyObject",
"description": "My object for My namespace",
"properties": {
"test_property": {
"title": "test_property",
"description": "Test property for My object",
"type": "string"
},
}
}],
}
data.update(NAME_SPACE1)
md_resource = self._create_metadef_resource(path=path,
data=data)
self.assertEqual('MyNamespace', md_resource['namespace'])
self.assertEqual(
'MyObject',
md_resource['objects'][0]['name'])
# Now disable add_metadef_object permissions and
# make sure that even you have permission to create namespace the
# request will fail
self.set_policy_rules({
'add_metadef_object': '!',
'get_metadef_namespace': '@'
})
data.update(NAME_SPACE2)
resp = self.api_post(path, json=data)
self.assertEqual(403, resp.status_code)
def test_namespace_create_with_tags(self):
self.start_server()
# First make sure you can create namespace and tags
# with default policy
path = '/v2/metadefs/namespaces'
data = {
"tags": [{
"name": "MyTag",
}],
}
data.update(NAME_SPACE1)
md_resource = self._create_metadef_resource(path=path,
data=data)
self.assertEqual('MyNamespace', md_resource['namespace'])
self.assertEqual(
'MyTag',
md_resource['tags'][0]['name'])
# Now disable add_metadef_object permissions and
# make sure that even you have permission to create namespace the
# request will fail
data.update(NAME_SPACE2)
self.set_policy_rules({
'add_metadef_tag': '!',
'get_metadef_namespace': '@'
})
resp = self.api_post(path, json=data)
self.assertEqual(403, resp.status_code)
def test_namespace_create_with_properties(self):
self.start_server()
# First make sure you can create namespace and properties
# with default policy
path = '/v2/metadefs/namespaces'
data = {
"properties": {
"TestProperty": {
"title": "MyTestProperty",
"description": "Test Property for My namespace",
"type": "string"
},
}
}
data.update(NAME_SPACE1)
md_resource = self._create_metadef_resource(path=path,
data=data)
self.assertEqual('MyNamespace', md_resource['namespace'])
self.assertEqual(
'MyTestProperty',
md_resource['properties']['TestProperty']['title'])
# Now disable add_metadef_property permissions and
# make sure that even you have permission to create namespace the
# request will fail
data.update(NAME_SPACE2)
self.set_policy_rules({
'add_metadef_property': '!',
'get_metadef_namespace': '@'
})
resp = self.api_post(path, json=data)
self.assertEqual(403, resp.status_code)
def test_namespace_get_basic(self):
self.start_server()
# First make sure create namespace works with default policy
path = '/v2/metadefs/namespaces'
md_resource = self._create_metadef_resource(path=path,
data=GLOBAL_NAMESPACE_DATA)
self.assertEqual('MyNamespace', md_resource['namespace'])
# Now make sure get_metadef_namespace will return all associated
# resources in the response as every policy is open.
path = "/v2/metadefs/namespaces/%s" % md_resource['namespace']
resp = self.api_get(path)
md_resource = resp.json
self.assertEqual('MyNamespace', md_resource['namespace'])
self.assertIn('objects', md_resource)
self.assertIn('resource_type_associations', md_resource)
self.assertIn('tags', md_resource)
self.assertIn('properties', md_resource)
# Now disable get_metadef_namespace policy to ensure that you are
# forbidden to fulfill the request and get 404 not found
self.set_policy_rules({'get_metadef_namespace': '!'})
path = "/v2/metadefs/namespaces/%s" % md_resource['namespace']
resp = self.api_get(path)
self.assertEqual(404, resp.status_code)
# Now disable get_metadef_objects policy to ensure that you will
# get forbidden response
self.set_policy_rules({
'get_metadef_objects': '!',
'get_metadef_namespace': '@',
'list_metadef_resource_types': '@',
'get_metadef_properties': '@',
'get_metadef_tags': '@'
})
path = "/v2/metadefs/namespaces/%s" % md_resource['namespace']
resp = self.api_get(path)
self.assertEqual(403, resp.status_code)
# Now disable list_metadef_resource_types policy to ensure that you
# will get forbidden response
self.set_policy_rules({
'get_metadef_objects': '@',
'get_metadef_namespace': '@',
'list_metadef_resource_types': '!',
'get_metadef_properties': '@',
'get_metadef_tags': '@'
})
path = "/v2/metadefs/namespaces/%s" % md_resource['namespace']
resp = self.api_get(path)
self.assertEqual(403, resp.status_code)
# Now disable get_metadef_properties policy to ensure that you will
# ger forbidden response
self.set_policy_rules({
'get_metadef_objects': '@',
'get_metadef_namespace': '@',
'list_metadef_resource_types': '@',
'get_metadef_properties': '!',
'get_metadef_tags': '@'
})
path = "/v2/metadefs/namespaces/%s" % md_resource['namespace']
resp = self.api_get(path)
self.assertEqual(403, resp.status_code)
# Now disable get_metadef_tags policy to ensure that you will
# get forbidden response
self.set_policy_rules({
'get_metadef_objects': '@',
'get_metadef_namespace': '@',
'list_metadef_resource_types': '@',
'get_metadef_properties': '@',
'get_metadef_tags': '!'
})
path = "/v2/metadefs/namespaces/%s" % md_resource['namespace']
resp = self.api_get(path)
self.assertEqual(403, resp.status_code)
def test_namespace_update_basic(self):
self.start_server()
# First make sure create namespace works with default policy
path = '/v2/metadefs/namespaces'
md_resource = self._create_metadef_resource(path=path,
data=NAME_SPACE1)
self.assertEqual('MyNamespace', md_resource['namespace'])
self.assertEqual('private', md_resource['visibility'])
# Now ensure you are able to update the namespace
path = '/v2/metadefs/namespaces/%s' % md_resource['namespace']
data = {
'visibility': 'public',
'namespace': md_resource['namespace'],
}
resp = self.api_put(path, json=data)
md_resource = resp.json
self.assertEqual('MyNamespace', md_resource['namespace'])
self.assertEqual('public', md_resource['visibility'])
# Now disable modify_metadef_namespace permissions and make sure
# any other attempts results in 403 forbidden
self.set_policy_rules({
'modify_metadef_namespace': '!',
'get_metadef_namespace': '@',
})
resp = self.api_put(path, json=data)
self.assertEqual(403, resp.status_code)
# Now enable modify_metadef_namespace and get_metadef_namespace
# permissions and make sure modifying non existing results in
# 404 NotFound
self.set_policy_rules({
'modify_metadef_namespace': '@',
'get_metadef_namespace': '@',
})
path = '/v2/metadefs/namespaces/non-existing'
resp = self.api_put(path, json=data)
self.assertEqual(404, resp.status_code)
# Note for reviewers, this causes our "check get if modify fails"
# logic to return 404 as we expect, but not related to the latest
# rev that checks the namespace get operation first.
self.set_policy_rules({
'modify_metadef_namespace': '!',
'get_metadef_namespace': '!',
})
path = '/v2/metadefs/namespaces/%s' % md_resource['namespace']
resp = self.api_put(path, json=data)
self.assertEqual(404, resp.status_code)
# Ensure accessing non visible namespace will catch 403 and
# return 404 to user
self.set_policy_rules({
'modify_metadef_namespace': '@',
'get_metadef_namespace': '@',
})
# Reset visibility to private
# Now ensure you are able to update the namespace
path = '/v2/metadefs/namespaces/%s' % md_resource['namespace']
data = {
'visibility': 'private',
'namespace': md_resource['namespace'],
}
resp = self.api_put(path, json=data)
md_resource = resp.json
self.assertEqual('MyNamespace', md_resource['namespace'])
self.assertEqual('private', md_resource['visibility'])
# Now try to update the same namespace by different user
self._verify_forbidden_converted_to_not_found(path, 'PUT',
json=data)
def test_namespace_delete_basic(self):
def _create_private_namespace(fn_call, data):
path = '/v2/metadefs/namespaces'
return fn_call(path=path, data=data)
self.start_server()
# First make sure create namespace works with default policy
md_resource = _create_private_namespace(
self._create_metadef_resource, NAME_SPACE1)
self.assertEqual('MyNamespace', md_resource['namespace'])
# Now ensure you are able to delete the namespace
path = '/v2/metadefs/namespaces/%s' % md_resource['namespace']
resp = self.api_delete(path)
self.assertEqual(204, resp.status_code)
# Verify that namespace is deleted
path = "/v2/metadefs/namespaces/%s" % md_resource['namespace']
resp = self.api_get(path)
self.assertEqual(404, resp.status_code)
# Now create another namespace to check deletion is not allowed
md_resource = _create_private_namespace(
self._create_metadef_resource, NAME_SPACE2)
self.assertEqual('MySecondNamespace', md_resource['namespace'])
# Now disable delete_metadef_namespace permissions and make sure
# any other attempts fail
path = '/v2/metadefs/namespaces/%s' % md_resource['namespace']
self.set_policy_rules({
'delete_metadef_namespace': '!',
'get_metadef_namespace': '@'
})
resp = self.api_delete(path)
self.assertEqual(403, resp.status_code)
# Now enable both permissions and make sure deleting non
# exsting namespace returns 404 NotFound
self.set_policy_rules({
'delete_metadef_namespace': '@',
'get_metadef_namespace': '@'
})
path = '/v2/metadefs/namespaces/non-existing'
resp = self.api_delete(path)
self.assertEqual(404, resp.status_code)
# Note for reviewers, this causes our "check get if delete fails"
# logic to return 404 as we expect, but not related to the latest
# rev that checks the namespace get operation first.
self.set_policy_rules({
'delete_metadef_namespace': '!',
'get_metadef_namespace': '!',
})
path = '/v2/metadefs/namespaces/%s' % md_resource['namespace']
resp = self.api_delete(path)
self.assertEqual(404, resp.status_code)
# Ensure accessing non visible namespace will catch 403 and
# return 404 to user
self.set_policy_rules({
'delete_metadef_namespace': '@',
'get_metadef_namespace': '@',
})
self._verify_forbidden_converted_to_not_found(path, 'DELETE')
def test_namespace_delete_objects_basic(self):
self.start_server()
# First make sure create namespace and object works with default
# policy
path = '/v2/metadefs/namespaces'
md_resource = self._create_metadef_resource(path,
data=GLOBAL_NAMESPACE_DATA)
self.assertEqual('MyNamespace', md_resource['namespace'])
self.assertIn('objects', md_resource)
# Now ensure you are able to delete the object(s) from namespace
path = '/v2/metadefs/namespaces/%s/objects' % md_resource['namespace']
resp = self.api_delete(path)
self.assertEqual(204, resp.status_code)
# Verify that object from namespace is deleted but namespace is
# available
path = "/v2/metadefs/namespaces/%s" % md_resource['namespace']
resp = self.api_get(path)
md_resource = resp.json
self.assertNotIn('objects', md_resource)
self.assertEqual('MyNamespace', md_resource['namespace'])
# Now add another object to the namespace
path = '/v2/metadefs/namespaces/%s/objects' % md_resource['namespace']
data = {
"name": "MyObject",
"description": "My object for My namespace",
"properties": {
"test_property": {
"title": "test_property",
"description": "Test property for My object",
"type": "string"
},
}
}
md_object = self._create_metadef_resource(path, data=data)
self.assertEqual('MyObject', md_object['name'])
# Now disable delete_metadef_namespace permissions and make sure
# any other attempts to delete objects fails
path = '/v2/metadefs/namespaces/%s/objects' % md_resource['namespace']
self.set_policy_rules({
'delete_metadef_namespace': '!',
'get_metadef_namespace': '@'
})
resp = self.api_delete(path)
self.assertEqual(403, resp.status_code)
# Now enable both permissions and make sure
# deleting objects for non existing namespace returns 404 Not found
path = '/v2/metadefs/namespaces/non-existing/objects'
self.set_policy_rules({
'delete_metadef_namespace': '@',
'get_metaded_namespace': '@'
})
resp = self.api_delete(path)
self.assertEqual(404, resp.status_code)
# Note for reviewers, this causes our "check get if delete fails"
# logic to return 404 as we expect, but not related to the latest
# rev that checks the namespace get operation first.
self.set_policy_rules({
'delete_metadef_namespace': '!',
'get_metadef_namespace': '!',
})
path = '/v2/metadefs/namespaces/%s/objects' % md_resource['namespace']
resp = self.api_delete(path)
self.assertEqual(404, resp.status_code)
# Ensure accessing non visible namespace will catch 403 and
# return 404 to user
self.set_policy_rules({
'delete_metadef_namespace': '@',
'get_metadef_namespace': '@',
})
self._verify_forbidden_converted_to_not_found(path, 'DELETE')
def test_namespace_delete_properties_basic(self):
self.start_server()
# First make sure create namespace and properties works with default
# policy
path = '/v2/metadefs/namespaces'
md_resource = self._create_metadef_resource(path,
data=GLOBAL_NAMESPACE_DATA)
namespace = md_resource['namespace']
self.assertEqual('MyNamespace', namespace)
self.assertIn('properties', md_resource)
# Now ensure you are able to delete all properties from namespace
path = '/v2/metadefs/namespaces/%s/properties' % namespace
resp = self.api_delete(path)
self.assertEqual(204, resp.status_code)
# Verify that properties from namespace are deleted but namespace is
# available
path = "/v2/metadefs/namespaces/%s" % namespace
resp = self.api_get(path)
md_resource = resp.json
self.assertNotIn('properties', md_resource)
self.assertEqual('MyNamespace', namespace)
# Now add another property to the namespace
path = '/v2/metadefs/namespaces/%s/properties' % namespace
data = {
"name": "MyProperty",
"title": "test_property",
"description": "Test property for My Namespace",
"type": "string"
}
md_resource = self._create_metadef_resource(path,
data=data)
self.assertEqual('MyProperty', md_resource['name'])
# Now disable delete_metadef_namespace permissions and make sure
# any other attempts to delete properties fails
path = '/v2/metadefs/namespaces/%s/properties' % namespace
self.set_policy_rules({
'delete_metadef_namespace': '!',
'get_metadef_namespace': '@',
})
resp = self.api_delete(path)
self.assertEqual(403, resp.status_code)
# Now disable both permissions and make sure
# deleting properties for non existing namespace returns 404 Not found
path = '/v2/metadefs/namespaces/non-existing/properties'
self.set_policy_rules({
'delete_metadef_namespace': '@',
'get_metadef_namespace': '@',
})
resp = self.api_delete(path)
self.assertEqual(404, resp.status_code)
# Note for reviewers, this causes our "check get if delete fails"
# logic to return 404 as we expect, but not related to the latest
# rev that checks the namespace get operation first.
self.set_policy_rules({
'delete_metadef_namespace': '!',
'get_metadef_namespace': '!',
})
path = '/v2/metadefs/namespaces/%s/properties' % namespace
resp = self.api_delete(path)
self.assertEqual(404, resp.status_code)
# Ensure accessing non visible namespace will catch 403 and
# return 404 to user
self.set_policy_rules({
'delete_metadef_namespace': '@',
'get_metadef_namespace': '@',
})
self._verify_forbidden_converted_to_not_found(path, 'DELETE')
def test_namespace_delete_tags_basic(self):
self.start_server()
# First make sure create namespace and tags works with default
# policy
path = '/v2/metadefs/namespaces'
md_resource = self._create_metadef_resource(path,
data=GLOBAL_NAMESPACE_DATA)
namespace = md_resource['namespace']
self.assertEqual('MyNamespace', namespace)
self.assertIn('tags', md_resource)
# Now ensure you are able to delete all properties from namespace
path = '/v2/metadefs/namespaces/%s/tags' % namespace
resp = self.api_delete(path)
self.assertEqual(204, resp.status_code)
# Verify that tags from namespace are deleted but namespace is
# available
path = "/v2/metadefs/namespaces/%s" % namespace
resp = self.api_get(path)
md_resource = resp.json
self.assertNotIn('tags', md_resource)
self.assertEqual('MyNamespace', namespace)
# Now add another tag to the namespace
tag_name = "MyTag"
path = '/v2/metadefs/namespaces/%s/tags/%s' % (namespace,
tag_name)
md_resource = self._create_metadef_resource(path)
self.assertEqual('MyTag', md_resource['name'])
# Now disable delete_metadef_namespace permissions and make sure
# any other attempts to delete tags fails
path = '/v2/metadefs/namespaces/%s/tags' % namespace
self.set_policy_rules({
'delete_metadef_namespace': '!',
'get_metadef_namespace': '@'
})
resp = self.api_delete(path)
self.assertEqual(403, resp.status_code)
# Now enable delete_metadef_namespace permissions and and disable
# delete_metadef_tags to make sure
# any other attempts to delete tags fails
path = '/v2/metadefs/namespaces/%s/tags' % namespace
self.set_policy_rules({
'delete_metadef_namespace': '@',
'delete_metadef_tags': '!',
'get_metadef_namespace': '@'
})
resp = self.api_delete(path)
self.assertEqual(403, resp.status_code)
# Now enable all permissions and make sure deleting tags for
# non existing namespace will return 404 Not found
path = '/v2/metadefs/namespaces/non-existing/tags'
self.set_policy_rules({
'delete_metadef_namespace': '@',
'delete_metadef_tags': '@',
'get_metadef_namespace': '@'
})
resp = self.api_delete(path)
self.assertEqual(404, resp.status_code)
# Note for reviewers, this causes our "check get if delete fails"
# logic to return 404 as we expect, but not related to the latest
# rev that checks the namespace get operation first.
self.set_policy_rules({
'delete_metadef_namespace': '!',
'get_metadef_namespace': '!',
'delete_metadef_tags': '!'
})
path = '/v2/metadefs/namespaces/%s/tags' % namespace
resp = self.api_delete(path)
self.assertEqual(404, resp.status_code)
# Ensure accessing non visible namespace will catch 403 and
# return 404 to user
self.set_policy_rules({
'delete_metadef_namespace': '@',
'get_metadef_namespace': '@',
'delete_metadef_tags': '@'
})
self._verify_forbidden_converted_to_not_found(path, 'DELETE')

View File

@@ -500,7 +500,7 @@ class TestMetadefsControllers(base.IsolatedUnitTest):
def test_namespace_delete_non_visible(self):
request = unit_test_utils.get_fake_request()
self.assertRaises(webob.exc.HTTPForbidden,
self.assertRaises(webob.exc.HTTPNotFound,
self.namespace_controller.delete, request,
NAMESPACE2)
self.assertNotificationsLog([])
@@ -570,7 +570,7 @@ class TestMetadefsControllers(base.IsolatedUnitTest):
[{'namespace': NAMESPACE3}])
def test_namespace_non_existing_delete_properties(self):
request = unit_test_utils.get_fake_request()
request = unit_test_utils.get_fake_request(roles=['admin'])
self.assertRaises(webob.exc.HTTPNotFound,
self.namespace_controller.delete_properties,
request,
@@ -608,7 +608,7 @@ class TestMetadefsControllers(base.IsolatedUnitTest):
[{'namespace': NAMESPACE3}])
def test_namespace_non_existing_delete_objects(self):
request = unit_test_utils.get_fake_request()
request = unit_test_utils.get_fake_request(roles=['admin'])
self.assertRaises(webob.exc.HTTPNotFound,
self.namespace_controller.delete_objects,
request,
@@ -645,7 +645,7 @@ class TestMetadefsControllers(base.IsolatedUnitTest):
[{'namespace': NAMESPACE3}])
def test_namespace_non_existing_delete_tags(self):
request = unit_test_utils.get_fake_request()
request = unit_test_utils.get_fake_request(roles=['admin'])
self.assertRaises(webob.exc.HTTPNotFound,
self.namespace_controller.delete_tags,
request,
@@ -843,7 +843,7 @@ class TestMetadefsControllers(base.IsolatedUnitTest):
namespace = namespaces.Namespace()
namespace.namespace = NAMESPACE2
self.assertRaises(webob.exc.HTTPForbidden,
self.assertRaises(webob.exc.HTTPNotFound,
self.namespace_controller.update, request, namespace,
NAMESPACE2)
self.assertNotificationsLog([])

View File

@@ -222,3 +222,132 @@ class APIImagePolicy(APIPolicyBase):
with mock.patch('glance.api.v2.policy.check_is_image_mutable') as m:
self.policy.delete_image()
self.assertFalse(m.called)
class TestMetadefAPIPolicy(APIPolicyBase):
def setUp(self):
super(TestMetadefAPIPolicy, self).setUp()
self.enforcer = mock.MagicMock()
self.md_resource = mock.MagicMock()
self.context = mock.MagicMock()
self.policy = policy.MetadefAPIPolicy(self.context, self.md_resource,
enforcer=self.enforcer)
def test_enforce(self):
self.assertRaises(webob.exc.HTTPNotFound,
super(TestMetadefAPIPolicy, self).test_enforce)
def test_get_metadef_namespace(self):
self.policy.get_metadef_namespace()
self.enforcer.enforce.assert_called_once_with(self.context,
'get_metadef_namespace',
mock.ANY)
def test_get_metadef_namespaces(self):
self.policy.get_metadef_namespaces()
self.enforcer.enforce.assert_called_once_with(self.context,
'get_metadef_namespaces',
mock.ANY)
def test_add_metadef_namespace(self):
self.policy.add_metadef_namespace()
self.enforcer.enforce.assert_called_once_with(self.context,
'add_metadef_namespace',
mock.ANY)
def test_modify_metadef_namespace(self):
self.policy.modify_metadef_namespace()
self.enforcer.enforce.assert_called_once_with(
self.context, 'modify_metadef_namespace', mock.ANY)
def test_delete_metadef_namespace(self):
self.policy.delete_metadef_namespace()
self.enforcer.enforce.assert_called_once_with(
self.context, 'delete_metadef_namespace', mock.ANY)
def test_get_metadef_objects(self):
self.policy.get_metadef_objects()
self.enforcer.enforce.assert_called_once_with(self.context,
'get_metadef_objects',
mock.ANY)
def test_add_metadef_object(self):
self.policy.add_metadef_object()
self.enforcer.enforce.assert_called_once_with(self.context,
'add_metadef_object',
mock.ANY)
def test_add_metadef_tag(self):
self.policy.add_metadef_tag()
self.enforcer.enforce.assert_called_once_with(self.context,
'add_metadef_tag',
mock.ANY)
def test_get_metadef_tags(self):
self.policy.get_metadef_tags()
self.enforcer.enforce.assert_called_once_with(self.context,
'get_metadef_tags',
mock.ANY)
def test_delete_metadef_tags(self):
self.policy.delete_metadef_tags()
self.enforcer.enforce.assert_called_once_with(self.context,
'delete_metadef_tags',
mock.ANY)
def test_add_metadef_property(self):
self.policy.add_metadef_property()
self.enforcer.enforce.assert_called_once_with(self.context,
'add_metadef_property',
mock.ANY)
def test_get_metadef_properties(self):
self.policy.get_metadef_properties()
self.enforcer.enforce.assert_called_once_with(self.context,
'get_metadef_properties',
mock.ANY)
def test_add_metadef_resource_type_association(self):
self.policy.add_metadef_resource_type_association()
self.enforcer.enforce.assert_called_once_with(
self.context, 'add_metadef_resource_type_association', mock.ANY)
def test_list_metadef_resource_types(self):
self.policy.list_metadef_resource_types()
self.enforcer.enforce.assert_called_once_with(
self.context, 'list_metadef_resource_types', mock.ANY)
def test_enforce_exception_behavior(self):
with mock.patch.object(self.policy.enforcer, 'enforce') as mock_enf:
# First make sure we can update if allowed
self.policy.modify_metadef_namespace()
self.assertTrue(mock_enf.called)
# Make sure that if modify_metadef_namespace and
# get_metadef_namespace both return Forbidden then we
# should get NotFound. This is because we are not allowed
# to modify the namespace, nor see that it even exists.
mock_enf.reset_mock()
mock_enf.side_effect = exception.Forbidden
self.assertRaises(webob.exc.HTTPNotFound,
self.policy.modify_metadef_namespace)
# Make sure we checked modify_metadef_namespace, and then
# get_metadef_namespace.
mock_enf.assert_has_calls([
mock.call(mock.ANY, 'modify_metadef_namespace', mock.ANY),
mock.call(mock.ANY, 'get_metadef_namespace', mock.ANY)])
# Make sure that if modify_metadef_namespace is disallowed, but
# get_metadef_namespace is allowed, that we get Forbidden. This is
# because we are allowed to see the namespace, but not modify
# it, so 403 indicates that without confusing the user and
# returning "not found" for a namespace they are able to GET.
mock_enf.reset_mock()
mock_enf.side_effect = [exception.Forbidden, lambda *a: None]
self.assertRaises(webob.exc.HTTPForbidden,
self.policy.modify_metadef_namespace)
# Make sure we checked modify_metadef_namespace, and then
# get_metadef_namespace.
mock_enf.assert_has_calls([
mock.call(mock.ANY, 'modify_metadef_namespace', mock.ANY),
mock.call(mock.ANY, 'get_metadef_namespace', mock.ANY)])