Add state_change_callback to BrokerConnection
This commit is contained in:
@@ -31,6 +31,7 @@ DEFAULT_KAFKA_PORT = 9092
|
||||
|
||||
|
||||
class ConnectionStates(object):
|
||||
DISCONNECTING = '<disconnecting>'
|
||||
DISCONNECTED = '<disconnected>'
|
||||
CONNECTING = '<connecting>'
|
||||
CONNECTED = '<connected>'
|
||||
@@ -49,6 +50,7 @@ class BrokerConnection(object):
|
||||
'receive_buffer_bytes': None,
|
||||
'send_buffer_bytes': None,
|
||||
'api_version': (0, 8, 2), # default to most restrictive
|
||||
'state_change_callback': lambda conn: True,
|
||||
}
|
||||
|
||||
def __init__(self, host, port, afi, **configs):
|
||||
@@ -87,6 +89,7 @@ class BrokerConnection(object):
|
||||
self._sock.setblocking(False)
|
||||
self.state = ConnectionStates.CONNECTING
|
||||
self.last_attempt = time.time()
|
||||
self.config['state_change_callback'](self)
|
||||
|
||||
if self.state is ConnectionStates.CONNECTING:
|
||||
# in non-blocking mode, use repeated calls to socket.connect_ex
|
||||
@@ -101,6 +104,7 @@ class BrokerConnection(object):
|
||||
if not ret or ret == errno.EISCONN:
|
||||
log.debug('%s: established TCP connection', str(self))
|
||||
self.state = ConnectionStates.CONNECTED
|
||||
self.config['state_change_callback'](self)
|
||||
|
||||
# Connection failed
|
||||
# WSAEINVAL == 10022, but errno.WSAEINVAL is not available on non-win systems
|
||||
@@ -151,6 +155,9 @@ class BrokerConnection(object):
|
||||
will be failed with this exception.
|
||||
Default: kafka.errors.ConnectionError.
|
||||
"""
|
||||
if self.state is not ConnectionStates.DISCONNECTED:
|
||||
self.state = ConnectionStates.DISCONNECTING
|
||||
self.config['state_change_callback'](self)
|
||||
if self._sock:
|
||||
self._sock.close()
|
||||
self._sock = None
|
||||
@@ -165,6 +172,7 @@ class BrokerConnection(object):
|
||||
while self.in_flight_requests:
|
||||
ifr = self.in_flight_requests.popleft()
|
||||
ifr.future.failure(error)
|
||||
self.config['state_change_callback'](self)
|
||||
|
||||
def send(self, request, expect_response=True):
|
||||
"""send request, return Future()
|
||||
|
||||
Reference in New Issue
Block a user