Move check_version() logic to BrokerConnection

This commit is contained in:
Dana Powers
2016-04-07 23:53:22 -07:00
parent 4323e5c21c
commit 45b2c99690
2 changed files with 99 additions and 78 deletions

View File

@@ -597,84 +597,9 @@ class KafkaClient(object):
if node_id is None:
raise Errors.NoBrokersAvailable()
def connect(node_id):
timeout_at = time.time() + timeout
# brokers < 0.9 do not return any broker metadata if there are no topics
# so we're left with a single bootstrap connection
while not self.ready(node_id):
if time.time() >= timeout_at:
raise Errors.NodeNotReadyError(node_id)
time.sleep(0.025)
# Monkeypatch the connection request timeout
# Generally this timeout should not get triggered
# but in case it does, we want it to be reasonably short
self._conns[node_id].config['request_timeout_ms'] = timeout * 1000
# kafka kills the connection when it doesnt recognize an API request
# so we can send a test request and then follow immediately with a
# vanilla MetadataRequest. If the server did not recognize the first
# request, both will be failed with a ConnectionError that wraps
# socket.error (32, 54, or 104)
import socket
from .protocol.admin import ListGroupsRequest
from .protocol.commit import (
OffsetFetchRequest, GroupCoordinatorRequest)
from .protocol.metadata import MetadataRequest
# Socket errors are logged as exceptions and can alarm users. Mute them
from logging import Filter
class ConnFilter(Filter):
def filter(self, record):
if record.funcName in ('recv', 'send'):
return False
return True
log_filter = ConnFilter()
test_cases = [
('0.9', ListGroupsRequest[0]()),
('0.8.2', GroupCoordinatorRequest[0]('kafka-python-default-group')),
('0.8.1', OffsetFetchRequest[0]('kafka-python-default-group', [])),
('0.8.0', MetadataRequest[0]([])),
]
logging.getLogger('kafka.conn').addFilter(log_filter)
for version, request in test_cases:
connect(node_id)
f = self.send(node_id, request)
time.sleep(0.1) # HACK: sleeping to wait for socket to send bytes
metadata = self.send(node_id, MetadataRequest[0]([]))
self.poll(future=f)
self.poll(future=metadata)
assert f.is_done, 'Future is not done? Please file bug report'
if f.succeeded():
log.info('Broker version identifed as %s', version)
break
# Only enable strict checking to verify that we understand failure
# modes. For most users, the fact that the request failed should be
# enough to rule out a particular broker version.
if strict:
# If the socket flush hack did not work (which should force the
# connection to close and fail all pending requests), then we
# get a basic Request Timeout. Thisisn
if isinstance(f.exception, Errors.RequestTimedOutError):
pass
elif six.PY2:
assert isinstance(f.exception.args[0], socket.error)
assert f.exception.args[0].errno in (32, 54, 104)
else:
assert isinstance(f.exception.args[0], ConnectionError)
log.info("Broker is not v%s -- it did not recognize %s",
version, request.__class__.__name__)
else:
raise Errors.UnrecognizedBrokerVersion()
logging.getLogger('kafka.conn').removeFilter(log_filter)
self._conns[node_id].config['request_timeout_ms'] = self.config['request_timeout_ms']
self._maybe_connect(node_id)
conn = self._conns[node_id]
version = conn.check_version()
return version
def wakeup(self):

View File

@@ -352,6 +352,102 @@ class BrokerConnection(object):
self._correlation_id = (self._correlation_id + 1) % 2**31
return self._correlation_id
def check_version(self, timeout=2, strict=False):
"""Attempt to guess the broker version. This is a blocking call."""
# Monkeypatch the connection request timeout
# Generally this timeout should not get triggered
# but in case it does, we want it to be reasonably short
stashed_request_timeout_ms = self.config['request_timeout_ms']
self.config['request_timeout_ms'] = timeout * 1000
# kafka kills the connection when it doesnt recognize an API request
# so we can send a test request and then follow immediately with a
# vanilla MetadataRequest. If the server did not recognize the first
# request, both will be failed with a ConnectionError that wraps
# socket.error (32, 54, or 104)
from .protocol.admin import ListGroupsRequest
from .protocol.commit import OffsetFetchRequest, GroupCoordinatorRequest
from .protocol.metadata import MetadataRequest
# Socket errors are logged as exceptions and can alarm users. Mute them
from logging import Filter
class ConnFilter(Filter):
def filter(self, record):
if record.funcName in ('recv', 'send'):
return False
return True
log_filter = ConnFilter()
log.addFilter(log_filter)
test_cases = [
('0.9', ListGroupsRequest[0]()),
('0.8.2', GroupCoordinatorRequest[0]('kafka-python-default-group')),
('0.8.1', OffsetFetchRequest[0]('kafka-python-default-group', [])),
('0.8.0', MetadataRequest[0]([])),
]
def connect():
self.connect()
if self.connected():
return
timeout_at = time.time() + timeout
while time.time() < timeout_at and self.connecting():
if self.connect() is ConnectionStates.CONNECTED:
return
time.sleep(0.05)
raise Errors.NodeNotReadyError()
for version, request in test_cases:
connect()
f = self.send(request)
# HACK: sleeping to wait for socket to send bytes
time.sleep(0.1)
# when broker receives an unrecognized request API
# it abruptly closes our socket.
# so we attempt to send a second request immediately
# that we believe it will definitely recognize (metadata)
# the attempt to write to a disconnected socket should
# immediately fail and allow us to infer that the prior
# request was unrecognized
metadata = self.send(MetadataRequest[0]([]))
if self._sock:
self._sock.setblocking(True)
resp_1 = self.recv()
resp_2 = self.recv()
if self._sock:
self._sock.setblocking(False)
assert f.is_done, 'Future is not done? Please file bug report'
if f.succeeded():
log.info('Broker version identifed as %s', version)
break
# Only enable strict checking to verify that we understand failure
# modes. For most users, the fact that the request failed should be
# enough to rule out a particular broker version.
if strict:
# If the socket flush hack did not work (which should force the
# connection to close and fail all pending requests), then we
# get a basic Request Timeout. This is not ideal, but we'll deal
if isinstance(f.exception, Errors.RequestTimedOutError):
pass
elif six.PY2:
assert isinstance(f.exception.args[0], socket.error)
assert f.exception.args[0].errno in (32, 54, 104)
else:
assert isinstance(f.exception.args[0], ConnectionError)
log.info("Broker is not v%s -- it did not recognize %s",
version, request.__class__.__name__)
else:
raise Errors.UnrecognizedBrokerVersion()
log.removeFilter(log_filter)
self.config['request_timeout_ms'] = stashed_request_timeout_ms
return version
def __repr__(self):
return "<BrokerConnection host=%s port=%d>" % (self.host, self.port)