diff --git a/requirements-py3.txt b/requirements-py3.txt index c1cef2f76..da163723c 100644 --- a/requirements-py3.txt +++ b/requirements-py3.txt @@ -11,7 +11,7 @@ iso8601>=0.1.9 keystonemiddleware>=1.5.0 msgpack-python>=0.4.0 posix_ipc -pymongo>=2.6.3,<3.0 +pymongo>=3.0.2 # python-memcached has no Py3k support for now # python-memcached>=1.48 WebOb>=1.2.3 diff --git a/test-requirements.txt b/test-requirements.txt index c9cb56106..3c43dfa9e 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -9,7 +9,7 @@ mock>=1.0 # Backends redis>=2.10.0 -pymongo>=2.6.3,<3.0 +pymongo>=3.0.2 # Unit testing coverage>=3.6 diff --git a/zaqar/storage/mongodb/__init__.py b/zaqar/storage/mongodb/__init__.py index 4433b5e6b..e63e8728a 100644 --- a/zaqar/storage/mongodb/__init__.py +++ b/zaqar/storage/mongodb/__init__.py @@ -61,3 +61,4 @@ from zaqar.storage.mongodb import driver # Hoist classes into package namespace ControlDriver = driver.ControlDriver DataDriver = driver.DataDriver +FIFODataDriver = driver.FIFODataDriver diff --git a/zaqar/storage/mongodb/catalogue.py b/zaqar/storage/mongodb/catalogue.py index 38361eb6c..8a8196006 100644 --- a/zaqar/storage/mongodb/catalogue.py +++ b/zaqar/storage/mongodb/catalogue.py @@ -68,7 +68,7 @@ class CatalogueController(base.CatalogueBase): fields = {'_id': 0} key = utils.scope_queue_name(queue, project) entry = self._col.find_one({PRIMARY_KEY: key}, - fields=fields) + projection=fields) if entry is None: raise errors.QueueNotMapped(queue, project) diff --git a/zaqar/storage/mongodb/claims.py b/zaqar/storage/mongodb/claims.py index 6cad70050..9b506a261 100644 --- a/zaqar/storage/mongodb/claims.py +++ b/zaqar/storage/mongodb/claims.py @@ -147,7 +147,7 @@ class ClaimController(storage.Claim): # Get a list of active, not claimed nor expired # messages that could be claimed. - msgs = msg_ctrl._active(queue, fields={'_id': 1}, project=project, + msgs = msg_ctrl._active(queue, projection={'_id': 1}, project=project, limit=limit) messages = iter([]) diff --git a/zaqar/storage/mongodb/driver.py b/zaqar/storage/mongodb/driver.py index bc143aa76..8a0191daf 100644 --- a/zaqar/storage/mongodb/driver.py +++ b/zaqar/storage/mongodb/driver.py @@ -84,35 +84,30 @@ class DataDriver(storage.DataDriverBase): self.mongodb_conf = self.conf[options.MESSAGE_MONGODB_GROUP] conn = self.connection - server_version = conn.server_info()['version'] + server_info = conn.server_info()['version'] + self.server_version = tuple(map(int, server_info.split('.'))) - if tuple(map(int, server_version.split('.'))) < (2, 2): - raise RuntimeError(_('The mongodb driver requires mongodb>=2.2, ' - '%s found') % server_version) + if self.server_version < (2, 2): + raise RuntimeError(_('The mongodb driver requires mongodb>=2.2, ' + '%s found') % server_info) if not len(conn.nodes) > 1 and not conn.is_mongos: if not self.conf.unreliable: raise RuntimeError(_('Either a replica set or a mongos is ' 'required to guarantee message delivery')) else: - wc = conn.write_concern.get('w') - majority = (wc == 'majority' or - wc >= 2) - if not wc: - # NOTE(flaper87): No write concern specified, use majority - # and don't count journal as a replica. Use `update` to avoid - # overwriting `wtimeout` - conn.write_concern.update({'w': 'majority'}) - elif not self.conf.unreliable and not majority: + _mongo_wc = conn.write_concern.document.get('w') + durable = (_mongo_wc == 'majority' or + _mongo_wc >= 2) + + if not self.conf.unreliable and not durable: raise RuntimeError(_('Using a write concern other than ' '`majority` or > 2 makes the service ' 'unreliable. Please use a different ' 'write concern or set `unreliable` ' 'to True in the config file.')) - conn.write_concern['j'] = False - # FIXME(flaper87): Make this dynamic self._capabilities = self.BASE_CAPABILITIES @@ -150,18 +145,24 @@ class DataDriver(storage.DataDriverBase): def message_databases(self): """List of message databases, ordered by partition number.""" + kwargs = {} + if not self.server_version < (2, 6): + # NOTE(flaper87): Skip mongodb versions below 2.6 when + # setting the write concern on the database. pymongo 3.0 + # fails with norepl when creating indexes. + doc = self.connection.write_concern.document.copy() + doc.setdefault('w', 'majority') + doc.setdefault('j', False) + kwargs['write_concern'] = pymongo.WriteConcern(**doc) + name = self.mongodb_conf.database partitions = self.mongodb_conf.partitions - # NOTE(kgriffs): Partition names are zero-based, and - # the list is ordered by partition, which means that a - # caller can, e.g., get zaqar_mp0 by simply indexing - # the first element in the list of databases: - # - # self.driver.message_databases[0] - # - return [self.connection[name + self._COL_SUFIX + str(p)] - for p in range(partitions)] + databases = [] + for p in range(partitions): + db_name = name + self._COL_SUFIX + str(p) + databases.append(self.connection.get_database(db_name, **kwargs)) + return databases @decorators.lazy_property(write=False) def subscriptions_database(self): diff --git a/zaqar/storage/mongodb/flavors.py b/zaqar/storage/mongodb/flavors.py index c057bc9bd..d4a25b0ea 100644 --- a/zaqar/storage/mongodb/flavors.py +++ b/zaqar/storage/mongodb/flavors.py @@ -63,7 +63,7 @@ class FlavorsController(base.FlavorsBase): @utils.raises_conn_error def _list_by_pool(self, pool, limit=10, detailed=False): query = {'s': pool} - cursor = self._col.find(query, fields=_field_spec(detailed), + cursor = self._col.find(query, projection=_field_spec(detailed), limit=limit).sort('n', 1) normalizer = functools.partial(_normalize, detailed=detailed) @@ -75,7 +75,7 @@ class FlavorsController(base.FlavorsBase): if marker is not None: query['n'] = {'$gt': marker} - cursor = self._col.find(query, fields=_field_spec(detailed), + cursor = self._col.find(query, projection=_field_spec(detailed), limit=limit).sort('n', 1) marker_name = {} diff --git a/zaqar/storage/mongodb/messages.py b/zaqar/storage/mongodb/messages.py index 0c6f8932e..023ed7d10 100644 --- a/zaqar/storage/mongodb/messages.py +++ b/zaqar/storage/mongodb/messages.py @@ -229,7 +229,7 @@ class MessageController(storage.Message): collection.remove({PROJ_QUEUE: scope}, w=0) def _list(self, queue_name, project=None, marker=None, - echo=False, client_uuid=None, fields=None, + echo=False, client_uuid=None, projection=None, include_claimed=False, sort=1, limit=None): """Message document listing helper. @@ -290,7 +290,9 @@ class MessageController(storage.Message): query['c.e'] = {'$lte': now} # Construct the request - cursor = collection.find(query, fields=fields, sort=[('k', sort)]) + cursor = collection.find(query, + projection=projection, + sort=[('k', sort)]) if limit is not None: cursor.limit(limit) @@ -331,15 +333,15 @@ class MessageController(storage.Message): query['c.e'] = {'$lte': timeutils.utcnow_ts()} collection = self._collection(queue_name, project) - return collection.find(query).hint(COUNTING_INDEX_FIELDS).count() + return collection.count(filter=query, hint=COUNTING_INDEX_FIELDS) def _active(self, queue_name, marker=None, echo=False, - client_uuid=None, fields=None, project=None, + client_uuid=None, projection=None, project=None, limit=None): return self._list(queue_name, project=project, marker=marker, echo=echo, client_uuid=client_uuid, - fields=fields, include_claimed=False, + projection=projection, include_claimed=False, limit=limit) def _claimed(self, queue_name, claim_id, @@ -354,14 +356,17 @@ class MessageController(storage.Message): 'c.e': {'$gt': expires or timeutils.utcnow_ts()}, } + kwargs = {} + collection = self._collection(queue_name, project) + # NOTE(kgriffs): Claimed messages bust be queried from # the primary to avoid a race condition caused by the # multi-phased "create claim" algorithm. - preference = pymongo.read_preferences.ReadPreference.PRIMARY - collection = self._collection(queue_name, project) - msgs = collection.find(query, sort=[('k', 1)], - read_preference=preference).hint( - CLAIMED_INDEX_FIELDS) + # NOTE(flaper87): In pymongo 3.0 PRIMARY is the default and + # `read_preference` is read only. We'd need to set it when the + # client is created. + msgs = collection.find(query, sort=[('k', 1)], **kwargs).hint( + CLAIMED_INDEX_FIELDS) if limit is not None: msgs = msgs.limit(limit) @@ -564,11 +569,14 @@ class MessageController(storage.Message): else: if message['c']['id'] != cid: + kwargs = {} + # NOTE(flaper87): In pymongo 3.0 PRIMARY is the default and + # `read_preference` is read only. We'd need to set it when the + # client is created. # NOTE(kgriffs): Read from primary in case the message # was just barely claimed, and claim hasn't made it to # the secondary. - pref = pymongo.read_preferences.ReadPreference.PRIMARY - message = collection.find_one(query, read_preference=pref) + message = collection.find_one(query, **kwargs) if message['c']['id'] != cid: if _is_claimed(message, now): @@ -603,10 +611,10 @@ class MessageController(storage.Message): query['c.e'] = {'$lte': now} collection = self._collection(queue_name, project) - fields = {'_id': 1, 't': 1, 'b': 1, 'c.id': 1} + projection = {'_id': 1, 't': 1, 'b': 1, 'c.id': 1} messages = (collection.find_and_modify(query, - fields=fields, + projection=projection, remove=True) for _ in range(limit)) diff --git a/zaqar/storage/mongodb/pools.py b/zaqar/storage/mongodb/pools.py index 38c39084a..0a11bcff0 100644 --- a/zaqar/storage/mongodb/pools.py +++ b/zaqar/storage/mongodb/pools.py @@ -68,7 +68,7 @@ class PoolsController(base.PoolsBase): if marker is not None: query['n'] = {'$gt': marker} - cursor = self._col.find(query, fields=_field_spec(detailed), + cursor = self._col.find(query, projection=_field_spec(detailed), limit=limit).sort('n') marker_name = {} @@ -90,7 +90,7 @@ class PoolsController(base.PoolsBase): @utils.raises_conn_error def _get_group(self, group=None, detailed=False): - cursor = self._col.find({'g': group}, fields=_field_spec(detailed)) + cursor = self._col.find({'g': group}, projection=_field_spec(detailed)) normalizer = functools.partial(_normalize, detailed=detailed) return utils.HookedCursor(cursor, normalizer) diff --git a/zaqar/storage/mongodb/queues.py b/zaqar/storage/mongodb/queues.py index 77aef65a7..eb5dc801d 100644 --- a/zaqar/storage/mongodb/queues.py +++ b/zaqar/storage/mongodb/queues.py @@ -123,7 +123,7 @@ class QueueController(storage.Queue): """ doc = self._collection.find_one(_get_scoped_query(name, project), - fields={'c.v': 1, '_id': 0}) + projection={'c.v': 1, '_id': 0}) if doc is None: raise errors.QueueDoesNotExist(name, project) @@ -157,7 +157,7 @@ class QueueController(storage.Queue): while True: try: doc = self._collection.find_and_modify( - query, update, new=True, fields={'c.v': 1, '_id': 0}) + query, update, new=True, projection={'c.v': 1, '_id': 0}) break except pymongo.errors.AutoReconnect as ex: @@ -199,11 +199,11 @@ class QueueController(storage.Queue): query = utils.scoped_query(marker, project) - fields = {'p_q': 1, '_id': 0} + projection = {'p_q': 1, '_id': 0} if detailed: - fields['m'] = 1 + projection['m'] = 1 - cursor = self._collection.find(query, fields=fields) + cursor = self._collection.find(query, projection=projection) cursor = cursor.limit(limit).sort('p_q') marker_name = {} @@ -221,7 +221,7 @@ class QueueController(storage.Queue): @utils.retries_on_autoreconnect def get_metadata(self, name, project=None): queue = self._collection.find_one(_get_scoped_query(name, project), - fields={'m': 1, '_id': 0}) + projection={'m': 1, '_id': 0}) if queue is None: raise errors.QueueDoesNotExist(name, project) diff --git a/zaqar/storage/mongodb/subscriptions.py b/zaqar/storage/mongodb/subscriptions.py index ecc2f7014..665a7fc3b 100644 --- a/zaqar/storage/mongodb/subscriptions.py +++ b/zaqar/storage/mongodb/subscriptions.py @@ -58,9 +58,9 @@ class SubscriptionController(base.Subscription): if marker is not None: query['_id'] = {'$gt': utils.to_oid(marker)} - fields = {'s': 1, 'u': 1, 't': 1, 'p': 1, 'o': 1, '_id': 1} + projection = {'s': 1, 'u': 1, 't': 1, 'p': 1, 'o': 1, '_id': 1} - cursor = self._collection.find(query, fields=fields) + cursor = self._collection.find(query, projection=projection) cursor = cursor.limit(limit).sort('_id') marker_name = {} @@ -96,9 +96,9 @@ class SubscriptionController(base.Subscription): ttl = int(ttl) expires = now + ttl source_query = {'p_q': utils.scope_queue_name(source, project)} - target_source = self._queue_collection.find_one(source_query, - fields={'m': 1, - '_id': 0}) + target_source = self._queue_collection.find_one( + source_query, projection={'m': 1, '_id': 0}) + if target_source is None: raise errors.QueueDoesNotExist(target_source, project) try: diff --git a/zaqar/tests/unit/storage/test_impl_mongodb.py b/zaqar/tests/unit/storage/test_impl_mongodb.py index 12f55ce82..db6088a9d 100644 --- a/zaqar/tests/unit/storage/test_impl_mongodb.py +++ b/zaqar/tests/unit/storage/test_impl_mongodb.py @@ -201,16 +201,24 @@ class MongodbDriverTest(MongodbSetupMixin, testing.TestBase): with mock.patch('pymongo.MongoClient.nodes') as nodes: nodes.__get__ = mock.Mock(return_value=['node1', 'node2']) - mongodb.DataDriver(self.conf, cache, - mongodb.ControlDriver(self.conf, cache)) + + with mock.patch('pymongo.MongoClient.write_concern') as wc: + write_concern = pymongo.WriteConcern(w=2) + wc.__get__ = mock.Mock(return_value=write_concern) + mongodb.DataDriver(self.conf, cache, + mongodb.ControlDriver(self.conf, cache)) def test_using_mongos(self): cache = oslo_cache.get_cache() with mock.patch('pymongo.MongoClient.is_mongos') as is_mongos: is_mongos.__get__ = mock.Mock(return_value=True) - mongodb.DataDriver(self.conf, cache, - mongodb.ControlDriver(self.conf, cache)) + + with mock.patch('pymongo.MongoClient.write_concern') as wc: + write_concern = pymongo.WriteConcern(w=2) + wc.__get__ = mock.Mock(return_value=write_concern) + mongodb.DataDriver(self.conf, cache, + mongodb.ControlDriver(self.conf, cache)) def test_write_concern_check_works(self): cache = oslo_cache.get_cache() @@ -219,12 +227,14 @@ class MongodbDriverTest(MongodbSetupMixin, testing.TestBase): is_mongos.__get__ = mock.Mock(return_value=True) with mock.patch('pymongo.MongoClient.write_concern') as wc: - wc.__get__ = mock.Mock(return_value={'w': 1}) + write_concern = pymongo.WriteConcern(w=1) + wc.__get__ = mock.Mock(return_value=write_concern) self.assertRaises(RuntimeError, mongodb.DataDriver, self.conf, cache, mongodb.ControlDriver(self.conf, cache)) - wc.__get__ = mock.Mock(return_value={'w': 2}) + write_concern = pymongo.WriteConcern(w=2) + wc.__get__ = mock.Mock(return_value=write_concern) mongodb.DataDriver(self.conf, cache, mongodb.ControlDriver(self.conf, cache)) @@ -233,12 +243,18 @@ class MongodbDriverTest(MongodbSetupMixin, testing.TestBase): with mock.patch('pymongo.MongoClient.is_mongos') as is_mongos: is_mongos.__get__ = mock.Mock(return_value=True) + self.config(unreliable=True) driver = mongodb.DataDriver(self.conf, cache, mongodb.ControlDriver (self.conf, cache)) - wc = driver.connection.write_concern - self.assertEqual(wc['w'], 'majority') - self.assertEqual(wc['j'], False) + + driver.server_version = (2, 6) + + for db in driver.message_databases: + wc = db.write_concern + + self.assertEqual(wc.document['w'], 'majority') + self.assertEqual(wc.document['j'], False) @testing.requires_mongodb @@ -352,7 +368,7 @@ class MongodbMessageTests(MongodbSetupMixin, base.MessageControllerTest): @testing.requires_mongodb class MongodbFIFOMessageTests(MongodbSetupMixin, base.MessageControllerTest): - driver_class = mongodb.DataDriver + driver_class = mongodb.FIFODataDriver config_file = 'wsgi_fifo_mongodb.conf' controller_class = controllers.FIFOMessageController control_driver_class = mongodb.ControlDriver