Cleanup fanout queues on ovs agent stop

Previously when a neutron-openvswitch-agent was stopped it left
behind the following fanout queues in rabbitmq:
neutron-vo-Network-1.0_fanout_someuuid
neutron-vo-Port-1.1_fanout_someuuid
neutron-vo-SecurityGroup-1.0_fanout_someuuid
neutron-vo-SecurityGroupRule-1.0_fanout_someuuid
neutron-vo-SubPort-1.0_fanout_someuuid
neutron-vo-Subnet-1.0_fanout_someuuid
neutron-vo-Trunk-1.1_fanout_someuuid

In this change we ensure that all but the SubPort and Trunk fanout
queues are correctly removed from rabbitmq by cleanly stopping the
RemoteResourceCache when the agent stops.

Partial-Bug: #1586731
Change-Id: I672f9414a1a8ed91e259e9379ca707a70f6b4467
This commit is contained in:
Felix Huettner 2022-09-05 08:34:03 +02:00
parent ead685b938
commit 9ff46546cb
4 changed files with 19 additions and 0 deletions

View File

@ -49,6 +49,9 @@ class RemoteResourceCache(object):
def start_watcher(self):
self._watcher = RemoteResourceWatcher(self)
def stop_watcher(self):
self._watcher.stop()
def get_resource_by_id(self, rtype, obj_id, agent_restarted=False):
"""Returns None if it doesn't exist."""
if obj_id in self._deleted_ids_by_type[rtype]:
@ -263,3 +266,6 @@ class RemoteResourceWatcher(object):
else:
# creates and updates are treated equally
self.rcache.record_resource_update(context, rtype, r)
def stop(self):
self._connection.close()

View File

@ -398,6 +398,9 @@ class CacheBackedPluginApi(PluginApi):
rcache.start_watcher()
self.remote_resource_cache = rcache
def stop(self):
self.remote_resource_cache.stop_watcher()
# TODO(ralonsoh): move this method to neutron_lib.plugins.utils
def migrating_to_host(bindings, host=None):

View File

@ -2850,6 +2850,8 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
bridge_names=bridge_names,
ovs=self.ovs) as pm:
self.rpc_loop(polling_manager=pm)
if self.plugin_rpc:
self.plugin_rpc.stop()
def _handle_sigterm(self, signum, frame):
self.catch_sigterm = True

View File

@ -2049,9 +2049,13 @@ class TestOvsNeutronAgent(object):
mock.patch.object(self.agent, 'rpc_loop') as mock_loop, \
mock.patch.dict(self.agent.phys_brs, {'physnet0': ex_br_mock},
clear=True), \
mock.patch.object(
self.agent.plugin_rpc,
'stop') as rpc_stop, \
mock.patch.object(self.agent.ovs.ovsdb, 'idl_monitor') as \
mock_idl_monitor:
self.agent.daemon_loop()
rpc_stop.assert_called_once()
mock_get_pm.assert_called_with(
True, ovs_constants.DEFAULT_OVSDBMON_RESPAWN, bridge_names=[],
ovs=self.agent.ovs)
@ -2399,6 +2403,9 @@ class TestOvsNeutronAgent(object):
mock.patch.object(
self.mod_agent.OVSNeutronAgent,
'_check_and_handle_signal') as check_and_handle_signal, \
mock.patch.object(
self.agent.plugin_rpc,
'stop') as rpc_stop, \
mock.patch.object(self.agent.ovs.ovsdb, 'idl_monitor'):
process_network_ports.side_effect = Exception("Trigger resync")
check_ovs_status.return_value = ovs_constants.OVS_NORMAL
@ -2406,6 +2413,7 @@ class TestOvsNeutronAgent(object):
self.agent.daemon_loop()
self.assertTrue(update_stale.called)
cleanup.assert_not_called()
rpc_stop.assert_called_once()
def test_set_rpc_timeout(self):
with mock.patch.object(n_rpc.BackingOffClient,