Make log-pusher.py properly configurable.

* modules/openstack_project/files/logstash/log-pusher.py: Make the log
pusher properly configurable with a yaml configuration. As part of this
change support multiple zmq publisher inputs, multiple file retrievers,
job name filtering, and event tagging (with the filename).

* modules/openstack_project/files/logstash/jenkins-log-pusher.yaml:
Initial config for the log pusher.

* modules/openstack_project/manifests/logstash.pp: Put new log pusher:
config in place.

* modules/openstack_proejct/files/logstash/jenkins-log-pusher.init: Run
the log pusher service with the new config file.

Change-Id: I4c8405b1edfa16bbcc8f998627c6240bef23f302
Reviewed-on: https://review.openstack.org/28113
Reviewed-by: James E. Blair <corvus@inaugust.com>
Reviewed-by: Jeremy Stanley <fungi@yuggoth.org>
Approved: Jeremy Stanley <fungi@yuggoth.org>
Tested-by: Jenkins
This commit is contained in:
Clark Boylan 2013-05-02 14:54:13 -07:00 committed by Jenkins
parent aae5ffc65e
commit f07756b5bd
4 changed files with 97 additions and 37 deletions

View File

@ -16,7 +16,7 @@ PATH=/sbin:/usr/sbin:/bin:/usr/bin
DESC="Jenkins Log Pusher"
NAME=jenkins-log-pusher
DAEMON=/usr/local/bin/log-pusher.py
DAEMON_ARGS='-r -z tcp://jenkins.openstack.org:8888 -l http://logs.openstack.org -f console.html -d /var/log/logstash/pusher-debug.log -t'
DAEMON_ARGS='-c /etc/logstash/jenkins-log-pusher.yaml -d /var/log/logstash/pusher-debug.log'
#PIDFILE=/var/run/$NAME/$NAME.pid
SCRIPTNAME=/etc/init.d/$NAME
USER=logstash

View File

@ -0,0 +1,16 @@
# Defaults
source-defaults:
source-url: http://logs.openstack.org
output-host: localhost
output-port: 9999
output-mode: tcp
retry-get: False
# List of zmq event inputs.
zmq-publishers:
- tcp://jenkins.openstack.org:8888
# List of files to source logs from.
source-files:
- name: console.html
retry-get: True

View File

@ -21,17 +21,19 @@ import logging
import threading
import time
import queue
import re
import socket
import sys
import urllib.error
import urllib.request
import yaml
import zmq
class EventCatcher(threading.Thread):
def __init__(self, eventq, zmq_address):
def __init__(self, eventqs, zmq_address):
threading.Thread.__init__(self)
self.eventq = eventq
self.eventqs = eventqs
self.zmq_address = zmq_address
self._connect_zmq()
@ -57,7 +59,8 @@ class EventCatcher(threading.Thread):
string = self.socket.recv().decode('utf-8')
event = json.loads(string.split(None, 1)[1])
logging.debug("Jenkins event received: " + json.dumps(event))
self.eventq.put(event)
for eventq in self.eventqs:
eventq.put(event)
class LogRetriever(threading.Thread):
@ -75,13 +78,16 @@ class LogRetriever(threading.Thread):
'UNKNOWN': "/periodic/{build_name}/{build_number}/",
}
def __init__(self, eventq, logq, log_address, filename, retry=False):
def __init__(self, eventq, logq, log_address,
filename, retry=False, job_filter=''):
threading.Thread.__init__(self)
self.eventq = eventq
self.logq = logq
self.retry = retry
self.log_address = log_address
self.filename = filename
self.job_filter = job_filter
self.tag = [self.filename]
def run(self):
while True:
@ -94,7 +100,8 @@ class LogRetriever(threading.Thread):
event = self.eventq.get()
logging.debug("Handling event: " + json.dumps(event))
fields = self._parse_fields(event)
if fields['build_status'] != 'ABORTED':
matches = re.search(self.job_filter, fields['build_name'])
if fields['build_status'] != 'ABORTED' and matches:
# Handle events ignoring aborted builds. These builds are
# discarded by zuul.
log_lines = self._retrieve_log(fields)
@ -103,6 +110,7 @@ class LogRetriever(threading.Thread):
for line in log_lines:
out_event = {}
out_event["@fields"] = fields
out_event["@tags"] = self.tag
out_event["event_message"] = line
self.logq.put(out_event)
@ -223,7 +231,7 @@ class StdOutLogProcessor(object):
class INETLogProcessor(object):
socket_type = None
def __init__(self, logq, host='localhost', port=9999):
def __init__(self, logq, host, port):
self.logq = logq
self.host = host
self.port = port
@ -253,23 +261,11 @@ class TCPLogProcessor(INETLogProcessor):
def main():
parser = argparse.ArgumentParser()
parser.add_argument("-z", "--zmqaddress", required=True,
help="Address to use as source for zmq events.")
parser.add_argument("-l", "--logaddress", required=True,
help="Http(s) address to use as source for log files.")
parser.add_argument("-f", "--filename", required=True,
help="Name of log file to retrieve from log server.")
parser.add_argument("-p", "--pretty", action="store_true",
help="Print pretty json.")
parser.add_argument("-r", "--retry", action="store_true",
help="Retry until full console log is retrieved.")
parser.add_argument("-c", "--config", required=True,
help="Path to yaml config file.")
parser.add_argument("-d", "--debuglog",
help="Enable debug log. "
"Specifies file to write log to.")
parser.add_argument("-u", "--udp", action="store_true",
help="Output to UDP destination.")
parser.add_argument("-t", "--tcp", action="store_true",
help="Output to TCP destination.")
args = parser.parse_args()
if args.debuglog:
@ -279,22 +275,54 @@ def main():
# Prevent leakage into the logstash log stream.
logging.basicConfig(level=logging.CRITICAL)
logging.debug("Log pusher starting.")
eventqueue = queue.Queue()
logqueue = queue.Queue()
catcher = EventCatcher(eventqueue, args.zmqaddress)
retriever = LogRetriever(eventqueue, logqueue, args.logaddress,
args.filename, retry=args.retry)
if args.tcp:
processor = TCPLogProcessor(logqueue)
elif args.udp:
processor = UDPLogProcessor(logqueue)
else:
processor = StdOutLogProcessor(logqueue, pretty_print=args.pretty)
catcher.daemon = True
catcher.start()
retriever.daemon = True
retriever.start()
config_stream = open(args.config, 'r')
config = yaml.load(config_stream)
defaults = config['source-defaults']
default_source_url = defaults['source-url']
default_output_host = defaults['output-host']
default_output_port = defaults['output-port']
default_output_mode = defaults['output-mode']
default_retry = defaults['retry-get']
event_queues = []
# TODO(clarkb) support multiple outputs
logqueue = queue.Queue()
retrievers = []
for source_file in config['source-files']:
eventqueue = queue.Queue()
event_queues.append(eventqueue)
retriever = LogRetriever(eventqueue, logqueue,
source_file.get('source-url',
default_source_url),
source_file['name'],
retry=source_file.get('retry-get',
default_retry),
job_filter=source_file.get('filter',
''))
retrievers.append(retriever)
catchers = []
for zmq_publisher in config['zmq-publishers']:
catcher = EventCatcher(event_queues, zmq_publisher)
catchers.append(catcher)
if default_output_mode == "tcp":
processor = TCPLogProcessor(logqueue,
default_output_host, default_output_port)
elif default_output_mode == "udp":
processor = UDPLogProcessor(logqueue,
default_output_host, default_output_port)
else:
processor = StdOutLogProcessor(logqueue)
for catcher in catchers:
catcher.daemon = True
catcher.start()
for retriever in retrievers:
retriever.daemon = True
retriever.start()
while True:
try:
processor.handle_log_event()

View File

@ -42,6 +42,10 @@ class openstack_project::logstash (
ensure => 'present',
}
package { 'python3-yaml':
ensure => 'present',
}
file { '/usr/local/bin/log-pusher.py':
ensure => present,
owner => 'root',
@ -51,13 +55,25 @@ class openstack_project::logstash (
require => Package['python3'],
}
file { '/etc/logstash/jenkins-log-pusher.yaml':
ensure => present,
owner => 'root',
group => 'root',
mode => '0555',
source => 'puppet:///modules/openstack_project/logstash/jenkins-log-pusher.yaml',
require => Class['logstash::indexer'],
}
file { '/etc/init.d/jenkins-log-pusher':
ensure => present,
owner => 'root',
group => 'root',
mode => '0555',
source => 'puppet:///modules/openstack_project/logstash/jenkins-log-pusher.init',
require => File['/usr/local/bin/log-pusher.py'],
require => [
File['/usr/local/bin/log-pusher.py'],
File['/etc/logstash/jenkins-log-pusher.yaml'],
],
}
service { 'jenkins-log-pusher':