Allow collector service database connection retry

At startup the collector service loads the database dispatcher.
If the DB is momentarily offline, the collector service starts but
keeps running without any way to retry the DB connection. This fix
allows the connection to be retried the next time the collector
service attempts to write to the database.

Change-Id: I10fef61a9c525e1c84d8cadbca86d6bde84927ba
Closes-Bug: 1301777
Srinivas Sakhamuri 2014-10-09 03:39:30 +00:00
parent 3fbed6d3c7
commit 48f0c4b423
7 changed files with 63 additions and 68 deletions
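
The pattern the change applies is small enough to sketch in isolation: obtain the
connection lazily through a property, log but swallow a failure at startup, and let
every later access retry. This is a minimal illustrative sketch, not the patched
Ceilometer code; names such as LazyDispatcher and connect() are invented for the
example, and the real change follows in the diffs below.

import logging

LOG = logging.getLogger(__name__)


def connect(purpose):
    """Stand-in for storage.get_connection_from_config()."""
    raise RuntimeError('database unavailable')   # pretend the DB is down


class LazyDispatcher(object):
    def __init__(self):
        # At startup, log the failure but keep the service alive.
        self._conn = self._get_conn(ignore_exception=True)

    def _get_conn(self, ignore_exception=False):
        try:
            return connect('metering')
        except Exception:
            LOG.exception('Failed to connect to db, will re-try later')
            if not ignore_exception:
                raise

    @property
    def conn(self):
        # Each later access re-tries until a connection is obtained.
        if not self._conn:
            self._conn = self._get_conn()
        return self._conn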

View File

@@ -38,8 +38,33 @@ class DatabaseDispatcher(dispatcher.Base):
    """

    def __init__(self, conf):
        super(DatabaseDispatcher, self).__init__(conf)
        self.meter_conn = storage.get_connection_from_config(conf, 'metering')
        self.event_conn = storage.get_connection_from_config(conf, 'event')
        self._meter_conn = self._get_db_conn('metering', True)
        self._event_conn = self._get_db_conn('event', True)

    def _get_db_conn(self, purpose, ignore_exception=False):
        try:
            return storage.get_connection_from_config(self.conf, purpose)
        except Exception as err:
            params = {"purpose": purpose, "err": err}
            LOG.exception(_("Failed to connect to db, purpose %(purpose)s "
                            "re-try later: %(err)s") % params)
            if not ignore_exception:
                raise

    @property
    def meter_conn(self):
        if not self._meter_conn:
            self._meter_conn = self._get_db_conn('metering')
        return self._meter_conn

    @property
    def event_conn(self):
        if not self._event_conn:
            self._event_conn = self._get_db_conn('event')
        return self._event_conn

    def record_metering_data(self, data):
        # We may have receive only one counter on the wire

View File

@@ -19,6 +19,7 @@
from oslo.config import cfg
from oslo.db import options as db_options
import retrying
import six
import six.moves.urllib.parse as urlparse
from stevedore import driver
@@ -74,6 +75,11 @@ class StorageBadAggregate(Exception):
    code = 400


# Convert retry_interval secs to msecs for retry decorator
@retrying.retry(wait_fixed=cfg.CONF.database.retry_interval * 1000,
                stop_max_attempt_number=cfg.CONF.database.max_retries
                if cfg.CONF.database.max_retries >= 0
                else None)
def get_connection_from_config(conf, purpose=None):
    if conf.database_connection:
        conf.set_override('connection', conf.database_connection,
@@ -83,6 +89,9 @@ def get_connection_from_config(conf, purpose=None):
    if purpose:
        namespace = 'ceilometer.%s.storage' % purpose
        url = getattr(conf.database, '%s_connection' % purpose) or url
    # Set max_retries to 0, since oslo.db in certain cases may attempt to retry
    # making the db connection retried max_retries ^ 2 times in failure case
    conf.set_override('max_retries', 0, group='database')
    return get_connection(url, namespace)
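
For readers unfamiliar with the retrying library used above: wait_fixed is the pause
between attempts in milliseconds, and stop_max_attempt_number caps the total number of
attempts; when no stop argument is supplied at all, retrying keeps trying indefinitely,
which is what the max_retries >= 0 guard relies on to map a negative max_retries to
"retry forever". A hedged sketch with hard-coded numbers in place of the cfg.CONF
options:

import retrying


@retrying.retry(wait_fixed=10 * 1000,        # 10 s between attempts, in msecs
                stop_max_attempt_number=10)  # give up after 10 attempts
def flaky_connect():
    # Placeholder for get_connection(); raising makes retrying call us again.
    raise RuntimeError('db not reachable yet')

With retrying's defaults the exception from the final attempt propagates to the
caller, so a persistent outage still surfaces as an error.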

View File

@@ -19,8 +19,6 @@
"""

import time

from oslo.config import cfg
from oslo.utils import netutils
import pymongo
@@ -179,26 +177,12 @@ class ConnectionPool(object):

    @staticmethod
    def _mongo_connect(url):
        max_retries = cfg.CONF.database.max_retries
        retry_interval = cfg.CONF.database.retry_interval
        attempts = 0
        while True:
            try:
                client = pymongo.MongoClient(url, safe=True)
            except pymongo.errors.ConnectionFailure as e:
                if 0 <= max_retries <= attempts:
                    LOG.error(_('Unable to connect to the database after '
                                '%(retries)d retries. Giving up.') %
                              {'retries': max_retries})
                    raise
                LOG.warn(_('Unable to connect to the database server: '
                           '%(errmsg)s. Trying again in %(retry_interval)d '
                           'seconds.') %
                         {'errmsg': e, 'retry_interval': retry_interval})
                attempts += 1
                time.sleep(retry_interval)
            else:
                return client
        try:
            return pymongo.MongoClient(url, safe=True)
        except pymongo.errors.ConnectionFailure as e:
            LOG.warn(_('Unable to connect to the database server: '
                       '%(errmsg)s.') % {'errmsg': e})
            raise


class QueryTransformer(object):
class QueryTransformer(object):

View File

@@ -73,7 +73,7 @@ class TestDispatcherDB(base.BaseTestCase):
            def record_metering_data(self, data):
                self.called = True

        self.dispatcher.meter_conn = ErrorConnection()
        self.dispatcher._meter_conn = ErrorConnection()

        self.dispatcher.record_metering_data(msg)

View File

@@ -16,8 +16,10 @@
# under the License.
"""Tests for ceilometer/storage/
"""
import mock
from oslo.config import fixture as fixture_config
from oslotest import base
import retrying

from ceilometer.alarm.storage import impl_log as impl_log_alarm
from ceilometer.alarm.storage import impl_sqlalchemy as impl_sqlalchemy_alarm
@@ -30,7 +32,6 @@ import six


class EngineTest(base.BaseTestCase):
    def test_get_connection(self):
        engine = storage.get_connection('log://localhost',
                                        'ceilometer.metering.storage')
@@ -44,6 +45,23 @@ class EngineTest(base.BaseTestCase):
        self.assertIn('no-such-engine', six.text_type(err))


class ConnectionRetryTest(base.BaseTestCase):
    def setUp(self):
        super(ConnectionRetryTest, self).setUp()
        self.CONF = self.useFixture(fixture_config.Config()).conf

    def test_retries(self):
        with mock.patch.object(retrying.time, 'sleep') as retry_sleep:
            try:
                self.CONF.set_override("connection", "no-such-engine://",
                                       group="database")
                storage.get_connection_from_config(self.CONF)
            except RuntimeError as err:
                self.assertIn('no-such-engine', six.text_type(err))
                self.assertEqual(retry_sleep.call_count, 9)
                retry_sleep.assert_called_with(10.0)


class ConnectionConfigTest(base.BaseTestCase):
    def setUp(self):
        super(ConnectionConfigTest, self).setUp()
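
The expected values in test_retries come from option defaults that are not visible in
this diff; assuming the usual defaults of max_retries = 10 and retry_interval = 10
seconds (an assumption worth checking against the deployed oslo.db options), the
arithmetic works out as follows:

# Assumed defaults (not shown in this diff): max_retries = 10, retry_interval = 10
wait_fixed = 10 * 1000               # decorator argument, in milliseconds
stop_max_attempt_number = 10         # ten attempts in total

sleeps = stop_max_attempt_number - 1     # retrying only sleeps between attempts -> 9 calls
sleep_seconds = wait_fixed / 1000.0      # retrying converts back to seconds     -> 10.0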

View File

@@ -12,17 +12,13 @@
"""Tests the mongodb and db2 common functionality
"""

import contextlib
import copy
import datetime

import mock
import pymongo

from ceilometer.openstack.common.gettextutils import _
from ceilometer.publisher import utils
from ceilometer import sample
from ceilometer.storage.mongo import utils as pymongo_utils
from ceilometer.tests import db as tests_db
from ceilometer.tests.storage import test_storage_scenarios
@@ -168,41 +164,3 @@ class CompatibilityTest(test_storage_scenarios.DBTestBase,

    def test_counter_unit(self):
        meters = list(self.conn.get_meters())
        self.assertEqual(1, len(meters))

    def test_mongodb_connect_raises_after_custom_number_of_attempts(self):
        retry_interval = 13
        max_retries = 37
        self.CONF.set_override(
            'retry_interval', retry_interval, group='database')
        self.CONF.set_override(
            'max_retries', max_retries, group='database')

        # PyMongo is being used to connect even to DB2, but it only
        # accepts URLs with the 'mongodb' scheme. This replacement is
        # usually done in the DB2 connection implementation, but since
        # we don't call that, we have to do it here.
        self.CONF.set_override(
            'connection', self.db_manager.url.replace('db2:', 'mongodb:', 1),
            group='database')

        pool = pymongo_utils.ConnectionPool()
        with contextlib.nested(
                mock.patch(
                    'pymongo.MongoClient',
                    side_effect=pymongo.errors.ConnectionFailure('foo')),
                mock.patch.object(pymongo_utils.LOG, 'error'),
                mock.patch.object(pymongo_utils.LOG, 'warn'),
                mock.patch.object(pymongo_utils.time, 'sleep')
        ) as (MockMongo, MockLOGerror, MockLOGwarn, Mocksleep):
            self.assertRaises(pymongo.errors.ConnectionFailure,
                              pool.connect, self.CONF.database.connection)

        Mocksleep.assert_has_calls([mock.call(retry_interval)
                                    for i in range(max_retries)])
        MockLOGwarn.assert_any_call(
            _('Unable to connect to the database server: %(errmsg)s.'
              ' Trying again in %(retry_interval)d seconds.') %
            {'errmsg': 'foo',
             'retry_interval': retry_interval})
        MockLOGerror.assert_called_with(
            _('Unable to connect to the database after '
              '%(retries)d retries. Giving up.') %
            {'retries': max_retries})

View File

@@ -2,6 +2,7 @@
# of appearance. Changing the order has an impact on the overall integration
# process, which may cause wedges in the gate later.
retrying>=1.2.2,!=1.3.0
alembic>=0.6.4
anyjson>=0.3.3
argparse