Add "affinity" tracking to nodes and conductors

Add "conductor_affinity" column to nodes table, containing a reference to the
`id` of the conductor service (not its hostname) that has most recently
performed some action which could require local state to be maintained
(eg, built a PXE config, or started a SOL session).

Using the `id` as a foreign key necessitates not deleting conductors
when unregistering them, but instead marking them offline. This also
helps in determining if a conductor service was only restarted (though
this patch does not implement graceful shutdown).

Thus, this patch also adds an "online" boolean column to the conductors
table to track whether a conductor is on- or offline, and updates
the register and unregister methods to use that field transparently.
It may be noted that this does not change the behavior of
register_conductor or unregister_conductor, though an optional
"update_existing" parameter has been added to register_conductor. This
replaces a DELETE query with an UPDATE query instead.

Co-Authored-By: David Shrewsbury <shrewsbury.dave@gmail.com>
Co-Authored-By: Lucas Alvares Gomes <lucasagomes@gmail.com>

Related-bug: #1279331
Change-Id: I8e8b5cc00fc9f565ad2fb442e9a26077342e0a25
This commit is contained in:
Devananda van der Veen 2014-09-26 11:35:38 -07:00
parent 9de2d21a96
commit d6a277130b
11 changed files with 168 additions and 40 deletions

View File

@ -432,6 +432,9 @@ class Node(base.APIBase):
ports = wsme.wsattr([link.Link], readonly=True)
"Links to the collection of ports on this node"
# NOTE(deva): "conductor_affinity" shouldn't be presented on the
# API because it's an internal value. Don't add it here.
def __init__(self, **kwargs):
self.fields = []
fields = objects.Node.fields.keys()

View File

@ -187,15 +187,19 @@ class ConductorManager(periodic_task.PeriodicTasks):
"""List of driver names which this conductor supports."""
try:
self.dbapi.register_conductor({'hostname': self.host,
'drivers': self.drivers})
# Register this conductor with the cluster
cdr = self.dbapi.register_conductor({'hostname': self.host,
'drivers': self.drivers})
except exception.ConductorAlreadyRegistered:
# This conductor was already registered and did not shut down
# properly, so log a warning and update the record.
LOG.warn(_LW("A conductor with hostname %(hostname)s "
"was previously registered. Updating registration"),
{'hostname': self.host})
self.dbapi.unregister_conductor(self.host)
self.dbapi.register_conductor({'hostname': self.host,
'drivers': self.drivers})
cdr = self.dbapi.register_conductor({'hostname': self.host,
'drivers': self.drivers},
update_existing=True)
self.conductor = cdr
self.ring_manager = hash.HashRingManager()
"""Consistent hash ring which maps drivers to conductors."""
@ -219,6 +223,8 @@ class ConductorManager(periodic_task.PeriodicTasks):
def del_host(self):
self._keepalive_evt.set()
try:
# Inform the cluster that this conductor is shutting down.
# Note that rebalancing won't begin until after heartbeat timeout.
self.dbapi.unregister_conductor(self.host)
LOG.info(_LI('Successfully stopped conductor with hostname '
'%(hostname)s.'),
@ -428,6 +434,8 @@ class ConductorManager(periodic_task.PeriodicTasks):
"""
if isinstance(e, exception.NoFreeConductorWorker):
# NOTE(deva): there is no need to clear conductor_affinity
# because it isn't updated on a failed deploy
node.provision_state = provision_state
node.target_provision_state = target_provision_state
node.last_error = (_("No free conductor workers available"))
@ -525,6 +533,10 @@ class ConductorManager(periodic_task.PeriodicTasks):
try:
task.driver.deploy.prepare(task)
new_state = task.driver.deploy.deploy(task)
# Update conductor_affinity to reference this conductor's ID
# since there may be local persistent state
node.conductor_affinity = self.conductor.id
except Exception as e:
with excutils.save_and_reraise_exception():
LOG.warning(_LW('Error in deploy of node %(node)s: %(err)s'),
@ -532,6 +544,7 @@ class ConductorManager(periodic_task.PeriodicTasks):
node.last_error = _("Failed to deploy. Error: %s") % e
node.provision_state = states.DEPLOYFAIL
node.target_provision_state = states.NOSTATE
# NOTE(deva): there is no need to clear conductor_affinity
else:
# NOTE(deva): Some drivers may return states.DEPLOYWAIT
# eg. if they are waiting for a callback
@ -632,7 +645,10 @@ class ConductorManager(periodic_task.PeriodicTasks):
else:
node.provision_state = new_state
finally:
# Clean the instance_info
# NOTE(deva): there is no need to unset conductor_affinity
# because it is a reference to the most recent conductor which
# deployed a node, and does not limit any future actions.
# But we do need to clear the instance_info
node.instance_info = {}
node.save()
@ -869,7 +885,7 @@ class ConductorManager(periodic_task.PeriodicTasks):
except exception.DriverNotFound:
return False
return self.host == ring.get_hosts(node_uuid)[0]
return self.host in ring.get_hosts(node_uuid)
@messaging.expected_exceptions(exception.NodeLocked)
def validate_driver_interfaces(self, context, node_id):
@ -1060,6 +1076,9 @@ class ConductorManager(periodic_task.PeriodicTasks):
try:
if enabled:
task.driver.console.start_console(task)
# TODO(deva): We should be updating conductor_affinity here
# but there is no support for console sessions in
# take_over() right now.
else:
task.driver.console.stop_console(task)
except Exception as e:

View File

@ -317,8 +317,8 @@ class Connection(object):
"""
@abc.abstractmethod
def register_conductor(self, values):
"""Register a new conductor service at the specified hostname.
def register_conductor(self, values, update_existing=False):
"""Register an active conductor with the cluster.
:param values: A dict of values which must contain the following:
{
@ -326,13 +326,17 @@ class Connection(object):
this Conductor service.
'drivers': a list of supported drivers.
}
:param update_existing: When false, registration will raise an
exception when a conflicting online record
is found. When true, will overwrite the
existing record. Default: False.
:returns: A conductor.
:raises: ConductorAlreadyRegistered
"""
@abc.abstractmethod
def get_conductor(self, hostname):
"""Retrieve a conductor service record from the database.
"""Retrieve a conductor's service record from the database.
:param hostname: The hostname of the conductor service.
:returns: A conductor.
@ -341,7 +345,7 @@ class Connection(object):
@abc.abstractmethod
def unregister_conductor(self, hostname):
"""Unregister this conductor with the service registry.
"""Remove this conductor from the service registry immediately.
:param hostname: The hostname of this conductor service.
:raises: ConductorNotFound

View File

@ -0,0 +1,45 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""add conductor_affinity and online
Revision ID: 487deb87cc9d
Revises: 3bea56f25597
Create Date: 2014-09-26 16:16:30.988900
"""
# revision identifiers, used by Alembic.
revision = '487deb87cc9d'
down_revision = '3bea56f25597'
from alembic import op
import sqlalchemy as sa
def upgrade():
op.add_column(
'conductors',
sa.Column('online', sa.Boolean(), default=True))
op.add_column(
'nodes',
sa.Column('conductor_affinity', sa.Integer(),
sa.ForeignKey('conductors.id',
name='nodes_conductor_affinity_fk'),
nullable=True))
def downgrade():
op.drop_constraint('nodes_conductor_affinity_fk', 'nodes',
type_='foreignkey')
op.drop_column('nodes', 'conductor_affinity')
op.drop_column('conductors', 'online')

View File

@ -502,23 +502,31 @@ class Connection(api.Connection):
if count != 1:
raise exception.ChassisNotFound(chassis=chassis_id)
def register_conductor(self, values):
try:
conductor = models.Conductor()
conductor.update(values)
# NOTE(deva): ensure updated_at field has a non-null initial value
if not conductor.get('updated_at'):
conductor.update({'updated_at': timeutils.utcnow()})
conductor.save()
return conductor
except db_exc.DBDuplicateEntry:
raise exception.ConductorAlreadyRegistered(
conductor=values['hostname'])
def register_conductor(self, values, update_existing=False):
session = get_session()
with session.begin():
query = model_query(models.Conductor, session=session).\
filter_by(hostname=values['hostname'])
try:
ref = query.one()
if ref.online is True and not update_existing:
raise exception.ConductorAlreadyRegistered(
conductor=values['hostname'])
except NoResultFound:
ref = models.Conductor()
ref.update(values)
# always set online and updated_at fields when registering
# a conductor, especially when updating an existing one
ref.update({'updated_at': timeutils.utcnow(),
'online': True})
ref.save(session)
return ref
def get_conductor(self, hostname):
try:
return model_query(models.Conductor).\
filter_by(hostname=hostname).\
filter_by(hostname=hostname,
online=True).\
one()
except NoResultFound:
raise exception.ConductorNotFound(conductor=hostname)
@ -527,8 +535,9 @@ class Connection(api.Connection):
session = get_session()
with session.begin():
query = model_query(models.Conductor, session=session).\
filter_by(hostname=hostname)
count = query.delete()
filter_by(hostname=hostname,
online=True)
count = query.update({'online': False})
if count == 0:
raise exception.ConductorNotFound(conductor=hostname)
@ -538,7 +547,9 @@ class Connection(api.Connection):
query = model_query(models.Conductor, session=session).\
filter_by(hostname=hostname)
# since we're not changing any other field, manually set updated_at
count = query.update({'updated_at': timeutils.utcnow()})
# and since we're heartbeating, make sure that online=True
count = query.update({'updated_at': timeutils.utcnow(),
'online': True})
if count == 0:
raise exception.ConductorNotFound(conductor=hostname)
@ -548,6 +559,7 @@ class Connection(api.Connection):
limit = timeutils.utcnow() - datetime.timedelta(seconds=interval)
result = model_query(models.Conductor).\
filter_by(online=True).\
filter(models.Conductor.updated_at >= limit).\
all()

View File

@ -135,6 +135,7 @@ class Conductor(Base):
id = Column(Integer, primary_key=True)
hostname = Column(String(255), nullable=False)
drivers = Column(JSONEncodedList)
online = Column(Boolean, default=True)
class Node(Base):
@ -163,7 +164,21 @@ class Node(Base):
properties = Column(JSONEncodedDict)
driver = Column(String(15))
driver_info = Column(JSONEncodedDict)
# NOTE(deva): this is the host name of the conductor which has
# acquired a TaskManager lock on the node.
# We should use an INT FK (conductors.id) in the future.
reservation = Column(String(255), nullable=True)
# NOTE(deva): this is the id of the last conductor which prepared local
# state for the node (eg, a PXE config file).
# When affinity and the hash ring's mapping do not match,
# this indicates that a conductor should rebuild local state.
conductor_affinity = Column(Integer,
ForeignKey('conductors.id',
name='nodes_conductor_affinity_fk'),
nullable=True)
maintenance = Column(Boolean, default=False)
console_enabled = Column(Boolean, default=False)
extra = Column(JSONEncodedDict)

View File

@ -29,7 +29,8 @@ class Node(base.IronicObject):
# Version 1.4: Add get_by_instance_uuid()
# Version 1.5: Add list()
# Version 1.6: Add reserve() and release()
VERSION = '1.6'
# Version 1.7: Add conductor_affinity
VERSION = '1.7'
dbapi = db_api.get_instance()
@ -46,6 +47,10 @@ class Node(base.IronicObject):
'instance_info': obj_utils.dict_or_none,
'properties': obj_utils.dict_or_none,
'reservation': obj_utils.str_or_none,
# a reference to the id of the conductor service, not its hostname,
# that has most recently performed some action which could require
# local state to be maintained (eg, built a PXE config)
'conductor_affinity': obj_utils.int_or_none,
# One of states.POWER_ON|POWER_OFF|NOSTATE|ERROR
'power_state': obj_utils.str_or_none,

View File

@ -695,6 +695,7 @@ class DoNodeDeployTearDownTestCase(_ServiceSetUpMixin,
@mock.patch('ironic.drivers.modules.fake.FakeDeploy.deploy')
def test__do_node_deploy_ok(self, mock_deploy):
self._start_service()
# test when driver.deploy.deploy returns DEPLOYDONE
mock_deploy.return_value = states.DEPLOYDONE
node = obj_utils.create_test_node(self.context, driver='fake',

View File

@ -78,5 +78,5 @@ class SqlAlchemyCustomTypesTestCase(base.DbTestCase):
def test_JSONEncodedList_type_check(self):
self.assertRaises(db_exc.DBError,
self.dbapi.register_conductor,
{'drivers':
{'this is not a list': 'test'}})
{'hostname': 'test_host3',
'drivers': {'this is not a list': 'test'}})

View File

@ -32,17 +32,23 @@ class DbConductorTestCase(base.DbTestCase):
super(DbConductorTestCase, self).setUp()
self.dbapi = dbapi.get_instance()
def test_register_conductor_existing_fails(self):
c = utils.get_test_conductor()
self.dbapi.register_conductor(c)
self.assertRaises(
exception.ConductorAlreadyRegistered,
self.dbapi.register_conductor,
c)
def test_register_conductor_override(self):
c = utils.get_test_conductor()
self.dbapi.register_conductor(c)
self.dbapi.register_conductor(c, update_existing=True)
def _create_test_cdr(self, **kwargs):
c = utils.get_test_conductor(**kwargs)
return self.dbapi.register_conductor(c)
def test_register_conductor(self):
self._create_test_cdr(id=1)
self.assertRaises(
exception.ConductorAlreadyRegistered,
self._create_test_cdr,
id=2)
def test_get_conductor(self):
c1 = self._create_test_cdr()
c2 = self.dbapi.get_conductor(c1.hostname)
@ -67,7 +73,7 @@ class DbConductorTestCase(base.DbTestCase):
def test_touch_conductor(self, mock_utcnow):
test_time = datetime.datetime(2000, 1, 1, 0, 0)
mock_utcnow.return_value = test_time
c = self._create_test_cdr(updated_at=test_time)
c = self._create_test_cdr()
self.assertEqual(test_time, timeutils.normalize_time(c.updated_at))
test_time = datetime.datetime(2000, 1, 1, 0, 1)
@ -77,12 +83,26 @@ class DbConductorTestCase(base.DbTestCase):
self.assertEqual(test_time, timeutils.normalize_time(c.updated_at))
def test_touch_conductor_not_found(self):
# A conductor's heartbeat will not create a new record,
# it will only update existing ones
self._create_test_cdr()
self.assertRaises(
exception.ConductorNotFound,
self.dbapi.touch_conductor,
'bad-hostname')
def test_touch_offline_conductor(self):
# Ensure that a conductor's periodic heartbeat task can make the
# conductor visible again, even if it was spuriously marked offline
c = self._create_test_cdr()
self.dbapi.unregister_conductor(c.hostname)
self.assertRaises(
exception.ConductorNotFound,
self.dbapi.get_conductor,
c.hostname)
self.dbapi.touch_conductor(c.hostname)
self.dbapi.get_conductor(c.hostname)
@mock.patch.object(timeutils, 'utcnow')
def test_get_active_driver_dict_one_host_no_driver(self, mock_utcnow):
h = 'fake-host'

View File

@ -14,6 +14,9 @@
# under the License.
"""Ironic test utilities."""
from oslo.utils import timeutils
from ironic.common import states
@ -151,6 +154,7 @@ def get_test_node(**kw):
'id': kw.get('id', 123),
'uuid': kw.get('uuid', '1be26c0b-03f2-4d2e-ae87-c02d7f33c123'),
'chassis_id': kw.get('chassis_id', 42),
'conductor_affinity': kw.get('conductor_affinity', None),
'power_state': kw.get('power_state', states.NOSTATE),
'target_power_state': kw.get('target_power_state', states.NOSTATE),
'provision_state': kw.get('provision_state', states.NOSTATE),
@ -200,6 +204,6 @@ def get_test_conductor(**kw):
'id': kw.get('id', 6),
'hostname': kw.get('hostname', 'test-conductor-node'),
'drivers': kw.get('drivers', ['fake-driver', 'null-driver']),
'created_at': kw.get('created_at'),
'updated_at': kw.get('updated_at'),
'created_at': kw.get('created_at', timeutils.utcnow()),
'updated_at': kw.get('updated_at', timeutils.utcnow()),
}