Merge "Normalize datetimes to account for tz"

This commit is contained in:
Jenkins
2013-10-09 17:23:18 +00:00
committed by Gerrit Code Review
2 changed files with 236 additions and 7 deletions

View File

@@ -326,10 +326,12 @@ def confirm_token_not_expired(data):
timestamp = data['token']['expires_at']
else:
raise InvalidUserToken('Token authorization failed')
expires = timeutils.parse_isotime(timestamp).strftime('%s')
if time.time() >= float(expires):
expires = timeutils.parse_isotime(timestamp)
expires = timeutils.normalize_time(expires)
utcnow = timeutils.utcnow()
if utcnow >= expires:
raise InvalidUserToken('Token authorization failed')
return expires
return timeutils.isotime(at=expires, subsecond=True)
def safe_quote(s):
@@ -999,7 +1001,18 @@ class AuthProtocol(object):
raise InvalidUserToken('Token authorization failed')
data, expires = cached
if ignore_expires or time.time() < float(expires):
try:
expires = timeutils.parse_isotime(expires)
except ValueError:
# Gracefully handle upgrade of expiration times from *nix
# timestamps to ISO 8601 formatted dates by ignoring old cached
# values.
return
expires = timeutils.normalize_time(expires)
utcnow = timeutils.utcnow()
if ignore_expires or utcnow < expires:
self.LOG.debug('Returning cached token %s', token_id)
return data
else:

View File

@@ -14,6 +14,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import calendar
import datetime
import iso8601
import os
@@ -733,7 +734,9 @@ class CommonAuthTokenMiddlewareTest(object):
}
self.set_middleware(conf=conf)
token = 'my_token'
data = ('this_data', 10e100)
some_time_later = timeutils.utcnow() + datetime.timedelta(hours=4)
expires = timeutils.strtime(some_time_later)
data = ('this_data', expires)
self.middleware._init_cache({})
self.middleware._cache_store(token, data)
self.assertEqual(self.middleware._cache_get(token), data[0])
@@ -747,7 +750,9 @@ class CommonAuthTokenMiddlewareTest(object):
}
self.set_middleware(conf=conf)
token = 'my_token'
data = ('this_data', 10e100)
some_time_later = timeutils.utcnow() + datetime.timedelta(hours=4)
expires = timeutils.strtime(some_time_later)
data = ('this_data', expires)
self.middleware._init_cache({})
self.middleware._cache_store(token, data)
self.assertEqual(self.middleware._cache_get(token), data[0])
@@ -760,7 +765,9 @@ class CommonAuthTokenMiddlewareTest(object):
}
self.set_middleware(conf=conf)
token = 'my_token'
data = ('this_data', 10e100)
some_time_later = timeutils.utcnow() + datetime.timedelta(hours=4)
expires = timeutils.strtime(some_time_later)
data = ('this_data', expires)
self.middleware._init_cache({})
self.middleware._cache_store(token, data)
self.assertEqual(self.middleware._cache_get(token), data[0])
@@ -1247,3 +1254,212 @@ class TokenEncodingTest(testtools.TestCase):
def test_quoted_token(self):
self.assertEqual('foo%20bar', auth_token.safe_quote('foo%20bar'))
class TokenExpirationTest(BaseAuthTokenMiddlewareTest):
def setUp(self):
super(TokenExpirationTest, self).setUp()
timeutils.set_time_override()
self.now = timeutils.utcnow()
self.delta = datetime.timedelta(hours=1)
self.one_hour_ago = timeutils.isotime(self.now - self.delta,
subsecond=True)
self.one_hour_earlier = timeutils.isotime(self.now + self.delta,
subsecond=True)
def tearDown(self):
super(TokenExpirationTest, self).tearDown()
timeutils.clear_time_override()
def create_v2_token_fixture(self, expires=None):
v2_fixture = {
'access': {
'token': {
'id': 'blah',
'expires': expires or self.one_hour_earlier,
'tenant': {
'id': 'tenant_id1',
'name': 'tenant_name1',
},
},
'user': {
'id': 'user_id1',
'name': 'user_name1',
'roles': [
{'name': 'role1'},
{'name': 'role2'},
],
},
'serviceCatalog': {}
},
}
return v2_fixture
def create_v3_token_fixture(self, expires=None):
v3_fixture = {
'token': {
'expires_at': expires or self.one_hour_earlier,
'user': {
'id': 'user_id1',
'name': 'user_name1',
'domain': {
'id': 'domain_id1',
'name': 'domain_name1'
}
},
'project': {
'id': 'tenant_id1',
'name': 'tenant_name1',
'domain': {
'id': 'domain_id1',
'name': 'domain_name1'
}
},
'roles': [
{'name': 'role1', 'id': 'Role1'},
{'name': 'role2', 'id': 'Role2'},
],
'catalog': {}
}
}
return v3_fixture
def test_no_data(self):
data = {}
self.assertRaises(auth_token.InvalidUserToken,
auth_token.confirm_token_not_expired,
data)
def test_bad_data(self):
data = {'my_happy_token_dict': 'woo'}
self.assertRaises(auth_token.InvalidUserToken,
auth_token.confirm_token_not_expired,
data)
def test_v2_token_not_expired(self):
data = self.create_v2_token_fixture()
expected_expires = data['access']['token']['expires']
actual_expires = auth_token.confirm_token_not_expired(data)
self.assertEqual(actual_expires, expected_expires)
def test_v2_token_expired(self):
data = self.create_v2_token_fixture(expires=self.one_hour_ago)
self.assertRaises(auth_token.InvalidUserToken,
auth_token.confirm_token_not_expired,
data)
def test_v2_token_with_timezone_offset_not_expired(self):
current_time = timeutils.parse_isotime('2000-01-01T00:01:10.000123Z')
current_time = timeutils.normalize_time(current_time)
timeutils.set_time_override(current_time)
data = self.create_v2_token_fixture(
expires='2000-01-01T00:05:10.000123-05:00')
expected_expires = '2000-01-01T05:05:10.000123Z'
actual_expires = auth_token.confirm_token_not_expired(data)
self.assertEqual(actual_expires, expected_expires)
def test_v2_token_with_timezone_offset_expired(self):
current_time = timeutils.parse_isotime('2000-01-01T00:01:10.000123Z')
current_time = timeutils.normalize_time(current_time)
timeutils.set_time_override(current_time)
data = self.create_v2_token_fixture(
expires='2000-01-01T00:05:10.000123+05:00')
data['access']['token']['expires'] = '2000-01-01T00:05:10.000123+05:00'
self.assertRaises(auth_token.InvalidUserToken,
auth_token.confirm_token_not_expired,
data)
def test_v3_token_not_expired(self):
data = self.create_v3_token_fixture()
expected_expires = data['token']['expires_at']
actual_expires = auth_token.confirm_token_not_expired(data)
self.assertEqual(actual_expires, expected_expires)
def test_v3_token_expired(self):
data = self.create_v3_token_fixture(expires=self.one_hour_ago)
self.assertRaises(auth_token.InvalidUserToken,
auth_token.confirm_token_not_expired,
data)
def test_v3_token_with_timezone_offset_not_expired(self):
current_time = timeutils.parse_isotime('2000-01-01T00:01:10.000123Z')
current_time = timeutils.normalize_time(current_time)
timeutils.set_time_override(current_time)
data = self.create_v3_token_fixture(
expires='2000-01-01T00:05:10.000123-05:00')
expected_expires = '2000-01-01T05:05:10.000123Z'
actual_expires = auth_token.confirm_token_not_expired(data)
self.assertEqual(actual_expires, expected_expires)
def test_v3_token_with_timezone_offset_expired(self):
current_time = timeutils.parse_isotime('2000-01-01T00:01:10.000123Z')
current_time = timeutils.normalize_time(current_time)
timeutils.set_time_override(current_time)
data = self.create_v3_token_fixture(
expires='2000-01-01T00:05:10.000123+05:00')
self.assertRaises(auth_token.InvalidUserToken,
auth_token.confirm_token_not_expired,
data)
def test_cached_token_not_expired(self):
token = 'mytoken'
data = 'this_data'
self.set_middleware()
self.middleware._init_cache({})
some_time_later = timeutils.strtime(at=(self.now + self.delta))
expires = some_time_later
self.middleware._cache_put(token, data, expires)
self.assertEqual(self.middleware._cache_get(token), data)
def test_cached_token_not_expired_with_old_style_nix_timestamp(self):
"""Ensure we cannot retrieve a token from the cache.
Getting a token from the cache should return None when the token data
in the cache stores the expires time as a *nix style timestamp.
"""
token = 'mytoken'
data = 'this_data'
self.set_middleware()
self.middleware._init_cache({})
some_time_later = self.now + self.delta
# Store a unix timestamp in the cache.
expires = calendar.timegm(some_time_later.timetuple())
self.middleware._cache_put(token, data, expires)
self.assertIsNone(self.middleware._cache_get(token))
def test_cached_token_expired(self):
token = 'mytoken'
data = 'this_data'
self.set_middleware()
self.middleware._init_cache({})
some_time_earlier = timeutils.strtime(at=(self.now - self.delta))
expires = some_time_earlier
self.middleware._cache_put(token, data, expires)
self.assertIsNone(self.middleware._cache_get(token))
def test_cached_token_with_timezone_offset_not_expired(self):
token = 'mytoken'
data = 'this_data'
self.set_middleware()
self.middleware._init_cache({})
timezone_offset = datetime.timedelta(hours=2)
some_time_later = self.now - timezone_offset + self.delta
expires = timeutils.strtime(some_time_later) + '-02:00'
self.middleware._cache_put(token, data, expires)
self.assertEqual(self.middleware._cache_get(token), data)
def test_cached_token_with_timezone_offset_expired(self):
token = 'mytoken'
data = 'this_data'
self.set_middleware()
self.middleware._init_cache({})
timezone_offset = datetime.timedelta(hours=2)
some_time_earlier = self.now - timezone_offset - self.delta
expires = timeutils.strtime(some_time_earlier) + '-02:00'
self.middleware._cache_put(token, data, expires)
self.assertIsNone(self.middleware._cache_get(token))