diff --git a/marconi/common/utils.py b/marconi/common/utils.py new file mode 100644 index 000000000..d87010779 --- /dev/null +++ b/marconi/common/utils.py @@ -0,0 +1,38 @@ +# Copyright (c) 2013 Rackspace Hosting, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""utils: general-purpose utilities.""" + +import six + + +def fields(d, names, pred=lambda x: True, + key_transform=lambda x: x, value_transform=lambda x: x): + """Returns the entries in this dictionary with keys appearing in names. + :type d: dict + :type names: [a] + :param pred: a filter that is applied to the values of the dictionary. + :type pred: (a -> bool) + :param key_transform: a transform to apply to the key before returning it + :type key_transform: a -> a + :param value_transform: a transform to apply to the value before + returning it + :type value_transform: a -> a + :rtype: dict + + """ + return dict((key_transform(k), value_transform(v)) + for k, v in six.iteritems(d) + if k in names and pred(v)) diff --git a/marconi/queues/bootstrap.py b/marconi/queues/bootstrap.py index b257b5f9f..25d15a20a 100644 --- a/marconi/queues/bootstrap.py +++ b/marconi/queues/bootstrap.py @@ -58,7 +58,7 @@ class Bootstrap(object): self.driver_conf = self.conf[_DRIVER_GROUP] log.setup('marconi') - mode = 'admin' if self.conf.admin_mode else 'public' + mode = 'admin' if conf.admin_mode else 'public' self._transport_type = 'marconi.queues.{0}.transport'.format(mode) @decorators.lazy_property(write=False) diff --git a/marconi/queues/storage/__init__.py b/marconi/queues/storage/__init__.py index 7c3f9ddf4..288906768 100644 --- a/marconi/queues/storage/__init__.py +++ b/marconi/queues/storage/__init__.py @@ -9,3 +9,4 @@ DataDriverBase = base.DataDriverBase ClaimBase = base.ClaimBase MessageBase = base.MessageBase QueueBase = base.QueueBase +ShardsBase = base.ShardsBase diff --git a/marconi/queues/storage/base.py b/marconi/queues/storage/base.py index 7e09df333..ce6ee5ec5 100644 --- a/marconi/queues/storage/base.py +++ b/marconi/queues/storage/base.py @@ -368,8 +368,19 @@ class ClaimBase(ControllerBase): raise NotImplementedError +class AdminControllerBase(object): + """Top-level class for controllers. + + :param driver: Instance of the driver + instantiating this controller. + """ + + def __init__(self, driver): + self.driver = driver + + @six.add_metaclass(abc.ABCMeta) -class ShardsController(ControllerBase): +class ShardsBase(AdminControllerBase): """A controller for managing shards.""" @abc.abstractmethod @@ -404,11 +415,13 @@ class ShardsController(ControllerBase): raise NotImplementedError @abc.abstractmethod - def get(self, name): + def get(self, name, detailed=False): """Returns a single shard entry. :param name: The name of this shard :type name: six.text_type + :param detailed: Should the options data be included? + :type detailed: bool :returns: weight, uri, and options for this shard :rtype: {} :raises: ShardDoesNotExist if not found diff --git a/marconi/queues/storage/mongodb/driver.py b/marconi/queues/storage/mongodb/driver.py index 1eb1317e4..0f4b02617 100644 --- a/marconi/queues/storage/mongodb/driver.py +++ b/marconi/queues/storage/mongodb/driver.py @@ -28,6 +28,16 @@ from marconi.queues.storage.mongodb import options LOG = logging.getLogger(__name__) +def _connection(conf): + """MongoDB client connection instance.""" + if conf.uri and 'replicaSet' in conf.uri: + MongoClient = pymongo.MongoReplicaSetClient + else: + MongoClient = pymongo.MongoClient + + return MongoClient(conf.uri) + + class DataDriver(storage.DataDriverBase): def __init__(self, conf): @@ -68,14 +78,7 @@ class DataDriver(storage.DataDriverBase): @decorators.lazy_property(write=False) def connection(self): - """MongoDB client connection instance.""" - - if self.mongodb_conf.uri and 'replicaSet' in self.mongodb_conf.uri: - MongoClient = pymongo.MongoReplicaSetClient - else: - MongoClient = pymongo.MongoClient - - return MongoClient(self.mongodb_conf.uri) + return _connection(self.mongodb_conf) @decorators.lazy_property(write=False) def queue_controller(self): @@ -100,6 +103,16 @@ class ControlDriver(storage.ControlDriverBase): self.mongodb_conf = self.conf[options.MONGODB_GROUP] + @decorators.lazy_property(write=False) + def connection(self): + """MongoDB client connection instance.""" + return _connection(self.mongodb_conf) + + @decorators.lazy_property(write=False) + def shards_database(self): + name = self.mongodb_conf.database + '_shards' + return self.connection[name] + @property def shards_controller(self): return controllers.ShardsController(self) diff --git a/marconi/queues/storage/mongodb/shards.py b/marconi/queues/storage/mongodb/shards.py index a7af06463..7fbe54207 100644 --- a/marconi/queues/storage/mongodb/shards.py +++ b/marconi/queues/storage/mongodb/shards.py @@ -14,28 +14,91 @@ # See the License for the specific language governing permissions and # limitations under the License. -from marconi.queues.storage import base +"""shards: an implementation of the shard management storage +controller for mongodb. + +Schema: + 'n': name :: six.text_type + 'u': uri :: six.text_type + 'w': weight :: int + 'o': options :: dict +""" + +from marconi.common import utils as common_utils +from marconi.queues.storage import base, exceptions +from marconi.queues.storage.mongodb import utils + +SHARDS_INDEX = [ + ('n', 1) +] + +# NOTE(cpp-cabrera): used for get/list operations. There's no need to +# show the marker or the _id - they're implementation details. +OMIT_FIELDS = (('_id', 0),) -class ShardsController(base.ShardsController): +def _field_spec(detailed=False): + return dict(OMIT_FIELDS + (() if detailed else (('o', 0),))) + +class ShardsController(base.ShardsBase): + + def __init__(self, *args, **kwargs): + super(ShardsController, self).__init__(*args, **kwargs) + + self._col = self.driver.shards_database.shards + self._col.ensure_index(SHARDS_INDEX, + background=True, + name='shards_name', + unique=True) + + @utils.raises_conn_error def list(self, marker=None, limit=10, detailed=False): - pass + query = {} + if marker is not None: + query['n'] = {'$gt': marker} - def get(self, name): - pass + return self._col.find(query, fields=_field_spec(detailed), + limit=limit) + @utils.raises_conn_error + def get(self, name, detailed=False): + res = self._col.find_one({'n': name}, + _field_spec(detailed)) + if not res: + raise exceptions.ShardDoesNotExist(name) + return res + + @utils.raises_conn_error def create(self, name, weight, uri, options=None): - pass + options = {} if options is None else options + self._col.update({'n': name}, + {'$set': {'n': name, 'w': weight, 'u': uri, + 'o': options}}, + upsert=True) + @utils.raises_conn_error def exists(self, name): - pass + return self._col.find_one({'n': name}) is not None + @utils.raises_conn_error def update(self, name, **kwargs): - pass + names = ('uri', 'weight', 'options') + fields = common_utils.fields(kwargs, names, + pred=lambda x: x is not None, + key_transform=lambda x: x[0]) + assert fields, '`weight`, `uri`, or `options` not found in kwargs' + res = self._col.update({'n': name}, + {'$set': fields}, + upsert=False) + if not res['updatedExisting']: + raise exceptions.ShardDoesNotExist(name) + @utils.raises_conn_error def delete(self, name): - pass + self._col.remove({'n': name}, w=0) + @utils.raises_conn_error def drop_all(self): - pass + self._col.drop() + self._col.ensure_index(SHARDS_INDEX, unique=True) diff --git a/marconi/queues/storage/sharding.py b/marconi/queues/storage/sharding.py index 291ddc676..67caeb71b 100644 --- a/marconi/queues/storage/sharding.py +++ b/marconi/queues/storage/sharding.py @@ -144,10 +144,14 @@ class Catalog(object): # TODO(kgriffs): SHARDING - Read options from catalog backend conf = cfg.ConfigOpts() + general_opts = [ + cfg.BoolOpt('admin_mode', default=False) + ] options = [ - cfg.StrOpt('storage', default='sqlite') + cfg.StrOpt('storage', default='sqlite'), ] + conf.register_opts(general_opts) conf.register_opts(options, group='queues:drivers') return utils.load_storage_driver(conf) diff --git a/marconi/queues/storage/sqlite/shards.py b/marconi/queues/storage/sqlite/shards.py index a7af06463..9aecd3bc6 100644 --- a/marconi/queues/storage/sqlite/shards.py +++ b/marconi/queues/storage/sqlite/shards.py @@ -17,7 +17,7 @@ from marconi.queues.storage import base -class ShardsController(base.ShardsController): +class ShardsController(base.ShardsBase): def list(self, marker=None, limit=10, detailed=False): pass diff --git a/marconi/queues/storage/utils.py b/marconi/queues/storage/utils.py index e94bf1292..4f55ab53a 100644 --- a/marconi/queues/storage/utils.py +++ b/marconi/queues/storage/utils.py @@ -32,7 +32,7 @@ def load_storage_driver(conf): """ try: - mgr = driver.DriverManager('marconi.queues.storage', + mgr = driver.DriverManager('marconi.queues.data.storage', conf['queues:drivers'].storage, invoke_on_load=True, invoke_args=[conf]) diff --git a/marconi/tests/faulty_storage.py b/marconi/tests/faulty_storage.py index e9464883f..20e1ff859 100644 --- a/marconi/tests/faulty_storage.py +++ b/marconi/tests/faulty_storage.py @@ -17,6 +17,9 @@ from marconi.queues import storage class DataDriver(storage.DataDriverBase): + def __init__(self, conf): + super(DataDriver, self).__init__(conf) + @property def default_options(self): return {} @@ -35,9 +38,9 @@ class DataDriver(storage.DataDriverBase): class ControlDriver(storage.ControlDriverBase): - @property - def default_options(self): - return {} + + def __init__(self, conf): + super(ControlDriver, self).__init__(conf) @property def shards_controller(self): diff --git a/marconi/tests/queues/storage/base.py b/marconi/tests/queues/storage/base.py index 22743ea3a..158aab78d 100644 --- a/marconi/tests/queues/storage/base.py +++ b/marconi/tests/queues/storage/base.py @@ -601,6 +601,117 @@ class ClaimControllerTest(ControllerBaseTest): project=self.project) +class ShardsControllerTest(ControllerBaseTest): + """Shards Controller base tests. + + NOTE(flaper87): Implementations of this class should + override the tearDown method in order + to clean up storage's state. + """ + controller_base_class = storage.ShardsBase + + def setUp(self): + super(ShardsControllerTest, self).setUp() + self.shards_controller = self.driver.shards_controller + + # Let's create one shard + self.shard = str(uuid.uuid1()) + self.shards_controller.create(self.shard, 100, 'localhost', {}) + + def tearDown(self): + self.shards_controller.drop_all() + super(ShardsControllerTest, self).tearDown() + + def test_create_succeeds(self): + self.shards_controller.create(str(uuid.uuid1()), + 100, 'localhost', {}) + + def test_create_replaces_on_duplicate_insert(self): + name = str(uuid.uuid1()) + self.shards_controller.create(name, + 100, 'localhost', {}) + self.shards_controller.create(name, + 111, 'localhost2', {}) + entry = self.shards_controller.get(name) + self._shard_expects(entry, xname=name, xweight=111, + xlocation='localhost2') + + def _shard_expects(self, shard, xname, xweight, xlocation): + self.assertIn('n', shard) + self.assertEqual(shard['n'], xname) + self.assertIn('w', shard) + self.assertEqual(shard['w'], xweight) + self.assertIn('u', shard) + self.assertEqual(shard['u'], xlocation) + + def test_get_returns_expected_content(self): + res = self.shards_controller.get(self.shard) + self._shard_expects(res, self.shard, 100, 'localhost') + self.assertNotIn('o', res) + + def test_detailed_get_returns_expected_content(self): + res = self.shards_controller.get(self.shard, detailed=True) + self.assertIn('o', res) + self.assertEqual(res['o'], {}) + + def test_get_raises_if_not_found(self): + self.assertRaises(storage.exceptions.ShardDoesNotExist, + self.shards_controller.get, 'notexists') + + def test_exists(self): + self.assertTrue(self.shards_controller.exists(self.shard)) + self.assertFalse(self.shards_controller.exists('notexists')) + + def test_update_raises_assertion_error_on_bad_fields(self): + self.assertRaises(AssertionError, self.shards_controller.update, + self.shard) + + def test_update_works(self): + self.shards_controller.update(self.shard, weight=101, + uri='redis://localhost', + options={'a': 1}) + res = self.shards_controller.get(self.shard, detailed=True) + self._shard_expects(res, self.shard, 101, 'redis://localhost') + self.assertEqual(res['o'], {'a': 1}) + + def test_delete_works(self): + self.shards_controller.delete(self.shard) + self.assertFalse(self.shards_controller.exists(self.shard)) + + def test_delete_nonexistent_is_silent(self): + self.shards_controller.delete('nonexisting') + + def test_drop_all_leads_to_empty_listing(self): + self.shards_controller.drop_all() + cursor = self.shards_controller.list() + self.assertRaises(StopIteration, next, cursor) + + def test_listing_simple(self): + # NOTE(cpp-cabrera): base entry interferes with listing results + self.shards_controller.delete(self.shard) + + for i in range(15): + self.shards_controller.create(str(i), i, str(i), {}) + + res = list(self.shards_controller.list()) + self.assertEqual(len(res), 10) + for i, entry in enumerate(res): + self._shard_expects(entry, str(i), i, str(i)) + + res = list(self.shards_controller.list(limit=5)) + self.assertEqual(len(res), 5) + + res = next(self.shards_controller.list(marker='3')) + self._shard_expects(res, '4', 4, '4') + + res = list(self.shards_controller.list(detailed=True)) + self.assertEqual(len(res), 10) + for i, entry in enumerate(res): + self._shard_expects(entry, str(i), i, str(i)) + self.assertIn('o', entry) + self.assertEqual(entry['o'], {}) + + def _insert_fixtures(controller, queue_name, project=None, client_uuid=None, num=4, ttl=120): diff --git a/setup.cfg b/setup.cfg index 8b300ff06..72f73b2a5 100644 --- a/setup.cfg +++ b/setup.cfg @@ -26,10 +26,14 @@ packages = console_scripts = marconi-server = marconi.cmd.server:run -marconi.queues.storage = +marconi.queues.data.storage = sqlite = marconi.queues.storage.sqlite.driver:DataDriver mongodb = marconi.queues.storage.mongodb.driver:DataDriver +marconi.queues.control.storage = + sqlite = marconi.queues.storage.sqlite.driver:ControlDriver + mongodb = marconi.queues.storage.mongodb.driver:ControlDriver + marconi.queues.public.transport = wsgi = marconi.queues.transport.wsgi.public.driver:Driver diff --git a/tests/etc/drivers_storage_invalid.conf b/tests/etc/drivers_storage_invalid.conf index e779a56f8..9a0ecd11e 100644 --- a/tests/etc/drivers_storage_invalid.conf +++ b/tests/etc/drivers_storage_invalid.conf @@ -1,6 +1,7 @@ [DEFAULT] debug = False verbose = False +admin_mode = False [queues:drivers] transport = wsgi diff --git a/tests/etc/wsgi_sqlite.conf b/tests/etc/wsgi_sqlite.conf index 82ffd6366..e5dfe37b5 100644 --- a/tests/etc/wsgi_sqlite.conf +++ b/tests/etc/wsgi_sqlite.conf @@ -1,6 +1,7 @@ [DEFAULT] debug = False verbose = False +admin_mode = False [queues:drivers] transport = wsgi diff --git a/tests/unit/queues/storage/test_impl_mongodb.py b/tests/unit/queues/storage/test_impl_mongodb.py index 9eb1f7090..a852e0ce6 100644 --- a/tests/unit/queues/storage/test_impl_mongodb.py +++ b/tests/unit/queues/storage/test_impl_mongodb.py @@ -332,3 +332,16 @@ class MongodbClaimTests(base.ClaimControllerTest): self.assertRaises(storage.exceptions.ClaimDoesNotExist, self.controller.update, self.queue_name, claim_id, {}, project=self.project) + + +@testing.requires_mongodb +class MongodbShardsTests(base.ShardsControllerTest): + driver_class = mongodb.ControlDriver + controller_class = controllers.ShardsController + + def setUp(self): + super(MongodbShardsTests, self).setUp() + self.load_conf('wsgi_mongodb.conf') + + def tearDown(self): + super(MongodbShardsTests, self).tearDown() diff --git a/tests/unit/queues/storage/test_shard_catalog.py b/tests/unit/queues/storage/test_shard_catalog.py index 0732bb83d..bde665180 100644 --- a/tests/unit/queues/storage/test_shard_catalog.py +++ b/tests/unit/queues/storage/test_shard_catalog.py @@ -29,7 +29,7 @@ class TestShardCatalog(base.TestBase): conf_file = 'etc/wsgi_sqlite_sharded.conf' conf = cfg.ConfigOpts() - conf(default_config_files=[conf_file]) + conf(args=[], default_config_files=[conf_file]) lookup = sharding.Catalog(conf).lookup diff --git a/tests/unit/test_bootstrap.py b/tests/unit/test_bootstrap.py index 29f84278c..b4140d311 100644 --- a/tests/unit/test_bootstrap.py +++ b/tests/unit/test_bootstrap.py @@ -25,8 +25,8 @@ from marconi.tests import base class TestBootstrap(base.TestBase): def _bootstrap(self, conf_file): - conf = self.load_conf(conf_file) - return bootstrap.Bootstrap(conf) + self.conf = self.load_conf(conf_file) + return bootstrap.Bootstrap(self.conf) def test_storage_invalid(self): boot = self._bootstrap('etc/drivers_storage_invalid.conf')