diff --git a/tooz/drivers/redis.py b/tooz/drivers/redis.py index 54da9d0e..bc7e6453 100644 --- a/tooz/drivers/redis.py +++ b/tooz/drivers/redis.py @@ -611,6 +611,15 @@ class RedisDriver(coordination.CoordinationDriver): group_members = set() else: group_members = set(group_members) + # I was booted out... + # + # TODO(harlowja): perhaps we should have a way to notify + # watchers that this has happened (the below mechanism will + # also do this, but it might be better to have a separate + # way when 'self' membership is lost)? + if (group_id in self._joined_groups and + self._member_id not in group_members): + self._joined_groups.discard(group_id) old_group_members = self._group_members.get(group_id, set()) for member_id in (group_members - old_group_members): result.extend( diff --git a/tooz/drivers/zookeeper.py b/tooz/drivers/zookeeper.py index a15a0d65..a48cfda2 100644 --- a/tooz/drivers/zookeeper.py +++ b/tooz/drivers/zookeeper.py @@ -266,7 +266,7 @@ class KazooDriver(BaseZooKeeperDriver): lambda: hooks.run( coordination.MemberJoinedGroup( group_id, - member_id.encode('ascii')))) + utils.to_binary(member_id)))) for member_id in (last_children - children): # Copy function in case it's removed later from the @@ -276,7 +276,7 @@ class KazooDriver(BaseZooKeeperDriver): lambda: hooks.run( coordination.MemberLeftGroup( group_id, - member_id.encode('ascii')))) + utils.to_binary(member_id)))) self._group_members[group_id] = children diff --git a/tooz/tests/test_coordination.py b/tooz/tests/test_coordination.py index ace3093e..8e996df6 100644 --- a/tooz/tests/test_coordination.py +++ b/tooz/tests/test_coordination.py @@ -373,6 +373,34 @@ class TestAPI(testscenarios.TestWithScenarios, lambda: None) self.assertEqual(0, len(self._coord._hooks_join_group[self.group_id])) + def test_watch_join_group_booted_out(self): + self._coord.create_group(self.group_id).get() + self._coord.join_group(self.group_id).get() + self._coord.watch_join_group(self.group_id, self._set_event) + self._coord.watch_leave_group(self.group_id, self._set_event) + + member_id_test2 = self._get_random_uuid() + client2 = tooz.coordination.get_coordinator(self.url, + member_id_test2) + client2.start() + client2.join_group(self.group_id).get() + + while True: + if self._coord.run_watchers(): + break + + client3 = tooz.coordination.get_coordinator(self.url, self.member_id) + client3.start() + client3.leave_group(self.group_id) + + # Only works for clients that have access to the groups they are part + # of, to ensure that after we got booted out by client3 that this + # client now no longer believes its part of the group. + if hasattr(self._coord, '_joined_groups'): + self.assertIn(self.group_id, self._coord._joined_groups) + self._coord.run_watchers() + self.assertNotIn(self.group_id, self._coord._joined_groups) + def test_watch_leave_group_non_existent(self): self.assertRaises(tooz.coordination.GroupNotCreated, self._coord.watch_leave_group, diff --git a/tooz/utils.py b/tooz/utils.py index 8720d7d8..3cc11d41 100644 --- a/tooz/utils.py +++ b/tooz/utils.py @@ -23,3 +23,10 @@ def exception_message(exc): return six.text_type(exc) except UnicodeError: return str(exc) + + +def to_binary(text, encoding='ascii'): + """Return the binary representation of string (if not already binary).""" + if not isinstance(text, six.binary_type): + text = text.encode(encoding) + return text