|
|
|
@ -21,13 +21,6 @@ from neutron.privileged.agent.linux import ip_lib as priv_ip_lib
|
|
|
|
|
from neutron.tests.functional import base as functional_base
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _get_attr(pyroute2_obj, attr_name):
|
|
|
|
|
rule_attrs = pyroute2_obj.get('attrs', [])
|
|
|
|
|
for attr in (attr for attr in rule_attrs if attr[0] == attr_name):
|
|
|
|
|
return attr[1]
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class GetDeviceNamesTestCase(functional_base.BaseSudoTestCase):
|
|
|
|
|
|
|
|
|
|
def _remove_ns(self, namespace):
|
|
|
|
@ -57,6 +50,121 @@ class GetDeviceNamesTestCase(functional_base.BaseSudoTestCase):
|
|
|
|
|
self.assertNotIn(name, interfaces)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class GetDevicesInfoTestCase(functional_base.BaseSudoTestCase):
|
|
|
|
|
|
|
|
|
|
def setUp(self):
|
|
|
|
|
super(GetDevicesInfoTestCase, self).setUp()
|
|
|
|
|
self.namespace = 'ns_test-' + uuidutils.generate_uuid()
|
|
|
|
|
priv_ip_lib.create_netns(self.namespace)
|
|
|
|
|
self.addCleanup(self._remove_ns, self.namespace)
|
|
|
|
|
self.interfaces = ['int_01', 'int_02']
|
|
|
|
|
self.interfaces_to_exclude = (ip_lib.FB_TUNNEL_DEVICE_NAMES +
|
|
|
|
|
[ip_lib.LOOPBACK_DEVNAME])
|
|
|
|
|
|
|
|
|
|
def _remove_ns(self, namespace):
|
|
|
|
|
priv_ip_lib.remove_netns(namespace)
|
|
|
|
|
|
|
|
|
|
def test_get_devices_info_lo(self):
|
|
|
|
|
devices = priv_ip_lib.get_link_devices(self.namespace)
|
|
|
|
|
self.assertGreater(len(devices), 0)
|
|
|
|
|
for device in devices:
|
|
|
|
|
if ip_lib.get_attr(device, 'IFLA_IFNAME') != 'lo':
|
|
|
|
|
continue
|
|
|
|
|
self.assertIsNone(ip_lib.get_attr(device, 'IFLA_LINKINFO'))
|
|
|
|
|
break
|
|
|
|
|
else:
|
|
|
|
|
self.fail('Device "lo" not found')
|
|
|
|
|
|
|
|
|
|
def test_get_devices_info_dummy(self):
|
|
|
|
|
interfaces_tested = []
|
|
|
|
|
for interface in self.interfaces:
|
|
|
|
|
priv_ip_lib.create_interface(interface, self.namespace, 'dummy')
|
|
|
|
|
|
|
|
|
|
devices = priv_ip_lib.get_link_devices(self.namespace)
|
|
|
|
|
self.assertGreater(len(devices), 0)
|
|
|
|
|
for device in devices:
|
|
|
|
|
name = ip_lib.get_attr(device, 'IFLA_IFNAME')
|
|
|
|
|
if name in self.interfaces_to_exclude:
|
|
|
|
|
continue
|
|
|
|
|
self.assertIn(name, self.interfaces)
|
|
|
|
|
ifla_linkinfo = ip_lib.get_attr(device, 'IFLA_LINKINFO')
|
|
|
|
|
self.assertEqual(ip_lib.get_attr(ifla_linkinfo, 'IFLA_INFO_KIND'),
|
|
|
|
|
'dummy')
|
|
|
|
|
interfaces_tested.append(name)
|
|
|
|
|
self.assertEqual(sorted(interfaces_tested), sorted(self.interfaces))
|
|
|
|
|
|
|
|
|
|
def test_get_devices_info_vlan(self):
|
|
|
|
|
interfaces_tested = []
|
|
|
|
|
vlan_interfaces = []
|
|
|
|
|
vlan_id = 1000
|
|
|
|
|
for interface in self.interfaces:
|
|
|
|
|
priv_ip_lib.create_interface(interface, self.namespace, 'dummy')
|
|
|
|
|
vlan_interface = interface + '_' + str(vlan_id)
|
|
|
|
|
vlan_interfaces.append(vlan_interface)
|
|
|
|
|
priv_ip_lib.create_interface(
|
|
|
|
|
vlan_interface, self.namespace, 'vlan',
|
|
|
|
|
physical_interface=interface, vlan_id=vlan_id)
|
|
|
|
|
vlan_id += 1
|
|
|
|
|
|
|
|
|
|
devices = priv_ip_lib.get_link_devices(self.namespace)
|
|
|
|
|
self.assertGreater(len(devices), 0)
|
|
|
|
|
for device in devices:
|
|
|
|
|
name = ip_lib.get_attr(device, 'IFLA_IFNAME')
|
|
|
|
|
if name in self.interfaces_to_exclude:
|
|
|
|
|
continue
|
|
|
|
|
self.assertIn(name, self.interfaces + vlan_interfaces)
|
|
|
|
|
ifla_linkinfo = ip_lib.get_attr(device, 'IFLA_LINKINFO')
|
|
|
|
|
if name in vlan_interfaces:
|
|
|
|
|
self.assertEqual(
|
|
|
|
|
ip_lib.get_attr(ifla_linkinfo, 'IFLA_INFO_KIND'), 'vlan')
|
|
|
|
|
ifla_infodata = ip_lib.get_attr(ifla_linkinfo,
|
|
|
|
|
'IFLA_INFO_DATA')
|
|
|
|
|
vlan_id = int(name.split('_')[-1])
|
|
|
|
|
self.assertEqual(
|
|
|
|
|
ip_lib.get_attr(ifla_infodata, 'IFLA_VLAN_ID'), vlan_id)
|
|
|
|
|
interfaces_tested.append(name)
|
|
|
|
|
self.assertEqual(sorted(interfaces_tested),
|
|
|
|
|
sorted(self.interfaces + vlan_interfaces))
|
|
|
|
|
|
|
|
|
|
def test_get_devices_info_vxlan(self):
|
|
|
|
|
interfaces_tested = []
|
|
|
|
|
vxlan_interfaces = []
|
|
|
|
|
vxlan_id = 1000
|
|
|
|
|
for interface in self.interfaces:
|
|
|
|
|
priv_ip_lib.create_interface(interface, self.namespace, 'dummy')
|
|
|
|
|
vxlan_interface = interface + '_' + str(vxlan_id)
|
|
|
|
|
vxlan_interfaces.append(vxlan_interface)
|
|
|
|
|
priv_ip_lib.create_interface(
|
|
|
|
|
vxlan_interface, self.namespace, 'vxlan',
|
|
|
|
|
physical_interface=interface, vxlan_id=vxlan_id,
|
|
|
|
|
vxlan_group='239.1.1.1')
|
|
|
|
|
vxlan_id += 1
|
|
|
|
|
|
|
|
|
|
devices = priv_ip_lib.get_link_devices(self.namespace)
|
|
|
|
|
self.assertGreater(len(devices), 0)
|
|
|
|
|
for device in devices:
|
|
|
|
|
name = ip_lib.get_attr(device, 'IFLA_IFNAME')
|
|
|
|
|
if name in self.interfaces_to_exclude:
|
|
|
|
|
continue
|
|
|
|
|
self.assertIn(name, self.interfaces + vxlan_interfaces)
|
|
|
|
|
ifla_linkinfo = ip_lib.get_attr(device, 'IFLA_LINKINFO')
|
|
|
|
|
if name in vxlan_interfaces:
|
|
|
|
|
self.assertEqual(
|
|
|
|
|
ip_lib.get_attr(ifla_linkinfo, 'IFLA_INFO_KIND'),
|
|
|
|
|
'vxlan')
|
|
|
|
|
ifla_infodata = ip_lib.get_attr(ifla_linkinfo,
|
|
|
|
|
'IFLA_INFO_DATA')
|
|
|
|
|
vxlan_id = int(name.split('_')[-1])
|
|
|
|
|
self.assertEqual(
|
|
|
|
|
ip_lib.get_attr(ifla_infodata, 'IFLA_VXLAN_ID'), vxlan_id)
|
|
|
|
|
self.assertEqual(
|
|
|
|
|
ip_lib.get_attr(ifla_infodata, 'IFLA_VXLAN_GROUP'),
|
|
|
|
|
'239.1.1.1')
|
|
|
|
|
interfaces_tested.append(name)
|
|
|
|
|
self.assertEqual(sorted(interfaces_tested),
|
|
|
|
|
sorted(self.interfaces + vxlan_interfaces))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ListIpRulesTestCase(functional_base.BaseSudoTestCase):
|
|
|
|
|
|
|
|
|
|
RULE_TABLES = {'default': 253, 'main': 254, 'local': 255}
|
|
|
|
@ -276,7 +384,7 @@ class GetIpAddressesTestCase(functional_base.BaseSudoTestCase):
|
|
|
|
|
ip_addresses = priv_ip_lib.get_ip_addresses(namespace)
|
|
|
|
|
for ip_address in ip_addresses:
|
|
|
|
|
int_name = str(ip_address['index'])
|
|
|
|
|
ip = _get_attr(ip_address, 'IFA_ADDRESS')
|
|
|
|
|
ip = ip_lib.get_attr(ip_address, 'IFA_ADDRESS')
|
|
|
|
|
mask = ip_address['prefixlen']
|
|
|
|
|
cidr = common_utils.ip_to_cidr(ip, mask)
|
|
|
|
|
self.assertEqual(interfaces[int_name]['cidr'], cidr)
|
|
|
|
|