Warn if pending batches failed during flush
This commit is contained in:
@@ -457,6 +457,9 @@ class RecordAccumulator(object):
|
|||||||
"""
|
"""
|
||||||
for batch in self._incomplete.all():
|
for batch in self._incomplete.all():
|
||||||
batch.produce_future.await()
|
batch.produce_future.await()
|
||||||
|
assert batch.produce_future.is_done
|
||||||
|
if batch.produce_future.failed():
|
||||||
|
log.warning(batch.produce_future.exception)
|
||||||
self._flushes_in_progress.decrement()
|
self._flushes_in_progress.decrement()
|
||||||
|
|
||||||
def abort_incomplete_batches(self):
|
def abort_incomplete_batches(self):
|
||||||
|
|||||||
Reference in New Issue
Block a user