Make hash ring mapping be more consistent

Nejc Saje reported on the openstack-dev list that our hash ring was not
stable as hosts were added or removed. This is a significant issue since
the hash ring is meant to minimize the impact of adding or removing
conductors to only approximately the change in capacity.

IMPORTANT: When this patch is added to a running Ironic environment it
will cause ring calculations to change. This will cause most existing
node -> conductor mappings to be invalid, which will cause existing
deployed nodes to be unable to reboot (if using a netbooting driver like
pxe_ipmi). The cluster will self-correct over time as the periodic task
_sync_hash_ring() rebuilds PXE environments.

NOTE: The meaning of the hash_partition_exponent configuration setting
had to be changed - if we define the total number of partitions of the
ring via it then adding a host simultaneously removes some partitions of
each existing host as well as adding new partitions from the new host -
this results in more perturbation. Instead we use it to define the
number of partitions per host. Existing deployments will continue to
work, but may be able to reduce some CPU overhead by shrinking their
setting.

Change-Id: Iad3c9f6b460e9162ca91d5719e42f7950f2f18df
Closes-Bug: #1365334
DocImpact: See above.
UpgradeImpact: See above.
This commit is contained in:
Robert Collins 2014-09-04 00:47:13 +00:00 committed by Devananda van der Veen
parent 552a927e56
commit b272e61218
4 changed files with 167 additions and 51 deletions

View File

@ -236,8 +236,14 @@
# distributing load across conductors. Larger values will
# result in more even distribution of load and less load when
# rebalancing the ring, but more memory usage. Number of
# partitions is (2^hash_partition_exponent). (integer value)
#hash_partition_exponent=16
# partitions per conductor is (2^hash_partition_exponent).
# This determines the granularity of rebalancing: given 10
# hosts, and an exponent of 2, there are 40 partitions in
# the ring. A few thousand partitions should make rebalancing
# smooth in most cases. The default is suitable for up to a
# few hundred conductors. Too many partitions have a CPU
# impact. (integer value)
#hash_partition_exponent=5
# [Experimental Feature] Number of hosts to map onto each hash
# partition. Setting this to more than one will cause

View File

@ -13,7 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import array
import bisect
import hashlib
import struct
import threading
@ -26,12 +26,19 @@ from ironic.db import api as dbapi
hash_opts = [
cfg.IntOpt('hash_partition_exponent',
default=16,
default=5,
help='Exponent to determine number of hash partitions to use '
'when distributing load across conductors. Larger values '
'will result in more even distribution of load and less '
'load when rebalancing the ring, but more memory usage. '
'Number of partitions is (2^hash_partition_exponent).'),
'Number of partitions per conductor is '
'(2^hash_partition_exponent). This determines the '
'granularity of rebalancing: given 10 hosts, and an '
'exponent of the 2, there are 40 partitions in the ring.'
'A few thousand partitions should make rebalancing '
'smooth in most cases. The default is suitable for up to '
'a few hundred conductors. Too many partitions has a CPU '
'impact.'),
cfg.IntOpt('hash_distribution_replicas',
default=1,
help='[Experimental Feature] '
@ -47,6 +54,16 @@ CONF.register_opts(hash_opts)
class HashRing(object):
"""A stable hash ring.
We map item N to a host Y based on the closest lower hash
- hash(item) -> partition
- hash(host) -> divider
- closest lower divider is the host to use
- we hash each host many times to spread load more finely
as otherwise adding a host gets (on average) 50% of the load of
just one other host assigned to it.
"""
def __init__(self, hosts, replicas=None):
"""Create a new hash ring across the specified hosts.
@ -61,21 +78,30 @@ class HashRing(object):
replicas = CONF.hash_distribution_replicas
try:
self.hosts = list(hosts)
self.hosts = set(hosts)
self.replicas = replicas if replicas <= len(hosts) else len(hosts)
except TypeError:
raise exception.Invalid(
_("Invalid hosts supplied when building HashRing."))
self.partition_shift = 32 - CONF.hash_partition_exponent
self.part2host = array.array('H')
for p in range(2 ** CONF.hash_partition_exponent):
self.part2host.append(p % len(hosts))
self._host_hashes = {}
for host in hosts:
key = str(host).encode('utf8')
key_hash = hashlib.md5(key)
for p in range(2 ** CONF.hash_partition_exponent):
key_hash.update(key)
hashed_key = struct.unpack_from('>I', key_hash.digest())[0]
self._host_hashes[hashed_key] = host
# Gather the (possibly colliding) resulting hashes into a bisectable
# list.
self._partitions = sorted(self._host_hashes.keys())
def _get_partition(self, data):
try:
return (struct.unpack_from('>I', hashlib.md5(data).digest())[0]
>> self.partition_shift)
hashed_key = struct.unpack_from(
'>I', hashlib.md5(data).digest())[0]
position = bisect.bisect(self._partitions, hashed_key)
return position if position < len(self._partitions) else 0
except TypeError:
raise exception.Invalid(
_("Invalid data supplied to HashRing.get_hosts."))
@ -93,24 +119,35 @@ class HashRing(object):
this `HashRing` was created with. It may be less than this
if ignore_hosts is not None.
"""
host_ids = []
hosts = []
if ignore_hosts is None:
ignore_host_ids = []
ignore_hosts = set()
else:
ignore_host_ids = [self.hosts.index(h)
for h in ignore_hosts if h in self.hosts]
ignore_hosts = set(ignore_hosts)
ignore_hosts.intersection_update(self.hosts)
partition = self._get_partition(data)
for replica in range(0, self.replicas):
if len(host_ids + ignore_host_ids) == len(self.hosts):
# prevent infinite loop
if len(hosts) + len(ignore_hosts) == len(self.hosts):
# prevent infinite loop - cannot allocate more fallbacks.
break
while self.part2host[partition] in host_ids + ignore_host_ids:
# Linear probing: partition N, then N+1 etc.
host = self._get_host(partition)
while host in hosts or host in ignore_hosts:
partition += 1
if partition >= len(self.part2host):
if partition >= len(self._partitions):
partition = 0
host_ids.append(self.part2host[partition])
return [self.hosts[h] for h in host_ids]
host = self._get_host(partition)
hosts.append(host)
return hosts
def _get_host(self, partition):
"""Find what host is serving a partition.
:param partition: The index of the partition in the partition map.
e.g. 0 is the first partition, 1 is the second.
:return: The host object the ring was constructed with.
"""
return self._host_hashes[self._partitions[partition]]
class HashRingManager(object):

View File

@ -107,7 +107,7 @@ class ConductorAPI(object):
"""
hash_ring = self.ring_manager[driver_name]
host = random.choice(hash_ring.hosts)
host = random.choice(list(hash_ring.hosts))
return self.topic + "." + host
def update_node(self, context, node_obj, topic=None):

View File

@ -14,6 +14,7 @@
# under the License.
from oslo.config import cfg
from testtools import matchers
from ironic.common import exception
from ironic.common import hash_ring
@ -37,69 +38,141 @@ class HashRingTestCase(base.TestCase):
hosts = ['foo', 'bar']
replicas = 2
ring = hash_ring.HashRing(hosts, replicas=replicas)
self.assertEqual(hosts, ring.hosts)
self.assertEqual(set(hosts), ring.hosts)
self.assertEqual(replicas, ring.replicas)
def test_create_with_different_partition_counts(self):
hosts = ['foo', 'bar']
CONF.set_override('hash_partition_exponent', 2)
ring = hash_ring.HashRing(hosts)
self.assertEqual(2 ** 2, len(ring.part2host))
self.assertEqual(2 ** 2 * 2, len(ring._partitions))
CONF.set_override('hash_partition_exponent', 8)
ring = hash_ring.HashRing(hosts)
self.assertEqual(2 ** 8, len(ring.part2host))
self.assertEqual(2 ** 8 * 2, len(ring._partitions))
CONF.set_override('hash_partition_exponent', 16)
ring = hash_ring.HashRing(hosts)
self.assertEqual(2 ** 16, len(ring.part2host))
self.assertEqual(2 ** 16 * 2, len(ring._partitions))
def test_distribution_one_replica(self):
hosts = ['foo', 'bar', 'baz']
ring = hash_ring.HashRing(hosts, replicas=1)
self.assertEqual(['foo'], ring.get_hosts('fake'))
self.assertEqual(['bar'], ring.get_hosts('fake-again'))
fake_1_hosts = ring.get_hosts('fake')
fake_2_hosts = ring.get_hosts('fake-again')
# We should have one host for each thing
self.assertThat(fake_1_hosts, matchers.HasLength(1))
self.assertThat(fake_2_hosts, matchers.HasLength(1))
# And they must not be the same answers even on this simple data.
self.assertNotEqual(fake_1_hosts, fake_2_hosts)
def test_distribution_two_replicas(self):
hosts = ['foo', 'bar', 'baz']
ring = hash_ring.HashRing(hosts, replicas=2)
self.assertEqual(['foo', 'bar'], ring.get_hosts('fake'))
self.assertEqual(['bar', 'baz'], ring.get_hosts('fake-again'))
fake_1_hosts = ring.get_hosts('fake')
fake_2_hosts = ring.get_hosts('fake-again')
# We should have two hosts for each thing
self.assertThat(fake_1_hosts, matchers.HasLength(2))
self.assertThat(fake_2_hosts, matchers.HasLength(2))
# And they must not be the same answers even on this simple data
# because if they were we'd be making the active replica a hot spot.
self.assertNotEqual(fake_1_hosts, fake_2_hosts)
def test_distribution_three_replicas(self):
hosts = ['foo', 'bar', 'baz']
ring = hash_ring.HashRing(hosts, replicas=3)
self.assertEqual(['foo', 'bar', 'baz'], ring.get_hosts('fake'))
self.assertEqual(['bar', 'baz', 'foo'], ring.get_hosts('fake-again'))
fake_1_hosts = ring.get_hosts('fake')
fake_2_hosts = ring.get_hosts('fake-again')
# We should have three hosts for each thing
self.assertThat(fake_1_hosts, matchers.HasLength(3))
self.assertThat(fake_2_hosts, matchers.HasLength(3))
# And they must not be the same answers even on this simple data
# because if they were we'd be making the active replica a hot spot.
self.assertNotEqual(fake_1_hosts, fake_2_hosts)
self.assertNotEqual(fake_1_hosts[0], fake_2_hosts[0])
def test_ignore_hosts(self):
hosts = ['foo', 'bar', 'baz']
ring = hash_ring.HashRing(hosts, replicas=1)
self.assertEqual(['bar'], ring.get_hosts('fake',
ignore_hosts=['foo']))
self.assertEqual(['baz'], ring.get_hosts('fake',
ignore_hosts=['foo', 'bar']))
self.assertEqual([], ring.get_hosts('fake',
ignore_hosts=hosts))
equals_bar_or_baz = matchers.MatchesAny(
matchers.Equals(['bar']),
matchers.Equals(['baz']))
self.assertThat(
ring.get_hosts('fake', ignore_hosts=['foo']),
equals_bar_or_baz)
self.assertThat(
ring.get_hosts('fake', ignore_hosts=['foo', 'bar']),
equals_bar_or_baz)
self.assertEqual([], ring.get_hosts('fake', ignore_hosts=hosts))
def test_ignore_hosts_with_replicas(self):
hosts = ['foo', 'bar', 'baz']
ring = hash_ring.HashRing(hosts, replicas=2)
self.assertEqual(['bar', 'baz'], ring.get_hosts('fake',
ignore_hosts=['foo']))
self.assertEqual(['baz'], ring.get_hosts('fake',
ignore_hosts=['foo', 'bar']))
self.assertEqual(['baz', 'foo'], ring.get_hosts('fake-again',
ignore_hosts=['bar']))
self.assertEqual(['foo'], ring.get_hosts('fake-again',
ignore_hosts=['bar', 'baz']))
self.assertEqual([], ring.get_hosts('fake',
ignore_hosts=hosts))
self.assertEqual(
set(['bar', 'baz']),
set(ring.get_hosts('fake', ignore_hosts=['foo'])))
self.assertEqual(set(['baz']),
set(ring.get_hosts('fake', ignore_hosts=['foo', 'bar'])))
self.assertEqual(
set(['baz', 'foo']),
set(ring.get_hosts('fake-again', ignore_hosts=['bar'])))
self.assertEqual(
set(['foo']),
set(ring.get_hosts('fake-again', ignore_hosts=['bar', 'baz'])))
self.assertEqual([], ring.get_hosts('fake', ignore_hosts=hosts))
def _compare_rings(self, nodes, conductors, ring,
new_conductors, new_ring):
delta = {}
mapping = dict((node, ring.get_hosts(node)[0]) for node in nodes)
new_mapping = dict(
(node, new_ring.get_hosts(node)[0]) for node in nodes)
for key, old in mapping.items():
new = new_mapping.get(key, None)
if new != old:
delta[key] = (old, new)
return delta
def test_rebalance_stability_join(self):
    """Adding one conductor must remap only a small slice of nodes."""
    num_conductors = 10
    num_nodes = 10000
    # Adding 1 conductor to a set of N should move about 1/(N+1) of
    # all nodes, e.g. for a cluster of 10 conductors adding one
    # should move 1/11 (~9%). Allow up to 1/N to absorb rounding.
    allowed_fraction = 1.0 / num_conductors
    nodes = [str(index) for index in range(num_nodes)]
    conductors = [str(index) for index in range(num_conductors)]
    grown = conductors + ['new']
    moved = self._compare_rings(
        nodes, conductors, hash_ring.HashRing(conductors),
        grown, hash_ring.HashRing(grown))
    self.assertTrue(len(moved) < num_nodes * allowed_fraction)
def test_rebalance_stability_leave(self):
    """Removing one conductor must remap only a small slice of nodes."""
    num_conductors = 10
    num_nodes = 10000
    # Removing 1 conductor from a set of N should move about 1/N of
    # all nodes, e.g. for a cluster of 10 conductors removing one
    # should move 1/10 (10%). Allow up to 1/(N-1) to absorb rounding.
    allowed_fraction = 1.0 / (num_conductors - 1)
    nodes = [str(index) for index in range(num_nodes)]
    conductors = [str(index) for index in range(num_conductors)]
    shrunk = conductors[:-1]
    moved = self._compare_rings(
        nodes, conductors, hash_ring.HashRing(conductors),
        shrunk, hash_ring.HashRing(shrunk))
    self.assertTrue(len(moved) < num_nodes * allowed_fraction)
def test_more_replicas_than_hosts(self):
hosts = ['foo', 'bar']
ring = hash_ring.HashRing(hosts, replicas=10)
self.assertEqual(hosts, ring.get_hosts('fake'))
self.assertEqual(set(hosts), set(ring.get_hosts('fake')))
def test_ignore_non_existent_host(self):
hosts = ['foo', 'bar']