diff --git a/doc/source/tutorial/hashring.rst b/doc/source/tutorial/hashring.rst new file mode 100644 index 0000000..0adc573 --- /dev/null +++ b/doc/source/tutorial/hashring.rst @@ -0,0 +1,10 @@ +=========== + Hash ring +=========== + +Tooz provides a consistent hash ring implementation. It can be used to map +objects (represented via binary keys) to one or several nodes. When the node +list changes, the rebalancing of objects across the ring is kept minimal. + +.. literalinclude:: ../../../examples/hashring.py + :language: python diff --git a/doc/source/tutorial/index.rst b/doc/source/tutorial/index.rst index 8cb31a3..eeb2ea0 100644 --- a/doc/source/tutorial/index.rst +++ b/doc/source/tutorial/index.rst @@ -12,3 +12,4 @@ use tooz in your application. group_membership leader_election lock + hashring diff --git a/examples/hashring.py b/examples/hashring.py new file mode 100644 index 0000000..31ab1d4 --- /dev/null +++ b/examples/hashring.py @@ -0,0 +1,15 @@ +from tooz import hashring + +hashring = hashring.HashRing({'node1', 'node2', 'node3'}) + +# Returns set(['node2']) +nodes_for_foo = hashring[b'foo'] + +# Returns set(['node2', 'node3']) +nodes_for_foo_with_replicas = hashring.get_nodes(b'foo', + replicas=2) + +# Returns set(['node1', 'node3']) +nodes_for_foo_with_replicas = hashring.get_nodes(b'foo', + replicas=2, + ignore_nodes={'node2'}) diff --git a/releasenotes/notes/hashring-0470f9119ef63d49.yaml b/releasenotes/notes/hashring-0470f9119ef63d49.yaml new file mode 100644 index 0000000..de7b7cf --- /dev/null +++ b/releasenotes/notes/hashring-0470f9119ef63d49.yaml @@ -0,0 +1,3 @@ +--- +features: + - Add `tooz.hashring`, a consistent hash ring implementation. diff --git a/tooz/hashring.py b/tooz/hashring.py new file mode 100644 index 0000000..61f6799 --- /dev/null +++ b/tooz/hashring.py @@ -0,0 +1,133 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2016 Red Hat, Inc. 
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.
import bisect
import hashlib

import six

import tooz


class UnknownNode(tooz.ToozError):
    """Raised when an operation names a node that is not in the ring."""

    def __init__(self, node):
        super(UnknownNode, self).__init__("Unknown node `%s'" % node)
        # Keep the offending node so callers can inspect it.
        self.node = node


class HashRing(object):
    """Map objects onto nodes based on their consistent hash.

    Every node owns ``partitions`` positions on the ring. A piece of data is
    hashed to a position and assigned to the node(s) found by walking the
    ring from there.
    """

    def __init__(self, nodes, partitions=2**5):
        """Create a new hashring.

        :param nodes: Iterable of nodes where objects will be mapped onto.
        :param partitions: Number of ring positions created for *each* node.
        """
        self.nodes = set()
        # Ring position (int) -> node owning that position.
        self._ring = dict()
        # Sorted ring positions, kept in sync with ``_ring`` for bisection.
        self._partitions = []
        self._partition_number = partitions

        self.add_nodes(set(nodes))

    def _node_positions(self, node):
        """Yield the ring positions derived from ``node``.

        The positions are a chain of MD5 digests seeded with the UTF-8
        encoded ``str(node)``; the same node always yields the same
        positions, which is what lets a node be removed again.
        """
        key = str(node).encode('utf-8')
        key_hash = hashlib.md5(key)
        for _ in six.moves.range(self._partition_number):
            key_hash.update(key)
            yield self._hash2int(key_hash)

    def add_node(self, node):
        """Add a node to the hashring.

        :param node: Node to add.
        """
        return self.add_nodes((node,))

    def add_nodes(self, nodes):
        """Add nodes to the hashring.

        :param nodes: Nodes to add.
        """
        for node in nodes:
            for position in self._node_positions(node):
                self._ring[position] = node
            self.nodes.add(node)

        self._partitions = sorted(self._ring)

    def remove_node(self, node):
        """Remove a node from the hashring.

        :param node: Node to remove.
        :raises UnknownNode: if ``node`` is not part of the ring.
        """
        try:
            self.nodes.remove(node)
        except KeyError:
            raise UnknownNode(node)

        for position in self._node_positions(node):
            # Only drop positions this node actually owns: two nodes with
            # the same ``str()`` form hash to the same positions, and a bare
            # ``del`` would either raise KeyError half-way through or steal
            # the positions of the other node.
            if position in self._ring and self._ring[position] == node:
                del self._ring[position]

        self._partitions = sorted(self._ring)

    @staticmethod
    def _hash2int(key):
        """Convert a hashlib hash object into an integer ring position."""
        return int(key.hexdigest(), 16)

    def _get_partition(self, data):
        """Return the index in ``_partitions`` where ``data`` lands."""
        hashed_key = self._hash2int(hashlib.md5(data))
        position = bisect.bisect(self._partitions, hashed_key)
        # Past the last position means wrapping around to the first one.
        return position if position < len(self._partitions) else 0

    def _get_node(self, partition):
        """Return the node owning the partition at index ``partition``."""
        return self._ring[self._partitions[partition]]

    def get_nodes(self, data, ignore_nodes=None, replicas=1):
        """Get the set of nodes which the supplied data map onto.

        :param data: A byte identifier to be mapped across the ring.
        :param ignore_nodes: Set of nodes to ignore.
        :param replicas: Number of replicas to use.
        :return: A set of nodes whose length depends on the number of
                 replicas. It is smaller than ``replicas`` when the ring
                 does not hold enough non-ignored nodes.
        """
        partition = self._get_partition(data)

        ignore_nodes = set(ignore_nodes) if ignore_nodes else set()
        candidates = self.nodes - ignore_nodes

        replicas = min(replicas, len(candidates))

        nodes = set()
        ring_size = len(self._partitions)
        # Walk the ring at most once. One full revolution visits every
        # position, so it finds every node that can be found; bounding the
        # walk prevents an endless loop (or an IndexError on an empty ring)
        # when ``self.nodes`` contains a node owning no position.
        for _ in six.moves.range(ring_size):
            if len(nodes) >= replicas:
                break
            node = self._get_node(partition)
            if node not in ignore_nodes:
                nodes.add(node)
            partition = (partition + 1) % ring_size

        return nodes

    def __getitem__(self, key):
        """Return the single-replica node set for ``key``."""
        return self.get_nodes(key)

    def __len__(self):
        """Return the number of positions currently on the ring."""
        return len(self._partitions)


# ---- tooz/tests/test_hashring.py (header continues below) ----
# Copyright 2013 Hewlett-Packard Development Company, L.P.
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import hashlib

import mock
from testtools import matchers
from testtools import testcase

from tooz import hashring


class HashRingTestCase(testcase.TestCase):
    """Unit tests for :class:`tooz.hashring.HashRing`."""

    # NOTE(deva): the mapping used in these tests is as follows:
    #   if nodes = [foo, bar]:
    #       fake -> foo, bar
    #   if nodes = [foo, bar, baz]:
    #       fake -> foo, bar, baz
    #       fake-again -> bar, baz, foo

    @mock.patch.object(hashlib, 'md5', autospec=True)
    def test__hash2int_returns_int(self, mock_md5):
        r1 = 32 * 'a'
        r2 = 32 * 'b'
        # The ring defaults to 2**5 = 32 partitions per node, so hexdigest
        # is consumed 32 times for each of the two nodes below.
        mock_md5.return_value.hexdigest.side_effect = [r1] * 32 + [r2] * 32

        nodes = ['foo', 'bar']
        ring = hashring.HashRing(nodes)

        self.assertIn(int(r1, 16), ring._ring)
        self.assertIn(int(r2, 16), ring._ring)

    def test_create_ring(self):
        nodes = ['foo', 'bar']
        ring = hashring.HashRing(nodes)
        self.assertEqual(set(nodes), ring.nodes)
        self.assertEqual(2 ** 5 * 2, len(ring))

    def test_add_node(self):
        nodes = ['foo', 'bar']
        ring = hashring.HashRing(nodes)
        self.assertEqual(set(nodes), ring.nodes)
        self.assertEqual(2 ** 5 * len(nodes), len(ring))
        nodes.append('baz')
        ring.add_node('baz')
        self.assertEqual(set(nodes), ring.nodes)
        self.assertEqual(2 ** 5 * len(nodes), len(ring))

    def test_remove_node(self):
        nodes = ['foo', 'bar']
        ring = hashring.HashRing(nodes)
        self.assertEqual(set(nodes), ring.nodes)
        self.assertEqual(2 ** 5 * len(nodes), len(ring))
        nodes.remove('bar')
        ring.remove_node('bar')
        self.assertEqual(set(nodes), ring.nodes)
        self.assertEqual(2 ** 5 * len(nodes), len(ring))

    def test_remove_node_unknown(self):
        nodes = ['foo', 'bar']
        ring = hashring.HashRing(nodes)
        self.assertRaises(
            hashring.UnknownNode,
            ring.remove_node, 'biz')

    def test_add_then_removenode(self):
        nodes = ['foo', 'bar']
        ring = hashring.HashRing(nodes)
        self.assertEqual(set(nodes), ring.nodes)
        self.assertEqual(2 ** 5 * len(nodes), len(ring))
        nodes.append('baz')
        ring.add_node('baz')
        self.assertEqual(set(nodes), ring.nodes)
        self.assertEqual(2 ** 5 * len(nodes), len(ring))
        nodes.remove('bar')
        ring.remove_node('bar')
        self.assertEqual(set(nodes), ring.nodes)
        self.assertEqual(2 ** 5 * len(nodes), len(ring))

    def test_distribution_one_replica(self):
        nodes = ['foo', 'bar', 'baz']
        ring = hashring.HashRing(nodes)
        fake_1_nodes = ring.get_nodes(b'fake')
        fake_2_nodes = ring.get_nodes(b'fake-again')
        # We should have one node for each thing
        self.assertEqual(1, len(fake_1_nodes))
        self.assertEqual(1, len(fake_2_nodes))
        # And they must not be the same answers even on this simple data.
        self.assertNotEqual(fake_1_nodes, fake_2_nodes)

    def test_distribution_more_replica(self):
        nodes = ['foo', 'bar', 'baz']
        ring = hashring.HashRing(nodes)
        fake_1_nodes = ring.get_nodes(b'fake', replicas=2)
        fake_2_nodes = ring.get_nodes(b'fake-again', replicas=2)
        # We should have two nodes for each thing
        self.assertEqual(2, len(fake_1_nodes))
        self.assertEqual(2, len(fake_2_nodes))
        fake_1_nodes = ring.get_nodes(b'fake', replicas=3)
        fake_2_nodes = ring.get_nodes(b'fake-again', replicas=3)
        # We should have three nodes for each thing, i.e. the whole ring,
        # so both answers are necessarily identical.
        self.assertEqual(3, len(fake_1_nodes))
        self.assertEqual(3, len(fake_2_nodes))
        self.assertEqual(fake_1_nodes, fake_2_nodes)

    def test_ignore_nodes(self):
        nodes = ['foo', 'bar', 'baz']
        ring = hashring.HashRing(nodes)
        equals_bar_or_baz = matchers.MatchesAny(
            matchers.Equals({'bar'}),
            matchers.Equals({'baz'}))
        self.assertThat(
            ring.get_nodes(b'fake', ignore_nodes=['foo']),
            equals_bar_or_baz)
        # With both foo and bar ignored, baz is the only possible answer.
        self.assertEqual(
            {'baz'},
            ring.get_nodes(b'fake', ignore_nodes=['foo', 'bar']))
        self.assertEqual(set(), ring.get_nodes(b'fake', ignore_nodes=nodes))

    @staticmethod
    def _compare_rings(nodes, conductors, ring, new_conductors, new_ring):
        """Return the keys whose owner differs between two rings.

        :param nodes: Iterable of text keys to look up in both rings.
        :param conductors: Unused; kept so existing callers keep working.
        :param ring: Ring representing the state before the change.
        :param new_conductors: Unused; kept so existing callers keep working.
        :param new_ring: Ring representing the state after the change.
        :return: Dict mapping each moved key to ``(old_owner, new_owner)``.
        """
        delta = {}
        # Key the mappings by the looked-up key itself. A constant key
        # would collapse every lookup into a single entry and make the
        # callers' assertions on len(delta) meaningless.
        mapping = {
            node: list(ring.get_nodes(node.encode('ascii')))[0]
            for node in nodes
        }
        new_mapping = {
            node: list(new_ring.get_nodes(node.encode('ascii')))[0]
            for node in nodes
        }

        for key, old in mapping.items():
            new = new_mapping.get(key, None)
            if new != old:
                delta[key] = (old, new)
        return delta

    def test_rebalance_stability_join(self):
        num_services = 10
        num_nodes = 10000
        # Adding 1 service to a set of N should move 1/(N+1) of all nodes
        # Eg, for a cluster of 10 nodes, adding one should move 1/11, or 9%
        # We allow for 1/N to allow for rounding in tests.
        redistribution_factor = 1.0 / num_services

        nodes = [str(x) for x in range(num_nodes)]
        services = [str(x) for x in range(num_services)]
        new_services = services + ['new']
        delta = self._compare_rings(
            nodes, services, hashring.HashRing(services),
            new_services, hashring.HashRing(new_services))

        self.assertLess(len(delta), num_nodes * redistribution_factor)

    def test_rebalance_stability_leave(self):
        num_services = 10
        num_nodes = 10000
        # Removing 1 service from a set of N should move 1/(N) of all nodes
        # Eg, for a cluster of 10 nodes, removing one should move 1/10, or 10%
        # We allow for 1/(N-1) to allow for rounding in tests.
        redistribution_factor = 1.0 / (num_services - 1)

        nodes = [str(x) for x in range(num_nodes)]
        services = [str(x) for x in range(num_services)]
        new_services = services[:]
        new_services.pop()
        delta = self._compare_rings(
            nodes, services, hashring.HashRing(services),
            new_services, hashring.HashRing(new_services))

        self.assertLess(len(delta), num_nodes * redistribution_factor)

    def test_ignore_non_existent_node(self):
        nodes = ['foo', 'bar']
        ring = hashring.HashRing(nodes)
        self.assertEqual({'foo'}, ring.get_nodes(b'fake',
                                                 ignore_nodes=['baz']))