Add gerrit stream over MQTT support to gerritbot

With the start of the germqtt project and it being deployed on
firehose.openstack.org services listening to gerrit no longer need
to connect via ssh to get the event stream. This commit adds support
for using an mqtt event stream as the input into gerritbot.

Change-Id: I4130edac746ab7ae979cad5e3ccad3be0321c909
This commit is contained in:
Matthew Treinish 2016-08-03 11:18:50 -04:00
parent d48b1bc417
commit 7c6e57983d
No known key found for this signature in database
GPG Key ID: FD12A0F214C9E177
2 changed files with 83 additions and 6 deletions

View File

@ -26,12 +26,18 @@ force_ssl=false
server_password=SERVERPASS server_password=SERVERPASS
channel_config=/path/to/yaml/config channel_config=/path/to/yaml/config
pid=/path/to/pid_file pid=/path/to/pid_file
use_mqtt=True
[gerrit] [gerrit]
user=gerrit2 user=gerrit2
key=/path/to/id_rsa key=/path/to/id_rsa
host=review.example.com host=review.example.com
port=29418 port=29418
[mqtt]
host=example.com
port=1883
websocket=False
""" """
# The yaml channel config should look like: # The yaml channel config should look like:
@ -50,6 +56,7 @@ openstack-dev:
import ConfigParser import ConfigParser
import daemon import daemon
import irc.bot import irc.bot
import json
import logging.config import logging.config
import os import os
import re import re
@ -59,6 +66,8 @@ import threading
import time import time
import yaml import yaml
import paho.mqtt.client as mqtt
try: try:
import daemon.pidlockfile import daemon.pidlockfile
pid_file_module = daemon.pidlockfile pid_file_module = daemon.pidlockfile
@ -243,6 +252,8 @@ class Gerrit(threading.Thread):
# The data we care about was not present, no channels want # The data we care about was not present, no channels want
# this event. # this event.
channel_set = set() channel_set = set()
if not channel_set:
channel_set = set()
self.log.info('Potential channels to receive event notification: %s' % self.log.info('Potential channels to receive event notification: %s' %
channel_set) channel_set)
for channel in channel_set: for channel in channel_set:
@ -271,6 +282,58 @@ class Gerrit(threading.Thread):
self.connected = False self.connected = False
class GerritMQTT(Gerrit):
def __init__(self, ircbot, channel_config, server, base_topic='gerrit',
port=1883, websocket=False):
threading.Thread.__init__(self)
self.ircbot = ircbot
self.channel_config = channel_config
self.log = logging.getLogger('gerritbot')
self.server = server
self.port = port
self.websocket = websocket
self.base_topic = base_topic
self.connected = False
def connect(self):
try:
self.client.connect(self.server, port=self.port)
self.log.info('Start watching Gerrit event stream via mqtt!.')
self.connected = True
except Exception:
self.log.exception('Exception while connecting to mqtt')
self.client.reinitialise()
self.connected = False
# Delay before attempting again.
time.sleep(1)
def run(self):
def _on_connect(client, userdata, flags, rc):
client.subscribe(self.base_topic + '/#')
def _on_message(client, userdata, msg):
data = json.loads(msg.payload)
if data:
self._read(data)
if self.websocket:
self.client = mqtt.Client(transport='websockets')
else:
self.client = mqtt.Client()
self.client.on_connect = _on_connect
self.client.on_message = _on_message
while True:
while not self.connected:
self.connect()
try:
self.client.loop()
except Exception:
self.log.exception('Exception encountered in event loop')
time.sleep(5)
class ChannelConfig(object): class ChannelConfig(object):
def __init__(self, data): def __init__(self, data):
self.data = data self.data = data
@ -322,12 +385,25 @@ def _main(config):
config.getint('ircbot', 'port'), config.getint('ircbot', 'port'),
config.getboolean('ircbot', 'force_ssl'), config.getboolean('ircbot', 'force_ssl'),
config.get('ircbot', 'server_password')) config.get('ircbot', 'server_password'))
g = Gerrit(bot, if config.has_option('ircbot', 'use_mqtt'):
channel_config, use_mqtt = config.getboolean('ircbot', 'use_mqtt')
config.get('gerrit', 'host'), else:
config.get('gerrit', 'user'), use_mqtt = False
config.getint('gerrit', 'port'),
config.get('gerrit', 'key')) if use_mqtt:
g = GerritMQTT(bot,
channel_config,
config.get('mqtt', 'host'),
config.get('mqtt', 'base_topic'),
config.getint('mqtt', 'port'),
config.getboolean('mqtt', 'websocket'))
else:
g = Gerrit(bot,
channel_config,
config.get('gerrit', 'host'),
config.get('gerrit', 'user'),
config.getint('gerrit', 'port'),
config.get('gerrit', 'key'))
g.start() g.start()
bot.start() bot.start()

View File

@ -4,3 +4,4 @@ gerritlib
irc irc
pyyaml pyyaml
python-daemon python-daemon
paho-mqtt>=1.2