diff --git a/gerritbot/bot.py b/gerritbot/bot.py index d024537..624452a 100755 --- a/gerritbot/bot.py +++ b/gerritbot/bot.py @@ -26,12 +26,18 @@ force_ssl=false server_password=SERVERPASS channel_config=/path/to/yaml/config pid=/path/to/pid_file +use_mqtt=True [gerrit] user=gerrit2 key=/path/to/id_rsa host=review.example.com port=29418 + +[mqtt] +host=example.com +port=1883 +websocket=False """ # The yaml channel config should look like: @@ -50,6 +56,7 @@ openstack-dev: import ConfigParser import daemon import irc.bot +import json import logging.config import os import re @@ -59,6 +66,8 @@ import threading import time import yaml +import paho.mqtt.client as mqtt + try: import daemon.pidlockfile pid_file_module = daemon.pidlockfile @@ -243,6 +252,8 @@ class Gerrit(threading.Thread): # The data we care about was not present, no channels want # this event. channel_set = set() + if not channel_set: + channel_set = set() self.log.info('Potential channels to receive event notification: %s' % channel_set) for channel in channel_set: @@ -271,6 +282,58 @@ class Gerrit(threading.Thread): self.connected = False +class GerritMQTT(Gerrit): + def __init__(self, ircbot, channel_config, server, base_topic='gerrit', + port=1883, websocket=False): + threading.Thread.__init__(self) + self.ircbot = ircbot + self.channel_config = channel_config + self.log = logging.getLogger('gerritbot') + self.server = server + self.port = port + self.websocket = websocket + self.base_topic = base_topic + self.connected = False + + def connect(self): + try: + self.client.connect(self.server, port=self.port) + + self.log.info('Start watching Gerrit event stream via mqtt!.') + self.connected = True + except Exception: + self.log.exception('Exception while connecting to mqtt') + self.client.reinitialise() + self.connected = False + # Delay before attempting again. + time.sleep(1) + + def run(self): + def _on_connect(client, userdata, flags, rc): + client.subscribe(self.base_topic + '/#') + + def _on_message(client, userdata, msg): + data = json.loads(msg.payload) + if data: + self._read(data) + + if self.websocket: + self.client = mqtt.Client(transport='websockets') + else: + self.client = mqtt.Client() + self.client.on_connect = _on_connect + self.client.on_message = _on_message + + while True: + while not self.connected: + self.connect() + try: + self.client.loop() + except Exception: + self.log.exception('Exception encountered in event loop') + time.sleep(5) + + class ChannelConfig(object): def __init__(self, data): self.data = data @@ -322,12 +385,25 @@ def _main(config): config.getint('ircbot', 'port'), config.getboolean('ircbot', 'force_ssl'), config.get('ircbot', 'server_password')) - g = Gerrit(bot, - channel_config, - config.get('gerrit', 'host'), - config.get('gerrit', 'user'), - config.getint('gerrit', 'port'), - config.get('gerrit', 'key')) + if config.has_option('ircbot', 'use_mqtt'): + use_mqtt = config.getboolean('ircbot', 'use_mqtt') + else: + use_mqtt = False + + if use_mqtt: + g = GerritMQTT(bot, + channel_config, + config.get('mqtt', 'host'), + config.get('mqtt', 'base_topic'), + config.getint('mqtt', 'port'), + config.getboolean('mqtt', 'websocket')) + else: + g = Gerrit(bot, + channel_config, + config.get('gerrit', 'host'), + config.get('gerrit', 'user'), + config.getint('gerrit', 'port'), + config.get('gerrit', 'key')) g.start() bot.start() diff --git a/requirements.txt b/requirements.txt index 0edc21f..42c316d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,3 +4,4 @@ gerritlib irc pyyaml python-daemon +paho-mqtt>=1.2