From 642b640404ce034161a1c958fd8e44eece2cec07 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Thu, 18 Feb 2016 21:54:37 -0800 Subject: [PATCH] Warn if pending batches failed during flush --- kafka/producer/record_accumulator.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py index 70f45f2..c404e9e 100644 --- a/kafka/producer/record_accumulator.py +++ b/kafka/producer/record_accumulator.py @@ -457,6 +457,9 @@ class RecordAccumulator(object): """ for batch in self._incomplete.all(): batch.produce_future.await() + assert batch.produce_future.is_done + if batch.produce_future.failed(): + log.warning(batch.produce_future.exception) self._flushes_in_progress.decrement() def abort_incomplete_batches(self):