806 lines
30 KiB
Python
806 lines
30 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
from distutils import version
|
|
import functools
|
|
import logging
|
|
import re
|
|
import string
|
|
import threading
|
|
|
|
from oslo_utils import encodeutils
|
|
from oslo_utils import strutils
|
|
import redis
|
|
from redis import exceptions
|
|
from redis import sentinel
|
|
|
|
import tooz
|
|
from tooz import coordination
|
|
from tooz import locking
|
|
from tooz import utils
|
|
|
|
LOG = logging.getLogger(__name__)
|
|
|
|
|
|
def _handle_failures(n_tries=15):
|
|
|
|
"""Translates common redis exceptions into tooz exceptions.
|
|
|
|
This also enables retrying on certain exceptions.
|
|
|
|
:param func: the function to act on
|
|
:param n_tries: the number of retries
|
|
"""
|
|
def inner(func):
|
|
@functools.wraps(func)
|
|
def wrapper(*args, **kwargs):
|
|
ntries = n_tries
|
|
while ntries:
|
|
try:
|
|
return func(*args, **kwargs)
|
|
except exceptions.ConnectionError as e:
|
|
# retry ntries times and then raise a connection error
|
|
ntries -= 1
|
|
if not ntries:
|
|
LOG.debug(
|
|
"Redis connection error, "
|
|
"retry limit has been reached, aborting - %s", e
|
|
)
|
|
utils.raise_with_cause(
|
|
coordination.ToozConnectionError,
|
|
encodeutils.exception_to_unicode(e),
|
|
cause=e)
|
|
LOG.debug("Redis connection error, will retry - %s", e)
|
|
|
|
except (exceptions.TimeoutError) as e:
|
|
utils.raise_with_cause(coordination.ToozConnectionError,
|
|
encodeutils.exception_to_unicode(e),
|
|
cause=e)
|
|
except exceptions.RedisError as e:
|
|
utils.raise_with_cause(tooz.ToozError,
|
|
encodeutils.exception_to_unicode(e),
|
|
cause=e)
|
|
return wrapper
|
|
return inner
|
|
|
|
|
|
class RedisLock(locking.Lock):
    """A tooz lock backed by a redis client lock object.

    The lock registers itself in the ``_acquired_locks`` set of the
    coordinator that created it while it is held, which is also what the
    ``acquired`` property reports.
    """

    def __init__(self, coord, client, name, timeout):
        """Create the lock.

        :param coord: the coordinator owning this lock (its ``namespace``
                      is used to build the lock key)
        :param client: the redis client used to create/inspect the lock
        :param name: the (un-namespaced) lock name
        :param timeout: the expiry handed over to the redis lock
        """
        full_name = "%s_%s_lock" % (coord.namespace, str(name))
        super().__init__(full_name)
        self._coord = coord
        self._client = client
        # NOTE(jd) Make sure we don't release and heartbeat at the same time by
        # using a exclusive access lock (LP#1557593)
        self._exclusive_access = threading.Lock()
        self._lock = client.lock(full_name, timeout=timeout,
                                 thread_local=False)

    @_handle_failures()
    def is_still_owner(self):
        """Tell whether the token stored in redis is still our own token."""
        local_token = self._lock.local.token
        if not local_token:
            return False
        return self._client.get(self.name) == local_token

    @_handle_failures()
    def break_(self):
        """Delete the lock key; returns True when a key was deleted."""
        deleted = self._client.delete(self.name)
        return bool(deleted)

    @_handle_failures()
    def acquire(self, blocking=True, shared=False):
        """Try to acquire the lock.

        :param blocking: blocking specification, converted with
                         ``utils.convert_blocking``
        :param shared: shared locks are not supported by this driver
        :raises: ``tooz.NotImplemented`` if ``shared`` is requested
        :returns: whatever the underlying redis lock acquisition returned
        """
        if shared:
            raise tooz.NotImplemented
        blocking, timeout = utils.convert_blocking(blocking)
        got_it = self._lock.acquire(blocking=blocking,
                                    blocking_timeout=timeout)
        if not got_it:
            return got_it
        with self._exclusive_access:
            self._coord._acquired_locks.add(self)
        return got_it

    @_handle_failures()
    def release(self):
        """Release the lock; returns False if redis refused the release.

        Whatever the outcome, the lock is removed from the coordinator's
        set of acquired locks.
        """
        with self._exclusive_access:
            try:
                self._lock.release()
            except exceptions.LockError as e:
                LOG.error("Unable to release lock '%r': %s", self, e)
                released = False
            else:
                released = True
            finally:
                self._coord._acquired_locks.discard(self)
            return released

    @_handle_failures()
    def heartbeat(self):
        """Re-acquire (refresh) the lock if we hold it.

        :returns: True if the lock was held and refreshed, False otherwise
        """
        with self._exclusive_access:
            if not self.acquired:
                return False
            self._lock.reacquire()
            return True

    @property
    def acquired(self):
        """Whether the coordinator currently tracks this lock as held."""
        held_locks = self._coord._acquired_locks
        return self in held_locks
|
|
|
|
|
|
class RedisDriver(coordination.CoordinationDriverCachedRunWatchers,
                  coordination.CoordinationDriverWithExecutor):
    """Redis provides a few nice benefits that act as a poormans zookeeper.

    It **is** fully functional and implements all of the coordination
    driver API(s). It stores data into `redis`_ using the provided `redis`_
    API(s) using `msgpack`_ encoded values as needed.

    - Durability (when setup with `AOF`_ mode).
    - Consistent, note that this is still restricted to only
      one redis server, without the recently released redis (alpha)
      clustering > 1 server will not be consistent when partitions
      or failures occur (even redis clustering docs state it is
      not a fully AP or CP solution, which means even with it there
      will still be *potential* inconsistencies).
    - Master/slave failover (when setup with redis `sentinel`_), giving
      some notion of HA (values *can* be lost when a failover transition
      occurs).

    The Redis driver connection URI should look like::

      redis://[USERNAME:PASSWORD@]HOST:PORT
          [?OPTION=VALUE[&OPTION2=VALUE2[&...]]]

    For a list of options recognized by this driver, see the documentation
    for the member CLIENT_ARGS, and to determine the expected types of those
    options see CLIENT_BOOL_ARGS, CLIENT_INT_ARGS, and CLIENT_LIST_ARGS.

    To use a `sentinel`_ the connection URI must point to the sentinel server.
    At connection time the sentinel will be asked for the current IP and port
    of the master and then connect there. The connection URI for sentinel
    should be written as follows::

      redis://<sentinel host>:<sentinel port>?sentinel=<master name>

    Additional sentinel hosts are listed with multiple ``sentinel_fallback``
    parameters as follows::

      redis://<sentinel host>:<sentinel port>?sentinel=<master name>&
        sentinel_fallback=<other sentinel host>:<sentinel port>&
        sentinel_fallback=<other sentinel host>:<sentinel port>&
        sentinel_fallback=<other sentinel host>:<sentinel port>

    Further resources/links:

    - http://redis.io/
    - http://redis.io/topics/sentinel
    - http://redis.io/topics/cluster-spec

    Note that this client will itself retry on transaction failure (when the
    keys being watched have changed underneath the current transaction).
    Currently the number of attempts that are tried is infinite (this might
    be addressed in https://github.com/andymccurdy/redis-py/issues/566 when
    that gets worked on). See http://redis.io/topics/transactions for more
    information on this topic.

    General recommendations/usage considerations:

    - When used for locks, run in AOF mode and think carefully about how
      your redis deployment handles losing a server (the clustering support
      is supposed to aid in losing servers, but it is also of unknown
      reliability and is relatively new, so use at your own risk).

    .. _redis: http://redis.io/
    .. _msgpack: http://msgpack.org/
    .. _sentinel: http://redis.io/topics/sentinel
    .. _AOF: http://redis.io/topics/persistence
    """

    CHARACTERISTICS = (
        coordination.Characteristics.DISTRIBUTED_ACROSS_THREADS,
        coordination.Characteristics.DISTRIBUTED_ACROSS_PROCESSES,
        coordination.Characteristics.DISTRIBUTED_ACROSS_HOSTS,
        coordination.Characteristics.CAUSAL,
    )
    """
    Tuple of :py:class:`~tooz.coordination.Characteristics` introspectable
    enum member(s) that can be used to interrogate how this driver works.
    """

    MIN_VERSION = version.LooseVersion("2.6.0")
    """
    The min redis version that this driver requires to operate with...
    """

    GROUP_EXISTS = b'__created__'
    """
    Redis deletes dictionaries that have no keys in them, which means the
    key will disappear which means we can't tell the difference between
    a group not existing and a group being empty without this key being
    saved...
    """

    #: Value used (with group exists key) to keep a group from disappearing.
    GROUP_EXISTS_VALUE = b'1'

    #: Default namespace for keys when none is provided.
    DEFAULT_NAMESPACE = b'_tooz'

    NAMESPACE_SEP = b':'
    """
    Separator that is used to combine a key with the namespace (to get
    the **actual** key that will be used).
    """

    DEFAULT_ENCODING = 'utf8'
    """
    This is for python3.x; which will behave differently when returned
    binary types or unicode types (redis uses binary internally it appears),
    so to just stick with a common way of doing this, make all the things
    binary (with this default encoding if one is not given and a unicode
    string is provided).
    """

    CLIENT_ARGS = frozenset([
        'db',
        'encoding',
        'retry_on_timeout',
        'socket_keepalive',
        'socket_timeout',
        'socket_connect_timeout',
        'ssl',
        'ssl_certfile',
        'ssl_keyfile',
        'ssl_ca_certs',
        'sentinel',
        'sentinel_fallback',
    ])
    """
    Keys that we allow to proxy from the coordinator configuration into the
    redis client (used to configure the redis client internals so that
    it works as you expect/want it to).

    See: http://redis-py.readthedocs.org/en/latest/#redis.Redis

    See: https://github.com/andymccurdy/redis-py/blob/2.10.3/redis/client.py
    """

    #: Client arguments that are expected/allowed to be lists.
    CLIENT_LIST_ARGS = frozenset([
        'sentinel_fallback',
    ])

    #: Client arguments that are expected to be boolean convertible.
    CLIENT_BOOL_ARGS = frozenset([
        'retry_on_timeout',
        'ssl',
    ])

    #: Client arguments that are expected to be int convertible.
    CLIENT_INT_ARGS = frozenset([
        'db',
        'socket_keepalive',
    ])

    #: Client arguments that are expected to be float convertible.
    CLIENT_FLOAT_ARGS = frozenset([
        'socket_timeout',
        'socket_connect_timeout',
    ])

    #: Default socket timeout to use when none is provided.
    CLIENT_DEFAULT_SOCKET_TO = 30

    #: String used to keep a key/member alive (until it next expires).
    STILL_ALIVE = b"Not dead!"

    SCRIPTS = {
        'create_group': """
-- Extract *all* the variables (so we can easily know what they are)...
local namespaced_group_key = KEYS[1]
local all_groups_key = KEYS[2]
local no_namespaced_group_key = ARGV[1]
if redis.call("exists", namespaced_group_key) == 1 then
    return 0
end
redis.call("sadd", all_groups_key, no_namespaced_group_key)
redis.call("hset", namespaced_group_key,
           "${group_existence_key}", "${group_existence_value}")
return 1
""",
        'delete_group': """
-- Extract *all* the variables (so we can easily know what they are)...
local namespaced_group_key = KEYS[1]
local all_groups_key = KEYS[2]
local no_namespaced_group_key = ARGV[1]
if redis.call("exists", namespaced_group_key) == 0 then
    return -1
end
if redis.call("sismember", all_groups_key, no_namespaced_group_key) == 0 then
    return -2
end
if redis.call("hlen", namespaced_group_key) > 1 then
    return -3
end
-- First remove from the set (then delete the group); if the set removal
-- fails, at least the group will still exist (and can be fixed manually)...
if redis.call("srem", all_groups_key, no_namespaced_group_key) == 0 then
    return -4
end
redis.call("del", namespaced_group_key)
return 1
""",
        'update_capabilities': """
-- Extract *all* the variables (so we can easily know what they are)...
local group_key = KEYS[1]
local member_id = ARGV[1]
local caps = ARGV[2]
if redis.call("exists", group_key) == 0 then
    return -1
end
if redis.call("hexists", group_key, member_id) == 0 then
    return -2
end
redis.call("hset", group_key, member_id, caps)
return 1
""",
    }
    """`Lua`_ **template** scripts that will be used by various methods (they
    are turned into real scripts and loaded on call into the :func:`.start`
    method).

    .. _Lua: http://www.lua.org
    """

    EXCLUDE_OPTIONS = CLIENT_LIST_ARGS

    def __init__(self, member_id, parsed_url, options):
        """Record the configuration; no connection is made until start.

        :param member_id: identifier of this member
        :param parsed_url: the parsed connection URL
        :param options: driver options; ``encoding``, ``timeout``,
                        ``membership_timeout``, ``lock_timeout`` and
                        ``namespace`` are read here
        """
        super(RedisDriver, self).__init__(member_id, parsed_url, options)
        self._parsed_url = parsed_url
        self._encoding = self._options.get('encoding', self.DEFAULT_ENCODING)
        timeout = self._options.get('timeout', self.CLIENT_DEFAULT_SOCKET_TO)
        self.timeout = int(timeout)
        self.membership_timeout = float(self._options.get(
            'membership_timeout', timeout))
        lock_timeout = self._options.get('lock_timeout', self.timeout)
        self.lock_timeout = int(lock_timeout)
        namespace = self._options.get('namespace', self.DEFAULT_NAMESPACE)
        self._namespace = utils.to_binary(namespace, encoding=self._encoding)
        # Key prefixes/keys derived from the namespace.
        self._group_prefix = self._namespace + b"_group"
        self._beat_prefix = self._namespace + b"_beats"
        self._groups = self._namespace + b"_groups"
        self._client = None
        self._acquired_locks = set()
        self._started = False
        self._server_info = {}
        self._scripts = {}

    def _check_fetch_redis_version(self, geq_version, not_existent=True):
        """Check the connected server version against a minimum version.

        :param geq_version: the minimum version, as a string or a
                            ``LooseVersion``
        :param not_existent: value reported as the check result when the
                             server info holds no ``redis_version`` entry
        :raises: ``TypeError`` if ``geq_version`` has an unsupported type
        :returns: a ``(is_new_enough, server_version)`` tuple, where
                  ``server_version`` is None if it is not known
        """
        if isinstance(geq_version, str):
            desired_version = version.LooseVersion(geq_version)
        elif isinstance(geq_version, version.LooseVersion):
            desired_version = geq_version
        else:
            raise TypeError("Version check expects a string/version type")
        try:
            redis_version = version.LooseVersion(
                self._server_info['redis_version'])
        except KeyError:
            return (not_existent, None)
        else:
            if redis_version < desired_version:
                return (False, redis_version)
            else:
                return (True, redis_version)

    @property
    def namespace(self):
        """The (binary) namespace used for the keys of this driver."""
        return self._namespace

    @property
    def running(self):
        """Whether the driver has been started (and not stopped since)."""
        return self._started

    def get_lock(self, name):
        """Return a :class:`RedisLock` named ``name`` bound to this driver."""
        return RedisLock(self, self._client, name, self.lock_timeout)

    _dumps = staticmethod(utils.dumps)
    _loads = staticmethod(utils.loads)

    @classmethod
    def _parse_sentinel(cls, sentinel):
        """Extract a ``(host, port)`` tuple from a sentinel address.

        :param sentinel: address such as ``[::1]:6379``, ``127.0.0.1:6379``
                         or ``localhost:6379``
        :raises: ``ValueError`` if no ``host:port`` pair can be found
        """
        # IPv6 (eg. [::1]:6379 )
        match = re.search(r'\[(\S+)\]:(\d+)', sentinel)
        if match:
            return (match[1], int(match[2]))
        # IPv4 or hostname (eg. 127.0.0.1:6379 or localhost:6379)
        match = re.search(r'(\S+):(\d+)', sentinel)
        if match:
            return (match[1], int(match[2]))
        raise ValueError('Malformed sentinel server format')

    @classmethod
    def _make_client(cls, parsed_url, options, default_socket_timeout):
        """Build the redis client described by the URL and the options.

        :param parsed_url: the parsed connection URL (hostname/port or a
                           unix socket path, plus optional credentials)
        :param options: driver options; only those listed in
                        ``CLIENT_ARGS`` are forwarded (after conversion)
        :param default_socket_timeout: ``socket_timeout`` used when the
                                       options do not provide one
        :raises: ``ValueError`` if the URL lacks the required pieces
        :returns: a redis client; when the ``sentinel`` option is given it
                  is the master client provided by the sentinel(s)
        """
        kwargs = {}
        if parsed_url.hostname:
            kwargs['host'] = parsed_url.hostname
            if parsed_url.port:
                kwargs['port'] = parsed_url.port
        else:
            if not parsed_url.path:
                raise ValueError("Expected socket path in parsed urls path")
            kwargs['unix_socket_path'] = parsed_url.path
        if parsed_url.username:
            kwargs['username'] = parsed_url.username
        if parsed_url.password:
            kwargs['password'] = parsed_url.password
        for a in cls.CLIENT_ARGS:
            if a not in options:
                continue
            if a in cls.CLIENT_BOOL_ARGS:
                v = strutils.bool_from_string(options[a])
            elif a in cls.CLIENT_LIST_ARGS:
                v = options[a]
            elif a in cls.CLIENT_INT_ARGS:
                v = int(options[a])
            elif a in cls.CLIENT_FLOAT_ARGS:
                v = float(options[a])
            else:
                v = options[a]
            kwargs[a] = v
        if 'socket_timeout' not in kwargs:
            kwargs['socket_timeout'] = default_socket_timeout

        # Ask the sentinel for the current master if there is a
        # sentinel arg.
        if 'sentinel' in kwargs:
            sentinel_hosts = [
                cls._parse_sentinel(fallback)
                for fallback in kwargs.pop('sentinel_fallback', [])
            ]
            # The URL host/port designate the first sentinel, they must be
            # removed from the keyword arguments: the sentinel clients get
            # their address positionally (so keeping 'host'/'port' in
            # sentinel_kwargs would pass them twice) and the master address
            # is discovered through the sentinels.
            try:
                first_sentinel = (kwargs.pop('host'), kwargs.pop('port'))
            except KeyError:
                raise ValueError("Expected sentinel host and port in"
                                 " parsed url")
            sentinel_hosts.insert(0, first_sentinel)
            sentinel_name = kwargs.pop('sentinel')
            sentinel_server = sentinel.Sentinel(
                sentinel_hosts,
                # Hand over a copy so that the sentinel and the master
                # connection settings are not the very same dict object.
                sentinel_kwargs=dict(kwargs),
                **kwargs)
            master_client = sentinel_server.master_for(sentinel_name)
            # The master_client is a redis.Redis using a
            # Sentinel managed connection pool.
            return master_client
        return redis.Redis(**kwargs)

    @_handle_failures()
    def _start(self):
        """Connect, validate the server version and register the scripts.

        :raises: ``ToozConnectionError`` if the client can not be created,
                 ``tooz.NotImplemented`` if the server is too old
        """
        super(RedisDriver, self)._start()
        try:
            self._client = self._make_client(self._parsed_url, self._options,
                                             self.timeout)
        except exceptions.RedisError as e:
            utils.raise_with_cause(coordination.ToozConnectionError,
                                   encodeutils.exception_to_unicode(e),
                                   cause=e)
        else:
            # Ensure that the server is alive and not dead, this does not
            # ensure the server will always be alive, but does insure that it
            # at least is alive once...
            self._server_info = self._client.info()
            # Validate we have a good enough redis version we are connected
            # to so that the basic set of features we support will actually
            # work (instead of blowing up).
            new_enough, redis_version = self._check_fetch_redis_version(
                self.MIN_VERSION)
            if not new_enough:
                raise tooz.NotImplemented("Redis version greater than or"
                                          " equal to '%s' is required"
                                          " to use this driver; '%s' is"
                                          " being used which is not new"
                                          " enough" % (self.MIN_VERSION,
                                                       redis_version))
            tpl_params = {
                'group_existence_value': self.GROUP_EXISTS_VALUE,
                'group_existence_key': self.GROUP_EXISTS,
            }
            # For py3.x ensure these are unicode since the string template
            # replacement will expect unicode (and we don't want b'' as a
            # prefix which will happen in py3.x if this is not done).
            for (k, v) in tpl_params.copy().items():
                if isinstance(v, bytes):
                    v = v.decode('ascii')
                tpl_params[k] = v
            prepared_scripts = {}
            for name, raw_script_tpl in self.SCRIPTS.items():
                script_tpl = string.Template(raw_script_tpl)
                script = script_tpl.substitute(**tpl_params)
                prepared_scripts[name] = self._client.register_script(script)
            self._scripts = prepared_scripts
            self.heartbeat()
            self._started = True

    def _encode_beat_id(self, member_id):
        """Return the (binary) heartbeat key of the given member."""
        member_id = utils.to_binary(member_id, encoding=self._encoding)
        return self.NAMESPACE_SEP.join([self._beat_prefix, member_id])

    def _encode_member_id(self, member_id):
        """Return the member id as binary.

        :raises: ``ValueError`` if the id collides with the private
                 ``GROUP_EXISTS`` marker key
        """
        member_id = utils.to_binary(member_id, encoding=self._encoding)
        if member_id == self.GROUP_EXISTS:
            raise ValueError("Not allowed to use private keys as a member id")
        return member_id

    def _decode_member_id(self, member_id):
        """Normalize a member id read from redis to binary."""
        return utils.to_binary(member_id, encoding=self._encoding)

    def _encode_group_leader(self, group_id):
        """Return the (binary) name of the leader lock of the group."""
        group_id = utils.to_binary(group_id, encoding=self._encoding)
        return b"leader_of_" + group_id

    def _encode_group_id(self, group_id, apply_namespace=True):
        """Return the group id as binary, prefixed unless told otherwise.

        :param apply_namespace: when False the bare binary group id is
                                returned (no group prefix/separator)
        """
        group_id = utils.to_binary(group_id, encoding=self._encoding)
        if not apply_namespace:
            return group_id
        return self.NAMESPACE_SEP.join([self._group_prefix, group_id])

    def _decode_group_id(self, group_id):
        """Normalize a group id read from redis to binary."""
        return utils.to_binary(group_id, encoding=self._encoding)

    @_handle_failures()
    def heartbeat(self):
        """Refresh our liveness key and every lock we currently hold.

        :returns: the smaller of ``lock_timeout`` and
                  ``membership_timeout``
        """
        beat_id = self._encode_beat_id(self._member_id)
        expiry_ms = max(0, int(self.membership_timeout * 1000.0))
        self._client.psetex(beat_id, time_ms=expiry_ms,
                            value=self.STILL_ALIVE)

        # Iterate over a copy, locks may get released while we heartbeat.
        for lock in self._acquired_locks.copy():
            try:
                lock.heartbeat()
            except tooz.ToozError:
                LOG.warning("Unable to heartbeat lock '%s'", lock,
                            exc_info=True)
        return min(self.lock_timeout, self.membership_timeout)

    @_handle_failures()
    def _stop(self):
        """Release held locks, drop our liveness key and reset the state."""
        while self._acquired_locks:
            lock = self._acquired_locks.pop()
            try:
                lock.release()
            except tooz.ToozError:
                LOG.warning("Unable to release lock '%s'", lock, exc_info=True)
        super(RedisDriver, self)._stop()
        if self._client is not None:
            # Make sure we no longer exist...
            beat_id = self._encode_beat_id(self._member_id)
            try:
                # NOTE(harlowja): this will delete nothing if the key doesn't
                # exist in the first place, which is fine/expected/desired...
                self._client.delete(beat_id)
            except (tooz.ToozError, exceptions.RedisError):
                # The raw client raises redis exceptions (they only get
                # translated into tooz ones by the decorator, outside of
                # this method body), so both kinds are caught here to keep
                # the deletion best-effort and to always run the cleanup
                # below.
                LOG.warning("Unable to delete heartbeat key '%s'", beat_id,
                            exc_info=True)
            self._client = None
        self._server_info = {}
        self._scripts.clear()
        self._started = False

    def _submit(self, cb, *args, **kwargs):
        """Submit ``cb`` to the executor.

        :raises: ``tooz.ToozError`` if the driver has not been started
        :returns: what the executor's ``submit`` returned
        """
        if not self._started:
            raise tooz.ToozError("Redis driver has not been started")
        return self._executor.submit(cb, *args, **kwargs)

    def _get_script(self, script_key):
        """Return the registered script stored under ``script_key``.

        :raises: ``tooz.ToozError`` if no such script is registered (the
                 scripts are registered when the driver is started)
        """
        try:
            return self._scripts[script_key]
        except KeyError:
            raise tooz.ToozError("Redis driver has not been started")

    def create_group(self, group_id):
        """Request the creation of a group.

        The returned result raises ``GroupAlreadyExist`` if the group key
        is already present.
        """
        script = self._get_script('create_group')

        def _create_group(script):
            encoded_group = self._encode_group_id(group_id)
            keys = [
                encoded_group,
                self._groups,
            ]
            args = [
                self._encode_group_id(group_id, apply_namespace=False),
            ]
            result = script(keys=keys, args=args)
            result = strutils.bool_from_string(result)
            if not result:
                raise coordination.GroupAlreadyExist(group_id)

        return RedisFutureResult(self._submit(_create_group, script))

    def update_capabilities(self, group_id, capabilities):
        """Request the replacement of our capabilities in a group.

        The returned result raises ``GroupNotCreated`` if the group does
        not exist and ``MemberNotJoined`` if we are not part of it.
        """
        script = self._get_script('update_capabilities')

        def _update_capabilities(script):
            keys = [
                self._encode_group_id(group_id),
            ]
            args = [
                self._encode_member_id(self._member_id),
                self._dumps(capabilities),
            ]
            result = int(script(keys=keys, args=args))
            if result == -1:
                raise coordination.GroupNotCreated(group_id)
            if result == -2:
                raise coordination.MemberNotJoined(group_id, self._member_id)

        return RedisFutureResult(self._submit(_update_capabilities, script))

    def leave_group(self, group_id):
        """Request our removal from a group.

        The returned result raises ``GroupNotCreated`` if the group does
        not exist and ``MemberNotJoined`` if we were not part of it.
        """
        encoded_group = self._encode_group_id(group_id)
        encoded_member_id = self._encode_member_id(self._member_id)

        def _leave_group(p):
            if not p.exists(encoded_group):
                raise coordination.GroupNotCreated(group_id)
            p.multi()
            p.hdel(encoded_group, encoded_member_id)
            c = p.execute()[0]
            if c == 0:
                raise coordination.MemberNotJoined(group_id, self._member_id)
            else:
                self._joined_groups.discard(group_id)

        return RedisFutureResult(self._submit(self._client.transaction,
                                              _leave_group, encoded_group,
                                              value_from_callable=True))

    def get_members(self, group_id):
        """Request the set of (binary) ids of the live members of a group.

        Members (other than ourselves) whose heartbeat key holds no value
        are removed from the group and left out of the result. The returned
        result raises ``GroupNotCreated`` if the group does not exist.
        """
        encoded_group = self._encode_group_id(group_id)
        # The ids read back from redis are binary, compare them with the
        # binary form of our own id (which may have been given as text).
        own_member_id = utils.to_binary(self._member_id,
                                        encoding=self._encoding)

        def _get_members(p):
            if not p.exists(encoded_group):
                raise coordination.GroupNotCreated(group_id)
            potential_members = set()
            for m in p.hkeys(encoded_group):
                m = self._decode_member_id(m)
                if m != self.GROUP_EXISTS:
                    potential_members.add(m)
            if not potential_members:
                return set()
            # Ok now we need to see which members have passed away...
            gone_members = set()
            member_values = p.mget(map(self._encode_beat_id,
                                       potential_members))
            for (potential_member, value) in zip(potential_members,
                                                 member_values):
                # Always preserve self (just incase we haven't heartbeated
                # while this call/s was being made...), this does *not* prevent
                # another client from removing this though...
                if potential_member == own_member_id:
                    continue
                if not value:
                    gone_members.add(potential_member)
            # Trash all the members that no longer are with us... RIP...
            if gone_members:
                p.multi()
                encoded_gone_members = list(self._encode_member_id(m)
                                            for m in gone_members)
                p.hdel(encoded_group, *encoded_gone_members)
                p.execute()
                return set(m for m in potential_members
                           if m not in gone_members)
            return potential_members

        return RedisFutureResult(self._submit(self._client.transaction,
                                              _get_members, encoded_group,
                                              value_from_callable=True))

    def get_member_capabilities(self, group_id, member_id):
        """Request the capabilities a member stored in a group.

        The returned result raises ``GroupNotCreated`` if the group does
        not exist and ``MemberNotJoined`` if the member is not part of it.
        """
        encoded_group = self._encode_group_id(group_id)
        encoded_member_id = self._encode_member_id(member_id)

        def _get_member_capabilities(p):
            if not p.exists(encoded_group):
                raise coordination.GroupNotCreated(group_id)
            capabilities = p.hget(encoded_group, encoded_member_id)
            if capabilities is None:
                raise coordination.MemberNotJoined(group_id, member_id)
            return self._loads(capabilities)

        return RedisFutureResult(self._submit(self._client.transaction,
                                              _get_member_capabilities,
                                              encoded_group,
                                              value_from_callable=True))

    def join_group(self, group_id, capabilities=b""):
        """Request our addition to a group with the given capabilities.

        The returned result raises ``GroupNotCreated`` if the group does
        not exist and ``MemberAlreadyExist`` if we are already part of it.
        """
        encoded_group = self._encode_group_id(group_id)
        encoded_member_id = self._encode_member_id(self._member_id)

        def _join_group(p):
            if not p.exists(encoded_group):
                raise coordination.GroupNotCreated(group_id)
            p.multi()
            p.hset(encoded_group, encoded_member_id,
                   self._dumps(capabilities))
            c = p.execute()[0]
            if c == 0:
                # Field already exists...
                raise coordination.MemberAlreadyExist(group_id,
                                                      self._member_id)
            else:
                self._joined_groups.add(group_id)

        return RedisFutureResult(self._submit(self._client.transaction,
                                              _join_group,
                                              encoded_group,
                                              value_from_callable=True))

    def delete_group(self, group_id):
        """Request the deletion of a group.

        The returned result raises ``GroupNotCreated`` if the group is
        unknown, ``GroupNotEmpty`` if it still has members and
        ``tooz.ToozError`` if the removal could not be completed.
        """
        script = self._get_script('delete_group')

        def _delete_group(script):
            keys = [
                self._encode_group_id(group_id),
                self._groups,
            ]
            args = [
                self._encode_group_id(group_id, apply_namespace=False),
            ]
            # The negative values are the failure codes returned by the
            # 'delete_group' script.
            result = int(script(keys=keys, args=args))
            if result in (-1, -2):
                raise coordination.GroupNotCreated(group_id)
            if result == -3:
                raise coordination.GroupNotEmpty(group_id)
            if result == -4:
                raise tooz.ToozError("Unable to remove '%s' key"
                                     " from set located at '%s'"
                                     % (args[0], keys[-1]))
            if result != 1:
                raise tooz.ToozError("Internal error, unable"
                                     " to complete group '%s' removal"
                                     % (group_id))

        return RedisFutureResult(self._submit(_delete_group, script))

    def _destroy_group(self, group_id):
        """Should only be used in tests..."""
        self._client.delete(self._encode_group_id(group_id))

    def get_groups(self):
        """Request the list of the (binary) ids of the known groups."""

        def _get_groups():
            results = []
            for g in self._client.smembers(self._groups):
                results.append(self._decode_group_id(g))
            return results

        return RedisFutureResult(self._submit(_get_groups))

    def _get_leader_lock(self, group_id):
        """Return the lock used to elect the leader of the group."""
        name = self._encode_group_leader(group_id)
        return self.get_lock(name)

    def run_elect_coordinator(self):
        """Try to become leader of every group with election hooks.

        The hooks of a group are run (with a ``LeaderElected`` event) when
        its leader lock could be acquired without blocking.
        """
        for group_id, hooks in self._hooks_elected_leader.items():
            leader_lock = self._get_leader_lock(group_id)
            if leader_lock.acquire(blocking=False):
                # We got the lock
                hooks.run(coordination.LeaderElected(group_id,
                                                     self._member_id))

    def run_watchers(self, timeout=None):
        """Run the parent class watchers, then the leader elections.

        :returns: what the parent class ``run_watchers`` returned
        """
        result = super(RedisDriver, self).run_watchers(timeout=timeout)
        self.run_elect_coordinator()
        return result
|
|
|
|
|
|
# Factory used by the driver methods to wrap the futures they submit; it is
# ``coordination.CoordinatorResult`` wrapped in a ``functools.partial`` that
# pre-binds no argument.
RedisFutureResult = functools.partial(coordination.CoordinatorResult)
|