Check for 0.8.2 GroupCoordinator quirk in BrokerConnection

This commit is contained in:
Dana Powers
2016-01-03 16:06:35 -08:00
parent 5c45ec13f3
commit 9b07bfb529

View File

@@ -16,6 +16,7 @@ import kafka.common as Errors
from kafka.common import ConnectionError from kafka.common import ConnectionError
from kafka.future import Future from kafka.future import Future
from kafka.protocol.api import RequestHeader from kafka.protocol.api import RequestHeader
from kafka.protocol.commit import GroupCoordinatorResponse
from kafka.protocol.types import Int32 from kafka.protocol.types import Int32
from kafka.version import __version__ from kafka.version import __version__
@@ -44,6 +45,7 @@ class BrokerConnection(object):
'max_in_flight_requests_per_connection': 5, 'max_in_flight_requests_per_connection': 5,
'receive_buffer_bytes': 32768, 'receive_buffer_bytes': 32768,
'send_buffer_bytes': 131072, 'send_buffer_bytes': 131072,
'api_version': (0, 8, 2), # default to most restrictive
} }
def __init__(self, host, port, **configs): def __init__(self, host, port, **configs):
@@ -278,7 +280,17 @@ class BrokerConnection(object):
# verify send/recv correlation ids match # verify send/recv correlation ids match
recv_correlation_id = Int32.decode(read_buffer) recv_correlation_id = Int32.decode(read_buffer)
if ifr.correlation_id != recv_correlation_id:
# 0.8.2 quirk
if (self.config['api_version'] == (0, 8, 2) and
ifr.response_type is GroupCoordinatorResponse and
recv_correlation_id == 0):
raise Errors.KafkaError(
'Kafka 0.8.2 quirk -- try creating a topic first')
elif ifr.correlation_id != recv_correlation_id:
error = Errors.CorrelationIdError( error = Errors.CorrelationIdError(
'Correlation ids do not match: sent %d, recv %d' 'Correlation ids do not match: sent %d, recv %d'
% (ifr.correlation_id, recv_correlation_id)) % (ifr.correlation_id, recv_correlation_id))