Re-use transport for rpc calls
We are currently creating a new transport for each api
call. This patch changes that so that each worker
can re-use the same transport for multiple requests.
Story: 2008494
Task: 41544
Change-Id: I11a24f035a9d66a536e5e58328084ee08f0c6285
(cherry picked from commit 987c9d809e
)
This commit is contained in:
parent
528ec941da
commit
4162553c6f
@ -142,11 +142,6 @@ def get_transport_url(url_str=None):
|
|||||||
|
|
||||||
def get_client(target, version_cap=None, serializer=None, timeout=None):
|
def get_client(target, version_cap=None, serializer=None, timeout=None):
|
||||||
assert TRANSPORT is not None
|
assert TRANSPORT is not None
|
||||||
if profiler:
|
|
||||||
serializer = ProfilerRequestContextSerializer(serializer)
|
|
||||||
else:
|
|
||||||
serializer = RequestContextSerializer(serializer)
|
|
||||||
|
|
||||||
return messaging.RPCClient(TRANSPORT,
|
return messaging.RPCClient(TRANSPORT,
|
||||||
target,
|
target,
|
||||||
version_cap=version_cap,
|
version_cap=version_cap,
|
||||||
|
@ -79,20 +79,16 @@ class Service(service.Service):
|
|||||||
|
|
||||||
|
|
||||||
class API(object):
|
class API(object):
|
||||||
def __init__(self, transport=None, context=None, topic=None, server=None,
|
def __init__(self, context=None, topic=None, server=None,
|
||||||
timeout=None):
|
timeout=None):
|
||||||
serializer = _init_serializer()
|
serializer = _init_serializer()
|
||||||
if transport is None:
|
|
||||||
exmods = rpc.get_allowed_exmods()
|
|
||||||
transport = messaging.get_rpc_transport(
|
|
||||||
CONF, allowed_remote_exmods=exmods)
|
|
||||||
self._context = context
|
self._context = context
|
||||||
if topic is None:
|
if topic is None:
|
||||||
topic = ''
|
topic = ''
|
||||||
target = messaging.Target(topic=topic, server=server)
|
target = messaging.Target(topic=topic, server=server)
|
||||||
self._client = messaging.RPCClient(transport, target,
|
self._client = rpc.get_client(target,
|
||||||
serializer=serializer,
|
serializer=serializer,
|
||||||
timeout=timeout)
|
timeout=timeout)
|
||||||
|
|
||||||
def _call(self, method, *args, **kwargs):
|
def _call(self, method, *args, **kwargs):
|
||||||
return self._client.call(self._context, method, *args, **kwargs)
|
return self._client.call(self._context, method, *args, **kwargs)
|
||||||
|
@ -25,9 +25,8 @@ CONF = magnum.conf.CONF
|
|||||||
|
|
||||||
@profiler.trace_cls("rpc")
|
@profiler.trace_cls("rpc")
|
||||||
class API(rpc_service.API):
|
class API(rpc_service.API):
|
||||||
def __init__(self, transport=None, context=None, topic=None):
|
def __init__(self, context=None, topic=CONF.conductor.topic):
|
||||||
super(API, self).__init__(transport, context,
|
super(API, self).__init__(context=context, topic=topic)
|
||||||
topic=CONF.conductor.topic)
|
|
||||||
|
|
||||||
# Cluster Operations
|
# Cluster Operations
|
||||||
|
|
||||||
|
@ -25,38 +25,16 @@ from magnum.tests import base
|
|||||||
|
|
||||||
class TestRpc(base.TestCase):
|
class TestRpc(base.TestCase):
|
||||||
@mock.patch.object(rpc, 'profiler', None)
|
@mock.patch.object(rpc, 'profiler', None)
|
||||||
@mock.patch.object(rpc, 'RequestContextSerializer')
|
|
||||||
@mock.patch.object(messaging, 'RPCClient')
|
@mock.patch.object(messaging, 'RPCClient')
|
||||||
def test_get_client(self, mock_client, mock_ser):
|
def test_get_client(self, mock_client):
|
||||||
rpc.TRANSPORT = mock.Mock()
|
rpc.TRANSPORT = mock.Mock()
|
||||||
tgt = mock.Mock()
|
tgt = mock.Mock()
|
||||||
ser = mock.Mock()
|
ser = mock.Mock()
|
||||||
mock_client.return_value = 'client'
|
mock_client.return_value = 'client'
|
||||||
mock_ser.return_value = ser
|
|
||||||
|
|
||||||
client = rpc.get_client(tgt, version_cap='1.0', serializer='foo',
|
client = rpc.get_client(tgt, version_cap='1.0', serializer=ser,
|
||||||
timeout=6969)
|
timeout=6969)
|
||||||
|
|
||||||
mock_ser.assert_called_once_with('foo')
|
|
||||||
mock_client.assert_called_once_with(rpc.TRANSPORT,
|
|
||||||
tgt, version_cap='1.0',
|
|
||||||
serializer=ser, timeout=6969)
|
|
||||||
self.assertEqual('client', client)
|
|
||||||
|
|
||||||
@mock.patch.object(rpc, 'profiler', mock.Mock())
|
|
||||||
@mock.patch.object(rpc, 'ProfilerRequestContextSerializer')
|
|
||||||
@mock.patch.object(messaging, 'RPCClient')
|
|
||||||
def test_get_client_profiler_enabled(self, mock_client, mock_ser):
|
|
||||||
rpc.TRANSPORT = mock.Mock()
|
|
||||||
tgt = mock.Mock()
|
|
||||||
ser = mock.Mock()
|
|
||||||
mock_client.return_value = 'client'
|
|
||||||
mock_ser.return_value = ser
|
|
||||||
|
|
||||||
client = rpc.get_client(tgt, version_cap='1.0', serializer='foo',
|
|
||||||
timeout=6969)
|
|
||||||
|
|
||||||
mock_ser.assert_called_once_with('foo')
|
|
||||||
mock_client.assert_called_once_with(rpc.TRANSPORT,
|
mock_client.assert_called_once_with(rpc.TRANSPORT,
|
||||||
tgt, version_cap='1.0',
|
tgt, version_cap='1.0',
|
||||||
serializer=ser, timeout=6969)
|
serializer=ser, timeout=6969)
|
||||||
|
Loading…
Reference in New Issue
Block a user