Rescheduling continuous audits from FAILED nodes

This patch set adds a background job that reschedules CONTINUOUS
audits from FAILED decision engine nodes to ACTIVE ones using a
round-robin algorithm. It also contains a fix for the main HA patch
set[1] regarding the filtering of audits.

[1]: https://review.openstack.org/#/c/578102/
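
Below is a minimal, self-contained sketch of the round-robin
reassignment idea in plain Python. It is illustrative only and does not
use the Watcher object API; the dict-based audits, hostnames, and the
reschedule_audits() helper are made-up stand-ins.

    import itertools

    def reschedule_audits(ongoing_audits, alive_hosts):
        # Spread audits stranded on a FAILED node over the remaining
        # ACTIVE decision engine hosts in round-robin order.
        round_robin = itertools.cycle(alive_hosts)
        for audit in ongoing_audits:
            audit['hostname'] = next(round_robin)
        return ongoing_audits

    # Example: two CONTINUOUS audits owned by the failed node end up on
    # the two surviving nodes, one each.
    audits = [{'uuid': 'a1', 'hostname': 'failed-node'},
              {'uuid': 'a2', 'hostname': 'failed-node'}]
    print(reschedule_audits(audits, ['de-node-1', 'de-node-2']))
    # -> a1 goes to de-node-1, a2 goes to de-node-2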

Partially-Implements: blueprint support-watcher-ha-active-active-mode
Change-Id: Ib248a6cd3adbd3927c47db6bb819300361492411
Alexander Chadin 2018-07-26 12:23:33 +03:00 committed by Alexander Chadin
parent e426a015ee
commit 20ffb5945f
6 changed files with 51 additions and 15 deletions

@@ -16,6 +16,7 @@
import datetime
import itertools
from oslo_config import cfg
from oslo_log import log
from oslo_utils import timeutils
@@ -40,6 +41,8 @@ class APISchedulingService(scheduling.BackgroundSchedulerService):
def get_services_status(self, context):
services = objects.service.Service.list(context)
active_s = objects.service.ServiceStatus.ACTIVE
failed_s = objects.service.ServiceStatus.FAILED
for service in services:
result = self.get_service_status(context, service.id)
if service.id not in self.services_status:
@@ -49,6 +52,32 @@ class APISchedulingService(scheduling.BackgroundSchedulerService):
self.services_status[service.id] = result
notifications.service.send_service_update(context, service,
state=result)
if result == failed_s:
audit_filters = {
'audit_type': objects.audit.AuditType.CONTINUOUS.value,
'state': objects.audit.State.ONGOING,
'hostname': service.host
}
ongoing_audits = objects.Audit.list(
context,
filters=audit_filters,
eager=True)
alive_services = [
s.host for s in services
if (self.services_status[s.id] == active_s and
s.name == 'watcher-decision-engine')]
round_robin = itertools.cycle(alive_services)
for audit in ongoing_audits:
audit.hostname = round_robin.__next__()
audit.save()
LOG.info('Audit %(audit)s has been migrated to '
'%(host)s since %(failed_host)s is in'
' %(state)s',
{'audit': audit.uuid,
'host': audit.hostname,
'failed_host': service.host,
'state': failed_s})
def get_service_status(self, context, service_id):
service = objects.Service.get(context, service_id)

@@ -375,7 +375,7 @@ class Connection(api.BaseConnection):
filters = {}
plain_fields = ['uuid', 'audit_type', 'state', 'goal_id',
'strategy_id']
'strategy_id', 'hostname']
join_fieldmap = {
'goal_uuid': ("uuid", models.Goal),
'goal_name': ("name", models.Goal),

@@ -59,7 +59,8 @@ class ContinuousAuditHandler(base.AuditHandler):
def _is_audit_inactive(self, audit):
audit = objects.Audit.get_by_uuid(
self.context_show_deleted, audit.uuid)
if objects.audit.AuditStateTransitionManager().is_inactive(audit):
if (objects.audit.AuditStateTransitionManager().is_inactive(audit) or
audit.hostname != CONF.host):
# if audit isn't in active states, audit's job must be removed to
# prevent using of inactive audit in future.
if self.scheduler.get_jobs():
@@ -124,28 +125,31 @@ class ContinuousAuditHandler(base.AuditHandler):
'state__in': (objects.audit.State.PENDING,
objects.audit.State.ONGOING,
objects.audit.State.SUCCEEDED),
'hostname__in': (None, CONF.host)
}
audits = objects.Audit.list(
audit_filters['hostname'] = None
unscheduled_audits = objects.Audit.list(
audit_context, filters=audit_filters, eager=True)
for audit in audits:
for audit in unscheduled_audits:
# If continuous audit doesn't have a hostname yet,
# Watcher will set current CONF.host value.
if audit.hostname is None:
audit.hostname = CONF.host
audit.save()
# Let's remove this audit from current execution
# and execute it as usual Audit with hostname later.
audits.remove(audit)
# TODO(alexchadin): Add scheduling of new continuous audits.
audit.hostname = CONF.host
audit.save()
scheduler_job_args = [
(job.args[0].uuid, job) for job
in self.scheduler.get_jobs()
if job.name == 'execute_audit']
scheduler_jobs = dict(scheduler_job_args)
# if audit isn't in active states, audit's job should be removed
jobs_to_remove = []
for job in scheduler_jobs.values():
if self._is_audit_inactive(job.args[0]):
scheduler_jobs.pop(job.args[0].uuid)
jobs_to_remove.append(job.args[0].uuid)
for audit_uuid in jobs_to_remove:
scheduler_jobs.pop(audit_uuid)
audit_filters['hostname'] = CONF.host
audits = objects.Audit.list(
audit_context, filters=audit_filters, eager=True)
for audit in audits:
existing_job = scheduler_jobs.get(audit.uuid, None)
# if audit is not presented in scheduled audits yet,

@@ -66,7 +66,7 @@ class TestCancelOngoingActionPlans(db_base.DbTestCase):
self.context,
action_plan_id=1,
state=objects.action.State.PENDING)
cfg.CONF.set_override('host', 'hostname1')
cfg.CONF.set_override("host", "hostname1")
@mock.patch.object(objects.action.Action, 'save')
@mock.patch.object(objects.action_plan.ActionPlan, 'save')

@@ -17,6 +17,7 @@
import datetime
import mock
from oslo_config import cfg
from oslo_utils import uuidutils
from apscheduler import job
@@ -242,8 +243,10 @@ class TestContinuousAuditHandler(base.DbTestCase):
audit_template_id=audit_template.id,
goal_id=self.goal.id,
audit_type=objects.audit.AuditType.CONTINUOUS.value,
goal=self.goal)
goal=self.goal,
hostname='hostname1')
for id_ in range(2, 4)]
cfg.CONF.set_override("host", "hostname1")
@mock.patch.object(objects.service.Service, 'list')
@mock.patch.object(sq_api, 'get_engine')

@@ -62,7 +62,7 @@ class TestCancelOngoingAudits(db_base.DbTestCase):
goal=self.goal,
hostname='hostname1',
state=objects.audit.State.ONGOING)
cfg.CONF.set_override('host', 'hostname1')
cfg.CONF.set_override("host", "hostname1")
@mock.patch.object(objects.audit.Audit, 'save')
@mock.patch.object(objects.audit.Audit, 'list')