From e0a28535b91e81f2b8d4b6c405bf35e7651d1000 Mon Sep 17 00:00:00 2001 From: "celik.esra" Date: Wed, 2 Nov 2016 15:47:36 +0300 Subject: [PATCH] Replace SimpleProducer with KafkaProducer SimpleProducer is marked as deprecated at kafka-python library [1]. The deprecated method causes "FailedPayloadsError: FailedPayloadsError" error. This patch replaces SimpleProducer with KafkaProducer. [1] http://kafka-python.readthedocs.io/en/1.1.1/simple.html Change-Id: I7c979e1064c9c42e82ae71e2631a3d2bd0462839 Closes-bug: #1628456 --- ceilometer/publisher/kafka_broker.py | 10 +++++-- .../publisher/test_kafka_broker_publisher.py | 28 +++++++++---------- requirements.txt | 2 +- 3 files changed, 22 insertions(+), 18 deletions(-) diff --git a/ceilometer/publisher/kafka_broker.py b/ceilometer/publisher/kafka_broker.py index 2ecae2d6e4..8a73b08f58 100644 --- a/ceilometer/publisher/kafka_broker.py +++ b/ceilometer/publisher/kafka_broker.py @@ -77,8 +77,12 @@ class KafkaBrokerPublisher(messaging.MessagingPublisher): return try: - client = kafka.KafkaClient("%s:%s" % (self._host, self._port)) - self._producer = kafka.SimpleProducer(client) + self._producer = kafka.KafkaProducer( + bootstrap_servers=["%s:%s" % (self._host, self._port)]) + except kafka.errors.KafkaError as e: + LOG.exception(_LE("Failed to connect to Kafka service: %s"), e) + raise messaging.DeliveryFailure('Kafka Client is not available, ' + 'please restart Kafka client') except Exception as e: LOG.exception(_LE("Failed to connect to Kafka service: %s"), e) raise messaging.DeliveryFailure('Kafka Client is not available, ' @@ -91,6 +95,6 @@ class KafkaBrokerPublisher(messaging.MessagingPublisher): # application... try: for d in data: - self._producer.send_messages(self._topic, jsonutils.dumps(d)) + self._producer.send(self._topic, jsonutils.dumps(d)) except Exception as e: messaging.raise_delivery_failure(e) diff --git a/ceilometer/tests/unit/publisher/test_kafka_broker_publisher.py b/ceilometer/tests/unit/publisher/test_kafka_broker_publisher.py index aec30d84b6..f13cf94d4b 100644 --- a/ceilometer/tests/unit/publisher/test_kafka_broker_publisher.py +++ b/ceilometer/tests/unit/publisher/test_kafka_broker_publisher.py @@ -103,7 +103,7 @@ class TestKafkaPublisher(tests_base.BaseTestCase): with mock.patch.object(publisher, '_producer') as fake_producer: publisher.publish_samples(self.test_data) - self.assertEqual(5, len(fake_producer.send_messages.mock_calls)) + self.assertEqual(5, len(fake_producer.send.mock_calls)) self.assertEqual(0, len(publisher.local_queue)) def test_publish_without_options(self): @@ -112,7 +112,7 @@ class TestKafkaPublisher(tests_base.BaseTestCase): with mock.patch.object(publisher, '_producer') as fake_producer: publisher.publish_samples(self.test_data) - self.assertEqual(5, len(fake_producer.send_messages.mock_calls)) + self.assertEqual(5, len(fake_producer.send.mock_calls)) self.assertEqual(0, len(publisher.local_queue)) def test_publish_to_host_without_policy(self): @@ -129,11 +129,11 @@ class TestKafkaPublisher(tests_base.BaseTestCase): 'kafka://127.0.0.1:9092?topic=ceilometer&policy=default')) with mock.patch.object(publisher, '_producer') as fake_producer: - fake_producer.send_messages.side_effect = TypeError + fake_producer.send.side_effect = TypeError self.assertRaises(msg_publisher.DeliveryFailure, publisher.publish_samples, self.test_data) - self.assertEqual(100, len(fake_producer.send_messages.mock_calls)) + self.assertEqual(100, len(fake_producer.send.mock_calls)) self.assertEqual(0, len(publisher.local_queue)) def test_publish_to_host_with_drop_policy(self): @@ -141,9 +141,9 @@ class TestKafkaPublisher(tests_base.BaseTestCase): 'kafka://127.0.0.1:9092?topic=ceilometer&policy=drop')) with mock.patch.object(publisher, '_producer') as fake_producer: - fake_producer.send_messages.side_effect = Exception("test") + fake_producer.send.side_effect = Exception("test") publisher.publish_samples(self.test_data) - self.assertEqual(1, len(fake_producer.send_messages.mock_calls)) + self.assertEqual(1, len(fake_producer.send.mock_calls)) self.assertEqual(0, len(publisher.local_queue)) def test_publish_to_host_with_queue_policy(self): @@ -151,9 +151,9 @@ class TestKafkaPublisher(tests_base.BaseTestCase): 'kafka://127.0.0.1:9092?topic=ceilometer&policy=queue')) with mock.patch.object(publisher, '_producer') as fake_producer: - fake_producer.send_messages.side_effect = Exception("test") + fake_producer.send.side_effect = Exception("test") publisher.publish_samples(self.test_data) - self.assertEqual(1, len(fake_producer.send_messages.mock_calls)) + self.assertEqual(1, len(fake_producer.send.mock_calls)) self.assertEqual(1, len(publisher.local_queue)) def test_publish_to_down_host_with_default_queue_size(self): @@ -161,7 +161,7 @@ class TestKafkaPublisher(tests_base.BaseTestCase): 'kafka://127.0.0.1:9092?topic=ceilometer&policy=queue')) with mock.patch.object(publisher, '_producer') as fake_producer: - fake_producer.send_messages.side_effect = Exception("test") + fake_producer.send.side_effect = Exception("test") for i in range(0, 2000): for s in self.test_data: @@ -179,7 +179,7 @@ class TestKafkaPublisher(tests_base.BaseTestCase): 'kafka://127.0.0.1:9092?topic=ceilometer&policy=queue')) with mock.patch.object(publisher, '_producer') as fake_producer: - fake_producer.send_messages.side_effect = Exception("test") + fake_producer.send.side_effect = Exception("test") for i in range(0, 16): for s in self.test_data: s.name = 'test-%d' % i @@ -187,7 +187,7 @@ class TestKafkaPublisher(tests_base.BaseTestCase): self.assertEqual(16, len(publisher.local_queue)) - fake_producer.send_messages.side_effect = None + fake_producer.send.side_effect = None for s in self.test_data: s.name = 'test-%d' % 16 publisher.publish_samples(self.test_data) @@ -199,12 +199,12 @@ class TestKafkaPublisher(tests_base.BaseTestCase): with mock.patch.object(publisher, '_producer') as fake_producer: publisher.publish_events(self.test_event_data) - self.assertEqual(5, len(fake_producer.send_messages.mock_calls)) + self.assertEqual(5, len(fake_producer.send.mock_calls)) with mock.patch.object(publisher, '_producer') as fake_producer: - fake_producer.send_messages.side_effect = Exception("test") + fake_producer.send.side_effect = Exception("test") self.assertRaises(msg_publisher.DeliveryFailure, publisher.publish_events, self.test_event_data) - self.assertEqual(100, len(fake_producer.send_messages.mock_calls)) + self.assertEqual(100, len(fake_producer.send.mock_calls)) self.assertEqual(0, len(publisher.local_queue)) diff --git a/requirements.txt b/requirements.txt index 7804e36989..99821b8c53 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,7 +9,7 @@ debtcollector>=1.2.0 # Apache-2.0 retrying!=1.3.0,>=1.2.3 # Apache-2.0 jsonpath-rw-ext>=0.1.9 # Apache-2.0 jsonschema!=2.5.0,<3.0.0,>=2.0.0 # MIT -kafka-python<1.0.0,>=0.9.5 # Apache-2.0 +kafka-python>=1.3.1 # Apache-2.0 keystonemiddleware!=4.1.0,>=4.0.0 # Apache-2.0 lxml>=2.3 # BSD msgpack-python>=0.4.0 # Apache-2.0