Improve metadata protocol handling

- add MetadataRequest and MetadataResponse namedtuples
- add TopicMetadata namedtuple
- add error codes to TopicMetadata and PartitionMetadata
- add KafkaClient.send_metadata_request() method
- KafkaProtocol.decode_metadata_response changed to return a MetadataResponse
  object so that it is consistent with the server API: [broker_list, topic_list]
Author: Dana Powers
Date:   2014-09-01 01:48:18 -07:00
Commit: 18ac148607 (parent eddd1436c2)
5 changed files with 189 additions and 142 deletions
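
In practice, the client-side API added by this commit can be exercised as follows (a minimal sketch; the broker address and topic name are placeholders):

    from kafka import KafkaClient

    client = KafkaClient(hosts=['localhost:9092'])

    # send_metadata_request() returns a MetadataResponse namedtuple with
    # .brokers and .topics fields, mirroring the server response layout
    resp = client.send_metadata_request(['my-topic'])

    for broker in resp.brokers:
        print broker.nodeId, broker.host, broker.port

    for topic in resp.topics:
        print topic.topic, topic.error
        for partition in topic.partitions:
            print partition.partition, partition.leader, partition.error
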

kafka/client.py

@@ -6,7 +6,7 @@ import logging
 import time

 import kafka.common
-from kafka.common import (TopicAndPartition,
+from kafka.common import (TopicAndPartition, BrokerMetadata,
                           ConnectionError, FailedPayloadsError,
                           PartitionUnavailableError, LeaderUnavailableError, KafkaUnavailableError,
                           KafkaTimeoutError,
@@ -83,20 +83,26 @@ class KafkaClient(object):
         """
         return KafkaClient.ID_GEN.next()

-    def _send_broker_unaware_request(self, requestId, request):
+    def _send_broker_unaware_request(self, payloads, encoder_fn, decoder_fn):
         """
         Attempt to send a broker-agnostic request to one of the available
         brokers. Keep trying until you succeed.
         """
         for (host, port) in self.hosts:
+            requestId = self._next_id()
             try:
                 conn = self._get_conn(host, port)
+                request = encoder_fn(client_id=self.client_id,
+                                     correlation_id=requestId,
+                                     payloads=payloads)
+
                 conn.send(requestId, request)
                 response = conn.recv(requestId)
-                return response
+                return decoder_fn(response)
+
             except Exception as e:
                 log.warning("Could not send request [%r] to server %s:%i, "
-                            "trying next server: %s" % (request, host, port, e))
+                            "trying next server: %s" % (requestId, host, port, e))

         raise KafkaUnavailableError("All servers failed to process request")
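
The key change here is dependency injection: the retry loop no longer receives pre-encoded bytes, but builds a fresh request (with a fresh correlation id per broker attempt) and decodes the response before returning. Any broker-unaware call can now reuse the loop, along these lines (a sketch using names from this diff; the topic list is illustrative):

    # from inside KafkaClient
    encoder = KafkaProtocol.encode_metadata_request
    decoder = KafkaProtocol.decode_metadata_response
    resp = self._send_broker_unaware_request(payloads=['topic1'],
                                             encoder_fn=encoder,
                                             decoder_fn=decoder)
    # resp is already a decoded MetadataResponse, not raw bytes
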
@@ -246,13 +252,11 @@ class KafkaClient(object):
         Discover brokers and metadata for a set of topics. This function is called
         lazily whenever metadata is unavailable.
         """
-        request_id = self._next_id()
-        request = KafkaProtocol.encode_metadata_request(self.client_id,
-                                                        request_id, topics)
-
-        response = self._send_broker_unaware_request(request_id, request)
+        resp = self.send_metadata_request(topics)

-        (brokers, topics) = KafkaProtocol.decode_metadata_response(response)
+        brokers = dict([(broker.nodeId, broker) for broker in resp.brokers])
+        topics = dict([(t.topic, dict([(p.partition, p) for p in t.partitions]))
+                       for t in resp.topics])

         log.debug("Broker metadata: %s", brokers)
         log.debug("Topic metadata: %s", topics)
@@ -276,6 +280,14 @@ class KafkaClient(object):
                 else:
                     self.topics_to_brokers[topic_part] = brokers[meta.leader]

+    def send_metadata_request(self, payloads=[], fail_on_error=True,
+                              callback=None):
+
+        encoder = KafkaProtocol.encode_metadata_request
+        decoder = KafkaProtocol.decode_metadata_response
+
+        return self._send_broker_unaware_request(payloads, encoder, decoder)
+
     def send_produce_request(self, payloads=[], acks=1, timeout=1000,
                              fail_on_error=True, callback=None):
         """

kafka/common.py

@@ -17,8 +17,14 @@ OffsetRequest = namedtuple("OffsetRequest",
 OffsetCommitRequest = namedtuple("OffsetCommitRequest",
     ["topic", "partition", "offset", "metadata"])

+MetadataRequest = namedtuple("MetadataRequest",
+    ["topics"])
+
 OffsetFetchRequest = namedtuple("OffsetFetchRequest", ["topic", "partition"])

+MetadataResponse = namedtuple("MetadataResponse",
+    ["brokers", "topics"])
+
 # Response payloads
 ProduceResponse = namedtuple("ProduceResponse",
                              ["topic", "partition", "error", "offset"])
@@ -36,16 +42,26 @@ OffsetFetchResponse = namedtuple("OffsetFetchResponse",
                                  ["topic", "partition", "offset",
                                   "metadata", "error"])

-BrokerMetadata = namedtuple("BrokerMetadata", ["nodeId", "host", "port"])
-
-PartitionMetadata = namedtuple("PartitionMetadata",
-                               ["topic", "partition", "leader",
-                                "replicas", "isr"])
-
 # Other useful structs
-OffsetAndMessage = namedtuple("OffsetAndMessage", ["offset", "message"])
-Message = namedtuple("Message", ["magic", "attributes", "key", "value"])
-TopicAndPartition = namedtuple("TopicAndPartition", ["topic", "partition"])
+BrokerMetadata = namedtuple("BrokerMetadata",
+    ["nodeId", "host", "port"])
+
+TopicMetadata = namedtuple("TopicMetadata",
+    ["topic", "error", "partitions"])
+
+PartitionMetadata = namedtuple("PartitionMetadata",
+    ["topic", "partition", "leader", "replicas", "isr", "error"])
+
+OffsetAndMessage = namedtuple("OffsetAndMessage",
+    ["offset", "message"])
+
+Message = namedtuple("Message",
+    ["magic", "attributes", "key", "value"])
+
+TopicAndPartition = namedtuple("TopicAndPartition",
+    ["topic", "partition"])

 #################
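
With error codes now attached at both levels, callers can check topic- and partition-level failures without re-parsing anything. A sketch (the numeric codes follow the Kafka wire protocol, e.g. 0 = NoError, 5 = LeaderNotAvailable):

    from kafka.common import TopicMetadata, PartitionMetadata

    topic = TopicMetadata('topic_noleader', 0, [
        PartitionMetadata('topic_noleader', 0, -1, [], [], 5)
    ])

    if topic.error == 0:  # the topic itself resolved fine
        for p in topic.partitions:
            if p.error != 0:
                print 'partition %d has error code %d' % (p.partition, p.error)
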

kafka/protocol.py

@@ -6,11 +6,12 @@ from kafka.codec import (
     gzip_encode, gzip_decode, snappy_encode, snappy_decode
 )
 from kafka.common import (
-    BrokerMetadata, PartitionMetadata, Message, OffsetAndMessage,
-    ProduceResponse, FetchResponse, OffsetResponse,
-    OffsetCommitResponse, OffsetFetchResponse, ProtocolError,
-    BufferUnderflowError, ChecksumError, ConsumerFetchSizeTooSmall,
-    UnsupportedCodecError
+    Message, OffsetAndMessage, TopicAndPartition,
+    BrokerMetadata, TopicMetadata, PartitionMetadata,
+    MetadataResponse, ProduceResponse, FetchResponse,
+    OffsetResponse, OffsetCommitResponse, OffsetFetchResponse,
+    ProtocolError, BufferUnderflowError, ChecksumError,
+    ConsumerFetchSizeTooSmall, UnsupportedCodecError
 )
 from kafka.util import (
     read_short_string, read_int_string, relative_unpack,
@@ -340,7 +341,8 @@ class KafkaProtocol(object):
             yield OffsetResponse(topic, partition, error, tuple(offsets))

     @classmethod
-    def encode_metadata_request(cls, client_id, correlation_id, topics=None):
+    def encode_metadata_request(cls, client_id, correlation_id, topics=None,
+                                payloads=None):
         """
         Encode a MetadataRequest

@@ -350,7 +352,11 @@ class KafkaProtocol(object):
             correlation_id: int
             topics: list of strings
         """
-        topics = [] if topics is None else topics
+        if payloads is None:
+            topics = [] if topics is None else topics
+        else:
+            topics = payloads
+
         message = cls._encode_message_header(client_id, correlation_id,
                                              KafkaProtocol.METADATA_KEY)
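
The extra payloads keyword lets the generic _send_broker_unaware_request call every encoder with the same keyword signature; when both are given, payloads wins. A sketch of the resulting call (the client id and correlation id are illustrative):

    request = KafkaProtocol.encode_metadata_request(
        client_id='my-client',
        correlation_id=4,
        payloads=['topic1', 'topic2']  # takes precedence over topics=
    )
    # request now holds the serialized MetadataRequest bytes for conn.send()
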
@@ -373,28 +379,24 @@ class KafkaProtocol(object):
         ((correlation_id, numbrokers), cur) = relative_unpack('>ii', data, 0)

         # Broker info
-        brokers = {}
+        brokers = []
         for i in range(numbrokers):
             ((nodeId, ), cur) = relative_unpack('>i', data, cur)
             (host, cur) = read_short_string(data, cur)
             ((port,), cur) = relative_unpack('>i', data, cur)
-            brokers[nodeId] = BrokerMetadata(nodeId, host, port)
+            brokers.append(BrokerMetadata(nodeId, host, port))

         # Topic info
         ((num_topics,), cur) = relative_unpack('>i', data, cur)
-        topic_metadata = {}
+        topic_metadata = []

         for i in range(num_topics):
-            # NOTE: topic_error is discarded. Should probably be returned with
-            # the topic metadata.
             ((topic_error,), cur) = relative_unpack('>h', data, cur)
             (topic_name, cur) = read_short_string(data, cur)
             ((num_partitions,), cur) = relative_unpack('>i', data, cur)
-            partition_metadata = {}
+            partition_metadata = []

             for j in range(num_partitions):
-                # NOTE: partition_error_code is discarded. Should probably be
-                # returned with the partition metadata.
                 ((partition_error_code, partition, leader, numReplicas), cur) = \
                     relative_unpack('>hiii', data, cur)

@@ -404,13 +406,16 @@ class KafkaProtocol(object):
                 ((num_isr,), cur) = relative_unpack('>i', data, cur)
                 (isr, cur) = relative_unpack('>%di' % num_isr, data, cur)

-                partition_metadata[partition] = \
-                    PartitionMetadata(
-                        topic_name, partition, leader, replicas, isr)
+                partition_metadata.append(
+                    PartitionMetadata(topic_name, partition, leader,
+                                      replicas, isr, partition_error_code)
+                )

-            topic_metadata[topic_name] = partition_metadata
+            topic_metadata.append(
+                TopicMetadata(topic_name, topic_error, partition_metadata)
+            )

-        return brokers, topic_metadata
+        return MetadataResponse(brokers, topic_metadata)

     @classmethod
     def encode_offset_commit_request(cls, client_id, correlation_id,
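
For reference, the wire layout being decoded: correlation id (int32), broker count (int32), then per broker a nodeId (int32), a length-prefixed host string (int16 length), and a port (int32); then the topic count, and per topic an error code (int16), a name, and its partition array. A minimal sketch that round-trips a one-broker, zero-topic response:

    import struct
    from kafka.common import BrokerMetadata
    from kafka.protocol import KafkaProtocol

    host = 'localhost'
    encoded = struct.pack('>ii', 4, 1)                # correlation_id, 1 broker
    encoded += struct.pack('>ih%dsi' % len(host),
                           0, len(host), host, 9092)  # nodeId, host, port
    encoded += struct.pack('>i', 0)                   # zero topics

    resp = KafkaProtocol.decode_metadata_response(encoded)
    assert resp.brokers == [BrokerMetadata(0, 'localhost', 9092)]
    assert resp.topics == []
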

test/test_client.py

@@ -1,10 +1,11 @@
 import unittest2

-from mock import MagicMock, patch
+from mock import ANY, MagicMock, patch

 from kafka import KafkaClient
 from kafka.common import (
-    ProduceRequest, BrokerMetadata, PartitionMetadata,
+    ProduceRequest, MetadataResponse,
+    BrokerMetadata, TopicMetadata, PartitionMetadata,
     TopicAndPartition, KafkaUnavailableError,
     LeaderUnavailableError, PartitionUnavailableError
 )
@@ -56,10 +57,12 @@ class TestKafkaClient(unittest2.TestCase):
             client = KafkaClient(hosts=['kafka01:9092', 'kafka02:9092'])

             with self.assertRaises(KafkaUnavailableError):
-                client._send_broker_unaware_request(1, 'fake request')
+                client._send_broker_unaware_request(payloads=['fake request'],
+                                                    encoder_fn=MagicMock(return_value='fake encoded message'),
+                                                    decoder_fn=lambda x: x)

             for key, conn in mocked_conns.iteritems():
-                conn.send.assert_called_with(1, 'fake request')
+                conn.send.assert_called_with(ANY, 'fake encoded message')

     def test_send_broker_unaware_request(self):
         'Tests that call works when at least one of the host is available'
@@ -80,12 +83,15 @@ class TestKafkaClient(unittest2.TestCase):
         # patch to avoid making requests before we want it
         with patch.object(KafkaClient, 'load_metadata_for_topics'):
             with patch.object(KafkaClient, '_get_conn', side_effect=mock_get_conn):
-                client = KafkaClient(hosts='kafka01:9092,kafka02:9092')
+                with patch.object(KafkaClient, '_next_id', return_value=1):
+                    client = KafkaClient(hosts='kafka01:9092,kafka02:9092')

-                resp = client._send_broker_unaware_request(1, 'fake request')
+                    resp = client._send_broker_unaware_request(payloads=['fake request'],
+                                                               encoder_fn=MagicMock(),
+                                                               decoder_fn=lambda x: x)

-                self.assertEqual('valid response', resp)
-                mocked_conns[('kafka02', 9092)].recv.assert_called_with(1)
+                    self.assertEqual('valid response', resp)
+                    mocked_conns[('kafka02', 9092)].recv.assert_called_with(1)

     @patch('kafka.client.KafkaConnection')
     @patch('kafka.client.KafkaProtocol')
@@ -94,25 +100,27 @@ class TestKafkaClient(unittest2.TestCase):
         conn.recv.return_value = 'response'  # anything but None

-        brokers = {}
-        brokers[0] = BrokerMetadata(1, 'broker_1', 4567)
-        brokers[1] = BrokerMetadata(2, 'broker_2', 5678)
+        brokers = [
+            BrokerMetadata(0, 'broker_1', 4567),
+            BrokerMetadata(1, 'broker_2', 5678)
+        ]

-        topics = {}
-        topics['topic_1'] = {
-            0: PartitionMetadata('topic_1', 0, 1, [1, 2], [1, 2])
-        }
-        topics['topic_noleader'] = {
-            0: PartitionMetadata('topic_noleader', 0, -1, [], []),
-            1: PartitionMetadata('topic_noleader', 1, -1, [], [])
-        }
-        topics['topic_no_partitions'] = {}
-        topics['topic_3'] = {
-            0: PartitionMetadata('topic_3', 0, 0, [0, 1], [0, 1]),
-            1: PartitionMetadata('topic_3', 1, 1, [1, 0], [1, 0]),
-            2: PartitionMetadata('topic_3', 2, 0, [0, 1], [0, 1])
-        }
-        protocol.decode_metadata_response.return_value = (brokers, topics)
+        topics = [
+            TopicMetadata('topic_1', 0, [
+                PartitionMetadata('topic_1', 0, 1, [1, 2], [1, 2], 0)
+            ]),
+            TopicMetadata('topic_noleader', 0, [
+                PartitionMetadata('topic_noleader', 0, -1, [], [], 0),
+                PartitionMetadata('topic_noleader', 1, -1, [], [], 0)
+            ]),
+            TopicMetadata('topic_no_partitions', 0, []),
+            TopicMetadata('topic_3', 0, [
+                PartitionMetadata('topic_3', 0, 0, [0, 1], [0, 1], 0),
+                PartitionMetadata('topic_3', 1, 1, [1, 0], [1, 0], 0),
+                PartitionMetadata('topic_3', 2, 0, [0, 1], [0, 1], 0)
+            ])
+        ]
+        protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)

         # client loads metadata at init
         client = KafkaClient(hosts=['broker_1:4567'])
@@ -132,30 +140,35 @@ class TestKafkaClient(unittest2.TestCase):
         conn.recv.return_value = 'response'  # anything but None

-        brokers = {}
-        brokers[0] = BrokerMetadata(0, 'broker_1', 4567)
-        brokers[1] = BrokerMetadata(1, 'broker_2', 5678)
+        brokers = [
+            BrokerMetadata(0, 'broker_1', 4567),
+            BrokerMetadata(1, 'broker_2', 5678)
+        ]

-        topics = {'topic_no_partitions': {}}
-        protocol.decode_metadata_response.return_value = (brokers, topics)
+        topics = [
+            TopicMetadata('topic_no_partitions', 0, [])
+        ]
+        protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)

         client = KafkaClient(hosts=['broker_1:4567'])

         # topic metadata is loaded but empty
         self.assertDictEqual({}, client.topics_to_brokers)

-        topics['topic_no_partitions'] = {
-            0: PartitionMetadata('topic_no_partitions', 0, 0, [0, 1], [0, 1])
-        }
-        protocol.decode_metadata_response.return_value = (brokers, topics)
+        topics = [
+            TopicMetadata('topic_one_partition', 0, [
+                PartitionMetadata('topic_no_partition', 0, 0, [0, 1], [0, 1], 0)
+            ])
+        ]
+        protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)

         # calling _get_leader_for_partition (from any broker aware request)
         # will try loading metadata again for the same topic
-        leader = client._get_leader_for_partition('topic_no_partitions', 0)
+        leader = client._get_leader_for_partition('topic_one_partition', 0)

         self.assertEqual(brokers[0], leader)
         self.assertDictEqual({
-            TopicAndPartition('topic_no_partitions', 0): brokers[0]},
+            TopicAndPartition('topic_one_partition', 0): brokers[0]},
             client.topics_to_brokers)

     @patch('kafka.client.KafkaConnection')
@@ -165,12 +178,15 @@ class TestKafkaClient(unittest2.TestCase):
         conn.recv.return_value = 'response'  # anything but None

-        brokers = {}
-        brokers[0] = BrokerMetadata(0, 'broker_1', 4567)
-        brokers[1] = BrokerMetadata(1, 'broker_2', 5678)
+        brokers = [
+            BrokerMetadata(0, 'broker_1', 4567),
+            BrokerMetadata(1, 'broker_2', 5678)
+        ]

-        topics = {'topic_no_partitions': {}}
-        protocol.decode_metadata_response.return_value = (brokers, topics)
+        topics = [
+            TopicMetadata('topic_no_partitions', 0, [])
+        ]
+        protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)

         client = KafkaClient(hosts=['broker_1:4567'])
@@ -186,16 +202,18 @@ class TestKafkaClient(unittest2.TestCase):
         conn.recv.return_value = 'response'  # anything but None

-        brokers = {}
-        brokers[0] = BrokerMetadata(0, 'broker_1', 4567)
-        brokers[1] = BrokerMetadata(1, 'broker_2', 5678)
+        brokers = [
+            BrokerMetadata(0, 'broker_1', 4567),
+            BrokerMetadata(1, 'broker_2', 5678)
+        ]

-        topics = {}
-        topics['topic_noleader'] = {
-            0: PartitionMetadata('topic_noleader', 0, -1, [], []),
-            1: PartitionMetadata('topic_noleader', 1, -1, [], [])
-        }
-        protocol.decode_metadata_response.return_value = (brokers, topics)
+        topics = [
+            TopicMetadata('topic_noleader', 0, [
+                PartitionMetadata('topic_noleader', 0, -1, [], [], 0),
+                PartitionMetadata('topic_noleader', 1, -1, [], [], 0)
+            ]),
+        ]
+        protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)

         client = KafkaClient(hosts=['broker_1:4567'])
         self.assertDictEqual(
@@ -207,31 +225,35 @@ class TestKafkaClient(unittest2.TestCase):
         self.assertIsNone(client._get_leader_for_partition('topic_noleader', 0))
         self.assertIsNone(client._get_leader_for_partition('topic_noleader', 1))

-        topics['topic_noleader'] = {
-            0: PartitionMetadata('topic_noleader', 0, 0, [0, 1], [0, 1]),
-            1: PartitionMetadata('topic_noleader', 1, 1, [1, 0], [1, 0])
-        }
-        protocol.decode_metadata_response.return_value = (brokers, topics)
+        topics = [
+            TopicMetadata('topic_noleader', 0, [
+                PartitionMetadata('topic_noleader', 0, 0, [0, 1], [0, 1], 0),
+                PartitionMetadata('topic_noleader', 1, 1, [1, 0], [1, 0], 0)
+            ]),
+        ]
+        protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)

         self.assertEqual(brokers[0], client._get_leader_for_partition('topic_noleader', 0))
         self.assertEqual(brokers[1], client._get_leader_for_partition('topic_noleader', 1))

     @patch('kafka.client.KafkaConnection')
     @patch('kafka.client.KafkaProtocol')
     def test_send_produce_request_raises_when_noleader(self, protocol, conn):
-        "Send producer request raises LeaderUnavailableError if leader is not available"
+        "Send producer request raises LeaderNotAvailableError if leader is not available"

         conn.recv.return_value = 'response'  # anything but None

-        brokers = {}
-        brokers[0] = BrokerMetadata(0, 'broker_1', 4567)
-        brokers[1] = BrokerMetadata(1, 'broker_2', 5678)
+        brokers = [
+            BrokerMetadata(0, 'broker_1', 4567),
+            BrokerMetadata(1, 'broker_2', 5678)
+        ]

-        topics = {}
-        topics['topic_noleader'] = {
-            0: PartitionMetadata('topic_noleader', 0, -1, [], []),
-            1: PartitionMetadata('topic_noleader', 1, -1, [], [])
-        }
-        protocol.decode_metadata_response.return_value = (brokers, topics)
+        topics = [
+            TopicMetadata('topic_noleader', 0, [
+                PartitionMetadata('topic_noleader', 0, -1, [], [], 0),
+                PartitionMetadata('topic_noleader', 1, -1, [], [], 0)
+            ]),
+        ]
+        protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)

         client = KafkaClient(hosts=['broker_1:4567'])

test/test_protocol.py

@@ -11,10 +11,10 @@ from kafka.common import (
     OffsetRequest, OffsetCommitRequest, OffsetFetchRequest,
     OffsetResponse, OffsetCommitResponse, OffsetFetchResponse,
     ProduceRequest, FetchRequest, Message, ChecksumError,
-    ConsumerFetchSizeTooSmall, ProduceResponse, FetchResponse, OffsetAndMessage,
-    BrokerMetadata, PartitionMetadata, TopicAndPartition, KafkaUnavailableError,
-    ProtocolError, LeaderUnavailableError, PartitionUnavailableError,
-    UnsupportedCodecError
+    ProduceResponse, FetchResponse, OffsetAndMessage,
+    BrokerMetadata, TopicMetadata, PartitionMetadata, TopicAndPartition,
+    KafkaUnavailableError, PartitionUnavailableError,
+    UnsupportedCodecError, ConsumerFetchSizeTooSmall, ProtocolError,
 )
 from kafka.codec import (
     has_snappy, gzip_encode, gzip_decode,
@@ -454,21 +454,22 @@ class TestProtocol(unittest2.TestCase):
         self.assertEqual(encoded, expected)

-    def _create_encoded_metadata_response(self, broker_data, topic_data,
-                                          topic_errors, partition_errors):
-        encoded = struct.pack('>ii', 3, len(broker_data))
-        for node_id, broker in broker_data.iteritems():
-            encoded += struct.pack('>ih%dsi' % len(broker.host), node_id,
+    def _create_encoded_metadata_response(self, brokers, topics):
+        encoded = struct.pack('>ii', 3, len(brokers))
+        for broker in brokers:
+            encoded += struct.pack('>ih%dsi' % len(broker.host), broker.nodeId,
                                    len(broker.host), broker.host, broker.port)

-        encoded += struct.pack('>i', len(topic_data))
-        for topic, partitions in topic_data.iteritems():
-            encoded += struct.pack('>hh%dsi' % len(topic), topic_errors[topic],
-                                   len(topic), topic, len(partitions))
-            for partition, metadata in partitions.iteritems():
+        encoded += struct.pack('>i', len(topics))
+        for topic in topics:
+            encoded += struct.pack('>hh%dsi' % len(topic.topic),
+                                   topic.error, len(topic.topic),
+                                   topic.topic, len(topic.partitions))
+            for metadata in topic.partitions:
                 encoded += struct.pack('>hiii',
-                                       partition_errors[(topic, partition)],
-                                       partition, metadata.leader,
+                                       metadata.error,
+                                       metadata.partition,
+                                       metadata.leader,
                                        len(metadata.replicas))
                 if len(metadata.replicas) > 0:
                     encoded += struct.pack('>%di' % len(metadata.replicas),
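
The helper's format strings mirror the decoder exactly; for instance '>hh%dsi' packs the topic error code (int16), the name length (int16), the name bytes, and the partition count (int32):

    import struct
    data = struct.pack('>hh%dsi' % 6, 0, 6, 'topic1', 3)
    assert len(data) == 2 + 2 + 6 + 4  # error, length, name, partition count
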
@@ -478,35 +479,26 @@ class TestProtocol(unittest2.TestCase):
                 if len(metadata.isr) > 0:
                     encoded += struct.pack('>%di' % len(metadata.isr),
                                            *metadata.isr)

         return encoded

     def test_decode_metadata_response(self):
-        node_brokers = {
-            0: BrokerMetadata(0, "brokers1.kafka.rdio.com", 1000),
-            1: BrokerMetadata(1, "brokers1.kafka.rdio.com", 1001),
-            3: BrokerMetadata(3, "brokers2.kafka.rdio.com", 1000)
-        }
+        node_brokers = [
+            BrokerMetadata(0, "brokers1.kafka.rdio.com", 1000),
+            BrokerMetadata(1, "brokers1.kafka.rdio.com", 1001),
+            BrokerMetadata(3, "brokers2.kafka.rdio.com", 1000)
+        ]

-        topic_partitions = {
-            "topic1": {
-                0: PartitionMetadata("topic1", 0, 1, (0, 2), (2,)),
-                1: PartitionMetadata("topic1", 1, 3, (0, 1), (0, 1))
-            },
-            "topic2": {
-                0: PartitionMetadata("topic2", 0, 0, (), ())
-            }
-        }
-        topic_errors = {"topic1": 0, "topic2": 1}
-        partition_errors = {
-            ("topic1", 0): 0,
-            ("topic1", 1): 1,
-            ("topic2", 0): 0
-        }
+        topic_partitions = [
+            TopicMetadata("topic1", 0, [
+                PartitionMetadata("topic1", 0, 1, (0, 2), (2,), 0),
+                PartitionMetadata("topic1", 1, 3, (0, 1), (0, 1), 1)
+            ]),
+            TopicMetadata("topic2", 1, [
+                PartitionMetadata("topic2", 0, 0, (), (), 0),
+            ]),
+        ]
         encoded = self._create_encoded_metadata_response(node_brokers,
-                                                         topic_partitions,
-                                                         topic_errors,
-                                                         partition_errors)
+                                                         topic_partitions)

         decoded = KafkaProtocol.decode_metadata_response(encoded)
         self.assertEqual(decoded, (node_brokers, topic_partitions))