Support consumer metadata requests
Support added for ConsumerMetadataRequest and ConsumerMetadataResponse.
Added a consumer-aware request routine to support the consumer coordinator.
Added a separate client method for fetching Kafka-committed offsets from the coordinator.
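At a high level, the client first asks any broker for the group's coordinator, then routes offset fetches to that coordinator. A minimal usage sketch, assuming a local broker and illustrative topic/group names (none of which are part of the commit itself):

    from kafka import KafkaClient
    from kafka.common import OffsetFetchRequest

    client = KafkaClient('localhost:9092')

    # Fetch offsets committed to Kafka (rather than Zookeeper) for a group;
    # the client locates the group's coordinator behind the scenes.
    payloads = [OffsetFetchRequest(b'my-topic', 0)]
    (resp,) = client.send_offset_fetch_request_kafka(b'my-group', payloads)
    print('%s[%d] -> %d' % (resp.topic, resp.partition, resp.offset))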
kafka/client.py (+128)
@@ -97,6 +97,26 @@ class KafkaClient(object):
         # Otherwise return the BrokerMetadata
         return self.brokers[meta.leader]
 
+    def _get_coordinator_for_group(self, group):
+        """
+        Returns the coordinator broker for a consumer group.
+
+        ConsumerCoordinatorNotAvailableCode will be raised if the coordinator
+        does not currently exist for the group.
+
+        OffsetsLoadInProgressCode is raised if the coordinator is available
+        but is still loading offsets from the internal topic
+        """
+
+        resp = self.send_consumer_metadata_request(group)
+
+        # If there's a problem with finding the coordinator, raise the
+        # provided error
+        kafka.common.check_error(resp)
+
+        # Otherwise return the BrokerMetadata
+        return BrokerMetadata(resp.nodeId, resp.host, resp.port)
+
     def _next_id(self):
         """Generate a new correlation id"""
         # modulo to keep w/i int32
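Because the coordinator may not exist yet, or may still be loading the offsets topic, both errors documented above are retriable. A sketch of a retry loop around the new helper; the retry count and backoff policy are illustrative, not part of this commit:

    import time

    from kafka.common import (ConsumerCoordinatorNotAvailableCode,
                              OffsetsLoadInProgressCode)

    def wait_for_coordinator(client, group, retries=5, backoff=0.5):
        # Retry while the cluster elects a coordinator or finishes loading
        # offsets from the internal offsets topic.
        for attempt in range(retries):
            try:
                return client._get_coordinator_for_group(group)
            except (ConsumerCoordinatorNotAvailableCode,
                    OffsetsLoadInProgressCode):
                time.sleep(backoff * (attempt + 1))
        raise RuntimeError('no coordinator for group %r' % group)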
@@ -237,6 +257,96 @@ class KafkaClient(object):
         # Return responses in the same order as provided
         return [responses[tp] for tp in original_ordering]
 
+    def _send_consumer_aware_request(self, group, payloads, encoder_fn, decoder_fn):
+        """
+        Send a list of requests to the consumer coordinator for the group
+        specified using the supplied encode/decode functions. As the payloads
+        that use consumer-aware requests do not contain the group (e.g.
+        OffsetFetchRequest), all payloads must be for a single group.
+
+        Arguments:
+
+        group: the name of the consumer group (str) the payloads are for
+        payloads: list of object-like entities with topic (str) and
+                  partition (int) attributes; payloads with duplicate
+                  topic+partition are not supported.
+
+        encoder_fn: a method to encode the list of payloads to a request body,
+                    must accept client_id, correlation_id, and payloads as
+                    keyword arguments
+
+        decoder_fn: a method to decode a response body into response objects.
+                    The response objects must be object-like and have topic
+                    and partition attributes
+
+        Returns:
+
+        List of response objects in the same order as the supplied payloads
+        """
+        # encoders / decoders do not maintain ordering currently
+        # so we need to keep this so we can rebuild order before returning
+        original_ordering = [(p.topic, p.partition) for p in payloads]
+
+        broker = self._get_coordinator_for_group(group)
+
+        # Send the list of request payloads and collect the responses and
+        # errors
+        responses = {}
+        requestId = self._next_id()
+        log.debug('Request %s to %s: %s', requestId, broker, payloads)
+        request = encoder_fn(client_id=self.client_id,
+                             correlation_id=requestId, payloads=payloads)
+
+        # Send the request, recv the response
+        try:
+            conn = self._get_conn(broker.host.decode('utf-8'), broker.port)
+            conn.send(requestId, request)
+
+        except ConnectionError as e:
+            log.warning('ConnectionError attempting to send request %s '
+                        'to server %s: %s', requestId, broker, e)
+
+            for payload in payloads:
+                topic_partition = (payload.topic, payload.partition)
+                responses[topic_partition] = FailedPayloadsError(payload)
+
+        # No exception, try to get response
+        else:
+
+            # decoder_fn=None signals that the server is expected to not
+            # send a response. This probably only applies to
+            # ProduceRequest w/ acks = 0
+            if decoder_fn is None:
+                log.debug('Request %s does not expect a response '
+                          '(skipping conn.recv)', requestId)
+                for payload in payloads:
+                    topic_partition = (payload.topic, payload.partition)
+                    responses[topic_partition] = None
+                return []
+
+            try:
+                response = conn.recv(requestId)
+            except ConnectionError as e:
+                log.warning('ConnectionError attempting to receive a '
+                            'response to request %s from server %s: %s',
+                            requestId, broker, e)
+
+                for payload in payloads:
+                    topic_partition = (payload.topic, payload.partition)
+                    responses[topic_partition] = FailedPayloadsError(payload)
+
+            else:
+                _resps = []
+                for payload_response in decoder_fn(response):
+                    topic_partition = (payload_response.topic,
+                                       payload_response.partition)
+                    responses[topic_partition] = payload_response
+                    _resps.append(payload_response)
+                log.debug('Response %s: %s', requestId, _resps)
+
+        # Return responses in the same order as provided
+        return [responses[tp] for tp in original_ordering]
+
     def __repr__(self):
         return '<KafkaClient client_id=%s>' % (self.client_id)
 
@@ -429,6 +539,13 @@ class KafkaClient(object):
 
         return self._send_broker_unaware_request(payloads, encoder, decoder)
 
+    def send_consumer_metadata_request(self, payloads=[], fail_on_error=True,
+                                       callback=None):
+        encoder = KafkaProtocol.encode_consumer_metadata_request
+        decoder = KafkaProtocol.decode_consumer_metadata_response
+
+        return self._send_broker_unaware_request(payloads, encoder, decoder)
+
     def send_produce_request(self, payloads=[], acks=1, timeout=1000,
                              fail_on_error=True, callback=None):
         """
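This method reuses the existing broker-unaware path, so any bootstrap broker can answer the metadata query. An illustrative call (broker address and group name are made up); note the decoded result is a single ConsumerMetadataResponse, not a list:

    from kafka import KafkaClient

    client = KafkaClient('localhost:9092')
    resp = client.send_consumer_metadata_request(b'my-group')

    # error is 0 on success; nodeId/host/port identify the coordinator
    print('coordinator: %s:%d (node %d)' % (resp.host, resp.port, resp.nodeId))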
@@ -529,3 +646,14 @@ class KafkaClient(object):
 
         return [resp if not callback else callback(resp) for resp in resps
                 if not fail_on_error or not self._raise_on_response_error(resp)]
+
+    def send_offset_fetch_request_kafka(self, group, payloads=[],
+                                        fail_on_error=True, callback=None):
+
+        encoder = functools.partial(KafkaProtocol.encode_offset_fetch_request,
+                                    group=group, from_kafka=True)
+        decoder = KafkaProtocol.decode_offset_fetch_response
+        resps = self._send_consumer_aware_request(group, payloads, encoder, decoder)
+
+        return [resp if not callback else callback(resp) for resp in resps
+                if not fail_on_error or not self._raise_on_response_error(resp)]
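As with the client's other send_* methods, connection failures surface as FailedPayloadsError entries in the result; passing fail_on_error=False returns them for inspection instead of raising. An illustrative sketch, reusing the client and names from the example above:

    from kafka.common import FailedPayloadsError, OffsetFetchRequest

    payloads = [OffsetFetchRequest(b'my-topic', p) for p in (0, 1)]
    resps = client.send_offset_fetch_request_kafka(b'my-group', payloads,
                                                   fail_on_error=False)
    failed = [r for r in resps if isinstance(r, FailedPayloadsError)]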
kafka/common.py

@@ -13,6 +13,13 @@ MetadataRequest = namedtuple("MetadataRequest",
 MetadataResponse = namedtuple("MetadataResponse",
     ["brokers", "topics"])
 
+# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ConsumerMetadataRequest
+ConsumerMetadataRequest = namedtuple("ConsumerMetadataRequest",
+    ["groups"])
+
+ConsumerMetadataResponse = namedtuple("ConsumerMetadataResponse",
+    ["error", "nodeId", "host", "port"])
+
 # https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ProduceAPI
 ProduceRequest = namedtuple("ProduceRequest",
     ["topic", "partition", "messages"])
@@ -160,6 +167,21 @@ class StaleLeaderEpochCodeError(BrokerResponseError):
     message = 'STALE_LEADER_EPOCH_CODE'
 
 
+class OffsetsLoadInProgressCode(BrokerResponseError):
+    errno = 14
+    message = 'OFFSETS_LOAD_IN_PROGRESS_CODE'
+
+
+class ConsumerCoordinatorNotAvailableCode(BrokerResponseError):
+    errno = 15
+    message = 'CONSUMER_COORDINATOR_NOT_AVAILABLE_CODE'
+
+
+class NotCoordinatorForConsumerCode(BrokerResponseError):
+    errno = 16
+    message = 'NOT_COORDINATOR_FOR_CONSUMER_CODE'
+
+
 class KafkaUnavailableError(KafkaError):
     pass
 
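These classes slot into the errno-to-exception lookup consulted by kafka.common.check_error, which _get_coordinator_for_group calls on the decoded response. A simplified sketch of that dispatch, not the verbatim library code:

    # Simplified: the real module derives this mapping from the
    # BrokerResponseError subclasses rather than listing codes by hand.
    kafka_errors = {
        14: OffsetsLoadInProgressCode,
        15: ConsumerCoordinatorNotAvailableCode,
        16: NotCoordinatorForConsumerCode,
        # ... plus the pre-existing broker error codes
    }

    def check_error(response):
        if response.error in kafka_errors:
            raise kafka_errors[response.error](response)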
kafka/protocol.py

@@ -14,7 +14,8 @@ from kafka.common import (
     MetadataResponse, ProduceResponse, FetchResponse,
     OffsetResponse, OffsetCommitResponse, OffsetFetchResponse,
     ProtocolError, BufferUnderflowError, ChecksumError,
-    ConsumerFetchSizeTooSmall, UnsupportedCodecError
+    ConsumerFetchSizeTooSmall, UnsupportedCodecError,
+    ConsumerMetadataResponse
 )
 from kafka.util import (
     crc32, read_short_string, read_int_string, relative_unpack,
@@ -43,19 +44,21 @@ class KafkaProtocol(object):
     METADATA_KEY = 3
     OFFSET_COMMIT_KEY = 8
     OFFSET_FETCH_KEY = 9
+    CONSUMER_METADATA_KEY = 10
 
     ###################
     #   Private API   #
     ###################
 
     @classmethod
-    def _encode_message_header(cls, client_id, correlation_id, request_key):
+    def _encode_message_header(cls, client_id, correlation_id, request_key,
+                               version=0):
         """
         Encode the common request envelope
         """
         return struct.pack('>hhih%ds' % len(client_id),
                            request_key,          # ApiKey
-                           0,                    # ApiVersion
+                           version,              # ApiVersion
                            correlation_id,       # CorrelationId
                            len(client_id),       # ClientId size
                            client_id)            # ClientId
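The new version parameter changes only the ApiVersion int16 in the shared envelope; every other field is untouched. A self-contained sketch of the same packing, with a check that only bytes 2-3 differ between versions:

    import struct

    def header_bytes(client_id, correlation_id, request_key, version=0):
        # ApiKey, ApiVersion, CorrelationId, then a length-prefixed ClientId
        return struct.pack('>hhih%ds' % len(client_id),
                           request_key, version, correlation_id,
                           len(client_id), client_id)

    v0 = header_bytes(b'cid', 4, 9)
    v1 = header_bytes(b'cid', 4, 9, version=1)
    assert v0[2:4] == struct.pack('>h', 0) and v1[2:4] == struct.pack('>h', 1)
    assert v0[:2] == v1[:2] and v0[4:] == v1[4:]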
@@ -429,6 +432,38 @@ class KafkaProtocol(object):
 
         return MetadataResponse(brokers, topic_metadata)
 
+    @classmethod
+    def encode_consumer_metadata_request(cls, client_id, correlation_id, payloads):
+        """
+        Encode a ConsumerMetadataRequest
+
+        Arguments:
+            client_id: string
+            correlation_id: int
+            payloads: string (consumer group)
+        """
+        message = []
+        message.append(cls._encode_message_header(client_id, correlation_id,
+                                                  KafkaProtocol.CONSUMER_METADATA_KEY))
+        message.append(struct.pack('>h%ds' % len(payloads), len(payloads), payloads))
+
+        msg = b''.join(message)
+        return write_int_string(msg)
+
+    @classmethod
+    def decode_consumer_metadata_response(cls, data):
+        """
+        Decode bytes to a ConsumerMetadataResponse
+
+        Arguments:
+            data: bytes to decode
+        """
+        ((correlation_id, error, nodeId), cur) = relative_unpack('>ihi', data, 0)
+        (host, cur) = read_short_string(data, cur)
+        ((port,), cur) = relative_unpack('>i', data, cur)
+
+        return ConsumerMetadataResponse(error, nodeId, host, port)
+
     @classmethod
     def encode_offset_commit_request(cls, client_id, correlation_id,
                                      group, payloads):
@@ -481,21 +516,27 @@ class KafkaProtocol(object):
 
     @classmethod
     def encode_offset_fetch_request(cls, client_id, correlation_id,
-                                    group, payloads):
+                                    group, payloads, from_kafka=False):
         """
-        Encode some OffsetFetchRequest structs
+        Encode some OffsetFetchRequest structs. The request is encoded using
+        version 0 if from_kafka is false, indicating a request for Zookeeper
+        offsets. It is encoded using version 1 otherwise, indicating a request
+        for Kafka offsets.
 
         Arguments:
             client_id: string
             correlation_id: int
             group: string, the consumer group you are fetching offsets for
             payloads: list of OffsetFetchRequest
+            from_kafka: bool, default False, set True for Kafka-committed offsets
         """
         grouped_payloads = group_by_topic_and_partition(payloads)
 
         message = []
+        reqver = 1 if from_kafka else 0
         message.append(cls._encode_message_header(client_id, correlation_id,
-                                                  KafkaProtocol.OFFSET_FETCH_KEY))
+                                                  KafkaProtocol.OFFSET_FETCH_KEY,
+                                                  version=reqver))
 
         message.append(write_short_string(group))
         message.append(struct.pack('>i', len(grouped_payloads)))
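Putting the pieces together, from_kafka selects the wire version at encode time. An illustrative comparison (client id, group, and topic are made up); the ApiVersion sits at bytes 6-8, after the 4-byte size prefix and the 2-byte ApiKey:

    from kafka.common import OffsetFetchRequest
    from kafka.protocol import KafkaProtocol

    payloads = [OffsetFetchRequest(b'my-topic', 0)]

    # Version 0 (default): offsets stored in Zookeeper
    v0 = KafkaProtocol.encode_offset_fetch_request(b'cid', 1, b'my-group', payloads)

    # Version 1: offsets committed to Kafka's internal topic
    v1 = KafkaProtocol.encode_offset_fetch_request(b'cid', 1, b'my-group', payloads,
                                                   from_kafka=True)

    assert v0[6:8] == b'\x00\x00' and v1[6:8] == b'\x00\x01'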
test/test_protocol.py

@@ -560,6 +560,34 @@ class TestProtocol(unittest.TestCase):
         decoded = KafkaProtocol.decode_metadata_response(encoded)
         self.assertEqual(decoded, (node_brokers, topic_partitions))
 
+    def test_encode_consumer_metadata_request(self):
+        expected = b"".join([
+            struct.pack(">i", 17),          # Total length of the request
+            struct.pack('>h', 10),          # API key consumer metadata
+            struct.pack('>h', 0),           # API version
+            struct.pack('>i', 4),           # Correlation ID
+            struct.pack('>h3s', 3, b"cid"), # The client ID
+            struct.pack('>h2s', 2, b"g1"),  # Group "g1"
+        ])
+
+        encoded = KafkaProtocol.encode_consumer_metadata_request(b"cid", 4, b"g1")
+
+        self.assertEqual(encoded, expected)
+
+    def test_decode_consumer_metadata_response(self):
+        encoded = b"".join([
+            struct.pack(">i", 42),                                 # Correlation ID
+            struct.pack(">h", 0),                                  # No Error
+            struct.pack(">i", 1),                                  # Broker ID
+            struct.pack(">h23s", 23, b"brokers1.kafka.rdio.com"),  # Broker Host
+            struct.pack(">i", 1000),                               # Broker Port
+        ])
+
+        results = KafkaProtocol.decode_consumer_metadata_response(encoded)
+        self.assertEqual(results,
+            ConsumerMetadataResponse(error=0, nodeId=1, host=b'brokers1.kafka.rdio.com', port=1000)
+        )
+
     def test_encode_offset_request(self):
         expected = b"".join([
             struct.pack(">i", 21),          # Total length of the request