From 0bdff4e833f73518a7219fca04dfbc3ed201b06e Mon Sep 17 00:00:00 2001
From: Marc Labbe
Date: Thu, 14 Nov 2013 09:26:49 -0500
Subject: [PATCH 1/7] Allow KafkaClient to take in a list of brokers for
 bootstrapping

---
 README.md                |   8 +-
 example.py               |   2 +-
 kafka/NOTES.md           |   2 +-
 kafka/client.py          |  37 ++++----
 kafka/conn.py            |  33 +++++--
 setup.py                 |   5 +-
 test/test_integration.py |  23 +++--
 test/test_unit.py        | 187 ++++++++++++++++++++++++++++++++++++++-
 8 files changed, 257 insertions(+), 40 deletions(-)

diff --git a/README.md b/README.md
index c9f782d..edf3931 100644
--- a/README.md
+++ b/README.md
@@ -30,7 +30,7 @@ from kafka.client import KafkaClient
 from kafka.consumer import SimpleConsumer
 from kafka.producer import SimpleProducer, KeyedProducer
 
-kafka = KafkaClient("localhost", 9092)
+kafka = KafkaClient("localhost:9092")
 
 # To send messages synchronously
 producer = SimpleProducer(kafka, "my-topic")
@@ -81,7 +81,7 @@ from kafka.client import KafkaClient
 from kafka.producer import KeyedProducer
 from kafka.partitioner import HashedPartitioner, RoundRobinPartitioner
 
-kafka = KafkaClient("localhost", 9092)
+kafka = KafkaClient("localhost:9092")
 
 # HashedPartitioner is default
 producer = KeyedProducer(kafka, "my-topic")
@@ -96,7 +96,7 @@ producer = KeyedProducer(kafka, "my-topic", partitioner=RoundRobinPartitioner)
 from kafka.client import KafkaClient
 from kafka.consumer import MultiProcessConsumer
 
-kafka = KafkaClient("localhost", 9092)
+kafka = KafkaClient("localhost:9092")
 
 # This will split the number of partitions among two processes
 consumer = MultiProcessConsumer(kafka, "my-group", "my-topic", num_procs=2)
@@ -116,7 +116,7 @@ for message in consumer.get_messages(count=5, block=True, timeout=4):
 ```python
 from kafka.client import KafkaClient
 
-kafka = KafkaClient("localhost", 9092)
+kafka = KafkaClient("localhost:9092")
 req = ProduceRequest(topic="my-topic", partition=1,
     messages=[KafkaProdocol.encode_message("some message")])
 resps = kafka.send_produce_request(payloads=[req], fail_on_error=True)
diff --git a/example.py b/example.py
index 3a2dc92..0cf5583 100644
--- a/example.py
+++ b/example.py
@@ -14,7 +14,7 @@ def consume_example(client):
         print(message)
 
 def main():
-    client = KafkaClient("localhost", 9092)
+    client = KafkaClient("localhost:9092")
 
     produce_example(client)
     consume_example(client)
diff --git a/kafka/NOTES.md b/kafka/NOTES.md
index 540cdad..8fb0f47 100644
--- a/kafka/NOTES.md
+++ b/kafka/NOTES.md
@@ -18,7 +18,7 @@ There are a few levels of abstraction:
 
 # Possible API
 
-    client = KafkaClient("localhost", 9092)
+    client = KafkaClient("localhost:9092")
     producer = KafkaProducer(client, "topic")
     producer.send_string("hello")
diff --git a/kafka/client.py b/kafka/client.py
index 71ededa..81eec7d 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -8,7 +8,7 @@ import time
 
 from kafka.common import ErrorMapping, TopicAndPartition
 from kafka.common import ConnectionError, FailedPayloadsException
-from kafka.conn import KafkaConnection
+from kafka.conn import collect_hosts, KafkaConnection
 from kafka.protocol import KafkaProtocol
 
 log = logging.getLogger("kafka")
@@ -19,13 +19,15 @@ class KafkaClient(object):
     CLIENT_ID = "kafka-python"
     ID_GEN = count()
 
-    def __init__(self, host, port, bufsize=4096, client_id=CLIENT_ID):
+    def __init__(self, hosts, bufsize=4096, client_id=CLIENT_ID):
         # We need one connection to bootstrap
         self.bufsize = bufsize
         self.client_id = client_id
-        self.conns = {              # (host, port) -> KafkaConnection
-            (host, port): KafkaConnection(host, port, bufsize)
-        }
+
+        self.hosts = collect_hosts(hosts)
+
+        # create connections only when we need them
+        self.conns = {}
         self.brokers = {}           # broker_id -> BrokerMetadata
         self.topics_to_brokers = {}  # topic_id -> broker_id
         self.topic_partitions = defaultdict(list)  # topic_id -> [0, 1, 2, ...]
@@ -35,15 +37,19 @@ class KafkaClient(object):
     #   Private API   #
     ##################
 
-    def _get_conn_for_broker(self, broker):
-        """
-        Get or create a connection to a broker
-        """
-        if (broker.host, broker.port) not in self.conns:
-            self.conns[(broker.host, broker.port)] = \
-                KafkaConnection(broker.host, broker.port, self.bufsize)
+    def _get_conn(self, host, port):
+        "Get or create a connection to a broker using host and port"
 
-        return self.conns[(broker.host, broker.port)]
+        host_key = (host, port)
+        if host_key not in self.conns:
+            self.conns[host_key] = KafkaConnection(host, port, self.bufsize)
+
+        return self.conns[host_key]
+
+    def _get_conn_for_broker(self, broker):
+        "Get or create a connection to a broker"
+
+        return self._get_conn(broker.host, broker.port)
 
     def _get_leader_for_partition(self, topic, partition):
         key = TopicAndPartition(topic, partition)
@@ -108,7 +114,8 @@ class KafkaClient(object):
         Attempt to send a broker-agnostic request to one of the available
         brokers. Keep trying until you succeed.
         """
-        for conn in self.conns.values():
+        for (host, port) in self.hosts:
+            conn = self._get_conn(host, port)
             try:
                 conn.send(requestId, request)
                 response = conn.recv(requestId)
@@ -174,7 +181,7 @@ class KafkaClient(object):
             except ConnectionError, e:  # ignore BufferUnderflow for now
                 log.warning("Could not send request [%s] to server %s: %s" % (request, conn, e))
                 failed_payloads += payloads
-                self.topics_to_brokers = {} # reset metadata
+                self.topics_to_brokers = {}  # reset metadata
                 continue
 
             for response in decoder_fn(response):
diff --git a/kafka/conn.py b/kafka/conn.py
index 14aebc6..614b1bb 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -2,6 +2,7 @@ import copy
 import logging
 import socket
 import struct
+from random import shuffle
 from threading import local
 
 from kafka.common import BufferUnderflowError
@@ -10,6 +11,26 @@ from kafka.common import ConnectionError
 
 log = logging.getLogger("kafka")
 
 
+def collect_hosts(hosts, randomize=True):
+    """
+    Collects a comma-separated set of hosts (host:port) and optionnaly
+    randomize the returned list.
+    """
+
+    result = []
+    for host_port in hosts.split(","):
+
+        res = host_port.split(':')
+        host = res[0]
+        port = int(res[1]) if len(res) > 1 else 9092
+        result.append((host.strip(), port))
+
+    if randomize:
+        shuffle(result)
+
+    return result
+
+
 class KafkaConnection(local):
     """
     A socket connection to a single Kafka broker
@@ -19,14 +40,14 @@ class KafkaConnection(local):
     we can do something in here to facilitate multiplexed requests/responses
     since the Kafka API includes a correlation id.
     """
-    def __init__(self, host, port, bufsize=4096):
+    def __init__(self, host, port, bufsize=4096, timeout=10):
         super(KafkaConnection, self).__init__()
         self.host = host
         self.port = port
         self.bufsize = bufsize
-        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-        self._sock.connect((host, port))
-        self._sock.settimeout(10)
+        self.timeout = timeout
+
+        self._sock = socket.create_connection((host, port), timeout=timeout)
         self._dirty = False
 
     def __str__(self):
@@ -125,7 +146,5 @@ class KafkaConnection(local):
         Re-initialize the socket connection
         """
         self.close()
-        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-        self._sock.connect((self.host, self.port))
-        self._sock.settimeout(10)
+        self._sock = socket.create_connection((self.host, self.port), timeout=self.timeout)
         self._dirty = False
diff --git a/setup.py b/setup.py
index a246914..fedf139 100644
--- a/setup.py
+++ b/setup.py
@@ -1,11 +1,12 @@
-import os.path
 import sys
 
 from setuptools import setup, Command
 
 
 class Tox(Command):
+    user_options = []
+
     def initialize_options(self):
         pass
 
@@ -21,7 +22,7 @@ setup(
     name="kafka-python",
     version="0.8.1-1",
-    install_requires=["distribute", "tox"],
+    install_requires=["distribute", "tox", "mock"],
     tests_require=["tox"],
     cmdclass={"test": Tox},
diff --git a/test/test_integration.py b/test/test_integration.py
index a10dae2..1f37ebf 100644
--- a/test/test_integration.py
+++ b/test/test_integration.py
@@ -16,7 +16,7 @@ class TestKafkaClient(unittest.TestCase):
     def setUpClass(cls):  # noqa
         cls.zk = ZookeeperFixture.instance()
         cls.server = KafkaFixture.instance(0, cls.zk.host, cls.zk.port)
-        cls.client = KafkaClient(cls.server.host, cls.server.port)
+        cls.client = KafkaClient('%s:%d' % (cls.server.host, cls.server.port))
 
     @classmethod
     def tearDownClass(cls):  # noqa
@@ -554,7 +554,7 @@ class TestConsumer(unittest.TestCase):
         cls.zk = ZookeeperFixture.instance()
         cls.server1 = KafkaFixture.instance(0, cls.zk.host, cls.zk.port)
         cls.server2 = KafkaFixture.instance(1, cls.zk.host, cls.zk.port)
-        cls.client = KafkaClient(cls.server2.host, cls.server2.port, bufsize=8192)
+        cls.client = KafkaClient('%s:%d' % (cls.server2.host, cls.server2.port), bufsize=8192)
 
     @classmethod
     def tearDownClass(cls):  # noqa
@@ -770,20 +770,23 @@ class TestConsumer(unittest.TestCase):
             self.assertEquals(all_messages[i], message.message)
         self.assertEquals(i, 19)
 
+
 class TestFailover(unittest.TestCase):
+
     @classmethod
     def setUpClass(cls):
         zk_chroot = random_string(10)
-        replicas   = 2
+        replicas = 2
         partitions = 2
 
         # mini zookeeper, 2 kafka brokers
-        cls.zk      = ZookeeperFixture.instance()
-        kk_args     = [cls.zk.host, cls.zk.port, zk_chroot, replicas, partitions]
+        cls.zk = ZookeeperFixture.instance()
+        kk_args = [cls.zk.host, cls.zk.port, zk_chroot, replicas, partitions]
         cls.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)]
-        cls.client = KafkaClient(cls.brokers[0].host, cls.brokers[0].port)
+
+        hosts = ','.join(['%s:%d' % (b.host, b.port) for b in cls.brokers])
+        cls.client = KafkaClient(hosts)
 
     @classmethod
     def tearDownClass(cls):
@@ -858,17 +861,19 @@ class TestFailover(unittest.TestCase):
                     resp = producer.send_messages(random_string(10))
                     if len(resp) > 0:
                         self.assertEquals(resp[0].error, 0)
-            time.sleep(1) # give it some time
+            time.sleep(1)  # give it some time
 
     def _kill_leader(self, topic, partition):
         leader = self.client.topics_to_brokers[TopicAndPartition(topic, partition)]
         broker = self.brokers[leader.nodeId]
         broker.close()
-        time.sleep(1) # give it some time
+        time.sleep(1)  # give it some time
         return broker
 
     def _count_messages(self, group, topic):
-        client = KafkaClient(self.brokers[0].host, self.brokers[0].port)
+
+        hosts = '%s:%d' % (self.brokers[0].host, self.brokers[0].port)
+        client = KafkaClient(hosts)
         consumer = SimpleConsumer(client, group, topic, auto_commit=False)
         all_messages = []
         for message in consumer:
diff --git a/test/test_unit.py b/test/test_unit.py
index 3f3af66..4ea9442 100644
--- a/test/test_unit.py
+++ b/test/test_unit.py
@@ -3,13 +3,18 @@ import random
 import struct
 import unittest
 
+from mock import patch
+
 from kafka.client import KafkaClient
-from kafka.common import ProduceRequest, FetchRequest
 from kafka.codec import (
     has_gzip, has_snappy,
     gzip_encode, gzip_decode,
     snappy_encode, snappy_decode
 )
+from kafka.common import (
+    ProduceRequest, FetchRequest,
+    BrokerMetadata, PartitionMetadata, TopicAndPartition
+)
 
 ITERATIONS = 1000
 STRLEN = 100
@@ -217,5 +222,185 @@ class TestRequests(unittest.TestCase):
         self.assertEquals(enc, expect)
 
 
+class TestKafkaClient(unittest.TestCase):
+
+    def test_send_broker_unaware_request_fail(self):
+        'Tests that call fails when all hosts are unavailable'
+
+        from mock import MagicMock
+
+        mocked_conns = {
+            ('kafka01', 9092): MagicMock(),
+            ('kafka02', 9092): MagicMock()
+        }
+        # inject conns
+        mocked_conns[('kafka01', 9092)].send.side_effect = RuntimeError("kafka01 went away (unittest)")
+        mocked_conns[('kafka02', 9092)].send.side_effect = RuntimeError("Kafka02 went away (unittest)")
+
+        def mock_get_conn(host, port):
+            print 'mock_get_conn: %s:%d=%s' % (host, port, mocked_conns[(host, port)])
+            return mocked_conns[(host, port)]
+
+        # patch to avoid making requests before we want it
+        with patch.object(KafkaClient, '_load_metadata_for_topics'), \
+                patch.object(KafkaClient, '_get_conn', side_effect=mock_get_conn):
+
+            client = KafkaClient(hosts='kafka01:9092,kafka02:9092')
+
+            resp = client._send_broker_unaware_request(1, 'fake request')
+
+            self.assertIsNone(resp)
+
+            for key, conn in mocked_conns.iteritems():
+                conn.send.assert_called_with(1, 'fake request')
+
+    def test_send_broker_unaware_request(self):
+        'Tests that call fails when one of the host is available'
+
+        from mock import MagicMock
+
+        mocked_conns = {
+            ('kafka01', 9092): MagicMock(),
+            ('kafka02', 9092): MagicMock(),
+            ('kafka03', 9092): MagicMock()
+        }
+        # inject conns
+        mocked_conns[('kafka01', 9092)].send.side_effect = RuntimeError("kafka01 went away (unittest)")
+        mocked_conns[('kafka02', 9092)].recv.return_value = 'valid response'
+        mocked_conns[('kafka03', 9092)].send.side_effect = RuntimeError("kafka03 went away (unittest)")
+
+        def mock_get_conn(host, port):
+            print 'mock_get_conn: %s:%d=%s' % (host, port, mocked_conns[(host, port)])
+            return mocked_conns[(host, port)]
+
+        # patch to avoid making requests before we want it
+        with patch.object(KafkaClient, '_load_metadata_for_topics'), \
+                patch.object(KafkaClient, '_get_conn', side_effect=mock_get_conn):
+
+            client = KafkaClient(hosts='kafka01:9092,kafka02:9092')
+
+            resp = client._send_broker_unaware_request(1, 'fake request')
+
+            self.assertEqual('valid response', resp)
+            mocked_conns[('kafka02', 9092)].recv.assert_called_with(1)
+
+    @unittest.skip('requires disabling recursion on _load_metadata_for_topics')
+    @patch('kafka.client.KafkaConnection')
+    @patch('kafka.client.KafkaProtocol')
+    def test_client_load_metadata(self, protocol, conn):
+
+        conn.recv.return_value = 'response'  # anything but None
+
+        brokers = {}
+        brokers[0] = BrokerMetadata(1, 'broker_1', 4567)
+        brokers[1] = BrokerMetadata(2, 'broker_2', 5678)
+
+        topics = {}
+        topics['topic_1'] = {
+            0: PartitionMetadata('topic_1', 0, 1, [1, 2], [1, 2])
+        }
+        topics['topic_2'] = {
+            0: PartitionMetadata('topic_2', 0, 0, [0, 1], [0, 1]),
+            1: PartitionMetadata('topic_2', 1, 1, [1, 0], [1, 0])
+        }
+        protocol.decode_metadata_response.return_value = (brokers, topics)
+
+        client = KafkaClient(hosts='broker_1:4567')
+        self.assertItemsEqual(
+            {
+                TopicAndPartition('topic_1', 0): brokers[0],
+                TopicAndPartition('topic_2', 0): brokers[0],
+                TopicAndPartition('topic_2', 1): brokers[1]
+            },
+            client.topics_to_brokers)
+
+    @unittest.skip('requires disabling recursion on _load_metadata_for_topics')
+    @patch('kafka.client.KafkaConnection')
+    @patch('kafka.client.KafkaProtocol')
+    def test_client_load_metadata_unassigned_partitions(self, protocol, conn):
+
+        conn.recv.return_value = 'response'  # anything but None
+
+        brokers = {}
+        brokers[0] = BrokerMetadata(0, 'broker_1', 4567)
+        brokers[1] = BrokerMetadata(1, 'broker_2', 5678)
+
+        topics = {}
+        topics['topic_1'] = {
+            0: PartitionMetadata('topic_1', 0, -1, [], [])
+        }
+        protocol.decode_metadata_response.return_value = (brokers, topics)
+
+        client = KafkaClient(hosts='broker_1:4567')
+
+        self.assertItemsEqual({}, client.topics_to_brokers)
+        self.assertRaises(
+            Exception,
+            client._get_leader_for_partition,
+            'topic_1', 0)
+
+        # calling _get_leader_for_partition (from any broker aware request)
+        # will try loading metadata again for the same topic
+        topics['topic_1'] = {
+            0: PartitionMetadata('topic_1', 0, 0, [0, 1], [0, 1])
+        }
+        leader = client._get_leader_for_partition('topic_1', 0)
+
+        self.assertEqual(brokers[0], leader)
+        self.assertItemsEqual(
+            {
+                TopicAndPartition('topic_1', 0): brokers[0],
+            },
+            client.topics_to_brokers)
+
+    @unittest.skip('requires disabling recursion on _load_metadata_for_topics')
+    @patch('kafka.client.KafkaConnection')
+    @patch('kafka.client.KafkaProtocol')
+    def test_client_load_metadata_noleader_partitions(self, protocol, conn):
+
+        conn.recv.return_value = 'response'  # anything but None
+
+        brokers = {}
+        brokers[0] = BrokerMetadata(0, 'broker_1', 4567)
+        brokers[1] = BrokerMetadata(1, 'broker_2', 5678)
+
+        topics = {}
+        topics['topic_1'] = {
+            0: PartitionMetadata('topic_1', 0, -1, [], [])
+        }
+        topics['topic_2'] = {
+            0: PartitionMetadata('topic_2', 0, 0, [0, 1], []),
+            1: PartitionMetadata('topic_2', 1, 1, [1, 0], [1, 0])
+        }
+        protocol.decode_metadata_response.return_value = (brokers, topics)
+
+        client = KafkaClient(hosts='broker_1:4567')
+        self.assertItemsEqual(
+            {
+                TopicAndPartition('topic_2', 0): brokers[0],
+                TopicAndPartition('topic_2', 1): brokers[1]
+            },
+            client.topics_to_brokers)
+        self.assertRaises(
+            Exception,
+            client._get_leader_for_partition,
+            'topic_1', 0)
+
+        # calling _get_leader_for_partition (from any broker aware request)
+        # will try loading metadata again for the same topic
+        topics['topic_1'] = {
+            0: PartitionMetadata('topic_1', 0, 0, [0, 1], [0, 1])
+        }
+        leader = client._get_leader_for_partition('topic_1', 0)
+
+        self.assertEqual(brokers[0], leader)
+        self.assertItemsEqual(
+            {
+                TopicAndPartition('topic_1', 0): brokers[0],
+                TopicAndPartition('topic_2', 0): brokers[0],
+                TopicAndPartition('topic_2', 1): brokers[1]
+            },
+            client.topics_to_brokers)
+
 if __name__ == '__main__':
     unittest.main()

From a2191e5be5d5fcd212582580c163f4533cca6c73 Mon Sep 17 00:00:00 2001
From: mrtheb
Date: Sun, 9 Feb 2014 13:44:47 -0500
Subject: [PATCH 2/7] Support list (or comma-separated) of hosts (replaces
 host and port arguments)

---
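
For illustration, after this change a client can be bootstrapped with
either form (a minimal usage sketch, not part of the diff below; the
broker names are placeholders):

    from kafka.client import KafkaClient

    # a comma-separated string of bootstrap brokers...
    client = KafkaClient(hosts='kafka01:9092,kafka02:9092')

    # ...or a list of 'host:port' strings; collect_hosts() falls back
    # to port 9092 when the port is omitted
    client = KafkaClient(hosts=['kafka01:9092', 'kafka02'])
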
 kafka/client.py          | 13 +++++++------
 kafka/conn.py            |  5 ++++-
 test/test_integration.py | 26 +++++++++++++-------------
 test/test_unit.py        | 40 ++++++++++++++++++++++++++++++--------
 4 files changed, 56 insertions(+), 28 deletions(-)

diff --git a/kafka/client.py b/kafka/client.py
index 33c4419..96cc1df 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -10,7 +10,7 @@ from kafka.common import (ErrorMapping, TopicAndPartition,
                           BrokerResponseError, PartitionUnavailableError,
                           KafkaUnavailableError, KafkaRequestError)
 
-from kafka.conn import KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS
+from kafka.conn import collect_hosts, KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS
 from kafka.protocol import KafkaProtocol
 
 log = logging.getLogger("kafka")
@@ -24,14 +24,15 @@ class KafkaClient(object):
     # NOTE: The timeout given to the client should always be greater than the
     # one passed to SimpleConsumer.get_message(), otherwise you can get a
     # socket timeout.
-    def __init__(self, host, port, client_id=CLIENT_ID,
+    def __init__(self, hosts, client_id=CLIENT_ID,
                  timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS):
         # We need one connection to bootstrap
         self.client_id = client_id
         self.timeout = timeout
-        self.conns = {              # (host, port) -> KafkaConnection
-            (host, port): KafkaConnection(host, port, timeout=timeout)
-        }
+        self.hosts = collect_hosts(hosts)
+
+        # create connections only when we need them
+        self.conns = {}
         self.brokers = {}           # broker_id -> BrokerMetadata
         self.topics_to_brokers = {}  # topic_id -> broker_id
         self.topic_partitions = {}  # topic_id -> [0, 1, 2, ...]
@@ -46,7 +47,7 @@ class KafkaClient(object):
 
         host_key = (host, port)
         if host_key not in self.conns:
-            self.conns[host_key] = KafkaConnection(host, port, self.bufsize)
+            self.conns[host_key] = KafkaConnection(host, port)
 
         return self.conns[host_key]
 
diff --git a/kafka/conn.py b/kafka/conn.py
index de2d385..20f22dc 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -17,8 +17,11 @@ def collect_hosts(hosts, randomize=True):
     randomize the returned list.
     """
 
+    if isinstance(hosts, str):
+        hosts = hosts.split(',')
+
     result = []
-    for host_port in hosts.split(","):
+    for host_port in hosts:
 
         res = host_port.split(':')
         host = res[0]
diff --git a/test/test_integration.py b/test/test_integration.py
index 000f44a..3d6ccf6 100644
--- a/test/test_integration.py
+++ b/test/test_integration.py
@@ -33,7 +33,7 @@ def ensure_topic_creation(client, topic_name):
 
 class KafkaTestCase(unittest.TestCase):
     def setUp(self):
-        self.topic = "%s-%s" % (self.id()[self.id().rindex(".")+1:], random_string(10))
+        self.topic = "%s-%s" % (self.id()[self.id().rindex(".") + 1:], random_string(10))
         ensure_topic_creation(self.client, self.topic)
 
 
@@ -578,7 +578,7 @@ class TestConsumer(KafkaTestCase):
         cls.zk = ZookeeperFixture.instance()
         cls.server1 = KafkaFixture.instance(0, cls.zk.host, cls.zk.port)
         cls.server2 = KafkaFixture.instance(1, cls.zk.host, cls.zk.port)
-        cls.client = KafkaClient('%s:%d' % (cls.server2.host, cls.server2.port), bufsize=8192)
+        cls.client = KafkaClient('%s:%d' % (cls.server2.host, cls.server2.port))
 
     @classmethod
     def tearDownClass(cls):  # noqa
@@ -800,7 +800,6 @@ class TestConsumer(KafkaTestCase):
             self.assertEquals(all_messages[i], message.message)
         self.assertEquals(i, 19)
 
-
         # Produce 1 message that is too large (bigger than max fetch size)
         big_message_size = MAX_FETCH_BUFFER_SIZE_BYTES + 10
         big_message = create_message(random_string(big_message_size))
@@ -827,25 +826,26 @@ class TestConsumer(KafkaTestCase):
 
 class TestFailover(KafkaTestCase):
 
-    def setUp(self):
+    @classmethod
+    def setUpClass(cls):  # noqa
         zk_chroot = random_string(10)
         replicas = 2
         partitions = 2
 
         # mini zookeeper, 2 kafka brokers
-        self.zk = ZookeeperFixture.instance()
-        kk_args = [self.zk.host, self.zk.port, zk_chroot, replicas, partitions]
-        self.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)]
+        cls.zk = ZookeeperFixture.instance()
+        kk_args = [cls.zk.host, cls.zk.port, zk_chroot, replicas, partitions]
+        cls.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)]
 
-        hosts = ','.join(['%s:%d' % (b.host, b.port) for b in cls.brokers])
+        hosts = ['%s:%d' % (b.host, b.port) for b in cls.brokers]
         cls.client = KafkaClient(hosts)
-        super(TestFailover, self).setUp()
 
-    def tearDown(self):
-        self.client.close()
-        for broker in self.brokers:
+    @classmethod
+    def tearDownClass(cls):
+        cls.client.close()
+        for broker in cls.brokers:
             broker.close()
-        self.zk.close()
+        cls.zk.close()
 
     def test_switch_leader(self):
         key, topic, partition = random_string(5), self.topic, 0
diff --git a/test/test_unit.py b/test/test_unit.py
index 4c78c1b..624fe39 100644
--- a/test/test_unit.py
+++ b/test/test_unit.py
@@ -5,11 +5,13 @@ import unittest
 
 from mock import patch
 
+from kafka import KafkaClient
 from kafka.common import (
     ProduceRequest, FetchRequest, Message, ChecksumError,
     ConsumerFetchSizeTooSmall, ProduceResponse, FetchResponse,
     OffsetAndMessage, BrokerMetadata, PartitionMetadata
 )
+from kafka.common import KafkaUnavailableError
 from kafka.codec import (
     has_gzip, has_snappy, gzip_encode, gzip_decode,
     snappy_encode, snappy_decode
@@ -384,6 +386,26 @@ class TestProtocol(unittest.TestCase):
 
 class TestKafkaClient(unittest.TestCase):
 
+    def test_init_with_list(self):
+
+        with patch.object(KafkaClient, 'load_metadata_for_topics'):
+            client = KafkaClient(
+                hosts=['kafka01:9092', 'kafka02:9092', 'kafka03:9092'])
+
+        self.assertItemsEqual(
+            [('kafka01', 9092), ('kafka02', 9092), ('kafka03', 9092)],
+            client.hosts)
+
+    def test_init_with_csv(self):
+
+        with patch.object(KafkaClient, 'load_metadata_for_topics'):
+            client = KafkaClient(
+                hosts='kafka01:9092,kafka02:9092,kafka03:9092')
+
+        self.assertItemsEqual(
+            [('kafka01', 9092), ('kafka02', 9092), ('kafka03', 9092)],
+            client.hosts)
+
     def test_send_broker_unaware_request_fail(self):
         'Tests that call fails when all hosts are unavailable'
 
@@ -402,14 +424,16 @@ class TestKafkaClient(unittest.TestCase):
             return mocked_conns[(host, port)]
 
         # patch to avoid making requests before we want it
-        with patch.object(KafkaClient, '_load_metadata_for_topics'), \
+        with patch.object(KafkaClient, 'load_metadata_for_topics'), \
                 patch.object(KafkaClient, '_get_conn', side_effect=mock_get_conn):
 
-            client = KafkaClient(hosts='kafka01:9092,kafka02:9092')
+            client = KafkaClient(hosts=['kafka01:9092','kafka02:9092'])
 
-            resp = client._send_broker_unaware_request(1, 'fake request')
-
-            self.assertIsNone(resp)
+            self.assertRaises(
+                KafkaUnavailableError,
+                client._send_broker_unaware_request,
+                1, 'fake request')
 
             for key, conn in mocked_conns.iteritems():
                 conn.send.assert_called_with(1, 'fake request')
@@ -434,7 +458,7 @@ class TestKafkaClient(unittest.TestCase):
             return mocked_conns[(host, port)]
 
         # patch to avoid making requests before we want it
-        with patch.object(KafkaClient, '_load_metadata_for_topics'), \
+        with patch.object(KafkaClient, 'load_metadata_for_topics'), \
                 patch.object(KafkaClient, '_get_conn', side_effect=mock_get_conn):
 
             client = KafkaClient(hosts='kafka01:9092,kafka02:9092')
@@ -444,7 +468,7 @@ class TestKafkaClient(unittest.TestCase):
         self.assertEqual('valid response', resp)
         mocked_conns[('kafka02', 9092)].recv.assert_called_with(1)
 
-    @unittest.skip('requires disabling recursion on _load_metadata_for_topics')
+    @unittest.skip('requires disabling recursion on load_metadata_for_topics')
     @patch('kafka.client.KafkaConnection')
     @patch('kafka.client.KafkaProtocol')
     def test_client_load_metadata(self, protocol, conn):
@@ -474,7 +498,7 @@ class TestKafkaClient(unittest.TestCase):
             },
             client.topics_to_brokers)
 
-    @unittest.skip('requires disabling recursion on _load_metadata_for_topics')
+    @unittest.skip('requires disabling recursion on load_metadata_for_topics')
     @patch('kafka.client.KafkaConnection')
     @patch('kafka.client.KafkaProtocol')
     def test_client_load_metadata_unassigned_partitions(self, protocol, conn):
@@ -513,7 +537,7 @@ class TestKafkaClient(unittest.TestCase):
             },
             client.topics_to_brokers)
 
-    @unittest.skip('requires disabling recursion on _load_metadata_for_topics')
+    @unittest.skip('requires disabling recursion on load_metadata_for_topics')
     @patch('kafka.client.KafkaConnection')
     @patch('kafka.client.KafkaProtocol')
     def test_client_load_metadata_noleader_partitions(self, protocol, conn):

From 51910f981843dfa967d24659cdb46117210c832d Mon Sep 17 00:00:00 2001
From: mrtheb
Date: Sat, 15 Feb 2014 13:43:56 -0500
Subject: [PATCH 3/7] clean up after comments from @rdiomar

---
 kafka/conn.py     |   8 ++-
 test/test_unit.py | 136 ++--------------------------------------------
 2 files changed, 11 insertions(+), 133 deletions(-)

diff --git a/kafka/conn.py b/kafka/conn.py
index 20f22dc..749cc02 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -10,6 +10,8 @@ from kafka.common import ConnectionError
 log = logging.getLogger("kafka")
 
 DEFAULT_SOCKET_TIMEOUT_SECONDS = 120
+DEFAULT_KAFKA_PORT = 9092
+
 
 def collect_hosts(hosts, randomize=True):
     """
@@ -18,14 +20,14 @@ def collect_hosts(hosts, randomize=True):
     """
 
     if isinstance(hosts, str):
-        hosts = hosts.split(',')
+        hosts = hosts.strip().split(',')
 
     result = []
     for host_port in hosts:
 
         res = host_port.split(':')
         host = res[0]
-        port = int(res[1]) if len(res) > 1 else 9092
+        port = int(res[1]) if len(res) > 1 else DEFAULT_KAFKA_PORT
         result.append((host.strip(), port))
 
     if randomize:
@@ -105,7 +107,7 @@ class KafkaConnection(local):
             sent = self._sock.sendall(payload)
             if sent is not None:
                 self._raise_connection_error()
-        except socket.error, e:
+        except socket.error:
             log.exception('Unable to send payload to Kafka')
             self._raise_connection_error()
 
diff --git a/test/test_unit.py b/test/test_unit.py
index 624fe39..3239e6a 100644
--- a/test/test_unit.py
+++ b/test/test_unit.py
@@ -3,7 +3,8 @@ import random
 import struct
 import unittest
 
-from mock import patch
+from mock import MagicMock, patch
+
 
 from kafka import KafkaClient
 from kafka.common import (
@@ -366,7 +367,6 @@ class TestProtocol(unittest.TestCase):
     def test_decode_offset_response(self):
         pass
 
-
     @unittest.skip("Not Implemented")
     def test_encode_offset_commit_request(self):
         pass
@@ -409,26 +409,22 @@ class TestKafkaClient(unittest.TestCase):
     def test_send_broker_unaware_request_fail(self):
         'Tests that call fails when all hosts are unavailable'
 
-        from mock import MagicMock
-
         mocked_conns = {
             ('kafka01', 9092): MagicMock(),
             ('kafka02', 9092): MagicMock()
         }
-        # inject conns
+        # inject KafkaConnection side effects
         mocked_conns[('kafka01', 9092)].send.side_effect = RuntimeError("kafka01 went away (unittest)")
         mocked_conns[('kafka02', 9092)].send.side_effect = RuntimeError("Kafka02 went away (unittest)")
 
         def mock_get_conn(host, port):
-            print 'mock_get_conn: %s:%d=%s' % (host, port, mocked_conns[(host, port)])
             return mocked_conns[(host, port)]
 
         # patch to avoid making requests before we want it
         with patch.object(KafkaClient, 'load_metadata_for_topics'), \
                 patch.object(KafkaClient, '_get_conn', side_effect=mock_get_conn):
 
-            client = KafkaClient(hosts=['kafka01:9092','kafka02:9092'])
-
+            client = KafkaClient(hosts=['kafka01:9092', 'kafka02:9092'])
 
             self.assertRaises(
                 KafkaUnavailableError,
                 client._send_broker_unaware_request,
                 1, 'fake request')
 
             for key, conn in mocked_conns.iteritems():
                 conn.send.assert_called_with(1, 'fake request')
 
     def test_send_broker_unaware_request(self):
-        'Tests that call fails when one of the host is available'
-
-        from mock import MagicMock
+        'Tests that call works when at least one of the host is available'
 
         mocked_conns = {
             ('kafka01', 9092): MagicMock(),
             ('kafka02', 9092): MagicMock(),
             ('kafka03', 9092): MagicMock()
         }
-        # inject conns
+        # inject KafkaConnection side effects
         mocked_conns[('kafka01', 9092)].send.side_effect = RuntimeError("kafka01 went away (unittest)")
         mocked_conns[('kafka02', 9092)].recv.return_value = 'valid response'
         mocked_conns[('kafka03', 9092)].send.side_effect = RuntimeError("kafka03 went away (unittest)")
 
         def mock_get_conn(host, port):
-            print 'mock_get_conn: %s:%d=%s' % (host, port, mocked_conns[(host, port)])
             return mocked_conns[(host, port)]
 
         # patch to avoid making requests before we want it
         with patch.object(KafkaClient, 'load_metadata_for_topics'), \
                 patch.object(KafkaClient, '_get_conn', side_effect=mock_get_conn):
 
             client = KafkaClient(hosts='kafka01:9092,kafka02:9092')
 
             resp = client._send_broker_unaware_request(1, 'fake request')
 
             self.assertEqual('valid response', resp)
             mocked_conns[('kafka02', 9092)].recv.assert_called_with(1)
 
-    @unittest.skip('requires disabling recursion on load_metadata_for_topics')
-    @patch('kafka.client.KafkaConnection')
-    @patch('kafka.client.KafkaProtocol')
-    def test_client_load_metadata(self, protocol, conn):
-
-        conn.recv.return_value = 'response'  # anything but None
-
-        brokers = {}
-        brokers[0] = BrokerMetadata(1, 'broker_1', 4567)
-        brokers[1] = BrokerMetadata(2, 'broker_2', 5678)
-
-        topics = {}
-        topics['topic_1'] = {
-            0: PartitionMetadata('topic_1', 0, 1, [1, 2], [1, 2])
-        }
-        topics['topic_2'] = {
-            0: PartitionMetadata('topic_2', 0, 0, [0, 1], [0, 1]),
-            1: PartitionMetadata('topic_2', 1, 1, [1, 0], [1, 0])
-        }
-        protocol.decode_metadata_response.return_value = (brokers, topics)
-
-        client = KafkaClient(hosts='broker_1:4567')
-        self.assertItemsEqual(
-            {
-                TopicAndPartition('topic_1', 0): brokers[0],
-                TopicAndPartition('topic_2', 0): brokers[0],
-                TopicAndPartition('topic_2', 1): brokers[1]
-            },
-            client.topics_to_brokers)
-
-    @unittest.skip('requires disabling recursion on load_metadata_for_topics')
-    @patch('kafka.client.KafkaConnection')
-    @patch('kafka.client.KafkaProtocol')
-    def test_client_load_metadata_unassigned_partitions(self, protocol, conn):
-
-        conn.recv.return_value = 'response'  # anything but None
-
-        brokers = {}
-        brokers[0] = BrokerMetadata(0, 'broker_1', 4567)
-        brokers[1] = BrokerMetadata(1, 'broker_2', 5678)
-
-        topics = {}
-        topics['topic_1'] = {
-            0: PartitionMetadata('topic_1', 0, -1, [], [])
-        }
-        protocol.decode_metadata_response.return_value = (brokers, topics)
-
-        client = KafkaClient(hosts='broker_1:4567')
-
-        self.assertItemsEqual({}, client.topics_to_brokers)
-        self.assertRaises(
-            Exception,
-            client._get_leader_for_partition,
-            'topic_1', 0)
-
-        # calling _get_leader_for_partition (from any broker aware request)
-        # will try loading metadata again for the same topic
-        topics['topic_1'] = {
-            0: PartitionMetadata('topic_1', 0, 0, [0, 1], [0, 1])
-        }
-        leader = client._get_leader_for_partition('topic_1', 0)
-
-        self.assertEqual(brokers[0], leader)
-        self.assertItemsEqual(
-            {
-                TopicAndPartition('topic_1', 0): brokers[0],
-            },
-            client.topics_to_brokers)
-
-    @unittest.skip('requires disabling recursion on load_metadata_for_topics')
-    @patch('kafka.client.KafkaConnection')
-    @patch('kafka.client.KafkaProtocol')
-    def test_client_load_metadata_noleader_partitions(self, protocol, conn):
-
-        conn.recv.return_value = 'response'  # anything but None
-
-        brokers = {}
-        brokers[0] = BrokerMetadata(0, 'broker_1', 4567)
-        brokers[1] = BrokerMetadata(1, 'broker_2', 5678)
-
-        topics = {}
-        topics['topic_1'] = {
-            0: PartitionMetadata('topic_1', 0, -1, [], [])
-        }
-        topics['topic_2'] = {
-            0: PartitionMetadata('topic_2', 0, 0, [0, 1], []),
-            1: PartitionMetadata('topic_2', 1, 1, [1, 0], [1, 0])
-        }
-        protocol.decode_metadata_response.return_value = (brokers, topics)
-
-        client = KafkaClient(hosts='broker_1:4567')
-        self.assertItemsEqual(
-            {
-                TopicAndPartition('topic_2', 0): brokers[0],
-                TopicAndPartition('topic_2', 1): brokers[1]
-            },
-            client.topics_to_brokers)
-        self.assertRaises(
-            Exception,
-            client._get_leader_for_partition,
-            'topic_1', 0)
-
-        # calling _get_leader_for_partition (from any broker aware request)
-        # will try loading metadata again for the same topic
-        topics['topic_1'] = {
-            0: PartitionMetadata('topic_1', 0, 0, [0, 1], [0, 1])
-        }
-        leader = client._get_leader_for_partition('topic_1', 0)
-
-        self.assertEqual(brokers[0], leader)
-        self.assertItemsEqual(
-            {
-                TopicAndPartition('topic_1', 0): brokers[0],
-                TopicAndPartition('topic_2', 0): brokers[0],
-                TopicAndPartition('topic_2', 1): brokers[1]
-            },
-            client.topics_to_brokers)
 
 if __name__ == '__main__':
     unittest.main()

From 96c9ce0c4a2ab515c581062117d5a6221f1f2036 Mon Sep 17 00:00:00 2001
From: Greg Bowyer
Date: Wed, 19 Feb 2014 14:42:40 -0800
Subject: [PATCH 4/7] Make it possible to read and write xerial snappy

Fixes mumrah/kafka-python#126

TL;DR
=====

This makes it possible to read and write snappy compressed streams that
are compatible with the Java and Scala kafka clients (the xerial blocking
format).

Xerial Details
==============

Kafka supports transparent compression of messages (both in transit and
at rest); one of the allowable compression algorithms is Google's snappy,
an algorithm with excellent compression speed at the cost of compression
ratio.

The specific implementation of snappy used in kafka is the xerial-snappy
implementation, a readily available java library for snappy.

As part of this implementation, there is a specialised blocking format
that is somewhat non-standard in the snappy world.

Xerial Format
-------------

The blocking mode of the xerial snappy library is fairly simple: it uses
a magic header to identify itself, followed by a size + block scheme.
Unless otherwise noted, all items in xerial's blocking format are assumed
to be big-endian.

A block size (```xerial_blocksize``` in the implementation) controls how
frequently the blocking occurs; 32k is the default in the xerial library.
This blocking controls the size of the uncompressed chunks that will be
fed to snappy to be compressed.

The format winds up being

| Header      | Block1 len | Block1 data  | Blockn len | Blockn data  |
| ----------- | ---------- | ------------ | ---------- | ------------ |
| 16 bytes    | BE int32   | snappy bytes | BE int32   | snappy bytes |

It is important to note that the blocksize is the amount of uncompressed
data presented to snappy at each block, whereas the blocklen is the number
of bytes that will be present in the stream; that is, the length will
always be <= blocksize.

Xerial blocking header
----------------------

Marker | Magic String | Null / Pad | Version  | Compat
------ | ------------ | ---------- | -------- | --------
byte   | c-string     | byte       | int32    | int32
------ | ------------ | ---------- | -------- | --------
-126   | 'SNAPPY'     | \0         | variable | variable

The pad appears to be there to ensure that 'SNAPPY' is a valid cstring,
and to align the header on a word boundary.

The version is the version of this format as written by xerial; in the
wild this is currently 1, and as such we only support v1.

Compat is there to claim the minimum supported version that can read a
xerial block stream; presently in the wild this is 1.

Implementation specific details
===============================

The implementation presented here follows the xerial implementation as of
its v1 blocking format; no attempt is made to check for future versions.

Since non-xerial-aware clients might have persisted snappy compressed
messages to kafka brokers, we allow clients to turn on xerial
compatibility for message sending, and perform header sniffing to detect
xerial vs plain snappy payloads.
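
As a rough illustration of the format described above, the 16-byte header
can be built and round-tripped with ```struct``` (a sketch only; it mirrors
the ```_XERIAL_V1_HEADER``` and ```_XERIAL_V1_FORMAT``` constants introduced
in the diff below):

    import struct

    _XERIAL_V1_FORMAT = 'bccccccBii'
    _XERIAL_V1_HEADER = (-126, 'S', 'N', 'A', 'P', 'P', 'Y', 0, 1, 1)

    # marker byte, 'SNAPPY' c-string with its NUL pad, then two big-endian
    # int32s (version=1, compat=1); '!' forces big-endian packing
    header = ''.join(struct.pack('!' + fmt, dat)
                     for fmt, dat in zip(_XERIAL_V1_FORMAT, _XERIAL_V1_HEADER))

    assert len(header) == 16
    assert struct.unpack('!' + _XERIAL_V1_FORMAT, header) == _XERIAL_V1_HEADER
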
---
 kafka/codec.py    | 98 +++++++++++++++++++++++++++++++++++++++++++++--
 test/test_unit.py | 43 +++++++++++++++++++++
 2 files changed, 138 insertions(+), 3 deletions(-)

diff --git a/kafka/codec.py b/kafka/codec.py
index eb5d03c..206ddb4 100644
--- a/kafka/codec.py
+++ b/kafka/codec.py
@@ -1,5 +1,9 @@
 from cStringIO import StringIO
 import gzip
+import struct
+
+_XERIAL_V1_HEADER = (-126, 'S', 'N', 'A', 'P', 'P', 'Y', 0, 1, 1)
+_XERIAL_V1_FORMAT = 'bccccccBii'
 
 try:
     import snappy
@@ -36,13 +40,101 @@ def gzip_decode(payload):
     return result
 
 
-def snappy_encode(payload):
+def snappy_encode(payload, xerial_compatible=False, xerial_blocksize=32 * 1024):
+    """Encodes the given data with snappy. If xerial_compatible is set, the
+    stream is encoded in a fashion compatible with the xerial snappy library.
+
+    The block size (xerial_blocksize) controls how frequently the blocking
+    occurs; 32k is the default in the xerial library.
+
+    The format winds up being
+        +-------------+------------+--------------+------------+--------------+
+        |   Header    | Block1 len | Block1 data  | Blockn len | Blockn data  |
+        |-------------+------------+--------------+------------+--------------|
+        |  16 bytes   |  BE int32  | snappy bytes |  BE int32  | snappy bytes |
+        +-------------+------------+--------------+------------+--------------+
+
+    It is important to note that the blocksize is the amount of uncompressed
+    data presented to snappy at each block, whereas the blocklen is the
+    number of bytes that will be present in the stream; that is, the
+    length will always be <= blocksize.
+    """
+
     if not _has_snappy:
         raise NotImplementedError("Snappy codec is not available")
-    return snappy.compress(payload)
+
+    if xerial_compatible:
+        def _chunker():
+            for i in xrange(0, len(payload), xerial_blocksize):
+                yield payload[i:i+xerial_blocksize]
+
+        out = StringIO()
+
+        header = ''.join([struct.pack('!' + fmt, dat) for fmt, dat
+                          in zip(_XERIAL_V1_FORMAT, _XERIAL_V1_HEADER)])
+
+        out.write(header)
+        for chunk in _chunker():
+            block = snappy.compress(chunk)
+            block_size = len(block)
+            out.write(struct.pack('!i', block_size))
+            out.write(block)
+
+        out.seek(0)
+        return out.read()
+
+    else:
+        return snappy.compress(payload)
+
+
+def _detect_xerial_stream(payload):
+    """Detects if the data given might have been encoded with the blocking
+    mode of the xerial snappy library.
+
+    This mode writes a magic header of the format:
+        +--------+--------------+------------+---------+--------+
+        | Marker | Magic String | Null / Pad | Version | Compat |
+        |--------+--------------+------------+---------+--------|
+        |  byte  |   c-string   |    byte    |  int32  | int32  |
+        |--------+--------------+------------+---------+--------|
+        |  -126  |  'SNAPPY'    |     \0     |         |        |
+        +--------+--------------+------------+---------+--------+
+
+    The pad appears to be to ensure that SNAPPY is a valid cstring.
+    The version is the version of this format as written by xerial;
+    in the wild this is currently 1, and as such we only support v1.
+
+    Compat is there to claim the minimum supported version that
+    can read a xerial block stream; presently in the wild this is
+    1.
+    """
+
+    if len(payload) > 16:
+        header = struct.unpack('!' + _XERIAL_V1_FORMAT, bytes(payload)[:16])
+        return header == _XERIAL_V1_HEADER
+    return False
 
 
 def snappy_decode(payload):
     if not _has_snappy:
         raise NotImplementedError("Snappy codec is not available")
-    return snappy.decompress(payload)
+
+    if _detect_xerial_stream(payload):
+        # TODO ? Should become a fileobj ?
+        out = StringIO()
+        byt = buffer(payload[16:])
+        length = len(byt)
+        cursor = 0
+
+        while cursor < length:
+            block_size = struct.unpack_from('!i', byt[cursor:])[0]
+            # Skip the block size
+            cursor += 4
+            end = cursor + block_size
+            out.write(snappy.decompress(byt[cursor:end]))
+            cursor = end
+
+        out.seek(0)
+        return out.read()
+    else:
+        return snappy.decompress(payload)
diff --git a/test/test_unit.py b/test/test_unit.py
index e3fd4bb..b5f0118 100644
--- a/test/test_unit.py
+++ b/test/test_unit.py
@@ -70,6 +70,49 @@ class TestCodec(unittest.TestCase):
             s2 = snappy_decode(snappy_encode(s1))
             self.assertEquals(s1, s2)
 
+    @unittest.skipUnless(has_snappy(), "Snappy not available")
+    def test_snappy_detect_xerial(self):
+        import kafka as kafka1
+        _detect_xerial_stream = kafka1.codec._detect_xerial_stream
+
+        header = b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01Some extra bytes'
+        false_header = b'\x01SNAPPY\x00\x00\x00\x01\x00\x00\x00\x01'
+        random_snappy = snappy_encode('SNAPPY' * 50)
+        short_data = b'\x01\x02\x03\x04'
+
+        self.assertTrue(_detect_xerial_stream(header))
+        self.assertFalse(_detect_xerial_stream(b''))
+        self.assertFalse(_detect_xerial_stream(b'\x00'))
+        self.assertFalse(_detect_xerial_stream(false_header))
+        self.assertFalse(_detect_xerial_stream(random_snappy))
+        self.assertFalse(_detect_xerial_stream(short_data))
+
+    @unittest.skipUnless(has_snappy(), "Snappy not available")
+    def test_snappy_decode_xerial(self):
+        header = b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01'
+        random_snappy = snappy_encode('SNAPPY' * 50)
+        block_len = len(random_snappy)
+        random_snappy2 = snappy_encode('XERIAL' * 50)
+        block_len2 = len(random_snappy2)
+
+        to_test = header \
+            + struct.pack('!i', block_len) + random_snappy \
+            + struct.pack('!i', block_len2) + random_snappy2 \
+
+        self.assertEquals(snappy_decode(to_test), ('SNAPPY' * 50) + ('XERIAL' * 50))
+
+    @unittest.skipUnless(has_snappy(), "Snappy not available")
+    def test_snappy_encode_xerial(self):
+        to_ensure = b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01' + \
+            '\x00\x00\x00\x18' + \
+            '\xac\x02\x14SNAPPY\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\x96\x06\x00' + \
+            '\x00\x00\x00\x18' + \
+            '\xac\x02\x14XERIAL\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\x96\x06\x00'
+
+        to_test = ('SNAPPY' * 50) + ('XERIAL' * 50)
+
+        compressed = snappy_encode(to_test, xerial_compatible=True, xerial_blocksize=300)
+        self.assertEquals(compressed, to_ensure)
 
 class TestProtocol(unittest.TestCase):

From e5fdc1c7b22c8ad2aaa66a486871d0ed65977e3d Mon Sep 17 00:00:00 2001
From: David Arthur
Date: Tue, 25 Feb 2014 10:46:28 -0500
Subject: [PATCH 5/7] Fix version in __init__.py to match setup.py

---
 kafka/__init__.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/kafka/__init__.py b/kafka/__init__.py
index 73aa760..e446f58 100644
--- a/kafka/__init__.py
+++ b/kafka/__init__.py
@@ -1,5 +1,5 @@
 __title__ = 'kafka'
-__version__ = '0.2-alpha'
+__version__ = '0.9.0'
 __author__ = 'David Arthur'
 __license__ = 'Apache License 2.0'
 __copyright__ = 'Copyright 2012, David Arthur under Apache License, v2.0'

From 2eac578a5a75e5f8f2afcfdcd18d127c7541e5b3 Mon Sep 17 00:00:00 2001
From: Zack Dever
Date: Mon, 3 Mar 2014 17:48:37 -0800
Subject: [PATCH 6/7] nit: fixed misspelling

---
 kafka/conn.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/kafka/conn.py b/kafka/conn.py
index 749cc02..7266ae8 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -15,7 +15,7 @@ DEFAULT_KAFKA_PORT = 9092
 
 def collect_hosts(hosts, randomize=True):
     """
-    Collects a comma-separated set of hosts (host:port) and optionnaly
+    Collects a comma-separated set of hosts (host:port) and optionally
     randomize the returned list.
     """
 

From c4cbe04cf13cc94ea1c72904f2b63b91fb5c0b84 Mon Sep 17 00:00:00 2001
From: stephenarmstrong
Date: Thu, 13 Mar 2014 15:14:32 -0700
Subject: [PATCH 7/7] If a broker refuses the connection, try the next

---
 kafka/client.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/kafka/client.py b/kafka/client.py
index 96cc1df..a683fe0 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -83,14 +83,14 @@ class KafkaClient(object):
         brokers. Keep trying until you succeed.
         """
         for (host, port) in self.hosts:
-            conn = self._get_conn(host, port)
             try:
+                conn = self._get_conn(host, port)
                 conn.send(requestId, request)
                 response = conn.recv(requestId)
                 return response
             except Exception, e:
-                log.warning("Could not send request [%r] to server %s, "
-                            "trying next server: %s" % (request, conn, e))
+                log.warning("Could not send request [%r] to server %s:%i, "
+                            "trying next server: %s" % (request, host, port, e))
                 continue
 
         raise KafkaUnavailableError("All servers failed to process request")