PR 331 fixup: do not attempt to get new messages if there are pending retries
This commit is contained in:
@@ -52,13 +52,18 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
|
|||||||
|
|
||||||
while not stop_event.is_set():
|
while not stop_event.is_set():
|
||||||
timeout = batch_time
|
timeout = batch_time
|
||||||
|
count = batch_size
|
||||||
# it's a simplification: we're comparing message sets and
|
|
||||||
# messages: each set can contain [1..batch_size] messages
|
|
||||||
count = batch_size - len(request_tries)
|
|
||||||
send_at = time.time() + timeout
|
send_at = time.time() + timeout
|
||||||
msgset = defaultdict(list)
|
msgset = defaultdict(list)
|
||||||
|
|
||||||
|
# Merging messages will require a bit more work to manage correctly
|
||||||
|
# for now, dont look for new batches if we have old ones to retry
|
||||||
|
if request_tries:
|
||||||
|
count = 0
|
||||||
|
log.debug('Skipping new batch collection to handle retries')
|
||||||
|
else:
|
||||||
|
log.debug('Batching size: {0}, timeout: {1}'.format(count, timeout))
|
||||||
|
|
||||||
# Keep fetching till we gather enough messages or a
|
# Keep fetching till we gather enough messages or a
|
||||||
# timeout is reached
|
# timeout is reached
|
||||||
while count > 0 and timeout >= 0:
|
while count > 0 and timeout >= 0:
|
||||||
|
|||||||
Reference in New Issue
Block a user