Monasca agent on ESX hangs on collector autorestart

This improves the likelyhood that the collector daemon will exit during
auto restart. Also removed gevent requirement and usage so only
eventlet is used. The conflict between gevent and eventlet caused
multiproccessing worker join and sys.exit to hang forever.

Change-Id: I60f980f4a74eafb709e51c0f520a81e2d72cb609
This commit is contained in:
James Gu 2017-01-26 23:18:00 -08:00 committed by James Gu
parent 6f7476874b
commit 30d9176e50
5 changed files with 152 additions and 87 deletions

View File

@ -1,10 +1,11 @@
# (C) Copyright 2015,2016 Hewlett Packard Enterprise Development LP
# (C) Copyright 2015-2017 Hewlett Packard Enterprise Development LP
# Core modules
import logging
from multiprocessing.dummy import Pool
import os
import socket
import sys
import threading
import time
@ -264,7 +265,7 @@ class Collector(util.Dimensions):
return measurements
def stop(self):
def stop(self, timeout=0):
"""Tell the collector to stop at the next logical point.
"""
# This is called when the process is being killed, so
@ -273,10 +274,39 @@ class Collector(util.Dimensions):
# because the forwarder is quite possibly already killed
# in which case we'll get a misleading error in the logs.
# Best to not even try.
log.info("stopping the collector with timeout %d seconds" % timeout)
self.continue_running = False
for check_name in self.collection_times:
check = self.collection_times[check_name]['check']
check.stop()
for check_name in self.collection_results:
run_time = time.time() - self.collection_results[check_name]['start_time']
log.info('When exiting... Plugin %s still running after %d seconds' % (
check_name, run_time))
self.pool.close()
# Can't call self.pool.join() because this is an event thread
# and that will cause a BlockingSwitchOutError
# Won't call join() if timeout is zero. If we are in an event thread
# a BlockingSwitchOutError occurs if wait
if (timeout > 0):
timer = util.Timer()
for worker in self.pool._pool:
t = timeout - timer.total()
if t <= 0:
break
if worker.is_alive():
try:
worker.join(t)
except Exception:
log.error("Unexpected error: ", sys.exc_info()[0])
for worker in self.pool._pool:
if worker.is_alive():
# the worker didn't complete in the specified timeout.
# collector must honor the stop request to avoid agent stop/restart hang.
# os._exit() should be called after collector stops.
log.info('worker %s is still alive when collector stop times out.' % worker.name)

View File

@ -1,16 +1,16 @@
# (C) Copyright 2015,2016 Hewlett Packard Enterprise Development Company LP
# (C) Copyright 2015-2017 Hewlett Packard Enterprise Development Company LP
import collections
from concurrent import futures
import Queue
import threading
from gevent import monkey
from gevent import Timeout
from multiprocessing.dummy import Pool as ThreadPool
import eventlet
from eventlet.green import time
import multiprocessing
import monasca_agent.collector.checks
DEFAULT_TIMEOUT = 180
DEFAULT_SIZE_POOL = 6
MAX_LOOP_ITERATIONS = 1000
@ -20,7 +20,6 @@ FAILURE = "FAILURE"
up_down = collections.namedtuple('up_down', ['UP', 'DOWN'])
Status = up_down('UP', 'DOWN')
EventType = up_down("servicecheck.state_change.up", "servicecheck.state_change.down")
monkey.patch_all()
class ServicesCheck(monasca_agent.collector.checks.AgentCheck):
@ -34,7 +33,7 @@ class ServicesCheck(monasca_agent.collector.checks.AgentCheck):
The main agent loop will call the check function for each instance for
each iteration of the loop.
The check method will make an asynchronous call to the _process method in
one of the thread initiated in the thread pool created in this class constructor.
one of the thread pool executors created in this class constructor.
The _process method will call the _check method of the inherited class
which will perform the actual check.
@ -57,23 +56,26 @@ class ServicesCheck(monasca_agent.collector.checks.AgentCheck):
# The pool size should be the minimum between the number of instances
# and the DEFAULT_SIZE_POOL. It can also be overridden by the 'threads_count'
# parameter in the init_config of the check
default_size = min(self.instance_count(), DEFAULT_SIZE_POOL)
try:
default_size = min(self.instance_count(), multiprocessing.cpu_count() + 1)
except NotImplementedError:
default_size = min(self.instance_count(), DEFAULT_SIZE_POOL)
self.pool_size = int(self.init_config.get('threads_count', default_size))
self.timeout = int(self.agent_config.get('timeout', DEFAULT_TIMEOUT))
def start_pool(self):
self.log.info("Starting Thread Pool")
self.pool = ThreadPool(self.pool_size)
if threading.activeCount() > MAX_ALLOWED_THREADS:
self.log.error("Thread count ({0}) exceeds maximum ({1})".format(threading.activeCount(),
MAX_ALLOWED_THREADS))
self.running_jobs = set()
if self.pool is None:
self.log.info("Starting Thread Pool Exceutor")
self.pool = futures.ThreadPoolExecutor(max_workers=self.pool_size)
if threading.activeCount() > MAX_ALLOWED_THREADS:
self.log.error('Thread count (%d) exceeds maximum (%d)' % (threading.activeCount(),
MAX_ALLOWED_THREADS))
self.running_jobs = {}
def stop_pool(self):
self.log.info("Stopping Thread Pool")
if self.pool:
self.pool.close()
self.pool.join()
self.pool.shutdown(wait=True)
self.pool = None
def restart_pool(self):
@ -81,80 +83,63 @@ class ServicesCheck(monasca_agent.collector.checks.AgentCheck):
self.start_pool()
def check(self, instance):
if not self.pool:
self.start_pool()
self._process_results()
self.start_pool()
name = instance.get('name', None)
if name is None:
self.log.error('Each service check must have a name')
return
if name not in self.running_jobs:
if (name not in self.running_jobs) or self.running_jobs[name].done():
# A given instance should be processed one at a time
self.running_jobs.add(name)
self.pool.apply_async(self._process, args=(instance,))
self.running_jobs[name] = self.pool.submit(self._process, instance)
else:
self.log.info("Instance: %s skipped because it's already running." % name)
def _process(self, instance):
name = instance.get('name', None)
try:
with Timeout(self.timeout):
with eventlet.timeout.Timeout(self.timeout):
return_value = self._check(instance)
if not return_value:
return
status, msg = return_value
result = (status, msg, name, instance)
# We put the results in the result queue
self.resultsq.put(result)
except Timeout:
self.log.error('ServiceCheck {0} timed out'.format(name))
self._process_result(status, msg, name, instance)
except eventlet.Timeout:
msg = 'ServiceCheck {0} timed out'.format(name)
self.log.error(msg)
self._process_result(FAILURE, msg, name, instance)
except Exception:
self.log.exception('Failure in ServiceCheck {0}'.format(name))
result = (FAILURE, FAILURE, FAILURE, FAILURE)
self.resultsq.put(result)
msg = 'Failure in ServiceCheck {0}'.format(name)
self.log.exception(msg)
self._process_result(FAILURE, msg, name, instance)
finally:
self.running_jobs.remove(name)
del self.running_jobs[name]
def _process_results(self):
for i in range(MAX_LOOP_ITERATIONS):
try:
# We want to fetch the result in a non blocking way
status, msg, name, queue_instance = self.resultsq.get_nowait()
except Queue.Empty:
break
def _process_result(self, status, msg, name, queue_instance):
if name not in self.statuses:
self.statuses[name] = []
if status == FAILURE:
self.nb_failures += 1
if self.nb_failures >= self.pool_size - 1:
self.nb_failures = 0
self.restart_pool()
continue
self.statuses[name].append(status)
if name not in self.statuses:
self.statuses[name] = []
window = int(queue_instance.get('window', 1))
self.statuses[name].append(status)
if window > 256:
self.log.warning("Maximum window size (256) exceeded, defaulting it to 256")
window = 256
window = int(queue_instance.get('window', 1))
threshold = queue_instance.get('threshold', 1)
if window > 256:
self.log.warning("Maximum window size (256) exceeded, defaulting it to 256")
window = 256
if len(self.statuses[name]) > window:
self.statuses[name].pop(0)
threshold = queue_instance.get('threshold', 1)
nb_failures = self.statuses[name].count(Status.DOWN)
if len(self.statuses[name]) > window:
self.statuses[name].pop(0)
nb_failures = self.statuses[name].count(Status.DOWN)
if nb_failures >= threshold:
if self.notified.get(name, Status.UP) != Status.DOWN:
self.notified[name] = Status.DOWN
else:
if self.notified.get(name, Status.UP) != Status.UP:
self.notified[name] = Status.UP
if nb_failures >= threshold:
if self.notified.get(name, Status.UP) != Status.DOWN:
self.notified[name] = Status.DOWN
else:
if self.notified.get(name, Status.UP) != Status.UP:
self.notified[name] = Status.UP
def _check(self, instance):
"""This function should be implemented by inherited classes.

View File

@ -1,5 +1,5 @@
#!/usr/bin/env python
# (C) Copyright 2015-2016 Hewlett Packard Enterprise Development Company LP
# (C) Copyright 2015-2017 Hewlett Packard Enterprise Development LP
# Core modules
import glob
@ -53,20 +53,26 @@ class CollectorDaemon(monasca_agent.common.daemon.Daemon):
self.start_event = start_event
def _handle_sigterm(self, signum, frame):
log.debug("Caught sigterm. Stopping run loop.")
log.debug("Caught sigterm.")
self._stop(0)
sys.exit(0)
def _handle_sigusr1(self, signum, frame):
log.debug("Caught sigusrl.")
self._stop(120)
sys.exit(monasca_agent.common.daemon.AgentSupervisor.RESTART_EXIT_STATUS)
def _stop(self, timeout=0):
log.info("Stopping collector run loop.")
self.run_forever = False
if jmxfetch.JMXFetch.is_running():
jmxfetch.JMXFetch.stop()
if self.collector:
self.collector.stop()
log.debug("Collector is stopped.")
sys.exit(0)
self.collector.stop(timeout)
def _handle_sigusr1(self, signum, frame):
self._handle_sigterm(signum, frame)
self._do_restart()
log.info('collector stopped')
def run(self, config):
"""Main loop of the collector.
@ -94,6 +100,9 @@ class CollectorDaemon(monasca_agent.common.daemon.Daemon):
self.restart_interval = int(util.get_collector_restart_interval())
self.agent_start = time.time()
exitCode = 0
exitTimeout = 0
# Run the main loop.
while self.run_forever:
collection_start = time.time()
@ -125,7 +134,10 @@ class CollectorDaemon(monasca_agent.common.daemon.Daemon):
# Check if we should restart.
if self.autorestart and self._should_restart():
self._do_restart()
self.run_forever = False
exitCode = monasca_agent.common.daemon.AgentSupervisor.RESTART_EXIT_STATUS
exitTimeout = 120
log.info('Startng an auto restart')
# Only plan for the next loop if we will continue,
# otherwise just exit quickly.
@ -137,23 +149,18 @@ class CollectorDaemon(monasca_agent.common.daemon.Daemon):
log.info("Collection took {0} which is as long or longer then the configured collection frequency "
"of {1}. Starting collection again without waiting in result.".format(collection_time,
check_frequency))
self._stop(exitTimeout)
# Explicitly kill the process, because it might be running
# as a daemon.
log.info("Exiting. Bye bye.")
sys.exit(0)
log.info("Exiting collector daemon, code %d." % exitCode)
os._exit(exitCode)
def _should_restart(self):
if time.time() - self.agent_start > self.restart_interval:
return True
return False
def _do_restart(self):
log.info("Running an auto-restart.")
if self.collector:
self.collector.stop()
sys.exit(monasca_agent.common.daemon.AgentSupervisor.RESTART_EXIT_STATUS)
def main():
options, args = util.get_parsed_args()

View File

@ -9,7 +9,6 @@ oslo.i18n>=2.1.0 # Apache-2.0
oslo.utils>=3.18.0 # Apache-2.0
oslo.vmware>=2.11.0 # Apache-2.0
PyYAML>=3.10.0 # MIT
gevent>=1.1.1
httplib2>=0.7.5 # MIT
netaddr>=0.7.13,!=0.7.16 # BSD
ntplib>=0.3.2,<0.4
@ -22,3 +21,5 @@ six>=1.9.0 # MIT
supervisor>=3.1.3,<3.4
stevedore>=1.17.1 # Apache-2.0
tornado>=4.3
futures>=2.1.3
eventlet!=0.18.3,>=0.18.2 # MIT

View File

@ -0,0 +1,42 @@
# (C) Copyright 2017 Hewlett Packard Enterprise Development LP
from unittest import TestCase
from eventlet.green import time
from monasca_agent.collector.checks.services_checks import ServicesCheck
from monasca_agent.collector.checks.services_checks import Status
class DummyServiceCheck(ServicesCheck):
def __init__(self, name, init_config, agent_config, instances=None):
super(DummyServiceCheck, self).__init__(name, init_config, agent_config, instances)
def _check(self, instance):
w = instance.get('service_wait', 5)
time.sleep(w)
return Status.UP, "UP"
def set_timeout(self, timeout):
self.timeout = timeout
class TestServicesCheck(TestCase):
def setUp(self):
TestCase.setUp(self)
pass
def test_service_check_timeout(self):
init_config = {}
agent_config = {'timeout': 4}
instances = []
for wait in [1,2, 6, 8]:
instances.append({'service_wait': wait, 'name' : 'dummy %d' % wait})
self.dummy_service = DummyServiceCheck("dummy service", init_config, agent_config, instances=instances)
self.dummy_service.run()
time.sleep(10)
self.assertEqual(self.dummy_service.statuses['dummy 1'][0], Status.UP)
self.assertEqual(self.dummy_service.statuses['dummy 2'][0], Status.UP)
self.assertEqual(self.dummy_service.statuses['dummy 6'][0], "FAILURE")
self.assertEqual(self.dummy_service.statuses['dummy 8'][0], "FAILURE")