Add protocol support for ApiVersionRequest; identify 0.10 brokers in check_version

This commit is contained in:
Dana Powers
2016-05-02 20:59:05 -07:00
parent 874f4874a7
commit a3b7dca1b0
4 changed files with 26 additions and 3 deletions

View File

@@ -520,7 +520,7 @@ class BrokerConnection(object):
# vanilla MetadataRequest. If the server did not recognize the first # vanilla MetadataRequest. If the server did not recognize the first
# request, both will be failed with a ConnectionError that wraps # request, both will be failed with a ConnectionError that wraps
# socket.error (32, 54, or 104) # socket.error (32, 54, or 104)
from .protocol.admin import ListGroupsRequest from .protocol.admin import ApiVersionRequest, ListGroupsRequest
from .protocol.commit import OffsetFetchRequest, GroupCoordinatorRequest from .protocol.commit import OffsetFetchRequest, GroupCoordinatorRequest
from .protocol.metadata import MetadataRequest from .protocol.metadata import MetadataRequest
@@ -536,6 +536,7 @@ class BrokerConnection(object):
log.addFilter(log_filter) log.addFilter(log_filter)
test_cases = [ test_cases = [
('0.10', ApiVersionRequest[0]()),
('0.9', ListGroupsRequest[0]()), ('0.9', ListGroupsRequest[0]()),
('0.8.2', GroupCoordinatorRequest[0]('kafka-python-default-group')), ('0.8.2', GroupCoordinatorRequest[0]('kafka-python-default-group')),
('0.8.1', OffsetFetchRequest[0]('kafka-python-default-group', [])), ('0.8.1', OffsetFetchRequest[0]('kafka-python-default-group', [])),

View File

@@ -225,7 +225,7 @@ class KafkaConsumer(six.Iterator):
# Check Broker Version if not set explicitly # Check Broker Version if not set explicitly
if self.config['api_version'] == 'auto': if self.config['api_version'] == 'auto':
self.config['api_version'] = self._client.check_version() self.config['api_version'] = self._client.check_version()
assert self.config['api_version'] in ('0.9', '0.8.2', '0.8.1', '0.8.0'), 'Unrecognized api version' assert self.config['api_version'] in ('0.10', '0.9', '0.8.2', '0.8.1', '0.8.0'), 'Unrecognized api version'
# Convert api_version config to tuple for easy comparisons # Convert api_version config to tuple for easy comparisons
self.config['api_version'] = tuple( self.config['api_version'] = tuple(

View File

@@ -268,7 +268,7 @@ class KafkaProducer(object):
# Check Broker Version if not set explicitly # Check Broker Version if not set explicitly
if self.config['api_version'] == 'auto': if self.config['api_version'] == 'auto':
self.config['api_version'] = client.check_version() self.config['api_version'] = client.check_version()
assert self.config['api_version'] in ('0.9', '0.8.2', '0.8.1', '0.8.0') assert self.config['api_version'] in ('0.10', '0.9', '0.8.2', '0.8.1', '0.8.0')
# Convert api_version config to tuple for easy comparisons # Convert api_version config to tuple for easy comparisons
self.config['api_version'] = tuple( self.config['api_version'] = tuple(

View File

@@ -2,6 +2,28 @@ from .struct import Struct
from .types import Array, Bytes, Int16, Schema, String from .types import Array, Bytes, Int16, Schema, String
class ApiVersionResponse_v0(Struct):
API_KEY = 18
API_VERSION = 0
SCHEMA = Schema(
('error_code', Int16),
('api_versions', Array(
('api_key', Int16),
('min_version', Int16),
('max_version', Int16))))
class ApiVersionRequest_v0(Struct):
API_KEY = 18
API_VERSION = 0
RESPONSE_TYPE = ApiVersionResponse_v0
SCHEMA = Schema()
ApiVersionRequest = [ApiVersionRequest_v0]
ApiVersionResponse = [ApiVersionResponse_v0]
class ListGroupsResponse_v0(Struct): class ListGroupsResponse_v0(Struct):
API_KEY = 16 API_KEY = 16
API_VERSION = 0 API_VERSION = 0