Fix a bunch of bugs

David Arthur
2013-02-20 22:05:21 -05:00
parent 3f0d91883d
commit 5eb79c04a1
2 changed files with 138 additions and 65 deletions


@@ -14,6 +14,7 @@ from .codec import snappy_encode, snappy_decode
 from .util import read_short_string, read_int_string
 from .util import relative_unpack
 from .util import write_short_string, write_int_string
+from .util import group_list_by_key
 from .util import BufferUnderflowError, ChecksumError
 log = logging.getLogger("kafka")
@@ -217,7 +218,7 @@ class KafkaProtocol(object):
     @classmethod
-    def encode_produce_request(self, client_id, correlation_id, payloads=[], acks=1, timeout=1000):
+    def encode_produce_request(cls, client_id, correlation_id, payloads=[], acks=1, timeout=1000):
         """
         Encode some ProduceRequest structs
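
The bug in this hunk: the method is decorated with @classmethod but named its first argument self, while the body (see the next hunk) calls cls._encode_message_header(...), so invoking it raised a NameError. A minimal sketch of the failure mode, using a hypothetical Encoder class:

    # Minimal sketch of the bug fixed above (hypothetical Encoder class).
    class Encoder(object):
        @classmethod
        def _header(cls):
            return ">header<"

        @classmethod
        def broken(self):          # first argument receives the class, but is bound to the name 'self'
            return cls._header()   # NameError: 'cls' is not defined

        @classmethod
        def fixed(cls):
            return cls._header()

    print(Encoder.fixed())         # ">header<"
    # Encoder.broken()             # raises NameError
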
@@ -236,8 +237,7 @@ class KafkaProtocol(object):
         payloads_by_topic = group_list_by_key(payloads, key=attrgetter("topic"))
         message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.PRODUCE_KEY)
         message += struct.pack('>hii', acks, timeout, len(payloads_by_topic))
-        for topic, payload in payloads_by_topic:
-            payloads = list(payloads)
+        for topic, payloads in payloads_by_topic.items():
             message += struct.pack('>h%dsi' % len(topic), len(topic), topic, len(payloads))
             for payload in payloads:
                 message_set = KafkaProtocol._encode_message_set(payload.messages)
@@ -280,8 +280,7 @@ class KafkaProtocol(object):
         payloads_by_topic = group_list_by_key(payloads, key=attrgetter("topic"))
         message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.FETCH_KEY)
         message += struct.pack('>iiii', -1, max_wait_time, min_bytes, len(payloads_by_topic)) # -1 is the replica id
-        for topic, payload in payloads_by_topic:
-            payloads = list(payloads)
+        for topic, payloads in payloads_by_topic.items():
             message += write_short_string(topic)
             message += struct.pack('>i', len(payloads))
             for payload in payloads:
@@ -312,8 +311,7 @@ class KafkaProtocol(object):
         payloads_by_topic = group_list_by_key(payloads, key=attrgetter("topic"))
         message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.OFFSET_KEY)
         message += struct.pack('>ii', -1, len(payloads_by_topic)) # -1 is the replica id
-        for topic, payload in payloads_by_topic:
-            payloads = list(payloads)
+        for topic, payloads in payloads_by_topic.items():
             message += write_short_string(topic)
             message += struct.pack('>i', len(payloads))
             for payload in payloads:
@@ -406,8 +404,7 @@ class KafkaProtocol(object):
         message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.OFFSET_COMMIT_KEY)
         message += write_short_string(group)
         message += struct.pack('>i', len(payloads_by_topic))
-        for topic, payload in payloads_by_topic:
-            payloads = list(payloads)
+        for topic, payloads in payloads_by_topic.items():
             message += write_short_string(topic)
             message += struct.pack('>i', len(payloads))
             for payload in payloads:
@@ -450,8 +447,7 @@ class KafkaProtocol(object):
         message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.OFFSET_FETCH_KEY)
         message += write_short_string(group)
         message += struct.pack('>i', len(payloads_by_topic))
-        for topic, payload in payloads_by_topic:
-            payloads = list(payloads)
+        for topic, payloads in payloads_by_topic.items():
             message += write_short_string(topic)
             message += struct.pack('>i', len(payloads))
             for payload in payloads:
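
This hunk and the three before it are all the same fix. group_list_by_key previously returned raw itertools.groupby output: a list of (key, grouper) pairs whose lazy groupers share one underlying iterator and are already exhausted by the time the encoder loops over them (the old loops also unpacked into a misnamed payload variable). It now returns a dict of materialized lists (see the util.py hunk at the bottom). A sketch of the difference, with made-up (topic, partition) tuples standing in for the real request structs:

    # Why the encoder loops changed (made-up tuples instead of the real structs).
    from itertools import groupby
    from operator import itemgetter

    reqs = [("topicA", 0), ("topicB", 0), ("topicA", 1)]
    key = itemgetter(0)

    # Old shape: list of (key, grouper) pairs. Materializing the pairs exhausts
    # the shared iterator, so every grouper comes back empty.
    old = list(groupby(sorted(reqs, key=key), key=key))
    for topic, group in old:
        print(topic, list(group))      # topicA [] / topicB []

    # New shape: a plain dict of lists, safe to iterate (and len()) repeatedly.
    new = {}
    for k, group in groupby(sorted(reqs, key=key), key=key):
        new[k] = list(group)
    for topic, payloads in new.items():
        print(topic, payloads)         # topicA [('topicA', 0), ('topicA', 1)] ...
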
@@ -582,13 +578,14 @@ class KafkaClient(object):
     def load_metadata_for_topics(self, *topics):
         """
-        Discover brokers and metadata for a set of topics
+        Discover brokers and metadata for a set of topics. This method will
+        recurse in the event of a retry.
         """
         requestId = self.next_id()
         request = KafkaProtocol.encode_metadata_request(KafkaClient.CLIENT_ID, requestId, topics)
-        conn = self.conns.values()[0] # Just get the first one in the list
-        conn.send(requestId, request)
-        response = conn.recv(requestId)
+        response = self.try_send_request(requestId, request)
+        if response is None:
+            raise Exception("All servers failed to process request")
         (brokers, topics) = KafkaProtocol.decode_metadata_response(response)
         log.debug("Broker metadata: %s", brokers)
         log.debug("Topic metadata: %s", topics)
@@ -600,7 +597,6 @@ class KafkaClient(object):
                 log.info("Partition is unassigned, delay for 1s and retry")
                 time.sleep(1)
                 self.load_metadata_for_topics(topic)
-                return
             else:
                 self.topics_to_brokers[TopicAndPartition(topic, partition)] = brokers[meta.leader]
@@ -608,18 +604,44 @@ class KafkaClient(object):
         key = TopicAndPartition(topic, partition)
         if key not in self.topics_to_brokers:
             self.load_metadata_for_topics(topic)
+        if key not in self.topics_to_brokers:
+            raise Exception("Partition does not exist: %s" % str(key))
         return self.topics_to_brokers[key]

     def send_produce_request(self, payloads=[], fail_on_error=True, callback=None):
         """
         Encode and send some ProduceRequests
+
+        ProduceRequests will be grouped by (topic, partition) and then sent to a specific
+        broker. Output is a list of responses in the same order as the list of payloads
+        specified
+
+        Params
+        ======
+        payloads: list of ProduceRequest
+        fail_on_error: boolean, should we raise an Exception if we encounter an API error?
+        callback: function, instead of returning the ProduceResponse, first pass it through this function
+
+        Return
+        ======
+        list of ProduceResponse or callback(ProduceResponse), in the order of input payloads
         """
+        key_fn = lambda x: (x.topic, x.partition)
+
+        # Note the order of the incoming payloads
+        original_keys = [key_fn(payload) for payload in payloads]
+
         # Group the produce requests by topic+partition
-        payloads_by_topic_and_partition = group_list_by_key(payloads, key=lambda x: (x.topic, x.partition))
+        payloads_by_topic_and_partition = group_list_by_key(payloads, key=key_fn)
+
         # Group the produce requests by which broker they go to
         payloads_by_broker = defaultdict(list)
-        for (topic, partition), payload in payloads_by_topic_and_partition:
-            payloads_by_broker[self.get_leader_for_partition(topic, partition)] += list(payload)
+        for (topic, partition), payloads in payloads_by_topic_and_partition.items():
+            payloads_by_broker[self.get_leader_for_partition(topic, partition)] += payloads
+
+        # Accumulate the responses in a dictionary, keyed by key_fn
+        acc = {}
-        out = []
+
         # For each broker, send the list of request payloads
         for broker, payloads in payloads_by_broker.items():
             conn = self.get_conn_for_broker(broker)
@@ -635,10 +657,13 @@ class KafkaClient(object):
                         (TopicAndPartition(produce_response.topic, produce_response.partition), produce_response.error))
                 # Run the callback
                 if callback is not None:
-                    out.append(callback(produce_response))
+                    acc[key_fn(produce_response)] = callback(produce_response)
                 else:
-                    out.append(produce_response)
-        return out
+                    acc[key_fn(produce_response)] = produce_response
+        print(acc)
+
+        # Order the accumulated responses by the original key order
+        return (acc[k] for k in original_keys)

     def send_fetch_request(self, payloads=[], fail_on_error=True, callback=None):
         """
@@ -647,15 +672,22 @@ class KafkaClient(object):
         Payloads are grouped by topic and partition so they can be pipelined to the same
         brokers.
         """
+        key_fn = lambda x: (x.topic, x.partition)
+
+        # Note the order of the incoming payloads
+        original_keys = [key_fn(payload) for payload in payloads]
+
         # Group the produce requests by topic+partition
-        payloads_by_topic_and_partition = group_list_by_key(payloads, key=lambda x: (x.topic, x.partition))
+        payloads_by_topic_and_partition = group_list_by_key(payloads, key=key_fn)
+
         # Group the produce requests by which broker they go to
         payloads_by_broker = defaultdict(list)
-        for (topic, partition), payload in payloads_by_topic_and_partition:
-            payloads_by_broker[self.get_leader_for_partition(topic, partition)] += list(payload)
+        for (topic, partition), payloads in payloads_by_topic_and_partition.items():
+            payloads_by_broker[self.get_leader_for_partition(topic, partition)] += payloads
+
+        # Accumulate the responses in a dictionary, keyed by key_fn
+        acc = {}
-        out = []
+
         # For each broker, send the list of request payloads
         for broker, payloads in payloads_by_broker.items():
             conn = self.get_conn_for_broker(broker)
@@ -667,21 +699,41 @@ class KafkaClient(object):
             for fetch_response in KafkaProtocol.decode_fetch_response_iter(response):
                 # Check for errors
                 if fail_on_error == True and fetch_response.error != 0:
-                    raise Exception("FetchRequest %s failed with errorcode=%d",
+                    raise Exception("FetchRequest %s failed with errorcode=%d" %
                         (TopicAndPartition(fetch_response.topic, fetch_response.partition), fetch_response.error))
                 # Run the callback
                 if callback is not None:
-                    out.append(callback(fetch_response))
+                    acc[key_fn(fetch_response)] = callback(fetch_response)
                 else:
-                    out.append(fetch_response)
-        return out
+                    acc[key_fn(fetch_response)] = fetch_response
+
+        # Order the accumulated responses by the original key order
+        return (acc[k] for k in original_keys)
+
+    def try_send_request(self, requestId, request):
+        """
+        Attempt to send a broker-agnostic request to one of the available brokers.
+        Keep trying until you succeed.
+        """
+        for conn in self.conns.values():
+            try:
+                conn.send(requestId, request)
+                response = conn.recv(requestId)
+                return response
+            except Exception:
+                log.warning("Could not commit offset to server %s, trying next server", conn)
+                continue
+        return None

     def send_offset_commit_request(self, group, payloads=[], fail_on_error=True, callback=None):
-        conn = self.conns.values()[0] # Just get the first one in the list
         requestId = self.next_id()
         request = KafkaProtocol.encode_offset_commit_request(KafkaClient.CLIENT_ID, requestId, group, payloads)
-        conn.send(requestId, request)
-        response = conn.recv(requestId)
+        response = self.try_send_request(requestId, request)
+        if response is None:
+            if fail_on_error is True:
+                raise Exception("All servers failed to process request")
+            else:
+                return None
         out = []
         for offset_commit_response in KafkaProtocol.decode_offset_commit_response(response):
             if fail_on_error == True and offset_commit_response.error != 0:
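
try_send_request is the new failover primitive behind the metadata, offset-commit, and offset-fetch paths: walk every known connection, return the first successful response, and fall back to None when every broker fails (the warning text mentions committing offsets, but the helper is generic). A sketch of that behavior with stub connections; StubConn and the free-standing function are illustrative only:

    # Failover behaviour of try_send_request, with stub connections (illustrative only).
    class StubConn(object):
        def __init__(self, name, healthy):
            self.name, self.healthy = name, healthy
        def send(self, request_id, request):
            if not self.healthy:
                raise IOError("connection refused")
        def recv(self, request_id):
            return "response via %s" % self.name

    def try_send_request(conns, request_id, request):
        for conn in conns:
            try:
                conn.send(request_id, request)
                return conn.recv(request_id)
            except Exception:
                continue               # try the next broker
        return None                    # callers decide whether this is fatal

    conns = [StubConn("broker1", False), StubConn("broker2", True)]
    print(try_send_request(conns, 1, b"payload"))   # response via broker2
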
@@ -693,15 +745,19 @@ class KafkaClient(object):
         return out

     def send_offset_fetch_request(self, group, payloads=[], fail_on_error=True, callback=None):
-        conn = self.conns.values()[0] # Just get the first one in the list
         requestId = self.next_id()
         request = KafkaProtocol.encode_offset_fetch_request(KafkaClient.CLIENT_ID, requestId, group, payloads)
-        conn.send(requestId, request)
-        response = conn.recv(requestId)
+        response = self.try_send_request(requestId, request)
+        if response is None:
+            if fail_on_error is True:
+                raise Exception("All servers failed to process request")
+            else:
+                return None
         out = []
         for offset_fetch_response in KafkaProtocol.decode_offset_fetch_response(response):
             if fail_on_error == True and offset_fetch_response.error != 0:
-                raise Exception("OffsetFetchRequest failed with errorcode=%s", offset_fetch_response.error)
+                raise Exception("OffsetFetchRequest for topic=%s, partition=%d failed with errorcode=%s" % (
+                    offset_fetch_response.topic, offset_fetch_response.partition, offset_fetch_response.error))
             if callback is not None:
                 out.append(callback(offset_fetch_response))
             else:
@@ -709,20 +765,22 @@ class KafkaClient(object):
         return out

 if __name__ == "__main__":
     logging.basicConfig(level=logging.DEBUG)
+    topic = "foo8"

     # Bootstrap connection
     conn = KafkaClient("localhost", 9092)

     # Create some Messages
-    messages = (KafkaProtocol.create_gzip_message("GZIPPed"),
+    messages = (KafkaProtocol.create_gzip_message(["GZIPPed"]),
                 KafkaProtocol.create_message("not-gzipped"))

     # Create a ProduceRequest
-    produce = ProduceRequest(topic="foo5", partition=0, messages=messages)
+    produce1 = ProduceRequest(topic=topic, partition=0, messages=messages)
+    produce2 = ProduceRequest(topic=topic, partition=1, messages=messages)

     # Send the ProduceRequest
-    produce_resp = conn.send_produce_request(payloads=[produce])
+    produce_resp = conn.send_produce_request(payloads=[produce1, produce2])

     # Check for errors
     for resp in produce_resp:
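
The create_gzip_message fix above implies the helper takes a list of payloads, batching them into a single compressed message set; the old call passed a bare string. Assuming that signature, a multi-payload batch would look like:

    # Hypothetical multi-payload batch, assuming create_gzip_message takes a list.
    messages = (KafkaProtocol.create_gzip_message(["m1", "m2", "m3"]),
                KafkaProtocol.create_message("plain"))
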
@@ -734,29 +792,41 @@ if __name__ == "__main__":
     #conn.send_offset_commit_request(group="group", payloads=[OffsetCommitRequest("topic-1", 0, 42, "METADATA?")])
     #conn.send_offset_fetch_request(group="group", payloads=[OffsetFetchRequest("topic-1", 0)])
-    print conn.send_offset_fetch_request(group="group", payloads=[OffsetFetchRequest("foo5", 0)])
-    offset = 0
-    done = False
-    while not done:
-        print offset
-        for resp in conn.send_fetch_request(payloads=[FetchRequest(topic="foo5", partition=0, offset=offset, max_bytes=4096)]):
+    def init_offsets(offset_response):
+        if offset_response.error not in (ErrorMapping.NO_ERROR, ErrorMapping.UNKNOWN_TOPIC_OR_PARTITON):
+            raise Exception("OffsetFetch failed: %s" % (offset_response))
+        elif offset_response.error == ErrorMapping.UNKNOWN_TOPIC_OR_PARTITON:
+            return 0
+        else:
+            return offset_response.offset
+
+    # Load offsets
+    (offset1, offset2) = conn.send_offset_fetch_request(
+        group="group1",
+        payloads=[OffsetFetchRequest(topic, 0),OffsetFetchRequest(topic, 1)],
+        fail_on_error=False,
+        callback=init_offsets
+    )
+    print offset1, offset2
+
+    while True:
+        for resp in conn.send_fetch_request(payloads=[FetchRequest(topic=topic, partition=0, offset=offset1, max_bytes=4096)]):
+            i = 0
             for msg in resp.messages:
-                print conn.send_offset_commit_request(group="group", payloads=[OffsetCommitRequest("foo5", 0, offset, "")])
-                print msg, offset
-                offset = msg.offset+1
+                print msg
+                offset1 = msg.offset+1
+                print offset1, conn.send_offset_commit_request(group="group1", payloads=[OffsetCommitRequest(topic, 0, offset1, "")])
+                i += 1
+            if i == 0:
+                raise StopIteration("no more messages")

-class Consumer(object):
-    def __init__(self, conn):
-        self._conn = conn
-
-class Producer(object):
-    pass
+        for resp in conn.send_fetch_request(payloads=[FetchRequest(topic=topic, partition=1, offset=offset2, max_bytes=4096)]):
+            i = 0
+            for msg in resp.messages:
+                print msg
+                offset2 = msg.offset+1
+                print offset2, conn.send_offset_commit_request(group="group1", payloads=[OffsetCommitRequest(topic, 1, offset2, "")])
+                i += 1
+            if i == 0:
+                raise StopIteration("no more messages")


@@ -44,9 +44,12 @@ def relative_unpack(fmt, data, cur):
     out = struct.unpack(fmt, data[cur:cur+size])
     return (out, cur+size)

-def group_list_by_key(l, key):
-    sorted_l = sorted(l, key=key)
-    return list(groupby(sorted_l, key=key))
+def group_list_by_key(it, key):
+    sorted_it = sorted(it, key=key)
+    out = {}
+    for k, group in groupby(sorted_it, key=key):
+        out[k] = list(group)
+    return out

 class BufferUnderflowError(Exception):
     pass
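
With the rewrite above, group_list_by_key returns a dict mapping each key to the list of items that share it, rather than a list of lazy groupby pairs. For example (hypothetical inputs):

    # Expected behaviour of the rewritten helper (hypothetical inputs).
    print(group_list_by_key(["apple", "avocado", "banana"], key=lambda s: s[0]))
    # {'a': ['apple', 'avocado'], 'b': ['banana']}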