# # Copyright 2016 FUJITSU LIMITED # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or # implied. # See the License for the specific language governing permissions and # limitations under the License. # input { kafka { zk_connect => "127.0.0.1:2181" topic_id => "log" group_id => "transformer-logstash-consumer" } } filter { ruby { code => "event['message_tmp'] = event['log']['message'][0..49]" } grok { match => { "[message_tmp]" => "(?i)(?AUDIT|CRITICAL|DEBUG|INFO|TRACE|ERR(OR)?|WARN(ING)?)|\"level\":\s?(?\d{2})" } } if ! [log_level] { grok { match => { "[log][message]" => "(?i)(?AUDIT|CRITICAL|DEBUG|INFO|TRACE|ERR(OR)?|WARN(ING)?)|\"level\":\s?(?\d{2})" } } } ruby { init => " LOG_LEVELS_MAP = { # SYSLOG 'warn' => :Warning, 'err' => :Error, # Bunyan errcodes '10' => :Trace, '20' => :Debug, '30' => :Info, '40' => :Warning, '50' => :Error, '60' => :Fatal } " code => " if event['log_level'] # keep original value log_level = event['log_level'].downcase if LOG_LEVELS_MAP.has_key?(log_level) event['log_level_original'] = event['log_level'] event['log_level'] = LOG_LEVELS_MAP[log_level] else event['log_level'] = log_level.capitalize end else event['log_level'] = 'Unknown' end " } mutate { add_field => { "[log][level]" => "%{log_level}" } # remove temporary fields remove_field => ["log_level", "message_tmp"] } } output { kafka { bootstrap_servers => "127.0.0.1:9092" topic_id => "transformed-log" } }