Fix #44 Add missing exception class
Also move the exceptions to common instead of util
This commit is contained in:
@@ -5,3 +5,5 @@
|
|||||||
* Adding fetch_size_bytes to SimpleConsumer constructor to allow for user-configurable fetch sizes
|
* Adding fetch_size_bytes to SimpleConsumer constructor to allow for user-configurable fetch sizes
|
||||||
|
|
||||||
* Allow SimpleConsumer to automatically increase the fetch size if a partial message is read and no other messages were read during that fetch request. The increase factor is 1.5
|
* Allow SimpleConsumer to automatically increase the fetch size if a partial message is read and no other messages were read during that fetch request. The increase factor is 1.5
|
||||||
|
|
||||||
|
* Exception classes moved to kafka.common
|
||||||
|
|||||||
@@ -64,3 +64,19 @@ class ErrorMapping(object):
|
|||||||
MESSAGE_SIZE_TO_LARGE = 10
|
MESSAGE_SIZE_TO_LARGE = 10
|
||||||
STALE_CONTROLLER_EPOCH = 11
|
STALE_CONTROLLER_EPOCH = 11
|
||||||
OFFSET_METADATA_TOO_LARGE = 12
|
OFFSET_METADATA_TOO_LARGE = 12
|
||||||
|
|
||||||
|
#################
|
||||||
|
# Exceptions #
|
||||||
|
#################
|
||||||
|
|
||||||
|
class BufferUnderflowError(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
class ChecksumError(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
class ConsumerFetchSizeTooSmall(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
class ConsumerNoMoreData(Exception):
|
||||||
|
pass
|
||||||
|
|||||||
@@ -8,12 +8,11 @@ from Queue import Empty
|
|||||||
|
|
||||||
from kafka.common import (
|
from kafka.common import (
|
||||||
ErrorMapping, FetchRequest,
|
ErrorMapping, FetchRequest,
|
||||||
OffsetRequest, OffsetFetchRequest, OffsetCommitRequest
|
OffsetRequest, OffsetFetchRequest, OffsetCommitRequest,
|
||||||
|
ConsumerFetchSizeTooSmall, ConsumerNoMoreData
|
||||||
)
|
)
|
||||||
|
|
||||||
from kafka.util import (
|
from kafka.util import ReentrantTimer
|
||||||
ReentrantTimer, ConsumerFetchSizeTooSmall
|
|
||||||
)
|
|
||||||
|
|
||||||
log = logging.getLogger("kafka")
|
log = logging.getLogger("kafka")
|
||||||
|
|
||||||
|
|||||||
@@ -8,12 +8,12 @@ from kafka.codec import (
|
|||||||
from kafka.common import (
|
from kafka.common import (
|
||||||
BrokerMetadata, PartitionMetadata, Message, OffsetAndMessage,
|
BrokerMetadata, PartitionMetadata, Message, OffsetAndMessage,
|
||||||
ProduceResponse, FetchResponse, OffsetResponse,
|
ProduceResponse, FetchResponse, OffsetResponse,
|
||||||
OffsetCommitResponse, OffsetFetchResponse
|
OffsetCommitResponse, OffsetFetchResponse,
|
||||||
|
BufferUnderflowError, ChecksumError, ConsumerFetchSizeTooSmall
|
||||||
)
|
)
|
||||||
from kafka.util import (
|
from kafka.util import (
|
||||||
read_short_string, read_int_string, relative_unpack,
|
read_short_string, read_int_string, relative_unpack,
|
||||||
write_short_string, write_int_string, group_by_topic_and_partition,
|
write_short_string, write_int_string, group_by_topic_and_partition
|
||||||
BufferUnderflowError, ChecksumError, ConsumerFetchSizeTooSmall
|
|
||||||
)
|
)
|
||||||
|
|
||||||
log = logging.getLogger("kafka")
|
log = logging.getLogger("kafka")
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ from multiprocessing import Process, Queue, Event
|
|||||||
from Queue import Empty
|
from Queue import Empty
|
||||||
import time
|
import time
|
||||||
|
|
||||||
from .client import KafkaClient, FetchRequest, ProduceRequest
|
from kafka.client import KafkaClient, FetchRequest, ProduceRequest
|
||||||
|
|
||||||
log = logging.getLogger("kafka")
|
log = logging.getLogger("kafka")
|
||||||
|
|
||||||
|
|||||||
@@ -66,15 +66,6 @@ def group_by_topic_and_partition(tuples):
|
|||||||
return out
|
return out
|
||||||
|
|
||||||
|
|
||||||
class BufferUnderflowError(Exception):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class ChecksumError(Exception):
|
|
||||||
pass
|
|
||||||
|
|
||||||
class ConsumerFetchSizeTooSmall(Exception):
|
|
||||||
pass
|
|
||||||
|
|
||||||
class ReentrantTimer(object):
|
class ReentrantTimer(object):
|
||||||
"""
|
"""
|
||||||
|
|||||||
Reference in New Issue
Block a user