From 52e4201775423154fd68ec99f2e7f7819e11cf3d Mon Sep 17 00:00:00 2001 From: kgriffs <kurt.griffiths@rackspace.com> Date: Fri, 1 Nov 2013 12:17:37 -0500 Subject: [PATCH] feat(sharding): Cache shard mappings This patch modifies sharding.Catalog so that it caches the mapping between (project, queue) ===> shard. This reduces latency for all data plane operations when sharding is enabled. Change-Id: I54714c4e6c0ed78ac302ce86aed9bb72b200052b Partially-Implements: blueprint storage-sharding --- marconi/queues/storage/sharding.py | 70 +++++++++++++++++-- .../unit/queues/storage/test_shard_catalog.py | 8 +-- .../unit/queues/storage/test_shard_queues.py | 9 ++- .../transport/wsgi/test_queue_lifecycle.py | 2 +- 4 files changed, 72 insertions(+), 17 deletions(-) diff --git a/marconi/queues/storage/sharding.py b/marconi/queues/storage/sharding.py index 76a15bc5d..40c8a0705 100644 --- a/marconi/queues/storage/sharding.py +++ b/marconi/queues/storage/sharding.py @@ -23,10 +23,13 @@ import six from marconi.common import decorators from marconi.common.storage import select from marconi.common import utils as common_utils +from marconi.openstack.common import log from marconi.queues import storage from marconi.queues.storage import errors from marconi.queues.storage import utils +LOG = log.getLogger(__name__) + _CATALOG_OPTIONS = [ cfg.IntOpt('storage', default='sqlite', help='Catalog storage driver'), @@ -34,6 +37,24 @@ _CATALOG_OPTIONS = [ _CATALOG_GROUP = 'sharding:catalog' +# NOTE(kgriffs): E.g.: 'marconi-sharding:5083853/my-queue' +_SHARD_CACHE_PREFIX = 'sharding:' + +# TODO(kgriffs): If a queue is migrated, everyone's +# caches need to have the relevant entry invalidated +# before "unfreezing" the queue, rather than waiting +# on the TTL. +# +# TODO(kgriffs): Make configurable? +_SHARD_CACHE_TTL = 10 + + +def _shard_cache_key(queue, project=None): + # NOTE(kgriffs): Use string concatenation for performance, + # also put project first since it is guaranteed to be + # unique, which should reduce lookup time. + return _SHARD_CACHE_PREFIX + str(project) + '/' + queue + class DataDriver(storage.DataDriverBase): """Sharding meta-driver for routing requests to multiple backends. @@ -361,6 +382,30 @@ class Catalog(object): conf.register_opts(storage_opts, group=storage_group) return utils.load_storage_driver(conf, self._cache) + def _shard_id(self, queue, project=None): + """Get the ID for the shard assigned to the given queue. + + :param queue: name of the queue + :param project: project to which the queue belongs + + :returns: shard id + + :raises: `errors.QueueNotMapped` + """ + cache_key = _shard_cache_key(queue, project) + shard_id = self._cache.get(cache_key) + + if shard_id is None: + shard_id = self._catalogue_ctrl.get(project, queue)['shard'] + + if not self._cache.set(cache_key, shard_id, _SHARD_CACHE_TTL): + LOG.warn('Failed to cache shard ID') + + return shard_id + + def _invalidate_cached_id(self, queue, project=None): + self._cache.unset(_shard_cache_key(queue, project)) + def register(self, queue, project=None): """Register a new queue in the shard catalog. @@ -380,12 +425,16 @@ class Catalog(object): :type project: six.text_type :raises: NoShardFound """ + # NOTE(cpp-cabrera): only register a queue if the entry + # doesn't exist if not self._catalogue_ctrl.exists(project, queue): # NOTE(cpp-cabrera): limit=0 implies unlimited - select from # all shards shard = select.weighted(self._shards_ctrl.list(limit=0)) + if not shard: - raise errors.NoShardFound() + raise errors.NoShardsFound() + self._catalogue_ctrl.insert(project, queue, shard['name']) def deregister(self, queue, project=None): @@ -400,7 +449,7 @@ class Catalog(object): None for the "global" or "generic" project. :type project: six.text_type """ - # TODO(cpp-cabrera): invalidate cache here + self._invalidate_cached_id(queue, project) self._catalogue_ctrl.delete(project, queue) def lookup(self, queue, project=None): @@ -411,14 +460,20 @@ class Catalog(object): None to specify the "global" or "generic" project. :returns: A storage driver instance for the appropriate shard. If - the driver does not exist yet, it is created and cached. + the driver does not exist yet, it is created and cached. If the + queue is not mapped, returns None. :rtype: Maybe DataDriver """ - # TODO(cpp-cabrera): add caching lookup here try: - shard_id = self._catalogue_ctrl.get(project, queue)['shard'] - except errors.QueueNotMapped: + shard_id = self._shard_id(queue, project) + except errors.QueueNotMapped as ex: + LOG.debug(ex) + + # NOTE(kgriffs): Return `None`, rather than letting the + # exception bubble up, so that the higher layer doesn't + # have to duplicate the try..except..log code all over + # the place. return None return self.get_driver(shard_id) @@ -432,9 +487,10 @@ class Catalog(object): :rtype: marconi.queues.storage.base.DataDriver """ - # NOTE(cpp-cabrera): cache storage driver connection try: return self._drivers[shard_id] except KeyError: + # NOTE(cpp-cabrera): cache storage driver connection self._drivers[shard_id] = self._init_driver(shard_id) + return self._drivers[shard_id] diff --git a/tests/unit/queues/storage/test_shard_catalog.py b/tests/unit/queues/storage/test_shard_catalog.py index 507f60943..f30eb6ef4 100644 --- a/tests/unit/queues/storage/test_shard_catalog.py +++ b/tests/unit/queues/storage/test_shard_catalog.py @@ -28,13 +28,13 @@ from marconi import tests as testing # TODO(cpp-cabrera): it would be wonderful to refactor this unit test # so that it could use multiple control storage backends once those # have shards/catalogue implementations. -class TestShardCatalog(testing.TestBase): +@testing.requires_mongodb +class ShardCatalogTest(testing.TestBase): config_file = 'wsgi_mongodb_sharded.conf' - @testing.requires_mongodb def setUp(self): - super(TestShardCatalog, self).setUp() + super(ShardCatalogTest, self).setUp() self.conf.register_opts([cfg.StrOpt('storage')], group='drivers') @@ -56,7 +56,7 @@ class TestShardCatalog(testing.TestBase): def tearDown(self): self.catalogue_ctrl.drop_all() self.shards_ctrl.drop_all() - super(TestShardCatalog, self).tearDown() + super(ShardCatalogTest, self).tearDown() def test_lookup_loads_correct_driver(self): storage = self.catalog.lookup(self.queue, self.project) diff --git a/tests/unit/queues/storage/test_shard_queues.py b/tests/unit/queues/storage/test_shard_queues.py index a46869097..f5a6aa485 100644 --- a/tests/unit/queues/storage/test_shard_queues.py +++ b/tests/unit/queues/storage/test_shard_queues.py @@ -23,14 +23,13 @@ from marconi.common.cache import cache as oslo_cache from marconi.queues.storage import sharding from marconi.queues.storage import utils from marconi import tests as testing -from marconi.tests import base -class TestShardQueues(base.TestBase): +@testing.requires_mongodb +class ShardQueuesTest(testing.TestBase): - @testing.requires_mongodb def setUp(self): - super(TestShardQueues, self).setUp() + super(ShardQueuesTest, self).setUp() conf = self.load_conf('wsgi_mongodb_sharded.conf') conf.register_opts([cfg.StrOpt('storage')], @@ -48,7 +47,7 @@ class TestShardQueues(base.TestBase): def tearDown(self): self.shards_ctrl.drop_all() - super(TestShardQueues, self).tearDown() + super(ShardQueuesTest, self).tearDown() def test_health(self): health = self.driver.is_alive() diff --git a/tests/unit/queues/transport/wsgi/test_queue_lifecycle.py b/tests/unit/queues/transport/wsgi/test_queue_lifecycle.py index 32e51e3ca..2e7b4a815 100644 --- a/tests/unit/queues/transport/wsgi/test_queue_lifecycle.py +++ b/tests/unit/queues/transport/wsgi/test_queue_lifecycle.py @@ -315,11 +315,11 @@ class QueueLifecycleBaseTest(base.TestBase): self.assertEqual(self.srmock.status, falcon.HTTP_204) +@testing.requires_mongodb class QueueLifecycleMongoDBTests(QueueLifecycleBaseTest): config_file = 'wsgi_mongodb.conf' - @testing.requires_mongodb def setUp(self): super(QueueLifecycleMongoDBTests, self).setUp()