Add Logsender tool

The logsender tool will be used to parse the content downloaded
by the logscraper tool with the --download parameter and send it to the
Elasticsearch service.
By using the logsender tool, services like the gearman client, gearman
worker and logstash are no longer necessary to maintain, because
the information is sent directly to the Elasticsearch host.

Depends-On: https://review.opendev.org/c/openstack/ci-log-processing/+/830169

Change-Id: I3e28b847e7cafbf4742fb718ef09ce3e658f945f
Daniel Pawlik 2022-02-22 10:36:06 +01:00
parent 9d32d082e1
commit c0cc4dbaf1
19 changed files with 1516 additions and 2 deletions

View File

@@ -7,6 +7,14 @@
nodes:
- name: centos-8-stream
label: centos-8-stream
- job:
name: ci-log-processing-functional-test-centos-8-stream-sender
description: Test validating the logscraper and logsender services
run: ansible/playbooks/check-services-sender.yml
nodeset:
nodes:
- name: centos-8-stream
label: centos-8-stream
- project:
templates:
@@ -18,4 +26,5 @@
- openstack-tox-pep8
- openstack-tox-py38
- ci-log-processing-functional-test-centos-8-stream
- ci-log-processing-functional-test-centos-8-stream-sender
gate: *logcheck

View File

@@ -37,4 +37,3 @@ RUN dnf remove -y python3-devel git && \
rm -rf ~/.cache/pip
USER logscraper
ENTRYPOINT ["/usr/local/bin/logscraper"]

View File

@@ -13,6 +13,17 @@ check by using Zuul CI API if there are new builds available
and if there are some, it would push the information to
the log processing system.
Zuul Log Sender
---------------
The Zuul Log Sender tool is responsible for periodically checking
a directory for files that should be sent to the
Elasticsearch service.
NOTE: build directories that do not provide the `buildinfo`
and `inventory.yaml` files are skipped.
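A minimal sketch of that readiness check (the helper name is illustrative;
the actual implementation lives in ``logscraper/logsender.py``):

.. code-block:: python

   import os

   def build_is_ready(build_dir):
       # A build directory is processed only when both metadata
       # files exist and are not empty.
       for name in ("buildinfo", "inventory.yaml"):
           path = os.path.join(build_dir, name)
           if not os.path.isfile(path) or os.path.getsize(path) == 0:
               return False
       return True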
Testing
-------
@@ -23,6 +34,94 @@ continuous-integration environment, powered by `Zuul
Any changes to logscraper script or tests will trigger jobs to
thoroughly test those changes.
Benchmarking
------------
Large Zuul CI deployments require many CI log processing resources.
In that case, we can benchmark two log processing deployments.
All tests do the same:
- send 100 log builds to an Elasticsearch service that is running on the same host
- logscraper will be using 4 workers
- the VM will have 8 vcpus and 16 GB of RAM
Testing workflows:
* loggearman and logstash
This workflow spawns 3 additional loggearman workers, because this
service is the bottleneck in that CI log workflow.
You can do it with the command:
.. code-block:: shell
for i in {1..3}; do \
  podman run --network host -d --name loggearman-worker-$i \
  --volume /etc/loggearman:/etc/loggearman:z \
  --volume /var/log/loggearman:/var/log/loggearman:z \
  quay.rdoproject.org/software-factory/loggearman:latest \
  log-gearman-worker -c /etc/loggearman/worker.yml --foreground -d /var/log/loggearman/worker.log ; \
done
To remove:
.. code-block:: shell
for i in {1..3}; do \
  podman stop loggearman-worker-$i ; podman rm loggearman-worker-$i ; \
done
At the end, basic calculations:
.. code-block:: python
import datetime
start = datetime.datetime.fromisoformat("2022-02-28 16:44:59")
stop = datetime.datetime.fromisoformat("2022-02-28 16:46:01")
print((stop-start).total_seconds())
Running logscraper and waiting for all loggearman workers to finish took
62 seconds and consumed 680 MB of RAM.
* logsender workflow
This workflow uses only the logsender tool, which pushes the logs
directly to the Elasticsearch service. Same as in the previous test,
it is executed with 4 processes.
To download logs:
.. code-block:: shell
logscraper \
--zuul-api-url https://zuul.opendev.org/api/tenant/openstack \
--checkpoint-file /tmp/results-checkpoint.txt \
--worker 8 \
--max-skipped 100 \
--download \
--directory /tmp/logscraper
This operation took 30 seconds and used 130 MB of RAM.
Then, to send the logs:
.. code-block:: shell
logsender --username admin --password mypassword --host localhost --port 9200 --insecure --workers 4
Running logsender and waiting for all of its workers to finish took
35 seconds and consumed 520 MB of RAM.
Conclusion:
The logsender approach seems to use less memory (in the OpenDev deployment,
the logstash service runs on a different host, and even 4096 MB of RAM is
not enough) and it is faster, but note that the logscraper and logsender
processes were executed one after another: first logscraper downloads the
logs, then logsender sends them to Elasticsearch.
Continuous Deployment
---------------------
Once changes are reviewed and committed, they will be applied

View File

@@ -0,0 +1,25 @@
---
- hosts: all
become: true
vars:
# logscraper
tenant_builds:
- tenant: openstack
zuul_api_url: https://zuul.opendev.org/api/tenant/openstack
zuul_api_urls:
- https://zuul.opendev.org/api/tenant/openstack
insecure: false
job_names: []
download: true
download_dir: /mnt/logscraper/openstack
es_username: admin
es_password: admin
es_host: localhost
es_port: 9200
es_insecure: true
es_index: logstash-logscraper
tasks:
- name: Run check services
include_role:
name: check-services
tasks_from: download.yml

View File

@@ -0,0 +1,122 @@
---
- name: Install packages
package:
name: podman
state: present
### BUILD CONTAINER IMAGES ###
- name: Build container images
block:
- name: Build logscraper container image - Zuul
shell: >
podman build -t quay.io/logscraper:dev -f Dockerfile
args:
chdir: "{{ zuul.projects['opendev.org/openstack/ci-log-processing'].src_dir }}"
when: zuul is defined
- name: Build logscraper container image - non Zuul
shell: >
podman build -t quay.io/logscraper:dev -f Dockerfile
args:
chdir: "{{ playbook_dir }}"
when: zuul is not defined
- name: Get logscraper image id
shell: |
podman images --noheading quay.io/logscraper:dev | awk '{print $3}'
register: _logscraper_image_id
- name: Print all images
shell: |
podman images
- name: Replace container images
set_fact:
container_images:
logscraper: "{{ _logscraper_image_id.stdout }}"
logsender: "{{ _logscraper_image_id.stdout }}"
### OPENSEARCH ####
- name: Setup Opensearch
shell: >
podman run -d --name opensearch \
--network host \
-e "discovery.type=single-node" \
quay.rdoproject.org/software-factory/opensearch:1.1.0
- name: Wait for Opensearch to be up
wait_for:
host: 0.0.0.0
port: 9200
delay: 10
timeout: 300
- name: Wait for Opensearch to be up
uri:
url: "https://0.0.0.0:9200"
user: "admin"
password: "admin"
force_basic_auth: true
method: GET
validate_certs: false
status_code: "200"
register: result
until: result.status == 200
retries: 30
delay: 10
### Logscraper ###
- name: Setup logscraper service
include_role:
name: logscraper
### Logsender ###
- name: Setup logsender service
include_role:
name: logsender
# Flush handlers before running test
- name: Force all notified handlers to run now
meta: flush_handlers
### service validation ###
- name: Ensure that all services are available and running
shell: |
systemctl is-active -q {{ item }}
loop:
- logscraper-openstack
- logsender-openstack
register: _service_status
failed_when: _service_status.rc != 0
# FIXME: The index is created at the beginning of the logsender run.
# The playbook should validate that some data has been pushed to the ES.
- name: Get Opensearch indices
uri:
url: "https://localhost:9200/_cat/indices"
user: "admin"
password: "admin"
force_basic_auth: true
method: GET
validate_certs: false
status_code: "200"
return_content: true
register: _opensearch_indices
until: "'logstash-logscraper' in _opensearch_indices.content"
retries: 30
delay: 10
- name: Check if build_branch exists in index content
uri:
url: "https://localhost:9200/logstash-logscraper"
user: "admin"
password: "admin"
force_basic_auth: true
method: GET
validate_certs: false
status_code: "200"
return_content: true
register: _opensearch_index_content
until: "'build_branch' in _opensearch_index_content.content"
retries: 30
delay: 10
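# The FIXME above could be addressed with a direct document-count check.
# A minimal sketch using the opensearch-py client (credentials and index
# name match the test defaults above):
#
#     from opensearchpy import OpenSearch
#
#     client = OpenSearch(
#         hosts=[{"host": "localhost", "port": 9200}],
#         http_auth=("admin", "admin"),
#         use_ssl=True, verify_certs=False, ssl_show_warn=False,
#     )
#     # Fail when logsender has not pushed any documents yet.
#     assert client.count(index="logstash-logscraper")["count"] > 0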

View File

@@ -12,6 +12,7 @@
--volume {{ item.download_dir }}:{{ item.download_dir }}:z \
{% endif %}
{{ container_images['logscraper'] }} \
/usr/local/bin/logscraper \
{% if 'gearman_port' in item and 'gearman_server' in item %}
--gearman-port {{ item.gearman_port }} \
--gearman-server {{ item.gearman_server }} \

View File

@@ -0,0 +1,91 @@
Logsender ansible role
======================
The goal of this role is to set up and configure the service related
to the logsender script, which is responsible for parsing log content,
attaching required information that is available in the `buildinfo` and
`inventory.yaml` files and sending it to the Elasticsearch service.
Requirements
------------
None
Role Variables
--------------
The role automatically deploys the service related to the
log sender service.
Example Ansible variables that configure the service:
.. code-block:: yaml
vars:
tenant_builds:
- tenant: openstack
es_username: admin
es_password: admin
es_host: localhost
es_port: 9200
es_insecure: true
es_index: logstash-logscraper
download_dir: /mnt/logscraper/sometenant
That configuration will deploy a service named `logsender-openstack.service`.
There can be multiple instances of the logsender service - each
configured for a different tenant.
Dependencies
------------
None
Example Playbook
----------------
Below is an example playbook, responsible for deploying two logsender
services, where one is responsible for getting logs from the `openstack`
tenant and the second one for getting logs from the `sometenant` tenant.
.. code-block:: yaml
- name: Configure Logsender tool
hosts: localhost
become: true
vars:
tenant_builds:
- tenant: openstack
es_username: logstash
es_password: logstash
es_host: localhost
es_port: 9200
es_insecure: false
es_index: ""
es_index_prefix: ""
download_dir: /mnt/logscraper/openstack
- tenant: sometenant
es_username: logstash
es_password: logstash
es_host: otherhost
es_port: 9200
es_insecure: false
es_index: ""
es_index_prefix: ""
download_dir: /mnt/logscraper/sometenant
roles:
- logsender
License
-------
Apache
Author Information
------------------
Author: OpenStack Contributors
Author email: openstack-discuss@lists.openstack.org
Home page: http://docs.openstack.org/infra/ci-log-processing

View File

@@ -0,0 +1,27 @@
---
logsender_user: logscraper
logsender_group: logscraper
logscraper_gid: 10210
logscraper_uid: 10210
container_images:
# FIXME: Create new project on Docker hub that will contain that image
logsender: quay.rdoproject.org/software-factory/logscraper:latest
# Elasticsearch configuration
# tenant_builds:
# - tenant: sometenant
# es_host: localhost
# es_port: 9200
# es_username: logstash
# es_password: logstash
# es_index_prefix: ""
# es_index: ""
# es_insecure: false
# download_dir: /mnt/logscraper/sometenant
# doc_type: "_doc"
# logsender_workers: 1
# chunk_size: 1500
# keep: true
# ignore_es_status: false
tenant_builds: []

View File

@@ -0,0 +1,13 @@
---
galaxy_info:
author: Openstack Contributors
description: Openstack Logsender tool
company: Openstack
license: Apache
min_ansible_version: 2.9
platforms:
- name: Centos
versions:
- 8
galaxy_tags: []
dependencies: []

View File

@@ -0,0 +1,23 @@
---
- name: Create dedicated group
group:
name: "{{ logsender_group }}"
state: present
- name: Create dedicated user
user:
name: "{{ logsender_user }}"
state: present
comment: "Dedicated user for logsender"
group: "{{ logsender_group }}"
shell: "/sbin/nologin"
create_home: false
- name: Ensure container software is installed
package:
name: podman
state: present
- name: Configure logsender service
include_tasks: service.yml
loop: "{{ tenant_builds }}"

View File

@@ -0,0 +1,29 @@
---
- name: Generate logsender script
template:
src: logsender.sh.j2
dest: "/usr/local/bin/logsender-{{ item.tenant }}"
mode: '0755'
register: _start_script
- name: Generate systemd unit
template:
src: logsender.service.j2
dest: "/etc/systemd/system/logsender-{{ item.tenant }}.service"
owner: root
group: root
- name: Enable and restart service
service:
name: logsender-{{ item.tenant }}
state: restarted
daemon_reload: true
enabled: true
when: _start_script.changed
- name: Ensure that service is running
service:
name: logsender-{{ item.tenant }}
state: started
daemon_reload: true
enabled: true

View File

@@ -0,0 +1,16 @@
[Unit]
Description=Logsender service for {{ item.tenant }}
After=syslog.target network.target
StartLimitInterval=20
StartLimitBurst=5
[Service]
Type=simple
SyslogIdentifier=logsender-{{ item.tenant }}
Restart=always
RestartSec=3s
ExecStop=-/usr/bin/podman stop -t 10 logsender-{{ item.tenant }}
ExecStart=/usr/local/bin/logsender-{{ item.tenant }}
[Install]
WantedBy=multi-user.target

View File

@@ -0,0 +1,52 @@
#!/bin/bash
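# NOTE: the container process runs as UID 1000 (see --user below); the
# uidmap flags map that UID to the host logscraper user and shift the
# remaining container UIDs (0-998) to the host range just above it.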
/usr/bin/podman run \
--network host \
--rm \
--user 1000:1000 \
--uidmap 0:{{ logscraper_uid + 1 }}:999 \
--uidmap 1000:{{ logscraper_uid }}:1 \
--name logsender-{{ item.tenant }} \
--volume {{ item.download_dir }}:{{ item.download_dir }}:z \
{{ container_images['logsender'] }} \
/usr/local/bin/logsender \
{% if 'es_host' in item and item['es_host'] %}
--host "{{ item['es_host'] }}" \
{% endif %}
{% if 'es_port' in item and item['es_port'] %}
--port "{{ item['es_port'] }}" \
{% endif %}
{% if 'es_username' in item and item['es_username'] %}
--username "{{ item.es_username }}" \
{% endif %}
{% if 'es_password' in item and item['es_password'] %}
--password "{{ item.es_password }}" \
{% endif %}
{% if 'es_index_prefix' in item and item['es_index_prefix'] %}
--index-prefix "{{ item['es_index_prefix'] }}" \
{% endif %}
{% if 'es_index' in item and item['es_index'] %}
--index "{{ item['es_index'] }}" \
{% endif %}
{% if 'download_dir' in item and item['download_dir'] %}
--directory "{{ item['download_dir'] }}" \
{% endif %}
{% if 'doc_type' in item and item['doc_type'] %}
--doc-type "{{ item['doc_type'] }}" \
{% endif %}
{% if 'es_insecure' in item and item['es_insecure'] %}
--insecure \
{% endif %}
{% if 'logsender_workers' in item and item['logsender_workers'] %}
--workers "{{ item['logsender_workers'] }}" \
{% endif %}
{% if 'chunk_size' in item and item['chunk_size'] %}
--chunk-size "{{ item['chunk_size'] }}" \
{% endif %}
{% if 'keep' in item and item['keep'] %}
--keep \
{% endif %}
{% if 'ignore_es_status' in item and item['ignore_es_status'] %}
--ignore-es-status \
{% endif %}
--follow

View File

@@ -16,6 +16,7 @@ Contents:
.. sidebar:: HOWTOs
* :doc:`logscraper`
* :doc:`logsender`
* :doc:`ansible-role`
* :doc:`loggearman`
* :doc:`loggearman-role`
@@ -24,6 +25,7 @@ Contents:
:maxdepth: 2
logscraper
logsender
ansible-role
loggearman
loggearman-role

doc/source/logsender.rst
View File

@@ -0,0 +1,96 @@
Logsender
=========
The logsender tool parses log files that are available
in the directory, attaches important data that is provided in the `buildinfo`
and `inventory.yaml` files and sends it directly to the Opensearch service.
Available arguments for logsender are:
.. code-block::
logsender --help
Check log directories and push to the Opensearch service
options:
-h, --help show this help message and exit
--directory DIRECTORY
Directory, where the logs will be stored. Defaults to: /tmp/logscraper
--host HOST Opensearch host
--port PORT Opensearch port
--username USERNAME Opensearch username
--password PASSWORD Opensearch user password
--index-prefix INDEX_PREFIX
Prefix for the index. Defaults to logstash-
--index INDEX Opensearch index. Defaults to: <index-prefix>-YYYY-DD
--insecure Skip validating SSL cert
--follow Keep sending CI logs
--workers WORKERS Worker processes for logsender
--chunk-size CHUNK_SIZE
The bulk chunk size
--keep Do not remove log directory after
--ignore-es-status Ignore Opensearch bulk
--debug Be more verbose
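When ``--index`` is not provided, the index name is built from
``--index-prefix`` and the current date (this mirrors ``get_index`` in
``logscraper/logsender.py``, which uses the ``%Y.%m.%d`` date format):

.. code-block:: python

   import datetime

   # With the default prefix this yields e.g. "logstash-2022.02.28".
   index = "logstash-" + datetime.datetime.today().strftime("%Y.%m.%d")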
Basic usage
-----------
Based on the use case, we can run logsender in different ways.
Examples:
* Send logs to an Opensearch service that is running on localhost, and skip TLS cert verification
.. code-block::
logsender --username logstash --password logstashpassword --host localhost --port 9200 --insecure
* Send logs to the service, use 8 workers and ignore the Opensearch bulk update status. WARNING: --ignore-es-status should not be used in a production environment!
.. code-block::
logsender --username logstash --password logstashpassword --host localhost --port 9200 --insecure --workers 8 --ignore-es-status
* Send logs to the Opensearch service, provide your own index name "myindex" and keep the log files (they will not be deleted):
.. code-block::
logsender --username logstash --password logstashpassword --index myindex --keep
Containerize tool
-----------------
Instead of using the `pip` tool, you can build your own container image
that contains the logscraper tool, for example:
.. code-block::
docker build -t logscraper -f Dockerfile .
NOTE: the logsender tool is included in the logscraper container image.
Then you can execute the commands that are described above.
NOTE: The directory where you store log files should be mounted into the container.
For example:
.. code-block::
podman run \
--network host \
-d \
--name logsender-openstack \
--volume /mnt/logscraper/openstack:/mnt/logscraper/openstack:z \
logscraper \
/usr/local/bin/logsender \
--username admin \
--password admin \
--host localhost \
--port 9200 \
--directory /mnt/logscraper/openstack \
--follow

logscraper/logsender.py
View File

@@ -0,0 +1,387 @@
#!/usr/bin/env python3
#
# Copyright (C) 2022 Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
The goal is to get content from build uuid directory and send to Opensearch
[ CLI ] -> [ Log directory ] -> [ Zuul inventory ] -> [ Send logs to ES ]
"""
import argparse
import collections
import copy
import datetime
import itertools
import logging
import multiprocessing
import os
import re
import shutil
import sys
import time
from opensearchpy import exceptions as opensearch_exceptions
from opensearchpy import helpers
from opensearchpy import OpenSearch
from ruamel.yaml import YAML
###############################################################################
# CLI #
###############################################################################
def get_arguments():
parser = argparse.ArgumentParser(description="Check log directories "
"and push to the Opensearch service")
parser.add_argument("--directory",
help="Directory, where the logs will "
"be stored. Defaults to: /tmp/logscraper",
default="/tmp/logscraper")
parser.add_argument("--host",
help="Opensearch host",
default='localhost')
parser.add_argument("--port",
help="Opensearch port",
type=int,
default=9200)
parser.add_argument("--username",
help="Opensearch username",
default='logstash')
parser.add_argument("--password", help="Opensearch user password")
parser.add_argument("--index-prefix", help="Prefix for the index. "
"Defaults to logstash-",
default='logstash-')
parser.add_argument("--index",
help="Opensearch index. Defaults to: "
"<index-prefix>-YYYY-DD")
parser.add_argument("--doc-type", help="Doc type information that will be"
"send to the Opensearch service",
default="_doc")
parser.add_argument("--insecure",
help="Skip validating SSL cert",
action="store_false")
parser.add_argument("--follow", help="Keep sending CI logs",
action="store_true")
parser.add_argument("--workers", help="Worker processes for logsender",
type=int,
default=1)
parser.add_argument("--chunk-size", help="The bulk chunk size",
type=int,
default=1500)
parser.add_argument("--keep", help="Do not remove log directory after",
action="store_true")
parser.add_argument("--ignore-es-status", help="Ignore Opensearch bulk",
action="store_true")
parser.add_argument("--debug", help="Be more verbose",
action="store_true")
args = parser.parse_args()
return args
###############################################################################
# Log sender #
###############################################################################
def _is_file_not_empty(file_path):
"""Return True when buildinfo file is not empty"""
    # NOTE: we can assume that when the file exists, all
    # content has been downloaded to the directory.
return os.path.getsize(file_path) > 0
def check_info_files(root, files):
    return (
        'buildinfo' in files and 'inventory.yaml' in files and
        _is_file_not_empty("%s/buildinfo" % root) and
        _is_file_not_empty("%s/inventory.yaml" % root)
    )
def read_yaml_file(file_path):
    # FIXME: In logscraper, yaml.dump does not seem to dump the
    # dictionary correctly, so the ruamel lib is needed.
yaml = YAML()
with open(file_path, 'r') as f:
return yaml.load(f)
def read_text_file(file_path):
with open(file_path, 'r') as f:
return f.readlines()
def get_inventory_info(directory):
try:
build_inventory = read_yaml_file("%s/inventory.yaml" % directory)
return build_inventory['all']['vars']['zuul']
except FileNotFoundError:
logging.warning("Can not find inventory.yaml in build "
"dir %s" % directory)
def get_build_info(directory):
return read_yaml_file("%s/buildinfo" % directory)
def get_ready_directories(directory):
"""Returns a directory with list of files
That directories should have a 'buildinfo' and 'inventory.yaml' file
which are not empty.
"""
log_files = {}
for root, _, files in os.walk(directory):
build_uuid = root.split('/')[-1]
if check_info_files(root, files):
files.remove("buildinfo")
files.remove("inventory.yaml")
log_files[build_uuid] = files
else:
logging.info("Skipping build with uuid %s. Probably all files "
"are not dowloaded yet." % build_uuid)
continue
return log_files
def remove_directory(dir_path):
logging.debug("Removing directory %s" % dir_path)
shutil.rmtree(dir_path)
def makeFields(build_details, buildinfo):
fields = {}
fields["build_node"] = "zuul-executor"
# NOTE: that field is added later
# fields["filename"] = build_file
fields["build_name"] = buildinfo.get("job_name")
fields["build_status"] = buildinfo["result"]
fields["project"] = buildinfo.get('project')
fields["voting"] = int(build_details["voting"])
fields["build_set"] = build_details["buildset"]
fields["build_queue"] = build_details["pipeline"]
fields["build_ref"] = buildinfo.get("ref")
fields["build_branch"] = buildinfo.get("branch")
fields["build_change"] = buildinfo.get("change")
fields["build_patchset"] = buildinfo.get("patchset")
fields["build_newrev"] = build_details.get("newrev", "UNKNOWN")
fields["build_uuid"] = buildinfo.get("uuid")
fields["node_provider"] = "local"
fields["log_url"] = buildinfo.get("log_url")
fields["tenant"] = buildinfo.get("tenant")
if "executor" in build_details and "hostname" in build_details["executor"]:
fields["zuul_executor"] = build_details["executor"]["hostname"]
return fields
def send_bulk(es_client, request, workers, ignore_es_status, chunk_size):
"""Send bulk request to Opensearch"""
try:
if ignore_es_status:
return collections.deque(helpers.parallel_bulk(
es_client, request, thread_count=workers,
chunk_size=chunk_size))
# NOTE: To see bulk update status, we can use:
# https://elasticsearch-py.readthedocs.io/en/7.10.0/helpers.html#example
for success, info in helpers.parallel_bulk(es_client, request,
thread_count=workers,
chunk_size=chunk_size):
if not success:
logging.error("Chunk was not send to Opensearch %s" % info)
return
# If all bulk updates are fine, return True
return True
except Exception as e:
logging.critical("Exception occured on pushing data to "
"Opensearch %s" % e)
return
def get_timestamp(line):
try:
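        # Look for a timestamp like "2022-02-28 09:39:09.596" in the line.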
timestamp_search = re.search(r'[-0-9]{10}\s+[0-9.:]{12}', line)
timestamp = (timestamp_search.group() if timestamp_search else
datetime.datetime.utcnow().isoformat())
# NOTE: On python 3.6, it should be:
# datetime.datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S.%f")
# Ci-log-processing is using container with Python 3.8, where
# fromisoformat attribute is available.
return datetime.datetime.fromisoformat(timestamp).isoformat()
except Exception as e:
logging.critical("Exception occured on parsing timestamp %s" % e)
def get_message(line):
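    # Zuul console lines look like "<timestamp> | <message>"; return the
    # message part, or the whole line when there is no separator.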
try:
return line.split("|", 1)[1].replace('\n', '')
except IndexError:
return line.replace('\n', '')
def send_to_es(build_file, es_fields, es_client, index, workers,
ignore_es_status, chunk_size, doc_type):
"""Send document to the Opensearch"""
request = []
logging.info("Working on %s" % build_file)
file_content = read_text_file(build_file)
for line in file_content:
fields = copy.deepcopy(es_fields)
fields["@timestamp"] = get_timestamp(line)
message = get_message(line)
if not message:
continue
fields["message"] = message
doc = {"_index": index, "_type": doc_type, "_source": fields}
request.append(doc)
return send_bulk(es_client, request, workers, ignore_es_status, chunk_size)
def get_build_information(build_dir):
"""Return dictionary with build information"""
build_inventory = get_inventory_info(build_dir)
buildinfo = get_build_info(build_dir)
return makeFields(build_inventory, buildinfo)
def send(ready_directory, args, directory, index, workers):
"""Gen Opensearch fields and send"""
    # NOTE: each process should have its own Opensearch session,
    # due to the error: TypeError: cannot pickle 'SSLSocket' object -
    # SSLSocket cannot be serialized.
es_client = get_es_client(args)
build_uuid, build_files = ready_directory
build_dir = "%s/%s" % (directory, build_uuid)
es_fields = get_build_information(build_dir)
if not es_fields:
return
send_status = False
logging.debug("Provided build info %s" % es_fields)
for build_file in build_files:
es_fields["filename"] = build_file
send_status = send_to_es("%s/%s" % (build_dir, build_file),
es_fields, es_client, index, workers,
args.ignore_es_status, args.chunk_size,
args.doc_type)
if args.keep:
logging.info("Keeping file %s" % build_dir)
return
if send_status:
remove_directory(build_dir)
else:
logging.warning("The document was not send. Keeping log file")
def get_index(args):
index = args.index
if not index:
index = args.index_prefix + \
datetime.datetime.today().strftime('%Y.%m.%d')
if create_indices(index, args):
return index
def create_indices(index, args):
es_client = get_es_client(args)
try:
logging.info("Creating index %s" % index)
return es_client.indices.create(index)
except opensearch_exceptions.AuthorizationException:
logging.critical("You need to have permissions to create an index. "
"Probably you need to add [indices:admin/create] or "
"'create_index' permission to the index permissions "
"inside your role.")
except opensearch_exceptions.RequestError as e:
# NOTE(dpawlik) Use same functionality as Logstash do, so it
# will not require any additional permissions set to the default
# logstash role.
if e.error.lower() == 'resource_already_exists_exception':
logging.debug("The indices already exists, continue")
return True
def prepare_and_send(ready_directories, args):
"""Prepare information to send and Opensearch"""
directory = args.directory
workers = args.workers
index = get_index(args)
if not index:
logging.critical("Can not continue without created indices")
sys.exit(1)
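    # Fan out one send() call per ready build directory; the remaining
    # arguments are identical for every worker.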
with multiprocessing.Pool(processes=args.workers) as pool:
pool.starmap(send, zip(
list(ready_directories.items()),
itertools.repeat(args),
itertools.repeat(directory), itertools.repeat(index),
itertools.repeat(workers)))
def setup_logging(debug):
if debug:
logging.basicConfig(format="%(asctime)s %(message)s",
level=logging.DEBUG)
else:
logging.basicConfig(level=logging.INFO)
logging.debug("Log sender is starting...")
def get_es_client(args):
es_creds = {
"host": args.host,
"port": args.port,
"http_compress": True,
"use_ssl": True,
"verify_certs": args.insecure,
"ssl_show_warn": args.insecure,
}
if args.username and args.password:
es_creds["http_auth"] = "%s:%s" % (args.username, args.password)
es_client = OpenSearch([es_creds], timeout=60)
logging.info("Connected to Opensearch: %s" % es_client.info())
return es_client
def run(args):
ready_directories = get_ready_directories(args.directory)
logging.info("Found %s builds to send to Opensearch service" % len(
ready_directories))
prepare_and_send(ready_directories, args)
logging.info("Finished pushing logs!")
def main():
args = get_arguments()
setup_logging(args.debug)
while True:
run(args)
if not args.follow:
break
time.sleep(60)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,520 @@
#!/usr/bin/env python3
#
# Copyright (C) 2022 Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
from logscraper import logsender
from logscraper.tests import base
from ruamel.yaml import YAML
from unittest import mock
buildinfo = """
_id: 17428524
branch: master
build_args:
checkpoint_file: /tmp/results-checkpoint.txt
debug: false
directory: /tmp/logscraper
download: true
follow: false
gearman_port: 4730
gearman_server: null
insecure: true
job_name: null
logstash_url: null
max_skipped: 500
workers: 32
zuul_api_url:
- https://zuul.opendev.org/api/tenant/openstack
buildset:
uuid: 52b29e0e716a4436bd20eed47fa396ce
change: 829161
duration: 1707.0
end_time: '2022-02-28T10:07:36'
error_detail: null
event_id: dda0cbf9caaa496b9127a7646b8a28a8
event_timestamp: '2022-02-28T09:32:08'
final: true
held: false
job_name: openstack-tox-py39
log_url: https://somehost/829161/3/check/openstack-tox-py39/38bf2cd/
newrev: null
nodeset: fedora-35
patchset: '3'
pipeline: check
project: openstack/neutron
provides: []
ref: refs/changes/61/829161/3
ref_url: https://review.opendev.org/829161
result: SUCCESS
start_time: '2022-02-28T09:39:09'
tenant: openstack
uuid: 38bf2cdc947643c9bb04f11f40a0f211
voting: true
"""
inventory_info = """
all:
hosts:
fedora-35:
ansible_connection: ssh
ansible_host: 127.0.0.1
ansible_port: 22
ansible_python_interpreter: auto
ansible_user: zuul
ara_compress_html: false
ara_report_path: ara-report
ara_report_type: html
bindep_profile: test py39
enable_fips: false
nodepool:
az: null
cloud: rax
external_id: 3b2da968-7ec3-4356-b12c-b55b574902f8
host_id: ed82a4a59ac22bf396288f0b93bf1c658af932130f9d336aad528f21
interface_ip: 127.0.0.2
label: fedora-35
private_ipv4: 127.0.0.3
private_ipv6: null
provider: rax-dfw
public_ipv4: 127.0.0.2
public_ipv6: ''
region: DFW
python_version: 3.9
tox_constraints_file: 'requirements/upper-constraints.txt'
tox_environment:
NOSE_HTML_OUT_FILE: nose_results.html
NOSE_WITH_HTML_OUTPUT: 1
NOSE_WITH_XUNIT: 1
tox_envlist: py39
vars:
ara_compress_html: false
ara_report_path: ara-report
ara_report_type: html
bindep_profile: test py39
enable_fips: false
python_version: 3.9
tox_constraints_file: 'requirements/upper-constraints.txt'
tox_environment:
NOSE_HTML_OUT_FILE: nose_results.html
NOSE_WITH_HTML_OUTPUT: 1
NOSE_WITH_XUNIT: 1
tox_envlist: py39
zuul:
_inheritance_path:
- 'some_path'
- 'some_path_2'
attempts: 1
branch: master
build: 38bf2cdc947643c9bb04f11f40a0f211
buildset: 52b29e0e716a4436bd20eed47fa396ce
change: '829161'
change_url: https://review.opendev.org/829161
child_jobs: []
event_id: dda0cbf9caaa496b9127a7646b8a28a8
executor:
hostname: ze07.opendev.org
inventory_file: /var/lib/zuul/builds/build/ansible/inventory.yaml
log_root: /var/lib/zuul/builds/build/work/logs
result_data_file: /var/lib/zuul/builds/build/work/results.json
src_root: /var/lib/zuul/builds/build/work/src
work_root: /var/lib/zuul/builds/build/work
items:
- branch: master
change: '828673'
change_url: https://review.opendev.org/828673
patchset: '4'
project:
canonical_hostname: opendev.org
canonical_name: opendev.org/openstack/neutron
name: openstack/neutron
short_name: neutron
src_dir: src/opendev.org/openstack/neutron
- branch: master
change: '829161'
change_url: https://review.opendev.org/829161
patchset: '3'
project:
canonical_hostname: opendev.org
canonical_name: opendev.org/openstack/neutron
name: openstack/neutron
short_name: neutron
src_dir: src/opendev.org/openstack/neutron
job: openstack-tox-py39
jobtags: []
message: Q3YmM0Y2QzNzhkMWZhOWE5ODYK
patchset: '3'
pipeline: check
playbook_context:
playbook_projects:
trusted/project_0/opendev.org/opendev/base-jobs:
canonical_name: opendev.org/opendev/base-jobs
checkout: master
commit: 19dc53290a26b20d5c2c5b1bb25f029c4b04a716
trusted/project_1/opendev.org/zuul/zuul-jobs:
canonical_name: opendev.org/zuul/zuul-jobs
checkout: master
commit: e160f59e0e76c7e8625ec2d174b044a7c92cd32e
untrusted/project_0/opendev.org/zuul/zuul-jobs:
canonical_name: opendev.org/zuul/zuul-jobs
checkout: master
commit: e160f59e0e76c7e8625ec2d174b044a7c92cd32e
untrusted/project_1/opendev.org/opendev/base-jobs:
canonical_name: opendev.org/opendev/base-jobs
checkout: master
commit: 19dc53290a26b20d5c2c5b1bb25f029c4b04a716
playbooks:
- path: untrusted/project/opendev/zuul/zuul-jobs/playbooks/tox/run.yaml
roles:
- checkout: master
checkout_description: zuul branch
link_name: ansible/playbook_0/role_0/base-jobs
link_target: untrusted/project_1/opendev.org/opendev/base-jobs
role_path: ansible/playbook_0/role_0/base-jobs/roles
- checkout: master
checkout_description: playbook branch
link_name: ansible/playbook_0/role_1/zuul-jobs
link_target: untrusted/project_0/opendev.org/zuul/zuul-jobs
role_path: ansible/playbook_0/role_1/zuul-jobs/roles
post_review: false
project:
canonical_hostname: opendev.org
canonical_name: opendev.org/openstack/neutron
name: openstack/neutron
short_name: neutron
src_dir: src/opendev.org/openstack/neutron
projects:
opendev.org/openstack/neutron:
canonical_hostname: opendev.org
canonical_name: opendev.org/openstack/neutron
checkout: master
checkout_description: zuul branch
commit: 7be5a0aff1123b381674191f3baa1ec9c128e0f3
name: openstack/neutron
required: false
short_name: neutron
src_dir: src/opendev.org/openstack/neutron
opendev.org/openstack/requirements:
canonical_hostname: opendev.org
canonical_name: opendev.org/openstack/requirements
checkout: master
checkout_description: zuul branch
commit: 48fb5c24764d91833d8ca7084ee9f183785becd6
name: openstack/requirements
required: true
short_name: requirements
src_dir: src/opendev.org/openstack/requirements
ref: refs/changes/61/829161/3
resources: {}
tenant: openstack
timeout: 3600
voting: true
"""
parsed_fields = {
'build_node': 'zuul-executor',
'build_name': 'openstack-tox-py39',
'build_status': 'SUCCESS',
'project': 'openstack/neutron',
'voting': 1,
'build_set': '52b29e0e716a4436bd20eed47fa396ce',
'build_queue': 'check',
'build_ref': 'refs/changes/61/829161/3',
'build_branch': 'master',
'build_change': 829161,
'build_patchset': '3',
'build_newrev': 'UNKNOWN',
'build_uuid': '38bf2cdc947643c9bb04f11f40a0f211',
'node_provider': 'local',
'log_url':
'https://somehost/829161/3/check/openstack-tox-py39/38bf2cd/',
'tenant': 'openstack',
'zuul_executor': 'ze07.opendev.org'
}
def _parse_get_yaml(text):
yaml = YAML()
return yaml.load(text)
class _MockedPoolMapResult:
def __init__(self, func, iterable):
self.func = func
self.iterable = iterable
# mocked results
self._value = [self.func(i) for i in iterable]
def get(self, timeout=0):
return self._value
class FakeArgs(object):
def __init__(self, directory=None, host=None, port=None, username=None,
password=None, index_prefix=None, index=None, doc_type=None,
insecure=None, follow=None, workers=None, chunk_size=None,
keep=None, ignore_es_status=None, debug=None):
self.directory = directory
self.host = host
self.port = port
self.username = username
self.password = password
self.index_prefix = index_prefix
self.index = index
self.doc_type = doc_type
self.insecure = insecure
self.follow = follow
self.workers = workers
self.chunk_size = chunk_size
self.keep = keep
self.ignore_es_status = ignore_es_status
self.debug = debug
class TestSender(base.TestCase):
@mock.patch('logscraper.logsender.remove_directory')
@mock.patch('logscraper.logsender.send_to_es')
@mock.patch('logscraper.logsender.get_build_information')
@mock.patch('logscraper.logsender.get_es_client')
@mock.patch('argparse.ArgumentParser.parse_args', return_value=FakeArgs(
directory="/tmp/testdir", doc_type='_doc'))
def test_send(self, mock_args, mock_es_client, mock_build_info,
mock_send_to_es, mock_remove_dir):
build_uuid = '38bf2cdc947643c9bb04f11f40a0f211'
build_files = ['job-result.txt']
directory = '/tmp/testdir'
index = 'logstash-index'
workers = 1
args = logsender.get_arguments()
mock_send_to_es.return_value = True
logsender.send((build_uuid, build_files), args, directory, index,
workers)
self.assertTrue(mock_remove_dir.called)
@mock.patch('logscraper.logsender.remove_directory')
@mock.patch('logscraper.logsender.send_to_es')
@mock.patch('logscraper.logsender.get_build_information')
@mock.patch('logscraper.logsender.get_es_client')
@mock.patch('argparse.ArgumentParser.parse_args', return_value=FakeArgs(
directory="/tmp/testdir", keep=True, doc_type="_doc"))
def test_send_keep_dir(self, mock_args, mock_es_client, mock_build_info,
mock_send_to_es, mock_remove_dir):
build_uuid = '38bf2cdc947643c9bb04f11f40a0f211'
build_files = ['job-result.txt']
directory = '/tmp/testdir'
index = 'logstash-index'
workers = 1
args = logsender.get_arguments()
mock_send_to_es.return_value = True
logsender.send((build_uuid, build_files), args, directory, index,
workers)
self.assertFalse(mock_remove_dir.called)
@mock.patch('logscraper.logsender.send_bulk')
@mock.patch('logscraper.logsender.read_text_file')
@mock.patch('argparse.ArgumentParser.parse_args', return_value=FakeArgs(
directory="/tmp/testdir", index="myindex", workers=1,
ignore_es_status=False, chunk_size=1000,
doc_type="zuul"))
def test_send_to_es(self, mock_args, mock_text, mock_bulk):
build_file = 'job-result.txt'
es_fields = parsed_fields
es_client = mock.Mock()
args = logsender.get_arguments()
text = ["2022-02-28 09:39:09.596010 | Job console starting...",
"2022-02-28 09:39:09.610160 | Updating repositories",
"2022-02-28 09:39:09.996235 | Preparing job workspace"]
mock_text.return_value = text
es_doc = [{
'_index': 'myindex',
'_type': 'zuul',
'_source': {
'build_node': 'zuul-executor',
'build_name': 'openstack-tox-py39',
'build_status': 'SUCCESS',
'project': 'openstack/neutron',
'voting': 1,
'build_set': '52b29e0e716a4436bd20eed47fa396ce',
'build_queue': 'check',
'build_ref': 'refs/changes/61/829161/3',
'build_branch': 'master',
'build_change': 829161,
'build_patchset': '3',
'build_newrev': 'UNKNOWN',
'build_uuid': '38bf2cdc947643c9bb04f11f40a0f211',
'node_provider': 'local',
'log_url':
'https://somehost/829161/3/check/openstack-tox-py39/38bf2cd/',
'tenant': 'openstack',
'zuul_executor': 'ze07.opendev.org',
'@timestamp': '2022-02-28T09:39:09.596000',
'message': ' Job console starting...'
}
}, {
'_index': 'myindex',
'_type': 'zuul',
'_source': {
'build_node': 'zuul-executor',
'build_name': 'openstack-tox-py39',
'build_status': 'SUCCESS',
'project': 'openstack/neutron',
'voting': 1,
'build_set': '52b29e0e716a4436bd20eed47fa396ce',
'build_queue': 'check',
'build_ref': 'refs/changes/61/829161/3',
'build_branch': 'master',
'build_change': 829161,
'build_patchset': '3',
'build_newrev': 'UNKNOWN',
'build_uuid': '38bf2cdc947643c9bb04f11f40a0f211',
'node_provider': 'local',
'log_url':
'https://somehost/829161/3/check/openstack-tox-py39/38bf2cd/',
'tenant': 'openstack',
'zuul_executor': 'ze07.opendev.org',
'@timestamp': '2022-02-28T09:39:09.610000',
'message': ' Updating repositories'
}
}, {
'_index': 'myindex',
'_type': 'zuul',
'_source': {
'build_node': 'zuul-executor',
'build_name': 'openstack-tox-py39',
'build_status': 'SUCCESS',
'project': 'openstack/neutron',
'voting': 1,
'build_set': '52b29e0e716a4436bd20eed47fa396ce',
'build_queue': 'check',
'build_ref': 'refs/changes/61/829161/3',
'build_branch': 'master',
'build_change': 829161,
'build_patchset': '3',
'build_newrev': 'UNKNOWN',
'build_uuid': '38bf2cdc947643c9bb04f11f40a0f211',
'node_provider': 'local',
'log_url':
'https://somehost/829161/3/check/openstack-tox-py39/38bf2cd/',
'tenant': 'openstack',
'zuul_executor': 'ze07.opendev.org',
'@timestamp': '2022-02-28T09:39:09.996000',
'message': ' Preparing job workspace'
}
}]
logsender.send_to_es(build_file, es_fields, es_client, args.index,
args.workers, args.ignore_es_status,
args.chunk_size, args.doc_type)
mock_bulk.assert_called_once_with(es_client, es_doc, 1, False, 1000)
@mock.patch('collections.deque')
@mock.patch('opensearchpy.helpers.parallel_bulk')
def test_send_bulk(self, mock_parallel_bulk, mock_deque):
es_client = mock.MagicMock()
mock_parallel_bulk.return_value = [(True, "200"), (True, "200")]
request = [{'some': 'info'}, {'other': 'info'}]
workers = 1
chunk_size = 1500
ignore_es_status = False
bulk = logsender.send_bulk(es_client, request, workers,
ignore_es_status, chunk_size)
self.assertFalse(mock_deque.called)
self.assertTrue(mock_parallel_bulk.called)
self.assertTrue(bulk)
@mock.patch('collections.deque')
@mock.patch('opensearchpy.helpers.parallel_bulk')
def test_send_bulk_ignore_status(self, mock_parallel_bulk, mock_deque):
es_client = mock.MagicMock()
request = [{'some': 'info'}, {'other': 'info'}]
workers = 1
chunk_size = 1500
ignore_es_status = True
logsender.send_bulk(es_client, request, workers, ignore_es_status,
chunk_size)
self.assertTrue(mock_deque.called)
self.assertTrue(mock_parallel_bulk.called)
@mock.patch('collections.deque')
@mock.patch('opensearchpy.helpers.parallel_bulk')
def test_send_bulk_error(self, mock_parallel_bulk, mock_deque):
es_client = mock.Mock()
mock_parallel_bulk.return_value = [(True, "200"), (False, "500")]
request = [{'some': 'info'}, {'other': 'info'}]
workers = 1
chunk_size = 1500
ignore_es_status = False
bulk = logsender.send_bulk(es_client, request, workers,
ignore_es_status, chunk_size)
self.assertFalse(mock_deque.called)
self.assertTrue(mock_parallel_bulk.called)
self.assertIsNone(bulk)
@mock.patch('logscraper.logsender.read_yaml_file',
side_effect=[_parse_get_yaml(buildinfo),
_parse_get_yaml(inventory_info)])
def test_makeFields(self, mock_read_yaml_file):
buildinfo_yaml = logsender.get_build_info('fake_dir')
inventory_info_yaml = logsender.get_inventory_info('other_fake_dir')
generated_info = logsender.makeFields(inventory_info_yaml,
buildinfo_yaml)
self.assertEqual(parsed_fields, generated_info)
def test_get_message(self):
line_1 = "28-02-2022 09:44:58.839036 | Some message"
line_2 = "2022-02-28 09:44:58.839036 | Other message | other log info"
self.assertEqual(" Some message", logsender.get_message(line_1))
self.assertEqual(" Other message | other log info",
logsender.get_message(line_2))
def test_get_timestamp(self):
line_1 = "28-02-2022 09:44:58.839036 | Some message"
line_2 = "2022-02-28 09:44:58.839036 | Other message"
self.assertEqual(None, logsender.get_timestamp(line_1))
self.assertEqual("2022-02-28T09:44:58.839000",
logsender.get_timestamp(line_2))
@mock.patch('logscraper.logsender.get_es_client')
@mock.patch('argparse.ArgumentParser.parse_args', return_value=FakeArgs(
index_prefix="my-index-", workers=2))
def test_get_index(self, mock_args, mock_es_client):
args = logsender.get_arguments()
expected_index = ("my-index-%s" %
datetime.datetime.today().strftime('%Y.%m.%d'))
index = logsender.get_index(args)
self.assertEqual(expected_index, index)
@mock.patch('logscraper.logsender.send')
@mock.patch('logscraper.logsender.get_index')
@mock.patch('argparse.ArgumentParser.parse_args', return_value=FakeArgs(
directory="/tmp/testdir", workers=2, index='myindex'))
def test_prepare_and_send(self, mock_args, mock_index, mock_send):
args = logsender.get_arguments()
ready_directories = {'builduuid': ['job-result.txt']}
mock_index.return_value = args.index
with mock.patch(
'multiprocessing.pool.Pool.starmap',
lambda self, func, iterable, chunksize=None,
callback=None,
error_callback=None: _MockedPoolMapResult(func, iterable),
):
logsender.prepare_and_send(ready_directories, args)
self.assertTrue(mock_send.called)
mock_send.assert_called_with((('builduuid', ['job-result.txt']),
args, args.directory, args.index, 2))

View File

@@ -1,5 +1,7 @@
pbr>=1.6 # Apache-2.0
gear<0.17
requests<2.27 # Apache-2.0
PyYAML<6.1 # MIT
tenacity
opensearch-py<=1.0.0 # Apache-2.0
ruamel.yaml

View File

@@ -25,3 +25,4 @@ packages =
[entry_points]
console_scripts =
logscraper = logscraper.logscraper:main
logsender = logscraper.logsender:main