Remove log gearman functionality

The combination of logscraper and logsender has been working for a long time
in OpenDev infra and has never had any issues, even during release time.

Change-Id: I1be94eca5d842b02c558e5ffda85c23fa66e6759
Daniel Pawlik 2024-03-06 16:14:50 +01:00 committed by daniel.pawlik
parent d06d0e7494
commit bff98095b1
36 changed files with 61 additions and 1700 deletions

@ -1,13 +1,4 @@
---
- job:
name: ci-log-processing-functional-test-centos-8-stream
description: Test is validating ci log processing services
run: ansible/playbooks/check-services.yml
voting: false
nodeset:
nodes:
- name: centos-8-stream
label: centos-8-stream
- job:
name: ci-log-processing-functional-test-centos-8-stream-sender
description: Test is validating Logscraper and logsender services
@ -38,7 +29,7 @@
- job:
name: ci-log-processing-build-image
parent: opendev-build-docker-image
description: Build logscraper and loggearman images.
description: Build logscraper image.
allowed-projects: openstack/ci-log-processing
timeout: 2700
vars: &cilogprocessing_image_vars
@ -48,11 +39,6 @@
target: logscraper
tags:
&imagetag "{{ zuul.tag is defined | ternary([zuul.get('tag', '').split('.')[0], '.'.join(zuul.get('tag', '').split('.')[:2]), zuul.get('tag', '')], ['latest']) }}"
- context: loggearman/
repository: cilogprocessing/loggearman
target: loggearman
tags:
*imagetag
- job:
name: ci-log-processing-upload-image
@ -69,8 +55,6 @@
soft: true
- name: openstack-tox-py38
soft: true
- name: ci-log-processing-functional-test-centos-8-stream
soft: true
- name: ci-log-processing-functional-test-centos-8-stream-sender
soft: true
- name: ci-log-processing-build-image
@ -86,14 +70,12 @@
- openstack-tox-pep8
- openstack-tox-py38
- ci-log-processing-build-image
- ci-log-processing-functional-test-centos-8-stream
- ci-log-processing-functional-test-centos-8-stream-sender
gate:
jobs:
- openstack-tox-linters
- openstack-tox-pep8
- openstack-tox-py38
- ci-log-processing-functional-test-centos-8-stream
- ci-log-processing-functional-test-centos-8-stream-sender
- ci-log-processing-build-image
post:

@ -31,77 +31,10 @@ Dashboards objects versioning.
Available workflows
-------------------
The Openstack CI Log Processing project is providing two configurations
The Openstack CI Log Processing project provides a configuration
for sending logs from Zuul CI to the Opensearch host.
1. Logscraper, log gearman client, log gearman worker, logstash
With this solution, log workflow looks like:
.. code-block:: shell
+------------------+ 1. Get last builds info +----------------+
| |-------------------------> | |
| Logscraper | | Zuul API |
| |<------------------------- | |
+------------------+ 2. Fetch data +----------------+
|
|
+-------------------+
|
|
3. Send queue |
logs to gearman |
client |
v
+------------------+
| |
| Log gearman |
| client |
+------------------+
+-------------- --------------+
| | |
| 4. Consume queue, | |
| download log files | |
| | |
v v v
+---------------+ +----------------+ +---------------+
| Log gearman | | Log gearman | | Log gearman |
| worker | | worker | | worker |
+---------------+ +----------------+ +---------------+
| | |
| 5. Send to | |
| Logstash | |
| v |
| +----------------+ |
| | | |
+---------> | Logstash | <-------+
| |
+----------------+
|
6. Send to |
Opensearch |
|
+--------v--------+
| |
| Opensearch |
| |
+-----------------+
On the beginning, this project was designed to use that solution, but
it have a few bottlenecks:
- log gearman client can use many memory, when log gearman worker is not fast,
- one log gearman worker is not enough even on small infrastructure,
- logstash service can fail,
- logscraper is checking if log files are available, then log gearman
is downloading the logs, which can make an issue on log sever, that
host does not have free socket.
You can deploy your log workflow by using example Ansible playbook that
you can find in `ansible/playbooks/check-services.yml` in this project.
2. Logscraper, logsender
Logscraper, logsender
This workflow removes bottlenecks by removing: log gearman client,
log gearman worker and logstash service. Logs are downloaded when
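
For context, a minimal sketch of how the retained two-service workflow can be exercised on a deployed host. The binary locations come from the role templates touched in this change; the per-tenant config paths are assumptions standing in for the files the logscraper and logsender roles generate.

.. code-block:: shell

   # Fetch recent build logs to local storage (config path is a placeholder).
   /usr/local/bin/logscraper --config /etc/logscraper/logscraper-openstack.config

   # Push the downloaded logs to Opensearch (config path is a placeholder).
   /usr/local/bin/logsender --config /etc/logscraper/logsender-openstack.config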

@ -1,33 +0,0 @@
---
- hosts: all
become: true
vars:
# loggearman - worker
output_host: 0.0.0.0
output_port: 9999
gearman_host: 0.0.0.0
gearman_port: 4730
log_cert_verify: false
# loggearman - client
source_url: ""
gearman_client_host: "{{ gearman_host }}"
gearman_client_port: "{{ gearman_port }}"
# logscraper
tenant_builds:
- tenant: openstack
gearman_port: "{{ gearman_port }}"
gearman_server: "{{ gearman_host }}"
zuul_api_url:
- https://zuul.opendev.org/api/tenant/openstack
insecure: false
job_names: []
download: false
pre_tasks:
- name: Update all packages
become: true
package:
name: "*"
state: latest
roles:
- check-services
- backup-dashboards-objects

@ -1,6 +0,0 @@
---
- name: Configure log-gearman-client and log-gearman-worker tools
hosts: logscraper01.openstack.org
become: true
roles:
- loggearman

@ -26,25 +26,6 @@
podman images --noheading quay.io/logscraper:dev | awk '{print $3}'
register: _logscraper_image_id
- name: Build loggearman container image
shell: >
podman build -t quay.io/loggearman:dev -f loggearman/Dockerfile
args:
chdir: "{{ zuul.projects['opendev.org/openstack/ci-log-processing'].src_dir }}"
when: zuul is defined
- name: Build loggearman container image - non Zuul
shell: >
podman build -t quay.io/loggearman:dev -f loggearman/Dockerfile
args:
chdir: "{{ playbook_dir }}"
when: zuul is not defined
- name: Get loggearman image id
shell: |
podman images --noheading quay.io/loggearman:dev | awk '{print $3}'
register: _loggearman_image_id
- name: Print all images
shell: |
podman images
@ -53,7 +34,6 @@
set_fact:
container_images:
logscraper: "{{ _logscraper_image_id.stdout }}"
loggearman: "{{ _loggearman_image_id.stdout }}"
### OPENSEARCH ####
- name: Setup Opensearch
@ -129,23 +109,6 @@
delay: 10
timeout: 300
### Loggearman ###
- name: Setup loggearman service
include_role:
name: loggearman
# Flush handlers before running test
- name: Force all notified handlers to run now
meta: flush_handlers
### service validation ###
- name: Check if log gearman client is listening
wait_for:
host: "{{ gearman_host }}"
port: "{{ gearman_port }}"
delay: 10
timeout: 300
### Logscraper ###
- name: Setup logscraper service
include_role:
@ -156,8 +119,6 @@
systemctl is-active -q {{ item }}
loop:
- logscraper-openstack
- loggearman-client
- loggearman-worker
register: _service_status
failed_when: _service_status.rc != 0
@ -191,14 +152,6 @@
shell: |
podman logs opensearch
- name: Get gearman client logs
shell: |
podman logs loggearman-client
- name: Get gearman worker logs
shell: |
podman logs loggearman-worker
- name: Get indices to fail the test
uri:
url: "https://127.0.0.1:9200/_cat/indices"

@ -1,32 +0,0 @@
Loggearman ansible role
=======================
The goal of this role is to setup and configure service related
to `log-gearman-client` and `log-gearman-worker` scripts, that
were ported to this project repository from `puppet-log_processor repository
<https://opendev.org/opendev/puppet-log_processor/src/branch/master/files>`__.
Configuration
-------------
The role is automatically deploying services:
* log-gearman-client
* log-gearman-worker
inside the container.
Example playbook setup
----------------------
.. code-block:: yaml
- name: Configure loggearman tool
hosts: localhost
become: true
vars:
source_url: https://localhost
output_hosts: mylogstashhost.com
log_cert_verify: True
roles:
- loggearman

@ -1,31 +0,0 @@
---
loggearman_user: loggearman
loggearman_group: loggearman
loggearman_gid: 10211
loggearman_uid: 10211
loggearman_dir: /etc/loggearman
loggearman_log_dir: /var/log/loggearman
container_images:
# FIXME: Move image to dedicated repository on Docker hub.
loggearman: quay.io/software-factory/loggearman:latest
# Gearman client
gearman_client_host: 0.0.0.0
gearman_client_port: 4730
source_url: ""
zmq_publishers: []
subunit_files: []
source_files: []
# Gearman worker
gearman_host: 0.0.0.0
gearman_port: 4730
output_host: logstash.example.com
output_port: 9999
output_mode: tcp
crm114_script: ""
crm114_data: ""
log_ca_certs: ""
log_cert_verify: true

@ -1,14 +0,0 @@
---
- name: restart loggearman client
service:
name: loggearman-client
state: restarted
daemon-reload: true
enabled: true
- name: restart loggearman worker
service:
name: loggearman-worker
state: restarted
daemon-reload: true
enabled: true

@ -1,62 +0,0 @@
---
- name: Create decidated group
group:
name: "{{ loggearman_group }}"
gid: "{{ loggearman_gid }}"
state: present
- name: Create dedicated user
user:
name: "{{ loggearman_user }}"
state: present
comment: "Dedicated user for loggearman"
group: "{{ loggearman_group }}"
uid: "{{ loggearman_uid }}"
shell: "/sbin/nologin"
create_home: false
- name: Create dedicated directories
file:
path: "{{ item }}"
state: directory
owner: "{{ loggearman_user }}"
group: "{{ loggearman_group }}"
mode: "0755"
loop:
- "{{ loggearman_dir }}"
- "{{ loggearman_log_dir }}"
- name: Init log files
file:
path: "{{ loggearman_log_dir }}/{{ item }}.log"
state: touch
owner: "{{ loggearman_user }}"
group: "{{ loggearman_group }}"
mode: "0644"
loop:
- client
- worker
- name: Ensure container software is installed
package:
name: podman
state: present
- name: Create configuration files
template:
src: "{{ item }}.yml.j2"
dest: "{{ loggearman_dir }}/{{ item }}.yml"
owner: "{{ loggearman_user }}"
group: "{{ loggearman_group }}"
mode: "0644"
loop:
- client
- worker
notify:
- restart loggearman {{ item }}
- name: Configure loggearman service
include_tasks: service.yml
loop:
- client
- worker

@ -1,17 +0,0 @@
---
- name: Generate podman-loggearman-{{ item }} script
template:
src: loggearman.sh.j2
dest: "/usr/local/bin/podman-loggearman-{{ item }}"
mode: '0755'
notify:
- restart loggearman {{ item }}
- name: Generate systemd unit loggearman-{{ item }}
template:
src: loggearman.service.j2
dest: "/etc/systemd/system/loggearman-{{ item }}.service"
owner: root
group: root
notify:
- restart loggearman {{ item }}

@ -1,7 +0,0 @@
---
gearman-host: {{ gearman_client_host }}
gearman-port: {{ gearman_client_port }}
source-url: {{ source_url }}
zmq-publishers: {{ zmq_publishers }}
subunit-files: {{ subunit_files }}
source-files: {{ source_files }}

@ -1,16 +0,0 @@
[Unit]
Description=loggearman {{ item }} service
After=syslog.target network.target
StartLimitInterval=20
StartLimitBurst=5
[Service]
Type=simple
SyslogIdentifier=loggearman-{{ item }}
ExecStart=/usr/local/bin/podman-loggearman-{{ item }}
ExecStop=/usr/bin/podman stop loggearman-{{ item }}
Restart=always
RestartSec=5s
[Install]
WantedBy=multi-user.target

@ -1,17 +0,0 @@
#!/bin/bash
# MANAGED BY ANSIBLE
/usr/bin/podman run \
--network host \
--rm \
--user 1000:1000 \
--uidmap 0:{{ loggearman_uid + 1 }}:999 \
--uidmap 1000:{{ loggearman_uid }}:1 \
--name loggearman-{{ item }} \
--volume {{ loggearman_dir }}:{{ loggearman_dir }}:z \
--volume {{ loggearman_log_dir }}:{{ loggearman_log_dir }}:z \
{{ container_images['loggearman'] }} \
log-gearman-{{ item }} \
-c {{ loggearman_dir }}/{{ item }}.yml \
--foreground \
-d {{ loggearman_log_dir }}/{{ item }}.log

@ -1,16 +0,0 @@
---
gearman-host: {{ gearman_host }}
gearman-port: {{ gearman_port }}
output-host: {{ output_host }}
output-port: {{ output_port }}
output-mode: {{ output_mode }}
log-cert-verify: {{ log_cert_verify }}
{% if crm114_script %}
crm114-script: {{ crm114_script }}
{% endif %}
{% if crm114_data %}
crm114-data: {{ crm114_data }}
{% endif %}
{% if log_ca_certs %}
log-ca-certs: {{ log_ca_certs }}
{% endif %}

@ -2,8 +2,8 @@ Logscraper ansible role
=======================
The goal of this role is to setup and configure service related
to logscraper script which is responsible to to push recent
zuul builds into log gearman processor.
to logscraper script, which is responsible for pulling the latest Zuul CI job
logs to local storage.
Requirements
------------
@ -23,8 +23,6 @@ for example:
vars:
tenant_builds:
- tenant: openstack
gearman_port: 4731
gearman_server: logstash.openstack.org
zuul_api_url:
- https://zuul.opendev.org/api/tenant/openstack
insecure: false
@ -57,8 +55,6 @@ and second one for getting logs from `sometenant` tenant.
vars:
tenant_builds:
- tenant: openstack
gearman_port: 4731
gearman_server: logstash.openstack.org
zuul_api_url:
- https://zuul.opendev.org/api/tenant/openstack
insecure: False
@ -66,7 +62,6 @@ and second one for getting logs from `sometenant` tenant.
zuul_api_url:
- https://zuul.opendev.org/api/tenant/sometenant
insecure: True
download: true
download_dir: /mnt/logscraper
file_list:
- /etc/logscraper/my-downloadlist.yaml
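
After deploying the role with a `tenant_builds` entry like the one above, a quick sanity check is to confirm the per-tenant unit is running and to inspect its output. A hedged sketch, assuming the tenant is named `openstack` (the `logscraper-openstack` unit name matches the service check in the check-services test tasks shown earlier in this change):

.. code-block:: shell

   # Verify the systemd unit created by the role is active.
   systemctl is-active -q logscraper-openstack && echo "logscraper-openstack is running"

   # Inspect recent service output.
   journalctl -u logscraper-openstack --since "10 minutes ago"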

@ -12,8 +12,6 @@ container_images:
# Example:
# tenant_builds:
# - tenant: openstack
# gearman_port: 4731
# gearman_server: logstash.openstack.org
# zuul_api_url:
# - https://zuul.opendev.org/api/tenant/openstack
# insecure: false
@ -22,13 +20,11 @@ container_images:
# zuul_api_url:
# - https://zuul.opendev.org/api/tenant/sometenant
# insecure: true
# download: true
# download_dir: /mnt/logscraper/sometenant
# file_list: []
# job_name:
# - test
# - test_new
# logstash_url: https://somelogstash.com:9999
# max_skipped: 100
# debug: true
# logscraper_wait_time: 120

@ -35,7 +35,7 @@
dest: "{{ logscraper_dir }}/download-list-{{ item.tenant }}.yaml"
owner: "{{ logscraper_user }}"
group: "{{ logscraper_group }}"
mode: "0640"
mode: "0644"
register: _download_file
- name: Generate systemd unit

@ -18,4 +18,4 @@
--volume {{ item.download_dir }}:{{ item.download_dir }}:z \
{% endif %}
{{ container_images['logscraper'] }} \
/usr/bin/logscraper --config {{ logscraper_dir }}/logscraper-{{ item['tenant'] }}.config
/usr/local/bin/logscraper --config {{ logscraper_dir }}/logscraper-{{ item['tenant'] }}.config

@ -16,4 +16,4 @@
--volume {{ item['logsender_custom_ca_crt'] }}:{{ item['logsender_custom_ca_crt'] }}:z \
{% endif %}
{{ container_images['logsender'] }} \
/usr/bin/logsender --config {{ logscraper_dir }}/logsender-{{ item['tenant'] }}.config
/usr/local/bin/logsender --config {{ logscraper_dir }}/logsender-{{ item['tenant'] }}.config

@ -24,8 +24,6 @@ Indices and tables
* :doc:`logscraper-role`
* :doc:`logsender`
* :doc:`logsender-role`
* :doc:`loggearman`
* :doc:`loggearman-role`
.. toctree::
:maxdepth: 2
@ -36,5 +34,3 @@ Indices and tables
logscraper-role
logsender
logsender-role
loggearman
loggearman-role

@ -1 +0,0 @@
../../ansible/roles/loggearman/README.rst

@ -1,22 +0,0 @@
Loggearman
==========
The Loggearman tools are responsible for listening events,
parse them, get logs from log server and push them to
the Logstash service.
Loggearman Client
-----------------
The Loggearman Client is responsible for listening events that
comes on port 4730 (by default), parse them and redirect them to
German server, that later will be processed by loggearman worker.
Loggearman Worker
-----------------
The Loggearman Worker is responsible to get log files from the
log server, parse them and send line by line to the Logstash service
with necessary fields like: build_uuid, build_name, etc.

@ -11,7 +11,7 @@ It is available by typing:
logscraper --help
Fetch and push last Zuul CI job logs into gearman.
Fetch last Zuul CI job logs:
optional arguments:
-h, --help show this help message and exit
@ -20,25 +20,16 @@ It is available by typing:
times.
--job-name JOB_NAME CI job name(s). Parameter can be set multiple times.
If not set it would scrape every latest builds.
--gearman-server GEARMAN_SERVER
Gearman host addresss
--gearman-port GEARMAN_PORT
Gearman listen port. Defaults to 4730.
--follow Keep polling zuul builds
--insecure Skip validating SSL cert
--checkpoint-file CHECKPOINT_FILE
File that will keep information about last uuid
timestamp for a job.
--logstash-url LOGSTASH_URL
When provided, script will check connection to
Logstash service before sending to log processing
system. For example: logstash.local:9999
--workers WORKERS Worker processes for logscraper
--max-skipped MAX_SKIPPED
How many job results should be checked until last uuid
written in checkpoint file is founded
--debug Print more information
--download Download logs and do not send to gearman service
--directory DIRECTORY
Directory, where the logs will be stored. Defaults to:
/tmp/logscraper
@ -51,30 +42,6 @@ Base on the use case, we can run logscraper.
Example:
* periodical check if there are some new logs for `openstack` tenant:
.. code-block::
logscraper --gearman-server somehost --zuul-api-url https://zuul.opendev.org/api/tenant/openstack --checkpoint-file /tmp/results-checkpoint --follow
* one shot on getting logs from `zuul` tenant:
.. code-block::
logscraper --gearman-server localhost --zuul-api-url https://zuul.opendev.org/api/tenant/zuul --checkpoint-file /tmp/zuul-result-timestamp
* periodically scrape logs from tenants: `openstack`, `zuul` and `local`
.. code-block::
logscraper --gearman-server localhost --zuul-api-url https://zuul.opendev.org/api/tenant/openstack --zuul-api-url https://zuul.opendev.org/api/tenant/zuul --zuul-api-url https://zuul.opendev.org/api/tenant/local --checkpoint-file /tmp/someresults --follow
* scrape logs from two defined job names: `tripleo-ci-centos-8-containers-multinode` and `openstack-tox-linters` for tenants: `openstack` and `local`:
.. code-block::
logscraper --gearman-server localhost --job-name tripleo-ci-centos-8-containers-multinode --job-name openstack-tox-linters --zuul-api-url https://zuul.opendev.org/api/tenant/openstack --zuul-api-url https://zuul.opendev.org/api/tenant/local
* download logs to /mnt/logscraper. NOTE: if you are using container service, this directory needs to be mounted!
.. code-block::
@ -97,13 +64,6 @@ Then you can execute commands that are described above.
NOTE: if you want to use parameter `--checkpoint-file`, you need to mount a volume
to the container, for example:
.. code-block::
docker run -v $(pwd):/checkpoint-dir:z -d logscraper logscraper --gearman-server somehost --zuul-api-url https://zuul.opendev.org/api/tenant/openstack --checkpoint-file /checkpoint-dir/checkpoint --follow
In this example, logscraper will download log files to the /mnt/logscraper directory:
.. code-block::
docker run -v $(pwd):/checkpoint-dir:z -v /mnt/logscraper:/mnt/logscraper:z -d logscraper logscraper --zuul-api-url https://zuul.opendev.org/api/tenant/openstack --checkpoint-file /checkpoint-dir/checkpoint --directory /mnt/logscraper --download --follow
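
With the gearman and logstash options removed, a typical invocation now needs only the config file plus the scraping options that remain in the help output above. A hedged sketch (paths and the tenant URL are placeholders; `--config` is required by the current argument parser):

.. code-block:: shell

   logscraper \
       --config /etc/logscraper/logscraper.config \
       --zuul-api-url https://zuul.opendev.org/api/tenant/openstack \
       --checkpoint-file /tmp/results-checkpoint \
       --directory /mnt/logscraper \
       --follow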

@ -1,41 +0,0 @@
# Copyright (C) 2021 Red Hat
# Copyright (C) 2022 Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
FROM quay.io/centos/centos:stream8 as loggearman
ENV OSLO_PACKAGE_VERSION='0.0.1'
ENV PATH=~/.local/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin
ENV LANG=en_US.UTF-8
RUN groupadd --gid 1000 loggearman && \
useradd --home-dir /home/loggearman --gid 1000 --uid 1000 loggearman
RUN dnf update -y && \
dnf install -y python38 python38-setuptools \
python38-devel python38-wheel \
python38-pip git
COPY . /tmp/src
RUN cd /tmp/src && \
pip3 install -r requirements.txt && \
python3 setup.py install && \
rm -rf /tmp/src
RUN dnf remove -y python3-devel git && \
dnf autoremove -y && \
dnf clean all && \
rm -rf ~/.cache/pip
USER loggearman

@ -1,8 +0,0 @@
OpenStack Log Processor Module
==============================
The Log Processor Module comes from `repository`_ with
applied patches from this `patchset`_.
.. _repository: https://opendev.org/opendev/puppet-log_processor
.. _patchset: https://review.opendev.org/c/opendev/puppet-log_processor/+/809424

@ -1,246 +0,0 @@
#!/usr/bin/env python3
#
# Copyright 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import argparse
import daemon
import gear
import json
import logging
import os
import os.path
import re
import signal
import socket
import threading
import time
import yaml
import zmq
try:
import daemon.pidlockfile as pidfile_mod
except ImportError:
import daemon.pidfile as pidfile_mod
class EventProcessor(threading.Thread):
def __init__(self, zmq_address, gearman_client, files, source_url):
threading.Thread.__init__(self)
self.files = files
self.source_url = source_url
self.gearman_client = gearman_client
self.zmq_address = zmq_address
self._connect_zmq()
def run(self):
while True:
try:
self._read_event()
except Exception:
# Assume that an error reading data from zmq or deserializing
# data received from zmq indicates a zmq error and reconnect.
logging.exception("ZMQ exception.")
self._connect_zmq()
def _connect_zmq(self):
logging.debug("Connecting to zmq endpoint.")
self.context = zmq.Context()
self.socket = self.context.socket(zmq.SUB)
event_filter = b"onFinalized"
self.socket.setsockopt(zmq.SUBSCRIBE, event_filter)
self.socket.connect(self.zmq_address)
def _read_event(self):
string = self.socket.recv().decode('utf-8')
event = json.loads(string.split(None, 1)[1])
logging.debug("Jenkins event received: " + json.dumps(event))
for fileopts in self.files:
output = {}
source_url, out_event = self._parse_event(event, fileopts)
job_filter = fileopts.get('job-filter')
if (job_filter and not re.match(
job_filter, out_event['fields']['build_name'])):
continue
build_queue_filter = fileopts.get('build-queue-filter')
if (build_queue_filter and not re.match(
build_queue_filter, out_event['fields']['build_queue'])):
continue
project_filter = fileopts.get('project-filter')
if (project_filter and not re.match(
project_filter, out_event['fields']['project'])):
continue
output['source_url'] = source_url
output['retry'] = fileopts.get('retry-get', False)
output['event'] = out_event
if 'subunit' in fileopts.get('name'):
job = gear.Job(b'push-subunit',
json.dumps(output).encode('utf8'))
else:
job = gear.Job(b'push-log', json.dumps(output).encode('utf8'))
try:
self.gearman_client.submitJob(job)
except Exception:
logging.exception("Exception submitting job to Gearman.")
def _get_log_dir(self, event):
parameters = event["build"].get("parameters", {})
base = parameters.get('LOG_PATH', 'UNKNOWN')
return base
def _parse_fields(self, event, filename):
fields = {}
fields["filename"] = filename
fields["build_name"] = event.get("name", "UNKNOWN")
fields["build_status"] = event["build"].get("status", "UNKNOWN")
fields["build_node"] = event["build"].get("node_name", "UNKNOWN")
fields["build_master"] = event["build"].get("host_name", "UNKNOWN")
parameters = event["build"].get("parameters", {})
fields["project"] = parameters.get("ZUUL_PROJECT", "UNKNOWN")
# The voting value is "1" for voting, "0" for non-voting
fields["voting"] = parameters.get("ZUUL_VOTING", "UNKNOWN")
# TODO(clarkb) can we do better without duplicated data here?
fields["build_uuid"] = parameters.get("ZUUL_UUID", "UNKNOWN")
fields["build_short_uuid"] = fields["build_uuid"][:7]
fields["build_queue"] = parameters.get("ZUUL_PIPELINE", "UNKNOWN")
fields["build_ref"] = parameters.get("ZUUL_REF", "UNKNOWN")
fields["build_branch"] = parameters.get("ZUUL_BRANCH", "UNKNOWN")
fields["build_zuul_url"] = parameters.get("ZUUL_URL", "UNKNOWN")
if parameters.get("ZUUL_CHANGE"):
fields["build_change"] = parameters.get("ZUUL_CHANGE", "UNKNOWN")
fields["build_patchset"] = parameters.get("ZUUL_PATCHSET",
"UNKNOWN")
elif parameters.get("ZUUL_NEWREV"):
fields["build_newrev"] = parameters.get("ZUUL_NEWREV",
"UNKNOWN")
if ["build_node"] != "UNKNOWN":
node_provider = '-'.join(
fields["build_node"].split('-')[-3:-1])
fields["node_provider"] = node_provider or "UNKNOWN"
else:
fields["node_provider"] = "UNKNOWN"
return fields
def _parse_event(self, event, fileopts):
fields = self._parse_fields(event, fileopts['name'])
log_dir = self._get_log_dir(event)
source_url = fileopts.get('source-url', self.source_url) + '/' + \
os.path.join(log_dir, fileopts['name'])
fields["log_url"] = source_url
out_event = {}
out_event["fields"] = fields
out_event["tags"] = [os.path.basename(fileopts['name'])] + \
fileopts.get('tags', [])
return source_url, out_event
class Server(object):
def __init__(self, config, debuglog):
# Config init.
self.config = config
self.source_url = self.config['source-url']
# Pythong logging output file.
self.debuglog = debuglog
self.processors = []
def setup_logging(self):
if self.debuglog:
logging.basicConfig(format='%(asctime)s %(message)s',
filename=self.debuglog, level=logging.DEBUG)
else:
# Prevent leakage into the logstash log stream.
logging.basicConfig(level=logging.CRITICAL)
logging.debug("Log pusher starting.")
def setup_processors(self):
for publisher in self.config['zmq-publishers']:
gearclient = gear.Client()
host = self.config.get('gearman-host', 'localhost')
port = self.config.get('gearman-port', 4730)
gearclient.addServer(host, port=port)
gearclient.waitForServer()
log_processor = EventProcessor(
publisher, gearclient,
self.config['source-files'], self.source_url)
subunit_processor = EventProcessor(
publisher, gearclient,
self.config['subunit-files'], self.source_url)
self.processors.append(log_processor)
self.processors.append(subunit_processor)
def wait_for_name_resolution(self, host, port):
while True:
try:
socket.getaddrinfo(host, port)
except socket.gaierror as e:
if e.errno == socket.EAI_AGAIN:
logging.debug("Temporary failure in name resolution")
time.sleep(2)
continue
else:
raise
break
def main(self):
statsd_host = os.environ.get('STATSD_HOST')
statsd_port = int(os.environ.get('STATSD_PORT', 8125))
statsd_prefix = os.environ.get('STATSD_PREFIX', 'logstash.geard')
if statsd_host:
self.wait_for_name_resolution(statsd_host, statsd_port)
self.gearserver = gear.Server(
port=self.config.get('gearman-port', 4730),
statsd_host=statsd_host,
statsd_port=statsd_port,
statsd_prefix=statsd_prefix)
self.setup_processors()
for processor in self.processors:
processor.daemon = True
processor.start()
while True:
signal.pause()
def main():
parser = argparse.ArgumentParser()
parser.add_argument("-c", "--config", required=True,
help="Path to yaml config file.")
parser.add_argument("-d", "--debuglog",
help="Enable debug log. "
"Specifies file to write log to.")
parser.add_argument("--foreground", action='store_true',
help="Run in the foreground.")
parser.add_argument("-p", "--pidfile",
default="/var/run/jenkins-log-pusher/"
"jenkins-log-gearman-client.pid",
help="PID file to lock during daemonization.")
args = parser.parse_args()
with open(args.config, 'r') as config_stream:
config = yaml.safe_load(config_stream)
server = Server(config, args.debuglog)
if args.foreground:
server.setup_logging()
server.main()
else:
pidfile = pidfile_mod.TimeoutPIDLockFile(args.pidfile, 10)
with daemon.DaemonContext(pidfile=pidfile):
server.setup_logging()
server.main()
if __name__ == '__main__':
main()

@ -1,566 +0,0 @@
#!/usr/bin/env python3
#
# Copyright 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import argparse
import daemon
import gear
import json
import logging
import os
import queue
import re
import requests
import select
import socket
import subprocess
import sys
import threading
import time
import yaml
import paho.mqtt.publish as publish
try:
import daemon.pidlockfile as pidfile_mod
except ImportError:
import daemon.pidfile as pidfile_mod
def semi_busy_wait(seconds):
# time.sleep() may return early. If it does sleep() again and repeat
# until at least the number of seconds specified has elapsed.
start_time = time.time()
while True:
time.sleep(seconds)
cur_time = time.time()
seconds = seconds - (cur_time - start_time)
if seconds <= 0.0:
return
class FilterException(Exception):
pass
class CRM114Filter(object):
def __init__(self, script, path, build_status):
self.p = None
self.script = script
self.path = path
self.build_status = build_status
if build_status not in ['SUCCESS', 'FAILURE']:
return
if not os.path.exists(path):
os.makedirs(path)
args = [script, path, build_status]
self.p = subprocess.Popen(args,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
stdin=subprocess.PIPE,
close_fds=True)
def process(self, data):
if not self.p:
return True
self.p.stdin.write(data['message'].encode('utf-8') + '\n')
(r, w, x) = select.select([self.p.stdout], [],
[self.p.stdin, self.p.stdout], 20)
if not r:
self.p.kill()
raise FilterException('Timeout reading from CRM114')
r = self.p.stdout.readline()
if not r:
err = self.p.stderr.read()
if err:
raise FilterException(err)
else:
raise FilterException('Early EOF from CRM114')
r = r.strip()
data['error_pr'] = float(r)
return True
def _catchOSError(self, method):
try:
method()
except OSError:
logging.exception("Subprocess cleanup failed.")
def close(self):
if not self.p:
return
# CRM114 should die when its stdinput is closed. Close that
# fd along with stdout and stderr then return.
self._catchOSError(self.p.stdin.close)
self._catchOSError(self.p.stdout.close)
self._catchOSError(self.p.stderr.close)
self._catchOSError(self.p.wait)
class CRM114FilterFactory(object):
name = "CRM114"
def __init__(self, script, basepath):
self.script = script
self.basepath = basepath
# Precompile regexes
self.re_remove_suffix = re.compile(r'(\.[^a-zA-Z]+)?(\.gz)?$')
self.re_remove_dot = re.compile(r'\.')
def create(self, fields):
# We only want the basename so that the same logfile at different
# paths isn't treated as different
filename = os.path.basename(fields['filename'])
# We want to collapse any numeric or compression suffixes so that
# nova.log and nova.log.1 and nova.log.1.gz are treated as the same
# logical file
filename = self.re_remove_suffix.sub(r'', filename)
filename = self.re_remove_dot.sub('_', filename)
path = os.path.join(self.basepath, filename)
return CRM114Filter(self.script, path, fields['build_status'])
class OsloSeverityFilter(object):
DATEFMT = r'\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}((\.|\,)\d{3,6})?'
SEVERITYFMT = '(DEBUG|INFO|WARNING|ERROR|TRACE|AUDIT|CRITICAL)'
OSLO_LOGMATCH = (r'^(?P<date>%s)(?P<line>(?P<pid> \d+)? '
'(?P<severity>%s).*)' %
(DATEFMT, SEVERITYFMT))
OSLORE = re.compile(OSLO_LOGMATCH)
def process(self, data):
msg = data['message']
m = self.OSLORE.match(msg)
if m:
data['severity'] = m.group('severity')
if data['severity'].lower == 'debug':
# Ignore debug-level lines
return False
return True
def close(self):
pass
class OsloSeverityFilterFactory(object):
name = "OsloSeverity"
def create(self, fields):
return OsloSeverityFilter()
class SystemdSeverityFilter(object):
'''Match systemd DEBUG level logs
A line to match looks like:
Aug 15 18:58:49.910786 hostname devstack@keystone.service[31400]:
DEBUG uwsgi ...
'''
SYSTEMDDATE = r'\w+\s+\d+\s+\d{2}:\d{2}:\d{2}((\.|\,)\d{3,6})?'
DATEFMT = r'\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}((\.|\,)\d{3,6})?'
SEVERITYFMT = '(DEBUG|INFO|WARNING|ERROR|TRACE|AUDIT|CRITICAL)'
SYSTEMD_LOGMATCH = r'^(?P<date>%s)( (\S+) \S+\[\d+\]\: ' \
'(?P<severity>%s)?.*)' % (SYSTEMDDATE, SEVERITYFMT)
SYSTEMDRE = re.compile(SYSTEMD_LOGMATCH)
def process(self, data):
msg = data['message']
m = self.SYSTEMDRE.match(msg)
if m:
if m.group('severity') == 'DEBUG':
return False
return True
def close(self):
pass
class SystemdSeverityFilterFactory(object):
name = "SystemdSeverity"
def create(self, fields):
return SystemdSeverityFilter()
class LogRetriever(threading.Thread):
def __init__(self, gearman_worker, filters, logq,
log_cert_verify, log_ca_certs, mqtt=None):
threading.Thread.__init__(self)
self.gearman_worker = gearman_worker
self.filters = filters
self.logq = logq
self.mqtt = mqtt
self.log_cert_verify = log_cert_verify
self.log_ca_certs = log_ca_certs
def run(self):
while True:
try:
self._handle_event()
except Exception:
logging.exception("Exception retrieving log event.")
def _handle_event(self):
fields = {}
num_log_lines = 0
source_url = ''
http_session = None
job = self.gearman_worker.getJob()
try:
arguments = json.loads(job.arguments.decode('utf-8'))
source_url = arguments['source_url']
event = arguments['event']
logging.debug("Handling event: " + json.dumps(event))
fields = event.get('fields') or event.get('@fields')
tags = event.get('tags') or event.get('@tags')
if fields['build_status'] != 'ABORTED':
# Handle events ignoring aborted builds. These builds are
# discarded by zuul.
file_obj, http_session = self._open_log_file_url(source_url)
try:
all_filters = []
for f in self.filters:
logging.debug("Adding filter: %s" % f.name)
all_filters.append(f.create(fields))
filters = all_filters
base_event = {}
base_event.update(fields)
base_event["tags"] = tags
for line in self._retrieve_log_line(file_obj):
keep_line = True
out_event = base_event.copy()
out_event["message"] = line
new_filters = []
for f in filters:
if not keep_line:
new_filters.append(f)
continue
try:
keep_line = f.process(out_event)
new_filters.append(f)
except FilterException:
logging.exception("Exception filtering event: "
"%s" % line.encode("utf-8"))
filters = new_filters
if keep_line:
self.logq.put(out_event)
num_log_lines += 1
logging.debug("Pushed %s log lines." % num_log_lines)
finally:
for f in all_filters:
f.close()
if http_session:
http_session.close()
job.sendWorkComplete()
# Only send mqtt events for log files we processed.
if self.mqtt and num_log_lines:
msg = json.dumps({
'build_uuid': fields.get('build_uuid'),
'source_url': source_url,
'status': 'success',
})
self.mqtt.publish_single(msg, fields.get('project'),
fields.get('build_change'),
'retrieve_logs',
fields.get('build_queue'))
except Exception as e:
logging.exception("Exception handling log event.")
job.sendWorkException(str(e).encode('utf-8'))
if self.mqtt:
msg = json.dumps({
'build_uuid': fields.get('build_uuid'),
'source_url': source_url,
'status': 'failure',
})
self.mqtt.publish_single(msg, fields.get('project'),
fields.get('build_change'),
'retrieve_logs',
fields.get('build_queue'))
def _retrieve_log_line(self, file_obj, chunk_size=4096):
if not file_obj:
return
# Response.iter_lines automatically decodes 'gzip' and 'deflate'
# encodings.
# https://requests.readthedocs.io/en/master/user/quickstart/#raw-response-content
for line in file_obj.iter_lines(chunk_size, decode_unicode=True):
yield line
def _open_log_file_url(self, source_url):
file_obj = None
kwargs = {}
if self.log_cert_verify and self.log_ca_certs:
kwargs['verify'] = self.log_ca_certs
elif not self.log_cert_verify:
kwargs['verify'] = self.log_cert_verify
try:
logging.debug("Retrieving: " + source_url)
# Use a session to persist the HTTP connection across requests
# while downloading chunks of the log file.
session = requests.Session()
session.headers = {'Accept-encoding': 'deflate, gzip'}
file_obj = session.get(source_url, stream=True, **kwargs)
file_obj.raise_for_status()
except requests.HTTPError as e:
if e.response.status_code == 404:
logging.info("Unable to retrieve %s: HTTP error 404" %
source_url)
else:
logging.exception("Unable to get log data.")
except Exception:
# Silently drop fatal errors when retrieving logs.
# TODO(clarkb): Handle these errors.
# Perhaps simply add a log message to file_obj?
logging.exception("Unable to retrieve source file.")
raise
return file_obj, session
class StdOutLogProcessor(object):
def __init__(self, logq, pretty_print=False):
self.logq = logq
self.pretty_print = pretty_print
def handle_log_event(self):
log = self.logq.get()
if self.pretty_print:
print(json.dumps(log, sort_keys=True,
indent=4, separators=(',', ': ')))
else:
print(json.dumps(log))
# Push each log event through to keep logstash up to date.
sys.stdout.flush()
class INETLogProcessor(object):
socket_type = None
def __init__(self, logq, host, port):
self.logq = logq
self.host = host
self.port = port
self.socket = None
def _connect_socket(self):
logging.debug("Creating socket.")
self.socket = socket.socket(socket.AF_INET, self.socket_type)
self.socket.connect((self.host, self.port))
def handle_log_event(self):
log = self.logq.get()
try:
if self.socket is None:
self._connect_socket()
self.socket.sendall((json.dumps(log) + '\n').encode('utf-8'))
except Exception:
logging.exception("Exception sending INET event.")
# Logstash seems to take about a minute to start again. Wait 90
# seconds before attempting to reconnect. If logstash is not
# available after 90 seconds we will throw another exception and
# die.
semi_busy_wait(90)
self._connect_socket()
self.socket.sendall((json.dumps(log) + '\n').encode('utf-8'))
class UDPLogProcessor(INETLogProcessor):
socket_type = socket.SOCK_DGRAM
class TCPLogProcessor(INETLogProcessor):
socket_type = socket.SOCK_STREAM
class PushMQTT(object):
def __init__(self, hostname, base_topic, port=1883, client_id=None,
keepalive=60, will=None, auth=None, tls=None, qos=0):
self.hostname = hostname
self.port = port
self.client_id = client_id
self.keepalive = 60
self.will = will
self.auth = auth
self.tls = tls
self.qos = qos
self.base_topic = base_topic
def _generate_topic(self, project, job_id, action):
return '/'.join([self.base_topic, project, job_id, action])
def publish_single(self, msg, project, job_id, action, build_queue=None):
if job_id:
topic = self._generate_topic(project, job_id, action)
elif build_queue:
topic = self._generate_topic(project, build_queue, action)
else:
topic = self.base_topic + '/' + project
publish.single(topic, msg, hostname=self.hostname,
port=self.port, client_id=self.client_id,
keepalive=self.keepalive, will=self.will,
auth=self.auth, tls=self.tls, qos=self.qos)
class Server(object):
def __init__(self, config, debuglog):
# Config init.
self.config = config
self.gearman_host = self.config['gearman-host']
self.gearman_port = self.config['gearman-port']
self.output_host = self.config['output-host']
self.output_port = self.config['output-port']
self.output_mode = self.config['output-mode']
mqtt_host = self.config.get('mqtt-host')
mqtt_port = self.config.get('mqtt-port', 1883)
mqtt_user = self.config.get('mqtt-user')
mqtt_pass = self.config.get('mqtt-pass')
mqtt_topic = self.config.get('mqtt-topic', 'gearman-subunit')
mqtt_ca_certs = self.config.get('mqtt-ca-certs')
mqtt_certfile = self.config.get('mqtt-certfile')
mqtt_keyfile = self.config.get('mqtt-keyfile')
self.log_ca_certs = self.config.get('log-ca-certs')
self.log_cert_verify = self.config.get('log-cert-verify', True)
# Pythong logging output file.
self.debuglog = debuglog
self.retriever = None
self.logqueue = queue.Queue(16384)
self.processor = None
self.filter_factories = []
# Run the severity filter first so it can filter out chatty
# logs.
self.filter_factories.append(OsloSeverityFilterFactory())
self.filter_factories.append(SystemdSeverityFilterFactory())
crmscript = self.config.get('crm114-script')
crmdata = self.config.get('crm114-data')
if crmscript and crmdata:
self.filter_factories.append(
CRM114FilterFactory(crmscript, crmdata))
# Setup MQTT
self.mqtt = None
if mqtt_host:
auth = None
if mqtt_user:
auth = {'username': mqtt_user}
if mqtt_pass:
auth['password'] = mqtt_pass
tls = None
if mqtt_ca_certs:
tls = {'ca_certs': mqtt_ca_certs,
'certfile': mqtt_certfile,
'keyfile': mqtt_keyfile}
self.mqtt = PushMQTT(mqtt_host, mqtt_topic, port=mqtt_port,
auth=auth, tls=tls)
def setup_logging(self):
if self.debuglog:
logging.basicConfig(format='%(asctime)s %(message)s',
filename=self.debuglog, level=logging.DEBUG)
else:
# Prevent leakage into the logstash log stream.
logging.basicConfig(level=logging.CRITICAL)
logging.debug("Log pusher starting.")
def wait_for_name_resolution(self, host, port):
while True:
try:
socket.getaddrinfo(host, port)
except socket.gaierror as e:
if e.errno == socket.EAI_AGAIN:
logging.debug("Temporary failure in name resolution")
time.sleep(2)
continue
else:
raise
break
def setup_retriever(self):
hostname = socket.gethostname()
gearman_worker = gear.Worker(hostname + '-pusher')
self.wait_for_name_resolution(self.gearman_host, self.gearman_port)
gearman_worker.addServer(self.gearman_host,
self.gearman_port)
gearman_worker.registerFunction(b'push-log')
self.retriever = LogRetriever(gearman_worker, self.filter_factories,
self.logqueue, self.log_cert_verify,
self.log_ca_certs, mqtt=self.mqtt)
def setup_processor(self):
if self.output_mode == "tcp":
self.processor = TCPLogProcessor(self.logqueue,
self.output_host,
self.output_port)
elif self.output_mode == "udp":
self.processor = UDPLogProcessor(self.logqueue,
self.output_host,
self.output_port)
else:
# Note this processor will not work if the process is run as a
# daemon. You must use the --foreground option.
self.processor = StdOutLogProcessor(self.logqueue)
def main(self):
self.setup_retriever()
self.setup_processor()
self.retriever.daemon = True
self.retriever.start()
while True:
try:
self.processor.handle_log_event()
except Exception:
logging.exception("Exception processing log event.")
raise
def main():
parser = argparse.ArgumentParser()
parser.add_argument("-c", "--config", required=True,
help="Path to yaml config file.")
parser.add_argument("-d", "--debuglog",
help="Enable debug log. "
"Specifies file to write log to.")
parser.add_argument("--foreground", action='store_true',
help="Run in the foreground.")
parser.add_argument("-p", "--pidfile",
default="/var/run/jenkins-log-pusher/"
"jenkins-log-gearman-worker.pid",
help="PID file to lock during daemonization.")
args = parser.parse_args()
with open(args.config, 'r') as config_stream:
config = yaml.safe_load(config_stream)
server = Server(config, args.debuglog)
if args.foreground:
server.setup_logging()
server.main()
else:
pidfile = pidfile_mod.TimeoutPIDLockFile(args.pidfile, 10)
with daemon.DaemonContext(pidfile=pidfile):
server.setup_logging()
server.main()
if __name__ == '__main__':
main()

@ -1,6 +0,0 @@
pbr>=1.6 # Apache-2.0
gear<=0.16.0
requests<=2.26.0 # Apache-2.0
PyYAML<=6.0 # MIT
pyzmq<=21.0.2
paho-mqtt<=1.6.1

@ -1,26 +0,0 @@
[metadata]
name = loggearman
summary = OpenStack Log Processor Module
description_file =
README.rst
author = Openstack Contributors
author_email = service-discuss@lists.opendev.org
home_page = http://docs.openstack.org/infra/ci-log-processing
classifier =
Environment :: OpenStack
Intended Audience :: Information Technology
Intended Audience :: System Administrators
License :: OSI Approved :: Apache Software License
Operating System :: POSIX :: Linux
Programming Language :: Python
[build_sphinx]
all_files = 1
build-dir = doc/build
source-dir = doc/source
warning-is-error = 1
[entry_points]
console_scripts =
log-gearman-client = loggearman.client:main
log-gearman-worker = loggearman.worker:main

@ -1,20 +0,0 @@
# Copyright (C) 2021 Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import setuptools
setuptools.setup(
setup_requires=['pbr'],
pbr=True
)

@ -17,6 +17,4 @@ monitoring_port: 9128
######################
# DEPRECATED OPTIONS #
######################
# gearman_server: localhost
# gearman_port: 4730
# logstash_url: https://localhost:9600

@ -15,9 +15,11 @@
# under the License.
"""
The goal is to push recent zuul builds into log gearman processor.
The goal is to fetch recent Zuul CI job logs to local disk.
Below short view:
[ CLI ] -> [ Config ] -> [ ZuulFetcher ] -> [ LogPublisher ]
(logscraper) -> (logsender)
# Zuul builds results are not sorted by end time. Here is a problematic
@ -61,14 +63,11 @@ end_time.
import argparse
import configparser
import datetime
import gear
import itertools
import json
import logging
import multiprocessing
import os
import requests
import socket
import sqlite3
import sys
import tenacity
@ -129,8 +128,8 @@ def _verify_ca(args):
# CLI #
###############################################################################
def get_arguments():
parser = argparse.ArgumentParser(description="Fetch and push last Zuul "
"CI job logs into gearman.")
parser = argparse.ArgumentParser(description="Fetch last Zuul CI job "
"logs.")
parser.add_argument("--config", help="Logscraper config file",
required=True)
parser.add_argument("--file-list", help="File list to download. Parameter "
@ -141,19 +140,12 @@ def get_arguments():
parser.add_argument("--job-name", help="CI job name(s). Parameter can be "
"set multiple times. If not set it would scrape "
"every latest builds", nargs='+', default=[])
parser.add_argument("--gearman-server", help="Gearman host address")
parser.add_argument("--gearman-port", help="Gearman listen port.",
type=int)
parser.add_argument("--follow", help="Keep polling zuul builds",
action="store_true")
parser.add_argument("--insecure", help="Skip validating SSL cert",
action="store_true")
parser.add_argument("--checkpoint-file", help="File that will keep "
"information about last uuid timestamp for a job.")
parser.add_argument("--logstash-url", help="When provided, script will "
"check connection to Logstash service before sending "
"to log processing system. For example: "
"logstash.local:9999")
parser.add_argument("--workers", help="Worker processes for logscraper",
type=int)
parser.add_argument("--max-skipped", help="How many job results should be "
@ -162,9 +154,6 @@ def get_arguments():
type=int)
parser.add_argument("--debug", help="Print more information",
action="store_true")
parser.add_argument("--download", help="Download logs and do not send "
"to gearman service",
action="store_true")
parser.add_argument("--directory", help="Directory, where the logs will "
"be stored.")
parser.add_argument("--wait-time", help="Pause time for the next "
@ -317,24 +306,11 @@ class Monitoring:
###############################################################################
class LogMatcher(object):
def __init__(self, server, port, success, log_url, host_vars, config):
self.client = gear.Client()
self.client.addServer(server, port)
self.hosts = host_vars
self.success = success
self.log_url = log_url
self.config_file = config
def submitJobs(self, jobname, files, result):
self.client.waitForServer(90)
ret = []
for f in files:
output = self.makeOutput(f, result)
output = json.dumps(output).encode("utf8")
job = gear.TextJob(jobname, output)
self.client.submitJob(job, background=True)
ret.append(dict(handle=job.handle, arguments=output))
return ret
def makeOutput(self, file_object, result):
output = {}
output["retry"] = False
@ -680,7 +656,6 @@ def run_build(build):
"""
args = build.get("build_args")
config_file = build.get("config_file")
logging.info(
"Processing logs for %s | %s | %s | %s",
@ -690,64 +665,33 @@ def run_build(build):
build["uuid"],
)
if args.download:
logging.debug("Started fetching build logs")
directory = "%s/%s" % (args.directory, build["uuid"])
try:
if not os.path.exists(directory):
os.makedirs(directory)
except PermissionError:
logging.critical("Can not create directory %s" % directory)
except Exception as e:
logging.critical("Exception occurred %s on creating dir %s" % (
e, directory))
logging.debug("Started fetching build logs")
directory = "%s/%s" % (args.directory, build["uuid"])
try:
if not os.path.exists(directory):
os.makedirs(directory)
except PermissionError:
logging.critical("Can not create directory %s" % directory)
except Exception as e:
logging.critical("Exception occurred %s on creating dir %s" % (
e, directory))
validate_ca = _verify_ca(args)
validate_ca = _verify_ca(args)
if 'log_url' in build and build["log_url"]:
check_specified_files(build, validate_ca, args.timeout, directory)
else:
# NOTE: if build result does not contain 'log_url', so there is
# no result files to parse, but we would like to have that
# knowledge, so it will create own job-results.txt file that
# contains:
# build["end_time"] | build["result"]
logging.info("There is no log url for the build %s, so no file can"
" be downloaded. Creating custom job-results.txt " %
build["uuid"])
create_custom_result(build, directory)
save_build_info(directory, build)
if 'log_url' in build and build["log_url"]:
check_specified_files(build, validate_ca, args.timeout, directory)
else:
# NOTE: As it was earlier, logs that contains status other than
# "SUCCESS" or "FAILURE" will be parsed by Gearman service.
logging.debug("Parsing content for gearman service")
validate_ca = _verify_ca(args)
results = dict(files=[], jobs=[], invocation={})
files = check_specified_files(build, validate_ca, args.timeout)
# NOTE: if build result does not contain 'log_url', so there is
# no result files to parse, but we would like to have that
# knowledge, so it will create own job-results.txt file that
# contains:
# build["end_time"] | build["result"]
logging.info("There is no log url for the build %s, so no file can"
" be downloaded. Creating custom job-results.txt " %
build["uuid"])
create_custom_result(build, directory)
results["files"] = files
lmc = LogMatcher(
args.gearman_server,
args.gearman_port,
build["result"],
build["log_url"],
{},
config_file
)
lmc.submitJobs("push-log", results["files"], build)
def check_connection(logstash_url):
"""Return True when Logstash service is reachable
Check if service is up before pushing results.
"""
host, port = logstash_url.split(':')
logging.debug("Checking connection to %s on port %s" % (host, port))
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
return s.connect_ex((host, port)) == 0
save_build_info(directory, build)
def run_scraping(args, zuul_api_url, job_name=None, monitoring=None):
@ -772,11 +716,6 @@ def run_scraping(args, zuul_api_url, job_name=None, monitoring=None):
logging.info("Processing %d builds", len(builds))
if args.logstash_url and not check_connection(args.logstash_url):
logging.critical("Can not connect to logstash %s. "
"Is it up?" % args.logstash_url)
return
if builds:
pool = multiprocessing.Pool(int(args.workers))
try:
@ -824,10 +763,6 @@ def main():
monitoring = Monitoring()
start_http_server(args.monitoring_port)
if args.download and args.gearman_server and args.gearman_port:
logging.critical("Can not use logscraper to send logs to gearman "
"and download logs. Choose one")
sys.exit(1)
while True:
run(args, monitoring)

@ -15,7 +15,6 @@
# under the License.
import datetime
import json
import tempfile
from logscraper import logscraper
@ -144,8 +143,7 @@ class _MockedPoolMapAsyncResult:
class FakeArgs(object):
def __init__(self, zuul_api_url=None, gearman_server=None,
gearman_port=None, follow=False, insecure=False,
def __init__(self, zuul_api_url=None, follow=False, insecure=False,
checkpoint_file=None, ignore_checkpoint=None,
logstash_url=None, workers=None, max_skipped=None,
job_name=None, download=None, directory=None,
@ -154,8 +152,6 @@ class FakeArgs(object):
timeout=None):
self.zuul_api_url = zuul_api_url
self.gearman_server = gearman_server
self.gearman_port = gearman_port
self.follow = follow
self.insecure = insecure
self.checkpoint_file = checkpoint_file
@ -251,41 +247,17 @@ class TestScraper(base.TestCase):
zuul_api_url=['http://somehost.com/api/tenant/tenant1',
'http://somehost.com/api/tenant/tenant2',
'http://somehost.com/api/tenant/tenant3'],
gearman_server='localhost',
job_name=['testjob1', 'testjob2'])
args = logscraper.get_arguments()
logscraper.run(args, mock_monitoring)
self.assertEqual(2, mock_scraping.call_count)
@mock.patch('socket.socket')
def test_check_connection(self, mock_socket):
with mock.patch('argparse.ArgumentParser.parse_args') as mock_args:
mock_args.return_value = FakeArgs(
zuul_api_url='somehost.com',
gearman_server='localhost',
logstash_url='localhost:9999')
args = logscraper.get_arguments()
logscraper.check_connection(args.logstash_url)
self.assertTrue(mock_socket.called)
@mock.patch('socket.socket')
def test_check_connection_wrong_host(self, mock_socket):
with mock.patch('argparse.ArgumentParser.parse_args') as mock_args:
mock_args.return_value = FakeArgs(
zuul_api_url='somehost.com',
gearman_server='localhost',
logstash_url='localhost')
args = logscraper.get_arguments()
self.assertRaises(ValueError, logscraper.check_connection,
args.logstash_url)
@mock.patch('logscraper.logscraper.get_builds',
return_value=iter([{'_id': '1234'}]))
@mock.patch('argparse.ArgumentParser.parse_args')
def test_get_last_job_results(self, mock_args, mock_get_builds):
mock_args.return_value = FakeArgs(
zuul_api_url='http://somehost.com/api/tenant/sometenant',
gearman_server='localhost',
checkpoint_file='/tmp/testfile')
args = logscraper.get_arguments()
some_config = logscraper.Config(args, args.zuul_api_url)
@ -306,6 +278,7 @@ class TestScraper(base.TestCase):
'someuuid', None, 10)
self.assertRaises(ValueError, make_fake_list, job_result)
@mock.patch('sqlite3.connect', return_value=mock.MagicMock())
@mock.patch('logscraper.logscraper.load_config')
@mock.patch('logscraper.logscraper.save_build_info')
@mock.patch('logscraper.logscraper.check_specified_files')
@ -313,16 +286,13 @@ class TestScraper(base.TestCase):
@mock.patch('os.path.isfile')
@mock.patch('logscraper.logscraper.check_specified_files',
return_value=['job-output.txt'])
@mock.patch('logscraper.logscraper.LogMatcher.submitJobs')
@mock.patch('argparse.ArgumentParser.parse_args',
return_value=FakeArgs(
zuul_api_url=['http://somehost.com/api/tenant/tenant1'],
gearman_server='localhost',
gearman_port=4731,
workers=1))
def test_run_scraping(self, mock_args, mock_submit, mock_files,
def test_run_scraping(self, mock_args, mock_files,
mock_isfile, mock_readfile, mock_specified_files,
mock_save_buildinfo, mock_config):
mock_save_buildinfo, mock_config, mock_sqlite):
with mock.patch('logscraper.logscraper.get_last_job_results'
) as mock_job_results:
with mock.patch('multiprocessing.pool.Pool.map_async',
@ -333,12 +303,9 @@ class TestScraper(base.TestCase):
mock_job_results.return_value = [builds_result[0]]
logscraper.run_scraping(
args, 'http://somehost.com/api/tenant/tenant1')
self.assertEqual(builds_result[0]['uuid'],
mock_submit.call_args.args[2]['uuid'])
self.assertTrue(mock_submit.called)
self.assertEqual(builds_result[0],
mock_specified_files.call_args.args[0])
self.assertFalse(mock_save_buildinfo.called)
self.assertTrue(mock_save_buildinfo.called)
@mock.patch('requests.get')
@mock.patch('logscraper.logscraper.Monitoring')
@ -351,11 +318,12 @@ class TestScraper(base.TestCase):
zuul_api_url=['http://somehost.com/api/tenant/tenant1',
'http://somehost.com/api/tenant/tenant2',
'http://somehost.com/api/tenant/tenant3'],
gearman_server='localhost')
)
args = logscraper.get_arguments()
logscraper.run(args, mock_monitoring)
self.assertEqual(3, mock_scraping.call_count)
@mock.patch('sqlite3.connect', return_value=mock.MagicMock())
@mock.patch('logscraper.logscraper.load_config')
@mock.patch('logscraper.logscraper.save_build_info')
@mock.patch('logscraper.logscraper.check_specified_files')
@ -363,15 +331,14 @@ class TestScraper(base.TestCase):
@mock.patch('os.path.isfile')
@mock.patch('logscraper.logscraper.check_specified_files',
return_value=['job-output.txt'])
@mock.patch('logscraper.logscraper.LogMatcher.submitJobs')
@mock.patch('argparse.ArgumentParser.parse_args',
return_value=FakeArgs(
zuul_api_url=['http://somehost.com/api/tenant/tenant1'],
workers=1, download=True, directory="/tmp/testdir"))
def test_run_scraping_download(self, mock_args, mock_submit, mock_files,
def test_run_scraping_download(self, mock_args, mock_files,
mock_isfile, mock_readfile,
mock_specified_files, mock_save_buildinfo,
mock_config):
mock_config, mock_sqlite):
with mock.patch('logscraper.logscraper.get_last_job_results'
) as mock_job_results:
with mock.patch(
@ -385,12 +352,12 @@ class TestScraper(base.TestCase):
logscraper.run_scraping(
args, 'http://somehost.com/api/tenant/tenant1')
self.assertFalse(mock_submit.called)
self.assertTrue(mock_specified_files.called)
self.assertEqual(builds_result[0],
mock_specified_files.call_args.args[0])
self.assertTrue(mock_save_buildinfo.called)
@mock.patch('sqlite3.connect', return_value=mock.MagicMock())
@mock.patch('logscraper.logscraper.load_config')
@mock.patch('logscraper.logscraper.save_build_info')
@mock.patch('logscraper.logscraper.check_specified_files')
@ -398,15 +365,14 @@ class TestScraper(base.TestCase):
@mock.patch('os.path.isfile')
@mock.patch('logscraper.logscraper.check_specified_files',
return_value=['job-output.txt'])
@mock.patch('logscraper.logscraper.LogMatcher.submitJobs')
@mock.patch('argparse.ArgumentParser.parse_args',
return_value=FakeArgs(
zuul_api_url=['http://somehost.com/api/tenant/tenant1'],
workers=1, download=True, directory="/tmp/testdir"))
def test_run_scraping_monitoring(self, mock_args, mock_submit, mock_files,
def test_run_scraping_monitoring(self, mock_args, mock_files,
mock_isfile, mock_readfile,
mock_specified_files, mock_save_buildinfo,
mock_config):
mock_config, mock_sqlite):
with mock.patch('logscraper.logscraper.get_last_job_results'
) as mock_job_results:
with mock.patch(
@ -424,7 +390,6 @@ class TestScraper(base.TestCase):
self.assertEqual('job_name', monitoring.job_count._labelnames[0])
self.assertEqual(2, len(monitoring.job_count._metrics))
self.assertFalse(mock_submit.called)
self.assertTrue(mock_specified_files.called)
self.assertEqual(builds_result[0],
mock_specified_files.call_args.args[0])
@ -432,14 +397,12 @@ class TestScraper(base.TestCase):
@mock.patch('logscraper.logscraper.create_custom_result')
@mock.patch('logscraper.logscraper.check_specified_files')
@mock.patch('logscraper.logscraper.LogMatcher.submitJobs')
@mock.patch('gear.BaseClient.waitForServer')
@mock.patch('argparse.ArgumentParser.parse_args',
return_value=FakeArgs(
zuul_api_url=['http://somehost.com/api/tenant/tenant1'],
workers=1, download=True, directory="/tmp/testdir"))
def test_run_aborted_download(self, mock_args, mock_gear, mock_gear_client,
mock_check_files, mock_custom_result):
def test_run_aborted_download(self, mock_args, mock_check_files,
mock_custom_result):
# Take a job result whose log_url is empty.
result = builds_result[2]
result['files'] = ['job-output.txt']
@ -454,21 +417,17 @@ class TestScraper(base.TestCase):
logscraper.run_build(result)
logscraper.run_build(result_node_fail)
self.assertFalse(mock_gear_client.called)
self.assertFalse(mock_check_files.called)
self.assertTrue(mock_custom_result.called)
@mock.patch('logscraper.logscraper.create_custom_result')
@mock.patch('logscraper.logscraper.check_specified_files')
@mock.patch('logscraper.logscraper.LogMatcher.submitJobs')
@mock.patch('gear.BaseClient.waitForServer')
@mock.patch('argparse.ArgumentParser.parse_args',
return_value=FakeArgs(
zuul_api_url=['http://somehost.com/api/tenant/tenant1'],
workers=1, gearman_server='localhost',
gearman_port='4731'))
def test_run_aborted(self, mock_args, mock_gear, mock_gear_client,
mock_check_files, mock_custom_result):
workers=1))
def test_run_aborted(self, mock_args, mock_check_files,
mock_custom_result):
# Take a job result whose build_status is "ABORTED" or "NODE_FAILURE"
result = builds_result[2]
result['files'] = ['job-output.txt']
@ -483,9 +442,8 @@ class TestScraper(base.TestCase):
logscraper.run_build(result)
logscraper.run_build(result_node_fail)
self.assertTrue(mock_gear_client.called)
self.assertTrue(mock_check_files.called)
self.assertFalse(mock_custom_result.called)
self.assertFalse(mock_check_files.called)
self.assertTrue(mock_custom_result.called)
@mock.patch('requests.get')
@mock.patch('logscraper.logscraper.Monitoring')
@ -499,7 +457,7 @@ class TestScraper(base.TestCase):
zuul_api_url=['http://somehost.com/api/tenant/tenant1',
'http://somehost.com/api/tenant/tenant2',
'http://somehost.com/api/tenant/tenant3'],
gearman_server='localhost')
)
args = logscraper.get_arguments()
logscraper.run(args, mock_monitoring)
@ -609,14 +567,15 @@ class TestScraper(base.TestCase):
class TestConfig(base.TestCase):
@mock.patch('sqlite3.connect', return_value=mock.MagicMock())
@mock.patch('logscraper.logscraper.load_config')
@mock.patch('sys.exit')
def test_config_object(self, mock_sys, mock_config):
def test_config_object(self, mock_sys, mock_config, mock_sqlite):
# Assume that the url is wrong, so it raises IndexError
with mock.patch('argparse.ArgumentParser.parse_args') as mock_args:
mock_args.return_value = FakeArgs(
zuul_api_url='somehost.com',
gearman_server='localhost')
)
args = logscraper.get_arguments()
self.assertRaises(IndexError, logscraper.Config, args,
args.zuul_api_url)
@ -624,7 +583,7 @@ class TestConfig(base.TestCase):
with mock.patch('argparse.ArgumentParser.parse_args') as mock_args:
mock_args.return_value = FakeArgs(
zuul_api_url='https://somehost.com',
gearman_server='localhost')
)
args = logscraper.get_arguments()
logscraper.Config(args, args.zuul_api_url)
mock_sys.assert_called()
@ -636,7 +595,6 @@ class TestConfig(base.TestCase):
# correct url without job name
mock_args.return_value = FakeArgs(
zuul_api_url='http://somehost.com/api/tenant/sometenant',
gearman_server='localhost',
checkpoint_file='/tmp/testfile')
args = logscraper.get_arguments()
some_config = logscraper.Config(args, args.zuul_api_url)
@ -656,161 +614,6 @@ class TestLogMatcher(base.TestCase):
}]
}
@mock.patch('logscraper.logscraper.load_config')
@mock.patch('gear.TextJob')
@mock.patch('gear.Client.submitJob')
@mock.patch('gear.BaseClient.waitForServer')
def test_submitJobs(self, mock_gear, mock_gear_client, mock_gear_job,
mock_load_config):
result = builds_result[0]
result['files'] = ['job-output.txt']
result['tenant'] = 'sometenant'
parsed_job = {
"build_branch": "master",
"build_change": 806255,
"build_duration": 307.0,
"build_name": "openstack-tox-py38",
"build_node": "zuul-executor",
"build_patchset": "9",
"build_queue": "check",
"build_ref": "refs/changes/55/806255/9",
"build_set": {"uuid": "bf11828235c649ff859ad87d7c4aa525"},
"build_status": "SUCCESS",
"build_uuid": "bf11828235c649ff859ad87d7c4aa525",
"build_zuul_url": "N/A",
"filename": "job-output.txt",
"log_url": "https://t.com/openstack/a0f8968/job-output.txt",
"node_provider": "local",
"project": "openstack/tempest",
"tenant": "sometenant",
"voting": 1}
expected_gear_job = {"retry": False, "event": {
"fields": parsed_job,
"tags": ["job-output.txt", "console", "console.html"]},
"source_url": "https://t.com/openstack/a0f8968/job-output.txt"}
mock_load_config.return_value = self.config_file
with mock.patch('argparse.ArgumentParser.parse_args') as mock_args:
mock_args.return_value = FakeArgs(
zuul_api_url='http://somehost.com/api/tenant/sometenant',
gearman_server='localhost',
gearman_port='4731')
args = logscraper.get_arguments()
lmc = logscraper.LogMatcher(args.gearman_server, args.gearman_port,
result['result'], result['log_url'],
{}, self.config_file)
lmc.submitJobs('push-log', result['files'], result)
mock_gear_client.assert_called_once()
self.assertEqual(
expected_gear_job,
json.loads(mock_gear_job.call_args.args[1].decode('utf-8'))
)
@mock.patch('logscraper.logscraper.load_config')
@mock.patch('gear.TextJob')
@mock.patch('gear.Client.submitJob')
@mock.patch('gear.BaseClient.waitForServer')
def test_submitJobs_failure(self, mock_gear, mock_gear_client,
mock_gear_job, mock_load_config):
# Take a job result whose build_status is "ABORTED"
result = builds_result[1]
result['files'] = ['job-output.txt']
result['tenant'] = 'sometenant'
parsed_job = {
'build_branch': 'master',
'build_change': 816445,
"build_duration": 603.0,
'build_name': 'tripleo-centos-8',
'build_node': 'zuul-executor',
'build_patchset': '1',
'build_queue': 'check',
'build_ref': 'refs/changes/45/816445/1',
'build_set': {'uuid': '4a0ffebe30a94efe819fffc03cf33ea4'},
'build_status': 'FAILURE',
'build_uuid': '4a0ffebe30a94efe819fffc03cf33ea4',
'build_zuul_url': 'N/A',
'filename': 'job-output.txt',
'log_url': 'https://t.com/tripleo-8/3982864/job-output.txt',
'node_provider': 'local',
'project': 'openstack/tripleo-ansible',
'tenant': 'sometenant',
'voting': 1}
expected_gear_job = {"retry": False, "event": {
"fields": parsed_job,
"tags": ["job-output.txt", "console", "console.html"]},
"source_url": "https://t.com/tripleo-8/3982864/job-output.txt"}
mock_load_config.return_value = self.config_file
with mock.patch('argparse.ArgumentParser.parse_args') as mock_args:
mock_args.return_value = FakeArgs(
zuul_api_url='http://somehost.com/api/tenant/sometenant',
gearman_server='localhost',
gearman_port='4731')
args = logscraper.get_arguments()
lmc = logscraper.LogMatcher(args.gearman_server, args.gearman_port,
result['result'], result['log_url'],
{}, self.config_file)
lmc.submitJobs('push-log', result['files'], result)
mock_gear_client.assert_called_once()
self.assertEqual(
expected_gear_job,
json.loads(mock_gear_job.call_args.args[1].decode('utf-8'))
)
@mock.patch('logscraper.logscraper.load_config')
@mock.patch('gear.TextJob')
@mock.patch('gear.Client.submitJob')
@mock.patch('gear.BaseClient.waitForServer')
def test_submitJobs_aborted(self, mock_gear, mock_gear_client,
mock_gear_job, mock_load_config):
# Take a job result whose build_status is "ABORTED"
result = builds_result[2]
result['files'] = ['job-output.txt']
result['tenant'] = 'sometenant'
parsed_job = {
'build_branch': 'stable/victoria',
'build_change': 816486,
'build_duration': 18,
'build_name': 'openstack-tox-lower-constraints',
'build_node': 'zuul-executor',
'build_patchset': '1',
'build_queue': 'check',
'build_ref': 'refs/changes/86/816486/1',
'build_set': {'uuid': 'bd044dfe3ecc484fbbf74fdeb7fb56aa'},
'build_status': 'FAILURE',
'build_uuid': 'bd044dfe3ecc484fbbf74fdeb7fb56aa',
'build_zuul_url': 'N/A',
'filename': 'job-output.txt',
'log_url': 'job-output.txt',
'node_provider': 'local',
'project': 'openstack/nova',
'tenant': 'sometenant',
'voting': 1}
# NOTE: normally ABORTED jobs do not provide a log_url,
# so source_url will just be a file name to iterate over.
# In the logscraper, aborted jobs are just skipped.
expected_gear_job = {"retry": False, "event": {
"fields": parsed_job,
"tags": ["job-output.txt", "console", "console.html"]},
"source_url": "job-output.txt"}
mock_load_config.return_value = self.config_file
with mock.patch('argparse.ArgumentParser.parse_args') as mock_args:
mock_args.return_value = FakeArgs(
zuul_api_url='http://somehost.com/api/tenant/sometenant',
gearman_server='localhost',
gearman_port='4731')
args = logscraper.get_arguments()
lmc = logscraper.LogMatcher(args.gearman_server, args.gearman_port,
result['result'], result['log_url'],
{}, self.config_file)
lmc.submitJobs('push-log', result['files'], result)
mock_gear_client.assert_called_once()
self.assertEqual(
expected_gear_job,
json.loads(mock_gear_job.call_args.args[1].decode('utf-8'))
)
@mock.patch('builtins.open', new_callable=mock.mock_open())
@mock.patch('os.path.isfile')
@mock.patch('requests.get')

@ -36,8 +36,6 @@ build_args:
directory: /tmp/logscraper
download: true
follow: false
gearman_port: 4730
gearman_server: null
insecure: true
job_name: null
logstash_url: null

@ -1,5 +1,4 @@
pbr>=1.6 # Apache-2.0
gear<0.17
requests<2.27 # Apache-2.0
PyYAML<6.1 # MIT
tenacity