Make all unit tests pass on py3.3/3.4

This commit is contained in:
Bruno Renié
2014-08-28 20:50:20 +02:00
committed by Mark Roberts
parent 83af5102e9
commit cf0b7f0530
22 changed files with 349 additions and 271 deletions

View File

@@ -3,6 +3,8 @@ language: python
python:
- 2.6
- 2.7
- 3.3
- 3.4
- pypy
env:

View File

@@ -81,7 +81,7 @@ class KafkaClient(object):
"""
Generate a new correlation id
"""
return KafkaClient.ID_GEN.next()
return next(KafkaClient.ID_GEN)
def _send_broker_unaware_request(self, requestId, request):
"""

View File

@@ -1,8 +1,11 @@
from cStringIO import StringIO
from io import BytesIO
import gzip
import struct
_XERIAL_V1_HEADER = (-126, 'S', 'N', 'A', 'P', 'P', 'Y', 0, 1, 1)
import six
from six.moves import xrange
_XERIAL_V1_HEADER = (-126, b'S', b'N', b'A', b'P', b'P', b'Y', 0, 1, 1)
_XERIAL_V1_FORMAT = 'bccccccBii'
try:
@@ -21,7 +24,7 @@ def has_snappy():
def gzip_encode(payload):
buffer = StringIO()
buffer = BytesIO()
handle = gzip.GzipFile(fileobj=buffer, mode="w")
handle.write(payload)
handle.close()
@@ -32,7 +35,7 @@ def gzip_encode(payload):
def gzip_decode(payload):
buffer = StringIO(payload)
buffer = BytesIO(payload)
handle = gzip.GzipFile(fileobj=buffer, mode='r')
result = handle.read()
handle.close()
@@ -68,9 +71,9 @@ def snappy_encode(payload, xerial_compatible=False, xerial_blocksize=32 * 1024):
for i in xrange(0, len(payload), xerial_blocksize):
yield payload[i:i+xerial_blocksize]
out = StringIO()
out = BytesIO()
header = ''.join([struct.pack('!' + fmt, dat) for fmt, dat
header = b''.join([struct.pack('!' + fmt, dat) for fmt, dat
in zip(_XERIAL_V1_FORMAT, _XERIAL_V1_HEADER)])
out.write(header)
@@ -121,8 +124,8 @@ def snappy_decode(payload):
if _detect_xerial_stream(payload):
# TODO ? Should become a fileobj ?
out = StringIO()
byt = buffer(payload[16:])
out = BytesIO()
byt = payload[16:]
length = len(byt)
cursor = 0

View File

@@ -5,6 +5,8 @@ import struct
from random import shuffle
from threading import local
import six
from kafka.common import ConnectionError
log = logging.getLogger("kafka")
@@ -19,7 +21,7 @@ def collect_hosts(hosts, randomize=True):
randomize the returned list.
"""
if isinstance(hosts, basestring):
if isinstance(hosts, six.string_types):
hosts = hosts.strip().split(',')
result = []
@@ -92,7 +94,7 @@ class KafkaConnection(local):
# Receiving empty string from recv signals
# that the socket is in error. we will never get
# more data from this socket
if data == '':
if data == b'':
raise socket.error("Not enough data to read message -- did server kill socket?")
except socket.error:
@@ -103,7 +105,7 @@ class KafkaConnection(local):
log.debug("Read %d/%d bytes from Kafka", num_bytes - bytes_left, num_bytes)
responses.append(data)
return ''.join(responses)
return b''.join(responses)
##################
# Public API #
@@ -144,7 +146,7 @@ class KafkaConnection(local):
# Read the remainder of the response
resp = self._read_bytes(size)
return str(resp)
return resp
def copy(self):
"""

View File

@@ -1,12 +1,19 @@
from __future__ import absolute_import
from itertools import izip_longest, repeat
try:
from itertools import zip_longest as izip_longest, repeat # pylint: disable-msg=E0611
except ImportError: # python 2
from itertools import izip_longest as izip_longest, repeat
import logging
import time
import numbers
from threading import Lock
from multiprocessing import Process, Queue as MPQueue, Event, Value
from Queue import Empty, Queue
try:
from Queue import Empty, Queue
except ImportError: # python 2
from queue import Empty, Queue
import kafka
from kafka.common import (

View File

@@ -4,11 +4,17 @@ import logging
import time
import random
from Queue import Empty
try:
from queue import Empty
except ImportError:
from Queue import Empty
from collections import defaultdict
from itertools import cycle
from multiprocessing import Queue, Process
import six
from six.moves import xrange
from kafka.common import (
ProduceRequest, TopicAndPartition, UnsupportedCodecError, UnknownTopicOrPartitionError
)
@@ -172,8 +178,8 @@ class Producer(object):
if not isinstance(msg, (list, tuple)):
raise TypeError("msg is not a list or tuple!")
# Raise TypeError if any message is not encoded as a str
if any(not isinstance(m, str) for m in msg):
# Raise TypeError if any message is not encoded as bytes
if any(not isinstance(m, six.binary_type) for m in msg):
raise TypeError("all produce message payloads must be type str")
if self.async:
@@ -221,7 +227,7 @@ class SimpleProducer(Producer):
batch_send_every_t - If set, messages are send after this timeout
random_start - If true, randomize the initial partition which the
the first message block will be published to, otherwise
if false, the first message block will always publish
if false, the first message block will always publish
to partition 0 before cycling through each partition
"""
def __init__(self, client, async=False,

View File

@@ -1,6 +1,9 @@
import logging
import struct
import zlib
import six
from six.moves import xrange
from kafka.codec import (
gzip_encode, gzip_decode, snappy_encode, snappy_decode
@@ -13,7 +16,7 @@ from kafka.common import (
UnsupportedCodecError
)
from kafka.util import (
read_short_string, read_int_string, relative_unpack,
crc32, read_short_string, read_int_string, relative_unpack,
write_short_string, write_int_string, group_by_topic_and_partition
)
@@ -67,7 +70,7 @@ class KafkaProtocol(object):
Offset => int64
MessageSize => int32
"""
message_set = ""
message_set = b""
for message in messages:
encoded_message = KafkaProtocol._encode_message(message)
message_set += struct.pack('>qi%ds' % len(encoded_message), 0, len(encoded_message), encoded_message)
@@ -94,7 +97,7 @@ class KafkaProtocol(object):
msg = struct.pack('>BB', message.magic, message.attributes)
msg += write_int_string(message.key)
msg += write_int_string(message.value)
crc = zlib.crc32(msg)
crc = crc32(msg)
msg = struct.pack('>i%ds' % len(msg), crc, msg)
else:
raise ProtocolError("Unexpected magic number: %d" % message.magic)
@@ -146,7 +149,7 @@ class KafkaProtocol(object):
of the MessageSet payload).
"""
((crc, magic, att), cur) = relative_unpack('>iBB', data, 0)
if crc != zlib.crc32(data[4:]):
if crc != crc32(data[4:]):
raise ChecksumError("Message checksum failed")
(key, cur) = read_int_string(data, cur)

View File

@@ -1,14 +1,30 @@
import collections
import struct
import sys
import zlib
from threading import Thread, Event
import six
from kafka.common import BufferUnderflowError
def crc32(data):
"""
Python 2 returns a value in the range [-2**31, 2**31-1].
Python 3 returns a value in the range [0, 2**32-1].
We want a consistent behavior so let's use python2's.
"""
crc = zlib.crc32(data)
if six.PY3 and crc > 2**31:
crc -= 2 ** 32
return crc
def write_int_string(s):
if s is not None and not isinstance(s, str):
raise TypeError('Expected "%s" to be str\n'
if s is not None and not isinstance(s, six.binary_type):
raise TypeError('Expected "%s" to be bytes\n'
'data=%s' % (type(s), repr(s)))
if s is None:
return struct.pack('>i', -1)
@@ -17,12 +33,12 @@ def write_int_string(s):
def write_short_string(s):
if s is not None and not isinstance(s, str):
raise TypeError('Expected "%s" to be str\n'
if s is not None and not isinstance(s, six.binary_type):
raise TypeError('Expected "%s" to be bytes\n'
'data=%s' % (type(s), repr(s)))
if s is None:
return struct.pack('>h', -1)
elif len(s) > 32767 and sys.version < (2, 7):
elif len(s) > 32767 and sys.version_info < (2, 7):
# Python 2.6 issues a deprecation warning instead of a struct error
raise struct.error(len(s))
else:

View File

@@ -48,6 +48,7 @@ protocol as well as broker-aware request routing. Gzip and Snappy compression
is also supported for message sets.
""",
keywords="apache kafka",
install_requires=['six'],
classifiers = [
"Development Status :: 4 - Beta",
"Intended Audience :: Developers",

View File

@@ -4,10 +4,10 @@ import os.path
import shutil
import subprocess
import tempfile
import urllib2
from six.moves import urllib
import uuid
from urlparse import urlparse
from six.moves.urllib.parse import urlparse # pylint: disable-msg=E0611
from test.service import ExternalService, SpawnedService
from test.testutil import get_open_port
@@ -42,12 +42,12 @@ class Fixture(object):
try:
url = url_base + distfile + '.tgz'
logging.info("Attempting to download %s", url)
response = urllib2.urlopen(url)
except urllib2.HTTPError:
response = urllib.request.urlopen(url)
except urllib.error.HTTPError:
logging.exception("HTTP Error")
url = url_base + distfile + '.tar.gz'
logging.info("Attempting to download %s", url)
response = urllib2.urlopen(url)
response = urllib.request.urlopen(url)
logging.info("Saving distribution file to %s", output_file)
with open(output_file, 'w') as output_file_fd:

View File

@@ -1,5 +1,6 @@
from . import unittest
import six
from mock import MagicMock, patch
from kafka import KafkaClient
@@ -15,25 +16,25 @@ class TestKafkaClient(unittest.TestCase):
with patch.object(KafkaClient, 'load_metadata_for_topics'):
client = KafkaClient(hosts=['kafka01:9092', 'kafka02:9092', 'kafka03:9092'])
self.assertItemsEqual(
[('kafka01', 9092), ('kafka02', 9092), ('kafka03', 9092)],
client.hosts)
self.assertEqual(
sorted([('kafka01', 9092), ('kafka02', 9092), ('kafka03', 9092)]),
sorted(client.hosts))
def test_init_with_csv(self):
with patch.object(KafkaClient, 'load_metadata_for_topics'):
client = KafkaClient(hosts='kafka01:9092,kafka02:9092,kafka03:9092')
self.assertItemsEqual(
[('kafka01', 9092), ('kafka02', 9092), ('kafka03', 9092)],
client.hosts)
self.assertEqual(
sorted([('kafka01', 9092), ('kafka02', 9092), ('kafka03', 9092)]),
sorted(client.hosts))
def test_init_with_unicode_csv(self):
with patch.object(KafkaClient, 'load_metadata_for_topics'):
client = KafkaClient(hosts=u'kafka01:9092,kafka02:9092,kafka03:9092')
self.assertItemsEqual(
[('kafka01', 9092), ('kafka02', 9092), ('kafka03', 9092)],
client.hosts)
self.assertEqual(
sorted([('kafka01', 9092), ('kafka02', 9092), ('kafka03', 9092)]),
sorted(client.hosts))
def test_send_broker_unaware_request_fail(self):
'Tests that call fails when all hosts are unavailable'
@@ -58,7 +59,7 @@ class TestKafkaClient(unittest.TestCase):
with self.assertRaises(KafkaUnavailableError):
client._send_broker_unaware_request(1, 'fake request')
for key, conn in mocked_conns.iteritems():
for key, conn in six.iteritems(mocked_conns):
conn.send.assert_called_with(1, 'fake request')
def test_send_broker_unaware_request(self):

View File

@@ -1,4 +1,5 @@
import struct
from six.moves import xrange
from . import unittest
from kafka.codec import (
@@ -8,7 +9,7 @@ from kafka.codec import (
from kafka.protocol import (
create_gzip_message, create_message, create_snappy_message, KafkaProtocol
)
from testutil import *
from .testutil import *
class TestCodec(unittest.TestCase):
def test_gzip(self):
@@ -31,7 +32,7 @@ class TestCodec(unittest.TestCase):
header = b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01Some extra bytes'
false_header = b'\x01SNAPPY\x00\x00\x00\x01\x00\x00\x00\x01'
random_snappy = snappy_encode('SNAPPY' * 50)
random_snappy = snappy_encode(b'SNAPPY' * 50)
short_data = b'\x01\x02\x03\x04'
self.assertTrue(_detect_xerial_stream(header))
@@ -44,26 +45,28 @@ class TestCodec(unittest.TestCase):
@unittest.skipUnless(has_snappy(), "Snappy not available")
def test_snappy_decode_xerial(self):
header = b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01'
random_snappy = snappy_encode('SNAPPY' * 50)
random_snappy = snappy_encode(b'SNAPPY' * 50)
block_len = len(random_snappy)
random_snappy2 = snappy_encode('XERIAL' * 50)
random_snappy2 = snappy_encode(b'XERIAL' * 50)
block_len2 = len(random_snappy2)
to_test = header \
+ struct.pack('!i', block_len) + random_snappy \
+ struct.pack('!i', block_len2) + random_snappy2 \
self.assertEquals(snappy_decode(to_test), ('SNAPPY' * 50) + ('XERIAL' * 50))
self.assertEquals(snappy_decode(to_test), (b'SNAPPY' * 50) + (b'XERIAL' * 50))
@unittest.skipUnless(has_snappy(), "Snappy not available")
def test_snappy_encode_xerial(self):
to_ensure = b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01' + \
'\x00\x00\x00\x18' + \
'\xac\x02\x14SNAPPY\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\x96\x06\x00' + \
'\x00\x00\x00\x18' + \
'\xac\x02\x14XERIAL\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\x96\x06\x00'
to_ensure = (
b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01'
b'\x00\x00\x00\x18'
b'\xac\x02\x14SNAPPY\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\x96\x06\x00'
b'\x00\x00\x00\x18'
b'\xac\x02\x14XERIAL\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\x96\x06\x00'
)
to_test = ('SNAPPY' * 50) + ('XERIAL' * 50)
to_test = (b'SNAPPY' * 50) + (b'XERIAL' * 50)
compressed = snappy_encode(to_test, xerial_compatible=True, xerial_blocksize=300)
self.assertEquals(compressed, to_ensure)

View File

@@ -13,8 +13,8 @@ class ConnTest(unittest.TestCase):
'host': 'localhost',
'port': 9090,
'request_id': 0,
'payload': 'test data',
'payload2': 'another packet'
'payload': b'test data',
'payload2': b'another packet'
}
# Mocking socket.create_connection will cause _sock to always be a
@@ -35,12 +35,12 @@ class ConnTest(unittest.TestCase):
struct.pack('>%ds' % payload_size, self.config['payload']),
struct.pack('>i', payload2_size),
struct.pack('>%ds' % payload2_size, self.config['payload2']),
''
b''
]
# Create a connection object
self.conn = KafkaConnection(self.config['host'], self.config['port'])
# Reset any mock counts caused by __init__
self.MockCreateConn.reset_mock()

View File

@@ -4,8 +4,8 @@ from datetime import datetime
from kafka import * # noqa
from kafka.common import * # noqa
from kafka.consumer import MAX_FETCH_BUFFER_SIZE_BYTES
from fixtures import ZookeeperFixture, KafkaFixture
from testutil import *
from .fixtures import ZookeeperFixture, KafkaFixture
from .testutil import *
class TestConsumerIntegration(KafkaIntegrationTestCase):
@classmethod

View File

@@ -6,8 +6,8 @@ from . import unittest
from kafka import * # noqa
from kafka.common import * # noqa
from kafka.producer import Producer
from fixtures import ZookeeperFixture, KafkaFixture
from testutil import *
from .fixtures import ZookeeperFixture, KafkaFixture
from .testutil import *
class TestFailover(KafkaIntegrationTestCase):

View File

@@ -18,13 +18,13 @@ class TestKafkaProducer(unittest.TestCase):
topic = "test-topic"
partition = 0
bad_data_types = (u'你怎么样?', 12, ['a','list'], ('a','tuple'), {'a': 'dict'})
bad_data_types = (u'你怎么样?', 12, ['a', 'list'], ('a', 'tuple'), {'a': 'dict'})
for m in bad_data_types:
with self.assertRaises(TypeError):
logging.debug("attempting to send message of type %s", type(m))
producer.send_messages(topic, partition, m)
good_data_types = ('a string!',)
good_data_types = (b'a string!',)
for m in good_data_types:
# This should not raise an exception
producer.send_messages(topic, partition, m)

View File

@@ -5,8 +5,8 @@ import uuid
from kafka import * # noqa
from kafka.common import * # noqa
from kafka.codec import has_gzip, has_snappy
from fixtures import ZookeeperFixture, KafkaFixture
from testutil import *
from .fixtures import ZookeeperFixture, KafkaFixture
from .testutil import *
class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
topic = 'produce_topic'

View File

@@ -1,9 +1,9 @@
import contextlib
from contextlib import contextmanager
import struct
from . import unittest
import mock
import six
from mock import sentinel
from kafka import KafkaClient
@@ -38,21 +38,21 @@ class TestProtocol(unittest.TestCase):
self.assertEqual(msg.value, payload)
def test_create_gzip(self):
payloads = ["v1", "v2"]
payloads = [b"v1", b"v2"]
msg = create_gzip_message(payloads)
self.assertEqual(msg.magic, 0)
self.assertEqual(msg.attributes, ATTRIBUTE_CODEC_MASK & CODEC_GZIP)
self.assertEqual(msg.key, None)
# Need to decode to check since gzipped payload is non-deterministic
decoded = gzip_decode(msg.value)
expect = "".join([
expect = b"".join([
struct.pack(">q", 0), # MsgSet offset
struct.pack(">i", 16), # MsgSet size
struct.pack(">i", 1285512130), # CRC
struct.pack(">bb", 0, 0), # Magic, flags
struct.pack(">i", -1), # -1 indicates a null key
struct.pack(">i", 2), # Msg length (bytes)
"v1", # Message contents
b"v1", # Message contents
struct.pack(">q", 0), # MsgSet offset
struct.pack(">i", 16), # MsgSet size
@@ -60,27 +60,27 @@ class TestProtocol(unittest.TestCase):
struct.pack(">bb", 0, 0), # Magic, flags
struct.pack(">i", -1), # -1 indicates a null key
struct.pack(">i", 2), # Msg length (bytes)
"v2", # Message contents
b"v2", # Message contents
])
self.assertEqual(decoded, expect)
@unittest.skipUnless(has_snappy(), "Snappy not available")
def test_create_snappy(self):
payloads = ["v1", "v2"]
payloads = [b"v1", b"v2"]
msg = create_snappy_message(payloads)
self.assertEqual(msg.magic, 0)
self.assertEqual(msg.attributes, ATTRIBUTE_CODEC_MASK & CODEC_SNAPPY)
self.assertEqual(msg.key, None)
decoded = snappy_decode(msg.value)
expect = "".join([
expect = b"".join([
struct.pack(">q", 0), # MsgSet offset
struct.pack(">i", 16), # MsgSet size
struct.pack(">i", 1285512130), # CRC
struct.pack(">bb", 0, 0), # Magic, flags
struct.pack(">i", -1), # -1 indicates a null key
struct.pack(">i", 2), # Msg length (bytes)
"v1", # Message contents
b"v1", # Message contents
struct.pack(">q", 0), # MsgSet offset
struct.pack(">i", 16), # MsgSet size
@@ -88,52 +88,53 @@ class TestProtocol(unittest.TestCase):
struct.pack(">bb", 0, 0), # Magic, flags
struct.pack(">i", -1), # -1 indicates a null key
struct.pack(">i", 2), # Msg length (bytes)
"v2", # Message contents
b"v2", # Message contents
])
self.assertEqual(decoded, expect)
def test_encode_message_header(self):
expect = "".join([
expect = b"".join([
struct.pack(">h", 10), # API Key
struct.pack(">h", 0), # API Version
struct.pack(">i", 4), # Correlation Id
struct.pack(">h", len("client3")), # Length of clientId
"client3", # ClientId
b"client3", # ClientId
])
encoded = KafkaProtocol._encode_message_header("client3", 4, 10)
encoded = KafkaProtocol._encode_message_header(b"client3", 4, 10)
self.assertEqual(encoded, expect)
def test_encode_message(self):
message = create_message("test", "key")
message = create_message(b"test", b"key")
encoded = KafkaProtocol._encode_message(message)
expect = "".join([
print("CRC", -1427009701)
expect = b"".join([
struct.pack(">i", -1427009701), # CRC
struct.pack(">bb", 0, 0), # Magic, flags
struct.pack(">i", 3), # Length of key
"key", # key
b"key", # key
struct.pack(">i", 4), # Length of value
"test", # value
b"test", # value
])
self.assertEqual(encoded, expect)
def test_decode_message(self):
encoded = "".join([
encoded = b"".join([
struct.pack(">i", -1427009701), # CRC
struct.pack(">bb", 0, 0), # Magic, flags
struct.pack(">i", 3), # Length of key
"key", # key
b"key", # key
struct.pack(">i", 4), # Length of value
"test", # value
b"test", # value
])
offset = 10
(returned_offset, decoded_message) = list(KafkaProtocol._decode_message(encoded, offset))[0]
self.assertEqual(returned_offset, offset)
self.assertEqual(decoded_message, create_message("test", "key"))
self.assertEqual(decoded_message, create_message(b"test", b"key"))
def test_encode_message_failure(self):
with self.assertRaises(ProtocolError):
@@ -141,52 +142,52 @@ class TestProtocol(unittest.TestCase):
def test_encode_message_set(self):
message_set = [
create_message("v1", "k1"),
create_message("v2", "k2")
create_message(b"v1", b"k1"),
create_message(b"v2", b"k2")
]
encoded = KafkaProtocol._encode_message_set(message_set)
expect = "".join([
expect = b"".join([
struct.pack(">q", 0), # MsgSet Offset
struct.pack(">i", 18), # Msg Size
struct.pack(">i", 1474775406), # CRC
struct.pack(">bb", 0, 0), # Magic, flags
struct.pack(">i", 2), # Length of key
"k1", # Key
b"k1", # Key
struct.pack(">i", 2), # Length of value
"v1", # Value
b"v1", # Value
struct.pack(">q", 0), # MsgSet Offset
struct.pack(">i", 18), # Msg Size
struct.pack(">i", -16383415), # CRC
struct.pack(">bb", 0, 0), # Magic, flags
struct.pack(">i", 2), # Length of key
"k2", # Key
b"k2", # Key
struct.pack(">i", 2), # Length of value
"v2", # Value
b"v2", # Value
])
self.assertEqual(encoded, expect)
def test_decode_message_set(self):
encoded = "".join([
encoded = b"".join([
struct.pack(">q", 0), # MsgSet Offset
struct.pack(">i", 18), # Msg Size
struct.pack(">i", 1474775406), # CRC
struct.pack(">bb", 0, 0), # Magic, flags
struct.pack(">i", 2), # Length of key
"k1", # Key
b"k1", # Key
struct.pack(">i", 2), # Length of value
"v1", # Value
b"v1", # Value
struct.pack(">q", 1), # MsgSet Offset
struct.pack(">i", 18), # Msg Size
struct.pack(">i", -16383415), # CRC
struct.pack(">bb", 0, 0), # Magic, flags
struct.pack(">i", 2), # Length of key
"k2", # Key
b"k2", # Key
struct.pack(">i", 2), # Length of value
"v2", # Value
b"v2", # Value
])
msgs = list(KafkaProtocol._decode_message_set_iter(encoded))
@@ -197,17 +198,17 @@ class TestProtocol(unittest.TestCase):
returned_offset2, decoded_message2 = msg2
self.assertEqual(returned_offset1, 0)
self.assertEqual(decoded_message1, create_message("v1", "k1"))
self.assertEqual(decoded_message1, create_message(b"v1", b"k1"))
self.assertEqual(returned_offset2, 1)
self.assertEqual(decoded_message2, create_message("v2", "k2"))
self.assertEqual(decoded_message2, create_message(b"v2", b"k2"))
def test_decode_message_gzip(self):
gzip_encoded = ('\xc0\x11\xb2\xf0\x00\x01\xff\xff\xff\xff\x00\x00\x000'
'\x1f\x8b\x08\x00\xa1\xc1\xc5R\x02\xffc`\x80\x03\x01'
'\x9f\xf9\xd1\x87\x18\x18\xfe\x03\x01\x90\xc7Tf\xc8'
'\x80$wu\x1aW\x05\x92\x9c\x11\x00z\xc0h\x888\x00\x00'
'\x00')
gzip_encoded = (b'\xc0\x11\xb2\xf0\x00\x01\xff\xff\xff\xff\x00\x00\x000'
b'\x1f\x8b\x08\x00\xa1\xc1\xc5R\x02\xffc`\x80\x03\x01'
b'\x9f\xf9\xd1\x87\x18\x18\xfe\x03\x01\x90\xc7Tf\xc8'
b'\x80$wu\x1aW\x05\x92\x9c\x11\x00z\xc0h\x888\x00\x00'
b'\x00')
offset = 11
messages = list(KafkaProtocol._decode_message(gzip_encoded, offset))
@@ -216,18 +217,18 @@ class TestProtocol(unittest.TestCase):
returned_offset1, decoded_message1 = msg1
self.assertEqual(returned_offset1, 0)
self.assertEqual(decoded_message1, create_message("v1"))
self.assertEqual(decoded_message1, create_message(b"v1"))
returned_offset2, decoded_message2 = msg2
self.assertEqual(returned_offset2, 0)
self.assertEqual(decoded_message2, create_message("v2"))
self.assertEqual(decoded_message2, create_message(b"v2"))
@unittest.skipUnless(has_snappy(), "Snappy not available")
def test_decode_message_snappy(self):
snappy_encoded = ('\xec\x80\xa1\x95\x00\x02\xff\xff\xff\xff\x00\x00'
'\x00,8\x00\x00\x19\x01@\x10L\x9f[\xc2\x00\x00\xff'
'\xff\xff\xff\x00\x00\x00\x02v1\x19\x1bD\x00\x10\xd5'
'\x96\nx\x00\x00\xff\xff\xff\xff\x00\x00\x00\x02v2')
snappy_encoded = (b'\xec\x80\xa1\x95\x00\x02\xff\xff\xff\xff\x00\x00'
b'\x00,8\x00\x00\x19\x01@\x10L\x9f[\xc2\x00\x00\xff'
b'\xff\xff\xff\x00\x00\x00\x02v1\x19\x1bD\x00\x10\xd5'
b'\x96\nx\x00\x00\xff\xff\xff\xff\x00\x00\x00\x02v2')
offset = 11
messages = list(KafkaProtocol._decode_message(snappy_encoded, offset))
self.assertEqual(len(messages), 2)
@@ -236,14 +237,14 @@ class TestProtocol(unittest.TestCase):
returned_offset1, decoded_message1 = msg1
self.assertEqual(returned_offset1, 0)
self.assertEqual(decoded_message1, create_message("v1"))
self.assertEqual(decoded_message1, create_message(b"v1"))
returned_offset2, decoded_message2 = msg2
self.assertEqual(returned_offset2, 0)
self.assertEqual(decoded_message2, create_message("v2"))
self.assertEqual(decoded_message2, create_message(b"v2"))
def test_decode_message_checksum_error(self):
invalid_encoded_message = "This is not a valid encoded message"
invalid_encoded_message = b"This is not a valid encoded message"
iter = KafkaProtocol._decode_message(invalid_encoded_message, 0)
self.assertRaises(ChecksumError, list, iter)
@@ -254,25 +255,25 @@ class TestProtocol(unittest.TestCase):
list(KafkaProtocol._decode_message_set_iter('a'))
def test_decode_message_set_stop_iteration(self):
encoded = "".join([
encoded = b"".join([
struct.pack(">q", 0), # MsgSet Offset
struct.pack(">i", 18), # Msg Size
struct.pack(">i", 1474775406), # CRC
struct.pack(">bb", 0, 0), # Magic, flags
struct.pack(">i", 2), # Length of key
"k1", # Key
b"k1", # Key
struct.pack(">i", 2), # Length of value
"v1", # Value
b"v1", # Value
struct.pack(">q", 1), # MsgSet Offset
struct.pack(">i", 18), # Msg Size
struct.pack(">i", -16383415), # CRC
struct.pack(">bb", 0, 0), # Magic, flags
struct.pack(">i", 2), # Length of key
"k2", # Key
b"k2", # Key
struct.pack(">i", 2), # Length of value
"v2", # Value
"@1$%(Y!", # Random padding
b"v2", # Value
b"@1$%(Y!", # Random padding
])
msgs = list(KafkaProtocol._decode_message_set_iter(encoded))
@@ -283,40 +284,40 @@ class TestProtocol(unittest.TestCase):
returned_offset2, decoded_message2 = msg2
self.assertEqual(returned_offset1, 0)
self.assertEqual(decoded_message1, create_message("v1", "k1"))
self.assertEqual(decoded_message1, create_message(b"v1", b"k1"))
self.assertEqual(returned_offset2, 1)
self.assertEqual(decoded_message2, create_message("v2", "k2"))
self.assertEqual(decoded_message2, create_message(b"v2", b"k2"))
def test_encode_produce_request(self):
requests = [
ProduceRequest("topic1", 0, [
create_message("a"),
create_message("b")
ProduceRequest(b"topic1", 0, [
create_message(b"a"),
create_message(b"b")
]),
ProduceRequest("topic2", 1, [
create_message("c")
ProduceRequest(b"topic2", 1, [
create_message(b"c")
])
]
msg_a_binary = KafkaProtocol._encode_message(create_message("a"))
msg_b_binary = KafkaProtocol._encode_message(create_message("b"))
msg_c_binary = KafkaProtocol._encode_message(create_message("c"))
msg_a_binary = KafkaProtocol._encode_message(create_message(b"a"))
msg_b_binary = KafkaProtocol._encode_message(create_message(b"b"))
msg_c_binary = KafkaProtocol._encode_message(create_message(b"c"))
header = "".join([
header = b"".join([
struct.pack('>i', 0x94), # The length of the message overall
struct.pack('>h', 0), # Msg Header, Message type = Produce
struct.pack('>h', 0), # Msg Header, API version
struct.pack('>i', 2), # Msg Header, Correlation ID
struct.pack('>h7s', 7, "client1"), # Msg Header, The client ID
struct.pack('>h7s', 7, b"client1"), # Msg Header, The client ID
struct.pack('>h', 2), # Num acks required
struct.pack('>i', 100), # Request Timeout
struct.pack('>i', 2), # The number of requests
])
total_len = len(msg_a_binary) + len(msg_b_binary)
topic1 = "".join([
struct.pack('>h6s', 6, 'topic1'), # The topic1
topic1 = b"".join([
struct.pack('>h6s', 6, b'topic1'), # The topic1
struct.pack('>i', 1), # One message set
struct.pack('>i', 0), # Partition 0
struct.pack('>i', total_len + 24), # Size of the incoming message set
@@ -328,8 +329,8 @@ class TestProtocol(unittest.TestCase):
msg_b_binary, # Actual message
])
topic2 = "".join([
struct.pack('>h6s', 6, 'topic2'), # The topic1
topic2 = b"".join([
struct.pack('>h6s', 6, b'topic2'), # The topic1
struct.pack('>i', 1), # One message set
struct.pack('>i', 1), # Partition 1
struct.pack('>i', len(msg_c_binary) + 12), # Size of the incoming message set
@@ -338,68 +339,72 @@ class TestProtocol(unittest.TestCase):
msg_c_binary, # Actual message
])
expected1 = "".join([ header, topic1, topic2 ])
expected2 = "".join([ header, topic2, topic1 ])
expected1 = b"".join([ header, topic1, topic2 ])
expected2 = b"".join([ header, topic2, topic1 ])
encoded = KafkaProtocol.encode_produce_request("client1", 2, requests, 2, 100)
encoded = KafkaProtocol.encode_produce_request(b"client1", 2, requests, 2, 100)
self.assertIn(encoded, [ expected1, expected2 ])
def test_decode_produce_response(self):
t1 = "topic1"
t2 = "topic2"
t1 = b"topic1"
t2 = b"topic2"
_long = int
if six.PY2:
_long = long
encoded = struct.pack('>iih%dsiihqihqh%dsiihq' % (len(t1), len(t2)),
2, 2, len(t1), t1, 2, 0, 0, 10L, 1, 1, 20L,
len(t2), t2, 1, 0, 0, 30L)
2, 2, len(t1), t1, 2, 0, 0, _long(10), 1, 1, _long(20),
len(t2), t2, 1, 0, 0, _long(30))
responses = list(KafkaProtocol.decode_produce_response(encoded))
self.assertEqual(responses,
[ProduceResponse(t1, 0, 0, 10L),
ProduceResponse(t1, 1, 1, 20L),
ProduceResponse(t2, 0, 0, 30L)])
[ProduceResponse(t1, 0, 0, _long(10)),
ProduceResponse(t1, 1, 1, _long(20)),
ProduceResponse(t2, 0, 0, _long(30))])
def test_encode_fetch_request(self):
requests = [
FetchRequest("topic1", 0, 10, 1024),
FetchRequest("topic2", 1, 20, 100),
FetchRequest(b"topic1", 0, 10, 1024),
FetchRequest(b"topic2", 1, 20, 100),
]
header = "".join([
header = b"".join([
struct.pack('>i', 89), # The length of the message overall
struct.pack('>h', 1), # Msg Header, Message type = Fetch
struct.pack('>h', 0), # Msg Header, API version
struct.pack('>i', 3), # Msg Header, Correlation ID
struct.pack('>h7s', 7, "client1"), # Msg Header, The client ID
struct.pack('>h7s', 7, b"client1"),# Msg Header, The client ID
struct.pack('>i', -1), # Replica Id
struct.pack('>i', 2), # Max wait time
struct.pack('>i', 100), # Min bytes
struct.pack('>i', 2), # Num requests
])
topic1 = "".join([
struct.pack('>h6s', 6, 'topic1'), # Topic
topic1 = b"".join([
struct.pack('>h6s', 6, b'topic1'),# Topic
struct.pack('>i', 1), # Num Payloads
struct.pack('>i', 0), # Partition 0
struct.pack('>q', 10), # Offset
struct.pack('>i', 1024), # Max Bytes
])
topic2 = "".join([
struct.pack('>h6s', 6, 'topic2'), # Topic
topic2 = b"".join([
struct.pack('>h6s', 6, b'topic2'),# Topic
struct.pack('>i', 1), # Num Payloads
struct.pack('>i', 1), # Partition 0
struct.pack('>q', 20), # Offset
struct.pack('>i', 100), # Max Bytes
])
expected1 = "".join([ header, topic1, topic2 ])
expected2 = "".join([ header, topic2, topic1 ])
expected1 = b"".join([ header, topic1, topic2 ])
expected2 = b"".join([ header, topic2, topic1 ])
encoded = KafkaProtocol.encode_fetch_request("client1", 3, requests, 2, 100)
encoded = KafkaProtocol.encode_fetch_request(b"client1", 3, requests, 2, 100)
self.assertIn(encoded, [ expected1, expected2 ])
def test_decode_fetch_response(self):
t1 = "topic1"
t2 = "topic2"
msgs = map(create_message, ["message1", "hi", "boo", "foo", "so fun!"])
t1 = b"topic1"
t2 = b"topic2"
msgs = [create_message(msg)
for msg in [b"message1", b"hi", b"boo", b"foo", b"so fun!"]]
ms1 = KafkaProtocol._encode_message_set([msgs[0], msgs[1]])
ms2 = KafkaProtocol._encode_message_set([msgs[2]])
ms3 = KafkaProtocol._encode_message_set([msgs[3], msgs[4]])
@@ -416,7 +421,7 @@ class TestProtocol(unittest.TestCase):
response.error, response.highwaterMark,
list(response.messages))
expanded_responses = map(expand_messages, responses)
expanded_responses = list(map(expand_messages, responses))
expect = [FetchResponse(t1, 0, 0, 10, [OffsetAndMessage(0, msgs[0]),
OffsetAndMessage(0, msgs[1])]),
FetchResponse(t1, 1, 1, 20, [OffsetAndMessage(0, msgs[2])]),
@@ -425,47 +430,47 @@ class TestProtocol(unittest.TestCase):
self.assertEqual(expanded_responses, expect)
def test_encode_metadata_request_no_topics(self):
expected = "".join([
expected = b"".join([
struct.pack(">i", 17), # Total length of the request
struct.pack('>h', 3), # API key metadata fetch
struct.pack('>h', 0), # API version
struct.pack('>i', 4), # Correlation ID
struct.pack('>h3s', 3, "cid"), # The client ID
struct.pack('>h3s', 3, b"cid"),# The client ID
struct.pack('>i', 0), # No topics, give all the data!
])
encoded = KafkaProtocol.encode_metadata_request("cid", 4)
encoded = KafkaProtocol.encode_metadata_request(b"cid", 4)
self.assertEqual(encoded, expected)
def test_encode_metadata_request_with_topics(self):
expected = "".join([
expected = b"".join([
struct.pack(">i", 25), # Total length of the request
struct.pack('>h', 3), # API key metadata fetch
struct.pack('>h', 0), # API version
struct.pack('>i', 4), # Correlation ID
struct.pack('>h3s', 3, "cid"), # The client ID
struct.pack('>h3s', 3, b"cid"),# The client ID
struct.pack('>i', 2), # Number of topics in the request
struct.pack('>h2s', 2, "t1"), # Topic "t1"
struct.pack('>h2s', 2, "t2"), # Topic "t2"
struct.pack('>h2s', 2, b"t1"), # Topic "t1"
struct.pack('>h2s', 2, b"t2"), # Topic "t2"
])
encoded = KafkaProtocol.encode_metadata_request("cid", 4, ["t1", "t2"])
encoded = KafkaProtocol.encode_metadata_request(b"cid", 4, [b"t1", b"t2"])
self.assertEqual(encoded, expected)
def _create_encoded_metadata_response(self, broker_data, topic_data,
topic_errors, partition_errors):
encoded = struct.pack('>ii', 3, len(broker_data))
for node_id, broker in broker_data.iteritems():
for node_id, broker in six.iteritems(broker_data):
encoded += struct.pack('>ih%dsi' % len(broker.host), node_id,
len(broker.host), broker.host, broker.port)
encoded += struct.pack('>i', len(topic_data))
for topic, partitions in topic_data.iteritems():
for topic, partitions in six.iteritems(topic_data):
encoded += struct.pack('>hh%dsi' % len(topic), topic_errors[topic],
len(topic), topic, len(partitions))
for partition, metadata in partitions.iteritems():
for partition, metadata in six.iteritems(partitions):
encoded += struct.pack('>hiii',
partition_errors[(topic, partition)],
partition, metadata.leader,
@@ -483,25 +488,25 @@ class TestProtocol(unittest.TestCase):
def test_decode_metadata_response(self):
node_brokers = {
0: BrokerMetadata(0, "brokers1.kafka.rdio.com", 1000),
1: BrokerMetadata(1, "brokers1.kafka.rdio.com", 1001),
3: BrokerMetadata(3, "brokers2.kafka.rdio.com", 1000)
0: BrokerMetadata(0, b"brokers1.kafka.rdio.com", 1000),
1: BrokerMetadata(1, b"brokers1.kafka.rdio.com", 1001),
3: BrokerMetadata(3, b"brokers2.kafka.rdio.com", 1000)
}
topic_partitions = {
"topic1": {
0: PartitionMetadata("topic1", 0, 1, (0, 2), (2,)),
1: PartitionMetadata("topic1", 1, 3, (0, 1), (0, 1))
b"topic1": {
0: PartitionMetadata(b"topic1", 0, 1, (0, 2), (2,)),
1: PartitionMetadata(b"topic1", 1, 3, (0, 1), (0, 1))
},
"topic2": {
0: PartitionMetadata("topic2", 0, 0, (), ())
b"topic2": {
0: PartitionMetadata(b"topic2", 0, 0, (), ())
}
}
topic_errors = {"topic1": 0, "topic2": 1}
topic_errors = {b"topic1": 0, b"topic2": 1}
partition_errors = {
("topic1", 0): 0,
("topic1", 1): 1,
("topic2", 0): 0
(b"topic1", 0): 0,
(b"topic1", 1): 1,
(b"topic2", 0): 0
}
encoded = self._create_encoded_metadata_response(node_brokers,
topic_partitions,
@@ -511,31 +516,31 @@ class TestProtocol(unittest.TestCase):
self.assertEqual(decoded, (node_brokers, topic_partitions))
def test_encode_offset_request(self):
expected = "".join([
expected = b"".join([
struct.pack(">i", 21), # Total length of the request
struct.pack('>h', 2), # Message type = offset fetch
struct.pack('>h', 0), # API version
struct.pack('>i', 4), # Correlation ID
struct.pack('>h3s', 3, "cid"), # The client ID
struct.pack('>h3s', 3, b"cid"), # The client ID
struct.pack('>i', -1), # Replica Id
struct.pack('>i', 0), # No topic/partitions
])
encoded = KafkaProtocol.encode_offset_request("cid", 4)
encoded = KafkaProtocol.encode_offset_request(b"cid", 4)
self.assertEqual(encoded, expected)
def test_encode_offset_request__no_payload(self):
expected = "".join([
expected = b"".join([
struct.pack(">i", 65), # Total length of the request
struct.pack('>h', 2), # Message type = offset fetch
struct.pack('>h', 0), # API version
struct.pack('>i', 4), # Correlation ID
struct.pack('>h3s', 3, "cid"), # The client ID
struct.pack('>h3s', 3, b"cid"), # The client ID
struct.pack('>i', -1), # Replica Id
struct.pack('>i', 1), # Num topics
struct.pack(">h6s", 6, "topic1"), # Topic for the request
struct.pack(">h6s", 6, b"topic1"),# Topic for the request
struct.pack(">i", 2), # Two partitions
struct.pack(">i", 3), # Partition 3
@@ -547,18 +552,18 @@ class TestProtocol(unittest.TestCase):
struct.pack(">i", 1), # One offset requested
])
encoded = KafkaProtocol.encode_offset_request("cid", 4, [
OffsetRequest('topic1', 3, -1, 1),
OffsetRequest('topic1', 4, -1, 1),
encoded = KafkaProtocol.encode_offset_request(b"cid", 4, [
OffsetRequest(b'topic1', 3, -1, 1),
OffsetRequest(b'topic1', 4, -1, 1),
])
self.assertEqual(encoded, expected)
def test_decode_offset_response(self):
encoded = "".join([
encoded = b"".join([
struct.pack(">i", 42), # Correlation ID
struct.pack(">i", 1), # One topics
struct.pack(">h6s", 6, "topic1"), # First topic
struct.pack(">h6s", 6, b"topic1"),# First topic
struct.pack(">i", 2), # Two partitions
struct.pack(">i", 2), # Partition 2
@@ -574,24 +579,24 @@ class TestProtocol(unittest.TestCase):
results = KafkaProtocol.decode_offset_response(encoded)
self.assertEqual(set(results), set([
OffsetResponse(topic = 'topic1', partition = 2, error = 0, offsets=(4,)),
OffsetResponse(topic = 'topic1', partition = 4, error = 0, offsets=(8,)),
OffsetResponse(topic = b'topic1', partition = 2, error = 0, offsets=(4,)),
OffsetResponse(topic = b'topic1', partition = 4, error = 0, offsets=(8,)),
]))
def test_encode_offset_commit_request(self):
header = "".join([
header = b"".join([
struct.pack('>i', 99), # Total message length
struct.pack('>h', 8), # Message type = offset commit
struct.pack('>h', 0), # API version
struct.pack('>i', 42), # Correlation ID
struct.pack('>h9s', 9, "client_id"), # The client ID
struct.pack('>h8s', 8, "group_id"), # The group to commit for
struct.pack('>h9s', 9, b"client_id"),# The client ID
struct.pack('>h8s', 8, b"group_id"), # The group to commit for
struct.pack('>i', 2), # Num topics
])
topic1 = "".join([
struct.pack(">h6s", 6, "topic1"), # Topic for the request
topic1 = b"".join([
struct.pack(">h6s", 6, b"topic1"), # Topic for the request
struct.pack(">i", 2), # Two partitions
struct.pack(">i", 0), # Partition 0
struct.pack(">q", 123), # Offset 123
@@ -601,30 +606,30 @@ class TestProtocol(unittest.TestCase):
struct.pack(">h", -1), # Null metadata
])
topic2 = "".join([
struct.pack(">h6s", 6, "topic2"), # Topic for the request
topic2 = b"".join([
struct.pack(">h6s", 6, b"topic2"), # Topic for the request
struct.pack(">i", 1), # One partition
struct.pack(">i", 2), # Partition 2
struct.pack(">q", 345), # Offset 345
struct.pack(">h", -1), # Null metadata
])
expected1 = "".join([ header, topic1, topic2 ])
expected2 = "".join([ header, topic2, topic1 ])
expected1 = b"".join([ header, topic1, topic2 ])
expected2 = b"".join([ header, topic2, topic1 ])
encoded = KafkaProtocol.encode_offset_commit_request("client_id", 42, "group_id", [
OffsetCommitRequest("topic1", 0, 123, None),
OffsetCommitRequest("topic1", 1, 234, None),
OffsetCommitRequest("topic2", 2, 345, None),
encoded = KafkaProtocol.encode_offset_commit_request(b"client_id", 42, b"group_id", [
OffsetCommitRequest(b"topic1", 0, 123, None),
OffsetCommitRequest(b"topic1", 1, 234, None),
OffsetCommitRequest(b"topic2", 2, 345, None),
])
self.assertIn(encoded, [ expected1, expected2 ])
def test_decode_offset_commit_response(self):
encoded = "".join([
encoded = b"".join([
struct.pack(">i", 42), # Correlation ID
struct.pack(">i", 1), # One topic
struct.pack(">h6s", 6, "topic1"), # First topic
struct.pack(">h6s", 6, b"topic1"),# First topic
struct.pack(">i", 2), # Two partitions
struct.pack(">i", 2), # Partition 2
@@ -636,82 +641,78 @@ class TestProtocol(unittest.TestCase):
results = KafkaProtocol.decode_offset_commit_response(encoded)
self.assertEqual(set(results), set([
OffsetCommitResponse(topic = 'topic1', partition = 2, error = 0),
OffsetCommitResponse(topic = 'topic1', partition = 4, error = 0),
OffsetCommitResponse(topic = b'topic1', partition = 2, error = 0),
OffsetCommitResponse(topic = b'topic1', partition = 4, error = 0),
]))
def test_encode_offset_fetch_request(self):
header = "".join([
header = b"".join([
struct.pack('>i', 69), # Total message length
struct.pack('>h', 9), # Message type = offset fetch
struct.pack('>h', 0), # API version
struct.pack('>i', 42), # Correlation ID
struct.pack('>h9s', 9, "client_id"), # The client ID
struct.pack('>h8s', 8, "group_id"), # The group to commit for
struct.pack('>h9s', 9, b"client_id"),# The client ID
struct.pack('>h8s', 8, b"group_id"), # The group to commit for
struct.pack('>i', 2), # Num topics
])
topic1 = "".join([
struct.pack(">h6s", 6, "topic1"), # Topic for the request
topic1 = b"".join([
struct.pack(">h6s", 6, b"topic1"), # Topic for the request
struct.pack(">i", 2), # Two partitions
struct.pack(">i", 0), # Partition 0
struct.pack(">i", 1), # Partition 1
])
topic2 = "".join([
struct.pack(">h6s", 6, "topic2"), # Topic for the request
topic2 = b"".join([
struct.pack(">h6s", 6, b"topic2"), # Topic for the request
struct.pack(">i", 1), # One partitions
struct.pack(">i", 2), # Partition 2
])
expected1 = "".join([ header, topic1, topic2 ])
expected2 = "".join([ header, topic2, topic1 ])
expected1 = b"".join([ header, topic1, topic2 ])
expected2 = b"".join([ header, topic2, topic1 ])
encoded = KafkaProtocol.encode_offset_fetch_request("client_id", 42, "group_id", [
OffsetFetchRequest("topic1", 0),
OffsetFetchRequest("topic1", 1),
OffsetFetchRequest("topic2", 2),
encoded = KafkaProtocol.encode_offset_fetch_request(b"client_id", 42, b"group_id", [
OffsetFetchRequest(b"topic1", 0),
OffsetFetchRequest(b"topic1", 1),
OffsetFetchRequest(b"topic2", 2),
])
self.assertIn(encoded, [ expected1, expected2 ])
def test_decode_offset_fetch_response(self):
encoded = "".join([
encoded = b"".join([
struct.pack(">i", 42), # Correlation ID
struct.pack(">i", 1), # One topics
struct.pack(">h6s", 6, "topic1"), # First topic
struct.pack(">h6s", 6, b"topic1"),# First topic
struct.pack(">i", 2), # Two partitions
struct.pack(">i", 2), # Partition 2
struct.pack(">q", 4), # Offset 4
struct.pack(">h4s", 4, "meta"), # Metadata
struct.pack(">h4s", 4, b"meta"), # Metadata
struct.pack(">h", 0), # No error
struct.pack(">i", 4), # Partition 4
struct.pack(">q", 8), # Offset 8
struct.pack(">h4s", 4, "meta"), # Metadata
struct.pack(">h4s", 4, b"meta"), # Metadata
struct.pack(">h", 0), # No error
])
results = KafkaProtocol.decode_offset_fetch_response(encoded)
self.assertEqual(set(results), set([
OffsetFetchResponse(topic = 'topic1', partition = 2, offset = 4, error = 0, metadata = "meta"),
OffsetFetchResponse(topic = 'topic1', partition = 4, offset = 8, error = 0, metadata = "meta"),
OffsetFetchResponse(topic = b'topic1', partition = 2, offset = 4, error = 0, metadata = b"meta"),
OffsetFetchResponse(topic = b'topic1', partition = 4, offset = 8, error = 0, metadata = b"meta"),
]))
@contextmanager
def mock_create_message_fns(self):
patches = contextlib.nested(
mock.patch.object(kafka.protocol, "create_message",
return_value=sentinel.message),
mock.patch.object(kafka.protocol, "create_gzip_message",
return_value=sentinel.gzip_message),
mock.patch.object(kafka.protocol, "create_snappy_message",
return_value=sentinel.snappy_message),
)
with patches:
yield
with mock.patch.object(kafka.protocol, "create_message",
return_value=sentinel.message):
with mock.patch.object(kafka.protocol, "create_gzip_message",
return_value=sentinel.gzip_message):
with mock.patch.object(kafka.protocol, "create_snappy_message",
return_value=sentinel.snappy_message):
yield
def test_create_message_set(self):
messages = [1, 2, 3]

View File

@@ -1,5 +1,8 @@
# -*- coding: utf-8 -*-
import struct
import six
import kafka.util
import kafka.common
@@ -13,8 +16,8 @@ class UtilTest(unittest.TestCase):
def test_write_int_string(self):
self.assertEqual(
kafka.util.write_int_string('some string'),
'\x00\x00\x00\x0bsome string'
kafka.util.write_int_string(b'some string'),
b'\x00\x00\x00\x0bsome string'
)
def test_write_int_string__unicode(self):
@@ -22,34 +25,37 @@ class UtilTest(unittest.TestCase):
kafka.util.write_int_string(u'unicode')
#: :type: TypeError
te = cm.exception
self.assertIn('unicode', te.message)
self.assertIn('to be str', te.message)
if six.PY2:
self.assertIn('unicode', str(te))
else:
self.assertIn('str', str(te))
self.assertIn('to be bytes', str(te))
def test_write_int_string__empty(self):
self.assertEqual(
kafka.util.write_int_string(''),
'\x00\x00\x00\x00'
kafka.util.write_int_string(b''),
b'\x00\x00\x00\x00'
)
def test_write_int_string__null(self):
self.assertEqual(
kafka.util.write_int_string(None),
'\xff\xff\xff\xff'
b'\xff\xff\xff\xff'
)
def test_read_int_string(self):
self.assertEqual(kafka.util.read_int_string('\xff\xff\xff\xff', 0), (None, 4))
self.assertEqual(kafka.util.read_int_string('\x00\x00\x00\x00', 0), ('', 4))
self.assertEqual(kafka.util.read_int_string('\x00\x00\x00\x0bsome string', 0), ('some string', 15))
self.assertEqual(kafka.util.read_int_string(b'\xff\xff\xff\xff', 0), (None, 4))
self.assertEqual(kafka.util.read_int_string(b'\x00\x00\x00\x00', 0), (b'', 4))
self.assertEqual(kafka.util.read_int_string(b'\x00\x00\x00\x0bsome string', 0), (b'some string', 15))
def test_read_int_string__insufficient_data(self):
with self.assertRaises(kafka.common.BufferUnderflowError):
kafka.util.read_int_string('\x00\x00\x00\x021', 0)
kafka.util.read_int_string(b'\x00\x00\x00\x021', 0)
def test_write_short_string(self):
self.assertEqual(
kafka.util.write_short_string('some string'),
'\x00\x0bsome string'
kafka.util.write_short_string(b'some string'),
b'\x00\x0bsome string'
)
def test_write_short_string__unicode(self):
@@ -57,29 +63,32 @@ class UtilTest(unittest.TestCase):
kafka.util.write_short_string(u'hello')
#: :type: TypeError
te = cm.exception
self.assertIn('unicode', te.message)
self.assertIn('to be str', te.message)
if six.PY2:
self.assertIn('unicode', str(te))
else:
self.assertIn('str', str(te))
self.assertIn('to be bytes', str(te))
def test_write_short_string__empty(self):
self.assertEqual(
kafka.util.write_short_string(''),
'\x00\x00'
kafka.util.write_short_string(b''),
b'\x00\x00'
)
def test_write_short_string__null(self):
self.assertEqual(
kafka.util.write_short_string(None),
'\xff\xff'
b'\xff\xff'
)
def test_write_short_string__too_long(self):
with self.assertRaises(struct.error):
kafka.util.write_short_string(' ' * 33000)
kafka.util.write_short_string(b' ' * 33000)
def test_read_short_string(self):
self.assertEqual(kafka.util.read_short_string('\xff\xff', 0), (None, 2))
self.assertEqual(kafka.util.read_short_string('\x00\x00', 0), ('', 2))
self.assertEqual(kafka.util.read_short_string('\x00\x0bsome string', 0), ('some string', 13))
self.assertEqual(kafka.util.read_short_string(b'\xff\xff', 0), (None, 2))
self.assertEqual(kafka.util.read_short_string(b'\x00\x00', 0), (b'', 2))
self.assertEqual(kafka.util.read_short_string(b'\x00\x0bsome string', 0), (b'some string', 13))
def test_read_int_string__insufficient_data2(self):
with self.assertRaises(kafka.common.BufferUnderflowError):
@@ -87,7 +96,7 @@ class UtilTest(unittest.TestCase):
def test_relative_unpack2(self):
self.assertEqual(
kafka.util.relative_unpack('>hh', '\x00\x01\x00\x00\x02', 0),
kafka.util.relative_unpack('>hh', b'\x00\x01\x00\x00\x02', 0),
((1, 0), 4)
)

View File

@@ -7,6 +7,8 @@ import string
import time
import uuid
from six.moves import xrange
from . import unittest
from kafka.common import OffsetRequest
@@ -21,8 +23,8 @@ __all__ = [
]
def random_string(l):
s = "".join(random.choice(string.letters) for i in xrange(l))
return s
s = "".join(random.choice(string.ascii_letters) for i in xrange(l))
return s.encode('utf-8')
def kafka_versions(*versions):
def kafka_versions(func):

20
tox.ini
View File

@@ -1,5 +1,5 @@
[tox]
envlist = lint, py26, py27, pypy
envlist = lint, py26, py27, pypy, py33, py34
[testenv]
deps =
unittest2
@@ -12,7 +12,25 @@ commands =
nosetests {posargs:-v --with-id --with-timer --timer-top-n 10 --with-coverage --cover-erase --cover-package kafka}
setenv =
PROJECT_ROOT = {toxinidir}
[testenv:py33]
deps =
nose
nose-timer
coverage
mock
python-snappy
[testenv:py34]
deps =
nose
nose-timer
coverage
mock
python-snappy
[testenv:lint]
basepython = python2.7
deps =
unittest2
mock

View File

@@ -3,6 +3,10 @@
if [ $1 == "pypy" ]; then
echo "pypy"
elif [ $1 == "3.4" ]; then
echo "py34"
elif [ $1 == "3.3" ]; then
echo "py33"
elif [ $1 == "2.7" ]; then
echo "py27"
elif [ $1 == "2.6" ]; then