Enable retries in redis driver

this is followup to Iaab5ce609c0dcf7085f5dd43efbd37eb4b88f17b

actually retry for specified number of retries instead of raising
error on first ConnectionError

Change-Id: Ibca3f568b65dfea252da4b67f6d5105ba7f1ecb1
(cherry picked from commit 47c4d56e44)
This commit is contained in:
Pavlo Shchelokovskyy 2021-06-10 16:32:32 +03:00 committed by Emma Foley
parent 62d5d17a5d
commit 11526e594c
2 changed files with 41 additions and 36 deletions

View File

@ -0,0 +1,5 @@
---
features:
- |
Redis driver retries actions for up to 15 times when met with error
connecting to Redis.

View File

@ -34,7 +34,7 @@ from tooz import utils
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
def _handle_failures(func=None, n_tries=15): def _handle_failures(n_tries=15):
"""Translates common redis exceptions into tooz exceptions. """Translates common redis exceptions into tooz exceptions.
@ -43,26 +43,26 @@ def _handle_failures(func=None, n_tries=15):
:param func: the function to act on :param func: the function to act on
:param n_tries: the number of retries :param n_tries: the number of retries
""" """
def inner(func):
if func is None:
return functools.partial(
_handle_failures,
n_tries=n_tries
)
@functools.wraps(func) @functools.wraps(func)
def wrapper(*args, **kwargs): def wrapper(*args, **kwargs):
ntries = n_tries ntries = n_tries
while ntries > 1: while ntries:
try: try:
return func(*args, **kwargs) return func(*args, **kwargs)
except exceptions.ConnectionError as e: except exceptions.ConnectionError as e:
# retry ntries times and then raise a connection error # retry ntries times and then raise a connection error
ntries -= 1 ntries -= 1
if ntries >= 1: if not ntries:
utils.raise_with_cause(coordination.ToozConnectionError, LOG.debug(
"Redis connection error, "
"retry limit has been reached, aborting - %s", e
)
utils.raise_with_cause(
coordination.ToozConnectionError,
encodeutils.exception_to_unicode(e), encodeutils.exception_to_unicode(e),
cause=e) cause=e)
LOG.debug("Redis connection error, will retry - %s", e)
except (exceptions.TimeoutError) as e: except (exceptions.TimeoutError) as e:
utils.raise_with_cause(coordination.ToozConnectionError, utils.raise_with_cause(coordination.ToozConnectionError,
@ -72,8 +72,8 @@ def _handle_failures(func=None, n_tries=15):
utils.raise_with_cause(tooz.ToozError, utils.raise_with_cause(tooz.ToozError,
encodeutils.exception_to_unicode(e), encodeutils.exception_to_unicode(e),
cause=e) cause=e)
return func(*args, **kwargs)
return wrapper return wrapper
return inner
class RedisLock(locking.Lock): class RedisLock(locking.Lock):
@ -89,7 +89,7 @@ class RedisLock(locking.Lock):
self._coord = coord self._coord = coord
self._client = client self._client = client
@_handle_failures @_handle_failures()
def is_still_owner(self): def is_still_owner(self):
lock_tok = self._lock.local.token lock_tok = self._lock.local.token
if not lock_tok: if not lock_tok:
@ -97,11 +97,11 @@ class RedisLock(locking.Lock):
owner_tok = self._client.get(self.name) owner_tok = self._client.get(self.name)
return owner_tok == lock_tok return owner_tok == lock_tok
@_handle_failures @_handle_failures()
def break_(self): def break_(self):
return bool(self._client.delete(self.name)) return bool(self._client.delete(self.name))
@_handle_failures @_handle_failures()
def acquire(self, blocking=True, shared=False): def acquire(self, blocking=True, shared=False):
if shared: if shared:
raise tooz.NotImplemented raise tooz.NotImplemented
@ -113,7 +113,7 @@ class RedisLock(locking.Lock):
self._coord._acquired_locks.add(self) self._coord._acquired_locks.add(self)
return acquired return acquired
@_handle_failures @_handle_failures()
def release(self): def release(self):
with self._exclusive_access: with self._exclusive_access:
try: try:
@ -125,7 +125,7 @@ class RedisLock(locking.Lock):
self._coord._acquired_locks.discard(self) self._coord._acquired_locks.discard(self)
return True return True
@_handle_failures @_handle_failures()
def heartbeat(self): def heartbeat(self):
with self._exclusive_access: with self._exclusive_access:
if self.acquired: if self.acquired:
@ -459,7 +459,7 @@ return 1
return master_client return master_client
return redis.StrictRedis(**kwargs) return redis.StrictRedis(**kwargs)
@_handle_failures @_handle_failures()
def _start(self): def _start(self):
super(RedisDriver, self)._start() super(RedisDriver, self)._start()
try: try:
@ -532,7 +532,7 @@ return 1
def _decode_group_id(self, group_id): def _decode_group_id(self, group_id):
return utils.to_binary(group_id, encoding=self._encoding) return utils.to_binary(group_id, encoding=self._encoding)
@_handle_failures @_handle_failures()
def heartbeat(self): def heartbeat(self):
beat_id = self._encode_beat_id(self._member_id) beat_id = self._encode_beat_id(self._member_id)
expiry_ms = max(0, int(self.membership_timeout * 1000.0)) expiry_ms = max(0, int(self.membership_timeout * 1000.0))
@ -547,7 +547,7 @@ return 1
exc_info=True) exc_info=True)
return min(self.lock_timeout, self.membership_timeout) return min(self.lock_timeout, self.membership_timeout)
@_handle_failures @_handle_failures()
def _stop(self): def _stop(self):
while self._acquired_locks: while self._acquired_locks:
lock = self._acquired_locks.pop() lock = self._acquired_locks.pop()