# Source: keystone/keystone/tests/unit/identity/test_backends.py
#
# (1419 lines, 63 KiB, Python)

# Copyright 2012 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import uuid
import mock
from six.moves import range
from testtools import matchers
from keystone.common import driver_hints
from keystone.common import sql
import keystone.conf
from keystone import exception
from keystone.identity.backends import sql_model as model
from keystone.tests import unit
from keystone.tests.unit import default_fixtures
from keystone.tests.unit import filtering
CONF = keystone.conf.CONF
class IdentityTests(object):
def _get_domain_fixture(self):
    """Create a new domain through the resource API and return its ref."""
    domain_ref = unit.new_domain_ref()
    self.resource_api.create_domain(domain_ref['id'], domain_ref)
    return domain_ref
def _set_domain_scope(self, domain_id):
    """Return ``domain_id`` only if domain-specific drivers are enabled.

    A domain scope is only provided when we have multiple drivers; in
    every other case ``None`` is returned.
    """
    if not CONF.identity.domain_specific_drivers_enabled:
        return None
    return domain_id
def test_authenticate_bad_user(self):
    """Authenticating with an unknown user id raises AssertionError."""
    unknown_user_id = uuid.uuid4().hex
    self.assertRaises(
        AssertionError,
        self.identity_api.authenticate,
        self.make_request(),
        user_id=unknown_user_id,
        password=self.user_foo['password'])
def test_authenticate_bad_password(self):
    """Authenticating with a wrong password raises AssertionError."""
    wrong_password = uuid.uuid4().hex
    self.assertRaises(
        AssertionError,
        self.identity_api.authenticate,
        self.make_request(),
        user_id=self.user_foo['id'],
        password=wrong_password)
def test_authenticate(self):
    """Valid credentials return the user ref without the password."""
    authenticated = self.identity_api.authenticate(
        self.make_request(),
        user_id=self.user_sna['id'],
        password=self.user_sna['password'])
    # NOTE(termie): the fixture keeps its password so tests can log in
    #               with it, but the API must not hand it back.
    expected = self.user_sna
    del expected['password']
    expected['enabled'] = True
    self.assertDictEqual(expected, authenticated)
def test_authenticate_and_get_roles_no_metadata(self):
    """Authenticate a new project member and check the role listing.

    The authenticated ref must not contain the password, and exactly one
    role, ``CONF.member_role_id``, must be listed for the user on the
    project.
    """
    user_data = unit.new_user_ref(
        domain_id=CONF.identity.default_domain_id)
    # create_user() ignores the id, and leaving it in would break the
    # subset comparison further down.
    user_data.pop('id')
    created = self.identity_api.create_user(user_data)
    project_id = self.tenant_baz['id']
    self.assignment_api.add_user_to_project(project_id, created['id'])

    authenticated = self.identity_api.authenticate(
        self.make_request(),
        user_id=created['id'],
        password=user_data['password'])
    self.assertNotIn('password', authenticated)

    # The password was only kept in user_data so that we could
    # authenticate; the API does not return it, so drop it before
    # comparing.
    del user_data['password']
    self.assertDictContainsSubset(user_data, authenticated)

    roles = self.assignment_api.get_roles_for_user_and_project(
        created['id'], project_id)
    self.assertEqual(1, len(roles))
    self.assertIn(CONF.member_role_id, roles)
def test_authenticate_if_no_password_set(self):
    """A user created without a password cannot authenticate.

    The user is created with ``password=None`` and authentication is
    attempted with that user's own id, so the expected AssertionError is
    caused by the missing password rather than by an unknown user id
    (which test_authenticate_bad_user already covers).
    """
    user = unit.new_user_ref(password=None,
                             domain_id=CONF.identity.default_domain_id)
    user = self.identity_api.create_user(user)
    self.assertRaises(AssertionError,
                      self.identity_api.authenticate,
                      self.make_request(),
                      user_id=user['id'],
                      password='password')
def test_create_unicode_user_name(self):
    """create_user returns a non-ASCII user name unchanged."""
    name = u'name \u540d\u5b57'
    new_user = unit.new_user_ref(
        name=name, domain_id=CONF.identity.default_domain_id)
    created = self.identity_api.create_user(new_user)
    self.assertEqual(name, created['name'])
def test_get_user(self):
    """get_user returns the fixture user minus its password."""
    fetched = self.identity_api.get_user(self.user_foo['id'])
    # NOTE(termie): the fixture keeps its password so tests can log in
    #               with it, but the API must not hand it back.
    expected = self.user_foo
    del expected['password']
    self.assertDictEqual(expected, fetched)
def test_get_user_returns_required_attributes(self):
    """get_user output contains every required attribute."""
    fetched = self.identity_api.get_user(self.user_foo['id'])
    for attribute in ('id', 'name', 'enabled', 'password_expires_at'):
        self.assertIn(attribute, fetched)
@unit.skip_if_cache_disabled('identity')
def test_cache_layer_get_user(self):
    """get_user serves cached data until invalidated or updated."""
    default_domain = CONF.identity.default_domain_id

    new_user = unit.new_user_ref(domain_id=default_domain)
    self.identity_api.create_user(new_user)
    looked_up = self.identity_api.get_user_by_name(new_user['name'],
                                                   new_user['domain_id'])
    user_id = looked_up['id']
    # Prime the cache.
    self.identity_api.get_user(user_id)
    # Delete through the driver, bypassing the identity API.
    _domain_id, driver, entity_id = (
        self.identity_api._get_domain_driver_and_entity_id(user_id))
    driver.delete_user(entity_id)
    # The deleted user is still returned by get_user...
    self.assertDictEqual(looked_up, self.identity_api.get_user(user_id))
    # ...until the cached entry is explicitly invalidated.
    self.identity_api.get_user.invalidate(self.identity_api, user_id)
    self.assertRaises(exception.UserNotFound,
                      self.identity_api.get_user, user_id)

    new_user = unit.new_user_ref(domain_id=default_domain)
    new_user = self.identity_api.create_user(new_user)
    looked_up = self.identity_api.get_user_by_name(new_user['name'],
                                                   new_user['domain_id'])
    new_user['description'] = uuid.uuid4().hex
    # Prime the cache again.
    self.identity_api.get_user(looked_up['id'])
    # Updating through the identity API must hand back the updated user.
    updated = self.identity_api.update_user(looked_up['id'], new_user)
    self.assertDictContainsSubset(
        self.identity_api.get_user(looked_up['id']), updated)
    self.assertDictContainsSubset(
        self.identity_api.get_user_by_name(looked_up['name'],
                                           looked_up['domain_id']),
        updated)
def test_get_user_returns_not_found(self):
    """get_user raises UserNotFound for a random, unused id."""
    missing_id = uuid.uuid4().hex
    self.assertRaises(exception.UserNotFound,
                      self.identity_api.get_user,
                      missing_id)
def test_get_user_by_name(self):
    """get_user_by_name returns the fixture user minus its password."""
    fetched = self.identity_api.get_user_by_name(
        self.user_foo['name'], CONF.identity.default_domain_id)
    # NOTE(termie): the fixture keeps its password so tests can log in
    #               with it, but the API must not hand it back.
    expected = self.user_foo
    del expected['password']
    self.assertDictEqual(expected, fetched)
@unit.skip_if_cache_disabled('identity')
def test_cache_layer_get_user_by_name(self):
    """get_user_by_name serves cached data until invalidated or updated."""
    default_domain = CONF.identity.default_domain_id

    new_user = unit.new_user_ref(domain_id=default_domain)
    self.identity_api.create_user(new_user)
    looked_up = self.identity_api.get_user_by_name(new_user['name'],
                                                   new_user['domain_id'])
    # Delete through the driver, bypassing the identity API.
    _domain_id, driver, entity_id = (
        self.identity_api._get_domain_driver_and_entity_id(
            looked_up['id']))
    driver.delete_user(entity_id)
    # The deleted user is still returned by get_user_by_name...
    self.assertDictEqual(
        looked_up,
        self.identity_api.get_user_by_name(new_user['name'],
                                           default_domain))
    # ...until the cached entry is explicitly invalidated.
    self.identity_api.get_user_by_name.invalidate(
        self.identity_api, new_user['name'], default_domain)
    self.assertRaises(exception.UserNotFound,
                      self.identity_api.get_user_by_name,
                      new_user['name'], default_domain)

    new_user = unit.new_user_ref(domain_id=default_domain)
    new_user = self.identity_api.create_user(new_user)
    looked_up = self.identity_api.get_user_by_name(new_user['name'],
                                                   new_user['domain_id'])
    new_user['description'] = uuid.uuid4().hex
    updated = self.identity_api.update_user(looked_up['id'], new_user)
    self.assertDictContainsSubset(
        self.identity_api.get_user(looked_up['id']), updated)
    self.assertDictContainsSubset(
        self.identity_api.get_user_by_name(looked_up['name'],
                                           looked_up['domain_id']),
        updated)
def test_get_user_by_name_returns_not_found(self):
    """get_user_by_name raises UserNotFound for a random, unused name."""
    missing_name = uuid.uuid4().hex
    self.assertRaises(exception.UserNotFound,
                      self.identity_api.get_user_by_name,
                      missing_name,
                      CONF.identity.default_domain_id)
def test_create_duplicate_user_name_fails(self):
    """Re-creating an already created user raises Conflict."""
    new_user = unit.new_user_ref(
        domain_id=CONF.identity.default_domain_id)
    created = self.identity_api.create_user(new_user)
    self.assertRaises(exception.Conflict,
                      self.identity_api.create_user,
                      created)
def test_create_duplicate_user_name_in_different_domains(self):
    """The same user name can be created in two different domains."""
    other_domain = unit.new_domain_ref()
    self.resource_api.create_domain(other_domain['id'], other_domain)
    first = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
    second = unit.new_user_ref(name=first['name'],
                               domain_id=other_domain['id'])
    # Neither call is expected to raise.
    for user in (first, second):
        self.identity_api.create_user(user)
def test_move_user_between_domains(self):
    """Moving a user to another domain works but reports a deprecation."""
    source_domain = unit.new_domain_ref()
    self.resource_api.create_domain(source_domain['id'], source_domain)
    target_domain = unit.new_domain_ref()
    self.resource_api.create_domain(target_domain['id'], target_domain)

    user = self.identity_api.create_user(
        unit.new_user_ref(domain_id=source_domain['id']))
    user['domain_id'] = target_domain['id']

    # The update must report a deprecated feature.
    deprecation_hook = 'oslo_log.versionutils.report_deprecated_feature'
    with mock.patch(deprecation_hook) as report_deprecated:
        self.identity_api.update_user(user['id'], user)
        self.assertTrue(report_deprecated.called)

    moved = self.identity_api.get_user(user['id'])
    self.assertEqual(target_domain['id'], moved['domain_id'])
def test_move_user_between_domains_with_clashing_names_fails(self):
    """Moving a user into a domain that has the same name raises Conflict."""
    source_domain = unit.new_domain_ref()
    self.resource_api.create_domain(source_domain['id'], source_domain)
    target_domain = unit.new_domain_ref()
    self.resource_api.create_domain(target_domain['id'], target_domain)

    # A user living in the source domain...
    mover = self.identity_api.create_user(
        unit.new_user_ref(domain_id=source_domain['id']))
    # ...and a same-named user in the target domain, which is allowed
    # because the two domains are separate.
    self.identity_api.create_user(
        unit.new_user_ref(name=mover['name'],
                          domain_id=target_domain['id']))

    # Moving the first user into the target domain must now fail because
    # the names clash.
    mover['domain_id'] = target_domain['id']
    self.assertRaises(exception.Conflict,
                      self.identity_api.update_user,
                      mover['id'],
                      mover)
def test_rename_duplicate_user_name_fails(self):
    """Renaming a user to a name taken in the same domain raises Conflict."""
    default_domain = CONF.identity.default_domain_id
    existing = unit.new_user_ref(domain_id=default_domain)
    renamed = unit.new_user_ref(domain_id=default_domain)
    self.identity_api.create_user(existing)
    renamed = self.identity_api.create_user(renamed)
    renamed['name'] = existing['name']
    self.assertRaises(exception.Conflict,
                      self.identity_api.update_user,
                      renamed['id'],
                      renamed)
def test_update_user_id_fails(self):
    """Changing a user's id through update_user raises ValidationError."""
    created = self.identity_api.create_user(
        unit.new_user_ref(domain_id=CONF.identity.default_domain_id))
    real_id = created['id']
    created['id'] = 'fake2'
    self.assertRaises(exception.ValidationError,
                      self.identity_api.update_user,
                      real_id,
                      created)
    # The user is still reachable under the original id only.
    fetched = self.identity_api.get_user(real_id)
    self.assertEqual(real_id, fetched['id'])
    self.assertRaises(exception.UserNotFound,
                      self.identity_api.get_user,
                      'fake2')
def test_delete_user_with_group_project_domain_links(self):
    """Deleting a user with grants and a group membership succeeds.

    After the deletion, check_user_in_group for that user raises
    NotFound.
    """
    role = unit.new_role_ref()
    self.role_api.create_role(role['id'], role)
    domain = unit.new_domain_ref()
    self.resource_api.create_domain(domain['id'], domain)
    project = unit.new_project_ref(domain_id=domain['id'])
    self.resource_api.create_project(project['id'], project)
    user = self.identity_api.create_user(
        unit.new_user_ref(domain_id=domain['id']))
    group = self.identity_api.create_group(
        unit.new_group_ref(domain_id=domain['id']))

    # Link the user to the project, the domain and the group.
    self.assignment_api.create_grant(user_id=user['id'],
                                     project_id=project['id'],
                                     role_id=role['id'])
    self.assignment_api.create_grant(user_id=user['id'],
                                     domain_id=domain['id'],
                                     role_id=role['id'])
    self.identity_api.add_user_to_group(user_id=user['id'],
                                        group_id=group['id'])

    # Confirm all three links exist before deleting.
    project_grants = self.assignment_api.list_grants(
        user_id=user['id'], project_id=project['id'])
    self.assertEqual(1, len(project_grants))
    domain_grants = self.assignment_api.list_grants(
        user_id=user['id'], domain_id=domain['id'])
    self.assertEqual(1, len(domain_grants))
    self.identity_api.check_user_in_group(user_id=user['id'],
                                          group_id=group['id'])

    self.identity_api.delete_user(user['id'])
    self.assertRaises(exception.NotFound,
                      self.identity_api.check_user_in_group,
                      user['id'],
                      group['id'])
def test_delete_group_with_user_project_domain_links(self):
    """Deleting a group with grants and a member leaves the user intact."""
    role = unit.new_role_ref()
    self.role_api.create_role(role['id'], role)
    domain = unit.new_domain_ref()
    self.resource_api.create_domain(domain['id'], domain)
    project = unit.new_project_ref(domain_id=domain['id'])
    self.resource_api.create_project(project['id'], project)
    user = self.identity_api.create_user(
        unit.new_user_ref(domain_id=domain['id']))
    group = self.identity_api.create_group(
        unit.new_group_ref(domain_id=domain['id']))

    # Link the group to the project and the domain, and give it a member.
    self.assignment_api.create_grant(group_id=group['id'],
                                     project_id=project['id'],
                                     role_id=role['id'])
    self.assignment_api.create_grant(group_id=group['id'],
                                     domain_id=domain['id'],
                                     role_id=role['id'])
    self.identity_api.add_user_to_group(user_id=user['id'],
                                        group_id=group['id'])

    # Confirm all three links exist before deleting.
    project_grants = self.assignment_api.list_grants(
        group_id=group['id'], project_id=project['id'])
    self.assertEqual(1, len(project_grants))
    domain_grants = self.assignment_api.list_grants(
        group_id=group['id'], domain_id=domain['id'])
    self.assertEqual(1, len(domain_grants))
    self.identity_api.check_user_in_group(user_id=user['id'],
                                          group_id=group['id'])

    self.identity_api.delete_group(group['id'])
    # The former member must still be retrievable (no exception raised).
    self.identity_api.get_user(user['id'])
def test_update_user_returns_not_found(self):
    """update_user raises UserNotFound for a random, unused id."""
    missing_id = uuid.uuid4().hex
    update = {'id': missing_id,
              'domain_id': CONF.identity.default_domain_id}
    self.assertRaises(exception.UserNotFound,
                      self.identity_api.update_user,
                      missing_id,
                      update)
def test_delete_user_returns_not_found(self):
    """delete_user raises UserNotFound for a random, unused id."""
    missing_id = uuid.uuid4().hex
    self.assertRaises(exception.UserNotFound,
                      self.identity_api.delete_user,
                      missing_id)
def test_create_user_long_name_fails(self):
    """A 256-character user name is rejected with ValidationError."""
    too_long = 'a' * 256
    new_user = unit.new_user_ref(
        name=too_long, domain_id=CONF.identity.default_domain_id)
    self.assertRaises(exception.ValidationError,
                      self.identity_api.create_user,
                      new_user)
def test_create_user_blank_name_fails(self):
    """An empty user name is rejected with ValidationError."""
    new_user = unit.new_user_ref(
        name='', domain_id=CONF.identity.default_domain_id)
    self.assertRaises(exception.ValidationError,
                      self.identity_api.create_user,
                      new_user)
def test_create_user_missed_password(self):
    """An empty-string or None password never authenticates the user."""
    # NOTE(review): the ref is built without overriding ``password``;
    # confirm whether unit.new_user_ref() fills one in by default, in
    # which case this user does not actually lack a password.
    created = self.identity_api.create_user(
        unit.new_user_ref(domain_id=CONF.identity.default_domain_id))
    self.identity_api.get_user(created['id'])
    # Logging in with either an empty string or None must be refused.
    for bad_password in ('', None):
        self.assertRaises(AssertionError,
                          self.identity_api.authenticate,
                          self.make_request(),
                          user_id=created['id'],
                          password=bad_password)
def test_create_user_none_password(self):
    """A user created with password=None cannot log in without one."""
    created = self.identity_api.create_user(
        unit.new_user_ref(password=None,
                          domain_id=CONF.identity.default_domain_id))
    self.identity_api.get_user(created['id'])
    # Logging in with either an empty string or None must be refused.
    for bad_password in ('', None):
        self.assertRaises(AssertionError,
                          self.identity_api.authenticate,
                          self.make_request(),
                          user_id=created['id'],
                          password=bad_password)
def test_create_user_invalid_name_fails(self):
    """None and integer user names are rejected with ValidationError."""
    for bad_name in (None, 123):
        new_user = unit.new_user_ref(
            name=bad_name, domain_id=CONF.identity.default_domain_id)
        self.assertRaises(exception.ValidationError,
                          self.identity_api.create_user,
                          new_user)
def test_create_user_invalid_enabled_type_string(self):
    """The string 'true' is not accepted as a value for ``enabled``."""
    invalid_enabled = 'true'  # a string, not a boolean
    new_user = unit.new_user_ref(
        domain_id=CONF.identity.default_domain_id,
        enabled=invalid_enabled)
    self.assertRaises(exception.ValidationError,
                      self.identity_api.create_user,
                      new_user)
def test_update_user_long_name_fails(self):
    """Renaming a user to 256 characters raises ValidationError."""
    created = self.identity_api.create_user(
        unit.new_user_ref(domain_id=CONF.identity.default_domain_id))
    created['name'] = 'a' * 256
    self.assertRaises(exception.ValidationError,
                      self.identity_api.update_user,
                      created['id'],
                      created)
def test_list_users(self):
    """list_users returns exactly the fixture users, without passwords."""
    scope = self._set_domain_scope(CONF.identity.default_domain_id)
    listed = self.identity_api.list_users(domain_scope=scope)
    self.assertEqual(len(default_fixtures.USERS), len(listed))

    listed_ids = {entry['id'] for entry in listed}
    # Each fixture is exposed on the test case as ``user_<fixture id>``.
    fixture_ids = {
        getattr(self, 'user_%s' % fixture['id'])['id']
        for fixture in default_fixtures.USERS}
    for entry in listed:
        self.assertNotIn('password', entry)
    self.assertEqual(fixture_ids, listed_ids)
def test_list_groups(self):
    """list_groups returns both of the two groups that were created."""
    default_domain = CONF.identity.default_domain_id
    first = unit.new_group_ref(domain_id=default_domain)
    second = unit.new_group_ref(domain_id=default_domain)
    first = self.identity_api.create_group(first)
    second = self.identity_api.create_group(second)

    listed = self.identity_api.list_groups(
        domain_scope=self._set_domain_scope(default_domain))
    self.assertEqual(2, len(listed))
    listed_ids = [entry.get('id') for entry in listed]
    self.assertIn(first['id'], listed_ids)
    self.assertIn(second['id'], listed_ids)
def test_create_user_doesnt_modify_passed_in_dict(self):
    """create_user leaves the dict it was given untouched."""
    passed_in = unit.new_user_ref(
        domain_id=CONF.identity.default_domain_id)
    snapshot = passed_in.copy()
    self.identity_api.create_user(passed_in)
    self.assertDictEqual(snapshot, passed_in)
def test_update_user_enable(self):
    """Walk the ``enabled`` flag through a series of user updates.

    Covers explicit True/False, updates that omit the field (the stored
    value must be kept) and integer values.
    """
    user = self.identity_api.create_user(
        unit.new_user_ref(domain_id=CONF.identity.default_domain_id))
    fetched = self.identity_api.get_user(user['id'])
    self.assertTrue(fetched['enabled'])

    user['enabled'] = False
    self.identity_api.update_user(user['id'], user)
    fetched = self.identity_api.get_user(user['id'])
    self.assertEqual(user['enabled'], fetched['enabled'])

    # An update that omits the field must leave the user disabled.
    user.pop('enabled')
    self.identity_api.update_user(user['id'], user)
    fetched = self.identity_api.get_user(user['id'])
    self.assertFalse(fetched['enabled'])

    user['enabled'] = True
    self.identity_api.update_user(user['id'], user)
    fetched = self.identity_api.get_user(user['id'])
    self.assertEqual(user['enabled'], fetched['enabled'])

    # Likewise, omitting the field must leave the user enabled.
    user.pop('enabled')
    self.identity_api.update_user(user['id'], user)
    fetched = self.identity_api.get_user(user['id'])
    self.assertTrue(fetched['enabled'])

    # Integers are valid Python booleans, so test them explicitly.
    user['enabled'] = 0
    self.identity_api.update_user(user['id'], user)
    fetched = self.identity_api.get_user(user['id'])
    self.assertFalse(fetched['enabled'])

    # Any integer other than 0 is interpreted as True.
    user['enabled'] = -42
    self.identity_api.update_user(user['id'], user)
    fetched = self.identity_api.get_user(user['id'])
    # NOTE(breton): assertIs is used on purpose: assertTrue would coerce
    # the value with bool(), and we need it to be exactly True.
    self.assertIs(fetched['enabled'], True)
def test_update_user_name(self):
    """A renamed user is returned by update_user and by get_user."""
    created = self.identity_api.create_user(
        unit.new_user_ref(domain_id=CONF.identity.default_domain_id))
    fetched = self.identity_api.get_user(created['id'])
    self.assertEqual(created['name'], fetched['name'])

    new_name = fetched['name'] + '_changed'
    fetched['name'] = new_name
    updated = self.identity_api.update_user(fetched['id'], fetched)
    # NOTE(dstanek): the SQL backend adds an 'extra' field holding a
    #                dictionary of the extra fields, on top of the fields
    #                in the object. For the details see:
    #                SqlIdentity.test_update_project_returns_extra
    updated.pop('extra', None)
    self.assertDictEqual(fetched, updated)

    fetched = self.identity_api.get_user(fetched['id'])
    self.assertEqual(new_name, fetched['name'])
def test_add_user_to_group(self):
    """A user added to a group sees that group in their group listing."""
    domain = self._get_domain_fixture()
    group = self.identity_api.create_group(
        unit.new_group_ref(domain_id=domain['id']))
    user = self.identity_api.create_user(
        unit.new_user_ref(domain_id=domain['id']))
    self.identity_api.add_user_to_group(user['id'], group['id'])

    memberships = self.identity_api.list_groups_for_user(user['id'])
    is_member = any(
        [entry['id'] == group['id'] for entry in memberships])
    self.assertTrue(is_member)
def test_add_user_to_group_returns_not_found(self):
    """add_user_to_group rejects unknown user and/or group ids."""
    domain = self._get_domain_fixture()
    user = self.identity_api.create_user(
        unit.new_user_ref(domain_id=domain['id']))
    # Known user, unknown group.
    self.assertRaises(exception.GroupNotFound,
                      self.identity_api.add_user_to_group,
                      user['id'],
                      uuid.uuid4().hex)

    group = self.identity_api.create_group(
        unit.new_group_ref(domain_id=domain['id']))
    # Unknown user, known group.
    self.assertRaises(exception.UserNotFound,
                      self.identity_api.add_user_to_group,
                      uuid.uuid4().hex,
                      group['id'])
    # Both unknown.
    self.assertRaises(exception.NotFound,
                      self.identity_api.add_user_to_group,
                      uuid.uuid4().hex,
                      uuid.uuid4().hex)
def test_check_user_in_group(self):
    """check_user_in_group does not raise for an actual group member."""
    domain = self._get_domain_fixture()
    group = self.identity_api.create_group(
        unit.new_group_ref(domain_id=domain['id']))
    user = self.identity_api.create_user(
        unit.new_user_ref(domain_id=domain['id']))
    self.identity_api.add_user_to_group(user['id'], group['id'])
    self.identity_api.check_user_in_group(user['id'], group['id'])
def test_check_user_not_in_group(self):
    """check_user_in_group raises NotFound for a non-member."""
    default_domain = CONF.identity.default_domain_id
    group = self.identity_api.create_group(
        unit.new_group_ref(domain_id=default_domain))
    user = self.identity_api.create_user(
        unit.new_user_ref(domain_id=default_domain))
    self.assertRaises(exception.NotFound,
                      self.identity_api.check_user_in_group,
                      user['id'],
                      group['id'])
def test_check_user_in_group_returns_not_found(self):
    """check_user_in_group rejects unknown user and/or group ids."""
    default_domain = CONF.identity.default_domain_id
    user = self.identity_api.create_user(
        unit.new_user_ref(domain_id=default_domain))
    group = self.identity_api.create_group(
        unit.new_group_ref(domain_id=default_domain))
    # Unknown user, known group.
    self.assertRaises(exception.UserNotFound,
                      self.identity_api.check_user_in_group,
                      uuid.uuid4().hex,
                      group['id'])
    # Known user, unknown group.
    self.assertRaises(exception.GroupNotFound,
                      self.identity_api.check_user_in_group,
                      user['id'],
                      uuid.uuid4().hex)
    # Both unknown.
    self.assertRaises(exception.NotFound,
                      self.identity_api.check_user_in_group,
                      uuid.uuid4().hex,
                      uuid.uuid4().hex)
def test_list_users_in_group(self):
    """list_users_in_group returns the members, without passwords."""
    domain = self._get_domain_fixture()
    group = self.identity_api.create_group(
        unit.new_group_ref(domain_id=domain['id']))
    # A brand-new group must give an empty list, not an error.
    members = self.identity_api.list_users_in_group(group['id'])
    self.assertEqual([], members)

    # Once a user has been added, that user must be listed.
    user = self.identity_api.create_user(
        unit.new_user_ref(domain_id=domain['id']))
    self.identity_api.add_user_to_group(user['id'], group['id'])
    members = self.identity_api.list_users_in_group(group['id'])
    matches = []
    for member in members:
        matches.append(member['id'] == user['id'])
        self.assertNotIn('password', member)
    self.assertTrue(any(matches))
def test_list_users_in_group_returns_not_found(self):
    """list_users_in_group raises GroupNotFound for an unknown group."""
    missing_group_id = uuid.uuid4().hex
    self.assertRaises(exception.GroupNotFound,
                      self.identity_api.list_users_in_group,
                      missing_group_id)
def test_list_groups_for_user(self):
    """list_groups_for_user tracks membership changes one group at a time.

    One user is added to, and then removed from, GROUP_COUNT groups. The
    listing for that user must grow and shrink by exactly one at every
    step, while the listing for a second, unrelated user stays empty.
    """
    GROUP_COUNT = 3
    USER_COUNT = 2
    domain = self._get_domain_fixture()

    def group_total(user):
        # Number of groups currently listed for ``user``.
        return len(self.identity_api.list_groups_for_user(user['id']))

    test_users = []
    for _ in range(USER_COUNT):
        new_user = unit.new_user_ref(domain_id=domain['id'])
        test_users.append(self.identity_api.create_user(new_user))
    positive_user = test_users[0]
    negative_user = test_users[1]

    # Nobody belongs to any group yet.
    for user in test_users:
        self.assertEqual(0, group_total(user))

    test_groups = []
    for before_count in range(GROUP_COUNT):
        new_group = unit.new_group_ref(domain_id=domain['id'])
        new_group = self.identity_api.create_group(new_group)
        test_groups.append(new_group)

        # Add the user to the group and ensure that the group count
        # increases by one for each.
        self.assertEqual(before_count, group_total(positive_user))
        self.identity_api.add_user_to_group(positive_user['id'],
                                            new_group['id'])
        self.assertEqual(before_count + 1, group_total(positive_user))

        # The unrelated user's group count must not change.
        self.assertEqual(0, group_total(negative_user))

    # Remove the user from each group and ensure that the group count
    # reduces by one for each. Iterate over the groups actually created
    # rather than a hard-coded count so this stays in step with
    # GROUP_COUNT.
    for removed, group in enumerate(test_groups):
        before_count = GROUP_COUNT - removed
        self.assertEqual(before_count, group_total(positive_user))
        self.identity_api.remove_user_from_group(positive_user['id'],
                                                 group['id'])
        self.assertEqual(before_count - 1, group_total(positive_user))

        # The unrelated user's group count must not change.
        self.assertEqual(0, group_total(negative_user))
def test_remove_user_from_group(self):
    """A removed user no longer sees the group in their group listing."""
    domain = self._get_domain_fixture()
    group = self.identity_api.create_group(
        unit.new_group_ref(domain_id=domain['id']))
    user = self.identity_api.create_user(
        unit.new_user_ref(domain_id=domain['id']))

    self.identity_api.add_user_to_group(user['id'], group['id'])
    memberships = self.identity_api.list_groups_for_user(user['id'])
    member_of = [entry['id'] for entry in memberships]
    self.assertIn(group['id'], member_of)

    self.identity_api.remove_user_from_group(user['id'], group['id'])
    memberships = self.identity_api.list_groups_for_user(user['id'])
    member_of = [entry['id'] for entry in memberships]
    self.assertNotIn(group['id'], member_of)
def test_remove_user_from_group_returns_not_found(self):
    """remove_user_from_group rejects unknown user and/or group ids."""
    domain = self._get_domain_fixture()
    user = self.identity_api.create_user(
        unit.new_user_ref(domain_id=domain['id']))
    group = self.identity_api.create_group(
        unit.new_group_ref(domain_id=domain['id']))
    # Known user, unknown group.
    self.assertRaises(exception.GroupNotFound,
                      self.identity_api.remove_user_from_group,
                      user['id'],
                      uuid.uuid4().hex)
    # Unknown user, known group.
    self.assertRaises(exception.UserNotFound,
                      self.identity_api.remove_user_from_group,
                      uuid.uuid4().hex,
                      group['id'])
    # Both unknown.
    self.assertRaises(exception.NotFound,
                      self.identity_api.remove_user_from_group,
                      uuid.uuid4().hex,
                      uuid.uuid4().hex)
def test_group_crud(self):
    """Create, read, rename and delete a group in a new domain."""
    domain = unit.new_domain_ref()
    self.resource_api.create_domain(domain['id'], domain)

    group = self.identity_api.create_group(
        unit.new_group_ref(domain_id=domain['id']))
    fetched = self.identity_api.get_group(group['id'])
    self.assertDictContainsSubset(group, fetched)

    group['name'] = uuid.uuid4().hex
    self.identity_api.update_group(group['id'], group)
    fetched = self.identity_api.get_group(group['id'])
    self.assertDictContainsSubset(group, fetched)

    self.identity_api.delete_group(group['id'])
    self.assertRaises(exception.GroupNotFound,
                      self.identity_api.get_group,
                      group['id'])
def test_create_group_name_with_trailing_whitespace(self):
    """create_group returns the name with trailing whitespace stripped."""
    group = unit.new_group_ref(domain_id=CONF.identity.default_domain_id)
    padded_name = group['name'] + ' '
    group['name'] = padded_name
    created = self.identity_api.create_group(group)
    self.assertEqual(created['name'], padded_name.strip())
def test_update_group_name_with_trailing_whitespace(self):
    """update_group returns the name with trailing whitespace stripped."""
    group = unit.new_group_ref(domain_id=CONF.identity.default_domain_id)
    created = self.identity_api.create_group(group)
    padded_name = group['name'] + ' '
    group['name'] = padded_name
    updated = self.identity_api.update_group(created['id'], group)
    self.assertEqual(updated['id'], created['id'])
    self.assertEqual(updated['name'], padded_name.strip())
def test_get_group_by_name(self):
    """get_group_by_name finds the right group among several."""
    default_domain = CONF.identity.default_domain_id
    wanted = unit.new_group_ref(domain_id=default_domain)
    wanted_name = wanted['name']
    wanted = self.identity_api.create_group(wanted)
    # A second group in the same domain that must not be returned.
    self.identity_api.create_group(
        unit.new_group_ref(domain_id=default_domain))
    found = self.identity_api.get_group_by_name(wanted_name,
                                                default_domain)
    self.assertDictEqual(wanted, found)
def test_get_group_by_name_returns_not_found(self):
    """get_group_by_name raises GroupNotFound for an unused name."""
    missing_name = uuid.uuid4().hex
    self.assertRaises(exception.GroupNotFound,
                      self.identity_api.get_group_by_name,
                      missing_name,
                      CONF.identity.default_domain_id)
@unit.skip_if_cache_disabled('identity')
def test_cache_layer_group_crud(self):
    """get_group serves cached data until invalidated or updated."""
    default_domain = CONF.identity.default_domain_id

    group = self.identity_api.create_group(
        unit.new_group_ref(domain_id=default_domain))
    # Prime the cache.
    cached_ref = self.identity_api.get_group(group['id'])
    # Delete through the driver, bypassing the identity API.
    _domain_id, driver, entity_id = (
        self.identity_api._get_domain_driver_and_entity_id(group['id']))
    driver.delete_group(entity_id)
    # The deleted group is still returned by get_group...
    self.assertEqual(cached_ref, self.identity_api.get_group(group['id']))
    # ...until the cached entry is explicitly invalidated.
    self.identity_api.get_group.invalidate(self.identity_api, group['id'])
    self.assertRaises(exception.GroupNotFound,
                      self.identity_api.get_group, group['id'])

    group = self.identity_api.create_group(
        unit.new_group_ref(domain_id=default_domain))
    # Prime the cache again.
    self.identity_api.get_group(group['id'])
    group['name'] = uuid.uuid4().hex
    updated = self.identity_api.update_group(group['id'], group)
    # After an update through the identity API, get_group must return
    # the updated group.
    self.assertDictContainsSubset(
        self.identity_api.get_group(group['id']), updated)
def test_create_duplicate_group_name_fails(self):
    """Two groups with one name in the same domain raise Conflict."""
    default_domain = CONF.identity.default_domain_id
    original = unit.new_group_ref(domain_id=default_domain)
    duplicate = unit.new_group_ref(domain_id=default_domain,
                                   name=original['name'])
    self.identity_api.create_group(original)
    self.assertRaises(exception.Conflict,
                      self.identity_api.create_group,
                      duplicate)
def test_create_duplicate_group_name_in_different_domains(self):
    """The same group name can be created in two different domains."""
    other_domain = unit.new_domain_ref()
    self.resource_api.create_domain(other_domain['id'], other_domain)
    first = unit.new_group_ref(domain_id=CONF.identity.default_domain_id)
    second = unit.new_group_ref(domain_id=other_domain['id'],
                                name=first['name'])
    # Neither call is expected to raise.
    for group in (first, second):
        self.identity_api.create_group(group)
def test_move_group_between_domains(self):
    """Moving a group to another domain works but reports a deprecation."""
    source_domain = unit.new_domain_ref()
    self.resource_api.create_domain(source_domain['id'], source_domain)
    target_domain = unit.new_domain_ref()
    self.resource_api.create_domain(target_domain['id'], target_domain)

    group = self.identity_api.create_group(
        unit.new_group_ref(domain_id=source_domain['id']))
    group['domain_id'] = target_domain['id']

    # The update must report a deprecated feature.
    deprecation_hook = 'oslo_log.versionutils.report_deprecated_feature'
    with mock.patch(deprecation_hook) as report_deprecated:
        self.identity_api.update_group(group['id'], group)
        self.assertTrue(report_deprecated.called)

    moved = self.identity_api.get_group(group['id'])
    self.assertEqual(target_domain['id'], moved['domain_id'])
def test_move_group_between_domains_with_clashing_names_fails(self):
    """Moving a group into a domain that has the same name raises Conflict."""
    source_domain = unit.new_domain_ref()
    self.resource_api.create_domain(source_domain['id'], source_domain)
    target_domain = unit.new_domain_ref()
    self.resource_api.create_domain(target_domain['id'], target_domain)

    # A group living in the source domain...
    mover = self.identity_api.create_group(
        unit.new_group_ref(domain_id=source_domain['id']))
    # ...and a same-named group in the target domain, which is allowed
    # because the two domains are separate.
    self.identity_api.create_group(
        unit.new_group_ref(name=mover['name'],
                           domain_id=target_domain['id']))

    # Moving the first group into the target domain must now fail
    # because the names clash.
    mover['domain_id'] = target_domain['id']
    self.assertRaises(exception.Conflict,
                      self.identity_api.update_group,
                      mover['id'],
                      mover)
def test_user_crud(self):
    """Create, read, update and delete a user in the default domain."""
    user_data = unit.new_user_ref(
        domain_id=CONF.identity.default_domain_id)
    user_data.pop('id')
    created = self.identity_api.create_user(user_data)

    fetched = self.identity_api.get_user(created['id'])
    user_data.pop('password')
    fetched_copy = dict((key, fetched[key]) for key in fetched)
    self.assertDictContainsSubset(user_data, fetched_copy)

    # Change the password, then compare again without it.
    user_data['password'] = uuid.uuid4().hex
    self.identity_api.update_user(created['id'], user_data)
    fetched = self.identity_api.get_user(created['id'])
    user_data.pop('password')
    fetched_copy = dict((key, fetched[key]) for key in fetched)
    self.assertDictContainsSubset(user_data, fetched_copy)

    self.identity_api.delete_user(created['id'])
    self.assertRaises(exception.UserNotFound,
                      self.identity_api.get_user,
                      created['id'])
def test_arbitrary_attributes_are_returned_from_create_user(self):
    """An extra attribute given at creation is present in the result."""
    expected_value = uuid.uuid4().hex
    new_user = unit.new_user_ref(
        arbitrary_attr=expected_value,
        domain_id=CONF.identity.default_domain_id)

    created = self.identity_api.create_user(new_user)

    self.assertEqual(expected_value, created['arbitrary_attr'])
def test_arbitrary_attributes_are_returned_from_get_user(self):
    """An extra attribute given at creation is present when fetching."""
    expected_value = uuid.uuid4().hex
    created = self.identity_api.create_user(
        unit.new_user_ref(
            domain_id=CONF.identity.default_domain_id,
            arbitrary_attr=expected_value))

    fetched = self.identity_api.get_user(created['id'])

    self.assertEqual(expected_value, fetched['arbitrary_attr'])
def test_new_arbitrary_attributes_are_returned_from_update_user(self):
    """An extra attribute first added by an update is in the result."""
    created = self.identity_api.create_user(
        unit.new_user_ref(domain_id=CONF.identity.default_domain_id))

    # The attribute did not exist at creation time; add it via update.
    expected_value = uuid.uuid4().hex
    created['arbitrary_attr'] = expected_value
    result = self.identity_api.update_user(created['id'], created)

    self.assertEqual(expected_value, result['arbitrary_attr'])
def test_updated_arbitrary_attributes_are_returned_from_update_user(self):
    """Changing an existing extra attribute is reflected by update."""
    original_value = uuid.uuid4().hex
    new_user = unit.new_user_ref(
        domain_id=CONF.identity.default_domain_id,
        arbitrary_attr=original_value)
    replacement_value = uuid.uuid4().hex

    created = self.identity_api.create_user(new_user)
    # Overwrite the attribute that was set at creation time.
    created['arbitrary_attr'] = replacement_value
    result = self.identity_api.update_user(created['id'], created)

    self.assertEqual(replacement_value, result['arbitrary_attr'])
def test_user_update_and_user_get_return_same_response(self):
    """The dict returned by update_user equals a following get_user."""
    created = self.identity_api.create_user(
        unit.new_user_ref(domain_id=CONF.identity.default_domain_id))
    user_id = created['id']

    update_result = self.identity_api.update_user(
        user_id, {'enabled': False})
    # The SQL backend adds an 'extra' field; discard it when present.
    update_result.pop('extra', None)
    self.assertIs(False, update_result['enabled'])

    fetched = self.identity_api.get_user(user_id)
    self.assertDictEqual(update_result, fetched)
@unit.skip_if_no_multiple_domains_support
def test_list_domains_filtered_and_limited(self):
    """Check limiting and name filtering when listing domains.

    Two domains are created with a 'domaingroup1' name prefix and three
    with a 'domaingroup2' prefix. Domains are then listed with a list
    limit of 4, with an exact name filter, and with a 'startswith'
    filter.
    """
    # The test is designed for multiple domains only
    def create_domains(domain_count, domain_name_prefix):
        # Create domain_count domains named '<prefix>-<random hex>' and
        # remember each one, keyed by name, for later cleanup.
        for _ in range(domain_count):
            domain_name = '%s-%s' % (domain_name_prefix, uuid.uuid4().hex)
            domain = unit.new_domain_ref(name=domain_name)
            self.domain_list[domain_name] = \
                self.resource_api.create_domain(domain['id'], domain)

    def clean_up_domains():
        # Each domain is disabled before it is deleted (presumably the
        # resource API requires this - confirm against the manager).
        for domain in self.domain_list.values():
            domain['enabled'] = False
            self.resource_api.update_domain(domain['id'], domain)
            self.resource_api.delete_domain(domain['id'])

    self.domain_list = {}
    # Register the cleanup before creating anything, so that domains
    # created prior to a failure part-way through are still removed.
    self.addCleanup(clean_up_domains)
    create_domains(2, 'domaingroup1')
    create_domains(3, 'domaingroup2')

    unfiltered_domains = self.resource_api.list_domains()

    # Should get back just 4 entities
    self.config_fixture.config(list_limit=4)
    hints = driver_hints.Hints()
    entities = self.resource_api.list_domains(hints=hints)
    self.assertThat(entities, matchers.HasLength(hints.limit['limit']))
    self.assertTrue(hints.limit['truncated'])

    # Get one exact item from the list
    hints = driver_hints.Hints()
    hints.add_filter('name', unfiltered_domains[3]['name'])
    entities = self.resource_api.list_domains(hints=hints)
    self.assertThat(entities, matchers.HasLength(1))
    self.assertEqual(entities[0], unfiltered_domains[3])

    # Get 2 entries
    hints = driver_hints.Hints()
    hints.add_filter('name', 'domaingroup1', comparator='startswith')
    entities = self.resource_api.list_domains(hints=hints)
    self.assertThat(entities, matchers.HasLength(2))
    self.assertThat(entities[0]['name'],
                    matchers.StartsWith('domaingroup1'))
    self.assertThat(entities[1]['name'],
                    matchers.StartsWith('domaingroup1'))
class FilterTests(filtering.FilterTests):
    """Filtering tests for listing users, groups and projects.

    Several helpers used here (for example ``_create_test_data``,
    ``_delete_test_data``, ``_create_one_entity``, ``_list_entities`` and
    ``_match_with_list``) are not defined in this class; presumably they
    are provided by ``filtering.FilterTests`` - confirm against that
    module.
    """

    def test_list_entities_filtered(self):
        """Check an exact name filter returns only the matching entity."""
        for entity in ['user', 'group', 'project']:
            # Create 20 entities
            entity_list = self._create_test_data(entity, 20)

            # Try filtering to get one exact item out of the list
            hints = driver_hints.Hints()
            hints.add_filter('name', entity_list[10]['name'])
            entities = self._list_entities(entity)(hints=hints)
            self.assertEqual(1, len(entities))
            self.assertEqual(entity_list[10]['id'], entities[0]['id'])
            # Check the driver has removed the filter from the list hints
            self.assertFalse(hints.get_exact_filter_by_name('name'))
            self._delete_test_data(entity, entity_list)

    def test_list_users_inexact_filtered(self):
        """Check 'contains', 'startswith' and 'endswith' user filters."""
        # Create 20 users, some with specific names. We set the names at
        # create time (rather than updating them), since the LDAP driver
        # does not support name updates.
        user_name_data = {
            # user index: name for user
            5: 'The',
            6: 'The Ministry',
            7: 'The Ministry of',
            8: 'The Ministry of Silly',
            9: 'The Ministry of Silly Walks',
            # ...and one for useful case insensitivity testing
            10: 'The ministry of silly walks OF'
        }
        user_list = self._create_test_data(
            'user', 20, domain_id=CONF.identity.default_domain_id,
            name_dict=user_name_data)

        # Users 6 to 10 contain 'ministry' once case is ignored.
        hints = driver_hints.Hints()
        hints.add_filter('name', 'ministry', comparator='contains')
        users = self.identity_api.list_users(hints=hints)
        self.assertEqual(5, len(users))
        self._match_with_list(users, user_list,
                              list_start=6, list_end=11)
        # TODO(henry-nash) Check inexact filter has been removed.

        # Users 5 to 10 start with 'The'.
        hints = driver_hints.Hints()
        hints.add_filter('name', 'The', comparator='startswith')
        users = self.identity_api.list_users(hints=hints)
        self.assertEqual(6, len(users))
        self._match_with_list(users, user_list,
                              list_start=5, list_end=11)
        # TODO(henry-nash) Check inexact filter has been removed.

        # Users 7 and 10 end with 'of' once case is ignored.
        hints = driver_hints.Hints()
        hints.add_filter('name', 'of', comparator='endswith')
        users = self.identity_api.list_users(hints=hints)
        self.assertEqual(2, len(users))
        # We can't assume we will get back the users in any particular
        # order
        self.assertIn(user_list[7]['id'], [users[0]['id'], users[1]['id']])
        self.assertIn(user_list[10]['id'], [users[0]['id'], users[1]['id']])
        # TODO(henry-nash) Check inexact filter has been removed.

        # TODO(henry-nash): Add some case sensitive tests. However,
        # these would be hard to validate currently, since:
        #
        # For SQL, the issue is that MySQL 0.7, by default, is installed in
        # case insensitive mode (which is what is run by default for our
        # SQL backend tests). For production deployments, OpenStack
        # assumes a case sensitive database. For these tests, therefore, we
        # need to be able to check the sensitivity of the database so as to
        # know whether to run case sensitive tests here.
        #
        # For LDAP/AD, although dependent on the schema being used,
        # attributes are typically configured to be case aware, but not
        # case sensitive.

        self._delete_test_data('user', user_list)

    def _groups_for_user_data(self):
        """Create groups, users and memberships for groups_for_user tests.

        Ten groups are created (indexes 5, 6 and 9 have well known names)
        along with two users. The first user is added to groups 0 to 6 and
        the second user to groups 7 to 9.

        :returns: a tuple of (group_list, user_list)
        """
        number_of_groups = 10
        group_name_data = {
            # entity index: name for entity
            5: 'The',
            6: 'The Ministry',
            9: 'The Ministry of Silly Walks',
        }
        group_list = self._create_test_data(
            'group', number_of_groups,
            domain_id=CONF.identity.default_domain_id,
            name_dict=group_name_data)
        user_list = self._create_test_data('user', 2)

        for group_index in range(7):
            # Create membership, including with two out of the three groups
            # with well known names
            self.identity_api.add_user_to_group(
                user_list[0]['id'], group_list[group_index]['id'])
        # ...and some spoiler memberships
        for group_index in range(7, number_of_groups):
            self.identity_api.add_user_to_group(
                user_list[1]['id'], group_list[group_index]['id'])

        return group_list, user_list

    def test_groups_for_user_inexact_filtered(self):
        """Test use of filtering doesn't break groups_for_user listing.

        Some backends may use filtering to achieve the list of groups for a
        user, so test that it can combine a second filter.

        Test Plan:

        - Create 10 groups, some with names we can filter on
        - Create 2 users
        - Assign 1 of those users to most of the groups, including some of
          the well known named ones
        - Assign the other user to other groups as spoilers
        - Ensure that when we list groups for users with a filter on the
          group name, both restrictions have been enforced on what is
          returned.

        """
        group_list, user_list = self._groups_for_user_data()

        hints = driver_hints.Hints()
        hints.add_filter('name', 'Ministry', comparator='contains')
        groups = self.identity_api.list_groups_for_user(
            user_list[0]['id'], hints=hints)
        # We should only get back one group, since of the two that contain
        # 'Ministry' the user only belongs to one.
        self.assertThat(len(groups), matchers.Equals(1))
        self.assertEqual(group_list[6]['id'], groups[0]['id'])

        hints = driver_hints.Hints()
        hints.add_filter('name', 'The', comparator='startswith')
        groups = self.identity_api.list_groups_for_user(
            user_list[0]['id'], hints=hints)
        # We should only get back 2 out of the 3 groups that start with 'The'
        # hence showing that both "filters" have been applied
        self.assertThat(len(groups), matchers.Equals(2))
        self.assertIn(group_list[5]['id'], [groups[0]['id'], groups[1]['id']])
        self.assertIn(group_list[6]['id'], [groups[0]['id'], groups[1]['id']])

        # Start from fresh hints, as every other check here does, so that
        # the result does not depend on whether the previous call left its
        # 'startswith' filter (or limit state) behind in the hints object.
        hints = driver_hints.Hints()
        hints.add_filter('name', 'The', comparator='endswith')
        groups = self.identity_api.list_groups_for_user(
            user_list[0]['id'], hints=hints)
        # We should only get back one group since it is the only one that
        # ends with 'The'
        self.assertThat(len(groups), matchers.Equals(1))
        self.assertEqual(group_list[5]['id'], groups[0]['id'])

        self._delete_test_data('user', user_list)
        self._delete_test_data('group', group_list)

    def test_groups_for_user_exact_filtered(self):
        """Test exact filters doesn't break groups_for_user listing."""
        group_list, user_list = self._groups_for_user_data()
        hints = driver_hints.Hints()
        hints.add_filter('name', 'The Ministry', comparator='equals')
        groups = self.identity_api.list_groups_for_user(
            user_list[0]['id'], hints=hints)
        # Of the three well known group names only one is exactly 'The
        # Ministry', and the user is a member of that group, so exactly
        # that one group should come back.
        self.assertEqual(1, len(groups))
        self.assertEqual(group_list[6]['id'], groups[0]['id'])
        self._delete_test_data('user', user_list)
        self._delete_test_data('group', group_list)

    def _get_user_name_field_size(self):
        """Return the size of the user name field for the backend.

        Subclasses can override this method to indicate that the user name
        field is limited in length. The user name is the field used in the
        test that validates that a filter value works even if it's longer
        than a field.

        If the backend doesn't limit the value length then return None.

        """
        return None

    def test_filter_value_wider_than_field(self):
        """Check a filter value longer than the name field matches nothing."""
        # If a filter value is given that's larger than the field in the
        # backend then no values are returned.

        user_name_field_size = self._get_user_name_field_size()
        if user_name_field_size is None:
            # The backend doesn't limit the size of the user name, so pass
            # this test.
            return

        # Create some users just to make sure would return something if the
        # filter was ignored.
        self._create_test_data('user', 2)

        hints = driver_hints.Hints()
        value = 'A' * (user_name_field_size + 1)
        hints.add_filter('name', value)
        users = self.identity_api.list_users(hints=hints)
        self.assertEqual([], users)

    def _list_users_in_group_data(self):
        """Create users and a group for the list_users_in_group tests.

        Ten users are created (indexes 1, 3 and 9 have well known names)
        along with a group named 'Great Writers'. Users 0 to 6 are added to
        the group, so the well known user at index 9 is not a member.

        :returns: a tuple of (user_list, group)
        """
        number_of_users = 10
        user_name_data = {
            1: 'Arthur Conan Doyle',
            3: 'Arthur Rimbaud',
            9: 'Arthur Schopenhauer',
        }
        user_list = self._create_test_data(
            'user', number_of_users,
            domain_id=CONF.identity.default_domain_id,
            name_dict=user_name_data)
        group = self._create_one_entity(
            'group', CONF.identity.default_domain_id, 'Great Writers')
        for i in range(7):
            self.identity_api.add_user_to_group(user_list[i]['id'],
                                                group['id'])

        return user_list, group

    def test_list_users_in_group_inexact_filtered(self):
        """Check inexact name filters combine with group membership."""
        user_list, group = self._list_users_in_group_data()

        # Three users contain 'Arthur', but only two of them are members.
        hints = driver_hints.Hints()
        hints.add_filter('name', 'Arthur', comparator='contains')
        users = self.identity_api.list_users_in_group(group['id'], hints=hints)
        self.assertThat(len(users), matchers.Equals(2))
        self.assertIn(user_list[1]['id'], [users[0]['id'], users[1]['id']])
        self.assertIn(user_list[3]['id'], [users[0]['id'], users[1]['id']])

        hints = driver_hints.Hints()
        hints.add_filter('name', 'Arthur', comparator='startswith')
        users = self.identity_api.list_users_in_group(group['id'], hints=hints)
        self.assertThat(len(users), matchers.Equals(2))
        self.assertIn(user_list[1]['id'], [users[0]['id'], users[1]['id']])
        self.assertIn(user_list[3]['id'], [users[0]['id'], users[1]['id']])

        hints = driver_hints.Hints()
        hints.add_filter('name', 'Doyle', comparator='endswith')
        users = self.identity_api.list_users_in_group(group['id'], hints=hints)
        self.assertThat(len(users), matchers.Equals(1))
        self.assertEqual(user_list[1]['id'], users[0]['id'])

        self._delete_test_data('user', user_list)
        self._delete_entity('group')(group['id'])

    def test_list_users_in_group_exact_filtered(self):
        """Check an exact name filter combines with group membership."""
        hints = driver_hints.Hints()
        user_list, group = self._list_users_in_group_data()
        hints.add_filter('name', 'Arthur Rimbaud', comparator='equals')
        users = self.identity_api.list_users_in_group(group['id'], hints=hints)
        self.assertEqual(1, len(users))
        self.assertEqual(user_list[3]['id'], users[0]['id'])
        self._delete_test_data('user', user_list)
        self._delete_entity('group')(group['id'])
class LimitTests(filtering.FilterTests):
    """Tests for list limits applied to users, groups and projects."""

    # Entity types for which test data is created and limits are checked.
    ENTITIES = ['user', 'group', 'project']

    def setUp(self):
        """Setup for Limit Test Cases."""
        self.entity_lists = {}
        # Register the cleanup before creating any data, so that entity
        # types created prior to a failure part-way through are still
        # removed.
        self.addCleanup(self.clean_up_entities)
        for entity in self.ENTITIES:
            # Create 20 entities
            self.entity_lists[entity] = self._create_test_data(entity, 20)

    def clean_up_entities(self):
        """Clean up entity test data from Limit Test Cases."""
        for entity in self.ENTITIES:
            # setUp may have failed before every entity type was created.
            if entity in self.entity_lists:
                self._delete_test_data(entity, self.entity_lists[entity])
        del self.entity_lists

    def _test_list_entity_filtered_and_limited(self, entity):
        """Check global and driver specific list limits for an entity type.

        :param entity: one of the names in ``ENTITIES``
        """
        self.config_fixture.config(list_limit=10)
        # Should get back just 10 entities
        hints = driver_hints.Hints()
        entities = self._list_entities(entity)(hints=hints)
        self.assertEqual(hints.limit['limit'], len(entities))
        self.assertTrue(hints.limit['truncated'])

        # Override with driver specific limit
        if entity == 'project':
            self.config_fixture.config(group='resource', list_limit=5)
        else:
            self.config_fixture.config(group='identity', list_limit=5)

        # Should get back just 5 entities
        hints = driver_hints.Hints()
        entities = self._list_entities(entity)(hints=hints)
        self.assertEqual(hints.limit['limit'], len(entities))

        # Finally, let's pretend we want to get the full list of entities,
        # even with the limits set, as part of some internal calculation.
        # Calling the API without a hints list should achieve this, and
        # return at least the 20 entries we created (there may be other
        # entities lying around created by other tests/setup).
        entities = self._list_entities(entity)()
        self.assertGreaterEqual(len(entities), 20)
        self._match_with_list(self.entity_lists[entity], entities)

    def test_list_users_filtered_and_limited(self):
        """Check list limits when listing users."""
        self._test_list_entity_filtered_and_limited('user')

    def test_list_groups_filtered_and_limited(self):
        """Check list limits when listing groups."""
        self._test_list_entity_filtered_and_limited('group')

    def test_list_projects_filtered_and_limited(self):
        """Check list limits when listing projects."""
        self._test_list_entity_filtered_and_limited('project')
class ShadowUsersTests(object):
def test_create_nonlocal_user_unique_constraint(self):
    """Creating a second nonlocal user with a taken name conflicts."""
    first_ref = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
    first_created = self.shadow_users_api.create_nonlocal_user(first_ref)

    # The created user carries no password and echoes back what we sent.
    self.assertNotIn('password', first_created)
    for field in ('id', 'domain_id', 'name'):
        self.assertEqual(first_created[field], first_ref[field])

    # A different user reusing the first user's name must be rejected.
    duplicate_ref = unit.new_user_ref(
        domain_id=CONF.identity.default_domain_id)
    duplicate_ref['name'] = first_ref['name']
    self.assertRaises(exception.Conflict,
                      self.shadow_users_api.create_nonlocal_user,
                      duplicate_ref)
def test_get_user(self):
    """A created nonlocal user can be fetched back by its id."""
    user_ref = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
    # Drop the fields this test does not send when creating the user.
    for unwanted_field in ('email', 'password'):
        user_ref.pop(unwanted_field)

    created = self.shadow_users_api.create_nonlocal_user(user_ref)
    self.assertEqual(created['id'], user_ref['id'])

    found = self.shadow_users_api.get_user(created['id'])
    self.assertItemsEqual(created, found)
def test_create_federated_user_unique_constraint(self):
    """Creating the same federated user a second time conflicts."""
    federated_ref = unit.new_federated_user_ref()

    created = self.shadow_users_api.create_federated_user(federated_ref)
    fetched = self.shadow_users_api.get_user(created["id"])
    self.assertIsNotNone(fetched["id"])

    # Re-submitting the identical federated reference must be rejected.
    self.assertRaises(exception.Conflict,
                      self.shadow_users_api.create_federated_user,
                      federated_ref)
def test_get_federated_user(self):
    """A federated user can be looked up by idp, protocol and unique id."""
    federated_ref = unit.new_federated_user_ref()
    created = self.shadow_users_api.create_federated_user(federated_ref)

    idp_id = federated_ref["idp_id"]
    protocol_id = federated_ref["protocol_id"]
    unique_id = federated_ref["unique_id"]
    fetched = self.shadow_users_api.get_federated_user(
        idp_id, protocol_id, unique_id)

    self.assertItemsEqual(created, fetched)
    self.assertEqual(created["id"], fetched["id"])
def test_update_federated_user_display_name(self):
    """Updating a federated user's display name is persisted."""
    federated_ref = unit.new_federated_user_ref()
    created = self.shadow_users_api.create_federated_user(federated_ref)

    # The federated user is identified by these three values.
    idp_id = federated_ref["idp_id"]
    protocol_id = federated_ref["protocol_id"]
    unique_id = federated_ref["unique_id"]

    new_display_name = uuid.uuid4().hex
    self.shadow_users_api.update_federated_user_display_name(
        idp_id, protocol_id, unique_id, new_display_name)

    stored = self.shadow_users_api._get_federated_user(
        idp_id, protocol_id, unique_id)
    self.assertEqual(stored.federated_users[0].display_name,
                     new_display_name)
    self.assertEqual(created["id"], stored.id)
def test_set_last_active_at(self):
    """Check the stored last_active_at is not after today's UTC date.

    With ``disable_user_account_days_inactive`` set to 90, a user is
    authenticated and the ``last_active_at`` value read back from the
    database is compared with the current UTC date.
    """
    self.config_fixture.config(group='security_compliance',
                               disable_user_account_days_inactive=90)
    user_ref = self.identity_api.authenticate(
        self.make_request(),
        user_id=self.user_sna['id'],
        password=self.user_sna['password'])
    # Take the reference date only after authenticating. If it were taken
    # beforehand and the UTC date rolled over during the call, the stored
    # value could be one day later than the reference and the assertion
    # below would fail spuriously.
    now = datetime.datetime.utcnow().date()
    user_ref = self._get_user_ref(user_ref['id'])
    self.assertGreaterEqual(now, user_ref.last_active_at)
def test_set_last_active_at_when_config_setting_is_none(self):
    """Check last_active_at stays unset when the option is None."""
    self.config_fixture.config(
        group='security_compliance',
        disable_user_account_days_inactive=None)

    request = self.make_request()
    authenticated = self.identity_api.authenticate(
        request,
        user_id=self.user_sna['id'],
        password=self.user_sna['password'])

    stored_user = self._get_user_ref(authenticated['id'])
    self.assertIsNone(stored_user.last_active_at)
def _get_user_ref(self, user_id):
    """Return the ``model.User`` row for user_id using a read session."""
    with sql.session_for_read() as read_session:
        user_query = read_session.query(model.User)
        return user_query.get(user_id)