a few extra AssertionError messages in KafkaConsumer
This commit is contained in:
@@ -180,7 +180,7 @@ class KafkaConsumer(six.Iterator):
|
|||||||
# Check Broker Version if not set explicitly
|
# Check Broker Version if not set explicitly
|
||||||
if self.config['api_version'] == 'auto':
|
if self.config['api_version'] == 'auto':
|
||||||
self.config['api_version'] = self._client.check_version()
|
self.config['api_version'] = self._client.check_version()
|
||||||
assert self.config['api_version'] in ('0.9', '0.8.2', '0.8.1', '0.8.0')
|
assert self.config['api_version'] in ('0.9', '0.8.2', '0.8.1', '0.8.0'), 'Unrecognized api version'
|
||||||
|
|
||||||
# Convert api_version config to tuple for easy comparisons
|
# Convert api_version config to tuple for easy comparisons
|
||||||
self.config['api_version'] = tuple(
|
self.config['api_version'] = tuple(
|
||||||
@@ -679,7 +679,7 @@ class KafkaConsumer(six.Iterator):
|
|||||||
self._fetcher.update_fetch_positions(partitions)
|
self._fetcher.update_fetch_positions(partitions)
|
||||||
|
|
||||||
def _message_generator(self):
|
def _message_generator(self):
|
||||||
assert self.assignment() or self.subscription() is not None
|
assert self.assignment() or self.subscription() is not None, 'No topic subscription or manual partition assignment'
|
||||||
while time.time() < self._consumer_timeout:
|
while time.time() < self._consumer_timeout:
|
||||||
if self.config['group_id'] is not None:
|
if self.config['group_id'] is not None:
|
||||||
if self.config['api_version'] >= (0, 8, 2):
|
if self.config['api_version'] >= (0, 8, 2):
|
||||||
|
|||||||
Reference in New Issue
Block a user