Renamed dogstatsd->monstatsd

Tim Kuhlman 2014-05-07 16:07:03 -06:00
parent c0d5e8c1df
commit af384111c4
13 changed files with 469 additions and 473 deletions

View File

@@ -41,8 +41,8 @@ use_mount: no
# Change port the Agent is listening to
# listen_port: 17123
# Additional directory to look for Datadog checks
# additional_checksd: /etc/dd-agent/checks.d/
# Additional directory to look for checks
# additional_checksd: /etc/mon-agent/checks.d/
# Allow non-local traffic to this Agent
# This is required when using this Agent as a proxy for other Agents
@@ -52,25 +52,24 @@ use_mount: no
# non_local_traffic: no
# ========================================================================== #
# DogStatsd configuration #
# MonStatsd configuration #
# ========================================================================== #
# DogStatsd is a small server that aggregates your custom app metrics. For
# usage information, check out http://api.datadoghq.com
# MonStatsd is a small server that aggregates your custom app metrics.
# Make sure your client is sending to the same port.
dogstatsd_port : 8125
monstatsd_port : 8125
## The dogstatsd flush period.
# dogstatsd_interval : 10
## The monstatsd flush period.
# monstatsd_interval : 10
## If 'yes', counters and rates will be normalized to 1 second (that is divided
## by the dogstatsd_interval) before being sent to the server. Defaults to 'yes'
# dogstatsd_normalize : yes
## by the monstatsd_interval) before being sent to the server. Defaults to 'yes'
# monstatsd_normalize : yes
# If you want to forward every packet received by the dogstatsd server
# If you want to forward every packet received by the monstatsd server
# to another statsd server, uncomment these lines.
# WARNING: Make sure that forwarded packets are regular statsd packets and not "dogstatsd" packets,
# WARNING: Make sure that forwarded packets are regular statsd packets and not "monstatsd" packets,
# as your other statsd server might not be able to handle them.
# statsd_forward_host: address_of_own_statsd_server
# statsd_forward_port: 8125
@@ -85,7 +84,7 @@ dogstatsd_port : 8125
# Some infrastructures have many constantly changing virtual devices (e.g. folks
# running constantly churning linux containers) whose metrics aren't
# interesting for datadog. To filter out a particular pattern of devices
# interesting. To filter out a particular pattern of devices
# from collection, configure a regex here:
# device_blacklist_re: .*\/dev\/mapper\/lxc-box.*
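
monstatsd listens for plain statsd packets over UDP, so any statsd client can talk to it on the configured monstatsd_port (8125 by default). A minimal sketch of such a client, with a made-up metric name:

import socket

# Send one counter increment to a local monstatsd; "page.views" is an
# illustrative metric name and 8125 is the default monstatsd_port.
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.sendto("page.views:1|c", ("localhost", 8125))
sock.close()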

View File

@@ -12,7 +12,6 @@ import subprocess
import tempfile
import time
# datadog
from monagent.common.util import PidFile, get_os
log = logging.getLogger(__name__)
@@ -289,7 +288,7 @@ class JMXFetch(object):
@classmethod
def start(cls, confd_path, agentConfig, logging_config, path_to_java, java_run_opts,
default_check_frequency, jmx_checks, command, reporter=None):
statsd_port = agentConfig.get('dogstatsd_port', "8125")
statsd_port = agentConfig.get('monstatsd_port', "8125")
if reporter is None:
reporter = "statsd:%s" % str(statsd_port)

View File

@@ -1,4 +1,4 @@
""" Aggregation classes used by the collector and dogstatsd to batch messages sent to the forwarder.
""" Aggregation classes used by the collector and monstatsd to batch messages sent to the forwarder.
"""
import logging
from time import time
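
Both the collector and monstatsd feed raw statsd packets into an aggregator and flush it on an interval. A rough usage sketch, reusing only names that appear in this diff (see the test file at the end):

from monagent.common.aggregator import MetricsAggregator

# Batch a raw statsd counter packet, then flush the accumulated metrics;
# the flushed list is what gets handed on to the forwarder.
agg = MetricsAggregator('myhost')
agg.submit_packets('metric:10|c')
metrics = agg.flush()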

View File

@@ -512,9 +512,9 @@ class CollectorStatus(AgentStatus):
return status_info
class DogstatsdStatus(AgentStatus):
class MonstatsdStatus(AgentStatus):
NAME = 'Dogstatsd'
NAME = 'Monstatsd'
def __init__(self, flush_count=0, packet_count=0, packets_per_second=0, metric_count=0, event_count=0):
AgentStatus.__init__(self)

View File

@@ -23,7 +23,7 @@ except ImportError:
from yaml import Loader
# project
from util import get_os, Platform
from util import get_os
from monagent.collector.jmxfetch import JMXFetch, JMX_COLLECT_COMMAND
# CONSTANTS
@@ -188,10 +188,10 @@ def get_config(parse_args=True, cfg_path=None, options=None):
# General config
agent_config = {
'check_freq': DEFAULT_CHECK_FREQUENCY,
'dogstatsd_interval': DEFAULT_STATSD_FREQUENCY,
'dogstatsd_agregator_bucket_size': DEFAULT_STATSD_BUCKET_SIZE,
'dogstatsd_normalize': 'yes',
'dogstatsd_port': 8125,
'monstatsd_interval': DEFAULT_STATSD_FREQUENCY,
'monstatsd_agregator_bucket_size': DEFAULT_STATSD_BUCKET_SIZE,
'monstatsd_normalize': 'yes',
'monstatsd_port': 8125,
'forwarder_url': 'http://localhost:17123',
'hostname': None,
'listen_port': None,
@@ -200,8 +200,8 @@ def get_config(parse_args=True, cfg_path=None, options=None):
'additional_checksd': '/etc/mon-agent/checks.d/',
}
dogstatsd_interval = DEFAULT_STATSD_FREQUENCY
dogstatsd_agregator_bucket_size = DEFAULT_STATSD_BUCKET_SIZE
monstatsd_interval = DEFAULT_STATSD_FREQUENCY
monstatsd_agregator_bucket_size = DEFAULT_STATSD_BUCKET_SIZE
# Config handling
try:
@@ -254,14 +254,14 @@ def get_config(parse_args=True, cfg_path=None, options=None):
if config.get('Main', 'watchdog').lower() in ('no', 'false'):
agent_config['watchdog'] = False
# Dogstatsd config
dogstatsd_defaults = {
'dogstatsd_port': 8125,
'dogstatsd_interval': dogstatsd_interval,
'dogstatsd_agregator_bucket_size': dogstatsd_agregator_bucket_size,
'dogstatsd_normalize': 'yes',
# monstatsd config
monstatsd_defaults = {
'monstatsd_port': 8125,
'monstatsd_interval': monstatsd_interval,
'monstatsd_agregator_bucket_size': monstatsd_agregator_bucket_size,
'monstatsd_normalize': 'yes',
}
for key, value in dogstatsd_defaults.iteritems():
for key, value in monstatsd_defaults.iteritems():
if config.has_option('Main', key):
agent_config[key] = config.get('Main', key)
else:
@@ -274,7 +274,7 @@ def get_config(parse_args=True, cfg_path=None, options=None):
agent_config['statsd_forward_port'] = int(config.get('Main', 'statsd_forward_port'))
# normalize 'yes'/'no' to boolean
dogstatsd_defaults['dogstatsd_normalize'] = _is_affirmative(dogstatsd_defaults['dogstatsd_normalize'])
monstatsd_defaults['monstatsd_normalize'] = _is_affirmative(monstatsd_defaults['monstatsd_normalize'])
# Optional config
# FIXME not the prettiest code ever...
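
Assembled from the defaults above, the [Main] section of agent.conf would carry entries like these (an illustrative fragment, not a complete config):

[Main]
monstatsd_port: 8125
monstatsd_interval: 10
monstatsd_normalize: yes
# statsd_forward_host: address_of_own_statsd_server
# statsd_forward_port: 8125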
@@ -365,7 +365,7 @@ def set_win32_cert_path():
def get_proxy(agent_config, use_system_settings=False):
proxy_settings = {}
# First we read the proxy configuration from datadog.conf
# First we read the proxy configuration from agent.conf
proxy_host = agent_config.get('proxy_host', None)
if proxy_host is not None and not use_system_settings:
proxy_settings['host'] = proxy_host
@@ -381,7 +381,7 @@ def get_proxy(agent_config, use_system_settings=False):
log.debug("Proxy Settings: %s:%s@%s:%s" % (proxy_settings['user'], "*****", proxy_settings['host'], proxy_settings['port']))
return proxy_settings
# If no proxy configuration was specified in datadog.conf
# If no proxy configuration was specified in agent.conf
# We try to read it from the system settings
try:
import urllib
@@ -582,7 +582,7 @@ def load_check_directory(agent_config):
if not check_config:
continue
d = [
"Configuring %s in datadog.conf is deprecated." % (check_name),
"Configuring %s in agent.conf is deprecated." % (check_name),
"Please use conf.d. In a future release, support for the",
"old style of configuration will be dropped.",
]
@@ -659,10 +659,10 @@ def get_logging_config(cfg_path=None):
if system_os != 'windows':
logging_config = {
'log_level': None,
'collector_log_file': '/var/log/datadog/collector.log',
'forwarder_log_file': '/var/log/datadog/forwarder.log',
'dogstatsd_log_file': '/var/log/datadog/dogstatsd.log',
'jmxfetch_log_file': '/var/log/datadog/jmxfetch.log',
'collector_log_file': '/var/log/mon-agent/collector.log',
'forwarder_log_file': '/var/log/mon-agent/forwarder.log',
'monstatsd_log_file': '/var/log/mon-agent/monstatsd.log',
'jmxfetch_log_file': '/var/log/mon-agent/jmxfetch.log',
'log_to_event_viewer': False,
'log_to_syslog': True,
'syslog_host': None,
@@ -692,7 +692,7 @@ def get_logging_config(cfg_path=None):
config_example_file = "https://github.com/DataDog/dd-agent/blob/master/datadog.conf.example"
sys.stderr.write("""Python logging config is no longer supported and will be ignored.
To configure logging, update the logging portion of 'datadog.conf' to match:
To configure logging, update the logging portion of 'agent.conf' to match:
'%s'.
""" % config_example_file)
@@ -740,7 +740,6 @@ def get_logging_config(cfg_path=None):
return logging_config
def initialize_logging(logger_name):
global windows_file_handler_added
try:
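
The logging defaults above correspond to an agent.conf logging section roughly like this (illustrative; only keys that appear in this diff are shown):

collector_log_file: /var/log/mon-agent/collector.log
forwarder_log_file: /var/log/mon-agent/forwarder.log
monstatsd_log_file: /var/log/mon-agent/monstatsd.log
jmxfetch_log_file: /var/log/mon-agent/jmxfetch.log
log_to_syslog: yes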

monagent/monstatsd/__init__.py Normal file → Executable file
View File

@@ -1 +1,412 @@
__author__ = 'kuhlmant'
#!/usr/bin/env python
"""
A Python Statsd implementation with dimensions added
"""
# set up logging before importing any other components
from monagent.common.config import initialize_logging
initialize_logging('monstatsd')
import os
os.umask(022)
# stdlib
import httplib as http_client
import json
import logging
import optparse
import re
import select
import signal
import socket
import sys
from time import time
import threading
from urllib import urlencode
# project
from monagent.common.aggregator import MetricsBucketAggregator
from monagent.common.check_status import MonstatsdStatus
from monagent.common.config import get_config
from monagent.common.daemon import Daemon, AgentSupervisor
from monagent.common.util import PidFile, get_hostname, plural, get_uuid, chunks
log = logging.getLogger('monstatsd')
WATCHDOG_TIMEOUT = 120
UDP_SOCKET_TIMEOUT = 5
# Since we call flush more often than the metrics aggregation interval, we should
# log a bunch of flushes in a row every so often.
FLUSH_LOGGING_PERIOD = 70
FLUSH_LOGGING_INITIAL = 10
FLUSH_LOGGING_COUNT = 5
EVENT_CHUNK_SIZE = 50
def serialize_metrics(metrics):
return json.dumps({"series": metrics})
def serialize_event(event):
return json.dumps(event)
class Reporter(threading.Thread):
"""
The reporter periodically sends the aggregated metrics to the
server.
"""
def __init__(self, interval, metrics_aggregator, api_host, use_watchdog=False, event_chunk_size=None):
threading.Thread.__init__(self)
self.interval = int(interval)
self.finished = threading.Event()
self.metrics_aggregator = metrics_aggregator
self.flush_count = 0
self.log_count = 0
self.watchdog = None
if use_watchdog:
from monagent.common.util import Watchdog
self.watchdog = Watchdog(WATCHDOG_TIMEOUT)
self.api_host = api_host
self.event_chunk_size = event_chunk_size or EVENT_CHUNK_SIZE
self.http_conn_cls = http_client.HTTPSConnection
match = re.match('^(https?)://(.*)', api_host)
if match:
self.api_host = match.group(2)
if match.group(1) == 'http':
self.http_conn_cls = http_client.HTTPConnection
def stop(self):
log.info("Stopping reporter")
self.finished.set()
def run(self):
log.info("Reporting to %s every %ss" % (self.api_host, self.interval))
log.debug("Watchdog enabled: %s" % bool(self.watchdog))
# Persist a start-up message.
MonstatsdStatus().persist()
while not self.finished.isSet(): # Use camel case isSet for 2.4 support.
self.finished.wait(self.interval)
self.flush()
if self.watchdog:
self.watchdog.reset()
# Clean up the status messages.
log.debug("Stopped reporter")
MonstatsdStatus.remove_latest_status()
def flush(self):
try:
self.flush_count += 1
self.log_count += 1
packets_per_second = self.metrics_aggregator.packets_per_second(self.interval)
packet_count = self.metrics_aggregator.total_count
metrics = self.metrics_aggregator.flush()
count = len(metrics)
if self.flush_count % FLUSH_LOGGING_PERIOD == 0:
self.log_count = 0
if count:
self.submit(metrics)
events = self.metrics_aggregator.flush_events()
event_count = len(events)
if event_count:
self.submit_events(events)
should_log = self.flush_count <= FLUSH_LOGGING_INITIAL or self.log_count <= FLUSH_LOGGING_COUNT
log_func = log.info
if not should_log:
log_func = log.debug
log_func("Flush #%s: flushed %s metric%s and %s event%s" % (self.flush_count, count, plural(count), event_count, plural(event_count)))
if self.flush_count == FLUSH_LOGGING_INITIAL:
log.info("First flushes done, %s flushes will be logged every %s flushes." % (FLUSH_LOGGING_COUNT, FLUSH_LOGGING_PERIOD))
# Persist a status message.
packet_count = self.metrics_aggregator.total_count
MonstatsdStatus(
flush_count=self.flush_count,
packet_count=packet_count,
packets_per_second=packets_per_second,
metric_count=count,
event_count=event_count,
).persist()
except Exception, e:
log.exception("Error flushing metrics")
def submit(self, metrics):
# Copy and pasted from dogapi, because it's a bit of a pain to distribute python
# dependencies with the agent.
body = serialize_metrics(metrics)
headers = {'Content-Type':'application/json'}
method = 'POST'
params = {}
url = '/api/v1/series?%s' % urlencode(params)
start_time = time()
status = None
conn = self.http_conn_cls(self.api_host)
try:
conn.request(method, url, body, headers)
#FIXME: add timeout handling code here
response = conn.getresponse()
status = response.status
response.close()
finally:
conn.close()
duration = round((time() - start_time) * 1000.0, 4)
log.debug("%s %s %s%s (%sms)" % (
status, method, self.api_host, url, duration))
return duration
def submit_events(self, events):
headers = {'Content-Type':'application/json'}
method = 'POST'
events_len = len(events)
event_chunk_size = self.event_chunk_size
for chunk in chunks(events, event_chunk_size):
payload = {
'events': {
'api': chunk
},
'uuid': get_uuid(),
'hostname': get_hostname()
}
params = {}
url = '/intake?%s' % urlencode(params)
status = None
conn = self.http_conn_cls(self.api_host)
try:
start_time = time()
conn.request(method, url, json.dumps(payload), headers)
response = conn.getresponse()
status = response.status
response.close()
duration = round((time() - start_time) * 1000.0, 4)
log.debug("%s %s %s%s (%sms)" % (
status, method, self.api_host, url, duration))
finally:
conn.close()
class Server(object):
"""
A statsd udp server.
"""
def __init__(self, metrics_aggregator, host, port, forward_to_host=None, forward_to_port=None):
self.host = host
self.port = int(port)
self.address = (self.host, self.port)
self.metrics_aggregator = metrics_aggregator
self.buffer_size = 1024 * 8
self.running = False
self.should_forward = forward_to_host is not None
self.forward_udp_sock = None
# In case we want to forward every packet received to another statsd server
if self.should_forward:
if forward_to_port is None:
forward_to_port = 8125
log.info("External statsd forwarding enabled. All packets received will be forwarded to %s:%s" % (forward_to_host, forward_to_port))
try:
self.forward_udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.forward_udp_sock.connect((forward_to_host, forward_to_port))
except Exception, e:
log.exception("Error while setting up connection to external statsd server")
def start(self):
""" Run the server. """
# Bind to the UDP socket.
# IPv4 only
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.socket.setblocking(0)
try:
self.socket.bind(self.address)
except socket.gaierror:
if self.address[0] == 'localhost':
log.warning("Warning localhost seems undefined in your host file, using 127.0.0.1 instead")
self.address = ('127.0.0.1', self.address[1])
self.socket.bind(self.address)
log.info('Listening on host & port: %s' % str(self.address))
# Inline variables for quick look-up.
buffer_size = self.buffer_size
#todo dogstatsd is the only thing using this method of the aggregator, is there a more standard way to do it?
aggregator_submit = self.metrics_aggregator.submit_packets
sock = [self.socket]
socket_recv = self.socket.recv
select_select = select.select
select_error = select.error
timeout = UDP_SOCKET_TIMEOUT
should_forward = self.should_forward
forward_udp_sock = self.forward_udp_sock
# Run our select loop.
self.running = True
while self.running:
try:
ready = select_select(sock, [], [], timeout)
if ready[0]:
message = socket_recv(buffer_size)
aggregator_submit(message)
if should_forward:
forward_udp_sock.send(message)
except select_error, se:
# Ignore interrupted system calls from sigterm.
errno = se[0]
if errno != 4:
raise
except (KeyboardInterrupt, SystemExit):
break
except Exception, e:
log.exception('Error receiving datagram')
def stop(self):
self.running = False
class Monstatsd(Daemon):
""" This class is the monstatsd daemon. """
def __init__(self, pid_file, server, reporter, autorestart):
Daemon.__init__(self, pid_file, autorestart=autorestart)
self.server = server
self.reporter = reporter
def _handle_sigterm(self, signum, frame):
log.debug("Caught sigterm. Stopping run loop.")
self.server.stop()
def run(self):
# Gracefully exit on sigterm.
signal.signal(signal.SIGTERM, self._handle_sigterm)
# Handle Keyboard Interrupt
signal.signal(signal.SIGINT, self._handle_sigterm)
# Start the reporting thread before accepting data
self.reporter.start()
try:
try:
self.server.start()
except Exception, e:
log.exception('Error starting server')
raise e
finally:
# The server will block until it's done. Once we're here, shutdown
# the reporting thread.
self.reporter.stop()
self.reporter.join()
log.info("Monstatsd is stopped")
# Restart if asked to restart
if self.autorestart:
sys.exit(AgentSupervisor.RESTART_EXIT_STATUS)
def info(self):
logging.getLogger().setLevel(logging.ERROR)
return MonstatsdStatus.print_latest_status()
def init(config_path=None, use_watchdog=False):
"""Configure the server and the reporting thread.
"""
c = get_config(parse_args=False, cfg_path=config_path)
log.debug("Configuration monstatsd")
port = c['monstatsd_port']
interval = int(c['monstatsd_interval'])
aggregator_interval = int(c['monstatsd_agregator_bucket_size'])
non_local_traffic = c['non_local_traffic']
forward_to_host = c.get('statsd_forward_host')
forward_to_port = c.get('statsd_forward_port')
event_chunk_size = c.get('event_chunk_size')
target = c['forwarder_url']
hostname = get_hostname(c)
# Create the aggregator (which is the point of communication between the
# server and reporting threads.)
assert 0 < interval
aggregator = MetricsBucketAggregator(hostname, aggregator_interval, recent_point_threshold=c.get('recent_point_threshold', None))
# Start the reporting thread.
reporter = Reporter(interval, aggregator, target, use_watchdog, event_chunk_size)
# Start the server on an IPv4 stack
# Default to loopback
server_host = 'localhost'
# If specified, bind to all addresses
if non_local_traffic:
server_host = ''
server = Server(aggregator, server_host, port, forward_to_host=forward_to_host, forward_to_port=forward_to_port)
return reporter, server, c
def main(config_path=None):
""" The main entry point for the unix version of monstatsd. """
parser = optparse.OptionParser("%prog [start|stop|restart|status]")
opts, args = parser.parse_args()
reporter, server, cnf = init(config_path, use_watchdog=True)
pid_file = PidFile('monstatsd')
daemon = Monstatsd(pid_file.get_path(), server, reporter,
cnf.get('autorestart', False))
# If no args were passed in, run the server in the foreground.
if not args:
daemon.run()
return 0
# Otherwise, we process the daemon command.
else:
command = args[0]
if command == 'start':
daemon.start()
elif command == 'stop':
daemon.stop()
elif command == 'restart':
daemon.restart()
elif command == 'status':
daemon.status()
elif command == 'info':
return daemon.info()
else:
sys.stderr.write("Unknown command: %s\n\n" % command)
parser.print_help()
return 1
return 0
if __name__ == '__main__':
sys.exit(main())
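
This module is also the monstatsd executable (see the Makefile below), so it is normally driven as monstatsd start|stop|restart|status|info, or run in the foreground with no arguments. The payload shape it submits can be previewed directly; a small sketch, where the sample metric dict is made up for illustration:

from monagent.monstatsd import serialize_metrics

# serialize_metrics (defined above) wraps the metric list in a
# {"series": [...]} envelope before it is POSTed to /api/v1/series.
sample = [{"metric": "page.views", "points": [[1399500000, 1]], "host": "myhost"}]
print(serialize_metrics(sample))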

View File

@@ -1,412 +0,0 @@
#!/usr/bin/env python
"""
A Python Statsd implementation with some datadog special sauce.
"""
# set up logging before importing any other components
from monagent.common.config import initialize_logging
initialize_logging('dogstatsd')
import os
os.umask(022)
# stdlib
import httplib as http_client
import json
import logging
import optparse
import re
import select
import signal
import socket
import sys
from time import time
import threading
from urllib import urlencode
# project
from monagent.common.aggregator import MetricsBucketAggregator
from common.check_status import DogstatsdStatus
from monagent.common.config import get_config
from common.daemon import Daemon, AgentSupervisor
from common.util import PidFile, get_hostname, plural, get_uuid, chunks
log = logging.getLogger('dogstatsd')
WATCHDOG_TIMEOUT = 120
UDP_SOCKET_TIMEOUT = 5
# Since we call flush more often than the metrics aggregation interval, we should
# log a bunch of flushes in a row every so often.
FLUSH_LOGGING_PERIOD = 70
FLUSH_LOGGING_INITIAL = 10
FLUSH_LOGGING_COUNT = 5
EVENT_CHUNK_SIZE = 50
def serialize_metrics(metrics):
return json.dumps({"series": metrics})
def serialize_event(event):
return json.dumps(event)
class Reporter(threading.Thread):
"""
The reporter periodically sends the aggregated metrics to the
server.
"""
def __init__(self, interval, metrics_aggregator, api_host, use_watchdog=False, event_chunk_size=None):
threading.Thread.__init__(self)
self.interval = int(interval)
self.finished = threading.Event()
self.metrics_aggregator = metrics_aggregator
self.flush_count = 0
self.log_count = 0
self.watchdog = None
if use_watchdog:
from common.util import Watchdog
self.watchdog = Watchdog(WATCHDOG_TIMEOUT)
self.api_host = api_host
self.event_chunk_size = event_chunk_size or EVENT_CHUNK_SIZE
self.http_conn_cls = http_client.HTTPSConnection
match = re.match('^(https?)://(.*)', api_host)
if match:
self.api_host = match.group(2)
if match.group(1) == 'http':
self.http_conn_cls = http_client.HTTPConnection
def stop(self):
log.info("Stopping reporter")
self.finished.set()
def run(self):
log.info("Reporting to %s every %ss" % (self.api_host, self.interval))
log.debug("Watchdog enabled: %s" % bool(self.watchdog))
# Persist a start-up message.
DogstatsdStatus().persist()
while not self.finished.isSet(): # Use camel case isSet for 2.4 support.
self.finished.wait(self.interval)
self.flush()
if self.watchdog:
self.watchdog.reset()
# Clean up the status messages.
log.debug("Stopped reporter")
DogstatsdStatus.remove_latest_status()
def flush(self):
try:
self.flush_count += 1
self.log_count += 1
packets_per_second = self.metrics_aggregator.packets_per_second(self.interval)
packet_count = self.metrics_aggregator.total_count
metrics = self.metrics_aggregator.flush()
count = len(metrics)
if self.flush_count % FLUSH_LOGGING_PERIOD == 0:
self.log_count = 0
if count:
self.submit(metrics)
events = self.metrics_aggregator.flush_events()
event_count = len(events)
if event_count:
self.submit_events(events)
should_log = self.flush_count <= FLUSH_LOGGING_INITIAL or self.log_count <= FLUSH_LOGGING_COUNT
log_func = log.info
if not should_log:
log_func = log.debug
log_func("Flush #%s: flushed %s metric%s and %s event%s" % (self.flush_count, count, plural(count), event_count, plural(event_count)))
if self.flush_count == FLUSH_LOGGING_INITIAL:
log.info("First flushes done, %s flushes will be logged every %s flushes." % (FLUSH_LOGGING_COUNT, FLUSH_LOGGING_PERIOD))
# Persist a status message.
packet_count = self.metrics_aggregator.total_count
DogstatsdStatus(
flush_count=self.flush_count,
packet_count=packet_count,
packets_per_second=packets_per_second,
metric_count=count,
event_count=event_count,
).persist()
except Exception, e:
log.exception("Error flushing metrics")
def submit(self, metrics):
# Copy and pasted from dogapi, because it's a bit of a pain to distribute python
# dependencies with the agent.
body = serialize_metrics(metrics)
headers = {'Content-Type':'application/json'}
method = 'POST'
params = {}
url = '/api/v1/series?%s' % urlencode(params)
start_time = time()
status = None
conn = self.http_conn_cls(self.api_host)
try:
conn.request(method, url, body, headers)
#FIXME: add timeout handling code here
response = conn.getresponse()
status = response.status
response.close()
finally:
conn.close()
duration = round((time() - start_time) * 1000.0, 4)
log.debug("%s %s %s%s (%sms)" % (
status, method, self.api_host, url, duration))
return duration
def submit_events(self, events):
headers = {'Content-Type':'application/json'}
method = 'POST'
events_len = len(events)
event_chunk_size = self.event_chunk_size
for chunk in chunks(events, event_chunk_size):
payload = {
'events': {
'api': chunk
},
'uuid': get_uuid(),
'hostname': get_hostname()
}
params = {}
url = '/intake?%s' % urlencode(params)
status = None
conn = self.http_conn_cls(self.api_host)
try:
start_time = time()
conn.request(method, url, json.dumps(payload), headers)
response = conn.getresponse()
status = response.status
response.close()
duration = round((time() - start_time) * 1000.0, 4)
log.debug("%s %s %s%s (%sms)" % (
status, method, self.api_host, url, duration))
finally:
conn.close()
class Server(object):
"""
A statsd udp server.
"""
def __init__(self, metrics_aggregator, host, port, forward_to_host=None, forward_to_port=None):
self.host = host
self.port = int(port)
self.address = (self.host, self.port)
self.metrics_aggregator = metrics_aggregator
self.buffer_size = 1024 * 8
self.running = False
self.should_forward = forward_to_host is not None
self.forward_udp_sock = None
# In case we want to forward every packet received to another statsd server
if self.should_forward:
if forward_to_port is None:
forward_to_port = 8125
log.info("External statsd forwarding enabled. All packets received will be forwarded to %s:%s" % (forward_to_host, forward_to_port))
try:
self.forward_udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.forward_udp_sock.connect((forward_to_host, forward_to_port))
except Exception, e:
log.exception("Error while setting up connection to external statsd server")
def start(self):
""" Run the server. """
# Bind to the UDP socket.
# IPv4 only
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.socket.setblocking(0)
try:
self.socket.bind(self.address)
except socket.gaierror:
if self.address[0] == 'localhost':
log.warning("Warning localhost seems undefined in your host file, using 127.0.0.1 instead")
self.address = ('127.0.0.1', self.address[1])
self.socket.bind(self.address)
log.info('Listening on host & port: %s' % str(self.address))
# Inline variables for quick look-up.
buffer_size = self.buffer_size
#todo dogstatsd is the only thing using this method of the aggregator, is there a more standard way to do it?
aggregator_submit = self.metrics_aggregator.submit_packets
sock = [self.socket]
socket_recv = self.socket.recv
select_select = select.select
select_error = select.error
timeout = UDP_SOCKET_TIMEOUT
should_forward = self.should_forward
forward_udp_sock = self.forward_udp_sock
# Run our select loop.
self.running = True
while self.running:
try:
ready = select_select(sock, [], [], timeout)
if ready[0]:
message = socket_recv(buffer_size)
aggregator_submit(message)
if should_forward:
forward_udp_sock.send(message)
except select_error, se:
# Ignore interrupted system calls from sigterm.
errno = se[0]
if errno != 4:
raise
except (KeyboardInterrupt, SystemExit):
break
except Exception, e:
log.exception('Error receiving datagram')
def stop(self):
self.running = False
class Dogstatsd(Daemon):
""" This class is the dogstatsd daemon. """
def __init__(self, pid_file, server, reporter, autorestart):
Daemon.__init__(self, pid_file, autorestart=autorestart)
self.server = server
self.reporter = reporter
def _handle_sigterm(self, signum, frame):
log.debug("Caught sigterm. Stopping run loop.")
self.server.stop()
def run(self):
# Gracefully exit on sigterm.
signal.signal(signal.SIGTERM, self._handle_sigterm)
# Handle Keyboard Interrupt
signal.signal(signal.SIGINT, self._handle_sigterm)
# Start the reporting thread before accepting data
self.reporter.start()
try:
try:
self.server.start()
except Exception, e:
log.exception('Error starting server')
raise e
finally:
# The server will block until it's done. Once we're here, shutdown
# the reporting thread.
self.reporter.stop()
self.reporter.join()
log.info("Dogstatsd is stopped")
# Restart if asked to restart
if self.autorestart:
sys.exit(AgentSupervisor.RESTART_EXIT_STATUS)
def info(self):
logging.getLogger().setLevel(logging.ERROR)
return DogstatsdStatus.print_latest_status()
def init(config_path=None, use_watchdog=False):
"""Configure the server and the reporting thread.
"""
c = get_config(parse_args=False, cfg_path=config_path)
log.debug("Configuration dogstatsd")
port = c['dogstatsd_port']
interval = int(c['dogstatsd_interval'])
aggregator_interval = int(c['dogstatsd_agregator_bucket_size'])
non_local_traffic = c['non_local_traffic']
forward_to_host = c.get('statsd_forward_host')
forward_to_port = c.get('statsd_forward_port')
event_chunk_size = c.get('event_chunk_size')
target = c['forwarder_url']
hostname = get_hostname(c)
# Create the aggregator (which is the point of communication between the
# server and reporting threads.
assert 0 < interval
aggregator = MetricsBucketAggregator(hostname, aggregator_interval, recent_point_threshold=c.get('recent_point_threshold', None))
# Start the reporting thread.
reporter = Reporter(interval, aggregator, target, use_watchdog, event_chunk_size)
# Start the server on an IPv4 stack
# Default to loopback
server_host = 'localhost'
# If specified, bind to all addressses
if non_local_traffic:
server_host = ''
server = Server(aggregator, server_host, port, forward_to_host=forward_to_host, forward_to_port=forward_to_port)
return reporter, server, c
def main(config_path=None):
""" The main entry point for the unix version of dogstatsd. """
parser = optparse.OptionParser("%prog [start|stop|restart|status]")
opts, args = parser.parse_args()
reporter, server, cnf = init(config_path, use_watchdog=True)
pid_file = PidFile('dogstatsd')
daemon = Dogstatsd(pid_file.get_path(), server, reporter,
cnf.get('autorestart', False))
# If no args were passed in, run the server in the foreground.
if not args:
daemon.run()
return 0
# Otherwise, we're process the deamon command.
else:
command = args[0]
if command == 'start':
daemon.start()
elif command == 'stop':
daemon.stop()
elif command == 'restart':
daemon.restart()
elif command == 'status':
daemon.status()
elif command == 'info':
return daemon.info()
else:
sys.stderr.write("Unknown command: %s\n\n" % command)
parser.print_help()
return 1
return 0
if __name__ == '__main__':
sys.exit(main())

View File

@@ -174,13 +174,13 @@ class DogstatsdProcess(multiprocessing.Process):
self.is_enabled = True
def run(self):
log.debug("Windows Service - Starting Dogstatsd server")
log.debug("Windows Service - Starting Monstatsd server")
self.reporter, self.server, _ = dogstatsd.init()
self.reporter.start()
self.server.start()
def stop(self):
log.debug("Windows Service - Stopping Dogstatsd server")
log.debug("Windows Service - Stopping Monstatsd server")
self.server.stop()
self.reporter.stop()
self.reporter.join()

View File

@@ -53,10 +53,10 @@ install_full: source
# Install the source to usr/share
cp -r $(ROOT)/* $(BUILD)/usr/share/mon/agent/
# Install the common executables.
ln -sf ../share/mon/agent/monagent/monstatsd/dogstatsd.py $(BUILD)/usr/bin/dogstatsd
ln -sf ../share/mon/agent/monagent/monstatsd/__init__.py $(BUILD)/usr/bin/monstatsd
ln -sf ../share/mon/agent/monagent/forwarder/__init__.py $(BUILD)/usr/bin/mon-forwarder
ln -sf ../share/mon/agent/monagent/collector/daemon.py $(BUILD)/usr/bin/mon-collector
chmod 755 $(BUILD)/usr/bin/dogstatsd
chmod 755 $(BUILD)/usr/bin/monstatsd
chmod 755 $(BUILD)/usr/bin/mon-forwarder
chmod 755 $(BUILD)/usr/bin/mon-collector
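
After installation the renamed executable is just a symlink into the shared source tree; per the rules above, the result looks like:

/usr/bin/monstatsd -> ../share/mon/agent/monagent/monstatsd/__init__.py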

View File

@@ -16,7 +16,7 @@ export PYTHONPATH=$PYTHONPATH:/usr/share/mon/agent/
AGENTPATH="/usr/bin/mon-collector"
AGENTCONF="/etc/mon-agent/agent.conf"
DOGSTATSDPATH="/usr/bin/dogstatsd"
MONSTATSDPATH="/usr/bin/monstatsd"
AGENTUSER="mon-agent"
FORWARDERPATH="/usr/bin/mon-forwarder"
NAME="mon-agent"
@@ -125,11 +125,11 @@ case "$1" in
# (right now only mon-agent supports additional flags)
su $AGENTUSER -c "$AGENTPATH info $@"
COLLECTOR_RETURN=$?
su $AGENTUSER -c "$DOGSTATSDPATH info"
DOGSTATSD_RETURN=$?
su $AGENTUSER -c "$MONSTATSDPATH info"
MONSTATSD_RETURN=$?
su $AGENTUSER -c "$FORWARDERPATH info"
FORWARDER_RETURN=$?
exit $(($COLLECTOR_RETURN+$DOGSTATSD_RETURN+$FORWARDER_RETURN))
exit $(($COLLECTOR_RETURN+$MONSTATSD_RETURN+$FORWARDER_RETURN))
;;
status)

View File

@@ -11,7 +11,7 @@ case "$1" in
chown -R mon-agent:root /etc/mon-agent
chown -R mon-agent:root /var/log/mon-agent
chown -R root:root /usr/share/mon/agent
chown -h root:root /usr/bin/dogstatsd
chown -h root:root /usr/bin/monstatsd
chown -h root:root /usr/bin/mon-collector
chown -h root:root /usr/bin/mon-forwarder

View File

@@ -35,8 +35,8 @@ startsecs=3
priority=998
user=mon-agent
[program:dogstatsd]
command=/usr/bin/dogstatsd
[program:monstatsd]
command=/usr/bin/monstatsd
stdout_logfile=NONE
stderr_logfile=NONE
startsecs=3
@@ -44,4 +44,4 @@ priority=998
user=mon-agent
[group:mon-agent]
programs=forwarder,collector,dogstatsd
programs=forwarder,collector,monstatsd
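
Reassembled from the hunks above, the resulting supervisor block reads in full (the surrounding settings are untouched by this commit):

[program:monstatsd]
command=/usr/bin/monstatsd
stdout_logfile=NONE
stderr_logfile=NONE
startsecs=3
priority=998
user=mon-agent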

View File

@@ -6,10 +6,10 @@ import unittest
import nose.tools as nt
from monagent.common.aggregator import MetricsAggregator
from monstatsd import dogstatsd
from monagent import monstatsd
class TestUnitDogStatsd(unittest.TestCase):
class TestUnitMonStatsd(unittest.TestCase):
@staticmethod
def sort_metrics(metrics):
@@ -94,8 +94,8 @@ class TestUnitDogStatsd(unittest.TestCase):
import json
from monagent.common.aggregator import api_formatter
dogstatsd.json = json
serialized = dogstatsd.serialize_metrics([api_formatter("foo", 12, 1, ('tag',), 'host')])
monstatsd.json = json
serialized = monstatsd.serialize_metrics([api_formatter("foo", 12, 1, ('tag',), 'host')])
assert '"tags": ["tag"]' in serialized
def test_counter(self):
@@ -364,12 +364,12 @@ class TestUnitDogStatsd(unittest.TestCase):
stats = MetricsAggregator('myhost')
for i in xrange(10):
stats.submit_packets('metric:10|c')
stats.send_packet_count('datadog.dogstatsd.packet.count')
stats.send_packet_count('monstatsd.packet.count')
metrics = self.sort_metrics(stats.flush())
nt.assert_equals(2, len(metrics))
first, second = metrics
nt.assert_equal(first['metric'], 'datadog.dogstatsd.packet.count')
nt.assert_equal(first['metric'], 'monstatsd.packet.count')
nt.assert_equal(first['points'][0][1], 10)
def test_histogram_counter(self):