feat: shards mongodb driver + tests

This patch adds shard management capability to the queues mongodb
driver.

The storage API is also correctly split into an control and data
interfaces. The data drivers are loaded by default. The control
drivers are loaded as needed by the transport layer.

Unit tests are also provided to verify that the driver (and future
drivers) work as expected.

Change-Id: Iad034a429a763c9a2ce161f05c928b090ab58944
Partially-implements: blueprint storage-sharding
Partially-Closes: 1241686
This commit is contained in:
Alejandro Cabrera 2013-10-28 11:17:28 -04:00
parent 142c7ae0d6
commit 9c7036ff4e
17 changed files with 296 additions and 31 deletions

38
marconi/common/utils.py Normal file
View File

@ -0,0 +1,38 @@
# Copyright (c) 2013 Rackspace Hosting, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""utils: general-purpose utilities."""
import six
def fields(d, names, pred=lambda x: True,
key_transform=lambda x: x, value_transform=lambda x: x):
"""Returns the entries in this dictionary with keys appearing in names.
:type d: dict
:type names: [a]
:param pred: a filter that is applied to the values of the dictionary.
:type pred: (a -> bool)
:param key_transform: a transform to apply to the key before returning it
:type key_transform: a -> a
:param value_transform: a transform to apply to the value before
returning it
:type value_transform: a -> a
:rtype: dict
"""
return dict((key_transform(k), value_transform(v))
for k, v in six.iteritems(d)
if k in names and pred(v))

View File

@ -58,7 +58,7 @@ class Bootstrap(object):
self.driver_conf = self.conf[_DRIVER_GROUP] self.driver_conf = self.conf[_DRIVER_GROUP]
log.setup('marconi') log.setup('marconi')
mode = 'admin' if self.conf.admin_mode else 'public' mode = 'admin' if conf.admin_mode else 'public'
self._transport_type = 'marconi.queues.{0}.transport'.format(mode) self._transport_type = 'marconi.queues.{0}.transport'.format(mode)
@decorators.lazy_property(write=False) @decorators.lazy_property(write=False)

View File

@ -9,3 +9,4 @@ DataDriverBase = base.DataDriverBase
ClaimBase = base.ClaimBase ClaimBase = base.ClaimBase
MessageBase = base.MessageBase MessageBase = base.MessageBase
QueueBase = base.QueueBase QueueBase = base.QueueBase
ShardsBase = base.ShardsBase

View File

@ -368,8 +368,19 @@ class ClaimBase(ControllerBase):
raise NotImplementedError raise NotImplementedError
class AdminControllerBase(object):
"""Top-level class for controllers.
:param driver: Instance of the driver
instantiating this controller.
"""
def __init__(self, driver):
self.driver = driver
@six.add_metaclass(abc.ABCMeta) @six.add_metaclass(abc.ABCMeta)
class ShardsController(ControllerBase): class ShardsBase(AdminControllerBase):
"""A controller for managing shards.""" """A controller for managing shards."""
@abc.abstractmethod @abc.abstractmethod
@ -404,11 +415,13 @@ class ShardsController(ControllerBase):
raise NotImplementedError raise NotImplementedError
@abc.abstractmethod @abc.abstractmethod
def get(self, name): def get(self, name, detailed=False):
"""Returns a single shard entry. """Returns a single shard entry.
:param name: The name of this shard :param name: The name of this shard
:type name: six.text_type :type name: six.text_type
:param detailed: Should the options data be included?
:type detailed: bool
:returns: weight, uri, and options for this shard :returns: weight, uri, and options for this shard
:rtype: {} :rtype: {}
:raises: ShardDoesNotExist if not found :raises: ShardDoesNotExist if not found

View File

@ -28,6 +28,16 @@ from marconi.queues.storage.mongodb import options
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
def _connection(conf):
"""MongoDB client connection instance."""
if conf.uri and 'replicaSet' in conf.uri:
MongoClient = pymongo.MongoReplicaSetClient
else:
MongoClient = pymongo.MongoClient
return MongoClient(conf.uri)
class DataDriver(storage.DataDriverBase): class DataDriver(storage.DataDriverBase):
def __init__(self, conf): def __init__(self, conf):
@ -68,14 +78,7 @@ class DataDriver(storage.DataDriverBase):
@decorators.lazy_property(write=False) @decorators.lazy_property(write=False)
def connection(self): def connection(self):
"""MongoDB client connection instance.""" return _connection(self.mongodb_conf)
if self.mongodb_conf.uri and 'replicaSet' in self.mongodb_conf.uri:
MongoClient = pymongo.MongoReplicaSetClient
else:
MongoClient = pymongo.MongoClient
return MongoClient(self.mongodb_conf.uri)
@decorators.lazy_property(write=False) @decorators.lazy_property(write=False)
def queue_controller(self): def queue_controller(self):
@ -100,6 +103,16 @@ class ControlDriver(storage.ControlDriverBase):
self.mongodb_conf = self.conf[options.MONGODB_GROUP] self.mongodb_conf = self.conf[options.MONGODB_GROUP]
@decorators.lazy_property(write=False)
def connection(self):
"""MongoDB client connection instance."""
return _connection(self.mongodb_conf)
@decorators.lazy_property(write=False)
def shards_database(self):
name = self.mongodb_conf.database + '_shards'
return self.connection[name]
@property @property
def shards_controller(self): def shards_controller(self):
return controllers.ShardsController(self) return controllers.ShardsController(self)

View File

@ -14,28 +14,91 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from marconi.queues.storage import base """shards: an implementation of the shard management storage
controller for mongodb.
Schema:
'n': name :: six.text_type
'u': uri :: six.text_type
'w': weight :: int
'o': options :: dict
"""
from marconi.common import utils as common_utils
from marconi.queues.storage import base, exceptions
from marconi.queues.storage.mongodb import utils
SHARDS_INDEX = [
('n', 1)
]
# NOTE(cpp-cabrera): used for get/list operations. There's no need to
# show the marker or the _id - they're implementation details.
OMIT_FIELDS = (('_id', 0),)
class ShardsController(base.ShardsController): def _field_spec(detailed=False):
return dict(OMIT_FIELDS + (() if detailed else (('o', 0),)))
class ShardsController(base.ShardsBase):
def __init__(self, *args, **kwargs):
super(ShardsController, self).__init__(*args, **kwargs)
self._col = self.driver.shards_database.shards
self._col.ensure_index(SHARDS_INDEX,
background=True,
name='shards_name',
unique=True)
@utils.raises_conn_error
def list(self, marker=None, limit=10, detailed=False): def list(self, marker=None, limit=10, detailed=False):
pass query = {}
if marker is not None:
query['n'] = {'$gt': marker}
def get(self, name): return self._col.find(query, fields=_field_spec(detailed),
pass limit=limit)
@utils.raises_conn_error
def get(self, name, detailed=False):
res = self._col.find_one({'n': name},
_field_spec(detailed))
if not res:
raise exceptions.ShardDoesNotExist(name)
return res
@utils.raises_conn_error
def create(self, name, weight, uri, options=None): def create(self, name, weight, uri, options=None):
pass options = {} if options is None else options
self._col.update({'n': name},
{'$set': {'n': name, 'w': weight, 'u': uri,
'o': options}},
upsert=True)
@utils.raises_conn_error
def exists(self, name): def exists(self, name):
pass return self._col.find_one({'n': name}) is not None
@utils.raises_conn_error
def update(self, name, **kwargs): def update(self, name, **kwargs):
pass names = ('uri', 'weight', 'options')
fields = common_utils.fields(kwargs, names,
pred=lambda x: x is not None,
key_transform=lambda x: x[0])
assert fields, '`weight`, `uri`, or `options` not found in kwargs'
res = self._col.update({'n': name},
{'$set': fields},
upsert=False)
if not res['updatedExisting']:
raise exceptions.ShardDoesNotExist(name)
@utils.raises_conn_error
def delete(self, name): def delete(self, name):
pass self._col.remove({'n': name}, w=0)
@utils.raises_conn_error
def drop_all(self): def drop_all(self):
pass self._col.drop()
self._col.ensure_index(SHARDS_INDEX, unique=True)

View File

@ -144,10 +144,14 @@ class Catalog(object):
# TODO(kgriffs): SHARDING - Read options from catalog backend # TODO(kgriffs): SHARDING - Read options from catalog backend
conf = cfg.ConfigOpts() conf = cfg.ConfigOpts()
general_opts = [
cfg.BoolOpt('admin_mode', default=False)
]
options = [ options = [
cfg.StrOpt('storage', default='sqlite') cfg.StrOpt('storage', default='sqlite'),
] ]
conf.register_opts(general_opts)
conf.register_opts(options, group='queues:drivers') conf.register_opts(options, group='queues:drivers')
return utils.load_storage_driver(conf) return utils.load_storage_driver(conf)

View File

@ -17,7 +17,7 @@
from marconi.queues.storage import base from marconi.queues.storage import base
class ShardsController(base.ShardsController): class ShardsController(base.ShardsBase):
def list(self, marker=None, limit=10, detailed=False): def list(self, marker=None, limit=10, detailed=False):
pass pass

View File

@ -32,7 +32,7 @@ def load_storage_driver(conf):
""" """
try: try:
mgr = driver.DriverManager('marconi.queues.storage', mgr = driver.DriverManager('marconi.queues.data.storage',
conf['queues:drivers'].storage, conf['queues:drivers'].storage,
invoke_on_load=True, invoke_on_load=True,
invoke_args=[conf]) invoke_args=[conf])

View File

@ -17,6 +17,9 @@ from marconi.queues import storage
class DataDriver(storage.DataDriverBase): class DataDriver(storage.DataDriverBase):
def __init__(self, conf):
super(DataDriver, self).__init__(conf)
@property @property
def default_options(self): def default_options(self):
return {} return {}
@ -35,9 +38,9 @@ class DataDriver(storage.DataDriverBase):
class ControlDriver(storage.ControlDriverBase): class ControlDriver(storage.ControlDriverBase):
@property
def default_options(self): def __init__(self, conf):
return {} super(ControlDriver, self).__init__(conf)
@property @property
def shards_controller(self): def shards_controller(self):

View File

@ -601,6 +601,117 @@ class ClaimControllerTest(ControllerBaseTest):
project=self.project) project=self.project)
class ShardsControllerTest(ControllerBaseTest):
"""Shards Controller base tests.
NOTE(flaper87): Implementations of this class should
override the tearDown method in order
to clean up storage's state.
"""
controller_base_class = storage.ShardsBase
def setUp(self):
super(ShardsControllerTest, self).setUp()
self.shards_controller = self.driver.shards_controller
# Let's create one shard
self.shard = str(uuid.uuid1())
self.shards_controller.create(self.shard, 100, 'localhost', {})
def tearDown(self):
self.shards_controller.drop_all()
super(ShardsControllerTest, self).tearDown()
def test_create_succeeds(self):
self.shards_controller.create(str(uuid.uuid1()),
100, 'localhost', {})
def test_create_replaces_on_duplicate_insert(self):
name = str(uuid.uuid1())
self.shards_controller.create(name,
100, 'localhost', {})
self.shards_controller.create(name,
111, 'localhost2', {})
entry = self.shards_controller.get(name)
self._shard_expects(entry, xname=name, xweight=111,
xlocation='localhost2')
def _shard_expects(self, shard, xname, xweight, xlocation):
self.assertIn('n', shard)
self.assertEqual(shard['n'], xname)
self.assertIn('w', shard)
self.assertEqual(shard['w'], xweight)
self.assertIn('u', shard)
self.assertEqual(shard['u'], xlocation)
def test_get_returns_expected_content(self):
res = self.shards_controller.get(self.shard)
self._shard_expects(res, self.shard, 100, 'localhost')
self.assertNotIn('o', res)
def test_detailed_get_returns_expected_content(self):
res = self.shards_controller.get(self.shard, detailed=True)
self.assertIn('o', res)
self.assertEqual(res['o'], {})
def test_get_raises_if_not_found(self):
self.assertRaises(storage.exceptions.ShardDoesNotExist,
self.shards_controller.get, 'notexists')
def test_exists(self):
self.assertTrue(self.shards_controller.exists(self.shard))
self.assertFalse(self.shards_controller.exists('notexists'))
def test_update_raises_assertion_error_on_bad_fields(self):
self.assertRaises(AssertionError, self.shards_controller.update,
self.shard)
def test_update_works(self):
self.shards_controller.update(self.shard, weight=101,
uri='redis://localhost',
options={'a': 1})
res = self.shards_controller.get(self.shard, detailed=True)
self._shard_expects(res, self.shard, 101, 'redis://localhost')
self.assertEqual(res['o'], {'a': 1})
def test_delete_works(self):
self.shards_controller.delete(self.shard)
self.assertFalse(self.shards_controller.exists(self.shard))
def test_delete_nonexistent_is_silent(self):
self.shards_controller.delete('nonexisting')
def test_drop_all_leads_to_empty_listing(self):
self.shards_controller.drop_all()
cursor = self.shards_controller.list()
self.assertRaises(StopIteration, next, cursor)
def test_listing_simple(self):
# NOTE(cpp-cabrera): base entry interferes with listing results
self.shards_controller.delete(self.shard)
for i in range(15):
self.shards_controller.create(str(i), i, str(i), {})
res = list(self.shards_controller.list())
self.assertEqual(len(res), 10)
for i, entry in enumerate(res):
self._shard_expects(entry, str(i), i, str(i))
res = list(self.shards_controller.list(limit=5))
self.assertEqual(len(res), 5)
res = next(self.shards_controller.list(marker='3'))
self._shard_expects(res, '4', 4, '4')
res = list(self.shards_controller.list(detailed=True))
self.assertEqual(len(res), 10)
for i, entry in enumerate(res):
self._shard_expects(entry, str(i), i, str(i))
self.assertIn('o', entry)
self.assertEqual(entry['o'], {})
def _insert_fixtures(controller, queue_name, project=None, def _insert_fixtures(controller, queue_name, project=None,
client_uuid=None, num=4, ttl=120): client_uuid=None, num=4, ttl=120):

View File

@ -26,10 +26,14 @@ packages =
console_scripts = console_scripts =
marconi-server = marconi.cmd.server:run marconi-server = marconi.cmd.server:run
marconi.queues.storage = marconi.queues.data.storage =
sqlite = marconi.queues.storage.sqlite.driver:DataDriver sqlite = marconi.queues.storage.sqlite.driver:DataDriver
mongodb = marconi.queues.storage.mongodb.driver:DataDriver mongodb = marconi.queues.storage.mongodb.driver:DataDriver
marconi.queues.control.storage =
sqlite = marconi.queues.storage.sqlite.driver:ControlDriver
mongodb = marconi.queues.storage.mongodb.driver:ControlDriver
marconi.queues.public.transport = marconi.queues.public.transport =
wsgi = marconi.queues.transport.wsgi.public.driver:Driver wsgi = marconi.queues.transport.wsgi.public.driver:Driver

View File

@ -1,6 +1,7 @@
[DEFAULT] [DEFAULT]
debug = False debug = False
verbose = False verbose = False
admin_mode = False
[queues:drivers] [queues:drivers]
transport = wsgi transport = wsgi

View File

@ -1,6 +1,7 @@
[DEFAULT] [DEFAULT]
debug = False debug = False
verbose = False verbose = False
admin_mode = False
[queues:drivers] [queues:drivers]
transport = wsgi transport = wsgi

View File

@ -332,3 +332,16 @@ class MongodbClaimTests(base.ClaimControllerTest):
self.assertRaises(storage.exceptions.ClaimDoesNotExist, self.assertRaises(storage.exceptions.ClaimDoesNotExist,
self.controller.update, self.queue_name, self.controller.update, self.queue_name,
claim_id, {}, project=self.project) claim_id, {}, project=self.project)
@testing.requires_mongodb
class MongodbShardsTests(base.ShardsControllerTest):
driver_class = mongodb.ControlDriver
controller_class = controllers.ShardsController
def setUp(self):
super(MongodbShardsTests, self).setUp()
self.load_conf('wsgi_mongodb.conf')
def tearDown(self):
super(MongodbShardsTests, self).tearDown()

View File

@ -29,7 +29,7 @@ class TestShardCatalog(base.TestBase):
conf_file = 'etc/wsgi_sqlite_sharded.conf' conf_file = 'etc/wsgi_sqlite_sharded.conf'
conf = cfg.ConfigOpts() conf = cfg.ConfigOpts()
conf(default_config_files=[conf_file]) conf(args=[], default_config_files=[conf_file])
lookup = sharding.Catalog(conf).lookup lookup = sharding.Catalog(conf).lookup

View File

@ -25,8 +25,8 @@ from marconi.tests import base
class TestBootstrap(base.TestBase): class TestBootstrap(base.TestBase):
def _bootstrap(self, conf_file): def _bootstrap(self, conf_file):
conf = self.load_conf(conf_file) self.conf = self.load_conf(conf_file)
return bootstrap.Bootstrap(conf) return bootstrap.Bootstrap(self.conf)
def test_storage_invalid(self): def test_storage_invalid(self):
boot = self._bootstrap('etc/drivers_storage_invalid.conf') boot = self._bootstrap('etc/drivers_storage_invalid.conf')