Consume service plugins queues in RPC workers.

This patch adds all RPC workers to consumers of service
plugins queues such as metering and l3-plugin.
This is important for DVR-enabled deployments with hundreds
of agents.

Change-Id: I6fea7f409c91b25d2c35b038d6100fdfa85d1905
Closes-Bug: #1498844
This commit is contained in:
Eugene Nikanorov 2015-09-23 14:06:54 +04:00
parent 47449bdf7d
commit 5be613490d
3 changed files with 20 additions and 12 deletions

View File

@ -125,13 +125,17 @@ def start_plugin_workers():
class RpcWorker(worker.NeutronWorker): class RpcWorker(worker.NeutronWorker):
"""Wraps a worker to be handled by ProcessLauncher""" """Wraps a worker to be handled by ProcessLauncher"""
def __init__(self, plugin): start_listeners_method = 'start_rpc_listeners'
self._plugin = plugin
def __init__(self, plugins):
self._plugins = plugins
self._servers = [] self._servers = []
def start(self): def start(self):
super(RpcWorker, self).start() for plugin in self._plugins:
self._servers = self._plugin.start_rpc_listeners() if hasattr(plugin, self.start_listeners_method):
servers = getattr(plugin, self.start_listeners_method)()
self._servers.extend(servers)
def wait(self): def wait(self):
try: try:
@ -164,6 +168,8 @@ class RpcWorker(worker.NeutronWorker):
def serve_rpc(): def serve_rpc():
plugin = manager.NeutronManager.get_plugin() plugin = manager.NeutronManager.get_plugin()
service_plugins = (
manager.NeutronManager.get_service_plugins().values())
if cfg.CONF.rpc_workers < 1: if cfg.CONF.rpc_workers < 1:
cfg.CONF.set_override('rpc_workers', 1) cfg.CONF.set_override('rpc_workers', 1)
@ -181,7 +187,8 @@ def serve_rpc():
raise NotImplementedError() raise NotImplementedError()
try: try:
rpc = RpcWorker(plugin) # passing service plugins only, because core plugin is among them
rpc = RpcWorker(service_plugins)
# dispose the whole pool before os.fork, otherwise there will # dispose the whole pool before os.fork, otherwise there will
# be shared DB connections in child processes which may cause # be shared DB connections in child processes which may cause

View File

@ -58,7 +58,6 @@ class L3RouterPlugin(service_base.ServicePluginBase,
@resource_registry.tracked_resources(router=l3_db.Router, @resource_registry.tracked_resources(router=l3_db.Router,
floatingip=l3_db.FloatingIP) floatingip=l3_db.FloatingIP)
def __init__(self): def __init__(self):
self.setup_rpc()
self.router_scheduler = importutils.import_object( self.router_scheduler = importutils.import_object(
cfg.CONF.router_scheduler_driver) cfg.CONF.router_scheduler_driver)
self.start_periodic_l3_agent_status_check() self.start_periodic_l3_agent_status_check()
@ -66,9 +65,10 @@ class L3RouterPlugin(service_base.ServicePluginBase,
if 'dvr' in self.supported_extension_aliases: if 'dvr' in self.supported_extension_aliases:
l3_dvrscheduler_db.subscribe() l3_dvrscheduler_db.subscribe()
l3_db.subscribe() l3_db.subscribe()
self.start_rpc_listeners()
@log_helpers.log_method_call @log_helpers.log_method_call
def setup_rpc(self): def start_rpc_listeners(self):
# RPC support # RPC support
self.topic = topics.L3PLUGIN self.topic = topics.L3PLUGIN
self.conn = n_rpc.create_connection(new=True) self.conn = n_rpc.create_connection(new=True)
@ -77,7 +77,7 @@ class L3RouterPlugin(service_base.ServicePluginBase,
self.endpoints = [l3_rpc.L3RpcCallback()] self.endpoints = [l3_rpc.L3RpcCallback()]
self.conn.create_consumer(self.topic, self.endpoints, self.conn.create_consumer(self.topic, self.endpoints,
fanout=False) fanout=False)
self.conn.consume_in_threads() return self.conn.consume_in_threads()
def get_plugin_type(self): def get_plugin_type(self):
return constants.L3_ROUTER_NAT return constants.L3_ROUTER_NAT

View File

@ -27,14 +27,15 @@ class MeteringPlugin(metering_db.MeteringDbMixin):
def __init__(self): def __init__(self):
super(MeteringPlugin, self).__init__() super(MeteringPlugin, self).__init__()
self.endpoints = [metering_rpc.MeteringRpcCallbacks(self)] self.meter_rpc = metering_rpc_agent_api.MeteringAgentNotifyAPI()
self.start_rpc_listeners()
def start_rpc_listeners(self):
self.endpoints = [metering_rpc.MeteringRpcCallbacks(self)]
self.conn = n_rpc.create_connection(new=True) self.conn = n_rpc.create_connection(new=True)
self.conn.create_consumer( self.conn.create_consumer(
topics.METERING_PLUGIN, self.endpoints, fanout=False) topics.METERING_PLUGIN, self.endpoints, fanout=False)
self.conn.consume_in_threads() return self.conn.consume_in_threads()
self.meter_rpc = metering_rpc_agent_api.MeteringAgentNotifyAPI()
def create_metering_label(self, context, metering_label): def create_metering_label(self, context, metering_label):
label = super(MeteringPlugin, self).create_metering_label( label = super(MeteringPlugin, self).create_metering_label(