diff --git a/senlin/api/common/wsgi.py b/senlin/api/common/wsgi.py index bafb1ffa7..0ed62868a 100644 --- a/senlin/api/common/wsgi.py +++ b/senlin/api/common/wsgi.py @@ -652,7 +652,7 @@ class Resource(object): except exception.SenlinException as err: raise translate_exception(err, request.best_match_language()) except Exception as err: - log_exception(err, sys.exc_info()) + log_exception(err) raise translate_exception(err, request.best_match_language()) try: @@ -830,9 +830,8 @@ class Controller(object): raise exc.HTTPNotFound() -def log_exception(err, exc_info): - args = {'exc_info': exc_info} - LOG.error("Unexpected error occurred serving API: %s", err, **args) +def log_exception(err): + LOG.error("Unexpected error occurred serving API: %s", err) def translate_exception(ex, locale): diff --git a/senlin/common/config.py b/senlin/common/config.py index d0d88df1f..5cfa7bdf9 100755 --- a/senlin/common/config.py +++ b/senlin/common/config.py @@ -110,6 +110,9 @@ engine_opts = [ cfg.IntOpt('scheduler_thread_pool_size', default=1000, help=_('Maximum number of threads to use for scheduler.')), + cfg.IntOpt('health_manager_thread_pool_size', + default=1000, + help=_('Maximum number of threads to use for health manager.')), ] cfg.CONF.register_opts(engine_opts) diff --git a/senlin/common/consts.py b/senlin/common/consts.py index 7b90db9d6..03739b9fe 100755 --- a/senlin/common/consts.py +++ b/senlin/common/consts.py @@ -290,6 +290,12 @@ DETECTION_TYPES = ( # 'LB_STATUS_POLLING', ) +HEALTH_CHECK_TYPES = ( + EVENTS, POLLING, +) = ( + 'EVENTS', 'POLLING' +) + RECOVERY_ACTIONS = ( RECOVER_REBOOT, RECOVER_REBUILD, RECOVER_RECREATE, ) = ( diff --git a/senlin/drivers/os_test/nova_v2.py b/senlin/drivers/os_test/nova_v2.py index 655991064..e525f55c9 100644 --- a/senlin/drivers/os_test/nova_v2.py +++ b/senlin/drivers/os_test/nova_v2.py @@ -243,6 +243,9 @@ class NovaClient(base.DriverBase): def server_delete(self, server, ignore_missing=True): return + def server_stop(self, server): + return + def server_force_delete(self, server, ignore_missing=True): return diff --git a/senlin/engine/health_manager.py b/senlin/engine/health_manager.py index 4537d2417..dcf5cdac4 100644 --- a/senlin/engine/health_manager.py +++ b/senlin/engine/health_manager.py @@ -19,6 +19,7 @@ health policies. 
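+ # NOTE: a hedged reading of this hunk: eventlet is imported below to provide + # the cooperative sleep used while waiting for node recovery actions to finish.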
from collections import defaultdict from collections import namedtuple +import eventlet from oslo_config import cfg from oslo_log import log as logging import oslo_messaging as messaging @@ -27,7 +28,6 @@ from oslo_service import threadgroup from oslo_utils import timeutils import re import tenacity -import time from senlin.common import consts from senlin.common import context @@ -300,7 +300,7 @@ class NodePollStatusHealthCheck(HealthCheckType): class NodePollUrlHealthCheck(HealthCheckType): @staticmethod - def _convert_detection_tuple(dictionary): + def convert_detection_tuple(dictionary): return namedtuple('DetectionMode', dictionary.keys())(**dictionary) def _expand_url_template(self, url_template, node): @@ -333,23 +333,23 @@ class NodePollUrlHealthCheck(HealthCheckType): verify=verify_ssl) except Exception as ex: if conn_error_as_unhealthy: - LOG.info('%s for %s: connection error when polling URL (%s)', + LOG.info("%s for %s: connection error when polling URL (%s)", consts.POLL_URL_FAIL, node.name, ex) return False else: - LOG.info('%s for %s: ignoring connection error when polling ' - 'URL (%s)', + LOG.info("%s for %s: ignoring connection error when polling " + "URL (%s)", consts.POLL_URL_PASS, node.name, ex) return True if not re.search(expected_resp_str, result): - LOG.info('%s for %s: did not find expected response string %s in ' - 'URL result (%s)', + LOG.info("%s for %s: did not find expected response string %s in " + "URL result (%s)", consts.POLL_URL_FAIL, node.name, expected_resp_str, result) return False - LOG.info('%s for %s: matched expected response string.', + LOG.info("%s for %s: matched expected response string.", consts.POLL_URL_PASS, node.name) return True @@ -377,8 +377,8 @@ class NodePollUrlHealthCheck(HealthCheckType): try: if node.status != consts.NS_ACTIVE: - LOG.info('%s for %s: node is not in ACTIVE state, so skip ' - 'poll url', + LOG.info("%s for %s: node is not in ACTIVE state, so skip " + "poll url", consts.POLL_URL_PASS, node.name) return True @@ -395,7 +395,7 @@ class NodePollUrlHealthCheck(HealthCheckType): self._node_within_grace_period(node)) except Exception as ex: LOG.warning( - '%s for %s: Ignoring error on poll URL: %s', + "%s for %s: Ignoring error on poll URL: %s", consts.POLL_URL_PASS, node.name, ex ) @@ -403,44 +403,352 @@ class NodePollUrlHealthCheck(HealthCheckType): return True -class HealthManager(service.Service): +class HealthCheck(object): - def __init__(self, engine_service, topic, version): - super(HealthManager, self).__init__() - - self.TG = threadgroup.ThreadGroup() - self.engine_id = engine_service.engine_id - self.topic = topic - self.version = version - self.ctx = context.get_admin_context() + def __init__(self, ctx, engine_id, cluster_id, check_type, interval, + node_update_timeout, params, enabled): self.rpc_client = rpc_client.EngineClient() - self.rt = { - 'registries': [], - } - self.health_check_types = defaultdict(lambda: []) + self.ctx = ctx + self.engine_id = engine_id - def task(self): - """Task that is queued on the health manager thread group. + self.cluster_id = cluster_id + self.check_type = check_type + self.interval = interval + self.node_update_timeout = node_update_timeout + self.params = params + self.enabled = enabled + self.timer = None + self.listener = None - The task is here so that the service always has something to wait() - on, or else the process will exit. 
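+ # The timer (for polling-based checks) or listener (for lifecycle events) + # is attached later by RuntimeHealthRegistry when this check is scheduled.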
- """ + self.health_check_types = [] + self.recover_action = {} + self.type = None + self.get_health_check_types() + self.get_recover_actions() + + def get_health_check_types(self): + polling_types = [consts.NODE_STATUS_POLLING, + consts.NODE_STATUS_POLL_URL] + + detection_types = self.check_type.split(',') + if all(check in polling_types for check in detection_types): + interval = min(self.interval, cfg.CONF.check_interval_max) + for check in detection_types: + self.health_check_types.append( + HealthCheckType.factory( + check, self.cluster_id, interval, self.params + ) + ) + self.type = consts.POLLING + elif (len(detection_types) == 1 and + detection_types[0] == consts.LIFECYCLE_EVENTS): + self.type = consts.EVENTS + + def get_recover_actions(self): + if 'node_delete_timeout' in self.params: + self.recover_action['delete_timeout'] = self.params[ + 'node_delete_timeout'] + if 'node_force_recreate' in self.params: + self.recover_action['force_recreate'] = self.params[ + 'node_force_recreate'] + if 'recover_action' in self.params: + rac = self.params['recover_action'] + for operation in rac: + self.recover_action['operation'] = operation.get('name') + + def execute_health_check(self): start_time = timeutils.utcnow(True) try: - self._load_runtime_registry() + if not self.health_check_types: + LOG.error("No health check types found for cluster: %s", + self.cluster_id) + return _chase_up(start_time, self.interval) + + cluster = objects.Cluster.get(self.ctx, self.cluster_id, + project_safe=False) + if not cluster: + LOG.warning("Cluster (%s) is not found.", self.cluster_id) + return _chase_up(start_time, self.interval) + + ctx = context.get_service_context(user_id=cluster.user, + project_id=cluster.project) + + actions = [] + + # loop through nodes and run all health checks on each node + nodes = objects.Node.get_all_by_cluster(ctx, self.cluster_id) + + for node in nodes: + action = self._check_node_health(ctx, node, cluster) + if action: + actions.append(action) + + for a in actions: + # wait for action to complete + res, reason = self._wait_for_action( + ctx, a['action'], self.node_update_timeout) + if not res: + LOG.warning("Node recovery action %s did not complete " + "within specified timeout: %s", a['action'], + reason) + + if len(actions) == 0: + LOG.info("Health check passed for all nodes in cluster %s.", + self.cluster_id) except Exception as ex: - LOG.error("Failed when running '_load_runtime_registry': %s", ex) - return _chase_up(start_time, cfg.CONF.periodic_interval, - name='Health manager task') + LOG.warning("Error while performing health check: %s", ex) - def _add_listener(self, cluster_id, recover_action): - """Routine to be executed for adding cluster listener. + finally: + return _chase_up(start_time, self.interval) - :param cluster_id: The UUID of the cluster to be filtered. - :param recover_action: The health policy action name. - :returns: Nothing. + def _check_node_health(self, ctx, node, cluster): + node_is_healthy = True + + if self.params['recovery_conditional'] == consts.ANY_FAILED: + # recovery happens if any detection mode fails + # i.e. the inverse logic is that node is considered healthy + # if all detection modes pass + node_is_healthy = all( + hc.run_health_check(ctx, node) + for hc in self.health_check_types) + elif self.params['recovery_conditional'] == consts.ALL_FAILED: + # recovery happens if all detection modes fail + # i.e. 
the inverse logic is that node is considered healthy + # if any detection mode passes + node_is_healthy = any( + hc.run_health_check(ctx, node) + for hc in self.health_check_types) + else: + raise Exception("%s is an invalid recovery conditional" % + self.params['recovery_conditional']) + + if not node_is_healthy: + LOG.info("Health check failed for %s in %s and " + "recovery has started.", + node.name, cluster.name) + return self._recover_node(ctx, node.id) + + def _wait_for_action(self, ctx, action_id, timeout): + req = objects.ActionGetRequest(identity=action_id) + action = {} + with timeutils.StopWatch(timeout) as timeout_watch: + while not timeout_watch.expired(): + action = self.rpc_client.call(ctx, 'action_get', req) + if action['status'] in [consts.ACTION_SUCCEEDED, + consts.ACTION_FAILED, + consts.ACTION_CANCELLED]: + break + eventlet.sleep(2) + + if not action: + return False, "Failed to retrieve action." + + elif action['status'] == consts.ACTION_SUCCEEDED: + return True, "" + + elif (action['status'] == consts.ACTION_FAILED or + action['status'] == consts.ACTION_CANCELLED): + return False, "Cluster check action failed or cancelled" + + return False, ("Timeout while waiting for node recovery action to " + "finish") + + def _recover_node(self, ctx, node_id): + """Recover node + + :returns: Recover action """ + try: + req = objects.NodeRecoverRequest(identity=node_id, + params=self.recover_action) + + return self.rpc_client.call(ctx, 'node_recover', req) + except Exception as ex: + LOG.error("Error when performing node recovery for %s: %s", + node_id, ex) + return None + + def db_create(self): + try: + objects.HealthRegistry.create( + self.ctx, self.cluster_id, self.check_type, self.interval, + self.params, self.engine_id, self.enabled) + return True + except Exception as ex: + LOG.error("Error while adding health entry for cluster %s to " + "database: %s", self.cluster_id, ex) + return False + + def db_delete(self): + try: + objects.HealthRegistry.delete(self.ctx, self.cluster_id) + return True + except Exception as ex: + LOG.error("Error while removing health entry for cluster %s from " + "database: %s", self.cluster_id, ex) + return False + + def enable(self): + try: + objects.HealthRegistry.update(self.ctx, self.cluster_id, + {'enabled': True}) + self.enabled = True + return True + except Exception as ex: + LOG.error("Error while enabling health entry for cluster %s: %s", + self.cluster_id, ex) + return False + + def disable(self): + try: + objects.HealthRegistry.update(self.ctx, self.cluster_id, + {'enabled': False}) + self.enabled = False + return True + except Exception as ex: + LOG.error("Error while disabling health entry for cluster %s: %s", + self.cluster_id, ex) + return False + + +class RuntimeHealthRegistry(object): + + def __init__(self, ctx, engine_id, thread_group): + self.ctx = ctx + self.engine_id = engine_id + self.rt = {} + self.TG = thread_group + self.health_check_types = defaultdict(lambda: []) + + @property + def registries(self): + return self.rt + + def register_cluster(self, cluster_id, interval=None, + node_update_timeout=None, params=None, + enabled=True): + """Register cluster to health registry. + + :param cluster_id: The ID of the cluster to be registered. + :param interval: An optional integer indicating the length of checking + periods in seconds. + :param node_update_timeout: Timeout to wait for node action to + complete. + :param dict params: Other parameters for the health check. 
+ :param enabled: Boolean indicating if the health check is enabled. + :return: None + """ + params = params or {} + + # extract check_type from params + check_type = "" + if 'detection_modes' in params: + check_type = ','.join([ + NodePollUrlHealthCheck.convert_detection_tuple(d).type + for d in params['detection_modes'] + ]) + + # add node_update_timeout to params + params['node_update_timeout'] = node_update_timeout + entry = None + try: + entry = HealthCheck( + ctx=self.ctx, + engine_id=self.engine_id, + cluster_id=cluster_id, + check_type=check_type, + interval=interval, + node_update_timeout=node_update_timeout, + params=params, + enabled=enabled + ) + if entry.db_create(): + self.registries[cluster_id] = entry + self.add_health_check(self.registries[cluster_id]) + except Exception as ex: + LOG.error("Error while trying to register cluster for health " + "checks %s: %s", cluster_id, ex) + if entry: + entry.db_delete() + + def unregister_cluster(self, cluster_id): + """Unregister a cluster from health registry. + + :param cluster_id: The ID of the cluster to be unregistered. + :return: None + """ + entry = None + try: + if cluster_id in self.registries: + entry = self.registries.pop(cluster_id) + entry.db_delete() + + except Exception as ex: + LOG.error("Error while trying to unregister cluster from health " + "checks %s: %s", cluster_id, ex) + finally: + if entry: + self.remove_health_check(entry) + + def enable_cluster(self, cluster_id): + """Update the status of a cluster to enabled in the health registry. + + :param cluster_id: The ID of the cluster to be enabled. + """ + LOG.info("Enabling health check for cluster %s.", cluster_id) + try: + if cluster_id in self.registries: + if self.registries[cluster_id].enable(): + self.add_health_check(self.registries[cluster_id]) + else: + LOG.error("Unable to enable cluster for health checking: %s", + cluster_id) + except Exception as ex: + LOG.error("Error while enabling health checks for cluster %s: %s", + cluster_id, ex) + if cluster_id in self.registries: + self.remove_health_check(self.registries[cluster_id]) + + def disable_cluster(self, cluster_id): + """Update the status of a cluster to disabled in the health registry. + + :param cluster_id: The ID of the cluster to be disabled. + :return: None.
+ """ + LOG.info("Disabling health check for cluster %s.", cluster_id) + try: + if cluster_id in self.registries: + self.registries[cluster_id].disable() + else: + LOG.error("Unable to disable cluster for health checking: %s", + cluster_id) + except Exception as ex: + LOG.error("Error while disabling health checks for cluster %s: %s", + cluster_id, ex) + finally: + if cluster_id in self.registries: + self.remove_health_check(self.registries[cluster_id]) + + def _add_timer(self, cluster_id): + entry = self.registries[cluster_id] + if entry.timer: + LOG.error("Health check for cluster %s already exists", cluster_id) + return None + timer = self.TG.add_dynamic_timer(entry.execute_health_check, None, + None) + if timer: + entry.timer = timer + else: + LOG.error("Error creating timer for cluster: %s", cluster_id) + + def _add_listener(self, cluster_id): + entry = self.registries[cluster_id] + if entry.listener: + LOG.error("Listener for cluster %s already exists", cluster_id) + return + cluster = objects.Cluster.get(self.ctx, cluster_id, project_safe=False) if not cluster: LOG.warning("Cluster (%s) is not found.", cluster_id) @@ -453,237 +761,136 @@ class HealthManager(service.Service): elif profile_type == 'os.heat.stack': exchange = cfg.CONF.health_manager.heat_control_exchange else: - return None + return project = cluster.project - return self.TG.add_thread(ListenerProc, exchange, project, cluster_id, - recover_action) - - def _recover_node(self, node_id, ctx, recover_action): - """Recover node - - :returns: Recover action - """ - try: - req = objects.NodeRecoverRequest(identity=node_id, - params=recover_action) - - return self.rpc_client.call(ctx, 'node_recover', req) - except Exception as ex: - LOG.error('Error when performing node recovery for %s: %s', - node_id, ex) - return None - - def _wait_for_action(self, ctx, action_id, timeout): - req = objects.ActionGetRequest(identity=action_id) - with timeutils.StopWatch(timeout) as timeout_watch: - while not timeout_watch.expired(): - action = self.rpc_client.call(ctx, 'action_get', req) - if action['status'] in [ - consts.ACTION_SUCCEEDED, consts.ACTION_FAILED, - consts.ACTION_CANCELLED]: - break - time.sleep(2) - - if action['status'] == consts.ACTION_SUCCEEDED: - return True, "" - - if (action['status'] == consts.ACTION_FAILED or - action['status'] == consts.ACTION_CANCELLED): - return False, "Cluster check action failed or cancelled" - - return False, ("Timeout while waiting for node recovery action to " - "finish") - - def _add_health_check(self, cluster_id, health_check): - self.health_check_types[cluster_id].append(health_check) - - def _execute_health_check(self, interval, cluster_id, - recover_action, recovery_cond, - node_update_timeout): - start_time = timeutils.utcnow(True) - - try: - if cluster_id not in self.health_check_types: - LOG.error("Cluster (%s) is not found in health_check_types.", - self.cluster_id) - return _chase_up(start_time, interval) - - if len(self.health_check_types[cluster_id]) == 0: - LOG.error("No health check types found for Cluster (%s).", - self.cluster_id) - return _chase_up(start_time, interval) - - cluster = objects.Cluster.get(self.ctx, cluster_id, - project_safe=False) - if not cluster: - LOG.warning("Cluster (%s) is not found.", self.cluster_id) - return _chase_up(start_time, interval) - - ctx = context.get_service_context(user_id=cluster.user, - project_id=cluster.project) - - actions = [] - - # loop through nodes and run all health checks on each node - nodes = objects.Node.get_all_by_cluster(ctx, 
cluster_id) - - for node in nodes: - node_is_healthy = True - - if recovery_cond == consts.ANY_FAILED: - # recovery happens if any detection mode fails - # i.e. the inverse logic is that node is considered healthy - # if all detection modes pass - node_is_healthy = all( - hc.run_health_check(ctx, node) - for hc in self.health_check_types[cluster_id]) - elif recovery_cond == consts.ALL_FAILED: - # recovery happens if all detection modes fail - # i.e. the inverse logic is that node is considered healthy - # if any detection mode passes - node_is_healthy = any( - hc.run_health_check(ctx, node) - for hc in self.health_check_types[cluster_id]) - else: - raise Exception( - '{} is an invalid recovery conditional'.format( - recovery_cond)) - - if not node_is_healthy: - LOG.info("Health check failed for %s in %s and " - "recovery has started.", - node.name, cluster.name) - action = self._recover_node(node.id, ctx, - recover_action) - actions.append(action) - - for a in actions: - # wait for action to complete - res, reason = self._wait_for_action( - ctx, a['action'], node_update_timeout) - if not res: - LOG.warning("Node recovery action %s did not complete " - "within specified timeout: %s", a['action'], - reason) - - if len(actions) == 0: - LOG.info('Health check passed for all nodes in cluster %s.', - cluster_id) - except Exception as ex: - LOG.warning('Error while performing health check: %s', ex) - - return _chase_up(start_time, interval) - - def _start_check(self, entry): - """Routine for starting the checking for a cluster. - - :param entry: A dict containing the data associated with the cluster. - :returns: An updated registry entry record. - """ - LOG.info('Enabling health check for cluster %s.', entry['cluster_id']) - - cid = entry['cluster_id'] - ctype = entry['check_type'] - # Get the recover action parameter from the entry params - params = entry['params'] - - recover_action = {} - if 'node_delete_timeout' in params: - recover_action['delete_timeout'] = params['node_delete_timeout'] - if 'node_force_recreate' in params: - recover_action['force_recreate'] = params['node_force_recreate'] - if 'recover_action' in params: - rac = params['recover_action'] - for operation in rac: - recover_action['operation'] = operation.get('name') - - polling_types = [consts.NODE_STATUS_POLLING, - consts.NODE_STATUS_POLL_URL] - - detection_types = ctype.split(',') - if all(check in polling_types for check in detection_types): - interval = min(entry['interval'], cfg.CONF.check_interval_max) - for check in ctype.split(','): - self._add_health_check(cid, HealthCheckType.factory( - check, cid, interval, params)) - timer = self.TG.add_dynamic_timer(self._execute_health_check, - None, None, interval, cid, - recover_action, - params['recovery_conditional'], - params['node_update_timeout']) - - entry['timer'] = timer - elif (len(detection_types) == 1 and - detection_types[0] == consts.LIFECYCLE_EVENTS): - LOG.info("Start listening events for cluster (%s).", cid) - listener = self._add_listener(cid, recover_action) - if listener: - entry['listener'] = listener - else: - LOG.warning("Error creating listener for cluster %s", cid) - return None + listener = self.TG.add_thread(ListenerProc, exchange, project, + cluster_id, entry.recover_action) + if listener: + entry.listener = listener else: - LOG.error("Cluster %(id)s check type %(type)s is invalid.", - {'id': cid, 'type': ctype}) - return None + LOG.error("Error creating listener for cluster: %s", cluster_id) - return entry + def add_health_check(self, entry): + 
"""Add a health check to the RuntimeHealthRegistry. - def _stop_check(self, entry): - """Routine for stopping the checking for a cluster. + This method creates a timer/thread based on the type of health check + being added. - :param entry: A dict containing the data associated with the cluster. - :returns: ``None``. + :param entry: Entry to add to the registry. + :return: None """ - LOG.info('Disabling health check for cluster %s.', entry['cluster_id']) + if entry.cluster_id in self.registries: + if not entry.enabled: + return + elif entry.timer: + LOG.error("Health check for cluster %s already exists", + entry.cluster_id) + return + else: + LOG.error("Unable to add health check for cluster: %s", + entry.cluster_id) + return - timer = entry.get('timer', None) - if timer: + if entry.type == consts.POLLING: + self._add_timer(entry.cluster_id) + elif entry.type == consts.EVENTS: + LOG.info("Start listening events for cluster (%s).", + entry.cluster_id) + self._add_listener(entry.cluster_id) + else: + LOG.error("Cluster %(id)s type %(type)s is invalid.", + {'id': entry.cluster_id, 'type': entry.type}) + + def remove_health_check(self, entry): + """Remove a health check for the RuntimeHealthRegistry. + + This method stops and removes the timer/thread based to the type of + health check being removed. + + :param entry: + :return: None + """ + if entry.timer: # stop timer - timer.stop() + entry.timer.stop() try: # tell threadgroup to remove timer - self.TG.timer_done(timer) + self.TG.timer_done(entry.timer) except ValueError: pass + finally: + entry.timer = None - if entry['cluster_id'] in self.health_check_types: - self.health_check_types.pop(entry['cluster_id']) - return + if entry.listener: + try: + self.TG.thread_done(entry.listener) + entry.listener.stop() + except ValueError: + pass + finally: + entry.listener = None - listener = entry.get('listener', None) - if listener: - self.TG.thread_done(listener) - listener.stop() - return - - def _load_runtime_registry(self): + def load_runtime_registry(self): """Load the initial runtime registry with a DB scan.""" db_registries = objects.HealthRegistry.claim(self.ctx, self.engine_id) - for r in db_registries: + for registry in db_registries: + if registry.cluster_id in self.registries: + LOG.warning("Skipping duplicate health check for cluster: %s", + registry.cluster_id) # Claiming indicates we claim a health registry who's engine was # dead, and we will update the health registry's engine_id with # current engine id. But we may not start check always. - entry = { - 'cluster_id': r.cluster_id, - 'check_type': r.check_type, - 'interval': r.interval, - 'params': r.params, - 'enabled': r.enabled, - } + entry = HealthCheck( + ctx=self.ctx, + engine_id=self.engine_id, + cluster_id=registry.cluster_id, + check_type=registry.check_type, + interval=registry.interval, + node_update_timeout=registry.params['node_update_timeout'], + params=registry.params, + enabled=registry.enabled + ) LOG.info("Loading cluster %(c)s enabled=%(e)s for " "health monitoring", - {'c': r.cluster_id, 'e': r.enabled}) - if r.enabled: - # Stop any running checks for entry before starting. 
- self._stop_check(entry) - entry = self._start_check(entry) - if entry: - self.rt['registries'].append(entry) + {'c': registry.cluster_id, 'e': registry.enabled}) + self.registries[registry.cluster_id] = entry + if registry.enabled: + self.add_health_check(self.registries[registry.cluster_id]) + + +class HealthManager(service.Service): + + def __init__(self, engine_service, topic, version): + super(HealthManager, self).__init__() + + self.TG = threadgroup.ThreadGroup( + thread_pool_size=cfg.CONF.health_manager_thread_pool_size) + self.engine_id = engine_service.engine_id + self.topic = topic + self.version = version + self.ctx = context.get_admin_context() + self.rpc_client = rpc_client.EngineClient() + self.health_registry = RuntimeHealthRegistry( + ctx=self.ctx, engine_id=self.engine_id, thread_group=self.TG) + + def task(self): + """Task that is queued on the health manager thread group. + + The task is here so that the service always has something to wait() + on, or else the process will exit. + """ + start_time = timeutils.utcnow(True) + + try: + self.health_registry.load_runtime_registry() + except Exception as ex: + LOG.error("Failed when loading runtime for health manager: %s", ex) + return _chase_up(start_time, cfg.CONF.periodic_interval, + name='Health manager task') def start(self): """Start the health manager RPC server. @@ -705,7 +912,7 @@ class HealthManager(service.Service): @property def registries(self): - return self.rt['registries'] + return self.health_registry.registries def listening(self, ctx): """Respond to confirm that the rpc service is still alive.""" @@ -714,45 +921,24 @@ class HealthManager(service.Service): def register_cluster(self, ctx, cluster_id, interval=None, node_update_timeout=None, params=None, enabled=True): - """Register cluster for health checking. + """Register a cluster for health checking. :param ctx: The context of notify request. - :param cluster_id: The ID of the cluster to be checked. - :param interval: An optional integer indicating the length of checking - periods in seconds. - :param dict params: Other parameters for the health check. + :param cluster_id: The ID of the cluster to be registered. + :param interval: Interval of the health check. + :param node_update_timeout: Time to wait before declaring a node + unhealthy. + :param params: Parameters to be passed to the health check. + :param enabled: Sets whether the health check is enabled or disabled. :return: None """ - params = params or {} - - # extract check_type from params - check_type = "" - if 'detection_modes' in params: - check_type = ','.join([ - NodePollUrlHealthCheck._convert_detection_tuple(d).type - for d in params['detection_modes'] - ]) - - # add node_update_timeout to params - params['node_update_timeout'] = node_update_timeout - - registry = objects.HealthRegistry.create(ctx, cluster_id, check_type, - interval, params, - self.engine_id, - enabled=enabled) - - entry = { - 'cluster_id': registry.cluster_id, - 'check_type': registry.check_type, - 'interval': registry.interval, - 'params': registry.params, - 'enabled': registry.enabled - } - - if registry.enabled: - self._start_check(entry) - - self.rt['registries'].append(entry) + LOG.info("Registering health check for cluster %s.", cluster_id) + self.health_registry.register_cluster( + cluster_id=cluster_id, + interval=interval, + node_update_timeout=node_update_timeout, + params=params, + enabled=enabled) def unregister_cluster(self, ctx, cluster_id): """Unregister a cluster from health checking.
@@ -761,29 +947,14 @@ class HealthManager(service.Service): :param cluster_id: The ID of the cluster to be unregistered. :return: None """ - for i in range(len(self.rt['registries']) - 1, -1, -1): - entry = self.rt['registries'][i] - if entry.get('cluster_id') == cluster_id: - self._stop_check(entry) - self.rt['registries'].pop(i) - objects.HealthRegistry.delete(ctx, cluster_id) - LOG.debug('unregister done') + LOG.info("Unregistering health check for cluster %s.", cluster_id) + self.health_registry.unregister_cluster(cluster_id) def enable_cluster(self, ctx, cluster_id, params=None): - for c in self.rt['registries']: - if c['cluster_id'] == cluster_id and not c['enabled']: - c['enabled'] = True - objects.HealthRegistry.update(ctx, cluster_id, - {'enabled': True}) - self._start_check(c) + self.health_registry.enable_cluster(cluster_id) def disable_cluster(self, ctx, cluster_id, params=None): - for c in self.rt['registries']: - if c['cluster_id'] == cluster_id and c['enabled']: - c['enabled'] = False - objects.HealthRegistry.update(ctx, cluster_id, - {'enabled': False}) - self._stop_check(c) + self.health_registry.disable_cluster(cluster_id) def notify(engine_id, method, **kwargs): diff --git a/senlin/engine/service.py b/senlin/engine/service.py index d2e1702e0..cb9b361f9 100755 --- a/senlin/engine/service.py +++ b/senlin/engine/service.py @@ -2264,6 +2264,7 @@ class EngineService(service.Service): query['filters'] = filters actions = action_obj.Action.get_all(ctx, **query) + return [a.to_dict() for a in actions] @request_context diff --git a/senlin/tests/unit/engine/test_health_manager.py b/senlin/tests/unit/engine/test_health_manager.py index a0f4f0224..4a58a3c76 100644 --- a/senlin/tests/unit/engine/test_health_manager.py +++ b/senlin/tests/unit/engine/test_health_manager.py @@ -10,13 +10,11 @@ # License for the specific language governing permissions and limitations # under the License. 
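+ # The tests below exercise the new HealthCheck and RuntimeHealthRegistry + # classes that the HealthManager service now delegates to.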
-import copy import re import time import mock from oslo_config import cfg -from oslo_service import threadgroup from oslo_utils import timeutils as tu from senlin.common import consts @@ -953,324 +951,62 @@ class TestNodePollUrlHealthCheck(base.SenlinTestCase): ) -class TestHealthManager(base.SenlinTestCase): +class TestHealthCheck(base.SenlinTestCase): def setUp(self): - super(TestHealthManager, self).setUp() - - mock_eng = mock.Mock() - mock_eng.engine_id = 'ENGINE_ID' - topic = consts.HEALTH_MANAGER_TOPIC - version = consts.RPC_API_VERSION - self.hm = hm.HealthManager(mock_eng, topic, version) - - def test_init(self): - self.assertEqual('ENGINE_ID', self.hm.engine_id) - self.assertIsNotNone(self.hm.TG) - self.assertIsNotNone(self.hm.rpc_client) - self.assertEqual(consts.HEALTH_MANAGER_TOPIC, self.hm.topic) - self.assertEqual(consts.RPC_API_VERSION, self.hm.version) - self.assertEqual(0, len(self.hm.rt['registries'])) - - @mock.patch.object(hm.HealthManager, "_load_runtime_registry") - def test_task(self, mock_load): - self.hm.task() - mock_load.assert_called_once_with() - - @mock.patch.object(hm.HealthManager, "_stop_check") - @mock.patch.object(hm.HealthManager, "_start_check") - @mock.patch.object(hr.HealthRegistry, 'claim') - def test_load_runtime_registry(self, mock_claim, mock_check, mock_stop): - fake_claims = [ - { - 'cluster_id': 'CID1', - 'check_type': consts.NODE_STATUS_POLLING, - 'interval': 12, - 'params': {'k1': 'v1'}, - 'enabled': True, - }, - { - 'cluster_id': 'CID2', - 'check_type': consts.NODE_STATUS_POLLING, - 'interval': 34, - 'params': {'k2': 'v2'}, - 'enabled': False, - }, - ] - mock_claim.return_value = [ - mock.Mock(**fake_claims[0]), - mock.Mock(**fake_claims[1]), - ] - mock_check.return_value = fake_claims - - # do it - self.hm._load_runtime_registry() - - # assertions - mock_claim.assert_called_once_with(self.hm.ctx, self.hm.engine_id) - mock_check.assert_has_calls( - [ - mock.call(fake_claims[0]) - ] - ) - mock_stop.assert_called_once_with(fake_claims[0]) - - @mock.patch.object(obj_profile.Profile, 'get') - @mock.patch.object(obj_cluster.Cluster, 'get') - def test_add_listener_nova(self, mock_cluster, mock_profile): - cfg.CONF.set_override('nova_control_exchange', 'FAKE_NOVA_EXCHANGE', - group='health_manager') - x_listener = mock.Mock() - mock_add_thread = self.patchobject(self.hm.TG, 'add_thread', - return_value=x_listener) - x_cluster = mock.Mock(project='PROJECT_ID', profile_id='PROFILE_ID') - mock_cluster.return_value = x_cluster - x_profile = mock.Mock(type='os.nova.server-1.0') - mock_profile.return_value = x_profile - - recover_action = {'operation': 'REBUILD'} - # do it - res = self.hm._add_listener('CLUSTER_ID', recover_action) - - # assertions - self.assertEqual(x_listener, res) - mock_cluster.assert_called_once_with(self.hm.ctx, 'CLUSTER_ID', - project_safe=False) - mock_profile.assert_called_once_with(self.hm.ctx, 'PROFILE_ID', - project_safe=False) - mock_add_thread.assert_called_once_with( - hm.ListenerProc, 'FAKE_NOVA_EXCHANGE', 'PROJECT_ID', 'CLUSTER_ID', - recover_action) - - @mock.patch.object(obj_profile.Profile, 'get') - @mock.patch.object(obj_cluster.Cluster, 'get') - def test_add_listener_heat(self, mock_cluster, mock_profile): - cfg.CONF.set_override('heat_control_exchange', 'FAKE_HEAT_EXCHANGE', - group='health_manager') - x_listener = mock.Mock() - mock_add_thread = self.patchobject(self.hm.TG, 'add_thread', - return_value=x_listener) - x_cluster = mock.Mock(project='PROJECT_ID', profile_id='PROFILE_ID') - mock_cluster.return_value = 
x_cluster - x_profile = mock.Mock(type='os.heat.stack-1.0') - mock_profile.return_value = x_profile - - recover_action = {'operation': 'REBUILD'} - # do it - res = self.hm._add_listener('CLUSTER_ID', recover_action) - - # assertions - self.assertEqual(x_listener, res) - mock_cluster.assert_called_once_with(self.hm.ctx, 'CLUSTER_ID', - project_safe=False) - mock_profile.assert_called_once_with(self.hm.ctx, 'PROFILE_ID', - project_safe=False) - mock_add_thread.assert_called_once_with( - hm.ListenerProc, 'FAKE_HEAT_EXCHANGE', 'PROJECT_ID', 'CLUSTER_ID', - recover_action) - - @mock.patch.object(obj_profile.Profile, 'get') - @mock.patch.object(obj_cluster.Cluster, 'get') - def test_add_listener_other_types(self, mock_cluster, mock_profile): - mock_add_thread = self.patchobject(self.hm.TG, 'add_thread') - x_cluster = mock.Mock(project='PROJECT_ID', profile_id='PROFILE_ID') - mock_cluster.return_value = x_cluster - x_profile = mock.Mock(type='other.types-1.0') - mock_profile.return_value = x_profile - - recover_action = {'operation': 'REBUILD'} - # do it - res = self.hm._add_listener('CLUSTER_ID', recover_action) - - # assertions - self.assertIsNone(res) - mock_cluster.assert_called_once_with(self.hm.ctx, 'CLUSTER_ID', - project_safe=False) - mock_profile.assert_called_once_with(self.hm.ctx, 'PROFILE_ID', - project_safe=False) - self.assertFalse(mock_add_thread.called) - - @mock.patch.object(obj_cluster.Cluster, 'get') - def test_add_listener_cluster_not_found(self, mock_get): - mock_get.return_value = None - mock_add_thread = self.patchobject(self.hm.TG, 'add_thread') - - recover_action = {'operation': 'REBUILD'} - # do it - res = self.hm._add_listener('CLUSTER_ID', recover_action) - - # assertions - self.assertIsNone(res) - mock_get.assert_called_once_with(self.hm.ctx, 'CLUSTER_ID', - project_safe=False) - self.assertEqual(0, mock_add_thread.call_count) - - @mock.patch.object(rpc_client.EngineClient, 'call') - @mock.patch('senlin.objects.NodeRecoverRequest', autospec=True) - def test_recover_node(self, mock_req, mock_rpc): + super(TestHealthCheck, self).setUp() ctx = mock.Mock() - node_id = 'FAKE_NODE' - recover_action = {'operation': 'REBUILD'} + self.fake_rpc = mock.Mock() + with mock.patch.object(rpc_client, 'EngineClient', + return_value=self.fake_rpc): + self.hc = hm.HealthCheck( + ctx=ctx, + engine_id='ENGINE_ID', + cluster_id='CID', + check_type=consts.NODE_STATUS_POLLING, + interval=60, + node_update_timeout=60, + params={ + 'node_update_timeout': 60, + 'detection_modes': [ + {'type': consts.NODE_STATUS_POLLING} + ], + 'recovery_conditional': consts.ANY_FAILED + }, + enabled=True) - x_req = mock.Mock - mock_req.return_value = x_req + def test_get_health_check_types_polling(self): + self.hc.get_health_check_types() + self.assertEqual(consts.POLLING, self.hc.type) - x_action = {'action': 'RECOVER_ID1'} - mock_rpc.return_value = x_action + def test_get_health_check_types_events(self): + self.hc.check_type = consts.LIFECYCLE_EVENTS + self.hc.get_health_check_types() + self.assertEqual(consts.EVENTS, self.hc.type) - # do it - res = self.hm._recover_node(node_id, ctx, recover_action) - - self.assertEqual(x_action, res) - mock_req.assert_called_once_with( - identity=node_id, params=recover_action) - mock_rpc.assert_called_once_with(ctx, 'node_recover', x_req) - - @mock.patch.object(rpc_client.EngineClient, 'call') - @mock.patch('senlin.objects.NodeRecoverRequest', autospec=True) - def test_recover_node_failed(self, mock_req, mock_rpc): - ctx = mock.Mock() - node_id = 'FAKE_NODE' - 
recover_action = {'operation': 'REBUILD'} - - x_req = mock.Mock - mock_req.return_value = x_req - - mock_rpc.side_effect = Exception('boom') - - # do it - res = self.hm._recover_node(node_id, ctx, recover_action) - - self.assertIsNone(res) - mock_req.assert_called_once_with( - identity=node_id, params=recover_action) - mock_rpc.assert_called_once_with(ctx, 'node_recover', x_req) - - @mock.patch('senlin.objects.ActionGetRequest') - @mock.patch.object(rpc_client.EngineClient, 'call') - def test_wait_for_action(self, mock_rpc, mock_action_req): - x_req = mock.Mock() - mock_action_req.return_value = x_req - - x_action = {'status': consts.ACTION_SUCCEEDED} - mock_rpc.return_value = x_action - - ctx = mock.Mock() - action_id = 'FAKE_ACTION_ID' - timeout = 5 - - # do it - res, err = self.hm._wait_for_action(ctx, action_id, timeout) - - self.assertTrue(res) - self.assertEqual(err, '') - mock_rpc.assert_called_with(ctx, 'action_get', x_req) - - @mock.patch('senlin.objects.ActionGetRequest') - @mock.patch.object(rpc_client.EngineClient, 'call') - def test_wait_for_action_success_before_timeout( - self, mock_rpc, mock_action_req): - x_req = mock.Mock() - mock_action_req.return_value = x_req - - x_action1 = {'status': consts.ACTION_RUNNING} - x_action2 = {'status': consts.ACTION_SUCCEEDED} - mock_rpc.side_effect = [x_action1, x_action2] - - ctx = mock.Mock() - action_id = 'FAKE_ACTION_ID' - timeout = 5 - - # do it - res, err = self.hm._wait_for_action(ctx, action_id, timeout) - - self.assertTrue(res) - self.assertEqual(err, '') - mock_rpc.assert_has_calls( - [ - mock.call(ctx, 'action_get', x_req), - mock.call(ctx, 'action_get', x_req) - ] - ) - - @mock.patch('senlin.objects.ActionGetRequest') - @mock.patch.object(rpc_client.EngineClient, 'call') - def test_wait_for_action_timeout(self, mock_rpc, mock_action_req): - x_req = mock.Mock() - mock_action_req.return_value = x_req - - x_action = {'status': consts.ACTION_RUNNING} - mock_rpc.return_value = x_action - - ctx = mock.Mock() - action_id = 'FAKE_ACTION_ID' - timeout = 5 - - # do it - res, err = self.hm._wait_for_action(ctx, action_id, timeout) - - self.assertFalse(res) - self.assertTrue(re.search('timeout', err, re.IGNORECASE)) - mock_rpc.assert_has_calls( - [ - mock.call(ctx, 'action_get', x_req) - ] - ) - - @mock.patch('senlin.objects.ActionGetRequest') - @mock.patch.object(rpc_client.EngineClient, 'call') - def test_wait_for_action_failed(self, mock_rpc, mock_action_req): - x_req = mock.Mock() - mock_action_req.return_value = x_req - - x_action = {'status': consts.ACTION_FAILED} - mock_rpc.return_value = x_action - - ctx = mock.Mock() - action_id = 'FAKE_ACTION_ID' - timeout = 5 - - # do it - res, err = self.hm._wait_for_action(ctx, action_id, timeout) - - self.assertFalse(res) - self.assertEqual(err, 'Cluster check action failed or cancelled') - mock_rpc.assert_called_with(ctx, 'action_get', x_req) - - @mock.patch('senlin.objects.ActionGetRequest') - @mock.patch.object(rpc_client.EngineClient, 'call') - def test_wait_for_action_cancelled(self, mock_rpc, mock_action_req): - x_req = mock.Mock() - mock_action_req.return_value = x_req - - x_action = {'status': consts.ACTION_CANCELLED} - mock_rpc.return_value = x_action - - ctx = mock.Mock() - action_id = 'FAKE_ACTION_ID' - timeout = 5 - - # do it - res, err = self.hm._wait_for_action(ctx, action_id, timeout) - - self.assertFalse(res) - self.assertEqual(err, 'Cluster check action failed or cancelled') - mock_rpc.assert_called_with(ctx, 'action_get', x_req) + def test_get_recover_actions(self): + 
self.hc.params = { + 'node_delete_timeout': 60, + 'node_force_recreate': True, + 'recover_action': [{'name': 'FAKE_RECOVER_ACTION'}] + } + self.hc.get_recover_actions() + self.assertEqual(self.hc.params['node_delete_timeout'], + self.hc.recover_action['delete_timeout']) + self.assertEqual(self.hc.params['node_force_recreate'], + self.hc.recover_action['force_recreate']) + self.assertEqual(self.hc.params['recover_action'][0]['name'], + self.hc.recover_action['operation']) @mock.patch.object(obj_node.Node, 'get_all_by_cluster') - @mock.patch.object(hm.HealthManager, "_recover_node") - @mock.patch.object(hm.HealthManager, "_wait_for_action") + @mock.patch.object(hm.HealthCheck, "_recover_node") + @mock.patch.object(hm.HealthCheck, "_wait_for_action") @mock.patch.object(obj_cluster.Cluster, 'get') @mock.patch.object(context, 'get_service_context') def test_execute_health_check_any_mode_healthy( self, mock_ctx, mock_get, mock_wait, mock_recover, mock_nodes): - cluster_id = 'CLUSTER_ID' - interval = 1 - recovery_cond = consts.ANY_FAILED - node_update_timeout = 1 - recovery_action = {'operation': 'REBUILD'} - - x_cluster = mock.Mock(user='USER_ID', project='PROJECT_ID') + x_cluster = mock.Mock(user='USER_ID', project='PROJECT_ID', + id='CID') mock_get.return_value = x_cluster ctx = mock.Mock() @@ -1292,12 +1028,8 @@ class TestHealthManager(base.SenlinTestCase): ], ] - self.hm.cluster_id = cluster_id - for hc_mocks in hc_test_values: - self.hm.health_check_types = { - cluster_id: hc_mocks - } + self.hc.health_check_types = hc_mocks mock_get.reset_mock() mock_ctx.reset_mock() @@ -1305,11 +1037,9 @@ class TestHealthManager(base.SenlinTestCase): mock_wait.reset_mock() # do it - self.hm._execute_health_check(interval, cluster_id, - recovery_action, - recovery_cond, node_update_timeout) + self.hc.execute_health_check() - mock_get.assert_called_once_with(self.hm.ctx, 'CLUSTER_ID', + mock_get.assert_called_once_with(self.hc.ctx, 'CID', project_safe=False) mock_ctx.assert_called_once_with(user_id=x_cluster.user, project_id=x_cluster.project) @@ -1326,187 +1056,20 @@ class TestHealthManager(base.SenlinTestCase): mock_wait.assert_not_called() @mock.patch.object(obj_node.Node, 'get_all_by_cluster') - @mock.patch.object(hm.HealthManager, "_recover_node") - @mock.patch.object(hm.HealthManager, "_wait_for_action") - @mock.patch.object(obj_cluster.Cluster, 'get') - @mock.patch.object(context, 'get_service_context') - def test_execute_health_check_any_mode_unhealthy( - self, mock_ctx, mock_get, mock_wait, mock_recover, mock_nodes): - cluster_id = 'CLUSTER_ID' - interval = 1 - recovery_cond = consts.ANY_FAILED - node_update_timeout = 1 - recovery_action = {'operation': 'REBUILD'} - - x_cluster = mock.Mock(user='USER_ID', project='PROJECT_ID') - mock_get.return_value = x_cluster - - ctx = mock.Mock() - mock_ctx.return_value = ctx - - mock_wait.return_value = (True, "") - - x_node = mock.Mock(id='FAKE_NODE', status="ERROR") - mock_nodes.return_value = [x_node] - - mock_recover.return_value = {'action': 'FAKE_ACTION_ID'} - - hc_true = {'run_health_check.return_value': True} - hc_false = {'run_health_check.return_value': False} - - hc_test_values = [ - [ - mock.Mock(**hc_false), - mock.Mock(**hc_true), - mock.Mock(**hc_true), - ], - [ - mock.Mock(**hc_true), - mock.Mock(**hc_false), - mock.Mock(**hc_true), - ], - [ - mock.Mock(**hc_true), - mock.Mock(**hc_true), - mock.Mock(**hc_false), - ] - ] - - for hc_mocks in hc_test_values: - self.hm.health_check_types = { - cluster_id: hc_mocks - } - - mock_get.reset_mock() - 
mock_ctx.reset_mock() - mock_recover.reset_mock() - mock_wait.reset_mock() - - # do it - self.hm._execute_health_check(interval, cluster_id, - recovery_action, - recovery_cond, node_update_timeout) - - mock_get.assert_called_once_with(self.hm.ctx, 'CLUSTER_ID', - project_safe=False) - mock_ctx.assert_called_once_with(user_id=x_cluster.user, - project_id=x_cluster.project) - - # health checks should be called until one of them returns false - previous_hc_returned_false = False - for mock_hc in hc_mocks: - if not previous_hc_returned_false: - mock_hc.run_health_check.assert_called_once_with( - ctx, x_node) - else: - mock_hc.assert_not_called() - if not mock_hc.run_health_check.return_value: - previous_hc_returned_false = True - - mock_recover.assert_called_once_with('FAKE_NODE', ctx, mock.ANY) - mock_wait.assert_called_once_with( - ctx, 'FAKE_ACTION_ID', node_update_timeout) - - @mock.patch.object(obj_node.Node, 'get_all_by_cluster') - @mock.patch.object(hm.HealthManager, "_recover_node") - @mock.patch.object(hm.HealthManager, "_wait_for_action") - @mock.patch.object(obj_cluster.Cluster, 'get') - @mock.patch.object(context, 'get_service_context') - def test_execute_health_check_all_mode_healthy( - self, mock_ctx, mock_get, mock_wait, mock_recover, mock_nodes): - cluster_id = 'CLUSTER_ID' - interval = 1 - recovery_cond = consts.ALL_FAILED - node_update_timeout = 1 - recovery_action = {'operation': 'REBUILD'} - - x_cluster = mock.Mock(user='USER_ID', project='PROJECT_ID') - mock_get.return_value = x_cluster - - ctx = mock.Mock() - mock_ctx.return_value = ctx - - mock_wait.return_value = (True, "") - - x_node = mock.Mock(id='FAKE_NODE1', status="ERROR") - mock_nodes.return_value = [x_node] - - hc_true = {'run_health_check.return_value': True} - hc_false = {'run_health_check.return_value': False} - - hc_test_values = [ - [ - mock.Mock(**hc_true), - mock.Mock(**hc_true), - mock.Mock(**hc_true), - ], - [ - mock.Mock(**hc_false), - mock.Mock(**hc_true), - mock.Mock(**hc_true), - ], - [ - mock.Mock(**hc_true), - mock.Mock(**hc_false), - mock.Mock(**hc_true), - ], - [ - mock.Mock(**hc_true), - mock.Mock(**hc_true), - mock.Mock(**hc_false), - ], - ] - - self.hm.cluster_id = cluster_id - - for hc_mocks in hc_test_values: - self.hm.health_check_types = { - cluster_id: hc_mocks - } - - mock_get.reset_mock() - mock_ctx.reset_mock() - mock_recover.reset_mock() - mock_wait.reset_mock() - - # do it - self.hm._execute_health_check(interval, cluster_id, - recovery_action, - recovery_cond, node_update_timeout) - - mock_get.assert_called_once_with(self.hm.ctx, 'CLUSTER_ID', - project_safe=False) - mock_ctx.assert_called_once_with(user_id=x_cluster.user, - project_id=x_cluster.project) - - # health checks should be called until one of them returns true - previous_hc_returned_true = False - for mock_hc in hc_mocks: - if not previous_hc_returned_true: - mock_hc.run_health_check.assert_called_once_with( - ctx, x_node) - else: - mock_hc.assert_not_called() - if mock_hc.run_health_check.return_value: - previous_hc_returned_true = True - - mock_recover.assert_not_called() - mock_wait.assert_not_called() - - @mock.patch.object(obj_node.Node, 'get_all_by_cluster') - @mock.patch.object(hm.HealthManager, "_recover_node") - @mock.patch.object(hm.HealthManager, "_wait_for_action") + @mock.patch.object(hm.HealthCheck, "_recover_node") + @mock.patch.object(hm.HealthCheck, "_wait_for_action") @mock.patch.object(obj_cluster.Cluster, 'get') @mock.patch.object(context, 'get_service_context') def 
test_execute_health_check_all_mode_unhealthy( self, mock_ctx, mock_get, mock_wait, mock_recover, mock_nodes): - cluster_id = 'CLUSTER_ID' - interval = 1 - recovery_cond = consts.ALL_FAILED - node_update_timeout = 1 - recovery_action = {'operation': 'REBUILD'} + self.hc.cluster_id = 'CLUSTER_ID' + self.hc.interval = 1 + self.hc.recovery_cond = consts.ALL_FAILED + self.hc.node_update_timeout = 1 + self.hc.recovery_action = {'operation': 'REBUILD'} - x_cluster = mock.Mock(user='USER_ID', project='PROJECT_ID') + x_cluster = mock.Mock(user='USER_ID', project='PROJECT_ID', + id='CLUSTER_ID') mock_get.return_value = x_cluster ctx = mock.Mock() @@ -1524,18 +1087,11 @@ class TestHealthManager(base.SenlinTestCase): hc_test_values = [ [ mock.Mock(**hc_false), - mock.Mock(**hc_false), - mock.Mock(**hc_false), ] ] - self.hm.cluster_id = cluster_id - self.hm.node_update_timeout = 1 - for hc_mocks in hc_test_values: - self.hm.health_check_types = { - cluster_id: hc_mocks - } + self.hc.health_check_types = hc_mocks mock_get.reset_mock() mock_ctx.reset_mock() @@ -1543,242 +1099,652 @@ class TestHealthManager(base.SenlinTestCase): mock_wait.reset_mock() # do it - self.hm._execute_health_check(interval, cluster_id, - recovery_action, - recovery_cond, node_update_timeout) + self.hc.execute_health_check() - mock_get.assert_called_once_with(self.hm.ctx, 'CLUSTER_ID', + mock_get.assert_called_once_with(self.hc.ctx, 'CLUSTER_ID', project_safe=False) mock_ctx.assert_called_once_with(user_id=x_cluster.user, project_id=x_cluster.project) - # all health checks should be called for mock_hc in hc_mocks: - mock_hc.run_health_check.assert_called_once_with(ctx, x_node) + mock_hc.run_health_check.assert_has_calls( + [ + mock.call(ctx, x_node) + ] + ) - mock_recover.assert_called_once_with('FAKE_NODE', ctx, mock.ANY) + mock_recover.assert_called_once_with(ctx, 'FAKE_NODE') mock_wait.assert_called_once_with( - ctx, 'FAKE_ACTION_ID', self.hm.node_update_timeout) + ctx, 'FAKE_ACTION_ID', self.hc.node_update_timeout) @mock.patch.object(obj_cluster.Cluster, 'get') @mock.patch.object(context, 'get_service_context') def test_execute_health_check_cluster_not_found(self, mock_ctx, mock_get): - cluster_id = 'CLUSTER_ID' - interval = 1 - recovery_cond = consts.ANY_FAILED - node_update_timeout = 1 - recovery_action = {'operation': 'REBUILD'} - mock_get.return_value = None - # do it - self.hm._execute_health_check(interval, cluster_id, - recovery_action, recovery_cond, - node_update_timeout) + self.hc.execute_health_check() mock_ctx.assert_not_called() - def test_start_check_invalid_type(self): - entry = { - 'cluster_id': 'CCID', - 'interval': 12, - 'check_type': 'blah', - 'params': { - 'recover_action': [{'name': 'REBUILD'}] - }, - } + @mock.patch.object(hm.HealthCheck, "_recover_node") + def test_check_node_health_any_failed(self, mock_recover): + x_cluster = mock.Mock(user='USER_ID', project='PROJECT_ID', + id='CLUSTER_ID') + x_node = mock.Mock(id='FAKE_NODE', status="ERROR") + ctx = mock.Mock() - res = self.hm._start_check(entry) + self.hc.params['recovery_conditional'] = consts.ANY_FAILED + mock_hc_1 = mock.Mock() + mock_hc_1.run_health_check.return_value = True + mock_hc_2 = mock.Mock() + mock_hc_2.run_health_check.return_value = False - self.assertIsNone(res) + self.hc.health_check_types = [mock_hc_1, mock_hc_2] - @mock.patch.object(threadgroup.ThreadGroup, 'add_dynamic_timer') - @mock.patch.object(hm.HealthManager, '_add_health_check') - @mock.patch.object(hm.HealthCheckType, 'factory') - def test_start_check_for_polling(self, 
mock_hc_factory, mock_add_hc, - mock_add_timer): - x_timer = mock.Mock() - mock_add_timer.return_value = x_timer + self.hc._check_node_health(ctx, x_node, x_cluster) - entry = { - 'cluster_id': 'CCID', - 'interval': 12, - 'check_type': consts.NODE_STATUS_POLLING, - 'params': { - 'recover_action': [{'name': 'REBUILD'}], - 'recovery_conditional': 'ANY_FAILED', - 'node_update_timeout': 1, - }, - } + mock_hc_1.run_health_check.assert_called_once_with(ctx, x_node) + mock_hc_2.run_health_check.assert_called_once_with(ctx, x_node) + mock_recover.assert_called_once_with(ctx, x_node.id) - res = self.hm._start_check(entry) + @mock.patch.object(hm.HealthCheck, "_recover_node") + def test_check_node_health_all_failed(self, mock_recover): + x_cluster = mock.Mock(user='USER_ID', project='PROJECT_ID', + id='CLUSTER_ID') + x_node = mock.Mock(id='FAKE_NODE', status="ERROR") + ctx = mock.Mock() - expected = copy.deepcopy(entry) - expected['timer'] = x_timer - self.assertEqual(expected, res) - mock_add_timer.assert_called_once_with( - self.hm._execute_health_check, None, None, 12, 'CCID', - {'operation': 'REBUILD'}, 'ANY_FAILED', 1) - mock_add_hc.assert_called_once_with('CCID', mock.ANY) - mock_hc_factory.assert_called_once_with( - consts.NODE_STATUS_POLLING, 'CCID', 12, entry['params']) + self.hc.params['recovery_conditional'] = consts.ALL_FAILED + mock_hc_1 = mock.Mock() + mock_hc_1.run_health_check.return_value = False + mock_hc_2 = mock.Mock() + mock_hc_2.run_health_check.return_value = False - @mock.patch.object(threadgroup.ThreadGroup, 'add_dynamic_timer') - @mock.patch.object(hm.HealthManager, '_add_health_check') - @mock.patch.object(hm.HealthCheckType, 'factory') - def test_start_check_for_poll_url(self, mock_hc_factory, mock_add_hc, - mock_add_timer): - x_timer = mock.Mock() - mock_add_timer.return_value = x_timer + self.hc.health_check_types = [mock_hc_1, mock_hc_2] - entry = { - 'cluster_id': 'CCID', - 'interval': 12, - 'check_type': consts.NODE_STATUS_POLL_URL, - 'params': { - 'recover_action': [{'name': 'REBUILD'}], - 'recovery_conditional': 'ANY_FAILED', - 'node_update_timeout': 1, - }, - } + self.hc._check_node_health(ctx, x_node, x_cluster) - res = self.hm._start_check(entry) + mock_hc_1.run_health_check.assert_called_once_with(ctx, x_node) + mock_hc_2.run_health_check.assert_called_once_with(ctx, x_node) + mock_recover.assert_called_once_with(ctx, x_node.id) - expected = copy.deepcopy(entry) - expected['timer'] = x_timer - self.assertEqual(expected, res) - mock_add_timer.assert_called_once_with( - self.hm._execute_health_check, None, None, 12, 'CCID', - {'operation': 'REBUILD'}, 'ANY_FAILED', 1) - mock_add_hc.assert_called_once_with('CCID', mock.ANY) - mock_hc_factory.assert_called_once_with( - consts.NODE_STATUS_POLL_URL, - 'CCID', 12, entry['params']) + @mock.patch.object(hm.HealthCheck, "_recover_node") + def test_check_node_health_all_failed_negative(self, mock_recover): + x_cluster = mock.Mock(user='USER_ID', project='PROJECT_ID', + id='CLUSTER_ID') + x_node = mock.Mock(id='FAKE_NODE', status="ERROR") + ctx = mock.Mock() - @mock.patch.object(threadgroup.ThreadGroup, 'add_dynamic_timer') - @mock.patch.object(hm.HealthManager, '_add_health_check') - @mock.patch.object(hm.HealthCheckType, 'factory') - def test_start_check_poll_url_and_polling(self, mock_hc_factory, - mock_add_hc, mock_add_timer): - x_timer = mock.Mock() - mock_add_timer.return_value = x_timer + self.hc.params['recovery_conditional'] = consts.ALL_FAILED + mock_hc_1 = mock.Mock() + mock_hc_1.run_health_check.return_value = 
False + mock_hc_2 = mock.Mock() + mock_hc_2.run_health_check.return_value = True - check_type = ','.join( - [consts.NODE_STATUS_POLL_URL, consts.NODE_STATUS_POLLING]) - entry = { - 'cluster_id': 'CCID', - 'interval': 12, - 'check_type': check_type, - 'params': { - 'recover_action': [{'name': 'REBUILD'}], - 'recovery_conditional': 'ALL_FAILED', - 'node_update_timeout': 1, - }, - } + self.hc.health_check_types = [mock_hc_1, mock_hc_2] - res = self.hm._start_check(entry) + self.hc._check_node_health(ctx, x_node, x_cluster) - expected = copy.deepcopy(entry) - expected['timer'] = x_timer - self.assertEqual(expected, res) - mock_add_timer.assert_called_once_with( - self.hm._execute_health_check, None, None, 12, 'CCID', - {'operation': 'REBUILD'}, 'ALL_FAILED', 1) - mock_add_hc.assert_has_calls( + mock_hc_1.run_health_check.assert_called_once_with(ctx, x_node) + mock_hc_2.run_health_check.assert_called_once_with(ctx, x_node) + mock_recover.assert_not_called() + + @mock.patch('senlin.objects.ActionGetRequest') + def test_wait_for_action(self, mock_action_req): + x_req = mock.Mock() + mock_action_req.return_value = x_req + + x_action = {'status': consts.ACTION_SUCCEEDED} + self.fake_rpc.call.return_value = x_action + + ctx = mock.Mock() + action_id = 'FAKE_ACTION_ID' + timeout = 5 + + # do it + res, err = self.hc._wait_for_action(ctx, action_id, timeout) + + self.assertTrue(res) + self.assertEqual(err, '') + self.fake_rpc.call.assert_called_with(ctx, 'action_get', x_req) + + @mock.patch('senlin.objects.ActionGetRequest') + def test_wait_for_action_success_before_timeout(self, mock_action_req): + x_req = mock.Mock() + mock_action_req.return_value = x_req + + x_action1 = {'status': consts.ACTION_RUNNING} + x_action2 = {'status': consts.ACTION_SUCCEEDED} + self.fake_rpc.call.side_effect = [x_action1, x_action2] + + ctx = mock.Mock() + action_id = 'FAKE_ACTION_ID' + timeout = 5 + + # do it + res, err = self.hc._wait_for_action(ctx, action_id, timeout) + + self.assertTrue(res) + self.assertEqual(err, '') + self.fake_rpc.call.assert_has_calls( [ - mock.call('CCID', mock.ANY), - mock.call('CCID', mock.ANY) - ] - ) - mock_hc_factory.assert_has_calls( - [ - mock.call( - consts.NODE_STATUS_POLL_URL, 'CCID', 12, entry['params'] - ), - mock.call( - consts.NODE_STATUS_POLLING, 'CCID', 12, entry['params'] - ), + mock.call(ctx, 'action_get', x_req), + mock.call(ctx, 'action_get', x_req) ] ) - def test_start_check_for_listening(self): - x_listener = mock.Mock() - mock_add_listener = self.patchobject(self.hm, '_add_listener', - return_value=x_listener) + @mock.patch('senlin.objects.ActionGetRequest') + def test_wait_for_action_timeout(self, mock_action_req): + x_req = mock.Mock() + mock_action_req.return_value = x_req - entry = { - 'cluster_id': 'CCID', - 'check_type': consts.LIFECYCLE_EVENTS, - 'params': {'recover_action': [{'name': 'REBUILD'}]}, - } - recover_action = {'operation': 'REBUILD'} - res = self.hm._start_check(entry) + x_action = {'status': consts.ACTION_RUNNING} + self.fake_rpc.call.return_value = x_action - expected = copy.deepcopy(entry) - expected['listener'] = x_listener - self.assertEqual(expected, res) - mock_add_listener.assert_called_once_with('CCID', recover_action) - - def test_start_check_for_listening_failed(self): - mock_add_listener = self.patchobject(self.hm, '_add_listener', - return_value=None) - - entry = { - 'cluster_id': 'CCID', - 'check_type': consts.LIFECYCLE_EVENTS, - 'params': {'recover_action': [{'name': 'REBUILD'}]}, - } - recover_action = {'operation': 'REBUILD'} - res = 
self.hm._start_check(entry) - - self.assertIsNone(res) - mock_add_listener.assert_called_once_with('CCID', recover_action) - - def test_start_check_other_types(self): - entry = { - 'cluster_id': 'CCID', - 'check_type': 'BOGUS TYPE', - 'params': {'recover_action': [{'name': 'REBUILD'}]}, - } - res = self.hm._start_check(entry) - - self.assertIsNone(res) - - def test_stop_check_with_timer(self): - x_timer = mock.Mock() - entry = {'timer': x_timer, 'cluster_id': 'CLUSTER_ID'} - mock_timer_done = self.patchobject(self.hm.TG, 'timer_done') - - x_hc_types = mock.MagicMock() - x_hc_types.__contains__.return_value = True - x_hc_types.__iter__.return_value = ['CLUSTER_ID'] - self.hm.health_check_types = x_hc_types + ctx = mock.Mock() + action_id = 'FAKE_ACTION_ID' + timeout = 5 # do it - res = self.hm._stop_check(entry) + res, err = self.hc._wait_for_action(ctx, action_id, timeout) - self.assertIsNone(res) - x_timer.stop.assert_called_once_with() - mock_timer_done.assert_called_once_with(x_timer) - x_hc_types.pop.assert_called_once_with('CLUSTER_ID') + self.assertFalse(res) + self.assertTrue(re.search('timeout', err, re.IGNORECASE)) + self.fake_rpc.call.assert_has_calls( + [ + mock.call(ctx, 'action_get', x_req) + ] + ) - def test_stop_check_with_listener(self): - x_thread = mock.Mock() - entry = {'listener': x_thread, 'cluster_id': 'CLUSTER_ID'} - mock_thread_done = self.patchobject(self.hm.TG, 'thread_done') + @mock.patch('senlin.objects.ActionGetRequest') + def test_wait_for_action_failed(self, mock_action_req): + x_req = mock.Mock() + mock_action_req.return_value = x_req - x_hc_types = mock.MagicMock() - x_hc_types.__contains__.return_value = False - x_hc_types.__iter__.return_value = ['CLUSTER_ID'] - self.hm.health_check_types = x_hc_types + x_action = {'status': consts.ACTION_FAILED} + self.fake_rpc.call.return_value = x_action + + ctx = mock.Mock() + action_id = 'FAKE_ACTION_ID' + timeout = 5 # do it - res = self.hm._stop_check(entry) + res, err = self.hc._wait_for_action(ctx, action_id, timeout) + + self.assertFalse(res) + self.assertEqual(err, 'Cluster check action failed or cancelled') + self.fake_rpc.call.assert_called_with(ctx, 'action_get', x_req) + + @mock.patch('senlin.objects.ActionGetRequest') + def test_wait_for_action_cancelled(self, mock_action_req): + x_req = mock.Mock() + mock_action_req.return_value = x_req + + x_action = {'status': consts.ACTION_CANCELLED} + self.fake_rpc.call.return_value = x_action + + ctx = mock.Mock() + action_id = 'FAKE_ACTION_ID' + timeout = 5 + + # do it + res, err = self.hc._wait_for_action(ctx, action_id, timeout) + + self.assertFalse(res) + self.assertEqual(err, 'Cluster check action failed or cancelled') + self.fake_rpc.call.assert_called_with(ctx, 'action_get', x_req) + + @mock.patch('senlin.objects.NodeRecoverRequest', autospec=True) + def test_recover_node(self, mock_req): + ctx = mock.Mock() + node_id = 'FAKE_NODE' + self.hc.recover_action = {'operation': 'REBUILD'} + + x_req = mock.Mock + mock_req.return_value = x_req + + x_action = {'action': 'RECOVER_ID1'} + self.fake_rpc.call.return_value = x_action + + # do it + res = self.hc._recover_node(ctx, node_id) + + self.assertEqual(x_action, res) + mock_req.assert_called_once_with( + identity=node_id, params=self.hc.recover_action) + self.fake_rpc.call.assert_called_once_with(ctx, 'node_recover', x_req) + + @mock.patch('senlin.objects.NodeRecoverRequest', autospec=True) + def test_recover_node_failed(self, mock_req): + ctx = mock.Mock() + node_id = 'FAKE_NODE' + self.hc.recover_action = 
{'operation': 'REBUILD'} + + x_req = mock.Mock + mock_req.return_value = x_req + + self.fake_rpc.call.side_effect = Exception('boom') + + # do it + res = self.hc._recover_node(ctx, node_id) self.assertIsNone(res) - x_thread.stop.assert_called_once_with() - mock_thread_done.assert_called_once_with(x_thread) - x_hc_types.pop.assert_not_called() + mock_req.assert_called_once_with( + identity=node_id, params=self.hc.recover_action) + self.fake_rpc.call.assert_called_once_with(ctx, 'node_recover', x_req) + + @mock.patch('senlin.objects.HealthRegistry', autospec=True) + def test_db_create(self, mock_hrdb): + self.hc.db_create() + mock_hrdb.create.assert_called_once_with( + self.hc.ctx, self.hc.cluster_id, self.hc.check_type, + self.hc.interval, self.hc.params, self.hc.engine_id, + self.hc.enabled) + + @mock.patch('senlin.objects.HealthRegistry', autospec=True) + def test_db_delete(self, mock_hrdb): + self.hc.db_delete() + mock_hrdb.delete.assert_called_once_with(self.hc.ctx, + self.hc.cluster_id) + + @mock.patch('senlin.objects.HealthRegistry', autospec=True) + def test_enable(self, mock_hrdb): + self.hc.enable() + mock_hrdb.update.assert_called_once_with( + self.hc.ctx, self.hc.cluster_id, {'enabled': True}) + + @mock.patch('senlin.objects.HealthRegistry', autospec=True) + def test_disable(self, mock_hrdb): + self.hc.disable() + mock_hrdb.update.assert_called_once_with( + self.hc.ctx, self.hc.cluster_id, {'enabled': False}) + + +class TestRuntimeHealthRegistry(base.SenlinTestCase): + + def setUp(self): + super(TestRuntimeHealthRegistry, self).setUp() + + mock_ctx = mock.Mock() + self.mock_tg = mock.Mock() + self.rhr = hm.RuntimeHealthRegistry(mock_ctx, 'ENGINE_ID', + self.mock_tg) + + def create_mock_entry(self, ctx=None, engine_id='ENGINE_ID', + cluster_id='CID', + check_type=None, + interval=60, node_update_timeout=60, params=None, + enabled=True, timer=None, listener=None, + type=consts.POLLING): + mock_entry = mock.Mock( + ctx=ctx, + engine_id=engine_id, + cluster_id=cluster_id, + check_type=check_type, + interval=interval, + node_update_timeout=node_update_timeout, + params=params, + enabled=enabled, + timer=timer, + listener=listener, + execute_health_check=mock.Mock(), + type=type) + return mock_entry + + @mock.patch.object(hm, 'HealthCheck') + def test_register_cluster(self, mock_hc): + mock_entry = self.create_mock_entry( + check_type=[consts.NODE_STATUS_POLLING]) + mock_entry.db_create = mock.Mock() + mock_hc.return_value = mock_entry + + self.rhr.register_cluster('CID', 60, 60, {}) + + self.assertEqual(mock_entry, self.rhr.registries['CID']) + self.mock_tg.add_dynamic_timer.assert_called_once_with( + mock_entry.execute_health_check, None, None) + self.mock_tg.add_thread.assert_not_called() + mock_entry.db_create.assert_called_once_with() + + @mock.patch.object(hm, 'HealthCheck') + def test_register_cluster_failed(self, mock_hc): + mock_entry = self.create_mock_entry( + check_type=[consts.NODE_STATUS_POLLING]) + mock_entry.db_create = mock.Mock() + mock_entry.db_delete = mock.Mock() + + mock_hc.return_value = mock_entry + self.rhr.add_health_check = mock.Mock() + self.rhr.add_health_check.side_effect = Exception + + self.rhr.register_cluster('CID', 60, 60, {}) + + self.assertEqual(mock_entry, self.rhr.registries['CID']) + self.mock_tg.add_dynamic_timer.assert_not_called() + self.mock_tg.add_thread.assert_not_called() + mock_entry.db_create.assert_called_once_with() + mock_entry.db_delete.assert_called_once_with() + + def test_unregister_cluster_with_timer(self): + timer = mock.Mock() 
+ mock_entry = self.create_mock_entry( + check_type=[consts.NODE_STATUS_POLLING], + timer=timer) + self.rhr.registries['CID'] = mock_entry + mock_entry.db_delete = mock.Mock() + + self.rhr.unregister_cluster('CID') + + mock_entry.db_delete.assert_called_once_with() + timer.stop.assert_called_once_with() + self.mock_tg.timer_done.assert_called_once_with(timer) + self.assertIsNone(mock_entry.timer) + + def test_unregister_cluster_with_listener(self): + listener = mock.Mock() + mock_entry = self.create_mock_entry( + check_type=[consts.NODE_STATUS_POLLING], + listener=listener) + self.rhr.registries['CID'] = mock_entry + mock_entry.db_delete = mock.Mock() + + self.rhr.unregister_cluster('CID') + + mock_entry.db_delete.assert_called_once_with() + listener.stop.assert_called_once_with() + self.mock_tg.thread_done.assert_called_once_with(listener) + self.assertIsNone(mock_entry.listener) + + def test_unregister_cluster_failed(self): + listener = mock.Mock() + mock_entry = self.create_mock_entry( + check_type=[consts.NODE_STATUS_POLLING], + listener=listener) + self.rhr.registries['CID'] = mock_entry + mock_entry.db_delete.side_effect = Exception + + self.rhr.unregister_cluster('CID') + + listener.stop.assert_called_once_with() + self.mock_tg.thread_done.assert_called_once_with(listener) + self.assertIsNone(mock_entry.listener) + + def test_enable_cluster(self): + mock_entry = self.create_mock_entry( + check_type=[consts.NODE_STATUS_POLLING], + enabled=False) + + def mock_enable(): + mock_entry.enabled = True + return True + + mock_entry.enable = mock_enable + + self.rhr.registries['CID'] = mock_entry + + self.rhr.enable_cluster('CID') + + self.assertTrue(mock_entry.enabled) + self.mock_tg.add_dynamic_timer.assert_called_once_with( + mock_entry.execute_health_check, None, None) + self.mock_tg.add_thread.assert_not_called() + + def test_enable_cluster_failed(self): + timer = mock.Mock() + mock_entry = self.create_mock_entry( + check_type=[consts.NODE_STATUS_POLLING], + enabled=False, timer=timer) + mock_entry.enable = mock.Mock() + mock_entry.enable.side_effect = Exception + + self.rhr.registries['CID'] = mock_entry + + self.rhr.enable_cluster('CID') + + self.mock_tg.add_dynamic_timer.assert_not_called() + self.mock_tg.add_thread.assert_not_called() + timer.stop.assert_called_once_with() + self.mock_tg.timer_done.assert_called_once_with(timer) + + def test_disable_cluster(self): + timer = mock.Mock() + mock_entry = self.create_mock_entry( + check_type=[consts.NODE_STATUS_POLLING], + enabled=True, timer=timer) + + def mock_disable(): + mock_entry.enabled = False + + mock_entry.disable = mock_disable + + self.rhr.registries['CID'] = mock_entry + + self.rhr.disable_cluster('CID') + + self.assertEqual(False, mock_entry.enabled) + + self.mock_tg.add_dynamic_timer.assert_not_called() + self.mock_tg.add_thread.assert_not_called() + timer.stop.assert_called_once_with() + self.mock_tg.timer_done.assert_called_once_with(timer) + + def test_disable_cluster_failed(self): + timer = mock.Mock() + mock_entry = self.create_mock_entry( + check_type=[consts.NODE_STATUS_POLLING], + enabled=True, timer=timer) + + mock_entry.enable.side_effect = Exception + + self.rhr.registries['CID'] = mock_entry + + self.rhr.disable_cluster('CID') + + self.mock_tg.add_dynamic_timer.assert_not_called() + self.mock_tg.add_thread.assert_not_called() + timer.stop.assert_called_once_with() + self.mock_tg.timer_done.assert_called_once_with(timer) + + def test_add_timer(self): + mock_entry = self.create_mock_entry( + 
check_type=[consts.NODE_STATUS_POLLING]) + self.rhr.registries['CID'] = mock_entry + fake_timer = mock.Mock() + self.mock_tg.add_dynamic_timer = mock.Mock() + self.mock_tg.add_dynamic_timer.return_value = fake_timer + + self.rhr._add_timer('CID') + + self.assertEqual(fake_timer, mock_entry.timer) + self.mock_tg.add_dynamic_timer.assert_called_once_with( + mock_entry.execute_health_check, None, None) + + def test_add_timer_failed(self): + fake_timer = mock.Mock() + mock_entry = self.create_mock_entry( + check_type=[consts.NODE_STATUS_POLLING], timer=fake_timer) + self.rhr.registries['CID'] = mock_entry + self.mock_tg.add_dynamic_timer = mock.Mock() + + self.rhr._add_timer('CID') + + self.assertEqual(fake_timer, mock_entry.timer) + self.mock_tg.add_dynamic_timer.assert_not_called() + + @mock.patch.object(obj_profile.Profile, 'get') + @mock.patch.object(obj_cluster.Cluster, 'get') + def test_add_listener_nova(self, mock_cluster, mock_profile): + cfg.CONF.set_override('nova_control_exchange', 'FAKE_NOVA_EXCHANGE', + group='health_manager') + mock_entry = self.create_mock_entry( + check_type=[consts.LIFECYCLE_EVENTS]) + self.rhr.registries['CID'] = mock_entry + fake_listener = mock.Mock() + x_cluster = mock.Mock(project='PROJECT_ID', profile_id='PROFILE_ID') + mock_cluster.return_value = x_cluster + x_profile = mock.Mock(type='os.nova.server-1.0') + mock_profile.return_value = x_profile + self.mock_tg.add_thread = mock.Mock() + self.mock_tg.add_thread.return_value = fake_listener + + self.rhr._add_listener('CID') + + mock_cluster.assert_called_once_with(self.rhr.ctx, 'CID', + project_safe=False) + mock_profile.assert_called_once_with(self.rhr.ctx, 'PROFILE_ID', + project_safe=False) + self.mock_tg.add_thread.assert_called_once_with( + hm.ListenerProc, 'FAKE_NOVA_EXCHANGE', 'PROJECT_ID', 'CID', + mock_entry.recover_action) + + @mock.patch.object(obj_profile.Profile, 'get') + @mock.patch.object(obj_cluster.Cluster, 'get') + def test_add_listener_heat(self, mock_cluster, mock_profile): + cfg.CONF.set_override('heat_control_exchange', 'FAKE_HEAT_EXCHANGE', + group='health_manager') + mock_entry = self.create_mock_entry( + check_type=[consts.LIFECYCLE_EVENTS]) + self.rhr.registries['CID'] = mock_entry + fake_listener = mock.Mock() + x_cluster = mock.Mock(project='PROJECT_ID', profile_id='PROFILE_ID') + mock_cluster.return_value = x_cluster + x_profile = mock.Mock(type='os.heat.stack-1.0') + mock_profile.return_value = x_profile + self.mock_tg.add_thread = mock.Mock() + self.mock_tg.add_thread.return_value = fake_listener + + self.rhr._add_listener('CID') + + mock_cluster.assert_called_once_with(self.rhr.ctx, 'CID', + project_safe=False) + mock_profile.assert_called_once_with(self.rhr.ctx, 'PROFILE_ID', + project_safe=False) + self.mock_tg.add_thread.assert_called_once_with( + hm.ListenerProc, 'FAKE_HEAT_EXCHANGE', 'PROJECT_ID', 'CID', + mock_entry.recover_action) + + @mock.patch.object(obj_profile.Profile, 'get') + @mock.patch.object(obj_cluster.Cluster, 'get') + def test_add_listener_failed(self, mock_cluster, mock_profile): + cfg.CONF.set_override('heat_control_exchange', 'FAKE_HEAT_EXCHANGE', + group='health_manager') + fake_listener = mock.Mock() + mock_entry = self.create_mock_entry( + check_type=[consts.LIFECYCLE_EVENTS], listener=fake_listener) + self.rhr.registries['CID'] = mock_entry + + x_cluster = mock.Mock(project='PROJECT_ID', profile_id='PROFILE_ID') + mock_cluster.return_value = x_cluster + x_profile = mock.Mock(type='os.heat.stack-1.0') + mock_profile.return_value = x_profile + 
self.mock_tg.add_thread = mock.Mock()
+
+        self.rhr._add_listener('CID')
+
+        mock_cluster.assert_not_called()
+        mock_profile.assert_not_called()
+
+        self.mock_tg.add_thread.assert_not_called()
+
+    def test_add_health_check_polling(self):
+        mock_entry = self.create_mock_entry(
+            check_type=[consts.NODE_STATUS_POLLING])
+        self.rhr.registries['CID'] = mock_entry
+        self.rhr._add_timer = mock.Mock()
+        self.rhr._add_listener = mock.Mock()
+
+        self.rhr.add_health_check(mock_entry)
+
+        self.rhr._add_timer.assert_called_once_with('CID')
+        self.rhr._add_listener.assert_not_called()
+
+    def test_add_health_check_events(self):
+        mock_entry = self.create_mock_entry(
+            check_type=[consts.LIFECYCLE_EVENTS], type=consts.EVENTS)
+        self.rhr.registries['CID'] = mock_entry
+        self.rhr._add_timer = mock.Mock()
+        self.rhr._add_listener = mock.Mock()
+
+        self.rhr.add_health_check(mock_entry)
+
+        self.rhr._add_timer.assert_not_called()
+        self.rhr._add_listener.assert_called_once_with('CID')
+
+    def test_add_health_check_disabled(self):
+        mock_entry = self.create_mock_entry(
+            check_type=[consts.NODE_STATUS_POLLING], enabled=False)
+        self.rhr.registries['CID'] = mock_entry
+        self.rhr._add_timer = mock.Mock()
+        self.rhr._add_listener = mock.Mock()
+
+        self.rhr.add_health_check(mock_entry)
+
+        self.rhr._add_timer.assert_not_called()
+        self.rhr._add_listener.assert_not_called()
+
+    def test_add_health_check_timer_exists(self):
+        fake_timer = mock.Mock()
+        mock_entry = self.create_mock_entry(
+            check_type=[consts.NODE_STATUS_POLLING], timer=fake_timer)
+        self.rhr.registries['CID'] = mock_entry
+        self.rhr._add_timer = mock.Mock()
+        self.rhr._add_listener = mock.Mock()
+
+        self.rhr.add_health_check(mock_entry)
+
+        self.rhr._add_timer.assert_not_called()
+        self.rhr._add_listener.assert_not_called()
+
+    def test_remove_health_check_timer(self):
+        fake_timer = mock.Mock()
+        mock_entry = self.create_mock_entry(
+            check_type=[consts.NODE_STATUS_POLLING], timer=fake_timer)
+        self.rhr.registries['CID'] = mock_entry
+
+        self.rhr.remove_health_check(mock_entry)
+
+        fake_timer.stop.assert_called_once_with()
+        self.mock_tg.timer_done.assert_called_once_with(fake_timer)
+        self.mock_tg.thread_done.assert_not_called()
+        self.assertIsNone(mock_entry.timer)
+
+    def test_remove_health_check_listener(self):
+        fake_listener = mock.Mock()
+        mock_entry = self.create_mock_entry(
+            check_type=[consts.NODE_STATUS_POLLING], listener=fake_listener)
+        self.rhr.registries['CID'] = mock_entry
+
+        self.rhr.remove_health_check(mock_entry)
+
+        fake_listener.stop.assert_called_once_with()
+        self.mock_tg.timer_done.assert_not_called()
+        self.mock_tg.thread_done.assert_called_once_with(fake_listener)
+        self.assertIsNone(mock_entry.listener)
+
+
+class TestHealthManager(base.SenlinTestCase):
+
+    def setUp(self):
+        super(TestHealthManager, self).setUp()
+
+        mock_eng = mock.Mock()
+        mock_eng.engine_id = 'ENGINE_ID'
+        topic = consts.HEALTH_MANAGER_TOPIC
+        version = consts.RPC_API_VERSION
+        self.hm = hm.HealthManager(mock_eng, topic, version)
+
+    def test_init(self):
+        self.assertEqual('ENGINE_ID', self.hm.engine_id)
+        self.assertIsNotNone(self.hm.TG)
+        self.assertIsNotNone(self.hm.rpc_client)
+        self.assertEqual(consts.HEALTH_MANAGER_TOPIC, self.hm.topic)
+        self.assertEqual(consts.RPC_API_VERSION, self.hm.version)
+        self.assertEqual(0, len(self.hm.registries))
+
+    def test_task(self):
+        self.hm.health_registry = mock.Mock()
+        self.hm.task()
+        self.hm.health_registry.load_runtime_registry.assert_called_once_with()

     @mock.patch('oslo_messaging.Target')
     def 
test_start(self, mock_target): @@ -1804,144 +1770,28 @@ class TestHealthManager(base.SenlinTestCase): mock_add_timer.assert_called_once_with( self.hm.task, None, cfg.CONF.periodic_interval) - @mock.patch.object(hr.HealthRegistry, 'create') - @mock.patch.object(hm.HealthManager, '_start_check') - def test_register_cluster(self, mock_check, mock_reg_create): - entry = { - 'cluster_id': 'CLUSTER_ID', - 'check_type': consts.NODE_STATUS_POLLING, - 'interval': 50, - 'params': { - 'blah': '123', - 'detection_modes': [ - { - 'type': consts.NODE_STATUS_POLLING, - 'poll_url': '', - 'poll_url_ssl_verify': True, - 'poll_url_conn_error_as_unhealthy': True, - 'poll_url_healthy_response': '', - 'poll_url_retry_limit': '', - 'poll_url_retry_interval': '', - } - ], - }, - 'enabled': True - } + def test_stop(self): + self.hm.TG = mock.Mock() + self.hm.stop() + self.hm.TG.stop_timers.assert_called_once_with() + def test_register_cluster(self): + self.hm.health_registry = mock.Mock() ctx = mock.Mock() + self.hm.register_cluster(ctx, 'CID', 60, 160, {}, True) + self.hm.health_registry.register_cluster.assert_called_once_with( + cluster_id='CID', + enabled=True, + interval=60, + node_update_timeout=160, + params={}) - x_reg = mock.Mock(cluster_id=entry['cluster_id'], - check_type=entry['check_type'], - interval=entry['interval'], params=entry['params'], - enabled=entry['enabled']) - mock_reg_create.return_value = x_reg - - self.hm.register_cluster( - ctx, cluster_id=entry['cluster_id'], interval=entry['interval'], - node_update_timeout=1, params=entry['params'], - enabled=entry['enabled']) - - mock_reg_create.assert_called_once_with( - ctx, entry['cluster_id'], consts.NODE_STATUS_POLLING, - entry['interval'], entry['params'], 'ENGINE_ID', - enabled=entry['enabled']) - mock_check.assert_called_once_with(entry) - self.assertEqual(1, len(self.hm.registries)) - - @mock.patch.object(hr.HealthRegistry, 'create') - @mock.patch.object(hm.HealthManager, '_start_check') - def test_register_cluster_not_enabled(self, mock_check, mock_reg_create): - entry = { - 'cluster_id': 'CLUSTER_ID', - 'check_type': consts.NODE_STATUS_POLLING, - 'interval': 50, - 'params': { - 'blah': '123', - 'detection_modes': [ - { - 'type': consts.NODE_STATUS_POLLING, - 'poll_url': '', - 'poll_url_ssl_verify': True, - 'poll_url_conn_error_as_unhealthy': True, - 'poll_url_healthy_response': '', - 'poll_url_retry_limit': '', - 'poll_url_retry_interval': '', - } - ], - }, - 'enabled': False - } - + def test_unregister_cluster(self): + self.hm.health_registry = mock.Mock() ctx = mock.Mock() - - x_reg = mock.Mock(cluster_id=entry['cluster_id'], - check_type=entry['check_type'], - interval=entry['interval'], params=entry['params'], - enabled=entry['enabled']) - mock_reg_create.return_value = x_reg - - self.hm.register_cluster( - ctx, cluster_id=entry['cluster_id'], interval=entry['interval'], - node_update_timeout=1, params=entry['params'], - enabled=entry['enabled']) - - mock_reg_create.assert_called_once_with( - ctx, entry['cluster_id'], consts.NODE_STATUS_POLLING, - entry['interval'], entry['params'], 'ENGINE_ID', - enabled=entry['enabled']) - mock_check.assert_not_called() - self.assertEqual(1, len(self.hm.registries)) - - @mock.patch.object(hm.HealthManager, '_stop_check') - @mock.patch.object(hr.HealthRegistry, 'delete') - def test_unregister_cluster(self, mock_delete, mock_stop): - ctx = mock.Mock() - timer = mock.Mock() - registry = { - 'cluster_id': 'CLUSTER_ID', - 'check_type': 'NODE_STATUS_POLLING', - 'interval': 50, - 'params': {}, - 'timer': 
timer, - 'enabled': True, - } - self.hm.rt['registries'] = [registry] - - self.hm.unregister_cluster(ctx, cluster_id='CLUSTER_ID') - - self.assertEqual(0, len(self.hm.registries)) - mock_stop.assert_called_once_with(registry) - mock_delete.assert_called_once_with(ctx, 'CLUSTER_ID') - - @mock.patch.object(hr.HealthRegistry, 'update') - @mock.patch.object(hm.HealthManager, '_start_check') - def test_enable_cluster(self, mock_start, mock_update): - ctx = mock.Mock() - entry1 = {'cluster_id': 'FAKE_ID', 'enabled': False} - entry2 = {'cluster_id': 'ANOTHER_CLUSTER', 'enabled': False} - self.hm.rt['registries'] = [entry1, entry2] - - self.hm.enable_cluster(ctx, 'FAKE_ID') - - mock_start.assert_called_once_with(entry1) - self.assertIn({'cluster_id': 'FAKE_ID', 'enabled': True}, - self.hm.rt['registries']) - mock_update.assert_called_once_with(ctx, 'FAKE_ID', {'enabled': True}) - - @mock.patch.object(hr.HealthRegistry, 'update') - @mock.patch.object(hm.HealthManager, '_stop_check') - def test_disable_cluster(self, mock_stop, mock_update): - ctx = mock.Mock() - entry1 = {'cluster_id': 'FAKE_ID', 'enabled': True} - entry2 = {'cluster_id': 'ANOTHER_CLUSTER', 'enabled': True} - self.hm.rt['registries'] = [entry1, entry2] - - self.hm.disable_cluster(ctx, 'FAKE_ID') - - mock_stop.assert_called_once_with(entry1) - self.assertIn({'cluster_id': 'FAKE_ID', 'enabled': False}, - self.hm.rt['registries']) - mock_update.assert_called_once_with(ctx, 'FAKE_ID', {'enabled': False}) + self.hm.unregister_cluster(ctx, 'CID') + self.hm.health_registry.unregister_cluster.assert_called_once_with( + 'CID') @mock.patch.object(context, 'get_admin_context') @mock.patch.object(hr.HealthRegistry, 'get')