Remove unnecessary calls in KafkaClient._poll

- Dont process connections; outer poll() loop does this now
  - Only recv connections that select says are readable
This commit is contained in:
Dana Powers
2015-12-30 11:50:42 -08:00
parent 9bc01657ed
commit cfae9e3fa3

View File

@@ -264,22 +264,22 @@ class KafkaClient(object):
def _poll(self, timeout): def _poll(self, timeout):
# select on reads across all connected sockets, blocking up to timeout # select on reads across all connected sockets, blocking up to timeout
sockets = [conn._sock for conn in six.itervalues(self._conns) sockets = dict([(conn._sock, conn)
if (conn.state is ConnectionStates.CONNECTED and for conn in six.itervalues(self._conns)
conn.in_flight_requests)] if (conn.state is ConnectionStates.CONNECTED
if sockets: and conn.in_flight_requests)])
select.select(sockets, [], [], timeout) if not sockets:
return []
ready, _, _ = select.select(list(sockets.keys()), [], [], timeout)
responses = [] responses = []
# list, not iterator, because inline callbacks may add to self._conns # list, not iterator, because inline callbacks may add to self._conns
for conn in list(self._conns.values()): for sock in ready:
if conn.state is ConnectionStates.CONNECTING: conn = sockets[sock]
conn.connect() response = conn.recv() # Note: conn.recv runs callbacks / errbacks
if response:
if conn.in_flight_requests: responses.append(response)
response = conn.recv() # This will run callbacks / errbacks
if response:
responses.append(response)
return responses return responses
def in_flight_request_count(self, node_id=None): def in_flight_request_count(self, node_id=None):