From 3e6a688db021647d49b06704852521eee4dda5e3 Mon Sep 17 00:00:00 2001 From: Duc Truong Date: Tue, 7 Aug 2018 23:35:29 +0000 Subject: [PATCH] Support multiple detection types in health policy This patchset changes the existing health policy to allow NODE_STATUS_POLLING to be combined with NODE_STATUS_POLL_URL to provide multiple detection types. The existing implementation of NODE_STATUS_POLLING was changed to directly perform a do_check on the node instead of doing a cluster check operation. This was done to avoid locking the cluster as part of the health check. When the cluster check was performed during a health check, there was a potential lock contention problem if the user was already performing other operations on the same cluster. Implements: blueprint multiple-detection-modes Change-Id: Iba0b043cc582e668a9066b0586ca8a0b201040e4 --- senlin/common/consts.py | 6 + senlin/common/utils.py | 7 +- senlin/engine/health_manager.py | 450 +++-- senlin/engine/node.py | 5 +- senlin/policies/health_policy.py | 277 ++- senlin/profiles/base.py | 11 +- .../tests/unit/engine/test_health_manager.py | 1651 +++++++++++------ .../tests/unit/policies/test_health_policy.py | 156 +- setup.cfg | 1 + 9 files changed, 1699 insertions(+), 865 deletions(-) diff --git a/senlin/common/consts.py b/senlin/common/consts.py index dd75f7ae9..783782131 100755 --- a/senlin/common/consts.py +++ b/senlin/common/consts.py @@ -288,6 +288,12 @@ RECOVERY_ACTIONS = ( 'REBOOT', 'REBUILD', 'RECREATE', ) +RECOVERY_CONDITIONAL = ( + ALL_FAILED, ANY_FAILED, +) = ( + 'ALL_FAILED', 'ANY_FAILED', +) + NOTIFICATION_PRIORITIES = ( PRIO_AUDIT, PRIO_CRITICAL, PRIO_ERROR, PRIO_WARN, PRIO_INFO, PRIO_DEBUG, PRIO_SAMPLE, diff --git a/senlin/common/utils.py b/senlin/common/utils.py index e17889e1c..56d5add16 100644 --- a/senlin/common/utils.py +++ b/senlin/common/utils.py @@ -88,7 +88,7 @@ def level_from_number(value): return levels.get(n, None) -def url_fetch(url, allowed_schemes=('http', 'https'), verify=True): +def url_fetch(url, timeout=1, allowed_schemes=('http', 'https'), verify=True): """Get the data at the specified URL. The URL must use the http: or https: schemes. @@ -96,7 +96,6 @@ def url_fetch(url, allowed_schemes=('http', 'https'), verify=True): the allowed_schemes argument. Raise an IOError if getting the data fails. """ - LOG.info('Fetching data from %s', url) components = urllib.parse.urlparse(url) @@ -105,12 +104,12 @@ def url_fetch(url, allowed_schemes=('http', 'https'), verify=True): if components.scheme == 'file': try: - return urllib.request.urlopen(url).read() + return urllib.request.urlopen(url, timeout=timeout).read() except urllib.error.URLError as uex: raise URLFetchError(_('Failed to retrieve data: %s') % uex) try: - resp = requests.get(url, stream=True, verify=verify) + resp = requests.get(url, stream=True, verify=verify, timeout=timeout) resp.raise_for_status() # We cannot use resp.text here because it would download the entire diff --git a/senlin/engine/health_manager.py b/senlin/engine/health_manager.py index 68e45352f..5d53a9ef9 100644 --- a/senlin/engine/health_manager.py +++ b/senlin/engine/health_manager.py @@ -17,6 +17,8 @@ trigger corresponding actions to recover the clusters based on the pre-defined health policies. 
""" +from collections import defaultdict +from collections import namedtuple from oslo_config import cfg from oslo_log import log as logging import oslo_messaging as messaging @@ -24,13 +26,13 @@ from oslo_service import service from oslo_service import threadgroup from oslo_utils import timeutils import re -import six import time from senlin.common import consts from senlin.common import context from senlin.common import messaging as rpc from senlin.common import utils +from senlin.engine import node as node_mod from senlin import objects from senlin.rpc import client as rpc_client @@ -195,94 +197,91 @@ def ListenerProc(exchange, project_id, cluster_id, recover_action): listener.start() -class HealthManager(service.Service): +class HealthCheckType(object): + @staticmethod + def factory(detection_type, cid, interval, params): + node_update_timeout = params['node_update_timeout'] + detection_params = [ + p for p in params['detection_modes'] + if p['type'] == detection_type + ] + if len(detection_params) != 1: + raise Exception( + 'The same detection mode cannot be used more than once in the ' + 'same policy. Encountered {} instances of ' + 'type {}.'.format(len(detection_params), detection_type) + ) - def __init__(self, engine_service, topic, version): - super(HealthManager, self).__init__() - - self.TG = threadgroup.ThreadGroup() - self.engine_id = engine_service.engine_id - self.topic = topic - self.version = version - self.ctx = context.get_admin_context() - self.rpc_client = rpc_client.EngineClient() - self.rt = { - 'registries': [], - } - - def _dummy_task(self): - """A Dummy task that is queued on the health manager thread group. - - The task is here so that the service always has something to wait() - on, or else the process will exit. - """ - self._load_runtime_registry() - - def _wait_for_action(self, ctx, action_id, timeout): - done = False - req = objects.ActionGetRequest(identity=action_id) - with timeutils.StopWatch(timeout) as timeout_watch: - while timeout > 0: - action = self.rpc_client.call(ctx, 'action_get', req) - if action['status'] in [consts.ACTION_SUCCEEDED, - consts.ACTION_FAILED, - consts.ACTION_CANCELLED]: - if action['status'] == consts.ACTION_SUCCEEDED: - done = True - break - time.sleep(2) - timeout = timeout_watch.leftover(True) - - if done: - return True, "" - elif timeout <= 0: - return False, "Timeout while polling cluster status" + if detection_type == consts.NODE_STATUS_POLLING: + return NodePollStatusHealthCheck( + cid, interval, node_update_timeout, detection_params[0]) + elif detection_type == consts.NODE_STATUS_POLL_URL: + return NodePollUrlHealthCheck( + cid, interval, node_update_timeout, detection_params[0]) else: - return False, "Cluster check action failed or cancelled" + raise Exception( + 'Invalid detection type: {}'.format(detection_type)) - def _poll_cluster(self, cluster_id, timeout, recover_action): - """Routine to be executed for polling cluster status. + def __init__(self, cluster_id, interval, node_update_timeout, params): + """Initialize HealthCheckType + :param ctx: :param cluster_id: The UUID of the cluster to be checked. - :param timeout: The maximum number of seconds to wait. - :param recover_action: The health policy action name. - :returns: Nothing. + :param params: Parameters specific to poll url or recovery action. 
+ """ + self.cluster_id = cluster_id + self.interval = interval + self.node_update_timeout = node_update_timeout + self.params = params + + def run_health_check(self, ctx, node): + """Run health check on node + + :returns: True if node is healthy. False otherwise. + """ + pass + + +class NodePollStatusHealthCheck(HealthCheckType): + def run_health_check(self, ctx, node): + """Routine to be executed for polling node status. + + :returns: True if node is healthy. False otherwise. """ - start_time = timeutils.utcnow(True) - cluster = objects.Cluster.get(self.ctx, cluster_id, project_safe=False) - if not cluster: - LOG.warning("Cluster (%s) is not found.", cluster_id) - return _chase_up(start_time, timeout) - ctx = context.get_service_context(user_id=cluster.user, - project_id=cluster.project) - params = {'delete_check_action': True} try: - req = objects.ClusterCheckRequest(identity=cluster_id, - params=params) - action = self.rpc_client.call(ctx, 'cluster_check', req) + # create engine node from db node + entity = node_mod.Node._from_object(ctx, node) + + if not entity.do_check(ctx, return_check_result=True): + # server was not found as a result of performing check + node_last_updated = node.updated_at or node.init_at + if not timeutils.is_older_than( + node_last_updated, self.node_update_timeout): + LOG.info("Node %s was updated at %s which is less " + "than %d secs ago. Skip node recovery from " + "NodePollStatusHealthCheck.", + node.id, node_last_updated, + self.node_update_timeout) + return True + else: + return False + else: + LOG.debug("NodePollStatusHealthCheck reports node %s is " + "healthy.", node.id) + return True except Exception as ex: - LOG.warning("Failed in triggering 'cluster_check' RPC for " - "'%(c)s': %(r)s", - {'c': cluster_id, 'r': six.text_type(ex)}) - return _chase_up(start_time, timeout) + LOG.warning( + 'Error when performing health check on node %s: %s', + node.id, ex + ) + return False - # wait for action to complete - res, reason = self._wait_for_action(ctx, action['action'], timeout) - if not res: - LOG.warning("%s", reason) - return _chase_up(start_time, timeout) - # loop through nodes to trigger recovery - nodes = objects.Node.get_all_by_cluster(ctx, cluster_id) - for node in nodes: - if node.status != consts.NS_ACTIVE: - LOG.info("Requesting node recovery: %s", node.id) - req = objects.NodeRecoverRequest(identity=node.id, - params=recover_action) - self.rpc_client.call(ctx, 'node_recover', req) - - return _chase_up(start_time, timeout) +class NodePollUrlHealthCheck(HealthCheckType): + @staticmethod + def _convert_detection_tuple(dictionary): + return namedtuple('DetectionMode', dictionary.keys())(**dictionary) def _expand_url_template(self, url_template, node): """Expands parameters in an URL template @@ -300,31 +299,29 @@ class HealthManager(service.Service): return url - def _check_url_and_recover_node(self, ctx, node, recover_action, params): + def run_health_check(self, ctx, node): """Routine to check a node status from a url and recovery if necessary - :param ctx: The request context to use for recovery action :param node: The node to be checked. - :param recover_action: The health policy action name. - :param params: Parameters specific to poll url or recovery action - :returns: action if node was triggered for recovery. Otherwise None. + :returns: True if node is considered to be healthy. False otherwise. 
""" - url_template = params['poll_url'] - verify_ssl = params['poll_url_ssl_verify'] - conn_error_as_unhealthy = params['poll_url_conn_error_as_unhealthy'] - expected_resp_str = params['poll_url_healthy_response'] - max_unhealthy_retry = params['poll_url_retry_limit'] - retry_interval = params['poll_url_retry_interval'] - node_update_timeout = params['node_update_timeout'] + url_template = self.params['poll_url'] + verify_ssl = self.params['poll_url_ssl_verify'] + conn_error_as_unhealthy = self.params[ + 'poll_url_conn_error_as_unhealthy'] + expected_resp_str = self.params['poll_url_healthy_response'] + max_unhealthy_retry = self.params['poll_url_retry_limit'] + retry_interval = self.params['poll_url_retry_interval'] def stop_node_recovery(): node_last_updated = node.updated_at or node.init_at if not timeutils.is_older_than( - node_last_updated, node_update_timeout): + node_last_updated, self.node_update_timeout): LOG.info("Node %s was updated at %s which is less than " - "%d secs ago. Skip node recovery.", - node.id, node_last_updated, node_update_timeout) + "%d secs ago. Skip node recovery from " + "NodePollUrlHealthCheck.", + node.id, node_last_updated, self.node_update_timeout) return True LOG.info("Node %s is reported as down (%d retries left)", @@ -334,84 +331,71 @@ class HealthManager(service.Service): return False url = self._expand_url_template(url_template, node) - LOG.info("Polling node status from URL: %s", url) + LOG.debug("Polling node status from URL: %s", url) available_attemps = max_unhealthy_retry + timeout = max(retry_interval * 0.1, 1) while available_attemps > 0: available_attemps -= 1 try: - result = utils.url_fetch(url, verify=verify_ssl) + result = utils.url_fetch( + url, timeout=timeout, verify=verify_ssl) except utils.URLFetchError as ex: if conn_error_as_unhealthy: if stop_node_recovery(): - return None + return True continue else: LOG.error("Error when requesting node health status from" " %s: %s", url, ex) - return None + return True LOG.debug("Node status returned from URL(%s): %s", url, result) if re.search(expected_resp_str, result): - LOG.debug('Node %s is healthy', node.id) - return None + LOG.debug('NodePollUrlHealthCheck reports node %s is healthy.', + node.id) + return True if node.status != consts.NS_ACTIVE: LOG.info("Skip node recovery because node %s is not in " - "ACTIVE state", node.id) - return None + "ACTIVE state.", node.id) + return True if stop_node_recovery(): - return None + return True - # recover node after exhausting retries - LOG.info("Requesting node recovery: %s", node.id) - req = objects.NodeRecoverRequest(identity=node.id, - params=recover_action) + return False - return self.rpc_client.call(ctx, 'node_recover', req) - def _poll_url(self, cluster_id, timeout, recover_action, params): - """Routine to be executed for polling node status from a url +class HealthManager(service.Service): - :param cluster_id: The UUID of the cluster to be checked. - :param timeout: The maximum number of seconds to wait for recovery - action - :param recover_action: The health policy action name. - :param params: Parameters specific to poll url or recovery action - :returns: Nothing. 
+    def __init__(self, engine_service, topic, version):
+        super(HealthManager, self).__init__()
+
+        self.TG = threadgroup.ThreadGroup()
+        self.engine_id = engine_service.engine_id
+        self.topic = topic
+        self.version = version
+        self.ctx = context.get_admin_context()
+        self.rpc_client = rpc_client.EngineClient()
+        self.rt = {
+            'registries': [],
+        }
+        self.health_check_types = defaultdict(lambda: [])
+
+    def _dummy_task(self):
+        """A dummy task that is queued on the health manager thread group.
+
+        The task is here so that the service always has something to wait()
+        on, or else the process will exit.
         """
-        start_time = timeutils.utcnow(True)
-        cluster = objects.Cluster.get(self.ctx, cluster_id, project_safe=False)
-        if not cluster:
-            LOG.warning("Cluster (%s) is not found.", cluster_id)
-            return _chase_up(start_time, timeout)
-
-        ctx = context.get_service_context(user_id=cluster.user,
-                                          project_id=cluster.project)
-
-        actions = []
-
-        # loop through nodes to poll url for each node
-        nodes = objects.Node.get_all_by_cluster(ctx, cluster_id)
-        for node in nodes:
-            action = self._check_url_and_recover_node(ctx, node,
-                                                      recover_action, params)
-            if action:
-                actions.append(action)
-
-        for a in actions:
-            # wait for action to complete
-            res, reason = self._wait_for_action(ctx, a['action'], timeout)
-            if not res:
-                LOG.warning("Node recovery action %s did not complete "
                            "within specified timeout: %s", a['action'],
-                            reason)
-
-        return _chase_up(start_time, timeout)
+        try:
+            self._load_runtime_registry()
+        except Exception as ex:
+            LOG.error("Failed when running '_load_runtime_registry': %s", ex)
 
     def _add_listener(self, cluster_id, recover_action):
         """Routine to be executed for adding cluster listener.
@@ -438,12 +422,129 @@
         return self.TG.add_thread(ListenerProc, exchange, project,
                                   cluster_id, recover_action)
 
+    def _recover_node(self, node_id, ctx, recover_action):
+        """Recover node.
+
+        :returns: The recover action, or None if the request failed.
+        """
+        try:
+            LOG.info("%s is requesting node recovery "
+                     "for %s.", self.__class__.__name__, node_id)
+            req = objects.NodeRecoverRequest(identity=node_id,
+                                             params=recover_action)
+
+            return self.rpc_client.call(ctx, 'node_recover', req)
+        except Exception as ex:
+            LOG.error('Error when performing node recovery for %s: %s',
+                      node_id, ex)
+            return None
+
+    def _wait_for_action(self, ctx, action_id, timeout):
+        req = objects.ActionGetRequest(identity=action_id)
+        with timeutils.StopWatch(timeout) as timeout_watch:
+            while not timeout_watch.expired():
+                action = self.rpc_client.call(ctx, 'action_get', req)
+                if action['status'] in [
+                        consts.ACTION_SUCCEEDED, consts.ACTION_FAILED,
+                        consts.ACTION_CANCELLED]:
+                    break
+                time.sleep(2)
+
+        if action['status'] == consts.ACTION_SUCCEEDED:
+            return True, ""
+
+        if (action['status'] == consts.ACTION_FAILED or
+                action['status'] == consts.ACTION_CANCELLED):
+            return False, "Cluster check action failed or cancelled"
+
+        return False, ("Timeout while waiting for node recovery action to "
+                       "finish")
+
+    def _add_health_check(self, cluster_id, health_check):
+        self.health_check_types[cluster_id].append(health_check)
+
+    def _execute_health_check(self, interval, cluster_id,
+                              recover_action, recovery_cond,
+                              node_update_timeout):
+        start_time = timeutils.utcnow(True)
+
+        try:
+            if cluster_id not in self.health_check_types:
+                LOG.error("Cluster (%s) is not found in health_check_types.",
+                          cluster_id)
+                return _chase_up(start_time, interval)
+
+            if len(self.health_check_types[cluster_id]) == 0:
+                LOG.error("No health check types found for Cluster (%s).",
+                          cluster_id)
+                return _chase_up(start_time, interval)
+
+            cluster = objects.Cluster.get(self.ctx, cluster_id,
+                                          project_safe=False)
+            if not cluster:
+                LOG.warning("Cluster (%s) is not found.", cluster_id)
+                return _chase_up(start_time, interval)
+
+            ctx = context.get_service_context(user_id=cluster.user,
+                                              project_id=cluster.project)
+
+            actions = []
+
+            # loop through nodes and run all health checks on each node
+            nodes = objects.Node.get_all_by_cluster(ctx, cluster_id)
+
+            for node in nodes:
+                node_is_healthy = True
+
+                if recovery_cond == consts.ANY_FAILED:
+                    # recovery happens if any detection mode fails
+                    # i.e. the inverse logic is that node is considered
+                    # healthy if all detection modes pass
+                    node_is_healthy = all(
+                        hc.run_health_check(ctx, node)
+                        for hc in self.health_check_types[cluster_id])
+                elif recovery_cond == consts.ALL_FAILED:
+                    # recovery happens if all detection modes fail
+                    # i.e. the inverse logic is that node is considered
+                    # healthy if any detection mode passes
+                    node_is_healthy = any(
+                        hc.run_health_check(ctx, node)
+                        for hc in self.health_check_types[cluster_id])
+                else:
+                    raise Exception(
+                        '{} is an invalid recovery conditional'.format(
+                            recovery_cond))
+
+                if not node_is_healthy:
+                    action = self._recover_node(node.id, ctx,
+                                                recover_action)
+                    actions.append(action)
+
+            for a in actions:
+                # wait for action to complete
+                res, reason = self._wait_for_action(
+                    ctx, a['action'], node_update_timeout)
+                if not res:
+                    LOG.warning("Node recovery action %s did not complete "
+                                "within specified timeout: %s", a['action'],
+                                reason)
+
+            if len(actions) == 0:
+                LOG.info('Health check passed for all nodes in cluster %s.',
+                         cluster_id)
+        except Exception as ex:
+            LOG.warning('Error while performing health check: %s', ex)
+
+        return _chase_up(start_time, interval)
+
     def _start_check(self, entry):
         """Routine for starting the checking for a cluster.
 
         :param entry: A dict containing the data associated with the cluster.
         :returns: An updated registry entry record.
""" + LOG.info('Enabling health check for cluster %s.', entry['cluster_id']) + cid = entry['cluster_id'] ctype = entry['check_type'] # Get the recover action parameter from the entry params @@ -459,22 +560,24 @@ class HealthManager(service.Service): for operation in rac: recover_action['operation'] = operation.get('name') - if ctype == consts.NODE_STATUS_POLLING: + polling_types = [consts.NODE_STATUS_POLLING, + consts.NODE_STATUS_POLL_URL] + + detection_types = ctype.split(',') + if all(check in polling_types for check in detection_types): interval = min(entry['interval'], cfg.CONF.check_interval_max) - timer = self.TG.add_dynamic_timer(self._poll_cluster, - None, # initial_delay - None, # check_interval_max - cid, interval, recover_action) + for check in ctype.split(','): + self._add_health_check(cid, HealthCheckType.factory( + check, cid, interval, params)) + timer = self.TG.add_dynamic_timer(self._execute_health_check, + None, None, interval, cid, + recover_action, + params['recovery_conditional'], + params['node_update_timeout']) + entry['timer'] = timer - elif ctype == consts.NODE_STATUS_POLL_URL: - interval = min(entry['interval'], cfg.CONF.check_interval_max) - timer = self.TG.add_dynamic_timer(self._poll_url, - None, # initial_delay - None, # check_interval_max - cid, interval, - recover_action, params) - entry['timer'] = timer - elif ctype == consts.LIFECYCLE_EVENTS: + elif (len(detection_types) == 1 and + detection_types[0] == consts.LIFECYCLE_EVENTS): LOG.info("Start listening events for cluster (%s).", cid) listener = self._add_listener(cid, recover_action) if listener: @@ -483,8 +586,8 @@ class HealthManager(service.Service): LOG.warning("Error creating listener for cluster %s", cid) return None else: - LOG.warning("Cluster %(id)s check type %(type)s is invalid.", - {'id': cid, 'type': ctype}) + LOG.error("Cluster %(id)s check type %(type)s is invalid.", + {'id': cid, 'type': ctype}) return None return entry @@ -495,10 +598,17 @@ class HealthManager(service.Service): :param entry: A dict containing the data associated with the cluster. :returns: ``None``. """ + LOG.info('Disabling health check for cluster %s.', entry['cluster_id']) + timer = entry.get('timer', None) if timer: + # stop timer timer.stop() + + # tell threadgroup to remove timer self.TG.timer_done(timer) + if entry['cluster_id'] in self.health_check_types: + self.health_check_types.pop(entry['cluster_id']) return listener = entry.get('listener', None) @@ -558,13 +668,13 @@ class HealthManager(service.Service): """Respond to confirm that the rpc service is still alive.""" return True - def register_cluster(self, ctx, cluster_id, check_type, interval=None, - params=None, enabled=True): + def register_cluster(self, ctx, cluster_id, interval=None, + node_update_timeout=None, params=None, + enabled=True): """Register cluster for health checking. :param ctx: The context of notify request. :param cluster_id: The ID of the cluster to be checked. - :param check_type: A string indicating the type of checks. :param interval: An optional integer indicating the length of checking periods in seconds. :param dict params: Other parameters for the health check. 
@@ -572,6 +682,17 @@ class HealthManager(service.Service): """ params = params or {} + # extract check_type from params + check_type = "" + if 'detection_modes' in params: + check_type = ','.join([ + NodePollUrlHealthCheck._convert_detection_tuple(d).type + for d in params['detection_modes'] + ]) + + # add node_update_timeout to params + params['node_update_timeout'] = node_update_timeout + registry = objects.HealthRegistry.create(ctx, cluster_id, check_type, interval, params, self.engine_id, @@ -603,6 +724,7 @@ class HealthManager(service.Service): self._stop_check(entry) self.rt['registries'].pop(i) objects.HealthRegistry.delete(ctx, cluster_id) + LOG.debug('unregister done') def enable_cluster(self, ctx, cluster_id, params=None): for c in self.rt['registries']: @@ -651,12 +773,12 @@ def notify(engine_id, method, **kwargs): def register(cluster_id, engine_id=None, **kwargs): params = kwargs.pop('params', {}) interval = kwargs.pop('interval', cfg.CONF.periodic_interval) - check_type = kwargs.pop('check_type', consts.NODE_STATUS_POLLING) + node_update_timeout = kwargs.pop('node_update_timeout', 300) enabled = kwargs.pop('enabled', True) return notify(engine_id, 'register_cluster', cluster_id=cluster_id, interval=interval, - check_type=check_type, + node_update_timeout=node_update_timeout, params=params, enabled=enabled) diff --git a/senlin/engine/node.py b/senlin/engine/node.py index 4a1f74614..7177070e0 100644 --- a/senlin/engine/node.py +++ b/senlin/engine/node.py @@ -316,7 +316,7 @@ class Node(object): self.index = -1 return True - def do_check(self, context): + def do_check(self, context, return_check_result=False): if not self.physical_id: return False @@ -330,6 +330,9 @@ class Node(object): self.set_status(context, consts.NS_ERROR, six.text_type(ex)) return False + if return_check_result: + return res + # Physical object is ACTIVE but for some reason the node status in # senlin was WARNING. We only update the status_reason if res: diff --git a/senlin/policies/health_policy.py b/senlin/policies/health_policy.py index ef0d491cd..cfa234144 100644 --- a/senlin/policies/health_policy.py +++ b/senlin/policies/health_policy.py @@ -10,7 +10,9 @@ # License for the specific language governing permissions and limitations # under the License. 
+from collections import namedtuple from oslo_config import cfg +from oslo_log import log as logging from senlin.common import constraints from senlin.common import consts @@ -21,16 +23,21 @@ from senlin.common import schema from senlin.engine import health_manager from senlin.policies import base +LOG = logging.getLogger(__name__) + class HealthPolicy(base.Policy): """Policy for health management of a cluster.""" - VERSION = '1.0' + VERSION = '1.1' VERSIONS = { '1.0': [ {'status': consts.EXPERIMENTAL, 'since': '2017.02'}, {'status': consts.SUPPORTED, 'since': '2018.06'}, - ] + ], + '1.1': [ + {'status': consts.SUPPORTED, 'since': '2018.09'} + ], } PRIORITY = 600 @@ -55,20 +62,21 @@ class HealthPolicy(base.Policy): KEYS = (DETECTION, RECOVERY) = ('detection', 'recovery') _DETECTION_KEYS = ( - DETECTION_TYPE, DETECTION_OPTIONS, + DETECTION_MODES, DETECTION_TYPE, DETECTION_OPTIONS, DETECTION_INTERVAL, + NODE_UPDATE_TIMEOUT, RECOVERY_CONDITIONAL ) = ( - 'type', 'options' + 'detection_modes', 'type', 'options', 'interval', + 'node_update_timeout', 'recovery_conditional' ) _DETECTION_OPTIONS = ( - DETECTION_INTERVAL, POLL_URL, POLL_URL_SSL_VERIFY, + POLL_URL, POLL_URL_SSL_VERIFY, POLL_URL_CONN_ERROR_AS_UNHEALTHY, POLL_URL_HEALTHY_RESPONSE, - POLL_URL_RETRY_LIMIT, POLL_URL_RETRY_INTERVAL, NODE_UPDATE_TIMEOUT, + POLL_URL_RETRY_LIMIT, POLL_URL_RETRY_INTERVAL, ) = ( - 'interval', 'poll_url', 'poll_url_ssl_verify', + 'poll_url', 'poll_url_ssl_verify', 'poll_url_conn_error_as_unhealthy', 'poll_url_healthy_response', - 'poll_url_retry_limit', 'poll_url_retry_interval', - 'node_update_timeout', + 'poll_url_retry_limit', 'poll_url_retry_interval' ) _RECOVERY_KEYS = ( @@ -96,68 +104,100 @@ class HealthPolicy(base.Policy): DETECTION: schema.Map( _('Policy aspect for node failure detection.'), schema={ - DETECTION_TYPE: schema.String( - _('Type of node failure detection.'), + DETECTION_INTERVAL: schema.Integer( + _("Number of seconds between pollings. Only " + "required when type is 'NODE_STATUS_POLLING' or " + "'NODE_STATUS_POLL_URL'."), + default=60, + ), + NODE_UPDATE_TIMEOUT: schema.Integer( + _("Number of seconds since last node update to " + "wait before checking node health."), + default=300, + ), + RECOVERY_CONDITIONAL: schema.String( + _("The conditional that determines when recovery should be" + " performed in case multiple detection modes are " + "specified. 'ALL_FAILED' means that all " + "detection modes have to return failed health checks " + "before a node is recovered. 'ANY_FAILED'" + " means that a failed health check with a single " + "detection mode triggers a node recovery."), constraints=[ - constraints.AllowedValues(consts.DETECTION_TYPES), + constraints.AllowedValues( + consts.RECOVERY_CONDITIONAL), ], - required=True, - ), - DETECTION_OPTIONS: schema.Map( - schema={ - DETECTION_INTERVAL: schema.Integer( - _("Number of seconds between pollings. Only " - "required when type is 'NODE_STATUS_POLLING' or " - "'NODE_STATUS_POLL_URL'."), - default=60, - ), - POLL_URL: schema.String( - _("URL to poll for node status. See documentation " - "for valid expansion parameters. Only required " - "when type is 'NODE_STATUS_POLL_URL'."), - default='', - ), - POLL_URL_SSL_VERIFY: schema.Boolean( - _("Whether to verify SSL when calling URL to poll " - "for node status. Only required when type is " - "'NODE_STATUS_POLL_URL'."), - default=True, - ), - POLL_URL_CONN_ERROR_AS_UNHEALTHY: schema.Boolean( - _("Whether to treat URL connection errors as an " - "indication of an unhealthy node. 
Only required " - "when type is 'NODE_STATUS_POLL_URL'."), - default=True, - ), - POLL_URL_HEALTHY_RESPONSE: schema.String( - _("String pattern in the poll URL response body " - "that indicates a healthy node. " - "Required when type is 'NODE_STATUS_POLL_URL'."), - default='', - ), - POLL_URL_RETRY_LIMIT: schema.Integer( - _("Number of times to retry URL polling when its " - "return body is missing " - "POLL_URL_HEALTHY_RESPONSE string before a node " - "is considered down. Required when type is " - "'NODE_STATUS_POLL_URL'."), - default=3, - ), - POLL_URL_RETRY_INTERVAL: schema.Integer( - _("Number of seconds between URL polling retries " - "before a node is considered down. " - "Required when type is 'NODE_STATUS_POLL_URL'."), - default=3, - ), - NODE_UPDATE_TIMEOUT: schema.Integer( - _("Number of seconds since last node update to " - "wait before checking node health. " - "Required when type is 'NODE_STATUS_POLL_URL'."), - default=300, - ), - }, - default={} + default=consts.ANY_FAILED, + required=False, ), + DETECTION_MODES: schema.List( + _('List of node failure detection modes.'), + schema=schema.Map( + _('Node failure detection mode to try'), + schema={ + DETECTION_TYPE: schema.String( + _('Type of node failure detection.'), + constraints=[ + constraints.AllowedValues( + consts.DETECTION_TYPES), + ], + required=True, + ), + DETECTION_OPTIONS: schema.Map( + schema={ + POLL_URL: schema.String( + _("URL to poll for node status. See " + "documentation for valid expansion " + "parameters. Only required " + "when type is " + "'NODE_STATUS_POLL_URL'."), + default='', + ), + POLL_URL_SSL_VERIFY: schema.Boolean( + _("Whether to verify SSL when calling " + "URL to poll for node status. Only " + "required when type is " + "'NODE_STATUS_POLL_URL'."), + default=True, + ), + POLL_URL_CONN_ERROR_AS_UNHEALTHY: + schema.Boolean( + _("Whether to treat URL connection " + "errors as an indication of an " + "unhealthy node. Only required " + "when type is " + "'NODE_STATUS_POLL_URL'."), + default=True, + ), + POLL_URL_HEALTHY_RESPONSE: schema.String( + _("String pattern in the poll URL " + "response body that indicates a " + "healthy node. Required when type " + "is 'NODE_STATUS_POLL_URL'."), + default='', + ), + POLL_URL_RETRY_LIMIT: schema.Integer( + _("Number of times to retry URL " + "polling when its return body is " + "missing POLL_URL_HEALTHY_RESPONSE " + "string before a node is considered " + "down. Required when type is " + "'NODE_STATUS_POLL_URL'."), + default=3, + ), + POLL_URL_RETRY_INTERVAL: schema.Integer( + _("Number of seconds between URL " + "polling retries before a node is " + "considered down. 
Required when " + "type is 'NODE_STATUS_POLL_URL'."), + default=3, + ), + }, + default={} + ), + } + ) + ) }, required=True, ), @@ -209,27 +249,44 @@ class HealthPolicy(base.Policy): "action is RECREATE."), default=False, ), - } + }, + required=True, ), } def __init__(self, name, spec, **kwargs): super(HealthPolicy, self).__init__(name, spec, **kwargs) - self.check_type = self.properties[self.DETECTION][self.DETECTION_TYPE] + self.interval = self.properties[self.DETECTION].get( + self.DETECTION_INTERVAL, 60) - options = self.properties[self.DETECTION][self.DETECTION_OPTIONS] - self.interval = options.get(self.DETECTION_INTERVAL, 60) - self.poll_url = options.get(self.POLL_URL, '') - self.poll_url_ssl_verify = options.get(self.POLL_URL_SSL_VERIFY, True) - self.poll_url_conn_error_as_unhealthy = options.get( - self.POLL_URL_CONN_ERROR_AS_UNHEALTHY, True) - self.poll_url_healthy_response = options.get( - self.POLL_URL_HEALTHY_RESPONSE, '') - self.poll_url_retry_limit = options.get(self.POLL_URL_RETRY_LIMIT, '') - self.poll_url_retry_interval = options.get( - self.POLL_URL_RETRY_INTERVAL, '') - self.node_update_timeout = options.get(self.NODE_UPDATE_TIMEOUT, 300) + self.node_update_timeout = self.properties[self.DETECTION].get( + self.NODE_UPDATE_TIMEOUT, 300) + + self.recovery_conditional = self.properties[self.DETECTION].get( + self.RECOVERY_CONDITIONAL, consts.ANY_FAILED) + + DetectionMode = namedtuple( + 'DetectionMode', + [self.DETECTION_TYPE] + list(self._DETECTION_OPTIONS)) + + self.detection_modes = [] + + raw_modes = self.properties[self.DETECTION][self.DETECTION_MODES] + for mode in raw_modes: + options = mode[self.DETECTION_OPTIONS] + + self.detection_modes.append( + DetectionMode( + mode[self.DETECTION_TYPE], + options.get(self.POLL_URL, ''), + options.get(self.POLL_URL_SSL_VERIFY, True), + options.get(self.POLL_URL_CONN_ERROR_AS_UNHEALTHY, True), + options.get(self.POLL_URL_HEALTHY_RESPONSE, ''), + options.get(self.POLL_URL_RETRY_LIMIT, ''), + options.get(self.POLL_URL_RETRY_INTERVAL, '') + ) + ) recover_settings = self.properties[self.RECOVERY] self.recover_actions = recover_settings[self.RECOVERY_ACTIONS] @@ -257,6 +314,30 @@ class HealthPolicy(base.Policy): cfg.CONF.health_check_interval_min} raise exc.InvalidSpec(message=message) + # check valid detection types + polling_types = [consts.NODE_STATUS_POLLING, + consts.NODE_STATUS_POLL_URL] + + has_valid_polling_types = all( + d.type in polling_types + for d in self.detection_modes + ) + has_valid_lifecycle_type = ( + len(self.detection_modes) == 1 and + self.detection_modes[0].type == consts.LIFECYCLE_EVENTS + ) + + if not has_valid_polling_types and not has_valid_lifecycle_type: + message = ("Invalid detection modes in health policy: %s" % + ', '.join([d.type for d in self.detection_modes])) + raise exc.InvalidSpec(message=message) + + if len(self.detection_modes) != len(set(self.detection_modes)): + message = ("Duplicate detection modes are not allowed in " + "health policy: %s" % + ', '.join([d.type for d in self.detection_modes])) + raise exc.InvalidSpec(message=message) + # TODO(Qiming): Add detection of duplicated action names when # support to list of actions is implemented. 
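
To make the version 1.1 schema above concrete, here is an example spec that the
validation logic accepts, written as the equivalent Python dict (a policy file
would normally carry the same structure in YAML). The URL, intervals and
recovery action are sample values only:

    spec = {
        'type': 'senlin.policy.health',
        'version': '1.1',
        'properties': {
            'detection': {
                'interval': 120,
                'node_update_timeout': 300,
                'recovery_conditional': 'ANY_FAILED',
                'detection_modes': [
                    {'type': 'NODE_STATUS_POLLING'},
                    {
                        'type': 'NODE_STATUS_POLL_URL',
                        'options': {
                            'poll_url': 'http://{nodename}:8080/health',
                            'poll_url_healthy_response': 'OK',
                            'poll_url_retry_limit': 3,
                            'poll_url_retry_interval': 3,
                        },
                    },
                ],
            },
            'recovery': {
                'actions': [
                    {'name': 'RECREATE'},
                ],
            },
        },
    }

With 'ANY_FAILED' as the recovery conditional, a node is recovered as soon as
either the status poll or the URL poll reports it unhealthy; 'ALL_FAILED' would
require both detection modes to fail before recovery is triggered.
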
@@ -283,40 +364,33 @@ class HealthPolicy(base.Policy): return False, err_msg kwargs = { - 'check_type': self.check_type, 'interval': self.interval, + 'node_update_timeout': self.node_update_timeout, 'params': { 'recover_action': self.recover_actions, - 'poll_url': self.poll_url, - 'poll_url_ssl_verify': self.poll_url_ssl_verify, - 'poll_url_conn_error_as_unhealthy': - self.poll_url_conn_error_as_unhealthy, - 'poll_url_healthy_response': self.poll_url_healthy_response, - 'poll_url_retry_limit': self.poll_url_retry_limit, - 'poll_url_retry_interval': self.poll_url_retry_interval, - 'node_update_timeout': self.node_update_timeout, 'node_delete_timeout': self.node_delete_timeout, 'node_force_recreate': self.node_force_recreate, + 'recovery_conditional': self.recovery_conditional, }, 'enabled': enabled } + converted_detection_modes = [ + d._asdict() for d in self.detection_modes + ] + detection_mode = {'detection_modes': converted_detection_modes} + kwargs['params'].update(detection_mode) + health_manager.register(cluster.id, engine_id=None, **kwargs) data = { - 'check_type': self.check_type, 'interval': self.interval, - 'poll_url': self.poll_url, - 'poll_url_ssl_verify': self.poll_url_ssl_verify, - 'poll_url_conn_error_as_unhealthy': - self.poll_url_conn_error_as_unhealthy, - 'poll_url_healthy_response': self.poll_url_healthy_response, - 'poll_url_retry_limit': self.poll_url_retry_limit, - 'poll_url_retry_interval': self.poll_url_retry_interval, 'node_update_timeout': self.node_update_timeout, + 'recovery_conditional': self.recovery_conditional, 'node_delete_timeout': self.node_delete_timeout, 'node_force_recreate': self.node_force_recreate, } + data.update(detection_mode) return True, self._build_policy_data(data) @@ -327,7 +401,10 @@ class HealthPolicy(base.Policy): :param cluster: The target cluster. :returns: A tuple comprising the execution result and reason. """ - health_manager.unregister(cluster.id) + ret = health_manager.unregister(cluster.id) + if not ret: + LOG.warning('Unregistering health manager for cluster %s ' + 'timed out.', cluster.id) return True, '' def pre_op(self, cluster_id, action, **args): diff --git a/senlin/profiles/base.py b/senlin/profiles/base.py index 089b2055c..f27da6b32 100644 --- a/senlin/profiles/base.py +++ b/senlin/profiles/base.py @@ -11,8 +11,10 @@ # under the License. import copy +import eventlet import inspect +from oslo_config import cfg from oslo_context import context as oslo_context from oslo_log import log as logging from oslo_utils import timeutils @@ -302,7 +304,7 @@ class Profile(object): try: return profile.do_check(obj) except exc.InternalError as ex: - LOG.error(ex) + LOG.debug(ex) return False @classmethod @@ -518,6 +520,13 @@ class Profile(object): raise exc.EResourceOperation(op='recovering', type='node', id=obj.id, message=six.text_type(ex)) + + # pause to allow deleted resource to get reclaimed by nova + # this is needed to avoid a problem when the compute resources are + # at their quota limit. The deleted resource has to become available + # so that the new node can be created. + eventlet.sleep(cfg.CONF.batch_interval) + res = None try: res = self.do_create(obj) diff --git a/senlin/tests/unit/engine/test_health_manager.py b/senlin/tests/unit/engine/test_health_manager.py index d2b180f5d..b4f0df191 100644 --- a/senlin/tests/unit/engine/test_health_manager.py +++ b/senlin/tests/unit/engine/test_health_manager.py @@ -11,10 +11,12 @@ # under the License. 
import copy +import re import time import mock from oslo_config import cfg +from oslo_service import threadgroup from oslo_utils import timeutils as tu from senlin.common import consts @@ -22,6 +24,7 @@ from senlin.common import context from senlin.common import messaging from senlin.common import utils from senlin.engine import health_manager as hm +from senlin.engine import node as node_mod from senlin import objects from senlin.objects import cluster as obj_cluster from senlin.objects import health_registry as hr @@ -502,6 +505,425 @@ class TestListenerProc(base.SenlinTestCase): x_listener.start.assert_called_once_with() +class TestHealthCheckType(base.SenlinTestCase): + def setUp(self): + super(TestHealthCheckType, self).setUp() + + self.hc = hm.NodePollStatusHealthCheck( + cluster_id='CLUSTER_ID', interval=1, node_update_timeout=1, + params='' + ) + + def test_factory(self): + cid = 'CLUSTER_ID' + interval = 1 + params = { + 'detection_modes': [ + { + 'type': 'NODE_STATUS_POLLING', + 'poll_url': '', + 'poll_url_ssl_verify': True, + 'poll_url_conn_error_as_unhealthy': True, + 'poll_url_healthy_response': '', + 'poll_url_retry_limit': '', + 'poll_url_retry_interval': '' + }, + { + 'type': 'NODE_STATUS_POLL_URL', + 'poll_url': '', + 'poll_url_ssl_verify': True, + 'poll_url_conn_error_as_unhealthy': True, + 'poll_url_healthy_response': '', + 'poll_url_retry_limit': '', + 'poll_url_retry_interval': '' + } + ], + 'node_update_timeout': 300, + } + + for d in params['detection_modes']: + hc = hm.HealthCheckType.factory(d['type'], cid, interval, params) + + self.assertEqual(cid, hc.cluster_id) + self.assertEqual(interval, hc.interval) + self.assertEqual(d, hc.params) + self.assertEqual( + params['node_update_timeout'], hc.node_update_timeout) + + def test_factory_invalid_type(self): + cid = 'CLUSTER_ID' + interval = 1 + params = { + 'detection_modes': [ + { + 'type': 'blah', + 'poll_url': '', + 'poll_url_ssl_verify': True, + 'poll_url_conn_error_as_unhealthy': True, + 'poll_url_healthy_response': '', + 'poll_url_retry_limit': '', + 'poll_url_retry_interval': '' + }, + ], + 'node_update_timeout': 300, + } + + with self.assertRaisesRegex(Exception, 'Invalid detection type: blah'): + hm.HealthCheckType.factory('blah', cid, interval, params) + + def test_factory_same_type_twice(self): + cid = 'CLUSTER_ID' + interval = 1 + params = { + 'detection_modes': [ + { + 'type': 'NODE_STATUS_POLLING', + 'poll_url': '', + 'poll_url_ssl_verify': True, + 'poll_url_conn_error_as_unhealthy': True, + 'poll_url_healthy_response': '', + 'poll_url_retry_limit': '', + 'poll_url_retry_interval': '' + }, + { + 'type': 'NODE_STATUS_POLLING', + 'poll_url': '', + 'poll_url_ssl_verify': True, + 'poll_url_conn_error_as_unhealthy': True, + 'poll_url_healthy_response': '', + 'poll_url_retry_limit': '', + 'poll_url_retry_interval': '' + } + ], + 'node_update_timeout': 300, + } + + with self.assertRaisesRegex( + Exception, + '.*Encountered 2 instances of type NODE_STATUS_POLLING'): + hm.HealthCheckType.factory( + 'NODE_STATUS_POLLING', cid, interval, params) + + +class TestNodePollStatusHealthCheck(base.SenlinTestCase): + def setUp(self): + super(TestNodePollStatusHealthCheck, self).setUp() + + self.hc = hm.NodePollStatusHealthCheck( + cluster_id='CLUSTER_ID', + interval=1, node_update_timeout=1, params='' + ) + + @mock.patch.object(node_mod.Node, '_from_object') + @mock.patch.object(tu, 'is_older_than') + def test_run_health_check_healthy(self, mock_tu, mock_node_obj): + x_entity = mock.Mock() + x_entity.do_check.return_value 
= True + mock_node_obj.return_value = x_entity + + ctx = mock.Mock() + node = mock.Mock(id='FAKE_NODE1', status="ERROR", + updated_at='2018-08-13 18:00:00', + init_at='2018-08-13 17:00:00') + + # do it + res = self.hc.run_health_check(ctx, node) + + self.assertTrue(res) + mock_tu.assert_not_called() + + @mock.patch.object(node_mod.Node, '_from_object') + @mock.patch.object(tu, 'is_older_than') + def test_run_health_check_unhealthy(self, mock_tu, mock_node_obj): + x_entity = mock.Mock() + x_entity.do_check.return_value = False + mock_node_obj.return_value = x_entity + + mock_tu.return_value = True + + ctx = mock.Mock() + node = mock.Mock(id='FAKE_NODE1', status="ERROR", + updated_at='2018-08-13 18:00:00', + init_at='2018-08-13 17:00:00') + + # do it + res = self.hc.run_health_check(ctx, node) + + self.assertFalse(res) + mock_tu.assert_called_once_with(node.updated_at, 1) + + @mock.patch.object(node_mod.Node, '_from_object') + @mock.patch.object(tu, 'is_older_than') + def test_run_health_check_unhealthy_within_timeout( + self, mock_tu, mock_node_obj): + x_entity = mock.Mock() + x_entity.do_check.return_value = False + mock_node_obj.return_value = x_entity + + mock_tu.return_value = False + + ctx = mock.Mock() + node = mock.Mock(id='FAKE_NODE1', status="ERROR", + updated_at='2018-08-13 18:00:00', + init_at='2018-08-13 17:00:00') + + # do it + res = self.hc.run_health_check(ctx, node) + + self.assertTrue(res) + mock_tu.assert_called_once_with(node.updated_at, 1) + + +class TestNodePollUrlHealthCheck(base.SenlinTestCase): + def setUp(self): + super(TestNodePollUrlHealthCheck, self).setUp() + + default_params = { + 'poll_url': 'FAKE_POLL_URL', + 'poll_url_ssl_verify': True, + 'poll_url_conn_error_as_unhealthy': True, + 'poll_url_healthy_response': 'FAKE_HEALTHY_PATTERN', + 'poll_url_retry_limit': 2, + 'poll_url_retry_interval': 1, + 'node_update_timeout': 5 + } + + self.hc = hm.NodePollUrlHealthCheck( + cluster_id='CLUSTER_ID', interval=1, node_update_timeout=1, + params=default_params + ) + + def test_expand_url_template(self): + url_template = 'https://abc123/foo/bar' + node = mock.Mock() + + # do it + res = self.hc._expand_url_template(url_template, node) + + self.assertEqual(res, url_template) + + def test_expand_url_template_nodename(self): + node = mock.Mock() + node.name = 'name' + url_template = 'https://abc123/{nodename}/bar' + expanded_url = 'https://abc123/{}/bar'.format(node.name) + + # do it + res = self.hc._expand_url_template(url_template, node) + + self.assertEqual(res, expanded_url) + + @mock.patch.object(tu, "is_older_than") + @mock.patch.object(hm.NodePollUrlHealthCheck, "_expand_url_template") + @mock.patch.object(utils, 'url_fetch') + def test_run_health_check_healthy( + self, mock_url_fetch, mock_expand_url, mock_time): + ctx = mock.Mock() + node = mock.Mock() + node.status = consts.NS_ACTIVE + mock_time.return_value = True + mock_expand_url.return_value = 'FAKE_EXPANDED_URL' + mock_url_fetch.return_value = ("Healthy because this return value " + "contains FAKE_HEALTHY_PATTERN") + + # do it + res = self.hc.run_health_check(ctx, node) + + self.assertTrue(res) + mock_url_fetch.assert_called_once_with('FAKE_EXPANDED_URL', timeout=1, + verify=True) + + @mock.patch.object(tu, "is_older_than") + @mock.patch.object(hm.NodePollUrlHealthCheck, "_expand_url_template") + @mock.patch.object(utils, 'url_fetch') + def test_run_health_check_healthy_min_timeout( + self, mock_url_fetch, mock_expand_url, mock_time): + ctx = mock.Mock() + node = mock.Mock() + node.status = consts.NS_ACTIVE + 
mock_time.return_value = True + mock_expand_url.return_value = 'FAKE_EXPANDED_URL' + mock_url_fetch.return_value = ("Healthy because this return value " + "contains FAKE_HEALTHY_PATTERN") + + self.hc.params['poll_url_retry_interval'] = 0 + + # do it + res = self.hc.run_health_check(ctx, node) + + self.assertTrue(res) + mock_url_fetch.assert_called_once_with('FAKE_EXPANDED_URL', timeout=1, + verify=True) + + @mock.patch.object(tu, "is_older_than") + @mock.patch.object(hm.NodePollUrlHealthCheck, "_expand_url_template") + @mock.patch.object(utils, 'url_fetch') + def test_run_health_check_healthy_timeout( + self, mock_url_fetch, mock_expand_url, mock_time): + ctx = mock.Mock() + node = mock.Mock() + node.status = consts.NS_ACTIVE + mock_time.return_value = True + mock_expand_url.return_value = 'FAKE_EXPANDED_URL' + mock_url_fetch.return_value = ("Healthy because this return value " + "contains FAKE_HEALTHY_PATTERN") + + self.hc.params['poll_url_retry_interval'] = 100 + + # do it + res = self.hc.run_health_check(ctx, node) + + self.assertTrue(res) + mock_url_fetch.assert_called_once_with('FAKE_EXPANDED_URL', timeout=10, + verify=True) + + @mock.patch.object(tu, "is_older_than") + @mock.patch.object(hm.NodePollUrlHealthCheck, "_expand_url_template") + @mock.patch.object(utils, 'url_fetch') + def test_run_health_check_unhealthy_inactive( + self, mock_url_fetch, mock_expand_url, mock_time): + ctx = mock.Mock() + node = mock.Mock() + node.status = consts.NS_RECOVERING + mock_time.return_value = True + mock_expand_url.return_value = 'FAKE_EXPANDED_URL' + mock_url_fetch.return_value = "" + + # do it + res = self.hc.run_health_check(ctx, node) + + self.assertTrue(res) + mock_url_fetch.assert_called_once_with('FAKE_EXPANDED_URL', timeout=1, + verify=True) + + @mock.patch.object(tu, "is_older_than") + @mock.patch.object(hm.NodePollUrlHealthCheck, "_expand_url_template") + @mock.patch.object(utils, 'url_fetch') + def test_run_health_check_unhealthy_update_timeout( + self, mock_url_fetch, mock_expand_url, mock_time): + ctx = mock.Mock() + node = mock.Mock() + node.id = 'FAKE_NODE_ID' + node.updated_at = 'FAKE_UPDATE_TIME' + node.status = consts.NS_ACTIVE + mock_time.return_value = False + mock_expand_url.return_value = 'FAKE_EXPANDED_URL' + mock_url_fetch.return_value = "" + + # do it + res = self.hc.run_health_check(ctx, node) + + self.assertTrue(res) + mock_url_fetch.assert_called_once_with('FAKE_EXPANDED_URL', timeout=1, + verify=True) + + @mock.patch.object(tu, "is_older_than") + @mock.patch.object(hm.NodePollUrlHealthCheck, "_expand_url_template") + @mock.patch.object(utils, 'url_fetch') + def test_run_health_check_unhealthy_init_timeout( + self, mock_url_fetch, mock_expand_url, mock_time): + ctx = mock.Mock() + node = mock.Mock() + node.id = 'FAKE_NODE_ID' + node.updated_at = None + node.init_at = 'FAKE_INIT_TIME' + node.status = consts.NS_ACTIVE + mock_time.return_value = False + mock_expand_url.return_value = 'FAKE_EXPANDED_URL' + mock_url_fetch.return_value = "" + + # do it + res = self.hc.run_health_check(ctx, node) + + self.assertTrue(res) + mock_url_fetch.assert_called_once_with('FAKE_EXPANDED_URL', timeout=1, + verify=True) + + @mock.patch.object(time, "sleep") + @mock.patch.object(tu, "is_older_than") + @mock.patch.object(hm.NodePollUrlHealthCheck, "_expand_url_template") + @mock.patch.object(utils, 'url_fetch') + def test_run_health_check_unhealthy(self, + mock_url_fetch, + mock_expand_url, mock_time, + mock_sleep): + ctx = mock.Mock() + node = mock.Mock() + node.status = consts.NS_ACTIVE + 
node.id = 'FAKE_ID' + mock_time.return_value = True + mock_expand_url.return_value = 'FAKE_EXPANDED_URL' + mock_url_fetch.return_value = "" + + # do it + res = self.hc.run_health_check(ctx, node) + + self.assertFalse(res) + mock_url_fetch.assert_has_calls( + [ + mock.call('FAKE_EXPANDED_URL', timeout=1, verify=True), + mock.call('FAKE_EXPANDED_URL', timeout=1, verify=True) + ] + ) + mock_sleep.assert_has_calls([mock.call(1), mock.call(1)]) + + @mock.patch.object(time, "sleep") + @mock.patch.object(tu, "is_older_than") + @mock.patch.object(hm.NodePollUrlHealthCheck, "_expand_url_template") + @mock.patch.object(utils, 'url_fetch') + def test_run_health_check_conn_error(self, + mock_url_fetch, + mock_expand_url, mock_time, + mock_sleep): + ctx = mock.Mock() + node = mock.Mock() + node.status = consts.NS_ACTIVE + node.id = 'FAKE_ID' + mock_time.return_value = True + mock_expand_url.return_value = 'FAKE_EXPANDED_URL' + mock_url_fetch.side_effect = utils.URLFetchError("Error") + + # do it + res = self.hc.run_health_check(ctx, node) + + self.assertFalse(res) + mock_url_fetch.assert_has_calls( + [ + mock.call('FAKE_EXPANDED_URL', timeout=1, verify=True), + mock.call('FAKE_EXPANDED_URL', timeout=1, verify=True) + ] + ) + mock_sleep.assert_has_calls([mock.call(1), mock.call(1)]) + + @mock.patch.object(time, "sleep") + @mock.patch.object(tu, "is_older_than") + @mock.patch.object(hm.NodePollUrlHealthCheck, "_expand_url_template") + @mock.patch.object(utils, 'url_fetch') + def test_run_health_check_conn_error_noop( + self, mock_url_fetch, mock_expand_url, mock_time, + mock_sleep): + ctx = mock.Mock() + node = mock.Mock() + node.status = consts.NS_ACTIVE + node.id = 'FAKE_ID' + mock_time.return_value = True + mock_expand_url.return_value = 'FAKE_EXPANDED_URL' + mock_url_fetch.side_effect = utils.URLFetchError("Error") + + self.hc.params['poll_url_conn_error_as_unhealthy'] = False + + # do it + res = self.hc.run_health_check(ctx, node) + + self.assertTrue(res) + mock_url_fetch.assert_has_calls( + [ + mock.call('FAKE_EXPANDED_URL', timeout=1, verify=True), + ] + ) + mock_sleep.assert_not_called() + + class TestHealthManager(base.SenlinTestCase): def setUp(self): @@ -526,51 +948,17 @@ class TestHealthManager(base.SenlinTestCase): self.hm._dummy_task() mock_load.assert_called_once_with() + @mock.patch.object(hm.HealthManager, "_start_check") @mock.patch.object(hr.HealthRegistry, 'claim') - @mock.patch.object(objects.HealthRegistry, 'update') - def test_load_runtime_registry(self, mock_update, mock_claim): - mock_claim.return_value = [ - mock.Mock(cluster_id='CID1', - check_type=consts.NODE_STATUS_POLLING, - interval=12, - params={'k1': 'v1'}, - enabled=True), - mock.Mock(cluster_id='CID2', - check_type=consts.NODE_STATUS_POLLING, - interval=34, - params={'k2': 'v2'}, - enabled=False), - mock.Mock(cluster_id='CID3', - check_type='UNKNOWN_CHECK_TYPE', - interval=56, - params={'k3': 'v3'}), - ] - - timer1 = mock.Mock() - timer2 = mock.Mock() - mock_add_timer = self.patchobject(self.hm.TG, 'add_dynamic_timer', - side_effect=[timer1, timer2]) - # do it - self.hm._load_runtime_registry() - - # assertions - mock_claim.assert_called_once_with(self.hm.ctx, self.hm.engine_id) - mock_calls = [ - mock.call(self.hm._poll_cluster, None, None, 'CID1', 12, {}) - ] - mock_add_timer.assert_has_calls(mock_calls) - self.assertEqual(2, len(self.hm.registries)) - self.assertEqual( + def test_load_runtime_registry(self, mock_claim, mock_check): + fake_claims = [ { 'cluster_id': 'CID1', 'check_type': consts.NODE_STATUS_POLLING, 
'interval': 12, 'params': {'k1': 'v1'}, - 'timer': timer1, 'enabled': True, }, - self.hm.registries[0]) - self.assertEqual( { 'cluster_id': 'CID2', 'check_type': consts.NODE_STATUS_POLLING, @@ -578,485 +966,23 @@ class TestHealthManager(base.SenlinTestCase): 'params': {'k2': 'v2'}, 'enabled': False, }, - self.hm.registries[1]) - - def test_expand_url_template(self): - url_template = 'https://abc123/foo/bar' - node = mock.Mock() + ] + mock_claim.return_value = [ + mock.Mock(**fake_claims[0]), + mock.Mock(**fake_claims[1]), + ] + mock_check.return_value = fake_claims # do it - res = self.hm._expand_url_template(url_template, node) + self.hm._load_runtime_registry() - self.assertEqual(res, url_template) - - def test_expand_url_template_nodename(self): - node = mock.Mock() - node.name = 'name' - url_template = 'https://abc123/{nodename}/bar' - expanded_url = 'https://abc123/{}/bar'.format(node.name) - - # do it - res = self.hm._expand_url_template(url_template, node) - - self.assertEqual(res, expanded_url) - - @mock.patch.object(hm, "_chase_up") - @mock.patch.object(obj_node.Node, 'get_all_by_cluster') - @mock.patch.object(hm.HealthManager, "_wait_for_action") - @mock.patch.object(obj_cluster.Cluster, 'get') - @mock.patch.object(context, 'get_service_context') - @mock.patch.object(rpc_client.EngineClient, 'call') - def test_poll_cluster(self, mock_rpc, mock_ctx, mock_get, - mock_wait, mock_nodes, mock_chase): - x_cluster = mock.Mock(user='USER_ID', project='PROJECT_ID') - mock_get.return_value = x_cluster - ctx = mock.Mock() - mock_ctx.return_value = ctx - mock_wait.return_value = (True, "") - x_node = mock.Mock(id='FAKE_NODE', status="ERROR") - mock_nodes.return_value = [x_node] - x_action_check = {'action': 'CHECK_ID'} - x_action_recover = {'action': 'RECOVER_ID'} - mock_rpc.side_effect = [x_action_check, x_action_recover] - - recover_action = {'operation': 'REBUILD'} - # do it - res = self.hm._poll_cluster('CLUSTER_ID', 456, recover_action) - - self.assertEqual(mock_chase.return_value, res) - mock_get.assert_called_once_with(self.hm.ctx, 'CLUSTER_ID', - project_safe=False) - mock_ctx.assert_called_once_with(user_id=x_cluster.user, - project_id=x_cluster.project) - mock_rpc.assert_has_calls([ - mock.call(ctx, 'cluster_check', mock.ANY), - mock.call(ctx, 'node_recover', mock.ANY) - ]) - mock_wait.assert_called_once_with(ctx, "CHECK_ID", 456) - mock_chase.assert_called_once_with(mock.ANY, 456) - - @mock.patch.object(hm, "_chase_up") - @mock.patch.object(obj_cluster.Cluster, 'get') - @mock.patch.object(rpc_client.EngineClient, 'call') - def test_poll_cluster_not_found(self, mock_check, mock_get, mock_chase): - mock_get.return_value = None - - recover_action = {'operation': 'REBUILD'} - # do it - res = self.hm._poll_cluster('CLUSTER_ID', 123, recover_action) - - self.assertEqual(mock_chase.return_value, res) - self.assertEqual(0, mock_check.call_count) - mock_chase.assert_called_once_with(mock.ANY, 123) - - @mock.patch.object(hm, "_chase_up") - @mock.patch.object(context, 'get_service_context') - @mock.patch.object(obj_cluster.Cluster, 'get') - @mock.patch.object(rpc_client.EngineClient, 'call') - def test_poll_cluster_failed_check_rpc(self, mock_check, mock_get, - mock_ctx, mock_chase): - x_cluster = mock.Mock(user='USER_ID', project='PROJECT_ID') - mock_get.return_value = x_cluster - ctx = mock.Mock() - mock_ctx.return_value = ctx - mock_check.side_effect = Exception("boom") - - recover_action = {'operation': 'REBUILD'} - # do it - res = self.hm._poll_cluster('CLUSTER_ID', 123, recover_action) 
- - self.assertEqual(mock_chase.return_value, res) - mock_get.assert_called_once_with(self.hm.ctx, 'CLUSTER_ID', - project_safe=False) - mock_ctx.assert_called_once_with(user_id='USER_ID', - project_id='PROJECT_ID') - mock_check.assert_called_once_with(ctx, 'cluster_check', mock.ANY) - mock_chase.assert_called_once_with(mock.ANY, 123) - - @mock.patch.object(hm, "_chase_up") - @mock.patch.object(hm.HealthManager, "_wait_for_action") - @mock.patch.object(obj_cluster.Cluster, 'get') - @mock.patch.object(context, 'get_service_context') - @mock.patch.object(rpc_client.EngineClient, 'call') - def test_poll_cluster_failed_wait(self, mock_rpc, mock_ctx, - mock_get, mock_wait, mock_chase): - x_cluster = mock.Mock(user='USER_ID', project='PROJECT_ID') - mock_get.return_value = x_cluster - ctx = mock.Mock() - mock_ctx.return_value = ctx - mock_wait.return_value = (False, "bad") - x_action_check = {'action': 'CHECK_ID'} - mock_rpc.return_value = x_action_check - - recover_action = {'operation': 'REBUILD'} - # do it - res = self.hm._poll_cluster('CLUSTER_ID', 456, recover_action) - - self.assertEqual(mock_chase.return_value, res) - mock_get.assert_called_once_with(self.hm.ctx, 'CLUSTER_ID', - project_safe=False) - mock_ctx.assert_called_once_with(user_id='USER_ID', - project_id='PROJECT_ID') - mock_rpc.assert_called_once_with(ctx, 'cluster_check', mock.ANY) - mock_wait.assert_called_once_with(ctx, "CHECK_ID", 456) - mock_chase.assert_called_once_with(mock.ANY, 456) - - @mock.patch.object(tu, "is_older_than") - @mock.patch.object(hm.HealthManager, "_expand_url_template") - @mock.patch.object(utils, 'url_fetch') - @mock.patch.object(rpc_client.EngineClient, 'call') - def test_check_url_and_recover_node_healthy( - self, mock_rpc, mock_url_fetch, mock_expand_url, mock_time): - ctx = mock.Mock() - node = mock.Mock() - node.status = consts.NS_ACTIVE - mock_time.return_value = True - mock_expand_url.return_value = 'FAKE_EXPANDED_URL' - x_action_check = {'action': 'CHECK_ID'} - mock_rpc.return_value = x_action_check - mock_url_fetch.return_value = ("Healthy because this return value " - "contains FAKE_HEALTHY_PATTERN") - params = { - 'poll_url': 'FAKE_POLL_URL', - 'poll_url_ssl_verify': True, - 'poll_url_conn_error_as_unhealthy': True, - 'poll_url_healthy_response': 'FAKE_HEALTHY_PATTERN', - 'poll_url_retry_limit': 2, - 'poll_url_retry_interval': 1, - 'node_update_timeout': 5, - } - - recover_action = {'operation': 'REBUILD'} - - # do it - res = self.hm._check_url_and_recover_node(ctx, node, recover_action, - params) - - self.assertIsNone(res) - mock_rpc.assert_not_called() - mock_url_fetch.assert_called_once_with('FAKE_EXPANDED_URL', - verify=True) - - @mock.patch.object(tu, "is_older_than") - @mock.patch.object(hm.HealthManager, "_expand_url_template") - @mock.patch.object(utils, 'url_fetch') - @mock.patch.object(rpc_client.EngineClient, 'call') - def test_check_url_and_recover_node_unhealthy_inactive( - self, mock_rpc, mock_url_fetch, mock_expand_url, mock_time): - ctx = mock.Mock() - node = mock.Mock() - node.status = consts.NS_RECOVERING - mock_time.return_value = True - mock_expand_url.return_value = 'FAKE_EXPANDED_URL' - x_action_check = {'action': 'CHECK_ID'} - mock_rpc.return_value = x_action_check - mock_url_fetch.return_value = "" - params = { - 'poll_url': 'FAKE_POLL_URL', - 'poll_url_ssl_verify': True, - 'poll_url_conn_error_as_unhealthy': True, - 'poll_url_healthy_response': 'FAKE_HEALTHY_PATTERN', - 'poll_url_retry_limit': 2, - 'poll_url_retry_interval': 1, - 'node_update_timeout': 5, - } - - 
recover_action = {'operation': 'REBUILD'} - - # do it - res = self.hm._check_url_and_recover_node(ctx, node, recover_action, - params) - - self.assertIsNone(res) - mock_rpc.assert_not_called() - mock_url_fetch.assert_called_once_with('FAKE_EXPANDED_URL', - verify=True) - - @mock.patch.object(tu, "is_older_than") - @mock.patch.object(hm.HealthManager, "_expand_url_template") - @mock.patch.object(utils, 'url_fetch') - @mock.patch.object(rpc_client.EngineClient, 'call') - def test_check_url_and_recover_node_unhealthy_update_timeout( - self, mock_rpc, mock_url_fetch, mock_expand_url, mock_time): - ctx = mock.Mock() - node = mock.Mock() - node.id = 'FAKE_NODE_ID' - node.updated_at = 'FAKE_UPDATE_TIME' - node.status = consts.NS_ACTIVE - mock_time.return_value = False - mock_expand_url.return_value = 'FAKE_EXPANDED_URL' - x_action_check = {'action': 'CHECK_ID'} - mock_rpc.return_value = x_action_check - mock_url_fetch.return_value = "" - params = { - 'poll_url': 'FAKE_POLL_URL', - 'poll_url_ssl_verify': True, - 'poll_url_conn_error_as_unhealthy': True, - 'poll_url_healthy_response': 'FAKE_HEALTHY_PATTERN', - 'poll_url_retry_limit': 2, - 'poll_url_retry_interval': 1, - 'node_update_timeout': 5, - } - - recover_action = {'operation': 'REBUILD'} - - # do it - res = self.hm._check_url_and_recover_node(ctx, node, recover_action, - params) - - self.assertIsNone(res) - mock_rpc.assert_not_called() - mock_url_fetch.assert_called_once_with('FAKE_EXPANDED_URL', - verify=True) - - @mock.patch.object(tu, "is_older_than") - @mock.patch.object(hm.HealthManager, "_expand_url_template") - @mock.patch.object(utils, 'url_fetch') - @mock.patch.object(rpc_client.EngineClient, 'call') - def test_check_url_and_recover_node_unhealthy_init_timeout( - self, mock_rpc, mock_url_fetch, mock_expand_url, mock_time): - ctx = mock.Mock() - node = mock.Mock() - node.id = 'FAKE_NODE_ID' - node.updated_at = None - node.init_at = 'FAKE_INIT_TIME' - node.status = consts.NS_ACTIVE - mock_time.return_value = False - mock_expand_url.return_value = 'FAKE_EXPANDED_URL' - x_action_check = {'action': 'CHECK_ID'} - mock_rpc.return_value = x_action_check - mock_url_fetch.return_value = "" - params = { - 'poll_url': 'FAKE_POLL_URL', - 'poll_url_ssl_verify': True, - 'poll_url_conn_error_as_unhealthy': True, - 'poll_url_healthy_response': 'FAKE_HEALTHY_PATTERN', - 'poll_url_retry_limit': 2, - 'poll_url_retry_interval': 1, - 'node_update_timeout': 5, - } - - recover_action = {'operation': 'REBUILD'} - - # do it - res = self.hm._check_url_and_recover_node(ctx, node, recover_action, - params) - - self.assertIsNone(res) - mock_rpc.assert_not_called() - mock_url_fetch.assert_called_once_with('FAKE_EXPANDED_URL', - verify=True) - - @mock.patch.object(time, "sleep") - @mock.patch.object(tu, "is_older_than") - @mock.patch.object(hm.HealthManager, "_expand_url_template") - @mock.patch.object(utils, 'url_fetch') - @mock.patch.object(rpc_client.EngineClient, 'call') - def test_check_url_and_recover_node_unhealthy(self, - mock_rpc, mock_url_fetch, - mock_expand_url, mock_time, - mock_sleep): - ctx = mock.Mock() - node = mock.Mock() - node.status = consts.NS_ACTIVE - node.id = 'FAKE_ID' - mock_time.return_value = True - mock_expand_url.return_value = 'FAKE_EXPANDED_URL' - x_action_check = {'action': 'CHECK_ID'} - mock_rpc.return_value = x_action_check - mock_url_fetch.return_value = "" - params = { - 'poll_url': 'FAKE_POLL_URL', - 'poll_url_ssl_verify': False, - 'poll_url_conn_error_as_unhealthy': True, - 'poll_url_healthy_response': 'FAKE_HEALTHY_PATTERN', 
- 'poll_url_retry_limit': 2, - 'poll_url_retry_interval': 1, - 'node_update_timeout': 5, - } - - recover_action = {'operation': 'REBUILD'} - - # do it - res = self.hm._check_url_and_recover_node(ctx, node, recover_action, - params) - - self.assertEqual(mock_rpc.return_value, res) - mock_rpc.assert_called_once_with(ctx, 'node_recover', mock.ANY) - mock_url_fetch.assert_has_calls( + # assertions + mock_claim.assert_called_once_with(self.hm.ctx, self.hm.engine_id) + mock_check.assert_has_calls( [ - mock.call('FAKE_EXPANDED_URL', verify=False), - mock.call('FAKE_EXPANDED_URL', verify=False) + mock.call(fake_claims[0]) ] ) - mock_sleep.assert_has_calls([mock.call(1), mock.call(1)]) - - @mock.patch.object(time, "sleep") - @mock.patch.object(tu, "is_older_than") - @mock.patch.object(hm.HealthManager, "_expand_url_template") - @mock.patch.object(utils, 'url_fetch') - @mock.patch.object(rpc_client.EngineClient, 'call') - def test_check_url_and_recover_node_conn_error( - self, mock_rpc, mock_url_fetch, mock_expand_url, mock_time, - mock_sleep): - ctx = mock.Mock() - node = mock.Mock() - node.status = consts.NS_ACTIVE - node.id = 'FAKE_ID' - mock_time.return_value = True - mock_expand_url.return_value = 'FAKE_EXPANDED_URL' - x_action_check = {'action': 'CHECK_ID'} - mock_rpc.return_value = x_action_check - mock_url_fetch.side_effect = utils.URLFetchError("Error") - params = { - 'poll_url': 'FAKE_POLL_URL', - 'poll_url_ssl_verify': False, - 'poll_url_conn_error_as_unhealthy': True, - 'poll_url_healthy_response': 'FAKE_HEALTHY_PATTERN', - 'poll_url_retry_limit': 2, - 'poll_url_retry_interval': 1, - 'node_update_timeout': 5, - } - - recover_action = {'operation': 'REBUILD'} - - # do it - res = self.hm._check_url_and_recover_node(ctx, node, recover_action, - params) - - self.assertEqual(mock_rpc.return_value, res) - mock_rpc.assert_called_once_with(ctx, 'node_recover', mock.ANY) - mock_url_fetch.assert_has_calls( - [ - mock.call('FAKE_EXPANDED_URL', verify=False), - mock.call('FAKE_EXPANDED_URL', verify=False) - ] - ) - mock_sleep.assert_has_calls([mock.call(1), mock.call(1)]) - - @mock.patch.object(time, "sleep") - @mock.patch.object(tu, "is_older_than") - @mock.patch.object(hm.HealthManager, "_expand_url_template") - @mock.patch.object(utils, 'url_fetch') - @mock.patch.object(rpc_client.EngineClient, 'call') - def test_check_url_and_recover_node_conn_error_noop( - self, mock_rpc, mock_url_fetch, mock_expand_url, mock_time, - mock_sleep): - ctx = mock.Mock() - node = mock.Mock() - node.status = consts.NS_ACTIVE - node.id = 'FAKE_ID' - mock_time.return_value = True - mock_expand_url.return_value = 'FAKE_EXPANDED_URL' - mock_url_fetch.side_effect = utils.URLFetchError("Error") - params = { - 'poll_url': 'FAKE_POLL_URL', - 'poll_url_ssl_verify': False, - 'poll_url_conn_error_as_unhealthy': False, - 'poll_url_healthy_response': 'FAKE_HEALTHY_PATTERN', - 'poll_url_retry_limit': 2, - 'poll_url_retry_interval': 1, - 'node_update_timeout': 5, - } - - recover_action = {'operation': 'REBUILD'} - - # do it - res = self.hm._check_url_and_recover_node(ctx, node, recover_action, - params) - - self.assertIsNone(res) - mock_rpc.assert_not_called() - mock_url_fetch.assert_has_calls( - [ - mock.call('FAKE_EXPANDED_URL', verify=False), - ] - ) - mock_sleep.assert_not_called() - - @mock.patch.object(hm, "_chase_up") - @mock.patch.object(hm.HealthManager, "_check_url_and_recover_node") - @mock.patch.object(obj_node.Node, 'get_all_by_cluster') - @mock.patch.object(hm.HealthManager, "_wait_for_action") - 
@mock.patch.object(obj_cluster.Cluster, 'get')
-    @mock.patch.object(context, 'get_service_context')
-    def test_poll_url(self, mock_ctx, mock_get, mock_wait, mock_nodes,
-                      mock_check_url, mock_chase):
-        x_cluster = mock.Mock(user='USER_ID', project='PROJECT_ID')
-        mock_get.return_value = x_cluster
-        ctx = mock.Mock()
-        mock_ctx.return_value = ctx
-        mock_wait.return_value = (True, "")
-        x_node = mock.Mock(id='FAKE_NODE', status="ERROR")
-        mock_nodes.return_value = [x_node]
-        x_action_recover = {'action': 'RECOVER_ID'}
-        mock_check_url.return_value = x_action_recover
-
-        recover_action = {'operation': 'REBUILD'}
-        params = {}
-
-        # do it
-        res = self.hm._poll_url('CLUSTER_ID', 456, recover_action, params)
-
-        self.assertEqual(mock_chase.return_value, res)
-        mock_get.assert_called_once_with(self.hm.ctx, 'CLUSTER_ID',
-                                         project_safe=False)
-        mock_ctx.assert_called_once_with(user_id=x_cluster.user,
-                                         project_id=x_cluster.project)
-        mock_check_url.assert_called_once_with(ctx, x_node,
-                                               recover_action, params)
-        mock_wait.assert_called_once_with(ctx, "RECOVER_ID", 456)
-        mock_chase.assert_called_once_with(mock.ANY, 456)
-
-    @mock.patch.object(hm, "_chase_up")
-    @mock.patch.object(obj_cluster.Cluster, 'get')
-    @mock.patch.object(context, 'get_service_context')
-    def test_poll_url_cluster_not_found(self, mock_ctx, mock_get,
-                                        mock_chase):
-        mock_get.return_value = None
-
-        recover_action = {'operation': 'REBUILD'}
-        params = {}
-
-        # do it
-        res = self.hm._poll_url('CLUSTER_ID', 123, recover_action, params)
-
-        self.assertEqual(mock_chase.return_value, res)
-        mock_ctx.assert_not_called()
-        mock_chase.assert_called_once_with(mock.ANY, 123)
-
-    @mock.patch.object(hm, "_chase_up")
-    @mock.patch.object(hm.HealthManager, "_check_url_and_recover_node")
-    @mock.patch.object(obj_node.Node, 'get_all_by_cluster')
-    @mock.patch.object(hm.HealthManager, "_wait_for_action")
-    @mock.patch.object(obj_cluster.Cluster, 'get')
-    @mock.patch.object(context, 'get_service_context')
-    def test_poll_url_no_action(self, mock_ctx, mock_get, mock_wait,
-                                mock_nodes, mock_check_url, mock_chase):
-        x_cluster = mock.Mock(user='USER_ID', project='PROJECT_ID')
-        mock_get.return_value = x_cluster
-        ctx = mock.Mock()
-        mock_ctx.return_value = ctx
-        mock_wait.return_value = (True, "")
-        x_node = mock.Mock(id='FAKE_NODE', status="ERROR")
-        mock_nodes.return_value = [x_node]
-        mock_check_url.return_value = None
-
-        recover_action = {'operation': 'REBUILD'}
-        params = {}
-
-        # do it
-        res = self.hm._poll_url('CLUSTER_ID', 456, recover_action, params)
-
-        self.assertEqual(mock_chase.return_value, res)
-        mock_get.assert_called_once_with(self.hm.ctx, 'CLUSTER_ID',
-                                         project_safe=False)
-        mock_ctx.assert_called_once_with(user_id=x_cluster.user,
-                                         project_id=x_cluster.project)
-        mock_check_url.assert_called_once_with(ctx, x_node,
-                                               recover_action, params)
-        mock_wait.assert_not_called()
-        mock_chase.assert_called_once_with(mock.ANY, 456)
 
     @mock.patch.object(obj_profile.Profile, 'get')
     @mock.patch.object(obj_cluster.Cluster, 'get')
@@ -1148,30 +1074,531 @@ class TestHealthManager(base.SenlinTestCase):
                                          project_safe=False)
         self.assertEqual(0, mock_add_thread.call_count)
 
-    def test_start_check_for_polling(self):
+    @mock.patch.object(rpc_client.EngineClient, 'call')
+    @mock.patch('senlin.objects.NodeRecoverRequest', autospec=True)
+    def test_recover_node(self, mock_req, mock_rpc):
+        ctx = mock.Mock()
+        node_id = 'FAKE_NODE'
+        recover_action = {'operation': 'REBUILD'}
+
+        x_req = mock.Mock()
+        mock_req.return_value = x_req
+
+        x_action = {'action': 'RECOVER_ID1'}
+        mock_rpc.return_value = x_action
+
+        # do it
+        res = self.hm._recover_node(node_id, ctx, recover_action)
+
+        self.assertEqual(x_action, res)
+        mock_req.assert_called_once_with(
+            identity=node_id, params=recover_action)
+        mock_rpc.assert_called_once_with(ctx, 'node_recover', x_req)
+
+    @mock.patch.object(rpc_client.EngineClient, 'call')
+    @mock.patch('senlin.objects.NodeRecoverRequest', autospec=True)
+    def test_recover_node_failed(self, mock_req, mock_rpc):
+        ctx = mock.Mock()
+        node_id = 'FAKE_NODE'
+        recover_action = {'operation': 'REBUILD'}
+
+        x_req = mock.Mock()
+        mock_req.return_value = x_req
+
+        mock_rpc.side_effect = Exception('boom')
+
+        # do it
+        res = self.hm._recover_node(node_id, ctx, recover_action)
+
+        self.assertIsNone(res)
+        mock_req.assert_called_once_with(
+            identity=node_id, params=recover_action)
+        mock_rpc.assert_called_once_with(ctx, 'node_recover', x_req)
+
+    @mock.patch('senlin.objects.ActionGetRequest')
+    @mock.patch.object(rpc_client.EngineClient, 'call')
+    def test_wait_for_action(self, mock_rpc, mock_action_req):
+        x_req = mock.Mock()
+        mock_action_req.return_value = x_req
+
+        x_action = {'status': consts.ACTION_SUCCEEDED}
+        mock_rpc.return_value = x_action
+
+        ctx = mock.Mock()
+        action_id = 'FAKE_ACTION_ID'
+        timeout = 5
+
+        # do it
+        res, err = self.hm._wait_for_action(ctx, action_id, timeout)
+
+        self.assertTrue(res)
+        self.assertEqual(err, '')
+        mock_rpc.assert_called_with(ctx, 'action_get', x_req)
+
+    @mock.patch('senlin.objects.ActionGetRequest')
+    @mock.patch.object(rpc_client.EngineClient, 'call')
+    def test_wait_for_action_success_before_timeout(
+            self, mock_rpc, mock_action_req):
+        x_req = mock.Mock()
+        mock_action_req.return_value = x_req
+
+        x_action1 = {'status': consts.ACTION_RUNNING}
+        x_action2 = {'status': consts.ACTION_SUCCEEDED}
+        mock_rpc.side_effect = [x_action1, x_action2]
+
+        ctx = mock.Mock()
+        action_id = 'FAKE_ACTION_ID'
+        timeout = 5
+
+        # do it
+        res, err = self.hm._wait_for_action(ctx, action_id, timeout)
+
+        self.assertTrue(res)
+        self.assertEqual(err, '')
+        mock_rpc.assert_has_calls(
+            [
+                mock.call(ctx, 'action_get', x_req),
+                mock.call(ctx, 'action_get', x_req)
+            ]
+        )
+
+    @mock.patch('senlin.objects.ActionGetRequest')
+    @mock.patch.object(rpc_client.EngineClient, 'call')
+    def test_wait_for_action_timeout(self, mock_rpc, mock_action_req):
+        x_req = mock.Mock()
+        mock_action_req.return_value = x_req
+
+        x_action = {'status': consts.ACTION_RUNNING}
+        mock_rpc.return_value = x_action
+
+        ctx = mock.Mock()
+        action_id = 'FAKE_ACTION_ID'
+        timeout = 5
+
+        # do it
+        res, err = self.hm._wait_for_action(ctx, action_id, timeout)
+
+        self.assertFalse(res)
+        self.assertTrue(re.search('timeout', err, re.IGNORECASE))
+        mock_rpc.assert_has_calls(
+            [
+                mock.call(ctx, 'action_get', x_req)
+            ]
+        )
+
+    @mock.patch('senlin.objects.ActionGetRequest')
+    @mock.patch.object(rpc_client.EngineClient, 'call')
+    def test_wait_for_action_failed(self, mock_rpc, mock_action_req):
+        x_req = mock.Mock()
+        mock_action_req.return_value = x_req
+
+        x_action = {'status': consts.ACTION_FAILED}
+        mock_rpc.return_value = x_action
+
+        ctx = mock.Mock()
+        action_id = 'FAKE_ACTION_ID'
+        timeout = 5
+
+        # do it
+        res, err = self.hm._wait_for_action(ctx, action_id, timeout)
+
+        self.assertFalse(res)
+        self.assertEqual(err, 'Cluster check action failed or cancelled')
+        mock_rpc.assert_called_with(ctx, 'action_get', x_req)
+
+    @mock.patch('senlin.objects.ActionGetRequest')
+    @mock.patch.object(rpc_client.EngineClient, 'call')
+    def 
test_wait_for_action_cancelled(self, mock_rpc, mock_action_req): + x_req = mock.Mock() + mock_action_req.return_value = x_req + + x_action = {'status': consts.ACTION_CANCELLED} + mock_rpc.return_value = x_action + + ctx = mock.Mock() + action_id = 'FAKE_ACTION_ID' + timeout = 5 + + # do it + res, err = self.hm._wait_for_action(ctx, action_id, timeout) + + self.assertFalse(res) + self.assertEqual(err, 'Cluster check action failed or cancelled') + mock_rpc.assert_called_with(ctx, 'action_get', x_req) + + @mock.patch.object(obj_node.Node, 'get_all_by_cluster') + @mock.patch.object(hm.HealthManager, "_recover_node") + @mock.patch.object(hm.HealthManager, "_wait_for_action") + @mock.patch.object(obj_cluster.Cluster, 'get') + @mock.patch.object(context, 'get_service_context') + def test_execute_health_check_any_mode_healthy( + self, mock_ctx, mock_get, mock_wait, mock_recover, mock_nodes): + cluster_id = 'CLUSTER_ID' + interval = 1 + recovery_cond = consts.ANY_FAILED + node_update_timeout = 1 + recovery_action = {'operation': 'REBUILD'} + + x_cluster = mock.Mock(user='USER_ID', project='PROJECT_ID') + mock_get.return_value = x_cluster + + ctx = mock.Mock() + mock_ctx.return_value = ctx + + mock_wait.return_value = (True, "") + + x_node1 = mock.Mock(id='FAKE_NODE1', status="ERROR") + x_node2 = mock.Mock(id='FAKE_NODE2', status="ERROR") + mock_nodes.return_value = [x_node1, x_node2] + + hc_true = {'run_health_check.return_value': True} + + hc_test_values = [ + [ + mock.Mock(**hc_true), + mock.Mock(**hc_true), + mock.Mock(**hc_true), + ], + ] + + self.hm.cluster_id = cluster_id + + for hc_mocks in hc_test_values: + self.hm.health_check_types = { + cluster_id: hc_mocks + } + + mock_get.reset_mock() + mock_ctx.reset_mock() + mock_recover.reset_mock() + mock_wait.reset_mock() + + # do it + self.hm._execute_health_check(interval, cluster_id, + recovery_action, + recovery_cond, node_update_timeout) + + mock_get.assert_called_once_with(self.hm.ctx, 'CLUSTER_ID', + project_safe=False) + mock_ctx.assert_called_once_with(user_id=x_cluster.user, + project_id=x_cluster.project) + + for mock_hc in hc_mocks: + mock_hc.run_health_check.assert_has_calls( + [ + mock.call(ctx, x_node1), + mock.call(ctx, x_node2) + ] + ) + + mock_recover.assert_not_called() + mock_wait.assert_not_called() + + @mock.patch.object(obj_node.Node, 'get_all_by_cluster') + @mock.patch.object(hm.HealthManager, "_recover_node") + @mock.patch.object(hm.HealthManager, "_wait_for_action") + @mock.patch.object(obj_cluster.Cluster, 'get') + @mock.patch.object(context, 'get_service_context') + def test_execute_health_check_any_mode_unhealthy( + self, mock_ctx, mock_get, mock_wait, mock_recover, mock_nodes): + cluster_id = 'CLUSTER_ID' + interval = 1 + recovery_cond = consts.ANY_FAILED + node_update_timeout = 1 + recovery_action = {'operation': 'REBUILD'} + + x_cluster = mock.Mock(user='USER_ID', project='PROJECT_ID') + mock_get.return_value = x_cluster + + ctx = mock.Mock() + mock_ctx.return_value = ctx + + mock_wait.return_value = (True, "") + + x_node = mock.Mock(id='FAKE_NODE', status="ERROR") + mock_nodes.return_value = [x_node] + + mock_recover.return_value = {'action': 'FAKE_ACTION_ID'} + + hc_true = {'run_health_check.return_value': True} + hc_false = {'run_health_check.return_value': False} + + hc_test_values = [ + [ + mock.Mock(**hc_false), + mock.Mock(**hc_true), + mock.Mock(**hc_true), + ], + [ + mock.Mock(**hc_true), + mock.Mock(**hc_false), + mock.Mock(**hc_true), + ], + [ + mock.Mock(**hc_true), + mock.Mock(**hc_true), + 
mock.Mock(**hc_false),
+            ]
+        ]
+
+        for hc_mocks in hc_test_values:
+            self.hm.health_check_types = {
+                cluster_id: hc_mocks
+            }
+
+            mock_get.reset_mock()
+            mock_ctx.reset_mock()
+            mock_recover.reset_mock()
+            mock_wait.reset_mock()
+
+            # do it
+            self.hm._execute_health_check(interval, cluster_id,
+                                          recovery_action,
+                                          recovery_cond, node_update_timeout)
+
+            mock_get.assert_called_once_with(self.hm.ctx, 'CLUSTER_ID',
+                                             project_safe=False)
+            mock_ctx.assert_called_once_with(user_id=x_cluster.user,
+                                             project_id=x_cluster.project)
+
+            # health checks should be called until one of them returns false
+            previous_hc_returned_false = False
+            for mock_hc in hc_mocks:
+                if not previous_hc_returned_false:
+                    mock_hc.run_health_check.assert_called_once_with(
+                        ctx, x_node)
+                else:
+                    mock_hc.run_health_check.assert_not_called()
+                if not mock_hc.run_health_check.return_value:
+                    previous_hc_returned_false = True
+
+            mock_recover.assert_called_once_with('FAKE_NODE', ctx, mock.ANY)
+            mock_wait.assert_called_once_with(
+                ctx, 'FAKE_ACTION_ID', node_update_timeout)
+
+    @mock.patch.object(obj_node.Node, 'get_all_by_cluster')
+    @mock.patch.object(hm.HealthManager, "_recover_node")
+    @mock.patch.object(hm.HealthManager, "_wait_for_action")
+    @mock.patch.object(obj_cluster.Cluster, 'get')
+    @mock.patch.object(context, 'get_service_context')
+    def test_execute_health_check_all_mode_healthy(
+            self, mock_ctx, mock_get, mock_wait, mock_recover, mock_nodes):
+        cluster_id = 'CLUSTER_ID'
+        interval = 1
+        recovery_cond = consts.ALL_FAILED
+        node_update_timeout = 1
+        recovery_action = {'operation': 'REBUILD'}
+
+        x_cluster = mock.Mock(user='USER_ID', project='PROJECT_ID')
+        mock_get.return_value = x_cluster
+
+        ctx = mock.Mock()
+        mock_ctx.return_value = ctx
+
+        mock_wait.return_value = (True, "")
+
+        x_node = mock.Mock(id='FAKE_NODE1', status="ERROR")
+        mock_nodes.return_value = [x_node]
+
+        hc_true = {'run_health_check.return_value': True}
+        hc_false = {'run_health_check.return_value': False}
+
+        hc_test_values = [
+            [
+                mock.Mock(**hc_true),
+                mock.Mock(**hc_true),
+                mock.Mock(**hc_true),
+            ],
+            [
+                mock.Mock(**hc_false),
+                mock.Mock(**hc_true),
+                mock.Mock(**hc_true),
+            ],
+            [
+                mock.Mock(**hc_true),
+                mock.Mock(**hc_false),
+                mock.Mock(**hc_true),
+            ],
+            [
+                mock.Mock(**hc_true),
+                mock.Mock(**hc_true),
+                mock.Mock(**hc_false),
+            ],
+        ]
+
+        self.hm.cluster_id = cluster_id
+
+        for hc_mocks in hc_test_values:
+            self.hm.health_check_types = {
+                cluster_id: hc_mocks
+            }
+
+            mock_get.reset_mock()
+            mock_ctx.reset_mock()
+            mock_recover.reset_mock()
+            mock_wait.reset_mock()
+
+            # do it
+            self.hm._execute_health_check(interval, cluster_id,
+                                          recovery_action,
+                                          recovery_cond, node_update_timeout)
+
+            mock_get.assert_called_once_with(self.hm.ctx, 'CLUSTER_ID',
+                                             project_safe=False)
+            mock_ctx.assert_called_once_with(user_id=x_cluster.user,
+                                             project_id=x_cluster.project)
+
+            # health checks should be called until one of them returns true
+            previous_hc_returned_true = False
+            for mock_hc in hc_mocks:
+                if not previous_hc_returned_true:
+                    mock_hc.run_health_check.assert_called_once_with(
+                        ctx, x_node)
+                else:
+                    mock_hc.run_health_check.assert_not_called()
+                if mock_hc.run_health_check.return_value:
+                    previous_hc_returned_true = True
+
+            mock_recover.assert_not_called()
+            mock_wait.assert_not_called()
+
+    @mock.patch.object(obj_node.Node, 'get_all_by_cluster')
+    @mock.patch.object(hm.HealthManager, "_recover_node")
+    @mock.patch.object(hm.HealthManager, "_wait_for_action")
+    @mock.patch.object(obj_cluster.Cluster, 'get')
+    @mock.patch.object(context, 
'get_service_context') + def test_execute_health_check_all_mode_unhealthy( + self, mock_ctx, mock_get, mock_wait, mock_recover, mock_nodes): + cluster_id = 'CLUSTER_ID' + interval = 1 + recovery_cond = consts.ALL_FAILED + node_update_timeout = 1 + recovery_action = {'operation': 'REBUILD'} + + x_cluster = mock.Mock(user='USER_ID', project='PROJECT_ID') + mock_get.return_value = x_cluster + + ctx = mock.Mock() + mock_ctx.return_value = ctx + + mock_wait.return_value = (True, "") + + x_node = mock.Mock(id='FAKE_NODE', status="ERROR") + mock_nodes.return_value = [x_node] + + mock_recover.return_value = {'action': 'FAKE_ACTION_ID'} + + hc_false = {'run_health_check.return_value': False} + + hc_test_values = [ + [ + mock.Mock(**hc_false), + mock.Mock(**hc_false), + mock.Mock(**hc_false), + ] + ] + + self.hm.cluster_id = cluster_id + self.hm.node_update_timeout = 1 + + for hc_mocks in hc_test_values: + self.hm.health_check_types = { + cluster_id: hc_mocks + } + + mock_get.reset_mock() + mock_ctx.reset_mock() + mock_recover.reset_mock() + mock_wait.reset_mock() + + # do it + self.hm._execute_health_check(interval, cluster_id, + recovery_action, + recovery_cond, node_update_timeout) + + mock_get.assert_called_once_with(self.hm.ctx, 'CLUSTER_ID', + project_safe=False) + mock_ctx.assert_called_once_with(user_id=x_cluster.user, + project_id=x_cluster.project) + + # all health checks should be called + for mock_hc in hc_mocks: + mock_hc.run_health_check.assert_called_once_with(ctx, x_node) + + mock_recover.assert_called_once_with('FAKE_NODE', ctx, mock.ANY) + mock_wait.assert_called_once_with( + ctx, 'FAKE_ACTION_ID', self.hm.node_update_timeout) + + @mock.patch.object(obj_cluster.Cluster, 'get') + @mock.patch.object(context, 'get_service_context') + def test_execute_health_check_cluster_not_found(self, mock_ctx, mock_get): + cluster_id = 'CLUSTER_ID' + interval = 1 + recovery_cond = consts.ANY_FAILED + node_update_timeout = 1 + recovery_action = {'operation': 'REBUILD'} + + mock_get.return_value = None + + # do it + self.hm._execute_health_check(interval, cluster_id, + recovery_action, recovery_cond, + node_update_timeout) + + mock_ctx.assert_not_called() + + def test_start_check_invalid_type(self): + entry = { + 'cluster_id': 'CCID', + 'interval': 12, + 'check_type': 'blah', + 'params': { + 'recover_action': [{'name': 'REBUILD'}] + }, + } + + res = self.hm._start_check(entry) + + self.assertIsNone(res) + + @mock.patch.object(threadgroup.ThreadGroup, 'add_dynamic_timer') + @mock.patch.object(hm.HealthManager, '_add_health_check') + @mock.patch.object(hm.HealthCheckType, 'factory') + def test_start_check_for_polling(self, mock_hc_factory, mock_add_hc, + mock_add_timer): x_timer = mock.Mock() - mock_add_timer = self.patchobject(self.hm.TG, 'add_dynamic_timer', - return_value=x_timer) + mock_add_timer.return_value = x_timer entry = { 'cluster_id': 'CCID', 'interval': 12, 'check_type': consts.NODE_STATUS_POLLING, - 'params': {'recover_action': [{'name': 'REBUILD'}]}, + 'params': { + 'recover_action': [{'name': 'REBUILD'}], + 'recovery_conditional': 'ANY_FAILED', + 'node_update_timeout': 1, + }, } - recover_action = {'operation': 'REBUILD'} + res = self.hm._start_check(entry) expected = copy.deepcopy(entry) expected['timer'] = x_timer self.assertEqual(expected, res) mock_add_timer.assert_called_once_with( - self.hm._poll_cluster, None, None, 'CCID', 12, recover_action) + self.hm._execute_health_check, None, None, 12, 'CCID', + {'operation': 'REBUILD'}, 'ANY_FAILED', 1) + 
mock_add_hc.assert_called_once_with('CCID', mock.ANY) + mock_hc_factory.assert_called_once_with( + consts.NODE_STATUS_POLLING, 'CCID', 12, entry['params']) - def test_start_check_for_poll_url(self): + @mock.patch.object(threadgroup.ThreadGroup, 'add_dynamic_timer') + @mock.patch.object(hm.HealthManager, '_add_health_check') + @mock.patch.object(hm.HealthCheckType, 'factory') + def test_start_check_for_poll_url(self, mock_hc_factory, mock_add_hc, + mock_add_timer): x_timer = mock.Mock() - mock_add_timer = self.patchobject(self.hm.TG, 'add_dynamic_timer', - return_value=x_timer) + mock_add_timer.return_value = x_timer entry = { 'cluster_id': 'CCID', @@ -1179,23 +1606,69 @@ class TestHealthManager(base.SenlinTestCase): 'check_type': consts.NODE_STATUS_POLL_URL, 'params': { 'recover_action': [{'name': 'REBUILD'}], - 'node_delete_timeout': 23, - 'node_force_recreate': True + 'recovery_conditional': 'ANY_FAILED', + 'node_update_timeout': 1, }, } - recover_action = { - 'operation': 'REBUILD', - 'delete_timeout': 23, - 'force_recreate': True - } + res = self.hm._start_check(entry) expected = copy.deepcopy(entry) expected['timer'] = x_timer self.assertEqual(expected, res) mock_add_timer.assert_called_once_with( - self.hm._poll_url, None, None, 'CCID', 12, recover_action, - entry['params']) + self.hm._execute_health_check, None, None, 12, 'CCID', + {'operation': 'REBUILD'}, 'ANY_FAILED', 1) + mock_add_hc.assert_called_once_with('CCID', mock.ANY) + mock_hc_factory.assert_called_once_with( + consts.NODE_STATUS_POLL_URL, + 'CCID', 12, entry['params']) + + @mock.patch.object(threadgroup.ThreadGroup, 'add_dynamic_timer') + @mock.patch.object(hm.HealthManager, '_add_health_check') + @mock.patch.object(hm.HealthCheckType, 'factory') + def test_start_check_poll_url_and_polling(self, mock_hc_factory, + mock_add_hc, mock_add_timer): + x_timer = mock.Mock() + mock_add_timer.return_value = x_timer + + check_type = ','.join( + [consts.NODE_STATUS_POLL_URL, consts.NODE_STATUS_POLLING]) + entry = { + 'cluster_id': 'CCID', + 'interval': 12, + 'check_type': check_type, + 'params': { + 'recover_action': [{'name': 'REBUILD'}], + 'recovery_conditional': 'ALL_FAILED', + 'node_update_timeout': 1, + }, + } + + res = self.hm._start_check(entry) + + expected = copy.deepcopy(entry) + expected['timer'] = x_timer + self.assertEqual(expected, res) + mock_add_timer.assert_called_once_with( + self.hm._execute_health_check, None, None, 12, 'CCID', + {'operation': 'REBUILD'}, 'ALL_FAILED', 1) + mock_add_hc.assert_has_calls( + [ + mock.call('CCID', mock.ANY), + mock.call('CCID', mock.ANY) + ] + ) + mock_hc_factory.assert_has_calls( + [ + mock.call( + consts.NODE_STATUS_POLL_URL, 'CCID', 12, entry['params'] + ), + mock.call( + consts.NODE_STATUS_POLLING, 'CCID', 12, entry['params'] + ), + ] + ) def test_start_check_for_listening(self): x_listener = mock.Mock() @@ -1242,27 +1715,39 @@ class TestHealthManager(base.SenlinTestCase): def test_stop_check_with_timer(self): x_timer = mock.Mock() - entry = {'timer': x_timer} + entry = {'timer': x_timer, 'cluster_id': 'CLUSTER_ID'} mock_timer_done = self.patchobject(self.hm.TG, 'timer_done') + x_hc_types = mock.MagicMock() + x_hc_types.__contains__.return_value = True + x_hc_types.__iter__.return_value = ['CLUSTER_ID'] + self.hm.health_check_types = x_hc_types + # do it res = self.hm._stop_check(entry) self.assertIsNone(res) x_timer.stop.assert_called_once_with() mock_timer_done.assert_called_once_with(x_timer) + x_hc_types.pop.assert_called_once_with('CLUSTER_ID') def 
test_stop_check_with_listener(self): x_thread = mock.Mock() - entry = {'listener': x_thread} + entry = {'listener': x_thread, 'cluster_id': 'CLUSTER_ID'} mock_thread_done = self.patchobject(self.hm.TG, 'thread_done') + x_hc_types = mock.MagicMock() + x_hc_types.__contains__.return_value = False + x_hc_types.__iter__.return_value = ['CLUSTER_ID'] + self.hm.health_check_types = x_hc_types + # do it res = self.hm._stop_check(entry) self.assertIsNone(res) x_thread.stop.assert_called_once_with() mock_thread_done.assert_called_once_with(x_thread) + x_hc_types.pop.assert_not_called() @mock.patch('oslo_messaging.Target') def test_start(self, mock_target): @@ -1289,53 +1774,91 @@ class TestHealthManager(base.SenlinTestCase): cfg.CONF.periodic_interval, self.hm._dummy_task) @mock.patch.object(hr.HealthRegistry, 'create') - def test_register_cluster(self, mock_reg_create): + @mock.patch.object(hm.HealthManager, '_start_check') + def test_register_cluster(self, mock_check, mock_reg_create): + entry = { + 'cluster_id': 'CLUSTER_ID', + 'check_type': consts.NODE_STATUS_POLLING, + 'interval': 50, + 'params': { + 'blah': '123', + 'detection_modes': [ + { + 'type': consts.NODE_STATUS_POLLING, + 'poll_url': '', + 'poll_url_ssl_verify': True, + 'poll_url_conn_error_as_unhealthy': True, + 'poll_url_healthy_response': '', + 'poll_url_retry_limit': '', + 'poll_url_retry_interval': '', + } + ], + }, + 'enabled': True + } + ctx = mock.Mock() - timer = mock.Mock() - mock_add_tm = self.patchobject(self.hm.TG, 'add_dynamic_timer', - return_value=timer) - mock_poll = self.patchobject(self.hm, '_poll_cluster', - return_value=mock.Mock()) - x_reg = mock.Mock(cluster_id='CLUSTER_ID', - check_type=consts.NODE_STATUS_POLLING, - interval=50, params={}) + + x_reg = mock.Mock(cluster_id=entry['cluster_id'], + check_type=entry['check_type'], + interval=entry['interval'], params=entry['params'], + enabled=entry['enabled']) mock_reg_create.return_value = x_reg - self.hm.register_cluster(ctx, - cluster_id='CLUSTER_ID', - check_type=consts.NODE_STATUS_POLLING, - interval=50, enabled=True) + self.hm.register_cluster( + ctx, cluster_id=entry['cluster_id'], interval=entry['interval'], + node_update_timeout=1, params=entry['params'], + enabled=entry['enabled']) mock_reg_create.assert_called_once_with( - ctx, 'CLUSTER_ID', consts.NODE_STATUS_POLLING, 50, {}, 'ENGINE_ID', - enabled=True) - mock_add_tm.assert_called_with(mock_poll, None, None, 'CLUSTER_ID', 50, - {}) + ctx, entry['cluster_id'], consts.NODE_STATUS_POLLING, + entry['interval'], entry['params'], 'ENGINE_ID', + enabled=entry['enabled']) + mock_check.assert_called_once_with(entry) self.assertEqual(1, len(self.hm.registries)) @mock.patch.object(hr.HealthRegistry, 'create') - def test_register_cluster_not_enabled(self, mock_reg_create): + @mock.patch.object(hm.HealthManager, '_start_check') + def test_register_cluster_not_enabled(self, mock_check, mock_reg_create): + entry = { + 'cluster_id': 'CLUSTER_ID', + 'check_type': consts.NODE_STATUS_POLLING, + 'interval': 50, + 'params': { + 'blah': '123', + 'detection_modes': [ + { + 'type': consts.NODE_STATUS_POLLING, + 'poll_url': '', + 'poll_url_ssl_verify': True, + 'poll_url_conn_error_as_unhealthy': True, + 'poll_url_healthy_response': '', + 'poll_url_retry_limit': '', + 'poll_url_retry_interval': '', + } + ], + }, + 'enabled': False + } + ctx = mock.Mock() - timer = mock.Mock() - mock_add_tm = self.patchobject(self.hm.TG, 'add_dynamic_timer', - return_value=timer) - mock_poll = self.patchobject(self.hm, '_poll_cluster', - 
return_value=mock.Mock()) - x_reg = mock.Mock(cluster_id='CLUSTER_ID', - check_type=consts.NODE_STATUS_POLLING, - interval=50, params={}, enabled=False) + + x_reg = mock.Mock(cluster_id=entry['cluster_id'], + check_type=entry['check_type'], + interval=entry['interval'], params=entry['params'], + enabled=entry['enabled']) mock_reg_create.return_value = x_reg - self.hm.register_cluster(ctx, - cluster_id='CLUSTER_ID', - check_type=consts.NODE_STATUS_POLLING, - interval=50, enabled=x_reg.enabled) + self.hm.register_cluster( + ctx, cluster_id=entry['cluster_id'], interval=entry['interval'], + node_update_timeout=1, params=entry['params'], + enabled=entry['enabled']) mock_reg_create.assert_called_once_with( - ctx, 'CLUSTER_ID', consts.NODE_STATUS_POLLING, 50, {}, 'ENGINE_ID', - enabled=False) - mock_add_tm.assert_not_called() - mock_poll.assert_not_called() + ctx, entry['cluster_id'], consts.NODE_STATUS_POLLING, + entry['interval'], entry['params'], 'ENGINE_ID', + enabled=entry['enabled']) + mock_check.assert_not_called() self.assertEqual(1, len(self.hm.registries)) @mock.patch.object(hm.HealthManager, '_stop_check') diff --git a/senlin/tests/unit/policies/test_health_policy.py b/senlin/tests/unit/policies/test_health_policy.py index 01d62e62b..76317d36b 100644 --- a/senlin/tests/unit/policies/test_health_policy.py +++ b/senlin/tests/unit/policies/test_health_policy.py @@ -10,6 +10,7 @@ # License for the specific language governing permissions and limitations # under the License. +from collections import namedtuple import copy import mock @@ -35,13 +36,15 @@ class TestHealthPolicy(base.SenlinTestCase): self.spec = { 'type': 'senlin.policy.health', - 'version': '1.0', + 'version': '1.1', 'properties': { 'detection': { - 'type': 'NODE_STATUS_POLLING', - 'options': { - 'interval': 60 - } + "detection_modes": [ + { + 'type': 'NODE_STATUS_POLLING' + }, + ], + 'interval': 60 }, 'recovery': { 'fencing': ['COMPUTE'], @@ -62,13 +65,94 @@ class TestHealthPolicy(base.SenlinTestCase): self.hp = health_policy.HealthPolicy('test-policy', self.spec) def test_policy_init(self): - self.assertIsNone(self.hp.id) - self.assertEqual('test-policy', self.hp.name) - self.assertEqual('senlin.policy.health-1.0', self.hp.type) - self.assertEqual('NODE_STATUS_POLLING', self.hp.check_type) - self.assertEqual(60, self.hp.interval) + DetectionMode = namedtuple( + 'DetectionMode', + [self.hp.DETECTION_TYPE] + list(self.hp._DETECTION_OPTIONS)) + + detection_modes = [ + DetectionMode( + type='NODE_STATUS_POLLING', + poll_url='', + poll_url_ssl_verify=True, + poll_url_conn_error_as_unhealthy=True, + poll_url_healthy_response='', + poll_url_retry_limit='', + poll_url_retry_interval='' + ) + ] + + spec = { + 'type': 'senlin.policy.health', + 'version': '1.1', + 'properties': { + 'detection': { + "detection_modes": [ + { + 'type': 'NODE_STATUS_POLLING' + }, + ], + 'interval': 60 + }, + 'recovery': { + 'fencing': ['COMPUTE'], + 'actions': [ + {'name': 'REBUILD'} + ] + } + } + } + + hp = health_policy.HealthPolicy('test-policy', spec) + + self.assertIsNone(hp.id) + self.assertEqual('test-policy', hp.name) + self.assertEqual('senlin.policy.health-1.1', hp.type) + self.assertEqual(detection_modes, hp.detection_modes) + self.assertEqual(60, hp.interval) self.assertEqual([{'name': 'REBUILD', 'params': None}], - self.hp.recover_actions) + hp.recover_actions) + + def test_policy_init_ops(self): + spec = { + 'type': 'senlin.policy.health', + 'version': '1.1', + 'properties': { + 'detection': { + "detection_modes": [ + { + 'type': 
'NODE_STATUS_POLLING' + }, + { + 'type': 'NODE_STATUS_POLL_URL' + }, + ], + 'interval': 60 + }, + 'recovery': { + 'fencing': ['COMPUTE'], + 'actions': [ + {'name': 'REBUILD'} + ] + } + } + } + + operations = [None, 'ALL_FAILED', 'ANY_FAILED'] + for op in operations: + # set operation in spec + if op: + spec['properties']['detection']['recovery_conditional'] = op + + # test __init__ + hp = health_policy.HealthPolicy('test-policy', spec) + + # check result + self.assertIsNone(hp.id) + self.assertEqual('test-policy', hp.name) + self.assertEqual('senlin.policy.health-1.1', hp.type) + self.assertEqual(60, hp.interval) + self.assertEqual([{'name': 'REBUILD', 'params': None}], + hp.recover_actions) def test_validate(self): spec = copy.deepcopy(self.spec) @@ -86,7 +170,7 @@ class TestHealthPolicy(base.SenlinTestCase): def test_validate_valid_interval(self): spec = copy.deepcopy(self.spec) - spec["properties"]["detection"]["options"]["interval"] = 20 + spec["properties"]["detection"]["interval"] = 20 self.hp = health_policy.HealthPolicy('test-policy', spec) cfg.CONF.set_override('health_check_interval_min', 20) @@ -95,7 +179,7 @@ class TestHealthPolicy(base.SenlinTestCase): def test_validate_invalid_interval(self): spec = copy.deepcopy(self.spec) - spec["properties"]["detection"]["options"]["interval"] = 10 + spec["properties"]["detection"]["interval"] = 10 self.hp = health_policy.HealthPolicy('test-policy', spec) cfg.CONF.set_override('health_check_interval_min', 20) @@ -116,19 +200,24 @@ class TestHealthPolicy(base.SenlinTestCase): policy_data = { 'HealthPolicy': { 'data': { - 'check_type': self.hp.check_type, 'interval': self.hp.interval, - 'poll_url': '', - 'poll_url_ssl_verify': True, - 'poll_url_conn_error_as_unhealthy': True, - 'poll_url_healthy_response': '', - 'poll_url_retry_limit': 3, - 'poll_url_retry_interval': 3, + 'detection_modes': [ + { + 'type': 'NODE_STATUS_POLLING', + 'poll_url': '', + 'poll_url_ssl_verify': True, + 'poll_url_conn_error_as_unhealthy': True, + 'poll_url_healthy_response': '', + 'poll_url_retry_limit': '', + 'poll_url_retry_interval': '' + } + ], 'node_update_timeout': 300, 'node_delete_timeout': 20, - 'node_force_recreate': False + 'node_force_recreate': False, + 'recovery_conditional': 'ANY_FAILED' }, - 'version': '1.0' + 'version': '1.1' } } @@ -136,19 +225,24 @@ class TestHealthPolicy(base.SenlinTestCase): self.assertTrue(res) self.assertEqual(policy_data, data) kwargs = { - 'check_type': self.hp.check_type, 'interval': self.hp.interval, + 'node_update_timeout': 300, 'params': { 'recover_action': self.hp.recover_actions, - 'poll_url': '', - 'poll_url_ssl_verify': True, - 'poll_url_conn_error_as_unhealthy': True, - 'poll_url_healthy_response': '', - 'poll_url_retry_limit': 3, - 'poll_url_retry_interval': 3, - 'node_update_timeout': 300, 'node_delete_timeout': 20, - 'node_force_recreate': False + 'node_force_recreate': False, + 'recovery_conditional': 'ANY_FAILED', + 'detection_modes': [ + { + 'type': 'NODE_STATUS_POLLING', + 'poll_url': '', + 'poll_url_ssl_verify': True, + 'poll_url_conn_error_as_unhealthy': True, + 'poll_url_healthy_response': '', + 'poll_url_retry_limit': '', + 'poll_url_retry_interval': '' + } + ], }, 'enabled': True } diff --git a/setup.cfg b/setup.cfg index a8b0cd311..484949f11 100644 --- a/setup.cfg +++ b/setup.cfg @@ -57,6 +57,7 @@ senlin.policies = senlin.policy.deletion-1.1 = senlin.policies.deletion_policy:DeletionPolicy senlin.policy.scaling-1.0 = senlin.policies.scaling_policy:ScalingPolicy senlin.policy.health-1.0 = 
senlin.policies.health_policy:HealthPolicy + senlin.policy.health-1.1 = senlin.policies.health_policy:HealthPolicy senlin.policy.loadbalance-1.0 = senlin.policies.lb_policy:LoadBalancingPolicy senlin.policy.loadbalance-1.1 = senlin.policies.lb_policy:LoadBalancingPolicy senlin.policy.region_placement-1.0 = senlin.policies.region_placement:RegionPlacementPolicy
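-- 
Note for reviewers (outside the applied diff): a minimal sketch of the
recovery_conditional semantics that the test_execute_health_check_* cases
above encode. The helper name node_is_healthy and its signature are
hypothetical illustrations, not the engine's actual code path, which lives
in HealthManager._execute_health_check.

    # Sketch only, assuming each health check object exposes
    # run_health_check(ctx, node) -> bool, as mocked in the tests above.
    def node_is_healthy(health_checks, ctx, node, recovery_conditional):
        if recovery_conditional == 'ANY_FAILED':
            # all() short-circuits at the first failing check, matching the
            # "called until one of them returns false" assertions.
            return all(hc.run_health_check(ctx, node)
                       for hc in health_checks)
        if recovery_conditional == 'ALL_FAILED':
            # any() short-circuits at the first passing check, matching the
            # "called until one of them returns true" assertions.
            return any(hc.run_health_check(ctx, node)
                       for hc in health_checks)
        raise ValueError('Unknown recovery_conditional: %s'
                         % recovery_conditional)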