Add the monitoring of Heka messages

This change adds information about the number of messages that are
processed by every decoder and filter of the LMA collector and also
the average time it takes to process these messages.

Change-Id: Iad122336fe8fcb6fa3f4d8ed666b276f6db70a43
This commit is contained in:
Guillaume Thouvenin 2015-07-03 11:47:18 +02:00
parent b1bb4c49df
commit becf4b5ebc
8 changed files with 206 additions and 17 deletions

View File

@ -176,6 +176,11 @@ case $influxdb_mode {
password => $influxdb_password,
require => Class['lma_collector'],
}
class { 'lma_collector::metrics::heka_monitoring':
require => Class['lma_collector']
}
}
'disabled': {
# Nothing to do

View File

@ -0,0 +1,32 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
define heka::output::dashboard (
$config_dir,
$dashboard_address = '0.0.0.0',
$dashboard_port = '4352',
$ticker_interval = '30',
) {
include heka::params
file { "${config_dir}/output-${title}.toml":
ensure => $ensure,
content => template('heka/output/dashboard.toml.erb'),
mode => '0600',
owner => $heka::params::user,
group => $heka::params::user,
}
}

View File

@ -1,3 +1,3 @@
[DashboardOutput]
address = "<%= @dashboard_address %>:<%= @dashboard_port %>"
ticker_interval = <%= @ticker_interval %>

View File

@ -18,6 +18,19 @@ local utils = require 'lma_utils'
local sep = '.'
local processes_map = {
ps_code = 'memory.code',
ps_count = 'threads',
ps_cputime = 'cputime',
ps_data = 'memory.data',
ps_disk_octets = 'disk.bytes',
ps_disk_ops = 'disk.ops',
ps_pagefaults = 'pagefaults',
ps_rss = 'memory.rss',
ps_stacksize = 'stacksize',
ps_vm = 'memory.virtual',
}
function process_message ()
local ok, samples = pcall(cjson.decode, read_message("Payload"))
if not ok then
@ -76,13 +89,20 @@ function process_message ()
elseif metric_source == 'processes' then
if sample['plugin_instance'] == '' then
msg['Fields']['name'] = 'processes'
if sample['type'] == 'ps_state' then
msg['Fields']['name'] = msg['Fields']['name'] .. sep .. 'state' .. sep .. sample['type_instance']
else
msg['Fields']['name'] = msg['Fields']['name'] .. sep .. sample['type']
end
else
msg['Fields']['name'] = 'lma_components' .. sep .. sample['plugin_instance']
end
if sample['type'] == 'ps_state' then
msg['Fields']['name'] = msg['Fields']['name'] .. sep .. 'state' .. sep .. sample['type_instance']
else
msg['Fields']['name'] = msg['Fields']['name'] .. sep .. sample['type']
if processes_map[sample['type']] then
msg['Fields']['name'] = msg['Fields']['name'] .. sep .. processes_map[sample['type']]
else
-- If we are here then we need to add the missing value in processes_map
-- to fix it.
msg['Fields']['name'] = msg['Fields']['name'] .. sep .. sample['type']
end
end
elseif metric_source == 'dbi' and sample['plugin_instance'] == 'mysql_status' then
msg['Fields']['name'] = 'mysql' .. sep .. sample['type_instance']

View File

@ -0,0 +1,80 @@
-- Copyright 2015 Mirantis, Inc.
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
require 'cjson'
require 'string'
require 'math'
local utils = require 'lma_utils'
function process_table(datapoints, timestamp, hostname, kind, array)
-- NOTE: It has been written for "filters" and "decoders". If we need
-- to use it to process other part of the Heka pipeline we need to ensure
-- that JSON provides names and table with ProcessMessageCount and
-- ProcessMessageAvgDuration:
-- "decoder": {
-- ...
-- },
-- "Name": "a name",
-- "ProcessMessageCount" : {
-- "representation": "count",
-- "value": 12
-- },
-- "ProcessMessageAvgDuration" : {
-- "representation": "ns",
-- "value": 192913
-- },
-- { ... }}
for _, v in pairs(array) do
if type(v) == "table" then
name = v['Name']:gsub("_" .. kind, "")
msgCount = v['ProcessMessageCount']['value']
avgDuration = v['ProcessMessageAvgDuration']['value']
utils.add_metric(datapoints,
string.format('%s.lma_components.hekad.%s.%s.count', hostname, kind, name),
{timestamp, msgCount})
utils.add_metric(datapoints,
string.format('%s.lma_components.hekad.%s.%s.duration', hostname, kind, name),
{timestamp, avgDuration})
end
end
end
function process_message ()
local ok, json = pcall(cjson.decode, read_message("Payload"))
if not ok then
return -1
end
local hostname = read_message("Hostname")
local ts = read_message("Timestamp")
local ts_ms = math.floor(ts/1e6)
local datapoints = {}
for k, v in pairs(json) do
if k == "filters" or k == "decoders" then
-- remove the last character from k
process_table(datapoints, ts_ms, hostname, k:sub(1, -2), v)
end
end
if #datapoints > 0 then
inject_payload("json", "influxdb", cjson.encode(datapoints))
return 0
end
-- We should not reach this point
return -1
end

View File

@ -0,0 +1,34 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
class lma_collector::metrics::heka_monitoring (
$dashboard_address = $lma_collector::params::dashboard_address,
$dashboard_port = $lma_collector::params::dashboard_port,
){
include lma_collector::service
heka::filter::sandbox { 'heka_monitoring':
config_dir => $lma_collector::params::config_dir,
filename => "${lma_collector::params::plugins_dir}/filters/heka_monitoring.lua",
message_matcher => "Type == 'heka.all-report'",
notify => Class['lma_collector::service'],
}
# Dashboard is required to enable monitoring messages
heka::output::dashboard { 'dashboard':
config_dir => $lma_collector::params::config_dir,
dashboard_address => $dashboard_address,
dashboard_port => $dashboard_port,
}
}

View File

@ -17,6 +17,10 @@ class lma_collector::params {
$config_dir = "/etc/${service_name}"
$plugins_dir = "/usr/share/${service_name}"
# Address and port of the Heka dashboard for health reports.
$dashboard_address = '127.0.0.1'
$dashboard_port = '4352'
$tags = {}
$syslog_pattern = '<%PRI%>%TIMESTAMP% %HOSTNAME% %syslogtag%%msg:::sp-if-no-1st-sp%%msg%\n'

View File

@ -1,15 +1,29 @@
.. _LMA_self-monitoring:
* ``lma_components.<process name>.ps_code``, physical memory devoted to executable code (bytes).
* ``lma_components.<process name>.ps_count``, number of threads currently running.
* ``lma_components.<process name>.ps_cputime``, time that this process has been scheduled in user/system mode in the last interval (in microseconds).
* ``lma_components.<process name>.ps_data``, physical memory devoted to other than executable code (bytes).
* ``lma_components.<process name>.ps_disk_octets``, number of bytes the task has caused to be read or written from storage in the last interval.
* ``lma_components.<process name>.ps_disk_ops``, number of read and write I/O operations in the last interval, i.e. syscalls like read(), pread().
* ``lma_components.<process name>.ps_pagefaults``, minor and major page faults in the last interval.
* ``lma_components.<process name>.ps_rss``, non-swapped physical memory used (bytes).
* ``lma_components.<process name>.ps_stacksize``, absolute value of the address of the start (i.e., bottom) of the stack minus the current value of the stack pointer.
* ``lma_components.<process name>.ps_vm``, virtual memory size (bytes).
Processes and memory
^^^^^^^^^^^^^^^^^^^^
Where ``<process name>`` is *hekad*, *collectd*, *influxdb* or *elasticsearch*
* ``lma_components.<service>.cputime``, time that this process has been scheduled in user/system mode in the last interval (in microseconds).
* ``lma_components.<service>.disk.bytes``, number of bytes the task has caused to be read or written from storage in the last interval.
* ``lma_components.<service>.disk.ops``, number of read and write I/O operations in the last interval, i.e. syscalls like read(), pread().
* ``lma_components.<service>.memory.code``, physical memory devoted to executable code (bytes).
* ``lma_components.<service>.memory.data``, physical memory devoted to other than executable code (bytes).
* ``lma_components.<service>.memory.rss``, non-swapped physical memory used (bytes).
* ``lma_components.<service>.memory.vm``, virtual memory size (bytes).
* ``lma_components.<service>.pagefaults``, minor and major page faults in the last interval.
* ``lma_components.<service>.stacksize``, absolute value of the address of the start (i.e., bottom) of the stack minus the current value of the stack pointer.
* ``lma_components.<service>.threads``, number of threads currently running.
Where ``<service>`` is *hekad*, *collectd*, *influxdb* or *elasticsearch*
depending of what is running on the node.
Heka pipeline
^^^^^^^^^^^^^
* ``lma_components.hekad.decoder.<name>.count``, the total number of messages processed by the decoder. This will reset to 0 when the process is restarted.
* ``lma_components.hekad.decoder.<name>.duration``, the average time for processing the message (in nanoseconds).
* ``lma_components.hekad.filter.<name>.count``, the total number of messages processed by the filter. This will reset to 0 when the process is restarted.
* ``lma_components.hekad.filter.<name>.duration``, the average time for processing the message (in nanoseconds).
``<name>`` is the internal name of the decoder or the filter used by *Heka*.