Enable buffering for HTTP output plugins
This change enables buffering for the Heka HTTP output plugin. We don't enable buffering for the Nagios outputs: it is acceptable to lose service checks since Nagios is only interested in the current value. It also adds validation for the buffering parameters, because testing showed that max_buffer_size needs to be greater than max_file_size.

Change-Id: Ifbe98a2d649322f1213c4ecab71a95533f59510f
parent 218c7d9884
commit 6467d161ac
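For context, a rough usage sketch (not part of this change): the resource title, config_dir and url below are made up, but the parameters match the heka::output::http define as extended by this patch, and the sizes satisfy the new validation rule.

  heka::output::http { 'example':
    config_dir        => '/etc/hekad',
    url               => 'http://example.com/write',
    use_buffering     => true,
    max_buffer_size   => 1024 * 1024 * 1024, # 1GiB, must be greater than max_file_size
    max_file_size     => 128 * 1024 * 1024,  # 128MiB
    queue_full_action => 'drop',             # or 'shutdown' / 'block'
  }

With use_buffering => false (as done for the Nagios outputs below), the buffering parameters are neither validated nor rendered into the [<title>_output.buffering] TOML section.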
@@ -0,0 +1,71 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
module Puppet::Parser::Functions
  newfunction(:validate_buffering, :doc => <<-'ENDHEREDOC') do |args|
    Validate that the parameters used for buffering are consistent.

    The function takes 3 arguments:
    - max_buffer_size
    - max_file_size
    - full_action

    In practice, max_buffer_size must be greater than max_file_size.

    The following values will pass:

      $max_buffer_size = 2048
      $max_file_size = 1024
      validate_buffering($max_buffer_size, $max_file_size, 'drop')

    The following values will fail:

      $max_buffer_size = 1024
      $max_file_size = 2048
      validate_buffering($max_buffer_size, $max_file_size)

      $max_buffer_size = 2048
      $max_file_size = 1024
      validate_buffering($max_buffer_size, $max_file_size, 'foo')

    ENDHEREDOC

    unless args.length == 3
      raise Puppet::ParseError, "validate_buffering(): wrong number of arguments (#{args.length}; must be 3)"
    end

    unless args[0].to_s =~ /^\d+$/
      raise Puppet::ParseError, "validate_buffering(): bad argument (#{args[0]}; must be an integer)"
    end
    max_buffer_size = args[0].to_i

    # When passing undef as args[1], it will be seen as the empty string and
    # evaluated as 0, which means no limit.
    unless args[1].to_s =~ /^\d*$/
      raise Puppet::ParseError, "validate_buffering(): bad argument (#{args[1]}; must be an integer)"
    end
    max_file_size = args[1].to_i
    max_file_size = 512 * 1024 * 1024 if max_file_size == 0

    if max_buffer_size > 0 and max_buffer_size < max_file_size
      raise Puppet::ParseError, 'validate_buffering(): max_buffer_size (' +
        "#{max_buffer_size}) should be greater than max_file_size (#{max_file_size})"
    end

    unless ['drop', 'shutdown', 'block'].include?(args[2])
      raise Puppet::ParseError, 'validate_buffering(): full_action (' +
        "#{args[2]}) should be either drop, shutdown or block"
    end
  end
end
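One behaviour of the function above worth calling out: when max_file_size is undef it reaches the function as an empty string, to_i turns it into 0, and the comparison then assumes the 512MiB fallback hard-coded above. A rough manifest sketch under that assumption (variable names and values are illustrative only):

  $big_buffer   = 1024 * 1024 * 1024 # 1GiB
  $small_buffer = 256 * 1024 * 1024  # 256MiB

  validate_buffering($big_buffer, undef, 'drop')    # passes: 1GiB > the assumed 512MiB per-file size
  validate_buffering($small_buffer, undef, 'drop')  # fails: 256MiB < the assumed 512MiB per-file size
  validate_buffering(0, undef, 'block')             # passes: a max_buffer_size of 0 means no limit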
@@ -14,21 +14,25 @@
#
define heka::output::elasticsearch (
  $config_dir,
  $server = undef,
  $port = undef,
  $encoder = $title,
  $message_matcher = 'FALSE',
  $flush_interval = 5,
  $flush_count = 10,
  $use_buffering = true,
  $queue_max_buffer_size = 1000000000, # 1GB
  $server = undef,
  $port = undef,
  $encoder = $title,
  $message_matcher = 'FALSE',
  $flush_interval = 5,
  $flush_count = 10,
  $use_buffering = true,
  $max_buffer_size = 1024 * 1024 * 1024, # 1GiB
  $queue_full_action = 'drop',
  $max_file_size = undef,
  $ensure = present,
  $max_file_size = undef,
  $ensure = present,
) {

  include heka::params

  if $use_buffering {
    validate_buffering($max_buffer_size, $max_file_size, $queue_full_action)
  }

  file { "${config_dir}/output-${title}.toml":
    ensure  => $ensure,
    content => template('heka/output/elasticsearch.toml.erb'),
@@ -15,19 +15,26 @@
define heka::output::http (
  $config_dir,
  $url,
  $encoder = $title,
  $message_matcher = 'FALSE',
  $username = undef,
  $password = undef,
  $timeout = undef,
  $method = 'POST',
  $headers = {},
  $ensure = present,
  $encoder = $title,
  $message_matcher = 'FALSE',
  $username = undef,
  $password = undef,
  $timeout = undef,
  $method = 'POST',
  $headers = {},
  $use_buffering = true,
  $max_buffer_size = 1024 * 1024 * 1024, # 1GiB
  $queue_full_action = 'drop',
  $max_file_size = undef,
  $ensure = present,
) {

  include heka::params

  validate_hash($headers)
  if $use_buffering {
    validate_buffering($max_buffer_size, $max_file_size, $queue_full_action)
  }

  file { "${config_dir}/output-${title}.toml":
    ensure => $ensure,
@@ -14,18 +14,22 @@
#
define heka::output::tcp (
  $config_dir,
  $address = '127.0.0.1',
  $port = 5565,
  $message_matcher = 'FALSE',
  $use_buffering = true,
  $queue_max_buffer_size = 1000000000, # 1GB
  $address = '127.0.0.1',
  $port = 5565,
  $message_matcher = 'FALSE',
  $use_buffering = true,
  $max_buffer_size = 1024 * 1024 * 1024, # 1GiB
  $queue_full_action = 'drop',
  $max_file_size = undef,
  $ensure = present,
  $max_file_size = undef,
  $ensure = present,
) {

  include heka::params

  if $use_buffering {
    validate_buffering($max_buffer_size, $max_file_size, $queue_full_action)
  }

  file { "${config_dir}/output-${title}.toml":
    ensure  => $ensure,
    content => template('heka/output/tcp.toml.erb'),
@@ -0,0 +1,38 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
require 'spec_helper'

describe 'heka::output::http' do
  let(:title) { :foo }
  let(:facts) do
    {:kernel => 'Linux', :operatingsystem => 'Ubuntu',
     :osfamily => 'Debian'}
  end

  describe 'with title = foo' do
    let(:params) {{:config_dir => '/etc/hekad', :url => 'http://example.com/'}}
    it { is_expected.to contain_file('/etc/hekad/output-foo.toml') }
  end

  describe 'with title = foo and buffering' do
    let(:params) {{:config_dir => '/etc/hekad',
                   :url => 'http://example.com/',
                   :use_buffering => true,
                   :max_file_size => 50000,
                   :max_buffer_size => 100000,
                   :queue_full_action => 'shutdown'
    }}
    it { is_expected.to contain_file('/etc/hekad/output-foo.toml') }
  end
end
@@ -0,0 +1,24 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
require 'spec_helper'

describe 'validate_buffering' do
  it { is_expected.to run.with_params().and_raise_error(Puppet::ParseError, /wrong number of arguments/i) }
  it { is_expected.to run.with_params('foo', 'bar', 'drop').and_raise_error(Puppet::ParseError, /bad argument/i) }
  it { is_expected.to run.with_params(1024, 2048, 'drop').and_raise_error(Puppet::ParseError, /should be greater/i) }
  it { is_expected.to run.with_params(2048, 1024, 'foo').and_raise_error(Puppet::ParseError, /should be either/i) }
  it { is_expected.to run.with_params(0, '', 'shutdown') }
  it { is_expected.to run.with_params(2048, 1024, 'shutdown') }
  it { is_expected.to run.with_params(1024 * 1024 * 1024, 0, 'block') }
end
@@ -13,7 +13,7 @@ use_buffering = <%= @use_buffering %>

<% if @use_buffering -%>
[<%= @title %>_output.buffering]
max_buffer_size = <%= @queue_max_buffer_size %>
max_buffer_size = <%= @max_buffer_size %>
<% if @max_file_size != :undef -%>
max_file_size = <%= @max_file_size %>
<% end -%>
@@ -13,10 +13,20 @@ password = "<%= @password %>"
http_timeout = <%= @timeout.to_i() * 1000 %>
<% end -%>
method = "<%= @method %>"
use_buffering = <%= @use_buffering %>

<% if @use_buffering -%>
[<%= @title %>_output.buffering]
max_buffer_size = <%= @max_buffer_size %>
<% if @max_file_size != :undef -%>
max_file_size = <%= @max_file_size %>
<% end -%>
full_action = "<%= @queue_full_action %>"
<% end -%>

<% if @headers.size() > 0 -%>
[<%= @title %>_output.headers]
<% @headers.keys().sort().each do |header| -%>
<%= header %> = ["<%= @headers[header] %>"]
<%end%>
<% end -%>
@@ -7,7 +7,7 @@ message_matcher = "<%= @message_matcher %>"
use_buffering = <%= @use_buffering %>
<% if @use_buffering -%>
[<%= @title %>_tcpoutput.buffering]
max_buffer_size = <%= @queue_max_buffer_size %>
max_buffer_size = <%= @max_buffer_size %>
<% if @max_file_size != :undef -%>
max_file_size = <%= @max_file_size %>
<% end -%>
@@ -49,6 +49,8 @@ define lma_collector::afd_nagios(
    headers => {
      'Content-Type' => 'application/x-www-form-urlencoded'
    },
    # Buffering isn't needed for Nagios checks
    use_buffering => false,
    require => Heka::Encoder::Sandbox["nagios_afd_${title}"],
    notify => Class['lma_collector::service'],
  }
@@ -31,6 +31,7 @@ class lma_collector::aggregator::client (
    address => $address,
    port => $port,
    use_buffering => $lma_collector::params::buffering_enabled,
    max_buffer_size => $lma_collector::params::buffering_max_buffer_size,
    max_file_size => $lma_collector::params::buffering_max_file_size,
    message_matcher => $lma_collector::params::aggregator_client_message_matcher,
    notify => Class['lma_collector::service'],
@@ -33,6 +33,7 @@ class lma_collector::elasticsearch (
    port => $port,
    message_matcher => 'Type == \'log\' || Type == \'notification\'',
    use_buffering => $lma_collector::params::buffering_enabled,
    max_buffer_size => $lma_collector::params::buffering_max_buffer_size,
    max_file_size => $lma_collector::params::buffering_max_file_size,
    require => Heka::Encoder::Es_json['elasticsearch'],
    notify => Class['lma_collector::service'],
@@ -60,6 +60,8 @@ define lma_collector::gse_nagios (
    headers => {
      'Content-Type' => 'application/x-www-form-urlencoded'
    },
    # Buffering isn't needed for Nagios checks
    use_buffering => false,
    require => Heka::Encoder::Sandbox["nagios_gse_${title}"],
    notify => Class['lma_collector::service'],
  }
@@ -66,6 +66,9 @@ class lma_collector::influxdb (
    headers => {
      'Content-Type' => 'application/x-www-form-urlencoded'
    },
    use_buffering => $lma_collector::params::buffering_enabled,
    max_file_size => $lma_collector::params::buffering_max_file_size,
    max_buffer_size => $lma_collector::params::buffering_max_buffer_size,
    require => Heka::Encoder::Payload['influxdb'],
    notify => Class['lma_collector::service'],
  }
@@ -71,10 +71,10 @@ class lma_collector::params {
  # This is required by the Elasticsearch buffered output.
  # Let's configure 192KB by default.
  # See https://github.com/mozilla-services/heka/issues/1389
  $hekad_max_message_size = 192*1024
  $hekad_max_message_size = 192 * 1024

  # Let's use the default Heka value
  $buffering_max_file_size = 0
  $buffering_max_file_size = 128 * 1024 * 1024
  $buffering_max_buffer_size = 1024 * 1024 * 1024

  if $buffering_max_file_size != 0 and $buffering_max_file_size < $hekad_max_message_size {
    fail('buffering_max_file_size must be greater than hekad_max_message_size')
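A quick worked check of the guard above with the new defaults (the $check_* names are illustrative only; the expression mirrors the one in the manifest):

  $check_file_size    = 128 * 1024 * 1024 # 134217728 bytes
  $check_message_size = 192 * 1024        # 196608 bytes
  # 134217728 is not smaller than 196608, so the condition
  # ($check_file_size != 0 and $check_file_size < $check_message_size)
  # is false and fail() is not reached.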