Consumers get_messages: allow blocking until some messages are received

Modified MultiProcessConsumer's and SimpleConsumer's `block` argument to
allow integer value which defines the number of messages to block for.

This allows callers to ask for a high number of messages and block only
until some of them are received.  Otherwise callers would have to request
messages one by one or block for some time.
This commit is contained in:
Oskari Saarenmaa
2015-09-18 14:06:51 +03:00
parent b525e1a8d6
commit e74a8ba494
3 changed files with 36 additions and 11 deletions

View File

@@ -226,10 +226,12 @@ class MultiProcessConsumer(Consumer):
Keyword Arguments: Keyword Arguments:
count: Indicates the maximum number of messages to be fetched count: Indicates the maximum number of messages to be fetched
block: If True, the API will block till some messages are fetched. block: If True, the API will block till all messages are fetched.
timeout: If block is True, the function will block for the specified If block is a positive integer the API will block until that
time (in seconds) until count messages is fetched. If None, many messages are fetched.
it will block forever. timeout: When blocking is requested the function will block for
the specified time (in seconds) until count messages is
fetched. If None, it will block forever.
""" """
messages = [] messages = []
@@ -252,8 +254,10 @@ class MultiProcessConsumer(Consumer):
if self.queue.empty(): if self.queue.empty():
self.events.start.set() self.events.start.set()
block_next_call = block is True or block > len(messages)
try: try:
partition, message = self.queue.get(block, timeout) partition, message = self.queue.get(block_next_call,
timeout)
except Empty: except Empty:
break break

View File

@@ -272,10 +272,12 @@ class SimpleConsumer(Consumer):
Keyword Arguments: Keyword Arguments:
count: Indicates the maximum number of messages to be fetched count: Indicates the maximum number of messages to be fetched
block: If True, the API will block till some messages are fetched. block: If True, the API will block till all messages are fetched.
timeout: If block is True, the function will block for the specified If block is a positive integer the API will block until that
time (in seconds) until count messages is fetched. If None, many messages are fetched.
it will block forever. timeout: When blocking is requested the function will block for
the specified time (in seconds) until count messages is
fetched. If None, it will block forever.
""" """
messages = [] messages = []
if timeout is not None: if timeout is not None:
@@ -286,12 +288,13 @@ class SimpleConsumer(Consumer):
while len(messages) < count: while len(messages) < count:
block_time = timeout - time.time() block_time = timeout - time.time()
log.debug('calling _get_message block=%s timeout=%s', block, block_time) log.debug('calling _get_message block=%s timeout=%s', block, block_time)
result = self._get_message(block, block_time, block_next_call = block is True or block > len(messages)
result = self._get_message(block_next_call, block_time,
get_partition_info=True, get_partition_info=True,
update_offset=False) update_offset=False)
log.debug('got %s from _get_messages', result) log.debug('got %s from _get_messages', result)
if not result: if not result:
if block and (timeout is None or time.time() <= timeout): if block_next_call and (timeout is None or time.time() <= timeout):
continue continue
break break

View File

@@ -204,6 +204,14 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
self.assert_message_count(messages, 5) self.assert_message_count(messages, 5)
self.assertGreaterEqual(t.interval, 1) self.assertGreaterEqual(t.interval, 1)
# Ask for 10 messages, 5 in queue, ask to block for 1 message or 1
# second, get 5 back, no blocking
self.send_messages(0, range(0, 5))
with Timer() as t:
messages = consumer.get_messages(count=10, block=1, timeout=1)
self.assert_message_count(messages, 5)
self.assertLessEqual(t.interval, 1)
consumer.stop() consumer.stop()
@kafka_versions("all") @kafka_versions("all")
@@ -272,6 +280,16 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
self.assert_message_count(messages, 5) self.assert_message_count(messages, 5)
self.assertGreaterEqual(t.interval, 1) self.assertGreaterEqual(t.interval, 1)
# Ask for 10 messages, 5 in queue, ask to block for 1 message or 1
# second, get at least one back, no blocking
self.send_messages(0, range(0, 5))
with Timer() as t:
messages = consumer.get_messages(count=10, block=1, timeout=1)
received_message_count = len(messages)
self.assertGreaterEqual(received_message_count, 1)
self.assert_message_count(messages, received_message_count)
self.assertLessEqual(t.interval, 1)
consumer.stop() consumer.stop()
@kafka_versions("all") @kafka_versions("all")