KAFKA-2832: Add a consumer config option to exclude internal topics (#765)

Use exclude_internal_topics config in KafkaConsumer to avoid subscribe patterns matching internal topics
  Raise error during rebalance if subscribed topics are not authorized
This commit is contained in:
Dana Powers
2016-07-17 08:37:15 -07:00
committed by GitHub
parent 506d023978
commit 31a29ecea0
2 changed files with 13 additions and 4 deletions

View File

@@ -176,6 +176,10 @@ class KafkaConsumer(six.Iterator):
selector (selectors.BaseSelector): Provide a specific selector
implementation to use for I/O multiplexing.
Default: selectors.DefaultSelector
exclude_internal_topics (bool): Whether records from internal topics
(such as offsets) should be exposed to the consumer. If set to True
the only way to receive records from an internal topic is
subscribing to it. Requires 0.10+ Default: True
Note:
Configuration parameters are described in more detail at
@@ -222,6 +226,7 @@ class KafkaConsumer(six.Iterator):
'metrics_num_samples': 2,
'metrics_sample_window_ms': 30000,
'selector': selectors.DefaultSelector,
'exclude_internal_topics': True,
}
def __init__(self, *topics, **configs):

View File

@@ -36,6 +36,7 @@ class ConsumerCoordinator(BaseCoordinator):
'heartbeat_interval_ms': 3000,
'retry_backoff_ms': 100,
'api_version': (0, 9),
'exclude_internal_topics': True,
}
def __init__(self, client, subscription, metrics, metric_group_prefix,
@@ -70,6 +71,10 @@ class ConsumerCoordinator(BaseCoordinator):
using Kafka's group managementment facilities. Default: 30000
retry_backoff_ms (int): Milliseconds to backoff when retrying on
errors. Default: 100.
exclude_internal_topics (bool): Whether records from internal topics
(such as offsets) should be exposed to the consumer. If set to
True the only way to receive records from an internal topic is
subscribing to it. Requires 0.10+. Default: True
"""
super(ConsumerCoordinator, self).__init__(client, **configs)
self.config = copy.copy(self.DEFAULT_CONFIG)
@@ -131,13 +136,12 @@ class ConsumerCoordinator(BaseCoordinator):
def _handle_metadata_update(self, cluster):
# if we encounter any unauthorized topics, raise an exception
# TODO
#if self._cluster.unauthorized_topics:
# raise TopicAuthorizationError(self._cluster.unauthorized_topics)
if cluster.unauthorized_topics:
raise Errors.TopicAuthorizationFailedError(cluster.unauthorized_topics)
if self._subscription.subscribed_pattern:
topics = []
for topic in cluster.topics():
for topic in cluster.topics(self.config['exclude_internal_topics']):
if self._subscription.subscribed_pattern.match(topic):
topics.append(topic)