diff --git a/neutron/agent/rpc.py b/neutron/agent/rpc.py
index 38fa4de1274..a920ba16bbb 100644
--- a/neutron/agent/rpc.py
+++ b/neutron/agent/rpc.py
@@ -95,6 +95,8 @@ class PluginApi(object):
               return value to include fixed_ips and device_owner for
               the device port
         1.4 - tunnel_sync rpc signature upgrade to obtain 'host'
+        1.5 - Support update_device_list and
+              get_devices_details_list_and_failed_devices
     '''
 
     def __init__(self, topic):
@@ -123,6 +125,26 @@ class PluginApi(object):
             ]
         return res
 
+    def get_devices_details_list_and_failed_devices(self, context, devices,
+                                                    agent_id, host=None):
+        """Get devices details and the list of devices that failed.
+
+        This method returns the devices details. If an error is thrown when
+        retrieving the devices details, the device is put in a list of
+        failed devices.
+        """
+        try:
+            cctxt = self.client.prepare(version='1.5')
+            res = cctxt.call(
+                context,
+                'get_devices_details_list_and_failed_devices',
+                devices=devices, agent_id=agent_id, host=host)
+        except oslo_messaging.UnsupportedVersion:
+            #TODO(rossella_s): Remove this failback logic in M
+            res = self._device_list_rpc_call_with_failed_dev(
+                self.get_device_details, context, agent_id, host, devices)
+        return res
+
     def update_device_down(self, context, device, agent_id, host=None):
         cctxt = self.client.prepare()
         return cctxt.call(context, 'update_device_down', device=device,
                           agent_id=agent_id, host=host)
@@ -133,6 +155,41 @@ class PluginApi(object):
         return cctxt.call(context, 'update_device_up', device=device,
                           agent_id=agent_id, host=host)
 
+    def _device_list_rpc_call_with_failed_dev(self, rpc_call, context,
+                                              agent_id, host, devices):
+        succeeded_devices = []
+        failed_devices = []
+        for device in devices:
+            try:
+                rpc_device = rpc_call(context, device, agent_id, host)
+            except Exception:
+                failed_devices.append(device)
+            else:
+                # update_device_up doesn't return the device
+                succeeded_dev = rpc_device or device
+                succeeded_devices.append(succeeded_dev)
+        return {'devices': succeeded_devices, 'failed_devices': failed_devices}
+
+    def update_device_list(self, context, devices_up, devices_down,
+                           agent_id, host):
+        try:
+            cctxt = self.client.prepare(version='1.5')
+            res = cctxt.call(context, 'update_device_list',
+                             devices_up=devices_up, devices_down=devices_down,
+                             agent_id=agent_id, host=host)
+        except oslo_messaging.UnsupportedVersion:
+            #TODO(rossella_s): Remove this failback logic in M
+            dev_up = self._device_list_rpc_call_with_failed_dev(
+                self.update_device_up, context, agent_id, host, devices_up)
+            dev_down = self._device_list_rpc_call_with_failed_dev(
+                self.update_device_down, context, agent_id, host, devices_down)
+
+            res = {'devices_up': dev_up.get('devices'),
+                   'failed_devices_up': dev_up.get('failed_devices'),
+                   'devices_down': dev_down.get('devices'),
+                   'failed_devices_down': dev_down.get('failed_devices')}
+        return res
+
     def tunnel_sync(self, context, tunnel_ip, tunnel_type=None, host=None):
         try:
             cctxt = self.client.prepare(version='1.4')
diff --git a/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py b/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py
index d39959da5f6..fe9f8a15b41 100644
--- a/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py
+++ b/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py
@@ -64,8 +64,7 @@ class _mac_mydialect(netaddr.mac_unix):
 
 
 class DeviceListRetrievalError(exceptions.NeutronException):
-    message = _("Unable to retrieve port details for devices: %(devices)s "
-                "because of error: %(error)s")
+    message = _("Unable to retrieve port details for devices: %(devices)s ")
 
 
 # A class to represent a VIF (i.e., a port that has 'iface-id' and 'vif-mac'
@@ -740,6 +739,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
                                               port_other_config)
 
     def _bind_devices(self, need_binding_ports):
+        devices_up = []
+        devices_down = []
         port_info = self.int_br.db_list(
             "Port", columns=["name", "tag"])
         tags_by_name = {x['name']: x['tag'] for x in port_info}
@@ -768,13 +769,26 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
             # API server, thus possibly preventing instance spawn.
             if port_detail.get('admin_state_up'):
                 LOG.debug("Setting status for %s to UP", device)
-                self.plugin_rpc.update_device_up(
-                    self.context, device, self.agent_id, self.conf.host)
+                devices_up.append(device)
             else:
                 LOG.debug("Setting status for %s to DOWN", device)
-                self.plugin_rpc.update_device_down(
-                    self.context, device, self.agent_id, self.conf.host)
-            LOG.info(_LI("Configuration for device %s completed."), device)
+                devices_down.append(device)
+        failed_devices = []
+        if devices_up or devices_down:
+            devices_set = self.plugin_rpc.update_device_list(
+                self.context, devices_up, devices_down, self.agent_id,
+                self.conf.host)
+            failed_devices = (devices_set.get('failed_devices_up') +
+                              devices_set.get('failed_devices_down'))
+        if failed_devices:
+            LOG.error(_LE("Configuration for devices %s failed!"),
+                      failed_devices)
+            #TODO(rossella_s) handle better the resync in next patches,
+            # this is just to preserve the current behavior
+            raise DeviceListRetrievalError(devices=failed_devices)
+        LOG.info(_LI("Configuration for devices up %(up)s and devices "
+                     "down %(down)s completed."),
+                 {'up': devices_up, 'down': devices_down})
 
     @staticmethod
     def setup_arp_spoofing_protection(bridge, vif, port_details):
@@ -1201,17 +1215,21 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
     def treat_devices_added_or_updated(self, devices, ovs_restarted):
         skipped_devices = []
         need_binding_devices = []
-        try:
-            devices_details_list = self.plugin_rpc.get_devices_details_list(
+        devices_details_list = (
+            self.plugin_rpc.get_devices_details_list_and_failed_devices(
                 self.context,
                 devices,
                 self.agent_id,
-                self.conf.host)
-        except Exception as e:
-            raise DeviceListRetrievalError(devices=devices, error=e)
+                self.conf.host))
+        if devices_details_list.get('failed_devices'):
+            #TODO(rossella_s) handle better the resync in next patches,
+            # this is just to preserve the current behavior
+            raise DeviceListRetrievalError(devices=devices)
+
+        devices = devices_details_list.get('devices')
         vif_by_id = self.int_br.get_vifs_by_ids(
-            [vif['device'] for vif in devices_details_list])
-        for details in devices_details_list:
+            [vif['device'] for vif in devices])
+        for details in devices:
             device = details['device']
             LOG.debug("Processing port: %s", device)
             port = vif_by_id.get(device)
@@ -1247,62 +1265,67 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
         return skipped_devices, need_binding_devices
 
     def treat_ancillary_devices_added(self, devices):
-        try:
-            devices_details_list = self.plugin_rpc.get_devices_details_list(
+        devices_details_list = (
+            self.plugin_rpc.get_devices_details_list_and_failed_devices(
                 self.context,
                 devices,
                 self.agent_id,
-                self.conf.host)
-        except Exception as e:
-            raise DeviceListRetrievalError(devices=devices, error=e)
-
-        for details in devices_details_list:
-            device = details['device']
-            LOG.info(_LI("Ancillary Port %s added"), device)
-
-            # update plugin about port status
-            self.plugin_rpc.update_device_up(self.context,
-                                             device,
-                                             self.agent_id,
-                                             self.conf.host)
+                self.conf.host))
+        if devices_details_list.get('failed_devices'):
+            #TODO(rossella_s) handle better the resync in next patches,
+            # this is just to preserve the current behavior
+            raise DeviceListRetrievalError(devices=devices)
+        devices_added = [
+            d['device'] for d in devices_details_list.get('devices')]
+        LOG.info(_LI("Ancillary Ports %s added"), devices_added)
+
+        # update plugin about port status
+        devices_set_up = (
+            self.plugin_rpc.update_device_list(self.context,
+                                               devices_added,
+                                               [],
+                                               self.agent_id,
+                                               self.conf.host))
+        if devices_set_up.get('failed_devices_up'):
+            #TODO(rossella_s) handle better the resync in next patches,
+            # this is just to preserve the current behavior
+            raise DeviceListRetrievalError()
 
     def treat_devices_removed(self, devices):
         resync = False
         self.sg_agent.remove_devices_filter(devices)
+        LOG.info(_LI("Ports %s removed"), devices)
+        devices_down = self.plugin_rpc.update_device_list(self.context,
+                                                          [],
+                                                          devices,
+                                                          self.agent_id,
+                                                          self.conf.host)
+        failed_devices = devices_down.get('failed_devices_down')
+        if failed_devices:
+            LOG.debug("Port removal failed for %(devices)s ", failed_devices)
+            resync = True
         for device in devices:
-            LOG.info(_LI("Attachment %s removed"), device)
-            try:
-                self.plugin_rpc.update_device_down(self.context,
-                                                   device,
-                                                   self.agent_id,
-                                                   self.conf.host)
-            except Exception as e:
-                LOG.debug("port_removed failed for %(device)s: %(e)s",
-                          {'device': device, 'e': e})
-                resync = True
-                continue
             self.port_unbound(device)
         return resync
 
     def treat_ancillary_devices_removed(self, devices):
         resync = False
-        for device in devices:
-            LOG.info(_LI("Attachment %s removed"), device)
-            try:
-                details = self.plugin_rpc.update_device_down(self.context,
-                                                             device,
-                                                             self.agent_id,
-                                                             self.conf.host)
-            except Exception as e:
-                LOG.debug("port_removed failed for %(device)s: %(e)s",
-                          {'device': device, 'e': e})
-                resync = True
-                continue
-            if details['exists']:
-                LOG.info(_LI("Port %s updated."), device)
+        LOG.info(_LI("Ancillary ports %s removed"), devices)
+        devices_down = self.plugin_rpc.update_device_list(self.context,
+                                                          [],
+                                                          devices,
+                                                          self.agent_id,
+                                                          self.conf.host)
+        failed_devices = devices_down.get('failed_devices_down')
+        if failed_devices:
+            LOG.debug("Port removal failed for %(devices)s ", failed_devices)
+            resync = True
+        for detail in devices_down.get('devices_down'):
+            if detail['exists']:
+                LOG.info(_LI("Port %s updated."), detail['device'])
                 # Nothing to do regarding local networking
             else:
-                LOG.debug("Device %s not defined on plugin", device)
+                LOG.debug("Device %s not defined on plugin", detail['device'])
         return resync
 
     def process_network_ports(self, port_info, ovs_restarted):
@@ -1355,7 +1378,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
                                        port_info.get('updated', set()))
         self._bind_devices(need_binding_devices)
 
-        if 'removed' in port_info:
+        if 'removed' in port_info and port_info['removed']:
             start = time.time()
             resync_b = self.treat_devices_removed(port_info['removed'])
             LOG.debug("process_network_ports - iteration:%(iter_num)d - "
@@ -1368,15 +1391,15 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
     def process_ancillary_network_ports(self, port_info):
         resync_a = False
         resync_b = False
-        if 'added' in port_info:
+        if 'added' in port_info and port_info['added']:
             start = time.time()
             try:
                 self.treat_ancillary_devices_added(port_info['added'])
                 LOG.debug("process_ancillary_network_ports - iteration: "
                           "%(iter_num)d - treat_ancillary_devices_added "
                           "completed in %(elapsed).3f",
-                          {'iter_num': self.iter_num,
-                          'elapsed': time.time() - start})
+                          {'iter_num': self.iter_num,
+                           'elapsed': time.time() - start})
             except DeviceListRetrievalError:
                 # Need to resync as there was an error with server
                 # communication.
@@ -1384,7 +1407,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
                               "iteration:%d - failure while retrieving "
                               "port details from server"), self.iter_num)
                 resync_a = True
-        if 'removed' in port_info:
+        if 'removed' in port_info and port_info['removed']:
             start = time.time()
             resync_b = self.treat_ancillary_devices_removed(
                 port_info['removed'])
diff --git a/neutron/plugins/ml2/rpc.py b/neutron/plugins/ml2/rpc.py
index 4187da6864e..4f5c10848c8 100644
--- a/neutron/plugins/ml2/rpc.py
+++ b/neutron/plugins/ml2/rpc.py
@@ -28,7 +28,7 @@ from neutron.common import rpc as n_rpc
 from neutron.common import topics
 from neutron.extensions import portbindings
 from neutron.extensions import portsecurity as psec
-from neutron.i18n import _LW
+from neutron.i18n import _LE, _LW
 from neutron import manager
 from neutron.plugins.ml2 import driver_api as api
 from neutron.plugins.ml2.drivers import type_tunnel
@@ -48,7 +48,9 @@ class RpcCallbacks(type_tunnel.TunnelRpcCallbackMixin):
     #       return value to include fixed_ips and device_owner for
     #       the device port
     #   1.4 tunnel_sync rpc signature upgrade to obtain 'host'
-    target = oslo_messaging.Target(version='1.4')
+    #   1.5 Support update_device_list and
+    #       get_devices_details_list_and_failed_devices
+    target = oslo_messaging.Target(version='1.5')
 
     def __init__(self, notifier, type_manager):
         self.setup_tunnel_callback_mixin(notifier, type_manager)
@@ -135,6 +137,27 @@ class RpcCallbacks(type_tunnel.TunnelRpcCallbackMixin):
             for device in kwargs.pop('devices', [])
         ]
 
+    def get_devices_details_list_and_failed_devices(self,
+                                                    rpc_context,
+                                                    **kwargs):
+        devices = []
+        failed_devices = []
+        cached_networks = {}
+        for device in kwargs.pop('devices', []):
+            try:
+                devices.append(self.get_device_details(
+                    rpc_context,
+                    device=device,
+                    cached_networks=cached_networks,
+                    **kwargs))
+            except Exception:
+                LOG.error(_LE("Failed to get details for device %s"),
+                          device)
+                failed_devices.append(device)
+
+        return {'devices': devices,
+                'failed_devices': failed_devices}
+
     def update_device_down(self, rpc_context, **kwargs):
         """Device no longer exists on agent."""
         # TODO(garyk) - live migration and port status
@@ -201,6 +224,44 @@ class RpcCallbacks(type_tunnel.TunnelRpcCallbackMixin):
         registry.notify(
             resources.PORT, events.AFTER_UPDATE, plugin, **kwargs)
 
+    def update_device_list(self, rpc_context, **kwargs):
+        devices_up = []
+        failed_devices_up = []
+        devices_down = []
+        failed_devices_down = []
+        devices = kwargs.get('devices_up')
+        if devices:
+            for device in devices:
+                try:
+                    self.update_device_up(
+                        rpc_context,
+                        device=device,
+                        **kwargs)
+                except Exception:
+                    failed_devices_up.append(device)
+                    LOG.error(_LE("Failed to update device %s up"), device)
+                else:
+                    devices_up.append(device)
+
+        devices = kwargs.get('devices_down')
+        if devices:
+            for device in devices:
+                try:
+                    dev = self.update_device_down(
+                        rpc_context,
+                        device=device,
+                        **kwargs)
+                except Exception:
+                    failed_devices_down.append(device)
+                    LOG.error(_LE("Failed to update device %s down"), device)
+                else:
+                    devices_down.append(dev)
+
+        return {'devices_up': devices_up,
+                'failed_devices_up': failed_devices_up,
+                'devices_down': devices_down,
+                'failed_devices_down': failed_devices_down}
+
 
 class AgentNotifierApi(dvr_rpc.DVRAgentRpcApiMixin,
                        sg_rpc.SecurityGroupAgentRpcApiMixin,
diff --git a/neutron/tests/functional/agent/test_l2_ovs_agent.py b/neutron/tests/functional/agent/test_l2_ovs_agent.py
index 6deaab64e2e..db57e18b18c 100644
--- a/neutron/tests/functional/agent/test_l2_ovs_agent.py
+++ b/neutron/tests/functional/agent/test_l2_ovs_agent.py
@@ -202,11 +202,6 @@ class OVSAgentTestFramework(base.BaseOVSLinuxTestCase):
         for port in [self.patch_tun, self.patch_int]:
             self.assertTrue(self.ovs.port_exists(port))
 
-    def assert_no_vlan_tags(self, ports, agent):
-        for port in ports:
-            res = agent.int_br.db_get_val('Port', port.get('vif_name'), 'tag')
-            self.assertEqual([], res)
-
     def assert_vlan_tags(self, ports, agent):
         for port in ports:
             res = agent.int_br.db_get_val('Port', port.get('vif_name'), 'tag')
@@ -215,30 +210,64 @@ class OVSAgentTestFramework(base.BaseOVSLinuxTestCase):
 
 
 class TestOVSAgent(OVSAgentTestFramework):
-    def _expected_plugin_rpc_call(self, call, expected_devices):
+    def _expected_plugin_rpc_call(self, call, expected_devices, is_up=True):
         """Helper to check expected rpc call are received
 
         :param call: The call to check
         :param expected_devices The device for which call is expected
+        :param is_up True if expected_devices are devices that are set up,
+               False if expected_devices are devices that are set down
         """
-        args = (args[0][1] for args in call.call_args_list)
-        return not (set(expected_devices) - set(args))
+        if is_up:
+            rpc_devices = [
+                dev for args in call.call_args_list for dev in args[0][1]]
+        else:
+            rpc_devices = [
+                dev for args in call.call_args_list for dev in args[0][2]]
+        return not (set(expected_devices) - set(rpc_devices))
 
-    def _create_ports(self, network, agent):
+    def _create_ports(self, network, agent, trigger_resync=False):
         ports = []
         for x in range(3):
             ports.append(self._create_test_port_dict())
 
+        def mock_device_raise_exception(context, devices_up, devices_down,
+                                        agent_id, host=None):
+            agent.plugin_rpc.update_device_list.side_effect = (
+                mock_update_device)
+            raise Exception('Exception to trigger resync')
+
         def mock_device_details(context, devices, agent_id, host=None):
+            details = []
             for port in ports:
                 if port['id'] in devices:
                     dev = OVSAgentTestFramework._get_device_details(
                         port, network)
                     details.append(dev)
-            return details
+            return {'devices': details, 'failed_devices': []}
 
-        agent.plugin_rpc.get_devices_details_list.side_effect = (
-            mock_device_details)
+        def mock_update_device(context, devices_up, devices_down, agent_id,
+                               host=None):
+            dev_up = []
+            dev_down = []
+            for port in ports:
+                if devices_up and port['id'] in devices_up:
+                    dev_up.append(port['id'])
+                if devices_down and port['id'] in devices_down:
+                    dev_down.append({'device': port['id'], 'exists': True})
+            return {'devices_up': dev_up,
+                    'failed_devices_up': [],
+                    'devices_down': dev_down,
+                    'failed_devices_down': []}
+
+        (agent.plugin_rpc.get_devices_details_list_and_failed_devices.
+            side_effect) = mock_device_details
+        if trigger_resync:
+            agent.plugin_rpc.update_device_list.side_effect = (
+                mock_device_raise_exception)
+        else:
+            agent.plugin_rpc.update_device_list.side_effect = (
+                mock_update_device)
         return ports
 
     def test_port_creation_and_deletion(self):
@@ -250,39 +279,35 @@ class TestOVSAgent(OVSAgentTestFramework):
         up_ports_ids = [p['id'] for p in ports]
         agent_utils.wait_until_true(
             lambda: self._expected_plugin_rpc_call(
-                agent.plugin_rpc.update_device_up, up_ports_ids))
+                agent.plugin_rpc.update_device_list, up_ports_ids))
         down_ports_ids = [p['id'] for p in ports]
         for port in ports:
             agent.int_br.delete_port(port['vif_name'])
         agent_utils.wait_until_true(
             lambda: self._expected_plugin_rpc_call(
-                agent.plugin_rpc.update_device_down, down_ports_ids))
+                agent.plugin_rpc.update_device_list, down_ports_ids, False))
 
     def test_resync_devices_set_up_after_exception(self):
         agent = self.create_agent()
         self.start_agent(agent)
         network = self._create_test_network_dict()
-        ports = self._create_ports(network, agent)
-        agent.plugin_rpc.update_device_up.side_effect = [
-            Exception('Exception to trigger resync'),
-            None, None, None]
+        ports = self._create_ports(network, agent, True)
         self._plug_ports(network, ports, agent)
         ports_ids = [p['id'] for p in ports]
         agent_utils.wait_until_true(
             lambda: self._expected_plugin_rpc_call(
-                agent.plugin_rpc.update_device_up, ports_ids))
+                agent.plugin_rpc.update_device_list, ports_ids))
 
     def test_port_vlan_tags(self):
         agent = self.create_agent()
         self.start_agent(agent)
-        ports = []
-        for x in range(3):
-            ports.append(self._create_test_port_dict())
         network = self._create_test_network_dict()
+        ports = self._create_ports(network, agent)
+        ports_ids = [p['id'] for p in ports]
         self._plug_ports(network, ports, agent)
-        agent.provision_local_vlan(network['id'], 'vlan', 'physnet', 1)
-        self.assert_no_vlan_tags(ports, agent)
-        self._bind_ports(ports, network, agent)
+        agent_utils.wait_until_true(
+            lambda: self._expected_plugin_rpc_call(
+                agent.plugin_rpc.update_device_list, ports_ids))
         self.assert_vlan_tags(ports, agent)
 
     def test_assert_bridges_ports_vxlan(self):
diff --git a/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py b/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py
index 19bcd520d99..9aaa3132f19 100644
--- a/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py
+++ b/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py
@@ -322,16 +322,30 @@ class TestOvsNeutronAgent(object):
             vif_port_set, registered_ports, port_tags_dict=port_tags_dict)
         self.assertEqual(expected, actual)
 
-    def test_treat_devices_added_returns_raises_for_missing_device(self):
-        with mock.patch.object(self.agent.plugin_rpc,
-                               'get_devices_details_list',
-                               side_effect=Exception()),\
-                mock.patch.object(self.agent.int_br,
-                                  'get_vif_port_by_id',
-                                  return_value=mock.Mock()):
-            self.assertRaises(
-                self.mod_agent.DeviceListRetrievalError,
-                self.agent.treat_devices_added_or_updated, [{}], False)
+    def test_bind_devices(self):
+        devices_up = ['tap1']
+        devices_down = ['tap2']
+        self.agent.local_vlan_map["net1"] = mock.Mock()
+        port_details = [
+            {'network_id': 'net1', 'vif_port': mock.Mock(),
+             'device': devices_up[0],
+             'admin_state_up': True},
+            {'network_id': 'net1', 'vif_port': mock.Mock(),
+             'device': devices_down[0],
+             'admin_state_up': False}]
+        with mock.patch.object(
+            self.agent.plugin_rpc, 'update_device_list',
+            return_value={'devices_up': devices_up,
+                          'devices_down': devices_down,
+                          'failed_devices_up': [],
+                          'failed_devices_down': []}) as update_devices, \
+                mock.patch.object(self.agent,
+                                  'int_br') as int_br:
+            int_br.db_list.return_value = []
+            self.agent._bind_devices(port_details)
+            update_devices.assert_called_once_with(mock.ANY, devices_up,
+                                                   devices_down,
+                                                   mock.ANY, mock.ANY)
 
     def _mock_treat_devices_added_updated(self, details, port, func_name):
         """Mock treat devices added or updated.
@@ -342,11 +356,17 @@ class TestOvsNeutronAgent(object):
         :returns: whether the named function was called
         """
         with mock.patch.object(self.agent.plugin_rpc,
-                               'get_devices_details_list',
-                               return_value=[details]),\
+                               'get_devices_details_list_and_failed_devices',
+                               return_value={'devices': [details],
+                                             'failed_devices': None}),\
                 mock.patch.object(self.agent.int_br,
                                   'get_vifs_by_ids',
                                   return_value={details['device']: port}),\
+                mock.patch.object(self.agent.plugin_rpc, 'update_device_list',
+                                  return_value={'devices_up': [],
+                                                'devices_down': details,
+                                                'failed_devices_up': [],
+                                                'failed_devices_down': []}),\
                 mock.patch.object(self.agent, func_name) as func:
             skip_devs, need_bound_devices = (
                 self.agent.treat_devices_added_or_updated([{}], False))
@@ -367,8 +387,9 @@ class TestOvsNeutronAgent(object):
             mock.MagicMock(), port, 'port_dead'))
 
     def test_treat_devices_added_does_not_process_missing_port(self):
-        with mock.patch.object(self.agent.plugin_rpc,
-                               'get_device_details') as get_dev_fn,\
+        with mock.patch.object(
+            self.agent.plugin_rpc,
+            'get_devices_details_list_and_failed_devices') as get_dev_fn,\
                 mock.patch.object(self.agent.int_br,
                                   'get_vif_port_by_id',
                                   return_value=None):
@@ -384,8 +405,9 @@ class TestOvsNeutronAgent(object):
         dev_mock = mock.MagicMock()
         dev_mock.__getitem__.return_value = 'the_skipped_one'
         with mock.patch.object(self.agent.plugin_rpc,
-                               'get_devices_details_list',
-                               return_value=[dev_mock]),\
+                               'get_devices_details_list_and_failed_devices',
+                               return_value={'devices': [dev_mock],
+                                             'failed_devices': None}),\
                 mock.patch.object(self.agent.int_br,
                                   'get_vifs_by_ids',
                                   return_value={}),\
@@ -411,8 +433,9 @@ class TestOvsNeutronAgent(object):
         }
 
         with mock.patch.object(self.agent.plugin_rpc,
-                               'get_devices_details_list',
-                               return_value=[fake_details_dict]),\
+                               'get_devices_details_list_and_failed_devices',
+                               return_value={'devices': [fake_details_dict],
+                                             'failed_devices': None}),\
                 mock.patch.object(self.agent.int_br,
                                   'get_vifs_by_ids',
                                   return_value={'xxx': mock.MagicMock()}),\
@@ -424,15 +447,14 @@ class TestOvsNeutronAgent(object):
             self.assertFalse(skip_devs)
             self.assertTrue(treat_vif_port.called)
 
-    def test_treat_devices_removed_returns_true_for_missing_device(self):
-        with mock.patch.object(self.agent.plugin_rpc, 'update_device_down',
-                               side_effect=Exception()):
-            self.assertTrue(self.agent.treat_devices_removed([{}]))
-
     def _mock_treat_devices_removed(self, port_exists):
         details = dict(exists=port_exists)
-        with mock.patch.object(self.agent.plugin_rpc, 'update_device_down',
-                               return_value=details):
+        with mock.patch.object(self.agent.plugin_rpc,
+                               'update_device_list',
+                               return_value={'devices_up': [],
+                                             'devices_down': details,
+                                             'failed_devices_up': [],
+                                             'failed_devices_down': []}):
             with mock.patch.object(self.agent, 'port_unbound') as port_unbound:
                 self.assertFalse(self.agent.treat_devices_removed([{}]))
         self.assertTrue(port_unbound.called)
@@ -1046,7 +1068,11 @@ class TestOvsNeutronAgent(object):
                              'physical_network', 'segmentation_id',
                              'admin_state_up', 'fixed_ips', 'device',
                              'device_owner')}]
-        self.agent.plugin_rpc.get_devices_details_list.return_value = plist
+        (self.agent.plugin_rpc.get_devices_details_list_and_failed_devices.
+            return_value) = {'devices': plist, 'failed_devices': []}
+        self.agent.plugin_rpc.update_device_list.return_value = {
+            'devices_up': plist, 'devices_down': [], 'failed_devices_up': [],
+            'failed_devices_down': []}
         self.agent.setup_arp_spoofing_protection = mock.Mock()
         self.agent.treat_devices_added_or_updated([], False)
         self.assertFalse(self.agent.setup_arp_spoofing_protection.called)
@@ -1664,9 +1690,12 @@ class TestOvsDvrNeutronAgent(object):
         int_br.reset_mock()
         tun_br.reset_mock()
         with mock.patch.object(self.agent, 'reclaim_local_vlan'),\
-                mock.patch.object(self.agent.plugin_rpc,
-                                  'update_device_down',
-                                  return_value=None),\
+                mock.patch.object(self.agent.plugin_rpc, 'update_device_list',
+                                  return_value={
+                                      'devices_up': [],
+                                      'devices_down': [self._port.vif_id],
+                                      'failed_devices_up': [],
+                                      'failed_devices_down': []}),\
                 mock.patch.object(self.agent, 'int_br', new=int_br),\
                 mock.patch.object(self.agent, 'tun_br', new=tun_br),\
                 mock.patch.object(self.agent.dvr_agent, 'int_br', new=int_br),\
@@ -1766,9 +1795,13 @@ class TestOvsDvrNeutronAgent(object):
         int_br.reset_mock()
         tun_br.reset_mock()
         with mock.patch.object(self.agent, 'reclaim_local_vlan'),\
-                mock.patch.object(self.agent.plugin_rpc,
-                                  'update_device_down',
-                                  return_value=None),\
+                mock.patch.object(self.agent.plugin_rpc, 'update_device_list',
+                                  return_value={
+                                      'devices_up': [],
+                                      'devices_down': [
+                                          self._compute_port.vif_id],
+                                      'failed_devices_up': [],
+                                      'failed_devices_down': []}),\
                 mock.patch.object(self.agent, 'int_br', new=int_br),\
                 mock.patch.object(self.agent, 'tun_br', new=tun_br),\
                 mock.patch.object(self.agent.dvr_agent, 'int_br', new=int_br),\
@@ -1853,9 +1886,12 @@ class TestOvsDvrNeutronAgent(object):
         int_br.reset_mock()
         tun_br.reset_mock()
         with mock.patch.object(self.agent, 'reclaim_local_vlan'),\
-                mock.patch.object(self.agent.plugin_rpc,
-                                  'update_device_down',
-                                  return_value=None),\
+                mock.patch.object(self.agent.plugin_rpc, 'update_device_list',
+                                  return_value={
+                                      'devices_up': [],
+                                      'devices_down': [self._port.vif_id],
+                                      'failed_devices_up': [],
+                                      'failed_devices_down': []}),\
                 mock.patch.object(self.agent, 'int_br', new=int_br),\
                 mock.patch.object(self.agent, 'tun_br', new=tun_br),\
                 mock.patch.object(self.agent.dvr_agent, 'int_br', new=int_br),\
diff --git a/neutron/tests/unit/plugins/ml2/test_rpc.py b/neutron/tests/unit/plugins/ml2/test_rpc.py
index f0e1a360322..72775b9fe80 100644
--- a/neutron/tests/unit/plugins/ml2/test_rpc.py
+++ b/neutron/tests/unit/plugins/ml2/test_rpc.py
@@ -22,6 +22,7 @@ import collections
 import mock
 from oslo_config import cfg
 from oslo_context import context as oslo_context
+import oslo_messaging
 from sqlalchemy.orm import exc
 
 from neutron.agent import rpc as agent_rpc
@@ -134,27 +135,53 @@ class RpcCallbacksTestCase(base.BaseTestCase):
         self.callbacks.get_device_details(mock.Mock())
         self.assertTrue(self.plugin.update_port_status.called)
 
-    def test_get_devices_details_list(self):
+    def _test_get_devices_list(self, callback, side_effect, expected):
         devices = [1, 2, 3, 4, 5]
         kwargs = {'host': 'fake_host', 'agent_id': 'fake_agent_id'}
         with mock.patch.object(self.callbacks, 'get_device_details',
-                               side_effect=devices) as f:
-            res = self.callbacks.get_devices_details_list('fake_context',
-                                                          devices=devices,
-                                                          **kwargs)
-        self.assertEqual(devices, res)
+                               side_effect=side_effect) as f:
+            res = callback('fake_context', devices=devices, **kwargs)
+        self.assertEqual(expected, res)
         self.assertEqual(len(devices), f.call_count)
         calls = [mock.call('fake_context', device=i, cached_networks={},
                            **kwargs) for i in devices]
         f.assert_has_calls(calls)
 
+    def test_get_devices_details_list(self):
+        devices = [1, 2, 3, 4, 5]
+        expected = devices
+        callback = self.callbacks.get_devices_details_list
+        self._test_get_devices_list(callback, devices, expected)
+
     def test_get_devices_details_list_with_empty_devices(self):
         with mock.patch.object(self.callbacks, 'get_device_details') as f:
             res = self.callbacks.get_devices_details_list('fake_context')
         self.assertFalse(f.called)
         self.assertEqual([], res)
 
+    def test_get_devices_details_list_and_failed_devices(self):
+        devices = [1, 2, 3, 4, 5]
+        expected = {'devices': devices, 'failed_devices': []}
+        callback = (
+            self.callbacks.get_devices_details_list_and_failed_devices)
+        self._test_get_devices_list(callback, devices, expected)
+
+    def test_get_devices_details_list_and_failed_devices_failures(self):
+        devices = [1, Exception('testdevice'), 3,
+                   Exception('testdevice'), 5]
+        expected = {'devices': [1, 3, 5], 'failed_devices': [2, 4]}
+        callback = (
+            self.callbacks.get_devices_details_list_and_failed_devices)
+        self._test_get_devices_list(callback, devices, expected)
+
+    def test_get_devices_details_list_and_failed_devices_empty_dev(self):
+        with mock.patch.object(self.callbacks, 'get_device_details') as f:
+            res = self.callbacks.get_devices_details_list_and_failed_devices(
+                'fake_context')
+        self.assertFalse(f.called)
+        self.assertEqual({'devices': [], 'failed_devices': []}, res)
+
     def _test_update_device_not_bound_to_host(self, func):
         self.plugin.port_bound_to_host.return_value = False
         self.plugin._device_to_port_id.return_value = 'fake_port_id'
@@ -192,6 +219,64 @@ class RpcCallbacksTestCase(base.BaseTestCase):
             self.callbacks.update_device_down(
                 mock.Mock(), device='fake_device'))
 
+    def _test_update_device_list(self, devices_up_side_effect,
+                                 devices_down_side_effect, expected):
+        devices_up = [1, 2, 3]
+        devices_down = [4, 5]
+        kwargs = {'host': 'fake_host', 'agent_id': 'fake_agent_id'}
+        with mock.patch.object(self.callbacks, 'update_device_up',
+                               side_effect=devices_up_side_effect) as f_up, \
+            mock.patch.object(self.callbacks, 'update_device_down',
+                              side_effect=devices_down_side_effect) as f_down:
+            res = self.callbacks.update_device_list(
+                'fake_context', devices_up=devices_up,
+                devices_down=devices_down, **kwargs)
+        self.assertEqual(expected, res)
+        self.assertEqual(len(devices_up), f_up.call_count)
+        self.assertEqual(len(devices_down), f_down.call_count)
+
+    def test_update_device_list_no_failure(self):
+        devices_up_side_effect = [1, 2, 3]
+        devices_down_side_effect = [
+            {'device': 4, 'exists': True},
+            {'device': 5, 'exists': True}]
+        expected = {'devices_up': devices_up_side_effect,
+                    'failed_devices_up': [],
+                    'devices_down':
+                        [{'device': 4, 'exists': True},
+                         {'device': 5, 'exists': True}],
+                    'failed_devices_down': []}
+        self._test_update_device_list(devices_up_side_effect,
+                                      devices_down_side_effect,
+                                      expected)
+
+    def test_update_device_list_failed_devices(self):
+
+        devices_up_side_effect = [1, Exception('testdevice'), 3]
+        devices_down_side_effect = [{'device': 4, 'exists': True},
+                                    Exception('testdevice')]
+        expected = {'devices_up': [1, 3],
+                    'failed_devices_up': [2],
+                    'devices_down':
+                        [{'device': 4, 'exists': True}],
+                    'failed_devices_down': [5]}
+
+        self._test_update_device_list(devices_up_side_effect,
+                                      devices_down_side_effect,
+                                      expected)
+
+    def test_update_device_list_empty_devices(self):
+
+        expected = {'devices_up': [],
+                    'failed_devices_up': [],
+                    'devices_down': [],
+                    'failed_devices_down': []}
+
+        kwargs = {'host': 'fake_host', 'agent_id': 'fake_agent_id'}
+        res = self.callbacks.update_device_list(
+            'fake_context', devices_up=[], devices_down=[], **kwargs)
+        self.assertEqual(expected, res)
+
 
 class RpcApiTestCase(base.BaseTestCase):
@@ -314,3 +399,73 @@ class RpcApiTestCase(base.BaseTestCase):
                            device='fake_device',
                            agent_id='fake_agent_id',
                            host='fake_host')
+
+    def test_update_device_list(self):
+        rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
+        self._test_rpc_api(rpcapi, None,
+                           'update_device_list', rpc_method='call',
+                           devices_up=['fake_device1', 'fake_device2'],
+                           devices_down=['fake_device3', 'fake_device4'],
+                           agent_id='fake_agent_id',
+                           host='fake_host',
+                           version='1.5')
+
+    def test_update_device_list_unsupported(self):
+        rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
+        ctxt = oslo_context.RequestContext('fake_user', 'fake_project')
+        devices_up = ['fake_device1', 'fake_device2']
+        devices_down = ['fake_device3', 'fake_device4']
+        expected_ret_val = {'devices_up': ['fake_device2'],
+                            'failed_devices_up': ['fake_device1'],
+                            'devices_down': [
+                                {'device': 'fake_device3', 'exists': True}],
+                            'failed_devices_down': ['fake_device4']}
+        rpcapi.update_device_up = mock.Mock(
+            side_effect=[Exception('fake_device1 fails'), None])
+        rpcapi.update_device_down = mock.Mock(
+            side_effect=[{'device': 'fake_device3', 'exists': True},
+                         Exception('fake_device4 fails')])
+        with mock.patch.object(rpcapi.client, 'call'),\
+                mock.patch.object(rpcapi.client, 'prepare') as prepare_mock:
+            prepare_mock.side_effect = oslo_messaging.UnsupportedVersion(
+                'test')
+            res = rpcapi.update_device_list(ctxt, devices_up, devices_down,
+                                            'fake_agent_id', 'fake_host')
+            self.assertEqual(expected_ret_val, res)
+
+    def test_get_devices_details_list_and_failed_devices(self):
+        rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
+        self._test_rpc_api(rpcapi, None,
+                           'get_devices_details_list_and_failed_devices',
+                           rpc_method='call',
+                           devices=['fake_device1', 'fake_device2'],
+                           agent_id='fake_agent_id',
+                           host='fake_host',
+                           version='1.5')
+
+    def test_devices_details_list_and_failed_devices(self):
+        rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
+        self._test_rpc_api(rpcapi, None,
+                           'get_devices_details_list_and_failed_devices',
+                           rpc_method='call',
+                           devices=['fake_device1', 'fake_device2'],
+                           agent_id='fake_agent_id', host='fake_host',
+                           version='1.5')
+
+    def test_get_devices_details_list_and_failed_devices_unsupported(self):
+        rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
+        ctxt = oslo_context.RequestContext('fake_user', 'fake_project')
+        devices = ['fake_device1', 'fake_device2']
+        dev2_details = {'device': 'fake_device2', 'network_id': 'net_id',
+                        'port_id': 'port_id', 'admin_state_up': True}
+        expected_ret_val = {'devices': [dev2_details],
+                            'failed_devices': ['fake_device1']}
+        rpcapi.get_device_details = mock.Mock(
+            side_effect=[Exception('fake_device1 fails'), dev2_details])
+        with mock.patch.object(rpcapi.client, 'call'),\
+                mock.patch.object(rpcapi.client, 'prepare') as prepare_mock:
+            prepare_mock.side_effect = oslo_messaging.UnsupportedVersion(
+                'test')
+            res = rpcapi.get_devices_details_list_and_failed_devices(
+                ctxt, devices, 'fake_agent_id', 'fake_host')
+            self.assertEqual(expected_ret_val, res)