Catch ReplicaNotAvailableError in MetadataResponse -- per kafka-devs, this error can and should be ignored
This commit is contained in:
@@ -11,7 +11,7 @@ from kafka.common import (TopicAndPartition, BrokerMetadata,
|
|||||||
ConnectionError, FailedPayloadsError,
|
ConnectionError, FailedPayloadsError,
|
||||||
KafkaTimeoutError, KafkaUnavailableError,
|
KafkaTimeoutError, KafkaUnavailableError,
|
||||||
LeaderNotAvailableError, UnknownTopicOrPartitionError,
|
LeaderNotAvailableError, UnknownTopicOrPartitionError,
|
||||||
NotLeaderForPartitionError)
|
NotLeaderForPartitionError, ReplicaNotAvailableError)
|
||||||
|
|
||||||
from kafka.conn import collect_hosts, KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS
|
from kafka.conn import collect_hosts, KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS
|
||||||
from kafka.protocol import KafkaProtocol
|
from kafka.protocol import KafkaProtocol
|
||||||
@@ -350,6 +350,11 @@ class KafkaClient(object):
|
|||||||
log.error('No leader for topic %s partition %d', topic, partition)
|
log.error('No leader for topic %s partition %d', topic, partition)
|
||||||
self.topics_to_brokers[topic_part] = None
|
self.topics_to_brokers[topic_part] = None
|
||||||
continue
|
continue
|
||||||
|
# If one of the replicas is unavailable -- ignore
|
||||||
|
# this error code is provided for admin purposes only
|
||||||
|
# we never talk to replicas, only the leader
|
||||||
|
except ReplicaNotAvailableError:
|
||||||
|
log.warning('Some (non-leader) replicas not available for topic %s partition %d', topic, partition)
|
||||||
|
|
||||||
# If Known Broker, topic_partition -> BrokerMetadata
|
# If Known Broker, topic_partition -> BrokerMetadata
|
||||||
if leader in self.brokers:
|
if leader in self.brokers:
|
||||||
|
|||||||
Reference in New Issue
Block a user