Use selectors module in KafkaClient
This commit is contained in:
@@ -6,7 +6,14 @@ import heapq
|
|||||||
import itertools
|
import itertools
|
||||||
import logging
|
import logging
|
||||||
import random
|
import random
|
||||||
import select
|
|
||||||
|
# selectors in stdlib as of py3.4
|
||||||
|
try:
|
||||||
|
import selectors # pylint: disable=import-error
|
||||||
|
except ImportError:
|
||||||
|
# vendored backport module
|
||||||
|
from . import selectors34 as selectors
|
||||||
|
|
||||||
import socket
|
import socket
|
||||||
import time
|
import time
|
||||||
|
|
||||||
@@ -92,6 +99,7 @@ class KafkaClient(object):
|
|||||||
self.cluster = ClusterMetadata(**self.config)
|
self.cluster = ClusterMetadata(**self.config)
|
||||||
self._topics = set() # empty set will fetch all topic metadata
|
self._topics = set() # empty set will fetch all topic metadata
|
||||||
self._metadata_refresh_in_progress = False
|
self._metadata_refresh_in_progress = False
|
||||||
|
self._selector = selectors.DefaultSelector()
|
||||||
self._conns = {}
|
self._conns = {}
|
||||||
self._connecting = set()
|
self._connecting = set()
|
||||||
self._refresh_on_disconnects = True
|
self._refresh_on_disconnects = True
|
||||||
@@ -101,6 +109,7 @@ class KafkaClient(object):
|
|||||||
self._bootstrap(collect_hosts(self.config['bootstrap_servers']))
|
self._bootstrap(collect_hosts(self.config['bootstrap_servers']))
|
||||||
self._wake_r, self._wake_w = socket.socketpair()
|
self._wake_r, self._wake_w = socket.socketpair()
|
||||||
self._wake_r.setblocking(False)
|
self._wake_r.setblocking(False)
|
||||||
|
self._selector.register(self._wake_r, selectors.EVENT_READ)
|
||||||
|
|
||||||
def __del__(self):
|
def __del__(self):
|
||||||
self._wake_r.close()
|
self._wake_r.close()
|
||||||
@@ -160,11 +169,19 @@ class KafkaClient(object):
|
|||||||
def _conn_state_change(self, node_id, conn):
|
def _conn_state_change(self, node_id, conn):
|
||||||
if conn.connecting():
|
if conn.connecting():
|
||||||
self._connecting.add(node_id)
|
self._connecting.add(node_id)
|
||||||
|
self._selector.register(conn._sock, selectors.EVENT_WRITE)
|
||||||
|
|
||||||
elif conn.connected():
|
elif conn.connected():
|
||||||
log.debug("Node %s connected", node_id)
|
log.debug("Node %s connected", node_id)
|
||||||
if node_id in self._connecting:
|
if node_id in self._connecting:
|
||||||
self._connecting.remove(node_id)
|
self._connecting.remove(node_id)
|
||||||
|
|
||||||
|
try:
|
||||||
|
self._selector.unregister(conn._sock)
|
||||||
|
except KeyError:
|
||||||
|
pass
|
||||||
|
self._selector.register(conn._sock, selectors.EVENT_READ, conn)
|
||||||
|
|
||||||
if 'bootstrap' in self._conns and node_id != 'bootstrap':
|
if 'bootstrap' in self._conns and node_id != 'bootstrap':
|
||||||
bootstrap = self._conns.pop('bootstrap')
|
bootstrap = self._conns.pop('bootstrap')
|
||||||
# XXX: make conn.close() require error to cause refresh
|
# XXX: make conn.close() require error to cause refresh
|
||||||
@@ -176,6 +193,10 @@ class KafkaClient(object):
|
|||||||
elif conn.state is ConnectionStates.DISCONNECTING:
|
elif conn.state is ConnectionStates.DISCONNECTING:
|
||||||
if node_id in self._connecting:
|
if node_id in self._connecting:
|
||||||
self._connecting.remove(node_id)
|
self._connecting.remove(node_id)
|
||||||
|
try:
|
||||||
|
self._selector.unregister(conn._sock)
|
||||||
|
except KeyError:
|
||||||
|
pass
|
||||||
if self._refresh_on_disconnects:
|
if self._refresh_on_disconnects:
|
||||||
log.warning("Node %s connect failed -- refreshing metadata", node_id)
|
log.warning("Node %s connect failed -- refreshing metadata", node_id)
|
||||||
self.cluster.request_update()
|
self.cluster.request_update()
|
||||||
@@ -388,45 +409,25 @@ class KafkaClient(object):
|
|||||||
|
|
||||||
return responses
|
return responses
|
||||||
|
|
||||||
def _poll(self, timeout, sleep=False):
|
def _poll(self, timeout, sleep=True):
|
||||||
# select on reads across all connected sockets, blocking up to timeout
|
# select on reads across all connected sockets, blocking up to timeout
|
||||||
sockets = dict([(conn._sock, conn)
|
assert self.in_flight_request_count() > 0 or self._connecting or sleep
|
||||||
for conn in six.itervalues(self._conns)
|
|
||||||
if conn.state is ConnectionStates.CONNECTED
|
|
||||||
and conn.in_flight_requests])
|
|
||||||
if not sockets:
|
|
||||||
# if sockets are connecting, we can wake when they are writeable
|
|
||||||
if self._connecting:
|
|
||||||
sockets = [self._conns[node]._sock for node in self._connecting]
|
|
||||||
select.select([self._wake_r], sockets, [], timeout)
|
|
||||||
elif timeout:
|
|
||||||
if sleep:
|
|
||||||
log.debug('Sleeping at %s for %s', time.time(), timeout)
|
|
||||||
select.select([self._wake_r], [], [], timeout)
|
|
||||||
log.debug('Woke up at %s', time.time())
|
|
||||||
else:
|
|
||||||
log.warning('_poll called with a non-zero timeout and'
|
|
||||||
' sleep=False -- but there was nothing to do.'
|
|
||||||
' This can cause high CPU usage during idle.')
|
|
||||||
self._clear_wake_fd()
|
|
||||||
return []
|
|
||||||
|
|
||||||
# Add a private pipe fd to allow external wakeups
|
|
||||||
fds = list(sockets.keys())
|
|
||||||
fds.append(self._wake_r)
|
|
||||||
ready, _, _ = select.select(fds, [], [], timeout)
|
|
||||||
|
|
||||||
responses = []
|
responses = []
|
||||||
for sock in ready:
|
for key, events in self._selector.select(timeout):
|
||||||
if sock == self._wake_r:
|
if key.fileobj is self._wake_r:
|
||||||
|
self._clear_wake_fd()
|
||||||
continue
|
continue
|
||||||
conn = sockets[sock]
|
elif not (events & selectors.EVENT_READ):
|
||||||
|
continue
|
||||||
|
conn = key.data
|
||||||
while conn.in_flight_requests:
|
while conn.in_flight_requests:
|
||||||
response = conn.recv() # Note: conn.recv runs callbacks / errbacks
|
response = conn.recv() # Note: conn.recv runs callbacks / errbacks
|
||||||
|
|
||||||
|
# Incomplete responses are buffered internally
|
||||||
|
# while conn.in_flight_requests retains the request
|
||||||
if not response:
|
if not response:
|
||||||
break
|
break
|
||||||
responses.append(response)
|
responses.append(response)
|
||||||
self._clear_wake_fd()
|
|
||||||
return responses
|
return responses
|
||||||
|
|
||||||
def in_flight_request_count(self, node_id=None):
|
def in_flight_request_count(self, node_id=None):
|
||||||
|
|||||||
@@ -1,3 +1,10 @@
|
|||||||
|
# selectors in stdlib as of py3.4
|
||||||
|
try:
|
||||||
|
import selectors # pylint: disable=import-error
|
||||||
|
except ImportError:
|
||||||
|
# vendored backport module
|
||||||
|
import kafka.selectors34 as selectors
|
||||||
|
|
||||||
import socket
|
import socket
|
||||||
import time
|
import time
|
||||||
|
|
||||||
@@ -99,15 +106,19 @@ def test_maybe_connect(conn):
|
|||||||
|
|
||||||
def test_conn_state_change(mocker, conn):
|
def test_conn_state_change(mocker, conn):
|
||||||
cli = KafkaClient()
|
cli = KafkaClient()
|
||||||
|
sel = mocker.patch.object(cli, '_selector')
|
||||||
|
|
||||||
node_id = 0
|
node_id = 0
|
||||||
conn.state = ConnectionStates.CONNECTING
|
conn.state = ConnectionStates.CONNECTING
|
||||||
cli._conn_state_change(node_id, conn)
|
cli._conn_state_change(node_id, conn)
|
||||||
assert node_id in cli._connecting
|
assert node_id in cli._connecting
|
||||||
|
sel.register.assert_called_with(conn._sock, selectors.EVENT_WRITE)
|
||||||
|
|
||||||
conn.state = ConnectionStates.CONNECTED
|
conn.state = ConnectionStates.CONNECTED
|
||||||
cli._conn_state_change(node_id, conn)
|
cli._conn_state_change(node_id, conn)
|
||||||
assert node_id not in cli._connecting
|
assert node_id not in cli._connecting
|
||||||
|
sel.unregister.assert_called_with(conn._sock)
|
||||||
|
sel.register.assert_called_with(conn._sock, selectors.EVENT_READ, conn)
|
||||||
|
|
||||||
# Failure to connect should trigger metadata update
|
# Failure to connect should trigger metadata update
|
||||||
assert cli.cluster._need_update is False
|
assert cli.cluster._need_update is False
|
||||||
@@ -115,6 +126,7 @@ def test_conn_state_change(mocker, conn):
|
|||||||
cli._conn_state_change(node_id, conn)
|
cli._conn_state_change(node_id, conn)
|
||||||
assert node_id not in cli._connecting
|
assert node_id not in cli._connecting
|
||||||
assert cli.cluster._need_update is True
|
assert cli.cluster._need_update is True
|
||||||
|
sel.unregister.assert_called_with(conn._sock)
|
||||||
|
|
||||||
conn.state = ConnectionStates.CONNECTING
|
conn.state = ConnectionStates.CONNECTING
|
||||||
cli._conn_state_change(node_id, conn)
|
cli._conn_state_change(node_id, conn)
|
||||||
@@ -167,8 +179,9 @@ def test_is_ready(mocker, conn):
|
|||||||
assert not cli.is_ready(0)
|
assert not cli.is_ready(0)
|
||||||
|
|
||||||
|
|
||||||
def test_close(conn):
|
def test_close(mocker, conn):
|
||||||
cli = KafkaClient()
|
cli = KafkaClient()
|
||||||
|
mocker.patch.object(cli, '_selector')
|
||||||
|
|
||||||
# Unknown node - silent
|
# Unknown node - silent
|
||||||
cli.close(2)
|
cli.close(2)
|
||||||
|
|||||||
Reference in New Issue
Block a user