Starting on unit tests
Updated several methods in KafkaClient to be classmethods. Updated some inline documentation also
This commit is contained in:
@@ -3,7 +3,7 @@ import logging
|
||||
from kafka import KafkaClient, FetchRequest, ProduceRequest
|
||||
|
||||
def produce_example(kafka):
|
||||
message = kafka.create_message_from_string("testing")
|
||||
message = kafka.create_message("testing")
|
||||
request = ProduceRequest("my-topic", 0, [message])
|
||||
kafka.send_message_set(request)
|
||||
|
||||
@@ -15,7 +15,7 @@ def consume_example(kafka):
|
||||
print(nextRequest)
|
||||
|
||||
def produce_gz_example(kafka):
|
||||
message = kafka.create_gzipped_message("this message was gzipped", "along with this one")
|
||||
message = kafka.create_gzip_message("this message was gzipped", "along with this one")
|
||||
request = ProduceRequest("my-topic", 0, [message])
|
||||
kafka.send_message_set(request)
|
||||
|
||||
|
||||
148
kafka.py
148
kafka.py
@@ -31,9 +31,9 @@ FetchRequest = namedtuple("FetchRequest", ["topic", "partition", "offset", "size
|
||||
ProduceRequest = namedtuple("ProduceRequest", ["topic", "partition", "messages"])
|
||||
OffsetRequest = namedtuple("OffsetRequest", ["topic", "partition", "time", "maxOffsets"])
|
||||
|
||||
def gzip_compress(payload):
|
||||
def gzip_encode(payload):
|
||||
buf = StringIO()
|
||||
f = gzip.GzipFile(fileobj=buf, mode='w')
|
||||
f = gzip.GzipFile(fileobj=buf, mode='w', compresslevel=6)
|
||||
f.write(payload)
|
||||
f.close()
|
||||
buf.seek(0)
|
||||
@@ -41,7 +41,7 @@ def gzip_compress(payload):
|
||||
buf.close()
|
||||
return out
|
||||
|
||||
def gzip_decompress(payload):
|
||||
def gzip_decode(payload):
|
||||
buf = StringIO(payload)
|
||||
f = gzip.GzipFile(fileobj=buf, mode='r')
|
||||
out = f.read()
|
||||
@@ -49,6 +49,7 @@ def gzip_decompress(payload):
|
||||
buf.close()
|
||||
return out
|
||||
|
||||
|
||||
def length_prefix_message(msg):
|
||||
"""
|
||||
Prefix a message with it's length as an int
|
||||
@@ -84,9 +85,10 @@ class KafkaClient(object):
|
||||
|
||||
ATTRIBUTE_CODEC_MASK = 0x03
|
||||
|
||||
def __init__(self, host, port):
|
||||
def __init__(self, host, port, bufsize=1024):
|
||||
self.host = host
|
||||
self.port = port
|
||||
self.bufsize = bufsize
|
||||
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
self._sock.connect((host, port))
|
||||
self._sock.settimeout(10)
|
||||
@@ -117,7 +119,7 @@ class KafkaClient(object):
|
||||
# Response iterator
|
||||
total = 0
|
||||
while total < (size-2):
|
||||
resp = self._sock.recv(1024)
|
||||
resp = self._sock.recv(self.bufsize)
|
||||
log.debug("Read %d bytes from Kafka", len(resp))
|
||||
if resp == "":
|
||||
raise Exception("Underflow")
|
||||
@@ -133,7 +135,8 @@ class KafkaClient(object):
|
||||
data += chunk
|
||||
return data
|
||||
|
||||
def encode_message(self, message):
|
||||
@classmethod
|
||||
def encode_message(cls, message):
|
||||
"""
|
||||
Encode a Message from a Message tuple
|
||||
|
||||
@@ -163,20 +166,26 @@ class KafkaClient(object):
|
||||
msg = struct.pack('>BBi%ds' % len(message.payload),
|
||||
message.magic, message.attributes, message.crc, message.payload)
|
||||
else:
|
||||
raise Exception("Unknown message version: %d" % message.magic)
|
||||
raise Exception("Unexpected magic number: %d" % message.magic)
|
||||
msg = length_prefix_message(msg)
|
||||
log.debug("Encoded %s as %r" % (message, msg))
|
||||
return msg
|
||||
|
||||
def encode_message_set(self, messages):
|
||||
# TODO document
|
||||
@classmethod
|
||||
def encode_message_set(cls, messages):
|
||||
"""
|
||||
Encode a MessageSet
|
||||
|
||||
One or more concatenated Messages
|
||||
"""
|
||||
message_set = ""
|
||||
for message in messages:
|
||||
encoded_message = self.encode_message(message)
|
||||
encoded_message = cls.encode_message(message)
|
||||
message_set += encoded_message
|
||||
return message_set
|
||||
|
||||
def encode_produce_request(self, produceRequest):
|
||||
@classmethod
|
||||
def encode_produce_request(cls, produceRequest):
|
||||
"""
|
||||
Encode a ProduceRequest
|
||||
|
||||
@@ -198,16 +207,41 @@ class KafkaClient(object):
|
||||
KafkaClient.PRODUCE_KEY, len(topic), topic, partition, len(message_set), message_set)
|
||||
return req
|
||||
|
||||
def encode_multi_produce_request(self, produceRequests):
|
||||
# TODO document
|
||||
@classmethod
|
||||
def encode_multi_produce_request(cls, produceRequests):
|
||||
"""
|
||||
Encode a MultiProducerRequest
|
||||
|
||||
Params
|
||||
======
|
||||
produceRequest: list of ProduceRequest objects
|
||||
|
||||
Returns
|
||||
=======
|
||||
Encoded request
|
||||
|
||||
Wire Format
|
||||
===========
|
||||
<MultiProducerReqeust> ::= <request-key> <num> <ProduceRequests>
|
||||
<num> ::= <int16>
|
||||
<ProduceRequests> ::= <ProduceRequest> [ <ProduceRequests> ]
|
||||
<ProduceRequest> ::= <topic> <partition> <len> <MessageSet>
|
||||
<topic> ::= <topic-length><string>
|
||||
<topic-length> ::= <int16>
|
||||
<partition> ::= <int32>
|
||||
<len> ::= <int32>
|
||||
|
||||
num is the number of ProduceRequests being encoded
|
||||
"""
|
||||
req = struct.pack('>HH', KafkaClient.MULTIPRODUCE_KEY, len(produceRequests))
|
||||
for (topic, partition, messages) in produceRequests:
|
||||
message_set = self.encode_message_set(messages)
|
||||
message_set = cls.encode_message_set(messages)
|
||||
req += struct.pack('>H%dsii%ds' % (len(topic), len(message_set)),
|
||||
len(topic), topic, partition, len(message_set), message_set)
|
||||
return req
|
||||
|
||||
def encode_fetch_request(self, fetchRequest):
|
||||
@classmethod
|
||||
def encode_fetch_request(cls, fetchRequest):
|
||||
"""
|
||||
Encode a FetchRequest message
|
||||
|
||||
@@ -228,7 +262,8 @@ class KafkaClient(object):
|
||||
KafkaClient.FETCH_KEY, len(topic), topic, partition, offset, size)
|
||||
return req
|
||||
|
||||
def encode_multi_fetch_request(self, fetchRequests):
|
||||
@classmethod
|
||||
def encode_multi_fetch_request(cls, fetchRequests):
|
||||
"""
|
||||
Encode the MultiFetchRequest message from a list of FetchRequest objects
|
||||
|
||||
@@ -260,7 +295,8 @@ class KafkaClient(object):
|
||||
req += struct.pack('>H%dsiqi' % len(topic), len(topic), topic, partition, offset, size)
|
||||
return req
|
||||
|
||||
def encode_offset_request(self, offsetRequest):
|
||||
@classmethod
|
||||
def encode_offset_request(cls, offsetRequest):
|
||||
"""
|
||||
Encode an OffsetRequest message
|
||||
|
||||
@@ -281,43 +317,57 @@ class KafkaClient(object):
|
||||
req = struct.pack('>HH%dsiqi' % len(topic), KafkaClient.OFFSET_KEY, len(topic), topic, partition, offset, maxOffsets)
|
||||
return req
|
||||
|
||||
def decode_message(self, data):
|
||||
@classmethod
|
||||
def decode_message(cls, data):
|
||||
"""
|
||||
Decode a Message
|
||||
|
||||
Since a Message can actually contained a compressed payload of multiple nested Messages,
|
||||
this method returns a generator.
|
||||
Verify crc and decode the Message. A compressed Message's payload is actually
|
||||
an encoded MessageSet. This allows Messages to be nested within Messages and
|
||||
as such, this method will recurse.
|
||||
|
||||
Params
|
||||
======
|
||||
data, bytes
|
||||
|
||||
Returns
|
||||
=======
|
||||
Generator of Messages (depth-first)
|
||||
"""
|
||||
# TODO document
|
||||
N = len(data)
|
||||
(magic,) = struct.unpack('>B', data[0:1])
|
||||
if magic == 0: # v0 Message
|
||||
# Read crc; check the crc; append the message
|
||||
if magic == 0:
|
||||
# version 0
|
||||
(crc,) = struct.unpack('>i', data[1:5])
|
||||
payload = data[5:N]
|
||||
assert zlib.crc32(payload) == crc
|
||||
msg = Message(magic, None, crc, payload)
|
||||
log.debug("Got v0 Message, %s", msg)
|
||||
yield msg
|
||||
elif magic == 1: # v1 Message
|
||||
# Read attributes, crc; check the crc; append the message
|
||||
elif magic == 1:
|
||||
# version 1
|
||||
(att, crc) = struct.unpack('>Bi', data[1:6])
|
||||
payload = data[6:N]
|
||||
assert zlib.crc32(payload) == crc
|
||||
# Uncompressed, just a single Message
|
||||
if att & KafkaClient.ATTRIBUTE_CODEC_MASK == 0:
|
||||
# Uncompressed, just a single Message
|
||||
msg = Message(magic, att, crc, payload)
|
||||
log.debug("Got v1 Message, %s", msg)
|
||||
yield msg
|
||||
elif att & KafkaClient.ATTRIBUTE_CODEC_MASK == 1:
|
||||
gz = gzip_decompress(payload)
|
||||
(msgs, _) = self.read_message_set(gz)
|
||||
# Gzip encoded Message
|
||||
gz = gzip_decode(payload)
|
||||
(msgs, _) = cls.read_message_set(gz)
|
||||
for msg in msgs:
|
||||
yield msg
|
||||
elif att & KafkaClient.ATTRIBUTE_CODEC_MASK == 2:
|
||||
# Snappy encoded Message
|
||||
raise NotImplementedError("Snappy codec is not yet supported")
|
||||
else:
|
||||
raise RuntimeError("Unsupported compression type: %d" % (att & KafkaClient.ATTRIBUTE_CODEC_MASK))
|
||||
|
||||
def read_message_set(self, data):
|
||||
@classmethod
|
||||
def read_message_set(cls, data):
|
||||
"""
|
||||
Read a MessageSet
|
||||
|
||||
@@ -363,7 +413,7 @@ class KafkaClient(object):
|
||||
cur += 4
|
||||
|
||||
# Decode the message(s)
|
||||
for m in self.decode_message(data[cur:cur+N]):
|
||||
for m in cls.decode_message(data[cur:cur+N]):
|
||||
msgs.append(m)
|
||||
|
||||
# Advance the cursor
|
||||
@@ -376,15 +426,37 @@ class KafkaClient(object):
|
||||
# Advanced User API #
|
||||
#########################
|
||||
|
||||
def create_message_from_string(self, payload):
|
||||
#TODO document
|
||||
@classmethod
|
||||
def create_message(cls, payload):
|
||||
"""
|
||||
Create a standard Message
|
||||
|
||||
Params
|
||||
======
|
||||
payload, bytes
|
||||
|
||||
Returns
|
||||
=======
|
||||
A Message tuple
|
||||
"""
|
||||
return Message(1, 0, zlib.crc32(payload), payload)
|
||||
|
||||
def create_gzipped_message(self, *payloads):
|
||||
#TODO document
|
||||
messages = [self.create_message_from_string(payload) for payload in payloads]
|
||||
message_set = self.encode_message_set(messages)
|
||||
gzipped = gzip_compress(message_set)
|
||||
@classmethod
|
||||
def create_gzip_message(cls, *payloads):
|
||||
"""
|
||||
Create a Gzip encoded Message
|
||||
|
||||
Params
|
||||
======
|
||||
payloads, list of messages (bytes) to be encoded
|
||||
|
||||
Returns
|
||||
=======
|
||||
A Message tuple
|
||||
"""
|
||||
messages = [cls.create_message(payload) for payload in payloads]
|
||||
message_set = cls.encode_message_set(messages)
|
||||
gzipped = gzip_encode(message_set)
|
||||
return Message(1, 0x00 | (KafkaClient.ATTRIBUTE_CODEC_MASK & 0x01), zlib.crc32(gzipped), gzipped)
|
||||
|
||||
def send_message_set(self, produceRequest):
|
||||
@@ -522,7 +594,7 @@ class KafkaClient(object):
|
||||
partition: int
|
||||
payloads: strings
|
||||
"""
|
||||
messages = tuple([create_message_from_string(payload) for payload in payloads])
|
||||
messages = tuple([create_message(payload) for payload in payloads])
|
||||
self.send_message_set(ProduceRequest(topic, partition, messages))
|
||||
|
||||
def iter_messages(self, topic, partition, offset, size, auto=True):
|
||||
|
||||
44
test.py
Normal file
44
test.py
Normal file
@@ -0,0 +1,44 @@
|
||||
import binascii
|
||||
import unittest
|
||||
|
||||
from kafka import KafkaClient
|
||||
|
||||
class TestMessage(unittest.TestCase):
|
||||
def test_message_simple(self):
|
||||
msg = KafkaClient.create_message("testing")
|
||||
enc = KafkaClient.encode_message(msg)
|
||||
expect = "\x00\x00\x00\r\x01\x00\xe8\xf3Z\x06testing"
|
||||
self.assertEquals(enc, expect)
|
||||
(messages, read) = KafkaClient.read_message_set(enc)
|
||||
self.assertEquals(len(messages), 1)
|
||||
self.assertEquals(messages[0], msg)
|
||||
|
||||
def test_message_list(self):
|
||||
msgs = [
|
||||
KafkaClient.create_message("one"),
|
||||
KafkaClient.create_message("two"),
|
||||
KafkaClient.create_message("three")
|
||||
]
|
||||
enc = KafkaClient.encode_message_set(msgs)
|
||||
expect = ("\x00\x00\x00\t\x01\x00zl\x86\xf1one\x00\x00\x00\t\x01\x00\x11"
|
||||
"\xca\x8aftwo\x00\x00\x00\x0b\x01\x00F\xc5\xd8\xf5three")
|
||||
self.assertEquals(enc, expect)
|
||||
(messages, read) = KafkaClient.read_message_set(enc)
|
||||
self.assertEquals(len(messages), 3)
|
||||
self.assertEquals(messages[0].payload, "one")
|
||||
self.assertEquals(messages[1].payload, "two")
|
||||
self.assertEquals(messages[2].payload, "three")
|
||||
|
||||
|
||||
def test_message_gzip(self):
|
||||
msg = KafkaClient.create_gzip_message("one", "two", "three")
|
||||
enc = KafkaClient.encode_message(msg)
|
||||
# Can't check the bytes directly since Gzip is non-deterministic
|
||||
(messages, read) = KafkaClient.read_message_set(enc)
|
||||
self.assertEquals(len(messages), 3)
|
||||
self.assertEquals(messages[0].payload, "one")
|
||||
self.assertEquals(messages[1].payload, "two")
|
||||
self.assertEquals(messages[2].payload, "three")
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user