Browse Source

Add gerrit stream over MQTT support to gerritbot

With the start of the germqtt project and it being deployed on
firehose.openstack.org services listening to gerrit no longer need
to connect via ssh to get the event stream. This commit adds support
for using an mqtt event stream as the input into gerritbot.

Change-Id: I4130edac746ab7ae979cad5e3ccad3be0321c909
tags/0.3.0
Matthew Treinish 3 years ago
parent
commit
7c6e57983d
No account linked to committer's email address
2 changed files with 83 additions and 6 deletions
  1. 82
    6
      gerritbot/bot.py
  2. 1
    0
      requirements.txt

+ 82
- 6
gerritbot/bot.py View File

@@ -26,12 +26,18 @@ force_ssl=false
26 26
 server_password=SERVERPASS
27 27
 channel_config=/path/to/yaml/config
28 28
 pid=/path/to/pid_file
29
+use_mqtt=True
29 30
 
30 31
 [gerrit]
31 32
 user=gerrit2
32 33
 key=/path/to/id_rsa
33 34
 host=review.example.com
34 35
 port=29418
36
+
37
+[mqtt]
38
+host=example.com
39
+port=1883
40
+websocket=False
35 41
 """
36 42
 
37 43
 # The yaml channel config should look like:
@@ -50,6 +56,7 @@ openstack-dev:
50 56
 import ConfigParser
51 57
 import daemon
52 58
 import irc.bot
59
+import json
53 60
 import logging.config
54 61
 import os
55 62
 import re
@@ -59,6 +66,8 @@ import threading
59 66
 import time
60 67
 import yaml
61 68
 
69
+import paho.mqtt.client as mqtt
70
+
62 71
 try:
63 72
     import daemon.pidlockfile
64 73
     pid_file_module = daemon.pidlockfile
@@ -243,6 +252,8 @@ class Gerrit(threading.Thread):
243 252
             # The data we care about was not present, no channels want
244 253
             # this event.
245 254
             channel_set = set()
255
+        if not channel_set:
256
+            channel_set = set()
246 257
         self.log.info('Potential channels to receive event notification: %s' %
247 258
                       channel_set)
248 259
         for channel in channel_set:
@@ -271,6 +282,58 @@ class Gerrit(threading.Thread):
271 282
                     self.connected = False
272 283
 
273 284
 
285
+class GerritMQTT(Gerrit):
286
+    def __init__(self, ircbot, channel_config, server, base_topic='gerrit',
287
+                 port=1883, websocket=False):
288
+        threading.Thread.__init__(self)
289
+        self.ircbot = ircbot
290
+        self.channel_config = channel_config
291
+        self.log = logging.getLogger('gerritbot')
292
+        self.server = server
293
+        self.port = port
294
+        self.websocket = websocket
295
+        self.base_topic = base_topic
296
+        self.connected = False
297
+
298
+    def connect(self):
299
+        try:
300
+            self.client.connect(self.server, port=self.port)
301
+
302
+            self.log.info('Start watching Gerrit event stream via mqtt!.')
303
+            self.connected = True
304
+        except Exception:
305
+            self.log.exception('Exception while connecting to mqtt')
306
+            self.client.reinitialise()
307
+            self.connected = False
308
+            # Delay before attempting again.
309
+            time.sleep(1)
310
+
311
+    def run(self):
312
+        def _on_connect(client, userdata, flags, rc):
313
+            client.subscribe(self.base_topic + '/#')
314
+
315
+        def _on_message(client, userdata, msg):
316
+            data = json.loads(msg.payload)
317
+            if data:
318
+                self._read(data)
319
+
320
+        if self.websocket:
321
+            self.client = mqtt.Client(transport='websockets')
322
+        else:
323
+            self.client = mqtt.Client()
324
+        self.client.on_connect = _on_connect
325
+        self.client.on_message = _on_message
326
+
327
+        while True:
328
+            while not self.connected:
329
+                self.connect()
330
+            try:
331
+                self.client.loop()
332
+            except Exception:
333
+                self.log.exception('Exception encountered in event loop')
334
+                time.sleep(5)
335
+
336
+
274 337
 class ChannelConfig(object):
275 338
     def __init__(self, data):
276 339
         self.data = data
@@ -322,12 +385,25 @@ def _main(config):
322 385
                     config.getint('ircbot', 'port'),
323 386
                     config.getboolean('ircbot', 'force_ssl'),
324 387
                     config.get('ircbot', 'server_password'))
325
-    g = Gerrit(bot,
326
-               channel_config,
327
-               config.get('gerrit', 'host'),
328
-               config.get('gerrit', 'user'),
329
-               config.getint('gerrit', 'port'),
330
-               config.get('gerrit', 'key'))
388
+    if config.has_option('ircbot', 'use_mqtt'):
389
+        use_mqtt = config.getboolean('ircbot', 'use_mqtt')
390
+    else:
391
+        use_mqtt = False
392
+
393
+    if use_mqtt:
394
+        g = GerritMQTT(bot,
395
+                       channel_config,
396
+                       config.get('mqtt', 'host'),
397
+                       config.get('mqtt', 'base_topic'),
398
+                       config.getint('mqtt', 'port'),
399
+                       config.getboolean('mqtt', 'websocket'))
400
+    else:
401
+        g = Gerrit(bot,
402
+                   channel_config,
403
+                   config.get('gerrit', 'host'),
404
+                   config.get('gerrit', 'user'),
405
+                   config.getint('gerrit', 'port'),
406
+                   config.get('gerrit', 'key'))
331 407
     g.start()
332 408
     bot.start()
333 409
 

+ 1
- 0
requirements.txt View File

@@ -4,3 +4,4 @@ gerritlib
4 4
 irc
5 5
 pyyaml
6 6
 python-daemon
7
+paho-mqtt>=1.2

Loading…
Cancel
Save