Merge "Add queries for federated attributes in list_users"

This commit is contained in:
Jenkins 2017-01-24 09:56:30 +00:00 committed by Gerrit Code Review
commit f7bc5bad7d
8 changed files with 257 additions and 2 deletions

View File

@ -205,6 +205,12 @@ group_name_query:
in: query
required: false
type: string
idp_id_query:
description: |
Filters the response by an identity provider ID.
in: query
required: false
type: string
include_names_query:
description: |
If set to true, then the names of any entities returned will be include as
@ -334,6 +340,12 @@ project_name_query:
in: query
required: false
type: string
protocol_id_query:
description: |
Filters the response by a protocol ID.
in: query
required: false
type: string
role_id_query:
description: |
Filters the response by a role ID.
@ -398,6 +410,12 @@ subtree_as_list:
required: false
type: key-only, no value expected
min_version: 3.4
unique_id_query:
description: |
Filters the response by a unique ID.
in: query
required: false
type: string
user_id_query:
description: |
Filters the response by a user ID.

View File

@ -46,8 +46,11 @@ Request Parameters
- domain_id: domain_id_query
- enabled: enabled_user_query
- idp_id: idp_id_query
- name: name_user_query
- password_expires_at: password_expires_at_query
- protocol_id: protocol_id_query
- unique_id: unique_id_query
Response Parameters
-------------------

View File

@ -221,7 +221,8 @@ class UserV3(controller.V3Controller):
)
return UserV3.wrap_member(request.context_dict, ref)
@controller.filterprotected('domain_id', 'enabled', 'name')
@controller.filterprotected('domain_id', 'enabled', 'idp_id', 'name',
'protocol_id', 'unique_id')
def list_users(self, request, filters):
hints = UserV3.build_driver_hints(request, filters)
domain = self._get_domain_id_for_list_request(request)

View File

@ -958,6 +958,13 @@ class Manager(manager.Manager):
return self._set_domain_id_and_mapping(
ref, domain_id, driver, mapping.EntityType.USER)
def _handle_federated_attributes_in_hints(self, driver, hints):
federated_attributes = ['idp_id', 'protocol_id', 'unique_id']
for filter_ in hints.filters:
if filter_['name'] in federated_attributes:
return self.shadow_users_api.get_federated_users(hints)
return driver.list_users(hints)
@domains_configured
@exception_translated('user')
def list_users(self, domain_scope=None, hints=None):
@ -972,7 +979,7 @@ class Manager(manager.Manager):
# We are effectively satisfying any domain_id filter by the above
# driver selection, so remove any such filter.
self._mark_domain_id_filter_satisfied(hints)
ref_list = driver.list_users(hints)
ref_list = self._handle_federated_attributes_in_hints(driver, hints)
return self._set_domain_id_and_mapping(
ref_list, domain_scope, driver, mapping.EntityType.USER)

View File

@ -12,6 +12,7 @@
import copy
import datetime
import sqlalchemy
import uuid
from oslo_config import cfg
@ -41,6 +42,36 @@ class ShadowUsers(base.ShadowUsersDriverBase):
session.add(user_ref)
return identity_base.filter_user(user_ref.to_dict())
def _update_query_with_federated_statements(self, hints, query):
statements = []
for filter_ in hints.filters:
if filter_['name'] == 'idp_id':
statements.append(
model.FederatedUser.idp_id == filter_['value'])
if filter_['name'] == 'protocol_id':
statements.append(
model.FederatedUser.protocol_id == filter_['value'])
if filter_['name'] == 'unique_id':
statements.append(
model.FederatedUser.unique_id == filter_['value'])
# Remove federated attributes to prevent redundancies from
# sql.filter_limit_query which filters remaining hints
hints.filters = [
x for x in hints.filters if x['name'] not in ('idp_id',
'protocol_id',
'unique_id')]
query = query.filter(sqlalchemy.and_(*statements))
return query
def get_federated_users(self, hints):
with sql.session_for_read() as session:
query = session.query(model.User).outerjoin(model.LocalUser)
query = query.filter(model.User.id == model.FederatedUser.user_id)
query = self._update_query_with_federated_statements(hints, query)
user_refs = sql.filter_limit_query(model.User, query, hints)
return [identity_base.filter_user(x.to_dict()) for x in user_refs]
def get_federated_user(self, idp_id, protocol_id, unique_id):
user_ref = self._get_federated_user(idp_id, protocol_id, unique_id)
return identity_base.filter_user(user_ref.to_dict())

View File

@ -466,6 +466,89 @@ class IdentityTests(object):
self.assertNotIn('password', user_ref)
self.assertEqual(expected_user_ids, user_ids)
def _build_hints(self, hints, filters, fed_dict):
for key in filters:
hints.add_filter(key,
fed_dict[key],
comparator='equals')
return hints
def _test_list_users_with_attribute(self, filters, fed_dict):
# Call list_users while no match exists for the federated user
hints = driver_hints.Hints()
hints = self._build_hints(hints, filters, fed_dict)
users = self.identity_api.list_users(hints=hints)
self.assertEqual(0, len(users))
# list_users with a new relational user and federated user
hints = self._build_hints(hints, filters, fed_dict)
self.shadow_users_api.create_federated_user(fed_dict)
users = self.identity_api.list_users(hints=hints)
self.assertEqual(1, len(users))
# create another federated user that shouldnt be matched and ensure
# that still only one match is found
hints = self._build_hints(hints, filters, fed_dict)
fed_dict2 = unit.new_federated_user_ref()
fed_dict2['idp_id'] = 'myidp'
fed_dict2['protocol_id'] = 'mapped'
self.shadow_users_api.create_federated_user(fed_dict2)
users = self.identity_api.list_users(hints=hints)
self.assertEqual(1, len(users))
# create another federated user that should also be matched and ensure
# that there are now two matches in the users list. Unless there is a
# unique id in the filter since unique_ids must be unique and would
# therefore cause a duplicate error.
hints = self._build_hints(hints, filters, fed_dict)
if not any('unique_id' in x['name'] for x in hints.filters):
hints = self._build_hints(hints, filters, fed_dict)
fed_dict3 = unit.new_federated_user_ref()
# check which filters are here and create another match
for filters_ in hints.filters:
if filters_['name'] == 'idp_id':
fed_dict3['idp_id'] = fed_dict['idp_id']
elif filters_['name'] == 'protocol_id':
fed_dict3['protocol_id'] = fed_dict['protocol_id']
self.shadow_users_api.create_federated_user(fed_dict3)
users = self.identity_api.list_users(hints=hints)
self.assertEqual(2, len(users))
def test_list_users_with_unique_id(self):
federated_dict = unit.new_federated_user_ref()
filters = ['unique_id']
self._test_list_users_with_attribute(filters, federated_dict)
def test_list_users_with_idp_id(self):
federated_dict = unit.new_federated_user_ref()
filters = ['idp_id']
self._test_list_users_with_attribute(filters, federated_dict)
def test_list_users_with_protocol_id(self):
federated_dict = unit.new_federated_user_ref()
filters = ['protocol_id']
self._test_list_users_with_attribute(filters, federated_dict)
def test_list_users_with_unique_id_and_idp_id(self):
federated_dict = unit.new_federated_user_ref()
filters = ['unique_id', 'idp_id']
self._test_list_users_with_attribute(filters, federated_dict)
def test_list_users_with_unique_id_and_protocol_id(self):
federated_dict = unit.new_federated_user_ref()
filters = ['unique_id', 'protocol_id']
self._test_list_users_with_attribute(filters, federated_dict)
def test_list_users_with_idp_id_protocol_id(self):
federated_dict = unit.new_federated_user_ref()
filters = ['idp_id', 'protocol_id']
self._test_list_users_with_attribute(filters, federated_dict)
def test_list_users_with_all_federated_attributes(self):
federated_dict = unit.new_federated_user_ref()
filters = ['unique_id', 'idp_id', 'protocol_id']
self._test_list_users_with_attribute(filters, federated_dict)
def test_list_groups(self):
group1 = unit.new_group_ref(domain_id=CONF.identity.default_domain_id)
group2 = unit.new_group_ref(domain_id=CONF.identity.default_domain_id)

View File

@ -13,6 +13,7 @@
# under the License.
import datetime
import itertools
import uuid
import fixtures
@ -24,11 +25,16 @@ from six.moves import http_client
from testtools import matchers
from keystone.common import controller
from keystone.common import sql
import keystone.conf
from keystone.credential.providers import fernet as credential_fernet
from keystone import exception
from keystone.identity.backends import base as identity_base
from keystone.identity.backends import sql_model as model
from keystone.tests import unit
from keystone.tests.unit import ksfixtures
from keystone.tests.unit.ksfixtures import database
from keystone.tests.unit import mapping_fixtures
from keystone.tests.unit import test_v3
@ -1023,3 +1029,105 @@ class PasswordValidationTestCase(ChangePasswordTestCase):
self.change_password(password='mypas2',
original_password=self.user_ref['password'],
expected_status=http_client.BAD_REQUEST)
class UserAPITests(test_v3.RestfulTestCase):
def _create_federated_attributes(self):
# Create the idp
idp = {
'id': uuid.uuid4().hex,
'enabled': True,
'description': uuid.uuid4().hex
}
self.federation_api.create_idp(idp['id'], idp)
# Create the mapping
mapping = mapping_fixtures.MAPPING_EPHEMERAL_USER
mapping['id'] = uuid.uuid4().hex
self.federation_api.create_mapping(mapping['id'], mapping)
# Create the protocol
protocol = {
'id': uuid.uuid4().hex,
'mapping_id': mapping['id']
}
self.federation_api.create_protocol(idp['id'],
protocol['id'],
protocol)
return idp, protocol
def _create_user_with_federated_user(self, user, fed_dict):
with sql.session_for_write() as session:
federated_ref = model.FederatedUser.from_dict(fed_dict)
user_ref = model.User.from_dict(user)
user_ref.created_at = datetime.datetime.utcnow()
user_ref.federated_users.append(federated_ref)
session.add(user_ref)
return identity_base.filter_user(user_ref.to_dict())
def setUp(self):
super(UserAPITests, self).setUp()
self.useFixture(database.Database())
self.load_backends()
# Create the federated object
idp, protocol = self._create_federated_attributes()
self.fed_dict = unit.new_federated_user_ref()
self.fed_dict['idp_id'] = idp['id']
self.fed_dict['protocol_id'] = protocol['id']
self.fed_dict['unique_id'] = "jdoe"
# Create the domain_id, user, and federated_user relationship
self.domain = unit.new_domain_ref()
self.resource_api.create_domain(self.domain['id'], self.domain)
self.fed_user = unit.new_user_ref(domain_id=self.domain['id'])
self.fed_user = self._create_user_with_federated_user(self.fed_user,
self.fed_dict)
# Create two new fed_users which will have the same idp and protocol
# but be completely different from the first fed_user
# Create a new idp and protocol for fed_user2 and 3
idp, protocol = self._create_federated_attributes()
self.fed_dict2 = unit.new_federated_user_ref()
self.fed_dict2['idp_id'] = idp['id']
self.fed_dict2['protocol_id'] = protocol['id']
self.fed_dict2['unique_id'] = "ravelar"
self.fed_user2 = unit.new_user_ref(domain_id=self.domain['id'])
self.fed_user2 = self._create_user_with_federated_user(self.fed_user2,
self.fed_dict2)
self.fed_dict3 = unit.new_federated_user_ref()
self.fed_dict3['idp_id'] = idp['id']
self.fed_dict3['protocol_id'] = protocol['id']
self.fed_dict3['unique_id'] = "jsmith"
self.fed_user3 = unit.new_user_ref(domain_id=self.domain['id'])
self.fed_user3 = self._create_user_with_federated_user(self.fed_user3,
self.fed_dict3)
def _test_list_users_with_federated_parameter(self, parameter):
# construct the resource url based off what's passed in parameter
resource_url = ('/users?%s=%s'
% (parameter[0], self.fed_dict[parameter[0]]))
for attr in parameter[1:]:
resource_url += '&%s=%s' % (attr, self.fed_dict[attr])
r = self.get(resource_url)
# Check that only one out of 3 fed_users is matched by calling the api
# and that it is a valid response
self.assertEqual(1, len(r.result['users']))
self.assertValidUserListResponse(r, ref=self.fed_user,
resource_url=resource_url)
# Since unique_id will always return one user if matching for unique_id
# in the query, we rule out unique_id for the next tests
if not any('unique_id' in x for x in parameter):
# Check that we get two matches here since fed_user2 and fed_user3
# both have the same idp and protocol
resource_url = ('/users?%s=%s'
% (parameter[0], self.fed_dict2[parameter[0]]))
for attr in parameter[1:]:
resource_url += '&%s=%s' % (attr, self.fed_dict2[attr])
r = self.get(resource_url)
self.assertEqual(2, len(r.result['users']))
self.assertValidUserListResponse(r, ref=self.fed_user2,
resource_url=resource_url)
def test_list_user_with_all_possible_federated_queries(self):
# Create a permutation to test every possible combination of federated
# attributes in the list users query
attributes = ['idp_id', 'protocol_id', 'unique_id']
for attr in range(1, len(attributes) + 1):
for param in list(itertools.combinations(attributes, attr)):
self._test_list_users_with_federated_parameter(param)

View File

@ -0,0 +1,4 @@
---
upgrade:
- Added new filters (``idp_id``, ``protocol_id``, and ``unique_id``) for the
list user API (``GET /v3/users``).