#761 Follow-up: use api_version tuples in BrokerConnection.check_version
This commit is contained in:
@@ -547,7 +547,6 @@ class BrokerConnection(object):
|
|||||||
|
|
||||||
Returns: version tuple, i.e. (0, 10), (0, 9), (0, 8, 2), ...
|
Returns: version tuple, i.e. (0, 10), (0, 9), (0, 8, 2), ...
|
||||||
"""
|
"""
|
||||||
|
|
||||||
# Monkeypatch the connection request timeout
|
# Monkeypatch the connection request timeout
|
||||||
# Generally this timeout should not get triggered
|
# Generally this timeout should not get triggered
|
||||||
# but in case it does, we want it to be reasonably short
|
# but in case it does, we want it to be reasonably short
|
||||||
@@ -575,11 +574,11 @@ class BrokerConnection(object):
|
|||||||
log.addFilter(log_filter)
|
log.addFilter(log_filter)
|
||||||
|
|
||||||
test_cases = [
|
test_cases = [
|
||||||
('0.10', ApiVersionRequest[0]()),
|
((0, 10), ApiVersionRequest[0]()),
|
||||||
('0.9', ListGroupsRequest[0]()),
|
((0, 9), ListGroupsRequest[0]()),
|
||||||
('0.8.2', GroupCoordinatorRequest[0]('kafka-python-default-group')),
|
((0, 8, 2), GroupCoordinatorRequest[0]('kafka-python-default-group')),
|
||||||
('0.8.1', OffsetFetchRequest[0]('kafka-python-default-group', [])),
|
((0, 8, 1), OffsetFetchRequest[0]('kafka-python-default-group', [])),
|
||||||
('0.8.0', MetadataRequest[0]([])),
|
((0, 8, 0), MetadataRequest[0]([])),
|
||||||
]
|
]
|
||||||
|
|
||||||
def connect():
|
def connect():
|
||||||
@@ -615,9 +614,9 @@ class BrokerConnection(object):
|
|||||||
self._sock.setblocking(False)
|
self._sock.setblocking(False)
|
||||||
|
|
||||||
if f.succeeded():
|
if f.succeeded():
|
||||||
log.info('Broker version identifed as %s', version)
|
log.info('Broker version identifed as %s', '.'.join(map(str, version)))
|
||||||
log.info("Set configuration api_version='%s' to skip auto"
|
log.info('Set configuration api_version=%s to skip auto'
|
||||||
" check_version requests on startup", version)
|
' check_version requests on startup', version)
|
||||||
break
|
break
|
||||||
|
|
||||||
# Only enable strict checking to verify that we understand failure
|
# Only enable strict checking to verify that we understand failure
|
||||||
@@ -634,7 +633,7 @@ class BrokerConnection(object):
|
|||||||
# requests (bug...). In this case we expect to see a correlation
|
# requests (bug...). In this case we expect to see a correlation
|
||||||
# id mismatch
|
# id mismatch
|
||||||
elif (isinstance(f.exception, Errors.CorrelationIdError) and
|
elif (isinstance(f.exception, Errors.CorrelationIdError) and
|
||||||
version == '0.10'):
|
version == (0, 10)):
|
||||||
pass
|
pass
|
||||||
elif six.PY2:
|
elif six.PY2:
|
||||||
assert isinstance(f.exception.args[0], socket.error)
|
assert isinstance(f.exception.args[0], socket.error)
|
||||||
@@ -648,7 +647,7 @@ class BrokerConnection(object):
|
|||||||
|
|
||||||
log.removeFilter(log_filter)
|
log.removeFilter(log_filter)
|
||||||
self.config['request_timeout_ms'] = stashed_request_timeout_ms
|
self.config['request_timeout_ms'] = stashed_request_timeout_ms
|
||||||
return tuple(map(int, version.split('.')))
|
return version
|
||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
return "<BrokerConnection host=%s/%s port=%d>" % (self.hostname, self.host,
|
return "<BrokerConnection host=%s/%s port=%d>" % (self.hostname, self.host,
|
||||||
|
Reference in New Issue
Block a user