Revert "Increase the number of points per InfluxDB batch"
This reverts commit 567562faaf
.
Change-Id: I14549b8cad02058c3352c71bad80ff2ad0dcd970
This commit is contained in:
@@ -807,10 +807,6 @@ InfluxDB.
|
|||||||
options: an array. Default: `[]`.
|
options: an array. Default: `[]`.
|
||||||
* `time_precision`: *Optional*. Time precision. Valid options: a string.
|
* `time_precision`: *Optional*. Time precision. Valid options: a string.
|
||||||
Default: `ms`.
|
Default: `ms`.
|
||||||
* `flush_count`: *Optional*. Maximum number of datapoints to send in a single
|
|
||||||
write request. Valid values: an integer. Default: `5000`.
|
|
||||||
* `flush_interval`: *Optional*. Maximum number of seconds to wait before
|
|
||||||
writing data to InfluxDB. Valid values: an integer. Default: `5`.
|
|
||||||
|
|
||||||
#### Class: `lma_collector::notifications::input`
|
#### Class: `lma_collector::notifications::input`
|
||||||
|
|
||||||
|
@@ -19,8 +19,8 @@ local utils = require 'lma_utils'
|
|||||||
local l = require 'lpeg'
|
local l = require 'lpeg'
|
||||||
l.locale(l)
|
l.locale(l)
|
||||||
|
|
||||||
local flush_count = (read_config('flush_count') or 100) + 0
|
local flush_count = read_config('flush_count') or 100
|
||||||
local flush_interval = (read_config('flush_interval') or 5) + 0
|
local flush_interval = read_config('flush_interval') or 5
|
||||||
local default_tenant_id = read_config("default_tenant_id")
|
local default_tenant_id = read_config("default_tenant_id")
|
||||||
local default_user_id = read_config("default_user_id")
|
local default_user_id = read_config("default_user_id")
|
||||||
local time_precision = read_config("time_precision")
|
local time_precision = read_config("time_precision")
|
||||||
|
@@ -20,16 +20,16 @@ class lma_collector::influxdb (
|
|||||||
$port = $lma_collector::params::influxdb_port,
|
$port = $lma_collector::params::influxdb_port,
|
||||||
$tag_fields = $lma_collector::params::influxdb_tag_fields,
|
$tag_fields = $lma_collector::params::influxdb_tag_fields,
|
||||||
$time_precision = $lma_collector::params::influxdb_time_precision,
|
$time_precision = $lma_collector::params::influxdb_time_precision,
|
||||||
$flush_count = $lma_collector::params::influxdb_flush_count,
|
|
||||||
$flush_interval = $lma_collector::params::influxdb_flush_interval,
|
|
||||||
) inherits lma_collector::params {
|
) inherits lma_collector::params {
|
||||||
include lma_collector::service::metric
|
include lma_collector::service::metric
|
||||||
|
|
||||||
$lua_modules_dir = $lma_collector::params::lua_modules_dir
|
$lua_modules_dir = $lma_collector::params::lua_modules_dir
|
||||||
|
|
||||||
validate_string($database, $user, $password, $server, $time_precision)
|
validate_string($database)
|
||||||
|
validate_string($user)
|
||||||
|
validate_string($password)
|
||||||
|
validate_string($server)
|
||||||
validate_array($tag_fields)
|
validate_array($tag_fields)
|
||||||
validate_integer([$flush_count, $flush_interval])
|
|
||||||
|
|
||||||
heka::filter::sandbox { 'influxdb_accumulator':
|
heka::filter::sandbox { 'influxdb_accumulator':
|
||||||
config_dir => $lma_collector::params::metric_config_dir,
|
config_dir => $lma_collector::params::metric_config_dir,
|
||||||
@@ -37,8 +37,8 @@ class lma_collector::influxdb (
|
|||||||
message_matcher => $lma_collector::params::influxdb_message_matcher,
|
message_matcher => $lma_collector::params::influxdb_message_matcher,
|
||||||
ticker_interval => 1,
|
ticker_interval => 1,
|
||||||
config => {
|
config => {
|
||||||
flush_interval => $flush_interval,
|
flush_interval => $lma_collector::params::influxdb_flush_interval,
|
||||||
flush_count => $flush_count,
|
flush_count => $lma_collector::params::influxdb_flush_count,
|
||||||
tag_fields => join(sort(concat(['hostname'], $tag_fields)), ' '),
|
tag_fields => join(sort(concat(['hostname'], $tag_fields)), ' '),
|
||||||
time_precision => $time_precision,
|
time_precision => $time_precision,
|
||||||
# FIXME(pasquier-s): provide the default_tenant_id & default_user_id
|
# FIXME(pasquier-s): provide the default_tenant_id & default_user_id
|
||||||
|
@@ -156,9 +156,6 @@ class lma_collector::params {
|
|||||||
|
|
||||||
$influxdb_port = '8086'
|
$influxdb_port = '8086'
|
||||||
$influxdb_timeout = 5
|
$influxdb_timeout = 5
|
||||||
$influxdb_flush_interval = 5
|
|
||||||
# InfluxDB recommends a batch size of 5,000 points
|
|
||||||
$influxdb_flush_count = 5000
|
|
||||||
$influxdb_tag_fields = []
|
$influxdb_tag_fields = []
|
||||||
$influxdb_time_precision = 'ms'
|
$influxdb_time_precision = 'ms'
|
||||||
$influxdb_message_matcher = join([
|
$influxdb_message_matcher = join([
|
||||||
|
@@ -20,8 +20,8 @@ describe 'lma_collector::influxdb' do
|
|||||||
end
|
end
|
||||||
|
|
||||||
describe 'with mandatory parameters' do
|
describe 'with mandatory parameters' do
|
||||||
let(:params) {{ :server => 'localhost', :user => 'lma', :password => 'lma',
|
let(:params) {{ :server => 'localhost', :user => 'lma', :password =>
|
||||||
:database => 'lma' }}
|
'lma', :database => 'lma' }}
|
||||||
it { is_expected.to contain_heka__output__http('influxdb') }
|
it { is_expected.to contain_heka__output__http('influxdb') }
|
||||||
it { is_expected.to contain_heka__encoder__payload('influxdb') }
|
it { is_expected.to contain_heka__encoder__payload('influxdb') }
|
||||||
it { is_expected.to contain_heka__filter__sandbox('influxdb_accumulator') }
|
it { is_expected.to contain_heka__filter__sandbox('influxdb_accumulator') }
|
||||||
@@ -29,27 +29,13 @@ describe 'lma_collector::influxdb' do
|
|||||||
end
|
end
|
||||||
|
|
||||||
describe 'with tag_fields parameter' do
|
describe 'with tag_fields parameter' do
|
||||||
let(:params) {{ :server => 'localhost', :user => 'lma', :password => 'lma',
|
let(:params) {{ :server => 'localhost', :user => 'lma', :password =>
|
||||||
:database => 'lma', :tag_fields => ['foo', 'zzz'] }}
|
'lma', :database => 'lma', :tag_fields => ['foo', 'zzz'] }}
|
||||||
it { is_expected.to contain_heka__output__http('influxdb') }
|
it { is_expected.to contain_heka__output__http('influxdb') }
|
||||||
it { is_expected.to contain_heka__encoder__payload('influxdb') }
|
it { is_expected.to contain_heka__encoder__payload('influxdb') }
|
||||||
it { is_expected.to contain_heka__filter__sandbox('influxdb_accumulator').with_config({
|
it { is_expected.to contain_heka__filter__sandbox('influxdb_accumulator').with_config({
|
||||||
"tag_fields" => "foo hostname zzz", "flush_interval"=> "5",
|
"tag_fields" => "foo hostname zzz", "flush_interval"=> :undef,
|
||||||
"flush_count"=> "5000", "time_precision" => "ms"}) }
|
"flush_count"=> :undef, "time_precision" => "ms"}) }
|
||||||
it { is_expected.to contain_heka__filter__sandbox('influxdb_annotation') }
|
|
||||||
end
|
|
||||||
|
|
||||||
describe 'with flush and precision parameters' do
|
|
||||||
let(:params) {{ :server => 'localhost', :user => 'lma', :password => 'lma',
|
|
||||||
:database => 'lma', :flush_count => 1, :flush_interval => 2,
|
|
||||||
:time_precision => 's'
|
|
||||||
}}
|
|
||||||
it { is_expected.to contain_heka__output__http('influxdb') }
|
|
||||||
it { is_expected.to contain_heka__encoder__payload('influxdb') }
|
|
||||||
it { is_expected.to contain_heka__filter__sandbox('influxdb_accumulator').with_config({
|
|
||||||
"flush_interval"=> "2", "flush_count"=> "1", "time_precision" => "s",
|
|
||||||
"tag_fields" => "hostname"
|
|
||||||
}) }
|
|
||||||
it { is_expected.to contain_heka__filter__sandbox('influxdb_annotation') }
|
it { is_expected.to contain_heka__filter__sandbox('influxdb_annotation') }
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
Reference in New Issue
Block a user