Used thread-safe dict.copy().keys() for MP consumer partitions
This commit is contained in:
@@ -123,7 +123,10 @@ class MultiProcessConsumer(Consumer):
|
||||
self.pause = Event() # Requests the consumers to pause fetch
|
||||
self.size = Value('i', 0) # Indicator of number of messages to fetch
|
||||
|
||||
partitions = list(self.offsets.keys())
|
||||
# dict.keys() returns a view in py3 + it's not a thread-safe operation
|
||||
# http://blog.labix.org/2008/06/27/watch-out-for-listdictkeys-in-python-3
|
||||
# It's safer to copy dict as it only runs during the init.
|
||||
partitions = list(self.offsets.copy().keys())
|
||||
|
||||
# By default, start one consumer process for all partitions
|
||||
# The logic below ensures that
|
||||
|
||||
Reference in New Issue
Block a user