Merge "Deprecate kafka publisher"

This commit is contained in:
Jenkins 2017-07-27 20:48:05 +00:00 committed by Gerrit Code Review
commit 22e8481e70
4 changed files with 99 additions and 5 deletions

View File

@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
from debtcollector import removals
import kafka import kafka
from oslo_log import log from oslo_log import log
from oslo_serialization import jsonutils from oslo_serialization import jsonutils
@ -24,6 +25,9 @@ from ceilometer.publisher import messaging
LOG = log.getLogger(__name__) LOG = log.getLogger(__name__)
@removals.removed_class("KafkaBrokerPublisher",
message="use NotifierPublisher instead",
removal_version='10.0')
class KafkaBrokerPublisher(messaging.MessagingPublisher): class KafkaBrokerPublisher(messaging.MessagingPublisher):
"""Publish metering data to kafka broker. """Publish metering data to kafka broker.

View File

@ -186,11 +186,49 @@ class MessagingPublisher(publisher.ConfigPublisherBase):
class NotifierPublisher(MessagingPublisher): class NotifierPublisher(MessagingPublisher):
"""Publish metering data from notifer publisher.
The ip address and port number of notifer can be configured in
ceilometer pipeline configuration file.
User can customize the transport driver such as rabbit, kafka and
so on. The Notifer uses `sample` method as default method to send
notifications.
This publisher has transmit options such as queue, drop, and
retry. These options are specified using policy field of URL parameter.
When queue option could be selected, local queue length can be determined
using max_queue_length field as well. When the transfer fails with retry
option, try to resend the data as many times as specified in max_retry
field. If max_retry is not specified, by default the number of retry
is 100.
To enable this publisher, add the following section to the
/etc/ceilometer/pipeline.yaml file or simply add it to an existing
pipeline::
meter:
- name: meter_notifier
meters:
- "*"
sinks:
- notifier_sink
sinks:
- name: notifier_sink
transformers:
publishers:
- notifer://[notifier_ip]:[notifier_port]?topic=[topic]&
driver=driver&max_retry=100
"""
def __init__(self, conf, parsed_url, default_topic): def __init__(self, conf, parsed_url, default_topic):
super(NotifierPublisher, self).__init__(conf, parsed_url) super(NotifierPublisher, self).__init__(conf, parsed_url)
options = urlparse.parse_qs(parsed_url.query) options = urlparse.parse_qs(parsed_url.query)
topic = options.pop('topic', [default_topic]) topics = options.pop('topic', [default_topic])
driver = options.pop('driver', ['rabbit'])[0] driver = options.pop('driver', ['rabbit'])[0]
self.max_retry = int(options.get('max_retry', [100])[-1])
url = None url = None
if parsed_url.netloc != '': if parsed_url.netloc != '':
url = urlparse.urlunsplit([driver, parsed_url.netloc, url = urlparse.urlunsplit([driver, parsed_url.netloc,
@ -201,7 +239,7 @@ class NotifierPublisher(MessagingPublisher):
messaging.get_transport(self.conf, url), messaging.get_transport(self.conf, url),
driver=self.conf.publisher_notifier.telemetry_driver, driver=self.conf.publisher_notifier.telemetry_driver,
publisher_id='telemetry.publisher.%s' % self.conf.host, publisher_id='telemetry.publisher.%s' % self.conf.host,
topics=topic, topics=topics,
retry=self.retry retry=self.retry
) )

View File

@ -18,6 +18,8 @@ import datetime
import uuid import uuid
import mock import mock
import oslo_messaging
from oslo_messaging._drivers import impl_kafka as kafka_driver
from oslo_utils import netutils from oslo_utils import netutils
import testscenarios.testcase import testscenarios.testcase
@ -147,6 +149,42 @@ class NotifierOnlyPublisherTest(BasePublisherTestCase):
cgt.assert_called_with(self.CONF, 'amqp://foo:foo@127.0.0.1:1234/foo' cgt.assert_called_with(self.CONF, 'amqp://foo:foo@127.0.0.1:1234/foo'
'?amqp_auto_delete=true') '?amqp_auto_delete=true')
@mock.patch('ceilometer.messaging.get_transport')
def test_publish_with_none_rabbit_driver(self, cgt):
sample_publisher = msg_publisher.SampleNotifierPublisher(
self.CONF,
netutils.urlsplit('notifier://127.0.0.1:9092?driver=kafka'))
cgt.assert_called_with(self.CONF, 'kafka://127.0.0.1:9092')
transport = oslo_messaging.get_transport(self.CONF,
'kafka://127.0.0.1:9092')
self.assertIsInstance(transport._driver, kafka_driver.KafkaDriver)
side_effect = msg_publisher.DeliveryFailure()
with mock.patch.object(sample_publisher, '_send') as fake_send:
fake_send.side_effect = side_effect
self.assertRaises(
msg_publisher.DeliveryFailure,
sample_publisher.publish_samples,
self.test_sample_data)
self.assertEqual(0, len(sample_publisher.local_queue))
self.assertEqual(100, len(fake_send.mock_calls))
fake_send.assert_called_with('metering', mock.ANY)
event_publisher = msg_publisher.EventNotifierPublisher(
self.CONF,
netutils.urlsplit('notifier://127.0.0.1:9092?driver=kafka'))
cgt.assert_called_with(self.CONF, 'kafka://127.0.0.1:9092')
with mock.patch.object(event_publisher, '_send') as fake_send:
fake_send.side_effect = side_effect
self.assertRaises(
msg_publisher.DeliveryFailure,
event_publisher.publish_events,
self.test_event_data)
self.assertEqual(0, len(event_publisher.local_queue))
self.assertEqual(100, len(fake_send.mock_calls))
fake_send.assert_called_with('event', mock.ANY)
class TestPublisher(testscenarios.testcase.WithScenarios, class TestPublisher(testscenarios.testcase.WithScenarios,
BasePublisherTestCase): BasePublisherTestCase):
@ -186,7 +224,8 @@ class TestPublisherPolicy(TestPublisher):
self.assertTrue(mylog.info.called) self.assertTrue(mylog.info.called)
self.assertEqual('default', publisher.policy) self.assertEqual('default', publisher.policy)
self.assertEqual(0, len(publisher.local_queue)) self.assertEqual(0, len(publisher.local_queue))
fake_send.assert_called_once_with( self.assertEqual(100, len(fake_send.mock_calls))
fake_send.assert_called_with(
self.topic, mock.ANY) self.topic, mock.ANY)
@mock.patch('ceilometer.publisher.messaging.LOG') @mock.patch('ceilometer.publisher.messaging.LOG')
@ -203,7 +242,8 @@ class TestPublisherPolicy(TestPublisher):
self.test_data) self.test_data)
self.assertTrue(mylog.info.called) self.assertTrue(mylog.info.called)
self.assertEqual(0, len(publisher.local_queue)) self.assertEqual(0, len(publisher.local_queue))
fake_send.assert_called_once_with( self.assertEqual(100, len(fake_send.mock_calls))
fake_send.assert_called_with(
self.topic, mock.ANY) self.topic, mock.ANY)
@mock.patch('ceilometer.publisher.messaging.LOG') @mock.patch('ceilometer.publisher.messaging.LOG')
@ -221,7 +261,8 @@ class TestPublisherPolicy(TestPublisher):
self.assertTrue(mylog.warning.called) self.assertTrue(mylog.warning.called)
self.assertEqual('default', publisher.policy) self.assertEqual('default', publisher.policy)
self.assertEqual(0, len(publisher.local_queue)) self.assertEqual(0, len(publisher.local_queue))
fake_send.assert_called_once_with( self.assertEqual(100, len(fake_send.mock_calls))
fake_send.assert_called_with(
self.topic, mock.ANY) self.topic, mock.ANY)

View File

@ -0,0 +1,11 @@
---
features:
- |
Ceilometer supports generic notifier to publish data and allow user to
customize parameters such as topic, transport driver and priority. The
publisher configuration in pipeline.yaml can be
notifer://[notifier_ip]:[notifier_port]?topic=[topic]&driver=driver&max_retry=100
Not only rabbit driver, but also other driver like kafka can be used.
deprecations:
- |
Kafka publisher is deprecated to use generic notifier instead.