Add Sandboxes to handle Ceilometer samples/resources

The Lua sandboxes aren't used for now. To give them a try, one could use the
configuration stored in contrib/ceilometer.toml.

blueprint: ceilometer-stacklight-integration

Co-Authored-By: Igor Degtiarov <idegtiarov@mirantis.com>
Co-Authored-By: Ilya Tyaptin <ityaptin@mirantis.com>

Change-Id: I7634dd0ee4f3200d1a82ab26feafa54a8ac74e51
This commit is contained in:
Swann Croiset 2016-05-09 14:35:28 +02:00
parent c4e1f22b8d
commit 8bc362d745
5 changed files with 311 additions and 11 deletions

74
contrib/ceilometer.toml Normal file
View File

@ -0,0 +1,74 @@
# Example Heka configuration wiring the Ceilometer Lua sandboxes:
# AMQP samples -> decoder -> InfluxDB accumulator -> InfluxDB output,
# plus an Elasticsearch output for decoded resources.
# Replace the <...> placeholders before use.

# Consume Ceilometer samples from the RabbitMQ metering queue.
[openstack_sample_amqp]
type = "AMQPInput"
url = "amqp://<USER>:<PASS>@<RABBIT_ADDRESS>:5673/"
exchange = "ceilometer"
exchange_type = "topic"
exchange_durability = false
exchange_auto_delete = false
queue_auto_delete = false
queue = "metering.sample"
routing_key = "metering.sample"
decoder = "sample_decoder"
splitter = "NullSplitter"
can_exit = false

# Decode the oslo.messaging envelope into 'ceilometer_samples' (and,
# optionally, 'ceilometer_resource') messages.
[sample_decoder]
type = "SandboxDecoder"
filename = "/usr/share/lma_collector/decoders/metering.lua"
module_directory = "/usr/share/lma_collector_modules;/usr/share/heka/lua_modules"

[sample_decoder.config]
# Uncomment to also emit one resource message per samples message.
#decode_resources = "TRUE"
# Resource metadata keys copied into each sample's tags by the decoder.
metadata_fields = 'status deleted container_format min_ram updated_at min_disk is_public size checksum created_at disk_format protected instance_host host display_name instance_id instance_type status state'

# Accumulate decoded samples into InfluxDB line-protocol payloads.
[ceilometer_influxdb_accumulator_filter]
type = "SandboxFilter"
filename = "/usr/share/lma_collector/filters/influxdb_accumulator.lua"
preserve_data = false
message_matcher = "Fields[aggregator] == NIL && Type =~ /ceilometer_samples$/"
ticker_interval = 1
module_directory = "/usr/share/lma_collector_modules;/usr/share/heka/lua_modules"

[ceilometer_influxdb_accumulator_filter.config]
tag_fields = 'deployment_id environment_label hostname tenant_id user_id'
time_precision = 'ms'
payload_name = 'sample_data'
bulk_metric_type_matcher = 'ceilometer_samples$'
flush_count = 10

# Pass the accumulated text payload through unchanged.
[influxdb_encoder]
type = "PayloadEncoder"
append_newlines = false
prefix_ts = false

# POST the sample datapoints to the InfluxDB HTTP write API.
[samples_influxdb_output]
type = "HttpOutput"
message_matcher = "Fields[payload_type] == 'txt' && Fields[payload_name] == 'sample_data'"
encoder = "influxdb_encoder"
# NOTE(review): hardcoded address — presumably meant to be an
# <INFLUXDB_ADDRESS> placeholder like the other endpoints; confirm.
address = "http://10.109.41.14:8086/write?db=ceilometer&precision=ms"
username = "ceilo"
password = "<PASSWORD>"
http_timeout = 5000
method = "POST"

[samples_influxdb_output.headers]
Content-Type = ["application/x-www-form-urlencoded"]

# Index decoded resources into Elasticsearch via the bulk API.
# NOTE(review): the decoder emits messages with Type ==
# 'ceilometer_resource'; this matcher ('resource') may never match —
# confirm against the decoder.
[elasticsearch_resource_output]
type = "ElasticSearchOutput"
message_matcher = "Type == 'resource'"
encoder = "elasticsearch_resource_encoder"
flush_interval = 5000
flush_count = 10
server = "http://<ES>:9200"

[elasticsearch_resource_encoder]
type = "SandboxEncoder"
filename = "/usr/share/lma_collector/encoders/es_bulk.lua"
module_directory = "/usr/share/lma_collector_modules;/usr/share/heka/lua_modules"

[elasticsearch_resource_encoder.config]
index = "ceilometer_resource"
type_name = "source"

View File

@ -15,6 +15,7 @@ local cjson = require 'cjson'
local string = require 'string'
local extra = require 'extra_fields'
local patt = require 'patterns'
local math = require 'math'
local pairs = pairs
local inject_message = inject_message
@ -245,4 +246,27 @@ function truncate(str, max_length, delimiter)
return string.sub(str, 1, pos)
end
-- Convert a nanosecond timestamp to a lower precision timestamp.
-- Arguments:
--   timestamp_precision: one of 'ms', 's', 'm' or 'h'; any other value
--     (including nil) falls back to millisecond precision.
--   timestamp: a timestamp in nanoseconds; when nil, the current
--     message's Timestamp field is read instead.
function message_timestamp(timestamp_precision, timestamp)
    -- Nanoseconds per unit of the requested precision ('ms' is the
    -- default, handled by the fallback below).
    local ns_per_unit = {
        s = 1e9,
        m = 1e9 * 60,
        h = 1e9 * 60 * 60,
    }
    local divisor = ns_per_unit[timestamp_precision] or 1e6
    if timestamp == nil then
        timestamp = read_message("Timestamp")
    end
    return math.floor(timestamp / divisor)
end
return M

View File

@ -0,0 +1,135 @@
-- Copyright 2016 Mirantis, Inc.
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
require "string"
require "cjson"
require 'table'
require 'math'
local patt = require 'patterns'
local utils = require 'lma_utils'
local l = require 'lpeg'
l.locale(l)
-- Normalize a UUID string through the shared patterns grammar.
-- NOTE(review): presumably returns nil when the input does not match a
-- UUID — confirm against patterns.lua.
function normalize_uuid(uuid)
    local normalized_uuid = patt.Uuid:match(uuid)
    return normalized_uuid
end
-- The metadata_fields parameter is a list of words separated by spaces.
local fields_grammar = l.Ct((l.C((l.P(1) - l.P" ")^1) * l.P" "^0)^0)
-- Resource metadata keys to copy into each sample's tags (empty list
-- when the option is not configured).
local metadata_fields = fields_grammar:match(
read_config("metadata_fields") or ""
)
-- When set, a 'ceilometer_resource' message is emitted in addition to
-- the samples message. NOTE(review): any non-empty config string
-- (including "false") is truthy in Lua — confirm this is intended.
local decode_resources = read_config('decode_resources') or false
-- Template for the samples message; Timestamp and Payload are filled in
-- by process_message().
local sample_msg = {
Timestamp = nil,
-- This message type has the same structure as 'bulk_metric'.
Type = "ceilometer_samples",
Payload = nil
}
-- Template for the per-resource message emitted when decode_resources
-- is enabled.
local resource_msg = {
Timestamp = nil,
Type = "ceilometer_resource",
Fields = nil,
}
-- Copy the configured metadata fields from `metadata` into `tags`,
-- prefixing each key with "metadata.". Table-valued entries are
-- skipped so that every tag value stays scalar.
function inject_metadata(metadata, tags)
    for _, field_name in ipairs(metadata_fields) do
        local field_value = metadata[field_name]
        local is_scalar = field_value ~= nil and type(field_value) ~= 'table'
        if is_scalar then
            tags["metadata." .. field_name] = field_value
        end
    end
end
-- Record the resource entry for this sample in `payload`, keyed by
-- resource_id (a later sample for the same resource overwrites the
-- earlier one). The meter table maps the counter name to its type and
-- unit.
function add_resource_to_payload(sample, payload)
    local meter_info = {
        type = sample.counter_type,
        unit = sample.counter_unit
    }
    payload[sample.resource_id] = {
        timestamp = sample.timestamp,
        resource_id = sample.resource_id,
        source = sample.source or "",
        metadata = sample.resource_metadata,
        user_id = sample.user_id,
        project_id = sample.project_id,
        meter = {
            [sample.counter_name] = meter_info
        }
    }
end
-- Append a datapoint for this sample to `payload`, using the same
-- structure as 'bulk_metric' datapoints (name/timestamp/values/tags).
function add_sample_to_payload(sample, payload)
    -- Sample identity tags, enriched with the configured metadata keys.
    local tags = {
        meter = sample.counter_name,
        resource_id = sample.resource_id,
        project_id = sample.project_id,
        user_id = sample.user_id,
        source = sample.source
    }
    inject_metadata(sample.resource_metadata or {}, tags)
    payload[#payload + 1] = {
        name = 'sample',
        timestamp = patt.Timestamp:match(sample.timestamp),
        values = {
            value = sample.counter_volume,
            message_id = sample.message_id,
            recorded_at = sample.recorded_at,
            timestamp = sample.timestamp,
            message_signature = sample.signature,
            type = sample.counter_type,
            unit = sample.counter_unit
        },
        tags = tags
    }
end
-- Decode one AMQP message carrying a list of Ceilometer samples.
-- Injects a 'ceilometer_samples' message (bulk_metric-like payload)
-- and, when decode_resources is enabled, a 'ceilometer_resource'
-- message keyed by resource id.
-- Returns 0 on success, or -1 with an error string when the payload
-- (or the nested oslo.message envelope) is not valid JSON.
function process_message ()
local data = read_message("Payload")
local ok, message = pcall(cjson.decode, data)
if not ok then
return -1, "Cannot decode Payload"
end
-- The actual notification body is wrapped in an oslo.messaging envelope.
local ok, message_body = pcall(cjson.decode, message["oslo.message"])
if not ok then
return -1, "Cannot decode Payload[oslo.message]"
end
local sample_payload = {}
local resource_payload = {}
for _, sample in ipairs(message_body["payload"]) do
add_sample_to_payload(sample, sample_payload)
if decode_resources then
add_resource_to_payload(sample, resource_payload)
end
end
-- Both injected messages carry the notification's own timestamp.
sample_msg.Payload = cjson.encode(sample_payload)
sample_msg.Timestamp = patt.Timestamp:match(message_body.timestamp)
utils.safe_inject_message(sample_msg)
if decode_resources then
resource_msg.Payload = cjson.encode(resource_payload)
resource_msg.Timestamp = patt.Timestamp:match(message_body.timestamp)
utils.safe_inject_message(resource_msg)
end
return 0
end

View File

@ -0,0 +1,66 @@
-- Copyright 2016 Mirantis, Inc.
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
require "string"
require "cjson"
local elasticsearch = require "elasticsearch"
local index = read_config("index") or "index"
local type_name = read_config("type_name") or "message"
-- Encode a decoded Ceilometer resource payload into an Elasticsearch
-- bulk request made of one update/upsert action per resource.
-- The update script merges the new meter into the existing document,
-- always refreshes user_id/project_id/source, and keeps metadata plus
-- the first/last sample timestamps consistent with the incoming
-- sample's timestamp. Always returns 0.
function process_message()
-- Fix: removed the unused 'local ns' declaration present before.
local resources = cjson.decode(read_message("Payload"))
for resource_id, resource in pairs(resources) do
-- Bulk action line addressing the document by resource id.
local update = cjson.encode({update = {_index = index, _type = type_name,
_id = resource_id}})
local body = {
script = 'ctx._source.meters += meter;' ..
'ctx._source.user_id = user_id;' ..
'ctx._source.project_id = project_id;' ..
'ctx._source.source = source; ' ..
'ctx._source.metadata = ' ..
'ctx._source.last_sample_timestamp <= timestamp ? ' ..
'metadata : ctx._source.metadata;' ..
'ctx._source.last_sample_timestamp = ' ..
'ctx._source.last_sample_timestamp < timestamp ?' ..
'timestamp : ctx._source.last_sample_timestamp;' ..
'ctx._source.first_sample_timestamp = ' ..
'ctx._source.first_sample_timestamp > timestamp ?' ..
'timestamp : ctx._source.first_sample_timestamp;',
params = {
meter = resource.meter,
metadata = resource.metadata,
timestamp = resource.timestamp,
user_id = resource.user_id or '',
project_id = resource.project_id or '',
source = resource.source or '',
},
-- Document created when the resource is not indexed yet.
upsert = {
first_sample_timestamp = resource.timestamp,
last_sample_timestamp = resource.timestamp,
project_id = resource.project_id or '',
user_id = resource.user_id or '',
source = resource.source or '',
metadata = resource.metadata,
meters = resource.meter
}
}
body = cjson.encode(body)
-- Bulk API format: action line, newline, document line, newline.
add_to_payload(update, "\n", body, "\n")
end
inject_payload()
return 0
end

View File

@ -15,7 +15,6 @@ require 'cjson'
require 'os'
require 'string'
require 'table'
local field_util = require 'field_util'
local utils = require 'lma_utils'
local l = require 'lpeg'
l.locale(l)
@ -25,6 +24,8 @@ local flush_interval = read_config('flush_interval') or 5
local default_tenant_id = read_config("default_tenant_id")
local default_user_id = read_config("default_user_id")
local time_precision = read_config("time_precision")
local payload_name = read_config("payload_name") or "influxdb"
local bulk_metric_type_matcher = read_config("bulk_metric_type_matcher") or "bulk_metric$"
-- the tag_fields parameter is a list of tags separated by spaces
local tag_grammar = l.Ct((l.C((l.P(1) - l.P" ")^1) * l.P" "^0)^0)
@ -47,7 +48,7 @@ function flush ()
local now = os.time()
if #datapoints > 0 and (#datapoints > flush_count or now - last_flush > flush_interval) then
datapoints[#datapoints+1] = ''
utils.safe_inject_payload("txt", "influxdb", table.concat(datapoints, "\n"))
utils.safe_inject_payload("txt", payload_name, table.concat(datapoints, "\n"))
datapoints = {}
last_flush = now
@ -117,13 +118,14 @@ function process_bulk_metric()
-- The payload contains a list of datapoints, each point being formatted
-- either like this: {name='foo',value=1,tags={k1=v1,...}}
-- or for multi_values: {name='bar',values={k1=v1,...},tags={k1=v1,...}}
-- datapoints can also contain a 'timestamp' in millisecond.
local datapoints = decode_json_payload()
if not datapoints then
return 'Invalid payload value'
end
for _, point in ipairs(datapoints) do
encode_datapoint(point.name, point.value or point.values, point.tags or {})
encode_datapoint(point.name, point.value or point.values, point.tags or {}, point.timestamp)
end
end
@ -161,19 +163,18 @@ end
-- name: the measurement's name
-- value: a scalar value or a list of key-value pairs
-- tags: a table of tags
-- timestamp: an optional timestamp in nanosecond
--
-- Timestamp is taken from the Heka message
function encode_datapoint(name, value, tags)
-- Timestamp is taken from the Heka message if not provided as parameter.
function encode_datapoint(name, value, tags, timestamp)
if type(name) ~= 'string' or value == nil or type(tags) ~= 'table' then
-- fail silently if any input parameter is invalid
return
end
local ts
if time_precision and time_precision ~= 'ns' then
ts = field_util.message_timestamp(time_precision)
else
ts = read_message('Timestamp')
local ts = timestamp or read_message('Timestamp')
if time_precision and time_precision ~= 'ns' then
ts = utils.message_timestamp(time_precision, ts)
end
-- Add the common tags
@ -211,7 +212,7 @@ end
function process_message()
local err_msg
local msg_type = read_message("Type")
if msg_type:match('bulk_metric$') then
if msg_type:match(bulk_metric_type_matcher) then
err_msg = process_bulk_metric()
else
err_msg = process_single_metric()