Removing __main__ stuff from client.py
This commit is contained in:
@@ -806,69 +806,3 @@ class KafkaClient(object):
|
|||||||
else:
|
else:
|
||||||
out.append(offset_fetch_response)
|
out.append(offset_fetch_response)
|
||||||
return out
|
return out
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
logging.basicConfig(level=logging.DEBUG)
|
|
||||||
|
|
||||||
topic = "foo8"
|
|
||||||
# Bootstrap connection
|
|
||||||
conn = KafkaClient("localhost", 49720)
|
|
||||||
|
|
||||||
# Create some Messages
|
|
||||||
messages = (KafkaProtocol.create_gzip_message(["GZIPPed"]),
|
|
||||||
KafkaProtocol.create_message("not-gzipped"))
|
|
||||||
|
|
||||||
produce1 = ProduceRequest(topic=topic, partition=0, messages=messages)
|
|
||||||
produce2 = ProduceRequest(topic=topic, partition=1, messages=messages)
|
|
||||||
|
|
||||||
# Send the ProduceRequest
|
|
||||||
produce_resp = conn.send_produce_request(payloads=[produce1, produce2])
|
|
||||||
|
|
||||||
# Check for errors
|
|
||||||
for resp in produce_resp:
|
|
||||||
if resp.error != 0:
|
|
||||||
raise Exception("ProduceRequest failed with errorcode=%d", resp.error)
|
|
||||||
print resp
|
|
||||||
|
|
||||||
# Offset commit/fetch
|
|
||||||
#conn.send_offset_commit_request(group="group", payloads=[OffsetCommitRequest("topic-1", 0, 42, "METADATA?")])
|
|
||||||
#conn.send_offset_fetch_request(group="group", payloads=[OffsetFetchRequest("topic-1", 0)])
|
|
||||||
|
|
||||||
def init_offsets(offset_response):
|
|
||||||
if offset_response.error not in (ErrorMapping.NO_ERROR, ErrorMapping.UNKNOWN_TOPIC_OR_PARTITON):
|
|
||||||
raise Exception("OffsetFetch failed: %s" % (offset_response))
|
|
||||||
elif offset_response.error == ErrorMapping.UNKNOWN_TOPIC_OR_PARTITON:
|
|
||||||
return 0
|
|
||||||
else:
|
|
||||||
return offset_response.offset
|
|
||||||
# Load offsets
|
|
||||||
(offset1, offset2) = conn.send_offset_fetch_request(
|
|
||||||
group="group1",
|
|
||||||
payloads=[OffsetFetchRequest(topic, 0),OffsetFetchRequest(topic, 1)],
|
|
||||||
fail_on_error=False,
|
|
||||||
callback=init_offsets
|
|
||||||
)
|
|
||||||
print offset1, offset2
|
|
||||||
|
|
||||||
while True:
|
|
||||||
for resp in conn.send_fetch_request(payloads=[FetchRequest(topic=topic, partition=0, offset=offset1, max_bytes=4096)]):
|
|
||||||
i = 0
|
|
||||||
for msg in resp.messages:
|
|
||||||
print msg
|
|
||||||
offset1 = msg.offset+1
|
|
||||||
print offset1, conn.send_offset_commit_request(group="group1", payloads=[OffsetCommitRequest(topic, 0, offset1, "")])
|
|
||||||
i += 1
|
|
||||||
if i == 0:
|
|
||||||
raise StopIteration("no more messages")
|
|
||||||
|
|
||||||
for resp in conn.send_fetch_request(payloads=[FetchRequest(topic=topic, partition=1, offset=offset2, max_bytes=4096)]):
|
|
||||||
i = 0
|
|
||||||
for msg in resp.messages:
|
|
||||||
print msg
|
|
||||||
offset2 = msg.offset+1
|
|
||||||
print offset2, conn.send_offset_commit_request(group="group1", payloads=[OffsetCommitRequest(topic, 1, offset2, "")])
|
|
||||||
i += 1
|
|
||||||
if i == 0:
|
|
||||||
raise StopIteration("no more messages")
|
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user