Fix task_done checks when no previous commit exists; add test
@@ -485,12 +485,11 @@ class KafkaConsumer(object):
                            offset, prev_done)
 
         # Warn on smaller offsets than previous commit
-        # "commit" offsets are actually the offset of the next # message to fetch.
-        # so task_done should be compared with (commit - 1)
-        prev_done = (self._offsets.commit[topic_partition] - 1)
-        if prev_done is not None and (offset <= prev_done):
-            logger.warning('Marking task_done on a previously committed offset?: %d <= %d',
-                           offset, prev_done)
+        # "commit" offsets are actually the offset of the next message to fetch.
+        prev_commit = self._offsets.commit[topic_partition]
+        if prev_commit is not None and ((offset + 1) <= prev_commit):
+            logger.warning('Marking task_done on a previously committed offset?: %d (+1) <= %d',
+                           offset, prev_commit)
 
         self._offsets.task_done[topic_partition] = offset
 
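Why the check was reordered, in brief: when a partition has no previous commit, self._offsets.commit[topic_partition] is None, so the old code raised a TypeError on the "- 1" arithmetic before its "is not None" guard could run. The new code reads the raw commit offset first, guards on None, and only then compares it against offset + 1. A minimal standalone sketch of the two orderings (not library code; prev and offset are made-up stand-ins for the stored commit offset and the task_done offset):

    # Stand-in for self._offsets.commit[topic_partition] on a partition
    # that has never been committed.
    prev = None
    offset = 5

    # Old ordering: arithmetic before the None guard, raises immediately
    # when there is no previous commit.
    try:
        prev_done = prev - 1          # TypeError: can't subtract from None
        old_warns = prev_done is not None and offset <= prev_done
    except TypeError:
        old_warns = 'TypeError before the guard could run'

    # New ordering: guard on None first, then compare offset + 1 against
    # the raw commit offset.
    prev_commit = prev
    new_warns = prev_commit is not None and (offset + 1) <= prev_commit

    print(old_warns)   # 'TypeError before the guard could run'
    print(new_warns)   # False: no warning and no crash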
@@ -353,3 +353,46 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
                     messages.add((msg.partition, msg.offset))
         self.assertEqual(len(messages), 5)
         self.assertGreaterEqual(t.interval, 1)
+
+    @kafka_versions("0.8.1", "0.8.1.1")
+    def test_kafka_consumer__offset_commit_resume(self):
+        GROUP_ID = random_string(10)
+
+        self.send_messages(0, range(0, 100))
+        self.send_messages(1, range(100, 200))
+
+        # Start a consumer
+        consumer1 = self.kafka_consumer(
+            group_id = GROUP_ID,
+            auto_commit_enable = True,
+            auto_commit_interval_ms = None,
+            auto_commit_interval_messages = 20,
+            auto_offset_reset='smallest',
+        )
+
+        # Grab the first 195 messages
+        output_msgs1 = []
+        for _ in xrange(195):
+            m = consumer1.next()
+            output_msgs1.append(m)
+            consumer1.task_done(m)
+        self.assert_message_count(output_msgs1, 195)
+
+        # The total offset across both partitions should be at 180
+        consumer2 = self.kafka_consumer(
+            group_id = GROUP_ID,
+            auto_commit_enable = True,
+            auto_commit_interval_ms = None,
+            auto_commit_interval_messages = 20,
+            consumer_timeout_ms = 100,
+            auto_offset_reset='smallest',
+        )
+
+        # 181-200
+        output_msgs2 = []
+        with self.assertRaises(ConsumerTimeout):
+            while True:
+                m = consumer2.next()
+                output_msgs2.append(m)
+        self.assert_message_count(output_msgs2, 20)
+        self.assertEqual(len(set(output_msgs1) & set(output_msgs2)), 15)
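A quick sanity check of the counts the new test asserts (my arithmetic, not part of the diff): with auto_commit_interval_messages = 20, consumer1's last auto-commit fires on its 180th task_done, so consumer2 resumes from a combined offset of 180 and reads the remaining 20 messages; 15 of those (181-195) were already returned by consumer1, which is the expected overlap.

    # Numbers taken from the test above; the variable names are mine.
    total_messages = 200
    consumed_by_consumer1 = 195
    commit_interval = 20

    last_committed = (consumed_by_consumer1 // commit_interval) * commit_interval
    read_by_consumer2 = total_messages - last_committed
    overlap = consumed_by_consumer1 - last_committed

    print(last_committed)      # 180
    print(read_by_consumer2)   # 20
    print(overlap)             # 15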