feat(sharding): Cache shard mappings

This patch modifies sharding.Catalog so that it caches the mapping
between (project, queue) ===> shard. This reduces latency for
all data plane operations when sharding is enabled.

Change-Id: I54714c4e6c0ed78ac302ce86aed9bb72b200052b
Partially-Implements: blueprint storage-sharding
This commit is contained in:
kgriffs 2013-11-01 12:17:37 -05:00
parent ecce97fd03
commit 52e4201775
4 changed files with 72 additions and 17 deletions

View File

@ -23,10 +23,13 @@ import six
from marconi.common import decorators
from marconi.common.storage import select
from marconi.common import utils as common_utils
from marconi.openstack.common import log
from marconi.queues import storage
from marconi.queues.storage import errors
from marconi.queues.storage import utils
LOG = log.getLogger(__name__)
_CATALOG_OPTIONS = [
cfg.IntOpt('storage', default='sqlite',
help='Catalog storage driver'),
@ -34,6 +37,24 @@ _CATALOG_OPTIONS = [
_CATALOG_GROUP = 'sharding:catalog'
# NOTE(kgriffs): E.g.: 'marconi-sharding:5083853/my-queue'
_SHARD_CACHE_PREFIX = 'sharding:'
# TODO(kgriffs): If a queue is migrated, everyone's
# caches need to have the relevant entry invalidated
# before "unfreezing" the queue, rather than waiting
# on the TTL.
#
# TODO(kgriffs): Make configurable?
_SHARD_CACHE_TTL = 10
def _shard_cache_key(queue, project=None):
# NOTE(kgriffs): Use string concatenation for performance,
# also put project first since it is guaranteed to be
# unique, which should reduce lookup time.
return _SHARD_CACHE_PREFIX + str(project) + '/' + queue
class DataDriver(storage.DataDriverBase):
"""Sharding meta-driver for routing requests to multiple backends.
@ -361,6 +382,30 @@ class Catalog(object):
conf.register_opts(storage_opts, group=storage_group)
return utils.load_storage_driver(conf, self._cache)
def _shard_id(self, queue, project=None):
"""Get the ID for the shard assigned to the given queue.
:param queue: name of the queue
:param project: project to which the queue belongs
:returns: shard id
:raises: `errors.QueueNotMapped`
"""
cache_key = _shard_cache_key(queue, project)
shard_id = self._cache.get(cache_key)
if shard_id is None:
shard_id = self._catalogue_ctrl.get(project, queue)['shard']
if not self._cache.set(cache_key, shard_id, _SHARD_CACHE_TTL):
LOG.warn('Failed to cache shard ID')
return shard_id
def _invalidate_cached_id(self, queue, project=None):
self._cache.unset(_shard_cache_key(queue, project))
def register(self, queue, project=None):
"""Register a new queue in the shard catalog.
@ -380,12 +425,16 @@ class Catalog(object):
:type project: six.text_type
:raises: NoShardFound
"""
# NOTE(cpp-cabrera): only register a queue if the entry
# doesn't exist
if not self._catalogue_ctrl.exists(project, queue):
# NOTE(cpp-cabrera): limit=0 implies unlimited - select from
# all shards
shard = select.weighted(self._shards_ctrl.list(limit=0))
if not shard:
raise errors.NoShardFound()
raise errors.NoShardsFound()
self._catalogue_ctrl.insert(project, queue, shard['name'])
def deregister(self, queue, project=None):
@ -400,7 +449,7 @@ class Catalog(object):
None for the "global" or "generic" project.
:type project: six.text_type
"""
# TODO(cpp-cabrera): invalidate cache here
self._invalidate_cached_id(queue, project)
self._catalogue_ctrl.delete(project, queue)
def lookup(self, queue, project=None):
@ -411,14 +460,20 @@ class Catalog(object):
None to specify the "global" or "generic" project.
:returns: A storage driver instance for the appropriate shard. If
the driver does not exist yet, it is created and cached.
the driver does not exist yet, it is created and cached. If the
queue is not mapped, returns None.
:rtype: Maybe DataDriver
"""
# TODO(cpp-cabrera): add caching lookup here
try:
shard_id = self._catalogue_ctrl.get(project, queue)['shard']
except errors.QueueNotMapped:
shard_id = self._shard_id(queue, project)
except errors.QueueNotMapped as ex:
LOG.debug(ex)
# NOTE(kgriffs): Return `None`, rather than letting the
# exception bubble up, so that the higher layer doesn't
# have to duplicate the try..except..log code all over
# the place.
return None
return self.get_driver(shard_id)
@ -432,9 +487,10 @@ class Catalog(object):
:rtype: marconi.queues.storage.base.DataDriver
"""
# NOTE(cpp-cabrera): cache storage driver connection
try:
return self._drivers[shard_id]
except KeyError:
# NOTE(cpp-cabrera): cache storage driver connection
self._drivers[shard_id] = self._init_driver(shard_id)
return self._drivers[shard_id]

View File

@ -28,13 +28,13 @@ from marconi import tests as testing
# TODO(cpp-cabrera): it would be wonderful to refactor this unit test
# so that it could use multiple control storage backends once those
# have shards/catalogue implementations.
class TestShardCatalog(testing.TestBase):
@testing.requires_mongodb
class ShardCatalogTest(testing.TestBase):
config_file = 'wsgi_mongodb_sharded.conf'
@testing.requires_mongodb
def setUp(self):
super(TestShardCatalog, self).setUp()
super(ShardCatalogTest, self).setUp()
self.conf.register_opts([cfg.StrOpt('storage')],
group='drivers')
@ -56,7 +56,7 @@ class TestShardCatalog(testing.TestBase):
def tearDown(self):
self.catalogue_ctrl.drop_all()
self.shards_ctrl.drop_all()
super(TestShardCatalog, self).tearDown()
super(ShardCatalogTest, self).tearDown()
def test_lookup_loads_correct_driver(self):
storage = self.catalog.lookup(self.queue, self.project)

View File

@ -23,14 +23,13 @@ from marconi.common.cache import cache as oslo_cache
from marconi.queues.storage import sharding
from marconi.queues.storage import utils
from marconi import tests as testing
from marconi.tests import base
class TestShardQueues(base.TestBase):
@testing.requires_mongodb
class ShardQueuesTest(testing.TestBase):
@testing.requires_mongodb
def setUp(self):
super(TestShardQueues, self).setUp()
super(ShardQueuesTest, self).setUp()
conf = self.load_conf('wsgi_mongodb_sharded.conf')
conf.register_opts([cfg.StrOpt('storage')],
@ -48,7 +47,7 @@ class TestShardQueues(base.TestBase):
def tearDown(self):
self.shards_ctrl.drop_all()
super(TestShardQueues, self).tearDown()
super(ShardQueuesTest, self).tearDown()
def test_health(self):
health = self.driver.is_alive()

View File

@ -315,11 +315,11 @@ class QueueLifecycleBaseTest(base.TestBase):
self.assertEqual(self.srmock.status, falcon.HTTP_204)
@testing.requires_mongodb
class QueueLifecycleMongoDBTests(QueueLifecycleBaseTest):
config_file = 'wsgi_mongodb.conf'
@testing.requires_mongodb
def setUp(self):
super(QueueLifecycleMongoDBTests, self).setUp()