New API for checking pending message count
This commit is contained in:
@@ -101,6 +101,30 @@ class SimpleConsumer(object):
|
||||
else:
|
||||
raise ValueError("Unexpected value for `whence`, %d" % whence)
|
||||
|
||||
def pending(self, partitions=[]):
|
||||
"""
|
||||
Gets the pending message count
|
||||
|
||||
partitions: list of partitions to check for, default is to check all
|
||||
"""
|
||||
if len(partitions) == 0:
|
||||
partitions = self.offsets.keys()
|
||||
|
||||
total = 0
|
||||
reqs = []
|
||||
|
||||
for partition in partitions:
|
||||
reqs.append(OffsetRequest(self.topic, partition, -1, 1))
|
||||
|
||||
resps = self.client.send_offset_request(reqs)
|
||||
for resp in resps:
|
||||
partition = resp.partition
|
||||
pending = resp.offsets[0]
|
||||
offset = self.offsets[partition]
|
||||
total += pending - offset - (1 if offset > 0 else 0)
|
||||
|
||||
return total
|
||||
|
||||
def commit(self, partitions=[]):
|
||||
"""
|
||||
Commit offsets for this consumer
|
||||
|
||||
Reference in New Issue
Block a user