Consume service plugins queues in RPC workers.
This patch adds all RPC workers to consumers of service plugins queues such as metering and l3-plugin. This is important for DVR-enabled deployments with hundreds of agents. Closes-Bug: #1498844 (cherry picked from commit5be613490d
) === Also include the following fix that keeps backwards compatibility with plugins that don't implement start_rpc_listeners method: Check if plugin supports starting rpc listeners When neutron starts an rpc worker, it checks if the plugin has the method "start_rpc_listeners". Since most plugins inherit from a base class, and that base class implements the start_rpc_listeners method and raises NotImplementedError, the rpc worker will attempt to call that method. It should just catch the NotImplementedError and continue on. Change-Id: Ie1830b6140acffffd0f283a0d8eefa52067f7650 Closes-Bug: 1551542 (cherry picked from commitcd7be292a8
) === Change-Id: I6fea7f409c91b25d2c35b038d6100fdfa85d1905
This commit is contained in:
parent
8ad932750b
commit
de5bdc9cc2
|
@ -125,13 +125,20 @@ def start_plugin_workers():
|
|||
|
||||
class RpcWorker(worker.NeutronWorker):
|
||||
"""Wraps a worker to be handled by ProcessLauncher"""
|
||||
def __init__(self, plugin):
|
||||
self._plugin = plugin
|
||||
start_listeners_method = 'start_rpc_listeners'
|
||||
|
||||
def __init__(self, plugins):
|
||||
self._plugins = plugins
|
||||
self._servers = []
|
||||
|
||||
def start(self):
|
||||
super(RpcWorker, self).start()
|
||||
self._servers = self._plugin.start_rpc_listeners()
|
||||
for plugin in self._plugins:
|
||||
if hasattr(plugin, self.start_listeners_method):
|
||||
try:
|
||||
servers = getattr(plugin, self.start_listeners_method)()
|
||||
except NotImplementedError:
|
||||
continue
|
||||
self._servers.extend(servers)
|
||||
|
||||
def wait(self):
|
||||
try:
|
||||
|
@ -164,6 +171,8 @@ class RpcWorker(worker.NeutronWorker):
|
|||
|
||||
def serve_rpc():
|
||||
plugin = manager.NeutronManager.get_plugin()
|
||||
service_plugins = (
|
||||
manager.NeutronManager.get_service_plugins().values())
|
||||
|
||||
if cfg.CONF.rpc_workers < 1:
|
||||
cfg.CONF.set_override('rpc_workers', 1)
|
||||
|
@ -181,7 +190,8 @@ def serve_rpc():
|
|||
raise NotImplementedError()
|
||||
|
||||
try:
|
||||
rpc = RpcWorker(plugin)
|
||||
# passing service plugins only, because core plugin is among them
|
||||
rpc = RpcWorker(service_plugins)
|
||||
|
||||
# dispose the whole pool before os.fork, otherwise there will
|
||||
# be shared DB connections in child processes which may cause
|
||||
|
|
|
@ -58,7 +58,6 @@ class L3RouterPlugin(service_base.ServicePluginBase,
|
|||
@resource_registry.tracked_resources(router=l3_db.Router,
|
||||
floatingip=l3_db.FloatingIP)
|
||||
def __init__(self):
|
||||
self.setup_rpc()
|
||||
self.router_scheduler = importutils.import_object(
|
||||
cfg.CONF.router_scheduler_driver)
|
||||
self.start_periodic_l3_agent_status_check()
|
||||
|
@ -66,9 +65,10 @@ class L3RouterPlugin(service_base.ServicePluginBase,
|
|||
if 'dvr' in self.supported_extension_aliases:
|
||||
l3_dvrscheduler_db.subscribe()
|
||||
l3_db.subscribe()
|
||||
self.start_rpc_listeners()
|
||||
|
||||
@log_helpers.log_method_call
|
||||
def setup_rpc(self):
|
||||
def start_rpc_listeners(self):
|
||||
# RPC support
|
||||
self.topic = topics.L3PLUGIN
|
||||
self.conn = n_rpc.create_connection(new=True)
|
||||
|
@ -77,7 +77,7 @@ class L3RouterPlugin(service_base.ServicePluginBase,
|
|||
self.endpoints = [l3_rpc.L3RpcCallback()]
|
||||
self.conn.create_consumer(self.topic, self.endpoints,
|
||||
fanout=False)
|
||||
self.conn.consume_in_threads()
|
||||
return self.conn.consume_in_threads()
|
||||
|
||||
def get_plugin_type(self):
|
||||
return constants.L3_ROUTER_NAT
|
||||
|
|
|
@ -27,14 +27,15 @@ class MeteringPlugin(metering_db.MeteringDbMixin):
|
|||
def __init__(self):
|
||||
super(MeteringPlugin, self).__init__()
|
||||
|
||||
self.endpoints = [metering_rpc.MeteringRpcCallbacks(self)]
|
||||
self.meter_rpc = metering_rpc_agent_api.MeteringAgentNotifyAPI()
|
||||
self.start_rpc_listeners()
|
||||
|
||||
def start_rpc_listeners(self):
|
||||
self.endpoints = [metering_rpc.MeteringRpcCallbacks(self)]
|
||||
self.conn = n_rpc.create_connection(new=True)
|
||||
self.conn.create_consumer(
|
||||
topics.METERING_PLUGIN, self.endpoints, fanout=False)
|
||||
self.conn.consume_in_threads()
|
||||
|
||||
self.meter_rpc = metering_rpc_agent_api.MeteringAgentNotifyAPI()
|
||||
return self.conn.consume_in_threads()
|
||||
|
||||
def create_metering_label(self, context, metering_label):
|
||||
label = super(MeteringPlugin, self).create_metering_label(
|
||||
|
|
Loading…
Reference in New Issue