 e35835719e
			
		
	
	e35835719e
	
	
	
		
			
			- the previous 0.8.1 source appears to have actually been 0.8.1.1, based on the git submodule commit; so technically this change adds 0.8.1 back to the test suite.
		
			
				
	
	
		
			258 lines
		
	
	
		
			8.5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			258 lines
		
	
	
		
			8.5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import os
 | |
| from datetime import datetime
 | |
| 
 | |
| from kafka import *  # noqa
 | |
| from kafka.common import *  # noqa
 | |
| from kafka.consumer import MAX_FETCH_BUFFER_SIZE_BYTES
 | |
| from fixtures import ZookeeperFixture, KafkaFixture
 | |
| from testutil import *
 | |
| 
 | |
class TestConsumerIntegration(KafkaIntegrationTestCase):
    @classmethod
    def setUpClass(cls):
        """Start a Zookeeper fixture and a two-broker Kafka cluster.

        Does nothing (fixtures left uncreated) when KAFKA_VERSION is not
        set in the environment, so the suite can be imported without a
        broker available.
        """
        if not os.environ.get('KAFKA_VERSION'):
            return

        cls.zk = ZookeeperFixture.instance()
        cls.server1 = KafkaFixture.instance(0, cls.zk.host, cls.zk.port)
        cls.server2 = KafkaFixture.instance(1, cls.zk.host, cls.zk.port)

        # The first broker doubles as the bootstrapping server.
        cls.server = cls.server1
 | |
|     @classmethod
 | |
|     def tearDownClass(cls):
 | |
|         if not os.environ.get('KAFKA_VERSION'):
 | |
|             return
 | |
| 
 | |
|         cls.server1.close()
 | |
|         cls.server2.close()
 | |
|         cls.zk.close()
 | |
| 
 | |
|     def send_messages(self, partition, messages):
 | |
|         messages = [ create_message(self.msg(str(msg))) for msg in messages ]
 | |
|         produce = ProduceRequest(self.topic, partition, messages = messages)
 | |
|         resp, = self.client.send_produce_request([produce])
 | |
|         self.assertEquals(resp.error, 0)
 | |
| 
 | |
|         return [ x.value for x in messages ]
 | |
| 
 | |
|     def assert_message_count(self, messages, num_messages):
 | |
|         # Make sure we got them all
 | |
|         self.assertEquals(len(messages), num_messages)
 | |
| 
 | |
|         # Make sure there are no duplicates
 | |
|         self.assertEquals(len(set(messages)), num_messages)
 | |
| 
 | |
|     @kafka_versions("all")
 | |
|     def test_simple_consumer(self):
 | |
|         self.send_messages(0, range(0, 100))
 | |
|         self.send_messages(1, range(100, 200))
 | |
| 
 | |
|         # Start a consumer
 | |
|         consumer = self.consumer()
 | |
| 
 | |
|         self.assert_message_count([ message for message in consumer ], 200)
 | |
| 
 | |
|         consumer.stop()
 | |
| 
 | |
|     @kafka_versions("all")
 | |
|     def test_simple_consumer__seek(self):
 | |
|         self.send_messages(0, range(0, 100))
 | |
|         self.send_messages(1, range(100, 200))
 | |
| 
 | |
|         consumer = self.consumer()
 | |
| 
 | |
|         # Rewind 10 messages from the end
 | |
|         consumer.seek(-10, 2)
 | |
|         self.assert_message_count([ message for message in consumer ], 10)
 | |
| 
 | |
|         # Rewind 13 messages from the end
 | |
|         consumer.seek(-13, 2)
 | |
|         self.assert_message_count([ message for message in consumer ], 13)
 | |
| 
 | |
|         consumer.stop()
 | |
| 
 | |
|     @kafka_versions("all")
 | |
|     def test_simple_consumer_blocking(self):
 | |
|         consumer = self.consumer()
 | |
| 
 | |
|         # Ask for 5 messages, nothing in queue, block 5 seconds
 | |
|         with Timer() as t:
 | |
|             messages = consumer.get_messages(block=True, timeout=5)
 | |
|             self.assert_message_count(messages, 0)
 | |
|         self.assertGreaterEqual(t.interval, 5)
 | |
| 
 | |
|         self.send_messages(0, range(0, 10))
 | |
| 
 | |
|         # Ask for 5 messages, 10 in queue. Get 5 back, no blocking
 | |
|         with Timer() as t:
 | |
|             messages = consumer.get_messages(count=5, block=True, timeout=5)
 | |
|             self.assert_message_count(messages, 5)
 | |
|         self.assertLessEqual(t.interval, 1)
 | |
| 
 | |
|         # Ask for 10 messages, get 5 back, block 5 seconds
 | |
|         with Timer() as t:
 | |
|             messages = consumer.get_messages(count=10, block=True, timeout=5)
 | |
|             self.assert_message_count(messages, 5)
 | |
|         self.assertGreaterEqual(t.interval, 5)
 | |
| 
 | |
|         consumer.stop()
 | |
| 
 | |
|     @kafka_versions("all")
 | |
|     def test_simple_consumer_pending(self):
 | |
|         # Produce 10 messages to partitions 0 and 1
 | |
|         self.send_messages(0, range(0, 10))
 | |
|         self.send_messages(1, range(10, 20))
 | |
| 
 | |
|         consumer = self.consumer()
 | |
| 
 | |
|         self.assertEquals(consumer.pending(), 20)
 | |
|         self.assertEquals(consumer.pending(partitions=[0]), 10)
 | |
|         self.assertEquals(consumer.pending(partitions=[1]), 10)
 | |
| 
 | |
|         consumer.stop()
 | |
| 
 | |
|     @kafka_versions("all")
 | |
|     def test_multi_process_consumer(self):
 | |
|         # Produce 100 messages to partitions 0 and 1
 | |
|         self.send_messages(0, range(0, 100))
 | |
|         self.send_messages(1, range(100, 200))
 | |
| 
 | |
|         consumer = self.consumer(consumer = MultiProcessConsumer)
 | |
| 
 | |
|         self.assert_message_count([ message for message in consumer ], 200)
 | |
| 
 | |
|         consumer.stop()
 | |
| 
 | |
|     @kafka_versions("all")
 | |
|     def test_multi_process_consumer_blocking(self):
 | |
|         consumer = self.consumer(consumer = MultiProcessConsumer)
 | |
| 
 | |
|         # Ask for 5 messages, No messages in queue, block 5 seconds
 | |
|         with Timer() as t:
 | |
|             messages = consumer.get_messages(block=True, timeout=5)
 | |
|             self.assert_message_count(messages, 0)
 | |
| 
 | |
|         self.assertGreaterEqual(t.interval, 5)
 | |
| 
 | |
|         # Send 10 messages
 | |
|         self.send_messages(0, range(0, 10))
 | |
| 
 | |
|         # Ask for 5 messages, 10 messages in queue, block 0 seconds
 | |
|         with Timer() as t:
 | |
|             messages = consumer.get_messages(count=5, block=True, timeout=5)
 | |
|             self.assert_message_count(messages, 5)
 | |
|         self.assertLessEqual(t.interval, 1)
 | |
| 
 | |
|         # Ask for 10 messages, 5 in queue, block 5 seconds
 | |
|         with Timer() as t:
 | |
|             messages = consumer.get_messages(count=10, block=True, timeout=5)
 | |
|             self.assert_message_count(messages, 5)
 | |
|         self.assertGreaterEqual(t.interval, 5)
 | |
| 
 | |
|         consumer.stop()
 | |
| 
 | |
|     @kafka_versions("all")
 | |
|     def test_multi_proc_pending(self):
 | |
|         self.send_messages(0, range(0, 10))
 | |
|         self.send_messages(1, range(10, 20))
 | |
| 
 | |
|         consumer = MultiProcessConsumer(self.client, "group1", self.topic, auto_commit=False)
 | |
| 
 | |
|         self.assertEquals(consumer.pending(), 20)
 | |
|         self.assertEquals(consumer.pending(partitions=[0]), 10)
 | |
|         self.assertEquals(consumer.pending(partitions=[1]), 10)
 | |
| 
 | |
|         consumer.stop()
 | |
| 
 | |
|     @kafka_versions("all")
 | |
|     def test_large_messages(self):
 | |
|         # Produce 10 "normal" size messages
 | |
|         small_messages = self.send_messages(0, [ str(x) for x in range(10) ])
 | |
| 
 | |
|         # Produce 10 messages that are large (bigger than default fetch size)
 | |
|         large_messages = self.send_messages(0, [ random_string(5000) for x in range(10) ])
 | |
| 
 | |
|         # Consumer should still get all of them
 | |
|         consumer = self.consumer()
 | |
| 
 | |
|         expected_messages = set(small_messages + large_messages)
 | |
|         actual_messages = set([ x.message.value for x in consumer ])
 | |
|         self.assertEqual(expected_messages, actual_messages)
 | |
| 
 | |
|         consumer.stop()
 | |
| 
 | |
|     @kafka_versions("all")
 | |
|     def test_huge_messages(self):
 | |
|         huge_message, = self.send_messages(0, [
 | |
|             create_message(random_string(MAX_FETCH_BUFFER_SIZE_BYTES + 10)),
 | |
|         ])
 | |
| 
 | |
|         # Create a consumer with the default buffer size
 | |
|         consumer = self.consumer()
 | |
| 
 | |
|         # This consumer failes to get the message
 | |
|         with self.assertRaises(ConsumerFetchSizeTooSmall):
 | |
|             consumer.get_message(False, 0.1)
 | |
| 
 | |
|         consumer.stop()
 | |
| 
 | |
|         # Create a consumer with no fetch size limit
 | |
|         big_consumer = self.consumer(
 | |
|             max_buffer_size = None,
 | |
|             partitions = [0],
 | |
|         )
 | |
| 
 | |
|         # Seek to the last message
 | |
|         big_consumer.seek(-1, 2)
 | |
| 
 | |
|         # Consume giant message successfully
 | |
|         message = big_consumer.get_message(block=False, timeout=10)
 | |
|         self.assertIsNotNone(message)
 | |
|         self.assertEquals(message.message.value, huge_message)
 | |
| 
 | |
|         big_consumer.stop()
 | |
| 
 | |
|     @kafka_versions("0.8.1", "0.8.1.1")
 | |
|     def test_offset_behavior__resuming_behavior(self):
 | |
|         msgs1 = self.send_messages(0, range(0, 100))
 | |
|         msgs2 = self.send_messages(1, range(100, 200))
 | |
| 
 | |
|         # Start a consumer
 | |
|         consumer1 = self.consumer(
 | |
|             auto_commit_every_t = None,
 | |
|             auto_commit_every_n = 20,
 | |
|         )
 | |
| 
 | |
|         # Grab the first 195 messages
 | |
|         output_msgs1 = [ consumer1.get_message().message.value for _ in xrange(195) ]
 | |
|         self.assert_message_count(output_msgs1, 195)
 | |
| 
 | |
|         # The total offset across both partitions should be at 180
 | |
|         consumer2 = self.consumer(
 | |
|             auto_commit_every_t = None,
 | |
|             auto_commit_every_n = 20,
 | |
|         )
 | |
| 
 | |
|         # 181-200
 | |
|         self.assert_message_count([ message for message in consumer2 ], 20)
 | |
| 
 | |
|         consumer1.stop()
 | |
|         consumer2.stop()
 | |
| 
 | |
|     def consumer(self, **kwargs):
 | |
|         if os.environ['KAFKA_VERSION'] == "0.8.0":
 | |
|             # Kafka 0.8.0 simply doesn't support offset requests, so hard code it being off
 | |
|             kwargs['auto_commit'] = False
 | |
|         else:
 | |
|             kwargs.setdefault('auto_commit', True)
 | |
| 
 | |
|         consumer_class = kwargs.pop('consumer', SimpleConsumer)
 | |
|         group = kwargs.pop('group', self.id())
 | |
|         topic = kwargs.pop('topic', self.topic)
 | |
| 
 | |
|         if consumer_class == SimpleConsumer:
 | |
|             kwargs.setdefault('iter_timeout', 0)
 | |
| 
 | |
|         return consumer_class(self.client, group, topic, **kwargs)
 |