diff --git a/deployment_scripts/puppet/manifests/base.pp b/deployment_scripts/puppet/manifests/base.pp index f5b6d3aa5..2301948f9 100644 --- a/deployment_scripts/puppet/manifests/base.pp +++ b/deployment_scripts/puppet/manifests/base.pp @@ -176,6 +176,11 @@ case $influxdb_mode { password => $influxdb_password, require => Class['lma_collector'], } + + class { 'lma_collector::metrics::heka_monitoring': + require => Class['lma_collector'] + } + } 'disabled': { # Nothing to do diff --git a/deployment_scripts/puppet/modules/heka/manifests/output/dashboard.pp b/deployment_scripts/puppet/modules/heka/manifests/output/dashboard.pp new file mode 100644 index 000000000..5b5d96451 --- /dev/null +++ b/deployment_scripts/puppet/modules/heka/manifests/output/dashboard.pp @@ -0,0 +1,32 @@ +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +define heka::output::dashboard ( + $config_dir, + $dashboard_address = '0.0.0.0', + $dashboard_port = '4352', + $ticker_interval = '30', +) { + + include heka::params + + file { "${config_dir}/output-${title}.toml": + ensure => $ensure, + content => template('heka/output/dashboard.toml.erb'), + mode => '0600', + owner => $heka::params::user, + group => $heka::params::user, + } +} + diff --git a/deployment_scripts/puppet/modules/heka/templates/output/dashboard.toml.erb b/deployment_scripts/puppet/modules/heka/templates/output/dashboard.toml.erb index 4584f7794..3f45495b7 100644 --- a/deployment_scripts/puppet/modules/heka/templates/output/dashboard.toml.erb +++ b/deployment_scripts/puppet/modules/heka/templates/output/dashboard.toml.erb @@ -1,3 +1,3 @@ [DashboardOutput] address = "<%= @dashboard_address %>:<%= @dashboard_port %>" - +ticker_interval = <%= @ticker_interval %> diff --git a/deployment_scripts/puppet/modules/lma_collector/files/plugins/decoders/collectd.lua b/deployment_scripts/puppet/modules/lma_collector/files/plugins/decoders/collectd.lua index 72703b9f7..023038e58 100644 --- a/deployment_scripts/puppet/modules/lma_collector/files/plugins/decoders/collectd.lua +++ b/deployment_scripts/puppet/modules/lma_collector/files/plugins/decoders/collectd.lua @@ -18,6 +18,19 @@ local utils = require 'lma_utils' local sep = '.' +local processes_map = { + ps_code = 'memory.code', + ps_count = 'threads', + ps_cputime = 'cputime', + ps_data = 'memory.data', + ps_disk_octets = 'disk.bytes', + ps_disk_ops = 'disk.ops', + ps_pagefaults = 'pagefaults', + ps_rss = 'memory.rss', + ps_stacksize = 'stacksize', + ps_vm = 'memory.virtual', +} + function process_message () local ok, samples = pcall(cjson.decode, read_message("Payload")) if not ok then @@ -76,13 +89,20 @@ function process_message () elseif metric_source == 'processes' then if sample['plugin_instance'] == '' then msg['Fields']['name'] = 'processes' + if sample['type'] == 'ps_state' then + msg['Fields']['name'] = msg['Fields']['name'] .. sep .. 'state' .. sep .. sample['type_instance'] + else + msg['Fields']['name'] = msg['Fields']['name'] .. sep .. sample['type'] + end else msg['Fields']['name'] = 'lma_components' .. sep .. sample['plugin_instance'] - end - if sample['type'] == 'ps_state' then - msg['Fields']['name'] = msg['Fields']['name'] .. sep .. 'state' .. sep .. sample['type_instance'] - else - msg['Fields']['name'] = msg['Fields']['name'] .. sep .. sample['type'] + if processes_map[sample['type']] then + msg['Fields']['name'] = msg['Fields']['name'] .. sep .. processes_map[sample['type']] + else + -- If we are here then we need to add the missing value in processes_map + -- to fix it. + msg['Fields']['name'] = msg['Fields']['name'] .. sep .. sample['type'] + end end elseif metric_source == 'dbi' and sample['plugin_instance'] == 'mysql_status' then msg['Fields']['name'] = 'mysql' .. sep .. sample['type_instance'] diff --git a/deployment_scripts/puppet/modules/lma_collector/files/plugins/filters/heka_monitoring.lua b/deployment_scripts/puppet/modules/lma_collector/files/plugins/filters/heka_monitoring.lua new file mode 100644 index 000000000..282429e3a --- /dev/null +++ b/deployment_scripts/puppet/modules/lma_collector/files/plugins/filters/heka_monitoring.lua @@ -0,0 +1,80 @@ +-- Copyright 2015 Mirantis, Inc. +-- +-- Licensed under the Apache License, Version 2.0 (the "License"); +-- you may not use this file except in compliance with the License. +-- You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +require 'cjson' +require 'string' +require 'math' +local utils = require 'lma_utils' + +function process_table(datapoints, timestamp, hostname, kind, array) + -- NOTE: It has been written for "filters" and "decoders". If we need + -- to use it to process other part of the Heka pipeline we need to ensure + -- that JSON provides names and table with ProcessMessageCount and + -- ProcessMessageAvgDuration: + -- "decoder": { + -- ... + -- }, + -- "Name": "a name", + -- "ProcessMessageCount" : { + -- "representation": "count", + -- "value": 12 + -- }, + -- "ProcessMessageAvgDuration" : { + -- "representation": "ns", + -- "value": 192913 + -- }, + -- { ... }} + for _, v in pairs(array) do + if type(v) == "table" then + + name = v['Name']:gsub("_" .. kind, "") + msgCount = v['ProcessMessageCount']['value'] + avgDuration = v['ProcessMessageAvgDuration']['value'] + + utils.add_metric(datapoints, + string.format('%s.lma_components.hekad.%s.%s.count', hostname, kind, name), + {timestamp, msgCount}) + utils.add_metric(datapoints, + string.format('%s.lma_components.hekad.%s.%s.duration', hostname, kind, name), + {timestamp, avgDuration}) + end + end +end + +function process_message () + local ok, json = pcall(cjson.decode, read_message("Payload")) + if not ok then + return -1 + end + + local hostname = read_message("Hostname") + local ts = read_message("Timestamp") + local ts_ms = math.floor(ts/1e6) + local datapoints = {} + + for k, v in pairs(json) do + if k == "filters" or k == "decoders" then + -- remove the last character from k + process_table(datapoints, ts_ms, hostname, k:sub(1, -2), v) + end + end + + if #datapoints > 0 then + inject_payload("json", "influxdb", cjson.encode(datapoints)) + return 0 + end + + -- We should not reach this point + return -1 + +end diff --git a/deployment_scripts/puppet/modules/lma_collector/manifests/metrics/heka_monitoring.pp b/deployment_scripts/puppet/modules/lma_collector/manifests/metrics/heka_monitoring.pp new file mode 100644 index 000000000..7ad49c9f7 --- /dev/null +++ b/deployment_scripts/puppet/modules/lma_collector/manifests/metrics/heka_monitoring.pp @@ -0,0 +1,34 @@ +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +class lma_collector::metrics::heka_monitoring ( + $dashboard_address = $lma_collector::params::dashboard_address, + $dashboard_port = $lma_collector::params::dashboard_port, +){ + include lma_collector::service + + heka::filter::sandbox { 'heka_monitoring': + config_dir => $lma_collector::params::config_dir, + filename => "${lma_collector::params::plugins_dir}/filters/heka_monitoring.lua", + message_matcher => "Type == 'heka.all-report'", + notify => Class['lma_collector::service'], + } + + # Dashboard is required to enable monitoring messages + heka::output::dashboard { 'dashboard': + config_dir => $lma_collector::params::config_dir, + dashboard_address => $dashboard_address, + dashboard_port => $dashboard_port, + } +} diff --git a/deployment_scripts/puppet/modules/lma_collector/manifests/params.pp b/deployment_scripts/puppet/modules/lma_collector/manifests/params.pp index 952cf9f04..a98416508 100644 --- a/deployment_scripts/puppet/modules/lma_collector/manifests/params.pp +++ b/deployment_scripts/puppet/modules/lma_collector/manifests/params.pp @@ -17,6 +17,10 @@ class lma_collector::params { $config_dir = "/etc/${service_name}" $plugins_dir = "/usr/share/${service_name}" + # Address and port of the Heka dashboard for health reports. + $dashboard_address = '127.0.0.1' + $dashboard_port = '4352' + $tags = {} $syslog_pattern = '<%PRI%>%TIMESTAMP% %HOSTNAME% %syslogtag%%msg:::sp-if-no-1st-sp%%msg%\n' diff --git a/doc/source/metrics/lma.rst b/doc/source/metrics/lma.rst index c4ef4504e..25904fd1c 100644 --- a/doc/source/metrics/lma.rst +++ b/doc/source/metrics/lma.rst @@ -1,15 +1,29 @@ .. _LMA_self-monitoring: -* ``lma_components..ps_code``, physical memory devoted to executable code (bytes). -* ``lma_components..ps_count``, number of threads currently running. -* ``lma_components..ps_cputime``, time that this process has been scheduled in user/system mode in the last interval (in microseconds). -* ``lma_components..ps_data``, physical memory devoted to other than executable code (bytes). -* ``lma_components..ps_disk_octets``, number of bytes the task has caused to be read or written from storage in the last interval. -* ``lma_components..ps_disk_ops``, number of read and write I/O operations in the last interval, i.e. syscalls like read(), pread(). -* ``lma_components..ps_pagefaults``, minor and major page faults in the last interval. -* ``lma_components..ps_rss``, non-swapped physical memory used (bytes). -* ``lma_components..ps_stacksize``, absolute value of the address of the start (i.e., bottom) of the stack minus the current value of the stack pointer. -* ``lma_components..ps_vm``, virtual memory size (bytes). +Processes and memory +^^^^^^^^^^^^^^^^^^^^ -Where ```` is *hekad*, *collectd*, *influxdb* or *elasticsearch* +* ``lma_components..cputime``, time that this process has been scheduled in user/system mode in the last interval (in microseconds). +* ``lma_components..disk.bytes``, number of bytes the task has caused to be read or written from storage in the last interval. +* ``lma_components..disk.ops``, number of read and write I/O operations in the last interval, i.e. syscalls like read(), pread(). +* ``lma_components..memory.code``, physical memory devoted to executable code (bytes). +* ``lma_components..memory.data``, physical memory devoted to other than executable code (bytes). +* ``lma_components..memory.rss``, non-swapped physical memory used (bytes). +* ``lma_components..memory.vm``, virtual memory size (bytes). +* ``lma_components..pagefaults``, minor and major page faults in the last interval. +* ``lma_components..stacksize``, absolute value of the address of the start (i.e., bottom) of the stack minus the current value of the stack pointer. +* ``lma_components..threads``, number of threads currently running. + +Where ```` is *hekad*, *collectd*, *influxdb* or *elasticsearch* depending of what is running on the node. + + +Heka pipeline +^^^^^^^^^^^^^ + +* ``lma_components.hekad.decoder..count``, the total number of messages processed by the decoder. This will reset to 0 when the process is restarted. +* ``lma_components.hekad.decoder..duration``, the average time for processing the message (in nanoseconds). +* ``lma_components.hekad.filter..count``, the total number of messages processed by the filter. This will reset to 0 when the process is restarted. +* ``lma_components.hekad.filter..duration``, the average time for processing the message (in nanoseconds). + +```` is the internal name of the decoder or the filter used by *Heka*.