Check for pending ssl bytes in KafkaClient loop
This commit is contained in:
@@ -414,7 +414,9 @@ class KafkaClient(object):
|
||||
def _poll(self, timeout, sleep=True):
|
||||
# select on reads across all connected sockets, blocking up to timeout
|
||||
assert self.in_flight_request_count() > 0 or self._connecting or sleep
|
||||
|
||||
responses = []
|
||||
processed = set()
|
||||
for key, events in self._selector.select(timeout):
|
||||
if key.fileobj is self._wake_r:
|
||||
self._clear_wake_fd()
|
||||
@@ -422,6 +424,7 @@ class KafkaClient(object):
|
||||
elif not (events & selectors.EVENT_READ):
|
||||
continue
|
||||
conn = key.data
|
||||
processed.add(conn)
|
||||
while conn.in_flight_requests:
|
||||
response = conn.recv() # Note: conn.recv runs callbacks / errbacks
|
||||
|
||||
@@ -430,6 +433,15 @@ class KafkaClient(object):
|
||||
if not response:
|
||||
break
|
||||
responses.append(response)
|
||||
|
||||
# Check for additional pending SSL bytes
|
||||
if self.config['security_protocol'] in ('SSL', 'SASL_SSL'):
|
||||
# TODO: optimize
|
||||
for conn in self._conns.values():
|
||||
if conn not in processed and conn.connected() and conn._sock.pending():
|
||||
response = conn.recv()
|
||||
if response:
|
||||
responses.append(response)
|
||||
return responses
|
||||
|
||||
def in_flight_request_count(self, node_id=None):
|
||||
|
||||
Reference in New Issue
Block a user