diff --git a/marconi/common/decorators.py b/marconi/common/decorators.py index 12b0b3963..c042223e0 100644 --- a/marconi/common/decorators.py +++ b/marconi/common/decorators.py @@ -15,16 +15,22 @@ import functools +import msgpack -def cached_getattr(meth): - """Caches attributes returned by __getattr__ +import marconi.openstack.common.log as logging - It can be used to cache results from +LOG = logging.getLogger(__name__) + + +def memoized_getattr(meth): + """Memoizes attributes returned by __getattr__ + + It can be used to remember the results from __getattr__ and reduce the debt of calling it again when the same attribute is accessed. - This decorator caches attributes by setting - them in the object itself. + This decorator memoizes attributes by setting + them on the object itself. The wrapper returned by this decorator won't alter the returned value. @@ -40,6 +46,91 @@ def cached_getattr(meth): return wrapper +def caches(keygen, ttl, cond=None): + """Flags a getter method as being cached using oslo.cache. + + It is assumed that the containing class defines an attribute + named `_cache` that is an instance of an oslo.cache backend. + + The getter should raise an exception if the value can't be + loaded, which will skip the caching step. Otherwise, the + getter must return a value that can be encoded with + msgpack. + + Note that you can also flag a remover method such that it + will purge an associated item from the cache, e.g.: + + def project_cache_key(user, project=None): + return user + ':' + str(project) + + class Project(object): + def __init__(self, db, cache): + self._db = db + self._cache = cache + + @decorators.caches(project_cache_key, 60) + def get_project(self, user, project=None): + return self._db.get_project(user, project) + + @get_project.purges + def del_project(self, user, project=None): + self._db.delete_project(user, project) + + :param keygen: A static key generator function. This function + must accept the same arguments as the getter, sans `self`. + :param ttl: TTL for the cache entry, in seconds. + :param cond: Conditional for whether or not to cache the + value. Must be a function that takes a single value, and + returns True or False. + """ + + def purges_prop(remover): + + @functools.wraps(remover) + def wrapper(self, *args, **kwargs): + # First, purge from cache + key = keygen(*args, **kwargs) + del self._cache[key] + + # Remove/delete from origin + remover(self, *args, **kwargs) + + return wrapper + + def prop(getter): + + @functools.wraps(getter) + def wrapper(self, *args, **kwargs): + key = keygen(*args, **kwargs) + packed_value = self._cache.get(key) + + if packed_value is None: + value = getter(self, *args, **kwargs) + + # Cache new value if desired + if cond is None or cond(value): + # NOTE(kgriffs): Setting use_bin_type is essential + # for being able to distinguish between Unicode + # and binary strings when decoding; otherwise, + # both types are normalized to the MessagePack + # str format family. + packed_value = msgpack.packb(value, use_bin_type=True) + + if not self._cache.set(key, packed_value, ttl): + LOG.warn('Failed to cache key: ' + key) + else: + # NOTE(kgriffs): unpackb does not default to UTF-8, + # so we have to explicitly ask for it. + value = msgpack.unpackb(packed_value, encoding='utf-8') + + return value + + wrapper.purges = purges_prop + return wrapper + + return prop + + def lazy_property(write=False, delete=True): """Creates a lazy property. diff --git a/marconi/common/pipeline.py b/marconi/common/pipeline.py index f1ab5c8fd..1b4648c08 100644 --- a/marconi/common/pipeline.py +++ b/marconi/common/pipeline.py @@ -48,7 +48,7 @@ class Pipeline(object): def append(self, stage): self._pipeline.append(stage) - @decorators.cached_getattr + @decorators.memoized_getattr def __getattr__(self, name): with self.consumer_for(name) as consumer: return consumer diff --git a/marconi/queues/storage/base.py b/marconi/queues/storage/base.py index 48436779d..eb0a4a178 100644 --- a/marconi/queues/storage/base.py +++ b/marconi/queues/storage/base.py @@ -33,7 +33,7 @@ class DriverBase(object): :type conf: `oslo.config.ConfigOpts` :param cache: Cache instance to use for reducing latency for certain lookups. - :type cache: `marconi.common.cache.backends.BaseCache` + :type cache: `marconi.openstack.common.cache.backends.BaseCache` """ def __init__(self, conf, cache): self.conf = conf @@ -54,7 +54,7 @@ class DataDriverBase(DriverBase): :type conf: `oslo.config.ConfigOpts` :param cache: Cache instance to use for reducing latency for certain lookups. - :type cache: `marconi.common.cache.backends.BaseCache` + :type cache: `marconi.openstack.common.cache.backends.BaseCache` """ def __init__(self, conf, cache): @@ -96,7 +96,7 @@ class ControlDriverBase(DriverBase): :type conf: `oslo.config.ConfigOpts` :param cache: Cache instance to use for reducing latency for certain lookups. - :type cache: `marconi.common.cache.backends.BaseCache` + :type cache: `marconi.openstack.common.cache.backends.BaseCache` """ @abc.abstractproperty diff --git a/marconi/queues/storage/mongodb/queues.py b/marconi/queues/storage/mongodb/queues.py index 03259b83b..45960fbf3 100644 --- a/marconi/queues/storage/mongodb/queues.py +++ b/marconi/queues/storage/mongodb/queues.py @@ -23,6 +23,7 @@ Field Mappings: import pymongo.errors +from marconi.common import decorators from marconi.openstack.common.gettextutils import _ import marconi.openstack.common.log as logging from marconi.openstack.common import timeutils @@ -30,9 +31,33 @@ from marconi.queues import storage from marconi.queues.storage import errors from marconi.queues.storage.mongodb import utils - LOG = logging.getLogger(__name__) +# NOTE(kgriffs): E.g.: 'marconi-queuecontroller:5083853/my-queue' +_QUEUE_CACHE_PREFIX = 'queuecontroller:' + +# NOTE(kgriffs): This causes some race conditions, but they are +# harmless. If a queue was deleted, but we are still returning +# that it exists, some messages may get inserted without the +# client getting an error. In this case, those messages would +# be orphaned and expire eventually according to their TTL. +# +# What this means for the client is that they have a bug; they +# deleted a queue and then immediately tried to post messages +# to it. If they keep trying to use the queue, they will +# eventually start getting an error, once the cache entry +# expires, which should clue them in on what happened. +# +# TODO(kgriffs): Make dynamic? +_QUEUE_CACHE_TTL = 5 + + +def _queue_exists_key(queue, project=None): + # NOTE(kgriffs): Use string concatenation for performance, + # also put project first since it is guaranteed to be + # unique, which should reduce lookup time. + return _QUEUE_CACHE_PREFIX + 'exists:' + str(project) + '/' + queue + class QueueController(storage.Queue): """Implements queue resource operations using MongoDB. @@ -59,6 +84,7 @@ class QueueController(storage.Queue): def __init__(self, *args, **kwargs): super(QueueController, self).__init__(*args, **kwargs) + self._cache = self.driver.cache self._collection = self.driver.queues_database.queues # NOTE(flaper87): This creates a unique index for @@ -224,8 +250,11 @@ class QueueController(storage.Queue): else: return True + # NOTE(kgriffs): Only cache when it exists; if it doesn't exist, and + # someone creates it, we want it to be immediately visible. @utils.raises_conn_error @utils.retries_on_autoreconnect + @decorators.caches(_queue_exists_key, _QUEUE_CACHE_TTL, lambda v: v) def exists(self, name, project=None): query = _get_scoped_query(name, project) return self._collection.find_one(query) is not None @@ -243,6 +272,7 @@ class QueueController(storage.Queue): @utils.raises_conn_error @utils.retries_on_autoreconnect + @exists.purges def delete(self, name, project=None): self.driver.message_controller._purge_queue(name, project) self._collection.remove(_get_scoped_query(name, project)) diff --git a/marconi/queues/storage/sharding.py b/marconi/queues/storage/sharding.py index f24754313..1e23f7e90 100644 --- a/marconi/queues/storage/sharding.py +++ b/marconi/queues/storage/sharding.py @@ -41,7 +41,7 @@ _SHARD_CACHE_PREFIX = 'sharding:' # before "unfreezing" the queue, rather than waiting # on the TTL. # -# TODO(kgriffs): Make configurable? +# TODO(kgriffs): Make dynamic? _SHARD_CACHE_TTL = 10 @@ -106,7 +106,7 @@ class RoutingController(storage.base.ControllerBase): self._ctrl_property_name = self._resource_name + '_controller' self._shard_catalog = shard_catalog - @decorators.cached_getattr + @decorators.memoized_getattr def __getattr__(self, name): # NOTE(kgriffs): Use a closure trick to avoid # some attr lookups each time forward() is called. @@ -358,6 +358,7 @@ class Catalog(object): conf = utils.dynamic_conf(shard['uri'], shard['options']) return utils.load_storage_driver(conf, self._cache) + @decorators.caches(_shard_cache_key, _SHARD_CACHE_TTL) def _shard_id(self, queue, project=None): """Get the ID for the shard assigned to the given queue. @@ -368,19 +369,7 @@ class Catalog(object): :raises: `errors.QueueNotMapped` """ - cache_key = _shard_cache_key(queue, project) - shard_id = self._cache.get(cache_key) - - if shard_id is None: - shard_id = self._catalogue_ctrl.get(project, queue)['shard'] - - if not self._cache.set(cache_key, shard_id, _SHARD_CACHE_TTL): - LOG.warn('Failed to cache shard ID') - - return shard_id - - def _invalidate_cached_id(self, queue, project=None): - self._cache.unset_many([_shard_cache_key(queue, project)]) + return self._catalogue_ctrl.get(project, queue)['shard'] def register(self, queue, project=None): """Register a new queue in the shard catalog. @@ -413,6 +402,7 @@ class Catalog(object): self._catalogue_ctrl.insert(project, queue, shard['name']) + @_shard_id.purges def deregister(self, queue, project=None): """Removes a queue from the shard catalog. @@ -425,7 +415,6 @@ class Catalog(object): None for the "global" or "generic" project. :type project: six.text_type """ - self._invalidate_cached_id(queue, project) self._catalogue_ctrl.delete(project, queue) def lookup(self, queue, project=None): diff --git a/marconi/queues/transport/base.py b/marconi/queues/transport/base.py index a6655c923..ea78e3d4b 100644 --- a/marconi/queues/transport/base.py +++ b/marconi/queues/transport/base.py @@ -41,7 +41,7 @@ class DriverBase(object): :param storage: The storage driver :type storage: marconi.queues.storage.base.DataDriverBase :param cache: caching object - :type cache: marconi.common.cache.backends.BaseCache + :type cache: marconi.openstack.common.cache.backends.BaseCache :param control: Storage driver to handle the control plane :type control: marconi.queues.storage.base.ControlDriverBase """ diff --git a/marconi/tests/common/test_cache.py b/marconi/tests/common/test_cache.py deleted file mode 100644 index 5ab082bac..000000000 --- a/marconi/tests/common/test_cache.py +++ /dev/null @@ -1,36 +0,0 @@ -# Copyright (c) 2013 Rackspace, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may not -# use this file except in compliance with the License. You may obtain a copy -# of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations under -# the License. - -import traceback - -from marconi import tests as testing - - -class TestCache(testing.TestBase): - - def test_import(self): - try: - from marconi.common.cache._backends import memcached - from marconi.common.cache._backends import memory - from marconi.common.cache import backends - from marconi.common.cache import cache - - except ImportError as ex: - self.fail(traceback.format_exc(ex)) - - # Avoid pyflakes warnings - cache = cache - backends = backends - memory = memory - memcached = memcached diff --git a/tests/unit/common/test_decorators.py b/tests/unit/common/test_decorators.py index 936b94b6a..20bb21fce 100644 --- a/tests/unit/common/test_decorators.py +++ b/tests/unit/common/test_decorators.py @@ -13,17 +13,21 @@ # See the License for the specific language governing permissions and # limitations under the License. +import msgpack +from oslo.config import cfg + from marconi.common import decorators +from marconi.openstack.common.cache import cache as oslo_cache from marconi.tests import base class TestDecorators(base.TestBase): - def test_cached_getattr(self): + def test_memoized_getattr(self): class TestClass(object): - @decorators.cached_getattr + @decorators.memoized_getattr def __getattr__(self, name): return name @@ -31,3 +35,104 @@ class TestDecorators(base.TestBase): result = instance.testing self.assertEqual(result, 'testing') self.assertIn('testing', instance.__dict__) + + def test_cached(self): + conf = cfg.ConfigOpts() + oslo_cache.register_oslo_configs(conf) + cache = oslo_cache.get_cache(conf.cache_url) + + sample_project = { + u'name': u'Cats Abound', + u'bits': b'\x80\x81\x82\x83\x84', + b'key': u'Value. \x80', + } + + def create_key(user, project=None): + return user + ':' + str(project) + + class TestClass(object): + + def __init__(self, cache): + self._cache = cache + self.project_gets = 0 + self.project_dels = 0 + + @decorators.caches(create_key, 60) + def get_project(self, user, project=None): + self.project_gets += 1 + return sample_project + + @get_project.purges + def del_project(self, user, project=None): + self.project_dels += 1 + + instance = TestClass(cache) + + args = ('23', 'cats') + + project = instance.get_project(*args) + self.assertEqual(project, sample_project) + self.assertEqual(instance.project_gets, 1) + + # Should be in the cache now. + project = msgpack.unpackb(cache.get(create_key(*args)), + encoding='utf-8') + self.assertEqual(project, sample_project) + + # Should read from the cache this time (counter will not + # be incremented). + project = instance.get_project(*args) + self.assertEqual(project, sample_project) + self.assertEqual(instance.project_gets, 1) + + # Use kwargs this time + instance.del_project('23', project='cats') + self.assertEqual(instance.project_dels, 1) + + # Should be a cache miss since we purged (above) + project = instance.get_project(*args) + self.assertEqual(instance.project_gets, 2) + + def test_cached_with_cond(self): + conf = cfg.ConfigOpts() + oslo_cache.register_oslo_configs(conf) + cache = oslo_cache.get_cache(conf.cache_url) + + class TestClass(object): + + def __init__(self, cache): + self._cache = cache + self.user_gets = 0 + + @decorators.caches(lambda x: x, 60, lambda v: v != 'kgriffs') + def get_user(self, name): + self.user_gets += 1 + return name + + instance = TestClass(cache) + + name = 'malini' + + user = instance.get_user(name) + self.assertEqual(user, name) + self.assertEqual(instance.user_gets, 1) + + # Should be in the cache now. + user = msgpack.unpackb(cache.get(name)) + self.assertEqual(user, name) + + # Should read from the cache this time (counter will not + # be incremented). + user = instance.get_user(name) + self.assertEqual(user, name) + self.assertEqual(instance.user_gets, 1) + + # Won't go into the cache because of cond + name = 'kgriffs' + for i in range(3): + user = instance.get_user(name) + + self.assertEqual(cache.get(name), None) + + self.assertEqual(user, name) + self.assertEqual(instance.user_gets, 2 + i)