From 3216c9016e49d463d176cc3da1a9b5e8daf4c808 Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Wed, 6 May 2015 16:57:08 -0700 Subject: [PATCH] Use a lua script(s) instead of transactions Lua scripts are guaranteed to be atomic so we can avoid transactions (and the resulting potential retries that can be triggered); in general they are faster than client side retries and are seemingly preferred over transactions (transactions also don't rollback so they aren't really offering to much anyway). This replaces some of the calls that would benefit from the atomic nature of scripts with scripts (vs there prior usage of transactions). Useful links: - http://redis.io/commands/eval - http://redis.io/topics/transactions#redis-scripting-and-transactions - http://redis.io/topics/transactions#why-redis-does-not-support-roll-backs Change-Id: Icba07d0c73a283c1b3575c388883bb9a9621322c --- tooz/drivers/redis.py | 182 ++++++++++++++++++++++++++++++------------ 1 file changed, 133 insertions(+), 49 deletions(-) diff --git a/tooz/drivers/redis.py b/tooz/drivers/redis.py index c4a02f35..e62380b9 100644 --- a/tooz/drivers/redis.py +++ b/tooz/drivers/redis.py @@ -19,6 +19,7 @@ from __future__ import absolute_import import contextlib from distutils import version import logging +import string from concurrent import futures from oslo_utils import strutils @@ -237,6 +238,64 @@ class RedisDriver(coordination.CoordinationDriver): #: String used to keep a key/member alive (until it next expires). STILL_ALIVE = b"Not dead!" + SCRIPTS = { + 'create_group': """ +-- Extract *all* the variables (so we can easily know what they are)... +local namespaced_group_key = KEYS[1] +local all_groups_key = KEYS[2] +local no_namespaced_group_key = ARGV[1] +if redis.call("exists", namespaced_group_key) == 1 then + return 0 +end +redis.call("sadd", all_groups_key, no_namespaced_group_key) +redis.call("hset", namespaced_group_key, + "${group_existence_key}", "${group_existence_value}") +return 1 +""", + 'delete_group': """ +-- Extract *all* the variables (so we can easily know what they are)... +local namespaced_group_key = KEYS[1] +local all_groups_key = KEYS[2] +local no_namespaced_group_key = ARGV[1] +if redis.call("exists", namespaced_group_key) == 0 then + return -1 +end +if redis.call("sismember", all_groups_key, no_namespaced_group_key) == 0 then + return -2 +end +if redis.call("hlen", namespaced_group_key) > 1 then + return -3 +end +-- First remove from the set (then delete the group); if the set removal +-- fails, at least the group will still exist (and can be fixed manually)... +if redis.call("srem", all_groups_key, no_namespaced_group_key) == 0 then + return -4 +end +redis.call("del", namespaced_group_key) +return 1 +""", + 'update_capabilities': """ +-- Extract *all* the variables (so we can easily know what they are)... +local group_key = KEYS[1] +local member_id = ARGV[1] +local caps = ARGV[2] +if redis.call("exists", group_key) == 0 then + return -1 +end +if redis.call("hexists", group_key, member_id) == 0 then + return -2 +end +redis.call("hset", group_key, member_id, caps) +return 1 +""", + } + """`Lua`_ **template** scripts that will be used by various methods (they + are turned into real scripts and loaded on call into the :func:`.start` + method). + + .. _Lua: http://www.lua.org + """ + def __init__(self, member_id, parsed_url, options): super(RedisDriver, self).__init__() options = utils.collapse(options, exclude=self.CLIENT_LIST_ARGS) @@ -262,6 +321,7 @@ class RedisDriver(coordination.CoordinationDriver): self._executor = None self._started = False self._server_info = {} + self._scripts = {} def _check_fetch_redis_version(self, geq_version, not_existent=True): if isinstance(geq_version, six.string_types): @@ -376,6 +436,23 @@ class RedisDriver(coordination.CoordinationDriver): " being used which is not new" " enough" % (self.MIN_VERSION, redis_version)) + tpl_params = { + 'group_existence_value': self.GROUP_EXISTS_VALUE, + 'group_existence_key': self.GROUP_EXISTS, + } + # For py3.x ensure these are unicode since the string template + # replacement will expect unicode (and we don't want b'' as a + # prefix which will happen in py3.x if this is not done). + for (k, v) in six.iteritems(tpl_params.copy()): + if isinstance(v, six.binary_type): + v = v.decode('ascii') + tpl_params[k] = v + prepared_scripts = {} + for name, raw_script_tpl in six.iteritems(self.SCRIPTS): + script_tpl = string.Template(raw_script_tpl) + script = script_tpl.substitute(**tpl_params) + prepared_scripts[name] = self._client.register_script(script) + self._scripts = prepared_scripts self.heartbeat() self._started = True @@ -447,6 +524,7 @@ class RedisDriver(coordination.CoordinationDriver): exc_info=True) self._client = None self._server_info = {} + self._scripts.clear() self._started = False def _submit(self, cb, *args, **kwargs): @@ -458,48 +536,49 @@ class RedisDriver(coordination.CoordinationDriver): raise coordination.ToozError("Redis driver asynchronous executor" " has been shutdown") + def _get_script(self, script_key): + try: + return self._scripts[script_key] + except KeyError: + raise coordination.ToozError("Redis driver has not been started") + def create_group(self, group_id): - encoded_group = self._encode_group_id(group_id) + script = self._get_script('create_group') - def _create_group(p): - if p.exists(encoded_group): + def _create_group(script): + encoded_group = self._encode_group_id(group_id) + keys = [ + encoded_group, + self._groups, + ] + args = [ + self._encode_group_id(group_id, apply_namespace=False), + ] + result = script(keys=keys, args=args) + result = strutils.bool_from_string(result) + if not result: raise coordination.GroupAlreadyExist(group_id) - # Start the transaction, all following should no longer be - # doing any reads (and should typically just be mutations from - # here on out). They need to be idempotent since transactions - # can fail half-way through if they key is of the wrong type - # or something else happens ... (redis doesn't have transactions - # that support rollbacks). - p.multi() - p.sadd(self._groups, - self._encode_group_id(group_id, apply_namespace=False)) - # Add our special key to avoid redis from deleting the dictionary - # when it becomes empty (which is not what we currently want)... - p.hset(encoded_group, self.GROUP_EXISTS, self.GROUP_EXISTS_VALUE) - return RedisFutureResult(self._submit(self._client.transaction, - _create_group, encoded_group, - self._groups, - value_from_callable=True)) + return RedisFutureResult(self._submit(_create_group, script)) def update_capabilities(self, group_id, capabilities): - encoded_group = self._encode_group_id(group_id) - encoded_member_id = self._encode_member_id(self._member_id) + script = self._get_script('update_capabilities') - def _update_capabilities(p): - if not p.exists(encoded_group): + def _update_capabilities(script): + keys = [ + self._encode_group_id(group_id), + ] + args = [ + self._encode_member_id(self._member_id), + self._dumps(capabilities), + ] + result = int(script(keys=keys, args=args)) + if result == -1: raise coordination.GroupNotCreated(group_id) - if not p.hexists(encoded_group, encoded_member_id): + if result == -2: raise coordination.MemberNotJoined(group_id, self._member_id) - else: - p.multi() - p.hset(encoded_group, encoded_member_id, - self._dumps(capabilities)) - return RedisFutureResult(self._submit(self._client.transaction, - _update_capabilities, - encoded_group, - value_from_callable=True)) + return RedisFutureResult(self._submit(_update_capabilities, script)) def leave_group(self, group_id): encoded_group = self._encode_group_id(group_id) @@ -603,26 +682,31 @@ class RedisDriver(coordination.CoordinationDriver): value_from_callable=True)) def delete_group(self, group_id): - encoded_group = self._encode_group_id(group_id) + script = self._get_script('delete_group') - def _delete_group(p): - # An empty group still have the special key GROUP_EXISTS set, so - # its len is 1 - if p.hlen(encoded_group) > 1: - raise coordination.GroupNotEmpty(group_id) - p.multi() - p.delete(encoded_group) - p.srem(self._groups, - self._encode_group_id(group_id, - apply_namespace=False)) - was_group_deleted, was_group_member_removed = p.execute() - if not any([was_group_deleted, was_group_member_removed]): + def _delete_group(script): + keys = [ + self._encode_group_id(group_id), + self._groups, + ] + args = [ + self._encode_group_id(group_id, apply_namespace=False), + ] + result = int(script(keys=keys, args=args)) + if result in (-1, -2): raise coordination.GroupNotCreated(group_id) + if result == -3: + raise coordination.GroupNotEmpty(group_id) + if result == -4: + raise coordination.ToozError("Unable to remove '%s' key" + " from set located at '%s'" + % (args[0], keys[-1])) + if result != 1: + raise coordination.ToozError("Internal error, unable" + " to complete group '%s' removal" + % (group_id)) - return RedisFutureResult(self._submit(self._client.transaction, - _delete_group, - encoded_group, - value_from_callable=True)) + return RedisFutureResult(self._submit(_delete_group, script)) def _destroy_group(self, group_id): """Should only be used in tests..."""