Improve KafkaClient connect and ready handling
- merge _initiate and _finish into _maybe_connect - add connected(node_id) method - only short-circuit send() when not connected
This commit is contained in:
@@ -152,8 +152,8 @@ class KafkaClient(object):
|
||||
conn = self._conns[node_id]
|
||||
return conn.state is ConnectionStates.DISCONNECTED and not conn.blacked_out()
|
||||
|
||||
def _initiate_connect(self, node_id):
|
||||
"""Initiate a connection to the given node (must be in metadata)"""
|
||||
def _maybe_connect(self, node_id):
|
||||
"""Idempotent non-blocking connection attempt to the given node id."""
|
||||
if node_id not in self._conns:
|
||||
broker = self.cluster.broker_metadata(node_id)
|
||||
assert broker, 'Broker id %s not in current metadata' % node_id
|
||||
@@ -164,22 +164,21 @@ class KafkaClient(object):
|
||||
host, port, afi = get_ip_port_afi(broker.host)
|
||||
self._conns[node_id] = BrokerConnection(host, broker.port, afi,
|
||||
**self.config)
|
||||
return self._finish_connect(node_id)
|
||||
|
||||
def _finish_connect(self, node_id):
|
||||
assert node_id in self._conns, '%s is not in current conns' % node_id
|
||||
state = self._conns[node_id].connect()
|
||||
if state is ConnectionStates.CONNECTING:
|
||||
self._connecting.add(node_id)
|
||||
|
||||
# Whether CONNECTED or DISCONNECTED, we need to remove from connecting
|
||||
elif node_id in self._connecting:
|
||||
log.debug("Node %s connection state is %s", node_id, state)
|
||||
self._connecting.remove(node_id)
|
||||
|
||||
# Connection failures imply that our metadata is stale, so let's refresh
|
||||
if state is ConnectionStates.DISCONNECTED:
|
||||
log.warning("Node %s connect failed -- refreshing metadata", node_id)
|
||||
self.cluster.request_update()
|
||||
|
||||
return state
|
||||
return self._conns[node_id].connected()
|
||||
|
||||
def ready(self, node_id):
|
||||
"""Check whether a node is connected and ok to send more requests.
|
||||
@@ -190,19 +189,15 @@ class KafkaClient(object):
|
||||
Returns:
|
||||
bool: True if we are ready to send to the given node
|
||||
"""
|
||||
if self.is_ready(node_id):
|
||||
return True
|
||||
|
||||
if self._can_connect(node_id):
|
||||
# if we are interested in sending to a node
|
||||
# and we don't have a connection to it, initiate one
|
||||
self._initiate_connect(node_id)
|
||||
|
||||
if node_id in self._connecting:
|
||||
self._finish_connect(node_id)
|
||||
|
||||
self._maybe_connect(node_id)
|
||||
return self.is_ready(node_id)
|
||||
|
||||
def connected(self, node_id):
|
||||
"""Return True iff the node_id is connected."""
|
||||
if node_id not in self._conns:
|
||||
return False
|
||||
return self._conns[node_id].connected()
|
||||
|
||||
def close(self, node_id=None):
|
||||
"""Closes the connection to a particular node (if there is one).
|
||||
|
||||
@@ -295,15 +290,13 @@ class KafkaClient(object):
|
||||
request (Struct): request object (not-encoded)
|
||||
|
||||
Raises:
|
||||
NodeNotReadyError: if node_id is not ready
|
||||
AssertionError: if node_id is not in current cluster metadata
|
||||
|
||||
Returns:
|
||||
Future: resolves to Response struct
|
||||
Future: resolves to Response struct or Error
|
||||
"""
|
||||
if not self._can_send_request(node_id):
|
||||
raise Errors.NodeNotReadyError("Attempt to send a request to node"
|
||||
" which is not ready (node id %s)."
|
||||
% node_id)
|
||||
if not self._maybe_connect(node_id):
|
||||
return Future().failure(Errors.NodeNotReadyError(node_id))
|
||||
|
||||
# Every request gets a response, except one special case:
|
||||
expect_response = True
|
||||
@@ -341,7 +334,7 @@ class KafkaClient(object):
|
||||
|
||||
# Attempt to complete pending connections
|
||||
for node_id in list(self._connecting):
|
||||
self._finish_connect(node_id)
|
||||
self._maybe_connect(node_id)
|
||||
|
||||
# Send a metadata request if needed
|
||||
metadata_timeout_ms = self._maybe_refresh_metadata()
|
||||
@@ -557,7 +550,7 @@ class KafkaClient(object):
|
||||
|
||||
elif self._can_connect(node_id):
|
||||
log.debug("Initializing connection to node %s for metadata request", node_id)
|
||||
self._initiate_connect(node_id)
|
||||
self._maybe_connect(node_id)
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
@@ -41,7 +41,8 @@ def conn(mocker):
|
||||
[(0, 'foo', 12), (1, 'bar', 34)], # brokers
|
||||
[])) # topics
|
||||
conn.blacked_out.return_value = False
|
||||
conn.connect.return_value = conn.state
|
||||
conn.connect.side_effect = lambda: conn.state
|
||||
conn.connected = lambda: conn.connect() is ConnectionStates.CONNECTED
|
||||
return conn
|
||||
|
||||
|
||||
@@ -76,7 +77,7 @@ def test_can_connect(conn):
|
||||
assert cli._can_connect(0)
|
||||
|
||||
# Node is connected, can't reconnect
|
||||
cli._initiate_connect(0)
|
||||
assert cli._maybe_connect(0) is True
|
||||
assert not cli._can_connect(0)
|
||||
|
||||
# Node is disconnected, can connect
|
||||
@@ -87,60 +88,47 @@ def test_can_connect(conn):
|
||||
conn.blacked_out.return_value = True
|
||||
assert not cli._can_connect(0)
|
||||
|
||||
def test_initiate_connect(conn):
|
||||
def test_maybe_connect(conn):
|
||||
cli = KafkaClient()
|
||||
try:
|
||||
# Node not in metadata, raises AssertionError
|
||||
cli._initiate_connect(2)
|
||||
cli._maybe_connect(2)
|
||||
except AssertionError:
|
||||
pass
|
||||
else:
|
||||
assert False, 'Exception not raised'
|
||||
|
||||
assert 0 not in cli._conns
|
||||
state = cli._initiate_connect(0)
|
||||
conn.state = ConnectionStates.DISCONNECTED
|
||||
conn.connect.side_effect = lambda: ConnectionStates.CONNECTING
|
||||
assert cli._maybe_connect(0) is False
|
||||
assert cli._conns[0] is conn
|
||||
assert state is conn.state
|
||||
|
||||
|
||||
def test_finish_connect(conn):
|
||||
cli = KafkaClient()
|
||||
try:
|
||||
# Node not in metadata, raises AssertionError
|
||||
cli._initiate_connect(2)
|
||||
except AssertionError:
|
||||
pass
|
||||
else:
|
||||
assert False, 'Exception not raised'
|
||||
|
||||
assert 0 not in cli._conns
|
||||
cli._initiate_connect(0)
|
||||
|
||||
conn.connect.return_value = ConnectionStates.CONNECTING
|
||||
state = cli._finish_connect(0)
|
||||
assert 0 in cli._connecting
|
||||
assert state is ConnectionStates.CONNECTING
|
||||
|
||||
conn.connect.return_value = ConnectionStates.CONNECTED
|
||||
state = cli._finish_connect(0)
|
||||
conn.state = ConnectionStates.CONNECTING
|
||||
conn.connect.side_effect = lambda: ConnectionStates.CONNECTED
|
||||
assert cli._maybe_connect(0) is True
|
||||
assert 0 not in cli._connecting
|
||||
assert state is ConnectionStates.CONNECTED
|
||||
|
||||
# Failure to connect should trigger metadata update
|
||||
assert not cli.cluster._need_update
|
||||
assert cli.cluster._need_update is False
|
||||
cli._connecting.add(0)
|
||||
conn.connect.return_value = ConnectionStates.DISCONNECTED
|
||||
state = cli._finish_connect(0)
|
||||
conn.state = ConnectionStates.CONNECTING
|
||||
conn.connect.side_effect = lambda: ConnectionStates.DISCONNECTED
|
||||
assert cli._maybe_connect(0) is False
|
||||
assert 0 not in cli._connecting
|
||||
assert state is ConnectionStates.DISCONNECTED
|
||||
assert cli.cluster._need_update
|
||||
assert cli.cluster._need_update is True
|
||||
|
||||
|
||||
def test_ready(conn):
|
||||
cli = KafkaClient()
|
||||
|
||||
# Node not in metadata
|
||||
assert not cli.ready(2)
|
||||
# Node not in metadata raises Exception
|
||||
try:
|
||||
cli.ready(2)
|
||||
assert False, 'Exception not raised'
|
||||
except AssertionError:
|
||||
pass
|
||||
|
||||
# Node in metadata will connect
|
||||
assert 0 not in cli._conns
|
||||
@@ -176,13 +164,13 @@ def test_ready(conn):
|
||||
# disconnected nodes, not ready
|
||||
assert cli.ready(0)
|
||||
assert cli.ready(1)
|
||||
conn.connected.return_value = False
|
||||
conn.state = ConnectionStates.DISCONNECTED
|
||||
assert not cli.ready(0)
|
||||
conn.connected.return_value = True
|
||||
|
||||
# connecting node connects
|
||||
cli._connecting.add(0)
|
||||
conn.connected.return_value = False
|
||||
conn.state = ConnectionStates.CONNECTING
|
||||
conn.connect.side_effect = lambda: ConnectionStates.CONNECTED
|
||||
cli.ready(0)
|
||||
assert 0 not in cli._connecting
|
||||
assert cli._conns[0].connect.called_with()
|
||||
@@ -195,13 +183,13 @@ def test_close(conn):
|
||||
cli.close(2)
|
||||
|
||||
# Single node close
|
||||
cli._initiate_connect(0)
|
||||
cli._maybe_connect(0)
|
||||
assert not conn.close.call_count
|
||||
cli.close(0)
|
||||
assert conn.close.call_count == 1
|
||||
|
||||
# All node close
|
||||
cli._initiate_connect(1)
|
||||
cli._maybe_connect(1)
|
||||
cli.close()
|
||||
assert conn.close.call_count == 3
|
||||
|
||||
@@ -213,7 +201,7 @@ def test_is_disconnected(conn):
|
||||
conn.state = ConnectionStates.DISCONNECTED
|
||||
assert not cli.is_disconnected(0)
|
||||
|
||||
cli._initiate_connect(0)
|
||||
cli._maybe_connect(0)
|
||||
assert cli.is_disconnected(0)
|
||||
|
||||
conn.state = ConnectionStates.CONNECTING
|
||||
@@ -225,14 +213,22 @@ def test_is_disconnected(conn):
|
||||
|
||||
def test_send(conn):
|
||||
cli = KafkaClient()
|
||||
|
||||
# Send to unknown node => raises AssertionError
|
||||
try:
|
||||
cli.send(2, None)
|
||||
except Errors.NodeNotReadyError:
|
||||
assert False, 'Exception not raised'
|
||||
except AssertionError:
|
||||
pass
|
||||
else:
|
||||
assert False, 'NodeNotReadyError not raised'
|
||||
|
||||
cli._initiate_connect(0)
|
||||
# Send to disconnected node => NodeNotReady
|
||||
conn.state = ConnectionStates.DISCONNECTED
|
||||
f = cli.send(0, None)
|
||||
assert f.failed()
|
||||
assert isinstance(f.exception, Errors.NodeNotReadyError)
|
||||
|
||||
conn.state = ConnectionStates.CONNECTED
|
||||
cli._maybe_connect(0)
|
||||
# ProduceRequest w/ 0 required_acks -> no response
|
||||
request = ProduceRequest(0, 0, [])
|
||||
ret = cli.send(0, request)
|
||||
|
||||
Reference in New Issue
Block a user