Change KafkaProtocol to encode/decode Structs, not bytes

- add a Payload suffix to the kafka.common Request/Response namedtuples
  - OffsetFetch and OffsetCommit still need to be converted
This commit is contained in:
Dana Powers
2015-12-09 15:20:57 -08:00
parent 9740b2b88b
commit d0de279459
2 changed files with 109 additions and 249 deletions

View File

@@ -21,24 +21,24 @@ ConsumerMetadataResponse = namedtuple("ConsumerMetadataResponse",
["error", "nodeId", "host", "port"]) ["error", "nodeId", "host", "port"])
# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ProduceAPI # https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ProduceAPI
ProduceRequest = namedtuple("ProduceRequest", ProduceRequestPayload = namedtuple("ProduceRequestPayload",
["topic", "partition", "messages"]) ["topic", "partition", "messages"])
ProduceResponse = namedtuple("ProduceResponse", ProduceResponsePayload = namedtuple("ProduceResponsePayload",
["topic", "partition", "error", "offset"]) ["topic", "partition", "error", "offset"])
# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-FetchAPI # https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-FetchAPI
FetchRequest = namedtuple("FetchRequest", FetchRequestPayload = namedtuple("FetchRequest",
["topic", "partition", "offset", "max_bytes"]) ["topic", "partition", "offset", "max_bytes"])
FetchResponse = namedtuple("FetchResponse", FetchResponsePayload = namedtuple("FetchResponse",
["topic", "partition", "error", "highwaterMark", "messages"]) ["topic", "partition", "error", "highwaterMark", "messages"])
# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI # https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI
OffsetRequest = namedtuple("OffsetRequest", OffsetRequestPayload = namedtuple("OffsetRequest",
["topic", "partition", "time", "max_offsets"]) ["topic", "partition", "time", "max_offsets"])
OffsetResponse = namedtuple("OffsetResponse", OffsetResponsePayload = namedtuple("OffsetResponse",
["topic", "partition", "error", "offsets"]) ["topic", "partition", "error", "offsets"])
# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI # https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI

View File

@@ -7,16 +7,21 @@ import six
from six.moves import xrange from six.moves import xrange
import kafka.common
import kafka.protocol.commit
import kafka.protocol.fetch
import kafka.protocol.message
import kafka.protocol.metadata
import kafka.protocol.offset
import kafka.protocol.produce
from kafka.codec import ( from kafka.codec import (
gzip_encode, gzip_decode, snappy_encode, snappy_decode gzip_encode, gzip_decode, snappy_encode, snappy_decode
) )
from kafka.common import ( from kafka.common import (
Message, OffsetAndMessage, TopicAndPartition, OffsetCommitResponse, OffsetFetchResponse,
BrokerMetadata, TopicMetadata, PartitionMetadata, ProtocolError, ChecksumError,
MetadataResponse, ProduceResponse, FetchResponse, UnsupportedCodecError,
OffsetResponse, OffsetCommitResponse, OffsetFetchResponse,
ProtocolError, BufferUnderflowError, ChecksumError,
ConsumerFetchSizeTooSmall, UnsupportedCodecError,
ConsumerMetadataResponse ConsumerMetadataResponse
) )
from kafka.util import ( from kafka.util import (
@@ -114,41 +119,6 @@ class KafkaProtocol(object):
raise ProtocolError("Unexpected magic number: %d" % message.magic) raise ProtocolError("Unexpected magic number: %d" % message.magic)
return msg return msg
@classmethod
def _decode_message_set_iter(cls, data):
"""
Iteratively decode a MessageSet
Reads repeated elements of (offset, message), calling decode_message
to decode a single message. Since compressed messages contain further
MessageSets, these two methods have been decoupled so that they may
recurse easily.
"""
cur = 0
read_message = False
while cur < len(data):
try:
((offset, ), cur) = relative_unpack('>q', data, cur)
(msg, cur) = read_int_string(data, cur)
for (offset, message) in KafkaProtocol._decode_message(msg, offset):
read_message = True
yield OffsetAndMessage(offset, message)
except BufferUnderflowError:
# NOTE: Not sure this is correct error handling:
# Is it possible to get a BUE if the message set is somewhere
# in the middle of the fetch response? If so, we probably have
# an issue that's not fetch size too small.
# Aren't we ignoring errors if we fail to unpack data by
# raising StopIteration()?
# If _decode_message() raises a ChecksumError, couldn't that
# also be due to the fetch size being too small?
if read_message is False:
# If we get a partial read of a message, but haven't
# yielded anything there's a problem
raise ConsumerFetchSizeTooSmall()
else:
raise StopIteration()
@classmethod @classmethod
def _decode_message(cls, data, offset): def _decode_message(cls, data, offset):
""" """
@@ -169,7 +139,7 @@ class KafkaProtocol(object):
codec = att & ATTRIBUTE_CODEC_MASK codec = att & ATTRIBUTE_CODEC_MASK
if codec == CODEC_NONE: if codec == CODEC_NONE:
yield (offset, Message(magic, att, key, value)) yield (offset, kafka.common.Message(magic, att, key, value))
elif codec == CODEC_GZIP: elif codec == CODEC_GZIP:
gz = gzip_decode(value) gz = gzip_decode(value)
@@ -186,253 +156,143 @@ class KafkaProtocol(object):
################## ##################
@classmethod @classmethod
def encode_produce_request(cls, client_id, correlation_id, def encode_produce_request(cls, payloads=(), acks=1, timeout=1000):
payloads=None, acks=1, timeout=1000):
""" """
Encode some ProduceRequest structs Encode a ProduceRequest struct
Arguments: Arguments:
client_id: string payloads: list of ProduceRequestPayload
correlation_id: int
payloads: list of ProduceRequest
acks: How "acky" you want the request to be acks: How "acky" you want the request to be
0: immediate response
1: written to disk by the leader 1: written to disk by the leader
2+: waits for this many number of replicas to sync 0: immediate response
-1: waits for all replicas to be in sync -1: waits for all replicas to be in sync
timeout: Maximum time the server will wait for acks from replicas. timeout: Maximum time (in ms) the server will wait for replica acks.
This is _not_ a socket timeout This is _not_ a socket timeout
Returns: ProduceRequest
""" """
payloads = [] if payloads is None else payloads if acks not in (1, 0, -1):
grouped_payloads = group_by_topic_and_partition(payloads) raise ValueError('ProduceRequest acks (%s) must be 1, 0, -1' % acks)
message = [] return kafka.protocol.produce.ProduceRequest(
message.append(cls._encode_message_header(client_id, correlation_id, required_acks=acks,
KafkaProtocol.PRODUCE_KEY)) timeout=timeout,
topics=[(
message.append(struct.pack('>hii', acks, timeout, topic,
len(grouped_payloads))) [(
partition,
for topic, topic_payloads in grouped_payloads.items(): [(0, 0, kafka.protocol.message.Message(msg.value, key=msg.key,
message.append(struct.pack('>h%dsi' % len(topic), len(topic), topic, magic=msg.magic,
len(topic_payloads))) attributes=msg.attributes))
for msg in payload.messages])
for partition, payload in topic_payloads.items(): for partition, payload in topic_payloads.items()])
msg_set = KafkaProtocol._encode_message_set(payload.messages) for topic, topic_payloads in group_by_topic_and_partition(payloads).items()])
message.append(struct.pack('>ii%ds' % len(msg_set), partition,
len(msg_set), msg_set))
msg = b''.join(message)
return struct.pack('>i%ds' % len(msg), len(msg), msg)
@classmethod @classmethod
def decode_produce_response(cls, data): def decode_produce_response(cls, response):
""" """
Decode bytes to a ProduceResponse Decode ProduceResponse to ProduceResponsePayload
Arguments: Arguments:
data: bytes to decode response: ProduceResponse
Return: list of ProduceResponsePayload
""" """
((correlation_id, num_topics), cur) = relative_unpack('>ii', data, 0) return [
kafka.common.ProduceResponsePayload(topic, partition, error, offset)
for _ in range(num_topics): for topic, partitions in response.topics
((strlen,), cur) = relative_unpack('>h', data, cur) for partition, error, offset in partitions
topic = data[cur:cur + strlen] ]
cur += strlen
((num_partitions,), cur) = relative_unpack('>i', data, cur)
for _ in range(num_partitions):
((partition, error, offset), cur) = relative_unpack('>ihq',
data, cur)
yield ProduceResponse(topic, partition, error, offset)
@classmethod @classmethod
def encode_fetch_request(cls, client_id, correlation_id, payloads=None, def encode_fetch_request(cls, payloads=(), max_wait_time=100, min_bytes=4096):
max_wait_time=100, min_bytes=4096):
""" """
Encodes some FetchRequest structs Encodes a FetchRequest struct
Arguments: Arguments:
client_id: string payloads: list of FetchRequestPayload
correlation_id: int max_wait_time (int, optional): ms to block waiting for min_bytes
payloads: list of FetchRequest data. Defaults to 100.
max_wait_time: int, how long to block waiting on min_bytes of data min_bytes (int, optional): minimum bytes required to return before
min_bytes: int, the minimum number of bytes to accumulate before max_wait_time. Defaults to 4096.
returning the response
Return: FetchRequest
""" """
return kafka.protocol.fetch.FetchRequest(
payloads = [] if payloads is None else payloads replica_id=-1,
grouped_payloads = group_by_topic_and_partition(payloads) max_wait_time=max_wait_time,
min_bytes=min_bytes,
message = [] topics=[(
message.append(cls._encode_message_header(client_id, correlation_id, topic,
KafkaProtocol.FETCH_KEY)) [(
partition,
# -1 is the replica id payload.offset,
message.append(struct.pack('>iiii', -1, max_wait_time, min_bytes, payload.max_bytes)
len(grouped_payloads))) for partition, payload in topic_payloads.items()])
for topic, topic_payloads in group_by_topic_and_partition(payloads).items()])
for topic, topic_payloads in grouped_payloads.items():
message.append(write_short_string(topic))
message.append(struct.pack('>i', len(topic_payloads)))
for partition, payload in topic_payloads.items():
message.append(struct.pack('>iqi', partition, payload.offset,
payload.max_bytes))
msg = b''.join(message)
return struct.pack('>i%ds' % len(msg), len(msg), msg)
@classmethod @classmethod
def decode_fetch_response(cls, data): def decode_fetch_response(cls, response):
""" """
Decode bytes to a FetchResponse Decode FetchResponse struct to FetchResponsePayloads
Arguments: Arguments:
data: bytes to decode response: FetchResponse
""" """
((correlation_id, num_topics), cur) = relative_unpack('>ii', data, 0) return [
kafka.common.FetchResponsePayload(
for _ in range(num_topics): topic, partition, error, highwater_offset, [
(topic, cur) = read_short_string(data, cur) kafka.common.OffsetAndMessage(offset, message)
((num_partitions,), cur) = relative_unpack('>i', data, cur) for offset, _, message in messages])
for topic, partitions in response.topics
for j in range(num_partitions): for partition, error, highwater_offset, messages in partitions
((partition, error, highwater_mark_offset), cur) = \ ]
relative_unpack('>ihq', data, cur)
(message_set, cur) = read_int_string(data, cur)
yield FetchResponse(
topic, partition, error,
highwater_mark_offset,
KafkaProtocol._decode_message_set_iter(message_set))
@classmethod @classmethod
def encode_offset_request(cls, client_id, correlation_id, payloads=None): def encode_offset_request(cls, payloads=()):
payloads = [] if payloads is None else payloads return kafka.protocol.offset.OffsetRequest(
grouped_payloads = group_by_topic_and_partition(payloads) replica_id=-1,
topics=[(
message = [] topic,
message.append(cls._encode_message_header(client_id, correlation_id, [(
KafkaProtocol.OFFSET_KEY)) partition,
payload.time,
# -1 is the replica id payload.max_offsets)
message.append(struct.pack('>ii', -1, len(grouped_payloads))) for partition, payload in six.iteritems(topic_payloads)])
for topic, topic_payloads in six.iteritems(group_by_topic_and_partition(payloads))])
for topic, topic_payloads in grouped_payloads.items():
message.append(write_short_string(topic))
message.append(struct.pack('>i', len(topic_payloads)))
for partition, payload in topic_payloads.items():
message.append(struct.pack('>iqi', partition, payload.time,
payload.max_offsets))
msg = b''.join(message)
return struct.pack('>i%ds' % len(msg), len(msg), msg)
@classmethod @classmethod
def decode_offset_response(cls, data): def decode_offset_response(cls, response):
""" """
Decode bytes to an OffsetResponse Decode OffsetResponse into OffsetResponsePayloads
Arguments: Arguments:
data: bytes to decode response: OffsetResponse
Returns: list of OffsetResponsePayloads
""" """
((correlation_id, num_topics), cur) = relative_unpack('>ii', data, 0) return [
kafka.common.OffsetResponsePayload(topic, partition, error, tuple(offsets))
for _ in range(num_topics): for topic, partitions in response.topics
(topic, cur) = read_short_string(data, cur) for partition, error, offsets in partitions
((num_partitions,), cur) = relative_unpack('>i', data, cur) ]
for _ in range(num_partitions):
((partition, error, num_offsets,), cur) = \
relative_unpack('>ihi', data, cur)
offsets = []
for k in range(num_offsets):
((offset,), cur) = relative_unpack('>q', data, cur)
offsets.append(offset)
yield OffsetResponse(topic, partition, error, tuple(offsets))
@classmethod @classmethod
def encode_metadata_request(cls, client_id, correlation_id, topics=None, def encode_metadata_request(cls, topics=(), payloads=None):
payloads=None):
""" """
Encode a MetadataRequest Encode a MetadataRequest
Arguments: Arguments:
client_id: string
correlation_id: int
topics: list of strings topics: list of strings
""" """
if payloads is None: if payloads is not None:
topics = [] if topics is None else topics
else:
topics = payloads topics = payloads
message = [] return kafka.protocol.metadata.MetadataRequest(topics)
message.append(cls._encode_message_header(client_id, correlation_id,
KafkaProtocol.METADATA_KEY))
message.append(struct.pack('>i', len(topics)))
for topic in topics:
message.append(struct.pack('>h%ds' % len(topic), len(topic), topic))
msg = b''.join(message)
return write_int_string(msg)
@classmethod @classmethod
def decode_metadata_response(cls, data): def decode_metadata_response(cls, response):
""" return response
Decode bytes to a MetadataResponse
Arguments:
data: bytes to decode
"""
((correlation_id, numbrokers), cur) = relative_unpack('>ii', data, 0)
# Broker info
brokers = []
for _ in range(numbrokers):
((nodeId, ), cur) = relative_unpack('>i', data, cur)
(host, cur) = read_short_string(data, cur)
((port,), cur) = relative_unpack('>i', data, cur)
brokers.append(BrokerMetadata(nodeId, host, port))
# Topic info
((num_topics,), cur) = relative_unpack('>i', data, cur)
topic_metadata = []
for _ in range(num_topics):
((topic_error,), cur) = relative_unpack('>h', data, cur)
(topic_name, cur) = read_short_string(data, cur)
((num_partitions,), cur) = relative_unpack('>i', data, cur)
partition_metadata = []
for _ in range(num_partitions):
((partition_error_code, partition, leader, numReplicas), cur) = \
relative_unpack('>hiii', data, cur)
(replicas, cur) = relative_unpack(
'>%di' % numReplicas, data, cur)
((num_isr,), cur) = relative_unpack('>i', data, cur)
(isr, cur) = relative_unpack('>%di' % num_isr, data, cur)
partition_metadata.append(
PartitionMetadata(topic_name, partition, leader,
replicas, isr, partition_error_code)
)
topic_metadata.append(
TopicMetadata(topic_name, topic_error, partition_metadata)
)
return MetadataResponse(brokers, topic_metadata)
@classmethod @classmethod
def encode_consumer_metadata_request(cls, client_id, correlation_id, payloads): def encode_consumer_metadata_request(cls, client_id, correlation_id, payloads):
@@ -587,7 +447,7 @@ def create_message(payload, key=None):
key: bytes, a key used for partition routing (optional) key: bytes, a key used for partition routing (optional)
""" """
return Message(0, 0, key, payload) return kafka.common.Message(0, 0, key, payload)
def create_gzip_message(payloads, key=None, compresslevel=None): def create_gzip_message(payloads, key=None, compresslevel=None):
@@ -608,7 +468,7 @@ def create_gzip_message(payloads, key=None, compresslevel=None):
gzipped = gzip_encode(message_set, compresslevel=compresslevel) gzipped = gzip_encode(message_set, compresslevel=compresslevel)
codec = ATTRIBUTE_CODEC_MASK & CODEC_GZIP codec = ATTRIBUTE_CODEC_MASK & CODEC_GZIP
return Message(0, 0x00 | codec, key, gzipped) return kafka.common.Message(0, 0x00 | codec, key, gzipped)
def create_snappy_message(payloads, key=None): def create_snappy_message(payloads, key=None):
@@ -629,7 +489,7 @@ def create_snappy_message(payloads, key=None):
snapped = snappy_encode(message_set) snapped = snappy_encode(message_set)
codec = ATTRIBUTE_CODEC_MASK & CODEC_SNAPPY codec = ATTRIBUTE_CODEC_MASK & CODEC_SNAPPY
return Message(0, 0x00 | codec, key, snapped) return kafka.common.Message(0, 0x00 | codec, key, snapped)
def create_message_set(messages, codec=CODEC_NONE, key=None, compresslevel=None): def create_message_set(messages, codec=CODEC_NONE, key=None, compresslevel=None):