Fail-fast on timeout constraint violations during KafkaConsumer creation (#986)
This commit is contained in:

committed by
Dana Powers

parent
bcb4009b93
commit
432f00eb66
@@ -6,6 +6,8 @@ import socket
|
|||||||
import sys
|
import sys
|
||||||
import time
|
import time
|
||||||
|
|
||||||
|
from kafka.errors import KafkaConfigurationError
|
||||||
|
|
||||||
from kafka.vendor import six
|
from kafka.vendor import six
|
||||||
|
|
||||||
from kafka.client_async import KafkaClient, selectors
|
from kafka.client_async import KafkaClient, selectors
|
||||||
@@ -267,6 +269,17 @@ class KafkaConsumer(six.Iterator):
|
|||||||
new_config, self.config['auto_offset_reset'])
|
new_config, self.config['auto_offset_reset'])
|
||||||
self.config['auto_offset_reset'] = new_config
|
self.config['auto_offset_reset'] = new_config
|
||||||
|
|
||||||
|
request_timeout_ms = self.config['request_timeout_ms']
|
||||||
|
session_timeout_ms = self.config['session_timeout_ms']
|
||||||
|
fetch_max_wait_ms = self.config['fetch_max_wait_ms']
|
||||||
|
if request_timeout_ms <= session_timeout_ms:
|
||||||
|
raise KafkaConfigurationError(
|
||||||
|
"Request timeout (%s) must be larger than session timeout (%s)" %
|
||||||
|
(request_timeout_ms, session_timeout_ms))
|
||||||
|
if request_timeout_ms <= fetch_max_wait_ms:
|
||||||
|
raise KafkaConfigurationError("Request timeout (%s) must be larger than fetch-max-wait-ms (%s)" %
|
||||||
|
(request_timeout_ms, fetch_max_wait_ms))
|
||||||
|
|
||||||
metrics_tags = {'client-id': self.config['client_id']}
|
metrics_tags = {'client-id': self.config['client_id']}
|
||||||
metric_config = MetricConfig(samples=self.config['metrics_num_samples'],
|
metric_config = MetricConfig(samples=self.config['metrics_num_samples'],
|
||||||
time_window_ms=self.config['metrics_sample_window_ms'],
|
time_window_ms=self.config['metrics_sample_window_ms'],
|
||||||
|
@@ -16,6 +16,14 @@ class TestKafkaConsumer(unittest.TestCase):
|
|||||||
with self.assertRaises(AssertionError):
|
with self.assertRaises(AssertionError):
|
||||||
SimpleConsumer(MagicMock(), 'group', 'topic', partitions = [ '0' ])
|
SimpleConsumer(MagicMock(), 'group', 'topic', partitions = [ '0' ])
|
||||||
|
|
||||||
|
def test_session_timeout_larger_than_request_timeout_raises(self):
|
||||||
|
with self.assertRaises(KafkaConfigurationError):
|
||||||
|
KafkaConsumer(bootstrap_servers='localhost:9092', session_timeout_ms=60000, request_timeout_ms=40000)
|
||||||
|
|
||||||
|
def test_fetch_max_wait_larger_than_request_timeout_raises(self):
|
||||||
|
with self.assertRaises(KafkaConfigurationError):
|
||||||
|
KafkaConsumer(bootstrap_servers='localhost:9092', fetch_max_wait_ms=41000, request_timeout_ms=40000)
|
||||||
|
|
||||||
|
|
||||||
class TestMultiProcessConsumer(unittest.TestCase):
|
class TestMultiProcessConsumer(unittest.TestCase):
|
||||||
@unittest.skipIf(sys.platform.startswith('win'), 'test mocking fails on windows')
|
@unittest.skipIf(sys.platform.startswith('win'), 'test mocking fails on windows')
|
||||||
|
Reference in New Issue
Block a user