Merge "Refactor for testability of an upcoming change"

This commit is contained in:
Jenkins
2013-10-07 17:17:57 +00:00
committed by Gerrit Code Review
2 changed files with 45 additions and 23 deletions

View File

@@ -309,6 +309,29 @@ def will_expire_soon(expiry):
return expiry < soon
def _token_is_v2(token_info):
return ('access' in token_info)
def _token_is_v3(token_info):
return ('token' in token_info)
def confirm_token_not_expired(data):
if not data:
raise InvalidUserToken('Token authorization failed')
if _token_is_v2(data):
timestamp = data['access']['token']['expires']
elif _token_is_v3(data):
timestamp = data['token']['expires_at']
else:
raise InvalidUserToken('Token authorization failed')
expires = timeutils.parse_isotime(timestamp).strftime('%s')
if time.time() >= float(expires):
raise InvalidUserToken('Token authorization failed')
return expires
def safe_quote(s):
"""URL-encode strings that are not already URL-encoded."""
return urllib.quote(s) if s == urllib.unquote(s) else s
@@ -783,7 +806,7 @@ class AuthProtocol(object):
data = jsonutils.loads(verified)
else:
data = self.verify_uuid_token(user_token, retry)
expires = self._confirm_token_not_expired(data)
expires = confirm_token_not_expired(data)
self._cache_put(token_id, data, expires)
return data
except NetworkError:
@@ -797,12 +820,6 @@ class AuthProtocol(object):
self.LOG.warn("Authorization failed for token %s", token_id)
raise InvalidUserToken('Token authorization failed')
def _token_is_v2(self, token_info):
return ('access' in token_info)
def _token_is_v3(self, token_info):
return ('token' in token_info)
def _build_user_headers(self, token_info):
"""Convert token object into headers.
@@ -846,7 +863,7 @@ class AuthProtocol(object):
project_domain_id = None
project_domain_name = None
if self._token_is_v2(token_info):
if _token_is_v2(token_info):
user = token_info['access']['user']
token = token_info['access']['token']
roles = ','.join([role['name'] for role in user.get('roles', [])])
@@ -1019,21 +1036,6 @@ class AuthProtocol(object):
data_to_store,
timeout=self.token_cache_time)
def _confirm_token_not_expired(self, data):
if not data:
raise InvalidUserToken('Token authorization failed')
if self._token_is_v2(data):
timestamp = data['access']['token']['expires']
elif self._token_is_v3(data):
timestamp = data['token']['expires_at']
else:
raise InvalidUserToken('Token authorization failed')
expires = timeutils.parse_isotime(timestamp).strftime('%s')
if time.time() >= float(expires):
self.LOG.debug('Token expired a %s', timestamp)
raise InvalidUserToken('Token authorization failed')
return expires
def _cache_put(self, token_id, data, expires):
"""Put token data into the cache.

View File

@@ -704,6 +704,26 @@ class CommonAuthTokenMiddlewareTest(object):
seconds=40)
self.assertFalse(auth_token.will_expire_soon(fortyseconds))
def test_token_is_v2_accepts_v2(self):
token = client_fixtures.UUID_TOKEN_DEFAULT
token_response = client_fixtures.TOKEN_RESPONSES[token]
self.assertTrue(auth_token._token_is_v2(token_response))
def test_token_is_v2_rejects_v3(self):
token = client_fixtures.v3_UUID_TOKEN_DEFAULT
token_response = client_fixtures.TOKEN_RESPONSES[token]
self.assertFalse(auth_token._token_is_v2(token_response))
def test_token_is_v3_rejects_v2(self):
token = client_fixtures.UUID_TOKEN_DEFAULT
token_response = client_fixtures.TOKEN_RESPONSES[token]
self.assertFalse(auth_token._token_is_v3(token_response))
def test_token_is_v3_accepts_v3(self):
token = client_fixtures.v3_UUID_TOKEN_DEFAULT
token_response = client_fixtures.TOKEN_RESPONSES[token]
self.assertTrue(auth_token._token_is_v3(token_response))
def test_encrypt_cache_data(self):
httpretty.disable()
conf = {