add private methods _set_consumer_timeout_start() and _check_consumer_timeout()

This commit is contained in:
Dana Powers
2014-09-11 11:44:53 -07:00
parent 82b3e011fa
commit b264d8f517

View File

@@ -86,7 +86,7 @@ class KafkaConsumer(object):
deserializer_class=Event.from_bytes,
auto_commit_enable=False,
auto_commit_interval_ms=60 * 1000,
consumer_timeout_ms=-1,
consumer_timeout_ms=-1
Configuration parameters are described in more detail at
@@ -110,7 +110,6 @@ class KafkaConsumer(object):
# Currently unused
'socket_receive_buffer_bytes': 64 * 1024,
'refresh_leader_backoff_ms': 200,
'num_consumer_fetchers': 1,
'default_fetcher_backoff_ms': 1000,
'queued_max_message_chunks': 10,
@@ -213,10 +212,7 @@ class KafkaConsumer(object):
return self
def next(self):
consumer_timeout = False
if self._get_config('consumer_timeout_ms') >= 0:
consumer_timeout = time.time() + (self._get_config('consumer_timeout_ms') / 1000.0)
self._set_consumer_timeout_start()
while True:
# Check for auto-commit
@@ -230,8 +226,7 @@ class KafkaConsumer(object):
except StopIteration:
self._msg_iter = self.fetch_messages()
if consumer_timeout and time.time() > consumer_timeout:
raise ConsumerTimeout('Consumer timed out waiting to fetch messages')
self._check_consumer_timeout()
def offsets(self, group=None):
if not group:
@@ -446,6 +441,16 @@ class KafkaConsumer(object):
self.client.load_metadata_for_topics()
except KafkaUnavailableError:
logger.warning("Unable to refresh topic metadata... cluster unavailable")
self._check_consumer_timeout()
else:
logger.info("Topic metadata refreshed")
return
def _set_consumer_timeout_start(self):
self._consumer_timeout = False
if self._get_config('consumer_timeout_ms') >= 0:
self._consumer_timeout = time.time() + (self._get_config('consumer_timeout_ms') / 1000.0)
def _check_consumer_timeout(self):
if self._consumer_timeout and time.time() > self._consumer_timeout:
raise ConsumerTimeout('Consumer timed out after %d ms' % + self._get_config('consumer_timeout_ms'))