Start trunk plugin RPC via service framework
Instead of each individual driver setting up the RPC server (and setting the _rpc_backend attribute on the TrunkPlugin) we now check in the TrunkPlugin if any driver requires the RPC backend to be started. Additionally, we only start it when this is requested by Neutron via start_rpc_listeners(). This is required when running neutron-server and neutron-rpc-server separately to run RPC only in neutron-rpc-server. As we still need the notifiers of ServerSideRpcBackend to be created/started, we separate TrunkSkeleton (which is the RPC server implementation) and ServerSideRpcBackend (which is essentially only a notifier). In case RPC is required by a driver, we always start the notifier, but the RPC server only when requested via start_rpc_listeners(). Change-Id: I2c6362b3320e534a6e65bd7701b5ac2feca42a49 Closes-Bug: #2015275 Closes-Bug: #2062009
This commit is contained in:
parent
f2342e0d17
commit
ffcaeda32a
@ -19,8 +19,6 @@ from neutron_lib.callbacks import events
|
||||
from neutron_lib.callbacks import registry
|
||||
from neutron_lib.callbacks import resources
|
||||
|
||||
from neutron.services.trunk.rpc import backend
|
||||
|
||||
|
||||
@registry.has_registry_receivers
|
||||
class DriverBase(object):
|
||||
@ -82,7 +80,8 @@ class DriverBase(object):
|
||||
"""
|
||||
|
||||
trigger.register_driver(self)
|
||||
# Set up the server-side RPC backend if the driver is loaded,
|
||||
# it is agent based, and the RPC backend is not already initialized.
|
||||
if self.is_loaded and self.agent_type and not trigger.is_rpc_enabled():
|
||||
trigger.set_rpc_backend(backend.ServerSideRpcBackend())
|
||||
|
||||
@property
|
||||
def rpc_required(self):
|
||||
"""True if this driver requires the RPC backend to be started"""
|
||||
return self.is_loaded and self.agent_type
|
||||
|
@ -37,6 +37,8 @@ from neutron.objects import base as objects_base
|
||||
from neutron.objects import trunk as trunk_objects
|
||||
from neutron.services.trunk import drivers
|
||||
from neutron.services.trunk import exceptions as trunk_exc
|
||||
from neutron.services.trunk.rpc import backend
|
||||
from neutron.services.trunk.rpc import server
|
||||
from neutron.services.trunk import rules
|
||||
from neutron.services.trunk.seg_types import validators
|
||||
|
||||
@ -55,7 +57,8 @@ class TrunkPlugin(service_base.ServicePluginBase):
|
||||
__filter_validation_support = True
|
||||
|
||||
def __init__(self):
|
||||
self._rpc_backend = None
|
||||
self._rpc_server = None
|
||||
self._rpc_notifier = None
|
||||
self._drivers = []
|
||||
self._segmentation_types = {}
|
||||
self._interfaces = set()
|
||||
@ -64,6 +67,10 @@ class TrunkPlugin(service_base.ServicePluginBase):
|
||||
registry.subscribe(rules.enforce_port_deletion_rules,
|
||||
resources.PORT, events.BEFORE_DELETE)
|
||||
registry.publish(resources.TRUNK_PLUGIN, events.AFTER_INIT, self)
|
||||
if any(drv.rpc_required for drv in self._drivers):
|
||||
# create notifier backend
|
||||
self._rpc_notifier = backend.ServerSideRpcBackend()
|
||||
|
||||
for driver in self._drivers:
|
||||
LOG.debug('Trunk plugin loaded with driver %s', driver.name)
|
||||
self.check_compatibility()
|
||||
@ -91,6 +98,13 @@ class TrunkPlugin(service_base.ServicePluginBase):
|
||||
|
||||
return port_res
|
||||
|
||||
def start_rpc_listeners(self):
|
||||
if not any(drv.rpc_required for drv in self._drivers):
|
||||
return []
|
||||
|
||||
self._rpc_server = server.TrunkSkeleton()
|
||||
return self._rpc_server.rpc_servers
|
||||
|
||||
@staticmethod
|
||||
@resource_extend.extends([port_def.COLLECTION_NAME_BULK])
|
||||
def _extend_port_trunk_details_bulk(ports_res, noop):
|
||||
@ -155,12 +169,6 @@ class TrunkPlugin(service_base.ServicePluginBase):
|
||||
raise trunk_exc.SegmentationTypeValidatorNotFound(
|
||||
seg_type=seg_type)
|
||||
|
||||
def set_rpc_backend(self, backend):
|
||||
self._rpc_backend = backend
|
||||
|
||||
def is_rpc_enabled(self):
|
||||
return self._rpc_backend is not None
|
||||
|
||||
def register_driver(self, driver):
|
||||
"""Register driver with trunk plugin."""
|
||||
if driver.agent_type:
|
||||
|
@ -28,10 +28,9 @@ class ServerSideRpcBackend(object):
|
||||
|
||||
def __init__(self):
|
||||
"""Initialize an RPC backend for the Neutron Server."""
|
||||
self._skeleton = server.TrunkSkeleton()
|
||||
self._stub = server.TrunkStub()
|
||||
|
||||
LOG.debug("RPC backend initialized for trunk plugin")
|
||||
LOG.debug("RPC notifier initialized for trunk plugin")
|
||||
|
||||
for event_type in (events.AFTER_CREATE, events.AFTER_DELETE):
|
||||
registry.subscribe(self.process_event,
|
||||
|
@ -71,7 +71,12 @@ class TrunkSkeleton(object):
|
||||
self._connection = n_rpc.Connection()
|
||||
self._connection.create_consumer(
|
||||
constants.TRUNK_BASE_TOPIC, [self], fanout=False)
|
||||
self._connection.consume_in_threads()
|
||||
self._rpc_servers = self._connection.consume_in_threads()
|
||||
LOG.debug("RPC backend initialized for trunk plugin")
|
||||
|
||||
@property
|
||||
def rpc_servers(self):
|
||||
return self._rpc_servers
|
||||
|
||||
@property
|
||||
def core_plugin(self):
|
||||
|
@ -26,13 +26,13 @@ class TrunkSkeletonTestCase(ml2_test_base.ML2TestFramework):
|
||||
def setUp(self):
|
||||
super(TrunkSkeletonTestCase, self).setUp()
|
||||
self.trunk_plugin = trunk_plugin.TrunkPlugin()
|
||||
self.trunk_plugin.start_rpc_listeners()
|
||||
|
||||
def test__handle_port_binding_set_device_owner(self):
|
||||
helpers.register_ovs_agent(host=helpers.HOST)
|
||||
with self.port() as subport:
|
||||
port = (
|
||||
self.trunk_plugin.
|
||||
_rpc_backend._skeleton._handle_port_binding(
|
||||
self.trunk_plugin._rpc_server._handle_port_binding(
|
||||
self.context, subport['port']['id'],
|
||||
mock.ANY, helpers.HOST))
|
||||
self.assertEqual(
|
||||
|
Loading…
x
Reference in New Issue
Block a user