Add HashRingManager to wrap hash ring singleton
Currently, the API service creates a new hash ring on every request. Instead of that, we should cache the hash ring object -- but we should also expose a way to refresh it when necessary. This method will also be used by the ConductorManager to cache and refresh the hash ring when conductors join / leave the cluster. This patch preserves the existing API behavior by resetting the hash ring on every request. This should be addressed in a subsequent patch. Co-Authored-By: Devananda van der Veen <devananda.vdv@gmail.com> Change-Id: Ib7ab55452499d1e1c362e4cd127f1e6e38106d6c
This commit is contained in:
parent
8a0923c437
commit
9de2d21a96
|
@ -114,10 +114,23 @@ class HashRing(object):
|
||||||
|
|
||||||
|
|
||||||
class HashRingManager(object):
|
class HashRingManager(object):
|
||||||
|
_hash_rings = None
|
||||||
|
_lock = threading.Lock()
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self._lock = threading.Lock()
|
|
||||||
self.dbapi = dbapi.get_instance()
|
self.dbapi = dbapi.get_instance()
|
||||||
self.hash_rings = None
|
|
||||||
|
@property
|
||||||
|
def ring(self):
|
||||||
|
# Hot path, no lock
|
||||||
|
if self._hash_rings is not None:
|
||||||
|
return self._hash_rings
|
||||||
|
|
||||||
|
with self._lock:
|
||||||
|
if self._hash_rings is None:
|
||||||
|
rings = self._load_hash_rings()
|
||||||
|
self.__class__._hash_rings = rings
|
||||||
|
return self._hash_rings
|
||||||
|
|
||||||
def _load_hash_rings(self):
|
def _load_hash_rings(self):
|
||||||
rings = {}
|
rings = {}
|
||||||
|
@ -127,21 +140,14 @@ class HashRingManager(object):
|
||||||
rings[driver_name] = HashRing(hosts)
|
rings[driver_name] = HashRing(hosts)
|
||||||
return rings
|
return rings
|
||||||
|
|
||||||
def _ensure_rings_fresh(self):
|
@classmethod
|
||||||
# Hot path, no lock
|
def reset(self):
|
||||||
# TODO(russell_h): Consider adding time-based invalidation of rings
|
|
||||||
if self.hash_rings is not None:
|
|
||||||
return
|
|
||||||
|
|
||||||
with self._lock:
|
with self._lock:
|
||||||
if self.hash_rings is None:
|
self._hash_rings = None
|
||||||
self.hash_rings = self._load_hash_rings()
|
|
||||||
|
|
||||||
def get_hash_ring(self, driver_name):
|
|
||||||
self._ensure_rings_fresh()
|
|
||||||
|
|
||||||
|
def __getitem__(self, driver_name):
|
||||||
try:
|
try:
|
||||||
return self.hash_rings[driver_name]
|
return self.ring[driver_name]
|
||||||
except KeyError:
|
except KeyError:
|
||||||
raise exception.DriverNotFound(_("The driver '%s' is unknown.") %
|
raise exception.DriverNotFound(
|
||||||
driver_name)
|
_("The driver '%s' is unknown.") % driver_name)
|
||||||
|
|
|
@ -865,7 +865,7 @@ class ConductorManager(periodic_task.PeriodicTasks):
|
||||||
take out a lock.
|
take out a lock.
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
ring = self.ring_manager.get_hash_ring(driver)
|
ring = self.ring_manager[driver]
|
||||||
except exception.DriverNotFound:
|
except exception.DriverNotFound:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
|
@ -23,7 +23,7 @@ import random
|
||||||
from oslo import messaging
|
from oslo import messaging
|
||||||
|
|
||||||
from ironic.common import exception
|
from ironic.common import exception
|
||||||
from ironic.common import hash_ring as hash
|
from ironic.common import hash_ring
|
||||||
from ironic.common.i18n import _
|
from ironic.common.i18n import _
|
||||||
from ironic.common import rpc
|
from ironic.common import rpc
|
||||||
from ironic.conductor import manager
|
from ironic.conductor import manager
|
||||||
|
@ -75,7 +75,8 @@ class ConductorAPI(object):
|
||||||
self.client = rpc.get_client(target,
|
self.client = rpc.get_client(target,
|
||||||
version_cap=self.RPC_API_VERSION,
|
version_cap=self.RPC_API_VERSION,
|
||||||
serializer=serializer)
|
serializer=serializer)
|
||||||
self.ring_manager = hash.HashRingManager()
|
# NOTE(deva): this is going to be buggy
|
||||||
|
self.ring_manager = hash_ring.HashRingManager()
|
||||||
|
|
||||||
def get_topic_for(self, node):
|
def get_topic_for(self, node):
|
||||||
"""Get the RPC topic for the conductor service which the node
|
"""Get the RPC topic for the conductor service which the node
|
||||||
|
@ -87,7 +88,7 @@ class ConductorAPI(object):
|
||||||
|
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
ring = self.ring_manager.get_hash_ring(node.driver)
|
ring = self.ring_manager[node.driver]
|
||||||
dest = ring.get_hosts(node.uuid)
|
dest = ring.get_hosts(node.uuid)
|
||||||
return self.topic + "." + dest[0]
|
return self.topic + "." + dest[0]
|
||||||
except exception.DriverNotFound:
|
except exception.DriverNotFound:
|
||||||
|
@ -105,7 +106,7 @@ class ConductorAPI(object):
|
||||||
:raises: DriverNotFound
|
:raises: DriverNotFound
|
||||||
|
|
||||||
"""
|
"""
|
||||||
hash_ring = self.ring_manager.get_hash_ring(driver_name)
|
hash_ring = self.ring_manager[driver_name]
|
||||||
host = random.choice(hash_ring.hosts)
|
host = random.choice(hash_ring.hosts)
|
||||||
return self.topic + "." + host
|
return self.topic + "." + host
|
||||||
|
|
||||||
|
|
|
@ -37,6 +37,7 @@ from oslo.config import cfg
|
||||||
from ironic.db.sqlalchemy import migration
|
from ironic.db.sqlalchemy import migration
|
||||||
from ironic.db.sqlalchemy import models
|
from ironic.db.sqlalchemy import models
|
||||||
|
|
||||||
|
from ironic.common import hash_ring
|
||||||
from ironic.common import paths
|
from ironic.common import paths
|
||||||
from ironic.db.sqlalchemy import api as sqla_api
|
from ironic.db.sqlalchemy import api as sqla_api
|
||||||
from ironic.objects import base as objects_base
|
from ironic.objects import base as objects_base
|
||||||
|
@ -178,6 +179,7 @@ class TestCase(testtools.TestCase):
|
||||||
self.addCleanup(self._restore_obj_registry)
|
self.addCleanup(self._restore_obj_registry)
|
||||||
|
|
||||||
self.addCleanup(self._clear_attrs)
|
self.addCleanup(self._clear_attrs)
|
||||||
|
self.addCleanup(hash_ring.HashRingManager().reset)
|
||||||
self.useFixture(fixtures.EnvironmentVariable('http_proxy'))
|
self.useFixture(fixtures.EnvironmentVariable('http_proxy'))
|
||||||
self.policy = self.useFixture(policy_fixture.PolicyFixture())
|
self.policy = self.useFixture(policy_fixture.PolicyFixture())
|
||||||
CONF.set_override('fatal_exception_format_errors', True)
|
CONF.set_override('fatal_exception_format_errors', True)
|
||||||
|
|
|
@ -16,7 +16,7 @@
|
||||||
from oslo.config import cfg
|
from oslo.config import cfg
|
||||||
|
|
||||||
from ironic.common import exception
|
from ironic.common import exception
|
||||||
from ironic.common import hash_ring as hash
|
from ironic.common import hash_ring
|
||||||
from ironic.db import api as dbapi
|
from ironic.db import api as dbapi
|
||||||
from ironic.tests import base
|
from ironic.tests import base
|
||||||
from ironic.tests.db import base as db_base
|
from ironic.tests.db import base as db_base
|
||||||
|
@ -36,45 +36,45 @@ class HashRingTestCase(base.TestCase):
|
||||||
def test_create_ring(self):
|
def test_create_ring(self):
|
||||||
hosts = ['foo', 'bar']
|
hosts = ['foo', 'bar']
|
||||||
replicas = 2
|
replicas = 2
|
||||||
ring = hash.HashRing(hosts, replicas=replicas)
|
ring = hash_ring.HashRing(hosts, replicas=replicas)
|
||||||
self.assertEqual(hosts, ring.hosts)
|
self.assertEqual(hosts, ring.hosts)
|
||||||
self.assertEqual(replicas, ring.replicas)
|
self.assertEqual(replicas, ring.replicas)
|
||||||
|
|
||||||
def test_create_with_different_partition_counts(self):
|
def test_create_with_different_partition_counts(self):
|
||||||
hosts = ['foo', 'bar']
|
hosts = ['foo', 'bar']
|
||||||
CONF.set_override('hash_partition_exponent', 2)
|
CONF.set_override('hash_partition_exponent', 2)
|
||||||
ring = hash.HashRing(hosts)
|
ring = hash_ring.HashRing(hosts)
|
||||||
self.assertEqual(2 ** 2, len(ring.part2host))
|
self.assertEqual(2 ** 2, len(ring.part2host))
|
||||||
|
|
||||||
CONF.set_override('hash_partition_exponent', 8)
|
CONF.set_override('hash_partition_exponent', 8)
|
||||||
ring = hash.HashRing(hosts)
|
ring = hash_ring.HashRing(hosts)
|
||||||
self.assertEqual(2 ** 8, len(ring.part2host))
|
self.assertEqual(2 ** 8, len(ring.part2host))
|
||||||
|
|
||||||
CONF.set_override('hash_partition_exponent', 16)
|
CONF.set_override('hash_partition_exponent', 16)
|
||||||
ring = hash.HashRing(hosts)
|
ring = hash_ring.HashRing(hosts)
|
||||||
self.assertEqual(2 ** 16, len(ring.part2host))
|
self.assertEqual(2 ** 16, len(ring.part2host))
|
||||||
|
|
||||||
def test_distribution_one_replica(self):
|
def test_distribution_one_replica(self):
|
||||||
hosts = ['foo', 'bar', 'baz']
|
hosts = ['foo', 'bar', 'baz']
|
||||||
ring = hash.HashRing(hosts, replicas=1)
|
ring = hash_ring.HashRing(hosts, replicas=1)
|
||||||
self.assertEqual(['foo'], ring.get_hosts('fake'))
|
self.assertEqual(['foo'], ring.get_hosts('fake'))
|
||||||
self.assertEqual(['bar'], ring.get_hosts('fake-again'))
|
self.assertEqual(['bar'], ring.get_hosts('fake-again'))
|
||||||
|
|
||||||
def test_distribution_two_replicas(self):
|
def test_distribution_two_replicas(self):
|
||||||
hosts = ['foo', 'bar', 'baz']
|
hosts = ['foo', 'bar', 'baz']
|
||||||
ring = hash.HashRing(hosts, replicas=2)
|
ring = hash_ring.HashRing(hosts, replicas=2)
|
||||||
self.assertEqual(['foo', 'bar'], ring.get_hosts('fake'))
|
self.assertEqual(['foo', 'bar'], ring.get_hosts('fake'))
|
||||||
self.assertEqual(['bar', 'baz'], ring.get_hosts('fake-again'))
|
self.assertEqual(['bar', 'baz'], ring.get_hosts('fake-again'))
|
||||||
|
|
||||||
def test_distribution_three_replicas(self):
|
def test_distribution_three_replicas(self):
|
||||||
hosts = ['foo', 'bar', 'baz']
|
hosts = ['foo', 'bar', 'baz']
|
||||||
ring = hash.HashRing(hosts, replicas=3)
|
ring = hash_ring.HashRing(hosts, replicas=3)
|
||||||
self.assertEqual(['foo', 'bar', 'baz'], ring.get_hosts('fake'))
|
self.assertEqual(['foo', 'bar', 'baz'], ring.get_hosts('fake'))
|
||||||
self.assertEqual(['bar', 'baz', 'foo'], ring.get_hosts('fake-again'))
|
self.assertEqual(['bar', 'baz', 'foo'], ring.get_hosts('fake-again'))
|
||||||
|
|
||||||
def test_ignore_hosts(self):
|
def test_ignore_hosts(self):
|
||||||
hosts = ['foo', 'bar', 'baz']
|
hosts = ['foo', 'bar', 'baz']
|
||||||
ring = hash.HashRing(hosts, replicas=1)
|
ring = hash_ring.HashRing(hosts, replicas=1)
|
||||||
self.assertEqual(['bar'], ring.get_hosts('fake',
|
self.assertEqual(['bar'], ring.get_hosts('fake',
|
||||||
ignore_hosts=['foo']))
|
ignore_hosts=['foo']))
|
||||||
self.assertEqual(['baz'], ring.get_hosts('fake',
|
self.assertEqual(['baz'], ring.get_hosts('fake',
|
||||||
|
@ -84,7 +84,7 @@ class HashRingTestCase(base.TestCase):
|
||||||
|
|
||||||
def test_ignore_hosts_with_replicas(self):
|
def test_ignore_hosts_with_replicas(self):
|
||||||
hosts = ['foo', 'bar', 'baz']
|
hosts = ['foo', 'bar', 'baz']
|
||||||
ring = hash.HashRing(hosts, replicas=2)
|
ring = hash_ring.HashRing(hosts, replicas=2)
|
||||||
self.assertEqual(['bar', 'baz'], ring.get_hosts('fake',
|
self.assertEqual(['bar', 'baz'], ring.get_hosts('fake',
|
||||||
ignore_hosts=['foo']))
|
ignore_hosts=['foo']))
|
||||||
self.assertEqual(['baz'], ring.get_hosts('fake',
|
self.assertEqual(['baz'], ring.get_hosts('fake',
|
||||||
|
@ -98,24 +98,24 @@ class HashRingTestCase(base.TestCase):
|
||||||
|
|
||||||
def test_more_replicas_than_hosts(self):
|
def test_more_replicas_than_hosts(self):
|
||||||
hosts = ['foo', 'bar']
|
hosts = ['foo', 'bar']
|
||||||
ring = hash.HashRing(hosts, replicas=10)
|
ring = hash_ring.HashRing(hosts, replicas=10)
|
||||||
self.assertEqual(hosts, ring.get_hosts('fake'))
|
self.assertEqual(hosts, ring.get_hosts('fake'))
|
||||||
|
|
||||||
def test_ignore_non_existent_host(self):
|
def test_ignore_non_existent_host(self):
|
||||||
hosts = ['foo', 'bar']
|
hosts = ['foo', 'bar']
|
||||||
ring = hash.HashRing(hosts, replicas=1)
|
ring = hash_ring.HashRing(hosts, replicas=1)
|
||||||
self.assertEqual(['foo'], ring.get_hosts('fake',
|
self.assertEqual(['foo'], ring.get_hosts('fake',
|
||||||
ignore_hosts=['baz']))
|
ignore_hosts=['baz']))
|
||||||
|
|
||||||
def test_create_ring_invalid_data(self):
|
def test_create_ring_invalid_data(self):
|
||||||
hosts = None
|
hosts = None
|
||||||
self.assertRaises(exception.Invalid,
|
self.assertRaises(exception.Invalid,
|
||||||
hash.HashRing,
|
hash_ring.HashRing,
|
||||||
hosts)
|
hosts)
|
||||||
|
|
||||||
def test_get_hosts_invalid_data(self):
|
def test_get_hosts_invalid_data(self):
|
||||||
hosts = ['foo', 'bar']
|
hosts = ['foo', 'bar']
|
||||||
ring = hash.HashRing(hosts)
|
ring = hash_ring.HashRing(hosts)
|
||||||
self.assertRaises(exception.Invalid,
|
self.assertRaises(exception.Invalid,
|
||||||
ring.get_hosts,
|
ring.get_hosts,
|
||||||
None)
|
None)
|
||||||
|
@ -125,7 +125,7 @@ class HashRingManagerTestCase(db_base.DbTestCase):
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
super(HashRingManagerTestCase, self).setUp()
|
super(HashRingManagerTestCase, self).setUp()
|
||||||
self.ring_manager = hash.HashRingManager()
|
self.ring_manager = hash_ring.HashRingManager()
|
||||||
self.dbapi = dbapi.get_instance()
|
self.dbapi = dbapi.get_instance()
|
||||||
|
|
||||||
def register_conductors(self):
|
def register_conductors(self):
|
||||||
|
@ -140,13 +140,13 @@ class HashRingManagerTestCase(db_base.DbTestCase):
|
||||||
|
|
||||||
def test_hash_ring_manager_get_ring_success(self):
|
def test_hash_ring_manager_get_ring_success(self):
|
||||||
self.register_conductors()
|
self.register_conductors()
|
||||||
ring = self.ring_manager.get_hash_ring('driver1')
|
ring = self.ring_manager['driver1']
|
||||||
self.assertEqual(sorted(['host1', 'host2']), sorted(ring.hosts))
|
self.assertEqual(sorted(['host1', 'host2']), sorted(ring.hosts))
|
||||||
|
|
||||||
def test_hash_ring_manager_driver_not_found(self):
|
def test_hash_ring_manager_driver_not_found(self):
|
||||||
self.register_conductors()
|
self.register_conductors()
|
||||||
self.assertRaises(exception.DriverNotFound,
|
self.assertRaises(exception.DriverNotFound,
|
||||||
self.ring_manager.get_hash_ring,
|
self.ring_manager.__getitem__,
|
||||||
'driver3')
|
'driver3')
|
||||||
|
|
||||||
def test_hash_ring_manager_no_refresh(self):
|
def test_hash_ring_manager_no_refresh(self):
|
||||||
|
@ -154,9 +154,9 @@ class HashRingManagerTestCase(db_base.DbTestCase):
|
||||||
# initialized, it won't be seen. Long term this is probably
|
# initialized, it won't be seen. Long term this is probably
|
||||||
# undesirable, but today is the intended behavior.
|
# undesirable, but today is the intended behavior.
|
||||||
self.assertRaises(exception.DriverNotFound,
|
self.assertRaises(exception.DriverNotFound,
|
||||||
self.ring_manager.get_hash_ring,
|
self.ring_manager.__getitem__,
|
||||||
'driver1')
|
'driver1')
|
||||||
self.register_conductors()
|
self.register_conductors()
|
||||||
self.assertRaises(exception.DriverNotFound,
|
self.assertRaises(exception.DriverNotFound,
|
||||||
self.ring_manager.get_hash_ring,
|
self.ring_manager.__getitem__,
|
||||||
'driver1')
|
'driver1')
|
||||||
|
|
Loading…
Reference in New Issue