keystone/keystone/tests/unit/test_wsgi.py

514 lines
20 KiB
Python

# encoding: utf-8
#
# Copyright 2012 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import gettext
import socket
import uuid
import eventlet
import mock
import oslo_i18n
from oslo_serialization import jsonutils
import six
from six.moves import http_client
from testtools import matchers
import webob
from keystone.common import environment
from keystone.common import wsgi
from keystone import exception
from keystone.tests import unit
class FakeApp(wsgi.Application):
def index(self, context):
return {'a': 'b'}
class FakeAttributeCheckerApp(wsgi.Application):
def index(self, context):
return context['query_string']
def assert_attribute(self, body, attr):
"""Asserts that the given request has a certain attribute."""
ref = jsonutils.loads(body)
self._require_attribute(ref, attr)
def assert_attributes(self, body, attr):
"""Asserts that the given request has a certain set attributes."""
ref = jsonutils.loads(body)
self._require_attributes(ref, attr)
class RouterTest(unit.TestCase):
def setUp(self):
self.router = wsgi.RoutersBase()
super(RouterTest, self).setUp()
def test_invalid_status(self):
fake_mapper = uuid.uuid4().hex
fake_controller = uuid.uuid4().hex
fake_path = uuid.uuid4().hex
fake_rel = uuid.uuid4().hex
self.assertRaises(exception.Error,
self.router._add_resource,
fake_mapper, fake_controller, fake_path, fake_rel,
status=uuid.uuid4().hex)
class BaseWSGITest(unit.TestCase):
def setUp(self):
self.app = FakeApp()
super(BaseWSGITest, self).setUp()
def _make_request(self, url='/'):
req = webob.Request.blank(url)
args = {'action': 'index', 'controller': None}
req.environ['wsgiorg.routing_args'] = [None, args]
return req
class ApplicationTest(BaseWSGITest):
def test_response_content_type(self):
req = self._make_request()
resp = req.get_response(self.app)
self.assertEqual('application/json', resp.content_type)
def test_query_string_available(self):
class FakeApp(wsgi.Application):
def index(self, context):
return context['query_string']
req = self._make_request(url='/?1=2')
resp = req.get_response(FakeApp())
self.assertEqual({'1': '2'}, jsonutils.loads(resp.body))
def test_headers_available(self):
class FakeApp(wsgi.Application):
def index(self, context):
return context['headers']
app = FakeApp()
req = self._make_request(url='/?1=2')
req.headers['X-Foo'] = "bar"
resp = req.get_response(app)
self.assertIn('X-Foo', eval(resp.body))
def test_render_response(self):
data = {'attribute': 'value'}
body = b'{"attribute": "value"}'
resp = wsgi.render_response(body=data)
self.assertEqual('200 OK', resp.status)
self.assertEqual(http_client.OK, resp.status_int)
self.assertEqual(body, resp.body)
self.assertEqual('X-Auth-Token', resp.headers.get('Vary'))
self.assertEqual(str(len(body)), resp.headers.get('Content-Length'))
def test_render_response_custom_status(self):
resp = wsgi.render_response(
status=(http_client.NOT_IMPLEMENTED, 'Not Implemented'))
self.assertEqual('501 Not Implemented', resp.status)
self.assertEqual(http_client.NOT_IMPLEMENTED, resp.status_int)
def test_successful_require_attribute(self):
app = FakeAttributeCheckerApp()
req = self._make_request(url='/?1=2')
resp = req.get_response(app)
app.assert_attribute(resp.body, '1')
def test_require_attribute_fail_if_attribute_not_present(self):
app = FakeAttributeCheckerApp()
req = self._make_request(url='/?1=2')
resp = req.get_response(app)
self.assertRaises(exception.ValidationError,
app.assert_attribute, resp.body, 'a')
def test_successful_require_multiple_attributes(self):
app = FakeAttributeCheckerApp()
req = self._make_request(url='/?a=1&b=2')
resp = req.get_response(app)
app.assert_attributes(resp.body, ['a', 'b'])
def test_attribute_missing_from_request(self):
app = FakeAttributeCheckerApp()
req = self._make_request(url='/?a=1&b=2')
resp = req.get_response(app)
ex = self.assertRaises(exception.ValidationError,
app.assert_attributes,
resp.body, ['a', 'missing_attribute'])
self.assertThat(six.text_type(ex),
matchers.Contains('missing_attribute'))
def test_no_required_attributes_present(self):
app = FakeAttributeCheckerApp()
req = self._make_request(url='/')
resp = req.get_response(app)
ex = self.assertRaises(exception.ValidationError,
app.assert_attributes, resp.body,
['missing_attribute1', 'missing_attribute2'])
self.assertThat(six.text_type(ex),
matchers.Contains('missing_attribute1'))
self.assertThat(six.text_type(ex),
matchers.Contains('missing_attribute2'))
def test_render_response_custom_headers(self):
resp = wsgi.render_response(headers=[('Custom-Header', 'Some-Value')])
self.assertEqual('Some-Value', resp.headers.get('Custom-Header'))
self.assertEqual('X-Auth-Token', resp.headers.get('Vary'))
def test_render_response_non_str_headers_converted(self):
resp = wsgi.render_response(
headers=[('Byte-Header', 'Byte-Value'),
(u'Unicode-Header', u'Unicode-Value')])
# assert that all headers are identified.
self.assertThat(resp.headers, matchers.HasLength(4))
self.assertEqual('Unicode-Value', resp.headers.get('Unicode-Header'))
# assert that unicode value is converted, the expected type is str
# on both python2 and python3.
self.assertEqual(str,
type(resp.headers.get('Unicode-Header')))
def test_render_response_no_body(self):
resp = wsgi.render_response()
self.assertEqual('204 No Content', resp.status)
self.assertEqual(http_client.NO_CONTENT, resp.status_int)
self.assertEqual(b'', resp.body)
self.assertEqual('0', resp.headers.get('Content-Length'))
self.assertIsNone(resp.headers.get('Content-Type'))
def test_render_response_head_with_body(self):
resp = wsgi.render_response({'id': uuid.uuid4().hex}, method='HEAD')
self.assertEqual(http_client.OK, resp.status_int)
self.assertEqual(b'', resp.body)
self.assertNotEqual('0', resp.headers.get('Content-Length'))
self.assertEqual('application/json', resp.headers.get('Content-Type'))
def test_application_local_config(self):
class FakeApp(wsgi.Application):
def __init__(self, *args, **kwargs):
self.kwargs = kwargs
app = FakeApp.factory({}, testkey="test")
self.assertIn("testkey", app.kwargs)
self.assertEqual("test", app.kwargs["testkey"])
def test_render_exception(self):
e = exception.Unauthorized(message=u'\u7f51\u7edc')
resp = wsgi.render_exception(e)
self.assertEqual(http_client.UNAUTHORIZED, resp.status_int)
def test_render_exception_host(self):
e = exception.Unauthorized(message=u'\u7f51\u7edc')
context = {'host_url': 'http://%s:5000' % uuid.uuid4().hex}
resp = wsgi.render_exception(e, context=context)
self.assertEqual(http_client.UNAUTHORIZED, resp.status_int)
def test_improperly_encoded_params(self):
class FakeApp(wsgi.Application):
def index(self, context):
return context['query_string']
# this is high bit set ASCII, copy & pasted from Windows.
# aka code page 1252. It is not valid UTF8.
req = self._make_request(url='/?name=nonexit%E8nt')
self.assertRaises(exception.ValidationError, req.get_response,
FakeApp())
def test_properly_encoded_params(self):
class FakeApp(wsgi.Application):
def index(self, context):
return context['query_string']
# nonexitènt encoded as UTF-8
req = self._make_request(url='/?name=nonexit%C3%A8nt')
resp = req.get_response(FakeApp())
self.assertEqual({'name': u'nonexit\xe8nt'},
jsonutils.loads(resp.body))
class ExtensionRouterTest(BaseWSGITest):
def test_extensionrouter_local_config(self):
class FakeRouter(wsgi.ExtensionRouter):
def __init__(self, *args, **kwargs):
self.kwargs = kwargs
factory = FakeRouter.factory({}, testkey="test")
app = factory(self.app)
self.assertIn("testkey", app.kwargs)
self.assertEqual("test", app.kwargs["testkey"])
class MiddlewareTest(BaseWSGITest):
def test_middleware_request(self):
class FakeMiddleware(wsgi.Middleware):
def process_request(self, req):
req.environ['fake_request'] = True
return req
req = self._make_request()
resp = FakeMiddleware(None)(req)
self.assertIn('fake_request', resp.environ)
def test_middleware_response(self):
class FakeMiddleware(wsgi.Middleware):
def process_response(self, request, response):
response.environ = {}
response.environ['fake_response'] = True
return response
req = self._make_request()
resp = FakeMiddleware(self.app)(req)
self.assertIn('fake_response', resp.environ)
def test_middleware_bad_request(self):
class FakeMiddleware(wsgi.Middleware):
def process_response(self, request, response):
raise exception.Unauthorized()
req = self._make_request()
req.environ['REMOTE_ADDR'] = '127.0.0.1'
resp = FakeMiddleware(self.app)(req)
self.assertEqual(exception.Unauthorized.code, resp.status_int)
def test_middleware_type_error(self):
class FakeMiddleware(wsgi.Middleware):
def process_response(self, request, response):
raise TypeError()
req = self._make_request()
req.environ['REMOTE_ADDR'] = '127.0.0.1'
resp = FakeMiddleware(self.app)(req)
# This is a validationerror type
self.assertEqual(exception.ValidationError.code, resp.status_int)
def test_middleware_exception_error(self):
exception_str = b'EXCEPTIONERROR'
class FakeMiddleware(wsgi.Middleware):
def process_response(self, request, response):
raise exception.UnexpectedError(exception_str)
def do_request():
req = self._make_request()
resp = FakeMiddleware(self.app)(req)
self.assertEqual(exception.UnexpectedError.code, resp.status_int)
return resp
# Exception data should not be in the message when insecure_debug is
# False
self.config_fixture.config(debug=False, insecure_debug=False)
self.assertNotIn(exception_str, do_request().body)
# Exception data should be in the message when insecure_debug is True
self.config_fixture.config(debug=True, insecure_debug=True)
self.assertIn(exception_str, do_request().body)
class LocalizedResponseTest(unit.TestCase):
def test_request_match_default(self):
# The default language if no Accept-Language is provided is None
req = webob.Request.blank('/')
self.assertIsNone(wsgi.best_match_language(req))
@mock.patch.object(oslo_i18n, 'get_available_languages')
def test_request_match_language_expected(self, mock_gal):
# If Accept-Language is a supported language, best_match_language()
# returns it.
language = uuid.uuid4().hex
mock_gal.return_value = [language]
req = webob.Request.blank('/', headers={'Accept-Language': language})
self.assertEqual(language, wsgi.best_match_language(req))
@mock.patch.object(oslo_i18n, 'get_available_languages')
def test_request_match_language_unexpected(self, mock_gal):
# If Accept-Language is a language we do not support,
# best_match_language() returns None.
supported_language = uuid.uuid4().hex
mock_gal.return_value = [supported_language]
request_language = uuid.uuid4().hex
req = webob.Request.blank(
'/', headers={'Accept-Language': request_language})
self.assertIsNone(wsgi.best_match_language(req))
def test_static_translated_string_is_lazy_translatable(self):
# Statically created message strings are an object that can get
# lazy-translated rather than a regular string.
self.assertNotEqual(six.text_type,
type(exception.Unauthorized.message_format))
@mock.patch.object(oslo_i18n, 'get_available_languages')
def test_get_localized_response(self, mock_gal):
# If the request has the Accept-Language set to a supported language
# and an exception is raised by the application that is translatable
# then the response will have the translated message.
language = uuid.uuid4().hex
mock_gal.return_value = [language]
# The arguments for the xlated message format have to match the args
# for the chosen exception (exception.NotFound)
xlated_msg_fmt = "Xlated NotFound, %(target)s."
# Fake out gettext.translation() to return a translator for our
# expected language and a passthrough translator for other langs.
def fake_translation(*args, **kwargs):
class IdentityTranslator(object):
def ugettext(self, msgid):
return msgid
gettext = ugettext
class LangTranslator(object):
def ugettext(self, msgid):
if msgid == exception.NotFound.message_format:
return xlated_msg_fmt
return msgid
gettext = ugettext
if language in kwargs.get('languages', []):
return LangTranslator()
return IdentityTranslator()
with mock.patch.object(gettext, 'translation',
side_effect=fake_translation) as xlation_mock:
target = uuid.uuid4().hex
# Fake app raises NotFound exception to simulate Keystone raising.
class FakeApp(wsgi.Application):
def index(self, context):
raise exception.NotFound(target=target)
# Make the request with Accept-Language on the app, expect an error
# response with the translated message.
req = webob.Request.blank('/')
args = {'action': 'index', 'controller': None}
req.environ['wsgiorg.routing_args'] = [None, args]
req.headers['Accept-Language'] = language
resp = req.get_response(FakeApp())
# Assert that the translated message appears in the response.
exp_msg = xlated_msg_fmt % dict(target=target)
self.assertThat(resp.json['error']['message'],
matchers.Equals(exp_msg))
self.assertThat(xlation_mock.called, matchers.Equals(True))
class ServerTest(unit.TestCase):
def setUp(self):
super(ServerTest, self).setUp()
self.host = '127.0.0.1'
self.port = '1234'
@mock.patch('eventlet.listen')
@mock.patch('socket.getaddrinfo')
def test_keepalive_unset(self, mock_getaddrinfo, mock_listen):
mock_getaddrinfo.return_value = [(1, 2, 3, 4, 5)]
mock_sock_dup = mock_listen.return_value.dup.return_value
server = environment.Server(mock.MagicMock(), host=self.host,
port=self.port)
server.start()
self.addCleanup(server.stop)
self.assertTrue(mock_listen.called)
self.assertFalse(mock_sock_dup.setsockopt.called)
@mock.patch('eventlet.listen')
@mock.patch('socket.getaddrinfo')
def test_keepalive_set(self, mock_getaddrinfo, mock_listen):
mock_getaddrinfo.return_value = [(1, 2, 3, 4, 5)]
mock_sock_dup = mock_listen.return_value.dup.return_value
server = environment.Server(mock.MagicMock(), host=self.host,
port=self.port, keepalive=True)
server.start()
self.addCleanup(server.stop)
mock_sock_dup.setsockopt.assert_called_once_with(socket.SOL_SOCKET,
socket.SO_KEEPALIVE,
1)
self.assertTrue(mock_listen.called)
@mock.patch('eventlet.listen')
@mock.patch('socket.getaddrinfo')
def test_keepalive_and_keepidle_set(self, mock_getaddrinfo, mock_listen):
mock_getaddrinfo.return_value = [(1, 2, 3, 4, 5)]
mock_sock_dup = mock_listen.return_value.dup.return_value
server = environment.Server(mock.MagicMock(), host=self.host,
port=self.port, keepalive=True,
keepidle=1)
server.start()
self.addCleanup(server.stop)
if hasattr(socket, 'TCP_KEEPIDLE'):
self.assertEqual(2, mock_sock_dup.setsockopt.call_count)
# Test the last set of call args i.e. for the keepidle
mock_sock_dup.setsockopt.assert_called_with(socket.IPPROTO_TCP,
socket.TCP_KEEPIDLE,
1)
else:
self.assertEqual(1, mock_sock_dup.setsockopt.call_count)
self.assertTrue(mock_listen.called)
def test_client_socket_timeout(self):
# mocking server method of eventlet.wsgi to check it is called with
# configured 'client_socket_timeout' value.
for socket_timeout in range(1, 10):
self.config_fixture.config(group='eventlet_server',
client_socket_timeout=socket_timeout)
server = environment.Server(mock.MagicMock(), host=self.host,
port=self.port)
with mock.patch.object(eventlet.wsgi, 'server') as mock_server:
fake_application = uuid.uuid4().hex
fake_socket = uuid.uuid4().hex
server._run(fake_application, fake_socket)
mock_server.assert_called_once_with(
fake_socket,
fake_application,
debug=mock.ANY,
socket_timeout=socket_timeout,
log=mock.ANY,
keepalive=mock.ANY)
def test_wsgi_keep_alive(self):
# mocking server method of eventlet.wsgi to check it is called with
# configured 'wsgi_keep_alive' value.
wsgi_keepalive = False
self.config_fixture.config(group='eventlet_server',
wsgi_keep_alive=wsgi_keepalive)
server = environment.Server(mock.MagicMock(), host=self.host,
port=self.port)
with mock.patch.object(eventlet.wsgi, 'server') as mock_server:
fake_application = uuid.uuid4().hex
fake_socket = uuid.uuid4().hex
server._run(fake_application, fake_socket)
mock_server.assert_called_once_with(fake_socket,
fake_application,
debug=mock.ANY,
socket_timeout=mock.ANY,
log=mock.ANY,
keepalive=wsgi_keepalive)