Reorganize init_fetches calls during iteration
Generally, init_fetches should not be called while the generator has pending messages; this revision adds an explicit check to the public interface (making the call a no-op in that case), and uses a private method internally to attempt to pipeline fetch requests.
This commit is contained in:
@@ -86,9 +86,21 @@ class Fetcher(six.Iterator):
|
||||
def init_fetches(self):
|
||||
"""Send FetchRequests asynchronously for all assigned partitions.
|
||||
|
||||
Note: noop if there are unconsumed records internal to the fetcher
|
||||
|
||||
Returns:
|
||||
List of Futures: each future resolves to a FetchResponse
|
||||
"""
|
||||
# We need to be careful when creating fetch records during iteration
|
||||
# so we verify that there are no records in the deque, or in an
|
||||
# iterator
|
||||
if self._records or self._iterator:
|
||||
log.debug('Skipping init_fetches because there are unconsumed'
|
||||
' records internally')
|
||||
return []
|
||||
return self._init_fetches()
|
||||
|
||||
def _init_fetches(self):
|
||||
futures = []
|
||||
for node_id, request in six.iteritems(self._create_fetch_requests()):
|
||||
if self._client.ready(node_id):
|
||||
@@ -339,6 +351,11 @@ class Fetcher(six.Iterator):
|
||||
self._raise_if_unauthorized_topics()
|
||||
self._raise_if_record_too_large()
|
||||
|
||||
# Send additional FetchRequests when the internal queue is low
|
||||
# this should enable moderate pipelining
|
||||
if len(self._records) == 1:
|
||||
self._init_fetches()
|
||||
|
||||
(fetch_offset, tp, messages) = self._records.popleft()
|
||||
|
||||
if not self._subscriptions.is_assigned(tp):
|
||||
@@ -378,11 +395,6 @@ class Fetcher(six.Iterator):
|
||||
log.debug("Ignoring fetched records for %s at offset %s",
|
||||
tp, fetch_offset)
|
||||
|
||||
# Send any additional FetchRequests that we can now
|
||||
# this will likely fetch each partition individually, rather than
|
||||
# fetch multiple partitions in bulk when they are on the same broker
|
||||
self.init_fetches()
|
||||
|
||||
def __iter__(self): # pylint: disable=non-iterator-returned
|
||||
return self
|
||||
|
||||
|
||||
@@ -626,8 +626,6 @@ class KafkaConsumer(six.Iterator):
|
||||
partitions = self._subscription.missing_fetch_positions()
|
||||
self._update_fetch_positions(partitions)
|
||||
|
||||
# init any new fetches (won't resend pending fetches)
|
||||
self._fetcher.init_fetches()
|
||||
self._client.poll(
|
||||
max(0, self._consumer_timeout - time.time()) * 1000)
|
||||
|
||||
@@ -641,6 +639,8 @@ class KafkaConsumer(six.Iterator):
|
||||
if time.time() > timeout_at:
|
||||
log.debug("internal iterator timeout - breaking for poll")
|
||||
break
|
||||
else:
|
||||
self._fetcher.init_fetches()
|
||||
|
||||
def __iter__(self): # pylint: disable=non-iterator-returned
|
||||
return self
|
||||
@@ -648,6 +648,7 @@ class KafkaConsumer(six.Iterator):
|
||||
def __next__(self):
|
||||
if not self._iterator:
|
||||
self._iterator = self._message_generator()
|
||||
self._fetcher.init_fetches()
|
||||
|
||||
# consumer_timeout_ms can be used to stop iteration early
|
||||
if self.config['consumer_timeout_ms'] >= 0:
|
||||
|
||||
Reference in New Issue
Block a user