diff --git a/octavia/api/drivers/amphora_driver/driver.py b/octavia/api/drivers/amphora_driver/driver.py index 39d0676130..c029e04bd6 100644 --- a/octavia/api/drivers/amphora_driver/driver.py +++ b/octavia/api/drivers/amphora_driver/driver.py @@ -14,7 +14,6 @@ from jsonschema import exceptions as js_exceptions from jsonschema import validate - from oslo_config import cfg from oslo_log import log as logging import oslo_messaging as messaging @@ -27,6 +26,7 @@ from octavia.api.drivers import provider_base as driver_base from octavia.api.drivers import utils as driver_utils from octavia.common import constants as consts from octavia.common import data_models +from octavia.common import rpc from octavia.common import utils from octavia.db import api as db_apis from octavia.db import repositories @@ -41,11 +41,10 @@ class AmphoraProviderDriver(driver_base.ProviderDriver): def __init__(self): super(AmphoraProviderDriver, self).__init__() topic = cfg.CONF.oslo_messaging.topic - self.transport = messaging.get_rpc_transport(cfg.CONF) self.target = messaging.Target( namespace=consts.RPC_NAMESPACE_CONTROLLER_AGENT, topic=topic, version="1.0", fanout=False) - self.client = messaging.RPCClient(self.transport, target=self.target) + self.client = rpc.get_client(self.target) self.repositories = repositories.Repositories() # Load Balancer diff --git a/octavia/api/handlers/queue/producer.py b/octavia/api/handlers/queue/producer.py index 7652df5187..af2999a96e 100644 --- a/octavia/api/handlers/queue/producer.py +++ b/octavia/api/handlers/queue/producer.py @@ -32,6 +32,8 @@ import six from octavia.api.handlers import abstract_handler from octavia.common import constants +from octavia.common import rpc + cfg.CONF.import_group('oslo_messaging', 'octavia.common.config') @@ -46,11 +48,10 @@ class BaseProducer(abstract_handler.BaseObjectHandler): def __init__(self): topic = cfg.CONF.oslo_messaging.topic - self.transport = messaging.get_rpc_transport(cfg.CONF) self.target = messaging.Target( namespace=constants.RPC_NAMESPACE_CONTROLLER_AGENT, topic=topic, version="1.0", fanout=False) - self.client = messaging.RPCClient(self.transport, target=self.target) + self.client = rpc.get_client(self.target) def create(self, model): """Sends a create message to the controller via oslo.messaging @@ -229,12 +230,12 @@ class ProducerHandler(abstract_handler.BaseHandler): used to send messages via the Class variables load_balancer, listener, health_monitor, member, l7policy and l7rule. """ - - load_balancer = LoadBalancerProducer() - listener = ListenerProducer() - pool = PoolProducer() - health_monitor = HealthMonitorProducer() - member = MemberProducer() - l7policy = L7PolicyProducer() - l7rule = L7RuleProducer() - amphora = AmphoraProducer() + def __init__(self): + self.load_balancer = LoadBalancerProducer() + self.listener = ListenerProducer() + self.pool = PoolProducer() + self.health_monitor = HealthMonitorProducer() + self.member = MemberProducer() + self.l7policy = L7PolicyProducer() + self.l7rule = L7RuleProducer() + self.amphora = AmphoraProducer() diff --git a/octavia/api/v2/controllers/amphora.py b/octavia/api/v2/controllers/amphora.py index ff2846a4e7..b3711140f8 100644 --- a/octavia/api/v2/controllers/amphora.py +++ b/octavia/api/v2/controllers/amphora.py @@ -25,7 +25,7 @@ from octavia.api.v2.controllers import base from octavia.api.v2.types import amphora as amp_types from octavia.common import constants from octavia.common import exceptions - +from octavia.common import rpc CONF = cfg.CONF LOG = logging.getLogger(__name__) @@ -96,11 +96,10 @@ class FailoverController(base.BaseController): def __init__(self, amp_id): super(FailoverController, self).__init__() topic = cfg.CONF.oslo_messaging.topic - self.transport = messaging.get_rpc_transport(cfg.CONF) self.target = messaging.Target( namespace=constants.RPC_NAMESPACE_CONTROLLER_AGENT, topic=topic, version="1.0", fanout=False) - self.client = messaging.RPCClient(self.transport, target=self.target) + self.client = rpc.get_client(self.target) self.amp_id = amp_id @wsme_pecan.wsexpose(None, wtypes.text, status_code=202) diff --git a/octavia/common/rpc.py b/octavia/common/rpc.py new file mode 100644 index 0000000000..d591c85466 --- /dev/null +++ b/octavia/common/rpc.py @@ -0,0 +1,66 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_config import cfg +from oslo_log import log as logging +import oslo_messaging as messaging +from oslo_messaging.rpc import dispatcher + +LOG = logging.getLogger(__name__) + +TRANSPORT = None + + +def init(): + global TRANSPORT + TRANSPORT = create_transport(get_transport_url()) + + +def cleanup(): + global TRANSPORT + if TRANSPORT is not None: + TRANSPORT.cleanup() + TRANSPORT = None + + +def get_transport_url(url_str=None): + return messaging.TransportURL.parse(cfg.CONF, url_str) + + +def get_client(target, version_cap=None, serializer=None, + call_monitor_timeout=None): + if TRANSPORT is None: + init() + + return messaging.RPCClient(TRANSPORT, + target, + version_cap=version_cap, + serializer=serializer, + call_monitor_timeout=call_monitor_timeout) + + +def get_server(target, endpoints, executor='threading', + access_policy=dispatcher.DefaultRPCAccessPolicy, + serializer=None): + if TRANSPORT is None: + init() + + return messaging.get_rpc_server(TRANSPORT, + target, + endpoints, + executor=executor, + serializer=serializer, + access_policy=access_policy) + + +def create_transport(url): + return messaging.get_rpc_transport(cfg.CONF, url=url) diff --git a/octavia/common/service.py b/octavia/common/service.py index 1c665f9060..ff4d7455eb 100644 --- a/octavia/common/service.py +++ b/octavia/common/service.py @@ -16,6 +16,7 @@ from oslo_config import cfg from oslo_log import log from octavia.common import config +from octavia.common import rpc def prepare_service(argv=None): @@ -24,3 +25,4 @@ def prepare_service(argv=None): config.init(argv[1:]) log.set_defaults() config.setup_logging(cfg.CONF) + rpc.init() diff --git a/octavia/controller/queue/consumer.py b/octavia/controller/queue/consumer.py index 8e347454a2..5cf8766dfa 100644 --- a/octavia/controller/queue/consumer.py +++ b/octavia/controller/queue/consumer.py @@ -17,6 +17,7 @@ from oslo_log import log as logging import oslo_messaging as messaging from oslo_messaging.rpc import dispatcher +from octavia.common import rpc from octavia.controller.queue import endpoint LOG = logging.getLogger(__name__) @@ -35,13 +36,14 @@ class ConsumerService(cotyledon.Service): def run(self): LOG.info('Starting consumer...') - transport = messaging.get_rpc_transport(self.conf) target = messaging.Target(topic=self.topic, server=self.server, fanout=False) self.endpoints = [endpoint.Endpoint()] - self.message_listener = messaging.get_rpc_server( - transport, target, self.endpoints, - executor='threading', access_policy=self.access_policy) + self.message_listener = rpc.get_server( + target, self.endpoints, + executor='threading', + access_policy=self.access_policy + ) self.message_listener.start() def terminate(self, graceful=False): diff --git a/octavia/tests/unit/api/drivers/amphora_driver/test_amphora_driver.py b/octavia/tests/unit/api/drivers/amphora_driver/test_amphora_driver.py index bdf558a2ec..4e981dbfb2 100644 --- a/octavia/tests/unit/api/drivers/amphora_driver/test_amphora_driver.py +++ b/octavia/tests/unit/api/drivers/amphora_driver/test_amphora_driver.py @@ -11,7 +11,6 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. - import mock from octavia.api.drivers.amphora_driver import driver @@ -23,7 +22,7 @@ from octavia.tests.unit.api.drivers import sample_data_models from octavia.tests.unit import base -class TestAmphoraDriver(base.TestCase): +class TestAmphoraDriver(base.TestRpc): def setUp(self): super(TestAmphoraDriver, self).setUp() self.amp_driver = driver.AmphoraProviderDriver() diff --git a/octavia/tests/unit/api/handlers/queue/test_producer.py b/octavia/tests/unit/api/handlers/queue/test_producer.py index 51ce90a226..952cb27547 100644 --- a/octavia/tests/unit/api/handlers/queue/test_producer.py +++ b/octavia/tests/unit/api/handlers/queue/test_producer.py @@ -41,26 +41,23 @@ from octavia.common import data_models from octavia.tests.unit import base -class TestProducer(base.TestCase): +class TestProducer(base.TestRpc): def setUp(self): super(TestProducer, self).setUp() self.mck_model = mock.Mock() self.mck_model.id = '10' conf = self.useFixture(oslo_fixture.Config(cfg.CONF)) conf.config(group="oslo_messaging", topic='OCTAVIA_PROV') + self.mck_client = mock.create_autospec(messaging.RPCClient) mck_target = mock.patch( 'octavia.api.handlers.queue.producer.messaging.Target') - mck_transport = mock.patch( - 'octavia.api.handlers.queue.producer.messaging.get_transport') self.mck_client = mock.create_autospec(messaging.RPCClient) mck_client = mock.patch( 'octavia.api.handlers.queue.producer.messaging.RPCClient', return_value=self.mck_client) mck_target.start() - mck_transport.start() mck_client.start() self.addCleanup(mck_target.stop) - self.addCleanup(mck_transport.stop) self.addCleanup(mck_client.stop) def test_create_loadbalancer(self): diff --git a/octavia/tests/unit/base.py b/octavia/tests/unit/base.py index 0f3748655f..835059a4a0 100644 --- a/octavia/tests/unit/base.py +++ b/octavia/tests/unit/base.py @@ -11,11 +11,16 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. - +import fixtures import mock +from oslo_config import cfg +import oslo_messaging as messaging +from oslo_messaging import conffixture as messaging_conffixture import testtools from octavia.common import clients +from octavia.common import rpc + # needed for tests to function when run independently: from octavia.common import config # noqa: F401 @@ -30,3 +35,29 @@ class TestCase(testtools.TestCase): def clean_caches(self): clients.NovaAuth.nova_client = None clients.NeutronAuth.neutron_client = None + + +class TestRpc(testtools.TestCase): + def __init__(self, *args, **kwargs): + super(TestRpc, self).__init__(*args, **kwargs) + self._buses = {} + + def _fake_create_transport(self, url): + if url not in self._buses: + self._buses[url] = messaging.get_rpc_transport( + cfg.CONF, + url=url) + return self._buses[url] + + def setUp(self): + super(TestRpc, self).setUp() + self.addCleanup(rpc.cleanup) + self.messaging_conf = messaging_conffixture.ConfFixture(cfg.CONF) + self.messaging_conf.transport_url = 'fake:/' + self.useFixture(self.messaging_conf) + self.useFixture(fixtures.MonkeyPatch( + 'octavia.common.rpc.create_transport', + self._fake_create_transport)) + with mock.patch('octavia.common.rpc.get_transport_url') as mock_gtu: + mock_gtu.return_value = None + rpc.init() diff --git a/octavia/tests/unit/controller/queue/test_consumer.py b/octavia/tests/unit/controller/queue/test_consumer.py index b10451cc65..4e3865a587 100644 --- a/octavia/tests/unit/controller/queue/test_consumer.py +++ b/octavia/tests/unit/controller/queue/test_consumer.py @@ -16,18 +16,13 @@ import mock from oslo_config import cfg from oslo_config import fixture as oslo_fixture import oslo_messaging as messaging -from oslo_messaging.rpc import dispatcher from octavia.controller.queue import consumer from octavia.controller.queue import endpoint from octavia.tests.unit import base -@mock.patch.object(messaging, 'get_rpc_transport') -@mock.patch.object(messaging, 'Target') -@mock.patch.object(endpoint, 'Endpoint') -@mock.patch.object(messaging, 'get_rpc_server') -class TestConsumer(base.TestCase): +class TestConsumer(base.TestRpc): def setUp(self): super(TestConsumer, self).setUp() @@ -36,10 +31,10 @@ class TestConsumer(base.TestCase): conf.config(host='test-hostname') self.conf = conf.conf - def test_consumer_run(self, mock_rpc_server, mock_endpoint, mock_target, - mock_get_transport): - mock_get_transport_rv = mock.Mock() - mock_get_transport.return_value = mock_get_transport_rv + @mock.patch.object(messaging, 'Target') + @mock.patch.object(endpoint, 'Endpoint') + @mock.patch.object(messaging, 'get_rpc_server') + def test_consumer_run(self, mock_rpc_server, mock_endpoint, mock_target): mock_rpc_server_rv = mock.Mock() mock_rpc_server.return_value = mock_rpc_server_rv mock_endpoint_rv = mock.Mock() @@ -49,20 +44,13 @@ class TestConsumer(base.TestCase): consumer.ConsumerService(1, self.conf).run() - mock_get_transport.assert_called_once_with(cfg.CONF) mock_target.assert_called_once_with(topic='foo_topic', server='test-hostname', fanout=False) mock_endpoint.assert_called_once_with() - access_policy = dispatcher.DefaultRPCAccessPolicy - mock_rpc_server.assert_called_once_with(mock_get_transport_rv, - mock_target_rv, - [mock_endpoint_rv], - executor='threading', - access_policy=access_policy) - def test_consumer_terminate(self, mock_rpc_server, mock_endpoint, - mock_target, mock_get_transport): + @mock.patch.object(messaging, 'get_rpc_server') + def test_consumer_terminate(self, mock_rpc_server): mock_rpc_server_rv = mock.Mock() mock_rpc_server.return_value = mock_rpc_server_rv @@ -72,8 +60,8 @@ class TestConsumer(base.TestCase): mock_rpc_server_rv.stop.assert_called_once_with() self.assertFalse(mock_rpc_server_rv.wait.called) - def test_consumer_graceful_terminate(self, mock_rpc_server, mock_endpoint, - mock_target, mock_get_transport): + @mock.patch.object(messaging, 'get_rpc_server') + def test_consumer_graceful_terminate(self, mock_rpc_server): mock_rpc_server_rv = mock.Mock() mock_rpc_server.return_value = mock_rpc_server_rv diff --git a/releasenotes/notes/fix-oslo-messaging-connection-leakage-aeb79474105ac116.yaml b/releasenotes/notes/fix-oslo-messaging-connection-leakage-aeb79474105ac116.yaml new file mode 100644 index 0000000000..9cfc750c61 --- /dev/null +++ b/releasenotes/notes/fix-oslo-messaging-connection-leakage-aeb79474105ac116.yaml @@ -0,0 +1,5 @@ +--- +fixes: + - | + Fixed a bug that caused an excessive number of RabbitMQ connections to be + opened.