diff --git a/tooz/drivers/redis.py b/tooz/drivers/redis.py index c4a02f35..e62380b9 100644 --- a/tooz/drivers/redis.py +++ b/tooz/drivers/redis.py @@ -19,6 +19,7 @@ from __future__ import absolute_import import contextlib from distutils import version import logging +import string from concurrent import futures from oslo_utils import strutils @@ -237,6 +238,64 @@ class RedisDriver(coordination.CoordinationDriver): #: String used to keep a key/member alive (until it next expires). STILL_ALIVE = b"Not dead!" + SCRIPTS = { + 'create_group': """ +-- Extract *all* the variables (so we can easily know what they are)... +local namespaced_group_key = KEYS[1] +local all_groups_key = KEYS[2] +local no_namespaced_group_key = ARGV[1] +if redis.call("exists", namespaced_group_key) == 1 then + return 0 +end +redis.call("sadd", all_groups_key, no_namespaced_group_key) +redis.call("hset", namespaced_group_key, + "${group_existence_key}", "${group_existence_value}") +return 1 +""", + 'delete_group': """ +-- Extract *all* the variables (so we can easily know what they are)... +local namespaced_group_key = KEYS[1] +local all_groups_key = KEYS[2] +local no_namespaced_group_key = ARGV[1] +if redis.call("exists", namespaced_group_key) == 0 then + return -1 +end +if redis.call("sismember", all_groups_key, no_namespaced_group_key) == 0 then + return -2 +end +if redis.call("hlen", namespaced_group_key) > 1 then + return -3 +end +-- First remove from the set (then delete the group); if the set removal +-- fails, at least the group will still exist (and can be fixed manually)... +if redis.call("srem", all_groups_key, no_namespaced_group_key) == 0 then + return -4 +end +redis.call("del", namespaced_group_key) +return 1 +""", + 'update_capabilities': """ +-- Extract *all* the variables (so we can easily know what they are)... +local group_key = KEYS[1] +local member_id = ARGV[1] +local caps = ARGV[2] +if redis.call("exists", group_key) == 0 then + return -1 +end +if redis.call("hexists", group_key, member_id) == 0 then + return -2 +end +redis.call("hset", group_key, member_id, caps) +return 1 +""", + } + """`Lua`_ **template** scripts that will be used by various methods (they + are turned into real scripts and loaded on call into the :func:`.start` + method). + + .. _Lua: http://www.lua.org + """ + def __init__(self, member_id, parsed_url, options): super(RedisDriver, self).__init__() options = utils.collapse(options, exclude=self.CLIENT_LIST_ARGS) @@ -262,6 +321,7 @@ class RedisDriver(coordination.CoordinationDriver): self._executor = None self._started = False self._server_info = {} + self._scripts = {} def _check_fetch_redis_version(self, geq_version, not_existent=True): if isinstance(geq_version, six.string_types): @@ -376,6 +436,23 @@ class RedisDriver(coordination.CoordinationDriver): " being used which is not new" " enough" % (self.MIN_VERSION, redis_version)) + tpl_params = { + 'group_existence_value': self.GROUP_EXISTS_VALUE, + 'group_existence_key': self.GROUP_EXISTS, + } + # For py3.x ensure these are unicode since the string template + # replacement will expect unicode (and we don't want b'' as a + # prefix which will happen in py3.x if this is not done). + for (k, v) in six.iteritems(tpl_params.copy()): + if isinstance(v, six.binary_type): + v = v.decode('ascii') + tpl_params[k] = v + prepared_scripts = {} + for name, raw_script_tpl in six.iteritems(self.SCRIPTS): + script_tpl = string.Template(raw_script_tpl) + script = script_tpl.substitute(**tpl_params) + prepared_scripts[name] = self._client.register_script(script) + self._scripts = prepared_scripts self.heartbeat() self._started = True @@ -447,6 +524,7 @@ class RedisDriver(coordination.CoordinationDriver): exc_info=True) self._client = None self._server_info = {} + self._scripts.clear() self._started = False def _submit(self, cb, *args, **kwargs): @@ -458,48 +536,49 @@ class RedisDriver(coordination.CoordinationDriver): raise coordination.ToozError("Redis driver asynchronous executor" " has been shutdown") + def _get_script(self, script_key): + try: + return self._scripts[script_key] + except KeyError: + raise coordination.ToozError("Redis driver has not been started") + def create_group(self, group_id): - encoded_group = self._encode_group_id(group_id) + script = self._get_script('create_group') - def _create_group(p): - if p.exists(encoded_group): + def _create_group(script): + encoded_group = self._encode_group_id(group_id) + keys = [ + encoded_group, + self._groups, + ] + args = [ + self._encode_group_id(group_id, apply_namespace=False), + ] + result = script(keys=keys, args=args) + result = strutils.bool_from_string(result) + if not result: raise coordination.GroupAlreadyExist(group_id) - # Start the transaction, all following should no longer be - # doing any reads (and should typically just be mutations from - # here on out). They need to be idempotent since transactions - # can fail half-way through if they key is of the wrong type - # or something else happens ... (redis doesn't have transactions - # that support rollbacks). - p.multi() - p.sadd(self._groups, - self._encode_group_id(group_id, apply_namespace=False)) - # Add our special key to avoid redis from deleting the dictionary - # when it becomes empty (which is not what we currently want)... - p.hset(encoded_group, self.GROUP_EXISTS, self.GROUP_EXISTS_VALUE) - return RedisFutureResult(self._submit(self._client.transaction, - _create_group, encoded_group, - self._groups, - value_from_callable=True)) + return RedisFutureResult(self._submit(_create_group, script)) def update_capabilities(self, group_id, capabilities): - encoded_group = self._encode_group_id(group_id) - encoded_member_id = self._encode_member_id(self._member_id) + script = self._get_script('update_capabilities') - def _update_capabilities(p): - if not p.exists(encoded_group): + def _update_capabilities(script): + keys = [ + self._encode_group_id(group_id), + ] + args = [ + self._encode_member_id(self._member_id), + self._dumps(capabilities), + ] + result = int(script(keys=keys, args=args)) + if result == -1: raise coordination.GroupNotCreated(group_id) - if not p.hexists(encoded_group, encoded_member_id): + if result == -2: raise coordination.MemberNotJoined(group_id, self._member_id) - else: - p.multi() - p.hset(encoded_group, encoded_member_id, - self._dumps(capabilities)) - return RedisFutureResult(self._submit(self._client.transaction, - _update_capabilities, - encoded_group, - value_from_callable=True)) + return RedisFutureResult(self._submit(_update_capabilities, script)) def leave_group(self, group_id): encoded_group = self._encode_group_id(group_id) @@ -603,26 +682,31 @@ class RedisDriver(coordination.CoordinationDriver): value_from_callable=True)) def delete_group(self, group_id): - encoded_group = self._encode_group_id(group_id) + script = self._get_script('delete_group') - def _delete_group(p): - # An empty group still have the special key GROUP_EXISTS set, so - # its len is 1 - if p.hlen(encoded_group) > 1: - raise coordination.GroupNotEmpty(group_id) - p.multi() - p.delete(encoded_group) - p.srem(self._groups, - self._encode_group_id(group_id, - apply_namespace=False)) - was_group_deleted, was_group_member_removed = p.execute() - if not any([was_group_deleted, was_group_member_removed]): + def _delete_group(script): + keys = [ + self._encode_group_id(group_id), + self._groups, + ] + args = [ + self._encode_group_id(group_id, apply_namespace=False), + ] + result = int(script(keys=keys, args=args)) + if result in (-1, -2): raise coordination.GroupNotCreated(group_id) + if result == -3: + raise coordination.GroupNotEmpty(group_id) + if result == -4: + raise coordination.ToozError("Unable to remove '%s' key" + " from set located at '%s'" + % (args[0], keys[-1])) + if result != 1: + raise coordination.ToozError("Internal error, unable" + " to complete group '%s' removal" + % (group_id)) - return RedisFutureResult(self._submit(self._client.transaction, - _delete_group, - encoded_group, - value_from_callable=True)) + return RedisFutureResult(self._submit(_delete_group, script)) def _destroy_group(self, group_id): """Should only be used in tests..."""