Retry on redis connection errors

Sometimes, connections get closed by the server. This change adds
retrying on such connection errors to the redis driver.

Change-Id: Iaab5ce609c0dcf7085f5dd43efbd37eb4b88f17b
(cherry picked from commit a025b4d4ee)
commit 62d5d17a5d
parent fe9c430455
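For orientation before reading the diff: the retry logic is implemented as a decorator, _handle_failures, that takes an optional n_tries argument and re-invokes the wrapped call when the connection to Redis drops. The stand-alone sketch below illustrates only that decorator pattern; FakeConnectionError, flaky_ping, and the simplified retry policy are illustrative stand-ins and not part of this commit. The real decorator in the diff also translates redis exceptions into tooz exceptions via utils.raise_with_cause.

import functools


class FakeConnectionError(Exception):
    """Illustrative stand-in for redis.exceptions.ConnectionError."""


def handle_failures(func=None, n_tries=15):
    # Decorator usable both bare (@handle_failures) and with arguments
    # (@handle_failures(n_tries=3)), mirroring the optional-argument shape
    # of the _handle_failures helper introduced by this change.
    if func is None:
        return functools.partial(handle_failures, n_tries=n_tries)

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        for _ in range(n_tries - 1):
            try:
                return func(*args, **kwargs)
            except FakeConnectionError:
                continue  # connection dropped; try again
        # Final attempt: let any remaining error propagate to the caller.
        return func(*args, **kwargs)

    return wrapper


attempts = {"count": 0}


@handle_failures(n_tries=3)
def flaky_ping():
    # Fails twice with a dropped connection, then succeeds.
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise FakeConnectionError("connection closed by server")
    return "PONG"


print(flaky_ping())  # retried twice, prints: PONG

Accepting an optional func argument keeps both the bare and the parameterized spellings of the decorator valid, which is why the diff below can apply it simply as @_handle_failures on the lock and driver methods.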
@@ -6,7 +6,7 @@ stevedore>=1.16.0 # Apache-2.0
 voluptuous>=0.8.9 # BSD License
 msgpack>=0.4.0 # Apache-2.0
 fasteners>=0.7 # Apache-2.0
-tenacity>=3.2.1, <7.0.0 # Apache-2.0
+tenacity>=3.2.1,<7.0.0 # Apache-2.0
 futurist>=1.2.0 # Apache-2.0
 oslo.utils>=4.7.0 # Apache-2.0
 oslo.serialization>=1.10.0 # Apache-2.0
@@ -14,7 +14,6 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
-import contextlib
 from distutils import version
 import functools
 import logging
@@ -35,19 +34,46 @@ from tooz import utils
 LOG = logging.getLogger(__name__)
 
 
-@contextlib.contextmanager
-def _translate_failures():
-    """Translates common redis exceptions into tooz exceptions."""
-    try:
-        yield
-    except (exceptions.ConnectionError, exceptions.TimeoutError) as e:
-        utils.raise_with_cause(coordination.ToozConnectionError,
-                               encodeutils.exception_to_unicode(e),
-                               cause=e)
-    except exceptions.RedisError as e:
-        utils.raise_with_cause(tooz.ToozError,
-                               encodeutils.exception_to_unicode(e),
-                               cause=e)
+def _handle_failures(func=None, n_tries=15):
+
+    """Translates common redis exceptions into tooz exceptions.
+
+    This also enables retrying on certain exceptions.
+
+    :param func: the function to act on
+    :param n_tries: the number of retries
+    """
+
+    if func is None:
+        return functools.partial(
+            _handle_failures,
+            n_tries=n_tries
+        )
+
+    @functools.wraps(func)
+    def wrapper(*args, **kwargs):
+        ntries = n_tries
+        while ntries > 1:
+            try:
+                return func(*args, **kwargs)
+            except exceptions.ConnectionError as e:
+                # retry ntries times and then raise a connection error
+                ntries -= 1
+                if ntries >= 1:
+                    utils.raise_with_cause(coordination.ToozConnectionError,
+                                           encodeutils.exception_to_unicode(e),
+                                           cause=e)
+
+            except (exceptions.TimeoutError) as e:
+                utils.raise_with_cause(coordination.ToozConnectionError,
+                                       encodeutils.exception_to_unicode(e),
+                                       cause=e)
+            except exceptions.RedisError as e:
+                utils.raise_with_cause(tooz.ToozError,
+                                       encodeutils.exception_to_unicode(e),
+                                       cause=e)
+        return func(*args, **kwargs)
+    return wrapper
 
 
 class RedisLock(locking.Lock):
@@ -63,48 +89,48 @@ class RedisLock(locking.Lock):
         self._coord = coord
         self._client = client
 
+    @_handle_failures
     def is_still_owner(self):
-        with _translate_failures():
-            lock_tok = self._lock.local.token
-            if not lock_tok:
-                return False
-            owner_tok = self._client.get(self.name)
-            return owner_tok == lock_tok
+        lock_tok = self._lock.local.token
+        if not lock_tok:
+            return False
+        owner_tok = self._client.get(self.name)
+        return owner_tok == lock_tok
 
+    @_handle_failures
     def break_(self):
-        with _translate_failures():
-            return bool(self._client.delete(self.name))
+        return bool(self._client.delete(self.name))
 
+    @_handle_failures
     def acquire(self, blocking=True, shared=False):
         if shared:
             raise tooz.NotImplemented
         blocking, timeout = utils.convert_blocking(blocking)
-        with _translate_failures():
-            acquired = self._lock.acquire(
-                blocking=blocking, blocking_timeout=timeout)
-            if acquired:
-                with self._exclusive_access:
-                    self._coord._acquired_locks.add(self)
-            return acquired
+        acquired = self._lock.acquire(
+            blocking=blocking, blocking_timeout=timeout)
+        if acquired:
+            with self._exclusive_access:
+                self._coord._acquired_locks.add(self)
+        return acquired
 
+    @_handle_failures
     def release(self):
         with self._exclusive_access:
-            with _translate_failures():
-                try:
-                    self._lock.release()
-                except exceptions.LockError as e:
-                    LOG.error("Unable to release lock '%r': %s", self, e)
-                    return False
-                finally:
-                    self._coord._acquired_locks.discard(self)
-                return True
+            try:
+                self._lock.release()
+            except exceptions.LockError as e:
+                LOG.error("Unable to release lock '%r': %s", self, e)
+                return False
+            finally:
+                self._coord._acquired_locks.discard(self)
+            return True
 
+    @_handle_failures
     def heartbeat(self):
         with self._exclusive_access:
             if self.acquired:
-                with _translate_failures():
-                    self._lock.reacquire()
-                    return True
+                self._lock.reacquire()
+                return True
         return False
 
     @property
@@ -433,6 +459,7 @@ return 1
             return master_client
         return redis.StrictRedis(**kwargs)
 
+    @_handle_failures
     def _start(self):
         super(RedisDriver, self)._start()
         try:
@@ -446,8 +473,7 @@ return 1
             # Ensure that the server is alive and not dead, this does not
             # ensure the server will always be alive, but does insure that it
             # at least is alive once...
-            with _translate_failures():
-                self._server_info = self._client.info()
+            self._server_info = self._client.info()
 
             # Validate we have a good enough redis version we are connected
             # to so that the basic set of features we support will actually
             # work (instead of blowing up).
@@ -506,12 +532,13 @@ return 1
     def _decode_group_id(self, group_id):
         return utils.to_binary(group_id, encoding=self._encoding)
 
+    @_handle_failures
     def heartbeat(self):
-        with _translate_failures():
-            beat_id = self._encode_beat_id(self._member_id)
-            expiry_ms = max(0, int(self.membership_timeout * 1000.0))
-            self._client.psetex(beat_id, time_ms=expiry_ms,
-                                value=self.STILL_ALIVE)
+        beat_id = self._encode_beat_id(self._member_id)
+        expiry_ms = max(0, int(self.membership_timeout * 1000.0))
+        self._client.psetex(beat_id, time_ms=expiry_ms,
+                            value=self.STILL_ALIVE)
+
         for lock in self._acquired_locks.copy():
             try:
                 lock.heartbeat()
@@ -520,6 +547,7 @@ return 1
                             exc_info=True)
         return min(self.lock_timeout, self.membership_timeout)
 
+    @_handle_failures
     def _stop(self):
         while self._acquired_locks:
             lock = self._acquired_locks.pop()
@@ -534,8 +562,7 @@ return 1
         try:
             # NOTE(harlowja): this will delete nothing if the key doesn't
             # exist in the first place, which is fine/expected/desired...
-            with _translate_failures():
-                self._client.delete(beat_id)
+            self._client.delete(beat_id)
         except tooz.ToozError:
             LOG.warning("Unable to delete heartbeat key '%s'", beat_id,
                         exc_info=True)
@@ -752,5 +779,4 @@ return 1
     return result
 
 
-RedisFutureResult = functools.partial(coordination.CoordinatorResult,
-                                      failure_translator=_translate_failures)
+RedisFutureResult = functools.partial(coordination.CoordinatorResult)