Re-init the sockets in the new process
This commit is contained in:
@@ -176,6 +176,10 @@ class KafkaClient(object):
|
||||
for conn in self.conns.values():
|
||||
conn.close()
|
||||
|
||||
def reinit(self):
|
||||
for conn in self.conns.values():
|
||||
conn.reinit()
|
||||
|
||||
def send_produce_request(self, payloads=[], acks=1, timeout=1000,
|
||||
fail_on_error=True, callback=None):
|
||||
"""
|
||||
|
||||
@@ -86,3 +86,12 @@ class KafkaConnection(local):
|
||||
def close(self):
|
||||
"Close this connection"
|
||||
self._sock.close()
|
||||
|
||||
def reinit(self):
|
||||
"""
|
||||
Re-initialize the socket connection
|
||||
"""
|
||||
self._sock.close()
|
||||
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
self._sock.connect((self.host, self.port))
|
||||
self._sock.settimeout(10)
|
||||
|
||||
@@ -445,7 +445,6 @@ class MultiProcessConsumer(Consumer):
|
||||
proc = Process(target=self._consume, args=(chunk,))
|
||||
proc.daemon = True
|
||||
proc.start()
|
||||
time.sleep(0.2)
|
||||
self.procs.append(proc)
|
||||
|
||||
def _consume(self, partitions):
|
||||
@@ -454,6 +453,9 @@ class MultiProcessConsumer(Consumer):
|
||||
notifications given by the controller process
|
||||
"""
|
||||
|
||||
# Make the child processes open separate socket connections
|
||||
self.client.reinit()
|
||||
|
||||
# We will start consumers without auto-commit. Auto-commit will be
|
||||
# done by the master controller process.
|
||||
consumer = SimpleConsumer(self.client, self.group, self.topic,
|
||||
|
||||
Reference in New Issue
Block a user