From 7221c70469db19e620e1d0ae77c25311d8f5d04e Mon Sep 17 00:00:00 2001 From: Julien Danjou Date: Thu, 21 Aug 2014 17:09:56 +0200 Subject: [PATCH] memcached: switch leader election implementation to a lock We reuse our lock code to manage the leader election, a.k.a. eat our own dog food. This also fixes the issue that all lock were not released when stop() was called. Change-Id: Ibbec3e8ac29d55247ec3b1b8941d5a9569b67677 --- tooz/drivers/memcached.py | 35 ++++++++++++++++------------------- 1 file changed, 16 insertions(+), 19 deletions(-) diff --git a/tooz/drivers/memcached.py b/tooz/drivers/memcached.py index 026bd63f..7f24c5ca 100644 --- a/tooz/drivers/memcached.py +++ b/tooz/drivers/memcached.py @@ -71,6 +71,9 @@ class MemcachedLock(locking.Lock): self.coord.client.touch(self.name, expire=self.timeout) + def get_owner(self): + return self.coord.client.get(self.name) + class MemcachedDriver(coordination.CoordinationDriver): @@ -123,13 +126,12 @@ class MemcachedDriver(coordination.CoordinationDriver): self.heartbeat() def stop(self): + for lock in self._acquired_locks: + lock.release() + self.client.delete(self._encode_member_id(self._member_id)) map(self.leave_group, list(self._groups)) - for group_id in six.iterkeys(self._hooks_elected_leader): - if self.get_leader(group_id).get() == self._member_id: - self.client.delete(self._encode_group_leader(group_id)) - self.client.close() def _encode_group_id(self, group_id): @@ -255,7 +257,7 @@ class MemcachedDriver(coordination.CoordinationDriver): def get_leader(self, group_id): return MemcachedAsyncResult( - self.client.get(self._encode_group_leader(group_id))) + self._get_leader_lock(group_id).get_owner()) def heartbeat(self): self.client.set(self._encode_member_id(self._member_id), @@ -265,11 +267,6 @@ class MemcachedDriver(coordination.CoordinationDriver): for lock in self._acquired_locks: lock.heartbeat() - for group_id in six.iterkeys(self._hooks_elected_leader): - if self.get_leader(group_id).get() == self._member_id: - self.client.touch(self._encode_group_leader(group_id), - expire=self.leader_timeout) - def _init_watch_group(self, group_id): members = self.client.get(self._encode_group_id(group_id)) if members is None: @@ -307,6 +304,10 @@ class MemcachedDriver(coordination.CoordinationDriver): def get_lock(self, name): return MemcachedLock(self, name, self.lock_timeout) + def _get_leader_lock(self, group_id): + return MemcachedLock(self, self._encode_group_leader(group_id), + self.leader_timeout) + def run_watchers(self): result = [] for group_id in self.client.get(self._GROUP_LIST_KEY): @@ -328,18 +329,14 @@ class MemcachedDriver(coordination.CoordinationDriver): self._group_members[group_id] = group_members - for group_id in six.iterkeys(self._hooks_elected_leader): - lock_id = self._encode_group_leader(group_id) + for group_id, hooks in six.iteritems(self._hooks_elected_leader): # Try to grab the lock, if that fails, that means someone has it # already. - if self.client.add(lock_id, self._member_id, - expire=self.leader_timeout, - noreply=False): + if self._get_leader_lock(group_id).acquire(blocking=False): # We got the lock - self._hooks_elected_leader[group_id].run( - coordination.LeaderElected( - group_id, - self._member_id)) + hooks.run(coordination.LeaderElected( + group_id, + self._member_id)) return result