Convert serveral tests to struct.pack
This commit is contained in:
@@ -37,24 +37,24 @@ class TestProtocol(unittest.TestCase):
|
||||
self.assertEqual(msg.key, None)
|
||||
# Need to decode to check since gzipped payload is non-deterministic
|
||||
decoded = gzip_decode(msg.value)
|
||||
expect = (
|
||||
"\x00\x00\x00\x00\x00\x00\x00\x00" # MsgSet1 Offset
|
||||
"\x00\x00\x00\x10" # MsgSet1 Size
|
||||
"\x4c\x9f\x5b\xc2" # Msg1 CRC
|
||||
"\x00" # Msg1 Magic
|
||||
"\x00" # Msg1 Flags
|
||||
"\xff\xff\xff\xff" # Msg1, null key
|
||||
"\x00\x00\x00\x02" # Msg1, msg Size
|
||||
"v1" # Msg1, contents
|
||||
"\x00\x00\x00\x00\x00\x00\x00\x00" # MsgSet2 Offset
|
||||
"\x00\x00\x00\x10" # MsgSet2 Size
|
||||
"\xd5\x96\x0a\x78" # Msg2, CRC
|
||||
"\x00" # Msg2, magic
|
||||
"\x00" # Msg2, flags
|
||||
"\xff\xff\xff\xff" # Msg2, null key
|
||||
"\x00\x00\x00\x02" # Msg2, msg size
|
||||
"v2" # Msg2, contents
|
||||
)
|
||||
expect = "".join([
|
||||
struct.pack(">q", 0), # MsgSet offset
|
||||
struct.pack(">i", 16), # MsgSet size
|
||||
struct.pack(">i", 1285512130), # CRC
|
||||
struct.pack(">bb", 0, 0), # Magic, flags
|
||||
struct.pack(">i", -1), # -1 indicates a null key
|
||||
struct.pack(">i", 2), # Msg length (bytes)
|
||||
"v1", # Message contents
|
||||
|
||||
struct.pack(">q", 0), # MsgSet offset
|
||||
struct.pack(">i", 16), # MsgSet size
|
||||
struct.pack(">i", -711587208), # CRC
|
||||
struct.pack(">bb", 0, 0), # Magic, flags
|
||||
struct.pack(">i", -1), # -1 indicates a null key
|
||||
struct.pack(">i", 2), # Msg length (bytes)
|
||||
"v2", # Message contents
|
||||
])
|
||||
|
||||
self.assertEqual(decoded, expect)
|
||||
|
||||
@unittest.skipUnless(has_snappy(), "Snappy not available")
|
||||
@@ -65,89 +65,101 @@ class TestProtocol(unittest.TestCase):
|
||||
self.assertEqual(msg.attributes, KafkaProtocol.ATTRIBUTE_CODEC_MASK &
|
||||
KafkaProtocol.CODEC_SNAPPY)
|
||||
self.assertEqual(msg.key, None)
|
||||
expect = (
|
||||
"\x00\x00\x00\x00\x00\x00\x00\x00" # MsgSet1 Offset
|
||||
"\x00\x00\x00\x10" # MsgSet1 Size
|
||||
"\x4c\x9f\x5b\xc2" # Msg1 CRC
|
||||
"\x00" # Msg1 Magic
|
||||
"\x00" # Msg1 Flags
|
||||
"\xff\xff\xff\xff" # Msg1, null key
|
||||
"\x00\x00\x00\x02" # Msg1, msg Size
|
||||
"v1" # Msg1, contents
|
||||
"\x00\x00\x00\x00\x00\x00\x00\x00" # MsgSet2 Offset
|
||||
"\x00\x00\x00\x10" # MsgSet2 Size
|
||||
"\xd5\x96\x0a\x78" # Msg2, CRC
|
||||
"\x00" # Msg2, magic
|
||||
"\x00" # Msg2, flags
|
||||
"\xff\xff\xff\xff" # Msg2, null key
|
||||
"\x00\x00\x00\x02" # Msg2, msg size
|
||||
"v2" # Msg2, contents
|
||||
)
|
||||
expect = "".join([
|
||||
struct.pack(">q", 0), # MsgSet offset
|
||||
struct.pack(">i", 16), # MsgSet size
|
||||
struct.pack(">i", 1285512130), # CRC
|
||||
struct.pack(">bb", 0, 0), # Magic, flags
|
||||
struct.pack(">i", -1), # -1 indicates a null key
|
||||
struct.pack(">i", 2), # Msg length (bytes)
|
||||
"v1", # Message contents
|
||||
|
||||
struct.pack(">q", 0), # MsgSet offset
|
||||
struct.pack(">i", 16), # MsgSet size
|
||||
struct.pack(">i", -711587208), # CRC
|
||||
struct.pack(">bb", 0, 0), # Magic, flags
|
||||
struct.pack(">i", -1), # -1 indicates a null key
|
||||
struct.pack(">i", 2), # Msg length (bytes)
|
||||
"v2", # Message contents
|
||||
])
|
||||
|
||||
self.assertEqual(msg.value, expect)
|
||||
|
||||
def test_encode_message_header(self):
|
||||
expect = (
|
||||
"\x00\n" # API Key
|
||||
"\x00\x00" # API Version
|
||||
"\x00\x00\x00\x04" # CorrelationId
|
||||
"\x00\x07" # Client length
|
||||
"client3" # Client Id
|
||||
)
|
||||
expect = "".join([
|
||||
struct.pack(">h", 10), # API Key
|
||||
struct.pack(">h", 0), # API Version
|
||||
struct.pack(">i", 4), # Correlation Id
|
||||
struct.pack(">h", len("client3")), # Length of clientId
|
||||
"client3", # ClientId
|
||||
])
|
||||
|
||||
encoded = KafkaProtocol._encode_message_header("client3", 4, 10)
|
||||
self.assertEqual(encoded, expect)
|
||||
|
||||
def test_encode_message(self):
|
||||
message = create_message("test", "key")
|
||||
encoded = KafkaProtocol._encode_message(message)
|
||||
expect = (
|
||||
"\xaa\xf1\x8f\x5b" # CRC
|
||||
"\x00" # Magic
|
||||
"\x00" # Flags
|
||||
"\x00\x00\x00\x03" # Key Length
|
||||
"key" # Key contents
|
||||
"\x00\x00\x00\x04" # Msg Length
|
||||
"test" # Msg contents
|
||||
)
|
||||
expect = "".join([
|
||||
struct.pack(">i", -1427009701), # CRC
|
||||
struct.pack(">bb", 0, 0), # Magic, flags
|
||||
struct.pack(">i", 3), # Length of key
|
||||
"key", # key
|
||||
struct.pack(">i", 4), # Length of value
|
||||
"test", # value
|
||||
])
|
||||
|
||||
self.assertEqual(encoded, expect)
|
||||
|
||||
def test_decode_message(self):
|
||||
encoded = "".join([
|
||||
struct.pack(">i", -1427009701), # CRC
|
||||
struct.pack(">bb", 0, 0), # Magic, flags
|
||||
struct.pack(">i", 3), # Length of key
|
||||
"key", # key
|
||||
struct.pack(">i", 4), # Length of value
|
||||
"test", # value
|
||||
])
|
||||
|
||||
offset = 10
|
||||
(returned_offset, decoded_message) = list(KafkaProtocol._decode_message(encoded, offset))[0]
|
||||
|
||||
self.assertEqual(returned_offset, offset)
|
||||
self.assertEqual(decoded_message, create_message("test", "key"))
|
||||
|
||||
def test_encode_message_failure(self):
|
||||
with self.assertRaises(ProtocolError):
|
||||
KafkaProtocol._encode_message(Message(1, 0, "key", "test"))
|
||||
|
||||
def test_encode_message_set(self):
|
||||
message_set = [create_message("v1", "k1"), create_message("v2", "k2")]
|
||||
message_set = [
|
||||
create_message("v1", "k1"),
|
||||
create_message("v2", "k2")
|
||||
]
|
||||
|
||||
encoded = KafkaProtocol._encode_message_set(message_set)
|
||||
expect = (
|
||||
"\x00\x00\x00\x00\x00\x00\x00\x00" # Msgset1, Offset (Meaningless)
|
||||
"\x00\x00\x00\x12" # Msgset1, Msg Size
|
||||
"\x57\xe7\x49\x6e" # Msg1, CRC
|
||||
"\x00" # Msg1, Magic
|
||||
"\x00" # Msg1, Flags
|
||||
"\x00\x00\x00\x02" # Msg1, key size
|
||||
"k1" # Msg1, key
|
||||
"\x00\x00\x00\x02" # Msg1, value size
|
||||
"v1" # Msg1, value
|
||||
"\x00\x00\x00\x00\x00\x00\x00\x00" # Msgset2, Offset (Meaningless)
|
||||
"\x00\x00\x00\x12" # Msgset2, Msg Size
|
||||
"\xff\x06\x02\x49" # Msg2, CRC
|
||||
"\x00" # Msg2, Magic
|
||||
"\x00" # Msg2, flags
|
||||
"\x00\x00\x00\x02" # Msg2, key size
|
||||
"k2" # Msg2, key
|
||||
"\x00\x00\x00\x02" # Msg2, value size
|
||||
"v2" # MSg2, value
|
||||
)
|
||||
expect = "".join([
|
||||
struct.pack(">q", 0), # MsgSet Offset
|
||||
struct.pack(">i", 18), # Msg Size
|
||||
struct.pack(">i", 1474775406), # CRC
|
||||
struct.pack(">bb", 0, 0), # Magic, flags
|
||||
struct.pack(">i", 2), # Length of key
|
||||
"k1", # Key
|
||||
struct.pack(">i", 2), # Length of value
|
||||
"v1", # Value
|
||||
|
||||
struct.pack(">q", 0), # MsgSet Offset
|
||||
struct.pack(">i", 18), # Msg Size
|
||||
struct.pack(">i", -16383415), # CRC
|
||||
struct.pack(">bb", 0, 0), # Magic, flags
|
||||
struct.pack(">i", 2), # Length of key
|
||||
"k2", # Key
|
||||
struct.pack(">i", 2), # Length of value
|
||||
"v2", # Value
|
||||
])
|
||||
|
||||
self.assertEqual(encoded, expect)
|
||||
|
||||
def test_decode_message(self):
|
||||
encoded = "\xaa\xf1\x8f[\x00\x00\x00\x00\x00\x03key\x00\x00\x00\x04test"
|
||||
offset = 10
|
||||
(returned_offset, decoded_message) = \
|
||||
list(KafkaProtocol._decode_message(encoded, offset))[0]
|
||||
self.assertEqual(returned_offset, offset)
|
||||
self.assertEqual(decoded_message, create_message("test", "key"))
|
||||
|
||||
def test_decode_message_set(self):
|
||||
encoded = ('\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x10L\x9f[\xc2'
|
||||
'\x00\x00\xff\xff\xff\xff\x00\x00\x00\x02v1\x00\x00\x00\x00'
|
||||
|
||||
Reference in New Issue
Block a user