Add clustered memcached servers support for tooz.

Change-Id: I7ac6cdfecb6ad1d5fa2a78cfa6b236c45e3d4a96
This commit is contained in:
XuQi 2024-02-04 09:51:57 +00:00
parent 22361cb48d
commit 570933d2ea
1 changed files with 96 additions and 46 deletions

View File

@ -26,10 +26,10 @@ from pymemcache import client as pymemcache_client
import tooz
from tooz import _retry
from tooz import coordination
from tooz import hashring
from tooz import locking
from tooz import utils
LOG = logging.getLogger(__name__)
@ -96,7 +96,7 @@ class MemcachedLock(locking.Lock):
@_retry.retry(stop_max_delay=blocking)
@_translate_failures
def _acquire():
if self.coord.client.add(
if self.coord.add(
self.name,
self.coord._member_id,
expire=self.timeout,
@ -111,7 +111,7 @@ class MemcachedLock(locking.Lock):
@_translate_failures
def break_(self):
return bool(self.coord.client.delete(self.name, noreply=False))
return bool(self.coord.delete(self.name, noreply=False))
@_translate_failures
def release(self):
@ -148,7 +148,7 @@ class MemcachedLock(locking.Lock):
# Do a ghetto test to see what the value is... (see above note),
# and how this really can't be done safely with memcache due to
# it being done in the client side (non-atomic).
value = self.coord.client.get(self.name)
value = self.coord.get(self.name)
if value != self.coord._member_id:
# NOTE(zhen): Although ``member_id`` is different, the self lock
# object needs to be removed from '_acquired_locks' because it
@ -158,7 +158,7 @@ class MemcachedLock(locking.Lock):
else:
# NOTE(zhen): Whether 'was_deleted' was 'TRUE' or not,
# eventually we have to remove self from '_acquired_locks'.
was_deleted = self.coord.client.delete(self.name, noreply=False)
was_deleted = self.coord.delete(self.name, noreply=False)
self.coord._acquired_locks.remove(self)
return was_deleted
@ -166,9 +166,9 @@ class MemcachedLock(locking.Lock):
def heartbeat(self):
"""Keep the lock alive."""
if self.acquired:
poked = self.coord.client.touch(self.name,
expire=self.timeout,
noreply=False)
poked = self.coord.touch(self.name,
expire=self.timeout,
noreply=False)
if poked:
return True
LOG.warning("Unable to heartbeat by updating key '%s' with "
@ -178,7 +178,7 @@ class MemcachedLock(locking.Lock):
@_translate_failures
def get_owner(self):
return self.coord.client.get(self.name)
return self.coord.get(self.name)
@property
def acquired(self):
@ -209,8 +209,14 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers,
lock_timeout 30
leader_timeout 30
max_pool_size None
extra_host None
================== =======
If a clustered memcached setup is needed, the additional memcached
servers are listed with the ``extra_host`` parameter as follows::
memcached://<host>:<port>?extra_host=<host>:<port>,<host>:<port>
General recommendations/usage considerations:
- Memcache (without different backend technology) is a **cache** enough
@ -251,8 +257,13 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers,
def __init__(self, member_id, parsed_url, options):
super(MemcachedDriver, self).__init__(member_id, parsed_url, options)
self.host = (parsed_url.hostname or "localhost",
parsed_url.port or 11211)
host = parsed_url.netloc
self._host_list = {host: None}
extra_host = self._options.get('extra_host', None)
if extra_host is not None:
extra_host_list = extra_host.split(',')
for host_ in extra_host_list:
self._host_list[host_] = None
default_timeout = self._options.get('timeout', self.DEFAULT_TIMEOUT)
self.timeout = int(default_timeout)
self.membership_timeout = int(self._options.get(
@ -267,6 +278,7 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers,
else:
self.max_pool_size = None
self._acquired_locks = []
self._hash_ring = hashring.HashRing(self._host_list.keys())
@staticmethod
def _msgpack_serializer(key, value):
@ -286,13 +298,15 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers,
@_translate_failures
def _start(self):
super(MemcachedDriver, self)._start()
self.client = pymemcache_client.PooledClient(
self.host,
serializer=self._msgpack_serializer,
deserializer=self._msgpack_deserializer,
timeout=self.timeout,
connect_timeout=self.timeout,
max_pool_size=self.max_pool_size)
# NOTE(xuqi): Map each host to a pooledclient.
for host in self._host_list.keys():
self._host_list[host] = pymemcache_client.PooledClient(
host,
serializer=self._msgpack_serializer,
deserializer=self._msgpack_deserializer,
timeout=self.timeout,
connect_timeout=self.timeout,
max_pool_size=self.max_pool_size)
# Run heartbeat here because pymemcache use a lazy connection
# method and only connect once you do an operation.
self.heartbeat()
@ -302,8 +316,9 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers,
super(MemcachedDriver, self)._stop()
for lock in list(self._acquired_locks):
lock.release()
self.client.delete(self._encode_member_id(self._member_id))
self.client.close()
self.delete(self._encode_member_id(self._member_id))
for host, client in self._host_list.items():
client.close()
def _encode_group_id(self, group_id):
    """Return the memcached key under which a group's membership lives."""
    suffix = utils.to_binary(group_id)
    return self.GROUP_PREFIX + suffix
@ -313,6 +328,41 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers,
def _encode_group_leader(self, group_id):
    """Return the memcached key that records a group's elected leader."""
    suffix = utils.to_binary(group_id)
    return self.GROUP_LEADER_PREFIX + suffix
def _get_client(self, key):
"""get the client for the key from the hash ring."""
host = self._hash_ring.get_nodes(key)
if host is None:
raise tooz.ToozError("Can not map key to any server.")
return self._host_list[host.pop()]
def _run_cmd(self, cmd, key, *args, **kwargs):
client = self._get_client(key)
func = getattr(client, cmd)
args = list(args)
args.insert(0, key)
return func(*args, **kwargs)
def set(self, key, *args, **kwargs):
    """Store *key* on the server the hash ring assigns to it."""
    return self._run_cmd("set", key, *args, **kwargs)

def get(self, key, *args, **kwargs):
    """Fetch *key* from the server the hash ring assigns to it."""
    return self._run_cmd("get", key, *args, **kwargs)

def add(self, key, *args, **kwargs):
    """Add *key* (store only if absent) on its owning server."""
    return self._run_cmd("add", key, *args, **kwargs)

def touch(self, key, *args, **kwargs):
    """Refresh the expiry of *key* on its owning server."""
    return self._run_cmd("touch", key, *args, **kwargs)

def delete(self, key, *args, **kwargs):
    """Delete *key* from its owning server."""
    return self._run_cmd("delete", key, *args, **kwargs)

def cas(self, key, *args, **kwargs):
    """Check-and-set *key* on its owning server."""
    return self._run_cmd("cas", key, *args, **kwargs)

def gets(self, key, *args, **kwargs):
    """Fetch *key* together with its CAS token from its owning server."""
    return self._run_cmd("gets", key, *args, **kwargs)
@_retry.retry()
def _add_group_to_group_list(self, group_id):
@ -320,17 +370,17 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers,
:param group_id: The group id
"""
group_list, cas = self.client.gets(self.GROUP_LIST_KEY)
group_list, cas = self.gets(self.GROUP_LIST_KEY)
if cas:
group_list = set(group_list)
group_list.add(group_id)
if not self.client.cas(self.GROUP_LIST_KEY,
list(group_list), cas):
if not self.cas(self.GROUP_LIST_KEY,
list(group_list), cas):
# Someone updated the group list before us, try again!
raise _retry.TryAgain
else:
if not self.client.add(self.GROUP_LIST_KEY,
[group_id], noreply=False):
if not self.add(self.GROUP_LIST_KEY,
[group_id], noreply=False):
# Someone updated the group list before us, try again!
raise _retry.TryAgain
@ -340,11 +390,11 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers,
:param group_id: The group id
"""
group_list, cas = self.client.gets(self.GROUP_LIST_KEY)
group_list, cas = self.gets(self.GROUP_LIST_KEY)
group_list = set(group_list)
group_list.remove(group_id)
if not self.client.cas(self.GROUP_LIST_KEY,
list(group_list), cas):
if not self.cas(self.GROUP_LIST_KEY,
list(group_list), cas):
# Someone updated the group list before us, try again!
raise _retry.TryAgain
@ -353,7 +403,7 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers,
@_translate_failures
def _create_group():
if not self.client.add(encoded_group, {}, noreply=False):
if not self.add(encoded_group, {}, noreply=False):
raise coordination.GroupAlreadyExist(group_id)
self._add_group_to_group_list(group_id)
@ -363,7 +413,7 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers,
@_translate_failures
def _get_groups():
return self.client.get(self.GROUP_LIST_KEY) or []
return self.get(self.GROUP_LIST_KEY) or []
return MemcachedFutureResult(self._executor.submit(_get_groups))
@ -373,7 +423,7 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers,
@_retry.retry()
@_translate_failures
def _join_group():
group_members, cas = self.client.gets(encoded_group)
group_members, cas = self.gets(encoded_group)
if group_members is None:
raise coordination.GroupNotCreated(group_id)
if self._member_id in group_members:
@ -382,7 +432,7 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers,
group_members[self._member_id] = {
b"capabilities": capabilities,
}
if not self.client.cas(encoded_group, group_members, cas):
if not self.cas(encoded_group, group_members, cas):
# It changed, let's try again
raise _retry.TryAgain
self._joined_groups.add(group_id)
@ -395,13 +445,13 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers,
@_retry.retry()
@_translate_failures
def _leave_group():
group_members, cas = self.client.gets(encoded_group)
group_members, cas = self.gets(encoded_group)
if group_members is None:
raise coordination.GroupNotCreated(group_id)
if self._member_id not in group_members:
raise coordination.MemberNotJoined(group_id, self._member_id)
del group_members[self._member_id]
if not self.client.cas(encoded_group, group_members, cas):
if not self.cas(encoded_group, group_members, cas):
# It changed, let's try again
raise _retry.TryAgain
self._joined_groups.discard(group_id)
@ -409,7 +459,7 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers,
return MemcachedFutureResult(self._executor.submit(_leave_group))
def _destroy_group(self, group_id):
self.client.delete(self._encode_group_id(group_id))
self.delete(self._encode_group_id(group_id))
def delete_group(self, group_id):
encoded_group = self._encode_group_id(group_id)
@ -417,16 +467,16 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers,
@_retry.retry()
@_translate_failures
def _delete_group():
group_members, cas = self.client.gets(encoded_group)
group_members, cas = self.gets(encoded_group)
if group_members is None:
raise coordination.GroupNotCreated(group_id)
if group_members != {}:
raise coordination.GroupNotEmpty(group_id)
# Delete is not atomic, so we first set the group to
# using CAS, and then we delete it, to avoid race conditions.
if not self.client.cas(encoded_group, None, cas):
if not self.cas(encoded_group, None, cas):
raise _retry.TryAgain
self.client.delete(encoded_group)
self.delete(encoded_group)
self._remove_from_group_list(group_id)
return MemcachedFutureResult(self._executor.submit(_delete_group))
@ -435,18 +485,18 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers,
@_translate_failures
def _get_members(self, group_id):
encoded_group = self._encode_group_id(group_id)
group_members, cas = self.client.gets(encoded_group)
group_members, cas = self.gets(encoded_group)
if group_members is None:
raise coordination.GroupNotCreated(group_id)
actual_group_members = {}
for m, v in group_members.items():
# Never kick self from the group, we know we're alive
if (m == self._member_id or
self.client.get(self._encode_member_id(m))):
self.get(self._encode_member_id(m))):
actual_group_members[m] = v
if group_members != actual_group_members:
# There are some dead members, update the group
if not self.client.cas(encoded_group, actual_group_members, cas):
if not self.cas(encoded_group, actual_group_members, cas):
# It changed, let's try again
raise _retry.TryAgain
return actual_group_members
@ -475,13 +525,13 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers,
@_retry.retry()
@_translate_failures
def _update_capabilities():
group_members, cas = self.client.gets(encoded_group)
group_members, cas = self.gets(encoded_group)
if group_members is None:
raise coordination.GroupNotCreated(group_id)
if self._member_id not in group_members:
raise coordination.MemberNotJoined(group_id, self._member_id)
group_members[self._member_id][b'capabilities'] = capabilities
if not self.client.cas(encoded_group, group_members, cas):
if not self.cas(encoded_group, group_members, cas):
# It changed, try again
raise _retry.TryAgain
@ -497,9 +547,9 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers,
@_translate_failures
def heartbeat(self):
self.client.set(self._encode_member_id(self._member_id),
self.STILL_ALIVE,
expire=self.membership_timeout)
self.set(self._encode_member_id(self._member_id),
self.STILL_ALIVE,
expire=self.membership_timeout)
# Reset the acquired locks
for lock in self._acquired_locks:
lock.heartbeat()