diff --git a/neutron/services/trunk/drivers/base.py b/neutron/services/trunk/drivers/base.py index 4ae4802f58f..25793f4c08c 100644 --- a/neutron/services/trunk/drivers/base.py +++ b/neutron/services/trunk/drivers/base.py @@ -19,8 +19,6 @@ from neutron_lib.callbacks import events from neutron_lib.callbacks import registry from neutron_lib.callbacks import resources -from neutron.services.trunk.rpc import backend - @registry.has_registry_receivers class DriverBase(object): @@ -82,7 +80,8 @@ class DriverBase(object): """ trigger.register_driver(self) - # Set up the server-side RPC backend if the driver is loaded, - # it is agent based, and the RPC backend is not already initialized. - if self.is_loaded and self.agent_type and not trigger.is_rpc_enabled(): - trigger.set_rpc_backend(backend.ServerSideRpcBackend()) + + @property + def rpc_required(self): + """True if this driver requires the RPC backend to be started""" + return self.is_loaded and self.agent_type diff --git a/neutron/services/trunk/plugin.py b/neutron/services/trunk/plugin.py index d19515777db..c19f9a18860 100644 --- a/neutron/services/trunk/plugin.py +++ b/neutron/services/trunk/plugin.py @@ -37,6 +37,8 @@ from neutron.objects import base as objects_base from neutron.objects import trunk as trunk_objects from neutron.services.trunk import drivers from neutron.services.trunk import exceptions as trunk_exc +from neutron.services.trunk.rpc import backend +from neutron.services.trunk.rpc import server from neutron.services.trunk import rules from neutron.services.trunk.seg_types import validators @@ -55,7 +57,8 @@ class TrunkPlugin(service_base.ServicePluginBase): __filter_validation_support = True def __init__(self): - self._rpc_backend = None + self._rpc_server = None + self._rpc_notifier = None self._drivers = [] self._segmentation_types = {} self._interfaces = set() @@ -64,6 +67,10 @@ class TrunkPlugin(service_base.ServicePluginBase): registry.subscribe(rules.enforce_port_deletion_rules, resources.PORT, events.BEFORE_DELETE) registry.publish(resources.TRUNK_PLUGIN, events.AFTER_INIT, self) + if any(drv.rpc_required for drv in self._drivers): + # create notifier backend + self._rpc_notifier = backend.ServerSideRpcBackend() + for driver in self._drivers: LOG.debug('Trunk plugin loaded with driver %s', driver.name) self.check_compatibility() @@ -91,6 +98,13 @@ class TrunkPlugin(service_base.ServicePluginBase): return port_res + def start_rpc_listeners(self): + if not any(drv.rpc_required for drv in self._drivers): + return [] + + self._rpc_server = server.TrunkSkeleton() + return self._rpc_server.rpc_servers + @staticmethod @resource_extend.extends([port_def.COLLECTION_NAME_BULK]) def _extend_port_trunk_details_bulk(ports_res, noop): @@ -155,12 +169,6 @@ class TrunkPlugin(service_base.ServicePluginBase): raise trunk_exc.SegmentationTypeValidatorNotFound( seg_type=seg_type) - def set_rpc_backend(self, backend): - self._rpc_backend = backend - - def is_rpc_enabled(self): - return self._rpc_backend is not None - def register_driver(self, driver): """Register driver with trunk plugin.""" if driver.agent_type: diff --git a/neutron/services/trunk/rpc/backend.py b/neutron/services/trunk/rpc/backend.py index f52c0c194ac..18d91edabc7 100644 --- a/neutron/services/trunk/rpc/backend.py +++ b/neutron/services/trunk/rpc/backend.py @@ -28,10 +28,9 @@ class ServerSideRpcBackend(object): def __init__(self): """Initialize an RPC backend for the Neutron Server.""" - self._skeleton = server.TrunkSkeleton() self._stub = server.TrunkStub() - LOG.debug("RPC backend initialized for trunk plugin") + LOG.debug("RPC notifier initialized for trunk plugin") for event_type in (events.AFTER_CREATE, events.AFTER_DELETE): registry.subscribe(self.process_event, diff --git a/neutron/services/trunk/rpc/server.py b/neutron/services/trunk/rpc/server.py index d8e45616df8..be5fd86a896 100644 --- a/neutron/services/trunk/rpc/server.py +++ b/neutron/services/trunk/rpc/server.py @@ -71,7 +71,12 @@ class TrunkSkeleton(object): self._connection = n_rpc.Connection() self._connection.create_consumer( constants.TRUNK_BASE_TOPIC, [self], fanout=False) - self._connection.consume_in_threads() + self._rpc_servers = self._connection.consume_in_threads() + LOG.debug("RPC backend initialized for trunk plugin") + + @property + def rpc_servers(self): + return self._rpc_servers @property def core_plugin(self): diff --git a/neutron/tests/functional/services/trunk/rpc/test_server.py b/neutron/tests/functional/services/trunk/rpc/test_server.py index 40ae1cae703..c9a05fb6348 100644 --- a/neutron/tests/functional/services/trunk/rpc/test_server.py +++ b/neutron/tests/functional/services/trunk/rpc/test_server.py @@ -26,13 +26,13 @@ class TrunkSkeletonTestCase(ml2_test_base.ML2TestFramework): def setUp(self): super(TrunkSkeletonTestCase, self).setUp() self.trunk_plugin = trunk_plugin.TrunkPlugin() + self.trunk_plugin.start_rpc_listeners() def test__handle_port_binding_set_device_owner(self): helpers.register_ovs_agent(host=helpers.HOST) with self.port() as subport: port = ( - self.trunk_plugin. - _rpc_backend._skeleton._handle_port_binding( + self.trunk_plugin._rpc_server._handle_port_binding( self.context, subport['port']['id'], mock.ANY, helpers.HOST)) self.assertEqual(