Re-use rpc transport

Blazar is currently not re-using the rpc transport, this
will cause connections to leak. This patch introduces
a commonly shared rpc pattern for openstack services
that ensures that the transport is always re-used.

Change-Id: Ib49d7820b2ce4bc6cc9bbbd91c075631b7fb56bb
This commit is contained in:
Erik Olof Gunnar Andersson 2021-01-16 13:12:31 -08:00
parent 37bee67efd
commit 7d131c96d1
2 changed files with 58 additions and 10 deletions

51
blazar/rpc.py Normal file
View File

@ -0,0 +1,51 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
import oslo_messaging as messaging
CONF = cfg.CONF
TRANSPORT = None
def init():
global TRANSPORT
if TRANSPORT is None:
TRANSPORT = create_transport(get_transport_url())
def get_transport_url(url_str=None):
return messaging.TransportURL.parse(CONF, url_str)
def get_client(target):
if TRANSPORT is None:
raise AssertionError("'TRANSPORT' must not be None")
return messaging.RPCClient(
TRANSPORT,
target,
)
def get_server(target, endpoints):
if TRANSPORT is None:
raise AssertionError("'TRANSPORT' must not be None")
return messaging.get_rpc_server(
TRANSPORT,
target,
endpoints,
executor='eventlet',
)
def create_transport(url):
return messaging.get_rpc_transport(CONF, url=url)

View File

@ -19,10 +19,10 @@ import functools
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging as messaging
from oslo_service import service
from blazar import context
from blazar import rpc
LOG = logging.getLogger(__name__)
@ -30,10 +30,8 @@ LOG = logging.getLogger(__name__)
class RPCClient(object):
def __init__(self, target):
super(RPCClient, self).__init__()
self._client = messaging.RPCClient(
target=target,
transport=messaging.get_rpc_transport(cfg.CONF),
)
rpc.init()
self._client = rpc.get_client(target)
def cast(self, name, **kwargs):
ctx = context.current()
@ -47,11 +45,10 @@ class RPCClient(object):
class RPCServer(service.Service):
def __init__(self, target):
super(RPCServer, self).__init__()
self._server = messaging.get_rpc_server(
target=target,
transport=messaging.get_rpc_transport(cfg.CONF),
endpoints=[ContextEndpointHandler(self, target)],
executor='eventlet',
rpc.init()
self._server = rpc.get_server(
target,
endpoints=[ContextEndpointHandler(self, target)]
)
def start(self):