instrument metrics for fetch requests

Author: Zack Dever
Date:   2016-04-13 13:52:36 -07:00
Parent: 81dc89a4fd
Commit: e2b340c440

3 changed files with 88 additions and 59 deletions

diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py

@@ -3,11 +3,13 @@ from __future__ import absolute_import
 import collections
 import copy
 import logging
+import time
 
 import six
 
 import kafka.errors as Errors
 from kafka.future import Future
+from kafka.metrics.stats import Avg, Count, Max, Rate
 from kafka.protocol.fetch import FetchRequest
 from kafka.protocol.message import PartialMessage
 from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy
@@ -40,7 +42,8 @@ class Fetcher(six.Iterator):
         'api_version': (0, 8, 0),
     }
 
-    def __init__(self, client, subscriptions, **configs):
+    def __init__(self, client, subscriptions, metrics, metric_group_prefix,
+                 **configs):
         """Initialize a Kafka Message Fetcher.
 
         Keyword Arguments:
@@ -68,8 +71,6 @@ class Fetcher(six.Iterator):
                 the messages occurred. This check adds some overhead, so it may
                 be disabled in cases seeking extreme performance. Default: True
         """
-        #metrics=None,
-        #metric_group_prefix='consumer',
        self.config = copy.copy(self.DEFAULT_CONFIG)
        for key in self.config:
            if key in configs:
@@ -83,8 +84,7 @@ class Fetcher(six.Iterator):
         self._record_too_large_partitions = dict() # {topic_partition: offset}
         self._iterator = None
         self._fetch_futures = collections.deque()
-
-        #self.sensors = FetchManagerMetrics(metrics, metric_group_prefix)
+        self._sensors = FetchManagerMetrics(metrics, metric_group_prefix)
 
     def init_fetches(self):
         """Send FetchRequests asynchronously for all assigned partitions.
@@ -109,7 +109,7 @@ class Fetcher(six.Iterator):
             if self._client.ready(node_id):
                 log.debug("Sending FetchRequest to node %s", node_id)
                 future = self._client.send(node_id, request)
-                future.add_callback(self._handle_fetch_response, request)
+                future.add_callback(self._handle_fetch_response, request, time.time())
                 future.add_errback(log.error, 'Fetch to node %s failed: %s', node_id)
                 futures.append(future)
         self._fetch_futures.extend(futures)
@@ -575,10 +575,11 @@ class Fetcher(six.Iterator):
                                                 partition_data.items())
         return requests
 
-    def _handle_fetch_response(self, request, response):
+    def _handle_fetch_response(self, request, send_time, response):
         """The callback for fetch completion"""
-        #total_bytes = 0
-        #total_count = 0
+        total_bytes = 0
+        total_count = 0
+        recv_time = time.time()
 
         fetch_offsets = {}
         for topic, partitions in request.topics:
@@ -609,6 +610,7 @@ class Fetcher(six.Iterator):
                                   position)
                         continue
 
+                    num_bytes = 0
                     partial = None
                     if messages and isinstance(messages[-1][-1], PartialMessage):
                         partial = messages.pop()
@@ -618,18 +620,18 @@ class Fetcher(six.Iterator):
                                   " offset %d to buffered record list", tp,
                                   position)
                         self._records.append((fetch_offset, tp, messages))
-                        #last_offset, _, _ = messages[-1]
-                        #self.sensors.records_fetch_lag.record(highwater - last_offset)
+                        last_offset, _, _ = messages[-1]
+                        self._sensors.records_fetch_lag.record(highwater - last_offset)
+                        num_bytes = sum(msg[1] for msg in messages)
                     elif partial:
                         # we did not read a single message from a non-empty
                         # buffer because that message's size is larger than
                         # fetch size, in this case record this exception
                         self._record_too_large_partitions[tp] = fetch_offset
 
-                    # TODO: bytes metrics
-                    #self.sensors.record_topic_fetch_metrics(tp.topic, num_bytes, parsed.size());
-                    #totalBytes += num_bytes;
-                    #totalCount += parsed.size();
+                    self._sensors.record_topic_fetch_metrics(topic, num_bytes, len(messages))
+                    total_bytes += num_bytes
+                    total_count += len(messages)
                 elif error_type in (Errors.NotLeaderForPartitionError,
                                     Errors.UnknownTopicOrPartitionError):
                     self._client.cluster.request_update()
@@ -649,56 +651,82 @@ class Fetcher(six.Iterator):
             else:
                 raise error_type('Unexpected error while fetching data')
 
-        """TOOD - metrics
-        self.sensors.bytesFetched.record(totalBytes)
-        self.sensors.recordsFetched.record(totalCount)
-        self.sensors.fetchThrottleTimeSensor.record(response.getThrottleTime())
-        self.sensors.fetchLatency.record(resp.requestLatencyMs())
+        self._sensors.bytes_fetched.record(total_bytes)
+        self._sensors.records_fetched.record(total_count)
+        self._sensors.fetch_throttle_time_sensor.record(response['throttle_time_ms'])
+        self._sensors.fetch_latency.record((recv_time - send_time) * 1000)
 
 
 class FetchManagerMetrics(object):
     def __init__(self, metrics, prefix):
         self.metrics = metrics
-        self.group_name = prefix + "-fetch-manager-metrics"
+        self.group_name = '%s-fetch-manager-metrics' % prefix
 
-        self.bytes_fetched = metrics.sensor("bytes-fetched")
-        self.bytes_fetched.add(metrics.metricName("fetch-size-avg", self.group_name,
-            "The average number of bytes fetched per request"), metrics.Avg())
-        self.bytes_fetched.add(metrics.metricName("fetch-size-max", self.group_name,
-            "The maximum number of bytes fetched per request"), metrics.Max())
-        self.bytes_fetched.add(metrics.metricName("bytes-consumed-rate", self.group_name,
-            "The average number of bytes consumed per second"), metrics.Rate())
+        self.bytes_fetched = metrics.sensor('bytes-fetched')
+        self.bytes_fetched.add(metrics.metric_name('fetch-size-avg', self.group_name,
+            'The average number of bytes fetched per request'), Avg())
+        self.bytes_fetched.add(metrics.metric_name('fetch-size-max', self.group_name,
+            'The maximum number of bytes fetched per request'), Max())
+        self.bytes_fetched.add(metrics.metric_name('bytes-consumed-rate', self.group_name,
+            'The average number of bytes consumed per second'), Rate())
 
-        self.records_fetched = self.metrics.sensor("records-fetched")
-        self.records_fetched.add(metrics.metricName("records-per-request-avg", self.group_name,
-            "The average number of records in each request"), metrics.Avg())
-        self.records_fetched.add(metrics.metricName("records-consumed-rate", self.group_name,
-            "The average number of records consumed per second"), metrics.Rate())
+        self.records_fetched = self.metrics.sensor('records-fetched')
+        self.records_fetched.add(metrics.metric_name('records-per-request-avg', self.group_name,
+            'The average number of records in each request'), Avg())
+        self.records_fetched.add(metrics.metric_name('records-consumed-rate', self.group_name,
+            'The average number of records consumed per second'), Rate())
 
-        self.fetch_latency = metrics.sensor("fetch-latency")
-        self.fetch_latency.add(metrics.metricName("fetch-latency-avg", self.group_name,
-            "The average time taken for a fetch request."), metrics.Avg())
-        self.fetch_latency.add(metrics.metricName("fetch-latency-max", self.group_name,
-            "The max time taken for any fetch request."), metrics.Max())
-        self.fetch_latency.add(metrics.metricName("fetch-rate", self.group_name,
-            "The number of fetch requests per second."), metrics.Rate(sampled_stat=metrics.Count()))
+        self.fetch_latency = metrics.sensor('fetch-latency')
+        self.fetch_latency.add(metrics.metric_name('fetch-latency-avg', self.group_name,
+            'The average time taken for a fetch request.'), Avg())
+        self.fetch_latency.add(metrics.metric_name('fetch-latency-max', self.group_name,
+            'The max time taken for any fetch request.'), Max())
+        self.fetch_latency.add(metrics.metric_name('fetch-rate', self.group_name,
+            'The number of fetch requests per second.'), Rate(sampled_stat=Count()))
 
-        self.records_fetch_lag = metrics.sensor("records-lag")
-        self.records_fetch_lag.add(metrics.metricName("records-lag-max", self.group_name,
-            "The maximum lag in terms of number of records for any partition in self window"), metrics.Max())
+        self.records_fetch_lag = metrics.sensor('records-lag')
+        self.records_fetch_lag.add(metrics.metric_name('records-lag-max', self.group_name,
+            'The maximum lag in terms of number of records for any partition in self window'), Max())
 
-        self.fetch_throttle_time_sensor = metrics.sensor("fetch-throttle-time")
-        self.fetch_throttle_time_sensor.add(metrics.metricName("fetch-throttle-time-avg", self.group_name,
-            "The average throttle time in ms"), metrics.Avg())
-        self.fetch_throttle_time_sensor.add(metrics.metricName("fetch-throttle-time-max", self.group_name,
-            "The maximum throttle time in ms"), metrics.Max())
+        self.fetch_throttle_time_sensor = metrics.sensor('fetch-throttle-time')
+        self.fetch_throttle_time_sensor.add(metrics.metric_name('fetch-throttle-time-avg', self.group_name,
+            'The average throttle time in ms'), Avg())
+        self.fetch_throttle_time_sensor.add(metrics.metric_name('fetch-throttle-time-max', self.group_name,
+            'The maximum throttle time in ms'), Max())
 
-    def record_topic_fetch_metrics(topic, num_bytes, num_records):
-        # record bytes fetched
-        name = '.'.join(["topic", topic, "bytes-fetched"])
-        self.metrics[name].record(num_bytes);
+    def record_topic_fetch_metrics(self, topic, num_bytes, num_records):
+        metric_tags = {'topic': topic.replace('.', '_')}
 
-        # record records fetched
-        name = '.'.join(["topic", topic, "records-fetched"])
-        self.metrics[name].record(num_records)
-        """
+        # record bytes fetched
+        name = '.'.join(['topic', topic, 'bytes-fetched'])
+        bytes_fetched = self.metrics.get_sensor(name)
+        if not bytes_fetched:
+            bytes_fetched = self.metrics.sensor(name)
+            bytes_fetched.add(self.metrics.metric_name('fetch-size-avg',
+                    self.group_name,
+                    'The average number of bytes fetched per request for topic %s' % topic,
+                    metric_tags), Avg())
+            bytes_fetched.add(self.metrics.metric_name('fetch-size-max',
+                    self.group_name,
+                    'The maximum number of bytes fetched per request for topic %s' % topic,
+                    metric_tags), Max())
+            bytes_fetched.add(self.metrics.metric_name('bytes-consumed-rate',
+                    self.group_name,
+                    'The average number of bytes consumed per second for topic %s' % topic,
+                    metric_tags), Rate())
+        bytes_fetched.record(num_bytes)
+
+        # record records fetched
+        name = '.'.join(['topic', topic, 'records-fetched'])
+        records_fetched = self.metrics.get_sensor(name)
+        if not records_fetched:
+            records_fetched = self.metrics.sensor(name)
+            records_fetched.add(self.metrics.metric_name('records-per-request-avg',
+                    self.group_name,
+                    'The average number of records in each request for topic %s' % topic,
+                    metric_tags), Avg())
+            records_fetched.add(self.metrics.metric_name('records-consumed-rate',
+                    self.group_name,
+                    'The average number of records consumed per second for topic %s' % topic,
+                    metric_tags), Rate())
+        records_fetched.record(num_records)
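
Aside (not part of the commit): the new FetchManagerMetrics can be exercised in isolation, since it depends only on the kafka.metrics API already imported above. A minimal sketch; the topic name and recorded numbers are made up for illustration:

    from kafka.consumer.fetcher import FetchManagerMetrics
    from kafka.metrics import Metrics

    metrics = Metrics()
    sensors = FetchManagerMetrics(metrics, 'consumer')

    # Mimic what _handle_fetch_response now records for one response:
    # 10 messages totalling 4096 bytes from one topic, 42 ms of latency.
    sensors.record_topic_fetch_metrics('demo', 4096, 10)
    sensors.bytes_fetched.record(4096)
    sensors.records_fetched.record(10)
    sensors.fetch_latency.record(42)

    # Per-topic sensors are created lazily under dotted names (see the diff).
    assert metrics.get_sensor('topic.demo.bytes-fetched') is not None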

diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py

@@ -218,7 +218,7 @@ class KafkaConsumer(six.Iterator):
         reporters.append(DictReporter('kafka.consumer'))
         self._metrics = Metrics(metric_config, reporters)
         metric_group_prefix = 'consumer'
-        # TODO _metrics likely needs to be passed to KafkaClient, Fetcher, etc.
+        # TODO _metrics likely needs to be passed to KafkaClient, etc.
 
         self._client = KafkaClient(**self.config)
@@ -233,7 +233,7 @@ class KafkaConsumer(six.Iterator):
         self._subscription = SubscriptionState(self.config['auto_offset_reset'])
         self._fetcher = Fetcher(
-            self._client, self._subscription, **self.config)
+            self._client, self._subscription, self._metrics, metric_group_prefix, **self.config)
         self._coordinator = ConsumerCoordinator(
             self._client, self._subscription, self._metrics, metric_group_prefix,
             assignors=self.config['partition_assignment_strategy'],
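
For orientation (again, not part of the diff): the Metrics instance the consumer builds above carries a DictReporter, so recorded values can be inspected directly. A small standalone sketch; it assumes DictReporter exposes a snapshot() mapping, which is worth verifying against your kafka-python version:

    from kafka.metrics import DictReporter, MetricConfig, Metrics
    from kafka.metrics.stats import Avg

    reporter = DictReporter('kafka.consumer')
    metrics = Metrics(MetricConfig(), [reporter])

    sensor = metrics.sensor('demo-sensor')
    sensor.add(metrics.metric_name('demo-avg', 'demo-group', 'example average'), Avg())
    sensor.record(10)
    sensor.record(20)

    # e.g. {'kafka.consumer.demo-group': {'demo-avg': 15.0}}
    print(reporter.snapshot())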

diff --git a/test/test_fetcher.py b/test/test_fetcher.py
--- a/test/test_fetcher.py
+++ b/test/test_fetcher.py

@@ -8,6 +8,7 @@ from kafka.consumer.fetcher import Fetcher
 from kafka.consumer.subscription_state import SubscriptionState
 import kafka.errors as Errors
 from kafka.future import Future
+from kafka.metrics import Metrics
 from kafka.protocol.fetch import FetchRequest
 from kafka.structs import TopicPartition, OffsetAndMetadata
@@ -29,7 +30,7 @@ def fetcher(client, subscription_state):
     subscription_state.assign_from_subscribed(assignment)
     for tp in assignment:
         subscription_state.seek(tp, 0)
-    return Fetcher(client, subscription_state)
+    return Fetcher(client, subscription_state, Metrics(), 'test_fetcher')
 
 
 def test_init_fetches(fetcher, mocker):
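
A natural follow-on check (hypothetical, not in this commit) is a test that constructing a Fetcher registers the fetch-manager sensors on the Metrics instance it was handed; get_sensor is the same lookup the new record_topic_fetch_metrics uses:

    def test_fetcher_registers_sensors(client, subscription_state):
        # sensor names as created by FetchManagerMetrics.__init__ above
        metrics = Metrics()
        Fetcher(client, subscription_state, metrics, 'test_fetcher')
        for name in ('bytes-fetched', 'records-fetched', 'fetch-latency',
                     'records-lag', 'fetch-throttle-time'):
            assert metrics.get_sensor(name) is not None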