From a11b63d81acc2ae07b3bf4f5dd7e438918d3eadb Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 18 Jun 2017 23:45:49 -0700 Subject: [PATCH] Update per KIP-144 --- kafka/client_async.py | 14 +++++++++----- kafka/conn.py | 18 +++++++++++++----- 2 files changed, 22 insertions(+), 10 deletions(-) diff --git a/kafka/client_async.py b/kafka/client_async.py index 44bc9af..5308c1f 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -67,10 +67,14 @@ class KafkaClient(object): reconnect_backoff_ms (int): The amount of time in milliseconds to wait before attempting to reconnect to a given host. Default: 50. - reconnect_backoff_max (int): If higher than reconnect_backoff_ms, - node reconnect backoff will increase on each consecutive failure - up to this maximum. The actual backoff is chosen randomly from - an exponentially increasing range. Default: 60000. + reconnect_backoff_max_ms (int): The maximum amount of time in + milliseconds to wait when reconnecting to a broker that has + repeatedly failed to connect. If provided, the backoff per host + will increase exponentially for each consecutive connection + failure, up to this maximum. To avoid connection storms, a + randomization factor of 0.2 will be applied to the backoff + resulting in a random range between 20% below and 20% above + the computed value. Default: 1000. request_timeout_ms (int): Client request timeout in milliseconds. Default: 40000. retry_backoff_ms (int): Milliseconds to backoff when retrying on @@ -141,7 +145,7 @@ class KafkaClient(object): 'request_timeout_ms': 40000, 'connections_max_idle_ms': 9 * 60 * 1000, 'reconnect_backoff_ms': 50, - 'reconnect_backoff_max': 60000, + 'reconnect_backoff_max_ms': 1000, 'max_in_flight_requests_per_connection': 5, 'receive_buffer_bytes': None, 'send_buffer_bytes': None, diff --git a/kafka/conn.py b/kafka/conn.py index 687b748..139620a 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -5,7 +5,7 @@ import copy import errno import logging import io -from random import randint, shuffle +from random import shuffle, uniform import socket import time import traceback @@ -78,6 +78,14 @@ class BrokerConnection(object): reconnect_backoff_ms (int): The amount of time in milliseconds to wait before attempting to reconnect to a given host. Default: 50. + reconnect_backoff_max_ms (int): The maximum amount of time in + milliseconds to wait when reconnecting to a broker that has + repeatedly failed to connect. If provided, the backoff per host + will increase exponentially for each consecutive connection + failure, up to this maximum. To avoid connection storms, a + randomization factor of 0.2 will be applied to the backoff + resulting in a random range between 20% below and 20% above + the computed value. Default: 1000. request_timeout_ms (int): Client request timeout in milliseconds. Default: 40000. max_in_flight_requests_per_connection (int): Requests are pipelined @@ -140,7 +148,7 @@ class BrokerConnection(object): 'node_id': 0, 'request_timeout_ms': 40000, 'reconnect_backoff_ms': 50, - 'reconnect_backoff_max': 60000, + 'reconnect_backoff_max_ms': 1000, 'max_in_flight_requests_per_connection': 5, 'receive_buffer_bytes': None, 'send_buffer_bytes': None, @@ -512,11 +520,11 @@ class BrokerConnection(object): self._reconnect_backoff = self.config['reconnect_backoff_ms'] / 1000.0 def _update_reconnect_backoff(self): - if self.config['reconnect_backoff_max'] > self.config['reconnect_backoff_ms']: + if self.config['reconnect_backoff_max_ms'] > self.config['reconnect_backoff_ms']: self._failures += 1 self._reconnect_backoff = self.config['reconnect_backoff_ms'] * 2 ** (self._failures - 1) - self._reconnect_backoff = min(self._reconnect_backoff, self.config['reconnect_backoff_max']) - self._reconnect_backoff = randint(self.config['reconnect_backoff_ms'], self._reconnect_backoff) + self._reconnect_backoff = min(self._reconnect_backoff, self.config['reconnect_backoff_max_ms']) + self._reconnect_backoff *= uniform(0.8, 1.2) self._reconnect_backoff /= 1000.0 log.debug('%s: reconnect backoff %s after %s failures', self, self._reconnect_backoff, self._failures)