Update more tests, fix intermittent failure
This commit is contained in:
@@ -249,15 +249,15 @@ class TestProtocol(unittest.TestCase):
|
||||
struct.pack(">i", 2), # Length of value
|
||||
"v1", # Value
|
||||
|
||||
struct.pack(">q", 1), # MsgSet Offset
|
||||
struct.pack(">i", 18), # Msg Size
|
||||
struct.pack(">i", -16383415), # CRC
|
||||
struct.pack(">bb", 0, 0), # Magic, flags
|
||||
struct.pack(">i", 2), # Length of key
|
||||
"k2", # Key
|
||||
struct.pack(">i", 2), # Length of value
|
||||
"v2", # Value
|
||||
"@1$%(Y!", # Random padding
|
||||
struct.pack(">q", 1), # MsgSet Offset
|
||||
struct.pack(">i", 18), # Msg Size
|
||||
struct.pack(">i", -16383415), # CRC
|
||||
struct.pack(">bb", 0, 0), # Magic, flags
|
||||
struct.pack(">i", 2), # Length of key
|
||||
"k2", # Key
|
||||
struct.pack(">i", 2), # Length of value
|
||||
"v2", # Value
|
||||
"@1$%(Y!", # Random padding
|
||||
])
|
||||
|
||||
msgs = list(KafkaProtocol._decode_message_set_iter(encoded))
|
||||
@@ -284,19 +284,59 @@ class TestProtocol(unittest.TestCase):
|
||||
])
|
||||
]
|
||||
|
||||
expect = ('\x00\x00\x00\x94\x00\x00\x00\x00\x00\x00\x00\x02\x00\x07'
|
||||
'client1\x00\x02\x00\x00\x00d\x00\x00\x00\x02\x00\x06topic1'
|
||||
'\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x006\x00\x00\x00'
|
||||
'\x00\x00\x00\x00\x00\x00\x00\x00\x0fQ\xdf:2\x00\x00\xff\xff'
|
||||
'\xff\xff\x00\x00\x00\x01a\x00\x00\x00\x00\x00\x00\x00\x00'
|
||||
'\x00\x00\x00\x0f\xc8\xd6k\x88\x00\x00\xff\xff\xff\xff\x00'
|
||||
'\x00\x00\x01b\x00\x06topic2\x00\x00\x00\x01\x00\x00\x00\x01'
|
||||
'\x00\x00\x00\x1b\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
|
||||
'\x00\x0f\xbf\xd1[\x1e\x00\x00\xff\xff\xff\xff\x00\x00\x00'
|
||||
'\x01c')
|
||||
encoded = KafkaProtocol.encode_produce_request("client1", 2, requests,
|
||||
2, 100)
|
||||
self.assertEqual(encoded, expect)
|
||||
msg_a_binary = KafkaProtocol._encode_message(create_message("a"))
|
||||
msg_b_binary = KafkaProtocol._encode_message(create_message("b"))
|
||||
msg_c_binary = KafkaProtocol._encode_message(create_message("c"))
|
||||
|
||||
header = "".join([
|
||||
struct.pack('>i', 0x94), # The length of the message overall
|
||||
struct.pack('>h', 0), # Msg Header, Message type = Produce
|
||||
struct.pack('>h', 0), # Msg Header, API version
|
||||
struct.pack('>i', 2), # Msg Header, Correlation ID
|
||||
struct.pack('>h7s', 7, "client1"), # Msg Header, The client ID
|
||||
struct.pack('>h', 2), # Num acks required
|
||||
struct.pack('>i', 100), # Request Timeout
|
||||
struct.pack('>i', 2), # The number of requests
|
||||
])
|
||||
|
||||
total_len = len(msg_a_binary) + len(msg_b_binary)
|
||||
topic1 = "".join([
|
||||
struct.pack('>h6s', 6, 'topic1'), # The topic1
|
||||
struct.pack('>i', 1), # One message set
|
||||
struct.pack('>i', 0), # Partition 0
|
||||
struct.pack('>i', total_len + 24), # Size of the incoming message set
|
||||
struct.pack('>q', 0), # No offset specified
|
||||
struct.pack('>i', len(msg_a_binary)), # Length of message
|
||||
msg_a_binary, # Actual message
|
||||
struct.pack('>q', 0), # No offset specified
|
||||
struct.pack('>i', len(msg_b_binary)), # Length of message
|
||||
msg_b_binary, # Actual message
|
||||
])
|
||||
|
||||
topic2 = "".join([
|
||||
struct.pack('>h6s', 6, 'topic2'), # The topic1
|
||||
struct.pack('>i', 1), # One message set
|
||||
struct.pack('>i', 1), # Partition 1
|
||||
struct.pack('>i', len(msg_c_binary) + 12), # Size of the incoming message set
|
||||
struct.pack('>q', 0), # No offset specified
|
||||
struct.pack('>i', len(msg_c_binary)), # Length of message
|
||||
msg_c_binary, # Actual message
|
||||
])
|
||||
|
||||
expect1 = "".join([
|
||||
header,
|
||||
topic1,
|
||||
topic2
|
||||
])
|
||||
|
||||
expect2 = "".join([
|
||||
header,
|
||||
topic2,
|
||||
topic1
|
||||
])
|
||||
|
||||
encoded = KafkaProtocol.encode_produce_request("client1", 2, requests, 2, 100)
|
||||
self.assertIn(encoded, [ expect1, expect2 ])
|
||||
|
||||
def test_decode_produce_response(self):
|
||||
t1 = "topic1"
|
||||
@@ -314,17 +354,7 @@ class TestProtocol(unittest.TestCase):
|
||||
requests = [FetchRequest("topic1", 0, 10, 1024),
|
||||
FetchRequest("topic2", 1, 20, 100)]
|
||||
|
||||
possibility1 = (
|
||||
'\x00\x00\x00Y\x00\x01\x00\x00\x00\x00\x00\x03\x00\x07'
|
||||
'client1\xff\xff\xff\xff\x00\x00\x00\x02\x00\x00\x00d\x00'
|
||||
'\x00\x00\x02\x00\x06topic1\x00\x00\x00\x01\x00\x00\x00\x00'
|
||||
'\x00\x00\x00\x00\x00\x00\x00\n\x00\x00\x04\x00\x00\x06'
|
||||
'topic2\x00\x00\x00\x01\x00\x00\x00\x01\x00\x00\x00\x00\x00'
|
||||
'\x00\x00\x14\x00\x00\x00d'
|
||||
)
|
||||
|
||||
# Todo, this isn't currently different
|
||||
possibility2 = (
|
||||
expected = (
|
||||
'\x00\x00\x00Y\x00\x01\x00\x00\x00\x00\x00\x03\x00\x07'
|
||||
'client1\xff\xff\xff\xff\x00\x00\x00\x02\x00\x00\x00d\x00'
|
||||
'\x00\x00\x02\x00\x06topic1\x00\x00\x00\x01\x00\x00\x00\x00'
|
||||
@@ -334,7 +364,7 @@ class TestProtocol(unittest.TestCase):
|
||||
)
|
||||
|
||||
encoded = KafkaProtocol.encode_fetch_request("client1", 3, requests, 2, 100)
|
||||
self.assertIn(encoded, [ possibility1, possibility2 ])
|
||||
self.assertEqual(encoded, expected)
|
||||
|
||||
def test_decode_fetch_response(self):
|
||||
t1 = "topic1"
|
||||
@@ -365,15 +395,34 @@ class TestProtocol(unittest.TestCase):
|
||||
self.assertEqual(expanded_responses, expect)
|
||||
|
||||
def test_encode_metadata_request_no_topics(self):
|
||||
expected = "".join([
|
||||
struct.pack(">i", 17), # Total length of the request
|
||||
struct.pack('>h', 3), # API key metadata fetch
|
||||
struct.pack('>h', 0), # API version
|
||||
struct.pack('>i', 4), # Correlation ID
|
||||
struct.pack('>h3s', 3, "cid"), # The client ID
|
||||
struct.pack('>i', 0), # No topics, give all the data!
|
||||
])
|
||||
|
||||
encoded = KafkaProtocol.encode_metadata_request("cid", 4)
|
||||
self.assertEqual(encoded, '\x00\x00\x00\x11\x00\x03\x00\x00\x00\x00'
|
||||
'\x00\x04\x00\x03cid\x00\x00\x00\x00')
|
||||
|
||||
self.assertEqual(encoded, expected)
|
||||
|
||||
def test_encode_metadata_request_with_topics(self):
|
||||
expected = "".join([
|
||||
struct.pack(">i", 25), # Total length of the request
|
||||
struct.pack('>h', 3), # API key metadata fetch
|
||||
struct.pack('>h', 0), # API version
|
||||
struct.pack('>i', 4), # Correlation ID
|
||||
struct.pack('>h3s', 3, "cid"), # The client ID
|
||||
struct.pack('>i', 2), # Number of topics in the request
|
||||
struct.pack('>h2s', 2, "t1"), # Topic "t1"
|
||||
struct.pack('>h2s', 2, "t2"), # Topic "t2"
|
||||
])
|
||||
|
||||
encoded = KafkaProtocol.encode_metadata_request("cid", 4, ["t1", "t2"])
|
||||
self.assertEqual(encoded, '\x00\x00\x00\x19\x00\x03\x00\x00\x00\x00'
|
||||
'\x00\x04\x00\x03cid\x00\x00\x00\x02\x00\x02'
|
||||
't1\x00\x02t2')
|
||||
|
||||
self.assertEqual(encoded, expected)
|
||||
|
||||
def _create_encoded_metadata_response(self, broker_data, topic_data,
|
||||
topic_errors, partition_errors):
|
||||
@@ -408,6 +457,7 @@ class TestProtocol(unittest.TestCase):
|
||||
1: BrokerMetadata(1, "brokers1.kafka.rdio.com", 1001),
|
||||
3: BrokerMetadata(3, "brokers2.kafka.rdio.com", 1000)
|
||||
}
|
||||
|
||||
topic_partitions = {
|
||||
"topic1": {
|
||||
0: PartitionMetadata("topic1", 0, 1, (0, 2), (2,)),
|
||||
@@ -438,7 +488,6 @@ class TestProtocol(unittest.TestCase):
|
||||
def test_decode_offset_response(self):
|
||||
pass
|
||||
|
||||
|
||||
@unittest.skip("Not Implemented")
|
||||
def test_encode_offset_commit_request(self):
|
||||
pass
|
||||
|
||||
Reference in New Issue
Block a user