Allow KafkaClient to take in a list of brokers for bootstrapping

Marc Labbe
2013-11-14 09:26:49 -05:00
parent af3a57edb2
commit 0bdff4e833
8 changed files with 257 additions and 40 deletions
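In short: instead of a single `host, port` pair, `KafkaClient` now accepts a comma-separated string of bootstrap brokers and connects lazily to whichever one answers first. A minimal before/after sketch (broker names are placeholders, not from the commit):

```python
from kafka.client import KafkaClient

# Before this commit:
#   kafka = KafkaClient("localhost", 9092)

# After: one string with one or more host:port entries
# (the port defaults to 9092 when omitted).
kafka = KafkaClient("kafka01:9092,kafka02:9092,kafka03")
```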

View File

@@ -30,7 +30,7 @@ from kafka.client import KafkaClient
 from kafka.consumer import SimpleConsumer
 from kafka.producer import SimpleProducer, KeyedProducer
-kafka = KafkaClient("localhost", 9092)
+kafka = KafkaClient("localhost:9092")
 # To send messages synchronously
 producer = SimpleProducer(kafka, "my-topic")
@@ -81,7 +81,7 @@ from kafka.client import KafkaClient
 from kafka.producer import KeyedProducer
 from kafka.partitioner import HashedPartitioner, RoundRobinPartitioner
-kafka = KafkaClient("localhost", 9092)
+kafka = KafkaClient("localhost:9092")
 # HashedPartitioner is default
 producer = KeyedProducer(kafka, "my-topic")
@@ -96,7 +96,7 @@ producer = KeyedProducer(kafka, "my-topic", partitioner=RoundRobinPartitioner)
 from kafka.client import KafkaClient
 from kafka.consumer import MultiProcessConsumer
-kafka = KafkaClient("localhost", 9092)
+kafka = KafkaClient("localhost:9092")
 # This will split the number of partitions among two processes
 consumer = MultiProcessConsumer(kafka, "my-group", "my-topic", num_procs=2)
@@ -116,7 +116,7 @@ for message in consumer.get_messages(count=5, block=True, timeout=4):
```python ```python
from kafka.client import KafkaClient from kafka.client import KafkaClient
kafka = KafkaClient("localhost", 9092) kafka = KafkaClient("localhost:9092")
req = ProduceRequest(topic="my-topic", partition=1, req = ProduceRequest(topic="my-topic", partition=1,
messages=[KafkaProdocol.encode_message("some message")]) messages=[KafkaProdocol.encode_message("some message")])
resps = kafka.send_produce_request(payloads=[req], fail_on_error=True) resps = kafka.send_produce_request(payloads=[req], fail_on_error=True)

View File

@@ -14,7 +14,7 @@ def consume_example(client):
         print(message)
 
 def main():
-    client = KafkaClient("localhost", 9092)
+    client = KafkaClient("localhost:9092")
     produce_example(client)
     consume_example(client)

View File

@@ -18,7 +18,7 @@ There are a few levels of abstraction:
 # Possible API
 
-client = KafkaClient("localhost", 9092)
+client = KafkaClient("localhost:9092")
 producer = KafkaProducer(client, "topic")
 producer.send_string("hello")

View File

@@ -8,7 +8,7 @@ import time
 from kafka.common import ErrorMapping, TopicAndPartition
 from kafka.common import ConnectionError, FailedPayloadsException
-from kafka.conn import KafkaConnection
+from kafka.conn import collect_hosts, KafkaConnection
 from kafka.protocol import KafkaProtocol
 
 log = logging.getLogger("kafka")
@@ -19,13 +19,15 @@ class KafkaClient(object):
     CLIENT_ID = "kafka-python"
     ID_GEN = count()
 
-    def __init__(self, host, port, bufsize=4096, client_id=CLIENT_ID):
+    def __init__(self, hosts, bufsize=4096, client_id=CLIENT_ID):
         # We need one connection to bootstrap
         self.bufsize = bufsize
         self.client_id = client_id
-        self.conns = {               # (host, port) -> KafkaConnection
-            (host, port): KafkaConnection(host, port, bufsize)
-        }
+
+        self.hosts = collect_hosts(hosts)
+
+        # create connections only when we need them
+        self.conns = {}
         self.brokers = {}            # broker_id -> BrokerMetadata
         self.topics_to_brokers = {}  # topic_id -> broker_id
         self.topic_partitions = defaultdict(list)  # topic_id -> [0, 1, 2, ...]
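The constructor now just parses and stores the host list; sockets are opened on demand by `_get_conn` below. A quick illustrative session (hypothetical broker names; the bootstrap metadata call is patched out the same way the new unit tests at the bottom of this commit do, so no real broker is needed):

```python
from mock import patch
from kafka.client import KafkaClient

# Patch out the metadata request so __init__ makes no network calls.
with patch.object(KafkaClient, '_load_metadata_for_topics'):
    client = KafkaClient(hosts='kafka01:9092,kafka02')
    print(client.hosts)  # e.g. [('kafka02', 9092), ('kafka01', 9092)] -- shuffled
    print(client.conns)  # {} -- no connections opened yet
```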
@@ -35,15 +37,19 @@ class KafkaClient(object):
     #   Private API  #
     ##################
 
-    def _get_conn_for_broker(self, broker):
-        """
-        Get or create a connection to a broker
-        """
-        if (broker.host, broker.port) not in self.conns:
-            self.conns[(broker.host, broker.port)] = \
-                KafkaConnection(broker.host, broker.port, self.bufsize)
+    def _get_conn(self, host, port):
+        "Get or create a connection to a broker using host and port"
 
-        return self.conns[(broker.host, broker.port)]
+        host_key = (host, port)
+        if host_key not in self.conns:
+            self.conns[host_key] = KafkaConnection(host, port, self.bufsize)
+
+        return self.conns[host_key]
+
+    def _get_conn_for_broker(self, broker):
+        "Get or create a connection to a broker"
+        return self._get_conn(broker.host, broker.port)
 
     def _get_leader_for_partition(self, topic, partition):
         key = TopicAndPartition(topic, partition)
@@ -108,7 +114,8 @@ class KafkaClient(object):
         Attempt to send a broker-agnostic request to one of the available
         brokers. Keep trying until you succeed.
         """
-        for conn in self.conns.values():
+        for (host, port) in self.hosts:
+            conn = self._get_conn(host, port)
             try:
                 conn.send(requestId, request)
                 response = conn.recv(requestId)
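Because the bootstrap hosts are now kept separately from open connections, the broker-agnostic path can walk the whole list even before any connection exists. A simplified sketch of that loop's behavior (function and variable names are illustrative, not the module's):

```python
def send_unaware(hosts, get_conn, request_id, request):
    # Try each bootstrap host in turn; return the first response,
    # or None if every host fails (the new unit tests below assert
    # None when all brokers are down).
    for host, port in hosts:
        conn = get_conn(host, port)
        try:
            conn.send(request_id, request)
            return conn.recv(request_id)
        except Exception as e:
            # Log and fall through to the next host.
            print('Could not send request to %s:%d: %s' % (host, port, e))
    return None
```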
@@ -174,7 +181,7 @@ class KafkaClient(object):
             except ConnectionError, e:  # ignore BufferUnderflow for now
                 log.warning("Could not send request [%s] to server %s: %s" % (request, conn, e))
                 failed_payloads += payloads
                 self.topics_to_brokers = {}  # reset metadata
                 continue
 
             for response in decoder_fn(response):

View File

@@ -2,6 +2,7 @@ import copy
 import logging
 import socket
 import struct
+from random import shuffle
 from threading import local
 
 from kafka.common import BufferUnderflowError
@@ -10,6 +11,26 @@ from kafka.common import ConnectionError
 log = logging.getLogger("kafka")
 
+
+def collect_hosts(hosts, randomize=True):
+    """
+    Collects a comma-separated set of hosts (host:port) and optionally
+    randomizes the returned list.
+    """
+
+    result = []
+    for host_port in hosts.split(","):
+        res = host_port.split(':')
+        host = res[0]
+        port = int(res[1]) if len(res) > 1 else 9092
+        result.append((host.strip(), port))
+
+    if randomize:
+        shuffle(result)
+
+    return result
+
+
 class KafkaConnection(local):
     """
     A socket connection to a single Kafka broker
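A note on `collect_hosts`: ports may be omitted (9092 is assumed) and whitespace around host names is stripped; by default the list is shuffled so bootstrapping load spreads across brokers. A small usage sketch (shuffling disabled so the output order is deterministic):

```python
from kafka.conn import collect_hosts

hosts = collect_hosts('kafka01:9092, kafka02, kafka03:9093', randomize=False)
print(hosts)  # [('kafka01', 9092), ('kafka02', 9092), ('kafka03', 9093)]
```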
@@ -19,14 +40,14 @@ class KafkaConnection(local):
     we can do something in here to facilitate multiplexed requests/responses
     since the Kafka API includes a correlation id.
     """
-    def __init__(self, host, port, bufsize=4096):
+    def __init__(self, host, port, bufsize=4096, timeout=10):
         super(KafkaConnection, self).__init__()
         self.host = host
         self.port = port
         self.bufsize = bufsize
-        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-        self._sock.connect((host, port))
-        self._sock.settimeout(10)
+        self.timeout = timeout
+
+        self._sock = socket.create_connection((host, port), timeout=timeout)
         self._dirty = False
 
     def __str__(self):
@@ -125,7 +146,5 @@ class KafkaConnection(local):
         Re-initialize the socket connection
         """
         self.close()
-        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-        self._sock.connect((self.host, self.port))
-        self._sock.settimeout(10)
+        self._sock = socket.create_connection((self.host, self.port), timeout=self.timeout)
         self._dirty = False
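`socket.create_connection` folds the three-step socket setup into one call, applies the timeout to `connect()` itself (the old code only set it afterwards), and tries every address DNS returns for the host, including IPv6. Roughly, the equivalence is this (plain standard-library behavior, not code from this commit; assumes something is listening on localhost:9092):

```python
import socket

host, port, timeout = 'localhost', 9092, 10

# Old style: the timeout only takes effect after connect() has already blocked.
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((host, port))
sock.settimeout(timeout)

# New style: one call, the timeout covers connect(), and every
# resolved address is tried before giving up.
sock2 = socket.create_connection((host, port), timeout=timeout)
```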

View File

@@ -1,11 +1,12 @@
+import os.path
 import sys
 
 from setuptools import setup, Command
 
 class Tox(Command):
     user_options = []
 
     def initialize_options(self):
         pass
@@ -21,7 +22,7 @@ setup(
     name="kafka-python",
     version="0.8.1-1",
-    install_requires=["distribute", "tox"],
+    install_requires=["distribute", "tox", "mock"],
     tests_require=["tox"],
     cmdclass={"test": Tox},

View File

@@ -16,7 +16,7 @@ class TestKafkaClient(unittest.TestCase):
     def setUpClass(cls):  # noqa
         cls.zk = ZookeeperFixture.instance()
         cls.server = KafkaFixture.instance(0, cls.zk.host, cls.zk.port)
-        cls.client = KafkaClient(cls.server.host, cls.server.port)
+        cls.client = KafkaClient('%s:%d' % (cls.server.host, cls.server.port))
 
     @classmethod
     def tearDownClass(cls):  # noqa
@@ -554,7 +554,7 @@ class TestConsumer(unittest.TestCase):
         cls.zk = ZookeeperFixture.instance()
         cls.server1 = KafkaFixture.instance(0, cls.zk.host, cls.zk.port)
         cls.server2 = KafkaFixture.instance(1, cls.zk.host, cls.zk.port)
-        cls.client = KafkaClient(cls.server2.host, cls.server2.port, bufsize=8192)
+        cls.client = KafkaClient('%s:%d' % (cls.server2.host, cls.server2.port), bufsize=8192)
 
     @classmethod
     def tearDownClass(cls):  # noqa
@@ -770,20 +770,23 @@ class TestConsumer(unittest.TestCase):
             self.assertEquals(all_messages[i], message.message)
         self.assertEquals(i, 19)
 
 class TestFailover(unittest.TestCase):
 
     @classmethod
     def setUpClass(cls):
 
         zk_chroot = random_string(10)
         replicas = 2
         partitions = 2
 
         # mini zookeeper, 2 kafka brokers
         cls.zk = ZookeeperFixture.instance()
         kk_args = [cls.zk.host, cls.zk.port, zk_chroot, replicas, partitions]
         cls.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)]
-        cls.client = KafkaClient(cls.brokers[0].host, cls.brokers[0].port)
+
+        hosts = ','.join(['%s:%d' % (b.host, b.port) for b in cls.brokers])
+        cls.client = KafkaClient(hosts)
 
     @classmethod
     def tearDownClass(cls):
@@ -858,17 +861,19 @@ class TestFailover(unittest.TestCase):
             resp = producer.send_messages(random_string(10))
             if len(resp) > 0:
                 self.assertEquals(resp[0].error, 0)
         time.sleep(1)  # give it some time
 
     def _kill_leader(self, topic, partition):
         leader = self.client.topics_to_brokers[TopicAndPartition(topic, partition)]
         broker = self.brokers[leader.nodeId]
         broker.close()
         time.sleep(1)  # give it some time
         return broker
 
     def _count_messages(self, group, topic):
-        client = KafkaClient(self.brokers[0].host, self.brokers[0].port)
+
+        hosts = '%s:%d' % (self.brokers[0].host, self.brokers[0].port)
+        client = KafkaClient(hosts)
+
         consumer = SimpleConsumer(client, group, topic, auto_commit=False)
         all_messages = []
         for message in consumer:

View File

@@ -3,13 +3,18 @@ import random
 import struct
 import unittest
 
+from mock import patch
+
 from kafka.client import KafkaClient
-from kafka.common import ProduceRequest, FetchRequest
 from kafka.codec import (
     has_gzip, has_snappy,
     gzip_encode, gzip_decode,
     snappy_encode, snappy_decode
 )
+from kafka.common import (
+    ProduceRequest, FetchRequest,
+    BrokerMetadata, PartitionMetadata, TopicAndPartition
+)
 
 ITERATIONS = 1000
 STRLEN = 100
@@ -217,5 +222,185 @@ class TestRequests(unittest.TestCase):
         self.assertEquals(enc, expect)
 
+
+class TestKafkaClient(unittest.TestCase):
+
+    def test_send_broker_unaware_request_fail(self):
+        'Tests that the request fails when all hosts are unavailable'
+
+        from mock import MagicMock
+
+        mocked_conns = {
+            ('kafka01', 9092): MagicMock(),
+            ('kafka02', 9092): MagicMock()
+        }
+        # inject conns
+        mocked_conns[('kafka01', 9092)].send.side_effect = RuntimeError("kafka01 went away (unittest)")
+        mocked_conns[('kafka02', 9092)].send.side_effect = RuntimeError("Kafka02 went away (unittest)")
+
+        def mock_get_conn(host, port):
+            print 'mock_get_conn: %s:%d=%s' % (host, port, mocked_conns[(host, port)])
+            return mocked_conns[(host, port)]
+
+        # patch to avoid making requests before we want it
+        with patch.object(KafkaClient, '_load_metadata_for_topics'), \
+                patch.object(KafkaClient, '_get_conn', side_effect=mock_get_conn):
+
+            client = KafkaClient(hosts='kafka01:9092,kafka02:9092')
+
+            resp = client._send_broker_unaware_request(1, 'fake request')
+
+            self.assertIsNone(resp)
+
+            for key, conn in mocked_conns.iteritems():
+                conn.send.assert_called_with(1, 'fake request')
+
+    def test_send_broker_unaware_request(self):
+        'Tests that the request succeeds when at least one host is available'
+
+        from mock import MagicMock
+
+        mocked_conns = {
+            ('kafka01', 9092): MagicMock(),
+            ('kafka02', 9092): MagicMock(),
+            ('kafka03', 9092): MagicMock()
+        }
+        # inject conns
+        mocked_conns[('kafka01', 9092)].send.side_effect = RuntimeError("kafka01 went away (unittest)")
+        mocked_conns[('kafka02', 9092)].recv.return_value = 'valid response'
+        mocked_conns[('kafka03', 9092)].send.side_effect = RuntimeError("kafka03 went away (unittest)")
+
+        def mock_get_conn(host, port):
+            print 'mock_get_conn: %s:%d=%s' % (host, port, mocked_conns[(host, port)])
+            return mocked_conns[(host, port)]
+
+        # patch to avoid making requests before we want it
+        with patch.object(KafkaClient, '_load_metadata_for_topics'), \
+                patch.object(KafkaClient, '_get_conn', side_effect=mock_get_conn):
+
+            client = KafkaClient(hosts='kafka01:9092,kafka02:9092')
+
+            resp = client._send_broker_unaware_request(1, 'fake request')
+
+            self.assertEqual('valid response', resp)
+            mocked_conns[('kafka02', 9092)].recv.assert_called_with(1)
+
+    @unittest.skip('requires disabling recursion on _load_metadata_for_topics')
+    @patch('kafka.client.KafkaConnection')
+    @patch('kafka.client.KafkaProtocol')
+    def test_client_load_metadata(self, protocol, conn):
+
+        conn.recv.return_value = 'response'  # anything but None
+
+        brokers = {}
+        brokers[0] = BrokerMetadata(1, 'broker_1', 4567)
+        brokers[1] = BrokerMetadata(2, 'broker_2', 5678)
+
+        topics = {}
+        topics['topic_1'] = {
+            0: PartitionMetadata('topic_1', 0, 1, [1, 2], [1, 2])
+        }
+        topics['topic_2'] = {
+            0: PartitionMetadata('topic_2', 0, 0, [0, 1], [0, 1]),
+            1: PartitionMetadata('topic_2', 1, 1, [1, 0], [1, 0])
+        }
+        protocol.decode_metadata_response.return_value = (brokers, topics)
+
+        client = KafkaClient(hosts='broker_1:4567')
+        self.assertItemsEqual(
+            {
+                TopicAndPartition('topic_1', 0): brokers[0],
+                TopicAndPartition('topic_2', 0): brokers[0],
+                TopicAndPartition('topic_2', 1): brokers[1]
+            },
+            client.topics_to_brokers)
+
+    @unittest.skip('requires disabling recursion on _load_metadata_for_topics')
+    @patch('kafka.client.KafkaConnection')
+    @patch('kafka.client.KafkaProtocol')
+    def test_client_load_metadata_unassigned_partitions(self, protocol, conn):
+
+        conn.recv.return_value = 'response'  # anything but None
+
+        brokers = {}
+        brokers[0] = BrokerMetadata(0, 'broker_1', 4567)
+        brokers[1] = BrokerMetadata(1, 'broker_2', 5678)
+
+        topics = {}
+        topics['topic_1'] = {
+            0: PartitionMetadata('topic_1', 0, -1, [], [])
+        }
+        protocol.decode_metadata_response.return_value = (brokers, topics)
+
+        client = KafkaClient(hosts='broker_1:4567')
+
+        self.assertItemsEqual({}, client.topics_to_brokers)
+        self.assertRaises(
+            Exception,
+            client._get_leader_for_partition,
+            'topic_1', 0)
+
+        # calling _get_leader_for_partition (from any broker aware request)
+        # will try loading metadata again for the same topic
+        topics['topic_1'] = {
+            0: PartitionMetadata('topic_1', 0, 0, [0, 1], [0, 1])
+        }
+        leader = client._get_leader_for_partition('topic_1', 0)
+
+        self.assertEqual(brokers[0], leader)
+        self.assertItemsEqual(
+            {
+                TopicAndPartition('topic_1', 0): brokers[0],
+            },
+            client.topics_to_brokers)
+
+    @unittest.skip('requires disabling recursion on _load_metadata_for_topics')
+    @patch('kafka.client.KafkaConnection')
+    @patch('kafka.client.KafkaProtocol')
+    def test_client_load_metadata_noleader_partitions(self, protocol, conn):
+
+        conn.recv.return_value = 'response'  # anything but None
+
+        brokers = {}
+        brokers[0] = BrokerMetadata(0, 'broker_1', 4567)
+        brokers[1] = BrokerMetadata(1, 'broker_2', 5678)
+
+        topics = {}
+        topics['topic_1'] = {
+            0: PartitionMetadata('topic_1', 0, -1, [], [])
+        }
+        topics['topic_2'] = {
+            0: PartitionMetadata('topic_2', 0, 0, [0, 1], []),
+            1: PartitionMetadata('topic_2', 1, 1, [1, 0], [1, 0])
+        }
+        protocol.decode_metadata_response.return_value = (brokers, topics)
+
+        client = KafkaClient(hosts='broker_1:4567')
+        self.assertItemsEqual(
+            {
+                TopicAndPartition('topic_2', 0): brokers[0],
+                TopicAndPartition('topic_2', 1): brokers[1]
+            },
+            client.topics_to_brokers)
+        self.assertRaises(
+            Exception,
+            client._get_leader_for_partition,
+            'topic_1', 0)
+
+        # calling _get_leader_for_partition (from any broker aware request)
+        # will try loading metadata again for the same topic
+        topics['topic_1'] = {
+            0: PartitionMetadata('topic_1', 0, 0, [0, 1], [0, 1])
+        }
+        leader = client._get_leader_for_partition('topic_1', 0)
+
+        self.assertEqual(brokers[0], leader)
+        self.assertItemsEqual(
+            {
+                TopicAndPartition('topic_1', 0): brokers[0],
+                TopicAndPartition('topic_2', 0): brokers[0],
+                TopicAndPartition('topic_2', 1): brokers[1]
+            },
+            client.topics_to_brokers)
+
 
 if __name__ == '__main__':
     unittest.main()