get weight of existing members
partitioner does not capture the weight of existing members so members do not have the same weights for each other depending on when they joined Change-Id: Ibb2a9ed92d2b38c4ab888133cf66c14cf8a5a052
This commit is contained in:
parent
f624e16f62
commit
3994c470d3
|
@ -35,14 +35,17 @@ class Partitioner(object):
|
|||
|
||||
def __init__(self, coordinator, group_id,
|
||||
partitions=DEFAULT_PARTITION_NUMBER):
|
||||
members = coordinator.get_members(group_id)
|
||||
self.partitions = partitions
|
||||
self.group_id = group_id
|
||||
self._coord = coordinator
|
||||
caps = [(m, self._coord.get_member_capabilities(self.group_id, m))
|
||||
for m in members.get()]
|
||||
self._coord.watch_join_group(self.group_id, self._on_member_join)
|
||||
self._coord.watch_leave_group(self.group_id, self._on_member_leave)
|
||||
members = self._coord.get_members(self.group_id)
|
||||
self.ring = hashring.HashRing(members.get(),
|
||||
partitions=self.partitions)
|
||||
self.ring = hashring.HashRing([], partitions=self.partitions)
|
||||
for m_id, cap in caps:
|
||||
self.ring.add_node(m_id, utils.loads(cap.get()).get("weight", 1))
|
||||
|
||||
def _on_member_join(self, event):
|
||||
try:
|
||||
|
|
|
@ -30,12 +30,12 @@ class TestPartitioner(tests.TestWithCoordinator):
|
|||
c.stop()
|
||||
super(TestPartitioner, self).tearDown()
|
||||
|
||||
def _add_members(self, number_of_members):
|
||||
def _add_members(self, number_of_members, weight=1):
|
||||
for _ in six.moves.range(number_of_members):
|
||||
m = tests.get_random_uuid()
|
||||
coord = coordination.get_coordinator(self.url, m)
|
||||
coord.start()
|
||||
coord.join_partitioned_group(self.group_id)
|
||||
coord.join_partitioned_group(self.group_id, weight=weight)
|
||||
self._extra_coords.append(coord)
|
||||
self._coord.run_watchers()
|
||||
|
||||
|
@ -60,6 +60,19 @@ class TestPartitioner(tests.TestWithCoordinator):
|
|||
self.assertEqual(1, len(p.ring.nodes))
|
||||
p.stop()
|
||||
|
||||
def test_hashring_weight(self):
|
||||
p = self._coord.join_partitioned_group(self.group_id, weight=5)
|
||||
self.assertEqual([5], list(p.ring.nodes.values()))
|
||||
coord = coordination.get_coordinator(self.url, tests.get_random_uuid())
|
||||
coord.start()
|
||||
p2 = coord.join_partitioned_group(self.group_id, weight=10)
|
||||
self._extra_coords.append(coord)
|
||||
self._coord.run_watchers()
|
||||
self.assertEqual(set([5, 10]), set(p.ring.nodes.values()))
|
||||
self.assertEqual(set([5, 10]), set(p2.ring.nodes.values()))
|
||||
p.stop()
|
||||
p2.stop()
|
||||
|
||||
def test_stop(self):
|
||||
p = self._coord.join_partitioned_group(self.group_id)
|
||||
p.stop()
|
||||
|
|
Loading…
Reference in New Issue