From fec77dbc85fed701fdbc3a96bae57e5ae3a705cb Mon Sep 17 00:00:00 2001 From: Mehdi Abaakouk Date: Tue, 29 Jul 2014 12:12:06 +0200 Subject: [PATCH] Permit usage of notifications for metering This patch introduces a new publisher that uses notification instead of RPC. And set it by default. Closes bug: #1005933 Implements blueprint replace-rpc-cast-with-notifications Co-Authored-By: Ala Rezmerita Change-Id: Idc40c148ef60d5e1349d30c66ba85691d93c5675 --- ceilometer/collector.py | 25 ++- ceilometer/publisher/{rpc.py => messaging.py} | 75 +++++++-- .../api/v2/test_post_samples_scenarios.py | 20 +-- ...blisher.py => test_messaging_publisher.py} | 150 ++++++++++-------- ceilometer/tests/test_collector.py | 3 +- etc/ceilometer/pipeline.yaml | 8 +- setup.cfg | 7 +- 7 files changed, 182 insertions(+), 106 deletions(-) rename ceilometer/publisher/{rpc.py => messaging.py} (76%) rename ceilometer/tests/publisher/{test_rpc_publisher.py => test_messaging_publisher.py} (72%) diff --git a/ceilometer/collector.py b/ceilometer/collector.py index 6da4371a..cde04a4a 100644 --- a/ceilometer/collector.py +++ b/ceilometer/collector.py @@ -19,6 +19,7 @@ import socket import msgpack from oslo.config import cfg +import oslo.messaging from oslo.utils import units from ceilometer import dispatcher @@ -38,8 +39,10 @@ OPTS = [ ] cfg.CONF.register_opts(OPTS, group="collector") -cfg.CONF.import_opt('metering_topic', 'ceilometer.publisher.rpc', +cfg.CONF.import_opt('metering_topic', 'ceilometer.publisher.messaging', group="publisher_rpc") +cfg.CONF.import_opt('metering_topic', 'ceilometer.publisher.messaging', + group="publisher_notifier") LOG = log.getLogger(__name__) @@ -52,6 +55,7 @@ class CollectorService(os_service.Service): # ensure dispatcher is configured before starting other services self.dispatcher_manager = dispatcher.load_dispatcher_manager() self.rpc_server = None + self.notification_server = None super(CollectorService, self).start() if cfg.CONF.collector.udp_address: @@ -61,7 +65,14 @@ class CollectorService(os_service.Service): if transport: self.rpc_server = messaging.get_rpc_server( transport, cfg.CONF.publisher_rpc.metering_topic, self) + + target = oslo.messaging.Target( + topic=cfg.CONF.publisher_notifier.metering_topic) + self.notification_server = messaging.get_notification_listener( + transport, [target], [self]) + self.rpc_server.start() + self.notification_server.start() if not cfg.CONF.collector.udp_address: # Add a dummy thread to have wait() working @@ -94,8 +105,20 @@ class CollectorService(os_service.Service): self.udp_run = False if self.rpc_server: self.rpc_server.stop() + if self.notification_server: + self.notification_server.stop() super(CollectorService, self).stop() + def sample(self, ctxt, publisher_id, event_type, payload, metadata): + """RPC endpoint for notification messages + + When another service sends a notification over the message + bus, this method receives it. + + """ + self.dispatcher_manager.map_method('record_metering_data', + data=payload) + def record_metering_data(self, context, data): """RPC endpoint for messages we send to ourselves. diff --git a/ceilometer/publisher/rpc.py b/ceilometer/publisher/messaging.py similarity index 76% rename from ceilometer/publisher/rpc.py rename to ceilometer/publisher/messaging.py index bf3c5a5b..99b4f6ec 100644 --- a/ceilometer/publisher/rpc.py +++ b/ceilometer/publisher/messaging.py @@ -17,13 +17,14 @@ """Publish a sample using the preferred RPC mechanism. """ - +import abc import itertools import operator from oslo.config import cfg import oslo.messaging import oslo.messaging._drivers.common +import six import six.moves.urllib.parse as urlparse from ceilometer import messaging @@ -35,7 +36,7 @@ from ceilometer.publisher import utils LOG = log.getLogger(__name__) -METER_PUBLISH_OPTS = [ +METER_PUBLISH_RPC_OPTS = [ cfg.StrOpt('metering_topic', default='metering', help='The topic that ceilometer uses for metering messages.', @@ -43,13 +44,24 @@ METER_PUBLISH_OPTS = [ ), ] +METER_PUBLISH_NOTIFIER_OPTS = [ + cfg.StrOpt('metering_topic', + default='metering', + help='The topic that ceilometer uses for metering ' + 'notifications.', + ), + cfg.StrOpt('metering_driver', + default='messagingv2', + help='The driver that ceilometer uses for metering ' + 'notifications.', + ) +] -def register_opts(config): - """Register the options for publishing metering messages.""" - config.register_opts(METER_PUBLISH_OPTS, group="publisher_rpc") - - -register_opts(cfg.CONF) +cfg.CONF.register_opts(METER_PUBLISH_RPC_OPTS, + group="publisher_rpc") +cfg.CONF.register_opts(METER_PUBLISH_NOTIFIER_OPTS, + group="publisher_notifier") +cfg.CONF.import_opt('host', 'ceilometer.service') def oslo_messaging_is_rabbit(): @@ -76,7 +88,8 @@ def override_backend_retry_config(value): cfg.CONF.set_override('rabbit_max_retries', value) -class RPCPublisher(publisher.PublisherBase): +@six.add_metaclass(abc.ABCMeta) +class MessagingPublisher(publisher.PublisherBase): def __init__(self, parsed_url): options = urlparse.parse_qs(parsed_url.query) @@ -86,8 +99,6 @@ class RPCPublisher(publisher.PublisherBase): self.per_meter_topic = bool(int( options.get('per_meter_topic', [0])[-1])) - self.target = options.get('target', ['record_metering_data'])[0] - self.policy = options.get('policy', ['default'])[-1] self.max_queue_length = int(options.get( 'max_queue_length', [1024])[-1]) @@ -105,9 +116,6 @@ class RPCPublisher(publisher.PublisherBase): % self.policy) self.policy = 'default' - transport = messaging.get_transport() - self.rpc_client = messaging.get_rpc_client(transport, version='1.0') - def publish_samples(self, context, samples): """Publish samples on RPC. @@ -174,8 +182,7 @@ class RPCPublisher(publisher.PublisherBase): while queue: context, topic, meters = queue[0] try: - self.rpc_client.prepare(topic=topic).cast( - context, self.target, data=meters) + self._send(context, topic, meters) except oslo.messaging._drivers.common.RPCException: samples = sum([len(m) for __, __, m in queue]) if policy == 'queue': @@ -191,3 +198,39 @@ class RPCPublisher(publisher.PublisherBase): else: queue.pop(0) return [] + + @abc.abstractmethod + def _send(self, context, topic, meters): + """Send the meters to the messaging topic.""" + + +class RPCPublisher(MessagingPublisher): + def __init__(self, parsed_url): + super(RPCPublisher, self).__init__(parsed_url) + + options = urlparse.parse_qs(parsed_url.query) + self.target = options.get('target', ['record_metering_data'])[0] + + self.rpc_client = messaging.get_rpc_client( + messaging.get_transport(), + version='1.0' + ) + + def _send(self, context, topic, meters): + self.rpc_client.prepare(topic=topic).cast(context, self.target, + data=meters) + + +class NotifierPublisher(MessagingPublisher): + def __init__(self, parsed_url): + super(NotifierPublisher, self).__init__(parsed_url) + self.notifier = oslo.messaging.Notifier( + messaging.get_transport(), + driver=cfg.CONF.publisher_notifier.metering_driver, + publisher_id='metering.publisher.%s' % cfg.CONF.host, + topic=cfg.CONF.publisher_notifier.metering_topic + ) + + def _send(self, context, event_type, meters): + self.notifier.sample(context.to_dict(), event_type=event_type, + payload=meters) diff --git a/ceilometer/tests/api/v2/test_post_samples_scenarios.py b/ceilometer/tests/api/v2/test_post_samples_scenarios.py index 1f3ceab7..c39cdbb6 100644 --- a/ceilometer/tests/api/v2/test_post_samples_scenarios.py +++ b/ceilometer/tests/api/v2/test_post_samples_scenarios.py @@ -30,23 +30,17 @@ from ceilometer.tests import db as tests_db class TestPostSamples(v2.FunctionalTest, tests_db.MixinTestsWithBackendScenarios): - def fake_cast(self, ctxt, target, data): - for m in data: + def fake_notifier_sample(self, ctxt, event_type, payload): + for m in payload: del m['message_signature'] - self.published.append(data) - - def fake_get_rpc_client(self, *args, **kwargs): - cast_ctxt = mock.Mock() - cast_ctxt.cast.side_effect = self.fake_cast - client = mock.Mock() - client.prepare.return_value = cast_ctxt - return client + self.published.append(payload) def setUp(self): self.published = [] - self.useFixture(mockpatch.Patch( - 'ceilometer.messaging.get_rpc_client', - new=self.fake_get_rpc_client)) + notifier = mock.Mock() + notifier.sample.side_effect = self.fake_notifier_sample + self.useFixture(mockpatch.Patch('oslo.messaging.Notifier', + return_value=notifier)) super(TestPostSamples, self).setUp() def test_one(self): diff --git a/ceilometer/tests/publisher/test_rpc_publisher.py b/ceilometer/tests/publisher/test_messaging_publisher.py similarity index 72% rename from ceilometer/tests/publisher/test_rpc_publisher.py rename to ceilometer/tests/publisher/test_messaging_publisher.py index 427be665..7eecca60 100644 --- a/ceilometer/tests/publisher/test_rpc_publisher.py +++ b/ceilometer/tests/publisher/test_messaging_publisher.py @@ -15,7 +15,7 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. -"""Tests for ceilometer/publisher/rpc.py +"""Tests for ceilometer/publisher/messaging.py """ import datetime @@ -25,15 +25,16 @@ from oslo.config import fixture as fixture_config import oslo.messaging import oslo.messaging._drivers.common from oslo.utils import netutils +import testscenarios.testcase from ceilometer import messaging from ceilometer.openstack.common import context -from ceilometer.publisher import rpc +from ceilometer.publisher import messaging as msg_publisher from ceilometer import sample from ceilometer.tests import base as tests_base -class TestPublish(tests_base.BaseTestCase): +class BasePublisherTestCase(tests_base.BaseTestCase): test_data = [ sample.Sample( name='test', @@ -93,13 +94,15 @@ class TestPublish(tests_base.BaseTestCase): ] def setUp(self): - super(TestPublish, self).setUp() + super(BasePublisherTestCase, self).setUp() self.CONF = self.useFixture(fixture_config.Config()).conf self.setup_messaging(self.CONF) self.published = [] + +class RpcOnlyPublisherTest(BasePublisherTestCase): def test_published_no_mock(self): - publisher = rpc.RPCPublisher( + publisher = msg_publisher.RPCPublisher( netutils.urlsplit('rpc://')) endpoint = mock.MagicMock(['record_metering_data']) @@ -126,7 +129,7 @@ class TestPublish(tests_base.BaseTestCase): mock.ANY, data=Matcher()) def test_publish_target(self): - publisher = rpc.RPCPublisher( + publisher = msg_publisher.RPCPublisher( netutils.urlsplit('rpc://?target=custom_procedure_call')) cast_context = mock.MagicMock() with mock.patch.object(publisher.rpc_client, 'prepare') as prepare: @@ -140,7 +143,7 @@ class TestPublish(tests_base.BaseTestCase): mock.ANY, 'custom_procedure_call', data=mock.ANY) def test_published_with_per_meter_topic(self): - publisher = rpc.RPCPublisher( + publisher = msg_publisher.RPCPublisher( netutils.urlsplit('rpc://?per_meter_topic=1')) with mock.patch.object(publisher.rpc_client, 'prepare') as prepare: publisher.publish_samples(mock.MagicMock(), @@ -166,23 +169,29 @@ class TestPublish(tests_base.BaseTestCase): data=MeterGroupMatcher())] self.assertEqual(expected, prepare.mock_calls) + +class TestPublisher(testscenarios.testcase.WithScenarios, + BasePublisherTestCase): + scenarios = [ + ('notifier', dict(protocol="notifier", + publisher_cls=msg_publisher.NotifierPublisher)), + ('rpc', dict(protocol="rpc", + publisher_cls=msg_publisher.RPCPublisher)), + ] + def test_published_concurrency(self): """Test concurrent access to the local queue of the rpc publisher.""" - publisher = rpc.RPCPublisher(netutils.urlsplit('rpc://')) - cast_context = mock.MagicMock() + publisher = self.publisher_cls( + netutils.urlsplit('%s://' % self.protocol)) - with mock.patch.object(publisher.rpc_client, 'prepare') as prepare: - def fake_prepare_go(topic): - return cast_context - - def fake_prepare_wait(topic): - prepare.side_effect = fake_prepare_go + with mock.patch.object(publisher, '_send') as fake_send: + def fake_send_wait(ctxt, topic, meters): + fake_send.side_effect = mock.Mock() # Sleep to simulate concurrency and allow other threads to work eventlet.sleep(0) - return cast_context - prepare.side_effect = fake_prepare_wait + fake_send.side_effect = fake_send_wait job1 = eventlet.spawn(publisher.publish_samples, mock.MagicMock(), self.test_data) @@ -193,16 +202,16 @@ class TestPublish(tests_base.BaseTestCase): job2.wait() self.assertEqual('default', publisher.policy) - self.assertEqual(2, len(cast_context.cast.mock_calls)) + self.assertEqual(2, len(fake_send.mock_calls)) self.assertEqual(0, len(publisher.local_queue)) - @mock.patch('ceilometer.publisher.rpc.LOG') + @mock.patch('ceilometer.publisher.messaging.LOG') def test_published_with_no_policy(self, mylog): - publisher = rpc.RPCPublisher( - netutils.urlsplit('rpc://')) + publisher = self.publisher_cls( + netutils.urlsplit('%s://' % self.protocol)) side_effect = oslo.messaging._drivers.common.RPCException() - with mock.patch.object(publisher.rpc_client, 'prepare') as prepare: - prepare.side_effect = side_effect + with mock.patch.object(publisher, '_send') as fake_send: + fake_send.side_effect = side_effect self.assertRaises( oslo.messaging._drivers.common.RPCException, @@ -211,32 +220,34 @@ class TestPublish(tests_base.BaseTestCase): self.assertTrue(mylog.info.called) self.assertEqual('default', publisher.policy) self.assertEqual(0, len(publisher.local_queue)) - prepare.assert_called_once_with( - topic=self.CONF.publisher_rpc.metering_topic) + fake_send.assert_called_once_with( + mock.ANY, self.CONF.publisher_rpc.metering_topic, + mock.ANY) - @mock.patch('ceilometer.publisher.rpc.LOG') + @mock.patch('ceilometer.publisher.messaging.LOG') def test_published_with_policy_block(self, mylog): - publisher = rpc.RPCPublisher( - netutils.urlsplit('rpc://?policy=default')) + publisher = self.publisher_cls( + netutils.urlsplit('%s://?policy=default' % self.protocol)) side_effect = oslo.messaging._drivers.common.RPCException() - with mock.patch.object(publisher.rpc_client, 'prepare') as prepare: - prepare.side_effect = side_effect + with mock.patch.object(publisher, '_send') as fake_send: + fake_send.side_effect = side_effect self.assertRaises( oslo.messaging._drivers.common.RPCException, publisher.publish_samples, mock.MagicMock(), self.test_data) self.assertTrue(mylog.info.called) self.assertEqual(0, len(publisher.local_queue)) - prepare.assert_called_once_with( - topic=self.CONF.publisher_rpc.metering_topic) + fake_send.assert_called_once_with( + mock.ANY, self.CONF.publisher_rpc.metering_topic, + mock.ANY) - @mock.patch('ceilometer.publisher.rpc.LOG') + @mock.patch('ceilometer.publisher.messaging.LOG') def test_published_with_policy_incorrect(self, mylog): - publisher = rpc.RPCPublisher( - netutils.urlsplit('rpc://?policy=notexist')) + publisher = self.publisher_cls( + netutils.urlsplit('%s://?policy=notexist' % self.protocol)) side_effect = oslo.messaging._drivers.common.RPCException() - with mock.patch.object(publisher.rpc_client, 'prepare') as prepare: - prepare.side_effect = side_effect + with mock.patch.object(publisher, '_send') as fake_send: + fake_send.side_effect = side_effect self.assertRaises( oslo.messaging._drivers.common.RPCException, publisher.publish_samples, @@ -244,66 +255,69 @@ class TestPublish(tests_base.BaseTestCase): self.assertTrue(mylog.warn.called) self.assertEqual('default', publisher.policy) self.assertEqual(0, len(publisher.local_queue)) - prepare.assert_called_once_with( - topic=self.CONF.publisher_rpc.metering_topic) + fake_send.assert_called_once_with( + mock.ANY, self.CONF.publisher_rpc.metering_topic, + mock.ANY) def test_published_with_policy_drop_and_rpc_down(self): - publisher = rpc.RPCPublisher( - netutils.urlsplit('rpc://?policy=drop')) + publisher = self.publisher_cls( + netutils.urlsplit('%s://?policy=drop' % self.protocol)) side_effect = oslo.messaging._drivers.common.RPCException() - with mock.patch.object(publisher.rpc_client, 'prepare') as prepare: - prepare.side_effect = side_effect + with mock.patch.object(publisher, '_send') as fake_send: + fake_send.side_effect = side_effect publisher.publish_samples(mock.MagicMock(), self.test_data) self.assertEqual(0, len(publisher.local_queue)) - prepare.assert_called_once_with( - topic=self.CONF.publisher_rpc.metering_topic) + fake_send.assert_called_once_with( + mock.ANY, self.CONF.publisher_rpc.metering_topic, + mock.ANY) def test_published_with_policy_queue_and_rpc_down(self): - publisher = rpc.RPCPublisher( - netutils.urlsplit('rpc://?policy=queue')) + publisher = self.publisher_cls( + netutils.urlsplit('%s://?policy=queue' % self.protocol)) side_effect = oslo.messaging._drivers.common.RPCException() - with mock.patch.object(publisher.rpc_client, 'prepare') as prepare: - prepare.side_effect = side_effect + with mock.patch.object(publisher, '_send') as fake_send: + fake_send.side_effect = side_effect publisher.publish_samples(mock.MagicMock(), self.test_data) self.assertEqual(1, len(publisher.local_queue)) - prepare.assert_called_once_with( - topic=self.CONF.publisher_rpc.metering_topic) + fake_send.assert_called_once_with( + mock.ANY, self.CONF.publisher_rpc.metering_topic, + mock.ANY) def test_published_with_policy_queue_and_rpc_down_up(self): self.rpc_unreachable = True - publisher = rpc.RPCPublisher( - netutils.urlsplit('rpc://?policy=queue')) + publisher = self.publisher_cls( + netutils.urlsplit('%s://?policy=queue' % self.protocol)) side_effect = oslo.messaging._drivers.common.RPCException() - with mock.patch.object(publisher.rpc_client, 'prepare') as prepare: - prepare.side_effect = side_effect + with mock.patch.object(publisher, '_send') as fake_send: + fake_send.side_effect = side_effect publisher.publish_samples(mock.MagicMock(), self.test_data) self.assertEqual(1, len(publisher.local_queue)) - prepare.side_effect = mock.MagicMock() + fake_send.side_effect = mock.MagicMock() publisher.publish_samples(mock.MagicMock(), self.test_data) self.assertEqual(0, len(publisher.local_queue)) topic = self.CONF.publisher_rpc.metering_topic - expected = [mock.call(topic=topic), - mock.call(topic=topic), - mock.call(topic=topic)] - self.assertEqual(expected, prepare.mock_calls) + expected = [mock.call(mock.ANY, topic, mock.ANY), + mock.call(mock.ANY, topic, mock.ANY), + mock.call(mock.ANY, topic, mock.ANY)] + self.assertEqual(expected, fake_send.mock_calls) def test_published_with_policy_sized_queue_and_rpc_down(self): - publisher = rpc.RPCPublisher( - netutils.urlsplit('rpc://?policy=queue&max_queue_length=3')) + publisher = self.publisher_cls(netutils.urlsplit( + '%s://?policy=queue&max_queue_length=3' % self.protocol)) side_effect = oslo.messaging._drivers.common.RPCException() - with mock.patch.object(publisher.rpc_client, 'prepare') as prepare: - prepare.side_effect = side_effect + with mock.patch.object(publisher, '_send') as fake_send: + fake_send.side_effect = side_effect for i in range(0, 5): for s in self.test_data: s.source = 'test-%d' % i @@ -325,12 +339,12 @@ class TestPublish(tests_base.BaseTestCase): ) def test_published_with_policy_default_sized_queue_and_rpc_down(self): - publisher = rpc.RPCPublisher( - netutils.urlsplit('rpc://?policy=queue')) + publisher = self.publisher_cls( + netutils.urlsplit('%s://?policy=queue' % self.protocol)) side_effect = oslo.messaging._drivers.common.RPCException() - with mock.patch.object(publisher.rpc_client, 'prepare') as prepare: - prepare.side_effect = side_effect + with mock.patch.object(publisher, '_send') as fake_send: + fake_send.side_effect = side_effect for i in range(0, 2000): for s in self.test_data: s.source = 'test-%d' % i diff --git a/ceilometer/tests/test_collector.py b/ceilometer/tests/test_collector.py index ea4d067a..039aa758 100644 --- a/ceilometer/tests/test_collector.py +++ b/ceilometer/tests/test_collector.py @@ -200,7 +200,8 @@ class TestCollector(tests_base.BaseTestCase): """Check that only RPC is started if udp_address is empty.""" self.CONF.set_override('udp_address', '', group='collector') self.srv.start() - self.assertEqual(1, rpc_start.call_count) + # two calls because two servers (notification and rpc) + self.assertEqual(2, rpc_start.call_count) self.assertEqual(0, udp_start.call_count) def test_udp_receive_valid_encoding(self): diff --git a/etc/ceilometer/pipeline.yaml b/etc/ceilometer/pipeline.yaml index 29fd0283..0dd040c2 100644 --- a/etc/ceilometer/pipeline.yaml +++ b/etc/ceilometer/pipeline.yaml @@ -98,7 +98,7 @@ sinks: - name: meter_sink transformers: publishers: - - rpc:// + - notifier:// - name: cpu_sink transformers: - name: "rate_of_change" @@ -109,7 +109,7 @@ sinks: type: "gauge" scale: "100.0 / (10**9 * (resource_metadata.cpu_number or 1))" publishers: - - rpc:// + - notifier:// - name: disk_sink transformers: - name: "rate_of_change" @@ -124,7 +124,7 @@ sinks: unit: "\\1/s" type: "gauge" publishers: - - rpc:// + - notifier:// - name: network_sink transformers: - name: "rate_of_change" @@ -139,4 +139,4 @@ sinks: unit: "\\1/s" type: "gauge" publishers: - - rpc:// + - notifier:// diff --git a/setup.cfg b/setup.cfg index 63bcb368..b809c89a 100644 --- a/setup.cfg +++ b/setup.cfg @@ -212,9 +212,10 @@ ceilometer.transformer = ceilometer.publisher = test = ceilometer.publisher.test:TestPublisher - meter_publisher = ceilometer.publisher.rpc:RPCPublisher - meter = ceilometer.publisher.rpc:RPCPublisher - rpc = ceilometer.publisher.rpc:RPCPublisher + meter_publisher = ceilometer.publisher.messaging:RPCPublisher + meter = ceilometer.publisher.messaging:RPCPublisher + rpc = ceilometer.publisher.messaging:RPCPublisher + notifier = ceilometer.publisher.messaging:NotifierPublisher udp = ceilometer.publisher.udp:UDPPublisher file = ceilometer.publisher.file:FilePublisher