Batched message methods now return dict of lists
This commit is contained in:
@@ -267,14 +267,14 @@ class Fetcher(six.Iterator):
|
|||||||
AssertionError: if used with iterator (incompatible)
|
AssertionError: if used with iterator (incompatible)
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
dict: {TopicPartition: deque([messages])}
|
dict: {TopicPartition: [messages]}
|
||||||
"""
|
"""
|
||||||
assert self._iterator is None, (
|
assert self._iterator is None, (
|
||||||
'fetched_records is incompatible with message iterator')
|
'fetched_records is incompatible with message iterator')
|
||||||
if self._subscriptions.needs_partition_assignment:
|
if self._subscriptions.needs_partition_assignment:
|
||||||
return {}
|
return {}
|
||||||
|
|
||||||
drained = collections.defaultdict(collections.deque)
|
drained = collections.defaultdict(list)
|
||||||
self._raise_if_offset_out_of_range()
|
self._raise_if_offset_out_of_range()
|
||||||
self._raise_if_unauthorized_topics()
|
self._raise_if_unauthorized_topics()
|
||||||
self._raise_if_record_too_large()
|
self._raise_if_record_too_large()
|
||||||
|
|||||||
@@ -343,7 +343,7 @@ class KafkaConsumer(six.Iterator):
|
|||||||
records that are available now. Must not be negative. Default: 0
|
records that are available now. Must not be negative. Default: 0
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
dict: topic to deque of records since the last fetch for the
|
dict: topic to list of records since the last fetch for the
|
||||||
subscribed list of topics and partitions
|
subscribed list of topics and partitions
|
||||||
"""
|
"""
|
||||||
assert timeout_ms >= 0, 'Timeout must not be negative'
|
assert timeout_ms >= 0, 'Timeout must not be negative'
|
||||||
@@ -377,7 +377,7 @@ class KafkaConsumer(six.Iterator):
|
|||||||
timeout_ms (int): The maximum time in milliseconds to block
|
timeout_ms (int): The maximum time in milliseconds to block
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
dict: map of topic to deque of records (may be empty)
|
dict: map of topic to list of records (may be empty)
|
||||||
"""
|
"""
|
||||||
# TODO: Sub-requests should take into account the poll timeout (KAFKA-1894)
|
# TODO: Sub-requests should take into account the poll timeout (KAFKA-1894)
|
||||||
self._coordinator.ensure_coordinator_known()
|
self._coordinator.ensure_coordinator_known()
|
||||||
|
|||||||
Reference in New Issue
Block a user