Merge "add pooling for cache references"
This commit is contained in:
@@ -143,6 +143,7 @@ keystone.token_info
|
|||||||
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
import contextlib
|
||||||
import datetime
|
import datetime
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
@@ -450,9 +451,9 @@ class AuthProtocol(object):
|
|||||||
self.admin_password = self._conf_get('admin_password')
|
self.admin_password = self._conf_get('admin_password')
|
||||||
self.admin_tenant_name = self._conf_get('admin_tenant_name')
|
self.admin_tenant_name = self._conf_get('admin_tenant_name')
|
||||||
|
|
||||||
# Token caching via memcache
|
# Token caching
|
||||||
self._cache = None
|
self._cache_pool = None
|
||||||
self._cache_initialized = False # cache already initialized?
|
self._cache_initialized = False
|
||||||
# memcache value treatment, ENCRYPT or MAC
|
# memcache value treatment, ENCRYPT or MAC
|
||||||
self._memcache_security_strategy = \
|
self._memcache_security_strategy = \
|
||||||
self._conf_get('memcache_security_strategy')
|
self._conf_get('memcache_security_strategy')
|
||||||
@@ -489,16 +490,9 @@ class AuthProtocol(object):
|
|||||||
'is defined')
|
'is defined')
|
||||||
|
|
||||||
def _init_cache(self, env):
|
def _init_cache(self, env):
|
||||||
cache = self._conf_get('cache')
|
self._cache_pool = CachePool(
|
||||||
memcache_servers = self._conf_get('memcached_servers')
|
env.get(self._conf_get('cache')),
|
||||||
|
self._conf_get('memcached_servers'))
|
||||||
if cache and env.get(cache) is not None:
|
|
||||||
# use the cache from the upstream filter
|
|
||||||
self.LOG.info('Using %s memcache for caching token', cache)
|
|
||||||
self._cache = env.get(cache)
|
|
||||||
else:
|
|
||||||
# use Keystone memcache
|
|
||||||
self._cache = memorycache.get_client(memcache_servers)
|
|
||||||
self._cache_initialized = True
|
self._cache_initialized = True
|
||||||
|
|
||||||
def _conf_get(self, name):
|
def _conf_get(self, name):
|
||||||
@@ -980,10 +974,11 @@ class AuthProtocol(object):
|
|||||||
return token only if fresh (not expired).
|
return token only if fresh (not expired).
|
||||||
"""
|
"""
|
||||||
|
|
||||||
if self._cache and token_id:
|
if token_id:
|
||||||
if self._memcache_security_strategy is None:
|
if self._memcache_security_strategy is None:
|
||||||
key = CACHE_KEY_TEMPLATE % token_id
|
key = CACHE_KEY_TEMPLATE % token_id
|
||||||
serialized = self._cache.get(key)
|
with self._cache_pool.reserve() as cache:
|
||||||
|
serialized = cache.get(key)
|
||||||
else:
|
else:
|
||||||
secret_key = self._memcache_secret_key
|
secret_key = self._memcache_secret_key
|
||||||
if isinstance(secret_key, six.string_types):
|
if isinstance(secret_key, six.string_types):
|
||||||
@@ -997,7 +992,8 @@ class AuthProtocol(object):
|
|||||||
security_strategy)
|
security_strategy)
|
||||||
cache_key = CACHE_KEY_TEMPLATE % (
|
cache_key = CACHE_KEY_TEMPLATE % (
|
||||||
memcache_crypt.get_cache_key(keys))
|
memcache_crypt.get_cache_key(keys))
|
||||||
raw_cached = self._cache.get(cache_key)
|
with self._cache_pool.reserve() as cache:
|
||||||
|
raw_cached = cache.get(cache_key)
|
||||||
try:
|
try:
|
||||||
# unprotect_data will return None if raw_cached is None
|
# unprotect_data will return None if raw_cached is None
|
||||||
serialized = memcache_crypt.unprotect_data(keys,
|
serialized = memcache_crypt.unprotect_data(keys,
|
||||||
@@ -1064,9 +1060,8 @@ class AuthProtocol(object):
|
|||||||
cache_key = CACHE_KEY_TEMPLATE % memcache_crypt.get_cache_key(keys)
|
cache_key = CACHE_KEY_TEMPLATE % memcache_crypt.get_cache_key(keys)
|
||||||
data_to_store = memcache_crypt.protect_data(keys, serialized_data)
|
data_to_store = memcache_crypt.protect_data(keys, serialized_data)
|
||||||
|
|
||||||
self._cache.set(cache_key,
|
with self._cache_pool.reserve() as cache:
|
||||||
data_to_store,
|
cache.set(cache_key, data_to_store, time=self.token_cache_time)
|
||||||
time=self.token_cache_time)
|
|
||||||
|
|
||||||
def _invalid_user_token(self, msg=False):
|
def _invalid_user_token(self, msg=False):
|
||||||
# NOTE(jamielennox): use False as the default so that None is valid
|
# NOTE(jamielennox): use False as the default so that None is valid
|
||||||
@@ -1146,16 +1141,13 @@ class AuthProtocol(object):
|
|||||||
quick check of token freshness on retrieval.
|
quick check of token freshness on retrieval.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
if self._cache:
|
self.LOG.debug('Storing token in cache')
|
||||||
self.LOG.debug('Storing token in memcache')
|
self._cache_store(token_id, (data, expires))
|
||||||
self._cache_store(token_id, (data, expires))
|
|
||||||
|
|
||||||
def _cache_store_invalid(self, token_id):
|
def _cache_store_invalid(self, token_id):
|
||||||
"""Store invalid token in cache."""
|
"""Store invalid token in cache."""
|
||||||
if self._cache:
|
self.LOG.debug('Marking token as unauthorized in cache')
|
||||||
self.LOG.debug(
|
self._cache_store(token_id, 'invalid')
|
||||||
'Marking token as unauthorized in memcache')
|
|
||||||
self._cache_store(token_id, 'invalid')
|
|
||||||
|
|
||||||
def cert_file_missing(self, proc_output, file_name):
|
def cert_file_missing(self, proc_output, file_name):
|
||||||
return (file_name in proc_output and not os.path.exists(file_name))
|
return (file_name in proc_output and not os.path.exists(file_name))
|
||||||
@@ -1374,6 +1366,33 @@ class AuthProtocol(object):
|
|||||||
self._fetch_cert_file(self.signing_ca_file_name, 'ca')
|
self._fetch_cert_file(self.signing_ca_file_name, 'ca')
|
||||||
|
|
||||||
|
|
||||||
|
class CachePool(list):
|
||||||
|
"""A lazy pool of cache references."""
|
||||||
|
|
||||||
|
def __init__(self, cache, memcached_servers):
|
||||||
|
self._environment_cache = cache
|
||||||
|
self._memcached_servers = memcached_servers
|
||||||
|
|
||||||
|
@contextlib.contextmanager
|
||||||
|
def reserve(self):
|
||||||
|
"""Context manager to manage a pooled cache reference."""
|
||||||
|
if self._environment_cache is not None:
|
||||||
|
# skip pooling and just use the cache from the upstream filter
|
||||||
|
yield self._environment_cache
|
||||||
|
return # otherwise the context manager will continue!
|
||||||
|
|
||||||
|
try:
|
||||||
|
c = self.pop()
|
||||||
|
except IndexError:
|
||||||
|
# the pool is empty, so we need to create a new client
|
||||||
|
c = memorycache.get_client(self._memcached_servers)
|
||||||
|
|
||||||
|
try:
|
||||||
|
yield c
|
||||||
|
finally:
|
||||||
|
self.append(c)
|
||||||
|
|
||||||
|
|
||||||
def filter_factory(global_conf, **local_conf):
|
def filter_factory(global_conf, **local_conf):
|
||||||
"""Returns a WSGI filter app for use with paste.deploy."""
|
"""Returns a WSGI filter app for use with paste.deploy."""
|
||||||
conf = global_conf.copy()
|
conf = global_conf.copy()
|
||||||
|
@@ -423,7 +423,8 @@ class NoMemcacheAuthToken(BaseAuthTokenMiddlewareTest):
|
|||||||
}
|
}
|
||||||
self.set_middleware(conf=conf)
|
self.set_middleware(conf=conf)
|
||||||
self.middleware._init_cache(env)
|
self.middleware._init_cache(env)
|
||||||
self.assertNotEqual(self.middleware._cache, 'CACHE_TEST')
|
with self.middleware._cache_pool.reserve() as cache:
|
||||||
|
self.assertNotEqual(cache, 'CACHE_TEST')
|
||||||
|
|
||||||
|
|
||||||
class CommonAuthTokenMiddlewareTest(object):
|
class CommonAuthTokenMiddlewareTest(object):
|
||||||
@@ -737,7 +738,8 @@ class CommonAuthTokenMiddlewareTest(object):
|
|||||||
}
|
}
|
||||||
self.set_middleware(conf=conf)
|
self.set_middleware(conf=conf)
|
||||||
self.middleware._init_cache(env)
|
self.middleware._init_cache(env)
|
||||||
self.assertEqual(self.middleware._cache, 'CACHE_TEST')
|
with self.middleware._cache_pool.reserve() as cache:
|
||||||
|
self.assertEqual(cache, 'CACHE_TEST')
|
||||||
|
|
||||||
def test_will_expire_soon(self):
|
def test_will_expire_soon(self):
|
||||||
tenseconds = datetime.datetime.utcnow() + datetime.timedelta(
|
tenseconds = datetime.datetime.utcnow() + datetime.timedelta(
|
||||||
|
Reference in New Issue
Block a user