Doug Szumski a2273026d7 Define retry_tag for unprocessed Fluentd logs
The Kolla Ansible Fluentd config is not idempotent. In practice this
can be an issue if some logs are rejected by the OpenSearch /
Elasticsearch bulk API. In this case, by default, the unprocessed
logs will be reprocessed from the start of the Fluentd pipeline,
leading to error messages.

The solution proposed here is to explicitly set the retry_tag as
documented in [1,2], and add a dedicated output for retried logs.

An alternative fix could be to make the pipeline idempotent. This
would require a compromise of either duplicating content, or having
non-standard fields for the log payload/message.

[1] https://github.com/fluent/fluent-plugin-opensearch?tab=readme-ov-file#retry_tag
[2] https://github.com/uken/fluent-plugin-elasticsearch?tab=readme-ov-file#retry_tag

Closes-Bug: #2064104
Change-Id: I310fc1b8e002ce9f0ba60f8bb67b7f372a589314
2025-01-17 10:23:59 +00:00

{# 'retry_es' is listed first: Fluentd evaluates <match> sections in order, so it must precede the catch-all '**'. #}
{% for match_pattern in ['retry_es', '**'] %}
<match {{ match_pattern }}>
    @type copy
    <store>
        @type elasticsearch
        host {{ elasticsearch_address }}
        port {{ elasticsearch_port | default('9200') }}
        scheme {{ fluentd_elasticsearch_scheme }}
{% if fluentd_elasticsearch_path != '' %}
        path {{ fluentd_elasticsearch_path }}
{% endif %}
{% if fluentd_elasticsearch_scheme == 'https' %}
        ssl_version {{ fluentd_elasticsearch_ssl_version }}
        ssl_verify {{ fluentd_elasticsearch_ssl_verify }}
{% if fluentd_elasticsearch_cacert | length > 0 %}
        ca_file {{ fluentd_elasticsearch_cacert }}
{% endif %}
{% endif %}
{% if fluentd_elasticsearch_user != '' and fluentd_elasticsearch_password != '' %}
        user {{ fluentd_elasticsearch_user }}
        password {{ fluentd_elasticsearch_password }}
{% endif %}
        logstash_format true
        logstash_prefix {{ opensearch_log_index_prefix }}
        reconnect_on_error true
{# Only the main output sets retry_tag, so rejected logs are re-emitted with the tag 'retry_es' rather than reprocessed from the start of the pipeline. #}
{% if match_pattern != 'retry_es' %}
        retry_tag retry_es
{% endif %}
        request_timeout {{ fluentd_elasticsearch_request_timeout }}
        suppress_type_name true
        bulk_message_request_threshold {{ fluentd_bulk_message_request_threshold }}
        <buffer>
            @type file
{# The retry output and the main output use separate buffer paths. #}
{% if match_pattern == 'retry_es' %}
            path /var/lib/fluentd/data/elasticsearch.buffer/openstack_retry.*
{% else %}
            path /var/lib/fluentd/data/elasticsearch.buffer/openstack.*
{% endif %}
            flush_interval 15s
            chunk_limit_size {{ fluentd_buffer_chunk_limit_size }}
        </buffer>
    </store>
</match>
{% endfor %}
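{#
A rough sketch of the two <match> sections this template renders, included
only to illustrate how the 'retry_es' and '**' iterations differ. Settings
common to both outputs are elided with "...", the indentation is for
readability, and nothing below is captured from a real deployment.

<match retry_es>
    @type copy
    <store>
        @type elasticsearch
        ...
        (retry_tag is not set on this output)
        <buffer>
            @type file
            path /var/lib/fluentd/data/elasticsearch.buffer/openstack_retry.*
            ...
        </buffer>
    </store>
</match>
<match **>
    @type copy
    <store>
        @type elasticsearch
        ...
        retry_tag retry_es
        <buffer>
            @type file
            path /var/lib/fluentd/data/elasticsearch.buffer/openstack.*
            ...
        </buffer>
    </store>
</match>
#}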