Use multiple threads to run the plugins
This is to handle the case of long running plugins that added together can exceed the collection frequency. Running the plugins on multiple threads should bring the collection time down close to the longest running plugin A Thread pool is created and used if the new parameter collector_threads is greater than 1 Due to the Global Interpreter Lock, this will only work well if the checks are I/O bound which seems to be the case so far Changed the logging messages to reference plugins instead of checks to match the usage in docs/ Plugins are run in reverse order of last collection time. Plugins that are still running at the end of a collection cycle are skipped in the next cycle. If all of the pool threads are blocked too long by plugins, the collector will exit Plugins are always run in the thread pool even if pool_size is 1 so that the other changes above still work Change-Id: Ib50f7fe50a26c949a0066baa3d73ff4a25a8de84
This commit is contained in:
@ -72,6 +72,13 @@ Main:
# time to wait between collection runs
check_freq: {args.check_frequency}
# Number of Collector Threads to run
num_collector_threads: {args.num_collector_threads}
# Maximum number of collection cycles where all of the threads in the pool are
# still running plugins before the collector will exit
pool_full_max_retries: {args.pool_full_max_retries}
# Threshold value for warning on collection time of each check (in seconds)
sub_collection_warn: 6
@ -94,6 +94,8 @@ All parameters require a '--' before the parameter such as '--verbose'. Run `mon
| project_domain_name | Project domain name for keystone authentication | |
| project_id | Keystone project id for keystone authentication | |
| check_frequency | How often to run metric collection in seconds | 60 |
| num_collector_threads | Number of threads to use in collector for running checks | 1 |
| pool_full_max_retries | Maximum number of collection cycles where all of the threads in the pool are still running plugins before the collector will exit| 4 |
| keystone_url | This is a required parameter that specifies the url of the keystone api for retrieving tokens. It must be a v3 endpoint. | |
| dimensions | A comma separated list of key:value pairs to include as dimensions in all submitted metrics| region:a,az:1 |
| service | This is an optional parameter that specifies the name of the service associated with this particular node | nova, cinder, myservice |
@ -231,6 +233,13 @@ A plugin config is specified something like this:
# Running
The monasca-setup command will create an appropriate startup script for the agent and so the agent can be run by using the standard daemon control tool for your operating system. If you have configured manually the startup script templates can be found in the code under the packaging directory.
# Running the collector with multiple threads
The number of threads to use for running the plugins is via num_collector_threads. Setting this value to greater than 1 can be very useful when some plugins take a relatively long time to run. With num_collector_threads set to 1, the plugins are run serially. If the sum of the collection times for each plugin is greater than the check_frequency, then the metrics will not be collected as often as they should be. With more threads, the collection time is closer to the longest plugin collection time.
The collector is optimized for collecting as many metrics on schedule as possible. The plugins are run in reverse order of their collection time, i.e., the fastest plugin first. Also, if a plugin does not complete within the collection frequency, that plugin will be skipped in the next collection cycle. These two optimizations together ensure that plugins that complete with collection frequency seconds will get run on every collection cycle.
If there is some problem with multiple plugins that end up blocking the entire thread pool, the collector will exit so that it can be restarted by the supervisord. The parameter pool_full_max_retries controls when this happens. If pool_full_max_retries consecutive collection cycles have ended with the Thread Pool completely full, the collector will exit.
Some of the plugins have their own thread pools to handle asynchronous checks. The collector thread pool is separate and has no special interaction with those thread pools.
# License
(C) Copyright 2015 Hewlett Packard Enterprise Development Company LP
@ -2,6 +2,8 @@
# Core modules
import logging
from multiprocessing.dummy import Pool
import os
import socket
import threading
import time
@ -14,7 +16,6 @@ log = logging.getLogger(__name__)
@ -27,7 +28,7 @@ class Collector(util.Dimensions):
passing it along to the emitters, who send it to their final destination.
def __init__(self, agent_config, emitter, checksd=None):
def __init__(self, agent_config, emitter, checksd):
super(Collector, self).__init__(agent_config)
self.agent_config = agent_config
self.os = util.get_os()
@ -36,14 +37,22 @@ class Collector(util.Dimensions):
self.run_count = 0
self.continue_running = True
self.initialized_checks_d = []
self.init_failed_checks_d = []
self.collection_metrics = {}
if checksd:
# is of type {check_name: check}
self.initialized_checks_d = checksd['initialized_checks']
# is of type {check_name: {error, traceback}}
self.init_failed_checks_d = checksd['init_failed_checks']
# is of type {check_name: check}
initialized_checks_d = checksd['initialized_checks']
self.pool_size = int(self.agent_config.get('num_collector_threads', 1))
||||'Using %d Threads for Collector' % self.pool_size)
self.pool = Pool(self.pool_size)
self.pool_full_count = 0
self.collection_times = {}
self.collection_results = {}
for check in initialized_checks_d:
self.collection_times[] = {'check': check,
'last_collect_time': 99999999}
self.pool_full_max_retries = int(self.agent_config.get('pool_full_max_retries',
def _emit(self, payload):
"""Send the payload via the emitter.
@ -67,81 +76,170 @@ class Collector(util.Dimensions):
log.debug("Finished run #%s. Collection time: %.2fs." %
(self.run_count, round(collect_duration, 2),))
def add_collection_metric(self, name, value):
self.collection_metrics[name] = value
def collector_stats(self, num_metrics, collection_time):
metrics = {}
thread_count = threading.active_count()
metrics['monasca.thread_count'] = thread_count
self.add_collection_metric('monasca.thread_count', thread_count)
if thread_count > MAX_THREADS_COUNT:
log.warn("Collector thread count is high: %d" % thread_count)
metrics['monasca.collection_time_sec'] = collection_time
if collection_time > MAX_COLLECTION_TIME:
||||"Collection time (s) is high: %.1f, metrics count: %d" %
(collection_time, num_metrics))
self.add_collection_metric('monasca.collection_time_sec', collection_time)
return metrics
def run(self):
def run(self, check_frequency):
"""Collect data from each check and submit their data.
There are currently two types of checks the system checks and the configured ones from checks_d
Also, submit a metric which is how long the checks_d took
timer = util.Timer()
self.run_count += 1
log.debug("Starting collection run #%s" % self.run_count)
# checks_d checks
num_metrics = self.run_checks_d()
num_metrics = self.run_checks_d(check_frequency)
collect_duration = timer.step()
# Warn if collection time is approaching the collection period
if collect_duration > (4 * check_frequency / 5):
log.warn("Collection time (s) is high: %.1f, metrics count: %d" %
(collect_duration, num_metrics))
self.collector_stats(num_metrics, collect_duration)
collect_stats = []
dimensions = {'component': 'monasca-agent', 'service': 'monitoring'}
# Add in metrics on the collector run
for name, value in self.collector_stats(num_metrics, collect_duration).iteritems():
for name, value in self.collection_metrics.iteritems():
# Persist the status of the collection run.
def run_checks_d(self):
"""Run defined checks_d checks.
def run_single_check(self, check):
"""Run a single check
returns a list of Measurements.
returns number of measurement collected, colleciton time
sub_timer = util.Timer()
count = 0
log.debug("Running plugin %s" %
# Run the check.
current_check_metrics = check.get_metrics()
# Emit the metrics after each check
# Save the status of the check.
count += len(current_check_metrics)
except Exception:
log.exception("Error running plugin %s" %
sub_collect_duration = sub_timer.step()
sub_collect_duration_mills = sub_collect_duration * 1000
log.debug("Finished plugin %s run. Collection time: %.2fms %d Metrics." % (
||||, round(sub_collect_duration_mills, 2), count))
if sub_collect_duration > util.get_sub_collection_warn():
log.warn("Collection time for check %s is high: %.2fs." % (
||||, round(sub_collect_duration, 2)))
return count, sub_collect_duration_mills
def wait_for_results(self, check_frequency, start_time):
"""Wait either for all running checks to finish or
for check_frequency seconds, whichever comes first
returns number of measurements collected
# Make sure we check for results at least once
wait_time = check_frequency / 10
measurements = 0
for check in self.initialized_checks_d:
time_left = check_frequency
while time_left > 0 and self.collection_results:
for check_name in list(self.collection_results.keys()):
result = self.collection_results[check_name]['result']
if result.ready():
log.debug('Plugin %s has completed' % check_name)
if not result.successful():
log.error('Plugin %s failed' % check_name)
count, collect_time = result.get()
measurements += count
self.collection_times[check_name]['last_collect_time'] = collect_time
del self.collection_results[check_name]
log.debug('Plugin %s still running' % check_name)
time_left = start_time + check_frequency - time.time()
return measurements
def start_checks_in_thread_pool(self, start_time):
"""Add the checks that are not already running to the Thread Pool
# Sort by the last collection time so the checks that take the
# least amount of time are run first so they are more likely to
# complete within the check_frequency
sorted_checks = sorted(self.collection_times.itervalues(),
key=lambda x: x['last_collect_time'])
for entry in sorted_checks:
check = entry['check']
last_collect_time = entry['last_collect_time']
if not self.continue_running:
log.debug("Running check %s" %
# Run the check.
if in self.collection_results:
log.warning('Plugin %s is already running, skipping' %
log.debug('Starting plugin %s, old collect time %d' %
(, last_collect_time))
async_result = self.pool.apply_async(self.run_single_check, [check])
self.collection_results[] = {'result': async_result,
'start_time': start_time}
current_check_metrics = check.get_metrics()
def run_checks_d(self, check_frequency):
"""Run defined checks_d checks using the Thread Pool.
# Emit the metrics after each check
returns number of Measurements.
# Save the status of the check.
measurements += len(current_check_metrics)
start_time = time.time()
except Exception:
log.exception("Error running check %s" %
measurements = self.wait_for_results(check_frequency, start_time)
sub_collect_duration = sub_timer.step()
sub_collect_duration_mills = sub_collect_duration * 1000
log.debug("Finished run check %s. Collection time: %.2fms." % (
||||, round(sub_collect_duration_mills, 2)))
if sub_collect_duration > util.get_sub_collection_warn():
log.warn("Collection time for check %s is high: %.2fs." % (
||||, round(sub_collect_duration, 2)))
# See if any checks are still running
if self.collection_results:
# Output a metric that can be used for Alarming. This metric is only
# emitted when there are checks running too long so a deterministic
# Alarm Definition should be created when monitoring it
for check_name in self.collection_results:
run_time = time.time() - self.collection_results[check_name]['start_time']
log.warning('Plugin %s still running after %d seconds' % (
check_name, run_time))
if len(self.collection_results) >= self.pool_size:
self.pool_full_count += 1
if (self.pool_full_count > self.pool_full_max_retries):
log.error('Thread Pool full and %d plugins still running for ' +
'%d collection cycles, exiting' %
(len(self.collection_results), self.pool_full_count))
self.pool_full_count = 0
return measurements
@ -155,5 +253,9 @@ class Collector(util.Dimensions):
# in which case we'll get a misleading error in the logs.
# Best to not even try.
self.continue_running = False
for check in self.initialized_checks_d:
for check_name in self.collection_times:
check = self.collection_times[check_name]['check']
# Can't call self.pool.join() because this is an event thread
# and that will cause a BlockingSwitchOutError
@ -106,7 +106,7 @@ class CollectorDaemon(monasca_agent.common.daemon.Daemon):
log.warn("Cannot enable profiler")
# Do the work.
# disable profiler and printout stats to stdout
if config.get('profile', False) and config.get('profile').lower() == 'yes' and profiled:
@ -170,6 +170,13 @@ def modify_config(args, detected_config):
return changes
def validate_positive(value):
int_value = int(value)
if int_value <= 0:
raise argparse.ArgumentTypeError("%s must be greater than zero" % value)
return int_value
def parse_arguments(parser):
'-u', '--username', help="Username used for keystone authentication. Required for basic configuration.")
@ -192,7 +199,14 @@ def parse_arguments(parser):
"This assumes the base config has already run.")
parser.add_argument('-a', '--detection_args', help="A string of arguments that will be passed to detection " +
"plugins. Only certain detection plugins use arguments.")
parser.add_argument('--check_frequency', help="How often to run metric collection in seconds", type=int, default=30)
parser.add_argument('--check_frequency', help="How often to run metric collection in seconds",
type=validate_positive, default=30)
parser.add_argument('--num_collector_threads', help="Number of Threads to use in Collector " +
"for running checks", type=validate_positive, default=1)
parser.add_argument('--max_pool_full_count', help="Maximum number of collection cycles where all of the threads " +
"in the pool are still running plugins before the " +
"collector will exit and be restart",
type=validate_positive, default=4)
parser.add_argument('--dimensions', help="Additional dimensions to set for all metrics. A comma separated list " +
"of name/value pairs, 'name:value,name2:value2'")
parser.add_argument('--ca_file', help="Sets the path to the ca certs file if using certificates. " +
Reference in New Issue
Block a user