Merge pull request #527 from dpkp/iterator_timeout

Fix internal timeout / sleep handling in consumer iterator
This commit is contained in:
Dana Powers
2016-01-30 10:43:43 -08:00

View File

@@ -635,26 +635,22 @@ class KafkaConsumer(six.Iterator):
partitions = self._subscription.missing_fetch_positions()
self._update_fetch_positions(partitions)
poll_ms = 1000 * (self._consumer_timeout - time.time())
if not self._fetcher.in_flight_fetches():
poll_ms = 0
self._client.poll(poll_ms)
# We need to make sure we at least keep up with scheduled tasks,
# like heartbeats, auto-commits, and metadata refreshes
timeout_at = min(self._consumer_timeout,
self._client._delayed_tasks.next_at() + time.time(),
self._client.cluster.ttl() / 1000.0 + time.time())
timeout_at = self._next_timeout()
if self.config['api_version'] >= (0, 9):
if self.config['group_id'] is not None and not self.assignment():
sleep_time = time.time() - timeout_at
log.debug('No partitions assigned; sleeping for %s', sleep_time)
time.sleep(sleep_time)
continue
poll_ms = 1000 * (time.time() - self._consumer_timeout)
# Dont bother blocking if there are no fetches
if not self._fetcher.in_flight_fetches():
poll_ms = 0
self._client.poll(poll_ms)
sleep_time = max(timeout_at - time.time(), 0)
if sleep_time > 0 and not self._client.in_flight_request_count():
log.debug('No partitions assigned; sleeping for %s', sleep_time)
time.sleep(sleep_time)
continue
if time.time() > timeout_at:
continue
@@ -672,6 +668,11 @@ class KafkaConsumer(six.Iterator):
else:
self._fetcher.init_fetches()
def _next_timeout(self):
return min(self._consumer_timeout,
self._client._delayed_tasks.next_at() + time.time(),
self._client.cluster.ttl() / 1000.0 + time.time())
def __iter__(self): # pylint: disable=non-iterator-returned
return self