Merge "Consume LBaaS v2 plugin queue in RPC workers"
This commit is contained in:
commit
9401c5771f
@ -318,7 +318,13 @@ class AgentDriverBase(driver_base.LoadBalancerBaseDriver):
|
|||||||
|
|
||||||
self.agent_rpc = LoadBalancerAgentApi(lb_const.LOADBALANCER_AGENTV2)
|
self.agent_rpc = LoadBalancerAgentApi(lb_const.LOADBALANCER_AGENTV2)
|
||||||
|
|
||||||
self._set_callbacks_on_plugin()
|
self.agent_endpoints = [
|
||||||
|
agent_callbacks.LoadBalancerCallbacks(self.plugin),
|
||||||
|
agents_db.AgentExtRpcCallback(self.plugin.db)
|
||||||
|
]
|
||||||
|
|
||||||
|
self.conn = None
|
||||||
|
|
||||||
# Setting this on the db because the plugin no longer inherts from
|
# Setting this on the db because the plugin no longer inherts from
|
||||||
# database classes, the db does.
|
# database classes, the db does.
|
||||||
self.plugin.db.agent_notifiers.update(
|
self.plugin.db.agent_notifiers.update(
|
||||||
@ -329,21 +335,16 @@ class AgentDriverBase(driver_base.LoadBalancerBaseDriver):
|
|||||||
self.loadbalancer_scheduler = importutils.import_object(
|
self.loadbalancer_scheduler = importutils.import_object(
|
||||||
lb_sched_driver)
|
lb_sched_driver)
|
||||||
|
|
||||||
def _set_callbacks_on_plugin(self):
|
def start_rpc_listeners(self):
|
||||||
# other agent based plugin driver might already set callbacks on plugin
|
# other agent based plugin driver might already set callbacks on plugin
|
||||||
if hasattr(self.plugin, 'agent_callbacks'):
|
if hasattr(self.plugin, 'agent_callbacks'):
|
||||||
return
|
return
|
||||||
|
|
||||||
self.plugin.agent_endpoints = [
|
self.conn = n_rpc.create_connection()
|
||||||
agent_callbacks.LoadBalancerCallbacks(self.plugin),
|
self.conn.create_consumer(lb_const.LOADBALANCER_PLUGINV2,
|
||||||
agents_db.AgentExtRpcCallback(self.plugin.db)
|
self.agent_endpoints,
|
||||||
]
|
fanout=False)
|
||||||
self.plugin.conn = n_rpc.create_connection()
|
return self.conn.consume_in_threads()
|
||||||
self.plugin.conn.create_consumer(
|
|
||||||
lb_const.LOADBALANCER_PLUGINV2,
|
|
||||||
self.plugin.agent_endpoints,
|
|
||||||
fanout=False)
|
|
||||||
self.plugin.conn.consume_in_threads()
|
|
||||||
|
|
||||||
def get_loadbalancer_agent(self, context, loadbalancer_id):
|
def get_loadbalancer_agent(self, context, loadbalancer_id):
|
||||||
agent = self.plugin.db.get_agent_hosting_loadbalancer(
|
agent = self.plugin.db.get_agent_hosting_loadbalancer(
|
||||||
|
@ -396,8 +396,17 @@ class LoadBalancerPluginv2(loadbalancerv2.LoadBalancerPluginBaseV2):
|
|||||||
add_provider_configuration(
|
add_provider_configuration(
|
||||||
self.service_type_manager, constants.LOADBALANCERV2)
|
self.service_type_manager, constants.LOADBALANCERV2)
|
||||||
self._load_drivers()
|
self._load_drivers()
|
||||||
|
self.start_rpc_listeners()
|
||||||
self.db.subscribe()
|
self.db.subscribe()
|
||||||
|
|
||||||
|
def start_rpc_listeners(self):
|
||||||
|
listeners = []
|
||||||
|
for driver in self.drivers.values():
|
||||||
|
if hasattr(driver, 'start_rpc_listeners'):
|
||||||
|
listener = driver.start_rpc_listeners()
|
||||||
|
listeners.append(listener)
|
||||||
|
return listeners
|
||||||
|
|
||||||
def _load_drivers(self):
|
def _load_drivers(self):
|
||||||
"""Loads plugin-drivers specified in configuration."""
|
"""Loads plugin-drivers specified in configuration."""
|
||||||
self.drivers, self.default_provider = service_base.load_drivers(
|
self.drivers, self.default_provider = service_base.load_drivers(
|
||||||
|
Loading…
Reference in New Issue
Block a user