Add check metrics to collectd plugins

Check metrics are now always emitted by the Python plugins to report
the success or the failure of the metric collection.

Some plugins don't emit the check metric because it would be redundant
for them. This applies to all the openstack_* plugins, where the
check_openstack_api plugin already has the same meaning as the 'check'
metrics.

The patch also fixes the error handling of the rabbitmq_info plugin.

Change-Id: Ic972e81ddd955e718bbc45b4b8502fa8ea3c2939
Swann Croiset 2016-07-29 09:31:15 +02:00
parent 2f9020f9b2
commit 20cbd03dae
13 changed files with 101 additions and 49 deletions
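
For context, a minimal sketch of how a plugin is expected to use the new
API after this patch. The HypotheticalPlugin class, the service name
'hypothetical' and the 'hypothetical-cli' command are illustrative only;
execute_to_json() is the existing helper used by the Ceph plugins, and
the collectd module is the one injected by collectd's Python interpreter:

    import collectd

    import base


    class HypotheticalPlugin(base.Base):
        def itermetrics(self):
            # Raising base.CheckException makes the base class emit a
            # failed check metric carrying this message; returning
            # normally makes it emit a successful check metric instead.
            status = self.execute_to_json('hypothetical-cli status --json')
            if not status:
                raise base.CheckException(
                    "Fail to run 'hypothetical-cli status'")
            yield {'type_instance': 'up', 'values': 1}


    # The second argument names the service used for the check metric;
    # plugins that don't pass it fall back to the plugin name.
    plugin = HypotheticalPlugin(collectd, 'hypothetical')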


@@ -70,7 +70,8 @@ class CephOSDPerfPlugin(base.CephBase):
             perf_dump = self.execute_to_json('ceph --admin-daemon %s perf dump'
                                              % socket_name)
             if not perf_dump:
-                continue
+                raise base.CheckException(
+                    "Fail to run 'ceph perf dump' for OSD {}".format(osd_id))
 
             for prefix, stats in perf_dump.iteritems():
                 if prefix not in self.PREFIXES or not stats:
@@ -83,7 +84,7 @@ class CephOSDPerfPlugin(base.CephBase):
                 'values': self.convert_to_collectd_value(stats[k])
             }
 
-plugin = CephOSDPerfPlugin(collectd)
+plugin = CephOSDPerfPlugin(collectd, 'ceph_osd')
 
 
 def init_callback():


@@ -30,7 +30,7 @@ class CephOSDStatsPlugin(base.CephBase):
     def itermetrics(self):
         osd_stats = self.execute_to_json('ceph pg dump osds --format json')
         if not osd_stats:
-            return
+            raise base.CheckException("Fail to execute 'pg dump osds'")
 
         for osd in osd_stats:
             osd_id = osd['osd']
@@ -48,7 +48,7 @@ class CephOSDStatsPlugin(base.CephBase):
                 osd['fs_perf_stat']['commit_latency_ms']],
             }
 
-plugin = CephOSDStatsPlugin(collectd)
+plugin = CephOSDStatsPlugin(collectd, 'ceph_mon')
 
 
 def init_callback():


@@ -35,7 +35,7 @@ class CephMonPlugin(base.CephBase):
     def itermetrics(self):
         status = self.execute_to_json('ceph -s --format json')
         if not status:
-            return
+            raise base.CheckException("Fail to execute 'ceph -s'")
 
         yield {
             'type': 'health',
@@ -78,7 +78,7 @@ class CephMonPlugin(base.CephBase):
             'values': state['count']
         }
 
-plugin = CephMonPlugin(collectd)
+plugin = CephMonPlugin(collectd, 'ceph_mon')
 
 
 def init_callback():


@@ -30,7 +30,7 @@ class CephPoolPlugin(base.CephBase):
     def itermetrics(self):
         df = self.execute_to_json('ceph df --format json')
         if not df:
-            return
+            raise base.CheckException("Fail to run 'ceph df'")
 
         objects_count = 0
         for pool in df['pools']:
@@ -73,7 +73,7 @@ class CephPoolPlugin(base.CephBase):
         stats = self.execute_to_json('ceph osd pool stats --format json')
         if not stats:
-            return
+            raise base.CheckException("Fail to run 'ceph osd pool stats'")
 
         for pool in stats:
             client_io_rate = pool.get('client_io_rate', {})
@@ -91,7 +91,7 @@ class CephPoolPlugin(base.CephBase):
         osd = self.execute_to_json('ceph osd dump --format json')
         if not osd:
-            return
+            raise base.CheckException("Fail to run 'ceph osd dump'")
 
         for pool in osd['pools']:
             for name in ('size', 'pg_num', 'pg_placement_num'):
@@ -117,7 +117,7 @@ class CephPoolPlugin(base.CephBase):
             'values': [_up, _down, _in, _out]
         }
 
-plugin = CephPoolPlugin(collectd)
+plugin = CephPoolPlugin(collectd, 'ceph_mon')
 
 
 def init_callback():


@@ -25,6 +25,10 @@ import traceback
 
 INTERVAL = 10
 
+class CheckException(Exception):
+    pass
+
+
 # A decorator that will call the decorated function only when the plugin has
 # detected that it is currently active.
 def read_callback_wrapper(f):
@@ -45,7 +49,7 @@ class Base(object):
     MAX_IDENTIFIER_LENGTH = 63
 
-    def __init__(self, collectd):
+    def __init__(self, collectd, service_name=None):
         self.debug = False
         self.timeout = 5
         self.max_retries = 3
@@ -57,6 +61,8 @@ class Base(object):
         self.depends_on_resource = None
         self.do_collect_data = True
 
+        self.service_name = service_name
+
     def config_callback(self, conf):
         for node in conf.children:
             if node.key == "Debug":
@@ -77,10 +83,27 @@ class Base(object):
         try:
             for metric in self.itermetrics():
                 self.dispatch_metric(metric)
+        except CheckException as e:
+            msg = '{}: {}'.format(self.plugin, e)
+            self.logger.warning(msg)
+            self.dispatch_check_metric(self.FAIL, msg)
         except Exception as e:
-            self.logger.error('%s: Failed to get metrics: %s: %s' %
-                              (self.plugin, e, traceback.format_exc()))
-            return
+            msg = '{}: Failed to get metrics: {}'.format(self.plugin, e)
+            self.logger.error('{}: {}'.format(msg, traceback.format_exc()))
+            self.dispatch_check_metric(self.FAIL, msg)
+        else:
+            self.dispatch_check_metric(self.OK)
+
+    def dispatch_check_metric(self, check, failure=None):
+        metric = {
+            'meta': {'service_check': self.service_name or self.plugin},
+            'values': check,
+        }
+
+        if failure is not None:
+            metric['meta']['failure'] = failure
+
+        self.dispatch_metric(metric)
 
     def itermetrics(self):
         """Iterate over the collected metrics
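
To make the data flow concrete, this is roughly what dispatch_check_metric()
hands to dispatch_metric() in both outcomes. A sketch: the OK and FAIL
constants are assumed to map to the documented check values 1 and 0, and the
failure text below is a hypothetical example of the messages built in
read_callback():

    # Successful run: self.dispatch_check_metric(self.OK)
    {'meta': {'service_check': 'rabbitmq'},
     'values': 1}

    # Failed run: self.dispatch_check_metric(self.FAIL, msg)
    {'meta': {'service_check': 'rabbitmq',
              'failure': 'rabbitmq_info: Failed to get metrics: timeout'},
     'values': 0}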

@@ -28,8 +28,6 @@ METRICS = ['number_of_nodes', 'active_primary_shards', 'active_primary_shards',
            'active_shards', 'relocating_shards', 'unassigned_shards',
            'number_of_pending_tasks', 'initializing_shards']
 
-HEALTH_ON_ERROR = {'type_instance': 'health', 'values': HEALTH_MAP['red']}
-
 
 class ElasticsearchClusterHealthPlugin(base.Base):
     def __init__(self, *args, **kwargs):
@@ -63,15 +61,14 @@ class ElasticsearchClusterHealthPlugin(base.Base):
         try:
             r = self.session.get(self.url)
         except Exception as e:
-            self.logger.error("Got exception for '{}': {}".format(self.url, e))
-            yield HEALTH_ON_ERROR
-            return
+            msg = "Got exception for '{}': {}".format(self.url, e)
+            raise base.CheckException(msg)
 
         if r.status_code != 200:
-            self.logger.error("{} responded with code {}".format(
-                self.url, r.status_code))
-            yield HEALTH_ON_ERROR
-            return
+            msg = "{} responded with code {}".format(
+                self.url, r.status_code)
+            raise base.CheckException(msg)
 
         data = r.json()
         self.logger.debug("Got response from Elasticsearch: '%s'" % data)
@@ -79,6 +76,7 @@ class ElasticsearchClusterHealthPlugin(base.Base):
             'type_instance': 'health',
             'values': HEALTH_MAP[data['status']]
         }
+
         for metric in METRICS:
             value = data.get(metric)
             if value is None:
@@ -91,7 +89,7 @@ class ElasticsearchClusterHealthPlugin(base.Base):
                 'values': value
             }
 
-plugin = ElasticsearchClusterHealthPlugin(collectd)
+plugin = ElasticsearchClusterHealthPlugin(collectd, 'elasticsearch')
 
 
 def init_callback():


@@ -167,8 +167,9 @@ class HAProxyPlugin(base.Base):
         try:
             stats = haproxy.get_server_info()
         except socket.error:
-            self.logger.warning(
-                "Unable to connect to HAProxy socket at %s" % self.socket)
+            msg = "Unable to connect to HAProxy socket at {}".format(
+                self.socket)
+            raise base.CheckException(msg)
         else:
             for k, v in stats.iteritems():
                 if k not in SERVER_METRICS:
@@ -184,9 +185,9 @@ class HAProxyPlugin(base.Base):
         try:
             stats = haproxy.get_server_stats()
         except socket.error:
-            self.logger.warning(
-                "Unable to connect to HAProxy socket at %s" % self.socket)
-            return
+            msg = "Unable to connect to HAProxy socket at {}".format(
+                self.socket)
+            raise base.CheckException(msg)
 
         def match(x):
             if x['pxname'] in self.proxy_ignore:


@@ -98,14 +98,12 @@ class InfluxDBClusterPlugin(base.Base):
         try:
             r = self.session.get(url, params=payload)
         except Exception as e:
-            self.logger.error("Got {0} when getting stats from {1}".format(
-                e, url))
-            return
+            msg = "Got {0} when getting stats from {1}".format(e, url)
+            raise base.CheckException(msg)
 
         if r.status_code != 200:
-            self.logger.error("Got response {0} from {0}".format(
-                r.status_code, url))
-            return
+            msg = "Got response {0} from {1}".format(r.status_code, url)
+            raise base.CheckException(msg)
 
         data = r.json()
         try:


@@ -48,8 +48,9 @@ class PacemakerResourcePlugin(base.Base):
                                    '--quiet', '--resource', resource],
                                   shell=False)
             if not out:
-                self.logger.error("%s: Failed to get the status for '%s'" %
-                                  (self.plugin, resource))
+                msg = "{}: Failed to get the status for '{}'".format(
+                    self.plugin, resource)
+                raise base.CheckException(msg)
             else:
                 value = 0
@@ -60,7 +61,7 @@ class PacemakerResourcePlugin(base.Base):
                 'values': value
             }
 
-plugin = PacemakerResourcePlugin(collectd)
+plugin = PacemakerResourcePlugin(collectd, 'pacemaker')
 
 
 def init_callback():


@@ -71,10 +71,14 @@ class RabbitMqPlugin(base.Base):
             r = self.session.get(self.api_overview_url, timeout=self.timeout)
             overview = r.json()
         except Exception as e:
-            self.logger.warning("Got exception for '{}': {}".format(
-                self.api_nodes_url, e)
-            )
-            return
+            msg = "Got exception for '{}': {}".format(self.api_overview_url, e)
+            raise base.CheckException(msg)
+
+        if r.status_code != 200:
+            msg = "{} responded with code {}".format(
+                self.api_overview_url, r.status_code)
+            raise base.CheckException(msg)
 
         objects = overview['object_totals']
         stats['queues'] = objects['queues']
         stats['consumers'] = objects['consumers']
@@ -94,10 +98,14 @@ class RabbitMqPlugin(base.Base):
                                  timeout=self.timeout)
             node = r.json()
         except Exception as e:
-            self.logger.warning("Got exception for '{}': {}".format(
-                self.api_node_url, e)
-            )
-            return
+            msg = "Got exception for '{}': {}".format(self.api_nodes_url, e)
+            raise base.CheckException(msg)
+
+        if r.status_code != 200:
+            msg = "{} responded with code {}".format(
+                self.api_nodes_url, r.status_code)
+            self.logger.error(msg)
+            raise base.CheckException(msg)
 
         stats['disk_free_limit'] = node['disk_free_limit']
         stats['disk_free'] = node['disk_free']
@@ -111,7 +119,7 @@ class RabbitMqPlugin(base.Base):
             yield {'type_instance': k, 'values': v}
 
-plugin = RabbitMqPlugin(collectd)
+plugin = RabbitMqPlugin(collectd, 'rabbitmq')
 
 
 def config_callback(conf):


@@ -80,7 +80,11 @@ function process_message ()
     -- Normalize metric name, unfortunately collectd plugins aren't
     -- always consistent on metric namespaces so we need a few if/else
     -- statements to cover all cases.
-    if metric_source == 'df' then
+    if sample['meta'] and sample['meta']['service_check'] then
+        msg['Fields']['name'] = sample['meta']['service_check'] .. sep .. 'check'
+        msg['Fields']['details'] = sample['meta']['failure']
+    elseif metric_source == 'df' then
         local entity
         if sample['type'] == 'df_inodes' then
             entity = 'inodes'
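
The Lua branch above is what renames the check samples on their way into
Heka. The same logic, expressed as a Python sketch for readability (the
separator is assumed to be '_', matching the metric names listed in the
documentation below):

    def normalize_check_sample(sample, metric_source, sep='_'):
        # Mirrors the decoder branch: check samples are renamed to
        # '<service>_check' and carry the failure message, if any,
        # as a 'details' field.
        meta = sample.get('meta') or {}
        if meta.get('service_check'):
            return {'name': meta['service_check'] + sep + 'check',
                    'details': meta.get('failure')}
        # Other metric sources ('df', ...) keep their existing rules.
        return {'name': metric_source}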


@@ -78,4 +78,9 @@ Elasticsearch
 InfluxDB
 ++++++++
 
-.. include:: metrics/influxdb.rst
+.. include:: metrics/influxdb.rst
+
+Checks
+++++++
+
+.. include:: metrics/checks.rst


@@ -0,0 +1,13 @@
+.. _check-metrics:
+
+The check metrics are emitted to express the success or the failure of the
+metric collection for the local services.
+The value is ``1`` when successful and ``0`` if it fails.
+
+* ``rabbitmq_check``, for RabbitMQ.
+* ``haproxy_check``, for HAProxy.
+* ``pacemaker_check``, for Pacemaker.
+* ``ceph_mon_check``, for Ceph monitor.
+* ``ceph_osd_check``, for Ceph OSD.
+* ``elasticsearch_check``, for Elasticsearch.
+* ``influxdb_check``, for InfluxDB.
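
As a usage note, a consumer of these check metrics only needs the value
and, on failure, the attached detail. A hedged sketch; the describe_check()
helper and the sample layout shown here are illustrative, not part of the
patch:

    def describe_check(sample):
        # A value of 1 means the last collection succeeded, 0 that it
        # failed; 'failure' carries the reason when the check failed.
        service = sample['meta']['service_check']
        if sample['values'] == 1:
            return '{}_check is OK'.format(service)
        return '{}_check FAILED: {}'.format(
            service, sample['meta'].get('failure', 'unknown'))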