From eb9a496c8b0897d81f35d269b5413aa3a40407da Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Rossigneux?= Date: Tue, 4 Dec 2012 14:11:11 +0100 Subject: [PATCH] Multiple measurements support (volts, amperes, watts). Drivers send dictionaries dumped in JSON format (instead of probe_id:value format). Driver method "send_value" is renamed to "send_measurements". --- kwapi/api/collector.py | 19 ++++++++++--------- kwapi/drivers/driver.py | 8 +++++--- kwapi/drivers/dummy.py | 7 +++++-- kwapi/drivers/wattsup.py | 5 +++-- 4 files changed, 23 insertions(+), 16 deletions(-) diff --git a/kwapi/api/collector.py b/kwapi/api/collector.py index 6f30483..8ce9a46 100644 --- a/kwapi/api/collector.py +++ b/kwapi/api/collector.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- +import json import threading import time @@ -90,8 +91,8 @@ class Collector: self.timer.start() def listen(self, conf): - """Subscribes to ZeroMQ messages, and adds received values to the database. - Message format is "probe:value". + """Subscribes to ZeroMQ messages, and adds received measurements to the database. + Messages are dictionaries dumped in JSON format. """ LOG.info('Collector listenig to %s' % conf.probes_endpoint) @@ -103,11 +104,11 @@ class Collector: while True: message = subscriber.recv() - data = message.split(':') - if len(data) == 2: - try: - self.add(data[0], float(data[1])) - except: - LOG.error('Message format error: %s' % message) + measurements = json.loads(message) + if not isinstance(measurements, dict): + LOG.error('Bad message type (not a dict)') else: - LOG.error('Malformed message: %s' % message) + try: + self.add(measurements['probe_id'], float(measurements['w'])) + except KeyError: + LOG.error('Malformed message (missing required key)') diff --git a/kwapi/drivers/driver.py b/kwapi/drivers/driver.py index ec6e717..90ff68e 100644 --- a/kwapi/drivers/driver.py +++ b/kwapi/drivers/driver.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- +import json from threading import Thread, Event import zmq @@ -35,9 +36,10 @@ class Driver(Thread): """Returns true if a stop request is pending.""" return self.stop_request.is_set() - def send_value(self, probe_id, value): - """Sends a message via ZeroMQ with the following format: probe_id, value.""" - self.publisher.send(probe_id + ':' + str(value)) + def send_measurements(self, probe_id, measurements): + """Sends a message via ZeroMQ (dictionary dumped in JSON format).""" + measurements['probe_id'] = probe_id + self.publisher.send(json.dumps(measurements)) def subscribe(self, observer): """Appends the observer (callback method) to the observers list.""" diff --git a/kwapi/drivers/dummy.py b/kwapi/drivers/dummy.py index 14d7f71..f65b5a4 100644 --- a/kwapi/drivers/dummy.py +++ b/kwapi/drivers/dummy.py @@ -24,6 +24,9 @@ class Dummy(Driver): """Starts the driver thread.""" while not self.stop_request_pending(): for probe_id in self.probe_ids: - value = randrange(self.min_value, self.max_value) - self.send_value(probe_id, value) + measurements = {} + measurements['w'] = randrange(self.min_value, self.max_value) + measurements['v'] = 230.0 + measurements['a'] = measurements['w'] / measurements['v'] + self.send_measurements(probe_id, measurements) time.sleep(1) diff --git a/kwapi/drivers/wattsup.py b/kwapi/drivers/wattsup.py index 6237109..18aecd9 100644 --- a/kwapi/drivers/wattsup.py +++ b/kwapi/drivers/wattsup.py @@ -44,8 +44,9 @@ class Wattsup(Driver): except SerialException: self.serial.close() self.stop() - value = self.extract_watts(packet) - self.send_value(self.probe_ids[0], value) + measurements = {} + measurements['w'] = self.extract_watts(packet) + self.send_measurements(self.probe_ids[0], measurements) def get_packet(self): """Returns the next packet sent by the wattmeter."""