Merge "Retry query if db deadlock error is received"
This commit is contained in:
commit
32359046d9
@ -55,18 +55,26 @@ def safe_for_db_retry(f):
|
|||||||
:param f: database api method.
|
:param f: database api method.
|
||||||
:type f: function.
|
:type f: function.
|
||||||
"""
|
"""
|
||||||
f.__dict__['enable_retry'] = True
|
f.enable_retry_on_disconnect = True
|
||||||
|
return f
|
||||||
|
|
||||||
|
|
||||||
|
def retry_on_deadlock(f):
|
||||||
|
"""Retry a DB API call if Deadlock was received.
|
||||||
|
|
||||||
|
wrap_db_entry will be applied to all db.api functions marked with this
|
||||||
|
decorator.
|
||||||
|
"""
|
||||||
|
f.enable_retry_on_deadlock = True
|
||||||
return f
|
return f
|
||||||
|
|
||||||
|
|
||||||
class wrap_db_retry(object):
|
class wrap_db_retry(object):
|
||||||
"""Decorator class. Retry db.api methods, if DBConnectionError() raised.
|
"""Retry db.api methods, if db_error raised
|
||||||
|
|
||||||
Retry decorated db.api methods. If we enabled `use_db_reconnect`
|
Retry decorated db.api methods. This decorator catches db_error and retries
|
||||||
in config, this decorator will be applied to all db.api functions,
|
function in a loop until it succeeds, or until maximum retries count
|
||||||
marked with @safe_for_db_retry decorator.
|
will be reached.
|
||||||
Decorator catches DBConnectionError() and retries function in a
|
|
||||||
loop until it succeeds, or until maximum retries count will be reached.
|
|
||||||
|
|
||||||
Keyword arguments:
|
Keyword arguments:
|
||||||
|
|
||||||
@ -84,9 +92,14 @@ class wrap_db_retry(object):
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, retry_interval, max_retries, inc_retry_interval,
|
def __init__(self, retry_interval, max_retries, inc_retry_interval,
|
||||||
max_retry_interval):
|
max_retry_interval, retry_on_disconnect, retry_on_deadlock):
|
||||||
super(wrap_db_retry, self).__init__()
|
super(wrap_db_retry, self).__init__()
|
||||||
|
|
||||||
|
self.db_error = ()
|
||||||
|
if retry_on_disconnect:
|
||||||
|
self.db_error += (exception.DBConnectionError, )
|
||||||
|
if retry_on_deadlock:
|
||||||
|
self.db_error += (exception.DBDeadlock, )
|
||||||
self.retry_interval = retry_interval
|
self.retry_interval = retry_interval
|
||||||
self.max_retries = max_retries
|
self.max_retries = max_retries
|
||||||
self.inc_retry_interval = inc_retry_interval
|
self.inc_retry_interval = inc_retry_interval
|
||||||
@ -97,17 +110,18 @@ class wrap_db_retry(object):
|
|||||||
def wrapper(*args, **kwargs):
|
def wrapper(*args, **kwargs):
|
||||||
next_interval = self.retry_interval
|
next_interval = self.retry_interval
|
||||||
remaining = self.max_retries
|
remaining = self.max_retries
|
||||||
|
db_error = self.db_error
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
return f(*args, **kwargs)
|
return f(*args, **kwargs)
|
||||||
except exception.DBConnectionError as e:
|
except db_error as e:
|
||||||
if remaining == 0:
|
if remaining == 0:
|
||||||
LOG.exception(_LE('DB exceeded retry limit.'))
|
LOG.exception(_LE('DB exceeded retry limit.'))
|
||||||
raise exception.DBError(e)
|
raise e
|
||||||
if remaining != -1:
|
if remaining != -1:
|
||||||
remaining -= 1
|
remaining -= 1
|
||||||
LOG.exception(_LE('DB connection error.'))
|
LOG.exception(_LE('DB error.'))
|
||||||
# NOTE(vsergeyev): We are using patched time module, so
|
# NOTE(vsergeyev): We are using patched time module, so
|
||||||
# this effectively yields the execution
|
# this effectively yields the execution
|
||||||
# context to another green thread.
|
# context to another green thread.
|
||||||
@ -193,12 +207,18 @@ class DBAPI(object):
|
|||||||
# NOTE(vsergeyev): If `use_db_reconnect` option is set to True, retry
|
# NOTE(vsergeyev): If `use_db_reconnect` option is set to True, retry
|
||||||
# DB API methods, decorated with @safe_for_db_retry
|
# DB API methods, decorated with @safe_for_db_retry
|
||||||
# on disconnect.
|
# on disconnect.
|
||||||
if self.use_db_reconnect and hasattr(attr, 'enable_retry'):
|
retry_on_disconnect = self.use_db_reconnect and getattr(
|
||||||
|
attr, 'enable_retry_on_disconnect', False)
|
||||||
|
retry_on_deadlock = getattr(attr, 'enable_retry_on_deadlock', False)
|
||||||
|
|
||||||
|
if retry_on_disconnect or retry_on_deadlock:
|
||||||
attr = wrap_db_retry(
|
attr = wrap_db_retry(
|
||||||
retry_interval=self.retry_interval,
|
retry_interval=self.retry_interval,
|
||||||
max_retries=self.max_retries,
|
max_retries=self.max_retries,
|
||||||
inc_retry_interval=self.inc_retry_interval,
|
inc_retry_interval=self.inc_retry_interval,
|
||||||
max_retry_interval=self.max_retry_interval)(attr)
|
max_retry_interval=self.max_retry_interval,
|
||||||
|
retry_on_disconnect=retry_on_disconnect,
|
||||||
|
retry_on_deadlock=retry_on_deadlock)(attr)
|
||||||
|
|
||||||
return attr
|
return attr
|
||||||
|
|
||||||
|
@ -119,18 +119,20 @@ database_opts = [
|
|||||||
'on connection lost.'),
|
'on connection lost.'),
|
||||||
cfg.IntOpt('db_retry_interval',
|
cfg.IntOpt('db_retry_interval',
|
||||||
default=1,
|
default=1,
|
||||||
help='Seconds between database connection retries.'),
|
help='Seconds between retries of a database transaction.'),
|
||||||
cfg.BoolOpt('db_inc_retry_interval',
|
cfg.BoolOpt('db_inc_retry_interval',
|
||||||
default=True,
|
default=True,
|
||||||
help='If True, increases the interval between database '
|
help='If True, increases the interval between retries '
|
||||||
'connection retries up to db_max_retry_interval.'),
|
'of a database operation up to db_max_retry_interval.'),
|
||||||
cfg.IntOpt('db_max_retry_interval',
|
cfg.IntOpt('db_max_retry_interval',
|
||||||
default=10,
|
default=10,
|
||||||
help='If db_inc_retry_interval is set, the '
|
help='If db_inc_retry_interval is set, the '
|
||||||
'maximum seconds between database connection retries.'),
|
'maximum seconds between retries of a '
|
||||||
|
'database operation.'),
|
||||||
cfg.IntOpt('db_max_retries',
|
cfg.IntOpt('db_max_retries',
|
||||||
default=20,
|
default=20,
|
||||||
help='Maximum database connection retries before error is '
|
help='Maximum retries in case of connection error or deadlock '
|
||||||
|
'error before error is '
|
||||||
'raised. Set to -1 to specify an infinite retry '
|
'raised. Set to -1 to specify an infinite retry '
|
||||||
'count.'),
|
'count.'),
|
||||||
]
|
]
|
||||||
|
@ -33,32 +33,60 @@ def get_backend():
|
|||||||
|
|
||||||
|
|
||||||
class DBAPI(object):
|
class DBAPI(object):
|
||||||
def _api_raise(self, *args, **kwargs):
|
def _api_raise(self, exception_to_raise, *args, **kwargs):
|
||||||
"""Simulate raising a database-has-gone-away error
|
"""Simulate raising a database error
|
||||||
|
|
||||||
This method creates a fake OperationalError with an ID matching
|
This method creates a fake OperationalError with an ID matching
|
||||||
a valid MySQL "database has gone away" situation. It also decrements
|
a valid MySQL "database has gone away" situation. It also decrements
|
||||||
the error_counter so that we can artificially keep track of
|
the error_counter so that we can artificially keep track of
|
||||||
how many times this function is called by the wrapper. When
|
how many times this function is called by the wrapper. When
|
||||||
error_counter reaches zero, this function returns True, simulating
|
error_counter reaches zero, this function returns True, simulating
|
||||||
the database becoming available again and the query succeeding.
|
the query succeeding.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
if self.error_counter > 0:
|
if self.error_counter > 0:
|
||||||
self.error_counter -= 1
|
self.error_counter -= 1
|
||||||
orig = sqla.exc.DBAPIError(False, False, False)
|
orig = sqla.exc.DBAPIError(False, False, False)
|
||||||
orig.args = [2006, 'Test raise operational error']
|
orig.args = [2006, 'Test raise operational error']
|
||||||
e = exception.DBConnectionError(orig)
|
exception_type = type(exception_to_raise)
|
||||||
|
e = exception_type(orig)
|
||||||
raise e
|
raise e
|
||||||
else:
|
else:
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def api_raise_default(self, *args, **kwargs):
|
def api_raise_conn_err_default(self, *args, **kwargs):
|
||||||
return self._api_raise(*args, **kwargs)
|
return self._api_raise(exception.DBConnectionError(), *args, **kwargs)
|
||||||
|
|
||||||
@api.safe_for_db_retry
|
@api.safe_for_db_retry
|
||||||
def api_raise_enable_retry(self, *args, **kwargs):
|
def api_raise_conn_err_enable_retry(self, *args, **kwargs):
|
||||||
return self._api_raise(*args, **kwargs)
|
return self._api_raise(exception.DBConnectionError(), *args, **kwargs)
|
||||||
|
|
||||||
|
def api_raise_deadlock_err_default(self, *args, **kwargs):
|
||||||
|
return self._api_raise(exception.DBDeadlock(), *args, **kwargs)
|
||||||
|
|
||||||
|
@api.retry_on_deadlock
|
||||||
|
def api_raise_deadlock_err_decorated(self, *args, **kwargs):
|
||||||
|
return self._api_raise(exception.DBDeadlock(), *args, **kwargs)
|
||||||
|
|
||||||
|
@api.safe_for_db_retry
|
||||||
|
def api_raise_deadlock_safe_db_retry_decorated(self, *args, **kwargs):
|
||||||
|
return self._api_raise(exception.DBDeadlock(), *args, **kwargs)
|
||||||
|
|
||||||
|
@api.safe_for_db_retry
|
||||||
|
@api.retry_on_deadlock
|
||||||
|
def api_raise_deadlock_err_two_decorators(self, *args, **kwargs):
|
||||||
|
if self.error_counter > 2:
|
||||||
|
return False
|
||||||
|
if self.error_counter == 2:
|
||||||
|
self.error_counter -= 1
|
||||||
|
orig = sqla.exc.DBAPIError(False, False, False)
|
||||||
|
orig.args = [2006, 'Test raise operational error']
|
||||||
|
raise exception.DBConnectionError(orig)
|
||||||
|
if self.error_counter == 1:
|
||||||
|
self.error_counter -= 1
|
||||||
|
raise exception.DBDeadlock()
|
||||||
|
else:
|
||||||
|
return True
|
||||||
|
|
||||||
def api_class_call1(_self, *args, **kwargs):
|
def api_class_call1(_self, *args, **kwargs):
|
||||||
return args, kwargs
|
return args, kwargs
|
||||||
@ -103,14 +131,15 @@ class DBReconnectTestCase(DBAPITestCase):
|
|||||||
self.dbapi = api.DBAPI('sqlalchemy', {'sqlalchemy': __name__})
|
self.dbapi = api.DBAPI('sqlalchemy', {'sqlalchemy': __name__})
|
||||||
|
|
||||||
self.test_db_api.error_counter = 5
|
self.test_db_api.error_counter = 5
|
||||||
self.assertRaises(exception.DBConnectionError, self.dbapi._api_raise)
|
self.assertRaises(exception.DBConnectionError,
|
||||||
|
self.dbapi.api_raise_conn_err_default)
|
||||||
|
|
||||||
def test_raise_connection_error_decorated(self):
|
def test_raise_connection_error_decorated(self):
|
||||||
self.dbapi = api.DBAPI('sqlalchemy', {'sqlalchemy': __name__})
|
self.dbapi = api.DBAPI('sqlalchemy', {'sqlalchemy': __name__})
|
||||||
|
|
||||||
self.test_db_api.error_counter = 5
|
self.test_db_api.error_counter = 5
|
||||||
self.assertRaises(exception.DBConnectionError,
|
self.assertRaises(exception.DBConnectionError,
|
||||||
self.dbapi.api_raise_enable_retry)
|
self.dbapi.api_raise_conn_err_enable_retry)
|
||||||
self.assertEqual(4, self.test_db_api.error_counter, 'Unexpected retry')
|
self.assertEqual(4, self.test_db_api.error_counter, 'Unexpected retry')
|
||||||
|
|
||||||
def test_raise_connection_error_enabled(self):
|
def test_raise_connection_error_enabled(self):
|
||||||
@ -120,7 +149,7 @@ class DBReconnectTestCase(DBAPITestCase):
|
|||||||
|
|
||||||
self.test_db_api.error_counter = 5
|
self.test_db_api.error_counter = 5
|
||||||
self.assertRaises(exception.DBConnectionError,
|
self.assertRaises(exception.DBConnectionError,
|
||||||
self.dbapi.api_raise_default)
|
self.dbapi.api_raise_conn_err_default)
|
||||||
self.assertEqual(4, self.test_db_api.error_counter, 'Unexpected retry')
|
self.assertEqual(4, self.test_db_api.error_counter, 'Unexpected retry')
|
||||||
|
|
||||||
def test_retry_one(self):
|
def test_retry_one(self):
|
||||||
@ -129,12 +158,9 @@ class DBReconnectTestCase(DBAPITestCase):
|
|||||||
use_db_reconnect=True,
|
use_db_reconnect=True,
|
||||||
retry_interval=1)
|
retry_interval=1)
|
||||||
|
|
||||||
try:
|
func = self.dbapi.api_raise_conn_err_enable_retry
|
||||||
func = self.dbapi.api_raise_enable_retry
|
self.test_db_api.error_counter = 1
|
||||||
self.test_db_api.error_counter = 1
|
self.assertTrue(func(), 'Single retry did not succeed.')
|
||||||
self.assertTrue(func(), 'Single retry did not succeed.')
|
|
||||||
except Exception:
|
|
||||||
self.fail('Single retry raised an un-wrapped error.')
|
|
||||||
|
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
0, self.test_db_api.error_counter,
|
0, self.test_db_api.error_counter,
|
||||||
@ -147,12 +173,9 @@ class DBReconnectTestCase(DBAPITestCase):
|
|||||||
retry_interval=1,
|
retry_interval=1,
|
||||||
inc_retry_interval=False)
|
inc_retry_interval=False)
|
||||||
|
|
||||||
try:
|
func = self.dbapi.api_raise_conn_err_enable_retry
|
||||||
func = self.dbapi.api_raise_enable_retry
|
self.test_db_api.error_counter = 2
|
||||||
self.test_db_api.error_counter = 2
|
self.assertTrue(func(), 'Multiple retry did not succeed.')
|
||||||
self.assertTrue(func(), 'Multiple retry did not succeed.')
|
|
||||||
except Exception:
|
|
||||||
self.fail('Multiple retry raised an un-wrapped error.')
|
|
||||||
|
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
0, self.test_db_api.error_counter,
|
0, self.test_db_api.error_counter,
|
||||||
@ -166,7 +189,7 @@ class DBReconnectTestCase(DBAPITestCase):
|
|||||||
inc_retry_interval=False,
|
inc_retry_interval=False,
|
||||||
max_retries=3)
|
max_retries=3)
|
||||||
|
|
||||||
func = self.dbapi.api_raise_enable_retry
|
func = self.dbapi.api_raise_conn_err_enable_retry
|
||||||
self.test_db_api.error_counter = 5
|
self.test_db_api.error_counter = 5
|
||||||
self.assertRaises(
|
self.assertRaises(
|
||||||
exception.DBError, func,
|
exception.DBError, func,
|
||||||
@ -175,3 +198,84 @@ class DBReconnectTestCase(DBAPITestCase):
|
|||||||
self.assertNotEqual(
|
self.assertNotEqual(
|
||||||
0, self.test_db_api.error_counter,
|
0, self.test_db_api.error_counter,
|
||||||
'Retry did not stop after sql_max_retries iterations.')
|
'Retry did not stop after sql_max_retries iterations.')
|
||||||
|
|
||||||
|
|
||||||
|
class DBDeadlockTestCase(DBAPITestCase):
|
||||||
|
def setUp(self):
|
||||||
|
super(DBDeadlockTestCase, self).setUp()
|
||||||
|
|
||||||
|
self.test_db_api = DBAPI()
|
||||||
|
patcher = mock.patch(__name__ + '.get_backend',
|
||||||
|
return_value=self.test_db_api)
|
||||||
|
patcher.start()
|
||||||
|
self.addCleanup(patcher.stop)
|
||||||
|
|
||||||
|
def test_raise_deadlock_error(self):
|
||||||
|
self.dbapi = api.DBAPI('sqlalchemy', {'sqlalchemy': __name__})
|
||||||
|
|
||||||
|
self.test_db_api.error_counter = 5
|
||||||
|
self.assertRaises(
|
||||||
|
exception.DBDeadlock,
|
||||||
|
self.dbapi.api_raise_deadlock_err_default)
|
||||||
|
|
||||||
|
def test_raise_deadlock_error_db_reconnect_enabled(self):
|
||||||
|
self.dbapi = api.DBAPI('sqlalchemy',
|
||||||
|
{'sqlalchemy': __name__},
|
||||||
|
use_db_reconnect=True)
|
||||||
|
|
||||||
|
self.test_db_api.error_counter = 5
|
||||||
|
self.assertRaises(exception.DBDeadlock,
|
||||||
|
self.dbapi.api_raise_deadlock_err_default)
|
||||||
|
self.assertEqual(4, self.test_db_api.error_counter, 'Unexpected retry')
|
||||||
|
|
||||||
|
def test_raise_deadlock_error_connection_error_decorated(self):
|
||||||
|
self.dbapi = api.DBAPI('sqlalchemy',
|
||||||
|
{'sqlalchemy': __name__},
|
||||||
|
use_db_reconnect=True)
|
||||||
|
|
||||||
|
self.test_db_api.error_counter = 5
|
||||||
|
self.assertRaises(
|
||||||
|
exception.DBDeadlock,
|
||||||
|
self.dbapi.api_raise_deadlock_safe_db_retry_decorated)
|
||||||
|
|
||||||
|
def test_retry_one(self):
|
||||||
|
self.dbapi = api.DBAPI('sqlalchemy',
|
||||||
|
{'sqlalchemy': __name__},
|
||||||
|
retry_interval=1)
|
||||||
|
|
||||||
|
func = self.dbapi.api_raise_deadlock_err_decorated
|
||||||
|
self.test_db_api.error_counter = 1
|
||||||
|
self.assertTrue(func(), 'Single retry did not succeed.')
|
||||||
|
|
||||||
|
self.assertEqual(
|
||||||
|
0, self.test_db_api.error_counter,
|
||||||
|
'Counter not decremented, retry logic probably failed.')
|
||||||
|
|
||||||
|
def test_retry_two(self):
|
||||||
|
self.dbapi = api.DBAPI('sqlalchemy',
|
||||||
|
{'sqlalchemy': __name__},
|
||||||
|
retry_interval=1,
|
||||||
|
inc_retry_interval=False)
|
||||||
|
|
||||||
|
func = self.dbapi.api_raise_deadlock_err_decorated
|
||||||
|
self.test_db_api.error_counter = 2
|
||||||
|
self.assertTrue(func(), 'Multiple retry did not succeed.')
|
||||||
|
|
||||||
|
self.assertEqual(
|
||||||
|
0, self.test_db_api.error_counter,
|
||||||
|
'Counter not decremented, retry logic probably failed.')
|
||||||
|
|
||||||
|
def test_retry_two_different_exception(self):
|
||||||
|
self.dbapi = api.DBAPI('sqlalchemy',
|
||||||
|
{'sqlalchemy': __name__},
|
||||||
|
use_db_reconnect=True,
|
||||||
|
retry_interval=1,
|
||||||
|
inc_retry_interval=False)
|
||||||
|
|
||||||
|
func = self.dbapi.api_raise_deadlock_err_two_decorators
|
||||||
|
self.test_db_api.error_counter = 2
|
||||||
|
self.assertTrue(func(), 'Multiple retry did not succeed.')
|
||||||
|
|
||||||
|
self.assertEqual(
|
||||||
|
0, self.test_db_api.error_counter,
|
||||||
|
'Counter not decremented, retry logic probably failed.')
|
||||||
|
Loading…
x
Reference in New Issue
Block a user