Explicit testing of protocol errors. Make tests more explicit, and start working on intermittent failures in test_encode_fetch_request and test_encode_produc_request
This commit is contained in:
@@ -118,3 +118,7 @@ class ConsumerFetchSizeTooSmall(KafkaError):
|
|||||||
|
|
||||||
class ConsumerNoMoreData(KafkaError):
|
class ConsumerNoMoreData(KafkaError):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class ProtocolError(KafkaError):
|
||||||
|
pass
|
||||||
|
|||||||
@@ -8,7 +8,7 @@ from kafka.codec import (
|
|||||||
from kafka.common import (
|
from kafka.common import (
|
||||||
BrokerMetadata, PartitionMetadata, Message, OffsetAndMessage,
|
BrokerMetadata, PartitionMetadata, Message, OffsetAndMessage,
|
||||||
ProduceResponse, FetchResponse, OffsetResponse,
|
ProduceResponse, FetchResponse, OffsetResponse,
|
||||||
OffsetCommitResponse, OffsetFetchResponse,
|
OffsetCommitResponse, OffsetFetchResponse, ProtocolError,
|
||||||
BufferUnderflowError, ChecksumError, ConsumerFetchSizeTooSmall
|
BufferUnderflowError, ChecksumError, ConsumerFetchSizeTooSmall
|
||||||
)
|
)
|
||||||
from kafka.util import (
|
from kafka.util import (
|
||||||
@@ -68,8 +68,7 @@ class KafkaProtocol(object):
|
|||||||
message_set = ""
|
message_set = ""
|
||||||
for message in messages:
|
for message in messages:
|
||||||
encoded_message = KafkaProtocol._encode_message(message)
|
encoded_message = KafkaProtocol._encode_message(message)
|
||||||
message_set += struct.pack('>qi%ds' % len(encoded_message), 0,
|
message_set += struct.pack('>qi%ds' % len(encoded_message), 0, len(encoded_message), encoded_message)
|
||||||
len(encoded_message), encoded_message)
|
|
||||||
return message_set
|
return message_set
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
@@ -96,7 +95,7 @@ class KafkaProtocol(object):
|
|||||||
crc = zlib.crc32(msg)
|
crc = zlib.crc32(msg)
|
||||||
msg = struct.pack('>i%ds' % len(msg), crc, msg)
|
msg = struct.pack('>i%ds' % len(msg), crc, msg)
|
||||||
else:
|
else:
|
||||||
raise Exception("Unexpected magic number: %d" % message.magic)
|
raise ProtocolError("Unexpected magic number: %d" % message.magic)
|
||||||
return msg
|
return msg
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
|
|||||||
@@ -6,7 +6,7 @@ from kafka.common import (
|
|||||||
ProduceRequest, FetchRequest, Message, ChecksumError,
|
ProduceRequest, FetchRequest, Message, ChecksumError,
|
||||||
ConsumerFetchSizeTooSmall, ProduceResponse, FetchResponse,
|
ConsumerFetchSizeTooSmall, ProduceResponse, FetchResponse,
|
||||||
OffsetAndMessage, BrokerMetadata, PartitionMetadata,
|
OffsetAndMessage, BrokerMetadata, PartitionMetadata,
|
||||||
TopicAndPartition, KafkaUnavailableError,
|
TopicAndPartition, KafkaUnavailableError, ProtocolError,
|
||||||
LeaderUnavailableError, PartitionUnavailableError
|
LeaderUnavailableError, PartitionUnavailableError
|
||||||
)
|
)
|
||||||
from kafka.codec import (
|
from kafka.codec import (
|
||||||
@@ -93,12 +93,20 @@ class TestProtocol(unittest.TestCase):
|
|||||||
def test_encode_message(self):
|
def test_encode_message(self):
|
||||||
message = create_message("test", "key")
|
message = create_message("test", "key")
|
||||||
encoded = KafkaProtocol._encode_message(message)
|
encoded = KafkaProtocol._encode_message(message)
|
||||||
expect = "\xaa\xf1\x8f[\x00\x00\x00\x00\x00\x03key\x00\x00\x00\x04test"
|
expect = (
|
||||||
|
"\xaa\xf1\x8f\x5b" # CRC
|
||||||
|
"\x00" # Magic
|
||||||
|
"\x00" # Flags
|
||||||
|
"\x00\x00\x00\x03" # Key Length
|
||||||
|
"key" # Key contents
|
||||||
|
"\x00\x00\x00\x04" # Msg Length
|
||||||
|
"test" # Msg contents
|
||||||
|
)
|
||||||
self.assertEqual(encoded, expect)
|
self.assertEqual(encoded, expect)
|
||||||
|
|
||||||
def test_encode_message_failure(self):
|
def test_encode_message_failure(self):
|
||||||
self.assertRaises(Exception, KafkaProtocol._encode_message,
|
with self.assertRaises(ProtocolError):
|
||||||
Message(1, 0, "key", "test"))
|
KafkaProtocol._encode_message(Message(1, 0, "key", "test"))
|
||||||
|
|
||||||
def test_encode_message_set(self):
|
def test_encode_message_set(self):
|
||||||
message_set = [create_message("v1", "k1"), create_message("v2", "k2")]
|
message_set = [create_message("v1", "k1"), create_message("v2", "k2")]
|
||||||
@@ -224,15 +232,28 @@ class TestProtocol(unittest.TestCase):
|
|||||||
def test_encode_fetch_request(self):
|
def test_encode_fetch_request(self):
|
||||||
requests = [FetchRequest("topic1", 0, 10, 1024),
|
requests = [FetchRequest("topic1", 0, 10, 1024),
|
||||||
FetchRequest("topic2", 1, 20, 100)]
|
FetchRequest("topic2", 1, 20, 100)]
|
||||||
expect = ('\x00\x00\x00Y\x00\x01\x00\x00\x00\x00\x00\x03\x00\x07'
|
|
||||||
'client1\xff\xff\xff\xff\x00\x00\x00\x02\x00\x00\x00d\x00'
|
possibility1 = (
|
||||||
'\x00\x00\x02\x00\x06topic1\x00\x00\x00\x01\x00\x00\x00\x00'
|
'\x00\x00\x00Y\x00\x01\x00\x00\x00\x00\x00\x03\x00\x07'
|
||||||
'\x00\x00\x00\x00\x00\x00\x00\n\x00\x00\x04\x00\x00\x06'
|
'client1\xff\xff\xff\xff\x00\x00\x00\x02\x00\x00\x00d\x00'
|
||||||
'topic2\x00\x00\x00\x01\x00\x00\x00\x01\x00\x00\x00\x00\x00'
|
'\x00\x00\x02\x00\x06topic1\x00\x00\x00\x01\x00\x00\x00\x00'
|
||||||
'\x00\x00\x14\x00\x00\x00d')
|
'\x00\x00\x00\x00\x00\x00\x00\n\x00\x00\x04\x00\x00\x06'
|
||||||
encoded = KafkaProtocol.encode_fetch_request("client1", 3, requests, 2,
|
'topic2\x00\x00\x00\x01\x00\x00\x00\x01\x00\x00\x00\x00\x00'
|
||||||
100)
|
'\x00\x00\x14\x00\x00\x00d'
|
||||||
self.assertEqual(encoded, expect)
|
)
|
||||||
|
|
||||||
|
# Todo, this isn't currently different
|
||||||
|
possibility2 = (
|
||||||
|
'\x00\x00\x00Y\x00\x01\x00\x00\x00\x00\x00\x03\x00\x07'
|
||||||
|
'client1\xff\xff\xff\xff\x00\x00\x00\x02\x00\x00\x00d\x00'
|
||||||
|
'\x00\x00\x02\x00\x06topic1\x00\x00\x00\x01\x00\x00\x00\x00'
|
||||||
|
'\x00\x00\x00\x00\x00\x00\x00\n\x00\x00\x04\x00\x00\x06'
|
||||||
|
'topic2\x00\x00\x00\x01\x00\x00\x00\x01\x00\x00\x00\x00\x00'
|
||||||
|
'\x00\x00\x14\x00\x00\x00d'
|
||||||
|
)
|
||||||
|
|
||||||
|
encoded = KafkaProtocol.encode_fetch_request("client1", 3, requests, 2, 100)
|
||||||
|
self.assertIn(encoded, [ possibility1, possibility2 ])
|
||||||
|
|
||||||
def test_decode_fetch_response(self):
|
def test_decode_fetch_response(self):
|
||||||
t1 = "topic1"
|
t1 = "topic1"
|
||||||
|
|||||||
Reference in New Issue
Block a user