feat: Storage sharding foundation

This patch provides the plumbing for implementing storage
sharding across multiple backends. Sharding is agnostic to
storage driver type and transport type. The new feature is
optional, and disabled by default.

The design eschews placing any kind of sharding reverse proxy
in the network, allowing the storage drivers to continue
communicating directly with their respective backends.

Sharding can be enabled by setting the global "sharding"
option to True. Future patches will add a sharding section to
the config that can be used to tweak the way sharding works when
it is enabled.

Storage drivers are managed by a Catalog class. The Catalog is
responsible for registering and deregistering queues in the
catalog backend, and for looking up an appropriate driver,
according to which shard a particular queue has been assigned.

In the future, this design will make it straightforward to map
individual queues to different storage backends, according to user
preference.

FWIW, I considered enabling sharding by inserting the routing driver
as the last stage in the storage pipeline. However, it felt like
a hack for the following reasons:

  * Doing so orphaned the regular, solitary driver that was
    still always loaded at the end of the pipeline.
  * Since the bootstrap was not aware of the sharding driver,
    it could not be used to provide setup, so the catalog
    object had to be turned into a singleton and options
    had to always be loaded from the global config.
  * The driver would have to be added to each controller
    pipeline, and would have to always be the last stage in
    the pipeline. Introducing a simple "sharded" boolean option
    seemed to be a more straightforward, less error-prone way
    for operators to enable sharding.

Partially-Implements: blueprint storage-sharding
Change-Id: I5190211e81fe4acd311b2cfdd0bae806cc3fec81
This commit is contained in:
kgriffs 2013-10-14 10:02:57 -05:00
parent 572a6296e0
commit e0892978cd
9 changed files with 349 additions and 11 deletions

View File

@ -14,6 +14,9 @@ log_file = /var/log/marconi/queues.log
;auth_strategy =
# Set to True to enable sharding across multiple storage backends
;sharding = False
# ================= Syslog Options ============================
# Send logs to syslog (/dev/log) instead of to file specified

View File

@ -20,10 +20,17 @@ from marconi.common import decorators
from marconi.common import exceptions
from marconi.openstack.common import log
from marconi.queues.storage import pipeline
from marconi.queues.storage import sharding
from marconi.queues.storage import utils as storage_utils
from marconi.queues import transport # NOQA
LOG = log.getLogger(__name__)
_GENERAL_OPTIONS = [
cfg.BoolOpt('sharding', default=False,
help='Enable sharding across multiple storage backends'),
]
_DRIVER_OPTIONS = [
cfg.StrOpt('transport', default='wsgi',
help='Transport driver to use'),
@ -47,6 +54,7 @@ class Bootstrap(object):
default_file = [config_file]
self.conf = cfg.ConfigOpts()
self.conf.register_opts(_GENERAL_OPTIONS)
self.conf.register_opts(_DRIVER_OPTIONS, group=_DRIVER_GROUP)
self.driver_conf = self.conf[_DRIVER_GROUP]
@ -57,18 +65,16 @@ class Bootstrap(object):
@decorators.lazy_property(write=False)
def storage(self):
storage_name = self.driver_conf.storage
LOG.debug(_(u'Loading storage driver: ') + storage_name)
LOG.debug(_(u'Loading storage driver'))
try:
mgr = driver.DriverManager('marconi.queues.storage',
storage_name,
invoke_on_load=True,
invoke_args=[self.conf])
return pipeline.Driver(self.conf, mgr.driver)
except RuntimeError as exc:
LOG.exception(exc)
raise exceptions.InvalidDriver(exc)
if self.conf.sharding:
LOG.debug(_(u'Storage sharding enabled'))
storage_driver = sharding.Driver(self.conf)
else:
storage_driver = storage_utils.load_storage_driver(self.conf)
LOG.debug(_(u'Loading storage pipeline'))
return pipeline.Driver(self.conf, storage_driver)
@decorators.lazy_property(write=False)
def transport(self):

View File

@ -0,0 +1,205 @@
# Copyright (c) 2013 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
#
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo.config import cfg
from marconi.common import decorators
from marconi.queues import storage
from marconi.queues.storage import utils
_CATALOG_OPTIONS = [
cfg.IntOpt('storage', default='sqlite',
help='Catalog storage driver'),
]
_CATALOG_GROUP = 'queues:sharding:catalog'
class Driver(storage.DriverBase):
"""Sharding meta-driver for routing requests to multiple backends.
:param storage_conf: Ignored, since this is a meta-driver
:param catalog_conf: Options pertaining to the shard catalog
"""
def __init__(self, conf):
super(Driver, self).__init__(conf)
self._shard_catalog = Catalog(conf)
@decorators.lazy_property(write=False)
def queue_controller(self):
return QueueController(self._shard_catalog)
@decorators.lazy_property(write=False)
def message_controller(self):
return MessageController(self._shard_catalog)
@decorators.lazy_property(write=False)
def claim_controller(self):
return ClaimController(self._shard_catalog)
class RoutingController(storage.base.ControllerBase):
"""Routes operations to the appropriate shard.
This controller stands in for a regular storage controller,
routing operations to a driver instance that represents
the shard to which the queue has been assigned.
Do not instantiate this class directly; use one of the
more specific child classes instead.
"""
_resource_name = None
def __init__(self, shard_catalog):
super(RoutingController, self).__init__(None)
self._ctrl_property_name = self._resource_name + '_controller'
self._shard_catalog = shard_catalog
@decorators.cached_getattr
def __getattr__(self, name):
# NOTE(kgriffs): Use a closure trick to avoid
# some attr lookups each time foward() is called.
lookup = self._shard_catalog.lookup
# NOTE(kgriffs): Assume that every controller method
# that is exposed to the transport declares queue name
# as its first arg. The only exception to this
# is QueueController.list
def forward(queue, *args, **kwargs):
# NOTE(kgriffs): Using .get since 'project' is an
# optional argument.
storage = lookup(queue, kwargs.get('project'))
target_ctrl = getattr(storage, self._ctrl_property_name)
return getattr(target_ctrl, name)(queue, *args, **kwargs)
return forward
class QueueController(RoutingController):
"""Controller to facilitate special processing for queue operations."""
_resource_name = 'queue'
def __init__(self, shard_catalog):
super(QueueController, self).__init__(shard_catalog)
def list(self, project=None, marker=None,
limit=None, detailed=False):
# TODO(kgriffs): SHARDING - Query all shards and merge
# the results, then return the resulting list.
# TODO(kgriffs): Remove this placeholder code - it is
# only here to make tests pass in the short term!
target = self._shard_catalog.lookup(None, project).queue_controller
return target.list(project=project, marker=marker,
limit=limit, detailed=detailed)
def create(self, name, project=None):
self._shard_catalog.register(name, project)
target = self._shard_catalog.lookup(name, project).queue_controller
return target.create(name, project)
def delete(self, name, project=None):
self._shard_catalog.deregister(name, project)
target = self._shard_catalog.lookup(name, project).queue_controller
return target.delete(name, project)
class MessageController(RoutingController):
_resource_name = 'message'
class ClaimController(RoutingController):
_resource_name = 'claim'
class Catalog(object):
"""Represents the mapping between queues and shard drivers."""
def __init__(self, conf):
self._shards = {}
self._conf = conf
self._conf.register_opts(_CATALOG_OPTIONS, group=_CATALOG_GROUP)
self._catalog_conf = self._conf[_CATALOG_GROUP]
def _init_shard(self, shard_id):
# TODO(kgriffs): SHARDING - Read options from catalog backend
conf = cfg.ConfigOpts()
options = [
cfg.StrOpt('storage', default='sqlite')
]
conf.register_opts(options, group='queues:drivers')
return utils.load_storage_driver(conf)
def register(self, queue, project=None):
"""Register a new queue in the shard catalog.
This method should be called whenever a new queue is being
created, and will create an entry in the shard catalog for
the given queue.
After using this method to register the queue in the
catalog, the caller should call `lookup()` to get a reference
to a storage driver which will allow interacting with the
queue's assigned backend shard.
:param queue: Name of the new queue to assign to a shard
:param project: Project to which the queue belongs, or
None for the "global" or "generic" project.
"""
# TODO(kgriffs): SHARDING - Implement this!
pass
def deregister(self, queue, project=None):
"""Removes a queue from the shard catalog.
Call this method after successfully deleting it from a
backend shard.
"""
# TODO(kgriffs): SHARDING - Implement this!
pass
def lookup(self, queue, project=None):
"""Lookup a shard driver for the given queue and project.
:param queue: Name of the queue for which to find a shard
:param project: Project to which the queue belongs, or
None to specify the "global" or "generic" project.
:returns: A storage driver instance for the appropriate shard. If
the driver does not exist yet, it is created and cached.
"""
# TODO(kgriffs): SHARDING - Raise an exception if the queue
# does not have a mapping (it does not exist).
# TODO(kgriffs): SHARDING - Get ID from the catalog backend
shard_id = '[insert_id]'
try:
shard = self._shards[shard_id]
except KeyError:
self._shards[shard_id] = shard = self._init_shard(shard_id)
return shard

View File

@ -0,0 +1,43 @@
# Copyright (c) 2013 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
#
# See the License for the specific language governing permissions and
# limitations under the License.
from stevedore import driver
from marconi.common import exceptions
from marconi.openstack.common import log
LOG = log.getLogger(__name__)
def load_storage_driver(conf):
"""Loads a storage driver and returns it.
The driver's initializer will be passed conf as its only arg.
:param conf: Configuration instance to use for loading the
driver. Must include a 'queues:drivers' group.
"""
try:
mgr = driver.DriverManager('marconi.queues.storage',
conf['queues:drivers'].storage,
invoke_on_load=True,
invoke_args=[conf])
return mgr.driver
except RuntimeError as exc:
LOG.exception(exc)
raise exceptions.InvalidDriver(exc)

View File

@ -0,0 +1,10 @@
[DEFAULT]
sharding = True
[queues:drivers]
transport = wsgi
storage = mongodb
[queues:drivers:storage:mongodb]
uri = mongodb://127.0.0.1:27017
database = marconi_test

View File

@ -0,0 +1,6 @@
[DEFAULT]
sharding = True
[queues:drivers]
transport = wsgi
storage = sqlite

View File

@ -0,0 +1,43 @@
# Copyright (c) 2013 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
#
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo.config import cfg
from marconi.queues.storage import sharding
from marconi.queues.storage import sqlite
from marconi.tests import base
class TestShardCatalog(base.TestBase):
def test_lookup(self):
# TODO(kgriffs): SHARDING - configure sharding to use an in-memory
# backend store, and register the queues we are going to look up.
conf_file = 'etc/wsgi_sqlite_sharded.conf'
conf = cfg.ConfigOpts()
conf(default_config_files=[conf_file])
lookup = sharding.Catalog(conf).lookup
storage = lookup('q1', '123456')
self.assertIsInstance(storage, sqlite.Driver)
storage = lookup('q2', '123456')
self.assertIsInstance(storage, sqlite.Driver)
storage = lookup('g1', None)
self.assertIsInstance(storage, sqlite.Driver)

View File

@ -419,6 +419,11 @@ class MessagesSQLiteTests(MessagesBaseTest):
config_filename = 'wsgi_sqlite.conf'
class MessagesSQLiteShardedTests(MessagesBaseTest):
config_filename = 'wsgi_sqlite_sharded.conf'
@testing.requires_mongodb
class MessagesMongoDBTests(MessagesBaseTest):
@ -428,6 +433,15 @@ class MessagesMongoDBTests(MessagesBaseTest):
super(MessagesMongoDBTests, self).setUp()
@testing.requires_mongodb
class MessagesMongoDBShardedTests(MessagesBaseTest):
config_filename = 'wsgi_mongodb_sharded.conf'
def setUp(self):
super(MessagesMongoDBShardedTests, self).setUp()
class MessagesFaultyDriverTests(base.TestBaseFaulty):
config_filename = 'wsgi_faulty.conf'

View File

@ -18,6 +18,7 @@ from oslo.config import cfg
from marconi.common import exceptions
import marconi.queues
from marconi.queues.storage import pipeline
from marconi.queues.storage import sharding
from marconi.queues.storage import sqlite
from marconi.queues.transport import wsgi
from marconi.tests import base
@ -40,6 +41,13 @@ class TestBootstrap(base.TestBase):
self.assertIsInstance(bootstrap.storage, pipeline.Driver)
self.assertIsInstance(bootstrap.storage._storage, sqlite.Driver)
def test_storage_sqlite_sharded(self):
"""Makes sure we can load the shard driver."""
conf_file = 'etc/wsgi_sqlite_sharded.conf'
bootstrap = marconi.Bootstrap(conf_file)
self.assertIsInstance(bootstrap.storage._storage, sharding.Driver)
def test_transport_invalid(self):
conf_file = 'etc/drivers_transport_invalid.conf'
bootstrap = marconi.Bootstrap(conf_file)