Merge branch 'exception_refactor' into add_tests

This commit is contained in:
Mark Roberts
2014-04-30 18:40:42 -07:00
3 changed files with 114 additions and 54 deletions

View File

@@ -1,15 +1,18 @@
import copy
import logging
import collections
import kafka.common
from collections import defaultdict
from functools import partial
from itertools import count
from kafka.common import *
from kafka.common import (ErrorMapping, ErrorStrings, TopicAndPartition,
from kafka.common import (TopicAndPartition,
ConnectionError, FailedPayloadsError,
BrokerResponseError, PartitionUnavailableError,
LeaderUnavailableError,
KafkaUnavailableError)
PartitionUnavailableError,
LeaderUnavailableError, KafkaUnavailableError,
UnknownTopicOrPartitionError, NotLeaderForPartitionError)
from kafka.conn import collect_hosts, KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS
from kafka.protocol import KafkaProtocol
@@ -39,6 +42,7 @@ class KafkaClient(object):
self.topic_partitions = {} # topic_id -> [0, 1, 2, ...]
self.load_metadata_for_topics() # bootstrap with all metadata
##################
# Private API #
##################
@@ -92,10 +96,9 @@ class KafkaClient(object):
conn.send(requestId, request)
response = conn.recv(requestId)
return response
except Exception, e:
except Exception as e:
log.warning("Could not send request [%r] to server %s:%i, "
"trying next server: %s" % (request, host, port, e))
continue
raise KafkaUnavailableError("All servers failed to process request")
@@ -123,7 +126,7 @@ class KafkaClient(object):
# Group the requests by topic+partition
original_keys = []
payloads_by_broker = defaultdict(list)
payloads_by_broker = collections.defaultdict(list)
for payload in payloads:
leader = self._get_leader_for_partition(payload.topic,
@@ -157,11 +160,11 @@ class KafkaClient(object):
continue
try:
response = conn.recv(requestId)
except ConnectionError, e:
except ConnectionError as e:
log.warning("Could not receive response to request [%s] "
"from server %s: %s", request, conn, e)
failed = True
except ConnectionError, e:
except ConnectionError as e:
log.warning("Could not send request [%s] to server %s: %s",
request, conn, e)
failed = True
@@ -184,16 +187,11 @@ class KafkaClient(object):
return '<KafkaClient client_id=%s>' % (self.client_id)
def _raise_on_response_error(self, resp):
if resp.error == ErrorMapping.NO_ERROR:
return
if resp.error in (ErrorMapping.UNKNOWN_TOPIC_OR_PARTITON,
ErrorMapping.NOT_LEADER_FOR_PARTITION):
try:
kafka.common.check_error(resp)
except (UnknownTopicOrPartitionError, NotLeaderForPartitionError) as e:
self.reset_topic_metadata(resp.topic)
raise BrokerResponseError(
"Request for %s failed with errorcode=%d (%s)" %
(TopicAndPartition(resp.topic, resp.partition), resp.error, ErrorStrings[resp.error]))
raise
#################
# Public API #

View File

@@ -48,29 +48,6 @@ Message = namedtuple("Message", ["magic", "attributes", "key", "value"])
TopicAndPartition = namedtuple("TopicAndPartition", ["topic", "partition"])
ErrorStrings = {
-1 : 'UNKNOWN',
0 : 'NO_ERROR',
1 : 'OFFSET_OUT_OF_RANGE',
2 : 'INVALID_MESSAGE',
3 : 'UNKNOWN_TOPIC_OR_PARTITON',
4 : 'INVALID_FETCH_SIZE',
5 : 'LEADER_NOT_AVAILABLE',
6 : 'NOT_LEADER_FOR_PARTITION',
7 : 'REQUEST_TIMED_OUT',
8 : 'BROKER_NOT_AVAILABLE',
9 : 'REPLICA_NOT_AVAILABLE',
10 : 'MESSAGE_SIZE_TOO_LARGE',
11 : 'STALE_CONTROLLER_EPOCH',
12 : 'OFFSET_METADATA_TOO_LARGE',
}
class ErrorMapping(object):
pass
for k, v in ErrorStrings.items():
setattr(ErrorMapping, v, k)
#################
# Exceptions #
#################
@@ -80,11 +57,76 @@ class KafkaError(RuntimeError):
pass
class KafkaUnavailableError(KafkaError):
class BrokerResponseError(KafkaError):
pass
class BrokerResponseError(KafkaError):
class UnknownError(BrokerResponseError):
errno = -1
message = 'UNKNOWN'
class OffsetOutOfRangeError(BrokerResponseError):
errno = 1
message = 'OFFSET_OUT_OF_RANGE'
class InvalidMessageError(BrokerResponseError):
errno = 2
message = 'INVALID_MESSAGE'
class UnknownTopicOrPartitionError(BrokerResponseError):
errno = 3
message = 'UNKNOWN_TOPIC_OR_PARTITON'
class InvalidFetchRequestError(BrokerResponseError):
errno = 4
message = 'INVALID_FETCH_SIZE'
class LeaderNotAvailableError(BrokerResponseError):
errno = 5
message = 'LEADER_NOT_AVAILABLE'
class NotLeaderForPartitionError(BrokerResponseError):
errno = 6
message = 'NOT_LEADER_FOR_PARTITION'
class RequestTimedOutError(BrokerResponseError):
errno = 7
message = 'REQUEST_TIMED_OUT'
class BrokerNotAvailableError(BrokerResponseError):
errno = 8
message = 'BROKER_NOT_AVAILABLE'
class ReplicaNotAvailableError(BrokerResponseError):
errno = 9
message = 'REPLICA_NOT_AVAILABLE'
class MessageSizeTooLargeError(BrokerResponseError):
errno = 10
message = 'MESSAGE_SIZE_TOO_LARGE'
class StaleControllerEpochError(BrokerResponseError):
errno = 11
message = 'STALE_CONTROLLER_EPOCH'
class OffsetMetadataTooLarge(BrokerResponseError):
errno = 12
message = 'OFFSET_METADATA_TOO_LARGE'
class KafkaUnavailableError(KafkaError):
pass
@@ -122,3 +164,25 @@ class ConsumerNoMoreData(KafkaError):
class ProtocolError(KafkaError):
pass
kafka_errors = {
-1 : UnknownError,
1 : OffsetOutOfRangeError,
2 : InvalidMessageError,
3 : UnknownTopicOrPartitionError,
4 : InvalidFetchRequestError,
5 : LeaderNotAvailableError,
6 : NotLeaderForPartitionError,
7 : RequestTimedOutError,
8 : BrokerNotAvailableError,
9 : ReplicaNotAvailableError,
10 : MessageSizeTooLargeError,
11 : StaleControllerEpochError,
12 : OffsetMetadataTooLarge,
}
def check_error(response):
error = kafka_errors.get(response.error)
if error:
raise error(response)

View File

@@ -8,8 +8,9 @@ from threading import Lock
from multiprocessing import Process, Queue as MPQueue, Event, Value
from Queue import Empty, Queue
import kafka
from kafka.common import (
ErrorMapping, FetchRequest,
FetchRequest,
OffsetRequest, OffsetCommitRequest,
OffsetFetchRequest,
ConsumerFetchSizeTooSmall, ConsumerNoMoreData
@@ -100,14 +101,11 @@ class Consumer(object):
self.commit_timer.start()
def get_or_init_offset_callback(resp):
if resp.error == ErrorMapping.NO_ERROR:
try:
kafka.common.check_error(resp)
return resp.offset
elif resp.error == ErrorMapping.UNKNOWN_TOPIC_OR_PARTITON:
except kafka.common.UnknownTopicOrPartitionError:
return 0
else:
raise ProtocolError("OffsetFetchRequest for topic=%s, "
"partition=%d failed with errorcode=%s" % (
resp.topic, resp.partition, resp.error))
if auto_commit:
for partition in partitions:
@@ -154,7 +152,7 @@ class Consumer(object):
resps = self.client.send_offset_commit_request(self.group, reqs)
for resp in resps:
assert resp.error == 0
kafka.common.check_error(resp)
self.count_since_commit = 0
@@ -432,7 +430,7 @@ class SimpleConsumer(Consumer):
# Put the message in our queue
self.queue.put((partition, message))
self.fetch_offsets[partition] = message.offset + 1
except ConsumerFetchSizeTooSmall, e:
except ConsumerFetchSizeTooSmall as e:
if (self.max_buffer_size is not None and
self.buffer_size == self.max_buffer_size):
log.error("Max fetch size %d too small",
@@ -446,7 +444,7 @@ class SimpleConsumer(Consumer):
log.warn("Fetch size too small, increase to %d (2x) "
"and retry", self.buffer_size)
retry_partitions.add(partition)
except ConsumerNoMoreData, e:
except ConsumerNoMoreData as e:
log.debug("Iteration was ended by %r", e)
except StopIteration:
# Stop iterating through this partition