clean up after comments from @rdiomar
This commit is contained in:
@@ -10,6 +10,8 @@ from kafka.common import ConnectionError
|
||||
log = logging.getLogger("kafka")
|
||||
|
||||
DEFAULT_SOCKET_TIMEOUT_SECONDS = 120
|
||||
DEFAULT_KAFKA_PORT = 9092
|
||||
|
||||
|
||||
def collect_hosts(hosts, randomize=True):
|
||||
"""
|
||||
@@ -18,14 +20,14 @@ def collect_hosts(hosts, randomize=True):
|
||||
"""
|
||||
|
||||
if isinstance(hosts, str):
|
||||
hosts = hosts.split(',')
|
||||
hosts = hosts.strip().split(',')
|
||||
|
||||
result = []
|
||||
for host_port in hosts:
|
||||
|
||||
res = host_port.split(':')
|
||||
host = res[0]
|
||||
port = int(res[1]) if len(res) > 1 else 9092
|
||||
port = int(res[1]) if len(res) > 1 else DEFAULT_KAFKA_PORT
|
||||
result.append((host.strip(), port))
|
||||
|
||||
if randomize:
|
||||
@@ -105,7 +107,7 @@ class KafkaConnection(local):
|
||||
sent = self._sock.sendall(payload)
|
||||
if sent is not None:
|
||||
self._raise_connection_error()
|
||||
except socket.error, e:
|
||||
except socket.error:
|
||||
log.exception('Unable to send payload to Kafka')
|
||||
self._raise_connection_error()
|
||||
|
||||
|
@@ -3,7 +3,8 @@ import random
|
||||
import struct
|
||||
import unittest
|
||||
|
||||
from mock import patch
|
||||
from mock import MagicMock, patch
|
||||
|
||||
|
||||
from kafka import KafkaClient
|
||||
from kafka.common import (
|
||||
@@ -366,7 +367,6 @@ class TestProtocol(unittest.TestCase):
|
||||
def test_decode_offset_response(self):
|
||||
pass
|
||||
|
||||
|
||||
@unittest.skip("Not Implemented")
|
||||
def test_encode_offset_commit_request(self):
|
||||
pass
|
||||
@@ -409,26 +409,22 @@ class TestKafkaClient(unittest.TestCase):
|
||||
def test_send_broker_unaware_request_fail(self):
|
||||
'Tests that call fails when all hosts are unavailable'
|
||||
|
||||
from mock import MagicMock
|
||||
|
||||
mocked_conns = {
|
||||
('kafka01', 9092): MagicMock(),
|
||||
('kafka02', 9092): MagicMock()
|
||||
}
|
||||
# inject conns
|
||||
# inject KafkaConnection side effects
|
||||
mocked_conns[('kafka01', 9092)].send.side_effect = RuntimeError("kafka01 went away (unittest)")
|
||||
mocked_conns[('kafka02', 9092)].send.side_effect = RuntimeError("Kafka02 went away (unittest)")
|
||||
|
||||
def mock_get_conn(host, port):
|
||||
print 'mock_get_conn: %s:%d=%s' % (host, port, mocked_conns[(host, port)])
|
||||
return mocked_conns[(host, port)]
|
||||
|
||||
# patch to avoid making requests before we want it
|
||||
with patch.object(KafkaClient, 'load_metadata_for_topics'), \
|
||||
patch.object(KafkaClient, '_get_conn', side_effect=mock_get_conn):
|
||||
|
||||
client = KafkaClient(hosts=['kafka01:9092','kafka02:9092'])
|
||||
|
||||
client = KafkaClient(hosts=['kafka01:9092', 'kafka02:9092'])
|
||||
|
||||
self.assertRaises(
|
||||
KafkaUnavailableError,
|
||||
@@ -439,22 +435,19 @@ class TestKafkaClient(unittest.TestCase):
|
||||
conn.send.assert_called_with(1, 'fake request')
|
||||
|
||||
def test_send_broker_unaware_request(self):
|
||||
'Tests that call fails when one of the host is available'
|
||||
|
||||
from mock import MagicMock
|
||||
'Tests that call works when at least one of the host is available'
|
||||
|
||||
mocked_conns = {
|
||||
('kafka01', 9092): MagicMock(),
|
||||
('kafka02', 9092): MagicMock(),
|
||||
('kafka03', 9092): MagicMock()
|
||||
}
|
||||
# inject conns
|
||||
# inject KafkaConnection side effects
|
||||
mocked_conns[('kafka01', 9092)].send.side_effect = RuntimeError("kafka01 went away (unittest)")
|
||||
mocked_conns[('kafka02', 9092)].recv.return_value = 'valid response'
|
||||
mocked_conns[('kafka03', 9092)].send.side_effect = RuntimeError("kafka03 went away (unittest)")
|
||||
|
||||
def mock_get_conn(host, port):
|
||||
print 'mock_get_conn: %s:%d=%s' % (host, port, mocked_conns[(host, port)])
|
||||
return mocked_conns[(host, port)]
|
||||
|
||||
# patch to avoid making requests before we want it
|
||||
@@ -468,123 +461,6 @@ class TestKafkaClient(unittest.TestCase):
|
||||
self.assertEqual('valid response', resp)
|
||||
mocked_conns[('kafka02', 9092)].recv.assert_called_with(1)
|
||||
|
||||
@unittest.skip('requires disabling recursion on load_metadata_for_topics')
|
||||
@patch('kafka.client.KafkaConnection')
|
||||
@patch('kafka.client.KafkaProtocol')
|
||||
def test_client_load_metadata(self, protocol, conn):
|
||||
|
||||
conn.recv.return_value = 'response' # anything but None
|
||||
|
||||
brokers = {}
|
||||
brokers[0] = BrokerMetadata(1, 'broker_1', 4567)
|
||||
brokers[1] = BrokerMetadata(2, 'broker_2', 5678)
|
||||
|
||||
topics = {}
|
||||
topics['topic_1'] = {
|
||||
0: PartitionMetadata('topic_1', 0, 1, [1, 2], [1, 2])
|
||||
}
|
||||
topics['topic_2'] = {
|
||||
0: PartitionMetadata('topic_2', 0, 0, [0, 1], [0, 1]),
|
||||
1: PartitionMetadata('topic_2', 1, 1, [1, 0], [1, 0])
|
||||
}
|
||||
protocol.decode_metadata_response.return_value = (brokers, topics)
|
||||
|
||||
client = KafkaClient(hosts='broker_1:4567')
|
||||
self.assertItemsEqual(
|
||||
{
|
||||
TopicAndPartition('topic_1', 0): brokers[0],
|
||||
TopicAndPartition('topic_2', 0): brokers[0],
|
||||
TopicAndPartition('topic_2', 1): brokers[1]
|
||||
},
|
||||
client.topics_to_brokers)
|
||||
|
||||
@unittest.skip('requires disabling recursion on load_metadata_for_topics')
|
||||
@patch('kafka.client.KafkaConnection')
|
||||
@patch('kafka.client.KafkaProtocol')
|
||||
def test_client_load_metadata_unassigned_partitions(self, protocol, conn):
|
||||
|
||||
conn.recv.return_value = 'response' # anything but None
|
||||
|
||||
brokers = {}
|
||||
brokers[0] = BrokerMetadata(0, 'broker_1', 4567)
|
||||
brokers[1] = BrokerMetadata(1, 'broker_2', 5678)
|
||||
|
||||
topics = {}
|
||||
topics['topic_1'] = {
|
||||
0: PartitionMetadata('topic_1', 0, -1, [], [])
|
||||
}
|
||||
protocol.decode_metadata_response.return_value = (brokers, topics)
|
||||
|
||||
client = KafkaClient(hosts='broker_1:4567')
|
||||
|
||||
self.assertItemsEqual({}, client.topics_to_brokers)
|
||||
self.assertRaises(
|
||||
Exception,
|
||||
client._get_leader_for_partition,
|
||||
'topic_1', 0)
|
||||
|
||||
# calling _get_leader_for_partition (from any broker aware request)
|
||||
# will try loading metadata again for the same topic
|
||||
topics['topic_1'] = {
|
||||
0: PartitionMetadata('topic_1', 0, 0, [0, 1], [0, 1])
|
||||
}
|
||||
leader = client._get_leader_for_partition('topic_1', 0)
|
||||
|
||||
self.assertEqual(brokers[0], leader)
|
||||
self.assertItemsEqual(
|
||||
{
|
||||
TopicAndPartition('topic_1', 0): brokers[0],
|
||||
},
|
||||
client.topics_to_brokers)
|
||||
|
||||
@unittest.skip('requires disabling recursion on load_metadata_for_topics')
|
||||
@patch('kafka.client.KafkaConnection')
|
||||
@patch('kafka.client.KafkaProtocol')
|
||||
def test_client_load_metadata_noleader_partitions(self, protocol, conn):
|
||||
|
||||
conn.recv.return_value = 'response' # anything but None
|
||||
|
||||
brokers = {}
|
||||
brokers[0] = BrokerMetadata(0, 'broker_1', 4567)
|
||||
brokers[1] = BrokerMetadata(1, 'broker_2', 5678)
|
||||
|
||||
topics = {}
|
||||
topics['topic_1'] = {
|
||||
0: PartitionMetadata('topic_1', 0, -1, [], [])
|
||||
}
|
||||
topics['topic_2'] = {
|
||||
0: PartitionMetadata('topic_2', 0, 0, [0, 1], []),
|
||||
1: PartitionMetadata('topic_2', 1, 1, [1, 0], [1, 0])
|
||||
}
|
||||
protocol.decode_metadata_response.return_value = (brokers, topics)
|
||||
|
||||
client = KafkaClient(hosts='broker_1:4567')
|
||||
self.assertItemsEqual(
|
||||
{
|
||||
TopicAndPartition('topic_2', 0): brokers[0],
|
||||
TopicAndPartition('topic_2', 1): brokers[1]
|
||||
},
|
||||
client.topics_to_brokers)
|
||||
self.assertRaises(
|
||||
Exception,
|
||||
client._get_leader_for_partition,
|
||||
'topic_1', 0)
|
||||
|
||||
# calling _get_leader_for_partition (from any broker aware request)
|
||||
# will try loading metadata again for the same topic
|
||||
topics['topic_1'] = {
|
||||
0: PartitionMetadata('topic_1', 0, 0, [0, 1], [0, 1])
|
||||
}
|
||||
leader = client._get_leader_for_partition('topic_1', 0)
|
||||
|
||||
self.assertEqual(brokers[0], leader)
|
||||
self.assertItemsEqual(
|
||||
{
|
||||
TopicAndPartition('topic_1', 0): brokers[0],
|
||||
TopicAndPartition('topic_2', 0): brokers[0],
|
||||
TopicAndPartition('topic_2', 1): brokers[1]
|
||||
},
|
||||
client.topics_to_brokers)
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
Reference in New Issue
Block a user