Add topic hierarchy
This commit adds topic hierarchy to the published events. The topic is now set per project and event-type. This enables flexibility in what types of events you're subscribing to.
This commit is contained in:
parent
368354150b
commit
ca4033c595
|
@ -48,10 +48,9 @@ class GerritStream(object):
|
|||
|
||||
|
||||
class PushMQTT(object):
|
||||
def __init__(self, hostname, topic, port=1883, client_id=None,
|
||||
def __init__(self, hostname, port=1883, client_id=None,
|
||||
keepalive=60, will=None, auth=None, tls=None):
|
||||
self.hostname = hostname
|
||||
self.topic = topic
|
||||
self.port = port
|
||||
self.client_id = client_id
|
||||
self.keepalive = 60
|
||||
|
@ -59,14 +58,14 @@ class PushMQTT(object):
|
|||
self.auth = auth
|
||||
self.tls = tls
|
||||
|
||||
def publish_single(self, msg):
|
||||
publish.single(self.topic, msg, hostname=self.hostname,
|
||||
def publish_single(self, topic, msg):
|
||||
publish.single(topic, msg, hostname=self.hostname,
|
||||
port=self.port, client_id=self.client_id,
|
||||
keepalive=self.keepalive, will=self.will,
|
||||
auth=self.auth, tls=self.tls)
|
||||
|
||||
def publish_multiple(self, msg):
|
||||
publish.multiple(self.topic, msg, hostname=self.hostname,
|
||||
def publish_multiple(self, topic, msg):
|
||||
publish.multiple(topic, msg, hostname=self.hostname,
|
||||
port=self.port, client_id=self.client_id,
|
||||
keepalive=self.keepalive, will=self.will,
|
||||
auth=self.auth, tls=self.tls)
|
||||
|
@ -82,6 +81,13 @@ def get_options():
|
|||
parser.add_argument('conffile', nargs=1, help="Configuration file")
|
||||
return parser.parse_args()
|
||||
|
||||
def get_topic(base_topic, event):
|
||||
project = event.get('project', '')
|
||||
event_type = event.get('type', '')
|
||||
pieces = [base_topic, project, event_type]
|
||||
topic = "/".join(pieces)
|
||||
return topic
|
||||
|
||||
|
||||
def _main(args, config):
|
||||
if config.has_option('gerrit', 'port'):
|
||||
|
@ -105,14 +111,15 @@ def _main(args, config):
|
|||
|
||||
mqttqueue = PushMQTT(
|
||||
config.get('mqtt', 'hostname'),
|
||||
config.get('mqtt', 'topic'),
|
||||
port=mqtt_port,
|
||||
keepalive=keepalive)
|
||||
|
||||
base_topic = config.get('mqtt', 'topic')
|
||||
while True:
|
||||
event = gerrit.get_event()
|
||||
topic = get_topic(base_topic, event)
|
||||
if event:
|
||||
mqttqueue.publish_single(json.dumps(event))
|
||||
mqttqueue.publish_single(topic, json.dumps(event))
|
||||
|
||||
|
||||
def main():
|
||||
|
|
Loading…
Reference in New Issue