ovs agent resync may miss port remove event
In the OVS agent's rpc_loop(), the resync mechanism clears the registered ports and rescans them, which can cause some "port removed" events to be missed, so that treat_devices_removed is never called for those ports. This fix rescans the newly updated ports when the resync mechanism is triggered, without clearing the currently registered ports. The registered ports are cleared only after too many consecutive resyncs, to avoid resyncing forever because of the same faulty port. Closes-Bug: #1329223 Co-Authored-By: Andrey Epifanov <aepifanov@mirantis.com> Co-Authored-By: Gandharva S <gandharva.s@hp.com> Co-Authored-By: Romil Gupta <romilg@hp.com> Co-Authored-By: Rossella Sblendido <rsblendido@gmail.com> Change-Id: Ib0db9dcf889d9fd90b623857782c9a6b091e18f5
This commit is contained in:
parent
375efc377f
commit
d6d0853be3
@ -100,3 +100,5 @@ EXTENSION_DRIVER_TYPE = 'ovs'
|
||||
# ovs datapath types
|
||||
OVS_DATAPATH_SYSTEM = 'system'
|
||||
OVS_DATAPATH_NETDEV = 'netdev'
|
||||
|
||||
MAX_DEVICE_RETRIES = 5
|
||||
|
@ -1157,22 +1157,26 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
||||
port_moves.append(name)
|
||||
return port_moves
|
||||
|
||||
def _get_port_info(self, registered_ports, cur_ports):
|
||||
def _get_port_info(self, registered_ports, cur_ports,
|
||||
readd_registered_ports):
|
||||
port_info = {'current': cur_ports}
|
||||
# FIXME(salv-orlando): It's not really necessary to return early
|
||||
# if nothing has changed.
|
||||
if cur_ports == registered_ports:
|
||||
# No added or removed ports to set, just return here
|
||||
if not readd_registered_ports and cur_ports == registered_ports:
|
||||
return port_info
|
||||
port_info['added'] = cur_ports - registered_ports
|
||||
# Remove all the known ports not found on the integration bridge
|
||||
|
||||
if readd_registered_ports:
|
||||
port_info['added'] = cur_ports
|
||||
else:
|
||||
port_info['added'] = cur_ports - registered_ports
|
||||
# Update port_info with ports not found on the integration bridge
|
||||
port_info['removed'] = registered_ports - cur_ports
|
||||
return port_info
|
||||
|
||||
def scan_ports(self, registered_ports, updated_ports=None):
|
||||
def scan_ports(self, registered_ports, sync, updated_ports=None):
|
||||
cur_ports = self.int_br.get_vif_port_set()
|
||||
self.int_br_device_count = len(cur_ports)
|
||||
port_info = self._get_port_info(registered_ports, cur_ports)
|
||||
port_info = self._get_port_info(registered_ports, cur_ports, sync)
|
||||
if updated_ports is None:
|
||||
updated_ports = set()
|
||||
updated_ports.update(self.check_changed_vlans())
|
||||
@ -1186,11 +1190,11 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
||||
port_info['updated'] = updated_ports
|
||||
return port_info
|
||||
|
||||
def scan_ancillary_ports(self, registered_ports):
|
||||
def scan_ancillary_ports(self, registered_ports, sync):
|
||||
cur_ports = set()
|
||||
for bridge in self.ancillary_brs:
|
||||
cur_ports |= bridge.get_vif_port_set()
|
||||
return self._get_port_info(registered_ports, cur_ports)
|
||||
return self._get_port_info(registered_ports, cur_ports, sync)
|
||||
|
||||
def check_changed_vlans(self):
|
||||
"""Return ports which have lost their vlan tag.
|
||||
@ -1621,6 +1625,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
||||
ancillary_ports = set()
|
||||
tunnel_sync = True
|
||||
ovs_restarted = False
|
||||
consecutive_resyncs = 0
|
||||
while self._check_and_handle_signal():
|
||||
port_info = {}
|
||||
ancillary_port_info = {}
|
||||
@ -1629,10 +1634,18 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
||||
self.iter_num)
|
||||
if sync:
|
||||
LOG.info(_LI("Agent out of sync with plugin!"))
|
||||
ports.clear()
|
||||
ancillary_ports.clear()
|
||||
sync = False
|
||||
polling_manager.force_polling()
|
||||
consecutive_resyncs = consecutive_resyncs + 1
|
||||
if consecutive_resyncs >= constants.MAX_DEVICE_RETRIES:
|
||||
LOG.warn(_LW("Clearing cache of registered ports, retrials"
|
||||
" to resync were > %s"),
|
||||
constants.MAX_DEVICE_RETRIES)
|
||||
ports.clear()
|
||||
ancillary_ports.clear()
|
||||
sync = False
|
||||
consecutive_resyncs = 0
|
||||
else:
|
||||
consecutive_resyncs = 0
|
||||
ovs_status = self.check_ovs_status()
|
||||
if ovs_status == constants.OVS_RESTARTED:
|
||||
self.setup_integration_br()
|
||||
@ -1677,7 +1690,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
||||
updated_ports_copy = self.updated_ports
|
||||
self.updated_ports = set()
|
||||
reg_ports = (set() if ovs_restarted else ports)
|
||||
port_info = self.scan_ports(reg_ports, updated_ports_copy)
|
||||
port_info = self.scan_ports(reg_ports, sync,
|
||||
updated_ports_copy)
|
||||
self.process_deleted_ports(port_info)
|
||||
ofport_changed_ports = self.update_stale_ofport_rules()
|
||||
if ofport_changed_ports:
|
||||
@ -1688,16 +1702,16 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
||||
"Elapsed:%(elapsed).3f",
|
||||
{'iter_num': self.iter_num,
|
||||
'elapsed': time.time() - start})
|
||||
|
||||
# Treat ancillary devices if they exist
|
||||
if self.ancillary_brs:
|
||||
ancillary_port_info = self.scan_ancillary_ports(
|
||||
ancillary_ports)
|
||||
ancillary_ports, sync)
|
||||
LOG.debug("Agent rpc_loop - iteration:%(iter_num)d - "
|
||||
"ancillary port info retrieved. "
|
||||
"Elapsed:%(elapsed).3f",
|
||||
{'iter_num': self.iter_num,
|
||||
'elapsed': time.time() - start})
|
||||
sync = False
|
||||
# Secure and wire/unwire VIFs and update their status
|
||||
# on Neutron server
|
||||
if (self._port_info_has_changes(port_info) or
|
||||
|
@ -283,7 +283,7 @@ class TestOvsNeutronAgent(object):
|
||||
self._test_port_dead(self.mod_agent.DEAD_VLAN_TAG)
|
||||
|
||||
def mock_scan_ports(self, vif_port_set=None, registered_ports=None,
|
||||
updated_ports=None, port_tags_dict=None):
|
||||
updated_ports=None, port_tags_dict=None, sync=False):
|
||||
if port_tags_dict is None: # Because empty dicts evaluate as False.
|
||||
port_tags_dict = {}
|
||||
with mock.patch.object(self.agent.int_br,
|
||||
@ -292,7 +292,7 @@ class TestOvsNeutronAgent(object):
|
||||
mock.patch.object(self.agent.int_br,
|
||||
'get_port_tag_dict',
|
||||
return_value=port_tags_dict):
|
||||
return self.agent.scan_ports(registered_ports, updated_ports)
|
||||
return self.agent.scan_ports(registered_ports, sync, updated_ports)
|
||||
|
||||
def test_scan_ports_returns_current_only_for_unchanged_ports(self):
|
||||
vif_port_set = set([1, 3])
|
||||
@ -308,6 +308,15 @@ class TestOvsNeutronAgent(object):
|
||||
actual = self.mock_scan_ports(vif_port_set, registered_ports)
|
||||
self.assertEqual(expected, actual)
|
||||
|
||||
def test_scan_ports_returns_port_changes_with_sync(self):
|
||||
vif_port_set = set([1, 3])
|
||||
registered_ports = set([1, 2])
|
||||
expected = dict(current=vif_port_set, added=vif_port_set,
|
||||
removed=set([2]))
|
||||
actual = self.mock_scan_ports(vif_port_set, registered_ports,
|
||||
sync=True)
|
||||
self.assertEqual(expected, actual)
|
||||
|
||||
def _test_scan_ports_with_updated_ports(self, updated_ports):
|
||||
vif_port_set = set([1, 3, 4])
|
||||
registered_ports = set([1, 2, 4])
|
||||
@ -1315,8 +1324,8 @@ class TestOvsNeutronAgent(object):
|
||||
pass
|
||||
|
||||
scan_ports.assert_has_calls([
|
||||
mock.call(set(), set()),
|
||||
mock.call(set(), set())
|
||||
mock.call(set(), True, set()),
|
||||
mock.call(set(), False, set())
|
||||
])
|
||||
process_network_ports.assert_has_calls([
|
||||
mock.call(reply2, False),
|
||||
@ -1556,7 +1565,7 @@ class AncillaryBridgesTest(object):
|
||||
self._test_ancillary_bridges(bridges, ['br-ex1', 'br-ex2'])
|
||||
|
||||
def mock_scan_ancillary_ports(self, vif_port_set=None,
|
||||
registered_ports=None):
|
||||
registered_ports=None, sync=False):
|
||||
bridges = ['br-int', 'br-ex']
|
||||
ancillary = ['br-ex']
|
||||
|
||||
@ -1574,7 +1583,7 @@ class AncillaryBridgesTest(object):
|
||||
return_value=vif_port_set):
|
||||
self.agent = self.mod_agent.OVSNeutronAgent(self._bridge_classes(),
|
||||
**self.kwargs)
|
||||
return self.agent.scan_ancillary_ports(registered_ports)
|
||||
return self.agent.scan_ancillary_ports(registered_ports, sync)
|
||||
|
||||
def test_scan_ancillary_ports_returns_cur_only_for_unchanged_ports(self):
|
||||
vif_port_set = set([1, 2])
|
||||
@ -1590,6 +1599,15 @@ class AncillaryBridgesTest(object):
|
||||
actual = self.mock_scan_ancillary_ports(vif_port_set, registered_ports)
|
||||
self.assertEqual(expected, actual)
|
||||
|
||||
def test_scan_ancillary_ports_returns_port_changes_with_sync(self):
|
||||
vif_port_set = set([1, 3])
|
||||
registered_ports = set([1, 2])
|
||||
expected = dict(current=vif_port_set, added=vif_port_set,
|
||||
removed=set([2]))
|
||||
actual = self.mock_scan_ancillary_ports(vif_port_set, registered_ports,
|
||||
sync=True)
|
||||
self.assertEqual(expected, actual)
|
||||
|
||||
|
||||
class AncillaryBridgesTestOFCtl(AncillaryBridgesTest,
|
||||
ovs_test_base.OVSOFCtlTestBase):
|
||||
|
@ -554,8 +554,8 @@ class TunnelTest(object):
|
||||
log_exception.assert_called_once_with(
|
||||
"Error while processing VIF ports")
|
||||
scan_ports.assert_has_calls([
|
||||
mock.call(set(), set()),
|
||||
mock.call(set(['tap0']), set())
|
||||
mock.call(set(), True, set()),
|
||||
mock.call(set(['tap0']), False, set())
|
||||
])
|
||||
process_network_ports.assert_has_calls([
|
||||
mock.call({'current': set(['tap0']),
|
||||
|
Loading…
Reference in New Issue
Block a user