# Copyright 2016 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Pipeline overview: consume log events from the "transformed-log" Kafka
# topic, keep only those whose [log][level] is warning or error, replace each
# surviving event with a single counter-style metric named "log.<level>", and
# publish it to the "metrics" Kafka topic.

input {
  kafka {
    # NOTE(review): ZooKeeper address is hard-coded here while the output
    # below uses %...% placeholders -- confirm this is intentional.
    zk_connect => "127.0.0.1:2181"
    topic_id => "transformed-log"
    group_id => "log-metric"
    consumer_id => "monasca_log_metrics"
    consumer_threads => 4
  }
}

filter {
  # drop logs that have not set log level
  if "level" not in [log] {
    drop { periodic_flush => true }
  } else {
    # normalize the level to lower case so the comparison below is
    # case-insensitive with respect to the incoming value
    ruby {
      code => "
        event['log']['level'] = event['log']['level'].downcase
      "
    }
  }

  # drop logs with log level not in warning,error
  if [log][level] not in ["warning", "error"] {
    drop { periodic_flush => true }
  }

  # build the metric and attach it to the event under [metric]
  ruby {
    code => "
      # already lower-cased above, and guaranteed by the preceding
      # conditional to be exactly 'warning' or 'error'
      level = event['log']['level']

      event['metric'] = {
        'name' => 'log.%s' % level,
        # processing time (not the log's own time), in epoch milliseconds
        'timestamp' => Time.now.to_f * 1000.0,
        # every matching log contributes a value of 1
        'value' => 1,
        'dimensions' => event['log']['dimensions'],
        'value_meta' => {}
      }
    "
  }

  # strip the listed fields so they are not published with the metric
  # NOTE(review): log_level_original is not set anywhere in this file;
  # presumably it is added by an upstream pipeline -- confirm.
  mutate {
    remove_field => ["log", "@version", "@timestamp", "log_level_original", "tags"]
  }
}

output {
  kafka {
    # NOTE(review): the %KAFKA_SERVICE_HOST% / %KAFKA_SERVICE_PORT% tokens look
    # like install-time placeholders to be substituted before use -- confirm.
    bootstrap_servers => "%KAFKA_SERVICE_HOST%:%KAFKA_SERVICE_PORT%"
    topic_id => "metrics"
    client_id => "monasca_log_metrics"
    compression_type => "none"
  }
}