From 3b49bd5aa98df4b24970e8cec587b9408602a76c Mon Sep 17 00:00:00 2001 From: Matthew Treinish Date: Mon, 3 Apr 2017 22:09:49 -0400 Subject: [PATCH] Fix MQTT subscriptions There were a few typos and bugs in the MQTT subscription code. This fixes those so MQTT actually connects and subscribes properly. --- mqtt_statsd/daemon.py | 49 +++++++++++++++++++++++++------------------ 1 file changed, 29 insertions(+), 20 deletions(-) diff --git a/mqtt_statsd/daemon.py b/mqtt_statsd/daemon.py index c6ee298..5e4309e 100644 --- a/mqtt_statsd/daemon.py +++ b/mqtt_statsd/daemon.py @@ -26,10 +26,12 @@ class MQTTStat(threading.Thread): def __init__(self, hostname, topic, statsd_topic, statsd_type, statsd_client, port=1883, websocket=False, client_id=None, keepalive=60, will=None, auth=None, tls=None, qos=0): + super(MQTTStat, self).__init__() self.hostname = hostname self.port = port self.client_id = client_id self.keepalive = keepalive + self.mqtt_topic = topic self.will = will self.auth = auth self.tls = tls @@ -39,27 +41,30 @@ class MQTTStat(threading.Thread): transport = "websocket" self.statsd_client = statsd_client self.statsd_topic = statsd_topic - self.statsd_method = statsd_type - - def on_message(client, userdata, msg): - if statsd_type == 'gauge': - statsd_client.gauge(statsd_topic, msg.payload) - elif statsd_type == 'timer': - statsd_client.timer(statsd_topic, msg.payload) - elif statsd_type == 'counter': - statsd_client.incr(statsd_topic) - - self.client = mqtt.Client(client=self.client_id, transport=transport) + self.statsd_type = statsd_type + self.client = mqtt.Client(transport=transport) if tls: self.client.tls_set(**tls) if auth: self.client.username_pw_set(auth['username'], password=auth.get('password')) - self.client.on_message = on_message - self.client.connect(self.hostname, self.port, self.keepalive) - self.client.subscribe(topic) + def run(self): + def on_connect(client, userdata, flags, rc): + client.subscribe(self.mqtt_topic) + + + def on_message(client, userdata, msg): + if self.statsd_type == 'gauge': + self.statsd_client.gauge(self.statsd_topic, msg.payload) + elif self.statsd_type == 'timer': + self.statsd_client.timer(self.statsd_topic, msg.payload) + elif self.statsd_type == 'counter': + self.statsd_client.incr(self.statsd_topic) + self.client.on_connect = on_connect + self.client.on_message = on_message + self.client.connect(self.hostname, self.port) self.client.loop_forever() @@ -123,15 +128,19 @@ def main(): sys.exit(2) for topic in conf['topics']: - statsd_topic = conf['topics'][topic].get('statsd_topic') - if not statsd_topic: - print('No statsd topic specified for mqtt topic %s' % topic) + mqtt_topic = topic.get('mqtt_topic') + if not mqtt_topic: + print("No mqtt_topic specified for an entry in topics list") sys.exit(3) - statsd_type = conf['topics'][topic].get('statsd_type', 'gague') + statsd_topic = topic.get('statsd_topic') + if not statsd_topic: + print('No statsd topic specified for mqtt topic %s' % mqtt_topic) + sys.exit(3) + statsd_type = topic.get('statsd_type', 'gauge') if statsd_type not in ['gauge', 'counter', 'timer']: print('statsd_type %s on topic %s is not a valid type' % ( - statsd_type, topic)) - thread = MQTTStat(mqtt_hostname, topic, statsd_topic, statsd_type, + statsd_type, mqtt_topic)) + thread = MQTTStat(mqtt_hostname, mqtt_topic, statsd_topic, statsd_type, statsd_client, mqtt_port, websocket=websocket, auth=auth, tls=tls, keepalive=mqtt_keepalive, qos=mqtt_qos)