fb1a67fe6b
Because the sync_local_state periodic task can be disabled, which would also stop the hash ring from being refreshed, the refresh logic should be independent of it. This patch adds an updated_at attribute to HashRingManager to track when the ring was last refreshed; when checking whether the ring is set, it also checks the ring's age and rebuilds it if it is older than the new config option hash_ring_reset_interval, which defaults to 180s. Change-Id: Ie46dbf93b920543f99e11774a29878aaf27c3400 Closes-Bug: #1506657
209 lines
7.9 KiB
Python
209 lines
7.9 KiB
Python
# Copyright 2013 Hewlett-Packard Development Company, L.P.
|
|
# All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import bisect
|
|
import hashlib
|
|
import threading
|
|
import time
|
|
|
|
from oslo_config import cfg
|
|
import six
|
|
|
|
from ironic.common import exception
|
|
from ironic.common.i18n import _
|
|
from ironic.db import api as dbapi
|
|
|
|
# Configuration options controlling how HashRing / HashRingManager
# distribute load across conductors.
hash_opts = [
    cfg.IntOpt('hash_partition_exponent',
               default=5,
               help=_('Exponent to determine number of hash partitions to use '
                      'when distributing load across conductors. Larger '
                      'values will result in more even distribution of load '
                      'and less load when rebalancing the ring, but more '
                      'memory usage. Number of partitions per conductor is '
                      '(2^hash_partition_exponent). This determines the '
                      'granularity of rebalancing: given 10 hosts, and an '
                      'exponent of 2, there are 40 partitions in the ring. '
                      'A few thousand partitions should make rebalancing '
                      'smooth in most cases. The default is suitable for up '
                      'to a few hundred conductors. Too many partitions '
                      'have a CPU impact.')),
    cfg.IntOpt('hash_distribution_replicas',
               default=1,
               help=_('[Experimental Feature] '
                      'Number of hosts to map onto each hash partition. '
                      'Setting this to more than one will cause additional '
                      'conductor services to prepare deployment environments '
                      'and potentially allow the Ironic cluster to recover '
                      'more quickly if a conductor instance is terminated.')),
    # HashRingManager.ring rebuilds its cached rings on access once they
    # are older than this many seconds.
    cfg.IntOpt('hash_ring_reset_interval',
               default=180,
               help=_('Interval (in seconds) between hash ring resets.')),
]

CONF = cfg.CONF
CONF.register_opts(hash_opts)
|
|
|
|
|
|
class HashRing(object):
    """A stable hash ring.

    We map item N to a host Y based on the hash of N:

    - hash(host) -> divider; every host contributes
      2 ** CONF.hash_partition_exponent dividers
    - hash(item) -> position among the sorted dividers
    - the first divider above hash(item), wrapping around to the lowest
      divider, selects the host to use
    - we hash each host many times to spread load more finely, as
      otherwise adding a host gets (on average) 50% of the load of just
      one other host assigned to it.
    """

    def __init__(self, hosts, replicas=None):
        """Create a new hash ring across the specified hosts.

        :param hosts: an iterable of hosts which will be mapped. It is
                      consumed exactly once, so one-shot iterables such as
                      generators are accepted.
        :param replicas: number of hosts to map to each hash partition,
                         or the number of distinct hosts, whichever is
                         lesser.
                         Default: CONF.hash_distribution_replicas
        :raises: exception.Invalid if ``hosts`` is not an iterable of
                 hashable objects.
        """
        if replicas is None:
            replicas = CONF.hash_distribution_replicas

        try:
            # Materialize once: ``hosts`` is needed both for de-duplication
            # and for hashing below, and a generator can only be read once.
            host_list = list(hosts)
            self.hosts = set(host_list)
            # More replicas than distinct hosts can never be placed.
            self.replicas = min(replicas, len(self.hosts))
        except TypeError:
            raise exception.Invalid(
                _("Invalid hosts supplied when building HashRing."))

        self._host_hashes = {}
        rounds = 2 ** CONF.hash_partition_exponent
        for host in host_list:
            key = str(host).encode('utf8')
            key_hash = hashlib.md5(key)
            for _round in range(rounds):
                # Chain the digest so each round yields a new divider.
                key_hash.update(key)
                self._host_hashes[self._hash2int(key_hash)] = host
        # Gather the (possibly colliding) resulting hashes into a bisectable
        # list.
        self._partitions = sorted(self._host_hashes.keys())

    def _hash2int(self, key_hash):
        """Convert the given hash's digest to a numerical value for the ring.

        :returns: An integer equivalent value of the digest.
        """
        return int(key_hash.hexdigest(), 16)

    def _get_partition(self, data):
        """Return the index of the partition that ``data`` hashes onto.

        The index is that of the first divider strictly greater than the
        hash of ``data``, wrapping around to 0 past the last divider.

        :param data: text (encoded as UTF-8 before hashing) or bytes.
        :raises: exception.Invalid if ``data`` cannot be hashed.
        """
        try:
            # Only text needs encoding; bytes are hashed as-is and anything
            # else is rejected by md5 with a TypeError.
            if isinstance(data, six.text_type):
                data = data.encode('utf-8')
            hashed_key = self._hash2int(hashlib.md5(data))
            position = bisect.bisect(self._partitions, hashed_key)
            return position if position < len(self._partitions) else 0
        except TypeError:
            raise exception.Invalid(
                _("Invalid data supplied to HashRing.get_hosts."))

    def get_hosts(self, data, ignore_hosts=None):
        """Get the list of hosts which the supplied data maps onto.

        :param data: A string identifier to be mapped across the ring.
        :param ignore_hosts: A list of hosts to skip when performing the hash.
                             Useful to temporarily skip down hosts without
                             performing a full rebalance.
                             Default: None.
        :returns: a list of hosts.
                  The length of this list depends on the number of replicas
                  this `HashRing` was created with. It may be less than this
                  if ignore_hosts is not None.
        :raises: exception.Invalid if ``data`` cannot be hashed.
        """
        hosts = []
        if ignore_hosts is None:
            ignore_hosts = set()
        else:
            ignore_hosts = set(ignore_hosts)
            # Only hosts actually on the ring reduce the pool of hosts that
            # remain available for allocation.
            ignore_hosts.intersection_update(self.hosts)
        partition = self._get_partition(data)
        for _replica in range(0, self.replicas):
            if len(hosts) + len(ignore_hosts) == len(self.hosts):
                # prevent infinite loop - cannot allocate more fallbacks.
                break
            # Linear probing: partition N, then N+1 etc.
            host = self._get_host(partition)
            while host in hosts or host in ignore_hosts:
                partition += 1
                if partition >= len(self._partitions):
                    partition = 0
                host = self._get_host(partition)
            hosts.append(host)
        return hosts

    def _get_host(self, partition):
        """Find what host is serving a partition.

        :param partition: The index of the partition in the partition map.
                          e.g. 0 is the first partition, 1 is the second.
        :return: The host object the ring was constructed with.
        """
        return self._host_hashes[self._partitions[partition]]
|
|
|
|
|
|
class HashRingManager(object):
    """Lazily build and cache one HashRing per driver.

    The rings and the time they were built are stored on the class, so all
    HashRingManager instances share a single cache.  The cache is rebuilt
    from the database when it has not been built yet, after ``reset()``,
    or once it is older than ``CONF.hash_ring_reset_interval`` seconds.
    """

    # Shared mapping of driver name -> HashRing; None until the first load
    # (and again after reset()).
    _hash_rings = None
    # time.time() at which _hash_rings was last built.  It lives on the
    # class next to _hash_rings: a per-instance timestamp initialised in
    # __init__ made every newly constructed manager treat an arbitrarily
    # old shared cache as fresh, so it was never refreshed.
    _updated_at = 0.0
    _lock = threading.Lock()

    def __init__(self):
        self.dbapi = dbapi.get_instance()

    @property
    def updated_at(self):
        """time.time() value at which the shared rings were last built."""
        return self._updated_at

    @updated_at.setter
    def updated_at(self, value):
        # Kept assignable for backward compatibility (e.g. back-dating it to
        # force a rebuild); the write goes to the shared class-level value.
        self.__class__._updated_at = value

    @property
    def ring(self):
        """Return the cached mapping of driver name to HashRing.

        The mapping is rebuilt from the database when it is missing or was
        built more than ``CONF.hash_ring_reset_interval`` seconds ago.
        """
        limit = time.time() - CONF.hash_ring_reset_interval

        # Hot path, no lock. Read the shared rings once so the value that
        # is checked is the value that is returned.
        rings = self._hash_rings
        if rings is not None and self._updated_at >= limit:
            return rings

        with self._lock:
            # Re-check under the lock: another thread may have rebuilt the
            # rings while this one was waiting.
            if self._hash_rings is None or self._updated_at < limit:
                self.__class__._hash_rings = self._load_hash_rings()
                self.__class__._updated_at = time.time()
            return self._hash_rings

    def _load_hash_rings(self):
        """Build a HashRing for each driver returned by the database.

        :returns: dict mapping each key of
                  ``dbapi.get_active_driver_dict()`` to a HashRing over the
                  corresponding hosts.
        """
        d2c = self.dbapi.get_active_driver_dict()
        return {driver_name: HashRing(hosts)
                for driver_name, hosts in d2c.items()}

    @classmethod
    def reset(cls):
        """Drop the cached rings so the next access rebuilds them."""
        with cls._lock:
            cls._hash_rings = None
            cls._updated_at = 0.0

    def __getitem__(self, driver_name):
        """Return the HashRing for ``driver_name``.

        :raises: exception.DriverNotFound if ``driver_name`` is not a key
                 of the current rings.
        """
        # Resolve the rings outside the try so that only the lookup itself
        # is translated into DriverNotFound.
        rings = self.ring
        try:
            return rings[driver_name]
        except KeyError:
            raise exception.DriverNotFound(
                _("The driver '%s' is unknown.") % driver_name)
|