Merge pull request #412 from haosdent/seek_absolute_offset

fix #410 SimpleConsumer cannot seek to an absolute offset.
This commit is contained in:
Dana Powers
2015-06-20 09:39:17 -07:00
2 changed files with 60 additions and 17 deletions

View File

@@ -188,33 +188,62 @@ class SimpleConsumer(Consumer):
""" """
self.partition_info = True self.partition_info = True
def seek(self, offset, whence): def seek(self, offset, whence=None, partition=None):
""" """
Alter the current offset in the consumer, similar to fseek Alter the current offset in the consumer, similar to fseek
Arguments: Arguments:
offset: how much to modify the offset offset: how much to modify the offset
whence: where to modify it from whence: where to modify it from, default is None
* 0 is relative to the earliest available offset (head) * None is an absolute offset
* 1 is relative to the current offset * 0 is relative to the earliest available offset (head)
* 2 is relative to the latest known offset (tail) * 1 is relative to the current offset
* 2 is relative to the latest known offset (tail)
partition: modify which partition, default is None.
If partition is None, would modify all partitions.
""" """
if whence == 1: # relative to current position if whence is None: # set an absolute offset
for partition, _offset in self.offsets.items(): if partition is None:
self.offsets[partition] = _offset + offset for tmp_partition in self.offsets:
self.offsets[tmp_partition] = offset
else:
self.offsets[partition] = offset
elif whence == 1: # relative to current position
if partition is None:
for tmp_partition, _offset in self.offsets.items():
self.offsets[tmp_partition] = _offset + offset
else:
self.offsets[partition] += offset
elif whence in (0, 2): # relative to beginning or end elif whence in (0, 2): # relative to beginning or end
# divide the request offset by number of partitions,
# distribute the remained evenly
(delta, rem) = divmod(offset, len(self.offsets))
deltas = {}
for partition, r in izip_longest(self.offsets.keys(),
repeat(1, rem), fillvalue=0):
deltas[partition] = delta + r
reqs = [] reqs = []
for partition in self.offsets.keys(): deltas = {}
if partition is None:
# divide the request offset by number of partitions,
# distribute the remained evenly
(delta, rem) = divmod(offset, len(self.offsets))
for tmp_partition, r in izip_longest(self.offsets.keys(),
repeat(1, rem),
fillvalue=0):
deltas[tmp_partition] = delta + r
for tmp_partition in self.offsets.keys():
if whence == 0:
reqs.append(OffsetRequest(self.topic,
tmp_partition,
-2,
1))
elif whence == 2:
reqs.append(OffsetRequest(self.topic,
tmp_partition,
-1,
1))
else:
pass
else:
deltas[partition] = offset
if whence == 0: if whence == 0:
reqs.append(OffsetRequest(self.topic, partition, -2, 1)) reqs.append(OffsetRequest(self.topic, partition, -2, 1))
elif whence == 2: elif whence == 2:

View File

@@ -164,6 +164,20 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
consumer.seek(-13, 2) consumer.seek(-13, 2)
self.assert_message_count([ message for message in consumer ], 13) self.assert_message_count([ message for message in consumer ], 13)
# Set absolute offset
consumer.seek(100)
self.assert_message_count([ message for message in consumer ], 0)
consumer.seek(100, partition=0)
self.assert_message_count([ message for message in consumer ], 0)
consumer.seek(101, partition=1)
self.assert_message_count([ message for message in consumer ], 0)
consumer.seek(90, partition=0)
self.assert_message_count([ message for message in consumer ], 10)
consumer.seek(20, partition=1)
self.assert_message_count([ message for message in consumer ], 80)
consumer.seek(0, partition=1)
self.assert_message_count([ message for message in consumer ], 100)
consumer.stop() consumer.stop()
@kafka_versions("all") @kafka_versions("all")