Merge "Use a lua script(s) instead of transactions"
This commit is contained in:
commit
f674ed55ce
|
@ -19,6 +19,7 @@ from __future__ import absolute_import
|
|||
import contextlib
|
||||
from distutils import version
|
||||
import logging
|
||||
import string
|
||||
|
||||
from concurrent import futures
|
||||
from oslo_utils import strutils
|
||||
|
@ -231,6 +232,64 @@ class RedisDriver(coordination._RunWatchersMixin,
|
|||
#: String used to keep a key/member alive (until it next expires).
|
||||
STILL_ALIVE = b"Not dead!"
|
||||
|
||||
SCRIPTS = {
|
||||
'create_group': """
|
||||
-- Extract *all* the variables (so we can easily know what they are)...
|
||||
local namespaced_group_key = KEYS[1]
|
||||
local all_groups_key = KEYS[2]
|
||||
local no_namespaced_group_key = ARGV[1]
|
||||
if redis.call("exists", namespaced_group_key) == 1 then
|
||||
return 0
|
||||
end
|
||||
redis.call("sadd", all_groups_key, no_namespaced_group_key)
|
||||
redis.call("hset", namespaced_group_key,
|
||||
"${group_existence_key}", "${group_existence_value}")
|
||||
return 1
|
||||
""",
|
||||
'delete_group': """
|
||||
-- Extract *all* the variables (so we can easily know what they are)...
|
||||
local namespaced_group_key = KEYS[1]
|
||||
local all_groups_key = KEYS[2]
|
||||
local no_namespaced_group_key = ARGV[1]
|
||||
if redis.call("exists", namespaced_group_key) == 0 then
|
||||
return -1
|
||||
end
|
||||
if redis.call("sismember", all_groups_key, no_namespaced_group_key) == 0 then
|
||||
return -2
|
||||
end
|
||||
if redis.call("hlen", namespaced_group_key) > 1 then
|
||||
return -3
|
||||
end
|
||||
-- First remove from the set (then delete the group); if the set removal
|
||||
-- fails, at least the group will still exist (and can be fixed manually)...
|
||||
if redis.call("srem", all_groups_key, no_namespaced_group_key) == 0 then
|
||||
return -4
|
||||
end
|
||||
redis.call("del", namespaced_group_key)
|
||||
return 1
|
||||
""",
|
||||
'update_capabilities': """
|
||||
-- Extract *all* the variables (so we can easily know what they are)...
|
||||
local group_key = KEYS[1]
|
||||
local member_id = ARGV[1]
|
||||
local caps = ARGV[2]
|
||||
if redis.call("exists", group_key) == 0 then
|
||||
return -1
|
||||
end
|
||||
if redis.call("hexists", group_key, member_id) == 0 then
|
||||
return -2
|
||||
end
|
||||
redis.call("hset", group_key, member_id, caps)
|
||||
return 1
|
||||
""",
|
||||
}
|
||||
"""`Lua`_ **template** scripts that will be used by various methods (they
|
||||
are turned into real scripts and loaded on call into the :func:`.start`
|
||||
method).
|
||||
|
||||
.. _Lua: http://www.lua.org
|
||||
"""
|
||||
|
||||
def __init__(self, member_id, parsed_url, options):
|
||||
super(RedisDriver, self).__init__()
|
||||
options = utils.collapse(options, exclude=self.CLIENT_LIST_ARGS)
|
||||
|
@ -255,6 +314,7 @@ class RedisDriver(coordination._RunWatchersMixin,
|
|||
self._executor = None
|
||||
self._started = False
|
||||
self._server_info = {}
|
||||
self._scripts = {}
|
||||
|
||||
def _check_fetch_redis_version(self, geq_version, not_existent=True):
|
||||
if isinstance(geq_version, six.string_types):
|
||||
|
@ -369,6 +429,23 @@ class RedisDriver(coordination._RunWatchersMixin,
|
|||
" being used which is not new"
|
||||
" enough" % (self.MIN_VERSION,
|
||||
redis_version))
|
||||
tpl_params = {
|
||||
'group_existence_value': self.GROUP_EXISTS_VALUE,
|
||||
'group_existence_key': self.GROUP_EXISTS,
|
||||
}
|
||||
# For py3.x ensure these are unicode since the string template
|
||||
# replacement will expect unicode (and we don't want b'' as a
|
||||
# prefix which will happen in py3.x if this is not done).
|
||||
for (k, v) in six.iteritems(tpl_params.copy()):
|
||||
if isinstance(v, six.binary_type):
|
||||
v = v.decode('ascii')
|
||||
tpl_params[k] = v
|
||||
prepared_scripts = {}
|
||||
for name, raw_script_tpl in six.iteritems(self.SCRIPTS):
|
||||
script_tpl = string.Template(raw_script_tpl)
|
||||
script = script_tpl.substitute(**tpl_params)
|
||||
prepared_scripts[name] = self._client.register_script(script)
|
||||
self._scripts = prepared_scripts
|
||||
self.heartbeat()
|
||||
self._started = True
|
||||
|
||||
|
@ -444,6 +521,7 @@ class RedisDriver(coordination._RunWatchersMixin,
|
|||
exc_info=True)
|
||||
self._client = None
|
||||
self._server_info = {}
|
||||
self._scripts.clear()
|
||||
self._started = False
|
||||
|
||||
def _submit(self, cb, *args, **kwargs):
|
||||
|
@ -455,48 +533,49 @@ class RedisDriver(coordination._RunWatchersMixin,
|
|||
raise coordination.ToozError("Redis driver asynchronous executor"
|
||||
" has been shutdown")
|
||||
|
||||
def _get_script(self, script_key):
|
||||
try:
|
||||
return self._scripts[script_key]
|
||||
except KeyError:
|
||||
raise coordination.ToozError("Redis driver has not been started")
|
||||
|
||||
def create_group(self, group_id):
|
||||
encoded_group = self._encode_group_id(group_id)
|
||||
script = self._get_script('create_group')
|
||||
|
||||
def _create_group(p):
|
||||
if p.exists(encoded_group):
|
||||
def _create_group(script):
|
||||
encoded_group = self._encode_group_id(group_id)
|
||||
keys = [
|
||||
encoded_group,
|
||||
self._groups,
|
||||
]
|
||||
args = [
|
||||
self._encode_group_id(group_id, apply_namespace=False),
|
||||
]
|
||||
result = script(keys=keys, args=args)
|
||||
result = strutils.bool_from_string(result)
|
||||
if not result:
|
||||
raise coordination.GroupAlreadyExist(group_id)
|
||||
# Start the transaction, all following should no longer be
|
||||
# doing any reads (and should typically just be mutations from
|
||||
# here on out). They need to be idempotent since transactions
|
||||
# can fail half-way through if they key is of the wrong type
|
||||
# or something else happens ... (redis doesn't have transactions
|
||||
# that support rollbacks).
|
||||
p.multi()
|
||||
p.sadd(self._groups,
|
||||
self._encode_group_id(group_id, apply_namespace=False))
|
||||
# Add our special key to avoid redis from deleting the dictionary
|
||||
# when it becomes empty (which is not what we currently want)...
|
||||
p.hset(encoded_group, self.GROUP_EXISTS, self.GROUP_EXISTS_VALUE)
|
||||
|
||||
return RedisFutureResult(self._submit(self._client.transaction,
|
||||
_create_group, encoded_group,
|
||||
self._groups,
|
||||
value_from_callable=True))
|
||||
return RedisFutureResult(self._submit(_create_group, script))
|
||||
|
||||
def update_capabilities(self, group_id, capabilities):
|
||||
encoded_group = self._encode_group_id(group_id)
|
||||
encoded_member_id = self._encode_member_id(self._member_id)
|
||||
script = self._get_script('update_capabilities')
|
||||
|
||||
def _update_capabilities(p):
|
||||
if not p.exists(encoded_group):
|
||||
def _update_capabilities(script):
|
||||
keys = [
|
||||
self._encode_group_id(group_id),
|
||||
]
|
||||
args = [
|
||||
self._encode_member_id(self._member_id),
|
||||
self._dumps(capabilities),
|
||||
]
|
||||
result = int(script(keys=keys, args=args))
|
||||
if result == -1:
|
||||
raise coordination.GroupNotCreated(group_id)
|
||||
if not p.hexists(encoded_group, encoded_member_id):
|
||||
if result == -2:
|
||||
raise coordination.MemberNotJoined(group_id, self._member_id)
|
||||
else:
|
||||
p.multi()
|
||||
p.hset(encoded_group, encoded_member_id,
|
||||
self._dumps(capabilities))
|
||||
|
||||
return RedisFutureResult(self._submit(self._client.transaction,
|
||||
_update_capabilities,
|
||||
encoded_group,
|
||||
value_from_callable=True))
|
||||
return RedisFutureResult(self._submit(_update_capabilities, script))
|
||||
|
||||
def leave_group(self, group_id):
|
||||
encoded_group = self._encode_group_id(group_id)
|
||||
|
@ -600,26 +679,31 @@ class RedisDriver(coordination._RunWatchersMixin,
|
|||
value_from_callable=True))
|
||||
|
||||
def delete_group(self, group_id):
|
||||
encoded_group = self._encode_group_id(group_id)
|
||||
script = self._get_script('delete_group')
|
||||
|
||||
def _delete_group(p):
|
||||
# An empty group still have the special key GROUP_EXISTS set, so
|
||||
# its len is 1
|
||||
if p.hlen(encoded_group) > 1:
|
||||
raise coordination.GroupNotEmpty(group_id)
|
||||
p.multi()
|
||||
p.delete(encoded_group)
|
||||
p.srem(self._groups,
|
||||
self._encode_group_id(group_id,
|
||||
apply_namespace=False))
|
||||
was_group_deleted, was_group_member_removed = p.execute()
|
||||
if not any([was_group_deleted, was_group_member_removed]):
|
||||
def _delete_group(script):
|
||||
keys = [
|
||||
self._encode_group_id(group_id),
|
||||
self._groups,
|
||||
]
|
||||
args = [
|
||||
self._encode_group_id(group_id, apply_namespace=False),
|
||||
]
|
||||
result = int(script(keys=keys, args=args))
|
||||
if result in (-1, -2):
|
||||
raise coordination.GroupNotCreated(group_id)
|
||||
if result == -3:
|
||||
raise coordination.GroupNotEmpty(group_id)
|
||||
if result == -4:
|
||||
raise coordination.ToozError("Unable to remove '%s' key"
|
||||
" from set located at '%s'"
|
||||
% (args[0], keys[-1]))
|
||||
if result != 1:
|
||||
raise coordination.ToozError("Internal error, unable"
|
||||
" to complete group '%s' removal"
|
||||
% (group_id))
|
||||
|
||||
return RedisFutureResult(self._submit(self._client.transaction,
|
||||
_delete_group,
|
||||
encoded_group,
|
||||
value_from_callable=True))
|
||||
return RedisFutureResult(self._submit(_delete_group, script))
|
||||
|
||||
def _destroy_group(self, group_id):
|
||||
"""Should only be used in tests..."""
|
||||
|
|
Loading…
Reference in New Issue