Use reflection to avoid multiple errno definitions
This commit is contained in:
@@ -1,3 +1,5 @@
|
||||
import inspect
|
||||
import sys
|
||||
from collections import namedtuple
|
||||
|
||||
###############
|
||||
@@ -79,9 +81,6 @@ class KafkaError(RuntimeError):
|
||||
class BrokerResponseError(KafkaError):
|
||||
pass
|
||||
|
||||
class NoError(BrokerResponseError):
|
||||
errno = 0
|
||||
message = 'SUCCESS'
|
||||
|
||||
class UnknownError(BrokerResponseError):
|
||||
errno = -1
|
||||
@@ -201,27 +200,16 @@ class KafkaConfigurationError(KafkaError):
|
||||
pass
|
||||
|
||||
|
||||
kafka_errors = {
|
||||
-1 : UnknownError,
|
||||
0 : NoError,
|
||||
1 : OffsetOutOfRangeError,
|
||||
2 : InvalidMessageError,
|
||||
3 : UnknownTopicOrPartitionError,
|
||||
4 : InvalidFetchRequestError,
|
||||
5 : LeaderNotAvailableError,
|
||||
6 : NotLeaderForPartitionError,
|
||||
7 : RequestTimedOutError,
|
||||
8 : BrokerNotAvailableError,
|
||||
9 : ReplicaNotAvailableError,
|
||||
10 : MessageSizeTooLargeError,
|
||||
11 : StaleControllerEpochError,
|
||||
12 : OffsetMetadataTooLargeError,
|
||||
13 : StaleLeaderEpochCodeError,
|
||||
}
|
||||
def _iter_broker_errors():
|
||||
for name, obj in inspect.getmembers(sys.modules[__name__]):
|
||||
if inspect.isclass(obj) and issubclass(obj, BrokerResponseError) and obj != BrokerResponseError:
|
||||
yield obj
|
||||
|
||||
|
||||
kafka_errors = dict([(x.errno, x) for x in _iter_broker_errors()])
|
||||
|
||||
|
||||
def check_error(response):
|
||||
error = kafka_errors.get(response.error, UnknownError)
|
||||
if error is not NoError:
|
||||
raise error(response)
|
||||
|
||||
if response.error:
|
||||
error_class = kafka_errors.get(response.error, UnknownError)
|
||||
raise error_class(response)
|
||||
|
@@ -10,9 +10,8 @@ from kafka.common import (
|
||||
ProduceRequest, MetadataResponse,
|
||||
BrokerMetadata, TopicMetadata, PartitionMetadata,
|
||||
TopicAndPartition, KafkaUnavailableError,
|
||||
LeaderNotAvailableError, NoError,
|
||||
UnknownTopicOrPartitionError, KafkaTimeoutError,
|
||||
ConnectionError
|
||||
LeaderNotAvailableError, UnknownTopicOrPartitionError,
|
||||
KafkaTimeoutError, ConnectionError
|
||||
)
|
||||
from kafka.conn import KafkaConnection
|
||||
from kafka.protocol import KafkaProtocol, create_message
|
||||
|
Reference in New Issue
Block a user