Add KeyedProducer test with null payloads
This commit is contained in:
		| @@ -341,6 +341,28 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): | |||||||
|     #   KeyedProducer Tests    # |     #   KeyedProducer Tests    # | ||||||
|     ############################ |     ############################ | ||||||
|  |  | ||||||
|  |     @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.0") | ||||||
|  |     def test_keyedproducer_null_payload(self): | ||||||
|  |         partitions = self.client.get_partition_ids_for_topic(self.topic) | ||||||
|  |         start_offsets = [self.current_offset(self.topic, p) for p in partitions] | ||||||
|  |  | ||||||
|  |         producer = KeyedProducer(self.client, partitioner=RoundRobinPartitioner) | ||||||
|  |         key = "test" | ||||||
|  |  | ||||||
|  |         resp = producer.send_messages(self.topic, self.key("key1"), self.msg("one")) | ||||||
|  |         self.assert_produce_response(resp, start_offsets[0]) | ||||||
|  |         resp = producer.send_messages(self.topic, self.key("key2"), None) | ||||||
|  |         self.assert_produce_response(resp, start_offsets[1]) | ||||||
|  |         resp = producer.send_messages(self.topic, self.key("key3"), None) | ||||||
|  |         self.assert_produce_response(resp, start_offsets[0]+1) | ||||||
|  |         resp = producer.send_messages(self.topic, self.key("key4"), self.msg("four")) | ||||||
|  |         self.assert_produce_response(resp, start_offsets[1]+1) | ||||||
|  |  | ||||||
|  |         self.assert_fetch_offset(partitions[0], start_offsets[0], [ self.msg("one"), None ]) | ||||||
|  |         self.assert_fetch_offset(partitions[1], start_offsets[1], [ None, self.msg("four") ]) | ||||||
|  |  | ||||||
|  |         producer.stop() | ||||||
|  |  | ||||||
|     @kafka_versions("all") |     @kafka_versions("all") | ||||||
|     def test_round_robin_partitioner(self): |     def test_round_robin_partitioner(self): | ||||||
|         partitions = self.client.get_partition_ids_for_topic(self.topic) |         partitions = self.client.get_partition_ids_for_topic(self.topic) | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user
	 Viktor Shlapakov
					Viktor Shlapakov