L2 agent RPC add new RPC calls

This patch introduces two new RPC calls:
* update_device_list
* get_devices_details_list_and_failed_devices

update_device_up and update_device_down are the RPC calls
used by the L2 agent to notify the plugin that
a device is up or down. These calls accept only one device.
update_device_list accepts a list of devices that will
be put up or down and returns a list of devices for which
the operation failed.

get_devices_details_list_and_failed_devices is a new RPC
call that returns a list of devices details and a list of
devices for which it was not possible to retrieve the
details.

Thanks to these new calls, the agent will be able to update
the status of several devices using one RPC call
and to retry the operation only for the devices that failed
instead of performing a full resync.
This will be implemented in another patch.

This patch introduces the new calls only for the OVS agent.
A following patch will take care of the other L2 agents.

DocImpact
Partially-Implements: blueprint restructure-l2-agent

Change-Id: Idb4e81f4399cf66025ec7edfcb78801c15e493af
This commit is contained in:
rossella 2015-01-14 15:11:36 +01:00
parent 95f8831344
commit ced2e63cb8
6 changed files with 484 additions and 127 deletions

View File

@ -95,6 +95,8 @@ class PluginApi(object):
return value to include fixed_ips and device_owner for
the device port
1.4 - tunnel_sync rpc signature upgrade to obtain 'host'
1.5 - Support update_device_list and
get_devices_details_list_and_failed_devices
'''
def __init__(self, topic):
@ -123,6 +125,26 @@ class PluginApi(object):
]
return res
def get_devices_details_list_and_failed_devices(self, context, devices,
agent_id, host=None):
"""Get devices details and the list of devices that failed.
This method returns the devices details. If an error is thrown when
retrieving the devices details, the device is put in a list of
failed devices.
"""
try:
cctxt = self.client.prepare(version='1.5')
res = cctxt.call(
context,
'get_devices_details_list_and_failed_devices',
devices=devices, agent_id=agent_id, host=host)
except oslo_messaging.UnsupportedVersion:
#TODO(rossella_s): Remove this failback logic in M
res = self._device_list_rpc_call_with_failed_dev(
self.get_device_details, context, agent_id, host, devices)
return res
def update_device_down(self, context, device, agent_id, host=None):
cctxt = self.client.prepare()
return cctxt.call(context, 'update_device_down', device=device,
@ -133,6 +155,41 @@ class PluginApi(object):
return cctxt.call(context, 'update_device_up', device=device,
agent_id=agent_id, host=host)
def _device_list_rpc_call_with_failed_dev(self, rpc_call, context,
agent_id, host, devices):
succeeded_devices = []
failed_devices = []
for device in devices:
try:
rpc_device = rpc_call(context, device, agent_id, host)
except Exception:
failed_devices.append(device)
else:
# update_device_up doesn't return the device
succeeded_dev = rpc_device or device
succeeded_devices.append(succeeded_dev)
return {'devices': succeeded_devices, 'failed_devices': failed_devices}
def update_device_list(self, context, devices_up, devices_down,
agent_id, host):
try:
cctxt = self.client.prepare(version='1.5')
res = cctxt.call(context, 'update_device_list',
devices_up=devices_up, devices_down=devices_down,
agent_id=agent_id, host=host)
except oslo_messaging.UnsupportedVersion:
#TODO(rossella_s): Remove this failback logic in M
dev_up = self._device_list_rpc_call_with_failed_dev(
self.update_device_up, context, agent_id, host, devices_up)
dev_down = self._device_list_rpc_call_with_failed_dev(
self.update_device_down, context, agent_id, host, devices_down)
res = {'devices_up': dev_up.get('devices'),
'failed_devices_up': dev_up.get('failed_devices'),
'devices_down': dev_down.get('devices'),
'failed_devices_down': dev_down.get('failed_devices')}
return res
def tunnel_sync(self, context, tunnel_ip, tunnel_type=None, host=None):
try:
cctxt = self.client.prepare(version='1.4')

View File

@ -64,8 +64,7 @@ class _mac_mydialect(netaddr.mac_unix):
class DeviceListRetrievalError(exceptions.NeutronException):
message = _("Unable to retrieve port details for devices: %(devices)s "
"because of error: %(error)s")
message = _("Unable to retrieve port details for devices: %(devices)s ")
# A class to represent a VIF (i.e., a port that has 'iface-id' and 'vif-mac'
@ -740,6 +739,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
port_other_config)
def _bind_devices(self, need_binding_ports):
devices_up = []
devices_down = []
port_info = self.int_br.db_list(
"Port", columns=["name", "tag"])
tags_by_name = {x['name']: x['tag'] for x in port_info}
@ -768,13 +769,26 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
# API server, thus possibly preventing instance spawn.
if port_detail.get('admin_state_up'):
LOG.debug("Setting status for %s to UP", device)
self.plugin_rpc.update_device_up(
self.context, device, self.agent_id, self.conf.host)
devices_up.append(device)
else:
LOG.debug("Setting status for %s to DOWN", device)
self.plugin_rpc.update_device_down(
self.context, device, self.agent_id, self.conf.host)
LOG.info(_LI("Configuration for device %s completed."), device)
devices_down.append(device)
failed_devices = []
if devices_up or devices_down:
devices_set = self.plugin_rpc.update_device_list(
self.context, devices_up, devices_down, self.agent_id,
self.conf.host)
failed_devices = (devices_set.get('failed_devices_up') +
devices_set.get('failed_devices_down'))
if failed_devices:
LOG.error(_LE("Configuration for devices %s failed!"),
failed_devices)
#TODO(rossella_s) handle better the resync in next patches,
# this is just to preserve the current behavior
raise DeviceListRetrievalError(devices=failed_devices)
LOG.info(_LI("Configuration for devices up %(up)s and devices "
"down %(down)s completed."),
{'up': devices_up, 'down': devices_down})
@staticmethod
def setup_arp_spoofing_protection(bridge, vif, port_details):
@ -1201,17 +1215,21 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
def treat_devices_added_or_updated(self, devices, ovs_restarted):
skipped_devices = []
need_binding_devices = []
try:
devices_details_list = self.plugin_rpc.get_devices_details_list(
devices_details_list = (
self.plugin_rpc.get_devices_details_list_and_failed_devices(
self.context,
devices,
self.agent_id,
self.conf.host)
except Exception as e:
raise DeviceListRetrievalError(devices=devices, error=e)
self.conf.host))
if devices_details_list.get('failed_devices'):
#TODO(rossella_s) handle better the resync in next patches,
# this is just to preserve the current behavior
raise DeviceListRetrievalError(devices=devices)
devices = devices_details_list.get('devices')
vif_by_id = self.int_br.get_vifs_by_ids(
[vif['device'] for vif in devices_details_list])
for details in devices_details_list:
[vif['device'] for vif in devices])
for details in devices:
device = details['device']
LOG.debug("Processing port: %s", device)
port = vif_by_id.get(device)
@ -1247,62 +1265,67 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
return skipped_devices, need_binding_devices
def treat_ancillary_devices_added(self, devices):
try:
devices_details_list = self.plugin_rpc.get_devices_details_list(
devices_details_list = (
self.plugin_rpc.get_devices_details_list_and_failed_devices(
self.context,
devices,
self.agent_id,
self.conf.host)
except Exception as e:
raise DeviceListRetrievalError(devices=devices, error=e)
self.conf.host))
if devices_details_list.get('failed_devices'):
#TODO(rossella_s) handle better the resync in next patches,
# this is just to preserve the current behavior
raise DeviceListRetrievalError(devices=devices)
devices_added = [
d['device'] for d in devices_details_list.get('devices')]
LOG.info(_LI("Ancillary Ports %s added"), devices_added)
for details in devices_details_list:
device = details['device']
LOG.info(_LI("Ancillary Port %s added"), device)
# update plugin about port status
self.plugin_rpc.update_device_up(self.context,
device,
self.agent_id,
self.conf.host)
# update plugin about port status
devices_set_up = (
self.plugin_rpc.update_device_list(self.context,
devices_added,
[],
self.agent_id,
self.conf.host))
if devices_set_up.get('failed_devices_up'):
#TODO(rossella_s) handle better the resync in next patches,
# this is just to preserve the current behavior
raise DeviceListRetrievalError()
def treat_devices_removed(self, devices):
resync = False
self.sg_agent.remove_devices_filter(devices)
LOG.info(_LI("Ports %s removed"), devices)
devices_down = self.plugin_rpc.update_device_list(self.context,
[],
devices,
self.agent_id,
self.conf.host)
failed_devices = devices_down.get('failed_devices_down')
if failed_devices:
LOG.debug("Port removal failed for %(devices)s ", failed_devices)
resync = True
for device in devices:
LOG.info(_LI("Attachment %s removed"), device)
try:
self.plugin_rpc.update_device_down(self.context,
device,
self.agent_id,
self.conf.host)
except Exception as e:
LOG.debug("port_removed failed for %(device)s: %(e)s",
{'device': device, 'e': e})
resync = True
continue
self.port_unbound(device)
return resync
def treat_ancillary_devices_removed(self, devices):
resync = False
for device in devices:
LOG.info(_LI("Attachment %s removed"), device)
try:
details = self.plugin_rpc.update_device_down(self.context,
device,
self.agent_id,
self.conf.host)
except Exception as e:
LOG.debug("port_removed failed for %(device)s: %(e)s",
{'device': device, 'e': e})
resync = True
continue
if details['exists']:
LOG.info(_LI("Port %s updated."), device)
LOG.info(_LI("Ancillary ports %s removed"), devices)
devices_down = self.plugin_rpc.update_device_list(self.context,
[],
devices,
self.agent_id,
self.conf.host)
failed_devices = devices_down.get('failed_devices_down')
if failed_devices:
LOG.debug("Port removal failed for %(devices)s ", failed_devices)
resync = True
for detail in devices_down.get('devices_down'):
if detail['exists']:
LOG.info(_LI("Port %s updated."), detail['device'])
# Nothing to do regarding local networking
else:
LOG.debug("Device %s not defined on plugin", device)
LOG.debug("Device %s not defined on plugin", detail['device'])
return resync
def process_network_ports(self, port_info, ovs_restarted):
@ -1355,7 +1378,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
port_info.get('updated', set()))
self._bind_devices(need_binding_devices)
if 'removed' in port_info:
if 'removed' in port_info and port_info['removed']:
start = time.time()
resync_b = self.treat_devices_removed(port_info['removed'])
LOG.debug("process_network_ports - iteration:%(iter_num)d - "
@ -1368,15 +1391,15 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
def process_ancillary_network_ports(self, port_info):
resync_a = False
resync_b = False
if 'added' in port_info:
if 'added' in port_info and port_info['added']:
start = time.time()
try:
self.treat_ancillary_devices_added(port_info['added'])
LOG.debug("process_ancillary_network_ports - iteration: "
"%(iter_num)d - treat_ancillary_devices_added "
"completed in %(elapsed).3f",
{'iter_num': self.iter_num,
'elapsed': time.time() - start})
{'iter_num': self.iter_num,
'elapsed': time.time() - start})
except DeviceListRetrievalError:
# Need to resync as there was an error with server
# communication.
@ -1384,7 +1407,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
"iteration:%d - failure while retrieving "
"port details from server"), self.iter_num)
resync_a = True
if 'removed' in port_info:
if 'removed' in port_info and port_info['removed']:
start = time.time()
resync_b = self.treat_ancillary_devices_removed(
port_info['removed'])

View File

@ -28,7 +28,7 @@ from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron.extensions import portbindings
from neutron.extensions import portsecurity as psec
from neutron.i18n import _LW
from neutron.i18n import _LE, _LW
from neutron import manager
from neutron.plugins.ml2 import driver_api as api
from neutron.plugins.ml2.drivers import type_tunnel
@ -48,7 +48,9 @@ class RpcCallbacks(type_tunnel.TunnelRpcCallbackMixin):
# return value to include fixed_ips and device_owner for
# the device port
# 1.4 tunnel_sync rpc signature upgrade to obtain 'host'
target = oslo_messaging.Target(version='1.4')
# 1.5 Support update_device_list and
# get_devices_details_list_and_failed_devices
target = oslo_messaging.Target(version='1.5')
def __init__(self, notifier, type_manager):
self.setup_tunnel_callback_mixin(notifier, type_manager)
@ -135,6 +137,27 @@ class RpcCallbacks(type_tunnel.TunnelRpcCallbackMixin):
for device in kwargs.pop('devices', [])
]
def get_devices_details_list_and_failed_devices(self,
rpc_context,
**kwargs):
devices = []
failed_devices = []
cached_networks = {}
for device in kwargs.pop('devices', []):
try:
devices.append(self.get_device_details(
rpc_context,
device=device,
cached_networks=cached_networks,
**kwargs))
except Exception:
LOG.error(_LE("Failed to get details for device %s"),
device)
failed_devices.append(device)
return {'devices': devices,
'failed_devices': failed_devices}
def update_device_down(self, rpc_context, **kwargs):
"""Device no longer exists on agent."""
# TODO(garyk) - live migration and port status
@ -201,6 +224,44 @@ class RpcCallbacks(type_tunnel.TunnelRpcCallbackMixin):
registry.notify(
resources.PORT, events.AFTER_UPDATE, plugin, **kwargs)
def update_device_list(self, rpc_context, **kwargs):
devices_up = []
failed_devices_up = []
devices_down = []
failed_devices_down = []
devices = kwargs.get('devices_up')
if devices:
for device in devices:
try:
self.update_device_up(
rpc_context,
device=device,
**kwargs)
except Exception:
failed_devices_up.append(device)
LOG.error(_LE("Failed to update device %s up"), device)
else:
devices_up.append(device)
devices = kwargs.get('devices_down')
if devices:
for device in devices:
try:
dev = self.update_device_down(
rpc_context,
device=device,
**kwargs)
except Exception:
failed_devices_down.append(device)
LOG.error(_LE("Failed to update device %s down"), device)
else:
devices_down.append(dev)
return {'devices_up': devices_up,
'failed_devices_up': failed_devices_up,
'devices_down': devices_down,
'failed_devices_down': failed_devices_down}
class AgentNotifierApi(dvr_rpc.DVRAgentRpcApiMixin,
sg_rpc.SecurityGroupAgentRpcApiMixin,

View File

@ -202,11 +202,6 @@ class OVSAgentTestFramework(base.BaseOVSLinuxTestCase):
for port in [self.patch_tun, self.patch_int]:
self.assertTrue(self.ovs.port_exists(port))
def assert_no_vlan_tags(self, ports, agent):
for port in ports:
res = agent.int_br.db_get_val('Port', port.get('vif_name'), 'tag')
self.assertEqual([], res)
def assert_vlan_tags(self, ports, agent):
for port in ports:
res = agent.int_br.db_get_val('Port', port.get('vif_name'), 'tag')
@ -215,30 +210,64 @@ class OVSAgentTestFramework(base.BaseOVSLinuxTestCase):
class TestOVSAgent(OVSAgentTestFramework):
def _expected_plugin_rpc_call(self, call, expected_devices):
def _expected_plugin_rpc_call(self, call, expected_devices, is_up=True):
"""Helper to check expected rpc call are received
:param call: The call to check
:param expected_devices The device for which call is expected
:param is_up True if expected_devices are devices that are set up,
False if expected_devices are devices that are set down
"""
args = (args[0][1] for args in call.call_args_list)
return not (set(expected_devices) - set(args))
if is_up:
rpc_devices = [
dev for args in call.call_args_list for dev in args[0][1]]
else:
rpc_devices = [
dev for args in call.call_args_list for dev in args[0][2]]
return not (set(expected_devices) - set(rpc_devices))
def _create_ports(self, network, agent):
def _create_ports(self, network, agent, trigger_resync=False):
ports = []
for x in range(3):
ports.append(self._create_test_port_dict())
def mock_device_raise_exception(context, devices_up, devices_down,
agent_id, host=None):
agent.plugin_rpc.update_device_list.side_effect = (
mock_update_device)
raise Exception('Exception to trigger resync')
def mock_device_details(context, devices, agent_id, host=None):
details = []
for port in ports:
if port['id'] in devices:
dev = OVSAgentTestFramework._get_device_details(
port, network)
details.append(dev)
return details
return {'devices': details, 'failed_devices': []}
agent.plugin_rpc.get_devices_details_list.side_effect = (
mock_device_details)
def mock_update_device(context, devices_up, devices_down, agent_id,
host=None):
dev_up = []
dev_down = []
for port in ports:
if devices_up and port['id'] in devices_up:
dev_up.append(port['id'])
if devices_down and port['id'] in devices_down:
dev_down.append({'device': port['id'], 'exists': True})
return {'devices_up': dev_up,
'failed_devices_up': [],
'devices_down': dev_down,
'failed_devices_down': []}
(agent.plugin_rpc.get_devices_details_list_and_failed_devices.
side_effect) = mock_device_details
if trigger_resync:
agent.plugin_rpc.update_device_list.side_effect = (
mock_device_raise_exception)
else:
agent.plugin_rpc.update_device_list.side_effect = (
mock_update_device)
return ports
def test_port_creation_and_deletion(self):
@ -250,39 +279,35 @@ class TestOVSAgent(OVSAgentTestFramework):
up_ports_ids = [p['id'] for p in ports]
agent_utils.wait_until_true(
lambda: self._expected_plugin_rpc_call(
agent.plugin_rpc.update_device_up, up_ports_ids))
agent.plugin_rpc.update_device_list, up_ports_ids))
down_ports_ids = [p['id'] for p in ports]
for port in ports:
agent.int_br.delete_port(port['vif_name'])
agent_utils.wait_until_true(
lambda: self._expected_plugin_rpc_call(
agent.plugin_rpc.update_device_down, down_ports_ids))
agent.plugin_rpc.update_device_list, down_ports_ids, False))
def test_resync_devices_set_up_after_exception(self):
agent = self.create_agent()
self.start_agent(agent)
network = self._create_test_network_dict()
ports = self._create_ports(network, agent)
agent.plugin_rpc.update_device_up.side_effect = [
Exception('Exception to trigger resync'),
None, None, None]
ports = self._create_ports(network, agent, True)
self._plug_ports(network, ports, agent)
ports_ids = [p['id'] for p in ports]
agent_utils.wait_until_true(
lambda: self._expected_plugin_rpc_call(
agent.plugin_rpc.update_device_up, ports_ids))
agent.plugin_rpc.update_device_list, ports_ids))
def test_port_vlan_tags(self):
agent = self.create_agent()
self.start_agent(agent)
ports = []
for x in range(3):
ports.append(self._create_test_port_dict())
network = self._create_test_network_dict()
ports = self._create_ports(network, agent)
ports_ids = [p['id'] for p in ports]
self._plug_ports(network, ports, agent)
agent.provision_local_vlan(network['id'], 'vlan', 'physnet', 1)
self.assert_no_vlan_tags(ports, agent)
self._bind_ports(ports, network, agent)
agent_utils.wait_until_true(
lambda: self._expected_plugin_rpc_call(
agent.plugin_rpc.update_device_list, ports_ids))
self.assert_vlan_tags(ports, agent)
def test_assert_bridges_ports_vxlan(self):

View File

@ -322,16 +322,30 @@ class TestOvsNeutronAgent(object):
vif_port_set, registered_ports, port_tags_dict=port_tags_dict)
self.assertEqual(expected, actual)
def test_treat_devices_added_returns_raises_for_missing_device(self):
with mock.patch.object(self.agent.plugin_rpc,
'get_devices_details_list',
side_effect=Exception()),\
mock.patch.object(self.agent.int_br,
'get_vif_port_by_id',
return_value=mock.Mock()):
self.assertRaises(
self.mod_agent.DeviceListRetrievalError,
self.agent.treat_devices_added_or_updated, [{}], False)
def test_bind_devices(self):
devices_up = ['tap1']
devices_down = ['tap2']
self.agent.local_vlan_map["net1"] = mock.Mock()
port_details = [
{'network_id': 'net1', 'vif_port': mock.Mock(),
'device': devices_up[0],
'admin_state_up': True},
{'network_id': 'net1', 'vif_port': mock.Mock(),
'device': devices_down[0],
'admin_state_up': False}]
with mock.patch.object(
self.agent.plugin_rpc, 'update_device_list',
return_value={'devices_up': devices_up,
'devices_down': devices_down,
'failed_devices_up': [],
'failed_devices_down': []}) as update_devices, \
mock.patch.object(self.agent,
'int_br') as int_br:
int_br.db_list.return_value = []
self.agent._bind_devices(port_details)
update_devices.assert_called_once_with(mock.ANY, devices_up,
devices_down,
mock.ANY, mock.ANY)
def _mock_treat_devices_added_updated(self, details, port, func_name):
"""Mock treat devices added or updated.
@ -342,11 +356,17 @@ class TestOvsNeutronAgent(object):
:returns: whether the named function was called
"""
with mock.patch.object(self.agent.plugin_rpc,
'get_devices_details_list',
return_value=[details]),\
'get_devices_details_list_and_failed_devices',
return_value={'devices': [details],
'failed_devices': None}),\
mock.patch.object(self.agent.int_br,
'get_vifs_by_ids',
return_value={details['device']: port}),\
mock.patch.object(self.agent.plugin_rpc, 'update_device_list',
return_value={'devices_up': [],
'devices_down': details,
'failed_devices_up': [],
'failed_devices_down': []}),\
mock.patch.object(self.agent, func_name) as func:
skip_devs, need_bound_devices = (
self.agent.treat_devices_added_or_updated([{}], False))
@ -367,8 +387,9 @@ class TestOvsNeutronAgent(object):
mock.MagicMock(), port, 'port_dead'))
def test_treat_devices_added_does_not_process_missing_port(self):
with mock.patch.object(self.agent.plugin_rpc,
'get_device_details') as get_dev_fn,\
with mock.patch.object(
self.agent.plugin_rpc,
'get_devices_details_list_and_failed_devices') as get_dev_fn,\
mock.patch.object(self.agent.int_br,
'get_vif_port_by_id',
return_value=None):
@ -384,8 +405,9 @@ class TestOvsNeutronAgent(object):
dev_mock = mock.MagicMock()
dev_mock.__getitem__.return_value = 'the_skipped_one'
with mock.patch.object(self.agent.plugin_rpc,
'get_devices_details_list',
return_value=[dev_mock]),\
'get_devices_details_list_and_failed_devices',
return_value={'devices': [dev_mock],
'failed_devices': None}),\
mock.patch.object(self.agent.int_br,
'get_vifs_by_ids',
return_value={}),\
@ -411,8 +433,9 @@ class TestOvsNeutronAgent(object):
}
with mock.patch.object(self.agent.plugin_rpc,
'get_devices_details_list',
return_value=[fake_details_dict]),\
'get_devices_details_list_and_failed_devices',
return_value={'devices': [fake_details_dict],
'failed_devices': None}),\
mock.patch.object(self.agent.int_br,
'get_vifs_by_ids',
return_value={'xxx': mock.MagicMock()}),\
@ -424,15 +447,14 @@ class TestOvsNeutronAgent(object):
self.assertFalse(skip_devs)
self.assertTrue(treat_vif_port.called)
def test_treat_devices_removed_returns_true_for_missing_device(self):
with mock.patch.object(self.agent.plugin_rpc, 'update_device_down',
side_effect=Exception()):
self.assertTrue(self.agent.treat_devices_removed([{}]))
def _mock_treat_devices_removed(self, port_exists):
details = dict(exists=port_exists)
with mock.patch.object(self.agent.plugin_rpc, 'update_device_down',
return_value=details):
with mock.patch.object(self.agent.plugin_rpc,
'update_device_list',
return_value={'devices_up': [],
'devices_down': details,
'failed_devices_up': [],
'failed_devices_down': []}):
with mock.patch.object(self.agent, 'port_unbound') as port_unbound:
self.assertFalse(self.agent.treat_devices_removed([{}]))
self.assertTrue(port_unbound.called)
@ -1046,7 +1068,11 @@ class TestOvsNeutronAgent(object):
'physical_network', 'segmentation_id',
'admin_state_up', 'fixed_ips', 'device',
'device_owner')}]
self.agent.plugin_rpc.get_devices_details_list.return_value = plist
(self.agent.plugin_rpc.get_devices_details_list_and_failed_devices.
return_value) = {'devices': plist, 'failed_devices': []}
self.agent.plugin_rpc.update_device_list.return_value = {
'devices_up': plist, 'devices_down': [], 'failed_devices_up': [],
'failed_devices_down': []}
self.agent.setup_arp_spoofing_protection = mock.Mock()
self.agent.treat_devices_added_or_updated([], False)
self.assertFalse(self.agent.setup_arp_spoofing_protection.called)
@ -1664,9 +1690,12 @@ class TestOvsDvrNeutronAgent(object):
int_br.reset_mock()
tun_br.reset_mock()
with mock.patch.object(self.agent, 'reclaim_local_vlan'),\
mock.patch.object(self.agent.plugin_rpc,
'update_device_down',
return_value=None),\
mock.patch.object(self.agent.plugin_rpc, 'update_device_list',
return_value={
'devices_up': [],
'devices_down': [self._port.vif_id],
'failed_devices_up': [],
'failed_devices_down': []}),\
mock.patch.object(self.agent, 'int_br', new=int_br),\
mock.patch.object(self.agent, 'tun_br', new=tun_br),\
mock.patch.object(self.agent.dvr_agent, 'int_br', new=int_br),\
@ -1766,9 +1795,13 @@ class TestOvsDvrNeutronAgent(object):
int_br.reset_mock()
tun_br.reset_mock()
with mock.patch.object(self.agent, 'reclaim_local_vlan'),\
mock.patch.object(self.agent.plugin_rpc,
'update_device_down',
return_value=None),\
mock.patch.object(self.agent.plugin_rpc, 'update_device_list',
return_value={
'devices_up': [],
'devices_down': [
self._compute_port.vif_id],
'failed_devices_up': [],
'failed_devices_down': []}),\
mock.patch.object(self.agent, 'int_br', new=int_br),\
mock.patch.object(self.agent, 'tun_br', new=tun_br),\
mock.patch.object(self.agent.dvr_agent, 'int_br', new=int_br),\
@ -1853,9 +1886,12 @@ class TestOvsDvrNeutronAgent(object):
int_br.reset_mock()
tun_br.reset_mock()
with mock.patch.object(self.agent, 'reclaim_local_vlan'),\
mock.patch.object(self.agent.plugin_rpc,
'update_device_down',
return_value=None),\
mock.patch.object(self.agent.plugin_rpc, 'update_device_list',
return_value={
'devices_up': [],
'devices_down': [self._port.vif_id],
'failed_devices_up': [],
'failed_devices_down': []}),\
mock.patch.object(self.agent, 'int_br', new=int_br),\
mock.patch.object(self.agent, 'tun_br', new=tun_br),\
mock.patch.object(self.agent.dvr_agent, 'int_br', new=int_br),\

View File

@ -22,6 +22,7 @@ import collections
import mock
from oslo_config import cfg
from oslo_context import context as oslo_context
import oslo_messaging
from sqlalchemy.orm import exc
from neutron.agent import rpc as agent_rpc
@ -134,27 +135,53 @@ class RpcCallbacksTestCase(base.BaseTestCase):
self.callbacks.get_device_details(mock.Mock())
self.assertTrue(self.plugin.update_port_status.called)
def test_get_devices_details_list(self):
def _test_get_devices_list(self, callback, side_effect, expected):
devices = [1, 2, 3, 4, 5]
kwargs = {'host': 'fake_host', 'agent_id': 'fake_agent_id'}
with mock.patch.object(self.callbacks, 'get_device_details',
side_effect=devices) as f:
res = self.callbacks.get_devices_details_list('fake_context',
devices=devices,
**kwargs)
self.assertEqual(devices, res)
side_effect=side_effect) as f:
res = callback('fake_context', devices=devices, **kwargs)
self.assertEqual(expected, res)
self.assertEqual(len(devices), f.call_count)
calls = [mock.call('fake_context', device=i,
cached_networks={}, **kwargs)
for i in devices]
f.assert_has_calls(calls)
def test_get_devices_details_list(self):
devices = [1, 2, 3, 4, 5]
expected = devices
callback = self.callbacks.get_devices_details_list
self._test_get_devices_list(callback, devices, expected)
def test_get_devices_details_list_with_empty_devices(self):
with mock.patch.object(self.callbacks, 'get_device_details') as f:
res = self.callbacks.get_devices_details_list('fake_context')
self.assertFalse(f.called)
self.assertEqual([], res)
def test_get_devices_details_list_and_failed_devices(self):
devices = [1, 2, 3, 4, 5]
expected = {'devices': devices, 'failed_devices': []}
callback = (
self.callbacks.get_devices_details_list_and_failed_devices)
self._test_get_devices_list(callback, devices, expected)
def test_get_devices_details_list_and_failed_devices_failures(self):
devices = [1, Exception('testdevice'), 3,
Exception('testdevice'), 5]
expected = {'devices': [1, 3, 5], 'failed_devices': [2, 4]}
callback = (
self.callbacks.get_devices_details_list_and_failed_devices)
self._test_get_devices_list(callback, devices, expected)
def test_get_devices_details_list_and_failed_devices_empty_dev(self):
with mock.patch.object(self.callbacks, 'get_device_details') as f:
res = self.callbacks.get_devices_details_list_and_failed_devices(
'fake_context')
self.assertFalse(f.called)
self.assertEqual({'devices': [], 'failed_devices': []}, res)
def _test_update_device_not_bound_to_host(self, func):
self.plugin.port_bound_to_host.return_value = False
self.plugin._device_to_port_id.return_value = 'fake_port_id'
@ -192,6 +219,64 @@ class RpcCallbacksTestCase(base.BaseTestCase):
self.callbacks.update_device_down(
mock.Mock(), device='fake_device'))
def _test_update_device_list(self, devices_up_side_effect,
devices_down_side_effect, expected):
devices_up = [1, 2, 3]
devices_down = [4, 5]
kwargs = {'host': 'fake_host', 'agent_id': 'fake_agent_id'}
with mock.patch.object(self.callbacks, 'update_device_up',
side_effect=devices_up_side_effect) as f_up, \
mock.patch.object(self.callbacks, 'update_device_down',
side_effect=devices_down_side_effect) as f_down:
res = self.callbacks.update_device_list(
'fake_context', devices_up=devices_up,
devices_down=devices_down, **kwargs)
self.assertEqual(expected, res)
self.assertEqual(len(devices_up), f_up.call_count)
self.assertEqual(len(devices_down), f_down.call_count)
def test_update_device_list_no_failure(self):
devices_up_side_effect = [1, 2, 3]
devices_down_side_effect = [
{'device': 4, 'exists': True},
{'device': 5, 'exists': True}]
expected = {'devices_up': devices_up_side_effect,
'failed_devices_up': [],
'devices_down':
[{'device': 4, 'exists': True},
{'device': 5, 'exists': True}],
'failed_devices_down': []}
self._test_update_device_list(devices_up_side_effect,
devices_down_side_effect,
expected)
def test_update_device_list_failed_devices(self):
devices_up_side_effect = [1, Exception('testdevice'), 3]
devices_down_side_effect = [{'device': 4, 'exists': True},
Exception('testdevice')]
expected = {'devices_up': [1, 3],
'failed_devices_up': [2],
'devices_down':
[{'device': 4, 'exists': True}],
'failed_devices_down': [5]}
self._test_update_device_list(devices_up_side_effect,
devices_down_side_effect,
expected)
def test_update_device_list_empty_devices(self):
expected = {'devices_up': [],
'failed_devices_up': [],
'devices_down': [],
'failed_devices_down': []}
kwargs = {'host': 'fake_host', 'agent_id': 'fake_agent_id'}
res = self.callbacks.update_device_list(
'fake_context', devices_up=[], devices_down=[], **kwargs)
self.assertEqual(expected, res)
class RpcApiTestCase(base.BaseTestCase):
@ -314,3 +399,73 @@ class RpcApiTestCase(base.BaseTestCase):
device='fake_device',
agent_id='fake_agent_id',
host='fake_host')
def test_update_device_list(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
self._test_rpc_api(rpcapi, None,
'update_device_list', rpc_method='call',
devices_up=['fake_device1', 'fake_device2'],
devices_down=['fake_device3', 'fake_device4'],
agent_id='fake_agent_id',
host='fake_host',
version='1.5')
def test_update_device_list_unsupported(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
ctxt = oslo_context.RequestContext('fake_user', 'fake_project')
devices_up = ['fake_device1', 'fake_device2']
devices_down = ['fake_device3', 'fake_device4']
expected_ret_val = {'devices_up': ['fake_device2'],
'failed_devices_up': ['fake_device1'],
'devices_down': [
{'device': 'fake_device3', 'exists': True}],
'failed_devices_down': ['fake_device4']}
rpcapi.update_device_up = mock.Mock(
side_effect=[Exception('fake_device1 fails'), None])
rpcapi.update_device_down = mock.Mock(
side_effect=[{'device': 'fake_device3', 'exists': True},
Exception('fake_device4 fails')])
with mock.patch.object(rpcapi.client, 'call'),\
mock.patch.object(rpcapi.client, 'prepare') as prepare_mock:
prepare_mock.side_effect = oslo_messaging.UnsupportedVersion(
'test')
res = rpcapi.update_device_list(ctxt, devices_up, devices_down,
'fake_agent_id', 'fake_host')
self.assertEqual(expected_ret_val, res)
def test_get_devices_details_list_and_failed_devices(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
self._test_rpc_api(rpcapi, None,
'get_devices_details_list_and_failed_devices',
rpc_method='call',
devices=['fake_device1', 'fake_device2'],
agent_id='fake_agent_id',
host='fake_host',
version='1.5')
def test_devices_details_list_and_failed_devices(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
self._test_rpc_api(rpcapi, None,
'get_devices_details_list_and_failed_devices',
rpc_method='call',
devices=['fake_device1', 'fake_device2'],
agent_id='fake_agent_id', host='fake_host',
version='1.5')
def test_get_devices_details_list_and_failed_devices_unsupported(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
ctxt = oslo_context.RequestContext('fake_user', 'fake_project')
devices = ['fake_device1', 'fake_device2']
dev2_details = {'device': 'fake_device2', 'network_id': 'net_id',
'port_id': 'port_id', 'admin_state_up': True}
expected_ret_val = {'devices': [dev2_details],
'failed_devices': ['fake_device1']}
rpcapi.get_device_details = mock.Mock(
side_effect=[Exception('fake_device1 fails'), dev2_details])
with mock.patch.object(rpcapi.client, 'call'),\
mock.patch.object(rpcapi.client, 'prepare') as prepare_mock:
prepare_mock.side_effect = oslo_messaging.UnsupportedVersion(
'test')
res = rpcapi.get_devices_details_list_and_failed_devices(
ctxt, devices, 'fake_agent_id', 'fake_host')
self.assertEqual(expected_ret_val, res)