Dont override system rcvbuf or sndbuf unless user configures explicitly
This commit is contained in:
@@ -41,8 +41,8 @@ class KafkaClient(object):
|
||||
'request_timeout_ms': 40000,
|
||||
'reconnect_backoff_ms': 50,
|
||||
'max_in_flight_requests_per_connection': 5,
|
||||
'receive_buffer_bytes': 32768,
|
||||
'send_buffer_bytes': 131072,
|
||||
'receive_buffer_bytes': None,
|
||||
'send_buffer_bytes': None,
|
||||
'retry_backoff_ms': 100,
|
||||
'metadata_max_age_ms': 300000,
|
||||
}
|
||||
@@ -71,9 +71,11 @@ class KafkaClient(object):
|
||||
to kafka brokers up to this number of maximum requests per
|
||||
broker connection. Default: 5.
|
||||
send_buffer_bytes (int): The size of the TCP send buffer
|
||||
(SO_SNDBUF) to use when sending data. Default: 131072
|
||||
(SO_SNDBUF) to use when sending data. Default: None (relies on
|
||||
system defaults). Java client defaults to 131072.
|
||||
receive_buffer_bytes (int): The size of the TCP receive buffer
|
||||
(SO_RCVBUF) to use when reading data. Default: 32768
|
||||
(SO_RCVBUF) to use when reading data. Default: None (relies on
|
||||
system defaults). Java client defaults to 32768.
|
||||
metadata_max_age_ms (int): The period of time in milliseconds after
|
||||
which we force a refresh of metadata even if we haven't seen any
|
||||
partition leadership changes to proactively discover any new
|
||||
|
||||
@@ -47,8 +47,8 @@ class BrokerConnection(object):
|
||||
'request_timeout_ms': 40000,
|
||||
'reconnect_backoff_ms': 50,
|
||||
'max_in_flight_requests_per_connection': 5,
|
||||
'receive_buffer_bytes': 32768,
|
||||
'send_buffer_bytes': 131072,
|
||||
'receive_buffer_bytes': None,
|
||||
'send_buffer_bytes': None,
|
||||
'api_version': (0, 8, 2), # default to most restrictive
|
||||
}
|
||||
|
||||
@@ -77,8 +77,10 @@ class BrokerConnection(object):
|
||||
if self.state is ConnectionStates.DISCONNECTED:
|
||||
self.close()
|
||||
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
if self.config['receive_buffer_bytes'] is not None:
|
||||
self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF,
|
||||
self.config['receive_buffer_bytes'])
|
||||
if self.config['send_buffer_bytes'] is not None:
|
||||
self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF,
|
||||
self.config['send_buffer_bytes'])
|
||||
self._sock.setblocking(False)
|
||||
|
||||
@@ -111,9 +111,11 @@ class KafkaConsumer(six.Iterator):
|
||||
session_timeout_ms (int): The timeout used to detect failures when
|
||||
using Kafka's group managementment facilities. Default: 30000
|
||||
send_buffer_bytes (int): The size of the TCP send buffer
|
||||
(SO_SNDBUF) to use when sending data. Default: 131072
|
||||
(SO_SNDBUF) to use when sending data. Default: None (relies on
|
||||
system defaults). The java client defaults to 131072.
|
||||
receive_buffer_bytes (int): The size of the TCP receive buffer
|
||||
(SO_RCVBUF) to use when reading data. Default: 32768
|
||||
(SO_RCVBUF) to use when reading data. Default: None (relies on
|
||||
system defaults). The java client defaults to 32768.
|
||||
consumer_timeout_ms (int): number of millisecond to throw a timeout
|
||||
exception to the consumer if no message is available for
|
||||
consumption. Default: -1 (dont throw exception)
|
||||
@@ -149,8 +151,8 @@ class KafkaConsumer(six.Iterator):
|
||||
'partition_assignment_strategy': (RoundRobinPartitionAssignor,),
|
||||
'heartbeat_interval_ms': 3000,
|
||||
'session_timeout_ms': 30000,
|
||||
'send_buffer_bytes': 128 * 1024,
|
||||
'receive_buffer_bytes': 32 * 1024,
|
||||
'send_buffer_bytes': None,
|
||||
'receive_buffer_bytes': None,
|
||||
'consumer_timeout_ms': -1,
|
||||
'api_version': 'auto',
|
||||
'connections_max_idle_ms': 9 * 60 * 1000, # not implemented yet
|
||||
|
||||
@@ -180,9 +180,11 @@ class KafkaProducer(object):
|
||||
request_timeout_ms (int): Client request timeout in milliseconds.
|
||||
Default: 30000.
|
||||
receive_buffer_bytes (int): The size of the TCP receive buffer
|
||||
(SO_RCVBUF) to use when reading data. Default: 32768
|
||||
(SO_RCVBUF) to use when reading data. Default: None (relies on
|
||||
system defaults). Java client defaults to 32768.
|
||||
send_buffer_bytes (int): The size of the TCP send buffer
|
||||
(SO_SNDBUF) to use when sending data. Default: 131072
|
||||
(SO_SNDBUF) to use when sending data. Default: None (relies on
|
||||
system defaults). Java client defaults to 131072.
|
||||
reconnect_backoff_ms (int): The amount of time in milliseconds to
|
||||
wait before attempting to reconnect to a given host.
|
||||
Default: 50.
|
||||
@@ -215,8 +217,8 @@ class KafkaProducer(object):
|
||||
'metadata_max_age_ms': 300000,
|
||||
'retry_backoff_ms': 100,
|
||||
'request_timeout_ms': 30000,
|
||||
'receive_buffer_bytes': 32768,
|
||||
'send_buffer_bytes': 131072,
|
||||
'receive_buffer_bytes': None,
|
||||
'send_buffer_bytes': None,
|
||||
'reconnect_backoff_ms': 50,
|
||||
'max_in_flight_requests_per_connection': 5,
|
||||
'api_version': 'auto',
|
||||
|
||||
Reference in New Issue
Block a user