Split up and speed up producer based integration tests

Author: Mark Roberts
Date:   2014-04-17 16:11:57 -07:00
parent 7eaca8eea7
commit 8983e73437
4 changed files with 345 additions and 73 deletions

View File

@@ -101,7 +101,7 @@ class Consumer(object):
         elif resp.error == ErrorMapping.UNKNOWN_TOPIC_OR_PARTITON:
             return 0
         else:
-            raise Exception("OffsetFetchRequest for topic=%s, "
+            raise ProtocolError("OffsetFetchRequest for topic=%s, "
                                 "partition=%d failed with errorcode=%s" % (
                                 resp.topic, resp.partition, resp.error))
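Raising ProtocolError instead of a bare Exception lets callers catch broker protocol failures specifically. A minimal sketch of the calling side this enables, assuming ProtocolError is exported by kafka.common as the consumer code implies:

# Sketch only: narrow exception handling enabled by this change.
# Assumes ProtocolError is importable from kafka.common.
from kafka import KafkaClient, SimpleConsumer
from kafka.common import ProtocolError

client = KafkaClient('localhost:9092')
try:
    # Offset fetching happens while the consumer initializes; an
    # unexpected broker error code now surfaces as ProtocolError.
    consumer = SimpleConsumer(client, "my-group", "my-topic")
except ProtocolError as e:
    print "offset fetch failed: %s" % e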

View File

@@ -54,4 +54,5 @@ class HashedPartitioner(Partitioner):
     def partition(self, key, partitions):
         size = len(partitions)
         idx = hash(key) % size
+
         return partitions[idx]
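For context: HashedPartitioner picks hash(key) % len(partitions), which is what lets test_hashed_partitioner in the test file below predict exactly which partition each integer key lands in (CPython 2 hashes small ints to themselves). A quick illustration, not part of the commit:

# Illustration only: with the two-partition fixture the tests use,
# integer keys map as hash(key) % 2.
partitions = [0, 1]
for key in (1, 2, 3, 3, 4):
    print key, "->", hash(key) % len(partitions)
# 1 -> 1, 2 -> 0, 3 -> 1, 3 -> 1, 4 -> 0, matching the partition
# assignments asserted in test_hashed_partitioner.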

View File

@@ -1,5 +1,5 @@
-import unittest
 import time
+import unittest

 from kafka import *  # noqa
 from kafka.common import *  # noqa
@@ -14,48 +14,35 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
     def setUpClass(cls):  # noqa
         cls.zk = ZookeeperFixture.instance()
         cls.server = KafkaFixture.instance(0, cls.zk.host, cls.zk.port)
-        cls.client = KafkaClient('%s:%d' % (cls.server.host, cls.server.port))

     @classmethod
     def tearDownClass(cls):  # noqa
-        cls.client.close()
         cls.server.close()
         cls.zk.close()

     def test_produce_many_simple(self):
         start_offset = self.current_offset(self.topic, 0)

-        produce = ProduceRequest(self.topic, 0, messages=[
-            create_message("Test message %d" % i) for i in range(100)
-        ])
+        self.assert_produce_request(
+            [ create_message("Test message %d" % i) for i in range(100) ],
+            start_offset,
+            100,
+        )

-        resp = self.client.send_produce_request([produce])
-        self.assertEqual(len(resp), 1)  # Only one response
-        self.assertEqual(resp[0].error, 0)  # No error
-        self.assertEqual(resp[0].offset, start_offset)  # Initial offset of first message
-        self.assertEqual(self.current_offset(self.topic, 0), start_offset+100)
-
-        resp = self.client.send_produce_request([produce])
-        self.assertEqual(len(resp), 1)  # Only one response
-        self.assertEqual(resp[0].error, 0)  # No error
-        self.assertEqual(resp[0].offset, start_offset+100)  # Initial offset of first message
-        self.assertEqual(self.current_offset(self.topic, 0), start_offset+200)
+        self.assert_produce_request(
+            [ create_message("Test message %d" % i) for i in range(100) ],
+            start_offset+100,
+            100,
+        )

     def test_produce_10k_simple(self):
         start_offset = self.current_offset(self.topic, 0)

-        produce = ProduceRequest(self.topic, 0, messages=[
-            create_message("Test message %d" % i) for i in range(10000)
-        ])
-
-        resp = self.client.send_produce_request([produce])
-        self.assertEqual(len(resp), 1)  # Only one response
-        self.assertEqual(resp[0].error, 0)  # No error
-        self.assertEqual(resp[0].offset, start_offset)  # Initial offset of first message
-        self.assertEqual(self.current_offset(self.topic, 0), start_offset+10000)
+        self.assert_produce_request(
+            [ create_message("Test message %d" % i) for i in range(10000) ],
+            start_offset,
+            10000,
+        )

     def test_produce_many_gzip(self):
         start_offset = self.current_offset(self.topic, 0)
@@ -63,31 +50,23 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
         message1 = create_gzip_message(["Gzipped 1 %d" % i for i in range(100)])
         message2 = create_gzip_message(["Gzipped 2 %d" % i for i in range(100)])
-        produce = ProduceRequest(self.topic, 0, messages=[message1, message2])

-        resp = self.client.send_produce_request([produce])
-        self.assertEqual(len(resp), 1)  # Only one response
-        self.assertEqual(resp[0].error, 0)  # No error
-        self.assertEqual(resp[0].offset, start_offset)  # Initial offset of first message
-        self.assertEqual(self.current_offset(self.topic, 0), start_offset+200)
+        self.assert_produce_request(
+            [ message1, message2 ],
+            start_offset,
+            200,
+        )

     @unittest.skip("All snappy integration tests fail with nosnappyjava")
     def test_produce_many_snappy(self):
         start_offset = self.current_offset(self.topic, 0)

-        produce = ProduceRequest(self.topic, 0, messages=[
-            create_snappy_message(["Snappy 1 %d" % i for i in range(100)]),
-            create_snappy_message(["Snappy 2 %d" % i for i in range(100)]),
-        ])
-
-        resp = self.client.send_produce_request([produce])
-        self.assertEqual(len(resp), 1)  # Only one response
-        self.assertEqual(resp[0].error, 0)  # No error
-        self.assertEqual(resp[0].offset, start_offset)  # Initial offset of first message
-        self.assertEqual(self.current_offset(self.topic, 0), start_offset+200)
+        self.assert_produce_request([
+            create_snappy_message(["Snappy 1 %d" % i for i in range(100)]),
+            create_snappy_message(["Snappy 2 %d" % i for i in range(100)]),
+            ],
+            start_offset,
+            200,
+        )

     def test_produce_mixed(self):
         start_offset = self.current_offset(self.topic, 0)
@@ -103,37 +82,282 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
             msg_count += 100
             messages.append(create_snappy_message(["Snappy %d" % i for i in range(100)]))

-        produce = ProduceRequest(self.topic, 0, messages=messages)
-        resp = self.client.send_produce_request([produce])
-        self.assertEqual(len(resp), 1)  # Only one response
-        self.assertEqual(resp[0].error, 0)  # No error
-        self.assertEqual(resp[0].offset, start_offset)  # Initial offset of first message
-        self.assertEqual(self.current_offset(self.topic, 0), start_offset+msg_count)
+        self.assert_produce_request(messages, start_offset, msg_count)

     def test_produce_100k_gzipped(self):
         start_offset = self.current_offset(self.topic, 0)

-        req1 = ProduceRequest(self.topic, 0, messages=[
-            create_gzip_message(["Gzipped batch 1, message %d" % i for i in range(50000)])
-        ])
-        resp1 = self.client.send_produce_request([req1])
-        self.assertEqual(len(resp1), 1)  # Only one response
-        self.assertEqual(resp1[0].error, 0)  # No error
-        self.assertEqual(resp1[0].offset, start_offset)  # Initial offset of first message
-        self.assertEqual(self.current_offset(self.topic, 0), start_offset+50000)
-
-        req2 = ProduceRequest(self.topic, 0, messages=[
-            create_gzip_message(["Gzipped batch 2, message %d" % i for i in range(50000)])
-        ])
-        resp2 = self.client.send_produce_request([req2])
-        self.assertEqual(len(resp2), 1)  # Only one response
-        self.assertEqual(resp2[0].error, 0)  # No error
-        self.assertEqual(resp2[0].offset, start_offset+50000)  # Initial offset of first message
-        self.assertEqual(self.current_offset(self.topic, 0), start_offset+100000)
+        self.assert_produce_request([
+            create_gzip_message(["Gzipped batch 1, message %d" % i for i in range(50000)])
+            ],
+            start_offset,
+            50000,
+        )
+
+        self.assert_produce_request([
+            create_gzip_message(["Gzipped batch 1, message %d" % i for i in range(50000)])
+            ],
+            start_offset+50000,
+            50000,
+        )
+
+    ############################
+    #   SimpleProducer Tests   #
+    ############################
+
+    def test_simple_producer(self):
+        start_offset0 = self.current_offset(self.topic, 0)
+        start_offset1 = self.current_offset(self.topic, 1)
+        producer = SimpleProducer(self.client)
+
+        # Will go to partition 0
+        msg1, msg2, msg3, msg4, msg5 = [ str(uuid.uuid4()) for x in xrange(5) ]
+        resp = producer.send_messages(self.topic, self.msg("one"), self.msg("two"))
+        self.assert_produce_response(resp, start_offset0)
+
+        # Will go to partition 1
+        resp = producer.send_messages(self.topic, self.msg("three"))
+        self.assert_produce_response(resp, start_offset1)
+
+        self.assert_fetch_offset(0, start_offset0, [ self.msg("one"), self.msg("two") ])
+        self.assert_fetch_offset(1, start_offset1, [ self.msg("three") ])
+
+        # Will go to partition 0
+        resp = producer.send_messages(self.topic, self.msg("four"), self.msg("five"))
+        self.assert_produce_response(resp, start_offset0+2)
+        self.assert_fetch_offset(0, start_offset0, [ self.msg("one"), self.msg("two"), self.msg("four"), self.msg("five") ])
+
+        producer.stop()
+
+    def test_round_robin_partitioner(self):
+        msg1, msg2, msg3, msg4 = [ str(uuid.uuid4()) for _ in range(4) ]
+
+        start_offset0 = self.current_offset(self.topic, 0)
+        start_offset1 = self.current_offset(self.topic, 1)
+
+        producer = KeyedProducer(self.client, partitioner=RoundRobinPartitioner)
+        resp1 = producer.send(self.topic, "key1", self.msg("one"))
+        resp2 = producer.send(self.topic, "key2", self.msg("two"))
+        resp3 = producer.send(self.topic, "key3", self.msg("three"))
+        resp4 = producer.send(self.topic, "key4", self.msg("four"))
+
+        self.assert_produce_response(resp1, start_offset0+0)
+        self.assert_produce_response(resp2, start_offset1+0)
+        self.assert_produce_response(resp3, start_offset0+1)
+        self.assert_produce_response(resp4, start_offset1+1)
+
+        self.assert_fetch_offset(0, start_offset0, [ self.msg("one"), self.msg("three") ])
+        self.assert_fetch_offset(1, start_offset1, [ self.msg("two"), self.msg("four") ])
+
+        producer.stop()
+
+    def test_hashed_partitioner(self):
+        start_offset0 = self.current_offset(self.topic, 0)
+        start_offset1 = self.current_offset(self.topic, 1)
+
+        producer = KeyedProducer(self.client, partitioner=HashedPartitioner)
+        resp1 = producer.send(self.topic, 1, self.msg("one"))
+        resp2 = producer.send(self.topic, 2, self.msg("two"))
+        resp3 = producer.send(self.topic, 3, self.msg("three"))
+        resp4 = producer.send(self.topic, 3, self.msg("four"))
+        resp5 = producer.send(self.topic, 4, self.msg("five"))
+
+        self.assert_produce_response(resp1, start_offset1+0)
+        self.assert_produce_response(resp2, start_offset0+0)
+        self.assert_produce_response(resp3, start_offset1+1)
+        self.assert_produce_response(resp4, start_offset1+2)
+        self.assert_produce_response(resp5, start_offset0+1)
+
+        self.assert_fetch_offset(0, start_offset0, [ self.msg("two"), self.msg("five") ])
+        self.assert_fetch_offset(1, start_offset1, [ self.msg("one"), self.msg("three"), self.msg("four") ])
+
+        producer.stop()
+
+    def test_acks_none(self):
+        start_offset0 = self.current_offset(self.topic, 0)
+        start_offset1 = self.current_offset(self.topic, 1)
+
+        producer = SimpleProducer(self.client, req_acks=SimpleProducer.ACK_NOT_REQUIRED)
+        resp = producer.send_messages(self.topic, self.msg("one"))
+        self.assertEquals(len(resp), 0)
+
+        self.assert_fetch_offset(0, start_offset0, [ self.msg("one") ])
+        producer.stop()
+
+    def test_acks_local_write(self):
+        start_offset0 = self.current_offset(self.topic, 0)
+        start_offset1 = self.current_offset(self.topic, 1)
+
+        producer = SimpleProducer(self.client, req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE)
+        resp = producer.send_messages(self.topic, self.msg("one"))
+
+        self.assert_produce_response(resp, start_offset0)
+        self.assert_fetch_offset(0, start_offset0, [ self.msg("one") ])
+
+        producer.stop()
+
+    def test_acks_cluster_commit(self):
+        start_offset0 = self.current_offset(self.topic, 0)
+        start_offset1 = self.current_offset(self.topic, 1)
+
+        producer = SimpleProducer(
+            self.client,
+            req_acks=SimpleProducer.ACK_AFTER_CLUSTER_COMMIT)
+
+        resp = producer.send_messages(self.topic, self.msg("one"))
+        self.assert_produce_response(resp, start_offset0)
+        self.assert_fetch_offset(0, start_offset0, [ self.msg("one") ])
+
+        producer.stop()
+
+    def test_batched_simple_producer__triggers_by_message(self):
+        start_offset0 = self.current_offset(self.topic, 0)
+        start_offset1 = self.current_offset(self.topic, 1)
+
+        producer = SimpleProducer(self.client,
+                                  batch_send=True,
+                                  batch_send_every_n=5,
+                                  batch_send_every_t=20)
+
+        # Send 5 messages and do a fetch
+        resp = producer.send_messages(self.topic,
+                                      self.msg("one"),
+                                      self.msg("two"),
+                                      self.msg("three"),
+                                      self.msg("four"),
+                                      )
+
+        # Batch mode is async. No ack
+        self.assertEquals(len(resp), 0)
+
+        # It hasn't sent yet
+        self.assert_fetch_offset(0, start_offset0, [])
+        self.assert_fetch_offset(1, start_offset1, [])
+
+        resp = producer.send_messages(self.topic,
+                                      self.msg("five"),
+                                      self.msg("six"),
+                                      self.msg("seven"),
+                                      )
+
+        # Batch mode is async. No ack
+        self.assertEquals(len(resp), 0)
+
+        self.assert_fetch_offset(0, start_offset0, [
+            self.msg("one"),
+            self.msg("two"),
+            self.msg("three"),
+            self.msg("four"),
+        ])
+
+        self.assert_fetch_offset(1, start_offset1, [
+            self.msg("five"),
+            # self.msg("six"),
+            # self.msg("seven"),
+        ])
+
+        producer.stop()
+
+    def test_batched_simple_producer__triggers_by_time(self):
+        start_offset0 = self.current_offset(self.topic, 0)
+        start_offset1 = self.current_offset(self.topic, 1)
+
+        producer = SimpleProducer(self.client,
+                                  batch_send=True,
+                                  batch_send_every_n=100,
+                                  batch_send_every_t=5)
+
+        # Send 5 messages and do a fetch
+        resp = producer.send_messages(self.topic,
+                                      self.msg("one"),
+                                      self.msg("two"),
+                                      self.msg("three"),
+                                      self.msg("four"),
+                                      )
+
+        # Batch mode is async. No ack
+        self.assertEquals(len(resp), 0)
+
+        # It hasn't sent yet
+        self.assert_fetch_offset(0, start_offset0, [])
+        self.assert_fetch_offset(1, start_offset1, [])
+
+        resp = producer.send_messages(self.topic,
+                                      self.msg("five"),
+                                      self.msg("six"),
+                                      self.msg("seven"),
+                                      )
+
+        # Batch mode is async. No ack
+        self.assertEquals(len(resp), 0)
+
+        # Wait the timeout out
+        time.sleep(5)
+
+        self.assert_fetch_offset(0, start_offset0, [
+            self.msg("one"),
+            self.msg("two"),
+            self.msg("three"),
+            self.msg("four"),
+        ])
+
+        self.assert_fetch_offset(1, start_offset1, [
+            self.msg("five"),
+            self.msg("six"),
+            self.msg("seven"),
+        ])
+
+        producer.stop()
+
+    def test_async_simple_producer(self):
+        start_offset0 = self.current_offset(self.topic, 0)
+        start_offset1 = self.current_offset(self.topic, 1)
+
+        producer = SimpleProducer(self.client, async=True)
+        resp = producer.send_messages(self.topic, self.msg("one"))
+        self.assertEquals(len(resp), 0)
+
+        self.assert_fetch_offset(0, start_offset0, [ self.msg("one") ])
+
+        producer.stop()
+
+    def test_async_keyed_producer(self):
+        start_offset0 = self.current_offset(self.topic, 0)
+        start_offset1 = self.current_offset(self.topic, 1)
+
+        producer = KeyedProducer(self.client, partitioner = RoundRobinPartitioner, async=True)
+
+        resp = producer.send(self.topic, "key1", self.msg("one"))
+        self.assertEquals(len(resp), 0)
+
+        self.assert_fetch_offset(0, start_offset0, [ self.msg("one") ])
+
+        producer.stop()
+
+    def assert_produce_request(self, messages, initial_offset, message_ct):
+        produce = ProduceRequest(self.topic, 0, messages=messages)
+
+        # There should only be one response message from the server.
+        # This will throw an exception if there's more than one.
+        resp = self.client.send_produce_request([ produce ])
+        self.assert_produce_response(resp, initial_offset)
+        self.assertEqual(self.current_offset(self.topic, 0), initial_offset + message_ct)
+
+    def assert_produce_response(self, resp, initial_offset):
+        self.assertEqual(len(resp), 1)
+        self.assertEqual(resp[0].error, 0)
+        self.assertEqual(resp[0].offset, initial_offset)
+
+    def assert_fetch_offset(self, partition, start_offset, expected_messages):
+        # There should only be one response message from the server.
+        # This will throw an exception if there's more than one.
+        resp, = self.client.send_fetch_request([ FetchRequest(self.topic, partition, start_offset, 1024) ])
+
+        self.assertEquals(resp.error, 0)
+        self.assertEquals(resp.partition, partition)
+        messages = [ x.message.value for x in resp.messages ]
+
+        self.assertEqual(messages, expected_messages)
+        self.assertEquals(resp.highwaterMark, start_offset+len(expected_messages))
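The three assert_* helpers above are what make the split-up tests short: the request/response plumbing lives in one place, and each test states only the messages, the expected start offset, and the count. A hypothetical new test built on these helpers might look like this (the test name and payload are illustrative, not part of the commit):

    # Hypothetical example using the extracted helpers.
    def test_produce_one_simple(self):
        start_offset = self.current_offset(self.topic, 0)

        # One call verifies both the produce response and the
        # broker-side offset advance.
        self.assert_produce_request(
            [ create_message("Test message") ],
            start_offset,
            1,
        )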

View File

@@ -1,6 +1,12 @@
+import uuid
+import time
+import unittest
 import os
 import random
 import string
+import logging
+
+from kafka.common import OffsetRequest
+from kafka import KafkaClient

 def random_string(l):
     s = "".join(random.choice(string.letters) for i in xrange(l))
@@ -8,3 +14,44 @@ def random_string(l):
 def skip_integration():
     return os.environ.get('SKIP_INTEGRATION')
+
+def ensure_topic_creation(client, topic_name, timeout = 30):
+    start_time = time.time()
+
+    client.load_metadata_for_topics(topic_name)
+    while not client.has_metadata_for_topic(topic_name):
+        if time.time() > start_time + timeout:
+            raise Exception("Unable to create topic %s" % topic_name)
+        client.load_metadata_for_topics(topic_name)
+        time.sleep(1)
+
+class KafkaIntegrationTestCase(unittest.TestCase):
+    topic = None
+
+    def setUp(self):
+        super(KafkaIntegrationTestCase, self).setUp()
+        if not self.topic:
+            self.topic = "%s-%s" % (self.id()[self.id().rindex(".") + 1:], random_string(10))
+
+        self.client = KafkaClient('%s:%d' % (self.server.host, self.server.port))
+        ensure_topic_creation(self.client, self.topic)
+        self._messages = {}
+
+    def tearDown(self):
+        super(KafkaIntegrationTestCase, self).tearDown()
+        self.client.close()
+
+    def current_offset(self, topic, partition):
+        offsets, = self.client.send_offset_request([ OffsetRequest(topic, partition, -1, 1) ])
+        return offsets.offsets[0]
+
+    def msgs(self, iterable):
+        return [ self.msg(x) for x in iterable ]
+
+    def msg(self, s):
+        if s not in self._messages:
+            self._messages[s] = '%s-%s-%s' % (s, self.id(), str(uuid.uuid4()))
+
+        return self._messages[s]
+
+logging.basicConfig(level=logging.DEBUG)
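KafkaIntegrationTestCase now owns client construction, unique per-test topic names, and the msg()/msgs() message fixtures; a concrete test class only supplies the server fixture that setUp expects as self.server. A minimal sketch of the pattern, mirroring the producer test class above (the class name is illustrative; imports of ZookeeperFixture and KafkaFixture from the test fixtures module are omitted):

# Illustrative subclass; fixture usage mirrors TestKafkaProducerIntegration.
class TestSomethingIntegration(KafkaIntegrationTestCase):
    @classmethod
    def setUpClass(cls):  # noqa
        cls.zk = ZookeeperFixture.instance()
        cls.server = KafkaFixture.instance(0, cls.zk.host, cls.zk.port)

    @classmethod
    def tearDownClass(cls):  # noqa
        cls.server.close()
        cls.zk.close()

    def test_something(self):
        # self.client, self.topic, and self.msg() come from the base setUp().
        start_offset = self.current_offset(self.topic, 0)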