Migrate to Influxdb 0.9.0

Change-Id: I0119301dfd9544b68358eb04576b040f1f40ffb2
Deklan Dieterly 2015-05-13 07:44:46 -06:00
parent 244f3ab8dc
commit 77abfc4168
4 changed files with 76 additions and 59 deletions

@@ -33,13 +33,13 @@ import os
 import six
 import sys
 import threading
-import urllib
 
 from influxdb import InfluxDBClient
 from kafka import KafkaClient
 from kafka import SimpleConsumer
 from kazoo.client import KazooClient
 from kazoo.recipe.partitioner import SetPartitioner
+import pytz
 from oslo.config import cfg
 
 from openstack.common import log
@@ -90,6 +90,10 @@ influxdb_group = cfg.OptGroup(name='influxdb', title='influxdb')
 cfg.CONF.register_group(influxdb_group)
 cfg.CONF.register_opts(influxdb_opts, influxdb_group)
 
+cfg.CONF(sys.argv[1:], project='monasca', prog='persister')
+log_levels = (cfg.CONF.default_log_levels)
+cfg.set_defaults(log.log_opts, default_log_levels=log_levels)
+log.setup("monasca-persister")
 
 def main():
     """Start persister.
@@ -97,11 +101,6 @@ def main():
     Start metric persister and alarm persister in separate threads.
 
     """
-    cfg.CONF(args=[], project='monasca', prog='persister')
-    log_levels = (cfg.CONF.default_log_levels)
-    cfg.set_defaults(log.log_opts, default_log_levels=log_levels)
-    log.setup("monasca-persister")
-
     metric_persister = MetricPersister(cfg.CONF.kafka_metrics,
                                        cfg.CONF.influxdb,
                                        cfg.CONF.zookeeper)
@@ -213,7 +212,7 @@ class AbstractPersister(threading.Thread):
         self._kafka_topic = kafka_conf.topic
 
         self._message_count = 0
-        self._data_points = {}
+        self._data_points = []
         self._last_flush = datetime.now()
         self._last_partition_check = datetime.now()
@@ -224,35 +223,49 @@
     def _flush(self, partitions):
         if self._data_points:
             try:
-                self._influxdb_client.write_points(self._data_points.values(), 'ms')
+                self._influxdb_client.write_points(self._data_points, 'ms')
             except Exception:
-                log.exception("Error writing to influxdb: {}"
-                              .format(self._data_points.values()))
+                LOG.exception("Error writing to influxdb: {}"
+                              .format(self._data_points))
                 raise
 
             self._consumer.commit(partitions=partitions)
 
             LOG.info("Processed {} messages from topic '{}'".format(
                 self._message_count, self._kafka_topic))
 
-            self._data_points = {}
+            self._data_points = []
             self._message_count = 0
             self._last_flush = datetime.now()
 
     def _is_time_for_repartition_check(self):
-        delta_partition_check_time = datetime.now() - self._last_partition_check
+        delta_partition_check_time = (datetime.now() -
+                                      self._last_partition_check)
         return delta_partition_check_time.seconds >= (
             self._partition_interval_recheck_secs)
 
     def _process_messages(self, partitions):
         while 1:
             if self._is_time_for_repartition_check():
                 return
 
             delta_flush_time = datetime.now() - self._last_flush
             if delta_flush_time.seconds >= self._max_wait_time_secs:
                 self._flush(partitions)
 
             for message in self._consumer:
@@ -261,24 +274,21 @@ class AbstractPersister(threading.Thread):
                     data_point = self.process_message(message)
 
-                    key = data_point['name']
-
-                    if key in self._data_points:
-                        points = data_point['points']
-                        self._data_points[key]['points'].extend(points)
-                    else:
-                        self._data_points[key] = data_point
-
+                    self._data_points.append(data_point)
                     self._message_count += 1
 
                     if self._is_time_for_repartition_check():
                         return
 
                 except Exception:
                     LOG.exception('Error processing message. Message is '
                                   'being dropped. {}'.format(message))
 
             if self._message_count >= self._database_batch_size:
                 self._flush(partitions)
 
     def _get_set_partitioner(self):
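
For context, a minimal standalone sketch (not part of this diff) of the new write path: each processed Kafka message now yields one point dict in the JSON body format used by the influxdb client pinned below ("name"/"timestamp"/"fields"/"tags"), the points accumulate in a flat list instead of a dict keyed by serie name, and the whole batch is flushed with a single write_points() call. The host, credentials, database name, and sample metric here are placeholders.

from datetime import datetime

import pytz
from influxdb import InfluxDBClient

# Placeholder connection settings; the persister reads the real ones from cfg.CONF.influxdb.
client = InfluxDBClient('localhost', 8086, 'mon_persister', 'password', 'mon')

batch = []  # flat list of points, one dict per consumed message

batch.append({"name": "cpu.idle_perc",
              "timestamp": datetime.fromtimestamp(1431524686.0, tz=pytz.utc).strftime(
                  '%Y-%m-%dT%H:%M:%S.%fZ'),
              "fields": {"value": 97.5},
              "tags": {"_tenant_id": "abc123", "_region": "region-a"}})

try:
    # Same call shape as _flush() above; needs a reachable InfluxDB 0.9 endpoint.
    client.write_points(batch, 'ms')
except Exception:
    pass  # illustration only
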
@@ -308,6 +318,7 @@ class AbstractPersister(threading.Thread):
         try:
             set_partitioner = self._get_set_partitioner()
             partitions = []
 
             while 1:
@@ -347,13 +358,16 @@ class AbstractPersister(threading.Thread):
                     # by this instance of the persister.
                     partitioned_fetch_offsets = {}
                     for p in partitions:
                         partitioned_fetch_offsets[p] = (
                             self._consumer.fetch_offsets[p])
 
                     self._consumer.fetch_offsets = partitioned_fetch_offsets
 
                     self._last_partition_check = datetime.now()
                     self._process_messages(partitions)
 
                 elif set_partitioner.allocating:
@@ -424,26 +438,26 @@ class AlarmPersister(AbstractPersister):
         time_stamp = alarm_transitioned['timestamp']
         LOG.debug('time stamp: %s', time_stamp)
 
-        data = {"points": [[time_stamp,
-                            '{}',
-                            tenant_id.encode('utf8'),
-                            alarm_id.encode('utf8'),
-                            alarm_definition_id.encode('utf8'),
-                            json.dumps(metrics, ensure_ascii=False).encode(
-                                'utf8'),
-                            old_state.encode('utf8'),
-                            new_state.encode('utf8'),
-                            state_change_reason.encode('utf8')]],
-                "name": 'alarm_state_history',
-                "columns": ["time",
-                            "reason_data",
-                            "tenant_id",
-                            "alarm_id",
-                            "alarm_definition_id",
-                            "metrics",
-                            "old_state",
-                            "new_state",
-                            "reason"]}
+        sub_alarms = alarm_transitioned['subAlarms']
+
+        ts = time_stamp / 1000.0
+
+        data = {"name": 'alarm_state_history',
+                "timestamp": datetime.fromtimestamp(ts, tz=pytz.utc).strftime(
+                    '%Y-%m-%dT%H:%M:%S.%fZ'),
+                "fields": {
+                    "tenant_id": tenant_id.encode('utf8'),
+                    "alarm_id": alarm_id.encode('utf8'),
+                    "metrics": json.dumps(metrics, ensure_ascii=False).encode('utf8'),
+                    "new_state": new_state.encode('utf8'),
+                    "old_state": old_state.encode('utf8'),
+                    "reason": state_change_reason.encode('utf8'),
+                    "reason_data": state_change_reason.encode('utf8'),
+                    "sub_alarms": json.dumps(sub_alarms, ensure_ascii=False).encode('utf8')
+                },
+                "tags": {
+                    "tenant_id": tenant_id.encode('utf8')
+                }}
 
         LOG.debug(data)
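
For illustration only, a hypothetical OK-to-ALARM transition would now produce a point shaped like the sketch below (all values invented); the positional "columns"/"points" layout of the 0.8 schema is replaced by named "fields" plus a "tags" map.

import json

# Stand-ins for the values decoded from the Kafka message above.
metrics = [{"name": "cpu.idle_perc", "dimensions": {"hostname": "host-1"}}]
sub_alarms = []

example_alarm_point = {
    "name": "alarm_state_history",
    "timestamp": "2015-05-13T13:44:46.000000Z",
    "fields": {
        "tenant_id": "abc123",
        "alarm_id": "alarm-0001",
        "metrics": json.dumps(metrics, ensure_ascii=False),
        "new_state": "ALARM",
        "old_state": "OK",
        "reason": "Threshold exceeded",
        "reason_data": "Threshold exceeded",
        "sub_alarms": json.dumps(sub_alarms, ensure_ascii=False),
    },
    "tags": {"tenant_id": "abc123"},
}

print(example_alarm_point)
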
@@ -495,26 +509,29 @@ class MetricPersister(AbstractPersister):
         value = metric['value']
         LOG.debug('value: %s', value)
 
-        url_encoded_serie_name = (
-            urllib.quote(tenant_id.encode('utf8'), safe='') +
-            '?' +
-            urllib.quote(region.encode('utf8'), safe='') +
-            '&' +
-            urllib.quote(metric_name.encode('utf8'), safe=''))
-
-        for dimension_name in dimensions:
-            url_encoded_serie_name += (
-                '&' + urllib.quote(dimension_name.encode('utf8'),
-                                   safe='') + '=' + urllib.quote(
-                    dimensions[dimension_name].encode('utf8'), safe=''))
-
-        LOG.debug("url_encoded_serie_name: %s", url_encoded_serie_name)
-
-        data = {"points": [[value,
-                            time_stamp]],
-                "name": url_encoded_serie_name,
-                "columns": ["value",
-                            "time"]}
+        if 'value_meta' in metric:
+            value_meta = metric['value_meta']
+        else:
+            value_meta = ''
+
+        LOG.debug('value_meta: %s', value_meta)
+
+        ts = time_stamp / 1000.0
+
+        data = {"name": metric_name.encode('utf8'),
+                "timestamp": datetime.fromtimestamp(ts, tz=pytz.utc).strftime(
+                    '%Y-%m-%dT%H:%M:%S.%fZ'),
+                "fields": {
+                    "value": value,
+                    "value_meta": json.dumps(value_meta, ensure_ascii=False).encode('utf8')
+                },
+                "tags": {
+                    "_tenant_id": tenant_id.encode('utf8'),
+                    "_region": region.encode('utf8')
+                }}
 
         LOG.debug(data)
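
To make the schema migration concrete, here is a hedged before/after sketch for one hypothetical measurement: under 0.8 the tenant, region, and dimensions were URL-encoded into the serie name and the values rode in "columns"/"points"; under 0.9 the measurement keeps its plain name while tenant and region move into "tags" (all sample values below are invented).

# Hypothetical sample: tenant "abc123", region "region-a", metric cpu.idle_perc
# with dimension hostname=host-1, value 97.5 at epoch-millis 1431524686000.

# Old InfluxDB 0.8-style payload (identity packed into the serie name).
old_0_8_point = {"points": [[97.5, 1431524686000]],
                 "name": "abc123?region-a&cpu.idle_perc&hostname=host-1",
                 "columns": ["value", "time"]}

# New InfluxDB 0.9-style payload, the shape the code above now builds.
new_0_9_point = {"name": "cpu.idle_perc",
                 "timestamp": "2015-05-13T13:44:46.000000Z",
                 "fields": {"value": 97.5,
                            "value_meta": '""'},  # json.dumps('') when no value_meta is present
                 "tags": {"_tenant_id": "abc123", "_region": "region-a"}}

print(old_0_8_point)
print(new_0_9_point)
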

@@ -31,4 +31,4 @@ def prepare_service(argv=None):
         argv = sys.argv
     cfg.CONF(argv[1:], project='persister')
     log.setup('persister')
-    LOG.info('Service has started!')
+    LOG.info('Service has started!')

@@ -27,4 +27,4 @@ def main():
 
 
 if __name__ == "__main__":
-    sys.exit(main())
+    sys.exit(main())

@@ -1,6 +1,6 @@
 babel
 eventlet
-influxdb>=0.1.12
+influxdb>=1.0.0
 iso8601
 kafka-python>=0.9.2,<0.9.3
 kazoo>=2.0
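
Optionally, as a quick environment sanity check (not part of this change), the installed client can be verified against the new floor before starting the persister:

import pkg_resources

# Raises pkg_resources.VersionConflict (or DistributionNotFound) if the
# environment still carries the old 0.8-era client.
pkg_resources.require("influxdb>=1.0.0")
print(pkg_resources.get_distribution("influxdb").version)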