From ced2e63cb8e287b9649edae7183b25a92097b3ed Mon Sep 17 00:00:00 2001 From: rossella Date: Wed, 14 Jan 2015 15:11:36 +0100 Subject: [PATCH] L2 agent RPC add new RPC calls This patch introduces two new RPC calls: * update_device_list * get_devices_details_list_and_failed_devices update_device_up and update_device_down are the RPC calls used by the L2 agent to notify the plugin that a device is up or down. These calls accept only one device. update_device_list accepts a list of devices that will be put up or down and returns a list of devices for which the operation failed. get_devices_details_list_and_failed_devices is a new RPC call that returns a list of devices details and a list of devices for which it was not possible to retrieve the details. Thanks to these new calls, the agent will be able to update the status of several devices using one RPC call and to retry the operation only for the devices that failed instead of performing a full resync. This will be implemented in another patch. This patch introduces the new calls only for the OVS agent. A following patch will take care of the other L2 agents. DocImpact Partially-Implements: blueprint restructure-l2-agent Change-Id: Idb4e81f4399cf66025ec7edfcb78801c15e493af --- neutron/agent/rpc.py | 57 ++++++ .../openvswitch/agent/ovs_neutron_agent.py | 141 ++++++++------- neutron/plugins/ml2/rpc.py | 65 ++++++- .../functional/agent/test_l2_ovs_agent.py | 75 +++++--- .../agent/test_ovs_neutron_agent.py | 106 +++++++---- neutron/tests/unit/plugins/ml2/test_rpc.py | 167 +++++++++++++++++- 6 files changed, 484 insertions(+), 127 deletions(-) diff --git a/neutron/agent/rpc.py b/neutron/agent/rpc.py index 11bf79784c5..8a05a291574 100644 --- a/neutron/agent/rpc.py +++ b/neutron/agent/rpc.py @@ -95,6 +95,8 @@ class PluginApi(object): return value to include fixed_ips and device_owner for the device port 1.4 - tunnel_sync rpc signature upgrade to obtain 'host' + 1.5 - Support update_device_list and + get_devices_details_list_and_failed_devices ''' def __init__(self, topic): @@ -123,6 +125,26 @@ class PluginApi(object): ] return res + def get_devices_details_list_and_failed_devices(self, context, devices, + agent_id, host=None): + """Get devices details and the list of devices that failed. + + This method returns the devices details. If an error is thrown when + retrieving the devices details, the device is put in a list of + failed devices. + """ + try: + cctxt = self.client.prepare(version='1.5') + res = cctxt.call( + context, + 'get_devices_details_list_and_failed_devices', + devices=devices, agent_id=agent_id, host=host) + except oslo_messaging.UnsupportedVersion: + #TODO(rossella_s): Remove this failback logic in M + res = self._device_list_rpc_call_with_failed_dev( + self.get_device_details, context, agent_id, host, devices) + return res + def update_device_down(self, context, device, agent_id, host=None): cctxt = self.client.prepare() return cctxt.call(context, 'update_device_down', device=device, @@ -133,6 +155,41 @@ class PluginApi(object): return cctxt.call(context, 'update_device_up', device=device, agent_id=agent_id, host=host) + def _device_list_rpc_call_with_failed_dev(self, rpc_call, context, + agent_id, host, devices): + succeeded_devices = [] + failed_devices = [] + for device in devices: + try: + rpc_device = rpc_call(context, device, agent_id, host) + except Exception: + failed_devices.append(device) + else: + # update_device_up doesn't return the device + succeeded_dev = rpc_device or device + succeeded_devices.append(succeeded_dev) + return {'devices': succeeded_devices, 'failed_devices': failed_devices} + + def update_device_list(self, context, devices_up, devices_down, + agent_id, host): + try: + cctxt = self.client.prepare(version='1.5') + res = cctxt.call(context, 'update_device_list', + devices_up=devices_up, devices_down=devices_down, + agent_id=agent_id, host=host) + except oslo_messaging.UnsupportedVersion: + #TODO(rossella_s): Remove this failback logic in M + dev_up = self._device_list_rpc_call_with_failed_dev( + self.update_device_up, context, agent_id, host, devices_up) + dev_down = self._device_list_rpc_call_with_failed_dev( + self.update_device_down, context, agent_id, host, devices_down) + + res = {'devices_up': dev_up.get('devices'), + 'failed_devices_up': dev_up.get('failed_devices'), + 'devices_down': dev_down.get('devices'), + 'failed_devices_down': dev_down.get('failed_devices')} + return res + def tunnel_sync(self, context, tunnel_ip, tunnel_type=None, host=None): try: cctxt = self.client.prepare(version='1.4') diff --git a/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py b/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py index d39959da5f6..fe9f8a15b41 100644 --- a/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py +++ b/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py @@ -64,8 +64,7 @@ class _mac_mydialect(netaddr.mac_unix): class DeviceListRetrievalError(exceptions.NeutronException): - message = _("Unable to retrieve port details for devices: %(devices)s " - "because of error: %(error)s") + message = _("Unable to retrieve port details for devices: %(devices)s ") # A class to represent a VIF (i.e., a port that has 'iface-id' and 'vif-mac' @@ -740,6 +739,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, port_other_config) def _bind_devices(self, need_binding_ports): + devices_up = [] + devices_down = [] port_info = self.int_br.db_list( "Port", columns=["name", "tag"]) tags_by_name = {x['name']: x['tag'] for x in port_info} @@ -768,13 +769,26 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, # API server, thus possibly preventing instance spawn. if port_detail.get('admin_state_up'): LOG.debug("Setting status for %s to UP", device) - self.plugin_rpc.update_device_up( - self.context, device, self.agent_id, self.conf.host) + devices_up.append(device) else: LOG.debug("Setting status for %s to DOWN", device) - self.plugin_rpc.update_device_down( - self.context, device, self.agent_id, self.conf.host) - LOG.info(_LI("Configuration for device %s completed."), device) + devices_down.append(device) + failed_devices = [] + if devices_up or devices_down: + devices_set = self.plugin_rpc.update_device_list( + self.context, devices_up, devices_down, self.agent_id, + self.conf.host) + failed_devices = (devices_set.get('failed_devices_up') + + devices_set.get('failed_devices_down')) + if failed_devices: + LOG.error(_LE("Configuration for devices %s failed!"), + failed_devices) + #TODO(rossella_s) handle better the resync in next patches, + # this is just to preserve the current behavior + raise DeviceListRetrievalError(devices=failed_devices) + LOG.info(_LI("Configuration for devices up %(up)s and devices " + "down %(down)s completed."), + {'up': devices_up, 'down': devices_down}) @staticmethod def setup_arp_spoofing_protection(bridge, vif, port_details): @@ -1201,17 +1215,21 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, def treat_devices_added_or_updated(self, devices, ovs_restarted): skipped_devices = [] need_binding_devices = [] - try: - devices_details_list = self.plugin_rpc.get_devices_details_list( + devices_details_list = ( + self.plugin_rpc.get_devices_details_list_and_failed_devices( self.context, devices, self.agent_id, - self.conf.host) - except Exception as e: - raise DeviceListRetrievalError(devices=devices, error=e) + self.conf.host)) + if devices_details_list.get('failed_devices'): + #TODO(rossella_s) handle better the resync in next patches, + # this is just to preserve the current behavior + raise DeviceListRetrievalError(devices=devices) + + devices = devices_details_list.get('devices') vif_by_id = self.int_br.get_vifs_by_ids( - [vif['device'] for vif in devices_details_list]) - for details in devices_details_list: + [vif['device'] for vif in devices]) + for details in devices: device = details['device'] LOG.debug("Processing port: %s", device) port = vif_by_id.get(device) @@ -1247,62 +1265,67 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, return skipped_devices, need_binding_devices def treat_ancillary_devices_added(self, devices): - try: - devices_details_list = self.plugin_rpc.get_devices_details_list( + devices_details_list = ( + self.plugin_rpc.get_devices_details_list_and_failed_devices( self.context, devices, self.agent_id, - self.conf.host) - except Exception as e: - raise DeviceListRetrievalError(devices=devices, error=e) + self.conf.host)) + if devices_details_list.get('failed_devices'): + #TODO(rossella_s) handle better the resync in next patches, + # this is just to preserve the current behavior + raise DeviceListRetrievalError(devices=devices) + devices_added = [ + d['device'] for d in devices_details_list.get('devices')] + LOG.info(_LI("Ancillary Ports %s added"), devices_added) - for details in devices_details_list: - device = details['device'] - LOG.info(_LI("Ancillary Port %s added"), device) - - # update plugin about port status - self.plugin_rpc.update_device_up(self.context, - device, - self.agent_id, - self.conf.host) + # update plugin about port status + devices_set_up = ( + self.plugin_rpc.update_device_list(self.context, + devices_added, + [], + self.agent_id, + self.conf.host)) + if devices_set_up.get('failed_devices_up'): + #TODO(rossella_s) handle better the resync in next patches, + # this is just to preserve the current behavior + raise DeviceListRetrievalError() def treat_devices_removed(self, devices): resync = False self.sg_agent.remove_devices_filter(devices) + LOG.info(_LI("Ports %s removed"), devices) + devices_down = self.plugin_rpc.update_device_list(self.context, + [], + devices, + self.agent_id, + self.conf.host) + failed_devices = devices_down.get('failed_devices_down') + if failed_devices: + LOG.debug("Port removal failed for %(devices)s ", failed_devices) + resync = True for device in devices: - LOG.info(_LI("Attachment %s removed"), device) - try: - self.plugin_rpc.update_device_down(self.context, - device, - self.agent_id, - self.conf.host) - except Exception as e: - LOG.debug("port_removed failed for %(device)s: %(e)s", - {'device': device, 'e': e}) - resync = True - continue self.port_unbound(device) return resync def treat_ancillary_devices_removed(self, devices): resync = False - for device in devices: - LOG.info(_LI("Attachment %s removed"), device) - try: - details = self.plugin_rpc.update_device_down(self.context, - device, - self.agent_id, - self.conf.host) - except Exception as e: - LOG.debug("port_removed failed for %(device)s: %(e)s", - {'device': device, 'e': e}) - resync = True - continue - if details['exists']: - LOG.info(_LI("Port %s updated."), device) + LOG.info(_LI("Ancillary ports %s removed"), devices) + devices_down = self.plugin_rpc.update_device_list(self.context, + [], + devices, + self.agent_id, + self.conf.host) + failed_devices = devices_down.get('failed_devices_down') + if failed_devices: + LOG.debug("Port removal failed for %(devices)s ", failed_devices) + resync = True + for detail in devices_down.get('devices_down'): + if detail['exists']: + LOG.info(_LI("Port %s updated."), detail['device']) # Nothing to do regarding local networking else: - LOG.debug("Device %s not defined on plugin", device) + LOG.debug("Device %s not defined on plugin", detail['device']) return resync def process_network_ports(self, port_info, ovs_restarted): @@ -1355,7 +1378,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, port_info.get('updated', set())) self._bind_devices(need_binding_devices) - if 'removed' in port_info: + if 'removed' in port_info and port_info['removed']: start = time.time() resync_b = self.treat_devices_removed(port_info['removed']) LOG.debug("process_network_ports - iteration:%(iter_num)d - " @@ -1368,15 +1391,15 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, def process_ancillary_network_ports(self, port_info): resync_a = False resync_b = False - if 'added' in port_info: + if 'added' in port_info and port_info['added']: start = time.time() try: self.treat_ancillary_devices_added(port_info['added']) LOG.debug("process_ancillary_network_ports - iteration: " "%(iter_num)d - treat_ancillary_devices_added " "completed in %(elapsed).3f", - {'iter_num': self.iter_num, - 'elapsed': time.time() - start}) + {'iter_num': self.iter_num, + 'elapsed': time.time() - start}) except DeviceListRetrievalError: # Need to resync as there was an error with server # communication. @@ -1384,7 +1407,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, "iteration:%d - failure while retrieving " "port details from server"), self.iter_num) resync_a = True - if 'removed' in port_info: + if 'removed' in port_info and port_info['removed']: start = time.time() resync_b = self.treat_ancillary_devices_removed( port_info['removed']) diff --git a/neutron/plugins/ml2/rpc.py b/neutron/plugins/ml2/rpc.py index 4187da6864e..4f5c10848c8 100644 --- a/neutron/plugins/ml2/rpc.py +++ b/neutron/plugins/ml2/rpc.py @@ -28,7 +28,7 @@ from neutron.common import rpc as n_rpc from neutron.common import topics from neutron.extensions import portbindings from neutron.extensions import portsecurity as psec -from neutron.i18n import _LW +from neutron.i18n import _LE, _LW from neutron import manager from neutron.plugins.ml2 import driver_api as api from neutron.plugins.ml2.drivers import type_tunnel @@ -48,7 +48,9 @@ class RpcCallbacks(type_tunnel.TunnelRpcCallbackMixin): # return value to include fixed_ips and device_owner for # the device port # 1.4 tunnel_sync rpc signature upgrade to obtain 'host' - target = oslo_messaging.Target(version='1.4') + # 1.5 Support update_device_list and + # get_devices_details_list_and_failed_devices + target = oslo_messaging.Target(version='1.5') def __init__(self, notifier, type_manager): self.setup_tunnel_callback_mixin(notifier, type_manager) @@ -135,6 +137,27 @@ class RpcCallbacks(type_tunnel.TunnelRpcCallbackMixin): for device in kwargs.pop('devices', []) ] + def get_devices_details_list_and_failed_devices(self, + rpc_context, + **kwargs): + devices = [] + failed_devices = [] + cached_networks = {} + for device in kwargs.pop('devices', []): + try: + devices.append(self.get_device_details( + rpc_context, + device=device, + cached_networks=cached_networks, + **kwargs)) + except Exception: + LOG.error(_LE("Failed to get details for device %s"), + device) + failed_devices.append(device) + + return {'devices': devices, + 'failed_devices': failed_devices} + def update_device_down(self, rpc_context, **kwargs): """Device no longer exists on agent.""" # TODO(garyk) - live migration and port status @@ -201,6 +224,44 @@ class RpcCallbacks(type_tunnel.TunnelRpcCallbackMixin): registry.notify( resources.PORT, events.AFTER_UPDATE, plugin, **kwargs) + def update_device_list(self, rpc_context, **kwargs): + devices_up = [] + failed_devices_up = [] + devices_down = [] + failed_devices_down = [] + devices = kwargs.get('devices_up') + if devices: + for device in devices: + try: + self.update_device_up( + rpc_context, + device=device, + **kwargs) + except Exception: + failed_devices_up.append(device) + LOG.error(_LE("Failed to update device %s up"), device) + else: + devices_up.append(device) + + devices = kwargs.get('devices_down') + if devices: + for device in devices: + try: + dev = self.update_device_down( + rpc_context, + device=device, + **kwargs) + except Exception: + failed_devices_down.append(device) + LOG.error(_LE("Failed to update device %s down"), device) + else: + devices_down.append(dev) + + return {'devices_up': devices_up, + 'failed_devices_up': failed_devices_up, + 'devices_down': devices_down, + 'failed_devices_down': failed_devices_down} + class AgentNotifierApi(dvr_rpc.DVRAgentRpcApiMixin, sg_rpc.SecurityGroupAgentRpcApiMixin, diff --git a/neutron/tests/functional/agent/test_l2_ovs_agent.py b/neutron/tests/functional/agent/test_l2_ovs_agent.py index 6deaab64e2e..db57e18b18c 100644 --- a/neutron/tests/functional/agent/test_l2_ovs_agent.py +++ b/neutron/tests/functional/agent/test_l2_ovs_agent.py @@ -202,11 +202,6 @@ class OVSAgentTestFramework(base.BaseOVSLinuxTestCase): for port in [self.patch_tun, self.patch_int]: self.assertTrue(self.ovs.port_exists(port)) - def assert_no_vlan_tags(self, ports, agent): - for port in ports: - res = agent.int_br.db_get_val('Port', port.get('vif_name'), 'tag') - self.assertEqual([], res) - def assert_vlan_tags(self, ports, agent): for port in ports: res = agent.int_br.db_get_val('Port', port.get('vif_name'), 'tag') @@ -215,30 +210,64 @@ class OVSAgentTestFramework(base.BaseOVSLinuxTestCase): class TestOVSAgent(OVSAgentTestFramework): - def _expected_plugin_rpc_call(self, call, expected_devices): + def _expected_plugin_rpc_call(self, call, expected_devices, is_up=True): """Helper to check expected rpc call are received :param call: The call to check :param expected_devices The device for which call is expected + :param is_up True if expected_devices are devices that are set up, + False if expected_devices are devices that are set down """ - args = (args[0][1] for args in call.call_args_list) - return not (set(expected_devices) - set(args)) + if is_up: + rpc_devices = [ + dev for args in call.call_args_list for dev in args[0][1]] + else: + rpc_devices = [ + dev for args in call.call_args_list for dev in args[0][2]] + return not (set(expected_devices) - set(rpc_devices)) - def _create_ports(self, network, agent): + def _create_ports(self, network, agent, trigger_resync=False): ports = [] for x in range(3): ports.append(self._create_test_port_dict()) + def mock_device_raise_exception(context, devices_up, devices_down, + agent_id, host=None): + agent.plugin_rpc.update_device_list.side_effect = ( + mock_update_device) + raise Exception('Exception to trigger resync') + def mock_device_details(context, devices, agent_id, host=None): + details = [] for port in ports: if port['id'] in devices: dev = OVSAgentTestFramework._get_device_details( port, network) details.append(dev) - return details + return {'devices': details, 'failed_devices': []} - agent.plugin_rpc.get_devices_details_list.side_effect = ( - mock_device_details) + def mock_update_device(context, devices_up, devices_down, agent_id, + host=None): + dev_up = [] + dev_down = [] + for port in ports: + if devices_up and port['id'] in devices_up: + dev_up.append(port['id']) + if devices_down and port['id'] in devices_down: + dev_down.append({'device': port['id'], 'exists': True}) + return {'devices_up': dev_up, + 'failed_devices_up': [], + 'devices_down': dev_down, + 'failed_devices_down': []} + + (agent.plugin_rpc.get_devices_details_list_and_failed_devices. + side_effect) = mock_device_details + if trigger_resync: + agent.plugin_rpc.update_device_list.side_effect = ( + mock_device_raise_exception) + else: + agent.plugin_rpc.update_device_list.side_effect = ( + mock_update_device) return ports def test_port_creation_and_deletion(self): @@ -250,39 +279,35 @@ class TestOVSAgent(OVSAgentTestFramework): up_ports_ids = [p['id'] for p in ports] agent_utils.wait_until_true( lambda: self._expected_plugin_rpc_call( - agent.plugin_rpc.update_device_up, up_ports_ids)) + agent.plugin_rpc.update_device_list, up_ports_ids)) down_ports_ids = [p['id'] for p in ports] for port in ports: agent.int_br.delete_port(port['vif_name']) agent_utils.wait_until_true( lambda: self._expected_plugin_rpc_call( - agent.plugin_rpc.update_device_down, down_ports_ids)) + agent.plugin_rpc.update_device_list, down_ports_ids, False)) def test_resync_devices_set_up_after_exception(self): agent = self.create_agent() self.start_agent(agent) network = self._create_test_network_dict() - ports = self._create_ports(network, agent) - agent.plugin_rpc.update_device_up.side_effect = [ - Exception('Exception to trigger resync'), - None, None, None] + ports = self._create_ports(network, agent, True) self._plug_ports(network, ports, agent) ports_ids = [p['id'] for p in ports] agent_utils.wait_until_true( lambda: self._expected_plugin_rpc_call( - agent.plugin_rpc.update_device_up, ports_ids)) + agent.plugin_rpc.update_device_list, ports_ids)) def test_port_vlan_tags(self): agent = self.create_agent() self.start_agent(agent) - ports = [] - for x in range(3): - ports.append(self._create_test_port_dict()) network = self._create_test_network_dict() + ports = self._create_ports(network, agent) + ports_ids = [p['id'] for p in ports] self._plug_ports(network, ports, agent) - agent.provision_local_vlan(network['id'], 'vlan', 'physnet', 1) - self.assert_no_vlan_tags(ports, agent) - self._bind_ports(ports, network, agent) + agent_utils.wait_until_true( + lambda: self._expected_plugin_rpc_call( + agent.plugin_rpc.update_device_list, ports_ids)) self.assert_vlan_tags(ports, agent) def test_assert_bridges_ports_vxlan(self): diff --git a/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py b/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py index 19bcd520d99..9aaa3132f19 100644 --- a/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py +++ b/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py @@ -322,16 +322,30 @@ class TestOvsNeutronAgent(object): vif_port_set, registered_ports, port_tags_dict=port_tags_dict) self.assertEqual(expected, actual) - def test_treat_devices_added_returns_raises_for_missing_device(self): - with mock.patch.object(self.agent.plugin_rpc, - 'get_devices_details_list', - side_effect=Exception()),\ - mock.patch.object(self.agent.int_br, - 'get_vif_port_by_id', - return_value=mock.Mock()): - self.assertRaises( - self.mod_agent.DeviceListRetrievalError, - self.agent.treat_devices_added_or_updated, [{}], False) + def test_bind_devices(self): + devices_up = ['tap1'] + devices_down = ['tap2'] + self.agent.local_vlan_map["net1"] = mock.Mock() + port_details = [ + {'network_id': 'net1', 'vif_port': mock.Mock(), + 'device': devices_up[0], + 'admin_state_up': True}, + {'network_id': 'net1', 'vif_port': mock.Mock(), + 'device': devices_down[0], + 'admin_state_up': False}] + with mock.patch.object( + self.agent.plugin_rpc, 'update_device_list', + return_value={'devices_up': devices_up, + 'devices_down': devices_down, + 'failed_devices_up': [], + 'failed_devices_down': []}) as update_devices, \ + mock.patch.object(self.agent, + 'int_br') as int_br: + int_br.db_list.return_value = [] + self.agent._bind_devices(port_details) + update_devices.assert_called_once_with(mock.ANY, devices_up, + devices_down, + mock.ANY, mock.ANY) def _mock_treat_devices_added_updated(self, details, port, func_name): """Mock treat devices added or updated. @@ -342,11 +356,17 @@ class TestOvsNeutronAgent(object): :returns: whether the named function was called """ with mock.patch.object(self.agent.plugin_rpc, - 'get_devices_details_list', - return_value=[details]),\ + 'get_devices_details_list_and_failed_devices', + return_value={'devices': [details], + 'failed_devices': None}),\ mock.patch.object(self.agent.int_br, 'get_vifs_by_ids', return_value={details['device']: port}),\ + mock.patch.object(self.agent.plugin_rpc, 'update_device_list', + return_value={'devices_up': [], + 'devices_down': details, + 'failed_devices_up': [], + 'failed_devices_down': []}),\ mock.patch.object(self.agent, func_name) as func: skip_devs, need_bound_devices = ( self.agent.treat_devices_added_or_updated([{}], False)) @@ -367,8 +387,9 @@ class TestOvsNeutronAgent(object): mock.MagicMock(), port, 'port_dead')) def test_treat_devices_added_does_not_process_missing_port(self): - with mock.patch.object(self.agent.plugin_rpc, - 'get_device_details') as get_dev_fn,\ + with mock.patch.object( + self.agent.plugin_rpc, + 'get_devices_details_list_and_failed_devices') as get_dev_fn,\ mock.patch.object(self.agent.int_br, 'get_vif_port_by_id', return_value=None): @@ -384,8 +405,9 @@ class TestOvsNeutronAgent(object): dev_mock = mock.MagicMock() dev_mock.__getitem__.return_value = 'the_skipped_one' with mock.patch.object(self.agent.plugin_rpc, - 'get_devices_details_list', - return_value=[dev_mock]),\ + 'get_devices_details_list_and_failed_devices', + return_value={'devices': [dev_mock], + 'failed_devices': None}),\ mock.patch.object(self.agent.int_br, 'get_vifs_by_ids', return_value={}),\ @@ -411,8 +433,9 @@ class TestOvsNeutronAgent(object): } with mock.patch.object(self.agent.plugin_rpc, - 'get_devices_details_list', - return_value=[fake_details_dict]),\ + 'get_devices_details_list_and_failed_devices', + return_value={'devices': [fake_details_dict], + 'failed_devices': None}),\ mock.patch.object(self.agent.int_br, 'get_vifs_by_ids', return_value={'xxx': mock.MagicMock()}),\ @@ -424,15 +447,14 @@ class TestOvsNeutronAgent(object): self.assertFalse(skip_devs) self.assertTrue(treat_vif_port.called) - def test_treat_devices_removed_returns_true_for_missing_device(self): - with mock.patch.object(self.agent.plugin_rpc, 'update_device_down', - side_effect=Exception()): - self.assertTrue(self.agent.treat_devices_removed([{}])) - def _mock_treat_devices_removed(self, port_exists): details = dict(exists=port_exists) - with mock.patch.object(self.agent.plugin_rpc, 'update_device_down', - return_value=details): + with mock.patch.object(self.agent.plugin_rpc, + 'update_device_list', + return_value={'devices_up': [], + 'devices_down': details, + 'failed_devices_up': [], + 'failed_devices_down': []}): with mock.patch.object(self.agent, 'port_unbound') as port_unbound: self.assertFalse(self.agent.treat_devices_removed([{}])) self.assertTrue(port_unbound.called) @@ -1046,7 +1068,11 @@ class TestOvsNeutronAgent(object): 'physical_network', 'segmentation_id', 'admin_state_up', 'fixed_ips', 'device', 'device_owner')}] - self.agent.plugin_rpc.get_devices_details_list.return_value = plist + (self.agent.plugin_rpc.get_devices_details_list_and_failed_devices. + return_value) = {'devices': plist, 'failed_devices': []} + self.agent.plugin_rpc.update_device_list.return_value = { + 'devices_up': plist, 'devices_down': [], 'failed_devices_up': [], + 'failed_devices_down': []} self.agent.setup_arp_spoofing_protection = mock.Mock() self.agent.treat_devices_added_or_updated([], False) self.assertFalse(self.agent.setup_arp_spoofing_protection.called) @@ -1664,9 +1690,12 @@ class TestOvsDvrNeutronAgent(object): int_br.reset_mock() tun_br.reset_mock() with mock.patch.object(self.agent, 'reclaim_local_vlan'),\ - mock.patch.object(self.agent.plugin_rpc, - 'update_device_down', - return_value=None),\ + mock.patch.object(self.agent.plugin_rpc, 'update_device_list', + return_value={ + 'devices_up': [], + 'devices_down': [self._port.vif_id], + 'failed_devices_up': [], + 'failed_devices_down': []}),\ mock.patch.object(self.agent, 'int_br', new=int_br),\ mock.patch.object(self.agent, 'tun_br', new=tun_br),\ mock.patch.object(self.agent.dvr_agent, 'int_br', new=int_br),\ @@ -1766,9 +1795,13 @@ class TestOvsDvrNeutronAgent(object): int_br.reset_mock() tun_br.reset_mock() with mock.patch.object(self.agent, 'reclaim_local_vlan'),\ - mock.patch.object(self.agent.plugin_rpc, - 'update_device_down', - return_value=None),\ + mock.patch.object(self.agent.plugin_rpc, 'update_device_list', + return_value={ + 'devices_up': [], + 'devices_down': [ + self._compute_port.vif_id], + 'failed_devices_up': [], + 'failed_devices_down': []}),\ mock.patch.object(self.agent, 'int_br', new=int_br),\ mock.patch.object(self.agent, 'tun_br', new=tun_br),\ mock.patch.object(self.agent.dvr_agent, 'int_br', new=int_br),\ @@ -1853,9 +1886,12 @@ class TestOvsDvrNeutronAgent(object): int_br.reset_mock() tun_br.reset_mock() with mock.patch.object(self.agent, 'reclaim_local_vlan'),\ - mock.patch.object(self.agent.plugin_rpc, - 'update_device_down', - return_value=None),\ + mock.patch.object(self.agent.plugin_rpc, 'update_device_list', + return_value={ + 'devices_up': [], + 'devices_down': [self._port.vif_id], + 'failed_devices_up': [], + 'failed_devices_down': []}),\ mock.patch.object(self.agent, 'int_br', new=int_br),\ mock.patch.object(self.agent, 'tun_br', new=tun_br),\ mock.patch.object(self.agent.dvr_agent, 'int_br', new=int_br),\ diff --git a/neutron/tests/unit/plugins/ml2/test_rpc.py b/neutron/tests/unit/plugins/ml2/test_rpc.py index f0e1a360322..72775b9fe80 100644 --- a/neutron/tests/unit/plugins/ml2/test_rpc.py +++ b/neutron/tests/unit/plugins/ml2/test_rpc.py @@ -22,6 +22,7 @@ import collections import mock from oslo_config import cfg from oslo_context import context as oslo_context +import oslo_messaging from sqlalchemy.orm import exc from neutron.agent import rpc as agent_rpc @@ -134,27 +135,53 @@ class RpcCallbacksTestCase(base.BaseTestCase): self.callbacks.get_device_details(mock.Mock()) self.assertTrue(self.plugin.update_port_status.called) - def test_get_devices_details_list(self): + def _test_get_devices_list(self, callback, side_effect, expected): devices = [1, 2, 3, 4, 5] kwargs = {'host': 'fake_host', 'agent_id': 'fake_agent_id'} with mock.patch.object(self.callbacks, 'get_device_details', - side_effect=devices) as f: - res = self.callbacks.get_devices_details_list('fake_context', - devices=devices, - **kwargs) - self.assertEqual(devices, res) + side_effect=side_effect) as f: + res = callback('fake_context', devices=devices, **kwargs) + self.assertEqual(expected, res) self.assertEqual(len(devices), f.call_count) calls = [mock.call('fake_context', device=i, cached_networks={}, **kwargs) for i in devices] f.assert_has_calls(calls) + def test_get_devices_details_list(self): + devices = [1, 2, 3, 4, 5] + expected = devices + callback = self.callbacks.get_devices_details_list + self._test_get_devices_list(callback, devices, expected) + def test_get_devices_details_list_with_empty_devices(self): with mock.patch.object(self.callbacks, 'get_device_details') as f: res = self.callbacks.get_devices_details_list('fake_context') self.assertFalse(f.called) self.assertEqual([], res) + def test_get_devices_details_list_and_failed_devices(self): + devices = [1, 2, 3, 4, 5] + expected = {'devices': devices, 'failed_devices': []} + callback = ( + self.callbacks.get_devices_details_list_and_failed_devices) + self._test_get_devices_list(callback, devices, expected) + + def test_get_devices_details_list_and_failed_devices_failures(self): + devices = [1, Exception('testdevice'), 3, + Exception('testdevice'), 5] + expected = {'devices': [1, 3, 5], 'failed_devices': [2, 4]} + callback = ( + self.callbacks.get_devices_details_list_and_failed_devices) + self._test_get_devices_list(callback, devices, expected) + + def test_get_devices_details_list_and_failed_devices_empty_dev(self): + with mock.patch.object(self.callbacks, 'get_device_details') as f: + res = self.callbacks.get_devices_details_list_and_failed_devices( + 'fake_context') + self.assertFalse(f.called) + self.assertEqual({'devices': [], 'failed_devices': []}, res) + def _test_update_device_not_bound_to_host(self, func): self.plugin.port_bound_to_host.return_value = False self.plugin._device_to_port_id.return_value = 'fake_port_id' @@ -192,6 +219,64 @@ class RpcCallbacksTestCase(base.BaseTestCase): self.callbacks.update_device_down( mock.Mock(), device='fake_device')) + def _test_update_device_list(self, devices_up_side_effect, + devices_down_side_effect, expected): + devices_up = [1, 2, 3] + devices_down = [4, 5] + kwargs = {'host': 'fake_host', 'agent_id': 'fake_agent_id'} + with mock.patch.object(self.callbacks, 'update_device_up', + side_effect=devices_up_side_effect) as f_up, \ + mock.patch.object(self.callbacks, 'update_device_down', + side_effect=devices_down_side_effect) as f_down: + res = self.callbacks.update_device_list( + 'fake_context', devices_up=devices_up, + devices_down=devices_down, **kwargs) + self.assertEqual(expected, res) + self.assertEqual(len(devices_up), f_up.call_count) + self.assertEqual(len(devices_down), f_down.call_count) + + def test_update_device_list_no_failure(self): + devices_up_side_effect = [1, 2, 3] + devices_down_side_effect = [ + {'device': 4, 'exists': True}, + {'device': 5, 'exists': True}] + expected = {'devices_up': devices_up_side_effect, + 'failed_devices_up': [], + 'devices_down': + [{'device': 4, 'exists': True}, + {'device': 5, 'exists': True}], + 'failed_devices_down': []} + self._test_update_device_list(devices_up_side_effect, + devices_down_side_effect, + expected) + + def test_update_device_list_failed_devices(self): + + devices_up_side_effect = [1, Exception('testdevice'), 3] + devices_down_side_effect = [{'device': 4, 'exists': True}, + Exception('testdevice')] + expected = {'devices_up': [1, 3], + 'failed_devices_up': [2], + 'devices_down': + [{'device': 4, 'exists': True}], + 'failed_devices_down': [5]} + + self._test_update_device_list(devices_up_side_effect, + devices_down_side_effect, + expected) + + def test_update_device_list_empty_devices(self): + + expected = {'devices_up': [], + 'failed_devices_up': [], + 'devices_down': [], + 'failed_devices_down': []} + + kwargs = {'host': 'fake_host', 'agent_id': 'fake_agent_id'} + res = self.callbacks.update_device_list( + 'fake_context', devices_up=[], devices_down=[], **kwargs) + self.assertEqual(expected, res) + class RpcApiTestCase(base.BaseTestCase): @@ -314,3 +399,73 @@ class RpcApiTestCase(base.BaseTestCase): device='fake_device', agent_id='fake_agent_id', host='fake_host') + + def test_update_device_list(self): + rpcapi = agent_rpc.PluginApi(topics.PLUGIN) + self._test_rpc_api(rpcapi, None, + 'update_device_list', rpc_method='call', + devices_up=['fake_device1', 'fake_device2'], + devices_down=['fake_device3', 'fake_device4'], + agent_id='fake_agent_id', + host='fake_host', + version='1.5') + + def test_update_device_list_unsupported(self): + rpcapi = agent_rpc.PluginApi(topics.PLUGIN) + ctxt = oslo_context.RequestContext('fake_user', 'fake_project') + devices_up = ['fake_device1', 'fake_device2'] + devices_down = ['fake_device3', 'fake_device4'] + expected_ret_val = {'devices_up': ['fake_device2'], + 'failed_devices_up': ['fake_device1'], + 'devices_down': [ + {'device': 'fake_device3', 'exists': True}], + 'failed_devices_down': ['fake_device4']} + rpcapi.update_device_up = mock.Mock( + side_effect=[Exception('fake_device1 fails'), None]) + rpcapi.update_device_down = mock.Mock( + side_effect=[{'device': 'fake_device3', 'exists': True}, + Exception('fake_device4 fails')]) + with mock.patch.object(rpcapi.client, 'call'),\ + mock.patch.object(rpcapi.client, 'prepare') as prepare_mock: + prepare_mock.side_effect = oslo_messaging.UnsupportedVersion( + 'test') + res = rpcapi.update_device_list(ctxt, devices_up, devices_down, + 'fake_agent_id', 'fake_host') + self.assertEqual(expected_ret_val, res) + + def test_get_devices_details_list_and_failed_devices(self): + rpcapi = agent_rpc.PluginApi(topics.PLUGIN) + self._test_rpc_api(rpcapi, None, + 'get_devices_details_list_and_failed_devices', + rpc_method='call', + devices=['fake_device1', 'fake_device2'], + agent_id='fake_agent_id', + host='fake_host', + version='1.5') + + def test_devices_details_list_and_failed_devices(self): + rpcapi = agent_rpc.PluginApi(topics.PLUGIN) + self._test_rpc_api(rpcapi, None, + 'get_devices_details_list_and_failed_devices', + rpc_method='call', + devices=['fake_device1', 'fake_device2'], + agent_id='fake_agent_id', host='fake_host', + version='1.5') + + def test_get_devices_details_list_and_failed_devices_unsupported(self): + rpcapi = agent_rpc.PluginApi(topics.PLUGIN) + ctxt = oslo_context.RequestContext('fake_user', 'fake_project') + devices = ['fake_device1', 'fake_device2'] + dev2_details = {'device': 'fake_device2', 'network_id': 'net_id', + 'port_id': 'port_id', 'admin_state_up': True} + expected_ret_val = {'devices': [dev2_details], + 'failed_devices': ['fake_device1']} + rpcapi.get_device_details = mock.Mock( + side_effect=[Exception('fake_device1 fails'), dev2_details]) + with mock.patch.object(rpcapi.client, 'call'),\ + mock.patch.object(rpcapi.client, 'prepare') as prepare_mock: + prepare_mock.side_effect = oslo_messaging.UnsupportedVersion( + 'test') + res = rpcapi.get_devices_details_list_and_failed_devices( + ctxt, devices, 'fake_agent_id', 'fake_host') + self.assertEqual(expected_ret_val, res)