Add tests for encode_offset_request
This commit is contained in:
@@ -3,7 +3,7 @@ import unittest
|
||||
|
||||
from kafka import KafkaClient
|
||||
from kafka.common import (
|
||||
ProduceRequest, FetchRequest, Message, ChecksumError,
|
||||
OffsetRequest, ProduceRequest, FetchRequest, Message, ChecksumError,
|
||||
ConsumerFetchSizeTooSmall, ProduceResponse, FetchResponse,
|
||||
OffsetAndMessage, BrokerMetadata, PartitionMetadata,
|
||||
TopicAndPartition, KafkaUnavailableError, ProtocolError,
|
||||
@@ -522,9 +522,49 @@ class TestProtocol(unittest.TestCase):
|
||||
decoded = KafkaProtocol.decode_metadata_response(encoded)
|
||||
self.assertEqual(decoded, (node_brokers, topic_partitions))
|
||||
|
||||
@unittest.skip("Not Implemented")
|
||||
def test_encode_offset_request(self):
|
||||
pass
|
||||
expected = "".join([
|
||||
struct.pack(">i", 21), # Total length of the request
|
||||
struct.pack('>h', 2), # Message type = offset fetch
|
||||
struct.pack('>h', 0), # API version
|
||||
struct.pack('>i', 4), # Correlation ID
|
||||
struct.pack('>h3s', 3, "cid"), # The client ID
|
||||
struct.pack('>i', -1), # Replica Id
|
||||
struct.pack('>i', 0), # No topic/partitions
|
||||
])
|
||||
|
||||
encoded = KafkaProtocol.encode_offset_request("cid", 4)
|
||||
|
||||
self.assertEqual(encoded, expected)
|
||||
|
||||
def test_encode_offset_request__no_payload(self):
|
||||
expected = "".join([
|
||||
struct.pack(">i", 65), # Total length of the request
|
||||
|
||||
struct.pack('>h', 2), # Message type = offset fetch
|
||||
struct.pack('>h', 0), # API version
|
||||
struct.pack('>i', 4), # Correlation ID
|
||||
struct.pack('>h3s', 3, "cid"), # The client ID
|
||||
struct.pack('>i', -1), # Replica Id
|
||||
struct.pack('>i', 1), # Num topics
|
||||
struct.pack(">h6s", 6, "topic1"), # Topic for the request
|
||||
struct.pack(">i", 2), # Two partitions
|
||||
|
||||
struct.pack(">i", 3), # Partition 3
|
||||
struct.pack(">q", -1), # No time offset
|
||||
struct.pack(">i", 1), # One offset requested
|
||||
|
||||
struct.pack(">i", 4), # Partition 3
|
||||
struct.pack(">q", -1), # No time offset
|
||||
struct.pack(">i", 1), # One offset requested
|
||||
])
|
||||
|
||||
encoded = KafkaProtocol.encode_offset_request("cid", 4, [
|
||||
OffsetRequest('topic1', 3, -1, 1),
|
||||
OffsetRequest('topic1', 4, -1, 1),
|
||||
])
|
||||
|
||||
self.assertEqual(encoded, expected)
|
||||
|
||||
@unittest.skip("Not Implemented")
|
||||
def test_decode_offset_response(self):
|
||||
|
||||
Reference in New Issue
Block a user