diff --git a/ceilometer/alarm/storage/impl_db2.py b/ceilometer/alarm/storage/impl_db2.py index 9ca37f20..92db547f 100644 --- a/ceilometer/alarm/storage/impl_db2.py +++ b/ceilometer/alarm/storage/impl_db2.py @@ -73,5 +73,5 @@ class Connection(pymongo_base.Connection): # not been implemented. However calling this method is important for # removal of all the empty dbs created during the test runs since # test run is against mongodb on Jenkins - self.conn.drop_database(self.db) + self.conn.drop_database(self.db.name) self.conn.close() diff --git a/ceilometer/alarm/storage/impl_mongodb.py b/ceilometer/alarm/storage/impl_mongodb.py index 19fff006..60c0ca4f 100644 --- a/ceilometer/alarm/storage/impl_mongodb.py +++ b/ceilometer/alarm/storage/impl_mongodb.py @@ -63,6 +63,6 @@ class Connection(pymongo_base.Connection): self.upgrade() def clear(self): - self.conn.drop_database(self.db) + self.conn.drop_database(self.db.name) # Connection will be reopened automatically if needed self.conn.close() diff --git a/ceilometer/event/storage/impl_db2.py b/ceilometer/event/storage/impl_db2.py index 8a1231da..a2a83f71 100644 --- a/ceilometer/event/storage/impl_db2.py +++ b/ceilometer/event/storage/impl_db2.py @@ -60,5 +60,5 @@ class Connection(pymongo_base.Connection): # not been implemented. However calling this method is important for # removal of all the empty dbs created during the test runs since # test run is against mongodb on Jenkins - self.conn.drop_database(self.db) + self.conn.drop_database(self.db.name) self.conn.close() diff --git a/ceilometer/event/storage/impl_mongodb.py b/ceilometer/event/storage/impl_mongodb.py index bba96f9d..20e7a066 100644 --- a/ceilometer/event/storage/impl_mongodb.py +++ b/ceilometer/event/storage/impl_mongodb.py @@ -47,6 +47,6 @@ class Connection(pymongo_base.Connection): self.upgrade() def clear(self): - self.conn.drop_database(self.db) + self.conn.drop_database(self.db.name) # Connection will be reopened automatically if needed self.conn.close() diff --git a/ceilometer/storage/__init__.py b/ceilometer/storage/__init__.py index 384384f2..320fa9e3 100644 --- a/ceilometer/storage/__init__.py +++ b/ceilometer/storage/__init__.py @@ -58,6 +58,10 @@ OPTS = [ default=None, help='The connection string used to connect to the event ' 'database. (if unset, connection is used)'), + cfg.StrOpt('mongodb_replica_set', + default='', + help="The connection string used to connect to mongo database, " + "if mongodb replica set was chosen."), ] cfg.CONF.register_opts(OPTS, group='database') diff --git a/ceilometer/storage/impl_db2.py b/ceilometer/storage/impl_db2.py index 1b1e5127..955db0e5 100644 --- a/ceilometer/storage/impl_db2.py +++ b/ceilometer/storage/impl_db2.py @@ -198,7 +198,7 @@ class Connection(pymongo_base.Connection): # not been implemented. However calling this method is important for # removal of all the empty dbs created during the test runs since # test run is against mongodb on Jenkins - self.conn.drop_database(self.db) + self.conn.drop_database(self.db.name) self.conn.close() def record_metering_data(self, data): diff --git a/ceilometer/storage/impl_mongodb.py b/ceilometer/storage/impl_mongodb.py index b9ce7a96..490eb52a 100644 --- a/ceilometer/storage/impl_mongodb.py +++ b/ceilometer/storage/impl_mongodb.py @@ -470,7 +470,7 @@ class Connection(pymongo_base.Connection): ) def clear(self): - self.conn.drop_database(self.db) + self.conn.drop_database(self.db.name) # Connection will be reopened automatically if needed self.conn.close() diff --git a/ceilometer/storage/mongo/utils.py b/ceilometer/storage/mongo/utils.py index ae94bdab..e0797123 100644 --- a/ceilometer/storage/mongo/utils.py +++ b/ceilometer/storage/mongo/utils.py @@ -18,6 +18,7 @@ """Common functions for MongoDB and DB2 backends """ +import time from oslo.config import cfg from oslo.utils import netutils @@ -178,7 +179,16 @@ class ConnectionPool(object): @staticmethod def _mongo_connect(url): try: - return pymongo.MongoClient(url, safe=True) + if cfg.CONF.database.mongodb_replica_set: + client = MongoProxy( + Prefection( + pymongo.MongoReplicaSetClient( + url, + replicaSet=cfg.CONF.database.mongodb_replica_set))) + else: + client = MongoProxy( + Prefection(pymongo.MongoClient(url, safe=True))) + return client except pymongo.errors.ConnectionFailure as e: LOG.warn(_('Unable to connect to the database server: ' '%(errmsg)s.') % {'errmsg': e}) @@ -305,3 +315,89 @@ class QueryTransformer(object): return self._handle_not_op(negated_tree) return self._handle_simple_op(operator_node, nodes) + + +def safe_mongo_call(call): + def closure(*args, **kwargs): + max_retries = cfg.CONF.database.max_retries + retry_interval = cfg.CONF.database.retry_interval + attempts = 0 + while True: + try: + return call(*args, **kwargs) + except pymongo.errors.AutoReconnect as err: + if 0 <= max_retries <= attempts: + LOG.error(_('Unable to reconnect to the primary mongodb ' + 'after %(retries)d retries. Giving up.') % + {'retries': max_retries}) + raise + LOG.warn(_('Unable to reconnect to the primary mongodb: ' + '%(errmsg)s. Trying again in %(retry_interval)d ' + 'seconds.') % + {'errmsg': err, 'retry_interval': retry_interval}) + attempts += 1 + time.sleep(retry_interval) + return closure + + +class MongoConn(object): + def __init__(self, method): + self.method = method + + @safe_mongo_call + def __call__(self, *args, **kwargs): + return self.method(*args, **kwargs) + +MONGO_METHODS = set([typ for typ in dir(pymongo.collection.Collection) + if not typ.startswith('_')]) +MONGO_METHODS.update(set([typ for typ in dir(pymongo.MongoClient) + if not typ.startswith('_')])) +MONGO_METHODS.update(set([typ for typ in dir(pymongo) + if not typ.startswith('_')])) + + +class MongoProxy(object): + def __init__(self, conn): + self.conn = conn + + def __getitem__(self, item): + """Create and return proxy around the method in the connection. + + :param item: name of the connection + """ + return MongoProxy(self.conn[item]) + + def __getattr__(self, item): + """Wrap MongoDB connection. + + If item is the name of an executable method, for example find or + insert, wrap this method in the MongoConn. + Else wrap getting attribute with MongoProxy. + """ + if item == 'name': + return getattr(self.conn, item) + if item in MONGO_METHODS: + return MongoConn(getattr(self.conn, item)) + return MongoProxy(getattr(self.conn, item)) + + def __call__(self, *args, **kwargs): + return self.conn(*args, **kwargs) + + +class Prefection(pymongo.collection.Collection): + def __init__(self, conn): + self.conn = conn + + def find(self, *args, **kwargs): + # We need this modifying method to check a connection for MongoDB + # in context of MongoProxy approach. Initially 'find' returns Cursor + # object and doesn't connect to db while Cursor is not used. + found = self.find(*args, **kwargs) + try: + found[0] + except IndexError: + pass + return found + + def __getattr__(self, item): + return getattr(self.conn, item) \ No newline at end of file diff --git a/ceilometer/tests/storage/test_storage_scenarios.py b/ceilometer/tests/storage/test_storage_scenarios.py index a136e247..36890159 100644 --- a/ceilometer/tests/storage/test_storage_scenarios.py +++ b/ceilometer/tests/storage/test_storage_scenarios.py @@ -23,7 +23,9 @@ import datetime import operator import mock +from oslo.config import cfg from oslo.utils import timeutils +import pymongo import ceilometer from ceilometer.alarm.storage import models as alarm_models @@ -3101,3 +3103,89 @@ class BigIntegerTest(tests_db.TestBase, msg = utils.meter_message_from_counter( s, self.CONF.publisher.metering_secret) self.conn.record_metering_data(msg) + + +class MongoAutoReconnectTest(DBTestBase, + tests_db.MixinTestsWithBackendScenarios): + cfg.CONF.set_override('retry_interval', 1, group='database') + + @tests_db.run_with('mongodb') + def test_mongo_client(self): + if cfg.CONF.database.mongodb_replica_set: + self.assertIsInstance(self.conn.conn.conn.conn, + pymongo.MongoReplicaSetClient) + else: + self.assertIsInstance(self.conn.conn.conn.conn, + pymongo.MongoClient) + + @staticmethod + def create_side_effect(method, test_exception): + def side_effect(*args, **kwargs): + if test_exception.pop(): + raise pymongo.errors.AutoReconnect + else: + return method(*args, **kwargs) + return side_effect + + @tests_db.run_with('mongodb') + def test_mongo_find(self): + raise_exc = [False, True] + method = self.conn.db.resource.find + + with mock.patch('pymongo.collection.Collection.find', + mock.Mock()) as mock_find: + mock_find.side_effect = self.create_side_effect(method, raise_exc) + mock_find.__name__ = 'find' + resources = list(self.conn.get_resources()) + self.assertEqual(9, len(resources)) + + @tests_db.run_with('mongodb') + def test_mongo_insert(self): + raise_exc = [False, True] + method = self.conn.db.meter.insert + + with mock.patch('pymongo.collection.Collection.insert', + mock.Mock(return_value=method)) as mock_insert: + mock_insert.side_effect = self.create_side_effect(method, + raise_exc) + mock_insert.__name__ = 'insert' + self.create_and_store_sample( + timestamp=datetime.datetime(2014, 10, 15, 14, 39), + source='test-proxy') + meters = list(self.conn.db.meter.find()) + self.assertEqual(12, len(meters)) + + @tests_db.run_with('mongodb') + def test_mongo_find_and_modify(self): + raise_exc = [False, True] + method = self.conn.db.resource.find_and_modify + + with mock.patch('pymongo.collection.Collection.find_and_modify', + mock.Mock()) as mock_fam: + mock_fam.side_effect = self.create_side_effect(method, raise_exc) + mock_fam.__name__ = 'find_and_modify' + self.create_and_store_sample( + timestamp=datetime.datetime(2014, 10, 15, 14, 39), + source='test-proxy') + data = self.conn.db.resource.find( + {'last_sample_timestamp': + datetime.datetime(2014, 10, 15, 14, 39)})[0]['source'] + self.assertEqual('test-proxy', data) + + @tests_db.run_with('mongodb') + def test_mongo_update(self): + raise_exc = [False, True] + method = self.conn.db.resource.update + + with mock.patch('pymongo.collection.Collection.update', + mock.Mock()) as mock_update: + mock_update.side_effect = self.create_side_effect(method, + raise_exc) + mock_update.__name__ = 'update' + self.create_and_store_sample( + timestamp=datetime.datetime(2014, 10, 15, 17, 39), + source='test-proxy-update') + data = self.conn.db.resource.find( + {'last_sample_timestamp': + datetime.datetime(2014, 10, 15, 17, 39)})[0]['source'] + self.assertEqual('test-proxy-update', data)