Rewrote config processing to simplify logic.

Reorganized and created classes for the methods in config.py.
Added more sections to the agent config file.
Moved a lot of functions out of config.py that were more
utility-like to util.py.

Change-Id: I19fc709fb165895382099b49fd9c83991ca1183d
This commit is contained in:
gary-hessler 2015-01-06 17:01:34 -07:00
parent 6c9db3c622
commit 39ade0e4dd
17 changed files with 900 additions and 1201 deletions

View File

@ -78,6 +78,25 @@ use_mount: no
# https://github.com/DataDog/dd-agent/wiki/Network-Traffic-and-Proxy-Configuration
# non_local_traffic: no
# ========================================================================== #
# System Metrics configuration #
# ========================================================================== #
# Enabled categories of system metrics to collect
# Current system metrics available: cpu,disk,io,load,memory
system_metrics: cpu,disk,io,load,memory
# Disk and IO #
# Some infrastructures have many constantly changing virtual devices (e.g. folks
# running constantly churning linux containers) whose metrics aren't
# interesting. To filter out a particular pattern of devices
# from disk and IO collection, configure a regex here:
# device_blacklist_re: .*\/dev\/mapper\/lxc-box.*
# For disk metrics it is also possible to ignore entire filesystem types
ignore_filesystem_types: tmpfs,devtmpfs
[Statsd]
# ========================================================================== #
# Monasca Statsd configuration #
# ========================================================================== #
@ -98,26 +117,10 @@ monasca_statsd_port : 8125
# to another statsd server, uncomment these lines.
# WARNING: Make sure that forwarded packets are regular statsd packets and not "monasca_statsd" packets,
# as your other statsd server might not be able to handle them.
# statsd_forward_host: address_of_own_statsd_server
# statsd_forward_port: 8125
# ========================================================================== #
# System Metrics configuration #
# ========================================================================== #
# Enabled categories of system metrics to collect
system_metrics: cpu,disk,io,load,memory
# Disk and IO #
# Some infrastructures have many constantly changing virtual devices (e.g. folks
# running constantly churning linux containers) whose metrics aren't
# interesting. To filter out a particular pattern of devices
# from disk and IO collection, configure a regex here:
# device_blacklist_re: .*\/dev\/mapper\/lxc-box.*
# For disk metrics it is also possible to ignore entire filesystem types
ignore_filesystem_types: tmpfs,devtmpfs
# monasca_statsd_forward_host: address_of_own_statsd_server
# monasca_statsd_forward_port: 8125
[Logging]
# ========================================================================== #
# Logging
# ========================================================================== #

View File

@ -46,7 +46,6 @@ class Check(util.Dimensions):
# None: [(ts, value), (ts, value)]}
# untagged values are indexed by None
super(Check, self).__init__(agent_config)
self.agent_config = agent_config
self._sample_store = {}
self._counters = {} # metric_name: bool
self.logger = logger
@ -279,8 +278,7 @@ class AgentCheck(util.Dimensions):
super(AgentCheck, self).__init__(agent_config)
self.name = name
self.init_config = init_config
self.agent_config = agent_config
self.hostname = util.get_hostname(agent_config)
self.hostname = util.get_hostname()
self.log = logging.getLogger('%s.%s' % (__name__, name))
self.aggregator = aggregator.MetricsAggregator(self.hostname,
@ -607,8 +605,9 @@ def run_check(name, path=None):
import tests.common
# Read the config file
confd_path = path or os.path.join(config.get_confd_path(util.get_os()),
'%s.yaml' % name)
config = Config()
confd_path = path or os.path.join(config.get_confd_path(),
'{}.yaml'.format(name))
try:
f = open(confd_path)

View File

@ -31,8 +31,8 @@ class Collector(util.Dimensions):
def __init__(self, agent_config, emitter, checksd=None):
super(Collector, self).__init__(agent_config)
self.emit_duration = None
self.agent_config = agent_config
self.emit_duration = None
self.os = util.get_os()
self.plugins = None
self.emitter = emitter
@ -59,7 +59,7 @@ class Collector(util.Dimensions):
self._checks = []
# Only setup the configured system checks
for check in self.agent_config.get('system_metrics', []):
self._checks.append(possible_checks[check](log, agent_config))
self._checks.append(possible_checks[check](log, self.agent_config))
if checksd:
# is of type {check_name: check}

View File

@ -38,8 +38,8 @@ class ServicesCheck(monasca_agent.collector.checks.AgentCheck):
when the service turns down.
"""
def __init__(self, name, init_config, agentConfig, instances):
monasca_agent.collector.checks.AgentCheck.__init__(self, name, init_config, agentConfig, instances)
def __init__(self, name, init_config, agent_config, instances):
monasca_agent.collector.checks.AgentCheck.__init__(self, name, init_config, agent_config, instances)
# A dictionary to keep track of service statuses
self.statuses = {}

View File

@ -13,7 +13,7 @@ from httplib2 import HttpLib2Error
import monasca_agent.collector.checks.services_checks as services_checks
import monasca_agent.common.keystone as keystone
import monasca_agent.common.config as cfg
class HTTPCheck(services_checks.ServicesCheck):
@ -47,7 +47,8 @@ class HTTPCheck(services_checks.ServicesCheck):
def _check(self, instance):
addr, username, password, timeout, headers, response_time, dimensions, disable_ssl_validation, pattern, use_keystone = self._load_conf(
instance)
config = cfg.Config()
api_config = config.get_config('Api')
content = ''
new_dimensions = self._set_dimensions({'url': addr})
@ -59,7 +60,6 @@ class HTTPCheck(services_checks.ServicesCheck):
retry = False
while not done or retry:
if use_keystone:
api_config = self.agent_config['Api']
key = keystone.Keystone(api_config)
token = key.get_token()
if token:

View File

@ -12,13 +12,13 @@ import time
import checks.collector
import jmxfetch
import monasca_agent.common.check_status
import monasca_agent.common.config
import monasca_agent.common.config as cfg
import monasca_agent.common.daemon
import monasca_agent.common.emitter
import monasca_agent.common.util
import monasca_agent.common.util as util
# set up logging before importing any other components
monasca_agent.common.config.initialize_logging('collector')
util.initialize_logging('collector')
os.umask(022)
# Check we're not using an old version of Python. We need 2.4 above because
@ -70,11 +70,10 @@ class CollectorDaemon(monasca_agent.common.daemon.Daemon):
logging.getLogger().setLevel(logging.ERROR)
return monasca_agent.common.check_status.CollectorStatus.print_latest_status(verbose=verbose)
def run(self, config=None):
def run(self, config):
"""Main loop of the collector.
"""
# Gracefully exit on sigterm.
signal.signal(signal.SIGTERM, self._handle_sigterm)
@ -87,12 +86,9 @@ class CollectorDaemon(monasca_agent.common.daemon.Daemon):
# Save the agent start-up stats.
monasca_agent.common.check_status.CollectorStatus().persist()
# Intialize the collector.
if config is None:
config = monasca_agent.common.config.get_config(parse_args=True)
# Load the checks_d checks
checksd = monasca_agent.common.config.load_check_directory(config)
checksd = util.load_check_directory()
self.collector = checks.collector.Collector(config, monasca_agent.common.emitter.http_emitter, checksd)
# Configure the watchdog.
@ -160,9 +156,9 @@ class CollectorDaemon(monasca_agent.common.daemon.Daemon):
def _get_watchdog(check_freq, agentConfig):
watchdog = None
if agentConfig.get("watchdog", True):
watchdog = monasca_agent.common.util.Watchdog(check_freq * WATCHDOG_MULTIPLIER,
max_mem_mb=agentConfig.get('limit_memory_consumption',
None))
watchdog = util.Watchdog(check_freq * WATCHDOG_MULTIPLIER,
max_mem_mb=agentConfig.get('limit_memory_consumption',
None))
watchdog.reset()
return watchdog
@ -179,10 +175,11 @@ class CollectorDaemon(monasca_agent.common.daemon.Daemon):
def main():
options, args = monasca_agent.common.config.get_parsed_args()
agentConfig = monasca_agent.common.config.get_config(options=options)
options, args = util.get_parsed_args()
config = cfg.Config()
collector_config = config.get_config(['Main', 'Api', 'Logging'])
# todo autorestart isn't used remove
autorestart = agentConfig.get('autorestart', False)
autorestart = collector_config.get('autorestart', False)
COMMANDS = [
'start',
@ -206,7 +203,7 @@ def main():
sys.stderr.write("Unknown command: %s\n" % command)
return 3
pid_file = monasca_agent.common.util.PidFile('monasca-agent')
pid_file = util.PidFile('monasca-agent')
if options.clean:
pid_file.clean()
@ -214,7 +211,7 @@ def main():
agent = CollectorDaemon(pid_file.get_path(), autorestart)
if command in START_COMMANDS:
log.info('Agent version %s' % monasca_agent.common.config.get_version())
log.info('Agent version %s' % config.get_version())
if 'start' == command:
log.info('Start daemon')
@ -249,11 +246,11 @@ def main():
monasca_agent.common.daemon.AgentSupervisor.start(parent_func, child_func)
else:
# Run in the standard foreground.
agent.run(config=agentConfig)
agent.run(collector_config)
elif 'check' == command:
check_name = args[1]
checks = monasca_agent.common.config.load_check_directory(agentConfig)
checks = util.load_check_directory()
for check in checks['initialized_checks']:
if check.name == check_name:
check.run()
@ -268,7 +265,7 @@ def main():
elif 'check_all' == command:
print("Loading check directory...")
checks = monasca_agent.common.config.load_check_directory(agentConfig)
checks = util.load_check_directory()
print("...directory loaded.\n")
for check in checks['initialized_checks']:
print("#" * 80)
@ -279,12 +276,13 @@ def main():
print("#" * 80 + "\n\n")
elif 'configcheck' == command or 'configtest' == command:
osname = monasca_agent.common.util.get_os()
osname = util.get_os()
all_valid = True
for conf_path in glob.glob(os.path.join(monasca_agent.common.config.get_confd_path(osname), "*.yaml")):
paths = util.Paths()
for conf_path in glob.glob(os.path.join(paths.get_confd_path(), "*.yaml")):
basename = os.path.basename(conf_path)
try:
monasca_agent.common.config.check_yaml(conf_path)
config.check_yaml(conf_path)
except Exception as e:
all_valid = False
print("%s contains errors:\n %s" % (basename, e))
@ -303,11 +301,11 @@ def main():
if len(args) < 2 or args[1] not in jmxfetch.JMX_LIST_COMMANDS.keys():
print("#" * 80)
print("JMX tool to be used to help configuring your JMX checks.")
print("JMX tool to be used to help configure your JMX checks.")
print("See http://docs.datadoghq.com/integrations/java/ for more information")
print("#" * 80)
print("\n")
print("You have to specify one of the following command:")
print("You have to specify one of the following commands:")
for command, desc in jmxfetch.JMX_LIST_COMMANDS.iteritems():
print(" - %s [OPTIONAL: LIST OF CHECKS]: %s" % (command, desc))
print("Example: sudo /etc/init.d/monasca-agent jmx list_matching_attributes tomcat jmx solr")
@ -316,18 +314,18 @@ def main():
else:
jmx_command = args[1]
checks_list = args[2:]
confd_directory = monasca_agent.common.config.get_confd_path(monasca_agent.common.util.get_os())
should_run = jmxfetch.JMXFetch.init(
confd_directory,
agentConfig,
monasca_agent.common.config.get_logging_config(),
15,
jmx_command,
checks_list,
reporter="console")
paths = util.Paths()
confd_path = paths.get_confd_path()
# Start JMXFetch if needed
should_run = jmxfetch.JMXFetch.init(confd_path,
config,
15,
jmx_command,
checks_list,
reporter="console")
if not should_run:
print("Couldn't find any valid JMX configuration in your conf.d directory: %s" % confd_directory)
print("Have you enabled any JMX check ?")
print("Couldn't find any valid JMX configuration in your conf.d directory: %s" % confd_path)
print("Have you enabled any JMX checks ?")
return 0

View File

@ -54,8 +54,8 @@ class JMXFetch(object):
pid_file_path = pid_file.get_path()
@classmethod
def init(cls, confd_path, agentConfig, logging_config,
default_check_frequency, command=None, checks_list=None, reporter=None):
def init(cls, confd_path, agent_config, default_check_frequency,
command=None, checks_list=None, reporter=None):
try:
command = command or JMX_COLLECT_COMMAND
jmx_checks, invalid_checks, java_bin_path, java_options = JMXFetch.should_run(
@ -71,8 +71,8 @@ class JMXFetch(object):
log.warning("JMXFetch is already running, restarting it.")
JMXFetch.stop()
JMXFetch.start(confd_path, agentConfig, logging_config,
java_bin_path, java_options, default_check_frequency,
JMXFetch.start(confd_path, agent_config, java_bin_path,
java_options, default_check_frequency,
jmx_checks, command, reporter)
return True
except Exception:
@ -299,9 +299,9 @@ class JMXFetch(object):
os.path.join(os.path.abspath(__file__), "..", "../../", "jmxfetch", JMX_FETCH_JAR_NAME))
@classmethod
def start(cls, confd_path, agentConfig, logging_config, path_to_java, java_run_opts,
def start(cls, confd_path, agent_config, path_to_java, java_run_opts,
default_check_frequency, jmx_checks, command, reporter=None):
statsd_port = agentConfig.get('monasca_statsd_port', "8125")
statsd_port = agent_config.get('monasca_statsd_port', "8125")
if reporter is None:
reporter = "statsd:%s" % str(statsd_port)
@ -323,9 +323,9 @@ class JMXFetch(object):
# Path of the conf.d directory that will be read by jmxfetch,
'--conf_directory', r"%s" % confd_path,
# Log Level: Mapping from Python log level to log4j log levels
'--log_level', JAVA_LOGGING_LEVEL.get(logging_config.get("log_level"), "INFO"),
'--log_level', JAVA_LOGGING_LEVEL.get(agent_config.get("log_level"), "INFO"),
# Path of the log file
'--log_location', r"%s" % logging_config.get('jmxfetch_log_file'),
'--log_location', r"%s" % agent_config.get('jmxfetch_log_file'),
'--reporter', reporter, # Reporter to use
# Path to the status file to write
'--status_location', r"%s" % path_to_status_file,

View File

@ -14,8 +14,9 @@ import tempfile
import time
# project
import collections
import config
from collections import defaultdict
import util
import yaml
# 3rd party
@ -30,7 +31,6 @@ NTP_OFFSET_THRESHOLD = 600
log = logging.getLogger(__name__)
class Stylizer(object):
STYLES = {
@ -108,6 +108,7 @@ class AgentStatus(object):
"""
NAME = None
agent_config = config.Config()
def __init__(self):
self.created_at = datetime.datetime.now()
@ -141,7 +142,7 @@ class AgentStatus(object):
@classmethod
def _title_lines(cls):
name_line = "%s (v %s)" % (cls.NAME, config.get_version())
name_line = "%s (v %s)" % (cls.NAME, AgentStatus.agent_config.get_version())
lines = [
"=" * len(name_line),
"%s" % name_line,
@ -348,16 +349,15 @@ class CollectorStatus(AgentStatus):
''
]
osname = config.get_os()
paths = util.Paths()
try:
confd_path = config.get_confd_path(osname)
except config.PathNotFound:
confd_path = paths.get_confd_path()
except util.PathNotFound:
confd_path = 'Not found'
try:
checksd_path = config.get_checksd_path(osname)
except config.PathNotFound:
checksd_path = paths.get_checksd_path()
except util.PathNotFound:
checksd_path = 'Not found'
lines.append(' conf.d: ' + confd_path)
@ -499,13 +499,14 @@ class CollectorStatus(AgentStatus):
osname = config.get_os()
paths = util.Paths()
try:
status_info['confd_path'] = config.get_confd_path(osname)
status_info['confd_path'] = paths.get_confd_path()
except config.PathNotFound:
status_info['confd_path'] = 'Not found'
try:
status_info['checksd_path'] = config.get_checksd_path(osname)
status_info['checksd_path'] = paths.get_checksd_path()
except config.PathNotFound:
status_info['checksd_path'] = 'Not found'
@ -651,7 +652,7 @@ def get_jmx_status():
(java_status_path, python_status_path))
return []
check_data = defaultdict(lambda: defaultdict(list))
check_data = collections.defaultdict(lambda: collections.defaultdict(list))
try:
if os.path.exists(java_status_path):
java_jmx_stats = yaml.load(file(java_status_path))

File diff suppressed because it is too large Load Diff

View File

@ -2,19 +2,14 @@ import logging
from keystoneclient.v3 import client as ksclient
import monasca_agent.common.singleton as singleton
log = logging.getLogger(__name__)
class Keystone(object):
# Make this a singleton class so we don't get the token every time
# the class is created
_instance = None
def __new__(cls, *args, **kwargs):
if not cls._instance:
cls._instance = super(Keystone, cls).__new__(
cls, *args, **kwargs)
return cls._instance
__metaclass__ = singleton.Singleton
def __init__(self, config):
self.config = config

View File

@ -0,0 +1,9 @@
class Singleton(type):
    """Metaclass that turns every class using it into a singleton.

    The first instantiation builds and caches the instance; every later
    call returns that same cached object.
    """

    def __init__(cls, name, bases, namespace):
        # 'namespace' instead of 'dict' to avoid shadowing the builtin.
        super(Singleton, cls).__init__(name, bases, namespace)
        # Cache slot for the one-and-only instance of each class.
        cls.instance = None

    def __call__(cls, *args, **kwargs):
        if cls.instance is None:
            # First call: construct normally and remember the result.
            cls.instance = super(Singleton, cls).__call__(*args, **kwargs)
        return cls.instance

View File

@ -1,189 +1,45 @@
from hashlib import md5
import inspect
import imp
import itertools
import glob
import math
import hashlib
import optparse
import os
import platform
import re
import signal
import socket
import subprocess
import sys
import math
import time
import uuid
import tempfile
import re
import time
import traceback
import uuid
import logging
log = logging.getLogger(__name__)
# Tornado
try:
from tornado import ioloop, version_info as tornado_version
except ImportError:
pass # We are likely running the agent without the forwarder and tornado is not installed
# We are likely running the agent without the forwarder and tornado is not installed
# Generate a warning
log.warn('Tornado web server is not installed. Metrics will not be forwarded to the Monasca API!')
VALID_HOSTNAME_RFC_1123_PATTERN = re.compile(
r"^(([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]*[a-zA-Z0-9])\.)*([A-Za-z0-9]|[A-Za-z0-9][A-Za-z0-9\-]*[A-Za-z0-9])$")
MAX_HOSTNAME_LEN = 255
import logging
log = logging.getLogger(__name__)
LOGGING_MAX_BYTES = 5 * 1024 * 1024
NumericTypes = (float, int, long)
def plural(count):
if count == 1:
return ""
return "s"
import monasca_agent.common.config as configuration
def get_tornado_ioloop():
if tornado_version[0] == 3:
return ioloop.IOLoop.current()
else:
return ioloop.IOLoop.instance()
def get_uuid():
# Generate a unique name that will stay constant between
# invocations, such as platform.node() + uuid.getnode()
# Use uuid5, which does not depend on the clock and is
# recommended over uuid3.
# This is important to be able to identify a server even if
# its drives have been wiped clean.
# Note that this is not foolproof but we can reconcile servers
# on the back-end if need be, based on mac addresses.
return uuid.uuid5(uuid.NAMESPACE_DNS, platform.node() + str(uuid.getnode())).hex
def get_os():
"Human-friendly OS name"
if sys.platform == 'darwin':
return 'mac'
elif sys.platform.find('freebsd') != -1:
return 'freebsd'
elif sys.platform.find('linux') != -1:
return 'linux'
elif sys.platform.find('win32') != -1:
return 'windows'
elif sys.platform.find('sunos') != -1:
return 'solaris'
else:
return sys.platform
def headers(agentConfig):
# Build the request headers
return {
'User-Agent': 'Mon Agent/%s' % agentConfig['version'],
'Content-Type': 'application/x-www-form-urlencoded',
'Accept': 'text/html, */*',
}
def getTopIndex():
macV = None
if sys.platform == 'darwin':
macV = platform.mac_ver()
# Output from top is slightly modified on OS X 10.6 (case #28239)
if macV and macV[0].startswith('10.6.'):
return 6
else:
return 5
def isnan(val):
if hasattr(math, 'isnan'):
return math.isnan(val)
# for py < 2.6, use a different check
# http://stackoverflow.com/questions/944700/how-to-check-for-nan-in-python
return str(val) == str(1e400 * 0)
def cast_metric_val(val):
# ensure that the metric value is a numeric type
if not isinstance(val, NumericTypes):
# Try the int conversion first because want to preserve
# whether the value is an int or a float. If neither work,
# raise a ValueError to be handled elsewhere
for cast in [int, float]:
try:
val = cast(val)
return val
except ValueError:
continue
raise ValueError
return val
def is_valid_hostname(hostname):
if hostname.lower() in {'localhost', 'localhost.localdomain',
'localhost6.localdomain6', 'ip6-localhost'}:
log.warning("Hostname: %s is local" % hostname)
return False
if len(hostname) > MAX_HOSTNAME_LEN:
log.warning("Hostname: %s is too long (max length is %s characters)" %
(hostname, MAX_HOSTNAME_LEN))
return False
if VALID_HOSTNAME_RFC_1123_PATTERN.match(hostname) is None:
log.warning("Hostname: %s is not complying with RFC 1123" % hostname)
return False
return True
def get_hostname(config=None):
"""
Get the canonical host name this agent should identify as. This is
the authoritative source of the host name for the agent.
Tries, in order:
* agent config (agent.conf, "hostname:")
* 'hostname -f' (on unix)
* socket.gethostname()
"""
hostname = None
# first, try the config
if config is None:
from monasca_agent.common.config import get_config
config = get_config(parse_args=True)
config_hostname = config.get('hostname')
if config_hostname and is_valid_hostname(config_hostname):
return config_hostname
# then move on to os-specific detection
if hostname is None:
def _get_hostname_unix():
try:
# try fqdn
p = subprocess.Popen(['/bin/hostname', '-f'], stdout=subprocess.PIPE)
out, err = p.communicate()
if p.returncode == 0:
return out.strip()
except Exception:
return None
os_name = get_os()
if os_name in ['mac', 'freebsd', 'linux', 'solaris']:
unix_hostname = _get_hostname_unix()
if unix_hostname and is_valid_hostname(unix_hostname):
hostname = unix_hostname
# fall back on socket.gethostname(), socket.getfqdn() is too unreliable
if hostname is None:
try:
socket_hostname = socket.gethostname()
except socket.error:
socket_hostname = None
if socket_hostname and is_valid_hostname(socket_hostname):
hostname = socket_hostname
if hostname is None:
log.critical(
'Unable to reliably determine host name. You can define one in agent.conf or in your hosts file')
raise Exception(
'Unable to reliably determine host name. You can define one in agent.conf or in your hosts file')
else:
return hostname
class PathNotFound(Exception):
pass
class Watchdog(object):
@ -302,7 +158,7 @@ class LaconicFilter(logging.Filter):
@staticmethod
def hash(msg):
return md5(msg).hexdigest()
return hashlib.md5(msg).hexdigest()
def filter(self, record):
try:
@ -397,8 +253,8 @@ class Dimensions(object):
Class to update the default dimensions.
"""
def __init__(self, config):
self.agent_config = config
def __init__(self, agent_config):
self.agent_config = agent_config
def _set_dimensions(self, dimensions):
"""Method to append the default dimensions from the config file.
@ -406,13 +262,509 @@ class Dimensions(object):
new_dimensions = {'hostname': get_hostname()}
if dimensions is not None:
new_dimensions.update(dimensions.copy())
default_dimensions = self.agent_config['Api'].get('dimensions', {})
default_dimensions = self.agent_config.get('dimensions', {})
if default_dimensions:
# Add or update any default dimensions that were set in the agent config file
new_dimensions.update(default_dimensions)
return new_dimensions
class Paths(object):
    """
    Return information about system paths.
    """

    def __init__(self):
        # Cache the friendly OS name once; it drives every path lookup.
        self.osname = get_os()

    def get_confd_path(self):
        """Locate conf.d, falling back to the directory beside this module."""
        bad_path = ''
        # Pick the platform-specific locator, then fall through on failure.
        locate = self._windows_confd_path if self.osname == 'windows' else self._unix_confd_path
        try:
            return locate()
        except PathNotFound as e:
            if len(e.args) > 0:
                bad_path = e.args[0]
        # Last resort: a conf.d directory sitting next to this source file.
        local_confd = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'conf.d')
        if os.path.exists(local_confd):
            return local_confd
        raise PathNotFound(bad_path)

    def _unix_confd_path(self):
        path = os.path.join(configuration.DEFAULT_CONFIG_DIR, 'conf.d')
        if os.path.exists(path):
            return path
        raise PathNotFound(path)

    def _windows_confd_path(self):
        path = os.path.join(_windows_commondata_path(), 'Datadog', 'conf.d')
        if os.path.exists(path):
            return path
        raise PathNotFound(path)

    def get_checksd_path(self):
        """Locate the checks_d directory for the current platform."""
        if self.osname == 'windows':
            return self._windows_checksd_path()
        return self._unix_checksd_path()

    def _unix_checksd_path(self):
        # Unix resolves relative to this file because checks_d lives
        # alongside the other python modules.
        here = os.path.dirname(os.path.realpath(__file__))
        checksd_path = os.path.join(here, '../collector/checks_d')
        if os.path.exists(checksd_path):
            return checksd_path
        raise PathNotFound(checksd_path)

    def _windows_checksd_path(self):
        if hasattr(sys, 'frozen'):
            # we're frozen - from py2exe
            checksd_path = os.path.join(os.path.dirname(sys.executable), '..', 'checks_d')
        else:
            checksd_path = os.path.join(os.path.dirname(__file__), '../collector/checks_d')
        if os.path.exists(checksd_path):
            return checksd_path
        raise PathNotFound(checksd_path)
def _windows_commondata_path():
    """Return the common appdata path (e.g. C:\\ProgramData), using ctypes.

    From http://stackoverflow.com/questions/626796/\
    how-do-i-find-the-windows-common-application-data-folder-using-python
    """
    import ctypes
    from ctypes import wintypes, windll

    # CSIDL constant identifying the "common application data" folder.
    CSIDL_COMMON_APPDATA = 35

    _SHGetFolderPath = windll.shell32.SHGetFolderPathW
    _SHGetFolderPath.argtypes = [wintypes.HWND,
                                 ctypes.c_int,
                                 wintypes.HANDLE,
                                 wintypes.DWORD, wintypes.LPCWSTR]
    # Bug fix: the buffer comes from ctypes (wintypes has no
    # create_unicode_buffer attribute) and SHGetFolderPathW must actually
    # be invoked to fill it -- previously an empty string was returned.
    path_buf = ctypes.create_unicode_buffer(wintypes.MAX_PATH)
    _SHGetFolderPath(0, CSIDL_COMMON_APPDATA, 0, 0, path_buf)
    return path_buf.value
def set_win32_cert_path(self):
    """Point tornado.httpclient at our bundled CA certificates on Windows.

    The packaged .exe cannot rely on tornado's default certificate
    location, which is derived from tornado's own path and would yield
    something like "C:\\path\\to\\program.exe\\tornado/cert-file".
    """
    if hasattr(sys, 'frozen'):
        # we're frozen - from py2exe
        crt_path = os.path.join(os.path.dirname(sys.executable),
                                'ca-certificates.crt')
    else:
        crt_path = os.path.join(os.path.dirname(__file__),
                                'packaging', 'monasca-agent', 'win32',
                                'install_files', 'ca-certificates.crt')
    import tornado.simple_httpclient
    log.info("Windows certificate path: %s" % crt_path)
    tornado.simple_httpclient._DEFAULT_CA_CERTS = crt_path
def plural(count):
    """Return "s" when *count* calls for a plural noun, else ""."""
    return "" if count == 1 else "s"
def get_tornado_ioloop():
    """Return the active tornado IOLoop, handling the tornado 3 API change."""
    if tornado_version[0] == 3:
        return ioloop.IOLoop.current()
    return ioloop.IOLoop.instance()
def get_uuid():
    """Return a stable 32-char hex id unique to this machine.

    Built with uuid5 (clock-independent, recommended over uuid3) from the
    node name plus the MAC address, so it stays constant between
    invocations and survives wiped drives.  Not foolproof, but servers can
    be reconciled on the back-end by MAC address if need be.
    """
    machine_seed = platform.node() + str(uuid.getnode())
    return uuid.uuid5(uuid.NAMESPACE_DNS, machine_seed).hex
def get_os():
    """Human-friendly OS name derived from sys.platform."""
    raw = sys.platform
    if raw == 'darwin':
        return 'mac'
    # Substring checks mirror the historical sys.platform spellings.
    for fragment, friendly in (('freebsd', 'freebsd'),
                               ('linux', 'linux'),
                               ('win32', 'windows'),
                               ('sunos', 'solaris')):
        if raw.find(fragment) != -1:
            return friendly
    return raw
def headers(agentConfig):
    """Build the standard HTTP request headers for this agent version."""
    user_agent = 'Mon Agent/%s' % agentConfig['version']
    return {'User-Agent': user_agent,
            'Content-Type': 'application/x-www-form-urlencoded',
            'Accept': 'text/html, */*'}
def getTopIndex():
    """Return the column index to use when parsing `top` output."""
    if sys.platform == 'darwin':
        mac_version = platform.mac_ver()
        # Output from top is slightly modified on OS X 10.6 (case #28239)
        if mac_version and mac_version[0].startswith('10.6.'):
            return 6
    return 5
def isnan(val):
    """Return True when *val* is NaN, with a fallback for Python < 2.6."""
    if hasattr(math, 'isnan'):
        return math.isnan(val)
    # for py < 2.6: NaN is the only value whose str matches inf * 0
    # http://stackoverflow.com/questions/944700/how-to-check-for-nan-in-python
    return str(val) == str(1e400 * 0)
def cast_metric_val(val):
    """Coerce *val* to a numeric type, raising ValueError when impossible.

    int is tried before float so that integral values keep their type.
    """
    if isinstance(val, NumericTypes):
        return val
    for converter in (int, float):
        try:
            return converter(val)
        except ValueError:
            continue
    # Neither conversion worked; let the caller handle it.
    raise ValueError
def is_valid_hostname(hostname):
    """Validate *hostname*: non-local, within length limit, RFC 1123 clean."""
    local_names = {'localhost', 'localhost.localdomain',
                   'localhost6.localdomain6', 'ip6-localhost'}
    if hostname.lower() in local_names:
        log.warning("Hostname: %s is local" % hostname)
        return False
    if len(hostname) > MAX_HOSTNAME_LEN:
        log.warning("Hostname: %s is too long (max length is %s characters)" %
                    (hostname, MAX_HOSTNAME_LEN))
        return False
    if VALID_HOSTNAME_RFC_1123_PATTERN.match(hostname) is None:
        log.warning("Hostname: %s is not complying with RFC 1123" % hostname)
        return False
    return True
def get_hostname():
    """
    Get the canonical host name this agent should identify as. This is
    the authoritative source of the host name for the agent.
    Tries, in order:
    * agent config (agent.conf, "hostname:")
    * 'hostname -f' (on unix)
    * socket.gethostname()
    """
    # 1. A valid hostname set in the agent config wins outright.
    agent_config = configuration.Config().get_config(sections='Main')
    configured = agent_config.get('hostname')
    if configured and is_valid_hostname(configured):
        return configured

    hostname = None

    # 2. On unix-like platforms, prefer the fully-qualified name.
    if get_os() in ['mac', 'freebsd', 'linux', 'solaris']:
        try:
            proc = subprocess.Popen(['/bin/hostname', '-f'],
                                    stdout=subprocess.PIPE)
            out, err = proc.communicate()
            fqdn = out.strip() if proc.returncode == 0 else None
        except Exception:
            fqdn = None
        if fqdn and is_valid_hostname(fqdn):
            hostname = fqdn

    # 3. fall back on socket.gethostname(), socket.getfqdn() is too unreliable
    if hostname is None:
        try:
            socket_hostname = socket.gethostname()
        except socket.error:
            socket_hostname = None
        if socket_hostname and is_valid_hostname(socket_hostname):
            hostname = socket_hostname

    if hostname is None:
        log.critical(
            'Unable to reliably determine host name. You can define one in agent.conf or in your hosts file')
        raise Exception(
            'Unable to reliably determine host name. You can define one in agent.conf or in your hosts file')
    return hostname
def get_parsed_args():
    """Parse the agent's command-line flags.

    Returns an (options, args) pair; parse errors are swallowed and
    default option values are returned instead of exiting the process.
    """
    parser = optparse.OptionParser()
    parser.add_option('-c', '--clean', action='store_true', default=False, dest='clean')
    parser.add_option('-v', '--verbose', action='store_true', default=False, dest='verbose',
                      help='Print out stacktraces for errors in checks')
    try:
        parsed = parser.parse_args()
    except SystemExit:
        # Ignore parse errors
        parsed = optparse.Values({'clean': False}), []
    return parsed
def load_check_directory():
    '''Discover, import and instantiate all agent checks.

    Scans the additional_checksd directory (from the 'Main' config section)
    and the built-in checks_d path for *.py modules, imports each one, finds
    its AgentCheck subclass, pairs it with a conf.d/<name>.yaml config (or a
    legacy agent.conf section via parse_agent_config), and instantiates it.

    Returns a dict with:
      'initialized_checks': list of successfully constructed check objects
      'init_failed_checks': dict mapping check name -> {'error', 'traceback'}
    Exits the process (code 3) if the checks_d or conf.d path cannot be found.
    '''
    # Imported here rather than at module level to avoid a circular import
    # between this module and the collector package.
    from monasca_agent.collector.checks import AgentCheck
    config = configuration.Config()
    agent_config = config.get_config('Main')
    initialized_checks = {}
    init_failed_checks = {}
    paths = Paths()
    # User-supplied checks are listed first so they win the duplicate-name
    # check below and shadow built-in checks of the same name.
    checks_paths = [glob.glob(os.path.join(agent_config['additional_checksd'], '*.py'))]
    try:
        checksd_path = paths.get_checksd_path()
        checks_paths.append(glob.glob(os.path.join(checksd_path, '*.py')))
    except PathNotFound as e:
        log.error(e.args[0])
        sys.exit(3)
    try:
        confd_path = paths.get_confd_path()
    except PathNotFound as e:
        log.error(
            "No conf.d folder found at '%s' or in the directory where the Agent is currently deployed.\n" %
            e.args[0])
        sys.exit(3)
    # For backwards-compatability with old style checks, we have to load every
    # checks_d module and check for a corresponding config OR check if the old
    # config will "activate" the check.
    #
    # Once old-style checks aren't supported, we'll just read the configs and
    # import the corresponding check module
    for check in itertools.chain(*checks_paths):
        check_name = os.path.basename(check).split('.')[0]
        # First occurrence wins (user checks came first in checks_paths).
        if check_name in initialized_checks or check_name in init_failed_checks:
            log.debug(
                'Skipping check %s because it has already been loaded from another location', check)
            continue
        try:
            check_module = imp.load_source('checksd_%s' % check_name, check)
        except Exception as e:
            traceback_message = traceback.format_exc()
            # Let's see if there is a conf.d for this check
            conf_path = os.path.join(confd_path, '%s.yaml' % check_name)
            if os.path.exists(conf_path):
                # There is a configuration file for that check but the module can't be imported
                init_failed_checks[check_name] = {'error': e, 'traceback': traceback_message}
                log.exception('Unable to import check module %s.py from checks_d' % check_name)
            else:  # There is no conf for that check. Let's not spam the logs for it.
                log.debug('Unable to import check module %s.py from checks_d' % check_name)
            continue
        # Pick the check class: an AgentCheck subclass, preferring the most
        # derived one found. A class whose *direct* base is AgentCheck is
        # remembered but the scan continues; a deeper subclass stops the scan.
        check_class = None
        classes = inspect.getmembers(check_module, inspect.isclass)
        for _, clsmember in classes:
            if clsmember == AgentCheck:
                continue
            if issubclass(clsmember, AgentCheck):
                check_class = clsmember
                if AgentCheck in clsmember.__bases__:
                    continue
                else:
                    break
        if not check_class:
            # __init__.py legitimately has no check class; anything else
            # is a packaging error worth logging.
            if not check_name == '__init__':
                log.error('No check class (inheriting from AgentCheck) found in %s.py' % check_name)
            continue
        # Check if the config exists OR we match the old-style config
        conf_path = os.path.join(confd_path, '%s.yaml' % check_name)
        if os.path.exists(conf_path):
            try:
                check_config = config.check_yaml(conf_path)
            except Exception as e:
                log.exception("Unable to parse yaml config in %s" % conf_path)
                traceback_message = traceback.format_exc()
                init_failed_checks[check_name] = {'error': e, 'traceback': traceback_message}
                continue
        elif hasattr(check_class, 'parse_agent_config'):
            # FIXME: Remove this check once all old-style checks are gone
            try:
                check_config = check_class.parse_agent_config(agent_config)
            except Exception as e:
                # NOTE(review): any failure in a legacy parse_agent_config is
                # silently skipped (not recorded in init_failed_checks) —
                # presumably deliberate best-effort for deprecated checks.
                continue
            if not check_config:
                continue
            d = [
                "Configuring %s in agent.conf is deprecated." % (check_name),
                "Please use conf.d. In a future release, support for the",
                "old style of configuration will be dropped.",
            ]
            log.warn(" ".join(d))
        else:
            log.debug('No conf.d/%s.yaml found for checks_d/%s.py' % (check_name, check_name))
            continue
        # Look for the per-check config, which *must* exist
        if not check_config.get('instances'):
            log.error("Config %s is missing 'instances'" % conf_path)
            continue
        # Init all of the check's classes with
        init_config = check_config.get('init_config', {})
        # init_config: in the configuration triggers init_config to be defined
        # to None.
        if init_config is None:
            init_config = {}
        instances = check_config['instances']
        try:
            try:
                c = check_class(check_name, init_config=init_config,
                                agent_config=agent_config, instances=instances)
            except TypeError as e:
                # Backwards compatibility for checks which don't support the
                # instances argument in the constructor.
                c = check_class(check_name, init_config=init_config,
                                agent_config=agent_config)
                c.instances = instances
        except Exception as e:
            log.exception('Unable to initialize check %s' % check_name)
            traceback_message = traceback.format_exc()
            init_failed_checks[check_name] = {'error': e, 'traceback': traceback_message}
        else:
            initialized_checks[check_name] = c
            # Add custom pythonpath(s) if available
            if 'pythonpath' in check_config:
                pythonpath = check_config['pythonpath']
                if not isinstance(pythonpath, list):
                    pythonpath = [pythonpath]
                sys.path.extend(pythonpath)
            log.debug('Loaded check.d/%s.py' % check_name)
    log.info('initialized checks_d checks: %s' % initialized_checks.keys())
    log.info('initialization failed checks_d checks: %s' % init_failed_checks.keys())
    return {'initialized_checks': initialized_checks.values(),
            'init_failed_checks': init_failed_checks,
            }
def initialize_logging(logger_name):
    """Configure root logging for the named agent process.

    Reads the 'Logging' config section and wires up, in order: stdout via
    logging.basicConfig, an optional rotating file handler, and an optional
    syslog handler. Any failure falls back to plain stdout logging rather
    than leaving the process without logging. Finally rebinds this module's
    global `log` so it uses the freshly configured handlers.

    Args:
        logger_name (str): process name (e.g. 'forwarder', 'statsd'),
            embedded in the log format and used to look up
            '<name>_log_file' in the config.
    """
    try:
        # Doubled %% survive the logger_name substitution so the literal
        # %(asctime)s etc. remain for the logging module to expand.
        log_format = '%%(asctime)s | %%(levelname)s | %s | %%(name)s(%%(filename)s:%%(lineno)s) | %%(message)s' % logger_name
        log_date_format = "%Y-%m-%d %H:%M:%S %Z"
        config = configuration.Config()
        logging_config = config.get_config(sections='Logging')
        logging.basicConfig(
            format=log_format,
            level=logging_config['log_level'] or logging.INFO,
        )
        # set up file loggers
        log_file = logging_config.get('%s_log_file' % logger_name)
        if log_file is not None and not logging_config['disable_file_logging']:
            # make sure the log directory is writeable
            # NOTE: the entire directory needs to be writable so that rotation works
            if os.access(os.path.dirname(log_file), os.R_OK | os.W_OK):
                file_handler = logging.handlers.RotatingFileHandler(
                    log_file, maxBytes=LOGGING_MAX_BYTES, backupCount=1)
                formatter = logging.Formatter(log_format, log_date_format)
                file_handler.setFormatter(formatter)
                root_log = logging.getLogger()
                root_log.addHandler(file_handler)
            else:
                sys.stderr.write("Log file is unwritable: '%s'\n" % log_file)
        # set up syslog
        if logging_config['log_to_syslog']:
            try:
                syslog_format = '%s[%%(process)d]: %%(levelname)s (%%(filename)s:%%(lineno)s): %%(message)s' % logger_name
                from logging.handlers import SysLogHandler
                # Remote syslog only when BOTH host and port are configured;
                # otherwise fall back to the local syslog socket.
                if logging_config['syslog_host'] is not None and logging_config[
                        'syslog_port'] is not None:
                    sys_log_addr = (logging_config['syslog_host'], logging_config['syslog_port'])
                else:
                    sys_log_addr = "/dev/log"
                    # Special-case macs
                    if sys.platform == 'darwin':
                        sys_log_addr = "/var/run/syslog"
                handler = SysLogHandler(address=sys_log_addr, facility=SysLogHandler.LOG_DAEMON)
                handler.setFormatter(
                    logging.Formatter(syslog_format, log_date_format))
                root_log = logging.getLogger()
                root_log.addHandler(handler)
            except Exception as e:
                # Syslog is best-effort: report and keep the other handlers.
                sys.stderr.write("Error setting up syslog: '%s'\n" % str(e))
                traceback.print_exc()
    except Exception as e:
        sys.stderr.write("Couldn't initialize logging: %s\n" % str(e))
        traceback.print_exc()
        # if config fails entirely, enable basic stdout logging as a fallback
        # NOTE(review): this reuses log_format from the try block; it is bound
        # by the first statement there, so it should always exist here —
        # confirm if that first line can ever raise.
        logging.basicConfig(
            format=log_format,
            level=logging.INFO,
        )
    # re-get the log after logging is initialized
    global log
    log = logging.getLogger(__name__)
"""
Iterable Recipes
"""

View File

@ -26,12 +26,12 @@ class MonAPI(object):
self.api_version = '2_0'
self.keystone = keystone.Keystone(config)
self.mon_client = None
self.max_buffer_size = config['max_buffer_size']
self.backlog_send_rate = config['backlog_send_rate']
self.max_buffer_size = int(config['max_buffer_size'])
self.backlog_send_rate = int(config['backlog_send_rate'])
self.message_queue = collections.deque(maxlen=self.max_buffer_size)
# 'amplifier' is completely optional and may not exist in the config
try:
self.amplifier = config['amplifier']
self.amplifier = int(config['amplifier'])
except KeyError:
self.amplifier = None

View File

@ -1,63 +1,56 @@
#!/usr/bin/env python
"""
Datadog
www.datadoghq.com
----
Make sense of your IT Data
Licensed under Simplified BSD License (see LICENSE)
(C) Boxed Ice 2010 all rights reserved
(C) Datadog, Inc. 2010-2013 all rights reserved
"""
# set up logging before importing any other components
from monasca_agent.common.config import initialize_logging
from monasca_agent.forwarder.api.mon import MonAPI
# Standard imports
import socket
import logging
import signal
import sys
import datetime
initialize_logging('forwarder')
from monasca_agent.common.config import get_logging_config
# set up logging before importing any other components
import monasca_agent.common.util as util
import monasca_agent.forwarder.api.mon as mon
import monasca_agent.common.config as cfg
util.initialize_logging('forwarder')
import os
os.umask(022)
# Standard imports
import logging
import sys
from datetime import timedelta
import signal
from socket import gaierror, error as socket_error
# Tornado
import tornado.httpclient
import tornado.httpserver
import tornado.ioloop
import tornado.web
from tornado.escape import json_decode
from tornado.options import define, parse_command_line, options
import tornado.escape
import tornado.options
# agent import
from monasca_agent.common.check_status import ForwarderStatus
from monasca_agent.common.config import get_config
from monasca_agent.common.metrics import Measurement
from monasca_agent.common.util import Watchdog, get_tornado_ioloop
from transaction import Transaction, TransactionManager
import monasca_agent.common.check_status as check_status
import monasca_agent.common.metrics as metrics
import monasca_agent.common.util as util
import transaction
log = logging.getLogger('forwarder')
log.setLevel(get_logging_config()['log_level'] or logging.INFO)
TRANSACTION_FLUSH_INTERVAL = 5000 # Every 5 seconds
WATCHDOG_INTERVAL_MULTIPLIER = 10 # 10x flush interval
# Maximum delay before replaying a transaction
MAX_WAIT_FOR_REPLAY = timedelta(seconds=90)
MAX_WAIT_FOR_REPLAY = datetime.timedelta(seconds=90)
# Maximum queue size in bytes (when this is reached, old messages are dropped)
MAX_QUEUE_SIZE = 30 * 1024 * 1024 # 30MB
THROTTLING_DELAY = timedelta(microseconds=1000000 / 2) # 2 msg/second
THROTTLING_DELAY = datetime.timedelta(microseconds=1000000 / 2) # 2 msg/second
class MetricTransaction(Transaction):
class MetricTransaction(transaction.Transaction):
_application = None
_trManager = None
@ -87,7 +80,7 @@ class MetricTransaction(Transaction):
self._headers = headers
# Call after data has been set (size is computed in Transaction's init)
Transaction.__init__(self)
transaction.Transaction.__init__(self)
# Insert the transaction in the Manager
self._trManager.append(self)
@ -140,7 +133,7 @@ class AgentInputHandler(tornado.web.RequestHandler):
# monasca_agent.common.metrics.Measurements expressed as a dict
msg = tornado.escape.json_decode(self.request.body)
try:
measurements = [Measurement(**m) for m in msg]
measurements = [metrics.Measurement(**m) for m in msg]
except Exception:
log.exception('Error parsing body of Agent Input')
raise tornado.web.HTTPError(500)
@ -161,11 +154,11 @@ class Forwarder(tornado.web.Application):
def __init__(self, port, agent_config, watchdog=True, skip_ssl_validation=False,
use_simple_http_client=False):
self._port = int(port)
self._agentConfig = agent_config
self._agent_config = agent_config
self._metrics = {}
MetricTransaction.set_application(self)
MetricTransaction.set_endpoints(MonAPI(agent_config['Api']))
self._tr_manager = TransactionManager(MAX_WAIT_FOR_REPLAY, MAX_QUEUE_SIZE, THROTTLING_DELAY)
MetricTransaction.set_endpoints(mon.MonAPI(agent_config))
self._tr_manager = transaction.TransactionManager(MAX_WAIT_FOR_REPLAY, MAX_QUEUE_SIZE, THROTTLING_DELAY)
MetricTransaction.set_tr_manager(self._tr_manager)
self._watchdog = None
@ -177,7 +170,7 @@ class Forwarder(tornado.web.Application):
if watchdog:
watchdog_timeout = TRANSACTION_FLUSH_INTERVAL * WATCHDOG_INTERVAL_MULTIPLIER
self._watchdog = Watchdog(
self._watchdog = util.Watchdog(
watchdog_timeout, max_mem_mb=agent_config.get('limit_memory_consumption', None))
def _post_metrics(self):
@ -215,7 +208,7 @@ class Forwarder(tornado.web.Application):
log_function=self.log_request
)
non_local_traffic = self._agentConfig.get("non_local_traffic", False)
non_local_traffic = self._agent_config.get("non_local_traffic", False)
tornado.web.Application.__init__(self, handlers, **settings)
http_server = tornado.httpserver.HTTPServer(self)
@ -228,17 +221,17 @@ class Forwarder(tornado.web.Application):
# localhost in lieu of 127.0.0.1 to support IPv6
try:
http_server.listen(self._port, address="localhost")
except gaierror:
except socket.gaierror:
log.warning(
"localhost seems undefined in your host file, using 127.0.0.1 instead")
http_server.listen(self._port, address="127.0.0.1")
except socket_error as e:
except socket.error as e:
if "Errno 99" in str(e):
log.warning("IPv6 doesn't seem to be fully supported. Falling back to IPv4")
http_server.listen(self._port, address="127.0.0.1")
else:
raise
except socket_error as e:
except socket.error as e:
log.exception(
"Socket error %s. Is another application listening on the same port ? Exiting", e)
sys.exit(1)
@ -249,9 +242,9 @@ class Forwarder(tornado.web.Application):
log.info("Listening on port %d" % self._port)
# Register callbacks
self.mloop = get_tornado_ioloop()
self.mloop = util.get_tornado_ioloop()
logging.getLogger().setLevel(get_logging_config()['log_level'] or logging.INFO)
logging.getLogger().setLevel(self._agent_config.get('log_level', logging.INFO))
def flush_trs():
if self._watchdog:
@ -275,9 +268,10 @@ class Forwarder(tornado.web.Application):
def init_forwarder(skip_ssl_validation=False, use_simple_http_client=False):
agent_config = get_config(parse_args=False)
config = cfg.Config()
agent_config = config.get_config(['Main', 'Api', 'Logging'])
port = agent_config.get('listen_port', 17123)
port = agent_config['listen_port']
if port is None:
port = 17123
else:
@ -297,17 +291,17 @@ def init_forwarder(skip_ssl_validation=False, use_simple_http_client=False):
def main():
define("sslcheck", default=1, help="Verify SSL hostname, on by default")
define("use_simple_http_client", default=0,
help="Use Tornado SimpleHTTPClient instead of CurlAsyncHTTPClient")
args = parse_command_line()
tornado.options.define("sslcheck", default=1, help="Verify SSL hostname, on by default")
tornado.options.define("use_simple_http_client", default=0,
help="Use Tornado SimpleHTTPClient instead of CurlAsyncHTTPClient")
args = tornado.options.parse_command_line()
skip_ssl_validation = False
use_simple_http_client = False
if unicode(options.sslcheck) == u"0":
if unicode(tornado.options.options.sslcheck) == u"0":
skip_ssl_validation = True
if unicode(options.use_simple_http_client) == u"1":
if unicode(tornado.options.options.use_simple_http_client) == u"1":
use_simple_http_client = True
# If we don't have any arguments, run the server.
@ -316,14 +310,14 @@ def main():
try:
app.run()
finally:
ForwarderStatus.remove_latest_status()
check_status.ForwarderStatus.remove_latest_status()
else:
usage = "%s [help|info]. Run with no commands to start the server" % (sys.argv[0])
command = args[0]
if command == 'info':
logging.getLogger().setLevel(logging.ERROR)
return ForwarderStatus.print_latest_status()
return check_status.ForwarderStatus.print_latest_status()
elif command == 'help':
print(usage)
else:

View File

@ -4,11 +4,14 @@ A Python Statsd implementation with dimensions added
"""
# set up logging before importing any other components
from monasca_agent.common.config import initialize_logging
from monasca_agent.statsd.reporter import Reporter
from monasca_agent.statsd.udp import Server
import monasca_agent.common.util as util
util.initialize_logging('statsd')
import monasca_agent.common.config as cfg
import monasca_agent.statsd.reporter as reporter
import monasca_agent.statsd.udp as udp
initialize_logging('statsd')
# stdlib
import argparse
@ -17,10 +20,8 @@ import signal
import sys
# project
from monasca_agent.common.aggregator import MetricsAggregator
from monasca_agent.common.check_status import MonascaStatsdStatus
from monasca_agent.common.config import get_config
from monasca_agent.common.util import get_hostname
import monasca_agent.common.aggregator as agg
import monasca_agent.common.check_status as check_status
log = logging.getLogger('statsd')
@ -29,27 +30,32 @@ class MonascaStatsd(object):
""" This class is the monasca_statsd daemon. """
def __init__(self, config_path):
config = get_config(parse_args=False, cfg_path=config_path)
config = cfg.Config()
statsd_config = config.get_config(['Main', 'Statsd'])
# Create the aggregator (which is the point of communication between the server and reporting threads.
aggregator = MetricsAggregator(get_hostname(config),
int(config['monasca_statsd_agregator_interval']),
recent_point_threshold=config.get('recent_point_threshold', None))
aggregator = agg.MetricsAggregator(util.get_hostname(),
int(statsd_config['monasca_statsd_agregator_interval']),
recent_point_threshold=statsd_config['recent_point_threshold'])
# Start the reporting thread.
interval = int(config['monasca_statsd_interval'])
interval = int(statsd_config['monasca_statsd_interval'])
assert 0 < interval
self.reporter = Reporter(interval, aggregator, config['forwarder_url'], True, config.get('event_chunk_size'))
self.reporter = reporter.Reporter(interval,
aggregator,
statsd_config['forwarder_url'],
True,
statsd_config.get('event_chunk_size'))
# Start the server on an IPv4 stack
if config['non_local_traffic']:
if statsd_config['non_local_traffic']:
server_host = ''
else:
server_host = 'localhost'
self.server = Server(aggregator, server_host, config['monasca_statsd_port'],
forward_to_host=config.get('statsd_forward_host'),
forward_to_port=config.get('statsd_forward_port'))
self.server = udp.Server(aggregator, server_host, statsd_config['monasca_statsd_port'],
forward_to_host=statsd_config.get('monasca_statsd_forward_host'),
forward_to_port=int(statsd_config.get('monasca_statsd_forward_port')))
def _handle_sigterm(self, signum, frame):
log.debug("Caught sigterm. Stopping run loop.")
@ -89,7 +95,7 @@ def main():
if args.info:
logging.getLogger().setLevel(logging.ERROR)
return MonascaStatsdStatus.print_latest_status()
return check_status.MonascaStatsdStatus.print_latest_status()
monasca_statsd = MonascaStatsd(args.config)
monasca_statsd.run()

View File

@ -1,12 +1,11 @@
import json
import logging
import threading
from monasca_agent.common.check_status import MonascaStatsdStatus
from monasca_agent.common.emitter import http_emitter
from monasca_agent.common.util import plural
from monasca_agent.common.config import initialize_logging
initialize_logging('statsd')
log = logging.getLogger('statsd')
import monasca_agent.common.check_status as check_status
import monasca_agent.common.emitter as emitter
import monasca_agent.common.util as util
log = logging.getLogger(__name__)
WATCHDOG_TIMEOUT = 120
@ -54,7 +53,7 @@ class Reporter(threading.Thread):
log.debug("Watchdog enabled: %s" % bool(self.watchdog))
# Persist a start-up message.
MonascaStatsdStatus().persist()
check_status.MonascaStatsdStatus().persist()
while not self.finished.isSet(): # Use camel case isSet for 2.4 support.
self.finished.wait(self.interval)
@ -64,7 +63,7 @@ class Reporter(threading.Thread):
# Clean up the status messages.
log.debug("Stopped reporter")
MonascaStatsdStatus.remove_latest_status()
check_status.MonascaStatsdStatus.remove_latest_status()
def flush(self):
try:
@ -77,7 +76,7 @@ class Reporter(threading.Thread):
self.log_count = 0
if count:
try:
http_emitter(metrics, log, self.api_host)
emitter.http_emitter(metrics, log, self.api_host)
except Exception:
log.exception("Error running emitter.")
@ -94,9 +93,9 @@ class Reporter(threading.Thread):
"Flush #%s: flushed %s metric%s and %s event%s" %
(self.flush_count,
count,
plural(count),
event_count,
plural(event_count)))
util.plural(count),
event_count,
util.plural(event_count)))
if self.flush_count == FLUSH_LOGGING_INITIAL:
log.info(
"First flushes done, %s flushes will be logged every %s flushes." %
@ -105,13 +104,11 @@ class Reporter(threading.Thread):
# Persist a status message.
packet_count = self.aggregator.total_count
packets_per_second = self.aggregator.packets_per_second(self.interval)
MonascaStatsdStatus(
flush_count=self.flush_count,
packet_count=packet_count,
packets_per_second=packets_per_second,
metric_count=count,
event_count=event_count,
).persist()
check_status.MonascaStatsdStatus(flush_count=self.flush_count,
packet_count=packet_count,
packets_per_second=packets_per_second,
metric_count=count,
event_count=event_count).persist()
except Exception:
log.exception("Error flushing metrics")

View File

@ -2,9 +2,8 @@ import ast
import logging
import select
import socket
from monasca_agent.common.config import initialize_logging
initialize_logging('statsd')
log = logging.getLogger('statsd')
log = logging.getLogger(__name__)
UDP_SOCKET_TIMEOUT = 5