Improve KafkaConsumer iterator loop timeouts
- Consider all delayed tasks, not just heartbeat - Include metadata update timeout - Fix second / millisecond bug calling client.poll()
This commit is contained in:
@@ -623,19 +623,19 @@ class KafkaConsumer(six.Iterator):
|
||||
# fetch positions if we have partitions we're subscribed to that we
|
||||
# don't know the offset for
|
||||
if not self._subscription.has_all_fetch_positions():
|
||||
self._update_fetch_positions(self._subscription.missing_fetch_positions())
|
||||
partitions = self._subscription.missing_fetch_positions()
|
||||
self._update_fetch_positions(partitions)
|
||||
|
||||
# init any new fetches (won't resend pending fetches)
|
||||
self._fetcher.init_fetches()
|
||||
self._client.poll(self.config['request_timeout_ms'] / 1000.0)
|
||||
timeout = self._consumer_timeout
|
||||
if self.config['api_version'] >= (0, 9):
|
||||
heartbeat_timeout = time.time() + (
|
||||
self.config['heartbeat_interval_ms'] / 1000.0)
|
||||
timeout = min(heartbeat_timeout, timeout)
|
||||
self._client.poll()
|
||||
|
||||
timeout_at = min(self._consumer_timeout,
|
||||
self._client._delayed_tasks.next_at(),
|
||||
self._client.cluster.ttl() / 1000.0 + time.time())
|
||||
for msg in self._fetcher:
|
||||
yield msg
|
||||
if time.time() > timeout:
|
||||
if time.time() > timeout_at:
|
||||
break
|
||||
|
||||
def __iter__(self): # pylint: disable=non-iterator-returned
|
||||
|
||||
Reference in New Issue
Block a user