Increase throughput of persister

Add batching to InfluxDB writes to improve throughput.

Change-Id: Ia173f55726cb11245f0bcf4580f1af8129c23aa3
Deklan Dieterly 2014-10-02 08:33:19 -06:00
parent a58acce9b4
commit d3acf12034
6 changed files with 164 additions and 89 deletions
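The change replaces per-message InfluxDB writes with a buffered batch that is flushed either when alarm_batch_size / metrics_batch_size messages have accumulated or when alarm_max_wait_time_seconds / metrics_max_wait_time_seconds have elapsed since the last flush; Kafka offsets are committed only after a successful write, which is why auto_commit is turned off. A minimal sketch of that pattern, assuming hypothetical consumer, write_points, and commit callables that stand in for the kafka-python consumer and InfluxDBClient used in the diff:

from datetime import datetime

BATCH_SIZE = 1000        # mirrors alarm_batch_size / metrics_batch_size
MAX_WAIT_SECONDS = 30    # mirrors *_max_wait_time_seconds

def run(consumer, write_points, commit):
    # Buffer decoded messages and flush them in batches, either when the
    # batch is full or when the batch has been waiting too long.
    json_body = []
    count = 0
    last_flush = datetime.now()
    while True:
        # Time-based flush: do not let a partial batch sit around forever.
        if (datetime.now() - last_flush).seconds > MAX_WAIT_SECONDS and json_body:
            write_points(json_body)
            commit()
            json_body, count, last_flush = [], 0, datetime.now()
        # Size-based flush inside the consume loop; the consumer iterator is
        # expected to time out (iter_timeout=1 in the diff) so control returns
        # to the time-based check above even when the topic is idle.
        for message in consumer:
            json_body.append(message)  # the real code appends a decoded InfluxDB point
            count += 1
            if count % BATCH_SIZE == 0:
                write_points(json_body)
                commit()
                json_body, count, last_flush = [], 0, datetime.now()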

.gitignore

@@ -5,3 +5,4 @@ target/
debs/
logs/
.idea/
*.pyc

@@ -10,10 +10,14 @@ alarm_history_group_id = 1_alarm-state-transitions
alarm_history_topic = alarm-state-transitions
alarm_history_consumer_id = 1
alarm_history_client_id = 1
alarm_batch_size = 1000
alarm_max_wait_time_seconds = 30
metrics_group_id = 1_metrics
metrics_topic = metrics
metrics_consumer_id = 1
metrics_client_id = 1
metrics_batch_size = 1000
metrics_max_wait_time_seconds = 30
[influxdb]
database_name = test

@@ -23,19 +23,22 @@
Also able to use Openstack service to start the persister.
"""
import threading
from kafka import KafkaClient, SimpleConsumer
from influxdb import InfluxDBClient
from datetime import datetime
import json
import urllib
import sys
import threading
import urllib
from influxdb import InfluxDBClient
from kafka import KafkaClient
from kafka import SimpleConsumer
from oslo.config import cfg
from openstack.common import log
from openstack.common import service as os_service
import service
LOG = log.getLogger(__name__)
kafka_opts = [
@@ -44,10 +47,14 @@ kafka_opts = [
cfg.StrOpt('alarm_history_topic'),
cfg.StrOpt('alarm_history_consumer_id'),
cfg.StrOpt('alarm_history_client_id'),
cfg.IntOpt('alarm_batch_size'),
cfg.IntOpt('alarm_max_wait_time_seconds'),
cfg.StrOpt('metrics_group_id'),
cfg.StrOpt('metrics_topic'),
cfg.StrOpt('metrics_consumer_id'),
cfg.StrOpt('metrics_client_id')
cfg.StrOpt('metrics_client_id'),
cfg.IntOpt('metrics_batch_size'),
cfg.IntOpt('metrics_max_wait_time_seconds')
]
kafka_group = cfg.OptGroup(name='kafka',
@@ -108,25 +115,41 @@ class AlarmPersister(threading.Thread):
def run(self):
count = 0
json_body = []
last_flush = datetime.now()
try:
kafka = KafkaClient(self.conf.kafka.uri)
consumer = SimpleConsumer(kafka,
self.conf.kafka.alarm_history_group_id,
self.conf.kafka.alarm_history_topic,
auto_commit=True)
auto_commit=False, iter_timeout=1)
influxdb_client = InfluxDBClient(self.conf.influxdb.ip_address,
self.conf.influxdb.port,
self.conf.influxdb.user,
self.conf.influxdb.password,
self.conf.influxdb.database_name)
while (True):
delta_time = datetime.now() - last_flush
if (delta_time.seconds >
self.conf.kafka.alarm_max_wait_time_seconds):
if json_body:
influxdb_client.write_points(json_body)
consumer.commit()
last_flush = datetime.now()
count = 0
json_body = []
for message in consumer:
LOG.debug(message.message.value.decode('utf8'))
decoded = json.loads(message.message.value)
LOG.debug(json.dumps(decoded, sort_keys=True, indent=4))
LOG.debug(
json.dumps(decoded, sort_keys=True, indent=4))
actions_enabled = decoded['alarm-transitioned'][
'actionsEnabled']
@@ -139,6 +162,13 @@ class AlarmPersister(threading.Thread):
alarm_id = decoded['alarm-transitioned']['alarmId']
LOG.debug('alarm id: %s', alarm_id)
alarm_definition_id = decoded['alarm-transitioned'][
'alarmDefinitionId']
LOG.debug('alarm definition id: %s', alarm_definition_id)
metrics = decoded['alarm-transitioned']['metrics']
LOG.debug('metrics: %s', metrics)
alarm_name = decoded['alarm-transitioned']['alarmName']
LOG.debug('alarm name: %s', alarm_name)
@@ -148,9 +178,10 @@
old_state = decoded['alarm-transitioned']['oldState']
LOG.debug('old state: %s', old_state)
state_changeReason = decoded['alarm-transitioned'][
state_change_reason = decoded['alarm-transitioned'][
'stateChangeReason']
LOG.debug('state change reason: %s', state_changeReason)
LOG.debug('state change reason: %s',
state_change_reason)
tenant_id = decoded['alarm-transitioned']['tenantId']
LOG.debug('tenant id: %s', tenant_id)
@@ -158,18 +189,30 @@
time_stamp = decoded['alarm-transitioned']['timestamp']
LOG.debug('time stamp: %s', time_stamp)
json_body = [
{"points": [
data = {"points": [
[time_stamp, '{}', tenant_id.encode('utf8'),
alarm_id.encode('utf8'), old_state.encode('utf8'),
alarm_id.encode('utf8'),
alarm_definition_id.encode('utf8'),
json.dumps(metrics).encode('utf8'), old_state.encode('utf8'),
new_state.encode('utf8'),
state_changeReason.encode('utf8')]],
state_change_reason.encode('utf8')]],
"name": 'alarm_state_history',
"columns": ["time", "reason_data", "tenant_id",
"alarm_id", "old_state", "new_state",
"reason"]}]
"alarm_id", "alarm_definition_id",
"metrics", "old_state",
"new_state",
"reason"]}
LOG.debug(data)
json_body.append(data)
count += 1
if count % self.conf.kafka.alarm_batch_size == 0:
influxdb_client.write_points(json_body)
consumer.commit()
last_flush = datetime.now()
count = 0
json_body = []
except Exception:
LOG.exception(
@@ -187,13 +230,17 @@ class MetricPersister(threading.Thread):
def run(self):
count = 0
json_body = []
last_flush = datetime.now()
try:
kafka = KafkaClient(self.conf.kafka.uri)
consumer = SimpleConsumer(kafka,
self.conf.kafka.metrics_group_id,
self.conf.kafka.metrics_topic,
auto_commit=True)
auto_commit=False, iter_timeout=1)
influxdb_client = InfluxDBClient(self.conf.influxdb.ip_address,
self.conf.influxdb.port,
@@ -201,11 +248,24 @@ class MetricPersister(threading.Thread):
self.conf.influxdb.password,
self.conf.influxdb.database_name)
while (True):
delta_time = datetime.now() - last_flush
if (delta_time.seconds >
self.conf.kafka.metrics_max_wait_time_seconds):
if json_body:
influxdb_client.write_points(json_body)
consumer.commit()
last_flush = datetime.now()
count = 0
json_body = []
for message in consumer:
LOG.debug(message.message.value.decode('utf8'))
decoded = json.loads(message.message.value)
LOG.debug(json.dumps(decoded, sort_keys=True, indent=4))
LOG.debug(
json.dumps(decoded, sort_keys=True, indent=4))
metric_name = decoded['metric']['name']
LOG.debug('name: %s', metric_name)
@@ -221,9 +281,11 @@ class MetricPersister(threading.Thread):
dimensions = {}
if 'dimensions' in decoded['metric']:
for dimension_name in decoded['metric']['dimensions']:
for dimension_name in decoded['metric'][
'dimensions']:
dimensions[dimension_name] = (
decoded['metric']['dimensions'][dimension_name])
decoded['metric']['dimensions'][
dimension_name])
LOG.debug('dimension: %s : %s', dimension_name,
dimensions[dimension_name])
@@ -235,8 +297,10 @@ class MetricPersister(threading.Thread):
url_encoded_serie_name = (
urllib.quote(metric_name.encode('utf8'), safe='')
+ '?' + urllib.quote(tenant_id.encode('utf8'), safe='')
+ '&' + urllib.quote(region.encode('utf8'), safe=''))
+ '?' + urllib.quote(tenant_id.encode('utf8'),
safe='')
+ '&' + urllib.quote(region.encode('utf8'),
safe=''))
for dimension_name in dimensions:
url_encoded_serie_name += ('&'
@@ -244,20 +308,26 @@ class MetricPersister(threading.Thread):
dimension_name.encode('utf8'), safe='')
+ '='
+ urllib.quote(
dimensions[dimension_name].encode('utf8'), safe=''))
dimensions[dimension_name].encode('utf8'),
safe=''))
LOG.debug("url_encoded_serie_name: %s", url_encoded_serie_name)
LOG.debug("url_encoded_serie_name: %s",
url_encoded_serie_name)
json_body = [
{"points": [[value, time_stamp]],
data = {"points": [[value, time_stamp]],
"name": url_encoded_serie_name,
"columns": ["value", "time"]}]
"columns": ["value", "time"]}
LOG.debug(json_body)
LOG.debug(data)
json_body.append(data)
count += 1
if count % self.conf.kafka.metrics_batch_size == 0:
influxdb_client.write_points(json_body)
consumer.commit()
last_flush = datetime.now()
count = 0
json_body = []
except Exception:
LOG.exception(
@@ -277,4 +347,3 @@ def main_service():
# Used if run without Openstack service.
if __name__ == "__main__":
sys.exit(main())
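For context, both threads build the batch in the JSON shape taken by the 0.8-era influxdb-python client: a list of series dicts, each with a name, columns, and points. A purely illustrative metrics batch (series names and values are made up; only the structure mirrors what the persister hands to write_points):

# Illustrative payload only; the real series names are url-encoded from
# metric name, tenant id, region, and dimensions as shown in the diff.
json_body = [
    {"name": "cpu.idle_perc?tenant&region",
     "columns": ["value", "time"],
     "points": [[97.0, 1412260399000]]},
    {"name": "mem.free_mb?tenant&region",
     "columns": ["value", "time"],
     "points": [[1024, 1412260399000]]},
]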

@@ -18,6 +18,7 @@
"""
import sys
from persister import main_service
@@ -27,4 +28,3 @@ def main():
if __name__ == "__main__":
sys.exit(main())

@@ -29,3 +29,4 @@ max-line-length = 120
[wheel]
universal = 1

@@ -28,5 +28,5 @@ max-line-length = 120
# H307 like imports should be grouped together
# H405 multi line docstring summary not separated with an empty line
# H904 Wrap long lines in parentheses instead of a backslash
ignore = F821,H201,H302,H305,H307,H405,H904
ignore = F821,H201,H302,H305,H307,H405,H904,E126,E125,H306,E302,E122
exclude=.venv,.git,.tox,dist,*openstack/common*,*egg,build