Update per KIP-144

This commit is contained in:
Dana Powers
2017-06-18 23:45:49 -07:00
parent c12e04bc0c
commit a11b63d81a
2 changed files with 22 additions and 10 deletions

View File

@@ -67,10 +67,14 @@ class KafkaClient(object):
reconnect_backoff_ms (int): The amount of time in milliseconds to reconnect_backoff_ms (int): The amount of time in milliseconds to
wait before attempting to reconnect to a given host. wait before attempting to reconnect to a given host.
Default: 50. Default: 50.
reconnect_backoff_max (int): If higher than reconnect_backoff_ms, reconnect_backoff_max_ms (int): The maximum amount of time in
node reconnect backoff will increase on each consecutive failure milliseconds to wait when reconnecting to a broker that has
up to this maximum. The actual backoff is chosen randomly from repeatedly failed to connect. If provided, the backoff per host
an exponentially increasing range. Default: 60000. will increase exponentially for each consecutive connection
failure, up to this maximum. To avoid connection storms, a
randomization factor of 0.2 will be applied to the backoff
resulting in a random range between 20% below and 20% above
the computed value. Default: 1000.
request_timeout_ms (int): Client request timeout in milliseconds. request_timeout_ms (int): Client request timeout in milliseconds.
Default: 40000. Default: 40000.
retry_backoff_ms (int): Milliseconds to backoff when retrying on retry_backoff_ms (int): Milliseconds to backoff when retrying on
@@ -141,7 +145,7 @@ class KafkaClient(object):
'request_timeout_ms': 40000, 'request_timeout_ms': 40000,
'connections_max_idle_ms': 9 * 60 * 1000, 'connections_max_idle_ms': 9 * 60 * 1000,
'reconnect_backoff_ms': 50, 'reconnect_backoff_ms': 50,
'reconnect_backoff_max': 60000, 'reconnect_backoff_max_ms': 1000,
'max_in_flight_requests_per_connection': 5, 'max_in_flight_requests_per_connection': 5,
'receive_buffer_bytes': None, 'receive_buffer_bytes': None,
'send_buffer_bytes': None, 'send_buffer_bytes': None,

View File

@@ -5,7 +5,7 @@ import copy
import errno import errno
import logging import logging
import io import io
from random import randint, shuffle from random import shuffle, uniform
import socket import socket
import time import time
import traceback import traceback
@@ -78,6 +78,14 @@ class BrokerConnection(object):
reconnect_backoff_ms (int): The amount of time in milliseconds to reconnect_backoff_ms (int): The amount of time in milliseconds to
wait before attempting to reconnect to a given host. wait before attempting to reconnect to a given host.
Default: 50. Default: 50.
reconnect_backoff_max_ms (int): The maximum amount of time in
milliseconds to wait when reconnecting to a broker that has
repeatedly failed to connect. If provided, the backoff per host
will increase exponentially for each consecutive connection
failure, up to this maximum. To avoid connection storms, a
randomization factor of 0.2 will be applied to the backoff
resulting in a random range between 20% below and 20% above
the computed value. Default: 1000.
request_timeout_ms (int): Client request timeout in milliseconds. request_timeout_ms (int): Client request timeout in milliseconds.
Default: 40000. Default: 40000.
max_in_flight_requests_per_connection (int): Requests are pipelined max_in_flight_requests_per_connection (int): Requests are pipelined
@@ -140,7 +148,7 @@ class BrokerConnection(object):
'node_id': 0, 'node_id': 0,
'request_timeout_ms': 40000, 'request_timeout_ms': 40000,
'reconnect_backoff_ms': 50, 'reconnect_backoff_ms': 50,
'reconnect_backoff_max': 60000, 'reconnect_backoff_max_ms': 1000,
'max_in_flight_requests_per_connection': 5, 'max_in_flight_requests_per_connection': 5,
'receive_buffer_bytes': None, 'receive_buffer_bytes': None,
'send_buffer_bytes': None, 'send_buffer_bytes': None,
@@ -512,11 +520,11 @@ class BrokerConnection(object):
self._reconnect_backoff = self.config['reconnect_backoff_ms'] / 1000.0 self._reconnect_backoff = self.config['reconnect_backoff_ms'] / 1000.0
def _update_reconnect_backoff(self): def _update_reconnect_backoff(self):
if self.config['reconnect_backoff_max'] > self.config['reconnect_backoff_ms']: if self.config['reconnect_backoff_max_ms'] > self.config['reconnect_backoff_ms']:
self._failures += 1 self._failures += 1
self._reconnect_backoff = self.config['reconnect_backoff_ms'] * 2 ** (self._failures - 1) self._reconnect_backoff = self.config['reconnect_backoff_ms'] * 2 ** (self._failures - 1)
self._reconnect_backoff = min(self._reconnect_backoff, self.config['reconnect_backoff_max']) self._reconnect_backoff = min(self._reconnect_backoff, self.config['reconnect_backoff_max_ms'])
self._reconnect_backoff = randint(self.config['reconnect_backoff_ms'], self._reconnect_backoff) self._reconnect_backoff *= uniform(0.8, 1.2)
self._reconnect_backoff /= 1000.0 self._reconnect_backoff /= 1000.0
log.debug('%s: reconnect backoff %s after %s failures', self, self._reconnect_backoff, self._failures) log.debug('%s: reconnect backoff %s after %s failures', self, self._reconnect_backoff, self._failures)