Merge pull request #227 from wizzat-feature/py3

Python 3 Support

Conflicts:
	kafka/producer.py
	test/test_client.py
	test/test_client_integration.py
	test/test_codec.py
	test/test_consumer.py
	test/test_consumer_integration.py
	test/test_failover_integration.py
	test/test_producer.py
	test/test_producer_integration.py
	test/test_protocol.py
	test/test_util.py
This commit is contained in:
Dana Powers
2014-09-07 18:52:05 -07:00
28 changed files with 444 additions and 344 deletions

View File

@@ -3,6 +3,8 @@ language: python
python:
- 2.6
- 2.7
- 3.3
- 3.4
- pypy
env:

View File

@@ -1,3 +1,4 @@
import binascii
import collections
import copy
import functools
@@ -20,7 +21,7 @@ log = logging.getLogger("kafka")
class KafkaClient(object):
CLIENT_ID = "kafka-python"
CLIENT_ID = b"kafka-python"
ID_GEN = itertools.count()
# NOTE: The timeout given to the client should always be greater than the
@@ -81,7 +82,7 @@ class KafkaClient(object):
"""
Generate a new correlation id
"""
return KafkaClient.ID_GEN.next()
return next(KafkaClient.ID_GEN)
def _send_broker_unaware_request(self, requestId, request):
"""
@@ -96,7 +97,7 @@ class KafkaClient(object):
return response
except Exception as e:
log.warning("Could not send request [%r] to server %s:%i, "
"trying next server: %s" % (request, host, port, e))
"trying next server: %s" % (binascii.b2a_hex(request), host, port, e))
raise KafkaUnavailableError("All servers failed to process request")
@@ -145,7 +146,7 @@ class KafkaClient(object):
# For each broker, send the list of request payloads
for broker, payloads in payloads_by_broker.items():
conn = self._get_conn(broker.host, broker.port)
conn = self._get_conn(broker.host.decode('utf-8'), broker.port)
requestId = self._next_id()
request = encoder_fn(client_id=self.client_id,
correlation_id=requestId, payloads=payloads)
@@ -160,11 +161,11 @@ class KafkaClient(object):
response = conn.recv(requestId)
except ConnectionError as e:
log.warning("Could not receive response to request [%s] "
"from server %s: %s", request, conn, e)
"from server %s: %s", binascii.b2a_hex(request), conn, e)
failed = True
except ConnectionError as e:
log.warning("Could not send request [%s] to server %s: %s",
request, conn, e)
binascii.b2a_hex(request), conn, e)
failed = True
if failed:
@@ -233,8 +234,8 @@ class KafkaClient(object):
A reinit() has to be done on the copy before it can be used again
"""
c = copy.deepcopy(self)
for k, v in c.conns.items():
c.conns[k] = v.copy()
for key in c.conns:
c.conns[key] = self.conns[key].copy()
return c
def reinit(self):

View File

@@ -1,8 +1,11 @@
from cStringIO import StringIO
from io import BytesIO
import gzip
import struct
_XERIAL_V1_HEADER = (-126, 'S', 'N', 'A', 'P', 'P', 'Y', 0, 1, 1)
import six
from six.moves import xrange
_XERIAL_V1_HEADER = (-126, b'S', b'N', b'A', b'P', b'P', b'Y', 0, 1, 1)
_XERIAL_V1_FORMAT = 'bccccccBii'
try:
@@ -21,7 +24,7 @@ def has_snappy():
def gzip_encode(payload):
buffer = StringIO()
buffer = BytesIO()
handle = gzip.GzipFile(fileobj=buffer, mode="w")
handle.write(payload)
handle.close()
@@ -32,7 +35,7 @@ def gzip_encode(payload):
def gzip_decode(payload):
buffer = StringIO(payload)
buffer = BytesIO(payload)
handle = gzip.GzipFile(fileobj=buffer, mode='r')
result = handle.read()
handle.close()
@@ -68,9 +71,9 @@ def snappy_encode(payload, xerial_compatible=False, xerial_blocksize=32 * 1024):
for i in xrange(0, len(payload), xerial_blocksize):
yield payload[i:i+xerial_blocksize]
out = StringIO()
out = BytesIO()
header = ''.join([struct.pack('!' + fmt, dat) for fmt, dat
header = b''.join([struct.pack('!' + fmt, dat) for fmt, dat
in zip(_XERIAL_V1_FORMAT, _XERIAL_V1_HEADER)])
out.write(header)
@@ -121,8 +124,8 @@ def snappy_decode(payload):
if _detect_xerial_stream(payload):
# TODO ? Should become a fileobj ?
out = StringIO()
byt = buffer(payload[16:])
out = BytesIO()
byt = payload[16:]
length = len(byt)
cursor = 0

View File

@@ -5,6 +5,8 @@ import socket
import struct
from threading import local
import six
from kafka.common import ConnectionError
log = logging.getLogger("kafka")
@@ -19,7 +21,7 @@ def collect_hosts(hosts, randomize=True):
randomize the returned list.
"""
if isinstance(hosts, basestring):
if isinstance(hosts, six.string_types):
hosts = hosts.strip().split(',')
result = []
@@ -92,7 +94,7 @@ class KafkaConnection(local):
# Receiving empty string from recv signals
# that the socket is in error. we will never get
# more data from this socket
if data == '':
if data == b'':
raise socket.error("Not enough data to read message -- did server kill socket?")
except socket.error:
@@ -103,7 +105,7 @@ class KafkaConnection(local):
log.debug("Read %d/%d bytes from Kafka", num_bytes - bytes_left, num_bytes)
responses.append(data)
return ''.join(responses)
return b''.join(responses)
##################
# Public API #
@@ -144,7 +146,7 @@ class KafkaConnection(local):
# Read the remainder of the response
resp = self._read_bytes(size)
return str(resp)
return resp
def copy(self):
"""
@@ -153,6 +155,10 @@ class KafkaConnection(local):
return a new KafkaConnection object
"""
c = copy.deepcopy(self)
# Python 3 doesn't copy custom attributes of the threadlocal subclass
c.host = copy.copy(self.host)
c.port = copy.copy(self.port)
c.timeout = copy.copy(self.timeout)
c._sock = None
return c

View File

@@ -1,12 +1,21 @@
from __future__ import absolute_import
from itertools import izip_longest, repeat
try:
from itertools import zip_longest as izip_longest, repeat # pylint: disable-msg=E0611
except ImportError: # python 2
from itertools import izip_longest as izip_longest, repeat
import logging
import time
import numbers
from threading import Lock
from multiprocessing import Process, Queue as MPQueue, Event, Value
from Queue import Empty, Queue
import six
try:
from Queue import Empty, Queue
except ImportError: # python 2
from queue import Empty, Queue
import kafka.common
from kafka.common import (
@@ -420,7 +429,7 @@ class SimpleConsumer(Consumer):
for p in self.fetch_offsets.keys())
while partitions:
requests = []
for partition, buffer_size in partitions.iteritems():
for partition, buffer_size in six.iteritems(partitions):
requests.append(FetchRequest(self.topic, partition,
self.fetch_offsets[partition],
buffer_size))
@@ -582,7 +591,7 @@ class MultiProcessConsumer(Consumer):
for chunk in chunks:
chunk = filter(lambda x: x is not None, chunk)
args = (client.copy(),
group, topic, chunk,
group, topic, list(chunk),
self.queue, self.start, self.exit,
self.pause, self.size)

View File

@@ -43,7 +43,7 @@ class RoundRobinPartitioner(Partitioner):
if self.partitions != partitions:
self._set_partitions(partitions)
return self.iterpart.next()
return next(self.iterpart)
class HashedPartitioner(Partitioner):

View File

@@ -4,11 +4,17 @@ import logging
import time
import random
from Queue import Empty
try:
from queue import Empty
except ImportError:
from Queue import Empty
from collections import defaultdict
from itertools import cycle
from multiprocessing import Queue, Process
import six
from six.moves import xrange
from kafka.common import (
ProduceRequest, TopicAndPartition, UnsupportedCodecError, UnknownTopicOrPartitionError
)
@@ -173,7 +179,7 @@ class Producer(object):
raise TypeError("msg is not a list or tuple!")
# Raise TypeError if any message is not encoded as bytes
if any(not isinstance(m, bytes) for m in msg):
if any(not isinstance(m, six.binary_type) for m in msg):
raise TypeError("all produce message payloads must be type bytes")
if self.async:
@@ -221,7 +227,7 @@ class SimpleProducer(Producer):
batch_send_every_t - If set, messages are send after this timeout
random_start - If true, randomize the initial partition which the
the first message block will be published to, otherwise
if false, the first message block will always publish
if false, the first message block will always publish
to partition 0 before cycling through each partition
"""
def __init__(self, client, async=False,
@@ -252,9 +258,9 @@ class SimpleProducer(Producer):
if self.random_start:
num_partitions = len(self.client.topic_partitions[topic])
for _ in xrange(random.randint(0, num_partitions-1)):
self.partition_cycles[topic].next()
next(self.partition_cycles[topic])
return self.partition_cycles[topic].next()
return next(self.partition_cycles[topic])
def send_messages(self, topic, *msg):
partition = self._next_partition(topic)

View File

@@ -1,6 +1,9 @@
import logging
import struct
import zlib
import six
from six.moves import xrange
from kafka.codec import (
gzip_encode, gzip_decode, snappy_encode, snappy_decode
@@ -13,7 +16,7 @@ from kafka.common import (
UnsupportedCodecError
)
from kafka.util import (
read_short_string, read_int_string, relative_unpack,
crc32, read_short_string, read_int_string, relative_unpack,
write_short_string, write_int_string, group_by_topic_and_partition
)
@@ -67,7 +70,7 @@ class KafkaProtocol(object):
Offset => int64
MessageSize => int32
"""
message_set = ""
message_set = b""
for message in messages:
encoded_message = KafkaProtocol._encode_message(message)
message_set += struct.pack('>qi%ds' % len(encoded_message), 0, len(encoded_message), encoded_message)
@@ -94,8 +97,8 @@ class KafkaProtocol(object):
msg = struct.pack('>BB', message.magic, message.attributes)
msg += write_int_string(message.key)
msg += write_int_string(message.value)
crc = zlib.crc32(msg)
msg = struct.pack('>i%ds' % len(msg), crc, msg)
crc = crc32(msg)
msg = struct.pack('>I%ds' % len(msg), crc, msg)
else:
raise ProtocolError("Unexpected magic number: %d" % message.magic)
return msg
@@ -145,8 +148,8 @@ class KafkaProtocol(object):
The offset is actually read from decode_message_set_iter (it is part
of the MessageSet payload).
"""
((crc, magic, att), cur) = relative_unpack('>iBB', data, 0)
if crc != zlib.crc32(data[4:]):
((crc, magic, att), cur) = relative_unpack('>IBB', data, 0)
if crc != crc32(data[4:]):
raise ChecksumError("Message checksum failed")
(key, cur) = read_int_string(data, cur)

View File

@@ -1,14 +1,21 @@
import binascii
import collections
import struct
import sys
from threading import Thread, Event
import six
from kafka.common import BufferUnderflowError
def crc32(data):
return binascii.crc32(data) & 0xffffffff
def write_int_string(s):
if s is not None and not isinstance(s, str):
raise TypeError('Expected "%s" to be str\n'
if s is not None and not isinstance(s, six.binary_type):
raise TypeError('Expected "%s" to be bytes\n'
'data=%s' % (type(s), repr(s)))
if s is None:
return struct.pack('>i', -1)
@@ -17,12 +24,12 @@ def write_int_string(s):
def write_short_string(s):
if s is not None and not isinstance(s, str):
raise TypeError('Expected "%s" to be str\n'
if s is not None and not isinstance(s, six.binary_type):
raise TypeError('Expected "%s" to be bytes\n'
'data=%s' % (type(s), repr(s)))
if s is None:
return struct.pack('>h', -1)
elif len(s) > 32767 and sys.version < (2, 7):
elif len(s) > 32767 and sys.version_info < (2, 7):
# Python 2.6 issues a deprecation warning instead of a struct error
raise struct.error(len(s))
else:

View File

@@ -22,11 +22,16 @@ class Tox(Command):
sys.exit(tox.cmdline([]))
test_require = ['tox', 'mock']
if sys.version_info < (2, 7):
test_require.append('unittest2')
setup(
name="kafka-python",
version=__version__,
tests_require=["tox", "mock", "unittest2"],
tests_require=test_require,
cmdclass={"test": Tox},
packages=["kafka"],
@@ -43,6 +48,7 @@ protocol as well as broker-aware request routing. Gzip and Snappy compression
is also supported for message sets.
""",
keywords="apache kafka",
install_requires=['six'],
classifiers = [
"Development Status :: 4 - Beta",
"Intended Audience :: Developers",

View File

@@ -0,0 +1,6 @@
import sys
if sys.version_info < (2, 7):
import unittest2 as unittest
else:
import unittest

View File

@@ -4,10 +4,10 @@ import os.path
import shutil
import subprocess
import tempfile
import urllib2
from six.moves import urllib
import uuid
from urlparse import urlparse
from six.moves.urllib.parse import urlparse # pylint: disable-msg=E0611
from test.service import ExternalService, SpawnedService
from test.testutil import get_open_port
@@ -42,12 +42,12 @@ class Fixture(object):
try:
url = url_base + distfile + '.tgz'
logging.info("Attempting to download %s", url)
response = urllib2.urlopen(url)
except urllib2.HTTPError:
response = urllib.request.urlopen(url)
except urllib.error.HTTPError:
logging.exception("HTTP Error")
url = url_base + distfile + '.tar.gz'
logging.info("Attempting to download %s", url)
response = urllib2.urlopen(url)
response = urllib.request.urlopen(url)
logging.info("Saving distribution file to %s", output_file)
with open(output_file, 'w') as output_file_fd:

View File

@@ -54,11 +54,11 @@ class SpawnedService(threading.Thread):
if self.child.stdout in rds:
line = self.child.stdout.readline()
self.captured_stdout.append(line)
self.captured_stdout.append(line.decode('utf-8'))
if self.child.stderr in rds:
line = self.child.stderr.readline()
self.captured_stderr.append(line)
self.captured_stderr.append(line.decode('utf-8'))
if self.should_die.is_set():
self.child.terminate()

View File

@@ -1,8 +1,9 @@
import socket
from time import sleep
import unittest2
from mock import MagicMock, patch
import six
from . import unittest
from kafka import KafkaClient
from kafka.common import (
@@ -12,34 +13,34 @@ from kafka.common import (
ConnectionError
)
from kafka.conn import KafkaConnection
from kafka.protocol import create_message
from kafka.protocol import KafkaProtocol, create_message
from test.testutil import Timer
class TestKafkaClient(unittest2.TestCase):
class TestKafkaClient(unittest.TestCase):
def test_init_with_list(self):
with patch.object(KafkaClient, 'load_metadata_for_topics'):
client = KafkaClient(hosts=['kafka01:9092', 'kafka02:9092', 'kafka03:9092'])
self.assertItemsEqual(
[('kafka01', 9092), ('kafka02', 9092), ('kafka03', 9092)],
client.hosts)
self.assertEqual(
sorted([('kafka01', 9092), ('kafka02', 9092), ('kafka03', 9092)]),
sorted(client.hosts))
def test_init_with_csv(self):
with patch.object(KafkaClient, 'load_metadata_for_topics'):
client = KafkaClient(hosts='kafka01:9092,kafka02:9092,kafka03:9092')
self.assertItemsEqual(
[('kafka01', 9092), ('kafka02', 9092), ('kafka03', 9092)],
client.hosts)
self.assertEqual(
sorted([('kafka01', 9092), ('kafka02', 9092), ('kafka03', 9092)]),
sorted(client.hosts))
def test_init_with_unicode_csv(self):
with patch.object(KafkaClient, 'load_metadata_for_topics'):
client = KafkaClient(hosts=u'kafka01:9092,kafka02:9092,kafka03:9092')
self.assertItemsEqual(
[('kafka01', 9092), ('kafka02', 9092), ('kafka03', 9092)],
client.hosts)
self.assertEqual(
sorted([('kafka01', 9092), ('kafka02', 9092), ('kafka03', 9092)]),
sorted(client.hosts))
def test_send_broker_unaware_request_fail(self):
'Tests that call fails when all hosts are unavailable'
@@ -61,11 +62,12 @@ class TestKafkaClient(unittest2.TestCase):
with patch.object(KafkaClient, '_get_conn', side_effect=mock_get_conn):
client = KafkaClient(hosts=['kafka01:9092', 'kafka02:9092'])
req = KafkaProtocol.encode_metadata_request(b'client', 0)
with self.assertRaises(KafkaUnavailableError):
client._send_broker_unaware_request(1, 'fake request')
client._send_broker_unaware_request(1, req)
for key, conn in mocked_conns.iteritems():
conn.send.assert_called_with(1, 'fake request')
for key, conn in six.iteritems(mocked_conns):
conn.send.assert_called_with(1, req)
def test_send_broker_unaware_request(self):
'Tests that call works when at least one of the host is available'
@@ -88,7 +90,8 @@ class TestKafkaClient(unittest2.TestCase):
with patch.object(KafkaClient, '_get_conn', side_effect=mock_get_conn):
client = KafkaClient(hosts='kafka01:9092,kafka02:9092')
resp = client._send_broker_unaware_request(1, 'fake request')
req = KafkaProtocol.encode_metadata_request(b'client', 0)
resp = client._send_broker_unaware_request(1, req)
self.assertEqual('valid response', resp)
mocked_conns[('kafka02', 9092)].recv.assert_called_with(1)

View File

@@ -1,8 +1,5 @@
import os
import socket
import unittest2
from kafka.conn import KafkaConnection
from kafka.common import (
FetchRequest, OffsetCommitRequest, OffsetFetchRequest,
KafkaTimeoutError
@@ -10,7 +7,7 @@ from kafka.common import (
from test.fixtures import ZookeeperFixture, KafkaFixture
from test.testutil import (
KafkaIntegrationTestCase, get_open_port, kafka_versions, Timer
KafkaIntegrationTestCase, kafka_versions
)
class TestKafkaClientIntegration(KafkaIntegrationTestCase):
@@ -51,7 +48,7 @@ class TestKafkaClientIntegration(KafkaIntegrationTestCase):
# ensure_topic_exists should fail with KafkaTimeoutError
with self.assertRaises(KafkaTimeoutError):
self.client.ensure_topic_exists("this_topic_doesnt_exist", timeout=0)
self.client.ensure_topic_exists(b"this_topic_doesnt_exist", timeout=0)
####################
# Offset Tests #
@@ -59,12 +56,12 @@ class TestKafkaClientIntegration(KafkaIntegrationTestCase):
@kafka_versions("0.8.1", "0.8.1.1")
def test_commit_fetch_offsets(self):
req = OffsetCommitRequest(self.topic, 0, 42, "metadata")
(resp,) = self.client.send_offset_commit_request("group", [req])
req = OffsetCommitRequest(self.topic, 0, 42, b"metadata")
(resp,) = self.client.send_offset_commit_request(b"group", [req])
self.assertEquals(resp.error, 0)
req = OffsetFetchRequest(self.topic, 0)
(resp,) = self.client.send_offset_fetch_request("group", [req])
(resp,) = self.client.send_offset_fetch_request(b"group", [req])
self.assertEquals(resp.error, 0)
self.assertEquals(resp.offset, 42)
self.assertEquals(resp.metadata, "") # Metadata isn't stored for now
self.assertEquals(resp.metadata, b"") # Metadata isn't stored for now

View File

@@ -1,34 +1,37 @@
import struct
import unittest2
from six.moves import xrange
from . import unittest
from kafka.codec import (
has_snappy, gzip_encode, gzip_decode,
snappy_encode, snappy_decode
)
from testutil import random_string
class TestCodec(unittest2.TestCase):
from test.testutil import random_string
class TestCodec(unittest.TestCase):
def test_gzip(self):
for i in xrange(1000):
s1 = random_string(100)
s2 = gzip_decode(gzip_encode(s1))
self.assertEquals(s1, s2)
@unittest2.skipUnless(has_snappy(), "Snappy not available")
@unittest.skipUnless(has_snappy(), "Snappy not available")
def test_snappy(self):
for i in xrange(1000):
s1 = random_string(100)
s2 = snappy_decode(snappy_encode(s1))
self.assertEquals(s1, s2)
@unittest2.skipUnless(has_snappy(), "Snappy not available")
@unittest.skipUnless(has_snappy(), "Snappy not available")
def test_snappy_detect_xerial(self):
import kafka as kafka1
_detect_xerial_stream = kafka1.codec._detect_xerial_stream
header = b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01Some extra bytes'
false_header = b'\x01SNAPPY\x00\x00\x00\x01\x00\x00\x00\x01'
random_snappy = snappy_encode('SNAPPY' * 50)
random_snappy = snappy_encode(b'SNAPPY' * 50)
short_data = b'\x01\x02\x03\x04'
self.assertTrue(_detect_xerial_stream(header))
@@ -38,29 +41,31 @@ class TestCodec(unittest2.TestCase):
self.assertFalse(_detect_xerial_stream(random_snappy))
self.assertFalse(_detect_xerial_stream(short_data))
@unittest2.skipUnless(has_snappy(), "Snappy not available")
@unittest.skipUnless(has_snappy(), "Snappy not available")
def test_snappy_decode_xerial(self):
header = b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01'
random_snappy = snappy_encode('SNAPPY' * 50)
random_snappy = snappy_encode(b'SNAPPY' * 50)
block_len = len(random_snappy)
random_snappy2 = snappy_encode('XERIAL' * 50)
random_snappy2 = snappy_encode(b'XERIAL' * 50)
block_len2 = len(random_snappy2)
to_test = header \
+ struct.pack('!i', block_len) + random_snappy \
+ struct.pack('!i', block_len2) + random_snappy2 \
self.assertEquals(snappy_decode(to_test), ('SNAPPY' * 50) + ('XERIAL' * 50))
self.assertEquals(snappy_decode(to_test), (b'SNAPPY' * 50) + (b'XERIAL' * 50))
@unittest2.skipUnless(has_snappy(), "Snappy not available")
@unittest.skipUnless(has_snappy(), "Snappy not available")
def test_snappy_encode_xerial(self):
to_ensure = b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01' + \
'\x00\x00\x00\x18' + \
'\xac\x02\x14SNAPPY\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\x96\x06\x00' + \
'\x00\x00\x00\x18' + \
'\xac\x02\x14XERIAL\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\x96\x06\x00'
to_ensure = (
b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01'
b'\x00\x00\x00\x18'
b'\xac\x02\x14SNAPPY\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\x96\x06\x00'
b'\x00\x00\x00\x18'
b'\xac\x02\x14XERIAL\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\x96\x06\x00'
)
to_test = ('SNAPPY' * 50) + ('XERIAL' * 50)
to_test = (b'SNAPPY' * 50) + (b'XERIAL' * 50)
compressed = snappy_encode(to_test, xerial_compatible=True, xerial_blocksize=300)
self.assertEquals(compressed, to_ensure)

View File

@@ -2,19 +2,19 @@ import socket
import struct
import mock
import unittest2
from . import unittest
from kafka.common import ConnectionError
from kafka.conn import KafkaConnection, collect_hosts, DEFAULT_SOCKET_TIMEOUT_SECONDS
class ConnTest(unittest2.TestCase):
class ConnTest(unittest.TestCase):
def setUp(self):
self.config = {
'host': 'localhost',
'port': 9090,
'request_id': 0,
'payload': 'test data',
'payload2': 'another packet'
'payload': b'test data',
'payload2': b'another packet'
}
# Mocking socket.create_connection will cause _sock to always be a
@@ -35,12 +35,12 @@ class ConnTest(unittest2.TestCase):
struct.pack('>%ds' % payload_size, self.config['payload']),
struct.pack('>i', payload2_size),
struct.pack('>%ds' % payload2_size, self.config['payload2']),
''
b''
]
# Create a connection object
self.conn = KafkaConnection(self.config['host'], self.config['port'])
# Reset any mock counts caused by __init__
self.MockCreateConn.reset_mock()

View File

@@ -1,10 +1,10 @@
import unittest2
from mock import MagicMock
from . import unittest
from kafka.consumer import SimpleConsumer
class TestKafkaConsumer(unittest2.TestCase):
class TestKafkaConsumer(unittest.TestCase):
def test_non_integer_partitions(self):
with self.assertRaises(AssertionError):
SimpleConsumer(MagicMock(), 'group', 'topic', partitions = [ '0' ])

View File

@@ -1,5 +1,7 @@
import os
from six.moves import xrange
from kafka import SimpleConsumer, MultiProcessConsumer, create_message
from kafka.common import ProduceRequest, ConsumerFetchSizeTooSmall
from kafka.consumer import MAX_FETCH_BUFFER_SIZE_BYTES
@@ -150,7 +152,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
with Timer() as t:
messages = consumer.get_messages(count=10, block=True, timeout=5)
self.assert_message_count(messages, 5)
self.assertGreaterEqual(t.interval, 5)
self.assertGreaterEqual(t.interval, 4.95)
consumer.stop()
@@ -269,7 +271,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
kwargs.setdefault('auto_commit', True)
consumer_class = kwargs.pop('consumer', SimpleConsumer)
group = kwargs.pop('group', self.id())
group = kwargs.pop('group', self.id().encode('utf-8'))
topic = kwargs.pop('topic', self.topic)
if consumer_class == SimpleConsumer:

View File

@@ -1,7 +1,8 @@
import logging
import os
import time
import unittest2
from . import unittest
from kafka import KafkaClient, SimpleConsumer
from kafka.common import TopicAndPartition, FailedPayloadsError, ConnectionError
@@ -65,7 +66,7 @@ class TestFailover(KafkaIntegrationTestCase):
while not recovered and (time.time() - started) < timeout:
try:
logging.debug("attempting to send 'success' message after leader killed")
producer.send_messages(topic, partition, 'success')
producer.send_messages(topic, partition, b'success')
logging.debug("success!")
recovered = True
except (FailedPayloadsError, ConnectionError):
@@ -84,7 +85,7 @@ class TestFailover(KafkaIntegrationTestCase):
#@kafka_versions("all")
@unittest2.skip("async producer does not support reliable failover yet")
@unittest.skip("async producer does not support reliable failover yet")
def test_switch_leader_async(self):
topic = self.topic
partition = 0

View File

@@ -1,6 +1,6 @@
import unittest2
from . import unittest
class TestPackage(unittest2.TestCase):
class TestPackage(unittest.TestCase):
def test_top_level_namespace(self):
import kafka as kafka1
self.assertEquals(kafka1.KafkaClient.__name__, "KafkaClient")

View File

@@ -2,25 +2,25 @@
import logging
import unittest2
from mock import MagicMock
from . import unittest
from kafka.producer import Producer
class TestKafkaProducer(unittest2.TestCase):
class TestKafkaProducer(unittest.TestCase):
def test_producer_message_types(self):
producer = Producer(MagicMock())
topic = "test-topic"
topic = b"test-topic"
partition = 0
bad_data_types = (u'你怎么样?', 12, ['a','list'], ('a','tuple'), {'a': 'dict'})
bad_data_types = (u'你怎么样?', 12, ['a', 'list'], ('a', 'tuple'), {'a': 'dict'})
for m in bad_data_types:
with self.assertRaises(TypeError):
logging.debug("attempting to send message of type %s", type(m))
producer.send_messages(topic, partition, m)
good_data_types = ('a string!',)
good_data_types = (b'a string!',)
for m in good_data_types:
# This should not raise an exception
producer.send_messages(topic, partition, m)

View File

@@ -7,16 +7,16 @@ from kafka import (
create_message, create_gzip_message, create_snappy_message,
RoundRobinPartitioner, HashedPartitioner
)
from kafka.codec import has_snappy
from kafka.common import (
FetchRequest, ProduceRequest, UnknownTopicOrPartitionError
)
from kafka.codec import has_snappy
from test.fixtures import ZookeeperFixture, KafkaFixture
from test.testutil import KafkaIntegrationTestCase, kafka_versions
class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
topic = 'produce_topic'
topic = b'produce_topic'
@classmethod
def setUpClass(cls): # noqa
@@ -39,13 +39,15 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
start_offset = self.current_offset(self.topic, 0)
self.assert_produce_request(
[ create_message("Test message %d" % i) for i in range(100) ],
[create_message(("Test message %d" % i).encode('utf-8'))
for i in range(100)],
start_offset,
100,
)
self.assert_produce_request(
[ create_message("Test message %d" % i) for i in range(100) ],
[create_message(("Test message %d" % i).encode('utf-8'))
for i in range(100)],
start_offset+100,
100,
)
@@ -55,7 +57,8 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
start_offset = self.current_offset(self.topic, 0)
self.assert_produce_request(
[ create_message("Test message %d" % i) for i in range(10000) ],
[create_message(("Test message %d" % i).encode('utf-8'))
for i in range(10000)],
start_offset,
10000,
)
@@ -64,8 +67,10 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
def test_produce_many_gzip(self):
start_offset = self.current_offset(self.topic, 0)
message1 = create_gzip_message(["Gzipped 1 %d" % i for i in range(100)])
message2 = create_gzip_message(["Gzipped 2 %d" % i for i in range(100)])
message1 = create_gzip_message([
("Gzipped 1 %d" % i).encode('utf-8') for i in range(100)])
message2 = create_gzip_message([
("Gzipped 2 %d" % i).encode('utf-8') for i in range(100)])
self.assert_produce_request(
[ message1, message2 ],
@@ -92,8 +97,9 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
msg_count = 1+100
messages = [
create_message("Just a plain message"),
create_gzip_message(["Gzipped %d" % i for i in range(100)]),
create_message(b"Just a plain message"),
create_gzip_message([
("Gzipped %d" % i).encode('utf-8') for i in range(100)]),
]
# All snappy integration tests fail with nosnappyjava
@@ -108,14 +114,18 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
start_offset = self.current_offset(self.topic, 0)
self.assert_produce_request([
create_gzip_message(["Gzipped batch 1, message %d" % i for i in range(50000)])
create_gzip_message([
("Gzipped batch 1, message %d" % i).encode('utf-8')
for i in range(50000)])
],
start_offset,
50000,
)
self.assert_produce_request([
create_gzip_message(["Gzipped batch 1, message %d" % i for i in range(50000)])
create_gzip_message([
("Gzipped batch 1, message %d" % i).encode('utf-8')
for i in range(50000)])
],
start_offset+50000,
50000,
@@ -151,7 +161,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
@kafka_versions("all")
def test_produce__new_topic_fails_with_reasonable_error(self):
new_topic = 'new_topic_{guid}'.format(guid = str(uuid.uuid4()))
new_topic = 'new_topic_{guid}'.format(guid = str(uuid.uuid4())).encode('utf-8')
producer = SimpleProducer(self.client)
# At first it doesn't exist

View File

@@ -1,9 +1,11 @@
from contextlib import contextmanager, nested
from contextlib import contextmanager
import struct
import unittest2
import six
from mock import patch, sentinel
from . import unittest
from kafka.codec import has_snappy, gzip_decode, snappy_decode
from kafka.common import (
OffsetRequest, OffsetCommitRequest, OffsetFetchRequest,
OffsetResponse, OffsetCommitResponse, OffsetFetchResponse,
@@ -12,15 +14,13 @@ from kafka.common import (
BrokerMetadata, PartitionMetadata, ProtocolError,
UnsupportedCodecError
)
from kafka.codec import has_snappy, gzip_decode, snappy_decode
import kafka.protocol
from kafka.protocol import (
ATTRIBUTE_CODEC_MASK, CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY, KafkaProtocol,
create_message, create_gzip_message, create_snappy_message,
create_message_set
)
class TestProtocol(unittest2.TestCase):
class TestProtocol(unittest.TestCase):
def test_create_message(self):
payload = "test"
key = "key"
@@ -31,21 +31,21 @@ class TestProtocol(unittest2.TestCase):
self.assertEqual(msg.value, payload)
def test_create_gzip(self):
payloads = ["v1", "v2"]
payloads = [b"v1", b"v2"]
msg = create_gzip_message(payloads)
self.assertEqual(msg.magic, 0)
self.assertEqual(msg.attributes, ATTRIBUTE_CODEC_MASK & CODEC_GZIP)
self.assertEqual(msg.key, None)
# Need to decode to check since gzipped payload is non-deterministic
decoded = gzip_decode(msg.value)
expect = "".join([
expect = b"".join([
struct.pack(">q", 0), # MsgSet offset
struct.pack(">i", 16), # MsgSet size
struct.pack(">i", 1285512130), # CRC
struct.pack(">bb", 0, 0), # Magic, flags
struct.pack(">i", -1), # -1 indicates a null key
struct.pack(">i", 2), # Msg length (bytes)
"v1", # Message contents
b"v1", # Message contents
struct.pack(">q", 0), # MsgSet offset
struct.pack(">i", 16), # MsgSet size
@@ -53,27 +53,27 @@ class TestProtocol(unittest2.TestCase):
struct.pack(">bb", 0, 0), # Magic, flags
struct.pack(">i", -1), # -1 indicates a null key
struct.pack(">i", 2), # Msg length (bytes)
"v2", # Message contents
b"v2", # Message contents
])
self.assertEqual(decoded, expect)
@unittest2.skipUnless(has_snappy(), "Snappy not available")
@unittest.skipUnless(has_snappy(), "Snappy not available")
def test_create_snappy(self):
payloads = ["v1", "v2"]
payloads = [b"v1", b"v2"]
msg = create_snappy_message(payloads)
self.assertEqual(msg.magic, 0)
self.assertEqual(msg.attributes, ATTRIBUTE_CODEC_MASK & CODEC_SNAPPY)
self.assertEqual(msg.key, None)
decoded = snappy_decode(msg.value)
expect = "".join([
expect = b"".join([
struct.pack(">q", 0), # MsgSet offset
struct.pack(">i", 16), # MsgSet size
struct.pack(">i", 1285512130), # CRC
struct.pack(">bb", 0, 0), # Magic, flags
struct.pack(">i", -1), # -1 indicates a null key
struct.pack(">i", 2), # Msg length (bytes)
"v1", # Message contents
b"v1", # Message contents
struct.pack(">q", 0), # MsgSet offset
struct.pack(">i", 16), # MsgSet size
@@ -81,52 +81,52 @@ class TestProtocol(unittest2.TestCase):
struct.pack(">bb", 0, 0), # Magic, flags
struct.pack(">i", -1), # -1 indicates a null key
struct.pack(">i", 2), # Msg length (bytes)
"v2", # Message contents
b"v2", # Message contents
])
self.assertEqual(decoded, expect)
def test_encode_message_header(self):
expect = "".join([
expect = b"".join([
struct.pack(">h", 10), # API Key
struct.pack(">h", 0), # API Version
struct.pack(">i", 4), # Correlation Id
struct.pack(">h", len("client3")), # Length of clientId
"client3", # ClientId
b"client3", # ClientId
])
encoded = KafkaProtocol._encode_message_header("client3", 4, 10)
encoded = KafkaProtocol._encode_message_header(b"client3", 4, 10)
self.assertEqual(encoded, expect)
def test_encode_message(self):
message = create_message("test", "key")
message = create_message(b"test", b"key")
encoded = KafkaProtocol._encode_message(message)
expect = "".join([
expect = b"".join([
struct.pack(">i", -1427009701), # CRC
struct.pack(">bb", 0, 0), # Magic, flags
struct.pack(">i", 3), # Length of key
"key", # key
b"key", # key
struct.pack(">i", 4), # Length of value
"test", # value
b"test", # value
])
self.assertEqual(encoded, expect)
def test_decode_message(self):
encoded = "".join([
encoded = b"".join([
struct.pack(">i", -1427009701), # CRC
struct.pack(">bb", 0, 0), # Magic, flags
struct.pack(">i", 3), # Length of key
"key", # key
b"key", # key
struct.pack(">i", 4), # Length of value
"test", # value
b"test", # value
])
offset = 10
(returned_offset, decoded_message) = list(KafkaProtocol._decode_message(encoded, offset))[0]
self.assertEqual(returned_offset, offset)
self.assertEqual(decoded_message, create_message("test", "key"))
self.assertEqual(decoded_message, create_message(b"test", b"key"))
def test_encode_message_failure(self):
with self.assertRaises(ProtocolError):
@@ -134,52 +134,52 @@ class TestProtocol(unittest2.TestCase):
def test_encode_message_set(self):
message_set = [
create_message("v1", "k1"),
create_message("v2", "k2")
create_message(b"v1", b"k1"),
create_message(b"v2", b"k2")
]
encoded = KafkaProtocol._encode_message_set(message_set)
expect = "".join([
expect = b"".join([
struct.pack(">q", 0), # MsgSet Offset
struct.pack(">i", 18), # Msg Size
struct.pack(">i", 1474775406), # CRC
struct.pack(">bb", 0, 0), # Magic, flags
struct.pack(">i", 2), # Length of key
"k1", # Key
b"k1", # Key
struct.pack(">i", 2), # Length of value
"v1", # Value
b"v1", # Value
struct.pack(">q", 0), # MsgSet Offset
struct.pack(">i", 18), # Msg Size
struct.pack(">i", -16383415), # CRC
struct.pack(">bb", 0, 0), # Magic, flags
struct.pack(">i", 2), # Length of key
"k2", # Key
b"k2", # Key
struct.pack(">i", 2), # Length of value
"v2", # Value
b"v2", # Value
])
self.assertEqual(encoded, expect)
def test_decode_message_set(self):
encoded = "".join([
encoded = b"".join([
struct.pack(">q", 0), # MsgSet Offset
struct.pack(">i", 18), # Msg Size
struct.pack(">i", 1474775406), # CRC
struct.pack(">bb", 0, 0), # Magic, flags
struct.pack(">i", 2), # Length of key
"k1", # Key
b"k1", # Key
struct.pack(">i", 2), # Length of value
"v1", # Value
b"v1", # Value
struct.pack(">q", 1), # MsgSet Offset
struct.pack(">i", 18), # Msg Size
struct.pack(">i", -16383415), # CRC
struct.pack(">bb", 0, 0), # Magic, flags
struct.pack(">i", 2), # Length of key
"k2", # Key
b"k2", # Key
struct.pack(">i", 2), # Length of value
"v2", # Value
b"v2", # Value
])
msgs = list(KafkaProtocol._decode_message_set_iter(encoded))
@@ -190,17 +190,17 @@ class TestProtocol(unittest2.TestCase):
returned_offset2, decoded_message2 = msg2
self.assertEqual(returned_offset1, 0)
self.assertEqual(decoded_message1, create_message("v1", "k1"))
self.assertEqual(decoded_message1, create_message(b"v1", b"k1"))
self.assertEqual(returned_offset2, 1)
self.assertEqual(decoded_message2, create_message("v2", "k2"))
self.assertEqual(decoded_message2, create_message(b"v2", b"k2"))
def test_decode_message_gzip(self):
gzip_encoded = ('\xc0\x11\xb2\xf0\x00\x01\xff\xff\xff\xff\x00\x00\x000'
'\x1f\x8b\x08\x00\xa1\xc1\xc5R\x02\xffc`\x80\x03\x01'
'\x9f\xf9\xd1\x87\x18\x18\xfe\x03\x01\x90\xc7Tf\xc8'
'\x80$wu\x1aW\x05\x92\x9c\x11\x00z\xc0h\x888\x00\x00'
'\x00')
gzip_encoded = (b'\xc0\x11\xb2\xf0\x00\x01\xff\xff\xff\xff\x00\x00\x000'
b'\x1f\x8b\x08\x00\xa1\xc1\xc5R\x02\xffc`\x80\x03\x01'
b'\x9f\xf9\xd1\x87\x18\x18\xfe\x03\x01\x90\xc7Tf\xc8'
b'\x80$wu\x1aW\x05\x92\x9c\x11\x00z\xc0h\x888\x00\x00'
b'\x00')
offset = 11
messages = list(KafkaProtocol._decode_message(gzip_encoded, offset))
@@ -209,18 +209,18 @@ class TestProtocol(unittest2.TestCase):
returned_offset1, decoded_message1 = msg1
self.assertEqual(returned_offset1, 0)
self.assertEqual(decoded_message1, create_message("v1"))
self.assertEqual(decoded_message1, create_message(b"v1"))
returned_offset2, decoded_message2 = msg2
self.assertEqual(returned_offset2, 0)
self.assertEqual(decoded_message2, create_message("v2"))
self.assertEqual(decoded_message2, create_message(b"v2"))
@unittest2.skipUnless(has_snappy(), "Snappy not available")
@unittest.skipUnless(has_snappy(), "Snappy not available")
def test_decode_message_snappy(self):
snappy_encoded = ('\xec\x80\xa1\x95\x00\x02\xff\xff\xff\xff\x00\x00'
'\x00,8\x00\x00\x19\x01@\x10L\x9f[\xc2\x00\x00\xff'
'\xff\xff\xff\x00\x00\x00\x02v1\x19\x1bD\x00\x10\xd5'
'\x96\nx\x00\x00\xff\xff\xff\xff\x00\x00\x00\x02v2')
snappy_encoded = (b'\xec\x80\xa1\x95\x00\x02\xff\xff\xff\xff\x00\x00'
b'\x00,8\x00\x00\x19\x01@\x10L\x9f[\xc2\x00\x00\xff'
b'\xff\xff\xff\x00\x00\x00\x02v1\x19\x1bD\x00\x10\xd5'
b'\x96\nx\x00\x00\xff\xff\xff\xff\x00\x00\x00\x02v2')
offset = 11
messages = list(KafkaProtocol._decode_message(snappy_encoded, offset))
self.assertEqual(len(messages), 2)
@@ -229,14 +229,14 @@ class TestProtocol(unittest2.TestCase):
returned_offset1, decoded_message1 = msg1
self.assertEqual(returned_offset1, 0)
self.assertEqual(decoded_message1, create_message("v1"))
self.assertEqual(decoded_message1, create_message(b"v1"))
returned_offset2, decoded_message2 = msg2
self.assertEqual(returned_offset2, 0)
self.assertEqual(decoded_message2, create_message("v2"))
self.assertEqual(decoded_message2, create_message(b"v2"))
def test_decode_message_checksum_error(self):
invalid_encoded_message = "This is not a valid encoded message"
invalid_encoded_message = b"This is not a valid encoded message"
iter = KafkaProtocol._decode_message(invalid_encoded_message, 0)
self.assertRaises(ChecksumError, list, iter)
@@ -247,25 +247,25 @@ class TestProtocol(unittest2.TestCase):
list(KafkaProtocol._decode_message_set_iter('a'))
def test_decode_message_set_stop_iteration(self):
encoded = "".join([
encoded = b"".join([
struct.pack(">q", 0), # MsgSet Offset
struct.pack(">i", 18), # Msg Size
struct.pack(">i", 1474775406), # CRC
struct.pack(">bb", 0, 0), # Magic, flags
struct.pack(">i", 2), # Length of key
"k1", # Key
b"k1", # Key
struct.pack(">i", 2), # Length of value
"v1", # Value
b"v1", # Value
struct.pack(">q", 1), # MsgSet Offset
struct.pack(">i", 18), # Msg Size
struct.pack(">i", -16383415), # CRC
struct.pack(">bb", 0, 0), # Magic, flags
struct.pack(">i", 2), # Length of key
"k2", # Key
b"k2", # Key
struct.pack(">i", 2), # Length of value
"v2", # Value
"@1$%(Y!", # Random padding
b"v2", # Value
b"@1$%(Y!", # Random padding
])
msgs = list(KafkaProtocol._decode_message_set_iter(encoded))
@@ -276,40 +276,40 @@ class TestProtocol(unittest2.TestCase):
returned_offset2, decoded_message2 = msg2
self.assertEqual(returned_offset1, 0)
self.assertEqual(decoded_message1, create_message("v1", "k1"))
self.assertEqual(decoded_message1, create_message(b"v1", b"k1"))
self.assertEqual(returned_offset2, 1)
self.assertEqual(decoded_message2, create_message("v2", "k2"))
self.assertEqual(decoded_message2, create_message(b"v2", b"k2"))
def test_encode_produce_request(self):
requests = [
ProduceRequest("topic1", 0, [
create_message("a"),
create_message("b")
ProduceRequest(b"topic1", 0, [
create_message(b"a"),
create_message(b"b")
]),
ProduceRequest("topic2", 1, [
create_message("c")
ProduceRequest(b"topic2", 1, [
create_message(b"c")
])
]
msg_a_binary = KafkaProtocol._encode_message(create_message("a"))
msg_b_binary = KafkaProtocol._encode_message(create_message("b"))
msg_c_binary = KafkaProtocol._encode_message(create_message("c"))
msg_a_binary = KafkaProtocol._encode_message(create_message(b"a"))
msg_b_binary = KafkaProtocol._encode_message(create_message(b"b"))
msg_c_binary = KafkaProtocol._encode_message(create_message(b"c"))
header = "".join([
header = b"".join([
struct.pack('>i', 0x94), # The length of the message overall
struct.pack('>h', 0), # Msg Header, Message type = Produce
struct.pack('>h', 0), # Msg Header, API version
struct.pack('>i', 2), # Msg Header, Correlation ID
struct.pack('>h7s', 7, "client1"), # Msg Header, The client ID
struct.pack('>h7s', 7, b"client1"), # Msg Header, The client ID
struct.pack('>h', 2), # Num acks required
struct.pack('>i', 100), # Request Timeout
struct.pack('>i', 2), # The number of requests
])
total_len = len(msg_a_binary) + len(msg_b_binary)
topic1 = "".join([
struct.pack('>h6s', 6, 'topic1'), # The topic1
topic1 = b"".join([
struct.pack('>h6s', 6, b'topic1'), # The topic1
struct.pack('>i', 1), # One message set
struct.pack('>i', 0), # Partition 0
struct.pack('>i', total_len + 24), # Size of the incoming message set
@@ -321,8 +321,8 @@ class TestProtocol(unittest2.TestCase):
msg_b_binary, # Actual message
])
topic2 = "".join([
struct.pack('>h6s', 6, 'topic2'), # The topic1
topic2 = b"".join([
struct.pack('>h6s', 6, b'topic2'), # The topic1
struct.pack('>i', 1), # One message set
struct.pack('>i', 1), # Partition 1
struct.pack('>i', len(msg_c_binary) + 12), # Size of the incoming message set
@@ -331,68 +331,72 @@ class TestProtocol(unittest2.TestCase):
msg_c_binary, # Actual message
])
expected1 = "".join([ header, topic1, topic2 ])
expected2 = "".join([ header, topic2, topic1 ])
expected1 = b"".join([ header, topic1, topic2 ])
expected2 = b"".join([ header, topic2, topic1 ])
encoded = KafkaProtocol.encode_produce_request("client1", 2, requests, 2, 100)
encoded = KafkaProtocol.encode_produce_request(b"client1", 2, requests, 2, 100)
self.assertIn(encoded, [ expected1, expected2 ])
def test_decode_produce_response(self):
t1 = "topic1"
t2 = "topic2"
t1 = b"topic1"
t2 = b"topic2"
_long = int
if six.PY2:
_long = long
encoded = struct.pack('>iih%dsiihqihqh%dsiihq' % (len(t1), len(t2)),
2, 2, len(t1), t1, 2, 0, 0, 10L, 1, 1, 20L,
len(t2), t2, 1, 0, 0, 30L)
2, 2, len(t1), t1, 2, 0, 0, _long(10), 1, 1, _long(20),
len(t2), t2, 1, 0, 0, _long(30))
responses = list(KafkaProtocol.decode_produce_response(encoded))
self.assertEqual(responses,
[ProduceResponse(t1, 0, 0, 10L),
ProduceResponse(t1, 1, 1, 20L),
ProduceResponse(t2, 0, 0, 30L)])
[ProduceResponse(t1, 0, 0, _long(10)),
ProduceResponse(t1, 1, 1, _long(20)),
ProduceResponse(t2, 0, 0, _long(30))])
def test_encode_fetch_request(self):
requests = [
FetchRequest("topic1", 0, 10, 1024),
FetchRequest("topic2", 1, 20, 100),
FetchRequest(b"topic1", 0, 10, 1024),
FetchRequest(b"topic2", 1, 20, 100),
]
header = "".join([
header = b"".join([
struct.pack('>i', 89), # The length of the message overall
struct.pack('>h', 1), # Msg Header, Message type = Fetch
struct.pack('>h', 0), # Msg Header, API version
struct.pack('>i', 3), # Msg Header, Correlation ID
struct.pack('>h7s', 7, "client1"), # Msg Header, The client ID
struct.pack('>h7s', 7, b"client1"),# Msg Header, The client ID
struct.pack('>i', -1), # Replica Id
struct.pack('>i', 2), # Max wait time
struct.pack('>i', 100), # Min bytes
struct.pack('>i', 2), # Num requests
])
topic1 = "".join([
struct.pack('>h6s', 6, 'topic1'), # Topic
topic1 = b"".join([
struct.pack('>h6s', 6, b'topic1'),# Topic
struct.pack('>i', 1), # Num Payloads
struct.pack('>i', 0), # Partition 0
struct.pack('>q', 10), # Offset
struct.pack('>i', 1024), # Max Bytes
])
topic2 = "".join([
struct.pack('>h6s', 6, 'topic2'), # Topic
topic2 = b"".join([
struct.pack('>h6s', 6, b'topic2'),# Topic
struct.pack('>i', 1), # Num Payloads
struct.pack('>i', 1), # Partition 0
struct.pack('>q', 20), # Offset
struct.pack('>i', 100), # Max Bytes
])
expected1 = "".join([ header, topic1, topic2 ])
expected2 = "".join([ header, topic2, topic1 ])
expected1 = b"".join([ header, topic1, topic2 ])
expected2 = b"".join([ header, topic2, topic1 ])
encoded = KafkaProtocol.encode_fetch_request("client1", 3, requests, 2, 100)
encoded = KafkaProtocol.encode_fetch_request(b"client1", 3, requests, 2, 100)
self.assertIn(encoded, [ expected1, expected2 ])
def test_decode_fetch_response(self):
t1 = "topic1"
t2 = "topic2"
msgs = map(create_message, ["message1", "hi", "boo", "foo", "so fun!"])
t1 = b"topic1"
t2 = b"topic2"
msgs = [create_message(msg)
for msg in [b"message1", b"hi", b"boo", b"foo", b"so fun!"]]
ms1 = KafkaProtocol._encode_message_set([msgs[0], msgs[1]])
ms2 = KafkaProtocol._encode_message_set([msgs[2]])
ms3 = KafkaProtocol._encode_message_set([msgs[3], msgs[4]])
@@ -409,7 +413,7 @@ class TestProtocol(unittest2.TestCase):
response.error, response.highwaterMark,
list(response.messages))
expanded_responses = map(expand_messages, responses)
expanded_responses = list(map(expand_messages, responses))
expect = [FetchResponse(t1, 0, 0, 10, [OffsetAndMessage(0, msgs[0]),
OffsetAndMessage(0, msgs[1])]),
FetchResponse(t1, 1, 1, 20, [OffsetAndMessage(0, msgs[2])]),
@@ -418,47 +422,47 @@ class TestProtocol(unittest2.TestCase):
self.assertEqual(expanded_responses, expect)
def test_encode_metadata_request_no_topics(self):
expected = "".join([
expected = b"".join([
struct.pack(">i", 17), # Total length of the request
struct.pack('>h', 3), # API key metadata fetch
struct.pack('>h', 0), # API version
struct.pack('>i', 4), # Correlation ID
struct.pack('>h3s', 3, "cid"), # The client ID
struct.pack('>h3s', 3, b"cid"),# The client ID
struct.pack('>i', 0), # No topics, give all the data!
])
encoded = KafkaProtocol.encode_metadata_request("cid", 4)
encoded = KafkaProtocol.encode_metadata_request(b"cid", 4)
self.assertEqual(encoded, expected)
def test_encode_metadata_request_with_topics(self):
expected = "".join([
expected = b"".join([
struct.pack(">i", 25), # Total length of the request
struct.pack('>h', 3), # API key metadata fetch
struct.pack('>h', 0), # API version
struct.pack('>i', 4), # Correlation ID
struct.pack('>h3s', 3, "cid"), # The client ID
struct.pack('>h3s', 3, b"cid"),# The client ID
struct.pack('>i', 2), # Number of topics in the request
struct.pack('>h2s', 2, "t1"), # Topic "t1"
struct.pack('>h2s', 2, "t2"), # Topic "t2"
struct.pack('>h2s', 2, b"t1"), # Topic "t1"
struct.pack('>h2s', 2, b"t2"), # Topic "t2"
])
encoded = KafkaProtocol.encode_metadata_request("cid", 4, ["t1", "t2"])
encoded = KafkaProtocol.encode_metadata_request(b"cid", 4, [b"t1", b"t2"])
self.assertEqual(encoded, expected)
def _create_encoded_metadata_response(self, broker_data, topic_data,
topic_errors, partition_errors):
encoded = struct.pack('>ii', 3, len(broker_data))
for node_id, broker in broker_data.iteritems():
for node_id, broker in six.iteritems(broker_data):
encoded += struct.pack('>ih%dsi' % len(broker.host), node_id,
len(broker.host), broker.host, broker.port)
encoded += struct.pack('>i', len(topic_data))
for topic, partitions in topic_data.iteritems():
for topic, partitions in six.iteritems(topic_data):
encoded += struct.pack('>hh%dsi' % len(topic), topic_errors[topic],
len(topic), topic, len(partitions))
for partition, metadata in partitions.iteritems():
for partition, metadata in six.iteritems(partitions):
encoded += struct.pack('>hiii',
partition_errors[(topic, partition)],
partition, metadata.leader,
@@ -476,25 +480,25 @@ class TestProtocol(unittest2.TestCase):
def test_decode_metadata_response(self):
node_brokers = {
0: BrokerMetadata(0, "brokers1.kafka.rdio.com", 1000),
1: BrokerMetadata(1, "brokers1.kafka.rdio.com", 1001),
3: BrokerMetadata(3, "brokers2.kafka.rdio.com", 1000)
0: BrokerMetadata(0, b"brokers1.kafka.rdio.com", 1000),
1: BrokerMetadata(1, b"brokers1.kafka.rdio.com", 1001),
3: BrokerMetadata(3, b"brokers2.kafka.rdio.com", 1000)
}
topic_partitions = {
"topic1": {
0: PartitionMetadata("topic1", 0, 1, (0, 2), (2,)),
1: PartitionMetadata("topic1", 1, 3, (0, 1), (0, 1))
b"topic1": {
0: PartitionMetadata(b"topic1", 0, 1, (0, 2), (2,)),
1: PartitionMetadata(b"topic1", 1, 3, (0, 1), (0, 1))
},
"topic2": {
0: PartitionMetadata("topic2", 0, 0, (), ())
b"topic2": {
0: PartitionMetadata(b"topic2", 0, 0, (), ())
}
}
topic_errors = {"topic1": 0, "topic2": 1}
topic_errors = {b"topic1": 0, b"topic2": 1}
partition_errors = {
("topic1", 0): 0,
("topic1", 1): 1,
("topic2", 0): 0
(b"topic1", 0): 0,
(b"topic1", 1): 1,
(b"topic2", 0): 0
}
encoded = self._create_encoded_metadata_response(node_brokers,
topic_partitions,
@@ -504,31 +508,31 @@ class TestProtocol(unittest2.TestCase):
self.assertEqual(decoded, (node_brokers, topic_partitions))
def test_encode_offset_request(self):
expected = "".join([
expected = b"".join([
struct.pack(">i", 21), # Total length of the request
struct.pack('>h', 2), # Message type = offset fetch
struct.pack('>h', 0), # API version
struct.pack('>i', 4), # Correlation ID
struct.pack('>h3s', 3, "cid"), # The client ID
struct.pack('>h3s', 3, b"cid"), # The client ID
struct.pack('>i', -1), # Replica Id
struct.pack('>i', 0), # No topic/partitions
])
encoded = KafkaProtocol.encode_offset_request("cid", 4)
encoded = KafkaProtocol.encode_offset_request(b"cid", 4)
self.assertEqual(encoded, expected)
def test_encode_offset_request__no_payload(self):
expected = "".join([
expected = b"".join([
struct.pack(">i", 65), # Total length of the request
struct.pack('>h', 2), # Message type = offset fetch
struct.pack('>h', 0), # API version
struct.pack('>i', 4), # Correlation ID
struct.pack('>h3s', 3, "cid"), # The client ID
struct.pack('>h3s', 3, b"cid"), # The client ID
struct.pack('>i', -1), # Replica Id
struct.pack('>i', 1), # Num topics
struct.pack(">h6s", 6, "topic1"), # Topic for the request
struct.pack(">h6s", 6, b"topic1"),# Topic for the request
struct.pack(">i", 2), # Two partitions
struct.pack(">i", 3), # Partition 3
@@ -540,18 +544,18 @@ class TestProtocol(unittest2.TestCase):
struct.pack(">i", 1), # One offset requested
])
encoded = KafkaProtocol.encode_offset_request("cid", 4, [
OffsetRequest('topic1', 3, -1, 1),
OffsetRequest('topic1', 4, -1, 1),
encoded = KafkaProtocol.encode_offset_request(b"cid", 4, [
OffsetRequest(b'topic1', 3, -1, 1),
OffsetRequest(b'topic1', 4, -1, 1),
])
self.assertEqual(encoded, expected)
def test_decode_offset_response(self):
encoded = "".join([
encoded = b"".join([
struct.pack(">i", 42), # Correlation ID
struct.pack(">i", 1), # One topics
struct.pack(">h6s", 6, "topic1"), # First topic
struct.pack(">h6s", 6, b"topic1"),# First topic
struct.pack(">i", 2), # Two partitions
struct.pack(">i", 2), # Partition 2
@@ -567,24 +571,24 @@ class TestProtocol(unittest2.TestCase):
results = KafkaProtocol.decode_offset_response(encoded)
self.assertEqual(set(results), set([
OffsetResponse(topic = 'topic1', partition = 2, error = 0, offsets=(4,)),
OffsetResponse(topic = 'topic1', partition = 4, error = 0, offsets=(8,)),
OffsetResponse(topic = b'topic1', partition = 2, error = 0, offsets=(4,)),
OffsetResponse(topic = b'topic1', partition = 4, error = 0, offsets=(8,)),
]))
def test_encode_offset_commit_request(self):
header = "".join([
header = b"".join([
struct.pack('>i', 99), # Total message length
struct.pack('>h', 8), # Message type = offset commit
struct.pack('>h', 0), # API version
struct.pack('>i', 42), # Correlation ID
struct.pack('>h9s', 9, "client_id"), # The client ID
struct.pack('>h8s', 8, "group_id"), # The group to commit for
struct.pack('>h9s', 9, b"client_id"),# The client ID
struct.pack('>h8s', 8, b"group_id"), # The group to commit for
struct.pack('>i', 2), # Num topics
])
topic1 = "".join([
struct.pack(">h6s", 6, "topic1"), # Topic for the request
topic1 = b"".join([
struct.pack(">h6s", 6, b"topic1"), # Topic for the request
struct.pack(">i", 2), # Two partitions
struct.pack(">i", 0), # Partition 0
struct.pack(">q", 123), # Offset 123
@@ -594,30 +598,30 @@ class TestProtocol(unittest2.TestCase):
struct.pack(">h", -1), # Null metadata
])
topic2 = "".join([
struct.pack(">h6s", 6, "topic2"), # Topic for the request
topic2 = b"".join([
struct.pack(">h6s", 6, b"topic2"), # Topic for the request
struct.pack(">i", 1), # One partition
struct.pack(">i", 2), # Partition 2
struct.pack(">q", 345), # Offset 345
struct.pack(">h", -1), # Null metadata
])
expected1 = "".join([ header, topic1, topic2 ])
expected2 = "".join([ header, topic2, topic1 ])
expected1 = b"".join([ header, topic1, topic2 ])
expected2 = b"".join([ header, topic2, topic1 ])
encoded = KafkaProtocol.encode_offset_commit_request("client_id", 42, "group_id", [
OffsetCommitRequest("topic1", 0, 123, None),
OffsetCommitRequest("topic1", 1, 234, None),
OffsetCommitRequest("topic2", 2, 345, None),
encoded = KafkaProtocol.encode_offset_commit_request(b"client_id", 42, b"group_id", [
OffsetCommitRequest(b"topic1", 0, 123, None),
OffsetCommitRequest(b"topic1", 1, 234, None),
OffsetCommitRequest(b"topic2", 2, 345, None),
])
self.assertIn(encoded, [ expected1, expected2 ])
def test_decode_offset_commit_response(self):
encoded = "".join([
encoded = b"".join([
struct.pack(">i", 42), # Correlation ID
struct.pack(">i", 1), # One topic
struct.pack(">h6s", 6, "topic1"), # First topic
struct.pack(">h6s", 6, b"topic1"),# First topic
struct.pack(">i", 2), # Two partitions
struct.pack(">i", 2), # Partition 2
@@ -629,82 +633,79 @@ class TestProtocol(unittest2.TestCase):
results = KafkaProtocol.decode_offset_commit_response(encoded)
self.assertEqual(set(results), set([
OffsetCommitResponse(topic = 'topic1', partition = 2, error = 0),
OffsetCommitResponse(topic = 'topic1', partition = 4, error = 0),
OffsetCommitResponse(topic = b'topic1', partition = 2, error = 0),
OffsetCommitResponse(topic = b'topic1', partition = 4, error = 0),
]))
def test_encode_offset_fetch_request(self):
header = "".join([
header = b"".join([
struct.pack('>i', 69), # Total message length
struct.pack('>h', 9), # Message type = offset fetch
struct.pack('>h', 0), # API version
struct.pack('>i', 42), # Correlation ID
struct.pack('>h9s', 9, "client_id"), # The client ID
struct.pack('>h8s', 8, "group_id"), # The group to commit for
struct.pack('>h9s', 9, b"client_id"),# The client ID
struct.pack('>h8s', 8, b"group_id"), # The group to commit for
struct.pack('>i', 2), # Num topics
])
topic1 = "".join([
struct.pack(">h6s", 6, "topic1"), # Topic for the request
topic1 = b"".join([
struct.pack(">h6s", 6, b"topic1"), # Topic for the request
struct.pack(">i", 2), # Two partitions
struct.pack(">i", 0), # Partition 0
struct.pack(">i", 1), # Partition 1
])
topic2 = "".join([
struct.pack(">h6s", 6, "topic2"), # Topic for the request
topic2 = b"".join([
struct.pack(">h6s", 6, b"topic2"), # Topic for the request
struct.pack(">i", 1), # One partitions
struct.pack(">i", 2), # Partition 2
])
expected1 = "".join([ header, topic1, topic2 ])
expected2 = "".join([ header, topic2, topic1 ])
expected1 = b"".join([ header, topic1, topic2 ])
expected2 = b"".join([ header, topic2, topic1 ])
encoded = KafkaProtocol.encode_offset_fetch_request("client_id", 42, "group_id", [
OffsetFetchRequest("topic1", 0),
OffsetFetchRequest("topic1", 1),
OffsetFetchRequest("topic2", 2),
encoded = KafkaProtocol.encode_offset_fetch_request(b"client_id", 42, b"group_id", [
OffsetFetchRequest(b"topic1", 0),
OffsetFetchRequest(b"topic1", 1),
OffsetFetchRequest(b"topic2", 2),
])
self.assertIn(encoded, [ expected1, expected2 ])
def test_decode_offset_fetch_response(self):
encoded = "".join([
encoded = b"".join([
struct.pack(">i", 42), # Correlation ID
struct.pack(">i", 1), # One topics
struct.pack(">h6s", 6, "topic1"), # First topic
struct.pack(">h6s", 6, b"topic1"),# First topic
struct.pack(">i", 2), # Two partitions
struct.pack(">i", 2), # Partition 2
struct.pack(">q", 4), # Offset 4
struct.pack(">h4s", 4, "meta"), # Metadata
struct.pack(">h4s", 4, b"meta"), # Metadata
struct.pack(">h", 0), # No error
struct.pack(">i", 4), # Partition 4
struct.pack(">q", 8), # Offset 8
struct.pack(">h4s", 4, "meta"), # Metadata
struct.pack(">h4s", 4, b"meta"), # Metadata
struct.pack(">h", 0), # No error
])
results = KafkaProtocol.decode_offset_fetch_response(encoded)
self.assertEqual(set(results), set([
OffsetFetchResponse(topic = 'topic1', partition = 2, offset = 4, error = 0, metadata = "meta"),
OffsetFetchResponse(topic = 'topic1', partition = 4, offset = 8, error = 0, metadata = "meta"),
OffsetFetchResponse(topic = b'topic1', partition = 2, offset = 4, error = 0, metadata = b"meta"),
OffsetFetchResponse(topic = b'topic1', partition = 4, offset = 8, error = 0, metadata = b"meta"),
]))
@contextmanager
def mock_create_message_fns(self):
patches = nested(
patch.object(kafka.protocol, "create_message",
return_value=sentinel.message),
patch.object(kafka.protocol, "create_gzip_message",
return_value=sentinel.gzip_message),
patch.object(kafka.protocol, "create_snappy_message",
return_value=sentinel.snappy_message),
)
with patches:
yield
import kafka.protocol
with patch.object(kafka.protocol, "create_message",
return_value=sentinel.message):
with patch.object(kafka.protocol, "create_gzip_message",
return_value=sentinel.gzip_message):
with patch.object(kafka.protocol, "create_snappy_message",
return_value=sentinel.snappy_message):
yield
def test_create_message_set(self):
messages = [1, 2, 3]

View File

@@ -1,21 +1,22 @@
# -*- coding: utf-8 -*-
import struct
import unittest2
import six
from . import unittest
import kafka.util
import kafka.common
import kafka.util
class UtilTest(unittest2.TestCase):
@unittest2.skip("Unwritten")
class UtilTest(unittest.TestCase):
@unittest.skip("Unwritten")
def test_relative_unpack(self):
pass
def test_write_int_string(self):
self.assertEqual(
kafka.util.write_int_string('some string'),
'\x00\x00\x00\x0bsome string'
kafka.util.write_int_string(b'some string'),
b'\x00\x00\x00\x0bsome string'
)
def test_write_int_string__unicode(self):
@@ -23,34 +24,37 @@ class UtilTest(unittest2.TestCase):
kafka.util.write_int_string(u'unicode')
#: :type: TypeError
te = cm.exception
self.assertIn('unicode', te.message)
self.assertIn('to be str', te.message)
if six.PY2:
self.assertIn('unicode', str(te))
else:
self.assertIn('str', str(te))
self.assertIn('to be bytes', str(te))
def test_write_int_string__empty(self):
self.assertEqual(
kafka.util.write_int_string(''),
'\x00\x00\x00\x00'
kafka.util.write_int_string(b''),
b'\x00\x00\x00\x00'
)
def test_write_int_string__null(self):
self.assertEqual(
kafka.util.write_int_string(None),
'\xff\xff\xff\xff'
b'\xff\xff\xff\xff'
)
def test_read_int_string(self):
self.assertEqual(kafka.util.read_int_string('\xff\xff\xff\xff', 0), (None, 4))
self.assertEqual(kafka.util.read_int_string('\x00\x00\x00\x00', 0), ('', 4))
self.assertEqual(kafka.util.read_int_string('\x00\x00\x00\x0bsome string', 0), ('some string', 15))
self.assertEqual(kafka.util.read_int_string(b'\xff\xff\xff\xff', 0), (None, 4))
self.assertEqual(kafka.util.read_int_string(b'\x00\x00\x00\x00', 0), (b'', 4))
self.assertEqual(kafka.util.read_int_string(b'\x00\x00\x00\x0bsome string', 0), (b'some string', 15))
def test_read_int_string__insufficient_data(self):
with self.assertRaises(kafka.common.BufferUnderflowError):
kafka.util.read_int_string('\x00\x00\x00\x021', 0)
kafka.util.read_int_string(b'\x00\x00\x00\x021', 0)
def test_write_short_string(self):
self.assertEqual(
kafka.util.write_short_string('some string'),
'\x00\x0bsome string'
kafka.util.write_short_string(b'some string'),
b'\x00\x0bsome string'
)
def test_write_short_string__unicode(self):
@@ -58,29 +62,32 @@ class UtilTest(unittest2.TestCase):
kafka.util.write_short_string(u'hello')
#: :type: TypeError
te = cm.exception
self.assertIn('unicode', te.message)
self.assertIn('to be str', te.message)
if six.PY2:
self.assertIn('unicode', str(te))
else:
self.assertIn('str', str(te))
self.assertIn('to be bytes', str(te))
def test_write_short_string__empty(self):
self.assertEqual(
kafka.util.write_short_string(''),
'\x00\x00'
kafka.util.write_short_string(b''),
b'\x00\x00'
)
def test_write_short_string__null(self):
self.assertEqual(
kafka.util.write_short_string(None),
'\xff\xff'
b'\xff\xff'
)
def test_write_short_string__too_long(self):
with self.assertRaises(struct.error):
kafka.util.write_short_string(' ' * 33000)
kafka.util.write_short_string(b' ' * 33000)
def test_read_short_string(self):
self.assertEqual(kafka.util.read_short_string('\xff\xff', 0), (None, 2))
self.assertEqual(kafka.util.read_short_string('\x00\x00', 0), ('', 2))
self.assertEqual(kafka.util.read_short_string('\x00\x0bsome string', 0), ('some string', 13))
self.assertEqual(kafka.util.read_short_string(b'\xff\xff', 0), (None, 2))
self.assertEqual(kafka.util.read_short_string(b'\x00\x00', 0), (b'', 2))
self.assertEqual(kafka.util.read_short_string(b'\x00\x0bsome string', 0), (b'some string', 13))
def test_read_int_string__insufficient_data2(self):
with self.assertRaises(kafka.common.BufferUnderflowError):
@@ -88,7 +95,7 @@ class UtilTest(unittest2.TestCase):
def test_relative_unpack2(self):
self.assertEqual(
kafka.util.relative_unpack('>hh', '\x00\x01\x00\x00\x02', 0),
kafka.util.relative_unpack('>hh', b'\x00\x01\x00\x00\x02', 0),
((1, 0), 4)
)

View File

@@ -5,11 +5,13 @@ import random
import socket
import string
import time
import unittest2
import uuid
from kafka.common import OffsetRequest
from six.moves import xrange
from . import unittest
from kafka import KafkaClient
from kafka.common import OffsetRequest
__all__ = [
'random_string',
@@ -20,8 +22,8 @@ __all__ = [
]
def random_string(l):
s = "".join(random.choice(string.letters) for i in xrange(l))
return s
s = "".join(random.choice(string.ascii_letters) for i in xrange(l))
return s.encode('utf-8')
def kafka_versions(*versions):
def kafka_versions(func):
@@ -45,7 +47,7 @@ def get_open_port():
sock.close()
return port
class KafkaIntegrationTestCase(unittest2.TestCase):
class KafkaIntegrationTestCase(unittest.TestCase):
create_client = True
topic = None
server = None
@@ -56,7 +58,8 @@ class KafkaIntegrationTestCase(unittest2.TestCase):
return
if not self.topic:
self.topic = "%s-%s" % (self.id()[self.id().rindex(".") + 1:], random_string(10))
topic = "%s-%s" % (self.id()[self.id().rindex(".") + 1:], random_string(10).decode('utf-8'))
self.topic = topic.encode('utf-8')
if self.create_client:
self.client = KafkaClient('%s:%d' % (self.server.host, self.server.port))
@@ -84,7 +87,7 @@ class KafkaIntegrationTestCase(unittest2.TestCase):
if s not in self._messages:
self._messages[s] = '%s-%s-%s' % (s, self.id(), str(uuid.uuid4()))
return self._messages[s]
return self._messages[s].encode('utf-8')
class Timer(object):
def __enter__(self):

22
tox.ini
View File

@@ -1,5 +1,5 @@
[tox]
envlist = lint, py26, py27, pypy
envlist = lint, py26, py27, pypy, py33, py34
[testenv]
deps =
unittest2
@@ -9,10 +9,28 @@ deps =
mock
python-snappy
commands =
nosetests {posargs:-v --with-id --with-timer --timer-top-n 10 --with-coverage --cover-erase --cover-package kafka}
nosetests {posargs:-v --with-id --id-file={envdir}/.noseids --with-timer --timer-top-n 10 --with-coverage --cover-erase --cover-package kafka}
setenv =
PROJECT_ROOT = {toxinidir}
[testenv:py33]
deps =
nose
nose-timer
coverage
mock
python-snappy
[testenv:py34]
deps =
nose
nose-timer
coverage
mock
python-snappy
[testenv:lint]
basepython = python2.7
deps =
unittest2
mock

View File

@@ -3,6 +3,10 @@
if [ $1 == "pypy" ]; then
echo "pypy"
elif [ $1 == "3.4" ]; then
echo "py34"
elif [ $1 == "3.3" ]; then
echo "py33"
elif [ $1 == "2.7" ]; then
echo "py27"
elif [ $1 == "2.6" ]; then