From f8012c1a74a60623d541d30ff5a21e86fd48c81a Mon Sep 17 00:00:00 2001 From: Viktor Shlapakov Date: Wed, 11 Mar 2015 12:41:34 +0300 Subject: [PATCH] Moved additional MP consumer options to **kwargs --- kafka/consumer/multiprocess.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/kafka/consumer/multiprocess.py b/kafka/consumer/multiprocess.py index 2bb97f3..cdfaeeb 100644 --- a/kafka/consumer/multiprocess.py +++ b/kafka/consumer/multiprocess.py @@ -21,7 +21,7 @@ Events = namedtuple("Events", ["start", "pause", "exit"]) log = logging.getLogger("kafka") -def _mp_consume(client, group, topic, queue, size, events, consumer_options): +def _mp_consume(client, group, topic, queue, size, events, **consumer_options): """ A child process worker which consumes messages based on the notifications given by the controller process @@ -153,8 +153,8 @@ class MultiProcessConsumer(Consumer): options.update(simple_consumer_options) args = (client.copy(), group, topic, self.queue, - self.size, self.events, options) - proc = Process(target=_mp_consume, args=args) + self.size, self.events) + proc = Process(target=_mp_consume, args=args, kwargs=options) proc.daemon = True proc.start() self.procs.append(proc)