Improve Elasticsearch collectd plugin

This change modifies the Elastcisearch plugin to retrieve the cluster
metrics only from the node that is the elected master. This avoids
sending and storing duplicated metrics into InfluxDB.

Change-Id: Iaeb90593e5aa4a2ddedea6abd4445a1a64b554d2
Co-Authored-By: Simon Pasquier <spasquier@mirantis.com>
This commit is contained in:
Swann Croiset 2017-01-10 11:18:04 +01:00
parent c866caadc7
commit 0552c04687
1 changed files with 29 additions and 7 deletions

View File

@ -35,6 +35,7 @@ class ElasticsearchClusterHealthPlugin(base.Base):
self.plugin = NAME
self.address = '127.0.0.1'
self.port = 9200
self._node_id = None
self.session = requests.Session()
self.url = None
self.session.mount(
@ -51,25 +52,41 @@ class ElasticsearchClusterHealthPlugin(base.Base):
if node.key == 'Port':
self.port = node.values[0]
self.url = "http://{address}:{port}/_cluster/health".format(
self.url = "http://{address}:{port}/".format(
**{
'address': self.address,
'port': int(self.port),
})
def itermetrics(self):
def query_api(self, resource):
url = "{}{}".format(self.url, resource)
try:
r = self.session.get(self.url)
r = self.session.get(url)
except Exception as e:
msg = "Got exception for '{}': {}".format(self.url, e)
msg = "Got exception for '{}': {}".format(url, e)
raise base.CheckException(msg)
if r.status_code != 200:
msg = "{} responded with code {}".format(
self.url, r.status_code)
msg = "{} responded with code {}".format(url, r.status_code)
raise base.CheckException(msg)
data = r.json()
return r.json()
@property
def node_id(self):
if self._node_id is None:
local_node = self.query_api('_nodes/_local')
self._node_id = local_node.get('nodes', {}).keys()[0]
return self._node_id
def itermetrics(self):
# Collect cluster metrics only from the elected master
master_node = self.query_api('_cluster/state/master_node')
if master_node.get('master_node', '') != self.node_id:
return
data = self.query_api('_cluster/health')
self.logger.debug("Got response from Elasticsearch: '%s'" % data)
yield {
@ -92,6 +109,10 @@ class ElasticsearchClusterHealthPlugin(base.Base):
plugin = ElasticsearchClusterHealthPlugin(collectd, 'elasticsearch')
def init_callback():
plugin.restore_sigchld()
def config_callback(conf):
plugin.config_callback(conf)
@ -99,5 +120,6 @@ def config_callback(conf):
def read_callback():
plugin.read_callback()
collectd.register_init(init_callback)
collectd.register_config(config_callback)
collectd.register_read(read_callback)