Integrate collectd into the LMA collector

The collectd service collects metrics from many sources:
- System (CPU, RAM, disk, network, and so on)
- MySQL
- RabbitMQ
- OpenStack services

collectd sends the data to the LMA collector through its write_http
plugin, using the JSON format. The LMA collector decodes this input and
injects the metrics into the Heka pipeline. Eventually, the metrics are
sent to InfluxDB.

Note: until we have the InfluxDB-Grafana plugin ready, the InfluxDB parameters
are hidden in the Fuel UI.
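
To make the flow concrete, here is a minimal sketch of what collectd's
write_http plugin POSTs to the collector's HTTP listener, assuming the
default 127.0.0.1:8325 endpoint configured below; the field names are
the ones the collectd.lua decoder reads:

import json
import time

import requests

# One collectd sample, as serialized by write_http in JSON format.
sample = [{
    "time": time.time(),
    "interval": 10,
    "host": "node-1",
    "plugin": "cpu",
    "plugin_instance": "0",
    "type": "cpu",
    "type_instance": "idle",
    "dsnames": ["value"],
    "dstypes": ["derive"],
    "values": [2993],
}]
requests.post("http://127.0.0.1:8325", data=json.dumps(sample))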

Change-Id: I59577fcdc014be8d0f1d4824ef416afda3604506
Simon Pasquier 2015-03-10 16:46:13 +01:00
parent 2ed4d03c73
commit 77a1e6eb6d
26 changed files with 1278 additions and 14 deletions

View File

@ -33,16 +33,49 @@ class { 'lma_collector::logs::monitor':
require => Class['lma_collector'],
}
$influxdb_mode = $fuel_settings['lma_collector']['influxdb_mode']
case $influxdb_mode {
'remote','local': {
if $influxdb_mode == 'remote' {
$influxdb_server = $fuel_settings['lma_collector']['influxdb_address']
}
else {
$influxdb_node_name = $fuel_settings['lma_collector']['influxdb_node_name']
$influxdb_nodes = filter_nodes($fuel_settings['nodes'], 'user_node_name', $influxdb_node_name)
if size($influxdb_nodes) < 1 {
fail("Could not find node '${influxdb_node_name}' in the environment")
}
$influxdb_server = $influxdb_nodes[0]['internal_address']
}
class { 'lma_collector::collectd::base':
}
class { 'lma_collector::influxdb':
server => $influxdb_server,
database => $fuel_settings['lma_collector']['influxdb_database'],
user => $fuel_settings['lma_collector']['influxdb_user'],
password => $fuel_settings['lma_collector']['influxdb_password'],
}
}
'disabled': {
# Nothing to do
}
default: {
fail("'${influxdb_mode}' mode not supported for InfluxDB")
}
}
$elasticsearch_mode = $fuel_settings['lma_collector']['elasticsearch_mode']
case $elasticsearch_mode {
'remote': {
$es_server = $fuel_settings['lma_collector']['elasticsearch_address']
}
'local': {
$es_node_name = $fuel_settings['lma_collector']['elasticsearch_node_name']
$es_nodes = filter_nodes($fuel_settings['nodes'], 'user_node_name', $es_node_name)
if size($es_nodes) < 1 {
fail("Could not find node '${es_node_name}' in the environment")
}
$es_server = $es_nodes[0]['internal_address']
}

View File

@ -3,6 +3,8 @@ $fuel_settings = parseyaml(file('/etc/astute.yaml'))
include lma_collector::params
$management_vip = $fuel_settings['management_vip']
$enable_notifications = $fuel_settings['lma_collector']['enable_notifications']
if $fuel_settings['ceilometer']['enabled'] {
$notification_topics = [$lma_collector::params::openstack_topic, $lma_collector::params::lma_topic]
@ -11,6 +13,20 @@ else {
$notification_topics = [$lma_collector::params::lma_topic]
}
if $fuel_settings['rabbit']['user'] {
$rabbitmq_user = $fuel_settings['rabbit']['user']
}
else {
$rabbitmq_user = 'nova'
}
if $fuel_settings['deployment_mode'] =~ /^ha/ {
$rabbitmq_pid_file = '/var/run/rabbitmq/p_pid'
}
else {
$rabbitmq_pid_file = '/var/run/rabbitmq/pid'
}
# Logs
class { 'lma_collector::logs::openstack': }
@ -23,13 +39,6 @@ if $fuel_settings['deployment_mode'] =~ /^ha/ {
}
# Notifications
if $enable_notifications {
class { 'lma_collector::notifications::controller':
host => $fuel_settings['management_vip'],
@ -38,3 +47,19 @@ if $enable_notifications {
topics => $notification_topics,
}
}
# Metrics
if $fuel_settings['lma_collector']['influxdb_mode'] != 'disabled' {
class { 'lma_collector::collectd::controller':
service_user => 'nova',
service_password => $fuel_settings['nova']['user_password'],
service_tenant => 'services',
keystone_url => "http://${management_vip}:5000/v2.0",
rabbitmq_pid_file => $rabbitmq_pid_file,
}
class { 'lma_collector::collectd::mysql':
username => 'nova',
password => $fuel_settings['nova']['db_password'],
}
}

View File

@ -0,0 +1,18 @@
define heka::encoder::payload (
$ensure = present,
$config_dir,
$append_newlines = false,
$prefix_ts = false,
) {
include heka::params
file { "${config_dir}/encoder-${title}.toml":
ensure => $ensure,
content => template('heka/encoder/payload.toml.erb'),
mode => '0600',
owner => $heka::params::user,
group => $heka::params::user,
}
}

View File

@ -0,0 +1,18 @@
define heka::input::httplisten (
$ensure = present,
$config_dir,
$decoder,
$address = '127.0.0.1',
$port,
) {
include heka::params
file { "${config_dir}/httplisten-${title}.toml":
ensure => $ensure,
content => template('heka/input/httplisten.toml.erb'),
mode => '0600',
owner => $heka::params::user,
group => $heka::params::user,
}
}

View File

@ -0,0 +1,23 @@
define heka::output::http (
$ensure = present,
$config_dir,
$url,
$encoder = $title,
$message_matcher = 'FALSE',
$username = undef,
$password = undef,
$timeout = undef,
$method = 'POST',
) {
include heka::params
file { "${config_dir}/output-${title}.toml":
ensure => $ensure,
content => template('heka/output/http.toml.erb'),
mode => '0600',
owner => $heka::params::user,
group => $heka::params::user,
}
}

View File

@ -4,7 +4,8 @@ filename = "<%= @filename %>"
<% if @config.size() > 0 %>
[<%= @title %>_decoder.config]
<% @config.each do |k,v| %>
<% next if v.nil? or v == :undef -%>
<%= k %> = <%= v.is_a?(String) ? "'#{v}'" : v %>
<% end %>
<% end %>

View File

@ -0,0 +1,4 @@
[<%= @title %>_encoder]
type = "PayloadEncoder"
append_newlines = <%= @append_newlines %>
prefix_ts = <%= @prefix_ts %>

View File

@ -9,7 +9,8 @@ ticker_interval = <%= @ticker_interval %>
<% if @config.size() > 0 %>
[<%= @title %>_filter.config]
<% @config.each do |k,v| %>
<% next if v.nil? or v == :undef -%>
<%= k %> = <%= v.is_a?(String) ? "'#{v}'" : v %>
<% end %>
<% end %>

View File

@ -0,0 +1,5 @@
[<%= @title %>_httplisten]
type="HttpListenInput"
address = "<%= @address %>:<%= @port %>"
decoder = "<%= @decoder %>_decoder"
splitter = "NullSplitter"

View File

@ -0,0 +1,15 @@
[<%= @title %>_output]
type = "HttpOutput"
message_matcher = "<%= @message_matcher %>"
encoder = "<%= @encoder %>_encoder"
address = "<%= @url %>"
<% if @username -%>
username = "<%= @username %>"
<% end -%>
<% if @password -%>
password = "<%= @password %>"
<% end -%>
<% unless @timeout.nil? -%>
http_timeout = <%= @timeout.to_i() * 1000 %>
<% end -%>
method = "<%= @method %>"

View File

@ -0,0 +1,100 @@
#!/usr/bin/python
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Collectd plugin for checking the status of OpenStack API services
import collectd
import openstack
PLUGIN_NAME = 'check_openstack_api'
INTERVAL = 60
class APICheckPlugin(openstack.CollectdPlugin):
""" Class to check the status of OpenStack API services.
"""
OK = 0
FAIL = 1
UNKNOWN = 2
# TODO(pasquier-s): add heat-cfn, nova_ec2, cinderv2, ceilometer, murano,
# sahara
RESOURCE_MAP = {
'cinder': 'volumes',
'glance': 'v1/images',
'heat': 'stacks',
'keystone': 'tenants',
'neutron': 'v2.0/networks',
'nova': 'flavors',
}
def check_api(self):
""" Check the status of all the API services.
Yields a list of dict items with 'service', 'status' (either OK,
FAIL or UNKNOWN) and 'region' keys.
"""
catalog = self.service_catalog
for service in catalog:
if service['name'] not in self.RESOURCE_MAP:
self.logger.warning("Don't know how to check service '%s'" %
service['name'])
status = self.UNKNOWN
else:
r = self.get(service['name'],
self.RESOURCE_MAP[service['name']])
if not r or r.status_code < 200 or r.status_code > 299:
status = self.FAIL
else:
status = self.OK
yield {
'service': service['name'],
'status': status,
'region': service['region']
}
def read_callback(self):
for item in self.check_api():
value = collectd.Values(
plugin=PLUGIN_NAME,
plugin_instance=item['service'],
type='gauge',
type_instance=item['region'],
interval=INTERVAL,
# w/a for https://github.com/collectd/collectd/issues/716
meta={'0': True}
)
if item['status'] == self.OK:
value.values = [1]
elif item['status'] == self.FAIL:
value.values = [0]
else:
# skip if status is UNKNOWN
continue
value.dispatch()
plugin = APICheckPlugin(collectd)
def config_callback(conf):
plugin.config_callback(conf)
def read_callback():
plugin.read_callback()
collectd.register_config(config_callback)
collectd.register_read(read_callback, INTERVAL)

View File

@ -0,0 +1,88 @@
#!/usr/bin/python
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Collectd plugin for getting hypervisor statistics from Nova
import collectd
import openstack
PLUGIN_NAME = 'hypervisor_stats'
INTERVAL = 60
class HypervisorStatsPlugin(openstack.CollectdPlugin):
""" Class to report the statistics on Nova hypervisors.
"""
VALUE_MAP = {
'current_workload': 'running_tasks',
'running_vms': 'running_instances',
'local_gb_used': 'used_disk_GB',
'free_disk_gb': 'free_disk_GB',
'memory_mb_used': 'used_ram_MB',
'free_ram_mb': 'free_ram_MB',
'vcpus_used': 'used_vcpus',
}
def config_callback(self, config):
super(HypervisorStatsPlugin, self).config_callback(config)
for node in config.children:
if node.key == 'CpuAllocationRatio':
self.extra_config['cpu_ratio'] = float(node.values[0])
if 'cpu_ratio' not in self.extra_config:
self.logger.warning('CpuAllocationRatio parameter not set')
def dispatch_value(self, hostname, name, value):
v = collectd.Values(
plugin=PLUGIN_NAME,
plugin_instance=hostname,
type='gauge',
type_instance=name,
interval=INTERVAL,
# w/a for https://github.com/collectd/collectd/issues/716
meta={'0': True},
values=[value]
)
v.dispatch()
def read_callback(self):
r = self.get('nova', 'os-hypervisors/detail')
if not r:
self.logger.warning("Could not get hypervisor statistics")
return
for h in r.json().get('hypervisors', []):
# keep only system's hostname
hostname = h['hypervisor_hostname'].split('.')[0]
for k, v in self.VALUE_MAP.iteritems():
self.dispatch_value(hostname,
v, h.get(k, 0))
if 'cpu_ratio' in self.extra_config:
vcpus = int(self.extra_config['cpu_ratio'] * h.get('vcpus', 0))
self.dispatch_value(hostname,
'free_vcpus',
vcpus - h.get('vcpus_used', 0))
plugin = HypervisorStatsPlugin(collectd)
def config_callback(conf):
plugin.config_callback(conf)
def read_callback():
plugin.read_callback()
collectd.register_config(config_callback)
collectd.register_read(read_callback, INTERVAL)
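
For illustration, a worked example of the free_vcpus computation above
with made-up numbers: with the default CpuAllocationRatio of 8.0 (see
lma_collector::params), a hypervisor reporting vcpus=4 and
vcpus_used=10 still has free capacity:

cpu_ratio = 8.0
vcpus, vcpus_used = 4, 10
free_vcpus = int(cpu_ratio * vcpus) - vcpus_used
print(free_vcpus)  # 22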

View File

@ -0,0 +1,186 @@
#!/usr/bin/python
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import dateutil.parser
import dateutil.tz
import requests
import simplejson as json
class OSClient(object):
""" Base class for querying the OpenStack API endpoints.
It uses the Keystone service catalog to discover the API endpoints.
"""
EXPIRATION_TOKEN_DELTA = datetime.timedelta(0, 30)
def __init__(self, username, password, tenant, keystone_url, timeout,
logger):
self.logger = logger
self.username = username
self.password = password
self.tenant_name = tenant
self.keystone_url = keystone_url
self.service_catalog = []
self.tenant_id = None
self.timeout = timeout
self.token = None
self.valid_until = None
self.get_token()
def is_valid_token(self):
now = datetime.datetime.now(tz=dateutil.tz.tzutc())
return self.token and self.valid_until and self.valid_until > now
def clear_token(self):
self.token = None
self.valid_until = None
def get_token(self):
self.clear_token()
data = json.dumps({
"auth":
{
'tenantName': self.tenant_name,
'passwordCredentials':
{
'username': self.username,
'password': self.password
}
}
}
)
self.logger.info("Trying to get token from '%s'" % self.keystone_url)
r = self.make_request(requests.post,
'%s/tokens' % self.keystone_url, data=data,
token_required=False)
if not r:
self.logger.error("Cannot get a valid token from %s" %
self.keystone_url)
return
if r.status_code < 200 or r.status_code > 299:
self.logger.error("%s responded with code %d" %
(self.keystone_url, r.status_code))
return
data = r.json()
self.logger.debug("Got response from Keystone: '%s'" % data)
self.token = data['access']['token']['id']
self.tenant_id = data['access']['token']['tenant']['id']
self.valid_until = dateutil.parser.parse(
data['access']['token']['expires']) - self.EXPIRATION_TOKEN_DELTA
self.service_catalog = []
for item in data['access']['serviceCatalog']:
endpoint = item['endpoints'][0]
self.service_catalog.append({
'name': item['name'],
'region': endpoint['region'],
'service_type': item['type'],
'url': endpoint['internalURL'],
})
self.logger.debug("Got token '%s'" % self.token)
return self.token
def make_request(self, func, url, data=None, token_required=True):
kwargs = {
'url': url,
'timeout': self.timeout,
'headers': {'Content-type': 'application/json'}
}
if token_required and not self.is_valid_token() and \
not self.get_token():
self.logger.error("Aborting request, no valid token")
return
else:
kwargs['headers']['X-Auth-Token'] = self.token
if data is not None:
kwargs['data'] = data
try:
r = func(**kwargs)
r.json()
except Exception as e:
self.logger.error("Got exception for '%s': '%s'" %
(kwargs['url'], e))
return
self.logger.info("%s responded with status code %d" %
(kwargs['url'], r.status_code))
if r.status_code == 401:
# Clear token in case it is revoked or invalid
self.clear_token()
return r
class CollectdPlugin(object):
def __init__(self, logger):
self.os_client = None
self.logger = logger
self.timeout = 5
self.extra_config = {}
def _build_url(self, service, resource):
url = (self.get_service(service) or {}).get('url')
if url:
if url[-1] != '/':
url += '/'
url = "%s%s" % (url, resource)
else:
self.logger.error("Service '%s' not found in catalog" % service)
return url
def get(self, service, resource):
url = self._build_url(service, resource)
if not url:
return
self.logger.info("GET '%s'" % url)
return self.os_client.make_request(requests.get, url)
def post(self, service, resource, data):
url = self._build_url(service, resource)
if not url:
return
self.logger.info("POST '%s'" % url)
return self.os_client.make_request(requests.post, url, data)
@property
def service_catalog(self):
return self.os_client.service_catalog
def get_service(self, service_name):
return next((x for x in self.service_catalog
if x['name'] == service_name), None)
def config_callback(self, config):
for node in config.children:
if node.key == 'Timeout':
self.timeout = int(node.values[0])
elif node.key == 'Username':
username = node.values[0]
elif node.key == 'Password':
password = node.values[0]
elif node.key == 'Tenant':
tenant_name = node.values[0]
elif node.key == 'KeystoneUrl':
keystone_url = node.values[0]
self.os_client = OSClient(username, password, tenant_name,
keystone_url, self.timeout, self.logger)
def read_callback(self):
raise "read_callback method needs to be overriden!"

View File

@ -0,0 +1,211 @@
# Name: rabbitmq-collectd-plugin - rabbitmq_info.py
# Author: https://github.com/phrawzty/rabbitmq-collectd-plugin/commits/master
# Description: This plugin uses Collectd's Python plugin to obtain RabbitMQ
# metrics.
#
# Copyright 2012 Daniel Maher
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import collectd
import subprocess
import re
NAME = 'rabbitmq_info'
# Override in config by specifying 'RmqcBin'.
RABBITMQCTL_BIN = '/usr/sbin/rabbitmqctl'
# Override in config by specifying 'PmapBin'.
PMAP_BIN = '/usr/bin/pmap'
# Override in config by specifying 'PidofBin'.
PIDOF_BIN = '/bin/pidof'
# Override in config by specifying 'PidFile'.
PID_FILE = "/var/run/rabbitmq/pid"
# Override in config by specifying 'Vhost'.
VHOST = "/"
# Override in config by specifying 'Verbose'.
VERBOSE_LOGGING = False
# Used to find disk nodes and running nodes.
CLUSTER_STATUS = re.compile('.*disc,\[([^\]]+)\].*running_nodes,\[([^\]]+)\]',
re.S)
# Obtain the interesting statistical info
def get_stats():
stats = {}
stats['messages'] = 0
stats['memory'] = 0
stats['consumers'] = 0
stats['queues'] = 0
stats['pmap_mapped'] = 0
stats['pmap_used'] = 0
stats['pmap_shared'] = 0
# call rabbitmqctl to get cluster status
try:
p = subprocess.Popen([RABBITMQCTL_BIN, '-q', 'cluster_status'],
shell=False, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
except:
logger('err', '%s: Failed to get status' % RABBITMQCTL_BIN)
return None
# TODO: Need to be modified in case we are using RAM nodes.
status = CLUSTER_STATUS.findall(p.communicate()[0])
if len(status) == 0:
logger('err', '%s: Failed to parse (%s)' % (RABBITMQCTL_BIN, status))
else:
stats['total_nodes'] = len(status[0][0].split(","))
stats['running_nodes'] = len(status[0][1].split(","))
try:
list_connections = subprocess.Popen([RABBITMQCTL_BIN, '-q', 'list_connections'],
shell=False,
stdout=subprocess.PIPE)
stats['connections'] = len(list_connections.communicate()[0].split())
except:
logger('err', '%s: Failed to get the number of connections' % RABBITMQCTL_BIN)
return None
try:
list_exchanges = subprocess.Popen([RABBITMQCTL_BIN, '-q', 'list_exchanges'],
shell=False,
stdout=subprocess.PIPE)
stats['exchanges'] = len(list_exchanges.communicate()[0].split())
except:
logger('err', '%s: Failed to get the number of exchanges' % RABBITMQCTL_BIN)
return None
# call rabbitmqctl
try:
p = subprocess.Popen([RABBITMQCTL_BIN, '-q', '-p', VHOST, 'list_queues',
'name', 'messages', 'memory', 'consumers'],
shell=False,
stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
except:
logger('err', '%s: Failed to get the list of queues' % RABBITMQCTL_BIN)
return None
for line in p.stdout.readlines():
ctl_stats = line.split()
try:
ctl_stats[1] = int(ctl_stats[1])
ctl_stats[2] = int(ctl_stats[2])
ctl_stats[3] = int(ctl_stats[3])
except:
continue
queue_name = ctl_stats[0]
stats['queues'] += 1
stats['messages'] += ctl_stats[1]
stats['memory'] += ctl_stats[2]
stats['consumers'] += ctl_stats[3]
stats['%s.messages' % queue_name] = ctl_stats[1]
stats['%s.memory' % queue_name] = ctl_stats[2]
stats['%s.consumers' % queue_name] = ctl_stats[3]
if stats['memory'] <= 0:
logger('warn', '%s reports 0 memory usage. This is probably incorrect.'
% RABBITMQCTL_BIN)
# get the pid of rabbitmq
try:
with open(PID_FILE, 'r') as f:
pid = f.read().strip()
except:
logger('err', 'Unable to read %s' % PID_FILE)
return None
# use pmap to get proper memory stats
try:
p = subprocess.Popen([PMAP_BIN, '-d', pid], shell=False,
stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
except:
logger('err', 'Failed to run %s' % PMAP_BIN)
return None
line = p.stdout.readlines()[-1].strip()
if re.match('mapped', line):
m = re.match(r"\D+(\d+)\D+(\d+)\D+(\d+)", line)
stats['pmap_mapped'] = int(m.group(1))
stats['pmap_used'] = int(m.group(2))
stats['pmap_shared'] = int(m.group(3))
else:
logger('warn', '%s returned something strange.' % PMAP_BIN)
return None
# Verbose output
logger('verb', '[rmqctl] Messages: %i, Memory: %i, Consumers: %i' %
(stats['messages'], stats['memory'], stats['consumers']))
logger('verb', '[pmap] Mapped: %i, Used: %i, Shared: %i' %
(stats['pmap_mapped'], stats['pmap_used'], stats['pmap_shared']))
return stats
# Config data from collectd
def configure_callback(conf):
global RABBITMQCTL_BIN, PMAP_BIN, PID_FILE, VERBOSE_LOGGING, VHOST
for node in conf.children:
if node.key == 'RmqcBin':
RABBITMQCTL_BIN = node.values[0]
elif node.key == 'PmapBin':
PMAP_BIN = node.values[0]
elif node.key == 'PidFile':
PID_FILE = node.values[0]
elif node.key == 'Verbose':
VERBOSE_LOGGING = bool(node.values[0])
elif node.key == 'Vhost':
VHOST = node.values[0]
else:
logger('warn', 'Unknown config key: %s' % node.key)
# Send info to collectd
def read_callback():
logger('verb', 'read_callback')
info = get_stats()
if not info:
logger('err', 'No information received - very bad.')
return
logger('verb', 'About to trigger the dispatch..')
# send values
for key in info:
logger('verb', 'Dispatching %s : %i' % (key, info[key]))
val = collectd.Values(plugin=NAME)
val.type = 'gauge'
val.type_instance = key
val.values = [int(info[key])]
# w/a for https://github.com/collectd/collectd/issues/716
val.meta = {'0': True}
val.dispatch()
# Send log messages (via collectd)
def logger(t, msg):
if t == 'err':
collectd.error('%s: %s' % (NAME, msg))
if t == 'warn':
collectd.warning('%s: %s' % (NAME, msg))
elif t == 'verb' and VERBOSE_LOGGING is True:
collectd.info('%s: %s' % (NAME, msg))
# Runtime
collectd.register_config(configure_callback)
collectd.warning('Initialising rabbitmq_info')
collectd.register_read(read_callback)
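
The CLUSTER_STATUS pattern above can be sanity-checked on its own; a
small sketch with made-up node names (the rabbitmqctl output shown is
abridged and may vary by RabbitMQ version):

import re

CLUSTER_STATUS = re.compile('.*disc,\[([^\]]+)\].*running_nodes,\[([^\]]+)\]',
                            re.S)
# Abridged `rabbitmqctl -q cluster_status` output (Erlang term syntax):
output = ("[{nodes,[{disc,['rabbit@node-1','rabbit@node-2']}]},\n"
          " {running_nodes,['rabbit@node-2','rabbit@node-1']}]")
status = CLUSTER_STATUS.findall(output)
print(len(status[0][0].split(",")))  # total_nodes   -> 2
print(len(status[0][1].split(",")))  # running_nodes -> 2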

View File

@ -0,0 +1,120 @@
-- Copyright 2015 Mirantis, Inc.
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
require "string"
require "cjson"
local utils = require 'lma_utils'
local sep = '.'
function process_message ()
local ok, samples = pcall(cjson.decode, read_message("Payload"))
if not ok then
-- TODO: log error
return -1
end
for _, sample in ipairs(samples) do
local metric_prefix = sample['type']
if sample['type_instance'] ~= "" then metric_prefix = metric_prefix .. sep .. sample['type_instance'] end
local metric_source = sample['plugin']
for i, value in ipairs(sample['values']) do
local metric_name = metric_prefix
if sample['dsnames'][i] ~= "value" then metric_name = metric_name .. sep .. sample['dsnames'][i] end
local msg = {
Timestamp = sample['time'] * 1e9, -- Heka expects nanoseconds
Hostname = sample['host'],
Logger = "collectd",
Payload = cjson.encode(sample),
Severity = 6,
Type = "metric",
Fields = {
hostname = sample['host'],
interval = sample['interval'],
source = metric_source,
type = sample['dstypes'][i],
value = value,
}
}
-- Normalize metric name, unfortunately collectd plugins aren't
-- always consistent on metric namespaces so we need a few if/else
-- statements to cover all cases.
if metric_source == 'df' then
local mount = sample['plugin_instance']
local entity
if sample['type'] == 'df_inodes' then
entity = 'inodes'
else -- sample['type'] == 'df_complex'
entity = 'space'
end
msg['Fields']['name'] = 'fs' .. sep .. mount .. sep .. entity .. sep .. sample['type_instance']
msg['Fields']['device'] = '/' .. string.gsub(mount, '-', '/')
elseif metric_source == 'disk' then
msg['Fields']['device'] = sample['plugin_instance']
msg['Fields']['name'] = 'disk' .. sep .. sample['plugin_instance'] .. sep .. metric_name
elseif metric_source == 'cpu' then
msg['Fields']['device'] = 'cpu' .. sample['plugin_instance']
msg['Fields']['name'] = 'cpu' .. sep .. sample['plugin_instance'] .. sep .. sample['type_instance']
elseif metric_source == 'interface' then
msg['Fields']['device'] = sample['plugin_instance']
msg['Fields']['name'] = 'net' .. sep .. sample['plugin_instance'] .. sep .. sample['type'] .. sep .. sample['dsnames'][i]
elseif metric_source == 'processes' then
if sample['type'] == 'ps_state' then
msg['Fields']['name'] = 'processes' .. sep .. 'state' .. sep .. sample['type_instance']
else
msg['Fields']['name'] = 'processes' .. sep .. sample['type']
end
elseif metric_source == 'mysql' then
if sample['type'] == 'threads' then
msg['Fields']['name'] = 'mysql_' .. metric_name
else
msg['Fields']['name'] = metric_name
end
elseif metric_source == 'check_openstack_api' then
-- OpenStack API metrics
-- 'plugin_instance' = <service name>
msg['Fields']['name'] = 'openstack' .. sep .. sample['plugin_instance'] .. sep .. 'check_api'
if sample['type_instance'] ~= nil and sample['type_instance'] ~= '' then
msg['Fields']['os_region'] = sample['type_instance']
end
elseif metric_source == 'hypervisor_stats' then
-- OpenStack hypervisor metrics
-- 'plugin_instance' = <hostname>
-- 'type_instance' = <metric name> which can end by _MB or _GB
msg['Fields']['hostname'] = sample['plugin_instance']
msg['Fields']['name'] = 'openstack' .. sep .. 'nova' .. sep
local name, unit
name, unit = string.match(sample['type_instance'], '^(.+)_(.B)$')
if name then
msg['Fields']['name'] = msg['Fields']['name'] .. name
msg.Fields['value'] = {value = msg.Fields['value'], representation = unit}
else
msg['Fields']['name'] = msg['Fields']['name'] .. sample['type_instance']
end
elseif metric_source == 'rabbitmq_info' then
msg['Fields']['name'] = 'rabbitmq' .. sep .. sample['type_instance']
else
msg['Fields']['name'] = metric_name
end
utils.inject_tags(msg)
inject_message(msg)
end
end
return 0
end
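
To illustrate the df normalization above with one concrete case (a
made-up sample, sketched in Python rather than Lua):

# A df_complex sample as emitted by collectd's df plugin (illustrative):
sample = {
    'plugin': 'df', 'plugin_instance': 'var-log',
    'type': 'df_complex', 'type_instance': 'used',
}
# The decoder maps it to a 'fs.<mount>.space.<state>' metric and
# recovers the mount point from the plugin instance:
name = 'fs.%s.space.%s' % (sample['plugin_instance'], sample['type_instance'])
device = '/' + sample['plugin_instance'].replace('-', '/')
print(name)    # fs.var-log.space.used
print(device)  # /var/log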

View File

@ -0,0 +1,57 @@
-- Copyright 2015 Mirantis, Inc.
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
require "cjson"
require "string"
require "os"
local last_flush = os.time()
local datapoints = {}
local flush_count = read_config('flush_count') or 100
local flush_interval = read_config('flush_interval') or 5
function flush ()
local now = os.time()
if #datapoints > 0 and (#datapoints > flush_count or now - last_flush > flush_interval) then
inject_payload("json", "influxdb", cjson.encode(datapoints))
datapoints = {}
last_flush = now
end
end
function process_message ()
local ts = read_message("Timestamp") / 1e6 -- InfluxDB defaults to ms
local hostname = read_message("Fields[hostname]") or read_message("Hostname")
local metric_name = read_message("Fields[name]")
local value = read_message("Fields[value]")
if value == nil and metric_name == nil then
return -1
end
local serie_name = string.format('%s.%s', hostname, metric_name)
datapoints[#datapoints+1] = {
name = serie_name,
columns = {"time", "value"},
points = {{ts, value}}
}
flush()
return 0
end
function timer_event(ns)
flush()
end
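
The filter batches datapoints in InfluxDB's 0.8 series JSON format and
flushes on size or age. The same logic, sketched in Python with the
default thresholds from above:

import json
import time

flush_count = 100    # same defaults as the Lua filter
flush_interval = 5
datapoints = []
last_flush = time.time()

def add_point(hostname, metric_name, ts_ms, value):
    global last_flush
    datapoints.append({
        'name': '%s.%s' % (hostname, metric_name),
        'columns': ['time', 'value'],
        'points': [[ts_ms, value]],
    })
    now = time.time()
    if datapoints and (len(datapoints) > flush_count or
                       now - last_flush > flush_interval):
        payload = json.dumps(datapoints)  # handed over to the output
        del datapoints[:]
        last_flush = now
        return payload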

View File

@ -0,0 +1,79 @@
class lma_collector::collectd::base {
include lma_collector::params
include lma_collector::service
$port = $lma_collector::params::collectd_port
class { '::collectd':
purge => true,
recurse => true,
purge_config => true,
fqdnlookup => false,
interval => $lma_collector::params::collectd_interval,
}
class { 'collectd::plugin::logfile':
log_level => 'warning',
log_file => $lma_collector::params::collectd_logfile,
}
class { 'collectd::plugin::write_http':
urls => {
"http://127.0.0.1:${port}" => {
'format' => 'JSON',
storerates => true
}
}
}
class { 'collectd::plugin::cpu':
}
# TODO: pass this list as a parameter or add a custom fact
class { 'collectd::plugin::df':
mountpoints => ['/', '/boot'],
}
$block_devices = join(split($::blockdevices, ','), '|')
class { 'collectd::plugin::disk':
disks => [ "/^${ block_devices }$/" ],
}
class { 'collectd::plugin::interface':
interfaces => grep(split($::interfaces, ','), '^eth\d+$')
}
class { 'collectd::plugin::load':
}
class { 'collectd::plugin::memory':
}
class { 'collectd::plugin::processes':
}
class { 'collectd::plugin::swap':
}
class { 'collectd::plugin::users':
}
file { "/etc/logrotate.d/collectd":
ensure => present,
content => "${lma_collector::params::collectd_logfile} {\n daily\n missingok\n}"
}
heka::decoder::sandbox { 'collectd':
config_dir => $lma_collector::params::config_dir,
filename => "${lma_collector::params::plugins_dir}/decoders/collectd.lua" ,
notify => Class['lma_collector::service'],
}
heka::input::httplisten { 'collectd':
config_dir => $lma_collector::params::config_dir,
address => '127.0.0.1',
port => $port,
decoder => 'collectd',
require => Heka::Decoder::Sandbox['collectd'],
notify => Class['lma_collector::service'],
}
}

View File

@ -0,0 +1,55 @@
class lma_collector::collectd::controller (
$service_user = $lma_collector::params::openstack_user,
$service_password = $lma_collector::params::openstack_password,
$service_tenant = $lma_collector::params::openstack_tenant,
$keystone_url = $lma_collector::params::keystone_url,
$nova_cpu_allocation_ratio = $lma_collector::params::nova_cpu_allocation_ratio,
$rabbitmq_pid_file = $lma_collector::params::rabbitmq_pid_file,
) inherits lma_collector::params {
include collectd::params
include lma_collector::collectd::service
# We can't use the collectd::plugin::python type here because it doesn't
# support the configuration of multiple Python plugins yet.
# See https://github.com/pdxcat/puppet-module-collectd/issues/227
$modules = {
'rabbitmq_info' => {
'PidFile' => $rabbitmq_pid_file,
},
'check_openstack_api' => {
'Username' => $service_user,
'Password' => $service_password,
'Tenant' => $service_tenant,
'KeystoneUrl' => $keystone_url,
'Timeout' => $lma_collector::params::openstack_client_timeout,
},
'hypervisor_stats' => {
'Username' => $service_user,
'Password' => $service_password,
'Tenant' => $service_tenant,
'KeystoneUrl' => $keystone_url,
'Timeout' => $lma_collector::params::openstack_client_timeout,
'CpuAllocationRatio' => $nova_cpu_allocation_ratio,
},
}
file {"${collectd::params::plugin_conf_dir}/openstack.conf":
owner => 'root',
group => $collectd::params::root_group,
mode => '0644',
content => template('lma_collector/collectd_python.conf.erb'),
notify => Class['lma_collector::collectd::service'],
}
lma_collector::collectd::python_script { "rabbitmq_info.py":
}
lma_collector::collectd::python_script { "check_openstack_api.py":
}
lma_collector::collectd::python_script { "hypervisor_stats.py":
}
lma_collector::collectd::python_script { "openstack.py":
}
}

View File

@ -0,0 +1,13 @@
class lma_collector::collectd::mysql (
$username = $lma_collector::params::mysql_username,
$password = $lma_collector::params::mysql_password,
) inherits lma_collector::params {
include lma_collector::collectd::service
collectd::plugin::mysql::database { 'openstack':
host => 'localhost',
username => $username,
password => $password,
notify => Class['lma_collector::collectd::service'],
}
}

View File

@ -0,0 +1,15 @@
define lma_collector::collectd::python_script {
include collectd::params
include lma_collector::params
include lma_collector::collectd::service
$python_module_path = $lma_collector::params::python_module_path
file { "${python_module_path}/${title}":
owner => 'root',
group => $collectd::params::root_group,
mode => '0644',
source => "puppet:///modules/lma_collector/collectd/${title}",
notify => Class['lma_collector::collectd::service'],
}
}

View File

@ -0,0 +1,39 @@
# Class: lma_collector::collectd::service
#
# Manages the collectd daemon
#
# Sample Usage:
#
# sometype { 'foo':
# notify => Class['lma_collector::collectd::service'],
# }
#
#
class lma_collector::collectd::service (
$service_enable = true,
$service_ensure = 'running',
$service_manage = true,
) {
include collectd::params
validate_bool($service_enable)
validate_bool($service_manage)
case $service_ensure {
true, false, 'running', 'stopped': {
$_service_ensure = $service_ensure
}
default: {
$_service_ensure = undef
}
}
if $service_manage {
service { 'collectd':
ensure => $_service_ensure,
name => $collectd::params::service_name,
enable => $service_enable,
}
}
}

View File

@ -0,0 +1,39 @@
class lma_collector::influxdb (
$server = $lma_collector::params::influxdb_server,
$port = $lma_collector::params::influxdb_port,
$database = $lma_collector::params::influxdb_database,
$user = $lma_collector::params::influxdb_user,
$password = $lma_collector::params::influxdb_password,
) inherits lma_collector::params {
include lma_collector::service
validate_string($server)
heka::filter::sandbox { 'influxdb_accumulator':
config_dir => $lma_collector::params::config_dir,
filename => "${lma_collector::params::plugins_dir}/filters/influxdb_accumulator.lua",
message_matcher => "Type == 'metric' || Type == 'heka.sandbox.metric'",
ticker_interval => 1,
config => {
flush_interval => $lma_collector::params::influxdb_flush_interval,
flush_count => $lma_collector::params::influxdb_flush_count,
},
notify => Class['lma_collector::service'],
}
heka::encoder::payload { 'influxdb':
config_dir => $lma_collector::params::config_dir,
notify => Class['lma_collector::service'],
}
heka::output::http { 'influxdb':
config_dir => $lma_collector::params::config_dir,
url => "http://${server}:${port}/db/${database}/series",
message_matcher => "Fields[payload_type] == 'json' && Fields[payload_name] == 'influxdb'",
username => $user,
password => $password,
timeout => $lma_collector::params::influxdb_timeout,
require => [Heka::Encoder::Payload['influxdb'], Heka::Filter::Sandbox['influxdb_accumulator']],
notify => Class['lma_collector::service'],
}
}
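
The resulting HttpOutput POSTs the accumulated JSON to InfluxDB's
0.8-style series endpoint. The equivalent request, sketched with a
made-up server address and the default credentials from
lma_collector::params:

import requests

url = 'http://192.168.0.3:8086/db/lma/series'
payload = ('[{"name": "node-1.cpu.0.idle",'
           ' "columns": ["time", "value"],'
           ' "points": [[1425995173000, 99.5]]}]')
r = requests.post(url, data=payload, timeout=5, auth=('lma', 'lmapass'))
print(r.status_code)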

View File

@ -77,4 +77,10 @@ class lma_collector (
notify => Class['lma_collector::service'],
require => File[$plugins_dir]
}
if size($lma_collector::params::additional_packages) > 0 {
package { $lma_collector::params::additional_packages:
ensure => present,
}
}
}

View File

@ -30,6 +30,36 @@ class lma_collector::params {
$openstack_topic = 'notifications'
$notification_driver = 'messaging'
# collectd parameters
$collectd_port = "8325"
$collectd_interval = 10
$collectd_logfile = "/var/log/collectd.log"
case $::osfamily {
'Debian': {
$python_module_path = '/usr/lib/collectd'
}
'RedHat': {
$python_module_path = '/usr/lib64/collectd'
}
}
$additional_packages = [ 'python-dateutil' ]
$mysql_username = ''
$mysql_password = ''
$rabbitmq_pid_file = '/var/run/rabbitmq/pid'
$openstack_user = ''
$openstack_password = ''
$openstack_tenant = ''
$openstack_url = "http://127.0.0.1:5000/v2.0/"
$openstack_client_timeout = 5
$nova_cpu_allocation_ratio = 8.0
$elasticsearch_server = false
$elasticsearch_port = '9200'
$influxdb_server = false
$influxdb_port = '8086'
$influxdb_database = 'lma'
$influxdb_user = 'lma'
$influxdb_password = 'lmapass'
$influxdb_timeout = 5
}

View File

@ -0,0 +1,21 @@
<LoadPlugin "python">
Globals true
</LoadPlugin>
<Plugin "python">
ModulePath "<%= @python_module_path %>"
<% @modules.sort.each do |module_name,config| -%>
Import "<%= module_name %>"
<% end -%>
<% @modules.sort.each do |module_name,config| -%>
<Module "<%= module_name %>">
Verbose false
<% config.sort.each do |key,value| -%>
<%= key -%> "<%= value -%>"
<% end -%>
</Module>
<% end -%>
</Plugin>

View File

@ -41,7 +41,7 @@ attributes:
restrictions:
- condition: "settings:lma_collector.elasticsearch_mode.value != 'remote'"
action: "disable"
regex: &node_address_regex
source: '^[a-zA-Z\d][a-zA-Z\d_\-.]+$'
error: "Invalid address or name"
@ -50,3 +50,65 @@ attributes:
weight: 50
value: false
label: "Collect OpenStack notifications"
influxdb_mode:
type: "radio"
weight: 60
value: "disabled"
label: "Metric analytics"
values:
- data: "disabled"
label: "Disabled"
- data: "local"
label: "Local node"
- data: "remote"
label: "Remote server"
# Hide all InfluxDB parameters for now
# Change action from "hide" to "none" to change this
restrictions: &hide_control
- condition: true
action: "hide"
influxdb_node_name:
value: 'influxdb'
label: "InfluxDB node's name"
description: 'Label of the node running the InfluxDB/Grafana plugin that is deployed in the environment.'
weight: 65
type: "text"
restrictions: *hide_control
influxdb_address:
value: ''
label: 'InfluxDB address'
description: 'IP address or fully qualified domain name of the InfluxDB server.'
weight: 70
type: "text"
regex: *node_address_regex
restrictions: *hide_control
influxdb_database:
value: 'lma'
label: 'InfluxDB database name'
description: ''
weight: 75
type: "text"
regex: &not_empty_parameter
source: '\S'
error: "Invalid value"
restrictions: *hide_control
influxdb_user:
value: 'lma'
label: 'InfluxDB user'
description: ''
weight: 80
type: "text"
regex: *not_empty_parameter
restrictions: *hide_control
influxdb_password:
value: 'lmapass'
label: 'InfluxDB password'
description: ''
weight: 85
type: "password"
regex: *not_empty_parameter
restrictions: *hide_control