Merge "Update annotation filter for InfluxDB 0.9"

This commit is contained in:
Jenkins
2015-08-07 12:33:28 +00:00
committed by Gerrit Code Review
8 changed files with 193 additions and 81 deletions

View File

@@ -143,12 +143,10 @@ function inject_bulk_metric(ts, hostname, source)
inject_message(msg)
end
-- Return a list of datapoints, each point being a table formatted like this:
-- {name='foo',value=1,tags={k1=v1,...}}
function decode_bulk_metric()
local ok, datapoints = pcall(cjson.decode, read_message("Payload"))
function decode_json_payload()
local ok, data = pcall(cjson.decode, read_message("Payload"))
return datapoints or {}
return data
end
local global_status_to_severity_map = {

View File

@@ -54,7 +54,6 @@ function flush ()
end
end
-- TODO(pasquier-s): support points with multiple fields
function process_message()
local msg_type = read_message("Type")
if msg_type:match('bulk_metric$') then
@@ -67,7 +66,13 @@ end
function process_single_metric()
local tags = {}
local name = read_message("Fields[name]")
local value = read_message("Fields[value]")
local value
if read_message('Type'):match('multivalue_metric$') then
value = utils.decode_json_payload()
else
value = read_message("Fields[value]")
end
if value == nil or name == nil then
return -1
@@ -90,9 +95,53 @@ function process_single_metric()
return 0
end
function process_bulk_metric()
    -- The message payload is a JSON list of datapoints, each point
    -- shaped like: {name='foo',value=1,tags={k1=v1,...}}
    local points = utils.decode_json_payload() or {}
    for _, point in ipairs(points) do
        encode_datapoint(point.name, point.value, point.tags or {})
    end
    flush()
    return 0
end
-- Serialize a scalar value (number, string or boolean) for the
-- InfluxDB line protocol. Returns nil for any other type.
function encode_scalar_value(value)
    local value_type = type(value)
    if value_type == "number" then
        -- Always format numbers as floats so a series never flips
        -- between integer and float types between points in time,
        -- which InfluxDB would reject.
        return string.format("%.6f", value)
    end
    if value_type == "string" then
        -- String values must be double quoted; escape embedded quotes.
        return string.format('"%s"', value:gsub('"', '\\"'))
    end
    if value_type == "boolean" then
        -- Booleans are stored as quoted strings.
        return string.format('"%s"', tostring(value))
    end
end
-- Encode a datapoint value as the field-set portion of an InfluxDB
-- line. A table value becomes "k1=v1,k2=v2,..."; any scalar is encoded
-- as the single field "value=<encoded scalar>".
function encode_value(value)
    if type(value) ~= "table" then
        return "value=" .. encode_scalar_value(value)
    end
    local fields = {}
    for key, val in pairs(value) do
        fields[#fields + 1] = string.format(
            "%s=%s", escape_string(key), encode_scalar_value(val))
    end
    return table.concat(fields, ',')
end
-- encode a single datapoint using the InfluxDB line protocol
--
-- name: the measurement's name
-- value: the datapoint's value
-- tags: table of tags
-- value: a scalar value or a list of key-value pairs
-- tags: a table of tags
--
-- Timestamp is taken from the Heka message
function encode_datapoint(name, value, tags)
@@ -103,17 +152,6 @@ function encode_datapoint(name, value, tags)
ts = read_message('Timestamp')
end
-- Encode the value field
if type(value) == "string" then
-- string values need to be double quoted
value = '"' .. value:gsub('"', '\\"') .. '"'
elseif type(value) == "number" or string.match(value, "^[%d.]+$") then
-- Always send numbers as formatted floats, so InfluxDB will accept
-- them if they happen to change from ints to floats between
-- points in time. Forcing them to always be floats avoids this.
value = string.format("%.6f", value)
end
-- Add the common tags
for _, t in ipairs(tag_fields) do
tags[t] = read_message(string.format('Fields[%s]', t)) or defaults[t]
@@ -128,15 +166,15 @@ function encode_datapoint(name, value, tags)
table.sort(tags_array)
if #tags_array > 0 then
point = string.format("%s,%s value=%s %d",
point = string.format("%s,%s %s %d",
escape_string(name),
table.concat(tags_array, ','),
value,
encode_value(value),
ts)
else
point = string.format("%s value=%s %d",
point = string.format("%s %s %d",
escape_string(name),
value,
encode_value(value),
ts)
end
@@ -144,7 +182,10 @@ function encode_datapoint(name, value, tags)
end
function process_bulk_metric()
local datapoints = utils.decode_bulk_metric()
-- the list of datapoints is encoded in the message payload, each point
-- is a hash table formatted like this:
-- {name='foo',value=1,tags={k1=v1,...}}
local datapoints = utils.decode_json_payload()
for _, point in ipairs(datapoints) do
encode_datapoint(point.name, point.value, point.tags or {})

View File

@@ -14,64 +14,48 @@
require 'cjson'
require 'string'
require 'table'
require "os"
require 'math'
local floor = math.floor
local utils = require 'lma_utils'
local last_flush = os.time()
local datapoints = {}
local base_serie_name = 'annotation'
local measurement_name = read_config('measurement_name') or 'annotations'
local html_break_line = '<br />'
local flush_count = read_config('flush_count') or 100
local flush_interval = read_config('flush_interval') or 5
-- Inject the buffered annotation datapoints as a JSON "influxdb"
-- payload when the buffer exceeds 'flush_count' entries or more than
-- 'flush_interval' seconds have elapsed since the previous flush.
function flush ()
local now = os.time()
if #datapoints > 0 and (#datapoints > flush_count or now - last_flush > flush_interval) then
inject_payload("json", "influxdb", cjson.encode(datapoints))
datapoints = {}
last_flush = now
end
end
-- Transform a status message into an InfluxDB datapoint
function process_message ()
local ts = floor(read_message('Timestamp')/1e6) -- ms
local msg_type = read_message('Type')
local payload = read_message('Payload')
local service = read_message('Fields[service]')
local name = string.gsub(service, ' ', '_')
local serie_name = string.format('%s.%s', base_serie_name, name)
local title
local text = ''
local service = read_message('Fields[service]')
local status = read_message('Fields[status]')
local prev_status = read_message('Fields[previous_status]')
if msg_type == 'heka.sandbox.status' then
local status = read_message('Fields[status]')
local prev_status = read_message('Fields[previous_status]')
local ok, details = pcall(cjson.decode, payload)
if ok then
text = table.concat(details, html_break_line)
end
if prev_status ~= status then
title = string.format('General status %s -> %s',
utils.global_status_to_label_map[prev_status],
utils.global_status_to_label_map[status])
else
title = string.format('General status remains %s',
utils.global_status_to_label_map[status])
end
datapoints[#datapoints+1] = {
name = serie_name,
columns = {"time", "title", "tag", "text"},
points = {{ts, title, service, text}}
}
local ok, details = pcall(cjson.decode, read_message('Payload'))
if ok then
text = table.concat(details, html_break_line)
end
flush()
if prev_status ~= status then
title = string.format('General status %s -> %s',
utils.global_status_to_label_map[prev_status],
utils.global_status_to_label_map[status])
else
title = string.format('General status remains %s',
utils.global_status_to_label_map[status])
end
local msg = {
Timestamp = read_message('Timestamp'),
Type = 'multivalue_metric',
Severity = utils.label_to_severity_map.INFO,
Hostname = read_message('Hostname'),
Payload = cjson.encode({title=title, tags=service, text=text}),
Fields = {
name = measurement_name,
tag_fields = { 'service' },
service = service,
source = 'influxdb_annotation'
}
}
utils.inject_tags(msg)
inject_message(msg)
return 0
end
function timer_event(ns)
flush()
end

View File

@@ -0,0 +1,77 @@
-- Copyright 2015 Mirantis, Inc.
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
require 'cjson'
require 'string'
require 'table'
require "os"
require 'math'

-- Localize for faster access in the message-processing hot path.
local floor = math.floor
local utils = require 'lma_utils'

-- Buffer of annotation datapoints awaiting injection, and the time of
-- the last successful flush.
local last_flush = os.time()
local datapoints = {}

-- Series name prefix; the sanitized service name is appended per message.
local base_serie_name = 'annotation'
local html_break_line = '<br />'

-- Flush when more than 'flush_count' points are buffered, or when more
-- than 'flush_interval' seconds have passed since the last flush.
local flush_count = read_config('flush_count') or 100
local flush_interval = read_config('flush_interval') or 5
-- Inject the buffered annotation datapoints as a JSON "influxdb"
-- payload when the buffer exceeds 'flush_count' entries or more than
-- 'flush_interval' seconds have elapsed since the previous flush.
function flush ()
    local now = os.time()
    if #datapoints == 0 then
        return
    end
    if #datapoints > flush_count or now - last_flush > flush_interval then
        inject_payload("json", "influxdb", cjson.encode(datapoints))
        datapoints = {}
        last_flush = now
    end
end
-- Turn a global status message into an InfluxDB 0.8 annotation
-- datapoint (columns/points format) and buffer it, then attempt a
-- flush. Only messages of type 'heka.sandbox.status' produce a
-- datapoint; any other message merely triggers the flush check.
function process_message ()
    local timestamp_ms = floor(read_message('Timestamp')/1e6) -- ms
    local service = read_message('Fields[service]')
    -- NOTE(review): assumes Fields[service] is always present —
    -- string.gsub would raise on nil; confirm against the matcher.
    local serie_name = string.format(
        '%s.%s', base_serie_name, string.gsub(service, ' ', '_'))

    if read_message('Type') == 'heka.sandbox.status' then
        local status = read_message('Fields[status]')
        local prev_status = read_message('Fields[previous_status]')

        -- The payload may hold a JSON list of detail strings, rendered
        -- as HTML lines in the annotation text; ignore decode failures.
        local text = ''
        local decoded, details = pcall(cjson.decode, read_message('Payload'))
        if decoded then
            text = table.concat(details, html_break_line)
        end

        local title
        if prev_status ~= status then
            title = string.format('General status %s -> %s',
                utils.global_status_to_label_map[prev_status],
                utils.global_status_to_label_map[status])
        else
            title = string.format('General status remains %s',
                utils.global_status_to_label_map[status])
        end

        datapoints[#datapoints + 1] = {
            name = serie_name,
            columns = {"time", "title", "tag", "text"},
            points = {{timestamp_ms, title, service, text}}
        }
    end
    flush()
    return 0
end
-- Heka timer callback: periodically flush buffered annotations even
-- when no new status message arrives.
function timer_event(ns)
flush()
end

View File

@@ -180,7 +180,7 @@ function compute_status(events, not_up_status, current_time, elts_name, name, st
utils.add_to_bulk_metric(
string.format('openstack_%s_%s_status', name, worker.group_name),
utils.service_status_map.DOWN,
{ service = worker_name}
{ service = worker_name }
)
end
@@ -198,7 +198,7 @@ function compute_status(events, not_up_status, current_time, elts_name, name, st
utils.add_to_bulk_metric(
string.format('openstack_%s_%s_status', name, worker.group_name),
new_status,
{ service = worker_name}
{ service = worker_name }
)
if display_num then
event_detail = string.format("(%s/%s UP)", up_elements[worker_name],
@@ -235,7 +235,7 @@ function compute_status(events, not_up_status, current_time, elts_name, name, st
utils.add_to_bulk_metric(
string.format('openstack_%s_%s_status', name, worker.group_name),
utils.service_status_map.UP,
{ service = worker_name}
{ service = worker_name }
)
end
end

View File

@@ -27,7 +27,7 @@ class lma_collector::influxdb (
heka::filter::sandbox { 'influxdb_accumulator':
config_dir => $lma_collector::params::config_dir,
filename => "${lma_collector::params::plugins_dir}/filters/influxdb_accumulator.lua",
message_matcher => 'Type == \'metric\' || Type == \'heka.sandbox.metric\' || Type == \'heka.sandbox.bulk_metric\'',
message_matcher => 'Type == \'metric\' || Type == \'heka.sandbox.metric\' || Type == \'heka.sandbox.bulk_metric\' || Type == \'heka.sandbox.multivalue_metric\'',
ticker_interval => 1,
config => {
flush_interval => $lma_collector::params::influxdb_flush_interval,
@@ -41,6 +41,16 @@ class lma_collector::influxdb (
notify => Class['lma_collector::service'],
}
heka::filter::sandbox { 'influxdb_annotation':
config_dir => $lma_collector::params::config_dir,
filename => "${lma_collector::params::plugins_dir}/filters/influxdb_annotation.lua",
message_matcher => 'Type == \'heka.sandbox.status\' && Fields[updated] == TRUE',
config => {
serie_name => $lma_collector::params::annotations_serie_name
},
notify => Class['lma_collector::service'],
}
heka::encoder::payload { 'influxdb':
config_dir => $lma_collector::params::config_dir,
notify => Class['lma_collector::service'],

View File

@@ -37,7 +37,7 @@ class lma_collector::influxdb_legacy (
heka::filter::sandbox { 'influxdb_annotation':
config_dir => $lma_collector::params::config_dir,
filename => "${lma_collector::params::plugins_dir}/filters/influxdb_annotation.lua",
filename => "${lma_collector::params::plugins_dir}/filters/influxdb_annotation_legacy.lua",
message_matcher => 'Type == \'heka.sandbox.status\' && Fields[updated] == TRUE',
ticker_interval => 1,
config => {

View File

@@ -105,6 +105,8 @@ class lma_collector::params {
$service_status_interval = floor($collectd_interval * 1.5)
$service_status_payload_name = 'service_status'
$annotations_serie_name = 'annotations'
# Catch all metrics used to compute OpenStack service statuses
$service_status_metrics_regexp_legacy = [
'^openstack.(nova|cinder|neutron).(services|agents).*(up|down|disabled)$',