PEP8-ify most of the files

consumer.py and conn.py will be done later after pending merges
Mahendra M
2013-05-29 14:20:58 +05:30
committed by David Arthur
parent 97962d4af3
commit 2c257eeb1f
7 changed files with 312 additions and 131 deletions

View File

@@ -11,6 +11,7 @@ except ImportError:
     log.warn("Snappy codec not available")
     hasSnappy = False
 
+
 def gzip_encode(payload):
     buf = StringIO()
     f = gzip.GzipFile(fileobj=buf, mode='w', compresslevel=6)
@@ -21,6 +22,7 @@ def gzip_encode(payload):
     buf.close()
     return out
 
+
 def gzip_decode(payload):
     buf = StringIO(payload)
     f = gzip.GzipFile(fileobj=buf, mode='r')
@@ -29,11 +31,13 @@ def gzip_decode(payload):
     buf.close()
     return out
 
+
 def snappy_encode(payload):
     if not hasSnappy:
         raise NotImplementedError("Snappy codec not available")
     return snappy.compress(payload)
 
+
 def snappy_decode(payload):
     if not hasSnappy:
         raise NotImplementedError("Snappy codec not available")
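
Editorial note: the gzip and snappy helpers above form symmetric encode/decode pairs. A minimal round-trip sketch, assuming the module is importable as kafka.codec (the file path is not shown in this view):

    from kafka.codec import gzip_encode, gzip_decode

    payload = "some message bytes"
    compressed = gzip_encode(payload)            # compress via an in-memory buffer
    assert gzip_decode(compressed) == payload    # decoding restores the original payload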

View File

@@ -5,26 +5,49 @@ from collections import namedtuple
 ###############
 # Request payloads
-ProduceRequest = namedtuple("ProduceRequest", ["topic", "partition", "messages"])
-FetchRequest = namedtuple("FetchRequest", ["topic", "partition", "offset", "max_bytes"])
-OffsetRequest = namedtuple("OffsetRequest", ["topic", "partition", "time", "max_offsets"])
-OffsetCommitRequest = namedtuple("OffsetCommitRequest", ["topic", "partition", "offset", "metadata"])
+ProduceRequest = namedtuple("ProduceRequest",
+                            ["topic", "partition", "messages"])
+
+FetchRequest = namedtuple("FetchRequest",
+                          ["topic", "partition", "offset", "max_bytes"])
+
+OffsetRequest = namedtuple("OffsetRequest",
+                           ["topic", "partition", "time", "max_offsets"])
+
+OffsetCommitRequest = namedtuple("OffsetCommitRequest",
+                                 ["topic", "partition", "offset", "metadata"])
+
 OffsetFetchRequest = namedtuple("OffsetFetchRequest", ["topic", "partition"])
 
 # Response payloads
-ProduceResponse = namedtuple("ProduceResponse", ["topic", "partition", "error", "offset"])
-FetchResponse = namedtuple("FetchResponse", ["topic", "partition", "error", "highwaterMark", "messages"])
-OffsetResponse = namedtuple("OffsetResponse", ["topic", "partition", "error", "offsets"])
-OffsetCommitResponse = namedtuple("OffsetCommitResponse", ["topic", "partition", "error"])
-OffsetFetchResponse = namedtuple("OffsetFetchResponse", ["topic", "partition", "offset", "metadata", "error"])
+ProduceResponse = namedtuple("ProduceResponse",
+                             ["topic", "partition", "error", "offset"])
+
+FetchResponse = namedtuple("FetchResponse", ["topic", "partition", "error",
+                                             "highwaterMark", "messages"])
+
+OffsetResponse = namedtuple("OffsetResponse",
+                            ["topic", "partition", "error", "offsets"])
+
+OffsetCommitResponse = namedtuple("OffsetCommitResponse",
+                                  ["topic", "partition", "error"])
+
+OffsetFetchResponse = namedtuple("OffsetFetchResponse",
+                                 ["topic", "partition", "offset",
+                                  "metadata", "error"])
 
 BrokerMetadata = namedtuple("BrokerMetadata", ["nodeId", "host", "port"])
-PartitionMetadata = namedtuple("PartitionMetadata", ["topic", "partition", "leader", "replicas", "isr"])
+
+PartitionMetadata = namedtuple("PartitionMetadata",
+                               ["topic", "partition", "leader",
+                                "replicas", "isr"])
 
 # Other useful structs
 OffsetAndMessage = namedtuple("OffsetAndMessage", ["offset", "message"])
 Message = namedtuple("Message", ["magic", "attributes", "key", "value"])
 TopicAndPartition = namedtuple("TopicAndPartition", ["topic", "partition"])
 
+
 class ErrorMapping(object):
     # Many of these are not actually used by the client
     UNKNOWN = -1
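
Editorial note: these payloads are plain namedtuples, so requests are built positionally or by keyword and read back by field name. A small sketch (topic, partition and sizes are illustrative; kafka.common as the module path is an assumption):

    from kafka.common import FetchRequest

    # one fetch per (topic, partition): start at offset 0, read at most 4096 bytes
    fetch = FetchRequest(topic="my-topic", partition=0, offset=0, max_bytes=4096)

    assert fetch.topic == fetch[0] == "my-topic"   # namedtuples allow both access styles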

View File

@@ -6,6 +6,7 @@ from kafka.protocol import create_message
 log = logging.getLogger("kafka")
 
+
 class SimpleProducer(object):
     """
     A simple, round-robbin producer. Each message goes to exactly one partition
@@ -19,5 +20,6 @@ class SimpleProducer(object):
     def send_messages(self, *msg):
         req = ProduceRequest(self.topic, self.next_partition.next(),
                              messages=[create_message(m) for m in msg])
+
         resp = self.client.send_produce_request([req])[0]
         assert resp.error == 0
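
Editorial note: send_messages wraps each payload with create_message, picks the next partition from a round-robin iterator, and asserts a zero error code in the response. A hedged usage sketch (the KafkaClient and SimpleProducer constructor signatures are assumptions, not part of this diff):

    from kafka.client import KafkaClient
    from kafka.producer import SimpleProducer

    client = KafkaClient("localhost", 9092)         # assumed constructor signature
    producer = SimpleProducer(client, "my-topic")   # assumed constructor signature
    producer.send_messages("message one", "message two")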

View File

@@ -18,10 +18,12 @@ from kafka.util import (
 log = logging.getLogger("kafka")
 
+
 class KafkaProtocol(object):
     """
-    Class to encapsulate all of the protocol encoding/decoding. This class does not
-    have any state associated with it, it is purely for organization.
+    Class to encapsulate all of the protocol encoding/decoding.
+    This class does not have any state associated with it, it is purely
+    for organization.
     """
     PRODUCE_KEY = 0
     FETCH_KEY = 1
@@ -48,14 +50,14 @@ class KafkaProtocol(object):
                            request_key,        # ApiKey
                            0,                  # ApiVersion
                            correlation_id,     # CorrelationId
-                           len(client_id),     #
+                           len(client_id),
                            client_id)          # ClientId
 
     @classmethod
     def _encode_message_set(cls, messages):
         """
-        Encode a MessageSet. Unlike other arrays in the protocol, MessageSets are
-        not length-prefixed
+        Encode a MessageSet. Unlike other arrays in the protocol,
+        MessageSets are not length-prefixed
 
         Format
         ======
@@ -66,7 +68,8 @@ class KafkaProtocol(object):
         message_set = ""
         for message in messages:
             encoded_message = KafkaProtocol._encode_message(message)
-            message_set += struct.pack('>qi%ds' % len(encoded_message), 0, len(encoded_message), encoded_message)
+            message_set += struct.pack('>qi%ds' % len(encoded_message), 0,
+                                       len(encoded_message), encoded_message)
         return message_set
 
     @classmethod
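
Editorial note: as the docstring above says, a MessageSet is just repeated (offset, size, message) records with no outer length prefix. A minimal sketch of that framing, using the same struct format as _encode_message_set (the encoded message bytes are a placeholder):

    import struct

    encoded_message = b"\x00" * 10              # placeholder for an encoded Message
    record = struct.pack('>qi%ds' % len(encoded_message),
                         0,                     # offset (0 when producing)
                         len(encoded_message),  # size of the message that follows
                         encoded_message)
    # a MessageSet is simply the concatenation of such records
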
@@ -74,8 +77,8 @@ class KafkaProtocol(object):
         """
         Encode a single message.
 
-        The magic number of a message is a format version number. The only supported
-        magic number right now is zero
+        The magic number of a message is a format version number.
+        The only supported magic number right now is zero
 
         Format
         ======
@@ -96,24 +99,27 @@ class KafkaProtocol(object):
             raise Exception("Unexpected magic number: %d" % message.magic)
         return msg
 
     @classmethod
     def _decode_message_set_iter(cls, data):
         """
         Iteratively decode a MessageSet
 
-        Reads repeated elements of (offset, message), calling decode_message to decode a
-        single message. Since compressed messages contain futher MessageSets, these two methods
-        have been decoupled so that they may recurse easily.
+        Reads repeated elements of (offset, message), calling decode_message
+        to decode a single message. Since compressed messages contain futher
+        MessageSets, these two methods have been decoupled so that they may
+        recurse easily.
         """
         cur = 0
         while cur < len(data):
             try:
                 ((offset, ), cur) = relative_unpack('>q', data, cur)
                 (msg, cur) = read_int_string(data, cur)
-                for (offset, message) in KafkaProtocol._decode_message(msg, offset):
+                for (offset, message) in KafkaProtocol._decode_message(msg,
+                                                                       offset):
                     yield OffsetAndMessage(offset, message)
-            except BufferUnderflowError: # If we get a partial read of a message, stop
+            except BufferUnderflowError:
+                # If we get a partial read of a message, stop
                 raise StopIteration()
 
     @classmethod
@@ -121,9 +127,10 @@ class KafkaProtocol(object):
         """
         Decode a single Message
 
-        The only caller of this method is decode_message_set_iter. They are decoupled to
-        support nested messages (compressed MessageSets). The offset is actually read from
-        decode_message_set_iter (it is part of the MessageSet payload).
+        The only caller of this method is decode_message_set_iter.
+        They are decoupled to support nested messages (compressed MessageSets).
+        The offset is actually read from decode_message_set_iter (it is part
+        of the MessageSet payload).
         """
         ((crc, magic, att), cur) = relative_unpack('>iBB', data, 0)
         if crc != zlib.crc32(data[4:]):
@@ -131,23 +138,29 @@ class KafkaProtocol(object):
         (key, cur) = read_int_string(data, cur)
         (value, cur) = read_int_string(data, cur)
-        if att & KafkaProtocol.ATTRIBUTE_CODEC_MASK == KafkaProtocol.CODEC_NONE:
+
+        codec = att & KafkaProtocol.ATTRIBUTE_CODEC_MASK
+
+        if codec == KafkaProtocol.CODEC_NONE:
             yield (offset, Message(magic, att, key, value))
-        elif att & KafkaProtocol.ATTRIBUTE_CODEC_MASK == KafkaProtocol.CODEC_GZIP:
+
+        elif codec == KafkaProtocol.CODEC_GZIP:
             gz = gzip_decode(value)
-            for (offset, message) in KafkaProtocol._decode_message_set_iter(gz):
-                yield (offset, message)
-        elif att & KafkaProtocol.ATTRIBUTE_CODEC_MASK == KafkaProtocol.CODEC_SNAPPY:
+            for (offset, msg) in KafkaProtocol._decode_message_set_iter(gz):
+                yield (offset, msg)
+
+        elif codec == KafkaProtocol.CODEC_SNAPPY:
             snp = snappy_decode(value)
-            for (offset, message) in KafkaProtocol._decode_message_set_iter(snp):
-                yield (offset, message)
+            for (offset, msg) in KafkaProtocol._decode_message_set_iter(snp):
+                yield (offset, msg)
 
     ##################
     #   Public API   #
     ##################
 
     @classmethod
-    def encode_produce_request(cls, client_id, correlation_id, payloads=[], acks=1, timeout=1000):
+    def encode_produce_request(cls, client_id, correlation_id,
+                               payloads=None, acks=1, timeout=1000):
         """
         Encode some ProduceRequest structs
@@ -161,16 +174,26 @@ class KafkaProtocol(object):
             1: written to disk by the leader
             2+: waits for this many number of replicas to sync
             -1: waits for all replicas to be in sync
-        timeout: Maximum time the server will wait for acks from replicas. This is _not_ a socket timeout
+        timeout: Maximum time the server will wait for acks from replicas.
+                 This is _not_ a socket timeout
         """
+        payloads = [] if payloads is None else payloads
         grouped_payloads = group_by_topic_and_partition(payloads)
-        message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.PRODUCE_KEY)
+
+        message = cls._encode_message_header(client_id, correlation_id,
+                                             KafkaProtocol.PRODUCE_KEY)
+
         message += struct.pack('>hii', acks, timeout, len(grouped_payloads))
+
         for topic, topic_payloads in grouped_payloads.items():
-            message += struct.pack('>h%dsi' % len(topic), len(topic), topic, len(topic_payloads))
+            message += struct.pack('>h%dsi' % len(topic),
+                                   len(topic), topic, len(topic_payloads))
+
             for partition, payload in topic_payloads.items():
-                message_set = KafkaProtocol._encode_message_set(payload.messages)
-                message += struct.pack('>ii%ds' % len(message_set), partition, len(message_set), message_set)
+                msg_set = KafkaProtocol._encode_message_set(payload.messages)
+                message += struct.pack('>ii%ds' % len(msg_set), partition,
+                                       len(msg_set), msg_set)
+
         return struct.pack('>i%ds' % len(message), len(message), message)
 
     @classmethod
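
Editorial note: beyond line wrapping, the switch from payloads=[] to payloads=None here (and in the other encode_* methods below) is the usual guard against Python's shared-mutable-default pitfall. A quick illustration of why the idiom exists:

    def bad(items=[]):        # the same list object is reused across calls
        items.append(1)
        return len(items)

    def good(items=None):     # a fresh list is created on every call
        items = [] if items is None else items
        items.append(1)
        return len(items)

    assert bad() == 1 and bad() == 2     # state leaks between calls
    assert good() == 1 and good() == 1
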
@@ -183,17 +206,21 @@ class KafkaProtocol(object):
         data: bytes to decode
         """
         ((correlation_id, num_topics), cur) = relative_unpack('>ii', data, 0)
+
         for i in range(num_topics):
             ((strlen,), cur) = relative_unpack('>h', data, cur)
             topic = data[cur:cur + strlen]
             cur += strlen
             ((num_partitions,), cur) = relative_unpack('>i', data, cur)
+
             for i in range(num_partitions):
-                ((partition, error, offset), cur) = relative_unpack('>ihq', data, cur)
+                ((partition, error, offset), cur) = relative_unpack('>ihq',
+                                                                    data, cur)
                 yield ProduceResponse(topic, partition, error, offset)
 
     @classmethod
-    def encode_fetch_request(cls, client_id, correlation_id, payloads=[], max_wait_time=100, min_bytes=4096):
+    def encode_fetch_request(cls, client_id, correlation_id, payloads=None,
+                             max_wait_time=100, min_bytes=4096):
         """
         Encodes some FetchRequest structs
@@ -203,17 +230,27 @@ class KafkaProtocol(object):
         correlation_id: string
         payloads: list of FetchRequest
         max_wait_time: int, how long to block waiting on min_bytes of data
-        min_bytes: int, the minimum number of bytes to accumulate before returning the response
+        min_bytes: int, the minimum number of bytes to accumulate before
+                   returning the response
         """
+        payloads = [] if payloads is None else payloads
        grouped_payloads = group_by_topic_and_partition(payloads)
+
        message = cls._encode_message_header(client_id, correlation_id,
+                                             KafkaProtocol.FETCH_KEY)
+
+        # -1 is the replica id
+        message += struct.pack('>iiii', -1, max_wait_time, min_bytes,
+                               len(grouped_payloads))
+
         for topic, topic_payloads in grouped_payloads.items():
             message += write_short_string(topic)
             message += struct.pack('>i', len(topic_payloads))
+
             for partition, payload in topic_payloads.items():
-                message += struct.pack('>iqi', partition, payload.offset, payload.max_bytes)
+                message += struct.pack('>iqi', partition, payload.offset,
+                                       payload.max_bytes)
+
         return struct.pack('>i%ds' % len(message), len(message), message)
 
     @classmethod
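
Editorial note: every encode_* method above ends with the same framing step: the fully assembled request body is prefixed with its own 4-byte big-endian length. A tiny sketch of that final pack (the body bytes are a placeholder):

    import struct

    message = b"request body bytes"   # placeholder for the assembled request
    framed = struct.pack('>i%ds' % len(message), len(message), message)
    assert framed[:4] == struct.pack('>i', len(message))   # length prefix, then body
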
@@ -226,25 +263,41 @@ class KafkaProtocol(object):
         data: bytes to decode
         """
         ((correlation_id, num_topics), cur) = relative_unpack('>ii', data, 0)
+
         for i in range(num_topics):
             (topic, cur) = read_short_string(data, cur)
             ((num_partitions,), cur) = relative_unpack('>i', data, cur)
+
             for i in range(num_partitions):
-                ((partition, error, highwater_mark_offset), cur) = relative_unpack('>ihq', data, cur)
+                ((partition, error, highwater_mark_offset), cur) = \
+                    relative_unpack('>ihq', data, cur)
+
                 (message_set, cur) = read_int_string(data, cur)
-                yield FetchResponse(topic, partition, error, highwater_mark_offset,
+
+                yield FetchResponse(
+                    topic, partition, error,
+                    highwater_mark_offset,
                     KafkaProtocol._decode_message_set_iter(message_set))
 
     @classmethod
-    def encode_offset_request(cls, client_id, correlation_id, payloads=[]):
+    def encode_offset_request(cls, client_id, correlation_id, payloads=None):
+        payloads = [] if payloads is None else payloads
         grouped_payloads = group_by_topic_and_partition(payloads)
-        message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.OFFSET_KEY)
-        message += struct.pack('>ii', -1, len(grouped_payloads)) # -1 is the replica id
+
+        message = cls._encode_message_header(client_id, correlation_id,
+                                             KafkaProtocol.OFFSET_KEY)
+
+        # -1 is the replica id
+        message += struct.pack('>ii', -1, len(grouped_payloads))
+
         for topic, topic_payloads in grouped_payloads.items():
             message += write_short_string(topic)
             message += struct.pack('>i', len(topic_payloads))
+
             for partition, payload in topic_payloads.items():
-                message += struct.pack('>iqi', partition, payload.time, payload.max_offsets)
+                message += struct.pack('>iqi', partition, payload.time,
+                                       payload.max_offsets)
+
         return struct.pack('>i%ds' % len(message), len(message), message)
 
     @classmethod
@@ -257,19 +310,24 @@ class KafkaProtocol(object):
         data: bytes to decode
         """
         ((correlation_id, num_topics), cur) = relative_unpack('>ii', data, 0)
+
         for i in range(num_topics):
             (topic, cur) = read_short_string(data, cur)
             ((num_partitions,), cur) = relative_unpack('>i', data, cur)
+
             for i in range(num_partitions):
-                ((partition, error, num_offsets,), cur) = relative_unpack('>ihi', data, cur)
+                ((partition, error, num_offsets,), cur) = \
+                    relative_unpack('>ihi', data, cur)
+
                 offsets = []
                 for j in range(num_offsets):
                     ((offset,), cur) = relative_unpack('>q', data, cur)
                     offsets.append(offset)
+
                 yield OffsetResponse(topic, partition, error, tuple(offsets))
 
     @classmethod
-    def encode_metadata_request(cls, client_id, correlation_id, topics=[]):
+    def encode_metadata_request(cls, client_id, correlation_id, topics=None):
         """
         Encode a MetadataRequest
@@ -279,10 +337,15 @@ class KafkaProtocol(object):
         correlation_id: string
         topics: list of strings
         """
-        message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.METADATA_KEY)
+        topics = [] if topics is None else topics
+        message = cls._encode_message_header(client_id, correlation_id,
+                                             KafkaProtocol.METADATA_KEY)
+
         message += struct.pack('>i', len(topics))
+
         for topic in topics:
             message += struct.pack('>h%ds' % len(topic), len(topic), topic)
+
         return write_int_string(message)
 
     @classmethod
@@ -307,22 +370,34 @@ class KafkaProtocol(object):
         # Topic info
         ((num_topics,), cur) = relative_unpack('>i', data, cur)
         topicMetadata = {}
+
         for i in range(num_topics):
             ((topicError,), cur) = relative_unpack('>h', data, cur)
             (topicName, cur) = read_short_string(data, cur)
             ((num_partitions,), cur) = relative_unpack('>i', data, cur)
             partitionMetadata = {}
+
             for j in range(num_partitions):
-                ((partitionErrorCode, partition, leader, numReplicas), cur) = relative_unpack('>hiii', data, cur)
-                (replicas, cur) = relative_unpack('>%di' % numReplicas, data, cur)
+                ((partitionErrorCode, partition, leader, numReplicas), cur) = \
+                    relative_unpack('>hiii', data, cur)
+
+                (replicas, cur) = relative_unpack('>%di' % numReplicas,
+                                                  data, cur)
+
                 ((numIsr,), cur) = relative_unpack('>i', data, cur)
                 (isr, cur) = relative_unpack('>%di' % numIsr, data, cur)
-                partitionMetadata[partition] = PartitionMetadata(topicName, partition, leader, replicas, isr)
+
+                partitionMetadata[partition] = \
+                    PartitionMetadata(topicName, partition, leader,
+                                      replicas, isr)
+
             topicMetadata[topicName] = partitionMetadata
+
         return (brokers, topicMetadata)
 
     @classmethod
-    def encode_offset_commit_request(cls, client_id, correlation_id, group, payloads):
+    def encode_offset_commit_request(cls, client_id, correlation_id,
+                                     group, payloads):
         """
         Encode some OffsetCommitRequest structs
@@ -334,15 +409,20 @@ class KafkaProtocol(object):
         payloads: list of OffsetCommitRequest
         """
         grouped_payloads = group_by_topic_and_partition(payloads)
-        message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.OFFSET_COMMIT_KEY)
+
+        message = cls._encode_message_header(client_id, correlation_id,
+                                             KafkaProtocol.OFFSET_COMMIT_KEY)
+
         message += write_short_string(group)
         message += struct.pack('>i', len(grouped_payloads))
+
         for topic, topic_payloads in grouped_payloads.items():
             message += write_short_string(topic)
             message += struct.pack('>i', len(topic_payloads))
+
             for partition, payload in topic_payloads.items():
                 message += struct.pack('>iq', partition, payload.offset)
                 message += write_short_string(payload.metadata)
+
         return struct.pack('>i%ds' % len(message), len(message), message)
 
     @classmethod
@@ -357,15 +437,18 @@ class KafkaProtocol(object):
         ((correlation_id,), cur) = relative_unpack('>i', data, 0)
         (client_id, cur) = read_short_string(data, cur)
         ((num_topics,), cur) = relative_unpack('>i', data, cur)
+
         for i in xrange(num_topics):
             (topic, cur) = read_short_string(data, cur)
             ((num_partitions,), cur) = relative_unpack('>i', data, cur)
+
             for i in xrange(num_partitions):
                 ((partition, error), cur) = relative_unpack('>ih', data, cur)
                 yield OffsetCommitResponse(topic, partition, error)
 
     @classmethod
-    def encode_offset_fetch_request(cls, client_id, correlation_id, group, payloads):
+    def encode_offset_fetch_request(cls, client_id, correlation_id,
+                                    group, payloads):
         """
         Encode some OffsetFetchRequest structs
@@ -377,14 +460,19 @@ class KafkaProtocol(object):
         payloads: list of OffsetFetchRequest
         """
         grouped_payloads = group_by_topic_and_partition(payloads)
-        message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.OFFSET_FETCH_KEY)
+
+        message = cls._encode_message_header(client_id, correlation_id,
+                                             KafkaProtocol.OFFSET_FETCH_KEY)
+
         message += write_short_string(group)
         message += struct.pack('>i', len(grouped_payloads))
+
         for topic, topic_payloads in grouped_payloads.items():
             message += write_short_string(topic)
             message += struct.pack('>i', len(topic_payloads))
+
             for partition, payload in topic_payloads.items():
                 message += struct.pack('>i', partition)
+
         return struct.pack('>i%ds' % len(message), len(message), message)
 
     @classmethod
@@ -400,14 +488,19 @@ class KafkaProtocol(object):
         ((correlation_id,), cur) = relative_unpack('>i', data, 0)
         (client_id, cur) = read_short_string(data, cur)
         ((num_topics,), cur) = relative_unpack('>i', data, cur)
+
         for i in range(num_topics):
             (topic, cur) = read_short_string(data, cur)
             ((num_partitions,), cur) = relative_unpack('>i', data, cur)
+
             for i in range(num_partitions):
                 ((partition, offset), cur) = relative_unpack('>iq', data, cur)
                 (metadata, cur) = read_short_string(data, cur)
                 ((error,), cur) = relative_unpack('>h', data, cur)
-                yield OffsetFetchResponse(topic, partition, offset, metadata, error)
+
+                yield OffsetFetchResponse(topic, partition, offset,
+                                          metadata, error)
 
+
 def create_message(payload, key=None):
     """
@@ -420,6 +513,7 @@ def create_message(payload, key=None):
     """
     return Message(0, 0, key, payload)
 
+
 def create_gzip_message(payloads, key=None):
     """
     Construct a Gzipped Message containing multiple Messages
@@ -434,8 +528,12 @@ def create_gzip_message(payloads, key=None):
     """
     message_set = KafkaProtocol._encode_message_set(
         [create_message(payload) for payload in payloads])
+
     gzipped = gzip_encode(message_set)
-    return Message(0, 0x00 | (KafkaProtocol.ATTRIBUTE_CODEC_MASK & KafkaProtocol.CODEC_GZIP), key, gzipped)
+    codec = KafkaProtocol.ATTRIBUTE_CODEC_MASK & KafkaProtocol.CODEC_GZIP
+
+    return Message(0, 0x00 | codec, key, gzipped)
 
+
 def create_snappy_message(payloads, key=None):
     """
@@ -451,5 +549,8 @@ def create_snappy_message(payloads, key=None):
     """
     message_set = KafkaProtocol._encode_message_set(
         [create_message(payload) for payload in payloads])
+
     snapped = snappy_encode(message_set)
-    return Message(0, 0x00 | (KafkaProtocol.ATTRIBUTE_CODEC_MASK & KafkaProtocol.CODEC_SNAPPY), key, snapped)
+    codec = KafkaProtocol.ATTRIBUTE_CODEC_MASK & KafkaProtocol.CODEC_SNAPPY
+
+    return Message(0, 0x00 | codec, key, snapped)
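
Editorial note: the refactor above pulls the codec bits into a local codec variable; the attribute byte of a compressed wrapper Message carries the codec id in the bits selected by ATTRIBUTE_CODEC_MASK, which is what _decode_message dispatches on. A hedged sketch (module path assumed; constant values as defined elsewhere in this file):

    from kafka.protocol import KafkaProtocol, create_gzip_message

    wrapper = create_gzip_message(["payload one", "payload two"])
    codec = wrapper.attributes & KafkaProtocol.ATTRIBUTE_CODEC_MASK
    assert codec == KafkaProtocol.CODEC_GZIP   # the decode side picks gzip_decode from this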

View File

@@ -10,8 +10,10 @@ log = logging.getLogger("kafka")
 raise NotImplementedError("Still need to refactor this class")
 
+
 class KafkaConsumerProcess(Process):
-    def __init__(self, client, topic, partition, out_queue, barrier, consumer_fetch_size=1024, consumer_sleep=200):
+    def __init__(self, client, topic, partition, out_queue, barrier,
+                 consumer_fetch_size=1024, consumer_sleep=200):
         self.client = copy(client)
         self.topic = topic
         self.partition = partition
@@ -23,29 +25,40 @@ class KafkaConsumerProcess(Process):
         Process.__init__(self)
 
     def __str__(self):
-        return "[KafkaConsumerProcess: topic=%s, partition=%s, sleep=%s]" % (
-            self.topic, self.partition, self.consumer_sleep)
+        return "[KafkaConsumerProcess: topic=%s, partition=%s, sleep=%s]" % \
+            (self.topic, self.partition, self.consumer_sleep)
 
     def run(self):
         self.barrier.wait()
         log.info("Starting %s" % self)
-        fetchRequest = FetchRequest(self.topic, self.partition, offset=0, size=self.consumer_fetch_size)
+
+        fetchRequest = FetchRequest(self.topic, self.partition,
+                                    offset=0, size=self.consumer_fetch_size)
+
         while True:
-            if self.barrier.is_set() == False:
+            if self.barrier.is_set() is False:
                 log.info("Shutdown %s" % self)
                 self.client.close()
                 break
+
             lastOffset = fetchRequest.offset
             (messages, fetchRequest) = self.client.get_message_set(fetchRequest)
+
             if fetchRequest.offset == lastOffset:
-                log.debug("No more data for this partition, sleeping a bit (200ms)")
+                log.debug("No more data for this partition, "
+                          "sleeping a bit (200ms)")
                 time.sleep(self.consumer_sleep)
                 continue
+
             for message in messages:
                 self.out_queue.put(message)
 
+
 class KafkaProducerProcess(Process):
-    def __init__(self, client, topic, in_queue, barrier, producer_flush_buffer=500, producer_flush_timeout=2000, producer_timeout=100):
+    def __init__(self, client, topic, in_queue, barrier,
+                 producer_flush_buffer=500,
+                 producer_flush_timeout=2000,
+                 producer_timeout=100):
         self.client = copy(client)
         self.topic = topic
         self.in_queue = in_queue
@@ -57,8 +70,10 @@ class KafkaProducerProcess(Process):
         Process.__init__(self)
 
     def __str__(self):
-        return "[KafkaProducerProcess: topic=%s, flush_buffer=%s, flush_timeout=%s, timeout=%s]" % (
-            self.topic, self.producer_flush_buffer, self.producer_flush_timeout, self.producer_timeout)
+        return "[KafkaProducerProcess: topic=%s, flush_buffer=%s, \
+                flush_timeout=%s, timeout=%s]" % (
+            self.topic, self.producer_flush_buffer,
+            self.producer_flush_timeout, self.producer_timeout)
 
     def run(self):
         self.barrier.wait()
@@ -67,36 +82,47 @@ class KafkaProducerProcess(Process):
         last_produce = time.time()
 
         def flush(messages):
-            self.client.send_message_set(ProduceRequest(self.topic, -1, messages))
+            self.client.send_message_set(ProduceRequest(self.topic, -1,
+                                                        messages))
             del messages[:]
 
         while True:
-            if self.barrier.is_set() == False:
+            if self.barrier.is_set() is False:
                 log.info("Shutdown %s, flushing messages" % self)
                 flush(messages)
                 self.client.close()
                 break
+
             if len(messages) > self.producer_flush_buffer:
-                log.debug("Message count threashold reached. Flushing messages")
+                log.debug("Message count threshold reached. Flushing messages")
                 flush(messages)
                 last_produce = time.time()
+
             elif (time.time() - last_produce) > self.producer_flush_timeout:
                 log.debug("Producer timeout reached. Flushing messages")
                 flush(messages)
                 last_produce = time.time()
+
             try:
-                messages.append(KafkaClient.create_message(self.in_queue.get(True, self.producer_timeout)))
+                msg = KafkaClient.create_message(self.in_queue.get(True,
+                                                 self.producer_timeout))
+                messages.append(msg)
             except Empty:
                 continue
 
+
 class KafkaQueue(object):
-    def __init__(self, client, topic, partitions, producer_config={}, consumer_config={}):
+    def __init__(self, client, topic, partitions,
+                 producer_config=None, consumer_config=None):
         """
-        KafkaQueue a Queue-like object backed by a Kafka producer and some number of consumers
-        Messages are eagerly loaded by the consumer in batches of size consumer_fetch_size.
-        Messages are buffered in the producer thread until producer_flush_timeout or
-        producer_flush_buffer is reached.
+        KafkaQueue a Queue-like object backed by a Kafka producer and some
+        number of consumers
+        Messages are eagerly loaded by the consumer in batches of size
+        consumer_fetch_size.
+        Messages are buffered in the producer thread until
+        producer_flush_timeout or producer_flush_buffer is reached.
 
         Params
         ======
@@ -108,21 +134,26 @@ class KafkaQueue(object):
         Consumer Config
         ===============
-        consumer_fetch_size: int, number of bytes to fetch in one call to Kafka. Default
-            is 1024
-        consumer_sleep: int, time in milliseconds a consumer should sleep when it reaches
-            the end of a partition. Default is 200
+        consumer_fetch_size: int, number of bytes to fetch in one call
+                             to Kafka. Default is 1024
+        consumer_sleep: int, time in milliseconds a consumer should sleep
+                        when it reaches the end of a partition. Default is 200
 
         Producer Config
         ===============
-        producer_timeout: int, time in milliseconds a producer should wait for messages to
-            enqueue for producing. Default is 100
-        producer_flush_timeout: int, time in milliseconds a producer should allow messages
-            to accumulate before sending to Kafka. Default is 2000
-        producer_flush_buffer: int, number of messages a producer should allow to accumulate.
-            Default is 500
+        producer_timeout: int, time in milliseconds a producer should
+                          wait for messages to enqueue for producing.
+                          Default is 100
+        producer_flush_timeout: int, time in milliseconds a producer
+                                should allow messages to accumulate before
+                                sending to Kafka. Default is 2000
+        producer_flush_buffer: int, number of messages a producer should
+                               allow to accumulate. Default is 500
         """
+        producer_config = {} if producer_config is None else producer_config
+        consumer_config = {} if consumer_config is None else consumer_config
+
         self.in_queue = Queue()
         self.out_queue = Queue()
         self.consumers = []
@@ -130,12 +161,15 @@ class KafkaQueue(object):
         # Initialize and start consumer threads
         for partition in partitions:
-            consumer = KafkaConsumerProcess(client, topic, partition, self.in_queue, self.barrier, **consumer_config)
+            consumer = KafkaConsumerProcess(client, topic, partition,
+                                            self.in_queue, self.barrier,
+                                            **consumer_config)
             consumer.start()
             self.consumers.append(consumer)
 
         # Initialize and start producer thread
-        self.producer = KafkaProducerProcess(client, topic, self.out_queue, self.barrier, **producer_config)
+        self.producer = KafkaProducerProcess(client, topic, self.out_queue,
+                                             self.barrier, **producer_config)
         self.producer.start()
 
         # Trigger everything to start
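
Editorial note: the producer and consumer settings documented in the __init__ docstring are passed through as plain keyword dicts to the two process classes. A hedged sketch of wiring the queue up (the client value, partition list, and the Queue-like put/get methods are assumptions based on the docstring; note the module currently raises NotImplementedError at import time, so this is illustrative only):

    q = KafkaQueue(client, "my-topic", partitions=[0, 1],
                   producer_config={"producer_flush_buffer": 100},
                   consumer_config={"consumer_sleep": 500})
    q.put("hello")    # buffered by the producer process until a flush triggers
    msg = q.get()     # served from the queue fed by the consumer processes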

View File

@@ -3,64 +3,81 @@ from itertools import groupby
 import struct
 from threading import Timer
 
+
 def write_int_string(s):
     if s is None:
         return struct.pack('>i', -1)
     else:
         return struct.pack('>i%ds' % len(s), len(s), s)
 
+
 def write_short_string(s):
     if s is None:
         return struct.pack('>h', -1)
     else:
         return struct.pack('>h%ds' % len(s), len(s), s)
 
+
 def read_short_string(data, cur):
     if len(data) < cur + 2:
         raise BufferUnderflowError("Not enough data left")
+
     (strLen,) = struct.unpack('>h', data[cur:cur + 2])
     if strLen == -1:
         return (None, cur + 2)
+
     cur += 2
     if len(data) < cur + strLen:
         raise BufferUnderflowError("Not enough data left")
+
     out = data[cur:cur + strLen]
     return (out, cur + strLen)
 
+
 def read_int_string(data, cur):
     if len(data) < cur + 4:
         raise BufferUnderflowError("Not enough data left")
+
     (strLen,) = struct.unpack('>i', data[cur:cur + 4])
     if strLen == -1:
         return (None, cur + 4)
+
     cur += 4
     if len(data) < cur + strLen:
         raise BufferUnderflowError("Not enough data left")
+
     out = data[cur:cur + strLen]
     return (out, cur + strLen)
 
+
 def relative_unpack(fmt, data, cur):
     size = struct.calcsize(fmt)
     if len(data) < cur + size:
         raise BufferUnderflowError("Not enough data left")
+
     out = struct.unpack(fmt, data[cur:cur + size])
     return (out, cur + size)
 
+
 def group_by_topic_and_partition(tuples):
     out = defaultdict(dict)
     for t in tuples:
         out[t.topic][t.partition] = t
     return out
 
+
 class BufferUnderflowError(Exception):
     pass
 
+
 class ChecksumError(Exception):
     pass
 
+
 class ReentrantTimer(object):
     """
-    A timer that can be restarted, unlike threading.Timer (although this uses threading.Timer)
+    A timer that can be restarted, unlike threading.Timer
+    (although this uses threading.Timer)
 
     t: timer interval in milliseconds
     fn: a callable to invoke
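
Editorial note: group_by_topic_and_partition, shown in this file's diff, is the helper that lets every encode_* method in the protocol module iterate payloads as nested topic -> partition dicts. A small sketch (module paths assumed; field values illustrative):

    from kafka.common import FetchRequest
    from kafka.util import group_by_topic_and_partition

    reqs = [FetchRequest("t1", 0, 0, 4096),
            FetchRequest("t1", 1, 0, 4096),
            FetchRequest("t2", 0, 0, 4096)]

    grouped = group_by_topic_and_partition(reqs)
    assert sorted(grouped.keys()) == ["t1", "t2"]
    assert sorted(grouped["t1"].keys()) == [0, 1]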