Update imports from kafka.common -> kafka.errors / kafka.structs

This commit is contained in:
Dana Powers 2016-04-05 09:34:48 -07:00
parent 221f56d8a0
commit 5a14bd8c94
41 changed files with 111 additions and 127 deletions

View File

@ -129,10 +129,9 @@ SimpleClient (DEPRECATED)
import time
from kafka import SimpleClient
from kafka.common import (
LeaderNotAvailableError, NotLeaderForPartitionError,
ProduceRequestPayload)
from kafka.errors import LeaderNotAvailableError, NotLeaderForPartitionError
from kafka.protocol import create_message
from kafka.structs import ProduceRequestPayload
kafka = SimpleClient('localhost:9092')
payload = ProduceRequestPayload(topic='my-topic', partition=0,

View File

@ -56,7 +56,7 @@ KafkaProducer
.. code:: python
from kafka import KafkaProducer
from kafka.common import KafkaError
from kafka.errors import KafkaError
producer = KafkaProducer(bootstrap_servers=['broker1:1234'])

View File

@ -22,7 +22,7 @@ from kafka.conn import BrokerConnection
from kafka.protocol import (
create_message, create_gzip_message, create_snappy_message)
from kafka.partitioner import RoundRobinPartitioner, HashedPartitioner, Murmur2Partitioner
from kafka.common import TopicPartition
from kafka.structs import TopicPartition
# To be deprecated when KafkaProducer interface is released
from kafka.client import SimpleClient

View File

@ -7,12 +7,12 @@ import time
import six
import kafka.common
from kafka.common import (TopicPartition, BrokerMetadata, UnknownError,
ConnectionError, FailedPayloadsError,
import kafka.errors
from kafka.errors import (UnknownError, ConnectionError, FailedPayloadsError,
KafkaTimeoutError, KafkaUnavailableError,
LeaderNotAvailableError, UnknownTopicOrPartitionError,
NotLeaderForPartitionError, ReplicaNotAvailableError)
from kafka.structs import TopicPartition, BrokerMetadata
from kafka.conn import (
collect_hosts, BrokerConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS,
@ -123,7 +123,7 @@ class SimpleClient(object):
# If there's a problem with finding the coordinator, raise the
# provided error
kafka.common.check_error(resp)
kafka.errors.check_error(resp)
# Otherwise return the BrokerMetadata
return BrokerMetadata(resp.nodeId, resp.host, resp.port)
@ -389,7 +389,7 @@ class SimpleClient(object):
# Or a server api error response
try:
kafka.common.check_error(resp)
kafka.errors.check_error(resp)
except (UnknownTopicOrPartitionError, NotLeaderForPartitionError):
self.reset_topic_metadata(resp.topic)
raise
@ -509,7 +509,7 @@ class SimpleClient(object):
for error, topic, partitions in resp.topics:
# Errors expected for new topics
if error:
error_type = kafka.common.kafka_errors.get(error, UnknownError)
error_type = kafka.errors.kafka_errors.get(error, UnknownError)
if error_type in (UnknownTopicOrPartitionError, LeaderNotAvailableError):
log.error('Error loading topic metadata for %s: %s (%s)',
topic, error_type, error)
@ -530,7 +530,7 @@ class SimpleClient(object):
# Check for partition errors
if error:
error_type = kafka.common.kafka_errors.get(error, UnknownError)
error_type = kafka.errors.kafka_errors.get(error, UnknownError)
# If No Leader, topics_to_brokers topic_partition -> None
if error_type is LeaderNotAvailableError:

View File

@ -11,10 +11,9 @@ import time
import six
import kafka.common as Errors # TODO: make Errors a separate class
from .cluster import ClusterMetadata
from .conn import BrokerConnection, ConnectionStates, collect_hosts, get_ip_port_afi
from . import errors as Errors
from .future import Future
from .protocol.metadata import MetadataRequest
from .protocol.produce import ProduceRequest

View File

@ -9,9 +9,9 @@ import time
import six
import kafka.common as Errors
from kafka.common import BrokerMetadata, PartitionMetadata, TopicPartition
from . import errors as Errors
from .future import Future
from .structs import BrokerMetadata, PartitionMetadata, TopicPartition
log = logging.getLogger(__name__)

View File

@ -13,7 +13,7 @@ import warnings
import six
import kafka.common as Errors
import kafka.errors as Errors
from kafka.future import Future
from kafka.protocol.api import RequestHeader
from kafka.protocol.commit import GroupCoordinatorResponse
@ -149,7 +149,7 @@ class BrokerConnection(object):
Arguments:
error (Exception, optional): pending in-flight-requests
will be failed with this exception.
Default: kafka.common.ConnectionError.
Default: kafka.errors.ConnectionError.
"""
if self._sock:
self._sock.close()

View File

@ -6,12 +6,10 @@ import numbers
from threading import Lock
import warnings
import kafka.common
from kafka.common import (
OffsetRequestPayload, OffsetCommitRequestPayload, OffsetFetchRequestPayload,
UnknownTopicOrPartitionError, check_error, KafkaError
)
from kafka.errors import (
UnknownTopicOrPartitionError, check_error, KafkaError)
from kafka.structs import (
OffsetRequestPayload, OffsetCommitRequestPayload, OffsetFetchRequestPayload)
from kafka.util import ReentrantTimer

View File

@ -6,12 +6,12 @@ import logging
import six
import kafka.common as Errors
from kafka.common import TopicPartition
import kafka.errors as Errors
from kafka.future import Future
from kafka.protocol.fetch import FetchRequest
from kafka.protocol.message import PartialMessage
from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy
from kafka.structs import TopicPartition
log = logging.getLogger(__name__)

View File

@ -7,13 +7,13 @@ import time
import six
from kafka.client_async import KafkaClient
from kafka.common import TopicPartition
from kafka.consumer.fetcher import Fetcher
from kafka.consumer.subscription_state import SubscriptionState
from kafka.coordinator.consumer import ConsumerCoordinator
from kafka.coordinator.assignors.range import RangePartitionAssignor
from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
from kafka.protocol.offset import OffsetResetStrategy
from kafka.structs import TopicPartition
from kafka.version import __version__
log = logging.getLogger(__name__)

View File

@ -6,8 +6,9 @@ import re
import six
from kafka.common import IllegalStateError, OffsetAndMetadata
from kafka.errors import IllegalStateError
from kafka.protocol.offset import OffsetResetStrategy
from kafka.structs import OffsetAndMetadata
log = logging.getLogger(__name__)

View File

@ -3,7 +3,8 @@ Context manager to commit/rollback consumer offsets.
"""
from logging import getLogger
from kafka.common import check_error, OffsetCommitRequestPayload, OffsetOutOfRangeError
from kafka.errors import check_error, OffsetOutOfRangeError
from kafka.structs import OffsetCommitRequestPayload
class OffsetCommitContext(object):

View File

@ -6,7 +6,7 @@ import weakref
import six
import kafka.common as Errors
import kafka.errors as Errors
from kafka.future import Future
from kafka.protocol.commit import (GroupCoordinatorRequest,
OffsetCommitRequest_v2 as OffsetCommitRequest)

View File

@ -12,14 +12,14 @@ from .base import BaseCoordinator
from .assignors.range import RangePartitionAssignor
from .assignors.roundrobin import RoundRobinPartitionAssignor
from .protocol import ConsumerProtocol
from ..common import OffsetAndMetadata, TopicPartition
from .. import errors as Errors
from ..future import Future
from ..protocol.commit import (
OffsetCommitRequest_v2, OffsetCommitRequest_v1, OffsetCommitRequest_v0,
OffsetFetchRequest_v0, OffsetFetchRequest_v1)
from ..structs import OffsetAndMetadata, TopicPartition
from ..util import WeakMethod
import kafka.common as Errors
log = logging.getLogger(__name__)

View File

@ -1,7 +1,7 @@
import copy
import time
import kafka.common as Errors
import kafka.errors as Errors
class Heartbeat(object):

View File

@ -1,8 +1,8 @@
from __future__ import absolute_import
from kafka.common import TopicPartition
from kafka.protocol.struct import Struct
from kafka.protocol.types import Array, Bytes, Int16, Int32, Schema, String
from kafka.structs import TopicPartition
class ConsumerProtocolMemberMetadata(Struct):

View File

@ -1,7 +1,7 @@
import functools
import logging
import kafka.common as Errors
import kafka.errors as Errors
log = logging.getLogger(__name__)

View File

@ -14,13 +14,12 @@ from threading import Thread, Event
import six
from kafka.common import (
ProduceRequestPayload, ProduceResponsePayload, TopicPartition, RetryOptions,
from kafka.structs import (
ProduceRequestPayload, ProduceResponsePayload, TopicPartition, RetryOptions)
from kafka.errors import (
kafka_errors, UnsupportedCodecError, FailedPayloadsError,
RequestTimedOutError, AsyncProducerQueueFull, UnknownError,
RETRY_ERROR_TYPES, RETRY_BACKOFF_ERROR_TYPES, RETRY_REFRESH_ERROR_TYPES
)
RETRY_ERROR_TYPES, RETRY_BACKOFF_ERROR_TYPES, RETRY_REFRESH_ERROR_TYPES)
from kafka.protocol import CODEC_NONE, ALL_CODECS, create_message_set
log = logging.getLogger('kafka.producer')

View File

@ -7,10 +7,10 @@ import time
from ..codec import (has_gzip, has_snappy, has_lz4,
gzip_encode, snappy_encode, lz4_encode)
from .. import errors as Errors
from ..protocol.types import Int32, Int64
from ..protocol.message import MessageSet, Message
import kafka.common as Errors
class MessageSetBuffer(object):

View File

@ -3,10 +3,9 @@ from __future__ import absolute_import
import collections
import threading
from .. import errors as Errors
from ..future import Future
import kafka.common as Errors
class FutureProduceResult(Future):
def __init__(self, topic_partition):

View File

@ -8,14 +8,14 @@ import threading
import time
from ..client_async import KafkaClient
from ..common import TopicPartition
from ..structs import TopicPartition
from ..partitioner.default import DefaultPartitioner
from ..protocol.message import Message, MessageSet
from .. import errors as Errors
from .future import FutureRecordMetadata, FutureProduceResult
from .record_accumulator import AtomicInteger, RecordAccumulator
from .sender import Sender
import kafka.common as Errors
log = logging.getLogger(__name__)
PRODUCER_CLIENT_ID_SEQUENCE = AtomicInteger()

View File

@ -8,12 +8,12 @@ import time
import six
from ..common import TopicPartition
from .. import errors as Errors
from ..structs import TopicPartition
from ..protocol.message import Message, MessageSet
from .buffer import MessageSetBuffer, SimpleBufferPool
from .future import FutureRecordMetadata, FutureProduceResult
import kafka.common as Errors
log = logging.getLogger(__name__)

View File

@ -7,11 +7,11 @@ import threading
import six
from ..common import TopicPartition
from .. import errors as Errors
from ..structs import TopicPartition
from ..version import __version__
from ..protocol.produce import ProduceRequest
import kafka.common as Errors
log = logging.getLogger(__name__)

View File

@ -7,26 +7,21 @@ import six
from six.moves import xrange
import kafka.common
import kafka.protocol.commit
import kafka.protocol.fetch
import kafka.protocol.message
import kafka.protocol.metadata
import kafka.protocol.offset
import kafka.protocol.produce
import kafka.structs
from kafka.codec import (
gzip_encode, gzip_decode, snappy_encode, snappy_decode
)
from kafka.common import (
ProtocolError, ChecksumError,
UnsupportedCodecError,
ConsumerMetadataResponse
)
gzip_encode, gzip_decode, snappy_encode, snappy_decode)
from kafka.errors import ProtocolError, ChecksumError, UnsupportedCodecError
from kafka.structs import ConsumerMetadataResponse
from kafka.util import (
crc32, read_short_string, read_int_string, relative_unpack,
write_short_string, write_int_string, group_by_topic_and_partition
)
write_short_string, write_int_string, group_by_topic_and_partition)
log = logging.getLogger(__name__)
@ -166,7 +161,7 @@ class KafkaProtocol(object):
Return: list of ProduceResponsePayload
"""
return [
kafka.common.ProduceResponsePayload(topic, partition, error, offset)
kafka.structs.ProduceResponsePayload(topic, partition, error, offset)
for topic, partitions in response.topics
for partition, error, offset in partitions
]
@ -207,9 +202,9 @@ class KafkaProtocol(object):
response: FetchResponse
"""
return [
kafka.common.FetchResponsePayload(
kafka.structs.FetchResponsePayload(
topic, partition, error, highwater_offset, [
kafka.common.OffsetAndMessage(offset, message)
kafka.structs.OffsetAndMessage(offset, message)
for offset, _, message in messages])
for topic, partitions in response.topics
for partition, error, highwater_offset, messages in partitions
@ -239,7 +234,7 @@ class KafkaProtocol(object):
Returns: list of OffsetResponsePayloads
"""
return [
kafka.common.OffsetResponsePayload(topic, partition, error, tuple(offsets))
kafka.structs.OffsetResponsePayload(topic, partition, error, tuple(offsets))
for topic, partitions in response.topics
for partition, error, offsets in partitions
]
@ -323,7 +318,7 @@ class KafkaProtocol(object):
response: OffsetCommitResponse
"""
return [
kafka.common.OffsetCommitResponsePayload(topic, partition, error)
kafka.structs.OffsetCommitResponsePayload(topic, partition, error)
for topic, partitions in response.topics
for partition, error in partitions
]
@ -362,7 +357,7 @@ class KafkaProtocol(object):
response: OffsetFetchResponse
"""
return [
kafka.common.OffsetFetchResponsePayload(
kafka.structs.OffsetFetchResponsePayload(
topic, partition, offset, metadata, error
)
for topic, partitions in response.topics
@ -379,7 +374,7 @@ def create_message(payload, key=None):
key: bytes, a key used for partition routing (optional)
"""
return kafka.common.Message(0, 0, key, payload)
return kafka.structs.Message(0, 0, key, payload)
def create_gzip_message(payloads, key=None, compresslevel=None):
@ -400,7 +395,7 @@ def create_gzip_message(payloads, key=None, compresslevel=None):
gzipped = gzip_encode(message_set, compresslevel=compresslevel)
codec = ATTRIBUTE_CODEC_MASK & CODEC_GZIP
return kafka.common.Message(0, 0x00 | codec, key, gzipped)
return kafka.structs.Message(0, 0x00 | codec, key, gzipped)
def create_snappy_message(payloads, key=None):
@ -421,7 +416,7 @@ def create_snappy_message(payloads, key=None):
snapped = snappy_encode(message_set)
codec = ATTRIBUTE_CODEC_MASK & CODEC_SNAPPY
return kafka.common.Message(0, 0x00 | codec, key, snapped)
return kafka.structs.Message(0, 0x00 | codec, key, snapped)
def create_message_set(messages, codec=CODEC_NONE, key=None, compresslevel=None):

View File

@ -7,7 +7,7 @@ import weakref
import six
from kafka.common import BufferUnderflowError
from kafka.errors import BufferUnderflowError
def crc32(data):

View File

@ -6,17 +6,14 @@ import six
from . import unittest
from kafka import SimpleClient
from kafka.common import (
ProduceRequestPayload,
BrokerMetadata,
TopicPartition, KafkaUnavailableError,
LeaderNotAvailableError, UnknownTopicOrPartitionError,
KafkaTimeoutError, ConnectionError, FailedPayloadsError
)
from kafka.conn import KafkaConnection
from kafka.errors import (
KafkaUnavailableError, LeaderNotAvailableError, KafkaTimeoutError,
UnknownTopicOrPartitionError, ConnectionError, FailedPayloadsError)
from kafka.future import Future
from kafka.protocol import KafkaProtocol, create_message
from kafka.protocol.metadata import MetadataResponse
from kafka.structs import ProduceRequestPayload, BrokerMetadata, TopicPartition
from test.testutil import Timer

View File

@ -4,12 +4,12 @@ import socket
import pytest
from kafka.client_async import KafkaClient
from kafka.common import BrokerMetadata
import kafka.common as Errors
from kafka.conn import ConnectionStates
import kafka.errors as Errors
from kafka.future import Future
from kafka.protocol.metadata import MetadataResponse, MetadataRequest
from kafka.protocol.produce import ProduceRequest
from kafka.structs import BrokerMetadata
@pytest.mark.parametrize("bootstrap,expected_hosts", [

View File

@ -1,10 +1,10 @@
import os
from kafka.common import (
FetchRequestPayload, OffsetCommitRequestPayload, OffsetFetchRequestPayload,
KafkaTimeoutError, ProduceRequestPayload
)
from kafka.errors import KafkaTimeoutError
from kafka.protocol import create_message
from kafka.structs import (
FetchRequestPayload, OffsetCommitRequestPayload, OffsetFetchRequestPayload,
ProduceRequestPayload)
from test.fixtures import ZookeeperFixture, KafkaFixture
from test.testutil import KafkaIntegrationTestCase, kafka_versions

View File

@ -5,9 +5,10 @@ from threading import Thread
import mock
from . import unittest
from kafka.common import ConnectionError
from kafka.errors import ConnectionError
from kafka.conn import KafkaConnection, collect_hosts, DEFAULT_SOCKET_TIMEOUT_SECONDS
class ConnTest(unittest.TestCase):
def setUp(self):

View File

@ -4,11 +4,11 @@ from mock import MagicMock, patch
from . import unittest
from kafka import SimpleConsumer, KafkaConsumer, MultiProcessConsumer
from kafka.common import (
KafkaConfigurationError, FetchResponsePayload, OffsetFetchResponsePayload,
FailedPayloadsError, OffsetAndMessage,
NotLeaderForPartitionError, UnknownTopicOrPartitionError
)
from kafka.errors import (
FailedPayloadsError, KafkaConfigurationError, NotLeaderForPartitionError,
UnknownTopicOrPartitionError)
from kafka.structs import (
FetchResponsePayload, OffsetAndMessage, OffsetFetchResponsePayload)
class TestKafkaConsumer(unittest.TestCase):

View File

@ -7,11 +7,11 @@ import pytest
import six
from kafka import SimpleClient
from kafka.common import TopicPartition
from kafka.conn import ConnectionStates
from kafka.consumer.group import KafkaConsumer
from kafka.future import Future
from kafka.protocol.metadata import MetadataResponse
from kafka.structs import TopicPartition
from test.conftest import version
from test.testutil import random_string

View File

@ -7,11 +7,9 @@ from . import unittest
from kafka import (
KafkaConsumer, MultiProcessConsumer, SimpleConsumer, create_message
)
from kafka.common import (
ProduceRequestPayload, ConsumerFetchSizeTooSmall,
OffsetOutOfRangeError, TopicPartition
)
from kafka.consumer.base import MAX_FETCH_BUFFER_SIZE_BYTES
from kafka.errors import ConsumerFetchSizeTooSmall, OffsetOutOfRangeError
from kafka.structs import ProduceRequestPayload, TopicPartition
from test.fixtures import ZookeeperFixture, KafkaFixture
from test.testutil import (

View File

@ -5,8 +5,8 @@ from . import unittest
from mock import MagicMock, patch
from kafka.common import OffsetOutOfRangeError
from kafka.context import OffsetCommitContext
from kafka.errors import OffsetOutOfRangeError
class TestOffsetCommitContext(unittest.TestCase):

View File

@ -4,7 +4,7 @@ from __future__ import absolute_import
import pytest
from kafka.client_async import KafkaClient
from kafka.common import TopicPartition, OffsetAndMetadata
from kafka.structs import TopicPartition, OffsetAndMetadata
from kafka.consumer.subscription_state import (
SubscriptionState, ConsumerRebalanceListener)
from kafka.coordinator.assignors.range import RangePartitionAssignor
@ -13,6 +13,7 @@ from kafka.coordinator.consumer import ConsumerCoordinator
from kafka.coordinator.protocol import (
ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment)
from kafka.conn import ConnectionStates
import kafka.errors as Errors
from kafka.future import Future
from kafka.protocol.commit import (
OffsetCommitRequest_v0, OffsetCommitRequest_v1, OffsetCommitRequest_v2,
@ -21,8 +22,6 @@ from kafka.protocol.commit import (
from kafka.protocol.metadata import MetadataResponse
from kafka.util import WeakMethod
import kafka.common as Errors
@pytest.fixture
def conn(mocker):

View File

@ -3,10 +3,10 @@ import os
import time
from kafka import SimpleClient, SimpleConsumer, KeyedProducer
from kafka.common import (
TopicPartition, FailedPayloadsError, ConnectionError, RequestTimedOutError
)
from kafka.errors import (
FailedPayloadsError, ConnectionError, RequestTimedOutError)
from kafka.producer.base import Producer
from kafka.structs import TopicPartition
from test.fixtures import ZookeeperFixture, KafkaFixture
from test.testutil import KafkaIntegrationTestCase, random_string

View File

@ -4,13 +4,12 @@ from __future__ import absolute_import
import pytest
from kafka.client_async import KafkaClient
from kafka.common import TopicPartition, OffsetAndMetadata
from kafka.consumer.fetcher import Fetcher
from kafka.consumer.subscription_state import SubscriptionState
import kafka.errors as Errors
from kafka.future import Future
from kafka.protocol.fetch import FetchRequest
import kafka.common as Errors
from kafka.structs import TopicPartition, OffsetAndMetadata
@pytest.fixture

View File

@ -10,11 +10,9 @@ from kafka import (
RoundRobinPartitioner, HashedPartitioner
)
from kafka.codec import has_snappy
from kafka.common import (
FetchRequestPayload, ProduceRequestPayload,
UnknownTopicOrPartitionError, LeaderNotAvailableError
)
from kafka.errors import UnknownTopicOrPartitionError, LeaderNotAvailableError
from kafka.producer.base import Producer
from kafka.structs import FetchRequestPayload, ProduceRequestPayload
from test.fixtures import ZookeeperFixture, KafkaFixture
from test.testutil import KafkaIntegrationTestCase, kafka_versions

View File

@ -9,12 +9,12 @@ from mock import MagicMock, patch
from . import unittest
from kafka import SimpleClient, SimpleProducer, KeyedProducer
from kafka.common import (
AsyncProducerQueueFull, FailedPayloadsError, NotLeaderForPartitionError,
ProduceResponsePayload, RetryOptions, TopicPartition
)
from kafka.errors import (
AsyncProducerQueueFull, FailedPayloadsError, NotLeaderForPartitionError)
from kafka.producer.base import Producer, _send_upstream
from kafka.protocol import CODEC_NONE
from kafka.structs import (
ProduceResponsePayload, RetryOptions, TopicPartition)
from six.moves import queue, xrange

View File

@ -7,21 +7,21 @@ from mock import patch, sentinel
from . import unittest
from kafka.codec import has_snappy, gzip_decode, snappy_decode
from kafka.common import (
from kafka.errors import (
ChecksumError, KafkaUnavailableError, UnsupportedCodecError,
ConsumerFetchSizeTooSmall, ProtocolError)
from kafka.protocol import (
ATTRIBUTE_CODEC_MASK, CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY, KafkaProtocol,
create_message, create_gzip_message, create_snappy_message,
create_message_set)
from kafka.structs import (
OffsetRequestPayload, OffsetResponsePayload,
OffsetCommitRequestPayload, OffsetCommitResponsePayload,
OffsetFetchRequestPayload, OffsetFetchResponsePayload,
ProduceRequestPayload, ProduceResponsePayload,
FetchRequestPayload, FetchResponsePayload,
Message, ChecksumError, OffsetAndMessage, BrokerMetadata,
KafkaUnavailableError, UnsupportedCodecError, ConsumerFetchSizeTooSmall,
ProtocolError, ConsumerMetadataResponse
)
from kafka.protocol import (
ATTRIBUTE_CODEC_MASK, CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY, KafkaProtocol,
create_message, create_gzip_message, create_snappy_message,
create_message_set
)
Message, OffsetAndMessage, BrokerMetadata, ConsumerMetadataResponse)
class TestProtocol(unittest.TestCase):
def test_create_message(self):

View File

@ -4,8 +4,9 @@ import struct
import six
from . import unittest
import kafka.common
import kafka.errors
import kafka.util
import kafka.structs
class UtilTest(unittest.TestCase):
@ -48,7 +49,7 @@ class UtilTest(unittest.TestCase):
self.assertEqual(kafka.util.read_int_string(b'\x00\x00\x00\x0bsome string', 0), (b'some string', 15))
def test_read_int_string__insufficient_data(self):
with self.assertRaises(kafka.common.BufferUnderflowError):
with self.assertRaises(kafka.errors.BufferUnderflowError):
kafka.util.read_int_string(b'\x00\x00\x00\x021', 0)
def test_write_short_string(self):
@ -90,7 +91,7 @@ class UtilTest(unittest.TestCase):
self.assertEqual(kafka.util.read_short_string(b'\x00\x0bsome string', 0), (b'some string', 13))
def test_read_int_string__insufficient_data2(self):
with self.assertRaises(kafka.common.BufferUnderflowError):
with self.assertRaises(kafka.errors.BufferUnderflowError):
kafka.util.read_int_string('\x00\x021', 0)
def test_relative_unpack2(self):
@ -100,11 +101,11 @@ class UtilTest(unittest.TestCase):
)
def test_relative_unpack3(self):
with self.assertRaises(kafka.common.BufferUnderflowError):
with self.assertRaises(kafka.errors.BufferUnderflowError):
kafka.util.relative_unpack('>hh', '\x00', 0)
def test_group_by_topic_and_partition(self):
t = kafka.common.TopicPartition
t = kafka.structs.TopicPartition
l = [
t("a", 1),

View File

@ -12,7 +12,7 @@ from six.moves import xrange
from . import unittest
from kafka import SimpleClient
from kafka.common import OffsetRequestPayload
from kafka.structs import OffsetRequestPayload
__all__ = [
'random_string',