diff --git a/ceilometer/publisher/kafka_broker.py b/ceilometer/publisher/kafka_broker.py index 8b8955f11d..d3b359428a 100644 --- a/ceilometer/publisher/kafka_broker.py +++ b/ceilometer/publisher/kafka_broker.py @@ -13,6 +13,7 @@ # License for the specific language governing permissions and limitations # under the License. +from debtcollector import removals import kafka from oslo_log import log from oslo_serialization import jsonutils @@ -24,6 +25,9 @@ from ceilometer.publisher import messaging LOG = log.getLogger(__name__) +@removals.removed_class("KafkaBrokerPublisher", + message="use NotifierPublisher instead", + removal_version='10.0') class KafkaBrokerPublisher(messaging.MessagingPublisher): """Publish metering data to kafka broker. diff --git a/ceilometer/publisher/messaging.py b/ceilometer/publisher/messaging.py index 7343030bf6..f62cfc5c57 100644 --- a/ceilometer/publisher/messaging.py +++ b/ceilometer/publisher/messaging.py @@ -186,11 +186,49 @@ class MessagingPublisher(publisher.ConfigPublisherBase): class NotifierPublisher(MessagingPublisher): + """Publish metering data from notifer publisher. + + The ip address and port number of notifer can be configured in + ceilometer pipeline configuration file. + + User can customize the transport driver such as rabbit, kafka and + so on. The Notifer uses `sample` method as default method to send + notifications. + + This publisher has transmit options such as queue, drop, and + retry. These options are specified using policy field of URL parameter. + When queue option could be selected, local queue length can be determined + using max_queue_length field as well. When the transfer fails with retry + option, try to resend the data as many times as specified in max_retry + field. If max_retry is not specified, by default the number of retry + is 100. + + To enable this publisher, add the following section to the + /etc/ceilometer/pipeline.yaml file or simply add it to an existing + pipeline:: + + meter: + - name: meter_notifier + meters: + - "*" + sinks: + - notifier_sink + sinks: + - name: notifier_sink + transformers: + publishers: + - notifer://[notifier_ip]:[notifier_port]?topic=[topic]& + driver=driver&max_retry=100 + + """ + def __init__(self, conf, parsed_url, default_topic): super(NotifierPublisher, self).__init__(conf, parsed_url) options = urlparse.parse_qs(parsed_url.query) - topic = options.pop('topic', [default_topic]) + topics = options.pop('topic', [default_topic]) driver = options.pop('driver', ['rabbit'])[0] + self.max_retry = int(options.get('max_retry', [100])[-1]) + url = None if parsed_url.netloc != '': url = urlparse.urlunsplit([driver, parsed_url.netloc, @@ -201,7 +239,7 @@ class NotifierPublisher(MessagingPublisher): messaging.get_transport(self.conf, url), driver=self.conf.publisher_notifier.telemetry_driver, publisher_id='telemetry.publisher.%s' % self.conf.host, - topics=topic, + topics=topics, retry=self.retry ) diff --git a/ceilometer/tests/unit/publisher/test_messaging_publisher.py b/ceilometer/tests/unit/publisher/test_messaging_publisher.py index 203a48bcca..6be8ada3ba 100644 --- a/ceilometer/tests/unit/publisher/test_messaging_publisher.py +++ b/ceilometer/tests/unit/publisher/test_messaging_publisher.py @@ -18,6 +18,8 @@ import datetime import uuid import mock +import oslo_messaging +from oslo_messaging._drivers import impl_kafka as kafka_driver from oslo_utils import netutils import testscenarios.testcase @@ -147,6 +149,42 @@ class NotifierOnlyPublisherTest(BasePublisherTestCase): cgt.assert_called_with(self.CONF, 'amqp://foo:foo@127.0.0.1:1234/foo' '?amqp_auto_delete=true') + @mock.patch('ceilometer.messaging.get_transport') + def test_publish_with_none_rabbit_driver(self, cgt): + sample_publisher = msg_publisher.SampleNotifierPublisher( + self.CONF, + netutils.urlsplit('notifier://127.0.0.1:9092?driver=kafka')) + cgt.assert_called_with(self.CONF, 'kafka://127.0.0.1:9092') + transport = oslo_messaging.get_transport(self.CONF, + 'kafka://127.0.0.1:9092') + self.assertIsInstance(transport._driver, kafka_driver.KafkaDriver) + + side_effect = msg_publisher.DeliveryFailure() + with mock.patch.object(sample_publisher, '_send') as fake_send: + fake_send.side_effect = side_effect + self.assertRaises( + msg_publisher.DeliveryFailure, + sample_publisher.publish_samples, + self.test_sample_data) + self.assertEqual(0, len(sample_publisher.local_queue)) + self.assertEqual(100, len(fake_send.mock_calls)) + fake_send.assert_called_with('metering', mock.ANY) + + event_publisher = msg_publisher.EventNotifierPublisher( + self.CONF, + netutils.urlsplit('notifier://127.0.0.1:9092?driver=kafka')) + cgt.assert_called_with(self.CONF, 'kafka://127.0.0.1:9092') + + with mock.patch.object(event_publisher, '_send') as fake_send: + fake_send.side_effect = side_effect + self.assertRaises( + msg_publisher.DeliveryFailure, + event_publisher.publish_events, + self.test_event_data) + self.assertEqual(0, len(event_publisher.local_queue)) + self.assertEqual(100, len(fake_send.mock_calls)) + fake_send.assert_called_with('event', mock.ANY) + class TestPublisher(testscenarios.testcase.WithScenarios, BasePublisherTestCase): @@ -186,7 +224,8 @@ class TestPublisherPolicy(TestPublisher): self.assertTrue(mylog.info.called) self.assertEqual('default', publisher.policy) self.assertEqual(0, len(publisher.local_queue)) - fake_send.assert_called_once_with( + self.assertEqual(100, len(fake_send.mock_calls)) + fake_send.assert_called_with( self.topic, mock.ANY) @mock.patch('ceilometer.publisher.messaging.LOG') @@ -203,7 +242,8 @@ class TestPublisherPolicy(TestPublisher): self.test_data) self.assertTrue(mylog.info.called) self.assertEqual(0, len(publisher.local_queue)) - fake_send.assert_called_once_with( + self.assertEqual(100, len(fake_send.mock_calls)) + fake_send.assert_called_with( self.topic, mock.ANY) @mock.patch('ceilometer.publisher.messaging.LOG') @@ -221,7 +261,8 @@ class TestPublisherPolicy(TestPublisher): self.assertTrue(mylog.warning.called) self.assertEqual('default', publisher.policy) self.assertEqual(0, len(publisher.local_queue)) - fake_send.assert_called_once_with( + self.assertEqual(100, len(fake_send.mock_calls)) + fake_send.assert_called_with( self.topic, mock.ANY) diff --git a/releasenotes/notes/deprecate-kafka-publisher-17b4f221758e15da.yaml b/releasenotes/notes/deprecate-kafka-publisher-17b4f221758e15da.yaml new file mode 100644 index 0000000000..0f58826dc4 --- /dev/null +++ b/releasenotes/notes/deprecate-kafka-publisher-17b4f221758e15da.yaml @@ -0,0 +1,11 @@ +--- +features: + - | + Ceilometer supports generic notifier to publish data and allow user to + customize parameters such as topic, transport driver and priority. The + publisher configuration in pipeline.yaml can be + notifer://[notifier_ip]:[notifier_port]?topic=[topic]&driver=driver&max_retry=100 + Not only rabbit driver, but also other driver like kafka can be used. +deprecations: + - | + Kafka publisher is deprecated to use generic notifier instead.