Merge "collector: Use oslo.messaging batch listener"
This commit is contained in:
commit
a7f41eeeb6
@ -13,6 +13,7 @@
|
|||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
|
from itertools import chain
|
||||||
import socket
|
import socket
|
||||||
|
|
||||||
import msgpack
|
import msgpack
|
||||||
@ -45,6 +46,14 @@ OPTS = [
|
|||||||
default=False,
|
default=False,
|
||||||
help='Requeue the event on the collector event queue '
|
help='Requeue the event on the collector event queue '
|
||||||
'when the collector fails to dispatch it.'),
|
'when the collector fails to dispatch it.'),
|
||||||
|
cfg.IntOpt('batch_size',
|
||||||
|
default=1,
|
||||||
|
help='Number of notification messages to wait before '
|
||||||
|
'dispatching them'),
|
||||||
|
cfg.IntOpt('batch_timeout',
|
||||||
|
default=None,
|
||||||
|
help='Number of seconds to wait before dispatching samples'
|
||||||
|
'when batch_size is not reached (None means indefinitely)'),
|
||||||
]
|
]
|
||||||
|
|
||||||
cfg.CONF.register_opts(OPTS, group="collector")
|
cfg.CONF.register_opts(OPTS, group="collector")
|
||||||
@ -78,21 +87,27 @@ class CollectorService(os_service.Service):
|
|||||||
if list(self.meter_manager):
|
if list(self.meter_manager):
|
||||||
sample_target = oslo_messaging.Target(
|
sample_target = oslo_messaging.Target(
|
||||||
topic=cfg.CONF.publisher_notifier.metering_topic)
|
topic=cfg.CONF.publisher_notifier.metering_topic)
|
||||||
self.sample_listener = messaging.get_notification_listener(
|
self.sample_listener = (
|
||||||
|
messaging.get_batch_notification_listener(
|
||||||
transport, [sample_target],
|
transport, [sample_target],
|
||||||
[SampleEndpoint(self.meter_manager)],
|
[SampleEndpoint(self.meter_manager)],
|
||||||
allow_requeue=(cfg.CONF.collector.
|
allow_requeue=(cfg.CONF.collector.
|
||||||
requeue_sample_on_dispatcher_error))
|
requeue_sample_on_dispatcher_error),
|
||||||
|
batch_size=cfg.CONF.collector.batch_size,
|
||||||
|
batch_timeout=cfg.CONF.collector.batch_timeout))
|
||||||
self.sample_listener.start()
|
self.sample_listener.start()
|
||||||
|
|
||||||
if cfg.CONF.notification.store_events and list(self.event_manager):
|
if cfg.CONF.notification.store_events and list(self.event_manager):
|
||||||
event_target = oslo_messaging.Target(
|
event_target = oslo_messaging.Target(
|
||||||
topic=cfg.CONF.publisher_notifier.event_topic)
|
topic=cfg.CONF.publisher_notifier.event_topic)
|
||||||
self.event_listener = messaging.get_notification_listener(
|
self.event_listener = (
|
||||||
|
messaging.get_batch_notification_listener(
|
||||||
transport, [event_target],
|
transport, [event_target],
|
||||||
[EventEndpoint(self.event_manager)],
|
[EventEndpoint(self.event_manager)],
|
||||||
allow_requeue=(cfg.CONF.collector.
|
allow_requeue=(cfg.CONF.collector.
|
||||||
requeue_event_on_dispatcher_error))
|
requeue_event_on_dispatcher_error),
|
||||||
|
batch_size=cfg.CONF.collector.batch_size,
|
||||||
|
batch_timeout=cfg.CONF.collector.batch_timeout))
|
||||||
self.event_listener.start()
|
self.event_listener.start()
|
||||||
|
|
||||||
if not cfg.CONF.collector.udp_address:
|
if not cfg.CONF.collector.udp_address:
|
||||||
@ -147,14 +162,15 @@ class CollectorEndpoint(object):
|
|||||||
self.dispatcher_manager = dispatcher_manager
|
self.dispatcher_manager = dispatcher_manager
|
||||||
self.requeue_on_error = requeue_on_error
|
self.requeue_on_error = requeue_on_error
|
||||||
|
|
||||||
def sample(self, ctxt, publisher_id, event_type, payload, metadata):
|
def sample(self, messages):
|
||||||
"""RPC endpoint for notification messages
|
"""RPC endpoint for notification messages
|
||||||
|
|
||||||
When another service sends a notification over the message
|
When another service sends a notification over the message
|
||||||
bus, this method receives it.
|
bus, this method receives it.
|
||||||
"""
|
"""
|
||||||
|
samples = list(chain.from_iterable(m["payload"] for m in messages))
|
||||||
try:
|
try:
|
||||||
self.dispatcher_manager.map_method(self.method, payload)
|
self.dispatcher_manager.map_method(self.method, samples)
|
||||||
except Exception:
|
except Exception:
|
||||||
if self.requeue_on_error:
|
if self.requeue_on_error:
|
||||||
LOG.exception(_LE("Dispatcher failed to handle the %s, "
|
LOG.exception(_LE("Dispatcher failed to handle the %s, "
|
||||||
|
@ -89,6 +89,16 @@ def get_notification_listener(transport, targets, endpoints,
|
|||||||
allow_requeue=allow_requeue)
|
allow_requeue=allow_requeue)
|
||||||
|
|
||||||
|
|
||||||
|
def get_batch_notification_listener(transport, targets, endpoints,
|
||||||
|
allow_requeue=False,
|
||||||
|
batch_size=1, batch_timeout=None):
|
||||||
|
"""Return a configured oslo_messaging notification listener."""
|
||||||
|
return oslo_messaging.get_batch_notification_listener(
|
||||||
|
transport, targets, endpoints, executor='threading',
|
||||||
|
allow_requeue=allow_requeue,
|
||||||
|
batch_size=batch_size, batch_timeout=batch_timeout)
|
||||||
|
|
||||||
|
|
||||||
def get_notifier(transport, publisher_id):
|
def get_notifier(transport, publisher_id):
|
||||||
"""Return a configured oslo_messaging notifier."""
|
"""Return a configured oslo_messaging notifier."""
|
||||||
notifier = oslo_messaging.Notifier(transport, serializer=_SERIALIZER)
|
notifier = oslo_messaging.Notifier(transport, serializer=_SERIALIZER)
|
||||||
|
@ -217,7 +217,7 @@ class TestCollector(tests_base.BaseTestCase):
|
|||||||
mock_dispatcher.method_calls[0][1][0],
|
mock_dispatcher.method_calls[0][1][0],
|
||||||
"not-so-secret"))
|
"not-so-secret"))
|
||||||
|
|
||||||
def _test_collector_requeue(self, listener):
|
def _test_collector_requeue(self, listener, batch_listener=False):
|
||||||
|
|
||||||
mock_dispatcher = self._setup_fake_dispatcher()
|
mock_dispatcher = self._setup_fake_dispatcher()
|
||||||
self.srv.dispatcher_manager = dispatcher.load_dispatcher_manager()
|
self.srv.dispatcher_manager = dispatcher.load_dispatcher_manager()
|
||||||
@ -226,7 +226,9 @@ class TestCollector(tests_base.BaseTestCase):
|
|||||||
|
|
||||||
self.srv.start()
|
self.srv.start()
|
||||||
endp = getattr(self.srv, listener).dispatcher.endpoints[0]
|
endp = getattr(self.srv, listener).dispatcher.endpoints[0]
|
||||||
ret = endp.sample({}, 'pub_id', 'event', {}, {})
|
ret = endp.sample([{'ctxt': {}, 'publisher_id': 'pub_id',
|
||||||
|
'event_type': 'event', 'payload': {},
|
||||||
|
'metadata': {}}])
|
||||||
self.assertEqual(oslo_messaging.NotificationResult.REQUEUE,
|
self.assertEqual(oslo_messaging.NotificationResult.REQUEUE,
|
||||||
ret)
|
ret)
|
||||||
|
|
||||||
@ -257,8 +259,9 @@ class TestCollector(tests_base.BaseTestCase):
|
|||||||
|
|
||||||
self.srv.start()
|
self.srv.start()
|
||||||
endp = getattr(self.srv, listener).dispatcher.endpoints[0]
|
endp = getattr(self.srv, listener).dispatcher.endpoints[0]
|
||||||
self.assertRaises(FakeException, endp.sample, {}, 'pub_id',
|
self.assertRaises(FakeException, endp.sample, [
|
||||||
'event', {}, {})
|
{'ctxt': {}, 'publisher_id': 'pub_id', 'event_type': 'event',
|
||||||
|
'payload': {}, 'metadata': {}}])
|
||||||
|
|
||||||
@mock.patch.object(oslo_messaging.MessageHandlingServer, 'start',
|
@mock.patch.object(oslo_messaging.MessageHandlingServer, 'start',
|
||||||
mock.Mock())
|
mock.Mock())
|
||||||
|
Loading…
Reference in New Issue
Block a user