coordination, zookeeper: add get_leader()

Change-Id: I6b62057221b0a382c5605d730c439054e9bb1649
Co-Authored-By: Sahid Ferdjaoui <sahid.ferdjaoui@cloudwatt.com>
This commit is contained in:
Julien Danjou 2014-04-01 13:36:46 +02:00
parent dda34fac81
commit d991188d7c
3 changed files with 48 additions and 5 deletions

View File

@ -257,6 +257,16 @@ class CoordinationDriver(object):
:rtype: CoordAsyncResult
"""
@staticmethod
def get_leader(group_id):
"""Return the leader for a group.
:param group_id: the id of the group:
:returns: the leader
:rtype: CoordAsyncResult
"""
raise NotImplementedError
@staticmethod
def heartbeat():
"""Method to run once in a while to be sure that the member is not dead

View File

@ -302,6 +302,21 @@ class KazooDriver(BaseZooKeeperDriver):
return True
return False
def _get_group_leader_lock(self, group_id):
if group_id not in self._leader_locks:
self._leader_locks[group_id] = self._coord.Lock(
self._path_group(group_id) + "/leader",
self._member_id.decode('ascii'))
return self._leader_locks[group_id]
def get_leader(self, group_id):
contenders = self._get_group_leader_lock(group_id).contenders()
if contenders and contenders[0]:
leader = contenders[0].encode('ascii')
else:
leader = None
return ZooAsyncResult(None, lambda *args: leader)
def run_watchers(self):
ret = []
while True:
@ -312,11 +327,7 @@ class KazooDriver(BaseZooKeeperDriver):
ret.extend(cb())
for group_id in six.iterkeys(self._hooks_elected_leader):
if group_id not in self._leader_locks:
self._leader_locks[group_id] = self._coord.Lock(
self._path_group(group_id))
if self._leader_locks[group_id].acquire(blocking=False):
if self._get_group_leader_lock(group_id).acquire(blocking=False):
# We are now leader for this group
self._hooks_elected_leader[group_id].run(
coordination.LeaderElected(

View File

@ -378,6 +378,8 @@ class TestAPI(testscenarios.TestWithScenarios,
self.event.member_id)
self.assertEqual(self.group_id,
self.event.group_id)
self.assertEqual(self._coord.get_leader(self.group_id).get(),
self.member_id)
self.event = None
@ -390,6 +392,26 @@ class TestAPI(testscenarios.TestWithScenarios,
self.event.member_id)
self.assertEqual(self.group_id,
self.event.group_id)
self.assertEqual(client2.get_leader(self.group_id).get(),
member_id_test2)
def test_get_leader(self):
self._coord.create_group(self.group_id).get()
leader = self._coord.get_leader(self.group_id).get()
self.assertEqual(leader, None)
self._coord.join_group(self.group_id).get()
leader = self._coord.get_leader(self.group_id).get()
self.assertEqual(leader, None)
# Let's get elected
self._coord.watch_elected_as_leader(self.group_id, self._set_event)
self._coord.run_watchers()
leader = self._coord.get_leader(self.group_id).get()
self.assertEqual(leader, self.member_id)
def test_run_for_election_multiple_clients_stand_down(self):
self._coord.create_group(self.group_id).get()