Use SAML2 requests plugin

This is a major refactoring of the SAML2 plugin to move the logic into a
standalone requests auth plugin, and then have the keystoneauth plugin
simply provide a wrapper around that.

There was really no way to migrate this and keep the existing test files
as they were because the entire structure has been changed.

This will be the recommended way to do federation plugins in future and
keep the auth logic out of keystoneauth as much as possible (as kerberos
already does).

The intention will be that later we should be able to extract the SAML
ECP requests plugin into it's own upstream module.

Change-Id: I4a7377b9350741e8f7a4ed2a49a7e2442eacdd23
This commit is contained in:
Jamie Lennox 2015-11-09 19:17:28 +11:00 committed by Steve Martinelli
parent 76bd9bb00f
commit 701b911437
8 changed files with 501 additions and 471 deletions

View File

@ -12,6 +12,8 @@
from keystoneauth1.extras._saml2 import v3
_V3_SAML2_AVAILABLE = v3._SAML2_AVAILABLE
V3Saml2Password = v3.Saml2Password
V3ADFSPassword = v3.ADFSPassword

View File

@ -20,6 +20,10 @@ class Saml2Password(loading.BaseFederationLoader):
def plugin_class(self):
return _saml2.V3Saml2Password
@property
def available(self):
return _saml2._V3_SAML2_AVAILABLE
def get_options(self):
options = super(Saml2Password, self).get_options()

View File

@ -13,6 +13,8 @@
from keystoneauth1.extras._saml2.v3 import adfs
from keystoneauth1.extras._saml2.v3 import saml2
_SAML2_AVAILABLE = saml2.etree is not None
Saml2Password = saml2.Password
ADFSPassword = adfs.Password

View File

@ -10,14 +10,240 @@
# License for the specific language governing permissions and limitations
# under the License.
from lxml import etree
import abc
try:
from lxml import etree
except ImportError:
etree = None
import requests
import requests.auth
from keystoneauth1 import access
from keystoneauth1 import exceptions
from keystoneauth1.extras._saml2.v3 import base
from keystoneauth1.identity import v3
_PAOS_NAMESPACE = 'urn:liberty:paos:2003-08'
_ECP_NAMESPACE = 'urn:oasis:names:tc:SAML:2.0:profiles:SSO:ecp'
_PAOS_HEADER = 'application/vnd.paos+xml'
_PAOS_VER = 'ver="%s";"%s"' % (_PAOS_NAMESPACE, _ECP_NAMESPACE)
_XML_NAMESPACES = {
'ecp': _ECP_NAMESPACE,
'S': 'http://schemas.xmlsoap.org/soap/envelope/',
'paos': _PAOS_NAMESPACE,
}
_XBASE = '/S:Envelope/S:Header/'
_XPATH_SP_RELAY_STATE = '//ecp:RelayState'
_XPATH_SP_CONSUMER_URL = _XBASE + 'paos:Request/@responseConsumerURL'
_XPATH_IDP_CONSUMER_URL = _XBASE + 'ecp:Response/@AssertionConsumerServiceURL'
_SOAP_FAULT = """
<S:Envelope xmlns:S="http://schemas.xmlsoap.org/soap/envelope/">
<S:Body>
<S:Fault>
<faultcode>S:Server</faultcode>
<faultstring>responseConsumerURL from SP and
assertionConsumerServiceURL from IdP do not match
</faultstring>
</S:Fault>
</S:Body>
</S:Envelope>
"""
class Password(base.BaseSAMLPlugin):
class SamlException(Exception):
"""Base SAML plugin exception."""
class InvalidResponse(SamlException):
"""Invalid Response from SAML authentication."""
class ConsumerMismatch(SamlException):
"""The SP and IDP consumers do not match."""
def _response_xml(response, name):
try:
return etree.XML(response.content)
except etree.XMLSyntaxError as e:
msg = 'SAML2: Error parsing XML returned from %s: %s' % (name, e)
raise InvalidResponse(msg)
def _str_from_xml(xml, path):
l = xml.xpath(path, namespaces=_XML_NAMESPACES)
if len(l) != 1:
raise IndexError('%s should provide a single element list' % path)
return l[0]
class _SamlAuth(requests.auth.AuthBase):
"""A generic SAML ECP plugin for requests.
This is a multi-step process including multiple HTTP requests.
Authentication consists of:
* HTTP GET request to the Service Provider.
It's crucial to include HTTP headers indicating we are expecting SOAP
message in return. Service Provider should respond with a SOAP
message.
* HTTP POST request to the external Identity Provider service with
ECP extension enabled. The content sent is a header removed SOAP
message returned from the Service Provider. It's also worth noting
that ECP extension to the SAML2 doesn't define authentication method.
The most popular is HttpBasicAuth with just user and password.
Other possibilities could be X509 certificates or Kerberos.
Upon successful authentication the user should receive a SAML2
assertion.
* HTTP POST request again to the Service Provider. The body of the
request includes SAML2 assertion issued by a trusted Identity
Provider. The request should be sent to the Service Provider
consumer url specified in the SAML2 assertion.
Providing the authentication was successful and both Service Provider
and Identity Providers are trusted to each other, the Service
Provider will issue an unscoped token with a list of groups the
federated user is a member of.
"""
def __init__(self, identity_provider_url, requests_auth):
super(_SamlAuth, self).__init__()
self.identity_provider_url = identity_provider_url
self.requests_auth = requests_auth
def __call__(self, request):
try:
accept = request.headers['Accept']
except KeyError:
request.headers['Accept'] = _PAOS_HEADER
else:
request.headers['Accept'] = ';'.join([accept, _PAOS_HEADER])
request.headers['PAOS'] = _PAOS_VER
request.register_hook('response', self._handle_response)
return request
def _handle_response(self, response, **kwargs):
if (response.status_code == 200 and
response.headers.get('Content-Type') == _PAOS_HEADER):
response = self._ecp_retry(response, **kwargs)
return response
def _ecp_retry(self, sp_response, **kwargs):
history = [sp_response]
def send(*send_args, **send_kwargs):
req = requests.Request(*send_args, **send_kwargs)
return sp_response.connection.send(req.prepare(), **kwargs)
authn_request = _response_xml(sp_response, 'Service Provider')
relay_state = _str_from_xml(authn_request, _XPATH_SP_RELAY_STATE)
sp_consumer_url = _str_from_xml(authn_request, _XPATH_SP_CONSUMER_URL)
authn_request.remove(authn_request[0])
idp_response = send('POST',
self.identity_provider_url,
headers={'Content-type': 'text/xml'},
data=etree.tostring(authn_request),
auth=self.requests_auth)
history.append(idp_response)
authn_response = _response_xml(idp_response, 'Identity Provider')
idp_consumer_url = _str_from_xml(authn_response,
_XPATH_IDP_CONSUMER_URL)
if sp_consumer_url != idp_consumer_url:
# send fault message to the SP, discard the response
send('POST',
sp_consumer_url,
data=_SOAP_FAULT,
headers={'Content-Type': _PAOS_HEADER})
# prepare error message and raise an exception.
msg = ('Consumer URLs from Service Provider %(service_provider)s '
'%(sp_consumer_url)s and Identity Provider '
'%(identity_provider)s %(idp_consumer_url)s are not equal')
msg = msg % {
'service_provider': sp_response.request.url,
'sp_consumer_url': sp_consumer_url,
'identity_provider': self.identity_provider_url,
'idp_consumer_url': idp_consumer_url
}
raise ConsumerMismatch(msg)
authn_response[0][0] = relay_state
# idp_consumer_url is the URL on the SP that handles the ECP body
# returned and creates an authenticated session.
final_resp = send('POST',
idp_consumer_url,
headers={'Content-Type': _PAOS_HEADER},
cookies=idp_response.cookies,
data=etree.tostring(authn_response))
history.append(final_resp)
# the SP should then redirect us back to the original URL to retry the
# original request.
if final_resp.status_code in (requests.codes.found,
requests.codes.other):
# Consume content and release the original connection
# to allow our new request to reuse the same one.
sp_response.content
sp_response.raw.release_conn()
req = sp_response.request.copy()
req.url = final_resp.headers['location']
req.prepare_cookies(final_resp.cookies)
final_resp = sp_response.connection.send(req, **kwargs)
history.append(final_resp)
final_resp.history.extend(history)
return final_resp
class _FederatedSaml(v3.FederationBaseAuth):
def __init__(self, auth_url, identity_provider, protocol,
identity_provider_url, **kwargs):
super(_FederatedSaml, self).__init__(auth_url,
identity_provider,
protocol,
**kwargs)
self.identity_provider_url = identity_provider_url
@abc.abstractmethod
def get_requests_auth(self):
raise NotImplemented()
def get_unscoped_auth_ref(self, session, **kwargs):
method = self.get_requests_auth()
auth = _SamlAuth(self.identity_provider_url, method)
try:
resp = session.get(self.federated_token_url,
requests_auth=auth,
authenticated=False)
except SamlException as e:
raise exceptions.AuthorizationFailure(str(e))
return access.create(resp=resp)
class Password(_FederatedSaml):
r"""Implement authentication plugin for SAML2 protocol.
ECP stands for `Enhanced Client or Proxy` and is a SAML2 extension
@ -52,7 +278,6 @@ class Password(base.BaseSAMLPlugin):
:param password: User's password
:type password: string
:param protocol: Protocol to be used for the authentication.
The name must be equal to one configured at the
keystone sp side. This value is used for building
@ -62,274 +287,15 @@ class Password(base.BaseSAMLPlugin):
"""
SAML2_HEADER_INDEX = 0
ECP_SP_EMPTY_REQUEST_HEADERS = {
'Accept': 'text/html, application/vnd.paos+xml',
'PAOS': ('ver="urn:liberty:paos:2003-08";"urn:oasis:names:tc:'
'SAML:2.0:profiles:SSO:ecp"')
}
def __init__(self, auth_url, identity_provider, protocol,
identity_provider_url, username, password, **kwargs):
super(Password, self).__init__(auth_url,
identity_provider,
protocol,
identity_provider_url,
**kwargs)
self.username = username
self.password = password
ECP_SP_SAML2_REQUEST_HEADERS = {
'Content-Type': 'application/vnd.paos+xml'
}
ECP_SAML2_NAMESPACES = {
'ecp': 'urn:oasis:names:tc:SAML:2.0:profiles:SSO:ecp',
'S': 'http://schemas.xmlsoap.org/soap/envelope/',
'paos': 'urn:liberty:paos:2003-08'
}
ECP_RELAY_STATE = '//ecp:RelayState'
ECP_SERVICE_PROVIDER_CONSUMER_URL = ('/S:Envelope/S:Header/paos:Request/'
'@responseConsumerURL')
ECP_IDP_CONSUMER_URL = ('/S:Envelope/S:Header/ecp:Response/'
'@AssertionConsumerServiceURL')
SOAP_FAULT = """
<S:Envelope xmlns:S="http://schemas.xmlsoap.org/soap/envelope/">
<S:Body>
<S:Fault>
<faultcode>S:Server</faultcode>
<faultstring>responseConsumerURL from SP and
assertionConsumerServiceURL from IdP do not match
</faultstring>
</S:Fault>
</S:Body>
</S:Envelope>
"""
def _handle_http_ecp_redirect(self, session, response, method, **kwargs):
if response.status_code not in (self.HTTP_MOVED_TEMPORARILY,
self.HTTP_SEE_OTHER):
return response
location = response.headers['location']
return session.request(location, method, authenticated=False,
**kwargs)
def _prepare_idp_saml2_request(self, saml2_authn_request):
header = saml2_authn_request[self.SAML2_HEADER_INDEX]
saml2_authn_request.remove(header)
def _check_consumer_urls(self, session, sp_response_consumer_url,
idp_sp_response_consumer_url):
"""Check if consumer URLs issued by SP and IdP are equal.
In the initial SAML2 authn Request issued by a Service Provider
there is a url called ``consumer url``. A trusted Identity Provider
should issue identical url. If the URLs are not equal the federated
authn process should be interrupted and the user should be warned.
:param session: session object to send out HTTP requests.
:type session: keystoneauth1.session.Session
:param sp_response_consumer_url: consumer URL issued by a SP
:type sp_response_consumer_url: string
:param idp_sp_response_consumer_url: consumer URL issued by an IdP
:type idp_sp_response_consumer_url: string
"""
if sp_response_consumer_url != idp_sp_response_consumer_url:
# send fault message to the SP, discard the response
session.post(sp_response_consumer_url, data=self.SOAP_FAULT,
headers=self.ECP_SP_SAML2_REQUEST_HEADERS,
authenticated=False)
# prepare error message and raise an exception.
msg = ('Consumer URLs from Service Provider %(service_provider)s '
'%(sp_consumer_url)s and Identity Provider '
'%(identity_provider)s %(idp_consumer_url)s are not equal')
msg = msg % {
'service_provider': self.federated_token_url,
'sp_consumer_url': sp_response_consumer_url,
'identity_provider': self.identity_provider,
'idp_consumer_url': idp_sp_response_consumer_url
}
raise exceptions.AuthorizationFailure(msg)
def _send_service_provider_request(self, session):
"""Initial HTTP GET request to the SAML2 protected endpoint.
It's crucial to include HTTP headers indicating that the client is
willing to take advantage of the ECP SAML2 extension and receive data
as the SOAP.
Unlike standard authentication methods in the OpenStack Identity,
the client accesses::
``/v3/OS-FEDERATION/identity_providers/{identity_providers}/
protocols/{protocol}/auth``
After a successful HTTP call the HTTP response should include SAML2
authn request in the XML format.
If a HTTP response contains ``X-Subject-Token`` in the headers and
the response body is a valid JSON assume the user was already
authenticated and Keystone returned a valid unscoped token.
Return True indicating the user was already authenticated.
:param session: a session object to send out HTTP requests.
:type session: keystoneauth1.session.Session
"""
sp_response = session.get(self.federated_token_url,
headers=self.ECP_SP_EMPTY_REQUEST_HEADERS,
authenticated=False)
if 'X-Subject-Token' in sp_response.headers:
self.authenticated_response = sp_response
return True
try:
self.saml2_authn_request = etree.XML(sp_response.content)
except etree.XMLSyntaxError as e:
msg = ('SAML2: Error parsing XML returned '
'from Service Provider, reason: %s') % e
raise exceptions.AuthorizationFailure(msg)
relay_state = self.saml2_authn_request.xpath(
self.ECP_RELAY_STATE, namespaces=self.ECP_SAML2_NAMESPACES)
self.relay_state = self._first(relay_state)
sp_response_consumer_url = self.saml2_authn_request.xpath(
self.ECP_SERVICE_PROVIDER_CONSUMER_URL,
namespaces=self.ECP_SAML2_NAMESPACES)
self.sp_response_consumer_url = self._first(sp_response_consumer_url)
return False
def _send_idp_saml2_authn_request(self, session):
"""Present modified SAML2 authn assertion from the Service Provider."""
self._prepare_idp_saml2_request(self.saml2_authn_request)
idp_saml2_authn_request = self.saml2_authn_request
# Currently HTTPBasicAuth method is hardcoded into the plugin
idp_response = session.post(
self.identity_provider_url,
headers={'Content-type': 'text/xml'},
data=etree.tostring(idp_saml2_authn_request),
requests_auth=(self.username, self.password),
authenticated=False, log=False)
try:
self.saml2_idp_authn_response = etree.XML(idp_response.content)
except etree.XMLSyntaxError as e:
msg = ('SAML2: Error parsing XML returned '
'from Identity Provider, reason: %s') % e
raise exceptions.AuthorizationFailure(msg)
idp_response_consumer_url = self.saml2_idp_authn_response.xpath(
self.ECP_IDP_CONSUMER_URL,
namespaces=self.ECP_SAML2_NAMESPACES)
self.idp_response_consumer_url = self._first(idp_response_consumer_url)
self._check_consumer_urls(session, self.idp_response_consumer_url,
self.sp_response_consumer_url)
def _send_service_provider_saml2_authn_response(self, session):
"""Present SAML2 assertion to the Service Provider.
The assertion is issued by a trusted Identity Provider for the
authenticated user. This function directs the HTTP request to SP
managed URL, for instance: ``https://<host>:<port>/Shibboleth.sso/
SAML2/ECP``.
Upon success the there's a session created and access to the protected
resource is granted. Many implementations of the SP return HTTP 302
status code pointing to the protected URL (``https://<host>:<port>/v3/
OS-FEDERATION/identity_providers/{identity_provider}/protocols/
{protocol_id}/auth`` in this case). Saml2 plugin should point to that
URL again, with HTTP GET method, expecting an unscoped token.
:param session: a session object to send out HTTP requests.
"""
self.saml2_idp_authn_response[0][0] = self.relay_state
response = session.post(
self.idp_response_consumer_url,
headers=self.ECP_SP_SAML2_REQUEST_HEADERS,
data=etree.tostring(self.saml2_idp_authn_response),
authenticated=False, redirect=False)
# Don't follow HTTP specs - after the HTTP 302/303 response don't
# repeat the call directed to the Location URL. In this case, this is
# an indication that saml2 session is now active and protected resource
# can be accessed.
response = self._handle_http_ecp_redirect(
session, response, method='GET',
headers=self.ECP_SP_SAML2_REQUEST_HEADERS)
self.authenticated_response = response
def get_unscoped_auth_ref(self, session):
"""Get unscoped OpenStack token after federated authentication.
This is a multi-step process including multiple HTTP requests.
The federated authentication consists of:
* HTTP GET request to the Identity Service (acting as a Service
Provider).
It's crucial to include HTTP headers indicating we are expecting SOAP
message in return. Service Provider should respond with such SOAP
message. This step is handed by a method
``Saml2Password_send_service_provider_request()``.
* HTTP POST request to the external Identity Provider service with
ECP extension enabled. The content sent is a header removed SOAP
message returned from the Service Provider. It's also worth noting
that ECP extension to the SAML2 doesn't define authentication method.
The most popular is HttpBasicAuth with just user and password.
Other possibilities could be X509 certificates or Kerberos.
Upon successful authentication the user should receive a SAML2
assertion.
This step is handed by a method
``Saml2Password_send_idp_saml2_authn_request(session)``
* HTTP POST request again to the Service Provider. The body of the
request includes SAML2 assertion issued by a trusted Identity
Provider. The request should be sent to the Service Provider
consumer url specified in the SAML2 assertion.
Providing the authentication was successful and both Service Provider
and Identity Providers are trusted to each other, the Service
Provider will issue an unscoped token with a list of groups the
federated user is a member of.
This step is handed by a method
``Saml2Password_send_service_provider_saml2_authn_response()``
Unscoped token example::
{
"token": {
"methods": [
"saml2"
],
"user": {
"id": "username%40example.com",
"name": "username@example.com",
"OS-FEDERATION": {
"identity_provider": "ACME",
"protocol": "saml2",
"groups": [
{"id": "abc123"},
{"id": "bcd234"}
]
}
}
}
}
:param session : a session object to send out HTTP requests.
:type session: keystoneauth1.session.Session
:returns: AccessInfo
:rtype: :py:class:`keystoneauth1.access.AccessInfo`
"""
saml_authenticated = self._send_service_provider_request(session)
if not saml_authenticated:
self._send_idp_saml2_authn_request(session)
self._send_service_provider_saml2_authn_response(session)
return access.create(resp=self.authenticated_response)
def get_requests_auth(self):
return requests.auth.HTTPBasicAuth(self.username, self.password)

View File

@ -36,8 +36,15 @@ def saml_assertion(**kwargs):
return template('saml_assertion.xml', **kwargs).encode('utf-8')
def authn_request(**kwargs):
kwargs.setdefault('issuer',
'https://openstack4.local/Shibboleth.sso/SAML2/ECP')
return template('authn_request.xml', **kwargs).encode('utf-8')
SP_SOAP_RESPONSE = soap_response()
SAML2_ASSERTION = saml_assertion()
AUTHN_REQUEST = authn_request()
UNSCOPED_TOKEN_HEADER = 'UNSCOPED_TOKEN'

View File

@ -0,0 +1,22 @@
<?xml version="1.0" encoding="UTF-8"?>
<S:Envelope xmlns:S="http://schemas.xmlsoap.org/soap/envelope/">
<S:Body>
<samlp:AuthnRequest
xmlns:samlp="urn:oasis:names:tc:SAML:2.0:protocol"
AssertionConsumerServiceURL="https://openstack4.local/Shibboleth.sso/SAML2/ECP"
ID="_a07186e3992e70e92c17b9d249495643"
IssueInstant="2014-06-09T09:48:57Z"
ProtocolBinding="urn:oasis:names:tc:SAML:2.0:bindings:PAOS"
Version="2.0">
<saml:Issuer xmlns:saml="urn:oasis:names:tc:SAML:2.0:assertion">
$issuer
</saml:Issuer>
<samlp:NameIDPolicy AllowCreate="1"/>
<samlp:Scoping>
<samlp:IDPList>
<samlp:IDPEntry ProviderID="https://idp.testshib.org/idp/shibboleth"/>
</samlp:IDPList>
</samlp:Scoping>
</samlp:AuthnRequest>
</S:Body>
</S:Envelope>

View File

@ -10,255 +10,281 @@
# License for the specific language governing permissions and limitations
# under the License.
import base64
import uuid
from lxml import etree
import requests
from keystoneauth1 import exceptions
from keystoneauth1.extras import _saml2 as saml2
from keystoneauth1 import fixture as ksa_fixtures
from keystoneauth1 import session
from keystoneauth1.tests.unit.extras.saml2 import fixtures as saml2_fixtures
from keystoneauth1.tests.unit.extras.saml2 import utils
from keystoneauth1.tests.unit import matchers
PAOS_HEADER = {'Content-Type': 'application/vnd.paos+xml'}
InvalidResponse = saml2.v3.saml2.InvalidResponse
class AuthenticateviaSAML2Tests(utils.TestCase):
GROUP = 'auth'
TEST_TOKEN = uuid.uuid4().hex
class SamlAuth2PluginTests(utils.TestCase):
"""These test ONLY the standalone requests auth plugin.
def setUp(self):
super(AuthenticateviaSAML2Tests, self).setUp()
Tests for the auth plugin are later so that hopefully these can be
extracted into it's own module.
"""
self.ECP_SP_EMPTY_REQUEST_HEADERS = {
'Accept': 'text/html; application/vnd.paos+xml',
'PAOS': ('ver="urn:liberty:paos:2003-08";'
'"urn:oasis:names:tc:SAML:2.0:profiles:SSO:ecp"')
}
TEST_USER = 'user'
TEST_PASS = 'pass'
TEST_SP_URL = 'http://sp.test'
TEST_IDP_URL = 'http://idp.test'
TEST_CONSUMER_URL = "https://openstack4.local/Shibboleth.sso/SAML2/ECP"
self.ECP_SP_SAML2_REQUEST_HEADERS = {
'Content-Type': 'application/vnd.paos+xml'
}
def get_plugin(self, **kwargs):
kwargs.setdefault('identity_provider_url', self.TEST_IDP_URL)
kwargs.setdefault('requests_auth', (self.TEST_USER, self.TEST_PASS))
return saml2.v3.saml2._SamlAuth(**kwargs)
self.ECP_SAML2_NAMESPACES = {
'ecp': 'urn:oasis:names:tc:SAML:2.0:profiles:SSO:ecp',
'S': 'http://schemas.xmlsoap.org/soap/envelope/',
'paos': 'urn:liberty:paos:2003-08'
}
self.ECP_RELAY_STATE = '//ecp:RelayState'
self.ECP_SERVICE_PROVIDER_CONSUMER_URL = ('/S:Envelope/S:Header/paos:'
'Request/'
'@responseConsumerURL')
self.ECP_IDP_CONSUMER_URL = ('/S:Envelope/S:Header/ecp:Response/'
'@AssertionConsumerServiceURL')
@property
def calls(self):
return [r.url.strip('/') for r in self.requests_mock.request_history]
self.IDENTITY_PROVIDER = 'testidp'
self.IDENTITY_PROVIDER_URL = 'http://local.url'
self.PROTOCOL = 'saml2'
self.FEDERATION_AUTH_URL = '%s/%s' % (
self.TEST_URL,
'OS-FEDERATION/identity_providers/testidp/protocols/saml2/auth')
self.SHIB_CONSUMER_URL = ('https://openstack4.local/'
'Shibboleth.sso/SAML2/ECP')
def basic_header(self, username=TEST_USER, password=TEST_PASS):
user_pass = ('%s:%s' % (username, password)).encode('utf-8')
return 'Basic %s' % base64.b64encode(user_pass).decode('utf-8')
self.saml2plugin = saml2.V3Saml2Password(
self.TEST_URL,
self.IDENTITY_PROVIDER, self.IDENTITY_PROVIDER_URL,
self.TEST_USER, self.TEST_TOKEN, self.PROTOCOL)
def test_passed_when_not_200(self):
text = uuid.uuid4().hex
test_url = 'http://another.test'
self.requests_mock.get(test_url,
status_code=201,
headers=PAOS_HEADER,
text=text)
def test_initial_sp_call(self):
"""Test initial call, expect SOAP message."""
self.requests_mock.get(
self.FEDERATION_AUTH_URL,
content=utils.make_oneline(saml2_fixtures.SP_SOAP_RESPONSE))
a = self.saml2plugin._send_service_provider_request(self.session)
resp = requests.get(test_url, auth=self.get_plugin())
self.assertEqual(201, resp.status_code)
self.assertEqual(text, resp.text)
self.assertFalse(a)
def test_200_without_paos_header(self):
text = uuid.uuid4().hex
test_url = 'http://another.test'
self.requests_mock.get(test_url, status_code=200, text=text)
sp_soap_response = etree.tostring(self.saml2plugin.saml2_authn_request)
resp = requests.get(test_url, auth=self.get_plugin())
self.assertEqual(200, resp.status_code)
self.assertEqual(text, resp.text)
self.assertThat(saml2_fixtures.SP_SOAP_RESPONSE,
matchers.XMLEquals(sp_soap_response))
def test_standard_workflow_302_redirect(self):
text = uuid.uuid4().hex
self.assertEqual(
self.saml2plugin.sp_response_consumer_url, self.SHIB_CONSUMER_URL,
"Expected consumer_url set to %s instead of %s" % (
self.SHIB_CONSUMER_URL,
str(self.saml2plugin.sp_response_consumer_url)))
self.requests_mock.get(self.TEST_SP_URL, response_list=[
dict(headers=PAOS_HEADER,
content=utils.make_oneline(saml2_fixtures.SP_SOAP_RESPONSE)),
dict(text=text)
])
def test_initial_sp_call_when_saml_authenticated(self):
self.requests_mock.get(
self.FEDERATION_AUTH_URL,
json=saml2_fixtures.UNSCOPED_TOKEN,
headers={'X-Subject-Token': saml2_fixtures.UNSCOPED_TOKEN_HEADER})
authm = self.requests_mock.post(self.TEST_IDP_URL,
content=saml2_fixtures.SAML2_ASSERTION)
a = self.saml2plugin._send_service_provider_request(self.session)
self.assertTrue(a)
self.assertEqual(
saml2_fixtures.UNSCOPED_TOKEN['token'],
self.saml2plugin.authenticated_response.json()['token'])
self.assertEqual(
saml2_fixtures.UNSCOPED_TOKEN_HEADER,
self.saml2plugin.authenticated_response.headers['X-Subject-Token'])
self.requests_mock.post(
self.TEST_CONSUMER_URL,
status_code=302,
headers={'Location': self.TEST_SP_URL})
def test_get_unscoped_token_when_authenticated(self):
self.requests_mock.get(
self.FEDERATION_AUTH_URL,
json=saml2_fixtures.UNSCOPED_TOKEN,
headers={'X-Subject-Token': saml2_fixtures.UNSCOPED_TOKEN_HEADER,
'Content-Type': 'application/json'})
resp = requests.get(self.TEST_SP_URL, auth=self.get_plugin())
self.assertEqual(200, resp.status_code)
self.assertEqual(text, resp.text)
token = self.saml2plugin.get_auth_ref(self.session)
self.assertEqual(self.calls, [self.TEST_SP_URL,
self.TEST_IDP_URL,
self.TEST_CONSUMER_URL,
self.TEST_SP_URL])
self.assertEqual(saml2_fixtures.UNSCOPED_TOKEN_HEADER,
token.auth_token)
self.assertEqual(self.basic_header(),
authm.last_request.headers['Authorization'])
authn_request = self.requests_mock.request_history[1].text
self.assertThat(saml2_fixtures.AUTHN_REQUEST,
matchers.XMLEquals(authn_request))
def test_standard_workflow_303_redirect(self):
text = uuid.uuid4().hex
self.requests_mock.get(self.TEST_SP_URL, response_list=[
dict(headers=PAOS_HEADER,
content=utils.make_oneline(saml2_fixtures.SP_SOAP_RESPONSE)),
dict(text=text)
])
authm = self.requests_mock.post(self.TEST_IDP_URL,
content=saml2_fixtures.SAML2_ASSERTION)
self.requests_mock.post(
self.TEST_CONSUMER_URL,
status_code=303,
headers={'Location': self.TEST_SP_URL})
resp = requests.get(self.TEST_SP_URL, auth=self.get_plugin())
self.assertEqual(200, resp.status_code)
self.assertEqual(text, resp.text)
url_flow = [self.TEST_SP_URL,
self.TEST_IDP_URL,
self.TEST_CONSUMER_URL,
self.TEST_SP_URL]
self.assertEqual(url_flow, [r.url.rstrip('/') for r in resp.history])
self.assertEqual(url_flow, self.calls)
self.assertEqual(self.basic_header(),
authm.last_request.headers['Authorization'])
authn_request = self.requests_mock.request_history[1].text
self.assertThat(saml2_fixtures.AUTHN_REQUEST,
matchers.XMLEquals(authn_request))
def test_initial_sp_call_invalid_response(self):
"""Send initial SP HTTP request and receive wrong server response."""
self.requests_mock.get(self.FEDERATION_AUTH_URL,
self.requests_mock.get(self.TEST_SP_URL,
headers=PAOS_HEADER,
text='NON XML RESPONSE')
self.assertRaises(
exceptions.AuthorizationFailure,
self.saml2plugin._send_service_provider_request,
self.session)
self.assertRaises(InvalidResponse,
requests.get,
self.TEST_SP_URL,
auth=self.get_plugin())
def test_send_authn_req_to_idp(self):
self.requests_mock.post(self.IDENTITY_PROVIDER_URL,
content=saml2_fixtures.SAML2_ASSERTION)
self.assertEqual(self.calls, [self.TEST_SP_URL])
self.saml2plugin.sp_response_consumer_url = self.SHIB_CONSUMER_URL
self.saml2plugin.saml2_authn_request = etree.XML(
saml2_fixtures.SP_SOAP_RESPONSE)
self.saml2plugin._send_idp_saml2_authn_request(self.session)
def test_consumer_mismatch_error_workflow(self):
consumer1 = 'http://consumer1/Shibboleth.sso/SAML2/ECP'
consumer2 = 'http://consumer2/Shibboleth.sso/SAML2/ECP'
soap_response = saml2_fixtures.soap_response(consumer=consumer1)
saml_assertion = saml2_fixtures.saml_assertion(destination=consumer2)
idp_response = etree.tostring(
self.saml2plugin.saml2_idp_authn_response)
self.requests_mock.get(self.TEST_SP_URL,
headers=PAOS_HEADER,
content=soap_response)
self.assertThat(idp_response,
matchers.XMLEquals(saml2_fixtures.SAML2_ASSERTION))
self.requests_mock.post(self.TEST_IDP_URL, content=saml_assertion)
def test_fail_basicauth_idp_authentication(self):
self.requests_mock.post(self.IDENTITY_PROVIDER_URL,
status_code=401)
# receive the SAML error, body unchecked
saml_error = self.requests_mock.post(consumer1)
self.saml2plugin.sp_response_consumer_url = self.SHIB_CONSUMER_URL
self.saml2plugin.saml2_authn_request = etree.XML(
saml2_fixtures.SP_SOAP_RESPONSE)
self.assertRaises(
exceptions.Unauthorized,
self.saml2plugin._send_idp_saml2_authn_request,
self.session)
self.assertRaises(saml2.v3.saml2.ConsumerMismatch,
requests.get,
self.TEST_SP_URL,
auth=self.get_plugin())
def test_mising_username_password_in_plugin(self):
self.assertRaises(TypeError,
saml2.V3Saml2Password,
self.TEST_URL, self.IDENTITY_PROVIDER,
self.IDENTITY_PROVIDER_URL)
self.assertTrue(saml_error.called)
def test_send_authn_response_to_sp(self):
self.requests_mock.post(
self.SHIB_CONSUMER_URL,
json=saml2_fixtures.UNSCOPED_TOKEN,
headers={'X-Subject-Token': saml2_fixtures.UNSCOPED_TOKEN_HEADER})
self.saml2plugin.relay_state = etree.XML(
saml2_fixtures.SP_SOAP_RESPONSE).xpath(
self.ECP_RELAY_STATE, namespaces=self.ECP_SAML2_NAMESPACES)[0]
class AuthenticateviaSAML2Tests(utils.TestCase):
self.saml2plugin.saml2_idp_authn_response = etree.XML(
saml2_fixtures.SAML2_ASSERTION)
TEST_USER = 'user'
TEST_PASS = 'pass'
TEST_IDP = 'tester'
TEST_PROTOCOL = 'saml2'
TEST_AUTH_URL = 'http://keystone.test:5000/v3/'
self.saml2plugin.idp_response_consumer_url = self.SHIB_CONSUMER_URL
self.saml2plugin._send_service_provider_saml2_authn_response(
self.session)
token_json = self.saml2plugin.authenticated_response.json()['token']
token = self.saml2plugin.authenticated_response.headers[
'X-Subject-Token']
self.assertEqual(saml2_fixtures.UNSCOPED_TOKEN['token'],
token_json)
TEST_IDP_URL = 'https://idp.test'
TEST_CONSUMER_URL = "https://openstack4.local/Shibboleth.sso/SAML2/ECP"
self.assertEqual(saml2_fixtures.UNSCOPED_TOKEN_HEADER,
token)
def get_plugin(self, **kwargs):
kwargs.setdefault('auth_url', self.TEST_AUTH_URL)
kwargs.setdefault('username', self.TEST_USER)
kwargs.setdefault('password', self.TEST_PASS)
kwargs.setdefault('identity_provider', self.TEST_IDP)
kwargs.setdefault('identity_provider_url', self.TEST_IDP_URL)
kwargs.setdefault('protocol', self.TEST_PROTOCOL)
return saml2.V3Saml2Password(**kwargs)
def test_consumer_url_mismatch_success(self):
self.saml2plugin._check_consumer_urls(
self.session, self.SHIB_CONSUMER_URL,
self.SHIB_CONSUMER_URL)
def sp_url(self, **kwargs):
kwargs.setdefault('base', self.TEST_AUTH_URL.rstrip('/'))
kwargs.setdefault('identity_provider', self.TEST_IDP)
kwargs.setdefault('protocol', self.TEST_PROTOCOL)
def test_consumer_url_mismatch(self):
self.requests_mock.post(self.SHIB_CONSUMER_URL)
invalid_consumer_url = uuid.uuid4().hex
self.assertRaises(
exceptions.AuthorizationFailure,
self.saml2plugin._check_consumer_urls,
self.session, self.SHIB_CONSUMER_URL,
invalid_consumer_url)
templ = ('%(base)s/OS-FEDERATION/identity_providers/'
'%(identity_provider)s/protocols/%(protocol)s/auth')
return templ % kwargs
def test_custom_302_redirection(self):
self.requests_mock.post(
self.SHIB_CONSUMER_URL,
text='BODY',
headers={'location': self.FEDERATION_AUTH_URL},
status_code=302)
@property
def calls(self):
return [r.url.strip('/') for r in self.requests_mock.request_history]
self.requests_mock.get(
self.FEDERATION_AUTH_URL,
json=saml2_fixtures.UNSCOPED_TOKEN,
headers={'X-Subject-Token': saml2_fixtures.UNSCOPED_TOKEN_HEADER})
def basic_header(self, username=TEST_USER, password=TEST_PASS):
user_pass = ('%s:%s' % (username, password)).encode('utf-8')
return 'Basic %s' % base64.b64encode(user_pass).decode('utf-8')
self.session.redirect = False
response = self.session.post(
self.SHIB_CONSUMER_URL, data='CLIENT BODY')
self.assertEqual(302, response.status_code)
self.assertEqual(self.FEDERATION_AUTH_URL,
response.headers['location'])
def setUp(self):
super(AuthenticateviaSAML2Tests, self).setUp()
self.session = session.Session()
self.default_sp_url = self.sp_url()
response = self.saml2plugin._handle_http_ecp_redirect(
self.session, response, 'GET')
def test_workflow(self):
token_id = uuid.uuid4().hex
token = ksa_fixtures.V3Token()
self.assertEqual(self.FEDERATION_AUTH_URL, response.request.url)
self.assertEqual('GET', response.request.method)
self.requests_mock.get(self.default_sp_url, response_list=[
dict(headers=PAOS_HEADER,
content=utils.make_oneline(saml2_fixtures.SP_SOAP_RESPONSE)),
dict(headers={'X-Subject-Token': token_id}, json=token)
])
def test_custom_303_redirection(self):
self.requests_mock.post(
self.SHIB_CONSUMER_URL,
text='BODY',
headers={'location': self.FEDERATION_AUTH_URL},
status_code=303)
self.requests_mock.get(
self.FEDERATION_AUTH_URL,
json=saml2_fixtures.UNSCOPED_TOKEN,
headers={'X-Subject-Token': saml2_fixtures.UNSCOPED_TOKEN_HEADER})
self.session.redirect = False
response = self.session.post(
self.SHIB_CONSUMER_URL, data='CLIENT BODY')
self.assertEqual(303, response.status_code)
self.assertEqual(self.FEDERATION_AUTH_URL,
response.headers['location'])
response = self.saml2plugin._handle_http_ecp_redirect(
self.session, response, 'GET')
self.assertEqual(self.FEDERATION_AUTH_URL, response.request.url)
self.assertEqual('GET', response.request.method)
def test_end_to_end_workflow(self):
self.requests_mock.get(
self.FEDERATION_AUTH_URL,
content=utils.make_oneline(saml2_fixtures.SP_SOAP_RESPONSE))
self.requests_mock.post(self.IDENTITY_PROVIDER_URL,
content=saml2_fixtures.SAML2_ASSERTION)
authm = self.requests_mock.post(self.TEST_IDP_URL,
content=saml2_fixtures.SAML2_ASSERTION)
self.requests_mock.post(
self.SHIB_CONSUMER_URL,
json=saml2_fixtures.UNSCOPED_TOKEN,
headers={'X-Subject-Token': saml2_fixtures.UNSCOPED_TOKEN_HEADER,
'Content-Type': 'application/json'})
self.TEST_CONSUMER_URL,
status_code=302,
headers={'Location': self.sp_url()})
self.session.redirect = False
response = self.saml2plugin.get_auth_ref(self.session)
self.assertEqual(saml2_fixtures.UNSCOPED_TOKEN_HEADER,
response.auth_token)
auth_ref = self.get_plugin().get_auth_ref(self.session)
self.assertEqual(token_id, auth_ref.auth_token)
self.assertEqual(self.calls, [self.default_sp_url,
self.TEST_IDP_URL,
self.TEST_CONSUMER_URL,
self.default_sp_url])
self.assertEqual(self.basic_header(),
authm.last_request.headers['Authorization'])
authn_request = self.requests_mock.request_history[1].text
self.assertThat(saml2_fixtures.AUTHN_REQUEST,
matchers.XMLEquals(authn_request))
def test_consumer_mismatch_error_workflow(self):
consumer1 = 'http://keystone.test/Shibboleth.sso/SAML2/ECP'
consumer2 = 'http://consumer2/Shibboleth.sso/SAML2/ECP'
soap_response = saml2_fixtures.soap_response(consumer=consumer1)
saml_assertion = saml2_fixtures.saml_assertion(destination=consumer2)
self.requests_mock.get(self.default_sp_url,
headers=PAOS_HEADER,
content=soap_response)
self.requests_mock.post(self.TEST_IDP_URL, content=saml_assertion)
# receive the SAML error, body unchecked
saml_error = self.requests_mock.post(consumer1)
self.assertRaises(exceptions.AuthorizationFailure,
self.get_plugin().get_auth_ref,
self.session)
self.assertTrue(saml_error.called)
def test_initial_sp_call_invalid_response(self):
"""Send initial SP HTTP request and receive wrong server response."""
self.requests_mock.get(self.default_sp_url,
headers=PAOS_HEADER,
text='NON XML RESPONSE')
self.assertRaises(exceptions.AuthorizationFailure,
self.get_plugin().get_auth_ref,
self.session)
self.assertEqual(self.calls, [self.default_sp_url])

View File

@ -53,6 +53,7 @@ keystoneauth1.plugin =
v3totp = keystoneauth1.loading._plugins.identity.v3:TOTP
v3fedkerb = keystoneauth1.extras.kerberos._loading:MappedKerberos
v3tokenlessauth = keystoneauth1.loading._plugins.identity.v3:TokenlessAuth
v3samlpassword = keystoneauth1.extras._saml2._loading:Saml2Password
[build_sphinx]
source-dir = doc/source