From d6a277130b72534e499c64e3e468ec5e572fcc4d Mon Sep 17 00:00:00 2001 From: Devananda van der Veen Date: Fri, 26 Sep 2014 11:35:38 -0700 Subject: [PATCH] Add "affinity" tracking to nodes and conductors Add "conductor_affinity" column to nodes table, containing a reference to the `id` of the conductor service (not its hostname) that has most recently performed some action which could require local state to be maintained (e.g., built a PXE config, or started a SOL session). Using the `id` as a foreign key necessitates not deleting conductors when unregistering them, but instead marking them offline. This also helps in determining if a conductor service was only restarted (though this patch does not implement graceful shutdown). Thus, this patch also adds an "online" boolean column to the conductors table to track whether a conductor is on- or offline, and updates the register and unregister methods to use that field transparently. It may be noted that this does not change the behavior of register_conductor or unregister_conductor, though an optional "update_existing" parameter has been added to register_conductor. Internally, unregister_conductor now issues an UPDATE query rather than a DELETE query. 
Co-Authored-By: David Shrewsbury Co-Authored-By: Lucas Alvares Gomes Related-bug: #1279331 Change-Id: I8e8b5cc00fc9f565ad2fb442e9a26077342e0a25 --- ironic/api/controllers/v1/node.py | 3 ++ ironic/conductor/manager.py | 33 +++++++++++--- ironic/db/api.py | 12 +++-- ...7cc9d_add_conductor_affinity_and_online.py | 45 +++++++++++++++++++ ironic/db/sqlalchemy/api.py | 44 +++++++++++------- ironic/db/sqlalchemy/models.py | 15 +++++++ ironic/objects/node.py | 7 ++- ironic/tests/conductor/test_manager.py | 1 + ironic/tests/db/sqlalchemy/test_types.py | 4 +- ironic/tests/db/test_conductor.py | 36 +++++++++++---- ironic/tests/db/utils.py | 8 +++- 11 files changed, 168 insertions(+), 40 deletions(-) create mode 100644 ironic/db/sqlalchemy/alembic/versions/487deb87cc9d_add_conductor_affinity_and_online.py diff --git a/ironic/api/controllers/v1/node.py b/ironic/api/controllers/v1/node.py index e587ce351a..91041d2e0f 100644 --- a/ironic/api/controllers/v1/node.py +++ b/ironic/api/controllers/v1/node.py @@ -432,6 +432,9 @@ class Node(base.APIBase): ports = wsme.wsattr([link.Link], readonly=True) "Links to the collection of ports on this node" + # NOTE(deva): "conductor_affinity" shouldn't be presented on the + # API because it's an internal value. Don't add it here. 
+ def __init__(self, **kwargs): self.fields = [] fields = objects.Node.fields.keys() diff --git a/ironic/conductor/manager.py b/ironic/conductor/manager.py index e9439fb4b0..c872f8966a 100644 --- a/ironic/conductor/manager.py +++ b/ironic/conductor/manager.py @@ -187,15 +187,19 @@ class ConductorManager(periodic_task.PeriodicTasks): """List of driver names which this conductor supports.""" try: - self.dbapi.register_conductor({'hostname': self.host, - 'drivers': self.drivers}) + # Register this conductor with the cluster + cdr = self.dbapi.register_conductor({'hostname': self.host, + 'drivers': self.drivers}) except exception.ConductorAlreadyRegistered: + # This conductor was already registered and did not shut down + # properly, so log a warning and update the record. LOG.warn(_LW("A conductor with hostname %(hostname)s " "was previously registered. Updating registration"), {'hostname': self.host}) - self.dbapi.unregister_conductor(self.host) - self.dbapi.register_conductor({'hostname': self.host, - 'drivers': self.drivers}) + cdr = self.dbapi.register_conductor({'hostname': self.host, + 'drivers': self.drivers}, + update_existing=True) + self.conductor = cdr self.ring_manager = hash.HashRingManager() """Consistent hash ring which maps drivers to conductors.""" @@ -219,6 +223,8 @@ class ConductorManager(periodic_task.PeriodicTasks): def del_host(self): self._keepalive_evt.set() try: + # Inform the cluster that this conductor is shutting down. + # Note that rebalancing won't begin until after heartbeat timeout. 
self.dbapi.unregister_conductor(self.host) LOG.info(_LI('Successfully stopped conductor with hostname ' '%(hostname)s.'), @@ -428,6 +434,8 @@ class ConductorManager(periodic_task.PeriodicTasks): """ if isinstance(e, exception.NoFreeConductorWorker): + # NOTE(deva): there is no need to clear conductor_affinity + # because it isn't updated on a failed deploy node.provision_state = provision_state node.target_provision_state = target_provision_state node.last_error = (_("No free conductor workers available")) @@ -525,6 +533,10 @@ class ConductorManager(periodic_task.PeriodicTasks): try: task.driver.deploy.prepare(task) new_state = task.driver.deploy.deploy(task) + + # Update conductor_affinity to reference this conductor's ID + # since there may be local persistent state + node.conductor_affinity = self.conductor.id except Exception as e: with excutils.save_and_reraise_exception(): LOG.warning(_LW('Error in deploy of node %(node)s: %(err)s'), @@ -532,6 +544,7 @@ class ConductorManager(periodic_task.PeriodicTasks): node.last_error = _("Failed to deploy. Error: %s") % e node.provision_state = states.DEPLOYFAIL node.target_provision_state = states.NOSTATE + # NOTE(deva): there is no need to clear conductor_affinity else: # NOTE(deva): Some drivers may return states.DEPLOYWAIT # eg. if they are waiting for a callback @@ -632,7 +645,10 @@ class ConductorManager(periodic_task.PeriodicTasks): else: node.provision_state = new_state finally: - # Clean the instance_info + # NOTE(deva): there is no need to unset conductor_affinity + # because it is a reference to the most recent conductor which + # deployed a node, and does not limit any future actions. 
+ # But we do need to clear the instance_info node.instance_info = {} node.save() @@ -869,7 +885,7 @@ class ConductorManager(periodic_task.PeriodicTasks): except exception.DriverNotFound: return False - return self.host == ring.get_hosts(node_uuid)[0] + return self.host in ring.get_hosts(node_uuid) @messaging.expected_exceptions(exception.NodeLocked) def validate_driver_interfaces(self, context, node_id): @@ -1060,6 +1076,9 @@ class ConductorManager(periodic_task.PeriodicTasks): try: if enabled: task.driver.console.start_console(task) + # TODO(deva): We should be updating conductor_affinity here + # but there is no support for console sessions in + # take_over() right now. else: task.driver.console.stop_console(task) except Exception as e: diff --git a/ironic/db/api.py b/ironic/db/api.py index 1b2f95bf9e..c038bb71df 100644 --- a/ironic/db/api.py +++ b/ironic/db/api.py @@ -317,8 +317,8 @@ class Connection(object): """ @abc.abstractmethod - def register_conductor(self, values): - """Register a new conductor service at the specified hostname. + def register_conductor(self, values, update_existing=False): + """Register an active conductor with the cluster. :param values: A dict of values which must contain the following: { @@ -326,13 +326,17 @@ class Connection(object): this Conductor service. 'drivers': a list of supported drivers. } + :param update_existing: When false, registration will raise an + exception when a conflicting online record + is found. When true, will overwrite the + existing record. Default: False. :returns: A conductor. :raises: ConductorAlreadyRegistered """ @abc.abstractmethod def get_conductor(self, hostname): - """Retrieve a conductor service record from the database. + """Retrieve a conductor's service record from the database. :param hostname: The hostname of the conductor service. :returns: A conductor. 
@@ -341,7 +345,7 @@ class Connection(object): @abc.abstractmethod def unregister_conductor(self, hostname): - """Unregister this conductor with the service registry. + """Remove this conductor from the service registry immediately. :param hostname: The hostname of this conductor service. :raises: ConductorNotFound diff --git a/ironic/db/sqlalchemy/alembic/versions/487deb87cc9d_add_conductor_affinity_and_online.py b/ironic/db/sqlalchemy/alembic/versions/487deb87cc9d_add_conductor_affinity_and_online.py new file mode 100644 index 0000000000..264aeea6dc --- /dev/null +++ b/ironic/db/sqlalchemy/alembic/versions/487deb87cc9d_add_conductor_affinity_and_online.py @@ -0,0 +1,45 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""add conductor_affinity and online + +Revision ID: 487deb87cc9d +Revises: 3bea56f25597 +Create Date: 2014-09-26 16:16:30.988900 + +""" + +# revision identifiers, used by Alembic. 
+revision = '487deb87cc9d' +down_revision = '3bea56f25597' + +from alembic import op +import sqlalchemy as sa + + +def upgrade(): + op.add_column( + 'conductors', + sa.Column('online', sa.Boolean(), default=True)) + op.add_column( + 'nodes', + sa.Column('conductor_affinity', sa.Integer(), + sa.ForeignKey('conductors.id', + name='nodes_conductor_affinity_fk'), + nullable=True)) + + +def downgrade(): + op.drop_constraint('nodes_conductor_affinity_fk', 'nodes', + type_='foreignkey') + op.drop_column('nodes', 'conductor_affinity') + op.drop_column('conductors', 'online') diff --git a/ironic/db/sqlalchemy/api.py b/ironic/db/sqlalchemy/api.py index 3a74605f80..d01b2359df 100644 --- a/ironic/db/sqlalchemy/api.py +++ b/ironic/db/sqlalchemy/api.py @@ -502,23 +502,31 @@ class Connection(api.Connection): if count != 1: raise exception.ChassisNotFound(chassis=chassis_id) - def register_conductor(self, values): - try: - conductor = models.Conductor() - conductor.update(values) - # NOTE(deva): ensure updated_at field has a non-null initial value - if not conductor.get('updated_at'): - conductor.update({'updated_at': timeutils.utcnow()}) - conductor.save() - return conductor - except db_exc.DBDuplicateEntry: - raise exception.ConductorAlreadyRegistered( - conductor=values['hostname']) + def register_conductor(self, values, update_existing=False): + session = get_session() + with session.begin(): + query = model_query(models.Conductor, session=session).\ + filter_by(hostname=values['hostname']) + try: + ref = query.one() + if ref.online is True and not update_existing: + raise exception.ConductorAlreadyRegistered( + conductor=values['hostname']) + except NoResultFound: + ref = models.Conductor() + ref.update(values) + # always set online and updated_at fields when registering + # a conductor, especially when updating an existing one + ref.update({'updated_at': timeutils.utcnow(), + 'online': True}) + ref.save(session) + return ref def get_conductor(self, hostname): try: return 
model_query(models.Conductor).\ - filter_by(hostname=hostname).\ + filter_by(hostname=hostname, + online=True).\ one() except NoResultFound: raise exception.ConductorNotFound(conductor=hostname) @@ -527,8 +535,9 @@ class Connection(api.Connection): session = get_session() with session.begin(): query = model_query(models.Conductor, session=session).\ - filter_by(hostname=hostname) - count = query.delete() + filter_by(hostname=hostname, + online=True) + count = query.update({'online': False}) if count == 0: raise exception.ConductorNotFound(conductor=hostname) @@ -538,7 +547,9 @@ class Connection(api.Connection): query = model_query(models.Conductor, session=session).\ filter_by(hostname=hostname) # since we're not changing any other field, manually set updated_at - count = query.update({'updated_at': timeutils.utcnow()}) + # and since we're heartbeating, make sure that online=True + count = query.update({'updated_at': timeutils.utcnow(), + 'online': True}) if count == 0: raise exception.ConductorNotFound(conductor=hostname) @@ -548,6 +559,7 @@ class Connection(api.Connection): limit = timeutils.utcnow() - datetime.timedelta(seconds=interval) result = model_query(models.Conductor).\ + filter_by(online=True).\ filter(models.Conductor.updated_at >= limit).\ all() diff --git a/ironic/db/sqlalchemy/models.py b/ironic/db/sqlalchemy/models.py index 50cf9c885f..691214a84a 100644 --- a/ironic/db/sqlalchemy/models.py +++ b/ironic/db/sqlalchemy/models.py @@ -135,6 +135,7 @@ class Conductor(Base): id = Column(Integer, primary_key=True) hostname = Column(String(255), nullable=False) drivers = Column(JSONEncodedList) + online = Column(Boolean, default=True) class Node(Base): @@ -163,7 +164,21 @@ class Node(Base): properties = Column(JSONEncodedDict) driver = Column(String(15)) driver_info = Column(JSONEncodedDict) + + # NOTE(deva): this is the host name of the conductor which has + # acquired a TaskManager lock on the node. 
+ # We should use an INT FK (conductors.id) in the future. reservation = Column(String(255), nullable=True) + + # NOTE(deva): this is the id of the last conductor which prepared local + # state for the node (eg, a PXE config file). + # When affinity and the hash ring's mapping do not match, + # this indicates that a conductor should rebuild local state. + conductor_affinity = Column(Integer, + ForeignKey('conductors.id', + name='nodes_conductor_affinity_fk'), + nullable=True) + maintenance = Column(Boolean, default=False) console_enabled = Column(Boolean, default=False) extra = Column(JSONEncodedDict) diff --git a/ironic/objects/node.py b/ironic/objects/node.py index df8911051d..abe6b47bc2 100644 --- a/ironic/objects/node.py +++ b/ironic/objects/node.py @@ -29,7 +29,8 @@ class Node(base.IronicObject): # Version 1.4: Add get_by_instance_uuid() # Version 1.5: Add list() # Version 1.6: Add reserve() and release() - VERSION = '1.6' + # Version 1.7: Add conductor_affinity + VERSION = '1.7' dbapi = db_api.get_instance() @@ -46,6 +47,10 @@ class Node(base.IronicObject): 'instance_info': obj_utils.dict_or_none, 'properties': obj_utils.dict_or_none, 'reservation': obj_utils.str_or_none, + # a reference to the id of the conductor service, not its hostname, + # that has most recently performed some action which could require + # local state to be maintained (eg, built a PXE config) + 'conductor_affinity': obj_utils.int_or_none, # One of states.POWER_ON|POWER_OFF|NOSTATE|ERROR 'power_state': obj_utils.str_or_none, diff --git a/ironic/tests/conductor/test_manager.py b/ironic/tests/conductor/test_manager.py index 22c8e59fe4..27bb106f12 100644 --- a/ironic/tests/conductor/test_manager.py +++ b/ironic/tests/conductor/test_manager.py @@ -695,6 +695,7 @@ class DoNodeDeployTearDownTestCase(_ServiceSetUpMixin, @mock.patch('ironic.drivers.modules.fake.FakeDeploy.deploy') def test__do_node_deploy_ok(self, mock_deploy): + self._start_service() # test when driver.deploy.deploy returns 
DEPLOYDONE mock_deploy.return_value = states.DEPLOYDONE node = obj_utils.create_test_node(self.context, driver='fake', diff --git a/ironic/tests/db/sqlalchemy/test_types.py b/ironic/tests/db/sqlalchemy/test_types.py index d1e5a74084..71577e3ef3 100644 --- a/ironic/tests/db/sqlalchemy/test_types.py +++ b/ironic/tests/db/sqlalchemy/test_types.py @@ -78,5 +78,5 @@ class SqlAlchemyCustomTypesTestCase(base.DbTestCase): def test_JSONEncodedList_type_check(self): self.assertRaises(db_exc.DBError, self.dbapi.register_conductor, - {'drivers': - {'this is not a list': 'test'}}) + {'hostname': 'test_host3', + 'drivers': {'this is not a list': 'test'}}) diff --git a/ironic/tests/db/test_conductor.py b/ironic/tests/db/test_conductor.py index e9b54de6b9..4501d32188 100644 --- a/ironic/tests/db/test_conductor.py +++ b/ironic/tests/db/test_conductor.py @@ -32,17 +32,23 @@ class DbConductorTestCase(base.DbTestCase): super(DbConductorTestCase, self).setUp() self.dbapi = dbapi.get_instance() + def test_register_conductor_existing_fails(self): + c = utils.get_test_conductor() + self.dbapi.register_conductor(c) + self.assertRaises( + exception.ConductorAlreadyRegistered, + self.dbapi.register_conductor, + c) + + def test_register_conductor_override(self): + c = utils.get_test_conductor() + self.dbapi.register_conductor(c) + self.dbapi.register_conductor(c, update_existing=True) + def _create_test_cdr(self, **kwargs): c = utils.get_test_conductor(**kwargs) return self.dbapi.register_conductor(c) - def test_register_conductor(self): - self._create_test_cdr(id=1) - self.assertRaises( - exception.ConductorAlreadyRegistered, - self._create_test_cdr, - id=2) - def test_get_conductor(self): c1 = self._create_test_cdr() c2 = self.dbapi.get_conductor(c1.hostname) @@ -67,7 +73,7 @@ class DbConductorTestCase(base.DbTestCase): def test_touch_conductor(self, mock_utcnow): test_time = datetime.datetime(2000, 1, 1, 0, 0) mock_utcnow.return_value = test_time - c = 
self._create_test_cdr(updated_at=test_time) + c = self._create_test_cdr() self.assertEqual(test_time, timeutils.normalize_time(c.updated_at)) test_time = datetime.datetime(2000, 1, 1, 0, 1) @@ -77,12 +83,26 @@ class DbConductorTestCase(base.DbTestCase): self.assertEqual(test_time, timeutils.normalize_time(c.updated_at)) def test_touch_conductor_not_found(self): + # A conductor's heartbeat will not create a new record, + # it will only update existing ones self._create_test_cdr() self.assertRaises( exception.ConductorNotFound, self.dbapi.touch_conductor, 'bad-hostname') + def test_touch_offline_conductor(self): + # Ensure that a conductor's periodic heartbeat task can make the + # conductor visible again, even if it was spuriously marked offline + c = self._create_test_cdr() + self.dbapi.unregister_conductor(c.hostname) + self.assertRaises( + exception.ConductorNotFound, + self.dbapi.get_conductor, + c.hostname) + self.dbapi.touch_conductor(c.hostname) + self.dbapi.get_conductor(c.hostname) + @mock.patch.object(timeutils, 'utcnow') def test_get_active_driver_dict_one_host_no_driver(self, mock_utcnow): h = 'fake-host' diff --git a/ironic/tests/db/utils.py b/ironic/tests/db/utils.py index f55a108faa..edbf1d4d9b 100644 --- a/ironic/tests/db/utils.py +++ b/ironic/tests/db/utils.py @@ -14,6 +14,9 @@ # under the License. 
"""Ironic test utilities.""" + +from oslo.utils import timeutils + from ironic.common import states @@ -151,6 +154,7 @@ def get_test_node(**kw): 'id': kw.get('id', 123), 'uuid': kw.get('uuid', '1be26c0b-03f2-4d2e-ae87-c02d7f33c123'), 'chassis_id': kw.get('chassis_id', 42), + 'conductor_affinity': kw.get('conductor_affinity', None), 'power_state': kw.get('power_state', states.NOSTATE), 'target_power_state': kw.get('target_power_state', states.NOSTATE), 'provision_state': kw.get('provision_state', states.NOSTATE), @@ -200,6 +204,6 @@ def get_test_conductor(**kw): 'id': kw.get('id', 6), 'hostname': kw.get('hostname', 'test-conductor-node'), 'drivers': kw.get('drivers', ['fake-driver', 'null-driver']), - 'created_at': kw.get('created_at'), - 'updated_at': kw.get('updated_at'), + 'created_at': kw.get('created_at', timeutils.utcnow()), + 'updated_at': kw.get('updated_at', timeutils.utcnow()), }