Remove legacy status filters

This change removes the Heka filters that computed the service
statuses. It also cleans up the Puppet code that referred to them.

Change-Id: Ib6c1c9054333b9e71f5a8a2f08600eae5d287816
Simon Pasquier 2015-09-17 16:59:35 +02:00
parent 2d74e1bf01
commit fc391d641f
6 changed files with 1 addition and 567 deletions

View File

@@ -171,9 +171,6 @@ if $lma_collector['influxdb_mode'] != 'disabled' {
services => ['mysql', 'rabbitmq', 'haproxy', 'memcached', 'apache']
}
# Service status metrics and annotations
class { 'lma_collector::metrics::service_status': }
# AFD filters
class { 'lma_collector::afd::api': }
class { 'lma_collector::afd::workers': }

View File

@@ -53,61 +53,6 @@ metric_type = {
local default_severity = 7
service_status_map = {
UP = 0,
DEGRADED = 1,
DOWN = 2,
UNKNOWN = 3,
}
service_status_to_label_map = {
[0] = 'UP',
[1] = 'DEGRADED',
[2] = 'DOWN',
[3] = 'UNKNOWN',
}
global_status_map = {
OKAY = 0,
WARN = 1,
FAIL = 2,
UNKNOWN = 3,
}
global_status_to_label_map = {
[0] = 'OKAY',
[1] = 'WARN',
[2] = 'FAIL',
[3] = 'UNKNOWN',
}
check_api_to_status_map = {
[0] = 2, -- DOWN
[1] = 0, -- UP
[2] = 3, -- UNKNOWN
}
check_api_status_to_state_map = {
[0] = 'down',
[1] = 'up',
[2] = 'unknown',
}
state_map = {
UP = 'up',
DOWN = 'down',
DISABLED = 'disabled',
UNKNOWN = 'unknown'
}
function add_metric(datapoints, name, points)
datapoints[#datapoints+1] = {
name = name,
columns = {"time", "value" },
points = {points}
}
end
local bulk_datapoints = {}
-- Add a datapoint to the bulk metric message
@@ -149,29 +94,6 @@ function decode_json_payload()
return data
end
local global_status_to_severity_map = {
[global_status_map.OKAY] = label_to_severity_map.INFO,
[global_status_map.WARN] = label_to_severity_map.WARNING,
[global_status_map.FAIL] = label_to_severity_map.CRITICAL,
[global_status_map.UNKNOWN] = label_to_severity_map.NOTICE,
}
function inject_status_message(time, service, status, prev_status, updated, details)
local msg = {
Timestamp = time,
Payload = details,
Type = 'status', -- prepended with 'heka.sandbox'
Severity = global_status_to_severity_map[status],
Fields = {
service = service,
status = status,
previous_status = prev_status,
updated = updated,
}
}
inject_message(msg)
end
-- Parse a Syslog-based payload and update the Heka message
-- Return true if successful, false otherwise
function parse_syslog_message(grammar, payload, msg)
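
For reference, here is a small standalone sketch (hypothetical values, plain Lua, not part of the change) of how the lookup tables removed above were used together: a check_api metric value of 0 meant the endpoint check failed, which check_api_to_status_map translated to the DOWN service status before the status was labelled and reported.

    -- Standalone sketch with hypothetical values, mirroring the removed tables.
    local check_api_to_status_map     = { [0] = 2, [1] = 0, [2] = 3 }
    local service_status_to_label_map = { [0] = 'UP', [1] = 'DEGRADED', [2] = 'DOWN', [3] = 'UNKNOWN' }

    local api_check_value = 0                                 -- hypothetical: the API check failed
    local status = check_api_to_status_map[api_check_value]   -- 2
    assert(service_status_to_label_map[status] == 'DOWN')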

View File

@@ -1,174 +0,0 @@
-- Copyright 2015 Mirantis, Inc.
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
-- The filter accumulates data into a table and regularly emits one message per
-- service with a payload like this:
-- {
-- "vip_active_at": 1435829917607,
-- "name": "nova",
-- "states": {
-- "check_api":{
-- "nova":{
-- "down":{
-- "value":0,
-- "group_name":"endpoint",
-- "last_seen":1433252000524
-- },
-- "up":{
-- "value":1,
-- "group_name":"endpoint",
-- "last_seen":1433252000524
-- }
-- },
-- ...
-- },
-- "workers":{
-- "scheduler":{
-- "down":{
-- "value":0,
-- "group_name":"services",
-- "last_seen":1433251999229
-- },
-- "disabled":{
-- "value":1,
-- "group_name":"services",
-- "last_seen":1433251999226
-- },
-- "up":{
-- "value":2,
-- "group_name":"services",
-- "last_seen":1433251999227
-- }
-- },
-- ...
-- },
-- "haproxy":{
-- "nova-api":{
-- "down":{
-- "value":0,
-- "group_name":"pool",
-- "last_seen":1433252000957
-- },
-- "up":{
-- "value":3,
-- "group_name":"pool",
-- "last_seen":1433252000954
-- }
-- }
-- }
-- ...
-- }
-- }
require 'cjson'
require 'string'
require 'math'
local floor = math.floor
local utils = require 'lma_utils'
_PRESERVATION_VERSION = 2
-- variables with global scope are preserved between restarts
services = {}
vip_active_at = 0
local payload_name = read_config('inject_payload_name') or 'service_status'
local state_label_map = {
up = utils.state_map.UP,
down = utils.state_map.DOWN,
disabled = utils.state_map.DISABLED,
}
function process_message ()
local ts = read_message("Timestamp")
local metric_name = read_message("Fields[name]")
local value = read_message("Fields[value]")
local state = state_label_map[read_message('Fields[state]')]
local name
local top_entry
local item_name
local group_name
if metric_name == 'pacemaker_local_resource_active' and read_message("Fields[resource]") == 'vip__public' then
if value == 1 then
vip_active_at = ts
else
vip_active_at = 0
end
return 0
end
if string.find(metric_name, '^openstack_([^._]+)_services$') or string.find(metric_name, '^openstack_([^._]+)_agents$') then
name, group_name = string.match(metric_name, '([^_]+)_([^_]+)$')
top_entry = 'workers'
item_name = read_message('Fields[service]')
elseif string.find(metric_name, '_check_api$') then
-- A service can have several API checks; by convention the service name
-- is written as "<name>-<item>" or just "<name>".
name = string.match(read_message('Fields[service]'), '^([^-]+)')
top_entry = 'check_api'
group_name = 'endpoint'
item_name = read_message('Fields[service]')
-- convert 0/1 value to up/down state
state = utils.check_api_status_to_state_map[value]
-- and always override value to 1
value = 1
elseif metric_name == 'haproxy_backend_servers' then
name = string.match(read_message('Fields[backend]'), '^([^-]+)')
top_entry = 'haproxy'
group_name = 'pool'
item_name = read_message('Fields[backend]')
end
if not name or not item_name then
return -1
end
-- table initialization for the first time we see a service
if not services[name] then services[name] = {} end
if not services[name][top_entry] then services[name][top_entry] = {} end
if not services[name][top_entry][item_name] then services[name][top_entry][item_name] = {} end
local service = services[name][top_entry][item_name]
service[state] = {last_seen=ts, value=value, group_name=group_name}
-- To treat check_api results like the other groups, track them by up/down state
-- and reset the counterpart state with value=0
if top_entry == 'check_api' then
local invert_state
if state == utils.state_map.UP then
invert_state = utils.state_map.DOWN
elseif state == utils.state_map.DOWN then
invert_state = utils.state_map.UP
end
if invert_state then
if not service[invert_state] then
service[invert_state] = {}
end
service[invert_state] = {last_seen=ts, value=0, group_name=group_name}
end
end
return 0
end
function timer_event(ns)
for name, states in pairs(services) do
inject_payload('json', payload_name,
cjson.encode({vip_active_at=vip_active_at, name=name, states=states}))
end
end
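
To illustrate the accumulation step, here is a standalone sketch (hypothetical metric values, plain Lua, no Heka runtime required) of the table that process_message() built for a single 'openstack_nova_services' metric with Fields[service]='scheduler', Fields[state]='up' and Fields[value]=2; timer_event() then serialised this table as the JSON payload shown in the header comment.

    -- Standalone sketch with hypothetical values of the accumulated state.
    local services = {}
    local ts, svc, top_entry, item, group = 1433251999227, 'nova', 'workers', 'scheduler', 'services'

    services[svc] = services[svc] or {}
    services[svc][top_entry] = services[svc][top_entry] or {}
    services[svc][top_entry][item] = services[svc][top_entry][item] or {}
    services[svc][top_entry][item]['up'] = { last_seen = ts, value = 2, group_name = group }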

View File

@@ -1,247 +0,0 @@
-- Copyright 2015 Mirantis, Inc.
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
require 'cjson'
require 'string'
require 'math'
local max = math.max
local utils = require 'lma_utils'
_PRESERVATION_VERSION = 3
-- variables with global scope are preserved between restarts
all_service_status = {}
-- local scope variables
local timeout = (read_config("timeout") or 60) * 1e9
function process_message ()
local ok, data = pcall(cjson.decode, read_message("Payload"))
if not ok then
return -1
end
local timestamp = read_message('Timestamp')
local hostname = read_message("Hostname")
local service_name = data.name
local states = data.states
local worker_status = -1
local check_api_status = -1
local haproxy_server_status = -1
local global_status
local events = {}
local not_up_status = {}
local msg_event
if not all_service_status[service_name] then all_service_status[service_name] = {} end
if states.workers then
worker_status = compute_status(events, not_up_status, timestamp, 'workers', service_name, states.workers, true)
end
if states.check_api then
check_api_status = compute_status(events, not_up_status, timestamp, 'check_api', service_name, states.check_api, false)
end
if states.haproxy then
haproxy_server_status = compute_status(events, not_up_status, timestamp, 'haproxy', service_name, states.haproxy, true)
end
global_status = max(worker_status, check_api_status, haproxy_server_status)
-- global service status
utils.add_to_bulk_metric(
string.format('openstack_%s_status', service_name),
global_status
)
utils.inject_bulk_metric(timestamp, hostname, 'service_status_filter')
-- only emit status if the public vip is active
if not expired(data.vip_active_at) then
local prev = all_service_status[service_name].global_status or utils.global_status_map.UNKNOWN
local updated
updated = (prev ~= global_status or #events > 0)
-- always append non-UP status elements to the details
for k, v in pairs(not_up_status) do events[#events+1] = v end
local details = ''
if #events > 0 then
details = cjson.encode(events)
end
utils.inject_status_message(
timestamp, service_name, global_status, prev, updated, details
)
end
all_service_status[service_name].global_status = global_status
return 0
end
function get_previous_status(service_name, top_entry, name)
if not all_service_status[service_name] then
all_service_status[service_name] = {}
end
if not all_service_status[service_name][top_entry] then
all_service_status[service_name][top_entry] = {}
end
if not all_service_status[service_name][top_entry][name] then
all_service_status[service_name][top_entry][name] = utils.service_status_map.UNKNOWN
end
return all_service_status[service_name][top_entry][name]
end
function set_status(service_name, top_entry, name, status)
all_service_status[service_name][top_entry][name] = status
end
function compute_status(events, not_up_status, current_time, elts_name, name, states, display_num)
local down_elts = {}
local down_elts_count = 0
local zero_up = {}
local zero_up_count = 0
local one_up = {}
local one_disabled = {}
local one_disabled_count = 0
local service_status = utils.service_status_map.UNKNOWN
local up_elements = {}
local total_elements = {}
for worker, worker_data in pairs(states) do
if not total_elements[worker] then
total_elements[worker] = 0
end
if not up_elements[worker] then
up_elements[worker] = 0
end
for state, data in pairs(worker_data) do
if not expired(current_time, data.last_seen) then
total_elements[worker] = total_elements[worker] + data.value
if state == utils.state_map.DOWN and data.value > 0 then
down_elts[worker] = data
down_elts_count = down_elts_count + 1
end
if state == utils.state_map.UP then
if data.value > 0 then
one_up[worker] = data
else
zero_up[worker] = data
zero_up_count = zero_up_count + 1
end
up_elements[worker] = data.value
end
if state == utils.state_map.DISABLED and data.value > 0 then
one_disabled[worker] = data
one_disabled_count = one_disabled_count + 1
end
end
end
end
-- general element status
if zero_up_count > 0 then
service_status = utils.service_status_map.DOWN
elseif down_elts_count > 0 then
service_status = utils.service_status_map.DEGRADED
elseif down_elts_count == 0 then
service_status = utils.service_status_map.UP
end
-- elements clearly down
for worker_name, worker in pairs(zero_up) do
local prev = get_previous_status(name, elts_name, worker_name)
local DOWN = utils.service_status_map.DOWN
local event_detail = ""
set_status(name, elts_name, worker_name, DOWN)
if display_num then
event_detail = string.format("(%s/%s UP)", up_elements[worker_name],
total_elements[worker_name])
end
if prev and prev ~= DOWN then
events[#events+1] = string.format("%s %s %s -> %s %s", worker_name,
worker.group_name,
utils.service_status_to_label_map[prev],
utils.service_status_to_label_map[DOWN],
event_detail)
else
not_up_status[#not_up_status+1] = string.format("%s %s %s %s",
worker_name,
worker.group_name,
utils.service_status_to_label_map[DOWN],
event_detail)
end
utils.add_to_bulk_metric(
string.format('openstack_%s_%s_status', name, worker.group_name),
utils.service_status_map.DOWN,
{ service = worker_name }
)
end
-- elements down or degraded
for worker_name, worker in pairs(down_elts) do
local prev = get_previous_status(name, elts_name, worker_name)
local new_status
local event_detail
if one_up[worker_name] then
new_status = utils.service_status_map.DEGRADED
else
new_status = utils.service_status_map.DOWN
end
set_status(name, elts_name, worker_name, new_status)
utils.add_to_bulk_metric(
string.format('openstack_%s_%s_status', name, worker.group_name),
new_status,
{ service = worker_name }
)
if display_num then
event_detail = string.format("(%s/%s UP)", up_elements[worker_name],
total_elements[worker_name])
else
event_detail = ""
end
if prev ~= new_status then
events[#events+1] = string.format("%s %s %s -> %s %s", worker_name,
worker.group_name,
utils.service_status_to_label_map[prev],
utils.service_status_to_label_map[new_status],
event_detail)
elseif not zero_up[worker_name] then
not_up_status[#not_up_status+1] = string.format("%s %s %s %s", worker_name,
worker.group_name,
utils.service_status_to_label_map[new_status],
event_detail)
end
end
-- elements up
for worker_name, worker in pairs(one_up) do
if not zero_up[worker_name] and not down_elts[worker_name] then
local prev = get_previous_status(name, elts_name, worker_name)
local UP = utils.service_status_map.UP
set_status(name, elts_name, worker_name, UP)
if prev and prev ~= utils.service_status_map.UP then
events[#events+1] = string.format("%s %s %s -> %s", worker_name,
worker.group_name,
utils.service_status_to_label_map[prev],
utils.service_status_to_label_map[UP])
end
utils.add_to_bulk_metric(
string.format('openstack_%s_%s_status', name, worker.group_name),
utils.service_status_map.UP,
{ service = worker_name }
)
end
end
return service_status
end
function expired(last_time)
return not (last_time > 0 and (read_message('Timestamp') - last_time) <= timeout)
end
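
The status rules that compute_status() applied can be summarised in a short standalone sketch (illustrative counts, not the filter itself): an element whose 'up' metric reports zero is DOWN, an element with some workers DOWN but others UP is DEGRADED, otherwise it is UP, and the service's global status is the worst (numeric maximum) of the workers, check_api and haproxy results.

    -- Standalone sketch of the removed status rules, using illustrative counts.
    local UP, DEGRADED, DOWN = 0, 1, 2   -- values from utils.service_status_map

    local function element_status(up_count, down_count)
      if up_count == 0 then
        return DOWN
      elseif down_count > 0 then
        return DEGRADED
      end
      return UP
    end

    local global_status = math.max(
      element_status(2, 0),   -- workers: all up
      element_status(1, 0),   -- check_api: endpoint up
      element_status(3, 1))   -- haproxy backends: three up, one down
    assert(global_status == DEGRADED)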

View File

@@ -1,49 +0,0 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
class lma_collector::metrics::service_status (
$metrics_matcher = $lma_collector::params::service_status_metrics_matcher,
$timeout = $lma_collector::params::service_status_timeout,
) inherits lma_collector::params {
validate_string($metrics_matcher)
$payload_name = $lma_collector::params::service_status_payload_name
if (size($metrics_matcher) > 0) {
heka::filter::sandbox { 'service_accumulator_states':
config_dir => $lma_collector::params::config_dir,
filename => "${lma_collector::params::plugins_dir}/filters/service_accumulator_states.lua",
message_matcher => $metrics_matcher,
ticker_interval => $lma_collector::params::service_status_interval,
preserve_data => true,
config => {
inject_payload_name => $payload_name,
},
notify => Class['lma_collector::service'],
}
heka::filter::sandbox { 'service_status':
config_dir => $lma_collector::params::config_dir,
filename => "${lma_collector::params::plugins_dir}/filters/service_status.lua",
message_matcher => "Fields[payload_type] == 'json' && Fields[payload_name] == '${payload_name}'",
preserve_data => true,
config => {
timeout => $timeout,
},
notify => Class['lma_collector::service'],
}
}
}

View File

@@ -75,9 +75,8 @@ class lma_collector::params {
fail('max_message_size setting must be greater than max_file_size')
}
# The 'service_status' filter injects 2 messages per process_message() call
# Heka's default value is 1
$hekad_max_process_inject = 2
$hekad_max_process_inject = 1
# The GSE filters can inject up to 20 messages per timer_event() call
# Heka's default value is 10
@@ -127,23 +126,9 @@ class lma_collector::params {
$collectd_types = [ 'ceph', 'ceph_perf' ]
$heartbeat_timeout = 30
$service_status_timeout = 65
$service_status_interval = floor($collectd_interval * 1.5)
$service_status_payload_name = 'service_status'
$annotations_serie_name = 'annotations'
# Catch all metrics used to compute OpenStack service statuses
$service_status_metrics_matcher = join([
'(Type == \'metric\' || Type == \'heka.sandbox.metric\') && ',
'(Fields[name] =~ /^openstack_(nova|cinder|neutron)_(services|agents)$/ || ',
# Exception for the mysqld backend because the MySQL service status is
# computed by a dedicated filter; this avoids sending a spurious
# status Heka message.
'(Fields[name] == \'haproxy_backend_servers\' && Fields[backend] !~ /mysql/) || ',
'(Fields[name] == \'pacemaker_local_resource_active\' && Fields[resource] == \'vip__public\') || ',
'Fields[name] =~ /^openstack.*check_api$/)'
], '')
$worker_report_interval = 60
$worker_downtime_factor = 2
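
As a rough standalone check (Lua patterns approximating the Heka matcher regexes above, with illustrative metric names), the removed matcher routed metrics like the following to the status filters; note that the real matcher restricted the services/agents pattern to nova, cinder and neutron, and excluded mysql haproxy backends.

    -- Standalone sketch approximating the removed message matcher.
    local candidates = {
      'openstack_nova_services',          -- caught (worker states)
      'openstack_neutron_agents',         -- caught (agent states)
      'openstack_keystone_check_api',     -- caught (API check)
      'haproxy_backend_servers',          -- caught unless the backend is mysql
      'pacemaker_local_resource_active',  -- caught only for the vip__public resource
      'haproxy_backend_bytes',            -- ignored (illustrative counter-example)
    }
    for _, name in ipairs(candidates) do
      local caught = name:match('^openstack_[^_]+_services$') ~= nil
        or name:match('^openstack_[^_]+_agents$') ~= nil
        or name:match('^openstack.*check_api$') ~= nil
        or name == 'haproxy_backend_servers'
        or name == 'pacemaker_local_resource_active'
      print(name, caught)
    end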