Retry on redis connection errors
Sometimes, connections get closed by the server. This change
adds retrying to the code.
Conflict:
requirements.txt
NOTE(hberaud): This conflict is due to the cap of tenacity during wallaby
to unblock the gate. This requirement cap won't be an issue for victoria
and for previous stable branches.
fe9c430455
Change-Id: Iaab5ce609c0dcf7085f5dd43efbd37eb4b88f17b
(cherry picked from commit a025b4d4ee9fe1bd722960ea03445034580bec6f)
(cherry picked from commit 62d5d17a5d2495f1f898163bbd83acbbc9873620)
This commit is contained in:
parent
e5025587a6
commit
1ddddbba73
@ -6,7 +6,7 @@ stevedore>=1.16.0 # Apache-2.0
|
||||
voluptuous>=0.8.9 # BSD License
|
||||
msgpack>=0.4.0 # Apache-2.0
|
||||
fasteners>=0.7 # Apache-2.0
|
||||
tenacity>=3.2.1 # Apache-2.0
|
||||
tenacity>=3.2.1,<7.0.0 # Apache-2.0
|
||||
futurist>=1.2.0 # Apache-2.0
|
||||
oslo.utils>=3.15.0 # Apache-2.0
|
||||
oslo.serialization>=1.10.0 # Apache-2.0
|
||||
|
@ -14,7 +14,6 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import contextlib
|
||||
from distutils import version
|
||||
import functools
|
||||
import logging
|
||||
@ -35,12 +34,37 @@ from tooz import utils
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def _translate_failures():
|
||||
"""Translates common redis exceptions into tooz exceptions."""
|
||||
def _handle_failures(func=None, n_tries=15):
|
||||
|
||||
"""Translates common redis exceptions into tooz exceptions.
|
||||
|
||||
This also enables retrying on certain exceptions.
|
||||
|
||||
:param func: the function to act on
|
||||
:param n_tries: the number of retries
|
||||
"""
|
||||
|
||||
if func is None:
|
||||
return functools.partial(
|
||||
_handle_failures,
|
||||
n_tries=n_tries
|
||||
)
|
||||
|
||||
@functools.wraps(func)
|
||||
def wrapper(*args, **kwargs):
|
||||
ntries = n_tries
|
||||
while ntries > 1:
|
||||
try:
|
||||
yield
|
||||
except (exceptions.ConnectionError, exceptions.TimeoutError) as e:
|
||||
return func(*args, **kwargs)
|
||||
except exceptions.ConnectionError as e:
|
||||
# retry ntries times and then raise a connection error
|
||||
ntries -= 1
|
||||
if ntries >= 1:
|
||||
utils.raise_with_cause(coordination.ToozConnectionError,
|
||||
encodeutils.exception_to_unicode(e),
|
||||
cause=e)
|
||||
|
||||
except (exceptions.TimeoutError) as e:
|
||||
utils.raise_with_cause(coordination.ToozConnectionError,
|
||||
encodeutils.exception_to_unicode(e),
|
||||
cause=e)
|
||||
@ -48,6 +72,8 @@ def _translate_failures():
|
||||
utils.raise_with_cause(tooz.ToozError,
|
||||
encodeutils.exception_to_unicode(e),
|
||||
cause=e)
|
||||
return func(*args, **kwargs)
|
||||
return wrapper
|
||||
|
||||
|
||||
class RedisLock(locking.Lock):
|
||||
@ -63,23 +89,23 @@ class RedisLock(locking.Lock):
|
||||
self._coord = coord
|
||||
self._client = client
|
||||
|
||||
@_handle_failures
|
||||
def is_still_owner(self):
|
||||
with _translate_failures():
|
||||
lock_tok = self._lock.local.token
|
||||
if not lock_tok:
|
||||
return False
|
||||
owner_tok = self._client.get(self.name)
|
||||
return owner_tok == lock_tok
|
||||
|
||||
@_handle_failures
|
||||
def break_(self):
|
||||
with _translate_failures():
|
||||
return bool(self._client.delete(self.name))
|
||||
|
||||
@_handle_failures
|
||||
def acquire(self, blocking=True, shared=False):
|
||||
if shared:
|
||||
raise tooz.NotImplemented
|
||||
blocking, timeout = utils.convert_blocking(blocking)
|
||||
with _translate_failures():
|
||||
acquired = self._lock.acquire(
|
||||
blocking=blocking, blocking_timeout=timeout)
|
||||
if acquired:
|
||||
@ -87,9 +113,9 @@ class RedisLock(locking.Lock):
|
||||
self._coord._acquired_locks.add(self)
|
||||
return acquired
|
||||
|
||||
@_handle_failures
|
||||
def release(self):
|
||||
with self._exclusive_access:
|
||||
with _translate_failures():
|
||||
try:
|
||||
self._lock.release()
|
||||
except exceptions.LockError as e:
|
||||
@ -99,10 +125,10 @@ class RedisLock(locking.Lock):
|
||||
self._coord._acquired_locks.discard(self)
|
||||
return True
|
||||
|
||||
@_handle_failures
|
||||
def heartbeat(self):
|
||||
with self._exclusive_access:
|
||||
if self.acquired:
|
||||
with _translate_failures():
|
||||
self._lock.reacquire()
|
||||
return True
|
||||
return False
|
||||
@ -433,6 +459,7 @@ return 1
|
||||
return master_client
|
||||
return redis.StrictRedis(**kwargs)
|
||||
|
||||
@_handle_failures
|
||||
def _start(self):
|
||||
super(RedisDriver, self)._start()
|
||||
try:
|
||||
@ -446,7 +473,6 @@ return 1
|
||||
# Ensure that the server is alive and not dead, this does not
|
||||
# ensure the server will always be alive, but does insure that it
|
||||
# at least is alive once...
|
||||
with _translate_failures():
|
||||
self._server_info = self._client.info()
|
||||
# Validate we have a good enough redis version we are connected
|
||||
# to so that the basic set of features we support will actually
|
||||
@ -506,12 +532,13 @@ return 1
|
||||
def _decode_group_id(self, group_id):
|
||||
return utils.to_binary(group_id, encoding=self._encoding)
|
||||
|
||||
@_handle_failures
|
||||
def heartbeat(self):
|
||||
with _translate_failures():
|
||||
beat_id = self._encode_beat_id(self._member_id)
|
||||
expiry_ms = max(0, int(self.membership_timeout * 1000.0))
|
||||
self._client.psetex(beat_id, time_ms=expiry_ms,
|
||||
value=self.STILL_ALIVE)
|
||||
|
||||
for lock in self._acquired_locks.copy():
|
||||
try:
|
||||
lock.heartbeat()
|
||||
@ -520,6 +547,7 @@ return 1
|
||||
exc_info=True)
|
||||
return min(self.lock_timeout, self.membership_timeout)
|
||||
|
||||
@_handle_failures
|
||||
def _stop(self):
|
||||
while self._acquired_locks:
|
||||
lock = self._acquired_locks.pop()
|
||||
@ -534,7 +562,6 @@ return 1
|
||||
try:
|
||||
# NOTE(harlowja): this will delete nothing if the key doesn't
|
||||
# exist in the first place, which is fine/expected/desired...
|
||||
with _translate_failures():
|
||||
self._client.delete(beat_id)
|
||||
except tooz.ToozError:
|
||||
LOG.warning("Unable to delete heartbeat key '%s'", beat_id,
|
||||
@ -752,5 +779,4 @@ return 1
|
||||
return result
|
||||
|
||||
|
||||
RedisFutureResult = functools.partial(coordination.CoordinatorResult,
|
||||
failure_translator=_translate_failures)
|
||||
RedisFutureResult = functools.partial(coordination.CoordinatorResult)
|
||||
|
Loading…
x
Reference in New Issue
Block a user