Big code re-org

David Arthur
2013-03-30 00:28:00 -04:00
parent 3499e2f6ea
commit b6d98c07b4
11 changed files with 868 additions and 673 deletions

View File

@@ -1,6 +1,7 @@
 import logging
 from kafka.client import KafkaClient, FetchRequest, ProduceRequest
+from kafka.consumer import SimpleConsumer

 def produce_example(kafka):
     message = kafka.create_message("testing")
@@ -20,11 +21,8 @@ def produce_gz_example(kafka):
     kafka.send_message_set(request)

 def main():
-    kafka = KafkaClient("localhost", 9092)
-    produce_example(kafka)
-    produce_gz_example(kafka)
-    consume_example(kafka)
-    kafka.close()
+    client = KafkaClient("localhost", 9092)
+    consumer = SimpleConsumer(client, "test-group", "my-topic")

 if __name__ == "__main__":
     logging.basicConfig(level=logging.DEBUG)
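The hunk above drops the old produce/consume helpers from main() without showing the replacement. A minimal sketch (not part of this commit) of a rewritten example built only from the pieces this commit adds, assuming a broker on localhost:9092 and an existing topic named "my-topic":

import logging

from kafka.client import KafkaClient
from kafka.consumer import SimpleConsumer

def main():
    client = KafkaClient("localhost", 9092)

    # iterate messages from every partition of "my-topic", tracking
    # offsets under the consumer group "test-group"
    consumer = SimpleConsumer(client, "test-group", "my-topic")
    for message in consumer:
        print message

if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)
    main()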

View File

@@ -15,3 +15,18 @@ There are a few levels of abstraction:
 * Partitioned (run each message through a partitioning function)
 ** HashPartitioned
 ** FunctionPartition
+
+# Possible API
+
+client = KafkaClient("localhost", 9092)
+
+producer = KafkaProducer(client, "topic")
+producer.send_string("hello")
+
+consumer = KafkaConsumer(client, "group", "topic")
+consumer.seek(10, 2) # seek to beginning (lowest offset)
+consumer.commit()    # commit it
+for msg in consumer.iter_messages():
+    print msg
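The block added to the README is an aspirational sketch; KafkaProducer and KafkaConsumer do not exist yet. The classes this commit actually adds are SimpleProducer and SimpleConsumer, so a hedged equivalent against the new code (topic and group names are placeholders) would look roughly like:

from kafka import KafkaClient, SimpleProducer, SimpleConsumer

client = KafkaClient("localhost", 9092)

producer = SimpleProducer(client, "topic")
producer.send_message("hello")

consumer = SimpleConsumer(client, "group", "topic")
for msg in consumer:
    print msg
consumer.commit()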

View File

@@ -1,11 +1,18 @@
 __title__ = 'kafka'
-__version__ = '0.1-alpha'
+__version__ = '0.2-alpha'
 __author__ = 'David Arthur'
 __license__ = 'Apache License 2.0'
 __copyright__ = 'Copyright 2012, David Arthur under Apache License, v2.0'

-from .client import (
-    KafkaClient
-)
-from .codec import gzip_encode, gzip_decode
-from .codec import snappy_encode, snappy_decode
+from kafka.client import KafkaClient
+from kafka.conn import KafkaConnection
+from kafka.protocol import (
+    create_message, create_gzip_message, create_snappy_message
+)
+from kafka.producer import SimpleProducer
+from kafka.consumer import SimpleConsumer
+
+__all__ = [
+    'KafkaClient', 'KafkaConnection', 'SimpleProducer', 'SimpleConsumer',
+    'create_message', 'create_gzip_message', 'create_snappy_message'
+]
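For callers the re-org is mostly an import-path change. A small sketch of the intended import surface after this commit (nothing here is executed against a broker):

# the public entry points are re-exported at the package root
from kafka import KafkaClient, KafkaConnection
from kafka import SimpleProducer, SimpleConsumer
from kafka import create_message, create_gzip_message, create_snappy_message

# the request/response namedtuples now live in kafka.common
from kafka.common import ProduceRequest, FetchRequest, ErrorMapping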

View File

@@ -1,588 +1,19 @@
 import base64
-from collections import namedtuple, defaultdict
+from collections import defaultdict
 from functools import partial
 from itertools import count, cycle
 import logging
 from operator import attrgetter
-import socket
 import struct
 import time
 import zlib

-from .codec import gzip_encode, gzip_decode
-from .codec import snappy_encode, snappy_decode
-from .util import read_short_string, read_int_string
-from .util import relative_unpack
-from .util import write_short_string, write_int_string
-from .util import group_by_topic_and_partition
-from .util import BufferUnderflowError, ChecksumError
+from kafka.common import *
+from kafka.conn import KafkaConnection
+from kafka.protocol import KafkaProtocol

 log = logging.getLogger("kafka")
###############
# Structs #
###############
# Request payloads
ProduceRequest = namedtuple("ProduceRequest", ["topic", "partition", "messages"])
FetchRequest = namedtuple("FetchRequest", ["topic", "partition", "offset", "max_bytes"])
OffsetRequest = namedtuple("OffsetRequest", ["topic", "partition", "time", "max_offsets"])
OffsetCommitRequest = namedtuple("OffsetCommitRequest", ["topic", "partition", "offset", "metadata"])
OffsetFetchRequest = namedtuple("OffsetFetchRequest", ["topic", "partition"])
# Response payloads
ProduceResponse = namedtuple("ProduceResponse", ["topic", "partition", "error", "offset"])
FetchResponse = namedtuple("FetchResponse", ["topic", "partition", "error", "highwaterMark", "messages"])
OffsetResponse = namedtuple("OffsetResponse", ["topic", "partition", "error", "offsets"])
OffsetCommitResponse = namedtuple("OffsetCommitResponse", ["topic", "partition", "error"])
OffsetFetchResponse = namedtuple("OffsetFetchResponse", ["topic", "partition", "offset", "metadata", "error"])
BrokerMetadata = namedtuple("BrokerMetadata", ["nodeId", "host", "port"])
PartitionMetadata = namedtuple("PartitionMetadata", ["topic", "partition", "leader", "replicas", "isr"])
# Other useful structs
OffsetAndMessage = namedtuple("OffsetAndMessage", ["offset", "message"])
Message = namedtuple("Message", ["magic", "attributes", "key", "value"])
TopicAndPartition = namedtuple("TopicAndPartition", ["topic", "partition"])
class ErrorMapping(object):
# Many of these are not actually used by the client
UNKNOWN = -1
NO_ERROR = 0
OFFSET_OUT_OF_RANGE = 1
INVALID_MESSAGE = 2
UNKNOWN_TOPIC_OR_PARTITON = 3
INVALID_FETCH_SIZE = 4
LEADER_NOT_AVAILABLE = 5
NOT_LEADER_FOR_PARTITION = 6
REQUEST_TIMED_OUT = 7
BROKER_NOT_AVAILABLE = 8
REPLICA_NOT_AVAILABLE = 9
MESSAGE_SIZE_TO_LARGE = 10
STALE_CONTROLLER_EPOCH = 11
OFFSET_METADATA_TOO_LARGE = 12
class KafkaProtocol(object):
"""
Class to encapsulate all of the protocol encoding/decoding. This class does not
have any state associated with it, it is purely for organization.
"""
PRODUCE_KEY = 0
FETCH_KEY = 1
OFFSET_KEY = 2
METADATA_KEY = 3
OFFSET_COMMIT_KEY = 6
OFFSET_FETCH_KEY = 7
ATTRIBUTE_CODEC_MASK = 0x03
CODEC_NONE = 0x00
CODEC_GZIP = 0x01
CODEC_SNAPPY = 0x02
###################
# Private API #
###################
@classmethod
def _encode_message_header(cls, client_id, correlation_id, request_key):
"""
Encode the common request envelope
"""
return struct.pack('>hhih%ds' % len(client_id),
request_key, # ApiKey
0, # ApiVersion
correlation_id, # CorrelationId
len(client_id), #
client_id) # ClientId
@classmethod
def _encode_message_set(cls, messages):
"""
Encode a MessageSet. Unlike other arrays in the protocol, MessageSets are
not length-prefixed
Format
======
MessageSet => [Offset MessageSize Message]
Offset => int64
MessageSize => int32
"""
message_set = ""
for message in messages:
encoded_message = KafkaProtocol._encode_message(message)
message_set += struct.pack('>qi%ds' % len(encoded_message), 0, len(encoded_message), encoded_message)
return message_set
@classmethod
def _encode_message(cls, message):
"""
Encode a single message.
The magic number of a message is a format version number. The only supported
magic number right now is zero
Format
======
Message => Crc MagicByte Attributes Key Value
Crc => int32
MagicByte => int8
Attributes => int8
Key => bytes
Value => bytes
"""
if message.magic == 0:
msg = struct.pack('>BB', message.magic, message.attributes)
msg += write_int_string(message.key)
msg += write_int_string(message.value)
crc = zlib.crc32(msg)
msg = struct.pack('>i%ds' % len(msg), crc, msg)
else:
raise Exception("Unexpected magic number: %d" % message.magic)
return msg
@classmethod
def _decode_message_set_iter(cls, data):
"""
Iteratively decode a MessageSet
Reads repeated elements of (offset, message), calling decode_message to decode a
single message. Since compressed messages contain futher MessageSets, these two methods
have been decoupled so that they may recurse easily.
"""
cur = 0
while cur < len(data):
try:
((offset, ), cur) = relative_unpack('>q', data, cur)
(msg, cur) = read_int_string(data, cur)
for (offset, message) in KafkaProtocol._decode_message(msg, offset):
yield OffsetAndMessage(offset, message)
except BufferUnderflowError: # If we get a partial read of a message, stop
raise StopIteration()
@classmethod
def _decode_message(cls, data, offset):
"""
Decode a single Message
The only caller of this method is decode_message_set_iter. They are decoupled to
support nested messages (compressed MessageSets). The offset is actually read from
decode_message_set_iter (it is part of the MessageSet payload).
"""
((crc, magic, att), cur) = relative_unpack('>iBB', data, 0)
if crc != zlib.crc32(data[4:]):
raise ChecksumError("Message checksum failed")
(key, cur) = read_int_string(data, cur)
(value, cur) = read_int_string(data, cur)
if att & KafkaProtocol.ATTRIBUTE_CODEC_MASK == KafkaProtocol.CODEC_NONE:
yield (offset, Message(magic, att, key, value))
elif att & KafkaProtocol.ATTRIBUTE_CODEC_MASK == KafkaProtocol.CODEC_GZIP:
gz = gzip_decode(value)
for (offset, message) in KafkaProtocol._decode_message_set_iter(gz):
yield (offset, message)
elif att & KafkaProtocol.ATTRIBUTE_CODEC_MASK == KafkaProtocol.CODEC_SNAPPY:
snp = snappy_decode(value)
for (offset, message) in KafkaProtocol._decode_message_set_iter(snp):
yield (offset, message)
##################
# Public API #
##################
@classmethod
def create_message(cls, payload, key=None):
"""
Construct a Message
Params
======
payload: bytes, the payload to send to Kafka
key: bytes, a key used for partition routing (optional)
"""
return Message(0, 0, key, payload)
@classmethod
def create_gzip_message(cls, payloads, key=None):
"""
Construct a Gzipped Message containing multiple Messages
The given payloads will be encoded, compressed, and sent as a single atomic
message to Kafka.
Params
======
payloads: list(bytes), a list of payload to send be sent to Kafka
key: bytes, a key used for partition routing (optional)
"""
message_set = KafkaProtocol._encode_message_set(
[KafkaProtocol.create_message(payload) for payload in payloads])
gzipped = gzip_encode(message_set)
return Message(0, 0x00 | (KafkaProtocol.ATTRIBUTE_CODEC_MASK & KafkaProtocol.CODEC_GZIP), key, gzipped)
@classmethod
def create_snappy_message(cls, payloads, key=None):
"""
Construct a Snappy Message containing multiple Messages
The given payloads will be encoded, compressed, and sent as a single atomic
message to Kafka.
Params
======
payloads: list(bytes), a list of payload to send be sent to Kafka
key: bytes, a key used for partition routing (optional)
"""
message_set = KafkaProtocol._encode_message_set(
[KafkaProtocol.create_message(payload) for payload in payloads])
snapped = snappy_encode(message_set)
return Message(0, 0x00 | (KafkaProtocol.ATTRIBUTE_CODEC_MASK & KafkaProtocol.CODEC_SNAPPY), key, snapped)
@classmethod
def encode_produce_request(cls, client_id, correlation_id, payloads=[], acks=1, timeout=1000):
"""
Encode some ProduceRequest structs
Params
======
client_id: string
correlation_id: string
payloads: list of ProduceRequest
acks: How "acky" you want the request to be
0: immediate response
1: written to disk by the leader
2+: waits for this many number of replicas to sync
-1: waits for all replicas to be in sync
timeout: Maximum time the server will wait for acks from replicas. This is _not_ a socket timeout
"""
grouped_payloads = group_by_topic_and_partition(payloads)
message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.PRODUCE_KEY)
message += struct.pack('>hii', acks, timeout, len(grouped_payloads))
for topic, topic_payloads in grouped_payloads.items():
message += struct.pack('>h%dsi' % len(topic), len(topic), topic, len(topic_payloads))
for partition, payload in topic_payloads.items():
message_set = KafkaProtocol._encode_message_set(payload.messages)
message += struct.pack('>ii%ds' % len(message_set), partition, len(message_set), message_set)
return struct.pack('>i%ds' % len(message), len(message), message)
@classmethod
def decode_produce_response(cls, data):
"""
Decode bytes to a ProduceResponse
Params
======
data: bytes to decode
"""
((correlation_id, num_topics), cur) = relative_unpack('>ii', data, 0)
for i in range(num_topics):
((strlen,), cur) = relative_unpack('>h', data, cur)
topic = data[cur:cur+strlen]
cur += strlen
((num_partitions,), cur) = relative_unpack('>i', data, cur)
for i in range(num_partitions):
((partition, error, offset), cur) = relative_unpack('>ihq', data, cur)
yield ProduceResponse(topic, partition, error, offset)
@classmethod
def encode_fetch_request(cls, client_id, correlation_id, payloads=[], max_wait_time=100, min_bytes=4096):
"""
Encodes some FetchRequest structs
Params
======
client_id: string
correlation_id: string
payloads: list of FetchRequest
max_wait_time: int, how long to block waiting on min_bytes of data
min_bytes: int, the minimum number of bytes to accumulate before returning the response
"""
grouped_payloads = group_by_topic_and_partition(payloads)
message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.FETCH_KEY)
message += struct.pack('>iiii', -1, max_wait_time, min_bytes, len(grouped_payloads)) # -1 is the replica id
for topic, topic_payloads in grouped_payloads.items():
message += write_short_string(topic)
message += struct.pack('>i', len(topic_payloads))
for partition, payload in topic_payloads.items():
message += struct.pack('>iqi', partition, payload.offset, payload.max_bytes)
return struct.pack('>i%ds' % len(message), len(message), message)
@classmethod
def decode_fetch_response_iter(cls, data):
"""
Decode bytes to a FetchResponse
Params
======
data: bytes to decode
"""
((correlation_id, num_topics), cur) = relative_unpack('>ii', data, 0)
for i in range(num_topics):
(topic, cur) = read_short_string(data, cur)
((num_partitions,), cur) = relative_unpack('>i', data, cur)
for i in range(num_partitions):
((partition, error, highwater_mark_offset), cur) = relative_unpack('>ihq', data, cur)
(message_set, cur) = read_int_string(data, cur)
yield FetchResponse(topic, partition, error, highwater_mark_offset,
KafkaProtocol._decode_message_set_iter(message_set))
@classmethod
def encode_offset_request(cls, client_id, correlation_id, payloads=[]):
grouped_payloads = group_by_topic_and_partition(payloads)
message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.OFFSET_KEY)
message += struct.pack('>ii', -1, len(grouped_payloads)) # -1 is the replica id
for topic, topic_payloads in grouped_payloads.items():
message += write_short_string(topic)
message += struct.pack('>i', len(topic_payloads))
for partition, payload in topic_payloads.items():
message += struct.pack('>iqi', partition, payload.time, payload.max_offsets)
return struct.pack('>i%ds' % len(message), len(message), message)
@classmethod
def decode_offset_response(cls, data):
"""
Decode bytes to an OffsetResponse
Params
======
data: bytes to decode
"""
((correlation_id, num_topics), cur) = relative_unpack('>ii', data, 0)
for i in range(num_topics):
(topic, cur) = read_short_string(data, cur)
((num_partitions,), cur) = relative_unpack('>i', data, cur)
for i in range(num_partitions):
((partition, error, num_offsets,), cur) = relative_unpack('>ihi', data, cur)
offsets = []
for j in range(num_offsets):
((offset,), cur) = relative_unpack('>q', data, cur)
offsets.append(offset)
yield OffsetResponse(topic, partition, error, tuple(offsets))
@classmethod
def encode_metadata_request(cls, client_id, correlation_id, topics=[]):
"""
Encode a MetadataRequest
Params
======
client_id: string
correlation_id: string
topics: list of strings
"""
message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.METADATA_KEY)
message += struct.pack('>i', len(topics))
for topic in topics:
message += struct.pack('>h%ds' % len(topic), len(topic), topic)
return write_int_string(message)
@classmethod
def decode_metadata_response(cls, data):
"""
Decode bytes to a MetadataResponse
Params
======
data: bytes to decode
"""
((correlation_id, numBrokers), cur) = relative_unpack('>ii', data, 0)
# Broker info
brokers = {}
for i in range(numBrokers):
((nodeId, ), cur) = relative_unpack('>i', data, cur)
(host, cur) = read_short_string(data, cur)
((port,), cur) = relative_unpack('>i', data, cur)
brokers[nodeId] = BrokerMetadata(nodeId, host, port)
# Topic info
((num_topics,), cur) = relative_unpack('>i', data, cur)
topicMetadata = {}
for i in range(num_topics):
((topicError,), cur) = relative_unpack('>h', data, cur)
(topicName, cur) = read_short_string(data, cur)
((num_partitions,), cur) = relative_unpack('>i', data, cur)
partitionMetadata = {}
for j in range(num_partitions):
((partitionErrorCode, partition, leader, numReplicas), cur) = relative_unpack('>hiii', data, cur)
(replicas, cur) = relative_unpack('>%di' % numReplicas, data, cur)
((numIsr,), cur) = relative_unpack('>i', data, cur)
(isr, cur) = relative_unpack('>%di' % numIsr, data, cur)
partitionMetadata[partition] = PartitionMetadata(topicName, partition, leader, replicas, isr)
topicMetadata[topicName] = partitionMetadata
return (brokers, topicMetadata)
@classmethod
def encode_offset_commit_request(cls, client_id, correlation_id, group, payloads):
"""
Encode some OffsetCommitRequest structs
Params
======
client_id: string
correlation_id: string
group: string, the consumer group you are committing offsets for
payloads: list of OffsetCommitRequest
"""
grouped_payloads= group_by_topic_and_partition(payloads)
message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.OFFSET_COMMIT_KEY)
message += write_short_string(group)
message += struct.pack('>i', len(grouped_payloads))
for topic, topic_payloads in grouped_payloads.items():
message += write_short_string(topic)
message += struct.pack('>i', len(topic_payloads))
for partition, payload in topic_payloads.items():
message += struct.pack('>iq', partition, payload.offset)
message += write_short_string(payload.metadata)
return struct.pack('>i%ds' % len(message), len(message), message)
@classmethod
def decode_offset_commit_response(cls, data):
"""
Decode bytes to an OffsetCommitResponse
Params
======
data: bytes to decode
"""
data = data[2:] # TODO remove me when versionId is removed
((correlation_id,), cur) = relative_unpack('>i', data, 0)
(client_id, cur) = read_short_string(data, cur)
((num_topics,), cur) = relative_unpack('>i', data, cur)
for i in xrange(num_topics):
(topic, cur) = read_short_string(data, cur)
((num_partitions,), cur) = relative_unpack('>i', data, cur)
for i in xrange(num_partitions):
((partition, error), cur) = relative_unpack('>ih', data, cur)
yield OffsetCommitResponse(topic, partition, error)
@classmethod
def encode_offset_fetch_request(cls, client_id, correlation_id, group, payloads):
"""
Encode some OffsetFetchRequest structs
Params
======
client_id: string
correlation_id: string
group: string, the consumer group you are fetching offsets for
payloads: list of OffsetFetchRequest
"""
grouped_payloads = group_by_topic_and_partition(payloads)
message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.OFFSET_FETCH_KEY)
message += write_short_string(group)
message += struct.pack('>i', len(grouped_payloads))
for topic, topic_payloads in grouped_payloads.items():
message += write_short_string(topic)
message += struct.pack('>i', len(topic_payloads))
for partition, payload in topic_payloads.items():
message += struct.pack('>i', partition)
return struct.pack('>i%ds' % len(message), len(message), message)
@classmethod
def decode_offset_fetch_response(cls, data):
"""
Decode bytes to an OffsetFetchResponse
Params
======
data: bytes to decode
"""
data = data[2:] # TODO remove me when versionId is removed
((correlation_id,), cur) = relative_unpack('>i', data, 0)
(client_id, cur) = read_short_string(data, cur)
((num_topics,), cur) = relative_unpack('>i', data, cur)
for i in range(num_topics):
(topic, cur) = read_short_string(data, cur)
((num_partitions,), cur) = relative_unpack('>i', data, cur)
for i in range(num_partitions):
((partition, offset), cur) = relative_unpack('>iq', data, cur)
(metadata, cur) = read_short_string(data, cur)
((error,), cur) = relative_unpack('>h', data, cur)
yield OffsetFetchResponse(topic, partition, offset, metadata, error)
class KafkaConnection(object):
"""
A socket connection to a single Kafka broker
This class is _not_ thread safe. Each call to `send` must be followed
by a call to `recv` in order to get the correct response. Eventually,
we can do something in here to facilitate multiplexed requests/responses
since the Kafka API includes a correlation id.
"""
def __init__(self, host, port, bufsize=4096):
self.host = host
self.port = port
self.bufsize = bufsize
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._sock.connect((host, port))
self._sock.settimeout(10)
def __str__(self):
return "<KafkaConnection host=%s port=%d>" % (self.host, self.port)
###################
# Private API #
###################
def _consume_response(self):
"""
Fully consumer the response iterator
"""
data = ""
for chunk in self._consume_response_iter():
data += chunk
return data
def _consume_response_iter(self):
"""
This method handles the response header and error messages. It
then returns an iterator for the chunks of the response
"""
log.debug("Handling response from Kafka")
# Read the size off of the header
resp = self._sock.recv(4)
if resp == "":
raise Exception("Got no response from Kafka")
(size,) = struct.unpack('>i', resp)
messageSize = size - 4
log.debug("About to read %d bytes from Kafka", messageSize)
# Read the remainder of the response
total = 0
while total < messageSize:
resp = self._sock.recv(self.bufsize)
log.debug("Read %d bytes from Kafka", len(resp))
if resp == "":
raise BufferUnderflowError("Not enough data to read this response")
total += len(resp)
yield resp
##################
# Public API #
##################
# TODO multiplex socket communication to allow for multi-threaded clients
def send(self, requestId, payload):
"Send a request to Kafka"
sent = self._sock.sendall(payload)
if sent == 0:
raise RuntimeError("Kafka went away")
self.data = self._consume_response()
def recv(self, requestId):
"Get a response from Kafka"
return self.data
def close(self):
"Close this connection"
self._sock.close()
 class KafkaClient(object):
     CLIENT_ID = "kafka-python"
@@ -808,71 +239,3 @@ class KafkaClient(object):
             else:
                 out.append(offset_fetch_response)
         return out
class SimpleProducer(object):
"""
A simple, round-robbin producer. Each message goes to exactly one partition
"""
def __init__(self, client, topic):
self.client = client
self.topic = topic
self.client.load_metadata_for_topics(topic)
self.next_partition = cycle(self.client.topic_partitions[topic])
def send_message(self, msg):
req = ProduceRequest(self.topic, self.next_partition.next(),
messages=[KafkaProtocol.create_message(msg)])
resp = self.client.send_produce_request([req]).next()
class SimpleConsumer(object):
"""
A simple consumer implementation that consumes all partitions for a topic
"""
def __init__(self, client, group, topic):
self.client = client
self.topic = topic
self.group = group
self.client.load_metadata_for_topics(topic)
self.offsets = {}
def get_or_init_offset_callback(resp):
if resp.error == ErrorMapping.NO_ERROR:
return resp.offset
elif resp.error == ErrorMapping.UNKNOWN_TOPIC_OR_PARTITON:
return 0
else:
raise Exception("OffsetFetchRequest for topic=%s, partition=%d failed with errorcode=%s" % (
resp.topic, resp.partition, resp.error))
for partition in self.client.topic_partitions[topic]:
req = OffsetFetchRequest(topic, partition)
(offset,) = self.client.send_offset_fetch_request(group, [req],
callback=get_or_init_offset_callback, fail_on_error=False)
self.offsets[partition] = offset
def __iter__(self):
iters = {}
for partition, offset in self.offsets.items():
iters[partition] = self.__iter_partition__(partition, offset)
while True:
for it in iters.values():
yield it.next()
def __iter_partition__(self, partition, offset):
while True:
req = FetchRequest(self.topic, partition, offset, 1024)
(resp,) = self.client.send_fetch_request([req])
assert resp.topic == self.topic
assert resp.partition == partition
next_offset = None
for message in resp.messages:
next_offset = message.offset
yield message
if next_offset is None:
raise StopIteration("No more messages")
else:
offset = next_offset + 1
# Commit offset here?

kafka/common.py Normal file
View File

@@ -0,0 +1,43 @@
from collections import namedtuple
###############
# Structs #
###############
# Request payloads
ProduceRequest = namedtuple("ProduceRequest", ["topic", "partition", "messages"])
FetchRequest = namedtuple("FetchRequest", ["topic", "partition", "offset", "max_bytes"])
OffsetRequest = namedtuple("OffsetRequest", ["topic", "partition", "time", "max_offsets"])
OffsetCommitRequest = namedtuple("OffsetCommitRequest", ["topic", "partition", "offset", "metadata"])
OffsetFetchRequest = namedtuple("OffsetFetchRequest", ["topic", "partition"])
# Response payloads
ProduceResponse = namedtuple("ProduceResponse", ["topic", "partition", "error", "offset"])
FetchResponse = namedtuple("FetchResponse", ["topic", "partition", "error", "highwaterMark", "messages"])
OffsetResponse = namedtuple("OffsetResponse", ["topic", "partition", "error", "offsets"])
OffsetCommitResponse = namedtuple("OffsetCommitResponse", ["topic", "partition", "error"])
OffsetFetchResponse = namedtuple("OffsetFetchResponse", ["topic", "partition", "offset", "metadata", "error"])
BrokerMetadata = namedtuple("BrokerMetadata", ["nodeId", "host", "port"])
PartitionMetadata = namedtuple("PartitionMetadata", ["topic", "partition", "leader", "replicas", "isr"])
# Other useful structs
OffsetAndMessage = namedtuple("OffsetAndMessage", ["offset", "message"])
Message = namedtuple("Message", ["magic", "attributes", "key", "value"])
TopicAndPartition = namedtuple("TopicAndPartition", ["topic", "partition"])
class ErrorMapping(object):
# Many of these are not actually used by the client
UNKNOWN = -1
NO_ERROR = 0
OFFSET_OUT_OF_RANGE = 1
INVALID_MESSAGE = 2
UNKNOWN_TOPIC_OR_PARTITON = 3
INVALID_FETCH_SIZE = 4
LEADER_NOT_AVAILABLE = 5
NOT_LEADER_FOR_PARTITION = 6
REQUEST_TIMED_OUT = 7
BROKER_NOT_AVAILABLE = 8
REPLICA_NOT_AVAILABLE = 9
MESSAGE_SIZE_TO_LARGE = 10
STALE_CONTROLLER_EPOCH = 11
OFFSET_METADATA_TOO_LARGE = 12
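These namedtuples are plain value objects handed to and returned by KafkaClient. A brief sketch (not from this commit) of building a request and checking a response's error code against ErrorMapping, with placeholder topic and partition values:

from kafka.common import FetchRequest, ErrorMapping

# ask partition 0 of "my-topic" for up to 4096 bytes starting at offset 0
req = FetchRequest(topic="my-topic", partition=0, offset=0, max_bytes=4096)

# responses carry a numeric error code that maps onto ErrorMapping constants
def check_response(resp):
    if resp.error != ErrorMapping.NO_ERROR:
        raise Exception("error %d for %s/%d" % (resp.error, resp.topic, resp.partition))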

kafka/conn.py Normal file
View File

@@ -0,0 +1,85 @@
import logging
import socket
import struct
log = logging.getLogger("kafka")
class KafkaConnection(object):
"""
A socket connection to a single Kafka broker
This class is _not_ thread safe. Each call to `send` must be followed
by a call to `recv` in order to get the correct response. Eventually,
we can do something in here to facilitate multiplexed requests/responses
since the Kafka API includes a correlation id.
"""
def __init__(self, host, port, bufsize=4096):
self.host = host
self.port = port
self.bufsize = bufsize
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._sock.connect((host, port))
self._sock.settimeout(10)
def __str__(self):
return "<KafkaConnection host=%s port=%d>" % (self.host, self.port)
###################
# Private API #
###################
def _consume_response(self):
"""
Fully consumer the response iterator
"""
data = ""
for chunk in self._consume_response_iter():
data += chunk
return data
def _consume_response_iter(self):
"""
This method handles the response header and error messages. It
then returns an iterator for the chunks of the response
"""
log.debug("Handling response from Kafka")
# Read the size off of the header
resp = self._sock.recv(4)
if resp == "":
raise Exception("Got no response from Kafka")
(size,) = struct.unpack('>i', resp)
messageSize = size - 4
log.debug("About to read %d bytes from Kafka", messageSize)
# Read the remainder of the response
total = 0
while total < messageSize:
resp = self._sock.recv(self.bufsize)
log.debug("Read %d bytes from Kafka", len(resp))
if resp == "":
raise BufferUnderflowError("Not enough data to read this response")
total += len(resp)
yield resp
##################
# Public API #
##################
# TODO multiplex socket communication to allow for multi-threaded clients
def send(self, requestId, payload):
"Send a request to Kafka"
sent = self._sock.sendall(payload)
if sent == 0:
raise RuntimeError("Kafka went away")
self.data = self._consume_response()
def recv(self, requestId):
"Get a response from Kafka"
return self.data
def close(self):
"Close this connection"
self._sock.close()
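KafkaConnection is a thin, blocking socket wrapper: send() writes an encoded request and buffers the broker's reply, recv() hands that reply back. Normally KafkaClient drives it, but a rough sketch of using it directly with KafkaProtocol (broker address and the correlation id are placeholders) might look like:

from kafka.conn import KafkaConnection
from kafka.protocol import KafkaProtocol

conn = KafkaConnection("localhost", 9092)

# request metadata for one topic; 1 is an arbitrary correlation id
request = KafkaProtocol.encode_metadata_request("kafka-python", 1, ["my-topic"])
conn.send(1, request)

# recv() returns the buffered response body (the size prefix is already stripped)
brokers, topics = KafkaProtocol.decode_metadata_response(conn.recv(1))
print brokers, topics
conn.close()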

kafka/consumer.py Normal file
View File

@@ -0,0 +1,159 @@
import logging
from threading import Lock
from kafka.common import (
ErrorMapping, FetchRequest,
OffsetRequest, OffsetFetchRequest, OffsetCommitRequest
)
log = logging.getLogger("kafka")
class SimpleConsumer(object):
"""
A simple consumer implementation that consumes all partitions for a topic
client: a connected KafkaClient
group: a name for this consumer, used for offset storage and must be unique
topic: the topic to consume
auto_commit: default True. Whether or not to auto commit the offsets
auto_commit_every_n: default 100. How many messages to consume before a commit
auto_commit_every_t: default 5000. How much time (in milliseconds) to wait before commit
Auto commit details:
If both auto_commit_every_n and auto_commit_every_t are set, they will reset one another
when one is triggered. These triggers simply call the commit method on this class. A
manual call to commit will also reset these triggers
"""
def __init__(self, client, group, topic, auto_commit=False, auto_commit_every_n=None, auto_commit_every_t=None):
self.client = client
self.topic = topic
self.group = group
self.client.load_metadata_for_topics(topic)
self.offsets = {}
# Set up the auto-commit timer
if auto_commit is True:
if auto_commit_every_t is not None:
self.commit_timer = ReentrantTimer(auto_commit_every_t, self.commit)
self.commit_timer.start()
self.commit_lock = Lock()
self.count_since_commit = 0
self.auto_commit = auto_commit
self.auto_commit_every_n = auto_commit_every_n
self.auto_commit_every_t = auto_commit_every_t
def get_or_init_offset_callback(resp):
if resp.error == ErrorMapping.NO_ERROR:
return resp.offset
elif resp.error == ErrorMapping.UNKNOWN_TOPIC_OR_PARTITON:
return 0
else:
raise Exception("OffsetFetchRequest for topic=%s, partition=%d failed with errorcode=%s" % (
resp.topic, resp.partition, resp.error))
for partition in self.client.topic_partitions[topic]:
req = OffsetFetchRequest(topic, partition)
(offset,) = self.client.send_offset_fetch_request(group, [req],
callback=get_or_init_offset_callback, fail_on_error=False)
self.offsets[partition] = offset
print self.offsets
def seek(self, offset, whence):
"""
Alter the current offset in the consumer, similar to fseek
offset: how much to modify the offset
whence: where to modify it from
0 is relative to the earliest available offset (head)
1 is relative to the current offset
2 is relative to the latest known offset (tail)
"""
if whence == 1:
# relative to current position
for partition, _offset in self.offsets.items():
self.offset[partition] = _offset + offset
elif whence in (0, 2):
# relative to beginning or end
reqs = []
for partition in offsets.keys():
if whence == 0:
reqs.append(OffsetRequest(self.topic, partition, -2, 1))
elif whence == 2:
reqs.append(OffsetRequest(self.topic, partition, -1, 1))
else:
pass
resps = self.client.send_offset_request([req])
for resp in resps:
self.offsets[resp.partition] = resp.offsets[0] + offset
else:
raise
def commit(self, partitions=[]):
"""
Commit offsets for this consumer
partitions: list of partitions to commit, default is to commit all of them
"""
# short circuit if nothing happened
if self.count_since_commit == 0:
return
with self.commit_lock:
reqs = []
if len(partitions) == 0: # commit all partitions
for partition, offset in self.offsets.items():
log.debug("Commit offset %d in SimpleConsumer: group=%s, topic=%s, partition=%s" % (
offset, self.group, self.topic, partition))
reqs.append(OffsetCommitRequest(self.topic, partition, offset, None))
else:
for partition in partitions:
offset = self.offsets[partition]
log.debug("Commit offset %d in SimpleConsumer: group=%s, topic=%s, partition=%s" % (
offset, self.group, self.topic, partition))
reqs.append(OffsetCommitRequest(self.topic, partition, offset, None))
resps = self.send_offset_commit_request(self.group, reqs)
for resp in resps:
assert resp.error == 0
self.count_since_commit = 0
def __iter__(self):
iters = {}
for partition, offset in self.offsets.items():
iters[partition] = self.__iter_partition__(partition, offset)
while True:
for it in iters.values():
yield it.next()
self.count_since_commit += 1
# deal with auto commits
if self.auto_commit is True:
if self.auto_commit_every_n is not None and self.count_since_commit > self.auto_commit_every_n:
if self.commit_timer is not None:
self.commit_timer.stop()
self.commit()
self.commit_timer.start()
else:
self.commit()
def __iter_partition__(self, partition, offset):
while True:
req = FetchRequest(self.topic, partition, offset, 1024)
(resp,) = self.client.send_fetch_request([req])
assert resp.topic == self.topic
assert resp.partition == partition
next_offset = None
for message in resp.messages:
next_offset = message.offset
print partition, message, message.offset
yield message
# update the internal state _after_ we yield the message
self.offsets[partition] = message.offset
print partition, next_offset
if next_offset is None:
break
else:
offset = next_offset + 1
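SimpleConsumer fetches from every partition of the topic and stores offsets per consumer group via OffsetCommitRequest. A hedged sketch of how it is meant to be driven with the auto-commit knobs described in its docstring (broker, group and topic names are placeholders):

from kafka.client import KafkaClient
from kafka.consumer import SimpleConsumer

client = KafkaClient("localhost", 9092)

# commit offsets for "my-group" every 100 messages or every 5 seconds
consumer = SimpleConsumer(client, "my-group", "my-topic",
                          auto_commit=True,
                          auto_commit_every_n=100,
                          auto_commit_every_t=5000)

# iteration round-robins across all partitions of the topic
for message in consumer:
    # each item is an OffsetAndMessage namedtuple
    print message.offset, message.message.value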

kafka/producer.py Normal file
View File

@@ -0,0 +1,22 @@
from itertools import cycle
import logging
from kafka.common import ProduceRequest
from kafka.protocol import create_message
log = logging.getLogger("kafka")
class SimpleProducer(object):
"""
A simple, round-robbin producer. Each message goes to exactly one partition
"""
def __init__(self, client, topic):
self.client = client
self.topic = topic
self.client.load_metadata_for_topics(topic)
self.next_partition = cycle(self.client.topic_partitions[topic])
def send_message(self, msg):
req = ProduceRequest(self.topic, self.next_partition.next(),
messages=[create_message(msg)])
resp = self.client.send_produce_request([req]).next()
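SimpleProducer loads topic metadata once and then cycles through the topic's partitions, one message per call. A minimal usage sketch, assuming a broker on localhost:9092 and an existing topic:

from kafka.client import KafkaClient
from kafka.producer import SimpleProducer

client = KafkaClient("localhost", 9092)
producer = SimpleProducer(client, "my-topic")

# each call sends one message to the next partition in the round-robin cycle
for i in range(3):
    producer.send_message("hello %d" % i)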

kafka/protocol.py Normal file
View File

@@ -0,0 +1,457 @@
import logging
import struct
import zlib
from kafka.codec import (
gzip_encode, gzip_decode, snappy_encode, snappy_decode
)
from kafka.common import (
BrokerMetadata, PartitionMetadata, Message, OffsetAndMessage,
ProduceResponse, FetchResponse, OffsetResponse,
OffsetCommitResponse, OffsetFetchResponse
)
from kafka.util import (
read_short_string, read_int_string, relative_unpack,
write_short_string, write_int_string, group_by_topic_and_partition,
BufferUnderflowError, ChecksumError
)
log = logging.getLogger("kafka")
class KafkaProtocol(object):
"""
Class to encapsulate all of the protocol encoding/decoding. This class does not
have any state associated with it, it is purely for organization.
"""
PRODUCE_KEY = 0
FETCH_KEY = 1
OFFSET_KEY = 2
METADATA_KEY = 3
OFFSET_COMMIT_KEY = 6
OFFSET_FETCH_KEY = 7
ATTRIBUTE_CODEC_MASK = 0x03
CODEC_NONE = 0x00
CODEC_GZIP = 0x01
CODEC_SNAPPY = 0x02
###################
# Private API #
###################
@classmethod
def _encode_message_header(cls, client_id, correlation_id, request_key):
"""
Encode the common request envelope
"""
return struct.pack('>hhih%ds' % len(client_id),
request_key, # ApiKey
0, # ApiVersion
correlation_id, # CorrelationId
len(client_id), #
client_id) # ClientId
@classmethod
def _encode_message_set(cls, messages):
"""
Encode a MessageSet. Unlike other arrays in the protocol, MessageSets are
not length-prefixed
Format
======
MessageSet => [Offset MessageSize Message]
Offset => int64
MessageSize => int32
"""
message_set = ""
for message in messages:
encoded_message = KafkaProtocol._encode_message(message)
message_set += struct.pack('>qi%ds' % len(encoded_message), 0, len(encoded_message), encoded_message)
return message_set
@classmethod
def _encode_message(cls, message):
"""
Encode a single message.
The magic number of a message is a format version number. The only supported
magic number right now is zero
Format
======
Message => Crc MagicByte Attributes Key Value
Crc => int32
MagicByte => int8
Attributes => int8
Key => bytes
Value => bytes
"""
if message.magic == 0:
msg = struct.pack('>BB', message.magic, message.attributes)
msg += write_int_string(message.key)
msg += write_int_string(message.value)
crc = zlib.crc32(msg)
msg = struct.pack('>i%ds' % len(msg), crc, msg)
else:
raise Exception("Unexpected magic number: %d" % message.magic)
return msg
@classmethod
def _decode_message_set_iter(cls, data):
"""
Iteratively decode a MessageSet
Reads repeated elements of (offset, message), calling decode_message to decode a
single message. Since compressed messages contain futher MessageSets, these two methods
have been decoupled so that they may recurse easily.
"""
cur = 0
while cur < len(data):
try:
((offset, ), cur) = relative_unpack('>q', data, cur)
(msg, cur) = read_int_string(data, cur)
for (offset, message) in KafkaProtocol._decode_message(msg, offset):
yield OffsetAndMessage(offset, message)
except BufferUnderflowError: # If we get a partial read of a message, stop
raise StopIteration()
@classmethod
def _decode_message(cls, data, offset):
"""
Decode a single Message
The only caller of this method is decode_message_set_iter. They are decoupled to
support nested messages (compressed MessageSets). The offset is actually read from
decode_message_set_iter (it is part of the MessageSet payload).
"""
((crc, magic, att), cur) = relative_unpack('>iBB', data, 0)
if crc != zlib.crc32(data[4:]):
raise ChecksumError("Message checksum failed")
(key, cur) = read_int_string(data, cur)
(value, cur) = read_int_string(data, cur)
if att & KafkaProtocol.ATTRIBUTE_CODEC_MASK == KafkaProtocol.CODEC_NONE:
yield (offset, Message(magic, att, key, value))
elif att & KafkaProtocol.ATTRIBUTE_CODEC_MASK == KafkaProtocol.CODEC_GZIP:
gz = gzip_decode(value)
for (offset, message) in KafkaProtocol._decode_message_set_iter(gz):
yield (offset, message)
elif att & KafkaProtocol.ATTRIBUTE_CODEC_MASK == KafkaProtocol.CODEC_SNAPPY:
snp = snappy_decode(value)
for (offset, message) in KafkaProtocol._decode_message_set_iter(snp):
yield (offset, message)
##################
# Public API #
##################
@classmethod
def encode_produce_request(cls, client_id, correlation_id, payloads=[], acks=1, timeout=1000):
"""
Encode some ProduceRequest structs
Params
======
client_id: string
correlation_id: string
payloads: list of ProduceRequest
acks: How "acky" you want the request to be
0: immediate response
1: written to disk by the leader
2+: waits for this many number of replicas to sync
-1: waits for all replicas to be in sync
timeout: Maximum time the server will wait for acks from replicas. This is _not_ a socket timeout
"""
grouped_payloads = group_by_topic_and_partition(payloads)
message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.PRODUCE_KEY)
message += struct.pack('>hii', acks, timeout, len(grouped_payloads))
for topic, topic_payloads in grouped_payloads.items():
message += struct.pack('>h%dsi' % len(topic), len(topic), topic, len(topic_payloads))
for partition, payload in topic_payloads.items():
message_set = KafkaProtocol._encode_message_set(payload.messages)
message += struct.pack('>ii%ds' % len(message_set), partition, len(message_set), message_set)
return struct.pack('>i%ds' % len(message), len(message), message)
@classmethod
def decode_produce_response(cls, data):
"""
Decode bytes to a ProduceResponse
Params
======
data: bytes to decode
"""
((correlation_id, num_topics), cur) = relative_unpack('>ii', data, 0)
for i in range(num_topics):
((strlen,), cur) = relative_unpack('>h', data, cur)
topic = data[cur:cur+strlen]
cur += strlen
((num_partitions,), cur) = relative_unpack('>i', data, cur)
for i in range(num_partitions):
((partition, error, offset), cur) = relative_unpack('>ihq', data, cur)
yield ProduceResponse(topic, partition, error, offset)
@classmethod
def encode_fetch_request(cls, client_id, correlation_id, payloads=[], max_wait_time=100, min_bytes=4096):
"""
Encodes some FetchRequest structs
Params
======
client_id: string
correlation_id: string
payloads: list of FetchRequest
max_wait_time: int, how long to block waiting on min_bytes of data
min_bytes: int, the minimum number of bytes to accumulate before returning the response
"""
grouped_payloads = group_by_topic_and_partition(payloads)
message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.FETCH_KEY)
message += struct.pack('>iiii', -1, max_wait_time, min_bytes, len(grouped_payloads)) # -1 is the replica id
for topic, topic_payloads in grouped_payloads.items():
message += write_short_string(topic)
message += struct.pack('>i', len(topic_payloads))
for partition, payload in topic_payloads.items():
message += struct.pack('>iqi', partition, payload.offset, payload.max_bytes)
return struct.pack('>i%ds' % len(message), len(message), message)
@classmethod
def decode_fetch_response_iter(cls, data):
"""
Decode bytes to a FetchResponse
Params
======
data: bytes to decode
"""
((correlation_id, num_topics), cur) = relative_unpack('>ii', data, 0)
for i in range(num_topics):
(topic, cur) = read_short_string(data, cur)
((num_partitions,), cur) = relative_unpack('>i', data, cur)
for i in range(num_partitions):
((partition, error, highwater_mark_offset), cur) = relative_unpack('>ihq', data, cur)
(message_set, cur) = read_int_string(data, cur)
yield FetchResponse(topic, partition, error, highwater_mark_offset,
KafkaProtocol._decode_message_set_iter(message_set))
@classmethod
def encode_offset_request(cls, client_id, correlation_id, payloads=[]):
grouped_payloads = group_by_topic_and_partition(payloads)
message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.OFFSET_KEY)
message += struct.pack('>ii', -1, len(grouped_payloads)) # -1 is the replica id
for topic, topic_payloads in grouped_payloads.items():
message += write_short_string(topic)
message += struct.pack('>i', len(topic_payloads))
for partition, payload in topic_payloads.items():
message += struct.pack('>iqi', partition, payload.time, payload.max_offsets)
return struct.pack('>i%ds' % len(message), len(message), message)
@classmethod
def decode_offset_response(cls, data):
"""
Decode bytes to an OffsetResponse
Params
======
data: bytes to decode
"""
((correlation_id, num_topics), cur) = relative_unpack('>ii', data, 0)
for i in range(num_topics):
(topic, cur) = read_short_string(data, cur)
((num_partitions,), cur) = relative_unpack('>i', data, cur)
for i in range(num_partitions):
((partition, error, num_offsets,), cur) = relative_unpack('>ihi', data, cur)
offsets = []
for j in range(num_offsets):
((offset,), cur) = relative_unpack('>q', data, cur)
offsets.append(offset)
yield OffsetResponse(topic, partition, error, tuple(offsets))
@classmethod
def encode_metadata_request(cls, client_id, correlation_id, topics=[]):
"""
Encode a MetadataRequest
Params
======
client_id: string
correlation_id: string
topics: list of strings
"""
message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.METADATA_KEY)
message += struct.pack('>i', len(topics))
for topic in topics:
message += struct.pack('>h%ds' % len(topic), len(topic), topic)
return write_int_string(message)
@classmethod
def decode_metadata_response(cls, data):
"""
Decode bytes to a MetadataResponse
Params
======
data: bytes to decode
"""
((correlation_id, numBrokers), cur) = relative_unpack('>ii', data, 0)
# Broker info
brokers = {}
for i in range(numBrokers):
((nodeId, ), cur) = relative_unpack('>i', data, cur)
(host, cur) = read_short_string(data, cur)
((port,), cur) = relative_unpack('>i', data, cur)
brokers[nodeId] = BrokerMetadata(nodeId, host, port)
# Topic info
((num_topics,), cur) = relative_unpack('>i', data, cur)
topicMetadata = {}
for i in range(num_topics):
((topicError,), cur) = relative_unpack('>h', data, cur)
(topicName, cur) = read_short_string(data, cur)
((num_partitions,), cur) = relative_unpack('>i', data, cur)
partitionMetadata = {}
for j in range(num_partitions):
((partitionErrorCode, partition, leader, numReplicas), cur) = relative_unpack('>hiii', data, cur)
(replicas, cur) = relative_unpack('>%di' % numReplicas, data, cur)
((numIsr,), cur) = relative_unpack('>i', data, cur)
(isr, cur) = relative_unpack('>%di' % numIsr, data, cur)
partitionMetadata[partition] = PartitionMetadata(topicName, partition, leader, replicas, isr)
topicMetadata[topicName] = partitionMetadata
return (brokers, topicMetadata)
@classmethod
def encode_offset_commit_request(cls, client_id, correlation_id, group, payloads):
"""
Encode some OffsetCommitRequest structs
Params
======
client_id: string
correlation_id: string
group: string, the consumer group you are committing offsets for
payloads: list of OffsetCommitRequest
"""
grouped_payloads= group_by_topic_and_partition(payloads)
message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.OFFSET_COMMIT_KEY)
message += write_short_string(group)
message += struct.pack('>i', len(grouped_payloads))
for topic, topic_payloads in grouped_payloads.items():
message += write_short_string(topic)
message += struct.pack('>i', len(topic_payloads))
for partition, payload in topic_payloads.items():
message += struct.pack('>iq', partition, payload.offset)
message += write_short_string(payload.metadata)
return struct.pack('>i%ds' % len(message), len(message), message)
@classmethod
def decode_offset_commit_response(cls, data):
"""
Decode bytes to an OffsetCommitResponse
Params
======
data: bytes to decode
"""
data = data[2:] # TODO remove me when versionId is removed
((correlation_id,), cur) = relative_unpack('>i', data, 0)
(client_id, cur) = read_short_string(data, cur)
((num_topics,), cur) = relative_unpack('>i', data, cur)
for i in xrange(num_topics):
(topic, cur) = read_short_string(data, cur)
((num_partitions,), cur) = relative_unpack('>i', data, cur)
for i in xrange(num_partitions):
((partition, error), cur) = relative_unpack('>ih', data, cur)
yield OffsetCommitResponse(topic, partition, error)
@classmethod
def encode_offset_fetch_request(cls, client_id, correlation_id, group, payloads):
"""
Encode some OffsetFetchRequest structs
Params
======
client_id: string
correlation_id: string
group: string, the consumer group you are fetching offsets for
payloads: list of OffsetFetchRequest
"""
grouped_payloads = group_by_topic_and_partition(payloads)
message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.OFFSET_FETCH_KEY)
message += write_short_string(group)
message += struct.pack('>i', len(grouped_payloads))
for topic, topic_payloads in grouped_payloads.items():
message += write_short_string(topic)
message += struct.pack('>i', len(topic_payloads))
for partition, payload in topic_payloads.items():
message += struct.pack('>i', partition)
return struct.pack('>i%ds' % len(message), len(message), message)
@classmethod
def decode_offset_fetch_response(cls, data):
"""
Decode bytes to an OffsetFetchResponse
Params
======
data: bytes to decode
"""
data = data[2:] # TODO remove me when versionId is removed
((correlation_id,), cur) = relative_unpack('>i', data, 0)
(client_id, cur) = read_short_string(data, cur)
((num_topics,), cur) = relative_unpack('>i', data, cur)
for i in range(num_topics):
(topic, cur) = read_short_string(data, cur)
((num_partitions,), cur) = relative_unpack('>i', data, cur)
for i in range(num_partitions):
((partition, offset), cur) = relative_unpack('>iq', data, cur)
(metadata, cur) = read_short_string(data, cur)
((error,), cur) = relative_unpack('>h', data, cur)
yield OffsetFetchResponse(topic, partition, offset, metadata, error)
def create_message(payload, key=None):
"""
Construct a Message
Params
======
payload: bytes, the payload to send to Kafka
key: bytes, a key used for partition routing (optional)
"""
return Message(0, 0, key, payload)
def create_gzip_message(payloads, key=None):
"""
Construct a Gzipped Message containing multiple Messages
The given payloads will be encoded, compressed, and sent as a single atomic
message to Kafka.
Params
======
payloads: list(bytes), a list of payload to send be sent to Kafka
key: bytes, a key used for partition routing (optional)
"""
message_set = KafkaProtocol._encode_message_set(
[create_message(payload) for payload in payloads])
gzipped = gzip_encode(message_set)
return Message(0, 0x00 | (KafkaProtocol.ATTRIBUTE_CODEC_MASK & KafkaProtocol.CODEC_GZIP), key, gzipped)
def create_snappy_message(payloads, key=None):
"""
Construct a Snappy Message containing multiple Messages
The given payloads will be encoded, compressed, and sent as a single atomic
message to Kafka.
Params
======
payloads: list(bytes), a list of payload to send be sent to Kafka
key: bytes, a key used for partition routing (optional)
"""
message_set = KafkaProtocol._encode_message_set(
[create_message(payload) for payload in payloads])
snapped = snappy_encode(message_set)
return Message(0, 0x00 | (KafkaProtocol.ATTRIBUTE_CODEC_MASK & KafkaProtocol.CODEC_SNAPPY), key, snapped)
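create_message, create_gzip_message and create_snappy_message are now module-level helpers (the tests below switch from KafkaProtocol.create_message to the bare functions). A short sketch of batching them into a ProduceRequest the way the integration tests do; the topic name is a placeholder:

from kafka.common import ProduceRequest
from kafka.protocol import create_message, create_gzip_message

# one plain message plus a gzip-compressed batch, all bound for partition 0
messages = [
    create_message("plain message"),
    create_gzip_message(["compressed %d" % i for i in range(100)]),
]
req = ProduceRequest("my-topic", 0, messages=messages)

# a connected KafkaClient would then send it, e.g.:
# for resp in client.send_produce_request([req]):
#     assert resp.error == 0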

View File

@@ -1,6 +1,7 @@
 from collections import defaultdict
 from itertools import groupby
 import struct
+from threading import Timer

 def write_int_string(s):
     if s is None:
@@ -56,3 +57,27 @@ class BufferUnderflowError(Exception):

 class ChecksumError(Exception):
     pass
+
+class ReentrantTimer(object):
+    """
+    A timer that can be restarted, unlike threading.Timer (although this uses threading.Timer)
+
+    t: timer interval in milliseconds
+    fn: a callable to invoke
+    """
+    def __init__(self, t, fn):
+        self.timer = None
+        self.t = t
+        self.fn = fn
+
+    def start(self):
+        if self.timer is None:
+            self.timer = Timer(self.t / 1000., self.fn)
+            self.timer.start()
+        else:
+            self.timer.cancel()
+            self.timer = Timer(self.t / 1000., self.fn)
+            self.timer.start()
+
+    def stop(self):
+        self.timer.cancel()
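ReentrantTimer exists so SimpleConsumer can re-arm its time-based auto-commit; a bare threading.Timer only fires once. A tiny standalone sketch (interval and callback are arbitrary):

from kafka.util import ReentrantTimer

def on_tick():
    print "timer fired"

t = ReentrantTimer(5000, on_tick)  # interval is given in milliseconds
t.start()   # arm the timer
t.start()   # calling start() again cancels and re-arms it
t.stop()    # cancel the pending timer without firing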

View File

@@ -12,7 +12,8 @@ import time
 import unittest
 from urlparse import urlparse

-from kafka.client import *
+from kafka import *
+from kafka.common import *

 def get_open_port():
     sock = socket.socket()
@@ -146,7 +147,7 @@ class TestKafkaClient(unittest.TestCase):

     def test_produce_many_simple(self):
         produce = ProduceRequest("test_produce_many_simple", 0, messages=[
-            KafkaProtocol.create_message("Test message %d" % i) for i in range(100)
+            create_message("Test message %d" % i) for i in range(100)
         ])

         for resp in self.client.send_produce_request([produce]):
@@ -172,7 +173,7 @@ class TestKafkaClient(unittest.TestCase):

     def test_produce_10k_simple(self):
         produce = ProduceRequest("test_produce_10k_simple", 0, messages=[
-            KafkaProtocol.create_message("Test message %d" % i) for i in range(10000)
+            create_message("Test message %d" % i) for i in range(10000)
         ])

         for resp in self.client.send_produce_request([produce]):
@@ -183,8 +184,8 @@ class TestKafkaClient(unittest.TestCase):
         self.assertEquals(offset.offsets[0], 10000)

     def test_produce_many_gzip(self):
-        message1 = KafkaProtocol.create_gzip_message(["Gzipped 1 %d" % i for i in range(100)])
-        message2 = KafkaProtocol.create_gzip_message(["Gzipped 2 %d" % i for i in range(100)])
+        message1 = create_gzip_message(["Gzipped 1 %d" % i for i in range(100)])
+        message2 = create_gzip_message(["Gzipped 2 %d" % i for i in range(100)])

         produce = ProduceRequest("test_produce_many_gzip", 0, messages=[message1, message2])
@@ -196,8 +197,8 @@ class TestKafkaClient(unittest.TestCase):
         self.assertEquals(offset.offsets[0], 200)

     def test_produce_many_snappy(self):
-        message1 = KafkaProtocol.create_snappy_message(["Snappy 1 %d" % i for i in range(100)])
-        message2 = KafkaProtocol.create_snappy_message(["Snappy 2 %d" % i for i in range(100)])
+        message1 = create_snappy_message(["Snappy 1 %d" % i for i in range(100)])
+        message2 = create_snappy_message(["Snappy 2 %d" % i for i in range(100)])

         produce = ProduceRequest("test_produce_many_snappy", 0, messages=[message1, message2])
@@ -209,9 +210,9 @@ class TestKafkaClient(unittest.TestCase):
         self.assertEquals(offset.offsets[0], 200)

     def test_produce_mixed(self):
-        message1 = KafkaProtocol.create_message("Just a plain message")
-        message2 = KafkaProtocol.create_gzip_message(["Gzipped %d" % i for i in range(100)])
-        message3 = KafkaProtocol.create_snappy_message(["Snappy %d" % i for i in range(100)])
+        message1 = create_message("Just a plain message")
+        message2 = create_gzip_message(["Gzipped %d" % i for i in range(100)])
+        message3 = create_snappy_message(["Snappy %d" % i for i in range(100)])

         produce = ProduceRequest("test_produce_mixed", 0, messages=[message1, message2, message3])
@@ -225,7 +226,7 @@ class TestKafkaClient(unittest.TestCase):

     def test_produce_100k_gzipped(self):
         produce = ProduceRequest("test_produce_100k_gzipped", 0, messages=[
-            KafkaProtocol.create_gzip_message(["Gzipped %d" % i for i in range(100000)])
+            create_gzip_message(["Gzipped %d" % i for i in range(100000)])
         ])

         for resp in self.client.send_produce_request([produce]):
@@ -252,8 +253,8 @@ class TestKafkaClient(unittest.TestCase):

     def test_produce_consume(self):
         produce = ProduceRequest("test_produce_consume", 0, messages=[
-            KafkaProtocol.create_message("Just a test message"),
-            KafkaProtocol.create_message("Message with a key", "foo"),
+            create_message("Just a test message"),
+            create_message("Message with a key", "foo"),
         ])

         for resp in self.client.send_produce_request([produce]):
@@ -276,7 +277,7 @@ class TestKafkaClient(unittest.TestCase):

     def test_produce_consume_many(self):
         produce = ProduceRequest("test_produce_consume_many", 0, messages=[
-            KafkaProtocol.create_message("Test message %d" % i) for i in range(100)
+            create_message("Test message %d" % i) for i in range(100)
         ])

         for resp in self.client.send_produce_request([produce]):
@@ -308,10 +309,10 @@ class TestKafkaClient(unittest.TestCase):

     def test_produce_consume_two_partitions(self):
         produce1 = ProduceRequest("test_produce_consume_two_partitions", 0, messages=[
-            KafkaProtocol.create_message("Partition 0 %d" % i) for i in range(10)
+            create_message("Partition 0 %d" % i) for i in range(10)
         ])
         produce2 = ProduceRequest("test_produce_consume_two_partitions", 1, messages=[
-            KafkaProtocol.create_message("Partition 1 %d" % i) for i in range(10)
+            create_message("Partition 1 %d" % i) for i in range(10)
         ])

         for resp in self.client.send_produce_request([produce1, produce2]):
@@ -400,22 +401,25 @@ class TestConsumer(unittest.TestCase):
         cls.server2.close()

     def test_consumer(self):
+        # Produce 100 messages to partition 0
         produce1 = ProduceRequest("test_consumer", 0, messages=[
-            KafkaProtocol.create_message("Test message 0 %d" % i) for i in range(100)
-        ])
-        produce2 = ProduceRequest("test_consumer", 1, messages=[
-            KafkaProtocol.create_message("Test message 1 %d" % i) for i in range(100)
+            create_message("Test message 0 %d" % i) for i in range(100)
         ])

         for resp in self.client.send_produce_request([produce1]):
             self.assertEquals(resp.error, 0)
             self.assertEquals(resp.offset, 0)

+        # Produce 100 messages to partition 1
+        produce2 = ProduceRequest("test_consumer", 1, messages=[
+            create_message("Test message 1 %d" % i) for i in range(100)
+        ])
+
         for resp in self.client.send_produce_request([produce2]):
             self.assertEquals(resp.error, 0)
             self.assertEquals(resp.offset, 0)

+        # Start a consumer
         consumer = SimpleConsumer(self.client, "group1", "test_consumer")
         all_messages = []
         for message in consumer:
@@ -424,6 +428,23 @@ class TestConsumer(unittest.TestCase):
         self.assertEquals(len(all_messages), 200)
         self.assertEquals(len(all_messages), len(set(all_messages))) # make sure there are no dupes

+        # Produce more messages
+        produce3 = ProduceRequest("test_consumer", 1, messages=[
+            create_message("Test message 3 %d" % i) for i in range(10)
+        ])
+
+        for resp in self.client.send_produce_request([produce3]):
+            self.assertEquals(resp.error, 0)
+            self.assertEquals(resp.offset, 100)
+
+        # Start a new consumer, make sure we only get the newly produced messages
+        consumer = SimpleConsumer(self.client, "group1", "test_consumer")
+        all_messages = []
+        for message in consumer:
+            all_messages.append(message)
+
+        self.assertEquals(len(all_messages), 10)
+
 if __name__ == "__main__":
-    logging.basicConfig(level=logging.INFO)
+    logging.basicConfig(level=logging.DEBUG)
     unittest.main()