#!/usr/bin/env python

# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Persister Module

The Persister reads metrics and alarms from Kafka and then stores them
in InfluxDB.

Start the persister as a stand-alone process by running 'persister.py
--config-file <config file>'

The persister can also be started using the OpenStack service framework.
"""

import abc
from datetime import datetime
import json
import os
import six
import sys
import threading
import urllib

from influxdb import InfluxDBClient
from kafka import KafkaClient
from kafka import SimpleConsumer
from oslo.config import cfg

from openstack.common import log
from openstack.common import service as os_service

import service

LOG = log.getLogger(__name__)

kafka_metrics_opts = [cfg.StrOpt('uri'),
                      cfg.StrOpt('group_id'),
                      cfg.StrOpt('topic'),
                      cfg.StrOpt('consumer_id'),
                      cfg.StrOpt('client_id'),
                      cfg.IntOpt('batch_size'),
                      cfg.IntOpt('max_wait_time_seconds')]

kafka_alarm_history_opts = [cfg.StrOpt('uri'),
                            cfg.StrOpt('group_id'),
                            cfg.StrOpt('topic'),
                            cfg.StrOpt('consumer_id'),
                            cfg.StrOpt('client_id'),
                            cfg.IntOpt('batch_size'),
                            cfg.IntOpt('max_wait_time_seconds')]

kafka_metrics_group = cfg.OptGroup(name='kafka_metrics', title='kafka_metrics')
kafka_alarm_history_group = cfg.OptGroup(name='kafka_alarm_history',
                                         title='kafka_alarm_history')

cfg.CONF.register_group(kafka_metrics_group)
cfg.CONF.register_group(kafka_alarm_history_group)
cfg.CONF.register_opts(kafka_metrics_opts, kafka_metrics_group)
cfg.CONF.register_opts(kafka_alarm_history_opts, kafka_alarm_history_group)

influxdb_opts = [cfg.StrOpt('database_name'),
                 cfg.StrOpt('ip_address'),
                 cfg.StrOpt('port'),
                 cfg.StrOpt('user'),
                 cfg.StrOpt('password')]

influxdb_group = cfg.OptGroup(name='influxdb', title='influxdb')
cfg.CONF.register_group(influxdb_group)
cfg.CONF.register_opts(influxdb_opts, influxdb_group)
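
# A minimal configuration file for the groups registered above might look
# like the following sketch. The values are illustrative assumptions, not
# defaults shipped with the persister:
#
#     [kafka_metrics]
#     uri = 192.168.10.4:9092
#     group_id = 1_metrics
#     topic = metrics
#     consumer_id = 1
#     client_id = 1
#     batch_size = 1000
#     max_wait_time_seconds = 30
#
#     [kafka_alarm_history]
#     uri = 192.168.10.4:9092
#     group_id = 1_alarm-state-transitions
#     topic = alarm-state-transitions
#     consumer_id = 1
#     client_id = 1
#     batch_size = 1
#     max_wait_time_seconds = 30
#
#     [influxdb]
#     database_name = mon
#     ip_address = 192.168.10.4
#     port = 8086
#     user = mon_persister
#     password = password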

cfg.CONF(sys.argv[1:])

log_levels = cfg.CONF.default_log_levels
cfg.set_defaults(log.log_opts, default_log_levels=log_levels)
log.setup("monasca-persister")


def main():

    metric_persister = MetricPersister(cfg.CONF.kafka_metrics,
                                       cfg.CONF.influxdb)
    alarm_persister = AlarmPersister(cfg.CONF.kafka_alarm_history,
                                     cfg.CONF.influxdb)

    metric_persister.start()
    alarm_persister.start()

    LOG.info('''

        _____
       /     \ ____   ____ _____    ______ ____ _____
      /  \ /  \ /  _ \ /    \\\__  \  /  ___// ___\\\__  \\
     /    Y    (  <_> )   |  \/ __ \_\___ \\  \___  / __ \\_
     \____|__  /\____/|___|  (____  /____  >\___  >____  /
             \/            \/     \/     \/     \/     \/
      __________                    .__          __
      \______   \ ___________  _____|__| _______/  |_  ___________
       |     ___// __ \_  __ \/  ___/  |/  ___/\   __\/ __ \_  __ \\
       |    |   \  ___/|  | \/\___ \|  |\___ \  |  |  \  ___/|  | \/
       |____|    \___  >__|  /____  >__/____  >  |__|  \___  >__|
                     \/           \/        \/             \/

    ''')

    LOG.info('Monasca Persister started successfully!')


class Persister(os_service.Service):
    """Class used with the OpenStack service framework.
    """

    def __init__(self, threads=1):
        super(Persister, self).__init__(threads)

    def start(self):
        try:
            main()
        except:
            LOG.exception('Persister encountered fatal error. Shutting down.')
            os._exit(1)


@six.add_metaclass(abc.ABCMeta)
class AbstractPersister(threading.Thread):
    def __init__(self, kafka_conf, influxdb_conf):

        super(AbstractPersister, self).__init__()

        kafka = KafkaClient(kafka_conf.uri)
        self._consumer = SimpleConsumer(kafka,
                                        kafka_conf.group_id,
                                        kafka_conf.topic,
                                        # Set to True even though the commits
                                        # are done manually below. Needed to
                                        # initialize the offsets correctly.
                                        auto_commit=True,
                                        # Set these to None so that the
                                        # manual commit in _flush() does the
                                        # actual commit. Also needed so that
                                        # offsets are initialized correctly;
                                        # otherwise a restart rereads
                                        # messages from the beginning of the
                                        # queue.
                                        auto_commit_every_n=None,
                                        auto_commit_every_t=None,
                                        iter_timeout=1)

        self._influxdb_client = InfluxDBClient(influxdb_conf.ip_address,
                                               influxdb_conf.port,
                                               influxdb_conf.user,
                                               influxdb_conf.password,
                                               influxdb_conf.database_name)

        self._max_wait_time_secs = kafka_conf.max_wait_time_seconds
        self._batch_size = kafka_conf.batch_size
        self._kafka_topic = kafka_conf.topic

        self._json_body = []
        self._last_flush = datetime.now()

    @abc.abstractmethod
    def process_message(self, message):
        """Convert a raw Kafka message into an InfluxDB point dict.

        Implementations return a dict of the form
        {"points": [...], "name": <series name>, "columns": [...]}
        suitable for InfluxDBClient.write_points().
        """
        pass

    def _flush(self):

        if self._json_body:
            self._influxdb_client.write_points(self._json_body)
            self._consumer.commit()
            LOG.info("processed {} messages from topic '{}'".format(
                len(self._json_body), self._kafka_topic))
            self._json_body = []
        self._last_flush = datetime.now()
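
    # _flush() is driven from run() below in two cases: when the accumulated
    # batch reaches batch_size, and when more than max_wait_time_seconds
    # have elapsed since the last flush.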
    def run(self):

        try:

            while True:

                delta_time = datetime.now() - self._last_flush
                # Use total_seconds() rather than the seconds attribute,
                # which wraps around at one day.
                if delta_time.total_seconds() > self._max_wait_time_secs:
                    self._flush()

                for message in self._consumer:
                    try:
                        self._json_body.append(self.process_message(message))
                    except Exception:
                        LOG.exception('Error processing message. Message is '
                                      'being dropped. {}'.format(message))
                    if len(self._json_body) >= self._batch_size:
                        self._flush()

        except:
            LOG.exception(
                'Persister encountered fatal exception processing messages. '
                'Shutting down all threads and exiting')
            os._exit(1)


class AlarmPersister(AbstractPersister):
    """Class for persisting alarms.
    """

    def __init__(self, kafka_conf, influxdb_conf):

        super(AlarmPersister, self).__init__(kafka_conf, influxdb_conf)

    def process_message(self, message):

        LOG.debug(message.message.value.decode('utf8'))

        decoded = json.loads(message.message.value)
        LOG.debug(json.dumps(decoded, sort_keys=True, indent=4))

        alarm_transitioned = decoded['alarm-transitioned']
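
        # The fields read below imply an event shaped roughly like this
        # (an illustrative sketch, not a verbatim payload):
        #
        #     {"alarm-transitioned": {"actionsEnabled": true,
        #                             "alarmDescription": "...",
        #                             "alarmId": "...",
        #                             "alarmDefinitionId": "...",
        #                             "metrics": [...],
        #                             "alarmName": "...",
        #                             "newState": "ALARM",
        #                             "oldState": "OK",
        #                             "stateChangeReason": "...",
        #                             "tenantId": "...",
        #                             "timestamp": 1416876388}}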

        actions_enabled = alarm_transitioned['actionsEnabled']
        LOG.debug('actions enabled: %s', actions_enabled)

        alarm_description = alarm_transitioned['alarmDescription']
        LOG.debug('alarm description: %s', alarm_description)

        alarm_id = alarm_transitioned['alarmId']
        LOG.debug('alarm id: %s', alarm_id)

        alarm_definition_id = alarm_transitioned['alarmDefinitionId']
        LOG.debug('alarm definition id: %s', alarm_definition_id)

        metrics = alarm_transitioned['metrics']
        LOG.debug('metrics: %s', metrics)

        alarm_name = alarm_transitioned['alarmName']
        LOG.debug('alarm name: %s', alarm_name)

        new_state = alarm_transitioned['newState']
        LOG.debug('new state: %s', new_state)

        old_state = alarm_transitioned['oldState']
        LOG.debug('old state: %s', old_state)

        state_change_reason = alarm_transitioned['stateChangeReason']
        LOG.debug('state change reason: %s', state_change_reason)

        tenant_id = alarm_transitioned['tenantId']
        LOG.debug('tenant id: %s', tenant_id)

        time_stamp = alarm_transitioned['timestamp']
        LOG.debug('time stamp: %s', time_stamp)

        data = {"points": [[time_stamp,
                            '{}',
                            tenant_id.encode('utf8'),
                            alarm_id.encode('utf8'),
                            alarm_definition_id.encode('utf8'),
                            json.dumps(metrics, ensure_ascii=False).encode(
                                'utf8'),
                            old_state.encode('utf8'),
                            new_state.encode('utf8'),
                            state_change_reason.encode('utf8')]],
                "name": 'alarm_state_history',
                "columns": ["time",
                            "reason_data",
                            "tenant_id",
                            "alarm_id",
                            "alarm_definition_id",
                            "metrics",
                            "old_state",
                            "new_state",
                            "reason"]}

        LOG.debug(data)

        return data


class MetricPersister(AbstractPersister):
    """Class for persisting metrics.
    """

    def __init__(self, kafka_conf, influxdb_conf):

        super(MetricPersister, self).__init__(kafka_conf, influxdb_conf)

    def process_message(self, message):

        LOG.debug(message.message.value.decode('utf8'))

        decoded = json.loads(message.message.value)
        LOG.debug(json.dumps(decoded, sort_keys=True, indent=4))

        metric = decoded['metric']
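
        # The envelope read below looks roughly like this (an illustrative
        # sketch, not a verbatim payload):
        #
        #     {"metric": {"name": "cpu.idle_perc",
        #                 "dimensions": {"hostname": "host1"},
        #                 "timestamp": 1416876388,
        #                 "value": 97.5},
        #      "meta": {"tenantId": "...", "region": "useast"},
        #      "creation_time": 1416876388}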

        metric_name = metric['name']
        LOG.debug('name: %s', metric_name)

        creation_time = decoded['creation_time']
        LOG.debug('creation time: %s', creation_time)

        region = decoded['meta']['region']
        LOG.debug('region: %s', region)

        tenant_id = decoded['meta']['tenantId']
        LOG.debug('tenant id: %s', tenant_id)

        dimensions = {}
        if 'dimensions' in metric:
            for dimension_name in metric['dimensions']:
                dimensions[dimension_name] = (
                    metric['dimensions'][dimension_name])
                LOG.debug('dimension: %s : %s', dimension_name,
                          dimensions[dimension_name])

        time_stamp = metric['timestamp']
        LOG.debug('timestamp %s', time_stamp)

        value = metric['value']
        LOG.debug('value: %s', value)

        url_encoded_serie_name = (
            urllib.quote(metric_name.encode('utf8'),
                         safe='') + '?' + urllib.quote(
                tenant_id.encode('utf8'), safe='') + '&' + urllib.quote(
                region.encode('utf8'), safe=''))

        for dimension_name in dimensions:
            url_encoded_serie_name += (
                '&' + urllib.quote(dimension_name.encode('utf8'),
                                   safe='') + '=' + urllib.quote(
                    dimensions[dimension_name].encode('utf8'), safe=''))

        LOG.debug("url_encoded_serie_name: %s", url_encoded_serie_name)

        data = {"points": [[value,
                            time_stamp]],
                "name": url_encoded_serie_name,
                "columns": ["value",
                            "time"]}

        LOG.debug(data)

        return data


def main_service():
    """Method to use with the OpenStack service framework.
    """

    service.prepare_service()
    launcher = os_service.ServiceLauncher()
    launcher.launch_service(Persister())
    launcher.wait()


# Used if run without the OpenStack service framework.
if __name__ == "__main__":
    sys.exit(main())