Got MultiProcessConsumer working

Other changes
* Put a message size restriction on the shared queue
  to prevent message overload
* Wait for a while after each process is started (in the constructor)
* Wait for a while in each child if the consumer does not return any
  messages, just to be nice to the CPU
* Control the start event more granularly - this prevents infinite loops
  if control does not return to the generator, e.g.:
      for msg in consumer:
          assert False
* Update the message offset bookkeeping before the yield (sketched below)
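A minimal illustrative sketch of the last two points (plain Python with a threading.Event standing in for the multiprocessing one; this is not the kafka-python code itself): nothing placed after a yield runs if the caller abandons the generator, so the offset bookkeeping and the start.clear() have to happen before the yield.

    from threading import Event

    start = Event()        # stand-in for MultiProcessConsumer.start
    offsets = {}

    def consume(items):
        for partition, message in items:
            start.set()                    # children may fetch while we wait for data
            offsets[partition] = message   # record the offset *before* yielding
            start.clear()                  # and stop the fetchers before handing over
            yield message
            # Anything placed here never runs if the caller abandons the loop,
            # e.g.  for msg in consumer: assert False

    for msg in consume([(0, "m0"), (1, "m1")]):
        break                              # caller bails out after the first message

    assert offsets == {0: "m0"}            # the offset was still recorded
    assert not start.is_set()              # and the fetchers are not left running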
@@ -1,7 +1,9 @@
 from itertools import izip_longest, repeat
 import logging
 import time
 from threading import Lock
+from multiprocessing import Process, Queue, Event, Value
+from Queue import Empty

 from kafka.common import (
     ErrorMapping, FetchRequest,
@@ -412,14 +414,14 @@ class MultiProcessConsumer(Consumer):

         # Initiate the base consumer class
         super(MultiProcessConsumer, self).__init__(client, group, topic,
-                                    partitions=partitions,
+                                    partitions=None,
                                     auto_commit=auto_commit,
                                     auto_commit_every_n=auto_commit_every_n,
                                     auto_commit_every_t=auto_commit_every_t)

         # Variables for managing and controlling the data flow from
         # consumer child process to master
-        self.queue = Queue()        # Child consumers dump messages into this
+        self.queue = Queue(1024)    # Child consumers dump messages into this
         self.start = Event()        # Indicates the consumers to start fetch
         self.exit = Event()         # Requests the consumers to shutdown
         self.pause = Event()        # Requests the consumers to pause fetch
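A note on the Queue(1024) change above: a bounded multiprocessing.Queue makes put() block once it is full, which is what throttles the child consumers when the master stops draining messages. A tiny illustrative sketch of that behaviour (maxsize of 2 instead of 1024 so the effect is visible; not code from this commit):

    from multiprocessing import Queue
    from Queue import Full             # multiprocessing.Queue raises Queue.Full

    q = Queue(2)                       # the commit uses Queue(1024)
    q.put("a")
    q.put("b")
    try:
        q.put("c", block=False)        # a blocking put() would simply wait here
    except Full:
        pass                           # child consumers stall until the master get()s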
@@ -441,9 +443,11 @@ class MultiProcessConsumer(Consumer):

         self.procs = []
         for chunk in chunks:
-            proc = Process(target=_self._consume, args=(chunk,))
+            chunk = filter(lambda x: x is not None, list(chunk))
+            proc = Process(target=self._consume, args=(chunk,))
             proc.daemon = True
             proc.start()
+            time.sleep(0.2)
             self.procs.append(proc)

     def _consume(self, partitions):
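The filter() added above strips None padding from the last chunk. The chunks are presumably built with the usual izip_longest grouper idiom (izip_longest is imported at the top of the module); a small sketch, under that assumption, of why the padding shows up:

    from itertools import izip_longest

    partitions = [0, 1, 2, 3, 4]
    chunks = [list(c) for c in izip_longest(*[iter(partitions)] * 2)]
    # -> [[0, 1], [2, 3], [4, None]]  -- the trailing None would otherwise be
    #    handed to a child process as if it were a real partition
    chunks = [filter(lambda x: x is not None, c) for c in chunks]
    # -> [[0, 1], [2, 3], [4]]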
@@ -468,7 +472,7 @@ class MultiProcessConsumer(Consumer):
             self.start.wait()

             # If we are asked to quit, do so
-            if self.exit.isSet():
+            if self.exit.is_set():
                 break

             # Consume messages and add them to the queue. If the controller
@@ -488,6 +492,11 @@ class MultiProcessConsumer(Consumer):
                     self.pause.wait()
                     break

+            # In case we did not receive any message, give up the CPU for
+            # a while before we try again
+            if count == 0:
+                time.sleep(0.1)
+
         consumer.stop()

     def stop(self):
@@ -507,21 +516,22 @@ class MultiProcessConsumer(Consumer):
         # Trigger the consumer procs to start off.
         # We will iterate till there are no more messages available
         self.size.value = 0
-        self.start.set()
         self.pause.set()

         while True:
+            self.start.set()
             try:
                 # We will block for a small while so that the consumers get
                 # a chance to run and put some messages in the queue
-                partition, message = self.queue.get(block=True, timeout=0.1)
-            except Queue.Empty:
+                partition, message = self.queue.get(block=True, timeout=1)
+            except Empty:
                 break

-            yield message
-
             # Count, check and commit messages if necessary
             self.offsets[partition] = message.offset
+            self.start.clear()
+            yield message
+
             self.count_since_commit += 1
             self._auto_commit()
@@ -555,7 +565,7 @@ class MultiProcessConsumer(Consumer):

             try:
                 partition, message = self.queue.get(block, timeout)
-            except Queue.Empty:
+            except Empty:
                 break

             messages.append(message)
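Putting it together, a hypothetical usage sketch (KafkaClient, the handle() callback and the exact constructor arguments are assumptions, not taken from this diff): with the start event cleared before every yield, breaking out of the iteration early no longer leaves the child processes fetching forever.

    kafka = KafkaClient("localhost", 9092)                  # assumed client setup
    consumer = MultiProcessConsumer(kafka, "my-group", "my-topic")

    for message in consumer:
        handle(message)      # hypothetical message handler
        break                # abandoning the generator is now safe

    consumer.stop()          # request shutdown via the exit event shown above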