Return PartialMessage object in MessageSet.decode if message is truncated by max_bytes
This commit is contained in:
@@ -42,6 +42,11 @@ class Message(Struct):
|
|||||||
magic=fields[1], attributes=fields[2], crc=fields[0])
|
magic=fields[1], attributes=fields[2], crc=fields[0])
|
||||||
|
|
||||||
|
|
||||||
|
class PartialMessage(bytes):
|
||||||
|
def __repr__(self):
|
||||||
|
return 'PartialMessage(%s)' % self
|
||||||
|
|
||||||
|
|
||||||
class MessageSet(AbstractType):
|
class MessageSet(AbstractType):
|
||||||
ITEM = Schema(
|
ITEM = Schema(
|
||||||
('offset', Int64),
|
('offset', Int64),
|
||||||
@@ -72,8 +77,9 @@ class MessageSet(AbstractType):
|
|||||||
bytes_to_read = Int32.decode(data)
|
bytes_to_read = Int32.decode(data)
|
||||||
items = []
|
items = []
|
||||||
|
|
||||||
# We need at least 12 bytes to read offset + message size
|
# We need at least 8 + 4 + 14 bytes to read offset + message size + message
|
||||||
while bytes_to_read >= 12:
|
# (14 bytes is a message w/ null key and null value)
|
||||||
|
while bytes_to_read >= 26:
|
||||||
offset = Int64.decode(data)
|
offset = Int64.decode(data)
|
||||||
bytes_to_read -= 8
|
bytes_to_read -= 8
|
||||||
|
|
||||||
@@ -91,8 +97,9 @@ class MessageSet(AbstractType):
|
|||||||
items.append((offset, message_size, message))
|
items.append((offset, message_size, message))
|
||||||
|
|
||||||
# If any bytes are left over, clear them from the buffer
|
# If any bytes are left over, clear them from the buffer
|
||||||
|
# and append a PartialMessage to signal that max_bytes may be too small
|
||||||
if bytes_to_read:
|
if bytes_to_read:
|
||||||
data.read(bytes_to_read)
|
items.append((None, None, PartialMessage(data.read(bytes_to_read))))
|
||||||
|
|
||||||
return items
|
return items
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user