Write a test for issue 313 - keyed producer failover

This commit is contained in:
Dana Powers
2015-02-06 11:50:34 -08:00
parent be9f4099e0
commit 01f7c0d176

View File

@@ -7,6 +7,7 @@ from . import unittest
from kafka import KafkaClient, SimpleConsumer
from kafka.common import TopicAndPartition, FailedPayloadsError, ConnectionError
from kafka.producer.base import Producer
from kafka.producer import KeyedProducer
from test.fixtures import ZookeeperFixture, KafkaFixture
from test.testutil import (
@@ -116,6 +117,45 @@ class TestFailover(KafkaIntegrationTestCase):
# Should be equal to 10 before + 1 recovery + 10 after
self.assert_message_count(topic, 21, partitions=(partition,))
@kafka_versions("all")
def test_switch_leader_keyed_producer(self):
topic = self.topic
producer = KeyedProducer(self.client, async=False)
# Send 10 random messages
for _ in range(10):
key = random_string(3)
msg = random_string(10)
producer.send_messages(topic, key, msg)
# kill leader for partition 0
self._kill_leader(topic, 0)
recovered = False
started = time.time()
timeout = 60
while not recovered and (time.time() - started) < timeout:
try:
key = random_string(3)
msg = random_string(10)
producer.send_messages(topic, key, msg)
if producer.partitioners[topic].partition(key) == 0:
recovered = True
except (FailedPayloadsError, ConnectionError):
logging.debug("caught exception sending message -- will retry")
continue
# Verify we successfully sent the message
self.assertTrue(recovered)
# send some more messages just to make sure no more exceptions
for _ in range(10):
key = random_string(3)
msg = random_string(10)
producer.send_messages(topic, key, msg)
def _send_random_messages(self, producer, topic, partition, n):
for j in range(n):
logging.debug('_send_random_message to %s:%d -- try %d', topic, partition, j)