Replace SimpleProducer with KafkaProducer

SimpleProducer is marked as deprecated at kafka-python library [1].
The deprecated method causes "FailedPayloadsError: FailedPayloadsError" error.
This patch replaces SimpleProducer with KafkaProducer.

[1] http://kafka-python.readthedocs.io/en/1.1.1/simple.html

Change-Id: I7c979e1064c9c42e82ae71e2631a3d2bd0462839
Closes-bug: #1628456
This commit is contained in:
celik.esra 2016-11-02 15:47:36 +03:00
parent 24e71595a4
commit e0a28535b9
3 changed files with 22 additions and 18 deletions

View File

@ -77,8 +77,12 @@ class KafkaBrokerPublisher(messaging.MessagingPublisher):
return
try:
client = kafka.KafkaClient("%s:%s" % (self._host, self._port))
self._producer = kafka.SimpleProducer(client)
self._producer = kafka.KafkaProducer(
bootstrap_servers=["%s:%s" % (self._host, self._port)])
except kafka.errors.KafkaError as e:
LOG.exception(_LE("Failed to connect to Kafka service: %s"), e)
raise messaging.DeliveryFailure('Kafka Client is not available, '
'please restart Kafka client')
except Exception as e:
LOG.exception(_LE("Failed to connect to Kafka service: %s"), e)
raise messaging.DeliveryFailure('Kafka Client is not available, '
@ -91,6 +95,6 @@ class KafkaBrokerPublisher(messaging.MessagingPublisher):
# application...
try:
for d in data:
self._producer.send_messages(self._topic, jsonutils.dumps(d))
self._producer.send(self._topic, jsonutils.dumps(d))
except Exception as e:
messaging.raise_delivery_failure(e)

View File

@ -103,7 +103,7 @@ class TestKafkaPublisher(tests_base.BaseTestCase):
with mock.patch.object(publisher, '_producer') as fake_producer:
publisher.publish_samples(self.test_data)
self.assertEqual(5, len(fake_producer.send_messages.mock_calls))
self.assertEqual(5, len(fake_producer.send.mock_calls))
self.assertEqual(0, len(publisher.local_queue))
def test_publish_without_options(self):
@ -112,7 +112,7 @@ class TestKafkaPublisher(tests_base.BaseTestCase):
with mock.patch.object(publisher, '_producer') as fake_producer:
publisher.publish_samples(self.test_data)
self.assertEqual(5, len(fake_producer.send_messages.mock_calls))
self.assertEqual(5, len(fake_producer.send.mock_calls))
self.assertEqual(0, len(publisher.local_queue))
def test_publish_to_host_without_policy(self):
@ -129,11 +129,11 @@ class TestKafkaPublisher(tests_base.BaseTestCase):
'kafka://127.0.0.1:9092?topic=ceilometer&policy=default'))
with mock.patch.object(publisher, '_producer') as fake_producer:
fake_producer.send_messages.side_effect = TypeError
fake_producer.send.side_effect = TypeError
self.assertRaises(msg_publisher.DeliveryFailure,
publisher.publish_samples,
self.test_data)
self.assertEqual(100, len(fake_producer.send_messages.mock_calls))
self.assertEqual(100, len(fake_producer.send.mock_calls))
self.assertEqual(0, len(publisher.local_queue))
def test_publish_to_host_with_drop_policy(self):
@ -141,9 +141,9 @@ class TestKafkaPublisher(tests_base.BaseTestCase):
'kafka://127.0.0.1:9092?topic=ceilometer&policy=drop'))
with mock.patch.object(publisher, '_producer') as fake_producer:
fake_producer.send_messages.side_effect = Exception("test")
fake_producer.send.side_effect = Exception("test")
publisher.publish_samples(self.test_data)
self.assertEqual(1, len(fake_producer.send_messages.mock_calls))
self.assertEqual(1, len(fake_producer.send.mock_calls))
self.assertEqual(0, len(publisher.local_queue))
def test_publish_to_host_with_queue_policy(self):
@ -151,9 +151,9 @@ class TestKafkaPublisher(tests_base.BaseTestCase):
'kafka://127.0.0.1:9092?topic=ceilometer&policy=queue'))
with mock.patch.object(publisher, '_producer') as fake_producer:
fake_producer.send_messages.side_effect = Exception("test")
fake_producer.send.side_effect = Exception("test")
publisher.publish_samples(self.test_data)
self.assertEqual(1, len(fake_producer.send_messages.mock_calls))
self.assertEqual(1, len(fake_producer.send.mock_calls))
self.assertEqual(1, len(publisher.local_queue))
def test_publish_to_down_host_with_default_queue_size(self):
@ -161,7 +161,7 @@ class TestKafkaPublisher(tests_base.BaseTestCase):
'kafka://127.0.0.1:9092?topic=ceilometer&policy=queue'))
with mock.patch.object(publisher, '_producer') as fake_producer:
fake_producer.send_messages.side_effect = Exception("test")
fake_producer.send.side_effect = Exception("test")
for i in range(0, 2000):
for s in self.test_data:
@ -179,7 +179,7 @@ class TestKafkaPublisher(tests_base.BaseTestCase):
'kafka://127.0.0.1:9092?topic=ceilometer&policy=queue'))
with mock.patch.object(publisher, '_producer') as fake_producer:
fake_producer.send_messages.side_effect = Exception("test")
fake_producer.send.side_effect = Exception("test")
for i in range(0, 16):
for s in self.test_data:
s.name = 'test-%d' % i
@ -187,7 +187,7 @@ class TestKafkaPublisher(tests_base.BaseTestCase):
self.assertEqual(16, len(publisher.local_queue))
fake_producer.send_messages.side_effect = None
fake_producer.send.side_effect = None
for s in self.test_data:
s.name = 'test-%d' % 16
publisher.publish_samples(self.test_data)
@ -199,12 +199,12 @@ class TestKafkaPublisher(tests_base.BaseTestCase):
with mock.patch.object(publisher, '_producer') as fake_producer:
publisher.publish_events(self.test_event_data)
self.assertEqual(5, len(fake_producer.send_messages.mock_calls))
self.assertEqual(5, len(fake_producer.send.mock_calls))
with mock.patch.object(publisher, '_producer') as fake_producer:
fake_producer.send_messages.side_effect = Exception("test")
fake_producer.send.side_effect = Exception("test")
self.assertRaises(msg_publisher.DeliveryFailure,
publisher.publish_events,
self.test_event_data)
self.assertEqual(100, len(fake_producer.send_messages.mock_calls))
self.assertEqual(100, len(fake_producer.send.mock_calls))
self.assertEqual(0, len(publisher.local_queue))

View File

@ -9,7 +9,7 @@ debtcollector>=1.2.0 # Apache-2.0
retrying!=1.3.0,>=1.2.3 # Apache-2.0
jsonpath-rw-ext>=0.1.9 # Apache-2.0
jsonschema!=2.5.0,<3.0.0,>=2.0.0 # MIT
kafka-python<1.0.0,>=0.9.5 # Apache-2.0
kafka-python>=1.3.1 # Apache-2.0
keystonemiddleware!=4.1.0,>=4.0.0 # Apache-2.0
lxml>=2.3 # BSD
msgpack-python>=0.4.0 # Apache-2.0