diff --git a/neutron/plugins/openvswitch/agent/ovs_neutron_agent.py b/neutron/plugins/openvswitch/agent/ovs_neutron_agent.py index 6ee023f037..35fb9fb221 100644 --- a/neutron/plugins/openvswitch/agent/ovs_neutron_agent.py +++ b/neutron/plugins/openvswitch/agent/ovs_neutron_agent.py @@ -35,6 +35,7 @@ from neutron.agent import rpc as agent_rpc from neutron.agent import securitygroups_rpc as sg_rpc from neutron.common import config as common_config from neutron.common import constants as q_const +from neutron.common import exceptions from neutron.common import rpc as n_rpc from neutron.common import topics from neutron.common import utils as q_utils @@ -52,6 +53,11 @@ LOG = logging.getLogger(__name__) DEAD_VLAN_TAG = str(q_const.MAX_VLAN_TAG + 1) +class DeviceListRetrievalError(exceptions.NeutronException): + message = _("Unable to retrieve port details for devices: %(devices)s " + "because of error: %(error)s") + + # A class to represent a VIF (i.e., a port that has 'iface-id' and 'vif-mac' # attributes set). class LocalVLANMapping: @@ -1106,26 +1112,23 @@ class OVSNeutronAgent(n_rpc.RpcCallback, self.tun_br_ofports[tunnel_type].pop(remote_ip, None) def treat_devices_added_or_updated(self, devices, ovs_restarted): + skipped_devices = [] try: devices_details_list = self.plugin_rpc.get_devices_details_list( self.context, devices, self.agent_id) except Exception as e: - LOG.debug("Unable to get port details for %(devices)s: %(e)s", - {'devices': devices, 'e': e}) - # resync is needed - return True + raise DeviceListRetrievalError(devices=devices, error=e) for details in devices_details_list: device = details['device'] LOG.debug("Processing port: %s", device) port = self.int_br.get_vif_port_by_id(device) if not port: - # The port has disappeared and should not be processed - # There is no need to put the port DOWN in the plugin as - # it never went up in the first place + # The port disappeared and cannot be processed LOG.info(_("Port %s was not found on the integration bridge " "and will therefore not be processed"), device) + skipped_devices.append(device) continue if 'port_id' in details: @@ -1139,6 +1142,10 @@ class OVSNeutronAgent(n_rpc.RpcCallback, details['admin_state_up'], ovs_restarted) # update plugin about port status + # FIXME(salv-orlando): Failures while updating device status + # must be handled appropriately. Otherwise this might prevent + # neutron server from sending network-vif-* events to the nova + # API server, thus possibly preventing instance spawn. if details.get('admin_state_up'): LOG.debug(_("Setting status for %s to UP"), device) self.plugin_rpc.update_device_up( @@ -1152,7 +1159,7 @@ class OVSNeutronAgent(n_rpc.RpcCallback, LOG.warn(_("Device %s not defined on plugin"), device) if (port and port.ofport != -1): self.port_dead(port) - return False + return skipped_devices def treat_ancillary_devices_added(self, devices): try: @@ -1161,10 +1168,7 @@ class OVSNeutronAgent(n_rpc.RpcCallback, devices, self.agent_id) except Exception as e: - LOG.debug("Unable to get port details for " - "%(devices)s: %(e)s", {'devices': devices, 'e': e}) - # resync is needed - return True + raise DeviceListRetrievalError(devices=devices, error=e) for details in devices_details_list: device = details['device'] @@ -1175,7 +1179,6 @@ class OVSNeutronAgent(n_rpc.RpcCallback, device, self.agent_id, cfg.CONF.host) - return False def treat_devices_removed(self, devices): resync = False @@ -1238,13 +1241,29 @@ class OVSNeutronAgent(n_rpc.RpcCallback, port_info.get('updated', set())) if devices_added_updated: start = time.time() - resync_a = self.treat_devices_added_or_updated( - devices_added_updated, ovs_restarted) - LOG.debug(_("process_network_ports - iteration:%(iter_num)d -" - "treat_devices_added_or_updated completed " - "in %(elapsed).3f"), - {'iter_num': self.iter_num, - 'elapsed': time.time() - start}) + try: + skipped_devices = self.treat_devices_added_or_updated( + devices_added_updated, ovs_restarted) + LOG.debug(_("process_network_ports - iteration:%(iter_num)d -" + "treat_devices_added_or_updated completed. " + "Skipped %(num_skipped)d devices of " + "%(num_current)d devices currently available. " + "Time elapsed: %(elapsed).3f"), + {'iter_num': self.iter_num, + 'num_skipped': len(skipped_devices), + 'num_current': len(port_info['current']), + 'elapsed': time.time() - start}) + # Update the list of current ports storing only those which + # have been actually processed. + port_info['current'] = (port_info['current'] - + set(skipped_devices)) + except DeviceListRetrievalError: + # Need to resync as there was an error with server + # communication. + LOG.exception(_("process_network_ports - iteration:%d - " + "failure while retrieving port details " + "from server"), self.iter_num) + resync_a = True if 'removed' in port_info: start = time.time() resync_b = self.treat_devices_removed(port_info['removed']) @@ -1260,12 +1279,20 @@ class OVSNeutronAgent(n_rpc.RpcCallback, resync_b = False if 'added' in port_info: start = time.time() - resync_a = self.treat_ancillary_devices_added(port_info['added']) - LOG.debug(_("process_ancillary_network_ports - iteration: " - "%(iter_num)d - treat_ancillary_devices_added " - "completed in %(elapsed).3f"), - {'iter_num': self.iter_num, - 'elapsed': time.time() - start}) + try: + self.treat_ancillary_devices_added(port_info['added']) + LOG.debug(_("process_ancillary_network_ports - iteration: " + "%(iter_num)d - treat_ancillary_devices_added " + "completed in %(elapsed).3f"), + {'iter_num': self.iter_num, + 'elapsed': time.time() - start}) + except DeviceListRetrievalError: + # Need to resync as there was an error with server + # communication. + LOG.exception(_("process_ancillary_network_ports - " + "iteration:%d - failure while retrieving " + "port details from server"), self.iter_num) + resync_a = True if 'removed' in port_info: start = time.time() resync_b = self.treat_ancillary_devices_removed( @@ -1386,7 +1413,6 @@ class OVSNeutronAgent(n_rpc.RpcCallback, self.updated_ports = set() reg_ports = (set() if ovs_restarted else ports) port_info = self.scan_ports(reg_ports, updated_ports_copy) - ports = port_info['current'] LOG.debug(_("Agent rpc_loop - iteration:%(iter_num)d - " "port information retrieved. " "Elapsed:%(elapsed).3f"), @@ -1412,6 +1438,7 @@ class OVSNeutronAgent(n_rpc.RpcCallback, len(port_info.get('updated', []))) port_stats['regular']['removed'] = ( len(port_info.get('removed', []))) + ports = port_info['current'] # Treat ancillary devices if they exist if self.ancillary_brs: port_info = self.update_ancillary_ports( diff --git a/neutron/tests/unit/openvswitch/test_ovs_neutron_agent.py b/neutron/tests/unit/openvswitch/test_ovs_neutron_agent.py index 446e1d9a1a..498046c8df 100644 --- a/neutron/tests/unit/openvswitch/test_ovs_neutron_agent.py +++ b/neutron/tests/unit/openvswitch/test_ovs_neutron_agent.py @@ -283,15 +283,16 @@ class TestOvsNeutronAgent(base.BaseTestCase): vif_port_set, registered_ports, port_tags_dict=port_tags_dict) self.assertEqual(expected, actual) - def test_treat_devices_added_returns_true_for_missing_device(self): + def test_treat_devices_added_returns_raises_for_missing_device(self): with contextlib.nested( mock.patch.object(self.agent.plugin_rpc, 'get_devices_details_list', side_effect=Exception()), mock.patch.object(self.agent.int_br, 'get_vif_port_by_id', return_value=mock.Mock())): - self.assertTrue(self.agent.treat_devices_added_or_updated([{}], - False)) + self.assertRaises( + ovs_neutron_agent.DeviceListRetrievalError, + self.agent.treat_devices_added_or_updated, [{}], False) def _mock_treat_devices_added_updated(self, details, port, func_name): """Mock treat devices added or updated. @@ -311,8 +312,9 @@ class TestOvsNeutronAgent(base.BaseTestCase): mock.patch.object(self.agent.plugin_rpc, 'update_device_down'), mock.patch.object(self.agent, func_name) ) as (get_dev_fn, get_vif_func, upd_dev_up, upd_dev_down, func): - self.assertFalse(self.agent.treat_devices_added_or_updated([{}], - False)) + skip_devs = self.agent.treat_devices_added_or_updated([{}], False) + # The function should not raise + self.assertFalse(skip_devs) return func.called def test_treat_devices_added_updated_ignores_invalid_ofport(self): @@ -335,12 +337,34 @@ class TestOvsNeutronAgent(base.BaseTestCase): ) as (get_dev_fn, get_vif_func): self.assertFalse(get_dev_fn.called) - def test_treat_devices_added__updated_updates_known_port(self): + def test_treat_devices_added_updated_updates_known_port(self): details = mock.MagicMock() details.__contains__.side_effect = lambda x: True self.assertTrue(self._mock_treat_devices_added_updated( details, mock.Mock(), 'treat_vif_port')) + def test_treat_devices_added_updated_skips_if_port_not_found(self): + dev_mock = mock.MagicMock() + dev_mock.__getitem__.return_value = 'the_skipped_one' + with contextlib.nested( + mock.patch.object(self.agent.plugin_rpc, + 'get_devices_details_list', + return_value=[dev_mock]), + mock.patch.object(self.agent.int_br, 'get_vif_port_by_id', + return_value=None), + mock.patch.object(self.agent.plugin_rpc, 'update_device_up'), + mock.patch.object(self.agent.plugin_rpc, 'update_device_down'), + mock.patch.object(self.agent, 'treat_vif_port') + ) as (get_dev_fn, get_vif_func, upd_dev_up, + upd_dev_down, treat_vif_port): + skip_devs = self.agent.treat_devices_added_or_updated([{}], False) + # The function should return False for resync and no device + # processed + self.assertEqual(['the_skipped_one'], skip_devs) + self.assertFalse(treat_vif_port.called) + self.assertFalse(upd_dev_down.called) + self.assertFalse(upd_dev_up.called) + def test_treat_devices_added_updated_put_port_down(self): fake_details_dict = {'admin_state_up': False, 'port_id': 'xxx', @@ -360,8 +384,9 @@ class TestOvsNeutronAgent(base.BaseTestCase): mock.patch.object(self.agent, 'treat_vif_port') ) as (get_dev_fn, get_vif_func, upd_dev_up, upd_dev_down, treat_vif_port): - self.assertFalse(self.agent.treat_devices_added_or_updated([{}], - False)) + skip_devs = self.agent.treat_devices_added_or_updated([{}], False) + # The function should return False for resync + self.assertFalse(skip_devs) self.assertTrue(treat_vif_port.called) self.assertTrue(upd_dev_down.called) @@ -388,7 +413,7 @@ class TestOvsNeutronAgent(base.BaseTestCase): with contextlib.nested( mock.patch.object(self.agent.sg_agent, "setup_port_filters"), mock.patch.object(self.agent, "treat_devices_added_or_updated", - return_value=False), + return_value=[]), mock.patch.object(self.agent, "treat_devices_removed", return_value=False) ) as (setup_port_filters, device_added_updated, device_removed):