Add a hashring implementation
This is a consistent hash ring implementation based on the one that can be found in Nova, Ironic, Ceilometer and Aodh. Change-Id: I5d2f4efcd354a187747fa645482db2029a1a14b7
This commit is contained in:
parent
04866ab9c3
commit
e29ca79ed9
10
doc/source/tutorial/hashring.rst
Normal file
10
doc/source/tutorial/hashring.rst
Normal file
@ -0,0 +1,10 @@
|
||||
===========
|
||||
Hash ring
|
||||
===========
|
||||
|
||||
Tooz provides a consistent hash ring implementation. It can be used to map
|
||||
objects (represented via binary keys) to one or several nodes. When the node
|
||||
list changes, the rebalancing of objects across the ring is kept minimal.
|
||||
|
||||
.. literalinclude:: ../../../examples/hashring.py
|
||||
:language: python
|
@ -12,3 +12,4 @@ use tooz in your application.
|
||||
group_membership
|
||||
leader_election
|
||||
lock
|
||||
hashring
|
||||
|
15
examples/hashring.py
Normal file
15
examples/hashring.py
Normal file
@ -0,0 +1,15 @@
|
||||
from tooz import hashring

# Keep the ring under its own name: rebinding ``hashring`` would shadow the
# imported module and make ``hashring.HashRing`` unreachable afterwards.
ring = hashring.HashRing({'node1', 'node2', 'node3'})

# Keys are bytes; indexing the ring is shorthand for ``get_nodes(key)``.
# Returns set(['node2'])
nodes_for_foo = ring[b'foo']

# Returns set(['node2', 'node3'])
nodes_for_foo_with_replicas = ring.get_nodes(b'foo',
                                             replicas=2)

# Returns set(['node1', 'node3'])
nodes_for_foo_with_replicas = ring.get_nodes(b'foo',
                                             replicas=2,
                                             ignore_nodes={'node2'})
|
3
releasenotes/notes/hashring-0470f9119ef63d49.yaml
Normal file
3
releasenotes/notes/hashring-0470f9119ef63d49.yaml
Normal file
@ -0,0 +1,3 @@
|
||||
---
|
||||
features:
|
||||
- Add `tooz.hashring`, a consistent hash ring implementation.
|
133
tooz/hashring.py
Normal file
133
tooz/hashring.py
Normal file
@ -0,0 +1,133 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
#
|
||||
# Copyright (C) 2016 Red Hat, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import bisect
|
||||
import hashlib
|
||||
|
||||
import six
|
||||
|
||||
import tooz
|
||||
|
||||
|
||||
class UnknownNode(tooz.ToozError):
    """Error signalling that a node is unknown.

    The node that was not recognised is kept on the ``node`` attribute.
    """

    def __init__(self, node):
        message = "Unknown node `%s'" % node
        super(UnknownNode, self).__init__(message)
        self.node = node
|
||||
|
||||
|
||||
class HashRing(object):
    """Map objects onto nodes based on their consistent hash.

    Every node is placed on the ring at ``partitions`` positions, each one
    an MD5 digest (read as an integer) of the node's string form. A key is
    served by the node owning the first position strictly greater than the
    key's own MD5 digest, wrapping to the first position past the end.
    """

    def __init__(self, nodes, partitions=2**5):
        """Create a new hashring.

        :param nodes: Iterable of nodes where objects will be mapped onto.
        :param partitions: Number of ring positions created for each node.
        """
        # Public set of the nodes currently registered on the ring.
        self.nodes = set()
        # Ring position (integer MD5 digest) -> node owning that position.
        self._ring = dict()
        # Sorted ring positions; rebuilt whenever ``_ring`` changes so that
        # lookups can bisect it.
        self._partitions = []
        self._partition_number = partitions

        self.add_nodes(set(nodes))

    def _node_hashes(self, node):
        """Yield the ring positions that belong to ``node``.

        The digest is seeded with the node's UTF-8 encoded string form and
        fed that same value again before every position is read, so the
        sequence only depends on the node and the partition count. Adding
        and removing a node both rely on this single derivation.

        :param node: Node whose positions are wanted.
        """
        key = str(node).encode('utf-8')
        key_hash = hashlib.md5(key)
        for _ in range(self._partition_number):
            key_hash.update(key)
            yield self._hash2int(key_hash)

    def add_node(self, node):
        """Add a node to the hashring.

        :param node: Node to add.
        """
        return self.add_nodes((node,))

    def add_nodes(self, nodes):
        """Add nodes to the hashring.

        :param nodes: Iterable of nodes to add.
        """
        for node in nodes:
            for position in self._node_hashes(node):
                self._ring[position] = node

            self.nodes.add(node)

        self._partitions = sorted(self._ring)

    def remove_node(self, node):
        """Remove a node from the hashring.

        Raises :py:exc:`UnknownNode` if the node is not part of the ring.

        :param node: Node to remove.
        """
        try:
            self.nodes.remove(node)
        except KeyError:
            raise UnknownNode(node)

        # Drop exactly the positions that were created when it was added.
        for position in self._node_hashes(node):
            del self._ring[position]

        self._partitions = sorted(self._ring)

    @staticmethod
    def _hash2int(key):
        """Return the current digest of hash object ``key`` as an integer."""
        return int(key.hexdigest(), 16)

    def _get_partition(self, data):
        """Return the index in ``_partitions`` that serves ``data``.

        :param data: Bytes to hash.
        """
        hashed_key = self._hash2int(hashlib.md5(data))
        position = bisect.bisect(self._partitions, hashed_key)
        # A digest beyond the last position wraps around to the first one.
        return position if position < len(self._partitions) else 0

    def _get_node(self, partition):
        """Return the node owning the position at index ``partition``."""
        return self._ring[self._partitions[partition]]

    def get_nodes(self, data, ignore_nodes=None, replicas=1):
        """Get the set of nodes which the supplied data map onto.

        :param data: A byte identifier to be mapped across the ring.
        :param ignore_nodes: Set of nodes to ignore.
        :param replicas: Number of replicas to use.
        :return: A set of nodes whose length depends on the number of
                 replicas; it is smaller than ``replicas`` when fewer
                 non-ignored nodes exist.
        """
        partition = self._get_partition(data)

        ignore_nodes = set(ignore_nodes) if ignore_nodes else set()
        candidates = self.nodes - ignore_nodes

        # Never ask for more nodes than can actually be selected.
        replicas = min(replicas, len(candidates))

        nodes = set()
        for _ in range(replicas):
            node = self._get_node(partition)
            # Walk the ring, wrapping at the end, past positions owned by
            # nodes that are already selected or explicitly ignored.
            while node in nodes or node in ignore_nodes:
                partition += 1
                if partition >= len(self._partitions):
                    partition = 0
                node = self._get_node(partition)
            nodes.add(node)

        return nodes

    def __getitem__(self, key):
        """Return ``get_nodes(key)`` with the default arguments."""
        return self.get_nodes(key)

    def __len__(self):
        """Return the number of positions currently on the ring."""
        return len(self._partitions)
|
192
tooz/tests/test_hashring.py
Normal file
192
tooz/tests/test_hashring.py
Normal file
@ -0,0 +1,192 @@
|
||||
# Copyright 2013 Hewlett-Packard Development Company, L.P.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import hashlib
|
||||
|
||||
import mock
|
||||
from testtools import matchers
|
||||
from testtools import testcase
|
||||
|
||||
from tooz import hashring
|
||||
|
||||
|
||||
class HashRingTestCase(testcase.TestCase):
    """Unit tests for ``hashring.HashRing``."""

    # NOTE(deva): the mapping used in these tests is as follows:
    #   if nodes = [foo, bar]:
    #       fake -> foo, bar
    #   if nodes = [foo, bar, baz]:
    #       fake -> foo, bar, baz
    #       fake-again -> bar, baz, foo

    @mock.patch.object(hashlib, 'md5', autospec=True)
    def test__hash2int_returns_int(self, mock_md5):
        r1 = 32 * 'a'
        r2 = 32 * 'b'
        # HashRing defaults to 2**5 partitions per node and reads one
        # hexdigest per partition, so each node consumes 32 values here.
        mock_md5.return_value.hexdigest.side_effect = [r1] * 32 + [r2] * 32

        nodes = ['foo', 'bar']
        ring = hashring.HashRing(nodes)

        self.assertIn(int(r1, 16), ring._ring)
        self.assertIn(int(r2, 16), ring._ring)

    def test_create_ring(self):
        nodes = ['foo', 'bar']
        ring = hashring.HashRing(nodes)
        self.assertEqual(set(nodes), ring.nodes)
        self.assertEqual(2 ** 5 * 2, len(ring))

    def test_add_node(self):
        nodes = ['foo', 'bar']
        ring = hashring.HashRing(nodes)
        self.assertEqual(set(nodes), ring.nodes)
        self.assertEqual(2 ** 5 * len(nodes), len(ring))
        nodes.append('baz')
        ring.add_node('baz')
        self.assertEqual(set(nodes), ring.nodes)
        self.assertEqual(2 ** 5 * len(nodes), len(ring))

    def test_remove_node(self):
        nodes = ['foo', 'bar']
        ring = hashring.HashRing(nodes)
        self.assertEqual(set(nodes), ring.nodes)
        self.assertEqual(2 ** 5 * len(nodes), len(ring))
        nodes.remove('bar')
        ring.remove_node('bar')
        self.assertEqual(set(nodes), ring.nodes)
        self.assertEqual(2 ** 5 * len(nodes), len(ring))

    def test_remove_node_unknown(self):
        nodes = ['foo', 'bar']
        ring = hashring.HashRing(nodes)
        self.assertRaises(
            hashring.UnknownNode,
            ring.remove_node, 'biz')

    def test_add_then_removenode(self):
        nodes = ['foo', 'bar']
        ring = hashring.HashRing(nodes)
        self.assertEqual(set(nodes), ring.nodes)
        self.assertEqual(2 ** 5 * len(nodes), len(ring))
        nodes.append('baz')
        ring.add_node('baz')
        self.assertEqual(set(nodes), ring.nodes)
        self.assertEqual(2 ** 5 * len(nodes), len(ring))
        nodes.remove('bar')
        ring.remove_node('bar')
        self.assertEqual(set(nodes), ring.nodes)
        self.assertEqual(2 ** 5 * len(nodes), len(ring))

    def test_distribution_one_replica(self):
        nodes = ['foo', 'bar', 'baz']
        ring = hashring.HashRing(nodes)
        fake_1_nodes = ring.get_nodes(b'fake')
        fake_2_nodes = ring.get_nodes(b'fake-again')
        # We should have one node for each key
        self.assertEqual(1, len(fake_1_nodes))
        self.assertEqual(1, len(fake_2_nodes))
        # And they must not be the same answers even on this simple data.
        self.assertNotEqual(fake_1_nodes, fake_2_nodes)

    def test_distribution_more_replica(self):
        nodes = ['foo', 'bar', 'baz']
        ring = hashring.HashRing(nodes)
        fake_1_nodes = ring.get_nodes(b'fake', replicas=2)
        fake_2_nodes = ring.get_nodes(b'fake-again', replicas=2)
        # We should have two nodes for each key
        self.assertEqual(2, len(fake_1_nodes))
        self.assertEqual(2, len(fake_2_nodes))
        fake_1_nodes = ring.get_nodes(b'fake', replicas=3)
        fake_2_nodes = ring.get_nodes(b'fake-again', replicas=3)
        # With three replicas on a three-node ring every key gets all nodes
        self.assertEqual(3, len(fake_1_nodes))
        self.assertEqual(3, len(fake_2_nodes))
        self.assertEqual(fake_1_nodes, fake_2_nodes)

    def test_ignore_nodes(self):
        nodes = ['foo', 'bar', 'baz']
        ring = hashring.HashRing(nodes)
        equals_bar_or_baz = matchers.MatchesAny(
            matchers.Equals({'bar'}),
            matchers.Equals({'baz'}))
        self.assertThat(
            ring.get_nodes(b'fake', ignore_nodes=['foo']),
            equals_bar_or_baz)
        self.assertThat(
            ring.get_nodes(b'fake', ignore_nodes=['foo', 'bar']),
            equals_bar_or_baz)
        self.assertEqual(set(), ring.get_nodes(b'fake', ignore_nodes=nodes))

    @staticmethod
    def _compare_rings(nodes, conductors, ring, new_conductors, new_ring):
        """Return the objects whose owner differs between two rings.

        :param nodes: Object names (text) looked up in both rings.
        :param conductors: Unused.
        :param ring: Ring giving the original owner of each object.
        :param new_conductors: Unused.
        :param new_ring: Ring giving the new owner of each object.
        :return: Dict mapping every moved object to ``(old, new)`` owners.
        """
        delta = {}
        # Key both mappings by the object itself. A constant key would
        # collapse each dict to a single entry and cap the delta at one,
        # making the rebalance assertions below meaningless.
        mapping = {
            node: list(ring.get_nodes(node.encode('ascii')))[0]
            for node in nodes
        }
        new_mapping = {
            node: list(new_ring.get_nodes(node.encode('ascii')))[0]
            for node in nodes
        }

        for key, old in mapping.items():
            new = new_mapping.get(key, None)
            if new != old:
                delta[key] = (old, new)
        return delta

    def test_rebalance_stability_join(self):
        num_services = 10
        num_nodes = 10000
        # Adding 1 service to a set of N should move 1/(N+1) of all nodes
        # Eg, for a cluster of 10 nodes, adding one should move 1/11, or 9%
        # We allow for 1/N to allow for rounding in tests.
        # NOTE(review): this bound is only meaningful with per-object
        # mappings in _compare_rings; confirm it holds with the default
        # 32 partitions per service.
        redistribution_factor = 1.0 / num_services

        nodes = [str(x) for x in range(num_nodes)]
        services = [str(x) for x in range(num_services)]
        new_services = services + ['new']
        delta = self._compare_rings(
            nodes, services, hashring.HashRing(services),
            new_services, hashring.HashRing(new_services))

        self.assertLess(len(delta), num_nodes * redistribution_factor)

    def test_rebalance_stability_leave(self):
        num_services = 10
        num_nodes = 10000
        # Removing 1 service from a set of N should move 1/(N) of all nodes
        # Eg, for a cluster of 10 nodes, removing one should move 1/10, or 10%
        # We allow for 1/(N-1) to allow for rounding in tests.
        # NOTE(review): this bound is only meaningful with per-object
        # mappings in _compare_rings; confirm it holds with the default
        # 32 partitions per service.
        redistribution_factor = 1.0 / (num_services - 1)

        nodes = [str(x) for x in range(num_nodes)]
        services = [str(x) for x in range(num_services)]
        new_services = services[:]
        new_services.pop()
        delta = self._compare_rings(
            nodes, services, hashring.HashRing(services),
            new_services, hashring.HashRing(new_services))

        self.assertLess(len(delta), num_nodes * redistribution_factor)

    def test_ignore_non_existent_node(self):
        nodes = ['foo', 'bar']
        ring = hashring.HashRing(nodes)
        self.assertEqual({'foo'}, ring.get_nodes(b'fake',
                                                 ignore_nodes=['baz']))
|
Loading…
Reference in New Issue
Block a user