Add clustered memcached servers support for tooz.
Change-Id: I7ac6cdfecb6ad1d5fa2a78cfa6b236c45e3d4a96
This commit is contained in:
parent
22361cb48d
commit
ba3a753146
|
@ -26,10 +26,10 @@ from pymemcache import client as pymemcache_client
|
||||||
import tooz
|
import tooz
|
||||||
from tooz import _retry
|
from tooz import _retry
|
||||||
from tooz import coordination
|
from tooz import coordination
|
||||||
|
from tooz import hashring
|
||||||
from tooz import locking
|
from tooz import locking
|
||||||
from tooz import utils
|
from tooz import utils
|
||||||
|
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
@ -96,7 +96,7 @@ class MemcachedLock(locking.Lock):
|
||||||
@_retry.retry(stop_max_delay=blocking)
|
@_retry.retry(stop_max_delay=blocking)
|
||||||
@_translate_failures
|
@_translate_failures
|
||||||
def _acquire():
|
def _acquire():
|
||||||
if self.coord.client.add(
|
if self.coord.add(
|
||||||
self.name,
|
self.name,
|
||||||
self.coord._member_id,
|
self.coord._member_id,
|
||||||
expire=self.timeout,
|
expire=self.timeout,
|
||||||
|
@ -111,7 +111,7 @@ class MemcachedLock(locking.Lock):
|
||||||
|
|
||||||
@_translate_failures
|
@_translate_failures
|
||||||
def break_(self):
|
def break_(self):
|
||||||
return bool(self.coord.client.delete(self.name, noreply=False))
|
return bool(self.coord.delete(self.name, noreply=False))
|
||||||
|
|
||||||
@_translate_failures
|
@_translate_failures
|
||||||
def release(self):
|
def release(self):
|
||||||
|
@ -148,7 +148,7 @@ class MemcachedLock(locking.Lock):
|
||||||
# Do a ghetto test to see what the value is... (see above note),
|
# Do a ghetto test to see what the value is... (see above note),
|
||||||
# and how this really can't be done safely with memcache due to
|
# and how this really can't be done safely with memcache due to
|
||||||
# it being done in the client side (non-atomic).
|
# it being done in the client side (non-atomic).
|
||||||
value = self.coord.client.get(self.name)
|
value = self.coord.get(self.name)
|
||||||
if value != self.coord._member_id:
|
if value != self.coord._member_id:
|
||||||
# NOTE(zhen): Although ``member_ id`` is different, self lock
|
# NOTE(zhen): Although ``member_ id`` is different, self lock
|
||||||
# object needs to be removed from'_ acquired_locks' because it
|
# object needs to be removed from'_ acquired_locks' because it
|
||||||
|
@ -158,7 +158,7 @@ class MemcachedLock(locking.Lock):
|
||||||
else:
|
else:
|
||||||
# NOTE(zhen): Whether 'was_deleted' was 'TRUE' or not,
|
# NOTE(zhen): Whether 'was_deleted' was 'TRUE' or not,
|
||||||
# eventually we have to remove self from '_acquired_locks'.
|
# eventually we have to remove self from '_acquired_locks'.
|
||||||
was_deleted = self.coord.client.delete(self.name, noreply=False)
|
was_deleted = self.coord.delete(self.name, noreply=False)
|
||||||
self.coord._acquired_locks.remove(self)
|
self.coord._acquired_locks.remove(self)
|
||||||
return was_deleted
|
return was_deleted
|
||||||
|
|
||||||
|
@ -166,9 +166,9 @@ class MemcachedLock(locking.Lock):
|
||||||
def heartbeat(self):
|
def heartbeat(self):
|
||||||
"""Keep the lock alive."""
|
"""Keep the lock alive."""
|
||||||
if self.acquired:
|
if self.acquired:
|
||||||
poked = self.coord.client.touch(self.name,
|
poked = self.coord.touch(self.name,
|
||||||
expire=self.timeout,
|
expire=self.timeout,
|
||||||
noreply=False)
|
noreply=False)
|
||||||
if poked:
|
if poked:
|
||||||
return True
|
return True
|
||||||
LOG.warning("Unable to heartbeat by updating key '%s' with "
|
LOG.warning("Unable to heartbeat by updating key '%s' with "
|
||||||
|
@ -178,7 +178,7 @@ class MemcachedLock(locking.Lock):
|
||||||
|
|
||||||
@_translate_failures
|
@_translate_failures
|
||||||
def get_owner(self):
|
def get_owner(self):
|
||||||
return self.coord.client.get(self.name)
|
return self.coord.get(self.name)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def acquired(self):
|
def acquired(self):
|
||||||
|
@ -209,8 +209,14 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers,
|
||||||
lock_timeout 30
|
lock_timeout 30
|
||||||
leader_timeout 30
|
leader_timeout 30
|
||||||
max_pool_size None
|
max_pool_size None
|
||||||
|
extra_host None
|
||||||
================== =======
|
================== =======
|
||||||
|
|
||||||
|
If clustered memcached server is needed, additional memcached servers are
|
||||||
|
listed with ``extra_host`` parameters as follows::
|
||||||
|
|
||||||
|
memcached://<host>:<port>?sentinel=<host>:<port>,<host>:<port>
|
||||||
|
|
||||||
General recommendations/usage considerations:
|
General recommendations/usage considerations:
|
||||||
|
|
||||||
- Memcache (without different backend technology) is a **cache** enough
|
- Memcache (without different backend technology) is a **cache** enough
|
||||||
|
@ -251,8 +257,15 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers,
|
||||||
|
|
||||||
def __init__(self, member_id, parsed_url, options):
|
def __init__(self, member_id, parsed_url, options):
|
||||||
super(MemcachedDriver, self).__init__(member_id, parsed_url, options)
|
super(MemcachedDriver, self).__init__(member_id, parsed_url, options)
|
||||||
self.host = (parsed_url.hostname or "localhost",
|
self.use_clustered_server = False
|
||||||
parsed_url.port or 11211)
|
host = parsed_url.netloc
|
||||||
|
self._host_list = {host: None}
|
||||||
|
extra_host = self._options.get('extra_host', None)
|
||||||
|
if extra_host is not None:
|
||||||
|
extra_host_list = extra_host.split(',')
|
||||||
|
self.use_clustered_server = True
|
||||||
|
for host_ in extra_host_list:
|
||||||
|
self._host_list[host_] = None
|
||||||
default_timeout = self._options.get('timeout', self.DEFAULT_TIMEOUT)
|
default_timeout = self._options.get('timeout', self.DEFAULT_TIMEOUT)
|
||||||
self.timeout = int(default_timeout)
|
self.timeout = int(default_timeout)
|
||||||
self.membership_timeout = int(self._options.get(
|
self.membership_timeout = int(self._options.get(
|
||||||
|
@ -267,6 +280,7 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers,
|
||||||
else:
|
else:
|
||||||
self.max_pool_size = None
|
self.max_pool_size = None
|
||||||
self._acquired_locks = []
|
self._acquired_locks = []
|
||||||
|
self._hash_ring = hashring.HashRing(self._host_list.keys())
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _msgpack_serializer(key, value):
|
def _msgpack_serializer(key, value):
|
||||||
|
@ -286,13 +300,15 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers,
|
||||||
@_translate_failures
|
@_translate_failures
|
||||||
def _start(self):
|
def _start(self):
|
||||||
super(MemcachedDriver, self)._start()
|
super(MemcachedDriver, self)._start()
|
||||||
self.client = pymemcache_client.PooledClient(
|
# NOTE(xuqi): Map each host to a pooledclient.
|
||||||
self.host,
|
for host in self._host_list.keys():
|
||||||
serializer=self._msgpack_serializer,
|
self._host_list[host] = pymemcache_client.PooledClient(
|
||||||
deserializer=self._msgpack_deserializer,
|
host,
|
||||||
timeout=self.timeout,
|
serializer=self._msgpack_serializer,
|
||||||
connect_timeout=self.timeout,
|
deserializer=self._msgpack_deserializer,
|
||||||
max_pool_size=self.max_pool_size)
|
timeout=self.timeout,
|
||||||
|
connect_timeout=self.timeout,
|
||||||
|
max_pool_size=self.max_pool_size)
|
||||||
# Run heartbeat here because pymemcache use a lazy connection
|
# Run heartbeat here because pymemcache use a lazy connection
|
||||||
# method and only connect once you do an operation.
|
# method and only connect once you do an operation.
|
||||||
self.heartbeat()
|
self.heartbeat()
|
||||||
|
@ -302,8 +318,9 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers,
|
||||||
super(MemcachedDriver, self)._stop()
|
super(MemcachedDriver, self)._stop()
|
||||||
for lock in list(self._acquired_locks):
|
for lock in list(self._acquired_locks):
|
||||||
lock.release()
|
lock.release()
|
||||||
self.client.delete(self._encode_member_id(self._member_id))
|
self.delete(self._encode_member_id(self._member_id))
|
||||||
self.client.close()
|
for host, client in self._host_list.items():
|
||||||
|
client.close()
|
||||||
|
|
||||||
def _encode_group_id(self, group_id):
|
def _encode_group_id(self, group_id):
|
||||||
return self.GROUP_PREFIX + utils.to_binary(group_id)
|
return self.GROUP_PREFIX + utils.to_binary(group_id)
|
||||||
|
@ -314,23 +331,61 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers,
|
||||||
def _encode_group_leader(self, group_id):
|
def _encode_group_leader(self, group_id):
|
||||||
return self.GROUP_LEADER_PREFIX + utils.to_binary(group_id)
|
return self.GROUP_LEADER_PREFIX + utils.to_binary(group_id)
|
||||||
|
|
||||||
|
def _get_client(self, key):
|
||||||
|
"""get the client for the key from the hash ring."""
|
||||||
|
if not self.use_clustered_server:
|
||||||
|
for host, client in self._host_list.items():
|
||||||
|
return client
|
||||||
|
host = self._hash_ring.get_nodes(key)
|
||||||
|
if host is None:
|
||||||
|
raise tooz.ToozError("Can not map key to any server.")
|
||||||
|
return self._host_list[host.pop()]
|
||||||
|
|
||||||
|
def _run_cmd(self, cmd, key, *args, **kwargs):
|
||||||
|
client = self._get_client(key)
|
||||||
|
func = getattr(client, cmd)
|
||||||
|
args = list(args)
|
||||||
|
args.insert(0, key)
|
||||||
|
return func(*args, **kwargs)
|
||||||
|
|
||||||
|
def set(self, key, *args, **kwargs):
|
||||||
|
return self._run_cmd("set", key, *args, **kwargs)
|
||||||
|
|
||||||
|
def get(self, key, *args, **kwargs):
|
||||||
|
return self._run_cmd("get", key, *args, **kwargs)
|
||||||
|
|
||||||
|
def add(self, key, *args, **kwargs):
|
||||||
|
return self._run_cmd("add", key, *args, **kwargs)
|
||||||
|
|
||||||
|
def touch(self, key, *args, **kwargs):
|
||||||
|
return self._run_cmd("touch", key, *args, **kwargs)
|
||||||
|
|
||||||
|
def delete(self, key, *args, **kwargs):
|
||||||
|
return self._run_cmd("delete", key, *args, **kwargs)
|
||||||
|
|
||||||
|
def cas(self, key, *args, **kwargs):
|
||||||
|
return self._run_cmd("cas", key, *args, **kwargs)
|
||||||
|
|
||||||
|
def gets(self, key, *args, **kwargs):
|
||||||
|
return self._run_cmd("gets", key, *args, **kwargs)
|
||||||
|
|
||||||
@_retry.retry()
|
@_retry.retry()
|
||||||
def _add_group_to_group_list(self, group_id):
|
def _add_group_to_group_list(self, group_id):
|
||||||
"""Add group to the group list.
|
"""Add group to the group list.
|
||||||
|
|
||||||
:param group_id: The group id
|
:param group_id: The group id
|
||||||
"""
|
"""
|
||||||
group_list, cas = self.client.gets(self.GROUP_LIST_KEY)
|
group_list, cas = self.gets(self.GROUP_LIST_KEY)
|
||||||
if cas:
|
if cas:
|
||||||
group_list = set(group_list)
|
group_list = set(group_list)
|
||||||
group_list.add(group_id)
|
group_list.add(group_id)
|
||||||
if not self.client.cas(self.GROUP_LIST_KEY,
|
if not self.cas(self.GROUP_LIST_KEY,
|
||||||
list(group_list), cas):
|
list(group_list), cas):
|
||||||
# Someone updated the group list before us, try again!
|
# Someone updated the group list before us, try again!
|
||||||
raise _retry.TryAgain
|
raise _retry.TryAgain
|
||||||
else:
|
else:
|
||||||
if not self.client.add(self.GROUP_LIST_KEY,
|
if not self.add(self.GROUP_LIST_KEY,
|
||||||
[group_id], noreply=False):
|
[group_id], noreply=False):
|
||||||
# Someone updated the group list before us, try again!
|
# Someone updated the group list before us, try again!
|
||||||
raise _retry.TryAgain
|
raise _retry.TryAgain
|
||||||
|
|
||||||
|
@ -340,11 +395,11 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers,
|
||||||
|
|
||||||
:param group_id: The group id
|
:param group_id: The group id
|
||||||
"""
|
"""
|
||||||
group_list, cas = self.client.gets(self.GROUP_LIST_KEY)
|
group_list, cas = self.gets(self.GROUP_LIST_KEY)
|
||||||
group_list = set(group_list)
|
group_list = set(group_list)
|
||||||
group_list.remove(group_id)
|
group_list.remove(group_id)
|
||||||
if not self.client.cas(self.GROUP_LIST_KEY,
|
if not self.cas(self.GROUP_LIST_KEY,
|
||||||
list(group_list), cas):
|
list(group_list), cas):
|
||||||
# Someone updated the group list before us, try again!
|
# Someone updated the group list before us, try again!
|
||||||
raise _retry.TryAgain
|
raise _retry.TryAgain
|
||||||
|
|
||||||
|
@ -353,7 +408,7 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers,
|
||||||
|
|
||||||
@_translate_failures
|
@_translate_failures
|
||||||
def _create_group():
|
def _create_group():
|
||||||
if not self.client.add(encoded_group, {}, noreply=False):
|
if not self.add(encoded_group, {}, noreply=False):
|
||||||
raise coordination.GroupAlreadyExist(group_id)
|
raise coordination.GroupAlreadyExist(group_id)
|
||||||
self._add_group_to_group_list(group_id)
|
self._add_group_to_group_list(group_id)
|
||||||
|
|
||||||
|
@ -363,7 +418,7 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers,
|
||||||
|
|
||||||
@_translate_failures
|
@_translate_failures
|
||||||
def _get_groups():
|
def _get_groups():
|
||||||
return self.client.get(self.GROUP_LIST_KEY) or []
|
return self.get(self.GROUP_LIST_KEY) or []
|
||||||
|
|
||||||
return MemcachedFutureResult(self._executor.submit(_get_groups))
|
return MemcachedFutureResult(self._executor.submit(_get_groups))
|
||||||
|
|
||||||
|
@ -373,7 +428,7 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers,
|
||||||
@_retry.retry()
|
@_retry.retry()
|
||||||
@_translate_failures
|
@_translate_failures
|
||||||
def _join_group():
|
def _join_group():
|
||||||
group_members, cas = self.client.gets(encoded_group)
|
group_members, cas = self.gets(encoded_group)
|
||||||
if group_members is None:
|
if group_members is None:
|
||||||
raise coordination.GroupNotCreated(group_id)
|
raise coordination.GroupNotCreated(group_id)
|
||||||
if self._member_id in group_members:
|
if self._member_id in group_members:
|
||||||
|
@ -382,7 +437,7 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers,
|
||||||
group_members[self._member_id] = {
|
group_members[self._member_id] = {
|
||||||
b"capabilities": capabilities,
|
b"capabilities": capabilities,
|
||||||
}
|
}
|
||||||
if not self.client.cas(encoded_group, group_members, cas):
|
if not self.cas(encoded_group, group_members, cas):
|
||||||
# It changed, let's try again
|
# It changed, let's try again
|
||||||
raise _retry.TryAgain
|
raise _retry.TryAgain
|
||||||
self._joined_groups.add(group_id)
|
self._joined_groups.add(group_id)
|
||||||
|
@ -395,13 +450,13 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers,
|
||||||
@_retry.retry()
|
@_retry.retry()
|
||||||
@_translate_failures
|
@_translate_failures
|
||||||
def _leave_group():
|
def _leave_group():
|
||||||
group_members, cas = self.client.gets(encoded_group)
|
group_members, cas = self.gets(encoded_group)
|
||||||
if group_members is None:
|
if group_members is None:
|
||||||
raise coordination.GroupNotCreated(group_id)
|
raise coordination.GroupNotCreated(group_id)
|
||||||
if self._member_id not in group_members:
|
if self._member_id not in group_members:
|
||||||
raise coordination.MemberNotJoined(group_id, self._member_id)
|
raise coordination.MemberNotJoined(group_id, self._member_id)
|
||||||
del group_members[self._member_id]
|
del group_members[self._member_id]
|
||||||
if not self.client.cas(encoded_group, group_members, cas):
|
if not self.cas(encoded_group, group_members, cas):
|
||||||
# It changed, let's try again
|
# It changed, let's try again
|
||||||
raise _retry.TryAgain
|
raise _retry.TryAgain
|
||||||
self._joined_groups.discard(group_id)
|
self._joined_groups.discard(group_id)
|
||||||
|
@ -409,7 +464,7 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers,
|
||||||
return MemcachedFutureResult(self._executor.submit(_leave_group))
|
return MemcachedFutureResult(self._executor.submit(_leave_group))
|
||||||
|
|
||||||
def _destroy_group(self, group_id):
|
def _destroy_group(self, group_id):
|
||||||
self.client.delete(self._encode_group_id(group_id))
|
self.delete(self._encode_group_id(group_id))
|
||||||
|
|
||||||
def delete_group(self, group_id):
|
def delete_group(self, group_id):
|
||||||
encoded_group = self._encode_group_id(group_id)
|
encoded_group = self._encode_group_id(group_id)
|
||||||
|
@ -417,16 +472,16 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers,
|
||||||
@_retry.retry()
|
@_retry.retry()
|
||||||
@_translate_failures
|
@_translate_failures
|
||||||
def _delete_group():
|
def _delete_group():
|
||||||
group_members, cas = self.client.gets(encoded_group)
|
group_members, cas = self.gets(encoded_group)
|
||||||
if group_members is None:
|
if group_members is None:
|
||||||
raise coordination.GroupNotCreated(group_id)
|
raise coordination.GroupNotCreated(group_id)
|
||||||
if group_members != {}:
|
if group_members != {}:
|
||||||
raise coordination.GroupNotEmpty(group_id)
|
raise coordination.GroupNotEmpty(group_id)
|
||||||
# Delete is not atomic, so we first set the group to
|
# Delete is not atomic, so we first set the group to
|
||||||
# using CAS, and then we delete it, to avoid race conditions.
|
# using CAS, and then we delete it, to avoid race conditions.
|
||||||
if not self.client.cas(encoded_group, None, cas):
|
if not self.cas(encoded_group, None, cas):
|
||||||
raise _retry.TryAgain
|
raise _retry.TryAgain
|
||||||
self.client.delete(encoded_group)
|
self.delete(encoded_group)
|
||||||
self._remove_from_group_list(group_id)
|
self._remove_from_group_list(group_id)
|
||||||
|
|
||||||
return MemcachedFutureResult(self._executor.submit(_delete_group))
|
return MemcachedFutureResult(self._executor.submit(_delete_group))
|
||||||
|
@ -435,18 +490,18 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers,
|
||||||
@_translate_failures
|
@_translate_failures
|
||||||
def _get_members(self, group_id):
|
def _get_members(self, group_id):
|
||||||
encoded_group = self._encode_group_id(group_id)
|
encoded_group = self._encode_group_id(group_id)
|
||||||
group_members, cas = self.client.gets(encoded_group)
|
group_members, cas = self.gets(encoded_group)
|
||||||
if group_members is None:
|
if group_members is None:
|
||||||
raise coordination.GroupNotCreated(group_id)
|
raise coordination.GroupNotCreated(group_id)
|
||||||
actual_group_members = {}
|
actual_group_members = {}
|
||||||
for m, v in group_members.items():
|
for m, v in group_members.items():
|
||||||
# Never kick self from the group, we know we're alive
|
# Never kick self from the group, we know we're alive
|
||||||
if (m == self._member_id or
|
if (m == self._member_id or
|
||||||
self.client.get(self._encode_member_id(m))):
|
self.get(self._encode_member_id(m))):
|
||||||
actual_group_members[m] = v
|
actual_group_members[m] = v
|
||||||
if group_members != actual_group_members:
|
if group_members != actual_group_members:
|
||||||
# There are some dead members, update the group
|
# There are some dead members, update the group
|
||||||
if not self.client.cas(encoded_group, actual_group_members, cas):
|
if not self.cas(encoded_group, actual_group_members, cas):
|
||||||
# It changed, let's try again
|
# It changed, let's try again
|
||||||
raise _retry.TryAgain
|
raise _retry.TryAgain
|
||||||
return actual_group_members
|
return actual_group_members
|
||||||
|
@ -475,13 +530,13 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers,
|
||||||
@_retry.retry()
|
@_retry.retry()
|
||||||
@_translate_failures
|
@_translate_failures
|
||||||
def _update_capabilities():
|
def _update_capabilities():
|
||||||
group_members, cas = self.client.gets(encoded_group)
|
group_members, cas = self.gets(encoded_group)
|
||||||
if group_members is None:
|
if group_members is None:
|
||||||
raise coordination.GroupNotCreated(group_id)
|
raise coordination.GroupNotCreated(group_id)
|
||||||
if self._member_id not in group_members:
|
if self._member_id not in group_members:
|
||||||
raise coordination.MemberNotJoined(group_id, self._member_id)
|
raise coordination.MemberNotJoined(group_id, self._member_id)
|
||||||
group_members[self._member_id][b'capabilities'] = capabilities
|
group_members[self._member_id][b'capabilities'] = capabilities
|
||||||
if not self.client.cas(encoded_group, group_members, cas):
|
if not self.cas(encoded_group, group_members, cas):
|
||||||
# It changed, try again
|
# It changed, try again
|
||||||
raise _retry.TryAgain
|
raise _retry.TryAgain
|
||||||
|
|
||||||
|
@ -497,9 +552,9 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers,
|
||||||
|
|
||||||
@_translate_failures
|
@_translate_failures
|
||||||
def heartbeat(self):
|
def heartbeat(self):
|
||||||
self.client.set(self._encode_member_id(self._member_id),
|
self.set(self._encode_member_id(self._member_id),
|
||||||
self.STILL_ALIVE,
|
self.STILL_ALIVE,
|
||||||
expire=self.membership_timeout)
|
expire=self.membership_timeout)
|
||||||
# Reset the acquired locks
|
# Reset the acquired locks
|
||||||
for lock in self._acquired_locks:
|
for lock in self._acquired_locks:
|
||||||
lock.heartbeat()
|
lock.heartbeat()
|
||||||
|
|
Loading…
Reference in New Issue