Various fixes for locks and version compatibility

Avoid using lua locks

The default redis library will try to be smart about using
or not using lua based locks, but it does not always predict
this correctly and then fails with errors such as:

    unknown command 'EVALSHA'

Upstream issue filed:

https://github.com/andymccurdy/redis-py/issues/550
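
For reference, a minimal sketch of the workaround (the host/port and
lock name are illustrative; only the direct use of redis.lock.Lock
mirrors this change):

    import redis
    from redis import lock as redis_locks

    client = redis.StrictRedis(host='localhost', port=6379)

    # Construct the plain (non-lua) Lock class directly instead of calling
    # client.lock(), which may auto-select a lua based lock that the server
    # can not actually run.
    plain_lock = redis_locks.Lock(client, 'my-lock', timeout=30)
    if plain_lock.acquire(blocking=True):
        try:
            pass  # critical section
        finally:
            plain_lock.release()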

The 'traditional' locks used by the redis-py library require
PEXPIRE, which only works on redis 2.6+. So when we are using
a server version that is not new enough we should raise a
NotImplemented error up front instead of letting the lock fail
later (which it will).
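
Roughly, the new check boils down to this sketch (the client
construction is illustrative; the INFO key and the tooz.NotImplemented
exception follow the diff below):

    from distutils import version

    import redis
    import tooz

    client = redis.StrictRedis(host='localhost', port=6379)

    # The INFO command reports the server version; PEXPIRE (needed by the
    # lock implementation) only exists on redis 2.6.0 and later.
    server_version = version.LooseVersion(client.info()['redis_version'])
    if server_version < version.LooseVersion('2.6.0'):
        raise tooz.NotImplemented("Redis version 2.6.0 or greater is"
                                  " required to use locks")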

This also fixes a bunch of encoding issues that enabling
py33 testing exposed (damn encodings...)
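
The general shape of that fix is to normalize everything to bytes,
along the lines of the _to_binary helper added below (a standalone
sketch; the sample values are made up):

    import six

    def to_binary(text, encoding='utf8'):
        # redis hands back binary values, so coerce any unicode input to
        # bytes up front to keep py2 and py3 behaving the same way.
        if not isinstance(text, six.binary_type):
            text = text.encode(encoding)
        return text

    namespace = to_binary(u'_tooz')
    group_key = b':'.join([namespace + b'_group', to_binary(u'my-group')])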

Change-Id: Ied6045276c9a1398db710a68aa75fe92aac9b4bb
Joshua Harlow 2014-10-28 10:07:18 -07:00
parent eb0c584949
commit 3a470225f9
1 changed file with 92 additions and 24 deletions


@@ -17,6 +17,7 @@
from __future__ import absolute_import
import contextlib
from distutils import version
import logging
from concurrent import futures
@@ -24,6 +25,7 @@ import msgpack
from oslo.utils import strutils
import redis
from redis import exceptions
from redis import lock as redis_locks
import six
import tooz
@@ -34,6 +36,40 @@ from tooz import utils
LOG = logging.getLogger(__name__)
def _version_checker(version_required):
"""Checks server version is supported *before* running decorated method."""
if isinstance(version_required, six.string_types):
desired_version = version.LooseVersion(version_required)
else:
raise TypeError("Version decorator expects a string type")
def wrapper(meth):
@six.wraps(meth)
def decorator(self, *args, **kwargs):
try:
redis_version = version.LooseVersion(
self._server_info['redis_version'])
except KeyError:
# Assume it works if the server doesn't have this info (or
# doesn't have it yet).
pass
else:
if redis_version < desired_version:
raise tooz.NotImplemented("Redis version greater than or"
" equal to '%s' is required"
" to use this feature; '%s' is"
" being used which is not new"
" enough" % (desired_version,
redis_version))
return meth(self, *args, **kwargs)
return decorator
return wrapper
@contextlib.contextmanager
def _translate_failures():
"""Translates common redis exceptions into tooz exceptions."""
@@ -48,7 +84,15 @@ def _translate_failures():
class RedisLock(locking.Lock):
def __init__(self, coord, client, name, timeout):
self._name = "%s_%s_lock" % (coord.namespace, six.text_type(name))
self._lock = client.lock(self._name, timeout=timeout)
# Avoid using lua locks to keep compatible with more versions
# of redis (and not just the ones that have lua support, also avoids
# ones that don't appear to have fully working lua support...)
#
# Issue opened: https://github.com/andymccurdy/redis-py/issues/550
#
# When that gets fixed (and it detects the server's capabilities better)
# we can likely turn this back on to being smart.
self._lock = redis_locks.Lock(client, self._name, timeout=timeout)
self._coord = coord
self._acquired = False
@@ -112,8 +156,15 @@ class RedisDriver(coordination.CoordinationDriver):
# key will disappear which means we can't tell the difference between
# a group not existing and a group being empty without this key being
# saved...
_GROUP_EXISTS = '__created__'
_NAMESPACE_SEP = ':'
_GROUP_EXISTS = b'__created__'
_NAMESPACE_SEP = b':'
# This is for python3.x, which behaves differently when handed binary
# types vs. unicode types (redis appears to use binary internally), so
# to stick with a common way of doing this, make all the things binary
# (using this default encoding if one is not given and a unicode
# string is provided).
_DEFAULT_ENCODING = 'utf8'
# These are used when extracting the options used to make a client.
#
@@ -144,20 +195,29 @@ class RedisDriver(coordination.CoordinationDriver):
super(RedisDriver, self).__init__()
self._parsed_url = parsed_url
self._options = options
encoding = options.get('encoding', [self._DEFAULT_ENCODING])
self._encoding = encoding[-1]
timeout = options.get('timeout', [self._CLIENT_DEFAULT_SOCKET_TO])
self.timeout = int(timeout[-1])
lock_timeout = options.get('lock_timeout', [self.timeout])
self.lock_timeout = int(lock_timeout[-1])
self._namespace = options.get('namespace', '_tooz')
self._group_prefix = "%s_group" % (self._namespace)
self._leader_prefix = "%s_leader" % (self._namespace)
self._groups = "%s_groups" % (self._namespace)
namespace = options.get('namespace', ['_tooz'])[-1]
self._namespace = self._to_binary(namespace)
self._group_prefix = self._namespace + b"_group"
self._leader_prefix = self._namespace + b"_leader"
self._groups = self._namespace + b"_groups"
self._client = None
self._member_id = member_id
self._member_id = self._to_binary(member_id)
self._acquired_locks = set()
self._joined_groups = set()
self._executor = None
self._started = False
self._server_info = {}
def _to_binary(self, text):
if not isinstance(text, six.binary_type):
text = text.encode(self._encoding)
return text
@property
def namespace(self):
@@ -167,6 +227,9 @@ class RedisDriver(coordination.CoordinationDriver):
def running(self):
return self._started
# 2.6.0 is required since internally PEXPIRE is used and that was only
# added in 2.6.0; so avoid using this on versions less than that...
@_version_checker("2.6.0")
def get_lock(self, name):
return RedisLock(self, self._client, name, self.lock_timeout)
@@ -230,26 +293,28 @@ class RedisDriver(coordination.CoordinationDriver):
# Ensure that the server is alive and not dead; this does not
# ensure the server will always be alive, but does ensure that it
# is at least alive once...
with _translate_failures():
self._server_info = self._client.info()
self.heartbeat()
self._started = True
@classmethod
def _encode_member_id(cls, member_id):
if member_id == cls._GROUP_EXISTS:
def _encode_member_id(self, member_id):
member_id = self._to_binary(member_id)
if member_id == self._GROUP_EXISTS:
raise ValueError("Not allowed to use private keys as a member id")
return six.text_type(member_id)
@staticmethod
def _decode_member_id(member_id):
return member_id
def _encode_group_id(self, group_id):
return self._NAMESPACE_SEP.join([self._group_prefix,
six.text_type(group_id)])
def _decode_member_id(self, member_id):
return self._to_binary(member_id)
def _encode_group_leader(self, group_id):
return self._NAMESPACE_SEP.join([self._leader_prefix,
six.text_type(group_id)])
def _encode_group_id(self, group_id, apply_namespace=True):
group_id = self._to_binary(group_id)
if not apply_namespace:
return group_id
return self._NAMESPACE_SEP.join([self._group_prefix, group_id])
def _decode_group_id(self, group_id):
return self._to_binary(group_id)
def heartbeat(self):
with _translate_failures():
@@ -280,6 +345,7 @@ class RedisDriver(coordination.CoordinationDriver):
self._executor = None
if self._client is not None:
self._client = None
self._server_info = {}
self._started = False
def _submit(self, cb, *args, **kwargs):
@@ -297,7 +363,8 @@ class RedisDriver(coordination.CoordinationDriver):
def _create_group(p):
if p.exists(encoded_group):
raise coordination.GroupAlreadyExist(group_id)
p.sadd(self._groups, group_id)
p.sadd(self._groups,
self._encode_group_id(group_id, apply_namespace=False))
# Add our special key to avoid redis from deleting the dictionary
# when it becomes empty (which is not what we currently want)...
p.hset(encoded_group, self._GROUP_EXISTS, '1')
@@ -350,8 +417,9 @@ class RedisDriver(coordination.CoordinationDriver):
raise coordination.GroupNotCreated(group_id)
members = []
for m in p.hkeys(encoded_group):
m = self._decode_member_id(m)
if m != self._GROUP_EXISTS:
members.append(self._decode_member_id(m))
members.append(m)
return members
return RedisFutureResult(self._submit(self._client.transaction,
@@ -401,7 +469,7 @@ class RedisDriver(coordination.CoordinationDriver):
def _get_groups():
results = []
for g in self._client.smembers(self._groups):
results.append(g)
results.append(self._decode_group_id(g))
return results
return RedisFutureResult(self._submit(_get_groups))