Add Log Processor module
The log-gearman-client and log-gearman-worker scripts have been moved
from the puppet-log_processor repository [1] to this project. The ported
files also include a patch that adds Python 3 compatibility [2] and
fixes pep8 errors.

[1] https://opendev.org/opendev/puppet-log_processor
[2] https://review.opendev.org/c/opendev/puppet-log_processor/+/809424

Change-Id: I353f98bd07d0c21b80f92307806b50da4c7ee39c
parent 3e39caf520
commit 9a9f159769
loggearman/Dockerfile (new file, 24 lines)
@@ -0,0 +1,24 @@
# Copyright (C) 2021 Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

FROM opendevorg/python-builder:3.8 as builder

ENV OSLO_PACKAGE_VERSION='0.0.1'
COPY . /tmp/src
RUN assemble

FROM opendevorg/python-base:3.8 as loggearman

COPY --from=builder /output/ /output
RUN /output/install-from-bindep
loggearman/README.rst (new file, 8 lines)
@@ -0,0 +1,8 @@
OpenStack Log Processor Module
==============================

The Log Processor Module comes from this `repository`_, with
patches from this `patchset`_ applied.

.. _repository: https://opendev.org/opendev/puppet-log_processor
.. _patchset: https://review.opendev.org/c/opendev/puppet-log_processor/+/809424
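Both console scripts read a YAML configuration file passed via the -c/--config option. As a rough, hypothetical sketch (not part of this change), a minimal client configuration covering the keys that client.py reads (source-url, zmq-publishers, source-files and subunit-files) could be written out as below; every hostname and file name is a placeholder:

# Sketch only: minimal client configuration with placeholder values.
import yaml

client_config = {
    # Base URL that per-build log paths are appended to (placeholder).
    'source-url': 'https://logs.example.org',
    # ZMQ endpoints publishing "onFinalized" build events (placeholder).
    'zmq-publishers': ['tcp://scheduler.example.org:8888'],
    # Each entry becomes a push-log gearman job; job-filter,
    # build-queue-filter, project-filter and retry-get are also honoured.
    'source-files': [
        {'name': 'job-output.txt', 'tags': ['console']},
    ],
    # Entries whose name contains "subunit" are submitted as push-subunit.
    'subunit-files': [
        {'name': 'testrepository.subunit', 'tags': ['subunit']},
    ],
}

with open('client.yaml', 'w') as f:
    yaml.safe_dump(client_config, f, default_flow_style=False)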
loggearman/loggearman/__init__.py (new empty file)
loggearman/loggearman/client.py (new executable file, 243 lines)
@@ -0,0 +1,243 @@
#!/usr/bin/env python3
#
# Copyright 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import argparse
import daemon
import gear
import json
import logging
import os
import os.path
import re
import signal
import socket
import threading
import time
import yaml
import zmq


try:
    import daemon.pidlockfile as pidfile_mod
except ImportError:
    import daemon.pidfile as pidfile_mod


class EventProcessor(threading.Thread):
    def __init__(self, zmq_address, gearman_client, files, source_url):
        threading.Thread.__init__(self)
        self.files = files
        self.source_url = source_url
        self.gearman_client = gearman_client
        self.zmq_address = zmq_address
        self._connect_zmq()

    def run(self):
        while True:
            try:
                self._read_event()
            except Exception:
                # Assume that an error reading data from zmq or deserializing
                # data received from zmq indicates a zmq error and reconnect.
                logging.exception("ZMQ exception.")
                self._connect_zmq()

    def _connect_zmq(self):
        logging.debug("Connecting to zmq endpoint.")
        self.context = zmq.Context()
        self.socket = self.context.socket(zmq.SUB)
        event_filter = b"onFinalized"
        self.socket.setsockopt(zmq.SUBSCRIBE, event_filter)
        self.socket.connect(self.zmq_address)

    def _read_event(self):
        string = self.socket.recv().decode('utf-8')
        event = json.loads(string.split(None, 1)[1])
        logging.debug("Jenkins event received: " + json.dumps(event))
        for fileopts in self.files:
            output = {}
            source_url, out_event = self._parse_event(event, fileopts)
            job_filter = fileopts.get('job-filter')
            if (job_filter and not re.match(
                    job_filter, out_event['fields']['build_name'])):
                continue
            build_queue_filter = fileopts.get('build-queue-filter')
            if (build_queue_filter and not re.match(
                    build_queue_filter, out_event['fields']['build_queue'])):
                continue
            project_filter = fileopts.get('project-filter')
            if (project_filter and not re.match(
                    project_filter, out_event['fields']['project'])):
                continue
            output['source_url'] = source_url
            output['retry'] = fileopts.get('retry-get', False)
            output['event'] = out_event
            if 'subunit' in fileopts.get('name'):
                job = gear.Job(b'push-subunit',
                               json.dumps(output).encode('utf8'))
            else:
                job = gear.Job(b'push-log', json.dumps(output).encode('utf8'))
            try:
                self.gearman_client.submitJob(job)
            except Exception:
                logging.exception("Exception submitting job to Gearman.")

    def _get_log_dir(self, event):
        parameters = event["build"].get("parameters", {})
        base = parameters.get('LOG_PATH', 'UNKNOWN')
        return base

    def _parse_fields(self, event, filename):
        fields = {}
        fields["filename"] = filename
        fields["build_name"] = event.get("name", "UNKNOWN")
        fields["build_status"] = event["build"].get("status", "UNKNOWN")
        fields["build_node"] = event["build"].get("node_name", "UNKNOWN")
        fields["build_master"] = event["build"].get("host_name", "UNKNOWN")
        parameters = event["build"].get("parameters", {})
        fields["project"] = parameters.get("ZUUL_PROJECT", "UNKNOWN")
        # The voting value is "1" for voting, "0" for non-voting
        fields["voting"] = parameters.get("ZUUL_VOTING", "UNKNOWN")
        # TODO(clarkb) can we do better without duplicated data here?
        fields["build_uuid"] = parameters.get("ZUUL_UUID", "UNKNOWN")
        fields["build_short_uuid"] = fields["build_uuid"][:7]
        fields["build_queue"] = parameters.get("ZUUL_PIPELINE", "UNKNOWN")
        fields["build_ref"] = parameters.get("ZUUL_REF", "UNKNOWN")
        fields["build_branch"] = parameters.get("ZUUL_BRANCH", "UNKNOWN")
        fields["build_zuul_url"] = parameters.get("ZUUL_URL", "UNKNOWN")
        if parameters.get("ZUUL_CHANGE"):
            fields["build_change"] = parameters.get("ZUUL_CHANGE", "UNKNOWN")
            fields["build_patchset"] = parameters.get("ZUUL_PATCHSET",
                                                      "UNKNOWN")
        elif parameters.get("ZUUL_NEWREV"):
            fields["build_newrev"] = parameters.get("ZUUL_NEWREV",
                                                    "UNKNOWN")
        if fields["build_node"] != "UNKNOWN":
            node_provider = '-'.join(
                fields["build_node"].split('-')[-3:-1])
            fields["node_provider"] = node_provider or "UNKNOWN"
        else:
            fields["node_provider"] = "UNKNOWN"
        return fields

    def _parse_event(self, event, fileopts):
        fields = self._parse_fields(event, fileopts['name'])
        log_dir = self._get_log_dir(event)
        source_url = fileopts.get('source-url', self.source_url) + '/' + \
            os.path.join(log_dir, fileopts['name'])
        fields["log_url"] = source_url
        out_event = {}
        out_event["fields"] = fields
        out_event["tags"] = [os.path.basename(fileopts['name'])] + \
            fileopts.get('tags', [])
        return source_url, out_event


class Server(object):
    def __init__(self, config, debuglog):
        # Config init.
        self.config = config
        self.source_url = self.config['source-url']
        # Python logging output file.
        self.debuglog = debuglog
        self.processors = []

    def setup_logging(self):
        if self.debuglog:
            logging.basicConfig(format='%(asctime)s %(message)s',
                                filename=self.debuglog, level=logging.DEBUG)
        else:
            # Prevent leakage into the logstash log stream.
            logging.basicConfig(level=logging.CRITICAL)
        logging.debug("Log pusher starting.")

    def setup_processors(self):
        for publisher in self.config['zmq-publishers']:
            gearclient = gear.Client()
            gearclient.addServer('localhost')
            gearclient.waitForServer()
            log_processor = EventProcessor(
                publisher, gearclient,
                self.config['source-files'], self.source_url)
            subunit_processor = EventProcessor(
                publisher, gearclient,
                self.config['subunit-files'], self.source_url)
            self.processors.append(log_processor)
            self.processors.append(subunit_processor)

    def wait_for_name_resolution(self, host, port):
        while True:
            try:
                socket.getaddrinfo(host, port)
            except socket.gaierror as e:
                if e.errno == socket.EAI_AGAIN:
                    logging.debug("Temporary failure in name resolution")
                    time.sleep(2)
                    continue
                else:
                    raise
            break

    def main(self):
        statsd_host = os.environ.get('STATSD_HOST')
        statsd_port = int(os.environ.get('STATSD_PORT', 8125))
        statsd_prefix = os.environ.get('STATSD_PREFIX', 'logstash.geard')
        if statsd_host:
            self.wait_for_name_resolution(statsd_host, statsd_port)
        self.gearserver = gear.Server(
            statsd_host=statsd_host,
            statsd_port=statsd_port,
            statsd_prefix=statsd_prefix)

        self.setup_processors()
        for processor in self.processors:
            processor.daemon = True
            processor.start()
        while True:
            signal.pause()


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("-c", "--config", required=True,
                        help="Path to yaml config file.")
    parser.add_argument("-d", "--debuglog",
                        help="Enable debug log. "
                             "Specifies file to write log to.")
    parser.add_argument("--foreground", action='store_true',
                        help="Run in the foreground.")
    parser.add_argument("-p", "--pidfile",
                        default="/var/run/jenkins-log-pusher/"
                                "jenkins-log-gearman-client.pid",
                        help="PID file to lock during daemonization.")
    args = parser.parse_args()

    with open(args.config, 'r') as config_stream:
        config = yaml.safe_load(config_stream)
    server = Server(config, args.debuglog)

    if args.foreground:
        server.setup_logging()
        server.main()
    else:
        pidfile = pidfile_mod.TimeoutPIDLockFile(args.pidfile, 10)
        with daemon.DaemonContext(pidfile=pidfile):
            server.setup_logging()
            server.main()


if __name__ == '__main__':
    main()
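For orientation between the two scripts: EventProcessor._read_event submits gearman jobs named push-log (or push-subunit) whose payload is a JSON object with source_url, retry and event keys, and the worker below registers for push-log and consumes exactly that shape. A hand-built equivalent, with purely illustrative placeholder values, might look like:

# Sketch only: the payload shape built by EventProcessor._read_event;
# all values below are illustrative placeholders.
import json

import gear

output = {
    'source_url': 'https://logs.example.org/path/to/job-output.txt',
    'retry': False,
    'event': {
        'fields': {
            'filename': 'job-output.txt',
            'build_name': 'example-job',
            'build_status': 'SUCCESS',
            'build_queue': 'check',
            'project': 'example/project',
            'log_url': 'https://logs.example.org/path/to/job-output.txt',
        },
        'tags': ['job-output.txt', 'console'],
    },
}

job = gear.Job(b'push-log', json.dumps(output).encode('utf8'))
# A connected gear.Client() could then submit it with submitJob(job).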
loggearman/loggearman/worker.py (new executable file, 566 lines)
@@ -0,0 +1,566 @@
#!/usr/bin/env python3
#
# Copyright 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import argparse
import daemon
import gear
import json
import logging
import os
import queue
import re
import requests
import select
import socket
import subprocess
import sys
import threading
import time
import yaml

import paho.mqtt.publish as publish

try:
    import daemon.pidlockfile as pidfile_mod
except ImportError:
    import daemon.pidfile as pidfile_mod


def semi_busy_wait(seconds):
    # time.sleep() may return early. If it does sleep() again and repeat
    # until at least the number of seconds specified has elapsed.
    start_time = time.time()
    while True:
        time.sleep(seconds)
        cur_time = time.time()
        seconds = seconds - (cur_time - start_time)
        if seconds <= 0.0:
            return


class FilterException(Exception):
    pass


class CRM114Filter(object):
    def __init__(self, script, path, build_status):
        self.p = None
        self.script = script
        self.path = path
        self.build_status = build_status
        if build_status not in ['SUCCESS', 'FAILURE']:
            return
        if not os.path.exists(path):
            os.makedirs(path)
        args = [script, path, build_status]
        self.p = subprocess.Popen(args,
                                  stdout=subprocess.PIPE,
                                  stderr=subprocess.PIPE,
                                  stdin=subprocess.PIPE,
                                  close_fds=True)

    def process(self, data):
        if not self.p:
            return True
        self.p.stdin.write(data['message'].encode('utf-8') + b'\n')
        (r, w, x) = select.select([self.p.stdout], [],
                                  [self.p.stdin, self.p.stdout], 20)
        if not r:
            self.p.kill()
            raise FilterException('Timeout reading from CRM114')
        r = self.p.stdout.readline()
        if not r:
            err = self.p.stderr.read()
            if err:
                raise FilterException(err)
            else:
                raise FilterException('Early EOF from CRM114')
        r = r.strip()
        data['error_pr'] = float(r)
        return True

    def _catchOSError(self, method):
        try:
            method()
        except OSError:
            logging.exception("Subprocess cleanup failed.")

    def close(self):
        if not self.p:
            return
        # CRM114 should die when its stdinput is closed. Close that
        # fd along with stdout and stderr then return.
        self._catchOSError(self.p.stdin.close)
        self._catchOSError(self.p.stdout.close)
        self._catchOSError(self.p.stderr.close)
        self._catchOSError(self.p.wait)


class CRM114FilterFactory(object):
    name = "CRM114"

    def __init__(self, script, basepath):
        self.script = script
        self.basepath = basepath
        # Precompile regexes
        self.re_remove_suffix = re.compile(r'(\.[^a-zA-Z]+)?(\.gz)?$')
        self.re_remove_dot = re.compile(r'\.')

    def create(self, fields):
        # We only want the basename so that the same logfile at different
        # paths isn't treated as different
        filename = os.path.basename(fields['filename'])
        # We want to collapse any numeric or compression suffixes so that
        # nova.log and nova.log.1 and nova.log.1.gz are treated as the same
        # logical file
        filename = self.re_remove_suffix.sub(r'', filename)
        filename = self.re_remove_dot.sub('_', filename)
        path = os.path.join(self.basepath, filename)
        return CRM114Filter(self.script, path, fields['build_status'])


class OsloSeverityFilter(object):
    DATEFMT = r'\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}((\.|\,)\d{3,6})?'
    SEVERITYFMT = '(DEBUG|INFO|WARNING|ERROR|TRACE|AUDIT|CRITICAL)'
    OSLO_LOGMATCH = (r'^(?P<date>%s)(?P<line>(?P<pid> \d+)? '
                     '(?P<severity>%s).*)' %
                     (DATEFMT, SEVERITYFMT))
    OSLORE = re.compile(OSLO_LOGMATCH)

    def process(self, data):
        msg = data['message']
        m = self.OSLORE.match(msg)
        if m:
            data['severity'] = m.group('severity')
            if data['severity'].lower() == 'debug':
                # Ignore debug-level lines
                return False
        return True

    def close(self):
        pass


class OsloSeverityFilterFactory(object):
    name = "OsloSeverity"

    def create(self, fields):
        return OsloSeverityFilter()


class SystemdSeverityFilter(object):
    '''Match systemd DEBUG level logs

    A line to match looks like:

    Aug 15 18:58:49.910786 hostname devstack@keystone.service[31400]:
    DEBUG uwsgi ...
    '''
    SYSTEMDDATE = r'\w+\s+\d+\s+\d{2}:\d{2}:\d{2}((\.|\,)\d{3,6})?'
    DATEFMT = r'\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}((\.|\,)\d{3,6})?'
    SEVERITYFMT = '(DEBUG|INFO|WARNING|ERROR|TRACE|AUDIT|CRITICAL)'
    SYSTEMD_LOGMATCH = r'^(?P<date>%s)( (\S+) \S+\[\d+\]\: ' \
                       '(?P<severity>%s)?.*)' % (SYSTEMDDATE, SEVERITYFMT)
    SYSTEMDRE = re.compile(SYSTEMD_LOGMATCH)

    def process(self, data):
        msg = data['message']
        m = self.SYSTEMDRE.match(msg)
        if m:
            if m.group('severity') == 'DEBUG':
                return False
        return True

    def close(self):
        pass


class SystemdSeverityFilterFactory(object):
    name = "SystemdSeverity"

    def create(self, fields):
        return SystemdSeverityFilter()


class LogRetriever(threading.Thread):
    def __init__(self, gearman_worker, filters, logq,
                 log_cert_verify, log_ca_certs, mqtt=None):
        threading.Thread.__init__(self)
        self.gearman_worker = gearman_worker
        self.filters = filters
        self.logq = logq
        self.mqtt = mqtt
        self.log_cert_verify = log_cert_verify
        self.log_ca_certs = log_ca_certs

    def run(self):
        while True:
            try:
                self._handle_event()
            except Exception:
                logging.exception("Exception retrieving log event.")

    def _handle_event(self):
        fields = {}
        num_log_lines = 0
        source_url = ''
        http_session = None
        job = self.gearman_worker.getJob()
        try:
            arguments = json.loads(job.arguments.decode('utf-8'))
            source_url = arguments['source_url']
            event = arguments['event']
            logging.debug("Handling event: " + json.dumps(event))
            fields = event.get('fields') or event.get('@fields')
            tags = event.get('tags') or event.get('@tags')
            if fields['build_status'] != 'ABORTED':
                # Handle events ignoring aborted builds. These builds are
                # discarded by zuul.
                file_obj, http_session = self._open_log_file_url(source_url)

                try:
                    all_filters = []
                    for f in self.filters:
                        logging.debug("Adding filter: %s" % f.name)
                        all_filters.append(f.create(fields))
                    filters = all_filters

                    base_event = {}
                    base_event.update(fields)
                    base_event["tags"] = tags
                    for line in self._retrieve_log_line(file_obj):
                        keep_line = True
                        out_event = base_event.copy()
                        out_event["message"] = line
                        new_filters = []
                        for f in filters:
                            if not keep_line:
                                new_filters.append(f)
                                continue
                            try:
                                keep_line = f.process(out_event)
                                new_filters.append(f)
                            except FilterException:
                                logging.exception("Exception filtering event: "
                                                  "%s" % line.encode("utf-8"))
                        filters = new_filters
                        if keep_line:
                            self.logq.put(out_event)
                        num_log_lines += 1

                    logging.debug("Pushed %s log lines." % num_log_lines)
                finally:
                    for f in all_filters:
                        f.close()
                    if http_session:
                        http_session.close()
            job.sendWorkComplete()
            # Only send mqtt events for log files we processed.
            if self.mqtt and num_log_lines:
                msg = json.dumps({
                    'build_uuid': fields.get('build_uuid'),
                    'source_url': source_url,
                    'status': 'success',
                })
                self.mqtt.publish_single(msg, fields.get('project'),
                                         fields.get('build_change'),
                                         'retrieve_logs',
                                         fields.get('build_queue'))
        except Exception as e:
            logging.exception("Exception handling log event.")
            job.sendWorkException(str(e).encode('utf-8'))
            if self.mqtt:
                msg = json.dumps({
                    'build_uuid': fields.get('build_uuid'),
                    'source_url': source_url,
                    'status': 'failure',
                })
                self.mqtt.publish_single(msg, fields.get('project'),
                                         fields.get('build_change'),
                                         'retrieve_logs',
                                         fields.get('build_queue'))

    def _retrieve_log_line(self, file_obj, chunk_size=4096):
        if not file_obj:
            return
        # Response.iter_lines automatically decodes 'gzip' and 'deflate'
        # encodings.
        # https://requests.readthedocs.io/en/master/user/quickstart/#raw-response-content
        for line in file_obj.iter_lines(chunk_size, decode_unicode=True):
            yield line

    def _open_log_file_url(self, source_url):
        file_obj = None

        kwargs = {}
        if self.log_cert_verify and self.log_ca_certs:
            kwargs['verify'] = self.log_ca_certs
        elif not self.log_cert_verify:
            kwargs['verify'] = self.log_cert_verify

        try:
            logging.debug("Retrieving: " + source_url)
            # Use a session to persist the HTTP connection across requests
            # while downloading chunks of the log file.
            session = requests.Session()
            session.headers = {'Accept-encoding': 'deflate, gzip'}
            file_obj = session.get(source_url, stream=True, **kwargs)
            file_obj.raise_for_status()
        except requests.HTTPError as e:
            if e.response.status_code == 404:
                logging.info("Unable to retrieve %s: HTTP error 404" %
                             source_url)
            else:
                logging.exception("Unable to get log data.")
        except Exception:
            # Silently drop fatal errors when retrieving logs.
            # TODO(clarkb): Handle these errors.
            # Perhaps simply add a log message to file_obj?
            logging.exception("Unable to retrieve source file.")
            raise

        return file_obj, session


class StdOutLogProcessor(object):
    def __init__(self, logq, pretty_print=False):
        self.logq = logq
        self.pretty_print = pretty_print

    def handle_log_event(self):
        log = self.logq.get()
        if self.pretty_print:
            print(json.dumps(log, sort_keys=True,
                             indent=4, separators=(',', ': ')))
        else:
            print(json.dumps(log))
        # Push each log event through to keep logstash up to date.
        sys.stdout.flush()


class INETLogProcessor(object):
    socket_type = None

    def __init__(self, logq, host, port):
        self.logq = logq
        self.host = host
        self.port = port
        self.socket = None

    def _connect_socket(self):
        logging.debug("Creating socket.")
        self.socket = socket.socket(socket.AF_INET, self.socket_type)
        self.socket.connect((self.host, self.port))

    def handle_log_event(self):
        log = self.logq.get()
        try:
            if self.socket is None:
                self._connect_socket()
            self.socket.sendall((json.dumps(log) + '\n').encode('utf-8'))
        except Exception:
            logging.exception("Exception sending INET event.")
            # Logstash seems to take about a minute to start again. Wait 90
            # seconds before attempting to reconnect. If logstash is not
            # available after 90 seconds we will throw another exception and
            # die.
            semi_busy_wait(90)
            self._connect_socket()
            self.socket.sendall((json.dumps(log) + '\n').encode('utf-8'))


class UDPLogProcessor(INETLogProcessor):
    socket_type = socket.SOCK_DGRAM


class TCPLogProcessor(INETLogProcessor):
    socket_type = socket.SOCK_STREAM


class PushMQTT(object):
    def __init__(self, hostname, base_topic, port=1883, client_id=None,
                 keepalive=60, will=None, auth=None, tls=None, qos=0):
        self.hostname = hostname
        self.port = port
        self.client_id = client_id
        self.keepalive = keepalive
        self.will = will
        self.auth = auth
        self.tls = tls
        self.qos = qos
        self.base_topic = base_topic

    def _generate_topic(self, project, job_id, action):
        return '/'.join([self.base_topic, project, job_id, action])

    def publish_single(self, msg, project, job_id, action, build_queue=None):
        if job_id:
            topic = self._generate_topic(project, job_id, action)
        elif build_queue:
            topic = self._generate_topic(project, build_queue, action)
        else:
            topic = self.base_topic + '/' + project

        publish.single(topic, msg, hostname=self.hostname,
                       port=self.port, client_id=self.client_id,
                       keepalive=self.keepalive, will=self.will,
                       auth=self.auth, tls=self.tls, qos=self.qos)


class Server(object):
    def __init__(self, config, debuglog):
        # Config init.
        self.config = config
        self.gearman_host = self.config['gearman-host']
        self.gearman_port = self.config['gearman-port']
        self.output_host = self.config['output-host']
        self.output_port = self.config['output-port']
        self.output_mode = self.config['output-mode']
        mqtt_host = self.config.get('mqtt-host')
        mqtt_port = self.config.get('mqtt-port', 1883)
        mqtt_user = self.config.get('mqtt-user')
        mqtt_pass = self.config.get('mqtt-pass')
        mqtt_topic = self.config.get('mqtt-topic', 'gearman-subunit')
        mqtt_ca_certs = self.config.get('mqtt-ca-certs')
        mqtt_certfile = self.config.get('mqtt-certfile')
        mqtt_keyfile = self.config.get('mqtt-keyfile')
        self.log_ca_certs = self.config.get('log-ca-certs')
        self.log_cert_verify = self.config.get('log-cert-verify', True)
        # Python logging output file.
        self.debuglog = debuglog
        self.retriever = None
        self.logqueue = queue.Queue(16384)
        self.processor = None
        self.filter_factories = []
        # Run the severity filter first so it can filter out chatty
        # logs.
        self.filter_factories.append(OsloSeverityFilterFactory())
        self.filter_factories.append(SystemdSeverityFilterFactory())
        crmscript = self.config.get('crm114-script')
        crmdata = self.config.get('crm114-data')
        if crmscript and crmdata:
            self.filter_factories.append(
                CRM114FilterFactory(crmscript, crmdata))
        # Setup MQTT
        self.mqtt = None
        if mqtt_host:
            auth = None
            if mqtt_user:
                auth = {'username': mqtt_user}
                if mqtt_pass:
                    auth['password'] = mqtt_pass
            tls = None
            if mqtt_ca_certs:
                tls = {'ca_certs': mqtt_ca_certs,
                       'certfile': mqtt_certfile,
                       'keyfile': mqtt_keyfile}

            self.mqtt = PushMQTT(mqtt_host, mqtt_topic, port=mqtt_port,
                                 auth=auth, tls=tls)

    def setup_logging(self):
        if self.debuglog:
            logging.basicConfig(format='%(asctime)s %(message)s',
                                filename=self.debuglog, level=logging.DEBUG)
        else:
            # Prevent leakage into the logstash log stream.
            logging.basicConfig(level=logging.CRITICAL)
        logging.debug("Log pusher starting.")

    def wait_for_name_resolution(self, host, port):
        while True:
            try:
                socket.getaddrinfo(host, port)
            except socket.gaierror as e:
                if e.errno == socket.EAI_AGAIN:
                    logging.debug("Temporary failure in name resolution")
                    time.sleep(2)
                    continue
                else:
                    raise
            break

    def setup_retriever(self):
        hostname = socket.gethostname()
        gearman_worker = gear.Worker(hostname + '-pusher')
        self.wait_for_name_resolution(self.gearman_host, self.gearman_port)
        gearman_worker.addServer(self.gearman_host,
                                 self.gearman_port)
        gearman_worker.registerFunction(b'push-log')
        self.retriever = LogRetriever(gearman_worker, self.filter_factories,
                                      self.logqueue, self.log_cert_verify,
                                      self.log_ca_certs, mqtt=self.mqtt)

    def setup_processor(self):
        if self.output_mode == "tcp":
            self.processor = TCPLogProcessor(self.logqueue,
                                             self.output_host,
                                             self.output_port)
        elif self.output_mode == "udp":
            self.processor = UDPLogProcessor(self.logqueue,
                                             self.output_host,
                                             self.output_port)
        else:
            # Note this processor will not work if the process is run as a
            # daemon. You must use the --foreground option.
            self.processor = StdOutLogProcessor(self.logqueue)

    def main(self):
        self.setup_retriever()
        self.setup_processor()

        self.retriever.daemon = True
        self.retriever.start()

        while True:
            try:
                self.processor.handle_log_event()
            except Exception:
                logging.exception("Exception processing log event.")
                raise


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("-c", "--config", required=True,
                        help="Path to yaml config file.")
    parser.add_argument("-d", "--debuglog",
                        help="Enable debug log. "
                             "Specifies file to write log to.")
    parser.add_argument("--foreground", action='store_true',
                        help="Run in the foreground.")
    parser.add_argument("-p", "--pidfile",
                        default="/var/run/jenkins-log-pusher/"
                                "jenkins-log-gearman-worker.pid",
                        help="PID file to lock during daemonization.")
    args = parser.parse_args()

    with open(args.config, 'r') as config_stream:
        config = yaml.safe_load(config_stream)
    server = Server(config, args.debuglog)

    if args.foreground:
        server.setup_logging()
        server.main()
    else:
        pidfile = pidfile_mod.TimeoutPIDLockFile(args.pidfile, 10)
        with daemon.DaemonContext(pidfile=pidfile):
            server.setup_logging()
            server.main()


if __name__ == '__main__':
    main()
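The worker reads its own configuration keys in Server.__init__: gearman-host, gearman-port, output-host, output-port and output-mode are required, while the mqtt-*, crm114-* and log-cert settings are optional. A minimal, hypothetical worker configuration (placeholder hosts and ports) could be produced the same way as the client sketch above:

# Sketch only: minimal worker configuration with placeholder values.
import yaml

worker_config = {
    'gearman-host': 'gearman.example.org',   # placeholder host
    'gearman-port': 4730,
    'output-host': 'logstash.example.org',   # placeholder host
    'output-port': 9999,
    'output-mode': 'tcp',  # "tcp", "udp", or anything else for stdout
    'log-cert-verify': True,
}

with open('worker.yaml', 'w') as f:
    yaml.safe_dump(worker_config, f, default_flow_style=False)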
loggearman/requirements.txt (new file, 6 lines)
@@ -0,0 +1,6 @@
pbr>=1.6 # Apache-2.0
gear<=0.16.0
requests<=2.26.0 # Apache-2.0
PyYAML<=6.0 # MIT
pyzmq<=21.0.2
paho-mqtt<=1.6.1
loggearman/setup.cfg (new file, 26 lines)
@@ -0,0 +1,26 @@
[metadata]
name = loggearman
summary = OpenStack Log Processor Module
description_file =
    README.rst
author = OpenStack Contributors
author_email = service-discuss@lists.opendev.org
home_page = http://docs.openstack.org/infra/ci-log-processing
classifier =
    Environment :: OpenStack
    Intended Audience :: Information Technology
    Intended Audience :: System Administrators
    License :: OSI Approved :: Apache Software License
    Operating System :: POSIX :: Linux
    Programming Language :: Python

[build_sphinx]
all_files = 1
build-dir = doc/build
source-dir = doc/source
warning-is-error = 1

[entry_points]
console_scripts =
    log-gearman-client = loggearman.client:main
    log-gearman-worker = loggearman.worker:main
loggearman/setup.py (new file, 20 lines)
@@ -0,0 +1,20 @@
# Copyright (C) 2021 Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import setuptools

setuptools.setup(
    setup_requires=['pbr'],
    pbr=True
)