diff --git a/.zuul.yaml b/.zuul.yaml index f72f7dc..fe8b709 100644 --- a/.zuul.yaml +++ b/.zuul.yaml @@ -1,13 +1,4 @@ --- -- job: - name: ci-log-processing-functional-test-centos-8-stream - description: Test is validating ci log processing services - run: ansible/playbooks/check-services.yml - voting: false - nodeset: - nodes: - - name: centos-8-stream - label: centos-8-stream - job: name: ci-log-processing-functional-test-centos-8-stream-sender description: Test is validating Logscraper and logsender services @@ -38,7 +29,7 @@ - job: name: ci-log-processing-build-image parent: opendev-build-docker-image - description: Build logscraper and loggearman images. + description: Build logscraper allowed-projects: openstack/ci-log-processing timeout: 2700 vars: &cilogprocessing_image_vars @@ -48,11 +39,6 @@ target: logscraper tags: &imagetag "{{ zuul.tag is defined | ternary([zuul.get('tag', '').split('.')[0], '.'.join(zuul.get('tag', '').split('.')[:2]), zuul.get('tag', '')], ['latest']) }}" - - context: loggearman/ - repository: cilogprocessing/loggearman - target: loggearman - tags: - *imagetag - job: name: ci-log-processing-upload-image @@ -69,8 +55,6 @@ soft: true - name: openstack-tox-py38 soft: true - - name: ci-log-processing-functional-test-centos-8-stream - soft: true - name: ci-log-processing-functional-test-centos-8-stream-sender soft: true - name: ci-log-processing-build-image @@ -86,14 +70,12 @@ - openstack-tox-pep8 - openstack-tox-py38 - ci-log-processing-build-image - - ci-log-processing-functional-test-centos-8-stream - ci-log-processing-functional-test-centos-8-stream-sender gate: jobs: - openstack-tox-linters - openstack-tox-pep8 - openstack-tox-py38 - - ci-log-processing-functional-test-centos-8-stream - ci-log-processing-functional-test-centos-8-stream-sender - ci-log-processing-build-image post: diff --git a/README.rst b/README.rst index 1975fa8..c501227 100644 --- a/README.rst +++ b/README.rst @@ -31,77 +31,10 @@ Dashboards objects versioning. Available workflows ------------------- -The Openstack CI Log Processing project is providing two configurations +The Openstack CI Log Processing project is provides configuration for sending logs from Zuul CI to the Opensearch host. -1. Logscraper, log gearman client, log gearman worker, logstash - -With this solution, log workflow looks like: - -.. code-block:: shell - - +------------------+ 1. Get last builds info +----------------+ - | |-------------------------> | | - | Logscraper | | Zuul API | - | |<------------------------- | | - +------------------+ 2. Fetch data +----------------+ - | - | - +-------------------+ - | - | - 3. Send queue | - logs to gearman | - client | - v - +------------------+ - | | - | Log gearman | - | client | - +------------------+ - +-------------- --------------+ - | | | - | 4. Consume queue, | | - | download log files | | - | | | - v v v - +---------------+ +----------------+ +---------------+ - | Log gearman | | Log gearman | | Log gearman | - | worker | | worker | | worker | - +---------------+ +----------------+ +---------------+ - | | | - | 5. Send to | | - | Logstash | | - | v | - | +----------------+ | - | | | | - +---------> | Logstash | <-------+ - | | - +----------------+ - | - 6. 
Send to | - Opensearch | - | - +--------v--------+ - | | - | Opensearch | - | | - +-----------------+ - - -On the beginning, this project was designed to use that solution, but -it have a few bottlenecks: -- log gearman client can use many memory, when log gearman worker is not fast, -- one log gearman worker is not enough even on small infrastructure, -- logstash service can fail, -- logscraper is checking if log files are available, then log gearman - is downloading the logs, which can make an issue on log sever, that - host does not have free socket. - -You can deploy your log workflow by using example Ansible playbook that -you can find in `ansible/playbooks/check-services.yml` in this project. - -2. Logscraper, logsender +Logscraper, logsender This workflow removes bottlenecks by removing: log gearman client, log gearman worker and logstash service. Logs are downloaded when diff --git a/ansible/playbooks/check-services.yml b/ansible/playbooks/check-services.yml deleted file mode 100644 index 8a4c2d8..0000000 --- a/ansible/playbooks/check-services.yml +++ /dev/null @@ -1,33 +0,0 @@ ---- -- hosts: all - become: true - vars: - # loggearman - worker - output_host: 0.0.0.0 - output_port: 9999 - gearman_host: 0.0.0.0 - gearman_port: 4730 - log_cert_verify: false - # loggearman - client - source_url: "" - gearman_client_host: "{{ gearman_host }}" - gearman_client_port: "{{ gearman_port }}" - # logscraper - tenant_builds: - - tenant: openstack - gearman_port: "{{ gearman_port }}" - gearman_server: "{{ gearman_host }}" - zuul_api_url: - - https://zuul.opendev.org/api/tenant/openstack - insecure: false - job_names: [] - download: false - pre_tasks: - - name: Update all packages - become: true - package: - name: "*" - state: latest - roles: - - check-services - - backup-dashboards-objects diff --git a/ansible/playbooks/service-loggearman.yml b/ansible/playbooks/service-loggearman.yml deleted file mode 100644 index 5f22797..0000000 --- a/ansible/playbooks/service-loggearman.yml +++ /dev/null @@ -1,6 +0,0 @@ ---- -- name: Configure log-gearman-client and log-gearman-worker tools - hosts: logscraper01.openstack.org - become: true - roles: - - loggearman diff --git a/ansible/roles/check-services/tasks/main.yml b/ansible/roles/check-services/tasks/main.yml index ecd92b0..10d4bc0 100644 --- a/ansible/roles/check-services/tasks/main.yml +++ b/ansible/roles/check-services/tasks/main.yml @@ -26,25 +26,6 @@ podman images --noheading quay.io/logscraper:dev | awk '{print $3}' register: _logscraper_image_id - - name: Build logscraper container image - shell: > - podman build -t quay.io/loggearman:dev -f loggearman/Dockerfile - args: - chdir: "{{ zuul.projects['opendev.org/openstack/ci-log-processing'].src_dir }}" - when: zuul is defined - - - name: Build loggearman container image - non Zuul - shell: > - podman build -t quay.io/loggearman:dev -f loggearman/Dockerfile - args: - chdir: "{{ playbook_dir }}" - when: zuul is not defined - - - name: Get loggearman image id - shell: | - podman images --noheading quay.io/loggearman:dev | awk '{print $3}' - register: _loggearman_image_id - - name: Print all images shell: | podman images @@ -53,7 +34,6 @@ set_fact: container_images: logscraper: "{{ _logscraper_image_id.stdout }}" - loggearman: "{{ _loggearman_image_id.stdout }}" ### OPENSEARCH #### - name: Setup Opensearch @@ -129,23 +109,6 @@ delay: 10 timeout: 300 -### Loggearman ### -- name: Setup loggearman service - include_role: - name: loggearman - -# Flush handlers before running test -- name: Force all 
notified handlers to run now - meta: flush_handlers - -### service validation ### -- name: Check if log gearman client is listening - wait_for: - host: "{{ gearman_host }}" - port: "{{ gearman_port }}" - delay: 10 - timeout: 300 - ### Logscraper ### - name: Setup logscraper service include_role: @@ -156,8 +119,6 @@ systemctl is-active -q {{ item }} loop: - logscraper-openstack - - loggearman-client - - loggearman-worker register: _service_status failed_when: _service_status.rc != 0 @@ -191,14 +152,6 @@ shell: | podman logs opensearch - - name: Get gearman client logs - shell: | - podman logs loggearman-client - - - name: Get gearman worker logs - shell: | - podman logs loggearman-worker - - name: Get indices to fail the test uri: url: "https://127.0.0.1:9200/_cat/indices" diff --git a/ansible/roles/loggearman/README.rst b/ansible/roles/loggearman/README.rst deleted file mode 100644 index 1dafecb..0000000 --- a/ansible/roles/loggearman/README.rst +++ /dev/null @@ -1,32 +0,0 @@ -Loggearman ansible role -======================= - -The goal of this role is to setup and configure service related -to `log-gearman-client` and `log-gearman-worker` scripts, that -were ported to this project repository from `puppet-log_processor repository -`__. - -Configuration -------------- - -The role is automatically deploying services: - -* log-gearman-client -* log-gearman-worker - -inside the container. - -Example playbook setup ----------------------- - -.. code-block:: yaml - - - name: Configure loggearman tool - hosts: localhost - become: true - vars: - source_url: https://localhost - output_hosts: mylogstashhost.com - log_cert_verify: True - roles: - - loggearman diff --git a/ansible/roles/loggearman/defaults/main.yml b/ansible/roles/loggearman/defaults/main.yml deleted file mode 100644 index ed08325..0000000 --- a/ansible/roles/loggearman/defaults/main.yml +++ /dev/null @@ -1,31 +0,0 @@ ---- -loggearman_user: loggearman -loggearman_group: loggearman -loggearman_gid: 10211 -loggearman_uid: 10211 - -loggearman_dir: /etc/loggearman -loggearman_log_dir: /var/log/loggearman - -container_images: - # FIXME: Move image to dedicated repository on Docker hub. 
- loggearman: quay.io/software-factory/loggearman:latest - -# Gearman client -gearman_client_host: 0.0.0.0 -gearman_client_port: 4730 -source_url: "" -zmq_publishers: [] -subunit_files: [] -source_files: [] - -# Gearman worker -gearman_host: 0.0.0.0 -gearman_port: 4730 -output_host: logstash.example.com -output_port: 9999 -output_mode: tcp -crm114_script: "" -crm114_data: "" -log_ca_certs: "" -log_cert_verify: true diff --git a/ansible/roles/loggearman/handlers/main.yml b/ansible/roles/loggearman/handlers/main.yml deleted file mode 100644 index 452434b..0000000 --- a/ansible/roles/loggearman/handlers/main.yml +++ /dev/null @@ -1,14 +0,0 @@ ---- -- name: restart loggearman client - service: - name: loggearman-client - state: restarted - daemon-reload: true - enabled: true - -- name: restart loggearman worker - service: - name: loggearman-worker - state: restarted - daemon-reload: true - enabled: true diff --git a/ansible/roles/loggearman/tasks/main.yml b/ansible/roles/loggearman/tasks/main.yml deleted file mode 100644 index 6a227ac..0000000 --- a/ansible/roles/loggearman/tasks/main.yml +++ /dev/null @@ -1,62 +0,0 @@ ---- -- name: Create decidated group - group: - name: "{{ loggearman_group }}" - gid: "{{ loggearman_gid }}" - state: present - -- name: Create dedicated user - user: - name: "{{ loggearman_user }}" - state: present - comment: "Dedicated user for loggearman" - group: "{{ loggearman_group }}" - uid: "{{ loggearman_uid }}" - shell: "/sbin/nologin" - create_home: false - -- name: Create dedicated directories - file: - path: "{{ item }}" - state: directory - owner: "{{ loggearman_user }}" - group: "{{ loggearman_group }}" - mode: "0755" - loop: - - "{{ loggearman_dir }}" - - "{{ loggearman_log_dir }}" - -- name: Init log files - file: - path: "{{ loggearman_log_dir }}/{{ item }}.log" - state: touch - owner: "{{ loggearman_user }}" - group: "{{ loggearman_group }}" - mode: "0644" - loop: - - client - - worker - -- name: Ensure container software is installed - package: - name: podman - state: present - -- name: Create configuration files - template: - src: "{{ item }}.yml.j2" - dest: "{{ loggearman_dir }}/{{ item }}.yml" - owner: "{{ loggearman_user }}" - group: "{{ loggearman_group }}" - mode: "0644" - loop: - - client - - worker - notify: - - restart loggearman {{ item }} - -- name: Configure loggearman service - include_tasks: service.yml - loop: - - client - - worker diff --git a/ansible/roles/loggearman/tasks/service.yml b/ansible/roles/loggearman/tasks/service.yml deleted file mode 100644 index fd05f7e..0000000 --- a/ansible/roles/loggearman/tasks/service.yml +++ /dev/null @@ -1,17 +0,0 @@ ---- -- name: Generate podman-loggearman-{{ item }} script - template: - src: loggearman.sh.j2 - dest: "/usr/local/bin/podman-loggearman-{{ item }}" - mode: '0755' - notify: - - restart loggearman {{ item }} - -- name: Generate systemd unit loggearman-{{ item }} - template: - src: loggearman.service.j2 - dest: "/etc/systemd/system/loggearman-{{ item }}.service" - owner: root - group: root - notify: - - restart loggearman {{ item }} diff --git a/ansible/roles/loggearman/templates/client.yml.j2 b/ansible/roles/loggearman/templates/client.yml.j2 deleted file mode 100644 index df01c77..0000000 --- a/ansible/roles/loggearman/templates/client.yml.j2 +++ /dev/null @@ -1,7 +0,0 @@ ---- -gearman-host: {{ gearman_client_host }} -gearman-port: {{ gearman_client_port }} -source-url: {{ source_url }} -zmq-publishers: {{ zmq_publishers }} -subunit-files: {{ subunit_files }} -source-files: {{ source_files 
}} diff --git a/ansible/roles/loggearman/templates/loggearman.service.j2 b/ansible/roles/loggearman/templates/loggearman.service.j2 deleted file mode 100644 index 231cc28..0000000 --- a/ansible/roles/loggearman/templates/loggearman.service.j2 +++ /dev/null @@ -1,16 +0,0 @@ -[Unit] -Description=loggearman {{ item }} service -After=syslog.target network.target -StartLimitInterval=20 -StartLimitBurst=5 - -[Service] -Type=simple -SyslogIdentifier=loggearman-{{ item }} -ExecStart=/usr/local/bin/podman-loggearman-{{ item }} -ExecStop=/usr/bin/podman stop loggearman-{{ item }} -Restart=always -RestartSec=5s - -[Install] -WantedBy=multi-user.target diff --git a/ansible/roles/loggearman/templates/loggearman.sh.j2 b/ansible/roles/loggearman/templates/loggearman.sh.j2 deleted file mode 100644 index e29c25e..0000000 --- a/ansible/roles/loggearman/templates/loggearman.sh.j2 +++ /dev/null @@ -1,17 +0,0 @@ -#!/bin/bash - -# MANAGED BY ANSIBLE -/usr/bin/podman run \ - --network host \ - --rm \ - --user 1000:1000 \ - --uidmap 0:{{ loggearman_uid + 1 }}:999 \ - --uidmap 1000:{{ loggearman_uid }}:1 \ - --name loggearman-{{ item }} \ - --volume {{ loggearman_dir }}:{{ loggearman_dir }}:z \ - --volume {{ loggearman_log_dir }}:{{ loggearman_log_dir }}:z \ - {{ container_images['loggearman'] }} \ - log-gearman-{{ item }} \ - -c {{ loggearman_dir }}/{{ item }}.yml \ - --foreground \ - -d {{ loggearman_log_dir }}/{{ item }}.log diff --git a/ansible/roles/loggearman/templates/worker.yml.j2 b/ansible/roles/loggearman/templates/worker.yml.j2 deleted file mode 100644 index 39857f3..0000000 --- a/ansible/roles/loggearman/templates/worker.yml.j2 +++ /dev/null @@ -1,16 +0,0 @@ ---- -gearman-host: {{ gearman_host }} -gearman-port: {{ gearman_port }} -output-host: {{ output_host }} -output-port: {{ output_port }} -output-mode: {{ output_mode }} -log-cert-verify: {{ log_cert_verify }} -{% if crm114_script %} -crm114-script: {{ crm114_script }} -{% endif %} -{% if crm114_data %} -crm114-data: {{ crm114_data }} -{% endif %} -{% if log_ca_certs %} -log-ca-certs: {{ log_ca_certs }} -{% endif %} diff --git a/ansible/roles/logscraper/README.rst b/ansible/roles/logscraper/README.rst index 4563464..a93b86f 100644 --- a/ansible/roles/logscraper/README.rst +++ b/ansible/roles/logscraper/README.rst @@ -2,8 +2,8 @@ Logscraper ansible role ======================= The goal of this role is to setup and configure service related -to logscraper script which is responsible to to push recent -zuul builds into log gearman processor. +to logscraper script which is responsible to pull latest Zuul CI job +logs to local storage. Requirements ------------ @@ -23,8 +23,6 @@ for example: vars: tenant_builds: - tenant: openstack - gearman_port: 4731 - gearman_server: logstash.openstack.org zuul_api_url: - https://zuul.opendev.org/api/tenant/openstack insecure: false @@ -57,8 +55,6 @@ and second one for getting logs from `sometenant` tenant. vars: tenant_builds: - tenant: openstack - gearman_port: 4731 - gearman_server: logstash.openstack.org zuul_api_url: - https://zuul.opendev.org/api/tenant/openstack insecure: False @@ -66,7 +62,6 @@ and second one for getting logs from `sometenant` tenant. 
zuul_api_url: - https://zuul.opendev.org/api/tenant/sometenant insecure: True - download: true download_dir: /mnt/logscraper file_list: - /etc/logscraper/my-downloadlist.yaml diff --git a/ansible/roles/logscraper/defaults/main.yml b/ansible/roles/logscraper/defaults/main.yml index e66726e..3146d51 100644 --- a/ansible/roles/logscraper/defaults/main.yml +++ b/ansible/roles/logscraper/defaults/main.yml @@ -12,8 +12,6 @@ container_images: # Example: # tenant_builds: # - tenant: openstack -# gearman_port: 4731 -# gearman_server: logstash.openstack.org # zuul_api_url: # - https://zuul.opendev.org/api/tenant/openstack # insecure: false @@ -22,13 +20,11 @@ container_images: # zuul_api_url: # - https://zuul.opendev.org/api/tenant/sometenant # insecure: true -# download: true # download_dir: /mnt/logscraper/sometenant # file_list: [] # job_name: # - test # - test_new -# logstash_url: https://somelogstash.com:9999 # max_skipped: 100 # debug: true # logscraper_wait_time: 120 diff --git a/ansible/roles/logscraper/tasks/service.yml b/ansible/roles/logscraper/tasks/service.yml index f04becf..11dbc70 100644 --- a/ansible/roles/logscraper/tasks/service.yml +++ b/ansible/roles/logscraper/tasks/service.yml @@ -35,7 +35,7 @@ dest: "{{ logscraper_dir }}/download-list-{{ item.tenant }}.yaml" owner: "{{ logscraper_user }}" group: "{{ logscraper_group }}" - mode: "0640" + mode: "0644" register: _download_file - name: Generate systemd unit diff --git a/ansible/roles/logscraper/templates/logscraper.sh.j2 b/ansible/roles/logscraper/templates/logscraper.sh.j2 index d030f12..cd60f77 100644 --- a/ansible/roles/logscraper/templates/logscraper.sh.j2 +++ b/ansible/roles/logscraper/templates/logscraper.sh.j2 @@ -18,4 +18,4 @@ --volume {{ item.download_dir }}:{{ item.download_dir }}:z \ {% endif %} {{ container_images['logscraper'] }} \ - /usr/bin/logscraper --config {{ logscraper_dir }}/logscraper-{{ item['tenant'] }}.config + /usr/local/bin/logscraper --config {{ logscraper_dir }}/logscraper-{{ item['tenant'] }}.config diff --git a/ansible/roles/logsender/templates/logsender.sh.j2 b/ansible/roles/logsender/templates/logsender.sh.j2 index f623d3b..6f28024 100644 --- a/ansible/roles/logsender/templates/logsender.sh.j2 +++ b/ansible/roles/logsender/templates/logsender.sh.j2 @@ -16,4 +16,4 @@ --volume {{ item['logsender_custom_ca_crt'] }}:{{ item['logsender_custom_ca_crt'] }}:z \ {% endif %} {{ container_images['logsender'] }} \ - /usr/bin/logsender --config {{ logscraper_dir }}/logsender-{{ item['tenant'] }}.config + /usr/local/bin/logsender --config {{ logscraper_dir }}/logsender-{{ item['tenant'] }}.config diff --git a/doc/source/index.rst b/doc/source/index.rst index 8665850..fac5a16 100644 --- a/doc/source/index.rst +++ b/doc/source/index.rst @@ -24,8 +24,6 @@ Indices and tables * :doc:`logscraper-role` * :doc:`logsender` * :doc:`logsender-role` - * :doc:`loggearman` - * :doc:`loggearman-role` .. 
toctree:: :maxdepth: 2 @@ -36,5 +34,3 @@ Indices and tables logscraper-role logsender logsender-role - loggearman - loggearman-role diff --git a/doc/source/loggearman-role.rst b/doc/source/loggearman-role.rst deleted file mode 120000 index 14ecda8..0000000 --- a/doc/source/loggearman-role.rst +++ /dev/null @@ -1 +0,0 @@ -../../ansible/roles/loggearman/README.rst \ No newline at end of file diff --git a/doc/source/loggearman.rst b/doc/source/loggearman.rst deleted file mode 100644 index 91f6dd0..0000000 --- a/doc/source/loggearman.rst +++ /dev/null @@ -1,22 +0,0 @@ -Loggearman -========== - -The Loggearman tools are responsible for listening events, -parse them, get logs from log server and push them to -the Logstash service. - - -Loggearman Client ------------------ - -The Loggearman Client is responsible for listening events that -comes on port 4730 (by default), parse them and redirect them to -German server, that later will be processed by loggearman worker. - - -Loggearman Worker ------------------ - -The Loggearman Worker is responsible to get log files from the -log server, parse them and send line by line to the Logstash service -with necessary fields like: build_uuid, build_name, etc. diff --git a/doc/source/logscraper.rst b/doc/source/logscraper.rst index 3b32261..3bf44f4 100644 --- a/doc/source/logscraper.rst +++ b/doc/source/logscraper.rst @@ -11,7 +11,7 @@ It is available by typing: logscraper --help - Fetch and push last Zuul CI job logs into gearman. + Fetch and push last Zuul CI job logs: optional arguments: -h, --help show this help message and exit @@ -20,25 +20,16 @@ It is available by typing: times. --job-name JOB_NAME CI job name(s). Parameter can be set multiple times. If not set it would scrape every latest builds. - --gearman-server GEARMAN_SERVER - Gearman host addresss - --gearman-port GEARMAN_PORT - Gearman listen port. Defaults to 4730. --follow Keep polling zuul builds --insecure Skip validating SSL cert --checkpoint-file CHECKPOINT_FILE File that will keep information about last uuid timestamp for a job. - --logstash-url LOGSTASH_URL - When provided, script will check connection to - Logstash service before sending to log processing - system. For example: logstash.local:9999 --workers WORKERS Worker processes for logscraper --max-skipped MAX_SKIPPED How many job results should be checked until last uuid written in checkpoint file is founded --debug Print more information - --download Download logs and do not send to gearman service --directory DIRECTORY Directory, where the logs will be stored. Defaults to: /tmp/logscraper @@ -51,30 +42,6 @@ Base on the use case, we can run logscraper. Example: -* periodical check if there are some new logs for `openstack` tenant: - -.. code-block:: - - logscraper --gearman-server somehost --zuul-api-url https://zuul.opendev.org/api/tenant/openstack --checkpoint-file /tmp/results-checkpoint --follow - -* one shot on getting logs from `zuul` tenant: - -.. code-block:: - - logscraper --gearman-server localhost --zuul-api-url https://zuul.opendev.org/api/tenant/zuul --checkpoint-file /tmp/zuul-result-timestamp - -* periodically scrape logs from tenants: `openstack`, `zuul` and `local` - -.. 
code-block:: - - logscraper --gearman-server localhost --zuul-api-url https://zuul.opendev.org/api/tenant/openstack --zuul-api-url https://zuul.opendev.org/api/tenant/zuul --zuul-api-url https://zuul.opendev.org/api/tenant/local --checkpoint-file /tmp/someresults --follow - -* scrape logs from two defined job names: `tripleo-ci-centos-8-containers-multinode` and `openstack-tox-linters` for tenants: `openstack` and `local`: - -.. code-block:: - - logscraper --gearman-server localhost --job-name tripleo-ci-centos-8-containers-multinode --job-name openstack-tox-linters --zuul-api-url https://zuul.opendev.org/api/tenant/openstack --zuul-api-url https://zuul.opendev.org/api/tenant/local - * download logs to /mnt/logscraper. NOTE: if you are using container service, this directory needs to be mounted! .. code-block:: @@ -97,13 +64,6 @@ Then you can execute commands that are described above. NOTE: if you want to use parameter `--checkpoint-file`, you need to mount a volume to the container, for example: -.. code-block:: - - docker run -v $(pwd):/checkpoint-dir:z -d logscraper logscraper --gearman-server somehost --zuul-api-url https://zuul.opendev.org/api/tenant/openstack --checkpoint-file /checkpoint-dir/checkpoint --follow - - -In this example, logscraper will download log files to the /mnt/logscraper directory: - .. code-block:: docker run -v $(pwd):/checkpoint-dir:z -v /mnt/logscraper:/mnt/logscraper:z -d logscraper logscraper --zuul-api-url https://zuul.opendev.org/api/tenant/openstack --checkpoint-file /checkpoint-dir/checkpoint --directory /mnt/logscraper --download --follow diff --git a/loggearman/Dockerfile b/loggearman/Dockerfile deleted file mode 100644 index ed3cfe5..0000000 --- a/loggearman/Dockerfile +++ /dev/null @@ -1,41 +0,0 @@ -# Copyright (C) 2021 Red Hat -# Copyright (C) 2022 Red Hat -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -FROM quay.io/centos/centos:stream8 as loggearman - -ENV OSLO_PACKAGE_VERSION='0.0.1' -ENV PATH=~/.local/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin -ENV LANG=en_US.UTF-8 - -RUN groupadd --gid 1000 loggearman && \ - useradd --home-dir /home/loggearman --gid 1000 --uid 1000 loggearman - -RUN dnf update -y && \ - dnf install -y python38 python38-setuptools \ - python38-devel python38-wheel \ - python38-pip git - -COPY . /tmp/src -RUN cd /tmp/src && \ - pip3 install -r requirements.txt && \ - python3 setup.py install && \ - rm -rf /tmp/src - -RUN dnf remove -y python3-devel git && \ - dnf autoremove -y && \ - dnf clean all && \ - rm -rf ~/.cache/pip - -USER loggearman diff --git a/loggearman/README.rst b/loggearman/README.rst deleted file mode 100644 index d35e6be..0000000 --- a/loggearman/README.rst +++ /dev/null @@ -1,8 +0,0 @@ -OpenStack Log Processor Module -============================== - -The Log Processor Module comes from `repository`_ with -applied patches from this `patchset`_. - -.. _repository: https://opendev.org/opendev/puppet-log_processor -.. 
_patchset: https://review.opendev.org/c/opendev/puppet-log_processor/+/809424 diff --git a/loggearman/loggearman/__init__.py b/loggearman/loggearman/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/loggearman/loggearman/client.py b/loggearman/loggearman/client.py deleted file mode 100755 index 5dbff17..0000000 --- a/loggearman/loggearman/client.py +++ /dev/null @@ -1,246 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright 2013 Hewlett-Packard Development Company, L.P. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import argparse -import daemon -import gear -import json -import logging -import os -import os.path -import re -import signal -import socket -import threading -import time -import yaml -import zmq - - -try: - import daemon.pidlockfile as pidfile_mod -except ImportError: - import daemon.pidfile as pidfile_mod - - -class EventProcessor(threading.Thread): - def __init__(self, zmq_address, gearman_client, files, source_url): - threading.Thread.__init__(self) - self.files = files - self.source_url = source_url - self.gearman_client = gearman_client - self.zmq_address = zmq_address - self._connect_zmq() - - def run(self): - while True: - try: - self._read_event() - except Exception: - # Assume that an error reading data from zmq or deserializing - # data received from zmq indicates a zmq error and reconnect. 
- logging.exception("ZMQ exception.") - self._connect_zmq() - - def _connect_zmq(self): - logging.debug("Connecting to zmq endpoint.") - self.context = zmq.Context() - self.socket = self.context.socket(zmq.SUB) - event_filter = b"onFinalized" - self.socket.setsockopt(zmq.SUBSCRIBE, event_filter) - self.socket.connect(self.zmq_address) - - def _read_event(self): - string = self.socket.recv().decode('utf-8') - event = json.loads(string.split(None, 1)[1]) - logging.debug("Jenkins event received: " + json.dumps(event)) - for fileopts in self.files: - output = {} - source_url, out_event = self._parse_event(event, fileopts) - job_filter = fileopts.get('job-filter') - if (job_filter and not re.match( - job_filter, out_event['fields']['build_name'])): - continue - build_queue_filter = fileopts.get('build-queue-filter') - if (build_queue_filter and not re.match( - build_queue_filter, out_event['fields']['build_queue'])): - continue - project_filter = fileopts.get('project-filter') - if (project_filter and not re.match( - project_filter, out_event['fields']['project'])): - continue - output['source_url'] = source_url - output['retry'] = fileopts.get('retry-get', False) - output['event'] = out_event - if 'subunit' in fileopts.get('name'): - job = gear.Job(b'push-subunit', - json.dumps(output).encode('utf8')) - else: - job = gear.Job(b'push-log', json.dumps(output).encode('utf8')) - try: - self.gearman_client.submitJob(job) - except Exception: - logging.exception("Exception submitting job to Gearman.") - - def _get_log_dir(self, event): - parameters = event["build"].get("parameters", {}) - base = parameters.get('LOG_PATH', 'UNKNOWN') - return base - - def _parse_fields(self, event, filename): - fields = {} - fields["filename"] = filename - fields["build_name"] = event.get("name", "UNKNOWN") - fields["build_status"] = event["build"].get("status", "UNKNOWN") - fields["build_node"] = event["build"].get("node_name", "UNKNOWN") - fields["build_master"] = event["build"].get("host_name", "UNKNOWN") - parameters = event["build"].get("parameters", {}) - fields["project"] = parameters.get("ZUUL_PROJECT", "UNKNOWN") - # The voting value is "1" for voting, "0" for non-voting - fields["voting"] = parameters.get("ZUUL_VOTING", "UNKNOWN") - # TODO(clarkb) can we do better without duplicated data here? 
- fields["build_uuid"] = parameters.get("ZUUL_UUID", "UNKNOWN") - fields["build_short_uuid"] = fields["build_uuid"][:7] - fields["build_queue"] = parameters.get("ZUUL_PIPELINE", "UNKNOWN") - fields["build_ref"] = parameters.get("ZUUL_REF", "UNKNOWN") - fields["build_branch"] = parameters.get("ZUUL_BRANCH", "UNKNOWN") - fields["build_zuul_url"] = parameters.get("ZUUL_URL", "UNKNOWN") - if parameters.get("ZUUL_CHANGE"): - fields["build_change"] = parameters.get("ZUUL_CHANGE", "UNKNOWN") - fields["build_patchset"] = parameters.get("ZUUL_PATCHSET", - "UNKNOWN") - elif parameters.get("ZUUL_NEWREV"): - fields["build_newrev"] = parameters.get("ZUUL_NEWREV", - "UNKNOWN") - if ["build_node"] != "UNKNOWN": - node_provider = '-'.join( - fields["build_node"].split('-')[-3:-1]) - fields["node_provider"] = node_provider or "UNKNOWN" - else: - fields["node_provider"] = "UNKNOWN" - return fields - - def _parse_event(self, event, fileopts): - fields = self._parse_fields(event, fileopts['name']) - log_dir = self._get_log_dir(event) - source_url = fileopts.get('source-url', self.source_url) + '/' + \ - os.path.join(log_dir, fileopts['name']) - fields["log_url"] = source_url - out_event = {} - out_event["fields"] = fields - out_event["tags"] = [os.path.basename(fileopts['name'])] + \ - fileopts.get('tags', []) - return source_url, out_event - - -class Server(object): - def __init__(self, config, debuglog): - # Config init. - self.config = config - self.source_url = self.config['source-url'] - # Pythong logging output file. - self.debuglog = debuglog - self.processors = [] - - def setup_logging(self): - if self.debuglog: - logging.basicConfig(format='%(asctime)s %(message)s', - filename=self.debuglog, level=logging.DEBUG) - else: - # Prevent leakage into the logstash log stream. - logging.basicConfig(level=logging.CRITICAL) - logging.debug("Log pusher starting.") - - def setup_processors(self): - for publisher in self.config['zmq-publishers']: - gearclient = gear.Client() - host = self.config.get('gearman-host', 'localhost') - port = self.config.get('gearman-port', 4730) - gearclient.addServer(host, port=port) - gearclient.waitForServer() - log_processor = EventProcessor( - publisher, gearclient, - self.config['source-files'], self.source_url) - subunit_processor = EventProcessor( - publisher, gearclient, - self.config['subunit-files'], self.source_url) - self.processors.append(log_processor) - self.processors.append(subunit_processor) - - def wait_for_name_resolution(self, host, port): - while True: - try: - socket.getaddrinfo(host, port) - except socket.gaierror as e: - if e.errno == socket.EAI_AGAIN: - logging.debug("Temporary failure in name resolution") - time.sleep(2) - continue - else: - raise - break - - def main(self): - statsd_host = os.environ.get('STATSD_HOST') - statsd_port = int(os.environ.get('STATSD_PORT', 8125)) - statsd_prefix = os.environ.get('STATSD_PREFIX', 'logstash.geard') - if statsd_host: - self.wait_for_name_resolution(statsd_host, statsd_port) - self.gearserver = gear.Server( - port=self.config.get('gearman-port', 4730), - statsd_host=statsd_host, - statsd_port=statsd_port, - statsd_prefix=statsd_prefix) - - self.setup_processors() - for processor in self.processors: - processor.daemon = True - processor.start() - while True: - signal.pause() - - -def main(): - parser = argparse.ArgumentParser() - parser.add_argument("-c", "--config", required=True, - help="Path to yaml config file.") - parser.add_argument("-d", "--debuglog", - help="Enable debug log. 
" - "Specifies file to write log to.") - parser.add_argument("--foreground", action='store_true', - help="Run in the foreground.") - parser.add_argument("-p", "--pidfile", - default="/var/run/jenkins-log-pusher/" - "jenkins-log-gearman-client.pid", - help="PID file to lock during daemonization.") - args = parser.parse_args() - - with open(args.config, 'r') as config_stream: - config = yaml.safe_load(config_stream) - server = Server(config, args.debuglog) - - if args.foreground: - server.setup_logging() - server.main() - else: - pidfile = pidfile_mod.TimeoutPIDLockFile(args.pidfile, 10) - with daemon.DaemonContext(pidfile=pidfile): - server.setup_logging() - server.main() - - -if __name__ == '__main__': - main() diff --git a/loggearman/loggearman/worker.py b/loggearman/loggearman/worker.py deleted file mode 100755 index 27e9d8a..0000000 --- a/loggearman/loggearman/worker.py +++ /dev/null @@ -1,566 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright 2013 Hewlett-Packard Development Company, L.P. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import argparse -import daemon -import gear -import json -import logging -import os -import queue -import re -import requests -import select -import socket -import subprocess -import sys -import threading -import time -import yaml - -import paho.mqtt.publish as publish - -try: - import daemon.pidlockfile as pidfile_mod -except ImportError: - import daemon.pidfile as pidfile_mod - - -def semi_busy_wait(seconds): - # time.sleep() may return early. If it does sleep() again and repeat - # until at least the number of seconds specified has elapsed. - start_time = time.time() - while True: - time.sleep(seconds) - cur_time = time.time() - seconds = seconds - (cur_time - start_time) - if seconds <= 0.0: - return - - -class FilterException(Exception): - pass - - -class CRM114Filter(object): - def __init__(self, script, path, build_status): - self.p = None - self.script = script - self.path = path - self.build_status = build_status - if build_status not in ['SUCCESS', 'FAILURE']: - return - if not os.path.exists(path): - os.makedirs(path) - args = [script, path, build_status] - self.p = subprocess.Popen(args, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - stdin=subprocess.PIPE, - close_fds=True) - - def process(self, data): - if not self.p: - return True - self.p.stdin.write(data['message'].encode('utf-8') + '\n') - (r, w, x) = select.select([self.p.stdout], [], - [self.p.stdin, self.p.stdout], 20) - if not r: - self.p.kill() - raise FilterException('Timeout reading from CRM114') - r = self.p.stdout.readline() - if not r: - err = self.p.stderr.read() - if err: - raise FilterException(err) - else: - raise FilterException('Early EOF from CRM114') - r = r.strip() - data['error_pr'] = float(r) - return True - - def _catchOSError(self, method): - try: - method() - except OSError: - logging.exception("Subprocess cleanup failed.") - - def close(self): - if not self.p: - return - # CRM114 should die when its stdinput is closed. 
Close that - # fd along with stdout and stderr then return. - self._catchOSError(self.p.stdin.close) - self._catchOSError(self.p.stdout.close) - self._catchOSError(self.p.stderr.close) - self._catchOSError(self.p.wait) - - -class CRM114FilterFactory(object): - name = "CRM114" - - def __init__(self, script, basepath): - self.script = script - self.basepath = basepath - # Precompile regexes - self.re_remove_suffix = re.compile(r'(\.[^a-zA-Z]+)?(\.gz)?$') - self.re_remove_dot = re.compile(r'\.') - - def create(self, fields): - # We only want the basename so that the same logfile at different - # paths isn't treated as different - filename = os.path.basename(fields['filename']) - # We want to collapse any numeric or compression suffixes so that - # nova.log and nova.log.1 and nova.log.1.gz are treated as the same - # logical file - filename = self.re_remove_suffix.sub(r'', filename) - filename = self.re_remove_dot.sub('_', filename) - path = os.path.join(self.basepath, filename) - return CRM114Filter(self.script, path, fields['build_status']) - - -class OsloSeverityFilter(object): - DATEFMT = r'\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}((\.|\,)\d{3,6})?' - SEVERITYFMT = '(DEBUG|INFO|WARNING|ERROR|TRACE|AUDIT|CRITICAL)' - OSLO_LOGMATCH = (r'^(?P%s)(?P(?P \d+)? ' - '(?P%s).*)' % - (DATEFMT, SEVERITYFMT)) - OSLORE = re.compile(OSLO_LOGMATCH) - - def process(self, data): - msg = data['message'] - m = self.OSLORE.match(msg) - if m: - data['severity'] = m.group('severity') - if data['severity'].lower == 'debug': - # Ignore debug-level lines - return False - return True - - def close(self): - pass - - -class OsloSeverityFilterFactory(object): - name = "OsloSeverity" - - def create(self, fields): - return OsloSeverityFilter() - - -class SystemdSeverityFilter(object): - '''Match systemd DEBUG level logs - - A line to match looks like: - - Aug 15 18:58:49.910786 hostname devstack@keystone.service[31400]: - DEBUG uwsgi ... - ''' - SYSTEMDDATE = r'\w+\s+\d+\s+\d{2}:\d{2}:\d{2}((\.|\,)\d{3,6})?' - DATEFMT = r'\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}((\.|\,)\d{3,6})?' 
- SEVERITYFMT = '(DEBUG|INFO|WARNING|ERROR|TRACE|AUDIT|CRITICAL)' - SYSTEMD_LOGMATCH = r'^(?P%s)( (\S+) \S+\[\d+\]\: ' \ - '(?P%s)?.*)' % (SYSTEMDDATE, SEVERITYFMT) - SYSTEMDRE = re.compile(SYSTEMD_LOGMATCH) - - def process(self, data): - msg = data['message'] - m = self.SYSTEMDRE.match(msg) - if m: - if m.group('severity') == 'DEBUG': - return False - return True - - def close(self): - pass - - -class SystemdSeverityFilterFactory(object): - name = "SystemdSeverity" - - def create(self, fields): - return SystemdSeverityFilter() - - -class LogRetriever(threading.Thread): - def __init__(self, gearman_worker, filters, logq, - log_cert_verify, log_ca_certs, mqtt=None): - threading.Thread.__init__(self) - self.gearman_worker = gearman_worker - self.filters = filters - self.logq = logq - self.mqtt = mqtt - self.log_cert_verify = log_cert_verify - self.log_ca_certs = log_ca_certs - - def run(self): - while True: - try: - self._handle_event() - except Exception: - logging.exception("Exception retrieving log event.") - - def _handle_event(self): - fields = {} - num_log_lines = 0 - source_url = '' - http_session = None - job = self.gearman_worker.getJob() - try: - arguments = json.loads(job.arguments.decode('utf-8')) - source_url = arguments['source_url'] - event = arguments['event'] - logging.debug("Handling event: " + json.dumps(event)) - fields = event.get('fields') or event.get('@fields') - tags = event.get('tags') or event.get('@tags') - if fields['build_status'] != 'ABORTED': - # Handle events ignoring aborted builds. These builds are - # discarded by zuul. - file_obj, http_session = self._open_log_file_url(source_url) - - try: - all_filters = [] - for f in self.filters: - logging.debug("Adding filter: %s" % f.name) - all_filters.append(f.create(fields)) - filters = all_filters - - base_event = {} - base_event.update(fields) - base_event["tags"] = tags - for line in self._retrieve_log_line(file_obj): - keep_line = True - out_event = base_event.copy() - out_event["message"] = line - new_filters = [] - for f in filters: - if not keep_line: - new_filters.append(f) - continue - try: - keep_line = f.process(out_event) - new_filters.append(f) - except FilterException: - logging.exception("Exception filtering event: " - "%s" % line.encode("utf-8")) - filters = new_filters - if keep_line: - self.logq.put(out_event) - num_log_lines += 1 - - logging.debug("Pushed %s log lines." % num_log_lines) - finally: - for f in all_filters: - f.close() - if http_session: - http_session.close() - job.sendWorkComplete() - # Only send mqtt events for log files we processed. - if self.mqtt and num_log_lines: - msg = json.dumps({ - 'build_uuid': fields.get('build_uuid'), - 'source_url': source_url, - 'status': 'success', - }) - self.mqtt.publish_single(msg, fields.get('project'), - fields.get('build_change'), - 'retrieve_logs', - fields.get('build_queue')) - except Exception as e: - logging.exception("Exception handling log event.") - job.sendWorkException(str(e).encode('utf-8')) - if self.mqtt: - msg = json.dumps({ - 'build_uuid': fields.get('build_uuid'), - 'source_url': source_url, - 'status': 'failure', - }) - self.mqtt.publish_single(msg, fields.get('project'), - fields.get('build_change'), - 'retrieve_logs', - fields.get('build_queue')) - - def _retrieve_log_line(self, file_obj, chunk_size=4096): - if not file_obj: - return - # Response.iter_lines automatically decodes 'gzip' and 'deflate' - # encodings. 
- # https://requests.readthedocs.io/en/master/user/quickstart/#raw-response-content - for line in file_obj.iter_lines(chunk_size, decode_unicode=True): - yield line - - def _open_log_file_url(self, source_url): - file_obj = None - - kwargs = {} - if self.log_cert_verify and self.log_ca_certs: - kwargs['verify'] = self.log_ca_certs - elif not self.log_cert_verify: - kwargs['verify'] = self.log_cert_verify - - try: - logging.debug("Retrieving: " + source_url) - # Use a session to persist the HTTP connection across requests - # while downloading chunks of the log file. - session = requests.Session() - session.headers = {'Accept-encoding': 'deflate, gzip'} - file_obj = session.get(source_url, stream=True, **kwargs) - file_obj.raise_for_status() - except requests.HTTPError as e: - if e.response.status_code == 404: - logging.info("Unable to retrieve %s: HTTP error 404" % - source_url) - else: - logging.exception("Unable to get log data.") - except Exception: - # Silently drop fatal errors when retrieving logs. - # TODO(clarkb): Handle these errors. - # Perhaps simply add a log message to file_obj? - logging.exception("Unable to retrieve source file.") - raise - - return file_obj, session - - -class StdOutLogProcessor(object): - def __init__(self, logq, pretty_print=False): - self.logq = logq - self.pretty_print = pretty_print - - def handle_log_event(self): - log = self.logq.get() - if self.pretty_print: - print(json.dumps(log, sort_keys=True, - indent=4, separators=(',', ': '))) - else: - print(json.dumps(log)) - # Push each log event through to keep logstash up to date. - sys.stdout.flush() - - -class INETLogProcessor(object): - socket_type = None - - def __init__(self, logq, host, port): - self.logq = logq - self.host = host - self.port = port - self.socket = None - - def _connect_socket(self): - logging.debug("Creating socket.") - self.socket = socket.socket(socket.AF_INET, self.socket_type) - self.socket.connect((self.host, self.port)) - - def handle_log_event(self): - log = self.logq.get() - try: - if self.socket is None: - self._connect_socket() - self.socket.sendall((json.dumps(log) + '\n').encode('utf-8')) - except Exception: - logging.exception("Exception sending INET event.") - # Logstash seems to take about a minute to start again. Wait 90 - # seconds before attempting to reconnect. If logstash is not - # available after 90 seconds we will throw another exception and - # die. 
- semi_busy_wait(90) - self._connect_socket() - self.socket.sendall((json.dumps(log) + '\n').encode('utf-8')) - - -class UDPLogProcessor(INETLogProcessor): - socket_type = socket.SOCK_DGRAM - - -class TCPLogProcessor(INETLogProcessor): - socket_type = socket.SOCK_STREAM - - -class PushMQTT(object): - def __init__(self, hostname, base_topic, port=1883, client_id=None, - keepalive=60, will=None, auth=None, tls=None, qos=0): - self.hostname = hostname - self.port = port - self.client_id = client_id - self.keepalive = 60 - self.will = will - self.auth = auth - self.tls = tls - self.qos = qos - self.base_topic = base_topic - - def _generate_topic(self, project, job_id, action): - return '/'.join([self.base_topic, project, job_id, action]) - - def publish_single(self, msg, project, job_id, action, build_queue=None): - if job_id: - topic = self._generate_topic(project, job_id, action) - elif build_queue: - topic = self._generate_topic(project, build_queue, action) - else: - topic = self.base_topic + '/' + project - - publish.single(topic, msg, hostname=self.hostname, - port=self.port, client_id=self.client_id, - keepalive=self.keepalive, will=self.will, - auth=self.auth, tls=self.tls, qos=self.qos) - - -class Server(object): - def __init__(self, config, debuglog): - # Config init. - self.config = config - self.gearman_host = self.config['gearman-host'] - self.gearman_port = self.config['gearman-port'] - self.output_host = self.config['output-host'] - self.output_port = self.config['output-port'] - self.output_mode = self.config['output-mode'] - mqtt_host = self.config.get('mqtt-host') - mqtt_port = self.config.get('mqtt-port', 1883) - mqtt_user = self.config.get('mqtt-user') - mqtt_pass = self.config.get('mqtt-pass') - mqtt_topic = self.config.get('mqtt-topic', 'gearman-subunit') - mqtt_ca_certs = self.config.get('mqtt-ca-certs') - mqtt_certfile = self.config.get('mqtt-certfile') - mqtt_keyfile = self.config.get('mqtt-keyfile') - self.log_ca_certs = self.config.get('log-ca-certs') - self.log_cert_verify = self.config.get('log-cert-verify', True) - # Pythong logging output file. - self.debuglog = debuglog - self.retriever = None - self.logqueue = queue.Queue(16384) - self.processor = None - self.filter_factories = [] - # Run the severity filter first so it can filter out chatty - # logs. - self.filter_factories.append(OsloSeverityFilterFactory()) - self.filter_factories.append(SystemdSeverityFilterFactory()) - crmscript = self.config.get('crm114-script') - crmdata = self.config.get('crm114-data') - if crmscript and crmdata: - self.filter_factories.append( - CRM114FilterFactory(crmscript, crmdata)) - # Setup MQTT - self.mqtt = None - if mqtt_host: - auth = None - if mqtt_user: - auth = {'username': mqtt_user} - if mqtt_pass: - auth['password'] = mqtt_pass - tls = None - if mqtt_ca_certs: - tls = {'ca_certs': mqtt_ca_certs, - 'certfile': mqtt_certfile, - 'keyfile': mqtt_keyfile} - - self.mqtt = PushMQTT(mqtt_host, mqtt_topic, port=mqtt_port, - auth=auth, tls=tls) - - def setup_logging(self): - if self.debuglog: - logging.basicConfig(format='%(asctime)s %(message)s', - filename=self.debuglog, level=logging.DEBUG) - else: - # Prevent leakage into the logstash log stream. 
- logging.basicConfig(level=logging.CRITICAL) - logging.debug("Log pusher starting.") - - def wait_for_name_resolution(self, host, port): - while True: - try: - socket.getaddrinfo(host, port) - except socket.gaierror as e: - if e.errno == socket.EAI_AGAIN: - logging.debug("Temporary failure in name resolution") - time.sleep(2) - continue - else: - raise - break - - def setup_retriever(self): - hostname = socket.gethostname() - gearman_worker = gear.Worker(hostname + '-pusher') - self.wait_for_name_resolution(self.gearman_host, self.gearman_port) - gearman_worker.addServer(self.gearman_host, - self.gearman_port) - gearman_worker.registerFunction(b'push-log') - self.retriever = LogRetriever(gearman_worker, self.filter_factories, - self.logqueue, self.log_cert_verify, - self.log_ca_certs, mqtt=self.mqtt) - - def setup_processor(self): - if self.output_mode == "tcp": - self.processor = TCPLogProcessor(self.logqueue, - self.output_host, - self.output_port) - elif self.output_mode == "udp": - self.processor = UDPLogProcessor(self.logqueue, - self.output_host, - self.output_port) - else: - # Note this processor will not work if the process is run as a - # daemon. You must use the --foreground option. - self.processor = StdOutLogProcessor(self.logqueue) - - def main(self): - self.setup_retriever() - self.setup_processor() - - self.retriever.daemon = True - self.retriever.start() - - while True: - try: - self.processor.handle_log_event() - except Exception: - logging.exception("Exception processing log event.") - raise - - -def main(): - parser = argparse.ArgumentParser() - parser.add_argument("-c", "--config", required=True, - help="Path to yaml config file.") - parser.add_argument("-d", "--debuglog", - help="Enable debug log. " - "Specifies file to write log to.") - parser.add_argument("--foreground", action='store_true', - help="Run in the foreground.") - parser.add_argument("-p", "--pidfile", - default="/var/run/jenkins-log-pusher/" - "jenkins-log-gearman-worker.pid", - help="PID file to lock during daemonization.") - args = parser.parse_args() - - with open(args.config, 'r') as config_stream: - config = yaml.safe_load(config_stream) - server = Server(config, args.debuglog) - - if args.foreground: - server.setup_logging() - server.main() - else: - pidfile = pidfile_mod.TimeoutPIDLockFile(args.pidfile, 10) - with daemon.DaemonContext(pidfile=pidfile): - server.setup_logging() - server.main() - - -if __name__ == '__main__': - main() diff --git a/loggearman/requirements.txt b/loggearman/requirements.txt deleted file mode 100644 index 960eb2c..0000000 --- a/loggearman/requirements.txt +++ /dev/null @@ -1,6 +0,0 @@ -pbr>=1.6 # Apache-2.0 -gear<=0.16.0 -requests<=2.26.0 # Apache-2.0 -PyYAML<=6.0 # MIT -pyzmq<=21.0.2 -paho-mqtt<=1.6.1 diff --git a/loggearman/setup.cfg b/loggearman/setup.cfg deleted file mode 100644 index 50015d1..0000000 --- a/loggearman/setup.cfg +++ /dev/null @@ -1,26 +0,0 @@ -[metadata] -name = loggearman -summary = OpenStack Log Processor Module -description_file = - README.rst -author = Openstack Contributors -author_email = service-discuss@lists.opendev.org -home_page = http://docs.openstack.org/infra/ci-log-processing -classifier = - Environment :: OpenStack - Intended Audience :: Information Technology - Intended Audience :: System Administrators - License :: OSI Approved :: Apache Software License - Operating System :: POSIX :: Linux - Programming Language :: Python - -[build_sphinx] -all_files = 1 -build-dir = doc/build -source-dir = doc/source -warning-is-error = 1 - 
-[entry_points] -console_scripts = - log-gearman-client = loggearman.client:main - log-gearman-worker = loggearman.worker:main diff --git a/loggearman/setup.py b/loggearman/setup.py deleted file mode 100644 index 1cb98d9..0000000 --- a/loggearman/setup.py +++ /dev/null @@ -1,20 +0,0 @@ -# Copyright (C) 2021 Red Hat -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import setuptools - -setuptools.setup( - setup_requires=['pbr'], - pbr=True -) diff --git a/logscraper/logscraper.conf.sample b/logscraper/logscraper.conf.sample index 156bcb9..770c8a6 100644 --- a/logscraper/logscraper.conf.sample +++ b/logscraper/logscraper.conf.sample @@ -17,6 +17,4 @@ monitoring_port: 9128 ###################### # DEPRECATED OPTIONS # ###################### -# gearman_server: localhost -# gearman_port: 4730 # logstash_url: https://localhost:9600 diff --git a/logscraper/logscraper.py b/logscraper/logscraper.py index 7a4c4d1..c74a088 100755 --- a/logscraper/logscraper.py +++ b/logscraper/logscraper.py @@ -15,9 +15,11 @@ # under the License. """ -The goal is to push recent zuul builds into log gearman processor. +The goal is to take recent logs from Zuul CI job to the disk. +Below short view: [ CLI ] -> [ Config ] -> [ ZuulFetcher ] -> [ LogPublisher ] + (logscraper) -> (logsender) # Zuul builds results are not sorted by end time. Here is a problematic @@ -61,14 +63,11 @@ end_time. import argparse import configparser import datetime -import gear import itertools -import json import logging import multiprocessing import os import requests -import socket import sqlite3 import sys import tenacity @@ -129,8 +128,8 @@ def _verify_ca(args): # CLI # ############################################################################### def get_arguments(): - parser = argparse.ArgumentParser(description="Fetch and push last Zuul " - "CI job logs into gearman.") + parser = argparse.ArgumentParser(description="Fetch last Zuul CI job " + "logs.") parser.add_argument("--config", help="Logscraper config file", required=True) parser.add_argument("--file-list", help="File list to download. Parameter " @@ -141,19 +140,12 @@ def get_arguments(): parser.add_argument("--job-name", help="CI job name(s). Parameter can be " "set multiple times. If not set it would scrape " "every latest builds", nargs='+', default=[]) - parser.add_argument("--gearman-server", help="Gearman host address") - parser.add_argument("--gearman-port", help="Gearman listen port.", - type=int) parser.add_argument("--follow", help="Keep polling zuul builds", action="store_true") parser.add_argument("--insecure", help="Skip validating SSL cert", action="store_true") parser.add_argument("--checkpoint-file", help="File that will keep " "information about last uuid timestamp for a job.") - parser.add_argument("--logstash-url", help="When provided, script will " - "check connection to Logstash service before sending " - "to log processing system. 
For example: " - "logstash.local:9999") parser.add_argument("--workers", help="Worker processes for logscraper", type=int) parser.add_argument("--max-skipped", help="How many job results should be " @@ -162,9 +154,6 @@ def get_arguments(): type=int) parser.add_argument("--debug", help="Print more information", action="store_true") - parser.add_argument("--download", help="Download logs and do not send " - "to gearman service", - action="store_true") parser.add_argument("--directory", help="Directory, where the logs will " "be stored.") parser.add_argument("--wait-time", help="Pause time for the next " @@ -317,24 +306,11 @@ class Monitoring: ############################################################################### class LogMatcher(object): def __init__(self, server, port, success, log_url, host_vars, config): - self.client = gear.Client() - self.client.addServer(server, port) self.hosts = host_vars self.success = success self.log_url = log_url self.config_file = config - def submitJobs(self, jobname, files, result): - self.client.waitForServer(90) - ret = [] - for f in files: - output = self.makeOutput(f, result) - output = json.dumps(output).encode("utf8") - job = gear.TextJob(jobname, output) - self.client.submitJob(job, background=True) - ret.append(dict(handle=job.handle, arguments=output)) - return ret - def makeOutput(self, file_object, result): output = {} output["retry"] = False @@ -680,7 +656,6 @@ def run_build(build): """ args = build.get("build_args") - config_file = build.get("config_file") logging.info( "Processing logs for %s | %s | %s | %s", @@ -690,64 +665,33 @@ def run_build(build): build["uuid"], ) - if args.download: - logging.debug("Started fetching build logs") - directory = "%s/%s" % (args.directory, build["uuid"]) - try: - if not os.path.exists(directory): - os.makedirs(directory) - except PermissionError: - logging.critical("Can not create directory %s" % directory) - except Exception as e: - logging.critical("Exception occurred %s on creating dir %s" % ( - e, directory)) + logging.debug("Started fetching build logs") + directory = "%s/%s" % (args.directory, build["uuid"]) + try: + if not os.path.exists(directory): + os.makedirs(directory) + except PermissionError: + logging.critical("Can not create directory %s" % directory) + except Exception as e: + logging.critical("Exception occurred %s on creating dir %s" % ( + e, directory)) - validate_ca = _verify_ca(args) + validate_ca = _verify_ca(args) - if 'log_url' in build and build["log_url"]: - check_specified_files(build, validate_ca, args.timeout, directory) - else: - # NOTE: if build result does not contain 'log_url', so there is - # no result files to parse, but we would like to have that - # knowledge, so it will create own job-results.txt file that - # contains: - # build["end_time"] | build["result"] - logging.info("There is no log url for the build %s, so no file can" - " be downloaded. Creating custom job-results.txt " % - build["uuid"]) - create_custom_result(build, directory) - - save_build_info(directory, build) + if 'log_url' in build and build["log_url"]: + check_specified_files(build, validate_ca, args.timeout, directory) else: - # NOTE: As it was earlier, logs that contains status other than - # "SUCCESS" or "FAILURE" will be parsed by Gearman service. 
- logging.debug("Parsing content for gearman service") - validate_ca = _verify_ca(args) - results = dict(files=[], jobs=[], invocation={}) - files = check_specified_files(build, validate_ca, args.timeout) + # NOTE: if build result does not contain 'log_url', so there is + # no result files to parse, but we would like to have that + # knowledge, so it will create own job-results.txt file that + # contains: + # build["end_time"] | build["result"] + logging.info("There is no log url for the build %s, so no file can" + " be downloaded. Creating custom job-results.txt " % + build["uuid"]) + create_custom_result(build, directory) - results["files"] = files - lmc = LogMatcher( - args.gearman_server, - args.gearman_port, - build["result"], - build["log_url"], - {}, - config_file - ) - - lmc.submitJobs("push-log", results["files"], build) - - -def check_connection(logstash_url): - """Return True when Logstash service is reachable - - Check if service is up before pushing results. - """ - host, port = logstash_url.split(':') - logging.debug("Checking connection to %s on port %s" % (host, port)) - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - return s.connect_ex((host, port)) == 0 + save_build_info(directory, build) def run_scraping(args, zuul_api_url, job_name=None, monitoring=None): @@ -772,11 +716,6 @@ def run_scraping(args, zuul_api_url, job_name=None, monitoring=None): logging.info("Processing %d builds", len(builds)) - if args.logstash_url and not check_connection(args.logstash_url): - logging.critical("Can not connect to logstash %s. " - "Is it up?" % args.logstash_url) - return - if builds: pool = multiprocessing.Pool(int(args.workers)) try: @@ -824,10 +763,6 @@ def main(): monitoring = Monitoring() start_http_server(args.monitoring_port) - if args.download and args.gearman_server and args.gearman_port: - logging.critical("Can not use logscraper to send logs to gearman " - "and download logs. Choose one") - sys.exit(1) while True: run(args, monitoring) diff --git a/logscraper/tests/test_logscraper.py b/logscraper/tests/test_logscraper.py index a3a6954..af63881 100644 --- a/logscraper/tests/test_logscraper.py +++ b/logscraper/tests/test_logscraper.py @@ -15,7 +15,6 @@ # under the License. 
import datetime -import json import tempfile from logscraper import logscraper @@ -144,8 +143,7 @@ class _MockedPoolMapAsyncResult: class FakeArgs(object): - def __init__(self, zuul_api_url=None, gearman_server=None, - gearman_port=None, follow=False, insecure=False, + def __init__(self, zuul_api_url=None, follow=False, insecure=False, checkpoint_file=None, ignore_checkpoint=None, logstash_url=None, workers=None, max_skipped=None, job_name=None, download=None, directory=None, @@ -154,8 +152,6 @@ class FakeArgs(object): timeout=None): self.zuul_api_url = zuul_api_url - self.gearman_server = gearman_server - self.gearman_port = gearman_port self.follow = follow self.insecure = insecure self.checkpoint_file = checkpoint_file @@ -251,41 +247,17 @@ class TestScraper(base.TestCase): zuul_api_url=['http://somehost.com/api/tenant/tenant1', 'http://somehost.com/api/tenant/tenant2', 'http://somehost.com/api/tenant/tenant3'], - gearman_server='localhost', job_name=['testjob1', 'testjob2']) args = logscraper.get_arguments() logscraper.run(args, mock_monitoring) self.assertEqual(2, mock_scraping.call_count) - @mock.patch('socket.socket') - def test_check_connection(self, mock_socket): - with mock.patch('argparse.ArgumentParser.parse_args') as mock_args: - mock_args.return_value = FakeArgs( - zuul_api_url='somehost.com', - gearman_server='localhost', - logstash_url='localhost:9999') - args = logscraper.get_arguments() - logscraper.check_connection(args.logstash_url) - self.assertTrue(mock_socket.called) - - @mock.patch('socket.socket') - def test_check_connection_wrong_host(self, mock_socket): - with mock.patch('argparse.ArgumentParser.parse_args') as mock_args: - mock_args.return_value = FakeArgs( - zuul_api_url='somehost.com', - gearman_server='localhost', - logstash_url='localhost') - args = logscraper.get_arguments() - self.assertRaises(ValueError, logscraper.check_connection, - args.logstash_url) - @mock.patch('logscraper.logscraper.get_builds', return_value=iter([{'_id': '1234'}])) @mock.patch('argparse.ArgumentParser.parse_args') def test_get_last_job_results(self, mock_args, mock_get_builds): mock_args.return_value = FakeArgs( zuul_api_url='http://somehost.com/api/tenant/sometenant', - gearman_server='localhost', checkpoint_file='/tmp/testfile') args = logscraper.get_arguments() some_config = logscraper.Config(args, args.zuul_api_url) @@ -306,6 +278,7 @@ class TestScraper(base.TestCase): 'someuuid', None, 10) self.assertRaises(ValueError, make_fake_list, job_result) + @mock.patch('sqlite3.connect', return_value=mock.MagicMock()) @mock.patch('logscraper.logscraper.load_config') @mock.patch('logscraper.logscraper.save_build_info') @mock.patch('logscraper.logscraper.check_specified_files') @@ -313,16 +286,13 @@ class TestScraper(base.TestCase): @mock.patch('os.path.isfile') @mock.patch('logscraper.logscraper.check_specified_files', return_value=['job-output.txt']) - @mock.patch('logscraper.logscraper.LogMatcher.submitJobs') @mock.patch('argparse.ArgumentParser.parse_args', return_value=FakeArgs( zuul_api_url=['http://somehost.com/api/tenant/tenant1'], - gearman_server='localhost', - gearman_port=4731, workers=1)) - def test_run_scraping(self, mock_args, mock_submit, mock_files, + def test_run_scraping(self, mock_args, mock_files, mock_isfile, mock_readfile, mock_specified_files, - mock_save_buildinfo, mock_config): + mock_save_buildinfo, mock_config, mock_sqlite): with mock.patch('logscraper.logscraper.get_last_job_results' ) as mock_job_results: with 
mock.patch('multiprocessing.pool.Pool.map_async', @@ -333,12 +303,9 @@ class TestScraper(base.TestCase): mock_job_results.return_value = [builds_result[0]] logscraper.run_scraping( args, 'http://somehost.com/api/tenant/tenant1') - self.assertEqual(builds_result[0]['uuid'], - mock_submit.call_args.args[2]['uuid']) - self.assertTrue(mock_submit.called) self.assertEqual(builds_result[0], mock_specified_files.call_args.args[0]) - self.assertFalse(mock_save_buildinfo.called) + self.assertTrue(mock_save_buildinfo.called) @mock.patch('requests.get') @mock.patch('logscraper.logscraper.Monitoring') @@ -351,11 +318,12 @@ class TestScraper(base.TestCase): zuul_api_url=['http://somehost.com/api/tenant/tenant1', 'http://somehost.com/api/tenant/tenant2', 'http://somehost.com/api/tenant/tenant3'], - gearman_server='localhost') + ) args = logscraper.get_arguments() logscraper.run(args, mock_monitoring) self.assertEqual(3, mock_scraping.call_count) + @mock.patch('sqlite3.connect', return_value=mock.MagicMock()) @mock.patch('logscraper.logscraper.load_config') @mock.patch('logscraper.logscraper.save_build_info') @mock.patch('logscraper.logscraper.check_specified_files') @@ -363,15 +331,14 @@ class TestScraper(base.TestCase): @mock.patch('os.path.isfile') @mock.patch('logscraper.logscraper.check_specified_files', return_value=['job-output.txt']) - @mock.patch('logscraper.logscraper.LogMatcher.submitJobs') @mock.patch('argparse.ArgumentParser.parse_args', return_value=FakeArgs( zuul_api_url=['http://somehost.com/api/tenant/tenant1'], workers=1, download=True, directory="/tmp/testdir")) - def test_run_scraping_download(self, mock_args, mock_submit, mock_files, + def test_run_scraping_download(self, mock_args, mock_files, mock_isfile, mock_readfile, mock_specified_files, mock_save_buildinfo, - mock_config): + mock_config, mock_sqlite): with mock.patch('logscraper.logscraper.get_last_job_results' ) as mock_job_results: with mock.patch( @@ -385,12 +352,12 @@ class TestScraper(base.TestCase): logscraper.run_scraping( args, 'http://somehost.com/api/tenant/tenant1') - self.assertFalse(mock_submit.called) self.assertTrue(mock_specified_files.called) self.assertEqual(builds_result[0], mock_specified_files.call_args.args[0]) self.assertTrue(mock_save_buildinfo.called) + @mock.patch('sqlite3.connect', return_value=mock.MagicMock()) @mock.patch('logscraper.logscraper.load_config') @mock.patch('logscraper.logscraper.save_build_info') @mock.patch('logscraper.logscraper.check_specified_files') @@ -398,15 +365,14 @@ class TestScraper(base.TestCase): @mock.patch('os.path.isfile') @mock.patch('logscraper.logscraper.check_specified_files', return_value=['job-output.txt']) - @mock.patch('logscraper.logscraper.LogMatcher.submitJobs') @mock.patch('argparse.ArgumentParser.parse_args', return_value=FakeArgs( zuul_api_url=['http://somehost.com/api/tenant/tenant1'], workers=1, download=True, directory="/tmp/testdir")) - def test_run_scraping_monitoring(self, mock_args, mock_submit, mock_files, + def test_run_scraping_monitoring(self, mock_args, mock_files, mock_isfile, mock_readfile, mock_specified_files, mock_save_buildinfo, - mock_config): + mock_config, mock_sqlite): with mock.patch('logscraper.logscraper.get_last_job_results' ) as mock_job_results: with mock.patch( @@ -424,7 +390,6 @@ class TestScraper(base.TestCase): self.assertEqual('job_name', monitoring.job_count._labelnames[0]) self.assertEqual(2, len(monitoring.job_count._metrics)) - self.assertFalse(mock_submit.called) self.assertTrue(mock_specified_files.called) 
self.assertEqual(builds_result[0], mock_specified_files.call_args.args[0]) @@ -432,14 +397,12 @@ class TestScraper(base.TestCase): @mock.patch('logscraper.logscraper.create_custom_result') @mock.patch('logscraper.logscraper.check_specified_files') - @mock.patch('logscraper.logscraper.LogMatcher.submitJobs') - @mock.patch('gear.BaseClient.waitForServer') @mock.patch('argparse.ArgumentParser.parse_args', return_value=FakeArgs( zuul_api_url=['http://somehost.com/api/tenant/tenant1'], workers=1, download=True, directory="/tmp/testdir")) - def test_run_aborted_download(self, mock_args, mock_gear, mock_gear_client, - mock_check_files, mock_custom_result): + def test_run_aborted_download(self, mock_args, mock_check_files, + mock_custom_result): # Take job result that log_url is empty. result = builds_result[2] result['files'] = ['job-output.txt'] @@ -454,21 +417,17 @@ class TestScraper(base.TestCase): logscraper.run_build(result) logscraper.run_build(result_node_fail) - self.assertFalse(mock_gear_client.called) self.assertFalse(mock_check_files.called) self.assertTrue(mock_custom_result.called) @mock.patch('logscraper.logscraper.create_custom_result') @mock.patch('logscraper.logscraper.check_specified_files') - @mock.patch('logscraper.logscraper.LogMatcher.submitJobs') - @mock.patch('gear.BaseClient.waitForServer') @mock.patch('argparse.ArgumentParser.parse_args', return_value=FakeArgs( zuul_api_url=['http://somehost.com/api/tenant/tenant1'], - workers=1, gearman_server='localhost', - gearman_port='4731')) - def test_run_aborted(self, mock_args, mock_gear, mock_gear_client, - mock_check_files, mock_custom_result): + workers=1)) + def test_run_aborted(self, mock_args, mock_check_files, + mock_custom_result): # Take job result that build_status is "ABORTED" or "NODE_FAILURE" result = builds_result[2] result['files'] = ['job-output.txt'] @@ -483,9 +442,8 @@ class TestScraper(base.TestCase): logscraper.run_build(result) logscraper.run_build(result_node_fail) - self.assertTrue(mock_gear_client.called) - self.assertTrue(mock_check_files.called) - self.assertFalse(mock_custom_result.called) + self.assertFalse(mock_check_files.called) + self.assertTrue(mock_custom_result.called) @mock.patch('requests.get') @mock.patch('logscraper.logscraper.Monitoring') @@ -499,7 +457,7 @@ class TestScraper(base.TestCase): zuul_api_url=['http://somehost.com/api/tenant/tenant1', 'http://somehost.com/api/tenant/tenant2', 'http://somehost.com/api/tenant/tenant3'], - gearman_server='localhost') + ) args = logscraper.get_arguments() logscraper.run(args, mock_monitoring) @@ -609,14 +567,15 @@ class TestScraper(base.TestCase): class TestConfig(base.TestCase): + @mock.patch('sqlite3.connect', return_value=mock.MagicMock()) @mock.patch('logscraper.logscraper.load_config') @mock.patch('sys.exit') - def test_config_object(self, mock_sys, mock_config): + def test_config_object(self, mock_sys, mock_config, mock_sqlite): # Assume that url is wrong so it raise IndexError with mock.patch('argparse.ArgumentParser.parse_args') as mock_args: mock_args.return_value = FakeArgs( zuul_api_url='somehost.com', - gearman_server='localhost') + ) args = logscraper.get_arguments() self.assertRaises(IndexError, logscraper.Config, args, args.zuul_api_url) @@ -624,7 +583,7 @@ class TestConfig(base.TestCase): with mock.patch('argparse.ArgumentParser.parse_args') as mock_args: mock_args.return_value = FakeArgs( zuul_api_url='https://somehost.com', - gearman_server='localhost') + ) args = logscraper.get_arguments() logscraper.Config(args, 
args.zuul_api_url) mock_sys.assert_called() @@ -636,7 +595,6 @@ class TestConfig(base.TestCase): # correct url without job name mock_args.return_value = FakeArgs( zuul_api_url='http://somehost.com/api/tenant/sometenant', - gearman_server='localhost', checkpoint_file='/tmp/testfile') args = logscraper.get_arguments() some_config = logscraper.Config(args, args.zuul_api_url) @@ -656,161 +614,6 @@ class TestLogMatcher(base.TestCase): }] } - @mock.patch('logscraper.logscraper.load_config') - @mock.patch('gear.TextJob') - @mock.patch('gear.Client.submitJob') - @mock.patch('gear.BaseClient.waitForServer') - def test_submitJobs(self, mock_gear, mock_gear_client, mock_gear_job, - mock_load_config): - result = builds_result[0] - result['files'] = ['job-output.txt'] - result['tenant'] = 'sometenant' - parsed_job = { - "build_branch": "master", - "build_change": 806255, - "build_duration": 307.0, - "build_name": "openstack-tox-py38", - "build_node": "zuul-executor", - "build_patchset": "9", - "build_queue": "check", - "build_ref": "refs/changes/55/806255/9", - "build_set": {"uuid": "bf11828235c649ff859ad87d7c4aa525"}, - "build_status": "SUCCESS", - "build_uuid": "bf11828235c649ff859ad87d7c4aa525", - "build_zuul_url": "N/A", - "filename": "job-output.txt", - "log_url": "https://t.com/openstack/a0f8968/job-output.txt", - "node_provider": "local", - "project": "openstack/tempest", - "tenant": "sometenant", - "voting": 1} - - expected_gear_job = {"retry": False, "event": { - "fields": parsed_job, - "tags": ["job-output.txt", "console", "console.html"]}, - "source_url": "https://t.com/openstack/a0f8968/job-output.txt"} - mock_load_config.return_value = self.config_file - with mock.patch('argparse.ArgumentParser.parse_args') as mock_args: - mock_args.return_value = FakeArgs( - zuul_api_url='http://somehost.com/api/tenant/sometenant', - gearman_server='localhost', - gearman_port='4731') - args = logscraper.get_arguments() - lmc = logscraper.LogMatcher(args.gearman_server, args.gearman_port, - result['result'], result['log_url'], - {}, self.config_file) - lmc.submitJobs('push-log', result['files'], result) - mock_gear_client.assert_called_once() - self.assertEqual( - expected_gear_job, - json.loads(mock_gear_job.call_args.args[1].decode('utf-8')) - ) - - @mock.patch('logscraper.logscraper.load_config') - @mock.patch('gear.TextJob') - @mock.patch('gear.Client.submitJob') - @mock.patch('gear.BaseClient.waitForServer') - def test_submitJobs_failure(self, mock_gear, mock_gear_client, - mock_gear_job, mock_load_config): - # Take job result that build_status is "ABORTED" - result = builds_result[1] - result['files'] = ['job-output.txt'] - result['tenant'] = 'sometenant' - parsed_job = { - 'build_branch': 'master', - 'build_change': 816445, - "build_duration": 603.0, - 'build_name': 'tripleo-centos-8', - 'build_node': 'zuul-executor', - 'build_patchset': '1', - 'build_queue': 'check', - 'build_ref': 'refs/changes/45/816445/1', - 'build_set': {'uuid': '4a0ffebe30a94efe819fffc03cf33ea4'}, - 'build_status': 'FAILURE', - 'build_uuid': '4a0ffebe30a94efe819fffc03cf33ea4', - 'build_zuul_url': 'N/A', - 'filename': 'job-output.txt', - 'log_url': 'https://t.com/tripleo-8/3982864/job-output.txt', - 'node_provider': 'local', - 'project': 'openstack/tripleo-ansible', - 'tenant': 'sometenant', - 'voting': 1} - - expected_gear_job = {"retry": False, "event": { - "fields": parsed_job, - "tags": ["job-output.txt", "console", "console.html"]}, - "source_url": "https://t.com/tripleo-8/3982864/job-output.txt"} - 
mock_load_config.return_value = self.config_file - with mock.patch('argparse.ArgumentParser.parse_args') as mock_args: - mock_args.return_value = FakeArgs( - zuul_api_url='http://somehost.com/api/tenant/sometenant', - gearman_server='localhost', - gearman_port='4731') - args = logscraper.get_arguments() - lmc = logscraper.LogMatcher(args.gearman_server, args.gearman_port, - result['result'], result['log_url'], - {}, self.config_file) - lmc.submitJobs('push-log', result['files'], result) - mock_gear_client.assert_called_once() - self.assertEqual( - expected_gear_job, - json.loads(mock_gear_job.call_args.args[1].decode('utf-8')) - ) - - @mock.patch('logscraper.logscraper.load_config') - @mock.patch('gear.TextJob') - @mock.patch('gear.Client.submitJob') - @mock.patch('gear.BaseClient.waitForServer') - def test_submitJobs_aborted(self, mock_gear, mock_gear_client, - mock_gear_job, mock_load_config): - # Take job result that build_status is "ABORTED" - result = builds_result[2] - result['files'] = ['job-output.txt'] - result['tenant'] = 'sometenant' - parsed_job = { - 'build_branch': 'stable/victoria', - 'build_change': 816486, - 'build_duration': 18, - 'build_name': 'openstack-tox-lower-constraints', - 'build_node': 'zuul-executor', - 'build_patchset': '1', - 'build_queue': 'check', - 'build_ref': 'refs/changes/86/816486/1', - 'build_set': {'uuid': 'bd044dfe3ecc484fbbf74fdeb7fb56aa'}, - 'build_status': 'FAILURE', - 'build_uuid': 'bd044dfe3ecc484fbbf74fdeb7fb56aa', - 'build_zuul_url': 'N/A', - 'filename': 'job-output.txt', - 'log_url': 'job-output.txt', - 'node_provider': 'local', - 'project': 'openstack/nova', - 'tenant': 'sometenant', - 'voting': 1} - - # NOTE: normally ABORTED jobs does not provide log_url, - # so source_url will be just a file to iterate. - # In the logscraper, aborted jobs are just skipped. - expected_gear_job = {"retry": False, "event": { - "fields": parsed_job, - "tags": ["job-output.txt", "console", "console.html"]}, - "source_url": "job-output.txt"} - mock_load_config.return_value = self.config_file - with mock.patch('argparse.ArgumentParser.parse_args') as mock_args: - mock_args.return_value = FakeArgs( - zuul_api_url='http://somehost.com/api/tenant/sometenant', - gearman_server='localhost', - gearman_port='4731') - args = logscraper.get_arguments() - lmc = logscraper.LogMatcher(args.gearman_server, args.gearman_port, - result['result'], result['log_url'], - {}, self.config_file) - lmc.submitJobs('push-log', result['files'], result) - mock_gear_client.assert_called_once() - self.assertEqual( - expected_gear_job, - json.loads(mock_gear_job.call_args.args[1].decode('utf-8')) - ) - @mock.patch('builtins.open', new_callable=mock.mock_open()) @mock.patch('os.path.isfile') @mock.patch('requests.get') diff --git a/logscraper/tests/test_logsender.py b/logscraper/tests/test_logsender.py index deafec8..1877f7d 100755 --- a/logscraper/tests/test_logsender.py +++ b/logscraper/tests/test_logsender.py @@ -36,8 +36,6 @@ build_args: directory: /tmp/logscraper download: true follow: false - gearman_port: 4730 - gearman_server: null insecure: true job_name: null logstash_url: null diff --git a/requirements.txt b/requirements.txt index 76638fd..fa0fa5a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,4 @@ pbr>=1.6 # Apache-2.0 -gear<0.17 requests<2.27 # Apache-2.0 PyYAML<6.1 # MIT tenacity
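
With the gearman and logstash paths removed above, run_build always writes each build's logs to <directory>/<uuid> and falls back to a custom job-results.txt when the build has no log_url. The snippet below is a minimal, self-contained sketch of that download-only flow, not the project's code: download_build_logs and fetch_files are illustrative names, while the real module uses check_specified_files, create_custom_result and save_build_info as shown in the diff.

import logging
import os

import requests


def fetch_files(build, directory, files=("job-output.txt",), timeout=30):
    # Stand-in for check_specified_files: download each listed file from the
    # build's log_url into the target directory.
    for name in files:
        url = build["log_url"].rstrip("/") + "/" + name
        response = requests.get(url, timeout=timeout)
        if response.status_code == 200:
            with open(os.path.join(directory, name), "w") as f:
                f.write(response.text)


def download_build_logs(build, base_directory):
    # Store logs for one Zuul build under <base_directory>/<uuid>.
    directory = os.path.join(base_directory, build["uuid"])
    try:
        os.makedirs(directory, exist_ok=True)
    except PermissionError:
        logging.critical("Can not create directory %s", directory)
        return

    if build.get("log_url"):
        fetch_files(build, directory)
    else:
        # No log_url (e.g. ABORTED or NODE_FAILURE builds): keep a small
        # job-results.txt so the build is still represented on disk,
        # mirroring create_custom_result in the diff above.
        with open(os.path.join(directory, "job-results.txt"), "w") as f:
            f.write("%s | %s\n" % (build.get("end_time"), build.get("result")))

In the patched run_build this is followed by save_build_info, so each per-build directory also carries the metadata the logsender needs when it later pushes the files to Opensearch.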