Merge pull request #639 from dpkp/conn_state_callback

Use KafkaClient callback to manage BrokerConnection state changes
This commit is contained in:
Dana Powers
2016-04-08 17:00:47 -07:00
3 changed files with 174 additions and 110 deletions

View File

@@ -1,6 +1,7 @@
from __future__ import absolute_import
import copy
import functools
import heapq
import itertools
import logging
@@ -93,6 +94,7 @@ class KafkaClient(object):
self._metadata_refresh_in_progress = False
self._conns = {}
self._connecting = set()
self._refresh_on_disconnects = True
self._delayed_tasks = DelayedTaskQueue()
self._last_bootstrap = 0
self._bootstrap_fails = 0
@@ -117,7 +119,10 @@ class KafkaClient(object):
metadata_request = MetadataRequest[0]([])
for host, port, afi in hosts:
log.debug("Attempting to bootstrap via node at %s:%s", host, port)
bootstrap = BrokerConnection(host, port, afi, **self.config)
cb = functools.partial(self._conn_state_change, 'bootstrap')
bootstrap = BrokerConnection(host, port, afi,
state_change_callback=cb,
**self.config)
bootstrap.connect()
while bootstrap.connecting():
bootstrap.connect()
@@ -152,6 +157,29 @@ class KafkaClient(object):
conn = self._conns[node_id]
return conn.state is ConnectionStates.DISCONNECTED and not conn.blacked_out()
def _conn_state_change(self, node_id, conn):
if conn.connecting():
self._connecting.add(node_id)
elif conn.connected():
log.debug("Node %s connected", node_id)
if node_id in self._connecting:
self._connecting.remove(node_id)
if 'bootstrap' in self._conns and node_id != 'bootstrap':
bootstrap = self._conns.pop('bootstrap')
# XXX: make conn.close() require error to cause refresh
self._refresh_on_disconnects = False
bootstrap.close()
self._refresh_on_disconnects = True
# Connection failures imply that our metadata is stale, so let's refresh
elif conn.state is ConnectionStates.DISCONNECTING:
if node_id in self._connecting:
self._connecting.remove(node_id)
if self._refresh_on_disconnects:
log.warning("Node %s connect failed -- refreshing metadata", node_id)
self.cluster.request_update()
def _maybe_connect(self, node_id):
"""Idempotent non-blocking connection attempt to the given node id."""
if node_id not in self._conns:
@@ -160,32 +188,15 @@ class KafkaClient(object):
log.debug("Initiating connection to node %s at %s:%s",
node_id, broker.host, broker.port)
host, port, afi = get_ip_port_afi(broker.host)
cb = functools.partial(self._conn_state_change, node_id)
self._conns[node_id] = BrokerConnection(host, broker.port, afi,
state_change_callback=cb,
**self.config)
conn = self._conns[node_id]
if conn.connected():
return True
conn.connect()
if conn.connecting():
if node_id not in self._connecting:
self._connecting.add(node_id)
# Whether CONNECTED or DISCONNECTED, we need to remove from connecting
elif node_id in self._connecting:
self._connecting.remove(node_id)
if conn.connected():
log.debug("Node %s connected", node_id)
# Connection failures imply that our metadata is stale, so let's refresh
elif conn.disconnected():
log.warning("Node %s connect failed -- refreshing metadata", node_id)
self.cluster.request_update()
return conn.connected()
def ready(self, node_id):
@@ -597,84 +608,13 @@ class KafkaClient(object):
if node_id is None:
raise Errors.NoBrokersAvailable()
def connect(node_id):
timeout_at = time.time() + timeout
# brokers < 0.9 do not return any broker metadata if there are no topics
# so we're left with a single bootstrap connection
while not self.ready(node_id):
if time.time() >= timeout_at:
raise Errors.NodeNotReadyError(node_id)
time.sleep(0.025)
# Monkeypatch the connection request timeout
# Generally this timeout should not get triggered
# but in case it does, we want it to be reasonably short
self._conns[node_id].config['request_timeout_ms'] = timeout * 1000
# kafka kills the connection when it doesnt recognize an API request
# so we can send a test request and then follow immediately with a
# vanilla MetadataRequest. If the server did not recognize the first
# request, both will be failed with a ConnectionError that wraps
# socket.error (32, 54, or 104)
import socket
from .protocol.admin import ListGroupsRequest
from .protocol.commit import (
OffsetFetchRequest, GroupCoordinatorRequest)
from .protocol.metadata import MetadataRequest
# Socket errors are logged as exceptions and can alarm users. Mute them
from logging import Filter
class ConnFilter(Filter):
def filter(self, record):
if record.funcName in ('recv', 'send'):
return False
return True
log_filter = ConnFilter()
test_cases = [
('0.9', ListGroupsRequest[0]()),
('0.8.2', GroupCoordinatorRequest[0]('kafka-python-default-group')),
('0.8.1', OffsetFetchRequest[0]('kafka-python-default-group', [])),
('0.8.0', MetadataRequest[0]([])),
]
logging.getLogger('kafka.conn').addFilter(log_filter)
for version, request in test_cases:
connect(node_id)
f = self.send(node_id, request)
time.sleep(0.1) # HACK: sleeping to wait for socket to send bytes
metadata = self.send(node_id, MetadataRequest[0]([]))
self.poll(future=f)
self.poll(future=metadata)
assert f.is_done, 'Future is not done? Please file bug report'
if f.succeeded():
log.info('Broker version identifed as %s', version)
break
# Only enable strict checking to verify that we understand failure
# modes. For most users, the fact that the request failed should be
# enough to rule out a particular broker version.
if strict:
# If the socket flush hack did not work (which should force the
# connection to close and fail all pending requests), then we
# get a basic Request Timeout. Thisisn
if isinstance(f.exception, Errors.RequestTimedOutError):
pass
elif six.PY2:
assert isinstance(f.exception.args[0], socket.error)
assert f.exception.args[0].errno in (32, 54, 104)
else:
assert isinstance(f.exception.args[0], ConnectionError)
log.info("Broker is not v%s -- it did not recognize %s",
version, request.__class__.__name__)
else:
raise Errors.UnrecognizedBrokerVersion()
logging.getLogger('kafka.conn').removeFilter(log_filter)
self._conns[node_id].config['request_timeout_ms'] = self.config['request_timeout_ms']
# We will be intentionally causing socket failures
# and should not trigger metadata refresh
self._refresh_on_disconnects = False
self._maybe_connect(node_id)
conn = self._conns[node_id]
version = conn.check_version()
self._refresh_on_disconnects = True
return version
def wakeup(self):

View File

@@ -31,6 +31,7 @@ DEFAULT_KAFKA_PORT = 9092
class ConnectionStates(object):
DISCONNECTING = '<disconnecting>'
DISCONNECTED = '<disconnected>'
CONNECTING = '<connecting>'
CONNECTED = '<connected>'
@@ -49,6 +50,7 @@ class BrokerConnection(object):
'receive_buffer_bytes': None,
'send_buffer_bytes': None,
'api_version': (0, 8, 2), # default to most restrictive
'state_change_callback': lambda conn: True,
}
def __init__(self, host, port, afi, **configs):
@@ -87,6 +89,7 @@ class BrokerConnection(object):
self._sock.setblocking(False)
self.state = ConnectionStates.CONNECTING
self.last_attempt = time.time()
self.config['state_change_callback'](self)
if self.state is ConnectionStates.CONNECTING:
# in non-blocking mode, use repeated calls to socket.connect_ex
@@ -101,6 +104,7 @@ class BrokerConnection(object):
if not ret or ret == errno.EISCONN:
log.debug('%s: established TCP connection', str(self))
self.state = ConnectionStates.CONNECTED
self.config['state_change_callback'](self)
# Connection failed
# WSAEINVAL == 10022, but errno.WSAEINVAL is not available on non-win systems
@@ -151,6 +155,9 @@ class BrokerConnection(object):
will be failed with this exception.
Default: kafka.errors.ConnectionError.
"""
if self.state is not ConnectionStates.DISCONNECTED:
self.state = ConnectionStates.DISCONNECTING
self.config['state_change_callback'](self)
if self._sock:
self._sock.close()
self._sock = None
@@ -165,6 +172,7 @@ class BrokerConnection(object):
while self.in_flight_requests:
ifr = self.in_flight_requests.popleft()
ifr.future.failure(error)
self.config['state_change_callback'](self)
def send(self, request, expect_response=True):
"""send request, return Future()
@@ -352,6 +360,102 @@ class BrokerConnection(object):
self._correlation_id = (self._correlation_id + 1) % 2**31
return self._correlation_id
def check_version(self, timeout=2, strict=False):
"""Attempt to guess the broker version. This is a blocking call."""
# Monkeypatch the connection request timeout
# Generally this timeout should not get triggered
# but in case it does, we want it to be reasonably short
stashed_request_timeout_ms = self.config['request_timeout_ms']
self.config['request_timeout_ms'] = timeout * 1000
# kafka kills the connection when it doesnt recognize an API request
# so we can send a test request and then follow immediately with a
# vanilla MetadataRequest. If the server did not recognize the first
# request, both will be failed with a ConnectionError that wraps
# socket.error (32, 54, or 104)
from .protocol.admin import ListGroupsRequest
from .protocol.commit import OffsetFetchRequest, GroupCoordinatorRequest
from .protocol.metadata import MetadataRequest
# Socket errors are logged as exceptions and can alarm users. Mute them
from logging import Filter
class ConnFilter(Filter):
def filter(self, record):
if record.funcName in ('recv', 'send'):
return False
return True
log_filter = ConnFilter()
log.addFilter(log_filter)
test_cases = [
('0.9', ListGroupsRequest[0]()),
('0.8.2', GroupCoordinatorRequest[0]('kafka-python-default-group')),
('0.8.1', OffsetFetchRequest[0]('kafka-python-default-group', [])),
('0.8.0', MetadataRequest[0]([])),
]
def connect():
self.connect()
if self.connected():
return
timeout_at = time.time() + timeout
while time.time() < timeout_at and self.connecting():
if self.connect() is ConnectionStates.CONNECTED:
return
time.sleep(0.05)
raise Errors.NodeNotReadyError()
for version, request in test_cases:
connect()
f = self.send(request)
# HACK: sleeping to wait for socket to send bytes
time.sleep(0.1)
# when broker receives an unrecognized request API
# it abruptly closes our socket.
# so we attempt to send a second request immediately
# that we believe it will definitely recognize (metadata)
# the attempt to write to a disconnected socket should
# immediately fail and allow us to infer that the prior
# request was unrecognized
metadata = self.send(MetadataRequest[0]([]))
if self._sock:
self._sock.setblocking(True)
resp_1 = self.recv()
resp_2 = self.recv()
if self._sock:
self._sock.setblocking(False)
assert f.is_done, 'Future is not done? Please file bug report'
if f.succeeded():
log.info('Broker version identifed as %s', version)
break
# Only enable strict checking to verify that we understand failure
# modes. For most users, the fact that the request failed should be
# enough to rule out a particular broker version.
if strict:
# If the socket flush hack did not work (which should force the
# connection to close and fail all pending requests), then we
# get a basic Request Timeout. This is not ideal, but we'll deal
if isinstance(f.exception, Errors.RequestTimedOutError):
pass
elif six.PY2:
assert isinstance(f.exception.args[0], socket.error)
assert f.exception.args[0].errno in (32, 54, 104)
else:
assert isinstance(f.exception.args[0], ConnectionError)
log.info("Broker is not v%s -- it did not recognize %s",
version, request.__class__.__name__)
else:
raise Errors.UnrecognizedBrokerVersion()
log.removeFilter(log_filter)
self.config['request_timeout_ms'] = stashed_request_timeout_ms
return version
def __repr__(self):
return "<BrokerConnection host=%s port=%d>" % (self.host, self.port)

View File

@@ -1,5 +1,5 @@
import time
import socket
import time
import pytest
@@ -34,7 +34,10 @@ def test_bootstrap_servers(mocker, bootstrap, expected_hosts):
def test_bootstrap_success(conn):
conn.state = ConnectionStates.CONNECTED
cli = KafkaClient()
conn.assert_called_once_with('localhost', 9092, socket.AF_INET, **cli.config)
args, kwargs = conn.call_args
assert args == ('localhost', 9092, socket.AF_INET)
kwargs.pop('state_change_callback')
assert kwargs == cli.config
conn.connect.assert_called_with()
conn.send.assert_called_once_with(MetadataRequest[0]([]))
assert cli._bootstrap_fails == 0
@@ -44,7 +47,10 @@ def test_bootstrap_success(conn):
def test_bootstrap_failure(conn):
conn.state = ConnectionStates.DISCONNECTED
cli = KafkaClient()
conn.assert_called_once_with('localhost', 9092, socket.AF_INET, **cli.config)
args, kwargs = conn.call_args
assert args == ('localhost', 9092, socket.AF_INET)
kwargs.pop('state_change_callback')
assert kwargs == cli.config
conn.connect.assert_called_with()
conn.close.assert_called_with()
assert cli._bootstrap_fails == 1
@@ -83,26 +89,40 @@ def test_maybe_connect(conn):
else:
assert False, 'Exception not raised'
# New node_id creates a conn object
assert 0 not in cli._conns
conn.state = ConnectionStates.DISCONNECTED
conn.connect.side_effect = lambda: conn._set_conn_state(ConnectionStates.CONNECTING)
assert cli._maybe_connect(0) is False
assert cli._conns[0] is conn
assert 0 in cli._connecting
conn.connect.side_effect = lambda: conn._set_conn_state(ConnectionStates.CONNECTED)
assert cli._maybe_connect(0) is True
assert 0 not in cli._connecting
def test_conn_state_change(mocker, conn):
cli = KafkaClient()
node_id = 0
conn.state = ConnectionStates.CONNECTING
cli._conn_state_change(node_id, conn)
assert node_id in cli._connecting
conn.state = ConnectionStates.CONNECTED
cli._conn_state_change(node_id, conn)
assert node_id not in cli._connecting
# Failure to connect should trigger metadata update
assert cli.cluster._need_update is False
conn.state = ConnectionStates.CONNECTING
cli._connecting.add(0)
conn.connect.side_effect = lambda: conn._set_conn_state(ConnectionStates.DISCONNECTED)
assert cli._maybe_connect(0) is False
assert 0 not in cli._connecting
conn.state = ConnectionStates.DISCONNECTING
cli._conn_state_change(node_id, conn)
assert node_id not in cli._connecting
assert cli.cluster._need_update is True
conn.state = ConnectionStates.CONNECTING
cli._conn_state_change(node_id, conn)
assert node_id in cli._connecting
conn.state = ConnectionStates.DISCONNECTING
cli._conn_state_change(node_id, conn)
assert node_id not in cli._connecting
def test_ready(mocker, conn):
cli = KafkaClient()