Update example.py to compile, add friendly load_example.py
This commit is contained in:
48
example.py
Normal file → Executable file
48
example.py
Normal file → Executable file
@@ -1,23 +1,45 @@
|
||||
import logging
|
||||
#!/usr/bin/env python
|
||||
import threading, logging, time
|
||||
|
||||
from kafka.client import KafkaClient, FetchRequest, ProduceRequest
|
||||
from kafka.client import KafkaClient
|
||||
from kafka.consumer import SimpleConsumer
|
||||
from kafka.producer import SimpleProducer
|
||||
|
||||
def produce_example(client):
|
||||
producer = SimpleProducer(client, "my-topic")
|
||||
producer.send_messages("test")
|
||||
class Producer(threading.Thread):
|
||||
daemon = True
|
||||
|
||||
def consume_example(client):
|
||||
consumer = SimpleConsumer(client, "test-group", "my-topic")
|
||||
for message in consumer:
|
||||
print(message)
|
||||
def run(self):
|
||||
client = KafkaClient("localhost", 9092)
|
||||
producer = SimpleProducer(client)
|
||||
|
||||
while True:
|
||||
producer.send_messages('my-topic', "test")
|
||||
producer.send_messages('my-topic', "\xc2Hola, mundo!")
|
||||
|
||||
time.sleep(1)
|
||||
|
||||
|
||||
class Consumer(threading.Thread):
|
||||
daemon = True
|
||||
|
||||
def run(self):
|
||||
client = KafkaClient("localhost", 9092)
|
||||
consumer = SimpleConsumer(client, "test-group", "my-topic")
|
||||
|
||||
for message in consumer:
|
||||
print(message)
|
||||
|
||||
def main():
|
||||
client = KafkaClient("localhost", 9092)
|
||||
produce_example(client)
|
||||
consume_example(client)
|
||||
threads = [
|
||||
Producer(),
|
||||
Consumer()
|
||||
]
|
||||
|
||||
for t in threads:
|
||||
t.start()
|
||||
|
||||
time.sleep(5)
|
||||
|
||||
if __name__ == "__main__":
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
logging.basicConfig(level=logging.WARN)
|
||||
main()
|
||||
|
||||
57
load_example.py
Executable file
57
load_example.py
Executable file
@@ -0,0 +1,57 @@
|
||||
#!/usr/bin/env python
|
||||
import threading, logging, time, collections
|
||||
|
||||
from kafka.client import KafkaClient
|
||||
from kafka.consumer import SimpleConsumer
|
||||
from kafka.producer import SimpleProducer
|
||||
|
||||
msg_size = 524288
|
||||
|
||||
class Producer(threading.Thread):
|
||||
daemon = True
|
||||
big_msg = "1" * msg_size
|
||||
|
||||
def run(self):
|
||||
client = KafkaClient("localhost", 9092)
|
||||
producer = SimpleProducer(client)
|
||||
self.sent = 0
|
||||
|
||||
while True:
|
||||
producer.send_messages('my-topic', self.big_msg)
|
||||
self.sent += 1
|
||||
|
||||
|
||||
class Consumer(threading.Thread):
|
||||
daemon = True
|
||||
|
||||
def run(self):
|
||||
client = KafkaClient("localhost", 9092)
|
||||
consumer = SimpleConsumer(client, "test-group", "my-topic",
|
||||
max_buffer_size = None,
|
||||
)
|
||||
self.valid = 0
|
||||
self.invalid = 0
|
||||
|
||||
for message in consumer:
|
||||
if len(message.message.value) == msg_size:
|
||||
self.valid += 1
|
||||
else:
|
||||
self.invalid += 1
|
||||
|
||||
def main():
|
||||
threads = [
|
||||
Producer(),
|
||||
Consumer()
|
||||
]
|
||||
|
||||
for t in threads:
|
||||
t.start()
|
||||
|
||||
time.sleep(10)
|
||||
print 'Messages sent: %d' % threads[0].sent
|
||||
print 'Messages recvd: %d' % threads[1].valid
|
||||
print 'Messages invalid: %d' % threads[1].invalid
|
||||
|
||||
if __name__ == "__main__":
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
main()
|
||||
Reference in New Issue
Block a user