Merge "Re-use transport for rpc server"
This commit is contained in:
commit
ce57cf0fc9
|
@ -152,11 +152,6 @@ def get_client(target, version_cap=None, serializer=None, timeout=None):
|
|||
def get_server(target, endpoints, serializer=None):
|
||||
assert TRANSPORT is not None
|
||||
access_policy = dispatcher.DefaultRPCAccessPolicy
|
||||
if profiler:
|
||||
serializer = ProfilerRequestContextSerializer(serializer)
|
||||
else:
|
||||
serializer = RequestContextSerializer(serializer)
|
||||
|
||||
return messaging.get_rpc_server(TRANSPORT,
|
||||
target,
|
||||
endpoints,
|
||||
|
|
|
@ -15,7 +15,6 @@
|
|||
"""Common RPC service and API tools for Magnum."""
|
||||
|
||||
import oslo_messaging as messaging
|
||||
from oslo_messaging.rpc import dispatcher
|
||||
from oslo_service import service
|
||||
from oslo_utils import importutils
|
||||
|
||||
|
@ -47,14 +46,11 @@ class Service(service.Service):
|
|||
def __init__(self, topic, server, handlers, binary):
|
||||
super(Service, self).__init__()
|
||||
serializer = _init_serializer()
|
||||
transport = messaging.get_rpc_transport(CONF)
|
||||
# TODO(asalkeld) add support for version='x.y'
|
||||
access_policy = dispatcher.DefaultRPCAccessPolicy
|
||||
target = messaging.Target(topic=topic, server=server)
|
||||
self._server = messaging.get_rpc_server(transport, target, handlers,
|
||||
executor='eventlet',
|
||||
serializer=serializer,
|
||||
access_policy=access_policy)
|
||||
self._server = rpc.get_server(target, handlers,
|
||||
serializer=serializer)
|
||||
|
||||
self.binary = binary
|
||||
profiler.setup(binary, CONF.host)
|
||||
|
||||
|
|
|
@ -42,38 +42,16 @@ class TestRpc(base.TestCase):
|
|||
self.assertEqual('client', client)
|
||||
|
||||
@mock.patch.object(rpc, 'profiler', None)
|
||||
@mock.patch.object(rpc, 'RequestContextSerializer')
|
||||
@mock.patch.object(messaging, 'get_rpc_server')
|
||||
def test_get_server(self, mock_get, mock_ser):
|
||||
def test_get_server(self, mock_get):
|
||||
rpc.TRANSPORT = mock.Mock()
|
||||
ser = mock.Mock()
|
||||
tgt = mock.Mock()
|
||||
ends = mock.Mock()
|
||||
mock_ser.return_value = ser
|
||||
mock_get.return_value = 'server'
|
||||
access_policy = dispatcher.DefaultRPCAccessPolicy
|
||||
server = rpc.get_server(tgt, ends, serializer='foo')
|
||||
server = rpc.get_server(tgt, ends, serializer=ser)
|
||||
|
||||
mock_ser.assert_called_once_with('foo')
|
||||
mock_get.assert_called_once_with(rpc.TRANSPORT, tgt, ends,
|
||||
executor='eventlet', serializer=ser,
|
||||
access_policy=access_policy)
|
||||
self.assertEqual('server', server)
|
||||
|
||||
@mock.patch.object(rpc, 'profiler', mock.Mock())
|
||||
@mock.patch.object(rpc, 'ProfilerRequestContextSerializer')
|
||||
@mock.patch.object(messaging, 'get_rpc_server')
|
||||
def test_get_server_profiler_enabled(self, mock_get, mock_ser):
|
||||
rpc.TRANSPORT = mock.Mock()
|
||||
ser = mock.Mock()
|
||||
tgt = mock.Mock()
|
||||
ends = mock.Mock()
|
||||
mock_ser.return_value = ser
|
||||
mock_get.return_value = 'server'
|
||||
access_policy = dispatcher.DefaultRPCAccessPolicy
|
||||
server = rpc.get_server(tgt, ends, serializer='foo')
|
||||
|
||||
mock_ser.assert_called_once_with('foo')
|
||||
mock_get.assert_called_once_with(rpc.TRANSPORT, tgt, ends,
|
||||
executor='eventlet', serializer=ser,
|
||||
access_policy=access_policy)
|
||||
|
|
Loading…
Reference in New Issue