Add base coordinator metrics
This commit is contained in:
@@ -1,3 +1,5 @@
|
|||||||
|
from __future__ import absolute_import, division
|
||||||
|
|
||||||
import abc
|
import abc
|
||||||
import copy
|
import copy
|
||||||
import logging
|
import logging
|
||||||
@@ -6,12 +8,14 @@ import weakref
|
|||||||
|
|
||||||
import six
|
import six
|
||||||
|
|
||||||
import kafka.errors as Errors
|
|
||||||
from kafka.future import Future
|
|
||||||
from kafka.protocol.commit import GroupCoordinatorRequest, OffsetCommitRequest
|
|
||||||
from kafka.protocol.group import (HeartbeatRequest, JoinGroupRequest,
|
|
||||||
LeaveGroupRequest, SyncGroupRequest)
|
|
||||||
from .heartbeat import Heartbeat
|
from .heartbeat import Heartbeat
|
||||||
|
from .. import errors as Errors
|
||||||
|
from ..future import Future
|
||||||
|
from ..metrics import AnonMeasurable
|
||||||
|
from ..metrics.stats import Avg, Count, Max, Rate
|
||||||
|
from ..protocol.commit import GroupCoordinatorRequest, OffsetCommitRequest
|
||||||
|
from ..protocol.group import (HeartbeatRequest, JoinGroupRequest,
|
||||||
|
LeaveGroupRequest, SyncGroupRequest)
|
||||||
|
|
||||||
log = logging.getLogger('kafka.coordinator')
|
log = logging.getLogger('kafka.coordinator')
|
||||||
|
|
||||||
@@ -53,7 +57,7 @@ class BaseCoordinator(object):
|
|||||||
'api_version': (0, 9),
|
'api_version': (0, 9),
|
||||||
}
|
}
|
||||||
|
|
||||||
def __init__(self, client, **configs):
|
def __init__(self, client, metrics, metric_group_prefix, **configs):
|
||||||
"""
|
"""
|
||||||
Keyword Arguments:
|
Keyword Arguments:
|
||||||
group_id (str): name of the consumer group to join for dynamic
|
group_id (str): name of the consumer group to join for dynamic
|
||||||
@@ -87,7 +91,8 @@ class BaseCoordinator(object):
|
|||||||
self.needs_join_prepare = True
|
self.needs_join_prepare = True
|
||||||
self.heartbeat = Heartbeat(**self.config)
|
self.heartbeat = Heartbeat(**self.config)
|
||||||
self.heartbeat_task = HeartbeatTask(weakref.proxy(self))
|
self.heartbeat_task = HeartbeatTask(weakref.proxy(self))
|
||||||
#self.sensors = GroupCoordinatorMetrics(metrics, metric_group_prefix, metric_tags)
|
self.sensors = GroupCoordinatorMetrics(self.heartbeat, metrics,
|
||||||
|
metric_group_prefix)
|
||||||
|
|
||||||
def __del__(self):
|
def __del__(self):
|
||||||
if hasattr(self, 'heartbeat_task') and self.heartbeat_task:
|
if hasattr(self, 'heartbeat_task') and self.heartbeat_task:
|
||||||
@@ -254,7 +259,7 @@ class BaseCoordinator(object):
|
|||||||
continue
|
continue
|
||||||
elif not future.retriable():
|
elif not future.retriable():
|
||||||
raise exception # pylint: disable-msg=raising-bad-type
|
raise exception # pylint: disable-msg=raising-bad-type
|
||||||
time.sleep(self.config['retry_backoff_ms'] / 1000.0)
|
time.sleep(self.config['retry_backoff_ms'] / 1000)
|
||||||
|
|
||||||
def _send_join_group_request(self):
|
def _send_join_group_request(self):
|
||||||
"""Join the group and return the assignment for the next generation.
|
"""Join the group and return the assignment for the next generation.
|
||||||
@@ -285,7 +290,7 @@ class BaseCoordinator(object):
|
|||||||
log.debug("Sending JoinGroup (%s) to coordinator %s", request, self.coordinator_id)
|
log.debug("Sending JoinGroup (%s) to coordinator %s", request, self.coordinator_id)
|
||||||
future = Future()
|
future = Future()
|
||||||
_f = self._client.send(self.coordinator_id, request)
|
_f = self._client.send(self.coordinator_id, request)
|
||||||
_f.add_callback(self._handle_join_group_response, future)
|
_f.add_callback(self._handle_join_group_response, future, time.time())
|
||||||
_f.add_errback(self._failed_request, self.coordinator_id,
|
_f.add_errback(self._failed_request, self.coordinator_id,
|
||||||
request, future)
|
request, future)
|
||||||
return future
|
return future
|
||||||
@@ -300,7 +305,7 @@ class BaseCoordinator(object):
|
|||||||
self.coordinator_dead()
|
self.coordinator_dead()
|
||||||
future.failure(error)
|
future.failure(error)
|
||||||
|
|
||||||
def _handle_join_group_response(self, future, response):
|
def _handle_join_group_response(self, future, send_time, response):
|
||||||
error_type = Errors.for_code(response.error_code)
|
error_type = Errors.for_code(response.error_code)
|
||||||
if error_type is Errors.NoError:
|
if error_type is Errors.NoError:
|
||||||
log.debug("Received successful JoinGroup response for group %s: %s",
|
log.debug("Received successful JoinGroup response for group %s: %s",
|
||||||
@@ -311,7 +316,7 @@ class BaseCoordinator(object):
|
|||||||
self.protocol = response.group_protocol
|
self.protocol = response.group_protocol
|
||||||
log.info("Joined group '%s' (generation %s) with member_id %s",
|
log.info("Joined group '%s' (generation %s) with member_id %s",
|
||||||
self.group_id, self.generation, self.member_id)
|
self.group_id, self.generation, self.member_id)
|
||||||
#self.sensors.join_latency.record(response.requestLatencyMs())
|
self.sensors.join_latency.record((time.time() - send_time) * 1000)
|
||||||
if response.leader_id == response.member_id:
|
if response.leader_id == response.member_id:
|
||||||
log.info("Elected group leader -- performing partition"
|
log.info("Elected group leader -- performing partition"
|
||||||
" assignments using %s", self.protocol)
|
" assignments using %s", self.protocol)
|
||||||
@@ -402,17 +407,17 @@ class BaseCoordinator(object):
|
|||||||
return Future().failure(e)
|
return Future().failure(e)
|
||||||
future = Future()
|
future = Future()
|
||||||
_f = self._client.send(self.coordinator_id, request)
|
_f = self._client.send(self.coordinator_id, request)
|
||||||
_f.add_callback(self._handle_sync_group_response, future)
|
_f.add_callback(self._handle_sync_group_response, future, time.time())
|
||||||
_f.add_errback(self._failed_request, self.coordinator_id,
|
_f.add_errback(self._failed_request, self.coordinator_id,
|
||||||
request, future)
|
request, future)
|
||||||
return future
|
return future
|
||||||
|
|
||||||
def _handle_sync_group_response(self, future, response):
|
def _handle_sync_group_response(self, future, send_time, response):
|
||||||
error_type = Errors.for_code(response.error_code)
|
error_type = Errors.for_code(response.error_code)
|
||||||
if error_type is Errors.NoError:
|
if error_type is Errors.NoError:
|
||||||
log.info("Successfully joined group %s with generation %s",
|
log.info("Successfully joined group %s with generation %s",
|
||||||
self.group_id, self.generation)
|
self.group_id, self.generation)
|
||||||
#self.sensors.syncLatency.record(response.requestLatencyMs())
|
self.sensors.sync_latency.record((time.time() - send_time) * 1000)
|
||||||
future.success(response.member_assignment)
|
future.success(response.member_assignment)
|
||||||
return
|
return
|
||||||
|
|
||||||
@@ -540,13 +545,13 @@ class BaseCoordinator(object):
|
|||||||
log.debug("Heartbeat: %s[%s] %s", request.group, request.generation_id, request.member_id) #pylint: disable-msg=no-member
|
log.debug("Heartbeat: %s[%s] %s", request.group, request.generation_id, request.member_id) #pylint: disable-msg=no-member
|
||||||
future = Future()
|
future = Future()
|
||||||
_f = self._client.send(self.coordinator_id, request)
|
_f = self._client.send(self.coordinator_id, request)
|
||||||
_f.add_callback(self._handle_heartbeat_response, future)
|
_f.add_callback(self._handle_heartbeat_response, future, time.time())
|
||||||
_f.add_errback(self._failed_request, self.coordinator_id,
|
_f.add_errback(self._failed_request, self.coordinator_id,
|
||||||
request, future)
|
request, future)
|
||||||
return future
|
return future
|
||||||
|
|
||||||
def _handle_heartbeat_response(self, future, response):
|
def _handle_heartbeat_response(self, future, send_time, response):
|
||||||
#self.sensors.heartbeat_latency.record(response.requestLatencyMs())
|
self.sensors.heartbeat_latency.record((time.time() - send_time) * 1000)
|
||||||
error_type = Errors.for_code(response.error_code)
|
error_type = Errors.for_code(response.error_code)
|
||||||
if error_type is Errors.NoError:
|
if error_type is Errors.NoError:
|
||||||
log.debug("Received successful heartbeat response for group %s",
|
log.debug("Received successful heartbeat response for group %s",
|
||||||
@@ -651,60 +656,56 @@ class HeartbeatTask(object):
|
|||||||
def _handle_heartbeat_failure(self, e):
|
def _handle_heartbeat_failure(self, e):
|
||||||
log.warning("Heartbeat failed (%s); retrying", e)
|
log.warning("Heartbeat failed (%s); retrying", e)
|
||||||
self._request_in_flight = False
|
self._request_in_flight = False
|
||||||
etd = time.time() + self._coordinator.config['retry_backoff_ms'] / 1000.0
|
etd = time.time() + self._coordinator.config['retry_backoff_ms'] / 1000
|
||||||
self._client.schedule(self, etd)
|
self._client.schedule(self, etd)
|
||||||
|
|
||||||
'''
|
|
||||||
class GroupCoordinatorMetrics(object):
|
class GroupCoordinatorMetrics(object):
|
||||||
def __init__(self, metrics, prefix, tags=None):
|
def __init__(self, heartbeat, metrics, prefix, tags=None):
|
||||||
|
self.heartbeat = heartbeat
|
||||||
self.metrics = metrics
|
self.metrics = metrics
|
||||||
self.group_name = prefix + "-coordinator-metrics"
|
self.metric_group_name = prefix + "-coordinator-metrics"
|
||||||
|
|
||||||
self.heartbeat_latency = metrics.sensor("heartbeat-latency")
|
self.heartbeat_latency = metrics.sensor('heartbeat-latency')
|
||||||
self.heartbeat_latency.add(metrics.metricName(
|
self.heartbeat_latency.add(metrics.metric_name(
|
||||||
"heartbeat-response-time-max", self.group_name,
|
'heartbeat-response-time-max', self.metric_group_name,
|
||||||
"The max time taken to receive a response to a heartbeat request",
|
'The max time taken to receive a response to a heartbeat request',
|
||||||
tags), metrics.Max())
|
tags), Max())
|
||||||
self.heartbeat_latency.add(metrics.metricName(
|
self.heartbeat_latency.add(metrics.metric_name(
|
||||||
"heartbeat-rate", self.group_name,
|
'heartbeat-rate', self.metric_group_name,
|
||||||
"The average number of heartbeats per second",
|
'The average number of heartbeats per second',
|
||||||
tags), metrics.Rate(sampled_stat=metrics.Count()))
|
tags), Rate(sampled_stat=Count()))
|
||||||
|
|
||||||
self.join_latency = metrics.sensor("join-latency")
|
self.join_latency = metrics.sensor('join-latency')
|
||||||
self.join_latency.add(metrics.metricName(
|
self.join_latency.add(metrics.metric_name(
|
||||||
"join-time-avg", self.group_name,
|
'join-time-avg', self.metric_group_name,
|
||||||
"The average time taken for a group rejoin",
|
'The average time taken for a group rejoin',
|
||||||
tags), metrics.Avg())
|
tags), Avg())
|
||||||
self.join_latency.add(metrics.metricName(
|
self.join_latency.add(metrics.metric_name(
|
||||||
"join-time-max", self.group_name,
|
'join-time-max', self.metric_group_name,
|
||||||
"The max time taken for a group rejoin",
|
'The max time taken for a group rejoin',
|
||||||
tags), metrics.Avg())
|
tags), Avg())
|
||||||
self.join_latency.add(metrics.metricName(
|
self.join_latency.add(metrics.metric_name(
|
||||||
"join-rate", self.group_name,
|
'join-rate', self.metric_group_name,
|
||||||
"The number of group joins per second",
|
'The number of group joins per second',
|
||||||
tags), metrics.Rate(sampled_stat=metrics.Count()))
|
tags), Rate(sampled_stat=Count()))
|
||||||
|
|
||||||
self.sync_latency = metrics.sensor("sync-latency")
|
self.sync_latency = metrics.sensor('sync-latency')
|
||||||
self.sync_latency.add(metrics.metricName(
|
self.sync_latency.add(metrics.metric_name(
|
||||||
"sync-time-avg", self.group_name,
|
'sync-time-avg', self.metric_group_name,
|
||||||
"The average time taken for a group sync",
|
'The average time taken for a group sync',
|
||||||
tags), metrics.Avg())
|
tags), Avg())
|
||||||
self.sync_latency.add(metrics.MetricName(
|
self.sync_latency.add(metrics.metric_name(
|
||||||
"sync-time-max", self.group_name,
|
'sync-time-max', self.metric_group_name,
|
||||||
"The max time taken for a group sync",
|
'The max time taken for a group sync',
|
||||||
tags), metrics.Avg())
|
tags), Avg())
|
||||||
self.sync_latency.add(metrics.metricName(
|
self.sync_latency.add(metrics.metric_name(
|
||||||
"sync-rate", self.group_name,
|
'sync-rate', self.metric_group_name,
|
||||||
"The number of group syncs per second",
|
'The number of group syncs per second',
|
||||||
tags), metrics.Rate(sampled_stat=metrics.Count()))
|
tags), Rate(sampled_stat=Count()))
|
||||||
|
|
||||||
"""
|
metrics.add_metric(metrics.metric_name(
|
||||||
lastHeartbeat = Measurable(
|
'last-heartbeat-seconds-ago', self.metric_group_name,
|
||||||
measure=lambda _, value: value - heartbeat.last_heartbeat_send()
|
'The number of seconds since the last controller heartbeat',
|
||||||
)
|
tags), AnonMeasurable(
|
||||||
metrics.addMetric(metrics.metricName(
|
lambda _, now: (now / 1000) - self.heartbeat.last_send))
|
||||||
"last-heartbeat-seconds-ago", self.group_name,
|
|
||||||
"The number of seconds since the last controller heartbeat",
|
|
||||||
tags), lastHeartbeat)
|
|
||||||
"""
|
|
||||||
'''
|
|
||||||
|
@@ -76,7 +76,10 @@ class ConsumerCoordinator(BaseCoordinator):
|
|||||||
True the only way to receive records from an internal topic is
|
True the only way to receive records from an internal topic is
|
||||||
subscribing to it. Requires 0.10+. Default: True
|
subscribing to it. Requires 0.10+. Default: True
|
||||||
"""
|
"""
|
||||||
super(ConsumerCoordinator, self).__init__(client, **configs)
|
super(ConsumerCoordinator, self).__init__(client,
|
||||||
|
metrics, metric_group_prefix,
|
||||||
|
**configs)
|
||||||
|
|
||||||
self.config = copy.copy(self.DEFAULT_CONFIG)
|
self.config = copy.copy(self.DEFAULT_CONFIG)
|
||||||
for key in self.config:
|
for key in self.config:
|
||||||
if key in configs:
|
if key in configs:
|
||||||
@@ -107,8 +110,8 @@ class ConsumerCoordinator(BaseCoordinator):
|
|||||||
self._auto_commit_task = AutoCommitTask(weakref.proxy(self), interval)
|
self._auto_commit_task = AutoCommitTask(weakref.proxy(self), interval)
|
||||||
self._auto_commit_task.reschedule()
|
self._auto_commit_task.reschedule()
|
||||||
|
|
||||||
self._sensors = ConsumerCoordinatorMetrics(metrics, metric_group_prefix,
|
self.consumer_sensors = ConsumerCoordinatorMetrics(
|
||||||
self._subscription)
|
metrics, metric_group_prefix, self._subscription)
|
||||||
|
|
||||||
def __del__(self):
|
def __del__(self):
|
||||||
if hasattr(self, '_cluster') and self._cluster:
|
if hasattr(self, '_cluster') and self._cluster:
|
||||||
@@ -485,7 +488,7 @@ class ConsumerCoordinator(BaseCoordinator):
|
|||||||
|
|
||||||
def _handle_offset_commit_response(self, offsets, future, send_time, response):
|
def _handle_offset_commit_response(self, offsets, future, send_time, response):
|
||||||
# TODO look at adding request_latency_ms to response (like java kafka)
|
# TODO look at adding request_latency_ms to response (like java kafka)
|
||||||
self._sensors.commit_latency.record((time.time() - send_time) * 1000)
|
self.consumer_sensors.commit_latency.record((time.time() - send_time) * 1000)
|
||||||
unauthorized_topics = set()
|
unauthorized_topics = set()
|
||||||
|
|
||||||
for topic, partitions in response.topics:
|
for topic, partitions in response.topics:
|
||||||
|
@@ -20,8 +20,8 @@ class Heartbeat(object):
|
|||||||
|
|
||||||
self.interval = self.config['heartbeat_interval_ms'] / 1000.0
|
self.interval = self.config['heartbeat_interval_ms'] / 1000.0
|
||||||
self.timeout = self.config['session_timeout_ms'] / 1000.0
|
self.timeout = self.config['session_timeout_ms'] / 1000.0
|
||||||
self.last_send = 0
|
self.last_send = -1 * float('inf')
|
||||||
self.last_receive = 0
|
self.last_receive = -1 * float('inf')
|
||||||
self.last_reset = time.time()
|
self.last_reset = time.time()
|
||||||
|
|
||||||
def sent_heartbeat(self):
|
def sent_heartbeat(self):
|
||||||
|
Reference in New Issue
Block a user