APISchedulingService migrate audits also on first discovery of services

Currently, the APISchedulingService only migrates ongoing audits
when it detects that a decision-engine changes state from ACTIVE
to FAILED. As a result, audits assigned to a FAILED decision-engine
are not reassigned when the watcher-api service is started after the
decision-engine has died.

After this patch, the APISchedulingService also migrates ongoing
audits when a decision-engine is already detected as FAILED at the
time the service is started.

I'm also moving the audit migration logic to a separate function to
improve readability.

Closes-Bug: #2127777

Change-Id: Ic3c9d89973ca976dde9f6e9b423671e52b049cdb
Signed-off-by: Alfredo Moralejo <amoralej@redhat.com>
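As a quick illustration of the reassignment pattern used by the new
_migrate_audits_to_new_host helper, here is a minimal standalone sketch.
Plain dictionaries stand in for Watcher's Audit objects, so the names and
structures below are placeholders rather than the real API:

import itertools

def migrate_audits_round_robin(ongoing_audits, alive_hosts):
    # Simplified stand-in for _migrate_audits_to_new_host: each stranded
    # audit is handed to the next alive decision-engine host in turn.
    round_robin = itertools.cycle(alive_hosts)
    for audit in ongoing_audits:
        failed_host = audit['hostname']
        audit['hostname'] = next(round_robin)
        print(f"audit {audit['uuid']}: {failed_host} -> {audit['hostname']}")

# Three audits stranded on a dead host and two alive hosts end up
# distributed as hostA, hostB, hostA.
audits = [{'uuid': f'audit-{i}', 'hostname': 'failed-host'} for i in range(3)]
migrate_audits_round_robin(audits, ['hostA', 'hostB'])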

@@ -0,0 +1,12 @@
---
fixes:
- |
Previously, the APISchedulingService did not migrate ongoing
continuous audits from a failed decision-engine to an alive one when
the decision-engine died while the watcher-api was not running.
This patch fixes those cases by migrating the audits found on a failed
decision-engine when the watcher-api starts, in addition to when a
change in a decision-engine's status is detected.
For more details: https://bugs.launchpad.net/watcher/+bug/2127777
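The key to the startup case is that the scheduler's status cache starts out
empty, so a decision-engine that is already FAILED when the watcher-api comes
up is acted on during the very first monitoring pass. A minimal, hypothetical
illustration of that condition (a plain dict stands in for the service state
bookkeeping shown in the diff below):

# Nothing has been monitored yet when the watcher-api starts.
services_status = {}
service_id, state = 42, 'FAILED'   # decision-engine found already FAILED

if services_status.get(service_id) != state:
    # There is no previous state, so no "state changed" notification is
    # needed, but the FAILED state is recorded and audit migration can run.
    services_status[service_id] = state

print(services_status)   # {42: 'FAILED'}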


@@ -37,18 +37,50 @@ class APISchedulingService(scheduling.BackgroundSchedulerService):
super().__init__(gconfig, **options)
def get_services_status(self, context):
services_states = []
services = objects.service.Service.list(context)
for service in services:
state = self.get_service_status(context, service.id)
service.state = state
services_states.append(service)
return services_states
def _migrate_audits_to_new_host(self, ongoing_audits, alive_services):
round_robin = itertools.cycle(alive_services)
for audit in ongoing_audits:
failed_host = audit.hostname
audit.hostname = round_robin.__next__()
audit.save()
LOG.info('Audit %(audit)s has been migrated to '
'%(host)s since %(failed_host)s is in'
' %(state)s',
{'audit': audit.uuid,
'host': audit.hostname,
'failed_host': failed_host,
'state': objects.service.ServiceStatus.FAILED})
def monitor_services_status(self, context):
active_s = objects.service.ServiceStatus.ACTIVE
failed_s = objects.service.ServiceStatus.FAILED
services = self.get_services_status(context)
alive_services = [
s.host for s in services
if s.state == active_s and s.name == 'watcher-decision-engine']
for service in services:
result = self.get_service_status(context, service.id)
if service.id not in self.services_status:
result = service.state
# This covers both a service change, initial service monitor
# startup and adding a new service
if self.services_status.get(service.id) != result:
# Notification is sent only if the service is already monitored
if self.services_status.get(service.id) is not None:
notifications.service.send_service_update(context, service,
state=result)
self.services_status[service.id] = result
continue
if self.services_status[service.id] != result:
self.services_status[service.id] = result
notifications.service.send_service_update(context, service,
state=result)
# Only execute the migration logic if there are alive
# services
if len(alive_services) == 0:
LOG.warning('No alive services found for decision engine')
continue
if (result == failed_s) and (
service.name == 'watcher-decision-engine'):
audit_filters = {
@@ -60,22 +92,8 @@ class APISchedulingService(scheduling.BackgroundSchedulerService):
context,
filters=audit_filters,
eager=True)
alive_services = [
s.host for s in services
if (self.services_status[s.id] == active_s and
s.name == 'watcher-decision-engine')]
round_robin = itertools.cycle(alive_services)
for audit in ongoing_audits:
audit.hostname = round_robin.__next__()
audit.save()
LOG.info('Audit %(audit)s has been migrated to '
'%(host)s since %(failed_host)s is in'
' %(state)s',
{'audit': audit.uuid,
'host': audit.hostname,
'failed_host': service.host,
'state': failed_s})
self._migrate_audits_to_new_host(
ongoing_audits, alive_services)
def get_service_status(self, context, service_id):
service = objects.Service.get(context, service_id)
@@ -105,7 +123,7 @@ class APISchedulingService(scheduling.BackgroundSchedulerService):
def start(self):
"""Start service."""
context = watcher_context.make_context(is_admin=True)
self.add_job(self.get_services_status, name='service_status',
self.add_job(self.monitor_services_status, name='service_status',
trigger='interval', jobstore='default', args=[context],
next_run_time=datetime.datetime.now(),
seconds=CONF.periodic_interval)
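
For context, the add_job call above follows the usual APScheduler
interval-job pattern: next_run_time=datetime.datetime.now() makes the first
run happen immediately, and the 'interval' trigger then repeats it every
CONF.periodic_interval seconds. A minimal standalone sketch of the same
pattern, assuming the apscheduler package is available (the job body and the
30-second interval are placeholders):

import datetime
import time

from apscheduler.schedulers.background import BackgroundScheduler

def check_services():
    print('checking service states...')

scheduler = BackgroundScheduler()
# Fire immediately, then repeat on the interval.
scheduler.add_job(check_services, name='service_status', trigger='interval',
                  seconds=30, next_run_time=datetime.datetime.now())
scheduler.start()
time.sleep(65)   # keep the process alive long enough to see a few runs
scheduler.shutdown()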


@@ -18,6 +18,7 @@ import freezegun
from oslo_utils import timeutils
from watcher.api import scheduling
from watcher.common import utils as common_utils
from watcher.notifications import service
from watcher import objects
from watcher.tests import base
@@ -44,47 +45,31 @@ class TestSchedulingServiceFunctions(db_base.DbTestCase):
created_at=timeutils.utcnow())
self.fake_service = objects.Service(**fake_service)
@mock.patch.object(scheduling.APISchedulingService, 'get_service_status')
@mock.patch.object(objects.Service, 'list')
@mock.patch.object(scheduling.APISchedulingService, 'get_services_status')
@mock.patch.object(service, 'send_service_update')
def test_get_services_status_without_services_in_list(
self, mock_service_update, mock_get_list, mock_service_status):
def test_monitor_services_with_services_in_list_same_status(
self, mock_service_update, mock_services_status):
scheduler = scheduling.APISchedulingService()
mock_get_list.return_value = [self.fake_service]
mock_service_status.return_value = 'ACTIVE'
scheduler.get_services_status(mock.ANY)
mock_service_status.assert_called_once_with(mock.ANY,
self.fake_service.id)
mock_service_update.assert_not_called()
@mock.patch.object(scheduling.APISchedulingService, 'get_service_status')
@mock.patch.object(objects.Service, 'list')
@mock.patch.object(service, 'send_service_update')
def test_get_services_status_with_services_in_list_same_status(
self, mock_service_update, mock_get_list, mock_service_status):
scheduler = scheduling.APISchedulingService()
mock_get_list.return_value = [self.fake_service]
scheduler.services_status = {1: 'ACTIVE'}
mock_service_status.return_value = 'ACTIVE'
scheduler.get_services_status(mock.ANY)
mock_service_status.assert_called_once_with(mock.ANY,
self.fake_service.id)
self.fake_service.state = 'ACTIVE'
mock_services_status.return_value = [self.fake_service]
scheduler.monitor_services_status(mock.ANY)
mock_services_status.assert_called_once_with(mock.ANY)
mock_service_update.assert_not_called()
@mock.patch.object(scheduling.APISchedulingService, 'get_service_status')
@mock.patch.object(scheduling.APISchedulingService, 'get_services_status')
@mock.patch.object(objects.Service, 'list')
@mock.patch.object(service, 'send_service_update')
def test_get_services_status_with_services_in_list_diff_status(
self, mock_service_update, mock_get_list, mock_service_status):
def test_monitor_services_with_services_in_list_diff_status(
self, mock_service_update, mock_get_list, mock_services_status):
scheduler = scheduling.APISchedulingService()
mock_get_list.return_value = [self.fake_service]
scheduler.services_status = {1: 'FAILED'}
mock_service_status.return_value = 'ACTIVE'
scheduler.get_services_status(mock.ANY)
mock_service_status.assert_called_once_with(mock.ANY,
self.fake_service.id)
self.fake_service.state = 'ACTIVE'
mock_services_status.return_value = [self.fake_service]
scheduler.monitor_services_status(mock.ANY)
mock_services_status.assert_called_once_with(mock.ANY)
mock_service_update.assert_called_once_with(mock.ANY,
self.fake_service,
@@ -112,3 +97,147 @@ class TestSchedulingServiceFunctions(db_base.DbTestCase):
mock_get.assert_called_once_with(mock.ANY,
self.fake_service.id)
self.assertEqual('ACTIVE', service_status)
@mock.patch.object(scheduling.APISchedulingService, 'get_service_status')
@mock.patch.object(objects.Service, 'list')
@mock.patch.object(service, 'send_service_update')
def test_get_services_status_without_services_in_list(
self, mock_service_update, mock_get_list, mock_service_status):
scheduler = scheduling.APISchedulingService()
mock_get_list.return_value = []
services_status = scheduler.get_services_status(mock.ANY)
self.assertEqual([], services_status)
mock_service_status.assert_not_called()
@mock.patch.object(scheduling.APISchedulingService, 'get_service_status')
@mock.patch.object(objects.Service, 'list')
def test_get_services_status_with_services_in_list(
self, m_service_list, m_get_service_status):
"""Test that get_services_status returns only decision-engines."""
# Create various services
de_service1 = utils.get_test_service(
id=1, name='watcher-decision-engine', host='host1')
de_service2 = utils.get_test_service(
id=2, name='watcher-decision-engine', host='host2')
api_service = utils.get_test_service(
id=3, name='watcher-api', host='host3')
applier_service = utils.get_test_service(
id=4, name='watcher-applier', host='host4')
m_service_list.return_value = [
objects.Service(**de_service1),
objects.Service(**de_service2),
objects.Service(**api_service),
objects.Service(**applier_service)
]
m_get_service_status.side_effect = [
objects.service.ServiceStatus.ACTIVE,
objects.service.ServiceStatus.FAILED,
objects.service.ServiceStatus.ACTIVE,
objects.service.ServiceStatus.ACTIVE
]
scheduler = scheduling.APISchedulingService()
result = scheduler.get_services_status(self.context)
# Verify the calls to get_service_status
m_get_service_status.assert_has_calls([
mock.call(self.context, 1),
mock.call(self.context, 2),
mock.call(self.context, 3),
mock.call(self.context, 4)
])
# Should return all services
self.assertEqual(4, len(result))
for wservice in result:
match wservice.host:
case 'host1':
self.assertEqual('watcher-decision-engine', wservice.name)
self.assertEqual(
objects.service.ServiceStatus.ACTIVE, wservice.state)
case 'host2':
self.assertEqual('watcher-decision-engine', wservice.name)
self.assertEqual(
objects.service.ServiceStatus.FAILED, wservice.state)
case 'host3':
self.assertEqual('watcher-api', wservice.name)
self.assertEqual(
objects.service.ServiceStatus.ACTIVE, wservice.state)
case 'host4':
self.assertEqual('watcher-applier', wservice.name)
self.assertEqual(
objects.service.ServiceStatus.ACTIVE, wservice.state)
case _:
self.fail(f'Unexpected host: {wservice.host}')
def test_migrate_audits_round_robin_assigns_hosts_and_saves(self):
scheduler = scheduling.APISchedulingService()
# Prepare three ongoing audits with the same failed host
uuid_prefix = common_utils.generate_uuid()[:-1]
audits = [
objects.Audit(context=self.context,
uuid=f'{uuid_prefix}{i}',
hostname='failed-host')
for i in range(3)
]
alive_services = ['hostA', 'hostB']
with mock.patch.object(scheduling, 'LOG') as m_log:
with mock.patch.object(objects.Audit, 'save') as m_save:
scheduler._migrate_audits_to_new_host(audits, alive_services)
# Round-robin expected: hostA, hostB, hostA
self.assertEqual('hostA', audits[0].hostname)
self.assertEqual('hostB', audits[1].hostname)
self.assertEqual('hostA', audits[2].hostname)
# Each audit must be saved once
self.assertEqual(3, m_save.call_count)
# A log must be emitted per audit
self.assertEqual(3, m_log.info.call_count)
def test_migrate_audits_logs_expected_payload(self):
scheduler = scheduling.APISchedulingService()
# Prepare audits with distinct failed hosts to validate payload
uuid_prefix = common_utils.generate_uuid()[:-1]
audits = [
objects.Audit(context=self.context,
uuid=f'{uuid_prefix}{i}',
hostname=f'failed-{i}')
for i in range(2)
]
alive_services = ['host1', 'host2']
with mock.patch.object(scheduling, 'LOG') as m_log:
with mock.patch.object(objects.Audit, 'save') as m_save:
scheduler._migrate_audits_to_new_host(audits, alive_services)
# Each audit must be saved once
self.assertEqual(2, m_save.call_count)
# Validate payloads of log calls
calls = m_log.info.call_args_list
self.assertEqual(2, len(calls))
# First audit migrated to host1
args0, _ = calls[0]
payload0 = args0[1]
self.assertEqual(f'{uuid_prefix}0', payload0['audit'])
self.assertEqual('host1', payload0['host'])
self.assertEqual('failed-0', payload0['failed_host'])
self.assertEqual(objects.service.ServiceStatus.FAILED,
payload0['state'])
# Second audit migrated to host2
args1, _ = calls[1]
payload1 = args1[1]
self.assertEqual(f'{uuid_prefix}1', payload1['audit'])
self.assertEqual('host2', payload1['host'])
self.assertEqual('failed-1', payload1['failed_host'])
self.assertEqual(objects.service.ServiceStatus.FAILED,
payload1['state'])