@@ -448,6 +448,25 @@ class KafkaClient(object):
|
|||||||
continue
|
continue
|
||||||
conn = key.data
|
conn = key.data
|
||||||
processed.add(conn)
|
processed.add(conn)
|
||||||
|
|
||||||
|
if not conn.in_flight_requests:
|
||||||
|
# if we got an EVENT_READ but there were no in-flight requests, one of
|
||||||
|
# two things has happened:
|
||||||
|
#
|
||||||
|
# 1. The remote end closed the connection (because it died, or because
|
||||||
|
# a firewall timed out, or whatever)
|
||||||
|
# 2. The protocol is out of sync.
|
||||||
|
#
|
||||||
|
# either way, we can no longer safely use this connection
|
||||||
|
#
|
||||||
|
# Do a 1-byte read to clear the READ flag, and then close the conn
|
||||||
|
unexpected_data = key.fileobj.recv(1)
|
||||||
|
if unexpected_data: # anything other than a 0-byte read means protocol issues
|
||||||
|
log.warning('Protocol out of sync on %r, closing', conn)
|
||||||
|
conn.close()
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Accumulate as many responses as the connection has pending
|
||||||
while conn.in_flight_requests:
|
while conn.in_flight_requests:
|
||||||
response = conn.recv() # Note: conn.recv runs callbacks / errbacks
|
response = conn.recv() # Note: conn.recv runs callbacks / errbacks
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user