Add clustered memcached servers support for tooz.
Change-Id: I7ac6cdfecb6ad1d5fa2a78cfa6b236c45e3d4a96
This commit is contained in:
parent
22361cb48d
commit
c17a41d629
|
@ -26,10 +26,10 @@ from pymemcache import client as pymemcache_client
|
|||
import tooz
|
||||
from tooz import _retry
|
||||
from tooz import coordination
|
||||
from tooz import hashring
|
||||
from tooz import locking
|
||||
from tooz import utils
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
|
@ -96,7 +96,7 @@ class MemcachedLock(locking.Lock):
|
|||
@_retry.retry(stop_max_delay=blocking)
|
||||
@_translate_failures
|
||||
def _acquire():
|
||||
if self.coord.client.add(
|
||||
if self.coord.add(
|
||||
self.name,
|
||||
self.coord._member_id,
|
||||
expire=self.timeout,
|
||||
|
@ -111,7 +111,7 @@ class MemcachedLock(locking.Lock):
|
|||
|
||||
@_translate_failures
|
||||
def break_(self):
|
||||
return bool(self.coord.client.delete(self.name, noreply=False))
|
||||
return bool(self.coord.delete(self.name, noreply=False))
|
||||
|
||||
@_translate_failures
|
||||
def release(self):
|
||||
|
@ -148,7 +148,7 @@ class MemcachedLock(locking.Lock):
|
|||
# Do a ghetto test to see what the value is... (see above note),
|
||||
# and how this really can't be done safely with memcache due to
|
||||
# it being done in the client side (non-atomic).
|
||||
value = self.coord.client.get(self.name)
|
||||
value = self.coord.get(self.name)
|
||||
if value != self.coord._member_id:
|
||||
# NOTE(zhen): Although ``member_id`` is different, self lock
|
||||
# object needs to be removed from '_acquired_locks' because it
|
||||
|
@ -158,7 +158,7 @@ class MemcachedLock(locking.Lock):
|
|||
else:
|
||||
# NOTE(zhen): Whether 'was_deleted' was 'TRUE' or not,
|
||||
# eventually we have to remove self from '_acquired_locks'.
|
||||
was_deleted = self.coord.client.delete(self.name, noreply=False)
|
||||
was_deleted = self.coord.delete(self.name, noreply=False)
|
||||
self.coord._acquired_locks.remove(self)
|
||||
return was_deleted
|
||||
|
||||
|
@ -166,7 +166,7 @@ class MemcachedLock(locking.Lock):
|
|||
def heartbeat(self):
|
||||
"""Keep the lock alive."""
|
||||
if self.acquired:
|
||||
poked = self.coord.client.touch(self.name,
|
||||
poked = self.coord.touch(self.name,
|
||||
expire=self.timeout,
|
||||
noreply=False)
|
||||
if poked:
|
||||
|
@ -178,7 +178,7 @@ class MemcachedLock(locking.Lock):
|
|||
|
||||
@_translate_failures
|
||||
def get_owner(self):
|
||||
return self.coord.client.get(self.name)
|
||||
return self.coord.get(self.name)
|
||||
|
||||
@property
|
||||
def acquired(self):
|
||||
|
@ -209,8 +209,15 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers,
|
|||
lock_timeout 30
|
||||
leader_timeout 30
|
||||
max_pool_size None
|
||||
extra_server None
|
||||
================== =======
|
||||
|
||||
If clustered memcached server is needed, additional memcached servers are
|
||||
listed with ``extra_server`` parameters as follows::
|
||||
|
||||
memcached://<host>:<port>?extra_server=<host>:<port>&
|
||||
extra_server=<host>:<port>
|
||||
|
||||
General recommendations/usage considerations:
|
||||
|
||||
- Memcache (without different backend technology) is a **cache** enough
|
||||
|
@ -249,10 +256,23 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers,
|
|||
#: String used to keep a key/member alive (until it next expires).
|
||||
STILL_ALIVE = b"It's alive!"
|
||||
|
||||
#: Client arguments that are expected/allowed to be lists.
|
||||
CLIENT_LIST_ARGS = frozenset([
|
||||
'extra_server',
|
||||
])
|
||||
|
||||
EXCLUDE_OPTIONS = CLIENT_LIST_ARGS
|
||||
|
||||
def __init__(self, member_id, parsed_url, options):
|
||||
super(MemcachedDriver, self).__init__(member_id, parsed_url, options)
|
||||
self.host = (parsed_url.hostname or "localhost",
|
||||
parsed_url.port or 11211)
|
||||
self.use_clustered_server = False
|
||||
host = parsed_url.netloc
|
||||
self._host_list = {host: None}
|
||||
extra_servers = self._options.get('extra_server', [])
|
||||
if extra_servers:
|
||||
self.use_clustered_server = True
|
||||
for server in extra_servers:
|
||||
self._host_list[server] = None
|
||||
default_timeout = self._options.get('timeout', self.DEFAULT_TIMEOUT)
|
||||
self.timeout = int(default_timeout)
|
||||
self.membership_timeout = int(self._options.get(
|
||||
|
@ -267,6 +287,7 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers,
|
|||
else:
|
||||
self.max_pool_size = None
|
||||
self._acquired_locks = []
|
||||
self._hash_ring = hashring.HashRing(self._host_list.keys())
|
||||
|
||||
@staticmethod
|
||||
def _msgpack_serializer(key, value):
|
||||
|
@ -286,8 +307,10 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers,
|
|||
@_translate_failures
|
||||
def _start(self):
|
||||
super(MemcachedDriver, self)._start()
|
||||
self.client = pymemcache_client.PooledClient(
|
||||
self.host,
|
||||
# NOTE(xuqi): Map each host to a pooledclient.
|
||||
for host in self._host_list.keys():
|
||||
self._host_list[host] = pymemcache_client.PooledClient(
|
||||
host,
|
||||
serializer=self._msgpack_serializer,
|
||||
deserializer=self._msgpack_deserializer,
|
||||
timeout=self.timeout,
|
||||
|
@ -302,8 +325,9 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers,
|
|||
super(MemcachedDriver, self)._stop()
|
||||
for lock in list(self._acquired_locks):
|
||||
lock.release()
|
||||
self.client.delete(self._encode_member_id(self._member_id))
|
||||
self.client.close()
|
||||
self.delete(self._encode_member_id(self._member_id))
|
||||
for host, client in self._host_list.items():
|
||||
client.close()
|
||||
|
||||
def _encode_group_id(self, group_id):
|
||||
return self.GROUP_PREFIX + utils.to_binary(group_id)
|
||||
|
@ -314,22 +338,60 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers,
|
|||
def _encode_group_leader(self, group_id):
|
||||
return self.GROUP_LEADER_PREFIX + utils.to_binary(group_id)
|
||||
|
||||
def _get_client(self, key):
|
||||
"""get the client for the key from the hash ring."""
|
||||
if not self.use_clustered_server:
|
||||
for host, client in self._host_list.items():
|
||||
return client
|
||||
host = self._hash_ring.get_nodes(key)
|
||||
if host is None:
|
||||
raise tooz.ToozError("Can not map key to any server.")
|
||||
return self._host_list[host.pop()]
|
||||
|
||||
def _run_cmd(self, cmd, key, *args, **kwargs):
|
||||
client = self._get_client(key)
|
||||
func = getattr(client, cmd)
|
||||
args = list(args)
|
||||
args.insert(0, key)
|
||||
return func(*args, **kwargs)
|
||||
|
||||
def set(self, key, *args, **kwargs):
    """Store ``key`` on the memcached server that owns it."""
    client = self._get_client(key)
    return client.set(key, *args, **kwargs)
|
||||
|
||||
def get(self, key, *args, **kwargs):
    """Fetch ``key`` from the memcached server that owns it."""
    client = self._get_client(key)
    return client.get(key, *args, **kwargs)
|
||||
|
||||
def add(self, key, *args, **kwargs):
    """Add ``key`` (only if absent) on the server that owns it."""
    client = self._get_client(key)
    return client.add(key, *args, **kwargs)
|
||||
|
||||
def touch(self, key, *args, **kwargs):
    """Refresh the expiry of ``key`` on the server that owns it."""
    client = self._get_client(key)
    return client.touch(key, *args, **kwargs)
|
||||
|
||||
def delete(self, key, *args, **kwargs):
    """Remove ``key`` from the memcached server that owns it."""
    client = self._get_client(key)
    return client.delete(key, *args, **kwargs)
|
||||
|
||||
def cas(self, key, *args, **kwargs):
    """Compare-and-set ``key`` on the server that owns it."""
    client = self._get_client(key)
    return client.cas(key, *args, **kwargs)
|
||||
|
||||
def gets(self, key, *args, **kwargs):
    """Fetch ``key`` plus its CAS token from the server that owns it."""
    client = self._get_client(key)
    return client.gets(key, *args, **kwargs)
|
||||
|
||||
@_retry.retry()
|
||||
def _add_group_to_group_list(self, group_id):
|
||||
"""Add group to the group list.
|
||||
|
||||
:param group_id: The group id
|
||||
"""
|
||||
group_list, cas = self.client.gets(self.GROUP_LIST_KEY)
|
||||
group_list, cas = self.gets(self.GROUP_LIST_KEY)
|
||||
if cas:
|
||||
group_list = set(group_list)
|
||||
group_list.add(group_id)
|
||||
if not self.client.cas(self.GROUP_LIST_KEY,
|
||||
if not self.cas(self.GROUP_LIST_KEY,
|
||||
list(group_list), cas):
|
||||
# Someone updated the group list before us, try again!
|
||||
raise _retry.TryAgain
|
||||
else:
|
||||
if not self.client.add(self.GROUP_LIST_KEY,
|
||||
if not self.add(self.GROUP_LIST_KEY,
|
||||
[group_id], noreply=False):
|
||||
# Someone updated the group list before us, try again!
|
||||
raise _retry.TryAgain
|
||||
|
@ -340,10 +402,10 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers,
|
|||
|
||||
:param group_id: The group id
|
||||
"""
|
||||
group_list, cas = self.client.gets(self.GROUP_LIST_KEY)
|
||||
group_list, cas = self.gets(self.GROUP_LIST_KEY)
|
||||
group_list = set(group_list)
|
||||
group_list.remove(group_id)
|
||||
if not self.client.cas(self.GROUP_LIST_KEY,
|
||||
if not self.cas(self.GROUP_LIST_KEY,
|
||||
list(group_list), cas):
|
||||
# Someone updated the group list before us, try again!
|
||||
raise _retry.TryAgain
|
||||
|
@ -353,7 +415,7 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers,
|
|||
|
||||
@_translate_failures
|
||||
def _create_group():
|
||||
if not self.client.add(encoded_group, {}, noreply=False):
|
||||
if not self.add(encoded_group, {}, noreply=False):
|
||||
raise coordination.GroupAlreadyExist(group_id)
|
||||
self._add_group_to_group_list(group_id)
|
||||
|
||||
|
@ -363,7 +425,7 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers,
|
|||
|
||||
@_translate_failures
|
||||
def _get_groups():
|
||||
return self.client.get(self.GROUP_LIST_KEY) or []
|
||||
return self.get(self.GROUP_LIST_KEY) or []
|
||||
|
||||
return MemcachedFutureResult(self._executor.submit(_get_groups))
|
||||
|
||||
|
@ -373,7 +435,7 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers,
|
|||
@_retry.retry()
|
||||
@_translate_failures
|
||||
def _join_group():
|
||||
group_members, cas = self.client.gets(encoded_group)
|
||||
group_members, cas = self.gets(encoded_group)
|
||||
if group_members is None:
|
||||
raise coordination.GroupNotCreated(group_id)
|
||||
if self._member_id in group_members:
|
||||
|
@ -382,7 +444,7 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers,
|
|||
group_members[self._member_id] = {
|
||||
b"capabilities": capabilities,
|
||||
}
|
||||
if not self.client.cas(encoded_group, group_members, cas):
|
||||
if not self.cas(encoded_group, group_members, cas):
|
||||
# It changed, let's try again
|
||||
raise _retry.TryAgain
|
||||
self._joined_groups.add(group_id)
|
||||
|
@ -395,13 +457,13 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers,
|
|||
@_retry.retry()
|
||||
@_translate_failures
|
||||
def _leave_group():
|
||||
group_members, cas = self.client.gets(encoded_group)
|
||||
group_members, cas = self.gets(encoded_group)
|
||||
if group_members is None:
|
||||
raise coordination.GroupNotCreated(group_id)
|
||||
if self._member_id not in group_members:
|
||||
raise coordination.MemberNotJoined(group_id, self._member_id)
|
||||
del group_members[self._member_id]
|
||||
if not self.client.cas(encoded_group, group_members, cas):
|
||||
if not self.cas(encoded_group, group_members, cas):
|
||||
# It changed, let's try again
|
||||
raise _retry.TryAgain
|
||||
self._joined_groups.discard(group_id)
|
||||
|
@ -409,7 +471,7 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers,
|
|||
return MemcachedFutureResult(self._executor.submit(_leave_group))
|
||||
|
||||
def _destroy_group(self, group_id):
|
||||
self.client.delete(self._encode_group_id(group_id))
|
||||
self.delete(self._encode_group_id(group_id))
|
||||
|
||||
def delete_group(self, group_id):
|
||||
encoded_group = self._encode_group_id(group_id)
|
||||
|
@ -417,16 +479,16 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers,
|
|||
@_retry.retry()
|
||||
@_translate_failures
|
||||
def _delete_group():
|
||||
group_members, cas = self.client.gets(encoded_group)
|
||||
group_members, cas = self.gets(encoded_group)
|
||||
if group_members is None:
|
||||
raise coordination.GroupNotCreated(group_id)
|
||||
if group_members != {}:
|
||||
raise coordination.GroupNotEmpty(group_id)
|
||||
# Delete is not atomic, so we first set the group to
|
||||
# using CAS, and then we delete it, to avoid race conditions.
|
||||
if not self.client.cas(encoded_group, None, cas):
|
||||
if not self.cas(encoded_group, None, cas):
|
||||
raise _retry.TryAgain
|
||||
self.client.delete(encoded_group)
|
||||
self.delete(encoded_group)
|
||||
self._remove_from_group_list(group_id)
|
||||
|
||||
return MemcachedFutureResult(self._executor.submit(_delete_group))
|
||||
|
@ -435,18 +497,18 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers,
|
|||
@_translate_failures
|
||||
def _get_members(self, group_id):
|
||||
encoded_group = self._encode_group_id(group_id)
|
||||
group_members, cas = self.client.gets(encoded_group)
|
||||
group_members, cas = self.gets(encoded_group)
|
||||
if group_members is None:
|
||||
raise coordination.GroupNotCreated(group_id)
|
||||
actual_group_members = {}
|
||||
for m, v in group_members.items():
|
||||
# Never kick self from the group, we know we're alive
|
||||
if (m == self._member_id or
|
||||
self.client.get(self._encode_member_id(m))):
|
||||
self.get(self._encode_member_id(m))):
|
||||
actual_group_members[m] = v
|
||||
if group_members != actual_group_members:
|
||||
# There are some dead members, update the group
|
||||
if not self.client.cas(encoded_group, actual_group_members, cas):
|
||||
if not self.cas(encoded_group, actual_group_members, cas):
|
||||
# It changed, let's try again
|
||||
raise _retry.TryAgain
|
||||
return actual_group_members
|
||||
|
@ -475,13 +537,13 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers,
|
|||
@_retry.retry()
|
||||
@_translate_failures
|
||||
def _update_capabilities():
|
||||
group_members, cas = self.client.gets(encoded_group)
|
||||
group_members, cas = self.gets(encoded_group)
|
||||
if group_members is None:
|
||||
raise coordination.GroupNotCreated(group_id)
|
||||
if self._member_id not in group_members:
|
||||
raise coordination.MemberNotJoined(group_id, self._member_id)
|
||||
group_members[self._member_id][b'capabilities'] = capabilities
|
||||
if not self.client.cas(encoded_group, group_members, cas):
|
||||
if not self.cas(encoded_group, group_members, cas):
|
||||
# It changed, try again
|
||||
raise _retry.TryAgain
|
||||
|
||||
|
@ -497,7 +559,7 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers,
|
|||
|
||||
@_translate_failures
|
||||
def heartbeat(self):
|
||||
self.client.set(self._encode_member_id(self._member_id),
|
||||
self.set(self._encode_member_id(self._member_id),
|
||||
self.STILL_ALIVE,
|
||||
expire=self.membership_timeout)
|
||||
# Reset the acquired locks
|
||||
|
|
Loading…
Reference in New Issue