diff --git a/neutron/plugins/openvswitch/agent/ovs_neutron_agent.py b/neutron/plugins/openvswitch/agent/ovs_neutron_agent.py index 4a13b2fef27..877ef2178b7 100644 --- a/neutron/plugins/openvswitch/agent/ovs_neutron_agent.py +++ b/neutron/plugins/openvswitch/agent/ovs_neutron_agent.py @@ -33,6 +33,7 @@ from neutron.agent import rpc as agent_rpc from neutron.agent import securitygroups_rpc as sg_rpc from neutron.common import config as logging_config from neutron.common import constants as q_const +from neutron.common import exceptions from neutron.common import legacy from neutron.common import topics from neutron.common import utils as q_utils @@ -51,6 +52,11 @@ LOG = logging.getLogger(__name__) DEAD_VLAN_TAG = str(q_const.MAX_VLAN_TAG + 1) +class DeviceListRetrievalError(exceptions.NeutronException): + message = _("Unable to retrieve port details for devices: %(devices)s " + "because of error: %(error)s") + + # A class to represent a VIF (i.e., a port that has 'iface-id' and 'vif-mac' # attributes set). class LocalVLANMapping: @@ -1000,29 +1006,32 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, self.tun_br_ofports[tunnel_type].pop(remote_ip, None) def treat_devices_added_or_updated(self, devices, ovs_restarted): - resync = False + skipped_devices = [] + devices_details_list = [] for device in devices: - LOG.debug(_("Processing port %s"), device) - port = self.int_br.get_vif_port_by_id(device) - if not port: - # The port has disappeared and should not be processed - # There is no need to put the port DOWN in the plugin as - # it never went up in the first place - LOG.info(_("Port %s was not found on the integration bridge " - "and will therefore not be processed"), device) - continue try: # TODO(salv-orlando): Provide bulk API for retrieving # details for all devices in one call - details = self.plugin_rpc.get_device_details(self.context, - device, - self.agent_id) + devices_details_list.append( + self.plugin_rpc.get_device_details( + self.context, device, self.agent_id)) except Exception as e: LOG.debug(_("Unable to get port details for " "%(device)s: %(e)s"), {'device': device, 'e': e}) - resync = True + raise DeviceListRetrievalError(devices=devices, error=e) + + for details in devices_details_list: + device = details['device'] + LOG.debug(_("Processing port %s"), device) + port = self.int_br.get_vif_port_by_id(device) + if not port: + # The port disappeared and cannot be processed + LOG.info(_("Port %s was not found on the integration bridge " + "and will therefore not be processed"), device) + skipped_devices.append(device) continue + if 'port_id' in details: LOG.info(_("Port %(device)s updated. Details: %(details)s"), {'device': device, 'details': details}) @@ -1034,6 +1043,10 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, details['admin_state_up'], ovs_restarted) # update plugin about port status + # FIXME(salv-orlando): Failures while updating device status + # must be handled appropriately. Otherwise this might prevent + # neutron server from sending network-vif-* events to the nova + # API server, thus possibly preventing instance spawn. if details.get('admin_state_up'): LOG.debug(_("Setting status for %s to UP"), device) self.plugin_rpc.update_device_up( @@ -1047,28 +1060,32 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, LOG.warn(_("Device %s not defined on plugin"), device) if (port and port.ofport != -1): self.port_dead(port) - return resync + return skipped_devices def treat_ancillary_devices_added(self, devices): - resync = False + devices_details_list = [] for device in devices: - LOG.info(_("Ancillary Port %s added"), device) try: - self.plugin_rpc.get_device_details(self.context, device, - self.agent_id) + # TODO(salv-orlando): Provide bulk API for retrieving + # details for all devices in one call + devices_details_list.append( + self.plugin_rpc.get_device_details( + self.context, device, self.agent_id)) except Exception as e: LOG.debug(_("Unable to get port details for " "%(device)s: %(e)s"), {'device': device, 'e': e}) - resync = True - continue + raise DeviceListRetrievalError(devices=devices, error=e) + + for details in devices_details_list: + device = details['device'] + LOG.info(_("Ancillary Port %s added"), device) # update plugin about port status self.plugin_rpc.update_device_up(self.context, device, self.agent_id, cfg.CONF.host) - return resync def treat_devices_removed(self, devices): resync = False @@ -1131,13 +1148,29 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, port_info.get('updated', set())) if devices_added_updated: start = time.time() - resync_a = self.treat_devices_added_or_updated( - devices_added_updated, ovs_restarted) - LOG.debug(_("process_network_ports - iteration:%(iter_num)d -" - "treat_devices_added_or_updated completed " - "in %(elapsed).3f"), - {'iter_num': self.iter_num, - 'elapsed': time.time() - start}) + try: + skipped_devices = self.treat_devices_added_or_updated( + devices_added_updated, ovs_restarted) + LOG.debug(_("process_network_ports - iteration:%(iter_num)d -" + "treat_devices_added_or_updated completed. " + "Skipped %(num_skipped)d devices of " + "%(num_current)d devices currently available. " + "Time elapsed: %(elapsed).3f"), + {'iter_num': self.iter_num, + 'num_skipped': len(skipped_devices), + 'num_current': len(port_info['current']), + 'elapsed': time.time() - start}) + # Update the list of current ports storing only those which + # have been actually processed. + port_info['current'] = (port_info['current'] - + set(skipped_devices)) + except DeviceListRetrievalError: + # Need to resync as there was an error with server + # communication. + LOG.exception(_("process_network_ports - iteration:%d - " + "failure while retrieving port details " + "from server"), self.iter_num) + resync_a = True if 'removed' in port_info: start = time.time() resync_b = self.treat_devices_removed(port_info['removed']) @@ -1153,12 +1186,20 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, resync_b = False if 'added' in port_info: start = time.time() - resync_a = self.treat_ancillary_devices_added(port_info['added']) - LOG.debug(_("process_ancillary_network_ports - iteration: " - "%(iter_num)d - treat_ancillary_devices_added " - "completed in %(elapsed).3f"), - {'iter_num': self.iter_num, - 'elapsed': time.time() - start}) + try: + self.treat_ancillary_devices_added(port_info['added']) + LOG.debug(_("process_ancillary_network_ports - iteration: " + "%(iter_num)d - treat_ancillary_devices_added " + "completed in %(elapsed).3f"), + {'iter_num': self.iter_num, + 'elapsed': time.time() - start}) + except DeviceListRetrievalError: + # Need to resync as there was an error with server + # communication. + LOG.exception(_("process_ancillary_network_ports - " + "iteration:%d - failure while retrieving " + "port details from server"), self.iter_num) + resync_a = True if 'removed' in port_info: start = time.time() resync_b = self.treat_ancillary_devices_removed( @@ -1278,7 +1319,6 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, self.updated_ports = set() reg_ports = (set() if ovs_restarted else ports) port_info = self.scan_ports(reg_ports, updated_ports_copy) - ports = port_info['current'] LOG.debug(_("Agent rpc_loop - iteration:%(iter_num)d - " "port information retrieved. " "Elapsed:%(elapsed).3f"), @@ -1304,6 +1344,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, len(port_info.get('updated', []))) port_stats['regular']['removed'] = ( len(port_info.get('removed', []))) + ports = port_info['current'] # Treat ancillary devices if they exist if self.ancillary_brs: port_info = self.update_ancillary_ports( diff --git a/neutron/tests/unit/openvswitch/test_ovs_neutron_agent.py b/neutron/tests/unit/openvswitch/test_ovs_neutron_agent.py index a04e52a9ff2..6a66d785fb6 100644 --- a/neutron/tests/unit/openvswitch/test_ovs_neutron_agent.py +++ b/neutron/tests/unit/openvswitch/test_ovs_neutron_agent.py @@ -275,14 +275,15 @@ class TestOvsNeutronAgent(base.BaseTestCase): vif_port_set, registered_ports, port_tags_dict=port_tags_dict) self.assertEqual(expected, actual) - def test_treat_devices_added_returns_true_for_missing_device(self): + def test_treat_devices_added_returns_raises_for_missing_device(self): with contextlib.nested( mock.patch.object(self.agent.plugin_rpc, 'get_device_details', side_effect=Exception()), mock.patch.object(self.agent.int_br, 'get_vif_port_by_id', return_value=mock.Mock())): - self.assertTrue(self.agent.treat_devices_added_or_updated([{}], - False)) + self.assertRaises( + ovs_neutron_agent.DeviceListRetrievalError, + self.agent.treat_devices_added_or_updated, [{}], False) def _mock_treat_devices_added_updated(self, details, port, func_name): """Mock treat devices added or updated. @@ -301,8 +302,9 @@ class TestOvsNeutronAgent(base.BaseTestCase): mock.patch.object(self.agent.plugin_rpc, 'update_device_down'), mock.patch.object(self.agent, func_name) ) as (get_dev_fn, get_vif_func, upd_dev_up, upd_dev_down, func): - self.assertFalse(self.agent.treat_devices_added_or_updated([{}], - False)) + skip_devs = self.agent.treat_devices_added_or_updated([{}], False) + # The function should not raise + self.assertFalse(skip_devs) return func.called def test_treat_devices_added_updated_ignores_invalid_ofport(self): @@ -325,12 +327,34 @@ class TestOvsNeutronAgent(base.BaseTestCase): ) as (get_dev_fn, get_vif_func): self.assertFalse(get_dev_fn.called) - def test_treat_devices_added__updated_updates_known_port(self): + def test_treat_devices_added_updated_updates_known_port(self): details = mock.MagicMock() details.__contains__.side_effect = lambda x: True self.assertTrue(self._mock_treat_devices_added_updated( details, mock.Mock(), 'treat_vif_port')) + def test_treat_devices_added_updated_skips_if_port_not_found(self): + dev_mock = mock.MagicMock() + dev_mock.__getitem__.return_value = 'the_skipped_one' + with contextlib.nested( + mock.patch.object(self.agent.plugin_rpc, + 'get_device_details', + return_value=dev_mock), + mock.patch.object(self.agent.int_br, 'get_vif_port_by_id', + return_value=None), + mock.patch.object(self.agent.plugin_rpc, 'update_device_up'), + mock.patch.object(self.agent.plugin_rpc, 'update_device_down'), + mock.patch.object(self.agent, 'treat_vif_port') + ) as (get_dev_fn, get_vif_func, upd_dev_up, + upd_dev_down, treat_vif_port): + skip_devs = self.agent.treat_devices_added_or_updated([{}], False) + # The function should return False for resync and no device + # processed + self.assertEqual(['the_skipped_one'], skip_devs) + self.assertFalse(treat_vif_port.called) + self.assertFalse(upd_dev_down.called) + self.assertFalse(upd_dev_up.called) + def test_treat_devices_added_updated_put_port_down(self): fake_details_dict = {'admin_state_up': False, 'port_id': 'xxx', @@ -349,8 +373,9 @@ class TestOvsNeutronAgent(base.BaseTestCase): mock.patch.object(self.agent, 'treat_vif_port') ) as (get_dev_fn, get_vif_func, upd_dev_up, upd_dev_down, treat_vif_port): - self.assertFalse(self.agent.treat_devices_added_or_updated([{}], - False)) + skip_devs = self.agent.treat_devices_added_or_updated([{}], False) + # The function should return False for resync + self.assertFalse(skip_devs) self.assertTrue(treat_vif_port.called) self.assertTrue(upd_dev_down.called) @@ -377,7 +402,7 @@ class TestOvsNeutronAgent(base.BaseTestCase): with contextlib.nested( mock.patch.object(self.agent.sg_agent, "setup_port_filters"), mock.patch.object(self.agent, "treat_devices_added_or_updated", - return_value=False), + return_value=[]), mock.patch.object(self.agent, "treat_devices_removed", return_value=False) ) as (setup_port_filters, device_added_updated, device_removed):