Add simple KafkaProducer -> KafkaConsumer integration test
This commit is contained in:
34
test/test_producer.py
Normal file
34
test/test_producer.py
Normal file
@@ -0,0 +1,34 @@
|
||||
import pytest
|
||||
|
||||
from kafka import KafkaConsumer, KafkaProducer
|
||||
from test.conftest import version
|
||||
from test.testutil import random_string
|
||||
|
||||
|
||||
@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set")
|
||||
def test_end_to_end(kafka_broker):
|
||||
connect_str = 'localhost:' + str(kafka_broker.port)
|
||||
producer = KafkaProducer(bootstrap_servers=connect_str,
|
||||
max_block_ms=10000,
|
||||
value_serializer=str.encode)
|
||||
consumer = KafkaConsumer(bootstrap_servers=connect_str,
|
||||
consumer_timeout_ms=10000,
|
||||
auto_offset_reset='earliest',
|
||||
value_deserializer=bytes.decode)
|
||||
|
||||
topic = random_string(5)
|
||||
|
||||
for i in range(1000):
|
||||
producer.send(topic, 'msg %d' % i)
|
||||
producer.flush()
|
||||
producer.close()
|
||||
|
||||
consumer.subscribe([topic])
|
||||
msgs = set()
|
||||
for i in range(1000):
|
||||
try:
|
||||
msgs.add(next(consumer).value)
|
||||
except StopIteration:
|
||||
break
|
||||
|
||||
assert msgs == set(['msg %d' % i for i in range(1000)])
|
Reference in New Issue
Block a user