Merge "Start trunk plugin RPC via service framework"

This commit is contained in:
Zuul 2024-05-03 01:49:43 +00:00 committed by Gerrit Code Review
commit 7f56084c88
5 changed files with 29 additions and 18 deletions

View File

@ -19,8 +19,6 @@ from neutron_lib.callbacks import events
from neutron_lib.callbacks import registry
from neutron_lib.callbacks import resources
from neutron.services.trunk.rpc import backend
@registry.has_registry_receivers
class DriverBase(object):
@ -82,7 +80,8 @@ class DriverBase(object):
"""
trigger.register_driver(self)
# Set up the server-side RPC backend if the driver is loaded,
# it is agent based, and the RPC backend is not already initialized.
if self.is_loaded and self.agent_type and not trigger.is_rpc_enabled():
trigger.set_rpc_backend(backend.ServerSideRpcBackend())
@property
def rpc_required(self):
"""True if this driver requires the RPC backend to be started"""
return self.is_loaded and self.agent_type

View File

@ -37,6 +37,8 @@ from neutron.objects import base as objects_base
from neutron.objects import trunk as trunk_objects
from neutron.services.trunk import drivers
from neutron.services.trunk import exceptions as trunk_exc
from neutron.services.trunk.rpc import backend
from neutron.services.trunk.rpc import server
from neutron.services.trunk import rules
from neutron.services.trunk.seg_types import validators
@ -55,7 +57,8 @@ class TrunkPlugin(service_base.ServicePluginBase):
__filter_validation_support = True
def __init__(self):
self._rpc_backend = None
self._rpc_server = None
self._rpc_notifier = None
self._drivers = []
self._segmentation_types = {}
self._interfaces = set()
@ -64,6 +67,10 @@ class TrunkPlugin(service_base.ServicePluginBase):
registry.subscribe(rules.enforce_port_deletion_rules,
resources.PORT, events.BEFORE_DELETE)
registry.publish(resources.TRUNK_PLUGIN, events.AFTER_INIT, self)
if any(drv.rpc_required for drv in self._drivers):
# create notifier backend
self._rpc_notifier = backend.ServerSideRpcBackend()
for driver in self._drivers:
LOG.debug('Trunk plugin loaded with driver %s', driver.name)
self.check_compatibility()
@ -91,6 +98,13 @@ class TrunkPlugin(service_base.ServicePluginBase):
return port_res
def start_rpc_listeners(self):
if not any(drv.rpc_required for drv in self._drivers):
return []
self._rpc_server = server.TrunkSkeleton()
return self._rpc_server.rpc_servers
@staticmethod
@resource_extend.extends([port_def.COLLECTION_NAME_BULK])
def _extend_port_trunk_details_bulk(ports_res, noop):
@ -155,12 +169,6 @@ class TrunkPlugin(service_base.ServicePluginBase):
raise trunk_exc.SegmentationTypeValidatorNotFound(
seg_type=seg_type)
def set_rpc_backend(self, backend):
self._rpc_backend = backend
def is_rpc_enabled(self):
return self._rpc_backend is not None
def register_driver(self, driver):
"""Register driver with trunk plugin."""
if driver.agent_type:

View File

@ -28,10 +28,9 @@ class ServerSideRpcBackend(object):
def __init__(self):
"""Initialize an RPC backend for the Neutron Server."""
self._skeleton = server.TrunkSkeleton()
self._stub = server.TrunkStub()
LOG.debug("RPC backend initialized for trunk plugin")
LOG.debug("RPC notifier initialized for trunk plugin")
for event_type in (events.AFTER_CREATE, events.AFTER_DELETE):
registry.subscribe(self.process_event,

View File

@ -71,7 +71,12 @@ class TrunkSkeleton(object):
self._connection = n_rpc.Connection()
self._connection.create_consumer(
constants.TRUNK_BASE_TOPIC, [self], fanout=False)
self._connection.consume_in_threads()
self._rpc_servers = self._connection.consume_in_threads()
LOG.debug("RPC backend initialized for trunk plugin")
@property
def rpc_servers(self):
return self._rpc_servers
@property
def core_plugin(self):

View File

@ -26,13 +26,13 @@ class TrunkSkeletonTestCase(ml2_test_base.ML2TestFramework):
def setUp(self):
super(TrunkSkeletonTestCase, self).setUp()
self.trunk_plugin = trunk_plugin.TrunkPlugin()
self.trunk_plugin.start_rpc_listeners()
def test__handle_port_binding_set_device_owner(self):
helpers.register_ovs_agent(host=helpers.HOST)
with self.port() as subport:
port = (
self.trunk_plugin.
_rpc_backend._skeleton._handle_port_binding(
self.trunk_plugin._rpc_server._handle_port_binding(
self.context, subport['port']['id'],
mock.ANY, helpers.HOST))
self.assertEqual(