Listening refactoring in plugins.
Change-Id: I9e7aeda9637f6139fed7039e7111c44e8e5783dc
This commit is contained in:
parent
825f5d9cce
commit
7c25fab119
@ -0,0 +1,62 @@
|
|||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
#
|
||||||
|
# Author: François Rossigneux <francois.rossigneux@inria.fr>
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
import json
|
||||||
|
|
||||||
|
from oslo.config import cfg
|
||||||
|
import zmq
|
||||||
|
|
||||||
|
from kwapi import security
|
||||||
|
from kwapi.openstack.common import log
|
||||||
|
|
||||||
|
LOG = log.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def listen(function):
|
||||||
|
"""Subscribes to ZeroMQ messages, and adds received measurements to the
|
||||||
|
database. Messages are dictionaries dumped in JSON format.
|
||||||
|
|
||||||
|
"""
|
||||||
|
LOG.info('Listening to %s' % cfg.CONF.probes_endpoint)
|
||||||
|
|
||||||
|
context = zmq.Context.instance()
|
||||||
|
subscriber = context.socket(zmq.SUB)
|
||||||
|
if not cfg.CONF.watch_probe:
|
||||||
|
subscriber.setsockopt(zmq.SUBSCRIBE, '')
|
||||||
|
else:
|
||||||
|
for probe in cfg.CONF.watch_probe:
|
||||||
|
subscriber.setsockopt(zmq.SUBSCRIBE, probe + '.')
|
||||||
|
for endpoint in cfg.CONF.probes_endpoint:
|
||||||
|
subscriber.connect(endpoint)
|
||||||
|
|
||||||
|
while True:
|
||||||
|
[probe, message] = subscriber.recv_multipart()
|
||||||
|
measurements = json.loads(message)
|
||||||
|
if not isinstance(measurements, dict):
|
||||||
|
LOG.error('Bad message type (not a dict)')
|
||||||
|
elif cfg.CONF.signature_checking and \
|
||||||
|
not security.verify_signature(measurements,
|
||||||
|
cfg.CONF.driver_metering_secret):
|
||||||
|
LOG.error('Bad message signature')
|
||||||
|
else:
|
||||||
|
try:
|
||||||
|
probe = measurements['probe_id'].encode('utf-8')
|
||||||
|
function(probe, float(measurements['w']))
|
||||||
|
except (TypeError, ValueError):
|
||||||
|
LOG.error('Malformed power consumption data: %s'
|
||||||
|
% measurements['w'])
|
||||||
|
except KeyError:
|
||||||
|
LOG.error('Malformed message (missing required key)')
|
@ -17,11 +17,13 @@
|
|||||||
"""Set up the API server application instance."""
|
"""Set up the API server application instance."""
|
||||||
|
|
||||||
import sys
|
import sys
|
||||||
|
import thread
|
||||||
|
|
||||||
import flask
|
import flask
|
||||||
from oslo.config import cfg
|
from oslo.config import cfg
|
||||||
|
|
||||||
from kwapi.openstack.common import log
|
from kwapi.openstack.common import log
|
||||||
|
from kwapi.plugins import listen
|
||||||
import acl
|
import acl
|
||||||
from collector import Collector
|
from collector import Collector
|
||||||
import v1
|
import v1
|
||||||
@ -49,6 +51,8 @@ def make_app():
|
|||||||
collector = Collector()
|
collector = Collector()
|
||||||
collector.clean()
|
collector.clean()
|
||||||
|
|
||||||
|
thread.start_new_thread(listen, (collector.add,))
|
||||||
|
|
||||||
@app.before_request
|
@app.before_request
|
||||||
def attach_config():
|
def attach_config():
|
||||||
flask.request.collector = collector
|
flask.request.collector = collector
|
||||||
|
@ -14,15 +14,12 @@
|
|||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
import json
|
|
||||||
import threading
|
import threading
|
||||||
import time
|
import time
|
||||||
|
|
||||||
from oslo.config import cfg
|
from oslo.config import cfg
|
||||||
import zmq
|
|
||||||
|
|
||||||
from kwapi.openstack.common import log
|
from kwapi.openstack.common import log
|
||||||
from kwapi import security
|
|
||||||
|
|
||||||
LOG = log.getLogger(__name__)
|
LOG = log.getLogger(__name__)
|
||||||
|
|
||||||
@ -81,9 +78,6 @@ class Collector:
|
|||||||
LOG.info('Starting Collector')
|
LOG.info('Starting Collector')
|
||||||
self.database = {}
|
self.database = {}
|
||||||
self.lock = threading.Lock()
|
self.lock = threading.Lock()
|
||||||
thread = threading.Thread(target=self.listen)
|
|
||||||
thread.daemon = True
|
|
||||||
thread.start()
|
|
||||||
|
|
||||||
def add(self, probe, watts):
|
def add(self, probe, watts):
|
||||||
"""Creates (or updates) consumption data for this probe."""
|
"""Creates (or updates) consumption data for this probe."""
|
||||||
@ -125,40 +119,3 @@ class Collector:
|
|||||||
timer = threading.Timer(cfg.CONF.cleaning_interval, self.clean)
|
timer = threading.Timer(cfg.CONF.cleaning_interval, self.clean)
|
||||||
timer.daemon = True
|
timer.daemon = True
|
||||||
timer.start()
|
timer.start()
|
||||||
|
|
||||||
def listen(self):
|
|
||||||
"""Subscribes to ZeroMQ messages, and adds received measurements to the
|
|
||||||
database. Messages are dictionaries dumped in JSON format.
|
|
||||||
|
|
||||||
"""
|
|
||||||
LOG.info('API listening to %s' % cfg.CONF.probes_endpoint)
|
|
||||||
|
|
||||||
context = zmq.Context.instance()
|
|
||||||
subscriber = context.socket(zmq.SUB)
|
|
||||||
if not cfg.CONF.watch_probe:
|
|
||||||
subscriber.setsockopt(zmq.SUBSCRIBE, '')
|
|
||||||
else:
|
|
||||||
for probe in cfg.CONF.watch_probe:
|
|
||||||
subscriber.setsockopt(zmq.SUBSCRIBE, probe + '.')
|
|
||||||
for endpoint in cfg.CONF.probes_endpoint:
|
|
||||||
subscriber.connect(endpoint)
|
|
||||||
|
|
||||||
while True:
|
|
||||||
[probe, message] = subscriber.recv_multipart()
|
|
||||||
measurements = json.loads(message)
|
|
||||||
if not isinstance(measurements, dict):
|
|
||||||
LOG.error('Bad message type (not a dict)')
|
|
||||||
elif cfg.CONF.signature_checking and \
|
|
||||||
not security.verify_signature(
|
|
||||||
measurements,
|
|
||||||
cfg.CONF.driver_metering_secret):
|
|
||||||
LOG.error('Bad message signature')
|
|
||||||
else:
|
|
||||||
try:
|
|
||||||
self.add(measurements['probe_id'],
|
|
||||||
float(measurements['w']))
|
|
||||||
except (TypeError, ValueError):
|
|
||||||
LOG.error('Malformed power consumption data: %s'
|
|
||||||
% measurements['w'])
|
|
||||||
except KeyError:
|
|
||||||
LOG.error('Malformed message (missing required key)')
|
|
||||||
|
@ -23,6 +23,7 @@ import flask
|
|||||||
from oslo.config import cfg
|
from oslo.config import cfg
|
||||||
|
|
||||||
from kwapi.openstack.common import log
|
from kwapi.openstack.common import log
|
||||||
|
from kwapi.plugins import listen
|
||||||
import rrd
|
import rrd
|
||||||
import v1
|
import v1
|
||||||
|
|
||||||
@ -43,7 +44,8 @@ def make_app():
|
|||||||
app = flask.Flask(__name__)
|
app = flask.Flask(__name__)
|
||||||
app.register_blueprint(v1.blueprint)
|
app.register_blueprint(v1.blueprint)
|
||||||
|
|
||||||
thread.start_new_thread(rrd.listen, ())
|
thread.start_new_thread(listen, (rrd.update_rrd,))
|
||||||
|
rrd.create_dirs()
|
||||||
|
|
||||||
@app.before_request
|
@app.before_request
|
||||||
def attach_config():
|
def attach_config():
|
||||||
|
@ -19,7 +19,6 @@
|
|||||||
import collections
|
import collections
|
||||||
import colorsys
|
import colorsys
|
||||||
import errno
|
import errno
|
||||||
import json
|
|
||||||
import os
|
import os
|
||||||
from threading import Lock
|
from threading import Lock
|
||||||
import struct
|
import struct
|
||||||
@ -28,10 +27,8 @@ import uuid
|
|||||||
|
|
||||||
from oslo.config import cfg
|
from oslo.config import cfg
|
||||||
import rrdtool
|
import rrdtool
|
||||||
import zmq
|
|
||||||
|
|
||||||
from kwapi.openstack.common import log
|
from kwapi.openstack.common import log
|
||||||
from kwapi import security
|
|
||||||
|
|
||||||
LOG = log.getLogger(__name__)
|
LOG = log.getLogger(__name__)
|
||||||
|
|
||||||
@ -165,6 +162,14 @@ def create_rrd_file(filename):
|
|||||||
|
|
||||||
def update_rrd(probe, watts):
|
def update_rrd(probe, watts):
|
||||||
"""Updates RRD file associated with this probe."""
|
"""Updates RRD file associated with this probe."""
|
||||||
|
if not probe in probes:
|
||||||
|
color_seq = color_generator(len(probes)+1)
|
||||||
|
lock.acquire()
|
||||||
|
probes.add(probe)
|
||||||
|
for probe in sorted(probes, reverse=True):
|
||||||
|
probe_colors[probe] = color_seq.next()
|
||||||
|
lock.release()
|
||||||
|
|
||||||
filename = cfg.CONF.rrd_dir + '/' + \
|
filename = cfg.CONF.rrd_dir + '/' + \
|
||||||
str(uuid.uuid5(uuid.NAMESPACE_DNS, str(probe))) + '.rrd'
|
str(uuid.uuid5(uuid.NAMESPACE_DNS, str(probe))) + '.rrd'
|
||||||
if not os.path.isfile(filename):
|
if not os.path.isfile(filename):
|
||||||
@ -283,50 +288,3 @@ def build_graph(scale, probe=None):
|
|||||||
else:
|
else:
|
||||||
LOG.info('Retrieve PNG summary graph from cache')
|
LOG.info('Retrieve PNG summary graph from cache')
|
||||||
return png_file
|
return png_file
|
||||||
|
|
||||||
|
|
||||||
def listen():
|
|
||||||
"""Subscribes to ZeroMQ messages, and adds received measurements to the
|
|
||||||
database. Messages are dictionaries dumped in JSON format.
|
|
||||||
|
|
||||||
"""
|
|
||||||
LOG.info('RRD listening to %s' % cfg.CONF.probes_endpoint)
|
|
||||||
|
|
||||||
create_dirs()
|
|
||||||
|
|
||||||
context = zmq.Context.instance()
|
|
||||||
subscriber = context.socket(zmq.SUB)
|
|
||||||
if not cfg.CONF.watch_probe:
|
|
||||||
subscriber.setsockopt(zmq.SUBSCRIBE, '')
|
|
||||||
else:
|
|
||||||
for probe in cfg.CONF.watch_probe:
|
|
||||||
subscriber.setsockopt(zmq.SUBSCRIBE, probe + '.')
|
|
||||||
for endpoint in cfg.CONF.probes_endpoint:
|
|
||||||
subscriber.connect(endpoint)
|
|
||||||
|
|
||||||
while True:
|
|
||||||
[probe, message] = subscriber.recv_multipart()
|
|
||||||
measurements = json.loads(message)
|
|
||||||
if not isinstance(measurements, dict):
|
|
||||||
LOG.error('Bad message type (not a dict)')
|
|
||||||
elif cfg.CONF.signature_checking and \
|
|
||||||
not security.verify_signature(measurements,
|
|
||||||
cfg.CONF.driver_metering_secret):
|
|
||||||
LOG.error('Bad message signature')
|
|
||||||
else:
|
|
||||||
try:
|
|
||||||
probe = measurements['probe_id'].encode('utf-8')
|
|
||||||
update_rrd(probe, float(measurements['w']))
|
|
||||||
except (TypeError, ValueError):
|
|
||||||
LOG.error('Malformed power consumption data: %s'
|
|
||||||
% measurements['w'])
|
|
||||||
except KeyError:
|
|
||||||
LOG.error('Malformed message (missing required key)')
|
|
||||||
else:
|
|
||||||
if not probe in probes:
|
|
||||||
color_seq = color_generator(len(probes)+1)
|
|
||||||
lock.acquire()
|
|
||||||
probes.add(probe)
|
|
||||||
for probe in sorted(probes, reverse=True):
|
|
||||||
probe_colors[probe] = color_seq.next()
|
|
||||||
lock.release()
|
|
||||||
|
Loading…
Reference in New Issue
Block a user