Merge "[MongoDB] Fix bug with reconnection to new master node "
This commit is contained in:
commit
77bf05d78f
@ -73,5 +73,5 @@ class Connection(pymongo_base.Connection):
|
||||
# not been implemented. However calling this method is important for
|
||||
# removal of all the empty dbs created during the test runs since
|
||||
# test run is against mongodb on Jenkins
|
||||
self.conn.drop_database(self.db)
|
||||
self.conn.drop_database(self.db.name)
|
||||
self.conn.close()
|
||||
|
@ -63,6 +63,6 @@ class Connection(pymongo_base.Connection):
|
||||
self.upgrade()
|
||||
|
||||
def clear(self):
|
||||
self.conn.drop_database(self.db)
|
||||
self.conn.drop_database(self.db.name)
|
||||
# Connection will be reopened automatically if needed
|
||||
self.conn.close()
|
||||
|
@ -60,5 +60,5 @@ class Connection(pymongo_base.Connection):
|
||||
# not been implemented. However calling this method is important for
|
||||
# removal of all the empty dbs created during the test runs since
|
||||
# test run is against mongodb on Jenkins
|
||||
self.conn.drop_database(self.db)
|
||||
self.conn.drop_database(self.db.name)
|
||||
self.conn.close()
|
||||
|
@ -47,6 +47,6 @@ class Connection(pymongo_base.Connection):
|
||||
self.upgrade()
|
||||
|
||||
def clear(self):
|
||||
self.conn.drop_database(self.db)
|
||||
self.conn.drop_database(self.db.name)
|
||||
# Connection will be reopened automatically if needed
|
||||
self.conn.close()
|
||||
|
@ -58,6 +58,10 @@ OPTS = [
|
||||
default=None,
|
||||
help='The connection string used to connect to the event '
|
||||
'database. (if unset, connection is used)'),
|
||||
cfg.StrOpt('mongodb_replica_set',
|
||||
default='',
|
||||
help="The connection string used to connect to mongo database, "
|
||||
"if mongodb replica set was chosen."),
|
||||
]
|
||||
|
||||
cfg.CONF.register_opts(OPTS, group='database')
|
||||
|
@ -198,7 +198,7 @@ class Connection(pymongo_base.Connection):
|
||||
# not been implemented. However calling this method is important for
|
||||
# removal of all the empty dbs created during the test runs since
|
||||
# test run is against mongodb on Jenkins
|
||||
self.conn.drop_database(self.db)
|
||||
self.conn.drop_database(self.db.name)
|
||||
self.conn.close()
|
||||
|
||||
def record_metering_data(self, data):
|
||||
|
@ -470,7 +470,7 @@ class Connection(pymongo_base.Connection):
|
||||
)
|
||||
|
||||
def clear(self):
|
||||
self.conn.drop_database(self.db)
|
||||
self.conn.drop_database(self.db.name)
|
||||
# Connection will be reopened automatically if needed
|
||||
self.conn.close()
|
||||
|
||||
|
@ -18,6 +18,7 @@
|
||||
"""Common functions for MongoDB and DB2 backends
|
||||
"""
|
||||
|
||||
import time
|
||||
|
||||
from oslo.config import cfg
|
||||
from oslo.utils import netutils
|
||||
@ -178,7 +179,16 @@ class ConnectionPool(object):
|
||||
@staticmethod
|
||||
def _mongo_connect(url):
|
||||
try:
|
||||
return pymongo.MongoClient(url, safe=True)
|
||||
if cfg.CONF.database.mongodb_replica_set:
|
||||
client = MongoProxy(
|
||||
Prefection(
|
||||
pymongo.MongoReplicaSetClient(
|
||||
url,
|
||||
replicaSet=cfg.CONF.database.mongodb_replica_set)))
|
||||
else:
|
||||
client = MongoProxy(
|
||||
Prefection(pymongo.MongoClient(url, safe=True)))
|
||||
return client
|
||||
except pymongo.errors.ConnectionFailure as e:
|
||||
LOG.warn(_('Unable to connect to the database server: '
|
||||
'%(errmsg)s.') % {'errmsg': e})
|
||||
@ -305,3 +315,89 @@ class QueryTransformer(object):
|
||||
return self._handle_not_op(negated_tree)
|
||||
|
||||
return self._handle_simple_op(operator_node, nodes)
|
||||
|
||||
|
||||
def safe_mongo_call(call):
|
||||
def closure(*args, **kwargs):
|
||||
max_retries = cfg.CONF.database.max_retries
|
||||
retry_interval = cfg.CONF.database.retry_interval
|
||||
attempts = 0
|
||||
while True:
|
||||
try:
|
||||
return call(*args, **kwargs)
|
||||
except pymongo.errors.AutoReconnect as err:
|
||||
if 0 <= max_retries <= attempts:
|
||||
LOG.error(_('Unable to reconnect to the primary mongodb '
|
||||
'after %(retries)d retries. Giving up.') %
|
||||
{'retries': max_retries})
|
||||
raise
|
||||
LOG.warn(_('Unable to reconnect to the primary mongodb: '
|
||||
'%(errmsg)s. Trying again in %(retry_interval)d '
|
||||
'seconds.') %
|
||||
{'errmsg': err, 'retry_interval': retry_interval})
|
||||
attempts += 1
|
||||
time.sleep(retry_interval)
|
||||
return closure
|
||||
|
||||
|
||||
class MongoConn(object):
|
||||
def __init__(self, method):
|
||||
self.method = method
|
||||
|
||||
@safe_mongo_call
|
||||
def __call__(self, *args, **kwargs):
|
||||
return self.method(*args, **kwargs)
|
||||
|
||||
MONGO_METHODS = set([typ for typ in dir(pymongo.collection.Collection)
|
||||
if not typ.startswith('_')])
|
||||
MONGO_METHODS.update(set([typ for typ in dir(pymongo.MongoClient)
|
||||
if not typ.startswith('_')]))
|
||||
MONGO_METHODS.update(set([typ for typ in dir(pymongo)
|
||||
if not typ.startswith('_')]))
|
||||
|
||||
|
||||
class MongoProxy(object):
|
||||
def __init__(self, conn):
|
||||
self.conn = conn
|
||||
|
||||
def __getitem__(self, item):
|
||||
"""Create and return proxy around the method in the connection.
|
||||
|
||||
:param item: name of the connection
|
||||
"""
|
||||
return MongoProxy(self.conn[item])
|
||||
|
||||
def __getattr__(self, item):
|
||||
"""Wrap MongoDB connection.
|
||||
|
||||
If item is the name of an executable method, for example find or
|
||||
insert, wrap this method in the MongoConn.
|
||||
Else wrap getting attribute with MongoProxy.
|
||||
"""
|
||||
if item == 'name':
|
||||
return getattr(self.conn, item)
|
||||
if item in MONGO_METHODS:
|
||||
return MongoConn(getattr(self.conn, item))
|
||||
return MongoProxy(getattr(self.conn, item))
|
||||
|
||||
def __call__(self, *args, **kwargs):
|
||||
return self.conn(*args, **kwargs)
|
||||
|
||||
|
||||
class Prefection(pymongo.collection.Collection):
|
||||
def __init__(self, conn):
|
||||
self.conn = conn
|
||||
|
||||
def find(self, *args, **kwargs):
|
||||
# We need this modifying method to check a connection for MongoDB
|
||||
# in context of MongoProxy approach. Initially 'find' returns Cursor
|
||||
# object and doesn't connect to db while Cursor is not used.
|
||||
found = self.find(*args, **kwargs)
|
||||
try:
|
||||
found[0]
|
||||
except IndexError:
|
||||
pass
|
||||
return found
|
||||
|
||||
def __getattr__(self, item):
|
||||
return getattr(self.conn, item)
|
@ -23,7 +23,9 @@ import datetime
|
||||
import operator
|
||||
|
||||
import mock
|
||||
from oslo.config import cfg
|
||||
from oslo.utils import timeutils
|
||||
import pymongo
|
||||
|
||||
import ceilometer
|
||||
from ceilometer.alarm.storage import models as alarm_models
|
||||
@ -3101,3 +3103,89 @@ class BigIntegerTest(tests_db.TestBase,
|
||||
msg = utils.meter_message_from_counter(
|
||||
s, self.CONF.publisher.metering_secret)
|
||||
self.conn.record_metering_data(msg)
|
||||
|
||||
|
||||
class MongoAutoReconnectTest(DBTestBase,
|
||||
tests_db.MixinTestsWithBackendScenarios):
|
||||
cfg.CONF.set_override('retry_interval', 1, group='database')
|
||||
|
||||
@tests_db.run_with('mongodb')
|
||||
def test_mongo_client(self):
|
||||
if cfg.CONF.database.mongodb_replica_set:
|
||||
self.assertIsInstance(self.conn.conn.conn.conn,
|
||||
pymongo.MongoReplicaSetClient)
|
||||
else:
|
||||
self.assertIsInstance(self.conn.conn.conn.conn,
|
||||
pymongo.MongoClient)
|
||||
|
||||
@staticmethod
|
||||
def create_side_effect(method, test_exception):
|
||||
def side_effect(*args, **kwargs):
|
||||
if test_exception.pop():
|
||||
raise pymongo.errors.AutoReconnect
|
||||
else:
|
||||
return method(*args, **kwargs)
|
||||
return side_effect
|
||||
|
||||
@tests_db.run_with('mongodb')
|
||||
def test_mongo_find(self):
|
||||
raise_exc = [False, True]
|
||||
method = self.conn.db.resource.find
|
||||
|
||||
with mock.patch('pymongo.collection.Collection.find',
|
||||
mock.Mock()) as mock_find:
|
||||
mock_find.side_effect = self.create_side_effect(method, raise_exc)
|
||||
mock_find.__name__ = 'find'
|
||||
resources = list(self.conn.get_resources())
|
||||
self.assertEqual(9, len(resources))
|
||||
|
||||
@tests_db.run_with('mongodb')
|
||||
def test_mongo_insert(self):
|
||||
raise_exc = [False, True]
|
||||
method = self.conn.db.meter.insert
|
||||
|
||||
with mock.patch('pymongo.collection.Collection.insert',
|
||||
mock.Mock(return_value=method)) as mock_insert:
|
||||
mock_insert.side_effect = self.create_side_effect(method,
|
||||
raise_exc)
|
||||
mock_insert.__name__ = 'insert'
|
||||
self.create_and_store_sample(
|
||||
timestamp=datetime.datetime(2014, 10, 15, 14, 39),
|
||||
source='test-proxy')
|
||||
meters = list(self.conn.db.meter.find())
|
||||
self.assertEqual(12, len(meters))
|
||||
|
||||
@tests_db.run_with('mongodb')
|
||||
def test_mongo_find_and_modify(self):
|
||||
raise_exc = [False, True]
|
||||
method = self.conn.db.resource.find_and_modify
|
||||
|
||||
with mock.patch('pymongo.collection.Collection.find_and_modify',
|
||||
mock.Mock()) as mock_fam:
|
||||
mock_fam.side_effect = self.create_side_effect(method, raise_exc)
|
||||
mock_fam.__name__ = 'find_and_modify'
|
||||
self.create_and_store_sample(
|
||||
timestamp=datetime.datetime(2014, 10, 15, 14, 39),
|
||||
source='test-proxy')
|
||||
data = self.conn.db.resource.find(
|
||||
{'last_sample_timestamp':
|
||||
datetime.datetime(2014, 10, 15, 14, 39)})[0]['source']
|
||||
self.assertEqual('test-proxy', data)
|
||||
|
||||
@tests_db.run_with('mongodb')
|
||||
def test_mongo_update(self):
|
||||
raise_exc = [False, True]
|
||||
method = self.conn.db.resource.update
|
||||
|
||||
with mock.patch('pymongo.collection.Collection.update',
|
||||
mock.Mock()) as mock_update:
|
||||
mock_update.side_effect = self.create_side_effect(method,
|
||||
raise_exc)
|
||||
mock_update.__name__ = 'update'
|
||||
self.create_and_store_sample(
|
||||
timestamp=datetime.datetime(2014, 10, 15, 17, 39),
|
||||
source='test-proxy-update')
|
||||
data = self.conn.db.resource.find(
|
||||
{'last_sample_timestamp':
|
||||
datetime.datetime(2014, 10, 15, 17, 39)})[0]['source']
|
||||
self.assertEqual('test-proxy-update', data)
|
||||
|
Loading…
Reference in New Issue
Block a user