Use RabbitMQ management API

The patch uses the management API to retrieve metrics instead of
executing rabbitmqctl command.

A side effect is that all metrics per-queues are not collected anymore.

Change-Id: I5dab785321e369ec0e1a69a79e0700b276810925
Closes-bug: #1594337
(cherry picked from commit 1ae8829823)
This commit is contained in:
Swann Croiset 2016-07-11 16:25:03 +02:00
parent 658c40e10c
commit ce524c0e62
6 changed files with 111 additions and 214 deletions

View File

@ -377,8 +377,9 @@ if $is_rabbitmq and (hiera('lma::collector::elasticsearch::server', false) or hi
# collectd Puppet module.
unless $is_controller {
class { 'lma_collector::collectd::rabbitmq':
queue => ['/^(\\w*notifications\\.(error|info|warn)|[a-z]+|(metering|event)\.sample)$/'],
require => Class['lma_collector::collectd::base'],
username => 'nova',
password => $rabbit['password'],
require => Class['lma_collector::collectd::base'],
}
}
}

View File

@ -297,7 +297,9 @@ if hiera('lma::collector::influxdb::server', false) {
# controller.
unless $detach_rabbitmq_enabled {
class { 'lma_collector::collectd::rabbitmq':
queue => ['/^(\\w*notifications\\.(error|info|warn)|[a-z]+|(metering|event)\.sample)$/'],
username => 'nova',
password => $rabbit['password'],
require => Class['lma_collector::collectd::base'],
}
}

View File

@ -1,9 +1,7 @@
# Name: rabbitmq-collectd-plugin - rabbitmq_info.py
# Author: https://github.com/phrawzty/rabbitmq-collectd-plugin/commits/master
# Description: This plugin uses Collectd's Python plugin to obtain RabbitMQ
# metrics.
#
# Copyright 2012 Daniel Maher
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@ -19,201 +17,95 @@
# limitations under the License.
import collectd
import re
import collectd_base as base
import requests
NAME = 'rabbitmq_info'
# Override in config by specifying 'RmqcBin'.
RABBITMQCTL_BIN = '/usr/sbin/rabbitmqctl'
# Override in config by specifying 'Vhost'.
VHOST = "/"
# Used to find disk nodes and running nodes.
CLUSTER_STATUS = re.compile('.*disc,\[([^\]]+)\].*running_nodes,\[([^\]]+)\]',
re.S)
# Override in config by specifying 'Host'.
HOST = '127.0.0.1'
# Override in config by specifying 'Port'.
PORT = '15672'
class RabbitMqPlugin(base.Base):
# we need to substract the length of the longest prefix (eg '.consumers')
MAX_QUEUE_IDENTIFIER_LENGTH = base.Base.MAX_IDENTIFIER_LENGTH - 10
def __init__(self, *args, **kwargs):
super(RabbitMqPlugin, self).__init__(*args, **kwargs)
self.plugin = NAME
self.rabbitmqctl_bin = RABBITMQCTL_BIN
self.vhost = VHOST
self.re_queues = []
self.queues = []
def _matching_queue(self, name):
for r in self.re_queues:
if r.match(name):
return True
for q in self.queues:
if q == name:
return True
return False
self.username = None
self.password = None
self.host = HOST
self.port = PORT
self.session = None
def config_callback(self, conf):
super(RabbitMqPlugin, self).config_callback(conf)
for node in conf.children:
if node.key == 'RmqcBin':
self.rabbitmqctl_bin = node.values[0]
elif node.key == 'Vhost':
self.vhost = node.values[0]
elif node.key == 'Queue':
for val in node.values:
if val.startswith('/') and val.endswith('/') and \
len(val) > 2:
regex = val[1:len(val) - 1]
try:
self.re_queues.append(re.compile(regex))
except Exception as e:
self.logger.error(
'Cannot compile regex {}: {}'.format(regex, e))
raise e
elif len(val) > 0:
self.queues.append(val)
if node.key == 'Username':
self.username = node.values[0]
if node.key == 'Password':
self.password = node.values[0]
if node.key == 'Host':
self.host = node.values[0]
if node.key == 'Port':
self.port = node.values[0]
else:
self.logger.warning('Unknown config key: %s' % node.key)
if not (self.username and self.password):
self.logger.error('Missing Username and Password parameters')
self.session = requests.Session()
self.session.auth = (self.username, self.password)
self.session.mount(
'http://',
requests.adapters.HTTPAdapter(max_retries=self.max_retries)
)
url = "http://{}:{}".format(self.host, self.port)
self.api_nodes_url = "{}/api/nodes".format(url)
self.api_overview_url = "{}/api/overview".format(url)
def itermetrics(self):
stats = {}
stats['messages'] = 0
stats['memory'] = 0
stats['consumers'] = 0
stats['queues'] = 0
stats['unmirrored_queues'] = 0
out, err = self.execute([self.rabbitmqctl_bin, '-q', 'status'],
shell=False)
if not out:
self.logger.error('%s: Failed to get the status' %
self.rabbitmqctl_bin)
return
for v in ('vm_memory_limit', 'disk_free_limit', 'disk_free'):
try:
stats[v] = int(re.findall('{%s,([0-9]+)}' % v, out)[0])
except:
self.logger.error('%s: Failed to get %s' %
(self.rabbitmqctl_bin, v))
mem_str = re.findall('{memory,\s+\[([^\]]+)\]\}', out)
# We are only interested by the total of memory used
# TODO(all): Get all informations about memory usage from mem_str
try:
stats['used_memory'] = int(re.findall('total,([0-9]+)',
mem_str[0])[0])
except:
self.logger.error('%s: Failed to get the memory used by rabbitmq' %
self.rabbitmqctl_bin)
r = self.session.get(self.api_overview_url, timeout=self.timeout)
overview = r.json()
except Exception as e:
self.logger.warning("Got exception for '{}': {}".format(
self.api_nodes_url, e)
)
return
objects = overview['object_totals']
stats['queues'] = objects['queues']
stats['consumers'] = objects['consumers']
stats['connections'] = objects['connections']
stats['exchanges'] = objects['exchanges']
stats['channels'] = objects['channels']
stats['messages'] = overview['queue_totals']['messages']
stats['running_nodes'] = len(overview['contexts'])
if 'vm_memory_limit' in stats and 'used_memory' in stats:
stats['remaining_memory'] = \
stats['vm_memory_limit'] - stats['used_memory']
if 'disk_free' in stats and 'disk_free_limit' in stats:
stats['remaining_disk'] = \
stats['disk_free'] - stats['disk_free_limit']
for k, v in stats.iteritems():
yield {'type_instance': k, 'values': v}
out, err = self.execute([self.rabbitmqctl_bin, '-q', 'cluster_status'],
shell=False)
if not out:
self.logger.error('%s: Failed to get the cluster status' %
self.rabbitmqctl_bin)
stats = {}
nodename = overview['node']
try:
r = self.session.get("{}/{}".format(self.api_nodes_url, nodename),
timeout=self.timeout)
node = r.json()
except Exception as e:
self.logger.warning("Got exception for '{}': {}".format(
self.api_node_url, e)
)
return
# TODO(all): Need to be modified in case we are using RAM nodes.
status = CLUSTER_STATUS.findall(out)
if len(status) == 0:
self.logger.error('%s: Failed to parse (%s)' %
(self.rabbitmqctl_bin, out))
else:
stats['total_nodes'] = len(status[0][0].split(","))
stats['running_nodes'] = len(status[0][1].split(","))
stats['disk_free_limit'] = node['disk_free_limit']
stats['disk_free'] = node['disk_free']
stats['remaining_disk'] = node['disk_free'] - node['disk_free_limit']
out, err = self.execute([self.rabbitmqctl_bin, '-q',
'list_connections'], shell=False)
if not out:
self.logger.error('%s: Failed to get the number of connections' %
self.rabbitmqctl_bin)
return
stats['connections'] = len(out.split('\n'))
out, err = self.execute([self.rabbitmqctl_bin, '-q', 'list_exchanges'],
shell=False)
if not out:
self.logger.error('%s: Failed to get the number of exchanges' %
self.rabbitmqctl_bin)
return
stats['exchanges'] = len(out.split('\n'))
out, err = self.execute([self.rabbitmqctl_bin, '-q', '-p', self.vhost,
'list_queues', 'name', 'messages', 'memory',
'consumers', 'slave_pids',
'synchronised_slave_pids'], shell=False)
if not out:
self.logger.error('%s: Failed to get the list of queues' %
self.rabbitmqctl_bin)
return
for line in out.split('\n'):
ctl_stats = line.split('\t')
try:
ctl_stats[1] = int(ctl_stats[1])
ctl_stats[2] = int(ctl_stats[2])
ctl_stats[3] = int(ctl_stats[3])
except:
continue
stats['queues'] += 1
stats['messages'] += ctl_stats[1]
stats['memory'] += ctl_stats[2]
stats['consumers'] += ctl_stats[3]
queue_name = ctl_stats[0]
if self._matching_queue(queue_name):
meta = {
'queue': ctl_stats[0][:self.MAX_QUEUE_IDENTIFIER_LENGTH]
}
yield {
'type_instance': 'queue_messages',
'values': ctl_stats[1],
'meta': meta
}
yield {
'type_instance': 'queue_memory',
'values': ctl_stats[2],
'meta': meta
}
yield {
'type_instance': 'queue_consumers',
'values': ctl_stats[3],
'meta': meta
}
# we need to check if the list of synchronised slaves is
# equal to the list of slaves.
try:
slaves = re.findall('<([a-zA-Z@\-.0-9]+)>', ctl_stats[4])
for s in slaves:
if s not in ctl_stats[5]:
stats['unmirrored_queues'] += 1
break
except IndexError:
pass
if not stats['memory'] > 0:
self.logger.warning(
'%s reports 0 memory usage. This is probably incorrect.' %
self.rabbitmqctl_bin)
stats['used_memory'] = node['mem_used']
stats['vm_memory_limit'] = node['mem_limit']
stats['remaining_memory'] = node['mem_limit'] - node['mem_used']
for k, v in stats.iteritems():
yield {'type_instance': k, 'values': v}
@ -222,10 +114,6 @@ class RabbitMqPlugin(base.Base):
plugin = RabbitMqPlugin(collectd)
def init_callback():
plugin.restore_sigchld()
def config_callback(conf):
plugin.config_callback(conf)
@ -233,6 +121,5 @@ def config_callback(conf):
def read_callback():
plugin.read_callback()
collectd.register_init(init_callback)
collectd.register_config(config_callback)
collectd.register_read(read_callback)

View File

@ -14,23 +14,35 @@
#
class lma_collector::collectd::rabbitmq (
$queue = [],
$username,
$password,
$host = undef,
$port = undef,
) {
validate_array($queue)
# Add quotes around the array values
$real_queue = suffix(prefix($queue, '"'), '"')
if $queue {
$config = {
'Queue' => $real_queue,
if $host {
$host_config = {
'Host' => "\"${host}\"",
}
} else {
$config = {}
$host_config = {}
}
if $port {
validate_integer($port)
$port_config = {
'Port' => "\"${port}\"",
}
} else {
$port_config = {}
}
$config = {
'Username' => "\"${username}\"",
'Password' => "\"${password}\"",
}
lma_collector::collectd::python { 'rabbitmq_info':
config => $config,
config => merge($config, $host_config, $port_config)
}
}

View File

@ -19,16 +19,23 @@ describe 'lma_collector::collectd::rabbitmq' do
:osfamily => 'Debian', :concat_basedir => '/foo'}
end
describe 'with defaults' do
it { is_expected.to contain_lma_collector__collectd__python('rabbitmq_info') }
end
describe 'with queue parameter' do
describe 'with minimal parameters' do
let(:params) do
{:queue => ['/^(foo|bar)\\w+$/', 'notif']}
{:username => 'foouser', :password => 'foopass' }
end
it { is_expected.to contain_lma_collector__collectd__python('rabbitmq_info') \
.with_config({'Queue' => ['"/^(foo|bar)\\w+$/"', '"notif"']})
.with_config({'Username' => '"foouser"', 'Password' => '"foopass"'})
}
end
describe 'with host and port parameters' do
let(:params) do
{:username => 'foouser', :password => 'foopass', :host => 'foohost',
:port => 123,
}
end
it { is_expected.to contain_lma_collector__collectd__python('rabbitmq_info') \
.with_config({'Username' => '"foouser"', 'Password' => '"foopass"',
'Port' => '"123"', 'Host' => '"foohost"'})
}
end
end

View File

@ -5,26 +5,14 @@ Cluster
* ``rabbitmq_connections``, total number of connections.
* ``rabbitmq_consumers``, total number of consumers.
* ``rabbitmq_disk_free``, the disk free space.
* ``rabbitmq_disk_free_limit``, the minimum amount of free disk for RabbitMQ. When ``rabbitmq_disk_free`` drops below this value, all producers are blocked.
* ``rabbitmq_channels``, total number of channels.
* ``rabbitmq_exchanges``, total number of exchanges.
* ``rabbitmq_memory``, bytes of memory consumed by the Erlang process associated with all queues, including stack, heap and internal structures.
* ``rabbitmq_messages``, total number of messages which are ready to be consumed or not yet acknowledged.
* ``rabbitmq_queues``, total number of queues.
* ``rabbitmq_remaining_disk``, the difference between ``rabbitmq_disk_free`` and ``rabbitmq_disk_free_limit``.
* ``rabbitmq_remaining_memory``, the difference between ``rabbitmq_vm_memory_limit`` and ``rabbitmq_used_memory``.
* ``rabbitmq_running_nodes``, total number of running nodes in the cluster.
* ``rabbitmq_total_nodes``, total number of nodes in the cluster.
* ``rabbitmq_unmirrored_queues``, total number of queues that are not mirrored.
* ``rabbitmq_disk_free``, the disk free space.
* ``rabbitmq_disk_free_limit``, the minimum amount of free disk for RabbitMQ. When ``rabbitmq_disk_free`` drops below this value, all producers are blocked.
* ``rabbitmq_remaining_disk``, the difference between ``rabbitmq_disk_free`` and ``rabbitmq_disk_free_limit``.
* ``rabbitmq_used_memory``, bytes of memory used by the whole RabbitMQ process.
* ``rabbitmq_vm_memory_limit``, the maximum amount of memory allocated for RabbitMQ. When ``rabbitmq_used_memory`` uses more than this value, all producers are blocked.
Queues
^^^^^^
All metrics have a ``queue`` field which contains the name of the RabbitMQ queue.
* ``rabbitmq_queue_consumers``, number of consumers for a given queue.
* ``rabbitmq_queue_memory``, bytes of memory consumed by the Erlang process associated with the queue, including stack, heap and internal structures.
* ``rabbitmq_queue_messages``, number of messages which are ready to be consumed or not yet acknowledged for the given queue.
* ``rabbitmq_remaining_memory``, the difference between ``rabbitmq_vm_memory_limit`` and ``rabbitmq_used_memory``.