Merge "Add cleanup step for orphaned health checks"
This commit is contained in:
commit
c4420212e1
|
@ -33,6 +33,9 @@ HEALTH_MANAGER_OPTS = [
|
|||
deprecated_name='health_manager_thread_pool_size',
|
||||
deprecated_group="DEFAULT",
|
||||
help=_('Number of senlin-health-manager threads.')),
|
||||
cfg.IntOpt('cleanup_interval',
|
||||
default=900,
|
||||
help=_('Seconds between running periodic cleanup tasks.')),
|
||||
]
|
||||
|
||||
|
||||
|
|
|
@ -520,6 +520,10 @@ def registry_get_by_param(context, params):
|
|||
return IMPL.registry_get_by_param(context, params)
|
||||
|
||||
|
||||
def registry_list_ids_by_service(context, params):
|
||||
return IMPL.registry_list_by_service(context, params)
|
||||
|
||||
|
||||
def db_sync(engine, version=None):
|
||||
"""Migrate the database to `version` or the most recent version."""
|
||||
return IMPL.db_sync(engine, version=version)
|
||||
|
|
|
@ -1760,6 +1760,12 @@ def registry_get_by_param(context, params):
|
|||
return obj
|
||||
|
||||
|
||||
def registry_list_ids_by_service(context, engine_id):
|
||||
with session_for_read() as session:
|
||||
return session.query(models.HealthRegistry.cluster_id).filter_by(
|
||||
engine_id=engine_id).all()
|
||||
|
||||
|
||||
# Utils
|
||||
def db_sync(engine, version=None):
|
||||
"""Migrate the database to `version` or the most recent version."""
|
||||
|
|
|
@ -30,6 +30,7 @@ from senlin.common import consts
|
|||
from senlin.common import context
|
||||
from senlin.common import messaging as rpc
|
||||
from senlin.common import utils
|
||||
from senlin.db.sqlalchemy import api as db_api
|
||||
from senlin.engine import node as node_mod
|
||||
from senlin.engine.notifications import heat_endpoint
|
||||
from senlin.engine.notifications import nova_endpoint
|
||||
|
@ -534,14 +535,10 @@ class RuntimeHealthRegistry(object):
|
|||
def __init__(self, ctx, engine_id, thread_group):
|
||||
self.ctx = ctx
|
||||
self.engine_id = engine_id
|
||||
self.rt = {}
|
||||
self.registries = {}
|
||||
self.tg = thread_group
|
||||
self.health_check_types = defaultdict(lambda: [])
|
||||
|
||||
@property
|
||||
def registries(self):
|
||||
return self.rt
|
||||
|
||||
def register_cluster(self, cluster_id, interval=None,
|
||||
node_update_timeout=None, params=None,
|
||||
enabled=True):
|
||||
|
@ -777,6 +774,21 @@ class RuntimeHealthRegistry(object):
|
|||
if registry.enabled:
|
||||
self.add_health_check(self.registries[registry.cluster_id])
|
||||
|
||||
def cleanup_orphaned_healthchecks(self):
|
||||
"""Cleanup orphaned healthchecks."""
|
||||
db_registries = db_api.registry_list_ids_by_service(
|
||||
self.ctx, self.engine_id
|
||||
)
|
||||
for registry_id in self.registries:
|
||||
if registry_id in db_registries:
|
||||
continue
|
||||
entity = self.registries[registry_id]
|
||||
if not entity:
|
||||
continue
|
||||
LOG.info('Removing orphaned health check: %s from %s',
|
||||
registry_id, self.engine_id)
|
||||
self.remove_health_check(self.registries[registry_id])
|
||||
|
||||
|
||||
def notify(engine_id, method, **kwargs):
|
||||
"""Send notification to health manager service.
|
||||
|
|
|
@ -40,6 +40,7 @@ class HealthManagerService(service.Service):
|
|||
# which happens after the fork when spawning multiple worker processes
|
||||
self.health_registry = None
|
||||
self.target = None
|
||||
self.cleanup_task_timer = None
|
||||
|
||||
@property
|
||||
def service_name(self):
|
||||
|
@ -59,8 +60,15 @@ class HealthManagerService(service.Service):
|
|||
self.server.start()
|
||||
|
||||
self.tg.add_dynamic_timer(self.task, None, cfg.CONF.periodic_interval)
|
||||
self.cleanup_task_timer = self.tg.add_timer(
|
||||
CONF.health_manager.cleanup_interval, self.cleanup_task,
|
||||
initial_delay=CONF.health_manager.cleanup_interval
|
||||
)
|
||||
|
||||
def stop(self, graceful=False):
|
||||
if self.cleanup_task_timer:
|
||||
self.cleanup_task_timer.stop()
|
||||
self.cleanup_task_timer = None
|
||||
if self.server:
|
||||
self.server.stop()
|
||||
self.server.wait()
|
||||
|
@ -82,6 +90,13 @@ class HealthManagerService(service.Service):
|
|||
start_time, cfg.CONF.periodic_interval, name='Health manager task'
|
||||
)
|
||||
|
||||
def cleanup_task(self):
|
||||
LOG.debug('Running cleanup task')
|
||||
try:
|
||||
self.health_registry.cleanup_orphaned_healthchecks()
|
||||
except Exception as ex:
|
||||
LOG.error("Failed to run cleanup tasks for health manager: %s", ex)
|
||||
|
||||
def listening(self, ctx):
|
||||
"""Respond to confirm that the rpc service is still alive."""
|
||||
return True
|
||||
|
|
|
@ -133,3 +133,38 @@ class DBAPIRegistryTest(base.SenlinTestCase):
|
|||
self.assertEqual(registry.interval, obj.interval)
|
||||
self.assertEqual(registry.params, obj.params)
|
||||
self.assertEqual(registry.engine_id, obj.engine_id)
|
||||
|
||||
def test_registry_list_ids_by_service(self):
|
||||
for index in range(10):
|
||||
self._create_registry(
|
||||
cluster_id='MAIN_FAKE_ID_%d' % index,
|
||||
check_type='NODE_STATUS_POLLING',
|
||||
interval=60,
|
||||
params={},
|
||||
engine_id='ENGINE0'
|
||||
)
|
||||
for index in range(10):
|
||||
self._create_registry(
|
||||
cluster_id='FAKE_ID_%d' % (index + 1),
|
||||
check_type='NODE_STATUS_POLLING',
|
||||
interval=60,
|
||||
params={},
|
||||
engine_id='ENGINE%d'
|
||||
)
|
||||
|
||||
registries = db_api.registry_list_ids_by_service(self.ctx, 'ENGINE0')
|
||||
self.assertEqual(10, len(registries))
|
||||
for registry in registries:
|
||||
self.assertIn('MAIN_FAKE_ID', registry.cluster_id)
|
||||
|
||||
def test_registry_list_ids_by_service_is_empty(self):
|
||||
self._create_registry(
|
||||
cluster_id='FAKE_ID',
|
||||
check_type='NODE_STATUS_POLLING',
|
||||
interval=60,
|
||||
params={},
|
||||
engine_id='ENGINE'
|
||||
)
|
||||
|
||||
registries = db_api.registry_list_ids_by_service(self.ctx, 'ENGINE1')
|
||||
self.assertEqual(0, len(registries))
|
||||
|
|
Loading…
Reference in New Issue