Add optional MQTT notification support
This commit adds mqtt support to statusbot. With this when statusbot is configured correctly it will be able to emit events to an MQTT broker for each command it receives. The intent here is to integrate this into firehose.openstack.org[1] [1] https://docs.openstack.org/infra/system-config/firehose.html Change-Id: Iace1cd52643a95b019287acfa1b17621251a52ee
This commit is contained in:
parent
7b3a04a575
commit
67b7d951e2
|
@ -60,6 +60,7 @@ import re
|
|||
import ssl
|
||||
import twitter
|
||||
import urllib
|
||||
from paho.mqtt import publish
|
||||
|
||||
try:
|
||||
import daemon.pidlockfile as pid_file_module
|
||||
|
@ -76,6 +77,70 @@ irc.client.ServerConnection.buffer_class.errors = 'replace'
|
|||
ANTI_FLOOD_SLEEP = 2
|
||||
|
||||
|
||||
class MQTTPublisher(object):
|
||||
def __init__(self, config):
|
||||
self.hostname = config.get('mqtt', 'hostname')
|
||||
if config.has_option('mqtt', 'port'):
|
||||
self.port = config.get('mqtt', 'port')
|
||||
else:
|
||||
self.port = 1883
|
||||
if config.has_option('mqtt', 'keepalive'):
|
||||
self.keepalive = config.get('mqtt', 'keepalive')
|
||||
else:
|
||||
self.keepalive = 60
|
||||
if config.has_option('mqtt', 'basetopic'):
|
||||
self.basetopic = config.get('mqtt', 'basetopic')
|
||||
else:
|
||||
self.basetopic = 'statusbot'
|
||||
|
||||
# Configure auth
|
||||
self.auth = None
|
||||
if config.has_option('mqtt', 'username'):
|
||||
mqtt_username = config.get('mqtt', 'username')
|
||||
else:
|
||||
mqtt_username = None
|
||||
if config.has_option('mqtt', 'password'):
|
||||
mqtt_password = config.get('mqtt', 'password')
|
||||
else:
|
||||
mqtt_password = None
|
||||
if mqtt_username:
|
||||
self.auth = {'username': mqtt_username}
|
||||
if mqtt_password:
|
||||
self.auth['password'] = mqtt_password
|
||||
|
||||
# QOS setting
|
||||
if config.has_option('mqtt', 'qos'):
|
||||
self.qos = config.getint('mqtt', 'qos')
|
||||
else:
|
||||
self.qos = 0
|
||||
# TLS config
|
||||
certfile = None
|
||||
if config.has_option('mqtt', 'certfile'):
|
||||
certfile = config.get('mqtt', 'certfile')
|
||||
keyfile = None
|
||||
if config.has_option('mqtt', 'keyfile'):
|
||||
keyfile = config.get('mqtt', 'keyfile')
|
||||
self.tls = None
|
||||
if config.has_option('mqtt', 'ca_certs'):
|
||||
self.tls = {'ca_certs': config.get('mqtt', 'ca_certs'),
|
||||
'certfile': certfile,
|
||||
'keyfile': keyfile}
|
||||
|
||||
def publish_single(self, topic, msg):
|
||||
topic = self.basetopic + '/' + topic
|
||||
publish.single(topic, msg, hostname=self.hostname,
|
||||
port=self.port, client_id=self.client_id,
|
||||
keepalive=self.keepalive, will=self.will,
|
||||
auth=self.auth, tls=self.tls, qos=self.qos)
|
||||
|
||||
def publish_multiple(self, topic, msg):
|
||||
topic = self.basetopic + '/' + topic
|
||||
publish.multiple(topic, msg, hostname=self.hostname,
|
||||
port=self.port, client_id=self.client_id,
|
||||
keepalive=self.keepalive, will=self.will,
|
||||
auth=self.auth, tls=self.tls, qos=self.qos)
|
||||
|
||||
|
||||
class WikiPage(object):
|
||||
def __init__(self, config):
|
||||
self.url = config.get('wiki', 'url')
|
||||
|
@ -316,7 +381,7 @@ class StatusBot(irc.bot.SingleServerIRCBot):
|
|||
log = logging.getLogger("statusbot.bot")
|
||||
|
||||
def __init__(self, channels, nicks, publishers, successlog, thankslog,
|
||||
nickname, password, server, port=6667):
|
||||
nickname, password, server, port=6667, mqtt_publisher=None):
|
||||
if port == 6697:
|
||||
factory = irc.connection.Factory(wrapper=ssl.wrap_socket)
|
||||
irc.bot.SingleServerIRCBot.__init__(self,
|
||||
|
@ -337,6 +402,7 @@ class StatusBot(irc.bot.SingleServerIRCBot):
|
|||
self.publishers = publishers
|
||||
self.successlog = successlog
|
||||
self.thankslog = thankslog
|
||||
self.mqtt_publisher = mqtt_publisher
|
||||
|
||||
def on_nicknameinuse(self, c, e):
|
||||
self.log.debug("Nickname in use, releasing")
|
||||
|
@ -375,9 +441,11 @@ class StatusBot(irc.bot.SingleServerIRCBot):
|
|||
msg = e.arguments[0][1:]
|
||||
# Unprivileged commands
|
||||
if msg.startswith('#success'):
|
||||
self.publish_mqtt('success', e.target, nick, msg)
|
||||
self.handle_success_command(e.target, nick, msg)
|
||||
return
|
||||
if msg.startswith('#thanks'):
|
||||
self.publish_mqtt('thanks', e.target, nick, msg)
|
||||
self.handle_thanks_command(e.target, nick, msg)
|
||||
return
|
||||
# Privileged commands
|
||||
|
@ -391,10 +459,26 @@ class StatusBot(irc.bot.SingleServerIRCBot):
|
|||
self.log.debug("Ignoring message from untrusted user %s" % nick)
|
||||
return
|
||||
try:
|
||||
self.publish_mqtt('status', e.target, nick, msg)
|
||||
self.handle_status_command(e.target, nick, msg)
|
||||
except Exception:
|
||||
self.log.exception("Exception handling command %s" % msg)
|
||||
|
||||
def publish_mqtt(self, command, channel, nick, msg):
|
||||
if not self.mqtt_publisher:
|
||||
return
|
||||
if command != 'status':
|
||||
topic = '/'.join([command, channel, nick])
|
||||
parts = msg.split()
|
||||
text = ' '.join(parts[1:])
|
||||
else:
|
||||
parts = msg.split()
|
||||
status_command = parts[1].lower()
|
||||
text = ' '.join(parts[2:])
|
||||
topic = '/'.join([status_command, channel, nick])
|
||||
|
||||
self.mqtt_publisher.publish_single(topic, text)
|
||||
|
||||
def handle_success_command(self, channel, nick, msg):
|
||||
parts = msg.split()
|
||||
text = ' '.join(parts[1:])
|
||||
|
@ -507,12 +591,16 @@ def _main(configpath):
|
|||
thankslog = ThanksPage(config)
|
||||
if config.has_section('twitter'):
|
||||
publishers.append(Tweet(config))
|
||||
mqtt_publisher = None
|
||||
if config.has_section('mqtt'):
|
||||
mqtt_publisher = MQTTPublisher(config)
|
||||
|
||||
bot = StatusBot(channels, nicks, publishers, successlog, thankslog,
|
||||
config.get('ircbot', 'nick'),
|
||||
config.get('ircbot', 'pass'),
|
||||
config.get('ircbot', 'server'),
|
||||
config.getint('ircbot', 'port'))
|
||||
config.getint('ircbot', 'port'),
|
||||
mqtt_publisher)
|
||||
bot.start()
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue