Consume LBaaS v2 plugin queue in RPC workers

This patch adds all RPC workers to consumers of LBaaS v2 plugin
queue.
This is important for DVR-enabled deployments with hundreds
of agents.

Change-Id: Iaec2f623cbbba99d17dc15802ed70603db37e0fb
Related-Bug: #1498844
This commit is contained in:
Elena Ezhova 2015-09-29 17:18:51 +03:00
parent 6c52cf18a3
commit 746b1d5516
2 changed files with 22 additions and 12 deletions

View File

@ -318,7 +318,13 @@ class AgentDriverBase(driver_base.LoadBalancerBaseDriver):
self.agent_rpc = LoadBalancerAgentApi(lb_const.LOADBALANCER_AGENTV2)
self._set_callbacks_on_plugin()
self.agent_endpoints = [
agent_callbacks.LoadBalancerCallbacks(self.plugin),
agents_db.AgentExtRpcCallback(self.plugin.db)
]
self.conn = None
# Setting this on the db because the plugin no longer inherts from
# database classes, the db does.
self.plugin.db.agent_notifiers.update(
@ -329,21 +335,16 @@ class AgentDriverBase(driver_base.LoadBalancerBaseDriver):
self.loadbalancer_scheduler = importutils.import_object(
lb_sched_driver)
def _set_callbacks_on_plugin(self):
def start_rpc_listeners(self):
# other agent based plugin driver might already set callbacks on plugin
if hasattr(self.plugin, 'agent_callbacks'):
return
self.plugin.agent_endpoints = [
agent_callbacks.LoadBalancerCallbacks(self.plugin),
agents_db.AgentExtRpcCallback(self.plugin.db)
]
self.plugin.conn = n_rpc.create_connection()
self.plugin.conn.create_consumer(
lb_const.LOADBALANCER_PLUGINV2,
self.plugin.agent_endpoints,
fanout=False)
self.plugin.conn.consume_in_threads()
self.conn = n_rpc.create_connection()
self.conn.create_consumer(lb_const.LOADBALANCER_PLUGINV2,
self.agent_endpoints,
fanout=False)
return self.conn.consume_in_threads()
def get_loadbalancer_agent(self, context, loadbalancer_id):
agent = self.plugin.db.get_agent_hosting_loadbalancer(

View File

@ -393,8 +393,17 @@ class LoadBalancerPluginv2(loadbalancerv2.LoadBalancerPluginBaseV2):
add_provider_configuration(
self.service_type_manager, constants.LOADBALANCERV2)
self._load_drivers()
self.start_rpc_listeners()
self.db.subscribe()
def start_rpc_listeners(self):
listeners = []
for driver in self.drivers.values():
if hasattr(driver, 'start_rpc_listeners'):
listener = driver.start_rpc_listeners()
listeners.append(listener)
return listeners
def _load_drivers(self):
"""Loads plugin-drivers specified in configuration."""
self.drivers, self.default_provider = service_base.load_drivers(