Moving emitter from collector to forwarder

This commit is contained in:
gary-hessler 2014-04-08 10:28:50 -06:00
parent 4ad93c62d2
commit f613a3425c
8 changed files with 188 additions and 52 deletions

4
.gitignore vendored
View File

@ -1,8 +1,12 @@
.pydevproject
.project
.idea
.settings
*.pyc
agent.conf
Monitoring_Agent.egg-info
build
dist
mon-agent_4.2.0-0_all.deb
artifacts
root

View File

@ -32,15 +32,15 @@ if int(sys.version_info[1]) <= 3:
# Custom modules
from checks.collector import Collector
from checks.check_status import CollectorStatus
from config import get_config, get_system_stats, get_parsed_args, load_check_directory, get_confd_path, check_yaml, get_logging_config, _is_affirmative
from config import get_config, get_system_stats, get_parsed_args, load_check_directory, get_confd_path, check_yaml, get_logging_config
from daemon import Daemon, AgentSupervisor
from emitter import http_emitter
from util import Watchdog, PidFile, EC2, get_os
from jmxfetch import JMXFetch
from mon_lib.mon_api_emitter import MonApiEmitter
# Constants
PID_NAME = "mon-agent"
PID_NAME = "dd-agent"
WATCHDOG_MULTIPLIER = 10
RESTART_INTERVAL = 4 * 24 * 60 * 60 # Defaults to 4 days
START_COMMANDS = ['start', 'restart', 'foreground']
@ -167,12 +167,7 @@ class Agent(Daemon):
sys.exit(0)
def _get_emitters(self, agentConfig):
emitters = []
if _is_affirmative(agentConfig.get("send_to_datadog")):
emitters.append(http_emitter)
elif _is_affirmative(agentConfig.get("send_to_mon_api")):
emitters.append(MonApiEmitter)
return emitters
return [http_emitter]
def _get_watchdog(self, check_freq, agentConfig):
watchdog = None

View File

@ -1,5 +1,5 @@
import requests
from keystone import Keystone
from mon_keystone import Keystone
from util import json, md5
class MonAPI(object):
@ -24,7 +24,7 @@ class MonAPI(object):
try:
data = json.dumps(payload)
print data
response = requests.post(self.endpoint, data=data, headers=self.headers, verify=False)
response = requests.post(self.endpoint, data=data, headers=self.headers)
if response:
if response.status_code >= 200 and response.status_code <= 299:
# Good status from web service

View File

@ -1,16 +1,17 @@
import time
import calendar
import datetime
from copy import deepcopy
from monapi import MonAPI
from mon_lib.mon_api import MonAPI
from mon_lib.mon_normalizer import MonNormalizer
from config import _is_affirmative
class MonApiEmitter(object):
def __init__(self, payload, logger, config):
self.logger = logger
self.logger.debug("Configuration Info: " + str(config))
self.logger.debug("Initializing the mon-api emitter...")
self.mapping_key = "_mapping"
self.config = config
self.payload = payload
self.project_id = config['mon_api_project_id']
self.mon_api_url = config['mon_api_url']
self.user_id = config['mon_api_username']
@ -18,11 +19,17 @@ class MonApiEmitter(object):
self.use_keystone = config['use_keystone']
self.keystone_url = config['keystone_url']
self.aggregate_metrics = config['aggregate_metrics']
self.host_tags = self.get_standard_dimensions()
self.discard = "DISCARD"
self.sendToAPI()
self.payload = payload
self.device_name = ""
self.normalizer = MonNormalizer(logger, config['mon_mapping_file'])
if self.normalizer.is_initialized():
self.emitter()
def sendToAPI(self):
def emitter(self):
self.logger.debug("Beginning metrics processing in mon-api emitter...")
self.host_tags = self.get_standard_dimensions()
api = MonAPI(self.mon_api_url, self.use_keystone, self.keystone_url, self.project_id, self.user_id, self.password, self.logger)
self.logger.debug('mon_api_http_emitter: attempting postback to ' + self.mon_api_url)
@ -32,14 +39,13 @@ class MonApiEmitter(object):
try:
self.logger.debug("Agent Metric to Process: " + str(agent_metric))
api_metric = self.get_api_metric(agent_metric, self.project_id)
self.logger.debug("API Metric to Send: " + str(api_metric))
if _is_affirmative(self.aggregate_metrics):
metrics_list.extend(api_metric)
else:
api.create_or_update_metric(api_metric)
self.logger.debug("Sending metric to API: %s", str(api_metric))
#self.logger.debug('mon_api_http_emitter: postback response: ' + str(response.read()))
except Exception as ex:
self.logger.exception("Error sending message to mon-api")
@ -50,11 +56,11 @@ class MonApiEmitter(object):
timestamp = self.get_timestamp(self.payload)
metrics_list = []
dimensions = deepcopy(self.host_tags)
name = self.normalize_name(agent_metric)
name = self.normalizer.normalize_name(agent_metric)
if name != self.discard:
value = self.payload[agent_metric]
if isinstance(value, int) or isinstance(value, float):
metric = {"name": name, "timestamp": timestamp, "value": value, "dimensions": dimensions}
if isinstance(value, str):
metric = {"name": self.normalizer.normalize_name(name), "timestamp": timestamp, "value": self.normalizer.encode(value), "dimensions": dimensions}
metrics_list.append(metric)
elif isinstance(value, dict):
metrics_list.extend(self.process_dict(name, timestamp, value))
@ -64,61 +70,90 @@ class MonApiEmitter(object):
metrics_list.extend(self.process_list(name, timestamp, value))
elif isinstance(value, tuple):
metrics_list.extend(self.process_list(name, timestamp, value))
elif isinstance(value, int) or isinstance(value, float):
metric = {"name": self.normalizer.normalize_name(name), "timestamp": timestamp, "value": value, "dimensions": dimensions}
metrics_list.append(metric)
return metrics_list
def get_timestamp(self, message):
if "collection_timestamp" in message:
timestamp = message["collection_timestamp"]
elif "timestamp" in message:
timestamp = message["timestamp"]
else:
timestamp = time.gmtime()
timestamp = calendar.timegm(datetime.datetime.utcnow().utctimetuple())
return timestamp
def process_dict(self, name, timestamp, values):
metrics = []
if name == "ioStats" or name == "system_metrics":
if name == "ioStats":
for key in values.iterkeys():
self.device_name = key
metrics.extend(self.process_dict(key, timestamp, values[key]))
return self.process_dict(key, timestamp, values[key])
else:
for key in values.iterkeys():
metric_name = self.normalize_name(key)
metric_name = self.normalizer.normalize_name(key)
if metric_name != self.discard:
dimensions = deepcopy(self.host_tags)
dimensions.update({"device": self.device_name})
metric = {"name": metric_name, "timestamp": timestamp, "value": values[key], "dimensions": dimensions}
dimensions.update({"device": self.normalizer.encode(name)})
metric = {"name": metric_name, "timestamp": timestamp, "value": self.normalizer.encode(values[key]), "dimensions": dimensions}
metrics.append(metric)
return metrics
def process_list(self, name, timestamp, values):
metrics = []
if name == "diskUsage" or name == "inodes":
if name == "disk_usage" or name == "inodes":
for item in values:
if name != self.discard:
dimensions = deepcopy(self.host_tags)
dimensions.update({"device": item[0]})
dimensions.update({"device": self.normalizer.encode(item[0])})
if len(item) >= 9:
dimensions.update({"mountpoint": item[8]})
metric = {"name": name, "timestamp": timestamp, "value": item[4].rstrip("%"), "dimensions": dimensions}
dimensions.update({"mountpoint": self.normalizer.encode(item[8])})
metric = {"name": name, "timestamp": timestamp, "value": self.normalizer.encode(item[4].rstrip("%")), "dimensions": dimensions}
metrics.append(metric)
elif name == "metrics":
# These are metrics sent in a format we know about from checks
# self.logger.debug("Metric Values: ", str(values))
for item in values:
# self.logger.debug("Metric Item: ", str(item))
dimensions = deepcopy(self.host_tags)
for name2 in item[3].iterkeys():
value2 = item[3][name2]
# self.logger.debug("Metric Item2: ", name2)
if name2 == "type" or name2 == "interval" or value2 == None:
continue
if name2 == "tags":
dimensions.update(self.process_tags(value2))
else:
dimensions.update({name2 : value2})
metric = {"name": item[0], "timestamp": timestamp, "value": item[2], "dimensions": dimensions}
dimensions.update({self.normalizer.encode(name2) : self.normalizer.encode(value2)})
metric = {"name": self.normalizer.normalize_name(item[0]), "timestamp": timestamp, "value": item[2], "dimensions": dimensions}
metrics.append(metric)
elif name == "series":
# These are metrics sent in a format we know about from dogstatsd
for item in values:
dimensions = deepcopy(self.host_tags)
metric_name = ""
metric_timestamp = 0
metric_value = 0
points = []
for name2 in item.iterkeys():
value2 = item[name2]
if name2 == "type" or name2 == "interval" or value2 == None:
continue
if name2 == "points":
points = value2
elif name2 == "tags":
dimensions.update(self.process_tags(value2))
elif name2 == "metric":
metric_name = self.normalizer.normalize_name(value2)
else:
dimensions.update({self.normalizer.encode(name2) : self.normalizer.encode(value2)})
for point in points:
metric_timestamp = point[0]
metric_value = point[1]
metric = {"name": metric_name, "timestamp": metric_timestamp, "value": metric_value, "dimensions": dimensions}
metrics.append(metric)
else:
# We don't know what this metric list is. Just add it as dimensions
counter = 0
dimensions = deepcopy(self.host_tags)
self.logger.info("Found an unknown metric...")
for item in values:
dimensions.update({"Value" + str(counter) : item})
counter+= 1
@ -129,25 +164,23 @@ class MonApiEmitter(object):
def process_tags(self, tags):
# This will process tag strings in the format "name:value" and put them in a dictionary to be added as dimensions
processed_tags = {}
index = 0
# Metrics tags are a list of strings
for tag in tags:
tag_parts = tag.split(':')
name = tag_parts[0].strip()
value = tag_parts[1].strip()
processed_tags.update({name.encode('ascii','ignore') : value.encode('ascii','ignore')})
if(tag.find(':') != -1):
tag_parts = tag.split(':')
name = tag_parts[0].strip()
value = tag_parts[1].strip()
processed_tags.update({self.normalizer.encode(name) : self.normalizer.encode(value)})
else:
processed_tags.update({"tag" + str(index) : self.normalizer.encode(tag)})
index += 1
return processed_tags
def normalize_name(self, key):
name = key
lookup = key.lower() + self.mapping_key
if lookup in self.config:
name = self.config[lookup]
return name
def get_standard_dimensions(self):
dimensions = {}
if "internalHostname" in self.payload:
dimensions.update({"hostname": self.payload["internalHostname"]})
dimensions.update({"hostname": self.normalizer.encode(self.payload["internalHostname"])})
if "host-tags" in self.payload:
self.logger.debug("Host-Tags" + str(self.payload["host-tags"]))
host_tags = self.payload["host-tags"]
@ -157,3 +190,4 @@ class MonApiEmitter(object):
tags = tag.split(',')
dimensions.update(self.process_tags(tags))
return dimensions

View File

@ -0,0 +1,58 @@
{
"agentVersion": "DISCARD",
"apiKey": "DISCARD",
"collection_timestamp": "DISCARD",
"cpuIdle" : "cpu_idle_perc",
"cpuStolen" : "cpu_stolen_perc",
"cpuSystem" : "cpu_system_perc",
"cpuUser" : "cpu_user_perc",
"cpuWait" : "cpu_iowait_perc",
"diskUsage" : "disk_usage",
"events": "DISCARD",
"host-tags": "DISCARD",
"internalHostname" : "DISCARD",
"inodes": "inodes",
"rkB/s" : "io_read_kbytes_sec",
"avgrq-sz" : "DISCARD",
"%util" : "DISCARD",
"svctm" : "DISCARD",
"r/s" : "io_read_req_sec",
"wrqm/s" : "DISCARD",
"wkB/s" : "io_write_kbytes_sec",
"r_await" : "DISCARD",
"w_await" : "DISCARD",
"rrqm/s" : "DISCARD",
"w/s" : "io_write_req_sec",
"await" : "DISCARD",
"avgqu-sz" : "DISCARD",
"memPhysTotal" : "mem_total_mb",
"memPhysFree" : "mem_free_mb",
"memPhysUsable" : "mem_usable_mb",
"memPhysPctUsable" : "mem_usable_perc",
"memPhysUsed" : "mem_used_mb",
"memBuffers" : "mem_used_buffers",
"memCached" : "mem_used_cached",
"memShared" : "mem_used_shared",
"memSwapPctFree" : "mem_swap_free_perc",
"memSwapTotal" : "mem_swap_total_mb",
"memSwapUsed" : "mem_swap_used_mb",
"memSwapFree" : "mem_swap_free_mb",
"meta": "DISCARD",
"os": "DISCARD",
"processes": "DISCARD",
"python": "DISCARD",
"system.load.1" : "load_avg_1_min",
"system.load.5" : "load_avg_5_min",
"system.load.15" : "load_avg_15_min",
"system.load.norm.1": "DISCARD",
"system.load.norm.5": "DISCARD",
"system.load.norm.15": "DISCARD",
"systemStats": "DISCARD",
"system.net.bytes_rcvd": "net_bytes_in",
"system.net.bytes_sent": "net_bytes_out",
"system.net.packets_in.count": "net_packets_in",
"system.net.packets_out.count": "net_packets_out",
"system.net.packets_in.error": "net_errors_in",
"system.net.packets_out.error": "net_errors_out",
"uuid": "DISCARD"
}

View File

@ -0,0 +1,44 @@
from util import json
import os
class MonNormalizer(object):
    """Normalizes agent metric names for the mon-api.

    Loads a JSON mapping file (agent metric name -> mon-api metric name)
    once at construction and ASCII-encodes all metric strings.  Callers
    must check is_initialized() before use: a missing or malformed
    mapping file leaves the normalizer unusable rather than raising.
    """

    def __init__(self, logger, mapping_file_path):
        # Load the mapping eagerly; any failure is logged and leaves
        # metric_map as None (reported through is_initialized()).
        self.logger = logger
        self.mapping_file_path = mapping_file_path
        self.metric_map = self.__get_metric_map()

    def is_initialized(self):
        """Return True when the mapping file was loaded successfully."""
        return self.metric_map is not None

    def normalize_name(self, key):
        """Return the mapped name for key, or key itself if unmapped.

        The result is always ASCII-encoded via encode().
        """
        if key in self.metric_map:
            return self.encode(self.metric_map[key])
        return self.encode(key)

    def encode(self, string):
        """ASCII-encode a string, dropping non-ASCII characters.

        Non-string values are returned unchanged, so this is safe to call
        on numeric metric values as well.
        """
        if isinstance(string, basestring):
            return string.encode('ascii', 'ignore')
        return string

    def __get_metric_map(self):
        # Parse the JSON mapping file; return the dict on success, None on
        # any failure.  The 'with' block guarantees the handle is closed on
        # every path (the original leaked it when json.loads raised).
        try:
            with open(self.mapping_file_path, 'r') as json_data:
                return json.loads(json_data.read())
        except IOError as e:
            self.logger.error("I/O error while loading metric mapping file({0}): {1}".format(e.errno, e.strerror))
        except ValueError as v:
            # BUG FIX: ValueError has no errno/strerror attributes, so the
            # original handler raised AttributeError and hid the parse error.
            self.logger.error("Value error while decoding JSON from metric mapping file: {0}".format(v))
        except Exception:
            # Narrowed from a bare 'except:' so KeyboardInterrupt/SystemExit
            # are no longer swallowed.
            self.logger.error("Unable to process metric mapping file...")
        return None

View File

@ -64,6 +64,7 @@ install_base: source
cp -r $(ROOT)/* $(BUILD)/usr/share/datadog/agent/
cp $(ROOT)/datadog.conf.example $(BUILD)/etc/dd-agent
cp -r $(ROOT)/conf.d/* $(BUILD)/etc/dd-agent/conf.d/
cp -r $(ROOT)/mon_lib/mon_mapping.json $(BUILD)/etc/dd-agent/
# Install the common executables.
ln -sf ../share/datadog/agent/dogstatsd.py $(BUILD)/usr/bin/dogstatsd
ln -sf ../share/datadog/agent/agent.py $(BUILD)/usr/bin/dd-agent