Merge pull request #336 from scrapinghub/feature-mp-consumer-params
Using additional params for MP consumer child process
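A minimal usage sketch of what this change enables, assuming the kafka-python API of this period (KafkaClient plus MultiProcessConsumer); the broker address, group, and topic names are illustrative. Extra keyword arguments such as iter_timeout (used in the updated integration test below) are now forwarded to the SimpleConsumer created inside each child process:

    from kafka import KafkaClient, MultiProcessConsumer

    # Illustrative broker address only.
    client = KafkaClient('localhost:9092')

    # num_procs controls the number of child consumer processes;
    # iter_timeout is passed through to each child's SimpleConsumer.
    consumer = MultiProcessConsumer(client, "my-group", "my-topic",
                                    num_procs=2,
                                    iter_timeout=0)

    for message in consumer:
        print(message)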
@@ -25,6 +25,7 @@ MAX_FETCH_BUFFER_SIZE_BYTES = FETCH_BUFFER_SIZE_BYTES * 8
 
 ITER_TIMEOUT_SECONDS = 60
 NO_MESSAGES_WAIT_TIME_SECONDS = 0.1
+FULL_QUEUE_WAIT_TIME_SECONDS = 0.1
 
 
 class Consumer(object):
@@ -2,23 +2,27 @@ from __future__ import absolute_import
 
 import logging
 import time
-from multiprocessing import Process, Queue as MPQueue, Event, Value
+from collections import namedtuple
+from multiprocessing import Process, Manager as MPManager
 
 try:
-    from Queue import Empty
+    from Queue import Empty, Full
 except ImportError:  # python 2
-    from queue import Empty
+    from queue import Empty, Full
 
 from .base import (
     AUTO_COMMIT_MSG_COUNT, AUTO_COMMIT_INTERVAL,
-    NO_MESSAGES_WAIT_TIME_SECONDS
+    NO_MESSAGES_WAIT_TIME_SECONDS,
+    FULL_QUEUE_WAIT_TIME_SECONDS
 )
 from .simple import Consumer, SimpleConsumer
 
+Events = namedtuple("Events", ["start", "pause", "exit"])
+
 log = logging.getLogger("kafka")
 
 
-def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
+def _mp_consume(client, group, topic, queue, size, events, **consumer_options):
     """
     A child process worker which consumes messages based on the
     notifications given by the controller process
@@ -34,20 +38,20 @@ def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
     # We will start consumers without auto-commit. Auto-commit will be
     # done by the master controller process.
     consumer = SimpleConsumer(client, group, topic,
-                              partitions=chunk,
                               auto_commit=False,
                               auto_commit_every_n=None,
-                              auto_commit_every_t=None)
+                              auto_commit_every_t=None,
+                              **consumer_options)
 
     # Ensure that the consumer provides the partition information
     consumer.provide_partition_info()
 
     while True:
         # Wait till the controller indicates us to start consumption
-        start.wait()
+        events.start.wait()
 
         # If we are asked to quit, do so
-        if exit.is_set():
+        if events.exit.is_set():
             break
 
         # Consume messages and add them to the queue. If the controller
@@ -56,7 +60,13 @@ def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
 
         message = consumer.get_message()
         if message:
-            queue.put(message)
+            while True:
+                try:
+                    queue.put(message, timeout=FULL_QUEUE_WAIT_TIME_SECONDS)
+                    break
+                except Full:
+                    if events.exit.is_set(): break
+
             count += 1
 
             # We have reached the required size. The controller might have
@@ -65,7 +75,7 @@ def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
             # loop consuming all available messages before the controller
             # can reset the 'start' event
             if count == size.value:
-                pause.wait()
+                events.pause.wait()
 
         else:
             # In case we did not receive any message, give up the CPU for
@@ -105,7 +115,8 @@ class MultiProcessConsumer(Consumer):
     def __init__(self, client, group, topic, auto_commit=True,
                  auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
                  auto_commit_every_t=AUTO_COMMIT_INTERVAL,
-                 num_procs=1, partitions_per_proc=0):
+                 num_procs=1, partitions_per_proc=0,
+                 **simple_consumer_options):
 
         # Initiate the base consumer class
         super(MultiProcessConsumer, self).__init__(
@@ -117,11 +128,13 @@ class MultiProcessConsumer(Consumer):
 
         # Variables for managing and controlling the data flow from
         # consumer child process to master
-        self.queue = MPQueue(1024)  # Child consumers dump messages into this
-        self.start = Event()        # Indicates the consumers to start fetch
-        self.exit = Event()         # Requests the consumers to shutdown
-        self.pause = Event()        # Requests the consumers to pause fetch
-        self.size = Value('i', 0)   # Indicator of number of messages to fetch
+        manager = MPManager()
+        self.queue = manager.Queue(1024)  # Child consumers dump messages into this
+        self.events = Events(
+            start = manager.Event(),  # Indicates the consumers to start fetch
+            exit = manager.Event(),   # Requests the consumers to shutdown
+            pause = manager.Event())  # Requests the consumers to pause fetch
+        self.size = manager.Value('i', 0)  # Indicator of number of messages to fetch
 
         # dict.keys() returns a view in py3 + it's not a thread-safe operation
         # http://blog.labix.org/2008/06/27/watch-out-for-listdictkeys-in-python-3
@@ -143,12 +156,14 @@ class MultiProcessConsumer(Consumer):
 
         self.procs = []
         for chunk in chunks:
-            args = (client.copy(),
-                    group, topic, chunk,
-                    self.queue, self.start, self.exit,
-                    self.pause, self.size)
+            options = {'partitions': list(chunk)}
+            if simple_consumer_options:
+                simple_consumer_options.pop('partitions', None)
+                options.update(simple_consumer_options)
 
-            proc = Process(target=_mp_consume, args=args)
+            args = (client.copy(), group, topic, self.queue,
+                    self.size, self.events)
+            proc = Process(target=_mp_consume, args=args, kwargs=options)
             proc.daemon = True
             proc.start()
             self.procs.append(proc)
@@ -159,9 +174,9 @@ class MultiProcessConsumer(Consumer):
 
     def stop(self):
        # Set exit and start off all waiting consumers
-        self.exit.set()
-        self.pause.set()
-        self.start.set()
+        self.events.exit.set()
+        self.events.pause.set()
+        self.events.start.set()
 
        for proc in self.procs:
            proc.join()
@@ -176,10 +191,10 @@ class MultiProcessConsumer(Consumer):
         # Trigger the consumer procs to start off.
         # We will iterate till there are no more messages available
         self.size.value = 0
-        self.pause.set()
+        self.events.pause.set()
 
         while True:
-            self.start.set()
+            self.events.start.set()
             try:
                 # We will block for a small while so that the consumers get
                 # a chance to run and put some messages in the queue
@@ -191,12 +206,12 @@ class MultiProcessConsumer(Consumer):
 
             # Count, check and commit messages if necessary
             self.offsets[partition] = message.offset + 1
-            self.start.clear()
+            self.events.start.clear()
             self.count_since_commit += 1
             self._auto_commit()
             yield message
 
-        self.start.clear()
+        self.events.start.clear()
 
     def get_messages(self, count=1, block=True, timeout=10):
         """
@@ -216,7 +231,7 @@ class MultiProcessConsumer(Consumer):
         # necessary, but these will not be committed to kafka. Also, the extra
         # messages can be provided in subsequent runs
         self.size.value = count
-        self.pause.clear()
+        self.events.pause.clear()
 
         if timeout is not None:
             max_time = time.time() + timeout
@@ -228,7 +243,7 @@ class MultiProcessConsumer(Consumer):
             # go into overdrive and keep consuming thousands of
             # messages when the user might need only a few
             if self.queue.empty():
-                self.start.set()
+                self.events.start.set()
 
             try:
                 partition, message = self.queue.get(block, timeout)
@@ -242,8 +257,8 @@ class MultiProcessConsumer(Consumer):
             timeout = max_time - time.time()
 
         self.size.value = 0
-        self.start.clear()
-        self.pause.set()
+        self.events.start.clear()
+        self.events.pause.set()
 
         # Update and commit offsets if necessary
         self.offsets.update(new_offsets)
@@ -61,7 +61,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
         group = kwargs.pop('group', self.id().encode('utf-8'))
         topic = kwargs.pop('topic', self.topic)
 
-        if consumer_class == SimpleConsumer:
+        if consumer_class in [SimpleConsumer, MultiProcessConsumer]:
             kwargs.setdefault('iter_timeout', 0)
 
         return consumer_class(self.client, group, topic, **kwargs)
@@ -243,7 +243,8 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
         self.send_messages(0, range(0, 10))
         self.send_messages(1, range(10, 20))
 
-        consumer = MultiProcessConsumer(self.client, "group1", self.topic, auto_commit=False)
+        consumer = MultiProcessConsumer(self.client, "group1", self.topic,
+                                        auto_commit=False, iter_timeout=0)
 
         self.assertEqual(consumer.pending(), 20)
         self.assertEqual(consumer.pending(partitions=[0]), 10)
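For context on the coordination change above: the controller now hands each child process a Manager-backed queue, Value, and an Events namedtuple of Manager events instead of raw multiprocessing primitives. A simplified, standalone sketch of that pattern, not the library code; the worker body and message payloads are illustrative:

    from collections import namedtuple
    from multiprocessing import Process, Manager

    Events = namedtuple("Events", ["start", "pause", "exit"])

    def worker(queue, size, events):
        # Block until the controller signals that fetching should begin.
        events.start.wait()
        while not events.exit.is_set():
            queue.put("message")          # stand-in for a fetched Kafka message
            if queue.qsize() >= size.value:
                events.pause.wait()       # controller has enough for now
                break

    if __name__ == "__main__":
        manager = Manager()
        queue = manager.Queue(1024)
        size = manager.Value('i', 5)
        events = Events(start=manager.Event(),
                        pause=manager.Event(),
                        exit=manager.Event())

        proc = Process(target=worker, args=(queue, size, events))
        proc.daemon = True
        proc.start()

        events.start.set()   # let the worker run
        # ... drain the queue here ...
        events.pause.set()   # release a worker parked at the pause event
        events.exit.set()
        proc.join()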