Python persister batches data points with common name
Removed some checked in .pyc files Change-Id: I39896e4669ca98378d1d6c5825405fe24e667093
This commit is contained in:
parent
73b81916c8
commit
fc49e2aad3
@ -212,7 +212,8 @@ class AbstractPersister(threading.Thread):
|
||||
self._database_batch_size = kafka_conf.database_batch_size
|
||||
self._kafka_topic = kafka_conf.topic
|
||||
|
||||
self._json_body = []
|
||||
self._message_count = 0
|
||||
self._data_points = {}
|
||||
self._last_flush = datetime.now()
|
||||
self._last_partition_check = datetime.now()
|
||||
|
||||
@ -222,12 +223,19 @@ class AbstractPersister(threading.Thread):
|
||||
|
||||
def _flush(self, partitions):
|
||||
|
||||
if self._json_body:
|
||||
self._influxdb_client.write_points(self._json_body)
|
||||
if self._data_points:
|
||||
try:
|
||||
self._influxdb_client.write_points(self._data_points.values())
|
||||
except Exception:
|
||||
log.exception("Error writing to influxdb: {}"
|
||||
.format(self._data_points.values()))
|
||||
raise
|
||||
|
||||
self._consumer.commit(partitions=partitions)
|
||||
LOG.info("Processed {} messages from topic '{}'".format(
|
||||
len(self._json_body), self._kafka_topic))
|
||||
self._json_body = []
|
||||
self._message_count, self._kafka_topic))
|
||||
self._data_points = {}
|
||||
self._message_count = 0
|
||||
self._last_flush = datetime.now()
|
||||
|
||||
def _is_time_for_repartition_check(self):
|
||||
@ -238,7 +246,6 @@ class AbstractPersister(threading.Thread):
|
||||
self._partition_interval_recheck_secs)
|
||||
|
||||
def _process_messages(self, partitions):
|
||||
|
||||
while 1:
|
||||
|
||||
if self._is_time_for_repartition_check():
|
||||
@ -252,7 +259,17 @@ class AbstractPersister(threading.Thread):
|
||||
|
||||
try:
|
||||
|
||||
self._json_body.append(self.process_message(message))
|
||||
data_point = self.process_message(message)
|
||||
|
||||
key = data_point['name']
|
||||
|
||||
if key in self._data_points:
|
||||
points = data_point['points']
|
||||
self._data_points[key]['points'].extend(points)
|
||||
else:
|
||||
self._data_points[key] = data_point
|
||||
|
||||
self._message_count += 1
|
||||
|
||||
if self._is_time_for_repartition_check():
|
||||
return
|
||||
@ -261,7 +278,7 @@ class AbstractPersister(threading.Thread):
|
||||
LOG.exception('Error processing message. Message is '
|
||||
'being dropped. {}'.format(message))
|
||||
|
||||
if len(self._json_body) >= self._database_batch_size:
|
||||
if self._message_count >= self._database_batch_size:
|
||||
self._flush(partitions)
|
||||
|
||||
def _get_set_partitioner(self):
|
||||
|
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Loading…
x
Reference in New Issue
Block a user