Merge "Move auth_token tests not requiring v2/v3 to new class"
This commit is contained in:
@@ -421,7 +421,141 @@ class CachePoolTest(BaseAuthTokenMiddlewareTest):
|
||||
set([inner_cache, outer_cache]), set(self.middleware._cache_pool))
|
||||
|
||||
|
||||
class GeneralAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
|
||||
testresources.ResourcedTestCase):
|
||||
"""These tests are not affected by the token format
|
||||
(see CommonAuthTokenMiddlewareTest).
|
||||
"""
|
||||
|
||||
resources = [('examples', client_fixtures.EXAMPLES_RESOURCE)]
|
||||
|
||||
def test_will_expire_soon(self):
|
||||
tenseconds = datetime.datetime.utcnow() + datetime.timedelta(
|
||||
seconds=10)
|
||||
self.assertTrue(auth_token.will_expire_soon(tenseconds))
|
||||
fortyseconds = datetime.datetime.utcnow() + datetime.timedelta(
|
||||
seconds=40)
|
||||
self.assertFalse(auth_token.will_expire_soon(fortyseconds))
|
||||
|
||||
def test_token_is_v2_accepts_v2(self):
|
||||
token = self.examples.UUID_TOKEN_DEFAULT
|
||||
token_response = self.examples.TOKEN_RESPONSES[token]
|
||||
self.assertTrue(auth_token._token_is_v2(token_response))
|
||||
|
||||
def test_token_is_v2_rejects_v3(self):
|
||||
token = self.examples.v3_UUID_TOKEN_DEFAULT
|
||||
token_response = self.examples.TOKEN_RESPONSES[token]
|
||||
self.assertFalse(auth_token._token_is_v2(token_response))
|
||||
|
||||
def test_token_is_v3_rejects_v2(self):
|
||||
token = self.examples.UUID_TOKEN_DEFAULT
|
||||
token_response = self.examples.TOKEN_RESPONSES[token]
|
||||
self.assertFalse(auth_token._token_is_v3(token_response))
|
||||
|
||||
def test_token_is_v3_accepts_v3(self):
|
||||
token = self.examples.v3_UUID_TOKEN_DEFAULT
|
||||
token_response = self.examples.TOKEN_RESPONSES[token]
|
||||
self.assertTrue(auth_token._token_is_v3(token_response))
|
||||
|
||||
@testtools.skipUnless(memcached_available(), 'memcached not available')
|
||||
def test_encrypt_cache_data(self):
|
||||
httpretty.disable()
|
||||
conf = {
|
||||
'memcached_servers': MEMCACHED_SERVERS,
|
||||
'memcache_security_strategy': 'encrypt',
|
||||
'memcache_secret_key': 'mysecret'
|
||||
}
|
||||
self.set_middleware(conf=conf)
|
||||
token = b'my_token'
|
||||
some_time_later = timeutils.utcnow() + datetime.timedelta(hours=4)
|
||||
expires = timeutils.strtime(some_time_later)
|
||||
data = ('this_data', expires)
|
||||
self.middleware._init_cache({})
|
||||
self.middleware._cache_store(token, data)
|
||||
self.assertEqual(self.middleware._cache_get(token), data[0])
|
||||
|
||||
@testtools.skipUnless(memcached_available(), 'memcached not available')
|
||||
def test_sign_cache_data(self):
|
||||
httpretty.disable()
|
||||
conf = {
|
||||
'memcached_servers': MEMCACHED_SERVERS,
|
||||
'memcache_security_strategy': 'mac',
|
||||
'memcache_secret_key': 'mysecret'
|
||||
}
|
||||
self.set_middleware(conf=conf)
|
||||
token = b'my_token'
|
||||
some_time_later = timeutils.utcnow() + datetime.timedelta(hours=4)
|
||||
expires = timeutils.strtime(some_time_later)
|
||||
data = ('this_data', expires)
|
||||
self.middleware._init_cache({})
|
||||
self.middleware._cache_store(token, data)
|
||||
self.assertEqual(self.middleware._cache_get(token), data[0])
|
||||
|
||||
@testtools.skipUnless(memcached_available(), 'memcached not available')
|
||||
def test_no_memcache_protection(self):
|
||||
httpretty.disable()
|
||||
conf = {
|
||||
'memcached_servers': MEMCACHED_SERVERS,
|
||||
'memcache_secret_key': 'mysecret'
|
||||
}
|
||||
self.set_middleware(conf=conf)
|
||||
token = 'my_token'
|
||||
some_time_later = timeutils.utcnow() + datetime.timedelta(hours=4)
|
||||
expires = timeutils.strtime(some_time_later)
|
||||
data = ('this_data', expires)
|
||||
self.middleware._init_cache({})
|
||||
self.middleware._cache_store(token, data)
|
||||
self.assertEqual(self.middleware._cache_get(token), data[0])
|
||||
|
||||
def test_assert_valid_memcache_protection_config(self):
|
||||
# test missing memcache_secret_key
|
||||
conf = {
|
||||
'memcached_servers': MEMCACHED_SERVERS,
|
||||
'memcache_security_strategy': 'Encrypt'
|
||||
}
|
||||
self.assertRaises(auth_token.ConfigurationError, self.set_middleware,
|
||||
conf=conf)
|
||||
# test invalue memcache_security_strategy
|
||||
conf = {
|
||||
'memcached_servers': MEMCACHED_SERVERS,
|
||||
'memcache_security_strategy': 'whatever'
|
||||
}
|
||||
self.assertRaises(auth_token.ConfigurationError, self.set_middleware,
|
||||
conf=conf)
|
||||
# test missing memcache_secret_key
|
||||
conf = {
|
||||
'memcached_servers': MEMCACHED_SERVERS,
|
||||
'memcache_security_strategy': 'mac'
|
||||
}
|
||||
self.assertRaises(auth_token.ConfigurationError, self.set_middleware,
|
||||
conf=conf)
|
||||
conf = {
|
||||
'memcached_servers': MEMCACHED_SERVERS,
|
||||
'memcache_security_strategy': 'Encrypt',
|
||||
'memcache_secret_key': ''
|
||||
}
|
||||
self.assertRaises(auth_token.ConfigurationError, self.set_middleware,
|
||||
conf=conf)
|
||||
conf = {
|
||||
'memcached_servers': MEMCACHED_SERVERS,
|
||||
'memcache_security_strategy': 'mAc',
|
||||
'memcache_secret_key': ''
|
||||
}
|
||||
self.assertRaises(auth_token.ConfigurationError, self.set_middleware,
|
||||
conf=conf)
|
||||
|
||||
def test_config_revocation_cache_timeout(self):
|
||||
conf = {
|
||||
'revocation_cache_time': 24,
|
||||
'auth_uri': 'https://keystone.example.com:1234',
|
||||
}
|
||||
middleware = auth_token.AuthProtocol(self.fake_app, conf)
|
||||
self.assertEqual(middleware.token_revocation_list_cache_timeout,
|
||||
datetime.timedelta(seconds=24))
|
||||
|
||||
|
||||
class CommonAuthTokenMiddlewareTest(object):
|
||||
"""These tests are run once using v2 tokens and again using v3 tokens."""
|
||||
|
||||
def test_init_does_not_call_http(self):
|
||||
conf = {
|
||||
@@ -786,130 +920,6 @@ class CommonAuthTokenMiddlewareTest(object):
|
||||
extra_environ = {'swift.cache': memorycache.Client()}
|
||||
self.test_memcache_set_expired(extra_conf, extra_environ)
|
||||
|
||||
def test_will_expire_soon(self):
|
||||
tenseconds = datetime.datetime.utcnow() + datetime.timedelta(
|
||||
seconds=10)
|
||||
self.assertTrue(auth_token.will_expire_soon(tenseconds))
|
||||
fortyseconds = datetime.datetime.utcnow() + datetime.timedelta(
|
||||
seconds=40)
|
||||
self.assertFalse(auth_token.will_expire_soon(fortyseconds))
|
||||
|
||||
def test_token_is_v2_accepts_v2(self):
|
||||
token = self.examples.UUID_TOKEN_DEFAULT
|
||||
token_response = self.examples.TOKEN_RESPONSES[token]
|
||||
self.assertTrue(auth_token._token_is_v2(token_response))
|
||||
|
||||
def test_token_is_v2_rejects_v3(self):
|
||||
token = self.examples.v3_UUID_TOKEN_DEFAULT
|
||||
token_response = self.examples.TOKEN_RESPONSES[token]
|
||||
self.assertFalse(auth_token._token_is_v2(token_response))
|
||||
|
||||
def test_token_is_v3_rejects_v2(self):
|
||||
token = self.examples.UUID_TOKEN_DEFAULT
|
||||
token_response = self.examples.TOKEN_RESPONSES[token]
|
||||
self.assertFalse(auth_token._token_is_v3(token_response))
|
||||
|
||||
def test_token_is_v3_accepts_v3(self):
|
||||
token = self.examples.v3_UUID_TOKEN_DEFAULT
|
||||
token_response = self.examples.TOKEN_RESPONSES[token]
|
||||
self.assertTrue(auth_token._token_is_v3(token_response))
|
||||
|
||||
@testtools.skipUnless(memcached_available(), 'memcached not available')
|
||||
def test_encrypt_cache_data(self):
|
||||
httpretty.disable()
|
||||
conf = {
|
||||
'memcached_servers': MEMCACHED_SERVERS,
|
||||
'memcache_security_strategy': 'encrypt',
|
||||
'memcache_secret_key': 'mysecret'
|
||||
}
|
||||
self.set_middleware(conf=conf)
|
||||
token = b'my_token'
|
||||
some_time_later = timeutils.utcnow() + datetime.timedelta(hours=4)
|
||||
expires = timeutils.strtime(some_time_later)
|
||||
data = ('this_data', expires)
|
||||
self.middleware._init_cache({})
|
||||
self.middleware._cache_store(token, data)
|
||||
self.assertEqual(self.middleware._cache_get(token), data[0])
|
||||
|
||||
@testtools.skipUnless(memcached_available(), 'memcached not available')
|
||||
def test_sign_cache_data(self):
|
||||
httpretty.disable()
|
||||
conf = {
|
||||
'memcached_servers': MEMCACHED_SERVERS,
|
||||
'memcache_security_strategy': 'mac',
|
||||
'memcache_secret_key': 'mysecret'
|
||||
}
|
||||
self.set_middleware(conf=conf)
|
||||
token = b'my_token'
|
||||
some_time_later = timeutils.utcnow() + datetime.timedelta(hours=4)
|
||||
expires = timeutils.strtime(some_time_later)
|
||||
data = ('this_data', expires)
|
||||
self.middleware._init_cache({})
|
||||
self.middleware._cache_store(token, data)
|
||||
self.assertEqual(self.middleware._cache_get(token), data[0])
|
||||
|
||||
@testtools.skipUnless(memcached_available(), 'memcached not available')
|
||||
def test_no_memcache_protection(self):
|
||||
httpretty.disable()
|
||||
conf = {
|
||||
'memcached_servers': MEMCACHED_SERVERS,
|
||||
'memcache_secret_key': 'mysecret'
|
||||
}
|
||||
self.set_middleware(conf=conf)
|
||||
token = 'my_token'
|
||||
some_time_later = timeutils.utcnow() + datetime.timedelta(hours=4)
|
||||
expires = timeutils.strtime(some_time_later)
|
||||
data = ('this_data', expires)
|
||||
self.middleware._init_cache({})
|
||||
self.middleware._cache_store(token, data)
|
||||
self.assertEqual(self.middleware._cache_get(token), data[0])
|
||||
|
||||
def test_assert_valid_memcache_protection_config(self):
|
||||
# test missing memcache_secret_key
|
||||
conf = {
|
||||
'memcached_servers': MEMCACHED_SERVERS,
|
||||
'memcache_security_strategy': 'Encrypt'
|
||||
}
|
||||
self.assertRaises(auth_token.ConfigurationError, self.set_middleware,
|
||||
conf=conf)
|
||||
# test invalue memcache_security_strategy
|
||||
conf = {
|
||||
'memcached_servers': MEMCACHED_SERVERS,
|
||||
'memcache_security_strategy': 'whatever'
|
||||
}
|
||||
self.assertRaises(auth_token.ConfigurationError, self.set_middleware,
|
||||
conf=conf)
|
||||
# test missing memcache_secret_key
|
||||
conf = {
|
||||
'memcached_servers': MEMCACHED_SERVERS,
|
||||
'memcache_security_strategy': 'mac'
|
||||
}
|
||||
self.assertRaises(auth_token.ConfigurationError, self.set_middleware,
|
||||
conf=conf)
|
||||
conf = {
|
||||
'memcached_servers': MEMCACHED_SERVERS,
|
||||
'memcache_security_strategy': 'Encrypt',
|
||||
'memcache_secret_key': ''
|
||||
}
|
||||
self.assertRaises(auth_token.ConfigurationError, self.set_middleware,
|
||||
conf=conf)
|
||||
conf = {
|
||||
'memcached_servers': MEMCACHED_SERVERS,
|
||||
'memcache_security_strategy': 'mAc',
|
||||
'memcache_secret_key': ''
|
||||
}
|
||||
self.assertRaises(auth_token.ConfigurationError, self.set_middleware,
|
||||
conf=conf)
|
||||
|
||||
def test_config_revocation_cache_timeout(self):
|
||||
conf = {
|
||||
'revocation_cache_time': 24,
|
||||
'auth_uri': 'https://keystone.example.com:1234',
|
||||
}
|
||||
middleware = auth_token.AuthProtocol(self.fake_app, conf)
|
||||
self.assertEqual(middleware.token_revocation_list_cache_timeout,
|
||||
datetime.timedelta(seconds=24))
|
||||
|
||||
def test_http_error_not_cached_token(self):
|
||||
"""Test to don't cache token as invalid on network errors.
|
||||
|
||||
|
||||
Reference in New Issue
Block a user