From 9712f613c9e7e4b0436f501b513249eab4edc4e9 Mon Sep 17 00:00:00 2001
From: Dana Powers
Date: Sat, 6 Jun 2015 16:09:30 -0700
Subject: [PATCH] PR 331 fixup: do not attempt to get new messages if there are pending retries

---
 kafka/producer/base.py | 13 +++++++++----
 1 file changed, 9 insertions(+), 4 deletions(-)

diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index 15768be..2f47d87 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -52,13 +52,18 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
 
     while not stop_event.is_set():
         timeout = batch_time
-
-        # it's a simplification: we're comparing message sets and
-        # messages: each set can contain [1..batch_size] messages
-        count = batch_size - len(request_tries)
+        count = batch_size
         send_at = time.time() + timeout
         msgset = defaultdict(list)
 
+        # Merging messages will require a bit more work to manage correctly
+        # for now, dont look for new batches if we have old ones to retry
+        if request_tries:
+            count = 0
+            log.debug('Skipping new batch collection to handle retries')
+        else:
+            log.debug('Batching size: {0}, timeout: {1}'.format(count, timeout))
+
        # Keep fetching till we gather enough messages or a
        # timeout is reached
        while count > 0 and timeout >= 0:
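
Note (not part of the patch): the sketch below is a minimal, self-contained model of the batching decision this change introduces, useful for seeing the control flow in isolation. Names such as request_tries, batch_size, and batch_time mirror the patch; the collect_batch function and the queue-draining loop body are simplifications assumed here for illustration, not the producer's actual internals.

    import logging
    import time
    from collections import defaultdict
    from queue import Empty

    log = logging.getLogger(__name__)

    def collect_batch(queue, request_tries, batch_size, batch_time):
        """Return a mapping of topic/partition -> messages, or an empty
        mapping when pending retries should be flushed before any new
        messages are pulled from the queue."""
        timeout = batch_time
        count = batch_size
        send_at = time.time() + timeout
        msgset = defaultdict(list)

        # Mirror of the patched logic: never mix fresh messages into a
        # request set that still has batches waiting to be retried.
        if request_tries:
            count = 0
            log.debug('Skipping new batch collection to handle retries')
        else:
            log.debug('Batching size: {0}, timeout: {1}'.format(count, timeout))

        # Keep fetching until enough messages are gathered or the
        # timeout is reached (simplified stand-in for the real loop body).
        while count > 0 and timeout >= 0:
            try:
                topic_partition, msg = queue.get(timeout=max(timeout, 0))
            except Empty:
                break
            msgset[topic_partition].append(msg)
            count -= 1
            timeout = send_at - time.time()

        return msgset

The effect, as the patch comment notes, is a deliberate trade-off: rather than merging retry batches with newly collected messages (which would need extra bookkeeping to do correctly), the loop drains pending retries first and only then resumes gathering new batches.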