Add gerrit stream over MQTT support to gerritbot
With the start of the germqtt project and it being deployed on firehose.openstack.org services listening to gerrit no longer need to connect via ssh to get the event stream. This commit adds support for using an mqtt event stream as the input into gerritbot. Change-Id: I4130edac746ab7ae979cad5e3ccad3be0321c909
This commit is contained in:
parent
d48b1bc417
commit
7c6e57983d
@ -26,12 +26,18 @@ force_ssl=false
|
||||
server_password=SERVERPASS
|
||||
channel_config=/path/to/yaml/config
|
||||
pid=/path/to/pid_file
|
||||
use_mqtt=True
|
||||
|
||||
[gerrit]
|
||||
user=gerrit2
|
||||
key=/path/to/id_rsa
|
||||
host=review.example.com
|
||||
port=29418
|
||||
|
||||
[mqtt]
|
||||
host=example.com
|
||||
port=1883
|
||||
websocket=False
|
||||
"""
|
||||
|
||||
# The yaml channel config should look like:
|
||||
@ -50,6 +56,7 @@ openstack-dev:
|
||||
import ConfigParser
|
||||
import daemon
|
||||
import irc.bot
|
||||
import json
|
||||
import logging.config
|
||||
import os
|
||||
import re
|
||||
@ -59,6 +66,8 @@ import threading
|
||||
import time
|
||||
import yaml
|
||||
|
||||
import paho.mqtt.client as mqtt
|
||||
|
||||
try:
|
||||
import daemon.pidlockfile
|
||||
pid_file_module = daemon.pidlockfile
|
||||
@ -243,6 +252,8 @@ class Gerrit(threading.Thread):
|
||||
# The data we care about was not present, no channels want
|
||||
# this event.
|
||||
channel_set = set()
|
||||
if not channel_set:
|
||||
channel_set = set()
|
||||
self.log.info('Potential channels to receive event notification: %s' %
|
||||
channel_set)
|
||||
for channel in channel_set:
|
||||
@ -271,6 +282,58 @@ class Gerrit(threading.Thread):
|
||||
self.connected = False
|
||||
|
||||
|
||||
class GerritMQTT(Gerrit):
|
||||
def __init__(self, ircbot, channel_config, server, base_topic='gerrit',
|
||||
port=1883, websocket=False):
|
||||
threading.Thread.__init__(self)
|
||||
self.ircbot = ircbot
|
||||
self.channel_config = channel_config
|
||||
self.log = logging.getLogger('gerritbot')
|
||||
self.server = server
|
||||
self.port = port
|
||||
self.websocket = websocket
|
||||
self.base_topic = base_topic
|
||||
self.connected = False
|
||||
|
||||
def connect(self):
|
||||
try:
|
||||
self.client.connect(self.server, port=self.port)
|
||||
|
||||
self.log.info('Start watching Gerrit event stream via mqtt!.')
|
||||
self.connected = True
|
||||
except Exception:
|
||||
self.log.exception('Exception while connecting to mqtt')
|
||||
self.client.reinitialise()
|
||||
self.connected = False
|
||||
# Delay before attempting again.
|
||||
time.sleep(1)
|
||||
|
||||
def run(self):
|
||||
def _on_connect(client, userdata, flags, rc):
|
||||
client.subscribe(self.base_topic + '/#')
|
||||
|
||||
def _on_message(client, userdata, msg):
|
||||
data = json.loads(msg.payload)
|
||||
if data:
|
||||
self._read(data)
|
||||
|
||||
if self.websocket:
|
||||
self.client = mqtt.Client(transport='websockets')
|
||||
else:
|
||||
self.client = mqtt.Client()
|
||||
self.client.on_connect = _on_connect
|
||||
self.client.on_message = _on_message
|
||||
|
||||
while True:
|
||||
while not self.connected:
|
||||
self.connect()
|
||||
try:
|
||||
self.client.loop()
|
||||
except Exception:
|
||||
self.log.exception('Exception encountered in event loop')
|
||||
time.sleep(5)
|
||||
|
||||
|
||||
class ChannelConfig(object):
|
||||
def __init__(self, data):
|
||||
self.data = data
|
||||
@ -322,12 +385,25 @@ def _main(config):
|
||||
config.getint('ircbot', 'port'),
|
||||
config.getboolean('ircbot', 'force_ssl'),
|
||||
config.get('ircbot', 'server_password'))
|
||||
g = Gerrit(bot,
|
||||
channel_config,
|
||||
config.get('gerrit', 'host'),
|
||||
config.get('gerrit', 'user'),
|
||||
config.getint('gerrit', 'port'),
|
||||
config.get('gerrit', 'key'))
|
||||
if config.has_option('ircbot', 'use_mqtt'):
|
||||
use_mqtt = config.getboolean('ircbot', 'use_mqtt')
|
||||
else:
|
||||
use_mqtt = False
|
||||
|
||||
if use_mqtt:
|
||||
g = GerritMQTT(bot,
|
||||
channel_config,
|
||||
config.get('mqtt', 'host'),
|
||||
config.get('mqtt', 'base_topic'),
|
||||
config.getint('mqtt', 'port'),
|
||||
config.getboolean('mqtt', 'websocket'))
|
||||
else:
|
||||
g = Gerrit(bot,
|
||||
channel_config,
|
||||
config.get('gerrit', 'host'),
|
||||
config.get('gerrit', 'user'),
|
||||
config.getint('gerrit', 'port'),
|
||||
config.get('gerrit', 'key'))
|
||||
g.start()
|
||||
bot.start()
|
||||
|
||||
|
@ -4,3 +4,4 @@ gerritlib
|
||||
irc
|
||||
pyyaml
|
||||
python-daemon
|
||||
paho-mqtt>=1.2
|
||||
|
Loading…
x
Reference in New Issue
Block a user