Merge "Isolate InfluxDB accumulator into a Lua module"
This commit is contained in:
@@ -0,0 +1,63 @@
|
||||
-- Copyright 2016 Mirantis, Inc.
|
||||
--
|
||||
-- Licensed under the Apache License, Version 2.0 (the "License");
|
||||
-- you may not use this file except in compliance with the License.
|
||||
-- You may obtain a copy of the License at
|
||||
--
|
||||
-- http://www.apache.org/licenses/LICENSE-2.0
|
||||
--
|
||||
-- Unless required by applicable law or agreed to in writing, software
|
||||
-- distributed under the License is distributed on an "AS IS" BASIS,
|
||||
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
-- See the License for the specific language governing permissions and
|
||||
-- limitations under the License.
|
||||
local time = os.time
|
||||
local string = string
|
||||
local table = table
|
||||
local setmetatable = setmetatable
|
||||
local ipairs = ipairs
|
||||
local pairs = pairs
|
||||
local tostring = tostring
|
||||
local type = type
|
||||
|
||||
local Accumulator = {}
|
||||
Accumulator.__index = Accumulator
|
||||
|
||||
setfenv(1, Accumulator) -- Remove external access to contain everything in the module
|
||||
|
||||
-- Create a new Accumulator
|
||||
--
|
||||
-- flush_count: the maximum number of items to accumulate before flushing
|
||||
-- flush_interval: the maximum number of seconds to wait before flushing
|
||||
-- callback: the function to call back when flushing the accumulator, it will
|
||||
-- receive the table of accumulated items as parameter.
|
||||
function Accumulator.new(flush_count, flush_interval, callback)
|
||||
local a = {}
|
||||
setmetatable(a, Accumulator)
|
||||
a.flush_count = flush_count
|
||||
a.flush_interval = flush_interval
|
||||
a.flush_cb = callback
|
||||
a.last_flush = time() * 1e9
|
||||
a.buffer = {}
|
||||
return a
|
||||
end
|
||||
|
||||
-- Flush the buffer if flush_count or flush_interval are met
|
||||
--
|
||||
-- ns: the current timestamp in nanosecond (optional)
|
||||
function Accumulator:flush(ns)
|
||||
local now = ns or time() * 1e9
|
||||
if #self.buffer > self.flush_count or now - self.last_flush > self.flush_interval then
|
||||
self.flush_cb(self.buffer)
|
||||
self.buffer = {}
|
||||
self.last_flush = now
|
||||
end
|
||||
end
|
||||
|
||||
-- Append an item to the buffer and flush the buffer if needed
|
||||
function Accumulator:append(item)
|
||||
self.buffer[#self.buffer+1] = item
|
||||
self:flush()
|
||||
end
|
||||
|
||||
return Accumulator
|
||||
@@ -16,7 +16,8 @@ require 'os'
|
||||
require 'string'
|
||||
require 'table'
|
||||
local utils = require 'lma_utils'
|
||||
local influxdb = require 'influxdb'
|
||||
local Accumulator = require 'accumulator'
|
||||
local Influxdb = require 'influxdb'
|
||||
local l = require 'lpeg'
|
||||
l.locale(l)
|
||||
|
||||
@@ -36,22 +37,15 @@ local defaults = {
|
||||
tenant_id=default_tenant_id,
|
||||
user_id=default_user_id,
|
||||
}
|
||||
local last_flush = os.time()
|
||||
local datapoints = {}
|
||||
local encoder = influxdb.new(time_precision)
|
||||
|
||||
-- Flush the datapoints to InfluxDB if enough items are present or if the
|
||||
-- timeout has expired
|
||||
function flush ()
|
||||
local now = os.time()
|
||||
if #datapoints > 0 and (#datapoints > flush_count or now - last_flush > flush_interval) then
|
||||
function flush_cb(datapoints)
|
||||
if #datapoints > 0 then
|
||||
datapoints[#datapoints+1] = ''
|
||||
utils.safe_inject_payload("txt", payload_name, table.concat(datapoints, "\n"))
|
||||
|
||||
datapoints = {}
|
||||
last_flush = now
|
||||
end
|
||||
end
|
||||
local accumulator = Accumulator.new(flush_count, flush_interval, flush_cb)
|
||||
local encoder = Influxdb.new(time_precision)
|
||||
|
||||
-- return a table containing the common tags from the message
|
||||
function get_common_tags()
|
||||
@@ -86,7 +80,12 @@ function process_single_metric()
|
||||
i = i + 1
|
||||
end
|
||||
|
||||
datapoints[#datapoints+1] = encoder:encode_datapoint(read_message('Timestamp'), name, value, tags)
|
||||
accumulator:append(
|
||||
encoder:encode_datapoint(
|
||||
read_message('Timestamp'),
|
||||
name,
|
||||
value,
|
||||
tags))
|
||||
return
|
||||
end
|
||||
|
||||
@@ -119,11 +118,12 @@ function process_bulk_metric()
|
||||
point.tags[k] = v
|
||||
end
|
||||
end
|
||||
datapoints[#datapoints+1] = encoder:encode_datapoint(
|
||||
msg_timestamp,
|
||||
point.name,
|
||||
point.value or point.values,
|
||||
point.tags)
|
||||
accumulator:append(
|
||||
encoder:encode_datapoint(
|
||||
msg_timestamp,
|
||||
point.name,
|
||||
point.value or point.values,
|
||||
point.tags))
|
||||
end
|
||||
return
|
||||
end
|
||||
@@ -137,8 +137,6 @@ function process_message()
|
||||
err_msg = process_single_metric()
|
||||
end
|
||||
|
||||
flush()
|
||||
|
||||
if err_msg then
|
||||
return -1, err_msg
|
||||
else
|
||||
@@ -147,5 +145,5 @@ function process_message()
|
||||
end
|
||||
|
||||
function timer_event(ns)
|
||||
flush()
|
||||
accumulator:flush(ns)
|
||||
end
|
||||
|
||||
@@ -0,0 +1,68 @@
|
||||
-- Copyright 2016 Mirantis, Inc.
|
||||
--
|
||||
-- Licensed under the Apache License, Version 2.0 (the "License");
|
||||
-- you may not use this file except in compliance with the License.
|
||||
-- You may obtain a copy of the License at
|
||||
--
|
||||
-- http://www.apache.org/licenses/LICENSE-2.0
|
||||
--
|
||||
-- Unless required by applicable law or agreed to in writing, software
|
||||
-- distributed under the License is distributed on an "AS IS" BASIS,
|
||||
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
-- See the License for the specific language governing permissions and
|
||||
-- limitations under the License.
|
||||
|
||||
EXPORT_ASSERT_TO_GLOBALS=true
|
||||
require('luaunit')
|
||||
require('os')
|
||||
package.path = package.path .. ";files/plugins/common/?.lua;tests/lua/mocks/?.lua"
|
||||
|
||||
local accumulator = require('accumulator')
|
||||
|
||||
TestAccumulator = {}
|
||||
|
||||
function TestAccumulator:test_flush_on_append()
|
||||
local sentinel = false
|
||||
local function test_cb(items)
|
||||
assertEquals(#items, 3)
|
||||
sentinel = true
|
||||
end
|
||||
local accum = accumulator.new(2, 5, test_cb)
|
||||
accum:append(1)
|
||||
assertEquals(sentinel, false)
|
||||
accum:append(2)
|
||||
assertEquals(sentinel, false)
|
||||
accum:append(3)
|
||||
assertEquals(sentinel, true)
|
||||
end
|
||||
|
||||
function TestAccumulator:test_flush_interval_with_buffer()
|
||||
local now = os.time()
|
||||
local sentinel = false
|
||||
local function test_cb(items)
|
||||
assertEquals(#items, 1)
|
||||
sentinel = true
|
||||
end
|
||||
local accum = accumulator.new(20, 1, test_cb)
|
||||
accum:append(1)
|
||||
assertEquals(sentinel, false)
|
||||
accum:flush((now + 2) * 1e9)
|
||||
assertEquals(sentinel, true)
|
||||
end
|
||||
|
||||
function TestAccumulator:test_flush_interval_with_empty_buffer()
|
||||
local now = os.time()
|
||||
local sentinel = false
|
||||
local function test_cb(items)
|
||||
assertEquals(#items, 0)
|
||||
sentinel = true
|
||||
end
|
||||
local accum = accumulator.new(20, 1, test_cb)
|
||||
accum:flush((now + 2) * 1e9)
|
||||
assertEquals(sentinel, true)
|
||||
end
|
||||
|
||||
lu = LuaUnit
|
||||
lu:setVerbosity( 1 )
|
||||
os.exit( lu:run() )
|
||||
|
||||
Reference in New Issue
Block a user