Merge pull request #340 from dpkp/deprecate_metadata_broker_list
Deprecate metadata_broker_list in favor of bootstrap_servers
This commit is contained in:
@@ -25,7 +25,7 @@ OffsetsStruct = namedtuple("OffsetsStruct", ["fetch", "highwater", "commit", "ta
|
||||
DEFAULT_CONSUMER_CONFIG = {
|
||||
'client_id': __name__,
|
||||
'group_id': None,
|
||||
'metadata_broker_list': None,
|
||||
'bootstrap_servers': [],
|
||||
'socket_timeout_ms': 30 * 1000,
|
||||
'fetch_message_max_bytes': 1024 * 1024,
|
||||
'auto_offset_reset': 'largest',
|
||||
@@ -47,6 +47,9 @@ DEFAULT_CONSUMER_CONFIG = {
|
||||
'rebalance_backoff_ms': 2000,
|
||||
}
|
||||
|
||||
DEPRECATED_CONFIG_KEYS = {
|
||||
'metadata_broker_list': 'bootstrap_servers',
|
||||
}
|
||||
|
||||
class KafkaConsumer(object):
|
||||
"""
|
||||
@@ -56,7 +59,7 @@ class KafkaConsumer(object):
|
||||
|
||||
# A very basic 'tail' consumer, with no stored offset management
|
||||
kafka = KafkaConsumer('topic1',
|
||||
metadata_broker_list=['localhost:9092'])
|
||||
bootstrap_servers=['localhost:9092'])
|
||||
for m in kafka:
|
||||
print m
|
||||
|
||||
@@ -75,7 +78,7 @@ class KafkaConsumer(object):
|
||||
# more advanced consumer -- multiple topics w/ auto commit offset
|
||||
# management
|
||||
kafka = KafkaConsumer('topic1', 'topic2',
|
||||
metadata_broker_list=['localhost:9092'],
|
||||
bootstrap_servers=['localhost:9092'],
|
||||
group_id='my_consumer_group',
|
||||
auto_commit_enable=True,
|
||||
auto_commit_interval_ms=30 * 1000,
|
||||
@@ -120,7 +123,7 @@ class KafkaConsumer(object):
|
||||
fetch_min_bytes=1,
|
||||
fetch_wait_max_ms=100,
|
||||
refresh_leader_backoff_ms=200,
|
||||
metadata_broker_list=None,
|
||||
bootstrap_servers=[],
|
||||
socket_timeout_ms=30*1000,
|
||||
auto_offset_reset='largest',
|
||||
deserializer_class=lambda msg: msg,
|
||||
@@ -149,7 +152,7 @@ class KafkaConsumer(object):
|
||||
fetch_min_bytes=1,
|
||||
fetch_wait_max_ms=100,
|
||||
refresh_leader_backoff_ms=200,
|
||||
metadata_broker_list=None,
|
||||
bootstrap_servers=[],
|
||||
socket_timeout_ms=30*1000,
|
||||
auto_offset_reset='largest',
|
||||
deserializer_class=lambda msg: msg,
|
||||
@@ -161,6 +164,7 @@ class KafkaConsumer(object):
|
||||
Configuration parameters are described in more detail at
|
||||
http://kafka.apache.org/documentation.html#highlevelconsumerapi
|
||||
"""
|
||||
configs = self._deprecate_configs(**configs)
|
||||
self._config = {}
|
||||
for key in DEFAULT_CONSUMER_CONFIG:
|
||||
self._config[key] = configs.pop(key, DEFAULT_CONSUMER_CONFIG[key])
|
||||
@@ -178,11 +182,11 @@ class KafkaConsumer(object):
|
||||
logger.info("Configuring consumer to auto-commit offsets")
|
||||
self._reset_auto_commit()
|
||||
|
||||
if self._config['metadata_broker_list'] is None:
|
||||
raise KafkaConfigurationError('metadata_broker_list required to '
|
||||
if not self._config['bootstrap_servers']:
|
||||
raise KafkaConfigurationError('bootstrap_servers required to '
|
||||
'configure KafkaConsumer')
|
||||
|
||||
self._client = KafkaClient(self._config['metadata_broker_list'],
|
||||
self._client = KafkaClient(self._config['bootstrap_servers'],
|
||||
client_id=self._config['client_id'],
|
||||
timeout=(self._config['socket_timeout_ms'] / 1000.0))
|
||||
|
||||
@@ -751,3 +755,17 @@ class KafkaConsumer(object):
|
||||
return '<KafkaConsumer topics=(%s)>' % ', '.join(["%s-%d" % topic_partition
|
||||
for topic_partition in
|
||||
self._topics])
|
||||
|
||||
#
|
||||
# other private methods
|
||||
#
|
||||
|
||||
def _deprecate_configs(self, **configs):
|
||||
for old, new in six.iteritems(DEPRECATED_CONFIG_KEYS):
|
||||
if old in configs:
|
||||
logger.warning('Deprecated Kafka Consumer configuration: %s. '
|
||||
'Please use %s instead.', old, new)
|
||||
old_value = configs.pop(old)
|
||||
if new not in configs:
|
||||
configs[new] = old_value
|
||||
return configs
|
||||
|
||||
Reference in New Issue
Block a user