More pep8 changes

Tim Kuhlman 2014-04-29 12:16:30 -06:00
parent 67b9dc5aef
commit 046a3426df
47 changed files with 212 additions and 214 deletions


@ -58,7 +58,6 @@ class Gauge(Metric):
self.last_sample_time = time()
self.timestamp = timestamp
def flush(self, timestamp, interval):
if self.value is not None:
res = [self.formatter(
@ -76,6 +75,7 @@ class Gauge(Metric):
return []
class BucketGauge(Gauge):
""" A metric that tracks a value at particular points in time.
The difference between this class and Gauge is that this class will
@ -101,6 +101,7 @@ class BucketGauge(Gauge):
return []
class Counter(Metric):
""" A metric that tracks a counter value. """
@ -289,6 +290,7 @@ class Rate(Metric):
finally:
self.samples = self.samples[-1:]
class Aggregator(object):
"""
Abstract metric aggregator class.


@ -28,7 +28,8 @@ def get_api_metric(agent_metric, payload, host_tags, log):
elif isinstance(value, tuple):
metrics_list.extend(process_list(name, timestamp, value, host_tags, log))
elif isinstance(value, int) or isinstance(value, float):
metric = {"name": normalizer.normalize_name(name), "timestamp": timestamp, "value": value, "dimensions": dimensions}
metric = {"name": normalizer.normalize_name(name), "timestamp": timestamp,
"value": value, "dimensions": dimensions}
metrics_list.append(metric)
return metrics_list
@ -69,7 +70,7 @@ def process_list(name, timestamp, values, host_tags, log):
dimensions = deepcopy(host_tags)
dimensions.update({"device": encode(item[0])})
if len(item) >= 9:
dimensions.update({"mountpoint": normalizer.encode(item[8])})
dimensions.update({"mountpoint": normalizer.encode(item[8])})
metric = {"name": name, "timestamp": timestamp, "value": normalizer.encode(item[4].rstrip("%")), "dimensions": dimensions}
metrics.append(metric)
elif name == "metrics":
@ -77,13 +78,13 @@ def process_list(name, timestamp, values, host_tags, log):
for item in values:
dimensions = deepcopy(host_tags)
for name2 in item[3].iterkeys():
value2 = item[3][name2]
if name2 == discard or name2 == "type" or name2 == "interval" or value2 is None:
continue
if name2 == "tags":
dimensions.update(process_tags(value2))
else:
dimensions.update({normalizer.encode(name2) : normalizer.encode(value2)})
value2 = item[3][name2]
if name2 == discard or name2 == "type" or name2 == "interval" or value2 is None:
continue
if name2 == "tags":
dimensions.update(process_tags(value2))
else:
dimensions.update({normalizer.encode(name2): normalizer.encode(value2)})
metric_name = normalizer.normalize_name(item[0])
if metric_name == discard:
continue


@ -36,7 +36,7 @@ class MonNormalizer(Singleton):
def encode(string):
return_str = string
if isinstance(string, basestring):
return_str = string.encode('ascii','ignore')
return_str = string.encode('ascii', 'ignore')
return return_str
def _get_metric_map(self):


@ -45,7 +45,7 @@ class Apache(AgentCheck):
# Loop through and extract the numerical values
for line in response.split('\n'):
values = line.split(': ')
if len(values) == 2: # match
if len(values) == 2: # match
metric, value = values
try:
value = float(value)


@ -83,8 +83,7 @@ class Cacti(AgentCheck):
if whitelist:
if not os.path.isfile(whitelist) or not os.access(whitelist, os.R_OK):
# Don't run the check if the whitelist is unavailable
self.log.exception("Unable to read whitelist file at %s" \
% (whitelist))
self.log.exception("Unable to read whitelist file at %s" % whitelist)
wl = open(whitelist)
for line in wl:


@ -67,7 +67,7 @@ class Couchbase(AgentCheck):
if tags is None:
tags = []
tags.append('instance:%s' % server)
instance['is_recent_python'] = sys.version_info >= (2,6,0)
instance['is_recent_python'] = sys.version_info >= (2, 6, 0)
data = self.get_data(server, instance)
self._create_metrics(data, tags=list(set(tags)))


@ -65,6 +65,7 @@ DOCKER_TAGS = [
"Image",
]
class UnixHTTPConnection(httplib.HTTPConnection, object):
"""Class used in conjuction with UnixSocketHandler to make urllib2
compatible with Unix sockets."""


@ -108,7 +108,7 @@ class ElasticSearch(AgentCheck):
"elasticsearch.relocating_shards": ("gauge", "relocating_shards"),
"elasticsearch.initializing_shards": ("gauge", "initializing_shards"),
"elasticsearch.unassigned_shards": ("gauge", "unassigned_shards"),
"elasticsearch.cluster_status": ("gauge", "status", lambda v: {"red":0,"yellow":1,"green":2}.get(v, -1)),
"elasticsearch.cluster_status": ("gauge", "status", lambda v: {"red": 0, "yellow": 1, "green": 2}.get(v, -1)),
}
def __init__(self, name, init_config, agentConfig):


@ -1,5 +1,6 @@
from checks import AgentCheck
class Gearman(AgentCheck):
@staticmethod
@ -42,8 +43,8 @@ class Gearman(AgentCheck):
self.gauge("gearman.queued", queued, tags=tags)
self.gauge("gearman.workers", workers, tags=tags)
self.log.debug("running %d, queued %d, unique tasks %d, workers: %d"
% (running, queued, unique_tasks, workers))
self.log.debug("running %d, queued %d, unique tasks %d, workers: %d" %
(running, queued, unique_tasks, workers))
def _get_conf(self, instance):
host = instance.get('server', None)


@ -35,7 +35,7 @@ class HDFSCheck(AgentCheck):
self.gauge('hdfs.free', stats['remaining'], tags=tags)
self.gauge('hdfs.capacity', stats['capacity'], tags=tags)
self.gauge('hdfs.in_use', float(stats['used']) /
float(stats['capacity']), tags=tags)
float(stats['capacity']), tags=tags)
self.gauge('hdfs.under_replicated', stats['under_replicated'],
tags=tags)
self.gauge('hdfs.missing_blocks', stats['missing_blocks'], tags=tags)


@ -58,8 +58,7 @@ class HostAlive(AgentCheck):
ping_command = ping_prefix + host
try:
ping = subprocess.check_output(ping_command.split(" "),
stderr=subprocess.STDOUT)
ping = subprocess.check_output(ping_command.split(" "), stderr=subprocess.STDOUT)
except subprocess.CalledProcessError:
return False
@ -81,8 +80,8 @@ class HostAlive(AgentCheck):
if instance['alive_test'] == 'ssh':
success = self._test_ssh(instance['host_name'],
self.init_config.get('ssh_port'),
self.init_config.get('ssh_timeout'))
self.init_config.get('ssh_port'),
self.init_config.get('ssh_timeout'))
elif instance['alive_test'] == 'ping':
success = self._test_ping(instance['host_name'],
self.init_config.get('ping_timeout'))


@ -57,13 +57,13 @@ class HTTPCheck(ServicesCheck):
raise
if response_time:
# Stop the timer as early as possible
running_time = time.time() - start
# Store tags in a temporary list so that we don't modify the global tags data structure
tags_list = []
tags_list.extend(tags)
tags_list.append('url:%s' % addr)
self.gauge('network.http.response_time', running_time, tags=tags_list)
# Stop the timer as early as possible
running_time = time.time() - start
# Store tags in a temporary list so that we don't modify the global tags data structure
tags_list = []
tags_list.extend(tags)
tags_list.append('url:%s' % addr)
self.gauge('network.http.response_time', running_time, tags=tags_list)
if int(resp.status) >= 400:
self.log.info("%s is DOWN, error code: %s" % (addr, str(resp.status)))


@ -81,7 +81,7 @@ class Lighttpd(AgentCheck):
# Loop through and extract the numerical values
for line in response.split('\n'):
values = line.split(': ')
if len(values) == 2: # match
if len(values) == 2: # match
metric, value = values
try:
value = float(value)


@ -58,8 +58,7 @@ class HostAlive(AgentCheck):
ping_command = ping_prefix + host
try:
ping = subprocess.check_output(ping_command.split(" "),
stderr=subprocess.STDOUT)
ping = subprocess.check_output(ping_command.split(" "), stderr=subprocess.STDOUT)
except subprocess.CalledProcessError:
return False
@ -81,8 +80,8 @@ class HostAlive(AgentCheck):
if instance['alive_test'] == 'ssh':
success = self._test_ssh(instance['host_name'],
self.init_config.get('ssh_port'),
self.init_config.get('ssh_timeout'))
self.init_config.get('ssh_port'),
self.init_config.get('ssh_timeout'))
elif instance['alive_test'] == 'ping':
success = self._test_ping(instance['host_name'],
self.init_config.get('ping_timeout'))


@ -70,11 +70,11 @@ class HTTPCheck(AgentCheck):
raise
if response_time:
# Stop the timer as early as possible
running_time = time.time() - start
tags_rt = tags
tags_rt.append('url:%s' % addr)
self.gauge('mon_http_response_time', running_time, tags=tags_rt)
# Stop the timer as early as possible
running_time = time.time() - start
tags_rt = tags
tags_rt.append('url:%s' % addr)
self.gauge('mon_http_response_time', running_time, tags=tags_rt)
# Add a 'detail' tag if requested
if include_content:


@ -65,9 +65,9 @@ class WrapNagios(AgentCheck):
try:
proc = subprocess.Popen(instance['check_command'].split(" "),
env={"PATH": extra_path},
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
env={"PATH": extra_path},
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
output = proc.communicate()
# The check detail is all the text before the pipe
detail = output[0].split('|')[0]
@ -77,8 +77,8 @@ class WrapNagios(AgentCheck):
except OSError:
# Return an UNKNOWN code (3) if I have landed here
self.gauge(instance['service_name'], 3, tags=tags)
self.log.info(instance['check_command'].split(" ")[0]
+ " is missing or unreadable")
self.log.info(instance['check_command'].split(" ")[0] +
" is missing or unreadable")
return
last_run_data[instance['service_name']] = time.time()


@ -30,7 +30,7 @@ STATUS_VARS = {
'Com_update_multi': ('mysql.performance.com_update_multi', RATE),
'Com_delete_multi': ('mysql.performance.com_delete_multi', RATE),
'Com_replace_select': ('mysql.performance.com_replace_select', RATE),
'Qcache_hits':('mysql.performance.qcache_hits', RATE),
'Qcache_hits': ('mysql.performance.qcache_hits', RATE),
'Innodb_mutex_spin_waits': ('mysql.innodb.mutex_spin_waits', RATE),
'Innodb_mutex_spin_rounds': ('mysql.innodb.mutex_spin_rounds', RATE),
'Innodb_mutex_os_waits': ('mysql.innodb.mutex_os_waits', RATE),
@ -42,6 +42,7 @@ STATUS_VARS = {
'Innodb_current_row_locks': ('mysql.innodb.current_row_locks', GAUGE),
}
class MySql(AgentCheck):
def __init__(self, name, init_config, agentConfig):
AgentCheck.__init__(self, name, init_config, agentConfig)


@ -23,10 +23,8 @@ class WrapNagios(AgentCheck):
""" Determine whether or not to skip a check depending on
the checks's check_interval, if specified, and the last
time the check was run """
if (instance['service_name'] in last_run_data
and 'check_interval' in instance):
if (time.time() < last_run_data[instance['service_name']]
+ instance['check_interval']):
if (instance['service_name'] in last_run_data and 'check_interval' in instance):
if (time.time() < last_run_data[instance['service_name']] + instance['check_interval']):
return True
else:
return False
@ -51,8 +49,7 @@ class WrapNagios(AgentCheck):
if last_run_path.endswith('/') is False:
last_run_path += '/'
last_run_file = (last_run_path + 'nagios_wrapper_'
+ hashlib.md5(instance['service_name']).hexdigest() + '.pck')
last_run_file = (last_run_path + 'nagios_wrapper_' + hashlib.md5(instance['service_name']).hexdigest() + '.pck')
# Load last-run data from shared memory file
last_run_data = {}
@ -67,9 +64,9 @@ class WrapNagios(AgentCheck):
try:
proc = subprocess.Popen(instance['check_command'].split(" "),
env={"PATH": extra_path},
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
env={"PATH": extra_path},
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
output = proc.communicate()
# The check detail is all the text before the pipe
detail = output[0].split('|')[0]


@ -30,18 +30,18 @@ class Network(AgentCheck):
}
NETSTAT_GAUGE = {
('udp4', 'connections') : 'system.net.udp4.connections',
('udp6', 'connections') : 'system.net.udp6.connections',
('tcp4', 'established') : 'system.net.tcp4.established',
('tcp4', 'opening') : 'system.net.tcp4.opening',
('tcp4', 'closing') : 'system.net.tcp4.closing',
('tcp4', 'listening') : 'system.net.tcp4.listening',
('tcp4', 'time_wait') : 'system.net.tcp4.time_wait',
('tcp6', 'established') : 'system.net.tcp6.established',
('tcp6', 'opening') : 'system.net.tcp6.opening',
('tcp6', 'closing') : 'system.net.tcp6.closing',
('tcp6', 'listening') : 'system.net.tcp6.listening',
('tcp6', 'time_wait') : 'system.net.tcp6.time_wait',
('udp4', 'connections'): 'system.net.udp4.connections',
('udp6', 'connections'): 'system.net.udp6.connections',
('tcp4', 'established'): 'system.net.tcp4.established',
('tcp4', 'opening'): 'system.net.tcp4.opening',
('tcp4', 'closing'): 'system.net.tcp4.closing',
('tcp4', 'listening'): 'system.net.tcp4.listening',
('tcp4', 'time_wait'): 'system.net.tcp4.time_wait',
('tcp6', 'established'): 'system.net.tcp6.established',
('tcp6', 'opening'): 'system.net.tcp6.opening',
('tcp6', 'closing'): 'system.net.tcp6.closing',
('tcp6', 'listening'): 'system.net.tcp6.listening',
('tcp6', 'time_wait'): 'system.net.tcp6.time_wait',
}
def __init__(self, name, init_config, agentConfig, instances=None):


@ -61,7 +61,8 @@ class PostfixCheck(AgentCheck):
raise Exception('The dd-agent user does not have sudo access')
# emit an individually tagged metric
self.gauge('postfix.queue.size', count, tags=tags + ['queue:%s' % queue, 'instance:%s' % os.path.basename(directory)])
self.gauge('postfix.queue.size', count,
tags=tags + ['queue:%s' % queue, 'instance:%s' % os.path.basename(directory)])
# these can be retrieved in a single graph statement
# for example:


@ -1,7 +1,8 @@
from checks import AgentCheck, CheckException
class ShouldRestartException(Exception): pass
class ShouldRestartException(Exception):
pass
class PostgreSql(AgentCheck):
@ -13,7 +14,7 @@ class PostgreSql(AgentCheck):
# turning columns into tags
DB_METRICS = {
'descriptors': [ ('datname', 'db')],
'descriptors': [('datname', 'db')],
'metrics': {'numbackends': ('postgresql.connections', GAUGE),
'xact_commit': ('postgresql.commits', RATE),
'xact_rollback': ('postgresql.rollbacks', RATE),
@ -35,27 +36,23 @@ SELECT datname,
}
NEWER_92_METRICS = {
'deadlocks' : ('postgresql.deadlocks', GAUGE),
'temp_bytes' : ('postgresql.temp_bytes', RATE),
'temp_files' : ('postgresql.temp_files', RATE),
'deadlocks': ('postgresql.deadlocks', GAUGE),
'temp_bytes': ('postgresql.temp_bytes', RATE),
'temp_files': ('postgresql.temp_files', RATE),
}
REL_METRICS = {
'descriptors': [
('relname', 'table')
],
'metrics': {
'seq_scan' : ('postgresql.seq_scans', RATE),
'seq_tup_read' : ('postgresql.seq_rows_read', RATE),
'idx_scan' : ('postgresql.index_scans', RATE),
'idx_tup_fetch' : ('postgresql.index_rows_fetched', RATE),
'n_tup_ins' : ('postgresql.rows_inserted', RATE),
'n_tup_upd' : ('postgresql.rows_updated', RATE),
'n_tup_del' : ('postgresql.rows_deleted', RATE),
'n_tup_hot_upd' : ('postgresql.rows_hot_updated', RATE),
'n_live_tup' : ('postgresql.live_rows', GAUGE),
'n_dead_tup' : ('postgresql.dead_rows', GAUGE),
},
'descriptors': [('relname', 'table')],
'metrics': {'seq_scan': ('postgresql.seq_scans', RATE),
'seq_tup_read': ('postgresql.seq_rows_read', RATE),
'idx_scan': ('postgresql.index_scans', RATE),
'idx_tup_fetch': ('postgresql.index_rows_fetched', RATE),
'n_tup_ins': ('postgresql.rows_inserted', RATE),
'n_tup_upd': ('postgresql.rows_updated', RATE),
'n_tup_del': ('postgresql.rows_deleted', RATE),
'n_tup_hot_upd': ('postgresql.rows_hot_updated', RATE),
'n_live_tup': ('postgresql.live_rows', GAUGE),
'n_dead_tup': ('postgresql.dead_rows', GAUGE)},
'query': """
SELECT relname,
%s
@ -70,9 +67,9 @@ SELECT relname,
('indexrelname', 'index')
],
'metrics': {
'idx_scan' : ('postgresql.index_scans', RATE),
'idx_tup_read' : ('postgresql.index_rows_read', RATE),
'idx_tup_fetch' : ('postgresql.index_rows_fetched', RATE),
'idx_scan': ('postgresql.index_scans', RATE),
'idx_tup_read': ('postgresql.index_rows_read', RATE),
'idx_tup_fetch': ('postgresql.index_rows_fetched', RATE),
},
'query': """
SELECT relname,
@ -83,7 +80,6 @@ SELECT relname,
'relation': True,
}
def __init__(self, name, init_config, agentConfig):
AgentCheck.__init__(self, name, init_config, agentConfig)
self.dbs = {}


@ -51,7 +51,7 @@ class ProcessCheck(AgentCheck):
if not found:
try:
try:
cmdline = proc.cmdline() # psutil >= 2.0
cmdline = proc.cmdline() # psutil >= 2.0
except TypeError:
cmdline = proc.cmdline # psutil < 2.0


@ -26,8 +26,6 @@ NODE_ATTRIBUTES = ['fd_used',
ATTRIBUTES = {QUEUE_TYPE: QUEUE_ATTRIBUTES, NODE_TYPE: NODE_ATTRIBUTES}
TAGS_MAP = {
QUEUE_TYPE: {'node': 'node',
'name': 'queue',
@ -38,6 +36,7 @@ TAGS_MAP = {
METRIC_SUFFIX = {QUEUE_TYPE: "queue", NODE_TYPE: "node"}
class RabbitMQ(AgentCheck):
"""This check is for gathering statistics from the RabbitMQ
Management Plugin (http://www.rabbitmq.com/management.html)


@ -10,41 +10,37 @@ import socket
class Riak(AgentCheck):
keys = [
"vnode_gets",
"vnode_puts",
"vnode_index_reads",
"vnode_index_writes",
"vnode_index_deletes",
"node_gets",
"node_puts",
"pbc_active",
"pbc_connects",
"memory_total",
"memory_processes",
"memory_processes_used",
"memory_atom",
"memory_atom_used",
"memory_binary",
"memory_code",
"memory_ets",
"read_repairs",
"node_put_fsm_rejected_60s",
"node_put_fsm_active_60s",
"node_put_fsm_in_rate",
"node_put_fsm_out_rate",
"node_get_fsm_rejected_60s",
"node_get_fsm_active_60s",
"node_get_fsm_in_rate",
"node_get_fsm_out_rate"
]
keys = ["vnode_gets",
"vnode_puts",
"vnode_index_reads",
"vnode_index_writes",
"vnode_index_deletes",
"node_gets",
"node_puts",
"pbc_active",
"pbc_connects",
"memory_total",
"memory_processes",
"memory_processes_used",
"memory_atom",
"memory_atom_used",
"memory_binary",
"memory_code",
"memory_ets",
"read_repairs",
"node_put_fsm_rejected_60s",
"node_put_fsm_active_60s",
"node_put_fsm_in_rate",
"node_put_fsm_out_rate",
"node_get_fsm_rejected_60s",
"node_get_fsm_active_60s",
"node_get_fsm_in_rate",
"node_get_fsm_out_rate"]
stat_keys = [
"node_get_fsm_siblings",
"node_get_fsm_objsize",
"node_get_fsm_time",
"node_put_fsm_time"
]
stat_keys = ["node_get_fsm_siblings",
"node_get_fsm_objsize",
"node_get_fsm_time",
"node_put_fsm_time"]
def __init__(self, name, init_config, agentConfig, instances=None):
AgentCheck.__init__(self, name, init_config, agentConfig, instances)


@ -28,10 +28,9 @@ class SQLServer(AgentCheck):
# Load any custom metrics from conf.d/sqlserver.yaml
for row in init_config.get('custom_metrics', []):
if row['type'] not in VALID_METRIC_TYPES:
self.log.error('%s has an invalid metric type: %s' \
% (row['name'], row['type']))
self.METRICS.append( (row['name'], row['type'], row['counter_name'],
row.get('instance_name', ''), row.get('tag_by', None)) )
self.log.error('%s has an invalid metric type: %s' % (row['name'], row['type']))
self.METRICS.append((row['name'], row['type'], row['counter_name'],
row.get('instance_name', ''), row.get('tag_by', None)))
# Cache connections
self.connections = {}
@ -46,8 +45,7 @@ class SQLServer(AgentCheck):
def _conn_string(host, username, password, database):
''' Return a connection string to use with adodbapi
'''
conn_str = 'Provider=SQLOLEDB;Data Source=%s;Initial Catalog=%s;' \
% (host, database)
conn_str = 'Provider=SQLOLEDB;Data Source=%s;Initial Catalog=%s;' % (host, database)
if username:
conn_str += 'User ID=%s;' % (username)
if password:


@ -29,7 +29,7 @@ class TCPCheck(ServicesCheck):
raise BadConfException("A valid url must be specified")
# IPv6 address format: 2001:db8:85a3:8d3:1319:8a2e:370:7348
if len(split) == 8: # It may then be a IP V6 address, we check that
if len(split) == 8: # It may then be a IP V6 address, we check that
for block in split:
if len(block) != 4:
raise BadConfException("%s is not a correct IPv6 address." % url)
@ -112,7 +112,6 @@ class TCPCheck(ServicesCheck):
else:
source_type = "%s.%s" % (ServicesCheck.SOURCE_TYPE_NAME, instance_source_type_name)
# Get the handles you want to notify
notify = instance.get('notify', self.init_config.get('notify', []))
notify_message = ""


@ -27,7 +27,7 @@ class Varnish(AgentCheck):
else:
# Unsupported data type, ignore
self._reset()
return # don't save
return # don't save
# reset for next stat element
self._reset()


@ -46,14 +46,12 @@ class Win32EventLog(AgentCheck):
# Find all events in the last check that match our search by running a
# straight WQL query against the event log
last_ts = self.last_ts[instance_key]
q = EventLogQuery(
ltype=instance.get('type'),
user=instance.get('user'),
source_name=instance.get('source_name'),
log_file=instance.get('log_file'),
message_filters=instance.get('message_filters', []),
start_ts=last_ts
)
q = EventLogQuery(ltype=instance.get('type'),
user=instance.get('user'),
source_name=instance.get('source_name'),
log_file=instance.get('log_file'),
message_filters=instance.get('message_filters', []),
start_ts=last_ts)
wql = q.to_wql()
self.log.debug("Querying for Event Log events: %s" % wql)
events = w.query(wql)


@ -51,8 +51,7 @@ class WMICheck(AgentCheck):
search = f.values()[0]
if SEARCH_WILDCARD in search:
search = search.replace(SEARCH_WILDCARD, '%')
wql = "SELECT * FROM %s WHERE %s LIKE '%s'" \
% (wmi_class, prop, search)
wql = "SELECT * FROM %s WHERE %s LIKE '%s'" % (wmi_class, prop, search)
results = w.query(wql)
else:
results = getattr(w, wmi_class)(**f)


@ -30,6 +30,7 @@ import socket
import struct
from StringIO import StringIO
class Zookeeper(AgentCheck):
version_pattern = re.compile(r'Zookeeper version: ([^.]+)\.([^.]+)\.([^-]+)', flags=re.I)


@ -63,8 +63,6 @@ class PoolWorker(threading.Thread):
self.running = False
class Pool(object):
"""
The Pool class represents a pool of worker threads. It has methods


@ -83,7 +83,7 @@ class Disk(Check):
elif Platform.is_freebsd(platform_name):
# Filesystem 1K-blocks Used Avail Capacity iused ifree %iused Mounted
# Inodes are in position 5, 6 and we need to compute the total
parts[1] = int(parts[5]) + int(parts[6]) # Total
parts[1] = int(parts[5]) + int(parts[6]) # Total
parts[2] = int(parts[5]) # Used
parts[3] = int(parts[6]) # Available
else:


@ -58,7 +58,7 @@ class TailFile(object):
# Compute CRC of the beginning of the file
crc = None
if size >= self.CRC_SIZE:
tmp_file = open(self._path,'r')
tmp_file = open(self._path, 'r')
data = tmp_file.read(self.CRC_SIZE)
crc = binascii.crc32(data)
@ -86,7 +86,7 @@ class TailFile(object):
self._size = size
self._crc = crc
self._f = open(self._path,'r')
self._f = open(self._path, 'r')
if move_end:
self._log.debug("Opening file %s" % (self._path))
self._f.seek(0, SEEK_END)


@ -44,12 +44,13 @@ from jmxfetch import JMXFetch
# Constants
PID_NAME = "dd-agent"
WATCHDOG_MULTIPLIER = 10
RESTART_INTERVAL = 4 * 24 * 60 * 60 # Defaults to 4 days
RESTART_INTERVAL = 4 * 24 * 60 * 60 # Defaults to 4 days
START_COMMANDS = ['start', 'restart', 'foreground']
# Globals
log = logging.getLogger('collector')
class Agent(Daemon):
"""
The agent class is a daemon that runs the collector in a background process.


@ -27,24 +27,23 @@ from jmxfetch import JMXFetch, JMX_COLLECT_COMMAND
# CONSTANTS
DATADOG_CONF = "datadog.conf"
DEFAULT_CHECK_FREQUENCY = 15 # seconds
DEFAULT_CHECK_FREQUENCY = 15 # seconds
DEFAULT_STATSD_FREQUENCY = 2 # seconds
DEFAULT_STATSD_BUCKET_SIZE = 10 #seconds
DEFAULT_STATSD_BUCKET_SIZE = 10 # seconds
LOGGING_MAX_BYTES = 5 * 1024 * 1024
log = logging.getLogger(__name__)
windows_file_handler_added = False
class PathNotFound(Exception):
pass
def get_parsed_args():
parser = OptionParser()
parser.add_option('-c', '--clean', action='store_true', default=False,
dest='clean')
parser.add_option('-v', '--verbose', action='store_true', default=False,
dest='verbose',
parser.add_option('-c', '--clean', action='store_true', default=False, dest='clean')
parser.add_option('-v', '--verbose', action='store_true', default=False, dest='verbose',
help='Print out stacktraces for errors in checks')
try:
@ -58,6 +57,7 @@ def get_parsed_args():
def get_version():
return "4.3.0"
def skip_leading_wsp(f):
"Works on a file, returns a file-like object"
return StringIO("\n".join(map(string.strip, f.readlines())))
@ -75,9 +75,9 @@ def _windows_commondata_path():
_SHGetFolderPath = windll.shell32.SHGetFolderPathW
_SHGetFolderPath.argtypes = [wintypes.HWND,
ctypes.c_int,
wintypes.HANDLE,
wintypes.DWORD, wintypes.LPCWSTR]
ctypes.c_int,
wintypes.HANDLE,
wintypes.DWORD, wintypes.LPCWSTR]
path_buf = wintypes.create_unicode_buffer(wintypes.MAX_PATH)
result = _SHGetFolderPath(0, CSIDL_COMMON_APPDATA, 0, 0, path_buf)


@ -43,12 +43,15 @@ FLUSH_LOGGING_INITIAL = 10
FLUSH_LOGGING_COUNT = 5
EVENT_CHUNK_SIZE = 50
def serialize_metrics(metrics):
return json.dumps({"series" : metrics})
return json.dumps({"series": metrics})
def serialize_event(event):
return json.dumps(event)
class Reporter(threading.Thread):
"""
The reporter periodically sends the aggregated metrics to the
@ -205,6 +208,7 @@ class Reporter(threading.Thread):
finally:
conn.close()
class Server(object):
"""
A statsd udp server.


@ -33,10 +33,10 @@ LEGACY_DATE_FORMAT = '%Y-%m-%d %H:%M:%S'
# Parse Cassandra default system.log log4j pattern: %5p [%t] %d{ISO8601} %F (line %L) %m%n
LOG_PATTERN = re.compile(r"".join([
r"\s*(?P<priority>%s)\s+" % "|".join("(%s)" % p for p in LOG4J_PRIORITY),
r"(\[CompactionExecutor:\d*\]\s+)?", # optional thread name and number
r"(\[CompactionExecutor:\d*\]\s+)?", # optional thread name and number
r"((?P<timestamp>\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2},\d*)|",
r"(?P<time>\d{2}:\d{2}:\d{2},\d*))\s+",
r"(\w+\.java \(line \d+\)\s+)?", # optional source file and line
r"(\w+\.java \(line \d+\)\s+)?", # optional source file and line
r"(?P<msg>Compact(ed|ing) .*)\s*",
]))


@ -27,12 +27,10 @@ SUPERVISORD_LEVELS = [
]
# mapping between datadog and supervisord log levels
ALERT_TYPES_MAPPING = {
"CRIT": "error",
"ERRO": "error",
"WARN": "warning",
"INFO": "info",
}
ALERT_TYPES_MAPPING = {"CRIT": "error",
"ERRO": "error",
"WARN": "warning",
"INFO": "info"}
# regex to extract the 'program' supervisord is managing from the text
program_matcher = re.compile("^\w+:? '?(?P<program>\w+)'?")


@ -12,7 +12,6 @@ def post_headers(agentConfig, payload):
}
def http_emitter(message, log, agentConfig):
"Send payload"


@ -47,16 +47,16 @@ import modules
log = logging.getLogger('forwarder')
log.setLevel(get_logging_config()['log_level'] or logging.INFO)
TRANSACTION_FLUSH_INTERVAL = 5000 # Every 5 seconds
WATCHDOG_INTERVAL_MULTIPLIER = 10 # 10x flush interval
TRANSACTION_FLUSH_INTERVAL = 5000 # Every 5 seconds
WATCHDOG_INTERVAL_MULTIPLIER = 10 # 10x flush interval
# Maximum delay before replaying a transaction
MAX_WAIT_FOR_REPLAY = timedelta(seconds=90)
# Maximum queue size in bytes (when this is reached, old messages are dropped)
MAX_QUEUE_SIZE = 30 * 1024 * 1024 # 30MB
MAX_QUEUE_SIZE = 30 * 1024 * 1024 # 30MB
THROTTLING_DELAY = timedelta(microseconds=1000000/2) # 2 msg/second
THROTTLING_DELAY = timedelta(microseconds=1000000/2) # 2 msg/second
class EmitterThread(threading.Thread):
@ -85,6 +85,7 @@ class EmitterThread(threading.Thread):
except Full:
self.__logger.warn('Dropping packet for %r due to backlog', self.__name)
class EmitterManager(object):
"""Track custom emitters"""
@ -117,6 +118,7 @@ class EmitterManager(object):
logging.info('Queueing for emitter %r', emitterThread.name)
emitterThread.enqueue(data, headers)
class MetricTransaction(Transaction):
_application = None


@ -50,7 +50,10 @@ PYTHON_JMX_STATUS_FILE = 'jmx_status_python.yaml'
LINK_TO_DOC = "See http://docs.datadoghq.com/integrations/java/ for more information"
class InvalidJMXConfiguration(Exception): pass
class InvalidJMXConfiguration(Exception):
pass
class JMXFetch(object):
@ -59,7 +62,7 @@ class JMXFetch(object):
@classmethod
def init(cls, confd_path, agentConfig, logging_config,
default_check_frequency, command=None, checks_list=None, reporter=None):
default_check_frequency, command=None, checks_list=None, reporter=None):
try:
command = command or JMX_COLLECT_COMMAND
jmx_checks, invalid_checks, java_bin_path, java_options = JMXFetch.should_run(confd_path, checks_list)


@ -68,8 +68,7 @@ def get_module(name):
def load(config_string, default_name=None):
"""Given a module name and an object expected to be contained within,
return said object"""
(module_name, object_name) = \
(config_string.rsplit(':', 1) + [default_name])[:2]
(module_name, object_name) = (config_string.rsplit(':', 1) + [default_name])[:2]
module = get_module(module_name)
if object_name:
return getattr(module, object_name)


@ -12,12 +12,12 @@ class Processes(ResourcePlugin):
@staticmethod
def describe_snapshot():
return SnapshotDescriptor(1,
SnapshotField("user", 'str',aggregator=agg.append,temporal_aggregator=agg.append),
SnapshotField("user", 'str', aggregator=agg.append, temporal_aggregator=agg.append),
SnapshotField("pct_cpu", 'float'),
SnapshotField("pct_mem", 'float'),
SnapshotField("vsz", 'int'),
SnapshotField("rss", 'int'),
SnapshotField("family", 'str',aggregator=None, temporal_aggregator=None,
SnapshotField("family", 'str', aggregator=None, temporal_aggregator=None,
group_on=True, temporal_group_on=True),
SnapshotField("ps_count", 'int'))
@ -32,8 +32,8 @@ class Processes(ResourcePlugin):
# Split out each process
processLines = ps.split('\n')
del processLines[0] # Removes the headers
processLines.pop() # Removes a trailing empty line
del processLines[0] # Removes the headers
processLines.pop() # Removes a trailing empty line
processes = []
@ -52,7 +52,7 @@ class Processes(ResourcePlugin):
#keep everything over 1% (cpu or ram)
return o[0] > 1 or o[1] > 1
def _parse_proc_list(self,processes):
def _parse_proc_list(self, processes):
def _compute_family(command):
if command.startswith('['):
@ -60,7 +60,7 @@ class Processes(ResourcePlugin):
else:
return (command.split()[0]).split('/')[-1]
PSLine = namedtuple("PSLine","user,pid,pct_cpu,pct_mem,vsz,rss,tty,stat,started,time,command")
PSLine = namedtuple("PSLine", "user, pid, pct_cpu, pct_mem, vsz, rss, tty, stat, started, time, command")
self.start_snapshot()
for line in processes:
@ -77,10 +77,10 @@ class Processes(ResourcePlugin):
pass
self.end_snapshot(group_by= self.group_by_family)
def flush_snapshots(self,snapshot_group):
self._flush_snapshots(snapshot_group = snapshot_group,
group_by = self.group_by_family,
filter_by= self.filter_by_usage)
def flush_snapshots(self, snapshot_group):
self._flush_snapshots(snapshot_group=snapshot_group,
group_by=self.group_by_family,
filter_by=self.filter_by_usage)
def check(self):
self._parse_proc_list(self._get_proc_list())


@ -46,7 +46,7 @@ class Transaction(object):
def get_next_flush(self):
return self._next_flush
def compute_next_flush(self,max_delay):
def compute_next_flush(self, max_delay):
# Transactions are replayed, try to send them faster for newer transactions
# Send them every MAX_WAIT_FOR_REPLAY at most
td = timedelta(seconds=self._error_count * 20)
@ -56,7 +56,7 @@ class Transaction(object):
newdate = datetime.now() + td
self._next_flush = newdate.replace(microsecond=0)
def time_to_flush(self,now = datetime.now()):
def time_to_flush(self, now=datetime.now()):
return self._next_flush < now
def flush(self):
@ -72,10 +72,10 @@ class TransactionManager(object):
self._MAX_QUEUE_SIZE = max_queue_size
self._THROTTLING_DELAY = throttling_delay
self._flush_without_ioloop = False # useful for tests
self._flush_without_ioloop = False # useful for tests
self._transactions = [] #List of all non commited transactions
self._total_count = 0 # Maintain size/count not to recompute it everytime
self._transactions = [] # List of all non commited transactions
self._total_count = 0 # Maintain size/count not to recompute it everytime
self._total_size = 0
self._flush_count = 0
self._transactions_received = 0
@ -85,8 +85,8 @@ class TransactionManager(object):
# if this overlaps
self._counter = 0
self._trs_to_flush = None # Current transactions being flushed
self._last_flush = datetime.now() # Last flush (for throttling)
self._trs_to_flush = None # Current transactions being flushed
self._last_flush = datetime.now() # Last flush (for throttling)
# Track an initial status message.
ForwarderStatus().persist()
@ -102,7 +102,7 @@ class TransactionManager(object):
self._counter += 1
return self._counter
def append(self,tr):
def append(self, tr):
# Give the transaction an id
tr.set_id(self.get_tr_id())
@ -111,11 +111,11 @@ class TransactionManager(object):
tr_size = tr.get_size()
log.debug("New transaction to add, total size of queue would be: %s KB" %
((self._total_size + tr_size)/ 1024))
((self._total_size + tr_size)/1024))
if (self._total_size + tr_size) > self._MAX_QUEUE_SIZE:
log.warn("Queue is too big, removing old transactions...")
new_trs = sorted(self._transactions,key=attrgetter('_next_flush'), reverse = True)
new_trs = sorted(self._transactions, key=attrgetter('_next_flush'), reverse=True)
for tr2 in new_trs:
if (self._total_size + tr_size) > self._MAX_QUEUE_SIZE:
self._transactions.remove(tr2)
@ -125,7 +125,7 @@ class TransactionManager(object):
# Done
self._transactions.append(tr)
self._total_count += 1
self._total_count += 1
self._transactions_received += 1
self._total_size = self._total_size + tr_size


@ -26,17 +26,20 @@ log = logging.getLogger(__name__)
NumericTypes = (float, int, long)
def plural(count):
if count == 1:
return ""
return "s"
def get_tornado_ioloop():
if tornado_version[0] == 3:
return ioloop.IOLoop.current()
else:
return ioloop.IOLoop.instance()
def get_uuid():
# Generate a unique name that will stay constant between
# invocations, such as platform.node() + uuid.getnode()
@ -110,6 +113,7 @@ def cast_metric_val(val):
raise ValueError
return val
def is_valid_hostname(hostname):
if hostname.lower() in {'localhost', 'localhost.localdomain', 'localhost6.localdomain6', 'ip6-localhost'}:
log.warning("Hostname: %s is local" % hostname)
@ -183,6 +187,7 @@ def get_hostname(config=None):
else:
return hostname
class GCE(object):
URL = "http://169.254.169.254/computeMetadata/v1/?recursive=true"
TIMEOUT = 0.1 # second


@ -23,7 +23,7 @@ from win32.common import handle_exe_click
from jmxfetch import JMXFetch
log = logging.getLogger(__name__)
RESTART_INTERVAL = 24 * 60 * 60 # Defaults to 1 day
RESTART_INTERVAL = 24 * 60 * 60 # Defaults to 1 day
class AgentSvc(win32serviceutil.ServiceFramework):
_svc_name_ = "DatadogAgent"
@ -65,10 +65,9 @@ class AgentSvc(win32serviceutil.ServiceFramework):
def SvcDoRun(self):
import servicemanager
servicemanager.LogMsg(
servicemanager.EVENTLOG_INFORMATION_TYPE,
servicemanager.PYS_SERVICE_STARTED,
(self._svc_name_, ''))
servicemanager.LogMsg(servicemanager.EVENTLOG_INFORMATION_TYPE,
servicemanager.PYS_SERVICE_STARTED,
(self._svc_name_, ''))
self.start_ts = time.time()
# Start all services.


@ -49,13 +49,11 @@ DATADOG_SERVICE = "DatadogAgent"
STATUS_PAGE_URL = "http://localhost:17125/status"
AGENT_LOG_FILE = osp.join(_windows_commondata_path(), 'Datadog', 'logs', 'ddagent.log')
HUMAN_SERVICE_STATUS = {
win32service.SERVICE_RUNNING : 'Service is running',
win32service.SERVICE_START_PENDING : 'Service is starting',
win32service.SERVICE_STOP_PENDING : 'Service is stopping',
win32service.SERVICE_STOPPED : 'Service is stopped',
"Unknown" : "Cannot get service status",
}
HUMAN_SERVICE_STATUS = {win32service.SERVICE_RUNNING: 'Service is running',
win32service.SERVICE_START_PENDING: 'Service is starting',
win32service.SERVICE_STOP_PENDING: 'Service is stopping',
win32service.SERVICE_STOPPED: 'Service is stopped',
"Unknown": "Cannot get service status"}
REFRESH_PERIOD = 5000
@ -74,6 +72,7 @@ SYSTEM_TRAY_MENU = [
(EXIT_MANAGER, lambda: sys.exit(0)),
]
def get_checks():
checks = {}
conf_d_directory = get_confd_path(get_os())
@ -95,6 +94,7 @@ def get_checks():
return checks_list
class EditorFile(object):
def __init__(self, file_path, description):
self.file_path = file_path
@ -113,6 +113,7 @@ class EditorFile(object):
warning_popup("Unable to save file: \n %s" % str(e))
raise
class LogFile(EditorFile):
def __init__(self):
EditorFile.__init__(self, AGENT_LOG_FILE, "Agent log file")
@ -150,6 +151,7 @@ class DatadogConf(EditorFile):
else:
self.check_api_key(editor)
class AgentCheck(EditorFile):
def __init__(self, filename, ext, conf_d_directory):
file_path = osp.join(conf_d_directory, filename)
@ -176,6 +178,7 @@ class AgentCheck(EditorFile):
check_yaml_syntax(content)
EditorFile.save(self, content)
class PropertiesWidget(QWidget):
def __init__(self, parent):
QWidget.__init__(self, parent)