Merge "APISchedulingService migrate audits also on first discovery of services"
This commit is contained in:
12
releasenotes/notes/bug-2127777-7f512e63e087da91.yaml
Normal file
12
releasenotes/notes/bug-2127777-7f512e63e087da91.yaml
Normal file
@@ -0,0 +1,12 @@
|
|||||||
|
---
|
||||||
|
fixes:
|
||||||
|
- |
|
||||||
|
Currently, the APISchedulingService does not migrate any ongoing
|
||||||
|
continuous audits in a failed decision-engine to an alive one when the
|
||||||
|
decision-engine dies while the watcher-api is not running.
|
||||||
|
|
||||||
|
This patch fixes those cases by migrating the audits found on a failed
|
||||||
|
decision-engine when the watcher-api is started in addition to when a
|
||||||
|
change in a decision-engine status is detected.
|
||||||
|
|
||||||
|
For more details: https://bugs.launchpad.net/watcher/+bug/2127777
|
||||||
@@ -37,18 +37,50 @@ class APISchedulingService(scheduling.BackgroundSchedulerService):
|
|||||||
super().__init__(gconfig, **options)
|
super().__init__(gconfig, **options)
|
||||||
|
|
||||||
def get_services_status(self, context):
|
def get_services_status(self, context):
|
||||||
|
services_states = []
|
||||||
services = objects.service.Service.list(context)
|
services = objects.service.Service.list(context)
|
||||||
|
for service in services:
|
||||||
|
state = self.get_service_status(context, service.id)
|
||||||
|
service.state = state
|
||||||
|
services_states.append(service)
|
||||||
|
return services_states
|
||||||
|
|
||||||
|
def _migrate_audits_to_new_host(self, ongoing_audits, alive_services):
|
||||||
|
round_robin = itertools.cycle(alive_services)
|
||||||
|
for audit in ongoing_audits:
|
||||||
|
failed_host = audit.hostname
|
||||||
|
audit.hostname = round_robin.__next__()
|
||||||
|
audit.save()
|
||||||
|
LOG.info('Audit %(audit)s has been migrated to '
|
||||||
|
'%(host)s since %(failed_host)s is in'
|
||||||
|
' %(state)s',
|
||||||
|
{'audit': audit.uuid,
|
||||||
|
'host': audit.hostname,
|
||||||
|
'failed_host': failed_host,
|
||||||
|
'state': objects.service.ServiceStatus.FAILED})
|
||||||
|
|
||||||
|
def monitor_services_status(self, context):
|
||||||
active_s = objects.service.ServiceStatus.ACTIVE
|
active_s = objects.service.ServiceStatus.ACTIVE
|
||||||
failed_s = objects.service.ServiceStatus.FAILED
|
failed_s = objects.service.ServiceStatus.FAILED
|
||||||
|
services = self.get_services_status(context)
|
||||||
|
alive_services = [
|
||||||
|
s.host for s in services
|
||||||
|
if s.state == active_s and s.name == 'watcher-decision-engine']
|
||||||
for service in services:
|
for service in services:
|
||||||
result = self.get_service_status(context, service.id)
|
result = service.state
|
||||||
if service.id not in self.services_status:
|
# This covers both a service change, initial service monitor
|
||||||
self.services_status[service.id] = result
|
# startup and adding a new service
|
||||||
continue
|
if self.services_status.get(service.id) != result:
|
||||||
if self.services_status[service.id] != result:
|
# Notification is sent only if the service is already monitored
|
||||||
self.services_status[service.id] = result
|
if self.services_status.get(service.id) is not None:
|
||||||
notifications.service.send_service_update(context, service,
|
notifications.service.send_service_update(context, service,
|
||||||
state=result)
|
state=result)
|
||||||
|
self.services_status[service.id] = result
|
||||||
|
# Only execute the migration logic if there are alive
|
||||||
|
# services
|
||||||
|
if len(alive_services) == 0:
|
||||||
|
LOG.warning('No alive services found for decision engine')
|
||||||
|
continue
|
||||||
if (result == failed_s) and (
|
if (result == failed_s) and (
|
||||||
service.name == 'watcher-decision-engine'):
|
service.name == 'watcher-decision-engine'):
|
||||||
audit_filters = {
|
audit_filters = {
|
||||||
@@ -60,22 +92,8 @@ class APISchedulingService(scheduling.BackgroundSchedulerService):
|
|||||||
context,
|
context,
|
||||||
filters=audit_filters,
|
filters=audit_filters,
|
||||||
eager=True)
|
eager=True)
|
||||||
alive_services = [
|
self._migrate_audits_to_new_host(
|
||||||
s.host for s in services
|
ongoing_audits, alive_services)
|
||||||
if (self.services_status[s.id] == active_s and
|
|
||||||
s.name == 'watcher-decision-engine')]
|
|
||||||
|
|
||||||
round_robin = itertools.cycle(alive_services)
|
|
||||||
for audit in ongoing_audits:
|
|
||||||
audit.hostname = round_robin.__next__()
|
|
||||||
audit.save()
|
|
||||||
LOG.info('Audit %(audit)s has been migrated to '
|
|
||||||
'%(host)s since %(failed_host)s is in'
|
|
||||||
' %(state)s',
|
|
||||||
{'audit': audit.uuid,
|
|
||||||
'host': audit.hostname,
|
|
||||||
'failed_host': service.host,
|
|
||||||
'state': failed_s})
|
|
||||||
|
|
||||||
def get_service_status(self, context, service_id):
|
def get_service_status(self, context, service_id):
|
||||||
service = objects.Service.get(context, service_id)
|
service = objects.Service.get(context, service_id)
|
||||||
@@ -105,7 +123,7 @@ class APISchedulingService(scheduling.BackgroundSchedulerService):
|
|||||||
def start(self):
|
def start(self):
|
||||||
"""Start service."""
|
"""Start service."""
|
||||||
context = watcher_context.make_context(is_admin=True)
|
context = watcher_context.make_context(is_admin=True)
|
||||||
self.add_job(self.get_services_status, name='service_status',
|
self.add_job(self.monitor_services_status, name='service_status',
|
||||||
trigger='interval', jobstore='default', args=[context],
|
trigger='interval', jobstore='default', args=[context],
|
||||||
next_run_time=datetime.datetime.now(),
|
next_run_time=datetime.datetime.now(),
|
||||||
seconds=CONF.periodic_interval)
|
seconds=CONF.periodic_interval)
|
||||||
|
|||||||
@@ -18,6 +18,7 @@ import freezegun
|
|||||||
from oslo_utils import timeutils
|
from oslo_utils import timeutils
|
||||||
|
|
||||||
from watcher.api import scheduling
|
from watcher.api import scheduling
|
||||||
|
from watcher.common import utils as common_utils
|
||||||
from watcher.notifications import service
|
from watcher.notifications import service
|
||||||
from watcher import objects
|
from watcher import objects
|
||||||
from watcher.tests import base
|
from watcher.tests import base
|
||||||
@@ -44,47 +45,31 @@ class TestSchedulingServiceFunctions(db_base.DbTestCase):
|
|||||||
created_at=timeutils.utcnow())
|
created_at=timeutils.utcnow())
|
||||||
self.fake_service = objects.Service(**fake_service)
|
self.fake_service = objects.Service(**fake_service)
|
||||||
|
|
||||||
@mock.patch.object(scheduling.APISchedulingService, 'get_service_status')
|
@mock.patch.object(scheduling.APISchedulingService, 'get_services_status')
|
||||||
@mock.patch.object(objects.Service, 'list')
|
|
||||||
@mock.patch.object(service, 'send_service_update')
|
@mock.patch.object(service, 'send_service_update')
|
||||||
def test_get_services_status_without_services_in_list(
|
def test_monitor_services_with_services_in_list_same_status(
|
||||||
self, mock_service_update, mock_get_list, mock_service_status):
|
self, mock_service_update, mock_services_status):
|
||||||
scheduler = scheduling.APISchedulingService()
|
scheduler = scheduling.APISchedulingService()
|
||||||
mock_get_list.return_value = [self.fake_service]
|
|
||||||
mock_service_status.return_value = 'ACTIVE'
|
|
||||||
scheduler.get_services_status(mock.ANY)
|
|
||||||
mock_service_status.assert_called_once_with(mock.ANY,
|
|
||||||
self.fake_service.id)
|
|
||||||
|
|
||||||
mock_service_update.assert_not_called()
|
|
||||||
|
|
||||||
@mock.patch.object(scheduling.APISchedulingService, 'get_service_status')
|
|
||||||
@mock.patch.object(objects.Service, 'list')
|
|
||||||
@mock.patch.object(service, 'send_service_update')
|
|
||||||
def test_get_services_status_with_services_in_list_same_status(
|
|
||||||
self, mock_service_update, mock_get_list, mock_service_status):
|
|
||||||
scheduler = scheduling.APISchedulingService()
|
|
||||||
mock_get_list.return_value = [self.fake_service]
|
|
||||||
scheduler.services_status = {1: 'ACTIVE'}
|
scheduler.services_status = {1: 'ACTIVE'}
|
||||||
mock_service_status.return_value = 'ACTIVE'
|
self.fake_service.state = 'ACTIVE'
|
||||||
scheduler.get_services_status(mock.ANY)
|
mock_services_status.return_value = [self.fake_service]
|
||||||
mock_service_status.assert_called_once_with(mock.ANY,
|
scheduler.monitor_services_status(mock.ANY)
|
||||||
self.fake_service.id)
|
mock_services_status.assert_called_once_with(mock.ANY)
|
||||||
|
|
||||||
mock_service_update.assert_not_called()
|
mock_service_update.assert_not_called()
|
||||||
|
|
||||||
@mock.patch.object(scheduling.APISchedulingService, 'get_service_status')
|
@mock.patch.object(scheduling.APISchedulingService, 'get_services_status')
|
||||||
@mock.patch.object(objects.Service, 'list')
|
@mock.patch.object(objects.Service, 'list')
|
||||||
@mock.patch.object(service, 'send_service_update')
|
@mock.patch.object(service, 'send_service_update')
|
||||||
def test_get_services_status_with_services_in_list_diff_status(
|
def test_monitor_services_with_services_in_list_diff_status(
|
||||||
self, mock_service_update, mock_get_list, mock_service_status):
|
self, mock_service_update, mock_get_list, mock_services_status):
|
||||||
scheduler = scheduling.APISchedulingService()
|
scheduler = scheduling.APISchedulingService()
|
||||||
mock_get_list.return_value = [self.fake_service]
|
mock_get_list.return_value = [self.fake_service]
|
||||||
scheduler.services_status = {1: 'FAILED'}
|
scheduler.services_status = {1: 'FAILED'}
|
||||||
mock_service_status.return_value = 'ACTIVE'
|
self.fake_service.state = 'ACTIVE'
|
||||||
scheduler.get_services_status(mock.ANY)
|
mock_services_status.return_value = [self.fake_service]
|
||||||
mock_service_status.assert_called_once_with(mock.ANY,
|
|
||||||
self.fake_service.id)
|
scheduler.monitor_services_status(mock.ANY)
|
||||||
|
mock_services_status.assert_called_once_with(mock.ANY)
|
||||||
|
|
||||||
mock_service_update.assert_called_once_with(mock.ANY,
|
mock_service_update.assert_called_once_with(mock.ANY,
|
||||||
self.fake_service,
|
self.fake_service,
|
||||||
@@ -112,3 +97,147 @@ class TestSchedulingServiceFunctions(db_base.DbTestCase):
|
|||||||
mock_get.assert_called_once_with(mock.ANY,
|
mock_get.assert_called_once_with(mock.ANY,
|
||||||
self.fake_service.id)
|
self.fake_service.id)
|
||||||
self.assertEqual('ACTIVE', service_status)
|
self.assertEqual('ACTIVE', service_status)
|
||||||
|
|
||||||
|
@mock.patch.object(scheduling.APISchedulingService, 'get_service_status')
|
||||||
|
@mock.patch.object(objects.Service, 'list')
|
||||||
|
@mock.patch.object(service, 'send_service_update')
|
||||||
|
def test_get_services_status_without_services_in_list(
|
||||||
|
self, mock_service_update, mock_get_list, mock_service_status):
|
||||||
|
scheduler = scheduling.APISchedulingService()
|
||||||
|
mock_get_list.return_value = []
|
||||||
|
services_status = scheduler.get_services_status(mock.ANY)
|
||||||
|
self.assertEqual([], services_status)
|
||||||
|
mock_service_status.assert_not_called()
|
||||||
|
|
||||||
|
@mock.patch.object(scheduling.APISchedulingService, 'get_service_status')
|
||||||
|
@mock.patch.object(objects.Service, 'list')
|
||||||
|
def test_get_services_status_with_services_in_list(
|
||||||
|
self, m_service_list, m_get_service_status):
|
||||||
|
"""Test that get_services_status returns only decision-engines."""
|
||||||
|
# Create various services
|
||||||
|
de_service1 = utils.get_test_service(
|
||||||
|
id=1, name='watcher-decision-engine', host='host1')
|
||||||
|
de_service2 = utils.get_test_service(
|
||||||
|
id=2, name='watcher-decision-engine', host='host2')
|
||||||
|
api_service = utils.get_test_service(
|
||||||
|
id=3, name='watcher-api', host='host3')
|
||||||
|
applier_service = utils.get_test_service(
|
||||||
|
id=4, name='watcher-applier', host='host4')
|
||||||
|
|
||||||
|
m_service_list.return_value = [
|
||||||
|
objects.Service(**de_service1),
|
||||||
|
objects.Service(**de_service2),
|
||||||
|
objects.Service(**api_service),
|
||||||
|
objects.Service(**applier_service)
|
||||||
|
]
|
||||||
|
|
||||||
|
m_get_service_status.side_effect = [
|
||||||
|
objects.service.ServiceStatus.ACTIVE,
|
||||||
|
objects.service.ServiceStatus.FAILED,
|
||||||
|
objects.service.ServiceStatus.ACTIVE,
|
||||||
|
objects.service.ServiceStatus.ACTIVE
|
||||||
|
]
|
||||||
|
|
||||||
|
scheduler = scheduling.APISchedulingService()
|
||||||
|
result = scheduler.get_services_status(self.context)
|
||||||
|
|
||||||
|
# Verify the calls to get_service_status
|
||||||
|
m_get_service_status.assert_has_calls([
|
||||||
|
mock.call(self.context, 1),
|
||||||
|
mock.call(self.context, 2),
|
||||||
|
mock.call(self.context, 3),
|
||||||
|
mock.call(self.context, 4)
|
||||||
|
])
|
||||||
|
|
||||||
|
# Should return all services
|
||||||
|
self.assertEqual(4, len(result))
|
||||||
|
for wservice in result:
|
||||||
|
match wservice.host:
|
||||||
|
case 'host1':
|
||||||
|
self.assertEqual('watcher-decision-engine', wservice.name)
|
||||||
|
self.assertEqual(
|
||||||
|
objects.service.ServiceStatus.ACTIVE, wservice.state)
|
||||||
|
case 'host2':
|
||||||
|
self.assertEqual('watcher-decision-engine', wservice.name)
|
||||||
|
self.assertEqual(
|
||||||
|
objects.service.ServiceStatus.FAILED, wservice.state)
|
||||||
|
case 'host3':
|
||||||
|
self.assertEqual('watcher-api', wservice.name)
|
||||||
|
self.assertEqual(
|
||||||
|
objects.service.ServiceStatus.ACTIVE, wservice.state)
|
||||||
|
case 'host4':
|
||||||
|
self.assertEqual('watcher-applier', wservice.name)
|
||||||
|
self.assertEqual(
|
||||||
|
objects.service.ServiceStatus.ACTIVE, wservice.state)
|
||||||
|
case _:
|
||||||
|
self.fail(f'Unexpected host: {wservice.host}')
|
||||||
|
|
||||||
|
def test_migrate_audits_round_robin_assigns_hosts_and_saves(self):
|
||||||
|
scheduler = scheduling.APISchedulingService()
|
||||||
|
# Prepare three ongoing audits with the same failed host
|
||||||
|
uuid_prefix = common_utils.generate_uuid()[:-1]
|
||||||
|
audits = [
|
||||||
|
objects.Audit(context=self.context,
|
||||||
|
uuid=f'{uuid_prefix}{i}',
|
||||||
|
hostname='failed-host')
|
||||||
|
for i in range(3)
|
||||||
|
]
|
||||||
|
|
||||||
|
alive_services = ['hostA', 'hostB']
|
||||||
|
|
||||||
|
with mock.patch.object(scheduling, 'LOG') as m_log:
|
||||||
|
with mock.patch.object(objects.Audit, 'save') as m_save:
|
||||||
|
scheduler._migrate_audits_to_new_host(audits, alive_services)
|
||||||
|
|
||||||
|
# Round-robin expected: hostA, hostB, hostA
|
||||||
|
self.assertEqual('hostA', audits[0].hostname)
|
||||||
|
self.assertEqual('hostB', audits[1].hostname)
|
||||||
|
self.assertEqual('hostA', audits[2].hostname)
|
||||||
|
|
||||||
|
# Each audit must be saved once
|
||||||
|
self.assertEqual(3, m_save.call_count)
|
||||||
|
|
||||||
|
# A log must be emitted per audit
|
||||||
|
self.assertEqual(3, m_log.info.call_count)
|
||||||
|
|
||||||
|
def test_migrate_audits_logs_expected_payload(self):
|
||||||
|
scheduler = scheduling.APISchedulingService()
|
||||||
|
# Prepare audits with distinct failed hosts to validate payload
|
||||||
|
uuid_prefix = common_utils.generate_uuid()[:-1]
|
||||||
|
audits = [
|
||||||
|
objects.Audit(context=self.context,
|
||||||
|
uuid=f'{uuid_prefix}{i}',
|
||||||
|
hostname=f'failed-{i}')
|
||||||
|
for i in range(2)
|
||||||
|
]
|
||||||
|
|
||||||
|
alive_services = ['host1', 'host2']
|
||||||
|
|
||||||
|
with mock.patch.object(scheduling, 'LOG') as m_log:
|
||||||
|
with mock.patch.object(objects.Audit, 'save') as m_save:
|
||||||
|
scheduler._migrate_audits_to_new_host(audits, alive_services)
|
||||||
|
|
||||||
|
# Each audit must be saved once
|
||||||
|
self.assertEqual(2, m_save.call_count)
|
||||||
|
|
||||||
|
# Validate payloads of log calls
|
||||||
|
calls = m_log.info.call_args_list
|
||||||
|
self.assertEqual(2, len(calls))
|
||||||
|
|
||||||
|
# First audit migrated to host1
|
||||||
|
args0, _ = calls[0]
|
||||||
|
payload0 = args0[1]
|
||||||
|
self.assertEqual(f'{uuid_prefix}0', payload0['audit'])
|
||||||
|
self.assertEqual('host1', payload0['host'])
|
||||||
|
self.assertEqual('failed-0', payload0['failed_host'])
|
||||||
|
self.assertEqual(objects.service.ServiceStatus.FAILED,
|
||||||
|
payload0['state'])
|
||||||
|
|
||||||
|
# Second audit migrated to host2
|
||||||
|
args1, _ = calls[1]
|
||||||
|
payload1 = args1[1]
|
||||||
|
self.assertEqual(f'{uuid_prefix}1', payload1['audit'])
|
||||||
|
self.assertEqual('host2', payload1['host'])
|
||||||
|
self.assertEqual('failed-1', payload1['failed_host'])
|
||||||
|
self.assertEqual(objects.service.ServiceStatus.FAILED,
|
||||||
|
payload1['state'])
|
||||||
|
|||||||
Reference in New Issue
Block a user