Add support for bulk metric message

This change introduces a new type of Heka message called 'bulk_metric'.

A bulk metric message can be emitted by any filter plugin using the
add_to_bulk_metric() and inject_bulk_metric() functions from the lma_utils
module:

  local ts = read_message('Timestamp')
  utils.add_to_bulk_metric('foo', 1, {tag1 = value1})
  utils.add_to_bulk_metric('bar', 2, {})
  utils.inject_bulk_metric(ts, 'node-1', 'custom_filter')

The structure of the message injected in the Heka pipeline will be:

  Timestamp: <ts>
  Severity: INFO
  Hostname: node-1
  Payload: >
    [{"name":"foo","value":1,"tags":{"tag1":"value1"}},
    {"name":"bar","value":2,"tags":[]}]
  Fields:
    - source: custom_filter
    - hostname: node-1

Eventually the bulk metric message is caught by the InfluxDB
accumulator filter and encoded using the InfluxDB line protocol.

Change-Id: I96986fd8287d65ae018c7636f9dd745dba2fc761
Implements: blueprint upgrade-influxdb-grafana
This commit is contained in:
Simon Pasquier
2015-08-04 14:59:10 +02:00
parent 9fb614e7c2
commit 8f268f04ff
7 changed files with 248 additions and 75 deletions

View File

@@ -181,6 +181,10 @@ case $influxdb_mode {
password => $influxdb_password,
require => Class['lma_collector'],
}
class { 'lma_collector::metrics::heka_monitoring_legacy':
require => Class['lma_collector']
}
} else {
class { 'lma_collector::collectd::base':
processes => $processes,
@@ -196,10 +200,10 @@ case $influxdb_mode {
password => $influxdb_password,
require => Class['lma_collector'],
}
}
class { 'lma_collector::metrics::heka_monitoring':
require => Class['lma_collector']
class { 'lma_collector::metrics::heka_monitoring':
require => Class['lma_collector']
}
}
}

View File

@@ -11,11 +11,14 @@
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
local cjson = require 'cjson'
local string = require 'string'
local extra = require 'extra_fields'
local patt = require 'patterns'
local pairs = pairs
local inject_message = inject_message
local read_message = read_message
local pcall = pcall
local M = {}
setfenv(1, M) -- Remove external access to contain everything in the module
@@ -105,6 +108,49 @@ function add_metric(datapoints, name, points)
}
end
local bulk_datapoints = {}
-- Add a datapoint to the bulk metric message
-- Append one datapoint to the pending bulk metric message.
--
-- name: the metric name
-- value: the metric value
-- tags: optional table of tags; defaults to an empty table so the
--       encoded datapoint always carries a 'tags' entry.
function add_to_bulk_metric(name, value, tags)
    local point = {
        name = name,
        value = value,
        tags = tags or {},
    }
    bulk_datapoints[#bulk_datapoints + 1] = point
end
-- Send the bulk metric message to the Heka pipeline
-- Inject all accumulated datapoints as a single bulk metric message
-- into the Heka pipeline, then reset the accumulator.
--
-- ts: the message timestamp
-- hostname: the name of the emitting node
-- source: the name of the emitting plugin
--
-- Does nothing when no datapoint has been added since the last call.
function inject_bulk_metric(ts, hostname, source)
    if #bulk_datapoints == 0 then
        return
    end

    local bulk_msg = {
        Timestamp = ts,
        Hostname = hostname,
        Payload = cjson.encode(bulk_datapoints),
        Type = 'bulk_metric', -- prepended with 'heka.sandbox'
        Severity = label_to_severity_map.INFO,
        Fields = {
            source = source,
            hostname = hostname,
        }
    }
    -- clear the accumulator so the next cycle starts from scratch
    bulk_datapoints = {}
    inject_tags(bulk_msg)
    inject_message(bulk_msg)
end
-- Return a list of datapoints, each point being a table formatted like this:
-- {name='foo',value=1,tags={k1=v1,...}}
-- Return a list of datapoints decoded from the current message's
-- payload, each point being a table formatted like this:
-- {name='foo',value=1,tags={k1=v1,...}}
--
-- An empty table is returned when the payload cannot be decoded.
function decode_bulk_metric()
    local ok, datapoints = pcall(cjson.decode, read_message("Payload"))
    if not ok then
        -- On failure, pcall's second return value is the error message
        -- (a string), which must not be mistaken for decoded datapoints.
        return {}
    end
    return datapoints or {}
end
local global_status_to_severity_map = {
[global_status_map.OKAY] = label_to_severity_map.INFO,
[global_status_map.WARN] = label_to_severity_map.WARNING,

View File

@@ -16,11 +16,12 @@ require 'string'
require 'math'
local utils = require 'lma_utils'
function process_table(datapoints, timestamp, hostname, kind, array)
-- NOTE: It has been written for "filters" and "decoders". If we need
-- to use it to process other part of the Heka pipeline we need to ensure
-- that JSON provides names and table with ProcessMessageCount and
-- ProcessMessageAvgDuration:
function process_table(typ, array)
-- NOTE: It has been written for "filters" and "decoders". If we need to
-- use it to collect metrics from other components of the Heka pipeline,
-- we need to ensure that JSON provides names and table with
-- ProcessMessageCount and ProcessMessageAvgDuration:
--
-- "decoder": {
-- ...
-- },
@@ -34,47 +35,44 @@ function process_table(datapoints, timestamp, hostname, kind, array)
-- "value": 192913
-- },
-- { ... }}
--
for _, v in pairs(array) do
if type(v) == "table" then
-- strip off the '_decoder'/'_filter' suffix
local name = v['Name']:gsub("_" .. typ, "")
name = v['Name']:gsub("_" .. kind, "")
local tags = {
['type'] = typ,
['name'] = name,
}
msgCount = v['ProcessMessageCount']['value']
avgDuration = v['ProcessMessageAvgDuration']['value']
utils.add_metric(datapoints,
string.format('%s.lma_components.hekad.%s.%s.count', hostname, kind, name),
{timestamp, msgCount})
utils.add_metric(datapoints,
string.format('%s.lma_components.hekad.%s.%s.duration', hostname, kind, name),
{timestamp, avgDuration})
utils.add_to_bulk_metric('hekad_msg_count', v['ProcessMessageCount']['value'], tags)
utils.add_to_bulk_metric('hekad_msg_avg_duration', v['ProcessMessageAvgDuration']['value'], tags)
end
end
end
-- Strip a trailing 's' from str (e.g. 'filters' -> 'filter').
-- The gsub() call is wrapped in parentheses so that only the modified
-- string is returned, not the substitution count that gsub() also
-- yields as a second value.
function singularize(str)
    return (str:gsub('s$', ''))
end
function process_message ()
local ok, json = pcall(cjson.decode, read_message("Payload"))
local ok, data = pcall(cjson.decode, read_message("Payload"))
if not ok then
return -1
end
local hostname = read_message("Hostname")
local ts = read_message("Timestamp")
local ts_ms = math.floor(ts/1e6)
local datapoints = {}
for k, v in pairs(json) do
for k, v in pairs(data) do
if k == "filters" or k == "decoders" then
-- remove the last character from k
process_table(datapoints, ts_ms, hostname, k:sub(1, -2), v)
process_table(singularize(k), v)
end
end
if #datapoints > 0 then
inject_payload("json", "influxdb", cjson.encode(datapoints))
return 0
end
-- We should not reach this point
return -1
utils.inject_bulk_metric(ts, hostname, 'heka_monitoring')
return 0
end

View File

@@ -0,0 +1,80 @@
-- Copyright 2015 Mirantis, Inc.
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
require 'cjson'
require 'string'
require 'math'
local utils = require 'lma_utils'
-- Accumulate hekad self-monitoring statistics into the datapoints list,
-- adding one '<hostname>.lma_components.hekad.<kind>.<name>.count' and
-- one '....duration' metric per component found in array.
--
-- NOTE: It has been written for "filters" and "decoders". If we need
-- to use it to process other part of the Heka pipeline we need to ensure
-- that JSON provides names and table with ProcessMessageCount and
-- ProcessMessageAvgDuration:
-- "decoder": {
--    ...
--   "Name": "a name",
--   "ProcessMessageCount" : {
--     "representation": "count",
--     "value": 12
--   },
--   "ProcessMessageAvgDuration" : {
--     "representation": "ns",
--     "value": 192913
--   },
--   { ... }}
function process_table(datapoints, timestamp, hostname, kind, array)
    for _, v in pairs(array) do
        if type(v) == "table" then
            -- strip off the '_decoder'/'_filter' suffix
            local name = v['Name']:gsub("_" .. kind, "")
            -- declared local to avoid leaking accidental globals
            local msg_count = v['ProcessMessageCount']['value']
            local avg_duration = v['ProcessMessageAvgDuration']['value']
            utils.add_metric(datapoints,
                string.format('%s.lma_components.hekad.%s.%s.count', hostname, kind, name),
                {timestamp, msg_count})
            utils.add_metric(datapoints,
                string.format('%s.lma_components.hekad.%s.%s.duration', hostname, kind, name),
                {timestamp, avg_duration})
        end
    end
end
-- Decode a hekad self-report payload ('filters' and 'decoders'
-- sections only) and inject the collected statistics as a JSON
-- payload for InfluxDB. Returns 0 on success, -1 on failure.
function process_message ()
    local ok, report = pcall(cjson.decode, read_message("Payload"))
    if not ok then
        return -1
    end

    local hostname = read_message("Hostname")
    -- convert the timestamp from nanoseconds to milliseconds
    local ts_ms = math.floor(read_message("Timestamp") / 1e6)

    local datapoints = {}
    for section, items in pairs(report) do
        if section == "filters" or section == "decoders" then
            -- drop the trailing 's' to get the singular component kind
            process_table(datapoints, ts_ms, hostname, section:sub(1, -2), items)
        end
    end

    if #datapoints == 0 then
        -- We should not reach this point
        return -1
    end
    inject_payload("json", "influxdb", cjson.encode(datapoints))
    return 0
end

View File

@@ -11,10 +11,12 @@
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
require 'cjson'
require 'os'
require 'string'
require 'table'
local field_util = require 'field_util'
local utils = require 'lma_utils'
local l = require 'lpeg'
l.locale(l)
@@ -39,20 +41,6 @@ function escape_string(str)
return tostring(str):gsub("([ ,])", "\\%1")
end
-- Encode a tag as an escaped 'key=value' string for the InfluxDB line
-- protocol. The tag value is read from the message field of the same
-- name; when the field is missing or empty, default_value is used if
-- provided, otherwise nil is returned so the caller can skip the tag.
function encode_tag(tag, default_value)
    local value = read_message(string.format('Fields[%s]', tag))
    if not value or value == '' then
        if default_value ~= nil then
            value = default_value
        else
            -- no value and no fallback: drop this tag
            return nil
        end
    end
    return escape_string(tag) .. '=' .. escape_string(value)
end
-- Flush the datapoints to InfluxDB if enough items are present or if the
-- timeout has expired
function flush ()
@@ -66,49 +54,53 @@ function flush ()
end
end
-- TODO(pasquier-s): support messages with multiple points
-- TODO(pasquier-s): support points with multiple fields
function process_message()
local point
local tags = {}
local value = read_message("Fields[value]")
local ts
if time_precision and time_precision ~= 'ns' then
ts = field_util.message_timestamp(time_precision)
local msg_type = read_message("Type")
if msg_type:match('bulk_metric$') then
return process_bulk_metric()
else
ts = read_message('Timestamp')
return process_single_metric()
end
end
if value == nil then
function process_single_metric()
local tags = {}
local name = read_message("Fields[name]")
local value = read_message("Fields[value]")
if value == nil or name == nil then
return -1
end
-- collect Fields[tag_fields]
local msg_tag_fields = {}
local i = 0
while true do
local t = read_message("Fields[tag_fields]", 0, i)
if not t then
break
end
table.insert(msg_tag_fields, t)
tags[t] = read_message(string.format('Fields[%s]', t))
i = i + 1
end
-- Add common tags
for _, tag in ipairs(tag_fields) do
local t = encode_tag(tag, defaults[tag])
if t then
table.insert(tags, t)
end
end
encode_datapoint(name, value, tags)
flush()
-- Add specific tags
for _, tag in ipairs(msg_tag_fields) do
local t = encode_tag(tag)
if t then
table.insert(tags, t)
end
return 0
end
-- name: the measurement's name
-- value: the datapoint's value
-- tags: table of tags
--
-- Timestamp is taken from the Heka message
function encode_datapoint(name, value, tags)
local ts
if time_precision and time_precision ~= 'ns' then
ts = field_util.message_timestamp(time_precision)
else
ts = read_message('Timestamp')
end
-- Encode the value field
@@ -122,23 +114,42 @@ function process_message()
value = string.format("%.6f", value)
end
if #tags > 0 then
-- for performance reasons, it is recommended to always send the tags
-- in the same order.
table.sort(tags)
-- Add the common tags
for _, t in ipairs(tag_fields) do
tags[t] = read_message(string.format('Fields[%s]', t)) or defaults[t]
end
local tags_array = {}
for k,v in pairs(tags) do
table.insert(tags_array, escape_string(k) .. '=' .. escape_string(v))
end
-- for performance reasons, it is recommended to always send the tags
-- in the same order.
table.sort(tags_array)
if #tags_array > 0 then
point = string.format("%s,%s value=%s %d",
escape_string(read_message('Fields[name]')),
table.concat(tags, ','),
escape_string(name),
table.concat(tags_array, ','),
value,
ts)
else
point = string.format("%s value=%s %d",
escape_string(read_message('Fields[name]')),
escape_string(name),
value,
ts)
end
datapoints[#datapoints+1] = point
end
-- Decode the current bulk metric message and encode every datapoint
-- it carries for InfluxDB, then flush the buffered points if needed.
-- Always returns 0.
function process_bulk_metric()
    local points = utils.decode_bulk_metric()
    for _, p in ipairs(points) do
        encode_datapoint(p.name, p.value, p.tags or {})
    end
    flush()
    return 0
end

View File

@@ -27,7 +27,7 @@ class lma_collector::influxdb (
heka::filter::sandbox { 'influxdb_accumulator':
config_dir => $lma_collector::params::config_dir,
filename => "${lma_collector::params::plugins_dir}/filters/influxdb_accumulator.lua",
message_matcher => 'Type == \'metric\' || Type == \'heka.sandbox.metric\'',
message_matcher => 'Type == \'metric\' || Type == \'heka.sandbox.metric\' || Type == \'heka.sandbox.bulk_metric\'',
ticker_interval => 1,
config => {
flush_interval => $lma_collector::params::influxdb_flush_interval,

View File

@@ -0,0 +1,34 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# Class: lma_collector::metrics::heka_monitoring_legacy
#
# Deploy the Lua sandbox filter that collects Heka self-monitoring
# statistics ('heka.all-report' messages) using the legacy
# heka_monitoring_legacy.lua plugin, together with the Heka dashboard
# output that produces those report messages.
#
# Parameters:
#   $dashboard_address: address the Heka dashboard listens on.
#   $dashboard_port: port the Heka dashboard listens on.
class lma_collector::metrics::heka_monitoring_legacy (
  $dashboard_address = $lma_collector::params::dashboard_address,
  $dashboard_port = $lma_collector::params::dashboard_port,
){
  include lma_collector::service

  # Filter consuming Heka's periodic self-reports
  heka::filter::sandbox { 'heka_monitoring':
    config_dir      => $lma_collector::params::config_dir,
    filename        => "${lma_collector::params::plugins_dir}/filters/heka_monitoring_legacy.lua",
    message_matcher => "Type == 'heka.all-report'",
    notify          => Class['lma_collector::service'],
  }

  # Dashboard is required to enable monitoring messages
  heka::output::dashboard { 'dashboard':
    config_dir        => $lma_collector::params::config_dir,
    dashboard_address => $dashboard_address,
    dashboard_port    => $dashboard_port,
  }
}