Produce messages to both partitions in async producer leader switch test
This commit is contained in:
@@ -98,10 +98,14 @@ class TestFailover(KafkaIntegrationTestCase):
|
|||||||
|
|
||||||
# Test the base class Producer -- send_messages to a specific partition
|
# Test the base class Producer -- send_messages to a specific partition
|
||||||
producer = Producer(self.client, async=True,
|
producer = Producer(self.client, async=True,
|
||||||
req_acks=Producer.ACK_AFTER_CLUSTER_COMMIT)
|
batch_send_every_n=15,
|
||||||
|
batch_send_every_t=3,
|
||||||
|
req_acks=Producer.ACK_AFTER_CLUSTER_COMMIT,
|
||||||
|
async_log_messages_on_error=False)
|
||||||
|
|
||||||
# Send 10 random messages
|
# Send 10 random messages
|
||||||
self._send_random_messages(producer, topic, partition, 10)
|
self._send_random_messages(producer, topic, partition, 10)
|
||||||
|
self._send_random_messages(producer, topic, partition + 1, 10)
|
||||||
|
|
||||||
# kill leader for partition
|
# kill leader for partition
|
||||||
self._kill_leader(topic, partition)
|
self._kill_leader(topic, partition)
|
||||||
@@ -110,9 +114,11 @@ class TestFailover(KafkaIntegrationTestCase):
|
|||||||
|
|
||||||
# in async mode, this should return immediately
|
# in async mode, this should return immediately
|
||||||
producer.send_messages(topic, partition, b'success')
|
producer.send_messages(topic, partition, b'success')
|
||||||
|
producer.send_messages(topic, partition + 1, b'success')
|
||||||
|
|
||||||
# send to new leader
|
# send to new leader
|
||||||
self._send_random_messages(producer, topic, partition, 10)
|
self._send_random_messages(producer, topic, partition, 10)
|
||||||
|
self._send_random_messages(producer, topic, partition + 1, 10)
|
||||||
|
|
||||||
# Stop the producer and wait for it to shutdown
|
# Stop the producer and wait for it to shutdown
|
||||||
producer.stop()
|
producer.stop()
|
||||||
@@ -129,6 +135,8 @@ class TestFailover(KafkaIntegrationTestCase):
|
|||||||
# Should be equal to 10 before + 1 recovery + 10 after
|
# Should be equal to 10 before + 1 recovery + 10 after
|
||||||
self.assert_message_count(topic, 21, partitions=(partition,),
|
self.assert_message_count(topic, 21, partitions=(partition,),
|
||||||
at_least=True)
|
at_least=True)
|
||||||
|
self.assert_message_count(topic, 21, partitions=(partition + 1,),
|
||||||
|
at_least=True)
|
||||||
|
|
||||||
@kafka_versions("all")
|
@kafka_versions("all")
|
||||||
def test_switch_leader_keyed_producer(self):
|
def test_switch_leader_keyed_producer(self):
|
||||||
|
|||||||
Reference in New Issue
Block a user