Merge "Enable filtering of credentials by user ID"

This commit is contained in:
Jenkins 2014-08-19 22:05:27 +00:00 committed by Gerrit Code Review
commit 498a0032db
4 changed files with 39 additions and 9 deletions

View File

@ -67,7 +67,7 @@
"identity:add_user_to_group": "rule:cloud_admin or rule:admin_and_matching_target_group_domain_id",
"identity:get_credential": "rule:admin_required",
"identity:list_credentials": "rule:admin_required",
"identity:list_credentials": "rule:admin_required or user_id:%(user_id)s",
"identity:create_credential": "rule:admin_required",
"identity:update_credential": "rule:admin_required",
"identity:delete_credential": "rule:admin_required",

View File

@ -16,7 +16,6 @@ import hashlib
from keystone.common import controller
from keystone.common import dependency
from keystone.common import driver_hints
from keystone import exception
from keystone.i18n import _
from keystone.openstack.common import jsonutils
@ -78,15 +77,13 @@ class CredentialV3(controller.V3Controller):
else:
return ref
@controller.protected()
def list_credentials(self, context):
# NOTE(henry-nash): Since there are no filters for credentials, we
# shouldn't limit the output, hence we don't pass a hints list into
# the driver.
refs = self.credential_api.list_credentials()
@controller.filterprotected('user_id')
def list_credentials(self, context, filters):
hints = CredentialV3.build_driver_hints(context, filters)
refs = self.credential_api.list_credentials(hints)
ret_refs = [self._blob_to_json(r) for r in refs]
return CredentialV3.wrap_collection(context, ret_refs,
driver_hints.Hints())
hints=hints)
@controller.protected()
def get_credential(self, context, credential_id):

View File

@ -90,6 +90,18 @@ class CredentialTestCase(CredentialBaseTestCase):
r = self.get('/credentials', content_type='xml')
self.assertValidCredentialListResponse(r, ref=self.credential)
def test_list_credentials_filtered_by_user_id(self):
"""Call ``GET /credentials?user_id={user_id}``."""
credential = self.new_credential_ref(
user_id=uuid.uuid4().hex)
self.credential_api.create_credential(
credential['id'], credential)
r = self.get('/credentials?user_id=%s' % self.user['id'])
self.assertValidCredentialListResponse(r, ref=self.credential)
for cred in r.result['credentials']:
self.assertEqual(self.user['id'], cred['user_id'])
def test_create_credential(self):
"""Call ``POST /credentials``."""
ref = self.new_credential_ref(user_id=self.user['id'])

View File

@ -661,3 +661,24 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase):
domain_id=self.admin_domain['id'])
self._test_domain_management()
def test_list_user_credentials(self):
self.credential_user = self.new_credential_ref(self.just_a_user['id'])
self.credential_api.create_credential(self.credential_user['id'],
self.credential_user)
self.credential_admin = self.new_credential_ref(
self.cloud_admin_user['id'])
self.credential_api.create_credential(self.credential_admin['id'],
self.credential_admin)
self.auth = self.build_authentication_request(
user_id=self.just_a_user['id'],
password=self.just_a_user['password'])
url = '/credentials?user_id=%s' % self.just_a_user['id']
self.get(url, auth=self.auth)
url = '/credentials?user_id=%s' % self.cloud_admin_user['id']
self.get(url, auth=self.auth,
expected_status=exception.ForbiddenAction.code)
url = '/credentials'
self.get(url, auth=self.auth,
expected_status=exception.ForbiddenAction.code)