diff --git a/monasca_persister/README.md b/monasca_persister/README.md index de2a03b1..046170ca 100644 --- a/monasca_persister/README.md +++ b/monasca_persister/README.md @@ -2,20 +2,20 @@ A Monasca Persister written in Python. -Reads alarms and metrics from a Kafka queue and stores them in an InfluxDB +Reads alarms and metrics from a Kafka queue and stores them in an InfluxDB database. # License -Copyright (c) 2014 Hewlett-Packard Development Company, L.P. +(C) Copyright 2014-2016 Hewlett Packard Enterprise Development Company LP Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 - + Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or diff --git a/monasca_persister/__init__.py b/monasca_persister/__init__.py index 58cbdea8..e69de29b 100644 --- a/monasca_persister/__init__.py +++ b/monasca_persister/__init__.py @@ -1 +0,0 @@ -__author__ = 'dieterlyd' diff --git a/monasca_persister/persister.conf b/monasca_persister/persister.conf index c3813bd1..af837342 100644 --- a/monasca_persister/persister.conf +++ b/monasca_persister/persister.conf @@ -1,17 +1,18 @@ [DEFAULT] log_file = persister.log -log_dir = . +log_dir = /var/log/monasca/persister # Default log level is WARNING # Show debugging output in logs (sets DEBUG log level output) debug = false # Show more verbose log output (sets INFO log level output) if debug is False verbose = true -[database] -# Choose database type -# database_type := 'cassandra' | 'influxdb' -#database_type = cassandra -database_type = influxdb +[repositories] +# The driver to use for the metrics repository +metrics_driver = monasca_persister.repositories.influxdb.metrics_repository:MetricInfluxdbRepository + +# The driver to use for the alarm state history repository +alarm_state_history_driver = monasca_persister.repositories.influxdb.alarm_state_history_repository:AlarmStateHistInfluxdbRepository [zookeeper] # Comma separated list of host:port @@ -59,7 +60,7 @@ port = 8086 user = mon_persister password = password -# Uncomment and set cluster_ip_addresses if database_type is 'cassandra' +# Uncomment, set cluster_ip_addresses, and change the repositories to point to the cassandra classes #[cassandra] # Comma separated list of Cassandra node IP addresses. No spaces. #cluster_ip_addresses: 10.10.10.3 diff --git a/monasca_persister/persister.py b/monasca_persister/persister.py index f3c32f57..05a68b56 100644 --- a/monasca_persister/persister.py +++ b/monasca_persister/persister.py @@ -1,4 +1,3 @@ -#!/usr/bin/env python # (C) Copyright 2014-2016 Hewlett Packard Enterprise Development Company LP # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -17,50 +16,21 @@ """Persister Module The Persister reads metrics and alarms from Kafka and then stores them - in InfluxDB. + in into either Influxdb or Cassandra Start the perister as stand-alone process by running 'persister.py --config-file ' - - Also able to use Openstack service to start the persister. - """ -import abc -import hashlib -import urllib -from datetime import datetime -import json -import os - -import six import sys -import threading - -from cassandra.cluster import Cluster -from cassandra.query import BatchStatement - -from influxdb import InfluxDBClient -import pytz +import simport from oslo_config import cfg from oslo_log import log -from oslo_service import service as os_service -import service - -from monasca_common.kafka.consumer import KafkaConsumer +from repositories.persister import Persister LOG = log.getLogger(__name__) -log.register_options(cfg.CONF) -log.set_defaults() - -database_opts = [cfg.StrOpt('database_type')] - -database_group = cfg.OptGroup(name='database') - -cfg.CONF.register_group(database_group) -cfg.CONF.register_opts(database_opts, database_group) zookeeper_opts = [cfg.StrOpt('uri'), cfg.IntOpt('partition_interval_recheck_seconds')] @@ -93,72 +63,36 @@ cfg.CONF.register_group(kafka_alarm_history_group) cfg.CONF.register_opts(kafka_metrics_opts, kafka_metrics_group) cfg.CONF.register_opts(kafka_alarm_history_opts, kafka_alarm_history_group) -influxdb_opts = [cfg.StrOpt('database_name'), - cfg.StrOpt('ip_address'), - cfg.StrOpt('port'), - cfg.StrOpt('user'), - cfg.StrOpt('password')] +repositories_opts = [ + cfg.StrOpt('metrics_driver', help='The repository driver to use for metrics'), + cfg.StrOpt('alarm_state_history_driver', help='The repository driver to use for alarm state history')] -influxdb_group = cfg.OptGroup(name='influxdb', title='influxdb') -cfg.CONF.register_group(influxdb_group) -cfg.CONF.register_opts(influxdb_opts, influxdb_group) - -cassandra_opts = [cfg.StrOpt('cluster_ip_addresses'), - cfg.StrOpt('keyspace')] - -cassandra_group = cfg.OptGroup(name='cassandra') -cfg.CONF.register_group(cassandra_group) -cfg.CONF.register_opts(cassandra_opts, cassandra_group) - -cfg.CONF(sys.argv[1:], project='monasca', prog='persister') -log.setup(cfg.CONF, "monasca-persister") +repositories_group = cfg.OptGroup(name='repositories', title='repositories') +cfg.CONF.register_group(repositories_group) +cfg.CONF.register_opts(repositories_opts, repositories_group) def main(): + log.register_options(cfg.CONF) + log.set_defaults() + cfg.CONF(sys.argv[1:], project='monasca', prog='persister') + log.setup(cfg.CONF, "monasca-persister") + """Start persister. Start metric persister and alarm persister in separate threads. """ - database_type = cfg.CONF.database.database_type + metric_repository = simport.load(cfg.CONF.repositories.metrics_driver) + alarm_state_history_repository = simport.load(cfg.CONF.repositories.alarm_state_history_driver) - if database_type is None: - LOG.warn("Database type is not configured.") - LOG.warn("Using influxdb for default database type.") - LOG.warn("Please configure a database type using the 'database_type' " - "property in the config file.") + metric_persister = Persister(cfg.CONF.kafka_metrics, + cfg.CONF.zookeeper, + metric_repository) - # Allow None for database_type for backwards compatibility. - if database_type is None or database_type.lower() == 'influxdb': - - metric_persister = MetricInfluxdbPersister(cfg.CONF.kafka_metrics, - cfg.CONF.influxdb, - cfg.CONF.zookeeper) - - alarm_persister = AlarmStateHistInfluxdbPersister( - cfg.CONF.kafka_alarm_history, - cfg.CONF.influxdb, - cfg.CONF.zookeeper) - - elif database_type.lower() == 'cassandra': - - metric_persister = MetricCassandraPersister( - cfg.CONF.kafka_metrics, - cfg.CONF.cassandra, - cfg.CONF.zookeeper) - - alarm_persister = AlarmStateHistCassandraPersister( - cfg.CONF.kafka_alarm_history, - cfg.CONF.cassandra, - cfg.CONF.zookeeper) - - else: - - LOG.error("Unknown database type [{}] is not implemented".format( - database_type)) - LOG.error("Known database types are [influxdb] and [cassandra]") - LOG.error("Please configure a known database type in the config file.") - os._exit(1) + alarm_persister = Persister(cfg.CONF.kafka_alarm_history, + cfg.CONF.zookeeper, + alarm_state_history_repository) metric_persister.start() alarm_persister.start() @@ -182,516 +116,5 @@ def main(): LOG.info('Monasca Persister has started successfully!') - -def shutdown_all_threads_and_die(): - """Shut down all threads and exit process. - - Hit it with a hammer to kill all threads and die. May cause duplicate - messages in kafka queue to be reprocessed when the persister starts again. - Happens if the persister dies just after sending metrics and alarms to the - DB but does not reach the commit. - """ - - os._exit(1) - - -class Persister(os_service.Service): - """Class used with Openstack service. - """ - - def __init__(self, threads=1): - super(Persister, self).__init__(threads) - - def start(self): - - try: - - main() - - except: - LOG.exception('Persister encountered fatal error. ' - 'Shutting down all threads and exiting.') - shutdown_all_threads_and_die() - - -@six.add_metaclass(abc.ABCMeta) -class AbstractPersister(threading.Thread): - - def __init__(self, kafka_conf, db_conf, zookeeper_conf): - - super(AbstractPersister, self).__init__() - - self._data_points = [] - - self._kafka_topic = kafka_conf.topic - - self._database_batch_size = kafka_conf.database_batch_size - - self._consumer = KafkaConsumer( - kafka_conf.uri, - zookeeper_conf.uri, - kafka_conf.zookeeper_path, - kafka_conf.group_id, - kafka_conf.topic, - repartition_callback=self._flush, - commit_callback=self._flush, - commit_timeout=kafka_conf.max_wait_time_seconds) - - self.init_db(db_conf) - - @abc.abstractmethod - def init_db(self, db_conf): - pass - - @abc.abstractmethod - def process_message(self, message): - pass - - @abc.abstractmethod - def execute_batch(self, data_points): - pass - - def _flush(self): - if not self._data_points: - return - - try: - self.execute_batch(self._data_points) - - LOG.info("Processed {} messages from topic '{}'".format( - len(self._data_points), self._kafka_topic)) - - self._data_points = [] - self._consumer.commit() - except Exception: - LOG.exception("Error writing to database: {}" - .format(self._data_points)) - raise - - def run(self): - try: - for raw_message in self._consumer: - try: - message = raw_message[1] - data_point = self.process_message(message) - self._data_points.append(data_point) - except Exception: - LOG.exception('Error processing message. Message is ' - 'being dropped. {}'.format(message)) - - if len(self._data_points) >= self._database_batch_size: - self._flush() - except: - LOG.exception( - 'Persister encountered fatal exception processing ' - 'messages. ' - 'Shutting down all threads and exiting') - shutdown_all_threads_and_die() - - -@six.add_metaclass(abc.ABCMeta) -class AbstractCassandraPersister(AbstractPersister): - - def __init__(self, kafka_conf, cassandra_db_conf, zookeeper_conf): - - super(AbstractCassandraPersister, self).__init__( - kafka_conf, cassandra_db_conf, zookeeper_conf) - - def init_db(self, cassandra_db_conf): - - self._cassandra_cluster = Cluster( - cassandra_db_conf.cluster_ip_addresses.split(',')) - - self.cassandra_session = self._cassandra_cluster.connect( - cassandra_db_conf.keyspace) - - self._batch_stmt = BatchStatement() - -class MetricMeasurementInfo(object): - - def __init__(self, tenant_id, region, metric_hash, metric_set, - measurement): - - self.tenant_id = tenant_id - self.region = region - self.metric_hash = metric_hash - self.metric_set = metric_set - self.measurement = measurement - - -class MetricCassandraPersister(AbstractCassandraPersister): - - def __init__(self, kafka_conf, cassandra_db_conf, zookeeper_conf): - - super(MetricCassandraPersister, self).__init__( - kafka_conf, - cassandra_db_conf, - zookeeper_conf) - - self._insert_measurement_stmt = self.cassandra_session.prepare( - 'insert into measurements (tenant_id,' - 'region, metric_hash, time_stamp, value,' - 'value_meta) values (?, ?, ?, ?, ?, ?)') - - self._insert_metric_map_stmt = self.cassandra_session.prepare( - 'insert into metric_map (tenant_id,' - 'region, metric_hash, ' - 'metric_set) values' - '(?,?,?,?)') - - def process_message(self, message): - - (dimensions, metric_name, region, tenant_id, time_stamp, value, - value_meta) = parse_measurement_message(message) - - metric_hash, metric_set = create_metric_hash(metric_name, - dimensions) - - measurement = (tenant_id.encode('utf8'), - region.encode('utf8'), - metric_hash, - time_stamp, - value, - json.dumps(value_meta, ensure_ascii=False).encode( - 'utf8')) - - LOG.debug(measurement) - - return MetricMeasurementInfo( - tenant_id.encode('utf8'), - region.encode('utf8'), - metric_hash, - metric_set, - measurement) - - def execute_batch(self, metric_measurement_infos): - - for metric_measurement_info in metric_measurement_infos: - - self._batch_stmt.add(self._insert_measurement_stmt, - metric_measurement_info.measurement) - - metric_map = (metric_measurement_info.tenant_id, - metric_measurement_info.region, - metric_measurement_info.metric_hash, - metric_measurement_info.metric_set) - - self._batch_stmt.add(self._insert_metric_map_stmt, - metric_map) - - self.cassandra_session.execute(self._batch_stmt) - - self._batch_stmt = BatchStatement() - -def create_metric_hash(metric_name, dimensions): - - metric_name_part = '__name__' + '=' + urllib.quote_plus(metric_name) - - hash_string = metric_name_part - - metric_set = set() - - metric_set.add(metric_name_part) - - for dim_name in sorted(dimensions.iterkeys()): - dimension = (urllib.quote_plus(dim_name) + '=' + urllib.quote_plus( - dimensions[dim_name])) - metric_set.add(dimension) - hash_string += dimension - - sha1_hash = hashlib.sha1(hash_string).hexdigest() - - return bytearray.fromhex(sha1_hash), metric_set - - -class AlarmStateHistCassandraPersister(AbstractCassandraPersister): - - def __init__(self, kafka_conf, cassandra_db_conf, zookeeper_conf): - - super(AlarmStateHistCassandraPersister, self).__init__( - kafka_conf, - cassandra_db_conf, - zookeeper_conf) - - self._insert_alarm_state_hist_stmt = self.cassandra_session.prepare( - 'insert into alarm_state_history (tenant_id, alarm_id, ' - 'metrics, new_state, ' - 'old_state, reason, reason_data, ' - 'sub_alarms, time_stamp) values (?,?,?,?,?,?,?,?,?)') - - def process_message(self, message): - - (alarm_id, metrics, new_state, old_state, state_change_reason, - sub_alarms_json_snake_case, tenant_id, - time_stamp) = parse_alarm_state_hist_message( - message) - - alarm_state_hist = ( - tenant_id.encode('utf8'), - alarm_id.encode('utf8'), - json.dumps(metrics, ensure_ascii=False).encode( - 'utf8'), - new_state.encode('utf8'), - old_state.encode('utf8'), - state_change_reason.encode('utf8'), - "{}".encode('utf8'), - sub_alarms_json_snake_case.encode('utf8'), - time_stamp - ) - - LOG.debug(alarm_state_hist) - - return alarm_state_hist - - def execute_batch(self, alarm_state_hists): - - for alarm_state_hist in alarm_state_hists: - self._batch_stmt.add(self._insert_alarm_state_hist_stmt, - alarm_state_hist) - - self.cassandra_session.execute(self._batch_stmt) - - self._batch_stmt = BatchStatement() - - -@six.add_metaclass(abc.ABCMeta) -class AbstractInfluxdbPersister(AbstractPersister): - - def __init__(self, kafka_conf, influxdb_db_conf, zookeeper_conf): - - super(AbstractInfluxdbPersister, self).__init__( - kafka_conf, influxdb_db_conf, zookeeper_conf) - - def init_db(self, influxdb_db_conf): - - self._influxdb_client = InfluxDBClient(influxdb_db_conf.ip_address, - influxdb_db_conf.port, - influxdb_db_conf.user, - influxdb_db_conf.password, - influxdb_db_conf.database_name) - - def execute_batch(self, data_points): - - self._influxdb_client.write_points(data_points, 'ms') - - -class AlarmStateHistInfluxdbPersister(AbstractInfluxdbPersister): - - def __init__(self, kafka_conf, influxdb_db_conf, zookeeper_conf): - - super(AlarmStateHistInfluxdbPersister, self).__init__( - kafka_conf, influxdb_db_conf, zookeeper_conf) - - def process_message(self, message): - - (alarm_id, metrics, new_state, old_state, link, - lifecycle_state, state_change_reason, - sub_alarms_json_snake_case, tenant_id, - time_stamp) = parse_alarm_state_hist_message( - message) - - ts = time_stamp / 1000.0 - - data = {"measurement": 'alarm_state_history', - "time": datetime.fromtimestamp(ts, tz=pytz.utc).strftime( - '%Y-%m-%dT%H:%M:%S.%fZ'), - "fields": { - "tenant_id": tenant_id.encode('utf8'), - "alarm_id": alarm_id.encode('utf8'), - "metrics": json.dumps(metrics, ensure_ascii=False).encode( - 'utf8'), - "new_state": new_state.encode('utf8'), - "old_state": old_state.encode('utf8'), - "link": link.encode('utf8'), - "lifecycle_state": lifecycle_state.encode('utf8'), - "reason": state_change_reason.encode('utf8'), - "reason_data": "{}".encode('utf8'), - "sub_alarms": sub_alarms_json_snake_case.encode('utf8') - }, - "tags": { - "tenant_id": tenant_id.encode('utf8') - }} - - LOG.debug(data) - - return data - - -class MetricInfluxdbPersister(AbstractInfluxdbPersister): - - def __init__(self, kafka_conf, influxdb_db_conf, zookeeper_conf): - - super(MetricInfluxdbPersister, self).__init__(kafka_conf, - influxdb_db_conf, - zookeeper_conf) - - def process_message(self, message): - - (dimensions, metric_name, region, tenant_id, time_stamp, value, - value_meta) = parse_measurement_message(message) - - tags = dimensions - tags['_tenant_id'] = tenant_id.encode('utf8') - tags['_region'] = region.encode('utf8') - - ts = time_stamp / 1000.0 - - data = {"measurement": metric_name.encode('utf8'), - "time": datetime.fromtimestamp(ts, tz=pytz.utc).strftime( - '%Y-%m-%dT%H:%M:%S.%fZ'), - "fields": { - "value": value, - "value_meta": json.dumps(value_meta, - ensure_ascii=False).encode('utf8') - }, - "tags": tags} - - LOG.debug(data) - - return data - - -def parse_measurement_message(message): - - LOG.debug(message.message.value.decode('utf8')) - - decoded_message = json.loads(message.message.value) - LOG.debug(json.dumps(decoded_message, sort_keys=True, indent=4)) - - metric = decoded_message['metric'] - - metric_name = metric['name'] - LOG.debug('name: %s', metric_name) - - creation_time = decoded_message['creation_time'] - LOG.debug('creation time: %s', creation_time) - - region = decoded_message['meta']['region'] - LOG.debug('region: %s', region) - - tenant_id = decoded_message['meta']['tenantId'] - LOG.debug('tenant id: %s', tenant_id) - - dimensions = {} - if 'dimensions' in metric: - for dimension_name in metric['dimensions']: - dimensions[dimension_name.encode('utf8')] = ( - metric['dimensions'][dimension_name].encode('utf8')) - LOG.debug('dimension: %s : %s', dimension_name, - dimensions[dimension_name]) - - time_stamp = metric['timestamp'] - LOG.debug('timestamp %s', time_stamp) - - value = float(metric['value']) - LOG.debug('value: %s', value) - - if 'value_meta' in metric and metric['value_meta']: - - value_meta = metric['value_meta'] - - else: - - value_meta = {} - LOG.debug('value_meta: %s', value_meta) - - return (dimensions, metric_name, region, tenant_id, time_stamp, value, - value_meta) - - -def parse_alarm_state_hist_message(message): - - LOG.debug(message.message.value.decode('utf8')) - - decoded_message = json.loads(message.message.value) - LOG.debug(json.dumps(decoded_message, sort_keys=True, indent=4)) - - alarm_transitioned = decoded_message['alarm-transitioned'] - - actions_enabled = alarm_transitioned['actionsEnabled'] - LOG.debug('actions enabled: %s', actions_enabled) - - alarm_description = alarm_transitioned['alarmDescription'] - LOG.debug('alarm description: %s', alarm_description) - - alarm_id = alarm_transitioned['alarmId'] - LOG.debug('alarm id: %s', alarm_id) - - alarm_definition_id = alarm_transitioned[ - 'alarmDefinitionId'] - LOG.debug('alarm definition id: %s', alarm_definition_id) - - metrics = alarm_transitioned['metrics'] - LOG.debug('metrics: %s', metrics) - - alarm_name = alarm_transitioned['alarmName'] - LOG.debug('alarm name: %s', alarm_name) - - new_state = alarm_transitioned['newState'] - LOG.debug('new state: %s', new_state) - - old_state = alarm_transitioned['oldState'] - LOG.debug('old state: %s', old_state) - - # Key may not exist or value may be none, convert both to "" - if 'link' in alarm_transitioned and alarm_transitioned['link'] is not None: - link = alarm_transitioned['link'] - else: - link = "" - LOG.debug('link: %s', link) - - # Key may not exist or value may be none, convert both to "" - if 'lifecycleState' in alarm_transitioned and alarm_transitioned['lifecycleState'] is not None: - lifecycle_state = alarm_transitioned['lifecycleState'] - else: - lifecycle_state = "" - LOG.debug('lifecycle_state: %s', lifecycle_state) - - state_change_reason = alarm_transitioned[ - 'stateChangeReason'] - LOG.debug('state change reason: %s', state_change_reason) - - tenant_id = alarm_transitioned['tenantId'] - LOG.debug('tenant id: %s', tenant_id) - - time_stamp = alarm_transitioned['timestamp'] - LOG.debug('time stamp: %s', time_stamp) - - sub_alarms = alarm_transitioned['subAlarms'] - if sub_alarms: - - sub_alarms_json = json.dumps(sub_alarms, ensure_ascii=False) - - sub_alarms_json_snake_case = sub_alarms_json.replace( - '"subAlarmExpression":', - '"sub_alarm_expression":') - - sub_alarms_json_snake_case = sub_alarms_json_snake_case.replace( - '"metricDefinition":', - '"metric_definition":') - - sub_alarms_json_snake_case = sub_alarms_json_snake_case.replace( - '"subAlarmState":', - '"sub_alarm_state":') - - else: - - sub_alarms_json_snake_case = "[]" - - return (alarm_id, metrics, new_state, old_state, link, - lifecycle_state, state_change_reason, - sub_alarms_json_snake_case, tenant_id, time_stamp) - -def main_service(): - """Method to use with Openstack service. - """ - - service.prepare_service() - launcher = os_service.ServiceLauncher() - launcher.launch_service(Persister()) - launcher.wait() - - -# Used if run without Openstack service. if __name__ == "__main__": sys.exit(main()) diff --git a/monasca_persister/repositories/__init__.py b/monasca_persister/repositories/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/monasca_persister/servicerunner.py b/monasca_persister/repositories/abstract_repository.py similarity index 58% rename from monasca_persister/servicerunner.py rename to monasca_persister/repositories/abstract_repository.py index be1ab628..6e9301cd 100644 --- a/monasca_persister/servicerunner.py +++ b/monasca_persister/repositories/abstract_repository.py @@ -1,5 +1,4 @@ -#!/usr/bin/env python -# Copyright (c) 2014 Hewlett-Packard Development Company, L.P. +# (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,18 +12,20 @@ # implied. # See the License for the specific language governing permissions and # limitations under the License. - -""" servicerunner runs the persister as an Openstack service. - -""" -import sys - -from persister import main_service +import abc +import six -def main(): - main_service() +@six.add_metaclass(abc.ABCMeta) +class AbstractRepository(object): + def __init__(self): + super(AbstractRepository, self).__init__() -if __name__ == "__main__": - sys.exit(main()) + @abc.abstractmethod + def process_message(self, message): + pass + + @abc.abstractmethod + def write_batch(self, data_points): + pass diff --git a/monasca_persister/service.py b/monasca_persister/repositories/cassandra/__init__.py similarity index 62% rename from monasca_persister/service.py rename to monasca_persister/repositories/cassandra/__init__.py index ad9b0d7b..a9d04897 100644 --- a/monasca_persister/service.py +++ b/monasca_persister/repositories/cassandra/__init__.py @@ -1,5 +1,4 @@ -#!/usr/bin/env python -# Copyright (c) 2014 Hewlett-Packard Development Company, L.P. +# (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,16 +12,11 @@ # implied. # See the License for the specific language governing permissions and # limitations under the License. -import sys - from oslo_config import cfg -from oslo_log import log -LOG = log.getLogger(__name__) +cassandra_opts = [cfg.StrOpt('cluster_ip_addresses'), + cfg.StrOpt('keyspace')] -def prepare_service(argv=None): - if argv is None: - argv = sys.argv - cfg.CONF(argv[1:], project='persister') - log.setup(cfg.CONF, 'persister') - LOG.info('Service has started!') +cassandra_group = cfg.OptGroup(name='cassandra') +cfg.CONF.register_group(cassandra_group) +cfg.CONF.register_opts(cassandra_opts, cassandra_group) diff --git a/monasca_persister/repositories/cassandra/abstract_repository.py b/monasca_persister/repositories/cassandra/abstract_repository.py new file mode 100644 index 00000000..a0d5e9d7 --- /dev/null +++ b/monasca_persister/repositories/cassandra/abstract_repository.py @@ -0,0 +1,37 @@ +# (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import abc +from cassandra.cluster import Cluster +from cassandra.query import BatchStatement +from oslo_config import cfg +import six + +from repositories.abstract_repository import AbstractRepository + + +@six.add_metaclass(abc.ABCMeta) +class AbstractCassandraRepository(AbstractRepository): + + def __init__(self): + super(AbstractCassandraRepository, self).__init__() + self.conf = cfg.CONF + + self._cassandra_cluster = Cluster( + self.conf.cassandra.cluster_ip_addresses.split(',')) + + self.cassandra_session = self._cassandra_cluster.connect( + self.conf.cassandra.keyspace) + + self._batch_stmt = BatchStatement() diff --git a/monasca_persister/repositories/cassandra/alarm_state_history_repository.py b/monasca_persister/repositories/cassandra/alarm_state_history_repository.py new file mode 100644 index 00000000..b74d9b6d --- /dev/null +++ b/monasca_persister/repositories/cassandra/alarm_state_history_repository.py @@ -0,0 +1,70 @@ +# (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import json + +from cassandra.query import BatchStatement +from oslo_log import log + +from repositories.cassandra.abstract_repository import AbstractCassandraRepository +from repositories.utils import parse_alarm_state_hist_message + +LOG = log.getLogger(__name__) + + +class AlarmStateHistCassandraRepository(AbstractCassandraRepository): + + def __init__(self): + + super(AlarmStateHistCassandraRepository, self).__init__() + + self._insert_alarm_state_hist_stmt = self.cassandra_session.prepare( + 'insert into alarm_state_history (tenant_id, alarm_id, ' + 'metrics, new_state, ' + 'old_state, reason, reason_data, ' + 'sub_alarms, time_stamp) values (?,?,?,?,?,?,?,?,?)') + + def process_message(self, message): + + (alarm_id, metrics, new_state, old_state, state_change_reason, + sub_alarms_json_snake_case, tenant_id, + time_stamp) = parse_alarm_state_hist_message( + message) + + alarm_state_hist = ( + tenant_id.encode('utf8'), + alarm_id.encode('utf8'), + json.dumps(metrics, ensure_ascii=False).encode( + 'utf8'), + new_state.encode('utf8'), + old_state.encode('utf8'), + state_change_reason.encode('utf8'), + "{}".encode('utf8'), + sub_alarms_json_snake_case.encode('utf8'), + time_stamp + ) + + LOG.debug(alarm_state_hist) + + return alarm_state_hist + + def write_batch(self, alarm_state_hists): + + for alarm_state_hist in alarm_state_hists: + self._batch_stmt.add(self._insert_alarm_state_hist_stmt, + alarm_state_hist) + + self.cassandra_session.execute(self._batch_stmt) + + self._batch_stmt = BatchStatement() \ No newline at end of file diff --git a/monasca_persister/repositories/cassandra/metrics_repository.py b/monasca_persister/repositories/cassandra/metrics_repository.py new file mode 100644 index 00000000..77963d1b --- /dev/null +++ b/monasca_persister/repositories/cassandra/metrics_repository.py @@ -0,0 +1,120 @@ +# (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import hashlib +import json + +from cassandra.query import BatchStatement +from oslo_log import log +import urllib + +from repositories.cassandra.abstract_repository import AbstractCassandraRepository +from repositories.utils import parse_measurement_message + +LOG = log.getLogger(__name__) + + +class MetricCassandraRepository(AbstractCassandraRepository): + + def __init__(self): + + super(MetricCassandraRepository, self).__init__() + + self._insert_measurement_stmt = self.cassandra_session.prepare( + 'insert into measurements (tenant_id,' + 'region, metric_hash, time_stamp, value,' + 'value_meta) values (?, ?, ?, ?, ?, ?)') + + self._insert_metric_map_stmt = self.cassandra_session.prepare( + 'insert into metric_map (tenant_id,' + 'region, metric_hash, ' + 'metric_set) values' + '(?,?,?,?)') + + def process_message(self, message): + + (dimensions, metric_name, region, tenant_id, time_stamp, value, + value_meta) = parse_measurement_message(message) + + metric_hash, metric_set = create_metric_hash(metric_name, + dimensions) + + measurement = (tenant_id.encode('utf8'), + region.encode('utf8'), + metric_hash, + time_stamp, + value, + json.dumps(value_meta, ensure_ascii=False).encode( + 'utf8')) + + LOG.debug(measurement) + + return MetricMeasurementInfo( + tenant_id.encode('utf8'), + region.encode('utf8'), + metric_hash, + metric_set, + measurement) + + def write_batch(self, metric_measurement_infos): + + for metric_measurement_info in metric_measurement_infos: + + self._batch_stmt.add(self._insert_measurement_stmt, + metric_measurement_info.measurement) + + metric_map = (metric_measurement_info.tenant_id, + metric_measurement_info.region, + metric_measurement_info.metric_hash, + metric_measurement_info.metric_set) + + self._batch_stmt.add(self._insert_metric_map_stmt, + metric_map) + + self.cassandra_session.execute(self._batch_stmt) + + self._batch_stmt = BatchStatement() + + +class MetricMeasurementInfo(object): + + def __init__(self, tenant_id, region, metric_hash, metric_set, + measurement): + + self.tenant_id = tenant_id + self.region = region + self.metric_hash = metric_hash + self.metric_set = metric_set + self.measurement = measurement + + +def create_metric_hash(metric_name, dimensions): + + metric_name_part = '__name__' + '=' + urllib.quote_plus(metric_name) + + hash_string = metric_name_part + + metric_set = set() + + metric_set.add(metric_name_part) + + for dim_name in sorted(dimensions.iterkeys()): + dimension = (urllib.quote_plus(dim_name) + '=' + urllib.quote_plus( + dimensions[dim_name])) + metric_set.add(dimension) + hash_string += dimension + + sha1_hash = hashlib.sha1(hash_string).hexdigest() + + return bytearray.fromhex(sha1_hash), metric_set diff --git a/monasca_persister/repositories/influxdb/__init__.py b/monasca_persister/repositories/influxdb/__init__.py new file mode 100644 index 00000000..fc03740b --- /dev/null +++ b/monasca_persister/repositories/influxdb/__init__.py @@ -0,0 +1,25 @@ +# (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from oslo_config import cfg + +influxdb_opts = [cfg.StrOpt('database_name'), + cfg.StrOpt('ip_address'), + cfg.StrOpt('port'), + cfg.StrOpt('user'), + cfg.StrOpt('password')] + +influxdb_group = cfg.OptGroup(name='influxdb', title='influxdb') +cfg.CONF.register_group(influxdb_group) +cfg.CONF.register_opts(influxdb_opts, influxdb_group) diff --git a/monasca_persister/repositories/influxdb/abstract_repository.py b/monasca_persister/repositories/influxdb/abstract_repository.py new file mode 100644 index 00000000..9461a861 --- /dev/null +++ b/monasca_persister/repositories/influxdb/abstract_repository.py @@ -0,0 +1,36 @@ +# (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import abc +from influxdb import InfluxDBClient +from oslo_config import cfg +import six + +from repositories.abstract_repository import AbstractRepository + + +@six.add_metaclass(abc.ABCMeta) +class AbstractInfluxdbRepository(AbstractRepository): + + def __init__(self): + super(AbstractInfluxdbRepository, self).__init__() + self.conf = cfg.CONF + self._influxdb_client = InfluxDBClient(self.conf.influxdb.ip_address, + self.conf.influxdb.port, + self.conf.influxdb.user, + self.conf.influxdb.password, + self.conf.influxdb.database_name) + + def write_batch(self, data_points): + self._influxdb_client.write_points(data_points, 'ms') diff --git a/monasca_persister/repositories/influxdb/alarm_state_history_repository.py b/monasca_persister/repositories/influxdb/alarm_state_history_repository.py new file mode 100644 index 00000000..d17a4f5e --- /dev/null +++ b/monasca_persister/repositories/influxdb/alarm_state_history_repository.py @@ -0,0 +1,65 @@ +# (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from datetime import datetime +import json + +from oslo_log import log +import pytz + +from repositories.influxdb.abstract_repository import AbstractInfluxdbRepository +from repositories.utils import parse_alarm_state_hist_message + +LOG = log.getLogger(__name__) + + +class AlarmStateHistInfluxdbRepository(AbstractInfluxdbRepository): + + def __init__(self): + + super(AlarmStateHistInfluxdbRepository, self).__init__() + + def process_message(self, message): + + (alarm_id, metrics, new_state, old_state, link, + lifecycle_state, state_change_reason, + sub_alarms_json_snake_case, tenant_id, + time_stamp) = parse_alarm_state_hist_message( + message) + + ts = time_stamp / 1000.0 + + data = {"measurement": 'alarm_state_history', + "time": datetime.fromtimestamp(ts, tz=pytz.utc).strftime( + '%Y-%m-%dT%H:%M:%S.%fZ'), + "fields": { + "tenant_id": tenant_id.encode('utf8'), + "alarm_id": alarm_id.encode('utf8'), + "metrics": json.dumps(metrics, ensure_ascii=False).encode( + 'utf8'), + "new_state": new_state.encode('utf8'), + "old_state": old_state.encode('utf8'), + "link": link.encode('utf8'), + "lifecycle_state": lifecycle_state.encode('utf8'), + "reason": state_change_reason.encode('utf8'), + "reason_data": "{}".encode('utf8'), + "sub_alarms": sub_alarms_json_snake_case.encode('utf8') + }, + "tags": { + "tenant_id": tenant_id.encode('utf8') + }} + + LOG.debug(data) + + return data diff --git a/monasca_persister/repositories/influxdb/metrics_repository.py b/monasca_persister/repositories/influxdb/metrics_repository.py new file mode 100644 index 00000000..3f7af8a8 --- /dev/null +++ b/monasca_persister/repositories/influxdb/metrics_repository.py @@ -0,0 +1,56 @@ +# (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from datetime import datetime +import json + +from oslo_log import log +import pytz + +from repositories.influxdb.abstract_repository import AbstractInfluxdbRepository +from repositories.utils import parse_measurement_message + +LOG = log.getLogger(__name__) + + +class MetricInfluxdbRepository(AbstractInfluxdbRepository): + + def __init__(self): + + super(MetricInfluxdbRepository, self).__init__() + + def process_message(self, message): + + (dimensions, metric_name, region, tenant_id, time_stamp, value, + value_meta) = parse_measurement_message(message) + + tags = dimensions + tags['_tenant_id'] = tenant_id.encode('utf8') + tags['_region'] = region.encode('utf8') + + ts = time_stamp / 1000.0 + + data = {"measurement": metric_name.encode('utf8'), + "time": datetime.fromtimestamp(ts, tz=pytz.utc).strftime( + '%Y-%m-%dT%H:%M:%S.%fZ'), + "fields": { + "value": value, + "value_meta": json.dumps(value_meta, + ensure_ascii=False).encode('utf8') + }, + "tags": tags} + + LOG.debug(data) + + return data diff --git a/monasca_persister/repositories/persister.py b/monasca_persister/repositories/persister.py new file mode 100644 index 00000000..cd6b09e4 --- /dev/null +++ b/monasca_persister/repositories/persister.py @@ -0,0 +1,84 @@ +# (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import os +import threading + +from oslo_log import log + +from monasca_common.kafka.consumer import KafkaConsumer + +LOG = log.getLogger(__name__) + + +class Persister(threading.Thread): + + def __init__(self, kafka_conf, zookeeper_conf, repository): + + super(Persister, self).__init__() + + self._data_points = [] + + self._kafka_topic = kafka_conf.topic + + self._database_batch_size = kafka_conf.database_batch_size + + self._consumer = KafkaConsumer( + kafka_conf.uri, + zookeeper_conf.uri, + kafka_conf.zookeeper_path, + kafka_conf.group_id, + kafka_conf.topic, + repartition_callback=self._flush, + commit_callback=self._flush, + commit_timeout=kafka_conf.max_wait_time_seconds) + + self.repository = repository() + + def _flush(self): + if not self._data_points: + return + + try: + self.repository.write_batch(self._data_points) + + LOG.info("Processed {} messages from topic '{}'".format( + len(self._data_points), self._kafka_topic)) + + self._data_points = [] + self._consumer.commit() + except Exception: + LOG.exception("Error writing to database: {}" + .format(self._data_points)) + raise + + def run(self): + try: + for raw_message in self._consumer: + try: + message = raw_message[1] + data_point = self.repository.process_message(message) + self._data_points.append(data_point) + except Exception: + LOG.exception('Error processing message. Message is ' + 'being dropped. {}'.format(message)) + + if len(self._data_points) >= self._database_batch_size: + self._flush() + except: + LOG.exception( + 'Persister encountered fatal exception processing ' + 'messages. ' + 'Shutting down all threads and exiting') + os._exit(1) diff --git a/monasca_persister/repositories/utils.py b/monasca_persister/repositories/utils.py new file mode 100644 index 00000000..fa4f3a3e --- /dev/null +++ b/monasca_persister/repositories/utils.py @@ -0,0 +1,102 @@ +# (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import json + + +def parse_measurement_message(message): + + decoded_message = json.loads(message.message.value) + + metric = decoded_message['metric'] + + metric_name = metric['name'] + + region = decoded_message['meta']['region'] + + tenant_id = decoded_message['meta']['tenantId'] + + dimensions = {} + if 'dimensions' in metric: + for dimension_name in metric['dimensions']: + dimensions[dimension_name.encode('utf8')] = ( + metric['dimensions'][dimension_name].encode('utf8')) + + time_stamp = metric['timestamp'] + + value = float(metric['value']) + + if 'value_meta' in metric and metric['value_meta']: + value_meta = metric['value_meta'] + + else: + value_meta = {} + + return (dimensions, metric_name, region, tenant_id, time_stamp, value, + value_meta) + + +def parse_alarm_state_hist_message(message): + + decoded_message = json.loads(message.message.value) + + alarm_transitioned = decoded_message['alarm-transitioned'] + + alarm_id = alarm_transitioned['alarmId'] + + metrics = alarm_transitioned['metrics'] + + new_state = alarm_transitioned['newState'] + + old_state = alarm_transitioned['oldState'] + + # Key may not exist or value may be none, convert both to "" + if 'link' in alarm_transitioned and alarm_transitioned['link'] is not None: + link = alarm_transitioned['link'] + else: + link = "" + + # Key may not exist or value may be none, convert both to "" + if 'lifecycleState' in alarm_transitioned and alarm_transitioned['lifecycleState'] is not None: + lifecycle_state = alarm_transitioned['lifecycleState'] + else: + lifecycle_state = "" + + state_change_reason = alarm_transitioned['stateChangeReason'] + + tenant_id = alarm_transitioned['tenantId'] + + time_stamp = alarm_transitioned['timestamp'] + + sub_alarms = alarm_transitioned['subAlarms'] + if sub_alarms: + sub_alarms_json = json.dumps(sub_alarms, ensure_ascii=False) + + sub_alarms_json_snake_case = sub_alarms_json.replace( + '"subAlarmExpression":', + '"sub_alarm_expression":') + + sub_alarms_json_snake_case = sub_alarms_json_snake_case.replace( + '"metricDefinition":', + '"metric_definition":') + + sub_alarms_json_snake_case = sub_alarms_json_snake_case.replace( + '"subAlarmState":', + '"sub_alarm_state":') + else: + sub_alarms_json_snake_case = "[]" + + return (alarm_id, metrics, new_state, old_state, link, + lifecycle_state, state_change_reason, + sub_alarms_json_snake_case, tenant_id, time_stamp) diff --git a/monasca_persister/tests/__init__.py b/monasca_persister/tests/__init__.py index 58cbdea8..e69de29b 100644 --- a/monasca_persister/tests/__init__.py +++ b/monasca_persister/tests/__init__.py @@ -1 +0,0 @@ -__author__ = 'dieterlyd' diff --git a/requirements.txt b/requirements.txt index d30c2ab3..4ff8a8e1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,4 +7,5 @@ babel influxdb==2.8.0 cassandra-driver==3.0.0 iso8601 +simport monasca-common