KAFKA-2136: support Fetch and Produce v1 (throttle_time_ms)

This commit is contained in:
Dana Powers
2016-04-05 13:04:24 -07:00
parent 331442ee0f
commit 3d16f2ff5f
5 changed files with 57 additions and 14 deletions

View File

@@ -37,6 +37,7 @@ class Fetcher(six.Iterator):
'max_partition_fetch_bytes': 1048576, 'max_partition_fetch_bytes': 1048576,
'check_crcs': True, 'check_crcs': True,
'iterator_refetch_records': 1, # undocumented -- interface may change 'iterator_refetch_records': 1, # undocumented -- interface may change
'api_version': (0, 8, 0),
} }
def __init__(self, client, subscriptions, **configs): def __init__(self, client, subscriptions, **configs):
@@ -531,7 +532,7 @@ class Fetcher(six.Iterator):
FetchRequests skipped if no leader, or node has requests in flight FetchRequests skipped if no leader, or node has requests in flight
Returns: Returns:
dict: {node_id: FetchRequest, ...} dict: {node_id: FetchRequest, ...} (version depends on api_version)
""" """
# create the fetch info as a dict of lists of partition info tuples # create the fetch info as a dict of lists of partition info tuples
# which can be passed to FetchRequest() via .items() # which can be passed to FetchRequest() via .items()
@@ -564,9 +565,10 @@ class Fetcher(six.Iterator):
log.debug("Adding fetch request for partition %s at offset %d", log.debug("Adding fetch request for partition %s at offset %d",
partition, position) partition, position)
version = 1 if self.config['api_version'] >= (0, 9) else 0
requests = {} requests = {}
for node_id, partition_data in six.iteritems(fetchable): for node_id, partition_data in six.iteritems(fetchable):
requests[node_id] = FetchRequest[0]( requests[node_id] = FetchRequest[version](
-1, # replica_id -1, # replica_id
self.config['fetch_max_wait_ms'], self.config['fetch_max_wait_ms'],
self.config['fetch_min_bytes'], self.config['fetch_min_bytes'],

View File

@@ -27,6 +27,7 @@ class Sender(threading.Thread):
'retries': 0, 'retries': 0,
'request_timeout_ms': 30000, 'request_timeout_ms': 30000,
'client_id': 'kafka-python-' + __version__, 'client_id': 'kafka-python-' + __version__,
'api_version': (0, 8, 0),
} }
def __init__(self, client, metadata, accumulator, **configs): def __init__(self, client, metadata, accumulator, **configs):
@@ -232,7 +233,7 @@ class Sender(threading.Thread):
collated: {node_id: [RecordBatch]} collated: {node_id: [RecordBatch]}
Returns: Returns:
dict: {node_id: ProduceRequest} dict: {node_id: ProduceRequest} (version depends on api_version)
""" """
requests = {} requests = {}
for node_id, batches in six.iteritems(collated): for node_id, batches in six.iteritems(collated):
@@ -245,7 +246,7 @@ class Sender(threading.Thread):
"""Create a produce request from the given record batches. """Create a produce request from the given record batches.
Returns: Returns:
ProduceRequest ProduceRequest (version depends on api_version)
""" """
produce_records_by_partition = collections.defaultdict(dict) produce_records_by_partition = collections.defaultdict(dict)
for batch in batches: for batch in batches:
@@ -256,7 +257,8 @@ class Sender(threading.Thread):
buf = batch.records.buffer() buf = batch.records.buffer()
produce_records_by_partition[topic][partition] = buf produce_records_by_partition[topic][partition] = buf
return ProduceRequest[0]( version = 1 if self.config['api_version'] >= (0, 9) else 0
return ProduceRequest[version](
required_acks=acks, required_acks=acks,
timeout=timeout, timeout=timeout,
topics=[(topic, list(partition_info.items())) topics=[(topic, list(partition_info.items()))

View File

@@ -17,6 +17,21 @@ class FetchResponse_v0(Struct):
) )
class FetchResponse_v1(Struct):
API_KEY = 1
API_VERSION = 1
SCHEMA = Schema(
('throttle_time_ms', Int32),
('topics', Array(
('topics', String('utf-8')),
('partitions', Array(
('partition', Int32),
('error_code', Int16),
('highwater_offset', Int64),
('message_set', MessageSet)))))
)
class FetchRequest_v0(Struct): class FetchRequest_v0(Struct):
API_KEY = 1 API_KEY = 1
API_VERSION = 0 API_VERSION = 0
@@ -34,5 +49,12 @@ class FetchRequest_v0(Struct):
) )
FetchRequest = [FetchRequest_v0] class FetchRequest_v1(Struct):
FetchResponse = [FetchResponse_v0] API_KEY = 1
API_VERSION = 1
RESPONSE_TYPE = FetchResponse_v1
SCHEMA = FetchRequest_v0.SCHEMA
FetchRequest = [FetchRequest_v0, FetchRequest_v1]
FetchResponse = [FetchResponse_v0, FetchResponse_v1]

View File

@@ -336,11 +336,7 @@ class KafkaProtocol(object):
payloads: list of OffsetFetchRequestPayload payloads: list of OffsetFetchRequestPayload
from_kafka: bool, default False, set True for Kafka-committed offsets from_kafka: bool, default False, set True for Kafka-committed offsets
""" """
if from_kafka: version = 1 if from_kafka else 0
version = 1
else:
version = 0
return kafka.protocol.commit.OffsetFetchRequest[version]( return kafka.protocol.commit.OffsetFetchRequest[version](
consumer_group=group, consumer_group=group,
topics=[( topics=[(

View File

@@ -16,6 +16,20 @@ class ProduceResponse_v0(Struct):
) )
class ProduceResponse_v1(Struct):
API_KEY = 0
API_VERSION = 1
SCHEMA = Schema(
('topics', Array(
('topic', String('utf-8')),
('partitions', Array(
('partition', Int32),
('error_code', Int16),
('offset', Int64))))),
('throttle_time_ms', Int32)
)
class ProduceRequest_v0(Struct): class ProduceRequest_v0(Struct):
API_KEY = 0 API_KEY = 0
API_VERSION = 0 API_VERSION = 0
@@ -31,5 +45,12 @@ class ProduceRequest_v0(Struct):
) )
ProduceRequest = [ProduceRequest_v0] class ProduceRequest_v1(Struct):
ProduceResponse = [ProduceResponse_v0] API_KEY = 0
API_VERSION = 1
RESPONSE_TYPE = ProduceResponse_v1
SCHEMA = ProduceRequest_v0.SCHEMA
ProduceRequest = [ProduceRequest_v0, ProduceRequest_v1]
ProduceResponse = [ProduceResponse_v0, ProduceResponse_v1]