Federated authentication via ECP functional tests

Adds a first test for the federated authentication feature. It handles
first the authentication using the SAML2 ECP profile.

The tests cleanup have some issues, see related bug.

Related-Bug: 1642692
Change-Id: I3b393a695c6d9f846efdaf302c1beea34e6bd54b
This commit is contained in:
Rodrigo Duarte 2016-06-02 16:08:39 -03:00 committed by Rodrigo Duarte Sousa
parent 79197b5a45
commit e508fe0238
10 changed files with 371 additions and 5 deletions

View File

@ -18,6 +18,8 @@ from keystone_tempest_plugin.services.identity.v3 import (
mapping_rules_client)
from keystone_tempest_plugin.services.identity.v3 import (
service_providers_client)
from keystone_tempest_plugin.services.identity.v3 import auth_client
from keystone_tempest_plugin.services.identity.v3 import saml2_client
from tempest import clients
@ -27,12 +29,14 @@ class Manager(clients.Manager):
def __init__(self, credentials, service=None):
super(Manager, self).__init__(credentials, service)
self.auth_client = auth_client.AuthClient(self.auth_provider)
self.identity_providers_client = (
identity_providers_client.IdentityProvidersClient(
self.auth_provider))
self.mapping_rules_client = (
mapping_rules_client.MappingRulesClient(
self.auth_provider))
self.saml2_client = saml2_client.Saml2Client()
self.service_providers_client = (
service_providers_client.ServiceProvidersClient(
self.auth_provider))

View File

@ -24,4 +24,47 @@ IdentityGroup = []
identity_feature_group = cfg.OptGroup(name='identity-feature-enabled',
title='Enabled Identity Features')
IdentityFeatureGroup = []
IdentityFeatureGroup = [
cfg.BoolOpt('federation',
default=False,
help='Does the environment support the Federated Identity '
'feature?'),
]
fed_scenario_group = cfg.OptGroup(name='fed_scenario',
title='Federation Scenario Tests Options')
FedScenarioGroup = [
# Identity Provider
cfg.StrOpt('idp_id',
help='The Identity Provider ID'),
cfg.ListOpt('idp_remote_ids',
default=[],
help='The Identity Provider remote IDs list'),
cfg.StrOpt('idp_username',
help='Username used to login in the Identity Provider'),
cfg.StrOpt('idp_password',
help='Password used to login in the Identity Provider'),
cfg.StrOpt('idp_ecp_url',
help='Identity Provider SAML2/ECP URL'),
# Mapping rules
cfg.StrOpt('mapping_remote_type',
help='The assertion attribute to be used in the remote rules'),
cfg.StrOpt('mapping_user_name',
default='{0}',
help='The username to be used in the local rules.'),
cfg.StrOpt('mapping_group_name',
default='federated_users',
help='The group name to be used in the local rules. The group '
'must have at least one assignment in one project.'),
cfg.StrOpt('mapping_group_domain_name',
default='federated_domain',
help='The domain name where the "mapping_group_name" is '
'created.'),
# Protocol
cfg.StrOpt('protocol_id',
default='mapped',
help='The Protocol ID')
]

View File

@ -33,7 +33,15 @@ class KeystoneTempestPlugin(plugins.TempestPlugin):
def register_opts(self, conf):
config.register_opt_group(conf, project_config.identity_group,
project_config.IdentityGroup)
config.register_opt_group(conf, project_config.identity_feature_group,
project_config.IdentityFeatureGroup)
config.register_opt_group(conf, project_config.fed_scenario_group,
project_config.FedScenarioGroup)
def get_opt_lists(self):
return [(project_config.identity_group.name,
project_config.IdentityGroup)]
project_config.IdentityGroup),
(project_config.identity_feature_group.name,
project_config.IdentityFeatureGroup),
(project_config.fed_scenario_group.name,
project_config.FedScenarioGroup)]

View File

@ -0,0 +1,39 @@
# Copyright 2016 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
from tempest.lib.common import rest_client
from keystone_tempest_plugin.services.identity import clients
class AuthClient(clients.Identity):
def _get_scopes(self, url, token_id):
resp, body = self.raw_request(
url, 'GET', headers={'X-Auth-Token': token_id})
self.expected_success(200, resp.status)
body = json.loads(body)
return rest_client.ResponseBody(resp, body)
def get_available_projects_scopes(self, keystone_v3_endpoint, token_id):
"""Get projects that are available to be scoped to based on a token."""
url = '%s/auth/projects' % keystone_v3_endpoint
return self._get_scopes(url, token_id)
def get_available_domains_scopes(self, keystone_v3_endpoint, token_id):
"""Get domains that are available to be scoped to based on a token."""
url = '%s/auth/domains' % keystone_v3_endpoint
return self._get_scopes(url, token_id)

View File

@ -0,0 +1,92 @@
# Copyright 2016 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from lxml import etree
import requests
class Saml2Client(object):
ECP_SP_EMPTY_REQUEST_HEADERS = {
'Accept': 'text/html, application/vnd.paos+xml',
'PAOS': ('ver="urn:liberty:paos:2003-08";"urn:oasis:names:tc:'
'SAML:2.0:profiles:SSO:ecp"')
}
ECP_SP_SAML2_REQUEST_HEADERS = {'Content-Type': 'application/vnd.paos+xml'}
def __init__(self):
self.reset_session()
def reset_session(self):
self.session = requests.Session()
def _idp_auth_url(self, keystone_v3_endpoint, idp_id, protocol_id):
subpath = 'OS-FEDERATION/identity_providers/%s/protocols/%s/auth' % (
idp_id, protocol_id)
return '%s/%s' % (keystone_v3_endpoint, subpath)
def send_service_provider_request(self, keystone_v3_endpoint,
idp_id, protocol_id):
return self.session.get(
self._idp_auth_url(keystone_v3_endpoint, idp_id, protocol_id),
headers=self.ECP_SP_EMPTY_REQUEST_HEADERS
)
def _prepare_sp_saml2_authn_response(self, saml2_idp_authn_response,
relay_state):
# Replace the header contents of the Identity Provider response with
# the relay state initially sent by the Service Provider. The response
# is a SOAP envelope with the following structure:
#
# <S:Envelope
# <S:Header>
# ...
# </S:Header>
# <S:Body>
# ...
# </S:Body>
# </S:Envelope>
saml2_idp_authn_response[0][0] = relay_state
def send_identity_provider_authn_request(self, saml2_authn_request,
idp_url, username, password):
saml2_authn_request.remove(saml2_authn_request[0])
return self.session.post(
idp_url,
headers={'Content-Type': 'text/xml'},
data=etree.tostring(saml2_authn_request),
auth=(username, password)
)
def send_service_provider_saml2_authn_response(
self, saml2_idp_authn_response, relay_state, idp_consumer_url):
self._prepare_sp_saml2_authn_response(
saml2_idp_authn_response, relay_state)
return self.session.post(
idp_consumer_url,
headers=self.ECP_SP_SAML2_REQUEST_HEADERS,
data=etree.tostring(saml2_idp_authn_response),
# Do not follow HTTP redirect
allow_redirects=False
)
def send_service_provider_unscoped_token_request(self, sp_url):
return self.session.get(
sp_url,
headers=self.ECP_SP_SAML2_REQUEST_HEADERS
)

View File

@ -16,8 +16,8 @@ from tempest.lib.common.utils import data_utils
from tempest.lib import decorators
from tempest.lib import exceptions as lib_exc
from keystone_tempest_plugin.tests.api.identity import base
from keystone_tempest_plugin.tests.api.identity.v3 import fixtures
from keystone_tempest_plugin.tests import base
class IndentityProvidersTest(base.BaseIdentityTest):

View File

@ -17,8 +17,8 @@ from tempest.lib import decorators
from tempest.lib import exceptions as lib_exc
from tempest import test
from keystone_tempest_plugin.tests.api.identity import base
from keystone_tempest_plugin.tests.api.identity.v3 import fixtures
from keystone_tempest_plugin.tests import base
class MappingRulesTest(base.BaseIdentityTest):

View File

@ -17,8 +17,8 @@ from tempest.lib import decorators
from tempest.lib import exceptions as lib_exc
from tempest import test
from keystone_tempest_plugin.tests.api.identity import base
from keystone_tempest_plugin.tests.api.identity.v3 import fixtures
from keystone_tempest_plugin.tests import base
DEFAULT_RELAY_STATE_PREFIX = 'ss:mem:'

View File

@ -33,6 +33,9 @@ class BaseIdentityTest(test.BaseTestCase):
credentials = common_creds.get_configured_admin_credentials(
cls.credential_type, identity_version=cls.identity_version)
cls.keystone_manager = clients.Manager(credentials=credentials)
cls.auth_client = cls.keystone_manager.auth_client
cls.idps_client = cls.keystone_manager.identity_providers_client
cls.mappings_client = cls.keystone_manager.mapping_rules_client
cls.saml2_client = cls.keystone_manager.saml2_client
cls.sps_client = cls.keystone_manager.service_providers_client
cls.tokens_client = cls.keystone_manager.token_v3_client

View File

@ -0,0 +1,177 @@
# Copyright 2016 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from lxml import etree
from six.moves import http_client
from tempest import config
from tempest.lib.common.utils import data_utils
from testtools import skipUnless
from keystone_tempest_plugin.tests import base
CONF = config.CONF
class TestSaml2EcpFederatedAuthentication(base.BaseIdentityTest):
ECP_SAML2_NAMESPACES = {
'ecp': 'urn:oasis:names:tc:SAML:2.0:profiles:SSO:ecp',
'S': 'http://schemas.xmlsoap.org/soap/envelope/',
'paos': 'urn:liberty:paos:2003-08'
}
ECP_SERVICE_PROVIDER_CONSUMER_URL = ('/S:Envelope/S:Header/paos:Request/'
'@responseConsumerURL')
ECP_IDP_CONSUMER_URL = ('/S:Envelope/S:Header/ecp:Response/'
'@AssertionConsumerServiceURL')
ECP_RELAY_STATE = '//ecp:RelayState'
def _setup_settings(self):
self.idp_id = CONF.fed_scenario.idp_id
self.idp_url = CONF.fed_scenario.idp_ecp_url
self.keystone_v3_endpoint = CONF.identity.uri_v3
self.password = CONF.fed_scenario.idp_password
self.protocol_id = CONF.fed_scenario.protocol_id
self.username = CONF.fed_scenario.idp_username
def _setup_idp(self):
remote_ids = CONF.fed_scenario.idp_remote_ids
self.idps_client.create_identity_provider(
self.idp_id, remote_ids=remote_ids, enabled=True)
self.addCleanup(
self.idps_client.delete_identity_provider, self.idp_id)
def _setup_mapping(self):
self.mapping_id = data_utils.rand_uuid_hex()
mapping_remote_type = CONF.fed_scenario.mapping_remote_type
mapping_user_name = CONF.fed_scenario.mapping_user_name
mapping_group_name = CONF.fed_scenario.mapping_group_name
mapping_group_domain_name = CONF.fed_scenario.mapping_group_domain_name
rules = [{
'local': [
{
'user': {'name': mapping_user_name}
},
{
'group': {
'domain': {'name': mapping_group_domain_name},
'name': mapping_group_name
}
}
],
'remote': [
{
'type': mapping_remote_type
}
]
}]
mapping_ref = {'rules': rules}
self.mappings_client.create_mapping_rule(self.mapping_id, mapping_ref)
self.addCleanup(
self.mappings_client.delete_mapping_rule, self.mapping_id)
def _setup_protocol(self):
self.idps_client.add_protocol_and_mapping(
self.idp_id, self.protocol_id, self.mapping_id)
self.addCleanup(
self.idps_client.delete_protocol_and_mapping,
self.idp_id,
self.protocol_id)
def setUp(self):
super(TestSaml2EcpFederatedAuthentication, self).setUp()
self._setup_settings()
# Reset client's session to avoid getting garbage from another runs
self.saml2_client.reset_session()
# Setup identity provider, mapping and protocol
self._setup_idp()
self._setup_mapping()
self._setup_protocol()
def _str_from_xml(self, xml, path):
l = xml.xpath(path, namespaces=self.ECP_SAML2_NAMESPACES)
self.assertEqual(1, len(l))
return l[0]
def _request_unscoped_token(self):
resp = self.saml2_client.send_service_provider_request(
self.keystone_v3_endpoint, self.idp_id, self.protocol_id)
self.assertEqual(http_client.OK, resp.status_code)
saml2_authn_request = etree.XML(resp.content)
relay_state = self._str_from_xml(
saml2_authn_request, self.ECP_RELAY_STATE)
sp_consumer_url = self._str_from_xml(
saml2_authn_request, self.ECP_SERVICE_PROVIDER_CONSUMER_URL)
# Perform the authn request to the identity provider
resp = self.saml2_client.send_identity_provider_authn_request(
saml2_authn_request, self.idp_url, self.username, self.password)
self.assertEqual(http_client.OK, resp.status_code)
saml2_idp_authn_response = etree.XML(resp.content)
idp_consumer_url = self._str_from_xml(
saml2_idp_authn_response, self.ECP_IDP_CONSUMER_URL)
# Assert that both saml2_authn_request and saml2_idp_authn_response
# have the same consumer URL.
self.assertEqual(sp_consumer_url, idp_consumer_url)
# Present the identity provider authn response to the service provider.
resp = self.saml2_client.send_service_provider_saml2_authn_response(
saml2_idp_authn_response, relay_state, idp_consumer_url)
# Must receive a redirect from service provider to the URL where the
# unscoped token can be retrieved.
self.assertIn(resp.status_code,
[http_client.FOUND, http_client.SEE_OTHER])
# We can receive multiple types of errors here, the response depends on
# the mapping and the username used to authenticate in the Identity
# Provider and also in the Identity Provider remote ID validation.
# If everything works well, we receive an unscoped token.
sp_url = resp.headers['location']
resp = (
self.saml2_client.send_service_provider_unscoped_token_request(
sp_url))
self.assertEqual(http_client.CREATED, resp.status_code)
self.assertIn('X-Subject-Token', resp.headers)
self.assertNotEmpty(resp.json())
return resp
@skipUnless(CONF.identity_feature_enabled.federation,
"Federated Identity feature not enabled")
def test_request_unscoped_token(self):
self._request_unscoped_token()
@skipUnless(CONF.identity_feature_enabled.federation,
"Federated Identity feature not enabled")
def test_request_scoped_token(self):
resp = self._request_unscoped_token()
token_id = resp.headers['X-Subject-Token']
projects = self.auth_client.get_available_projects_scopes(
self.keystone_v3_endpoint, token_id)['projects']
self.assertNotEmpty(projects)
# Get a scoped token to one of the listed projects
self.tokens_client.auth(
project_id=projects[0]['id'], token=token_id)