Merge branch 'issue-22'

Conflicts:
	kafka/consumer.py
This commit is contained in:
David Arthur
2013-05-28 10:36:55 -04:00
2 changed files with 45 additions and 0 deletions

View File

@@ -103,6 +103,30 @@ class SimpleConsumer(object):
else: else:
raise ValueError("Unexpected value for `whence`, %d" % whence) raise ValueError("Unexpected value for `whence`, %d" % whence)
def pending(self, partitions=[]):
"""
Gets the pending message count
partitions: list of partitions to check for, default is to check all
"""
if len(partitions) == 0:
partitions = self.offsets.keys()
total = 0
reqs = []
for partition in partitions:
reqs.append(OffsetRequest(self.topic, partition, -1, 1))
resps = self.client.send_offset_request(reqs)
for resp in resps:
partition = resp.partition
pending = resp.offsets[0]
offset = self.offsets[partition]
total += pending - offset - (1 if offset > 0 else 0)
return total
def _timed_commit(self): def _timed_commit(self):
""" """
Commit offsets as part of timer Commit offsets as part of timer

View File

@@ -456,6 +456,27 @@ class TestConsumer(unittest.TestCase):
self.assertEquals(len(all_messages), 13) self.assertEquals(len(all_messages), 13)
def test_pending(self):
# Produce 10 messages to partition 0 and 1
produce1 = ProduceRequest("test_pending", 0, messages=[
create_message("Test message 0 %d" % i) for i in range(10)
])
for resp in self.client.send_produce_request([produce1]):
self.assertEquals(resp.error, 0)
self.assertEquals(resp.offset, 0)
produce2 = ProduceRequest("test_pending", 1, messages=[
create_message("Test message 1 %d" % i) for i in range(10)
])
for resp in self.client.send_produce_request([produce2]):
self.assertEquals(resp.error, 0)
self.assertEquals(resp.offset, 0)
consumer = SimpleConsumer(self.client, "group1", "test_pending")
self.assertEquals(consumer.pending(), 20)
self.assertEquals(consumer.pending(partitions=[0]), 10)
self.assertEquals(consumer.pending(partitions=[1]), 10)
if __name__ == "__main__": if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG) logging.basicConfig(level=logging.DEBUG)