Add support for pymongo 3.0.2

Change-Id: I3d1da5772ee56d8078ae1e568ca07a656893c33f
Closes-bug: #1441393
This commit is contained in:
Flavio Percoco 2015-06-15 09:45:11 +00:00 committed by Flavio Percoco
parent 14e66fa8fe
commit e8f19a7572
12 changed files with 93 additions and 67 deletions

View File

@ -11,7 +11,7 @@ iso8601>=0.1.9
keystonemiddleware>=1.5.0
msgpack-python>=0.4.0
posix_ipc
pymongo>=2.6.3,<3.0
pymongo>=3.0.2
# python-memcached has no Py3k support for now
# python-memcached>=1.48
WebOb>=1.2.3

View File

@ -9,7 +9,7 @@ mock>=1.0
# Backends
redis>=2.10.0
pymongo>=2.6.3,<3.0
pymongo>=3.0.2
# Unit testing
coverage>=3.6

View File

@ -61,3 +61,4 @@ from zaqar.storage.mongodb import driver
# Hoist classes into package namespace
ControlDriver = driver.ControlDriver
DataDriver = driver.DataDriver
FIFODataDriver = driver.FIFODataDriver

View File

@ -68,7 +68,7 @@ class CatalogueController(base.CatalogueBase):
fields = {'_id': 0}
key = utils.scope_queue_name(queue, project)
entry = self._col.find_one({PRIMARY_KEY: key},
fields=fields)
projection=fields)
if entry is None:
raise errors.QueueNotMapped(queue, project)

View File

@ -147,7 +147,7 @@ class ClaimController(storage.Claim):
# Get a list of active, not claimed nor expired
# messages that could be claimed.
msgs = msg_ctrl._active(queue, fields={'_id': 1}, project=project,
msgs = msg_ctrl._active(queue, projection={'_id': 1}, project=project,
limit=limit)
messages = iter([])

View File

@ -84,35 +84,30 @@ class DataDriver(storage.DataDriverBase):
self.mongodb_conf = self.conf[options.MESSAGE_MONGODB_GROUP]
conn = self.connection
server_version = conn.server_info()['version']
server_info = conn.server_info()['version']
self.server_version = tuple(map(int, server_info.split('.')))
if tuple(map(int, server_version.split('.'))) < (2, 2):
raise RuntimeError(_('The mongodb driver requires mongodb>=2.2, '
'%s found') % server_version)
if self.server_version < (2, 2):
raise RuntimeError(_('The mongodb driver requires mongodb>=2.2, '
'%s found') % server_info)
if not len(conn.nodes) > 1 and not conn.is_mongos:
if not self.conf.unreliable:
raise RuntimeError(_('Either a replica set or a mongos is '
'required to guarantee message delivery'))
else:
wc = conn.write_concern.get('w')
majority = (wc == 'majority' or
wc >= 2)
if not wc:
# NOTE(flaper87): No write concern specified, use majority
# and don't count journal as a replica. Use `update` to avoid
# overwriting `wtimeout`
conn.write_concern.update({'w': 'majority'})
elif not self.conf.unreliable and not majority:
_mongo_wc = conn.write_concern.document.get('w')
durable = (_mongo_wc == 'majority' or
_mongo_wc >= 2)
if not self.conf.unreliable and not durable:
raise RuntimeError(_('Using a write concern other than '
'`majority` or > 2 makes the service '
'unreliable. Please use a different '
'write concern or set `unreliable` '
'to True in the config file.'))
conn.write_concern['j'] = False
# FIXME(flaper87): Make this dynamic
self._capabilities = self.BASE_CAPABILITIES
@ -150,18 +145,24 @@ class DataDriver(storage.DataDriverBase):
def message_databases(self):
"""List of message databases, ordered by partition number."""
kwargs = {}
if not self.server_version < (2, 6):
# NOTE(flaper87): Skip mongodb versions below 2.6 when
# setting the write concern on the database. pymongo 3.0
# fails with norepl when creating indexes.
doc = self.connection.write_concern.document.copy()
doc.setdefault('w', 'majority')
doc.setdefault('j', False)
kwargs['write_concern'] = pymongo.WriteConcern(**doc)
name = self.mongodb_conf.database
partitions = self.mongodb_conf.partitions
# NOTE(kgriffs): Partition names are zero-based, and
# the list is ordered by partition, which means that a
# caller can, e.g., get zaqar_mp0 by simply indexing
# the first element in the list of databases:
#
# self.driver.message_databases[0]
#
return [self.connection[name + self._COL_SUFIX + str(p)]
for p in range(partitions)]
databases = []
for p in range(partitions):
db_name = name + self._COL_SUFIX + str(p)
databases.append(self.connection.get_database(db_name, **kwargs))
return databases
@decorators.lazy_property(write=False)
def subscriptions_database(self):

View File

@ -63,7 +63,7 @@ class FlavorsController(base.FlavorsBase):
@utils.raises_conn_error
def _list_by_pool(self, pool, limit=10, detailed=False):
query = {'s': pool}
cursor = self._col.find(query, fields=_field_spec(detailed),
cursor = self._col.find(query, projection=_field_spec(detailed),
limit=limit).sort('n', 1)
normalizer = functools.partial(_normalize, detailed=detailed)
@ -75,7 +75,7 @@ class FlavorsController(base.FlavorsBase):
if marker is not None:
query['n'] = {'$gt': marker}
cursor = self._col.find(query, fields=_field_spec(detailed),
cursor = self._col.find(query, projection=_field_spec(detailed),
limit=limit).sort('n', 1)
marker_name = {}

View File

@ -229,7 +229,7 @@ class MessageController(storage.Message):
collection.remove({PROJ_QUEUE: scope}, w=0)
def _list(self, queue_name, project=None, marker=None,
echo=False, client_uuid=None, fields=None,
echo=False, client_uuid=None, projection=None,
include_claimed=False, sort=1, limit=None):
"""Message document listing helper.
@ -290,7 +290,9 @@ class MessageController(storage.Message):
query['c.e'] = {'$lte': now}
# Construct the request
cursor = collection.find(query, fields=fields, sort=[('k', sort)])
cursor = collection.find(query,
projection=projection,
sort=[('k', sort)])
if limit is not None:
cursor.limit(limit)
@ -331,15 +333,15 @@ class MessageController(storage.Message):
query['c.e'] = {'$lte': timeutils.utcnow_ts()}
collection = self._collection(queue_name, project)
return collection.find(query).hint(COUNTING_INDEX_FIELDS).count()
return collection.count(filter=query, hint=COUNTING_INDEX_FIELDS)
def _active(self, queue_name, marker=None, echo=False,
client_uuid=None, fields=None, project=None,
client_uuid=None, projection=None, project=None,
limit=None):
return self._list(queue_name, project=project, marker=marker,
echo=echo, client_uuid=client_uuid,
fields=fields, include_claimed=False,
projection=projection, include_claimed=False,
limit=limit)
def _claimed(self, queue_name, claim_id,
@ -354,14 +356,17 @@ class MessageController(storage.Message):
'c.e': {'$gt': expires or timeutils.utcnow_ts()},
}
kwargs = {}
collection = self._collection(queue_name, project)
# NOTE(kgriffs): Claimed messages bust be queried from
# the primary to avoid a race condition caused by the
# multi-phased "create claim" algorithm.
preference = pymongo.read_preferences.ReadPreference.PRIMARY
collection = self._collection(queue_name, project)
msgs = collection.find(query, sort=[('k', 1)],
read_preference=preference).hint(
CLAIMED_INDEX_FIELDS)
# NOTE(flaper87): In pymongo 3.0 PRIMARY is the default and
# `read_preference` is read only. We'd need to set it when the
# client is created.
msgs = collection.find(query, sort=[('k', 1)], **kwargs).hint(
CLAIMED_INDEX_FIELDS)
if limit is not None:
msgs = msgs.limit(limit)
@ -564,11 +569,14 @@ class MessageController(storage.Message):
else:
if message['c']['id'] != cid:
kwargs = {}
# NOTE(flaper87): In pymongo 3.0 PRIMARY is the default and
# `read_preference` is read only. We'd need to set it when the
# client is created.
# NOTE(kgriffs): Read from primary in case the message
# was just barely claimed, and claim hasn't made it to
# the secondary.
pref = pymongo.read_preferences.ReadPreference.PRIMARY
message = collection.find_one(query, read_preference=pref)
message = collection.find_one(query, **kwargs)
if message['c']['id'] != cid:
if _is_claimed(message, now):
@ -603,10 +611,10 @@ class MessageController(storage.Message):
query['c.e'] = {'$lte': now}
collection = self._collection(queue_name, project)
fields = {'_id': 1, 't': 1, 'b': 1, 'c.id': 1}
projection = {'_id': 1, 't': 1, 'b': 1, 'c.id': 1}
messages = (collection.find_and_modify(query,
fields=fields,
projection=projection,
remove=True)
for _ in range(limit))

View File

@ -68,7 +68,7 @@ class PoolsController(base.PoolsBase):
if marker is not None:
query['n'] = {'$gt': marker}
cursor = self._col.find(query, fields=_field_spec(detailed),
cursor = self._col.find(query, projection=_field_spec(detailed),
limit=limit).sort('n')
marker_name = {}
@ -90,7 +90,7 @@ class PoolsController(base.PoolsBase):
@utils.raises_conn_error
def _get_group(self, group=None, detailed=False):
cursor = self._col.find({'g': group}, fields=_field_spec(detailed))
cursor = self._col.find({'g': group}, projection=_field_spec(detailed))
normalizer = functools.partial(_normalize, detailed=detailed)
return utils.HookedCursor(cursor, normalizer)

View File

@ -123,7 +123,7 @@ class QueueController(storage.Queue):
"""
doc = self._collection.find_one(_get_scoped_query(name, project),
fields={'c.v': 1, '_id': 0})
projection={'c.v': 1, '_id': 0})
if doc is None:
raise errors.QueueDoesNotExist(name, project)
@ -157,7 +157,7 @@ class QueueController(storage.Queue):
while True:
try:
doc = self._collection.find_and_modify(
query, update, new=True, fields={'c.v': 1, '_id': 0})
query, update, new=True, projection={'c.v': 1, '_id': 0})
break
except pymongo.errors.AutoReconnect as ex:
@ -199,11 +199,11 @@ class QueueController(storage.Queue):
query = utils.scoped_query(marker, project)
fields = {'p_q': 1, '_id': 0}
projection = {'p_q': 1, '_id': 0}
if detailed:
fields['m'] = 1
projection['m'] = 1
cursor = self._collection.find(query, fields=fields)
cursor = self._collection.find(query, projection=projection)
cursor = cursor.limit(limit).sort('p_q')
marker_name = {}
@ -221,7 +221,7 @@ class QueueController(storage.Queue):
@utils.retries_on_autoreconnect
def get_metadata(self, name, project=None):
queue = self._collection.find_one(_get_scoped_query(name, project),
fields={'m': 1, '_id': 0})
projection={'m': 1, '_id': 0})
if queue is None:
raise errors.QueueDoesNotExist(name, project)

View File

@ -58,9 +58,9 @@ class SubscriptionController(base.Subscription):
if marker is not None:
query['_id'] = {'$gt': utils.to_oid(marker)}
fields = {'s': 1, 'u': 1, 't': 1, 'p': 1, 'o': 1, '_id': 1}
projection = {'s': 1, 'u': 1, 't': 1, 'p': 1, 'o': 1, '_id': 1}
cursor = self._collection.find(query, fields=fields)
cursor = self._collection.find(query, projection=projection)
cursor = cursor.limit(limit).sort('_id')
marker_name = {}
@ -96,9 +96,9 @@ class SubscriptionController(base.Subscription):
ttl = int(ttl)
expires = now + ttl
source_query = {'p_q': utils.scope_queue_name(source, project)}
target_source = self._queue_collection.find_one(source_query,
fields={'m': 1,
'_id': 0})
target_source = self._queue_collection.find_one(
source_query, projection={'m': 1, '_id': 0})
if target_source is None:
raise errors.QueueDoesNotExist(target_source, project)
try:

View File

@ -201,16 +201,24 @@ class MongodbDriverTest(MongodbSetupMixin, testing.TestBase):
with mock.patch('pymongo.MongoClient.nodes') as nodes:
nodes.__get__ = mock.Mock(return_value=['node1', 'node2'])
mongodb.DataDriver(self.conf, cache,
mongodb.ControlDriver(self.conf, cache))
with mock.patch('pymongo.MongoClient.write_concern') as wc:
write_concern = pymongo.WriteConcern(w=2)
wc.__get__ = mock.Mock(return_value=write_concern)
mongodb.DataDriver(self.conf, cache,
mongodb.ControlDriver(self.conf, cache))
def test_using_mongos(self):
cache = oslo_cache.get_cache()
with mock.patch('pymongo.MongoClient.is_mongos') as is_mongos:
is_mongos.__get__ = mock.Mock(return_value=True)
mongodb.DataDriver(self.conf, cache,
mongodb.ControlDriver(self.conf, cache))
with mock.patch('pymongo.MongoClient.write_concern') as wc:
write_concern = pymongo.WriteConcern(w=2)
wc.__get__ = mock.Mock(return_value=write_concern)
mongodb.DataDriver(self.conf, cache,
mongodb.ControlDriver(self.conf, cache))
def test_write_concern_check_works(self):
cache = oslo_cache.get_cache()
@ -219,12 +227,14 @@ class MongodbDriverTest(MongodbSetupMixin, testing.TestBase):
is_mongos.__get__ = mock.Mock(return_value=True)
with mock.patch('pymongo.MongoClient.write_concern') as wc:
wc.__get__ = mock.Mock(return_value={'w': 1})
write_concern = pymongo.WriteConcern(w=1)
wc.__get__ = mock.Mock(return_value=write_concern)
self.assertRaises(RuntimeError, mongodb.DataDriver,
self.conf, cache,
mongodb.ControlDriver(self.conf, cache))
wc.__get__ = mock.Mock(return_value={'w': 2})
write_concern = pymongo.WriteConcern(w=2)
wc.__get__ = mock.Mock(return_value=write_concern)
mongodb.DataDriver(self.conf, cache,
mongodb.ControlDriver(self.conf, cache))
@ -233,12 +243,18 @@ class MongodbDriverTest(MongodbSetupMixin, testing.TestBase):
with mock.patch('pymongo.MongoClient.is_mongos') as is_mongos:
is_mongos.__get__ = mock.Mock(return_value=True)
self.config(unreliable=True)
driver = mongodb.DataDriver(self.conf, cache,
mongodb.ControlDriver
(self.conf, cache))
wc = driver.connection.write_concern
self.assertEqual(wc['w'], 'majority')
self.assertEqual(wc['j'], False)
driver.server_version = (2, 6)
for db in driver.message_databases:
wc = db.write_concern
self.assertEqual(wc.document['w'], 'majority')
self.assertEqual(wc.document['j'], False)
@testing.requires_mongodb
@ -352,7 +368,7 @@ class MongodbMessageTests(MongodbSetupMixin, base.MessageControllerTest):
@testing.requires_mongodb
class MongodbFIFOMessageTests(MongodbSetupMixin, base.MessageControllerTest):
driver_class = mongodb.DataDriver
driver_class = mongodb.FIFODataDriver
config_file = 'wsgi_fifo_mongodb.conf'
controller_class = controllers.FIFOMessageController
control_driver_class = mongodb.ControlDriver