Use socket_options configuration to setsockopts(). Default TCP_NODELAY (#783)

This commit is contained in:
Dana Powers
2016-08-01 08:53:17 -07:00
committed by GitHub
parent 64d3607b87
commit 4162989b77
4 changed files with 39 additions and 17 deletions

View File

@@ -54,6 +54,7 @@ class KafkaClient(object):
'max_in_flight_requests_per_connection': 5,
'receive_buffer_bytes': None,
'send_buffer_bytes': None,
'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
'retry_backoff_ms': 100,
'metadata_max_age_ms': 300000,
'security_protocol': 'PLAINTEXT',
@@ -93,26 +94,29 @@ class KafkaClient(object):
server-side log entries that correspond to this client. Also
submitted to GroupCoordinator for logging with respect to
consumer group administration. Default: 'kafka-python-{version}'
request_timeout_ms (int): Client request timeout in milliseconds.
Default: 40000.
reconnect_backoff_ms (int): The amount of time in milliseconds to
wait before attempting to reconnect to a given host.
Default: 50.
request_timeout_ms (int): Client request timeout in milliseconds.
Default: 40000.
retry_backoff_ms (int): Milliseconds to backoff when retrying on
errors. Default: 100.
max_in_flight_requests_per_connection (int): Requests are pipelined
to kafka brokers up to this number of maximum requests per
broker connection. Default: 5.
send_buffer_bytes (int): The size of the TCP send buffer
(SO_SNDBUF) to use when sending data. Default: None (relies on
system defaults). Java client defaults to 131072.
receive_buffer_bytes (int): The size of the TCP receive buffer
(SO_RCVBUF) to use when reading data. Default: None (relies on
system defaults). Java client defaults to 32768.
send_buffer_bytes (int): The size of the TCP send buffer
(SO_SNDBUF) to use when sending data. Default: None (relies on
system defaults). Java client defaults to 131072.
socket_options (list): List of tuple-arguments to socket.setsockopt
to apply to broker connection sockets. Default:
[(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)]
metadata_max_age_ms (int): The period of time in milliseconds after
which we force a refresh of metadata even if we haven't seen any
partition leadership changes to proactively discover any new
brokers or partitions. Default: 300000
retry_backoff_ms (int): Milliseconds to backoff when retrying on
errors. Default: 100.
security_protocol (str): Protocol used to communicate with brokers.
Valid values are: PLAINTEXT, SSL. Default: PLAINTEXT.
ssl_context (ssl.SSLContext): pre-configured SSLContext for wrapping

View File

@@ -60,6 +60,7 @@ class BrokerConnection(object):
'max_in_flight_requests_per_connection': 5,
'receive_buffer_bytes': None,
'send_buffer_bytes': None,
'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
'security_protocol': 'PLAINTEXT',
'ssl_context': None,
'ssl_check_hostname': True,
@@ -84,6 +85,15 @@ class BrokerConnection(object):
if key in configs:
self.config[key] = configs[key]
if self.config['receive_buffer_bytes'] is not None:
self.config['socket_options'].append(
(socket.SOL_SOCKET, socket.SO_RCVBUF,
self.config['receive_buffer_bytes']))
if self.config['send_buffer_bytes'] is not None:
self.config['socket_options'].append(
(socket.SOL_SOCKET, socket.SO_SNDBUF,
self.config['send_buffer_bytes']))
self.state = ConnectionStates.DISCONNECTED
self._sock = None
self._ssl_context = None
@@ -144,12 +154,10 @@ class BrokerConnection(object):
self._sock = socket.socket(afi, socket.SOCK_STREAM)
else:
self._sock = socket.socket(self.afi, socket.SOCK_STREAM)
if self.config['receive_buffer_bytes'] is not None:
self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF,
self.config['receive_buffer_bytes'])
if self.config['send_buffer_bytes'] is not None:
self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF,
self.config['send_buffer_bytes'])
for option in self.config['socket_options']:
self._sock.setsockopt(*option)
self._sock.setblocking(False)
if self.config['security_protocol'] in ('SSL', 'SASL_SSL'):
self._wrap_ssl()

View File

@@ -2,6 +2,7 @@ from __future__ import absolute_import
import copy
import logging
import socket
import time
import six
@@ -114,12 +115,15 @@ class KafkaConsumer(six.Iterator):
rebalances. Default: 3000
session_timeout_ms (int): The timeout used to detect failures when
using Kafka's group managementment facilities. Default: 30000
send_buffer_bytes (int): The size of the TCP send buffer
(SO_SNDBUF) to use when sending data. Default: None (relies on
system defaults). The java client defaults to 131072.
receive_buffer_bytes (int): The size of the TCP receive buffer
(SO_RCVBUF) to use when reading data. Default: None (relies on
system defaults). The java client defaults to 32768.
send_buffer_bytes (int): The size of the TCP send buffer
(SO_SNDBUF) to use when sending data. Default: None (relies on
system defaults). The java client defaults to 131072.
socket_options (list): List of tuple-arguments to socket.setsockopt
to apply to broker connection sockets. Default:
[(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)]
consumer_timeout_ms (int): number of milliseconds to block during
message iteration before raising StopIteration (i.e., ending the
iterator). Default -1 (block forever).
@@ -209,8 +213,9 @@ class KafkaConsumer(six.Iterator):
'partition_assignment_strategy': (RangePartitionAssignor, RoundRobinPartitionAssignor),
'heartbeat_interval_ms': 3000,
'session_timeout_ms': 30000,
'send_buffer_bytes': None,
'receive_buffer_bytes': None,
'send_buffer_bytes': None,
'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
'consumer_timeout_ms': -1,
'skip_double_compressed_messages': False,
'security_protocol': 'PLAINTEXT',

View File

@@ -3,6 +3,7 @@ from __future__ import absolute_import
import atexit
import copy
import logging
import socket
import threading
import time
import weakref
@@ -188,6 +189,9 @@ class KafkaProducer(object):
send_buffer_bytes (int): The size of the TCP send buffer
(SO_SNDBUF) to use when sending data. Default: None (relies on
system defaults). Java client defaults to 131072.
socket_options (list): List of tuple-arguments to socket.setsockopt
to apply to broker connection sockets. Default:
[(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)]
reconnect_backoff_ms (int): The amount of time in milliseconds to
wait before attempting to reconnect to a given host.
Default: 50.
@@ -256,6 +260,7 @@ class KafkaProducer(object):
'request_timeout_ms': 30000,
'receive_buffer_bytes': None,
'send_buffer_bytes': None,
'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
'reconnect_backoff_ms': 50,
'max_in_flight_requests_per_connection': 5,
'security_protocol': 'PLAINTEXT',