Adding some integration tests

This commit is contained in:
David Arthur
2012-09-28 20:46:50 -04:00
parent 51d8bbb231
commit 478de24d5b
3 changed files with 44 additions and 8 deletions

View File

@@ -4,7 +4,7 @@ from kafka import KafkaClient, FetchRequest, ProduceRequest
def produce_example(kafka):
message = kafka.create_message("testing")
request = ProduceRequest("my-topic", 0, [message])
request = ProduceRequest("my-topic", -1, [message])
kafka.send_message_set(request)
def consume_example(kafka):

View File

@@ -584,18 +584,17 @@ class KafkaClient(object):
# Simple User API #
#######################
def send_messages_simple(self, topic, partition, *payloads):
def send_messages_simple(self, topic, *payloads):
"""
Send one or more strings to Kafka
Params
======
topic: string
partition: int
payloads: strings
"""
messages = tuple([create_message(payload) for payload in payloads])
self.send_message_set(ProduceRequest(topic, partition, messages))
self.send_message_set(ProduceRequest(topic, -1, messages))
def iter_messages(self, topic, partition, offset, size, auto=True):
"""

View File

@@ -83,13 +83,50 @@ class IntegrationTest(unittest.TestCase):
self.kafka = KafkaClient("localhost", port)
def test_produce(self):
req = ProduceRequest("my-topic", 0, [KafkaClient.create_message("testing")])
req = ProduceRequest("test-produce", 0, [KafkaClient.create_message("testing")])
self.kafka.send_message_set(req)
self.assertTrue(self.server.wait_for("Created log for 'my-topic'-0"))
self.assertTrue(self.server.wait_for("Created log for 'test-produce'-0"))
req = ProduceRequest("my-topic", 1, [KafkaClient.create_message("testing")])
req = ProduceRequest("test-produce", 1, [KafkaClient.create_message("testing")])
self.kafka.send_message_set(req)
self.assertTrue(self.server.wait_for("Created log for 'my-topic'-1"))
self.assertTrue(self.server.wait_for("Created log for 'test-produce'-1"))
def test_produce_consume(self):
message1 = KafkaClient.create_message("testing 1")
message2 = KafkaClient.create_message("testing 2")
req = ProduceRequest("test-produce-consume", 0, [message1, message2])
self.kafka.send_message_set(req)
self.assertTrue(self.server.wait_for("Created log for 'test-produce-consume'-0"))
time.sleep(1)
req = FetchRequest("test-produce-consume", 0, 0, 1024)
(messages, req) = self.kafka.get_message_set(req)
self.assertEquals(len(messages), 2)
self.assertEquals(messages[0], message1)
self.assertEquals(messages[1], message2)
message3 = KafkaClient.create_message("testing 3")
message4 = KafkaClient.create_message("testing 4")
req = ProduceRequest("test-produce-consume", 1, [message3, message4])
self.kafka.send_message_set(req)
self.assertTrue(self.server.wait_for("Created log for 'test-produce-consume'-1"))
time.sleep(1)
req = FetchRequest("test-produce-consume", 1, 0, 1024)
(messages, req) = self.kafka.get_message_set(req)
self.assertEquals(len(messages), 2)
self.assertEquals(messages[0], message3)
self.assertEquals(messages[1], message4)
def test_check_offset(self):
message1 = KafkaClient.create_message("testing 1")
req = ProduceRequest("test-check-offset", 0, [message1])
self.kafka.send_message_set(req)
self.assertTrue(self.server.wait_for("Created log for 'test-check-offset'-0"))
time.sleep(1)
req = FetchRequest("test-check-offset", 0, 0, 1024)
(messages, req) = self.kafka.get_message_set(req)
self.assertEquals(len(messages), 1)
self.assertEquals(messages[0], message1)
assertEquals(req.offset, len(KafkaClient.encode_message(message1)))
def tearDown(self):
self.kafka.close()