Merge pull request #794 from dpkp/conn_metrics

Complete metrics instrumentation
Authored by Dana Powers on 2016-08-04 14:22:40 -07:00; committed by GitHub
13 changed files with 253 additions and 49 deletions

View File

@@ -24,6 +24,7 @@ from .cluster import ClusterMetadata
from .conn import BrokerConnection, ConnectionStates, collect_hosts, get_ip_port_afi
from . import errors as Errors
from .future import Future
+from .metrics import AnonMeasurable
from .metrics.stats import Avg, Count, Rate
from .metrics.stats.rate import TimeUnit
from .protocol.metadata import MetadataRequest
@@ -187,10 +188,13 @@ class KafkaClient(object):
        self._wake_lock = threading.Lock()
        self._selector.register(self._wake_r, selectors.EVENT_READ)
        self._closed = False
-        self._bootstrap(collect_hosts(self.config['bootstrap_servers']))
        self._sensors = None
        if self.config['metrics']:
-            self._sensors = KafkaClientMetrics(self.config['metrics'], self.config['metric_group_prefix'])
+            self._sensors = KafkaClientMetrics(self.config['metrics'],
+                                               self.config['metric_group_prefix'],
+                                               self._conns)
+        self._bootstrap(collect_hosts(self.config['bootstrap_servers']))

        # Check Broker Version if not set explicitly
        if self.config['api_version'] is None:
@@ -218,6 +222,7 @@ class KafkaClient(object):
        cb = functools.partial(self._conn_state_change, 'bootstrap')
        bootstrap = BrokerConnection(host, port, afi,
                                     state_change_callback=cb,
+                                    node_id='bootstrap',
                                     **self.config)
        bootstrap.connect()
        while bootstrap.connecting():
@@ -273,6 +278,8 @@ class KafkaClient(object):
            except KeyError:
                pass
            self._selector.register(conn._sock, selectors.EVENT_READ, conn)
+            if self._sensors:
+                self._sensors.connection_created.record()

            if 'bootstrap' in self._conns and node_id != 'bootstrap':
                bootstrap = self._conns.pop('bootstrap')
@@ -289,6 +296,8 @@ class KafkaClient(object):
                self._selector.unregister(conn._sock)
            except KeyError:
                pass
+            if self._sensors:
+                self._sensors.connection_closed.record()

            if self._refresh_on_disconnects and not self._closed:
                log.warning("Node %s connection failed -- refreshing metadata", node_id)
                self.cluster.request_update()
@@ -305,6 +314,7 @@ class KafkaClient(object):
            cb = functools.partial(self._conn_state_change, node_id)
            self._conns[node_id] = BrokerConnection(host, broker.port, afi,
                                                    state_change_callback=cb,
+                                                   node_id=node_id,
                                                    **self.config)
        conn = self._conns[node_id]
        if conn.connected():
@@ -888,10 +898,19 @@ class DelayedTaskQueue(object):
class KafkaClientMetrics(object):
-    def __init__(self, metrics, metric_group_prefix):
+    def __init__(self, metrics, metric_group_prefix, conns):
        self.metrics = metrics
        self.metric_group_name = metric_group_prefix + '-metrics'

+        self.connection_closed = metrics.sensor('connections-closed')
+        self.connection_closed.add(metrics.metric_name(
+            'connection-close-rate', self.metric_group_name,
+            'Connections closed per second in the window.'), Rate())
+        self.connection_created = metrics.sensor('connections-created')
+        self.connection_created.add(metrics.metric_name(
+            'connection-creation-rate', self.metric_group_name,
+            'New connections established per second in the window.'), Rate())
        self.select_time = metrics.sensor('select-time')
        self.select_time.add(metrics.metric_name(
            'select-rate', self.metric_group_name,
@@ -915,3 +934,8 @@ class KafkaClientMetrics(object):
            'io-ratio', self.metric_group_name,
            'The fraction of time the I/O thread spent doing I/O'),
            Rate(time_unit=TimeUnit.NANOSECONDS))
+
+        metrics.add_metric(metrics.metric_name(
+            'connection-count', self.metric_group_name,
+            'The current number of active connections.'), AnonMeasurable(
+                lambda config, now: len(conns)))
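
The connection-count metric registered above is a pull-style gauge: AnonMeasurable wraps a callable that the registry invokes each time the metric is read, so the value always reflects len(conns) at read time rather than something a sensor recorded earlier. A minimal sketch of the same pattern outside KafkaClient (the dict and group names below are illustrative, not part of this change):

    from kafka.metrics import AnonMeasurable, Metrics

    metrics = Metrics()
    active_conns = {}   # hypothetical stand-in for KafkaClient._conns

    # Gauge whose value is computed on demand rather than recorded by a sensor.
    metrics.add_metric(
        metrics.metric_name('connection-count', 'kafka-client-metrics',
                            'The current number of active connections.'),
        AnonMeasurable(lambda config, now: len(active_conns)))

    active_conns['node-1'] = object()   # simulate a connection being registered
    for name, metric in metrics.metrics.items():
        if name.name == 'connection-count':
            print(name.name, metric.value())   # -> connection-count 1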

View File

@@ -14,8 +14,9 @@ from kafka.vendor import six
import kafka.errors as Errors
from kafka.future import Future
+from kafka.metrics.stats import Avg, Count, Max, Rate
from kafka.protocol.api import RequestHeader
-from kafka.protocol.admin import SaslHandShakeRequest, SaslHandShakeResponse
+from kafka.protocol.admin import SaslHandShakeRequest
from kafka.protocol.commit import GroupCoordinatorResponse
from kafka.protocol.types import Int32
from kafka.version import __version__
@@ -58,6 +59,7 @@ InFlightRequest = collections.namedtuple('InFlightRequest',
class BrokerConnection(object):
    DEFAULT_CONFIG = {
        'client_id': 'kafka-python-' + __version__,
+        'node_id': 0,
        'request_timeout_ms': 40000,
        'reconnect_backoff_ms': 50,
        'max_in_flight_requests_per_connection': 5,
@@ -74,6 +76,8 @@ class BrokerConnection(object):
        'ssl_password': None,
        'api_version': (0, 8, 2),  # default to most restrictive
        'state_change_callback': lambda conn: True,
+        'metrics': None,
+        'metric_group_prefix': '',
        'sasl_mechanism': 'PLAIN',
        'sasl_plain_username': None,
        'sasl_plain_password': None
@@ -81,6 +85,74 @@ class BrokerConnection(object):
    SASL_MECHANISMS = ('PLAIN',)

    def __init__(self, host, port, afi, **configs):
+        """Initialize a kafka broker connection
+
+        Keyword Arguments:
+            client_id (str): a name for this client. This string is passed in
+                each request to servers and can be used to identify specific
+                server-side log entries that correspond to this client. Also
+                submitted to GroupCoordinator for logging with respect to
+                consumer group administration. Default: 'kafka-python-{version}'
+            reconnect_backoff_ms (int): The amount of time in milliseconds to
+                wait before attempting to reconnect to a given host.
+                Default: 50.
+            request_timeout_ms (int): Client request timeout in milliseconds.
+                Default: 40000.
+            max_in_flight_requests_per_connection (int): Requests are pipelined
+                to kafka brokers up to this number of maximum requests per
+                broker connection. Default: 5.
+            receive_buffer_bytes (int): The size of the TCP receive buffer
+                (SO_RCVBUF) to use when reading data. Default: None (relies on
+                system defaults). Java client defaults to 32768.
+            send_buffer_bytes (int): The size of the TCP send buffer
+                (SO_SNDBUF) to use when sending data. Default: None (relies on
+                system defaults). Java client defaults to 131072.
+            socket_options (list): List of tuple-arguments to socket.setsockopt
+                to apply to broker connection sockets. Default:
+                [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)]
+            security_protocol (str): Protocol used to communicate with brokers.
+                Valid values are: PLAINTEXT, SSL. Default: PLAINTEXT.
+            ssl_context (ssl.SSLContext): pre-configured SSLContext for wrapping
+                socket connections. If provided, all other ssl_* configurations
+                will be ignored. Default: None.
+            ssl_check_hostname (bool): flag to configure whether ssl handshake
+                should verify that the certificate matches the broker's hostname.
+                Default: True.
+            ssl_cafile (str): optional filename of ca file to use in certificate
+                verification. Default: None.
+            ssl_certfile (str): optional filename of file in pem format containing
+                the client certificate, as well as any ca certificates needed to
+                establish the certificate's authenticity. Default: None.
+            ssl_keyfile (str): optional filename containing the client private key.
+                Default: None.
+            ssl_password (callable, str, bytes, bytearray): optional password or
+                callable function that returns a password, for decrypting the
+                client private key. Default: None.
+            ssl_crlfile (str): optional filename containing the CRL to check for
+                certificate expiration. By default, no CRL check is done. When
+                providing a file, only the leaf certificate will be checked against
+                this CRL. The CRL can only be checked with Python 3.4+ or 2.7.9+.
+                Default: None.
+            api_version (tuple): specify which kafka API version to use. Accepted
+                values are: (0, 8, 0), (0, 8, 1), (0, 8, 2), (0, 9), (0, 10)
+                If None, KafkaClient will attempt to infer the broker
+                version by probing various APIs. Default: None
+            api_version_auto_timeout_ms (int): number of milliseconds to throw a
+                timeout exception from the constructor when checking the broker
+                api version. Only applies if api_version is None.
+            state_change_callback (callable): function to be called when the
+                connection state changes from CONNECTING to CONNECTED etc.
+            metrics (kafka.metrics.Metrics): Optionally provide a metrics
+                instance for capturing network IO stats. Default: None.
+            metric_group_prefix (str): Prefix for metric names. Default: ''
+            sasl_mechanism (str): string picking sasl mechanism when security_protocol
+                is SASL_PLAINTEXT or SASL_SSL. Currently only PLAIN is supported.
+                Default: None
+            sasl_plain_username (str): username for sasl PLAIN authentication.
+                Default: None
+            sasl_plain_password (str): password for sasl PLAIN authentication.
+                Default: None
+        """
        self.host = host
        self.hostname = host
        self.port = port
@@ -123,6 +195,11 @@ class BrokerConnection(object):
        self._correlation_id = 0
        self._gai = None
        self._gai_index = 0
+        self._sensors = None
+        if self.config['metrics']:
+            self._sensors = BrokerConnectionMetrics(self.config['metrics'],
+                                                    self.config['metric_group_prefix'],
+                                                    self.config['node_id'])

    def connect(self):
        """Attempt to connect and return ConnectionState"""
@@ -453,6 +530,8 @@ class BrokerConnection(object):
                    sent_bytes = self._sock.send(data[total_sent:])
                    total_sent += sent_bytes
                assert total_sent == len(data)
+                if self._sensors:
+                    self._sensors.bytes_sent.record(total_sent)
                self._sock.setblocking(False)
            except (AssertionError, ConnectionError) as e:
                log.exception("Error sending %s to %s", request, self)
@@ -583,6 +662,8 @@ class BrokerConnection(object):
            self._receiving = False
            self._next_payload_bytes = 0
+            if self._sensors:
+                self._sensors.bytes_received.record(4 + self._rbuffer.tell())
            self._rbuffer.seek(0)
            response = self._process_response(self._rbuffer)
            self._rbuffer.seek(0)
@@ -593,6 +674,8 @@ class BrokerConnection(object):
        assert not self._processing, 'Recursion not supported'
        self._processing = True
        ifr = self.in_flight_requests.popleft()
+        if self._sensors:
+            self._sensors.request_time.record((time.time() - ifr.timestamp) * 1000)

        # verify send/recv correlation ids match
        recv_correlation_id = Int32.decode(read_buffer)
@@ -762,6 +845,111 @@ class BrokerConnection(object):
                                                          self.port)
+class BrokerConnectionMetrics(object):
+    def __init__(self, metrics, metric_group_prefix, node_id):
+        self.metrics = metrics
+
+        # Any broker may have registered summary metrics already
+        # but if not, we need to create them so we can set as parents below
+        all_conns_transferred = metrics.get_sensor('bytes-sent-received')
+        if not all_conns_transferred:
+            metric_group_name = metric_group_prefix + '-metrics'
+
+            bytes_transferred = metrics.sensor('bytes-sent-received')
+            bytes_transferred.add(metrics.metric_name(
+                'network-io-rate', metric_group_name,
+                'The average number of network operations (reads or writes) on all'
+                ' connections per second.'), Rate(sampled_stat=Count()))
+
+            bytes_sent = metrics.sensor('bytes-sent',
+                                        parents=[bytes_transferred])
+            bytes_sent.add(metrics.metric_name(
+                'outgoing-byte-rate', metric_group_name,
+                'The average number of outgoing bytes sent per second to all'
+                ' servers.'), Rate())
+            bytes_sent.add(metrics.metric_name(
+                'request-rate', metric_group_name,
+                'The average number of requests sent per second.'),
+                Rate(sampled_stat=Count()))
+            bytes_sent.add(metrics.metric_name(
+                'request-size-avg', metric_group_name,
+                'The average size of all requests in the window.'), Avg())
+            bytes_sent.add(metrics.metric_name(
+                'request-size-max', metric_group_name,
+                'The maximum size of any request sent in the window.'), Max())
+
+            bytes_received = metrics.sensor('bytes-received',
+                                            parents=[bytes_transferred])
+            bytes_received.add(metrics.metric_name(
+                'incoming-byte-rate', metric_group_name,
+                'Bytes/second read off all sockets'), Rate())
+            bytes_received.add(metrics.metric_name(
+                'response-rate', metric_group_name,
+                'The average number of responses received per second.'),
+                Rate(sampled_stat=Count()))
+
+            request_latency = metrics.sensor('request-latency')
+            request_latency.add(metrics.metric_name(
+                'request-latency-avg', metric_group_name,
+                'The average request latency in ms.'),
+                Avg())
+            request_latency.add(metrics.metric_name(
+                'request-latency-max', metric_group_name,
+                'The maximum request latency in ms.'),
+                Max())
+
+        # if one sensor of the metrics has been registered for the connection,
+        # then all other sensors should have been registered; and vice versa
+        node_str = 'node-{0}'.format(node_id)
+        node_sensor = metrics.get_sensor(node_str + '.bytes-sent')
+        if not node_sensor:
+            metric_group_name = metric_group_prefix + '-node-metrics.' + node_str
+
+            self.bytes_sent = metrics.sensor(
+                node_str + '.bytes-sent',
+                parents=[metrics.get_sensor('bytes-sent')])
+            self.bytes_sent.add(metrics.metric_name(
+                'outgoing-byte-rate', metric_group_name,
+                'The average number of outgoing bytes sent per second.'),
+                Rate())
+            self.bytes_sent.add(metrics.metric_name(
+                'request-rate', metric_group_name,
+                'The average number of requests sent per second.'),
+                Rate(sampled_stat=Count()))
+            self.bytes_sent.add(metrics.metric_name(
+                'request-size-avg', metric_group_name,
+                'The average size of all requests in the window.'),
+                Avg())
+            self.bytes_sent.add(metrics.metric_name(
+                'request-size-max', metric_group_name,
+                'The maximum size of any request sent in the window.'),
+                Max())
+
+            self.bytes_received = metrics.sensor(
+                node_str + '.bytes-received',
+                parents=[metrics.get_sensor('bytes-received')])
+            self.bytes_received.add(metrics.metric_name(
+                'incoming-byte-rate', metric_group_name,
+                'Bytes/second read off node-connection socket'),
+                Rate())
+            self.bytes_received.add(metrics.metric_name(
+                'response-rate', metric_group_name,
+                'The average number of responses received per second.'),
+                Rate(sampled_stat=Count()))
+
+            self.request_time = self.metrics.sensor(
+                node_str + '.latency',
+                parents=[metrics.get_sensor('request-latency')])
+            self.request_time.add(metrics.metric_name(
+                'request-latency-avg', metric_group_name,
+                'The average request latency in ms.'),
+                Avg())
+            self.request_time.add(metrics.metric_name(
+                'request-latency-max', metric_group_name,
+                'The maximum request latency in ms.'),
+                Max())

def _address_family(address):
    """
    Attempt to determine the family of an address (or hostname)
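
BrokerConnectionMetrics builds two layers of sensors: aggregate ones ('bytes-sent', 'bytes-received', 'request-latency') shared by every connection, and per-node ones ('node-<id>.*') that name the aggregates as parents, so a single record() call updates both levels. A standalone sketch using only what the hunks above show (host, port and node id are illustrative; in normal use KafkaClient forwards these config keys itself):

    import socket
    from kafka.conn import BrokerConnection
    from kafka.metrics import Metrics

    metrics = Metrics()

    # 'metrics' and 'node_id' are ordinary config keys now, so a connection can
    # be instrumented directly; no network I/O happens until connect() is called.
    conn = BrokerConnection('localhost', 9092, socket.AF_INET,
                            node_id=1,
                            metrics=metrics,
                            metric_group_prefix='kafka-client')

    # Both the shared parent sensor and its per-node child exist after __init__;
    # recording on the child (as _send/_recv do) also feeds the parent.
    print(metrics.get_sensor('bytes-sent'))         # aggregate, all connections
    print(metrics.get_sensor('node-1.bytes-sent'))  # this connection only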

View File

@@ -42,11 +42,11 @@ class Fetcher(six.Iterator):
        'check_crcs': True,
        'skip_double_compressed_messages': False,
        'iterator_refetch_records': 1,  # undocumented -- interface may change
+        'metric_group_prefix': 'consumer',
        'api_version': (0, 8, 0),
    }

-    def __init__(self, client, subscriptions, metrics, metric_group_prefix,
-                 **configs):
+    def __init__(self, client, subscriptions, metrics, **configs):
        """Initialize a Kafka Message Fetcher.

        Keyword Arguments:
@@ -94,7 +94,7 @@ class Fetcher(six.Iterator):
        self._record_too_large_partitions = dict()  # {topic_partition: offset}
        self._iterator = None
        self._fetch_futures = collections.deque()
-        self._sensors = FetchManagerMetrics(metrics, metric_group_prefix)
+        self._sensors = FetchManagerMetrics(metrics, self.config['metric_group_prefix'])

    def init_fetches(self):
        """Send FetchRequests asynchronously for all assigned partitions.

View File

@@ -239,6 +239,7 @@ class KafkaConsumer(six.Iterator):
        'metric_reporters': [],
        'metrics_num_samples': 2,
        'metrics_sample_window_ms': 30000,
+        'metric_group_prefix': 'consumer',
        'selector': selectors.DefaultSelector,
        'exclude_internal_topics': True,
        'sasl_mechanism': None,
@@ -268,7 +269,6 @@ class KafkaConsumer(six.Iterator):
                                     tags=metrics_tags)
        reporters = [reporter() for reporter in self.config['metric_reporters']]
        self._metrics = Metrics(metric_config, reporters)
-        metric_group_prefix = 'consumer'
        # TODO _metrics likely needs to be passed to KafkaClient, etc.

        # api_version was previously a str. accept old format for now
@@ -281,7 +281,7 @@ class KafkaConsumer(six.Iterator):
            log.warning('use api_version=%s [tuple] -- "%s" as str is deprecated',
                        str(self.config['api_version']), str_version)

-        self._client = KafkaClient(**self.config)
+        self._client = KafkaClient(metrics=self._metrics, **self.config)

        # Get auto-discovered version from client if necessary
        if self.config['api_version'] is None:
@@ -289,9 +289,9 @@ class KafkaConsumer(six.Iterator):
        self._subscription = SubscriptionState(self.config['auto_offset_reset'])
        self._fetcher = Fetcher(
-            self._client, self._subscription, self._metrics, metric_group_prefix, **self.config)
+            self._client, self._subscription, self._metrics, **self.config)
        self._coordinator = ConsumerCoordinator(
-            self._client, self._subscription, self._metrics, metric_group_prefix,
+            self._client, self._subscription, self._metrics,
            assignors=self.config['partition_assignment_strategy'],
            **self.config)

        self._closed = False
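
With this change a single Metrics instance is shared by the consumer's client, fetcher and coordinator, so connection-level stats land in the same registry as fetch and coordinator stats. The supported way to export them is the metric_reporters config shown above; for a quick look you can also peek at the registry directly, as in this hedged sketch (consumer._metrics is an internal attribute, and the topic/broker are illustrative):

    from kafka import KafkaConsumer

    # Assumes a reachable broker and an existing topic.
    consumer = KafkaConsumer('my-topic', bootstrap_servers='localhost:9092')

    # The shared kafka.metrics.Metrics registry now holds client, connection,
    # fetch and coordinator metrics side by side.
    for name, metric in consumer._metrics.metrics.items():
        if name.group.startswith('consumer'):
            print(name.group, name.name, metric.value())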

View File

@@ -55,9 +55,10 @@ class BaseCoordinator(object):
        'heartbeat_interval_ms': 3000,
        'retry_backoff_ms': 100,
        'api_version': (0, 9),
+        'metric_group_prefix': '',
    }

-    def __init__(self, client, metrics, metric_group_prefix, **configs):
+    def __init__(self, client, metrics, **configs):
        """
        Keyword Arguments:
            group_id (str): name of the consumer group to join for dynamic
@@ -92,7 +93,7 @@ class BaseCoordinator(object):
        self.heartbeat = Heartbeat(**self.config)
        self.heartbeat_task = HeartbeatTask(weakref.proxy(self))
        self.sensors = GroupCoordinatorMetrics(self.heartbeat, metrics,
-                                               metric_group_prefix)
+                                               self.config['metric_group_prefix'])

    def __del__(self):
        if hasattr(self, 'heartbeat_task') and self.heartbeat_task:

View File

@@ -37,10 +37,10 @@ class ConsumerCoordinator(BaseCoordinator):
        'retry_backoff_ms': 100,
        'api_version': (0, 9),
        'exclude_internal_topics': True,
+        'metric_group_prefix': 'consumer'
    }

-    def __init__(self, client, subscription, metrics, metric_group_prefix,
-                 **configs):
+    def __init__(self, client, subscription, metrics, **configs):
        """Initialize the coordination manager.

        Keyword Arguments:
@@ -76,9 +76,7 @@ class ConsumerCoordinator(BaseCoordinator):
                True the only way to receive records from an internal topic is
                subscribing to it. Requires 0.10+. Default: True
        """
-        super(ConsumerCoordinator, self).__init__(client,
-                                                  metrics, metric_group_prefix,
-                                                  **configs)
+        super(ConsumerCoordinator, self).__init__(client, metrics, **configs)

        self.config = copy.copy(self.DEFAULT_CONFIG)
        for key in self.config:
@@ -111,7 +109,7 @@ class ConsumerCoordinator(BaseCoordinator):
            self._auto_commit_task.reschedule()

        self.consumer_sensors = ConsumerCoordinatorMetrics(
-            metrics, metric_group_prefix, self._subscription)
+            metrics, self.config['metric_group_prefix'], self._subscription)

    def __del__(self):
        if hasattr(self, '_cluster') and self._cluster:

View File

@@ -9,6 +9,7 @@ from ..codec import (has_gzip, has_snappy, has_lz4,
                     gzip_encode, snappy_encode,
                     lz4_encode, lz4_encode_old_kafka)
from .. import errors as Errors
+from ..metrics.stats import Rate
from ..protocol.types import Int32, Int64
from ..protocol.message import MessageSet, Message
@@ -135,7 +136,7 @@ class MessageSetBuffer(object):
class SimpleBufferPool(object):
    """A simple pool of BytesIO objects with a weak memory ceiling."""
-    def __init__(self, memory, poolable_size):
+    def __init__(self, memory, poolable_size, metrics=None, metric_group_prefix='producer-metrics'):
        """Create a new buffer pool.

        Arguments:
@@ -150,10 +151,13 @@ class SimpleBufferPool(object):
        self._free = collections.deque([io.BytesIO() for _ in range(buffers)])
        self._waiters = collections.deque()

-        #self.metrics = metrics;
-        #self.waitTime = this.metrics.sensor("bufferpool-wait-time");
-        #MetricName metricName = metrics.metricName("bufferpool-wait-ratio", metricGrpName, "The fraction of time an appender waits for space allocation.");
-        #this.waitTime.add(metricName, new Rate(TimeUnit.NANOSECONDS));
+        self.wait_time = None
+        if metrics:
+            self.wait_time = metrics.sensor('bufferpool-wait-time')
+            self.wait_time.add(metrics.metric_name(
+                'bufferpool-wait-ratio', metric_group_prefix,
+                'The fraction of time an appender waits for space allocation.'),
+                Rate())

    def allocate(self, size, max_time_to_block_ms):
        """
@@ -187,7 +191,8 @@ class SimpleBufferPool(object):
                    start_wait = time.time()
                    more_memory.wait(max_time_to_block_ms / 1000.0)
                    end_wait = time.time()
-                    #this.waitTime.record(endWait - startWait, time.milliseconds());
+                    if self.wait_time:
+                        self.wait_time.record(end_wait - start_wait)

            if self._free:
                buf = self._free.popleft()
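
SimpleBufferPool only records when a metrics instance is supplied; otherwise wait_time stays None and the allocation path skips the sensor entirely. A minimal sketch of the instrumented path (the pool sizes are illustrative):

    from kafka.metrics import Metrics
    from kafka.producer.buffer import SimpleBufferPool

    metrics = Metrics()

    # 1 MiB pool of 16 KiB buffers; 'bufferpool-wait-ratio' is registered here.
    pool = SimpleBufferPool(1024 * 1024, 16 * 1024,
                            metrics=metrics,
                            metric_group_prefix='producer-metrics')

    buf = pool.allocate(16 * 1024, max_time_to_block_ms=1000)  # free buffer -> no wait recorded
    pool.deallocate(buf)

    print(metrics.get_sensor('bufferpool-wait-time'))  # created because metrics was passed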

View File

@@ -335,7 +335,7 @@ class KafkaProducer(object):
            assert self.config['api_version'] >= (0, 8, 2), 'LZ4 Requires >= Kafka 0.8.2 Brokers'

        message_version = 1 if self.config['api_version'] >= (0, 10) else 0
-        self._accumulator = RecordAccumulator(message_version=message_version, **self.config)
+        self._accumulator = RecordAccumulator(message_version=message_version, metrics=self._metrics, **self.config)
        self._metadata = client.cluster
        guarantee_message_order = bool(self.config['max_in_flight_requests_per_connection'] == 1)
        self._sender = Sender(client, self._metadata,

View File

@@ -162,6 +162,8 @@ class RecordAccumulator(object):
        'linger_ms': 0,
        'retry_backoff_ms': 100,
        'message_version': 0,
+        'metrics': None,
+        'metric_group_prefix': 'producer-metrics',
    }

    def __init__(self, **configs):
@@ -176,7 +178,9 @@ class RecordAccumulator(object):
        self._batches = collections.defaultdict(collections.deque)  # TopicPartition: [RecordBatch]
        self._tp_locks = {None: threading.Lock()}  # TopicPartition: Lock, plus a lock to add entries
        self._free = SimpleBufferPool(self.config['buffer_memory'],
-                                      self.config['batch_size'])
+                                      self.config['batch_size'],
+                                      metrics=self.config['metrics'],
+                                      metric_group_prefix=self.config['metric_group_prefix'])
        self._incomplete = IncompleteRecordBatches()
        # The following variables should only be accessed by the sender thread,
        # so we don't need to protect them w/ locking.
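
RecordAccumulator itself does no measuring here; the two new config keys are simply forwarded to SimpleBufferPool, and KafkaProducer fills in metrics=self._metrics as shown in the previous file. A small sketch of that plumbing in isolation (constructing the accumulator directly like this is purely illustrative):

    from kafka.metrics import Metrics
    from kafka.producer.record_accumulator import RecordAccumulator

    metrics = Metrics()

    # Defaults supply buffer_memory/batch_size; metrics flows through to the pool.
    accumulator = RecordAccumulator(metrics=metrics,
                                    metric_group_prefix='producer-metrics')

    print(metrics.get_sensor('bufferpool-wait-time'))  # registered by the pool, not the accumulator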

View File

@@ -204,7 +204,6 @@ class Sender(threading.Thread):
                    batch = batches_by_partition[tp]
                    self._complete_batch(batch, error, offset, ts)

-            self._sensors.record_latency((time.time() - send_time) * 1000, node=node_id)
            if response.API_VERSION > 0:
                self._sensors.record_throttle_time(response.throttle_time_ms, node=node_id)
@@ -343,15 +342,6 @@ class SenderMetrics(object):
                        sensor_name=sensor_name,
                        description='The maximum time in ms record batches spent in the record accumulator.')

-        sensor_name = 'request-time'
-        self.request_time_sensor = self.metrics.sensor(sensor_name)
-        self.add_metric('request-latency-avg', Avg(),
-                        sensor_name=sensor_name,
-                        description='The average request latency in ms')
-        self.add_metric('request-latency-max', Max(),
-                        sensor_name=sensor_name,
-                        description='The maximum request latency in ms')

        sensor_name = 'produce-throttle-time'
        self.produce_throttle_time_sensor = self.metrics.sensor(sensor_name)
        self.add_metric('produce-throttle-time-avg', Avg(),
@@ -498,12 +488,5 @@ class SenderMetrics(object):
        if sensor:
            sensor.record(count)

-    def record_latency(self, latency, node=None):
-        self.request_time_sensor.record(latency)
-        if node is not None:
-            sensor = self.metrics.get_sensor('node-' + str(node) + '.latency')
-            if sensor:
-                sensor.record(latency)

    def record_throttle_time(self, throttle_time_ms, node=None):
        self.produce_throttle_time_sensor.record(throttle_time_ms)

View File

@@ -49,6 +49,7 @@ def test_bootstrap_success(conn):
    args, kwargs = conn.call_args
    assert args == ('localhost', 9092, socket.AF_UNSPEC)
    kwargs.pop('state_change_callback')
+    kwargs.pop('node_id')
    assert kwargs == cli.config
    conn.connect.assert_called_with()
    conn.send.assert_called_once_with(MetadataRequest[0]([]))
@@ -62,6 +63,7 @@ def test_bootstrap_failure(conn):
    args, kwargs = conn.call_args
    assert args == ('localhost', 9092, socket.AF_UNSPEC)
    kwargs.pop('state_change_callback')
+    kwargs.pop('node_id')
    assert kwargs == cli.config
    conn.connect.assert_called_with()
    conn.close.assert_called_with()

View File

@@ -29,8 +29,7 @@ def client(conn):
@pytest.fixture
def coordinator(client):
-    return ConsumerCoordinator(client, SubscriptionState(), Metrics(),
-                               'consumer')
+    return ConsumerCoordinator(client, SubscriptionState(), Metrics())

def test_init(client, coordinator):
@@ -42,7 +41,7 @@ def test_init(client, coordinator):
@pytest.mark.parametrize("api_version", [(0, 8, 0), (0, 8, 1), (0, 8, 2), (0, 9)])
def test_autocommit_enable_api_version(client, api_version):
    coordinator = ConsumerCoordinator(client, SubscriptionState(),
-                                      Metrics(), 'consumer',
+                                      Metrics(),
                                      enable_auto_commit=True,
                                      group_id='foobar',
                                      api_version=api_version)
@@ -362,7 +361,7 @@ def test_maybe_auto_commit_offsets_sync(mocker, api_version, group_id, enable,
    mock_exc = mocker.patch('kafka.coordinator.consumer.log.exception')
    client = KafkaClient(api_version=api_version)
    coordinator = ConsumerCoordinator(client, SubscriptionState(),
-                                      Metrics(), 'consumer',
+                                      Metrics(),
                                      api_version=api_version,
                                      enable_auto_commit=enable,
                                      group_id=group_id)

View File

@@ -30,7 +30,7 @@ def fetcher(client, subscription_state):
    subscription_state.assign_from_subscribed(assignment)
    for tp in assignment:
        subscription_state.seek(tp, 0)
-    return Fetcher(client, subscription_state, Metrics(), 'test_fetcher')
+    return Fetcher(client, subscription_state, Metrics())

def test_init_fetches(fetcher, mocker):