Retry on redis connection errors

Sometimes, connections are closed by the server. This change
adds retry logic on Redis connection errors.

Conflict:
requirements.txt

NOTE(hberaud): This conflict is due to the tenacity version cap applied
during wallaby to unblock the gate. This requirement cap won't be an issue
for victoria and for previous stable branches.

fe9c430455

Change-Id: Iaab5ce609c0dcf7085f5dd43efbd37eb4b88f17b
(cherry picked from commit a025b4d4ee)
(cherry picked from commit 62d5d17a5d)
(cherry picked from commit 1ddddbba73)
This commit is contained in:
Matthias Runge 2021-03-05 11:24:25 +01:00 committed by Stephen Finucane
parent 1df9e9c302
commit b8e0d819d4
2 changed files with 79 additions and 53 deletions

View File

@ -7,7 +7,7 @@ six>=1.9.0 # MIT
voluptuous>=0.8.9 # BSD License voluptuous>=0.8.9 # BSD License
msgpack>=0.4.0 # Apache-2.0 msgpack>=0.4.0 # Apache-2.0
fasteners>=0.7 # Apache-2.0 fasteners>=0.7 # Apache-2.0
tenacity>=3.2.1 # Apache-2.0 tenacity>=3.2.1,<7.0.0 # Apache-2.0
futurist>=1.2.0 # Apache-2.0 futurist>=1.2.0 # Apache-2.0
oslo.utils>=3.15.0 # Apache-2.0 oslo.utils>=3.15.0 # Apache-2.0
oslo.serialization>=1.10.0 # Apache-2.0 oslo.serialization>=1.10.0 # Apache-2.0

View File

@ -14,7 +14,6 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import contextlib
from distutils import version from distutils import version
import functools import functools
import logging import logging
@ -38,19 +37,46 @@ from tooz import utils
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@contextlib.contextmanager def _handle_failures(func=None, n_tries=15):
def _translate_failures():
"""Translates common redis exceptions into tooz exceptions.""" """Translates common redis exceptions into tooz exceptions.
try:
yield This also enables retrying on certain exceptions.
except (exceptions.ConnectionError, exceptions.TimeoutError) as e:
utils.raise_with_cause(coordination.ToozConnectionError, :param func: the function to act on
encodeutils.exception_to_unicode(e), :param n_tries: the number of retries
cause=e) """
except exceptions.RedisError as e:
utils.raise_with_cause(tooz.ToozError, if func is None:
encodeutils.exception_to_unicode(e), return functools.partial(
cause=e) _handle_failures,
n_tries=n_tries
)
@functools.wraps(func)
def wrapper(*args, **kwargs):
ntries = n_tries
while ntries > 1:
try:
return func(*args, **kwargs)
except exceptions.ConnectionError as e:
# retry ntries times and then raise a connection error
ntries -= 1
if ntries >= 1:
utils.raise_with_cause(coordination.ToozConnectionError,
encodeutils.exception_to_unicode(e),
cause=e)
except (exceptions.TimeoutError) as e:
utils.raise_with_cause(coordination.ToozConnectionError,
encodeutils.exception_to_unicode(e),
cause=e)
except exceptions.RedisError as e:
utils.raise_with_cause(tooz.ToozError,
encodeutils.exception_to_unicode(e),
cause=e)
return func(*args, **kwargs)
return wrapper
class RedisLock(locking.Lock): class RedisLock(locking.Lock):
@ -66,48 +92,48 @@ class RedisLock(locking.Lock):
self._coord = coord self._coord = coord
self._client = client self._client = client
@_handle_failures
def is_still_owner(self): def is_still_owner(self):
with _translate_failures(): lock_tok = self._lock.local.token
lock_tok = self._lock.local.token if not lock_tok:
if not lock_tok: return False
return False owner_tok = self._client.get(self.name)
owner_tok = self._client.get(self.name) return owner_tok == lock_tok
return owner_tok == lock_tok
@_handle_failures
def break_(self): def break_(self):
with _translate_failures(): return bool(self._client.delete(self.name))
return bool(self._client.delete(self.name))
@_handle_failures
def acquire(self, blocking=True, shared=False): def acquire(self, blocking=True, shared=False):
if shared: if shared:
raise tooz.NotImplemented raise tooz.NotImplemented
blocking, timeout = utils.convert_blocking(blocking) blocking, timeout = utils.convert_blocking(blocking)
with _translate_failures(): acquired = self._lock.acquire(
acquired = self._lock.acquire( blocking=blocking, blocking_timeout=timeout)
blocking=blocking, blocking_timeout=timeout) if acquired:
if acquired: with self._exclusive_access:
with self._exclusive_access: self._coord._acquired_locks.add(self)
self._coord._acquired_locks.add(self) return acquired
return acquired
@_handle_failures
def release(self): def release(self):
with self._exclusive_access: with self._exclusive_access:
with _translate_failures(): try:
try: self._lock.release()
self._lock.release() except exceptions.LockError as e:
except exceptions.LockError as e: LOG.error("Unable to release lock '%r': %s", self, e)
LOG.error("Unable to release lock '%r': %s", self, e) return False
return False finally:
finally: self._coord._acquired_locks.discard(self)
self._coord._acquired_locks.discard(self) return True
return True
@_handle_failures
def heartbeat(self): def heartbeat(self):
with self._exclusive_access: with self._exclusive_access:
if self.acquired: if self.acquired:
with _translate_failures(): self._lock.reacquire()
self._lock.reacquire() return True
return True
return False return False
@property @property
@ -436,6 +462,7 @@ return 1
return master_client return master_client
return redis.StrictRedis(**kwargs) return redis.StrictRedis(**kwargs)
@_handle_failures
def _start(self): def _start(self):
super(RedisDriver, self)._start() super(RedisDriver, self)._start()
try: try:
@ -449,8 +476,7 @@ return 1
# Ensure that the server is alive and not dead, this does not # Ensure that the server is alive and not dead, this does not
# ensure the server will always be alive, but does ensure that it # ensure the server will always be alive, but does ensure that it
# at least is alive once... # at least is alive once...
with _translate_failures(): self._server_info = self._client.info()
self._server_info = self._client.info()
# Validate we have a good enough redis version we are connected # Validate we have a good enough redis version we are connected
# to so that the basic set of features we support will actually # to so that the basic set of features we support will actually
# work (instead of blowing up). # work (instead of blowing up).
@ -509,12 +535,13 @@ return 1
def _decode_group_id(self, group_id): def _decode_group_id(self, group_id):
return utils.to_binary(group_id, encoding=self._encoding) return utils.to_binary(group_id, encoding=self._encoding)
@_handle_failures
def heartbeat(self): def heartbeat(self):
with _translate_failures(): beat_id = self._encode_beat_id(self._member_id)
beat_id = self._encode_beat_id(self._member_id) expiry_ms = max(0, int(self.membership_timeout * 1000.0))
expiry_ms = max(0, int(self.membership_timeout * 1000.0)) self._client.psetex(beat_id, time_ms=expiry_ms,
self._client.psetex(beat_id, time_ms=expiry_ms, value=self.STILL_ALIVE)
value=self.STILL_ALIVE)
for lock in self._acquired_locks.copy(): for lock in self._acquired_locks.copy():
try: try:
lock.heartbeat() lock.heartbeat()
@ -523,6 +550,7 @@ return 1
exc_info=True) exc_info=True)
return min(self.lock_timeout, self.membership_timeout) return min(self.lock_timeout, self.membership_timeout)
@_handle_failures
def _stop(self): def _stop(self):
while self._acquired_locks: while self._acquired_locks:
lock = self._acquired_locks.pop() lock = self._acquired_locks.pop()
@ -537,8 +565,7 @@ return 1
try: try:
# NOTE(harlowja): this will delete nothing if the key doesn't # NOTE(harlowja): this will delete nothing if the key doesn't
# exist in the first place, which is fine/expected/desired... # exist in the first place, which is fine/expected/desired...
with _translate_failures(): self._client.delete(beat_id)
self._client.delete(beat_id)
except tooz.ToozError: except tooz.ToozError:
LOG.warning("Unable to delete heartbeat key '%s'", beat_id, LOG.warning("Unable to delete heartbeat key '%s'", beat_id,
exc_info=True) exc_info=True)
@ -755,5 +782,4 @@ return 1
return result return result
RedisFutureResult = functools.partial(coordination.CoordinatorResult, RedisFutureResult = functools.partial(coordination.CoordinatorResult)
failure_translator=_translate_failures)