Record produce latency and throttling metrics
This commit is contained in:
@@ -4,6 +4,7 @@ import collections
|
||||
import copy
|
||||
import logging
|
||||
import threading
|
||||
import time
|
||||
|
||||
import six
|
||||
|
||||
@@ -145,7 +146,7 @@ class Sender(threading.Thread):
|
||||
log.debug('Sending Produce Request: %r', request)
|
||||
(self._client.send(node_id, request)
|
||||
.add_callback(
|
||||
self._handle_produce_response, batches)
|
||||
self._handle_produce_response, node_id, time.time(), batches)
|
||||
.add_errback(
|
||||
self._failed_produce, batches, node_id))
|
||||
|
||||
@@ -183,7 +184,7 @@ class Sender(threading.Thread):
|
||||
for batch in batches:
|
||||
self._complete_batch(batch, error, -1, None)
|
||||
|
||||
def _handle_produce_response(self, batches, response):
|
||||
def _handle_produce_response(self, node_id, send_time, batches, response):
|
||||
"""Handle a produce response."""
|
||||
# if we have a response, parse it
|
||||
log.debug('Parsing produce response: %r', response)
|
||||
@@ -203,6 +204,10 @@ class Sender(threading.Thread):
|
||||
batch = batches_by_partition[tp]
|
||||
self._complete_batch(batch, error, offset, ts)
|
||||
|
||||
self._sensors.record_latency((time.time() - send_time) * 1000, node=node_id)
|
||||
if response.API_VERSION > 0:
|
||||
self._sensors.record_throttle_time(response.throttle_time_ms, node=node_id)
|
||||
|
||||
else:
|
||||
# this is the acks = 0 case, just complete all requests
|
||||
for batch in batches:
|
||||
@@ -495,8 +500,8 @@ class SenderMetrics(object):
|
||||
|
||||
def record_latency(self, latency, node=None):
|
||||
self.request_time_sensor.record(latency)
|
||||
if node:
|
||||
sensor = self.metrics.get_sensor('node-' + node + '.latency')
|
||||
if node is not None:
|
||||
sensor = self.metrics.get_sensor('node-' + str(node) + '.latency')
|
||||
if sensor:
|
||||
sensor.record(latency)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user