Introduce conductor touch while offline

This adds an `online` argument to the conductor touch methods so that
touch can be called with `online=False`. When called periodically this
allows the conductor `updated_at` to be within the threshold to avoid
locked nodes being failed as orphans by another conductor.

This will be used by drain shutdown (and graceful shutdown) so that
tasks can complete on existing locked nodes within the shutdown timeout,
while the conductor is also removed from the hash ring so new tasks are
not started on that conductor.

This change introduces the api but the existing behaviour won't change
until BaseConductorManager.del_host() no longer calls keepalive_halt().

Change-Id: Iedd62193fac1009137b9ee47a6ef5a9a8576f261
This commit is contained in:
Steve Baker 2023-10-05 14:59:45 +13:00
parent 9118440577
commit 3f9151163e
7 changed files with 33 additions and 9 deletions

View File

@ -321,13 +321,16 @@ class BaseConductorManager(object):
# This is only used in tests currently. Delete it? # This is only used in tests currently. Delete it?
self._periodic_task_callables = periodic_task_callables self._periodic_task_callables = periodic_task_callables
def keepalive_halt(self):
self._keepalive_evt.set()
def del_host(self, deregister=True, clear_node_reservations=True): def del_host(self, deregister=True, clear_node_reservations=True):
# Conductor deregistration fails if called on non-initialized # Conductor deregistration fails if called on non-initialized
# conductor (e.g. when rpc server is unreachable). # conductor (e.g. when rpc server is unreachable).
if not hasattr(self, 'conductor'): if not hasattr(self, 'conductor'):
return return
self._shutdown = True self._shutdown = True
self._keepalive_evt.set() self.keepalive_halt()
if clear_node_reservations: if clear_node_reservations:
# clear all locks held by this conductor before deregistering # clear all locks held by this conductor before deregistering
@ -469,7 +472,7 @@ class BaseConductorManager(object):
return return
while not self._keepalive_evt.is_set(): while not self._keepalive_evt.is_set():
try: try:
self.conductor.touch() self.conductor.touch(online=not self._shutdown)
except db_exception.DBConnectionError: except db_exception.DBConnectionError:
LOG.warning('Conductor could not connect to database ' LOG.warning('Conductor could not connect to database '
'while heartbeating.') 'while heartbeating.')

View File

@ -585,10 +585,16 @@ class Connection(object, metaclass=abc.ABCMeta):
""" """
@abc.abstractmethod @abc.abstractmethod
def touch_conductor(self, hostname): def touch_conductor(self, hostname, online=True):
"""Mark a conductor as active by updating its 'updated_at' property. """Mark a conductor as active by updating its 'updated_at' property.
Calling periodically with ``online=False`` will result in the conductor
appearing unregistered, but recently enough to prevent other conductors
failing orphan nodes. This improves the behaviour of graceful and drain
shutdown.
:param hostname: The hostname of this conductor service. :param hostname: The hostname of this conductor service.
:param online: Whether the conductor is online.
:raises: ConductorNotFound :raises: ConductorNotFound
""" """

View File

@ -1392,13 +1392,13 @@ class Connection(api.Connection):
raise exception.ConductorNotFound(conductor=hostname) raise exception.ConductorNotFound(conductor=hostname)
@oslo_db_api.retry_on_deadlock @oslo_db_api.retry_on_deadlock
def touch_conductor(self, hostname): def touch_conductor(self, hostname, online=True):
with _session_for_write() as session: with _session_for_write() as session:
query = sa.update(models.Conductor).where( query = sa.update(models.Conductor).where(
models.Conductor.hostname == hostname models.Conductor.hostname == hostname
).values({ ).values({
'updated_at': timeutils.utcnow(), 'updated_at': timeutils.utcnow(),
'online': True} 'online': online}
).execution_options(synchronize_session=False) ).execution_options(synchronize_session=False)
res = session.execute(query) res = session.execute(query)
count = res.rowcount count = res.rowcount

View File

@ -111,9 +111,9 @@ class Conductor(base.IronicObject, object_base.VersionedObjectDictCompat):
# methods can be used in the future to replace current explicit RPC calls. # methods can be used in the future to replace current explicit RPC calls.
# Implications of calling new remote procedures should be thought through. # Implications of calling new remote procedures should be thought through.
# @object_base.remotable # @object_base.remotable
def touch(self, context=None): def touch(self, context=None, online=True):
"""Touch this conductor's DB record, marking it as up-to-date.""" """Touch this conductor's DB record, marking it as up-to-date."""
self.dbapi.touch_conductor(self.hostname) self.dbapi.touch_conductor(self.hostname, online=online)
# NOTE(xek): We don't want to enable RPC on this call just yet. Remotable # NOTE(xek): We don't want to enable RPC on this call just yet. Remotable
# methods can be used in the future to replace current explicit RPC calls. # methods can be used in the future to replace current explicit RPC calls.

View File

@ -359,7 +359,7 @@ class KeepAliveTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
mock_is_sqlite.return_value = False mock_is_sqlite.return_value = False
self.service._conductor_service_record_keepalive() self.service._conductor_service_record_keepalive()
self.assertEqual(1, mock_is_sqlite.call_count) self.assertEqual(1, mock_is_sqlite.call_count)
mock_touch.assert_called_once_with(self.hostname) mock_touch.assert_called_once_with(self.hostname, online=True)
@mock.patch.object(common_utils, 'is_ironic_using_sqlite', autospec=True) @mock.patch.object(common_utils, 'is_ironic_using_sqlite', autospec=True)
def test__conductor_service_record_keepalive_failed_db_conn( def test__conductor_service_record_keepalive_failed_db_conn(

View File

@ -156,6 +156,21 @@ class DbConductorTestCase(base.DbTestCase):
c = self.dbapi.get_conductor(c.hostname) c = self.dbapi.get_conductor(c.hostname)
self.assertEqual(test_time, timeutils.normalize_time(c.updated_at)) self.assertEqual(test_time, timeutils.normalize_time(c.updated_at))
@mock.patch.object(timeutils, 'utcnow', autospec=True)
def test_touch_conductor_offline(self, mock_utcnow):
test_time = datetime.datetime(2000, 1, 1, 0, 0)
mock_utcnow.return_value = test_time
c = self._create_test_cdr()
self.assertEqual(test_time, timeutils.normalize_time(c.updated_at))
test_time = datetime.datetime(2000, 1, 1, 0, 1)
mock_utcnow.return_value = test_time
self.dbapi.touch_conductor(c.hostname, online=False)
self.assertRaises(
exception.ConductorNotFound,
self.dbapi.get_conductor,
c.hostname)
def test_touch_conductor_not_found(self): def test_touch_conductor_not_found(self):
# A conductor's heartbeat will not create a new record, # A conductor's heartbeat will not create a new record,
# it will only update existing ones # it will only update existing ones

View File

@ -77,7 +77,7 @@ class TestConductorObject(db_base.DbTestCase):
c = objects.Conductor.get_by_hostname(self.context, host) c = objects.Conductor.get_by_hostname(self.context, host)
c.touch(self.context) c.touch(self.context)
mock_get_cdr.assert_called_once_with(host, online=True) mock_get_cdr.assert_called_once_with(host, online=True)
mock_touch_cdr.assert_called_once_with(host) mock_touch_cdr.assert_called_once_with(host, online=True)
def test_refresh(self): def test_refresh(self):
host = self.fake_conductor['hostname'] host = self.fake_conductor['hostname']