# This file is where you define your pipelines. You can define multiple.
# For more information on multiple pipelines, see the documentation:
#   https://www.elastic.co/guide/en/logstash/current/multiple-pipelines.html
#
# Event flow through the pipelines defined below:
#
#   syslog-intake / beats-intake
#     -> general-distributor      (routes on the "filebeat" / "journald" tags)
#       -> parse-journalbeat      (forwards to parse-filebeat when done)
#       -> parse-filebeat         (routes on a per-service tag)
#         -> parse-<service>      (auth, ceph, elasticsearch, libvirt, ...)
#           -> output pipelines   (es_local, plus kafka_remote when
#                                  logstash_kafka_options is defined)
#
# Pipelines talk to each other through the "pipeline" input/output plugins;
# the "address" of an input must match an entry in some output's "send_to".

{% set output_pipeline = ["es_local"] %}
{% if logstash_kafka_options is defined %}
{%   set _ = output_pipeline.append('kafka_remote') %}
{% endif %}
{% set output_pipeline = output_pipeline | to_json %}

{% if logstash_syslog_input_enabled | bool %}
# Receives syslog over TCP and UDP on the same port and tags every event.
- pipeline.id: "syslog-intake"
  queue.type: persisted
  config.string: |
    input {
      tcp {
        id => "inputSyslogTcp"
        port => {{ logstash_syslog_input_port }}
        type => syslog
      }
      udp {
        id => "inputSyslogUdp"
        port => {{ logstash_syslog_input_port }}
        type => syslog
      }
    }
    filter {
      mutate {
        add_tag => ["syslog"]
      }
    }
    output {
      pipeline {
        id => "sendDistributorPipeline"
        send_to => [distributor]
      }
    }
{% endif %}

# Receives events from Beats shippers and hands them to the distributor.
- pipeline.id: "beats-intake"
  queue.type: persisted
  config.string: |
    input {
      beats {
        id => "inputBeats"
        port => {{ logstash_beat_input_port }}
      }
    }
    output {
      pipeline {
        id => "sendDistributorPipeline"
        send_to => [distributor]
      }
    }

# First routing stage: picks a parser by tag, otherwise goes straight to output.
- pipeline.id: "general-distributor"
  config.string: |
    input {
      pipeline {
        id => "inputDistribute"
        address => distributor
      }
    }
    output {
      if "filebeat" in [tags] {
        pipeline {
          id => "sendFilebeatPipeline"
          send_to => [filebeat]
        }
      } else if "journald" in [tags] {
        pipeline {
          id => "sendJournalbeatPipeline"
          send_to => [journalbeat]
        }
      } else {
        pipeline {
          id => "sendOutputPipeline"
          send_to => {{ output_pipeline }}
        }
      }
    }

# Turns the systemd slice name into a tag, then forwards to parse-filebeat.
# NOTE(review): this entry sets both path.config and config.string - confirm
# which of the two Logstash is expected to load.
- pipeline.id: "parse-journalbeat"
  path.config: "/etc/logstash/conf.d/02-journald.conf"
  config.string: |
    input {
      pipeline {
        id => "inputJournalbeat"
        address => journalbeat
      }
    }
    filter {
      if [systemd_slice] {
        # Work on a copy so the original systemd_slice field is left intact.
        mutate {
          copy => { "systemd_slice" => "systemd_slice_tag" }
        }
        mutate {
          gsub => [ "systemd_slice_tag", ".slice", "" ]
        }
        if [systemd_slice_tag] != "-" {
          mutate {
            add_tag => [ "%{systemd_slice_tag}" ]
          }
        }
        # Drop the temporary working field by its own name. A sprintf
        # reference here would instead target a field named after the value.
        mutate {
          remove_field => [ "systemd_slice_tag" ]
        }
      }
    }
    output {
      pipeline {
        id => "sendFilebeat"
        send_to => [filebeat]
      }
    }

# Second routing stage: flags tracebacks, then picks a service parser by tag.
- pipeline.id: "parse-filebeat"
  config.string: |
    input {
      pipeline {
        id => "inputFilebeat"
        address => filebeat
      }
    }
    filter {
      if "Traceback" in [message] {
        mutate {
          add_tag => ["traceback"]
          remove_tag => ["_grokparsefailure"]
        }
      }
    }
    output {
      if "auth" in [tags] {
        pipeline {
          id => "sendAuthLog"
          send_to => [auth]
        }
      } else if "elasticsearch" in [tags] {
        pipeline {
          id => "sendElasticsearch"
          send_to => [elasticsearch]
        }
      } else if "ceph" in [tags] {
        pipeline {
          id => "sendCeph"
          send_to => [ceph]
        }
      } else if "libvirt" in [tags] {
        pipeline {
          id => "sendLibvirt"
          send_to => [libvirt]
        }
      } else if "logstash" in [tags] {
        pipeline {
          id => "sendLogstash"
          send_to => [logstash]
        }
      } else if "mysql" in [tags] {
        pipeline {
          id => "sendMysql"
          send_to => [mysql]
        }
      } else if "nginx" in [tags] {
        pipeline {
          id => "sendNginx"
          send_to => [nginx]
        }
      } else if "openstack" in [tags] {
        pipeline {
          id => "sendOpenstack"
          send_to => [openstack]
        }
      } else if "rabbitmq" in [tags] {
        pipeline {
          id => "sendRabbitmq"
          send_to => [rabbitmq]
        }
      } else {
        pipeline {
          id => "sendOutputPipeline"
          send_to => {{ output_pipeline }}
        }
      }
    }

# Parses syslog-style auth log lines and labels them with module "auth".
- pipeline.id: "parse-auth"
  config.string: |
    input {
      pipeline {
        id => "inputAuthLog"
        address => auth
      }
    }
    filter {
      grok {
        match => { "message" => "%{SYSLOGTIMESTAMP:timestamp} (?:%{SYSLOGFACILITY} )?%{NOTSPACE:logsource} %{SYSLOGPROG}: (?:%{SPACE})?%{GREEDYDATA:logmessage}" }
      }
      mutate {
        add_field => { "module" => "auth" }
      }
    }
    output {
      pipeline {
        id => "sendOutputPipeline"
        send_to => {{ output_pipeline }}
      }
    }

# Parses Ceph log lines; OSD lines additionally get their two endpoints captured.
- pipeline.id: "parse-ceph"
  config.string: |
    input {
      pipeline {
        id => "inputCeph"
        address => ceph
      }
    }
    filter {
      grok {
        match => { "message" => "%{TIMESTAMP_ISO8601:date} %{NOTSPACE:osd_epoch} ?%{SPACE}?%{NOTSPACE:error_bool} %{GREEDYDATA:logmessage}" }
      }
      if "ceph-osd" in [tags] {
        # Named captures for the endpoints on either side of the direction
        # marker. NOTE(review): the group names src_host / dst_host are a
        # reconstruction - confirm against whatever consumes these fields.
        grok {
          match => { "message" => "-- (?<src_host>(%{IPORHOST}\:%{POSINT}/%{POSINT})) (?:[<|>]){1,2} (?<dst_host>(%{IPORHOST}\:%{POSINT}/%{POSINT}))" }
        }
      }
    }
    output {
      pipeline {
        id => "sendOutputPipeline"
        send_to => {{ output_pipeline }}
      }
    }

# Parses Elasticsearch's own bracketed log format and namespaces the module.
- pipeline.id: "parse-elasticsearch"
  config.string: |
    input {
      pipeline {
        id => "inputElasticsearch"
        address => elasticsearch
      }
    }
    filter {
      grok {
        match => { "message" => "\[%{TIMESTAMP_ISO8601:timestamp}\]\[%{LOGLEVEL:loglevel}\s*\]\[%{NOTSPACE:module}\s*\] %{GREEDYDATA:logmessage}" }
      }
      mutate {
        replace => { "module" => "elasticsearch.%{module}" }
      }
    }
    output {
      pipeline {
        id => "sendOutputPipeline"
        send_to => {{ output_pipeline }}
      }
    }

# Parses libvirt log lines and normalises the log level to upper case.
- pipeline.id: "parse-libvirt"
  config.string: |
    input {
      pipeline {
        id => "inputLibvirt"
        address => libvirt
      }
    }
    filter {
      grok {
        match => { "message" => "(?m)^%{TIMESTAMP_ISO8601:logdate}:%{SPACE}%{NUMBER:code}:?%{SPACE}\[?\b%{NOTSPACE:loglevel}\b\]?%{SPACE}?:?%{SPACE}\[?\b%{NOTSPACE:module}\b\]?%{SPACE}?%{GREEDYDATA:logmessage}?" }
        add_field => { "received_at" => "%{@timestamp}"}
      }
      mutate {
        uppercase => [ "loglevel" ]
      }
    }
    output {
      pipeline {
        id => "sendOutputPipeline"
        send_to => {{ output_pipeline }}
      }
    }

# Parses Logstash's own log lines; WARN becomes WARNING and a missing level
# is filled in as ERROR.
- pipeline.id: "parse-logstash"
  config.string: |
    input {
      pipeline {
        id => "inputLogstash"
        address => logstash
      }
    }
    filter {
      grok {
        match => {
          "message" => "\{\:timestamp=>\"%{TIMESTAMP_ISO8601:timestamp}\", \:message=>\"%{DATA:logmessage}\"(;|)(, \:address=>\"%{URIHOST:address}\", \:exception=>#<\"%{DATA:exception}\">, \:backtrace=>\[%{DATA:backtrace}\]|)(, \:level=>:\"%{LOGLEVEL:loglevel}\"|)\}"
        }
      }
      mutate {
        add_field => { "module" => "logstash" }
        uppercase => [ "loglevel" ]
      }
      if [loglevel] == "WARN" {
        mutate {
          replace => { "loglevel" => "WARNING" }
        }
      } else if ![loglevel] {
        mutate {
          add_field => { "loglevel" => "ERROR" }
        }
      }
    }
    output {
      pipeline {
        id => "sendOutputPipeline"
        send_to => {{ output_pipeline }}
      }
    }

# Parses MySQL slow-query log entries (user/host, thread, timings, statement).
# NOTE(review): this entry sets both path.config and config.string - confirm
# which of the two Logstash is expected to load.
- pipeline.id: "parse-mysql"
  path.config: "/etc/logstash/conf.d/10-mysql.conf"
  config.string: |
    input {
      pipeline {
        id => "inputMysql"
        address => mysql
      }
    }
    filter {
      grok {
        match => { "message" => "# User@Host: %{WORD:user}\[%{WORD}\] @ (%{HOSTNAME:client_hostname}|) \[(%{IP:client_ip}|)\]" }
      }
      grok {
        match => { "message" => "# Thread_id: %{NUMBER:thread_id:int} \s*Schema: (%{WORD:schema}| ) \s*QC_hit: %{WORD:qc_hit}" }
      }
      grok {
        match => { "message" => "# Query_time: %{NUMBER:query_time:float} \s*Lock_time: %{NUMBER:lock_time:float} \s*Rows_sent: %{NUMBER:rows_sent:int} \s*Rows_examined: %{NUMBER:rows_examined:int}" }
      }
      grok {
        match => { "message" => "(?m)SET timestamp=%{NUMBER:timestamp};%{GREEDYDATA:logmessage}" }
      }
      # The User@Host grok above stores the address in client_ip, so that is
      # the field the lookup has to read.
      geoip {
        source => "client_ip"
      }
      # Use the statement's own epoch timestamp as the event time, then drop it.
      date {
        match => [ "timestamp", "UNIX" ]
      }
      mutate {
        remove_field => "timestamp"
      }
      mutate {
        gsub => [ "logmessage", "^\n", "" ]
        add_field => { "module" => "mysql" }
        add_field => { "loglevel" => "WARNING" }
      }
    }
    output {
      pipeline {
        id => "sendOutputPipeline"
        send_to => {{ output_pipeline }}
      }
    }

# Parses nginx access and error logs, selected by tag.
- pipeline.id: "parse-nginx"
  config.string: |
    input {
      pipeline {
        id => "inputNginx"
        address => nginx
      }
    }
    filter {
      if "nginx-access" in [tags] {
        grok {
          patterns_dir => ["/opt/logstash/patterns"]
          match => { "message" => "%{IP:client_ip} - %{USER:client_user} \[%{NGINX_TIMESTAMP:timestamp}\] \"%{WORD:verb} %{NOTSPACE:request} HTTP/%{NUMBER:http_version}\" %{INT:response_code} %{INT:bytes} %{QUOTEDSTRING:referer} %{QUOTEDSTRING:user_agent} %{QUOTEDSTRING:gzip_ratio}" }
        }
        # The access-log grok above stores the address in client_ip.
        geoip {
          source => "client_ip"
        }
      }
      if "nginx-error" in [tags] {
        grok {
          patterns_dir => ["/opt/logstash/patterns"]
          match => { "message" => "%{NGINX_ERROR_TIMESTAMP:timestamp} \[%{LOGLEVEL:loglevel}\] %{GREEDYDATA:error_msg}" }
        }
      }
    }
    output {
      pipeline {
        id => "sendOutputPipeline"
        send_to => {{ output_pipeline }}
      }
    }

# Parses OpenStack service logs. Events tagged "oslofmt" get the common
# header parsed first; a per-service branch (chosen by tag) then extracts
# request details and tags API request lines with "apimetrics".
- pipeline.id: "parse-openstack"
  config.string: |
    input {
      pipeline {
        id => "inputOpenstack"
        address => openstack
      }
    }
    filter {
      if "oslofmt" in [tags] {
        if "Can not find policy directory: policy.d" in [message] {
          drop { }
        }
        # Two accepted layouts: a line starting with the ISO8601 timestamp,
        # or the same content preceded by a timestamp/host/program prefix.
        # Both populate loglevel, module and logmessage.
        grok {
          match => {
            "message" => [
              "^%{TIMESTAMP_ISO8601:logdate}%{SPACE}%{NUMBER:pid}?%{SPACE}?(?<loglevel>AUDIT|CRITICAL|DEBUG|INFO|TRACE|WARNING|ERROR) \[?\b%{NOTSPACE:module}\b\]?%{SPACE}?%{GREEDYDATA:logmessage}?",
              "^%{CISCOTIMESTAMP:journalddate}%{SPACE}%{SYSLOGHOST:host}%{SPACE}%{SYSLOGPROG:prog}%{SPACE}%{TIMESTAMP_ISO8601:logdate}%{SPACE}%{NUMBER:pid}%{SPACE}%{NOTSPACE:loglevel}%{SPACE}%{NOTSPACE:module}%{SPACE}%{GREEDYDATA:logmessage}"
            ]
          }
          add_field => { "received_at" => "%{@timestamp}" }
        }
      }
      if "nova" in [tags] {
        # Strip double quotes so the request line splits cleanly on spaces.
        mutate {
          gsub => ["logmessage","\"",""]
        }
        if [module] == "nova.osapi_compute.wsgi.server" {
          grok {
            match => { "logmessage" => "\[(%{NOTSPACE:requestid} %{NOTSPACE:user_id} %{NOTSPACE:tenant} \- \- \-|\-)\] %{NOTSPACE:requesterip} %{NOTSPACE:verb} %{NOTSPACE:url_path} %{NOTSPACE:http_ver} status\: %{NUMBER:response} len\: %{NUMBER:bytes:int} time\: %{BASE10NUM:httptime:float}" }
            add_tag => ["apimetrics"]
          }
        } else if [module] == "nova.api.ec2" {
          grok {
            match => { "logmessage" => "\[%{GREEDYDATA:requestid}\] %{NUMBER:seconds}s %{NOTSPACE:requesterip} %{NOTSPACE:verb} %{NOTSPACE:url_path} None\:None %{NUMBER:response} %{GREEDYDATA:user_agent}" }
            add_tag => ["apimetrics"]
          }
        } else if [module] == "nova.metadata.wsgi.server" {
          grok {
            match => { "logmessage" => "\[%{GREEDYDATA:requestid}\] %{NOTSPACE:requesterip} %{NOTSPACE:verb} %{NOTSPACE:url_path} %{NOTSPACE:http_ver} status\: %{NUMBER:response} len\: %{NUMBER:bytes} time\: %{NUMBER:seconds}" }
            add_tag => ["apimetrics"]
          }
        }
      } else if "neutron" in [tags] {
        if [module] == "neutron.wsgi" {
          if "accepted" not in [logmessage] {
            mutate {
              gsub => ["logmessage","\"",""]
            }
            grok {
              match => { "logmessage" => "\[(%{NOTSPACE:requestid} %{NOTSPACE:user_id} %{NOTSPACE:tenant} \- \- \-|\-)\] %{NOTSPACE:requesterip} \- \- \[%{NOTSPACE:req_date} %{NOTSPACE:req_time}\] %{NOTSPACE:verb} %{NOTSPACE:url_path} %{NOTSPACE:http_ver} %{NUMBER:response} %{NUMBER:bytes:int} %{BASE10NUM:httptime:float}" }
              add_tag => ["apimetrics"]
            }
          }
        } else if "neutron-ha-tool" in [source] {
          mutate {
            add_tag => ["neutron-ha-tool"]
            remove_tag => ["_grokparsefailure"]
          }
        }
        # Service start-up lines fail the header grok; parse them separately
        # and clear the failure tag.
        if "starting" in [message] and "_grokparsefailure" in [tags] {
          grok {
            match => { "logmessage" => "\[(%{NOTSPACE:requestid}|\-)\](%{SPACE}\(%{NUMBER:pid}\)) %{GREEDYDATA:servicemessage}" }
          }
          mutate {
            remove_tag => ["_grokparsefailure"]
          }
        }
      } else if "glance" in [tags] {
        if [module] == "eventlet.wsgi.server" {
          mutate {
            gsub => ["logmessage","\"",""]
          }
          grok {
            match => { "logmessage" => "\[(%{NOTSPACE:requestid} %{NOTSPACE:user_id} %{NOTSPACE:tenant} \- \- \-|\-)\] %{NOTSPACE:requesterip} \- \- \[%{NOTSPACE:req_date} %{NOTSPACE:req_time}\] %{NOTSPACE:verb} %{NOTSPACE:url_path} %{NOTSPACE:http_ver} %{NUMBER:response} %{NUMBER:bytes:int} %{BASE10NUM:httptime:float}" }
            add_tag => ["apimetrics"]
          }
          mutate {
            replace => { "module" => "glance.%{module}" }
          }
        }
      } else if "cinder" in [tags] {
        if [module] == "cinder.eventlet.wsgi.server" {
          if "accepted" not in [logmessage] {
            mutate {
              gsub => ["logmessage","\"",""]
            }
            grok {
              match => { "logmessage" => "\[(%{NOTSPACE:requestid} %{NOTSPACE:user_id} %{NOTSPACE:tenant} \- \- \-|\-)\] %{NOTSPACE:requesterip} \- \- \[%{NOTSPACE:req_date} %{NOTSPACE:req_time}\] %{NOTSPACE:verb} %{NOTSPACE:url_path} %{NOTSPACE:http_ver} %{NUMBER:response} %{NUMBER:bytes:int} %{BASE10NUM:httptime:float}" }
              add_tag => ["apimetrics"]
            }
          }
          mutate {
            replace => { "module" => "cinder.%{module}" }
          }
        }
      } else if "horizon" in [tags] {
        grok {
          patterns_dir => ["/opt/logstash/patterns"]
          match => {
            "message" => [
              "%{COMMONAPACHELOG}",
              "\[%{APACHE_ERROR_TIMESTAMP:timestamp}\] \[%{DATA:module}:%{DATA:loglevel}\] \[pid %{POSINT:apache_pid}\:tid %{POSINT:apache_tid}\] ?(?:\[client %{IP:clientip}:%{POSINT:clientport}\] )?%{GREEDYDATA:logmessage}",
              "%{SYSLOGTIMESTAMP:timestamp}%{SPACE}%{SYSLOGHOST:host}%{SPACE}%{PROG:prog}%{SPACE}%{IP:clientip}%{SPACE}%{NOTSPACE}%{SPACE}%{NOTSPACE}%{SPACE}%{SYSLOG5424SD}%{SPACE}%{QS}%{SPACE}%{NUMBER}%{SPACE}%{NUMBER}%{SPACE}%{QS}%{SPACE}%{QS}"
            ]
          }
        }
        geoip {
          source => "clientip"
        }
        # No loglevel captured means the line was not matched by the
        # error-log pattern, so treat it as an access-log entry.
        if ![loglevel] {
          mutate {
            add_field => { "logmessage" => "%{request}" }
            add_field => { "module" => "horizon.access" }
            add_field => { "loglevel" => "INFO" }
            add_tag => [ "apache-access" ]
          }
        } else {
          mutate {
            replace => { "module" => "horizon.error.%{module}" }
            add_tag => [ "apache-error" ]
            uppercase => [ "loglevel" ]
          }
        }
      } else if "heat" in [tags] {
        if [module] == "eventlet.wsgi.server" {
          if "accepted" not in [logmessage] {
            mutate {
              gsub => ["logmessage","\"",""]
            }
            grok {
              match => { "logmessage" => "\[%{NOTSPACE:requestid} %{NOTSPACE:user_id} %{NOTSPACE:tenant} %{NOTSPACE} %{NOTSPACE} %{NOTSPACE}\] %{NOTSPACE:requesterip} %{NOTSPACE} %{NOTSPACE} \[%{NOTSPACE:req_date} %{NOTSPACE:req_time}\] %{NOTSPACE:verb} %{NOTSPACE:url_path} %{NOTSPACE:http_ver} %{NUMBER:response} %{NUMBER:bytes} %{BASE10NUM:httptime}" }
              add_tag => ["apimetrics"]
            }
          }
          mutate {
            replace => { "module" => "heat.%{module}" }
          }
        } else if [module] == "heat.engine.service" {
          grok {
            match => { "logmessage" => "\[%{NOTSPACE:requestid} %{NOTSPACE:user_id} %{NOTSPACE:tenant} %{NOTSPACE} %{NOTSPACE} %{NOTSPACE} %{GREEDYDATA:servicemessage}" }
            add_tag => ["apimetrics"]
          }
        }
      } else if "swift-container" in [tags] {
        # Shape check only: the pattern has no named captures, so a match
        # adds no fields and a miss adds the grok failure tag.
        grok {
          match => {
            "message" => "%{CISCOTIMESTAMP}%{SPACE}%{S3_REQUEST_LINE}%{SPACE}%{CISCOTIMESTAMP}%{SPACE}%{HOSTNAME}%{SPACE}%{PROG}%{SPACE}%{USER}%{SPACE}%{USERNAME}%{SPACE}%{NOTSPACE}%{SPACE}%{S3_REQUEST_LINE}%{SPACE}%{HTTPDUSER}%{SPACE}%{NOTSPACE}%{SPACE}%{NOTSPACE}%{SPACE}%{NOTSPACE}%{SPACE}%{NOTSPACE}%{SPACE}%{INT}%{SPACE}%{NOTSPACE}%{SPACE}%{NOTSPACE}%{SPACE}%{NOTSPACE}%{SPACE}%{SECOND}%{SPACE}%{NOTSPACE}%{SPACE}%{NOTSPACE}%{SPACE}%{NOTSPACE}%{SPACE}%{NOTSPACE}%{SPACE}%{NOTSPACE}"
          }
        }
      } else if "swift-account" in [tags] {
        # Shape check only, as for swift-container.
        grok {
          match => {
            "message" => "%{SYSLOGTIMESTAMP}%{SPACE}%{HOSTNAME}%{SPACE}%{PROG}%{SPACE}%{SYSLOGTIMESTAMP}%{SPACE}%{S3_REQUEST_LINE}%{SPACE}%{IP}%{SPACE}%{NOTSPACE}%{SPACE}%{NOTSPACE}%{SPACE}%{SYSLOG5424SD}%{SPACE}%{QS}%{SPACE}%{POSINT}%{SPACE}%{NOTSPACE}%{SPACE}%{QS}%{SPACE}%{QS}%{SPACE}%{QS}%{SPACE}%{SECOND}%{SPACE}%{QS}%{SPACE}%{NUMBER}%{SPACE}%{NOTSPACE}"
          }
        }
      } else if "swift" in [tags] {
        grok {
          match => {
            "message" => "%{SYSLOGTIMESTAMP:timestamp} (?:%{SYSLOGFACILITY} )?%{NOTSPACE:logsource} %{SYSLOGPROG:module}: (?:%{SPACE})?%{GREEDYDATA:logmessage}"
          }
        }
        # Second pass over the extracted body; a miss here is not an error,
        # hence the empty tag_on_failure.
        grok {
          patterns_dir => ["/opt/logstash/patterns"]
          match => {
            "logmessage" => [
              "%{COMBINEDAPACHELOG}",
              "%{SWIFTPROXY_ACCESS}",
              "%{GREEDYDATA:logmessage} \(txn\: %{DATA:swift_txn}\)"
            ]
          }
          tag_on_failure => []
          overwrite => [ "logmessage" ]
        }
        if [request] {
          mutate {
            replace => { "logmessage" => "%{request}" }
          }
        }
        mutate {
          replace => { "module" => "swift.%{module}" }
        }
        if [file] =~ "error.log$" {
          mutate {
            add_field => { "loglevel" => "NOTICE" }
          }
        } else {
          mutate {
            add_field => { "loglevel" => "INFO" }
          }
        }
      } else if "keystone-access" in [tags] {
        grok {
          match => {
            "message" => "%{CISCOTIMESTAMP:keystone_access_timestamp}%{SPACE}%{SYSLOGHOST:log_host}%{SPACE}%{SYSLOGPROG:prog}%{SPACE}%{TIMESTAMP_ISO8601:keystone_timestmp}%{SPACE}%{NUMBER:pid}%{SPACE}%{NOTSPACE:loglevel}%{SPACE}%{NOTSPACE:module}%{SPACE}%{SYSLOG5424SD:requestid}%{SPACE}%{WORD:verb}%{SPACE}%{NOTSPACE:request}"
          }
        }
      } else if "keystone" in [tags] {
        if "apache-access" in [tags] {
          grok {
            match => { "message" => "%{COMMONAPACHELOG}" }
          }
          mutate {
            add_field => { "logmessage" => "%{request}" }
            add_field => { "module" => "keystone.access" }
            add_field => { "loglevel" => "INFO" }
          }
        } else if "apache-error" in [tags] {
          grok {
            patterns_dir => ["/opt/logstash/patterns"]
            match => {
              "message" => "%{KEYSTONE_SUBSECOND_TIMESTAMP:keystone_subsecond_timestamp} %{STANDARD_TIMESTAMP:standard_timestamp} %{NUMBER:pid} %{DATA:loglevel} %{DATA:module} \[%{DATA:requestid}\] %{WORD:verb} %{NOTSPACE:request}"
            }
          }
          mutate {
            replace => { "module" => "keystone.error.%{module}" }
            uppercase => [ "loglevel" ]
          }
        }
      } else if "magnum" in [tags] {
        if [module] == "eventlet.wsgi.server" {
          mutate {
            gsub => ["logmessage","\"",""]
          }
          grok {
            match => { "logmessage" => "\[(%{NOTSPACE:requestid} %{NOTSPACE:user_id} %{NOTSPACE:tenant} \- \- \-|\-)\] %{NOTSPACE:requesterip} \- \- \[%{NOTSPACE:req_date} %{NOTSPACE:req_time}\] %{NOTSPACE:verb} %{NOTSPACE:url_path} %{NOTSPACE:http_ver} %{NUMBER:response} %{NUMBER:bytes:int} %{BASE10NUM:httptime:float}" }
            add_tag => ["apimetrics"]
          }
          mutate {
            replace => { "module" => "magnum.%{module}" }
          }
        }
      } else if "octavia" in [tags] {
        if [module] == "eventlet.wsgi.server" {
          mutate {
            gsub => ["logmessage","\"",""]
          }
          grok {
            match => { "logmessage" => "\[(%{NOTSPACE:requestid} %{NOTSPACE:user_id} %{NOTSPACE:tenant} \- \- \-|\-)\] %{NOTSPACE:requesterip} \- \- \[%{NOTSPACE:req_date} %{NOTSPACE:req_time}\] %{NOTSPACE:verb} %{NOTSPACE:url_path} %{NOTSPACE:http_ver} %{NUMBER:response} %{NUMBER:bytes:int} %{BASE10NUM:httptime:float}" }
            add_tag => ["apimetrics"]
          }
          mutate {
            replace => { "module" => "octavia.%{module}" }
          }
        }
      }
    }
    output {
      pipeline {
        id => "sendOutputPipeline"
        send_to => {{ output_pipeline }}
      }
    }

# Parses RabbitMQ report entries and rebuilds the event time from the
# day/month/year/time parts in the report header.
- pipeline.id: "parse-rabbitmq"
  config.string: |
    input {
      pipeline {
        id => "inputRabbitmq"
        address => rabbitmq
      }
    }
    filter {
      if [message] == "" {
        drop { }
      }
      grok {
        match => { "message" => "^\=%{LOGLEVEL:loglevel} REPORT\=\=\=\= %{MONTHDAY:event_day}\-%{MONTH:event_month}\-%{YEAR:event_year}\:\:%{TIME:event_time} \=\=\=\n%{GREEDYDATA:logmessage}" }
      }
      mutate {
        replace => { "module" => "rabbitmq" }
        add_field => { "timestamp" => "%{event_day} %{event_month} %{event_year} %{event_time}" }
      }
      # remove_field only runs when the date parse succeeds, so the helper
      # fields stay on the event if the timestamp could not be parsed.
      date {
        match => [ "timestamp", "dd MMM YYYY HH:mm:ss" ]
        remove_field => [ "event_day", "event_month", "event_year", "event_time", "timestamp" ]
      }
    }
    output {
      pipeline {
        id => "sendOutputPipeline"
        send_to => {{ output_pipeline }}
      }
    }

# Final stage: enriches events with GeoIP data, computes a document id and
# indexes into Elasticsearch. The index name is chosen from the beat
# metadata when present, then the "syslog" tag, else "undefined".
- pipeline.id: "local-elasticsearch"
  config.string: |
    input {
      pipeline {
        id => "inputElasticsearchPipeline"
        address => es_local
      }
    }
    filter {
      # NOTE(review): [source.ip] addresses a top-level field literally named
      # "source.ip" - confirm that is the intended field.
      if [source.ip] {
        geoip {
          id => "setGeoIpSource"
          source => "source.ip"
        }
      } else if [ip] {
        geoip {
          id => "setGeoIp"
          source => "ip"
        }
      }
      # Events with a message get a keyed SHA1 fingerprint as document id;
      # anything else gets a generated UUID.
      if [message] {
        fingerprint {
          id => "setSHA1"
          target => "[@metadata][fingerprint]"
          method => "SHA1"
          key => "{{ inventory_hostname | to_uuid }}"
        }
      } else {
        fingerprint {
          target => "[@metadata][fingerprint]"
          method => "UUID"
        }
      }
    }
    output {
      if [@metadata][version] {
        elasticsearch {
          id => "elasticsearchOutputPipeline"
          document_id => "%{[@metadata][fingerprint]}"
          hosts => {{ elasticsearch_data_hosts | shuffle(seed=inventory_hostname) | to_json }}
          sniffing => {{ (not data_node | bool) | lower }}
          manage_template => {{ (data_node | bool) | lower }}
          index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
        }
      } else if [@metadata][beat] {
        elasticsearch {
          id => "elasticsearchLegacyOutputPipeline"
          document_id => "%{[@metadata][fingerprint]}"
          hosts => {{ elasticsearch_data_hosts | shuffle(seed=inventory_hostname) | to_json }}
          sniffing => {{ (not data_node | bool) | lower }}
          manage_template => {{ (data_node | bool) | lower }}
          index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
        }
      } else if "syslog" in [tags] {
        elasticsearch {
          id => "elasticsearchSyslogOutputPipeline"
          document_id => "%{[@metadata][fingerprint]}"
          hosts => {{ elasticsearch_data_hosts | shuffle(seed=inventory_hostname) | to_json }}
          sniffing => {{ (not data_node | bool) | lower }}
          manage_template => {{ (data_node | bool) | lower }}
          index => "syslog-%{+YYYY.MM.dd}"
        }
      } else {
        elasticsearch {
          id => "elasticsearchUndefinedOutputPipeline"
          document_id => "%{[@metadata][fingerprint]}"
          hosts => {{ elasticsearch_data_hosts | shuffle(seed=inventory_hostname) | to_json }}
          sniffing => {{ (not data_node | bool) | lower }}
          manage_template => {{ (data_node | bool) | lower }}
          index => "undefined-%{+YYYY.MM.dd}"
        }
      }
    }

{% if logstash_kafka_options is defined %}
# Optional second output: forwards every event to Kafka. Each key/value in
# logstash_kafka_options becomes one plugin setting; booleans and numbers are
# written bare, lists are comma-joined into one string, the rest is quoted.
- pipeline.id: "remote-kafka"
  config.string: |
    input {
      pipeline {
        id => "inputKafkaPipeline"
        address => kafka_remote
      }
    }
    output {
      kafka {
{% for key, value in logstash_kafka_options.items() %}
{%   if value is sameas(true) or value is sameas(false) %}
        {{ key }} => {{ value | lower }}
{%   elif value is number %}
        {{ key }} => {{ value }}
{%   elif value is iterable and value is not string %}
        {{ key }} => "{{ value | join(',') }}"
{%   else %}
        {{ key }} => "{{ value }}"
{%   endif %}
{% endfor %}
      }
    }
{% endif %}