Convert logstash groks to a multi-pipeline setup

The logstash groks were running in line using the legacy method which uses
lexical sorting of all logstash filter files and loads them in order. While
this works it makes it so all data has to travel through all filters.
This change makes use of the logstash multi-pipeline capabilities
using a distributor and fork pattern. This allows data to flow through
logstash more quickly and not block whenever there's an issue with an
output plugin.

Finger-prints using SHA1 when there's a message and UUID when not. This
will ensure we're duplicating log entries which will help speed up
transations and further reduce the storage required.

Change-Id: I38268e33b370da0f1e186ecf65911d4a312c3e6a
Signed-off-by: Kevin Carter <>
This commit is contained in:
Kevin Carter 2018-07-26 19:20:57 -05:00
parent a0780fb582
commit b6343c57a4
No known key found for this signature in database
GPG Key ID: 9443251A787B9FB3
33 changed files with 777 additions and 591 deletions

View File

@ -112,6 +112,7 @@
# ingest nodes.
- name: Set data nodes
elasticsearch_number_of_replicas: "{{ ((data_nodes | length) > 1) | ternary(((data_nodes | length) > 2 | ternary(2, 1)), 0) }}"
elasticsearch_data_hosts: |-
{% set nodes = elasticsearch_data_node_details %}
{% if inventory_hostname in data_nodes %}

View File

@ -8,6 +8,11 @@
environment: "{{ deployment_environment_variables | default({}) }}"
- include_tasks: common_task_data_node_hosts.yml
- always
- name: Create basic indexes
@ -30,7 +35,6 @@
limit: "10000"
refresh_interval: "5s"
number_of_replicas: "1"
- name: "_all/_settings?preserve_existing=true"
index.refresh_interval: "10s"
@ -38,3 +42,6 @@
index.queries.cache.enabled: "true"
indices.queries.cache.size: "5%"
- name: "_all/_settings"
index.number_of_replicas: "{{ elasticsearch_number_of_replicas }}"

View File

@ -1,16 +0,0 @@
filter {
if "oslofmt" in [tags] or "openstack" in [tags] {
if "Can not find policy directory: policy.d" in [message] {
grok {
match => {
"message" => [
"^%{TIMESTAMP_ISO8601:logdate}%{SPACE}%{NUMBER:pid}?%{SPACE}?(?<loglevel>AUDIT|CRITICAL|DEBUG|INFO|TRACE|WARNING|ERROR) \[?\b%{NOTSPACE:module}\b\]?%{SPACE}?%{GREEDYDATA:logmessage}?",
add_field => { "received_at" => "%{@timestamp}" }

View File

@ -1,22 +0,0 @@
filter {
if "journald" in [tags] {
if [systemd_slice] {
mutate {
copy => { "systemd_slice" => "systemd_slice_tag" }
mutate {
gsub => [ "systemd_slice_tag", ".slice", "" ]
if [systemd_slice_tag] != "-" {
mutate {
add_tag => [
mutate {
remove_field => [ "%{systemd_slice_tag}" ]

View File

@ -1,23 +0,0 @@
filter {
if "nova" in [tags] {
mutate {
gsub => ['logmessage',"\"",""]
if [module] == "nova.osapi_compute.wsgi.server" {
grok {
match => { "logmessage" => "\[(%{NOTSPACE:requestid} %{NOTSPACE:user_id} %{NOTSPACE:tenant} \- \- \-|\-)\] %{NOTSPACE:requesterip} %{NOTSPACE:verb} %{NOTSPACE:url_path} %{NOTSPACE:http_ver} status\: %{NUMBER:response} len\: %{NUMBER:bytes:int} time\: %{BASE10NUM:httptime:float}" }
add_tag => ["apimetrics"]
} else if [module] == "nova.api.ec2" {
grok {
match => { "logmessage" => "\[%{GREEDYDATA:requestid}\] %{NUMBER:seconds}s %{NOTSPACE:requesterip} %{NOTSPACE:verb} %{NOTSPACE:url_path} None\:None %{NUMBER:response} %{GREEDYDATA:user_agent}" }
add_tag => ["apimetrics"]
} else if [module] == "nova.metadata.wsgi.server" {
grok {
match => { "logmessage" => "\[%{GREEDYDATA:requestid}\] %{NOTSPACE:requesterip} %{NOTSPACE:verb} %{NOTSPACE:url_path} %{NOTSPACE:http_ver} status\: %{NUMBER:response} len\: %{NUMBER:bytes} time\: %{NUMBER:seconds}" }
add_tag => ["apimetrics"]

View File

@ -1,28 +0,0 @@
filter {
if "neutron" in [tags] {
if [module] == "neutron.wsgi" {
if "accepted" not in [logmessage] {
mutate {
gsub => ['logmessage',"\"",""]
grok {
match => { "logmessage" => "\[(%{NOTSPACE:requestid} %{NOTSPACE:user_id} %{NOTSPACE:tenant} \- \- \-|\-)\] %{NOTSPACE:requesterip} \- \- \[%{NOTSPACE:req_date} %{NOTSPACE:req_time}\] %{NOTSPACE:verb} %{NOTSPACE:url_path} %{NOTSPACE:http_ver} %{NUMBER:response} %{NUMBER:bytes:int} %{BASE10NUM:httptime:float}" }
add_tag => ["apimetrics"]
} else if "neutron-ha-tool" in [source] {
mutate {
add_tag => ["neutron-ha-tool"]
remove_tag => ["_grokparsefailure"]
if "starting" in [message] and "_grokparsefailure" in [tags] {
grok {
match => { "logmessage" => "\[(%{NOTSPACE:requestid}|\-)\](%{SPACE}\(%{NUMBER:pid}\)) %{GREEDYDATA:servicemessage}" }
mutate {
remove_tag => ["_grokparsefailure"]

View File

@ -1,16 +0,0 @@
filter {
if "glance" in [tags] {
if [module] == "eventlet.wsgi.server" {
mutate {
gsub => ['logmessage',"\"",""]
grok {
match => { "logmessage" => "\[(%{NOTSPACE:requestid} %{NOTSPACE:user_id} %{NOTSPACE:tenant} \- \- \-|\-)\] %{NOTSPACE:requesterip} \- \- \[%{NOTSPACE:req_date} %{NOTSPACE:req_time}\] %{NOTSPACE:verb} %{NOTSPACE:url_path} %{NOTSPACE:http_ver} %{NUMBER:response} %{NUMBER:bytes:int} %{BASE10NUM:httptime:float}" }
add_tag => ["apimetrics"]
mutate {
replace => { "module" => "glance.%{module}" }

View File

@ -1,18 +0,0 @@
filter {
if "cinder" in [tags] {
if [module] == "cinder.eventlet.wsgi.server" {
if "accepted" not in [logmessage] {
mutate {
gsub => ['logmessage',"\"",""]
grok {
match => { "logmessage" => "\[(%{NOTSPACE:requestid} %{NOTSPACE:user_id} %{NOTSPACE:tenant} \- \- \-|\-)\] %{NOTSPACE:requesterip} \- \- \[%{NOTSPACE:req_date} %{NOTSPACE:req_time}\] %{NOTSPACE:verb} %{NOTSPACE:url_path} %{NOTSPACE:http_ver} %{NUMBER:response} %{NUMBER:bytes:int} %{BASE10NUM:httptime:float}" }
add_tag => ["apimetrics"]
mutate {
replace => { "module" => "cinder.%{module}" }

View File

@ -1,11 +0,0 @@
filter {
if "libvirt" in [tags] {
grok {
match => { "message" => "(?m)^%{TIMESTAMP_ISO8601:logdate}:%{SPACE}%{NUMBER:code}:?%{SPACE}\[?\b%{NOTSPACE:loglevel}\b\]?%{SPACE}?:?%{SPACE}\[?\b%{NOTSPACE:module}\b\]?%{SPACE}?%{GREEDYDATA:logmessage}?" }
add_field => { "received_at" => "%{@timestamp}"}
mutate {
uppercase => [ "loglevel" ]

View File

@ -1,31 +0,0 @@
filter {
if "horizon" in [tags] {
grok {
patterns_dir => ["/opt/logstash/patterns"]
match => {
"message" => [
"\[%{APACHE_ERROR_TIMESTAMP:timestamp}\] \[%{DATA:module}:%{DATA:loglevel}\] \[pid %{POSINT:apache_pid}\:tid %{POSINT:apache_tid}\] ?(?:\[client %{IP:clientip}:%{POSINT:clientport}\] )?%{GREEDYDATA:logmessage}",
geoip {
source => "clientip"
if ![loglevel] {
mutate {
add_field => { "logmessage" => "%{request}" }
add_field => { "module" => "horizon.access" }
add_field => { "loglevel" => "INFO" }
add_tag => [ "apache-access" ]
} else {
mutate {
replace => { "module" => "horizon.error.%{module}" }
add_tag => [ "apache-error" ]
uppercase => [ "loglevel" ]

View File

@ -1,23 +0,0 @@
filter {
if "heat" in [tags] {
if [module] == "eventlet.wsgi.server" {
if "accepted" not in [logmessage] {
mutate {
gsub => ['logmessage',"\"",""]
grok {
match => { "logmessage" => "\[%{NOTSPACE:requestid} %{NOTSPACE:user_id} %{NOTSPACE:tenant} %{NOTSPACE} %{NOTSPACE} %{NOTSPACE}\] %{NOTSPACE:requesterip} %{NOTSPACE} %{NOTSPACE} \[%{NOTSPACE:req_date} %{NOTSPACE:req_time}\] %{NOTSPACE:verb} %{NOTSPACE:url_path} %{NOTSPACE:http_ver} %{NUMBER:response} %{NUMBER:bytes} %{BASE10NUM:httptime}" }
add_tag => ["apimetrics"]
mutate {
replace => { "module" => "heat.%{module}" }
} else if [module] == "heat.engine.service" {
grok {
match => { "logmessage" => "\[%{NOTSPACE:requestid} %{NOTSPACE:user_id} %{NOTSPACE:tenant} %{NOTSPACE} %{NOTSPACE} %{NOTSPACE} %{GREEDYDATA:servicemessage}" }
add_tag => ["apimetrics"]

View File

@ -1,37 +0,0 @@
filter {
if "mysql" in [tags] {
grok {
match => { "message" => "# User@Host: %{WORD:user}\[%{WORD}\] @ (%{HOSTNAME:client_hostname}|) \[(%{IP:client_ip}|)\]" }
grok {
match => { "message" => "# Thread_id: %{NUMBER:thread_id:int} \s*Schema: (%{WORD:schema}| ) \s*QC_hit: %{WORD:qc_hit}" }
grok {
match => { "message" => "# Query_time: %{NUMBER:query_time:float} \s*Lock_time: %{NUMBER:lock_time:float} \s*Rows_sent: %{NUMBER:rows_sent:int} \s*Rows_examined: %{NUMBER:rows_examined:int}" }
grok {
match => { "message" => "(?m)SET timestamp=%{NUMBER:timestamp};%{GREEDYDATA:logmessage}" }
geoip {
source => "clientip"
date {
match => [ "timestamp", "UNIX" ]
mutate {
remove_field => "timestamp"
mutate {
gsub => [ "logmessage", "^\n", "" ]
add_field => { "module" => "mysql" }
add_field => { "loglevel" => "WARNING" }

View File

@ -1,13 +0,0 @@
filter {
if [type] == "syslog" {
grok {
match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
add_field => [ "received_at", "%{@timestamp}" ]
add_field => [ "received_from", "%{host}" ]
syslog_pri { }
date {
match => [ "syslog_timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]

View File

@ -1,10 +0,0 @@
filter {
if "auth" in [tags] {
grok {
match => { "message" => "%{SYSLOGTIMESTAMP:timestamp} (?:%{SYSLOGFACILITY} )?%{NOTSPACE:logsource} %{SYSLOGPROG}: (?:%{SPACE})?%{GREEDYDATA:logmessage}" }
mutate {
add_field => { "module" => "auth" }

View File

@ -1,25 +0,0 @@
filter {
if "logstash" in [tags] {
grok {
match => {
"message" => "\{\:timestamp=>\"%{TIMESTAMP_ISO8601:timestamp}\", \:message=>\"%{DATA:logmessage}\"(;|)(, \:address=>\"%{URIHOST:address}\", \:exception=>#<%{DATA:exception}>, \:backtrace=>\[%{DATA:backtrace}\]|)(, \:level=>:%{LOGLEVEL:loglevel}|)\}"
mutate {
add_field => { "module" => "logstash" }
uppercase => [ "loglevel" ]
if [loglevel] == "WARN" {
mutate {
replace => { "loglevel" => "WARNING" }
} else if ![loglevel] {
mutate {
add_field => { "loglevel" => "ERROR" }

View File

@ -1,56 +0,0 @@
filter {
if "swift-container" in [tags] {
grok {
match => {
if "swift-account" in [tags] {
grok {
match => {
if "swift" in [tags] {
grok {
match => {
"message" => "%{SYSLOGTIMESTAMP:timestamp} (?:%{SYSLOGFACILITY} )?%{NOTSPACE:logsource} %{SYSLOGPROG:module}: (?:%{SPACE})?%{GREEDYDATA:logmessage}"
grok {
patterns_dir => ['/opt/logstash/patterns']
match => {
"logmessage" => [
"%{GREEDYDATA:logmessage} \(txn\: %{DATA:swift_txn}\)"
tag_on_failure => []
overwrite => [ "logmessage" ]
if [request] {
mutate {
replace => { "logmessage" => "%{request}" }
mutate {
replace => { "module" => "swift.%{module}" }
if [file] =~ "error.log$" {
mutate {
add_field => { "loglevel" => "NOTICE" }
} else {
mutate {
add_field => { "loglevel" => "INFO" }

View File

@ -1,28 +0,0 @@
filter {
if "keystone-access" in [tags] {
grok {
match => { "message" => "%{CISCOTIMESTAMP:keystone_access_timestamp}%{SPACE}%{SYSLOGHOST:log_host}%{SPACE}%{SYSLOGPROG:prog}%{SPACE}%{TIMESTAMP_ISO8601:keystone_timestmp}%{SPACE}%{NUMBER:pid}%{SPACE}%{NOTSPACE:loglevel}%{SPACE}%{NOTSPACE:module}%{SPACE}%{SYSLOG5424SD:requestid}%{SPACE}%{WORD:verb}%{SPACE}%{NOTSPACE:request}" }
if "keystone" in [tags] {
if "apache-access" in [tags] {
grok {
match => { "message" => "%{COMMONAPACHELOG}" }
mutate {
add_field => { "logmessage" => "%{request}" }
add_field => { "module" => "keystone.access" }
add_field => { "loglevel" => "INFO" }
} else if "apache-error" in [tags] {
grok {
patterns_dir => ["/opt/logstash/patterns"]
match => { "message" => "%{KEYSTONE_SUBSECOND_TIMESTAMP:keystone_subsecond_timestamp} %{STANDARD_TIMESTAMP:standard_timestamp} %{NUMBER:pid} %{DATA:loglevel} %{DATA:module} \[%{DATA:requestid}\] %{WORD:verb} %{NOTSPACE:request}" }
mutate {
replace => { "module" => "keystone.error.%{module}" }
uppercase => [ "loglevel" ]

View File

@ -1,10 +0,0 @@
filter {
if "elasticsearch" in [tags] {
grok {
match => { "message" => "\[%{TIMESTAMP_ISO8601:timestamp}\]\[%{LOGLEVEL:loglevel}\s*\]\[%{NOTSPACE:module}\s*\] %{GREEDYDATA:logmessage}" }
mutate {
replace => { "module" => "elasticsearch.%{module}" }

View File

@ -1,20 +0,0 @@
filter {
if "rabbitmq" in [tags] {
if [message] == "" {
drop { }
grok {
match => { "message" => "^\=%{LOGLEVEL:loglevel} REPORT\=\=\=\= %{MONTHDAY:event_day}\-%{MONTH:event_month}\-%{YEAR:event_year}\:\:%{TIME:event_time} \=\=\=\n%{GREEDYDATA:logmessage}" }
mutate {
replace => { "module" => "rabbitmq" }
add_field => { "timestamp" => "%{event_day} %{event_month} %{event_year} %{event_time}" }
date {
match => [ "timestamp", "dd MMM YYYY HH:mm:ss" ]
remove_field => [ "event_day", "event_month", "event_year", "event_time", "timestamp" ]

View File

@ -1,12 +0,0 @@
filter {
if "ceph" in [tags] {
grok {
match => { "message" => "%{TIMESTAMP_ISO8601:date} %{NOTSPACE:osd_epoch} ?%{SPACE}?%{NOTSPACE:error_bool} %{GREEDYDATA:logmessage}" }
if "ceph-osd" in [tags] {
grok {
match => { "message" => "-- (?<src_host>(%{IPORHOST}\:%{POSINT}/%{POSINT})) (?:[<|>]){1,2} (?<dst_host>(%{IPORHOST}\:%{POSINT}/%{POSINT}))" }

View File

@ -1,23 +0,0 @@
filter {
if "nginx" in [tags] {
if "nginx-access" in [tags] {
grok {
patterns_dir => ['/opt/logstash/patterns']
match => {
"message" => "%{IP:client_ip} - %{USER:client_user} \[%{NGINX_TIMESTAMP:timestamp}\] \"%{WORD:verb} %{NOTSPACE:request} HTTP/%{NUMBER:http_version}\" %{INT:response_code} %{INT:bytes} %{QUOTEDSTRING:referer} %{QUOTEDSTRING:user_agent} %{QUOTEDSTRING:gzip_ratio}"
geoip {
source => "clientip"
if "nginx-error" in [tags] {
grok {
patterns_dir => ['/opt/logstash/patterns']
match => {
"message" => "%{NGINX_ERROR_TIMESTAMP:timestamp} \[%{LOGLEVEL:loglevel}\] %{GREEDYDATA:error_msg}"

View File

@ -1,16 +0,0 @@
filter {
if "magnum" in [tags] {
if [module] == "eventlet.wsgi.server" {
mutate {
gsub => ['logmessage',"\"",""]
grok {
match => { "logmessage" => "\[(%{NOTSPACE:requestid} %{NOTSPACE:user_id} %{NOTSPACE:tenant} \- \- \-|\-)\] %{NOTSPACE:requesterip} \- \- \[%{NOTSPACE:req_date} %{NOTSPACE:req_time}\] %{NOTSPACE:verb} %{NOTSPACE:url_path} %{NOTSPACE:http_ver} %{NUMBER:response} %{NUMBER:bytes:int} %{BASE10NUM:httptime:float}" }
add_tag => ["apimetrics"]
mutate {
replace => { "module" => "magnum.%{module}" }

View File

@ -1,16 +0,0 @@
filter {
if "octavia" in [tags] {
if [module] == "eventlet.wsgi.server" {
mutate {
gsub => ['logmessage',"\"",""]
grok {
match => { "logmessage" => "\[(%{NOTSPACE:requestid} %{NOTSPACE:user_id} %{NOTSPACE:tenant} \- \- \-|\-)\] %{NOTSPACE:requesterip} \- \- \[%{NOTSPACE:req_date} %{NOTSPACE:req_time}\] %{NOTSPACE:verb} %{NOTSPACE:url_path} %{NOTSPACE:http_ver} %{NUMBER:response} %{NUMBER:bytes:int} %{BASE10NUM:httptime:float}" }
add_tag => ["apimetrics"]
mutate {
replace => { "module" => "octavia.%{module}" }

View File

@ -1,8 +0,0 @@
filter {
if "Traceback" in [message] {
mutate {
add_tag => ["traceback"]
remove_tag => ["_grokparsefailure"]

View File

@ -8,6 +8,7 @@
temp_dir: /var/lib/logstash/tmp
logstash_pipelines: "{{lookup('template', 'templates/logstash-pipelines.yml.j2') }}"
environment: "{{ deployment_environment_variables | default({}) }}"
@ -27,7 +28,7 @@
- name: Set processor cores fact
q_storage: "{{ (ansible_processor_cores | int) * 2 }}"
q_storage: "{{ (ansible_processor_cores | int) * (ansible_processor_threads_per_core | int) * 2 }}"
- q_storage is not defined
@ -36,7 +37,7 @@
- name: Set logstash facts
elastic_heap_size: "{{ ((q_mem | int) > 30720) | ternary(30720, q_mem) }}"
logstash_queue_size: "{{ (((q_storage | int) > 16) | ternary(16, q_storage) | int) * 1024 }}"
logstash_queue_size: "{{ ((((q_storage | int) >= 2) | ternary(q_storage, 2) | int) * 1024) // ((logstash_pipelines | from_yaml) | length) }}"
elastic_log_rotate_path: "/var/log/logstash"
- always
@ -125,24 +126,6 @@
- Enable and restart logstash
- name: Drop logstash conf file(s)
src: "{{ item.src }}"
dest: "{{ item.dest }}"
- src: templates/jvm.options.j2
dest: /etc/logstash/jvm.options
- src: templates/99-elasticsearch-output.conf.j2
dest: /etc/logstash/conf.d/99-elasticsearch-output.conf
- src: templates/02-beats-input.conf.j2
dest: /etc/logstash/conf.d/02-beats-input.conf
- src: templates/logstash.yml.j2
dest: /etc/logstash/logstash.yml
- Enable and restart logstash
- config
- name: Create patterns directory
name: "/opt/logstash/patterns"
@ -168,41 +151,6 @@
- logstash-filters
- config
- name: Deploy Logstash configuration files
src: "files/{{ item }}"
dest: "/etc/logstash/conf.d/{{ item }}"
- 02-general.conf
- 02-journald.conf
- 03-nova.conf
- 04-neutron.conf
- 05-glance.conf
- 06-cinder.conf
- 07-libvirt.conf
- 08-apache.conf
- 09-heat.conf
- 10-mysql.conf
- 10-syslog-filter.conf
- 11-auth.conf
- 12-logstash.conf
- 13-swift.conf
- 14-keystone.conf
- 16-elasticsearch.conf
- 17-rabbitmq.conf
- 18-ceph.conf
- 19-nginx.conf
- 20-magnum.conf
- 21-octavia.conf
- 98-traceback.conf
- logstash_deploy_filters
- Enable and restart logstash
- logstash-filters
- config
- name: Run kafka output block
- name: Copy kafka keystore into place
@ -219,15 +167,27 @@
- logstash_kafka_ssl_truststore_location is defined
- name: Deploy Logstash kafka configuration files
src: "templates/99-kafka-output.conf.j2"
dest: "/etc/logstash/conf.d/99-kafka-output.conf"
- Enable and restart logstash
- logstash_kafka_options is defined
- name: Drop logstash conf file(s)
src: "{{ item.src }}"
dest: "{{ item.dest }}"
- src: templates/jvm.options.j2
dest: /etc/logstash/jvm.options
- src: templates/logstash.yml.j2
dest: /etc/logstash/logstash.yml
- src: templates/logstash-pipelines.yml.j2
dest: /etc/logstash/pipelines.yml
- src: "templates/logrotate.j2"
dest: "/etc/logrotate.d/logstash"
- Enable and restart logstash
- config
- name: Ensure logstash ownership
path: /var/lib/logstash
@ -247,11 +207,6 @@
group: "logstash"
mode: "0750"
- name: Create logrotate config
src: "templates/logrotate.j2"
dest: "/etc/logrotate.d/logstash"
- name: Run arcsight output block
- name: Initialise arcsight local facts

View File

@ -1,17 +0,0 @@
input {
beats {
port => {{ logstash_beat_input_port }}
filter {
if [source.ip] {
geoip {
source => "source.ip"
} else if [ip] {
geoip {
source => "ip"

View File

@ -1,28 +0,0 @@
filter {
fingerprint {
source => "message"
target => "[@metadata][fingerprint]"
method => "SHA1"
key => "{{ cluster_name | replace(' ', '_') }}"
base64encode => true
output {
if [@metadata][version] {
elasticsearch {
document_id => "%{[@metadata][fingerprint]}"
hosts => {{ elasticsearch_data_hosts | shuffle(seed=inventory_hostname) | to_json }}
sniffing => {{ (not data_node | bool) | lower }}
manage_template => {{ (data_node | bool) | lower }}
index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
} else {
elasticsearch {
document_id => "%{[@metadata][fingerprint]}"
hosts => {{ elasticsearch_data_hosts | shuffle(seed=inventory_hostname) | to_json }}
sniffing => {{ (not data_node | bool) | lower }}
manage_template => {{ (data_node | bool) | lower }}
index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"

View File

@ -1,13 +0,0 @@
output {
kafka {
{% for key, value in logstash_kafka_options.items() %}
{% if value is number %}
{{ key }} => {{ value }}
{% elif value is iterable and value is not string %}
{{ key }} => "{{ value | join(',') }}"
{% else %}
{{ key }} => "{{ value }}"
{% endif %}
{% endfor %}

View File

@ -33,6 +33,9 @@ setup.template.settings:
codec: best_compression
# This provides for an index split of up to 2 times the number of available shards
number_of_routing_shards: {{ (shards | int) * 2 }}
# The default number of replicas will be based on the number of data nodes
# within the environment with a limit of 2 replicas.
number_of_replicas: {{ elasticsearch_number_of_replicas }}
# A dictionary of settings for the _source field. For more details, please check

View File

@ -68,7 +68,7 @@
"exclude_nodes": (groups['kibana'] | map('extract', hostvars, 'ansible_host') | list)
"number_of_shards": 1,
"number_of_replicas": 1,
"number_of_replicas": elasticsearch_number_of_replicas,
"shrink_suffix": '-shrink',
"copy_aliases": true,
"delete_after": true,

View File

@ -1310,7 +1310,8 @@ filebeat.prospectors:
# The tags of the shipper are included in their own field with each
# transaction published. Tags make it easy to group servers by different
# logical properties.
#tags: ["service-X", "web-tier"]
- filebeat
# Optional fields that you can specify to add additional information to the
# output. Fields can be scalar values, arrays, dictionaries, or any nested

View File

@ -0,0 +1,739 @@
# This file is where you define your pipelines. You can define multiple.
# For more information on multiple pipelines, see the documentation:
{% set output_pipeline = ["es_local"] %}
{% if logstash_kafka_options is defined %}
{% set _ = output_pipeline.append('kafka_remote') %}
{% endif %}
{% set output_pipeline = output_pipeline | to_json %}
- "beats-intake"
queue.type: persisted
config.string: |
input {
beats {
id => "inputBeats"
port => {{ logstash_beat_input_port }}
output {
pipeline {
id => "sendDistributorPipeline"
send_to => [distributor]
- "general-distributor"
config.string: |
input {
pipeline {
id => "inputDistribute"
address => distributor
output {
if "filebeat" in [tags] {
pipeline {
id => "sendFilebeatPipeline"
send_to => [filebeat]
} else if "journald" in [tags] {
pipeline {
id => "sendJournalbeatPipeline"
send_to => [journalbeat]
} else {
pipeline {
id => "sendOutputPipeline"
send_to => {{ output_pipeline }}
- "parse-journalbeat"
path.config: "/etc/logstash/conf.d/02-journald.conf"
config.string: |
input {
pipeline {
id => "inputJournalbeat"
address => journalbeat
filter {
if [systemd_slice] {
mutate {
copy => { "systemd_slice" => "systemd_slice_tag" }
mutate {
gsub => [ "systemd_slice_tag", ".slice", "" ]
if [systemd_slice_tag] != "-" {
mutate {
add_tag => [
mutate {
remove_field => [ "%{systemd_slice_tag}" ]
output {
pipeline {
id => "sendFilebeat"
send_to => [filebeat]
- "parse-filebeat"
config.string: |
input {
pipeline {
id => "inputFilebeat"
address => filebeat
filter {
if "Traceback" in [message] {
mutate {
add_tag => ["traceback"]
remove_tag => ["_grokparsefailure"]
output {
if "auth" in [tags] {
pipeline {
id => "sendAuthLog"
send_to => [auth]
} else if "elasticsearch" in [tags] {
pipeline {
id => "sendElasticsearch"
send_to => [elasticsearch]
} else if "ceph" in [tags] {
pipeline {
id => "sendCeph"
send_to => [ceph]
} else if "libvirt" in [tags] {
pipeline {
id => "sendLibvirt"
send_to => [libvirt]
} else if "logstash" in [tags] {
pipeline {
id => "sendLogstash"
send_to => [logstash]
} else if "mysql" in [tags] {
pipeline {
id => "sendMysql"
send_to => [mysql]
} else if "nginx" in [tags] {
pipeline {
id => "sendNginx"
send_to => [nginx]
} else if "openstack" in [tags] {
pipeline {
id => "sendOpenstack"
send_to => [openstack]
} else if "rabbitmq" in [tags] {
pipeline {
id => "sendRabbitmq"
send_to => [rabbitmq]
} else {
pipeline {
id => "sendOutputPipeline"
send_to => {{ output_pipeline }}
- "parse-auth"
config.string: |
input {
pipeline {
id => "inputAuthLog"
address => auth
filter {
grok {
match => { "message" => "%{SYSLOGTIMESTAMP:timestamp} (?:%{SYSLOGFACILITY} )?%{NOTSPACE:logsource} %{SYSLOGPROG}: (?:%{SPACE})?%{GREEDYDATA:logmessage}" }
mutate {
add_field => { "module" => "auth" }
output {
pipeline {
id => "sendOutputPipeline"
send_to => {{ output_pipeline }}
- "parse-ceph"
config.string: |
input {
pipeline {
id => "inputCeph"
address => ceph
filter {
grok {
match => { "message" => "%{TIMESTAMP_ISO8601:date} %{NOTSPACE:osd_epoch} ?%{SPACE}?%{NOTSPACE:error_bool} %{GREEDYDATA:logmessage}" }
if "ceph-osd" in [tags] {
grok {
match => { "message" => "-- (?<src_host>(%{IPORHOST}\:%{POSINT}/%{POSINT})) (?:[<|>]){1,2} (?<dst_host>(%{IPORHOST}\:%{POSINT}/%{POSINT}))" }
output {
pipeline {
id => "sendOutputPipeline"
send_to => {{ output_pipeline }}
- "parse-elasticsearch"
config.string: |
input {
pipeline {
id => "inputElasticsearch"
address => elasticsearch
filter {
grok {
match => { "message" => "\[%{TIMESTAMP_ISO8601:timestamp}\]\[%{LOGLEVEL:loglevel}\s*\]\[%{NOTSPACE:module}\s*\] %{GREEDYDATA:logmessage}" }
mutate {
replace => { "module" => "elasticsearch.%{module}" }
output {
pipeline {
id => "sendOutputPipeline"
send_to => {{ output_pipeline }}
- "parse-libvirt"
config.string: |
input {
pipeline {
id => "inputLibvirt"
address => libvirt
filter {
grok {
match => { "message" => "(?m)^%{TIMESTAMP_ISO8601:logdate}:%{SPACE}%{NUMBER:code}:?%{SPACE}\[?\b%{NOTSPACE:loglevel}\b\]?%{SPACE}?:?%{SPACE}\[?\b%{NOTSPACE:module}\b\]?%{SPACE}?%{GREEDYDATA:logmessage}?" }
add_field => { "received_at" => "%{@timestamp}"}
mutate {
uppercase => [ "loglevel" ]
output {
pipeline {
id => "sendOutputPipeline"
send_to => {{ output_pipeline }}
- "parse-logstash"
config.string: |
input {
pipeline {
id => "inputLogstash"
address => logstash
filter {
grok {
match => {
"message" => "\{\:timestamp=>\"%{TIMESTAMP_ISO8601:timestamp}\", \:message=>\"%{DATA:logmessage}\"(;|)(, \:address=>\"%{URIHOST:address}\", \:exception=>#<\"%{DATA:exception}\">, \:backtrace=>\[%{DATA:backtrace}\]|)(, \:level=>:\"%{LOGLEVEL:loglevel}\"|)\}"
mutate {
add_field => { "module" => "logstash" }
uppercase => [ "loglevel" ]
if [loglevel] == "WARN" {
mutate {
replace => { "loglevel" => "WARNING" }
} else if ![loglevel] {
mutate {
add_field => { "loglevel" => "ERROR" }
output {
pipeline {
id => "sendOutputPipeline"
send_to => {{ output_pipeline }}
- "parse-mysql"
path.config: "/etc/logstash/conf.d/10-mysql.conf"
config.string: |
input {
pipeline {
id => "inputMysql"
address => mysql
filter {
grok {
match => { "message" => "# User@Host: %{WORD:user}\[%{WORD}\] @ (%{HOSTNAME:client_hostname}|) \[(%{IP:client_ip}|)\]" }
grok {
match => { "message" => "# Thread_id: %{NUMBER:thread_id:int} \s*Schema: (%{WORD:schema}| ) \s*QC_hit: %{WORD:qc_hit}" }
grok {
match => { "message" => "# Query_time: %{NUMBER:query_time:float} \s*Lock_time: %{NUMBER:lock_time:float} \s*Rows_sent: %{NUMBER:rows_sent:int} \s*Rows_examined: %{NUMBER:rows_examined:int}" }
grok {
match => { "message" => "(?m)SET timestamp=%{NUMBER:timestamp};%{GREEDYDATA:logmessage}" }
geoip {
source => "clientip"
date {
match => [ "timestamp", "UNIX" ]
mutate {
remove_field => "timestamp"
mutate {
gsub => [ "logmessage", "^\n", "" ]
add_field => { "module" => "mysql" }
add_field => { "loglevel" => "WARNING" }
output {
pipeline {
id => "sendOutputPipeline"
send_to => {{ output_pipeline }}
- "parse-nginx"
config.string: |
input {
pipeline {
id => "inputNginx"
address => nginx
filter {
if "nginx-access" in [tags] {
grok {
patterns_dir => ["/opt/logstash/patterns"]
match => {
"message" => "%{IP:client_ip} - %{USER:client_user} \[%{NGINX_TIMESTAMP:timestamp}\] \"%{WORD:verb} %{NOTSPACE:request} HTTP/%{NUMBER:http_version}\" %{INT:response_code} %{INT:bytes} %{QUOTEDSTRING:referer} %{QUOTEDSTRING:user_agent} %{QUOTEDSTRING:gzip_ratio}"
geoip {
source => "clientip"
if "nginx-error" in [tags] {
grok {
patterns_dir => ["/opt/logstash/patterns"]
match => {
"message" => "%{NGINX_ERROR_TIMESTAMP:timestamp} \[%{LOGLEVEL:loglevel}\] %{GREEDYDATA:error_msg}"
output {
pipeline {
id => "sendOutputPipeline"
send_to => {{ output_pipeline }}
- "parse-openstack"
config.string: |
input {
pipeline {
id => "inputOpenstack"
address => openstack
filter {
if "oslofmt" in [tags] {
if "Can not find policy directory: policy.d" in [message] {
drop { }
grok {
match => {
"message" => [
"^%{TIMESTAMP_ISO8601:logdate}%{SPACE}%{NUMBER:pid}?%{SPACE}?(?<loglevel>AUDIT|CRITICAL|DEBUG|INFO|TRACE|WARNING|ERROR) \[?\b%{NOTSPACE:module}\b\]?%{SPACE}?%{GREEDYDATA:logmessage}?",
add_field => { "received_at" => "%{@timestamp}" }
if "nova" in [tags] {
mutate {
gsub => ["logmessage","\"",""]
if [module] == "nova.osapi_compute.wsgi.server" {
grok {
match => { "logmessage" => "\[(%{NOTSPACE:requestid} %{NOTSPACE:user_id} %{NOTSPACE:tenant} \- \- \-|\-)\] %{NOTSPACE:requesterip} %{NOTSPACE:verb} %{NOTSPACE:url_path} %{NOTSPACE:http_ver} status\: %{NUMBER:response} len\: %{NUMBER:bytes:int} time\: %{BASE10NUM:httptime:float}" }
add_tag => ["apimetrics"]
} else if [module] == "nova.api.ec2" {
grok {
match => { "logmessage" => "\[%{GREEDYDATA:requestid}\] %{NUMBER:seconds}s %{NOTSPACE:requesterip} %{NOTSPACE:verb} %{NOTSPACE:url_path} None\:None %{NUMBER:response} %{GREEDYDATA:user_agent}" }
add_tag => ["apimetrics"]
} else if [module] == "nova.metadata.wsgi.server" {
grok {
match => { "logmessage" => "\[%{GREEDYDATA:requestid}\] %{NOTSPACE:requesterip} %{NOTSPACE:verb} %{NOTSPACE:url_path} %{NOTSPACE:http_ver} status\: %{NUMBER:response} len\: %{NUMBER:bytes} time\: %{NUMBER:seconds}" }
add_tag => ["apimetrics"]
} else if "neutron" in [tags] {
if [module] == "neutron.wsgi" {
if "accepted" not in [logmessage] {
mutate {
gsub => ["logmessage","\"",""]
grok {
match => { "logmessage" => "\[(%{NOTSPACE:requestid} %{NOTSPACE:user_id} %{NOTSPACE:tenant} \- \- \-|\-)\] %{NOTSPACE:requesterip} \- \- \[%{NOTSPACE:req_date} %{NOTSPACE:req_time}\] %{NOTSPACE:verb} %{NOTSPACE:url_path} %{NOTSPACE:http_ver} %{NUMBER:response} %{NUMBER:bytes:int} %{BASE10NUM:httptime:float}" }
add_tag => ["apimetrics"]
} else if "neutron-ha-tool" in [source] {
mutate {
add_tag => ["neutron-ha-tool"]
remove_tag => ["_grokparsefailure"]
if "starting" in [message] and "_grokparsefailure" in [tags] {
grok {
match => { "logmessage" => "\[(%{NOTSPACE:requestid}|\-)\](%{SPACE}\(%{NUMBER:pid}\)) %{GREEDYDATA:servicemessage}" }
mutate {
remove_tag => ["_grokparsefailure"]
} else if "glance" in [tags] {
if [module] == "eventlet.wsgi.server" {
mutate {
gsub => ["logmessage","\"",""]
grok {
match => { "logmessage" => "\[(%{NOTSPACE:requestid} %{NOTSPACE:user_id} %{NOTSPACE:tenant} \- \- \-|\-)\] %{NOTSPACE:requesterip} \- \- \[%{NOTSPACE:req_date} %{NOTSPACE:req_time}\] %{NOTSPACE:verb} %{NOTSPACE:url_path} %{NOTSPACE:http_ver} %{NUMBER:response} %{NUMBER:bytes:int} %{BASE10NUM:httptime:float}" }
add_tag => ["apimetrics"]
mutate {
replace => { "module" => "glance.%{module}" }
} else if "cinder" in [tags] {
if [module] == "cinder.eventlet.wsgi.server" {
if "accepted" not in [logmessage] {
mutate {
gsub => ["logmessage","\"",""]
grok {
match => { "logmessage" => "\[(%{NOTSPACE:requestid} %{NOTSPACE:user_id} %{NOTSPACE:tenant} \- \- \-|\-)\] %{NOTSPACE:requesterip} \- \- \[%{NOTSPACE:req_date} %{NOTSPACE:req_time}\] %{NOTSPACE:verb} %{NOTSPACE:url_path} %{NOTSPACE:http_ver} %{NUMBER:response} %{NUMBER:bytes:int} %{BASE10NUM:httptime:float}" }
add_tag => ["apimetrics"]
mutate {
replace => { "module" => "cinder.%{module}" }
} else if "horizon" in [tags] {
grok {
patterns_dir => ["/opt/logstash/patterns"]
match => {
"message" => [
"\[%{APACHE_ERROR_TIMESTAMP:timestamp}\] \[%{DATA:module}:%{DATA:loglevel}\] \[pid %{POSINT:apache_pid}\:tid %{POSINT:apache_tid}\] ?(?:\[client %{IP:clientip}:%{POSINT:clientport}\] )?%{GREEDYDATA:logmessage}",
geoip {
source => "clientip"
if ![loglevel] {
mutate {
add_field => { "logmessage" => "%{request}" }
add_field => { "module" => "horizon.access" }
add_field => { "loglevel" => "INFO" }
add_tag => [ "apache-access" ]
} else {
mutate {
replace => { "module" => "horizon.error.%{module}" }
add_tag => [ "apache-error" ]
uppercase => [ "loglevel" ]
} else if "heat" in [tags] {
if [module] == "eventlet.wsgi.server" {
if "accepted" not in [logmessage] {
mutate {
gsub => ["logmessage","\"",""]
grok {
match => { "logmessage" => "\[%{NOTSPACE:requestid} %{NOTSPACE:user_id} %{NOTSPACE:tenant} %{NOTSPACE} %{NOTSPACE} %{NOTSPACE}\] %{NOTSPACE:requesterip} %{NOTSPACE} %{NOTSPACE} \[%{NOTSPACE:req_date} %{NOTSPACE:req_time}\] %{NOTSPACE:verb} %{NOTSPACE:url_path} %{NOTSPACE:http_ver} %{NUMBER:response} %{NUMBER:bytes} %{BASE10NUM:httptime}" }
add_tag => ["apimetrics"]
mutate {
replace => { "module" => "heat.%{module}" }
} else if [module] == "heat.engine.service" {
grok {
match => { "logmessage" => "\[%{NOTSPACE:requestid} %{NOTSPACE:user_id} %{NOTSPACE:tenant} %{NOTSPACE} %{NOTSPACE} %{NOTSPACE} %{GREEDYDATA:servicemessage}" }
add_tag => ["apimetrics"]
} else if "swift-container" in [tags] {
grok {
match => {
} else if "swift-account" in [tags] {
grok {
match => {
} else if "swift" in [tags] {
grok {
match => {
"message" => "%{SYSLOGTIMESTAMP:timestamp} (?:%{SYSLOGFACILITY} )?%{NOTSPACE:logsource} %{SYSLOGPROG:module}: (?:%{SPACE})?%{GREEDYDATA:logmessage}"
grok {
patterns_dir => ["/opt/logstash/patterns"]
match => {
"logmessage" => [
"%{GREEDYDATA:logmessage} \(txn\: %{DATA:swift_txn}\)"
tag_on_failure => []
overwrite => [ "logmessage" ]
if [request] {
mutate {
replace => { "logmessage" => "%{request}" }
mutate {
replace => { "module" => "swift.%{module}" }
if [file] =~ "error.log$" {
mutate {
add_field => { "loglevel" => "NOTICE" }
} else {
mutate {
add_field => { "loglevel" => "INFO" }
} else if "keystone-access" in [tags] {
grok {
match => { "message" => "%{CISCOTIMESTAMP:keystone_access_timestamp}%{SPACE}%{SYSLOGHOST:log_host}%{SPACE}%{SYSLOGPROG:prog}%{SPACE}%{TIMESTAMP_ISO8601:keystone_timestmp}%{SPACE}%{NUMBER:pid}%{SPACE}%{NOTSPACE:loglevel}%{SPACE}%{NOTSPACE:module}%{SPACE}%{SYSLOG5424SD:requestid}%{SPACE}%{WORD:verb}%{SPACE}%{NOTSPACE:request}" }
} else if "keystone" in [tags] {
if "apache-access" in [tags] {
grok {
match => { "message" => "%{COMMONAPACHELOG}" }
mutate {
add_field => { "logmessage" => "%{request}" }
add_field => { "module" => "keystone.access" }
add_field => { "loglevel" => "INFO" }
} else if "apache-error" in [tags] {
grok {
patterns_dir => ["/opt/logstash/patterns"]
match => { "message" => "%{KEYSTONE_SUBSECOND_TIMESTAMP:keystone_subsecond_timestamp} %{STANDARD_TIMESTAMP:standard_timestamp} %{NUMBER:pid} %{DATA:loglevel} %{DATA:module} \[%{DATA:requestid}\] %{WORD:verb} %{NOTSPACE:request}" }
mutate {
replace => { "module" => "keystone.error.%{module}" }
uppercase => [ "loglevel" ]
} else if "magnum" in [tags] {
if [module] == "eventlet.wsgi.server" {
mutate {
gsub => ["logmessage","\"",""]
grok {
match => { "logmessage" => "\[(%{NOTSPACE:requestid} %{NOTSPACE:user_id} %{NOTSPACE:tenant} \- \- \-|\-)\] %{NOTSPACE:requesterip} \- \- \[%{NOTSPACE:req_date} %{NOTSPACE:req_time}\] %{NOTSPACE:verb} %{NOTSPACE:url_path} %{NOTSPACE:http_ver} %{NUMBER:response} %{NUMBER:bytes:int} %{BASE10NUM:httptime:float}" }
add_tag => ["apimetrics"]
mutate {
replace => { "module" => "magnum.%{module}" }
} else if "octavia" in [tags] {
if [module] == "eventlet.wsgi.server" {
mutate {
gsub => ["logmessage","\"",""]
grok {
match => { "logmessage" => "\[(%{NOTSPACE:requestid} %{NOTSPACE:user_id} %{NOTSPACE:tenant} \- \- \-|\-)\] %{NOTSPACE:requesterip} \- \- \[%{NOTSPACE:req_date} %{NOTSPACE:req_time}\] %{NOTSPACE:verb} %{NOTSPACE:url_path} %{NOTSPACE:http_ver} %{NUMBER:response} %{NUMBER:bytes:int} %{BASE10NUM:httptime:float}" }
add_tag => ["apimetrics"]
mutate {
replace => { "module" => "octavia.%{module}" }
output {
pipeline {
id => "sendOutputPipeline"
send_to => {{ output_pipeline }}
- "parse-rabbitmq"
config.string: |
input {
pipeline {
id => "inputRabbitmq"
address => rabbitmq
filter {
if [message] == "" {
drop { }
grok {
match => { "message" => "^\=%{LOGLEVEL:loglevel} REPORT\=\=\=\= %{MONTHDAY:event_day}\-%{MONTH:event_month}\-%{YEAR:event_year}\:\:%{TIME:event_time} \=\=\=\n%{GREEDYDATA:logmessage}" }
mutate {
replace => { "module" => "rabbitmq" }
add_field => { "timestamp" => "%{event_day} %{event_month} %{event_year} %{event_time}" }
date {
match => [ "timestamp", "dd MMM YYYY HH:mm:ss" ]
remove_field => [ "event_day", "event_month", "event_year", "event_time", "timestamp" ]
output {
pipeline {
id => "sendOutputPipeline"
send_to => {{ output_pipeline }}
- "local-elasticsearch"
config.string: |
input {
pipeline {
id => "inputElasticsearchPipeline"
address => es_local
filter {
if [source.ip] {
geoip {
id => "setGeoIpSource"
source => "source.ip"
} else if [ip] {
geoip {
id => "setGeoIp"
source => "ip"
if [message] {
fingerprint {
id => "setSHA1"
target => "[@metadata][fingerprint]"
method => "SHA1"
key => "{{ inventory_hostname | to_uuid }}"
} else {
fingerprint {
target => "[@metadata][fingerprint]"
method => "UUID"
output {
if [@metadata][version] {
elasticsearch {
id => "elasticsearchOutputPipeline"
document_id => "%{[@metadata][fingerprint]}"
hosts => {{ elasticsearch_data_hosts | shuffle(seed=inventory_hostname) | to_json }}
sniffing => {{ (not data_node | bool) | lower }}
manage_template => {{ (data_node | bool) | lower }}
index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
} else {
elasticsearch {
id => "elasticsearchLegacyOutputPipeline"
document_id => "%{[@metadata][fingerprint]}"
hosts => {{ elasticsearch_data_hosts | shuffle(seed=inventory_hostname) | to_json }}
sniffing => {{ (not data_node | bool) | lower }}
manage_template => {{ (data_node | bool) | lower }}
index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
{% if logstash_kafka_options is defined %}
- "remote-kafka"
config.string: |
input {
pipeline {
id => "inputKafkaPipeline"
address => kafka_remote
output {
kafka {
{% for key, value in logstash_kafka_options.items() %}
{% if value is number %}
{{ key }} => {{ value }}
{% elif value is iterable and value is not string %}
{{ key }} => "{{ value | join(',') }}"
{% else %}
{{ key }} => "{{ value }}"
{% endif %}
{% endfor %}
{% endif %}

View File

@ -46,7 +46,7 @@ metricbeat.max_start_delay: 10s
#========================== Modules configuration ============================
{% if (not physical_host is defined) or (physical_host is defined and physical_host == inventory_hostname) %}
#------------------------------- System Module -------------------------------
- module: system
@ -111,7 +111,7 @@ metricbeat.modules:
socket.reverse_lookup.enabled: true
socket.reverse_lookup.success_ttl: 60s
socket.reverse_lookup.failure_ttl: 60s
{% endif %}
##------------------------------ Aerospike Module -----------------------------
#- module: aerospike
# metricsets: ["namespace"]