Support multiple detection types in health policy
This patchset changes the existing health policy to allow NODE_STATUS_POLLING to be combined with NODE_STATUS_POLL_URL, so that a single policy can use multiple detection types.

The implementation of NODE_STATUS_POLLING was also changed to perform do_check directly on the node instead of issuing a cluster check operation. This avoids locking the cluster as part of the health check: when the cluster check ran during a health check, there was potential lock contention if the user was already performing other operations on the same cluster.

Implements: blueprint multiple-detection-modes
Change-Id: Iba0b043cc582e668a9066b0586ca8a0b201040e4
Parent: 694bd525d5
Commit: 3e6a688db0
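Illustrative sketch (not part of the diff): a v1.1 policy spec that exercises the new combination, written as a Python dict in the same shape the tests below use. The poll URL and its {nodename} expansion parameter are hypothetical example values:

    spec = {
        'type': 'senlin.policy.health',
        'version': '1.1',
        'properties': {
            'detection': {
                'interval': 60,
                'recovery_conditional': 'ANY_FAILED',
                'detection_modes': [
                    {'type': 'NODE_STATUS_POLLING'},
                    {'type': 'NODE_STATUS_POLL_URL',
                     'options': {'poll_url': 'http://{nodename}/health'}},
                ],
            },
            'recovery': {
                'fencing': ['COMPUTE'],
                'actions': [{'name': 'REBUILD'}],
            },
        },
    }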
@@ -288,6 +288,12 @@ RECOVERY_ACTIONS = (
     'REBOOT', 'REBUILD', 'RECREATE',
 )
 
+RECOVERY_CONDITIONAL = (
+    ALL_FAILED, ANY_FAILED,
+) = (
+    'ALL_FAILED', 'ANY_FAILED',
+)
+
 NOTIFICATION_PRIORITIES = (
     PRIO_AUDIT, PRIO_CRITICAL, PRIO_ERROR, PRIO_WARN, PRIO_INFO, PRIO_DEBUG,
     PRIO_SAMPLE,
@@ -88,7 +88,7 @@ def level_from_number(value):
     return levels.get(n, None)
 
 
-def url_fetch(url, allowed_schemes=('http', 'https'), verify=True):
+def url_fetch(url, timeout=1, allowed_schemes=('http', 'https'), verify=True):
     """Get the data at the specified URL.
 
     The URL must use the http: or https: schemes.
@@ -96,7 +96,6 @@ def url_fetch(url, allowed_schemes=('http', 'https'), verify=True):
     the allowed_schemes argument.
     Raise an IOError if getting the data fails.
     """
     LOG.info('Fetching data from %s', url)
 
     components = urllib.parse.urlparse(url)
 
@@ -105,12 +104,12 @@ def url_fetch(url, allowed_schemes=('http', 'https'), verify=True):
 
     if components.scheme == 'file':
         try:
-            return urllib.request.urlopen(url).read()
+            return urllib.request.urlopen(url, timeout=timeout).read()
         except urllib.error.URLError as uex:
             raise URLFetchError(_('Failed to retrieve data: %s') % uex)
 
     try:
-        resp = requests.get(url, stream=True, verify=verify)
+        resp = requests.get(url, stream=True, verify=verify, timeout=timeout)
         resp.raise_for_status()
 
         # We cannot use resp.text here because it would download the entire
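Illustrative sketch (not part of the diff): the new call shape of url_fetch, with a hypothetical example URL. The timeout now bounds both the urllib file-scheme path and the requests path:

    from senlin.common import utils

    try:
        body = utils.url_fetch('http://192.0.2.10/health', timeout=1,
                               verify=True)
    except utils.URLFetchError as ex:
        body = None
        print('Health endpoint unreachable: %s' % ex)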
@@ -17,6 +17,8 @@ trigger corresponding actions to recover the clusters based on the pre-defined
 health policies.
 """
 
+from collections import defaultdict
+from collections import namedtuple
 from oslo_config import cfg
 from oslo_log import log as logging
 import oslo_messaging as messaging
@@ -24,13 +26,13 @@ from oslo_service import service
 from oslo_service import threadgroup
 from oslo_utils import timeutils
 import re
 import six
 import time
 
 from senlin.common import consts
 from senlin.common import context
 from senlin.common import messaging as rpc
 from senlin.common import utils
+from senlin.engine import node as node_mod
 from senlin import objects
 from senlin.rpc import client as rpc_client
@@ -195,94 +197,91 @@ def ListenerProc(exchange, project_id, cluster_id, recover_action):
     listener.start()
 
 
-class HealthManager(service.Service):
+class HealthCheckType(object):
+    @staticmethod
+    def factory(detection_type, cid, interval, params):
+        node_update_timeout = params['node_update_timeout']
+        detection_params = [
+            p for p in params['detection_modes']
+            if p['type'] == detection_type
+        ]
+        if len(detection_params) != 1:
+            raise Exception(
+                'The same detection mode cannot be used more than once in the '
+                'same policy. Encountered {} instances of '
+                'type {}.'.format(len(detection_params), detection_type)
+            )
 
-    def __init__(self, engine_service, topic, version):
-        super(HealthManager, self).__init__()
-
-        self.TG = threadgroup.ThreadGroup()
-        self.engine_id = engine_service.engine_id
-        self.topic = topic
-        self.version = version
-        self.ctx = context.get_admin_context()
-        self.rpc_client = rpc_client.EngineClient()
-        self.rt = {
-            'registries': [],
-        }
-
-    def _dummy_task(self):
-        """A Dummy task that is queued on the health manager thread group.
-
-        The task is here so that the service always has something to wait()
-        on, or else the process will exit.
-        """
-        self._load_runtime_registry()
-
-    def _wait_for_action(self, ctx, action_id, timeout):
-        done = False
-        req = objects.ActionGetRequest(identity=action_id)
-        with timeutils.StopWatch(timeout) as timeout_watch:
-            while timeout > 0:
-                action = self.rpc_client.call(ctx, 'action_get', req)
-                if action['status'] in [consts.ACTION_SUCCEEDED,
-                                        consts.ACTION_FAILED,
-                                        consts.ACTION_CANCELLED]:
-                    if action['status'] == consts.ACTION_SUCCEEDED:
-                        done = True
-                    break
-                time.sleep(2)
-                timeout = timeout_watch.leftover(True)
-
-        if done:
-            return True, ""
-        elif timeout <= 0:
-            return False, "Timeout while polling cluster status"
+        if detection_type == consts.NODE_STATUS_POLLING:
+            return NodePollStatusHealthCheck(
+                cid, interval, node_update_timeout, detection_params[0])
+        elif detection_type == consts.NODE_STATUS_POLL_URL:
+            return NodePollUrlHealthCheck(
+                cid, interval, node_update_timeout, detection_params[0])
         else:
-            return False, "Cluster check action failed or cancelled"
+            raise Exception(
+                'Invalid detection type: {}'.format(detection_type))
 
-    def _poll_cluster(self, cluster_id, timeout, recover_action):
-        """Routine to be executed for polling cluster status.
+    def __init__(self, cluster_id, interval, node_update_timeout, params):
+        """Initialize HealthCheckType
 
-        :param ctx:
         :param cluster_id: The UUID of the cluster to be checked.
-        :param timeout: The maximum number of seconds to wait.
-        :param recover_action: The health policy action name.
-        :returns: Nothing.
+        :param params: Parameters specific to poll url or recovery action.
         """
-        start_time = timeutils.utcnow(True)
-        cluster = objects.Cluster.get(self.ctx, cluster_id, project_safe=False)
-        if not cluster:
-            LOG.warning("Cluster (%s) is not found.", cluster_id)
-            return _chase_up(start_time, timeout)
-
-        ctx = context.get_service_context(user_id=cluster.user,
-                                          project_id=cluster.project)
-        params = {'delete_check_action': True}
-        try:
-            req = objects.ClusterCheckRequest(identity=cluster_id,
-                                              params=params)
-            action = self.rpc_client.call(ctx, 'cluster_check', req)
-        except Exception as ex:
-            LOG.warning("Failed in triggering 'cluster_check' RPC for "
-                        "'%(c)s': %(r)s",
-                        {'c': cluster_id, 'r': six.text_type(ex)})
-            return _chase_up(start_time, timeout)
-
-        # wait for action to complete
-        res, reason = self._wait_for_action(ctx, action['action'], timeout)
-        if not res:
-            LOG.warning("%s", reason)
-            return _chase_up(start_time, timeout)
-
-        # loop through nodes to trigger recovery
-        nodes = objects.Node.get_all_by_cluster(ctx, cluster_id)
-        for node in nodes:
-            if node.status != consts.NS_ACTIVE:
-                LOG.info("Requesting node recovery: %s", node.id)
-                req = objects.NodeRecoverRequest(identity=node.id,
-                                                 params=recover_action)
-                self.rpc_client.call(ctx, 'node_recover', req)
-
-        return _chase_up(start_time, timeout)
+        self.cluster_id = cluster_id
+        self.interval = interval
+        self.node_update_timeout = node_update_timeout
+        self.params = params
 
+    def run_health_check(self, ctx, node):
+        """Run health check on node
+
+        :returns: True if node is healthy. False otherwise.
+        """
+        pass
+
+
+class NodePollStatusHealthCheck(HealthCheckType):
+    def run_health_check(self, ctx, node):
+        """Routine to be executed for polling node status.
+
+        :returns: True if node is healthy. False otherwise.
+        """
+        try:
+            # create engine node from db node
+            entity = node_mod.Node._from_object(ctx, node)
+
+            if not entity.do_check(ctx, return_check_result=True):
+                # server was not found as a result of performing check
+                node_last_updated = node.updated_at or node.init_at
+                if not timeutils.is_older_than(
+                        node_last_updated, self.node_update_timeout):
+                    LOG.info("Node %s was updated at %s which is less "
+                             "than %d secs ago. Skip node recovery from "
+                             "NodePollStatusHealthCheck.",
+                             node.id, node_last_updated,
+                             self.node_update_timeout)
+                    return True
+                else:
+                    return False
+            else:
+                LOG.debug("NodePollStatusHealthCheck reports node %s is "
+                          "healthy.", node.id)
+                return True
+        except Exception as ex:
+            LOG.warning(
+                'Error when performing health check on node %s: %s',
+                node.id, ex
+            )
+            return False
+
+
+class NodePollUrlHealthCheck(HealthCheckType):
+    @staticmethod
+    def _convert_detection_tuple(dictionary):
+        return namedtuple('DetectionMode', dictionary.keys())(**dictionary)
+
     def _expand_url_template(self, url_template, node):
         """Expands parameters in an URL template
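Illustrative sketch (not part of the diff): how the factory is driven, mirroring the wiring done in _start_check further down. The cluster ID is a placeholder:

    params = {
        'node_update_timeout': 300,
        'detection_modes': [
            {'type': 'NODE_STATUS_POLLING'},
            {'type': 'NODE_STATUS_POLL_URL'},
        ],
    }
    checks = [
        HealthCheckType.factory(mode['type'], 'CLUSTER_ID', 60, params)
        for mode in params['detection_modes']
    ]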
@@ -300,31 +299,29 @@ class HealthManager(service.Service):
 
         return url
 
-    def _check_url_and_recover_node(self, ctx, node, recover_action, params):
+    def run_health_check(self, ctx, node):
         """Routine to check a node status from a url and recovery if necessary
 
         :param ctx: The request context to use for recovery action
         :param node: The node to be checked.
-        :param recover_action: The health policy action name.
-        :param params: Parameters specific to poll url or recovery action
-        :returns: action if node was triggered for recovery. Otherwise None.
+        :returns: True if node is considered to be healthy. False otherwise.
         """
-
-        url_template = params['poll_url']
-        verify_ssl = params['poll_url_ssl_verify']
-        conn_error_as_unhealthy = params['poll_url_conn_error_as_unhealthy']
-        expected_resp_str = params['poll_url_healthy_response']
-        max_unhealthy_retry = params['poll_url_retry_limit']
-        retry_interval = params['poll_url_retry_interval']
-        node_update_timeout = params['node_update_timeout']
+        url_template = self.params['poll_url']
+        verify_ssl = self.params['poll_url_ssl_verify']
+        conn_error_as_unhealthy = self.params[
+            'poll_url_conn_error_as_unhealthy']
+        expected_resp_str = self.params['poll_url_healthy_response']
+        max_unhealthy_retry = self.params['poll_url_retry_limit']
+        retry_interval = self.params['poll_url_retry_interval']
 
         def stop_node_recovery():
             node_last_updated = node.updated_at or node.init_at
             if not timeutils.is_older_than(
-                    node_last_updated, node_update_timeout):
+                    node_last_updated, self.node_update_timeout):
                 LOG.info("Node %s was updated at %s which is less than "
-                         "%d secs ago. Skip node recovery.",
-                         node.id, node_last_updated, node_update_timeout)
+                         "%d secs ago. Skip node recovery from "
+                         "NodePollUrlHealthCheck.",
+                         node.id, node_last_updated, self.node_update_timeout)
                 return True
 
             LOG.info("Node %s is reported as down (%d retries left)",
@@ -334,84 +331,71 @@ class HealthManager(service.Service):
             return False
 
         url = self._expand_url_template(url_template, node)
-        LOG.info("Polling node status from URL: %s", url)
+        LOG.debug("Polling node status from URL: %s", url)
 
         available_attemps = max_unhealthy_retry
+        timeout = max(retry_interval * 0.1, 1)
         while available_attemps > 0:
             available_attemps -= 1
 
             try:
-                result = utils.url_fetch(url, verify=verify_ssl)
+                result = utils.url_fetch(
+                    url, timeout=timeout, verify=verify_ssl)
             except utils.URLFetchError as ex:
                 if conn_error_as_unhealthy:
                     if stop_node_recovery():
-                        return None
+                        return True
                     continue
                 else:
                     LOG.error("Error when requesting node health status from"
                               " %s: %s", url, ex)
-                    return None
+                    return True
 
             LOG.debug("Node status returned from URL(%s): %s", url,
                       result)
             if re.search(expected_resp_str, result):
-                LOG.debug('Node %s is healthy', node.id)
-                return None
+                LOG.debug('NodePollUrlHealthCheck reports node %s is healthy.',
+                          node.id)
+                return True
 
             if node.status != consts.NS_ACTIVE:
                 LOG.info("Skip node recovery because node %s is not in "
-                         "ACTIVE state", node.id)
-                return None
+                         "ACTIVE state.", node.id)
+                return True
 
             if stop_node_recovery():
-                return None
+                return True
 
         # recover node after exhausting retries
-        LOG.info("Requesting node recovery: %s", node.id)
-        req = objects.NodeRecoverRequest(identity=node.id,
-                                         params=recover_action)
-
-        return self.rpc_client.call(ctx, 'node_recover', req)
-
-    def _poll_url(self, cluster_id, timeout, recover_action, params):
-        """Routine to be executed for polling node status from a url
-
-        :param cluster_id: The UUID of the cluster to be checked.
-        :param timeout: The maximum number of seconds to wait for recovery
-                        action
-        :param recover_action: The health policy action name.
-        :param params: Parameters specific to poll url or recovery action
-        :returns: Nothing.
-        """
-        start_time = timeutils.utcnow(True)
-
-        cluster = objects.Cluster.get(self.ctx, cluster_id, project_safe=False)
-        if not cluster:
-            LOG.warning("Cluster (%s) is not found.", cluster_id)
-            return _chase_up(start_time, timeout)
-
-        ctx = context.get_service_context(user_id=cluster.user,
-                                          project_id=cluster.project)
-
-        actions = []
-
-        # loop through nodes to poll url for each node
-        nodes = objects.Node.get_all_by_cluster(ctx, cluster_id)
-        for node in nodes:
-            action = self._check_url_and_recover_node(ctx, node,
-                                                      recover_action, params)
-            if action:
-                actions.append(action)
-
-        for a in actions:
-            # wait for action to complete
-            res, reason = self._wait_for_action(ctx, a['action'], timeout)
-            if not res:
-                LOG.warning("Node recovery action %s did not complete "
-                            "within specified timeout: %s", a['action'],
-                            reason)
-
-        return _chase_up(start_time, timeout)
+        return False
 
+
+class HealthManager(service.Service):
+
+    def __init__(self, engine_service, topic, version):
+        super(HealthManager, self).__init__()
+
+        self.TG = threadgroup.ThreadGroup()
+        self.engine_id = engine_service.engine_id
+        self.topic = topic
+        self.version = version
+        self.ctx = context.get_admin_context()
+        self.rpc_client = rpc_client.EngineClient()
+        self.rt = {
+            'registries': [],
+        }
+        self.health_check_types = defaultdict(lambda: [])
+
+    def _dummy_task(self):
+        """A Dummy task that is queued on the health manager thread group.
+
+        The task is here so that the service always has something to wait()
+        on, or else the process will exit.
+        """
+        try:
+            self._load_runtime_registry()
+        except Exception as ex:
+            LOG.error("Failed when running '_load_runtime_registry': %s", ex)
 
     def _add_listener(self, cluster_id, recover_action):
         """Routine to be executed for adding cluster listener.
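Illustrative sketch (not part of the diff): the retry loop above reduces to the following shape, with poll_once standing in for utils.url_fetch and illustrative values:

    import re

    def poll_once():
        return 'status: ACTIVE'  # stand-in for the URL response body

    expected_resp_str = 'ACTIVE'
    max_unhealthy_retry = 3
    for _ in range(max_unhealthy_retry):
        if re.search(expected_resp_str, poll_once()):
            print('node healthy')  # run_health_check returns True
            break
    else:
        print('node considered down')  # run_health_check returns False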
@@ -438,12 +422,129 @@ class HealthManager(service.Service):
         return self.TG.add_thread(ListenerProc, exchange, project, cluster_id,
                                   recover_action)
 
+    def _recover_node(self, node_id, ctx, recover_action):
+        """Recover node
+
+        :returns: Recover action
+        """
+        try:
+            LOG.info("%s is requesting node recovery "
+                     "for %s.", self.__class__.__name__, node_id)
+            req = objects.NodeRecoverRequest(identity=node_id,
+                                             params=recover_action)
+
+            return self.rpc_client.call(ctx, 'node_recover', req)
+        except Exception as ex:
+            LOG.error('Error when performing node recovery for %s: %s',
+                      node_id, ex)
+            return None
+
+    def _wait_for_action(self, ctx, action_id, timeout):
+        req = objects.ActionGetRequest(identity=action_id)
+        with timeutils.StopWatch(timeout) as timeout_watch:
+            while not timeout_watch.expired():
+                action = self.rpc_client.call(ctx, 'action_get', req)
+                if action['status'] in [
+                        consts.ACTION_SUCCEEDED, consts.ACTION_FAILED,
+                        consts.ACTION_CANCELLED]:
+                    break
+                time.sleep(2)
+
+        if action['status'] == consts.ACTION_SUCCEEDED:
+            return True, ""
+
+        if (action['status'] == consts.ACTION_FAILED or
+                action['status'] == consts.ACTION_CANCELLED):
+            return False, "Cluster check action failed or cancelled"
+
+        return False, ("Timeout while waiting for node recovery action to "
+                       "finish")
+
+    def _add_health_check(self, cluster_id, health_check):
+        self.health_check_types[cluster_id].append(health_check)
+
+    def _execute_health_check(self, interval, cluster_id,
+                              recover_action, recovery_cond,
+                              node_update_timeout):
+        start_time = timeutils.utcnow(True)
+
+        try:
+            if cluster_id not in self.health_check_types:
+                LOG.error("Cluster (%s) is not found in health_check_types.",
+                          cluster_id)
+                return _chase_up(start_time, interval)
+
+            if len(self.health_check_types[cluster_id]) == 0:
+                LOG.error("No health check types found for Cluster (%s).",
+                          cluster_id)
+                return _chase_up(start_time, interval)
+
+            cluster = objects.Cluster.get(self.ctx, cluster_id,
+                                          project_safe=False)
+            if not cluster:
+                LOG.warning("Cluster (%s) is not found.", cluster_id)
+                return _chase_up(start_time, interval)
+
+            ctx = context.get_service_context(user_id=cluster.user,
+                                              project_id=cluster.project)
+
+            actions = []
+
+            # loop through nodes and run all health checks on each node
+            nodes = objects.Node.get_all_by_cluster(ctx, cluster_id)
+
+            for node in nodes:
+                node_is_healthy = True
+
+                if recovery_cond == consts.ANY_FAILED:
+                    # recovery happens if any detection mode fails
+                    # i.e. the inverse logic is that node is considered
+                    # healthy if all detection modes pass
+                    node_is_healthy = all(
+                        hc.run_health_check(ctx, node)
+                        for hc in self.health_check_types[cluster_id])
+                elif recovery_cond == consts.ALL_FAILED:
+                    # recovery happens if all detection modes fail
+                    # i.e. the inverse logic is that node is considered
+                    # healthy if any detection mode passes
+                    node_is_healthy = any(
+                        hc.run_health_check(ctx, node)
+                        for hc in self.health_check_types[cluster_id])
+                else:
+                    raise Exception(
+                        '{} is an invalid recovery conditional'.format(
+                            recovery_cond))
+
+                if not node_is_healthy:
+                    action = self._recover_node(node.id, ctx,
+                                                recover_action)
+                    actions.append(action)
+
+            for a in actions:
+                # wait for action to complete
+                res, reason = self._wait_for_action(
+                    ctx, a['action'], node_update_timeout)
+                if not res:
+                    LOG.warning("Node recovery action %s did not complete "
+                                "within specified timeout: %s", a['action'],
+                                reason)
+
+            if len(actions) == 0:
+                LOG.info('Health check passed for all nodes in cluster %s.',
+                         cluster_id)
+        except Exception as ex:
+            LOG.warning('Error while performing health check: %s', ex)
+
+        return _chase_up(start_time, interval)
+
     def _start_check(self, entry):
         """Routine for starting the checking for a cluster.
 
         :param entry: A dict containing the data associated with the cluster.
         :returns: An updated registry entry record.
         """
+        LOG.info('Enabling health check for cluster %s.', entry['cluster_id'])
+
         cid = entry['cluster_id']
         ctype = entry['check_type']
         # Get the recover action parameter from the entry params
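Illustrative sketch (not part of the diff): the recovery conditional inverts to a healthy-node predicate, which the following makes concrete with stubbed check results:

    results = [True, False]  # e.g. status polling passed, URL check failed

    # ANY_FAILED: recover if any mode fails, so healthy only if all pass.
    healthy_any_failed = all(results)   # False -> recovery is triggered

    # ALL_FAILED: recover only if every mode fails, so healthy if any passes.
    healthy_all_failed = any(results)   # True -> no recovery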
@@ -459,22 +560,24 @@ class HealthManager(service.Service):
         for operation in rac:
             recover_action['operation'] = operation.get('name')
 
-        if ctype == consts.NODE_STATUS_POLLING:
+        polling_types = [consts.NODE_STATUS_POLLING,
+                         consts.NODE_STATUS_POLL_URL]
+
+        detection_types = ctype.split(',')
+        if all(check in polling_types for check in detection_types):
             interval = min(entry['interval'], cfg.CONF.check_interval_max)
-            timer = self.TG.add_dynamic_timer(self._poll_cluster,
-                                              None,  # initial_delay
-                                              None,  # check_interval_max
-                                              cid, interval, recover_action)
-            entry['timer'] = timer
-        elif ctype == consts.NODE_STATUS_POLL_URL:
-            interval = min(entry['interval'], cfg.CONF.check_interval_max)
-            timer = self.TG.add_dynamic_timer(self._poll_url,
-                                              None,  # initial_delay
-                                              None,  # check_interval_max
-                                              cid, interval,
-                                              recover_action, params)
+            for check in ctype.split(','):
+                self._add_health_check(cid, HealthCheckType.factory(
+                    check, cid, interval, params))
+            timer = self.TG.add_dynamic_timer(self._execute_health_check,
+                                              None, None, interval, cid,
+                                              recover_action,
+                                              params['recovery_conditional'],
+                                              params['node_update_timeout'])
             entry['timer'] = timer
-        elif ctype == consts.LIFECYCLE_EVENTS:
+        elif (len(detection_types) == 1 and
+                detection_types[0] == consts.LIFECYCLE_EVENTS):
             LOG.info("Start listening events for cluster (%s).", cid)
             listener = self._add_listener(cid, recover_action)
             if listener:
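Illustrative sketch (not part of the diff): the dispatch above accepts any comma-separated combination of the two polling types but keeps LIFECYCLE_EVENTS exclusive. Literal strings stand in for the consts:

    polling_types = ['NODE_STATUS_POLLING', 'NODE_STATUS_POLL_URL']

    ctype = 'NODE_STATUS_POLLING,NODE_STATUS_POLL_URL'
    detection_types = ctype.split(',')

    if all(check in polling_types for check in detection_types):
        print('start polling timer')
    elif detection_types == ['LIFECYCLE_EVENTS']:
        print('start event listener')
    else:
        print('invalid check type')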
@@ -483,8 +586,8 @@ class HealthManager(service.Service):
             LOG.warning("Error creating listener for cluster %s", cid)
             return None
         else:
-            LOG.warning("Cluster %(id)s check type %(type)s is invalid.",
-                        {'id': cid, 'type': ctype})
+            LOG.error("Cluster %(id)s check type %(type)s is invalid.",
+                      {'id': cid, 'type': ctype})
             return None
 
         return entry
@@ -495,10 +598,17 @@ class HealthManager(service.Service):
         :param entry: A dict containing the data associated with the cluster.
         :returns: ``None``.
         """
+        LOG.info('Disabling health check for cluster %s.', entry['cluster_id'])
+
         timer = entry.get('timer', None)
         if timer:
+            # stop timer
             timer.stop()
+
+            # tell threadgroup to remove timer
             self.TG.timer_done(timer)
+            if entry['cluster_id'] in self.health_check_types:
+                self.health_check_types.pop(entry['cluster_id'])
             return
 
         listener = entry.get('listener', None)
@@ -558,13 +668,13 @@ class HealthManager(service.Service):
         """Respond to confirm that the rpc service is still alive."""
         return True
 
-    def register_cluster(self, ctx, cluster_id, check_type, interval=None,
-                         params=None, enabled=True):
+    def register_cluster(self, ctx, cluster_id, interval=None,
+                         node_update_timeout=None, params=None,
+                         enabled=True):
         """Register cluster for health checking.
 
         :param ctx: The context of notify request.
         :param cluster_id: The ID of the cluster to be checked.
-        :param check_type: A string indicating the type of checks.
         :param interval: An optional integer indicating the length of checking
                          periods in seconds.
         :param dict params: Other parameters for the health check.
@@ -572,6 +682,17 @@ class HealthManager(service.Service):
         """
         params = params or {}
 
+        # extract check_type from params
+        check_type = ""
+        if 'detection_modes' in params:
+            check_type = ','.join([
+                NodePollUrlHealthCheck._convert_detection_tuple(d).type
+                for d in params['detection_modes']
+            ])
+
+        # add node_update_timeout to params
+        params['node_update_timeout'] = node_update_timeout
+
         registry = objects.HealthRegistry.create(ctx, cluster_id, check_type,
                                                  interval, params,
                                                  self.engine_id,
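Illustrative sketch (not part of the diff): the derived check_type is just the joined mode types; the namedtuple conversion above is equivalent to reading each mode's 'type' key:

    params = {'detection_modes': [{'type': 'NODE_STATUS_POLLING'},
                                  {'type': 'NODE_STATUS_POLL_URL'}]}
    check_type = ','.join(d['type'] for d in params['detection_modes'])
    assert check_type == 'NODE_STATUS_POLLING,NODE_STATUS_POLL_URL'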
@@ -603,6 +724,7 @@ class HealthManager(service.Service):
             self._stop_check(entry)
             self.rt['registries'].pop(i)
         objects.HealthRegistry.delete(ctx, cluster_id)
+        LOG.debug('unregister done')
 
     def enable_cluster(self, ctx, cluster_id, params=None):
         for c in self.rt['registries']:
@@ -651,12 +773,12 @@ def notify(engine_id, method, **kwargs):
 def register(cluster_id, engine_id=None, **kwargs):
     params = kwargs.pop('params', {})
     interval = kwargs.pop('interval', cfg.CONF.periodic_interval)
-    check_type = kwargs.pop('check_type', consts.NODE_STATUS_POLLING)
+    node_update_timeout = kwargs.pop('node_update_timeout', 300)
     enabled = kwargs.pop('enabled', True)
     return notify(engine_id, 'register_cluster',
                   cluster_id=cluster_id,
                   interval=interval,
-                  check_type=check_type,
+                  node_update_timeout=node_update_timeout,
                   params=params,
                   enabled=enabled)
@@ -316,7 +316,7 @@ class Node(object):
         self.index = -1
         return True
 
-    def do_check(self, context):
+    def do_check(self, context, return_check_result=False):
         if not self.physical_id:
             return False
 
@@ -330,6 +330,9 @@ class Node(object):
             self.set_status(context, consts.NS_ERROR, six.text_type(ex))
             return False
 
+        if return_check_result:
+            return res
+
         # Physical object is ACTIVE but for some reason the node status in
         # senlin was WARNING. We only update the status_reason
         if res:
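Illustrative sketch (not part of the diff): the new do_check contract, with a stub in place of a real senlin Node. return_check_result=True surfaces the raw check outcome instead of falling through to the status-update logic:

    class StubNode(object):
        physical_id = 'srv-1'  # hypothetical

        def do_check(self, context, return_check_result=False):
            res = False  # pretend the backing server was not found
            if return_check_result:
                return res
            # the real method would update node status here instead
            return res

    healthy = StubNode().do_check(None, return_check_result=True)
    assert healthy is False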
@@ -10,7 +10,9 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
+from collections import namedtuple
 from oslo_config import cfg
+from oslo_log import log as logging
 
 from senlin.common import constraints
 from senlin.common import consts
@@ -21,16 +23,21 @@ from senlin.common import schema
 from senlin.engine import health_manager
 from senlin.policies import base
 
+LOG = logging.getLogger(__name__)
+
 
 class HealthPolicy(base.Policy):
     """Policy for health management of a cluster."""
 
-    VERSION = '1.0'
+    VERSION = '1.1'
     VERSIONS = {
         '1.0': [
             {'status': consts.EXPERIMENTAL, 'since': '2017.02'},
             {'status': consts.SUPPORTED, 'since': '2018.06'},
-        ]
+        ],
+        '1.1': [
+            {'status': consts.SUPPORTED, 'since': '2018.09'}
+        ],
     }
     PRIORITY = 600
@@ -55,20 +62,21 @@ class HealthPolicy(base.Policy):
     KEYS = (DETECTION, RECOVERY) = ('detection', 'recovery')
 
     _DETECTION_KEYS = (
-        DETECTION_TYPE, DETECTION_OPTIONS,
+        DETECTION_MODES, DETECTION_TYPE, DETECTION_OPTIONS, DETECTION_INTERVAL,
+        NODE_UPDATE_TIMEOUT, RECOVERY_CONDITIONAL
     ) = (
-        'type', 'options'
+        'detection_modes', 'type', 'options', 'interval',
+        'node_update_timeout', 'recovery_conditional'
     )
 
     _DETECTION_OPTIONS = (
-        DETECTION_INTERVAL, POLL_URL, POLL_URL_SSL_VERIFY,
+        POLL_URL, POLL_URL_SSL_VERIFY,
         POLL_URL_CONN_ERROR_AS_UNHEALTHY, POLL_URL_HEALTHY_RESPONSE,
-        POLL_URL_RETRY_LIMIT, POLL_URL_RETRY_INTERVAL, NODE_UPDATE_TIMEOUT,
+        POLL_URL_RETRY_LIMIT, POLL_URL_RETRY_INTERVAL,
     ) = (
-        'interval', 'poll_url', 'poll_url_ssl_verify',
+        'poll_url', 'poll_url_ssl_verify',
         'poll_url_conn_error_as_unhealthy', 'poll_url_healthy_response',
-        'poll_url_retry_limit', 'poll_url_retry_interval',
-        'node_update_timeout',
+        'poll_url_retry_limit', 'poll_url_retry_interval'
     )
 
     _RECOVERY_KEYS = (
@@ -96,68 +104,100 @@ class HealthPolicy(base.Policy):
         DETECTION: schema.Map(
             _('Policy aspect for node failure detection.'),
             schema={
-                DETECTION_TYPE: schema.String(
-                    _('Type of node failure detection.'),
-                    constraints=[
-                        constraints.AllowedValues(consts.DETECTION_TYPES),
-                    ],
-                    required=True,
-                ),
-                DETECTION_OPTIONS: schema.Map(
-                    schema={
-                        DETECTION_INTERVAL: schema.Integer(
-                            _("Number of seconds between pollings. Only "
-                              "required when type is 'NODE_STATUS_POLLING' or "
-                              "'NODE_STATUS_POLL_URL'."),
-                            default=60,
-                        ),
-                        POLL_URL: schema.String(
-                            _("URL to poll for node status. See documentation "
-                              "for valid expansion parameters. Only required "
-                              "when type is 'NODE_STATUS_POLL_URL'."),
-                            default='',
-                        ),
-                        POLL_URL_SSL_VERIFY: schema.Boolean(
-                            _("Whether to verify SSL when calling URL to poll "
-                              "for node status. Only required when type is "
-                              "'NODE_STATUS_POLL_URL'."),
-                            default=True,
-                        ),
-                        POLL_URL_CONN_ERROR_AS_UNHEALTHY: schema.Boolean(
-                            _("Whether to treat URL connection errors as an "
-                              "indication of an unhealthy node. Only required "
-                              "when type is 'NODE_STATUS_POLL_URL'."),
-                            default=True,
-                        ),
-                        POLL_URL_HEALTHY_RESPONSE: schema.String(
-                            _("String pattern in the poll URL response body "
-                              "that indicates a healthy node. "
-                              "Required when type is 'NODE_STATUS_POLL_URL'."),
-                            default='',
-                        ),
-                        POLL_URL_RETRY_LIMIT: schema.Integer(
-                            _("Number of times to retry URL polling when its "
-                              "return body is missing "
-                              "POLL_URL_HEALTHY_RESPONSE string before a node "
-                              "is considered down. Required when type is "
-                              "'NODE_STATUS_POLL_URL'."),
-                            default=3,
-                        ),
-                        POLL_URL_RETRY_INTERVAL: schema.Integer(
-                            _("Number of seconds between URL polling retries "
-                              "before a node is considered down. "
-                              "Required when type is 'NODE_STATUS_POLL_URL'."),
-                            default=3,
-                        ),
-                        NODE_UPDATE_TIMEOUT: schema.Integer(
-                            _("Number of seconds since last node update to "
-                              "wait before checking node health. "
-                              "Required when type is 'NODE_STATUS_POLL_URL'."),
-                            default=300,
-                        ),
-                    },
-                    default={}
-                ),
+                DETECTION_INTERVAL: schema.Integer(
+                    _("Number of seconds between pollings. Only "
+                      "required when type is 'NODE_STATUS_POLLING' or "
+                      "'NODE_STATUS_POLL_URL'."),
+                    default=60,
+                ),
+                NODE_UPDATE_TIMEOUT: schema.Integer(
+                    _("Number of seconds since last node update to "
+                      "wait before checking node health."),
+                    default=300,
+                ),
+                RECOVERY_CONDITIONAL: schema.String(
+                    _("The conditional that determines when recovery should be"
+                      " performed in case multiple detection modes are "
+                      "specified. 'ALL_FAILED' means that all "
+                      "detection modes have to return failed health checks "
+                      "before a node is recovered. 'ANY_FAILED'"
+                      " means that a failed health check with a single "
+                      "detection mode triggers a node recovery."),
+                    constraints=[
+                        constraints.AllowedValues(
+                            consts.RECOVERY_CONDITIONAL),
+                    ],
+                    default=consts.ANY_FAILED,
+                    required=False,
+                ),
+                DETECTION_MODES: schema.List(
+                    _('List of node failure detection modes.'),
+                    schema=schema.Map(
+                        _('Node failure detection mode to try'),
+                        schema={
+                            DETECTION_TYPE: schema.String(
+                                _('Type of node failure detection.'),
+                                constraints=[
+                                    constraints.AllowedValues(
+                                        consts.DETECTION_TYPES),
+                                ],
+                                required=True,
+                            ),
+                            DETECTION_OPTIONS: schema.Map(
+                                schema={
+                                    POLL_URL: schema.String(
+                                        _("URL to poll for node status. See "
+                                          "documentation for valid expansion "
+                                          "parameters. Only required "
+                                          "when type is "
+                                          "'NODE_STATUS_POLL_URL'."),
+                                        default='',
+                                    ),
+                                    POLL_URL_SSL_VERIFY: schema.Boolean(
+                                        _("Whether to verify SSL when calling "
+                                          "URL to poll for node status. Only "
+                                          "required when type is "
+                                          "'NODE_STATUS_POLL_URL'."),
+                                        default=True,
+                                    ),
+                                    POLL_URL_CONN_ERROR_AS_UNHEALTHY:
+                                        schema.Boolean(
+                                            _("Whether to treat URL connection "
+                                              "errors as an indication of an "
+                                              "unhealthy node. Only required "
+                                              "when type is "
+                                              "'NODE_STATUS_POLL_URL'."),
+                                            default=True,
+                                        ),
+                                    POLL_URL_HEALTHY_RESPONSE: schema.String(
+                                        _("String pattern in the poll URL "
+                                          "response body that indicates a "
+                                          "healthy node. Required when type "
+                                          "is 'NODE_STATUS_POLL_URL'."),
+                                        default='',
+                                    ),
+                                    POLL_URL_RETRY_LIMIT: schema.Integer(
+                                        _("Number of times to retry URL "
+                                          "polling when its return body is "
+                                          "missing POLL_URL_HEALTHY_RESPONSE "
+                                          "string before a node is considered "
+                                          "down. Required when type is "
+                                          "'NODE_STATUS_POLL_URL'."),
+                                        default=3,
+                                    ),
+                                    POLL_URL_RETRY_INTERVAL: schema.Integer(
+                                        _("Number of seconds between URL "
+                                          "polling retries before a node is "
+                                          "considered down. Required when "
+                                          "type is 'NODE_STATUS_POLL_URL'."),
+                                        default=3,
+                                    ),
+                                },
+                                default={}
+                            ),
+                        }
+                    )
+                ),
             },
             required=True,
         ),
@@ -209,27 +249,44 @@ class HealthPolicy(base.Policy):
                     "action is RECREATE."),
                     default=False,
                 ),
-            }
+            },
+            required=True,
         ),
     }
 
     def __init__(self, name, spec, **kwargs):
         super(HealthPolicy, self).__init__(name, spec, **kwargs)
 
-        self.check_type = self.properties[self.DETECTION][self.DETECTION_TYPE]
+        self.interval = self.properties[self.DETECTION].get(
+            self.DETECTION_INTERVAL, 60)
 
-        options = self.properties[self.DETECTION][self.DETECTION_OPTIONS]
-        self.interval = options.get(self.DETECTION_INTERVAL, 60)
-        self.poll_url = options.get(self.POLL_URL, '')
-        self.poll_url_ssl_verify = options.get(self.POLL_URL_SSL_VERIFY, True)
-        self.poll_url_conn_error_as_unhealthy = options.get(
-            self.POLL_URL_CONN_ERROR_AS_UNHEALTHY, True)
-        self.poll_url_healthy_response = options.get(
-            self.POLL_URL_HEALTHY_RESPONSE, '')
-        self.poll_url_retry_limit = options.get(self.POLL_URL_RETRY_LIMIT, '')
-        self.poll_url_retry_interval = options.get(
-            self.POLL_URL_RETRY_INTERVAL, '')
-        self.node_update_timeout = options.get(self.NODE_UPDATE_TIMEOUT, 300)
+        self.node_update_timeout = self.properties[self.DETECTION].get(
+            self.NODE_UPDATE_TIMEOUT, 300)
+
+        self.recovery_conditional = self.properties[self.DETECTION].get(
+            self.RECOVERY_CONDITIONAL, consts.ANY_FAILED)
+
+        DetectionMode = namedtuple(
+            'DetectionMode',
+            [self.DETECTION_TYPE] + list(self._DETECTION_OPTIONS))
+
+        self.detection_modes = []
+
+        raw_modes = self.properties[self.DETECTION][self.DETECTION_MODES]
+        for mode in raw_modes:
+            options = mode[self.DETECTION_OPTIONS]
+
+            self.detection_modes.append(
+                DetectionMode(
+                    mode[self.DETECTION_TYPE],
+                    options.get(self.POLL_URL, ''),
+                    options.get(self.POLL_URL_SSL_VERIFY, True),
+                    options.get(self.POLL_URL_CONN_ERROR_AS_UNHEALTHY, True),
+                    options.get(self.POLL_URL_HEALTHY_RESPONSE, ''),
+                    options.get(self.POLL_URL_RETRY_LIMIT, ''),
+                    options.get(self.POLL_URL_RETRY_INTERVAL, '')
+                )
+            )
 
         recover_settings = self.properties[self.RECOVERY]
         self.recover_actions = recover_settings[self.RECOVERY_ACTIONS]
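Illustrative sketch (not part of the diff): the DetectionMode tuple built above can be reproduced standalone; the field order follows _DETECTION_OPTIONS:

    from collections import namedtuple

    DetectionMode = namedtuple(
        'DetectionMode',
        ['type', 'poll_url', 'poll_url_ssl_verify',
         'poll_url_conn_error_as_unhealthy', 'poll_url_healthy_response',
         'poll_url_retry_limit', 'poll_url_retry_interval'])

    mode = DetectionMode('NODE_STATUS_POLLING', '', True, True, '', '', '')
    assert mode.type == 'NODE_STATUS_POLLING'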
@@ -257,6 +314,30 @@ class HealthPolicy(base.Policy):
                        cfg.CONF.health_check_interval_min}
             raise exc.InvalidSpec(message=message)
 
+        # check valid detection types
+        polling_types = [consts.NODE_STATUS_POLLING,
+                         consts.NODE_STATUS_POLL_URL]
+
+        has_valid_polling_types = all(
+            d.type in polling_types
+            for d in self.detection_modes
+        )
+        has_valid_lifecycle_type = (
+            len(self.detection_modes) == 1 and
+            self.detection_modes[0].type == consts.LIFECYCLE_EVENTS
+        )
+
+        if not has_valid_polling_types and not has_valid_lifecycle_type:
+            message = ("Invalid detection modes in health policy: %s" %
+                       ', '.join([d.type for d in self.detection_modes]))
+            raise exc.InvalidSpec(message=message)
+
+        if len(self.detection_modes) != len(set(self.detection_modes)):
+            message = ("Duplicate detection modes are not allowed in "
+                       "health policy: %s" %
+                       ', '.join([d.type for d in self.detection_modes]))
+            raise exc.InvalidSpec(message=message)
+
         # TODO(Qiming): Add detection of duplicated action names when
         # support to list of actions is implemented.
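Illustrative sketch (not part of the diff): the duplicate check works because namedtuples hash by value, so identical modes collapse in a set:

    modes = [('NODE_STATUS_POLLING',), ('NODE_STATUS_POLLING',)]
    if len(modes) != len(set(modes)):
        print('Duplicate detection modes are not allowed')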
@@ -283,40 +364,33 @@ class HealthPolicy(base.Policy):
             return False, err_msg
 
         kwargs = {
-            'check_type': self.check_type,
             'interval': self.interval,
+            'node_update_timeout': self.node_update_timeout,
             'params': {
                 'recover_action': self.recover_actions,
-                'poll_url': self.poll_url,
-                'poll_url_ssl_verify': self.poll_url_ssl_verify,
-                'poll_url_conn_error_as_unhealthy':
-                    self.poll_url_conn_error_as_unhealthy,
-                'poll_url_healthy_response': self.poll_url_healthy_response,
-                'poll_url_retry_limit': self.poll_url_retry_limit,
-                'poll_url_retry_interval': self.poll_url_retry_interval,
-                'node_update_timeout': self.node_update_timeout,
                 'node_delete_timeout': self.node_delete_timeout,
                 'node_force_recreate': self.node_force_recreate,
+                'recovery_conditional': self.recovery_conditional,
             },
             'enabled': enabled
         }
 
+        converted_detection_modes = [
+            d._asdict() for d in self.detection_modes
+        ]
+        detection_mode = {'detection_modes': converted_detection_modes}
+        kwargs['params'].update(detection_mode)
+
         health_manager.register(cluster.id, engine_id=None, **kwargs)
 
         data = {
-            'check_type': self.check_type,
             'interval': self.interval,
-            'poll_url': self.poll_url,
-            'poll_url_ssl_verify': self.poll_url_ssl_verify,
-            'poll_url_conn_error_as_unhealthy':
-                self.poll_url_conn_error_as_unhealthy,
-            'poll_url_healthy_response': self.poll_url_healthy_response,
-            'poll_url_retry_limit': self.poll_url_retry_limit,
-            'poll_url_retry_interval': self.poll_url_retry_interval,
             'node_update_timeout': self.node_update_timeout,
+            'recovery_conditional': self.recovery_conditional,
             'node_delete_timeout': self.node_delete_timeout,
             'node_force_recreate': self.node_force_recreate,
         }
+        data.update(detection_mode)
 
         return True, self._build_policy_data(data)
@@ -327,7 +401,10 @@ class HealthPolicy(base.Policy):
         :param cluster: The target cluster.
         :returns: A tuple comprising the execution result and reason.
         """
-        health_manager.unregister(cluster.id)
+        ret = health_manager.unregister(cluster.id)
+        if not ret:
+            LOG.warning('Unregistering health manager for cluster %s '
+                        'timed out.', cluster.id)
         return True, ''
 
     def pre_op(self, cluster_id, action, **args):
@@ -11,8 +11,10 @@
 # under the License.
 
 import copy
+import eventlet
 import inspect
 
+from oslo_config import cfg
 from oslo_context import context as oslo_context
 from oslo_log import log as logging
 from oslo_utils import timeutils
@@ -302,7 +304,7 @@ class Profile(object):
         try:
             return profile.do_check(obj)
         except exc.InternalError as ex:
-            LOG.error(ex)
+            LOG.debug(ex)
             return False
 
     @classmethod
@@ -518,6 +520,13 @@ class Profile(object):
                 raise exc.EResourceOperation(op='recovering', type='node',
                                              id=obj.id,
                                              message=six.text_type(ex))
 
+        # pause to allow deleted resource to get reclaimed by nova
+        # this is needed to avoid a problem when the compute resources are
+        # at their quota limit. The deleted resource has to become available
+        # so that the new node can be created.
+        eventlet.sleep(cfg.CONF.batch_interval)
+
         res = None
         try:
             res = self.do_create(obj)
(File diff suppressed because it is too large.)
@@ -10,6 +10,7 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
+from collections import namedtuple
 import copy
 
 import mock
@@ -35,13 +36,15 @@ class TestHealthPolicy(base.SenlinTestCase):
 
         self.spec = {
             'type': 'senlin.policy.health',
-            'version': '1.0',
+            'version': '1.1',
             'properties': {
                 'detection': {
-                    'type': 'NODE_STATUS_POLLING',
-                    'options': {
-                        'interval': 60
-                    }
+                    "detection_modes": [
+                        {
+                            'type': 'NODE_STATUS_POLLING'
+                        },
+                    ],
+                    'interval': 60
                 },
                 'recovery': {
                     'fencing': ['COMPUTE'],
@@ -62,13 +65,94 @@ class TestHealthPolicy(base.SenlinTestCase):
         self.hp = health_policy.HealthPolicy('test-policy', self.spec)
 
     def test_policy_init(self):
-        self.assertIsNone(self.hp.id)
-        self.assertEqual('test-policy', self.hp.name)
-        self.assertEqual('senlin.policy.health-1.0', self.hp.type)
-        self.assertEqual('NODE_STATUS_POLLING', self.hp.check_type)
-        self.assertEqual(60, self.hp.interval)
+        DetectionMode = namedtuple(
+            'DetectionMode',
+            [self.hp.DETECTION_TYPE] + list(self.hp._DETECTION_OPTIONS))
+
+        detection_modes = [
+            DetectionMode(
+                type='NODE_STATUS_POLLING',
+                poll_url='',
+                poll_url_ssl_verify=True,
+                poll_url_conn_error_as_unhealthy=True,
+                poll_url_healthy_response='',
+                poll_url_retry_limit='',
+                poll_url_retry_interval=''
+            )
+        ]
+
+        spec = {
+            'type': 'senlin.policy.health',
+            'version': '1.1',
+            'properties': {
+                'detection': {
+                    "detection_modes": [
+                        {
+                            'type': 'NODE_STATUS_POLLING'
+                        },
+                    ],
+                    'interval': 60
+                },
+                'recovery': {
+                    'fencing': ['COMPUTE'],
+                    'actions': [
+                        {'name': 'REBUILD'}
+                    ]
+                }
+            }
+        }
+
+        hp = health_policy.HealthPolicy('test-policy', spec)
+
+        self.assertIsNone(hp.id)
+        self.assertEqual('test-policy', hp.name)
+        self.assertEqual('senlin.policy.health-1.1', hp.type)
+        self.assertEqual(detection_modes, hp.detection_modes)
+        self.assertEqual(60, hp.interval)
         self.assertEqual([{'name': 'REBUILD', 'params': None}],
-                         self.hp.recover_actions)
+                         hp.recover_actions)
+
+    def test_policy_init_ops(self):
+        spec = {
+            'type': 'senlin.policy.health',
+            'version': '1.1',
+            'properties': {
+                'detection': {
+                    "detection_modes": [
+                        {
+                            'type': 'NODE_STATUS_POLLING'
+                        },
+                        {
+                            'type': 'NODE_STATUS_POLL_URL'
+                        },
+                    ],
+                    'interval': 60
+                },
+                'recovery': {
+                    'fencing': ['COMPUTE'],
+                    'actions': [
+                        {'name': 'REBUILD'}
+                    ]
+                }
+            }
+        }
+
+        operations = [None, 'ALL_FAILED', 'ANY_FAILED']
+        for op in operations:
+            # set operation in spec
+            if op:
+                spec['properties']['detection']['recovery_conditional'] = op
+
+            # test __init__
+            hp = health_policy.HealthPolicy('test-policy', spec)
+
+            # check result
+            self.assertIsNone(hp.id)
+            self.assertEqual('test-policy', hp.name)
+            self.assertEqual('senlin.policy.health-1.1', hp.type)
+            self.assertEqual(60, hp.interval)
+            self.assertEqual([{'name': 'REBUILD', 'params': None}],
+                             hp.recover_actions)
 
     def test_validate(self):
         spec = copy.deepcopy(self.spec)
@@ -86,7 +170,7 @@ class TestHealthPolicy(base.SenlinTestCase):
 
     def test_validate_valid_interval(self):
        spec = copy.deepcopy(self.spec)
-        spec["properties"]["detection"]["options"]["interval"] = 20
+        spec["properties"]["detection"]["interval"] = 20
         self.hp = health_policy.HealthPolicy('test-policy', spec)
 
         cfg.CONF.set_override('health_check_interval_min', 20)
@@ -95,7 +179,7 @@ class TestHealthPolicy(base.SenlinTestCase):
 
     def test_validate_invalid_interval(self):
         spec = copy.deepcopy(self.spec)
-        spec["properties"]["detection"]["options"]["interval"] = 10
+        spec["properties"]["detection"]["interval"] = 10
         self.hp = health_policy.HealthPolicy('test-policy', spec)
 
         cfg.CONF.set_override('health_check_interval_min', 20)
@@ -116,19 +200,24 @@ class TestHealthPolicy(base.SenlinTestCase):
         policy_data = {
             'HealthPolicy': {
                 'data': {
-                    'check_type': self.hp.check_type,
                     'interval': self.hp.interval,
-                    'poll_url': '',
-                    'poll_url_ssl_verify': True,
-                    'poll_url_conn_error_as_unhealthy': True,
-                    'poll_url_healthy_response': '',
-                    'poll_url_retry_limit': 3,
-                    'poll_url_retry_interval': 3,
+                    'detection_modes': [
+                        {
+                            'type': 'NODE_STATUS_POLLING',
+                            'poll_url': '',
+                            'poll_url_ssl_verify': True,
+                            'poll_url_conn_error_as_unhealthy': True,
+                            'poll_url_healthy_response': '',
+                            'poll_url_retry_limit': '',
+                            'poll_url_retry_interval': ''
+                        }
+                    ],
                     'node_update_timeout': 300,
                     'node_delete_timeout': 20,
-                    'node_force_recreate': False
+                    'node_force_recreate': False,
+                    'recovery_conditional': 'ANY_FAILED'
                 },
-                'version': '1.0'
+                'version': '1.1'
             }
         }
@@ -136,19 +225,24 @@ class TestHealthPolicy(base.SenlinTestCase):
         self.assertTrue(res)
         self.assertEqual(policy_data, data)
         kwargs = {
-            'check_type': self.hp.check_type,
             'interval': self.hp.interval,
+            'node_update_timeout': 300,
             'params': {
                 'recover_action': self.hp.recover_actions,
-                'poll_url': '',
-                'poll_url_ssl_verify': True,
-                'poll_url_conn_error_as_unhealthy': True,
-                'poll_url_healthy_response': '',
-                'poll_url_retry_limit': 3,
-                'poll_url_retry_interval': 3,
-                'node_update_timeout': 300,
                 'node_delete_timeout': 20,
-                'node_force_recreate': False
+                'node_force_recreate': False,
+                'recovery_conditional': 'ANY_FAILED',
+                'detection_modes': [
+                    {
+                        'type': 'NODE_STATUS_POLLING',
+                        'poll_url': '',
+                        'poll_url_ssl_verify': True,
+                        'poll_url_conn_error_as_unhealthy': True,
+                        'poll_url_healthy_response': '',
+                        'poll_url_retry_limit': '',
+                        'poll_url_retry_interval': ''
+                    }
+                ],
             },
             'enabled': True
         }
|
@ -57,6 +57,7 @@ senlin.policies =
|
|||
senlin.policy.deletion-1.1 = senlin.policies.deletion_policy:DeletionPolicy
|
||||
senlin.policy.scaling-1.0 = senlin.policies.scaling_policy:ScalingPolicy
|
||||
senlin.policy.health-1.0 = senlin.policies.health_policy:HealthPolicy
|
||||
senlin.policy.health-1.1 = senlin.policies.health_policy:HealthPolicy
|
||||
senlin.policy.loadbalance-1.0 = senlin.policies.lb_policy:LoadBalancingPolicy
|
||||
senlin.policy.loadbalance-1.1 = senlin.policies.lb_policy:LoadBalancingPolicy
|
||||
senlin.policy.region_placement-1.0 = senlin.policies.region_placement:RegionPlacementPolicy
|
||||
|
|