Refactor for testability of an upcoming change

confirm_token_not_expired() in keystoneclient/middleware/auth_token.py has
been moved out of the class to make it a function and be more testable.
Currently, there is no need to keep it within the class. An upcoming
commit makes fixes that rely on this refactor to be tested.

Change-Id: I8460a2ee663dec8be0f339735208779a3b988040
This commit is contained in:
Bryan Davidson
2013-08-30 12:31:12 -04:00
parent baa949017a
commit a2e7b17810
2 changed files with 45 additions and 23 deletions

View File

@@ -309,6 +309,29 @@ def will_expire_soon(expiry):
return expiry < soon return expiry < soon
def _token_is_v2(token_info):
return ('access' in token_info)
def _token_is_v3(token_info):
return ('token' in token_info)
def confirm_token_not_expired(data):
if not data:
raise InvalidUserToken('Token authorization failed')
if _token_is_v2(data):
timestamp = data['access']['token']['expires']
elif _token_is_v3(data):
timestamp = data['token']['expires_at']
else:
raise InvalidUserToken('Token authorization failed')
expires = timeutils.parse_isotime(timestamp).strftime('%s')
if time.time() >= float(expires):
raise InvalidUserToken('Token authorization failed')
return expires
def safe_quote(s): def safe_quote(s):
"""URL-encode strings that are not already URL-encoded.""" """URL-encode strings that are not already URL-encoded."""
return urllib.quote(s) if s == urllib.unquote(s) else s return urllib.quote(s) if s == urllib.unquote(s) else s
@@ -783,7 +806,7 @@ class AuthProtocol(object):
data = jsonutils.loads(verified) data = jsonutils.loads(verified)
else: else:
data = self.verify_uuid_token(user_token, retry) data = self.verify_uuid_token(user_token, retry)
expires = self._confirm_token_not_expired(data) expires = confirm_token_not_expired(data)
self._cache_put(token_id, data, expires) self._cache_put(token_id, data, expires)
return data return data
except NetworkError: except NetworkError:
@@ -797,12 +820,6 @@ class AuthProtocol(object):
self.LOG.warn("Authorization failed for token %s", token_id) self.LOG.warn("Authorization failed for token %s", token_id)
raise InvalidUserToken('Token authorization failed') raise InvalidUserToken('Token authorization failed')
def _token_is_v2(self, token_info):
return ('access' in token_info)
def _token_is_v3(self, token_info):
return ('token' in token_info)
def _build_user_headers(self, token_info): def _build_user_headers(self, token_info):
"""Convert token object into headers. """Convert token object into headers.
@@ -846,7 +863,7 @@ class AuthProtocol(object):
project_domain_id = None project_domain_id = None
project_domain_name = None project_domain_name = None
if self._token_is_v2(token_info): if _token_is_v2(token_info):
user = token_info['access']['user'] user = token_info['access']['user']
token = token_info['access']['token'] token = token_info['access']['token']
roles = ','.join([role['name'] for role in user.get('roles', [])]) roles = ','.join([role['name'] for role in user.get('roles', [])])
@@ -1016,21 +1033,6 @@ class AuthProtocol(object):
data_to_store, data_to_store,
timeout=self.token_cache_time) timeout=self.token_cache_time)
def _confirm_token_not_expired(self, data):
if not data:
raise InvalidUserToken('Token authorization failed')
if self._token_is_v2(data):
timestamp = data['access']['token']['expires']
elif self._token_is_v3(data):
timestamp = data['token']['expires_at']
else:
raise InvalidUserToken('Token authorization failed')
expires = timeutils.parse_isotime(timestamp).strftime('%s')
if time.time() >= float(expires):
self.LOG.debug('Token expired a %s', timestamp)
raise InvalidUserToken('Token authorization failed')
return expires
def _cache_put(self, token_id, data, expires): def _cache_put(self, token_id, data, expires):
"""Put token data into the cache. """Put token data into the cache.

View File

@@ -701,6 +701,26 @@ class CommonAuthTokenMiddlewareTest(object):
seconds=40) seconds=40)
self.assertFalse(auth_token.will_expire_soon(fortyseconds)) self.assertFalse(auth_token.will_expire_soon(fortyseconds))
def test_token_is_v2_accepts_v2(self):
token = client_fixtures.UUID_TOKEN_DEFAULT
token_response = client_fixtures.TOKEN_RESPONSES[token]
self.assertTrue(auth_token._token_is_v2(token_response))
def test_token_is_v2_rejects_v3(self):
token = client_fixtures.v3_UUID_TOKEN_DEFAULT
token_response = client_fixtures.TOKEN_RESPONSES[token]
self.assertFalse(auth_token._token_is_v2(token_response))
def test_token_is_v3_rejects_v2(self):
token = client_fixtures.UUID_TOKEN_DEFAULT
token_response = client_fixtures.TOKEN_RESPONSES[token]
self.assertFalse(auth_token._token_is_v3(token_response))
def test_token_is_v3_accepts_v3(self):
token = client_fixtures.v3_UUID_TOKEN_DEFAULT
token_response = client_fixtures.TOKEN_RESPONSES[token]
self.assertTrue(auth_token._token_is_v3(token_response))
def test_encrypt_cache_data(self): def test_encrypt_cache_data(self):
httpretty.disable() httpretty.disable()
conf = { conf = {