Use a lua script(s) instead of transactions
Lua scripts are guaranteed to be atomic so we can avoid transactions (and the resulting potential retries that can be triggered); in general they are faster than client side retries and are seemingly preferred over transactions (transactions also don't rollback so they aren't really offering to much anyway). This replaces some of the calls that would benefit from the atomic nature of scripts with scripts (vs there prior usage of transactions). Useful links: - http://redis.io/commands/eval - http://redis.io/topics/transactions#redis-scripting-and-transactions - http://redis.io/topics/transactions#why-redis-does-not-support-roll-backs Change-Id: Icba07d0c73a283c1b3575c388883bb9a9621322c
This commit is contained in:
parent
473e90ff6a
commit
3216c9016e
|
@ -19,6 +19,7 @@ from __future__ import absolute_import
|
|||
import contextlib
|
||||
from distutils import version
|
||||
import logging
|
||||
import string
|
||||
|
||||
from concurrent import futures
|
||||
from oslo_utils import strutils
|
||||
|
@ -237,6 +238,64 @@ class RedisDriver(coordination.CoordinationDriver):
|
|||
#: String used to keep a key/member alive (until it next expires).
|
||||
STILL_ALIVE = b"Not dead!"
|
||||
|
||||
SCRIPTS = {
|
||||
'create_group': """
|
||||
-- Extract *all* the variables (so we can easily know what they are)...
|
||||
local namespaced_group_key = KEYS[1]
|
||||
local all_groups_key = KEYS[2]
|
||||
local no_namespaced_group_key = ARGV[1]
|
||||
if redis.call("exists", namespaced_group_key) == 1 then
|
||||
return 0
|
||||
end
|
||||
redis.call("sadd", all_groups_key, no_namespaced_group_key)
|
||||
redis.call("hset", namespaced_group_key,
|
||||
"${group_existence_key}", "${group_existence_value}")
|
||||
return 1
|
||||
""",
|
||||
'delete_group': """
|
||||
-- Extract *all* the variables (so we can easily know what they are)...
|
||||
local namespaced_group_key = KEYS[1]
|
||||
local all_groups_key = KEYS[2]
|
||||
local no_namespaced_group_key = ARGV[1]
|
||||
if redis.call("exists", namespaced_group_key) == 0 then
|
||||
return -1
|
||||
end
|
||||
if redis.call("sismember", all_groups_key, no_namespaced_group_key) == 0 then
|
||||
return -2
|
||||
end
|
||||
if redis.call("hlen", namespaced_group_key) > 1 then
|
||||
return -3
|
||||
end
|
||||
-- First remove from the set (then delete the group); if the set removal
|
||||
-- fails, at least the group will still exist (and can be fixed manually)...
|
||||
if redis.call("srem", all_groups_key, no_namespaced_group_key) == 0 then
|
||||
return -4
|
||||
end
|
||||
redis.call("del", namespaced_group_key)
|
||||
return 1
|
||||
""",
|
||||
'update_capabilities': """
|
||||
-- Extract *all* the variables (so we can easily know what they are)...
|
||||
local group_key = KEYS[1]
|
||||
local member_id = ARGV[1]
|
||||
local caps = ARGV[2]
|
||||
if redis.call("exists", group_key) == 0 then
|
||||
return -1
|
||||
end
|
||||
if redis.call("hexists", group_key, member_id) == 0 then
|
||||
return -2
|
||||
end
|
||||
redis.call("hset", group_key, member_id, caps)
|
||||
return 1
|
||||
""",
|
||||
}
|
||||
"""`Lua`_ **template** scripts that will be used by various methods (they
|
||||
are turned into real scripts and loaded on call into the :func:`.start`
|
||||
method).
|
||||
|
||||
.. _Lua: http://www.lua.org
|
||||
"""
|
||||
|
||||
def __init__(self, member_id, parsed_url, options):
|
||||
super(RedisDriver, self).__init__()
|
||||
options = utils.collapse(options, exclude=self.CLIENT_LIST_ARGS)
|
||||
|
@ -262,6 +321,7 @@ class RedisDriver(coordination.CoordinationDriver):
|
|||
self._executor = None
|
||||
self._started = False
|
||||
self._server_info = {}
|
||||
self._scripts = {}
|
||||
|
||||
def _check_fetch_redis_version(self, geq_version, not_existent=True):
|
||||
if isinstance(geq_version, six.string_types):
|
||||
|
@ -376,6 +436,23 @@ class RedisDriver(coordination.CoordinationDriver):
|
|||
" being used which is not new"
|
||||
" enough" % (self.MIN_VERSION,
|
||||
redis_version))
|
||||
tpl_params = {
|
||||
'group_existence_value': self.GROUP_EXISTS_VALUE,
|
||||
'group_existence_key': self.GROUP_EXISTS,
|
||||
}
|
||||
# For py3.x ensure these are unicode since the string template
|
||||
# replacement will expect unicode (and we don't want b'' as a
|
||||
# prefix which will happen in py3.x if this is not done).
|
||||
for (k, v) in six.iteritems(tpl_params.copy()):
|
||||
if isinstance(v, six.binary_type):
|
||||
v = v.decode('ascii')
|
||||
tpl_params[k] = v
|
||||
prepared_scripts = {}
|
||||
for name, raw_script_tpl in six.iteritems(self.SCRIPTS):
|
||||
script_tpl = string.Template(raw_script_tpl)
|
||||
script = script_tpl.substitute(**tpl_params)
|
||||
prepared_scripts[name] = self._client.register_script(script)
|
||||
self._scripts = prepared_scripts
|
||||
self.heartbeat()
|
||||
self._started = True
|
||||
|
||||
|
@ -447,6 +524,7 @@ class RedisDriver(coordination.CoordinationDriver):
|
|||
exc_info=True)
|
||||
self._client = None
|
||||
self._server_info = {}
|
||||
self._scripts.clear()
|
||||
self._started = False
|
||||
|
||||
def _submit(self, cb, *args, **kwargs):
|
||||
|
@ -458,48 +536,49 @@ class RedisDriver(coordination.CoordinationDriver):
|
|||
raise coordination.ToozError("Redis driver asynchronous executor"
|
||||
" has been shutdown")
|
||||
|
||||
def _get_script(self, script_key):
|
||||
try:
|
||||
return self._scripts[script_key]
|
||||
except KeyError:
|
||||
raise coordination.ToozError("Redis driver has not been started")
|
||||
|
||||
def create_group(self, group_id):
|
||||
encoded_group = self._encode_group_id(group_id)
|
||||
script = self._get_script('create_group')
|
||||
|
||||
def _create_group(p):
|
||||
if p.exists(encoded_group):
|
||||
def _create_group(script):
|
||||
encoded_group = self._encode_group_id(group_id)
|
||||
keys = [
|
||||
encoded_group,
|
||||
self._groups,
|
||||
]
|
||||
args = [
|
||||
self._encode_group_id(group_id, apply_namespace=False),
|
||||
]
|
||||
result = script(keys=keys, args=args)
|
||||
result = strutils.bool_from_string(result)
|
||||
if not result:
|
||||
raise coordination.GroupAlreadyExist(group_id)
|
||||
# Start the transaction, all following should no longer be
|
||||
# doing any reads (and should typically just be mutations from
|
||||
# here on out). They need to be idempotent since transactions
|
||||
# can fail half-way through if they key is of the wrong type
|
||||
# or something else happens ... (redis doesn't have transactions
|
||||
# that support rollbacks).
|
||||
p.multi()
|
||||
p.sadd(self._groups,
|
||||
self._encode_group_id(group_id, apply_namespace=False))
|
||||
# Add our special key to avoid redis from deleting the dictionary
|
||||
# when it becomes empty (which is not what we currently want)...
|
||||
p.hset(encoded_group, self.GROUP_EXISTS, self.GROUP_EXISTS_VALUE)
|
||||
|
||||
return RedisFutureResult(self._submit(self._client.transaction,
|
||||
_create_group, encoded_group,
|
||||
self._groups,
|
||||
value_from_callable=True))
|
||||
return RedisFutureResult(self._submit(_create_group, script))
|
||||
|
||||
def update_capabilities(self, group_id, capabilities):
|
||||
encoded_group = self._encode_group_id(group_id)
|
||||
encoded_member_id = self._encode_member_id(self._member_id)
|
||||
script = self._get_script('update_capabilities')
|
||||
|
||||
def _update_capabilities(p):
|
||||
if not p.exists(encoded_group):
|
||||
def _update_capabilities(script):
|
||||
keys = [
|
||||
self._encode_group_id(group_id),
|
||||
]
|
||||
args = [
|
||||
self._encode_member_id(self._member_id),
|
||||
self._dumps(capabilities),
|
||||
]
|
||||
result = int(script(keys=keys, args=args))
|
||||
if result == -1:
|
||||
raise coordination.GroupNotCreated(group_id)
|
||||
if not p.hexists(encoded_group, encoded_member_id):
|
||||
if result == -2:
|
||||
raise coordination.MemberNotJoined(group_id, self._member_id)
|
||||
else:
|
||||
p.multi()
|
||||
p.hset(encoded_group, encoded_member_id,
|
||||
self._dumps(capabilities))
|
||||
|
||||
return RedisFutureResult(self._submit(self._client.transaction,
|
||||
_update_capabilities,
|
||||
encoded_group,
|
||||
value_from_callable=True))
|
||||
return RedisFutureResult(self._submit(_update_capabilities, script))
|
||||
|
||||
def leave_group(self, group_id):
|
||||
encoded_group = self._encode_group_id(group_id)
|
||||
|
@ -603,26 +682,31 @@ class RedisDriver(coordination.CoordinationDriver):
|
|||
value_from_callable=True))
|
||||
|
||||
def delete_group(self, group_id):
|
||||
encoded_group = self._encode_group_id(group_id)
|
||||
script = self._get_script('delete_group')
|
||||
|
||||
def _delete_group(p):
|
||||
# An empty group still have the special key GROUP_EXISTS set, so
|
||||
# its len is 1
|
||||
if p.hlen(encoded_group) > 1:
|
||||
raise coordination.GroupNotEmpty(group_id)
|
||||
p.multi()
|
||||
p.delete(encoded_group)
|
||||
p.srem(self._groups,
|
||||
self._encode_group_id(group_id,
|
||||
apply_namespace=False))
|
||||
was_group_deleted, was_group_member_removed = p.execute()
|
||||
if not any([was_group_deleted, was_group_member_removed]):
|
||||
def _delete_group(script):
|
||||
keys = [
|
||||
self._encode_group_id(group_id),
|
||||
self._groups,
|
||||
]
|
||||
args = [
|
||||
self._encode_group_id(group_id, apply_namespace=False),
|
||||
]
|
||||
result = int(script(keys=keys, args=args))
|
||||
if result in (-1, -2):
|
||||
raise coordination.GroupNotCreated(group_id)
|
||||
if result == -3:
|
||||
raise coordination.GroupNotEmpty(group_id)
|
||||
if result == -4:
|
||||
raise coordination.ToozError("Unable to remove '%s' key"
|
||||
" from set located at '%s'"
|
||||
% (args[0], keys[-1]))
|
||||
if result != 1:
|
||||
raise coordination.ToozError("Internal error, unable"
|
||||
" to complete group '%s' removal"
|
||||
% (group_id))
|
||||
|
||||
return RedisFutureResult(self._submit(self._client.transaction,
|
||||
_delete_group,
|
||||
encoded_group,
|
||||
value_from_callable=True))
|
||||
return RedisFutureResult(self._submit(_delete_group, script))
|
||||
|
||||
def _destroy_group(self, group_id):
|
||||
"""Should only be used in tests..."""
|
||||
|
|
Loading…
Reference in New Issue