From c0cc4dbaf154955692c6d60bc948bdf5c2b81f61 Mon Sep 17 00:00:00 2001
From: Daniel Pawlik
Date: Tue, 22 Feb 2022 10:36:06 +0100
Subject: [PATCH] Add Logsender tool

The logsender tool will be used to parse the content downloaded by the
logscraper tool with the --download parameter and send it to the
Elasticsearch service. By using the logsender tool, services like the
gearman client, gearman worker and logstash are no longer necessary to
maintain, because the information is sent directly to the Elasticsearch
host.

Depends-On: https://review.opendev.org/c/openstack/ci-log-processing/+/830169
Change-Id: I3e28b847e7cafbf4742fb718ef09ce3e658f945f
---
.zuul.yaml | 9 +
Dockerfile | 1 -
README.rst | 99 ++++
ansible/playbooks/check-services-sender.yml | 25 +
.../roles/check-services/tasks/download.yml | 122 ++++
.../logscraper/templates/logscraper.sh.j2 | 1 +
ansible/roles/logsender/README.rst | 91 +++
ansible/roles/logsender/defaults/main.yml | 27 +
ansible/roles/logsender/meta/main.yml | 13 +
ansible/roles/logsender/tasks/main.yml | 23 +
ansible/roles/logsender/tasks/service.yml | 29 +
.../logsender/templates/logsender.service.j2 | 16 +
.../roles/logsender/templates/logsender.sh.j2 | 52 ++
doc/source/index.rst | 2 +
doc/source/logsender.rst | 96 ++++
logscraper/logsender.py | 387 +++++++++++++
logscraper/tests/test_logsender.py | 520 ++++++++++++++++++
requirements.txt | 4 +-
setup.cfg | 1 +
19 files changed, 1516 insertions(+), 2 deletions(-)
create mode 100644 ansible/playbooks/check-services-sender.yml
create mode 100644 ansible/roles/check-services/tasks/download.yml
create mode 100644 ansible/roles/logsender/README.rst
create mode 100644 ansible/roles/logsender/defaults/main.yml
create mode 100644 ansible/roles/logsender/meta/main.yml
create mode 100644 ansible/roles/logsender/tasks/main.yml
create mode 100644 ansible/roles/logsender/tasks/service.yml
create mode 100644 ansible/roles/logsender/templates/logsender.service.j2
create mode 100644 ansible/roles/logsender/templates/logsender.sh.j2
create mode 100644 doc/source/logsender.rst
create mode 100755 logscraper/logsender.py
create mode 100644 logscraper/tests/test_logsender.py

diff --git a/.zuul.yaml b/.zuul.yaml
index 51bb8e9..c841bc0 100644
--- a/.zuul.yaml
+++ b/.zuul.yaml
@@ -7,6 +7,14 @@ nodes: - name: centos-8-stream label: centos-8-stream
+- job:
+ name: ci-log-processing-functional-test-centos-8-stream-sender
+ description: Test validating the logscraper and logsender services
+ run: ansible/playbooks/check-services-sender.yml
+ nodeset:
+ nodes:
+ - name: centos-8-stream
+ label: centos-8-stream

- project: templates: @@ -18,4 +26,5 @@ - openstack-tox-pep8 - openstack-tox-py38 - ci-log-processing-functional-test-centos-8-stream
+ - ci-log-processing-functional-test-centos-8-stream-sender
gate: *logcheck
diff --git a/Dockerfile b/Dockerfile
index 85dc0b3..d46fb41 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -37,4 +37,3 @@ RUN dnf remove -y python3-devel git && \ rm -rf ~/.cache/pip
USER logscraper
-ENTRYPOINT ["/usr/local/bin/logscraper"]
diff --git a/README.rst b/README.rst
index 27d16c6..546d395 100644
--- a/README.rst
+++ b/README.rst
@@ -13,6 +13,17 @@
check by using Zuul CI API if there are new builds available
and if there are some, it would push the informations to the
log processing system.
+
+Zuul Log Sender
+---------------
+
+The Zuul Log Sender tool periodically checks a directory for
+files that should be sent to the
+Elasticsearch service.
+NOTE: build directories that do not provide `buildinfo`
+and `inventory.yaml` files are skipped.
+
+
 Testing
 -------
@@ -23,6 +34,94 @@
continuous-integration environment, powered by `Zuul
Any changes to logscraper script or tests will trigger jobs to
thoroughly test those changes.
+
+Benchmarking
+------------
+
+Large Zuul CI deployments require many CI log processing resources.
+In that case, we can do a benchmark with the two log processing deployments.
+All tests do the same:
+
+ - send 100 log builds to an Elasticsearch service that is running on the same host
+ - logscraper will be using 4 workers
+ - the VM will have 8 vcpus and 16 GB of RAM
+
+Testing workflows:
+
+* loggearman and logstash
+
+This workflow will spawn 3 additional loggearman workers because this
+service is the bottleneck in that CI log workflow.
+
+You can do it with the command:
+
+.. code-block:: shell
+
+   for i in {1..3}; do \
+   podman run --network host -d --name loggearman-worker-$i \
+   --volume /etc/loggearman:/etc/loggearman:z \
+   --volume /var/log/loggearman:/var/log/loggearman:z \
+   quay.rdoproject.org/software-factory/loggearman:latest \
+   log-gearman-worker -c /etc/loggearman/worker.yml --foreground -d /var/log/loggearman/worker.log ; done
+
+To remove:
+
+.. code-block:: shell
+
+   for i in {1..3}; do \
+   podman stop loggearman-worker-$i ; podman rm loggearman-worker-$i ; done
+
+
+At the end, basic calculations:
+
+.. code-block:: python
+
+   import datetime
+   start = datetime.datetime.fromisoformat("2022-02-28 16:44:59")
+   stop = datetime.datetime.fromisoformat("2022-02-28 16:46:01")
+   print((stop-start).total_seconds())
+
+
+Running logscraper and waiting for all loggearman workers to finish took 62 seconds and
+used 680 MB of RAM.
+
+
+* logsender workflow
+
+This workflow will only use the logsender tool, which pushes the logs
+directly to the Elasticsearch service. Same as in the previous test,
+it will be executed with 4 processes.
+
+To download logs:
+
+.. code-block:: shell
+
+   logscraper \
+     --zuul-api-url https://zuul.opendev.org/api/tenant/openstack \
+     --checkpoint-file /tmp/results-checkpoint.txt \
+     --worker 8 \
+     --max-skipped 100 \
+     --download \
+     --directory /tmp/logscraper
+
+This operation took 30 seconds and used 130 MB of RAM.
+
+To send the logs:
+
+.. code-block:: shell
+
+   logsender --username admin --password mypassword --host localhost --port 9200 --insecure --workers 4
+
+
+Running logsender and waiting for all of its workers to finish took 35 seconds and
+used 520 MB of RAM.
+
+Conclusion:
+
+The logsender approach seems to use less memory (in the OpenDev deployment, the logstash
+service is on a different host, but 4096 MB of RAM is not enough) and it is faster,
+but the logscraper and logsender processes were executed one after another - at the
+beginning logscraper downloads the logs, then logsender sends them to
+Elasticsearch.
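+
+For reference, a minimal sketch of that sequential run as a single script (the
+credentials and paths below are example values, not taken from a real
+deployment):
+
+.. code-block:: shell
+
+   # step 1: download the build logs; returns when all workers are done
+   logscraper \
+     --zuul-api-url https://zuul.opendev.org/api/tenant/openstack \
+     --checkpoint-file /tmp/results-checkpoint.txt \
+     --download \
+     --directory /tmp/logscraper
+
+   # step 2: only after the download finished, send the logs
+   logsender \
+     --username admin \
+     --password mypassword \
+     --host localhost \
+     --port 9200 \
+     --insecure \
+     --workers 4 \
+     --directory /tmp/logscraper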
+
 Continuous Deployment
 ---------------------
Once changes are reviewed and committed, they will be applied
diff --git a/ansible/playbooks/check-services-sender.yml b/ansible/playbooks/check-services-sender.yml
new file mode 100644
index 0000000..e63b8a2
--- /dev/null
+++ b/ansible/playbooks/check-services-sender.yml
@@ -0,0 +1,25 @@
+---
+- hosts: all
+ become: true
+ vars:
+ # logscraper
+ tenant_builds:
+ - tenant: openstack
+ zuul_api_url: https://zuul.opendev.org/api/tenant/openstack
+ zuul_api_urls:
+ - https://zuul.opendev.org/api/tenant/openstack
+ insecure: false
+ job_names: []
+ download: true
+ download_dir: /mnt/logscraper/openstack
+ es_username: admin
+ es_password: admin
+ es_host: localhost
+ es_port: 9200
+ es_insecure: true
+ es_index: logstash-logscraper
+ tasks:
+ - name: Run check services
+ include_role:
+ name: check-services
+ tasks_from: download.yml
diff --git a/ansible/roles/check-services/tasks/download.yml b/ansible/roles/check-services/tasks/download.yml
new file mode 100644
index 0000000..a515f89
--- /dev/null
+++ b/ansible/roles/check-services/tasks/download.yml
@@ -0,0 +1,122 @@
+---
+- name: Install packages
+ package:
+ name: podman
+ state: present
+
+### BUILD CONTAINER IMAGES ###
+- name: Build container images
+ block:
+ - name: Build logscraper container image - Zuul
+ shell: >
+ podman build -t quay.io/logscraper:dev -f Dockerfile
+ args:
+ chdir: "{{ zuul.projects['opendev.org/openstack/ci-log-processing'].src_dir }}"
+ when: zuul is defined
+
+ - name: Build logscraper container image - non Zuul
+ shell: >
+ podman build -t quay.io/logscraper:dev -f Dockerfile
+ args:
+ chdir: "{{ playbook_dir }}"
+ when: zuul is not defined
+
+ - name: Get logscraper image id
+ shell: |
+ podman images --noheading quay.io/logscraper:dev | awk '{print $3}'
+ register: _logscraper_image_id
+
+ - name: Print all images
+ shell: |
+ podman images
+
+ - name: Replace container images
+ set_fact:
+ container_images:
+ logscraper: "{{ _logscraper_image_id.stdout }}"
+ logsender: "{{ _logscraper_image_id.stdout }}"
+
+### OPENSEARCH ###
+- name: Setup Opensearch
+ shell: >
+ podman run -d --name opensearch \
+ --network host \
+ -e "discovery.type=single-node" \
+ quay.rdoproject.org/software-factory/opensearch:1.1.0
+
+- name: Wait for the Opensearch port to be open
+ wait_for:
+ host: 0.0.0.0
+ port: 9200
+ delay: 10
+ timeout: 300
+
+- name: Wait for Opensearch to respond
+ uri:
+ url: "https://0.0.0.0:9200"
+ user: "admin"
+ password: "admin"
+ force_basic_auth: true
+ method: GET
+ validate_certs: false
+ status_code: "200"
+ register: result
+ until: result.status == 200
+ retries: 30
+ delay: 10
+
+### Logscraper ###
+- name: Setup logscraper service
+ include_role:
+ name: logscraper
+
+### Logsender ###
+- name: Setup logsender service
+ include_role:
+ name: logsender
+
+# Flush handlers before running test
+- name: Force all notified handlers to run now
+ meta: flush_handlers
+
+### service validation ###
+- name: Ensure that all services are available and running
+ shell: |
+ systemctl is-active -q {{ item }}
+ loop:
+ - logscraper-openstack
+ - logsender-openstack
+ register: _service_status
+ failed_when: _service_status.rc != 0
+
+# FIXME: The index is created at the beginning of the logsender run.
+# The playbook should validate that some data has been pushed to the ES.
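+# A possible shape for that validation, left commented out because it is an
+# untested sketch (the _count endpoint is standard Opensearch API; the index
+# name and admin credentials below just mirror the values used above):
+# - name: Check that some documents were indexed
+#   uri:
+#     url: "https://localhost:9200/logstash-logscraper/_count"
+#     user: "admin"
+#     password: "admin"
+#     force_basic_auth: true
+#     method: GET
+#     validate_certs: false
+#     return_content: true
+#   register: _opensearch_doc_count
+#   until: (_opensearch_doc_count.json.count | default(0) | int) > 0
+#   retries: 30
+#   delay: 10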
+- name: Get Opensearch indices
+ uri:
+ url: "https://localhost:9200/_cat/indices"
+ user: "admin"
+ password: "admin"
+ force_basic_auth: true
+ method: GET
+ validate_certs: false
+ status_code: "200"
+ return_content: true
+ register: _opensearch_indices
+ until: "'logstash-logscraper' in _opensearch_indices.content"
+ retries: 30
+ delay: 10
+
+- name: Check if build_branch exists in index content
+ uri:
+ url: "https://localhost:9200/logstash-logscraper"
+ user: "admin"
+ password: "admin"
+ force_basic_auth: true
+ method: GET
+ validate_certs: false
+ status_code: "200"
+ return_content: true
+ register: _opensearch_index_content
+ until: "'build_branch' in _opensearch_index_content.content"
+ retries: 30
+ delay: 10
diff --git a/ansible/roles/logscraper/templates/logscraper.sh.j2 b/ansible/roles/logscraper/templates/logscraper.sh.j2
index 187eea1..04189bf 100644
--- a/ansible/roles/logscraper/templates/logscraper.sh.j2
+++ b/ansible/roles/logscraper/templates/logscraper.sh.j2
@@ -12,6 +12,7 @@
 --volume {{ item.download_dir }}:{{ item.download_dir }}:z \
 {% endif %}
 {{ container_images['logscraper'] }} \
+ /usr/local/bin/logscraper \
 {% if 'gearman_port' in item and 'gearman_server' in item %}
 --gearman-port {{ item.gearman_port }} \
 --gearman-server {{ item.gearman_server }} \
diff --git a/ansible/roles/logsender/README.rst b/ansible/roles/logsender/README.rst
new file mode 100644
index 0000000..1faab65
--- /dev/null
+++ b/ansible/roles/logsender/README.rst
@@ -0,0 +1,91 @@
+Logsender ansible role
+======================
+
+The goal of this role is to set up and configure the service for the
+logsender script, which parses log content, attaches required
+information that is available in the `buildinfo` and
+`inventory.yaml` files and sends it to the Elasticsearch service.
+
+Requirements
+------------
+
+None
+
+Role Variables
+--------------
+
+The role automatically deploys the
+log sender service.
+Example Ansible variables that configure the service:
+
+.. code-block:: yaml
+
+   vars:
+     tenant_builds:
+       - tenant: openstack
+         es_username: admin
+         es_password: admin
+         es_host: localhost
+         es_port: 9200
+         es_insecure: true
+         es_index: logstash-logscraper
+         download_dir: /mnt/logscraper/sometenant
+
+
+That configuration will deploy a service named `logsender-openstack.service`.
+The tenant name is part of the unit name because there can be multiple
+instances of the logsender service - each configured for a different tenant.
+
+Dependencies
+------------
+
+None
+
+Example Playbook
+----------------
+
+Below is a playbook example responsible for deploying two logsender
+services, where one is responsible for getting logs from the `openstack` tenant
+and the second one for getting logs from the `sometenant` tenant.
+
+..
code-block:: yaml
+
+   - name: Configure logsender tool
+     hosts: localhost
+     become: true
+     vars:
+       tenant_builds:
+         - tenant: openstack
+           es_username: logstash
+           es_password: logstash
+           es_host: localhost
+           es_port: 9200
+           es_insecure: false
+           es_index: ""
+           es_index_prefix: ""
+           download_dir: /mnt/logscraper/openstack
+         - tenant: sometenant
+           es_username: logstash
+           es_password: logstash
+           es_host: otherhost
+           es_port: 9200
+           es_insecure: false
+           es_index: ""
+           es_index_prefix: ""
+           download_dir: /mnt/logscraper/sometenant
+     roles:
+       - logsender
+
+License
+-------
+
+Apache
+
+Author Information
+------------------
+
+Author: OpenStack Contributors
+Author email: openstack-discuss@lists.openstack.org
+Home page: http://docs.openstack.org/infra/ci-log-processing
diff --git a/ansible/roles/logsender/defaults/main.yml b/ansible/roles/logsender/defaults/main.yml
new file mode 100644
index 0000000..3e6226b
--- /dev/null
+++ b/ansible/roles/logsender/defaults/main.yml
@@ -0,0 +1,27 @@
+---
+logsender_user: logscraper
+logsender_group: logscraper
+logscraper_gid: 10210
+logscraper_uid: 10210
+
+container_images:
+ # FIXME: Create a new project on Docker Hub that will contain this image
+ logsender: quay.rdoproject.org/software-factory/logscraper:latest
+
+# Elasticsearch configuration
+# tenant_builds:
+# - tenant: sometenant
+# es_host: localhost
+# es_port: 9200
+# es_username: logstash
+# es_password: logstash
+# es_index_prefix: ""
+# es_index: ""
+# es_insecure: false
+# download_dir: /mnt/logscraper/sometenant
+# doc_type: "_doc"
+# logsender_workers: 1
+# chunk_size: 1500
+# keep: true
+# ignore_es_status: false
+tenant_builds: []
diff --git a/ansible/roles/logsender/meta/main.yml b/ansible/roles/logsender/meta/main.yml
new file mode 100644
index 0000000..6c1e8ce
--- /dev/null
+++ b/ansible/roles/logsender/meta/main.yml
@@ -0,0 +1,13 @@
+---
+galaxy_info:
+ author: Openstack Contributors
+ description: Openstack Logsender tool
+ company: Openstack
+ license: Apache
+ min_ansible_version: 2.9
+ platforms:
+ - name: Centos
+ versions:
+ - 8
+ galaxy_tags: []
+dependencies: []
diff --git a/ansible/roles/logsender/tasks/main.yml b/ansible/roles/logsender/tasks/main.yml
new file mode 100644
index 0000000..cac3bc6
--- /dev/null
+++ b/ansible/roles/logsender/tasks/main.yml
@@ -0,0 +1,23 @@
+---
+- name: Create dedicated group
+ group:
+ name: "{{ logsender_group }}"
+ state: present
+
+- name: Create dedicated user
+ user:
+ name: "{{ logsender_user }}"
+ state: present
+ comment: "Dedicated user for logsender"
+ group: "{{ logsender_group }}"
+ shell: "/sbin/nologin"
+ create_home: false
+
+- name: Ensure container software is installed
+ package:
+ name: podman
+ state: present
+
+- name: Configure logsender service
+ include_tasks: service.yml
+ loop: "{{ tenant_builds }}"
diff --git a/ansible/roles/logsender/tasks/service.yml b/ansible/roles/logsender/tasks/service.yml
new file mode 100644
index 0000000..f67f9d2
--- /dev/null
+++ b/ansible/roles/logsender/tasks/service.yml
@@ -0,0 +1,29 @@
+---
+- name: Generate logsender script
+ template:
+ src: logsender.sh.j2
+ dest: "/usr/local/bin/logsender-{{ item.tenant }}"
+ mode: '0755'
+ register: _start_script
+
+- name: Generate systemd unit
+ template:
+ src: logsender.service.j2
+ dest: "/etc/systemd/system/logsender-{{ item.tenant }}.service"
+ owner: root
+ group: root
+
+- name: Enable and restart service
+ systemd:
+ name: logsender-{{ item.tenant }}
+ state: restarted
+ daemon_reload: true
+ enabled: true
+ when: _start_script.changed
+
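+# NOTE: the restart above is only triggered when the generated start script
+# changed; the task below additionally makes sure the unit is enabled and
+# running on hosts where the script was already up to date (for example
+# after a reboot or a manual stop).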
+- name: Ensure that service is running
+ systemd:
+ name: logsender-{{ item.tenant }}
+ state: started
+ daemon_reload: true
+ enabled: true
diff --git a/ansible/roles/logsender/templates/logsender.service.j2 b/ansible/roles/logsender/templates/logsender.service.j2
new file mode 100644
index 0000000..c3732b5
--- /dev/null
+++ b/ansible/roles/logsender/templates/logsender.service.j2
@@ -0,0 +1,16 @@
+[Unit]
+Description=Logsender service for {{ item.tenant }}
+After=syslog.target network.target
+StartLimitInterval=20
+StartLimitBurst=5
+
+[Service]
+Type=simple
+SyslogIdentifier=logsender-{{ item.tenant }}
+Restart=always
+RestartSec=3s
+ExecStop=-/usr/bin/podman stop -t 10 logsender-{{ item.tenant }}
+ExecStart=/usr/local/bin/logsender-{{ item.tenant }}
+
+[Install]
+WantedBy=multi-user.target
diff --git a/ansible/roles/logsender/templates/logsender.sh.j2 b/ansible/roles/logsender/templates/logsender.sh.j2
new file mode 100644
index 0000000..b1c2348
--- /dev/null
+++ b/ansible/roles/logsender/templates/logsender.sh.j2
@@ -0,0 +1,52 @@
+#!/bin/bash
+
+/usr/bin/podman run \
+ --network host \
+ --rm \
+ --user 1000:1000 \
+ --uidmap 0:{{ logscraper_uid + 1 }}:999 \
+ --uidmap 1000:{{ logscraper_uid }}:1 \
+ --name logsender-{{ item.tenant }} \
+ --volume {{ item.download_dir }}:{{ item.download_dir }}:z \
+ {{ container_images['logsender'] }} \
+ /usr/local/bin/logsender \
+ {% if 'es_host' in item and item['es_host'] %}
+ --host "{{ item['es_host'] }}" \
+ {% endif %}
+ {% if 'es_port' in item and item['es_port'] %}
+ --port "{{ item['es_port'] }}" \
+ {% endif %}
+ {% if 'es_username' in item and item['es_username'] %}
+ --username "{{ item.es_username }}" \
+ {% endif %}
+ {% if 'es_password' in item and item['es_password'] %}
+ --password "{{ item.es_password }}" \
+ {% endif %}
+ {% if 'es_index_prefix' in item and item['es_index_prefix'] %}
+ --index-prefix "{{ item['es_index_prefix'] }}" \
+ {% endif %}
+ {% if 'es_index' in item and item['es_index'] %}
+ --index "{{ item['es_index'] }}" \
+ {% endif %}
+ {% if 'download_dir' in item and item['download_dir'] %}
+ --directory "{{ item['download_dir'] }}" \
+ {% endif %}
+ {% if 'doc_type' in item and item['doc_type'] %}
+ --doc-type "{{ item['doc_type'] }}" \
+ {% endif %}
+ {% if 'es_insecure' in item and item['es_insecure'] %}
+ --insecure \
+ {% endif %}
+ {% if 'logsender_workers' in item and item['logsender_workers'] %}
+ --workers "{{ item['logsender_workers'] }}" \
+ {% endif %}
+ {% if 'chunk_size' in item and item['chunk_size'] %}
+ --chunk-size "{{ item['chunk_size'] }}" \
+ {% endif %}
+ {% if 'keep' in item and item['keep'] %}
+ --keep \
+ {% endif %}
+ {% if 'ignore_es_status' in item and item['ignore_es_status'] %}
+ --ignore-es-status \
+ {% endif %}
+ --follow
diff --git a/doc/source/index.rst b/doc/source/index.rst
index 23a0a5b..8237840 100644
--- a/doc/source/index.rst
+++ b/doc/source/index.rst
@@ -16,6 +16,7 @@ Contents:
..
sidebar:: HOWTOs
 * :doc:`logscraper`
+ * :doc:`logsender`
 * :doc:`ansible-role`
 * :doc:`loggearman`
 * :doc:`loggearman-role`
@@ -24,6 +25,7 @@
 :maxdepth: 2

 logscraper
+ logsender
 ansible-role
 loggearman
 loggearman-role
diff --git a/doc/source/logsender.rst b/doc/source/logsender.rst
new file mode 100644
index 0000000..975a119
--- /dev/null
+++ b/doc/source/logsender.rst
@@ -0,0 +1,96 @@
+Logsender
+=========
+
+The logsender tool parses log files that are available
+in the directory, attaches important data that is provided in the `buildinfo` and
+`inventory.yaml` files and sends it directly to the Opensearch service.
+
+Available arguments for logsender are:
+
+.. code-block::
+
+   logsender --help
+
+   Check log directories and push to the Opensearch service
+
+   options:
+     -h, --help            show this help message and exit
+     --directory DIRECTORY
+                           Directory where the logs are stored. Defaults to: /tmp/logscraper
+     --host HOST           Opensearch host
+     --port PORT           Opensearch port
+     --username USERNAME   Opensearch username
+     --password PASSWORD   Opensearch user password
+     --index-prefix INDEX_PREFIX
+                           Prefix for the index. Defaults to logstash-
+     --index INDEX         Opensearch index. Defaults to: <index-prefix>YYYY.MM.DD
+     --doc-type DOC_TYPE   Doc type information that will be sent to the Opensearch service
+     --insecure            Skip validating SSL cert
+     --follow              Keep sending CI logs
+     --workers WORKERS     Worker processes for logsender
+     --chunk-size CHUNK_SIZE
+                           The bulk chunk size
+     --keep                Do not remove log directory after sending
+     --ignore-es-status    Ignore the Opensearch bulk update status
+     --debug               Be more verbose
+
+
+Basic usage
+-----------
+
+Based on the use case, we can run logsender in a few ways.
+
+Examples:
+
+* Send logs to an Opensearch service that is running on localhost, skip TLS cert verification
+
+.. code-block::
+
+   logsender --username logstash --password logstashpassword --host localhost --port 9200 --insecure
+
+
+* Send logs to the service, use 8 workers and ignore the Opensearch bulk update status. WARNING: --ignore-es-status should not be used in a production environment!
+
+.. code-block::
+
+   logsender --username logstash --password logstashpassword --host localhost --port 9200 --insecure --workers 8 --ignore-es-status
+
+
+* Send logs to the Elasticsearch service, provide your own index name "myindex" and keep the log files (they will not be deleted):
+
+.. code-block::
+
+   logsender --username logstash --password logstashpassword --index myindex --keep
+
+
+Containerize tool
+-----------------
+
+Instead of using the `pip` tool, you can build your own container image
+that contains the logscraper tool, for example:
+
+.. code-block::
+
+   docker build -t logscraper -f Dockerfile .
+
+NOTE: the logsender tool is included in the logscraper container image.
+
+Then you can execute the commands that are described above.
+
+NOTE: The directory where you store log files should be mounted into the container.
+For example:
+
+.. code-block::
+
+   podman run \
+     --network host \
+     -d \
+     --name logsender-openstack \
+     --volume /mnt/logscraper/openstack:/mnt/logscraper/openstack:z \
+     logscraper \
+     /usr/local/bin/logsender \
+     --username admin \
+     --password admin \
+     --host localhost \
+     --port 9200 \
+     --directory /mnt/logscraper/openstack \
+     --follow
diff --git a/logscraper/logsender.py b/logscraper/logsender.py
new file mode 100755
index 0000000..08bd912
--- /dev/null
+++ b/logscraper/logsender.py
@@ -0,0 +1,387 @@
+#!/usr/bin/env python3
+#
+# Copyright (C) 2022 Red Hat
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+"""
+The goal is to get content from the build uuid directory and send it to Opensearch
+
+[ CLI ] -> [ Log directory ] -> [ Zuul inventory ] -> [ Send logs to ES ]
+"""
+
+import argparse
+import collections
+import copy
+import datetime
+import itertools
+import logging
+import multiprocessing
+import os
+import re
+import shutil
+import sys
+import time
+
+from opensearchpy import exceptions as opensearch_exceptions
+from opensearchpy import helpers
+from opensearchpy import OpenSearch
+from ruamel.yaml import YAML
+
+
+###############################################################################
+#                                     CLI                                     #
+###############################################################################
+def get_arguments():
+    parser = argparse.ArgumentParser(description="Check log directories "
+                                     "and push to the Opensearch service")
+    parser.add_argument("--directory",
+                        help="Directory where the logs are "
+                        "stored. Defaults to: /tmp/logscraper",
+                        default="/tmp/logscraper")
+    parser.add_argument("--host",
+                        help="Opensearch host",
+                        default='localhost')
+    parser.add_argument("--port",
+                        help="Opensearch port",
+                        type=int,
+                        default=9200)
+    parser.add_argument("--username",
+                        help="Opensearch username",
+                        default='logstash')
+    parser.add_argument("--password", help="Opensearch user password")
+    parser.add_argument("--index-prefix", help="Prefix for the index. "
+                        "Defaults to logstash-",
+                        default='logstash-')
+    parser.add_argument("--index",
+                        help="Opensearch index. Defaults to: "
+                        "<index-prefix>YYYY.MM.DD")
+    parser.add_argument("--doc-type", help="Doc type information that will be"
+                        " sent to the Opensearch service",
+                        default="_doc")
+    # NOTE: store_false means args.insecure is True by default (certs are
+    # verified) and becomes False when --insecure is passed; the value is
+    # consumed as verify_certs in get_es_client.
+    parser.add_argument("--insecure",
+                        help="Skip validating SSL cert",
+                        action="store_false")
+    parser.add_argument("--follow", help="Keep sending CI logs",
+                        action="store_true")
+    parser.add_argument("--workers", help="Worker processes for logsender",
+                        type=int,
+                        default=1)
+    parser.add_argument("--chunk-size", help="The bulk chunk size",
+                        type=int,
+                        default=1500)
+    parser.add_argument("--keep", help="Do not remove log directory after "
+                        "sending",
+                        action="store_true")
+    parser.add_argument("--ignore-es-status", help="Ignore the Opensearch "
+                        "bulk update status",
+                        action="store_true")
+    parser.add_argument("--debug", help="Be more verbose",
+                        action="store_true")
+    args = parser.parse_args()
+    return args
+
+
+###############################################################################
+#                                 Log sender                                  #
+###############################################################################
+def _is_file_not_empty(file_path):
+    """Return True when the file is not empty"""
+    # NOTE: we can assume that when the file exists, all
+    # content has been downloaded to the directory.
+    return os.path.getsize(file_path) > 0
+
+
+def check_info_files(root, files):
+    return True if (
+        'buildinfo' in files and 'inventory.yaml' in files and
+        _is_file_not_empty("%s/buildinfo" % root) and
+        _is_file_not_empty("%s/inventory.yaml" % root)
+    ) else False
+
+
+def read_yaml_file(file_path):
+    # FIXME: In logscraper, yaml.dump seems not to dump the
+    # dictionary correctly, so the ruamel lib is needed.
+    yaml = YAML()
+    with open(file_path, 'r') as f:
+        return yaml.load(f)
+
+
+def read_text_file(file_path):
+    with open(file_path, 'r') as f:
+        return f.readlines()
+
+
+def get_inventory_info(directory):
+    try:
+        build_inventory = read_yaml_file("%s/inventory.yaml" % directory)
+        return build_inventory['all']['vars']['zuul']
+    except FileNotFoundError:
+        logging.warning("Cannot find inventory.yaml in build "
+                        "dir %s" % directory)
+
+
+def get_build_info(directory):
+    return read_yaml_file("%s/buildinfo" % directory)
+
+
+def get_ready_directories(directory):
+    """Return a dictionary that maps build uuids to their log files
+
+    Only directories that contain non-empty 'buildinfo' and
+    'inventory.yaml' files are included.
+    """
+
+    log_files = {}
+    for root, _, files in os.walk(directory):
+        build_uuid = root.split('/')[-1]
+        if check_info_files(root, files):
+            files.remove("buildinfo")
+            files.remove("inventory.yaml")
+            log_files[build_uuid] = files
+        else:
+            logging.info("Skipping build with uuid %s. Probably not all "
+                         "files have been downloaded yet." % build_uuid)
+            continue
+
+    return log_files
+
+
+def remove_directory(dir_path):
+    logging.debug("Removing directory %s" % dir_path)
+    shutil.rmtree(dir_path)
+
+
+def makeFields(build_details, buildinfo):
+    fields = {}
+    fields["build_node"] = "zuul-executor"
+    # NOTE: that field is added later
+    # fields["filename"] = build_file
+    fields["build_name"] = buildinfo.get("job_name")
+    fields["build_status"] = buildinfo["result"]
+    fields["project"] = buildinfo.get('project')
+    fields["voting"] = int(build_details["voting"])
+    fields["build_set"] = build_details["buildset"]
+    fields["build_queue"] = build_details["pipeline"]
+    fields["build_ref"] = buildinfo.get("ref")
+    fields["build_branch"] = buildinfo.get("branch")
+    fields["build_change"] = buildinfo.get("change")
+    fields["build_patchset"] = buildinfo.get("patchset")
+    fields["build_newrev"] = build_details.get("newrev", "UNKNOWN")
+    fields["build_uuid"] = buildinfo.get("uuid")
+    fields["node_provider"] = "local"
+    fields["log_url"] = buildinfo.get("log_url")
+    fields["tenant"] = buildinfo.get("tenant")
+    if "executor" in build_details and "hostname" in build_details["executor"]:
+        fields["zuul_executor"] = build_details["executor"]["hostname"]
+    return fields
+
+
+def send_bulk(es_client, request, workers, ignore_es_status, chunk_size):
+    """Send bulk request to Opensearch"""
+    try:
+        if ignore_es_status:
+            return collections.deque(helpers.parallel_bulk(
+                es_client, request, thread_count=workers,
+                chunk_size=chunk_size))
+
+        # NOTE: To see bulk update status, we can use:
+        # https://elasticsearch-py.readthedocs.io/en/7.10.0/helpers.html#example
+        for success, info in helpers.parallel_bulk(es_client, request,
+                                                   thread_count=workers,
+                                                   chunk_size=chunk_size):
+            if not success:
+                logging.error("Chunk was not sent to Opensearch %s" % info)
+                return
+        # If all bulk updates are fine, return True
+        return True
+    except Exception as e:
+        logging.critical("Exception occurred while pushing data to "
+                         "Opensearch %s" % e)
+        return
+
+
+def get_timestamp(line):
+    try:
+        timestamp_search = re.search(r'[-0-9]{10}\s+[0-9.:]{12}', line)
+        timestamp = (timestamp_search.group() if timestamp_search else
+                     datetime.datetime.utcnow().isoformat())
+        # NOTE: On Python 3.6, it should be:
+        # datetime.datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S.%f")
+        # ci-log-processing uses a container with Python 3.8, where the
+        # fromisoformat attribute is available.
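+        # Example (matching the unit tests): the line
+        # "2022-02-28 09:44:58.839036 | Some message" matches as
+        # "2022-02-28 09:44:58.839" and is returned as
+        # "2022-02-28T09:44:58.839000".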
+        return datetime.datetime.fromisoformat(timestamp).isoformat()
+    except Exception as e:
+        logging.critical("Exception occurred while parsing timestamp %s" % e)
+
+
+def get_message(line):
+    try:
+        return line.split("|", 1)[1].replace('\n', '')
+    except IndexError:
+        return line.replace('\n', '')
+
+
+def send_to_es(build_file, es_fields, es_client, index, workers,
+               ignore_es_status, chunk_size, doc_type):
+    """Send documents to Opensearch"""
+    request = []
+    logging.info("Working on %s" % build_file)
+    file_content = read_text_file(build_file)
+    for line in file_content:
+        fields = copy.deepcopy(es_fields)
+        fields["@timestamp"] = get_timestamp(line)
+
+        message = get_message(line)
+        if not message:
+            continue
+        fields["message"] = message
+
+        doc = {"_index": index, "_type": doc_type, "_source": fields}
+        request.append(doc)
+    return send_bulk(es_client, request, workers, ignore_es_status, chunk_size)
+
+
+def get_build_information(build_dir):
+    """Return dictionary with build information"""
+    build_inventory = get_inventory_info(build_dir)
+    buildinfo = get_build_info(build_dir)
+    return makeFields(build_inventory, buildinfo)
+
+
+def send(ready_directory, args, directory, index, workers):
+    """Generate Opensearch fields and send"""
+    # NOTE: each process should have its own Opensearch session,
+    # due to the error: TypeError: cannot pickle 'SSLSocket' object -
+    # SSLSocket cannot be serialized.
+    es_client = get_es_client(args)
+
+    build_uuid, build_files = ready_directory
+    build_dir = "%s/%s" % (directory, build_uuid)
+    es_fields = get_build_information(build_dir)
+    if not es_fields:
+        return
+
+    send_status = False
+    logging.debug("Provided build info %s" % es_fields)
+
+    for build_file in build_files:
+        es_fields["filename"] = build_file
+        send_status = send_to_es("%s/%s" % (build_dir, build_file),
+                                 es_fields, es_client, index, workers,
+                                 args.ignore_es_status, args.chunk_size,
+                                 args.doc_type)
+
+    if args.keep:
+        logging.info("Keeping file %s" % build_dir)
+        return
+
+    if send_status:
+        remove_directory(build_dir)
+    else:
+        logging.warning("The document was not sent. Keeping log files")
+
+
+def get_index(args):
+    index = args.index
+
+    if not index:
+        index = args.index_prefix + \
+            datetime.datetime.today().strftime('%Y.%m.%d')
+
+    if create_indices(index, args):
+        return index
+
+
+def create_indices(index, args):
+    es_client = get_es_client(args)
+    try:
+        logging.info("Creating index %s" % index)
+        return es_client.indices.create(index)
+    except opensearch_exceptions.AuthorizationException:
+        logging.critical("You need to have permissions to create an index. "
+                         "Probably you need to add [indices:admin/create] or "
+                         "'create_index' permission to the index permissions "
+                         "inside your role.")
+    except opensearch_exceptions.RequestError as e:
+        # NOTE(dpawlik) Use the same functionality as Logstash does, so it
+        # will not require any additional permissions to be set on the
+        # default logstash role.
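+        # A RequestError with 'resource_already_exists_exception' means the
+        # index was already created (for example by a previous run or
+        # another worker), which is fine here.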
+        if e.error.lower() == 'resource_already_exists_exception':
+            logging.debug("The index already exists, continuing")
+            return True
+
+
+def prepare_and_send(ready_directories, args):
+    """Prepare information and send it to Opensearch"""
+
+    directory = args.directory
+    workers = args.workers
+    index = get_index(args)
+    if not index:
+        logging.critical("Cannot continue without a created index")
+        sys.exit(1)
+
+    with multiprocessing.Pool(processes=args.workers) as pool:
+        pool.starmap(send, zip(
+            list(ready_directories.items()),
+            itertools.repeat(args),
+            itertools.repeat(directory), itertools.repeat(index),
+            itertools.repeat(workers)))
+
+
+def setup_logging(debug):
+    if debug:
+        logging.basicConfig(format="%(asctime)s %(message)s",
+                            level=logging.DEBUG)
+    else:
+        logging.basicConfig(level=logging.INFO)
+    logging.debug("Log sender is starting...")
+
+
+def get_es_client(args):
+    es_creds = {
+        "host": args.host,
+        "port": args.port,
+        "http_compress": True,
+        "use_ssl": True,
+        "verify_certs": args.insecure,
+        "ssl_show_warn": args.insecure,
+    }
+
+    if args.username and args.password:
+        es_creds["http_auth"] = "%s:%s" % (args.username, args.password)
+
+    es_client = OpenSearch([es_creds], timeout=60)
+    logging.info("Connected to Opensearch: %s" % es_client.info())
+    return es_client
+
+
+def run(args):
+    ready_directories = get_ready_directories(args.directory)
+    logging.info("Found %s builds to send to Opensearch service" % len(
+        ready_directories))
+    prepare_and_send(ready_directories, args)
+    logging.info("Finished pushing logs!")
+
+
+def main():
+    args = get_arguments()
+    setup_logging(args.debug)
+    while True:
+        run(args)
+        if not args.follow:
+            break
+        time.sleep(60)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/logscraper/tests/test_logsender.py b/logscraper/tests/test_logsender.py
new file mode 100644
index 0000000..42519b7
--- /dev/null
+++ b/logscraper/tests/test_logsender.py
@@ -0,0 +1,520 @@
+#!/usr/bin/env python3
+#
+# Copyright (C) 2022 Red Hat
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+ +import datetime + +from logscraper import logsender +from logscraper.tests import base +from ruamel.yaml import YAML +from unittest import mock + + +buildinfo = """ +_id: 17428524 +branch: master +build_args: + checkpoint_file: /tmp/results-checkpoint.txt + debug: false + directory: /tmp/logscraper + download: true + follow: false + gearman_port: 4730 + gearman_server: null + insecure: true + job_name: null + logstash_url: null + max_skipped: 500 + workers: 32 + zuul_api_url: + - https://zuul.opendev.org/api/tenant/openstack +buildset: + uuid: 52b29e0e716a4436bd20eed47fa396ce +change: 829161 +duration: 1707.0 +end_time: '2022-02-28T10:07:36' +error_detail: null +event_id: dda0cbf9caaa496b9127a7646b8a28a8 +event_timestamp: '2022-02-28T09:32:08' +final: true +held: false +job_name: openstack-tox-py39 +log_url: https://somehost/829161/3/check/openstack-tox-py39/38bf2cd/ +newrev: null +nodeset: fedora-35 +patchset: '3' +pipeline: check +project: openstack/neutron +provides: [] +ref: refs/changes/61/829161/3 +ref_url: https://review.opendev.org/829161 +result: SUCCESS +start_time: '2022-02-28T09:39:09' +tenant: openstack +uuid: 38bf2cdc947643c9bb04f11f40a0f211 +voting: true +""" + +inventory_info = """ +all: + hosts: + fedora-35: + ansible_connection: ssh + ansible_host: 127.0.0.1 + ansible_port: 22 + ansible_python_interpreter: auto + ansible_user: zuul + ara_compress_html: false + ara_report_path: ara-report + ara_report_type: html + bindep_profile: test py39 + enable_fips: false + nodepool: + az: null + cloud: rax + external_id: 3b2da968-7ec3-4356-b12c-b55b574902f8 + host_id: ed82a4a59ac22bf396288f0b93bf1c658af932130f9d336aad528f21 + interface_ip: 127.0.0.2 + label: fedora-35 + private_ipv4: 127.0.0.3 + private_ipv6: null + provider: rax-dfw + public_ipv4: 127.0.0.2 + public_ipv6: '' + region: DFW + python_version: 3.9 + tox_constraints_file: 'requirements/upper-constraints.txt' + tox_environment: + NOSE_HTML_OUT_FILE: nose_results.html + NOSE_WITH_HTML_OUTPUT: 1 + NOSE_WITH_XUNIT: 1 + tox_envlist: py39 + vars: + ara_compress_html: false + ara_report_path: ara-report + ara_report_type: html + bindep_profile: test py39 + enable_fips: false + python_version: 3.9 + tox_constraints_file: 'requirements/upper-constraints.txt' + tox_environment: + NOSE_HTML_OUT_FILE: nose_results.html + NOSE_WITH_HTML_OUTPUT: 1 + NOSE_WITH_XUNIT: 1 + tox_envlist: py39 + zuul: + _inheritance_path: + - 'some_path' + - 'some_path_2' + attempts: 1 + branch: master + build: 38bf2cdc947643c9bb04f11f40a0f211 + buildset: 52b29e0e716a4436bd20eed47fa396ce + change: '829161' + change_url: https://review.opendev.org/829161 + child_jobs: [] + event_id: dda0cbf9caaa496b9127a7646b8a28a8 + executor: + hostname: ze07.opendev.org + inventory_file: /var/lib/zuul/builds/build/ansible/inventory.yaml + log_root: /var/lib/zuul/builds/build/work/logs + result_data_file: /var/lib/zuul/builds/build/work/results.json + src_root: /var/lib/zuul/builds/build/work/src + work_root: /var/lib/zuul/builds/build/work + items: + - branch: master + change: '828673' + change_url: https://review.opendev.org/828673 + patchset: '4' + project: + canonical_hostname: opendev.org + canonical_name: opendev.org/openstack/neutron + name: openstack/neutron + short_name: neutron + src_dir: src/opendev.org/openstack/neutron + - branch: master + change: '829161' + change_url: https://review.opendev.org/829161 + patchset: '3' + project: + canonical_hostname: opendev.org + canonical_name: opendev.org/openstack/neutron + name: openstack/neutron + short_name: neutron 
+ src_dir: src/opendev.org/openstack/neutron + job: openstack-tox-py39 + jobtags: [] + message: Q3YmM0Y2QzNzhkMWZhOWE5ODYK + patchset: '3' + pipeline: check + playbook_context: + playbook_projects: + trusted/project_0/opendev.org/opendev/base-jobs: + canonical_name: opendev.org/opendev/base-jobs + checkout: master + commit: 19dc53290a26b20d5c2c5b1bb25f029c4b04a716 + trusted/project_1/opendev.org/zuul/zuul-jobs: + canonical_name: opendev.org/zuul/zuul-jobs + checkout: master + commit: e160f59e0e76c7e8625ec2d174b044a7c92cd32e + untrusted/project_0/opendev.org/zuul/zuul-jobs: + canonical_name: opendev.org/zuul/zuul-jobs + checkout: master + commit: e160f59e0e76c7e8625ec2d174b044a7c92cd32e + untrusted/project_1/opendev.org/opendev/base-jobs: + canonical_name: opendev.org/opendev/base-jobs + checkout: master + commit: 19dc53290a26b20d5c2c5b1bb25f029c4b04a716 + playbooks: + - path: untrusted/project/opendev/zuul/zuul-jobs/playbooks/tox/run.yaml + roles: + - checkout: master + checkout_description: zuul branch + link_name: ansible/playbook_0/role_0/base-jobs + link_target: untrusted/project_1/opendev.org/opendev/base-jobs + role_path: ansible/playbook_0/role_0/base-jobs/roles + - checkout: master + checkout_description: playbook branch + link_name: ansible/playbook_0/role_1/zuul-jobs + link_target: untrusted/project_0/opendev.org/zuul/zuul-jobs + role_path: ansible/playbook_0/role_1/zuul-jobs/roles + post_review: false + project: + canonical_hostname: opendev.org + canonical_name: opendev.org/openstack/neutron + name: openstack/neutron + short_name: neutron + src_dir: src/opendev.org/openstack/neutron + projects: + opendev.org/openstack/neutron: + canonical_hostname: opendev.org + canonical_name: opendev.org/openstack/neutron + checkout: master + checkout_description: zuul branch + commit: 7be5a0aff1123b381674191f3baa1ec9c128e0f3 + name: openstack/neutron + required: false + short_name: neutron + src_dir: src/opendev.org/openstack/neutron + opendev.org/openstack/requirements: + canonical_hostname: opendev.org + canonical_name: opendev.org/openstack/requirements + checkout: master + checkout_description: zuul branch + commit: 48fb5c24764d91833d8ca7084ee9f183785becd6 + name: openstack/requirements + required: true + short_name: requirements + src_dir: src/opendev.org/openstack/requirements + ref: refs/changes/61/829161/3 + resources: {} + tenant: openstack + timeout: 3600 + voting: true +""" + +parsed_fields = { + 'build_node': 'zuul-executor', + 'build_name': 'openstack-tox-py39', + 'build_status': 'SUCCESS', + 'project': 'openstack/neutron', + 'voting': 1, + 'build_set': '52b29e0e716a4436bd20eed47fa396ce', + 'build_queue': 'check', + 'build_ref': 'refs/changes/61/829161/3', + 'build_branch': 'master', + 'build_change': 829161, + 'build_patchset': '3', + 'build_newrev': 'UNKNOWN', + 'build_uuid': '38bf2cdc947643c9bb04f11f40a0f211', + 'node_provider': 'local', + 'log_url': + 'https://somehost/829161/3/check/openstack-tox-py39/38bf2cd/', + 'tenant': 'openstack', + 'zuul_executor': 'ze07.opendev.org' +} + + +def _parse_get_yaml(text): + yaml = YAML() + return yaml.load(text) + + +class _MockedPoolMapResult: + def __init__(self, func, iterable): + self.func = func + self.iterable = iterable + + # mocked results + self._value = [self.func(i) for i in iterable] + + def get(self, timeout=0): + return self._value + + +class FakeArgs(object): + def __init__(self, directory=None, host=None, port=None, username=None, + password=None, index_prefix=None, index=None, doc_type=None, + insecure=None, follow=None, 
workers=None, chunk_size=None, + keep=None, ignore_es_status=None, debug=None): + + self.directory = directory + self.host = host + self.port = port + self.username = username + self.password = password + self.index_prefix = index_prefix + self.index = index + self.doc_type = doc_type + self.insecure = insecure + self.follow = follow + self.workers = workers + self.chunk_size = chunk_size + self.keep = keep + self.ignore_es_status = ignore_es_status + self.debug = debug + + +class TestSender(base.TestCase): + + @mock.patch('logscraper.logsender.remove_directory') + @mock.patch('logscraper.logsender.send_to_es') + @mock.patch('logscraper.logsender.get_build_information') + @mock.patch('logscraper.logsender.get_es_client') + @mock.patch('argparse.ArgumentParser.parse_args', return_value=FakeArgs( + directory="/tmp/testdir", doc_type='_doc')) + def test_send(self, mock_args, mock_es_client, mock_build_info, + mock_send_to_es, mock_remove_dir): + build_uuid = '38bf2cdc947643c9bb04f11f40a0f211' + build_files = ['job-result.txt'] + directory = '/tmp/testdir' + index = 'logstash-index' + workers = 1 + args = logsender.get_arguments() + mock_send_to_es.return_value = True + logsender.send((build_uuid, build_files), args, directory, index, + workers) + self.assertTrue(mock_remove_dir.called) + + @mock.patch('logscraper.logsender.remove_directory') + @mock.patch('logscraper.logsender.send_to_es') + @mock.patch('logscraper.logsender.get_build_information') + @mock.patch('logscraper.logsender.get_es_client') + @mock.patch('argparse.ArgumentParser.parse_args', return_value=FakeArgs( + directory="/tmp/testdir", keep=True, doc_type="_doc")) + def test_send_keep_dir(self, mock_args, mock_es_client, mock_build_info, + mock_send_to_es, mock_remove_dir): + build_uuid = '38bf2cdc947643c9bb04f11f40a0f211' + build_files = ['job-result.txt'] + directory = '/tmp/testdir' + index = 'logstash-index' + workers = 1 + args = logsender.get_arguments() + mock_send_to_es.return_value = True + logsender.send((build_uuid, build_files), args, directory, index, + workers) + self.assertFalse(mock_remove_dir.called) + + @mock.patch('logscraper.logsender.send_bulk') + @mock.patch('logscraper.logsender.read_text_file') + @mock.patch('argparse.ArgumentParser.parse_args', return_value=FakeArgs( + directory="/tmp/testdir", index="myindex", workers=1, + ignore_es_status=False, chunk_size=1000, + doc_type="zuul")) + def test_send_to_es(self, mock_args, mock_text, mock_bulk): + build_file = 'job-result.txt' + es_fields = parsed_fields + es_client = mock.Mock() + args = logsender.get_arguments() + text = ["2022-02-28 09:39:09.596010 | Job console starting...", + "2022-02-28 09:39:09.610160 | Updating repositories", + "2022-02-28 09:39:09.996235 | Preparing job workspace"] + mock_text.return_value = text + es_doc = [{ + '_index': 'myindex', + '_type': 'zuul', + '_source': { + 'build_node': 'zuul-executor', + 'build_name': 'openstack-tox-py39', + 'build_status': 'SUCCESS', + 'project': 'openstack/neutron', + 'voting': 1, + 'build_set': '52b29e0e716a4436bd20eed47fa396ce', + 'build_queue': 'check', + 'build_ref': 'refs/changes/61/829161/3', + 'build_branch': 'master', + 'build_change': 829161, + 'build_patchset': '3', + 'build_newrev': 'UNKNOWN', + 'build_uuid': '38bf2cdc947643c9bb04f11f40a0f211', + 'node_provider': 'local', + 'log_url': + 'https://somehost/829161/3/check/openstack-tox-py39/38bf2cd/', + 'tenant': 'openstack', + 'zuul_executor': 'ze07.opendev.org', + '@timestamp': '2022-02-28T09:39:09.596000', + 'message': ' Job console 
starting...' + } + }, { + '_index': 'myindex', + '_type': 'zuul', + '_source': { + 'build_node': 'zuul-executor', + 'build_name': 'openstack-tox-py39', + 'build_status': 'SUCCESS', + 'project': 'openstack/neutron', + 'voting': 1, + 'build_set': '52b29e0e716a4436bd20eed47fa396ce', + 'build_queue': 'check', + 'build_ref': 'refs/changes/61/829161/3', + 'build_branch': 'master', + 'build_change': 829161, + 'build_patchset': '3', + 'build_newrev': 'UNKNOWN', + 'build_uuid': '38bf2cdc947643c9bb04f11f40a0f211', + 'node_provider': 'local', + 'log_url': + 'https://somehost/829161/3/check/openstack-tox-py39/38bf2cd/', + 'tenant': 'openstack', + 'zuul_executor': 'ze07.opendev.org', + '@timestamp': '2022-02-28T09:39:09.610000', + 'message': ' Updating repositories' + } + }, { + '_index': 'myindex', + '_type': 'zuul', + '_source': { + 'build_node': 'zuul-executor', + 'build_name': 'openstack-tox-py39', + 'build_status': 'SUCCESS', + 'project': 'openstack/neutron', + 'voting': 1, + 'build_set': '52b29e0e716a4436bd20eed47fa396ce', + 'build_queue': 'check', + 'build_ref': 'refs/changes/61/829161/3', + 'build_branch': 'master', + 'build_change': 829161, + 'build_patchset': '3', + 'build_newrev': 'UNKNOWN', + 'build_uuid': '38bf2cdc947643c9bb04f11f40a0f211', + 'node_provider': 'local', + 'log_url': + 'https://somehost/829161/3/check/openstack-tox-py39/38bf2cd/', + 'tenant': 'openstack', + 'zuul_executor': 'ze07.opendev.org', + '@timestamp': '2022-02-28T09:39:09.996000', + 'message': ' Preparing job workspace' + } + }] + logsender.send_to_es(build_file, es_fields, es_client, args.index, + args.workers, args.ignore_es_status, + args.chunk_size, args.doc_type) + mock_bulk.assert_called_once_with(es_client, es_doc, 1, False, 1000) + + @mock.patch('collections.deque') + @mock.patch('opensearchpy.helpers.parallel_bulk') + def test_send_bulk(self, mock_parallel_bulk, mock_deque): + es_client = mock.MagicMock() + mock_parallel_bulk.return_value = [(True, "200"), (True, "200")] + request = [{'some': 'info'}, {'other': 'info'}] + workers = 1 + chunk_size = 1500 + ignore_es_status = False + bulk = logsender.send_bulk(es_client, request, workers, + ignore_es_status, chunk_size) + self.assertFalse(mock_deque.called) + self.assertTrue(mock_parallel_bulk.called) + self.assertTrue(bulk) + + @mock.patch('collections.deque') + @mock.patch('opensearchpy.helpers.parallel_bulk') + def test_send_bulk_ignore_status(self, mock_parallel_bulk, mock_deque): + es_client = mock.MagicMock() + request = [{'some': 'info'}, {'other': 'info'}] + workers = 1 + chunk_size = 1500 + ignore_es_status = True + logsender.send_bulk(es_client, request, workers, ignore_es_status, + chunk_size) + self.assertTrue(mock_deque.called) + self.assertTrue(mock_parallel_bulk.called) + + @mock.patch('collections.deque') + @mock.patch('opensearchpy.helpers.parallel_bulk') + def test_send_bulk_error(self, mock_parallel_bulk, mock_deque): + es_client = mock.Mock() + mock_parallel_bulk.return_value = [(True, "200"), (False, "500")] + request = [{'some': 'info'}, {'other': 'info'}] + workers = 1 + chunk_size = 1500 + ignore_es_status = False + bulk = logsender.send_bulk(es_client, request, workers, + ignore_es_status, chunk_size) + self.assertFalse(mock_deque.called) + self.assertTrue(mock_parallel_bulk.called) + self.assertIsNone(bulk) + + @mock.patch('logscraper.logsender.read_yaml_file', + side_effect=[_parse_get_yaml(buildinfo), + _parse_get_yaml(inventory_info)]) + def test_makeFields(self, mock_read_yaml_file): + buildinfo_yaml = 
logsender.get_build_info('fake_dir') + inventory_info_yaml = logsender.get_inventory_info('other_fake_dir') + generated_info = logsender.makeFields(inventory_info_yaml, + buildinfo_yaml) + self.assertEqual(parsed_fields, generated_info) + + def test_get_message(self): + line_1 = "28-02-2022 09:44:58.839036 | Some message" + line_2 = "2022-02-28 09:44:58.839036 | Other message | other log info" + self.assertEqual(" Some message", logsender.get_message(line_1)) + self.assertEqual(" Other message | other log info", + logsender.get_message(line_2)) + + def test_get_timestamp(self): + line_1 = "28-02-2022 09:44:58.839036 | Some message" + line_2 = "2022-02-28 09:44:58.839036 | Other message" + self.assertEqual(None, logsender.get_timestamp(line_1)) + self.assertEqual("2022-02-28T09:44:58.839000", + logsender.get_timestamp(line_2)) + + @mock.patch('logscraper.logsender.get_es_client') + @mock.patch('argparse.ArgumentParser.parse_args', return_value=FakeArgs( + index_prefix="my-index-", workers=2)) + def test_get_index(self, mock_args, mock_es_client): + args = logsender.get_arguments() + expected_index = ("my-index-%s" % + datetime.datetime.today().strftime('%Y.%m.%d')) + index = logsender.get_index(args) + self.assertEqual(expected_index, index) + + @mock.patch('logscraper.logsender.send') + @mock.patch('logscraper.logsender.get_index') + @mock.patch('argparse.ArgumentParser.parse_args', return_value=FakeArgs( + directory="/tmp/testdir", workers=2, index='myindex')) + def test_prepare_and_send(self, mock_args, mock_index, mock_send): + args = logsender.get_arguments() + ready_directories = {'builduuid': ['job-result.txt']} + mock_index.return_value = args.index + with mock.patch( + 'multiprocessing.pool.Pool.starmap', + lambda self, func, iterable, chunksize=None, + callback=None, + error_callback=None: _MockedPoolMapResult(func, iterable), + ): + logsender.prepare_and_send(ready_directories, args) + self.assertTrue(mock_send.called) + mock_send.assert_called_with((('builduuid', ['job-result.txt']), + args, args.directory, args.index, 2)) diff --git a/requirements.txt b/requirements.txt index 360e088..1e09e24 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,7 @@ pbr>=1.6 # Apache-2.0 gear<0.17 requests<2.27 # Apache-2.0 -PyYAML<6.1 # MIT +PyYAML<6.1 # MIT tenacity +opensearch-py<=1.0.0 # Apache-2.0 +ruamel.yaml diff --git a/setup.cfg b/setup.cfg index 83dd9b8..e69dc69 100644 --- a/setup.cfg +++ b/setup.cfg @@ -25,3 +25,4 @@ packages = [entry_points] console_scripts = logscraper = logscraper.logscraper:main + logsender = logscraper.logsender:main