Add private methods _does_auto_commit_ms and _does_auto_commit_messages

This commit is contained in:
Dana Powers
2014-09-21 19:34:48 -07:00
parent 1a06f79cb8
commit 08df93b3f9

View File

@@ -494,7 +494,7 @@ class KafkaConsumer(object):
self._offsets.task_done[topic_partition] = offset self._offsets.task_done[topic_partition] = offset
# Check for auto-commit # Check for auto-commit
if self._config['auto_commit_enable']: if self._does_auto_commit_messages():
self._incr_auto_commit_message_count() self._incr_auto_commit_message_count()
if self._should_auto_commit(): if self._should_auto_commit():
@@ -672,14 +672,11 @@ class KafkaConsumer(object):
# #
def _should_auto_commit(self): def _should_auto_commit(self):
if not self._config['auto_commit_enable']: if self._does_auto_commit_ms():
return False
if self._config['auto_commit_interval_ms'] > 0:
if time.time() >= self._next_commit_time: if time.time() >= self._next_commit_time:
return True return True
if self._config['auto_commit_interval_messages'] > 0: if self._does_auto_commit_messages():
if self._uncommitted_message_count >= self._config['auto_commit_interval_messages']: if self._uncommitted_message_count >= self._config['auto_commit_interval_messages']:
return True return True
@@ -688,12 +685,30 @@ class KafkaConsumer(object):
def _reset_auto_commit(self): def _reset_auto_commit(self):
self._uncommitted_message_count = 0 self._uncommitted_message_count = 0
self._next_commit_time = None self._next_commit_time = None
if self._config['auto_commit_interval_ms'] > 0: if self._does_auto_commit_ms():
self._next_commit_time = time.time() + (self._config['auto_commit_interval_ms'] / 1000.0) self._next_commit_time = time.time() + (self._config['auto_commit_interval_ms'] / 1000.0)
def _incr_auto_commit_message_count(self, n=1): def _incr_auto_commit_message_count(self, n=1):
self._uncommitted_message_count += n self._uncommitted_message_count += n
def _does_auto_commit_ms(self):
if not self._config['auto_commit_enable']:
return False
conf = self._config['auto_commit_interval_ms']
if conf is not None and conf > 0:
return True
return False
def _does_auto_commit_messages(self):
if not self._config['auto_commit_enable']:
return False
conf = self._config['auto_commit_interval_messages']
if conf is not None and conf > 0:
return True
return False
# #
# Message iterator private methods # Message iterator private methods
# #