Normalize datetimes to account for tz

This patch makes sure that datetimes in the auth_token middleware
are normalized to account for timezone offsets.

Some of the old tests were changed to ensure that the expires string
stored in the cache is in ISO 8601 format and not a random float.

Fixes bug 1195924

Change-Id: I5917ab728193cd2aa8784c4860a96cdc17f3d43f
This commit is contained in:
Bryan Davidson
2013-08-30 13:38:37 -04:00
parent e9fb6c7c8f
commit 93793cb396
2 changed files with 236 additions and 7 deletions

View File

@@ -326,10 +326,12 @@ def confirm_token_not_expired(data):
timestamp = data['token']['expires_at']
else:
raise InvalidUserToken('Token authorization failed')
expires = timeutils.parse_isotime(timestamp).strftime('%s')
if time.time() >= float(expires):
expires = timeutils.parse_isotime(timestamp)
expires = timeutils.normalize_time(expires)
utcnow = timeutils.utcnow()
if utcnow >= expires:
raise InvalidUserToken('Token authorization failed')
return expires
return timeutils.isotime(at=expires, subsecond=True)
def safe_quote(s):
@@ -998,7 +1000,18 @@ class AuthProtocol(object):
raise InvalidUserToken('Token authorization failed')
data, expires = cached
if ignore_expires or time.time() < float(expires):
try:
expires = timeutils.parse_isotime(expires)
except ValueError:
# Gracefully handle upgrade of expiration times from *nix
# timestamps to ISO 8601 formatted dates by ignoring old cached
# values.
return
expires = timeutils.normalize_time(expires)
utcnow = timeutils.utcnow()
if ignore_expires or utcnow < expires:
self.LOG.debug('Returning cached token %s', token_id)
return data
else:

View File

@@ -14,6 +14,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import calendar
import datetime
import iso8601
import os
@@ -733,7 +734,9 @@ class CommonAuthTokenMiddlewareTest(object):
}
self.set_middleware(conf=conf)
token = 'my_token'
data = ('this_data', 10e100)
some_time_later = timeutils.utcnow() + datetime.timedelta(hours=4)
expires = timeutils.strtime(some_time_later)
data = ('this_data', expires)
self.middleware._init_cache({})
self.middleware._cache_store(token, data)
self.assertEqual(self.middleware._cache_get(token), data[0])
@@ -747,7 +750,9 @@ class CommonAuthTokenMiddlewareTest(object):
}
self.set_middleware(conf=conf)
token = 'my_token'
data = ('this_data', 10e100)
some_time_later = timeutils.utcnow() + datetime.timedelta(hours=4)
expires = timeutils.strtime(some_time_later)
data = ('this_data', expires)
self.middleware._init_cache({})
self.middleware._cache_store(token, data)
self.assertEqual(self.middleware._cache_get(token), data[0])
@@ -760,7 +765,9 @@ class CommonAuthTokenMiddlewareTest(object):
}
self.set_middleware(conf=conf)
token = 'my_token'
data = ('this_data', 10e100)
some_time_later = timeutils.utcnow() + datetime.timedelta(hours=4)
expires = timeutils.strtime(some_time_later)
data = ('this_data', expires)
self.middleware._init_cache({})
self.middleware._cache_store(token, data)
self.assertEqual(self.middleware._cache_get(token), data[0])
@@ -1242,3 +1249,212 @@ class TokenEncodingTest(testtools.TestCase):
def test_quoted_token(self):
self.assertEqual('foo%20bar', auth_token.safe_quote('foo%20bar'))
class TokenExpirationTest(BaseAuthTokenMiddlewareTest):
def setUp(self):
super(TokenExpirationTest, self).setUp()
timeutils.set_time_override()
self.now = timeutils.utcnow()
self.delta = datetime.timedelta(hours=1)
self.one_hour_ago = timeutils.isotime(self.now - self.delta,
subsecond=True)
self.one_hour_earlier = timeutils.isotime(self.now + self.delta,
subsecond=True)
def tearDown(self):
super(TokenExpirationTest, self).tearDown()
timeutils.clear_time_override()
def create_v2_token_fixture(self, expires=None):
v2_fixture = {
'access': {
'token': {
'id': 'blah',
'expires': expires or self.one_hour_earlier,
'tenant': {
'id': 'tenant_id1',
'name': 'tenant_name1',
},
},
'user': {
'id': 'user_id1',
'name': 'user_name1',
'roles': [
{'name': 'role1'},
{'name': 'role2'},
],
},
'serviceCatalog': {}
},
}
return v2_fixture
def create_v3_token_fixture(self, expires=None):
v3_fixture = {
'token': {
'expires_at': expires or self.one_hour_earlier,
'user': {
'id': 'user_id1',
'name': 'user_name1',
'domain': {
'id': 'domain_id1',
'name': 'domain_name1'
}
},
'project': {
'id': 'tenant_id1',
'name': 'tenant_name1',
'domain': {
'id': 'domain_id1',
'name': 'domain_name1'
}
},
'roles': [
{'name': 'role1', 'id': 'Role1'},
{'name': 'role2', 'id': 'Role2'},
],
'catalog': {}
}
}
return v3_fixture
def test_no_data(self):
data = {}
self.assertRaises(auth_token.InvalidUserToken,
auth_token.confirm_token_not_expired,
data)
def test_bad_data(self):
data = {'my_happy_token_dict': 'woo'}
self.assertRaises(auth_token.InvalidUserToken,
auth_token.confirm_token_not_expired,
data)
def test_v2_token_not_expired(self):
data = self.create_v2_token_fixture()
expected_expires = data['access']['token']['expires']
actual_expires = auth_token.confirm_token_not_expired(data)
self.assertEqual(actual_expires, expected_expires)
def test_v2_token_expired(self):
data = self.create_v2_token_fixture(expires=self.one_hour_ago)
self.assertRaises(auth_token.InvalidUserToken,
auth_token.confirm_token_not_expired,
data)
def test_v2_token_with_timezone_offset_not_expired(self):
current_time = timeutils.parse_isotime('2000-01-01T00:01:10.000123Z')
current_time = timeutils.normalize_time(current_time)
timeutils.set_time_override(current_time)
data = self.create_v2_token_fixture(
expires='2000-01-01T00:05:10.000123-05:00')
expected_expires = '2000-01-01T05:05:10.000123Z'
actual_expires = auth_token.confirm_token_not_expired(data)
self.assertEqual(actual_expires, expected_expires)
def test_v2_token_with_timezone_offset_expired(self):
current_time = timeutils.parse_isotime('2000-01-01T00:01:10.000123Z')
current_time = timeutils.normalize_time(current_time)
timeutils.set_time_override(current_time)
data = self.create_v2_token_fixture(
expires='2000-01-01T00:05:10.000123+05:00')
data['access']['token']['expires'] = '2000-01-01T00:05:10.000123+05:00'
self.assertRaises(auth_token.InvalidUserToken,
auth_token.confirm_token_not_expired,
data)
def test_v3_token_not_expired(self):
data = self.create_v3_token_fixture()
expected_expires = data['token']['expires_at']
actual_expires = auth_token.confirm_token_not_expired(data)
self.assertEqual(actual_expires, expected_expires)
def test_v3_token_expired(self):
data = self.create_v3_token_fixture(expires=self.one_hour_ago)
self.assertRaises(auth_token.InvalidUserToken,
auth_token.confirm_token_not_expired,
data)
def test_v3_token_with_timezone_offset_not_expired(self):
current_time = timeutils.parse_isotime('2000-01-01T00:01:10.000123Z')
current_time = timeutils.normalize_time(current_time)
timeutils.set_time_override(current_time)
data = self.create_v3_token_fixture(
expires='2000-01-01T00:05:10.000123-05:00')
expected_expires = '2000-01-01T05:05:10.000123Z'
actual_expires = auth_token.confirm_token_not_expired(data)
self.assertEqual(actual_expires, expected_expires)
def test_v3_token_with_timezone_offset_expired(self):
current_time = timeutils.parse_isotime('2000-01-01T00:01:10.000123Z')
current_time = timeutils.normalize_time(current_time)
timeutils.set_time_override(current_time)
data = self.create_v3_token_fixture(
expires='2000-01-01T00:05:10.000123+05:00')
self.assertRaises(auth_token.InvalidUserToken,
auth_token.confirm_token_not_expired,
data)
def test_cached_token_not_expired(self):
token = 'mytoken'
data = 'this_data'
self.set_middleware()
self.middleware._init_cache({})
some_time_later = timeutils.strtime(at=(self.now + self.delta))
expires = some_time_later
self.middleware._cache_put(token, data, expires)
self.assertEqual(self.middleware._cache_get(token), data)
def test_cached_token_not_expired_with_old_style_nix_timestamp(self):
"""Ensure we cannot retrieve a token from the cache.
Getting a token from the cache should return None when the token data
in the cache stores the expires time as a *nix style timestamp.
"""
token = 'mytoken'
data = 'this_data'
self.set_middleware()
self.middleware._init_cache({})
some_time_later = self.now + self.delta
# Store a unix timestamp in the cache.
expires = calendar.timegm(some_time_later.timetuple())
self.middleware._cache_put(token, data, expires)
self.assertIsNone(self.middleware._cache_get(token))
def test_cached_token_expired(self):
token = 'mytoken'
data = 'this_data'
self.set_middleware()
self.middleware._init_cache({})
some_time_earlier = timeutils.strtime(at=(self.now - self.delta))
expires = some_time_earlier
self.middleware._cache_put(token, data, expires)
self.assertIsNone(self.middleware._cache_get(token))
def test_cached_token_with_timezone_offset_not_expired(self):
token = 'mytoken'
data = 'this_data'
self.set_middleware()
self.middleware._init_cache({})
timezone_offset = datetime.timedelta(hours=2)
some_time_later = self.now - timezone_offset + self.delta
expires = timeutils.strtime(some_time_later) + '-02:00'
self.middleware._cache_put(token, data, expires)
self.assertEqual(self.middleware._cache_get(token), data)
def test_cached_token_with_timezone_offset_expired(self):
token = 'mytoken'
data = 'this_data'
self.set_middleware()
self.middleware._init_cache({})
timezone_offset = datetime.timedelta(hours=2)
some_time_earlier = self.now - timezone_offset - self.delta
expires = timeutils.strtime(some_time_earlier) + '-02:00'
self.middleware._cache_put(token, data, expires)
self.assertIsNone(self.middleware._cache_get(token))