From 7fc0d1e6c71198fa865c7cbf9cab6debc7274f4f Mon Sep 17 00:00:00 2001 From: gordon chung Date: Fri, 16 Jan 2015 14:03:13 -0500 Subject: [PATCH] add event listener to collector enable a listener in collector that listens to events queue. this patch also renames 'metering_*' options which are now used by both events and samples. Change-Id: I6250c91e913864c3973e2d93022e38ad8bed8328 Implements: blueprint notification-pipelines --- ceilometer/agent/plugin_base.py | 2 +- ceilometer/collector.py | 126 ++++++++++++++---- ceilometer/dispatcher/database.py | 3 +- ceilometer/dispatcher/http.py | 3 +- ceilometer/event/endpoint.py | 2 +- ceilometer/notification.py | 18 +-- ceilometer/publisher/direct.py | 3 +- ceilometer/publisher/messaging.py | 10 +- ceilometer/publisher/udp.py | 3 +- ceilometer/publisher/utils.py | 6 +- ceilometer/tests/api/v2/test_acl_scenarios.py | 3 +- .../api/v2/test_complex_query_scenarios.py | 3 +- .../api/v2/test_list_events_scenarios.py | 6 +- .../api/v2/test_list_meters_scenarios.py | 3 +- .../api/v2/test_list_resources_scenarios.py | 56 +++----- .../tests/api/v2/test_statistics_scenarios.py | 27 ++-- ceilometer/tests/dispatcher/test_db.py | 9 +- ceilometer/tests/dispatcher/test_file.py | 6 +- ceilometer/tests/dispatcher/test_http.py | 12 +- ceilometer/tests/publisher/test_udp.py | 2 +- .../tests/storage/test_storage_scenarios.py | 16 +-- ceilometer/tests/test_collector.py | 52 ++++++-- ceilometer/utils.py | 9 ++ doc/source/install/manual.rst | 16 +-- tools/make_test_data.py | 3 +- 25 files changed, 225 insertions(+), 174 deletions(-) diff --git a/ceilometer/agent/plugin_base.py b/ceilometer/agent/plugin_base.py index 1ec89a68..0409e906 100644 --- a/ceilometer/agent/plugin_base.py +++ b/ceilometer/agent/plugin_base.py @@ -175,7 +175,7 @@ class NotificationBase(PluginBase): if self.requeue: meters = [ utils.meter_message_from_counter( - sample, cfg.CONF.publisher.metering_secret) + sample, cfg.CONF.publisher.telemetry_secret) for sample in self.process_notification(notification) ] for notifier in self.transporter: diff --git a/ceilometer/collector.py b/ceilometer/collector.py index 46517ccc..6b022cd2 100644 --- a/ceilometer/collector.py +++ b/ceilometer/collector.py @@ -18,13 +18,16 @@ import socket import msgpack import oslo.messaging from oslo_config import cfg +from oslo_utils import timeutils from oslo_utils import units from ceilometer import dispatcher +from ceilometer.event.storage import models from ceilometer import messaging from ceilometer.i18n import _, _LE from ceilometer.openstack.common import log from ceilometer.openstack.common import service as os_service +from ceilometer import utils OPTS = [ cfg.StrOpt('udp_address', @@ -39,13 +42,21 @@ OPTS = [ help='Requeue the sample on the collector sample queue ' 'when the collector fails to dispatch it. This is only valid ' 'if the sample come from the notifier publisher.'), + cfg.BoolOpt('requeue_event_on_dispatcher_error', + default=False, + help='Requeue the event on the collector event queue ' + 'when the collector fails to dispatch it.'), ] cfg.CONF.register_opts(OPTS, group="collector") cfg.CONF.import_opt('metering_topic', 'ceilometer.publisher.messaging', - group="publisher_rpc") + group='publisher_rpc') cfg.CONF.import_opt('metering_topic', 'ceilometer.publisher.messaging', - group="publisher_notifier") + group='publisher_notifier') +cfg.CONF.import_opt('event_topic', 'ceilometer.publisher.messaging', + group='publisher_notifier') +cfg.CONF.import_opt('store_events', 'ceilometer.notification', + group='notification') LOG = log.getLogger(__name__) @@ -58,26 +69,38 @@ class CollectorService(os_service.Service): # ensure dispatcher is configured before starting other services self.dispatcher_manager = dispatcher.load_dispatcher_manager() self.rpc_server = None - self.notification_server = None + self.sample_listener = None + self.event_listener = None super(CollectorService, self).start() if cfg.CONF.collector.udp_address: self.tg.add_thread(self.start_udp) - allow_requeue = cfg.CONF.collector.requeue_sample_on_dispatcher_error transport = messaging.get_transport(optional=True) if transport: self.rpc_server = messaging.get_rpc_server( transport, cfg.CONF.publisher_rpc.metering_topic, self) - target = oslo.messaging.Target( + sample_target = oslo.messaging.Target( topic=cfg.CONF.publisher_notifier.metering_topic) - self.notification_server = messaging.get_notification_listener( - transport, [target], [self], - allow_requeue=allow_requeue) + self.sample_listener = messaging.get_notification_listener( + transport, [sample_target], + [SampleEndpoint(self.dispatcher_manager)], + allow_requeue=(cfg.CONF.collector. + requeue_sample_on_dispatcher_error)) + + if cfg.CONF.notification.store_events: + event_target = oslo.messaging.Target( + topic=cfg.CONF.publisher_notifier.event_topic) + self.event_listener = messaging.get_notification_listener( + transport, [event_target], + [EventEndpoint(self.dispatcher_manager)], + allow_requeue=(cfg.CONF.collector. + requeue_event_on_dispatcher_error)) + self.event_listener.start() self.rpc_server.start() - self.notification_server.start() + self.sample_listener.start() if not cfg.CONF.collector.udp_address: # Add a dummy thread to have wait() working @@ -110,27 +133,12 @@ class CollectorService(os_service.Service): self.udp_run = False if self.rpc_server: self.rpc_server.stop() - if self.notification_server: - self.notification_server.stop() + if self.sample_listener: + utils.kill_listeners([self.sample_listener]) + if self.event_listener: + utils.kill_listeners([self.event_listener]) super(CollectorService, self).stop() - def sample(self, ctxt, publisher_id, event_type, payload, metadata): - """RPC endpoint for notification messages - - When another service sends a notification over the message - bus, this method receives it. - - """ - try: - self.dispatcher_manager.map_method('record_metering_data', - data=payload) - except Exception: - if cfg.CONF.collector.requeue_sample_on_dispatcher_error: - LOG.exception(_LE("Dispatcher failed to handle the sample, " - "requeue it.")) - return oslo.messaging.NotificationResult.REQUEUE - raise - def record_metering_data(self, context, data): """RPC endpoint for messages we send to ourselves. @@ -138,3 +146,65 @@ class CollectorService(os_service.Service): RPC publisher, this method receives them for processing. """ self.dispatcher_manager.map_method('record_metering_data', data=data) + + +class CollectorEndpoint(object): + def __init__(self, dispatcher_manager, requeue_on_error): + self.dispatcher_manager = dispatcher_manager + self.requeue_on_error = requeue_on_error + + def sample(self, ctxt, publisher_id, event_type, payload, metadata): + """RPC endpoint for notification messages + + When another service sends a notification over the message + bus, this method receives it. + """ + try: + self.dispatcher_manager.map_method(self.method, payload) + except Exception: + if self.requeue_on_error: + LOG.exception(_LE("Dispatcher failed to handle the %s, " + "requeue it."), self.ep_type) + return oslo.messaging.NotificationResult.REQUEUE + raise + + +class SampleEndpoint(CollectorEndpoint): + method = 'record_metering_data' + ep_type = 'sample' + + def __init__(self, dispatcher_manager): + super(SampleEndpoint, self).__init__( + dispatcher_manager, + cfg.CONF.collector.requeue_sample_on_dispatcher_error) + + +class EventEndpoint(CollectorEndpoint): + method = 'record_events' + ep_type = 'event' + + def __init__(self, dispatcher_manager): + super(EventEndpoint, self).__init__( + dispatcher_manager, + cfg.CONF.collector.requeue_event_on_dispatcher_error) + + def sample(self, ctxt, publisher_id, event_type, payload, metadata): + events = [] + for ev in payload: + try: + events.append( + models.Event( + message_id=ev['message_id'], + event_type=ev['event_type'], + generated=timeutils.normalize_time( + timeutils.parse_isotime(ev['generated'])), + traits=[models.Trait( + name, dtype, + models.Trait.convert_value(dtype, value)) + for name, dtype, value in ev['traits']]) + ) + except Exception: + LOG.exception(_LE("Error processing event and it will be " + "dropped: %s"), ev) + return super(EventEndpoint, self).sample( + ctxt, publisher_id, event_type, events, metadata) diff --git a/ceilometer/dispatcher/database.py b/ceilometer/dispatcher/database.py index ea555765..2a80bb8d 100644 --- a/ceilometer/dispatcher/database.py +++ b/ceilometer/dispatcher/database.py @@ -78,8 +78,7 @@ class DatabaseDispatcher(dispatcher.Base): 'timestamp': meter.get('timestamp', 'NO TIMESTAMP'), 'counter_volume': meter['counter_volume']})) if publisher_utils.verify_signature( - meter, - self.conf.publisher.metering_secret): + meter, self.conf.publisher.telemetry_secret): try: # Convert the timestamp to a datetime instance. # Storage engines are responsible for converting diff --git a/ceilometer/dispatcher/http.py b/ceilometer/dispatcher/http.py index 430c6b4b..3edcac90 100755 --- a/ceilometer/dispatcher/http.py +++ b/ceilometer/dispatcher/http.py @@ -85,8 +85,7 @@ class HttpDispatcher(dispatcher.Base): 'timestamp': meter.get('timestamp', 'NO TIMESTAMP'), 'counter_volume': meter['counter_volume']})) if publisher_utils.verify_signature( - meter, - self.conf.publisher.metering_secret): + meter, self.conf.publisher.telemetry_secret): try: if self.cadf_only: # Only cadf messages are being wanted. diff --git a/ceilometer/event/endpoint.py b/ceilometer/event/endpoint.py index 65cc7f6a..422e36db 100644 --- a/ceilometer/event/endpoint.py +++ b/ceilometer/event/endpoint.py @@ -68,7 +68,7 @@ class EventsNotificationEndpoint(object): self.ctxt.to_dict(), event_type='pipeline.event', payload=[utils.message_from_event( - event, cfg.CONF.publisher.metering_secret)]) + event, cfg.CONF.publisher.telemetry_secret)]) else: with self.transporter.publisher(self.ctxt) as p: p(event) diff --git a/ceilometer/notification.py b/ceilometer/notification.py index 09fa8382..2b96455e 100644 --- a/ceilometer/notification.py +++ b/ceilometer/notification.py @@ -25,6 +25,7 @@ from ceilometer import messaging from ceilometer.openstack.common import log from ceilometer.openstack.common import service as os_service from ceilometer import pipeline +from ceilometer import utils LOG = log.getLogger(__name__) @@ -52,7 +53,7 @@ OPTS = [ ] cfg.CONF.register_opts(OPTS, group="notification") -cfg.CONF.import_opt('metering_driver', 'ceilometer.publisher.messaging', +cfg.CONF.import_opt('telemetry_driver', 'ceilometer.publisher.messaging', group='publisher_notifier') @@ -80,7 +81,7 @@ class NotificationService(os_service.Service): def _get_notifier(self, transport, pipe): return oslo.messaging.Notifier( transport, - driver=cfg.CONF.publisher_notifier.metering_driver, + driver=cfg.CONF.publisher_notifier.telemetry_driver, publisher_id='ceilometer.notification', topic='%s-%s' % (self.NOTIFICATION_IPC, pipe.name)) @@ -174,17 +175,8 @@ class NotificationService(os_service.Service): listener.start() self.listeners.append(listener) - @staticmethod - def _kill_listeners(listeners): - # NOTE(gordc): correct usage of oslo.messaging listener is to stop(), - # which stops new messages, and wait(), which processes remaining - # messages and closes connection - for listener in listeners: - listener.stop() - listener.wait() - def _refresh_agent(self, event): - self._kill_listeners(self.pipeline_listeners) + utils.kill_listeners(self.pipeline_listeners) self._configure_pipeline_listeners() def _configure_pipeline_listeners(self): @@ -210,5 +202,5 @@ class NotificationService(os_service.Service): def stop(self): self.partition_coordinator.leave_group(self.group_id) - self._kill_listeners(self.listeners + self.pipeline_listeners) + utils.kill_listeners(self.listeners + self.pipeline_listeners) super(NotificationService, self).stop() diff --git a/ceilometer/publisher/direct.py b/ceilometer/publisher/direct.py index 7b3d071b..14180ae7 100644 --- a/ceilometer/publisher/direct.py +++ b/ceilometer/publisher/direct.py @@ -42,8 +42,7 @@ class DirectPublisher(publisher.PublisherBase): # Transform the Sample objects into a list of dicts meters = [ utils.meter_message_from_counter( - sample, - cfg.CONF.publisher.metering_secret) + sample, cfg.CONF.publisher.telemetry_secret) for sample in samples ] diff --git a/ceilometer/publisher/messaging.py b/ceilometer/publisher/messaging.py index 2e4d69fc..0ccbb69b 100644 --- a/ceilometer/publisher/messaging.py +++ b/ceilometer/publisher/messaging.py @@ -52,10 +52,11 @@ NOTIFIER_OPTS = [ help='The topic that ceilometer uses for event ' 'notifications.', ), - cfg.StrOpt('metering_driver', + cfg.StrOpt('telemetry_driver', default='messagingv2', help='The driver that ceilometer uses for metering ' 'notifications.', + deprecated_name='metering_driver', ) ] @@ -102,8 +103,7 @@ class MessagingPublisher(publisher.PublisherBase): meters = [ utils.meter_message_from_counter( - sample, - cfg.CONF.publisher.metering_secret) + sample, cfg.CONF.publisher.telemetry_secret) for sample in samples ] topic = cfg.CONF.publisher_rpc.metering_topic @@ -171,7 +171,7 @@ class MessagingPublisher(publisher.PublisherBase): :param events: events from pipeline after transformation """ ev_list = [utils.message_from_event( - event, cfg.CONF.publisher.metering_secret) for event in events] + event, cfg.CONF.publisher.telemetry_secret) for event in events] topic = cfg.CONF.publisher_notifier.event_topic self.local_queue.append((context, topic, ev_list)) @@ -204,7 +204,7 @@ class NotifierPublisher(MessagingPublisher): super(NotifierPublisher, self).__init__(parsed_url) self.notifier = oslo.messaging.Notifier( messaging.get_transport(), - driver=cfg.CONF.publisher_notifier.metering_driver, + driver=cfg.CONF.publisher_notifier.telemetry_driver, publisher_id='telemetry.publisher.%s' % cfg.CONF.host, topic=topic, retry=self.retry diff --git a/ceilometer/publisher/udp.py b/ceilometer/publisher/udp.py index 4da2de8e..718161f0 100644 --- a/ceilometer/publisher/udp.py +++ b/ceilometer/publisher/udp.py @@ -50,8 +50,7 @@ class UDPPublisher(publisher.PublisherBase): for sample in samples: msg = utils.meter_message_from_counter( - sample, - cfg.CONF.publisher.metering_secret) + sample, cfg.CONF.publisher.telemetry_secret) host = self.host port = self.port LOG.debug(_("Publishing sample %(msg)s over UDP to " diff --git a/ceilometer/publisher/utils.py b/ceilometer/publisher/utils.py index 21ddbd86..824a3b21 100644 --- a/ceilometer/publisher/utils.py +++ b/ceilometer/publisher/utils.py @@ -24,14 +24,16 @@ import six from ceilometer import utils OPTS = [ - cfg.StrOpt('metering_secret', + cfg.StrOpt('telemetry_secret', secret=True, default='change this or be hacked', help='Secret value for signing metering messages.', deprecated_opts=[cfg.DeprecatedOpt("metering_secret", "DEFAULT"), cfg.DeprecatedOpt("metering_secret", - "publisher_rpc")] + "publisher_rpc"), + cfg.DeprecatedOpt("metering_secret", + "publisher")] ), ] cfg.CONF.register_opts(OPTS, group="publisher") diff --git a/ceilometer/tests/api/v2/test_acl_scenarios.py b/ceilometer/tests/api/v2/test_acl_scenarios.py index 551f5045..b00ec7b1 100644 --- a/ceilometer/tests/api/v2/test_acl_scenarios.py +++ b/ceilometer/tests/api/v2/test_acl_scenarios.py @@ -101,8 +101,7 @@ class TestAPIACL(v2.FunctionalTest, 'tag': 'self.sample4'}, source='test_source')]: msg = utils.meter_message_from_counter( - cnt, - self.CONF.publisher.metering_secret) + cnt, self.CONF.publisher.telemetry_secret) self.conn.record_metering_data(msg) def get_json(self, path, expect_errors=False, headers=None, diff --git a/ceilometer/tests/api/v2/test_complex_query_scenarios.py b/ceilometer/tests/api/v2/test_complex_query_scenarios.py index 444ac4b3..32bd13b2 100644 --- a/ceilometer/tests/api/v2/test_complex_query_scenarios.py +++ b/ceilometer/tests/api/v2/test_complex_query_scenarios.py @@ -89,8 +89,7 @@ class TestQueryMetersController(tests_api.FunctionalTest, source='test_source')]: msg = utils.meter_message_from_counter( - cnt, - self.CONF.publisher.metering_secret) + cnt, self.CONF.publisher.telemetry_secret) self.conn.record_metering_data(msg) def test_query_fields_are_optional(self): diff --git a/ceilometer/tests/api/v2/test_list_events_scenarios.py b/ceilometer/tests/api/v2/test_list_events_scenarios.py index a3253d6f..5aff5db0 100644 --- a/ceilometer/tests/api/v2/test_list_events_scenarios.py +++ b/ceilometer/tests/api/v2/test_list_events_scenarios.py @@ -54,8 +54,7 @@ class TestListEvents(v2.FunctionalTest, source='test_source', ) msg = utils.meter_message_from_counter( - self.sample1, - self.CONF.publisher.metering_secret, + self.sample1, self.CONF.publisher.telemetry_secret, ) self.conn.record_metering_data(msg) @@ -74,8 +73,7 @@ class TestListEvents(v2.FunctionalTest, source='source2', ) msg2 = utils.meter_message_from_counter( - self.sample2, - self.CONF.publisher.metering_secret, + self.sample2, self.CONF.publisher.telemetry_secret, ) self.conn.record_metering_data(msg2) diff --git a/ceilometer/tests/api/v2/test_list_meters_scenarios.py b/ceilometer/tests/api/v2/test_list_meters_scenarios.py index fbc5016c..37b2c788 100644 --- a/ceilometer/tests/api/v2/test_list_meters_scenarios.py +++ b/ceilometer/tests/api/v2/test_list_meters_scenarios.py @@ -177,8 +177,7 @@ class TestListMeters(v2.FunctionalTest, resource_metadata={}, source='test_source1')]: msg = utils.meter_message_from_counter( - cnt, - self.CONF.publisher.metering_secret) + cnt, self.CONF.publisher.telemetry_secret) self.messages.append(msg) self.conn.record_metering_data(msg) diff --git a/ceilometer/tests/api/v2/test_list_resources_scenarios.py b/ceilometer/tests/api/v2/test_list_resources_scenarios.py index cf95205b..eef10ba1 100644 --- a/ceilometer/tests/api/v2/test_list_resources_scenarios.py +++ b/ceilometer/tests/api/v2/test_list_resources_scenarios.py @@ -61,8 +61,7 @@ class TestListResources(v2.FunctionalTest, source='test', ) msg = utils.meter_message_from_counter( - sample1, - self.CONF.publisher.metering_secret, + sample1, self.CONF.publisher.telemetry_secret, ) self.conn.record_metering_data(msg) @@ -90,8 +89,7 @@ class TestListResources(v2.FunctionalTest, source='test', ) msg = utils.meter_message_from_counter( - sample1, - self.CONF.publisher.metering_secret, + sample1, self.CONF.publisher.telemetry_secret, ) self.conn.record_metering_data(msg) @@ -110,8 +108,7 @@ class TestListResources(v2.FunctionalTest, source='test', ) msg2 = utils.meter_message_from_counter( - sample2, - self.CONF.publisher.metering_secret, + sample2, self.CONF.publisher.telemetry_secret, ) self.conn.record_metering_data(msg2) @@ -144,7 +141,7 @@ class TestListResources(v2.FunctionalTest, ) msg = utils.meter_message_from_counter( datapoint, - self.CONF.publisher.metering_secret, + self.CONF.publisher.telemetry_secret, ) self.conn.record_metering_data(msg) @@ -169,8 +166,7 @@ class TestListResources(v2.FunctionalTest, source='test', ) msg = utils.meter_message_from_counter( - sample1, - self.CONF.publisher.metering_secret, + sample1, self.CONF.publisher.telemetry_secret, ) self.conn.record_metering_data(msg) @@ -189,8 +185,7 @@ class TestListResources(v2.FunctionalTest, source='test', ) msg2 = utils.meter_message_from_counter( - sample2, - self.CONF.publisher.metering_secret, + sample2, self.CONF.publisher.telemetry_secret, ) self.conn.record_metering_data(msg2) @@ -213,8 +208,7 @@ class TestListResources(v2.FunctionalTest, source='test_list_resources', ) msg = utils.meter_message_from_counter( - sample1, - self.CONF.publisher.metering_secret, + sample1, self.CONF.publisher.telemetry_secret, ) self.conn.record_metering_data(msg) @@ -233,8 +227,7 @@ class TestListResources(v2.FunctionalTest, source='not-test', ) msg2 = utils.meter_message_from_counter( - sample2, - self.CONF.publisher.metering_secret, + sample2, self.CONF.publisher.telemetry_secret, ) self.conn.record_metering_data(msg2) @@ -262,8 +255,7 @@ class TestListResources(v2.FunctionalTest, source='test_list_resources', ) msg = utils.meter_message_from_counter( - sample1, - self.CONF.publisher.metering_secret, + sample1, self.CONF.publisher.telemetry_secret, ) self.conn.record_metering_data(msg) @@ -282,8 +274,7 @@ class TestListResources(v2.FunctionalTest, source='test_list_resources', ) msg2 = utils.meter_message_from_counter( - sample2, - self.CONF.publisher.metering_secret, + sample2, self.CONF.publisher.telemetry_secret, ) self.conn.record_metering_data(msg2) @@ -315,8 +306,7 @@ class TestListResources(v2.FunctionalTest, source='test_list_resources', ) msg = utils.meter_message_from_counter( - sample1, - self.CONF.publisher.metering_secret, + sample1, self.CONF.publisher.telemetry_secret, ) self.conn.record_metering_data(msg) @@ -335,8 +325,7 @@ class TestListResources(v2.FunctionalTest, source='not-test', ) msg2 = utils.meter_message_from_counter( - sample2, - self.CONF.publisher.metering_secret, + sample2, self.CONF.publisher.telemetry_secret, ) self.conn.record_metering_data(msg2) @@ -362,8 +351,7 @@ class TestListResources(v2.FunctionalTest, source='test_list_resources', ) msg = utils.meter_message_from_counter( - sample1, - self.CONF.publisher.metering_secret, + sample1, self.CONF.publisher.telemetry_secret, ) self.conn.record_metering_data(msg) @@ -382,8 +370,7 @@ class TestListResources(v2.FunctionalTest, source='not-test', ) msg2 = utils.meter_message_from_counter( - sample2, - self.CONF.publisher.metering_secret, + sample2, self.CONF.publisher.telemetry_secret, ) self.conn.record_metering_data(msg2) @@ -409,8 +396,7 @@ class TestListResources(v2.FunctionalTest, source='not-test', ) msg2 = utils.meter_message_from_counter( - sample1, - self.CONF.publisher.metering_secret, + sample1, self.CONF.publisher.telemetry_secret, ) self.conn.record_metering_data(msg2) @@ -436,8 +422,7 @@ class TestListResources(v2.FunctionalTest, source='not-test', ) msg2 = utils.meter_message_from_counter( - sample1, - self.CONF.publisher.metering_secret, + sample1, self.CONF.publisher.telemetry_secret, ) self.conn.record_metering_data(msg2) @@ -465,8 +450,7 @@ class TestListResources(v2.FunctionalTest, source='test', ) msg = utils.meter_message_from_counter( - sample1, - self.CONF.publisher.metering_secret, + sample1, self.CONF.publisher.telemetry_secret, ) self.conn.record_metering_data(msg) @@ -494,8 +478,7 @@ class TestListResources(v2.FunctionalTest, source='test_list_resources', ) msg = utils.meter_message_from_counter( - sample1, - self.CONF.publisher.metering_secret, + sample1, self.CONF.publisher.telemetry_secret, ) self.conn.record_metering_data(msg) @@ -526,8 +509,7 @@ class TestListResources(v2.FunctionalTest, source='test_list_resources', ) msg = utils.meter_message_from_counter( - sample1, - self.CONF.publisher.metering_secret, + sample1, self.CONF.publisher.telemetry_secret, ) self.conn.record_metering_data(msg) diff --git a/ceilometer/tests/api/v2/test_statistics_scenarios.py b/ceilometer/tests/api/v2/test_statistics_scenarios.py index d36fd462..aa1e53e7 100644 --- a/ceilometer/tests/api/v2/test_statistics_scenarios.py +++ b/ceilometer/tests/api/v2/test_statistics_scenarios.py @@ -45,8 +45,7 @@ class TestMaxProjectVolume(v2.FunctionalTest, source='source1', ) msg = utils.meter_message_from_counter( - s, - self.CONF.publisher.metering_secret, + s, self.CONF.publisher.telemetry_secret, ) self.conn.record_metering_data(msg) @@ -143,8 +142,7 @@ class TestMaxResourceVolume(v2.FunctionalTest, source='source1', ) msg = utils.meter_message_from_counter( - s, - self.CONF.publisher.metering_secret, + s, self.CONF.publisher.telemetry_secret, ) self.conn.record_metering_data(msg) @@ -273,8 +271,7 @@ class TestSumProjectVolume(v2.FunctionalTest, source='source1', ) msg = utils.meter_message_from_counter( - s, - self.CONF.publisher.metering_secret, + s, self.CONF.publisher.telemetry_secret, ) self.conn.record_metering_data(msg) @@ -373,8 +370,7 @@ class TestSumResourceVolume(v2.FunctionalTest, source='source1', ) msg = utils.meter_message_from_counter( - s, - self.CONF.publisher.metering_secret, + s, self.CONF.publisher.telemetry_secret, ) self.conn.record_metering_data(msg) @@ -530,8 +526,7 @@ class TestGroupByInstance(v2.FunctionalTest, source=test_sample['source'], ) msg = utils.meter_message_from_counter( - c, - self.CONF.publisher.metering_secret, + c, self.CONF.publisher.telemetry_secret, ) self.conn.record_metering_data(msg) @@ -1281,8 +1276,7 @@ class TestGroupBySource(v2.FunctionalTest, source=test_sample['source'], ) msg = utils.meter_message_from_counter( - c, - self.CONF.publisher.metering_secret, + c, self.CONF.publisher.telemetry_secret, ) self.conn.record_metering_data(msg) @@ -1383,8 +1377,7 @@ class TestSelectableAggregates(v2.FunctionalTest, source=test_sample['source'], ) msg = utils.meter_message_from_counter( - c, - self.CONF.publisher.metering_secret, + c, self.CONF.publisher.telemetry_secret, ) self.conn.record_metering_data(msg) @@ -1482,8 +1475,7 @@ class TestSelectableAggregates(v2.FunctionalTest, source='source', ) msg = utils.meter_message_from_counter( - s, - self.CONF.publisher.metering_secret, + s, self.CONF.publisher.telemetry_secret, ) self.conn.record_metering_data(msg) @@ -1635,8 +1627,7 @@ class TestUnparameterizedAggregates(v2.FunctionalTest, source=test_sample['source'], ) msg = utils.meter_message_from_counter( - c, - self.CONF.publisher.metering_secret, + c, self.CONF.publisher.telemetry_secret, ) self.conn.record_metering_data(msg) diff --git a/ceilometer/tests/dispatcher/test_db.py b/ceilometer/tests/dispatcher/test_db.py index 906cf513..95a603f2 100644 --- a/ceilometer/tests/dispatcher/test_db.py +++ b/ceilometer/tests/dispatcher/test_db.py @@ -48,8 +48,7 @@ class TestDispatcherDB(base.BaseTestCase): 'counter_volume': 1, } msg['message_signature'] = utils.compute_signature( - msg, - self.CONF.publisher.metering_secret, + msg, self.CONF.publisher.telemetry_secret, ) with mock.patch.object(self.dispatcher.meter_conn, @@ -85,8 +84,7 @@ class TestDispatcherDB(base.BaseTestCase): 'timestamp': '2012-07-02T13:53:40Z', } msg['message_signature'] = utils.compute_signature( - msg, - self.CONF.publisher.metering_secret, + msg, self.CONF.publisher.telemetry_secret, ) expected = msg.copy() @@ -105,8 +103,7 @@ class TestDispatcherDB(base.BaseTestCase): 'timestamp': '2012-09-30T15:31:50.262-08:00', } msg['message_signature'] = utils.compute_signature( - msg, - self.CONF.publisher.metering_secret, + msg, self.CONF.publisher.telemetry_secret, ) expected = msg.copy() diff --git a/ceilometer/tests/dispatcher/test_file.py b/ceilometer/tests/dispatcher/test_file.py index df3b5edb..68617301 100644 --- a/ceilometer/tests/dispatcher/test_file.py +++ b/ceilometer/tests/dispatcher/test_file.py @@ -52,8 +52,7 @@ class TestDispatcherFile(base.BaseTestCase): 'counter_volume': 1, } msg['message_signature'] = utils.compute_signature( - msg, - self.CONF.publisher.metering_secret, + msg, self.CONF.publisher.telemetry_secret, ) # The record_metering_data method should exist and not produce errors. @@ -84,8 +83,7 @@ class TestDispatcherFile(base.BaseTestCase): 'counter_volume': 1, } msg['message_signature'] = utils.compute_signature( - msg, - self.CONF.publisher.metering_secret, + msg, self.CONF.publisher.telemetry_secret, ) # The record_metering_data method should exist and not produce errors. diff --git a/ceilometer/tests/dispatcher/test_http.py b/ceilometer/tests/dispatcher/test_http.py index 10ced3e6..5d728dac 100755 --- a/ceilometer/tests/dispatcher/test_http.py +++ b/ceilometer/tests/dispatcher/test_http.py @@ -32,8 +32,7 @@ class TestDispatcherHttp(base.BaseTestCase): 'counter_volume': 1, } self.msg['message_signature'] = utils.compute_signature( - self.msg, - self.CONF.publisher.metering_secret, + self.msg, self.CONF.publisher.telemetry_secret, ) def test_http_dispatcher_config_options(self): @@ -78,8 +77,7 @@ class TestDispatcherHttp(base.BaseTestCase): self.msg['resource_metadata'] = {'request': {'NONE_CADF_EVENT': { 'q1': 'v1', 'q2': 'v2'}, }, } self.msg['message_signature'] = utils.compute_signature( - self.msg, - self.CONF.publisher.metering_secret, + self.msg, self.CONF.publisher.telemetry_secret, ) with mock.patch.object(requests, 'post') as post: @@ -97,8 +95,7 @@ class TestDispatcherHttp(base.BaseTestCase): self.msg['resource_metadata'] = {'request': {'CADF_EVENT': { 'q1': 'v1', 'q2': 'v2'}, }, } self.msg['message_signature'] = utils.compute_signature( - self.msg, - self.CONF.publisher.metering_secret, + self.msg, self.CONF.publisher.telemetry_secret, ) with mock.patch.object(requests, 'post') as post: @@ -114,8 +111,7 @@ class TestDispatcherHttp(base.BaseTestCase): self.msg['resource_metadata'] = {'any': {'thing1': 'v1', 'thing2': 'v2', }, } self.msg['message_signature'] = utils.compute_signature( - self.msg, - self.CONF.publisher.metering_secret, + self.msg, self.CONF.publisher.telemetry_secret, ) with mock.patch.object(requests, 'post') as post: diff --git a/ceilometer/tests/publisher/test_udp.py b/ceilometer/tests/publisher/test_udp.py index aa764943..0f6ad059 100644 --- a/ceilometer/tests/publisher/test_udp.py +++ b/ceilometer/tests/publisher/test_udp.py @@ -110,7 +110,7 @@ class TestUDPPublisher(base.BaseTestCase): def setUp(self): super(TestUDPPublisher, self).setUp() self.CONF = self.useFixture(fixture_config.Config()).conf - self.CONF.publisher.metering_secret = 'not-so-secret' + self.CONF.publisher.telemetry_secret = 'not-so-secret' def test_published(self): self.data_sent = [] diff --git a/ceilometer/tests/storage/test_storage_scenarios.py b/ceilometer/tests/storage/test_storage_scenarios.py index 9637cfc2..1776f4d9 100644 --- a/ceilometer/tests/storage/test_storage_scenarios.py +++ b/ceilometer/tests/storage/test_storage_scenarios.py @@ -51,7 +51,7 @@ class DBTestBase(tests_db.TestBase): resource_metadata=metadata, source=source ) msg = utils.meter_message_from_counter( - s, self.CONF.publisher.metering_secret + s, self.CONF.publisher.telemetry_secret ) self.conn.record_metering_data(msg) return msg @@ -1409,8 +1409,7 @@ class StatisticsGroupByTest(DBTestBase, source=test_sample['source'], ) msg = utils.meter_message_from_counter( - c, - self.CONF.publisher.metering_secret, + c, self.CONF.publisher.telemetry_secret, ) self.conn.record_metering_data(msg) @@ -2608,8 +2607,7 @@ class CounterDataTypeTest(DBTestBase, source='test-1', ) msg = utils.meter_message_from_counter( - c, - self.CONF.publisher.metering_secret, + c, self.CONF.publisher.telemetry_secret, ) self.conn.record_metering_data(msg) @@ -2627,8 +2625,7 @@ class CounterDataTypeTest(DBTestBase, source='test-1', ) msg = utils.meter_message_from_counter( - c, - self.CONF.publisher.metering_secret, + c, self.CONF.publisher.telemetry_secret, ) self.conn.record_metering_data(msg) @@ -2645,8 +2642,7 @@ class CounterDataTypeTest(DBTestBase, source='test-1', ) msg = utils.meter_message_from_counter( - c, - self.CONF.publisher.metering_secret, + c, self.CONF.publisher.telemetry_secret, ) self.conn.record_metering_data(msg) @@ -3492,7 +3488,7 @@ class BigIntegerTest(tests_db.TestBase, timestamp=datetime.datetime.utcnow(), resource_metadata=metadata) msg = utils.meter_message_from_counter( - s, self.CONF.publisher.metering_secret) + s, self.CONF.publisher.telemetry_secret) self.conn.record_metering_data(msg) diff --git a/ceilometer/tests/test_collector.py b/ceilometer/tests/test_collector.py index b6ff66fb..15178bb4 100644 --- a/ceilometer/tests/test_collector.py +++ b/ceilometer/tests/test_collector.py @@ -47,7 +47,7 @@ class TestCollector(tests_base.BaseTestCase): self.CONF = self.useFixture(fixture_config.Config()).conf self.CONF.import_opt("connection", "oslo.db.options", group="database") self.CONF.set_override("connection", "log://", group='database') - self.CONF.set_override('metering_secret', 'not-so-secret', + self.CONF.set_override('telemetry_secret', 'not-so-secret', group='publisher') self._setup_messaging() @@ -230,25 +230,53 @@ class TestCollector(tests_base.BaseTestCase): mylog.info.assert_called_once_with( 'metering data test for test_run_tasks: 1') - @mock.patch.object(oslo.messaging.MessageHandlingServer, 'start') - @mock.patch.object(collector.CollectorService, 'start_udp') - def test_collector_requeue(self, udp_start, rpc_start): - self.CONF.set_override('requeue_sample_on_dispatcher_error', True, - group='collector') + def _test_collector_requeue(self, listener): self.srv.start() with mock.patch.object(self.srv.dispatcher_manager, 'map_method', side_effect=Exception('boom')): - ret = self.srv.sample({}, 'pub_id', 'event', {}, {}) + endp = getattr(self.srv, listener).dispatcher.endpoints[0] + ret = endp.sample({}, 'pub_id', 'event', {}, {}) self.assertEqual(oslo.messaging.NotificationResult.REQUEUE, ret) - @mock.patch.object(oslo.messaging.MessageHandlingServer, 'start') - @mock.patch.object(collector.CollectorService, 'start_udp') - def test_collector_no_requeue(self, udp_start, rpc_start): - self.CONF.set_override('requeue_sample_on_dispatcher_error', False, + @mock.patch.object(oslo.messaging.MessageHandlingServer, 'start', + mock.Mock()) + @mock.patch.object(collector.CollectorService, 'start_udp', mock.Mock()) + def test_collector_sample_requeue(self): + self.CONF.set_override('requeue_sample_on_dispatcher_error', True, group='collector') + self._test_collector_requeue('sample_listener') + + @mock.patch.object(oslo.messaging.MessageHandlingServer, 'start', + mock.Mock()) + @mock.patch.object(collector.CollectorService, 'start_udp', mock.Mock()) + def test_collector_event_requeue(self): + self.CONF.set_override('requeue_event_on_dispatcher_error', True, + group='collector') + self.CONF.set_override('store_events', True, group='notification') + self._test_collector_requeue('event_listener') + + def _test_collector_no_requeue(self, listener): self.srv.start() with mock.patch.object(self.srv.dispatcher_manager, 'map_method', side_effect=FakeException('boom')): - self.assertRaises(FakeException, self.srv.sample, {}, 'pub_id', + endp = getattr(self.srv, listener).dispatcher.endpoints[0] + self.assertRaises(FakeException, endp.sample, {}, 'pub_id', 'event', {}, {}) + + @mock.patch.object(oslo.messaging.MessageHandlingServer, 'start', + mock.Mock()) + @mock.patch.object(collector.CollectorService, 'start_udp', mock.Mock()) + def test_collector_sample_no_requeue(self): + self.CONF.set_override('requeue_sample_on_dispatcher_error', False, + group='collector') + self._test_collector_no_requeue('sample_listener') + + @mock.patch.object(oslo.messaging.MessageHandlingServer, 'start', + mock.Mock()) + @mock.patch.object(collector.CollectorService, 'start_udp', mock.Mock()) + def test_collector_event_no_requeue(self): + self.CONF.set_override('requeue_event_on_dispatcher_error', False, + group='collector') + self.CONF.set_override('store_events', True, group='notification') + self._test_collector_no_requeue('event_listener') diff --git a/ceilometer/utils.py b/ceilometer/utils.py index 8701eb96..237cae41 100644 --- a/ceilometer/utils.py +++ b/ceilometer/utils.py @@ -249,3 +249,12 @@ class HashRing(object): return None pos = self._get_position_on_ring(key) return self._ring[self._sorted_keys[pos]] + + +def kill_listeners(listeners): + # NOTE(gordc): correct usage of oslo.messaging listener is to stop(), + # which stops new messages, and wait(), which processes remaining + # messages and closes connection + for listener in listeners: + listener.stop() + listener.wait() diff --git a/doc/source/install/manual.rst b/doc/source/install/manual.rst index f0c665d7..3ee5a138 100755 --- a/doc/source/install/manual.rst +++ b/doc/source/install/manual.rst @@ -217,9 +217,9 @@ Installing the notification agent not yet been tested with ZeroMQ. We recommend using Rabbit or qpid for now. - 2. Set the ``metering_secret`` value. + 2. Set the ``telemetry_secret`` value. - Set the ``metering_secret`` value to a large, random, value. Use + Set the ``telemetry_secret`` value to a large, random, value. Use the same value in all ceilometer configuration files, on all nodes, so that messages passing between the nodes can be validated. @@ -289,9 +289,9 @@ Installing the collector not yet been tested with ZeroMQ. We recommend using Rabbit or qpid for now. - 2. Set the ``metering_secret`` value. + 2. Set the ``telemetry_secret`` value. - Set the ``metering_secret`` value to a large, random, value. Use + Set the ``telemetry_secret`` value to a large, random, value. Use the same value in all ceilometer configuration files, on all nodes, so that messages passing between the nodes can be validated. @@ -372,9 +372,9 @@ Installing the Compute Agent not yet been tested with ZeroMQ. We recommend using Rabbit or qpid for now. - 2. Set the ``metering_secret`` value. + 2. Set the ``telemetry_secret`` value. - Set the ``metering_secret`` value to a large, random, value. Use + Set the ``telemetry_secret`` value to a large, random, value. Use the same value in all ceilometer configuration files, on all nodes, so that messages passing between the nodes can be validated. @@ -446,9 +446,9 @@ Installing the Central Agent not yet been tested with ZeroMQ. We recommend using Rabbit or qpid for now. - 2. Set the ``metering_secret`` value. + 2. Set the ``telemetry_secret`` value. - Set the ``metering_secret`` value to a large, random, value. Use + Set the ``telemetry_secret`` value to a large, random, value. Use the same value in all ceilometer configuration files, on all nodes, so that messages passing between the nodes can be validated. diff --git a/tools/make_test_data.py b/tools/make_test_data.py index 347eba1c..55042fb5 100755 --- a/tools/make_test_data.py +++ b/tools/make_test_data.py @@ -81,8 +81,7 @@ def make_test_data(name, meter_type, unit, volume, random_min, source=source, ) data = utils.meter_message_from_counter( - c, - cfg.CONF.publisher.metering_secret) + c, cfg.CONF.publisher.telemetry_secret) yield data n += 1