Implement get_unique_role_by_name

Implement a convenience method in the role manager to look up a single role
by its name. This is useful for moving role-specific logic out of the
trust controller and will be useful again for application credentials
which will also need to look up roles by name.

Change-Id: I94083cd4e719f105b5555e4676e119a358d6b1c6
This commit is contained in:
Colleen Murphy 2017-12-27 23:44:32 +01:00
parent 1e21c52f3b
commit e6fb2318f1
3 changed files with 32 additions and 13 deletions

View File

@ -22,6 +22,7 @@ from oslo_log import log
from keystone.common import cache from keystone.common import cache
from keystone.common import driver_hints from keystone.common import driver_hints
from keystone.common import manager from keystone.common import manager
from keystone.common import provider_api
import keystone.conf import keystone.conf
from keystone import exception from keystone import exception
from keystone.i18n import _ from keystone.i18n import _
@ -30,6 +31,7 @@ from keystone import notifications
CONF = keystone.conf.CONF CONF = keystone.conf.CONF
LOG = log.getLogger(__name__) LOG = log.getLogger(__name__)
PROVIDERS = provider_api.ProviderAPIs
# This is a general cache region for assignment administration (CRUD # This is a general cache region for assignment administration (CRUD
# operations). # operations).
@ -1245,6 +1247,21 @@ class RoleManager(manager.Manager):
def get_role(self, role_id): def get_role(self, role_id):
return self.driver.get_role(role_id) return self.driver.get_role(role_id)
def get_unique_role_by_name(self, role_name, hints=None):
if not hints:
hints = driver_hints.Hints()
hints.add_filter("name", role_name, case_sensitive=True)
found_roles = PROVIDERS.role_api.list_roles(hints)
if not found_roles:
raise exception.RoleNotFound(
_("Role %s is not defined") % role_name
)
elif len(found_roles) == 1:
return {'id': found_roles[0]['id']}
else:
raise exception.AmbiguityError(resource='role',
name=role_name)
def create_role(self, role_id, role, initiator=None): def create_role(self, role_id, role, initiator=None):
ret = self.driver.create_role(role_id, role) ret = self.driver.create_role(role_id, role)
notifications.Audit.created(self._ROLE, role_id, initiator) notifications.Audit.created(self._ROLE, role_id, initiator)

View File

@ -27,6 +27,11 @@ class RoleTests(object):
self.role_api.get_role, self.role_api.get_role,
uuid.uuid4().hex) uuid.uuid4().hex)
def test_get_unique_role_by_name_returns_not_found(self):
self.assertRaises(exception.RoleNotFound,
self.role_api.get_unique_role_by_name,
uuid.uuid4().hex)
def test_create_duplicate_role_name_fails(self): def test_create_duplicate_role_name_fails(self):
role_id = uuid.uuid4().hex role_id = uuid.uuid4().hex
role = unit.new_role_ref(id=role_id, name='fake1name') role = unit.new_role_ref(id=role_id, name='fake1name')
@ -53,11 +58,15 @@ class RoleTests(object):
def test_role_crud(self): def test_role_crud(self):
role = unit.new_role_ref() role = unit.new_role_ref()
role_name = role['name']
self.role_api.create_role(role['id'], role) self.role_api.create_role(role['id'], role)
role_ref = self.role_api.get_role(role['id']) role_ref = self.role_api.get_role(role['id'])
role_ref_dict = {x: role_ref[x] for x in role_ref} role_ref_dict = {x: role_ref[x] for x in role_ref}
self.assertDictEqual(role, role_ref_dict) self.assertDictEqual(role, role_ref_dict)
role_ref = self.role_api.get_unique_role_by_name(role_name)
self.assertEqual(role['id'], role_ref['id'])
role['name'] = uuid.uuid4().hex role['name'] = uuid.uuid4().hex
updated_role_ref = self.role_api.update_role(role['id'], role) updated_role_ref = self.role_api.update_role(role['id'], role)
role_ref = self.role_api.get_role(role['id']) role_ref = self.role_api.get_role(role['id'])

View File

@ -18,7 +18,7 @@ from oslo_utils import timeutils
from keystone import assignment from keystone import assignment
from keystone.common import controller from keystone.common import controller
from keystone.common import driver_hints from keystone.common import provider_api
from keystone.common import utils from keystone.common import utils
from keystone.common import validation from keystone.common import validation
from keystone import exception from keystone import exception
@ -26,6 +26,9 @@ from keystone.i18n import _
from keystone.trust import schema from keystone.trust import schema
PROVIDERS = provider_api.ProviderAPIs
def _trustor_trustee_only(trust, user_id): def _trustor_trustee_only(trust, user_id):
if user_id not in [trust.get('trustee_user_id'), if user_id not in [trust.get('trustee_user_id'),
trust.get('trustor_user_id')]: trust.get('trustor_user_id')]:
@ -81,18 +84,8 @@ class TrustV3(controller.V3Controller):
if role.get('id'): if role.get('id'):
roles.append({'id': role['id']}) roles.append({'id': role['id']})
else: else:
hints = driver_hints.Hints() role_api = PROVIDERS.role_api
hints.add_filter("name", role['name'], case_sensitive=True) roles.append(role_api.get_unique_role_by_name(role['name']))
found_roles = self.role_api.list_roles(hints)
if not found_roles:
raise exception.RoleNotFound(
_("Role %s is not defined") % role['name']
)
elif len(found_roles) == 1:
roles.append({'id': found_roles[0]['id']})
else:
raise exception.AmbiguityError(resource='role',
name=role['name'])
return roles return roles
def _find_redelegated_trust(self, request): def _find_redelegated_trust(self, request):