Browse Source

Add gerrit stream over MQTT support to gerritbot

With the start of the germqtt project and it being deployed on
firehose.openstack.org services listening to gerrit no longer need
to connect via ssh to get the event stream. This commit adds support
for using an mqtt event stream as the input into gerritbot.

Change-Id: I4130edac746ab7ae979cad5e3ccad3be0321c909
tags/0.3.0
Matthew Treinish 3 years ago
parent
commit
7c6e57983d
No account linked to committer's email address
2 changed files with 83 additions and 6 deletions
  1. +82
    -6
      gerritbot/bot.py
  2. +1
    -0
      requirements.txt

+ 82
- 6
gerritbot/bot.py View File

@@ -26,12 +26,18 @@ force_ssl=false
server_password=SERVERPASS
channel_config=/path/to/yaml/config
pid=/path/to/pid_file
use_mqtt=True

[gerrit]
user=gerrit2
key=/path/to/id_rsa
host=review.example.com
port=29418

[mqtt]
host=example.com
port=1883
websocket=False
"""

# The yaml channel config should look like:
@@ -50,6 +56,7 @@ openstack-dev:
import ConfigParser
import daemon
import irc.bot
import json
import logging.config
import os
import re
@@ -59,6 +66,8 @@ import threading
import time
import yaml

import paho.mqtt.client as mqtt

try:
import daemon.pidlockfile
pid_file_module = daemon.pidlockfile
@@ -243,6 +252,8 @@ class Gerrit(threading.Thread):
# The data we care about was not present, no channels want
# this event.
channel_set = set()
if not channel_set:
channel_set = set()
self.log.info('Potential channels to receive event notification: %s' %
channel_set)
for channel in channel_set:
@@ -271,6 +282,58 @@ class Gerrit(threading.Thread):
self.connected = False


class GerritMQTT(Gerrit):
def __init__(self, ircbot, channel_config, server, base_topic='gerrit',
port=1883, websocket=False):
threading.Thread.__init__(self)
self.ircbot = ircbot
self.channel_config = channel_config
self.log = logging.getLogger('gerritbot')
self.server = server
self.port = port
self.websocket = websocket
self.base_topic = base_topic
self.connected = False

def connect(self):
try:
self.client.connect(self.server, port=self.port)

self.log.info('Start watching Gerrit event stream via mqtt!.')
self.connected = True
except Exception:
self.log.exception('Exception while connecting to mqtt')
self.client.reinitialise()
self.connected = False
# Delay before attempting again.
time.sleep(1)

def run(self):
def _on_connect(client, userdata, flags, rc):
client.subscribe(self.base_topic + '/#')

def _on_message(client, userdata, msg):
data = json.loads(msg.payload)
if data:
self._read(data)

if self.websocket:
self.client = mqtt.Client(transport='websockets')
else:
self.client = mqtt.Client()
self.client.on_connect = _on_connect
self.client.on_message = _on_message

while True:
while not self.connected:
self.connect()
try:
self.client.loop()
except Exception:
self.log.exception('Exception encountered in event loop')
time.sleep(5)


class ChannelConfig(object):
def __init__(self, data):
self.data = data
@@ -322,12 +385,25 @@ def _main(config):
config.getint('ircbot', 'port'),
config.getboolean('ircbot', 'force_ssl'),
config.get('ircbot', 'server_password'))
g = Gerrit(bot,
channel_config,
config.get('gerrit', 'host'),
config.get('gerrit', 'user'),
config.getint('gerrit', 'port'),
config.get('gerrit', 'key'))
if config.has_option('ircbot', 'use_mqtt'):
use_mqtt = config.getboolean('ircbot', 'use_mqtt')
else:
use_mqtt = False

if use_mqtt:
g = GerritMQTT(bot,
channel_config,
config.get('mqtt', 'host'),
config.get('mqtt', 'base_topic'),
config.getint('mqtt', 'port'),
config.getboolean('mqtt', 'websocket'))
else:
g = Gerrit(bot,
channel_config,
config.get('gerrit', 'host'),
config.get('gerrit', 'user'),
config.getint('gerrit', 'port'),
config.get('gerrit', 'key'))
g.start()
bot.start()


+ 1
- 0
requirements.txt View File

@@ -4,3 +4,4 @@ gerritlib
irc
pyyaml
python-daemon
paho-mqtt>=1.2

Loading…
Cancel
Save