Monkeypatch max_in_flight_requests_per_connection when checking broker version (#834)
This commit is contained in:
@@ -738,11 +738,15 @@ class BrokerConnection(object):
|
||||
|
||||
Returns: version tuple, i.e. (0, 10), (0, 9), (0, 8, 2), ...
|
||||
"""
|
||||
# Monkeypatch the connection request timeout
|
||||
# Generally this timeout should not get triggered
|
||||
# but in case it does, we want it to be reasonably short
|
||||
stashed_request_timeout_ms = self.config['request_timeout_ms']
|
||||
self.config['request_timeout_ms'] = timeout * 1000
|
||||
# Monkeypatch some connection configurations to avoid timeouts
|
||||
override_config = {
|
||||
'request_timeout_ms': timeout * 1000,
|
||||
'max_in_flight_requests_per_connection': 5
|
||||
}
|
||||
stashed = {}
|
||||
for key in override_config:
|
||||
stashed[key] = self.config[key]
|
||||
self.config[key] = override_config[key]
|
||||
|
||||
# kafka kills the connection when it doesnt recognize an API request
|
||||
# so we can send a test request and then follow immediately with a
|
||||
@@ -837,7 +841,8 @@ class BrokerConnection(object):
|
||||
raise Errors.UnrecognizedBrokerVersion()
|
||||
|
||||
log.removeFilter(log_filter)
|
||||
self.config['request_timeout_ms'] = stashed_request_timeout_ms
|
||||
for key in stashed:
|
||||
self.config[key] = stashed[key]
|
||||
return version
|
||||
|
||||
def __repr__(self):
|
||||
|
Reference in New Issue
Block a user