Initial commit

This commit adds the basic framework of mqtt_statsd. It hasn't been
tested yet, so it likely doesn't work. But, the basic approach is there
and it's just a matter of figuring out where I can't type.
This commit is contained in:
Matthew Treinish 2017-04-03 16:55:47 -04:00
parent 8b9a861942
commit 5c263c402f
No known key found for this signature in database
GPG Key ID: FD12A0F214C9E177
4 changed files with 208 additions and 19 deletions

View File

@ -1,19 +1,71 @@
===============================
===========
mqtt_statsd
===============================
===========
A script to publish MQTT metrics into statsd
As the name implies this a script to publish MQTT metrics into statsd. It was
originally designed to handle metrics from the $SYS/ topics in the mosquitto
broker, but the framework is generic enough that any MQTT topic can be used.
Please fill here a long description which must be at least 3 lines wrapped on
80 cols, so that distribution package maintainers can use it in their packages.
Note that this is a hard requirement.
Installation
------------
* Free software: Apache license
* Documentation: http://docs.openstack.org/developer/mqtt_statsd
* Source: http://git.openstack.org/cgit/openstack-infra/mqtt_statsd
* Bugs: http://bugs.launchpad.net/mqtt_statsd
mqtt_statsd is available via pypi, so all you need to do is run::
Features
--------
pip install -U mqtt_statsd
* TODO
to get mqtt_statsd on your system. If you need to use a development version of
mqtt_statsd you can clone the repo and install it locally with::
git clone https://github.com/mtreinish/mqtt_statsd.git && pip install -e mqtt_statsd
which will install mqtt_statsd in your python environment in editable mode for
local development.
Configuring mqtt_statsd
-----------------------
Before you run mqtt_statsd you have to create a yaml configuration file to tell
mqtt_statsd how to connect to both the MQTT broker, and statsd. As well as which
MQTT topics to subscribe to and how to populate statsd with the data it gets
from that MQTT topic. For example::
statsd:
hostname: localhost
# port is optional, the default is shown
port: 8125
# prefix is optional, the default is shown
prefix: mosquitto.stats
mqtt:
hostname: localhost
# port is optional, the default is shown
port: 1883
# keepalive is optional, the default is shown
keepalive: 60
# username is optional, there is no default
username: foo
# password is optional, there is no default. If username isn't set this
# is ignored
password: PASS
# qos is optional, the default is shown
qos: 0
# websocket is optional, it defaults to False
websocket: True
topics:
# You can specify as many topics as you want, and mqtt_statsd will
# listen to all of them
- $SYS/broker/messages/publish/sent:
statsd_topic: publish_messages_sent
# statsd_type is optional, the default is shown. Valid options are
# gague, counter, and timer
statsd_type: gauge
- $SYS/broker/clients/connected:
statsd_topic: connected_clients
Running mqtt_statsd
-------------------
Aftering installing and configuring mqtt_statsd running it is incredibly
straightforward. Just call ``mqtt_statsd`` and it takes 1 mandatory argument,
the path to the yaml config file. For example::
mqtt_statsd my_config_file.yaml

131
mqtt_statsd/daemon.py Normal file
View File

@ -0,0 +1,131 @@
import signal
import sys
import threading
import paho.mqtt.client as mqtt
import statsd
import yaml
class MQTTStat(threading.Thread):
def __init__(self, hostname, topic, statsd_topic, statsd_type,
statsd_client, port=1883, websocket=False, client_id=None,
keepalive=60, will=None, auth=None, tls=None, qos=0):
self.hostname = hostname
self.port = port
self.client_id = client_id
self.keepalive = keepalive
self.will = will
self.auth = auth
self.tls = tls
self.qos = qos
transport = "tcp"
if websocket:
transport = "websocket"
self.statsd_client = statsd_client
self.statsd_topic = statsd_topic
self.statsd_method = statsd_type
def on_message(client, userdata, msg):
if statsd_type == 'gauge':
statsd_client.gauge(statsd_topic, msg.payload)
elif statsd_type == 'timer':
statsd_client.timer(statsd_topic, msg.payload)
elif statsd_type == 'counter':
statsd_client.incr(statsd_topic)
self.client = mqtt.Client(client=self.client_id, transport=transport)
if tls:
self.client.tls_set(**tls)
if auth:
self.client.username_pw_set(auth['username'],
password=auth.get('password'))
self.client.on_message = on_message
self.client.connect(self.hostname, self.port, self.keepalive)
self.client.subscribe(topic)
def run(self):
self.client.loop_forever()
def main():
conf = None
with open(sys.argv[1], 'r') as conf_file:
conf = yaml.load(conf_file.read())
if not conf:
print('Unable to read yaml config file %s' % str(sys.argv[1]))
sys.exit(1)
# Read statsd config
if 'statsd' not in conf:
print('No statsd section found in specified config file')
sys.exit(2)
statsd_host = conf['statsd'].get('hostname')
if not statsd_host:
print('No valid statsd hostname provided in config file')
sys.exit(2)
statsd_port = conf['statsd'].get('port', 8125)
if not statsd_port:
print('No valid statsd port provided in config file')
statsd_prefix = conf['statsd'].get('prefix', 'mosquitto.stats')
statsd_client = statsd.StatsClient(host=statsd_host, port=statsd_port,
prefix=statsd_prefix)
# Read MQTT config
if 'mqtt' not in conf:
print('No MQTT section found in the specified config file')
sys.exit(2)
mqtt_hostname = conf['mqtt'].get('hostname')
if not mqtt_hostname:
print('No valid mqtt hostname provided in the config file')
sys.exit(2)
mqtt_port = conf['mqtt'].get('port', 1883)
mqtt_keepalive = conf['mqtt'].get('keepalive', 60)
# Configure MQTT auth
auth = None
username = conf['mqtt'].get('username')
if username:
auth = {'username': username}
password = conf['mqtt'].get('password')
if password and auth:
auth['password'] = password
# Max QOS
mqtt_qos = conf['mqtt'].get('qos', 0)
# Use websockets
websocket = conf['mqtt'].get('websocket', False)
# TLS configuration
ca_certs = conf['mqtt'].get('ca_certs')
certfile = conf['mqtt'].get('certfile')
keyfile = conf['mqtt'].get('keyfile')
tls = None
if ca_certs is not None:
tls = {'ca_certs': ca_certs, 'certfile': certfile,
'keyfile': keyfile}
# Listen to topics and start statsd reporters
if 'topics' not in conf:
print('No topics specified in the config file')
sys.exit(2)
for topic in conf['topics']:
statsd_topic = conf['topics'][topic].get('statsd_topic')
if not statsd_topic:
print('No statsd topic specified for mqtt topic %s' % topic)
sys.exit(3)
statsd_type = conf['topics'][topic].get('statsd_type', 'gague')
if statsd_type not in ['gauge', 'counter', 'timer']:
print('statsd_type %s on topic %s is not a valid type' % (
statsd_type, topic))
thread = MQTTStat(mqtt_hostname, topic, statsd_topic, statsd_type,
mqtt_port, websocket=websocket, auth=auth,
tls=tls, keepalive=mqtt_keepalive, qos=mqtt_qos)
thread.start()
while True:
signal.pause()
if __name__ == "__main__":
main()

View File

@ -3,3 +3,6 @@
# process, which may cause wedges in the gate later.
pbr>=1.8 # Apache-2.0
statsd>=3.2.1 # MIT
PyYAML>=3.10.0 # MIT
paho-mqtt>=1.1

View File

@ -3,26 +3,29 @@ name = mqtt_statsd
summary = A script to publish MQTT metrics into statsd
description-file =
README.rst
author = OpenStack
author-email = openstack-dev@lists.openstack.org
author = Matthew Treinish
author-email = mtreinish@kortar.org
home-page = http://www.openstack.org/
classifier =
Environment :: OpenStack
Intended Audience :: Information Technology
Intended Audience :: System Administrators
License :: OSI Approved :: Apache Software License
Operating System :: POSIX :: Linux
Programming Language :: Python
Programming Language :: Python :: 2
Programming Language :: Python :: 2.7
Programming Language :: Python :: 3
Programming Language :: Python :: 3.3
Programming Language :: Python :: 3.4
Programming Language :: Python :: 3.5
Programming Language :: Python :: 3.6
[files]
packages =
mqtt_statsd
[entry_points]
console_scripts =
mqtt_statsd = mqtt_statsd.daemon:main
[build_sphinx]
source-dir = doc/source
build-dir = doc/build