diff --git a/marconi/queues/storage/__init__.py b/marconi/queues/storage/__init__.py index 288906768..2ea81dcd5 100644 --- a/marconi/queues/storage/__init__.py +++ b/marconi/queues/storage/__init__.py @@ -6,6 +6,7 @@ from marconi.queues.storage import exceptions # NOQA # Hoist classes into package namespace ControlDriverBase = base.ControlDriverBase DataDriverBase = base.DataDriverBase +CatalogueBase = base.CatalogueBase ClaimBase = base.ClaimBase MessageBase = base.MessageBase QueueBase = base.QueueBase diff --git a/marconi/queues/storage/base.py b/marconi/queues/storage/base.py index ce6ee5ec5..9ff19a9d9 100644 --- a/marconi/queues/storage/base.py +++ b/marconi/queues/storage/base.py @@ -87,6 +87,11 @@ class ControlDriverBase(object): def __init__(self, conf): self.conf = conf + @abc.abstractproperty + def catalogue_controller(self): + """Returns the driver's catalogue controller.""" + raise NotImplementedError + @abc.abstractproperty def shards_controller(self): """Returns storage's shard management controller.""" @@ -465,3 +470,94 @@ class ShardsBase(AdminControllerBase): def drop_all(self): """Deletes all shards from storage.""" raise NotImplementedError + + +@six.add_metaclass(abc.ABCMeta) +class CatalogueBase(ControllerBase): + """A controller for managing the catalogue. The catalogue is + responsible for maintaining a mapping between project.queue + entries to their shard. + """ + + @abc.abstractmethod + def list(self, project): + """Returns a list of queue entries from the catalogue associated with + this project. + + :param project: The project to use when filtering through queue + entries. + :type project: six.text_type + :returns: [{'project': ..., 'queue': ..., 'shard': ...},] + :rtype: [dict] + """ + raise NotImplementedError + + @abc.abstractmethod + def get(self, project, queue): + """Returns the shard identifier for the queue registered under this + project. + + :param project: Namespace to search for the given queue + :type project: six.text_type + :param queue: The name of the queue to search for + :type queue: six.text_type + :returns: {'shard': ...} + :rtype: dict + :raises: QueueNotMapped + """ + raise NotImplementedError + + @abc.abstractmethod + def exists(self, project, queue): + """Determines whether the given queue exists under project. + + :param project: Namespace to check. + :type project: six.text_type + :param queue: str - Particular queue to check for + :type queue: six.text_type + :return: True if the queue exists under this project + :rtype: bool + """ + + @abc.abstractmethod + def insert(self, project, queue, shard): + """Creates a new catalogue entry, or updates it if it already existed. + + :param project: str - Namespace to insert the given queue into + :type project: six.text_type + :param queue: str - The name of the queue to insert + :type queue: six.text_type + :param shard: shard identifier to associate this queue with + :type shard: six.text_type + """ + raise NotImplementedError + + @abc.abstractmethod + def delete(self, project, queue): + """Removes this entry from the catalogue. + + :param project: The namespace to search for this queue + :type project: six.text_type + :param queue: The queue name to remove + :type queue: six.text_type + """ + raise NotImplementedError + + @abc.abstractmethod + def update(self, project, queue, shards=None): + """Updates the shard identifier for this queue + + :param project: Namespace to search + :type project: six.text_type + :param queue: The name of the queue + :type queue: six.text_type + :param shards: The name of the shard where this project/queue lives. + :type shards: six.text_type + :raises: QueueNotMapped + """ + raise NotImplementedError + + @abc.abstractmethod + def drop_all(self): + """Drops all catalogue entries from storage.""" + raise NotImplementedError diff --git a/marconi/queues/storage/exceptions.py b/marconi/queues/storage/exceptions.py index 37d370c39..641ecd860 100644 --- a/marconi/queues/storage/exceptions.py +++ b/marconi/queues/storage/exceptions.py @@ -100,6 +100,15 @@ class ClaimDoesNotExist(DoesNotExist): super(ClaimDoesNotExist, self).__init__(msg) +class QueueNotMapped(DoesNotExist): + + def __init__(self, queue, project): + msg = (u'No shard found for ' + u'queue %(queue)s for project %(project)s' % + dict(queue=queue, project=project)) + super(QueueNotMapped, self).__init__(msg) + + class MessageIsClaimedBy(NotPermitted): def __init__(self, mid, cid): diff --git a/marconi/queues/storage/mongodb/catalogue.py b/marconi/queues/storage/mongodb/catalogue.py new file mode 100644 index 000000000..9df74e940 --- /dev/null +++ b/marconi/queues/storage/mongodb/catalogue.py @@ -0,0 +1,107 @@ +# Copyright (c) 2013 Rackspace Hosting, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""MongoDB storage controller for the queues catalogue. + +Serves to construct an association between a project + queue -> shard + +{ + 'p_q': project_queue :: six.text_type, + 's': shard_identifier :: six.text_type +} +""" + +import marconi.openstack.common.log as logging +from marconi.queues.storage import base, exceptions +from marconi.queues.storage.mongodb import utils + + +LOG = logging.getLogger(__name__) + +PRIMARY_KEY = utils.PROJ_QUEUE_KEY + +CATALOGUE_INDEX = [ + (PRIMARY_KEY, 1) +] + + +class CatalogueController(base.CatalogueBase): + + def __init__(self, *args, **kwargs): + super(CatalogueController, self).__init__(*args, **kwargs) + + self._col = self.driver.catalogue_database.catalogue + self._col.ensure_index(CATALOGUE_INDEX, unique=True) + + @utils.raises_conn_error + def _insert(self, project, queue, shard, upsert): + key = utils.scope_queue_name(queue, project) + return self._col.update({PRIMARY_KEY: key}, + {'$set': {'s': shard}}, upsert=upsert) + + @utils.raises_conn_error + def list(self, project): + fields = {'_id': 0} + + query = utils.scoped_query(None, project) + return utils.HookedCursor(self._col.find(query, fields), + _normalize) + + @utils.raises_conn_error + def get(self, project, queue): + fields = {'_id': 0} + key = utils.scope_queue_name(queue, project) + entry = self._col.find_one({PRIMARY_KEY: key}, + fields=fields) + + if entry is None: + raise exceptions.QueueNotMapped(project, queue) + + return _normalize(entry) + + @utils.raises_conn_error + def exists(self, project, queue): + key = utils.scope_queue_name(queue, project) + return self._col.find_one({PRIMARY_KEY: key}) is not None + + def insert(self, project, queue, shard): + # NOTE(cpp-cabrera): _insert handles conn_error + self._insert(project, queue, shard, upsert=True) + + @utils.raises_conn_error + def delete(self, project, queue): + self._col.remove({PRIMARY_KEY: utils.scope_queue_name(queue, project)}, + w=0) + + def update(self, project, queue, shard=None): + # NOTE(cpp-cabrera): _insert handles conn_error + res = self._insert(project, queue, shard, upsert=False) + + if not res['updatedExisting']: + raise exceptions.QueueNotMapped(project, queue) + + @utils.raises_conn_error + def drop_all(self): + self._col.drop() + self._col.ensure_index(CATALOGUE_INDEX, unique=True) + + +def _normalize(entry): + project, queue = utils.parse_scoped_project_queue(entry[PRIMARY_KEY]) + return { + 'queue': queue, + 'project': project, + 'shard': entry['s'] + } diff --git a/marconi/queues/storage/mongodb/controllers.py b/marconi/queues/storage/mongodb/controllers.py index 97058b6bc..2ab5a1c9a 100644 --- a/marconi/queues/storage/mongodb/controllers.py +++ b/marconi/queues/storage/mongodb/controllers.py @@ -22,12 +22,14 @@ Field Mappings: updated and documented in each controller class. """ +from marconi.queues.storage.mongodb import catalogue from marconi.queues.storage.mongodb import claims from marconi.queues.storage.mongodb import messages from marconi.queues.storage.mongodb import queues from marconi.queues.storage.mongodb import shards +CatalogueController = catalogue.CatalogueController ClaimController = claims.ClaimController MessageController = messages.MessageController QueueController = queues.QueueController diff --git a/marconi/queues/storage/mongodb/driver.py b/marconi/queues/storage/mongodb/driver.py index 0f4b02617..b25bc4398 100644 --- a/marconi/queues/storage/mongodb/driver.py +++ b/marconi/queues/storage/mongodb/driver.py @@ -29,7 +29,6 @@ LOG = logging.getLogger(__name__) def _connection(conf): - """MongoDB client connection instance.""" if conf.uri and 'replicaSet' in conf.uri: MongoClient = pymongo.MongoReplicaSetClient else: @@ -52,7 +51,7 @@ class DataDriver(storage.DataDriverBase): def queues_database(self): """Database dedicated to the "queues" collection. - The queues collection is separated out into it's own database + The queues collection is separated out into its own database to avoid writer lock contention with the messages collections. """ @@ -78,6 +77,7 @@ class DataDriver(storage.DataDriverBase): @decorators.lazy_property(write=False) def connection(self): + """MongoDB client connection instance.""" return _connection(self.mongodb_conf) @decorators.lazy_property(write=False) @@ -116,3 +116,18 @@ class ControlDriver(storage.ControlDriverBase): @property def shards_controller(self): return controllers.ShardsController(self) + + @decorators.lazy_property(write=False) + def catalogue_database(self): + """Database dedicated to the "queues" collection. + + The queues collection is separated out into its own database + to avoid writer lock contention with the messages collections. + """ + + name = self.mongodb_conf.database + '_catalogue' + return self.connection[name] + + @property + def catalogue_controller(self): + return controllers.CatalogueController(self) diff --git a/marconi/queues/storage/mongodb/messages.py b/marconi/queues/storage/mongodb/messages.py index 45a455b9f..475c02a50 100644 --- a/marconi/queues/storage/mongodb/messages.py +++ b/marconi/queues/storage/mongodb/messages.py @@ -55,23 +55,27 @@ TTL_INDEX_FIELDS = [ ('e', 1), ] +# NOTE(cpp-cabrera): to unify use of project/queue across mongodb +# storage impls. +PROJ_QUEUE = utils.PROJ_QUEUE_KEY + # NOTE(kgriffs): This index is for listing messages, usually # filtering out claimed ones. ACTIVE_INDEX_FIELDS = [ - ('p_q', 1), # Project will to be unique, so put first + (PROJ_QUEUE, 1), # Project will be unique, so put first ('k', 1), # Used for sorting and paging, must come before range queries ('c.e', 1), # Used for filtering out claimed messages ] # For counting COUNTING_INDEX_FIELDS = [ - ('p_q', 1), # Project will to be unique, so put first + (PROJ_QUEUE, 1), # Project will be unique, so put first ('c.e', 1), # Used for filtering out claimed messages ] # Index used for claims CLAIMED_INDEX_FIELDS = [ - ('p_q', 1), + (PROJ_QUEUE, 1), ('c.id', 1), ('k', 1), ('c.e', 1), @@ -79,7 +83,7 @@ CLAIMED_INDEX_FIELDS = [ # Index used to ensure uniqueness. MARKER_INDEX_FIELDS = [ - ('p_q', 1), + (PROJ_QUEUE, 1), ('k', 1), ] @@ -193,7 +197,7 @@ class MessageController(storage.MessageBase): """ scope = utils.scope_queue_name(queue_name, project) collection = self._collection(queue_name, project) - collection.remove({'p_q': scope}, w=0) + collection.remove({PROJ_QUEUE: scope}, w=0) def _list(self, queue_name, project=None, marker=None, echo=False, client_uuid=None, fields=None, @@ -234,7 +238,7 @@ class MessageController(storage.MessageBase): query = { # Messages must belong to this # queue and project - 'p_q': utils.scope_queue_name(queue_name, project), + PROJ_QUEUE: utils.scope_queue_name(queue_name, project), } if not echo: @@ -275,7 +279,7 @@ class MessageController(storage.MessageBase): """ query = { # Messages must belong to this queue - 'p_q': utils.scope_queue_name(queue_name, project), + PROJ_QUEUE: utils.scope_queue_name(queue_name, project), } if not include_claimed: @@ -301,7 +305,7 @@ class MessageController(storage.MessageBase): claim_id = {'$ne': None} query = { - 'p_q': utils.scope_queue_name(queue_name, project), + PROJ_QUEUE: utils.scope_queue_name(queue_name, project), 'c.id': claim_id, 'c.e': {'$gt': expires or timeutils.utcnow_ts()}, } @@ -341,7 +345,7 @@ class MessageController(storage.MessageBase): scope = utils.scope_queue_name(queue_name, project) collection = self._collection(queue_name, project) - collection.update({'p_q': scope, 'c.id': cid}, + collection.update({PROJ_QUEUE: scope, 'c.id': cid}, {'$set': {'c': {'id': None, 'e': now}}}, upsert=False, multi=True) @@ -402,7 +406,7 @@ class MessageController(storage.MessageBase): query = { '_id': mid, - 'p_q': utils.scope_queue_name(queue_name, project), + PROJ_QUEUE: utils.scope_queue_name(queue_name, project), } collection = self._collection(queue_name, project) @@ -425,7 +429,7 @@ class MessageController(storage.MessageBase): # Base query, always check expire time query = { '_id': {'$in': message_ids}, - 'p_q': utils.scope_queue_name(queue_name, project), + PROJ_QUEUE: utils.scope_queue_name(queue_name, project), } collection = self._collection(queue_name, project) @@ -454,7 +458,7 @@ class MessageController(storage.MessageBase): prepared_messages = [ { 't': message['ttl'], - 'p_q': utils.scope_queue_name(queue_name, project), + PROJ_QUEUE: utils.scope_queue_name(queue_name, project), 'e': now_dt + datetime.timedelta(seconds=message['ttl']), 'u': client_uuid, 'c': {'id': None, 'e': now}, @@ -616,7 +620,7 @@ class MessageController(storage.MessageBase): query = { '_id': mid, - 'p_q': utils.scope_queue_name(queue_name, project), + PROJ_QUEUE: utils.scope_queue_name(queue_name, project), } # NOTE(cpp-cabrera): return early - the user gaves us an @@ -650,7 +654,7 @@ class MessageController(storage.MessageBase): message_ids = [mid for mid in map(utils.to_oid, message_ids) if mid] query = { '_id': {'$in': message_ids}, - 'p_q': utils.scope_queue_name(queue_name, project), + PROJ_QUEUE: utils.scope_queue_name(queue_name, project), } collection = self._collection(queue_name, project) diff --git a/marconi/queues/storage/mongodb/queues.py b/marconi/queues/storage/mongodb/queues.py index b4db28066..e308db962 100644 --- a/marconi/queues/storage/mongodb/queues.py +++ b/marconi/queues/storage/mongodb/queues.py @@ -172,19 +172,7 @@ class QueueController(storage.QueueBase): if limit is None: limit = self.driver.limits_conf.default_queue_paging - query = {} - scoped_name = utils.scope_queue_name(marker, project) - - if not scoped_name.startswith('/'): - # NOTE(kgriffs): scoped queue, e.g., 'project-id/queue-name' - project_prefix = '^' + project + '/' - query['p_q'] = {'$regex': project_prefix, '$gt': scoped_name} - elif scoped_name == '/': - # NOTE(kgriffs): list global queues, but exclude scoped ones - query['p_q'] = {'$regex': '^/'} - else: - # NOTE(kgriffs): unscoped queue, e.g., '/my-global-queue' - query['p_q'] = {'$regex': '^/', '$gt': scoped_name} + query = utils.scoped_query(marker, project) fields = {'p_q': 1, '_id': 0} if detailed: diff --git a/marconi/queues/storage/mongodb/utils.py b/marconi/queues/storage/mongodb/utils.py index e411f6ca3..f66ce34a6 100644 --- a/marconi/queues/storage/mongodb/utils.py +++ b/marconi/queues/storage/mongodb/utils.py @@ -33,6 +33,9 @@ from marconi.queues.storage import exceptions as storage_exceptions # TZ-aware UNIX epoch for convenience. EPOCH = datetime.datetime.utcfromtimestamp(0).replace(tzinfo=tz_util.utc) +# NOTE(cpp-cabrera): the authoritative form of project/queue keys. +PROJ_QUEUE_KEY = 'p_q' + LOG = logging.getLogger(__name__) @@ -172,6 +175,47 @@ def descope_queue_name(scoped_name): return scoped_name.partition('/')[2] or None +def parse_scoped_project_queue(scoped_name): + """Returns the project and queue name for a scoped catalogue entry. + + :param scoped_name: a project/queue as given by :scope_queue_name: + :type scoped_name: six.text_type + :returns: (project, queue) + :rtype: (six.text_type, six.text_type) + """ + return scoped_name.split('/') + + +def scoped_query(queue, project): + """Returns a dict usable for querying for scoped project/queues. + + :param queue: name of queue to seek + :type queue: six.text_type + :param project: namespace + :type project: six.text_type + :param key: query key to use + :type key: six.text_type + :returns: query to issue + :rtype: dict + """ + key = PROJ_QUEUE_KEY + query = {} + scoped_name = scope_queue_name(queue, project) + + if not scoped_name.startswith('/'): + # NOTE(kgriffs): scoped queue, e.g., 'project-id/queue-name' + project_prefix = '^' + project + '/' + query[key] = {'$regex': project_prefix, '$gt': scoped_name} + elif scoped_name == '/': + # NOTE(kgriffs): list global queues, but exclude scoped ones + query[key] = {'$regex': '^/'} + else: + # NOTE(kgriffs): unscoped queue, e.g., '/my-global-queue' + query[key] = {'$regex': '^/', '$gt': scoped_name} + + return query + + def get_partition(num_partitions, queue, project=None): """Get the partition number for a given queue and project. diff --git a/marconi/queues/storage/sqlite/catalogue.py b/marconi/queues/storage/sqlite/catalogue.py new file mode 100644 index 000000000..57adbdffc --- /dev/null +++ b/marconi/queues/storage/sqlite/catalogue.py @@ -0,0 +1,48 @@ +# Copyright (c) 2013 Rackspace Hosting, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""sqlite storage controller for the queues catalogue. + +Serves to construct an association between a project + queue -> shard +""" + +from marconi.queues.storage import base + + +class CatalogueController(base.CatalogueBase): + + def __init__(self, *args, **kwargs): + super(CatalogueController, self).__init__(*args, **kwargs) + + def list(self, project): + pass + + def get(self, project, queue): + pass + + def exists(self, project, queue): + pass + + def insert(self, project, queue, shard): + pass + + def delete(self, project, queue): + pass + + def update(self, project, queue, shards=None): + pass + + def drop_all(self): + pass diff --git a/marconi/queues/storage/sqlite/controllers.py b/marconi/queues/storage/sqlite/controllers.py index 67c275714..1dff2ab66 100644 --- a/marconi/queues/storage/sqlite/controllers.py +++ b/marconi/queues/storage/sqlite/controllers.py @@ -16,12 +16,14 @@ """Exports SQLite driver controllers.""" +from marconi.queues.storage.sqlite import catalogue from marconi.queues.storage.sqlite import claims from marconi.queues.storage.sqlite import messages from marconi.queues.storage.sqlite import queues from marconi.queues.storage.sqlite import shards +CatalogueController = catalogue.CatalogueController ClaimController = claims.ClaimController MessageController = messages.MessageController QueueController = queues.QueueController diff --git a/marconi/queues/storage/sqlite/driver.py b/marconi/queues/storage/sqlite/driver.py index d755259b4..f0882183e 100644 --- a/marconi/queues/storage/sqlite/driver.py +++ b/marconi/queues/storage/sqlite/driver.py @@ -214,6 +214,10 @@ class ControlDriver(storage.ControlDriverBase): self.__db = self.__conn.cursor() self.run('''PRAGMA foreign_keys = ON''') + @property + def catalogue_controller(self): + return controllers.CatalogueController(self) + @property def shards_controller(self): return controllers.ShardsController(self) diff --git a/marconi/tests/faulty_storage.py b/marconi/tests/faulty_storage.py index 20e1ff859..f769b19f8 100644 --- a/marconi/tests/faulty_storage.py +++ b/marconi/tests/faulty_storage.py @@ -42,6 +42,10 @@ class ControlDriver(storage.ControlDriverBase): def __init__(self, conf): super(ControlDriver, self).__init__(conf) + @property + def catalogue_controller(self): + return None + @property def shards_controller(self): return None diff --git a/marconi/tests/helpers.py b/marconi/tests/helpers.py index d894d93a7..c009db5b3 100644 --- a/marconi/tests/helpers.py +++ b/marconi/tests/helpers.py @@ -132,6 +132,51 @@ def entries(controller, count): controller.delete(p, q) +@contextlib.contextmanager +def shard_entry(controller, project, queue, shard): + """Creates a catalogue entry with the given details, and deletes + it once the context manager goes out of scope. + + :param controller: storage handler + :type controller: queues.storage.base:CatalogueBase + :param project: namespace for queue + :type project: six.text_type + :param queue: name of queue + :type queue: six.text_type + :param shard: an identifier for the shard + :type shard: six.text_type + :returns: (project, queue, shard) + :rtype: (six.text_type, six.text_type, six.text_type) + """ + controller.insert(project, queue, shard) + yield (project, queue, shard) + controller.delete(project, queue) + + +@contextlib.contextmanager +def shard_entries(controller, count): + """Creates `count` catalogue entries with the given details, and + deletes them once the context manager goes out of scope. + + :param controller: storage handler + :type controller: queues.storage.base:CatalogueBase + :param count: number of entries to create + :type count: int + :returns: [(project, queue, shard)] + :rtype: [(six.text_type, six.text_type, six.text_type)] + """ + spec = [(u'_', six.text_type(uuid.uuid1()), six.text_type(i)) + for i in range(count)] + + for p, q, s in spec: + controller.insert(p, q, s) + + yield spec + + for p, q, _ in spec: + controller.delete(p, q) + + def requires_mongodb(test_case): """Decorator to flag a test case as being dependent on MongoDB. diff --git a/marconi/tests/queues/storage/base.py b/marconi/tests/queues/storage/base.py index 158aab78d..df2d73d55 100644 --- a/marconi/tests/queues/storage/base.py +++ b/marconi/tests/queues/storage/base.py @@ -19,12 +19,14 @@ import uuid import ddt from oslo.config import cfg +import six from testtools import matchers from marconi.openstack.common import timeutils from marconi.queues import storage from marconi.queues.storage import exceptions from marconi import tests as testing +from marconi.tests import helpers class ControllerBaseTest(testing.TestBase): @@ -712,6 +714,109 @@ class ShardsControllerTest(ControllerBaseTest): self.assertEqual(entry['o'], {}) +class CatalogueControllerTest(ControllerBaseTest): + controller_base_class = storage.CatalogueBase + + def setUp(self): + super(CatalogueControllerTest, self).setUp() + self.controller = self.driver.catalogue_controller + self.queue = six.text_type(uuid.uuid1()) + self.project = six.text_type(uuid.uuid1()) + + def tearDown(self): + self.controller.drop_all() + super(CatalogueControllerTest, self).tearDown() + + def _check_structure(self, entry): + self.assertIn('queue', entry) + self.assertIn('project', entry) + self.assertIn('shard', entry) + self.assertIsInstance(entry['queue'], six.text_type) + self.assertIsInstance(entry['project'], six.text_type) + self.assertIsInstance(entry['shard'], six.text_type) + + def _check_value(self, entry, xqueue, xproject, xshard): + self.assertEqual(entry['queue'], xqueue) + self.assertEqual(entry['project'], xproject) + self.assertEqual(entry['shard'], xshard) + + def test_catalogue_entry_life_cycle(self): + queue = self.queue + project = self.project + + # check listing is initially empty + for p in self.controller.list(project): + self.fail('There should be no entries at this time') + + # create a listing, check its length + with helpers.shard_entries(self.controller, 10) as expect: + project = expect[0][0] + xs = list(self.controller.list(project)) + self.assertEqual(len(xs), 10) + + # create, check existence, delete + with helpers.shard_entry(self.controller, project, queue, u'a'): + self.assertTrue(self.controller.exists(project, queue)) + + # verify it no longer exists + self.assertFalse(self.controller.exists(project, queue)) + + # verify it isn't listable + self.assertEqual(len(list(self.controller.list(project))), 0) + + def test_list(self): + with helpers.shard_entries(self.controller, 10) as expect: + values = zip(self.controller.list(u'_'), expect) + for e, x in values: + p, q, s = x + self._check_structure(e) + self._check_value(e, xqueue=q, xproject=p, xshard=s) + + def test_update(self): + with helpers.shard_entry(self.controller, self.project, + self.queue, u'a') as expect: + p, q, s = expect + self.controller.update(p, q, shard=u'b') + entry = self.controller.get(p, q) + self._check_value(entry, xqueue=q, xproject=p, xshard=u'b') + + def test_update_raises_when_entry_does_not_exist(self): + self.assertRaises(exceptions.QueueNotMapped, + self.controller.update, + 'not', 'not', 'a') + + def test_get(self): + with helpers.shard_entry(self.controller, + self.project, + self.queue, u'a') as expect: + p, q, s = expect + e = self.controller.get(p, q) + self._check_value(e, xqueue=q, xproject=p, xshard=s) + + def test_get_raises_if_does_not_exist(self): + with helpers.shard_entry(self.controller, + self.project, + self.queue, u'a') as expect: + p, q, _ = expect + self.assertRaises(exceptions.QueueNotMapped, + self.controller.get, + p, 'non_existing') + self.assertRaises(exceptions.QueueNotMapped, + self.controller.get, + 'non_existing', q) + self.assertRaises(exceptions.QueueNotMapped, + self.controller.get, + 'non_existing', 'non_existing') + + def test_exists(self): + with helpers.shard_entry(self.controller, + self.project, + self.queue, u'a') as expect: + p, q, _ = expect + self.assertTrue(self.controller.exists(p, q)) + self.assertFalse(self.controller.exists('nada', 'not_here')) + + def _insert_fixtures(controller, queue_name, project=None, client_uuid=None, num=4, ttl=120): diff --git a/tests/unit/queues/storage/test_impl_mongodb.py b/tests/unit/queues/storage/test_impl_mongodb.py index a852e0ce6..a041d8bb5 100644 --- a/tests/unit/queues/storage/test_impl_mongodb.py +++ b/tests/unit/queues/storage/test_impl_mongodb.py @@ -345,3 +345,17 @@ class MongodbShardsTests(base.ShardsControllerTest): def tearDown(self): super(MongodbShardsTests, self).tearDown() + + +@testing.requires_mongodb +class MongodbCatalogueTests(base.CatalogueControllerTest): + driver_class = mongodb.ControlDriver + controller_class = controllers.CatalogueController + + def setUp(self): + super(MongodbCatalogueTests, self).setUp() + self.load_conf('wsgi_mongodb.conf') + + def tearDown(self): + self.controller.drop_all() + super(MongodbCatalogueTests, self).tearDown()