Merge "Retry on redis connection errors"

This commit is contained in:
Zuul 2021-04-27 10:15:38 +00:00 committed by Gerrit Code Review
commit 174065f875
2 changed files with 79 additions and 53 deletions

View File

@ -6,7 +6,7 @@ stevedore>=1.16.0 # Apache-2.0
voluptuous>=0.8.9 # BSD License voluptuous>=0.8.9 # BSD License
msgpack>=0.4.0 # Apache-2.0 msgpack>=0.4.0 # Apache-2.0
fasteners>=0.7 # Apache-2.0 fasteners>=0.7 # Apache-2.0
tenacity>=3.2.1, <7.0.0 # Apache-2.0 tenacity>=3.2.1,<7.0.0 # Apache-2.0
futurist>=1.2.0 # Apache-2.0 futurist>=1.2.0 # Apache-2.0
oslo.utils>=4.7.0 # Apache-2.0 oslo.utils>=4.7.0 # Apache-2.0
oslo.serialization>=1.10.0 # Apache-2.0 oslo.serialization>=1.10.0 # Apache-2.0

View File

@ -14,7 +14,6 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import contextlib
from distutils import version from distutils import version
import functools import functools
import logging import logging
@ -35,19 +34,46 @@ from tooz import utils
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@contextlib.contextmanager def _handle_failures(func=None, n_tries=15):
def _translate_failures():
"""Translates common redis exceptions into tooz exceptions.""" """Translates common redis exceptions into tooz exceptions.
try:
yield This also enables retrying on certain exceptions.
except (exceptions.ConnectionError, exceptions.TimeoutError) as e:
utils.raise_with_cause(coordination.ToozConnectionError, :param func: the function to act on
encodeutils.exception_to_unicode(e), :param n_tries: the number of retries
cause=e) """
except exceptions.RedisError as e:
utils.raise_with_cause(tooz.ToozError, if func is None:
encodeutils.exception_to_unicode(e), return functools.partial(
cause=e) _handle_failures,
n_tries=n_tries
)
@functools.wraps(func)
def wrapper(*args, **kwargs):
ntries = n_tries
while ntries > 1:
try:
return func(*args, **kwargs)
except exceptions.ConnectionError as e:
# retry ntries times and then raise a connection error
ntries -= 1
if ntries >= 1:
utils.raise_with_cause(coordination.ToozConnectionError,
encodeutils.exception_to_unicode(e),
cause=e)
except (exceptions.TimeoutError) as e:
utils.raise_with_cause(coordination.ToozConnectionError,
encodeutils.exception_to_unicode(e),
cause=e)
except exceptions.RedisError as e:
utils.raise_with_cause(tooz.ToozError,
encodeutils.exception_to_unicode(e),
cause=e)
return func(*args, **kwargs)
return wrapper
class RedisLock(locking.Lock): class RedisLock(locking.Lock):
@ -63,48 +89,48 @@ class RedisLock(locking.Lock):
self._coord = coord self._coord = coord
self._client = client self._client = client
@_handle_failures
def is_still_owner(self): def is_still_owner(self):
with _translate_failures(): lock_tok = self._lock.local.token
lock_tok = self._lock.local.token if not lock_tok:
if not lock_tok: return False
return False owner_tok = self._client.get(self.name)
owner_tok = self._client.get(self.name) return owner_tok == lock_tok
return owner_tok == lock_tok
@_handle_failures
def break_(self): def break_(self):
with _translate_failures(): return bool(self._client.delete(self.name))
return bool(self._client.delete(self.name))
@_handle_failures
def acquire(self, blocking=True, shared=False): def acquire(self, blocking=True, shared=False):
if shared: if shared:
raise tooz.NotImplemented raise tooz.NotImplemented
blocking, timeout = utils.convert_blocking(blocking) blocking, timeout = utils.convert_blocking(blocking)
with _translate_failures(): acquired = self._lock.acquire(
acquired = self._lock.acquire( blocking=blocking, blocking_timeout=timeout)
blocking=blocking, blocking_timeout=timeout) if acquired:
if acquired: with self._exclusive_access:
with self._exclusive_access: self._coord._acquired_locks.add(self)
self._coord._acquired_locks.add(self) return acquired
return acquired
@_handle_failures
def release(self): def release(self):
with self._exclusive_access: with self._exclusive_access:
with _translate_failures(): try:
try: self._lock.release()
self._lock.release() except exceptions.LockError as e:
except exceptions.LockError as e: LOG.error("Unable to release lock '%r': %s", self, e)
LOG.error("Unable to release lock '%r': %s", self, e) return False
return False finally:
finally: self._coord._acquired_locks.discard(self)
self._coord._acquired_locks.discard(self) return True
return True
@_handle_failures
def heartbeat(self): def heartbeat(self):
with self._exclusive_access: with self._exclusive_access:
if self.acquired: if self.acquired:
with _translate_failures(): self._lock.reacquire()
self._lock.reacquire() return True
return True
return False return False
@property @property
@ -433,6 +459,7 @@ return 1
return master_client return master_client
return redis.StrictRedis(**kwargs) return redis.StrictRedis(**kwargs)
@_handle_failures
def _start(self): def _start(self):
super(RedisDriver, self)._start() super(RedisDriver, self)._start()
try: try:
@ -446,8 +473,7 @@ return 1
# Ensure that the server is alive and not dead, this does not # Ensure that the server is alive and not dead, this does not
# ensure the server will always be alive, but does insure that it # ensure the server will always be alive, but does insure that it
# at least is alive once... # at least is alive once...
with _translate_failures(): self._server_info = self._client.info()
self._server_info = self._client.info()
# Validate we have a good enough redis version we are connected # Validate we have a good enough redis version we are connected
# to so that the basic set of features we support will actually # to so that the basic set of features we support will actually
# work (instead of blowing up). # work (instead of blowing up).
@ -506,12 +532,13 @@ return 1
def _decode_group_id(self, group_id): def _decode_group_id(self, group_id):
return utils.to_binary(group_id, encoding=self._encoding) return utils.to_binary(group_id, encoding=self._encoding)
@_handle_failures
def heartbeat(self): def heartbeat(self):
with _translate_failures(): beat_id = self._encode_beat_id(self._member_id)
beat_id = self._encode_beat_id(self._member_id) expiry_ms = max(0, int(self.membership_timeout * 1000.0))
expiry_ms = max(0, int(self.membership_timeout * 1000.0)) self._client.psetex(beat_id, time_ms=expiry_ms,
self._client.psetex(beat_id, time_ms=expiry_ms, value=self.STILL_ALIVE)
value=self.STILL_ALIVE)
for lock in self._acquired_locks.copy(): for lock in self._acquired_locks.copy():
try: try:
lock.heartbeat() lock.heartbeat()
@ -520,6 +547,7 @@ return 1
exc_info=True) exc_info=True)
return min(self.lock_timeout, self.membership_timeout) return min(self.lock_timeout, self.membership_timeout)
@_handle_failures
def _stop(self): def _stop(self):
while self._acquired_locks: while self._acquired_locks:
lock = self._acquired_locks.pop() lock = self._acquired_locks.pop()
@ -534,8 +562,7 @@ return 1
try: try:
# NOTE(harlowja): this will delete nothing if the key doesn't # NOTE(harlowja): this will delete nothing if the key doesn't
# exist in the first place, which is fine/expected/desired... # exist in the first place, which is fine/expected/desired...
with _translate_failures(): self._client.delete(beat_id)
self._client.delete(beat_id)
except tooz.ToozError: except tooz.ToozError:
LOG.warning("Unable to delete heartbeat key '%s'", beat_id, LOG.warning("Unable to delete heartbeat key '%s'", beat_id,
exc_info=True) exc_info=True)
@ -752,5 +779,4 @@ return 1
return result return result
RedisFutureResult = functools.partial(coordination.CoordinatorResult, RedisFutureResult = functools.partial(coordination.CoordinatorResult)
failure_translator=_translate_failures)