Merge "Extend the Conductor RPC object"

This commit is contained in:
Jenkins 2016-03-18 14:39:16 +00:00 committed by Gerrit Code Review
commit 8e81b964a5
4 changed files with 75 additions and 10 deletions

View File

@ -36,6 +36,7 @@ from ironic.common import rpc
from ironic.common import states
from ironic.conductor import task_manager
from ironic.db import api as dbapi
from ironic import objects
conductor_opts = [
@ -141,8 +142,8 @@ class BaseConductorManager(object):
self.dbapi.clear_node_reservations_for_conductor(self.host)
try:
# Register this conductor with the cluster
cdr = self.dbapi.register_conductor({'hostname': self.host,
'drivers': driver_names})
self.conductor = objects.Conductor.register(
admin_context, self.host, driver_names)
except exception.ConductorAlreadyRegistered:
# This conductor was already registered and did not shut down
# properly, so log a warning and update the record.
@ -150,10 +151,8 @@ class BaseConductorManager(object):
_LW("A conductor with hostname %(hostname)s "
"was previously registered. Updating registration"),
{'hostname': self.host})
cdr = self.dbapi.register_conductor({'hostname': self.host,
'drivers': driver_names},
update_existing=True)
self.conductor = cdr
self.conductor = objects.Conductor.register(
admin_context, self.host, driver_names, update_existing=True)
# Start periodic tasks
self._periodic_tasks_worker = self._executor.submit(
@ -201,7 +200,7 @@ class BaseConductorManager(object):
# Inform the cluster that this conductor is shutting down.
# Note that rebalancing will not occur immediately, but when
# the periodic sync takes place.
self.dbapi.unregister_conductor(self.host)
self.conductor.unregister()
LOG.info(_LI('Successfully stopped conductor with hostname '
'%(hostname)s.'),
{'hostname': self.host})
@ -283,7 +282,7 @@ class BaseConductorManager(object):
def _conductor_service_record_keepalive(self):
while not self._keepalive_evt.is_set():
try:
self.dbapi.touch_conductor(self.host)
self.conductor.touch()
except db_exception.DBConnectionError:
LOG.warning(_LW('Conductor could not connect to database '
'while heartbeating.'))

View File

@ -24,6 +24,10 @@ from ironic.objects import fields as object_fields
@base.IronicObjectRegistry.register
class Conductor(base.IronicObject, object_base.VersionedObjectDictCompat):
# Version 1.0: Initial version
# Version 1.1: Add register() and unregister(), make the context parameter
# to touch() optional.
VERSION = '1.1'
dbapi = db_api.get_instance()
@ -79,6 +83,37 @@ class Conductor(base.IronicObject, object_base.VersionedObjectDictCompat):
# methods can be used in the future to replace current explicit RPC calls.
# Implications of calling new remote procedures should be thought through.
# @object_base.remotable
def touch(self, context):
def touch(self, context=None):
"""Touch this conductor's DB record, marking it as up-to-date."""
self.dbapi.touch_conductor(self.hostname)
# NOTE(xek): We don't want to enable RPC on this call just yet. Remotable
# methods can be used in the future to replace current explicit RPC calls.
# Implications of calling new remote procedures should be thought through.
# @object_base.remotable
@classmethod
def register(cls, context, hostname, drivers, update_existing=False):
"""Register an active conductor with the cluster.
:param hostname: the hostname on which the conductor will run
:param drivers: the list of drivers enabled in the conductor
:param update_existing: When false, registration will raise an
exception when a conflicting online record
is found. When true, will overwrite the
existing record. Default: False.
:raises: ConductorAlreadyRegistered
:returns: a :class:`Conductor` object.
"""
db_cond = cls.dbapi.register_conductor({'hostname': hostname,
'drivers': drivers},
update_existing=update_existing)
return Conductor._from_db_object(cls(context), db_cond)
# NOTE(xek): We don't want to enable RPC on this call just yet. Remotable
# methods can be used in the future to replace current explicit RPC calls.
# Implications of calling new remote procedures should be thought through.
# @object_base.remotable
def unregister(self, context=None):
"""Remove this conductor from the service registry."""
self.dbapi.unregister_conductor(self.hostname)

View File

@ -85,3 +85,34 @@ class TestConductorObject(base.DbTestCase):
c.updated_at)
self.assertEqual(expected, mock_get_cdr.call_args_list)
self.assertEqual(self.context, c._context)
def _test_register(self, update_existing=False):
host = self.fake_conductor['hostname']
drivers = self.fake_conductor['drivers']
with mock.patch.object(self.dbapi, 'register_conductor',
autospec=True) as mock_register_cdr:
mock_register_cdr.return_value = self.fake_conductor
c = objects.Conductor.register(self.context, host, drivers,
update_existing=update_existing)
self.assertIsInstance(c, objects.Conductor)
mock_register_cdr.assert_called_once_with(
{'drivers': drivers, 'hostname': host},
update_existing=update_existing)
def test_register(self):
self._test_register()
def test_register_update_existing_true(self):
self._test_register(update_existing=True)
def test_unregister(self):
host = self.fake_conductor['hostname']
with mock.patch.object(self.dbapi, 'get_conductor',
autospec=True) as mock_get_cdr:
with mock.patch.object(self.dbapi, 'unregister_conductor',
autospec=True) as mock_unregister_cdr:
mock_get_cdr.return_value = self.fake_conductor
c = objects.Conductor.get_by_hostname(self.context, host)
c.unregister()
mock_unregister_cdr.assert_called_once_with(host)

View File

@ -409,7 +409,7 @@ expected_object_fingerprints = {
'Chassis': '1.3-d656e039fd8ae9f34efc232ab3980905',
'Port': '1.5-a224755c3da5bc5cf1a14a11c0d00f3f',
'Portgroup': '1.0-1ac4db8fa31edd9e1637248ada4c25a1',
'Conductor': '1.0-5091f249719d4a465062a1b3dc7f860d'
'Conductor': '1.1-5091f249719d4a465062a1b3dc7f860d'
}