From a2e7b17810ed34719dc101f93dc480e2f9fdce6e Mon Sep 17 00:00:00 2001 From: Bryan Davidson Date: Fri, 30 Aug 2013 12:31:12 -0400 Subject: [PATCH] Refactor for testability of an upcoming change confirm_token_not_expired() in keystoneclient/middleware/auth_token.py has been moved out of the class to make it a function and be more testable. Currently, there is no need to keep it within the class. An upcoming commit makes fixes that rely on this refactor to be tested. Change-Id: I8460a2ee663dec8be0f339735208779a3b988040 --- keystoneclient/middleware/auth_token.py | 48 ++++++++++--------- .../tests/test_auth_token_middleware.py | 20 ++++++++ 2 files changed, 45 insertions(+), 23 deletions(-) diff --git a/keystoneclient/middleware/auth_token.py b/keystoneclient/middleware/auth_token.py index 1dc0a7b17..616c18de4 100644 --- a/keystoneclient/middleware/auth_token.py +++ b/keystoneclient/middleware/auth_token.py @@ -309,6 +309,29 @@ def will_expire_soon(expiry): return expiry < soon +def _token_is_v2(token_info): + return ('access' in token_info) + + +def _token_is_v3(token_info): + return ('token' in token_info) + + +def confirm_token_not_expired(data): + if not data: + raise InvalidUserToken('Token authorization failed') + if _token_is_v2(data): + timestamp = data['access']['token']['expires'] + elif _token_is_v3(data): + timestamp = data['token']['expires_at'] + else: + raise InvalidUserToken('Token authorization failed') + expires = timeutils.parse_isotime(timestamp).strftime('%s') + if time.time() >= float(expires): + raise InvalidUserToken('Token authorization failed') + return expires + + def safe_quote(s): """URL-encode strings that are not already URL-encoded.""" return urllib.quote(s) if s == urllib.unquote(s) else s @@ -783,7 +806,7 @@ class AuthProtocol(object): data = jsonutils.loads(verified) else: data = self.verify_uuid_token(user_token, retry) - expires = self._confirm_token_not_expired(data) + expires = confirm_token_not_expired(data) self._cache_put(token_id, data, expires) return data except NetworkError: @@ -797,12 +820,6 @@ class AuthProtocol(object): self.LOG.warn("Authorization failed for token %s", token_id) raise InvalidUserToken('Token authorization failed') - def _token_is_v2(self, token_info): - return ('access' in token_info) - - def _token_is_v3(self, token_info): - return ('token' in token_info) - def _build_user_headers(self, token_info): """Convert token object into headers. @@ -846,7 +863,7 @@ class AuthProtocol(object): project_domain_id = None project_domain_name = None - if self._token_is_v2(token_info): + if _token_is_v2(token_info): user = token_info['access']['user'] token = token_info['access']['token'] roles = ','.join([role['name'] for role in user.get('roles', [])]) @@ -1016,21 +1033,6 @@ class AuthProtocol(object): data_to_store, timeout=self.token_cache_time) - def _confirm_token_not_expired(self, data): - if not data: - raise InvalidUserToken('Token authorization failed') - if self._token_is_v2(data): - timestamp = data['access']['token']['expires'] - elif self._token_is_v3(data): - timestamp = data['token']['expires_at'] - else: - raise InvalidUserToken('Token authorization failed') - expires = timeutils.parse_isotime(timestamp).strftime('%s') - if time.time() >= float(expires): - self.LOG.debug('Token expired a %s', timestamp) - raise InvalidUserToken('Token authorization failed') - return expires - def _cache_put(self, token_id, data, expires): """Put token data into the cache. diff --git a/keystoneclient/tests/test_auth_token_middleware.py b/keystoneclient/tests/test_auth_token_middleware.py index 06e760989..dc5c884e5 100644 --- a/keystoneclient/tests/test_auth_token_middleware.py +++ b/keystoneclient/tests/test_auth_token_middleware.py @@ -701,6 +701,26 @@ class CommonAuthTokenMiddlewareTest(object): seconds=40) self.assertFalse(auth_token.will_expire_soon(fortyseconds)) + def test_token_is_v2_accepts_v2(self): + token = client_fixtures.UUID_TOKEN_DEFAULT + token_response = client_fixtures.TOKEN_RESPONSES[token] + self.assertTrue(auth_token._token_is_v2(token_response)) + + def test_token_is_v2_rejects_v3(self): + token = client_fixtures.v3_UUID_TOKEN_DEFAULT + token_response = client_fixtures.TOKEN_RESPONSES[token] + self.assertFalse(auth_token._token_is_v2(token_response)) + + def test_token_is_v3_rejects_v2(self): + token = client_fixtures.UUID_TOKEN_DEFAULT + token_response = client_fixtures.TOKEN_RESPONSES[token] + self.assertFalse(auth_token._token_is_v3(token_response)) + + def test_token_is_v3_accepts_v3(self): + token = client_fixtures.v3_UUID_TOKEN_DEFAULT + token_response = client_fixtures.TOKEN_RESPONSES[token] + self.assertTrue(auth_token._token_is_v3(token_response)) + def test_encrypt_cache_data(self): httpretty.disable() conf = {