Merge "Improve Health Manager to avoid duplicate health checks."

commit 0cdb021e19
Zuul 2019-03-05 19:21:58 +00:00, committed by Gerrit Code Review
7 changed files with 1161 additions and 1128 deletions
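The heart of the change, visible in the health manager diff below, is to replace the flat list of registry dicts with per-cluster HealthCheck objects held in a dict keyed by cluster ID, so the same cluster can never be scheduled for health checking twice. A minimal standalone sketch of that idea (class and method names here are simplified for illustration and are not the module's actual API):

# Sketch of the duplicate-avoidance scheme introduced by this commit; the
# real implementation is RuntimeHealthRegistry in the diff below.
class DedupRegistrySketch(object):
    def __init__(self):
        # One entry per cluster ID; a dict makes duplicates impossible by
        # construction, unlike the previous list of registry dicts.
        self.registries = {}

    def register(self, cluster_id, entry):
        if cluster_id in self.registries:
            # Mirrors the "Skipping duplicate health check" warning added
            # to load_runtime_registry() in this commit.
            print("Skipping duplicate health check for cluster: %s"
                  % cluster_id)
            return
        self.registries[cluster_id] = entry

    def unregister(self, cluster_id):
        # pop() with a default avoids a KeyError for unknown clusters.
        return self.registries.pop(cluster_id, None)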

View File

@@ -652,7 +652,7 @@ class Resource(object):
except exception.SenlinException as err:
raise translate_exception(err, request.best_match_language())
except Exception as err:
log_exception(err, sys.exc_info())
log_exception(err)
raise translate_exception(err, request.best_match_language())
try:
@@ -830,9 +830,8 @@ class Controller(object):
raise exc.HTTPNotFound()
def log_exception(err, exc_info):
args = {'exc_info': exc_info}
LOG.error("Unexpected error occurred serving API: %s", err, **args)
def log_exception(err):
LOG.error("Unexpected error occurred serving API: %s", err)
def translate_exception(ex, locale):

View File

@@ -110,6 +110,9 @@ engine_opts = [
cfg.IntOpt('scheduler_thread_pool_size',
default=1000,
help=_('Maximum number of threads to use for scheduler.')),
cfg.IntOpt('health_manager_thread_pool_size',
default=1000,
help=_('Maximum number of threads to use for health manager.')),
]
cfg.CONF.register_opts(engine_opts)
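The new health_manager_thread_pool_size option is consumed later in this commit to size the oslo.service ThreadGroup used by the HealthManager service. A small standalone snippet, illustrative only and not part of the patch, showing how such an option is registered and read:

# Illustrative only: register and read the option the same way the
# HealthManager constructor further down in this commit does.
from oslo_config import cfg
from oslo_service import threadgroup

opts = [
    cfg.IntOpt('health_manager_thread_pool_size', default=1000,
               help='Maximum number of threads to use for health manager.'),
]
cfg.CONF.register_opts(opts)
cfg.CONF([], project='senlin')

# The thread pool bounds how many health-check timers/listeners run at once.
tg = threadgroup.ThreadGroup(
    thread_pool_size=cfg.CONF.health_manager_thread_pool_size)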

View File

@@ -290,6 +290,12 @@ DETECTION_TYPES = (
# 'LB_STATUS_POLLING',
)
HEALTH_CHECK_TYPES = (
EVENTS, POLLING,
) = (
'EVENTS', 'POLLING'
)
RECOVERY_ACTIONS = (
RECOVER_REBOOT, RECOVER_REBUILD, RECOVER_RECREATE,
) = (

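The chained tuple assignment used for these constant groups defines the group tuple and binds each member name in a single statement. A standalone illustration of the idiom, independent of the Senlin module:

# Chained assignment: the right-hand tuple is evaluated once, bound to
# HEALTH_CHECK_TYPES, and then unpacked into EVENTS and POLLING.
HEALTH_CHECK_TYPES = (
    EVENTS, POLLING,
) = (
    'EVENTS', 'POLLING',
)

assert HEALTH_CHECK_TYPES == ('EVENTS', 'POLLING')
assert EVENTS == 'EVENTS' and POLLING == 'POLLING'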
View File

@@ -243,6 +243,9 @@ class NovaClient(base.DriverBase):
def server_delete(self, server, ignore_missing=True):
return
def server_stop(self, server):
return
def server_force_delete(self, server, ignore_missing=True):
return

View File

@@ -19,6 +19,7 @@ health policies.
from collections import defaultdict
from collections import namedtuple
import eventlet
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging as messaging
@@ -27,7 +28,6 @@ from oslo_service import threadgroup
from oslo_utils import timeutils
import re
import tenacity
import time
from senlin.common import consts
from senlin.common import context
@@ -300,7 +300,7 @@ class NodePollStatusHealthCheck(HealthCheckType):
class NodePollUrlHealthCheck(HealthCheckType):
@staticmethod
def _convert_detection_tuple(dictionary):
def convert_detection_tuple(dictionary):
return namedtuple('DetectionMode', dictionary.keys())(**dictionary)
def _expand_url_template(self, url_template, node):
@@ -333,23 +333,23 @@ class NodePollUrlHealthCheck(HealthCheckType):
verify=verify_ssl)
except Exception as ex:
if conn_error_as_unhealthy:
LOG.info('%s for %s: connection error when polling URL (%s)',
LOG.info("%s for %s: connection error when polling URL (%s)",
consts.POLL_URL_FAIL, node.name, ex)
return False
else:
LOG.info('%s for %s: ignoring connection error when polling '
'URL (%s)',
LOG.info("%s for %s: ignoring connection error when polling "
"URL (%s)",
consts.POLL_URL_PASS, node.name, ex)
return True
if not re.search(expected_resp_str, result):
LOG.info('%s for %s: did not find expected response string %s in '
'URL result (%s)',
LOG.info("%s for %s: did not find expected response string %s in "
"URL result (%s)",
consts.POLL_URL_FAIL, node.name, expected_resp_str,
result)
return False
LOG.info('%s for %s: matched expected response string.',
LOG.info("%s for %s: matched expected response string.",
consts.POLL_URL_PASS, node.name)
return True
@@ -377,8 +377,8 @@ class NodePollUrlHealthCheck(HealthCheckType):
try:
if node.status != consts.NS_ACTIVE:
LOG.info('%s for %s: node is not in ACTIVE state, so skip '
'poll url',
LOG.info("%s for %s: node is not in ACTIVE state, so skip "
"poll url",
consts.POLL_URL_PASS, node.name)
return True
@@ -395,7 +395,7 @@ class NodePollUrlHealthCheck(HealthCheckType):
self._node_within_grace_period(node))
except Exception as ex:
LOG.warning(
'%s for %s: Ignoring error on poll URL: %s',
"%s for %s: Ignoring error on poll URL: %s",
consts.POLL_URL_PASS, node.name, ex
)
@@ -403,44 +403,352 @@ class NodePollUrlHealthCheck(HealthCheckType):
return True
class HealthManager(service.Service):
class HealthCheck(object):
def __init__(self, engine_service, topic, version):
super(HealthManager, self).__init__()
self.TG = threadgroup.ThreadGroup()
self.engine_id = engine_service.engine_id
self.topic = topic
self.version = version
self.ctx = context.get_admin_context()
def __init__(self, ctx, engine_id, cluster_id, check_type, interval,
node_update_timeout, params, enabled):
self.rpc_client = rpc_client.EngineClient()
self.rt = {
'registries': [],
}
self.health_check_types = defaultdict(lambda: [])
self.ctx = ctx
self.engine_id = engine_id
def task(self):
"""Task that is queued on the health manager thread group.
self.cluster_id = cluster_id
self.check_type = check_type
self.interval = interval
self.node_update_timeout = node_update_timeout
self.params = params
self.enabled = enabled
self.timer = None
self.listener = None
The task is here so that the service always has something to wait()
on, or else the process will exit.
"""
self.health_check_types = []
self.recover_action = {}
self.type = None
self.get_health_check_types()
self.get_recover_actions()
def get_health_check_types(self):
polling_types = [consts.NODE_STATUS_POLLING,
consts.NODE_STATUS_POLL_URL]
detection_types = self.check_type.split(',')
if all(check in polling_types for check in detection_types):
interval = min(self.interval, cfg.CONF.check_interval_max)
for check in detection_types:
self.health_check_types.append(
HealthCheckType.factory(
check, self.cluster_id, interval, self.params
)
)
self.type = consts.POLLING
elif (len(detection_types) == 1 and
detection_types[0] == consts.LIFECYCLE_EVENTS):
self.type = consts.EVENTS
def get_recover_actions(self):
if 'node_delete_timeout' in self.params:
self.recover_action['delete_timeout'] = self.params[
'node_delete_timeout']
if 'node_force_recreate' in self.params:
self.recover_action['force_recreate'] = self.params[
'node_force_recreate']
if 'recover_action' in self.params:
rac = self.params['recover_action']
for operation in rac:
self.recover_action['operation'] = operation.get('name')
def execute_health_check(self):
start_time = timeutils.utcnow(True)
try:
self._load_runtime_registry()
if not self.health_check_types:
LOG.error("No health check types found for cluster: %s",
self.cluster_id)
return _chase_up(start_time, self.interval)
cluster = objects.Cluster.get(self.ctx, self.cluster_id,
project_safe=False)
if not cluster:
LOG.warning("Cluster (%s) is not found.", self.cluster_id)
return _chase_up(start_time, self.interval)
ctx = context.get_service_context(user_id=cluster.user,
project_id=cluster.project)
actions = []
# loop through nodes and run all health checks on each node
nodes = objects.Node.get_all_by_cluster(ctx, self.cluster_id)
for node in nodes:
action = self._check_node_health(ctx, node, cluster)
if action:
actions.append(action)
for a in actions:
# wait for action to complete
res, reason = self._wait_for_action(
ctx, a['action'], self.node_update_timeout)
if not res:
LOG.warning("Node recovery action %s did not complete "
"within specified timeout: %s", a['action'],
reason)
if len(actions) == 0:
LOG.info("Health check passed for all nodes in cluster %s.",
self.cluster_id)
except Exception as ex:
LOG.error("Failed when running '_load_runtime_registry': %s", ex)
return _chase_up(start_time, cfg.CONF.periodic_interval,
name='Health manager task')
LOG.warning("Error while performing health check: %s", ex)
def _add_listener(self, cluster_id, recover_action):
"""Routine to be executed for adding cluster listener.
finally:
return _chase_up(start_time, self.interval)
:param cluster_id: The UUID of the cluster to be filtered.
:param recover_action: The health policy action name.
:returns: Nothing.
def _check_node_health(self, ctx, node, cluster):
node_is_healthy = True
if self.params['recovery_conditional'] == consts.ANY_FAILED:
# recovery happens if any detection mode fails
# i.e. the inverse logic is that node is considered healthy
# if all detection modes pass
node_is_healthy = all(
hc.run_health_check(ctx, node)
for hc in self.health_check_types)
elif self.params['recovery_conditional'] == consts.ALL_FAILED:
# recovery happens if all detection modes fail
# i.e. the inverse logic is that node is considered healthy
# if any detection mode passes
node_is_healthy = any(
hc.run_health_check(ctx, node)
for hc in self.health_check_types)
else:
raise Exception("%s is an invalid recovery conditional" %
self.params['recovery_conditional'])
if not node_is_healthy:
LOG.info("Health check failed for %s in %s and "
"recovery has started.",
node.name, cluster.name)
return self._recover_node(ctx, node.id)
def _wait_for_action(self, ctx, action_id, timeout):
req = objects.ActionGetRequest(identity=action_id)
action = {}
with timeutils.StopWatch(timeout) as timeout_watch:
while not timeout_watch.expired():
action = self.rpc_client.call(ctx, 'action_get', req)
if action['status'] in [consts.ACTION_SUCCEEDED,
consts.ACTION_FAILED,
consts.ACTION_CANCELLED]:
break
eventlet.sleep(2)
if not action:
return False, "Failed to retrieve action."
elif action['status'] == consts.ACTION_SUCCEEDED:
return True, ""
elif (action['status'] == consts.ACTION_FAILED or
action['status'] == consts.ACTION_CANCELLED):
return False, "Cluster check action failed or cancelled"
return False, ("Timeout while waiting for node recovery action to "
"finish")
def _recover_node(self, ctx, node_id):
"""Recover node
:returns: Recover action
"""
try:
req = objects.NodeRecoverRequest(identity=node_id,
params=self.recover_action)
return self.rpc_client.call(ctx, 'node_recover', req)
except Exception as ex:
LOG.error("Error when performing node recovery for %s: %s",
node_id, ex)
return None
def db_create(self):
try:
objects.HealthRegistry.create(
self.ctx, self.cluster_id, self.check_type, self.interval,
self.params, self.engine_id, self.enabled)
return True
except Exception as ex:
LOG.error("Error while adding health entry for cluster %s to "
"database: %s", self.cluster_id, ex)
return False
def db_delete(self):
try:
objects.HealthRegistry.delete(self.ctx, self.cluster_id)
return True
except Exception as ex:
LOG.error("Error while removing health entry for cluster %s from "
"database: %s", self.cluster_id, ex)
return False
def enable(self):
try:
objects.HealthRegistry.update(self.ctx, self.cluster_id,
{'enabled': True})
self.enabled = True
return True
except Exception as ex:
LOG.error("Error while enabling health entry for cluster %s: %s",
self.cluster_id, ex)
return False
def disable(self):
try:
objects.HealthRegistry.update(self.ctx, self.cluster_id,
{'enabled': False})
self.enabled = False
return True
except Exception as ex:
LOG.error("Error while disabling health entry for cluster %s: %s",
self.cluster_id, ex)
return False
class RuntimeHealthRegistry(object):
def __init__(self, ctx, engine_id, thread_group):
self.ctx = ctx
self.engine_id = engine_id
self.rt = {}
self.TG = thread_group
self.health_check_types = defaultdict(lambda: [])
@property
def registries(self):
return self.rt
def register_cluster(self, cluster_id, interval=None,
node_update_timeout=None, params=None,
enabled=True):
"""Register cluster to health registry.
:param cluster_id: The ID of the cluster to be registered.
:param interval: An optional integer indicating the length of checking
periods in seconds.
:param node_update_timeout: Timeout to wait for node action to
complete.
:param dict params: Other parameters for the health check.
:param enabled: Boolean indicating if the health check is enabled.
:return: None
"""
params = params or {}
# extract check_type from params
check_type = ""
if 'detection_modes' in params:
check_type = ','.join([
NodePollUrlHealthCheck.convert_detection_tuple(d).type
for d in params['detection_modes']
])
# add node_update_timeout to params
params['node_update_timeout'] = node_update_timeout
entry = None
try:
entry = HealthCheck(
ctx=self.ctx,
engine_id=self.engine_id,
cluster_id=cluster_id,
check_type=check_type,
interval=interval,
node_update_timeout=node_update_timeout,
params=params,
enabled=enabled
)
if entry.db_create():
self.registries[cluster_id] = entry
self.add_health_check(self.registries[cluster_id])
except Exception as ex:
LOG.error("Error while trying to register cluster for health "
"checks %s: %s", cluster_id, ex)
if entry:
entry.db_delete()
def unregister_cluster(self, cluster_id):
"""Unregister a cluster from health registry.
:param cluster_id: The ID of the cluster to be unregistered.
:return: None
"""
entry = None
try:
if cluster_id in self.registries:
entry = self.registries.pop(cluster_id)
entry.db_delete()
except Exception as ex:
LOG.error("Error while trying to unregister cluster from health"
"checks %s: %s", cluster_id, ex)
finally:
if entry:
self.remove_health_check(entry)
def enable_cluster(self, cluster_id):
"""Update the status of a cluster to enabled in the health registry.
:param cluster_id: The ID of the cluster to be enabled.
"""
LOG.info("Enabling health check for cluster %s.", cluster_id)
try:
if cluster_id in self.registries:
if self.registries[cluster_id].enable():
self.add_health_check(self.registries[cluster_id])
else:
LOG.error("Unable to enable cluster for health checking: %s",
cluster_id)
except Exception as ex:
LOG.error("Error while enabling health checks for cluster %s: %s",
cluster_id, ex)
if cluster_id in self.registries:
self.remove_health_check(self.registries[cluster_id])
def disable_cluster(self, cluster_id):
"""Update the status of a cluster to disabled in the health registry.
:param cluster_id: The ID of the cluster to be disabled.
:return: None.
"""
LOG.info("Disabling health check for cluster %s.", cluster_id)
try:
if cluster_id in self.registries:
self.registries[cluster_id].disable()
else:
LOG.error("Unable to disable cluster for health checking: %s",
cluster_id)
except Exception as ex:
LOG.error("Error while disabling health checks for cluster %s: %s",
cluster_id, ex)
finally:
if cluster_id in self.registries:
self.remove_health_check(self.registries[cluster_id])
def _add_timer(self, cluster_id):
entry = self.registries[cluster_id]
if entry.timer:
LOG.error("Health check for cluster %s already exists", cluster_id)
return None
timer = self.TG.add_dynamic_timer(entry.execute_health_check, None,
None)
if timer:
entry.timer = timer
else:
LOG.error("Error creating timer for cluster: %s", cluster_id)
def _add_listener(self, cluster_id):
entry = self.registries[cluster_id]
if entry.listener:
LOG.error("Listener for cluster %s already exists", cluster_id)
return
cluster = objects.Cluster.get(self.ctx, cluster_id, project_safe=False)
if not cluster:
LOG.warning("Cluster (%s) is not found.", cluster_id)
@@ -453,237 +761,136 @@ class HealthManager(service.Service):
elif profile_type == 'os.heat.stack':
exchange = cfg.CONF.health_manager.heat_control_exchange
else:
return None
return
project = cluster.project
return self.TG.add_thread(ListenerProc, exchange, project, cluster_id,
recover_action)
def _recover_node(self, node_id, ctx, recover_action):
"""Recover node
:returns: Recover action
"""
try:
req = objects.NodeRecoverRequest(identity=node_id,
params=recover_action)
return self.rpc_client.call(ctx, 'node_recover', req)
except Exception as ex:
LOG.error('Error when performing node recovery for %s: %s',
node_id, ex)
return None
def _wait_for_action(self, ctx, action_id, timeout):
req = objects.ActionGetRequest(identity=action_id)
with timeutils.StopWatch(timeout) as timeout_watch:
while not timeout_watch.expired():
action = self.rpc_client.call(ctx, 'action_get', req)
if action['status'] in [
consts.ACTION_SUCCEEDED, consts.ACTION_FAILED,
consts.ACTION_CANCELLED]:
break
time.sleep(2)
if action['status'] == consts.ACTION_SUCCEEDED:
return True, ""
if (action['status'] == consts.ACTION_FAILED or
action['status'] == consts.ACTION_CANCELLED):
return False, "Cluster check action failed or cancelled"
return False, ("Timeout while waiting for node recovery action to "
"finish")
def _add_health_check(self, cluster_id, health_check):
self.health_check_types[cluster_id].append(health_check)
def _execute_health_check(self, interval, cluster_id,
recover_action, recovery_cond,
node_update_timeout):
start_time = timeutils.utcnow(True)
try:
if cluster_id not in self.health_check_types:
LOG.error("Cluster (%s) is not found in health_check_types.",
self.cluster_id)
return _chase_up(start_time, interval)
if len(self.health_check_types[cluster_id]) == 0:
LOG.error("No health check types found for Cluster (%s).",
self.cluster_id)
return _chase_up(start_time, interval)
cluster = objects.Cluster.get(self.ctx, cluster_id,
project_safe=False)
if not cluster:
LOG.warning("Cluster (%s) is not found.", self.cluster_id)
return _chase_up(start_time, interval)
ctx = context.get_service_context(user_id=cluster.user,
project_id=cluster.project)
actions = []
# loop through nodes and run all health checks on each node
nodes = objects.Node.get_all_by_cluster(ctx, cluster_id)
for node in nodes:
node_is_healthy = True
if recovery_cond == consts.ANY_FAILED:
# recovery happens if any detection mode fails
# i.e. the inverse logic is that node is considered healthy
# if all detection modes pass
node_is_healthy = all(
hc.run_health_check(ctx, node)
for hc in self.health_check_types[cluster_id])
elif recovery_cond == consts.ALL_FAILED:
# recovery happens if all detection modes fail
# i.e. the inverse logic is that node is considered healthy
# if any detection mode passes
node_is_healthy = any(
hc.run_health_check(ctx, node)
for hc in self.health_check_types[cluster_id])
else:
raise Exception(
'{} is an invalid recovery conditional'.format(
recovery_cond))
if not node_is_healthy:
LOG.info("Health check failed for %s in %s and "
"recovery has started.",
node.name, cluster.name)
action = self._recover_node(node.id, ctx,
recover_action)
actions.append(action)
for a in actions:
# wait for action to complete
res, reason = self._wait_for_action(
ctx, a['action'], node_update_timeout)
if not res:
LOG.warning("Node recovery action %s did not complete "
"within specified timeout: %s", a['action'],
reason)
if len(actions) == 0:
LOG.info('Health check passed for all nodes in cluster %s.',
cluster_id)
except Exception as ex:
LOG.warning('Error while performing health check: %s', ex)
return _chase_up(start_time, interval)
def _start_check(self, entry):
"""Routine for starting the checking for a cluster.
:param entry: A dict containing the data associated with the cluster.
:returns: An updated registry entry record.
"""
LOG.info('Enabling health check for cluster %s.', entry['cluster_id'])
cid = entry['cluster_id']
ctype = entry['check_type']
# Get the recover action parameter from the entry params
params = entry['params']
recover_action = {}
if 'node_delete_timeout' in params:
recover_action['delete_timeout'] = params['node_delete_timeout']
if 'node_force_recreate' in params:
recover_action['force_recreate'] = params['node_force_recreate']
if 'recover_action' in params:
rac = params['recover_action']
for operation in rac:
recover_action['operation'] = operation.get('name')
polling_types = [consts.NODE_STATUS_POLLING,
consts.NODE_STATUS_POLL_URL]
detection_types = ctype.split(',')
if all(check in polling_types for check in detection_types):
interval = min(entry['interval'], cfg.CONF.check_interval_max)
for check in ctype.split(','):
self._add_health_check(cid, HealthCheckType.factory(
check, cid, interval, params))
timer = self.TG.add_dynamic_timer(self._execute_health_check,
None, None, interval, cid,
recover_action,
params['recovery_conditional'],
params['node_update_timeout'])
entry['timer'] = timer
elif (len(detection_types) == 1 and
detection_types[0] == consts.LIFECYCLE_EVENTS):
LOG.info("Start listening events for cluster (%s).", cid)
listener = self._add_listener(cid, recover_action)
if listener:
entry['listener'] = listener
else:
LOG.warning("Error creating listener for cluster %s", cid)
return None
listener = self.TG.add_thread(ListenerProc, exchange, project,
cluster_id, entry.recover_action)
if listener:
entry.listener = listener
else:
LOG.error("Cluster %(id)s check type %(type)s is invalid.",
{'id': cid, 'type': ctype})
return None
LOG.error("Error creating listener for cluster: %s", cluster_id)
return entry
def add_health_check(self, entry):
"""Add a health check to the RuntimeHealthRegistry.
def _stop_check(self, entry):
"""Routine for stopping the checking for a cluster.
This method creates a timer/thread based on the type of health check
being added.
:param entry: A dict containing the data associated with the cluster.
:returns: ``None``.
:param entry: Entry to add to the registry.
:return: None
"""
LOG.info('Disabling health check for cluster %s.', entry['cluster_id'])
if entry.cluster_id in self.registries:
if not entry.enabled:
return
elif entry.timer:
LOG.error("Health check for cluster %s already exists",
entry.cluster_id)
return
else:
LOG.error("Unable to add health check for cluster: %s",
entry.cluster_id)
return
timer = entry.get('timer', None)
if timer:
if entry.type == consts.POLLING:
self._add_timer(entry.cluster_id)
elif entry.type == consts.EVENTS:
LOG.info("Start listening events for cluster (%s).",
entry.cluster_id)
self._add_listener(entry.cluster_id)
else:
LOG.error("Cluster %(id)s type %(type)s is invalid.",
{'id': entry.cluster_id, 'type': entry.type})
def remove_health_check(self, entry):
"""Remove a health check for the RuntimeHealthRegistry.
This method stops and removes the timer/thread based on the type of
health check being removed.
:param entry: Entry to remove from the registry.
:return: None
"""
if entry.timer:
# stop timer
timer.stop()
entry.timer.stop()
try:
# tell threadgroup to remove timer
self.TG.timer_done(timer)
self.TG.timer_done(entry.timer)
except ValueError:
pass
finally:
entry.timer = None
if entry['cluster_id'] in self.health_check_types:
self.health_check_types.pop(entry['cluster_id'])
return
if entry.listener:
try:
self.TG.thread_done(entry.listener)
entry.listener.stop()
except ValueError:
pass
finally:
entry.listener = None
listener = entry.get('listener', None)
if listener:
self.TG.thread_done(listener)
listener.stop()
return
def _load_runtime_registry(self):
def load_runtime_registry(self):
"""Load the initial runtime registry with a DB scan."""
db_registries = objects.HealthRegistry.claim(self.ctx, self.engine_id)
for r in db_registries:
for registry in db_registries:
if registry.cluster_id in self.registries:
LOG.warning("Skipping duplicate health check for cluster: %s",
registry.cluster_id)
# Claiming a health registry indicates that its previous engine is
# dead; the registry's engine_id is updated to the current engine.
# The check itself is only started for enabled registries below.
entry = {
'cluster_id': r.cluster_id,
'check_type': r.check_type,
'interval': r.interval,
'params': r.params,
'enabled': r.enabled,
}
entry = HealthCheck(
ctx=self.ctx,
engine_id=self.engine_id,
cluster_id=registry.cluster_id,
check_type=registry.check_type,
interval=registry.interval,
node_update_timeout=registry.params['node_update_timeout'],
params=registry.params,
enabled=registry.enabled
)
LOG.info("Loading cluster %(c)s enabled=%(e)s for "
"health monitoring",
{'c': r.cluster_id, 'e': r.enabled})
if r.enabled:
# Stop any running checks for entry before starting.
self._stop_check(entry)
entry = self._start_check(entry)
if entry:
self.rt['registries'].append(entry)
{'c': registry.cluster_id, 'e': registry.enabled})
self.registries[registry.cluster_id] = entry
if registry.enabled:
self.add_health_check(self.registries[registry.cluster_id])
class HealthManager(service.Service):
def __init__(self, engine_service, topic, version):
super(HealthManager, self).__init__()
self.TG = threadgroup.ThreadGroup(
thread_pool_size=cfg.CONF.health_manager_thread_pool_size)
self.engine_id = engine_service.engine_id
self.topic = topic
self.version = version
self.ctx = context.get_admin_context()
self.rpc_client = rpc_client.EngineClient()
self.health_registry = RuntimeHealthRegistry(
ctx=self.ctx, engine_id=self.engine_id, thread_group=self.TG)
def task(self):
"""Task that is queued on the health manager thread group.
The task is here so that the service always has something to wait()
on, or else the process will exit.
"""
start_time = timeutils.utcnow(True)
try:
self.health_registry.load_runtime_registry()
except Exception as ex:
LOG.error("Failed when loading runtime for health manager: %s", ex)
return _chase_up(start_time, cfg.CONF.periodic_interval,
name='Health manager task')
def start(self):
"""Start the health manager RPC server.
@@ -705,7 +912,7 @@ class HealthManager(service.Service):
@property
def registries(self):
return self.rt['registries']
return self.health_registry.registries
def listening(self, ctx):
"""Respond to confirm that the rpc service is still alive."""
@@ -714,45 +921,24 @@ class HealthManager(service.Service):
def register_cluster(self, ctx, cluster_id, interval=None,
node_update_timeout=None, params=None,
enabled=True):
"""Register cluster for health checking.
"""Register a cluster for health checking.
:param ctx: The context of notify request.
:param cluster_id: The ID of the cluster to be checked.
:param interval: An optional integer indicating the length of checking
periods in seconds.
:param dict params: Other parameters for the health check.
:param cluster_id: The ID of the cluster to be registered.
:param interval: Interval of the health check.
:param node_update_timeout: Time to wait before declaring a node
unhealthy.
:param params: Params to be passed to the health check.
:param enabled: Whether the health check is enabled or disabled.
:return: None
"""
params = params or {}
# extract check_type from params
check_type = ""
if 'detection_modes' in params:
check_type = ','.join([
NodePollUrlHealthCheck._convert_detection_tuple(d).type
for d in params['detection_modes']
])
# add node_update_timeout to params
params['node_update_timeout'] = node_update_timeout
registry = objects.HealthRegistry.create(ctx, cluster_id, check_type,
interval, params,
self.engine_id,
enabled=enabled)
entry = {
'cluster_id': registry.cluster_id,
'check_type': registry.check_type,
'interval': registry.interval,
'params': registry.params,
'enabled': registry.enabled
}
if registry.enabled:
self._start_check(entry)
self.rt['registries'].append(entry)
LOG.info("Registering health check for cluster %s.", cluster_id)
self.health_registry.register_cluster(
cluster_id=cluster_id,
interval=interval,
node_update_timeout=node_update_timeout,
params=params,
enabled=enabled)
def unregister_cluster(self, ctx, cluster_id):
"""Unregister a cluster from health checking.
@@ -761,29 +947,14 @@ class HealthManager(service.Service):
:param cluster_id: The ID of the cluster to be unregistered.
:return: None
"""
for i in range(len(self.rt['registries']) - 1, -1, -1):
entry = self.rt['registries'][i]
if entry.get('cluster_id') == cluster_id:
self._stop_check(entry)
self.rt['registries'].pop(i)
objects.HealthRegistry.delete(ctx, cluster_id)
LOG.debug('unregister done')
LOG.info("Unregistering health check for cluster %s.", cluster_id)
self.health_registry.unregister_cluster(cluster_id)
def enable_cluster(self, ctx, cluster_id, params=None):
for c in self.rt['registries']:
if c['cluster_id'] == cluster_id and not c['enabled']:
c['enabled'] = True
objects.HealthRegistry.update(ctx, cluster_id,
{'enabled': True})
self._start_check(c)
self.health_registry.enable_cluster(cluster_id)
def disable_cluster(self, ctx, cluster_id, params=None):
for c in self.rt['registries']:
if c['cluster_id'] == cluster_id and c['enabled']:
c['enabled'] = False
objects.HealthRegistry.update(ctx, cluster_id,
{'enabled': False})
self._stop_check(c)
self.health_registry.disable_cluster(cluster_id)
def notify(engine_id, method, **kwargs):

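The ANY_FAILED/ALL_FAILED branches in _check_node_health above reduce to inverting the failure condition with Python's all() and any() over the individual detection-mode results. A standalone sketch, where the hard-coded result list stands in for the hc.run_health_check(ctx, node) calls:

ANY_FAILED = 'ANY_FAILED'
ALL_FAILED = 'ALL_FAILED'

def node_is_healthy(check_results, recovery_conditional):
    if recovery_conditional == ANY_FAILED:
        # Recovery if any detection mode fails, i.e. healthy only if all pass.
        return all(check_results)
    if recovery_conditional == ALL_FAILED:
        # Recovery only if every mode fails, i.e. healthy if any one passes.
        return any(check_results)
    raise ValueError('%s is an invalid recovery conditional'
                     % recovery_conditional)

# One passing and one failing detection mode:
results = [True, False]
assert node_is_healthy(results, ANY_FAILED) is False  # recovery would start
assert node_is_healthy(results, ALL_FAILED) is True   # no recovery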
View File

@@ -2264,6 +2264,7 @@ class EngineService(service.Service):
query['filters'] = filters
actions = action_obj.Action.get_all(ctx, **query)
return [a.to_dict() for a in actions]
@request_context

File diff suppressed because it is too large