Avoid busy poll during metadata refresh failure with retry_backoff_ms (#733)
@@ -126,6 +126,7 @@ class KafkaClient(object):
         self.cluster = ClusterMetadata(**self.config)
         self._topics = set()  # empty set will fetch all topic metadata
         self._metadata_refresh_in_progress = False
+        self._last_no_node_available_ms = 0
         self._selector = selectors.DefaultSelector()
         self._conns = {}
         self._connecting = set()
@@ -600,38 +601,50 @@ class KafkaClient(object):
             int: milliseconds until next refresh
         """
         ttl = self.cluster.ttl()
-        if ttl > 0:
-            return ttl
-
-        if self._metadata_refresh_in_progress:
-            return 9999999999
-
-        node_id = self.least_loaded_node()
-        if node_id is None:
-            return 0
-
-        topics = list(self._topics)
-        if self.cluster.need_all_topic_metadata:
-            topics = []
-
-        if self._can_send_request(node_id):
-            request = MetadataRequest[0](topics)
-            log.debug("Sending metadata request %s to node %s", request, node_id)
-            future = self.send(node_id, request)
-            future.add_callback(self.cluster.update_metadata)
-            future.add_errback(self.cluster.failed_update)
-
-            self._metadata_refresh_in_progress = True
-            def refresh_done(val_or_error):
-                self._metadata_refresh_in_progress = False
-            future.add_callback(refresh_done)
-            future.add_errback(refresh_done)
-
-        elif self._can_connect(node_id):
-            log.debug("Initializing connection to node %s for metadata request", node_id)
-            self._maybe_connect(node_id)
-
-        return 0
+        next_reconnect_ms = self._last_no_node_available_ms + self.cluster.refresh_backoff()
+        next_reconnect_ms = max(next_reconnect_ms - time.time() * 1000, 0)
+        wait_for_in_progress_ms = 9999999999 if self._metadata_refresh_in_progress else 0
+        timeout = max(ttl, next_reconnect_ms, wait_for_in_progress_ms)
+
+        if timeout == 0:
+            node_id = self.least_loaded_node()
+            if node_id is None:
+                log.debug("Give up sending metadata request since no node is available")
+                # mark the timestamp for no node available to connect
+                self._last_no_node_available_ms = time.time() * 1000
+                return timeout
+
+            topics = list(self._topics)
+            if self.cluster.need_all_topic_metadata:
+                topics = []
+
+            if self._can_send_request(node_id):
+                request = MetadataRequest[0](topics)
+                log.debug("Sending metadata request %s to node %s", request, node_id)
+                future = self.send(node_id, request)
+                future.add_callback(self.cluster.update_metadata)
+                future.add_errback(self.cluster.failed_update)
+                self._metadata_refresh_in_progress = True
+                def refresh_done(val_or_error):
+                    self._metadata_refresh_in_progress = False
+                future.add_callback(refresh_done)
+                future.add_errback(refresh_done)
+
+            elif self._can_connect(node_id):
+                log.debug("Initializing connection to node %s for metadata request", node_id)
+                self._maybe_connect(node_id)
+                # If initiateConnect failed immediately, this node will be put into blackout and we
+                # should allow immediately retrying in case there is another candidate node. If it
+                # is still connecting, the worst case is that we end up setting a longer timeout
+                # on the next round and then wait for the response.
+            else:
+                # connected, but can't send more OR connecting
+                # In either case, we just need to wait for a network event to let us know the selected
+                # connection might be usable again.
+                self._last_no_node_available_ms = time.time() * 1000
+
+        return timeout
 
     def schedule(self, task, at):
         """Schedule a new task to be executed at the given time.
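Note (not part of the diff): the fix replaces the early `return ttl` / `return 0` paths with a single timeout computed as the max of the metadata TTL, the time remaining in the no-node retry backoff, and a large sentinel while a refresh is already in flight, so a failed refresh no longer makes poll() spin with a zero timeout. A minimal sketch of that arithmetic, using a hypothetical standalone helper (names and values are illustrative, not from the library):

    import time

    def next_refresh_wait_ms(ttl_ms, last_no_node_available_ms, retry_backoff_ms,
                             refresh_in_progress, now_ms=None):
        """Sketch of the new timeout logic in _maybe_refresh_metadata (assumed names)."""
        if now_ms is None:
            now_ms = time.time() * 1000
        # milliseconds until the no-node backoff expires
        next_reconnect_ms = max(last_no_node_available_ms + retry_backoff_ms - now_ms, 0)
        # effectively "forever" while a metadata request is already outstanding
        wait_for_in_progress_ms = 9999999999 if refresh_in_progress else 0
        return max(ttl_ms, next_reconnect_ms, wait_for_in_progress_ms)

    # Metadata is stale (ttl 0) and no node was available just now: with
    # retry_backoff_ms=2222 the caller waits ~2222ms instead of looping on 0.
    now = time.time() * 1000
    print(next_refresh_wait_ms(0, now, 2222, False, now_ms=now))  # -> 2222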
@@ -131,6 +131,10 @@ class ClusterMetadata(object):
 
         return max(ttl, next_retry, 0)
 
+    def refresh_backoff(self):
+        """Return milliseconds to wait before attempting to retry after failure"""
+        return self.config['retry_backoff_ms']
+
     def request_update(self):
         """Flags metadata for update, return Future()
 
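Note (not part of the diff): `refresh_backoff()` simply exposes the existing `retry_backoff_ms` setting through ClusterMetadata, which is the value the client-side backoff above is keyed on. A small extra test sketch in the same pytest-mock style as the tests below, not included in the commit, assuming `KafkaClient` is imported from `kafka.client_async`:

    from kafka.client_async import KafkaClient

    def test_refresh_backoff_exposes_retry_backoff_ms(mocker):
        # patch out bootstrapping so no broker connection is attempted
        mocker.patch.object(KafkaClient, '_bootstrap')
        cli = KafkaClient(retry_backoff_ms=2222)
        # ClusterMetadata is built from the client config, so the backoff is the configured value
        assert cli.cluster.refresh_backoff() == 2222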
@@ -293,8 +293,106 @@ def test_set_topics():
     pass
 
 
-def test_maybe_refresh_metadata():
-    pass
+def test_maybe_refresh_metadata_ttl(mocker):
+    mocker.patch.object(KafkaClient, '_bootstrap')
+    _poll = mocker.patch.object(KafkaClient, '_poll')
+
+    cli = KafkaClient(request_timeout_ms=9999999, retry_backoff_ms=2222)
+
+    tasks = mocker.patch.object(cli._delayed_tasks, 'next_at')
+    tasks.return_value = 9999999
+
+    ttl = mocker.patch.object(cli.cluster, 'ttl')
+    ttl.return_value = 1234
+
+    cli.poll(timeout_ms=9999999, sleep=True)
+    _poll.assert_called_with(1.234, sleep=True)
+
+
+def test_maybe_refresh_metadata_backoff(mocker):
+    mocker.patch.object(KafkaClient, '_bootstrap')
+    _poll = mocker.patch.object(KafkaClient, '_poll')
+
+    cli = KafkaClient(request_timeout_ms=9999999, retry_backoff_ms=2222)
+
+    tasks = mocker.patch.object(cli._delayed_tasks, 'next_at')
+    tasks.return_value = 9999999
+
+    ttl = mocker.patch.object(cli.cluster, 'ttl')
+    ttl.return_value = 0
+
+    now = time.time()
+    t = mocker.patch('time.time')
+    t.return_value = now
+    cli._last_no_node_available_ms = now * 1000
+
+    cli.poll(timeout_ms=9999999, sleep=True)
+    _poll.assert_called_with(2.222, sleep=True)
+
+
+def test_maybe_refresh_metadata_in_progress(mocker):
+    mocker.patch.object(KafkaClient, '_bootstrap')
+    _poll = mocker.patch.object(KafkaClient, '_poll')
+
+    cli = KafkaClient(request_timeout_ms=9999999, retry_backoff_ms=2222)
+
+    tasks = mocker.patch.object(cli._delayed_tasks, 'next_at')
+    tasks.return_value = 9999999
+
+    ttl = mocker.patch.object(cli.cluster, 'ttl')
+    ttl.return_value = 0
+
+    cli._metadata_refresh_in_progress = True
+
+    cli.poll(timeout_ms=9999999, sleep=True)
+    _poll.assert_called_with(9999.999, sleep=True)
+
+
+def test_maybe_refresh_metadata_update(mocker):
+    mocker.patch.object(KafkaClient, '_bootstrap')
+    _poll = mocker.patch.object(KafkaClient, '_poll')
+
+    cli = KafkaClient(request_timeout_ms=9999999, retry_backoff_ms=2222)
+
+    tasks = mocker.patch.object(cli._delayed_tasks, 'next_at')
+    tasks.return_value = 9999999
+
+    ttl = mocker.patch.object(cli.cluster, 'ttl')
+    ttl.return_value = 0
+
+    mocker.patch.object(cli, 'least_loaded_node', return_value='foobar')
+    mocker.patch.object(cli, '_can_send_request', return_value=True)
+    send = mocker.patch.object(cli, 'send')
+
+    cli.poll(timeout_ms=9999999, sleep=True)
+    _poll.assert_called_with(0, sleep=True)
+    assert cli._metadata_refresh_in_progress
+    request = MetadataRequest[0]([])
+    send.assert_called_with('foobar', request)
+
+
+def test_maybe_refresh_metadata_failure(mocker):
+    mocker.patch.object(KafkaClient, '_bootstrap')
+    _poll = mocker.patch.object(KafkaClient, '_poll')
+
+    cli = KafkaClient(request_timeout_ms=9999999, retry_backoff_ms=2222)
+
+    tasks = mocker.patch.object(cli._delayed_tasks, 'next_at')
+    tasks.return_value = 9999999
+
+    ttl = mocker.patch.object(cli.cluster, 'ttl')
+    ttl.return_value = 0
+
+    mocker.patch.object(cli, 'least_loaded_node', return_value='foobar')
+
+    now = time.time()
+    t = mocker.patch('time.time')
+    t.return_value = now
+
+    cli.poll(timeout_ms=9999999, sleep=True)
+    _poll.assert_called_with(0, sleep=True)
+    assert cli._last_no_node_available_ms == now * 1000
+    assert not cli._metadata_refresh_in_progress
 
 
 def test_schedule():
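Note (not part of the diff): the assertions above rely on poll() converting its millisecond timeout to seconds before handing it to _poll(). A hypothetical helper, inferred from those assertions rather than copied from client_async.py, reproduces the expected values:

    # Sketch (assumed): poll() takes the smallest candidate timeout in ms and
    # passes it to _poll() in seconds.
    def expected_poll_seconds(timeout_ms, metadata_timeout_ms, request_timeout_ms=9999999):
        return min(timeout_ms, metadata_timeout_ms, request_timeout_ms) / 1000.0

    assert expected_poll_seconds(9999999, 1234) == 1.234           # metadata ttl case
    assert expected_poll_seconds(9999999, 2222) == 2.222           # retry backoff case
    assert expected_poll_seconds(9999999, 9999999999) == 9999.999  # refresh already in progress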