Remove log gearman functionality
The combination of logscraper and logsender has been working in OpenDev infra for a long time and has never had any issues, even during release time.

Change-Id: I1be94eca5d842b02c558e5ffda85c23fa66e6759
 .zuul.yaml | 20
@@ -1,13 +1,4 @@
 ---
-- job:
-    name: ci-log-processing-functional-test-centos-8-stream
-    description: Test is validating ci log processing services
-    run: ansible/playbooks/check-services.yml
-    voting: false
-    nodeset:
-      nodes:
-        - name: centos-8-stream
-          label: centos-8-stream
 - job:
     name: ci-log-processing-functional-test-centos-8-stream-sender
     description: Test is validating Logscraper and logsender services
@@ -38,7 +29,7 @@
 - job:
     name: ci-log-processing-build-image
     parent: opendev-build-docker-image
-    description: Build logscraper and loggearman images.
+    description: Build logscraper
     allowed-projects: openstack/ci-log-processing
     timeout: 2700
     vars: &cilogprocessing_image_vars
@@ -48,11 +39,6 @@
           target: logscraper
           tags:
             &imagetag "{{ zuul.tag is defined | ternary([zuul.get('tag', '').split('.')[0], '.'.join(zuul.get('tag', '').split('.')[:2]), zuul.get('tag', '')], ['latest']) }}"
-        - context: loggearman/
-          repository: cilogprocessing/loggearman
-          target: loggearman
-          tags:
-            *imagetag
 
 - job:
     name: ci-log-processing-upload-image
@@ -69,8 +55,6 @@
         soft: true
       - name: openstack-tox-py38
         soft: true
-      - name: ci-log-processing-functional-test-centos-8-stream
-        soft: true
       - name: ci-log-processing-functional-test-centos-8-stream-sender
         soft: true
       - name: ci-log-processing-build-image
@@ -86,14 +70,12 @@
         - openstack-tox-pep8
         - openstack-tox-py38
         - ci-log-processing-build-image
-        - ci-log-processing-functional-test-centos-8-stream
         - ci-log-processing-functional-test-centos-8-stream-sender
     gate:
       jobs:
         - openstack-tox-linters
         - openstack-tox-pep8
         - openstack-tox-py38
-        - ci-log-processing-functional-test-centos-8-stream
         - ci-log-processing-functional-test-centos-8-stream-sender
         - ci-log-processing-build-image
     post:
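For reference, the `&imagetag` anchor kept above derives the image tag list from `zuul.tag`. A worked illustration of what the ternary appears to produce (the input value "1.2.3" is an assumed example):

    # with zuul.tag set to "1.2.3":
    tags: ["1", "1.2", "1.2.3"]
    # with no zuul.tag defined:
    tags: ["latest"]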
 README.rst | 71
@@ -31,77 +31,10 @@ Dashboards objects versioning.
 Available workflows
 -------------------
 
-The Openstack CI Log Processing project is providing two configurations
+The Openstack CI Log Processing project provides configuration
 for sending logs from Zuul CI to the Opensearch host.
 
-1. Logscraper, log gearman client, log gearman worker, logstash
-
-With this solution, log workflow looks like:
-
-.. code-block:: shell
-
-   +------------------+  1. Get last builds info  +----------------+
-   |                  |-------------------------> |                |
-   |    Logscraper    |                           |     Zuul API   |
-   |                  |<------------------------- |                |
-   +------------------+  2. Fetch data            +----------------+
-            |
-            |
-            +-------------------+
-                                |
-                                |
-                3. Send queue   |
-                logs to gearman |
-                client          |
-                                v
-                      +------------------+
-                      |                  |
-                      |  Log gearman     |
-                      |     client       |
-                      +------------------+
-           +--------------           --------------+
-           |                    |                  |
-           |  4. Consume queue, |                  |
-           | download log files |                  |
-           |                    |                  |
-           v                    v                  v
-   +---------------+   +----------------+  +---------------+
-   |  Log gearman  |   |   Log gearman  |  | Log gearman   |
-   |  worker       |   |   worker       |  | worker        |
-   +---------------+   +----------------+  +---------------+
-          |                     |                  |
-          |   5. Send to        |                  |
-          |      Logstash       |                  |
-          |                     v                  |
-          |            +----------------+          |
-          |            |                |          |
-          +--------->  |    Logstash    |  <-------+
-                       |                |
-                       +----------------+
-                               |
-                 6. Send to    |
-                    Opensearch |
-                               |
-                      +--------v--------+
-                      |                 |
-                      |    Opensearch   |
-                      |                 |
-                      +-----------------+
-
-
-On the beginning, this project was designed to use that solution, but
-it have a few bottlenecks:
-- log gearman client can use many memory, when log gearman worker is not fast,
-- one log gearman worker is not enough even on small infrastructure,
-- logstash service can fail,
-- logscraper is checking if log files are available, then log gearman
-  is downloading the logs, which can make an issue on log sever, that
-  host does not have free socket.
-
-You can deploy your log workflow by using example Ansible playbook that
-you can find in `ansible/playbooks/check-services.yml` in this project.
-
-2. Logscraper, logsender
+Logscraper, logsender
 
 This workflow removes bottlenecks by removing: log gearman client,
 log gearman worker and logstash service. Logs are downloaded when
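For context, the retained workflow can be exercised end to end with the two remaining tools. A minimal sketch, with host paths and the logsender config file name assumed; the logscraper flags are the ones documented later in this change, and the `--config` option comes from the logsender wrapper template below:

    logscraper --zuul-api-url https://zuul.opendev.org/api/tenant/openstack \
        --checkpoint-file /tmp/checkpoint \
        --download --directory /mnt/logscraper --follow

    # logsender then reads the downloaded files and pushes them to Opensearch:
    logsender --config /etc/logscraper/logsender-openstack.config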
@@ -1,33 +0,0 @@
---
- hosts: all
  become: true
  vars:
    # loggearman - worker
    output_host: 0.0.0.0
    output_port: 9999
    gearman_host: 0.0.0.0
    gearman_port: 4730
    log_cert_verify: false
    # loggearman - client
    source_url: ""
    gearman_client_host: "{{ gearman_host }}"
    gearman_client_port: "{{ gearman_port }}"
    # logscraper
    tenant_builds:
      - tenant: openstack
        gearman_port: "{{ gearman_port }}"
        gearman_server: "{{ gearman_host }}"
        zuul_api_url:
          - https://zuul.opendev.org/api/tenant/openstack
        insecure: false
        job_names: []
        download: false
  pre_tasks:
    - name: Update all packages
      become: true
      package:
        name: "*"
        state: latest
  roles:
    - check-services
    - backup-dashboards-objects
@@ -1,6 +0,0 @@
---
- name: Configure log-gearman-client and log-gearman-worker tools
  hosts: logscraper01.openstack.org
  become: true
  roles:
    - loggearman
@@ -26,25 +26,6 @@
         podman images --noheading quay.io/logscraper:dev  | awk '{print $3}'
       register: _logscraper_image_id
 
-    - name: Build logscraper container image
-      shell: >
-        podman build -t quay.io/loggearman:dev -f loggearman/Dockerfile
-      args:
-        chdir: "{{ zuul.projects['opendev.org/openstack/ci-log-processing'].src_dir }}"
-      when: zuul is defined
-
-    - name: Build loggearman container image - non Zuul
-      shell: >
-        podman build -t quay.io/loggearman:dev -f loggearman/Dockerfile
-      args:
-        chdir: "{{ playbook_dir }}"
-      when: zuul is not defined
-
-    - name: Get loggearman image id
-      shell: |
-        podman images --noheading quay.io/loggearman:dev  | awk '{print $3}'
-      register: _loggearman_image_id
-
     - name: Print all images
       shell: |
         podman images
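For comparison, the build-and-capture pattern that remains for the logscraper image looks roughly like the following; the logscraper/Dockerfile path is an assumption mirroring the removed loggearman task:

    podman build -t quay.io/logscraper:dev -f logscraper/Dockerfile
    podman images --noheading quay.io/logscraper:dev | awk '{print $3}'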
@@ -53,7 +34,6 @@
       set_fact:
         container_images:
           logscraper: "{{ _logscraper_image_id.stdout }}"
-          loggearman: "{{ _loggearman_image_id.stdout }}"
 
 ### OPENSEARCH ####
 - name: Setup Opensearch
@@ -129,23 +109,6 @@
     delay: 10
     timeout: 300
 
-### Loggearman ###
-- name: Setup loggearman service
-  include_role:
-    name: loggearman
-
-# Flush handlers before running test
-- name: Force all notified handlers to run now
-  meta: flush_handlers
-
-### service validation ###
-- name: Check if log gearman client is listening
-  wait_for:
-    host: "{{ gearman_host }}"
-    port: "{{ gearman_port }}"
-    delay: 10
-    timeout: 300
-
 ### Logscraper ###
 - name: Setup logscraper service
   include_role:
@@ -156,8 +119,6 @@
     systemctl is-active -q {{ item }}
   loop:
     - logscraper-openstack
-    - loggearman-client
-    - loggearman-worker
   register: _service_status
   failed_when: _service_status.rc != 0
 
@@ -191,14 +152,6 @@
       shell: |
         podman logs opensearch
 
-    - name: Get gearman client logs
-      shell: |
-        podman logs loggearman-client
-
-    - name: Get gearman worker logs
-      shell: |
-        podman logs loggearman-worker
-
     - name: Get indices to fail the test
       uri:
         url: "https://127.0.0.1:9200/_cat/indices"
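With the gearman socket checks gone, service validation reduces to the systemd state check kept above. The equivalent one-off check, sketched with the service name used in the loop:

    systemctl is-active -q logscraper-openstack && echo "logscraper-openstack is active"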
@@ -1,32 +0,0 @@
Loggearman ansible role
=======================

The goal of this role is to setup and configure service related
to `log-gearman-client` and `log-gearman-worker` scripts, that
were ported to this project repository from `puppet-log_processor repository
<https://opendev.org/opendev/puppet-log_processor/src/branch/master/files>`__.

Configuration
-------------

The role is automatically deploying services:

* log-gearman-client
* log-gearman-worker

inside the container.

Example playbook setup
----------------------

.. code-block:: yaml

   - name: Configure loggearman tool
     hosts: localhost
     become: true
     vars:
       source_url: https://localhost
       output_hosts: mylogstashhost.com
       log_cert_verify: True
     roles:
       - loggearman
@@ -1,31 +0,0 @@
---
loggearman_user: loggearman
loggearman_group: loggearman
loggearman_gid: 10211
loggearman_uid: 10211

loggearman_dir: /etc/loggearman
loggearman_log_dir: /var/log/loggearman

container_images:
  # FIXME: Move image to dedicated repository on Docker hub.
  loggearman: quay.io/software-factory/loggearman:latest

# Gearman client
gearman_client_host: 0.0.0.0
gearman_client_port: 4730
source_url: ""
zmq_publishers: []
subunit_files: []
source_files: []

# Gearman worker
gearman_host: 0.0.0.0
gearman_port: 4730
output_host: logstash.example.com
output_port: 9999
output_mode: tcp
crm114_script: ""
crm114_data: ""
log_ca_certs: ""
log_cert_verify: true
@@ -1,14 +0,0 @@
---
- name: restart loggearman client
  service:
    name: loggearman-client
    state: restarted
    daemon-reload: true
    enabled: true

- name: restart loggearman worker
  service:
    name: loggearman-worker
    state: restarted
    daemon-reload: true
    enabled: true
@@ -1,62 +0,0 @@
---
- name: Create decidated group
  group:
    name: "{{ loggearman_group }}"
    gid: "{{ loggearman_gid }}"
    state: present

- name: Create dedicated user
  user:
    name: "{{ loggearman_user }}"
    state: present
    comment: "Dedicated user for loggearman"
    group: "{{ loggearman_group }}"
    uid: "{{ loggearman_uid }}"
    shell: "/sbin/nologin"
    create_home: false

- name: Create dedicated directories
  file:
    path: "{{ item }}"
    state: directory
    owner: "{{ loggearman_user }}"
    group: "{{ loggearman_group }}"
    mode: "0755"
  loop:
    - "{{ loggearman_dir }}"
    - "{{ loggearman_log_dir }}"

- name: Init log files
  file:
    path: "{{ loggearman_log_dir }}/{{ item }}.log"
    state: touch
    owner: "{{ loggearman_user }}"
    group: "{{ loggearman_group }}"
    mode: "0644"
  loop:
    - client
    - worker

- name: Ensure container software is installed
  package:
    name: podman
    state: present

- name: Create configuration files
  template:
    src: "{{ item }}.yml.j2"
    dest: "{{ loggearman_dir }}/{{ item }}.yml"
    owner: "{{ loggearman_user }}"
    group: "{{ loggearman_group }}"
    mode: "0644"
  loop:
    - client
    - worker
  notify:
    - restart loggearman {{ item }}

- name: Configure loggearman service
  include_tasks: service.yml
  loop:
    - client
    - worker
@@ -1,17 +0,0 @@
---
- name: Generate podman-loggearman-{{ item }} script
  template:
    src: loggearman.sh.j2
    dest: "/usr/local/bin/podman-loggearman-{{ item }}"
    mode: '0755'
  notify:
    - restart loggearman {{ item }}

- name: Generate systemd unit loggearman-{{ item }}
  template:
    src: loggearman.service.j2
    dest: "/etc/systemd/system/loggearman-{{ item }}.service"
    owner: root
    group: root
  notify:
    - restart loggearman {{ item }}
@@ -1,7 +0,0 @@
---
gearman-host: {{ gearman_client_host }}
gearman-port: {{ gearman_client_port }}
source-url: {{ source_url }}
zmq-publishers: {{ zmq_publishers }}
subunit-files: {{ subunit_files }}
source-files: {{ source_files }}
@@ -1,16 +0,0 @@
[Unit]
Description=loggearman {{ item }} service
After=syslog.target network.target
StartLimitInterval=20
StartLimitBurst=5

[Service]
Type=simple
SyslogIdentifier=loggearman-{{ item }}
ExecStart=/usr/local/bin/podman-loggearman-{{ item }}
ExecStop=/usr/bin/podman stop loggearman-{{ item }}
Restart=always
RestartSec=5s

[Install]
WantedBy=multi-user.target
@@ -1,17 +0,0 @@
#!/bin/bash

# MANAGED BY ANSIBLE
/usr/bin/podman run \
    --network host \
    --rm \
    --user 1000:1000 \
    --uidmap 0:{{ loggearman_uid + 1 }}:999 \
    --uidmap 1000:{{ loggearman_uid }}:1 \
    --name loggearman-{{ item }} \
    --volume {{ loggearman_dir }}:{{ loggearman_dir }}:z \
    --volume {{ loggearman_log_dir }}:{{ loggearman_log_dir }}:z \
    {{ container_images['loggearman'] }} \
    log-gearman-{{ item }} \
    -c {{ loggearman_dir }}/{{ item }}.yml \
    --foreground \
    -d {{ loggearman_log_dir }}/{{ item }}.log
@@ -1,16 +0,0 @@
---
gearman-host: {{ gearman_host }}
gearman-port: {{ gearman_port }}
output-host: {{ output_host }}
output-port: {{ output_port }}
output-mode: {{ output_mode }}
log-cert-verify: {{ log_cert_verify }}
{% if crm114_script %}
crm114-script: {{ crm114_script }}
{% endif %}
{% if crm114_data %}
crm114-data: {{ crm114_data }}
{% endif %}
{% if log_ca_certs %}
log-ca-certs: {{ log_ca_certs }}
{% endif %}
@@ -2,8 +2,8 @@ Logscraper ansible role
 =======================
 
 The goal of this role is to setup and configure service related
-to logscraper script which is responsible to to push recent
-zuul builds into log gearman processor.
+to logscraper script, which is responsible for pulling the latest Zuul CI job
+logs to local storage.
 
 Requirements
 ------------
@@ -23,8 +23,6 @@ for example:
   vars:
     tenant_builds:
       - tenant: openstack
-        gearman_port: 4731
-        gearman_server: logstash.openstack.org
        zuul_api_url:
          - https://zuul.opendev.org/api/tenant/openstack
        insecure: false
@@ -57,8 +55,6 @@ and second one for getting logs from `sometenant` tenant.
     vars:
       tenant_builds:
         - tenant: openstack
-          gearman_port: 4731
-          gearman_server: logstash.openstack.org
           zuul_api_url:
             - https://zuul.opendev.org/api/tenant/openstack
           insecure: False
@@ -66,7 +62,6 @@ and second one for getting logs from `sometenant` tenant.
           zuul_api_url:
             - https://zuul.opendev.org/api/tenant/sometenant
           insecure: True
-          download: true
           download_dir: /mnt/logscraper
           file_list:
             - /etc/logscraper/my-downloadlist.yaml
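After this change a minimal `tenant_builds` entry needs only the Zuul API settings plus the optional download options; a sketch assembled from the fields kept above (values illustrative):

    tenant_builds:
      - tenant: openstack
        zuul_api_url:
          - https://zuul.opendev.org/api/tenant/openstack
        insecure: false
        download_dir: /mnt/logscraper
        file_list:
          - /etc/logscraper/my-downloadlist.yaml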
@@ -12,8 +12,6 @@ container_images:
 # Example:
 # tenant_builds:
 #   - tenant: openstack
-#     gearman_port: 4731
-#     gearman_server: logstash.openstack.org
 #     zuul_api_url:
 #       - https://zuul.opendev.org/api/tenant/openstack
 #     insecure: false
@@ -22,13 +20,11 @@ container_images:
 #     zuul_api_url:
 #       - https://zuul.opendev.org/api/tenant/sometenant
 #     insecure: true
-#     download: true
 #     download_dir: /mnt/logscraper/sometenant
 #     file_list: []
 #     job_name:
 #       - test
 #       - test_new
-#     logstash_url: https://somelogstash.com:9999
 #     max_skipped: 100
 #     debug: true
 #     logscraper_wait_time: 120
@@ -35,7 +35,7 @@
     dest: "{{ logscraper_dir }}/download-list-{{ item.tenant }}.yaml"
     owner: "{{ logscraper_user }}"
     group: "{{ logscraper_group }}"
-    mode: "0640"
+    mode: "0644"
   register: _download_file
 
 - name: Generate systemd unit
@@ -18,4 +18,4 @@
     --volume {{ item.download_dir }}:{{ item.download_dir }}:z \
     {% endif %}
     {{ container_images['logscraper'] }} \
-    /usr/bin/logscraper --config {{ logscraper_dir }}/logscraper-{{ item['tenant'] }}.config
+    /usr/local/bin/logscraper --config {{ logscraper_dir }}/logscraper-{{ item['tenant'] }}.config
@@ -16,4 +16,4 @@
     --volume {{ item['logsender_custom_ca_crt'] }}:{{ item['logsender_custom_ca_crt'] }}:z \
     {% endif %}
     {{ container_images['logsender'] }} \
-    /usr/bin/logsender --config {{ logscraper_dir }}/logsender-{{ item['tenant'] }}.config
+    /usr/local/bin/logsender --config {{ logscraper_dir }}/logsender-{{ item['tenant'] }}.config
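Both wrapper scripts now call the console entry points from /usr/local/bin, presumably because that is where the pip-installed scripts land in these images. The rendered logsender wrapper therefore runs roughly the following; the podman flags, mount path, and tenant name are assumptions, while the image reference is the role variable itself:

    /usr/bin/podman run --rm \
        --volume /etc/logscraper:/etc/logscraper:z \
        {{ container_images['logsender'] }} \
        /usr/local/bin/logsender --config /etc/logscraper/logsender-openstack.config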
@@ -24,8 +24,6 @@ Indices and tables
    * :doc:`logscraper-role`
    * :doc:`logsender`
    * :doc:`logsender-role`
-   * :doc:`loggearman`
-   * :doc:`loggearman-role`
 
 .. toctree::
    :maxdepth: 2
@@ -36,5 +34,3 @@ Indices and tables
    logscraper-role
    logsender
    logsender-role
-   loggearman
-   loggearman-role
@@ -1 +0,0 @@
../../ansible/roles/loggearman/README.rst
@@ -1,22 +0,0 @@
Loggearman
==========

The Loggearman tools are responsible for listening events,
parse them, get logs from log server and push them to
the Logstash service.


Loggearman Client
-----------------

The Loggearman Client is responsible for listening events that
comes on port 4730 (by default), parse them and redirect them to
German server, that later will be processed by loggearman worker.


Loggearman Worker
-----------------

The Loggearman Worker is responsible to get log files from the
log server, parse them and send line by line to the Logstash service
with necessary fields like: build_uuid, build_name, etc.
@@ -11,7 +11,7 @@ It is available by typing:
 
    logscraper --help
 
-   Fetch and push last Zuul CI job logs into gearman.
+   Fetch and push last Zuul CI job logs:
 
    optional arguments:
      -h, --help            show this help message and exit
@@ -20,25 +20,16 @@ It is available by typing:
                            times.
      --job-name JOB_NAME   CI job name(s). Parameter can be set multiple times.
                            If not set it would scrape every latest builds.
-     --gearman-server GEARMAN_SERVER
-                           Gearman host addresss
-     --gearman-port GEARMAN_PORT
-                           Gearman listen port. Defaults to 4730.
      --follow              Keep polling zuul builds
      --insecure            Skip validating SSL cert
      --checkpoint-file CHECKPOINT_FILE
                            File that will keep information about last uuid
                            timestamp for a job.
-     --logstash-url LOGSTASH_URL
-                           When provided, script will check connection to
-                           Logstash service before sending to log processing
-                           system. For example: logstash.local:9999
      --workers WORKERS     Worker processes for logscraper
      --max-skipped MAX_SKIPPED
                            How many job results should be checked until last uuid
                            written in checkpoint file is founded
      --debug               Print more information
-     --download            Download logs and do not send to gearman service
      --directory DIRECTORY
                            Directory, where the logs will be stored. Defaults to:
                            /tmp/logscraper
@@ -51,30 +42,6 @@ Base on the use case, we can run logscraper.
 
 Example:
 
-* periodical check if there are some new logs for `openstack` tenant:
-
-.. code-block::
-
-  logscraper --gearman-server somehost --zuul-api-url https://zuul.opendev.org/api/tenant/openstack --checkpoint-file /tmp/results-checkpoint --follow
-
-* one shot on getting logs from `zuul` tenant:
-
-.. code-block::
-
-  logscraper --gearman-server localhost --zuul-api-url https://zuul.opendev.org/api/tenant/zuul --checkpoint-file /tmp/zuul-result-timestamp
-
-* periodically scrape logs from tenants: `openstack`, `zuul` and `local`
-
-.. code-block::
-
-  logscraper --gearman-server localhost --zuul-api-url https://zuul.opendev.org/api/tenant/openstack --zuul-api-url https://zuul.opendev.org/api/tenant/zuul --zuul-api-url https://zuul.opendev.org/api/tenant/local --checkpoint-file /tmp/someresults --follow
-
-* scrape logs from two defined job names: `tripleo-ci-centos-8-containers-multinode` and `openstack-tox-linters` for tenants: `openstack` and `local`:
-
-.. code-block::
-
-  logscraper --gearman-server localhost --job-name tripleo-ci-centos-8-containers-multinode --job-name openstack-tox-linters --zuul-api-url https://zuul.opendev.org/api/tenant/openstack --zuul-api-url https://zuul.opendev.org/api/tenant/local
-
 * download logs to /mnt/logscraper. NOTE: if you are using container service, this directory needs to be mounted!
 
 .. code-block::
@@ -97,13 +64,6 @@ Then you can execute commands that are described above.
 NOTE: if you want to use parameter `--checkpoint-file`, you need to mount a volume
 to the container, for example:
 
 .. code-block::
 
-   docker run -v $(pwd):/checkpoint-dir:z -d logscraper logscraper --gearman-server somehost --zuul-api-url https://zuul.opendev.org/api/tenant/openstack --checkpoint-file /checkpoint-dir/checkpoint --follow
-
-
-In this example, logscraper will download log files to the /mnt/logscraper directory:
-
-.. code-block::
-
    docker run -v $(pwd):/checkpoint-dir:z -v /mnt/logscraper:/mnt/logscraper:z -d logscraper logscraper --zuul-api-url https://zuul.opendev.org/api/tenant/openstack --checkpoint-file /checkpoint-dir/checkpoint --directory /mnt/logscraper --download --follow
@@ -1,41 +0,0 @@
# Copyright (C) 2021 Red Hat
# Copyright (C) 2022 Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

FROM quay.io/centos/centos:stream8 as loggearman

ENV OSLO_PACKAGE_VERSION='0.0.1'
ENV PATH=~/.local/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin
ENV LANG=en_US.UTF-8

RUN groupadd --gid 1000 loggearman && \
    useradd --home-dir /home/loggearman --gid 1000 --uid 1000 loggearman

RUN dnf update -y && \
    dnf install -y python38 python38-setuptools \
                   python38-devel python38-wheel \
                   python38-pip git

COPY . /tmp/src
RUN cd /tmp/src && \
    pip3 install -r requirements.txt && \
    python3 setup.py install && \
    rm -rf /tmp/src

RUN dnf remove -y python3-devel git && \
    dnf autoremove -y && \
    dnf clean all && \
    rm -rf ~/.cache/pip

USER loggearman
@@ -1,8 +0,0 @@
OpenStack Log Processor Module
==============================

The Log Processor Module comes from `repository`_ with
applied patches from this `patchset`_.

.. _repository: https://opendev.org/opendev/puppet-log_processor
.. _patchset: https://review.opendev.org/c/opendev/puppet-log_processor/+/809424
@@ -1,246 +0,0 @@
#!/usr/bin/env python3
#
# Copyright 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import argparse
import daemon
import gear
import json
import logging
import os
import os.path
import re
import signal
import socket
import threading
import time
import yaml
import zmq


try:
    import daemon.pidlockfile as pidfile_mod
except ImportError:
    import daemon.pidfile as pidfile_mod


class EventProcessor(threading.Thread):
    def __init__(self, zmq_address, gearman_client, files, source_url):
        threading.Thread.__init__(self)
        self.files = files
        self.source_url = source_url
        self.gearman_client = gearman_client
        self.zmq_address = zmq_address
        self._connect_zmq()

    def run(self):
        while True:
            try:
                self._read_event()
            except Exception:
                # Assume that an error reading data from zmq or deserializing
                # data received from zmq indicates a zmq error and reconnect.
                logging.exception("ZMQ exception.")
                self._connect_zmq()

    def _connect_zmq(self):
        logging.debug("Connecting to zmq endpoint.")
        self.context = zmq.Context()
        self.socket = self.context.socket(zmq.SUB)
        event_filter = b"onFinalized"
        self.socket.setsockopt(zmq.SUBSCRIBE, event_filter)
        self.socket.connect(self.zmq_address)

    def _read_event(self):
        string = self.socket.recv().decode('utf-8')
        event = json.loads(string.split(None, 1)[1])
        logging.debug("Jenkins event received: " + json.dumps(event))
        for fileopts in self.files:
            output = {}
            source_url, out_event = self._parse_event(event, fileopts)
            job_filter = fileopts.get('job-filter')
            if (job_filter and not re.match(
                    job_filter, out_event['fields']['build_name'])):
                continue
            build_queue_filter = fileopts.get('build-queue-filter')
            if (build_queue_filter and not re.match(
                    build_queue_filter, out_event['fields']['build_queue'])):
                continue
            project_filter = fileopts.get('project-filter')
            if (project_filter and not re.match(
                    project_filter, out_event['fields']['project'])):
                continue
            output['source_url'] = source_url
            output['retry'] = fileopts.get('retry-get', False)
            output['event'] = out_event
            if 'subunit' in fileopts.get('name'):
                job = gear.Job(b'push-subunit',
                               json.dumps(output).encode('utf8'))
            else:
                job = gear.Job(b'push-log', json.dumps(output).encode('utf8'))
            try:
                self.gearman_client.submitJob(job)
            except Exception:
                logging.exception("Exception submitting job to Gearman.")

    def _get_log_dir(self, event):
        parameters = event["build"].get("parameters", {})
        base = parameters.get('LOG_PATH', 'UNKNOWN')
        return base

    def _parse_fields(self, event, filename):
        fields = {}
        fields["filename"] = filename
        fields["build_name"] = event.get("name", "UNKNOWN")
        fields["build_status"] = event["build"].get("status", "UNKNOWN")
        fields["build_node"] = event["build"].get("node_name", "UNKNOWN")
        fields["build_master"] = event["build"].get("host_name", "UNKNOWN")
        parameters = event["build"].get("parameters", {})
        fields["project"] = parameters.get("ZUUL_PROJECT", "UNKNOWN")
        # The voting value is "1" for voting, "0" for non-voting
        fields["voting"] = parameters.get("ZUUL_VOTING", "UNKNOWN")
        # TODO(clarkb) can we do better without duplicated data here?
        fields["build_uuid"] = parameters.get("ZUUL_UUID", "UNKNOWN")
        fields["build_short_uuid"] = fields["build_uuid"][:7]
        fields["build_queue"] = parameters.get("ZUUL_PIPELINE", "UNKNOWN")
        fields["build_ref"] = parameters.get("ZUUL_REF", "UNKNOWN")
        fields["build_branch"] = parameters.get("ZUUL_BRANCH", "UNKNOWN")
        fields["build_zuul_url"] = parameters.get("ZUUL_URL", "UNKNOWN")
        if parameters.get("ZUUL_CHANGE"):
            fields["build_change"] = parameters.get("ZUUL_CHANGE", "UNKNOWN")
            fields["build_patchset"] = parameters.get("ZUUL_PATCHSET",
                                                      "UNKNOWN")
        elif parameters.get("ZUUL_NEWREV"):
            fields["build_newrev"] = parameters.get("ZUUL_NEWREV",
                                                    "UNKNOWN")
        if ["build_node"] != "UNKNOWN":
            node_provider = '-'.join(
                fields["build_node"].split('-')[-3:-1])
            fields["node_provider"] = node_provider or "UNKNOWN"
        else:
            fields["node_provider"] = "UNKNOWN"
        return fields

    def _parse_event(self, event, fileopts):
        fields = self._parse_fields(event, fileopts['name'])
        log_dir = self._get_log_dir(event)
        source_url = fileopts.get('source-url', self.source_url) + '/' + \
            os.path.join(log_dir, fileopts['name'])
        fields["log_url"] = source_url
        out_event = {}
        out_event["fields"] = fields
        out_event["tags"] = [os.path.basename(fileopts['name'])] + \
            fileopts.get('tags', [])
        return source_url, out_event


class Server(object):
    def __init__(self, config, debuglog):
        # Config init.
        self.config = config
        self.source_url = self.config['source-url']
        # Pythong logging output file.
        self.debuglog = debuglog
        self.processors = []

    def setup_logging(self):
        if self.debuglog:
            logging.basicConfig(format='%(asctime)s %(message)s',
                                filename=self.debuglog, level=logging.DEBUG)
        else:
            # Prevent leakage into the logstash log stream.
            logging.basicConfig(level=logging.CRITICAL)
        logging.debug("Log pusher starting.")

    def setup_processors(self):
        for publisher in self.config['zmq-publishers']:
            gearclient = gear.Client()
            host = self.config.get('gearman-host', 'localhost')
            port = self.config.get('gearman-port', 4730)
            gearclient.addServer(host, port=port)
            gearclient.waitForServer()
            log_processor = EventProcessor(
                publisher, gearclient,
                self.config['source-files'], self.source_url)
            subunit_processor = EventProcessor(
                publisher, gearclient,
                self.config['subunit-files'], self.source_url)
            self.processors.append(log_processor)
            self.processors.append(subunit_processor)

    def wait_for_name_resolution(self, host, port):
        while True:
            try:
                socket.getaddrinfo(host, port)
            except socket.gaierror as e:
                if e.errno == socket.EAI_AGAIN:
                    logging.debug("Temporary failure in name resolution")
                    time.sleep(2)
                    continue
                else:
                    raise
            break

    def main(self):
        statsd_host = os.environ.get('STATSD_HOST')
        statsd_port = int(os.environ.get('STATSD_PORT', 8125))
        statsd_prefix = os.environ.get('STATSD_PREFIX', 'logstash.geard')
        if statsd_host:
            self.wait_for_name_resolution(statsd_host, statsd_port)
        self.gearserver = gear.Server(
            port=self.config.get('gearman-port', 4730),
            statsd_host=statsd_host,
            statsd_port=statsd_port,
            statsd_prefix=statsd_prefix)

        self.setup_processors()
        for processor in self.processors:
            processor.daemon = True
            processor.start()
        while True:
            signal.pause()


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("-c", "--config", required=True,
                        help="Path to yaml config file.")
    parser.add_argument("-d", "--debuglog",
                        help="Enable debug log. "
                             "Specifies file to write log to.")
    parser.add_argument("--foreground", action='store_true',
                        help="Run in the foreground.")
    parser.add_argument("-p", "--pidfile",
                        default="/var/run/jenkins-log-pusher/"
                                "jenkins-log-gearman-client.pid",
                        help="PID file to lock during daemonization.")
    args = parser.parse_args()

    with open(args.config, 'r') as config_stream:
        config = yaml.safe_load(config_stream)
    server = Server(config, args.debuglog)

    if args.foreground:
        server.setup_logging()
        server.main()
    else:
        pidfile = pidfile_mod.TimeoutPIDLockFile(args.pidfile, 10)
        with daemon.DaemonContext(pidfile=pidfile):
            server.setup_logging()
            server.main()


if __name__ == '__main__':
    main()
@@ -1,566 +0,0 @@
 | 
			
		||||
#!/usr/bin/env python3
 | 
			
		||||
#
 | 
			
		||||
# Copyright 2013 Hewlett-Packard Development Company, L.P.
 | 
			
		||||
#
 | 
			
		||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
 | 
			
		||||
# not use this file except in compliance with the License. You may obtain
 | 
			
		||||
# a copy of the License at
 | 
			
		||||
#
 | 
			
		||||
#      http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
#
 | 
			
		||||
# Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 | 
			
		||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 | 
			
		||||
# License for the specific language governing permissions and limitations
 | 
			
		||||
# under the License.
 | 
			
		||||
 | 
			
		||||
import argparse
 | 
			
		||||
import daemon
 | 
			
		||||
import gear
 | 
			
		||||
import json
 | 
			
		||||
import logging
 | 
			
		||||
import os
 | 
			
		||||
import queue
 | 
			
		||||
import re
 | 
			
		||||
import requests
 | 
			
		||||
import select
 | 
			
		||||
import socket
 | 
			
		||||
import subprocess
 | 
			
		||||
import sys
 | 
			
		||||
import threading
 | 
			
		||||
import time
 | 
			
		||||
import yaml
 | 
			
		||||
 | 
			
		||||
import paho.mqtt.publish as publish
 | 
			
		||||
 | 
			
		||||
try:
 | 
			
		||||
    import daemon.pidlockfile as pidfile_mod
 | 
			
		||||
except ImportError:
 | 
			
		||||
    import daemon.pidfile as pidfile_mod
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def semi_busy_wait(seconds):
 | 
			
		||||
    # time.sleep() may return early. If it does sleep() again and repeat
 | 
			
		||||
    # until at least the number of seconds specified has elapsed.
 | 
			
		||||
    start_time = time.time()
 | 
			
		||||
    while True:
 | 
			
		||||
        time.sleep(seconds)
 | 
			
		||||
        cur_time = time.time()
 | 
			
		||||
        seconds = seconds - (cur_time - start_time)
 | 
			
		||||
        if seconds <= 0.0:
 | 
			
		||||
            return
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class FilterException(Exception):
 | 
			
		||||
    pass
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class CRM114Filter(object):
 | 
			
		||||
    def __init__(self, script, path, build_status):
 | 
			
		||||
        self.p = None
 | 
			
		||||
        self.script = script
 | 
			
		||||
        self.path = path
 | 
			
		||||
        self.build_status = build_status
 | 
			
		||||
        if build_status not in ['SUCCESS', 'FAILURE']:
 | 
			
		||||
            return
 | 
			
		||||
        if not os.path.exists(path):
 | 
			
		||||
            os.makedirs(path)
 | 
			
		||||
        args = [script, path, build_status]
 | 
			
		||||
        self.p = subprocess.Popen(args,
 | 
			
		||||
                                  stdout=subprocess.PIPE,
 | 
			
		||||
                                  stderr=subprocess.PIPE,
 | 
			
		||||
                                  stdin=subprocess.PIPE,
 | 
			
		||||
                                  close_fds=True)
 | 
			
		||||
 | 
			
		||||
    def process(self, data):
 | 
			
		||||
        if not self.p:
 | 
			
		||||
            return True
 | 
			
		||||
        self.p.stdin.write(data['message'].encode('utf-8') + '\n')
 | 
			
		||||
        (r, w, x) = select.select([self.p.stdout], [],
 | 
			
		||||
                                  [self.p.stdin, self.p.stdout], 20)
 | 
			
		||||
        if not r:
 | 
			
		||||
            self.p.kill()
 | 
			
		||||
            raise FilterException('Timeout reading from CRM114')
 | 
			
		||||
        r = self.p.stdout.readline()
 | 
			
		||||
        if not r:
 | 
			
		||||
            err = self.p.stderr.read()
 | 
			
		||||
            if err:
 | 
			
		||||
                raise FilterException(err)
 | 
			
		||||
            else:
 | 
			
		||||
                raise FilterException('Early EOF from CRM114')
 | 
			
		||||
        r = r.strip()
 | 
			
		||||
        data['error_pr'] = float(r)
 | 
			
		||||
        return True
 | 
			
		||||
 | 
			
		||||
    def _catchOSError(self, method):
 | 
			
		||||
        try:
 | 
			
		||||
            method()
 | 
			
		||||
        except OSError:
 | 
			
		||||
            logging.exception("Subprocess cleanup failed.")
 | 
			
		||||
 | 
			
		||||
    def close(self):
 | 
			
		||||
        if not self.p:
 | 
			
		||||
            return
 | 
			
		||||
        # CRM114 should die when its stdinput is closed. Close that
 | 
			
		||||
        # fd along with stdout and stderr then return.
 | 
			
		||||
        self._catchOSError(self.p.stdin.close)
 | 
			
		||||
        self._catchOSError(self.p.stdout.close)
 | 
			
		||||
        self._catchOSError(self.p.stderr.close)
 | 
			
		||||
        self._catchOSError(self.p.wait)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class CRM114FilterFactory(object):
 | 
			
		||||
    name = "CRM114"
 | 
			
		||||
 | 
			
		||||
    def __init__(self, script, basepath):
 | 
			
		||||
        self.script = script
 | 
			
		||||
        self.basepath = basepath
 | 
			
		||||
        # Precompile regexes
 | 
			
		||||
        self.re_remove_suffix = re.compile(r'(\.[^a-zA-Z]+)?(\.gz)?$')
 | 
			
		||||
        self.re_remove_dot = re.compile(r'\.')
 | 
			
		||||
 | 
			
		||||
    def create(self, fields):
 | 
			
		||||
        # We only want the basename so that the same logfile at different
 | 
			
		||||
        # paths isn't treated as different
 | 
			
		||||
        filename = os.path.basename(fields['filename'])
 | 
			
		||||
        # We want to collapse any numeric or compression suffixes so that
 | 
			
		||||
        # nova.log and nova.log.1 and nova.log.1.gz are treated as the same
 | 
			
		||||
        # logical file
 | 
			
		||||
        filename = self.re_remove_suffix.sub(r'', filename)
 | 
			
		||||
        filename = self.re_remove_dot.sub('_', filename)
 | 
			
		||||
        path = os.path.join(self.basepath, filename)
 | 
			
		||||
        return CRM114Filter(self.script, path, fields['build_status'])
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class OsloSeverityFilter(object):
 | 
			
		||||
    DATEFMT = r'\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}((\.|\,)\d{3,6})?'
 | 
			
		||||
    SEVERITYFMT = '(DEBUG|INFO|WARNING|ERROR|TRACE|AUDIT|CRITICAL)'
 | 
			
		||||
    OSLO_LOGMATCH = (r'^(?P<date>%s)(?P<line>(?P<pid> \d+)? '
 | 
			
		||||
                     '(?P<severity>%s).*)' %
 | 
			
		||||
                     (DATEFMT, SEVERITYFMT))
 | 
			
		||||
    OSLORE = re.compile(OSLO_LOGMATCH)
 | 
			
		||||
 | 
			
		||||
    def process(self, data):
 | 
			
		||||
        msg = data['message']
 | 
			
		||||
        m = self.OSLORE.match(msg)
 | 
			
		||||
        if m:
 | 
			
		||||
            data['severity'] = m.group('severity')
 | 
			
		||||
            if data['severity'].lower == 'debug':
 | 
			
		||||
                # Ignore debug-level lines
 | 
			
		||||
                return False
 | 
			
		||||
        return True
 | 
			
		||||
 | 
			
		||||
    def close(self):
 | 
			
		||||
        pass
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class OsloSeverityFilterFactory(object):
 | 
			
		||||
    name = "OsloSeverity"
 | 
			
		||||
 | 
			
		||||
    def create(self, fields):
 | 
			
		||||
        return OsloSeverityFilter()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class SystemdSeverityFilter(object):
    '''Match systemd DEBUG level logs

    A line to match looks like:

    Aug 15 18:58:49.910786 hostname devstack@keystone.service[31400]:
                           DEBUG uwsgi ...
    '''
    SYSTEMDDATE = r'\w+\s+\d+\s+\d{2}:\d{2}:\d{2}((\.|\,)\d{3,6})?'
    DATEFMT = r'\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}((\.|\,)\d{3,6})?'
    SEVERITYFMT = '(DEBUG|INFO|WARNING|ERROR|TRACE|AUDIT|CRITICAL)'
    SYSTEMD_LOGMATCH = r'^(?P<date>%s)( (\S+) \S+\[\d+\]\: ' \
        '(?P<severity>%s)?.*)' % (SYSTEMDDATE, SEVERITYFMT)
    SYSTEMDRE = re.compile(SYSTEMD_LOGMATCH)

    def process(self, data):
        msg = data['message']
        m = self.SYSTEMDRE.match(msg)
        if m:
            if m.group('severity') == 'DEBUG':
                return False
        return True

    def close(self):
        pass


class SystemdSeverityFilterFactory(object):
    name = "SystemdSeverity"

    def create(self, fields):
        return SystemdSeverityFilter()


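Using the sample line from the docstring above, joined onto a single line as
it would appear in a real journal (editor's sketch):

    filt = SystemdSeverityFilter()
    sample = ('Aug 15 18:58:49.910786 hostname '
              'devstack@keystone.service[31400]: DEBUG uwsgi ...')
    print(filt.SYSTEMDRE.match(sample).group('severity'))  # -> 'DEBUG'
    print(filt.process({'message': sample}))               # -> False
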
class LogRetriever(threading.Thread):
    def __init__(self, gearman_worker, filters, logq,
                 log_cert_verify, log_ca_certs, mqtt=None):
        threading.Thread.__init__(self)
        self.gearman_worker = gearman_worker
        self.filters = filters
        self.logq = logq
        self.mqtt = mqtt
        self.log_cert_verify = log_cert_verify
        self.log_ca_certs = log_ca_certs

    def run(self):
        while True:
            try:
                self._handle_event()
            except Exception:
                logging.exception("Exception retrieving log event.")

    def _handle_event(self):
        fields = {}
        num_log_lines = 0
        source_url = ''
        http_session = None
        job = self.gearman_worker.getJob()
        try:
            arguments = json.loads(job.arguments.decode('utf-8'))
            source_url = arguments['source_url']
            event = arguments['event']
            logging.debug("Handling event: " + json.dumps(event))
            fields = event.get('fields') or event.get('@fields')
            tags = event.get('tags') or event.get('@tags')
            if fields['build_status'] != 'ABORTED':
                # Handle events ignoring aborted builds. These builds are
                # discarded by zuul.
                file_obj, http_session = self._open_log_file_url(source_url)

                try:
                    all_filters = []
                    for f in self.filters:
                        logging.debug("Adding filter: %s" % f.name)
                        all_filters.append(f.create(fields))
                    filters = all_filters

                    base_event = {}
                    base_event.update(fields)
                    base_event["tags"] = tags
                    for line in self._retrieve_log_line(file_obj):
                        keep_line = True
                        out_event = base_event.copy()
                        out_event["message"] = line
                        new_filters = []
                        for f in filters:
                            if not keep_line:
                                new_filters.append(f)
                                continue
                            try:
                                keep_line = f.process(out_event)
                                new_filters.append(f)
                            except FilterException:
                                logging.exception("Exception filtering event: "
                                                  "%s" % line.encode("utf-8"))
                        filters = new_filters
                        if keep_line:
                            self.logq.put(out_event)
                        num_log_lines += 1

                    logging.debug("Pushed %s log lines." % num_log_lines)
                finally:
                    for f in all_filters:
                        f.close()
                    if http_session:
                        http_session.close()
            job.sendWorkComplete()
            # Only send mqtt events for log files we processed.
            if self.mqtt and num_log_lines:
                msg = json.dumps({
                    'build_uuid': fields.get('build_uuid'),
                    'source_url': source_url,
                    'status': 'success',
                })
                self.mqtt.publish_single(msg, fields.get('project'),
                                         fields.get('build_change'),
                                         'retrieve_logs',
                                         fields.get('build_queue'))
        except Exception as e:
            logging.exception("Exception handling log event.")
            job.sendWorkException(str(e).encode('utf-8'))
            if self.mqtt:
                msg = json.dumps({
                    'build_uuid': fields.get('build_uuid'),
                    'source_url': source_url,
                    'status': 'failure',
                })
                self.mqtt.publish_single(msg, fields.get('project'),
                                         fields.get('build_change'),
                                         'retrieve_logs',
                                         fields.get('build_queue'))

    def _retrieve_log_line(self, file_obj, chunk_size=4096):
        if not file_obj:
            return
        # Response.iter_lines automatically decodes 'gzip' and 'deflate'
        # encodings.
        # https://requests.readthedocs.io/en/master/user/quickstart/#raw-response-content
        for line in file_obj.iter_lines(chunk_size, decode_unicode=True):
            yield line

    def _open_log_file_url(self, source_url):
        file_obj = None

        kwargs = {}
        if self.log_cert_verify and self.log_ca_certs:
            kwargs['verify'] = self.log_ca_certs
        elif not self.log_cert_verify:
            kwargs['verify'] = self.log_cert_verify

        try:
            logging.debug("Retrieving: " + source_url)
            # Use a session to persist the HTTP connection across requests
            # while downloading chunks of the log file.
            session = requests.Session()
            session.headers = {'Accept-encoding': 'deflate, gzip'}
            file_obj = session.get(source_url, stream=True, **kwargs)
            file_obj.raise_for_status()
        except requests.HTTPError as e:
            if e.response.status_code == 404:
                logging.info("Unable to retrieve %s: HTTP error 404" %
                             source_url)
            else:
                logging.exception("Unable to get log data.")
        except Exception:
            # Silently drop fatal errors when retrieving logs.
            # TODO(clarkb): Handle these errors.
            # Perhaps simply add a log message to file_obj?
            logging.exception("Unable to retrieve source file.")
            raise

        return file_obj, session


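For reference, the verify handling in _open_log_file_url maps the two config
flags onto the requests 'verify' keyword roughly like this (editor's sketch;
the CA bundle path is hypothetical):

    def make_verify_kwargs(log_cert_verify, log_ca_certs):
        # Mirrors the branch in _open_log_file_url above.
        kwargs = {}
        if log_cert_verify and log_ca_certs:
            kwargs['verify'] = log_ca_certs  # custom CA bundle path
        elif not log_cert_verify:
            kwargs['verify'] = False         # TLS verification disabled
        return kwargs                        # {} -> requests default (True)

    print(make_verify_kwargs(True, '/etc/ssl/logs-ca.pem'))
    print(make_verify_kwargs(False, None))
    print(make_verify_kwargs(True, None))
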
class StdOutLogProcessor(object):
    def __init__(self, logq, pretty_print=False):
        self.logq = logq
        self.pretty_print = pretty_print

    def handle_log_event(self):
        log = self.logq.get()
        if self.pretty_print:
            print(json.dumps(log, sort_keys=True,
                  indent=4, separators=(',', ': ')))
        else:
            print(json.dumps(log))
        # Push each log event through to keep logstash up to date.
        sys.stdout.flush()


class INETLogProcessor(object):
    socket_type = None

    def __init__(self, logq, host, port):
        self.logq = logq
        self.host = host
        self.port = port
        self.socket = None

    def _connect_socket(self):
        logging.debug("Creating socket.")
        self.socket = socket.socket(socket.AF_INET, self.socket_type)
        self.socket.connect((self.host, self.port))

    def handle_log_event(self):
        log = self.logq.get()
        try:
            if self.socket is None:
                self._connect_socket()
            self.socket.sendall((json.dumps(log) + '\n').encode('utf-8'))
        except Exception:
            logging.exception("Exception sending INET event.")
            # Logstash seems to take about a minute to start again. Wait 90
            # seconds before attempting to reconnect. If logstash is not
            # available after 90 seconds we will throw another exception and
            # die.
            semi_busy_wait(90)
            self._connect_socket()
            self.socket.sendall((json.dumps(log) + '\n').encode('utf-8'))


class UDPLogProcessor(INETLogProcessor):
    socket_type = socket.SOCK_DGRAM


class TCPLogProcessor(INETLogProcessor):
    socket_type = socket.SOCK_STREAM


class PushMQTT(object):
    def __init__(self, hostname, base_topic, port=1883, client_id=None,
                 keepalive=60, will=None, auth=None, tls=None, qos=0):
        self.hostname = hostname
        self.port = port
        self.client_id = client_id
        self.keepalive = keepalive
        self.will = will
        self.auth = auth
        self.tls = tls
        self.qos = qos
        self.base_topic = base_topic

    def _generate_topic(self, project, job_id, action):
        return '/'.join([self.base_topic, project, job_id, action])

    def publish_single(self, msg, project, job_id, action, build_queue=None):
        if job_id:
            topic = self._generate_topic(project, job_id, action)
        elif build_queue:
            topic = self._generate_topic(project, build_queue, action)
        else:
            topic = self.base_topic + '/' + project

        publish.single(topic, msg, hostname=self.hostname,
                       port=self.port, client_id=self.client_id,
                       keepalive=self.keepalive, will=self.will,
                       auth=self.auth, tls=self.tls, qos=self.qos)


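A sketch of the topic layout PushMQTT produces (hypothetical broker and
values; 'gearman-subunit' is the default base topic used by Server below):

    mqtt = PushMQTT('mqtt.example.org', 'gearman-subunit')
    print(mqtt._generate_topic('openstack/nova', '806255', 'retrieve_logs'))
    # -> 'gearman-subunit/openstack/nova/806255/retrieve_logs'
    # publish_single() falls back to build_queue when there is no job id,
    # and to '<base_topic>/<project>' when neither is available.
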
class Server(object):
    def __init__(self, config, debuglog):
        # Config init.
        self.config = config
        self.gearman_host = self.config['gearman-host']
        self.gearman_port = self.config['gearman-port']
        self.output_host = self.config['output-host']
        self.output_port = self.config['output-port']
        self.output_mode = self.config['output-mode']
        mqtt_host = self.config.get('mqtt-host')
        mqtt_port = self.config.get('mqtt-port', 1883)
        mqtt_user = self.config.get('mqtt-user')
        mqtt_pass = self.config.get('mqtt-pass')
        mqtt_topic = self.config.get('mqtt-topic', 'gearman-subunit')
        mqtt_ca_certs = self.config.get('mqtt-ca-certs')
        mqtt_certfile = self.config.get('mqtt-certfile')
        mqtt_keyfile = self.config.get('mqtt-keyfile')
        self.log_ca_certs = self.config.get('log-ca-certs')
        self.log_cert_verify = self.config.get('log-cert-verify', True)
        # Python logging output file.
        self.debuglog = debuglog
        self.retriever = None
        self.logqueue = queue.Queue(16384)
        self.processor = None
        self.filter_factories = []
        # Run the severity filter first so it can filter out chatty
        # logs.
        self.filter_factories.append(OsloSeverityFilterFactory())
        self.filter_factories.append(SystemdSeverityFilterFactory())
        crmscript = self.config.get('crm114-script')
        crmdata = self.config.get('crm114-data')
        if crmscript and crmdata:
            self.filter_factories.append(
                CRM114FilterFactory(crmscript, crmdata))
        # Setup MQTT
        self.mqtt = None
        if mqtt_host:
            auth = None
            if mqtt_user:
                auth = {'username': mqtt_user}
            if mqtt_pass:
                auth['password'] = mqtt_pass
            tls = None
            if mqtt_ca_certs:
                tls = {'ca_certs': mqtt_ca_certs,
                       'certfile': mqtt_certfile,
                       'keyfile': mqtt_keyfile}

            self.mqtt = PushMQTT(mqtt_host, mqtt_topic, port=mqtt_port,
                                 auth=auth, tls=tls)

    def setup_logging(self):
        if self.debuglog:
            logging.basicConfig(format='%(asctime)s %(message)s',
                                filename=self.debuglog, level=logging.DEBUG)
        else:
            # Prevent leakage into the logstash log stream.
            logging.basicConfig(level=logging.CRITICAL)
        logging.debug("Log pusher starting.")

    def wait_for_name_resolution(self, host, port):
        while True:
            try:
                socket.getaddrinfo(host, port)
            except socket.gaierror as e:
                if e.errno == socket.EAI_AGAIN:
                    logging.debug("Temporary failure in name resolution")
                    time.sleep(2)
                    continue
                else:
                    raise
            break

    def setup_retriever(self):
        hostname = socket.gethostname()
        gearman_worker = gear.Worker(hostname + '-pusher')
        self.wait_for_name_resolution(self.gearman_host, self.gearman_port)
        gearman_worker.addServer(self.gearman_host,
                                 self.gearman_port)
        gearman_worker.registerFunction(b'push-log')
        self.retriever = LogRetriever(gearman_worker, self.filter_factories,
                                      self.logqueue, self.log_cert_verify,
                                      self.log_ca_certs, mqtt=self.mqtt)

    def setup_processor(self):
        if self.output_mode == "tcp":
            self.processor = TCPLogProcessor(self.logqueue,
                                             self.output_host,
                                             self.output_port)
        elif self.output_mode == "udp":
            self.processor = UDPLogProcessor(self.logqueue,
                                             self.output_host,
                                             self.output_port)
        else:
            # Note this processor will not work if the process is run as a
            # daemon. You must use the --foreground option.
            self.processor = StdOutLogProcessor(self.logqueue)

    def main(self):
        self.setup_retriever()
        self.setup_processor()

        self.retriever.daemon = True
        self.retriever.start()

        while True:
            try:
                self.processor.handle_log_event()
            except Exception:
                logging.exception("Exception processing log event.")
                raise


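A minimal sketch of the configuration the Server class above expects, written
as the dict that yaml.safe_load would return (hostnames and values are
hypothetical; only keys read in __init__ are listed):

    config = {
        'gearman-host': 'localhost',
        'gearman-port': 4730,
        'output-host': 'logstash.example.org',
        'output-port': 9999,
        'output-mode': 'tcp',  # 'tcp', 'udp', or anything else for stdout
        # Optional keys: mqtt-host, mqtt-port, mqtt-user, mqtt-pass,
        # mqtt-topic, mqtt-ca-certs, mqtt-certfile, mqtt-keyfile,
        # log-ca-certs, log-cert-verify, crm114-script, crm114-data
    }
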
def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("-c", "--config", required=True,
                        help="Path to yaml config file.")
    parser.add_argument("-d", "--debuglog",
                        help="Enable debug log. "
                             "Specifies file to write log to.")
    parser.add_argument("--foreground", action='store_true',
                        help="Run in the foreground.")
    parser.add_argument("-p", "--pidfile",
                        default="/var/run/jenkins-log-pusher/"
                                "jenkins-log-gearman-worker.pid",
                        help="PID file to lock during daemonization.")
    args = parser.parse_args()

    with open(args.config, 'r') as config_stream:
        config = yaml.safe_load(config_stream)
    server = Server(config, args.debuglog)

    if args.foreground:
        server.setup_logging()
        server.main()
    else:
        pidfile = pidfile_mod.TimeoutPIDLockFile(args.pidfile, 10)
        with daemon.DaemonContext(pidfile=pidfile):
            server.setup_logging()
            server.main()


if __name__ == '__main__':
    main()
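For context, the removed worker was installed as the log-gearman-worker
console script (see the [entry_points] section below); a typical invocation
would have looked roughly like this (paths are hypothetical):

    log-gearman-worker -c /etc/logprocessor/worker.yaml \
        -d /var/log/logprocessor/worker-debug.log --foreground
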
@@ -1,6 +0,0 @@
pbr>=1.6         # Apache-2.0
gear<=0.16.0
requests<=2.26.0 # Apache-2.0
PyYAML<=6.0      # MIT
pyzmq<=21.0.2
paho-mqtt<=1.6.1
@@ -1,26 +0,0 @@
[metadata]
name = loggearman
summary = OpenStack Log Processor Module
description_file =
    README.rst
author = Openstack Contributors
author_email = service-discuss@lists.opendev.org
home_page = http://docs.openstack.org/infra/ci-log-processing
classifier =
    Environment :: OpenStack
    Intended Audience :: Information Technology
    Intended Audience :: System Administrators
    License :: OSI Approved :: Apache Software License
    Operating System :: POSIX :: Linux
    Programming Language :: Python

[build_sphinx]
all_files = 1
build-dir = doc/build
source-dir = doc/source
warning-is-error = 1

[entry_points]
console_scripts =
    log-gearman-client = loggearman.client:main
    log-gearman-worker = loggearman.worker:main
@@ -1,20 +0,0 @@
# Copyright (C) 2021 Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import setuptools

setuptools.setup(
    setup_requires=['pbr'],
    pbr=True
)
@@ -17,6 +17,4 @@ monitoring_port: 9128
######################
# DEPRECATED OPTIONS #
######################
# gearman_server: localhost
# gearman_port: 4730
# logstash_url: https://localhost:9600

@@ -15,9 +15,11 @@
# under the License.

"""
The goal is to push recent zuul builds into log gearman processor.
The goal is to take recent logs from Zuul CI job to the disk.
Below short view:

[ CLI ] -> [ Config ] -> [ ZuulFetcher ] -> [ LogPublisher ]
                            (logscraper) ->    (logsender)


# Zuul builds results are not sorted by end time. Here is a problematic
@@ -61,14 +63,11 @@ end_time.
import argparse
import configparser
import datetime
import gear
import itertools
import json
import logging
import multiprocessing
import os
import requests
import socket
import sqlite3
import sys
import tenacity
@@ -129,8 +128,8 @@ def _verify_ca(args):
#                                    CLI                                      #
###############################################################################
def get_arguments():
    parser = argparse.ArgumentParser(description="Fetch and push last Zuul "
                                     "CI job logs into gearman.")
    parser = argparse.ArgumentParser(description="Fetch last Zuul CI job "
                                     "logs.")
    parser.add_argument("--config", help="Logscraper config file",
                        required=True)
    parser.add_argument("--file-list", help="File list to download. Parameter "
@@ -141,19 +140,12 @@ def get_arguments():
    parser.add_argument("--job-name", help="CI job name(s). Parameter can be "
                        "set multiple times. If not set it would scrape "
                        "every latest builds", nargs='+', default=[])
    parser.add_argument("--gearman-server", help="Gearman host address")
    parser.add_argument("--gearman-port", help="Gearman listen port.",
                        type=int)
    parser.add_argument("--follow", help="Keep polling zuul builds",
                        action="store_true")
    parser.add_argument("--insecure", help="Skip validating SSL cert",
                        action="store_true")
    parser.add_argument("--checkpoint-file", help="File that will keep "
                        "information about last uuid timestamp for a job.")
    parser.add_argument("--logstash-url", help="When provided, script will "
                        "check connection to Logstash service before sending "
                        "to log processing system. For example: "
                        "logstash.local:9999")
    parser.add_argument("--workers", help="Worker processes for logscraper",
                        type=int)
    parser.add_argument("--max-skipped", help="How many job results should be "
@@ -162,9 +154,6 @@ def get_arguments():
                        type=int)
    parser.add_argument("--debug", help="Print more information",
                        action="store_true")
    parser.add_argument("--download", help="Download logs and do not send "
                        "to gearman service",
                        action="store_true")
    parser.add_argument("--directory", help="Directory, where the logs will "
                        "be stored.")
    parser.add_argument("--wait-time", help="Pause time for the next "
@@ -317,24 +306,11 @@ class Monitoring:
###############################################################################
class LogMatcher(object):
    def __init__(self, server, port, success, log_url, host_vars, config):
        self.client = gear.Client()
        self.client.addServer(server, port)
        self.hosts = host_vars
        self.success = success
        self.log_url = log_url
        self.config_file = config

    def submitJobs(self, jobname, files, result):
        self.client.waitForServer(90)
        ret = []
        for f in files:
            output = self.makeOutput(f, result)
            output = json.dumps(output).encode("utf8")
            job = gear.TextJob(jobname, output)
            self.client.submitJob(job, background=True)
            ret.append(dict(handle=job.handle, arguments=output))
        return ret

    def makeOutput(self, file_object, result):
        output = {}
        output["retry"] = False
@@ -680,7 +656,6 @@ def run_build(build):
    """

    args = build.get("build_args")
    config_file = build.get("config_file")

    logging.info(
        "Processing logs for %s | %s | %s | %s",
@@ -690,64 +665,33 @@ def run_build(build):
        build["uuid"],
    )

    if args.download:
        logging.debug("Started fetching build logs")
        directory = "%s/%s" % (args.directory, build["uuid"])
        try:
            if not os.path.exists(directory):
                os.makedirs(directory)
        except PermissionError:
            logging.critical("Can not create directory %s" % directory)
        except Exception as e:
            logging.critical("Exception occurred %s on creating dir %s" % (
                e, directory))
    logging.debug("Started fetching build logs")
    directory = "%s/%s" % (args.directory, build["uuid"])
    try:
        if not os.path.exists(directory):
            os.makedirs(directory)
    except PermissionError:
        logging.critical("Can not create directory %s" % directory)
    except Exception as e:
        logging.critical("Exception occurred %s on creating dir %s" % (
            e, directory))

        validate_ca = _verify_ca(args)
    validate_ca = _verify_ca(args)

        if 'log_url' in build and build["log_url"]:
            check_specified_files(build, validate_ca, args.timeout, directory)
        else:
            # NOTE: if build result does not contain 'log_url', so there is
            # no result files to parse, but we would like to have that
            # knowledge, so it will create own job-results.txt file that
            # contains:
            # build["end_time"] | build["result"]
            logging.info("There is no log url for the build %s, so no file can"
                         " be downloaded. Creating custom job-results.txt " %
                         build["uuid"])
            create_custom_result(build, directory)

        save_build_info(directory, build)
    if 'log_url' in build and build["log_url"]:
        check_specified_files(build, validate_ca, args.timeout, directory)
    else:
        # NOTE: As it was earlier, logs that contains status other than
        # "SUCCESS" or "FAILURE" will be parsed by Gearman service.
        logging.debug("Parsing content for gearman service")
        validate_ca = _verify_ca(args)
        results = dict(files=[], jobs=[], invocation={})
        files = check_specified_files(build, validate_ca, args.timeout)
        # NOTE: if build result does not contain 'log_url', so there is
        # no result files to parse, but we would like to have that
        # knowledge, so it will create own job-results.txt file that
        # contains:
        # build["end_time"] | build["result"]
        logging.info("There is no log url for the build %s, so no file can"
                     " be downloaded. Creating custom job-results.txt " %
                     build["uuid"])
        create_custom_result(build, directory)

        results["files"] = files
        lmc = LogMatcher(
            args.gearman_server,
            args.gearman_port,
            build["result"],
            build["log_url"],
            {},
            config_file
        )

        lmc.submitJobs("push-log", results["files"], build)


def check_connection(logstash_url):
    """Return True when Logstash service is reachable

    Check if service is up before pushing results.
    """
    host, port = logstash_url.split(':')
    logging.debug("Checking connection to %s on port %s" % (host, port))
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        return s.connect_ex((host, port)) == 0
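Note how the URL is parsed above; an editor's sketch with a hypothetical
endpoint (a bare host with no ':' makes split() raise ValueError, which
test_check_connection_wrong_host below relies on):

    host, port = 'logstash.local:9999'.split(':')  # ('logstash.local', '9999')
    host, port = 'logstash.local'.split(':')       # ValueError: not enough values
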
    save_build_info(directory, build)


def run_scraping(args, zuul_api_url, job_name=None, monitoring=None):
@@ -772,11 +716,6 @@ def run_scraping(args, zuul_api_url, job_name=None, monitoring=None):

    logging.info("Processing %d builds", len(builds))

    if args.logstash_url and not check_connection(args.logstash_url):
        logging.critical("Can not connect to logstash %s. "
                         "Is it up?" % args.logstash_url)
        return

    if builds:
        pool = multiprocessing.Pool(int(args.workers))
        try:
@@ -824,10 +763,6 @@ def main():
        monitoring = Monitoring()
        start_http_server(args.monitoring_port)

    if args.download and args.gearman_server and args.gearman_port:
        logging.critical("Can not use logscraper to send logs to gearman "
                         "and download logs. Choose one")
        sys.exit(1)
    while True:
        run(args, monitoring)

@@ -15,7 +15,6 @@
# under the License.

import datetime
import json
import tempfile

from logscraper import logscraper
@@ -144,8 +143,7 @@ class _MockedPoolMapAsyncResult:


class FakeArgs(object):
    def __init__(self, zuul_api_url=None, gearman_server=None,
                 gearman_port=None, follow=False, insecure=False,
    def __init__(self, zuul_api_url=None, follow=False, insecure=False,
                 checkpoint_file=None, ignore_checkpoint=None,
                 logstash_url=None, workers=None, max_skipped=None,
                 job_name=None, download=None, directory=None,
@@ -154,8 +152,6 @@ class FakeArgs(object):
                 timeout=None):

        self.zuul_api_url = zuul_api_url
        self.gearman_server = gearman_server
        self.gearman_port = gearman_port
        self.follow = follow
        self.insecure = insecure
        self.checkpoint_file = checkpoint_file
@@ -251,41 +247,17 @@ class TestScraper(base.TestCase):
                zuul_api_url=['http://somehost.com/api/tenant/tenant1',
                              'http://somehost.com/api/tenant/tenant2',
                              'http://somehost.com/api/tenant/tenant3'],
                gearman_server='localhost',
                job_name=['testjob1', 'testjob2'])
            args = logscraper.get_arguments()
            logscraper.run(args, mock_monitoring)
            self.assertEqual(2, mock_scraping.call_count)

    @mock.patch('socket.socket')
    def test_check_connection(self, mock_socket):
        with mock.patch('argparse.ArgumentParser.parse_args') as mock_args:
            mock_args.return_value = FakeArgs(
                zuul_api_url='somehost.com',
                gearman_server='localhost',
                logstash_url='localhost:9999')
            args = logscraper.get_arguments()
            logscraper.check_connection(args.logstash_url)
            self.assertTrue(mock_socket.called)

    @mock.patch('socket.socket')
    def test_check_connection_wrong_host(self, mock_socket):
        with mock.patch('argparse.ArgumentParser.parse_args') as mock_args:
            mock_args.return_value = FakeArgs(
                zuul_api_url='somehost.com',
                gearman_server='localhost',
                logstash_url='localhost')
            args = logscraper.get_arguments()
            self.assertRaises(ValueError, logscraper.check_connection,
                              args.logstash_url)

    @mock.patch('logscraper.logscraper.get_builds',
                return_value=iter([{'_id': '1234'}]))
    @mock.patch('argparse.ArgumentParser.parse_args')
    def test_get_last_job_results(self, mock_args, mock_get_builds):
        mock_args.return_value = FakeArgs(
            zuul_api_url='http://somehost.com/api/tenant/sometenant',
            gearman_server='localhost',
            checkpoint_file='/tmp/testfile')
        args = logscraper.get_arguments()
        some_config = logscraper.Config(args, args.zuul_api_url)
@@ -306,6 +278,7 @@ class TestScraper(base.TestCase):
            'someuuid', None, 10)
        self.assertRaises(ValueError, make_fake_list, job_result)

    @mock.patch('sqlite3.connect', return_value=mock.MagicMock())
    @mock.patch('logscraper.logscraper.load_config')
    @mock.patch('logscraper.logscraper.save_build_info')
    @mock.patch('logscraper.logscraper.check_specified_files')
@@ -313,16 +286,13 @@ class TestScraper(base.TestCase):
    @mock.patch('os.path.isfile')
    @mock.patch('logscraper.logscraper.check_specified_files',
                return_value=['job-output.txt'])
    @mock.patch('logscraper.logscraper.LogMatcher.submitJobs')
    @mock.patch('argparse.ArgumentParser.parse_args',
                return_value=FakeArgs(
                    zuul_api_url=['http://somehost.com/api/tenant/tenant1'],
                    gearman_server='localhost',
                    gearman_port=4731,
                    workers=1))
    def test_run_scraping(self, mock_args, mock_submit, mock_files,
    def test_run_scraping(self, mock_args,  mock_files,
                          mock_isfile, mock_readfile, mock_specified_files,
                          mock_save_buildinfo, mock_config):
                          mock_save_buildinfo, mock_config, mock_sqlite):
        with mock.patch('logscraper.logscraper.get_last_job_results'
                        ) as mock_job_results:
            with mock.patch('multiprocessing.pool.Pool.map_async',
@@ -333,12 +303,9 @@ class TestScraper(base.TestCase):
                mock_job_results.return_value = [builds_result[0]]
                logscraper.run_scraping(
                    args, 'http://somehost.com/api/tenant/tenant1')
                self.assertEqual(builds_result[0]['uuid'],
                                 mock_submit.call_args.args[2]['uuid'])
                self.assertTrue(mock_submit.called)
                self.assertEqual(builds_result[0],
                                 mock_specified_files.call_args.args[0])
                self.assertFalse(mock_save_buildinfo.called)
                self.assertTrue(mock_save_buildinfo.called)

    @mock.patch('requests.get')
    @mock.patch('logscraper.logscraper.Monitoring')
@@ -351,11 +318,12 @@ class TestScraper(base.TestCase):
                zuul_api_url=['http://somehost.com/api/tenant/tenant1',
                              'http://somehost.com/api/tenant/tenant2',
                              'http://somehost.com/api/tenant/tenant3'],
                gearman_server='localhost')
            )
            args = logscraper.get_arguments()
            logscraper.run(args, mock_monitoring)
            self.assertEqual(3, mock_scraping.call_count)

    @mock.patch('sqlite3.connect', return_value=mock.MagicMock())
    @mock.patch('logscraper.logscraper.load_config')
    @mock.patch('logscraper.logscraper.save_build_info')
    @mock.patch('logscraper.logscraper.check_specified_files')
@@ -363,15 +331,14 @@ class TestScraper(base.TestCase):
    @mock.patch('os.path.isfile')
    @mock.patch('logscraper.logscraper.check_specified_files',
                return_value=['job-output.txt'])
    @mock.patch('logscraper.logscraper.LogMatcher.submitJobs')
    @mock.patch('argparse.ArgumentParser.parse_args',
                return_value=FakeArgs(
                    zuul_api_url=['http://somehost.com/api/tenant/tenant1'],
                    workers=1, download=True, directory="/tmp/testdir"))
    def test_run_scraping_download(self, mock_args, mock_submit, mock_files,
    def test_run_scraping_download(self, mock_args, mock_files,
                                   mock_isfile, mock_readfile,
                                   mock_specified_files, mock_save_buildinfo,
                                   mock_config):
                                   mock_config, mock_sqlite):
        with mock.patch('logscraper.logscraper.get_last_job_results'
                        ) as mock_job_results:
            with mock.patch(
@@ -385,12 +352,12 @@ class TestScraper(base.TestCase):
                logscraper.run_scraping(
                    args, 'http://somehost.com/api/tenant/tenant1')

            self.assertFalse(mock_submit.called)
            self.assertTrue(mock_specified_files.called)
            self.assertEqual(builds_result[0],
                             mock_specified_files.call_args.args[0])
            self.assertTrue(mock_save_buildinfo.called)

    @mock.patch('sqlite3.connect', return_value=mock.MagicMock())
    @mock.patch('logscraper.logscraper.load_config')
    @mock.patch('logscraper.logscraper.save_build_info')
    @mock.patch('logscraper.logscraper.check_specified_files')
@@ -398,15 +365,14 @@ class TestScraper(base.TestCase):
    @mock.patch('os.path.isfile')
    @mock.patch('logscraper.logscraper.check_specified_files',
                return_value=['job-output.txt'])
    @mock.patch('logscraper.logscraper.LogMatcher.submitJobs')
    @mock.patch('argparse.ArgumentParser.parse_args',
                return_value=FakeArgs(
                    zuul_api_url=['http://somehost.com/api/tenant/tenant1'],
                    workers=1, download=True, directory="/tmp/testdir"))
    def test_run_scraping_monitoring(self, mock_args, mock_submit, mock_files,
    def test_run_scraping_monitoring(self, mock_args, mock_files,
                                     mock_isfile, mock_readfile,
                                     mock_specified_files, mock_save_buildinfo,
                                     mock_config):
                                     mock_config, mock_sqlite):
        with mock.patch('logscraper.logscraper.get_last_job_results'
                        ) as mock_job_results:
            with mock.patch(
@@ -424,7 +390,6 @@ class TestScraper(base.TestCase):

            self.assertEqual('job_name', monitoring.job_count._labelnames[0])
            self.assertEqual(2, len(monitoring.job_count._metrics))
            self.assertFalse(mock_submit.called)
            self.assertTrue(mock_specified_files.called)
            self.assertEqual(builds_result[0],
                             mock_specified_files.call_args.args[0])
@@ -432,14 +397,12 @@ class TestScraper(base.TestCase):

    @mock.patch('logscraper.logscraper.create_custom_result')
    @mock.patch('logscraper.logscraper.check_specified_files')
    @mock.patch('logscraper.logscraper.LogMatcher.submitJobs')
    @mock.patch('gear.BaseClient.waitForServer')
    @mock.patch('argparse.ArgumentParser.parse_args',
                return_value=FakeArgs(
                    zuul_api_url=['http://somehost.com/api/tenant/tenant1'],
                    workers=1, download=True, directory="/tmp/testdir"))
    def test_run_aborted_download(self, mock_args, mock_gear, mock_gear_client,
                                  mock_check_files, mock_custom_result):
    def test_run_aborted_download(self, mock_args, mock_check_files,
                                  mock_custom_result):
        # Take job result that log_url is empty.
        result = builds_result[2]
        result['files'] = ['job-output.txt']
@@ -454,21 +417,17 @@ class TestScraper(base.TestCase):

        logscraper.run_build(result)
        logscraper.run_build(result_node_fail)
        self.assertFalse(mock_gear_client.called)
        self.assertFalse(mock_check_files.called)
        self.assertTrue(mock_custom_result.called)

    @mock.patch('logscraper.logscraper.create_custom_result')
    @mock.patch('logscraper.logscraper.check_specified_files')
    @mock.patch('logscraper.logscraper.LogMatcher.submitJobs')
    @mock.patch('gear.BaseClient.waitForServer')
    @mock.patch('argparse.ArgumentParser.parse_args',
                return_value=FakeArgs(
                    zuul_api_url=['http://somehost.com/api/tenant/tenant1'],
                    workers=1, gearman_server='localhost',
                    gearman_port='4731'))
    def test_run_aborted(self, mock_args, mock_gear, mock_gear_client,
                         mock_check_files, mock_custom_result):
                    workers=1))
    def test_run_aborted(self, mock_args, mock_check_files,
                         mock_custom_result):
        # Take job result that build_status is "ABORTED" or "NODE_FAILURE"
        result = builds_result[2]
        result['files'] = ['job-output.txt']
@@ -483,9 +442,8 @@ class TestScraper(base.TestCase):

        logscraper.run_build(result)
        logscraper.run_build(result_node_fail)
        self.assertTrue(mock_gear_client.called)
        self.assertTrue(mock_check_files.called)
        self.assertFalse(mock_custom_result.called)
        self.assertFalse(mock_check_files.called)
        self.assertTrue(mock_custom_result.called)

    @mock.patch('requests.get')
    @mock.patch('logscraper.logscraper.Monitoring')
@@ -499,7 +457,7 @@ class TestScraper(base.TestCase):
                zuul_api_url=['http://somehost.com/api/tenant/tenant1',
                              'http://somehost.com/api/tenant/tenant2',
                              'http://somehost.com/api/tenant/tenant3'],
                gearman_server='localhost')
            )
            args = logscraper.get_arguments()

            logscraper.run(args, mock_monitoring)
@@ -609,14 +567,15 @@ class TestScraper(base.TestCase):


class TestConfig(base.TestCase):
    @mock.patch('sqlite3.connect', return_value=mock.MagicMock())
    @mock.patch('logscraper.logscraper.load_config')
    @mock.patch('sys.exit')
    def test_config_object(self, mock_sys, mock_config):
    def test_config_object(self, mock_sys, mock_config, mock_sqlite):
        # Assume that url is wrong so it raise IndexError
        with mock.patch('argparse.ArgumentParser.parse_args') as mock_args:
            mock_args.return_value = FakeArgs(
                zuul_api_url='somehost.com',
                gearman_server='localhost')
            )
            args = logscraper.get_arguments()
            self.assertRaises(IndexError, logscraper.Config, args,
                              args.zuul_api_url)
@@ -624,7 +583,7 @@ class TestConfig(base.TestCase):
        with mock.patch('argparse.ArgumentParser.parse_args') as mock_args:
            mock_args.return_value = FakeArgs(
                zuul_api_url='https://somehost.com',
                gearman_server='localhost')
            )
            args = logscraper.get_arguments()
            logscraper.Config(args, args.zuul_api_url)
            mock_sys.assert_called()
@@ -636,7 +595,6 @@ class TestConfig(base.TestCase):
        # correct url without job name
        mock_args.return_value = FakeArgs(
            zuul_api_url='http://somehost.com/api/tenant/sometenant',
            gearman_server='localhost',
            checkpoint_file='/tmp/testfile')
        args = logscraper.get_arguments()
        some_config = logscraper.Config(args, args.zuul_api_url)
@@ -656,161 +614,6 @@ class TestLogMatcher(base.TestCase):
            }]
        }

    @mock.patch('logscraper.logscraper.load_config')
    @mock.patch('gear.TextJob')
    @mock.patch('gear.Client.submitJob')
    @mock.patch('gear.BaseClient.waitForServer')
    def test_submitJobs(self, mock_gear, mock_gear_client, mock_gear_job,
                        mock_load_config):
        result = builds_result[0]
        result['files'] = ['job-output.txt']
        result['tenant'] = 'sometenant'
        parsed_job = {
            "build_branch": "master",
            "build_change": 806255,
            "build_duration": 307.0,
            "build_name": "openstack-tox-py38",
            "build_node": "zuul-executor",
            "build_patchset": "9",
            "build_queue": "check",
            "build_ref": "refs/changes/55/806255/9",
            "build_set": {"uuid": "bf11828235c649ff859ad87d7c4aa525"},
            "build_status": "SUCCESS",
            "build_uuid": "bf11828235c649ff859ad87d7c4aa525",
            "build_zuul_url": "N/A",
            "filename": "job-output.txt",
            "log_url": "https://t.com/openstack/a0f8968/job-output.txt",
            "node_provider": "local",
            "project": "openstack/tempest",
            "tenant": "sometenant",
            "voting": 1}

        expected_gear_job = {"retry": False, "event": {
            "fields": parsed_job,
            "tags": ["job-output.txt", "console", "console.html"]},
            "source_url": "https://t.com/openstack/a0f8968/job-output.txt"}
        mock_load_config.return_value = self.config_file
        with mock.patch('argparse.ArgumentParser.parse_args') as mock_args:
            mock_args.return_value = FakeArgs(
                zuul_api_url='http://somehost.com/api/tenant/sometenant',
                gearman_server='localhost',
                gearman_port='4731')
            args = logscraper.get_arguments()
            lmc = logscraper.LogMatcher(args.gearman_server, args.gearman_port,
                                        result['result'], result['log_url'],
                                        {}, self.config_file)
            lmc.submitJobs('push-log', result['files'], result)
            mock_gear_client.assert_called_once()
            self.assertEqual(
                expected_gear_job,
                json.loads(mock_gear_job.call_args.args[1].decode('utf-8'))
            )

    @mock.patch('logscraper.logscraper.load_config')
    @mock.patch('gear.TextJob')
    @mock.patch('gear.Client.submitJob')
    @mock.patch('gear.BaseClient.waitForServer')
    def test_submitJobs_failure(self, mock_gear, mock_gear_client,
                                mock_gear_job, mock_load_config):
        # Take job result that build_status is "ABORTED"
        result = builds_result[1]
        result['files'] = ['job-output.txt']
        result['tenant'] = 'sometenant'
        parsed_job = {
            'build_branch': 'master',
            'build_change': 816445,
            "build_duration": 603.0,
            'build_name': 'tripleo-centos-8',
            'build_node': 'zuul-executor',
            'build_patchset': '1',
            'build_queue': 'check',
            'build_ref': 'refs/changes/45/816445/1',
            'build_set': {'uuid': '4a0ffebe30a94efe819fffc03cf33ea4'},
            'build_status': 'FAILURE',
            'build_uuid': '4a0ffebe30a94efe819fffc03cf33ea4',
            'build_zuul_url': 'N/A',
            'filename': 'job-output.txt',
            'log_url': 'https://t.com/tripleo-8/3982864/job-output.txt',
            'node_provider': 'local',
            'project': 'openstack/tripleo-ansible',
            'tenant': 'sometenant',
            'voting': 1}

        expected_gear_job = {"retry": False, "event": {
            "fields": parsed_job,
            "tags": ["job-output.txt", "console", "console.html"]},
            "source_url": "https://t.com/tripleo-8/3982864/job-output.txt"}
        mock_load_config.return_value = self.config_file
        with mock.patch('argparse.ArgumentParser.parse_args') as mock_args:
            mock_args.return_value = FakeArgs(
                zuul_api_url='http://somehost.com/api/tenant/sometenant',
                gearman_server='localhost',
                gearman_port='4731')
            args = logscraper.get_arguments()
            lmc = logscraper.LogMatcher(args.gearman_server, args.gearman_port,
                                        result['result'], result['log_url'],
                                        {}, self.config_file)
            lmc.submitJobs('push-log', result['files'], result)
            mock_gear_client.assert_called_once()
            self.assertEqual(
                expected_gear_job,
                json.loads(mock_gear_job.call_args.args[1].decode('utf-8'))
            )

    @mock.patch('logscraper.logscraper.load_config')
    @mock.patch('gear.TextJob')
    @mock.patch('gear.Client.submitJob')
    @mock.patch('gear.BaseClient.waitForServer')
    def test_submitJobs_aborted(self, mock_gear, mock_gear_client,
                                mock_gear_job, mock_load_config):
        # Take job result that build_status is "ABORTED"
        result = builds_result[2]
        result['files'] = ['job-output.txt']
        result['tenant'] = 'sometenant'
        parsed_job = {
            'build_branch': 'stable/victoria',
            'build_change': 816486,
            'build_duration': 18,
 | 
			
		||||
            'build_name': 'openstack-tox-lower-constraints',
 | 
			
		||||
            'build_node': 'zuul-executor',
 | 
			
		||||
            'build_patchset': '1',
 | 
			
		||||
            'build_queue': 'check',
 | 
			
		||||
            'build_ref': 'refs/changes/86/816486/1',
 | 
			
		||||
            'build_set': {'uuid': 'bd044dfe3ecc484fbbf74fdeb7fb56aa'},
 | 
			
		||||
            'build_status': 'FAILURE',
 | 
			
		||||
            'build_uuid': 'bd044dfe3ecc484fbbf74fdeb7fb56aa',
 | 
			
		||||
            'build_zuul_url': 'N/A',
 | 
			
		||||
            'filename': 'job-output.txt',
 | 
			
		||||
            'log_url': 'job-output.txt',
 | 
			
		||||
            'node_provider': 'local',
 | 
			
		||||
            'project': 'openstack/nova',
 | 
			
		||||
            'tenant': 'sometenant',
 | 
			
		||||
            'voting': 1}
 | 
			
		||||
 | 
			
		||||
        # NOTE: normally ABORTED jobs does not provide log_url,
 | 
			
		||||
        # so source_url will be just a file to iterate.
 | 
			
		||||
        # In the logscraper, aborted jobs are just skipped.
 | 
			
		||||
        expected_gear_job = {"retry": False, "event": {
 | 
			
		||||
            "fields": parsed_job,
 | 
			
		||||
            "tags": ["job-output.txt", "console", "console.html"]},
 | 
			
		||||
            "source_url": "job-output.txt"}
 | 
			
		||||
        mock_load_config.return_value = self.config_file
 | 
			
		||||
        with mock.patch('argparse.ArgumentParser.parse_args') as mock_args:
 | 
			
		||||
            mock_args.return_value = FakeArgs(
 | 
			
		||||
                zuul_api_url='http://somehost.com/api/tenant/sometenant',
 | 
			
		||||
                gearman_server='localhost',
 | 
			
		||||
                gearman_port='4731')
 | 
			
		||||
            args = logscraper.get_arguments()
 | 
			
		||||
            lmc = logscraper.LogMatcher(args.gearman_server, args.gearman_port,
 | 
			
		||||
                                        result['result'], result['log_url'],
 | 
			
		||||
                                        {}, self.config_file)
 | 
			
		||||
            lmc.submitJobs('push-log', result['files'], result)
 | 
			
		||||
            mock_gear_client.assert_called_once()
 | 
			
		||||
            self.assertEqual(
 | 
			
		||||
                expected_gear_job,
 | 
			
		||||
                json.loads(mock_gear_job.call_args.args[1].decode('utf-8'))
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
    @mock.patch('builtins.open', new_callable=mock.mock_open())
 | 
			
		||||
    @mock.patch('os.path.isfile')
 | 
			
		||||
    @mock.patch('requests.get')
 | 
			
		||||
 
 | 
			
		||||
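For context on what the removed tests above exercised, here is a minimal sketch of the gearman submission flow they verify, reconstructed only from the mocked calls and the asserted payload; it is not the actual LogMatcher implementation, and the helper name submit_log_job is a hypothetical stand-in.

# Sketch (assumption, not the real LogMatcher code): build the event payload,
# JSON-encode it, and submit it as a gear.TextJob. The tests assert that the
# second positional argument passed to gear.TextJob is exactly this structure,
# UTF-8 encoded.
import json

import gear


def submit_log_job(client, job_name, fields, source_url, tags):
    payload = {
        "retry": False,
        "event": {"fields": fields, "tags": tags},
        "source_url": source_url,
    }
    # client is a connected gear.Client; submitJob ships the job to the
    # gearman server the scraper was pointed at.
    client.submitJob(gear.TextJob(job_name,
                                  json.dumps(payload).encode("utf-8")))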
@@ -36,8 +36,6 @@ build_args:
  directory: /tmp/logscraper
  download: true
  follow: false
  gearman_port: 4730
  gearman_server: null
  insecure: true
  job_name: null
  logstash_url: null
@@ -1,5 +1,4 @@
pbr>=1.6         # Apache-2.0
gear<0.17
requests<2.27    # Apache-2.0
PyYAML<6.1       # MIT
tenacity
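With the gear<0.17 pin dropped from requirements, one quick way to confirm a freshly built environment no longer pulls in the gear client is a smoke check like the following (a sketch, assuming it runs inside the project's virtualenv):

# Smoke check: after this change, gear should not be installed as a
# dependency of ci-log-processing.
import importlib.util

assert importlib.util.find_spec("gear") is None, "gear is still installed"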