Remove KafkaConsumer.BYTES_CONFIGURATION_KEYS
- rely on KafkaClient to handle client_id string/bytes encoding - accept either string or bytes for group_id - convert group_id string to utf-8 bytes internally
This commit is contained in:
@@ -47,8 +47,6 @@ DEFAULT_CONSUMER_CONFIG = {
|
|||||||
'rebalance_backoff_ms': 2000,
|
'rebalance_backoff_ms': 2000,
|
||||||
}
|
}
|
||||||
|
|
||||||
BYTES_CONFIGURATION_KEYS = ('client_id', 'group_id')
|
|
||||||
|
|
||||||
|
|
||||||
class KafkaConsumer(object):
|
class KafkaConsumer(object):
|
||||||
"""
|
"""
|
||||||
@@ -171,13 +169,6 @@ class KafkaConsumer(object):
|
|||||||
raise KafkaConfigurationError('Unknown configuration key(s): ' +
|
raise KafkaConfigurationError('Unknown configuration key(s): ' +
|
||||||
str(list(configs.keys())))
|
str(list(configs.keys())))
|
||||||
|
|
||||||
# Handle str/bytes conversions
|
|
||||||
for config_key in BYTES_CONFIGURATION_KEYS:
|
|
||||||
if isinstance(self._config[config_key], six.string_types):
|
|
||||||
logger.warning("Converting configuration key '%s' to bytes" %
|
|
||||||
config_key)
|
|
||||||
self._config[config_key] = self._config[config_key].encode('utf-8')
|
|
||||||
|
|
||||||
if self._config['auto_commit_enable']:
|
if self._config['auto_commit_enable']:
|
||||||
if not self._config['group_id']:
|
if not self._config['group_id']:
|
||||||
raise KafkaConfigurationError('KafkaConsumer configured to auto-commit without required consumer group (group_id)')
|
raise KafkaConfigurationError('KafkaConsumer configured to auto-commit without required consumer group (group_id)')
|
||||||
@@ -554,7 +545,7 @@ class KafkaConsumer(object):
|
|||||||
|
|
||||||
if commits:
|
if commits:
|
||||||
logger.info('committing consumer offsets to group %s', self._config['group_id'])
|
logger.info('committing consumer offsets to group %s', self._config['group_id'])
|
||||||
resps = self._client.send_offset_commit_request(self._config['group_id'],
|
resps = self._client.send_offset_commit_request(kafka_bytestring(self._config['group_id']),
|
||||||
commits,
|
commits,
|
||||||
fail_on_error=False)
|
fail_on_error=False)
|
||||||
|
|
||||||
@@ -618,7 +609,7 @@ class KafkaConsumer(object):
|
|||||||
logger.info("Consumer fetching stored offsets")
|
logger.info("Consumer fetching stored offsets")
|
||||||
for topic_partition in self._topics:
|
for topic_partition in self._topics:
|
||||||
(resp,) = self._client.send_offset_fetch_request(
|
(resp,) = self._client.send_offset_fetch_request(
|
||||||
self._config['group_id'],
|
kafka_bytestring(self._config['group_id']),
|
||||||
[OffsetFetchRequest(topic_partition[0], topic_partition[1])],
|
[OffsetFetchRequest(topic_partition[0], topic_partition[1])],
|
||||||
fail_on_error=False)
|
fail_on_error=False)
|
||||||
try:
|
try:
|
||||||
|
|||||||
Reference in New Issue
Block a user