Use producer retries and flush timeout in producer end-to-end test
This commit is contained in:
@@ -21,6 +21,7 @@ def test_end_to_end(kafka_broker, compression):
|
||||
|
||||
connect_str = 'localhost:' + str(kafka_broker.port)
|
||||
producer = KafkaProducer(bootstrap_servers=connect_str,
|
||||
retries=5,
|
||||
max_block_ms=10000,
|
||||
compression_type=compression,
|
||||
value_serializer=str.encode)
|
||||
@@ -34,7 +35,7 @@ def test_end_to_end(kafka_broker, compression):
|
||||
|
||||
for i in range(1000):
|
||||
producer.send(topic, 'msg %d' % i)
|
||||
producer.flush()
|
||||
producer.flush(timeout=30)
|
||||
producer.close()
|
||||
|
||||
consumer.subscribe([topic])
|
||||
|
||||
Reference in New Issue
Block a user