Small KafkaClient.check_version() improvements

- filter connection failure logging during version check
  - raise UnrecognizedBrokerVersion if we cant id broker
This commit is contained in:
Dana Powers
2016-01-25 15:59:35 -08:00
parent 71a9e65e58
commit a667a4b3be
2 changed files with 24 additions and 3 deletions

View File

@@ -586,6 +586,15 @@ class KafkaClient(object):
OffsetFetchRequest_v0, GroupCoordinatorRequest)
from .protocol.metadata import MetadataRequest
# Socket errors are logged as exceptions and can alarm users. Mute them
from logging import Filter
class ConnFilter(Filter):
def filter(self, record):
if record.funcName in ('recv', 'send'):
return False
return True
log_filter = ConnFilter()
test_cases = [
('0.9', ListGroupsRequest()),
('0.8.2', GroupCoordinatorRequest('kafka-python-default-group')),
@@ -593,18 +602,20 @@ class KafkaClient(object):
('0.8.0', MetadataRequest([])),
]
logging.getLogger('kafka.conn').addFilter(log_filter)
for version, request in test_cases:
connect()
f = self.send(node_id, request)
time.sleep(0.5)
self.send(node_id, MetadataRequest([]))
time.sleep(0.1) # HACK: sleeping to wait for socket to send bytes
metadata = self.send(node_id, MetadataRequest([]))
self.poll(future=f)
self.poll(future=metadata)
assert f.is_done
if f.succeeded():
log.info('Broker version identifed as %s', version)
return version
break
if six.PY2:
assert isinstance(f.exception.args[0], socket.error)
@@ -615,6 +626,12 @@ class KafkaClient(object):
version, request.__class__.__name__)
continue
else:
raise Errors.UnrecognizedBrokerVersion()
logging.getLogger('kafka.conn').removeFilter(log_filter)
return version
def wakeup(self):
os.write(self._wake_w, b'x')

View File

@@ -132,6 +132,10 @@ class StaleMetadata(KafkaError):
invalid_metadata = True
class UnrecognizedBrokerVersion(KafkaError):
pass
class BrokerResponseError(KafkaError):
errno = None
message = None