From 45b2c99690fad07930d11dffb8d93dca19104c50 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Thu, 7 Apr 2016 23:53:22 -0700 Subject: [PATCH 1/6] Move check_version() logic to BrokerConnection --- kafka/client_async.py | 81 ++---------------------------------- kafka/conn.py | 96 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 99 insertions(+), 78 deletions(-) diff --git a/kafka/client_async.py b/kafka/client_async.py index cfc89fc..64233f8 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -597,84 +597,9 @@ class KafkaClient(object): if node_id is None: raise Errors.NoBrokersAvailable() - def connect(node_id): - timeout_at = time.time() + timeout - # brokers < 0.9 do not return any broker metadata if there are no topics - # so we're left with a single bootstrap connection - while not self.ready(node_id): - if time.time() >= timeout_at: - raise Errors.NodeNotReadyError(node_id) - time.sleep(0.025) - - # Monkeypatch the connection request timeout - # Generally this timeout should not get triggered - # but in case it does, we want it to be reasonably short - self._conns[node_id].config['request_timeout_ms'] = timeout * 1000 - - # kafka kills the connection when it doesnt recognize an API request - # so we can send a test request and then follow immediately with a - # vanilla MetadataRequest. If the server did not recognize the first - # request, both will be failed with a ConnectionError that wraps - # socket.error (32, 54, or 104) - import socket - from .protocol.admin import ListGroupsRequest - from .protocol.commit import ( - OffsetFetchRequest, GroupCoordinatorRequest) - from .protocol.metadata import MetadataRequest - - # Socket errors are logged as exceptions and can alarm users. Mute them - from logging import Filter - class ConnFilter(Filter): - def filter(self, record): - if record.funcName in ('recv', 'send'): - return False - return True - log_filter = ConnFilter() - - test_cases = [ - ('0.9', ListGroupsRequest[0]()), - ('0.8.2', GroupCoordinatorRequest[0]('kafka-python-default-group')), - ('0.8.1', OffsetFetchRequest[0]('kafka-python-default-group', [])), - ('0.8.0', MetadataRequest[0]([])), - ] - - logging.getLogger('kafka.conn').addFilter(log_filter) - for version, request in test_cases: - connect(node_id) - f = self.send(node_id, request) - time.sleep(0.1) # HACK: sleeping to wait for socket to send bytes - metadata = self.send(node_id, MetadataRequest[0]([])) - self.poll(future=f) - self.poll(future=metadata) - - assert f.is_done, 'Future is not done? Please file bug report' - - if f.succeeded(): - log.info('Broker version identifed as %s', version) - break - - # Only enable strict checking to verify that we understand failure - # modes. For most users, the fact that the request failed should be - # enough to rule out a particular broker version. - if strict: - # If the socket flush hack did not work (which should force the - # connection to close and fail all pending requests), then we - # get a basic Request Timeout. Thisisn - if isinstance(f.exception, Errors.RequestTimedOutError): - pass - elif six.PY2: - assert isinstance(f.exception.args[0], socket.error) - assert f.exception.args[0].errno in (32, 54, 104) - else: - assert isinstance(f.exception.args[0], ConnectionError) - log.info("Broker is not v%s -- it did not recognize %s", - version, request.__class__.__name__) - else: - - raise Errors.UnrecognizedBrokerVersion() - - logging.getLogger('kafka.conn').removeFilter(log_filter) - self._conns[node_id].config['request_timeout_ms'] = self.config['request_timeout_ms'] + self._maybe_connect(node_id) + conn = self._conns[node_id] + version = conn.check_version() return version def wakeup(self): diff --git a/kafka/conn.py b/kafka/conn.py index 92b2fd3..030a3f1 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -352,6 +352,102 @@ class BrokerConnection(object): self._correlation_id = (self._correlation_id + 1) % 2**31 return self._correlation_id + def check_version(self, timeout=2, strict=False): + """Attempt to guess the broker version. This is a blocking call.""" + + # Monkeypatch the connection request timeout + # Generally this timeout should not get triggered + # but in case it does, we want it to be reasonably short + stashed_request_timeout_ms = self.config['request_timeout_ms'] + self.config['request_timeout_ms'] = timeout * 1000 + + # kafka kills the connection when it doesnt recognize an API request + # so we can send a test request and then follow immediately with a + # vanilla MetadataRequest. If the server did not recognize the first + # request, both will be failed with a ConnectionError that wraps + # socket.error (32, 54, or 104) + from .protocol.admin import ListGroupsRequest + from .protocol.commit import OffsetFetchRequest, GroupCoordinatorRequest + from .protocol.metadata import MetadataRequest + + # Socket errors are logged as exceptions and can alarm users. Mute them + from logging import Filter + class ConnFilter(Filter): + def filter(self, record): + if record.funcName in ('recv', 'send'): + return False + return True + log_filter = ConnFilter() + log.addFilter(log_filter) + + test_cases = [ + ('0.9', ListGroupsRequest[0]()), + ('0.8.2', GroupCoordinatorRequest[0]('kafka-python-default-group')), + ('0.8.1', OffsetFetchRequest[0]('kafka-python-default-group', [])), + ('0.8.0', MetadataRequest[0]([])), + ] + + def connect(): + self.connect() + if self.connected(): + return + timeout_at = time.time() + timeout + while time.time() < timeout_at and self.connecting(): + if self.connect() is ConnectionStates.CONNECTED: + return + time.sleep(0.05) + raise Errors.NodeNotReadyError() + + for version, request in test_cases: + connect() + f = self.send(request) + # HACK: sleeping to wait for socket to send bytes + time.sleep(0.1) + # when broker receives an unrecognized request API + # it abruptly closes our socket. + # so we attempt to send a second request immediately + # that we believe it will definitely recognize (metadata) + # the attempt to write to a disconnected socket should + # immediately fail and allow us to infer that the prior + # request was unrecognized + metadata = self.send(MetadataRequest[0]([])) + + if self._sock: + self._sock.setblocking(True) + resp_1 = self.recv() + resp_2 = self.recv() + if self._sock: + self._sock.setblocking(False) + + assert f.is_done, 'Future is not done? Please file bug report' + + if f.succeeded(): + log.info('Broker version identifed as %s', version) + break + + # Only enable strict checking to verify that we understand failure + # modes. For most users, the fact that the request failed should be + # enough to rule out a particular broker version. + if strict: + # If the socket flush hack did not work (which should force the + # connection to close and fail all pending requests), then we + # get a basic Request Timeout. This is not ideal, but we'll deal + if isinstance(f.exception, Errors.RequestTimedOutError): + pass + elif six.PY2: + assert isinstance(f.exception.args[0], socket.error) + assert f.exception.args[0].errno in (32, 54, 104) + else: + assert isinstance(f.exception.args[0], ConnectionError) + log.info("Broker is not v%s -- it did not recognize %s", + version, request.__class__.__name__) + else: + raise Errors.UnrecognizedBrokerVersion() + + log.removeFilter(log_filter) + self.config['request_timeout_ms'] = stashed_request_timeout_ms + return version + def __repr__(self): return "" % (self.host, self.port) From 5368c81cf674536227cf33426b69d93dfd1e15db Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Thu, 7 Apr 2016 21:54:49 -0700 Subject: [PATCH 2/6] Add state_change_callback to BrokerConnection --- kafka/conn.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/kafka/conn.py b/kafka/conn.py index 030a3f1..28c09d9 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -31,6 +31,7 @@ DEFAULT_KAFKA_PORT = 9092 class ConnectionStates(object): + DISCONNECTING = '' DISCONNECTED = '' CONNECTING = '' CONNECTED = '' @@ -49,6 +50,7 @@ class BrokerConnection(object): 'receive_buffer_bytes': None, 'send_buffer_bytes': None, 'api_version': (0, 8, 2), # default to most restrictive + 'state_change_callback': lambda conn: True, } def __init__(self, host, port, afi, **configs): @@ -87,6 +89,7 @@ class BrokerConnection(object): self._sock.setblocking(False) self.state = ConnectionStates.CONNECTING self.last_attempt = time.time() + self.config['state_change_callback'](self) if self.state is ConnectionStates.CONNECTING: # in non-blocking mode, use repeated calls to socket.connect_ex @@ -101,6 +104,7 @@ class BrokerConnection(object): if not ret or ret == errno.EISCONN: log.debug('%s: established TCP connection', str(self)) self.state = ConnectionStates.CONNECTED + self.config['state_change_callback'](self) # Connection failed # WSAEINVAL == 10022, but errno.WSAEINVAL is not available on non-win systems @@ -151,6 +155,9 @@ class BrokerConnection(object): will be failed with this exception. Default: kafka.errors.ConnectionError. """ + if self.state is not ConnectionStates.DISCONNECTED: + self.state = ConnectionStates.DISCONNECTING + self.config['state_change_callback'](self) if self._sock: self._sock.close() self._sock = None @@ -165,6 +172,7 @@ class BrokerConnection(object): while self.in_flight_requests: ifr = self.in_flight_requests.popleft() ifr.future.failure(error) + self.config['state_change_callback'](self) def send(self, request, expect_response=True): """send request, return Future() From 1435356cc5688df509e96eb3fb6ee4ad95732452 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Thu, 7 Apr 2016 22:21:57 -0700 Subject: [PATCH 3/6] Move state logic from KafkaClient._maybe_connect to _conn_state_change as callback --- kafka/client_async.py | 38 +++++++++++++++++++------------------- test/test_client_async.py | 34 ++++++++++++++++++++++++---------- 2 files changed, 43 insertions(+), 29 deletions(-) diff --git a/kafka/client_async.py b/kafka/client_async.py index 64233f8..3dee2e1 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -1,6 +1,7 @@ from __future__ import absolute_import import copy +import functools import heapq import itertools import logging @@ -152,6 +153,22 @@ class KafkaClient(object): conn = self._conns[node_id] return conn.state is ConnectionStates.DISCONNECTED and not conn.blacked_out() + def _conn_state_change(self, node_id, conn): + if conn.connecting(): + self._connecting.add(node_id) + + elif conn.connected(): + log.debug("Node %s connected", node_id) + if node_id in self._connecting: + self._connecting.remove(node_id) + + # Connection failures imply that our metadata is stale, so let's refresh + elif conn.state is ConnectionStates.DISCONNECTING: + log.warning("Node %s connect failed -- refreshing metadata", node_id) + if node_id in self._connecting: + self._connecting.remove(node_id) + self.cluster.request_update() + def _maybe_connect(self, node_id): """Idempotent non-blocking connection attempt to the given node id.""" if node_id not in self._conns: @@ -160,32 +177,15 @@ class KafkaClient(object): log.debug("Initiating connection to node %s at %s:%s", node_id, broker.host, broker.port) - host, port, afi = get_ip_port_afi(broker.host) + cb = functools.partial(self._conn_state_change, node_id) self._conns[node_id] = BrokerConnection(host, broker.port, afi, + state_change_callback=cb, **self.config) conn = self._conns[node_id] if conn.connected(): return True - conn.connect() - - if conn.connecting(): - if node_id not in self._connecting: - self._connecting.add(node_id) - - # Whether CONNECTED or DISCONNECTED, we need to remove from connecting - elif node_id in self._connecting: - self._connecting.remove(node_id) - - if conn.connected(): - log.debug("Node %s connected", node_id) - - # Connection failures imply that our metadata is stale, so let's refresh - elif conn.disconnected(): - log.warning("Node %s connect failed -- refreshing metadata", node_id) - self.cluster.request_update() - return conn.connected() def ready(self, node_id): diff --git a/test/test_client_async.py b/test/test_client_async.py index 6da5394..ae8549d 100644 --- a/test/test_client_async.py +++ b/test/test_client_async.py @@ -1,5 +1,5 @@ -import time import socket +import time import pytest @@ -83,26 +83,40 @@ def test_maybe_connect(conn): else: assert False, 'Exception not raised' + # New node_id creates a conn object assert 0 not in cli._conns conn.state = ConnectionStates.DISCONNECTED conn.connect.side_effect = lambda: conn._set_conn_state(ConnectionStates.CONNECTING) assert cli._maybe_connect(0) is False assert cli._conns[0] is conn - assert 0 in cli._connecting - conn.connect.side_effect = lambda: conn._set_conn_state(ConnectionStates.CONNECTED) - assert cli._maybe_connect(0) is True - assert 0 not in cli._connecting + +def test_conn_state_change(mocker, conn): + cli = KafkaClient() + + node_id = 0 + conn.state = ConnectionStates.CONNECTING + cli._conn_state_change(node_id, conn) + assert node_id in cli._connecting + + conn.state = ConnectionStates.CONNECTED + cli._conn_state_change(node_id, conn) + assert node_id not in cli._connecting # Failure to connect should trigger metadata update assert cli.cluster._need_update is False - conn.state = ConnectionStates.CONNECTING - cli._connecting.add(0) - conn.connect.side_effect = lambda: conn._set_conn_state(ConnectionStates.DISCONNECTED) - assert cli._maybe_connect(0) is False - assert 0 not in cli._connecting + conn.state = ConnectionStates.DISCONNECTING + cli._conn_state_change(node_id, conn) + assert node_id not in cli._connecting assert cli.cluster._need_update is True + conn.state = ConnectionStates.CONNECTING + cli._conn_state_change(node_id, conn) + assert node_id in cli._connecting + conn.state = ConnectionStates.DISCONNECTING + cli._conn_state_change(node_id, conn) + assert node_id not in cli._connecting + def test_ready(mocker, conn): cli = KafkaClient() From 3e70e17fa9e7439477ee145f2d9151c3a6ef20a9 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Fri, 8 Apr 2016 15:40:42 -0700 Subject: [PATCH 4/6] Add private _refresh_on_disconnects flag to KafkaClient --- kafka/client_async.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/kafka/client_async.py b/kafka/client_async.py index 3dee2e1..bf2f6ea 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -94,6 +94,7 @@ class KafkaClient(object): self._metadata_refresh_in_progress = False self._conns = {} self._connecting = set() + self._refresh_on_disconnects = True self._delayed_tasks = DelayedTaskQueue() self._last_bootstrap = 0 self._bootstrap_fails = 0 @@ -164,10 +165,11 @@ class KafkaClient(object): # Connection failures imply that our metadata is stale, so let's refresh elif conn.state is ConnectionStates.DISCONNECTING: - log.warning("Node %s connect failed -- refreshing metadata", node_id) if node_id in self._connecting: self._connecting.remove(node_id) - self.cluster.request_update() + if self._refresh_on_disconnects: + log.warning("Node %s connect failed -- refreshing metadata", node_id) + self.cluster.request_update() def _maybe_connect(self, node_id): """Idempotent non-blocking connection attempt to the given node id.""" @@ -597,9 +599,13 @@ class KafkaClient(object): if node_id is None: raise Errors.NoBrokersAvailable() + # We will be intentionally causing socket failures + # and should not trigger metadata refresh + self._refresh_on_disconnects = False self._maybe_connect(node_id) conn = self._conns[node_id] version = conn.check_version() + self._refresh_on_disconnects = True return version def wakeup(self): From 85261e02e3b1dcaaa4816ef2cea90326352135f3 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Fri, 8 Apr 2016 15:43:47 -0700 Subject: [PATCH 5/6] Drop bootstrap connection once first normal broker is connected --- kafka/client_async.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/kafka/client_async.py b/kafka/client_async.py index bf2f6ea..d0a3723 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -162,6 +162,12 @@ class KafkaClient(object): log.debug("Node %s connected", node_id) if node_id in self._connecting: self._connecting.remove(node_id) + if 'bootstrap' in self._conns and node_id != 'bootstrap': + bootstrap = self._conns.pop('bootstrap') + # XXX: make conn.close() require error to cause refresh + self._refresh_on_disconnects = False + bootstrap.close() + self._refresh_on_disconnects = True # Connection failures imply that our metadata is stale, so let's refresh elif conn.state is ConnectionStates.DISCONNECTING: From 897ca399917baa178390af78870fe4be90c051d5 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Fri, 8 Apr 2016 15:36:18 -0700 Subject: [PATCH 6/6] Add state_change_callback to bootstrap connection --- kafka/client_async.py | 5 ++++- test/test_client_async.py | 10 ++++++++-- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/kafka/client_async.py b/kafka/client_async.py index d0a3723..0c22f90 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -119,7 +119,10 @@ class KafkaClient(object): metadata_request = MetadataRequest[0]([]) for host, port, afi in hosts: log.debug("Attempting to bootstrap via node at %s:%s", host, port) - bootstrap = BrokerConnection(host, port, afi, **self.config) + cb = functools.partial(self._conn_state_change, 'bootstrap') + bootstrap = BrokerConnection(host, port, afi, + state_change_callback=cb, + **self.config) bootstrap.connect() while bootstrap.connecting(): bootstrap.connect() diff --git a/test/test_client_async.py b/test/test_client_async.py index ae8549d..ad76aad 100644 --- a/test/test_client_async.py +++ b/test/test_client_async.py @@ -34,7 +34,10 @@ def test_bootstrap_servers(mocker, bootstrap, expected_hosts): def test_bootstrap_success(conn): conn.state = ConnectionStates.CONNECTED cli = KafkaClient() - conn.assert_called_once_with('localhost', 9092, socket.AF_INET, **cli.config) + args, kwargs = conn.call_args + assert args == ('localhost', 9092, socket.AF_INET) + kwargs.pop('state_change_callback') + assert kwargs == cli.config conn.connect.assert_called_with() conn.send.assert_called_once_with(MetadataRequest[0]([])) assert cli._bootstrap_fails == 0 @@ -44,7 +47,10 @@ def test_bootstrap_success(conn): def test_bootstrap_failure(conn): conn.state = ConnectionStates.DISCONNECTED cli = KafkaClient() - conn.assert_called_once_with('localhost', 9092, socket.AF_INET, **cli.config) + args, kwargs = conn.call_args + assert args == ('localhost', 9092, socket.AF_INET) + kwargs.pop('state_change_callback') + assert kwargs == cli.config conn.connect.assert_called_with() conn.close.assert_called_with() assert cli._bootstrap_fails == 1