(test) SimpleConsumer.reset_partition_offset should not raise exception on failure
This commit is contained in:
@@ -4,7 +4,7 @@ from . import unittest
|
||||
|
||||
from kafka import SimpleConsumer, KafkaConsumer, MultiProcessConsumer
|
||||
from kafka.common import (
|
||||
KafkaConfigurationError, FetchResponse,
|
||||
KafkaConfigurationError, FetchResponse, OffsetFetchResponse,
|
||||
FailedPayloadsError, OffsetAndMessage,
|
||||
NotLeaderForPartitionError, UnknownTopicOrPartitionError
|
||||
)
|
||||
@@ -105,6 +105,21 @@ class TestSimpleConsumer(unittest.TestCase):
|
||||
# This should not raise an exception
|
||||
self.assertFalse(consumer.commit(partitions=[0, 1]))
|
||||
|
||||
def test_simple_consumer_reset_partition_offset(self):
|
||||
client = MagicMock()
|
||||
|
||||
def mock_offset_request(payloads, **kwargs):
|
||||
raise FailedPayloadsError(payloads[0])
|
||||
|
||||
client.send_offset_request.side_effect = mock_offset_request
|
||||
|
||||
consumer = SimpleConsumer(client, group='foobar',
|
||||
topic='topic', partitions=[0, 1],
|
||||
auto_commit=False)
|
||||
|
||||
# This should not raise an exception
|
||||
self.assertEqual(consumer.reset_partition_offset(0), None)
|
||||
|
||||
@staticmethod
|
||||
def fail_requests_factory(error_factory):
|
||||
# Mock so that only the first request gets a valid response
|
||||
|
||||
Reference in New Issue
Block a user