client.reinit() can raise an exception; catch in async producer
This commit is contained in:
@@ -78,9 +78,17 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
|
|||||||
retrying messages after stop_event is set, defaults to 30.
|
retrying messages after stop_event is set, defaults to 30.
|
||||||
"""
|
"""
|
||||||
request_tries = {}
|
request_tries = {}
|
||||||
client.reinit()
|
|
||||||
stop_at = None
|
|
||||||
|
|
||||||
|
while not stop_event.is_set():
|
||||||
|
try:
|
||||||
|
client.reinit()
|
||||||
|
except Exception as e:
|
||||||
|
log.warn('Async producer failed to connect to brokers; backoff for %s(ms) before retrying', retry_options.backoff_ms)
|
||||||
|
time.sleep(float(retry_options.backoff_ms) / 1000)
|
||||||
|
else:
|
||||||
|
break
|
||||||
|
|
||||||
|
stop_at = None
|
||||||
while not (stop_event.is_set() and queue.empty() and not request_tries):
|
while not (stop_event.is_set() and queue.empty() and not request_tries):
|
||||||
|
|
||||||
# Handle stop_timeout
|
# Handle stop_timeout
|
||||||
|
Reference in New Issue
Block a user