From af23b6eeafcb7adc76f60bfcb04aee699c975e31 Mon Sep 17 00:00:00 2001 From: Hanxi Liu Date: Thu, 20 Jul 2017 17:57:49 +0800 Subject: [PATCH] Deprecate kafka publisher Original kafka publisher has some restrictions on demand. oslo.messaging has introduced kafka driver and make it scalable, so it's recommended to integrate kafka driver with NotifierPublisher and push events to broker. And this patch also implements a generic NotifierPublisher to push data to external server with customized transport driver. Closes-Bug: #1649947 Change-Id: I3eecd74b61a42ba1c760b53032ac6e80e85d1dda --- ceilometer/publisher/kafka_broker.py | 4 ++ ceilometer/publisher/messaging.py | 42 ++++++++++++++++- .../publisher/test_messaging_publisher.py | 47 +++++++++++++++++-- ...cate-kafka-publisher-17b4f221758e15da.yaml | 11 +++++ 4 files changed, 99 insertions(+), 5 deletions(-) create mode 100644 releasenotes/notes/deprecate-kafka-publisher-17b4f221758e15da.yaml diff --git a/ceilometer/publisher/kafka_broker.py b/ceilometer/publisher/kafka_broker.py index 8b8955f11d..d3b359428a 100644 --- a/ceilometer/publisher/kafka_broker.py +++ b/ceilometer/publisher/kafka_broker.py @@ -13,6 +13,7 @@ # License for the specific language governing permissions and limitations # under the License. +from debtcollector import removals import kafka from oslo_log import log from oslo_serialization import jsonutils @@ -24,6 +25,9 @@ from ceilometer.publisher import messaging LOG = log.getLogger(__name__) +@removals.removed_class("KafkaBrokerPublisher", + message="use NotifierPublisher instead", + removal_version='10.0') class KafkaBrokerPublisher(messaging.MessagingPublisher): """Publish metering data to kafka broker. diff --git a/ceilometer/publisher/messaging.py b/ceilometer/publisher/messaging.py index 7343030bf6..f62cfc5c57 100644 --- a/ceilometer/publisher/messaging.py +++ b/ceilometer/publisher/messaging.py @@ -186,11 +186,49 @@ class MessagingPublisher(publisher.ConfigPublisherBase): class NotifierPublisher(MessagingPublisher): + """Publish metering data from notifer publisher. + + The ip address and port number of notifer can be configured in + ceilometer pipeline configuration file. + + User can customize the transport driver such as rabbit, kafka and + so on. The Notifer uses `sample` method as default method to send + notifications. + + This publisher has transmit options such as queue, drop, and + retry. These options are specified using policy field of URL parameter. + When queue option could be selected, local queue length can be determined + using max_queue_length field as well. When the transfer fails with retry + option, try to resend the data as many times as specified in max_retry + field. If max_retry is not specified, by default the number of retry + is 100. + + To enable this publisher, add the following section to the + /etc/ceilometer/pipeline.yaml file or simply add it to an existing + pipeline:: + + meter: + - name: meter_notifier + meters: + - "*" + sinks: + - notifier_sink + sinks: + - name: notifier_sink + transformers: + publishers: + - notifer://[notifier_ip]:[notifier_port]?topic=[topic]& + driver=driver&max_retry=100 + + """ + def __init__(self, conf, parsed_url, default_topic): super(NotifierPublisher, self).__init__(conf, parsed_url) options = urlparse.parse_qs(parsed_url.query) - topic = options.pop('topic', [default_topic]) + topics = options.pop('topic', [default_topic]) driver = options.pop('driver', ['rabbit'])[0] + self.max_retry = int(options.get('max_retry', [100])[-1]) + url = None if parsed_url.netloc != '': url = urlparse.urlunsplit([driver, parsed_url.netloc, @@ -201,7 +239,7 @@ class NotifierPublisher(MessagingPublisher): messaging.get_transport(self.conf, url), driver=self.conf.publisher_notifier.telemetry_driver, publisher_id='telemetry.publisher.%s' % self.conf.host, - topics=topic, + topics=topics, retry=self.retry ) diff --git a/ceilometer/tests/unit/publisher/test_messaging_publisher.py b/ceilometer/tests/unit/publisher/test_messaging_publisher.py index 203a48bcca..6be8ada3ba 100644 --- a/ceilometer/tests/unit/publisher/test_messaging_publisher.py +++ b/ceilometer/tests/unit/publisher/test_messaging_publisher.py @@ -18,6 +18,8 @@ import datetime import uuid import mock +import oslo_messaging +from oslo_messaging._drivers import impl_kafka as kafka_driver from oslo_utils import netutils import testscenarios.testcase @@ -147,6 +149,42 @@ class NotifierOnlyPublisherTest(BasePublisherTestCase): cgt.assert_called_with(self.CONF, 'amqp://foo:foo@127.0.0.1:1234/foo' '?amqp_auto_delete=true') + @mock.patch('ceilometer.messaging.get_transport') + def test_publish_with_none_rabbit_driver(self, cgt): + sample_publisher = msg_publisher.SampleNotifierPublisher( + self.CONF, + netutils.urlsplit('notifier://127.0.0.1:9092?driver=kafka')) + cgt.assert_called_with(self.CONF, 'kafka://127.0.0.1:9092') + transport = oslo_messaging.get_transport(self.CONF, + 'kafka://127.0.0.1:9092') + self.assertIsInstance(transport._driver, kafka_driver.KafkaDriver) + + side_effect = msg_publisher.DeliveryFailure() + with mock.patch.object(sample_publisher, '_send') as fake_send: + fake_send.side_effect = side_effect + self.assertRaises( + msg_publisher.DeliveryFailure, + sample_publisher.publish_samples, + self.test_sample_data) + self.assertEqual(0, len(sample_publisher.local_queue)) + self.assertEqual(100, len(fake_send.mock_calls)) + fake_send.assert_called_with('metering', mock.ANY) + + event_publisher = msg_publisher.EventNotifierPublisher( + self.CONF, + netutils.urlsplit('notifier://127.0.0.1:9092?driver=kafka')) + cgt.assert_called_with(self.CONF, 'kafka://127.0.0.1:9092') + + with mock.patch.object(event_publisher, '_send') as fake_send: + fake_send.side_effect = side_effect + self.assertRaises( + msg_publisher.DeliveryFailure, + event_publisher.publish_events, + self.test_event_data) + self.assertEqual(0, len(event_publisher.local_queue)) + self.assertEqual(100, len(fake_send.mock_calls)) + fake_send.assert_called_with('event', mock.ANY) + class TestPublisher(testscenarios.testcase.WithScenarios, BasePublisherTestCase): @@ -186,7 +224,8 @@ class TestPublisherPolicy(TestPublisher): self.assertTrue(mylog.info.called) self.assertEqual('default', publisher.policy) self.assertEqual(0, len(publisher.local_queue)) - fake_send.assert_called_once_with( + self.assertEqual(100, len(fake_send.mock_calls)) + fake_send.assert_called_with( self.topic, mock.ANY) @mock.patch('ceilometer.publisher.messaging.LOG') @@ -203,7 +242,8 @@ class TestPublisherPolicy(TestPublisher): self.test_data) self.assertTrue(mylog.info.called) self.assertEqual(0, len(publisher.local_queue)) - fake_send.assert_called_once_with( + self.assertEqual(100, len(fake_send.mock_calls)) + fake_send.assert_called_with( self.topic, mock.ANY) @mock.patch('ceilometer.publisher.messaging.LOG') @@ -221,7 +261,8 @@ class TestPublisherPolicy(TestPublisher): self.assertTrue(mylog.warning.called) self.assertEqual('default', publisher.policy) self.assertEqual(0, len(publisher.local_queue)) - fake_send.assert_called_once_with( + self.assertEqual(100, len(fake_send.mock_calls)) + fake_send.assert_called_with( self.topic, mock.ANY) diff --git a/releasenotes/notes/deprecate-kafka-publisher-17b4f221758e15da.yaml b/releasenotes/notes/deprecate-kafka-publisher-17b4f221758e15da.yaml new file mode 100644 index 0000000000..0f58826dc4 --- /dev/null +++ b/releasenotes/notes/deprecate-kafka-publisher-17b4f221758e15da.yaml @@ -0,0 +1,11 @@ +--- +features: + - | + Ceilometer supports generic notifier to publish data and allow user to + customize parameters such as topic, transport driver and priority. The + publisher configuration in pipeline.yaml can be + notifer://[notifier_ip]:[notifier_port]?topic=[topic]&driver=driver&max_retry=100 + Not only rabbit driver, but also other driver like kafka can be used. +deprecations: + - | + Kafka publisher is deprecated to use generic notifier instead.