68 lines
		
	
	
		
			2.1 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			68 lines
		
	
	
		
			2.1 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import os
 | |
| 
 | |
| from kafka.common import (
 | |
|     FetchRequest, OffsetCommitRequest, OffsetFetchRequest,
 | |
|     KafkaTimeoutError
 | |
| )
 | |
| 
 | |
| from test.fixtures import ZookeeperFixture, KafkaFixture
 | |
| from test.testutil import (
 | |
|     KafkaIntegrationTestCase, kafka_versions
 | |
| )
 | |
| 
 | |
| class TestKafkaClientIntegration(KafkaIntegrationTestCase):
 | |
|     @classmethod
 | |
|     def setUpClass(cls):  # noqa
 | |
|         if not os.environ.get('KAFKA_VERSION'):
 | |
|             return
 | |
| 
 | |
|         cls.zk = ZookeeperFixture.instance()
 | |
|         cls.server = KafkaFixture.instance(0, cls.zk.host, cls.zk.port)
 | |
| 
 | |
|     @classmethod
 | |
|     def tearDownClass(cls):  # noqa
 | |
|         if not os.environ.get('KAFKA_VERSION'):
 | |
|             return
 | |
| 
 | |
|         cls.server.close()
 | |
|         cls.zk.close()
 | |
| 
 | |
|     @kafka_versions("all")
 | |
|     def test_consume_none(self):
 | |
|         fetch = FetchRequest(self.topic, 0, 0, 1024)
 | |
| 
 | |
|         fetch_resp, = self.client.send_fetch_request([fetch])
 | |
|         self.assertEqual(fetch_resp.error, 0)
 | |
|         self.assertEqual(fetch_resp.topic, self.topic)
 | |
|         self.assertEqual(fetch_resp.partition, 0)
 | |
| 
 | |
|         messages = list(fetch_resp.messages)
 | |
|         self.assertEqual(len(messages), 0)
 | |
| 
 | |
|     @kafka_versions("all")
 | |
|     def test_ensure_topic_exists(self):
 | |
| 
 | |
|         # assume that self.topic was created by setUp
 | |
|         # if so, this should succeed
 | |
|         self.client.ensure_topic_exists(self.topic, timeout=1)
 | |
| 
 | |
|         # ensure_topic_exists should fail with KafkaTimeoutError
 | |
|         with self.assertRaises(KafkaTimeoutError):
 | |
|             self.client.ensure_topic_exists(b"this_topic_doesnt_exist", timeout=0)
 | |
| 
 | |
|     ####################
 | |
|     #   Offset Tests   #
 | |
|     ####################
 | |
| 
 | |
|     @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.0")
 | |
|     def test_commit_fetch_offsets(self):
 | |
|         req = OffsetCommitRequest(self.topic, 0, 42, b"metadata")
 | |
|         (resp,) = self.client.send_offset_commit_request(b"group", [req])
 | |
|         self.assertEqual(resp.error, 0)
 | |
| 
 | |
|         req = OffsetFetchRequest(self.topic, 0)
 | |
|         (resp,) = self.client.send_offset_fetch_request(b"group", [req])
 | |
|         self.assertEqual(resp.error, 0)
 | |
|         self.assertEqual(resp.offset, 42)
 | |
|         self.assertEqual(resp.metadata, b"")  # Metadata isn't stored for now
 | 
