Avoid multiple connection attempts when refreshing metadata
This commit is contained in:
@@ -183,7 +183,6 @@ class KafkaClient(object):
|
|||||||
self.cluster = ClusterMetadata(**self.config)
|
self.cluster = ClusterMetadata(**self.config)
|
||||||
self._topics = set() # empty set will fetch all topic metadata
|
self._topics = set() # empty set will fetch all topic metadata
|
||||||
self._metadata_refresh_in_progress = False
|
self._metadata_refresh_in_progress = False
|
||||||
self._last_no_node_available_ms = 0
|
|
||||||
self._selector = self.config['selector']()
|
self._selector = self.config['selector']()
|
||||||
self._conns = {}
|
self._conns = {}
|
||||||
self._connecting = set()
|
self._connecting = set()
|
||||||
@@ -709,50 +708,55 @@ class KafkaClient(object):
|
|||||||
int: milliseconds until next refresh
|
int: milliseconds until next refresh
|
||||||
"""
|
"""
|
||||||
ttl = self.cluster.ttl()
|
ttl = self.cluster.ttl()
|
||||||
next_reconnect_ms = self._last_no_node_available_ms + self.cluster.refresh_backoff()
|
wait_for_in_progress_ms = self.config['request_timeout_ms'] if self._metadata_refresh_in_progress else 0
|
||||||
next_reconnect_ms = max(next_reconnect_ms - time.time() * 1000, 0)
|
metadata_timeout = max(ttl, wait_for_in_progress_ms)
|
||||||
wait_for_in_progress_ms = 9999999999 if self._metadata_refresh_in_progress else 0
|
|
||||||
timeout = max(ttl, next_reconnect_ms, wait_for_in_progress_ms)
|
|
||||||
|
|
||||||
if timeout == 0:
|
if metadata_timeout > 0:
|
||||||
node_id = self.least_loaded_node()
|
return metadata_timeout
|
||||||
if node_id is None:
|
|
||||||
log.debug("Give up sending metadata request since no node is available")
|
|
||||||
# mark the timestamp for no node available to connect
|
|
||||||
self._last_no_node_available_ms = time.time() * 1000
|
|
||||||
return timeout
|
|
||||||
|
|
||||||
if self._can_send_request(node_id):
|
# Beware that the behavior of this method and the computation of
|
||||||
topics = list(self._topics)
|
# timeouts for poll() are highly dependent on the behavior of
|
||||||
if self.cluster.need_all_topic_metadata or not topics:
|
# least_loaded_node()
|
||||||
topics = [] if self.config['api_version'] < (0, 10) else None
|
node_id = self.least_loaded_node()
|
||||||
api_version = 0 if self.config['api_version'] < (0, 10) else 1
|
if node_id is None:
|
||||||
request = MetadataRequest[api_version](topics)
|
log.debug("Give up sending metadata request since no node is available");
|
||||||
log.debug("Sending metadata request %s to node %s", request, node_id)
|
return self.config['reconnect_backoff_ms']
|
||||||
future = self.send(node_id, request)
|
|
||||||
future.add_callback(self.cluster.update_metadata)
|
|
||||||
future.add_errback(self.cluster.failed_update)
|
|
||||||
|
|
||||||
self._metadata_refresh_in_progress = True
|
if self._can_send_request(node_id):
|
||||||
def refresh_done(val_or_error):
|
topics = list(self._topics)
|
||||||
self._metadata_refresh_in_progress = False
|
if self.cluster.need_all_topic_metadata or not topics:
|
||||||
future.add_callback(refresh_done)
|
topics = [] if self.config['api_version'] < (0, 10) else None
|
||||||
future.add_errback(refresh_done)
|
api_version = 0 if self.config['api_version'] < (0, 10) else 1
|
||||||
|
request = MetadataRequest[api_version](topics)
|
||||||
|
log.debug("Sending metadata request %s to node %s", request, node_id)
|
||||||
|
future = self.send(node_id, request)
|
||||||
|
future.add_callback(self.cluster.update_metadata)
|
||||||
|
future.add_errback(self.cluster.failed_update)
|
||||||
|
|
||||||
elif self._can_connect(node_id):
|
self._metadata_refresh_in_progress = True
|
||||||
log.debug("Initializing connection to node %s for metadata request", node_id)
|
def refresh_done(val_or_error):
|
||||||
self._maybe_connect(node_id)
|
self._metadata_refresh_in_progress = False
|
||||||
# If _maybe_connect failed immediately, this node will be put into blackout and we
|
future.add_callback(refresh_done)
|
||||||
# should allow immediately retrying in case there is another candidate node. If it
|
future.add_errback(refresh_done)
|
||||||
# is still connecting, the worst case is that we end up setting a longer timeout
|
return self.config['request_timeout_ms']
|
||||||
# on the next round and then wait for the response.
|
|
||||||
else:
|
|
||||||
# connected, but can't send more OR connecting
|
|
||||||
# In either case, we just need to wait for a network event to let us know the selected
|
|
||||||
# connection might be usable again.
|
|
||||||
self._last_no_node_available_ms = time.time() * 1000
|
|
||||||
|
|
||||||
return timeout
|
# If there's any connection establishment underway, wait until it completes. This prevents
|
||||||
|
# the client from unnecessarily connecting to additional nodes while a previous connection
|
||||||
|
# attempt has not been completed.
|
||||||
|
if self._connecting:
|
||||||
|
# Strictly the timeout we should return here is "connect timeout", but as we don't
|
||||||
|
# have such application level configuration, using request timeout instead.
|
||||||
|
return self.config['request_timeout_ms']
|
||||||
|
|
||||||
|
if self._can_connect(node_id):
|
||||||
|
log.debug("Initializing connection to node %s for metadata request", node_id)
|
||||||
|
self._maybe_connect(node_id)
|
||||||
|
return self.config['reconnect_backoff_ms']
|
||||||
|
|
||||||
|
# connected but can't send more, OR connecting
|
||||||
|
# In either case we just need to wait for a network event
|
||||||
|
# to let us know the selected connection might be usable again.
|
||||||
|
return float('inf')
|
||||||
|
|
||||||
def schedule(self, task, at):
|
def schedule(self, task, at):
|
||||||
"""Schedule a new task to be executed at the given time.
|
"""Schedule a new task to be executed at the given time.
|
||||||
|
|||||||
@@ -319,7 +319,7 @@ def client(mocker):
|
|||||||
mocker.patch.object(KafkaClient, '_bootstrap')
|
mocker.patch.object(KafkaClient, '_bootstrap')
|
||||||
_poll = mocker.patch.object(KafkaClient, '_poll')
|
_poll = mocker.patch.object(KafkaClient, '_poll')
|
||||||
|
|
||||||
cli = KafkaClient(request_timeout_ms=9999999, retry_backoff_ms=2222, api_version=(0, 9))
|
cli = KafkaClient(request_timeout_ms=9999999, reconnect_backoff_ms=2222, api_version=(0, 9))
|
||||||
|
|
||||||
tasks = mocker.patch.object(cli._delayed_tasks, 'next_at')
|
tasks = mocker.patch.object(cli._delayed_tasks, 'next_at')
|
||||||
tasks.return_value = 9999999
|
tasks.return_value = 9999999
|
||||||
@@ -332,7 +332,7 @@ def client(mocker):
|
|||||||
def test_maybe_refresh_metadata_ttl(mocker, client):
|
def test_maybe_refresh_metadata_ttl(mocker, client):
|
||||||
client.cluster.ttl.return_value = 1234
|
client.cluster.ttl.return_value = 1234
|
||||||
|
|
||||||
client.poll(timeout_ms=9999999, sleep=True)
|
client.poll(timeout_ms=12345678, sleep=True)
|
||||||
client._poll.assert_called_with(1.234, sleep=True)
|
client._poll.assert_called_with(1.234, sleep=True)
|
||||||
|
|
||||||
|
|
||||||
@@ -340,17 +340,16 @@ def test_maybe_refresh_metadata_backoff(mocker, client):
|
|||||||
now = time.time()
|
now = time.time()
|
||||||
t = mocker.patch('time.time')
|
t = mocker.patch('time.time')
|
||||||
t.return_value = now
|
t.return_value = now
|
||||||
client._last_no_node_available_ms = now * 1000
|
|
||||||
|
|
||||||
client.poll(timeout_ms=9999999, sleep=True)
|
client.poll(timeout_ms=12345678, sleep=True)
|
||||||
client._poll.assert_called_with(2.222, sleep=True)
|
client._poll.assert_called_with(2.222, sleep=True)
|
||||||
|
|
||||||
|
|
||||||
def test_maybe_refresh_metadata_in_progress(mocker, client):
|
def test_maybe_refresh_metadata_in_progress(mocker, client):
|
||||||
client._metadata_refresh_in_progress = True
|
client._metadata_refresh_in_progress = True
|
||||||
|
|
||||||
client.poll(timeout_ms=9999999, sleep=True)
|
client.poll(timeout_ms=12345678, sleep=True)
|
||||||
client._poll.assert_called_with(9999.999, sleep=True)
|
client._poll.assert_called_with(9999.999, sleep=True) # request_timeout_ms
|
||||||
|
|
||||||
|
|
||||||
def test_maybe_refresh_metadata_update(mocker, client):
|
def test_maybe_refresh_metadata_update(mocker, client):
|
||||||
@@ -358,23 +357,22 @@ def test_maybe_refresh_metadata_update(mocker, client):
|
|||||||
mocker.patch.object(client, '_can_send_request', return_value=True)
|
mocker.patch.object(client, '_can_send_request', return_value=True)
|
||||||
send = mocker.patch.object(client, 'send')
|
send = mocker.patch.object(client, 'send')
|
||||||
|
|
||||||
client.poll(timeout_ms=9999999, sleep=True)
|
client.poll(timeout_ms=12345678, sleep=True)
|
||||||
client._poll.assert_called_with(0, sleep=True)
|
client._poll.assert_called_with(9999.999, sleep=True) # request_timeout_ms
|
||||||
assert client._metadata_refresh_in_progress
|
assert client._metadata_refresh_in_progress
|
||||||
request = MetadataRequest[0]([])
|
request = MetadataRequest[0]([])
|
||||||
send.assert_called_with('foobar', request)
|
send.assert_called_with('foobar', request)
|
||||||
|
|
||||||
|
|
||||||
def test_maybe_refresh_metadata_failure(mocker, client):
|
def test_maybe_refresh_metadata_cant_send(mocker, client):
|
||||||
mocker.patch.object(client, 'least_loaded_node', return_value='foobar')
|
mocker.patch.object(client, 'least_loaded_node', return_value='foobar')
|
||||||
|
|
||||||
now = time.time()
|
now = time.time()
|
||||||
t = mocker.patch('time.time')
|
t = mocker.patch('time.time')
|
||||||
t.return_value = now
|
t.return_value = now
|
||||||
|
|
||||||
client.poll(timeout_ms=9999999, sleep=True)
|
client.poll(timeout_ms=12345678, sleep=True)
|
||||||
client._poll.assert_called_with(0, sleep=True)
|
client._poll.assert_called_with(9999.999, sleep=True) # connection timeout (request timeout)
|
||||||
assert client._last_no_node_available_ms == now * 1000
|
|
||||||
assert not client._metadata_refresh_in_progress
|
assert not client._metadata_refresh_in_progress
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user