Timeout idle connections via connections_max_idle_ms (#1068)
This commit is contained in:
parent
7c24135eaf
commit
04296994de
|
@ -135,6 +135,7 @@ class KafkaClient(object):
|
|||
'bootstrap_servers': 'localhost',
|
||||
'client_id': 'kafka-python-' + __version__,
|
||||
'request_timeout_ms': 40000,
|
||||
'connections_max_idle_ms': 9 * 60 * 1000,
|
||||
'reconnect_backoff_ms': 50,
|
||||
'max_in_flight_requests_per_connection': 5,
|
||||
'receive_buffer_bytes': None,
|
||||
|
@ -194,6 +195,7 @@ class KafkaClient(object):
|
|||
self._wake_r.setblocking(False)
|
||||
self._wake_lock = threading.Lock()
|
||||
self._selector.register(self._wake_r, selectors.EVENT_READ)
|
||||
self._idle_expiry_manager = IdleConnectionManager(self.config['connections_max_idle_ms'])
|
||||
self._closed = False
|
||||
self._sensors = None
|
||||
if self.config['metrics']:
|
||||
|
@ -291,6 +293,8 @@ class KafkaClient(object):
|
|||
if self._sensors:
|
||||
self._sensors.connection_created.record()
|
||||
|
||||
self._idle_expiry_manager.update(node_id)
|
||||
|
||||
if 'bootstrap' in self._conns and node_id != 'bootstrap':
|
||||
bootstrap = self._conns.pop('bootstrap')
|
||||
# XXX: make conn.close() require error to cause refresh
|
||||
|
@ -308,7 +312,13 @@ class KafkaClient(object):
|
|||
pass
|
||||
if self._sensors:
|
||||
self._sensors.connection_closed.record()
|
||||
if self._refresh_on_disconnects and not self._closed:
|
||||
|
||||
idle_disconnect = False
|
||||
if self._idle_expiry_manager.is_expired(node_id):
|
||||
idle_disconnect = True
|
||||
self._idle_expiry_manager.remove(node_id)
|
||||
|
||||
if self._refresh_on_disconnects and not self._closed and not idle_disconnect:
|
||||
log.warning("Node %s connection failed -- refreshing metadata", node_id)
|
||||
self.cluster.request_update()
|
||||
|
||||
|
@ -514,10 +524,12 @@ class KafkaClient(object):
|
|||
if future and future.is_done:
|
||||
timeout = 0
|
||||
else:
|
||||
idle_connection_timeout_ms = self._idle_expiry_manager.next_check_ms()
|
||||
timeout = min(
|
||||
timeout_ms,
|
||||
metadata_timeout_ms,
|
||||
self._delayed_tasks.next_at() * 1000,
|
||||
idle_connection_timeout_ms,
|
||||
self.config['request_timeout_ms'])
|
||||
timeout = max(0, timeout / 1000.0) # avoid negative timeouts
|
||||
|
||||
|
@ -572,6 +584,8 @@ class KafkaClient(object):
|
|||
conn.close(Errors.ConnectionError('Socket EVENT_READ without in-flight-requests'))
|
||||
continue
|
||||
|
||||
self._idle_expiry_manager.update(conn.node_id)
|
||||
|
||||
# Accumulate as many responses as the connection has pending
|
||||
while conn.in_flight_requests:
|
||||
response = conn.recv() # Note: conn.recv runs callbacks / errbacks
|
||||
|
@ -601,6 +615,7 @@ class KafkaClient(object):
|
|||
|
||||
if self._sensors:
|
||||
self._sensors.io_time.record((time.time() - end_select) * 1000000000)
|
||||
self._maybe_close_oldest_connection()
|
||||
return responses
|
||||
|
||||
def in_flight_request_count(self, node_id=None):
|
||||
|
@ -846,6 +861,14 @@ class KafkaClient(object):
|
|||
except socket.error:
|
||||
break
|
||||
|
||||
def _maybe_close_oldest_connection(self):
    """Reap at most one idle connection per poll cycle.

    Asks the idle-expiry manager for the longest-idle connection that has
    exceeded connections_max_idle_ms and, if there is one, closes it and
    logs how long it sat idle.
    """
    expired = self._idle_expiry_manager.poll_expired_connection()
    if expired is None:
        return
    conn_id, last_active = expired
    idle_ms = (time.time() - last_active) * 1000
    log.info('Closing idle connection %s, last active %d ms ago', conn_id, idle_ms)
    self.close(node_id=conn_id)
|
||||
|
||||
|
||||
class DelayedTaskQueue(object):
|
||||
# see https://docs.python.org/2/library/heapq.html
|
||||
|
@ -920,6 +943,76 @@ class DelayedTaskQueue(object):
|
|||
return ready_tasks
|
||||
|
||||
|
||||
# OrderedDict requires python2.7+
|
||||
try:
|
||||
from collections import OrderedDict
|
||||
except ImportError:
|
||||
# If we don't have OrderedDict, we'll fall back to dict with O(n) priority reads
|
||||
OrderedDict = dict
|
||||
|
||||
|
||||
class IdleConnectionManager(object):
    """Tracks per-connection last-activity timestamps and decides when an
    idle connection should be reaped (see connections_max_idle_ms).

    Connections are kept in LRU order (oldest activity first) so the next
    candidate for expiry is always at the front.
    """
    def __init__(self, connections_max_idle_ms):
        # connections_max_idle_ms <= 0 disables idle reaping entirely
        if connections_max_idle_ms > 0:
            self.connections_max_idle = connections_max_idle_ms / 1000
        else:
            self.connections_max_idle = float('inf')
        self.next_idle_close_check_time = None
        self.update_next_idle_close_check_time(time.time())
        # conn_id -> last-activity timestamp; insertion order == LRU order
        self.lru_connections = OrderedDict()

    def update(self, conn_id):
        """Record activity on conn_id, moving it to the most-recent end."""
        # delete-then-insert so OrderedDict ordering reflects last update
        if conn_id in self.lru_connections:
            del self.lru_connections[conn_id]
        self.lru_connections[conn_id] = time.time()

    def remove(self, conn_id):
        """Stop tracking conn_id (no-op if not tracked)."""
        self.lru_connections.pop(conn_id, None)

    def is_expired(self, conn_id):
        """Return True/False for a tracked connection, None if untracked."""
        if conn_id not in self.lru_connections:
            return None
        return time.time() >= self.lru_connections[conn_id] + self.connections_max_idle

    def next_check_ms(self):
        """Milliseconds until the next expiry check is due (0 if due now,
        float('inf') if no check will ever be needed)."""
        now = time.time()
        # Guard the infinite-idle case explicitly: without it, a tracked
        # connection with connections_max_idle == inf would reach
        # int((inf - now) * 1000) below and raise OverflowError.
        if not self.lru_connections or self.next_idle_close_check_time == float('inf'):
            return float('inf')
        elif self.next_idle_close_check_time <= now:
            return 0
        else:
            return int((self.next_idle_close_check_time - now) * 1000)

    def update_next_idle_close_check_time(self, ts):
        """Schedule the next expiry check relative to timestamp ts."""
        self.next_idle_close_check_time = ts + self.connections_max_idle

    def poll_expired_connection(self):
        """Return (conn_id, last_active_ts) for the oldest connection if it
        has expired, else None. Also reschedules the next check based on the
        oldest connection's timestamp."""
        if time.time() < self.next_idle_close_check_time:
            return None

        if not len(self.lru_connections):
            return None

        oldest_conn_id = None
        oldest_ts = None
        if OrderedDict is dict:
            # Fallback path (no OrderedDict available): plain dict has no
            # ordering guarantee here, so scan for the minimum timestamp.
            for conn_id, ts in self.lru_connections.items():
                if oldest_conn_id is None or ts < oldest_ts:
                    oldest_conn_id = conn_id
                    oldest_ts = ts
        else:
            # LRU order means the first item is the least recently active
            (oldest_conn_id, oldest_ts) = next(iter(self.lru_connections.items()))

        self.update_next_idle_close_check_time(oldest_ts)

        if time.time() >= oldest_ts + self.connections_max_idle:
            return (oldest_conn_id, oldest_ts)
        else:
            return None
|
||||
|
||||
|
||||
class KafkaClientMetrics(object):
|
||||
def __init__(self, metrics, metric_group_prefix, conns):
|
||||
self.metrics = metrics
|
||||
|
|
|
@ -177,6 +177,8 @@ class BrokerConnection(object):
|
|||
if key in configs:
|
||||
self.config[key] = configs[key]
|
||||
|
||||
self.node_id = self.config.pop('node_id')
|
||||
|
||||
if self.config['receive_buffer_bytes'] is not None:
|
||||
self.config['socket_options'].append(
|
||||
(socket.SOL_SOCKET, socket.SO_RCVBUF,
|
||||
|
@ -214,7 +216,7 @@ class BrokerConnection(object):
|
|||
if self.config['metrics']:
|
||||
self._sensors = BrokerConnectionMetrics(self.config['metrics'],
|
||||
self.config['metric_group_prefix'],
|
||||
self.config['node_id'])
|
||||
self.node_id)
|
||||
|
||||
def connect(self):
|
||||
"""Attempt to connect and return ConnectionState"""
|
||||
|
@ -904,7 +906,7 @@ class BrokerConnection(object):
|
|||
|
||||
def __repr__(self):
|
||||
return "<BrokerConnection node_id=%s host=%s/%s port=%d>" % (
|
||||
self.config['node_id'], self.hostname, self.host, self.port)
|
||||
self.node_id, self.hostname, self.host, self.port)
|
||||
|
||||
|
||||
class BrokerConnectionMetrics(object):
|
||||
|
|
|
@ -266,7 +266,7 @@ class KafkaProducer(object):
|
|||
'linger_ms': 0,
|
||||
'partitioner': DefaultPartitioner(),
|
||||
'buffer_memory': 33554432,
|
||||
'connections_max_idle_ms': 600000, # not implemented yet
|
||||
'connections_max_idle_ms': 9 * 60 * 1000, # not implemented yet
|
||||
'max_block_ms': 60000,
|
||||
'max_request_size': 1048576,
|
||||
'metadata_max_age_ms': 300000,
|
||||
|
|
|
@ -1,3 +1,5 @@
|
|||
from __future__ import absolute_import, division
|
||||
|
||||
# selectors in stdlib as of py3.4
|
||||
try:
|
||||
import selectors # pylint: disable=import-error
|
||||
|
@ -10,7 +12,7 @@ import time
|
|||
|
||||
import pytest
|
||||
|
||||
from kafka.client_async import KafkaClient
|
||||
from kafka.client_async import KafkaClient, IdleConnectionManager
|
||||
from kafka.conn import ConnectionStates
|
||||
import kafka.errors as Errors
|
||||
from kafka.future import Future
|
||||
|
@ -319,7 +321,10 @@ def client(mocker):
|
|||
mocker.patch.object(KafkaClient, '_bootstrap')
|
||||
_poll = mocker.patch.object(KafkaClient, '_poll')
|
||||
|
||||
cli = KafkaClient(request_timeout_ms=9999999, reconnect_backoff_ms=2222, api_version=(0, 9))
|
||||
cli = KafkaClient(request_timeout_ms=9999999,
|
||||
reconnect_backoff_ms=2222,
|
||||
connections_max_idle_ms=float('inf'),
|
||||
api_version=(0, 9))
|
||||
|
||||
tasks = mocker.patch.object(cli._delayed_tasks, 'next_at')
|
||||
tasks.return_value = 9999999
|
||||
|
@ -395,3 +400,32 @@ def test_schedule():
|
|||
|
||||
def test_unschedule():
|
||||
pass
|
||||
|
||||
|
||||
def test_idle_connection_manager(mocker):
    """Exercise IdleConnectionManager expiry math with a frozen clock."""
    # Pin time.time so the expiry arithmetic is fully deterministic
    fake_time = mocker.patch.object(time, 'time')
    fake_time.return_value = 0

    idle = IdleConnectionManager(100)
    # Nothing tracked yet -> no check ever needed
    assert idle.next_check_ms() == float('inf')

    idle.update('foo')
    assert not idle.is_expired('foo')
    assert idle.poll_expired_connection() is None
    assert idle.next_check_ms() == 100

    # 90ms later: still inside the 100ms idle window
    fake_time.return_value = 90 / 1000
    assert not idle.is_expired('foo')
    assert idle.poll_expired_connection() is None
    assert idle.next_check_ms() == 10

    # Exactly at the 100ms boundary: connection is now expired
    fake_time.return_value = 100 / 1000
    assert idle.is_expired('foo')
    assert idle.next_check_ms() == 0

    conn_id, conn_ts = idle.poll_expired_connection()
    assert conn_id == 'foo'
    assert conn_ts == 0

    # After removal, nothing remains to check
    idle.remove('foo')
    assert idle.next_check_ms() == float('inf')
|
||||
|
|
Loading…
Reference in New Issue