diff --git a/requirements.txt b/requirements.txt index 5796358f..99a915a7 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,7 +6,7 @@ stevedore>=1.16.0 # Apache-2.0 voluptuous>=0.8.9 # BSD License msgpack>=0.4.0 # Apache-2.0 fasteners>=0.7 # Apache-2.0 -tenacity>=3.2.1, <7.0.0 # Apache-2.0 +tenacity>=3.2.1,<7.0.0 # Apache-2.0 futurist>=1.2.0 # Apache-2.0 oslo.utils>=4.7.0 # Apache-2.0 oslo.serialization>=1.10.0 # Apache-2.0 diff --git a/tooz/drivers/redis.py b/tooz/drivers/redis.py index 0c01a1bf..5e753509 100644 --- a/tooz/drivers/redis.py +++ b/tooz/drivers/redis.py @@ -14,7 +14,6 @@ # License for the specific language governing permissions and limitations # under the License. -import contextlib from distutils import version import functools import logging @@ -35,19 +34,46 @@ from tooz import utils LOG = logging.getLogger(__name__) -@contextlib.contextmanager -def _translate_failures(): - """Translates common redis exceptions into tooz exceptions.""" - try: - yield - except (exceptions.ConnectionError, exceptions.TimeoutError) as e: - utils.raise_with_cause(coordination.ToozConnectionError, - encodeutils.exception_to_unicode(e), - cause=e) - except exceptions.RedisError as e: - utils.raise_with_cause(tooz.ToozError, - encodeutils.exception_to_unicode(e), - cause=e) +def _handle_failures(func=None, n_tries=15): + + """Translates common redis exceptions into tooz exceptions. + + This also enables retrying on certain exceptions. + + :param func: the function to act on + :param n_tries: the number of retries + """ + + if func is None: + return functools.partial( + _handle_failures, + n_tries=n_tries + ) + + @functools.wraps(func) + def wrapper(*args, **kwargs): + ntries = n_tries + while ntries > 1: + try: + return func(*args, **kwargs) + except exceptions.ConnectionError as e: + # retry ntries times and then raise a connection error + ntries -= 1 + if ntries >= 1: + utils.raise_with_cause(coordination.ToozConnectionError, + encodeutils.exception_to_unicode(e), + cause=e) + + except (exceptions.TimeoutError) as e: + utils.raise_with_cause(coordination.ToozConnectionError, + encodeutils.exception_to_unicode(e), + cause=e) + except exceptions.RedisError as e: + utils.raise_with_cause(tooz.ToozError, + encodeutils.exception_to_unicode(e), + cause=e) + return func(*args, **kwargs) + return wrapper class RedisLock(locking.Lock): @@ -63,48 +89,48 @@ class RedisLock(locking.Lock): self._coord = coord self._client = client + @_handle_failures def is_still_owner(self): - with _translate_failures(): - lock_tok = self._lock.local.token - if not lock_tok: - return False - owner_tok = self._client.get(self.name) - return owner_tok == lock_tok + lock_tok = self._lock.local.token + if not lock_tok: + return False + owner_tok = self._client.get(self.name) + return owner_tok == lock_tok + @_handle_failures def break_(self): - with _translate_failures(): - return bool(self._client.delete(self.name)) + return bool(self._client.delete(self.name)) + @_handle_failures def acquire(self, blocking=True, shared=False): if shared: raise tooz.NotImplemented blocking, timeout = utils.convert_blocking(blocking) - with _translate_failures(): - acquired = self._lock.acquire( - blocking=blocking, blocking_timeout=timeout) - if acquired: - with self._exclusive_access: - self._coord._acquired_locks.add(self) - return acquired + acquired = self._lock.acquire( + blocking=blocking, blocking_timeout=timeout) + if acquired: + with self._exclusive_access: + self._coord._acquired_locks.add(self) + return acquired + @_handle_failures def release(self): with self._exclusive_access: - with _translate_failures(): - try: - self._lock.release() - except exceptions.LockError as e: - LOG.error("Unable to release lock '%r': %s", self, e) - return False - finally: - self._coord._acquired_locks.discard(self) - return True + try: + self._lock.release() + except exceptions.LockError as e: + LOG.error("Unable to release lock '%r': %s", self, e) + return False + finally: + self._coord._acquired_locks.discard(self) + return True + @_handle_failures def heartbeat(self): with self._exclusive_access: if self.acquired: - with _translate_failures(): - self._lock.reacquire() - return True + self._lock.reacquire() + return True return False @property @@ -433,6 +459,7 @@ return 1 return master_client return redis.StrictRedis(**kwargs) + @_handle_failures def _start(self): super(RedisDriver, self)._start() try: @@ -446,8 +473,7 @@ return 1 # Ensure that the server is alive and not dead, this does not # ensure the server will always be alive, but does insure that it # at least is alive once... - with _translate_failures(): - self._server_info = self._client.info() + self._server_info = self._client.info() # Validate we have a good enough redis version we are connected # to so that the basic set of features we support will actually # work (instead of blowing up). @@ -506,12 +532,13 @@ return 1 def _decode_group_id(self, group_id): return utils.to_binary(group_id, encoding=self._encoding) + @_handle_failures def heartbeat(self): - with _translate_failures(): - beat_id = self._encode_beat_id(self._member_id) - expiry_ms = max(0, int(self.membership_timeout * 1000.0)) - self._client.psetex(beat_id, time_ms=expiry_ms, - value=self.STILL_ALIVE) + beat_id = self._encode_beat_id(self._member_id) + expiry_ms = max(0, int(self.membership_timeout * 1000.0)) + self._client.psetex(beat_id, time_ms=expiry_ms, + value=self.STILL_ALIVE) + for lock in self._acquired_locks.copy(): try: lock.heartbeat() @@ -520,6 +547,7 @@ return 1 exc_info=True) return min(self.lock_timeout, self.membership_timeout) + @_handle_failures def _stop(self): while self._acquired_locks: lock = self._acquired_locks.pop() @@ -534,8 +562,7 @@ return 1 try: # NOTE(harlowja): this will delete nothing if the key doesn't # exist in the first place, which is fine/expected/desired... - with _translate_failures(): - self._client.delete(beat_id) + self._client.delete(beat_id) except tooz.ToozError: LOG.warning("Unable to delete heartbeat key '%s'", beat_id, exc_info=True) @@ -752,5 +779,4 @@ return 1 return result -RedisFutureResult = functools.partial(coordination.CoordinatorResult, - failure_translator=_translate_failures) +RedisFutureResult = functools.partial(coordination.CoordinatorResult)