Re-use transport for rpc server

This patch changes the rpc server to re-use
the transport in the same manner as the rpc client.

Story: 2008494
Task: 41752

Change-Id: I93eecacbe45d19c4f73e9a974d60e642e87bbdf0
This commit is contained in:
Erik Olof Gunnar Andersson 2021-01-13 22:38:37 -08:00
parent d614499825
commit 672b119507
3 changed files with 5 additions and 36 deletions

View File

@ -152,11 +152,6 @@ def get_client(target, version_cap=None, serializer=None, timeout=None):
def get_server(target, endpoints, serializer=None): def get_server(target, endpoints, serializer=None):
assert TRANSPORT is not None assert TRANSPORT is not None
access_policy = dispatcher.DefaultRPCAccessPolicy access_policy = dispatcher.DefaultRPCAccessPolicy
if profiler:
serializer = ProfilerRequestContextSerializer(serializer)
else:
serializer = RequestContextSerializer(serializer)
return messaging.get_rpc_server(TRANSPORT, return messaging.get_rpc_server(TRANSPORT,
target, target,
endpoints, endpoints,

View File

@ -15,7 +15,6 @@
"""Common RPC service and API tools for Magnum.""" """Common RPC service and API tools for Magnum."""
import oslo_messaging as messaging import oslo_messaging as messaging
from oslo_messaging.rpc import dispatcher
from oslo_service import service from oslo_service import service
from oslo_utils import importutils from oslo_utils import importutils
@ -47,14 +46,11 @@ class Service(service.Service):
def __init__(self, topic, server, handlers, binary): def __init__(self, topic, server, handlers, binary):
super(Service, self).__init__() super(Service, self).__init__()
serializer = _init_serializer() serializer = _init_serializer()
transport = messaging.get_rpc_transport(CONF)
# TODO(asalkeld) add support for version='x.y' # TODO(asalkeld) add support for version='x.y'
access_policy = dispatcher.DefaultRPCAccessPolicy
target = messaging.Target(topic=topic, server=server) target = messaging.Target(topic=topic, server=server)
self._server = messaging.get_rpc_server(transport, target, handlers, self._server = rpc.get_server(target, handlers,
executor='eventlet', serializer=serializer)
serializer=serializer,
access_policy=access_policy)
self.binary = binary self.binary = binary
profiler.setup(binary, CONF.host) profiler.setup(binary, CONF.host)

View File

@ -42,38 +42,16 @@ class TestRpc(base.TestCase):
self.assertEqual('client', client) self.assertEqual('client', client)
@mock.patch.object(rpc, 'profiler', None) @mock.patch.object(rpc, 'profiler', None)
@mock.patch.object(rpc, 'RequestContextSerializer')
@mock.patch.object(messaging, 'get_rpc_server') @mock.patch.object(messaging, 'get_rpc_server')
def test_get_server(self, mock_get, mock_ser): def test_get_server(self, mock_get):
rpc.TRANSPORT = mock.Mock() rpc.TRANSPORT = mock.Mock()
ser = mock.Mock() ser = mock.Mock()
tgt = mock.Mock() tgt = mock.Mock()
ends = mock.Mock() ends = mock.Mock()
mock_ser.return_value = ser
mock_get.return_value = 'server' mock_get.return_value = 'server'
access_policy = dispatcher.DefaultRPCAccessPolicy access_policy = dispatcher.DefaultRPCAccessPolicy
server = rpc.get_server(tgt, ends, serializer='foo') server = rpc.get_server(tgt, ends, serializer=ser)
mock_ser.assert_called_once_with('foo')
mock_get.assert_called_once_with(rpc.TRANSPORT, tgt, ends,
executor='eventlet', serializer=ser,
access_policy=access_policy)
self.assertEqual('server', server)
@mock.patch.object(rpc, 'profiler', mock.Mock())
@mock.patch.object(rpc, 'ProfilerRequestContextSerializer')
@mock.patch.object(messaging, 'get_rpc_server')
def test_get_server_profiler_enabled(self, mock_get, mock_ser):
rpc.TRANSPORT = mock.Mock()
ser = mock.Mock()
tgt = mock.Mock()
ends = mock.Mock()
mock_ser.return_value = ser
mock_get.return_value = 'server'
access_policy = dispatcher.DefaultRPCAccessPolicy
server = rpc.get_server(tgt, ends, serializer='foo')
mock_ser.assert_called_once_with('foo')
mock_get.assert_called_once_with(rpc.TRANSPORT, tgt, ends, mock_get.assert_called_once_with(rpc.TRANSPORT, tgt, ends,
executor='eventlet', serializer=ser, executor='eventlet', serializer=ser,
access_policy=access_policy) access_policy=access_policy)