From 24f10343426abb8e9b6b5dfc62a1e25ead82de43 Mon Sep 17 00:00:00 2001 From: Matthew Treinish Date: Fri, 23 Feb 2018 15:52:35 -0500 Subject: [PATCH] Add optional MQTT notification support This commit adds mqtt support to statusbot. With this when statusbot is configured correctly it will be able to emit events to an MQTT broker for each command it receives. The intent here is to integrate this into firehose.openstack.org[1] [1] https://docs.openstack.org/infra/system-config/firehose.html Change-Id: Iace1cd52643a95b019287acfa1b17621251a52ee --- statusbot/bot.py | 90 ++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 88 insertions(+), 2 deletions(-) diff --git a/statusbot/bot.py b/statusbot/bot.py index 8811609..236dfe9 100644 --- a/statusbot/bot.py +++ b/statusbot/bot.py @@ -60,6 +60,7 @@ import re import ssl import twitter import urllib +from paho.mqtt import publish try: import daemon.pidlockfile as pid_file_module @@ -76,6 +77,70 @@ irc.client.ServerConnection.buffer_class.errors = 'replace' ANTI_FLOOD_SLEEP = 2 +class MQTTPublisher(object): + def __init__(self, config): + self.hostname = config.get('mqtt', 'hostname') + if config.has_option('mqtt', 'port'): + self.port = config.get('mqtt', 'port') + else: + self.port = 1883 + if config.has_option('mqtt', 'keepalive'): + self.keepalive = config.get('mqtt', 'keepalive') + else: + self.keepalive = 60 + if config.has_option('mqtt', 'basetopic'): + self.basetopic = config.get('mqtt', 'basetopic') + else: + self.basetopic = 'statusbot' + + # Configure auth + self.auth = None + if config.has_option('mqtt', 'username'): + mqtt_username = config.get('mqtt', 'username') + else: + mqtt_username = None + if config.has_option('mqtt', 'password'): + mqtt_password = config.get('mqtt', 'password') + else: + mqtt_password = None + if mqtt_username: + self.auth = {'username': mqtt_username} + if mqtt_password: + self.auth['password'] = mqtt_password + + # QOS setting + if config.has_option('mqtt', 'qos'): + self.qos = config.getint('mqtt', 'qos') + else: + self.qos = 0 + # TLS config + certfile = None + if config.has_option('mqtt', 'certfile'): + certfile = config.get('mqtt', 'certfile') + keyfile = None + if config.has_option('mqtt', 'keyfile'): + keyfile = config.get('mqtt', 'keyfile') + self.tls = None + if config.has_option('mqtt', 'ca_certs'): + self.tls = {'ca_certs': config.get('mqtt', 'ca_certs'), + 'certfile': certfile, + 'keyfile': keyfile} + + def publish_single(self, topic, msg): + topic = self.basetopic + '/' + topic + publish.single(topic, msg, hostname=self.hostname, + port=self.port, client_id=self.client_id, + keepalive=self.keepalive, will=self.will, + auth=self.auth, tls=self.tls, qos=self.qos) + + def publish_multiple(self, topic, msg): + topic = self.basetopic + '/' + topic + publish.multiple(topic, msg, hostname=self.hostname, + port=self.port, client_id=self.client_id, + keepalive=self.keepalive, will=self.will, + auth=self.auth, tls=self.tls, qos=self.qos) + + class WikiPage(object): def __init__(self, config): self.url = config.get('wiki', 'url') @@ -316,7 +381,7 @@ class StatusBot(irc.bot.SingleServerIRCBot): log = logging.getLogger("statusbot.bot") def __init__(self, channels, nicks, publishers, successlog, thankslog, - nickname, password, server, port=6667): + nickname, password, server, port=6667, mqtt_publisher=None): if port == 6697: factory = irc.connection.Factory(wrapper=ssl.wrap_socket) irc.bot.SingleServerIRCBot.__init__(self, @@ -337,6 +402,7 @@ class StatusBot(irc.bot.SingleServerIRCBot): self.publishers = publishers self.successlog = successlog self.thankslog = thankslog + self.mqtt_publisher = mqtt_publisher def on_nicknameinuse(self, c, e): self.log.debug("Nickname in use, releasing") @@ -375,9 +441,11 @@ class StatusBot(irc.bot.SingleServerIRCBot): msg = e.arguments[0][1:] # Unprivileged commands if msg.startswith('#success'): + self.publish_mqtt('success', e.target, nick, msg) self.handle_success_command(e.target, nick, msg) return if msg.startswith('#thanks'): + self.publish_mqtt('thanks', e.target, nick, msg) self.handle_thanks_command(e.target, nick, msg) return # Privileged commands @@ -391,10 +459,24 @@ class StatusBot(irc.bot.SingleServerIRCBot): self.log.debug("Ignoring message from untrusted user %s" % nick) return try: + self.publish_mqtt('status', e.target, nick, msg) self.handle_status_command(e.target, nick, msg) except Exception: self.log.exception("Exception handling command %s" % msg) + def publish_mqtt(self, command, channel, nick, msg): + if command != 'status': + topic = '/'.join([command, channel, nick]) + parts = msg.split() + text = ' '.join(parts[1:]) + else: + parts = msg.split() + status_command = parts[1].lower() + text = ' '.join(parts[2:]) + topic = '/'.join([status_command, channel, nick]) + + self.mqtt_publisher.publish_single(topic, text) + def handle_success_command(self, channel, nick, msg): parts = msg.split() text = ' '.join(parts[1:]) @@ -507,12 +589,16 @@ def _main(configpath): thankslog = ThanksPage(config) if config.has_section('twitter'): publishers.append(Tweet(config)) + mqtt_publisher = None + if config.has_section('mqtt'): + mqtt_publisher = MQTTPublisher(config) bot = StatusBot(channels, nicks, publishers, successlog, thankslog, config.get('ircbot', 'nick'), config.get('ircbot', 'pass'), config.get('ircbot', 'server'), - config.getint('ircbot', 'port')) + config.getint('ircbot', 'port'), + mqtt_publisher) bot.start()