Add ignore_leadernotavailable kwarg to SimpleClient.load_metadata_for_topics
This commit is contained in:
@@ -450,17 +450,10 @@ class SimpleClient(object):
|
||||
while not self.has_metadata_for_topic(topic):
|
||||
if time.time() > start_time + timeout:
|
||||
raise KafkaTimeoutError('Unable to create topic {0}'.format(topic))
|
||||
try:
|
||||
self.load_metadata_for_topics(topic)
|
||||
except LeaderNotAvailableError:
|
||||
pass
|
||||
except UnknownTopicOrPartitionError:
|
||||
# Server is not configured to auto-create
|
||||
# retrying in this case will not help
|
||||
raise
|
||||
self.load_metadata_for_topics(topic, ignore_leadernotavailable=True)
|
||||
time.sleep(.5)
|
||||
|
||||
def load_metadata_for_topics(self, *topics):
|
||||
def load_metadata_for_topics(self, *topics, **kwargs):
|
||||
"""Fetch broker and topic-partition metadata from the server.
|
||||
|
||||
Updates internal data: broker list, topic/partition list, and
|
||||
@@ -476,6 +469,9 @@ class SimpleClient(object):
|
||||
*topics (optional): If a list of topics is provided,
|
||||
the metadata refresh will be limited to the specified topics
|
||||
only.
|
||||
ignore_leadernotavailable (bool): suppress LeaderNotAvailableError
|
||||
so that metadata is loaded correctly during auto-create.
|
||||
Default: False.
|
||||
|
||||
Raises:
|
||||
UnknownTopicOrPartitionError: Raised for topics that do not exist,
|
||||
@@ -484,6 +480,11 @@ class SimpleClient(object):
|
||||
when the broker is configured to auto-create topics. Retry
|
||||
after a short backoff (topics/partitions are initializing).
|
||||
"""
|
||||
if 'ignore_leadernotavailable' in kwargs:
|
||||
ignore_leadernotavailable = kwargs['ignore_leadernotavailable']
|
||||
else:
|
||||
ignore_leadernotavailable = False
|
||||
|
||||
if topics:
|
||||
self.reset_topic_metadata(*topics)
|
||||
else:
|
||||
@@ -506,6 +507,9 @@ class SimpleClient(object):
|
||||
topic, error_type, error)
|
||||
if topic not in topics:
|
||||
continue
|
||||
elif (error_type is LeaderNotAvailableError and
|
||||
ignore_leadernotavailable):
|
||||
continue
|
||||
raise error_type(topic)
|
||||
|
||||
self.topic_partitions[topic] = {}
|
||||
|
@@ -53,7 +53,7 @@ class Consumer(object):
|
||||
self.client = client
|
||||
self.topic = topic
|
||||
self.group = group
|
||||
self.client.load_metadata_for_topics(topic)
|
||||
self.client.load_metadata_for_topics(topic, ignore_leadernotavailable=True)
|
||||
self.offsets = {}
|
||||
|
||||
if partitions is None:
|
||||
|
@@ -29,7 +29,7 @@ class KeyedProducer(Producer):
|
||||
def _next_partition(self, topic, key):
|
||||
if topic not in self.partitioners:
|
||||
if not self.client.has_metadata_for_topic(topic):
|
||||
self.client.load_metadata_for_topics(topic)
|
||||
self.client.load_metadata_for_topics(topic, ignore_leadernotavailable=True)
|
||||
|
||||
self.partitioners[topic] = self.partitioner_class(self.client.get_partition_ids_for_topic(topic))
|
||||
|
||||
|
Reference in New Issue
Block a user