coordination, zookeeper: implement leader election

Change-Id: Ic3d1934b87be37ba56f744abd045adb634bcdb08
This commit is contained in:
Julien Danjou 2014-04-01 17:28:38 +02:00
parent 3123c5a1fe
commit dda34fac81
4 changed files with 183 additions and 4 deletions

View File

@ -48,12 +48,21 @@ class MemberLeftGroup(Event):
self.member_id = member_id
class LeaderElected(Event):
"""A leader as been elected."""
def __init__(self, group_id, member_id):
self.group_id = group_id
self.member_id = member_id
@six.add_metaclass(abc.ABCMeta)
class CoordinationDriver(object):
def __init__(self):
self._hooks_join_group = collections.defaultdict(Hooks)
self._hooks_leave_group = collections.defaultdict(Hooks)
self._hooks_elected_leader = collections.defaultdict(Hooks)
# A cache for group members
self._group_members = collections.defaultdict(set)
@ -118,6 +127,44 @@ class CoordinationDriver(object):
and group_id in self._group_members):
del self._group_members[group_id]
@abc.abstractmethod
def watch_elected_as_leader(self, group_id, callback):
"""Call a function when member gets elected as leader.
The callback functions will be executed when `run_watchers` is
called.
:param group_id: The group id to watch
:param callback: The function to execute when a member leaves this
group
"""
self._hooks_elected_leader[group_id].append(callback)
@abc.abstractmethod
def unwatch_elected_as_leader(self, group_id, callback):
"""Call a function when member gets elected as leader.
The callback functions will be executed when `run_watchers` is
called.
:param group_id: The group id to watch
:param callback: The function to execute when a member leaves this
group
"""
self._hooks_elected_leader[group_id].remove(callback)
if not self._hooks.elected_leader[group_id]:
del self._hooks.elected_leader[group_id]
@staticmethod
def stand_down_group_leader(group_id):
"""Stand down as the group leader if we are.
:param group_id: The group where we don't want to be a leader anymore
"""
raise NotImplementedError
def start(self, timeout):
"""Start the service engine.

View File

@ -237,6 +237,14 @@ class MemcachedDriver(coordination.CoordinationDriver):
return super(MemcachedDriver, self).unwatch_leave_group(
group_id, callback)
@staticmethod
def watch_elected_as_leader(group_id, callback):
raise NotImplementedError
@staticmethod
def unwatch_elected_as_leader(group_id, callback):
raise NotImplementedError
def run_watchers(self):
result = []
for group_id in self.client.get(self._GROUP_LIST_KEY):

View File

@ -42,7 +42,8 @@ class BaseZooKeeperDriver(coordination.CoordinationDriver):
raise coordination.ToozError("operation error: %s" % (e))
self._group_members = collections.defaultdict(set)
self._children_changes = six.moves.queue.Queue()
self._watchers = six.moves.queue.Queue()
self._leader_locks = {}
def stop(self):
self._coord.stop()
@ -216,7 +217,7 @@ class KazooDriver(BaseZooKeeperDriver):
# Copy function in case it's removed later from the
# hook list
hooks = copy.copy(self._hooks_join_group[group_id])
self._children_changes.put(
self._watchers.put(
lambda: hooks.run(
coordination.MemberJoinedGroup(
group_id,
@ -226,7 +227,7 @@ class KazooDriver(BaseZooKeeperDriver):
# Copy function in case it's removed later from the
# hook list
hooks = copy.copy(self._hooks_leave_group[group_id])
self._children_changes.put(
self._watchers.put(
lambda: hooks.run(
coordination.MemberLeftGroup(
group_id,
@ -287,14 +288,41 @@ class KazooDriver(BaseZooKeeperDriver):
return super(BaseZooKeeperDriver, self).unwatch_leave_group(
group_id, callback)
def watch_elected_as_leader(self, group_id, callback):
return super(BaseZooKeeperDriver, self).watch_elected_as_leader(
group_id, callback)
def unwatch_elected_as_leader(self, group_id, callback):
return super(BaseZooKeeperDriver, self).unwatch_elected_as_leader(
group_id, callback)
def stand_down_group_leader(self, group_id):
if group_id in self._leader_locks:
self._leader_locks[group_id].release()
return True
return False
def run_watchers(self):
ret = []
while True:
try:
cb = self._children_changes.get(block=False)
cb = self._watchers.get(block=False)
except six.moves.queue.Empty:
break
ret.extend(cb())
for group_id in six.iterkeys(self._hooks_elected_leader):
if group_id not in self._leader_locks:
self._leader_locks[group_id] = self._coord.Lock(
self._path_group(group_id))
if self._leader_locks[group_id].acquire(blocking=False):
# We are now leader for this group
self._hooks_elected_leader[group_id].run(
coordination.LeaderElected(
group_id,
self._member_id))
return ret
@ -328,6 +356,14 @@ class ZakeDriver(BaseZooKeeperDriver):
def unwatch_leave_group(group_id, callback):
raise NotImplementedError
@staticmethod
def watch_elected_as_leader(group_id, callback):
raise NotImplementedError
@staticmethod
def unwatch_elected_as_leader(group_id, callback):
raise NotImplementedError
@staticmethod
def run_watchers():
raise NotImplementedError

View File

@ -347,6 +347,94 @@ class TestAPI(testscenarios.TestWithScenarios,
lambda: None)
self.assertEqual(0, len(self._coord._hooks_leave_group[self.group_id]))
def test_run_for_election(self):
self._coord.create_group(self.group_id).get()
self._coord.watch_elected_as_leader(self.group_id, self._set_event)
self._coord.run_watchers()
self.assertIsInstance(self.event,
tooz.coordination.LeaderElected)
self.assertEqual(self.member_id,
self.event.member_id)
self.assertEqual(self.group_id,
self.event.group_id)
def test_run_for_election_multiple_clients(self):
self._coord.create_group(self.group_id).get()
self._coord.watch_elected_as_leader(self.group_id, self._set_event)
self._coord.run_watchers()
member_id_test2 = self._get_random_uuid()
client2 = tooz.coordination.get_coordinator(self.backend,
member_id_test2,
**self.kwargs)
client2.start()
client2.watch_elected_as_leader(self.group_id, self._set_event)
client2.run_watchers()
self.assertIsInstance(self.event,
tooz.coordination.LeaderElected)
self.assertEqual(self.member_id,
self.event.member_id)
self.assertEqual(self.group_id,
self.event.group_id)
self.event = None
self._coord.stop()
client2.run_watchers()
self.assertIsInstance(self.event,
tooz.coordination.LeaderElected)
self.assertEqual(member_id_test2,
self.event.member_id)
self.assertEqual(self.group_id,
self.event.group_id)
def test_run_for_election_multiple_clients_stand_down(self):
self._coord.create_group(self.group_id).get()
self._coord.watch_elected_as_leader(self.group_id, self._set_event)
self._coord.run_watchers()
member_id_test2 = self._get_random_uuid()
client2 = tooz.coordination.get_coordinator(self.backend,
member_id_test2,
**self.kwargs)
client2.start()
client2.watch_elected_as_leader(self.group_id, self._set_event)
client2.run_watchers()
self.assertIsInstance(self.event,
tooz.coordination.LeaderElected)
self.assertEqual(self.member_id,
self.event.member_id)
self.assertEqual(self.group_id,
self.event.group_id)
self.event = None
self._coord.stand_down_group_leader(self.group_id)
client2.run_watchers()
self.assertIsInstance(self.event,
tooz.coordination.LeaderElected)
self.assertEqual(member_id_test2,
self.event.member_id)
self.assertEqual(self.group_id,
self.event.group_id)
self.event = None
client2.stand_down_group_leader(self.group_id)
self._coord.run_watchers()
self.assertIsInstance(self.event,
tooz.coordination.LeaderElected)
self.assertEqual(self.member_id,
self.event.member_id)
self.assertEqual(self.group_id,
self.event.group_id)
@staticmethod
def _get_random_uuid():
return str(uuid.uuid4()).encode('ascii')