diff --git a/oslo_messaging/conffixture.py b/oslo_messaging/conffixture.py index 4549ee5de..39e76151d 100644 --- a/oslo_messaging/conffixture.py +++ b/oslo_messaging/conffixture.py @@ -61,6 +61,8 @@ class ConfFixture(fixtures.Fixture): 'amqp1_opts', 'oslo_messaging_amqp') _import_opts(self.conf, 'oslo_messaging.rpc.client', '_client_opts') _import_opts(self.conf, 'oslo_messaging.transport', '_transport_opts') + _import_opts(self.conf, 'oslo_messaging.rpc.dispatcher', + '_dispatcher_opts') _import_opts(self.conf, 'oslo_messaging.notify.notifier', '_notifier_opts', diff --git a/oslo_messaging/opts.py b/oslo_messaging/opts.py index 36d25a790..d24698352 100644 --- a/oslo_messaging/opts.py +++ b/oslo_messaging/opts.py @@ -23,6 +23,7 @@ from oslo_messaging._drivers import impl_rabbit from oslo_messaging._drivers.kafka_driver import kafka_options from oslo_messaging.notify import notifier from oslo_messaging.rpc import client +from oslo_messaging.rpc import dispatcher from oslo_messaging import server from oslo_messaging import transport @@ -35,6 +36,7 @@ _global_opt_lists = [ server._pool_opts, client._client_opts, transport._transport_opts, + dispatcher._dispatcher_opts, ] _opts = [ diff --git a/oslo_messaging/rpc/dispatcher.py b/oslo_messaging/rpc/dispatcher.py index 86e2e6630..61ce0209a 100644 --- a/oslo_messaging/rpc/dispatcher.py +++ b/oslo_messaging/rpc/dispatcher.py @@ -22,6 +22,7 @@ import logging import sys import threading +from oslo_config import cfg from oslo_utils import eventletutils from oslo_messaging import _utils as utils @@ -30,6 +31,13 @@ from oslo_messaging import serializer as msg_serializer from oslo_messaging import server as msg_server from oslo_messaging import target as msg_target +_dispatcher_opts = [ + cfg.BoolOpt('rpc_ping_enabled', + default=False, + help='Add an endpoint to answer to ping calls. ' + 'Endpoint is named oslo_rpc_server_ping'), +] + __all__ = [ 'NoSuchMethod', 'RPCAccessPolicyBase', @@ -45,6 +53,11 @@ __all__ = [ LOG = logging.getLogger(__name__) +class PingEndpoint(object): + def oslo_rpc_server_ping(self, ctxt, **kwargs): + return 'pong' + + class ExpectedException(Exception): """Encapsulates an expected exception raised by an RPC endpoint @@ -153,8 +166,11 @@ class RPCDispatcher(dispatcher.DispatcherBase): :param endpoints: list of endpoint objects for dispatching to :param serializer: optional message serializer """ + cfg.CONF.register_opts(_dispatcher_opts) + oslo_rpc_server_ping = None for ep in endpoints: + # Check if we have an attribute named 'target' target = getattr(ep, 'target', None) if target and not isinstance(target, msg_target.Target): errmsg = "'target' is a reserved Endpoint attribute used" + \ @@ -163,7 +179,27 @@ class RPCDispatcher(dispatcher.DispatcherBase): " define an Endpoint method named 'target'" raise TypeError("%s: endpoint=%s" % (errmsg, ep)) + # Check if we have an attribute named 'oslo_rpc_server_ping' + oslo_rpc_server_ping = getattr(ep, 'oslo_rpc_server_ping', None) + if oslo_rpc_server_ping: + errmsg = "'oslo_rpc_server_ping' is a reserved Endpoint" + \ + " attribute which can be use to ping the" + \ + " endpoint. Please avoid using any oslo_* " + \ + " naming." + LOG.warning("%s (endpoint=%s)" % (errmsg, ep)) + self.endpoints = endpoints + + # Add ping endpoint if enabled in config + if cfg.CONF.rpc_ping_enabled: + if oslo_rpc_server_ping: + LOG.warning("rpc_ping_enabled=True in config but " + "oslo_rpc_server_ping is already declared " + "in an other Endpoint. Not enabling rpc_ping " + "Endpoint.") + else: + self.endpoints.append(PingEndpoint()) + self.serializer = serializer or msg_serializer.NoOpSerializer() self._default_target = msg_target.Target() if access_policy is not None: diff --git a/oslo_messaging/tests/rpc/test_dispatcher.py b/oslo_messaging/tests/rpc/test_dispatcher.py index 89b36cd5e..337593b74 100755 --- a/oslo_messaging/tests/rpc/test_dispatcher.py +++ b/oslo_messaging/tests/rpc/test_dispatcher.py @@ -217,6 +217,34 @@ class TestDispatcher(test_utils.BaseTestCase): 'method: {}'.format(method)) +class TestDispatcherWithPingEndpoint(test_utils.BaseTestCase): + def test_dispatcher_with_ping(self): + self.config(rpc_ping_enabled=True) + dispatcher = oslo_messaging.RPCDispatcher([], None, None) + incoming = mock.Mock(ctxt={}, + message=dict(method='oslo_rpc_server_ping'), + client_timeout=0) + + res = dispatcher.dispatch(incoming) + self.assertEqual('pong', res) + + def test_dispatcher_with_ping_already_used(self): + class MockEndpoint(object): + def oslo_rpc_server_ping(self, ctxt, **kwargs): + return 'not_pong' + + mockEndpoint = MockEndpoint() + + self.config(rpc_ping_enabled=True) + dispatcher = oslo_messaging.RPCDispatcher([mockEndpoint], None, None) + incoming = mock.Mock(ctxt={}, + message=dict(method='oslo_rpc_server_ping'), + client_timeout=0) + + res = dispatcher.dispatch(incoming) + self.assertEqual('not_pong', res) + + class TestSerializer(test_utils.BaseTestCase): scenarios = [ ('no_args_or_retval', diff --git a/releasenotes/notes/add-ping-endpoint.yaml b/releasenotes/notes/add-ping-endpoint.yaml new file mode 100644 index 000000000..180f7f2fc --- /dev/null +++ b/releasenotes/notes/add-ping-endpoint.yaml @@ -0,0 +1,13 @@ +--- +features: + - | + RPC dispatcher can have an extra endpoint named ping. + This endpoint can be enabled thanks to a specific configuration parameter: + [DEFAULT] + rpc_ping_enabled=true # default is false + + The purpose of this new endpoint is to help operators do a RPC call (a + ping) toward a specific RPC callback (e.g. a nova-compute, or a + neutron-agent). + This is helping a lot for monitoring agents (for example, if agents are + deployed in a kubernetes pod).