diff --git a/tooz/drivers/memcached.py b/tooz/drivers/memcached.py index 7cc40a9c..3c78ea3c 100644 --- a/tooz/drivers/memcached.py +++ b/tooz/drivers/memcached.py @@ -26,10 +26,10 @@ from pymemcache import client as pymemcache_client import tooz from tooz import _retry from tooz import coordination +from tooz import hashring from tooz import locking from tooz import utils - LOG = logging.getLogger(__name__) @@ -96,7 +96,7 @@ class MemcachedLock(locking.Lock): @_retry.retry(stop_max_delay=blocking) @_translate_failures def _acquire(): - if self.coord.client.add( + if self.coord.add( self.name, self.coord._member_id, expire=self.timeout, @@ -111,7 +111,7 @@ class MemcachedLock(locking.Lock): @_translate_failures def break_(self): - return bool(self.coord.client.delete(self.name, noreply=False)) + return bool(self.coord.delete(self.name, noreply=False)) @_translate_failures def release(self): @@ -148,7 +148,7 @@ class MemcachedLock(locking.Lock): # Do a ghetto test to see what the value is... (see above note), # and how this really can't be done safely with memcache due to # it being done in the client side (non-atomic). - value = self.coord.client.get(self.name) + value = self.coord.get(self.name) if value != self.coord._member_id: # NOTE(zhen): Although ``member_ id`` is different, self lock # object needs to be removed from'_ acquired_locks' because it @@ -158,7 +158,7 @@ class MemcachedLock(locking.Lock): else: # NOTE(zhen): Whether 'was_deleted' was 'TRUE' or not, # eventually we have to remove self from '_acquired_locks'. - was_deleted = self.coord.client.delete(self.name, noreply=False) + was_deleted = self.coord.delete(self.name, noreply=False) self.coord._acquired_locks.remove(self) return was_deleted @@ -166,9 +166,9 @@ class MemcachedLock(locking.Lock): def heartbeat(self): """Keep the lock alive.""" if self.acquired: - poked = self.coord.client.touch(self.name, - expire=self.timeout, - noreply=False) + poked = self.coord.touch(self.name, + expire=self.timeout, + noreply=False) if poked: return True LOG.warning("Unable to heartbeat by updating key '%s' with " @@ -178,7 +178,7 @@ class MemcachedLock(locking.Lock): @_translate_failures def get_owner(self): - return self.coord.client.get(self.name) + return self.coord.get(self.name) @property def acquired(self): @@ -209,8 +209,14 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers, lock_timeout 30 leader_timeout 30 max_pool_size None + extra_host None ================== ======= + If clustered memcached server is needed, additional memcached servers are + listed with ``extra_host`` parameters as follows:: + + memcached://:?sentinel=:,: + General recommendations/usage considerations: - Memcache (without different backend technology) is a **cache** enough @@ -251,8 +257,13 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers, def __init__(self, member_id, parsed_url, options): super(MemcachedDriver, self).__init__(member_id, parsed_url, options) - self.host = (parsed_url.hostname or "localhost", - parsed_url.port or 11211) + host = parsed_url.netloc + self._host_list = {host: None} + extra_host = self._options.get('extra_host', None) + if extra_host is not None: + extra_host_list = extra_host.split(',') + for host_ in extra_host_list: + self._host_list[host_] = None default_timeout = self._options.get('timeout', self.DEFAULT_TIMEOUT) self.timeout = int(default_timeout) self.membership_timeout = int(self._options.get( @@ -267,6 +278,7 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers, else: self.max_pool_size = None self._acquired_locks = [] + self._hash_ring = hashring.HashRing(self._host_list.keys()) @staticmethod def _msgpack_serializer(key, value): @@ -286,13 +298,15 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers, @_translate_failures def _start(self): super(MemcachedDriver, self)._start() - self.client = pymemcache_client.PooledClient( - self.host, - serializer=self._msgpack_serializer, - deserializer=self._msgpack_deserializer, - timeout=self.timeout, - connect_timeout=self.timeout, - max_pool_size=self.max_pool_size) + # NOTE(xuqi): Map each host to a pooledclient. + for host in self._host_list.keys(): + self._host_list[host] = pymemcache_client.PooledClient( + host, + serializer=self._msgpack_serializer, + deserializer=self._msgpack_deserializer, + timeout=self.timeout, + connect_timeout=self.timeout, + max_pool_size=self.max_pool_size) # Run heartbeat here because pymemcache use a lazy connection # method and only connect once you do an operation. self.heartbeat() @@ -302,8 +316,9 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers, super(MemcachedDriver, self)._stop() for lock in list(self._acquired_locks): lock.release() - self.client.delete(self._encode_member_id(self._member_id)) - self.client.close() + self.delete(self._encode_member_id(self._member_id)) + for host, client in self._host_list.items(): + client.close() def _encode_group_id(self, group_id): return self.GROUP_PREFIX + utils.to_binary(group_id) @@ -313,6 +328,41 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers, def _encode_group_leader(self, group_id): return self.GROUP_LEADER_PREFIX + utils.to_binary(group_id) + + def _get_client(self, key): + """get the client for the key from the hash ring.""" + host = self._hash_ring.get_nodes(key) + if host is None: + raise tooz.ToozError("Can not map key to any server.") + return self._host_list[host.pop()] + + def _run_cmd(self, cmd, key, *args, **kwargs): + client = self._get_client(key) + func = getattr(client, cmd) + args = list(args) + args.insert(0, key) + return func(*args, **kwargs) + + def set(self, key, *args, **kwargs): + return self._run_cmd("set", key, *args, **kwargs) + + def get(self, key, *args, **kwargs): + return self._run_cmd("get", key, *args, **kwargs) + + def add(self, key, *args, **kwargs): + return self._run_cmd("add", key, *args, **kwargs) + + def touch(self, key, *args, **kwargs): + return self._run_cmd("touch", key, *args, **kwargs) + + def delete(self, key, *args, **kwargs): + return self._run_cmd("delete", key, *args, **kwargs) + + def cas(self, key, *args, **kwargs): + return self._run_cmd("cas", key, *args, **kwargs) + + def gets(self, key, *args, **kwargs): + return self._run_cmd("gets", key, *args, **kwargs) @_retry.retry() def _add_group_to_group_list(self, group_id): @@ -320,17 +370,17 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers, :param group_id: The group id """ - group_list, cas = self.client.gets(self.GROUP_LIST_KEY) + group_list, cas = self.gets(self.GROUP_LIST_KEY) if cas: group_list = set(group_list) group_list.add(group_id) - if not self.client.cas(self.GROUP_LIST_KEY, - list(group_list), cas): + if not self.cas(self.GROUP_LIST_KEY, + list(group_list), cas): # Someone updated the group list before us, try again! raise _retry.TryAgain else: - if not self.client.add(self.GROUP_LIST_KEY, - [group_id], noreply=False): + if not self.add(self.GROUP_LIST_KEY, + [group_id], noreply=False): # Someone updated the group list before us, try again! raise _retry.TryAgain @@ -340,11 +390,11 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers, :param group_id: The group id """ - group_list, cas = self.client.gets(self.GROUP_LIST_KEY) + group_list, cas = self.gets(self.GROUP_LIST_KEY) group_list = set(group_list) group_list.remove(group_id) - if not self.client.cas(self.GROUP_LIST_KEY, - list(group_list), cas): + if not self.cas(self.GROUP_LIST_KEY, + list(group_list), cas): # Someone updated the group list before us, try again! raise _retry.TryAgain @@ -353,7 +403,7 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers, @_translate_failures def _create_group(): - if not self.client.add(encoded_group, {}, noreply=False): + if not self.add(encoded_group, {}, noreply=False): raise coordination.GroupAlreadyExist(group_id) self._add_group_to_group_list(group_id) @@ -363,7 +413,7 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers, @_translate_failures def _get_groups(): - return self.client.get(self.GROUP_LIST_KEY) or [] + return self.get(self.GROUP_LIST_KEY) or [] return MemcachedFutureResult(self._executor.submit(_get_groups)) @@ -373,7 +423,7 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers, @_retry.retry() @_translate_failures def _join_group(): - group_members, cas = self.client.gets(encoded_group) + group_members, cas = self.gets(encoded_group) if group_members is None: raise coordination.GroupNotCreated(group_id) if self._member_id in group_members: @@ -382,7 +432,7 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers, group_members[self._member_id] = { b"capabilities": capabilities, } - if not self.client.cas(encoded_group, group_members, cas): + if not self.cas(encoded_group, group_members, cas): # It changed, let's try again raise _retry.TryAgain self._joined_groups.add(group_id) @@ -395,13 +445,13 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers, @_retry.retry() @_translate_failures def _leave_group(): - group_members, cas = self.client.gets(encoded_group) + group_members, cas = self.gets(encoded_group) if group_members is None: raise coordination.GroupNotCreated(group_id) if self._member_id not in group_members: raise coordination.MemberNotJoined(group_id, self._member_id) del group_members[self._member_id] - if not self.client.cas(encoded_group, group_members, cas): + if not self.cas(encoded_group, group_members, cas): # It changed, let's try again raise _retry.TryAgain self._joined_groups.discard(group_id) @@ -409,7 +459,7 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers, return MemcachedFutureResult(self._executor.submit(_leave_group)) def _destroy_group(self, group_id): - self.client.delete(self._encode_group_id(group_id)) + self.delete(self._encode_group_id(group_id)) def delete_group(self, group_id): encoded_group = self._encode_group_id(group_id) @@ -417,16 +467,16 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers, @_retry.retry() @_translate_failures def _delete_group(): - group_members, cas = self.client.gets(encoded_group) + group_members, cas = self.gets(encoded_group) if group_members is None: raise coordination.GroupNotCreated(group_id) if group_members != {}: raise coordination.GroupNotEmpty(group_id) # Delete is not atomic, so we first set the group to # using CAS, and then we delete it, to avoid race conditions. - if not self.client.cas(encoded_group, None, cas): + if not self.cas(encoded_group, None, cas): raise _retry.TryAgain - self.client.delete(encoded_group) + self.delete(encoded_group) self._remove_from_group_list(group_id) return MemcachedFutureResult(self._executor.submit(_delete_group)) @@ -435,18 +485,18 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers, @_translate_failures def _get_members(self, group_id): encoded_group = self._encode_group_id(group_id) - group_members, cas = self.client.gets(encoded_group) + group_members, cas = self.gets(encoded_group) if group_members is None: raise coordination.GroupNotCreated(group_id) actual_group_members = {} for m, v in group_members.items(): # Never kick self from the group, we know we're alive if (m == self._member_id or - self.client.get(self._encode_member_id(m))): + self.get(self._encode_member_id(m))): actual_group_members[m] = v if group_members != actual_group_members: # There are some dead members, update the group - if not self.client.cas(encoded_group, actual_group_members, cas): + if not self.cas(encoded_group, actual_group_members, cas): # It changed, let's try again raise _retry.TryAgain return actual_group_members @@ -475,13 +525,13 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers, @_retry.retry() @_translate_failures def _update_capabilities(): - group_members, cas = self.client.gets(encoded_group) + group_members, cas = self.gets(encoded_group) if group_members is None: raise coordination.GroupNotCreated(group_id) if self._member_id not in group_members: raise coordination.MemberNotJoined(group_id, self._member_id) group_members[self._member_id][b'capabilities'] = capabilities - if not self.client.cas(encoded_group, group_members, cas): + if not self.cas(encoded_group, group_members, cas): # It changed, try again raise _retry.TryAgain @@ -497,9 +547,9 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers, @_translate_failures def heartbeat(self): - self.client.set(self._encode_member_id(self._member_id), - self.STILL_ALIVE, - expire=self.membership_timeout) + self.set(self._encode_member_id(self._member_id), + self.STILL_ALIVE, + expire=self.membership_timeout) # Reset the acquired locks for lock in self._acquired_locks: lock.heartbeat()