From 38d3dfb84a6de7a407cf6002491dc68383f51853 Mon Sep 17 00:00:00 2001
From: Alfredo Moralejo
Date: Mon, 13 Oct 2025 14:56:16 +0200
Subject: [PATCH] APISchedulingService: migrate audits also on first discovery
 of services

Currently, the APISchedulingService only migrates ongoing audits when it
detects that a decision-engine changed its state from ACTIVE to FAILED. As a
result, audits assigned to a FAILED decision-engine are not reassigned when
the watcher-api service is started after the decision-engine dies.

After this patch, the APISchedulingService also migrates ongoing audits when
a decision-engine is already FAILED at the time the service is started.

I'm also moving the audit migration logic to a separate function to improve
readability.

Closes-Bug: #2127777
Change-Id: Ic3c9d89973ca976dde9f6e9b423671e52b049cdb
Signed-off-by: Alfredo Moralejo
---
 .../notes/bug-2127777-7f512e63e087da91.yaml   |  12 ++
 watcher/api/scheduling.py                     |  66 +++---
 watcher/tests/api/test_scheduling.py          | 191 +++++++++++++++---
 3 files changed, 214 insertions(+), 55 deletions(-)
 create mode 100644 releasenotes/notes/bug-2127777-7f512e63e087da91.yaml

diff --git a/releasenotes/notes/bug-2127777-7f512e63e087da91.yaml b/releasenotes/notes/bug-2127777-7f512e63e087da91.yaml
new file mode 100644
index 000000000..5f80342e4
--- /dev/null
+++ b/releasenotes/notes/bug-2127777-7f512e63e087da91.yaml
@@ -0,0 +1,12 @@
+---
+fixes:
+  - |
+    Previously, the APISchedulingService did not migrate ongoing continuous
+    audits from a failed decision-engine to an alive one when the
+    decision-engine died while the watcher-api was not running.
+
+    This patch fixes those cases by migrating the audits found on a failed
+    decision-engine when the watcher-api is started, in addition to when a
+    change in a decision-engine status is detected.
+
+    For more details: https://bugs.launchpad.net/watcher/+bug/2127777
diff --git a/watcher/api/scheduling.py b/watcher/api/scheduling.py
index 688b850b0..e6bfbb150 100644
--- a/watcher/api/scheduling.py
+++ b/watcher/api/scheduling.py
@@ -37,18 +37,50 @@ class APISchedulingService(scheduling.BackgroundSchedulerService):
         super().__init__(gconfig, **options)
 
     def get_services_status(self, context):
+        services_states = []
         services = objects.service.Service.list(context)
+        for service in services:
+            state = self.get_service_status(context, service.id)
+            service.state = state
+            services_states.append(service)
+        return services_states
+
+    def _migrate_audits_to_new_host(self, ongoing_audits, alive_services):
+        round_robin = itertools.cycle(alive_services)
+        for audit in ongoing_audits:
+            failed_host = audit.hostname
+            audit.hostname = next(round_robin)
+            audit.save()
+            LOG.info('Audit %(audit)s has been migrated to '
+                     '%(host)s since %(failed_host)s is in'
+                     ' %(state)s',
+                     {'audit': audit.uuid,
+                      'host': audit.hostname,
+                      'failed_host': failed_host,
+                      'state': objects.service.ServiceStatus.FAILED})
+
+    def monitor_services_status(self, context):
         active_s = objects.service.ServiceStatus.ACTIVE
         failed_s = objects.service.ServiceStatus.FAILED
+        services = self.get_services_status(context)
+        alive_services = [
+            s.host for s in services
+            if s.state == active_s and s.name == 'watcher-decision-engine']
         for service in services:
-            result = self.get_service_status(context, service.id)
-            if service.id not in self.services_status:
+            result = service.state
+            # This covers a service state change, the initial service monitor
+            # startup, and a newly added service
+            if self.services_status.get(service.id) != result:
+                # Only send a notification if the service is already monitored
+                if self.services_status.get(service.id) is not None:
+                    notifications.service.send_service_update(context, service,
+                                                              state=result)
                 self.services_status[service.id] = result
-                continue
-            if self.services_status[service.id] != result:
-                self.services_status[service.id] = result
-                notifications.service.send_service_update(context, service,
-                                                          state=result)
+            # Only execute the migration logic if there are alive
+            # decision-engine services
+            if len(alive_services) == 0:
+                LOG.warning('No alive decision-engine services found')
+                continue
             if (result == failed_s) and (
                     service.name == 'watcher-decision-engine'):
                 audit_filters = {
@@ -60,22 +92,8 @@ class APISchedulingService(scheduling.BackgroundSchedulerService):
                     context,
                     filters=audit_filters,
                     eager=True)
-                alive_services = [
-                    s.host for s in services
-                    if (self.services_status[s.id] == active_s and
-                        s.name == 'watcher-decision-engine')]
-
-                round_robin = itertools.cycle(alive_services)
-                for audit in ongoing_audits:
-                    audit.hostname = round_robin.__next__()
-                    audit.save()
-                    LOG.info('Audit %(audit)s has been migrated to '
-                             '%(host)s since %(failed_host)s is in'
-                             ' %(state)s',
-                             {'audit': audit.uuid,
-                              'host': audit.hostname,
-                              'failed_host': service.host,
-                              'state': failed_s})
+                self._migrate_audits_to_new_host(
+                    ongoing_audits, alive_services)
 
     def get_service_status(self, context, service_id):
         service = objects.Service.get(context, service_id)
@@ -105,7 +123,7 @@ class APISchedulingService(scheduling.BackgroundSchedulerService):
     def start(self):
         """Start service."""
         context = watcher_context.make_context(is_admin=True)
-        self.add_job(self.get_services_status, name='service_status',
+        self.add_job(self.monitor_services_status, name='service_status',
                      trigger='interval', jobstore='default',
                     args=[context], next_run_time=datetime.datetime.now(),
                     seconds=CONF.periodic_interval)
diff --git a/watcher/tests/api/test_scheduling.py b/watcher/tests/api/test_scheduling.py
index 9f793afd6..545b91455 100644
--- a/watcher/tests/api/test_scheduling.py
+++ b/watcher/tests/api/test_scheduling.py
@@ -18,6 +18,7 @@ import freezegun
 from oslo_utils import timeutils
 
 from watcher.api import scheduling
+from watcher.common import utils as common_utils
 from watcher.notifications import service
 from watcher import objects
 from watcher.tests import base
@@ -44,47 +45,31 @@ class TestSchedulingServiceFunctions(db_base.DbTestCase):
             created_at=timeutils.utcnow())
         self.fake_service = objects.Service(**fake_service)
 
-    @mock.patch.object(scheduling.APISchedulingService, 'get_service_status')
-    @mock.patch.object(objects.Service, 'list')
+    @mock.patch.object(scheduling.APISchedulingService, 'get_services_status')
     @mock.patch.object(service, 'send_service_update')
-    def test_get_services_status_without_services_in_list(
-            self, mock_service_update, mock_get_list, mock_service_status):
+    def test_monitor_services_with_services_in_list_same_status(
+            self, mock_service_update, mock_services_status):
         scheduler = scheduling.APISchedulingService()
-        mock_get_list.return_value = [self.fake_service]
-        mock_service_status.return_value = 'ACTIVE'
-        scheduler.get_services_status(mock.ANY)
-        mock_service_status.assert_called_once_with(mock.ANY,
-                                                    self.fake_service.id)
-
-        mock_service_update.assert_not_called()
-
-    @mock.patch.object(scheduling.APISchedulingService, 'get_service_status')
-    @mock.patch.object(objects.Service, 'list')
-    @mock.patch.object(service, 'send_service_update')
-    def test_get_services_status_with_services_in_list_same_status(
-            self, mock_service_update, mock_get_list, mock_service_status):
-        scheduler = scheduling.APISchedulingService()
-        mock_get_list.return_value = [self.fake_service]
         scheduler.services_status = {1: 'ACTIVE'}
-        mock_service_status.return_value = 'ACTIVE'
-        scheduler.get_services_status(mock.ANY)
-        mock_service_status.assert_called_once_with(mock.ANY,
-                                                    self.fake_service.id)
-
+        self.fake_service.state = 'ACTIVE'
+        mock_services_status.return_value = [self.fake_service]
+        scheduler.monitor_services_status(mock.ANY)
+        mock_services_status.assert_called_once_with(mock.ANY)
         mock_service_update.assert_not_called()
 
-    @mock.patch.object(scheduling.APISchedulingService, 'get_service_status')
+    @mock.patch.object(scheduling.APISchedulingService, 'get_services_status')
     @mock.patch.object(objects.Service, 'list')
     @mock.patch.object(service, 'send_service_update')
-    def test_get_services_status_with_services_in_list_diff_status(
-            self, mock_service_update, mock_get_list, mock_service_status):
+    def test_monitor_services_with_services_in_list_diff_status(
+            self, mock_service_update, mock_get_list, mock_services_status):
         scheduler = scheduling.APISchedulingService()
         mock_get_list.return_value = [self.fake_service]
         scheduler.services_status = {1: 'FAILED'}
-        mock_service_status.return_value = 'ACTIVE'
-        scheduler.get_services_status(mock.ANY)
-        mock_service_status.assert_called_once_with(mock.ANY,
-                                                    self.fake_service.id)
+        self.fake_service.state = 'ACTIVE'
+        mock_services_status.return_value = [self.fake_service]
+
+        scheduler.monitor_services_status(mock.ANY)
+        mock_services_status.assert_called_once_with(mock.ANY)
 
         mock_service_update.assert_called_once_with(mock.ANY,
                                                     self.fake_service,
@@ -112,3 +97,147 @@ class TestSchedulingServiceFunctions(db_base.DbTestCase):
         mock_get.assert_called_once_with(mock.ANY, self.fake_service.id)
+        self.assertEqual('ACTIVE', service_status)
+
+    @mock.patch.object(scheduling.APISchedulingService, 'get_service_status')
+    @mock.patch.object(objects.Service, 'list')
+    @mock.patch.object(service, 'send_service_update')
+    def test_get_services_status_without_services_in_list(
+            self, mock_service_update, mock_get_list, mock_service_status):
+        scheduler = scheduling.APISchedulingService()
+        mock_get_list.return_value = []
+        services_status = scheduler.get_services_status(mock.ANY)
+        self.assertEqual([], services_status)
+        mock_service_status.assert_not_called()
+
+    @mock.patch.object(scheduling.APISchedulingService, 'get_service_status')
+    @mock.patch.object(objects.Service, 'list')
+    def test_get_services_status_with_services_in_list(
+            self, m_service_list, m_get_service_status):
+        """Test that get_services_status returns all services with state."""
+        # Create various services
+        de_service1 = utils.get_test_service(
+            id=1, name='watcher-decision-engine', host='host1')
+        de_service2 = utils.get_test_service(
+            id=2, name='watcher-decision-engine', host='host2')
+        api_service = utils.get_test_service(
+            id=3, name='watcher-api', host='host3')
+        applier_service = utils.get_test_service(
+            id=4, name='watcher-applier', host='host4')
+
+        m_service_list.return_value = [
+            objects.Service(**de_service1),
+            objects.Service(**de_service2),
+            objects.Service(**api_service),
+            objects.Service(**applier_service)
+        ]
+
+        m_get_service_status.side_effect = [
+            objects.service.ServiceStatus.ACTIVE,
+            objects.service.ServiceStatus.FAILED,
+            objects.service.ServiceStatus.ACTIVE,
+            objects.service.ServiceStatus.ACTIVE
+        ]
+
+        scheduler = scheduling.APISchedulingService()
+        result = scheduler.get_services_status(self.context)
+
+        # Verify the calls to get_service_status
+        m_get_service_status.assert_has_calls([
+            mock.call(self.context, 1),
+            mock.call(self.context, 2),
+            mock.call(self.context, 3),
+            mock.call(self.context, 4)
+        ])
+
+        # Should return all services
+        self.assertEqual(4, len(result))
+        for wservice in result:
+            match wservice.host:
+                case 'host1':
+                    self.assertEqual('watcher-decision-engine', wservice.name)
+                    self.assertEqual(
+                        objects.service.ServiceStatus.ACTIVE, wservice.state)
+                case 'host2':
+                    self.assertEqual('watcher-decision-engine', wservice.name)
+                    self.assertEqual(
+                        objects.service.ServiceStatus.FAILED, wservice.state)
+                case 'host3':
+                    self.assertEqual('watcher-api', wservice.name)
+                    self.assertEqual(
+                        objects.service.ServiceStatus.ACTIVE, wservice.state)
+                case 'host4':
+                    self.assertEqual('watcher-applier', wservice.name)
+                    self.assertEqual(
+                        objects.service.ServiceStatus.ACTIVE, wservice.state)
+                case _:
+                    self.fail(f'Unexpected host: {wservice.host}')
+
+    def test_migrate_audits_round_robin_assigns_hosts_and_saves(self):
+        scheduler = scheduling.APISchedulingService()
+        # Prepare three ongoing audits with the same failed host
+        uuid_prefix = common_utils.generate_uuid()[:-1]
+        audits = [
+            objects.Audit(context=self.context,
+                          uuid=f'{uuid_prefix}{i}',
+                          hostname='failed-host')
+            for i in range(3)
+        ]
+
+        alive_services = ['hostA', 'hostB']
+
+        with mock.patch.object(scheduling, 'LOG') as m_log:
+            with mock.patch.object(objects.Audit, 'save') as m_save:
+                scheduler._migrate_audits_to_new_host(audits, alive_services)
+
+        # Round-robin expected: hostA, hostB, hostA
+        self.assertEqual('hostA', audits[0].hostname)
+        self.assertEqual('hostB', audits[1].hostname)
+        self.assertEqual('hostA', audits[2].hostname)
+
+        # Each audit must be saved once
+        self.assertEqual(3, m_save.call_count)
+
+        # A log must be emitted per audit
+        self.assertEqual(3, m_log.info.call_count)
+
+    def test_migrate_audits_logs_expected_payload(self):
+        scheduler = scheduling.APISchedulingService()
+        # Prepare audits with distinct failed hosts to validate payload
+        uuid_prefix = common_utils.generate_uuid()[:-1]
+        audits = [
+            objects.Audit(context=self.context,
+                          uuid=f'{uuid_prefix}{i}',
+                          hostname=f'failed-{i}')
+            for i in range(2)
+        ]
+
+        alive_services = ['host1', 'host2']
+
+        with mock.patch.object(scheduling, 'LOG') as m_log:
+            with mock.patch.object(objects.Audit, 'save') as m_save:
+                scheduler._migrate_audits_to_new_host(audits, alive_services)
+
+        # Each audit must be saved once
+        self.assertEqual(2, m_save.call_count)
+
+        # Validate payloads of log calls
+        calls = m_log.info.call_args_list
+        self.assertEqual(2, len(calls))
+
+        # First audit migrated to host1
+        args0, _ = calls[0]
+        payload0 = args0[1]
+        self.assertEqual(f'{uuid_prefix}0', payload0['audit'])
+        self.assertEqual('host1', payload0['host'])
+        self.assertEqual('failed-0', payload0['failed_host'])
+        self.assertEqual(objects.service.ServiceStatus.FAILED,
+                         payload0['state'])
+
+        # Second audit migrated to host2
+        args1, _ = calls[1]
+        payload1 = args1[1]
+        self.assertEqual(f'{uuid_prefix}1', payload1['audit'])
+        self.assertEqual('host2', payload1['host'])
+        self.assertEqual('failed-1', payload1['failed_host'])
+        self.assertEqual(objects.service.ServiceStatus.FAILED,
+                         payload1['state'])
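
As a quick illustration outside the patch itself, here is a minimal sketch of
the round-robin reassignment performed by the new
_migrate_audits_to_new_host() helper. FakeAudit and the host names below are
stand-ins for illustration only; the real helper operates on watcher Audit
objects, persists each change with audit.save() and logs the migration.

    import itertools

    class FakeAudit:
        """Stand-in for objects.Audit; only carries a uuid and a hostname."""
        def __init__(self, uuid, hostname):
            self.uuid = uuid
            self.hostname = hostname

    def reassign(audits, alive_hosts):
        # Cycle over the alive decision-engine hosts so the orphaned
        # audits are spread evenly across them.
        round_robin = itertools.cycle(alive_hosts)
        for audit in audits:
            audit.hostname = next(round_robin)

    audits = [FakeAudit(str(i), 'failed-host') for i in range(3)]
    reassign(audits, ['host-a', 'host-b'])
    print([a.hostname for a in audits])  # ['host-a', 'host-b', 'host-a']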