From 30d9176e50fbb7a034ab795d325fce58070a6517 Mon Sep 17 00:00:00 2001 From: James Gu Date: Thu, 26 Jan 2017 23:18:00 -0800 Subject: [PATCH] Monasca agent on ESX hangs on collector autorestart This improves the likelyhood that the collector daemon will exit during auto restart. Also removed gevent requirement and usage so only eventlet is used. The conflict between gevent and eventlet caused multiproccessing worker join and sys.exit to hang forever. Change-Id: I60f980f4a74eafb709e51c0f520a81e2d72cb609 --- monasca_agent/collector/checks/collector.py | 38 +++++- .../collector/checks/services_checks.py | 113 ++++++++---------- monasca_agent/collector/daemon.py | 41 ++++--- requirements.txt | 5 +- tests/test_services_check.py | 42 +++++++ 5 files changed, 152 insertions(+), 87 deletions(-) create mode 100644 tests/test_services_check.py diff --git a/monasca_agent/collector/checks/collector.py b/monasca_agent/collector/checks/collector.py index 78f7e7b1..7e5f671d 100644 --- a/monasca_agent/collector/checks/collector.py +++ b/monasca_agent/collector/checks/collector.py @@ -1,10 +1,11 @@ -# (C) Copyright 2015,2016 Hewlett Packard Enterprise Development LP +# (C) Copyright 2015-2017 Hewlett Packard Enterprise Development LP # Core modules import logging from multiprocessing.dummy import Pool import os import socket +import sys import threading import time @@ -264,7 +265,7 @@ class Collector(util.Dimensions): return measurements - def stop(self): + def stop(self, timeout=0): """Tell the collector to stop at the next logical point. """ # This is called when the process is being killed, so @@ -273,10 +274,39 @@ class Collector(util.Dimensions): # because the forwarder is quite possibly already killed # in which case we'll get a misleading error in the logs. # Best to not even try. + + log.info("stopping the collector with timeout %d seconds" % timeout) + self.continue_running = False for check_name in self.collection_times: check = self.collection_times[check_name]['check'] check.stop() + + for check_name in self.collection_results: + run_time = time.time() - self.collection_results[check_name]['start_time'] + log.info('When exiting... Plugin %s still running after %d seconds' % ( + check_name, run_time)) + self.pool.close() - # Can't call self.pool.join() because this is an event thread - # and that will cause a BlockingSwitchOutError + + # Won't call join() if timeout is zero. If we are in an event thread + # a BlockingSwitchOutError occurs if wait + + if (timeout > 0): + timer = util.Timer() + for worker in self.pool._pool: + t = timeout - timer.total() + if t <= 0: + break + if worker.is_alive(): + try: + worker.join(t) + except Exception: + log.error("Unexpected error: ", sys.exc_info()[0]) + + for worker in self.pool._pool: + if worker.is_alive(): + # the worker didn't complete in the specified timeout. + # collector must honor the stop request to avoid agent stop/restart hang. + # os._exit() should be called after collector stops. + log.info('worker %s is still alive when collector stop times out.' % worker.name) diff --git a/monasca_agent/collector/checks/services_checks.py b/monasca_agent/collector/checks/services_checks.py index 6d29e538..af1ee7e0 100644 --- a/monasca_agent/collector/checks/services_checks.py +++ b/monasca_agent/collector/checks/services_checks.py @@ -1,16 +1,16 @@ -# (C) Copyright 2015,2016 Hewlett Packard Enterprise Development Company LP +# (C) Copyright 2015-2017 Hewlett Packard Enterprise Development Company LP import collections +from concurrent import futures import Queue import threading -from gevent import monkey -from gevent import Timeout -from multiprocessing.dummy import Pool as ThreadPool +import eventlet +from eventlet.green import time +import multiprocessing import monasca_agent.collector.checks - DEFAULT_TIMEOUT = 180 DEFAULT_SIZE_POOL = 6 MAX_LOOP_ITERATIONS = 1000 @@ -20,7 +20,6 @@ FAILURE = "FAILURE" up_down = collections.namedtuple('up_down', ['UP', 'DOWN']) Status = up_down('UP', 'DOWN') EventType = up_down("servicecheck.state_change.up", "servicecheck.state_change.down") -monkey.patch_all() class ServicesCheck(monasca_agent.collector.checks.AgentCheck): @@ -34,7 +33,7 @@ class ServicesCheck(monasca_agent.collector.checks.AgentCheck): The main agent loop will call the check function for each instance for each iteration of the loop. The check method will make an asynchronous call to the _process method in - one of the thread initiated in the thread pool created in this class constructor. + one of the thread pool executors created in this class constructor. The _process method will call the _check method of the inherited class which will perform the actual check. @@ -57,23 +56,26 @@ class ServicesCheck(monasca_agent.collector.checks.AgentCheck): # The pool size should be the minimum between the number of instances # and the DEFAULT_SIZE_POOL. It can also be overridden by the 'threads_count' # parameter in the init_config of the check - default_size = min(self.instance_count(), DEFAULT_SIZE_POOL) + try: + default_size = min(self.instance_count(), multiprocessing.cpu_count() + 1) + except NotImplementedError: + default_size = min(self.instance_count(), DEFAULT_SIZE_POOL) self.pool_size = int(self.init_config.get('threads_count', default_size)) self.timeout = int(self.agent_config.get('timeout', DEFAULT_TIMEOUT)) def start_pool(self): - self.log.info("Starting Thread Pool") - self.pool = ThreadPool(self.pool_size) - if threading.activeCount() > MAX_ALLOWED_THREADS: - self.log.error("Thread count ({0}) exceeds maximum ({1})".format(threading.activeCount(), - MAX_ALLOWED_THREADS)) - self.running_jobs = set() + if self.pool is None: + self.log.info("Starting Thread Pool Exceutor") + self.pool = futures.ThreadPoolExecutor(max_workers=self.pool_size) + if threading.activeCount() > MAX_ALLOWED_THREADS: + self.log.error('Thread count (%d) exceeds maximum (%d)' % (threading.activeCount(), + MAX_ALLOWED_THREADS)) + self.running_jobs = {} def stop_pool(self): self.log.info("Stopping Thread Pool") if self.pool: - self.pool.close() - self.pool.join() + self.pool.shutdown(wait=True) self.pool = None def restart_pool(self): @@ -81,80 +83,63 @@ class ServicesCheck(monasca_agent.collector.checks.AgentCheck): self.start_pool() def check(self, instance): - if not self.pool: - self.start_pool() - self._process_results() + self.start_pool() name = instance.get('name', None) if name is None: self.log.error('Each service check must have a name') return - if name not in self.running_jobs: + if (name not in self.running_jobs) or self.running_jobs[name].done(): # A given instance should be processed one at a time - self.running_jobs.add(name) - self.pool.apply_async(self._process, args=(instance,)) + self.running_jobs[name] = self.pool.submit(self._process, instance) else: self.log.info("Instance: %s skipped because it's already running." % name) def _process(self, instance): name = instance.get('name', None) try: - with Timeout(self.timeout): + with eventlet.timeout.Timeout(self.timeout): return_value = self._check(instance) if not return_value: return status, msg = return_value - result = (status, msg, name, instance) - # We put the results in the result queue - self.resultsq.put(result) - except Timeout: - self.log.error('ServiceCheck {0} timed out'.format(name)) + self._process_result(status, msg, name, instance) + except eventlet.Timeout: + msg = 'ServiceCheck {0} timed out'.format(name) + self.log.error(msg) + self._process_result(FAILURE, msg, name, instance) except Exception: - self.log.exception('Failure in ServiceCheck {0}'.format(name)) - result = (FAILURE, FAILURE, FAILURE, FAILURE) - self.resultsq.put(result) + msg = 'Failure in ServiceCheck {0}'.format(name) + self.log.exception(msg) + self._process_result(FAILURE, msg, name, instance) finally: - self.running_jobs.remove(name) + del self.running_jobs[name] - def _process_results(self): - for i in range(MAX_LOOP_ITERATIONS): - try: - # We want to fetch the result in a non blocking way - status, msg, name, queue_instance = self.resultsq.get_nowait() - except Queue.Empty: - break + def _process_result(self, status, msg, name, queue_instance): + if name not in self.statuses: + self.statuses[name] = [] - if status == FAILURE: - self.nb_failures += 1 - if self.nb_failures >= self.pool_size - 1: - self.nb_failures = 0 - self.restart_pool() - continue + self.statuses[name].append(status) - if name not in self.statuses: - self.statuses[name] = [] + window = int(queue_instance.get('window', 1)) - self.statuses[name].append(status) + if window > 256: + self.log.warning("Maximum window size (256) exceeded, defaulting it to 256") + window = 256 - window = int(queue_instance.get('window', 1)) + threshold = queue_instance.get('threshold', 1) - if window > 256: - self.log.warning("Maximum window size (256) exceeded, defaulting it to 256") - window = 256 + if len(self.statuses[name]) > window: + self.statuses[name].pop(0) - threshold = queue_instance.get('threshold', 1) + nb_failures = self.statuses[name].count(Status.DOWN) - if len(self.statuses[name]) > window: - self.statuses[name].pop(0) - - nb_failures = self.statuses[name].count(Status.DOWN) - - if nb_failures >= threshold: - if self.notified.get(name, Status.UP) != Status.DOWN: - self.notified[name] = Status.DOWN - else: - if self.notified.get(name, Status.UP) != Status.UP: - self.notified[name] = Status.UP + if nb_failures >= threshold: + if self.notified.get(name, Status.UP) != Status.DOWN: + self.notified[name] = Status.DOWN + else: + if self.notified.get(name, Status.UP) != Status.UP: + self.notified[name] = Status.UP def _check(self, instance): """This function should be implemented by inherited classes. diff --git a/monasca_agent/collector/daemon.py b/monasca_agent/collector/daemon.py index ac0cd4e1..5620240e 100644 --- a/monasca_agent/collector/daemon.py +++ b/monasca_agent/collector/daemon.py @@ -1,5 +1,5 @@ #!/usr/bin/env python -# (C) Copyright 2015-2016 Hewlett Packard Enterprise Development Company LP +# (C) Copyright 2015-2017 Hewlett Packard Enterprise Development LP # Core modules import glob @@ -53,20 +53,26 @@ class CollectorDaemon(monasca_agent.common.daemon.Daemon): self.start_event = start_event def _handle_sigterm(self, signum, frame): - log.debug("Caught sigterm. Stopping run loop.") + log.debug("Caught sigterm.") + self._stop(0) + sys.exit(0) + + def _handle_sigusr1(self, signum, frame): + log.debug("Caught sigusrl.") + self._stop(120) + sys.exit(monasca_agent.common.daemon.AgentSupervisor.RESTART_EXIT_STATUS) + + def _stop(self, timeout=0): + log.info("Stopping collector run loop.") self.run_forever = False if jmxfetch.JMXFetch.is_running(): jmxfetch.JMXFetch.stop() if self.collector: - self.collector.stop() - log.debug("Collector is stopped.") - sys.exit(0) + self.collector.stop(timeout) - def _handle_sigusr1(self, signum, frame): - self._handle_sigterm(signum, frame) - self._do_restart() + log.info('collector stopped') def run(self, config): """Main loop of the collector. @@ -94,6 +100,9 @@ class CollectorDaemon(monasca_agent.common.daemon.Daemon): self.restart_interval = int(util.get_collector_restart_interval()) self.agent_start = time.time() + exitCode = 0 + exitTimeout = 0 + # Run the main loop. while self.run_forever: collection_start = time.time() @@ -125,7 +134,10 @@ class CollectorDaemon(monasca_agent.common.daemon.Daemon): # Check if we should restart. if self.autorestart and self._should_restart(): - self._do_restart() + self.run_forever = False + exitCode = monasca_agent.common.daemon.AgentSupervisor.RESTART_EXIT_STATUS + exitTimeout = 120 + log.info('Startng an auto restart') # Only plan for the next loop if we will continue, # otherwise just exit quickly. @@ -137,23 +149,18 @@ class CollectorDaemon(monasca_agent.common.daemon.Daemon): log.info("Collection took {0} which is as long or longer then the configured collection frequency " "of {1}. Starting collection again without waiting in result.".format(collection_time, check_frequency)) + self._stop(exitTimeout) # Explicitly kill the process, because it might be running # as a daemon. - log.info("Exiting. Bye bye.") - sys.exit(0) + log.info("Exiting collector daemon, code %d." % exitCode) + os._exit(exitCode) def _should_restart(self): if time.time() - self.agent_start > self.restart_interval: return True return False - def _do_restart(self): - log.info("Running an auto-restart.") - if self.collector: - self.collector.stop() - sys.exit(monasca_agent.common.daemon.AgentSupervisor.RESTART_EXIT_STATUS) - def main(): options, args = util.get_parsed_args() diff --git a/requirements.txt b/requirements.txt index a3493a71..8ce5415a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,7 +9,6 @@ oslo.i18n>=2.1.0 # Apache-2.0 oslo.utils>=3.18.0 # Apache-2.0 oslo.vmware>=2.11.0 # Apache-2.0 PyYAML>=3.10.0 # MIT -gevent>=1.1.1 httplib2>=0.7.5 # MIT netaddr>=0.7.13,!=0.7.16 # BSD ntplib>=0.3.2,<0.4 @@ -21,4 +20,6 @@ redis>=2.10.0 # MIT six>=1.9.0 # MIT supervisor>=3.1.3,<3.4 stevedore>=1.17.1 # Apache-2.0 -tornado>=4.3 \ No newline at end of file +tornado>=4.3 +futures>=2.1.3 +eventlet!=0.18.3,>=0.18.2 # MIT diff --git a/tests/test_services_check.py b/tests/test_services_check.py new file mode 100644 index 00000000..9a1b2d58 --- /dev/null +++ b/tests/test_services_check.py @@ -0,0 +1,42 @@ +# (C) Copyright 2017 Hewlett Packard Enterprise Development LP + +from unittest import TestCase +from eventlet.green import time + +from monasca_agent.collector.checks.services_checks import ServicesCheck + +from monasca_agent.collector.checks.services_checks import Status + +class DummyServiceCheck(ServicesCheck): + def __init__(self, name, init_config, agent_config, instances=None): + super(DummyServiceCheck, self).__init__(name, init_config, agent_config, instances) + + def _check(self, instance): + w = instance.get('service_wait', 5) + time.sleep(w) + return Status.UP, "UP" + + def set_timeout(self, timeout): + self.timeout = timeout + +class TestServicesCheck(TestCase): + def setUp(self): + TestCase.setUp(self) + pass + + def test_service_check_timeout(self): + + init_config = {} + agent_config = {'timeout': 4} + instances = [] + for wait in [1,2, 6, 8]: + instances.append({'service_wait': wait, 'name' : 'dummy %d' % wait}) + self.dummy_service = DummyServiceCheck("dummy service", init_config, agent_config, instances=instances) + self.dummy_service.run() + time.sleep(10) + self.assertEqual(self.dummy_service.statuses['dummy 1'][0], Status.UP) + self.assertEqual(self.dummy_service.statuses['dummy 2'][0], Status.UP) + + self.assertEqual(self.dummy_service.statuses['dummy 6'][0], "FAILURE") + self.assertEqual(self.dummy_service.statuses['dummy 8'][0], "FAILURE") +