From 44c44a350592b6e47f53b70527d68726d3e51aec Mon Sep 17 00:00:00 2001 From: Mike Fedosin Date: Wed, 22 Mar 2017 23:09:08 +0300 Subject: [PATCH] Add unit tests for trusted auth middleware Change-Id: Icc96792d7da61d2082aa289173eaa7705a615533 --- glare/api/middleware/context.py | 30 ++-- .../unit/middleware/test_trusted_auth.py | 144 ++++++++++++++++++ 2 files changed, 162 insertions(+), 12 deletions(-) create mode 100644 glare/tests/unit/middleware/test_trusted_auth.py diff --git a/glare/api/middleware/context.py b/glare/api/middleware/context.py index 25acebe..1c486a9 100644 --- a/glare/api/middleware/context.py +++ b/glare/api/middleware/context.py @@ -132,22 +132,28 @@ class TrustedAuthMiddleware(BaseContextMiddleware): msg = _("Auth token must be provided") raise exception.Unauthorized(msg) try: - user, tenant, roles = auth_token.split(':') + user, tenant, roles = auth_token.strip().split(':', 3) except ValueError: msg = _("Wrong auth token format. It must be 'user:tenant:roles'") raise exception.Unauthorized(msg) - if tenant.lower() == 'none': + if not tenant: + msg = _("Tenant must be specified in auth token. " + "Format of the token is 'user:tenant:roles'") + raise exception.Unauthorized(msg) + elif tenant.lower() == 'none': tenant = None + req.headers['X-Identity-Status'] = 'Nope' + else: + req.headers['X-Identity-Status'] = 'Confirmed' + req.headers['X-User-Id'] = user req.headers['X-Tenant-Id'] = tenant - req.headers['X-Roles'] = roles.split(',') - req.headers['X-Identity-Status'] = 'Confirmed' - kwargs = { - 'user': user, - 'tenant': tenant, - 'roles': roles, - 'is_admin': 'admin' in req.headers['X-Roles'], - 'auth_token': auth_token, - } + req.headers['X-Roles'] = roles - req.context = context.RequestContext(**kwargs) + if req.headers.get('X-Identity-Status') == 'Confirmed': + kwargs = {'request_id': req.environ.get(request_id.ENV_REQUEST_ID)} + req.context = RequestContext.from_environ(req.environ, **kwargs) + elif CONF.allow_anonymous_access: + req.context = RequestContext(read_only=True, is_admin=False) + else: + raise exception.Unauthorized() diff --git a/glare/tests/unit/middleware/test_trusted_auth.py b/glare/tests/unit/middleware/test_trusted_auth.py new file mode 100644 index 0000000..6a2c018 --- /dev/null +++ b/glare/tests/unit/middleware/test_trusted_auth.py @@ -0,0 +1,144 @@ +# Copyright 2017 - Nokia Networks +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import webob + +from glare.api.middleware import context +from glare.common import exception as exc +from glare.tests.unit import base + + +class TestTrustedAuthMiddleware(base.BaseTestCase): + def _build_request(self, token): + req = webob.Request.blank("/") + req.headers["x-auth-token"] = token + req.get_response = lambda app: None + return req + + def _build_middleware(self): + return context.TrustedAuthMiddleware(None) + + def test_header_parsing(self): + token = 'user1:tenant1:role1,role2' + req = self._build_request(token) + self._build_middleware().process_request(req) + + self.assertEqual("Confirmed", req.headers["X-Identity-Status"]) + self.assertEqual("user1", req.headers["X-User-Id"]) + self.assertEqual("tenant1", req.headers["X-Tenant-Id"]) + self.assertEqual("role1,role2", req.headers["X-Roles"]) + + self.assertEqual(token, req.context.auth_token) + self.assertEqual('user1', req.context.user) + self.assertEqual('tenant1', req.context.tenant) + self.assertEqual(['role1', 'role2'], req.context.roles) + + def test_wrong_format(self): + req = self._build_request('WRONG_FORMAT') + middleware = self._build_middleware() + self.assertRaises(exc.Unauthorized, + middleware.process_request, req) + + req = self._build_request('user1:tenant1:role1:role2') + self.assertRaises(exc.Unauthorized, + middleware.process_request, req) + + def test_no_tenant(self): + req = self._build_request('user1::role') + middleware = self._build_middleware() + self.assertRaises(exc.Unauthorized, + middleware.process_request, req) + + def test_no_roles(self): + # stripping extra spaces in request + req = self._build_request('user1:tenant1:') + self._build_middleware().process_request(req) + self.assertFalse(req.context.is_admin) + self.assertEqual('user1', req.context.user) + self.assertEqual("user1", req.headers["X-User-Id"]) + self.assertEqual("", req.headers["X-Roles"]) + self.assertEqual([], req.context.roles) + + def test_is_admin_flag(self): + # is_admin check should look for 'admin' role by default + req = self._build_request('user1:tenant1:role1,admin') + self._build_middleware().process_request(req) + self.assertTrue(req.context.is_admin) + + # without the 'admin' role, is_admin should be False + req = self._build_request('user1:tenant1:role1,role2') + self._build_middleware().process_request(req) + self.assertFalse(req.context.is_admin) + + # if we change the admin_role attribute, we should be able to use it + req = self._build_request('user1:tenant1:role1,role2') + self.policy(context_is_admin='role:role1') + self._build_middleware().process_request(req) + self.assertTrue(req.context.is_admin) + + def test_roles_case_insensitive(self): + # accept role from request + req = self._build_request('user1:tenant1:Admin,role2') + self._build_middleware().process_request(req) + self.assertTrue(req.context.is_admin) + + # accept role from config + req = self._build_request('user1:tenant1:role1,role2') + self.policy(context_is_admin='role:rOLe1') + self._build_middleware().process_request(req) + self.assertTrue(req.context.is_admin) + + def test_token_stripping(self): + # stripping extra spaces in request + req = self._build_request(' user1:tenant1:role1\t') + self.policy(context_is_admin='role:role1') + self._build_middleware().process_request(req) + self.assertTrue(req.context.is_admin) + self.assertEqual('user1', req.context.user) + self.assertEqual("user1", req.headers["X-User-Id"]) + self.assertEqual("role1", req.headers["X-Roles"]) + + def test_anonymous_access_enabled(self): + req = self._build_request('user1:none:role1,role2') + self.config(allow_anonymous_access=True) + middleware = self._build_middleware() + middleware.process_request(req) + self.assertIsNone(req.context.auth_token) + self.assertIsNone(req.context.user) + self.assertIsNone(req.context.tenant) + self.assertEqual([], req.context.roles) + self.assertFalse(req.context.is_admin) + self.assertTrue(req.context.read_only) + + def test_anonymous_access_defaults_to_disabled(self): + req = self._build_request('user1:none:role1,role2') + middleware = self._build_middleware() + self.assertRaises(exc.Unauthorized, + middleware.process_request, req) + + def test_response(self): + req = self._build_request('user1:tenant1:role1,role2') + req.context = context.RequestContext() + request_id = req.context.request_id + + resp = webob.Response() + resp.request = req + self._build_middleware().process_response(resp) + self.assertEqual(request_id, resp.headers['x-openstack-request-id']) + resp_req_id = resp.headers['x-openstack-request-id'] + # Validate that request-id do not starts with 'req-req-' + if isinstance(resp_req_id, bytes): + resp_req_id = resp_req_id.decode('utf-8') + self.assertFalse(resp_req_id.startswith('req-req-')) + self.assertTrue(resp_req_id.startswith('req-'))