Add optional timeout parameter to KafkaProducer.flush()

This commit is contained in:
Dana Powers
2016-03-12 23:50:52 -08:00
parent afce25267d
commit e4196d5b7a
2 changed files with 13 additions and 9 deletions

View File

@@ -391,7 +391,7 @@ class KafkaProducer(object):
FutureProduceResult(TopicPartition(topic, partition)),
-1).failure(e)
def flush(self):
def flush(self, timeout=None):
"""
Invoking this method makes all buffered records immediately available
to send (even if linger_ms is greater than 0) and blocks on the
@@ -408,7 +408,7 @@ class KafkaProducer(object):
log.debug("Flushing accumulated records in producer.") # trace
self._accumulator.begin_flush()
self._sender.wakeup()
self._accumulator.await_flush_completion()
self._accumulator.await_flush_completion(timeout=timeout)
def _ensure_valid_record_size(self, size):
"""Validate that the record size isn't too large."""

View File

@@ -454,16 +454,20 @@ class RecordAccumulator(object):
"""
self._flushes_in_progress.increment()
def await_flush_completion(self):
def await_flush_completion(self, timeout=None):
"""
Mark all partitions as ready to send and block until the send is complete
"""
for batch in self._incomplete.all():
batch.produce_future.await()
assert batch.produce_future.is_done
if batch.produce_future.failed():
log.warning(batch.produce_future.exception)
self._flushes_in_progress.decrement()
try:
for batch in self._incomplete.all():
log.debug('Waiting on produce to %s',
batch.produce_future.topic_partition)
assert batch.produce_future.await(timeout=timeout), 'Timeout waiting for future'
assert batch.produce_future.is_done, 'Future not done?'
if batch.produce_future.failed():
log.warning(batch.produce_future.exception)
finally:
self._flushes_in_progress.decrement()
def abort_incomplete_batches(self):
"""