Pass a real image target to the policy enforcer

Previously, every call to policy.enforce passed an empty dictionary as
the target. This prevents operators from using tenant specific
restrictions in their policy.json files since the target will always be
an empty dictionary.

If you try to restrict some actions so an image owner (users with the
correct tenant id) can perform actions, the check categorically fails
because the target is okay is an empty dictionary. By passing the
ImageTarget instance wrapping an Image, we can properly grant access to
the image owner(s) based on tenant (e.g., owner:%(tenant)). Without this
fix, the only check that actually works in glance is a RoleCheck (e.g.,
role:admin).

Partial-bug: 1346648
Implements: blueprint pass-targets-to-policy-enforcer
Change-Id: Id914c478ca7c4dfde3f08028d8b70c623f26b6e9
This commit is contained in:
Ian Cordasco 2015-01-12 15:56:29 -06:00
parent 8a2be34e7e
commit b159aa8b64
4 changed files with 446 additions and 67 deletions

View File

@ -116,6 +116,57 @@ To limit an action to a particular role or roles, you list the roles like so ::
The above would add a rule that only allowed users that had roles of either
"admin" or "superuser" to delete an image.
Writing Rules
-------------
Role checks are going to continue to work exactly as they already do. If the
role defined in the check is one that the user holds, then that will pass,
e.g., ``role:admin``.
To write a generic rule, you need to know that there are three values provided
by Glance that can be used in a rule on the left side of the colon (``:``).
Those values are the current user's credentials in the form of:
- role
- tenant
- owner
The left side of the colon can also contain any value that Python can
understand, e.g.,:
- ``True``
- ``False``
- ``"a string"``
- &c.
Using ``tenant`` and ``owner`` will only work with images. Consider the
following rule::
tenant:%(owner)s
This will use the ``tenant`` value of the currently authenticated user. It
will also use ``owner`` from the image it is acting upon. If those two
values are equivalent the check will pass. All attributes on an image (as well
as extra image properties) are available for use on the right side of the
colon. The most useful are the following:
- ``owner``
- ``protected``
- ``is_public``
Therefore, you could construct a set of rules like the following::
{
"not_protected": "False:%(protected)s",
"is_owner": "tenant:%(owner)s",
"is_owner_or_admin": "rule:is_owner or role:admin",
"not_protected_and_is_owner": "rule:not_protected and rule:is_owner",
"get_image": "rule:is_owner_or_admin",
"delete_image": "rule:not_protected_and_is_owner",
"add_member": "rule:not_protected_and_is_owner"
}
Examples
--------
@ -124,7 +175,7 @@ Example 1. (The default policy configuration)
::
{
"default": []
"default": ""
}
Note that an empty JSON list means that all methods of the
@ -135,8 +186,8 @@ Example 2. Disallow modification calls to non-admins
::
{
"default": [],
"add_image": ["role:admin"],
"modify_image": ["role:admin"],
"delete_image": ["role:admin"]
"default": "",
"add_image": "role:admin",
"modify_image": "role:admin",
"delete_image": "role:admin"
}

View File

@ -111,19 +111,25 @@ class ImageRepoProxy(glance.domain.proxy.Repo):
item_proxy_kwargs=proxy_kwargs)
def get(self, image_id):
self.policy.enforce(self.context, 'get_image', {})
return super(ImageRepoProxy, self).get(image_id)
try:
image = super(ImageRepoProxy, self).get(image_id)
except exception.NotFound:
self.policy.enforce(self.context, 'get_image', {})
raise
else:
self.policy.enforce(self.context, 'get_image', ImageTarget(image))
return image
def list(self, *args, **kwargs):
self.policy.enforce(self.context, 'get_images', {})
return super(ImageRepoProxy, self).list(*args, **kwargs)
def save(self, image, from_state=None):
self.policy.enforce(self.context, 'modify_image', {})
self.policy.enforce(self.context, 'modify_image', image.target)
return super(ImageRepoProxy, self).save(image, from_state=from_state)
def add(self, image):
self.policy.enforce(self.context, 'add_image', {})
self.policy.enforce(self.context, 'add_image', image.target)
return super(ImageRepoProxy, self).add(image)
@ -131,6 +137,7 @@ class ImageProxy(glance.domain.proxy.Image):
def __init__(self, image, context, policy):
self.image = image
self.target = ImageTarget(image)
self.context = context
self.policy = policy
super(ImageProxy, self).__init__(image)
@ -142,7 +149,7 @@ class ImageProxy(glance.domain.proxy.Image):
@visibility.setter
def visibility(self, value):
if value == 'public':
self.policy.enforce(self.context, 'publicize_image', {})
self.policy.enforce(self.context, 'publicize_image', self.target)
self.image.visibility = value
@property
@ -154,25 +161,24 @@ class ImageProxy(glance.domain.proxy.Image):
def locations(self, value):
if not isinstance(value, (list, ImageLocationsProxy)):
raise exception.Invalid(_('Invalid locations: %s') % value)
self.policy.enforce(self.context, 'set_image_location', {})
self.policy.enforce(self.context, 'set_image_location', self.target)
new_locations = list(value)
if (set([loc['url'] for loc in self.image.locations]) -
set([loc['url'] for loc in new_locations])):
self.policy.enforce(self.context, 'delete_image_location', {})
self.policy.enforce(self.context, 'delete_image_location',
self.target)
self.image.locations = new_locations
def delete(self):
self.policy.enforce(self.context, 'delete_image', {})
self.policy.enforce(self.context, 'delete_image', self.target)
return self.image.delete()
def get_data(self, *args, **kwargs):
target = ImageTarget(self.image)
self.policy.enforce(self.context, 'download_image',
target=target)
self.policy.enforce(self.context, 'download_image', self.target)
return self.image.get_data(*args, **kwargs)
def set_data(self, *args, **kwargs):
self.policy.enforce(self.context, 'upload_image', {})
self.policy.enforce(self.context, 'upload_image', self.target)
return self.image.set_data(*args, **kwargs)
def get_member_repo(self, **kwargs):
@ -210,27 +216,28 @@ class ImageMemberRepoProxy(glance.domain.proxy.Repo):
def __init__(self, member_repo, context, policy):
self.member_repo = member_repo
self.target = ImageTarget(self.member_repo.image)
self.context = context
self.policy = policy
def add(self, member):
self.policy.enforce(self.context, 'add_member', {})
self.policy.enforce(self.context, 'add_member', self.target)
self.member_repo.add(member)
def get(self, member_id):
self.policy.enforce(self.context, 'get_member', {})
self.policy.enforce(self.context, 'get_member', self.target)
return self.member_repo.get(member_id)
def save(self, member, from_state=None):
self.policy.enforce(self.context, 'modify_member', {})
self.policy.enforce(self.context, 'modify_member', self.target)
self.member_repo.save(member, from_state=from_state)
def list(self, *args, **kwargs):
self.policy.enforce(self.context, 'get_members', {})
self.policy.enforce(self.context, 'get_members', self.target)
return self.member_repo.list(*args, **kwargs)
def remove(self, member):
self.policy.enforce(self.context, 'delete_member', {})
self.policy.enforce(self.context, 'delete_member', self.target)
self.member_repo.remove(member)
@ -356,31 +363,38 @@ class TaskFactoryProxy(glance.domain.proxy.TaskFactory):
class ImageTarget(object):
SENTINEL = object()
def __init__(self, image):
"""
Initialize the object
def __init__(self, target):
"""Initialize the object
:param image: Image object
:param target: Object being targetted
"""
self.image = image
self.target = target
def __getitem__(self, key):
"""
Returns the value of 'key' from the image if image has that attribute
else tries to retrieve value from the extra_properties of image.
"""Return the value of 'key' from the target.
If the target has the attribute 'key', return it.
:param key: value to retrieve
"""
# Need to change the key 'id' to 'image_id' as Image object has
# attribute as 'image_id' in case of V2.
key = self.key_transforms(key)
value = getattr(self.target, key, self.SENTINEL)
if value is self.SENTINEL:
extra_properties = getattr(self.target, 'extra_properties', None)
if extra_properties is not None:
value = extra_properties[key]
else:
value = None
return value
def key_transforms(self, key):
if key == 'id':
key = 'image_id'
if hasattr(self.image, key):
return getattr(self.image, key)
else:
return self.image.extra_properties[key]
return key
# Metadef Namespace classes

View File

@ -777,6 +777,290 @@ class TestImages(functional.FunctionalTest):
self.stop_servers()
def test_image_modification_works_for_owning_tenant_id(self):
rules = {
"context_is_admin": "role:admin",
"default": "",
"add_image": "",
"get_image": "",
"modify_image": "tenant:%(owner)s",
"upload_image": "",
"get_image_location": "",
"delete_image": "",
"restricted":
"not ('aki':%(container_format)s and role:_member_)",
"download_image": "role:admin or rule:restricted"
}
self.set_policy_rules(rules)
self.start_servers(**self.__dict__.copy())
path = self._url('/v2/images')
headers = self._headers({'content-type': 'application/json',
'X-Roles': 'admin'})
data = jsonutils.dumps({'name': 'image-1', 'disk_format': 'aki',
'container_format': 'aki'})
response = requests.post(path, headers=headers, data=data)
self.assertEqual(201, response.status_code)
# Get the image's ID
image = jsonutils.loads(response.text)
image_id = image['id']
path = self._url('/v2/images/%s' % image_id)
media_type = 'application/openstack-images-v2.1-json-patch'
headers['content-type'] = media_type
del headers['X-Roles']
data = jsonutils.dumps([
{'op': 'replace', 'path': '/name', 'value': 'new-name'},
])
response = requests.patch(path, headers=headers, data=data)
self.assertEqual(200, response.status_code)
self.stop_servers()
def test_image_modification_fails_on_mismatched_tenant_ids(self):
rules = {
"context_is_admin": "role:admin",
"default": "",
"add_image": "",
"get_image": "",
"modify_image": "'A-Fake-Tenant-Id':%(owner)s",
"upload_image": "",
"get_image_location": "",
"delete_image": "",
"restricted":
"not ('aki':%(container_format)s and role:_member_)",
"download_image": "role:admin or rule:restricted"
}
self.set_policy_rules(rules)
self.start_servers(**self.__dict__.copy())
path = self._url('/v2/images')
headers = self._headers({'content-type': 'application/json',
'X-Roles': 'admin'})
data = jsonutils.dumps({'name': 'image-1', 'disk_format': 'aki',
'container_format': 'aki'})
response = requests.post(path, headers=headers, data=data)
self.assertEqual(201, response.status_code)
# Get the image's ID
image = jsonutils.loads(response.text)
image_id = image['id']
path = self._url('/v2/images/%s' % image_id)
media_type = 'application/openstack-images-v2.1-json-patch'
headers['content-type'] = media_type
del headers['X-Roles']
data = jsonutils.dumps([
{'op': 'replace', 'path': '/name', 'value': 'new-name'},
])
response = requests.patch(path, headers=headers, data=data)
self.assertEqual(403, response.status_code)
self.stop_servers()
def test_member_additions_works_for_owning_tenant_id(self):
rules = {
"context_is_admin": "role:admin",
"default": "",
"add_image": "",
"get_image": "",
"modify_image": "",
"upload_image": "",
"get_image_location": "",
"delete_image": "",
"restricted":
"not ('aki':%(container_format)s and role:_member_)",
"download_image": "role:admin or rule:restricted",
"add_member": "tenant:%(owner)s",
}
self.set_policy_rules(rules)
self.start_servers(**self.__dict__.copy())
path = self._url('/v2/images')
headers = self._headers({'content-type': 'application/json',
'X-Roles': 'admin'})
data = jsonutils.dumps({'name': 'image-1', 'disk_format': 'aki',
'container_format': 'aki'})
response = requests.post(path, headers=headers, data=data)
self.assertEqual(201, response.status_code)
# Get the image's ID
image = jsonutils.loads(response.text)
image_id = image['id']
# Get the image's members resource
path = self._url('/v2/images/%s/members' % image_id)
body = jsonutils.dumps({'member': TENANT3})
del headers['X-Roles']
response = requests.post(path, headers=headers, data=body)
self.assertEqual(200, response.status_code)
self.stop_servers()
def test_image_additions_works_only_for_specific_tenant_id(self):
rules = {
"context_is_admin": "role:admin",
"default": "",
"add_image": "'{0}':%(owner)s".format(TENANT1),
"get_image": "",
"modify_image": "",
"upload_image": "",
"get_image_location": "",
"delete_image": "",
"restricted":
"not ('aki':%(container_format)s and role:_member_)",
"download_image": "role:admin or rule:restricted",
"add_member": "",
}
self.set_policy_rules(rules)
self.start_servers(**self.__dict__.copy())
path = self._url('/v2/images')
headers = self._headers({'content-type': 'application/json',
'X-Roles': 'admin', 'X-Tenant-Id': TENANT1})
data = jsonutils.dumps({'name': 'image-1', 'disk_format': 'aki',
'container_format': 'aki'})
response = requests.post(path, headers=headers, data=data)
self.assertEqual(201, response.status_code)
headers['X-Tenant-Id'] = TENANT2
response = requests.post(path, headers=headers, data=data)
self.assertEqual(403, response.status_code)
self.stop_servers()
def test_owning_tenant_id_can_retrieve_image_information(self):
rules = {
"context_is_admin": "role:admin",
"default": "",
"add_image": "",
"get_image": "tenant:%(owner)s",
"modify_image": "",
"upload_image": "",
"get_image_location": "",
"delete_image": "",
"restricted":
"not ('aki':%(container_format)s and role:_member_)",
"download_image": "role:admin or rule:restricted",
"add_member": "",
}
self.set_policy_rules(rules)
self.start_servers(**self.__dict__.copy())
path = self._url('/v2/images')
headers = self._headers({'content-type': 'application/json',
'X-Roles': 'admin', 'X-Tenant-Id': TENANT1})
data = jsonutils.dumps({'name': 'image-1', 'disk_format': 'aki',
'container_format': 'aki'})
response = requests.post(path, headers=headers, data=data)
self.assertEqual(201, response.status_code)
# Remove the admin role
del headers['X-Roles']
# Get the image's ID
image = jsonutils.loads(response.text)
image_id = image['id']
# Can retrieve the image as TENANT1
path = self._url('/v2/images/%s' % image_id)
response = requests.get(path, headers=headers)
self.assertEqual(200, response.status_code)
# Can retrieve the image's members as TENANT1
path = self._url('/v2/images/%s/members' % image_id)
response = requests.get(path, headers=headers)
self.assertEqual(200, response.status_code)
headers['X-Tenant-Id'] = TENANT2
response = requests.get(path, headers=headers)
self.assertEqual(403, response.status_code)
self.stop_servers()
def test_owning_tenant_can_publicize_image(self):
rules = {
"context_is_admin": "role:admin",
"default": "",
"add_image": "",
"publicize_iamge": "tenant:%(owner)s",
"get_image": "tenant:%(owner)s",
"modify_image": "",
"upload_image": "",
"get_image_location": "",
"delete_image": "",
"restricted":
"not ('aki':%(container_format)s and role:_member_)",
"download_image": "role:admin or rule:restricted",
"add_member": "",
}
self.set_policy_rules(rules)
self.start_servers(**self.__dict__.copy())
path = self._url('/v2/images')
headers = self._headers({'content-type': 'application/json',
'X-Roles': 'admin', 'X-Tenant-Id': TENANT1})
data = jsonutils.dumps({'name': 'image-1', 'disk_format': 'aki',
'container_format': 'aki'})
response = requests.post(path, headers=headers, data=data)
self.assertEqual(201, response.status_code)
# Get the image's ID
image = jsonutils.loads(response.text)
image_id = image['id']
path = self._url('/v2/images/%s' % image_id)
headers = self._headers({
'Content-Type': 'application/openstack-images-v2.1-json-patch',
'X-Tenant-Id': TENANT1,
})
doc = [{'op': 'replace', 'path': '/visibility', 'value': 'public'}]
data = jsonutils.dumps(doc)
response = requests.patch(path, headers=headers, data=data)
self.assertEqual(200, response.status_code)
def test_owning_tenant_can_delete_image(self):
rules = {
"context_is_admin": "role:admin",
"default": "",
"add_image": "",
"publicize_iamge": "tenant:%(owner)s",
"get_image": "tenant:%(owner)s",
"modify_image": "",
"upload_image": "",
"get_image_location": "",
"delete_image": "",
"restricted":
"not ('aki':%(container_format)s and role:_member_)",
"download_image": "role:admin or rule:restricted",
"add_member": "",
}
self.set_policy_rules(rules)
self.start_servers(**self.__dict__.copy())
path = self._url('/v2/images')
headers = self._headers({'content-type': 'application/json',
'X-Roles': 'admin', 'X-Tenant-Id': TENANT1})
data = jsonutils.dumps({'name': 'image-1', 'disk_format': 'aki',
'container_format': 'aki'})
response = requests.post(path, headers=headers, data=data)
self.assertEqual(201, response.status_code)
# Get the image's ID
image = jsonutils.loads(response.text)
image_id = image['id']
path = self._url('/v2/images/%s' % image_id)
response = requests.delete(path, headers=headers)
self.assertEqual(204, response.status_code)
def test_image_size_cap(self):
self.api_server.image_size_cap = 128
self.start_servers(**self.__dict__.copy())

View File

@ -72,6 +72,8 @@ class ImageFactoryStub(object):
class MemberRepoStub(object):
image = None
def add(self, image_member):
image_member.output = 'member_repo_add'
@ -207,41 +209,53 @@ class TestImagePolicy(test_utils.BaseTestCase):
self.assertRaises(exception.Forbidden,
setattr, image, 'visibility', 'public')
self.assertEqual('private', image.visibility)
self.policy.enforce.assert_called_once_with({}, "publicize_image", {})
self.policy.enforce.assert_called_once_with({}, "publicize_image",
image.target)
def test_publicize_image_allowed(self):
image = glance.api.policy.ImageProxy(self.image_stub, {}, self.policy)
image.visibility = 'public'
self.assertEqual('public', image.visibility)
self.policy.enforce.assert_called_once_with({}, "publicize_image", {})
self.policy.enforce.assert_called_once_with({}, "publicize_image",
image.target)
def test_delete_image_not_allowed(self):
self.policy.enforce.side_effect = exception.Forbidden
image = glance.api.policy.ImageProxy(self.image_stub, {}, self.policy)
self.assertRaises(exception.Forbidden, image.delete)
self.assertEqual('active', image.status)
self.policy.enforce.assert_called_once_with({}, "delete_image", {})
self.policy.enforce.assert_called_once_with({}, "delete_image",
image.target)
def test_delete_image_allowed(self):
image = glance.api.policy.ImageProxy(self.image_stub, {}, self.policy)
image.delete()
self.assertEqual('deleted', image.status)
self.policy.enforce.assert_called_once_with({}, "delete_image", {})
self.policy.enforce.assert_called_once_with({}, "delete_image",
image.target)
def test_get_image_not_allowed(self):
self.policy.enforce.side_effect = exception.Forbidden
image_repo = glance.api.policy.ImageRepoProxy(self.image_repo_stub,
{}, self.policy)
self.assertRaises(exception.Forbidden, image_repo.get, UUID1)
self.policy.enforce.assert_called_once_with({}, "get_image", {})
image_target = mock.Mock()
with mock.patch.object(glance.api.policy, 'ImageTarget') as target:
target.return_value = image_target
image_repo = glance.api.policy.ImageRepoProxy(self.image_repo_stub,
{}, self.policy)
self.assertRaises(exception.Forbidden, image_repo.get, UUID1)
self.policy.enforce.assert_called_once_with({}, "get_image",
image_target)
def test_get_image_allowed(self):
image_repo = glance.api.policy.ImageRepoProxy(self.image_repo_stub,
{}, self.policy)
output = image_repo.get(UUID1)
image_target = mock.Mock()
with mock.patch.object(glance.api.policy, 'ImageTarget') as target:
target.return_value = image_target
image_repo = glance.api.policy.ImageRepoProxy(self.image_repo_stub,
{}, self.policy)
output = image_repo.get(UUID1)
self.assertIsInstance(output, glance.api.policy.ImageProxy)
self.assertEqual('image_from_get', output.image)
self.policy.enforce.assert_called_once_with({}, "get_image", {})
self.policy.enforce.assert_called_once_with({}, "get_image",
image_target)
def test_get_images_not_allowed(self):
self.policy.enforce.side_effect = exception.Forbidden
@ -265,14 +279,16 @@ class TestImagePolicy(test_utils.BaseTestCase):
{}, self.policy)
image = glance.api.policy.ImageProxy(self.image_stub, {}, self.policy)
self.assertRaises(exception.Forbidden, image_repo.save, image)
self.policy.enforce.assert_called_once_with({}, "modify_image", {})
self.policy.enforce.assert_called_once_with({}, "modify_image",
image.target)
def test_modify_image_allowed(self):
image_repo = glance.api.policy.ImageRepoProxy(self.image_repo_stub,
{}, self.policy)
image = glance.api.policy.ImageProxy(self.image_stub, {}, self.policy)
image_repo.save(image)
self.policy.enforce.assert_called_once_with({}, "modify_image", {})
self.policy.enforce.assert_called_once_with({}, "modify_image",
image.target)
def test_add_image_not_allowed(self):
self.policy.enforce.side_effect = exception.Forbidden
@ -280,14 +296,16 @@ class TestImagePolicy(test_utils.BaseTestCase):
{}, self.policy)
image = glance.api.policy.ImageProxy(self.image_stub, {}, self.policy)
self.assertRaises(exception.Forbidden, image_repo.add, image)
self.policy.enforce.assert_called_once_with({}, "add_image", {})
self.policy.enforce.assert_called_once_with({}, "add_image",
image.target)
def test_add_image_allowed(self):
image_repo = glance.api.policy.ImageRepoProxy(self.image_repo_stub,
{}, self.policy)
image = glance.api.policy.ImageProxy(self.image_stub, {}, self.policy)
image_repo.add(image)
self.policy.enforce.assert_called_once_with({}, "add_image", {})
self.policy.enforce.assert_called_once_with({}, "add_image",
image.target)
def test_new_image_visibility(self):
self.policy.enforce.side_effect = exception.Forbidden
@ -308,20 +326,21 @@ class TestImagePolicy(test_utils.BaseTestCase):
'test_key': 'test_4321'
}
image_stub = ImageStub(UUID1, extra_properties=extra_properties)
image = glance.api.policy.ImageProxy(image_stub, {}, self.policy)
with mock.patch('glance.api.policy.ImageTarget'):
image = glance.api.policy.ImageProxy(image_stub, {}, self.policy)
target = image.target
self.policy.enforce.side_effect = exception.Forbidden
glance.api.policy.ImageTarget = mock.Mock()
target = glance.api.policy.ImageTarget(image)
self.assertRaises(exception.Forbidden, image.get_data)
self.policy.enforce.assert_called_once_with({}, "download_image",
target=target)
target)
def test_image_set_data(self):
self.policy.enforce.side_effect = exception.Forbidden
image = glance.api.policy.ImageProxy(self.image_stub, {}, self.policy)
self.assertRaises(exception.Forbidden, image.set_data)
self.policy.enforce.assert_called_once_with({}, "upload_image", {})
self.policy.enforce.assert_called_once_with({}, "upload_image",
image.target)
class TestMemberPolicy(test_utils.BaseTestCase):
@ -330,60 +349,71 @@ class TestMemberPolicy(test_utils.BaseTestCase):
self.policy.enforce = mock.Mock()
self.member_repo = glance.api.policy.ImageMemberRepoProxy(
MemberRepoStub(), {}, self.policy)
self.target = self.member_repo.target
super(TestMemberPolicy, self).setUp()
def test_add_member_not_allowed(self):
self.policy.enforce.side_effect = exception.Forbidden
self.assertRaises(exception.Forbidden, self.member_repo.add, '')
self.policy.enforce.assert_called_once_with({}, "add_member", {})
self.policy.enforce.assert_called_once_with({}, "add_member",
self.target)
def test_add_member_allowed(self):
image_member = ImageMembershipStub()
self.member_repo.add(image_member)
self.assertEqual('member_repo_add', image_member.output)
self.policy.enforce.assert_called_once_with({}, "add_member", {})
self.policy.enforce.assert_called_once_with({}, "add_member",
self.target)
def test_get_member_not_allowed(self):
self.policy.enforce.side_effect = exception.Forbidden
self.assertRaises(exception.Forbidden, self.member_repo.get, '')
self.policy.enforce.assert_called_once_with({}, "get_member", {})
self.policy.enforce.assert_called_once_with({}, "get_member",
self.target)
def test_get_member_allowed(self):
output = self.member_repo.get('')
self.assertEqual('member_repo_get', output)
self.policy.enforce.assert_called_once_with({}, "get_member", {})
self.policy.enforce.assert_called_once_with({}, "get_member",
self.target)
def test_modify_member_not_allowed(self):
self.policy.enforce.side_effect = exception.Forbidden
self.assertRaises(exception.Forbidden, self.member_repo.save, '')
self.policy.enforce.assert_called_once_with({}, "modify_member", {})
self.policy.enforce.assert_called_once_with({}, "modify_member",
self.target)
def test_modify_member_allowed(self):
image_member = ImageMembershipStub()
self.member_repo.save(image_member)
self.assertEqual('member_repo_save', image_member.output)
self.policy.enforce.assert_called_once_with({}, "modify_member", {})
self.policy.enforce.assert_called_once_with({}, "modify_member",
self.target)
def test_get_members_not_allowed(self):
self.policy.enforce.side_effect = exception.Forbidden
self.assertRaises(exception.Forbidden, self.member_repo.list, '')
self.policy.enforce.assert_called_once_with({}, "get_members", {})
self.policy.enforce.assert_called_once_with({}, "get_members",
self.target)
def test_get_members_allowed(self):
output = self.member_repo.list('')
self.assertEqual('member_repo_list', output)
self.policy.enforce.assert_called_once_with({}, "get_members", {})
self.policy.enforce.assert_called_once_with({}, "get_members",
self.target)
def test_delete_member_not_allowed(self):
self.policy.enforce.side_effect = exception.Forbidden
self.assertRaises(exception.Forbidden, self.member_repo.remove, '')
self.policy.enforce.assert_called_once_with({}, "delete_member", {})
self.policy.enforce.assert_called_once_with({}, "delete_member",
self.target)
def test_delete_member_allowed(self):
image_member = ImageMembershipStub()
self.member_repo.remove(image_member)
self.assertEqual('member_repo_remove', image_member.output)
self.policy.enforce.assert_called_once_with({}, "delete_member", {})
self.policy.enforce.assert_called_once_with({}, "delete_member",
self.target)
class TestTaskPolicy(test_utils.BaseTestCase):