Pass error to BrokerConnection.close()

This commit is contained in:
Dana Powers
2016-12-17 10:37:28 -08:00
parent cfb5f0e285
commit cb5e7b80ba
2 changed files with 28 additions and 20 deletions

View File

@@ -557,7 +557,7 @@ class KafkaClient(object):
log.warning('Protocol out of sync on %r, closing', conn) log.warning('Protocol out of sync on %r, closing', conn)
except socket.error: except socket.error:
pass pass
conn.close() conn.close(Errors.ConnectionError('Socket EVENT_READ without in-flight-requests'))
continue continue
# Accumulate as many responses as the connection has pending # Accumulate as many responses as the connection has pending

View File

@@ -9,6 +9,7 @@ from random import shuffle
import socket import socket
import ssl import ssl
import time import time
import traceback
from kafka.vendor import six from kafka.vendor import six
@@ -236,10 +237,10 @@ class BrokerConnection(object):
self._gai_index += 1 self._gai_index += 1
while True: while True:
if self._gai_index >= len(self._gai): if self._gai_index >= len(self._gai):
log.error('Unable to connect to any of the names for {0}:{1}'.format( error = 'Unable to connect to any of the names for {0}:{1}'.format(
self._init_host, self._init_port self._init_host, self._init_port)
)) log.error(error)
self.close() self.close(Errors.ConnectionError(error))
return return
afi, _, __, ___, sockaddr = self._gai[self._gai_index] afi, _, __, ___, sockaddr = self._gai[self._gai_index]
if afi not in (socket.AF_INET, socket.AF_INET6): if afi not in (socket.AF_INET, socket.AF_INET6):
@@ -293,12 +294,12 @@ class BrokerConnection(object):
elif ret not in (errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK, 10022): elif ret not in (errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK, 10022):
log.error('Connect attempt to %s returned error %s.' log.error('Connect attempt to %s returned error %s.'
' Disconnecting.', self, ret) ' Disconnecting.', self, ret)
self.close() self.close(Errors.ConnectionError(ret))
# Connection timed out # Connection timed out
elif time.time() > request_timeout + self.last_attempt: elif time.time() > request_timeout + self.last_attempt:
log.error('Connection attempt to %s timed out', self) log.error('Connection attempt to %s timed out', self)
self.close() # error=TimeoutError ? self.close(Errors.ConnectionError('timeout'))
# Needs retry # Needs retry
else: else:
@@ -345,9 +346,9 @@ class BrokerConnection(object):
password=self.config['ssl_password']) password=self.config['ssl_password'])
if self.config['ssl_crlfile']: if self.config['ssl_crlfile']:
if not hasattr(ssl, 'VERIFY_CRL_CHECK_LEAF'): if not hasattr(ssl, 'VERIFY_CRL_CHECK_LEAF'):
log.error('%s: No CRL support with this version of Python.' error = 'No CRL support with this version of Python.'
' Disconnecting.', self) log.error('%s: %s Disconnecting.', self, error)
self.close() self.close(Errors.ConnectionError(error))
return return
log.info('%s: Loading SSL CRL from %s', str(self), self.config['ssl_crlfile']) log.info('%s: Loading SSL CRL from %s', str(self), self.config['ssl_crlfile'])
self._ssl_context.load_verify_locations(self.config['ssl_crlfile']) self._ssl_context.load_verify_locations(self.config['ssl_crlfile'])
@@ -359,9 +360,9 @@ class BrokerConnection(object):
self._sock, self._sock,
server_hostname=self.hostname, server_hostname=self.hostname,
do_handshake_on_connect=False) do_handshake_on_connect=False)
except ssl.SSLError: except ssl.SSLError as e:
log.exception('%s: Failed to wrap socket in SSLContext!', str(self)) log.exception('%s: Failed to wrap socket in SSLContext!', str(self))
self.close() self.close(e)
self.last_failure = time.time() self.last_failure = time.time()
def _try_handshake(self): def _try_handshake(self):
@@ -374,7 +375,7 @@ class BrokerConnection(object):
pass pass
except ssl.SSLZeroReturnError: except ssl.SSLZeroReturnError:
log.warning('SSL connection closed by server during handshake.') log.warning('SSL connection closed by server during handshake.')
self.close() self.close(Errors.ConnectionError('SSL connection closed by server during handshake'))
# Other SSLErrors will be raised to user # Other SSLErrors will be raised to user
return False return False
@@ -482,9 +483,15 @@ class BrokerConnection(object):
will be failed with this exception. will be failed with this exception.
Default: kafka.errors.ConnectionError. Default: kafka.errors.ConnectionError.
""" """
if self.state is not ConnectionStates.DISCONNECTED: if self.state is ConnectionStates.DISCONNECTED:
self.state = ConnectionStates.DISCONNECTING if error is not None:
self.config['state_change_callback'](self) log.warning('%s: close() called on disconnected connection with error: %s', self, error)
traceback.print_stack()
return
log.info('%s: Closing connection. %s', self, error or '')
self.state = ConnectionStates.DISCONNECTING
self.config['state_change_callback'](self)
if self._sock: if self._sock:
self._sock.close() self._sock.close()
self._sock = None self._sock = None
@@ -572,7 +579,7 @@ class BrokerConnection(object):
# If requests are pending, we should close the socket and # If requests are pending, we should close the socket and
# fail all the pending request futures # fail all the pending request futures
if self.in_flight_requests: if self.in_flight_requests:
self.close() self.close(Errors.ConnectionError('Socket not connected during recv with in-flight-requests'))
return None return None
elif not self.in_flight_requests: elif not self.in_flight_requests:
@@ -699,7 +706,7 @@ class BrokerConnection(object):
'%s: Correlation ids do not match: sent %d, recv %d' '%s: Correlation ids do not match: sent %d, recv %d'
% (str(self), ifr.correlation_id, recv_correlation_id)) % (str(self), ifr.correlation_id, recv_correlation_id))
ifr.future.failure(error) ifr.future.failure(error)
self.close() self.close(error)
self._processing = False self._processing = False
return None return None
@@ -713,8 +720,9 @@ class BrokerConnection(object):
' Unable to decode %d-byte buffer: %r', self, ' Unable to decode %d-byte buffer: %r', self,
ifr.correlation_id, ifr.response_type, ifr.correlation_id, ifr.response_type,
ifr.request, len(buf), buf) ifr.request, len(buf), buf)
ifr.future.failure(Errors.UnknownError('Unable to decode response')) error = Errors.UnknownError('Unable to decode response')
self.close() ifr.future.failure(error)
self.close(error)
self._processing = False self._processing = False
return None return None