Add an identity backend method to get group by name.

We already support a get_user_by_name method, but an equivalent
one for groups is missing. This is needed for enhancements to
our federation mapping engine. This patch provides this new
method.

This patch does not expose this functionality as part of the
REST API, it is only supported at the manager/driver level.

Partially implements: bp mapping-enhancements

Change-Id: I2508a99125d9aec2a66b3b48e63c9b1ed2fe12a4
This commit is contained in:
Henry Nash 2014-12-04 15:28:49 +00:00 committed by ayoung
parent 96b1326bcf
commit 7541fda1db
4 changed files with 55 additions and 0 deletions

View File

@ -129,6 +129,11 @@ class Identity(identity.Driver):
def get_group(self, group_id):
return self.group.get_filtered(group_id)
def get_group_by_name(self, group_name, domain_id):
# domain_id will already have been handled in the Manager layer,
# parameter left in so this matches the Driver specification
return self.group.get_filtered_by_name(group_name)
def update_group(self, group_id, group):
self.group.check_allow_update()
if 'name' in group:
@ -385,6 +390,10 @@ class GroupApi(common_ldap.BaseLdap):
group = self.get(group_id)
return common_ldap.filter_entity(group)
def get_filtered_by_name(self, group_name):
group = self.get_by_name(group_name)
return common_ldap.filter_entity(group)
def get_all_filtered(self, query=None):
return [common_ldap.filter_entity(group)
for group in self.get_all(query)]

View File

@ -279,6 +279,17 @@ class Identity(identity.Driver):
session = sql.get_session()
return self._get_group(session, group_id).to_dict()
def get_group_by_name(self, group_name, domain_id):
session = sql.get_session()
query = session.query(Group)
query = query.filter_by(name=group_name)
query = query.filter_by(domain_id=domain_id)
try:
group_ref = query.one()
except sql.NotFound:
raise exception.GroupNotFound(group_id=group_name)
return group_ref.to_dict()
@sql.handle_conflicts(conflict_type='group')
def update_group(self, group_id, group):
session = sql.get_session()

View File

@ -693,6 +693,14 @@ class Manager(manager.Manager):
return self._set_domain_id_and_mapping(
ref, domain_id, driver, mapping.EntityType.GROUP)
@domains_configured
@exception_translated('group')
def get_group_by_name(self, group_name, domain_id):
driver = self._select_identity_driver(domain_id)
ref = driver.get_group_by_name(group_name, domain_id)
return self._set_domain_id_and_mapping(
ref, domain_id, driver, mapping.EntityType.GROUP)
@notifications.updated(_GROUP)
@domains_configured
@exception_translated('group')
@ -1032,6 +1040,16 @@ class Driver(object):
"""
raise exception.NotImplemented() # pragma: no cover
@abc.abstractmethod
def get_group_by_name(self, group_name, domain_id):
"""Get a group by name.
:returns: group_ref
:raises: keystone.exception.GroupNotFound
"""
raise exception.NotImplemented() # pragma: no cover
@abc.abstractmethod
def update_group(self, group_id, group):
"""Updates an existing group.

View File

@ -2548,6 +2548,23 @@ class IdentityTests(object):
self.identity_api.get_group,
group['id'])
def test_get_group_by_name(self):
group_name = uuid.uuid4().hex
group = {'domain_id': DEFAULT_DOMAIN_ID, 'name': group_name}
group = self.identity_api.create_group(group)
spoiler = {'domain_id': DEFAULT_DOMAIN_ID, 'name': uuid.uuid4().hex}
self.identity_api.create_group(spoiler)
group_ref = self.identity_api.get_group_by_name(
group_name, DEFAULT_DOMAIN_ID)
self.assertDictEqual(group_ref, group)
def test_get_group_by_name_404(self):
self.assertRaises(exception.GroupNotFound,
self.identity_api.get_group_by_name,
uuid.uuid4().hex,
DEFAULT_DOMAIN_ID)
def test_create_duplicate_group_name_fails(self):
group1 = {'domain_id': DEFAULT_DOMAIN_ID, 'name': uuid.uuid4().hex}
group2 = {'domain_id': DEFAULT_DOMAIN_ID, 'name': group1['name']}