Retry failed messages in failover integration tests; use module logger
This commit is contained in:
@@ -2,8 +2,6 @@ import logging
|
||||
import os
|
||||
import time
|
||||
|
||||
from . import unittest
|
||||
|
||||
from kafka import KafkaClient, SimpleConsumer, KeyedProducer
|
||||
from kafka.common import TopicAndPartition, FailedPayloadsError, ConnectionError
|
||||
from kafka.producer.base import Producer
|
||||
@@ -15,6 +13,9 @@ from test.testutil import (
|
||||
)
|
||||
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TestFailover(KafkaIntegrationTestCase):
|
||||
create_client = False
|
||||
|
||||
@@ -73,12 +74,12 @@ class TestFailover(KafkaIntegrationTestCase):
|
||||
timeout = 60
|
||||
while not recovered and (time.time() - started) < timeout:
|
||||
try:
|
||||
logging.debug("attempting to send 'success' message after leader killed")
|
||||
log.debug("attempting to send 'success' message after leader killed")
|
||||
producer.send_messages(topic, partition, b'success')
|
||||
logging.debug("success!")
|
||||
log.debug("success!")
|
||||
recovered = True
|
||||
except (FailedPayloadsError, ConnectionError):
|
||||
logging.debug("caught exception sending message -- will retry")
|
||||
log.debug("caught exception sending message -- will retry")
|
||||
continue
|
||||
|
||||
# Verify we successfully sent the message
|
||||
@@ -110,7 +111,7 @@ class TestFailover(KafkaIntegrationTestCase):
|
||||
# kill leader for partition
|
||||
self._kill_leader(topic, partition)
|
||||
|
||||
logging.debug("attempting to send 'success' message after leader killed")
|
||||
log.debug("attempting to send 'success' message after leader killed")
|
||||
|
||||
# in async mode, this should return immediately
|
||||
producer.send_messages(topic, partition, b'success')
|
||||
@@ -164,7 +165,7 @@ class TestFailover(KafkaIntegrationTestCase):
|
||||
if producer.partitioners[kafka_bytestring(topic)].partition(key) == 0:
|
||||
recovered = True
|
||||
except (FailedPayloadsError, ConnectionError):
|
||||
logging.debug("caught exception sending message -- will retry")
|
||||
log.debug("caught exception sending message -- will retry")
|
||||
continue
|
||||
|
||||
# Verify we successfully sent the message
|
||||
@@ -187,12 +188,16 @@ class TestFailover(KafkaIntegrationTestCase):
|
||||
|
||||
def _send_random_messages(self, producer, topic, partition, n):
|
||||
for j in range(n):
|
||||
logging.debug('_send_random_message to %s:%d -- try %d', topic, partition, j)
|
||||
msg = 'msg {0}: {1}'.format(j, random_string(10))
|
||||
resp = producer.send_messages(topic, partition, msg.encode('utf-8'))
|
||||
if len(resp) > 0:
|
||||
self.assertEqual(resp[0].error, 0)
|
||||
logging.debug('_send_random_message to %s:%d -- try %d success', topic, partition, j)
|
||||
log.debug('_send_random_message %s to %s:%d', msg, topic, partition)
|
||||
while True:
|
||||
try:
|
||||
producer.send_messages(topic, partition, msg.encode('utf-8'))
|
||||
except:
|
||||
log.exception('failure in _send_random_messages - retrying')
|
||||
continue
|
||||
else:
|
||||
break
|
||||
|
||||
def _kill_leader(self, topic, partition):
|
||||
leader = self.client.topics_to_brokers[TopicAndPartition(kafka_bytestring(topic), partition)]
|
||||
|
||||
Reference in New Issue
Block a user