Merge "Fix cells rpc connection leak"
This commit is contained in:
@@ -114,12 +114,11 @@ class InterCellRPCAPI(object):
|
||||
self.version_cap = (
|
||||
self.VERSION_ALIASES.get(CONF.upgrade_levels.intercell,
|
||||
CONF.upgrade_levels.intercell))
|
||||
self.transports = {}
|
||||
|
||||
def _get_client(self, next_hop, topic):
|
||||
"""Turn the DB information for a cell into a messaging.RPCClient."""
|
||||
transport_url = next_hop.db_info['transport_url']
|
||||
transport = messaging.get_transport(cfg.CONF, transport_url,
|
||||
rpc.TRANSPORT_ALIASES)
|
||||
transport = self._get_transport(next_hop)
|
||||
target = messaging.Target(topic=topic, version='1.0')
|
||||
serializer = rpc.RequestContextSerializer(None)
|
||||
return messaging.RPCClient(transport,
|
||||
@@ -127,6 +126,21 @@ class InterCellRPCAPI(object):
|
||||
version_cap=self.version_cap,
|
||||
serializer=serializer)
|
||||
|
||||
def _get_transport(self, next_hop):
|
||||
"""NOTE(belliott) Each Transport object contains connection pool
|
||||
state. Maintain references to them to avoid continual reconnects
|
||||
to the message broker.
|
||||
"""
|
||||
transport_url = next_hop.db_info['transport_url']
|
||||
if transport_url not in self.transports:
|
||||
transport = messaging.get_transport(cfg.CONF, transport_url,
|
||||
rpc.TRANSPORT_ALIASES)
|
||||
self.transports[transport_url] = transport
|
||||
else:
|
||||
transport = self.transports[transport_url]
|
||||
|
||||
return transport
|
||||
|
||||
def send_message_to_cell(self, cell_state, message):
|
||||
"""Send a message to another cell by JSON-ifying the message and
|
||||
making an RPC cast to 'process_message'. If the message says to
|
||||
|
||||
@@ -17,6 +17,7 @@
|
||||
Tests For Cells RPC Communication Driver
|
||||
"""
|
||||
|
||||
import mock
|
||||
from mox3 import mox
|
||||
from oslo.config import cfg
|
||||
from oslo import messaging as oslo_messaging
|
||||
@@ -79,6 +80,26 @@ class CellsRPCDriverTestCase(test.NoDBTestCase):
|
||||
self.driver.stop_servers()
|
||||
self.assertEqual(fake_servers, call_info['stopped'])
|
||||
|
||||
def test_create_transport_once(self):
|
||||
# should only construct each Transport once
|
||||
rpcapi = self.driver.intercell_rpcapi
|
||||
|
||||
transport_url = 'amqp://fakeurl'
|
||||
next_hop = fakes.FakeCellState('cellname')
|
||||
next_hop.db_info['transport_url'] = transport_url
|
||||
|
||||
# first call to _get_transport creates a oslo.messaging.Transport obj
|
||||
with mock.patch.object(oslo_messaging, 'get_transport') as get_trans:
|
||||
transport = rpcapi._get_transport(next_hop)
|
||||
get_trans.assert_called_once_with(rpc_driver.CONF, transport_url,
|
||||
rpc.TRANSPORT_ALIASES)
|
||||
self.assertIn(transport_url, rpcapi.transports)
|
||||
self.assertEqual(transport, rpcapi.transports[transport_url])
|
||||
|
||||
# subsequent calls should return the pre-created Transport obj
|
||||
transport2 = rpcapi._get_transport(next_hop)
|
||||
self.assertEqual(transport, transport2)
|
||||
|
||||
def test_send_message_to_cell_cast(self):
|
||||
msg_runner = fakes.get_message_runner('api-cell')
|
||||
cell_state = fakes.get_cell_state('api-cell', 'child-cell2')
|
||||
|
||||
Reference in New Issue
Block a user