Refactor shadow users tests

The main reason for this change is to create real Identity Provider data
for the shadow users (backend and core) tests related to federated
users. This is needed in a subsequent patch where we set the domain for
federated users.

I ended up moving the shadow user tests out of the identity tests, as my
changes were conflicting with some of those tests and it provided a
simpler implementation.

Partial-Bug: #1642687
Partially-Implements: bp support-federated-attr
Depends-On: I08a8f3cb59150c8e9a2f90c5ea6b0aa197a03572
Change-Id: If8c8ad39c4c55a2d800bf4432411db59799e84e6
This commit is contained in:
Ronald De Rose 2017-01-21 20:47:04 +00:00
parent 2bd88d30e1
commit 6e0faa96ed
7 changed files with 254 additions and 168 deletions

View File

@ -0,0 +1,144 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import uuid
from keystone.common import sql
import keystone.conf
from keystone import exception
from keystone.identity.backends import sql_model as model
from keystone.tests import unit
CONF = keystone.conf.CONF
class ShadowUsersBackendTests(object):
def test_create_nonlocal_user_unique_constraint(self):
user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
user_created = self.shadow_users_api.create_nonlocal_user(user)
self.assertNotIn('password', user_created)
self.assertEqual(user_created['id'], user['id'])
self.assertEqual(user_created['domain_id'], user['domain_id'])
self.assertEqual(user_created['name'], user['name'])
new_user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
new_user['name'] = user['name']
self.assertRaises(exception.Conflict,
self.shadow_users_api.create_nonlocal_user,
new_user)
def test_create_nonlocal_user_does_not_create_local_user(self):
user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
new_nonlocal_user = self.shadow_users_api.create_nonlocal_user(user)
user_ref = self._get_user_ref(new_nonlocal_user['id'])
self.assertIsNone(user_ref.local_user)
def test_nonlocal_user_unique_user_id_constraint(self):
user_ref = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
user = self.shadow_users_api.create_nonlocal_user(user_ref)
# attempt to create a nonlocal_user with the same user_id
nonlocal_user = {
'domain_id': CONF.identity.default_domain_id,
'name': uuid.uuid4().hex,
'user_id': user['id']
}
self.assertRaises(sql.DBDuplicateEntry, self._add_nonlocal_user,
nonlocal_user)
def test_get_user(self):
user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
user.pop('email')
user.pop('password')
user_created = self.shadow_users_api.create_nonlocal_user(user)
self.assertEqual(user_created['id'], user['id'])
user_found = self.shadow_users_api.get_user(user_created['id'])
self.assertItemsEqual(user_created, user_found)
def test_create_federated_user_unique_constraint(self):
user_dict = self.shadow_users_api.create_federated_user(
self.federated_user)
user_dict = self.shadow_users_api.get_user(user_dict["id"])
self.assertIsNotNone(user_dict["id"])
self.assertRaises(exception.Conflict,
self.shadow_users_api.create_federated_user,
self.federated_user)
def test_get_federated_user(self):
user_dict_create = self.shadow_users_api.create_federated_user(
self.federated_user)
user_dict_get = self.shadow_users_api.get_federated_user(
self.federated_user["idp_id"],
self.federated_user["protocol_id"],
self.federated_user["unique_id"])
self.assertItemsEqual(user_dict_create, user_dict_get)
self.assertEqual(user_dict_create["id"], user_dict_get["id"])
def test_update_federated_user_display_name(self):
user_dict_create = self.shadow_users_api.create_federated_user(
self.federated_user)
new_display_name = uuid.uuid4().hex
self.shadow_users_api.update_federated_user_display_name(
self.federated_user["idp_id"],
self.federated_user["protocol_id"],
self.federated_user["unique_id"],
new_display_name)
user_ref = self.shadow_users_api._get_federated_user(
self.federated_user["idp_id"],
self.federated_user["protocol_id"],
self.federated_user["unique_id"])
self.assertEqual(user_ref.federated_users[0].display_name,
new_display_name)
self.assertEqual(user_dict_create["id"], user_ref.id)
def test_set_last_active_at(self):
self.config_fixture.config(group='security_compliance',
disable_user_account_days_inactive=90)
now = datetime.datetime.utcnow().date()
password = uuid.uuid4().hex
user = self._create_user(password)
user_auth = self.identity_api.authenticate(
self.make_request(),
user_id=user['id'],
password=password)
user_ref = self._get_user_ref(user_auth['id'])
self.assertGreaterEqual(now, user_ref.last_active_at)
def test_set_last_active_at_when_config_setting_is_none(self):
self.config_fixture.config(group='security_compliance',
disable_user_account_days_inactive=None)
password = uuid.uuid4().hex
user = self._create_user(password)
user_auth = self.identity_api.authenticate(
self.make_request(),
user_id=user['id'],
password=password)
user_ref = self._get_user_ref(user_auth['id'])
self.assertIsNone(user_ref.last_active_at)
def _add_nonlocal_user(self, nonlocal_user):
with sql.session_for_write() as session:
nonlocal_user_ref = model.NonLocalUser.from_dict(nonlocal_user)
session.add(nonlocal_user_ref)
def _create_user(self, password):
user = {
'name': uuid.uuid4().hex,
'domain_id': self.domain_id,
'enabled': True,
'password': password
}
return self.identity_api.create_user(user)
def _get_user_ref(self, user_id):
with sql.session_for_read() as session:
return session.query(model.User).get(user_id)

View File

@ -0,0 +1,58 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import uuid
class ShadowUsersCoreTests(object):
def test_shadow_federated_user(self):
user = self.identity_api.shadow_federated_user(
self.federated_user['idp_id'],
self.federated_user['protocol_id'],
self.federated_user['unique_id'],
self.federated_user['display_name'])
self.assertIsNotNone(user['id'])
self.assertEqual(5, len(user.keys()))
self.assertIsNotNone(user['name'])
self.assertIsNone(user['password_expires_at'])
self.assertIsNone(user['domain_id'])
# NOTE(breton): below, attribute `enabled` is explicitly tested to be
# equal True. assertTrue should not be used, because it converts
# the passed value to bool().
self.assertEqual(True, user['enabled'])
def test_shadow_existing_federated_user(self):
# introduce the user to keystone for the first time
shadow_user1 = self.identity_api.shadow_federated_user(
self.federated_user['idp_id'],
self.federated_user['protocol_id'],
self.federated_user['unique_id'],
self.federated_user['display_name'])
self.assertEqual(self.federated_user['display_name'],
shadow_user1['name'])
# shadow the user again, with another name to invalidate the cache
# internally, this operation causes request to the driver. It should
# not fail.
self.federated_user['display_name'] = uuid.uuid4().hex
shadow_user2 = self.identity_api.shadow_federated_user(
self.federated_user['idp_id'],
self.federated_user['protocol_id'],
self.federated_user['unique_id'],
self.federated_user['display_name'])
self.assertEqual(self.federated_user['display_name'],
shadow_user2['name'])
self.assertNotEqual(shadow_user1['name'], shadow_user2['name'])
# The shadowed users still share the same unique ID.
self.assertEqual(shadow_user1['id'], shadow_user2['id'])

View File

@ -12,17 +12,14 @@
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import uuid
from six.moves import range
from testtools import matchers
from keystone.common import driver_hints
from keystone.common import sql
import keystone.conf
from keystone import exception
from keystone.identity.backends import sql_model as model
from keystone.tests import unit
from keystone.tests.unit import default_fixtures
from keystone.tests.unit import filtering
@ -1358,115 +1355,3 @@ class LimitTests(filtering.FilterTests):
def test_list_projects_filtered_and_limited(self):
self._test_list_entity_filtered_and_limited('project')
class ShadowUsersTests(object):
def test_create_nonlocal_user_unique_constraint(self):
user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
user_created = self.shadow_users_api.create_nonlocal_user(user)
self.assertNotIn('password', user_created)
self.assertEqual(user_created['id'], user['id'])
self.assertEqual(user_created['domain_id'], user['domain_id'])
self.assertEqual(user_created['name'], user['name'])
new_user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
new_user['name'] = user['name']
self.assertRaises(exception.Conflict,
self.shadow_users_api.create_nonlocal_user,
new_user)
def test_create_nonlocal_user_does_not_create_local_user(self):
user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
new_nonlocal_user = self.shadow_users_api.create_nonlocal_user(user)
user_ref = self._get_user_ref(new_nonlocal_user['id'])
self.assertIsNone(user_ref.local_user)
def test_nonlocal_user_unique_user_id_constraint(self):
user_ref = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
user = self.shadow_users_api.create_nonlocal_user(user_ref)
# attempt to create a nonlocal_user with the same user_id
nonlocal_user = {
'domain_id': CONF.identity.default_domain_id,
'name': uuid.uuid4().hex,
'user_id': user['id']
}
self.assertRaises(sql.DBDuplicateEntry, self._add_nonlocal_user,
nonlocal_user)
def _add_nonlocal_user(self, nonlocal_user):
with sql.session_for_write() as session:
nonlocal_user_ref = model.NonLocalUser.from_dict(nonlocal_user)
session.add(nonlocal_user_ref)
def test_get_user(self):
user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
user.pop('email')
user.pop('password')
user_created = self.shadow_users_api.create_nonlocal_user(user)
self.assertEqual(user_created['id'], user['id'])
user_found = self.shadow_users_api.get_user(user_created['id'])
self.assertItemsEqual(user_created, user_found)
def test_create_federated_user_unique_constraint(self):
federated_dict = unit.new_federated_user_ref()
user_dict = self.shadow_users_api.create_federated_user(federated_dict)
user_dict = self.shadow_users_api.get_user(user_dict["id"])
self.assertIsNotNone(user_dict["id"])
self.assertRaises(exception.Conflict,
self.shadow_users_api.create_federated_user,
federated_dict)
def test_get_federated_user(self):
federated_dict = unit.new_federated_user_ref()
user_dict_create = self.shadow_users_api.create_federated_user(
federated_dict)
user_dict_get = self.shadow_users_api.get_federated_user(
federated_dict["idp_id"],
federated_dict["protocol_id"],
federated_dict["unique_id"])
self.assertItemsEqual(user_dict_create, user_dict_get)
self.assertEqual(user_dict_create["id"], user_dict_get["id"])
def test_update_federated_user_display_name(self):
federated_dict = unit.new_federated_user_ref()
user_dict_create = self.shadow_users_api.create_federated_user(
federated_dict)
new_display_name = uuid.uuid4().hex
self.shadow_users_api.update_federated_user_display_name(
federated_dict["idp_id"],
federated_dict["protocol_id"],
federated_dict["unique_id"],
new_display_name)
# NOTE(notmorgan): to_dict not called here, an explicit session context
# is not needed.
user_ref = self.shadow_users_api._get_federated_user(
federated_dict["idp_id"],
federated_dict["protocol_id"],
federated_dict["unique_id"])
self.assertEqual(user_ref.federated_users[0].display_name,
new_display_name)
self.assertEqual(user_dict_create["id"], user_ref.id)
def test_set_last_active_at(self):
self.config_fixture.config(group='security_compliance',
disable_user_account_days_inactive=90)
now = datetime.datetime.utcnow().date()
user_ref = self.identity_api.authenticate(
self.make_request(),
user_id=self.user_sna['id'],
password=self.user_sna['password'])
user_ref = self._get_user_ref(user_ref['id'])
self.assertGreaterEqual(now, user_ref.last_active_at)
def test_set_last_active_at_when_config_setting_is_none(self):
self.config_fixture.config(group='security_compliance',
disable_user_account_days_inactive=None)
user_ref = self.identity_api.authenticate(
self.make_request(),
user_id=self.user_sna['id'],
password=self.user_sna['password'])
user_ref = self._get_user_ref(user_ref['id'])
self.assertIsNone(user_ref.last_active_at)
def _get_user_ref(self, user_id):
with sql.session_for_read() as session:
return session.query(model.User).get(user_id)

View File

@ -176,55 +176,3 @@ class TestDatabaseDomainConfigs(unit.TestCase):
self.assertEqual(CONF.ldap.suffix, res.ldap.suffix)
self.assertEqual(CONF.ldap.use_tls, res.ldap.use_tls)
self.assertEqual(CONF.ldap.query_scope, res.ldap.query_scope)
class TestShadowUsers(unit.TestCase):
def setUp(self):
super(TestShadowUsers, self).setUp()
self.useFixture(database.Database())
self.load_backends()
def test_shadow_federated_user(self):
fed_user = unit.new_federated_user_ref()
user = (
self.identity_api.shadow_federated_user(fed_user['idp_id'],
fed_user['protocol_id'],
fed_user['unique_id'],
fed_user['display_name'])
)
self.assertIsNotNone(user['id'])
self.assertEqual(5, len(user.keys()))
self.assertIsNotNone(user['name'])
self.assertIsNone(user['password_expires_at'])
self.assertIsNone(user['domain_id'])
# NOTE(breton): below, attribute `enabled` is explicitly tested to be
# equal True. assertTrue should not be used, because it converts
# the passed value to bool().
self.assertEqual(True, user['enabled'])
def test_shadow_existing_federated_user(self):
fed_user = unit.new_federated_user_ref()
# introduce the user to keystone for the first time
shadow_user1 = self.identity_api.shadow_federated_user(
fed_user['idp_id'],
fed_user['protocol_id'],
fed_user['unique_id'],
fed_user['display_name'])
self.assertEqual(fed_user['display_name'], shadow_user1['name'])
# shadow the user again, with another name to invalidate the cache
# internally, this operation causes request to the driver. It should
# not fail.
fed_user['display_name'] = uuid.uuid4().hex
shadow_user2 = self.identity_api.shadow_federated_user(
fed_user['idp_id'],
fed_user['protocol_id'],
fed_user['unique_id'],
fed_user['display_name'])
self.assertEqual(fed_user['display_name'], shadow_user2['name'])
self.assertNotEqual(shadow_user1['name'], shadow_user2['name'])
# The shadowed users still share the same unique ID.
self.assertEqual(shadow_user1['id'], shadow_user2['id'])

View File

@ -225,7 +225,6 @@ class SqlModels(SqlTests):
class SqlIdentity(SqlTests,
identity_tests.IdentityTests,
identity_tests.ShadowUsersTests,
assignment_tests.AssignmentTests,
resource_tests.ResourceTests):
def test_password_hashed(self):

View File

@ -0,0 +1,52 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from keystone.tests import unit
from keystone.tests.unit.identity.shadow_users import test_backend
from keystone.tests.unit.identity.shadow_users import test_core
from keystone.tests.unit.ksfixtures import database
class ShadowUsersTests(unit.TestCase,
test_backend.ShadowUsersBackendTests,
test_core.ShadowUsersCoreTests):
def setUp(self):
super(ShadowUsersTests, self).setUp()
self.useFixture(database.Database())
self.load_backends()
self.idp = {
'id': uuid.uuid4().hex,
'enabled': True,
'description': uuid.uuid4().hex
}
self.mapping = {
'id': uuid.uuid4().hex,
}
self.protocol = {
'id': uuid.uuid4().hex,
'idp_id': self.idp['id'],
'mapping_id': self.mapping['id']
}
self.federated_user = {
'idp_id': self.idp['id'],
'protocol_id': self.protocol['id'],
'unique_id': uuid.uuid4().hex,
'display_name': uuid.uuid4().hex
}
self.federation_api.create_idp(self.idp['id'], self.idp)
self.federation_api.create_mapping(self.mapping['id'], self.mapping)
self.federation_api.create_protocol(
self.idp['id'], self.protocol['id'], self.protocol)
self.domain_id = (
self.federation_api.get_idp(self.idp['id'])['domain_id'])