diff --git a/deployment_scripts/puppet/manifests/base.pp b/deployment_scripts/puppet/manifests/base.pp index 56893dbe2..36f2ce1f4 100644 --- a/deployment_scripts/puppet/manifests/base.pp +++ b/deployment_scripts/puppet/manifests/base.pp @@ -377,8 +377,9 @@ if $is_rabbitmq and (hiera('lma::collector::elasticsearch::server', false) or hi # collectd Puppet module. unless $is_controller { class { 'lma_collector::collectd::rabbitmq': - queue => ['/^(\\w*notifications\\.(error|info|warn)|[a-z]+|(metering|event)\.sample)$/'], - require => Class['lma_collector::collectd::base'], + username => 'nova', + password => $rabbit['password'], + require => Class['lma_collector::collectd::base'], } } } diff --git a/deployment_scripts/puppet/manifests/controller.pp b/deployment_scripts/puppet/manifests/controller.pp index f15f00798..1471dba31 100644 --- a/deployment_scripts/puppet/manifests/controller.pp +++ b/deployment_scripts/puppet/manifests/controller.pp @@ -297,7 +297,9 @@ if hiera('lma::collector::influxdb::server', false) { # controller. unless $detach_rabbitmq_enabled { class { 'lma_collector::collectd::rabbitmq': - queue => ['/^(\\w*notifications\\.(error|info|warn)|[a-z]+|(metering|event)\.sample)$/'], + username => 'nova', + password => $rabbit['password'], + require => Class['lma_collector::collectd::base'], } } diff --git a/deployment_scripts/puppet/modules/lma_collector/files/collectd/rabbitmq_info.py b/deployment_scripts/puppet/modules/lma_collector/files/collectd/rabbitmq_info.py index c5da219b8..d4e6b1e50 100644 --- a/deployment_scripts/puppet/modules/lma_collector/files/collectd/rabbitmq_info.py +++ b/deployment_scripts/puppet/modules/lma_collector/files/collectd/rabbitmq_info.py @@ -1,9 +1,7 @@ # Name: rabbitmq-collectd-plugin - rabbitmq_info.py -# Author: https://github.com/phrawzty/rabbitmq-collectd-plugin/commits/master # Description: This plugin uses Collectd's Python plugin to obtain RabbitMQ # metrics. # -# Copyright 2012 Daniel Maher # Copyright 2015 Mirantis, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -19,201 +17,95 @@ # limitations under the License. import collectd -import re import collectd_base as base - +import requests NAME = 'rabbitmq_info' -# Override in config by specifying 'RmqcBin'. -RABBITMQCTL_BIN = '/usr/sbin/rabbitmqctl' -# Override in config by specifying 'Vhost'. -VHOST = "/" - -# Used to find disk nodes and running nodes. -CLUSTER_STATUS = re.compile('.*disc,\[([^\]]+)\].*running_nodes,\[([^\]]+)\]', - re.S) +# Override in config by specifying 'Host'. +HOST = '127.0.0.1' +# Override in config by specifying 'Port'. +PORT = '15672' class RabbitMqPlugin(base.Base): - # we need to substract the length of the longest prefix (eg '.consumers') - MAX_QUEUE_IDENTIFIER_LENGTH = base.Base.MAX_IDENTIFIER_LENGTH - 10 - def __init__(self, *args, **kwargs): super(RabbitMqPlugin, self).__init__(*args, **kwargs) self.plugin = NAME - self.rabbitmqctl_bin = RABBITMQCTL_BIN - self.vhost = VHOST - self.re_queues = [] - self.queues = [] - - def _matching_queue(self, name): - for r in self.re_queues: - if r.match(name): - return True - - for q in self.queues: - if q == name: - return True - - return False + self.username = None + self.password = None + self.host = HOST + self.port = PORT + self.session = None def config_callback(self, conf): super(RabbitMqPlugin, self).config_callback(conf) for node in conf.children: - if node.key == 'RmqcBin': - self.rabbitmqctl_bin = node.values[0] - elif node.key == 'Vhost': - self.vhost = node.values[0] - elif node.key == 'Queue': - for val in node.values: - if val.startswith('/') and val.endswith('/') and \ - len(val) > 2: - regex = val[1:len(val) - 1] - try: - self.re_queues.append(re.compile(regex)) - except Exception as e: - self.logger.error( - 'Cannot compile regex {}: {}'.format(regex, e)) - raise e - elif len(val) > 0: - self.queues.append(val) + if node.key == 'Username': + self.username = node.values[0] + if node.key == 'Password': + self.password = node.values[0] + if node.key == 'Host': + self.host = node.values[0] + if node.key == 'Port': + self.port = node.values[0] - else: - self.logger.warning('Unknown config key: %s' % node.key) + if not (self.username and self.password): + self.logger.error('Missing Username and Password parameters') + + self.session = requests.Session() + self.session.auth = (self.username, self.password) + self.session.mount( + 'http://', + requests.adapters.HTTPAdapter(max_retries=self.max_retries) + ) + url = "http://{}:{}".format(self.host, self.port) + self.api_nodes_url = "{}/api/nodes".format(url) + self.api_overview_url = "{}/api/overview".format(url) def itermetrics(self): stats = {} - stats['messages'] = 0 - stats['memory'] = 0 - stats['consumers'] = 0 - stats['queues'] = 0 - stats['unmirrored_queues'] = 0 - - out, err = self.execute([self.rabbitmqctl_bin, '-q', 'status'], - shell=False) - if not out: - self.logger.error('%s: Failed to get the status' % - self.rabbitmqctl_bin) - return - - for v in ('vm_memory_limit', 'disk_free_limit', 'disk_free'): - try: - stats[v] = int(re.findall('{%s,([0-9]+)}' % v, out)[0]) - except: - self.logger.error('%s: Failed to get %s' % - (self.rabbitmqctl_bin, v)) - - mem_str = re.findall('{memory,\s+\[([^\]]+)\]\}', out) - # We are only interested by the total of memory used - # TODO(all): Get all informations about memory usage from mem_str try: - stats['used_memory'] = int(re.findall('total,([0-9]+)', - mem_str[0])[0]) - except: - self.logger.error('%s: Failed to get the memory used by rabbitmq' % - self.rabbitmqctl_bin) + r = self.session.get(self.api_overview_url, timeout=self.timeout) + overview = r.json() + except Exception as e: + self.logger.warning("Got exception for '{}': {}".format( + self.api_nodes_url, e) + ) + return + objects = overview['object_totals'] + stats['queues'] = objects['queues'] + stats['consumers'] = objects['consumers'] + stats['connections'] = objects['connections'] + stats['exchanges'] = objects['exchanges'] + stats['channels'] = objects['channels'] + stats['messages'] = overview['queue_totals']['messages'] + stats['running_nodes'] = len(overview['contexts']) - if 'vm_memory_limit' in stats and 'used_memory' in stats: - stats['remaining_memory'] = \ - stats['vm_memory_limit'] - stats['used_memory'] - if 'disk_free' in stats and 'disk_free_limit' in stats: - stats['remaining_disk'] = \ - stats['disk_free'] - stats['disk_free_limit'] + for k, v in stats.iteritems(): + yield {'type_instance': k, 'values': v} - out, err = self.execute([self.rabbitmqctl_bin, '-q', 'cluster_status'], - shell=False) - if not out: - self.logger.error('%s: Failed to get the cluster status' % - self.rabbitmqctl_bin) + stats = {} + nodename = overview['node'] + try: + r = self.session.get("{}/{}".format(self.api_nodes_url, nodename), + timeout=self.timeout) + node = r.json() + except Exception as e: + self.logger.warning("Got exception for '{}': {}".format( + self.api_node_url, e) + ) return - # TODO(all): Need to be modified in case we are using RAM nodes. - status = CLUSTER_STATUS.findall(out) - if len(status) == 0: - self.logger.error('%s: Failed to parse (%s)' % - (self.rabbitmqctl_bin, out)) - else: - stats['total_nodes'] = len(status[0][0].split(",")) - stats['running_nodes'] = len(status[0][1].split(",")) + stats['disk_free_limit'] = node['disk_free_limit'] + stats['disk_free'] = node['disk_free'] + stats['remaining_disk'] = node['disk_free'] - node['disk_free_limit'] - out, err = self.execute([self.rabbitmqctl_bin, '-q', - 'list_connections'], shell=False) - if not out: - self.logger.error('%s: Failed to get the number of connections' % - self.rabbitmqctl_bin) - return - stats['connections'] = len(out.split('\n')) - - out, err = self.execute([self.rabbitmqctl_bin, '-q', 'list_exchanges'], - shell=False) - if not out: - self.logger.error('%s: Failed to get the number of exchanges' % - self.rabbitmqctl_bin) - return - stats['exchanges'] = len(out.split('\n')) - - out, err = self.execute([self.rabbitmqctl_bin, '-q', '-p', self.vhost, - 'list_queues', 'name', 'messages', 'memory', - 'consumers', 'slave_pids', - 'synchronised_slave_pids'], shell=False) - if not out: - self.logger.error('%s: Failed to get the list of queues' % - self.rabbitmqctl_bin) - return - - for line in out.split('\n'): - ctl_stats = line.split('\t') - try: - ctl_stats[1] = int(ctl_stats[1]) - ctl_stats[2] = int(ctl_stats[2]) - ctl_stats[3] = int(ctl_stats[3]) - except: - continue - - stats['queues'] += 1 - stats['messages'] += ctl_stats[1] - stats['memory'] += ctl_stats[2] - stats['consumers'] += ctl_stats[3] - - queue_name = ctl_stats[0] - if self._matching_queue(queue_name): - meta = { - 'queue': ctl_stats[0][:self.MAX_QUEUE_IDENTIFIER_LENGTH] - } - yield { - 'type_instance': 'queue_messages', - 'values': ctl_stats[1], - 'meta': meta - } - yield { - 'type_instance': 'queue_memory', - 'values': ctl_stats[2], - 'meta': meta - } - yield { - 'type_instance': 'queue_consumers', - 'values': ctl_stats[3], - 'meta': meta - } - - # we need to check if the list of synchronised slaves is - # equal to the list of slaves. - try: - slaves = re.findall('<([a-zA-Z@\-.0-9]+)>', ctl_stats[4]) - for s in slaves: - if s not in ctl_stats[5]: - stats['unmirrored_queues'] += 1 - break - except IndexError: - pass - - if not stats['memory'] > 0: - self.logger.warning( - '%s reports 0 memory usage. This is probably incorrect.' % - self.rabbitmqctl_bin) + stats['used_memory'] = node['mem_used'] + stats['vm_memory_limit'] = node['mem_limit'] + stats['remaining_memory'] = node['mem_limit'] - node['mem_used'] for k, v in stats.iteritems(): yield {'type_instance': k, 'values': v} @@ -222,10 +114,6 @@ class RabbitMqPlugin(base.Base): plugin = RabbitMqPlugin(collectd) -def init_callback(): - plugin.restore_sigchld() - - def config_callback(conf): plugin.config_callback(conf) @@ -233,6 +121,5 @@ def config_callback(conf): def read_callback(): plugin.read_callback() -collectd.register_init(init_callback) collectd.register_config(config_callback) collectd.register_read(read_callback) diff --git a/deployment_scripts/puppet/modules/lma_collector/manifests/collectd/rabbitmq.pp b/deployment_scripts/puppet/modules/lma_collector/manifests/collectd/rabbitmq.pp index d8dcc03cb..457113587 100644 --- a/deployment_scripts/puppet/modules/lma_collector/manifests/collectd/rabbitmq.pp +++ b/deployment_scripts/puppet/modules/lma_collector/manifests/collectd/rabbitmq.pp @@ -14,23 +14,35 @@ # class lma_collector::collectd::rabbitmq ( - $queue = [], + $username, + $password, + $host = undef, + $port = undef, ) { - validate_array($queue) - - # Add quotes around the array values - $real_queue = suffix(prefix($queue, '"'), '"') - - if $queue { - $config = { - 'Queue' => $real_queue, + if $host { + $host_config = { + 'Host' => "\"${host}\"", } } else { - $config = {} + $host_config = {} + } + + if $port { + validate_integer($port) + $port_config = { + 'Port' => "\"${port}\"", + } + } else { + $port_config = {} + } + + $config = { + 'Username' => "\"${username}\"", + 'Password' => "\"${password}\"", } lma_collector::collectd::python { 'rabbitmq_info': - config => $config, + config => merge($config, $host_config, $port_config) } } diff --git a/deployment_scripts/puppet/modules/lma_collector/spec/classes/lma_collector_collectd_rabbitmq_spec.rb b/deployment_scripts/puppet/modules/lma_collector/spec/classes/lma_collector_collectd_rabbitmq_spec.rb index 51d5a24c3..36c66a1cf 100644 --- a/deployment_scripts/puppet/modules/lma_collector/spec/classes/lma_collector_collectd_rabbitmq_spec.rb +++ b/deployment_scripts/puppet/modules/lma_collector/spec/classes/lma_collector_collectd_rabbitmq_spec.rb @@ -19,16 +19,23 @@ describe 'lma_collector::collectd::rabbitmq' do :osfamily => 'Debian', :concat_basedir => '/foo'} end - describe 'with defaults' do - it { is_expected.to contain_lma_collector__collectd__python('rabbitmq_info') } - end - - describe 'with queue parameter' do + describe 'with minimal parameters' do let(:params) do - {:queue => ['/^(foo|bar)\\w+$/', 'notif']} + {:username => 'foouser', :password => 'foopass' } end it { is_expected.to contain_lma_collector__collectd__python('rabbitmq_info') \ - .with_config({'Queue' => ['"/^(foo|bar)\\w+$/"', '"notif"']}) + .with_config({'Username' => '"foouser"', 'Password' => '"foopass"'}) + } + end + describe 'with host and port parameters' do + let(:params) do + {:username => 'foouser', :password => 'foopass', :host => 'foohost', + :port => 123, + } + end + it { is_expected.to contain_lma_collector__collectd__python('rabbitmq_info') \ + .with_config({'Username' => '"foouser"', 'Password' => '"foopass"', + 'Port' => '"123"', 'Host' => '"foohost"'}) } end end diff --git a/doc/user/source/metrics/rabbitmq.rst b/doc/user/source/metrics/rabbitmq.rst index c9b1042ae..7bd33a34d 100644 --- a/doc/user/source/metrics/rabbitmq.rst +++ b/doc/user/source/metrics/rabbitmq.rst @@ -5,26 +5,14 @@ Cluster * ``rabbitmq_connections``, total number of connections. * ``rabbitmq_consumers``, total number of consumers. -* ``rabbitmq_disk_free``, the disk free space. -* ``rabbitmq_disk_free_limit``, the minimum amount of free disk for RabbitMQ. When ``rabbitmq_disk_free`` drops below this value, all producers are blocked. +* ``rabbitmq_channels``, total number of channels. * ``rabbitmq_exchanges``, total number of exchanges. -* ``rabbitmq_memory``, bytes of memory consumed by the Erlang process associated with all queues, including stack, heap and internal structures. * ``rabbitmq_messages``, total number of messages which are ready to be consumed or not yet acknowledged. * ``rabbitmq_queues``, total number of queues. -* ``rabbitmq_remaining_disk``, the difference between ``rabbitmq_disk_free`` and ``rabbitmq_disk_free_limit``. -* ``rabbitmq_remaining_memory``, the difference between ``rabbitmq_vm_memory_limit`` and ``rabbitmq_used_memory``. * ``rabbitmq_running_nodes``, total number of running nodes in the cluster. -* ``rabbitmq_total_nodes``, total number of nodes in the cluster. -* ``rabbitmq_unmirrored_queues``, total number of queues that are not mirrored. +* ``rabbitmq_disk_free``, the disk free space. +* ``rabbitmq_disk_free_limit``, the minimum amount of free disk for RabbitMQ. When ``rabbitmq_disk_free`` drops below this value, all producers are blocked. +* ``rabbitmq_remaining_disk``, the difference between ``rabbitmq_disk_free`` and ``rabbitmq_disk_free_limit``. * ``rabbitmq_used_memory``, bytes of memory used by the whole RabbitMQ process. * ``rabbitmq_vm_memory_limit``, the maximum amount of memory allocated for RabbitMQ. When ``rabbitmq_used_memory`` uses more than this value, all producers are blocked. - - -Queues -^^^^^^ - -All metrics have a ``queue`` field which contains the name of the RabbitMQ queue. - -* ``rabbitmq_queue_consumers``, number of consumers for a given queue. -* ``rabbitmq_queue_memory``, bytes of memory consumed by the Erlang process associated with the queue, including stack, heap and internal structures. -* ``rabbitmq_queue_messages``, number of messages which are ready to be consumed or not yet acknowledged for the given queue. +* ``rabbitmq_remaining_memory``, the difference between ``rabbitmq_vm_memory_limit`` and ``rabbitmq_used_memory``.