Add gerrit stream over MQTT support to gerritbot

With the start of the germqtt project and it being deployed on
firehose.openstack.org services listening to gerrit no longer need
to connect via ssh to get the event stream. This commit adds support
for using an mqtt event stream as the input into gerritbot.

Change-Id: I4130edac746ab7ae979cad5e3ccad3be0321c909
This commit is contained in:
Matthew Treinish 2016-08-03 11:18:50 -04:00
parent d48b1bc417
commit 7c6e57983d
No known key found for this signature in database
GPG Key ID: FD12A0F214C9E177
2 changed files with 83 additions and 6 deletions

View File

@ -26,12 +26,18 @@ force_ssl=false
server_password=SERVERPASS
channel_config=/path/to/yaml/config
pid=/path/to/pid_file
use_mqtt=True
[gerrit]
user=gerrit2
key=/path/to/id_rsa
host=review.example.com
port=29418
[mqtt]
host=example.com
port=1883
websocket=False
"""
# The yaml channel config should look like:
@ -50,6 +56,7 @@ openstack-dev:
import ConfigParser
import daemon
import irc.bot
import json
import logging.config
import os
import re
@ -59,6 +66,8 @@ import threading
import time
import yaml
import paho.mqtt.client as mqtt
try:
import daemon.pidlockfile
pid_file_module = daemon.pidlockfile
@ -243,6 +252,8 @@ class Gerrit(threading.Thread):
# The data we care about was not present, no channels want
# this event.
channel_set = set()
if not channel_set:
channel_set = set()
self.log.info('Potential channels to receive event notification: %s' %
channel_set)
for channel in channel_set:
@ -271,6 +282,58 @@ class Gerrit(threading.Thread):
self.connected = False
class GerritMQTT(Gerrit):
def __init__(self, ircbot, channel_config, server, base_topic='gerrit',
port=1883, websocket=False):
threading.Thread.__init__(self)
self.ircbot = ircbot
self.channel_config = channel_config
self.log = logging.getLogger('gerritbot')
self.server = server
self.port = port
self.websocket = websocket
self.base_topic = base_topic
self.connected = False
def connect(self):
try:
self.client.connect(self.server, port=self.port)
self.log.info('Start watching Gerrit event stream via mqtt!.')
self.connected = True
except Exception:
self.log.exception('Exception while connecting to mqtt')
self.client.reinitialise()
self.connected = False
# Delay before attempting again.
time.sleep(1)
def run(self):
def _on_connect(client, userdata, flags, rc):
client.subscribe(self.base_topic + '/#')
def _on_message(client, userdata, msg):
data = json.loads(msg.payload)
if data:
self._read(data)
if self.websocket:
self.client = mqtt.Client(transport='websockets')
else:
self.client = mqtt.Client()
self.client.on_connect = _on_connect
self.client.on_message = _on_message
while True:
while not self.connected:
self.connect()
try:
self.client.loop()
except Exception:
self.log.exception('Exception encountered in event loop')
time.sleep(5)
class ChannelConfig(object):
def __init__(self, data):
self.data = data
@ -322,12 +385,25 @@ def _main(config):
config.getint('ircbot', 'port'),
config.getboolean('ircbot', 'force_ssl'),
config.get('ircbot', 'server_password'))
g = Gerrit(bot,
channel_config,
config.get('gerrit', 'host'),
config.get('gerrit', 'user'),
config.getint('gerrit', 'port'),
config.get('gerrit', 'key'))
if config.has_option('ircbot', 'use_mqtt'):
use_mqtt = config.getboolean('ircbot', 'use_mqtt')
else:
use_mqtt = False
if use_mqtt:
g = GerritMQTT(bot,
channel_config,
config.get('mqtt', 'host'),
config.get('mqtt', 'base_topic'),
config.getint('mqtt', 'port'),
config.getboolean('mqtt', 'websocket'))
else:
g = Gerrit(bot,
channel_config,
config.get('gerrit', 'host'),
config.get('gerrit', 'user'),
config.getint('gerrit', 'port'),
config.get('gerrit', 'key'))
g.start()
bot.start()

View File

@ -4,3 +4,4 @@ gerritlib
irc
pyyaml
python-daemon
paho-mqtt>=1.2