Use connection state functions where possible
This commit is contained in:
@@ -229,7 +229,7 @@ class KafkaClient(object):
|
|||||||
bootstrap.connect()
|
bootstrap.connect()
|
||||||
while bootstrap.connecting():
|
while bootstrap.connecting():
|
||||||
bootstrap.connect()
|
bootstrap.connect()
|
||||||
if bootstrap.state is not ConnectionStates.CONNECTED:
|
if not bootstrap.connected():
|
||||||
bootstrap.close()
|
bootstrap.close()
|
||||||
continue
|
continue
|
||||||
future = bootstrap.send(metadata_request)
|
future = bootstrap.send(metadata_request)
|
||||||
@@ -261,7 +261,7 @@ class KafkaClient(object):
|
|||||||
return True
|
return True
|
||||||
return False
|
return False
|
||||||
conn = self._conns[node_id]
|
conn = self._conns[node_id]
|
||||||
return conn.state is ConnectionStates.DISCONNECTED and not conn.blacked_out()
|
return conn.disconnected() and not conn.blacked_out()
|
||||||
|
|
||||||
def _conn_state_change(self, node_id, conn):
|
def _conn_state_change(self, node_id, conn):
|
||||||
if conn.connecting():
|
if conn.connecting():
|
||||||
@@ -398,7 +398,7 @@ class KafkaClient(object):
|
|||||||
|
|
||||||
conn = self._conns[node_id]
|
conn = self._conns[node_id]
|
||||||
time_waited_ms = time.time() - (conn.last_attempt or 0)
|
time_waited_ms = time.time() - (conn.last_attempt or 0)
|
||||||
if conn.state is ConnectionStates.DISCONNECTED:
|
if conn.disconnected():
|
||||||
return max(self.config['reconnect_backoff_ms'] - time_waited_ms, 0)
|
return max(self.config['reconnect_backoff_ms'] - time_waited_ms, 0)
|
||||||
elif conn.connecting():
|
elif conn.connecting():
|
||||||
return 0
|
return 0
|
||||||
|
@@ -523,6 +523,7 @@ class BrokerConnection(object):
|
|||||||
return self._send(request, expect_response=expect_response)
|
return self._send(request, expect_response=expect_response)
|
||||||
|
|
||||||
def _send(self, request, expect_response=True):
|
def _send(self, request, expect_response=True):
|
||||||
|
assert self.state in (ConnectionStates.AUTHENTICATING, ConnectionStates.CONNECTED)
|
||||||
future = Future()
|
future = Future()
|
||||||
correlation_id = self._next_correlation_id()
|
correlation_id = self._next_correlation_id()
|
||||||
header = RequestHeader(request,
|
header = RequestHeader(request,
|
||||||
|
Reference in New Issue
Block a user