206 lines
7.8 KiB
Python
206 lines
7.8 KiB
Python
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import uuid
|
|
|
|
from keystone.common import provider_api
|
|
from keystone import exception
|
|
from keystone.tests import unit
|
|
from keystone.tests.unit import default_fixtures
|
|
from keystone.tests.unit.identity.shadow_users import test_backend
|
|
from keystone.tests.unit.identity.shadow_users import test_core
|
|
from keystone.tests.unit.ksfixtures import database
|
|
|
|
PROVIDERS = provider_api.ProviderAPIs
|
|
|
|
|
|
class ShadowUsersTests(unit.TestCase,
|
|
test_backend.ShadowUsersBackendTests,
|
|
test_core.ShadowUsersCoreTests):
|
|
def setUp(self):
|
|
super(ShadowUsersTests, self).setUp()
|
|
self.useFixture(database.Database())
|
|
self.load_backends()
|
|
PROVIDERS.resource_api.create_domain(
|
|
default_fixtures.ROOT_DOMAIN['id'], default_fixtures.ROOT_DOMAIN)
|
|
self.idp = {
|
|
'id': uuid.uuid4().hex,
|
|
'enabled': True,
|
|
'description': uuid.uuid4().hex
|
|
}
|
|
self.mapping = {
|
|
'id': uuid.uuid4().hex,
|
|
}
|
|
self.protocol = {
|
|
'id': uuid.uuid4().hex,
|
|
'idp_id': self.idp['id'],
|
|
'mapping_id': self.mapping['id']
|
|
}
|
|
self.federated_user = {
|
|
'idp_id': self.idp['id'],
|
|
'protocol_id': self.protocol['id'],
|
|
'unique_id': uuid.uuid4().hex,
|
|
'display_name': uuid.uuid4().hex
|
|
}
|
|
self.email = uuid.uuid4().hex
|
|
PROVIDERS.federation_api.create_idp(self.idp['id'], self.idp)
|
|
PROVIDERS.federation_api.create_mapping(
|
|
self.mapping['id'], self.mapping
|
|
)
|
|
PROVIDERS.federation_api.create_protocol(
|
|
self.idp['id'], self.protocol['id'], self.protocol)
|
|
self.domain_id = (
|
|
PROVIDERS.federation_api.get_idp(self.idp['id'])['domain_id'])
|
|
|
|
|
|
class TestUserWithFederatedUser(ShadowUsersTests):
|
|
|
|
def setUp(self):
|
|
super(TestUserWithFederatedUser, self).setUp()
|
|
self.useFixture(database.Database())
|
|
self.load_backends()
|
|
|
|
def assertFederatedDictsEqual(self, fed_dict, fed_object):
|
|
self.assertEqual(fed_dict['idp_id'], fed_object['idp_id'])
|
|
self.assertEqual(fed_dict['protocol_id'],
|
|
fed_object['protocols'][0]['protocol_id'])
|
|
self.assertEqual(fed_dict['unique_id'],
|
|
fed_object['protocols'][0]['unique_id'])
|
|
|
|
def test_get_user_when_user_has_federated_object(self):
|
|
fed_dict = unit.new_federated_user_ref(idp_id=self.idp['id'],
|
|
protocol_id=self.protocol['id'])
|
|
user = self.shadow_users_api.create_federated_user(
|
|
self.domain_id, fed_dict)
|
|
|
|
# test that the user returns a federated object and that there is only
|
|
# one returned
|
|
user_ref = self.identity_api.get_user(user['id'])
|
|
self.assertIn('federated', user_ref)
|
|
self.assertEqual(1, len(user_ref['federated']))
|
|
|
|
self.assertFederatedDictsEqual(fed_dict, user_ref['federated'][0])
|
|
|
|
def test_create_user_with_invalid_idp_and_protocol_fails(self):
|
|
baduser = unit.new_user_ref(domain_id=self.domain_id)
|
|
baduser['federated'] = [
|
|
{
|
|
'idp_id': 'fakeidp',
|
|
'protocols': [
|
|
{
|
|
'protocol_id': 'nonexistent',
|
|
'unique_id': 'unknown'
|
|
}
|
|
]
|
|
}
|
|
]
|
|
# Check validation works by throwing a federated object with
|
|
# invalid idp_id, protocol_id inside the user passed to create_user.
|
|
self.assertRaises(exception.ValidationError,
|
|
self.identity_api.create_user,
|
|
baduser)
|
|
|
|
baduser['federated'][0]['idp_id'] = self.idp['id']
|
|
self.assertRaises(exception.ValidationError,
|
|
self.identity_api.create_user,
|
|
baduser)
|
|
|
|
def test_create_user_with_federated_attributes(self):
|
|
# Create the schema of a federated attribute being passed in with a
|
|
# user.
|
|
user = unit.new_user_ref(domain_id=self.domain_id)
|
|
unique_id = uuid.uuid4().hex
|
|
user['federated'] = [
|
|
{
|
|
'idp_id': self.idp['id'],
|
|
'protocols': [
|
|
{
|
|
'protocol_id': self.protocol['id'],
|
|
'unique_id': unique_id
|
|
}
|
|
]
|
|
}
|
|
]
|
|
|
|
# Test that there are no current federated_users that match our users
|
|
# federated object and create the user
|
|
self.assertRaises(exception.UserNotFound,
|
|
self.shadow_users_api.get_federated_user,
|
|
self.idp['id'],
|
|
self.protocol['id'],
|
|
unique_id)
|
|
|
|
ref = self.identity_api.create_user(user)
|
|
|
|
# Test that the user and federated object now exists
|
|
self.assertEqual(user['name'], ref['name'])
|
|
self.assertEqual(user['federated'], ref['federated'])
|
|
fed_user = self.shadow_users_api.get_federated_user(
|
|
self.idp['id'],
|
|
self.protocol['id'],
|
|
unique_id)
|
|
self.assertIsNotNone(fed_user)
|
|
|
|
def test_update_user_with_invalid_idp_and_protocol_fails(self):
|
|
baduser = unit.new_user_ref(domain_id=self.domain_id)
|
|
baduser['federated'] = [
|
|
{
|
|
'idp_id': 'fakeidp',
|
|
'protocols': [
|
|
{
|
|
'protocol_id': 'nonexistent',
|
|
'unique_id': 'unknown'
|
|
}
|
|
]
|
|
}
|
|
]
|
|
# Check validation works by throwing a federated object with
|
|
# invalid idp_id, protocol_id inside the user passed to create_user.
|
|
self.assertRaises(exception.ValidationError,
|
|
self.identity_api.create_user,
|
|
baduser)
|
|
|
|
baduser['federated'][0]['idp_id'] = self.idp['id']
|
|
self.assertRaises(exception.ValidationError,
|
|
self.identity_api.create_user,
|
|
baduser)
|
|
|
|
def test_update_user_with_federated_attributes(self):
|
|
user = self.shadow_users_api.create_federated_user(
|
|
self.domain_id, self.federated_user)
|
|
user = self.identity_api.get_user(user['id'])
|
|
|
|
# Test that update user can return a federated object with the user as
|
|
# a response if the user has any
|
|
user = self.identity_api.update_user(user['id'], user)
|
|
self.assertFederatedDictsEqual(self.federated_user,
|
|
user['federated'][0])
|
|
|
|
# Test that update user can replace a users federated objects if added
|
|
# in the request and that its response is that new federated objects
|
|
new_fed = [
|
|
{
|
|
'idp_id': self.idp['id'],
|
|
'protocols': [
|
|
{
|
|
'protocol_id': self.protocol['id'],
|
|
'unique_id': uuid.uuid4().hex
|
|
}
|
|
]
|
|
}
|
|
]
|
|
user['federated'] = new_fed
|
|
user = self.identity_api.update_user(user['id'], user)
|
|
self.assertTrue('federated' in user)
|
|
self.assertTrue(len(user['federated']), 1)
|
|
self.assertEqual(user['federated'][0], new_fed[0])
|