Moved additional MP consumer options to **kwargs

This commit is contained in:
Viktor Shlapakov
2015-03-11 12:41:34 +03:00
parent 09028f0b22
commit f8012c1a74

View File

@@ -21,7 +21,7 @@ Events = namedtuple("Events", ["start", "pause", "exit"])
log = logging.getLogger("kafka")
def _mp_consume(client, group, topic, queue, size, events, consumer_options):
def _mp_consume(client, group, topic, queue, size, events, **consumer_options):
"""
A child process worker which consumes messages based on the
notifications given by the controller process
@@ -153,8 +153,8 @@ class MultiProcessConsumer(Consumer):
options.update(simple_consumer_options)
args = (client.copy(), group, topic, self.queue,
self.size, self.events, options)
proc = Process(target=_mp_consume, args=args)
self.size, self.events)
proc = Process(target=_mp_consume, args=args, kwargs=options)
proc.daemon = True
proc.start()
self.procs.append(proc)