Factor hash ring management out of the conductor

This patch pulls some logic around querying and instantiating hash
rings out of the ConductorManager into a new HashRingManager.

Change-Id: I523cb77aae287c44f59b40783e54f6557bc924bc
This commit is contained in:
Russell Haering 2014-03-25 12:50:53 -07:00
parent 415c5619c2
commit f0adfecd95
5 changed files with 100 additions and 24 deletions

View File

@ -16,10 +16,12 @@
import array
import hashlib
import struct
import threading
from oslo.config import cfg
from ironic.common import exception
from ironic.db import api as dbapi
hash_opts = [
cfg.IntOpt('hash_partition_exponent',
@ -108,3 +110,36 @@ class HashRing(object):
partition = 0
host_ids.append(self.part2host[partition])
return [self.hosts[h] for h in host_ids]
class HashRingManager(object):
def __init__(self):
self._lock = threading.Lock()
self.dbapi = dbapi.get_instance()
self.hash_rings = None
def _load_hash_rings(self):
rings = {}
d2c = self.dbapi.get_active_driver_dict()
for driver_name, hosts in d2c.iteritems():
rings[driver_name] = HashRing(hosts)
return rings
def _ensure_rings_fresh(self):
# Hot path, no lock
# TODO(russell_h): Consider adding time-based invalidation of rings
if self.hash_rings is not None:
return
with self._lock:
if self.hash_rings is None:
self.hash_rings = self._load_hash_rings()
def get_hash_ring(self, driver_name):
self._ensure_rings_fresh()
try:
return self.hash_rings[driver_name]
except KeyError:
raise exception.DriverNotFound(driver_name=driver_name)

View File

@ -148,7 +148,7 @@ class ConductorManager(service.PeriodicService):
self.dbapi.register_conductor({'hostname': self.host,
'drivers': self.drivers})
self.driver_rings = self._get_current_driver_rings()
self.ring_manager = hash.HashRingManager()
"""Consistent hash ring which maps drivers to conductors."""
self._worker_pool = greenpool.GreenPool(size=CONF.rpc_thread_pool_size)
@ -617,16 +617,6 @@ class ConductorManager(service.PeriodicService):
except exception.NoFreeConductorWorker:
task.release_resources()
def _get_current_driver_rings(self):
"""Build the current hash ring for this ConductorManager's drivers."""
ring = {}
d2c = self.dbapi.get_active_driver_dict()
for driver in self.drivers:
ring[driver] = hash.HashRing(d2c[driver])
return ring
def rebalance_node_ring(self):
"""Perform any actions necessary when rebalancing the consistent hash.
@ -638,11 +628,19 @@ class ConductorManager(service.PeriodicService):
pass
def _mapped_to_this_conductor(self, node_uuid, driver):
"""Check that node is mapped to this conductor."""
if driver in self.drivers:
mapped_hosts = self.driver_rings[driver].get_hosts(node_uuid)
return self.host == mapped_hosts[0]
return False
"""Check that node is mapped to this conductor.
Note that because mappings are eventually consistent, it is possible
for two conductors to simultaneously believe that a node is mapped to
them. Any operation that depends on exclusive control of a node should
take out a lock.
"""
try:
ring = self.ring_manager.get_hash_ring(driver)
except exception.DriverNotFound:
return False
return self.host == ring.get_hosts(node_uuid)[0]
@messaging.client_exceptions(exception.NodeLocked)
def validate_driver_interfaces(self, context, node_id):

View File

@ -23,7 +23,6 @@ from oslo.config import cfg
from ironic.common import exception
from ironic.common import hash_ring as hash
from ironic.conductor import manager
from ironic.db import api as dbapi
from ironic.objects import base as objects_base
import ironic.openstack.common.rpc.proxy
@ -69,11 +68,7 @@ class ConductorAPI(ironic.openstack.common.rpc.proxy.RpcProxy):
if topic is None:
topic = manager.MANAGER_TOPIC
# Initialize consistent hash ring
self.hash_rings = {}
d2c = dbapi.get_instance().get_active_driver_dict()
for driver in d2c.keys():
self.hash_rings[driver] = hash.HashRing(d2c[driver])
self.ring_manager = hash.HashRingManager()
super(ConductorAPI, self).__init__(
topic=topic,
@ -90,10 +85,10 @@ class ConductorAPI(ironic.openstack.common.rpc.proxy.RpcProxy):
"""
try:
ring = self.hash_rings[node.driver]
ring = self.ring_manager.get_hash_ring(node.driver)
dest = ring.get_hosts(node.uuid)
return self.topic + "." + dest[0]
except KeyError:
except exception.DriverNotFound:
reason = (_('No conductor service registered which supports '
'driver %s.') % node.driver)
raise exception.NoValidHost(reason=reason)

View File

@ -1027,6 +1027,7 @@ class ManagerTestCase(base.DbTestCase):
provision_updated_at=past)
node = self.dbapi.create_node(n)
mock_utcnow.return_value = present
self.service._conductor_service_record_keepalive(self.context)
with mock.patch.object(self.driver.deploy, 'clean_up') as clean_mock:
self.service._check_deploy_timeouts(self.context)
self.service._worker_pool.waitall()
@ -1048,6 +1049,7 @@ class ManagerTestCase(base.DbTestCase):
provision_updated_at=past)
node = self.dbapi.create_node(n)
mock_utcnow.return_value = present
self.service._conductor_service_record_keepalive(self.context)
with mock.patch.object(self.driver.deploy, 'clean_up') as clean_mock:
self.service._check_deploy_timeouts(self.context)
node.refresh(self.context)
@ -1075,6 +1077,7 @@ class ManagerTestCase(base.DbTestCase):
provision_updated_at=past)
node = self.dbapi.create_node(n)
mock_utcnow.return_value = present
self.service._conductor_service_record_keepalive(self.context)
with mock.patch.object(self.driver.deploy, 'clean_up') as clean_mock:
error = 'test-123'
clean_mock.side_effect = exception.IronicException(message=error)

View File

@ -17,7 +17,10 @@ from oslo.config import cfg
from ironic.common import exception
from ironic.common import hash_ring as hash
from ironic.db import api as dbapi
from ironic.openstack.common import context
from ironic.tests import base
from ironic.tests.db import base as db_base
CONF = cfg.CONF
@ -117,3 +120,45 @@ class HashRingTestCase(base.TestCase):
self.assertRaises(exception.Invalid,
ring.get_hosts,
None)
class HashRingManagerTestCase(db_base.DbTestCase):
def setUp(self):
super(HashRingManagerTestCase, self).setUp()
self.ring_manager = hash.HashRingManager()
self.context = context.get_admin_context()
self.dbapi = dbapi.get_instance()
def register_conductors(self):
self.dbapi.register_conductor({
'hostname': 'host1',
'drivers': ['driver1', 'driver2'],
})
self.dbapi.register_conductor({
'hostname': 'host2',
'drivers': ['driver1'],
})
def test_hash_ring_manager_get_ring_success(self):
self.register_conductors()
ring = self.ring_manager.get_hash_ring('driver1')
self.assertEqual(sorted(['host1', 'host2']), sorted(ring.hosts))
def test_hash_ring_manager_driver_not_found(self):
self.register_conductors()
self.assertRaises(exception.DriverNotFound,
self.ring_manager.get_hash_ring,
'driver3')
def test_hash_ring_manager_no_refresh(self):
# If a new conductor is registered after the ring manager is
# initialized, it won't be seen. Long term this is probably
# undesirable, but today is the intended behavior.
self.assertRaises(exception.DriverNotFound,
self.ring_manager.get_hash_ring,
'driver1')
self.register_conductors()
self.assertRaises(exception.DriverNotFound,
self.ring_manager.get_hash_ring,
'driver1')