Consume BGP service plugin queue in RPC workers

This patch adds BGP service plugin RPC queue to RPC workers, like it is
done in other Neutron service plugins (l3-plugin, metering, etc.).
Without it some RPC requests and AMQP heartbeats are not processed in
time, causing AMQP connection dropping, and other unpredictable unwanted
behavior.

Closes-Bug: #1974057
Change-Id: I1b13f01ca47c8390f1361e01d5eb313fe2fc417f
(cherry picked from commit b9c085b85f)
This commit is contained in:
Renat Nurgaliyev 2022-05-18 17:38:39 +02:00 committed by Dr. Jens Harbott
parent c5c8612375
commit ed1ffb1949

View File

@ -52,7 +52,10 @@ class BgpPlugin(service_base.ServicePluginBase,
super(BgpPlugin, self).__init__() super(BgpPlugin, self).__init__()
self.bgp_drscheduler = importutils.import_object( self.bgp_drscheduler = importutils.import_object(
cfg.CONF.bgp_drscheduler_driver) cfg.CONF.bgp_drscheduler_driver)
self._setup_rpc() self.agent_notifiers[bgp_consts.AGENT_TYPE_BGP_ROUTING] = (
bgp_dr_rpc_agent_api.BgpDrAgentNotifyApi()
)
self._bgp_rpc = self.agent_notifiers[bgp_consts.AGENT_TYPE_BGP_ROUTING]
self._register_callbacks() self._register_callbacks()
self.add_periodic_dragent_status_check() self.add_periodic_dragent_status_check()
@ -64,17 +67,13 @@ class BgpPlugin(service_base.ServicePluginBase,
return ("BGP dynamic routing service for announcement of next-hops " return ("BGP dynamic routing service for announcement of next-hops "
"for project networks, floating IP's, and DVR host routes.") "for project networks, floating IP's, and DVR host routes.")
def _setup_rpc(self): def start_rpc_listeners(self):
self.topic = bgp_consts.BGP_PLUGIN self.topic = bgp_consts.BGP_PLUGIN
self.conn = n_rpc.Connection() self.conn = n_rpc.Connection()
self.agent_notifiers[bgp_consts.AGENT_TYPE_BGP_ROUTING] = (
bgp_dr_rpc_agent_api.BgpDrAgentNotifyApi()
)
self._bgp_rpc = self.agent_notifiers[bgp_consts.AGENT_TYPE_BGP_ROUTING]
self.endpoints = [bs_rpc.BgpSpeakerRpcCallback()] self.endpoints = [bs_rpc.BgpSpeakerRpcCallback()]
self.conn.create_consumer(self.topic, self.endpoints, self.conn.create_consumer(self.topic, self.endpoints,
fanout=False) fanout=False)
self.conn.consume_in_threads() return self.conn.consume_in_threads()
def _register_callbacks(self): def _register_callbacks(self):
registry.subscribe(self.floatingip_update_callback, registry.subscribe(self.floatingip_update_callback,