From ed1ffb194921285fb89d92a35ffbc04cc2610861 Mon Sep 17 00:00:00 2001 From: Renat Nurgaliyev Date: Wed, 18 May 2022 17:38:39 +0200 Subject: [PATCH] Consume BGP service plugin queue in RPC workers This patch adds BGP service plugin RPC queue to RPC workers, like it is done in other Neutron service plugins (l3-plugin, metering, etc.). Without it some RPC requests and AMQP heartbeats are not processed in time, causing AMQP connection dropping, and other unpredictable unwanted behavior. Closes-Bug: #1974057 Change-Id: I1b13f01ca47c8390f1361e01d5eb313fe2fc417f (cherry picked from commit b9c085b85f3f2912d77050a33667f88b8edd6ca0) --- neutron_dynamic_routing/services/bgp/bgp_plugin.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/neutron_dynamic_routing/services/bgp/bgp_plugin.py b/neutron_dynamic_routing/services/bgp/bgp_plugin.py index 20e56321..cdc9be50 100644 --- a/neutron_dynamic_routing/services/bgp/bgp_plugin.py +++ b/neutron_dynamic_routing/services/bgp/bgp_plugin.py @@ -52,7 +52,10 @@ class BgpPlugin(service_base.ServicePluginBase, super(BgpPlugin, self).__init__() self.bgp_drscheduler = importutils.import_object( cfg.CONF.bgp_drscheduler_driver) - self._setup_rpc() + self.agent_notifiers[bgp_consts.AGENT_TYPE_BGP_ROUTING] = ( + bgp_dr_rpc_agent_api.BgpDrAgentNotifyApi() + ) + self._bgp_rpc = self.agent_notifiers[bgp_consts.AGENT_TYPE_BGP_ROUTING] self._register_callbacks() self.add_periodic_dragent_status_check() @@ -64,17 +67,13 @@ class BgpPlugin(service_base.ServicePluginBase, return ("BGP dynamic routing service for announcement of next-hops " "for project networks, floating IP's, and DVR host routes.") - def _setup_rpc(self): + def start_rpc_listeners(self): self.topic = bgp_consts.BGP_PLUGIN self.conn = n_rpc.Connection() - self.agent_notifiers[bgp_consts.AGENT_TYPE_BGP_ROUTING] = ( - bgp_dr_rpc_agent_api.BgpDrAgentNotifyApi() - ) - self._bgp_rpc = self.agent_notifiers[bgp_consts.AGENT_TYPE_BGP_ROUTING] self.endpoints = [bs_rpc.BgpSpeakerRpcCallback()] self.conn.create_consumer(self.topic, self.endpoints, fanout=False) - self.conn.consume_in_threads() + return self.conn.consume_in_threads() def _register_callbacks(self): registry.subscribe(self.floatingip_update_callback,