Support simple message iteration in Fetcher and new KafkaConsumer
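
With this change both Fetcher and the new KafkaConsumer can be consumed as plain Python iterators. A minimal usage sketch, not part of this diff; the topic name and bootstrap address are illustrative placeholders:

    from kafka import KafkaConsumer

    # 'my-topic' and localhost:9092 are placeholders; adjust for your deployment.
    consumer = KafkaConsumer('my-topic', bootstrap_servers='localhost:9092')

    # Each item is a ConsumerRecord(topic, partition, offset, key, value),
    # as yielded by Fetcher.__iter__ in the first hunk below.
    for record in consumer:
        print(record.topic, record.partition, record.offset,
              record.key, record.value)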
@@ -316,6 +316,55 @@ class Fetcher(object):
                                tp, fetch_offset)
        return dict(drained)

    def __iter__(self):
        """Iterate over fetched_records"""
        if self._subscriptions.needs_partition_assignment:
            raise StopIteration('Subscription needs partition assignment')

        self._raise_if_offset_out_of_range()
        self._raise_if_unauthorized_topics()
        self._raise_if_record_too_large()

        while self._records:
            (fetch_offset, tp, messages) = self._records.popleft()

            if not self._subscriptions.is_assigned(tp):
                # this can happen when a rebalance happened before
                # fetched records are returned
                log.warning("Not returning fetched records for partition %s"
                            " since it is no longer assigned", tp)
                continue

            # note that the consumed position should always be available
            # as long as the partition is still assigned
            consumed = self._subscriptions.assignment[tp].consumed
            if not self._subscriptions.is_fetchable(tp):
                # this can happen when partition consumption is paused before
                # fetched records are returned
                log.warning("Not returning fetched records for assigned partition"
                            " %s since it is no longer fetchable", tp)

                # we also need to reset the fetch positions to pretend we did
                # not fetch this partition in the previous request at all
                self._subscriptions.assignment[tp].fetched = consumed

            elif fetch_offset == consumed:
                # TODO: handle compressed messages
                for offset, size, msg in messages:
                    if msg.attributes:
                        raise Errors.KafkaError('Compressed messages not supported yet')
                    elif self.config['check_crcs'] and not msg.validate_crc():
                        raise Errors.InvalidMessageError(msg)

                    self._subscriptions.assignment[tp].consumed = offset + 1
                    key, value = self._deserialize(msg)
                    yield ConsumerRecord(tp.topic, tp.partition, offset, key, value)
            else:
                # these records aren't next in line based on the last consumed
                # position; ignore them, they must be from an obsolete request
                log.warning("Ignoring fetched records for %s at offset %s",
                            tp, fetch_offset)

    def _deserialize(self, msg):
        if self.config['key_deserializer']:
            key = self.config['key_deserializer'](msg.key)  # pylint: disable-msg=not-callable
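
Since Fetcher.__iter__ runs every message through _deserialize and honors the check_crcs setting, deserializers are configured as plain callables over raw bytes. A hedged configuration sketch; value_deserializer is assumed by symmetry with the key_deserializer config key visible above:

    import json

    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        'my-topic',                          # placeholder topic
        bootstrap_servers='localhost:9092',  # placeholder broker
        # callables applied to raw key/value bytes before records are yielded
        key_deserializer=lambda k: k.decode('utf-8') if k is not None else None,
        value_deserializer=lambda v: json.loads(v.decode('utf-8')),
        check_crcs=True,  # enables the per-message validate_crc() check above
    )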
@@ -4,6 +4,8 @@ import copy
import logging
import time

import six

import kafka.common as Errors

from kafka.client_async import KafkaClient
@@ -565,3 +567,23 @@ class KafkaConsumer(object):
        # then do any offset lookups in case some positions are not known
        self._fetcher.update_fetch_positions(partitions)

    def __iter__(self):
        while True:
            # records = self._poll_once(self.config['request_timeout_ms'])
            self._coordinator.ensure_coordinator_known()

            # ensure we have partitions assigned if we expect to
            if self._subscription.partitions_auto_assigned():
                self._coordinator.ensure_active_group()

            # fetch positions if we have partitions we're subscribed to that we
            # don't know the offset for
            if not self._subscription.has_all_fetch_positions():
                self._update_fetch_positions(self._subscription.missing_fetch_positions())

            # init any new fetches (won't resend pending fetches)
            self._fetcher.init_fetches()
            self._client.poll(self.config['request_timeout_ms'] / 1000.0)
            for msg in self._fetcher:
                yield msg
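
Note the design: iteration itself drives the client event loop. Each pass through the while-loop re-checks coordinator liveness and group membership before initiating fetches, so a rebalance that occurs mid-iteration is absorbed transparently, and Fetcher.__iter__ (first hunk) quietly drops any buffered records for partitions that are no longer assigned or fetchable.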