diff --git a/keystone/middleware/core.py b/keystone/middleware/core.py index 8e206ddef4..c6324f3b9c 100644 --- a/keystone/middleware/core.py +++ b/keystone/middleware/core.py @@ -80,10 +80,11 @@ class JsonBodyMiddleware(wsgi.Middleware): an underscore. """ - def process_request(self, request): - #if 'json' not in request.params: - # return + # Ignore unrecognized content types. Empty string indicates + # the client did not explicitly set the header + if not request.content_type in ('application/json', ''): + return params_json = request.body if not params_json: @@ -92,6 +93,9 @@ class JsonBodyMiddleware(wsgi.Middleware): params_parsed = {} try: params_parsed = json.loads(params_json) + except ValueError: + msg = "Malformed json in request body" + raise webob.exc.HTTPBadRequest(explanation=msg) finally: if not params_parsed: params_parsed = {} diff --git a/tests/test_middleware.py b/tests/test_middleware.py new file mode 100644 index 0000000000..685853ab24 --- /dev/null +++ b/tests/test_middleware.py 
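Review bot: The content-type gate added at the top of `process_request` compares the header value exactly, so unless webob normalises it, a media type that differs only in case (for example `Application/JSON`) takes the early `return` and its body is dropped with no error to the client. Media types are case-insensitive, so normalise before comparing, and use the idiomatic `not in` spelling: `if request.content_type.lower() not in ('application/json', ''):`. Whether parameters such as `; charset=utf-8` are already stripped depends on the installed webob's `content_type` accessor, which a test should confirm. The body of `process_request` continues outside this hunk.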
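Review bot: The new `except ValueError` rejects malformed JSON, but a body that is valid JSON and not an object (`[1]`, `"x"`, `42`) still gets through, because the `finally` clause only replaces falsy results with `{}`. If the code after this hunk treats `params_parsed` as a mapping (not visible here), such a body would surface as an unhandled server error rather than a 400. A type check after the `try` block would close that gap: `if not isinstance(params_parsed, dict): raise webob.exc.HTTPBadRequest(explanation="JSON body must be an object")`. The rest of `process_request` lies outside the hunk.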
@@ -0,0 +1,76 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+import webob
+
+from keystone import config
+from keystone import middleware
+from keystone import test
+
+
+CONF = config.CONF
+
+
+def make_request(**kwargs):
+ return webob.Request.blank('/', **kwargs)
+
+
+class TokenAuthMiddlewareTest(test.TestCase):
+ def test_request(self):
+ req = make_request()
+ req.headers[middleware.AUTH_TOKEN_HEADER] = 'MAGIC'
+ middleware.TokenAuthMiddleware(None).process_request(req)
+ context = req.environ[middleware.CONTEXT_ENV]
+ self.assertEqual(context['token_id'], 'MAGIC')
+
+
+class AdminTokenAuthMiddlewareTest(test.TestCase):
+ def test_request_admin(self):
+ req = make_request()
+ req.headers[middleware.AUTH_TOKEN_HEADER] = CONF.admin_token
+ middleware.AdminTokenAuthMiddleware(None).process_request(req)
+ context = req.environ[middleware.CONTEXT_ENV]
+ self.assertTrue(context['is_admin'])
+
+ def test_request_non_admin(self):
+ req = make_request()
+ req.headers[middleware.AUTH_TOKEN_HEADER] = 'NOT-ADMIN'
+ middleware.AdminTokenAuthMiddleware(None).process_request(req)
+ context = req.environ[middleware.CONTEXT_ENV]
+ self.assertFalse(context['is_admin'])
+
+
+class PostParamsMiddlewareTest(test.TestCase):
+ def test_request_with_params(self):
+ req = make_request(POST={"arg1": "one"})
+ middleware.PostParamsMiddleware(None).process_request(req)
+ params = req.environ[middleware.PARAMS_ENV]
+ self.assertEqual(params, {"arg1": "one"})
+
+
+class JsonBodyMiddlewareTest(test.TestCase):
+ def test_request_with_params(self):
+ req = make_request(body='{"arg1": "one", "arg2": ["a"]}',
+ content_type='application/json')
+ middleware.JsonBodyMiddleware(None).process_request(req)
+ params = req.environ[middleware.PARAMS_ENV]
+ self.assertEqual(params, {"arg1": "one", "arg2": ["a"]})
+
+ def test_malformed_json(self):
+ req = make_request(body='{"arg1": "on',
+ content_type='application/json')
+ _middleware = middleware.JsonBodyMiddleware(None)
+ self.assertRaises(webob.exc.HTTPBadRequest,
+ _middleware.process_request, req)
+
+ def test_no_content_type(self):
+ req = make_request(body='{"arg1": "one", "arg2": ["a"]}')
+ middleware.JsonBodyMiddleware(None).process_request(req)
+ params = req.environ[middleware.PARAMS_ENV]
+ self.assertEqual(params, {"arg1": "one", "arg2": ["a"]})
+
+ def test_unrecognized_content_type(self):
+ req = make_request(body='{"arg1": "one", "arg2": ["a"]}',
+ content_type='text/plain')
+ middleware.JsonBodyMiddleware(None).process_request(req)
+ params = req.environ.get(middleware.PARAMS_ENV, {})
+ self.assertEqual(params, {})
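Review bot: `test_unrecognized_content_type` reads the result with `req.environ.get(middleware.PARAMS_ENV, {})`, so it passes both when the middleware skips the body and when it parses it and stores an empty dict. If the intent is that a `text/plain` body is never parsed, assert the key is absent instead: `self.assertTrue(middleware.PARAMS_ENV not in req.environ)`. The JSON cases also leave two input-handling paths untested: a content type carrying a parameter or different case (`application/json; charset=UTF-8`), and a body that is valid JSON but not an object (for example `'["a"]'`).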