Shadow users - Shadow federated users

"Shadow users: unified identity" implementation:
Federated users have a idp_id, protocol_id, display name,
and a unique ID asserted by the identity provider. These
are the minimal pieces of data required to identify
returning users and provide them with a consistent identity.

Note: the following work items left will be completed in a
separate patch:
* Allow concrete role assignments for federated users
* Shadowing LDAP users

bp shadow-users

Change-Id: Ieb582947038b4a75ef4237939ad8a90079b38aa8
This commit is contained in:
Ronald De Rose 2016-02-11 16:26:48 +00:00 committed by Steve Martinelli
parent 5d6a088455
commit b764a4daa0
15 changed files with 388 additions and 33 deletions

View File

@ -140,7 +140,12 @@ def handle_unscoped_token(context, auth_payload, auth_context,
federation_api, identity_api) federation_api, identity_api)
if is_ephemeral_user(mapped_properties): if is_ephemeral_user(mapped_properties):
user = setup_username(context, mapped_properties) unique_id, display_name = (
get_user_unique_id_and_display_name(context, mapped_properties)
)
user = identity_api.shadow_federated_user(identity_provider,
protocol, unique_id,
display_name)
user_id = user['id'] user_id = user['id']
group_ids = mapped_properties['group_ids'] group_ids = mapped_properties['group_ids']
utils.validate_groups_cardinality(group_ids, mapping_id) utils.validate_groups_cardinality(group_ids, mapping_id)
@ -201,7 +206,7 @@ def apply_mapping_filter(identity_provider, protocol, assertion,
return mapped_properties, mapping_id return mapped_properties, mapping_id
def setup_username(context, mapped_properties): def get_user_unique_id_and_display_name(context, mapped_properties):
"""Setup federated username. """Setup federated username.
Function covers all the cases for properly setting user id, a primary Function covers all the cases for properly setting user id, a primary
@ -223,8 +228,8 @@ def setup_username(context, mapped_properties):
:raises keystone.exception.Unauthorized: If neither `user_name` nor :raises keystone.exception.Unauthorized: If neither `user_name` nor
`user_id` is set. `user_id` is set.
:returns: dictionary with user identification :returns: tuple with user identification
:rtype: dict :rtype: tuple
""" """
user = mapped_properties['user'] user = mapped_properties['user']
@ -245,5 +250,4 @@ def setup_username(context, mapped_properties):
user_id = user_name user_id = user_name
user['id'] = parse.quote(user_id) user['id'] = parse.quote(user_id)
return (user['id'], user['name'])
return user

View File

@ -235,6 +235,12 @@ FILE_OPTIONS = {
'value to False is when configuring a fresh ' 'value to False is when configuring a fresh '
'installation.'), 'installation.'),
], ],
'shadow_users': [
cfg.StrOpt('driver',
default='sql',
help='Entrypoint for the shadow users backend driver '
'in the keystone.identity.shadow_users namespace.'),
],
'trust': [ 'trust': [
cfg.BoolOpt('enabled', default=True, cfg.BoolOpt('enabled', default=True,
help='Delegation and impersonation features can be ' help='Delegation and impersonation features can be '

View File

@ -0,0 +1,43 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import migrate
import sqlalchemy as sql
def upgrade(migrate_engine):
meta = sql.MetaData()
meta.bind = migrate_engine
user_table = sql.Table('user', meta, autoload=True)
idp_table = sql.Table('identity_provider', meta, autoload=True)
protocol_table = sql.Table('federation_protocol', meta, autoload=True)
federated_table = sql.Table(
'federated_user',
meta,
sql.Column('id', sql.Integer, primary_key=True, nullable=False),
sql.Column('user_id', sql.String(64),
sql.ForeignKey(user_table.c.id, ondelete='CASCADE'),
nullable=False),
sql.Column('idp_id', sql.String(64),
sql.ForeignKey(idp_table.c.id, ondelete='CASCADE'),
nullable=False),
sql.Column('protocol_id', sql.String(64), nullable=False),
sql.Column('unique_id', sql.String(255), nullable=False),
sql.Column('display_name', sql.String(255), nullable=True),
sql.UniqueConstraint('idp_id', 'protocol_id', 'unique_id'))
federated_table.create(migrate_engine, checkfirst=True)
migrate.ForeignKeyConstraint(
columns=[federated_table.c.protocol_id, federated_table.c.idp_id],
refcolumns=[protocol_table.c.id, protocol_table.c.idp_id]).create()

View File

@ -12,7 +12,7 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
from sqlalchemy import and_ import sqlalchemy
from sqlalchemy.ext.hybrid import hybrid_property from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy import orm from sqlalchemy import orm
@ -33,14 +33,21 @@ class User(sql.ModelBase, sql.DictBase):
extra = sql.Column(sql.JsonBlob()) extra = sql.Column(sql.JsonBlob())
default_project_id = sql.Column(sql.String(64)) default_project_id = sql.Column(sql.String(64))
local_user = orm.relationship('LocalUser', uselist=False, local_user = orm.relationship('LocalUser', uselist=False,
single_parent=True, single_parent=True, lazy='subquery',
cascade='all,delete-orphan', backref='user') cascade='all,delete-orphan', backref='user')
federated_users = orm.relationship('FederatedUser',
single_parent=True,
lazy='subquery',
cascade='all,delete-orphan',
backref='user')
# name property # name property
@hybrid_property @hybrid_property
def name(self): def name(self):
if self.local_user: if self.local_user:
return self.local_user.name return self.local_user.name
elif self.federated_users:
return self.federated_users[0].display_name
else: else:
return None return None
@ -126,6 +133,26 @@ class Password(sql.ModelBase, sql.DictBase):
password = sql.Column(sql.String(128)) password = sql.Column(sql.String(128))
class FederatedUser(sql.ModelBase, sql.ModelDictMixin):
__tablename__ = 'federated_user'
attributes = ['id', 'user_id', 'idp_id', 'protocol_id', 'unique_id',
'display_name']
id = sql.Column(sql.Integer, primary_key=True)
user_id = sql.Column(sql.String(64), sql.ForeignKey('user.id',
ondelete='CASCADE'))
idp_id = sql.Column(sql.String(64), sql.ForeignKey('identity_provider.id',
ondelete='CASCADE'))
protocol_id = sql.Column(sql.String(64), nullable=False)
unique_id = sql.Column(sql.String(255), nullable=False)
display_name = sql.Column(sql.String(255), nullable=True)
__table_args__ = (
sql.UniqueConstraint('idp_id', 'protocol_id', 'unique_id'),
sqlalchemy.ForeignKeyConstraint(['protocol_id', 'idp_id'],
['federation_protocol.id',
'federation_protocol.idp_id'])
)
class Group(sql.ModelBase, sql.DictBase): class Group(sql.ModelBase, sql.DictBase):
__tablename__ = 'group' __tablename__ = 'group'
attributes = ['id', 'name', 'domain_id', 'description'] attributes = ['id', 'name', 'domain_id', 'description']
@ -216,8 +243,8 @@ class Identity(identity.IdentityDriverV8):
def get_user_by_name(self, user_name, domain_id): def get_user_by_name(self, user_name, domain_id):
with sql.session_for_read() as session: with sql.session_for_read() as session:
query = session.query(User).join(LocalUser) query = session.query(User).join(LocalUser)
query = query.filter(and_(LocalUser.name == user_name, query = query.filter(sqlalchemy.and_(LocalUser.name == user_name,
LocalUser.domain_id == domain_id)) LocalUser.domain_id == domain_id))
try: try:
user_ref = query.one() user_ref = query.one()
except sql.NotFound: except sql.NotFound:

View File

@ -449,7 +449,7 @@ def exception_translated(exception_type):
@notifications.listener @notifications.listener
@dependency.provider('identity_api') @dependency.provider('identity_api')
@dependency.requires('assignment_api', 'credential_api', 'id_mapping_api', @dependency.requires('assignment_api', 'credential_api', 'id_mapping_api',
'resource_api', 'revoke_api') 'resource_api', 'revoke_api', 'shadow_users_api')
class Manager(manager.Manager): class Manager(manager.Manager):
"""Default pivot point for the Identity backend. """Default pivot point for the Identity backend.
@ -1209,6 +1209,35 @@ class Manager(manager.Manager):
update_dict = {'password': new_password} update_dict = {'password': new_password}
self.update_user(user_id, update_dict) self.update_user(user_id, update_dict)
@MEMOIZE
def shadow_federated_user(self, idp_id, protocol_id, unique_id,
display_name):
"""Shadows a federated user by mapping to a user.
:param idp_id: identity provider id
:param protocol_id: protocol id
:param unique_id: unique id for the user within the IdP
:param display_name: user's display name
:returns: dictionary of the mapped User entity
"""
user_dict = {}
try:
user_dict = self.shadow_users_api.get_federated_user(
idp_id, protocol_id, unique_id)
self.update_federated_user_display_name(idp_id, protocol_id,
unique_id, display_name)
except exception.UserNotFound:
federated_dict = {
'idp_id': idp_id,
'protocol_id': protocol_id,
'unique_id': unique_id,
'display_name': display_name
}
user_dict = self.shadow_users_api.create_federated_user(
federated_dict)
return user_dict
@six.add_metaclass(abc.ABCMeta) @six.add_metaclass(abc.ABCMeta)
class IdentityDriverV8(object): class IdentityDriverV8(object):
@ -1517,3 +1546,54 @@ class MappingDriverV8(object):
MappingDriver = manager.create_legacy_driver(MappingDriverV8) MappingDriver = manager.create_legacy_driver(MappingDriverV8)
@dependency.provider('shadow_users_api')
class ShadowUsersManager(manager.Manager):
"""Default pivot point for the Shadow Users backend."""
driver_namespace = 'keystone.identity.shadow_users'
def __init__(self):
super(ShadowUsersManager, self).__init__(CONF.shadow_users.driver)
@six.add_metaclass(abc.ABCMeta)
class ShadowUsersDriverV9(object):
"""Interface description for an Shadow Users driver."""
@abc.abstractmethod
def create_federated_user(self, federated_dict):
"""Create a new user with the federated identity
:param dict federated_dict: Reference to the federated user
:param user_id: user ID for linking to the federated identity
:returns dict: Containing the user reference
"""
raise exception.NotImplemented()
@abc.abstractmethod
def get_federated_user(self, idp_id, protocol_id, unique_id):
"""Returns the found user for the federated identity
:param idp_id: The identity provider ID
:param protocol_id: The federation protocol ID
:param unique_id: The unique ID for the user
:returns dict: Containing the user reference
"""
raise exception.NotImplemented()
@abc.abstractmethod
def update_federated_user_display_name(self, idp_id, protocol_id,
unique_id, display_name):
"""Updates federated user's display name if changed
:param idp_id: The identity provider ID
:param protocol_id: The federation protocol ID
:param unique_id: The unique ID for the user
:param display_name: The user's display name
"""
raise exception.NotImplemented()

View File

@ -0,0 +1,73 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from keystone.common import sql
from keystone import exception
from keystone import identity
from keystone.identity.backends import sql as model
class ShadowUsers(identity.ShadowUsersDriverV9):
@sql.handle_conflicts(conflict_type='federated_user')
def create_federated_user(self, federated_dict):
user = {
'id': uuid.uuid4().hex,
'enabled': True
}
with sql.session_for_write() as session:
federated_ref = model.FederatedUser.from_dict(federated_dict)
user_ref = model.User.from_dict(user)
user_ref.federated_users.append(federated_ref)
session.add(user_ref)
return identity.filter_user(user_ref.to_dict())
def get_federated_user(self, idp_id, protocol_id, unique_id):
user_ref = self._get_federated_user(idp_id, protocol_id, unique_id)
return identity.filter_user(user_ref.to_dict())
def _get_federated_user(self, idp_id, protocol_id, unique_id):
"""Returns the found user for the federated identity
:param idp_id: The identity provider ID
:param protocol_id: The federation protocol ID
:param unique_id: The user's unique ID (unique within the IdP)
:returns User: Returns a reference to the User
"""
with sql.session_for_read() as session:
query = session.query(model.User).outerjoin(model.LocalUser)
query = query.join(model.FederatedUser)
query = query.filter(model.FederatedUser.idp_id == idp_id)
query = query.filter(model.FederatedUser.protocol_id ==
protocol_id)
query = query.filter(model.FederatedUser.unique_id == unique_id)
try:
user_ref = query.one()
except sql.NotFound:
raise exception.UserNotFound(user_id=unique_id)
return user_ref
@sql.handle_conflicts(conflict_type='federated_user')
def update_federated_user_display_name(self, idp_id, protocol_id,
unique_id, display_name):
with sql.session_for_write() as session:
query = session.query(model.FederatedUser)
query = query.filter(model.FederatedUser.idp_id == idp_id)
query = query.filter(model.FederatedUser.protocol_id ==
protocol_id)
query = query.filter(model.FederatedUser.unique_id == unique_id)
query = query.filter(model.FederatedUser.display_name !=
display_name)
query.update({'display_name': display_name})
return

View File

@ -59,6 +59,7 @@ def load_backends():
id_generator_api=identity.generator.Manager(), id_generator_api=identity.generator.Manager(),
id_mapping_api=identity.MappingManager(), id_mapping_api=identity.MappingManager(),
identity_api=_IDENTITY_API, identity_api=_IDENTITY_API,
shadow_users_api=identity.ShadowUsersManager(),
oauth_api=oauth1.Manager(), oauth_api=oauth1.Manager(),
policy_api=policy.Manager(), policy_api=policy.Manager(),
resource_api=resource.Manager(), resource_api=resource.Manager(),

View File

@ -523,8 +523,8 @@ class MappingRuleEngineTests(unit.BaseTestCase):
- Check if the user has proper domain ('federated') set - Check if the user has proper domain ('federated') set
- Check if the user has property type set ('ephemeral') - Check if the user has property type set ('ephemeral')
- Check if user's name is properly mapped from the assertion - Check if user's name is properly mapped from the assertion
- Check if user's id is properly set and equal to name, as it was not - Check if unique_id is properly set and equal to display_name,
explicitly specified in the mapping. as it was not explicitly specified in the mapping.
""" """
mapping = mapping_fixtures.MAPPING_USER_IDS mapping = mapping_fixtures.MAPPING_USER_IDS
@ -533,9 +533,11 @@ class MappingRuleEngineTests(unit.BaseTestCase):
mapped_properties = rp.process(assertion) mapped_properties = rp.process(assertion)
self.assertIsNotNone(mapped_properties) self.assertIsNotNone(mapped_properties)
self.assertValidMappedUserObject(mapped_properties) self.assertValidMappedUserObject(mapped_properties)
mapped.setup_username({}, mapped_properties)
self.assertEqual('jsmith', mapped_properties['user']['id'])
self.assertEqual('jsmith', mapped_properties['user']['name']) self.assertEqual('jsmith', mapped_properties['user']['name'])
unique_id, display_name = mapped.get_user_unique_id_and_display_name(
{}, mapped_properties)
self.assertEqual('jsmith', unique_id)
self.assertEqual('jsmith', display_name)
def test_user_identifications_name_and_federated_domain(self): def test_user_identifications_name_and_federated_domain(self):
"""Test varius mapping options and how users are identified. """Test varius mapping options and how users are identified.
@ -546,8 +548,7 @@ class MappingRuleEngineTests(unit.BaseTestCase):
- Check if the user has proper domain ('federated') set - Check if the user has proper domain ('federated') set
- Check if the user has propert type set ('ephemeral') - Check if the user has propert type set ('ephemeral')
- Check if user's name is properly mapped from the assertion - Check if user's name is properly mapped from the assertion
- Check if user's id is properly set and equal to name, as it was not - Check if the unique_id and display_name are properly set
explicitly specified in the mapping.
""" """
mapping = mapping_fixtures.MAPPING_USER_IDS mapping = mapping_fixtures.MAPPING_USER_IDS
@ -556,10 +557,10 @@ class MappingRuleEngineTests(unit.BaseTestCase):
mapped_properties = rp.process(assertion) mapped_properties = rp.process(assertion)
self.assertIsNotNone(mapped_properties) self.assertIsNotNone(mapped_properties)
self.assertValidMappedUserObject(mapped_properties) self.assertValidMappedUserObject(mapped_properties)
mapped.setup_username({}, mapped_properties) unique_id, display_name = mapped.get_user_unique_id_and_display_name(
self.assertEqual('tbo', mapped_properties['user']['name']) {}, mapped_properties)
self.assertEqual('abc123%40example.com', self.assertEqual('tbo', display_name)
mapped_properties['user']['id']) self.assertEqual('abc123%40example.com', unique_id)
def test_user_identification_id(self): def test_user_identification_id(self):
"""Test varius mapping options and how users are identified. """Test varius mapping options and how users are identified.
@ -569,9 +570,8 @@ class MappingRuleEngineTests(unit.BaseTestCase):
Test plan: Test plan:
- Check if the user has proper domain ('federated') set - Check if the user has proper domain ('federated') set
- Check if the user has propert type set ('ephemeral') - Check if the user has propert type set ('ephemeral')
- Check if user's id is properly mapped from the assertion - Check if user's display_name is properly set and equal to unique_id,
- Check if user's name is properly set and equal to id, as it was not as it was not explicitly specified in the mapping.
explicitly specified in the mapping.
""" """
mapping = mapping_fixtures.MAPPING_USER_IDS mapping = mapping_fixtures.MAPPING_USER_IDS
@ -581,9 +581,10 @@ class MappingRuleEngineTests(unit.BaseTestCase):
context = {'environment': {}} context = {'environment': {}}
self.assertIsNotNone(mapped_properties) self.assertIsNotNone(mapped_properties)
self.assertValidMappedUserObject(mapped_properties) self.assertValidMappedUserObject(mapped_properties)
mapped.setup_username(context, mapped_properties) unique_id, display_name = mapped.get_user_unique_id_and_display_name(
self.assertEqual('bob', mapped_properties['user']['name']) context, mapped_properties)
self.assertEqual('bob', mapped_properties['user']['id']) self.assertEqual('bob', unique_id)
self.assertEqual('bob', display_name)
def test_user_identification_id_and_name(self): def test_user_identification_id_and_name(self):
"""Test varius mapping options and how users are identified. """Test varius mapping options and how users are identified.
@ -593,8 +594,8 @@ class MappingRuleEngineTests(unit.BaseTestCase):
Test plan: Test plan:
- Check if the user has proper domain ('federated') set - Check if the user has proper domain ('federated') set
- Check if the user has proper type set ('ephemeral') - Check if the user has proper type set ('ephemeral')
- Check if user's name is properly mapped from the assertion - Check if display_name is properly set from the assertion
- Check if user's id is properly set and and equal to value hardcoded - Check if unique_id is properly set and and equal to value hardcoded
in the mapping in the mapping
This test does two iterations with different assertions used as input This test does two iterations with different assertions used as input
@ -615,10 +616,12 @@ class MappingRuleEngineTests(unit.BaseTestCase):
context = {'environment': {}} context = {'environment': {}}
self.assertIsNotNone(mapped_properties) self.assertIsNotNone(mapped_properties)
self.assertValidMappedUserObject(mapped_properties) self.assertValidMappedUserObject(mapped_properties)
mapped.setup_username(context, mapped_properties) unique_id, display_name = (
self.assertEqual(exp_user_name, mapped_properties['user']['name']) mapped.get_user_unique_id_and_display_name(context,
self.assertEqual('abc123%40example.com', mapped_properties)
mapped_properties['user']['id']) )
self.assertEqual(exp_user_name, display_name)
self.assertEqual('abc123%40example.com', unique_id)
def test_whitelist_pass_through(self): def test_whitelist_pass_through(self):
mapping = mapping_fixtures.MAPPING_GROUPS_WHITELIST_PASS_THROUGH mapping = mapping_fixtures.MAPPING_GROUPS_WHITELIST_PASS_THROUGH

View File

@ -342,6 +342,17 @@ def new_user_ref(domain_id, project_id=None, **kwargs):
return ref return ref
def new_federated_user_ref(idp_id=None, protocol_id=None, **kwargs):
ref = {
'idp_id': idp_id or 'ORG_IDP',
'protocol_id': protocol_id or 'saml2',
'unique_id': uuid.uuid4().hex,
'display_name': uuid.uuid4().hex,
}
ref.update(kwargs)
return ref
def new_group_ref(domain_id, **kwargs): def new_group_ref(domain_id, **kwargs):
ref = { ref = {
'id': uuid.uuid4().hex, 'id': uuid.uuid4().hex,

View File

@ -142,6 +142,15 @@ class SqlModels(SqlTests):
('password', sql.String, 128)) ('password', sql.String, 128))
self.assertExpectedSchema('password', cols) self.assertExpectedSchema('password', cols)
def test_federated_user_model(self):
cols = (('id', sql.Integer, None),
('user_id', sql.String, 64),
('idp_id', sql.String, 64),
('protocol_id', sql.String, 64),
('unique_id', sql.String, 255),
('display_name', sql.String, 255))
self.assertExpectedSchema('federated_user', cols)
def test_group_model(self): def test_group_model(self):
cols = (('id', sql.String, 64), cols = (('id', sql.String, 64),
('name', sql.String, 64), ('name', sql.String, 64),
@ -248,6 +257,44 @@ class SqlIdentity(SqlTests, test_backend.IdentityTests):
ref['name'] = ref['name'].upper() ref['name'] = ref['name'].upper()
self.identity_api.create_user(ref) self.identity_api.create_user(ref)
def test_create_federated_user_unique_constraint(self):
federated_dict = unit.new_federated_user_ref()
user_dict = self.shadow_users_api.create_federated_user(federated_dict)
user_dict = self.identity_api.get_user(user_dict["id"])
self.assertIsNotNone(user_dict["id"])
self.assertRaises(exception.Conflict,
self.shadow_users_api.create_federated_user,
federated_dict)
def test_get_federated_user(self):
federated_dict = unit.new_federated_user_ref()
user_dict_create = self.shadow_users_api.create_federated_user(
federated_dict)
user_dict_get = self.shadow_users_api.get_federated_user(
federated_dict["idp_id"],
federated_dict["protocol_id"],
federated_dict["unique_id"])
self.assertItemsEqual(user_dict_create, user_dict_get)
self.assertEqual(user_dict_create["id"], user_dict_get["id"])
def test_update_federated_user_display_name(self):
federated_dict = unit.new_federated_user_ref()
user_dict_create = self.shadow_users_api.create_federated_user(
federated_dict)
new_display_name = uuid.uuid4().hex
self.shadow_users_api.update_federated_user_display_name(
federated_dict["idp_id"],
federated_dict["protocol_id"],
federated_dict["unique_id"],
new_display_name)
user_ref = self.shadow_users_api._get_federated_user(
federated_dict["idp_id"],
federated_dict["protocol_id"],
federated_dict["unique_id"])
self.assertEqual(user_ref.federated_users[0].display_name,
new_display_name)
self.assertEqual(user_dict_create["id"], user_ref.id)
def test_create_project_case_sensitivity(self): def test_create_project_case_sensitivity(self):
# project name case sensitivity is down to the fact that it is marked # project name case sensitivity is down to the fact that it is marked
# as an SQL UNIQUE column, which may not be valid for other backends, # as an SQL UNIQUE column, which may not be valid for other backends,

View File

@ -925,6 +925,19 @@ class SqlUpgradeTests(SqlMigrateBase):
projects = session.query(proj_table) projects = session.query(proj_table)
_check_projects(projects) _check_projects(projects)
def test_add_federated_user_table(self):
federated_user_table = 'federated_user'
self.upgrade(93)
self.assertTableDoesNotExist(federated_user_table)
self.upgrade(94)
self.assertTableColumns(federated_user_table,
['id',
'user_id',
'idp_id',
'protocol_id',
'unique_id',
'display_name'])
class VersionTests(SqlMigrateBase): class VersionTests(SqlMigrateBase):

View File

@ -2548,6 +2548,34 @@ class FederatedTokenTestsMethodToken(FederatedTokenTests):
self._check_project_scoped_token_attributes(token_resp, project['id']) self._check_project_scoped_token_attributes(token_resp, project['id'])
class FederatedUserTests(test_v3.RestfulTestCase, FederatedSetupMixin):
"""Tests for federated users
Tests new shadow users functionality
"""
def auth_plugin_config_override(self):
methods = ['saml2']
super(FederatedUserTests, self).auth_plugin_config_override(methods)
def setUp(self):
super(FederatedUserTests, self).setUp()
def load_fixtures(self, fixtures):
super(FederatedUserTests, self).load_fixtures(fixtures)
self.load_federation_sample_data()
def test_user_id_persistense(self):
"""Ensure user_id is persistend for multiple federated authn calls."""
r = self._issue_unscoped_token()
user_id = r.json_body['token']['user']['id']
r = self._issue_unscoped_token()
user_id2 = r.json_body['token']['user']['id']
self.assertEqual(user_id, user_id2)
class JsonHomeTests(test_v3.RestfulTestCase, test_v3.JsonHomeTestMixin): class JsonHomeTests(test_v3.RestfulTestCase, test_v3.JsonHomeTestMixin):
JSON_HOME_DATA = { JSON_HOME_DATA = {
'http://docs.openstack.org/api/openstack-identity/3/ext/OS-FEDERATION/' 'http://docs.openstack.org/api/openstack-identity/3/ext/OS-FEDERATION/'

View File

@ -483,6 +483,22 @@ class IdentityTestCase(test_v3.RestfulTestCase):
r = self.credential_api.get_credential(credential2['id']) r = self.credential_api.get_credential(credential2['id'])
self.assertDictEqual(credential2, r) self.assertDictEqual(credential2, r)
# shadow user tests
def test_shadow_federated_user(self):
fed_user = unit.new_federated_user_ref()
user = (
self.identity_api.shadow_federated_user(fed_user["idp_id"],
fed_user["protocol_id"],
fed_user["unique_id"],
fed_user["display_name"])
)
self.assertIsNotNone(user["id"])
self.assertEqual(len(user.keys()), 4)
self.assertIsNotNone(user['id'])
self.assertIsNotNone(user['name'])
self.assertIsNone(user['domain_id'])
self.assertEqual(user['enabled'], True)
# group crud tests # group crud tests
def test_create_group(self): def test_create_group(self):

View File

@ -125,6 +125,9 @@ keystone.identity.id_generator =
keystone.identity.id_mapping = keystone.identity.id_mapping =
sql = keystone.identity.mapping_backends.sql:Mapping sql = keystone.identity.mapping_backends.sql:Mapping
keystone.identity.shadow_users =
sql = keystone.identity.shadow_backends.sql:ShadowUsers
keystone.policy = keystone.policy =
rules = keystone.policy.backends.rules:Policy rules = keystone.policy.backends.rules:Policy
sql = keystone.policy.backends.sql:Policy sql = keystone.policy.backends.sql:Policy