Add a ping endpoint to RPC dispatcher

The purpose of this patch is to add an endpoint directly in RPC
dispatcher, so this endpoint will always be available, in a cross
project manner, without the need for projects to manage it by themself.

This endpoint stay disabled by default, so this change is harmless
without a specific configuration option.

To enable this ping endpoint, an operator will just have to add a new
parameter in the [DEFAULT] section, alongside with rpc_response_timeout
[DEFAULT]
rpc_ping_enabled=true  # default is false

The purpose of this new endpoint is to help operators do a RPC call (a
ping) toward a specific RPC callback (e.g. a nova-compute, or a
neutron-agent).
This is helping a lot for monitoring agents (for example, if agents are
deployed in a kubernetes pod).

The endpoint is named oslo_rpc_server_ping.

Change-Id: I51cf67e060f240e6eb82260e70a057fe599f9063
Signed-off-by: Arnaud Morin <arnaud.morin@corp.ovh.com>
This commit is contained in:
Arnaud Morin 2020-06-12 20:34:28 +02:00
parent c2074e4760
commit 82492442f3
5 changed files with 81 additions and 0 deletions

View File

@ -61,6 +61,8 @@ class ConfFixture(fixtures.Fixture):
'amqp1_opts', 'oslo_messaging_amqp') 'amqp1_opts', 'oslo_messaging_amqp')
_import_opts(self.conf, 'oslo_messaging.rpc.client', '_client_opts') _import_opts(self.conf, 'oslo_messaging.rpc.client', '_client_opts')
_import_opts(self.conf, 'oslo_messaging.transport', '_transport_opts') _import_opts(self.conf, 'oslo_messaging.transport', '_transport_opts')
_import_opts(self.conf, 'oslo_messaging.rpc.dispatcher',
'_dispatcher_opts')
_import_opts(self.conf, _import_opts(self.conf,
'oslo_messaging.notify.notifier', 'oslo_messaging.notify.notifier',
'_notifier_opts', '_notifier_opts',

View File

@ -23,6 +23,7 @@ from oslo_messaging._drivers import impl_rabbit
from oslo_messaging._drivers.kafka_driver import kafka_options from oslo_messaging._drivers.kafka_driver import kafka_options
from oslo_messaging.notify import notifier from oslo_messaging.notify import notifier
from oslo_messaging.rpc import client from oslo_messaging.rpc import client
from oslo_messaging.rpc import dispatcher
from oslo_messaging import server from oslo_messaging import server
from oslo_messaging import transport from oslo_messaging import transport
@ -35,6 +36,7 @@ _global_opt_lists = [
server._pool_opts, server._pool_opts,
client._client_opts, client._client_opts,
transport._transport_opts, transport._transport_opts,
dispatcher._dispatcher_opts,
] ]
_opts = [ _opts = [

View File

@ -22,6 +22,7 @@ import logging
import sys import sys
import threading import threading
from oslo_config import cfg
from oslo_utils import eventletutils from oslo_utils import eventletutils
from oslo_messaging import _utils as utils from oslo_messaging import _utils as utils
@ -30,6 +31,13 @@ from oslo_messaging import serializer as msg_serializer
from oslo_messaging import server as msg_server from oslo_messaging import server as msg_server
from oslo_messaging import target as msg_target from oslo_messaging import target as msg_target
_dispatcher_opts = [
cfg.BoolOpt('rpc_ping_enabled',
default=False,
help='Add an endpoint to answer to ping calls. '
'Endpoint is named oslo_rpc_server_ping'),
]
__all__ = [ __all__ = [
'NoSuchMethod', 'NoSuchMethod',
'RPCAccessPolicyBase', 'RPCAccessPolicyBase',
@ -45,6 +53,11 @@ __all__ = [
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
class PingEndpoint(object):
def oslo_rpc_server_ping(self, ctxt, **kwargs):
return 'pong'
class ExpectedException(Exception): class ExpectedException(Exception):
"""Encapsulates an expected exception raised by an RPC endpoint """Encapsulates an expected exception raised by an RPC endpoint
@ -153,8 +166,11 @@ class RPCDispatcher(dispatcher.DispatcherBase):
:param endpoints: list of endpoint objects for dispatching to :param endpoints: list of endpoint objects for dispatching to
:param serializer: optional message serializer :param serializer: optional message serializer
""" """
cfg.CONF.register_opts(_dispatcher_opts)
oslo_rpc_server_ping = None
for ep in endpoints: for ep in endpoints:
# Check if we have an attribute named 'target'
target = getattr(ep, 'target', None) target = getattr(ep, 'target', None)
if target and not isinstance(target, msg_target.Target): if target and not isinstance(target, msg_target.Target):
errmsg = "'target' is a reserved Endpoint attribute used" + \ errmsg = "'target' is a reserved Endpoint attribute used" + \
@ -163,7 +179,27 @@ class RPCDispatcher(dispatcher.DispatcherBase):
" define an Endpoint method named 'target'" " define an Endpoint method named 'target'"
raise TypeError("%s: endpoint=%s" % (errmsg, ep)) raise TypeError("%s: endpoint=%s" % (errmsg, ep))
# Check if we have an attribute named 'oslo_rpc_server_ping'
oslo_rpc_server_ping = getattr(ep, 'oslo_rpc_server_ping', None)
if oslo_rpc_server_ping:
errmsg = "'oslo_rpc_server_ping' is a reserved Endpoint" + \
" attribute which can be use to ping the" + \
" endpoint. Please avoid using any oslo_* " + \
" naming."
LOG.warning("%s (endpoint=%s)" % (errmsg, ep))
self.endpoints = endpoints self.endpoints = endpoints
# Add ping endpoint if enabled in config
if cfg.CONF.rpc_ping_enabled:
if oslo_rpc_server_ping:
LOG.warning("rpc_ping_enabled=True in config but "
"oslo_rpc_server_ping is already declared "
"in an other Endpoint. Not enabling rpc_ping "
"Endpoint.")
else:
self.endpoints.append(PingEndpoint())
self.serializer = serializer or msg_serializer.NoOpSerializer() self.serializer = serializer or msg_serializer.NoOpSerializer()
self._default_target = msg_target.Target() self._default_target = msg_target.Target()
if access_policy is not None: if access_policy is not None:

View File

@ -217,6 +217,34 @@ class TestDispatcher(test_utils.BaseTestCase):
'method: {}'.format(method)) 'method: {}'.format(method))
class TestDispatcherWithPingEndpoint(test_utils.BaseTestCase):
def test_dispatcher_with_ping(self):
self.config(rpc_ping_enabled=True)
dispatcher = oslo_messaging.RPCDispatcher([], None, None)
incoming = mock.Mock(ctxt={},
message=dict(method='oslo_rpc_server_ping'),
client_timeout=0)
res = dispatcher.dispatch(incoming)
self.assertEqual('pong', res)
def test_dispatcher_with_ping_already_used(self):
class MockEndpoint(object):
def oslo_rpc_server_ping(self, ctxt, **kwargs):
return 'not_pong'
mockEndpoint = MockEndpoint()
self.config(rpc_ping_enabled=True)
dispatcher = oslo_messaging.RPCDispatcher([mockEndpoint], None, None)
incoming = mock.Mock(ctxt={},
message=dict(method='oslo_rpc_server_ping'),
client_timeout=0)
res = dispatcher.dispatch(incoming)
self.assertEqual('not_pong', res)
class TestSerializer(test_utils.BaseTestCase): class TestSerializer(test_utils.BaseTestCase):
scenarios = [ scenarios = [
('no_args_or_retval', ('no_args_or_retval',

View File

@ -0,0 +1,13 @@
---
features:
- |
RPC dispatcher can have an extra endpoint named ping.
This endpoint can be enabled thanks to a specific configuration parameter:
[DEFAULT]
rpc_ping_enabled=true # default is false
The purpose of this new endpoint is to help operators do a RPC call (a
ping) toward a specific RPC callback (e.g. a nova-compute, or a
neutron-agent).
This is helping a lot for monitoring agents (for example, if agents are
deployed in a kubernetes pod).