Merge "Add weight support to the hashring"
This commit is contained in:
@@ -37,33 +37,42 @@ class HashRing(object):
|
|||||||
:param nodes: List of nodes where objects will be mapped onto.
|
:param nodes: List of nodes where objects will be mapped onto.
|
||||||
:param partitions: Number of partitions to spread objects onto.
|
:param partitions: Number of partitions to spread objects onto.
|
||||||
"""
|
"""
|
||||||
self.nodes = set()
|
self.nodes = {}
|
||||||
self._ring = dict()
|
self._ring = dict()
|
||||||
self._partitions = []
|
self._partitions = []
|
||||||
self._partition_number = partitions
|
self._partition_number = partitions
|
||||||
|
|
||||||
self.add_nodes(set(nodes))
|
self.add_nodes(set(nodes))
|
||||||
|
|
||||||
def add_node(self, node):
|
def add_node(self, node, weight=1):
|
||||||
"""Add a node to the hashring.
|
"""Add a node to the hashring.
|
||||||
|
|
||||||
:param node: Node to add.
|
:param node: Node to add.
|
||||||
"""
|
:param weight: How many resource instances this node should manage
|
||||||
return self.add_nodes((node,))
|
compared to the other nodes (default 1). Higher weights will be
|
||||||
|
assigned more resources. Three nodes A, B and C with weights 1, 2 and 3
|
||||||
|
will each handle 1/6, 1/3 and 1/2 of the resources, respectively.
|
||||||
|
|
||||||
def add_nodes(self, nodes):
|
"""
|
||||||
|
return self.add_nodes((node,), weight)
|
||||||
|
|
||||||
|
def add_nodes(self, nodes, weight=1):
|
||||||
"""Add nodes to the hashring.
|
"""Add nodes to the hashring.
|
||||||
|
|
||||||
:param nodes: Nodes to add.
|
:param nodes: Nodes to add.
|
||||||
|
:param weight: How many resource instances this node should manage
|
||||||
|
compared to the other nodes (default 1). Higher weights will be
|
||||||
|
assigned more resources. Three nodes A, B and C with weights 1, 2 and 3
|
||||||
|
will each handle 1/6, 1/3 and 1/2 of the resources, respectively.
|
||||||
"""
|
"""
|
||||||
for node in nodes:
|
for node in nodes:
|
||||||
key = str(node).encode('utf-8')
|
key = str(node).encode('utf-8')
|
||||||
key_hash = hashlib.md5(key)
|
key_hash = hashlib.md5(key)
|
||||||
for r in six.moves.range(self._partition_number):
|
for r in six.moves.range(self._partition_number * weight):
|
||||||
key_hash.update(key)
|
key_hash.update(key)
|
||||||
self._ring[self._hash2int(key_hash)] = node
|
self._ring[self._hash2int(key_hash)] = node
|
||||||
|
|
||||||
self.nodes.add(node)
|
self.nodes[node] = weight
|
||||||
|
|
||||||
self._partitions = sorted(self._ring.keys())
|
self._partitions = sorted(self._ring.keys())
|
||||||
|
|
||||||
@@ -75,13 +84,13 @@ class HashRing(object):
|
|||||||
:param node: Node to remove.
|
:param node: Node to remove.
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
self.nodes.remove(node)
|
weight = self.nodes.pop(node)
|
||||||
except KeyError:
|
except KeyError:
|
||||||
raise UnknownNode(node)
|
raise UnknownNode(node)
|
||||||
|
|
||||||
key = str(node).encode('utf-8')
|
key = str(node).encode('utf-8')
|
||||||
key_hash = hashlib.md5(key)
|
key_hash = hashlib.md5(key)
|
||||||
for r in six.moves.range(self._partition_number):
|
for r in six.moves.range(self._partition_number * weight):
|
||||||
key_hash.update(key)
|
key_hash.update(key)
|
||||||
del self._ring[self._hash2int(key_hash)]
|
del self._ring[self._hash2int(key_hash)]
|
||||||
|
|
||||||
@@ -110,7 +119,7 @@ class HashRing(object):
|
|||||||
partition = self._get_partition(data)
|
partition = self._get_partition(data)
|
||||||
|
|
||||||
ignore_nodes = set(ignore_nodes) if ignore_nodes else set()
|
ignore_nodes = set(ignore_nodes) if ignore_nodes else set()
|
||||||
candidates = self.nodes - ignore_nodes
|
candidates = set(self.nodes.keys()) - ignore_nodes
|
||||||
|
|
||||||
replicas = min(replicas, len(candidates))
|
replicas = min(replicas, len(candidates))
|
||||||
|
|
||||||
|
|||||||
@@ -46,29 +46,39 @@ class HashRingTestCase(testcase.TestCase):
|
|||||||
self.assertIn(int(r2, 16), ring._ring)
|
self.assertIn(int(r2, 16), ring._ring)
|
||||||
|
|
||||||
def test_create_ring(self):
|
def test_create_ring(self):
|
||||||
nodes = ['foo', 'bar']
|
nodes = {'foo', 'bar'}
|
||||||
ring = hashring.HashRing(nodes)
|
ring = hashring.HashRing(nodes)
|
||||||
self.assertEqual(set(nodes), ring.nodes)
|
self.assertEqual(nodes, set(ring.nodes.keys()))
|
||||||
self.assertEqual(2 ** 5 * 2, len(ring))
|
self.assertEqual(2 ** 5 * 2, len(ring))
|
||||||
|
|
||||||
def test_add_node(self):
|
def test_add_node(self):
|
||||||
nodes = ['foo', 'bar']
|
nodes = {'foo', 'bar'}
|
||||||
ring = hashring.HashRing(nodes)
|
ring = hashring.HashRing(nodes)
|
||||||
self.assertEqual(set(nodes), ring.nodes)
|
self.assertEqual(nodes, set(ring.nodes.keys()))
|
||||||
self.assertEqual(2 ** 5 * len(nodes), len(ring))
|
self.assertEqual(2 ** 5 * len(nodes), len(ring))
|
||||||
nodes.append('baz')
|
nodes.add('baz')
|
||||||
ring.add_node('baz')
|
ring.add_node('baz')
|
||||||
self.assertEqual(set(nodes), ring.nodes)
|
self.assertEqual(nodes, set(ring.nodes.keys()))
|
||||||
self.assertEqual(2 ** 5 * len(nodes), len(ring))
|
self.assertEqual(2 ** 5 * len(nodes), len(ring))
|
||||||
|
|
||||||
def test_remove_node(self):
|
def test_add_node_weight(self):
|
||||||
nodes = ['foo', 'bar']
|
nodes = {'foo', 'bar'}
|
||||||
ring = hashring.HashRing(nodes)
|
ring = hashring.HashRing(nodes)
|
||||||
self.assertEqual(set(nodes), ring.nodes)
|
self.assertEqual(nodes, set(ring.nodes.keys()))
|
||||||
self.assertEqual(2 ** 5 * len(nodes), len(ring))
|
self.assertEqual(2 ** 5 * len(nodes), len(ring))
|
||||||
nodes.remove('bar')
|
nodes.add('baz')
|
||||||
|
ring.add_node('baz', weight=10)
|
||||||
|
self.assertEqual(nodes, set(ring.nodes.keys()))
|
||||||
|
self.assertEqual(2 ** 5 * 12, len(ring))
|
||||||
|
|
||||||
|
def test_remove_node(self):
|
||||||
|
nodes = {'foo', 'bar'}
|
||||||
|
ring = hashring.HashRing(nodes)
|
||||||
|
self.assertEqual(nodes, set(ring.nodes.keys()))
|
||||||
|
self.assertEqual(2 ** 5 * len(nodes), len(ring))
|
||||||
|
nodes.discard('bar')
|
||||||
ring.remove_node('bar')
|
ring.remove_node('bar')
|
||||||
self.assertEqual(set(nodes), ring.nodes)
|
self.assertEqual(nodes, set(ring.nodes.keys()))
|
||||||
self.assertEqual(2 ** 5 * len(nodes), len(ring))
|
self.assertEqual(2 ** 5 * len(nodes), len(ring))
|
||||||
|
|
||||||
def test_remove_node_unknown(self):
|
def test_remove_node_unknown(self):
|
||||||
@@ -79,17 +89,17 @@ class HashRingTestCase(testcase.TestCase):
|
|||||||
ring.remove_node, 'biz')
|
ring.remove_node, 'biz')
|
||||||
|
|
||||||
def test_add_then_removenode(self):
|
def test_add_then_removenode(self):
|
||||||
nodes = ['foo', 'bar']
|
nodes = {'foo', 'bar'}
|
||||||
ring = hashring.HashRing(nodes)
|
ring = hashring.HashRing(nodes)
|
||||||
self.assertEqual(set(nodes), ring.nodes)
|
self.assertEqual(nodes, set(ring.nodes.keys()))
|
||||||
self.assertEqual(2 ** 5 * len(nodes), len(ring))
|
self.assertEqual(2 ** 5 * len(nodes), len(ring))
|
||||||
nodes.append('baz')
|
nodes.add('baz')
|
||||||
ring.add_node('baz')
|
ring.add_node('baz')
|
||||||
self.assertEqual(set(nodes), ring.nodes)
|
self.assertEqual(nodes, set(ring.nodes.keys()))
|
||||||
self.assertEqual(2 ** 5 * len(nodes), len(ring))
|
self.assertEqual(2 ** 5 * len(nodes), len(ring))
|
||||||
nodes.remove('bar')
|
nodes.discard('bar')
|
||||||
ring.remove_node('bar')
|
ring.remove_node('bar')
|
||||||
self.assertEqual(set(nodes), ring.nodes)
|
self.assertEqual(nodes, set(ring.nodes.keys()))
|
||||||
self.assertEqual(2 ** 5 * len(nodes), len(ring))
|
self.assertEqual(2 ** 5 * len(nodes), len(ring))
|
||||||
|
|
||||||
def test_distribution_one_replica(self):
|
def test_distribution_one_replica(self):
|
||||||
|
|||||||
Reference in New Issue
Block a user