Merge "Emit aggregated HTTP metrics"

This commit is contained in:
Jenkins 2016-05-04 15:37:05 +00:00 committed by Gerrit Code Review
commit 18b73f9fff
10 changed files with 269 additions and 63 deletions

View File

@ -408,8 +408,12 @@ if $influxdb_mode != 'disabled' {
class { 'lma_collector::collectd::apache': }
# TODO(all): This class is still called to ensure the sandbox deletion
# when upgrading the plugin. Can be removed for next release after 0.10.0.
class { 'lma_collector::logs::http_metrics': }
class { 'lma_collector::logs::aggregated_http_metrics': }
# Notification are always collected, lets extract metrics from there
class { 'lma_collector::notifications::metrics': }

View File

@ -136,10 +136,10 @@ class { 'lma_collector::logs::rabbitmq': }
To make the collector create HTTP metrics from OpenStack log messages that
include HTTP information (method, status, and response time) declare the
`lma_collector::logs::http_metrics` class:
`lma_collector::logs::aggregated_http_metrics` class:
```puppet
class { 'lma_collector::logs::http_metrics': }
class { 'lma_collector::logs::aggregated_http_metrics': }
```
### Store logs into Elasticsearch
@ -432,7 +432,7 @@ Public Classes:
* [`lma_collector::logs::rabbitmq`](#class-lma_collectorlogsrabbitmq)
* [`lma_collector::logs::system`](#class-lma_collectorlogssystem)
* [`lma_collector::logs::swift`](#class-lma_collectorlogsswift)
* [`lma_collector::logs::http_metrics`](#class-lma_collectorlogshttp_metrics)
* [`lma_collector::logs::aggregated_http_metrics`](#class-lma_collectorlogsaggregated_http_metrics)
* [`lma_collector::collectd::base`](#class-lma_collectorcollectdbase)
* [`lma_collector::collectd::haproxy`](#class-lma_collectorcollectdhaproxy)
* [`lma_collector::collectd::rabbitmq`](#class-lma_collectorcollectdrabbitmq)
@ -594,14 +594,23 @@ a Syslog file.
for more information.
* `log_directory`: *Optional*. The log directory. Default: `/var/log`.
#### Class: `lma_collector::logs::http_metrics`
#### Class: `lma_collector::logs::aggregated_http_metrics`
Declare this class to create an Heka filter that derives HTTP metrics from
OpenStack log messages that include HTTP information (method, status and
response time).
response time). Response times are aggregated over an interval and the
following statistics are produced: `min`,`max`,`sum`,`count`,`percentile`.
The metric name is `openstack_<service>_http_responses` where `<service>` is
the OpenStack service name (e.g. "neutron").
The metric name is `openstack_<service>_http_response_times` where `<service>`
is the OpenStack service name (e.g. "neutron").
##### Parameters
* `hostname`: *Optional*. The hostname. Default: $::hostname factor.
* `interval`: *Optional*. Interval in second used to aggregate metrics. Default: 10.
* `max_timer_inject`: *Optional*. The maximum number of messages allowed to be injected by the sandbox. Default: 10.
* `bulk_size`: *Optional*. The number of metrics embedded by a bulk_metric. Default: 500.
* `percentile`: *Optional*. The percentile. Default: 90.
#### Class: `lma_collector::collectd::base`

View File

@ -58,12 +58,17 @@ local default_severity = 7
local bulk_datapoints = {}
-- Add a datapoint to the bulk metric message
-- The 'value' parameter can be a table to support multi-value metric
function add_to_bulk_metric(name, value, tags)
bulk_datapoints[#bulk_datapoints+1] = {
name = name,
value = value,
tags = tags or {},
}
if type(value) == 'table' then
bulk_datapoints[#bulk_datapoints].values = value
else
bulk_datapoints[#bulk_datapoints].value = value
end
end
-- Send the bulk metric message to the Heka pipeline

View File

@ -1,52 +0,0 @@
-- Copyright 2015 Mirantis, Inc.
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
require 'string'
local utils = require 'lma_utils'
local msg = {
Type = "metric", -- will be prefixed by "heka.sandbox."
Timestamp = nil,
Severity = 6,
Fields = nil
}
function process_message ()
local http_method = read_message("Fields[http_method]")
local http_status = read_message("Fields[http_status]")
local response_time = read_message("Fields[http_response_time]")
if http_method == nil or http_status == nil or response_time == nil then
return -1
end
-- keep only the first 2 tokens because some services like Neutron report
-- themselves as 'openstack.<service>.server'
local service = string.gsub(read_message("Logger"), '(%w+)%.(%w+).*', '%1_%2')
msg.Timestamp = read_message("Timestamp")
msg.Fields = {
hostname = read_message("Hostname"),
source = read_message('Fields[programname]') or service,
name = service .. '_http_responses',
type = utils.metric_type['GAUGE'],
value = {value = response_time, representation = 's'},
tenant_id = read_message('Fields[tenant_id]'),
user_id = read_message('Fields[user_id]'),
http_method = http_method,
http_status = http_status,
tag_fields = {'http_method', 'http_status'},
}
utils.inject_tags(msg)
return utils.safe_inject_message(msg)
end

View File

@ -0,0 +1,185 @@
-- Copyright 2016 Mirantis, Inc.
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
require 'string'
require 'math'
require 'os'
local utils = require 'lma_utils'
local tab = require 'table_utils'
local table = require 'table'
local hostname = read_config('hostname') or error('hostname must be specified')
local interval = (read_config('interval') or error('interval must be specified')) + 0
-- max_timer_inject is the maximum number of injected messages by timer_event()
local max_timer_inject = (read_config('max_timer_inject') or 10) + 0
-- bulk_size is the maximum number of metrics embedded by a bulk_metric within the Payload.
-- The bulk_size depends on the hekad max_message_size (64 KB by default).
-- At most, there are 45 metrics/service * 300B (per bucket) =~ 13KB * 4 services = 52KB for 225 metrics.
-- With a max_message_size set to 256KB, it's possible to embed more than 800 metrics.
local bulk_size = (read_config('bulk_size') or 225) + 0
local percentile_thresh = (read_config('percentile') or 90) + 0
-- grace_time is used to palliate the time precision difference
-- (in second or millisecond for logs versus nanosecond for the ticker)
-- and also to compensate the delay introduced by log parsing/decoding
-- which leads to arrive too late in its interval.
local grace_time = (read_config('grace_time') or 0) + 0
local inject_reached_error = 'too many metrics to aggregate, adjust bulk_size and/or max_timer_inject parameters'
local percentile_field_name = string.format('upper_%s', percentile_thresh)
local msg_source = 'http_metric_filter'
local last_tick = os.time() * 1e9
local interval_in_ns = interval * 1e9
local http_verbs = {
GET = true,
POST = true,
OPTIONS = true,
DELETE = true,
PUT = true,
HEAD = true,
TRACE = true,
CONNECT = true,
PATCH = true,
}
local metric_bucket = {
min = 0,
max = 0,
sum = 0,
count = 0,
times = {},
[percentile_field_name] = 0,
rate = 0,
}
local all_times = {}
local num_metrics = 0
function process_message ()
local severity = read_message("Fields[severity_label]")
local logger = read_message("Logger")
local timestamp = read_message("Timestamp")
local http_method = read_message("Fields[http_method]")
local http_status = read_message("Fields[http_status]")
local response_time = read_message("Fields[http_response_time]")
if timestamp < last_tick - grace_time then
-- drop silently old logs
return 0
end
if http_method == nil or http_status == nil or response_time == nil then
return -1
end
-- keep only the first 2 tokens because some services like Neutron report
-- themselves as 'openstack.<service>.server'
local service = string.gsub(read_message("Logger"), '(%w+)%.(%w+).*', '%1_%2')
if service == nil then
return -1, "Cannot match any service from " .. logger
end
-- coerce http_status to integer
http_status = http_status + 0
local http_status_family
if http_status >= 100 and http_status < 200 then
http_status_family = '1xx'
elseif http_status >= 200 and http_status < 300 then
http_status_family = '2xx'
elseif http_status >= 300 and http_status < 400 then
http_status_family = '3xx'
elseif http_status >= 400 and http_status < 500 then
http_status_family = '4xx'
elseif http_status >= 500 and http_status < 600 then
http_status_family = '5xx'
else
return -1, "Unsupported http_status " .. http_status
end
if not http_verbs[http_method] then
return -1, "Unsupported http_method " .. http_method
end
if not all_times[service] then
all_times[service] = {}
end
if not all_times[service][http_method] then
all_times[service][http_method] = {}
end
if not all_times[service][http_method][http_status_family] then
-- verify that the sandbox has enough capacity to emit all metrics
if num_metrics > (bulk_size * max_timer_inject) then
return -1, inject_reached_error
end
all_times[service][http_method][http_status_family] = tab.deepcopy(metric_bucket)
num_metrics = num_metrics + 1
end
local bucket = all_times[service][http_method][http_status_family]
bucket.times[#bucket.times + 1] = response_time
bucket.count = bucket.count + 1
bucket.sum = bucket.sum + response_time
if bucket.max < response_time then
bucket.max = response_time
end
if bucket.min == 0 or bucket.min > response_time then
bucket.min = response_time
end
return 0
end
function timer_event(ns)
last_tick = ns
local num = 0
local msg_injected = 0
for service, methods in pairs(all_times) do
for method, statuses in pairs(methods) do
for status, bucket in pairs(statuses) do
local metric_name = service .. '_http_response_times'
bucket.rate = bucket.count / interval
bucket[percentile_field_name] = bucket.max
if bucket.count > 1 then
table.sort(bucket.times)
local tmp = ((100 - percentile_thresh) / 100) * bucket.count
local idx = bucket.count - math.floor(tmp + .5)
if idx > 0 and bucket.times[idx] then
bucket[percentile_field_name] = bucket.times[idx]
end
end
bucket.times = nil
utils.add_to_bulk_metric(metric_name, bucket, {http_method=method, http_status=status})
all_times[service][method][status] = nil
num = num + 1
num_metrics = num_metrics - 1
if num >= bulk_size then
if msg_injected < max_timer_inject then
utils.inject_bulk_metric(ns, hostname, msg_source)
msg_injected = msg_injected + 1
num = 0
num_metrics = 0
end
end
end
all_times[service][method] = nil
end
all_times[service] = nil
end
if num > 0 then
utils.inject_bulk_metric(ns, hostname, msg_source)
num = 0
num_metrics = 0
end
end

View File

@ -115,14 +115,15 @@ end
function process_bulk_metric()
-- The payload contains a list of datapoints, each point being formatted
-- like this: {name='foo',value=1,tags={k1=v1,...}}
-- either like this: {name='foo',value=1,tags={k1=v1,...}}
-- or for multi_values: {name='bar',values={k1=v1, ..},tags={k1=v1,...}
local datapoints = decode_json_payload()
if not datapoints then
return 'Invalid payload value'
end
for _, point in ipairs(datapoints) do
encode_datapoint(point.name, point.value, point.tags or {})
encode_datapoint(point.name, point.value or point.values, point.tags or {})
end
end

View File

@ -0,0 +1,44 @@
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
class lma_collector::logs::aggregated_http_metrics (
$interval = 10,
$hostname = $::hostname,
$bulk_size = $lma_collector::params::http_aggregated_metrics_bulk_size,
$max_timer_inject = $lma_collector::params::hekad_max_timer_inject,
$percentile = 90,
$grace_time = 5,
) inherits lma_collector::params {
include lma_collector::service::log
$lua_modules_dir = $lma_collector::params::lua_modules_dir
heka::filter::sandbox { 'aggregated_http_metrics':
config_dir => $lma_collector::params::log_config_dir,
filename => "${lma_collector::params::plugins_dir}/filters/http_metrics_aggregator.lua",
message_matcher => 'Type == \'log\' && Fields[http_response_time] != NIL',
ticker_interval => $interval,
config => {
hostname => $hostname,
interval => $interval,
max_timer_inject => $max_timer_inject,
bulk_size => $bulk_size,
percentile => $percentile,
grace_time => $grace_time,
},
module_directory => $lua_modules_dir,
notify => Class['lma_collector::service::log'],
}
}

View File

@ -19,7 +19,9 @@ class lma_collector::logs::http_metrics {
$lua_modules_dir = $lma_collector::params::lua_modules_dir
# This sandbox has been replaced by the aggregated_http_metrics one.
heka::filter::sandbox { 'http_metrics':
ensure => absent,
config_dir => $lma_collector::params::log_config_dir,
filename => "${lma_collector::params::plugins_dir}/filters/http_metrics.lua",
message_matcher => 'Type == \'log\' && Fields[http_response_time] != NIL',

View File

@ -100,6 +100,12 @@ class lma_collector::params {
$buffering_max_buffer_size_for_nagios = 1 * 1024 * 1024
$queue_full_action_for_nagios = 'drop'
# HTTP aggregated metrics bulk_size parameter depends on hekad_max_message_size.
# The bulk_size is calculated considering that one metric bucket is a string
# of 300B size and we pick 60% of the theorical value.
# With the hekad_max_message_size set to 256KB, the bulk_size is 524 metrics.
$http_aggregated_metrics_bulk_size = floor($hekad_max_message_size / 300 * 0.6)
# Heka's default value is 1
$hekad_max_process_inject = 1

View File

@ -163,7 +163,9 @@ These metrics are retrieved from the Neutron database.
API response times
^^^^^^^^^^^^^^^^^^
* ``openstack_<service>_http_responses``, the time (in second) it took to serve the HTTP request. The metric contains ``http_method`` (eg 'GET', 'POST', and so forth) and ``http_status`` (eg '200', '404', and so forth) fields.
* ``openstack_<service>_http_response_times``, HTTP response time statistics.
The statistics are ``min``, ``max``, ``sum``, ``count``, ``upper_90`` (90 percentile) over 10 seconds.
The metric contains ``http_method`` (eg 'GET', 'POST', and so forth) and ``http_status`` (eg '2xx', '4xx', and so forth) fields.
``<service>`` is one of 'cinder', 'glance', 'heat' 'keystone', 'neutron' or 'nova'.