Merge pull request #335 from scrapinghub/fix-mp-consumer-distribution
Fix incorrect partition distribution logic in MultiProcessConsumer
@@ -123,26 +123,28 @@ class MultiProcessConsumer(Consumer):
         self.pause = Event() # Requests the consumers to pause fetch
         self.size = Value('i', 0) # Indicator of number of messages to fetch
 
-        partitions = self.offsets.keys()
+        # dict.keys() returns a view in py3 + it's not a thread-safe operation
+        # http://blog.labix.org/2008/06/27/watch-out-for-listdictkeys-in-python-3
+        # It's safer to copy dict as it only runs during the init.
+        partitions = list(self.offsets.copy().keys())
 
-        # If unspecified, start one consumer per partition
+        # By default, start one consumer process for all partitions
         # The logic below ensures that
         # * we do not cross the num_procs limit
         # * we have an even distribution of partitions among processes
-        if not partitions_per_proc:
-            partitions_per_proc = round(len(partitions) * 1.0 / num_procs)
-            if partitions_per_proc < num_procs * 0.5:
-                partitions_per_proc += 1
+        if partitions_per_proc:
+            num_procs = len(partitions) / partitions_per_proc
+            if num_procs * partitions_per_proc < len(partitions):
+                num_procs += 1
 
         # The final set of chunks
-        chunker = lambda *x: [] + list(x)
-        chunks = map(chunker, *[iter(partitions)] * int(partitions_per_proc))
+        chunks = [partitions[proc::num_procs] for proc in range(num_procs)]
 
         self.procs = []
         for chunk in chunks:
-            chunk = filter(lambda x: x is not None, chunk)
             args = (client.copy(),
-                    group, topic, list(chunk),
+                    group, topic, chunk,
                     self.queue, self.start, self.exit,
                     self.pause, self.size)
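
The `partitions = list(self.offsets.copy().keys())` change matters because, as the new comments note, `dict.keys()` in Python 3 returns a live view rather than a list, and holding it while another thread mutates `self.offsets` is not safe. A standalone sketch of the difference (illustrative dictionary, not code from this commit):

# Illustrative only: why the commit snapshots the keys instead of keeping a view.
offsets = {0: 0, 1: 0, 2: 0}            # partition -> offset, as in the consumer

view = offsets.keys()                    # Python 3: a dynamic view over the dict
snapshot = list(offsets.copy().keys())   # the pattern adopted by this commit

offsets[3] = 0                           # simulate a concurrent addition

print(sorted(view))      # [0, 1, 2, 3] -- the view reflects the mutation
print(sorted(snapshot))  # [0, 1, 2]    -- the copied list stays stable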
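
When `partitions_per_proc` is passed explicitly, the new branch recomputes `num_procs` as a ceiling division, so every partition gets a process without exceeding the requested chunk size. A minimal sketch of that arithmetic with hypothetical numbers (the commit uses `/`, which is integer division under Python 2; `//` below makes the intent explicit):

# Illustrative only: the sizing rule is ceil(len(partitions) / partitions_per_proc).
partitions = list(range(10))      # hypothetical: 10 partitions
partitions_per_proc = 4

num_procs = len(partitions) // partitions_per_proc      # 2 full chunks
if num_procs * partitions_per_proc < len(partitions):   # 8 < 10: two partitions left over
    num_procs += 1

print(num_procs)  # 3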
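
The chunking change is the core of the fix. The old `map(chunker, *[iter(partitions)] * partitions_per_proc)` idiom pads short groups with None on Python 2 (hence the removed `filter`), silently drops trailing partitions on Python 3, and can produce more groups than `num_procs`. The new stride slicing always yields exactly `num_procs` chunks covering every partition. A standalone comparison with a hypothetical partition list (not code from this commit):

# Illustrative only: old grouping idiom vs. the stride slicing introduced here.
partitions = list(range(10))   # hypothetical: partitions 0..9
num_procs = 3
partitions_per_proc = 3        # matches round(len(partitions) * 1.0 / num_procs)

# Old approach: zip-style grouping through a shared iterator.
chunker = lambda *x: [] + list(x)
old_chunks = list(map(chunker, *[iter(partitions)] * partitions_per_proc))
print(old_chunks)
# Python 3: [[0, 1, 2], [3, 4, 5], [6, 7, 8]]      -- partition 9 is never assigned
# Python 2: four groups, the last padded with None -- one process more than num_procs

# New approach: exactly num_procs chunks, every partition assigned, sizes differ by at most 1.
new_chunks = [partitions[proc::num_procs] for proc in range(num_procs)]
print(new_chunks)  # [[0, 3, 6, 9], [1, 4, 7], [2, 5, 8]]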