Reversing client-server roles between the Collector and the Driver Manager.
Switching from AF_UNIX socket to ZeroMQ (TCP support, multiple clients support). Renaming config option "collector_socket" to "probes_endpoint".
This commit is contained in:
parent
5733e0b029
commit
cdbb4ed636
|
@ -7,7 +7,7 @@ api_metering_secret = change this or be hacked
|
|||
|
||||
# Communication
|
||||
api_port = 5000
|
||||
collector_socket = /tmp/kwapi-collector
|
||||
probes_endpoint = ipc:///tmp/kwapi
|
||||
|
||||
# Timers
|
||||
collector_cleaning_interval = 300
|
||||
|
|
|
@ -18,7 +18,7 @@ def make_app(enable_acl):
|
|||
app = flask.Flask('kwapi.api')
|
||||
app.register_blueprint(v1.blueprint, url_prefix='/v1')
|
||||
|
||||
collector = Collector(config.CONF['collector_socket'])
|
||||
collector = Collector(config.CONF['probes_endpoint'])
|
||||
collector.clean(config.CONF['collector_cleaning_interval'], periodic=True)
|
||||
|
||||
@app.before_request
|
||||
|
|
|
@ -6,6 +6,8 @@ import socket
|
|||
import threading
|
||||
import time
|
||||
|
||||
import zmq
|
||||
|
||||
class Record(dict):
|
||||
"""Contains fields (timestamp, kwh, w) and a method to update consumption."""
|
||||
|
||||
|
@ -75,32 +77,25 @@ class Collector:
|
|||
self.timer.daemon = True
|
||||
self.timer.start()
|
||||
|
||||
def listen(self, socket_name):
|
||||
"""Listen the socket, and add received values to the database.
|
||||
Datagram format is "probe:value".
|
||||
def listen(self, endpoint):
|
||||
"""Subscribes to ZeroMQ messages, and adds received values to the database.
|
||||
Message format is "probe:value".
|
||||
|
||||
"""
|
||||
logging.info('Collector listenig to %s' % socket_name)
|
||||
logging.info('Collector listenig to %s' % endpoint)
|
||||
|
||||
context = zmq.Context()
|
||||
subscriber = context.socket(zmq.SUB)
|
||||
subscriber.setsockopt(zmq.SUBSCRIBE, '')
|
||||
subscriber.connect(endpoint)
|
||||
|
||||
if os.path.exists(socket_name):
|
||||
os.remove(socket_name)
|
||||
|
||||
server = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
|
||||
server.bind(socket_name)
|
||||
|
||||
while True:
|
||||
datagram = server.recv(1024)
|
||||
if not datagram:
|
||||
logging.error('Received data are not datagram')
|
||||
break
|
||||
message = subscriber.recv()
|
||||
data = message.split(':')
|
||||
if len(data) == 2:
|
||||
try:
|
||||
self.add(data[0], float(data[1]))
|
||||
except:
|
||||
logging.error('Message format error: %s' % message)
|
||||
else:
|
||||
data = datagram.split(':')
|
||||
if len(data) == 2:
|
||||
try:
|
||||
self.add(data[0], float(data[1]))
|
||||
except:
|
||||
logging.error('Datagram format error: %s' % datagram)
|
||||
else:
|
||||
logging.error('Malformed datagram: %s' % datagram)
|
||||
server.close()
|
||||
os.remove(socket_name)
|
||||
logging.error('Malformed message: %s' % message)
|
||||
|
|
|
@ -57,7 +57,7 @@ acl_auth_url = string
|
|||
api_log = string
|
||||
api_metering_secret = string
|
||||
api_port = integer
|
||||
collector_socket = string
|
||||
probes_endpoint = string
|
||||
check_drivers_interval = integer
|
||||
collector_cleaning_interval = integer
|
||||
drivers_log = string
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
"""Loads probe threads, and transmits their values to the collector."""
|
||||
"""Loads probe threads, and transmits their values via ZeroMQ."""
|
||||
|
||||
import logging
|
||||
import os
|
||||
|
@ -9,12 +9,17 @@ import sys
|
|||
import signal
|
||||
from subprocess import call
|
||||
import thread
|
||||
import threading
|
||||
from threading import Timer
|
||||
|
||||
from configobj import Section
|
||||
import zmq
|
||||
|
||||
from kwapi import config
|
||||
|
||||
context = zmq.Context()
|
||||
publisher = context.socket(zmq.PUB)
|
||||
publisher.bind(config.CONF['probes_endpoint'])
|
||||
|
||||
threads = []
|
||||
|
||||
def load_all_drivers():
|
||||
|
@ -58,7 +63,7 @@ def check_drivers_alive(interval):
|
|||
if new_thread is not None:
|
||||
threads[index] = new_thread
|
||||
if interval > 0:
|
||||
timer = threading.Timer(interval, check_drivers_alive, [interval])
|
||||
timer = Timer(interval, check_drivers_alive, [interval])
|
||||
timer.daemon = True
|
||||
timer.start()
|
||||
|
||||
|
@ -71,16 +76,8 @@ def terminate():
|
|||
"""Terminates driver threads"""
|
||||
for thread in threads:
|
||||
thread.join()
|
||||
publisher.close()
|
||||
|
||||
def send_value(probe_id, value):
|
||||
"""Send a datagram to a socket, with the following format: "probe_id:value"."""
|
||||
socket_name = config.CONF['collector_socket']
|
||||
if os.path.exists(socket_name):
|
||||
client = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
|
||||
try:
|
||||
client.connect(socket_name)
|
||||
client.sendall(probe_id + ':' + str(value))
|
||||
except:
|
||||
logging.error('Cannot connect to %s' % socket_name)
|
||||
else:
|
||||
client.close()
|
||||
"""Sends a message via ZeroMQ, with the following format: "probe_id:value"."""
|
||||
publisher.send(probe_id + ':' + str(value))
|
||||
|
|
Loading…
Reference in New Issue