From 9ff46546cb36ab93504ae733f273394d5ccd9be4 Mon Sep 17 00:00:00 2001 From: Felix Huettner Date: Mon, 5 Sep 2022 08:34:03 +0200 Subject: [PATCH] Cleanup fanout queues on ovs agent stop Previously when a neutron-openvswitch-agent was stopped it left behind the following fanout queues in rabbitmq: neutron-vo-Network-1.0_fanout_someuuid neutron-vo-Port-1.1_fanout_someuuid neutron-vo-SecurityGroup-1.0_fanout_someuuid neutron-vo-SecurityGroupRule-1.0_fanout_someuuid neutron-vo-SubPort-1.0_fanout_someuuid neutron-vo-Subnet-1.0_fanout_someuuid neutron-vo-Trunk-1.1_fanout_someuuid In this change we ensure that all but the SubPort and Trunk fanout queues are correctly removed from rabbitmq by cleanly stopping the RemoteResourceCache when the agent stops. Partial-Bug: #1586731 Change-Id: I672f9414a1a8ed91e259e9379ca707a70f6b4467 --- neutron/agent/resource_cache.py | 6 ++++++ neutron/agent/rpc.py | 3 +++ .../ml2/drivers/openvswitch/agent/ovs_neutron_agent.py | 2 ++ .../drivers/openvswitch/agent/test_ovs_neutron_agent.py | 8 ++++++++ 4 files changed, 19 insertions(+) diff --git a/neutron/agent/resource_cache.py b/neutron/agent/resource_cache.py index 10b68f2309e..0a4ea1fb6bf 100644 --- a/neutron/agent/resource_cache.py +++ b/neutron/agent/resource_cache.py @@ -49,6 +49,9 @@ class RemoteResourceCache(object): def start_watcher(self): self._watcher = RemoteResourceWatcher(self) + def stop_watcher(self): + self._watcher.stop() + def get_resource_by_id(self, rtype, obj_id, agent_restarted=False): """Returns None if it doesn't exist.""" if obj_id in self._deleted_ids_by_type[rtype]: @@ -263,3 +266,6 @@ class RemoteResourceWatcher(object): else: # creates and updates are treated equally self.rcache.record_resource_update(context, rtype, r) + + def stop(self): + self._connection.close() diff --git a/neutron/agent/rpc.py b/neutron/agent/rpc.py index dc4c27ab18f..9a133afb072 100644 --- a/neutron/agent/rpc.py +++ b/neutron/agent/rpc.py @@ -398,6 +398,9 @@ class CacheBackedPluginApi(PluginApi): rcache.start_watcher() self.remote_resource_cache = rcache + def stop(self): + self.remote_resource_cache.stop_watcher() + # TODO(ralonsoh): move this method to neutron_lib.plugins.utils def migrating_to_host(bindings, host=None): diff --git a/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py b/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py index 1787c3f8537..906a3467d39 100644 --- a/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py +++ b/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py @@ -2850,6 +2850,8 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin, bridge_names=bridge_names, ovs=self.ovs) as pm: self.rpc_loop(polling_manager=pm) + if self.plugin_rpc: + self.plugin_rpc.stop() def _handle_sigterm(self, signum, frame): self.catch_sigterm = True diff --git a/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py b/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py index 6bf5850be84..06efeabcf42 100644 --- a/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py +++ b/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py @@ -2049,9 +2049,13 @@ class TestOvsNeutronAgent(object): mock.patch.object(self.agent, 'rpc_loop') as mock_loop, \ mock.patch.dict(self.agent.phys_brs, {'physnet0': ex_br_mock}, clear=True), \ + mock.patch.object( + self.agent.plugin_rpc, + 'stop') as rpc_stop, \ mock.patch.object(self.agent.ovs.ovsdb, 'idl_monitor') as \ mock_idl_monitor: self.agent.daemon_loop() + rpc_stop.assert_called_once() mock_get_pm.assert_called_with( True, ovs_constants.DEFAULT_OVSDBMON_RESPAWN, bridge_names=[], ovs=self.agent.ovs) @@ -2399,6 +2403,9 @@ class TestOvsNeutronAgent(object): mock.patch.object( self.mod_agent.OVSNeutronAgent, '_check_and_handle_signal') as check_and_handle_signal, \ + mock.patch.object( + self.agent.plugin_rpc, + 'stop') as rpc_stop, \ mock.patch.object(self.agent.ovs.ovsdb, 'idl_monitor'): process_network_ports.side_effect = Exception("Trigger resync") check_ovs_status.return_value = ovs_constants.OVS_NORMAL @@ -2406,6 +2413,7 @@ class TestOvsNeutronAgent(object): self.agent.daemon_loop() self.assertTrue(update_stale.called) cleanup.assert_not_called() + rpc_stop.assert_called_once() def test_set_rpc_timeout(self): with mock.patch.object(n_rpc.BackingOffClient,