Add rack to BrokerMetadata - it is always None when using MetadataRequest v0

This commit is contained in:
Dana Powers
2016-07-16 11:58:58 -07:00
parent 72bcadcaf1
commit 9960f3d8d2
5 changed files with 52 additions and 40 deletions

View File

@@ -137,7 +137,7 @@ class SimpleClient(object):
kafka.errors.check_error(resp)
# Otherwise return the BrokerMetadata
return BrokerMetadata(resp.nodeId, resp.host, resp.port)
return BrokerMetadata(resp.nodeId, resp.host, resp.port, None)
def _next_id(self):
"""Generate a new correlation id"""
@@ -525,7 +525,7 @@ class SimpleClient(object):
log.debug('Updating broker metadata: %s', resp.brokers)
log.debug('Updating topic metadata: %s', [topic for _, topic, _ in resp.topics])
self.brokers = dict([(nodeId, BrokerMetadata(nodeId, host, port))
self.brokers = dict([(nodeId, BrokerMetadata(nodeId, host, port, None))
for nodeId, host, port in resp.brokers])
for error, topic, partitions in resp.topics:
@@ -577,7 +577,7 @@ class SimpleClient(object):
# (not sure how this could happen. server could be in bad state)
else:
self.topics_to_brokers[topic_part] = BrokerMetadata(
leader, None, None
leader, None, None, None
)
def send_metadata_request(self, payloads=[], fail_on_error=True,

View File

@@ -189,7 +189,7 @@ class ClusterMetadata(object):
for node_id, host, port in metadata.brokers:
self._brokers.update({
node_id: BrokerMetadata(node_id, host, port)
node_id: BrokerMetadata(node_id, host, port, None)
})
_new_partitions = {}
@@ -272,7 +272,8 @@ class ClusterMetadata(object):
coordinator = BrokerMetadata(
response.coordinator_id,
response.host,
response.port)
response.port,
None)
# Assume that group coordinators are just brokers
# (this is true now, but could diverge in future)
@@ -281,12 +282,14 @@ class ClusterMetadata(object):
# If this happens, either brokers have moved without
# changing IDs, or our assumption above is wrong
elif coordinator != self._brokers[node_id]:
log.error("GroupCoordinator metadata conflicts with existing"
" broker metadata. Coordinator: %s, Broker: %s",
coordinator, self._brokers[node_id])
self._groups[group] = node_id
return False
else:
node = self._brokers[node_id]
if coordinator.host != node.host or coordinator.port != node.port:
log.error("GroupCoordinator metadata conflicts with existing"
" broker metadata. Coordinator: %s, Broker: %s",
coordinator, node)
self._groups[group] = node_id
return False
log.info("Group coordinator for %s is %s", group, coordinator)
self._groups[group] = node_id

View File

@@ -58,7 +58,7 @@ TopicPartition = namedtuple("TopicPartition",
["topic", "partition"])
BrokerMetadata = namedtuple("BrokerMetadata",
["nodeId", "host", "port"])
["nodeId", "host", "port", "rack"])
PartitionMetadata = namedtuple("PartitionMetadata",
["topic", "partition", "leader", "replicas", "isr", "error"])

View File

@@ -1,6 +1,7 @@
import socket
from mock import ANY, MagicMock, patch
from operator import itemgetter
import six
from . import unittest
@@ -117,9 +118,10 @@ class TestSimpleClient(unittest.TestCase):
mock_conn(conn)
brokers = [
BrokerMetadata(0, 'broker_1', 4567),
BrokerMetadata(1, 'broker_2', 5678)
BrokerMetadata(0, 'broker_1', 4567, None),
BrokerMetadata(1, 'broker_2', 5678, None)
]
resp0_brokers = list(map(itemgetter(0, 1, 2), brokers))
topics = [
(NO_ERROR, 'topic_1', [
@@ -137,7 +139,7 @@ class TestSimpleClient(unittest.TestCase):
(NO_ERROR, 2, 0, [0, 1], [0, 1])
])
]
protocol.decode_metadata_response.return_value = MetadataResponse[0](brokers, topics)
protocol.decode_metadata_response.return_value = MetadataResponse[0](resp0_brokers, topics)
# client loads metadata at init
client = SimpleClient(hosts=['broker_1:4567'])
@@ -167,9 +169,10 @@ class TestSimpleClient(unittest.TestCase):
mock_conn(conn)
brokers = [
BrokerMetadata(0, 'broker_1', 4567),
BrokerMetadata(1, 'broker_2', 5678)
BrokerMetadata(0, 'broker_1', 4567, None),
BrokerMetadata(1, 'broker_2', 5678, None)
]
resp0_brokers = list(map(itemgetter(0, 1, 2), brokers))
topics = [
(NO_LEADER, 'topic_still_creating', []),
@@ -179,7 +182,7 @@ class TestSimpleClient(unittest.TestCase):
(NO_LEADER, 1, -1, [], []),
]),
]
protocol.decode_metadata_response.return_value = MetadataResponse[0](brokers, topics)
protocol.decode_metadata_response.return_value = MetadataResponse[0](resp0_brokers, topics)
client = SimpleClient(hosts=['broker_1:4567'])
@@ -197,9 +200,10 @@ class TestSimpleClient(unittest.TestCase):
mock_conn(conn)
brokers = [
BrokerMetadata(0, 'broker_1', 4567),
BrokerMetadata(1, 'broker_2', 5678)
BrokerMetadata(0, 'broker_1', 4567, None),
BrokerMetadata(1, 'broker_2', 5678, None)
]
resp0_brokers = list(map(itemgetter(0, 1, 2), brokers))
topics = [
(NO_LEADER, 'topic_still_creating', []),
@@ -209,7 +213,7 @@ class TestSimpleClient(unittest.TestCase):
(NO_LEADER, 1, -1, [], []),
]),
]
decode_metadata_response.return_value = MetadataResponse[0](brokers, topics)
decode_metadata_response.return_value = MetadataResponse[0](resp0_brokers, topics)
client = SimpleClient(hosts=['broker_1:4567'])
@@ -230,14 +234,15 @@ class TestSimpleClient(unittest.TestCase):
mock_conn(conn)
brokers = [
BrokerMetadata(0, 'broker_1', 4567),
BrokerMetadata(1, 'broker_2', 5678)
BrokerMetadata(0, 'broker_1', 4567, None),
BrokerMetadata(1, 'broker_2', 5678, None)
]
resp0_brokers = list(map(itemgetter(0, 1, 2), brokers))
topics = [
(NO_LEADER, 'topic_no_partitions', [])
]
protocol.decode_metadata_response.return_value = MetadataResponse[0](brokers, topics)
protocol.decode_metadata_response.return_value = MetadataResponse[0](resp0_brokers, topics)
client = SimpleClient(hosts=['broker_1:4567'])
@@ -249,7 +254,7 @@ class TestSimpleClient(unittest.TestCase):
(NO_ERROR, 0, 0, [0, 1], [0, 1])
])
]
protocol.decode_metadata_response.return_value = MetadataResponse[0](brokers, topics)
protocol.decode_metadata_response.return_value = MetadataResponse[0](resp0_brokers, topics)
# calling _get_leader_for_partition (from any broker aware request)
# will try loading metadata again for the same topic
@@ -267,15 +272,16 @@ class TestSimpleClient(unittest.TestCase):
mock_conn(conn)
brokers = [
BrokerMetadata(0, 'broker_1', 4567),
BrokerMetadata(1, 'broker_2', 5678)
BrokerMetadata(0, 'broker_1', 4567, None),
BrokerMetadata(1, 'broker_2', 5678, None)
]
resp0_brokers = list(map(itemgetter(0, 1, 2), brokers))
topics = [
(NO_LEADER, 'topic_no_partitions', []),
(UNKNOWN_TOPIC_OR_PARTITION, 'topic_unknown', []),
]
protocol.decode_metadata_response.return_value = MetadataResponse[0](brokers, topics)
protocol.decode_metadata_response.return_value = MetadataResponse[0](resp0_brokers, topics)
client = SimpleClient(hosts=['broker_1:4567'])
@@ -294,9 +300,10 @@ class TestSimpleClient(unittest.TestCase):
mock_conn(conn)
brokers = [
BrokerMetadata(0, 'broker_1', 4567),
BrokerMetadata(1, 'broker_2', 5678)
BrokerMetadata(0, 'broker_1', 4567, None),
BrokerMetadata(1, 'broker_2', 5678, None)
]
resp0_brokers = list(map(itemgetter(0, 1, 2), brokers))
topics = [
(NO_ERROR, 'topic_noleader', [
@@ -304,7 +311,7 @@ class TestSimpleClient(unittest.TestCase):
(NO_LEADER, 1, -1, [], []),
]),
]
protocol.decode_metadata_response.return_value = MetadataResponse[0](brokers, topics)
protocol.decode_metadata_response.return_value = MetadataResponse[0](resp0_brokers, topics)
client = SimpleClient(hosts=['broker_1:4567'])
self.assertDictEqual(
@@ -330,7 +337,7 @@ class TestSimpleClient(unittest.TestCase):
(NO_ERROR, 1, 1, [1, 0], [1, 0])
]),
]
protocol.decode_metadata_response.return_value = MetadataResponse[0](brokers, topics)
protocol.decode_metadata_response.return_value = MetadataResponse[0](resp0_brokers, topics)
self.assertEqual(brokers[0], client._get_leader_for_partition('topic_noleader', 0))
self.assertEqual(brokers[1], client._get_leader_for_partition('topic_noleader', 1))
@@ -340,9 +347,10 @@ class TestSimpleClient(unittest.TestCase):
mock_conn(conn)
brokers = [
BrokerMetadata(0, 'broker_1', 4567),
BrokerMetadata(1, 'broker_2', 5678)
BrokerMetadata(0, 'broker_1', 4567, None),
BrokerMetadata(1, 'broker_2', 5678, None)
]
resp0_brokers = list(map(itemgetter(0, 1, 2), brokers))
topics = [
(NO_ERROR, 'topic_noleader', [
@@ -350,7 +358,7 @@ class TestSimpleClient(unittest.TestCase):
(NO_LEADER, 1, -1, [], []),
]),
]
protocol.decode_metadata_response.return_value = MetadataResponse[0](brokers, topics)
protocol.decode_metadata_response.return_value = MetadataResponse[0](resp0_brokers, topics)
client = SimpleClient(hosts=['broker_1:4567'])
@@ -368,14 +376,15 @@ class TestSimpleClient(unittest.TestCase):
mock_conn(conn)
brokers = [
BrokerMetadata(0, 'broker_1', 4567),
BrokerMetadata(1, 'broker_2', 5678)
BrokerMetadata(0, 'broker_1', 4567, None),
BrokerMetadata(1, 'broker_2', 5678, None)
]
resp0_brokers = list(map(itemgetter(0, 1, 2), brokers))
topics = [
(UNKNOWN_TOPIC_OR_PARTITION, 'topic_doesnt_exist', []),
]
protocol.decode_metadata_response.return_value = MetadataResponse[0](brokers, topics)
protocol.decode_metadata_response.return_value = MetadataResponse[0](resp0_brokers, topics)
client = SimpleClient(hosts=['broker_1:4567'])

View File

@@ -53,8 +53,8 @@ def test_bootstrap_success(conn):
conn.connect.assert_called_with()
conn.send.assert_called_once_with(MetadataRequest[0]([]))
assert cli._bootstrap_fails == 0
assert cli.cluster.brokers() == set([BrokerMetadata(0, 'foo', 12),
BrokerMetadata(1, 'bar', 34)])
assert cli.cluster.brokers() == set([BrokerMetadata(0, 'foo', 12, None),
BrokerMetadata(1, 'bar', 34, None)])
def test_bootstrap_failure(conn):
conn.state = ConnectionStates.DISCONNECTED