Add unit tests for SimpleConsumer error handling
This commit is contained in:
@@ -3,7 +3,12 @@ from mock import MagicMock, patch
|
||||
from . import unittest
|
||||
|
||||
from kafka import SimpleConsumer, KafkaConsumer, MultiProcessConsumer
|
||||
from kafka.common import KafkaConfigurationError
|
||||
from kafka.common import (
|
||||
KafkaConfigurationError, FetchResponse,
|
||||
FailedPayloadsError, OffsetAndMessage,
|
||||
NotLeaderForPartitionError, UnknownTopicOrPartitionError
|
||||
)
|
||||
|
||||
|
||||
class TestKafkaConsumer(unittest.TestCase):
|
||||
def test_non_integer_partitions(self):
|
||||
@@ -14,6 +19,7 @@ class TestKafkaConsumer(unittest.TestCase):
|
||||
with self.assertRaises(KafkaConfigurationError):
|
||||
KafkaConsumer()
|
||||
|
||||
|
||||
class TestMultiProcessConsumer(unittest.TestCase):
|
||||
def test_partition_list(self):
|
||||
client = MagicMock()
|
||||
@@ -22,3 +28,70 @@ class TestMultiProcessConsumer(unittest.TestCase):
|
||||
consumer = MultiProcessConsumer(client, 'testing-group', 'testing-topic', partitions=partitions)
|
||||
self.assertEqual(fetch_last_known_offsets.call_args[0], (partitions,) )
|
||||
self.assertEqual(client.get_partition_ids_for_topic.call_count, 0) # pylint: disable=no-member
|
||||
|
||||
def test_simple_consumer_failed_payloads(self):
|
||||
client = MagicMock()
|
||||
consumer = SimpleConsumer(client, group=None,
|
||||
topic='topic', partitions=[0, 1],
|
||||
auto_commit=False)
|
||||
|
||||
def failed_payloads(payload):
|
||||
return FailedPayloadsError(payload)
|
||||
|
||||
client.send_fetch_request.side_effect = self.fail_requests_factory(failed_payloads)
|
||||
|
||||
# This should not raise an exception
|
||||
consumer.get_messages(5)
|
||||
|
||||
def test_simple_consumer_leader_change(self):
|
||||
client = MagicMock()
|
||||
consumer = SimpleConsumer(client, group=None,
|
||||
topic='topic', partitions=[0, 1],
|
||||
auto_commit=False)
|
||||
|
||||
# Mock so that only the first request gets a valid response
|
||||
def not_leader(request):
|
||||
return FetchResponse(request.topic, request.partition,
|
||||
NotLeaderForPartitionError.errno, -1, ())
|
||||
|
||||
client.send_fetch_request.side_effect = self.fail_requests_factory(not_leader)
|
||||
|
||||
# This should not raise an exception
|
||||
consumer.get_messages(20)
|
||||
|
||||
# client should have updated metadata
|
||||
self.assertGreaterEqual(client.reset_topic_metadata.call_count, 1)
|
||||
self.assertGreaterEqual(client.load_metadata_for_topics.call_count, 1)
|
||||
|
||||
def test_simple_consumer_unknown_topic_partition(self):
|
||||
client = MagicMock()
|
||||
consumer = SimpleConsumer(client, group=None,
|
||||
topic='topic', partitions=[0, 1],
|
||||
auto_commit=False)
|
||||
|
||||
# Mock so that only the first request gets a valid response
|
||||
def unknown_topic_partition(request):
|
||||
return FetchResponse(request.topic, request.partition,
|
||||
UnknownTopicOrPartitionError.errno, -1, ())
|
||||
|
||||
client.send_fetch_request.side_effect = self.fail_requests_factory(unknown_topic_partition)
|
||||
|
||||
# This should not raise an exception
|
||||
with self.assertRaises(UnknownTopicOrPartitionError):
|
||||
consumer.get_messages(20)
|
||||
|
||||
@staticmethod
|
||||
def fail_requests_factory(error_factory):
|
||||
# Mock so that only the first request gets a valid response
|
||||
def fail_requests(payloads, **kwargs):
|
||||
responses = [
|
||||
FetchResponse(payloads[0].topic, payloads[0].partition, 0, 0,
|
||||
(OffsetAndMessage(
|
||||
payloads[0].offset + i,
|
||||
"msg %d" % (payloads[0].offset + i))
|
||||
for i in range(10))),
|
||||
]
|
||||
for failure in payloads[1:]:
|
||||
responses.append(error_factory(failure))
|
||||
return responses
|
||||
return fail_requests
|
||||
|
Reference in New Issue
Block a user