diff --git a/docker/heka/plugins/decoders/os_openstack_log.lua b/docker/heka/plugins/decoders/os_openstack_log.lua new file mode 100644 index 0000000000..d8f4446ee4 --- /dev/null +++ b/docker/heka/plugins/decoders/os_openstack_log.lua @@ -0,0 +1,140 @@ +-- Copyright 2015-2016 Mirantis, Inc. +-- +-- Licensed under the Apache License, Version 2.0 (the "License"); +-- you may not use this file except in compliance with the License. +-- You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +require "string" +require "table" +local l = require 'lpeg' +l.locale(l) + +local patt = require 'os_patterns' +local utils = require 'os_utils' + +local msg = { + Timestamp = nil, + Type = 'log', + Hostname = nil, + Payload = nil, + Pid = nil, + Fields = nil, + Severity = nil, +} + +-- traceback_lines is a reference to a table used to accumulate lines of +-- a Traceback. traceback_key represent the key of the Traceback lines +-- being accumulated in traceback_lines. This is used to know when to +-- stop accumulating and inject the Heka message. +local traceback_key = nil +local traceback_lines = nil + +function prepare_message (service, timestamp, pid, severity_label, + python_module, programname, payload) + msg.Logger = 'openstack.' .. service + msg.Timestamp = timestamp + msg.Payload = payload + msg.Pid = pid + msg.Severity = utils.label_to_severity_map[severity_label] or 7 + msg.Fields = {} + msg.Fields.severity_label = severity_label + msg.Fields.python_module = python_module + msg.Fields.programname = programname + msg.Payload = payload +end + +-- OpenStack log messages are of this form: +-- 2015-11-30 08:38:59.306 3434 INFO oslo_service.periodic_task [-] Blabla... +-- +-- [-] is the "request" part, it can take multiple forms. + +function process_message () + + -- Logger is of form "_" (e.g. "nova_nova-api", + -- "neutron_l3-agent"). + local logger = read_message("Logger") + local service, program = string.match(logger, '([^_]+)_(.+)') + + local log = read_message("Payload") + local m + + m = patt.openstack:match(log) + if not m then + return -1, string.format("Failed to parse %s log: %s", logger, string.sub(log, 1, 64)) + end + + local key = { + Timestamp = m.Timestamp, + Pid = m.Pid, + SeverityLabel = m.SeverityLabel, + PythonModule = m.PythonModule, + service = service, + program = program, + } + + if traceback_key ~= nil then + -- If traceback_key is not nil then it means we've started accumulated + -- lines of a Python traceback. We keep accumulating the traceback + -- lines util we get a different log key. + if utils.table_equal(traceback_key, key) then + table.insert(traceback_lines, m.Message) + return 0 + else + prepare_message(traceback_key.service, traceback_key.Timestamp, + traceback_key.Pid, traceback_key.SeverityLabel, + traceback_key.PythonModule, traceback_key.program, + table.concat(traceback_lines, '')) + traceback_key = nil + traceback_lines = nil + -- Ignore safe_inject_message status code here to still get a + -- chance to inject the current log message. + utils.safe_inject_message(msg) + end + end + + if patt.traceback:match(m.Message) then + -- Python traceback detected, begin accumulating the lines making + -- up the traceback. + traceback_key = key + traceback_lines = {} + table.insert(traceback_lines, m.Message) + return 0 + end + + prepare_message(service, m.Timestamp, m.Pid, m.SeverityLabel, m.PythonModule, + program, m.Message) + + m = patt.openstack_request_context:match(msg.Payload) + if m then + msg.Fields.request_id = m.RequestId + if m.UserId then + msg.Fields.user_id = m.UserId + end + if m.TenantId then + msg.Fields.tenant_id = m.TenantId + end + end + + m = patt.openstack_http:match(msg.Payload) + if m then + msg.Fields.http_method = m.http_method + msg.Fields.http_status = m.http_status + msg.Fields.http_url = m.http_url + msg.Fields.http_version = m.http_version + msg.Fields.http_response_size = m.http_response_size + msg.Fields.http_response_time = m.http_response_time + m = patt.ip_address:match(msg.Payload) + if m then + msg.Fields.http_client_ip_address = m.ip_address + end + end + + return utils.safe_inject_message(msg) +end