e3f55e7b54
The table relationship between 'user' and 'nonlocal_user' should be 1 to 1, which is consistent with 'user' to 'local_user'. However, it's mistakenly 1 to many. In fact, the backend code treats 'user' to 'nonlocal_user' as 1:1 and wouldn't allow duplicates, so this will have zero impact on existing deployments. This patch fixes this by making the user_id column unique. Closes-Bug: #1649412 Partially-Implements: bp support-federated-attr Change-Id: Ib371df18f3fb2c67e5421cf0bf4551183902cf00
1442 lines
64 KiB
Python
1442 lines
64 KiB
Python
# Copyright 2012 OpenStack Foundation
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import datetime
|
|
import uuid
|
|
|
|
import mock
|
|
from six.moves import range
|
|
from testtools import matchers
|
|
|
|
from keystone.common import driver_hints
|
|
from keystone.common import sql
|
|
import keystone.conf
|
|
from keystone import exception
|
|
from keystone.identity.backends import sql_model as model
|
|
from keystone.tests import unit
|
|
from keystone.tests.unit import default_fixtures
|
|
from keystone.tests.unit import filtering
|
|
|
|
|
|
CONF = keystone.conf.CONF
|
|
|
|
|
|
class IdentityTests(object):
|
|
|
|
def _get_domain_fixture(self):
|
|
domain = unit.new_domain_ref()
|
|
self.resource_api.create_domain(domain['id'], domain)
|
|
return domain
|
|
|
|
def _set_domain_scope(self, domain_id):
|
|
# We only provide a domain scope if we have multiple drivers
|
|
if CONF.identity.domain_specific_drivers_enabled:
|
|
return domain_id
|
|
|
|
def test_authenticate_bad_user(self):
|
|
self.assertRaises(AssertionError,
|
|
self.identity_api.authenticate,
|
|
self.make_request(),
|
|
user_id=uuid.uuid4().hex,
|
|
password=self.user_foo['password'])
|
|
|
|
def test_authenticate_bad_password(self):
|
|
self.assertRaises(AssertionError,
|
|
self.identity_api.authenticate,
|
|
self.make_request(),
|
|
user_id=self.user_foo['id'],
|
|
password=uuid.uuid4().hex)
|
|
|
|
def test_authenticate(self):
|
|
user_ref = self.identity_api.authenticate(
|
|
self.make_request(),
|
|
user_id=self.user_sna['id'],
|
|
password=self.user_sna['password'])
|
|
# NOTE(termie): the password field is left in user_sna to make
|
|
# it easier to authenticate in tests, but should
|
|
# not be returned by the api
|
|
self.user_sna.pop('password')
|
|
self.user_sna['enabled'] = True
|
|
self.assertDictEqual(self.user_sna, user_ref)
|
|
|
|
def test_authenticate_and_get_roles_no_metadata(self):
|
|
user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
|
|
|
|
# Remove user id. It is ignored by create_user() and will break the
|
|
# subset test below.
|
|
del user['id']
|
|
|
|
new_user = self.identity_api.create_user(user)
|
|
self.assignment_api.add_user_to_project(self.tenant_baz['id'],
|
|
new_user['id'])
|
|
user_ref = self.identity_api.authenticate(
|
|
self.make_request(),
|
|
user_id=new_user['id'],
|
|
password=user['password'])
|
|
self.assertNotIn('password', user_ref)
|
|
# NOTE(termie): the password field is left in user_sna to make
|
|
# it easier to authenticate in tests, but should
|
|
# not be returned by the api
|
|
user.pop('password')
|
|
self.assertDictContainsSubset(user, user_ref)
|
|
role_list = self.assignment_api.get_roles_for_user_and_project(
|
|
new_user['id'], self.tenant_baz['id'])
|
|
self.assertEqual(1, len(role_list))
|
|
self.assertIn(CONF.member_role_id, role_list)
|
|
|
|
def test_authenticate_if_no_password_set(self):
|
|
id_ = uuid.uuid4().hex
|
|
user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
|
|
self.identity_api.create_user(user)
|
|
|
|
self.assertRaises(AssertionError,
|
|
self.identity_api.authenticate,
|
|
self.make_request(),
|
|
user_id=id_,
|
|
password='password')
|
|
|
|
def test_create_unicode_user_name(self):
|
|
unicode_name = u'name \u540d\u5b57'
|
|
user = unit.new_user_ref(name=unicode_name,
|
|
domain_id=CONF.identity.default_domain_id)
|
|
ref = self.identity_api.create_user(user)
|
|
self.assertEqual(unicode_name, ref['name'])
|
|
|
|
def test_get_user(self):
|
|
user_ref = self.identity_api.get_user(self.user_foo['id'])
|
|
# NOTE(termie): the password field is left in user_foo to make
|
|
# it easier to authenticate in tests, but should
|
|
# not be returned by the api
|
|
self.user_foo.pop('password')
|
|
self.assertDictEqual(self.user_foo, user_ref)
|
|
|
|
def test_get_user_returns_required_attributes(self):
|
|
user_ref = self.identity_api.get_user(self.user_foo['id'])
|
|
self.assertIn('id', user_ref)
|
|
self.assertIn('name', user_ref)
|
|
self.assertIn('enabled', user_ref)
|
|
self.assertIn('password_expires_at', user_ref)
|
|
|
|
@unit.skip_if_cache_disabled('identity')
|
|
def test_cache_layer_get_user(self):
|
|
user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
|
|
self.identity_api.create_user(user)
|
|
ref = self.identity_api.get_user_by_name(user['name'],
|
|
user['domain_id'])
|
|
# cache the result.
|
|
self.identity_api.get_user(ref['id'])
|
|
# delete bypassing identity api
|
|
domain_id, driver, entity_id = (
|
|
self.identity_api._get_domain_driver_and_entity_id(ref['id']))
|
|
driver.delete_user(entity_id)
|
|
|
|
self.assertDictEqual(ref, self.identity_api.get_user(ref['id']))
|
|
self.identity_api.get_user.invalidate(self.identity_api, ref['id'])
|
|
self.assertRaises(exception.UserNotFound,
|
|
self.identity_api.get_user, ref['id'])
|
|
user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
|
|
user = self.identity_api.create_user(user)
|
|
ref = self.identity_api.get_user_by_name(user['name'],
|
|
user['domain_id'])
|
|
user['description'] = uuid.uuid4().hex
|
|
# cache the result.
|
|
self.identity_api.get_user(ref['id'])
|
|
# update using identity api and get back updated user.
|
|
user_updated = self.identity_api.update_user(ref['id'], user)
|
|
self.assertDictContainsSubset(self.identity_api.get_user(ref['id']),
|
|
user_updated)
|
|
self.assertDictContainsSubset(
|
|
self.identity_api.get_user_by_name(ref['name'], ref['domain_id']),
|
|
user_updated)
|
|
|
|
def test_get_user_returns_not_found(self):
|
|
self.assertRaises(exception.UserNotFound,
|
|
self.identity_api.get_user,
|
|
uuid.uuid4().hex)
|
|
|
|
def test_get_user_by_name(self):
|
|
user_ref = self.identity_api.get_user_by_name(
|
|
self.user_foo['name'], CONF.identity.default_domain_id)
|
|
# NOTE(termie): the password field is left in user_foo to make
|
|
# it easier to authenticate in tests, but should
|
|
# not be returned by the api
|
|
self.user_foo.pop('password')
|
|
self.assertDictEqual(self.user_foo, user_ref)
|
|
|
|
@unit.skip_if_cache_disabled('identity')
|
|
def test_cache_layer_get_user_by_name(self):
|
|
user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
|
|
self.identity_api.create_user(user)
|
|
ref = self.identity_api.get_user_by_name(user['name'],
|
|
user['domain_id'])
|
|
# delete bypassing the identity api.
|
|
domain_id, driver, entity_id = (
|
|
self.identity_api._get_domain_driver_and_entity_id(ref['id']))
|
|
driver.delete_user(entity_id)
|
|
|
|
self.assertDictEqual(ref, self.identity_api.get_user_by_name(
|
|
user['name'], CONF.identity.default_domain_id))
|
|
self.identity_api.get_user_by_name.invalidate(
|
|
self.identity_api, user['name'], CONF.identity.default_domain_id)
|
|
self.assertRaises(exception.UserNotFound,
|
|
self.identity_api.get_user_by_name,
|
|
user['name'], CONF.identity.default_domain_id)
|
|
user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
|
|
user = self.identity_api.create_user(user)
|
|
ref = self.identity_api.get_user_by_name(user['name'],
|
|
user['domain_id'])
|
|
user['description'] = uuid.uuid4().hex
|
|
user_updated = self.identity_api.update_user(ref['id'], user)
|
|
self.assertDictContainsSubset(self.identity_api.get_user(ref['id']),
|
|
user_updated)
|
|
self.assertDictContainsSubset(
|
|
self.identity_api.get_user_by_name(ref['name'], ref['domain_id']),
|
|
user_updated)
|
|
|
|
def test_get_user_by_name_returns_not_found(self):
|
|
self.assertRaises(exception.UserNotFound,
|
|
self.identity_api.get_user_by_name,
|
|
uuid.uuid4().hex,
|
|
CONF.identity.default_domain_id)
|
|
|
|
def test_create_duplicate_user_name_fails(self):
|
|
user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
|
|
user = self.identity_api.create_user(user)
|
|
self.assertRaises(exception.Conflict,
|
|
self.identity_api.create_user,
|
|
user)
|
|
|
|
def test_create_duplicate_user_name_in_different_domains(self):
|
|
new_domain = unit.new_domain_ref()
|
|
self.resource_api.create_domain(new_domain['id'], new_domain)
|
|
user1 = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
|
|
|
|
user2 = unit.new_user_ref(name=user1['name'],
|
|
domain_id=new_domain['id'])
|
|
|
|
self.identity_api.create_user(user1)
|
|
self.identity_api.create_user(user2)
|
|
|
|
def test_move_user_between_domains(self):
|
|
domain1 = unit.new_domain_ref()
|
|
self.resource_api.create_domain(domain1['id'], domain1)
|
|
domain2 = unit.new_domain_ref()
|
|
self.resource_api.create_domain(domain2['id'], domain2)
|
|
user = unit.new_user_ref(domain_id=domain1['id'])
|
|
user = self.identity_api.create_user(user)
|
|
user['domain_id'] = domain2['id']
|
|
# Update the user asserting that a deprecation warning is emitted
|
|
with mock.patch(
|
|
'oslo_log.versionutils.report_deprecated_feature') as mock_dep:
|
|
self.identity_api.update_user(user['id'], user)
|
|
self.assertTrue(mock_dep.called)
|
|
|
|
updated_user_ref = self.identity_api.get_user(user['id'])
|
|
self.assertEqual(domain2['id'], updated_user_ref['domain_id'])
|
|
|
|
def test_move_user_between_domains_with_clashing_names_fails(self):
|
|
domain1 = unit.new_domain_ref()
|
|
self.resource_api.create_domain(domain1['id'], domain1)
|
|
domain2 = unit.new_domain_ref()
|
|
self.resource_api.create_domain(domain2['id'], domain2)
|
|
# First, create a user in domain1
|
|
user1 = unit.new_user_ref(domain_id=domain1['id'])
|
|
user1 = self.identity_api.create_user(user1)
|
|
# Now create a user in domain2 with a potentially clashing
|
|
# name - which should work since we have domain separation
|
|
user2 = unit.new_user_ref(name=user1['name'],
|
|
domain_id=domain2['id'])
|
|
user2 = self.identity_api.create_user(user2)
|
|
# Now try and move user1 into the 2nd domain - which should
|
|
# fail since the names clash
|
|
user1['domain_id'] = domain2['id']
|
|
self.assertRaises(exception.Conflict,
|
|
self.identity_api.update_user,
|
|
user1['id'],
|
|
user1)
|
|
|
|
def test_rename_duplicate_user_name_fails(self):
|
|
user1 = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
|
|
user2 = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
|
|
self.identity_api.create_user(user1)
|
|
user2 = self.identity_api.create_user(user2)
|
|
user2['name'] = user1['name']
|
|
self.assertRaises(exception.Conflict,
|
|
self.identity_api.update_user,
|
|
user2['id'],
|
|
user2)
|
|
|
|
def test_update_user_id_fails(self):
|
|
user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
|
|
user = self.identity_api.create_user(user)
|
|
original_id = user['id']
|
|
user['id'] = 'fake2'
|
|
self.assertRaises(exception.ValidationError,
|
|
self.identity_api.update_user,
|
|
original_id,
|
|
user)
|
|
user_ref = self.identity_api.get_user(original_id)
|
|
self.assertEqual(original_id, user_ref['id'])
|
|
self.assertRaises(exception.UserNotFound,
|
|
self.identity_api.get_user,
|
|
'fake2')
|
|
|
|
def test_delete_user_with_group_project_domain_links(self):
|
|
role1 = unit.new_role_ref()
|
|
self.role_api.create_role(role1['id'], role1)
|
|
domain1 = unit.new_domain_ref()
|
|
self.resource_api.create_domain(domain1['id'], domain1)
|
|
project1 = unit.new_project_ref(domain_id=domain1['id'])
|
|
self.resource_api.create_project(project1['id'], project1)
|
|
user1 = unit.new_user_ref(domain_id=domain1['id'])
|
|
user1 = self.identity_api.create_user(user1)
|
|
group1 = unit.new_group_ref(domain_id=domain1['id'])
|
|
group1 = self.identity_api.create_group(group1)
|
|
self.assignment_api.create_grant(user_id=user1['id'],
|
|
project_id=project1['id'],
|
|
role_id=role1['id'])
|
|
self.assignment_api.create_grant(user_id=user1['id'],
|
|
domain_id=domain1['id'],
|
|
role_id=role1['id'])
|
|
self.identity_api.add_user_to_group(user_id=user1['id'],
|
|
group_id=group1['id'])
|
|
roles_ref = self.assignment_api.list_grants(
|
|
user_id=user1['id'],
|
|
project_id=project1['id'])
|
|
self.assertEqual(1, len(roles_ref))
|
|
roles_ref = self.assignment_api.list_grants(
|
|
user_id=user1['id'],
|
|
domain_id=domain1['id'])
|
|
self.assertEqual(1, len(roles_ref))
|
|
self.identity_api.check_user_in_group(
|
|
user_id=user1['id'],
|
|
group_id=group1['id'])
|
|
self.identity_api.delete_user(user1['id'])
|
|
self.assertRaises(exception.NotFound,
|
|
self.identity_api.check_user_in_group,
|
|
user1['id'],
|
|
group1['id'])
|
|
|
|
def test_delete_group_with_user_project_domain_links(self):
|
|
role1 = unit.new_role_ref()
|
|
self.role_api.create_role(role1['id'], role1)
|
|
domain1 = unit.new_domain_ref()
|
|
self.resource_api.create_domain(domain1['id'], domain1)
|
|
project1 = unit.new_project_ref(domain_id=domain1['id'])
|
|
self.resource_api.create_project(project1['id'], project1)
|
|
user1 = unit.new_user_ref(domain_id=domain1['id'])
|
|
user1 = self.identity_api.create_user(user1)
|
|
group1 = unit.new_group_ref(domain_id=domain1['id'])
|
|
group1 = self.identity_api.create_group(group1)
|
|
|
|
self.assignment_api.create_grant(group_id=group1['id'],
|
|
project_id=project1['id'],
|
|
role_id=role1['id'])
|
|
self.assignment_api.create_grant(group_id=group1['id'],
|
|
domain_id=domain1['id'],
|
|
role_id=role1['id'])
|
|
self.identity_api.add_user_to_group(user_id=user1['id'],
|
|
group_id=group1['id'])
|
|
roles_ref = self.assignment_api.list_grants(
|
|
group_id=group1['id'],
|
|
project_id=project1['id'])
|
|
self.assertEqual(1, len(roles_ref))
|
|
roles_ref = self.assignment_api.list_grants(
|
|
group_id=group1['id'],
|
|
domain_id=domain1['id'])
|
|
self.assertEqual(1, len(roles_ref))
|
|
self.identity_api.check_user_in_group(
|
|
user_id=user1['id'],
|
|
group_id=group1['id'])
|
|
self.identity_api.delete_group(group1['id'])
|
|
self.identity_api.get_user(user1['id'])
|
|
|
|
def test_update_user_returns_not_found(self):
|
|
user_id = uuid.uuid4().hex
|
|
self.assertRaises(exception.UserNotFound,
|
|
self.identity_api.update_user,
|
|
user_id,
|
|
{'id': user_id,
|
|
'domain_id': CONF.identity.default_domain_id})
|
|
|
|
def test_delete_user_returns_not_found(self):
|
|
self.assertRaises(exception.UserNotFound,
|
|
self.identity_api.delete_user,
|
|
uuid.uuid4().hex)
|
|
|
|
def test_create_user_long_name_fails(self):
|
|
user = unit.new_user_ref(name='a' * 256,
|
|
domain_id=CONF.identity.default_domain_id)
|
|
self.assertRaises(exception.ValidationError,
|
|
self.identity_api.create_user,
|
|
user)
|
|
|
|
def test_create_user_blank_name_fails(self):
|
|
user = unit.new_user_ref(name='',
|
|
domain_id=CONF.identity.default_domain_id)
|
|
self.assertRaises(exception.ValidationError,
|
|
self.identity_api.create_user,
|
|
user)
|
|
|
|
def test_create_user_missed_password(self):
|
|
user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
|
|
user = self.identity_api.create_user(user)
|
|
self.identity_api.get_user(user['id'])
|
|
# Make sure the user is not allowed to login
|
|
# with a password that is empty string or None
|
|
self.assertRaises(AssertionError,
|
|
self.identity_api.authenticate,
|
|
self.make_request(),
|
|
user_id=user['id'],
|
|
password='')
|
|
self.assertRaises(AssertionError,
|
|
self.identity_api.authenticate,
|
|
self.make_request(),
|
|
user_id=user['id'],
|
|
password=None)
|
|
|
|
def test_create_user_none_password(self):
|
|
user = unit.new_user_ref(password=None,
|
|
domain_id=CONF.identity.default_domain_id)
|
|
user = self.identity_api.create_user(user)
|
|
self.identity_api.get_user(user['id'])
|
|
# Make sure the user is not allowed to login
|
|
# with a password that is empty string or None
|
|
self.assertRaises(AssertionError,
|
|
self.identity_api.authenticate,
|
|
self.make_request(),
|
|
user_id=user['id'],
|
|
password='')
|
|
self.assertRaises(AssertionError,
|
|
self.identity_api.authenticate,
|
|
self.make_request(),
|
|
user_id=user['id'],
|
|
password=None)
|
|
|
|
def test_create_user_invalid_name_fails(self):
|
|
user = unit.new_user_ref(name=None,
|
|
domain_id=CONF.identity.default_domain_id)
|
|
self.assertRaises(exception.ValidationError,
|
|
self.identity_api.create_user,
|
|
user)
|
|
|
|
user = unit.new_user_ref(name=123,
|
|
domain_id=CONF.identity.default_domain_id)
|
|
self.assertRaises(exception.ValidationError,
|
|
self.identity_api.create_user,
|
|
user)
|
|
|
|
def test_create_user_invalid_enabled_type_string(self):
|
|
user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id,
|
|
# invalid string value
|
|
enabled='true')
|
|
self.assertRaises(exception.ValidationError,
|
|
self.identity_api.create_user,
|
|
user)
|
|
|
|
def test_update_user_long_name_fails(self):
|
|
user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
|
|
user = self.identity_api.create_user(user)
|
|
user['name'] = 'a' * 256
|
|
self.assertRaises(exception.ValidationError,
|
|
self.identity_api.update_user,
|
|
user['id'],
|
|
user)
|
|
|
|
def test_list_users(self):
|
|
users = self.identity_api.list_users(
|
|
domain_scope=self._set_domain_scope(
|
|
CONF.identity.default_domain_id))
|
|
self.assertEqual(len(default_fixtures.USERS), len(users))
|
|
user_ids = set(user['id'] for user in users)
|
|
expected_user_ids = set(getattr(self, 'user_%s' % user['id'])['id']
|
|
for user in default_fixtures.USERS)
|
|
for user_ref in users:
|
|
self.assertNotIn('password', user_ref)
|
|
self.assertEqual(expected_user_ids, user_ids)
|
|
|
|
def test_list_groups(self):
|
|
group1 = unit.new_group_ref(domain_id=CONF.identity.default_domain_id)
|
|
group2 = unit.new_group_ref(domain_id=CONF.identity.default_domain_id)
|
|
group1 = self.identity_api.create_group(group1)
|
|
group2 = self.identity_api.create_group(group2)
|
|
groups = self.identity_api.list_groups(
|
|
domain_scope=self._set_domain_scope(
|
|
CONF.identity.default_domain_id))
|
|
self.assertEqual(2, len(groups))
|
|
group_ids = []
|
|
for group in groups:
|
|
group_ids.append(group.get('id'))
|
|
self.assertIn(group1['id'], group_ids)
|
|
self.assertIn(group2['id'], group_ids)
|
|
|
|
def test_create_user_doesnt_modify_passed_in_dict(self):
|
|
new_user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
|
|
original_user = new_user.copy()
|
|
self.identity_api.create_user(new_user)
|
|
self.assertDictEqual(original_user, new_user)
|
|
|
|
def test_update_user_enable(self):
|
|
user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
|
|
user = self.identity_api.create_user(user)
|
|
user_ref = self.identity_api.get_user(user['id'])
|
|
self.assertTrue(user_ref['enabled'])
|
|
|
|
user['enabled'] = False
|
|
self.identity_api.update_user(user['id'], user)
|
|
user_ref = self.identity_api.get_user(user['id'])
|
|
self.assertEqual(user['enabled'], user_ref['enabled'])
|
|
|
|
# If not present, enabled field should not be updated
|
|
del user['enabled']
|
|
self.identity_api.update_user(user['id'], user)
|
|
user_ref = self.identity_api.get_user(user['id'])
|
|
self.assertFalse(user_ref['enabled'])
|
|
|
|
user['enabled'] = True
|
|
self.identity_api.update_user(user['id'], user)
|
|
user_ref = self.identity_api.get_user(user['id'])
|
|
self.assertEqual(user['enabled'], user_ref['enabled'])
|
|
|
|
del user['enabled']
|
|
self.identity_api.update_user(user['id'], user)
|
|
user_ref = self.identity_api.get_user(user['id'])
|
|
self.assertTrue(user_ref['enabled'])
|
|
|
|
# Integers are valid Python's booleans. Explicitly test it.
|
|
user['enabled'] = 0
|
|
self.identity_api.update_user(user['id'], user)
|
|
user_ref = self.identity_api.get_user(user['id'])
|
|
self.assertFalse(user_ref['enabled'])
|
|
|
|
# Any integers other than 0 are interpreted as True
|
|
user['enabled'] = -42
|
|
self.identity_api.update_user(user['id'], user)
|
|
user_ref = self.identity_api.get_user(user['id'])
|
|
# NOTE(breton): below, attribute `enabled` is explicitly tested to be
|
|
# equal True. assertTrue should not be used, because it converts
|
|
# the passed value to bool().
|
|
self.assertIs(True, user_ref['enabled'])
|
|
|
|
def test_update_user_name(self):
|
|
user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
|
|
user = self.identity_api.create_user(user)
|
|
user_ref = self.identity_api.get_user(user['id'])
|
|
self.assertEqual(user['name'], user_ref['name'])
|
|
|
|
changed_name = user_ref['name'] + '_changed'
|
|
user_ref['name'] = changed_name
|
|
updated_user = self.identity_api.update_user(user_ref['id'], user_ref)
|
|
|
|
# NOTE(dstanek): the SQL backend adds an 'extra' field containing a
|
|
# dictionary of the extra fields in addition to the
|
|
# fields in the object. For the details see:
|
|
# SqlIdentity.test_update_project_returns_extra
|
|
updated_user.pop('extra', None)
|
|
|
|
self.assertDictEqual(user_ref, updated_user)
|
|
|
|
user_ref = self.identity_api.get_user(user_ref['id'])
|
|
self.assertEqual(changed_name, user_ref['name'])
|
|
|
|
def test_add_user_to_group(self):
|
|
domain = self._get_domain_fixture()
|
|
new_group = unit.new_group_ref(domain_id=domain['id'])
|
|
new_group = self.identity_api.create_group(new_group)
|
|
new_user = unit.new_user_ref(domain_id=domain['id'])
|
|
new_user = self.identity_api.create_user(new_user)
|
|
self.identity_api.add_user_to_group(new_user['id'],
|
|
new_group['id'])
|
|
groups = self.identity_api.list_groups_for_user(new_user['id'])
|
|
|
|
found = False
|
|
for x in groups:
|
|
if (x['id'] == new_group['id']):
|
|
found = True
|
|
self.assertTrue(found)
|
|
|
|
def test_add_user_to_group_returns_not_found(self):
|
|
domain = self._get_domain_fixture()
|
|
new_user = unit.new_user_ref(domain_id=domain['id'])
|
|
new_user = self.identity_api.create_user(new_user)
|
|
self.assertRaises(exception.GroupNotFound,
|
|
self.identity_api.add_user_to_group,
|
|
new_user['id'],
|
|
uuid.uuid4().hex)
|
|
|
|
new_group = unit.new_group_ref(domain_id=domain['id'])
|
|
new_group = self.identity_api.create_group(new_group)
|
|
self.assertRaises(exception.UserNotFound,
|
|
self.identity_api.add_user_to_group,
|
|
uuid.uuid4().hex,
|
|
new_group['id'])
|
|
|
|
self.assertRaises(exception.NotFound,
|
|
self.identity_api.add_user_to_group,
|
|
uuid.uuid4().hex,
|
|
uuid.uuid4().hex)
|
|
|
|
def test_check_user_in_group(self):
|
|
domain = self._get_domain_fixture()
|
|
new_group = unit.new_group_ref(domain_id=domain['id'])
|
|
new_group = self.identity_api.create_group(new_group)
|
|
new_user = unit.new_user_ref(domain_id=domain['id'])
|
|
new_user = self.identity_api.create_user(new_user)
|
|
self.identity_api.add_user_to_group(new_user['id'],
|
|
new_group['id'])
|
|
self.identity_api.check_user_in_group(new_user['id'], new_group['id'])
|
|
|
|
def test_check_user_not_in_group(self):
|
|
new_group = unit.new_group_ref(
|
|
domain_id=CONF.identity.default_domain_id)
|
|
new_group = self.identity_api.create_group(new_group)
|
|
|
|
new_user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
|
|
new_user = self.identity_api.create_user(new_user)
|
|
|
|
self.assertRaises(exception.NotFound,
|
|
self.identity_api.check_user_in_group,
|
|
new_user['id'],
|
|
new_group['id'])
|
|
|
|
def test_check_user_in_group_returns_not_found(self):
|
|
new_user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
|
|
new_user = self.identity_api.create_user(new_user)
|
|
|
|
new_group = unit.new_group_ref(
|
|
domain_id=CONF.identity.default_domain_id)
|
|
new_group = self.identity_api.create_group(new_group)
|
|
|
|
self.assertRaises(exception.UserNotFound,
|
|
self.identity_api.check_user_in_group,
|
|
uuid.uuid4().hex,
|
|
new_group['id'])
|
|
|
|
self.assertRaises(exception.GroupNotFound,
|
|
self.identity_api.check_user_in_group,
|
|
new_user['id'],
|
|
uuid.uuid4().hex)
|
|
|
|
self.assertRaises(exception.NotFound,
|
|
self.identity_api.check_user_in_group,
|
|
uuid.uuid4().hex,
|
|
uuid.uuid4().hex)
|
|
|
|
def test_list_users_in_group(self):
|
|
domain = self._get_domain_fixture()
|
|
new_group = unit.new_group_ref(domain_id=domain['id'])
|
|
new_group = self.identity_api.create_group(new_group)
|
|
# Make sure we get an empty list back on a new group, not an error.
|
|
user_refs = self.identity_api.list_users_in_group(new_group['id'])
|
|
self.assertEqual([], user_refs)
|
|
# Make sure we get the correct users back once they have been added
|
|
# to the group.
|
|
new_user = unit.new_user_ref(domain_id=domain['id'])
|
|
new_user = self.identity_api.create_user(new_user)
|
|
self.identity_api.add_user_to_group(new_user['id'],
|
|
new_group['id'])
|
|
user_refs = self.identity_api.list_users_in_group(new_group['id'])
|
|
found = False
|
|
for x in user_refs:
|
|
if (x['id'] == new_user['id']):
|
|
found = True
|
|
self.assertNotIn('password', x)
|
|
self.assertTrue(found)
|
|
|
|
def test_list_users_in_group_returns_not_found(self):
|
|
self.assertRaises(exception.GroupNotFound,
|
|
self.identity_api.list_users_in_group,
|
|
uuid.uuid4().hex)
|
|
|
|
def test_list_groups_for_user(self):
|
|
domain = self._get_domain_fixture()
|
|
test_groups = []
|
|
test_users = []
|
|
GROUP_COUNT = 3
|
|
USER_COUNT = 2
|
|
|
|
for x in range(0, USER_COUNT):
|
|
new_user = unit.new_user_ref(domain_id=domain['id'])
|
|
new_user = self.identity_api.create_user(new_user)
|
|
test_users.append(new_user)
|
|
positive_user = test_users[0]
|
|
negative_user = test_users[1]
|
|
|
|
for x in range(0, USER_COUNT):
|
|
group_refs = self.identity_api.list_groups_for_user(
|
|
test_users[x]['id'])
|
|
self.assertEqual(0, len(group_refs))
|
|
|
|
for x in range(0, GROUP_COUNT):
|
|
before_count = x
|
|
after_count = x + 1
|
|
new_group = unit.new_group_ref(domain_id=domain['id'])
|
|
new_group = self.identity_api.create_group(new_group)
|
|
test_groups.append(new_group)
|
|
|
|
# add the user to the group and ensure that the
|
|
# group count increases by one for each
|
|
group_refs = self.identity_api.list_groups_for_user(
|
|
positive_user['id'])
|
|
self.assertEqual(before_count, len(group_refs))
|
|
self.identity_api.add_user_to_group(
|
|
positive_user['id'],
|
|
new_group['id'])
|
|
group_refs = self.identity_api.list_groups_for_user(
|
|
positive_user['id'])
|
|
self.assertEqual(after_count, len(group_refs))
|
|
|
|
# Make sure the group count for the unrelated user did not change
|
|
group_refs = self.identity_api.list_groups_for_user(
|
|
negative_user['id'])
|
|
self.assertEqual(0, len(group_refs))
|
|
|
|
# remove the user from each group and ensure that
|
|
# the group count reduces by one for each
|
|
for x in range(0, 3):
|
|
before_count = GROUP_COUNT - x
|
|
after_count = GROUP_COUNT - x - 1
|
|
group_refs = self.identity_api.list_groups_for_user(
|
|
positive_user['id'])
|
|
self.assertEqual(before_count, len(group_refs))
|
|
self.identity_api.remove_user_from_group(
|
|
positive_user['id'],
|
|
test_groups[x]['id'])
|
|
group_refs = self.identity_api.list_groups_for_user(
|
|
positive_user['id'])
|
|
self.assertEqual(after_count, len(group_refs))
|
|
# Make sure the group count for the unrelated user
|
|
# did not change
|
|
group_refs = self.identity_api.list_groups_for_user(
|
|
negative_user['id'])
|
|
self.assertEqual(0, len(group_refs))
|
|
|
|
def test_remove_user_from_group(self):
|
|
domain = self._get_domain_fixture()
|
|
new_group = unit.new_group_ref(domain_id=domain['id'])
|
|
new_group = self.identity_api.create_group(new_group)
|
|
new_user = unit.new_user_ref(domain_id=domain['id'])
|
|
new_user = self.identity_api.create_user(new_user)
|
|
self.identity_api.add_user_to_group(new_user['id'],
|
|
new_group['id'])
|
|
groups = self.identity_api.list_groups_for_user(new_user['id'])
|
|
self.assertIn(new_group['id'], [x['id'] for x in groups])
|
|
self.identity_api.remove_user_from_group(new_user['id'],
|
|
new_group['id'])
|
|
groups = self.identity_api.list_groups_for_user(new_user['id'])
|
|
self.assertNotIn(new_group['id'], [x['id'] for x in groups])
|
|
|
|
def test_remove_user_from_group_returns_not_found(self):
|
|
domain = self._get_domain_fixture()
|
|
new_user = unit.new_user_ref(domain_id=domain['id'])
|
|
new_user = self.identity_api.create_user(new_user)
|
|
new_group = unit.new_group_ref(domain_id=domain['id'])
|
|
new_group = self.identity_api.create_group(new_group)
|
|
self.assertRaises(exception.GroupNotFound,
|
|
self.identity_api.remove_user_from_group,
|
|
new_user['id'],
|
|
uuid.uuid4().hex)
|
|
|
|
self.assertRaises(exception.UserNotFound,
|
|
self.identity_api.remove_user_from_group,
|
|
uuid.uuid4().hex,
|
|
new_group['id'])
|
|
|
|
self.assertRaises(exception.NotFound,
|
|
self.identity_api.remove_user_from_group,
|
|
uuid.uuid4().hex,
|
|
uuid.uuid4().hex)
|
|
|
|
def test_group_crud(self):
|
|
domain = unit.new_domain_ref()
|
|
self.resource_api.create_domain(domain['id'], domain)
|
|
group = unit.new_group_ref(domain_id=domain['id'])
|
|
group = self.identity_api.create_group(group)
|
|
group_ref = self.identity_api.get_group(group['id'])
|
|
self.assertDictContainsSubset(group, group_ref)
|
|
|
|
group['name'] = uuid.uuid4().hex
|
|
self.identity_api.update_group(group['id'], group)
|
|
group_ref = self.identity_api.get_group(group['id'])
|
|
self.assertDictContainsSubset(group, group_ref)
|
|
|
|
self.identity_api.delete_group(group['id'])
|
|
self.assertRaises(exception.GroupNotFound,
|
|
self.identity_api.get_group,
|
|
group['id'])
|
|
|
|
def test_create_group_name_with_trailing_whitespace(self):
|
|
group = unit.new_group_ref(domain_id=CONF.identity.default_domain_id)
|
|
group_name = group['name'] = (group['name'] + ' ')
|
|
group_returned = self.identity_api.create_group(group)
|
|
self.assertEqual(group_returned['name'], group_name.strip())
|
|
|
|
def test_update_group_name_with_trailing_whitespace(self):
|
|
group = unit.new_group_ref(domain_id=CONF.identity.default_domain_id)
|
|
group_create = self.identity_api.create_group(group)
|
|
group_name = group['name'] = (group['name'] + ' ')
|
|
group_update = self.identity_api.update_group(group_create['id'],
|
|
group)
|
|
self.assertEqual(group_update['id'], group_create['id'])
|
|
self.assertEqual(group_update['name'], group_name.strip())
|
|
|
|
def test_get_group_by_name(self):
|
|
group = unit.new_group_ref(domain_id=CONF.identity.default_domain_id)
|
|
group_name = group['name']
|
|
group = self.identity_api.create_group(group)
|
|
spoiler = unit.new_group_ref(domain_id=CONF.identity.default_domain_id)
|
|
self.identity_api.create_group(spoiler)
|
|
|
|
group_ref = self.identity_api.get_group_by_name(
|
|
group_name, CONF.identity.default_domain_id)
|
|
self.assertDictEqual(group, group_ref)
|
|
|
|
def test_get_group_by_name_returns_not_found(self):
|
|
self.assertRaises(exception.GroupNotFound,
|
|
self.identity_api.get_group_by_name,
|
|
uuid.uuid4().hex,
|
|
CONF.identity.default_domain_id)
|
|
|
|
@unit.skip_if_cache_disabled('identity')
|
|
def test_cache_layer_group_crud(self):
|
|
group = unit.new_group_ref(domain_id=CONF.identity.default_domain_id)
|
|
group = self.identity_api.create_group(group)
|
|
# cache the result
|
|
group_ref = self.identity_api.get_group(group['id'])
|
|
# delete the group bypassing identity api.
|
|
domain_id, driver, entity_id = (
|
|
self.identity_api._get_domain_driver_and_entity_id(group['id']))
|
|
driver.delete_group(entity_id)
|
|
|
|
self.assertEqual(group_ref, self.identity_api.get_group(group['id']))
|
|
self.identity_api.get_group.invalidate(self.identity_api, group['id'])
|
|
self.assertRaises(exception.GroupNotFound,
|
|
self.identity_api.get_group, group['id'])
|
|
|
|
group = unit.new_group_ref(domain_id=CONF.identity.default_domain_id)
|
|
group = self.identity_api.create_group(group)
|
|
# cache the result
|
|
self.identity_api.get_group(group['id'])
|
|
group['name'] = uuid.uuid4().hex
|
|
group_ref = self.identity_api.update_group(group['id'], group)
|
|
# after updating through identity api, get updated group
|
|
self.assertDictContainsSubset(self.identity_api.get_group(group['id']),
|
|
group_ref)
|
|
|
|
def test_create_duplicate_group_name_fails(self):
|
|
group1 = unit.new_group_ref(domain_id=CONF.identity.default_domain_id)
|
|
group2 = unit.new_group_ref(domain_id=CONF.identity.default_domain_id,
|
|
name=group1['name'])
|
|
group1 = self.identity_api.create_group(group1)
|
|
self.assertRaises(exception.Conflict,
|
|
self.identity_api.create_group,
|
|
group2)
|
|
|
|
def test_create_duplicate_group_name_in_different_domains(self):
|
|
new_domain = unit.new_domain_ref()
|
|
self.resource_api.create_domain(new_domain['id'], new_domain)
|
|
group1 = unit.new_group_ref(domain_id=CONF.identity.default_domain_id)
|
|
group2 = unit.new_group_ref(domain_id=new_domain['id'],
|
|
name=group1['name'])
|
|
group1 = self.identity_api.create_group(group1)
|
|
group2 = self.identity_api.create_group(group2)
|
|
|
|
def test_move_group_between_domains(self):
|
|
domain1 = unit.new_domain_ref()
|
|
self.resource_api.create_domain(domain1['id'], domain1)
|
|
domain2 = unit.new_domain_ref()
|
|
self.resource_api.create_domain(domain2['id'], domain2)
|
|
group = unit.new_group_ref(domain_id=domain1['id'])
|
|
group = self.identity_api.create_group(group)
|
|
group['domain_id'] = domain2['id']
|
|
# Update the group asserting that a deprecation warning is emitted
|
|
with mock.patch(
|
|
'oslo_log.versionutils.report_deprecated_feature') as mock_dep:
|
|
self.identity_api.update_group(group['id'], group)
|
|
self.assertTrue(mock_dep.called)
|
|
|
|
updated_group_ref = self.identity_api.get_group(group['id'])
|
|
self.assertEqual(domain2['id'], updated_group_ref['domain_id'])
|
|
|
|
def test_move_group_between_domains_with_clashing_names_fails(self):
|
|
domain1 = unit.new_domain_ref()
|
|
self.resource_api.create_domain(domain1['id'], domain1)
|
|
domain2 = unit.new_domain_ref()
|
|
self.resource_api.create_domain(domain2['id'], domain2)
|
|
# First, create a group in domain1
|
|
group1 = unit.new_group_ref(domain_id=domain1['id'])
|
|
group1 = self.identity_api.create_group(group1)
|
|
# Now create a group in domain2 with a potentially clashing
|
|
# name - which should work since we have domain separation
|
|
group2 = unit.new_group_ref(name=group1['name'],
|
|
domain_id=domain2['id'])
|
|
group2 = self.identity_api.create_group(group2)
|
|
# Now try and move group1 into the 2nd domain - which should
|
|
# fail since the names clash
|
|
group1['domain_id'] = domain2['id']
|
|
self.assertRaises(exception.Conflict,
|
|
self.identity_api.update_group,
|
|
group1['id'],
|
|
group1)
|
|
|
|
def test_user_crud(self):
|
|
user_dict = unit.new_user_ref(
|
|
domain_id=CONF.identity.default_domain_id)
|
|
del user_dict['id']
|
|
user = self.identity_api.create_user(user_dict)
|
|
user_ref = self.identity_api.get_user(user['id'])
|
|
del user_dict['password']
|
|
user_ref_dict = {x: user_ref[x] for x in user_ref}
|
|
self.assertDictContainsSubset(user_dict, user_ref_dict)
|
|
|
|
user_dict['password'] = uuid.uuid4().hex
|
|
self.identity_api.update_user(user['id'], user_dict)
|
|
user_ref = self.identity_api.get_user(user['id'])
|
|
del user_dict['password']
|
|
user_ref_dict = {x: user_ref[x] for x in user_ref}
|
|
self.assertDictContainsSubset(user_dict, user_ref_dict)
|
|
|
|
self.identity_api.delete_user(user['id'])
|
|
self.assertRaises(exception.UserNotFound,
|
|
self.identity_api.get_user,
|
|
user['id'])
|
|
|
|
def test_arbitrary_attributes_are_returned_from_create_user(self):
|
|
attr_value = uuid.uuid4().hex
|
|
user_data = unit.new_user_ref(
|
|
domain_id=CONF.identity.default_domain_id,
|
|
arbitrary_attr=attr_value)
|
|
|
|
user = self.identity_api.create_user(user_data)
|
|
|
|
self.assertEqual(attr_value, user['arbitrary_attr'])
|
|
|
|
def test_arbitrary_attributes_are_returned_from_get_user(self):
|
|
attr_value = uuid.uuid4().hex
|
|
user_data = unit.new_user_ref(
|
|
domain_id=CONF.identity.default_domain_id,
|
|
arbitrary_attr=attr_value)
|
|
|
|
user_data = self.identity_api.create_user(user_data)
|
|
|
|
user = self.identity_api.get_user(user_data['id'])
|
|
self.assertEqual(attr_value, user['arbitrary_attr'])
|
|
|
|
def test_new_arbitrary_attributes_are_returned_from_update_user(self):
|
|
user_data = unit.new_user_ref(
|
|
domain_id=CONF.identity.default_domain_id)
|
|
|
|
user = self.identity_api.create_user(user_data)
|
|
attr_value = uuid.uuid4().hex
|
|
user['arbitrary_attr'] = attr_value
|
|
updated_user = self.identity_api.update_user(user['id'], user)
|
|
|
|
self.assertEqual(attr_value, updated_user['arbitrary_attr'])
|
|
|
|
def test_updated_arbitrary_attributes_are_returned_from_update_user(self):
|
|
attr_value = uuid.uuid4().hex
|
|
user_data = unit.new_user_ref(
|
|
domain_id=CONF.identity.default_domain_id,
|
|
arbitrary_attr=attr_value)
|
|
|
|
new_attr_value = uuid.uuid4().hex
|
|
user = self.identity_api.create_user(user_data)
|
|
user['arbitrary_attr'] = new_attr_value
|
|
updated_user = self.identity_api.update_user(user['id'], user)
|
|
|
|
self.assertEqual(new_attr_value, updated_user['arbitrary_attr'])
|
|
|
|
def test_user_update_and_user_get_return_same_response(self):
|
|
user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
|
|
|
|
user = self.identity_api.create_user(user)
|
|
|
|
updated_user = {'enabled': False}
|
|
updated_user_ref = self.identity_api.update_user(
|
|
user['id'], updated_user)
|
|
|
|
# SQL backend adds 'extra' field
|
|
updated_user_ref.pop('extra', None)
|
|
|
|
self.assertIs(False, updated_user_ref['enabled'])
|
|
|
|
user_ref = self.identity_api.get_user(user['id'])
|
|
self.assertDictEqual(updated_user_ref, user_ref)
|
|
|
|
@unit.skip_if_no_multiple_domains_support
|
|
def test_list_domains_filtered_and_limited(self):
|
|
# The test is designed for multiple domains only
|
|
def create_domains(domain_count, domain_name_prefix):
|
|
for _ in range(domain_count):
|
|
domain_name = '%s-%s' % (domain_name_prefix, uuid.uuid4().hex)
|
|
domain = unit.new_domain_ref(name=domain_name)
|
|
self.domain_list[domain_name] = \
|
|
self.resource_api.create_domain(domain['id'], domain)
|
|
|
|
def clean_up_domains():
|
|
for _, domain in self.domain_list.items():
|
|
domain['enabled'] = False
|
|
self.resource_api.update_domain(domain['id'], domain)
|
|
self.resource_api.delete_domain(domain['id'])
|
|
|
|
self.domain_list = {}
|
|
create_domains(2, 'domaingroup1')
|
|
create_domains(3, 'domaingroup2')
|
|
|
|
self.addCleanup(clean_up_domains)
|
|
unfiltered_domains = self.resource_api.list_domains()
|
|
|
|
# Should get back just 4 entities
|
|
self.config_fixture.config(list_limit=4)
|
|
hints = driver_hints.Hints()
|
|
entities = self.resource_api.list_domains(hints=hints)
|
|
self.assertThat(entities, matchers.HasLength(hints.limit['limit']))
|
|
self.assertTrue(hints.limit['truncated'])
|
|
|
|
# Get one exact item from the list
|
|
hints = driver_hints.Hints()
|
|
hints.add_filter('name', unfiltered_domains[3]['name'])
|
|
entities = self.resource_api.list_domains(hints=hints)
|
|
self.assertThat(entities, matchers.HasLength(1))
|
|
self.assertEqual(entities[0], unfiltered_domains[3])
|
|
|
|
# Get 2 entries
|
|
hints = driver_hints.Hints()
|
|
hints.add_filter('name', 'domaingroup1', comparator='startswith')
|
|
entities = self.resource_api.list_domains(hints=hints)
|
|
self.assertThat(entities, matchers.HasLength(2))
|
|
self.assertThat(entities[0]['name'],
|
|
matchers.StartsWith('domaingroup1'))
|
|
self.assertThat(entities[1]['name'],
|
|
matchers.StartsWith('domaingroup1'))
|
|
|
|
|
|
class FilterTests(filtering.FilterTests):
|
|
def test_list_entities_filtered(self):
|
|
for entity in ['user', 'group', 'project']:
|
|
# Create 20 entities
|
|
entity_list = self._create_test_data(entity, 20)
|
|
|
|
# Try filtering to get one an exact item out of the list
|
|
hints = driver_hints.Hints()
|
|
hints.add_filter('name', entity_list[10]['name'])
|
|
entities = self._list_entities(entity)(hints=hints)
|
|
self.assertEqual(1, len(entities))
|
|
self.assertEqual(entity_list[10]['id'], entities[0]['id'])
|
|
# Check the driver has removed the filter from the list hints
|
|
self.assertFalse(hints.get_exact_filter_by_name('name'))
|
|
self._delete_test_data(entity, entity_list)
|
|
|
|
def test_list_users_inexact_filtered(self):
|
|
# Create 20 users, some with specific names. We set the names at create
|
|
# time (rather than updating them), since the LDAP driver does not
|
|
# support name updates.
|
|
user_name_data = {
|
|
# user index: name for user
|
|
5: 'The',
|
|
6: 'The Ministry',
|
|
7: 'The Ministry of',
|
|
8: 'The Ministry of Silly',
|
|
9: 'The Ministry of Silly Walks',
|
|
# ...and one for useful case insensitivity testing
|
|
10: 'The ministry of silly walks OF'
|
|
}
|
|
user_list = self._create_test_data(
|
|
'user', 20, domain_id=CONF.identity.default_domain_id,
|
|
name_dict=user_name_data)
|
|
|
|
hints = driver_hints.Hints()
|
|
hints.add_filter('name', 'ministry', comparator='contains')
|
|
users = self.identity_api.list_users(hints=hints)
|
|
self.assertEqual(5, len(users))
|
|
self._match_with_list(users, user_list,
|
|
list_start=6, list_end=11)
|
|
# TODO(henry-nash) Check inexact filter has been removed.
|
|
|
|
hints = driver_hints.Hints()
|
|
hints.add_filter('name', 'The', comparator='startswith')
|
|
users = self.identity_api.list_users(hints=hints)
|
|
self.assertEqual(6, len(users))
|
|
self._match_with_list(users, user_list,
|
|
list_start=5, list_end=11)
|
|
# TODO(henry-nash) Check inexact filter has been removed.
|
|
|
|
hints = driver_hints.Hints()
|
|
hints.add_filter('name', 'of', comparator='endswith')
|
|
users = self.identity_api.list_users(hints=hints)
|
|
self.assertEqual(2, len(users))
|
|
# We can't assume we will get back the users in any particular order
|
|
self.assertIn(user_list[7]['id'], [users[0]['id'], users[1]['id']])
|
|
self.assertIn(user_list[10]['id'], [users[0]['id'], users[1]['id']])
|
|
# TODO(henry-nash) Check inexact filter has been removed.
|
|
|
|
# TODO(henry-nash): Add some case sensitive tests. However,
|
|
# these would be hard to validate currently, since:
|
|
#
|
|
# For SQL, the issue is that MySQL 0.7, by default, is installed in
|
|
# case insensitive mode (which is what is run by default for our
|
|
# SQL backend tests). For production deployments. OpenStack
|
|
# assumes a case sensitive database. For these tests, therefore, we
|
|
# need to be able to check the sensitivity of the database so as to
|
|
# know whether to run case sensitive tests here.
|
|
#
|
|
# For LDAP/AD, although dependent on the schema being used, attributes
|
|
# are typically configured to be case aware, but not case sensitive.
|
|
|
|
self._delete_test_data('user', user_list)
|
|
|
|
def _groups_for_user_data(self):
|
|
number_of_groups = 10
|
|
group_name_data = {
|
|
# entity index: name for entity
|
|
5: 'The',
|
|
6: 'The Ministry',
|
|
9: 'The Ministry of Silly Walks',
|
|
}
|
|
group_list = self._create_test_data(
|
|
'group', number_of_groups,
|
|
domain_id=CONF.identity.default_domain_id,
|
|
name_dict=group_name_data)
|
|
user_list = self._create_test_data('user', 2)
|
|
|
|
for group in range(7):
|
|
# Create membership, including with two out of the three groups
|
|
# with well know names
|
|
self.identity_api.add_user_to_group(user_list[0]['id'],
|
|
group_list[group]['id'])
|
|
# ...and some spoiler memberships
|
|
for group in range(7, number_of_groups):
|
|
self.identity_api.add_user_to_group(user_list[1]['id'],
|
|
group_list[group]['id'])
|
|
|
|
return group_list, user_list
|
|
|
|
def test_groups_for_user_inexact_filtered(self):
|
|
"""Test use of filtering doesn't break groups_for_user listing.
|
|
|
|
Some backends may use filtering to achieve the list of groups for a
|
|
user, so test that it can combine a second filter.
|
|
|
|
Test Plan:
|
|
|
|
- Create 10 groups, some with names we can filter on
|
|
- Create 2 users
|
|
- Assign 1 of those users to most of the groups, including some of the
|
|
well known named ones
|
|
- Assign the other user to other groups as spoilers
|
|
- Ensure that when we list groups for users with a filter on the group
|
|
name, both restrictions have been enforced on what is returned.
|
|
|
|
"""
|
|
group_list, user_list = self._groups_for_user_data()
|
|
|
|
hints = driver_hints.Hints()
|
|
hints.add_filter('name', 'Ministry', comparator='contains')
|
|
groups = self.identity_api.list_groups_for_user(
|
|
user_list[0]['id'], hints=hints)
|
|
# We should only get back one group, since of the two that contain
|
|
# 'Ministry' the user only belongs to one.
|
|
self.assertThat(len(groups), matchers.Equals(1))
|
|
self.assertEqual(group_list[6]['id'], groups[0]['id'])
|
|
|
|
hints = driver_hints.Hints()
|
|
hints.add_filter('name', 'The', comparator='startswith')
|
|
groups = self.identity_api.list_groups_for_user(
|
|
user_list[0]['id'], hints=hints)
|
|
# We should only get back 2 out of the 3 groups that start with 'The'
|
|
# hence showing that both "filters" have been applied
|
|
self.assertThat(len(groups), matchers.Equals(2))
|
|
self.assertIn(group_list[5]['id'], [groups[0]['id'], groups[1]['id']])
|
|
self.assertIn(group_list[6]['id'], [groups[0]['id'], groups[1]['id']])
|
|
|
|
hints.add_filter('name', 'The', comparator='endswith')
|
|
groups = self.identity_api.list_groups_for_user(
|
|
user_list[0]['id'], hints=hints)
|
|
# We should only get back one group since it is the only one that
|
|
# ends with 'The'
|
|
self.assertThat(len(groups), matchers.Equals(1))
|
|
self.assertEqual(group_list[5]['id'], groups[0]['id'])
|
|
|
|
self._delete_test_data('user', user_list)
|
|
self._delete_test_data('group', group_list)
|
|
|
|
def test_groups_for_user_exact_filtered(self):
|
|
"""Test exact filters doesn't break groups_for_user listing."""
|
|
group_list, user_list = self._groups_for_user_data()
|
|
hints = driver_hints.Hints()
|
|
hints.add_filter('name', 'The Ministry', comparator='equals')
|
|
groups = self.identity_api.list_groups_for_user(
|
|
user_list[0]['id'], hints=hints)
|
|
# We should only get back 1 out of the 3 groups with name 'The
|
|
# Ministry' hence showing that both "filters" have been applied.
|
|
self.assertEqual(1, len(groups))
|
|
self.assertEqual(group_list[6]['id'], groups[0]['id'])
|
|
self._delete_test_data('user', user_list)
|
|
self._delete_test_data('group', group_list)
|
|
|
|
def _get_user_name_field_size(self):
|
|
"""Return the size of the user name field for the backend.
|
|
|
|
Subclasses can override this method to indicate that the user name
|
|
field is limited in length. The user name is the field used in the test
|
|
that validates that a filter value works even if it's longer than a
|
|
field.
|
|
|
|
If the backend doesn't limit the value length then return None.
|
|
|
|
"""
|
|
return None
|
|
|
|
def test_filter_value_wider_than_field(self):
|
|
# If a filter value is given that's larger than the field in the
|
|
# backend then no values are returned.
|
|
|
|
user_name_field_size = self._get_user_name_field_size()
|
|
|
|
if user_name_field_size is None:
|
|
# The backend doesn't limit the size of the user name, so pass this
|
|
# test.
|
|
return
|
|
|
|
# Create some users just to make sure would return something if the
|
|
# filter was ignored.
|
|
self._create_test_data('user', 2)
|
|
|
|
hints = driver_hints.Hints()
|
|
value = 'A' * (user_name_field_size + 1)
|
|
hints.add_filter('name', value)
|
|
users = self.identity_api.list_users(hints=hints)
|
|
self.assertEqual([], users)
|
|
|
|
def _list_users_in_group_data(self):
|
|
number_of_users = 10
|
|
user_name_data = {
|
|
1: 'Arthur Conan Doyle',
|
|
3: 'Arthur Rimbaud',
|
|
9: 'Arthur Schopenhauer',
|
|
}
|
|
user_list = self._create_test_data(
|
|
'user', number_of_users,
|
|
domain_id=CONF.identity.default_domain_id,
|
|
name_dict=user_name_data)
|
|
group = self._create_one_entity(
|
|
'group', CONF.identity.default_domain_id, 'Great Writers')
|
|
for i in range(7):
|
|
self.identity_api.add_user_to_group(user_list[i]['id'],
|
|
group['id'])
|
|
|
|
return user_list, group
|
|
|
|
def test_list_users_in_group_inexact_filtered(self):
|
|
user_list, group = self._list_users_in_group_data()
|
|
|
|
hints = driver_hints.Hints()
|
|
hints.add_filter('name', 'Arthur', comparator='contains')
|
|
users = self.identity_api.list_users_in_group(group['id'], hints=hints)
|
|
self.assertThat(len(users), matchers.Equals(2))
|
|
self.assertIn(user_list[1]['id'], [users[0]['id'], users[1]['id']])
|
|
self.assertIn(user_list[3]['id'], [users[0]['id'], users[1]['id']])
|
|
|
|
hints = driver_hints.Hints()
|
|
hints.add_filter('name', 'Arthur', comparator='startswith')
|
|
users = self.identity_api.list_users_in_group(group['id'], hints=hints)
|
|
self.assertThat(len(users), matchers.Equals(2))
|
|
self.assertIn(user_list[1]['id'], [users[0]['id'], users[1]['id']])
|
|
self.assertIn(user_list[3]['id'], [users[0]['id'], users[1]['id']])
|
|
|
|
hints = driver_hints.Hints()
|
|
hints.add_filter('name', 'Doyle', comparator='endswith')
|
|
users = self.identity_api.list_users_in_group(group['id'], hints=hints)
|
|
self.assertThat(len(users), matchers.Equals(1))
|
|
self.assertEqual(user_list[1]['id'], users[0]['id'])
|
|
|
|
self._delete_test_data('user', user_list)
|
|
self._delete_entity('group')(group['id'])
|
|
|
|
def test_list_users_in_group_exact_filtered(self):
|
|
hints = driver_hints.Hints()
|
|
user_list, group = self._list_users_in_group_data()
|
|
hints.add_filter('name', 'Arthur Rimbaud', comparator='equals')
|
|
users = self.identity_api.list_users_in_group(group['id'], hints=hints)
|
|
self.assertEqual(1, len(users))
|
|
self.assertEqual(user_list[3]['id'], users[0]['id'])
|
|
self._delete_test_data('user', user_list)
|
|
self._delete_entity('group')(group['id'])
|
|
|
|
|
|
class LimitTests(filtering.FilterTests):
|
|
ENTITIES = ['user', 'group', 'project']
|
|
|
|
def setUp(self):
|
|
"""Setup for Limit Test Cases."""
|
|
self.entity_lists = {}
|
|
|
|
for entity in self.ENTITIES:
|
|
# Create 20 entities
|
|
self.entity_lists[entity] = self._create_test_data(entity, 20)
|
|
self.addCleanup(self.clean_up_entities)
|
|
|
|
def clean_up_entities(self):
|
|
"""Clean up entity test data from Limit Test Cases."""
|
|
for entity in self.ENTITIES:
|
|
self._delete_test_data(entity, self.entity_lists[entity])
|
|
del self.entity_lists
|
|
|
|
def _test_list_entity_filtered_and_limited(self, entity):
|
|
self.config_fixture.config(list_limit=10)
|
|
# Should get back just 10 entities
|
|
hints = driver_hints.Hints()
|
|
entities = self._list_entities(entity)(hints=hints)
|
|
self.assertEqual(hints.limit['limit'], len(entities))
|
|
self.assertTrue(hints.limit['truncated'])
|
|
|
|
# Override with driver specific limit
|
|
if entity == 'project':
|
|
self.config_fixture.config(group='resource', list_limit=5)
|
|
else:
|
|
self.config_fixture.config(group='identity', list_limit=5)
|
|
|
|
# Should get back just 5 users
|
|
hints = driver_hints.Hints()
|
|
entities = self._list_entities(entity)(hints=hints)
|
|
self.assertEqual(hints.limit['limit'], len(entities))
|
|
|
|
# Finally, let's pretend we want to get the full list of entities,
|
|
# even with the limits set, as part of some internal calculation.
|
|
# Calling the API without a hints list should achieve this, and
|
|
# return at least the 20 entries we created (there may be other
|
|
# entities lying around created by other tests/setup).
|
|
entities = self._list_entities(entity)()
|
|
self.assertGreaterEqual(len(entities), 20)
|
|
self._match_with_list(self.entity_lists[entity], entities)
|
|
|
|
def test_list_users_filtered_and_limited(self):
|
|
self._test_list_entity_filtered_and_limited('user')
|
|
|
|
def test_list_groups_filtered_and_limited(self):
|
|
self._test_list_entity_filtered_and_limited('group')
|
|
|
|
def test_list_projects_filtered_and_limited(self):
|
|
self._test_list_entity_filtered_and_limited('project')
|
|
|
|
|
|
class ShadowUsersTests(object):
|
|
def test_create_nonlocal_user_unique_constraint(self):
|
|
user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
|
|
user_created = self.shadow_users_api.create_nonlocal_user(user)
|
|
self.assertNotIn('password', user_created)
|
|
self.assertEqual(user_created['id'], user['id'])
|
|
self.assertEqual(user_created['domain_id'], user['domain_id'])
|
|
self.assertEqual(user_created['name'], user['name'])
|
|
new_user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
|
|
new_user['name'] = user['name']
|
|
self.assertRaises(exception.Conflict,
|
|
self.shadow_users_api.create_nonlocal_user,
|
|
new_user)
|
|
|
|
def test_create_nonlocal_user_does_not_create_local_user(self):
|
|
user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
|
|
new_nonlocal_user = self.shadow_users_api.create_nonlocal_user(user)
|
|
user_ref = self._get_user_ref(new_nonlocal_user['id'])
|
|
self.assertIsNone(user_ref.local_user)
|
|
|
|
def test_nonlocal_user_unique_user_id_constraint(self):
|
|
user_ref = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
|
|
user = self.shadow_users_api.create_nonlocal_user(user_ref)
|
|
# attempt to create a nonlocal_user with the same user_id
|
|
nonlocal_user = {
|
|
'domain_id': CONF.identity.default_domain_id,
|
|
'name': uuid.uuid4().hex,
|
|
'user_id': user['id']
|
|
}
|
|
self.assertRaises(sql.DBDuplicateEntry, self._add_nonlocal_user,
|
|
nonlocal_user)
|
|
|
|
def _add_nonlocal_user(self, nonlocal_user):
|
|
with sql.session_for_write() as session:
|
|
nonlocal_user_ref = model.NonLocalUser.from_dict(nonlocal_user)
|
|
session.add(nonlocal_user_ref)
|
|
|
|
def test_get_user(self):
|
|
user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
|
|
user.pop('email')
|
|
user.pop('password')
|
|
user_created = self.shadow_users_api.create_nonlocal_user(user)
|
|
self.assertEqual(user_created['id'], user['id'])
|
|
user_found = self.shadow_users_api.get_user(user_created['id'])
|
|
self.assertItemsEqual(user_created, user_found)
|
|
|
|
def test_create_federated_user_unique_constraint(self):
|
|
federated_dict = unit.new_federated_user_ref()
|
|
user_dict = self.shadow_users_api.create_federated_user(federated_dict)
|
|
user_dict = self.shadow_users_api.get_user(user_dict["id"])
|
|
self.assertIsNotNone(user_dict["id"])
|
|
self.assertRaises(exception.Conflict,
|
|
self.shadow_users_api.create_federated_user,
|
|
federated_dict)
|
|
|
|
def test_get_federated_user(self):
|
|
federated_dict = unit.new_federated_user_ref()
|
|
user_dict_create = self.shadow_users_api.create_federated_user(
|
|
federated_dict)
|
|
user_dict_get = self.shadow_users_api.get_federated_user(
|
|
federated_dict["idp_id"],
|
|
federated_dict["protocol_id"],
|
|
federated_dict["unique_id"])
|
|
self.assertItemsEqual(user_dict_create, user_dict_get)
|
|
self.assertEqual(user_dict_create["id"], user_dict_get["id"])
|
|
|
|
def test_update_federated_user_display_name(self):
|
|
federated_dict = unit.new_federated_user_ref()
|
|
user_dict_create = self.shadow_users_api.create_federated_user(
|
|
federated_dict)
|
|
new_display_name = uuid.uuid4().hex
|
|
self.shadow_users_api.update_federated_user_display_name(
|
|
federated_dict["idp_id"],
|
|
federated_dict["protocol_id"],
|
|
federated_dict["unique_id"],
|
|
new_display_name)
|
|
user_ref = self.shadow_users_api._get_federated_user(
|
|
federated_dict["idp_id"],
|
|
federated_dict["protocol_id"],
|
|
federated_dict["unique_id"])
|
|
self.assertEqual(user_ref.federated_users[0].display_name,
|
|
new_display_name)
|
|
self.assertEqual(user_dict_create["id"], user_ref.id)
|
|
|
|
def test_set_last_active_at(self):
|
|
self.config_fixture.config(group='security_compliance',
|
|
disable_user_account_days_inactive=90)
|
|
now = datetime.datetime.utcnow().date()
|
|
user_ref = self.identity_api.authenticate(
|
|
self.make_request(),
|
|
user_id=self.user_sna['id'],
|
|
password=self.user_sna['password'])
|
|
user_ref = self._get_user_ref(user_ref['id'])
|
|
self.assertGreaterEqual(now, user_ref.last_active_at)
|
|
|
|
def test_set_last_active_at_when_config_setting_is_none(self):
|
|
self.config_fixture.config(group='security_compliance',
|
|
disable_user_account_days_inactive=None)
|
|
user_ref = self.identity_api.authenticate(
|
|
self.make_request(),
|
|
user_id=self.user_sna['id'],
|
|
password=self.user_sna['password'])
|
|
user_ref = self._get_user_ref(user_ref['id'])
|
|
self.assertIsNone(user_ref.last_active_at)
|
|
|
|
def _get_user_ref(self, user_id):
|
|
with sql.session_for_read() as session:
|
|
return session.query(model.User).get(user_id)
|