raise KafkaTimeoutException when flush times out

This commit is contained in:
Andrew Kowalik
2017-05-04 10:46:38 -07:00
committed by Jeff Widman
parent bb626dbffb
commit 83f2d322a9
2 changed files with 9 additions and 2 deletions

View File

@@ -554,6 +554,10 @@ class KafkaProducer(object):
Arguments:
timeout (float, optional): timeout in seconds to wait for completion.
Raises:
KafkaTimeoutError: failure to flush buffered records within the
provided timeout
"""
log.debug("Flushing accumulated records in producer.") # trace
self._accumulator.begin_flush()

View File

@@ -526,8 +526,11 @@ class RecordAccumulator(object):
for batch in self._incomplete.all():
log.debug('Waiting on produce to %s',
batch.produce_future.topic_partition)
assert batch.produce_future.wait(timeout=timeout), 'Timeout waiting for future'
assert batch.produce_future.is_done, 'Future not done?'
if not batch.produce_future.wait(timeout=timeout):
raise Errors.KafkaTimeoutError('Timeout waiting for future')
if not batch.produce_future.is_done:
raise Errors.UnknownError('Future not done')
if batch.produce_future.failed():
log.warning(batch.produce_future.exception)
finally: