Merge pull request #772 from dpkp/more_metrics

Add client, base coordinator, and a few extra producer metrics.
Dana Powers authored 2016-07-26 08:07:57 -07:00, committed by GitHub
6 changed files with 138 additions and 78 deletions

kafka/client_async.py

@@ -24,6 +24,8 @@ from .cluster import ClusterMetadata
 from .conn import BrokerConnection, ConnectionStates, collect_hosts, get_ip_port_afi
 from . import errors as Errors
 from .future import Future
+from .metrics.stats import Avg, Count, Rate
+from .metrics.stats.rate import TimeUnit
 from .protocol.metadata import MetadataRequest
 from .protocol.produce import ProduceRequest
 from . import socketpair
@@ -65,6 +67,8 @@ class KafkaClient(object):
         'api_version': None,
         'api_version_auto_timeout_ms': 2000,
         'selector': selectors.DefaultSelector,
+        'metrics': None,
+        'metric_group_prefix': '',
     }
     API_VERSIONS = [
         (0, 10),
@@ -139,6 +143,9 @@ class KafkaClient(object):
             selector (selectors.BaseSelector): Provide a specific selector
                 implementation to use for I/O multiplexing.
                 Default: selectors.DefaultSelector
+            metrics (kafka.metrics.Metrics): Optionally provide a metrics
+                instance for capturing network IO stats. Default: None.
+            metric_group_prefix (str): Prefix for metric names. Default: ''
         """
         self.config = copy.copy(self.DEFAULT_CONFIG)
         for key in self.config:
@@ -167,6 +174,9 @@ class KafkaClient(object):
         self._selector.register(self._wake_r, selectors.EVENT_READ)
         self._closed = False
         self._bootstrap(collect_hosts(self.config['bootstrap_servers']))
+        self._sensors = None
+        if self.config['metrics']:
+            self._sensors = KafkaClientMetrics(self.config['metrics'], self.config['metric_group_prefix'])
 
         # Check Broker Version if not set explicitly
         if self.config['api_version'] is None:
@@ -487,7 +497,14 @@ class KafkaClient(object):
         responses = []
         processed = set()
 
-        for key, events in self._selector.select(timeout):
+        start_select = time.time()
+        ready = self._selector.select(timeout)
+        end_select = time.time()
+        if self._sensors:
+            self._sensors.select_time.record((end_select - start_select) * 1000000000)
+
+        for key, events in ready:
             if key.fileobj is self._wake_r:
                 self._clear_wake_fd()
                 continue
@@ -531,6 +548,9 @@ class KafkaClient(object):
                 response = conn.recv()
                 if response:
                     responses.append(response)
 
+        if self._sensors:
+            self._sensors.io_time.record((time.time() - end_select) * 1000000000)
+
         return responses
 
     def in_flight_request_count(self, node_id=None):
@@ -848,3 +868,33 @@ class DelayedTaskQueue(object):
                 break
             ready_tasks.append(task)
         return ready_tasks
+
+
+class KafkaClientMetrics(object):
+    def __init__(self, metrics, metric_group_prefix):
+        self.metrics = metrics
+        self.metric_group_name = metric_group_prefix + '-metrics'
+
+        self.select_time = metrics.sensor('select-time')
+        self.select_time.add(metrics.metric_name(
+            'select-rate', self.metric_group_name,
+            'Number of times the I/O layer checked for new I/O to perform per'
+            ' second'), Rate(sampled_stat=Count()))
+        self.select_time.add(metrics.metric_name(
+            'io-wait-time-ns-avg', self.metric_group_name,
+            'The average length of time the I/O thread spent waiting for a'
+            ' socket ready for reads or writes in nanoseconds.'), Avg())
+        self.select_time.add(metrics.metric_name(
+            'io-wait-ratio', self.metric_group_name,
+            'The fraction of time the I/O thread spent waiting.'),
+            Rate(time_unit=TimeUnit.NANOSECONDS))
+
+        self.io_time = metrics.sensor('io-time')
+        self.io_time.add(metrics.metric_name(
+            'io-time-ns-avg', self.metric_group_name,
+            'The average length of time for I/O per select call in nanoseconds.'),
+            Avg())
+        self.io_time.add(metrics.metric_name(
+            'io-ratio', self.metric_group_name,
+            'The fraction of time the I/O thread spent doing I/O'),
+            Rate(time_unit=TimeUnit.NANOSECONDS))
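
Note on the ratio metrics above: recording durations in nanoseconds into a Rate configured with TimeUnit.NANOSECONDS divides nanoseconds recorded by nanoseconds elapsed, which yields the fraction of wall-clock time spent in that state (io-wait-ratio, io-ratio). A minimal sketch of enabling these client metrics; the bootstrap address and group prefix here are illustrative, and a reachable broker is assumed:

    from kafka.client_async import KafkaClient
    from kafka.metrics import Metrics

    # Passing a Metrics registry via the new 'metrics' config key activates
    # KafkaClientMetrics; its sensors register under '<prefix>-metrics'.
    metrics = Metrics()
    client = KafkaClient(bootstrap_servers='localhost:9092',
                         metrics=metrics, metric_group_prefix='client')
    client.poll(timeout_ms=500)

    # Inspect the registry: keys are MetricName objects, values KafkaMetrics.
    for name, metric in metrics.metrics.items():
        print(name.group, name.name, metric.value())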

kafka/coordinator/base.py

@@ -1,3 +1,5 @@
+from __future__ import absolute_import, division
+
 import abc
 import copy
 import logging
@@ -6,12 +8,14 @@ import weakref
 
 import six
 
-import kafka.errors as Errors
-from kafka.future import Future
-from kafka.protocol.commit import GroupCoordinatorRequest, OffsetCommitRequest
-from kafka.protocol.group import (HeartbeatRequest, JoinGroupRequest,
-                                  LeaveGroupRequest, SyncGroupRequest)
 from .heartbeat import Heartbeat
+from .. import errors as Errors
+from ..future import Future
+from ..metrics import AnonMeasurable
+from ..metrics.stats import Avg, Count, Max, Rate
+from ..protocol.commit import GroupCoordinatorRequest, OffsetCommitRequest
+from ..protocol.group import (HeartbeatRequest, JoinGroupRequest,
+                              LeaveGroupRequest, SyncGroupRequest)
 
 log = logging.getLogger('kafka.coordinator')
 
@@ -53,7 +57,7 @@ class BaseCoordinator(object):
         'api_version': (0, 9),
     }
 
-    def __init__(self, client, **configs):
+    def __init__(self, client, metrics, metric_group_prefix, **configs):
         """
         Keyword Arguments:
             group_id (str): name of the consumer group to join for dynamic
@@ -87,7 +91,8 @@ class BaseCoordinator(object):
         self.needs_join_prepare = True
         self.heartbeat = Heartbeat(**self.config)
         self.heartbeat_task = HeartbeatTask(weakref.proxy(self))
-        #self.sensors = GroupCoordinatorMetrics(metrics, metric_group_prefix, metric_tags)
+        self.sensors = GroupCoordinatorMetrics(self.heartbeat, metrics,
+                                               metric_group_prefix)
 
     def __del__(self):
         if hasattr(self, 'heartbeat_task') and self.heartbeat_task:
@@ -254,7 +259,7 @@ class BaseCoordinator(object):
                 continue
             elif not future.retriable():
                 raise exception # pylint: disable-msg=raising-bad-type
-            time.sleep(self.config['retry_backoff_ms'] / 1000.0)
+            time.sleep(self.config['retry_backoff_ms'] / 1000)
 
     def _send_join_group_request(self):
         """Join the group and return the assignment for the next generation.
@@ -285,7 +290,7 @@ class BaseCoordinator(object):
         log.debug("Sending JoinGroup (%s) to coordinator %s", request, self.coordinator_id)
         future = Future()
         _f = self._client.send(self.coordinator_id, request)
-        _f.add_callback(self._handle_join_group_response, future)
+        _f.add_callback(self._handle_join_group_response, future, time.time())
         _f.add_errback(self._failed_request, self.coordinator_id,
                        request, future)
         return future
@@ -300,7 +305,7 @@ class BaseCoordinator(object):
             self.coordinator_dead()
         future.failure(error)
 
-    def _handle_join_group_response(self, future, response):
+    def _handle_join_group_response(self, future, send_time, response):
         error_type = Errors.for_code(response.error_code)
         if error_type is Errors.NoError:
             log.debug("Received successful JoinGroup response for group %s: %s",
@@ -311,7 +316,7 @@ class BaseCoordinator(object):
             self.protocol = response.group_protocol
             log.info("Joined group '%s' (generation %s) with member_id %s",
                      self.group_id, self.generation, self.member_id)
-            #self.sensors.join_latency.record(response.requestLatencyMs())
+            self.sensors.join_latency.record((time.time() - send_time) * 1000)
             if response.leader_id == response.member_id:
                 log.info("Elected group leader -- performing partition"
                          " assignments using %s", self.protocol)
@@ -402,17 +407,17 @@ class BaseCoordinator(object):
             return Future().failure(e)
 
         future = Future()
        _f = self._client.send(self.coordinator_id, request)
-        _f.add_callback(self._handle_sync_group_response, future)
+        _f.add_callback(self._handle_sync_group_response, future, time.time())
         _f.add_errback(self._failed_request, self.coordinator_id,
                        request, future)
         return future
 
-    def _handle_sync_group_response(self, future, response):
+    def _handle_sync_group_response(self, future, send_time, response):
         error_type = Errors.for_code(response.error_code)
         if error_type is Errors.NoError:
             log.info("Successfully joined group %s with generation %s",
                      self.group_id, self.generation)
-            #self.sensors.syncLatency.record(response.requestLatencyMs())
+            self.sensors.sync_latency.record((time.time() - send_time) * 1000)
             future.success(response.member_assignment)
             return
@@ -540,13 +545,13 @@ class BaseCoordinator(object):
         log.debug("Heartbeat: %s[%s] %s", request.group, request.generation_id, request.member_id) #pylint: disable-msg=no-member
         future = Future()
         _f = self._client.send(self.coordinator_id, request)
-        _f.add_callback(self._handle_heartbeat_response, future)
+        _f.add_callback(self._handle_heartbeat_response, future, time.time())
         _f.add_errback(self._failed_request, self.coordinator_id,
                        request, future)
         return future
 
-    def _handle_heartbeat_response(self, future, response):
-        #self.sensors.heartbeat_latency.record(response.requestLatencyMs())
+    def _handle_heartbeat_response(self, future, send_time, response):
+        self.sensors.heartbeat_latency.record((time.time() - send_time) * 1000)
         error_type = Errors.for_code(response.error_code)
         if error_type is Errors.NoError:
             log.debug("Received successful heartbeat response for group %s",
@@ -651,60 +656,56 @@ class HeartbeatTask(object):
     def _handle_heartbeat_failure(self, e):
         log.warning("Heartbeat failed (%s); retrying", e)
         self._request_in_flight = False
-        etd = time.time() + self._coordinator.config['retry_backoff_ms'] / 1000.0
+        etd = time.time() + self._coordinator.config['retry_backoff_ms'] / 1000
         self._client.schedule(self, etd)
 
-'''
+
 class GroupCoordinatorMetrics(object):
-    def __init__(self, metrics, prefix, tags=None):
+    def __init__(self, heartbeat, metrics, prefix, tags=None):
+        self.heartbeat = heartbeat
         self.metrics = metrics
-        self.group_name = prefix + "-coordinator-metrics"
+        self.metric_group_name = prefix + "-coordinator-metrics"
 
-        self.heartbeat_latency = metrics.sensor("heartbeat-latency")
-        self.heartbeat_latency.add(metrics.metricName(
-            "heartbeat-response-time-max", self.group_name,
-            "The max time taken to receive a response to a heartbeat request",
-            tags), metrics.Max())
-        self.heartbeat_latency.add(metrics.metricName(
-            "heartbeat-rate", self.group_name,
-            "The average number of heartbeats per second",
-            tags), metrics.Rate(sampled_stat=metrics.Count()))
+        self.heartbeat_latency = metrics.sensor('heartbeat-latency')
+        self.heartbeat_latency.add(metrics.metric_name(
+            'heartbeat-response-time-max', self.metric_group_name,
+            'The max time taken to receive a response to a heartbeat request',
+            tags), Max())
+        self.heartbeat_latency.add(metrics.metric_name(
+            'heartbeat-rate', self.metric_group_name,
+            'The average number of heartbeats per second',
+            tags), Rate(sampled_stat=Count()))
 
-        self.join_latency = metrics.sensor("join-latency")
-        self.join_latency.add(metrics.metricName(
-            "join-time-avg", self.group_name,
-            "The average time taken for a group rejoin",
-            tags), metrics.Avg())
-        self.join_latency.add(metrics.metricName(
-            "join-time-max", self.group_name,
-            "The max time taken for a group rejoin",
-            tags), metrics.Avg())
-        self.join_latency.add(metrics.metricName(
-            "join-rate", self.group_name,
-            "The number of group joins per second",
-            tags), metrics.Rate(sampled_stat=metrics.Count()))
+        self.join_latency = metrics.sensor('join-latency')
+        self.join_latency.add(metrics.metric_name(
+            'join-time-avg', self.metric_group_name,
+            'The average time taken for a group rejoin',
+            tags), Avg())
+        self.join_latency.add(metrics.metric_name(
+            'join-time-max', self.metric_group_name,
+            'The max time taken for a group rejoin',
+            tags), Avg())
+        self.join_latency.add(metrics.metric_name(
+            'join-rate', self.metric_group_name,
+            'The number of group joins per second',
+            tags), Rate(sampled_stat=Count()))
 
-        self.sync_latency = metrics.sensor("sync-latency")
-        self.sync_latency.add(metrics.metricName(
-            "sync-time-avg", self.group_name,
-            "The average time taken for a group sync",
-            tags), metrics.Avg())
-        self.sync_latency.add(metrics.MetricName(
-            "sync-time-max", self.group_name,
-            "The max time taken for a group sync",
-            tags), metrics.Avg())
-        self.sync_latency.add(metrics.metricName(
-            "sync-rate", self.group_name,
-            "The number of group syncs per second",
-            tags), metrics.Rate(sampled_stat=metrics.Count()))
+        self.sync_latency = metrics.sensor('sync-latency')
+        self.sync_latency.add(metrics.metric_name(
+            'sync-time-avg', self.metric_group_name,
+            'The average time taken for a group sync',
+            tags), Avg())
+        self.sync_latency.add(metrics.metric_name(
+            'sync-time-max', self.metric_group_name,
+            'The max time taken for a group sync',
+            tags), Avg())
+        self.sync_latency.add(metrics.metric_name(
+            'sync-rate', self.metric_group_name,
+            'The number of group syncs per second',
+            tags), Rate(sampled_stat=Count()))
 
-        """
-        lastHeartbeat = Measurable(
-            measure=lambda _, value: value - heartbeat.last_heartbeat_send()
-        )
-        metrics.addMetric(metrics.metricName(
-            "last-heartbeat-seconds-ago", self.group_name,
-            "The number of seconds since the last controller heartbeat",
-            tags), lastHeartbeat)
-        """
-'''
+        metrics.add_metric(metrics.metric_name(
+            'last-heartbeat-seconds-ago', self.metric_group_name,
+            'The number of seconds since the last controller heartbeat',
+            tags), AnonMeasurable(
+                lambda _, now: (now / 1000) - self.heartbeat.last_send))
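
The last-heartbeat-seconds-ago metric above is a gauge rather than a recorded sensor: AnonMeasurable wraps a callable taking (config, now) -- with now in milliseconds, hence the now / 1000 -- that is re-evaluated each time the metric is read. A sketch of the same pattern outside the coordinator; the metric and group names here are illustrative, not from this commit:

    import time
    from kafka.metrics import AnonMeasurable, Metrics

    metrics = Metrics()
    started = time.time()
    # The lambda runs lazily on every read of the metric; no record() calls.
    metrics.add_metric(metrics.metric_name(
        'uptime-seconds', 'example-metrics',
        'Seconds since this registry was created'),
        AnonMeasurable(lambda config, now: (now / 1000) - started))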

kafka/coordinator/consumer.py

@@ -76,7 +76,10 @@ class ConsumerCoordinator(BaseCoordinator):
                 True the only way to receive records from an internal topic is
                 subscribing to it. Requires 0.10+. Default: True
         """
-        super(ConsumerCoordinator, self).__init__(client, **configs)
+        super(ConsumerCoordinator, self).__init__(client,
+                                                  metrics, metric_group_prefix,
+                                                  **configs)
         self.config = copy.copy(self.DEFAULT_CONFIG)
         for key in self.config:
             if key in configs:
@@ -107,8 +110,8 @@ class ConsumerCoordinator(BaseCoordinator):
             self._auto_commit_task = AutoCommitTask(weakref.proxy(self), interval)
             self._auto_commit_task.reschedule()
 
-        self._sensors = ConsumerCoordinatorMetrics(metrics, metric_group_prefix,
-                                                   self._subscription)
+        self.consumer_sensors = ConsumerCoordinatorMetrics(
+            metrics, metric_group_prefix, self._subscription)
 
     def __del__(self):
         if hasattr(self, '_cluster') and self._cluster:
@@ -485,7 +488,7 @@ class ConsumerCoordinator(BaseCoordinator):
 
     def _handle_offset_commit_response(self, offsets, future, send_time, response):
         # TODO look at adding request_latency_ms to response (like java kafka)
-        self._sensors.commit_latency.record((time.time() - send_time) * 1000)
+        self.consumer_sensors.commit_latency.record((time.time() - send_time) * 1000)
         unauthorized_topics = set()
 
         for topic, partitions in response.topics:
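
The rename from self._sensors to self.consumer_sensors is defensive: BaseCoordinator now owns a metrics object of its own (self.sensors), and giving the subclass's object a distinct name rules out an attribute collision in which the later assignment in the __init__ chain would silently replace the earlier one. A contrived illustration of that pitfall (not code from this commit):

    class Base(object):
        def __init__(self):
            self.sensors = 'base-metrics'

    class Child(Base):
        def __init__(self):
            super(Child, self).__init__()
            self.sensors = 'child-metrics'  # base's object is no longer reachable

    print(Child().sensors)  # prints 'child-metrics'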

kafka/coordinator/heartbeat.py

@@ -20,8 +20,8 @@ class Heartbeat(object):
 
         self.interval = self.config['heartbeat_interval_ms'] / 1000.0
         self.timeout = self.config['session_timeout_ms'] / 1000.0
-        self.last_send = 0
-        self.last_receive = 0
+        self.last_send = -1 * float('inf')
+        self.last_receive = -1 * float('inf')
         self.last_reset = time.time()
 
     def sent_heartbeat(self):
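
Seeding last_send and last_receive with negative infinity rather than 0 matters now that last_send feeds the last-heartbeat-seconds-ago gauge: measured from 0 (the Unix epoch), "seconds since last heartbeat" would read as roughly 1.4 billion before the first heartbeat ever went out; measured from -inf it reads as infinity, which is unambiguous. A small illustration:

    import time

    last_send = -1 * float('inf')
    print(time.time() - last_send)  # inf -> clearly 'never sent'

    last_send = time.time()         # a heartbeat goes out
    print(time.time() - last_send)  # ~0.0 seconds ago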

kafka/producer/kafka.py

@@ -308,7 +308,8 @@ class KafkaProducer(object):
         reporters = [reporter() for reporter in self.config['metric_reporters']]
         self._metrics = Metrics(metric_config, reporters)
 
-        client = KafkaClient(**self.config)
+        client = KafkaClient(metrics=self._metrics, metric_group_prefix='producer',
+                             **self.config)
 
         # Get auto-discovered version from client if necessary
         if self.config['api_version'] is None:
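
With the producer's Metrics instance handed to KafkaClient, the client's select/io sensors land in the same registry as the producer's own metrics, under the 'producer-metrics' group (metric_group_prefix='producer' plus the '-metrics' suffix). A sketch of listing them, assuming a reachable broker and direct access to the private _metrics attribute, which this commit does not expose publicly:

    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers='localhost:9092')
    # Registry keys are MetricName objects carrying the group; this picks up
    # everything under the producer prefix, including the new io-* metrics.
    names = [name.name for name in producer._metrics.metrics
             if name.group == 'producer-metrics']
    print(sorted(names))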

kafka/producer/sender.py

@@ -4,6 +4,7 @@ import collections
 import copy
 import logging
 import threading
+import time
 
 import six
 
@@ -145,7 +146,7 @@ class Sender(threading.Thread):
             log.debug('Sending Produce Request: %r', request)
             (self._client.send(node_id, request)
                  .add_callback(
-                     self._handle_produce_response, batches)
+                     self._handle_produce_response, node_id, time.time(), batches)
                  .add_errback(
                      self._failed_produce, batches, node_id))
 
@@ -183,7 +184,7 @@ class Sender(threading.Thread):
         for batch in batches:
             self._complete_batch(batch, error, -1, None)
 
-    def _handle_produce_response(self, batches, response):
+    def _handle_produce_response(self, node_id, send_time, batches, response):
         """Handle a produce response."""
         # if we have a response, parse it
         log.debug('Parsing produce response: %r', response)
@@ -203,6 +204,10 @@ class Sender(threading.Thread):
                 batch = batches_by_partition[tp]
                 self._complete_batch(batch, error, offset, ts)
 
+            self._sensors.record_latency((time.time() - send_time) * 1000, node=node_id)
+            if response.API_VERSION > 0:
+                self._sensors.record_throttle_time(response.throttle_time_ms, node=node_id)
+
         else:
             # this is the acks = 0 case, just complete all requests
             for batch in batches:
@@ -495,8 +500,8 @@ class SenderMetrics(object):
 
     def record_latency(self, latency, node=None):
         self.request_time_sensor.record(latency)
-        if node:
-            sensor = self.metrics.get_sensor('node-' + node + '.latency')
+        if node is not None:
+            sensor = self.metrics.get_sensor('node-' + str(node) + '.latency')
             if sensor:
                 sensor.record(latency)
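
The guard change in record_latency fixes a genuine bug: broker node ids are integers and 0 is a valid id, but integer 0 is falsy, so 'if node:' silently skipped per-node latency recording for broker 0; str(node) is also needed because sensor names are strings while node ids are ints. The pitfall in isolation:

    node = 0                      # a legitimate broker id
    if node:                      # False -- broker 0 would be skipped
        print('recorded')
    if node is not None:          # True -- the correct presence check
        print('node-' + str(node) + '.latency')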