Merge "memcached: switch leader election implementation to a lock"
This commit is contained in:
commit
0525da6acf
|
@ -71,6 +71,9 @@ class MemcachedLock(locking.Lock):
|
|||
self.coord.client.touch(self.name,
|
||||
expire=self.timeout)
|
||||
|
||||
def get_owner(self):
|
||||
return self.coord.client.get(self.name)
|
||||
|
||||
|
||||
class MemcachedDriver(coordination.CoordinationDriver):
|
||||
|
||||
|
@ -123,13 +126,12 @@ class MemcachedDriver(coordination.CoordinationDriver):
|
|||
self.heartbeat()
|
||||
|
||||
def stop(self):
|
||||
for lock in self._acquired_locks:
|
||||
lock.release()
|
||||
|
||||
self.client.delete(self._encode_member_id(self._member_id))
|
||||
map(self.leave_group, list(self._groups))
|
||||
|
||||
for group_id in six.iterkeys(self._hooks_elected_leader):
|
||||
if self.get_leader(group_id).get() == self._member_id:
|
||||
self.client.delete(self._encode_group_leader(group_id))
|
||||
|
||||
self.client.close()
|
||||
|
||||
def _encode_group_id(self, group_id):
|
||||
|
@ -255,7 +257,7 @@ class MemcachedDriver(coordination.CoordinationDriver):
|
|||
|
||||
def get_leader(self, group_id):
|
||||
return MemcachedAsyncResult(
|
||||
self.client.get(self._encode_group_leader(group_id)))
|
||||
self._get_leader_lock(group_id).get_owner())
|
||||
|
||||
def heartbeat(self):
|
||||
self.client.set(self._encode_member_id(self._member_id),
|
||||
|
@ -265,11 +267,6 @@ class MemcachedDriver(coordination.CoordinationDriver):
|
|||
for lock in self._acquired_locks:
|
||||
lock.heartbeat()
|
||||
|
||||
for group_id in six.iterkeys(self._hooks_elected_leader):
|
||||
if self.get_leader(group_id).get() == self._member_id:
|
||||
self.client.touch(self._encode_group_leader(group_id),
|
||||
expire=self.leader_timeout)
|
||||
|
||||
def _init_watch_group(self, group_id):
|
||||
members = self.client.get(self._encode_group_id(group_id))
|
||||
if members is None:
|
||||
|
@ -307,6 +304,10 @@ class MemcachedDriver(coordination.CoordinationDriver):
|
|||
def get_lock(self, name):
|
||||
return MemcachedLock(self, name, self.lock_timeout)
|
||||
|
||||
def _get_leader_lock(self, group_id):
|
||||
return MemcachedLock(self, self._encode_group_leader(group_id),
|
||||
self.leader_timeout)
|
||||
|
||||
def run_watchers(self):
|
||||
result = []
|
||||
for group_id in self.client.get(self._GROUP_LIST_KEY):
|
||||
|
@ -328,18 +329,14 @@ class MemcachedDriver(coordination.CoordinationDriver):
|
|||
|
||||
self._group_members[group_id] = group_members
|
||||
|
||||
for group_id in six.iterkeys(self._hooks_elected_leader):
|
||||
lock_id = self._encode_group_leader(group_id)
|
||||
for group_id, hooks in six.iteritems(self._hooks_elected_leader):
|
||||
# Try to grab the lock, if that fails, that means someone has it
|
||||
# already.
|
||||
if self.client.add(lock_id, self._member_id,
|
||||
expire=self.leader_timeout,
|
||||
noreply=False):
|
||||
if self._get_leader_lock(group_id).acquire(blocking=False):
|
||||
# We got the lock
|
||||
self._hooks_elected_leader[group_id].run(
|
||||
coordination.LeaderElected(
|
||||
group_id,
|
||||
self._member_id))
|
||||
hooks.run(coordination.LeaderElected(
|
||||
group_id,
|
||||
self._member_id))
|
||||
|
||||
return result
|
||||
|
||||
|
|
Loading…
Reference in New Issue