Add queries for federated attributes in list_users

This patch adds filters to list_user that enable the user to query for
unique_id, idp_id, protocol_id, or a mix of these to get back the
corresponding users of the federated attributes.

Partially-Implements: bp support-federated-attr
Change-Id: Iea5681791e521e9b8d96137fe30c388c10a02b30
This commit is contained in:
“Richard 2016-12-24 20:03:37 +00:00 committed by Richard Avelar
parent 7f2b7e58e7
commit 19c6530b1a
8 changed files with 257 additions and 2 deletions

View File

@ -205,6 +205,12 @@ group_name_query:
in: query in: query
required: false required: false
type: string type: string
idp_id_query:
description: |
Filters the response by an identity provider ID.
in: query
required: false
type: string
include_names_query: include_names_query:
description: | description: |
If set to true, then the names of any entities returned will be include as If set to true, then the names of any entities returned will be include as
@ -334,6 +340,12 @@ project_name_query:
in: query in: query
required: false required: false
type: string type: string
protocol_id_query:
description: |
Filters the response by a protocol ID.
in: query
required: false
type: string
role_id_query: role_id_query:
description: | description: |
Filters the response by a role ID. Filters the response by a role ID.
@ -398,6 +410,12 @@ subtree_as_list:
required: false required: false
type: key-only, no value expected type: key-only, no value expected
min_version: 3.4 min_version: 3.4
unique_id_query:
description: |
Filters the response by a unique ID.
in: query
required: false
type: string
user_id_query: user_id_query:
description: | description: |
Filters the response by a user ID. Filters the response by a user ID.

View File

@ -46,8 +46,11 @@ Request Parameters
- domain_id: domain_id_query - domain_id: domain_id_query
- enabled: enabled_user_query - enabled: enabled_user_query
- idp_id: idp_id_query
- name: name_user_query - name: name_user_query
- password_expires_at: password_expires_at_query - password_expires_at: password_expires_at_query
- protocol_id: protocol_id_query
- unique_id: unique_id_query
Response Parameters Response Parameters
------------------- -------------------

View File

@ -221,7 +221,8 @@ class UserV3(controller.V3Controller):
) )
return UserV3.wrap_member(request.context_dict, ref) return UserV3.wrap_member(request.context_dict, ref)
@controller.filterprotected('domain_id', 'enabled', 'name') @controller.filterprotected('domain_id', 'enabled', 'idp_id', 'name',
'protocol_id', 'unique_id')
def list_users(self, request, filters): def list_users(self, request, filters):
hints = UserV3.build_driver_hints(request, filters) hints = UserV3.build_driver_hints(request, filters)
domain = self._get_domain_id_for_list_request(request) domain = self._get_domain_id_for_list_request(request)

View File

@ -958,6 +958,13 @@ class Manager(manager.Manager):
return self._set_domain_id_and_mapping( return self._set_domain_id_and_mapping(
ref, domain_id, driver, mapping.EntityType.USER) ref, domain_id, driver, mapping.EntityType.USER)
def _handle_federated_attributes_in_hints(self, driver, hints):
federated_attributes = ['idp_id', 'protocol_id', 'unique_id']
for filter_ in hints.filters:
if filter_['name'] in federated_attributes:
return self.shadow_users_api.get_federated_users(hints)
return driver.list_users(hints)
@domains_configured @domains_configured
@exception_translated('user') @exception_translated('user')
def list_users(self, domain_scope=None, hints=None): def list_users(self, domain_scope=None, hints=None):
@ -972,7 +979,7 @@ class Manager(manager.Manager):
# We are effectively satisfying any domain_id filter by the above # We are effectively satisfying any domain_id filter by the above
# driver selection, so remove any such filter. # driver selection, so remove any such filter.
self._mark_domain_id_filter_satisfied(hints) self._mark_domain_id_filter_satisfied(hints)
ref_list = driver.list_users(hints) ref_list = self._handle_federated_attributes_in_hints(driver, hints)
return self._set_domain_id_and_mapping( return self._set_domain_id_and_mapping(
ref_list, domain_scope, driver, mapping.EntityType.USER) ref_list, domain_scope, driver, mapping.EntityType.USER)

View File

@ -12,6 +12,7 @@
import copy import copy
import datetime import datetime
import sqlalchemy
import uuid import uuid
from oslo_config import cfg from oslo_config import cfg
@ -41,6 +42,36 @@ class ShadowUsers(base.ShadowUsersDriverBase):
session.add(user_ref) session.add(user_ref)
return identity_base.filter_user(user_ref.to_dict()) return identity_base.filter_user(user_ref.to_dict())
def _update_query_with_federated_statements(self, hints, query):
statements = []
for filter_ in hints.filters:
if filter_['name'] == 'idp_id':
statements.append(
model.FederatedUser.idp_id == filter_['value'])
if filter_['name'] == 'protocol_id':
statements.append(
model.FederatedUser.protocol_id == filter_['value'])
if filter_['name'] == 'unique_id':
statements.append(
model.FederatedUser.unique_id == filter_['value'])
# Remove federated attributes to prevent redundancies from
# sql.filter_limit_query which filters remaining hints
hints.filters = [
x for x in hints.filters if x['name'] not in ('idp_id',
'protocol_id',
'unique_id')]
query = query.filter(sqlalchemy.and_(*statements))
return query
def get_federated_users(self, hints):
with sql.session_for_read() as session:
query = session.query(model.User).outerjoin(model.LocalUser)
query = query.filter(model.User.id == model.FederatedUser.user_id)
query = self._update_query_with_federated_statements(hints, query)
user_refs = sql.filter_limit_query(model.User, query, hints)
return [identity_base.filter_user(x.to_dict()) for x in user_refs]
def get_federated_user(self, idp_id, protocol_id, unique_id): def get_federated_user(self, idp_id, protocol_id, unique_id):
user_ref = self._get_federated_user(idp_id, protocol_id, unique_id) user_ref = self._get_federated_user(idp_id, protocol_id, unique_id)
return identity_base.filter_user(user_ref.to_dict()) return identity_base.filter_user(user_ref.to_dict())

View File

@ -466,6 +466,89 @@ class IdentityTests(object):
self.assertNotIn('password', user_ref) self.assertNotIn('password', user_ref)
self.assertEqual(expected_user_ids, user_ids) self.assertEqual(expected_user_ids, user_ids)
def _build_hints(self, hints, filters, fed_dict):
for key in filters:
hints.add_filter(key,
fed_dict[key],
comparator='equals')
return hints
def _test_list_users_with_attribute(self, filters, fed_dict):
# Call list_users while no match exists for the federated user
hints = driver_hints.Hints()
hints = self._build_hints(hints, filters, fed_dict)
users = self.identity_api.list_users(hints=hints)
self.assertEqual(0, len(users))
# list_users with a new relational user and federated user
hints = self._build_hints(hints, filters, fed_dict)
self.shadow_users_api.create_federated_user(fed_dict)
users = self.identity_api.list_users(hints=hints)
self.assertEqual(1, len(users))
# create another federated user that shouldnt be matched and ensure
# that still only one match is found
hints = self._build_hints(hints, filters, fed_dict)
fed_dict2 = unit.new_federated_user_ref()
fed_dict2['idp_id'] = 'myidp'
fed_dict2['protocol_id'] = 'mapped'
self.shadow_users_api.create_federated_user(fed_dict2)
users = self.identity_api.list_users(hints=hints)
self.assertEqual(1, len(users))
# create another federated user that should also be matched and ensure
# that there are now two matches in the users list. Unless there is a
# unique id in the filter since unique_ids must be unique and would
# therefore cause a duplicate error.
hints = self._build_hints(hints, filters, fed_dict)
if not any('unique_id' in x['name'] for x in hints.filters):
hints = self._build_hints(hints, filters, fed_dict)
fed_dict3 = unit.new_federated_user_ref()
# check which filters are here and create another match
for filters_ in hints.filters:
if filters_['name'] == 'idp_id':
fed_dict3['idp_id'] = fed_dict['idp_id']
elif filters_['name'] == 'protocol_id':
fed_dict3['protocol_id'] = fed_dict['protocol_id']
self.shadow_users_api.create_federated_user(fed_dict3)
users = self.identity_api.list_users(hints=hints)
self.assertEqual(2, len(users))
def test_list_users_with_unique_id(self):
federated_dict = unit.new_federated_user_ref()
filters = ['unique_id']
self._test_list_users_with_attribute(filters, federated_dict)
def test_list_users_with_idp_id(self):
federated_dict = unit.new_federated_user_ref()
filters = ['idp_id']
self._test_list_users_with_attribute(filters, federated_dict)
def test_list_users_with_protocol_id(self):
federated_dict = unit.new_federated_user_ref()
filters = ['protocol_id']
self._test_list_users_with_attribute(filters, federated_dict)
def test_list_users_with_unique_id_and_idp_id(self):
federated_dict = unit.new_federated_user_ref()
filters = ['unique_id', 'idp_id']
self._test_list_users_with_attribute(filters, federated_dict)
def test_list_users_with_unique_id_and_protocol_id(self):
federated_dict = unit.new_federated_user_ref()
filters = ['unique_id', 'protocol_id']
self._test_list_users_with_attribute(filters, federated_dict)
def test_list_users_with_idp_id_protocol_id(self):
federated_dict = unit.new_federated_user_ref()
filters = ['idp_id', 'protocol_id']
self._test_list_users_with_attribute(filters, federated_dict)
def test_list_users_with_all_federated_attributes(self):
federated_dict = unit.new_federated_user_ref()
filters = ['unique_id', 'idp_id', 'protocol_id']
self._test_list_users_with_attribute(filters, federated_dict)
def test_list_groups(self): def test_list_groups(self):
group1 = unit.new_group_ref(domain_id=CONF.identity.default_domain_id) group1 = unit.new_group_ref(domain_id=CONF.identity.default_domain_id)
group2 = unit.new_group_ref(domain_id=CONF.identity.default_domain_id) group2 = unit.new_group_ref(domain_id=CONF.identity.default_domain_id)

View File

@ -13,6 +13,7 @@
# under the License. # under the License.
import datetime import datetime
import itertools
import uuid import uuid
import fixtures import fixtures
@ -24,11 +25,16 @@ from six.moves import http_client
from testtools import matchers from testtools import matchers
from keystone.common import controller from keystone.common import controller
from keystone.common import sql
import keystone.conf import keystone.conf
from keystone.credential.providers import fernet as credential_fernet from keystone.credential.providers import fernet as credential_fernet
from keystone import exception from keystone import exception
from keystone.identity.backends import base as identity_base
from keystone.identity.backends import sql_model as model
from keystone.tests import unit from keystone.tests import unit
from keystone.tests.unit import ksfixtures from keystone.tests.unit import ksfixtures
from keystone.tests.unit.ksfixtures import database
from keystone.tests.unit import mapping_fixtures
from keystone.tests.unit import test_v3 from keystone.tests.unit import test_v3
@ -984,3 +990,105 @@ class PasswordValidationTestCase(ChangePasswordTestCase):
self.change_password(password='mypas2', self.change_password(password='mypas2',
original_password=self.user_ref['password'], original_password=self.user_ref['password'],
expected_status=http_client.BAD_REQUEST) expected_status=http_client.BAD_REQUEST)
class UserAPITests(test_v3.RestfulTestCase):
def _create_federated_attributes(self):
# Create the idp
idp = {
'id': uuid.uuid4().hex,
'enabled': True,
'description': uuid.uuid4().hex
}
self.federation_api.create_idp(idp['id'], idp)
# Create the mapping
mapping = mapping_fixtures.MAPPING_EPHEMERAL_USER
mapping['id'] = uuid.uuid4().hex
self.federation_api.create_mapping(mapping['id'], mapping)
# Create the protocol
protocol = {
'id': uuid.uuid4().hex,
'mapping_id': mapping['id']
}
self.federation_api.create_protocol(idp['id'],
protocol['id'],
protocol)
return idp, protocol
def _create_user_with_federated_user(self, user, fed_dict):
with sql.session_for_write() as session:
federated_ref = model.FederatedUser.from_dict(fed_dict)
user_ref = model.User.from_dict(user)
user_ref.created_at = datetime.datetime.utcnow()
user_ref.federated_users.append(federated_ref)
session.add(user_ref)
return identity_base.filter_user(user_ref.to_dict())
def setUp(self):
super(UserAPITests, self).setUp()
self.useFixture(database.Database())
self.load_backends()
# Create the federated object
idp, protocol = self._create_federated_attributes()
self.fed_dict = unit.new_federated_user_ref()
self.fed_dict['idp_id'] = idp['id']
self.fed_dict['protocol_id'] = protocol['id']
self.fed_dict['unique_id'] = "jdoe"
# Create the domain_id, user, and federated_user relationship
self.domain = unit.new_domain_ref()
self.resource_api.create_domain(self.domain['id'], self.domain)
self.fed_user = unit.new_user_ref(domain_id=self.domain['id'])
self.fed_user = self._create_user_with_federated_user(self.fed_user,
self.fed_dict)
# Create two new fed_users which will have the same idp and protocol
# but be completely different from the first fed_user
# Create a new idp and protocol for fed_user2 and 3
idp, protocol = self._create_federated_attributes()
self.fed_dict2 = unit.new_federated_user_ref()
self.fed_dict2['idp_id'] = idp['id']
self.fed_dict2['protocol_id'] = protocol['id']
self.fed_dict2['unique_id'] = "ravelar"
self.fed_user2 = unit.new_user_ref(domain_id=self.domain['id'])
self.fed_user2 = self._create_user_with_federated_user(self.fed_user2,
self.fed_dict2)
self.fed_dict3 = unit.new_federated_user_ref()
self.fed_dict3['idp_id'] = idp['id']
self.fed_dict3['protocol_id'] = protocol['id']
self.fed_dict3['unique_id'] = "jsmith"
self.fed_user3 = unit.new_user_ref(domain_id=self.domain['id'])
self.fed_user3 = self._create_user_with_federated_user(self.fed_user3,
self.fed_dict3)
def _test_list_users_with_federated_parameter(self, parameter):
# construct the resource url based off what's passed in parameter
resource_url = ('/users?%s=%s'
% (parameter[0], self.fed_dict[parameter[0]]))
for attr in parameter[1:]:
resource_url += '&%s=%s' % (attr, self.fed_dict[attr])
r = self.get(resource_url)
# Check that only one out of 3 fed_users is matched by calling the api
# and that it is a valid response
self.assertEqual(1, len(r.result['users']))
self.assertValidUserListResponse(r, ref=self.fed_user,
resource_url=resource_url)
# Since unique_id will always return one user if matching for unique_id
# in the query, we rule out unique_id for the next tests
if not any('unique_id' in x for x in parameter):
# Check that we get two matches here since fed_user2 and fed_user3
# both have the same idp and protocol
resource_url = ('/users?%s=%s'
% (parameter[0], self.fed_dict2[parameter[0]]))
for attr in parameter[1:]:
resource_url += '&%s=%s' % (attr, self.fed_dict2[attr])
r = self.get(resource_url)
self.assertEqual(2, len(r.result['users']))
self.assertValidUserListResponse(r, ref=self.fed_user2,
resource_url=resource_url)
def test_list_user_with_all_possible_federated_queries(self):
# Create a permutation to test every possible combination of federated
# attributes in the list users query
attributes = ['idp_id', 'protocol_id', 'unique_id']
for attr in range(1, len(attributes) + 1):
for param in list(itertools.combinations(attributes, attr)):
self._test_list_users_with_federated_parameter(param)

View File

@ -0,0 +1,4 @@
---
upgrade:
- Added new filters (``idp_id``, ``protocol_id``, and ``unique_id``) for the
list user API (``GET /v3/users``).