Support consumer_timeout_ms in new KafkaConsumer
This commit is contained in:
@@ -42,6 +42,7 @@ class KafkaConsumer(six.Iterator):
|
||||
'session_timeout_ms': 30000,
|
||||
'send_buffer_bytes': 128 * 1024,
|
||||
'receive_buffer_bytes': 32 * 1024,
|
||||
'consumer_timeout_ms': -1,
|
||||
'api_version': 'auto',
|
||||
'connections_max_idle_ms': 9 * 60 * 1000, # not implemented yet
|
||||
#'metric_reporters': None,
|
||||
@@ -145,6 +146,9 @@ class KafkaConsumer(six.Iterator):
|
||||
(SO_SNDBUF) to use when sending data. Default: 131072
|
||||
receive_buffer_bytes (int): The size of the TCP receive buffer
|
||||
(SO_RCVBUF) to use when reading data. Default: 32768
|
||||
consumer_timeout_ms (int): number of millisecond to throw a timeout
|
||||
exception to the consumer if no message is available for
|
||||
consumption. Default: -1 (dont throw exception)
|
||||
api_version (str): specify which kafka API version to use.
|
||||
0.9 enables full group coordination features; 0.8.2 enables
|
||||
kafka-storage offset commits; 0.8.1 enables zookeeper-storage
|
||||
@@ -183,6 +187,7 @@ class KafkaConsumer(six.Iterator):
|
||||
**self.config)
|
||||
self._closed = False
|
||||
self._iterator = None
|
||||
self._consumer_timeout = float('inf')
|
||||
|
||||
#self.metrics = None
|
||||
if topics:
|
||||
@@ -595,7 +600,7 @@ class KafkaConsumer(six.Iterator):
|
||||
self._fetcher.update_fetch_positions(partitions)
|
||||
|
||||
def _message_generator(self):
|
||||
while True:
|
||||
while time.time() < self._consumer_timeout:
|
||||
if self.config['api_version'] >= (0, 8, 2):
|
||||
self._coordinator.ensure_coordinator_known()
|
||||
|
||||
@@ -612,7 +617,11 @@ class KafkaConsumer(six.Iterator):
|
||||
# init any new fetches (won't resend pending fetches)
|
||||
self._fetcher.init_fetches()
|
||||
self._client.poll(self.config['request_timeout_ms'] / 1000.0)
|
||||
timeout = time.time() + self.config['heartbeat_interval_ms'] / 1000.0
|
||||
timeout = self._consumer_timeout
|
||||
if self.config['api_version'] >= (0, 9):
|
||||
heartbeat_timeout = time.time() + (
|
||||
self.config['heartbeat_interval_ms'] / 1000.0)
|
||||
timeout = min(heartbeat_timeout, timeout)
|
||||
for msg in self._fetcher:
|
||||
yield msg
|
||||
if time.time() > timeout:
|
||||
@@ -624,6 +633,12 @@ class KafkaConsumer(six.Iterator):
|
||||
def __next__(self):
|
||||
if not self._iterator:
|
||||
self._iterator = self._message_generator()
|
||||
|
||||
# consumer_timeout_ms can be used to stop iteration early
|
||||
if self.config['consumer_timeout_ms'] >= 0:
|
||||
self._consumer_timeout = time.time() + (
|
||||
self.config['consumer_timeout_ms'] / 1000.0)
|
||||
|
||||
try:
|
||||
return next(self._iterator)
|
||||
except StopIteration:
|
||||
|
||||
Reference in New Issue
Block a user