Some work on a simple consumer
This commit is contained in:
@@ -685,7 +685,7 @@ class KafkaClient(object):
|
|||||||
for produce_response in KafkaProtocol.decode_produce_response(response):
|
for produce_response in KafkaProtocol.decode_produce_response(response):
|
||||||
# Check for errors
|
# Check for errors
|
||||||
if fail_on_error == True and produce_response.error != ErrorMapping.NO_ERROR:
|
if fail_on_error == True and produce_response.error != ErrorMapping.NO_ERROR:
|
||||||
raise Exception("ProduceRequest for %s failed with errorcode=%d",
|
raise Exception("ProduceRequest for %s failed with errorcode=%d" %
|
||||||
(TopicAndPartition(produce_response.topic, produce_response.partition), produce_response.error))
|
(TopicAndPartition(produce_response.topic, produce_response.partition), produce_response.error))
|
||||||
# Run the callback
|
# Run the callback
|
||||||
if callback is not None:
|
if callback is not None:
|
||||||
@@ -825,6 +825,9 @@ class SimpleProducer(object):
|
|||||||
resp = self.client.send_produce_request([req]).next()
|
resp = self.client.send_produce_request([req]).next()
|
||||||
|
|
||||||
class SimpleConsumer(object):
|
class SimpleConsumer(object):
|
||||||
|
"""
|
||||||
|
A simple consumer implementation that consumes all partitions for a topic
|
||||||
|
"""
|
||||||
def __init__(self, client, group, topic):
|
def __init__(self, client, group, topic):
|
||||||
self.client = client
|
self.client = client
|
||||||
self.topic = topic
|
self.topic = topic
|
||||||
@@ -832,7 +835,7 @@ class SimpleConsumer(object):
|
|||||||
self.client.load_metadata_for_topics(topic)
|
self.client.load_metadata_for_topics(topic)
|
||||||
self.offsets = {}
|
self.offsets = {}
|
||||||
|
|
||||||
def get_or_init_offset(resp):
|
def get_or_init_offset_callback(resp):
|
||||||
if resp.error == ErrorMapping.NO_ERROR:
|
if resp.error == ErrorMapping.NO_ERROR:
|
||||||
return resp.offset
|
return resp.offset
|
||||||
elif resp.error == ErrorMapping.UNKNOWN_TOPIC_OR_PARTITON:
|
elif resp.error == ErrorMapping.UNKNOWN_TOPIC_OR_PARTITON:
|
||||||
@@ -843,8 +846,33 @@ class SimpleConsumer(object):
|
|||||||
|
|
||||||
for partition in self.client.topic_partitions[topic]:
|
for partition in self.client.topic_partitions[topic]:
|
||||||
req = OffsetFetchRequest(topic, partition)
|
req = OffsetFetchRequest(topic, partition)
|
||||||
(offset,) = self.client.send_offset_fetch_request(group, [req], callback=get_or_init_offset, fail_on_error=False)
|
(offset,) = self.client.send_offset_fetch_request(group, [req],
|
||||||
|
callback=get_or_init_offset_callback, fail_on_error=False)
|
||||||
self.offsets[partition] = offset
|
self.offsets[partition] = offset
|
||||||
|
|
||||||
print self.offsets
|
def __iter__(self):
|
||||||
|
iters = {}
|
||||||
|
for partition, offset in self.offsets.items():
|
||||||
|
iters[partition] = self.__iter_partition__(partition, offset)
|
||||||
|
|
||||||
|
while True:
|
||||||
|
for it in iters.values():
|
||||||
|
yield it.next()
|
||||||
|
|
||||||
|
def __iter_partition__(self, partition, offset):
|
||||||
|
while True:
|
||||||
|
req = FetchRequest(self.topic, partition, offset, 1024)
|
||||||
|
(resp,) = self.client.send_fetch_request([req])
|
||||||
|
assert resp.topic == self.topic
|
||||||
|
assert resp.partition == partition
|
||||||
|
next_offset = None
|
||||||
|
for message in resp.messages:
|
||||||
|
next_offset = message.offset
|
||||||
|
yield message
|
||||||
|
if next_offset is None:
|
||||||
|
raise StopIteration("No more messages")
|
||||||
|
else:
|
||||||
|
offset = next_offset + 1
|
||||||
|
# Commit offset here?
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -35,8 +35,10 @@ def build_kafka_classpath():
|
|||||||
return cp
|
return cp
|
||||||
|
|
||||||
class KafkaFixture(Thread):
|
class KafkaFixture(Thread):
|
||||||
def __init__(self, host, port):
|
def __init__(self, host, port, broker_id, zk_chroot=None):
|
||||||
Thread.__init__(self)
|
Thread.__init__(self)
|
||||||
|
self.broker_id = broker_id
|
||||||
|
self.zk_chroot = zk_chroot
|
||||||
self.port = port
|
self.port = port
|
||||||
self.capture = ""
|
self.capture = ""
|
||||||
self.shouldDie = Event()
|
self.shouldDie = Event()
|
||||||
@@ -50,19 +52,24 @@ class KafkaFixture(Thread):
|
|||||||
stdout = open(os.path.join(logDir, 'stdout'), 'w')
|
stdout = open(os.path.join(logDir, 'stdout'), 'w')
|
||||||
|
|
||||||
# Create the config file
|
# Create the config file
|
||||||
zkChroot = "kafka-python_%s" % self.tmpDir.replace("/", "_")
|
if self.zk_chroot is None:
|
||||||
|
self.zk_chroot= "kafka-python_%s" % self.tmpDir.replace("/", "_")
|
||||||
logConfig = "test/resources/log4j.properties"
|
logConfig = "test/resources/log4j.properties"
|
||||||
configFile = os.path.join(self.tmpDir, 'server.properties')
|
configFile = os.path.join(self.tmpDir, 'server.properties')
|
||||||
f = open('test/resources/server.properties', 'r')
|
f = open('test/resources/server.properties', 'r')
|
||||||
props = f.read()
|
props = f.read()
|
||||||
f = open(configFile, 'w')
|
f = open(configFile, 'w')
|
||||||
f.write(props % {'kafka.port': self.port, 'kafka.tmp.dir': logDir, 'kafka.partitions': 2, 'zk.chroot': zkChroot})
|
f.write(props % {'broker.id': self.broker_id,
|
||||||
|
'kafka.port': self.port,
|
||||||
|
'kafka.tmp.dir': logDir,
|
||||||
|
'kafka.partitions': 2,
|
||||||
|
'zk.chroot': self.zk_chroot})
|
||||||
f.close()
|
f.close()
|
||||||
|
|
||||||
cp = build_kafka_classpath()
|
cp = build_kafka_classpath()
|
||||||
|
|
||||||
# Create the Zookeeper chroot
|
# Create the Zookeeper chroot
|
||||||
args = shlex.split("java -cp %s org.apache.zookeeper.ZooKeeperMain create /%s kafka-python" % (cp, zkChroot))
|
args = shlex.split("java -cp %s org.apache.zookeeper.ZooKeeperMain create /%s kafka-python" % (cp, self.zk_chroot))
|
||||||
proc = subprocess.Popen(args)
|
proc = subprocess.Popen(args)
|
||||||
ret = proc.wait()
|
ret = proc.wait()
|
||||||
assert ret == 0
|
assert ret == 0
|
||||||
@@ -123,7 +130,7 @@ class TestKafkaClient(unittest.TestCase):
|
|||||||
cls.client = KafkaClient(host, port)
|
cls.client = KafkaClient(host, port)
|
||||||
else:
|
else:
|
||||||
port = get_open_port()
|
port = get_open_port()
|
||||||
cls.server = KafkaFixture("localhost", port)
|
cls.server = KafkaFixture("localhost", port, 0)
|
||||||
cls.server.start()
|
cls.server.start()
|
||||||
cls.server.wait_for("Kafka server started")
|
cls.server.wait_for("Kafka server started")
|
||||||
cls.client = KafkaClient("localhost", port)
|
cls.client = KafkaClient("localhost", port)
|
||||||
@@ -367,10 +374,55 @@ class TestKafkaClient(unittest.TestCase):
|
|||||||
self.assertEquals(len(messages), 1)
|
self.assertEquals(len(messages), 1)
|
||||||
self.assertEquals(messages[0].message.value, "two")
|
self.assertEquals(messages[0].message.value, "two")
|
||||||
|
|
||||||
# Consumer Tests
|
class TestConsumer(unittest.TestCase):
|
||||||
|
@classmethod
|
||||||
|
def setUpClass(cls):
|
||||||
|
# Broker 0
|
||||||
|
port = get_open_port()
|
||||||
|
cls.server1 = KafkaFixture("localhost", port, 0)
|
||||||
|
cls.server1.start()
|
||||||
|
cls.server1.wait_for("Kafka server started")
|
||||||
|
|
||||||
|
# Broker 1
|
||||||
|
zk = cls.server1.zk_chroot
|
||||||
|
port = get_open_port()
|
||||||
|
cls.server2 = KafkaFixture("localhost", port, 1, zk)
|
||||||
|
cls.server2.start()
|
||||||
|
cls.server2.wait_for("Kafka server started")
|
||||||
|
|
||||||
|
# Client bootstraps from broker 1
|
||||||
|
cls.client = KafkaClient("localhost", port)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def tearDownClass(cls):
|
||||||
|
cls.client.close()
|
||||||
|
cls.server1.close()
|
||||||
|
cls.server2.close()
|
||||||
|
|
||||||
def test_consumer(self):
|
def test_consumer(self):
|
||||||
|
produce1 = ProduceRequest("test_consumer", 0, messages=[
|
||||||
|
KafkaProtocol.create_message("Test message 0 %d" % i) for i in range(100)
|
||||||
|
])
|
||||||
|
|
||||||
|
produce2 = ProduceRequest("test_consumer", 1, messages=[
|
||||||
|
KafkaProtocol.create_message("Test message 1 %d" % i) for i in range(100)
|
||||||
|
])
|
||||||
|
|
||||||
|
for resp in self.client.send_produce_request([produce1]):
|
||||||
|
self.assertEquals(resp.error, 0)
|
||||||
|
self.assertEquals(resp.offset, 0)
|
||||||
|
|
||||||
|
for resp in self.client.send_produce_request([produce2]):
|
||||||
|
self.assertEquals(resp.error, 0)
|
||||||
|
self.assertEquals(resp.offset, 0)
|
||||||
|
|
||||||
consumer = SimpleConsumer(self.client, "group1", "test_consumer")
|
consumer = SimpleConsumer(self.client, "group1", "test_consumer")
|
||||||
|
all_messages = []
|
||||||
|
for message in consumer:
|
||||||
|
all_messages.append(message)
|
||||||
|
|
||||||
|
self.assertEquals(len(all_messages), 200)
|
||||||
|
self.assertEquals(len(all_messages), len(set(all_messages))) # make sure there are no dupes
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
logging.basicConfig(level=logging.INFO)
|
logging.basicConfig(level=logging.INFO)
|
||||||
|
|||||||
@@ -17,7 +17,7 @@
|
|||||||
############################# Server Basics #############################
|
############################# Server Basics #############################
|
||||||
|
|
||||||
# The id of the broker. This must be set to a unique integer for each broker.
|
# The id of the broker. This must be set to a unique integer for each broker.
|
||||||
broker.id=0
|
broker.id=%(broker.id)d
|
||||||
|
|
||||||
############################# Socket Server Settings #############################
|
############################# Socket Server Settings #############################
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user