Add initial producer-sender metrics

Dana Powers 2016-07-10 21:36:41 -07:00
parent 20f4c95289
commit c34d138796
5 changed files with 261 additions and 18 deletions

kafka/producer/buffer.py

@@ -1,4 +1,4 @@
from __future__ import absolute_import
from __future__ import absolute_import, division
import collections
import io
@@ -55,6 +55,8 @@ class MessageSetBuffer(object):
self._batch_size = batch_size
self._closed = False
self._messages = 0
self._bytes_written = 4 # Int32 header is 4 bytes
self._final_size = None
def append(self, offset, message):
"""Apend a Message to the MessageSet.
@@ -62,6 +64,8 @@ class MessageSetBuffer(object):
Arguments:
offset (int): offset of the message
message (Message or bytes): message struct or encoded bytes
Returns: the number of bytes written
"""
if isinstance(message, Message):
encoded = message.encode()
@@ -70,6 +74,8 @@ class MessageSetBuffer(object):
msg = Int64.encode(offset) + Int32.encode(len(encoded)) + encoded
self._buffer.write(msg)
self._messages += 1
self._bytes_written += len(msg)
return len(msg)
def has_room_for(self, key, value):
if self._closed:
@@ -107,16 +113,20 @@ class MessageSetBuffer(object):
self._buffer.write(Int32.encode(len(encoded)))
self._buffer.write(encoded)
# Update the message set size, and return ready for full read()
size = self._buffer.tell() - 4
# Update the message set size (less the 4 byte header),
# and return with buffer ready for full read()
self._final_size = self._buffer.tell()
self._buffer.seek(0)
self._buffer.write(Int32.encode(size))
self._buffer.write(Int32.encode(self._final_size - 4))
self._buffer.seek(0)
self._closed = True
def size_in_bytes(self):
return self._buffer.tell()
return self._final_size or self._buffer.tell()
def compression_rate(self):
return self.size_in_bytes() / self._bytes_written
def buffer(self):
return self._buffer
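A minimal sketch, with made-up sizes, of the bookkeeping behind the new compression_rate(): _bytes_written counts the 4-byte Int32 size header plus every entry appended, close() captures the final (possibly compressed) buffer size in _final_size, and the compression rate is simply the ratio of the two.

uncompressed = 4                            # Int32 size header counted up front
for entry in (b'x' * 120, b'y' * 48):       # stand-ins for encoded offset+size+message entries
    uncompressed += len(entry)              # mirrors MessageSetBuffer._bytes_written
final_size = 90                             # hypothetical size after the codec compressed the payload
rate = final_size / uncompressed            # what compression_rate() would return
assert rate < 1.0                           # a value below 1.0 means compression helped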

kafka/producer/kafka.py

@@ -9,6 +9,7 @@ import weakref
from .. import errors as Errors
from ..client_async import KafkaClient
from ..metrics import MetricConfig, Metrics
from ..partitioner.default import DefaultPartitioner
from ..protocol.message import Message, MessageSet
from ..structs import TopicPartition
@@ -220,6 +221,13 @@ class KafkaProducer(object):
api_version_auto_timeout_ms (int): number of milliseconds to throw a
timeout exception from the constructor when checking the broker
api version. Only applies if api_version set to 'auto'
metric_reporters (list): A list of classes to use as metrics reporters.
Implementing the AbstractMetricsReporter interface allows plugging
in classes that will be notified of new metric creation. Default: []
metrics_num_samples (int): The number of samples maintained to compute
metrics. Default: 2
metrics_sample_window_ms (int): The maximum age in milliseconds of
samples used to compute metrics. Default: 30000
Note:
Configuration parameters are described in more detail at
@@ -255,7 +263,10 @@ class KafkaProducer(object):
'ssl_keyfile': None,
'ssl_crlfile': None,
'api_version': None,
'api_version_auto_timeout_ms': 2000
'api_version_auto_timeout_ms': 2000,
'metric_reporters': [],
'metrics_num_samples': 2,
'metrics_sample_window_ms': 30000,
}
def __init__(self, **configs):
@@ -285,6 +296,14 @@ class KafkaProducer(object):
log.warning('use api_version=%s (%s is deprecated)',
str(self.config['api_version']), deprecated)
# Configure metrics
metrics_tags = {'client-id': self.config['client_id']}
metric_config = MetricConfig(samples=self.config['metrics_num_samples'],
time_window_ms=self.config['metrics_sample_window_ms'],
tags=metrics_tags)
reporters = [reporter() for reporter in self.config['metric_reporters']]
self._metrics = Metrics(metric_config, reporters)
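A hedged usage sketch of the three new producer settings from the caller's side. DictReporter is assumed here to be the reporter implementation shipped in kafka.metrics; any class implementing AbstractMetricsReporter is wired up the same way.

from kafka import KafkaProducer
from kafka.metrics import DictReporter   # assumed bundled AbstractMetricsReporter implementation

# assumes a broker is reachable at this address
producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    metric_reporters=[DictReporter],     # classes, not instances; the producer instantiates them
    metrics_num_samples=2,               # samples maintained per metric
    metrics_sample_window_ms=30000,      # maximum age in ms of each sample
)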
client = KafkaClient(**self.config)
# Get auto-discovered version from client if necessary
@@ -298,7 +317,8 @@ class KafkaProducer(object):
self._accumulator = RecordAccumulator(message_version=message_version, **self.config)
self._metadata = client.cluster
guarantee_message_order = bool(self.config['max_in_flight_requests_per_connection'] == 1)
self._sender = Sender(client, self._metadata, self._accumulator,
self._sender = Sender(client, self._metadata,
self._accumulator, self._metrics,
guarantee_message_order=guarantee_message_order,
**self.config)
self._sender.daemon = True
@@ -382,6 +402,7 @@ class KafkaProducer(object):
if not invoked_from_callback:
self._sender.join()
self._metrics.close()
try:
self.config['key_serializer'].close()
except AttributeError:

kafka/producer/record_accumulator.py

@@ -38,7 +38,7 @@ class AtomicInteger(object):
class RecordBatch(object):
def __init__(self, tp, records, message_version=0):
self.record_count = 0
#self.max_record_size = 0 # for metrics only
self.max_record_size = 0
now = time.time()
self.created = now
self.drained = None
@@ -56,8 +56,8 @@ class RecordBatch(object):
return None
msg = Message(value, key=key, magic=self.message_version)
self.records.append(self.record_count, msg)
# self.max_record_size = max(self.max_record_size, Record.record_size(key, value)) # for metrics only
record_size = self.records.append(self.record_count, msg)
self.max_record_size = max(self.max_record_size, record_size)
self.last_append = time.time()
future = FutureRecordMetadata(self.produce_future, self.record_count,
timestamp_ms)
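Sketched with hypothetical sizes, the net effect of the two changes above: MessageSetBuffer.append() now reports how many bytes it wrote, and RecordBatch keeps the largest value seen so the sender can later feed the record-size-max and record-size-avg metrics.

max_record_size = 0
for record_size in (120, 48, 310):       # hypothetical append() return values in bytes
    max_record_size = max(max_record_size, record_size)
assert max_record_size == 310            # the value record-size-max would observe for this batch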

kafka/producer/sender.py

@@ -1,4 +1,4 @@
from __future__ import absolute_import
from __future__ import absolute_import, division
import collections
import copy
@@ -8,9 +8,11 @@ import threading
import six
from .. import errors as Errors
from ..metrics.measurable import AnonMeasurable
from ..metrics.stats import Avg, Count, Max, Rate
from ..protocol.produce import ProduceRequest
from ..structs import TopicPartition
from ..version import __version__
from ..protocol.produce import ProduceRequest
log = logging.getLogger(__name__)
@@ -31,7 +33,7 @@ class Sender(threading.Thread):
'api_version': (0, 8, 0),
}
def __init__(self, client, metadata, accumulator, **configs):
def __init__(self, client, metadata, accumulator, metrics, **configs):
super(Sender, self).__init__()
self.config = copy.copy(self._DEFAULT_CONFIG)
for key in self.config:
@@ -45,6 +47,7 @@ class Sender(threading.Thread):
self._running = True
self._force_close = False
self._topics_to_add = set()
self._sensors = SenderMetrics(metrics, self._client, self._metadata)
def run(self):
"""The main run loop for the sender thread."""
@@ -119,7 +122,10 @@ class Sender(threading.Thread):
expired_batches = self._accumulator.abort_expired_batches(
self.config['request_timeout_ms'], self._metadata)
for expired_batch in expired_batches:
self._sensors.record_errors(expired_batch.topic_partition.topic, expired_batch.record_count)
self._sensors.update_produce_request_metrics(batches_by_node)
requests = self._create_produce_requests(batches_by_node)
# If we have any nodes that are ready to send + have sendable data,
# poll with 0 timeout so this can immediately loop and try sending more
@@ -223,6 +229,7 @@ class Sender(threading.Thread):
self.config['retries'] - batch.attempts - 1,
error)
self._accumulator.reenqueue(batch)
self._sensors.record_retries(batch.topic_partition.topic, batch.record_count)
else:
if error is Errors.TopicAuthorizationFailedError:
error = error(batch.topic_partition.topic)
@@ -230,6 +237,8 @@ class Sender(threading.Thread):
# tell the user the result of their request
batch.done(base_offset, timestamp_ms, error)
self._accumulator.deallocate(batch)
if error is not None:
self._sensors.record_errors(batch.topic_partition.topic, batch.record_count)
if getattr(error, 'invalid_metadata', False):
self._metadata.request_update()
@@ -296,3 +305,200 @@ class Sender(threading.Thread):
def wakeup(self):
"""Wake up the selector associated with this send thread."""
self._client.wakeup()
class SenderMetrics(object):
def __init__(self, metrics, client, metadata):
self.metrics = metrics
self._client = client
self._metadata = metadata
sensor_name = 'batch-size'
self.batch_size_sensor = self.metrics.sensor(sensor_name)
self.add_metric('batch-size-avg', Avg(),
sensor_name=sensor_name,
description='The average number of bytes sent per partition per-request.')
self.add_metric('batch-size-max', Max(),
sensor_name=sensor_name,
description='The max number of bytes sent per partition per-request.')
sensor_name = 'compression-rate'
self.compression_rate_sensor = self.metrics.sensor(sensor_name)
self.add_metric('compression-rate-avg', Avg(),
sensor_name=sensor_name,
description='The average compression rate of record batches.')
sensor_name = 'queue-time'
self.queue_time_sensor = self.metrics.sensor(sensor_name)
self.add_metric('record-queue-time-avg', Avg(),
sensor_name=sensor_name,
description='The average time in ms record batches spent in the record accumulator.')
self.add_metric('record-queue-time-max', Max(),
sensor_name=sensor_name,
description='The maximum time in ms record batches spent in the record accumulator.')
sensor_name = 'request-time'
self.request_time_sensor = self.metrics.sensor(sensor_name)
self.add_metric('request-latency-avg', Avg(),
sensor_name=sensor_name,
description='The average request latency in ms')
self.add_metric('request-latency-max', Max(),
sensor_name=sensor_name,
description='The maximum request latency in ms')
sensor_name = 'produce-throttle-time'
self.produce_throttle_time_sensor = self.metrics.sensor(sensor_name)
self.add_metric('produce-throttle-time-avg', Avg(),
sensor_name=sensor_name,
description='The average throttle time in ms')
self.add_metric('produce-throttle-time-max', Max(),
sensor_name=sensor_name,
description='The maximum throttle time in ms')
sensor_name = 'records-per-request'
self.records_per_request_sensor = self.metrics.sensor(sensor_name)
self.add_metric('record-send-rate', Rate(),
sensor_name=sensor_name,
description='The average number of records sent per second.')
self.add_metric('records-per-request-avg', Avg(),
sensor_name=sensor_name,
description='The average number of records per request.')
sensor_name = 'bytes'
self.byte_rate_sensor = self.metrics.sensor(sensor_name)
self.add_metric('byte-rate', Rate(),
sensor_name=sensor_name,
description='The average number of bytes sent per second.')
sensor_name = 'record-retries'
self.retry_sensor = self.metrics.sensor(sensor_name)
self.add_metric('record-retry-rate', Rate(),
sensor_name=sensor_name,
description='The average per-second number of retried record sends')
sensor_name = 'errors'
self.error_sensor = self.metrics.sensor(sensor_name)
self.add_metric('record-error-rate', Rate(),
sensor_name=sensor_name,
description='The average per-second number of record sends that resulted in errors')
sensor_name = 'record-size-max'
self.max_record_size_sensor = self.metrics.sensor(sensor_name)
self.add_metric('record-size-max', Max(),
sensor_name=sensor_name,
description='The maximum record size across all batches')
self.add_metric('record-size-avg', Avg(),
sensor_name=sensor_name,
description='The average maximum record size per batch')
self.add_metric('requests-in-flight',
AnonMeasurable(lambda *_: self._client.in_flight_request_count()),
description='The current number of in-flight requests awaiting a response.')
self.add_metric('metadata-age',
AnonMeasurable(lambda _, now: (now - self._metadata._last_successful_refresh_ms) / 1000),
description='The age in seconds of the current producer metadata being used.')
def add_metric(self, metric_name, measurable, group_name='producer-metrics',
description=None, tags=None,
sensor_name=None):
m = self.metrics
metric = m.metric_name(metric_name, group_name, description, tags)
if sensor_name:
sensor = m.sensor(sensor_name)
sensor.add(metric, measurable)
else:
m.add_metric(metric, measurable)
def maybe_register_topic_metrics(self, topic):
def sensor_name(name):
return 'topic.{0}.{1}'.format(topic, name)
# if one sensor of the metrics has been registered for the topic,
# then all other sensors should have been registered; and vice versa
if not self.metrics.get_sensor(sensor_name('records-per-batch')):
self.add_metric('record-send-rate', Rate(),
sensor_name=sensor_name('records-per-batch'),
group_name='producer-topic-metrics.' + topic,
description='Records sent per second for topic ' + topic)
self.add_metric('byte-rate', Rate(),
sensor_name=sensor_name('bytes'),
group_name='producer-topic-metrics.' + topic,
description='Bytes per second for topic ' + topic)
self.add_metric('compression-rate', Avg(),
sensor_name=sensor_name('compression-rate'),
group_name='producer-topic-metrics.' + topic,
description='Average compression ratio for topic ' + topic)
self.add_metric('record-retry-rate', Rate(),
sensor_name=sensor_name('record-retries'),
group_name='producer-topic-metrics.' + topic,
description='Record retries per second for topic ' + topic)
self.add_metric('record-error-rate', Rate(),
sensor_name=sensor_name('record-errors'),
group_name='producer-topic-metrics.' + topic,
description='Record errors per second for topic ' + topic)
def update_produce_request_metrics(self, batches_map):
for node_batch in batches_map.values():
records = 0
total_bytes = 0
for batch in node_batch:
# register all per-topic metrics at once
topic = batch.topic_partition.topic
self.maybe_register_topic_metrics(topic)
# per-topic record send rate
topic_records_count = self.metrics.get_sensor(
'topic.' + topic + '.records-per-batch')
topic_records_count.record(batch.record_count)
# per-topic bytes send rate
topic_byte_rate = self.metrics.get_sensor(
'topic.' + topic + '.bytes')
topic_byte_rate.record(batch.records.size_in_bytes())
# per-topic compression rate
topic_compression_rate = self.metrics.get_sensor(
'topic.' + topic + '.compression-rate')
topic_compression_rate.record(batch.records.compression_rate())
# global metrics
self.batch_size_sensor.record(batch.records.size_in_bytes())
if batch.drained:
self.queue_time_sensor.record(batch.drained - batch.created)
self.compression_rate_sensor.record(batch.records.compression_rate())
self.max_record_size_sensor.record(batch.max_record_size)
records += batch.record_count
total_bytes += batch.records.size_in_bytes()
self.records_per_request_sensor.record(records)
self.byte_rate_sensor.record(total_bytes)
def record_retries(self, topic, count):
self.retry_sensor.record(count)
sensor = self.metrics.get_sensor('topic.' + topic + '.record-retries')
if sensor:
sensor.record(count)
def record_errors(self, topic, count):
self.error_sensor.record(count)
sensor = self.metrics.get_sensor('topic.' + topic + '.record-errors')
if sensor:
sensor.record(count)
def record_latency(self, latency, node=None):
self.request_time_sensor.record(latency)
if node is not None:
sensor = self.metrics.get_sensor('node-' + str(node) + '.latency')
if sensor:
sensor.record(latency)
def record_throttle_time(self, throttle_time_ms, node=None):
self.produce_throttle_time_sensor.record(throttle_time_ms)
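SenderMetrics builds on the library's Sensor/Stat machinery: Metrics.sensor() creates (or returns) a named sensor, Sensor.add() attaches a stat such as Avg, Max or Rate under a MetricName, and record() feeds a measurement into every attached stat. A self-contained sketch of that pattern, using illustrative sensor and group names rather than the ones registered above:

from kafka.metrics import Metrics
from kafka.metrics.stats import Avg, Max

metrics = Metrics()
sensor = metrics.sensor('example-batch-size')
sensor.add(metrics.metric_name('batch-size-avg', 'example-metrics',
                               'Average recorded batch size'), Avg())
sensor.add(metrics.metric_name('batch-size-max', 'example-metrics',
                               'Largest recorded batch size'), Max())

for size in (512, 1024, 2048):           # hypothetical batch sizes in bytes
    sensor.record(size)

# dump every registered metric and its current value
for name, metric in metrics.metrics.items():
    print(name.name, metric.value())
metrics.close()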

test/test_sender.py

@@ -7,12 +7,13 @@ import pytest
from kafka.client_async import KafkaClient
from kafka.cluster import ClusterMetadata
from kafka.producer.buffer import MessageSetBuffer
from kafka.producer.sender import Sender
from kafka.producer.record_accumulator import RecordAccumulator, RecordBatch
import kafka.errors as Errors
from kafka.future import Future
from kafka.metrics import Metrics
from kafka.producer.buffer import MessageSetBuffer
from kafka.protocol.produce import ProduceRequest
from kafka.producer.record_accumulator import RecordAccumulator, RecordBatch
from kafka.producer.sender import Sender
from kafka.structs import TopicPartition, OffsetAndMetadata
@@ -29,8 +30,13 @@ def accumulator():
@pytest.fixture
def sender(client, accumulator):
return Sender(client, client.cluster, accumulator)
def metrics():
return Metrics()
@pytest.fixture
def sender(client, accumulator, metrics):
return Sender(client, client.cluster, accumulator, metrics)
@pytest.mark.parametrize(("api_version", "produce_version"), [