diff --git a/monasca_persister/persister.py b/monasca_persister/persister.py index 7cc85851..f6df1f2c 100644 --- a/monasca_persister/persister.py +++ b/monasca_persister/persister.py @@ -33,13 +33,13 @@ import os import six import sys import threading -import urllib from influxdb import InfluxDBClient from kafka import KafkaClient from kafka import SimpleConsumer from kazoo.client import KazooClient from kazoo.recipe.partitioner import SetPartitioner +import pytz from oslo.config import cfg from openstack.common import log @@ -90,6 +90,10 @@ influxdb_group = cfg.OptGroup(name='influxdb', title='influxdb') cfg.CONF.register_group(influxdb_group) cfg.CONF.register_opts(influxdb_opts, influxdb_group) +cfg.CONF(sys.argv[1:], project='monasca', prog='persister') +log_levels = (cfg.CONF.default_log_levels) +cfg.set_defaults(log.log_opts, default_log_levels=log_levels) +log.setup("monasca-persister") def main(): """Start persister. @@ -97,11 +101,6 @@ def main(): Start metric persister and alarm persister in separate threads. """ - cfg.CONF(args=[], project='monasca', prog='persister') - log_levels = (cfg.CONF.default_log_levels) - cfg.set_defaults(log.log_opts, default_log_levels=log_levels) - log.setup("monasca-persister") - metric_persister = MetricPersister(cfg.CONF.kafka_metrics, cfg.CONF.influxdb, cfg.CONF.zookeeper) @@ -213,7 +212,7 @@ class AbstractPersister(threading.Thread): self._kafka_topic = kafka_conf.topic self._message_count = 0 - self._data_points = {} + self._data_points = [] self._last_flush = datetime.now() self._last_partition_check = datetime.now() @@ -224,35 +223,49 @@ class AbstractPersister(threading.Thread): def _flush(self, partitions): if self._data_points: + try: - self._influxdb_client.write_points(self._data_points.values(), 'ms') + + self._influxdb_client.write_points(self._data_points, 'ms') + except Exception: - log.exception("Error writing to influxdb: {}" - .format(self._data_points.values())) + + LOG.exception("Error writing to influxdb: {}" + .format(self._data_points)) + raise self._consumer.commit(partitions=partitions) + LOG.info("Processed {} messages from topic '{}'".format( self._message_count, self._kafka_topic)) - self._data_points = {} + + self._data_points = [] + self._message_count = 0 + self._last_flush = datetime.now() def _is_time_for_repartition_check(self): - delta_partition_check_time = datetime.now() - self._last_partition_check + delta_partition_check_time = (datetime.now() - + self._last_partition_check) return delta_partition_check_time.seconds >= ( self._partition_interval_recheck_secs) def _process_messages(self, partitions): + while 1: if self._is_time_for_repartition_check(): + return delta_flush_time = datetime.now() - self._last_flush + if delta_flush_time.seconds >= self._max_wait_time_secs: + self._flush(partitions) for message in self._consumer: @@ -261,24 +274,21 @@ class AbstractPersister(threading.Thread): data_point = self.process_message(message) - key = data_point['name'] - - if key in self._data_points: - points = data_point['points'] - self._data_points[key]['points'].extend(points) - else: - self._data_points[key] = data_point + self._data_points.append(data_point) self._message_count += 1 if self._is_time_for_repartition_check(): + return except Exception: + LOG.exception('Error processing message. Message is ' 'being dropped. {}'.format(message)) if self._message_count >= self._database_batch_size: + self._flush(partitions) def _get_set_partitioner(self): @@ -308,6 +318,7 @@ class AbstractPersister(threading.Thread): try: set_partitioner = self._get_set_partitioner() + partitions = [] while 1: @@ -347,13 +358,16 @@ class AbstractPersister(threading.Thread): # by this instance of the persister. partitioned_fetch_offsets = {} + for p in partitions: + partitioned_fetch_offsets[p] = ( self._consumer.fetch_offsets[p]) self._consumer.fetch_offsets = partitioned_fetch_offsets self._last_partition_check = datetime.now() + self._process_messages(partitions) elif set_partitioner.allocating: @@ -424,26 +438,26 @@ class AlarmPersister(AbstractPersister): time_stamp = alarm_transitioned['timestamp'] LOG.debug('time stamp: %s', time_stamp) - data = {"points": [[time_stamp, - '{}', - tenant_id.encode('utf8'), - alarm_id.encode('utf8'), - alarm_definition_id.encode('utf8'), - json.dumps(metrics, ensure_ascii=False).encode( - 'utf8'), - old_state.encode('utf8'), - new_state.encode('utf8'), - state_change_reason.encode('utf8')]], - "name": 'alarm_state_history', - "columns": ["time", - "reason_data", - "tenant_id", - "alarm_id", - "alarm_definition_id", - "metrics", - "old_state", - "new_state", - "reason"]} + sub_alarms = alarm_transitioned['subAlarms'] + + ts = time_stamp / 1000.0 + + data = {"name": 'alarm_state_history', + "timestamp": datetime.fromtimestamp(ts, tz=pytz.utc).strftime( + '%Y-%m-%dT%H:%M:%S.%fZ'), + "fields": { + "tenant_id": tenant_id.encode('utf8'), + "alarm_id": alarm_id.encode('utf8'), + "metrics": json.dumps(metrics, ensure_ascii=False).encode('utf8'), + "new_state": new_state.encode('utf8'), + "old_state": old_state.encode('utf8'), + "reason": state_change_reason.encode('utf8'), + "reason_data": state_change_reason.encode('utf8'), + "sub_alarms": json.dumps(sub_alarms, ensure_ascii=False).encode('utf8') + }, + "tags": { + "tenant_id": tenant_id.encode('utf8') + }} LOG.debug(data) @@ -495,26 +509,29 @@ class MetricPersister(AbstractPersister): value = metric['value'] LOG.debug('value: %s', value) - url_encoded_serie_name = ( - urllib.quote(tenant_id.encode('utf8'), safe='') + - '?' + - urllib.quote(region.encode('utf8'), safe='') + - '&' + - urllib.quote(metric_name.encode('utf8'), safe='')) + if 'value_meta' in metric: - for dimension_name in dimensions: - url_encoded_serie_name += ( - '&' + urllib.quote(dimension_name.encode('utf8'), - safe='') + '=' + urllib.quote( - dimensions[dimension_name].encode('utf8'), safe='')) + value_meta = metric['value_meta'] - LOG.debug("url_encoded_serie_name: %s", url_encoded_serie_name) + else: - data = {"points": [[value, - time_stamp]], - "name": url_encoded_serie_name, - "columns": ["value", - "time"]} + value_meta = '' + + LOG.debug('value_meta: %s', value_meta) + + ts = time_stamp / 1000.0 + + data = {"name": metric_name.encode('utf8'), + "timestamp": datetime.fromtimestamp(ts, tz=pytz.utc).strftime( + '%Y-%m-%dT%H:%M:%S.%fZ'), + "fields": { + "value": value, + "value_meta": json.dumps(value_meta, ensure_ascii=False).encode('utf8') + }, + "tags": { + "_tenant_id": tenant_id.encode('utf8'), + "_region": region.encode('utf8') + }} LOG.debug(data) diff --git a/monasca_persister/service.py b/monasca_persister/service.py index 7c372499..3d75f33b 100644 --- a/monasca_persister/service.py +++ b/monasca_persister/service.py @@ -31,4 +31,4 @@ def prepare_service(argv=None): argv = sys.argv cfg.CONF(argv[1:], project='persister') log.setup('persister') - LOG.info('Service has started!') \ No newline at end of file + LOG.info('Service has started!') diff --git a/monasca_persister/servicerunner.py b/monasca_persister/servicerunner.py index a124e68f..be1ab628 100644 --- a/monasca_persister/servicerunner.py +++ b/monasca_persister/servicerunner.py @@ -27,4 +27,4 @@ def main(): if __name__ == "__main__": - sys.exit(main()) \ No newline at end of file + sys.exit(main()) diff --git a/requirements.txt b/requirements.txt index 7d69e3fa..2643cf67 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,6 @@ babel eventlet -influxdb>=0.1.12 +influxdb>=1.0.0 iso8601 kafka-python>=0.9.2,<0.9.3 kazoo>=2.0