From 6189bd46951580f1c0e280d289ab90dfc7d70b37 Mon Sep 17 00:00:00 2001 From: mrtheb Date: Sat, 18 Jan 2014 09:27:08 -0500 Subject: [PATCH 1/7] added mock --- kafka/client.py | 7 +-- setup.py | 3 +- test/test_unit.py | 135 +++++++++++++++++++++++++++++++++++++++++++--- 3 files changed, 134 insertions(+), 11 deletions(-) diff --git a/kafka/client.py b/kafka/client.py index 155f658..e6b3ca9 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -243,9 +243,10 @@ class KafkaClient(object): self.topic_partitions[topic] = [] for partition, meta in partitions.items(): - topic_part = TopicAndPartition(topic, partition) - self.topics_to_brokers[topic_part] = brokers[meta.leader] - self.topic_partitions[topic].append(partition) + if meta.leader != -1: + topic_part = TopicAndPartition(topic, partition) + self.topics_to_brokers[topic_part] = brokers[meta.leader] + self.topic_partitions[topic].append(partition) def send_produce_request(self, payloads=[], acks=1, timeout=1000, fail_on_error=True, callback=None): diff --git a/setup.py b/setup.py index 0869fee..fe951e7 100644 --- a/setup.py +++ b/setup.py @@ -1,4 +1,3 @@ -import os.path import sys from setuptools import setup, Command @@ -23,7 +22,7 @@ setup( version="0.9.0", install_requires=["distribute"], - tests_require=["tox"], + tests_require=["tox", "mock"], cmdclass={"test": Tox}, packages=["kafka"], diff --git a/test/test_unit.py b/test/test_unit.py index e3fd4bb..0d8f169 100644 --- a/test/test_unit.py +++ b/test/test_unit.py @@ -6,7 +6,8 @@ import unittest from kafka.common import ( ProduceRequest, FetchRequest, Message, ChecksumError, ConsumerFetchSizeTooSmall, ProduceResponse, FetchResponse, - OffsetAndMessage, BrokerMetadata, PartitionMetadata + OffsetAndMessage, BrokerMetadata, PartitionMetadata, + TopicAndPartition ) from kafka.codec import ( has_gzip, has_snappy, gzip_encode, gzip_decode, @@ -16,6 +17,10 @@ from kafka.protocol import ( create_gzip_message, create_message, create_snappy_message, KafkaProtocol ) +from kafka.client import KafkaClient + +from mock import patch + ITERATIONS = 1000 STRLEN = 100 @@ -87,8 +92,9 @@ class TestProtocol(unittest.TestCase): payloads = ["v1", "v2"] msg = create_gzip_message(payloads) self.assertEqual(msg.magic, 0) - self.assertEqual(msg.attributes, KafkaProtocol.ATTRIBUTE_CODEC_MASK & - KafkaProtocol.CODEC_GZIP) + self.assertEqual( + msg.attributes, + KafkaProtocol.ATTRIBUTE_CODEC_MASK & KafkaProtocol.CODEC_GZIP) self.assertEqual(msg.key, None) # Need to decode to check since gzipped payload is non-deterministic decoded = gzip_decode(msg.value) @@ -103,8 +109,9 @@ class TestProtocol(unittest.TestCase): payloads = ["v1", "v2"] msg = create_snappy_message(payloads) self.assertEqual(msg.magic, 0) - self.assertEqual(msg.attributes, KafkaProtocol.ATTRIBUTE_CODEC_MASK & - KafkaProtocol.CODEC_SNAPPY) + self.assertEqual( + msg.attributes, + KafkaProtocol.ATTRIBUTE_CODEC_MASK & KafkaProtocol.CODEC_SNAPPY) self.assertEqual(msg.key, None) expect = ("8\x00\x00\x19\x01@\x10L\x9f[\xc2\x00\x00\xff\xff\xff\xff" "\x00\x00\x00\x02v1\x19\x1bD\x00\x10\xd5\x96\nx\x00\x00\xff" @@ -275,6 +282,7 @@ class TestProtocol(unittest.TestCase): len(ms3), ms3) responses = list(KafkaProtocol.decode_fetch_response(encoded)) + def expand_messages(response): return FetchResponse(response.topic, response.partition, response.error, response.highwaterMark, @@ -362,7 +370,6 @@ class TestProtocol(unittest.TestCase): def test_decode_offset_response(self): pass - @unittest.skip("Not Implemented") def test_encode_offset_commit_request(self): pass @@ -380,5 +387,121 @@ class TestProtocol(unittest.TestCase): pass +class TestClient(unittest.TestCase): + + #@unittest.skip('requires disabling recursion on _load_metadata_for_topics') + @patch('kafka.client.KafkaConnection') + @patch('kafka.client.KafkaProtocol') + def test_client_load_metadata(self, protocol, conn): + + conn.recv.return_value = 'response' # anything but None + + brokers = {} + brokers[0] = BrokerMetadata(1, 'broker_1', 4567) + brokers[1] = BrokerMetadata(2, 'broker_2', 5678) + + topics = {} + topics['topic_1'] = { + 0: PartitionMetadata('topic_1', 0, 1, [1, 2], [1, 2]) + } + topics['topic_2'] = { + 0: PartitionMetadata('topic_2', 0, 0, [0, 1], [0, 1]), + 1: PartitionMetadata('topic_2', 1, 1, [1, 0], [1, 0]) + } + protocol.decode_metadata_response.return_value = (brokers, topics) + + client = KafkaClient(host='broker_1', port=4567) + self.assertItemsEqual({ + TopicAndPartition('topic_1', 0): brokers[0], + TopicAndPartition('topic_2', 0): brokers[0], + TopicAndPartition('topic_2', 1): brokers[1]}, + client.topics_to_brokers) + + # @unittest.skip('requires disabling recursion on _load_metadata_for_topics') + @patch('kafka.client.KafkaConnection') + @patch('kafka.client.KafkaProtocol') + def test_client_load_metadata_unassigned_partitions(self, protocol, conn): + + conn.recv.return_value = 'response' # anything but None + + brokers = {} + brokers[0] = BrokerMetadata(0, 'broker_1', 4567) + brokers[1] = BrokerMetadata(1, 'broker_2', 5678) + + topics = {} + topics['topic_1'] = { + 0: PartitionMetadata('topic_1', 0, -1, [], []) + } + protocol.decode_metadata_response.return_value = (brokers, topics) + + client = KafkaClient(host='broker_1', port=4567) + + self.assertItemsEqual({}, client.topics_to_brokers) + self.assertRaises( + Exception, + client._get_leader_for_partition, + 'topic_1', 0) + + # calling _get_leader_for_partition (from any broker aware request) + # will try loading metadata again for the same topic + topics['topic_1'] = { + 0: PartitionMetadata('topic_1', 0, 0, [0, 1], [0, 1]) + } + leader = client._get_leader_for_partition('topic_1', 0) + + self.assertEqual(brokers[0], leader) + self.assertItemsEqual({ + TopicAndPartition('topic_1', 0): brokers[0]}, + client.topics_to_brokers) + + #@unittest.skip('requires disabling recursion on _load_metadata_for_topics') + @patch('kafka.client.KafkaConnection') + @patch('kafka.client.KafkaProtocol') + def test_client_load_metadata_noleader_partitions(self, protocol, conn): + + conn.recv.return_value = 'response' # anything but None + + brokers = {} + brokers[0] = BrokerMetadata(0, 'broker_1', 4567) + brokers[1] = BrokerMetadata(1, 'broker_2', 5678) + + topics = {} + topics['topic_1'] = { + 0: PartitionMetadata('topic_1', 0, -1, [], []) + } + topics['topic_2'] = { + 0: PartitionMetadata('topic_2', 0, 0, [0, 1], []), + 1: PartitionMetadata('topic_2', 1, 1, [1, 0], [1, 0]) + } + protocol.decode_metadata_response.return_value = (brokers, topics) + + client = KafkaClient(host='broker_1', port=4567) + self.assertItemsEqual( + { + TopicAndPartition('topic_2', 0): brokers[0], + TopicAndPartition('topic_2', 1): brokers[1] + }, + client.topics_to_brokers) + self.assertRaises( + Exception, + client._get_leader_for_partition, + 'topic_1', 0) + + # calling _get_leader_for_partition (from any broker aware request) + # will try loading metadata again for the same topic + topics['topic_1'] = { + 0: PartitionMetadata('topic_1', 0, 0, [0, 1], [0, 1]) + } + leader = client._get_leader_for_partition('topic_1', 0) + + self.assertEqual(brokers[0], leader) + self.assertItemsEqual( + { + TopicAndPartition('topic_1', 0): brokers[0], + TopicAndPartition('topic_2', 0): brokers[0], + TopicAndPartition('topic_2', 1): brokers[1] + }, + client.topics_to_brokers) + if __name__ == '__main__': unittest.main() From 9e2778efbb01753843c50d245378fb6a3d3ba989 Mon Sep 17 00:00:00 2001 From: mrtheb Date: Sun, 19 Jan 2014 20:42:32 -0500 Subject: [PATCH 2/7] removed commented out lines --- test/test_unit.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/test/test_unit.py b/test/test_unit.py index 0d8f169..602f5f5 100644 --- a/test/test_unit.py +++ b/test/test_unit.py @@ -389,7 +389,6 @@ class TestProtocol(unittest.TestCase): class TestClient(unittest.TestCase): - #@unittest.skip('requires disabling recursion on _load_metadata_for_topics') @patch('kafka.client.KafkaConnection') @patch('kafka.client.KafkaProtocol') def test_client_load_metadata(self, protocol, conn): @@ -417,7 +416,6 @@ class TestClient(unittest.TestCase): TopicAndPartition('topic_2', 1): brokers[1]}, client.topics_to_brokers) - # @unittest.skip('requires disabling recursion on _load_metadata_for_topics') @patch('kafka.client.KafkaConnection') @patch('kafka.client.KafkaProtocol') def test_client_load_metadata_unassigned_partitions(self, protocol, conn): @@ -454,7 +452,6 @@ class TestClient(unittest.TestCase): TopicAndPartition('topic_1', 0): brokers[0]}, client.topics_to_brokers) - #@unittest.skip('requires disabling recursion on _load_metadata_for_topics') @patch('kafka.client.KafkaConnection') @patch('kafka.client.KafkaProtocol') def test_client_load_metadata_noleader_partitions(self, protocol, conn): From 8bcf0f0940a94ddb2ee44a6edb333ca0d8595913 Mon Sep 17 00:00:00 2001 From: Marc Labbe Date: Fri, 31 Jan 2014 20:06:30 -0500 Subject: [PATCH 3/7] Handle cases for partition with leader=-1 (not defined) --- kafka/client.py | 18 ++++--- kafka/common.py | 4 -- test/test_unit.py | 123 ++++++++++++++++++++++++++-------------------- 3 files changed, 82 insertions(+), 63 deletions(-) diff --git a/kafka/client.py b/kafka/client.py index e6b3ca9..a76bf47 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -8,7 +8,7 @@ from itertools import count from kafka.common import (ErrorMapping, TopicAndPartition, ConnectionError, FailedPayloadsError, BrokerResponseError, PartitionUnavailableError, - KafkaUnavailableError, KafkaRequestError) + KafkaUnavailableError) from kafka.conn import KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS from kafka.protocol import KafkaProtocol @@ -53,11 +53,13 @@ class KafkaClient(object): def _get_leader_for_partition(self, topic, partition): key = TopicAndPartition(topic, partition) - if key not in self.topics_to_brokers: + # reload metadata whether the partition is not available + # or has not leader (broker is None) + if self.topics_to_brokers.get(key) is None: self.load_metadata_for_topics(topic) if key not in self.topics_to_brokers: - raise KafkaRequestError("Partition does not exist: %s" % str(key)) + raise PartitionUnavailableError("No leader for %s" % str(key)) return self.topics_to_brokers[key] @@ -239,14 +241,18 @@ class KafkaClient(object): self.reset_topic_metadata(topic) if not partitions: + log.info('No partitions for %s', topic) continue self.topic_partitions[topic] = [] for partition, meta in partitions.items(): - if meta.leader != -1: - topic_part = TopicAndPartition(topic, partition) + self.topic_partitions[topic].append(partition) + topic_part = TopicAndPartition(topic, partition) + if meta.leader == -1: + log.info('No leader for topic %s partition %d', topic, partition) + self.topics_to_brokers[topic_part] = None + else: self.topics_to_brokers[topic_part] = brokers[meta.leader] - self.topic_partitions[topic].append(partition) def send_produce_request(self, payloads=[], acks=1, timeout=1000, fail_on_error=True, callback=None): diff --git a/kafka/common.py b/kafka/common.py index c0a1a6a..ec0b89b 100644 --- a/kafka/common.py +++ b/kafka/common.py @@ -74,10 +74,6 @@ class KafkaError(RuntimeError): pass -class KafkaRequestError(KafkaError): - pass - - class KafkaUnavailableError(KafkaError): pass diff --git a/test/test_unit.py b/test/test_unit.py index 602f5f5..b8af242 100644 --- a/test/test_unit.py +++ b/test/test_unit.py @@ -7,7 +7,7 @@ from kafka.common import ( ProduceRequest, FetchRequest, Message, ChecksumError, ConsumerFetchSizeTooSmall, ProduceResponse, FetchResponse, OffsetAndMessage, BrokerMetadata, PartitionMetadata, - TopicAndPartition + TopicAndPartition, PartitionUnavailableError ) from kafka.codec import ( has_gzip, has_snappy, gzip_encode, gzip_decode, @@ -55,7 +55,6 @@ class TestPackage(unittest.TestCase): from kafka import KafkaClient as KafkaClient2 self.assertEquals(KafkaClient2.__name__, "KafkaClient") - from kafka.codec import snappy_encode self.assertEquals(snappy_encode.__name__, "snappy_encode") @@ -391,7 +390,8 @@ class TestClient(unittest.TestCase): @patch('kafka.client.KafkaConnection') @patch('kafka.client.KafkaProtocol') - def test_client_load_metadata(self, protocol, conn): + def test_load_metadata(self, protocol, conn): + "Load metadata for all topics" conn.recv.return_value = 'response' # anything but None @@ -403,22 +403,33 @@ class TestClient(unittest.TestCase): topics['topic_1'] = { 0: PartitionMetadata('topic_1', 0, 1, [1, 2], [1, 2]) } - topics['topic_2'] = { - 0: PartitionMetadata('topic_2', 0, 0, [0, 1], [0, 1]), - 1: PartitionMetadata('topic_2', 1, 1, [1, 0], [1, 0]) + topics['topic_noleader'] = { + 0: PartitionMetadata('topic_noleader', 0, -1, [], []), + 1: PartitionMetadata('topic_noleader', 1, -1, [], []) + } + topics['topic_no_partitions'] = {} + topics['topic_3'] = { + 0: PartitionMetadata('topic_3', 0, 0, [0, 1], [0, 1]), + 1: PartitionMetadata('topic_3', 1, 1, [1, 0], [1, 0]), + 2: PartitionMetadata('topic_3', 2, 0, [0, 1], [0, 1]) } protocol.decode_metadata_response.return_value = (brokers, topics) + # client loads metadata at init client = KafkaClient(host='broker_1', port=4567) self.assertItemsEqual({ TopicAndPartition('topic_1', 0): brokers[0], - TopicAndPartition('topic_2', 0): brokers[0], - TopicAndPartition('topic_2', 1): brokers[1]}, + TopicAndPartition('topic_noleader', 0): None, + TopicAndPartition('topic_noleader', 1): None, + TopicAndPartition('topic_3', 0): brokers[0], + TopicAndPartition('topic_3', 1): brokers[1], + TopicAndPartition('topic_3', 2): brokers[0]}, client.topics_to_brokers) @patch('kafka.client.KafkaConnection') @patch('kafka.client.KafkaProtocol') - def test_client_load_metadata_unassigned_partitions(self, protocol, conn): + def test_get_leader_for_partitions_reloads_metadata(self, protocol, conn): + "Get leader for partitions reload metadata if it is not available" conn.recv.return_value = 'response' # anything but None @@ -426,35 +437,54 @@ class TestClient(unittest.TestCase): brokers[0] = BrokerMetadata(0, 'broker_1', 4567) brokers[1] = BrokerMetadata(1, 'broker_2', 5678) - topics = {} - topics['topic_1'] = { - 0: PartitionMetadata('topic_1', 0, -1, [], []) + topics = {'topic_no_partitions': {}} + protocol.decode_metadata_response.return_value = (brokers, topics) + + client = KafkaClient(host='broker_1', port=4567) + + # topic metadata is loaded but empty + self.assertItemsEqual({}, client.topics_to_brokers) + + topics['topic_no_partitions'] = { + 0: PartitionMetadata('topic_no_partitions', 0, 0, [0, 1], [0, 1]) } protocol.decode_metadata_response.return_value = (brokers, topics) + # calling _get_leader_for_partition (from any broker aware request) + # will try loading metadata again for the same topic + leader = client._get_leader_for_partition('topic_no_partitions', 0) + + self.assertEqual(brokers[0], leader) + self.assertItemsEqual({ + TopicAndPartition('topic_no_partitions', 0): brokers[0]}, + client.topics_to_brokers) + + @patch('kafka.client.KafkaConnection') + @patch('kafka.client.KafkaProtocol') + def test_get_leader_for_unassigned_partitions(self, protocol, conn): + "Get leader raises if no partitions is defined for a topic" + + conn.recv.return_value = 'response' # anything but None + + brokers = {} + brokers[0] = BrokerMetadata(0, 'broker_1', 4567) + brokers[1] = BrokerMetadata(1, 'broker_2', 5678) + + topics = {'topic_no_partitions': {}} + protocol.decode_metadata_response.return_value = (brokers, topics) + client = KafkaClient(host='broker_1', port=4567) self.assertItemsEqual({}, client.topics_to_brokers) self.assertRaises( - Exception, + PartitionUnavailableError, client._get_leader_for_partition, - 'topic_1', 0) - - # calling _get_leader_for_partition (from any broker aware request) - # will try loading metadata again for the same topic - topics['topic_1'] = { - 0: PartitionMetadata('topic_1', 0, 0, [0, 1], [0, 1]) - } - leader = client._get_leader_for_partition('topic_1', 0) - - self.assertEqual(brokers[0], leader) - self.assertItemsEqual({ - TopicAndPartition('topic_1', 0): brokers[0]}, - client.topics_to_brokers) + 'topic_no_partitions', 0) @patch('kafka.client.KafkaConnection') @patch('kafka.client.KafkaProtocol') - def test_client_load_metadata_noleader_partitions(self, protocol, conn): + def test_get_leader_returns_none_when_noleader(self, protocol, conn): + "Getting leader for partitions returns None when the partiion has no leader" conn.recv.return_value = 'response' # anything but None @@ -463,42 +493,29 @@ class TestClient(unittest.TestCase): brokers[1] = BrokerMetadata(1, 'broker_2', 5678) topics = {} - topics['topic_1'] = { - 0: PartitionMetadata('topic_1', 0, -1, [], []) - } - topics['topic_2'] = { - 0: PartitionMetadata('topic_2', 0, 0, [0, 1], []), - 1: PartitionMetadata('topic_2', 1, 1, [1, 0], [1, 0]) + topics['topic_noleader'] = { + 0: PartitionMetadata('topic_noleader', 0, -1, [], []), + 1: PartitionMetadata('topic_noleader', 1, -1, [], []) } protocol.decode_metadata_response.return_value = (brokers, topics) client = KafkaClient(host='broker_1', port=4567) self.assertItemsEqual( { - TopicAndPartition('topic_2', 0): brokers[0], - TopicAndPartition('topic_2', 1): brokers[1] + TopicAndPartition('topic_noleader', 0): None, + TopicAndPartition('topic_noleader', 1): None }, client.topics_to_brokers) - self.assertRaises( - Exception, - client._get_leader_for_partition, - 'topic_1', 0) + self.assertIsNone(client._get_leader_for_partition('topic_noleader', 0)) + self.assertIsNone(client._get_leader_for_partition('topic_noleader', 1)) - # calling _get_leader_for_partition (from any broker aware request) - # will try loading metadata again for the same topic - topics['topic_1'] = { - 0: PartitionMetadata('topic_1', 0, 0, [0, 1], [0, 1]) + topics['topic_noleader'] = { + 0: PartitionMetadata('topic_noleader', 0, 0, [0, 1], [0, 1]), + 1: PartitionMetadata('topic_noleader', 1, 1, [1, 0], [1, 0]) } - leader = client._get_leader_for_partition('topic_1', 0) - - self.assertEqual(brokers[0], leader) - self.assertItemsEqual( - { - TopicAndPartition('topic_1', 0): brokers[0], - TopicAndPartition('topic_2', 0): brokers[0], - TopicAndPartition('topic_2', 1): brokers[1] - }, - client.topics_to_brokers) + protocol.decode_metadata_response.return_value = (brokers, topics) + self.assertEqual(brokers[0], client._get_leader_for_partition('topic_noleader', 0)) + self.assertEqual(brokers[1], client._get_leader_for_partition('topic_noleader', 1)) if __name__ == '__main__': unittest.main() From b253166bec5a7d836767523b1ad5275eeed6b83f Mon Sep 17 00:00:00 2001 From: mrtheb Date: Fri, 31 Jan 2014 22:08:08 -0500 Subject: [PATCH 4/7] check for broker None in send_broker_aware_request (added test for it) --- kafka/client.py | 19 ++++++++++++++----- test/test_unit.py | 28 ++++++++++++++++++++++++++++ 2 files changed, 42 insertions(+), 5 deletions(-) diff --git a/kafka/client.py b/kafka/client.py index a76bf47..fbbff25 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -52,14 +52,22 @@ class KafkaClient(object): return self.conns[(broker.host, broker.port)] def _get_leader_for_partition(self, topic, partition): + """ + Returns the leader for a partition or None if the partition exists + but has no leader. + + PartitionUnavailableError will be raised if the topic or partition + is not part of the metadata. + """ + key = TopicAndPartition(topic, partition) # reload metadata whether the partition is not available - # or has not leader (broker is None) + # or has no leader (broker is None) if self.topics_to_brokers.get(key) is None: self.load_metadata_for_topics(topic) if key not in self.topics_to_brokers: - raise PartitionUnavailableError("No leader for %s" % str(key)) + raise PartitionUnavailableError("%s not available" % str(key)) return self.topics_to_brokers[key] @@ -115,8 +123,9 @@ class KafkaClient(object): for payload in payloads: leader = self._get_leader_for_partition(payload.topic, payload.partition) - if leader == -1: - raise PartitionUnavailableError("Leader is unassigned for %s-%s" % payload.topic, payload.partition) + if leader is None: + raise PartitionUnavailableError( + "No leader for topic %s partition %s" % (payload.topic, payload.partition)) payloads_by_broker[leader].append(payload) original_keys.append((payload.topic, payload.partition)) @@ -249,7 +258,7 @@ class KafkaClient(object): self.topic_partitions[topic].append(partition) topic_part = TopicAndPartition(topic, partition) if meta.leader == -1: - log.info('No leader for topic %s partition %d', topic, partition) + log.info('No leader for topic %s partition %s', topic, partition) self.topics_to_brokers[topic_part] = None else: self.topics_to_brokers[topic_part] = brokers[meta.leader] diff --git a/test/test_unit.py b/test/test_unit.py index b8af242..f0edd16 100644 --- a/test/test_unit.py +++ b/test/test_unit.py @@ -517,5 +517,33 @@ class TestClient(unittest.TestCase): self.assertEqual(brokers[0], client._get_leader_for_partition('topic_noleader', 0)) self.assertEqual(brokers[1], client._get_leader_for_partition('topic_noleader', 1)) + @patch('kafka.client.KafkaConnection') + @patch('kafka.client.KafkaProtocol') + def test_send_produce_request_raises_when_noleader(self, protocol, conn): + "Getting leader for partitions returns None when the partiion has no leader" + + conn.recv.return_value = 'response' # anything but None + + brokers = {} + brokers[0] = BrokerMetadata(0, 'broker_1', 4567) + brokers[1] = BrokerMetadata(1, 'broker_2', 5678) + + topics = {} + topics['topic_noleader'] = { + 0: PartitionMetadata('topic_noleader', 0, -1, [], []), + 1: PartitionMetadata('topic_noleader', 1, -1, [], []) + } + protocol.decode_metadata_response.return_value = (brokers, topics) + + client = KafkaClient(host='broker_1', port=4567) + + requests = [ProduceRequest( + "topic_noleader", 0, + [create_message("a"), create_message("b")])] + + self.assertRaises( + PartitionUnavailableError, + client.send_produce_request, requests) + if __name__ == '__main__': unittest.main() From 5e5d7097a70e79d90d573aef6a0adba40ada5f03 Mon Sep 17 00:00:00 2001 From: mrtheb Date: Sat, 15 Feb 2014 11:00:35 -0500 Subject: [PATCH 5/7] Changes based on comments by @rdiomar, plus added LeaderUnavailableError for clarity --- kafka/client.py | 11 +++++++---- kafka/common.py | 4 ++++ test/test_unit.py | 8 ++++---- 3 files changed, 15 insertions(+), 8 deletions(-) diff --git a/kafka/client.py b/kafka/client.py index fbbff25..c3606e4 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -8,6 +8,7 @@ from itertools import count from kafka.common import (ErrorMapping, TopicAndPartition, ConnectionError, FailedPayloadsError, BrokerResponseError, PartitionUnavailableError, + LeaderUnavailableError, KafkaUnavailableError) from kafka.conn import KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS @@ -124,8 +125,10 @@ class KafkaClient(object): leader = self._get_leader_for_partition(payload.topic, payload.partition) if leader is None: - raise PartitionUnavailableError( - "No leader for topic %s partition %s" % (payload.topic, payload.partition)) + raise LeaderUnavailableError( + "Leader not available for topic %s partition %s" % + (payload.topic, payload.partition)) + payloads_by_broker[leader].append(payload) original_keys.append((payload.topic, payload.partition)) @@ -250,7 +253,7 @@ class KafkaClient(object): self.reset_topic_metadata(topic) if not partitions: - log.info('No partitions for %s', topic) + log.warning('No partitions for %s', topic) continue self.topic_partitions[topic] = [] @@ -258,7 +261,7 @@ class KafkaClient(object): self.topic_partitions[topic].append(partition) topic_part = TopicAndPartition(topic, partition) if meta.leader == -1: - log.info('No leader for topic %s partition %s', topic, partition) + log.warning('No leader for topic %s partition %s', topic, partition) self.topics_to_brokers[topic_part] = None else: self.topics_to_brokers[topic_part] = brokers[meta.leader] diff --git a/kafka/common.py b/kafka/common.py index ec0b89b..b4fe5c7 100644 --- a/kafka/common.py +++ b/kafka/common.py @@ -82,6 +82,10 @@ class BrokerResponseError(KafkaError): pass +class LeaderUnavailableError(KafkaError): + pass + + class PartitionUnavailableError(KafkaError): pass diff --git a/test/test_unit.py b/test/test_unit.py index f0edd16..1439d8b 100644 --- a/test/test_unit.py +++ b/test/test_unit.py @@ -7,7 +7,7 @@ from kafka.common import ( ProduceRequest, FetchRequest, Message, ChecksumError, ConsumerFetchSizeTooSmall, ProduceResponse, FetchResponse, OffsetAndMessage, BrokerMetadata, PartitionMetadata, - TopicAndPartition, PartitionUnavailableError + TopicAndPartition, LeaderUnavailableError, PartitionUnavailableError ) from kafka.codec import ( has_gzip, has_snappy, gzip_encode, gzip_decode, @@ -386,7 +386,7 @@ class TestProtocol(unittest.TestCase): pass -class TestClient(unittest.TestCase): +class TestKafkaClient(unittest.TestCase): @patch('kafka.client.KafkaConnection') @patch('kafka.client.KafkaProtocol') @@ -520,7 +520,7 @@ class TestClient(unittest.TestCase): @patch('kafka.client.KafkaConnection') @patch('kafka.client.KafkaProtocol') def test_send_produce_request_raises_when_noleader(self, protocol, conn): - "Getting leader for partitions returns None when the partiion has no leader" + "Send producer request raises LeaderUnavailableError if leader is not available" conn.recv.return_value = 'response' # anything but None @@ -542,7 +542,7 @@ class TestClient(unittest.TestCase): [create_message("a"), create_message("b")])] self.assertRaises( - PartitionUnavailableError, + LeaderUnavailableError, client.send_produce_request, requests) if __name__ == '__main__': From 828133cff064f4f8fba753183ac21619355ac005 Mon Sep 17 00:00:00 2001 From: mrtheb Date: Sat, 15 Feb 2014 22:05:11 -0500 Subject: [PATCH 6/7] fixing _get_leader_for_partition unittests --- test/test_unit.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/test/test_unit.py b/test/test_unit.py index 1439d8b..0b2b339 100644 --- a/test/test_unit.py +++ b/test/test_unit.py @@ -417,8 +417,8 @@ class TestKafkaClient(unittest.TestCase): # client loads metadata at init client = KafkaClient(host='broker_1', port=4567) - self.assertItemsEqual({ - TopicAndPartition('topic_1', 0): brokers[0], + self.assertDictEqual({ + TopicAndPartition('topic_1', 0): brokers[1], TopicAndPartition('topic_noleader', 0): None, TopicAndPartition('topic_noleader', 1): None, TopicAndPartition('topic_3', 0): brokers[0], @@ -443,7 +443,7 @@ class TestKafkaClient(unittest.TestCase): client = KafkaClient(host='broker_1', port=4567) # topic metadata is loaded but empty - self.assertItemsEqual({}, client.topics_to_brokers) + self.assertDictEqual({}, client.topics_to_brokers) topics['topic_no_partitions'] = { 0: PartitionMetadata('topic_no_partitions', 0, 0, [0, 1], [0, 1]) @@ -455,7 +455,7 @@ class TestKafkaClient(unittest.TestCase): leader = client._get_leader_for_partition('topic_no_partitions', 0) self.assertEqual(brokers[0], leader) - self.assertItemsEqual({ + self.assertDictEqual({ TopicAndPartition('topic_no_partitions', 0): brokers[0]}, client.topics_to_brokers) @@ -475,7 +475,7 @@ class TestKafkaClient(unittest.TestCase): client = KafkaClient(host='broker_1', port=4567) - self.assertItemsEqual({}, client.topics_to_brokers) + self.assertDictEqual({}, client.topics_to_brokers) self.assertRaises( PartitionUnavailableError, client._get_leader_for_partition, @@ -500,7 +500,7 @@ class TestKafkaClient(unittest.TestCase): protocol.decode_metadata_response.return_value = (brokers, topics) client = KafkaClient(host='broker_1', port=4567) - self.assertItemsEqual( + self.assertDictEqual( { TopicAndPartition('topic_noleader', 0): None, TopicAndPartition('topic_noleader', 1): None From 017f4842859d58ce32e74ad470ed619c5a8922df Mon Sep 17 00:00:00 2001 From: Mark Roberts Date: Tue, 18 Mar 2014 20:49:10 -0700 Subject: [PATCH 7/7] Fix py26 compatibility issue, add mock to tox --- test/test_unit.py | 32 +++++++++++++++----------------- tox.ini | 4 +++- 2 files changed, 18 insertions(+), 18 deletions(-) diff --git a/test/test_unit.py b/test/test_unit.py index 47ec561..2d769fb 100644 --- a/test/test_unit.py +++ b/test/test_unit.py @@ -465,18 +465,17 @@ class TestKafkaClient(unittest.TestCase): return mocked_conns[(host, port)] # patch to avoid making requests before we want it - with patch.object(KafkaClient, 'load_metadata_for_topics'), \ - patch.object(KafkaClient, '_get_conn', side_effect=mock_get_conn): + with patch.object(KafkaClient, 'load_metadata_for_topics'): + with patch.object(KafkaClient, '_get_conn', side_effect=mock_get_conn): + client = KafkaClient(hosts=['kafka01:9092', 'kafka02:9092']) - client = KafkaClient(hosts=['kafka01:9092', 'kafka02:9092']) + self.assertRaises( + KafkaUnavailableError, + client._send_broker_unaware_request, + 1, 'fake request') - self.assertRaises( - KafkaUnavailableError, - client._send_broker_unaware_request, - 1, 'fake request') - - for key, conn in mocked_conns.iteritems(): - conn.send.assert_called_with(1, 'fake request') + for key, conn in mocked_conns.iteritems(): + conn.send.assert_called_with(1, 'fake request') def test_send_broker_unaware_request(self): 'Tests that call works when at least one of the host is available' @@ -495,15 +494,14 @@ class TestKafkaClient(unittest.TestCase): return mocked_conns[(host, port)] # patch to avoid making requests before we want it - with patch.object(KafkaClient, 'load_metadata_for_topics'), \ - patch.object(KafkaClient, '_get_conn', side_effect=mock_get_conn): + with patch.object(KafkaClient, 'load_metadata_for_topics'): + with patch.object(KafkaClient, '_get_conn', side_effect=mock_get_conn): + client = KafkaClient(hosts='kafka01:9092,kafka02:9092') - client = KafkaClient(hosts='kafka01:9092,kafka02:9092') + resp = client._send_broker_unaware_request(1, 'fake request') - resp = client._send_broker_unaware_request(1, 'fake request') - - self.assertEqual('valid response', resp) - mocked_conns[('kafka02', 9092)].recv.assert_called_with(1) + self.assertEqual('valid response', resp) + mocked_conns[('kafka02', 9092)].recv.assert_called_with(1) @patch('kafka.client.KafkaConnection') @patch('kafka.client.KafkaProtocol') diff --git a/tox.ini b/tox.ini index f41911c..0077c4d 100644 --- a/tox.ini +++ b/tox.ini @@ -1,7 +1,9 @@ [tox] envlist = py26, py27 [testenv] -deps = pytest +deps = + pytest + mock commands = py.test --basetemp={envtmpdir} [] setenv = PROJECT_ROOT = {toxinidir}