Update kafka.common imports to Payloads namedtuples in test_protocol
This commit is contained in:
@@ -7,10 +7,10 @@ from . import unittest
|
||||
|
||||
from kafka.codec import has_snappy, gzip_decode, snappy_decode
|
||||
from kafka.common import (
|
||||
OffsetRequest, OffsetCommitRequest, OffsetFetchRequest,
|
||||
OffsetResponse, OffsetCommitResponse, OffsetFetchResponse,
|
||||
ProduceRequest, FetchRequest, Message, ChecksumError,
|
||||
ProduceResponse, FetchResponse, OffsetAndMessage,
|
||||
OffsetRequestPayload, OffsetCommitRequest, OffsetFetchRequest,
|
||||
OffsetResponsePayload, OffsetCommitResponse, OffsetFetchResponse,
|
||||
ProduceRequestPayload, FetchRequestPayload, Message, ChecksumError,
|
||||
ProduceResponsePayload, FetchResponsePayload, OffsetAndMessage,
|
||||
BrokerMetadata, TopicMetadata, PartitionMetadata, TopicAndPartition,
|
||||
KafkaUnavailableError, UnsupportedCodecError, ConsumerFetchSizeTooSmall,
|
||||
ProtocolError, ConsumerMetadataResponse
|
||||
@@ -335,28 +335,30 @@ class TestProtocol(unittest.TestCase):
|
||||
b"@1$%(Y!", # Random padding
|
||||
])
|
||||
|
||||
msgs = list(KafkaProtocol._decode_message_set_iter(encoded))
|
||||
msgs = MessageSet.decode(io.BytesIO(encoded))
|
||||
self.assertEqual(len(msgs), 2)
|
||||
msg1, msg2 = msgs
|
||||
|
||||
returned_offset1, decoded_message1 = msg1
|
||||
returned_offset2, decoded_message2 = msg2
|
||||
returned_offset1, msg_size1, decoded_message1 = msg1
|
||||
returned_offset2, msg_size2, decoded_message2 = msg2
|
||||
|
||||
self.assertEqual(returned_offset1, 0)
|
||||
self.assertEqual(decoded_message1, create_message(b"v1", b"k1"))
|
||||
self.assertEqual(decoded_message1.value, b"v1")
|
||||
self.assertEqual(decoded_message1.key, b"k1")
|
||||
|
||||
self.assertEqual(returned_offset2, 1)
|
||||
self.assertEqual(decoded_message2, create_message(b"v2", b"k2"))
|
||||
self.assertEqual(decoded_message2.value, b"v2")
|
||||
self.assertEqual(decoded_message2.key, b"k2")
|
||||
|
||||
@unittest.skip('needs updating for new protocol classes')
|
||||
def test_encode_produce_request(self):
|
||||
requests = [
|
||||
ProduceRequest(b"topic1", 0, [
|
||||
create_message(b"a"),
|
||||
create_message(b"b")
|
||||
ProduceRequestPayload("topic1", 0, [
|
||||
kafka.protocol.message.Message(b"a"),
|
||||
kafka.protocol.message.Message(b"b")
|
||||
]),
|
||||
ProduceRequest(b"topic2", 1, [
|
||||
create_message(b"c")
|
||||
ProduceRequestPayload("topic2", 1, [
|
||||
kafka.protocol.message.Message(b"c")
|
||||
])
|
||||
]
|
||||
|
||||
@@ -480,15 +482,15 @@ class TestProtocol(unittest.TestCase):
|
||||
|
||||
responses = list(KafkaProtocol.decode_fetch_response(encoded))
|
||||
def expand_messages(response):
|
||||
return FetchResponse(response.topic, response.partition,
|
||||
return FetchResponsePayload(response.topic, response.partition,
|
||||
response.error, response.highwaterMark,
|
||||
list(response.messages))
|
||||
|
||||
expanded_responses = list(map(expand_messages, responses))
|
||||
expect = [FetchResponse(t1, 0, 0, 10, [OffsetAndMessage(0, msgs[0]),
|
||||
expect = [FetchResponsePayload(t1, 0, 0, 10, [OffsetAndMessage(0, msgs[0]),
|
||||
OffsetAndMessage(0, msgs[1])]),
|
||||
FetchResponse(t1, 1, 1, 20, [OffsetAndMessage(0, msgs[2])]),
|
||||
FetchResponse(t2, 0, 0, 30, [OffsetAndMessage(0, msgs[3]),
|
||||
FetchResponsePayload(t1, 1, 1, 20, [OffsetAndMessage(0, msgs[2])]),
|
||||
FetchResponsePayload(t2, 0, 0, 30, [OffsetAndMessage(0, msgs[3]),
|
||||
OffsetAndMessage(0, msgs[4])])]
|
||||
self.assertEqual(expanded_responses, expect)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user