diff --git a/ceilometer/collector.py b/ceilometer/collector.py index d603cfff..05b35305 100644 --- a/ceilometer/collector.py +++ b/ceilometer/collector.py @@ -13,6 +13,7 @@ # License for the specific language governing permissions and limitations # under the License. +from itertools import chain import socket import msgpack @@ -45,6 +46,14 @@ OPTS = [ default=False, help='Requeue the event on the collector event queue ' 'when the collector fails to dispatch it.'), + cfg.IntOpt('batch_size', + default=1, + help='Number of notification messages to wait before ' + 'dispatching them'), + cfg.IntOpt('batch_timeout', + default=None, + help='Number of seconds to wait before dispatching samples' + 'when batch_size is not reached (None means indefinitely)'), ] cfg.CONF.register_opts(OPTS, group="collector") @@ -78,21 +87,27 @@ class CollectorService(os_service.Service): if list(self.meter_manager): sample_target = oslo_messaging.Target( topic=cfg.CONF.publisher_notifier.metering_topic) - self.sample_listener = messaging.get_notification_listener( - transport, [sample_target], - [SampleEndpoint(self.meter_manager)], - allow_requeue=(cfg.CONF.collector. - requeue_sample_on_dispatcher_error)) + self.sample_listener = ( + messaging.get_batch_notification_listener( + transport, [sample_target], + [SampleEndpoint(self.meter_manager)], + allow_requeue=(cfg.CONF.collector. + requeue_sample_on_dispatcher_error), + batch_size=cfg.CONF.collector.batch_size, + batch_timeout=cfg.CONF.collector.batch_timeout)) self.sample_listener.start() if cfg.CONF.notification.store_events and list(self.event_manager): event_target = oslo_messaging.Target( topic=cfg.CONF.publisher_notifier.event_topic) - self.event_listener = messaging.get_notification_listener( - transport, [event_target], - [EventEndpoint(self.event_manager)], - allow_requeue=(cfg.CONF.collector. - requeue_event_on_dispatcher_error)) + self.event_listener = ( + messaging.get_batch_notification_listener( + transport, [event_target], + [EventEndpoint(self.event_manager)], + allow_requeue=(cfg.CONF.collector. + requeue_event_on_dispatcher_error), + batch_size=cfg.CONF.collector.batch_size, + batch_timeout=cfg.CONF.collector.batch_timeout)) self.event_listener.start() if not cfg.CONF.collector.udp_address: @@ -147,14 +162,15 @@ class CollectorEndpoint(object): self.dispatcher_manager = dispatcher_manager self.requeue_on_error = requeue_on_error - def sample(self, ctxt, publisher_id, event_type, payload, metadata): + def sample(self, messages): """RPC endpoint for notification messages When another service sends a notification over the message bus, this method receives it. """ + samples = list(chain.from_iterable(m["payload"] for m in messages)) try: - self.dispatcher_manager.map_method(self.method, payload) + self.dispatcher_manager.map_method(self.method, samples) except Exception: if self.requeue_on_error: LOG.exception(_LE("Dispatcher failed to handle the %s, " diff --git a/ceilometer/messaging.py b/ceilometer/messaging.py index c56ea21c..d7580db4 100644 --- a/ceilometer/messaging.py +++ b/ceilometer/messaging.py @@ -89,6 +89,16 @@ def get_notification_listener(transport, targets, endpoints, allow_requeue=allow_requeue) +def get_batch_notification_listener(transport, targets, endpoints, + allow_requeue=False, + batch_size=1, batch_timeout=None): + """Return a configured oslo_messaging notification listener.""" + return oslo_messaging.get_batch_notification_listener( + transport, targets, endpoints, executor='threading', + allow_requeue=allow_requeue, + batch_size=batch_size, batch_timeout=batch_timeout) + + def get_notifier(transport, publisher_id): """Return a configured oslo_messaging notifier.""" notifier = oslo_messaging.Notifier(transport, serializer=_SERIALIZER) diff --git a/ceilometer/tests/functional/test_collector.py b/ceilometer/tests/functional/test_collector.py index b93f0f0e..d701d402 100644 --- a/ceilometer/tests/functional/test_collector.py +++ b/ceilometer/tests/functional/test_collector.py @@ -217,7 +217,7 @@ class TestCollector(tests_base.BaseTestCase): mock_dispatcher.method_calls[0][1][0], "not-so-secret")) - def _test_collector_requeue(self, listener): + def _test_collector_requeue(self, listener, batch_listener=False): mock_dispatcher = self._setup_fake_dispatcher() self.srv.dispatcher_manager = dispatcher.load_dispatcher_manager() @@ -226,7 +226,9 @@ class TestCollector(tests_base.BaseTestCase): self.srv.start() endp = getattr(self.srv, listener).dispatcher.endpoints[0] - ret = endp.sample({}, 'pub_id', 'event', {}, {}) + ret = endp.sample([{'ctxt': {}, 'publisher_id': 'pub_id', + 'event_type': 'event', 'payload': {}, + 'metadata': {}}]) self.assertEqual(oslo_messaging.NotificationResult.REQUEUE, ret) @@ -257,8 +259,9 @@ class TestCollector(tests_base.BaseTestCase): self.srv.start() endp = getattr(self.srv, listener).dispatcher.endpoints[0] - self.assertRaises(FakeException, endp.sample, {}, 'pub_id', - 'event', {}, {}) + self.assertRaises(FakeException, endp.sample, [ + {'ctxt': {}, 'publisher_id': 'pub_id', 'event_type': 'event', + 'payload': {}, 'metadata': {}}]) @mock.patch.object(oslo_messaging.MessageHandlingServer, 'start', mock.Mock())