diff --git a/ceilometer/collector.py b/ceilometer/collector.py
index d603cfff82..05b3530572 100644
--- a/ceilometer/collector.py
+++ b/ceilometer/collector.py
@@ -13,6 +13,7 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
+from itertools import chain
 import socket
 
 import msgpack
@@ -45,6 +46,14 @@ OPTS = [
                 default=False,
                 help='Requeue the event on the collector event queue '
                 'when the collector fails to dispatch it.'),
+    cfg.IntOpt('batch_size',
+               default=1,
+               help='Number of notification messages to wait before '
+               'dispatching them'),
+    cfg.IntOpt('batch_timeout',
+               default=None,
+               help='Number of seconds to wait before dispatching samples'
+               'when batch_size is not reached (None means indefinitely)'),
 ]
 
 cfg.CONF.register_opts(OPTS, group="collector")
@@ -78,21 +87,27 @@ class CollectorService(os_service.Service):
             if list(self.meter_manager):
                 sample_target = oslo_messaging.Target(
                     topic=cfg.CONF.publisher_notifier.metering_topic)
-                self.sample_listener = messaging.get_notification_listener(
-                    transport, [sample_target],
-                    [SampleEndpoint(self.meter_manager)],
-                    allow_requeue=(cfg.CONF.collector.
-                                   requeue_sample_on_dispatcher_error))
+                self.sample_listener = (
+                    messaging.get_batch_notification_listener(
+                        transport, [sample_target],
+                        [SampleEndpoint(self.meter_manager)],
+                        allow_requeue=(cfg.CONF.collector.
+                                       requeue_sample_on_dispatcher_error),
+                        batch_size=cfg.CONF.collector.batch_size,
+                        batch_timeout=cfg.CONF.collector.batch_timeout))
                 self.sample_listener.start()
 
             if cfg.CONF.notification.store_events and list(self.event_manager):
                 event_target = oslo_messaging.Target(
                     topic=cfg.CONF.publisher_notifier.event_topic)
-                self.event_listener = messaging.get_notification_listener(
-                    transport, [event_target],
-                    [EventEndpoint(self.event_manager)],
-                    allow_requeue=(cfg.CONF.collector.
-                                   requeue_event_on_dispatcher_error))
+                self.event_listener = (
+                    messaging.get_batch_notification_listener(
+                        transport, [event_target],
+                        [EventEndpoint(self.event_manager)],
+                        allow_requeue=(cfg.CONF.collector.
+                                       requeue_event_on_dispatcher_error),
+                        batch_size=cfg.CONF.collector.batch_size,
+                        batch_timeout=cfg.CONF.collector.batch_timeout))
                 self.event_listener.start()
 
             if not cfg.CONF.collector.udp_address:
@@ -147,14 +162,15 @@ class CollectorEndpoint(object):
         self.dispatcher_manager = dispatcher_manager
         self.requeue_on_error = requeue_on_error
 
-    def sample(self, ctxt, publisher_id, event_type, payload, metadata):
+    def sample(self, messages):
         """RPC endpoint for notification messages
 
         When another service sends a notification over the message
         bus, this method receives it.
         """
+        samples = list(chain.from_iterable(m["payload"] for m in messages))
         try:
-            self.dispatcher_manager.map_method(self.method, payload)
+            self.dispatcher_manager.map_method(self.method, samples)
         except Exception:
             if self.requeue_on_error:
                 LOG.exception(_LE("Dispatcher failed to handle the %s, "
diff --git a/ceilometer/messaging.py b/ceilometer/messaging.py
index c56ea21c2f..d7580db414 100644
--- a/ceilometer/messaging.py
+++ b/ceilometer/messaging.py
@@ -89,6 +89,16 @@ def get_notification_listener(transport, targets, endpoints,
         allow_requeue=allow_requeue)
 
 
+def get_batch_notification_listener(transport, targets, endpoints,
+                                    allow_requeue=False,
+                                    batch_size=1, batch_timeout=None):
+    """Return a configured oslo_messaging notification listener."""
+    return oslo_messaging.get_batch_notification_listener(
+        transport, targets, endpoints, executor='threading',
+        allow_requeue=allow_requeue,
+        batch_size=batch_size, batch_timeout=batch_timeout)
+
+
 def get_notifier(transport, publisher_id):
     """Return a configured oslo_messaging notifier."""
     notifier = oslo_messaging.Notifier(transport, serializer=_SERIALIZER)
diff --git a/ceilometer/tests/functional/test_collector.py b/ceilometer/tests/functional/test_collector.py
index b93f0f0eb9..d701d4022c 100644
--- a/ceilometer/tests/functional/test_collector.py
+++ b/ceilometer/tests/functional/test_collector.py
@@ -217,7 +217,7 @@ class TestCollector(tests_base.BaseTestCase):
                 mock_dispatcher.method_calls[0][1][0],
                 "not-so-secret"))
 
-    def _test_collector_requeue(self, listener):
+    def _test_collector_requeue(self, listener, batch_listener=False):
 
         mock_dispatcher = self._setup_fake_dispatcher()
         self.srv.dispatcher_manager = dispatcher.load_dispatcher_manager()
@@ -226,7 +226,9 @@ class TestCollector(tests_base.BaseTestCase):
 
         self.srv.start()
         endp = getattr(self.srv, listener).dispatcher.endpoints[0]
-        ret = endp.sample({}, 'pub_id', 'event', {}, {})
+        ret = endp.sample([{'ctxt': {}, 'publisher_id': 'pub_id',
+                            'event_type': 'event', 'payload': {},
+                            'metadata': {}}])
         self.assertEqual(oslo_messaging.NotificationResult.REQUEUE,
                          ret)
 
@@ -257,8 +259,9 @@ class TestCollector(tests_base.BaseTestCase):
 
         self.srv.start()
         endp = getattr(self.srv, listener).dispatcher.endpoints[0]
-        self.assertRaises(FakeException, endp.sample, {}, 'pub_id',
-                          'event', {}, {})
+        self.assertRaises(FakeException, endp.sample, [
+            {'ctxt': {}, 'publisher_id': 'pub_id', 'event_type': 'event',
+             'payload': {}, 'metadata': {}}])
 
     @mock.patch.object(oslo_messaging.MessageHandlingServer, 'start',
                        mock.Mock())