Describe consumer thread-safety
This commit is contained in:
committed by
Dana Powers
parent
73d78bc76a
commit
97261f491d
@@ -110,6 +110,15 @@ for more details.
|
|||||||
>>> for i in range(1000):
|
>>> for i in range(1000):
|
||||||
... producer.send('foobar', b'msg %d' % i)
|
... producer.send('foobar', b'msg %d' % i)
|
||||||
|
|
||||||
|
Thread safety
|
||||||
|
*************
|
||||||
|
|
||||||
|
The KafkaProducer can be used across threads without issue, unlike the
|
||||||
|
KafkaConsumer which cannot.
|
||||||
|
|
||||||
|
While it is possible to use the KafkaConsumer in a thread-local manner,
|
||||||
|
multiprocessing is recommended.
|
||||||
|
|
||||||
Compression
|
Compression
|
||||||
***********
|
***********
|
||||||
|
|
||||||
|
|||||||
@@ -109,6 +109,16 @@ client. See `KafkaProducer <apidoc/KafkaProducer.html>`_ for more details.
|
|||||||
... producer.send('foobar', b'msg %d' % i)
|
... producer.send('foobar', b'msg %d' % i)
|
||||||
|
|
||||||
|
|
||||||
|
Thread safety
|
||||||
|
*************
|
||||||
|
|
||||||
|
The KafkaProducer can be used across threads without issue, unlike the
|
||||||
|
KafkaConsumer which cannot.
|
||||||
|
|
||||||
|
While it is possible to use the KafkaConsumer in a thread-local manner,
|
||||||
|
multiprocessing is recommended.
|
||||||
|
|
||||||
|
|
||||||
Compression
|
Compression
|
||||||
***********
|
***********
|
||||||
|
|
||||||
|
|||||||
@@ -1,5 +1,6 @@
|
|||||||
#!/usr/bin/env python
|
#!/usr/bin/env python
|
||||||
import threading, logging, time
|
import threading, logging, time
|
||||||
|
import multiprocessing
|
||||||
|
|
||||||
from kafka import KafkaConsumer, KafkaProducer
|
from kafka import KafkaConsumer, KafkaProducer
|
||||||
|
|
||||||
@@ -16,7 +17,7 @@ class Producer(threading.Thread):
|
|||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
|
||||||
|
|
||||||
class Consumer(threading.Thread):
|
class Consumer(multiprocessing.Process):
|
||||||
daemon = True
|
daemon = True
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
@@ -29,12 +30,12 @@ class Consumer(threading.Thread):
|
|||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
threads = [
|
tasks = [
|
||||||
Producer(),
|
Producer(),
|
||||||
Consumer()
|
Consumer()
|
||||||
]
|
]
|
||||||
|
|
||||||
for t in threads:
|
for t in tasks:
|
||||||
t.start()
|
t.start()
|
||||||
|
|
||||||
time.sleep(10)
|
time.sleep(10)
|
||||||
|
|||||||
@@ -33,6 +33,8 @@ class KafkaConsumer(six.Iterator):
|
|||||||
to allow multiple consumers to load balance consumption of topics (requires
|
to allow multiple consumers to load balance consumption of topics (requires
|
||||||
kafka >= 0.9.0.0).
|
kafka >= 0.9.0.0).
|
||||||
|
|
||||||
|
The consumer is not thread safe and should not be shared across threads.
|
||||||
|
|
||||||
Arguments:
|
Arguments:
|
||||||
*topics (str): optional list of topics to subscribe to. If not set,
|
*topics (str): optional list of topics to subscribe to. If not set,
|
||||||
call :meth:`~kafka.KafkaConsumer.subscribe` or
|
call :meth:`~kafka.KafkaConsumer.subscribe` or
|
||||||
|
|||||||
Reference in New Issue
Block a user