Merge "L2 agent RPC add new RPC calls"
This commit is contained in:
commit
a320a9bf15
|
@ -95,6 +95,8 @@ class PluginApi(object):
|
|||
return value to include fixed_ips and device_owner for
|
||||
the device port
|
||||
1.4 - tunnel_sync rpc signature upgrade to obtain 'host'
|
||||
1.5 - Support update_device_list and
|
||||
get_devices_details_list_and_failed_devices
|
||||
'''
|
||||
|
||||
def __init__(self, topic):
|
||||
|
@ -123,6 +125,26 @@ class PluginApi(object):
|
|||
]
|
||||
return res
|
||||
|
||||
def get_devices_details_list_and_failed_devices(self, context, devices,
|
||||
agent_id, host=None):
|
||||
"""Get devices details and the list of devices that failed.
|
||||
|
||||
This method returns the devices details. If an error is thrown when
|
||||
retrieving the devices details, the device is put in a list of
|
||||
failed devices.
|
||||
"""
|
||||
try:
|
||||
cctxt = self.client.prepare(version='1.5')
|
||||
res = cctxt.call(
|
||||
context,
|
||||
'get_devices_details_list_and_failed_devices',
|
||||
devices=devices, agent_id=agent_id, host=host)
|
||||
except oslo_messaging.UnsupportedVersion:
|
||||
#TODO(rossella_s): Remove this failback logic in M
|
||||
res = self._device_list_rpc_call_with_failed_dev(
|
||||
self.get_device_details, context, agent_id, host, devices)
|
||||
return res
|
||||
|
||||
def update_device_down(self, context, device, agent_id, host=None):
|
||||
cctxt = self.client.prepare()
|
||||
return cctxt.call(context, 'update_device_down', device=device,
|
||||
|
@ -133,6 +155,41 @@ class PluginApi(object):
|
|||
return cctxt.call(context, 'update_device_up', device=device,
|
||||
agent_id=agent_id, host=host)
|
||||
|
||||
def _device_list_rpc_call_with_failed_dev(self, rpc_call, context,
|
||||
agent_id, host, devices):
|
||||
succeeded_devices = []
|
||||
failed_devices = []
|
||||
for device in devices:
|
||||
try:
|
||||
rpc_device = rpc_call(context, device, agent_id, host)
|
||||
except Exception:
|
||||
failed_devices.append(device)
|
||||
else:
|
||||
# update_device_up doesn't return the device
|
||||
succeeded_dev = rpc_device or device
|
||||
succeeded_devices.append(succeeded_dev)
|
||||
return {'devices': succeeded_devices, 'failed_devices': failed_devices}
|
||||
|
||||
def update_device_list(self, context, devices_up, devices_down,
|
||||
agent_id, host):
|
||||
try:
|
||||
cctxt = self.client.prepare(version='1.5')
|
||||
res = cctxt.call(context, 'update_device_list',
|
||||
devices_up=devices_up, devices_down=devices_down,
|
||||
agent_id=agent_id, host=host)
|
||||
except oslo_messaging.UnsupportedVersion:
|
||||
#TODO(rossella_s): Remove this failback logic in M
|
||||
dev_up = self._device_list_rpc_call_with_failed_dev(
|
||||
self.update_device_up, context, agent_id, host, devices_up)
|
||||
dev_down = self._device_list_rpc_call_with_failed_dev(
|
||||
self.update_device_down, context, agent_id, host, devices_down)
|
||||
|
||||
res = {'devices_up': dev_up.get('devices'),
|
||||
'failed_devices_up': dev_up.get('failed_devices'),
|
||||
'devices_down': dev_down.get('devices'),
|
||||
'failed_devices_down': dev_down.get('failed_devices')}
|
||||
return res
|
||||
|
||||
def tunnel_sync(self, context, tunnel_ip, tunnel_type=None, host=None):
|
||||
try:
|
||||
cctxt = self.client.prepare(version='1.4')
|
||||
|
|
|
@ -64,8 +64,7 @@ class _mac_mydialect(netaddr.mac_unix):
|
|||
|
||||
|
||||
class DeviceListRetrievalError(exceptions.NeutronException):
|
||||
message = _("Unable to retrieve port details for devices: %(devices)s "
|
||||
"because of error: %(error)s")
|
||||
message = _("Unable to retrieve port details for devices: %(devices)s ")
|
||||
|
||||
|
||||
# A class to represent a VIF (i.e., a port that has 'iface-id' and 'vif-mac'
|
||||
|
@ -740,6 +739,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
|||
port_other_config)
|
||||
|
||||
def _bind_devices(self, need_binding_ports):
|
||||
devices_up = []
|
||||
devices_down = []
|
||||
port_info = self.int_br.db_list(
|
||||
"Port", columns=["name", "tag"])
|
||||
tags_by_name = {x['name']: x['tag'] for x in port_info}
|
||||
|
@ -768,13 +769,26 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
|||
# API server, thus possibly preventing instance spawn.
|
||||
if port_detail.get('admin_state_up'):
|
||||
LOG.debug("Setting status for %s to UP", device)
|
||||
self.plugin_rpc.update_device_up(
|
||||
self.context, device, self.agent_id, self.conf.host)
|
||||
devices_up.append(device)
|
||||
else:
|
||||
LOG.debug("Setting status for %s to DOWN", device)
|
||||
self.plugin_rpc.update_device_down(
|
||||
self.context, device, self.agent_id, self.conf.host)
|
||||
LOG.info(_LI("Configuration for device %s completed."), device)
|
||||
devices_down.append(device)
|
||||
failed_devices = []
|
||||
if devices_up or devices_down:
|
||||
devices_set = self.plugin_rpc.update_device_list(
|
||||
self.context, devices_up, devices_down, self.agent_id,
|
||||
self.conf.host)
|
||||
failed_devices = (devices_set.get('failed_devices_up') +
|
||||
devices_set.get('failed_devices_down'))
|
||||
if failed_devices:
|
||||
LOG.error(_LE("Configuration for devices %s failed!"),
|
||||
failed_devices)
|
||||
#TODO(rossella_s) handle better the resync in next patches,
|
||||
# this is just to preserve the current behavior
|
||||
raise DeviceListRetrievalError(devices=failed_devices)
|
||||
LOG.info(_LI("Configuration for devices up %(up)s and devices "
|
||||
"down %(down)s completed."),
|
||||
{'up': devices_up, 'down': devices_down})
|
||||
|
||||
@staticmethod
|
||||
def setup_arp_spoofing_protection(bridge, vif, port_details):
|
||||
|
@ -1201,17 +1215,21 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
|||
def treat_devices_added_or_updated(self, devices, ovs_restarted):
|
||||
skipped_devices = []
|
||||
need_binding_devices = []
|
||||
try:
|
||||
devices_details_list = self.plugin_rpc.get_devices_details_list(
|
||||
devices_details_list = (
|
||||
self.plugin_rpc.get_devices_details_list_and_failed_devices(
|
||||
self.context,
|
||||
devices,
|
||||
self.agent_id,
|
||||
self.conf.host)
|
||||
except Exception as e:
|
||||
raise DeviceListRetrievalError(devices=devices, error=e)
|
||||
self.conf.host))
|
||||
if devices_details_list.get('failed_devices'):
|
||||
#TODO(rossella_s) handle better the resync in next patches,
|
||||
# this is just to preserve the current behavior
|
||||
raise DeviceListRetrievalError(devices=devices)
|
||||
|
||||
devices = devices_details_list.get('devices')
|
||||
vif_by_id = self.int_br.get_vifs_by_ids(
|
||||
[vif['device'] for vif in devices_details_list])
|
||||
for details in devices_details_list:
|
||||
[vif['device'] for vif in devices])
|
||||
for details in devices:
|
||||
device = details['device']
|
||||
LOG.debug("Processing port: %s", device)
|
||||
port = vif_by_id.get(device)
|
||||
|
@ -1247,62 +1265,67 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
|||
return skipped_devices, need_binding_devices
|
||||
|
||||
def treat_ancillary_devices_added(self, devices):
|
||||
try:
|
||||
devices_details_list = self.plugin_rpc.get_devices_details_list(
|
||||
devices_details_list = (
|
||||
self.plugin_rpc.get_devices_details_list_and_failed_devices(
|
||||
self.context,
|
||||
devices,
|
||||
self.agent_id,
|
||||
self.conf.host)
|
||||
except Exception as e:
|
||||
raise DeviceListRetrievalError(devices=devices, error=e)
|
||||
|
||||
for details in devices_details_list:
|
||||
device = details['device']
|
||||
LOG.info(_LI("Ancillary Port %s added"), device)
|
||||
self.conf.host))
|
||||
if devices_details_list.get('failed_devices'):
|
||||
#TODO(rossella_s) handle better the resync in next patches,
|
||||
# this is just to preserve the current behavior
|
||||
raise DeviceListRetrievalError(devices=devices)
|
||||
devices_added = [
|
||||
d['device'] for d in devices_details_list.get('devices')]
|
||||
LOG.info(_LI("Ancillary Ports %s added"), devices_added)
|
||||
|
||||
# update plugin about port status
|
||||
self.plugin_rpc.update_device_up(self.context,
|
||||
device,
|
||||
devices_set_up = (
|
||||
self.plugin_rpc.update_device_list(self.context,
|
||||
devices_added,
|
||||
[],
|
||||
self.agent_id,
|
||||
self.conf.host)
|
||||
self.conf.host))
|
||||
if devices_set_up.get('failed_devices_up'):
|
||||
#TODO(rossella_s) handle better the resync in next patches,
|
||||
# this is just to preserve the current behavior
|
||||
raise DeviceListRetrievalError()
|
||||
|
||||
def treat_devices_removed(self, devices):
|
||||
resync = False
|
||||
self.sg_agent.remove_devices_filter(devices)
|
||||
for device in devices:
|
||||
LOG.info(_LI("Attachment %s removed"), device)
|
||||
try:
|
||||
self.plugin_rpc.update_device_down(self.context,
|
||||
device,
|
||||
LOG.info(_LI("Ports %s removed"), devices)
|
||||
devices_down = self.plugin_rpc.update_device_list(self.context,
|
||||
[],
|
||||
devices,
|
||||
self.agent_id,
|
||||
self.conf.host)
|
||||
except Exception as e:
|
||||
LOG.debug("port_removed failed for %(device)s: %(e)s",
|
||||
{'device': device, 'e': e})
|
||||
failed_devices = devices_down.get('failed_devices_down')
|
||||
if failed_devices:
|
||||
LOG.debug("Port removal failed for %(devices)s ", failed_devices)
|
||||
resync = True
|
||||
continue
|
||||
for device in devices:
|
||||
self.port_unbound(device)
|
||||
return resync
|
||||
|
||||
def treat_ancillary_devices_removed(self, devices):
|
||||
resync = False
|
||||
for device in devices:
|
||||
LOG.info(_LI("Attachment %s removed"), device)
|
||||
try:
|
||||
details = self.plugin_rpc.update_device_down(self.context,
|
||||
device,
|
||||
LOG.info(_LI("Ancillary ports %s removed"), devices)
|
||||
devices_down = self.plugin_rpc.update_device_list(self.context,
|
||||
[],
|
||||
devices,
|
||||
self.agent_id,
|
||||
self.conf.host)
|
||||
except Exception as e:
|
||||
LOG.debug("port_removed failed for %(device)s: %(e)s",
|
||||
{'device': device, 'e': e})
|
||||
failed_devices = devices_down.get('failed_devices_down')
|
||||
if failed_devices:
|
||||
LOG.debug("Port removal failed for %(devices)s ", failed_devices)
|
||||
resync = True
|
||||
continue
|
||||
if details['exists']:
|
||||
LOG.info(_LI("Port %s updated."), device)
|
||||
for detail in devices_down.get('devices_down'):
|
||||
if detail['exists']:
|
||||
LOG.info(_LI("Port %s updated."), detail['device'])
|
||||
# Nothing to do regarding local networking
|
||||
else:
|
||||
LOG.debug("Device %s not defined on plugin", device)
|
||||
LOG.debug("Device %s not defined on plugin", detail['device'])
|
||||
return resync
|
||||
|
||||
def process_network_ports(self, port_info, ovs_restarted):
|
||||
|
@ -1355,7 +1378,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
|||
port_info.get('updated', set()))
|
||||
self._bind_devices(need_binding_devices)
|
||||
|
||||
if 'removed' in port_info:
|
||||
if 'removed' in port_info and port_info['removed']:
|
||||
start = time.time()
|
||||
resync_b = self.treat_devices_removed(port_info['removed'])
|
||||
LOG.debug("process_network_ports - iteration:%(iter_num)d - "
|
||||
|
@ -1368,7 +1391,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
|||
def process_ancillary_network_ports(self, port_info):
|
||||
resync_a = False
|
||||
resync_b = False
|
||||
if 'added' in port_info:
|
||||
if 'added' in port_info and port_info['added']:
|
||||
start = time.time()
|
||||
try:
|
||||
self.treat_ancillary_devices_added(port_info['added'])
|
||||
|
@ -1384,7 +1407,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
|||
"iteration:%d - failure while retrieving "
|
||||
"port details from server"), self.iter_num)
|
||||
resync_a = True
|
||||
if 'removed' in port_info:
|
||||
if 'removed' in port_info and port_info['removed']:
|
||||
start = time.time()
|
||||
resync_b = self.treat_ancillary_devices_removed(
|
||||
port_info['removed'])
|
||||
|
|
|
@ -28,7 +28,7 @@ from neutron.common import rpc as n_rpc
|
|||
from neutron.common import topics
|
||||
from neutron.extensions import portbindings
|
||||
from neutron.extensions import portsecurity as psec
|
||||
from neutron.i18n import _LW
|
||||
from neutron.i18n import _LE, _LW
|
||||
from neutron import manager
|
||||
from neutron.plugins.ml2 import driver_api as api
|
||||
from neutron.plugins.ml2.drivers import type_tunnel
|
||||
|
@ -48,7 +48,9 @@ class RpcCallbacks(type_tunnel.TunnelRpcCallbackMixin):
|
|||
# return value to include fixed_ips and device_owner for
|
||||
# the device port
|
||||
# 1.4 tunnel_sync rpc signature upgrade to obtain 'host'
|
||||
target = oslo_messaging.Target(version='1.4')
|
||||
# 1.5 Support update_device_list and
|
||||
# get_devices_details_list_and_failed_devices
|
||||
target = oslo_messaging.Target(version='1.5')
|
||||
|
||||
def __init__(self, notifier, type_manager):
|
||||
self.setup_tunnel_callback_mixin(notifier, type_manager)
|
||||
|
@ -135,6 +137,27 @@ class RpcCallbacks(type_tunnel.TunnelRpcCallbackMixin):
|
|||
for device in kwargs.pop('devices', [])
|
||||
]
|
||||
|
||||
def get_devices_details_list_and_failed_devices(self,
|
||||
rpc_context,
|
||||
**kwargs):
|
||||
devices = []
|
||||
failed_devices = []
|
||||
cached_networks = {}
|
||||
for device in kwargs.pop('devices', []):
|
||||
try:
|
||||
devices.append(self.get_device_details(
|
||||
rpc_context,
|
||||
device=device,
|
||||
cached_networks=cached_networks,
|
||||
**kwargs))
|
||||
except Exception:
|
||||
LOG.error(_LE("Failed to get details for device %s"),
|
||||
device)
|
||||
failed_devices.append(device)
|
||||
|
||||
return {'devices': devices,
|
||||
'failed_devices': failed_devices}
|
||||
|
||||
def update_device_down(self, rpc_context, **kwargs):
|
||||
"""Device no longer exists on agent."""
|
||||
# TODO(garyk) - live migration and port status
|
||||
|
@ -201,6 +224,44 @@ class RpcCallbacks(type_tunnel.TunnelRpcCallbackMixin):
|
|||
registry.notify(
|
||||
resources.PORT, events.AFTER_UPDATE, plugin, **kwargs)
|
||||
|
||||
def update_device_list(self, rpc_context, **kwargs):
|
||||
devices_up = []
|
||||
failed_devices_up = []
|
||||
devices_down = []
|
||||
failed_devices_down = []
|
||||
devices = kwargs.get('devices_up')
|
||||
if devices:
|
||||
for device in devices:
|
||||
try:
|
||||
self.update_device_up(
|
||||
rpc_context,
|
||||
device=device,
|
||||
**kwargs)
|
||||
except Exception:
|
||||
failed_devices_up.append(device)
|
||||
LOG.error(_LE("Failed to update device %s up"), device)
|
||||
else:
|
||||
devices_up.append(device)
|
||||
|
||||
devices = kwargs.get('devices_down')
|
||||
if devices:
|
||||
for device in devices:
|
||||
try:
|
||||
dev = self.update_device_down(
|
||||
rpc_context,
|
||||
device=device,
|
||||
**kwargs)
|
||||
except Exception:
|
||||
failed_devices_down.append(device)
|
||||
LOG.error(_LE("Failed to update device %s down"), device)
|
||||
else:
|
||||
devices_down.append(dev)
|
||||
|
||||
return {'devices_up': devices_up,
|
||||
'failed_devices_up': failed_devices_up,
|
||||
'devices_down': devices_down,
|
||||
'failed_devices_down': failed_devices_down}
|
||||
|
||||
|
||||
class AgentNotifierApi(dvr_rpc.DVRAgentRpcApiMixin,
|
||||
sg_rpc.SecurityGroupAgentRpcApiMixin,
|
||||
|
|
|
@ -202,11 +202,6 @@ class OVSAgentTestFramework(base.BaseOVSLinuxTestCase):
|
|||
for port in [self.patch_tun, self.patch_int]:
|
||||
self.assertTrue(self.ovs.port_exists(port))
|
||||
|
||||
def assert_no_vlan_tags(self, ports, agent):
|
||||
for port in ports:
|
||||
res = agent.int_br.db_get_val('Port', port.get('vif_name'), 'tag')
|
||||
self.assertEqual([], res)
|
||||
|
||||
def assert_vlan_tags(self, ports, agent):
|
||||
for port in ports:
|
||||
res = agent.int_br.db_get_val('Port', port.get('vif_name'), 'tag')
|
||||
|
@ -215,30 +210,64 @@ class OVSAgentTestFramework(base.BaseOVSLinuxTestCase):
|
|||
|
||||
class TestOVSAgent(OVSAgentTestFramework):
|
||||
|
||||
def _expected_plugin_rpc_call(self, call, expected_devices):
|
||||
def _expected_plugin_rpc_call(self, call, expected_devices, is_up=True):
|
||||
"""Helper to check expected rpc call are received
|
||||
:param call: The call to check
|
||||
:param expected_devices The device for which call is expected
|
||||
:param is_up True if expected_devices are devices that are set up,
|
||||
False if expected_devices are devices that are set down
|
||||
"""
|
||||
args = (args[0][1] for args in call.call_args_list)
|
||||
return not (set(expected_devices) - set(args))
|
||||
if is_up:
|
||||
rpc_devices = [
|
||||
dev for args in call.call_args_list for dev in args[0][1]]
|
||||
else:
|
||||
rpc_devices = [
|
||||
dev for args in call.call_args_list for dev in args[0][2]]
|
||||
return not (set(expected_devices) - set(rpc_devices))
|
||||
|
||||
def _create_ports(self, network, agent):
|
||||
def _create_ports(self, network, agent, trigger_resync=False):
|
||||
ports = []
|
||||
for x in range(3):
|
||||
ports.append(self._create_test_port_dict())
|
||||
|
||||
def mock_device_raise_exception(context, devices_up, devices_down,
|
||||
agent_id, host=None):
|
||||
agent.plugin_rpc.update_device_list.side_effect = (
|
||||
mock_update_device)
|
||||
raise Exception('Exception to trigger resync')
|
||||
|
||||
def mock_device_details(context, devices, agent_id, host=None):
|
||||
|
||||
details = []
|
||||
for port in ports:
|
||||
if port['id'] in devices:
|
||||
dev = OVSAgentTestFramework._get_device_details(
|
||||
port, network)
|
||||
details.append(dev)
|
||||
return details
|
||||
return {'devices': details, 'failed_devices': []}
|
||||
|
||||
agent.plugin_rpc.get_devices_details_list.side_effect = (
|
||||
mock_device_details)
|
||||
def mock_update_device(context, devices_up, devices_down, agent_id,
|
||||
host=None):
|
||||
dev_up = []
|
||||
dev_down = []
|
||||
for port in ports:
|
||||
if devices_up and port['id'] in devices_up:
|
||||
dev_up.append(port['id'])
|
||||
if devices_down and port['id'] in devices_down:
|
||||
dev_down.append({'device': port['id'], 'exists': True})
|
||||
return {'devices_up': dev_up,
|
||||
'failed_devices_up': [],
|
||||
'devices_down': dev_down,
|
||||
'failed_devices_down': []}
|
||||
|
||||
(agent.plugin_rpc.get_devices_details_list_and_failed_devices.
|
||||
side_effect) = mock_device_details
|
||||
if trigger_resync:
|
||||
agent.plugin_rpc.update_device_list.side_effect = (
|
||||
mock_device_raise_exception)
|
||||
else:
|
||||
agent.plugin_rpc.update_device_list.side_effect = (
|
||||
mock_update_device)
|
||||
return ports
|
||||
|
||||
def test_port_creation_and_deletion(self):
|
||||
|
@ -250,39 +279,35 @@ class TestOVSAgent(OVSAgentTestFramework):
|
|||
up_ports_ids = [p['id'] for p in ports]
|
||||
agent_utils.wait_until_true(
|
||||
lambda: self._expected_plugin_rpc_call(
|
||||
agent.plugin_rpc.update_device_up, up_ports_ids))
|
||||
agent.plugin_rpc.update_device_list, up_ports_ids))
|
||||
down_ports_ids = [p['id'] for p in ports]
|
||||
for port in ports:
|
||||
agent.int_br.delete_port(port['vif_name'])
|
||||
agent_utils.wait_until_true(
|
||||
lambda: self._expected_plugin_rpc_call(
|
||||
agent.plugin_rpc.update_device_down, down_ports_ids))
|
||||
agent.plugin_rpc.update_device_list, down_ports_ids, False))
|
||||
|
||||
def test_resync_devices_set_up_after_exception(self):
|
||||
agent = self.create_agent()
|
||||
self.start_agent(agent)
|
||||
network = self._create_test_network_dict()
|
||||
ports = self._create_ports(network, agent)
|
||||
agent.plugin_rpc.update_device_up.side_effect = [
|
||||
Exception('Exception to trigger resync'),
|
||||
None, None, None]
|
||||
ports = self._create_ports(network, agent, True)
|
||||
self._plug_ports(network, ports, agent)
|
||||
ports_ids = [p['id'] for p in ports]
|
||||
agent_utils.wait_until_true(
|
||||
lambda: self._expected_plugin_rpc_call(
|
||||
agent.plugin_rpc.update_device_up, ports_ids))
|
||||
agent.plugin_rpc.update_device_list, ports_ids))
|
||||
|
||||
def test_port_vlan_tags(self):
|
||||
agent = self.create_agent()
|
||||
self.start_agent(agent)
|
||||
ports = []
|
||||
for x in range(3):
|
||||
ports.append(self._create_test_port_dict())
|
||||
network = self._create_test_network_dict()
|
||||
ports = self._create_ports(network, agent)
|
||||
ports_ids = [p['id'] for p in ports]
|
||||
self._plug_ports(network, ports, agent)
|
||||
agent.provision_local_vlan(network['id'], 'vlan', 'physnet', 1)
|
||||
self.assert_no_vlan_tags(ports, agent)
|
||||
self._bind_ports(ports, network, agent)
|
||||
agent_utils.wait_until_true(
|
||||
lambda: self._expected_plugin_rpc_call(
|
||||
agent.plugin_rpc.update_device_list, ports_ids))
|
||||
self.assert_vlan_tags(ports, agent)
|
||||
|
||||
def test_assert_bridges_ports_vxlan(self):
|
||||
|
|
|
@ -322,16 +322,30 @@ class TestOvsNeutronAgent(object):
|
|||
vif_port_set, registered_ports, port_tags_dict=port_tags_dict)
|
||||
self.assertEqual(expected, actual)
|
||||
|
||||
def test_treat_devices_added_returns_raises_for_missing_device(self):
|
||||
with mock.patch.object(self.agent.plugin_rpc,
|
||||
'get_devices_details_list',
|
||||
side_effect=Exception()),\
|
||||
mock.patch.object(self.agent.int_br,
|
||||
'get_vif_port_by_id',
|
||||
return_value=mock.Mock()):
|
||||
self.assertRaises(
|
||||
self.mod_agent.DeviceListRetrievalError,
|
||||
self.agent.treat_devices_added_or_updated, [{}], False)
|
||||
def test_bind_devices(self):
|
||||
devices_up = ['tap1']
|
||||
devices_down = ['tap2']
|
||||
self.agent.local_vlan_map["net1"] = mock.Mock()
|
||||
port_details = [
|
||||
{'network_id': 'net1', 'vif_port': mock.Mock(),
|
||||
'device': devices_up[0],
|
||||
'admin_state_up': True},
|
||||
{'network_id': 'net1', 'vif_port': mock.Mock(),
|
||||
'device': devices_down[0],
|
||||
'admin_state_up': False}]
|
||||
with mock.patch.object(
|
||||
self.agent.plugin_rpc, 'update_device_list',
|
||||
return_value={'devices_up': devices_up,
|
||||
'devices_down': devices_down,
|
||||
'failed_devices_up': [],
|
||||
'failed_devices_down': []}) as update_devices, \
|
||||
mock.patch.object(self.agent,
|
||||
'int_br') as int_br:
|
||||
int_br.db_list.return_value = []
|
||||
self.agent._bind_devices(port_details)
|
||||
update_devices.assert_called_once_with(mock.ANY, devices_up,
|
||||
devices_down,
|
||||
mock.ANY, mock.ANY)
|
||||
|
||||
def _mock_treat_devices_added_updated(self, details, port, func_name):
|
||||
"""Mock treat devices added or updated.
|
||||
|
@ -342,11 +356,17 @@ class TestOvsNeutronAgent(object):
|
|||
:returns: whether the named function was called
|
||||
"""
|
||||
with mock.patch.object(self.agent.plugin_rpc,
|
||||
'get_devices_details_list',
|
||||
return_value=[details]),\
|
||||
'get_devices_details_list_and_failed_devices',
|
||||
return_value={'devices': [details],
|
||||
'failed_devices': None}),\
|
||||
mock.patch.object(self.agent.int_br,
|
||||
'get_vifs_by_ids',
|
||||
return_value={details['device']: port}),\
|
||||
mock.patch.object(self.agent.plugin_rpc, 'update_device_list',
|
||||
return_value={'devices_up': [],
|
||||
'devices_down': details,
|
||||
'failed_devices_up': [],
|
||||
'failed_devices_down': []}),\
|
||||
mock.patch.object(self.agent, func_name) as func:
|
||||
skip_devs, need_bound_devices = (
|
||||
self.agent.treat_devices_added_or_updated([{}], False))
|
||||
|
@ -367,8 +387,9 @@ class TestOvsNeutronAgent(object):
|
|||
mock.MagicMock(), port, 'port_dead'))
|
||||
|
||||
def test_treat_devices_added_does_not_process_missing_port(self):
|
||||
with mock.patch.object(self.agent.plugin_rpc,
|
||||
'get_device_details') as get_dev_fn,\
|
||||
with mock.patch.object(
|
||||
self.agent.plugin_rpc,
|
||||
'get_devices_details_list_and_failed_devices') as get_dev_fn,\
|
||||
mock.patch.object(self.agent.int_br,
|
||||
'get_vif_port_by_id',
|
||||
return_value=None):
|
||||
|
@ -384,8 +405,9 @@ class TestOvsNeutronAgent(object):
|
|||
dev_mock = mock.MagicMock()
|
||||
dev_mock.__getitem__.return_value = 'the_skipped_one'
|
||||
with mock.patch.object(self.agent.plugin_rpc,
|
||||
'get_devices_details_list',
|
||||
return_value=[dev_mock]),\
|
||||
'get_devices_details_list_and_failed_devices',
|
||||
return_value={'devices': [dev_mock],
|
||||
'failed_devices': None}),\
|
||||
mock.patch.object(self.agent.int_br,
|
||||
'get_vifs_by_ids',
|
||||
return_value={}),\
|
||||
|
@ -411,8 +433,9 @@ class TestOvsNeutronAgent(object):
|
|||
}
|
||||
|
||||
with mock.patch.object(self.agent.plugin_rpc,
|
||||
'get_devices_details_list',
|
||||
return_value=[fake_details_dict]),\
|
||||
'get_devices_details_list_and_failed_devices',
|
||||
return_value={'devices': [fake_details_dict],
|
||||
'failed_devices': None}),\
|
||||
mock.patch.object(self.agent.int_br,
|
||||
'get_vifs_by_ids',
|
||||
return_value={'xxx': mock.MagicMock()}),\
|
||||
|
@ -424,15 +447,14 @@ class TestOvsNeutronAgent(object):
|
|||
self.assertFalse(skip_devs)
|
||||
self.assertTrue(treat_vif_port.called)
|
||||
|
||||
def test_treat_devices_removed_returns_true_for_missing_device(self):
|
||||
with mock.patch.object(self.agent.plugin_rpc, 'update_device_down',
|
||||
side_effect=Exception()):
|
||||
self.assertTrue(self.agent.treat_devices_removed([{}]))
|
||||
|
||||
def _mock_treat_devices_removed(self, port_exists):
|
||||
details = dict(exists=port_exists)
|
||||
with mock.patch.object(self.agent.plugin_rpc, 'update_device_down',
|
||||
return_value=details):
|
||||
with mock.patch.object(self.agent.plugin_rpc,
|
||||
'update_device_list',
|
||||
return_value={'devices_up': [],
|
||||
'devices_down': details,
|
||||
'failed_devices_up': [],
|
||||
'failed_devices_down': []}):
|
||||
with mock.patch.object(self.agent, 'port_unbound') as port_unbound:
|
||||
self.assertFalse(self.agent.treat_devices_removed([{}]))
|
||||
self.assertTrue(port_unbound.called)
|
||||
|
@ -1046,7 +1068,11 @@ class TestOvsNeutronAgent(object):
|
|||
'physical_network', 'segmentation_id',
|
||||
'admin_state_up', 'fixed_ips', 'device',
|
||||
'device_owner')}]
|
||||
self.agent.plugin_rpc.get_devices_details_list.return_value = plist
|
||||
(self.agent.plugin_rpc.get_devices_details_list_and_failed_devices.
|
||||
return_value) = {'devices': plist, 'failed_devices': []}
|
||||
self.agent.plugin_rpc.update_device_list.return_value = {
|
||||
'devices_up': plist, 'devices_down': [], 'failed_devices_up': [],
|
||||
'failed_devices_down': []}
|
||||
self.agent.setup_arp_spoofing_protection = mock.Mock()
|
||||
self.agent.treat_devices_added_or_updated([], False)
|
||||
self.assertFalse(self.agent.setup_arp_spoofing_protection.called)
|
||||
|
@ -1664,9 +1690,12 @@ class TestOvsDvrNeutronAgent(object):
|
|||
int_br.reset_mock()
|
||||
tun_br.reset_mock()
|
||||
with mock.patch.object(self.agent, 'reclaim_local_vlan'),\
|
||||
mock.patch.object(self.agent.plugin_rpc,
|
||||
'update_device_down',
|
||||
return_value=None),\
|
||||
mock.patch.object(self.agent.plugin_rpc, 'update_device_list',
|
||||
return_value={
|
||||
'devices_up': [],
|
||||
'devices_down': [self._port.vif_id],
|
||||
'failed_devices_up': [],
|
||||
'failed_devices_down': []}),\
|
||||
mock.patch.object(self.agent, 'int_br', new=int_br),\
|
||||
mock.patch.object(self.agent, 'tun_br', new=tun_br),\
|
||||
mock.patch.object(self.agent.dvr_agent, 'int_br', new=int_br),\
|
||||
|
@ -1766,9 +1795,13 @@ class TestOvsDvrNeutronAgent(object):
|
|||
int_br.reset_mock()
|
||||
tun_br.reset_mock()
|
||||
with mock.patch.object(self.agent, 'reclaim_local_vlan'),\
|
||||
mock.patch.object(self.agent.plugin_rpc,
|
||||
'update_device_down',
|
||||
return_value=None),\
|
||||
mock.patch.object(self.agent.plugin_rpc, 'update_device_list',
|
||||
return_value={
|
||||
'devices_up': [],
|
||||
'devices_down': [
|
||||
self._compute_port.vif_id],
|
||||
'failed_devices_up': [],
|
||||
'failed_devices_down': []}),\
|
||||
mock.patch.object(self.agent, 'int_br', new=int_br),\
|
||||
mock.patch.object(self.agent, 'tun_br', new=tun_br),\
|
||||
mock.patch.object(self.agent.dvr_agent, 'int_br', new=int_br),\
|
||||
|
@ -1853,9 +1886,12 @@ class TestOvsDvrNeutronAgent(object):
|
|||
int_br.reset_mock()
|
||||
tun_br.reset_mock()
|
||||
with mock.patch.object(self.agent, 'reclaim_local_vlan'),\
|
||||
mock.patch.object(self.agent.plugin_rpc,
|
||||
'update_device_down',
|
||||
return_value=None),\
|
||||
mock.patch.object(self.agent.plugin_rpc, 'update_device_list',
|
||||
return_value={
|
||||
'devices_up': [],
|
||||
'devices_down': [self._port.vif_id],
|
||||
'failed_devices_up': [],
|
||||
'failed_devices_down': []}),\
|
||||
mock.patch.object(self.agent, 'int_br', new=int_br),\
|
||||
mock.patch.object(self.agent, 'tun_br', new=tun_br),\
|
||||
mock.patch.object(self.agent.dvr_agent, 'int_br', new=int_br),\
|
||||
|
|
|
@ -22,6 +22,7 @@ import collections
|
|||
import mock
|
||||
from oslo_config import cfg
|
||||
from oslo_context import context as oslo_context
|
||||
import oslo_messaging
|
||||
from sqlalchemy.orm import exc
|
||||
|
||||
from neutron.agent import rpc as agent_rpc
|
||||
|
@ -134,27 +135,53 @@ class RpcCallbacksTestCase(base.BaseTestCase):
|
|||
self.callbacks.get_device_details(mock.Mock())
|
||||
self.assertTrue(self.plugin.update_port_status.called)
|
||||
|
||||
def test_get_devices_details_list(self):
|
||||
def _test_get_devices_list(self, callback, side_effect, expected):
|
||||
devices = [1, 2, 3, 4, 5]
|
||||
kwargs = {'host': 'fake_host', 'agent_id': 'fake_agent_id'}
|
||||
with mock.patch.object(self.callbacks, 'get_device_details',
|
||||
side_effect=devices) as f:
|
||||
res = self.callbacks.get_devices_details_list('fake_context',
|
||||
devices=devices,
|
||||
**kwargs)
|
||||
self.assertEqual(devices, res)
|
||||
side_effect=side_effect) as f:
|
||||
res = callback('fake_context', devices=devices, **kwargs)
|
||||
self.assertEqual(expected, res)
|
||||
self.assertEqual(len(devices), f.call_count)
|
||||
calls = [mock.call('fake_context', device=i,
|
||||
cached_networks={}, **kwargs)
|
||||
for i in devices]
|
||||
f.assert_has_calls(calls)
|
||||
|
||||
def test_get_devices_details_list(self):
|
||||
devices = [1, 2, 3, 4, 5]
|
||||
expected = devices
|
||||
callback = self.callbacks.get_devices_details_list
|
||||
self._test_get_devices_list(callback, devices, expected)
|
||||
|
||||
def test_get_devices_details_list_with_empty_devices(self):
|
||||
with mock.patch.object(self.callbacks, 'get_device_details') as f:
|
||||
res = self.callbacks.get_devices_details_list('fake_context')
|
||||
self.assertFalse(f.called)
|
||||
self.assertEqual([], res)
|
||||
|
||||
def test_get_devices_details_list_and_failed_devices(self):
|
||||
devices = [1, 2, 3, 4, 5]
|
||||
expected = {'devices': devices, 'failed_devices': []}
|
||||
callback = (
|
||||
self.callbacks.get_devices_details_list_and_failed_devices)
|
||||
self._test_get_devices_list(callback, devices, expected)
|
||||
|
||||
def test_get_devices_details_list_and_failed_devices_failures(self):
|
||||
devices = [1, Exception('testdevice'), 3,
|
||||
Exception('testdevice'), 5]
|
||||
expected = {'devices': [1, 3, 5], 'failed_devices': [2, 4]}
|
||||
callback = (
|
||||
self.callbacks.get_devices_details_list_and_failed_devices)
|
||||
self._test_get_devices_list(callback, devices, expected)
|
||||
|
||||
def test_get_devices_details_list_and_failed_devices_empty_dev(self):
|
||||
with mock.patch.object(self.callbacks, 'get_device_details') as f:
|
||||
res = self.callbacks.get_devices_details_list_and_failed_devices(
|
||||
'fake_context')
|
||||
self.assertFalse(f.called)
|
||||
self.assertEqual({'devices': [], 'failed_devices': []}, res)
|
||||
|
||||
def _test_update_device_not_bound_to_host(self, func):
|
||||
self.plugin.port_bound_to_host.return_value = False
|
||||
self.plugin._device_to_port_id.return_value = 'fake_port_id'
|
||||
|
@ -192,6 +219,64 @@ class RpcCallbacksTestCase(base.BaseTestCase):
|
|||
self.callbacks.update_device_down(
|
||||
mock.Mock(), device='fake_device'))
|
||||
|
||||
def _test_update_device_list(self, devices_up_side_effect,
|
||||
devices_down_side_effect, expected):
|
||||
devices_up = [1, 2, 3]
|
||||
devices_down = [4, 5]
|
||||
kwargs = {'host': 'fake_host', 'agent_id': 'fake_agent_id'}
|
||||
with mock.patch.object(self.callbacks, 'update_device_up',
|
||||
side_effect=devices_up_side_effect) as f_up, \
|
||||
mock.patch.object(self.callbacks, 'update_device_down',
|
||||
side_effect=devices_down_side_effect) as f_down:
|
||||
res = self.callbacks.update_device_list(
|
||||
'fake_context', devices_up=devices_up,
|
||||
devices_down=devices_down, **kwargs)
|
||||
self.assertEqual(expected, res)
|
||||
self.assertEqual(len(devices_up), f_up.call_count)
|
||||
self.assertEqual(len(devices_down), f_down.call_count)
|
||||
|
||||
def test_update_device_list_no_failure(self):
|
||||
devices_up_side_effect = [1, 2, 3]
|
||||
devices_down_side_effect = [
|
||||
{'device': 4, 'exists': True},
|
||||
{'device': 5, 'exists': True}]
|
||||
expected = {'devices_up': devices_up_side_effect,
|
||||
'failed_devices_up': [],
|
||||
'devices_down':
|
||||
[{'device': 4, 'exists': True},
|
||||
{'device': 5, 'exists': True}],
|
||||
'failed_devices_down': []}
|
||||
self._test_update_device_list(devices_up_side_effect,
|
||||
devices_down_side_effect,
|
||||
expected)
|
||||
|
||||
def test_update_device_list_failed_devices(self):
|
||||
|
||||
devices_up_side_effect = [1, Exception('testdevice'), 3]
|
||||
devices_down_side_effect = [{'device': 4, 'exists': True},
|
||||
Exception('testdevice')]
|
||||
expected = {'devices_up': [1, 3],
|
||||
'failed_devices_up': [2],
|
||||
'devices_down':
|
||||
[{'device': 4, 'exists': True}],
|
||||
'failed_devices_down': [5]}
|
||||
|
||||
self._test_update_device_list(devices_up_side_effect,
|
||||
devices_down_side_effect,
|
||||
expected)
|
||||
|
||||
def test_update_device_list_empty_devices(self):
|
||||
|
||||
expected = {'devices_up': [],
|
||||
'failed_devices_up': [],
|
||||
'devices_down': [],
|
||||
'failed_devices_down': []}
|
||||
|
||||
kwargs = {'host': 'fake_host', 'agent_id': 'fake_agent_id'}
|
||||
res = self.callbacks.update_device_list(
|
||||
'fake_context', devices_up=[], devices_down=[], **kwargs)
|
||||
self.assertEqual(expected, res)
|
||||
|
||||
|
||||
class RpcApiTestCase(base.BaseTestCase):
|
||||
|
||||
|
@ -314,3 +399,73 @@ class RpcApiTestCase(base.BaseTestCase):
|
|||
device='fake_device',
|
||||
agent_id='fake_agent_id',
|
||||
host='fake_host')
|
||||
|
||||
def test_update_device_list(self):
|
||||
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
|
||||
self._test_rpc_api(rpcapi, None,
|
||||
'update_device_list', rpc_method='call',
|
||||
devices_up=['fake_device1', 'fake_device2'],
|
||||
devices_down=['fake_device3', 'fake_device4'],
|
||||
agent_id='fake_agent_id',
|
||||
host='fake_host',
|
||||
version='1.5')
|
||||
|
||||
def test_update_device_list_unsupported(self):
|
||||
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
|
||||
ctxt = oslo_context.RequestContext('fake_user', 'fake_project')
|
||||
devices_up = ['fake_device1', 'fake_device2']
|
||||
devices_down = ['fake_device3', 'fake_device4']
|
||||
expected_ret_val = {'devices_up': ['fake_device2'],
|
||||
'failed_devices_up': ['fake_device1'],
|
||||
'devices_down': [
|
||||
{'device': 'fake_device3', 'exists': True}],
|
||||
'failed_devices_down': ['fake_device4']}
|
||||
rpcapi.update_device_up = mock.Mock(
|
||||
side_effect=[Exception('fake_device1 fails'), None])
|
||||
rpcapi.update_device_down = mock.Mock(
|
||||
side_effect=[{'device': 'fake_device3', 'exists': True},
|
||||
Exception('fake_device4 fails')])
|
||||
with mock.patch.object(rpcapi.client, 'call'),\
|
||||
mock.patch.object(rpcapi.client, 'prepare') as prepare_mock:
|
||||
prepare_mock.side_effect = oslo_messaging.UnsupportedVersion(
|
||||
'test')
|
||||
res = rpcapi.update_device_list(ctxt, devices_up, devices_down,
|
||||
'fake_agent_id', 'fake_host')
|
||||
self.assertEqual(expected_ret_val, res)
|
||||
|
||||
def test_get_devices_details_list_and_failed_devices(self):
|
||||
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
|
||||
self._test_rpc_api(rpcapi, None,
|
||||
'get_devices_details_list_and_failed_devices',
|
||||
rpc_method='call',
|
||||
devices=['fake_device1', 'fake_device2'],
|
||||
agent_id='fake_agent_id',
|
||||
host='fake_host',
|
||||
version='1.5')
|
||||
|
||||
def test_devices_details_list_and_failed_devices(self):
|
||||
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
|
||||
self._test_rpc_api(rpcapi, None,
|
||||
'get_devices_details_list_and_failed_devices',
|
||||
rpc_method='call',
|
||||
devices=['fake_device1', 'fake_device2'],
|
||||
agent_id='fake_agent_id', host='fake_host',
|
||||
version='1.5')
|
||||
|
||||
def test_get_devices_details_list_and_failed_devices_unsupported(self):
|
||||
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
|
||||
ctxt = oslo_context.RequestContext('fake_user', 'fake_project')
|
||||
devices = ['fake_device1', 'fake_device2']
|
||||
dev2_details = {'device': 'fake_device2', 'network_id': 'net_id',
|
||||
'port_id': 'port_id', 'admin_state_up': True}
|
||||
expected_ret_val = {'devices': [dev2_details],
|
||||
'failed_devices': ['fake_device1']}
|
||||
rpcapi.get_device_details = mock.Mock(
|
||||
side_effect=[Exception('fake_device1 fails'), dev2_details])
|
||||
with mock.patch.object(rpcapi.client, 'call'),\
|
||||
mock.patch.object(rpcapi.client, 'prepare') as prepare_mock:
|
||||
prepare_mock.side_effect = oslo_messaging.UnsupportedVersion(
|
||||
'test')
|
||||
res = rpcapi.get_devices_details_list_and_failed_devices(
|
||||
ctxt, devices, 'fake_agent_id', 'fake_host')
|
||||
self.assertEqual(expected_ret_val, res)
|
||||
|
|
Loading…
Reference in New Issue