Use Flask blueprint.

Create setup.py (pip install kwapi).
Config filename as argument.
Ability to hit "control-c" to quit.
Multiple improvements (bug fixes, best code organization).
This commit is contained in:
François Rossigneux 2012-11-16 14:36:16 +01:00
parent b44f36b15c
commit 53d14a439b
22 changed files with 478 additions and 244 deletions

47
api.py
View File

@ -1,47 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from flask import Flask, redirect, url_for, jsonify, abort
import collector
import config
app = Flask(__name__)
@app.route('/')
def index():
return redirect(url_for('v1'))
@app.route('/v1/')
def v1():
return 'Welcome to Kwapi!'
@app.route('/v1/probes/')
def v1_probe_list():
return jsonify(probes=database.keys())
@app.route('/v1/probes/<probe>/')
def v1_probe_info(probe):
try:
result = {probe: database[probe]}
except KeyError:
abort(404)
return jsonify(result)
@app.route('/v1/probes/<probe>/<meter>/')
def v1_probe_value(probe, meter):
try:
result = {meter: database[probe][meter]}
except KeyError:
abort(404)
return jsonify(result)
if __name__ == '__main__':
config = config.get_config('kwapi.conf', 'configspec.ini')
if config is None:
sys.exit(1)
collector = collector.Collector()
collector.clean(config['collector_cleaning_interval'], periodic=True)
collector.start_listen(config['socket'])
database = collector.database
app.run(debug=True)

18
bin/kwapi-api Executable file
View File

@ -0,0 +1,18 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys
import logging
from kwapi import config
from kwapi import log
from kwapi.api import app
if __name__ == '__main__':
if config.CONF is None:
sys.exit(1)
log.setup(config.CONF['api_log'], logging.WARNING, logging.DEBUG)
root = app.make_app(enable_acl=False)
root.run(host='0.0.0.0', port=config.CONF['api_port'])

25
bin/kwapi-drivers Executable file
View File

@ -0,0 +1,25 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys
import logging
import signal
from kwapi import config
from kwapi import log
from kwapi.drivers import driver_manager
if __name__ == "__main__":
if config.CONF is None:
sys.exit(1)
log.setup(config.CONF['drivers_log'], logging.WARNING, logging.DEBUG)
driver_manager.load_all_drivers()
driver_manager.check_drivers_alive(config.CONF['check_drivers_interval'])
signal.signal(signal.SIGTERM, driver_manager.signal_handler)
try:
signal.pause()
except KeyboardInterrupt:
driver_manager.terminate()

View File

@ -1,25 +0,0 @@
import logging
import sys
from configobj import ConfigObj, Section, flatten_errors
from validate import Validator, ValidateError
import drivers
def driver_check(class_name):
try:
getattr(sys.modules['drivers'], class_name)
except AttributeError:
raise ValidateError("%s doesn't exist." % class_name)
return class_name
def get_config(config_file, configspec_file):
config = ConfigObj(config_file, configspec=configspec_file)
validator = Validator({'driver': driver_check})
results = config.validate(validator)
if results != True:
for(section_list, key, _) in flatten_errors(config, results):
if key is not None:
logging.critical('The "%s" key in the section "%s" failed validation.' % (key, ', '.join(section_list)))
else:
logging.critical('The following section was missing:%s.' % ', '.join(section_list))
else:
return config

View File

@ -1,8 +0,0 @@
collector_socket = string
check_probes_interval = integer
collector_cleaning_interval = integer
socket = string
[__many__]
probes = string_list
driver = driver

View File

@ -1,26 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from threading import Thread
class Driver(Thread):
def __init__(self, probe_ids, kwargs):
Thread.__init__(self)
self.probe_ids = probe_ids
self.kwargs = kwargs
self.probe_observers = []
self.terminate = False
def run(self):
raise NotImplementedError
def update_value(self, probe_id, value):
for notify_new_value in self.probe_observers :
notify_new_value(probe_id, value)
def subscribe(self, observer):
self.probe_observers.append(observer)
def stop(self):
self.terminate = True

38
etc/kwapi/kwapi.conf Normal file
View File

@ -0,0 +1,38 @@
# Kwapi config file
# Communication
api_port = 5000
collector_socket = /tmp/kwapi-collector
# Timers
collector_cleaning_interval = 300
check_drivers_interval = 60
# Log files
api_log = /tmp/kwapi-api.log
drivers_log = /tmp/kwapi-drivers.log
# Wattmeter sections
#
# [wattmeter_name]
# probes = list_of_probes
# driver = driver_class
# [[parameters]]
# arg = value
[Wattmeter 1]
probes = A,
driver = Wattsup
[[parameters]]
device = /dev/ttyUSB0
[Wattmeter 2]
probes = B,
driver = Dummy
[[parameters]]
min = 10
max = 20
[Wattmeter 3]
probes = C, D
driver = Dummy

View File

@ -1,23 +0,0 @@
# Kwapi config file
collector_socket = /tmp/kwapi-collector
check_probes_interval = 4
collector_cleaning_interval = 86400
socket = /tmp/kwapi-collector
[Wattmeter 1]
probes = A,
driver = Wattsup
[[parameters]]
device = /dev/ttyUSB0
[Wattmeter 2]
probes = B,
driver = Dummy
[[parameters]]
min = 10
max = 20
[Wattmeter 3]
probes = C, D
driver = Dummy

0
kwapi/__init__.py Normal file
View File

0
kwapi/api/__init__.py Normal file
View File

32
kwapi/api/app.py Normal file
View File

@ -0,0 +1,32 @@
# -*- coding: utf-8 -*-
"""Set up the API server application instance."""
import logging
import flask
import flask.helpers
from kwapi import config
from collector import Collector
import v1
#import acl
def make_app(enable_acl=True):
"""Instantiates Flask app, attaches collector database, installs acl."""
logging.info('Starting API')
app = flask.Flask('kwapi.api')
app.register_blueprint(v1.blueprint, url_prefix='/v1')
collector = Collector(config.CONF['collector_socket'])
collector.clean(config.CONF['collector_cleaning_interval'], periodic=True)
@app.before_request
def attach_config():
flask.request.database = collector.database
# Install the middleware wrapper
if enable_acl:
return acl.install(app, cfg.CONF)
return app

41
collector.py → kwapi/api/collector.py Executable file → Normal file
View File

@ -1,15 +1,16 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import logging
import time
import socket
import os, os.path
import socket
import threading
import time
class Record(dict):
"""Contains fields (timestamp, kwh, w) and a method to update consumption."""
def __init__(self, timestamp, kwh, watts):
"""Initializes fields with the given arguments."""
dict.__init__(self)
self._dict = {}
self['timestamp'] = timestamp
@ -17,17 +18,25 @@ class Record(dict):
self['w'] = watts
def add(self, watts):
"""Updates fields with consumption data."""
currentTime = time.time()
self['kwh'] += (currentTime - self['timestamp']) / 3600.0 * (watts / 1000.0)
self['w'] = watts
self['timestamp'] = currentTime
class Collector:
"""Collector gradually fills its database with received values from wattmeter drivers."""
def __init__(self):
def __init__(self, socket_name):
"""Initializes an empty database and start listening the socket."""
logging.info('Starting Collector')
self.database = {}
thread = threading.Thread(target=self.listen, args=[socket_name])
thread.daemon = True
thread.start()
def add(self, probe, watts):
"""Creates (or update) consumption data for this probe."""
if probe in self.database:
self.database[probe].add(watts)
else:
@ -35,6 +44,7 @@ class Collector:
self.database[probe] = record
def remove(self, probe):
"""Removes this probe from database."""
if probe in self.database:
del self.database[probe]
return True
@ -42,9 +52,15 @@ class Collector:
return False
def clean(self, timeout, periodic):
"""Removes probes from database if they didn't send new values over the last timeout period (seconds).
If periodic, this method is executed automatically after the timeout interval.
"""
logging.info('Cleaning collector')
# Cleaning
for probe in self.database.keys():
if time.time() - self.database[probe].timestamp > timeout:
if time.time() - self.database[probe]['timestamp'] > timeout:
logging.info('Removing data of probe %s' % probe)
self.remove(probe)
# Cancel next execution of this function
@ -55,10 +71,17 @@ class Collector:
# Schedule periodic execution of this function
if periodic:
self.timer = threading.Timer(timeout, self.clean, [timeout])
self.timer = threading.Timer(timeout, self.clean, [timeout, True])
self.timer.daemon = True
self.timer.start()
def listen(self, socket_name):
"""Listen the socket, and add received values to the database.
Datagram format is "probe:value".
"""
logging.info('Collector listenig to %s' % socket_name)
if os.path.exists(socket_name):
os.remove(socket_name)
@ -68,7 +91,7 @@ class Collector:
while True:
datagram = server.recv(1024)
if not datagram:
print 'Error: not datagram'
logging.error('Received data are not datagram')
break
else:
data = datagram.split(':')
@ -81,7 +104,3 @@ class Collector:
logging.error('Malformed datagram: %s' % datagram)
server.close()
os.remove(socket_name)
def start_listen(self, socket_name):
thread = threading.Thread(target=self.listen, args=[socket_name])
thread.start()

36
kwapi/api/v1.py Normal file
View File

@ -0,0 +1,36 @@
# -*- coding: utf-8 -*-
"""This blueprint defines all URLs and answers."""
import flask
import flask.helpers
blueprint = flask.Blueprint('v1', __name__)
@blueprint.route('/')
def welcome():
"""Returns detailed information about this specific version of the API."""
return 'Welcome to Kwapi!'
@blueprint.route('/probes/')
def list_probes():
"""Returns a list of all the known probes."""
return flask.jsonify(probes=flask.request.database.keys())
@blueprint.route('/probes/<probe>/')
def probe_info(probe):
"""Returns all information about this probe (id, timestamp, kWh, W)."""
try:
result = {probe: flask.request.database[probe]}
except KeyError:
flask.abort(404)
return flask.jsonify(result)
@blueprint.route('/probes/<probe>/<meter>/')
def probe_value(probe, meter):
"""Returns the probe meter value."""
try:
result = {meter: flask.request.database[probe][meter]}
except KeyError:
flask.abort(404)
return flask.jsonify(result)

71
kwapi/config.py Normal file
View File

@ -0,0 +1,71 @@
# -*- coding: utf-8 -*-
"""Kwapi configuration."""
from optparse import OptionParser
import os
import sys
from configobj import ConfigObj, Section, flatten_errors
from validate import Validator, ValidateError
import drivers
def driver_exists(class_name):
"""Checks if class_name is a real Python class."""
try:
getattr(sys.modules['kwapi.drivers'], class_name)
except AttributeError:
raise ValidateError("%s doesn't exist." % class_name)
return class_name
def get_drivers():
"""Returns a generator over all drivers (class_name, probe_ids, kwargs)."""
for key in CONF.keys():
if isinstance(CONF[key], Section):
class_name = CONF[key]['driver']
probe_ids = CONF[key]['probes']
try:
kwargs = CONF[key]['parameters']
except KeyError:
kwargs = {}
yield class_name, probe_ids, kwargs
def get_config(config_file):
"""Validates config_file and returns a ConfigObj (dictionary-like) if success."""
spec = cfg.split("\n")
try:
config = ConfigObj(infile=config_file, configspec=spec, file_error=True)
except IOError:
print 'Error: %s not found' % config_file
else:
validator = Validator({'driver': driver_exists})
results = config.validate(validator)
if results != True:
for(section_list, key, _) in flatten_errors(config, results):
if key is not None:
print 'Error: the "%s" key in the section "%s" failed validation' % (key, ', '.join(section_list))
else:
print 'Error: the following section was missing:%s' % ', '.join(section_list)
else:
return config
# Config file format specifications
cfg = """
api_log = string
api_port = integer
collector_socket = string
check_drivers_interval = integer
collector_cleaning_interval = integer
drivers_log = string
[__many__]
probes = string_list
driver = driver
"""
parser = OptionParser()
parser.add_option('-f', metavar="FILE", default='/etc/kwapi/kwapi.conf', help='configuration file')
(options, args) = parser.parse_args()
CONF = get_config(options.f)

43
kwapi/drivers/driver.py Normal file
View File

@ -0,0 +1,43 @@
# -*- coding: utf-8 -*-
import logging
from threading import Thread, Event
class Driver(Thread):
"""Generic driver class, derived from Thread."""
def __init__(self, probe_ids, kwargs):
"""Initializes driver."""
logging.info('Loading driver %s(probe_ids=%s, kwargs=%s)' % (self.__class__.__name__, probe_ids, kwargs))
Thread.__init__(self)
self.probe_ids = probe_ids
self.kwargs = kwargs
self.probe_observers = []
self.stop_request = Event()
def run(self):
"""Run the driver thread. Needs to be implemented in a derived class."""
raise NotImplementedError
def join(self):
"""Asks the driver thread to terminate."""
self.stop_request.set()
super(Driver, self).join()
def stop_request_pending(self):
"""Returns true if a stop request is pending."""
return self.stop_request.is_set()
def update_value(self, probe_id, value):
"""Calls the callback method of all observers, with the following arguments: probe_id, value."""
for notify_new_value in self.probe_observers :
notify_new_value(probe_id, value)
def subscribe(self, observer):
"""Appends the observer (callback method) to the observers list."""
self.probe_observers.append(observer)
def stop(self):
"""Asks the probe to terminate."""
self.terminate = True

View File

@ -0,0 +1,86 @@
# -*- coding: utf-8 -*-
"""Loads probe threads, and transmits their values to the collector."""
import logging
import os
import socket
import sys
import signal
from subprocess import call
import thread
import threading
from configobj import Section
from kwapi import config
threads = []
def load_all_drivers():
"""Loads all drivers from config."""
for entry in config.CONF.values():
if isinstance(entry, Section):
class_name = entry['driver']
probe_ids = entry['probes']
kwargs = {}
if 'parameters' in entry.keys():
kwargs = entry['parameters']
thread = load_driver(class_name, probe_ids, kwargs)
if thread is not None:
threads.append(thread)
def load_driver(class_name, probe_ids, kwargs):
"""Starts a probe thread."""
try:
probeClass = getattr(sys.modules['kwapi.drivers'], class_name)
except AttributeError:
raise NameError("%s doesn't exist." % class_name)
try:
probeObject = probeClass(probe_ids, **kwargs)
except Exception as exception:
logging.error('Exception occurred while initializing %s(%s, %s): %s' % (class_name, probe_ids, kwargs, exception))
else:
probeObject.subscribe(send_value)
probeObject.start()
return probeObject
def check_drivers_alive(interval):
"""Checks all drivers and reloads those that crashed.
This method is executed automatically at the given interval.
"""
logging.info('Checks driver threads')
for index, thread in enumerate(threads):
if not thread.is_alive():
logging.warning('%s(probe_ids=%s, kwargs=%s) is crashed' % (thread.__class__.__name__, thread.probe_ids, thread.kwargs))
new_thread = load_driver(thread.__class__.__name__, thread.probe_ids, thread.kwargs)
if new_thread is not None:
threads[index] = new_thread
if interval > 0:
timer = threading.Timer(interval, check_drivers_alive, [interval])
timer.daemon = True
timer.start()
def signal_handler(signum, frame):
"""Intercepts TERM signal and properly terminates probe threads."""
if signum is signal.SIGTERM:
terminate()
def terminate():
"""Terminates driver threads"""
for thread in threads:
thread.join()
def send_value(probe_id, value):
"""Send a datagram to a socket, with the following format: "probe_id:value"."""
socket_name = config.CONF['collector_socket']
if os.path.exists(socket_name):
client = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
try:
client.connect(socket_name)
client.sendall(probe_id + ':' + str(value))
except:
logging.error('Cannot connect to %s' % socket_name)
else:
client.close()

14
drivers/dummy.py → kwapi/drivers/dummy.py Executable file → Normal file
View File

@ -1,18 +1,28 @@
# -*- coding: utf-8 -*-
from driver import Driver
from random import randrange
import time
from driver import Driver
class Dummy(Driver):
"""Dummy driver derived from Driver class. Usefull for tests."""
def __init__(self, probe_ids, **kwargs):
"""Initializes the dummy driver.
Keyword arguments:
probe_ids -- list containing the probes IDs (a wattmeter monitor sometimes several probes
kwargs -- keywords (min_value and max_value) defining the random value interval
"""
Driver.__init__(self, probe_ids, kwargs)
self.min_value = int(kwargs.get('min', 75))
self.max_value = int(kwargs.get('max', 100))
def run(self):
while not self.terminate:
"""Starts the driver thread."""
while not self.stop_request_pending():
for probe_id in self.probe_ids:
value = randrange(self.min_value, self.max_value)
self.update_value(probe_id, value)

27
drivers/wattsup.py → kwapi/drivers/wattsup.py Executable file → Normal file
View File

@ -1,12 +1,22 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from driver import Driver
import logging
import serial
from serial.serialutil import SerialException
from driver import Driver
class Wattsup(Driver):
"""Driver for Wattsup wattmeters."""
def __init__(self, probe_ids, **kwargs):
"""Initializes the Wattsup driver.
Keyword arguments:
probe_ids -- list containing the probes IDs (a wattmeter monitor sometimes several probes
kwargs -- keyword (device) defining the device to read (/dev/ttyUSB0)
"""
Driver.__init__(self, probe_ids, kwargs)
# Configure serial port
@ -22,18 +32,24 @@ class Wattsup(Driver):
# Clear memory
self.serial.write('#R,W,0;')
self.serial.read(256)
# Start external logging with interval = 1
self.serial.write('#L,W,3,E,1,1;')
self.serial.read(256)
def run(self):
while not self.terminate:
packet = self.get_packet()
"""Starts the driver thread."""
while not self.stop_request_pending():
try:
packet = self.get_packet()
except SerialException:
self.serial.close()
self.stop()
value = self.extract_watts(packet)
self.update_value(self.probe_ids[0], value)
def get_packet(self):
"""Returns the next packet sent by the wattmeter."""
packet = ''
while True:
char = self.serial.read(1)
@ -44,5 +60,6 @@ class Wattsup(Driver):
return packet
def extract_watts(self, packet):
"""Extracts the consumption data (watts) from the packet."""
value = float(packet.split(',')[3])/10.0
return value

24
kwapi/log.py Normal file
View File

@ -0,0 +1,24 @@
# -*- coding: utf-8 -*-
"""Kwapi logging."""
import logging
def setup(file_name, file_level, console_level):
"""Setup logging with the given parameters."""
# Create logger
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
# Create file handler
file_handler = logging.FileHandler(file_name, mode='w')
file_handler.setLevel(file_level)
# Create console handler
console_handler = logging.StreamHandler()
console_handler.setLevel(console_level)
# Create formatter and add it to the handlers
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(module)s - %(message)s', "%Y-%m-%d %H:%M:%S")
file_handler.setFormatter(formatter)
console_handler.setFormatter(formatter)
# Add the handlers to the logger
logger.addHandler(file_handler)
logger.addHandler(console_handler)

View File

@ -1,97 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import logging
import os
import socket
import sys
import signal
from subprocess import call
import thread
import threading
from configobj import Section
import config
import drivers
threads = []
def setup_logging(log_level, file_name, print_to_stdout):
logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s', filename=file_name, filemode='w', level=log_level)
if print_to_stdout:
console_handler = logging.StreamHandler()
console_handler.setLevel(log_level)
logger = logging.getLogger()
logger.addHandler(console_handler)
def load_probes_from_conf(config):
for key in config.keys():
if isinstance(config[key], Section):
class_name = config[key]['driver']
probe_ids = config[key]['probes']
kwargs = {}
if 'parameters' in config[key].keys():
kwargs = config[key]['parameters']
thread = load_probe(class_name, probe_ids, kwargs)
if thread is not None:
threads.append(thread)
def load_probe(class_name, probe_ids, kwargs):
try:
probeClass = getattr(sys.modules['drivers'], class_name)
except AttributeError:
raise NameError("%s doesn't exist." % class_name)
try:
probeObject = probeClass(probe_ids, **kwargs)
except Exception as exception:
logging.error('Probe %s constructor error: %s' % (probe_ids, exception))
return None
probeObject.subscribe(send_value)
probeObject.start()
return probeObject
def check_probes_alive(interval):
for index, thread in enumerate(threads):
if not thread.is_alive():
logging.warning('%s crashed!', thread)
threads[index] = load_probe(thread.__class__.__name__, thread.probe_ids, thread.kwargs)
if interval > 0:
timer = threading.Timer(interval, check_probes_alive, [interval])
timer.daemon = True
timer.start()
def signal_handler(signum, frame):
if signum is signal.SIGTERM:
terminate()
def terminate():
for thread in threads:
thread.stop()
for thread in threads:
thread.join()
def send_value(probe_id, value):
# TODO Do not read config everytime
socket_name = config.get_config('kwapi.conf', 'configspec.ini')['socket']
if os.path.exists(socket_name):
client = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
client.connect(socket_name)
client.send(probe_id + ':' + str(value))
client.close()
if __name__ == "__main__":
setup_logging(logging.DEBUG, 'kwapi.log', print_to_stdout=True)
config = config.get_config('kwapi.conf', 'configspec.ini')
if config is None:
sys.exit(1)
load_probes_from_conf(config)
# Check probe crashes
check_probes_alive(config['check_probes_interval'])
signal.signal(signal.SIGTERM, signal_handler)
try:
signal.pause()
except KeyboardInterrupt:
terminate()

41
setup.py Executable file
View File

@ -0,0 +1,41 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from distutils.core import setup
setup(
name='kwapi',
version='1.0',
description='Energy Efficiency Architecture',
author='François Rossigneux',
author_email='francois.rossigneux@inria.fr',
url='http://gitorious.ow2.org/xlcloud/kwapi',
classifiers=[
'Development Status :: 3 - Alpha',
'Framework :: Setuptools Plugin',
'Environment :: OpenStack',
'Intended Audience :: Information Technology',
'Intended Audience :: System Administrators',
'License :: OSI Approved :: Apache Software License',
'Operating System :: POSIX :: Linux',
'Programming Language :: Python',
'Programming Language :: Python :: 2',
'Programming Language :: Python :: 2.7',
'Topic :: System :: Monitoring',
],
packages=['kwapi', 'kwapi.api', 'kwapi.drivers'],
scripts=['bin/kwapi-api',
'bin/kwapi-drivers'],
data_files=[('/etc/kwapi', ['etc/kwapi/kwapi.conf'])],
install_requires=['flask', 'configobj', 'pyserial', 'requests']
)