Add test for header in Saml2 plugin

This patch adds tests for https://review.openstack.org/#/c/420970/,
with this, we can be more confident that regressions won't easily
happen again in the future.

Change-Id: I40ebc795266b8fe4a191c543f6fadeefac661d2a
Related-Bug: 1656946
This commit is contained in:
Rodrigo Duarte Sousa 2017-01-17 10:15:58 -03:00
parent b1301e606d
commit 01e0122a14

View File

@ -23,7 +23,8 @@ from keystoneauth1.tests.unit.extras.saml2 import fixtures as saml2_fixtures
from keystoneauth1.tests.unit.extras.saml2 import utils from keystoneauth1.tests.unit.extras.saml2 import utils
from keystoneauth1.tests.unit import matchers from keystoneauth1.tests.unit import matchers
PAOS_HEADER = {'Content-Type': 'application/vnd.paos+xml'} PAOS_HEADER = 'application/vnd.paos+xml'
CONTENT_TYPE_PAOS_HEADER = {'Content-Type': PAOS_HEADER}
InvalidResponse = saml2.v3.saml2.InvalidResponse InvalidResponse = saml2.v3.saml2.InvalidResponse
@ -34,6 +35,8 @@ class SamlAuth2PluginTests(utils.TestCase):
extracted into it's own module. extracted into it's own module.
""" """
HEADER_MEDIA_TYPE_SEPARATOR = ','
TEST_USER = 'user' TEST_USER = 'user'
TEST_PASS = 'pass' TEST_PASS = 'pass'
TEST_SP_URL = 'http://sp.test' TEST_SP_URL = 'http://sp.test'
@ -53,12 +56,31 @@ class SamlAuth2PluginTests(utils.TestCase):
user_pass = ('%s:%s' % (username, password)).encode('utf-8') user_pass = ('%s:%s' % (username, password)).encode('utf-8')
return 'Basic %s' % base64.b64encode(user_pass).decode('utf-8') return 'Basic %s' % base64.b64encode(user_pass).decode('utf-8')
def test_request_accept_headers(self):
# Include some random Accept header
random_header = uuid.uuid4().hex
headers = {'Accept': random_header}
req = requests.Request('GET', 'http://another.test', headers=headers)
plugin = self.get_plugin()
plugin_headers = plugin(req).headers
self.assertIn('Accept', plugin_headers)
# Since we have included a random Accept header, the plugin should have
# added the PAOS_HEADER to it using the correct media type separator
accept_header = plugin_headers['Accept']
self.assertIn(self.HEADER_MEDIA_TYPE_SEPARATOR, accept_header)
self.assertIn(random_header,
accept_header.split(self.HEADER_MEDIA_TYPE_SEPARATOR))
self.assertIn(PAOS_HEADER,
accept_header.split(self.HEADER_MEDIA_TYPE_SEPARATOR))
def test_passed_when_not_200(self): def test_passed_when_not_200(self):
text = uuid.uuid4().hex text = uuid.uuid4().hex
test_url = 'http://another.test' test_url = 'http://another.test'
self.requests_mock.get(test_url, self.requests_mock.get(test_url,
status_code=201, status_code=201,
headers=PAOS_HEADER, headers=CONTENT_TYPE_PAOS_HEADER,
text=text) text=text)
resp = requests.get(test_url, auth=self.get_plugin()) resp = requests.get(test_url, auth=self.get_plugin())
@ -78,7 +100,7 @@ class SamlAuth2PluginTests(utils.TestCase):
text = uuid.uuid4().hex text = uuid.uuid4().hex
self.requests_mock.get(self.TEST_SP_URL, response_list=[ self.requests_mock.get(self.TEST_SP_URL, response_list=[
dict(headers=PAOS_HEADER, dict(headers=CONTENT_TYPE_PAOS_HEADER,
content=utils.make_oneline(saml2_fixtures.SP_SOAP_RESPONSE)), content=utils.make_oneline(saml2_fixtures.SP_SOAP_RESPONSE)),
dict(text=text) dict(text=text)
]) ])
@ -111,7 +133,7 @@ class SamlAuth2PluginTests(utils.TestCase):
text = uuid.uuid4().hex text = uuid.uuid4().hex
self.requests_mock.get(self.TEST_SP_URL, response_list=[ self.requests_mock.get(self.TEST_SP_URL, response_list=[
dict(headers=PAOS_HEADER, dict(headers=CONTENT_TYPE_PAOS_HEADER,
content=utils.make_oneline(saml2_fixtures.SP_SOAP_RESPONSE)), content=utils.make_oneline(saml2_fixtures.SP_SOAP_RESPONSE)),
dict(text=text) dict(text=text)
]) ])
@ -146,7 +168,7 @@ class SamlAuth2PluginTests(utils.TestCase):
def test_initial_sp_call_invalid_response(self): def test_initial_sp_call_invalid_response(self):
"""Send initial SP HTTP request and receive wrong server response.""" """Send initial SP HTTP request and receive wrong server response."""
self.requests_mock.get(self.TEST_SP_URL, self.requests_mock.get(self.TEST_SP_URL,
headers=PAOS_HEADER, headers=CONTENT_TYPE_PAOS_HEADER,
text='NON XML RESPONSE') text='NON XML RESPONSE')
self.assertRaises(InvalidResponse, self.assertRaises(InvalidResponse,
@ -163,7 +185,7 @@ class SamlAuth2PluginTests(utils.TestCase):
saml_assertion = saml2_fixtures.saml_assertion(destination=consumer2) saml_assertion = saml2_fixtures.saml_assertion(destination=consumer2)
self.requests_mock.get(self.TEST_SP_URL, self.requests_mock.get(self.TEST_SP_URL,
headers=PAOS_HEADER, headers=CONTENT_TYPE_PAOS_HEADER,
content=soap_response) content=soap_response)
self.requests_mock.post(self.TEST_IDP_URL, content=saml_assertion) self.requests_mock.post(self.TEST_IDP_URL, content=saml_assertion)
@ -226,7 +248,7 @@ class AuthenticateviaSAML2Tests(utils.TestCase):
token = ksa_fixtures.V3Token() token = ksa_fixtures.V3Token()
self.requests_mock.get(self.default_sp_url, response_list=[ self.requests_mock.get(self.default_sp_url, response_list=[
dict(headers=PAOS_HEADER, dict(headers=CONTENT_TYPE_PAOS_HEADER,
content=utils.make_oneline(saml2_fixtures.SP_SOAP_RESPONSE)), content=utils.make_oneline(saml2_fixtures.SP_SOAP_RESPONSE)),
dict(headers={'X-Subject-Token': token_id}, json=token) dict(headers={'X-Subject-Token': token_id}, json=token)
]) ])
@ -263,7 +285,7 @@ class AuthenticateviaSAML2Tests(utils.TestCase):
saml_assertion = saml2_fixtures.saml_assertion(destination=consumer2) saml_assertion = saml2_fixtures.saml_assertion(destination=consumer2)
self.requests_mock.get(self.default_sp_url, self.requests_mock.get(self.default_sp_url,
headers=PAOS_HEADER, headers=CONTENT_TYPE_PAOS_HEADER,
content=soap_response) content=soap_response)
self.requests_mock.post(self.TEST_IDP_URL, content=saml_assertion) self.requests_mock.post(self.TEST_IDP_URL, content=saml_assertion)
@ -280,7 +302,7 @@ class AuthenticateviaSAML2Tests(utils.TestCase):
def test_initial_sp_call_invalid_response(self): def test_initial_sp_call_invalid_response(self):
"""Send initial SP HTTP request and receive wrong server response.""" """Send initial SP HTTP request and receive wrong server response."""
self.requests_mock.get(self.default_sp_url, self.requests_mock.get(self.default_sp_url,
headers=PAOS_HEADER, headers=CONTENT_TYPE_PAOS_HEADER,
text='NON XML RESPONSE') text='NON XML RESPONSE')
self.assertRaises(exceptions.AuthorizationFailure, self.assertRaises(exceptions.AuthorizationFailure,