os-net-config/os_net_config/tests/test_impl_ifcfg.py

2411 lines
87 KiB
Python

# -*- coding: utf-8 -*-
# Copyright 2014 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os.path
import random
import shutil
import tempfile
from oslo_concurrency import processutils
import os_net_config
from os_net_config import impl_ifcfg
from os_net_config import objects
from os_net_config import sriov_config
from os_net_config.tests import base
from os_net_config import utils
_BASE_IFCFG = """# This file is autogenerated by os-net-config
DEVICE=em1
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
"""
_BASE_IFCFG_NETWORKMANAGER = """# This file is autogenerated by os-net-config
DEVICE=em1
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=yes
PEERDNS=no
"""
_HOTPLUG = """# This file is autogenerated by os-net-config
DEVICE=em1
ONBOOT=yes
HOTPLUG=yes
NM_CONTROLLED=no
PEERDNS=no
BOOTPROTO=none
"""
_ONBOOT = """# This file is autogenerated by os-net-config
DEVICE=em1
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
BOOTPROTO=none
"""
_LINKDELAY = """# This file is autogenerated by os-net-config
DEVICE=em1
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
LINKDELAY=10
BOOTPROTO=none
"""
_NO_IP = _BASE_IFCFG + "BOOTPROTO=none\n"
_V4_IFCFG = _BASE_IFCFG + """BOOTPROTO=static
IPADDR=192.168.1.2
NETMASK=255.255.255.0
"""
_V4_V6_IFCFG = _BASE_IFCFG + """IPV6INIT=yes
BOOTPROTO=static
IPADDR=192.168.1.2
NETMASK=255.255.255.0
IPV6_AUTOCONF=no
IPV6ADDR=2001:abc:a::/64
"""
_IFCFG_VLAN = """# This file is autogenerated by os-net-config
DEVICE=em1.120
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
VLAN=yes
BOOTPROTO=none
"""
_IFCFG_DHCP = """# This file is autogenerated by os-net-config
DEVICE="eth0"
BOOTPROTO="dhcp"
ONBOOT="yes"
TYPE="Ethernet"
"""
_IFCFG_STATIC1 = """# This file is autogenerated by os-net-config
DEVICE=eth0
BOOTPROTO=static
IPADDR=10.0.0.1
NETMASK=255.255.255.0
TYPE=Ethernet
ONBOOT=yes
"""
_IFCFG_STATIC1_MTU = _IFCFG_STATIC1 + "\nMTU=9000"
_IFCFG_STATIC2 = """DEVICE=eth0
BOOTPROTO=static
IPADDR=10.0.1.2
NETMASK=255.255.254.0
TYPE=Ethernet
ONBOOT=yes
"""
_IFCFG_STATIC2_MTU = _IFCFG_STATIC2 + "\nMTU=9000"
_IFCFG_OVS = """DEVICE=eth0
ONBOOT=yes
DEVICETYPE=ovs
TYPE=OVSPort
OVS_BRIDGE=brctlplane
BOOTPROTO=none
HOTPLUG=no
"""
_IFCFG_ROUTES1 = """default via 192.0.2.1 dev eth0
192.0.2.1/24 via 192.0.2.1 dev eth0
"""
_IFCFG_ROUTES2 = """default via 192.0.1.1 dev eth0
192.0.1.1/24 via 192.0.3.1 dev eth1
"""
_V4_IFCFG_MAPPED = _V4_IFCFG.replace('em1', 'nic1') + "HWADDR=a1:b2:c3:d4:e5\n"
_BASE_IB_IFCFG = """# This file is autogenerated by os-net-config
DEVICE=ib0
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
TYPE=Infiniband
"""
_IB_IFCFG = _BASE_IB_IFCFG + """BOOTPROTO=none
"""
_V4_IB_IFCFG = _BASE_IB_IFCFG + """BOOTPROTO=static
IPADDR=192.168.1.2
NETMASK=255.255.255.0
"""
_V4_IFCFG_MULTIPLE = _V4_IFCFG + """IPADDR1=192.168.1.3
NETMASK1=255.255.255.255
IPADDR2=10.0.0.2
NETMASK2=255.0.0.0
"""
_IB_V4_IFCFG_MULTIPLE = _V4_IB_IFCFG + """IPADDR1=192.168.1.3
NETMASK1=255.255.255.255
IPADDR2=10.0.0.2
NETMASK2=255.0.0.0
"""
_V6_IFCFG = _BASE_IFCFG + """IPV6INIT=yes
IPV6_AUTOCONF=no
IPV6ADDR=2001:abc:a::/64
"""
_V6_IFCFG_MULTIPLE = (_V6_IFCFG + "IPV6ADDR_SECONDARIES=\"2001:abc:b::1/64 " +
"2001:abc:c::2/96\"\n")
_OVS_IFCFG = _BASE_IFCFG + "DEVICETYPE=ovs\nBOOTPROTO=none\n"
_OVS_IFCFG_TUNNEL = """# This file is autogenerated by os-net-config
DEVICE=tun0
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
DEVICETYPE=ovs
TYPE=OVSTunnel
OVS_BRIDGE=br-ctlplane
OVS_TUNNEL_TYPE=gre
OVS_TUNNEL_OPTIONS="options:remote_ip=192.168.1.1"
"""
_OVS_BRIDGE_IFCFG = _BASE_IFCFG + "DEVICETYPE=ovs\n"
_LINUX_BRIDGE_IFCFG = _BASE_IFCFG + "BRIDGE=br-ctlplane\nBOOTPROTO=none\n"
_ROUTES = """default via 192.168.1.1 dev em1 metric 10
172.19.0.0/24 via 192.168.1.1 dev em1
172.20.0.0/24 via 192.168.1.5 dev em1 metric 100
"""
_ROUTES_PF = """default via 192.168.1.1 dev enp3s0f0 metric 10
172.19.0.0/24 via 192.168.1.1 dev enp3s0f0
172.20.0.0/24 via 192.168.1.5 dev enp3s0f0 metric 100
"""
_ROUTES_WITH_TABLES = """172.19.0.0/24 via 192.168.1.1 dev em1 table table1
172.20.0.0/24 via 192.168.1.1 dev em1 table 201
172.21.0.0/24 via 192.168.1.1 dev em1 table 200
"""
_ROUTE_RULES = """# This file is autogenerated by os-net-config
# test comment
from 192.0.2.0/24 table 200
"""
_RT_DEFAULT = """# reserved values
#
255\tlocal
254\tmain
253\tdefault
0\tunspec
#
# local
#
#1\tinr.ruhep\n"""
_RT_CUSTOM = _RT_DEFAULT + "# Custom\n10\tcustom # Custom table\n20\ttable1\n"
_RT_FULL = _RT_DEFAULT + """# Custom
10\tcustom # os-net-config managed table
200\ttable1 # os-net-config managed table\n"""
_ROUTES_V6 = """default via 2001:db8::1 dev em1
2001:db8:dead:beef:cafe::/56 via fd00:fd00:2000::1 dev em1
2001:db8:dead:beff::/64 via fd00:fd00:2000::1 dev em1 metric 100
"""
_OVS_INTERFACE = _BASE_IFCFG + """DEVICETYPE=ovs
TYPE=OVSPort
OVS_BRIDGE=br-ctlplane
BOOTPROTO=none
"""
_OVS_BRIDGE_DHCP = """# This file is autogenerated by os-net-config
DEVICE=br-ctlplane
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
DEVICETYPE=ovs
TYPE=OVSBridge
OVSBOOTPROTO=dhcp
OVSDHCPINTERFACES="em1"
"""
_NM_CONTROLLED_INTERFACE = _BASE_IFCFG_NETWORKMANAGER + """MASTER=bond1
SLAVE=yes
BOOTPROTO=none
"""
_NM_CONTROLLED_BOND = """# This file is autogenerated by os-net-config
DEVICE=bond1
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=yes
PEERDNS=no
"""
_OVS_BRIDGE_DHCP_STANDALONE = _OVS_BRIDGE_DHCP + (
"OVS_EXTRA=\"set bridge br-ctlplane fail_mode=standalone "
"-- del-controller br-ctlplane\"\n")
_OVS_BRIDGE_DHCP_SECURE = _OVS_BRIDGE_DHCP + \
"OVS_EXTRA=\"set bridge br-ctlplane fail_mode=secure\"\n"
_LINUX_BRIDGE_DHCP = """# This file is autogenerated by os-net-config
DEVICE=br-ctlplane
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
TYPE=Bridge
DELAY=0
BOOTPROTO=dhcp
"""
_OVS_BRIDGE_STATIC = """# This file is autogenerated by os-net-config
DEVICE=br-ctlplane
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
DEVICETYPE=ovs
TYPE=OVSBridge
BOOTPROTO=static
IPADDR=192.168.1.2
NETMASK=255.255.255.0
"""
_LINUX_BRIDGE_STATIC = """# This file is autogenerated by os-net-config
DEVICE=br-ctlplane
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
TYPE=Bridge
DELAY=0
BOOTPROTO=static
IPADDR=192.168.1.2
NETMASK=255.255.255.0
"""
_OVS_BRIDGE_DHCP_PRIMARY_INTERFACE = _OVS_BRIDGE_DHCP + \
"OVS_EXTRA=\"set bridge br-ctlplane other-config:hwaddr=a1:b2:c3:d4:e5\"\n"
_OVS_BRIDGE_DHCP_OVS_EXTRA = _OVS_BRIDGE_DHCP + \
"OVS_EXTRA=\"set bridge br-ctlplane other-config:hwaddr=a1:b2:c3:d4:e5" + \
" -- br-set-external-id br-ctlplane bridge-id br-ctlplane\"\n"
_BASE_VLAN = """# This file is autogenerated by os-net-config
DEVICE=vlan5
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
VLAN=yes
PHYSDEV=em1
"""
# vlans on an OVS bridge do not set VLAN=yes or PHYSDEV
_BASE_VLAN_OVS = """# This file is autogenerated by os-net-config
DEVICE=vlan5
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
"""
_VLAN_NO_IP = _BASE_VLAN + "BOOTPROTO=none\n"
_VLAN_OVS = _BASE_VLAN_OVS + "DEVICETYPE=ovs\nBOOTPROTO=none\n"
_VLAN_OVS_EXTRA = _BASE_VLAN_OVS + "OVS_OPTIONS=\"foo\"\nDEVICETYPE=ovs\n" + \
"BOOTPROTO=none\nOVS_EXTRA=\"bar -- baz\"\n"
_VLAN_OVS_BRIDGE = _BASE_VLAN_OVS + """DEVICETYPE=ovs
TYPE=OVSIntPort
OVS_BRIDGE=br-ctlplane
OVS_OPTIONS="tag=5"
BOOTPROTO=none
"""
_VLAN_LINUX_BRIDGE = _BASE_VLAN_OVS + """VLAN=yes
PHYSDEV=em1
BRIDGE=br-ctlplane
BOOTPROTO=none
"""
_OVS_BOND_DHCP = """# This file is autogenerated by os-net-config
DEVICE=bond0
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
DEVICETYPE=ovs
TYPE=OVSBond
OVSBOOTPROTO=dhcp
BOND_IFACES="em1 em2"
"""
_LINUX_BOND_DHCP = """# This file is autogenerated by os-net-config
DEVICE=bond0
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
BOOTPROTO=dhcp
"""
_LINUX_TEAM_DHCP = """# This file is autogenerated by os-net-config
DEVICE=team0
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
BOOTPROTO=dhcp
DEVICETYPE=Team
"""
_LINUX_BOND_INTERFACE = _BASE_IFCFG + """MASTER=bond0
SLAVE=yes
BOOTPROTO=none
"""
_LINUX_TEAM_INTERFACE = _BASE_IFCFG + """TEAM_MASTER=team0
BOOTPROTO=none
"""
_IVS_UPLINK = """# This file is autogenerated by os-net-config
DEVICE=em1
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
DEVICETYPE=ivs
IVS_BRIDGE=ivs
BOOTPROTO=none
"""
_IVS_INTERFACE = """# This file is autogenerated by os-net-config
DEVICE=storage5
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
TYPE=IVSIntPort
DEVICETYPE=ivs
IVS_BRIDGE=ivs
MTU=1500
BOOTPROTO=static
IPADDR=172.16.2.7
NETMASK=255.255.255.0
"""
_IVS_CONFIG = ('DAEMON_ARGS=\"--hitless --certificate /etc/ivs '
'--inband-vlan 4092 -u em1 --internal-port=storage5\"')
_NFVSWITCH_INTERFACE = """# This file is autogenerated by os-net-config
DEVICE=em1
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
DEVICETYPE=nfvswitch
NFVSWITCH_BRIDGE=nfvswitch
BOOTPROTO=none
"""
_NFVSWITCH_INTERNAL = """# This file is autogenerated by os-net-config
DEVICE=storage5
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
TYPE=NFVSWITCHIntPort
DEVICETYPE=nfvswitch
NFVSWITCH_BRIDGE=nfvswitch
MTU=1500
BOOTPROTO=static
IPADDR=172.16.2.7
NETMASK=255.255.255.0
"""
_NFVSWITCH_CONFIG = ('SETUP_ARGS=\"-c 2,3,4,5 -u em1 -m storage5\"')
_OVS_IFCFG_PATCH_PORT = """# This file is autogenerated by os-net-config
DEVICE=br-pub-patch
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
DEVICETYPE=ovs
TYPE=OVSPatchPort
OVS_BRIDGE=br-ex
OVS_PATCH_PEER=br-ex-patch
"""
_LINUX_TEAM_PRIMARY_IFACE = """# This file is autogenerated by os-net-config
DEVICE=em1
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
TEAM_MASTER=team1
TEAM_PORT_CONFIG='{"prio": 100}'
BOOTPROTO=none
"""
_CONTRAIL_VROUTER_IFACE = """# This file is autogenerated by os-net-config
DEVICE=vhost0
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
BOOTPROTO=static
IPADDR=10.0.0.30
NETMASK=255.255.255.0
DEVICETYPE=vhost
TYPE=kernel_mode
BIND_INT=em3
"""
_CONTRAIL_VROUTER_VLAN_IFACE = """# This file is autogenerated by os-net-config
DEVICE=vhost0
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
BOOTPROTO=static
IPADDR=10.0.0.30
NETMASK=255.255.255.0
DEVICETYPE=vhost
TYPE=kernel_mode
BIND_INT=vlan100
"""
_CONTRAIL_VROUTER_DPDK_IFACE = """# This file is autogenerated by os-net-config
DEVICE=vhost0
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
BOOTPROTO=static
IPADDR=10.0.0.30
NETMASK=255.255.255.0
DEVICETYPE=vhost
TYPE=dpdk
BIND_INT=0000:00:03.0
DRIVER=uio_pci_generic
CPU_LIST=0-31
"""
_CONTRAIL_VROUTER_DPDK_IFACE_CUST_DRIVER = """# This file is autogenerated by \
os-net-config
DEVICE=vhost0
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
BOOTPROTO=static
IPADDR=10.0.0.30
NETMASK=255.255.255.0
DEVICETYPE=vhost
TYPE=dpdk
BIND_INT=0000:00:03.0
DRIVER=vfio
CPU_LIST=0-31
"""
_CONTRAIL_VROUTER_DPDK_BOND_IFACE = """# This file is autogenerated by \
os-net-config
DEVICE=vhost0
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
BOOTPROTO=static
IPADDR=10.0.0.30
NETMASK=255.255.255.0
DEVICETYPE=vhost
TYPE=dpdk
BIND_INT=0000:00:03.0,0000:00:01.0
BOND_MODE=2
BOND_POLICY=802.3ad
DRIVER=uio_pci_generic
CPU_LIST=2,3
"""
_SRIOV_PF_IFCFG = """# This file is autogenerated by os-net-config
DEVICE=enp3s0f0
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
BOOTPROTO=none
"""
_V4_SRIOV_PF_IFCFG = """# This file is autogenerated by os-net-config
DEVICE=enp3s0f0
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
BOOTPROTO=static
IPADDR=192.168.1.2
NETMASK=255.255.255.0
"""
class TestIfcfgNetConfig(base.TestCase):
def setUp(self):
super(TestIfcfgNetConfig, self).setUp()
rand = str(int(random.random() * 100000))
sriov_config._SRIOV_CONFIG_FILE = '/tmp/sriov_config_' + rand + '.yaml'
self.provider = impl_ifcfg.IfcfgNetConfig()
def stub_is_ovs_installed():
return True
self.stub_out('os_net_config.utils.is_ovs_installed',
stub_is_ovs_installed)
def tearDown(self):
super(TestIfcfgNetConfig, self).tearDown()
if os.path.isfile(sriov_config._SRIOV_CONFIG_FILE):
os.remove(sriov_config._SRIOV_CONFIG_FILE)
def get_interface_config(self, name='em1'):
return self.provider.interface_data[name]
def get_vlan_config(self, name='vlan1'):
return self.provider.vlan_data[name]
def get_linux_bond_config(self, name='bond0'):
return self.provider.linuxbond_data[name]
def get_linux_team_config(self, name='team0'):
return self.provider.linuxteam_data[name]
def get_route_config(self, name='em1'):
return self.provider.route_data.get(name, '')
def get_route_table_config(self, name='custom', table_id=200):
return self.provider.route_table_data.get(name, table_id)
def get_rule_config(self, name='em1'):
return self.provider.rule_data.get(name)
def get_route6_config(self, name='em1'):
return self.provider.route6_data.get(name, '')
def stub_get_stored_pci_address(self, ifname, noop):
if 'eth0' in ifname:
return "0000:00:07.0"
if 'eth1' in ifname:
return "0000:00:08.0"
if 'eth2' in ifname:
return "0000:00:09.0"
if 'em3' in ifname:
return "0000:00:03.0"
if 'em1' in ifname:
return "0000:00:01.0"
def test_add_route_table(self):
route_table1 = objects.RouteTable('table1', 200)
route_table2 = objects.RouteTable('table2', '201')
self.provider.add_route_table(route_table1)
self.provider.add_route_table(route_table2)
self.assertEqual("table1", self.get_route_table_config(200))
self.assertEqual("table2", self.get_route_table_config(201))
def test_add_route_with_table(self):
route_rule1 = objects.RouteRule('from 192.0.2.0/24 table 200',
'test comment')
# Test route table by name
route1 = objects.Route('192.168.1.1', '172.19.0.0/24', False,
route_table="table1")
# Test that table specified in route_options takes precedence
route2 = objects.Route('192.168.1.1', '172.20.0.0/24', False,
'table 201', route_table=200)
# Test route table specified by integer ID
route3 = objects.Route('192.168.1.1', '172.21.0.0/24', False,
route_table=200)
v4_addr = objects.Address('192.168.1.2/24')
interface = objects.Interface('em1', addresses=[v4_addr],
routes=[route1, route2, route3],
rules=[route_rule1])
self.provider.add_interface(interface)
self.assertEqual(_V4_IFCFG, self.get_interface_config())
self.assertEqual(_ROUTES_WITH_TABLES, self.get_route_config())
self.assertEqual(_ROUTE_RULES, self.get_rule_config())
def test_add_base_interface(self):
interface = objects.Interface('em1')
self.provider.add_interface(interface)
self.assertEqual(_NO_IP, self.get_interface_config())
def test_add_interface_with_hotplug(self):
interface = objects.Interface('em1', hotplug=True)
self.provider.add_interface(interface)
self.assertEqual(_HOTPLUG, self.get_interface_config())
def test_add_interface_with_onboot(self):
interface = objects.Interface('em1', onboot=True)
self.provider.add_interface(interface)
self.assertEqual(_ONBOOT, self.get_interface_config())
def test_add_interface_with_linkdelay(self):
interface = objects.Interface('em1', linkdelay=10)
self.provider.add_interface(interface)
self.assertEqual(_LINKDELAY, self.get_interface_config())
def test_add_base_interface_vlan(self):
interface = objects.Interface('em1.120')
self.provider.add_interface(interface)
self.assertEqual(_IFCFG_VLAN, self.get_interface_config('em1.120'))
def test_add_ovs_interface(self):
interface = objects.Interface('em1')
interface.ovs_port = True
self.provider.add_interface(interface)
self.assertEqual(_OVS_IFCFG, self.get_interface_config())
def test_add_ovs_tunnel(self):
interface = objects.OvsTunnel('tun0')
interface.type = 'ovs_tunnel'
interface.tunnel_type = 'gre'
interface.ovs_options = ['options:remote_ip=192.168.1.1']
interface.bridge_name = 'br-ctlplane'
self.provider.add_interface(interface)
self.assertEqual(_OVS_IFCFG_TUNNEL, self.get_interface_config('tun0'))
def test_add_ovs_patch_port(self):
patch_port = objects.OvsPatchPort("br-pub-patch")
patch_port.type = 'ovs_patch_port'
patch_port.bridge_name = 'br-ex'
patch_port.peer = 'br-ex-patch'
self.provider.add_interface(patch_port)
self.assertEqual(_OVS_IFCFG_PATCH_PORT,
self.get_interface_config('br-pub-patch'))
def test_add_interface_with_v4(self):
v4_addr = objects.Address('192.168.1.2/24')
interface = objects.Interface('em1', addresses=[v4_addr])
self.provider.add_interface(interface)
self.assertEqual(_V4_IFCFG, self.get_interface_config())
self.assertEqual('', self.get_route_config())
def test_add_interface_with_v4_multiple(self):
addresses = [objects.Address('192.168.1.2/24'),
objects.Address('192.168.1.3/32'),
objects.Address('10.0.0.2/8')]
interface = objects.Interface('em1', addresses=addresses)
self.provider.add_interface(interface)
self.assertEqual(_V4_IFCFG_MULTIPLE, self.get_interface_config())
self.assertEqual('', self.get_route_config())
def test_add_interface_map_persisted(self):
def test_interface_mac(name):
macs = {'em1': 'a1:b2:c3:d4:e5'}
return macs[name]
self.stub_out('os_net_config.utils.interface_mac', test_interface_mac)
nic_mapping = {'nic1': 'em1'}
self.stubbed_mapped_nics = nic_mapping
v4_addr = objects.Address('192.168.1.2/24')
interface = objects.Interface('nic1', addresses=[v4_addr],
nic_mapping=nic_mapping,
persist_mapping=True)
self.assertEqual('a1:b2:c3:d4:e5', interface.hwaddr)
self.provider.add_interface(interface)
self.assertEqual(_V4_IFCFG_MAPPED, self.get_interface_config('nic1'))
self.assertEqual('', self.get_route_config('nic1'))
def test_add_interface_with_v6(self):
v6_addr = objects.Address('2001:abc:a::/64')
interface = objects.Interface('em1', addresses=[v6_addr])
self.provider.add_interface(interface)
self.assertEqual(_V6_IFCFG, self.get_interface_config())
def test_add_interface_with_v6_multiple(self):
addresses = [objects.Address('2001:abc:a::/64'),
objects.Address('2001:abc:b::1/64'),
objects.Address('2001:abc:c::2/96')]
interface = objects.Interface('em1', addresses=addresses)
self.provider.add_interface(interface)
self.assertEqual(_V6_IFCFG_MULTIPLE, self.get_interface_config())
def test_network_with_routes(self):
route1 = objects.Route('192.168.1.1', default=True,
route_options="metric 10")
route2 = objects.Route('192.168.1.1', '172.19.0.0/24')
route3 = objects.Route('192.168.1.5', '172.20.0.0/24',
route_options="metric 100")
v4_addr = objects.Address('192.168.1.2/24')
interface = objects.Interface('em1', addresses=[v4_addr],
routes=[route1, route2, route3])
self.provider.add_interface(interface)
self.assertEqual(_V4_IFCFG, self.get_interface_config())
self.assertEqual(_ROUTES, self.get_route_config())
def test_network_with_ipv6_routes(self):
route1 = objects.Route('192.168.1.1', default=True,
route_options="metric 10")
route2 = objects.Route('192.168.1.1', '172.19.0.0/24')
route3 = objects.Route('192.168.1.5', '172.20.0.0/24',
route_options="metric 100")
route4 = objects.Route('2001:db8::1', default=True)
route5 = objects.Route('fd00:fd00:2000::1',
'2001:db8:dead:beef:cafe::/56')
route6 = objects.Route('fd00:fd00:2000::1',
'2001:db8:dead:beff::/64',
route_options="metric 100")
v4_addr = objects.Address('192.168.1.2/24')
v6_addr = objects.Address('2001:abc:a::/64')
interface = objects.Interface('em1', addresses=[v4_addr, v6_addr],
routes=[route1, route2, route3,
route4, route5, route6])
self.provider.add_interface(interface)
self.assertEqual(_V4_V6_IFCFG, self.get_interface_config())
self.assertEqual(_ROUTES_V6, self.get_route6_config())
def test_network_ovs_bridge_with_dhcp(self):
interface = objects.Interface('em1')
bridge = objects.OvsBridge('br-ctlplane', use_dhcp=True,
members=[interface])
self.provider.add_interface(interface)
self.provider.add_bridge(bridge)
self.assertEqual(_OVS_INTERFACE, self.get_interface_config())
self.assertEqual(_OVS_BRIDGE_DHCP,
self.provider.bridge_data['br-ctlplane'])
def test_network_ovs_bridge_with_standalone_fail_mode(self):
interface = objects.Interface('em1')
bridge = objects.OvsBridge('br-ctlplane', use_dhcp=True,
members=[interface],
fail_mode='standalone')
self.provider.add_interface(interface)
self.provider.add_bridge(bridge)
self.assertEqual(_OVS_INTERFACE, self.get_interface_config())
self.assertEqual(_OVS_BRIDGE_DHCP_STANDALONE,
self.provider.bridge_data['br-ctlplane'])
def test_network_ovs_bridge_with_secure_fail_mode(self):
interface = objects.Interface('em1')
bridge = objects.OvsBridge('br-ctlplane', use_dhcp=True,
members=[interface],
fail_mode='secure')
self.provider.add_interface(interface)
self.provider.add_bridge(bridge)
self.assertEqual(_OVS_INTERFACE, self.get_interface_config())
self.assertEqual(_OVS_BRIDGE_DHCP_SECURE,
self.provider.bridge_data['br-ctlplane'])
def test_network_linux_bridge_with_dhcp(self):
interface = objects.Interface('em1')
bridge = objects.LinuxBridge('br-ctlplane', use_dhcp=True,
members=[interface])
self.provider.add_linux_bridge(bridge)
self.provider.add_interface(interface)
self.assertEqual(_LINUX_BRIDGE_IFCFG, self.get_interface_config())
self.assertEqual(_LINUX_BRIDGE_DHCP,
self.provider.linuxbridge_data['br-ctlplane'])
def test_network_ovs_bridge_static(self):
v4_addr = objects.Address('192.168.1.2/24')
interface = objects.Interface('em1')
bridge = objects.OvsBridge('br-ctlplane', members=[interface],
addresses=[v4_addr])
self.provider.add_interface(interface)
self.provider.add_bridge(bridge)
self.assertEqual(_OVS_INTERFACE, self.get_interface_config())
self.assertEqual(_OVS_BRIDGE_STATIC,
self.provider.bridge_data['br-ctlplane'])
def test_network_ovs_bridge_with_tunnel(self):
interface = objects.OvsTunnel('tun0')
interface.type = 'ovs_tunnel'
interface.tunnel_type = 'gre'
interface.ovs_options = ['options:remote_ip=192.168.1.1']
interface.bridge_name = 'br-ctlplane'
self.provider.add_interface(interface)
v4_addr = objects.Address('192.168.1.2/24')
bridge = objects.OvsBridge('br-ctlplane', members=[interface],
addresses=[v4_addr])
self.provider.add_bridge(bridge)
self.provider.add_interface(interface)
self.assertEqual(_OVS_IFCFG_TUNNEL, self.get_interface_config('tun0'))
self.assertEqual(_OVS_BRIDGE_STATIC,
self.provider.bridge_data['br-ctlplane'])
def test_network_linux_bridge_static(self):
v4_addr = objects.Address('192.168.1.2/24')
interface = objects.Interface('em1')
bridge = objects.LinuxBridge('br-ctlplane', members=[interface],
addresses=[v4_addr])
self.provider.add_interface(interface)
self.provider.add_bridge(bridge)
self.assertEqual(_LINUX_BRIDGE_IFCFG, self.get_interface_config())
self.assertEqual(_LINUX_BRIDGE_STATIC,
self.provider.bridge_data['br-ctlplane'])
def test_network_ovs_bridge_with_dhcp_primary_interface(self):
def test_interface_mac(name):
return "a1:b2:c3:d4:e5"
self.stub_out('os_net_config.utils.interface_mac', test_interface_mac)
interface = objects.Interface('em1', primary=True)
bridge = objects.OvsBridge('br-ctlplane', use_dhcp=True,
members=[interface])
self.provider.add_interface(interface)
self.provider.add_bridge(bridge)
self.assertEqual(_OVS_INTERFACE, self.get_interface_config())
self.assertEqual(_OVS_BRIDGE_DHCP_PRIMARY_INTERFACE,
self.provider.bridge_data['br-ctlplane'])
def test_network_ovs_bridge_with_dhcp_primary_interface_with_extra(self):
def test_interface_mac(name):
return "a1:b2:c3:d4:e5"
self.stub_out('os_net_config.utils.interface_mac', test_interface_mac)
interface = objects.Interface('em1', primary=True)
ovs_extra = "br-set-external-id br-ctlplane bridge-id br-ctlplane"
bridge = objects.OvsBridge('br-ctlplane', use_dhcp=True,
members=[interface],
ovs_extra=[ovs_extra])
self.provider.add_interface(interface)
self.provider.add_bridge(bridge)
self.assertEqual(_OVS_INTERFACE, self.get_interface_config())
self.assertEqual(_OVS_BRIDGE_DHCP_OVS_EXTRA,
self.provider.bridge_data['br-ctlplane'])
def test_network_ovs_bridge_with_dhcp_primary_interface_with_format(self):
def test_interface_mac(name):
return "a1:b2:c3:d4:e5"
self.stub_out('os_net_config.utils.interface_mac', test_interface_mac)
interface = objects.Interface('em1', primary=True)
ovs_extra = "br-set-external-id {name} bridge-id {name}"
bridge = objects.OvsBridge('br-ctlplane', use_dhcp=True,
members=[interface],
ovs_extra=[ovs_extra])
self.provider.add_interface(interface)
self.provider.add_bridge(bridge)
self.assertEqual(_OVS_INTERFACE, self.get_interface_config())
self.assertEqual(_OVS_BRIDGE_DHCP_OVS_EXTRA,
self.provider.bridge_data['br-ctlplane'])
def test_network_ivs_with_uplink_and_interface(self):
interface = objects.Interface('em1')
v4_addr = objects.Address('172.16.2.7/24')
ivs_interface = objects.IvsInterface(vlan_id=5,
name='storage',
addresses=[v4_addr])
bridge = objects.IvsBridge(members=[interface, ivs_interface])
self.provider.add_interface(interface)
self.provider.add_ivs_interface(ivs_interface)
self.provider.add_bridge(bridge)
self.assertEqual(_IVS_UPLINK, self.get_interface_config())
self.assertEqual(_IVS_INTERFACE,
self.provider.ivsinterface_data[ivs_interface.name])
data = self.provider.generate_ivs_config(['em1'], ['storage5'])
self.assertEqual(_IVS_CONFIG, data)
def test_network_nfvswitch_with_interfaces_and_internal_interfaces(self):
interface = objects.Interface('em1')
v4_addr = objects.Address('172.16.2.7/24')
nfvswitch_internal = objects.NfvswitchInternal(vlan_id=5,
name='storage',
addresses=[v4_addr])
iface_name = nfvswitch_internal.name
bridge = objects.NfvswitchBridge(members=[interface,
nfvswitch_internal],
options="-c 2,3,4,5")
self.provider.add_interface(interface)
self.provider.add_nfvswitch_internal(nfvswitch_internal)
self.provider.add_nfvswitch_bridge(bridge)
self.assertEqual(_NFVSWITCH_INTERFACE, self.get_interface_config())
self.assertEqual(_NFVSWITCH_INTERNAL,
self.provider.nfvswitch_intiface_data[iface_name])
data = self.provider.generate_nfvswitch_config(['em1'], ['storage5'])
self.assertEqual(_NFVSWITCH_CONFIG, data)
def test_add_ib_interface_with_v4_multiple(self):
addresses = [objects.Address('192.168.1.2/24'),
objects.Address('192.168.1.3/32'),
objects.Address('10.0.0.2/8')]
ib_interface = objects.IbInterface('ib0', addresses=addresses)
self.provider.add_interface(ib_interface)
self.assertEqual(_IB_V4_IFCFG_MULTIPLE,
self.get_interface_config('ib0'))
self.assertEqual('', self.get_route_config())
def test_add_contrail_vrouter(self):
addresses = [objects.Address('10.0.0.30/24')]
interface1 = objects.Interface('em3')
cvi = objects.ContrailVrouter('vhost0', addresses=addresses,
members=[interface1],)
self.provider.add_contrail_vrouter(cvi)
self.assertEqual(
_CONTRAIL_VROUTER_IFACE,
self.provider.interface_data['vhost0'])
self.assertEqual('', self.get_route_config('vhost0'))
def test_add_contrail_vrouter_vlan(self):
addresses = [objects.Address('10.0.0.30/24')]
interface1 = objects.Interface('vlan100')
cvi = objects.ContrailVrouter('vhost0', addresses=addresses,
members=[interface1],)
self.provider.add_contrail_vrouter(cvi)
self.assertEqual(
_CONTRAIL_VROUTER_VLAN_IFACE,
self.provider.interface_data['vhost0'])
self.assertEqual('', self.get_route_config('vhost0'))
def test_add_contrail_vrouter_with_nic_mapping(self):
nic_mapping = {'nic1': 'em3'}
self.stubbed_mapped_nics = nic_mapping
addresses = [objects.Address('10.0.0.30/24')]
interface1 = objects.Interface('nic1')
cvi = objects.ContrailVrouter('vhost0', addresses=addresses,
members=[interface1])
self.provider.add_contrail_vrouter(cvi)
self.assertEqual(
_CONTRAIL_VROUTER_IFACE,
self.provider.interface_data['vhost0'])
self.assertEqual('', self.get_route_config('vhost0'))
def test_add_contrail_vrouter_dpdk_interface(self):
addresses = [objects.Address('10.0.0.30/24')]
interface1 = objects.Interface('em3')
self.stub_out('os_net_config.utils.get_stored_pci_address',
self.stub_get_stored_pci_address)
cvi = objects.ContrailVrouterDpdk('vhost0', addresses=addresses,
members=[interface1])
self.provider.noop = True
self.provider.add_contrail_vrouter_dpdk(cvi)
self.assertEqual(
_CONTRAIL_VROUTER_DPDK_IFACE,
self.provider.interface_data['vhost0'])
self.assertEqual('', self.get_route_config('vhost0'))
def test_add_contrail_vrouter_dpdk_interface_cust_driver(self):
addresses = [objects.Address('10.0.0.30/24')]
interface1 = objects.Interface('em3')
self.stub_out('os_net_config.utils.get_stored_pci_address',
self.stub_get_stored_pci_address)
cvi = objects.ContrailVrouterDpdk('vhost0', addresses=addresses,
members=[interface1], driver='vfio')
self.provider.noop = True
self.provider.add_contrail_vrouter_dpdk(cvi)
self.assertEqual(
_CONTRAIL_VROUTER_DPDK_IFACE_CUST_DRIVER,
self.provider.interface_data['vhost0'])
self.assertEqual('', self.get_route_config('vhost0'))
def test_add_contrail_vrouter_dpdk_interface_nic_mapping(self):
nic_mapping = {'nic1': 'em3'}
self.stubbed_mapped_nics = nic_mapping
addresses = [objects.Address('10.0.0.30/24')]
interface1 = objects.Interface('nic1')
self.stub_out('os_net_config.utils.get_stored_pci_address',
self.stub_get_stored_pci_address)
cvi = objects.ContrailVrouterDpdk('vhost0', addresses=addresses,
members=[interface1])
self.provider.noop = True
self.provider.add_contrail_vrouter_dpdk(cvi)
self.assertEqual(
_CONTRAIL_VROUTER_DPDK_IFACE,
self.provider.interface_data['vhost0'])
self.assertEqual('', self.get_route_config('vhost0'))
def test_add_contrail_vrouter_dpdk_bond_interface(self):
addresses = [objects.Address('10.0.0.30/24')]
interface1 = objects.Interface('em3')
interface2 = objects.Interface('em1')
self.stub_out('os_net_config.utils.get_stored_pci_address',
self.stub_get_stored_pci_address)
cvi = objects.ContrailVrouterDpdk('vhost0', addresses=addresses,
members=[interface1, interface2],
bond_mode="2",
bond_policy="802.3ad",
cpu_list="2,3")
self.provider.noop = True
self.provider.add_contrail_vrouter_dpdk(cvi)
self.assertEqual(
_CONTRAIL_VROUTER_DPDK_BOND_IFACE,
self.provider.interface_data['vhost0'])
self.assertEqual('', self.get_route_config('vhost0'))
def test_add_contrail_vrouter_dpdk_bond_interface_nic_mapping(self):
nic_mapping = {'nic1': 'em3', 'nic2': 'em1'}
self.stubbed_mapped_nics = nic_mapping
addresses = [objects.Address('10.0.0.30/24')]
interface1 = objects.Interface('nic1')
interface2 = objects.Interface('nic2')
self.stub_out('os_net_config.utils.get_stored_pci_address',
self.stub_get_stored_pci_address)
cvi = objects.ContrailVrouterDpdk('vhost0', addresses=addresses,
members=[interface1, interface2],
bond_mode="2",
bond_policy="802.3ad",
cpu_list="2,3")
self.provider.noop = True
self.provider.add_contrail_vrouter_dpdk(cvi)
self.assertEqual(
_CONTRAIL_VROUTER_DPDK_BOND_IFACE,
self.provider.interface_data['vhost0'])
self.assertEqual('', self.get_route_config('vhost0'))
def test_add_vlan(self):
vlan = objects.Vlan('em1', 5)
self.provider.add_vlan(vlan)
self.assertEqual(_VLAN_NO_IP, self.get_vlan_config('vlan5'))
def test_add_vlan_ovs(self):
vlan = objects.Vlan('em1', 5)
vlan.ovs_port = True
self.provider.add_vlan(vlan)
self.assertEqual(_VLAN_OVS, self.get_vlan_config('vlan5'))
def test_add_vlan_ovs_options(self):
vlan = objects.Vlan('em1', 5)
vlan.ovs_port = True
vlan.ovs_options = 'foo'
vlan.ovs_extra = ['bar', 'baz']
self.provider.add_vlan(vlan)
self.assertEqual(_VLAN_OVS_EXTRA, self.get_vlan_config('vlan5'))
def test_add_vlan_mtu_1500(self):
vlan = objects.Vlan('em1', 5, mtu=1500)
self.provider.add_vlan(vlan)
expected = _VLAN_NO_IP + 'MTU=1500\n'
self.assertEqual(expected, self.get_vlan_config('vlan5'))
def test_add_ovs_bridge_with_vlan(self):
vlan = objects.Vlan('em1', 5)
bridge = objects.OvsBridge('br-ctlplane', use_dhcp=True,
members=[vlan])
self.provider.add_vlan(vlan)
self.provider.add_bridge(bridge)
self.assertEqual(_VLAN_OVS_BRIDGE, self.get_vlan_config('vlan5'))
def test_add_linux_bridge_with_vlan(self):
vlan = objects.Vlan('em1', 5)
bridge = objects.LinuxBridge('br-ctlplane', use_dhcp=True,
members=[vlan])
self.provider.add_vlan(vlan)
self.provider.add_bridge(bridge)
self.assertEqual(_VLAN_LINUX_BRIDGE, self.get_vlan_config('vlan5'))
def test_ovs_bond(self):
interface1 = objects.Interface('em1')
interface2 = objects.Interface('em2')
bond = objects.OvsBond('bond0', use_dhcp=True,
members=[interface1, interface2])
self.provider.add_interface(interface1)
self.provider.add_interface(interface2)
self.provider.add_bond(bond)
self.assertEqual(_NO_IP, self.get_interface_config('em1'))
em2_config = """# This file is autogenerated by os-net-config
DEVICE=em2
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
BOOTPROTO=none
"""
self.assertEqual(em2_config, self.get_interface_config('em2'))
self.assertEqual(_OVS_BOND_DHCP,
self.get_interface_config('bond0'))
def test_linux_bond(self):
interface1 = objects.Interface('em1')
interface2 = objects.Interface('em2')
bond = objects.LinuxBond('bond0', use_dhcp=True,
members=[interface1, interface2])
self.provider.add_linux_bond(bond)
self.provider.add_interface(interface1)
self.provider.add_interface(interface2)
self.assertEqual(_LINUX_BOND_DHCP,
self.get_linux_bond_config('bond0'))
self.assertEqual(_LINUX_BOND_INTERFACE,
self.get_interface_config('em1'))
def test_linux_team(self):
interface1 = objects.Interface('em1')
interface2 = objects.Interface('em2')
team = objects.LinuxTeam('team0', use_dhcp=True,
members=[interface1, interface2])
self.provider.add_linux_team(team)
self.provider.add_interface(interface1)
self.provider.add_interface(interface2)
self.assertEqual(_LINUX_TEAM_DHCP,
self.get_linux_team_config('team0'))
self.assertEqual(_LINUX_TEAM_INTERFACE,
self.get_interface_config('em1'))
def test_interface_defroute(self):
interface1 = objects.Interface('em1')
interface2 = objects.Interface('em2', defroute=False)
self.provider.add_interface(interface1)
self.provider.add_interface(interface2)
em1_config = """# This file is autogenerated by os-net-config
DEVICE=em1
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
BOOTPROTO=none
"""
em2_config = """# This file is autogenerated by os-net-config
DEVICE=em2
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
BOOTPROTO=none
DEFROUTE=no
"""
self.assertEqual(em1_config, self.get_interface_config('em1'))
self.assertEqual(em2_config, self.get_interface_config('em2'))
def test_interface_dhclient_opts(self):
interface1 = objects.Interface('em1', dhclient_args='--foobar')
self.provider.add_interface(interface1)
em1_config = """# This file is autogenerated by os-net-config
DEVICE=em1
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
BOOTPROTO=none
DHCLIENTARGS=--foobar
"""
self.assertEqual(em1_config, self.get_interface_config('em1'))
def test_sriov_pf_ethtool_opts(self):
ifc = objects.SriovPF('enp3s0f0', 16,
ethtool_opts='speed 1000 duplex full')
self.provider.add_interface(ifc)
pf_config = "".join((_SRIOV_PF_IFCFG,
"ETHTOOL_OPTS=\"speed 1000 duplex full\"\n"))
self.assertEqual(pf_config, self.get_interface_config('enp3s0f0'))
def test_interface_ethtool_opts(self):
interface1 = objects.Interface('em1',
ethtool_opts='speed 1000 duplex full')
self.provider.add_interface(interface1)
em1_config = """# This file is autogenerated by os-net-config
DEVICE=em1
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
BOOTPROTO=none
ETHTOOL_OPTS=\"speed 1000 duplex full\"
"""
self.assertEqual(em1_config, self.get_interface_config('em1'))
def test_ib_interface_ethtool_opts(self):
ifc = objects.IbInterface('ib0', ethtool_opts='speed 1000 duplex full')
self.provider.add_interface(ifc)
ib_config = "".join((_IB_IFCFG,
"ETHTOOL_OPTS=\"speed 1000 duplex full\"\n"))
self.assertEqual(ib_config, self.get_interface_config('ib0'))
def test_linux_bond_with_ethtool_opts(self):
interface1 = objects.Interface(
'em1',
ethtool_opts='-K ${DEVICE} tx-gre-csum-segmentation off')
interface2 = objects.Interface(
'em2',
ethtool_opts='-K ${DEVICE} tx-gre-csum-segmentation off')
bond = objects.LinuxBond(
'bond0', use_dhcp=True,
members=[interface1, interface2],
ethtool_opts='-K ${DEVICE} tx-gre-csum-segmentation off')
self.provider.add_linux_bond(bond)
self.provider.add_interface(interface1)
self.provider.add_interface(interface2)
bond_config = "".join(
(_LINUX_BOND_DHCP,
"ETHTOOL_OPTS=\"-K ${DEVICE} tx-gre-csum-segmentation off\"\n"))
interface_config = "".join(
(_LINUX_BOND_INTERFACE,
"ETHTOOL_OPTS=\"-K ${DEVICE} tx-gre-csum-segmentation off\"\n"))
self.assertEqual(bond_config,
self.get_linux_bond_config('bond0'))
self.assertEqual(interface_config,
self.get_interface_config('em1'))
def test_interface_single_dns_server(self):
interface1 = objects.Interface('em1', dns_servers=['1.2.3.4'])
self.provider.add_interface(interface1)
em1_config = """# This file is autogenerated by os-net-config
DEVICE=em1
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
BOOTPROTO=none
DNS1=1.2.3.4
"""
self.assertEqual(em1_config, self.get_interface_config('em1'))
def test_interface_dns_servers(self):
interface1 = objects.Interface('em1', dns_servers=['1.2.3.4',
'5.6.7.8'])
self.provider.add_interface(interface1)
em1_config = """# This file is autogenerated by os-net-config
DEVICE=em1
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
BOOTPROTO=none
DNS1=1.2.3.4
DNS2=5.6.7.8
"""
self.assertEqual(em1_config, self.get_interface_config('em1'))
def test_interface_more_dns_servers(self):
interface1 = objects.Interface('em1', dns_servers=['1.2.3.4',
'5.6.7.8',
'9.10.11.12'])
self.provider.add_interface(interface1)
em1_config = """# This file is autogenerated by os-net-config
DEVICE=em1
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
BOOTPROTO=none
DNS1=1.2.3.4
DNS2=5.6.7.8
"""
self.assertEqual(em1_config, self.get_interface_config('em1'))
def test_interface_single_domain(self):
interface1 = objects.Interface('em1', dns_servers=['1.2.3.4'],
domain='openstack.local')
self.provider.add_interface(interface1)
em1_config = """# This file is autogenerated by os-net-config
DEVICE=em1
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
BOOTPROTO=none
DNS1=1.2.3.4
DOMAIN=openstack.local
"""
self.assertEqual(em1_config, self.get_interface_config('em1'))
def test_interface_single_domain_in_list(self):
interface1 = objects.Interface('em1', dns_servers=['1.2.3.4'],
domain=['openstack.local'])
self.provider.add_interface(interface1)
em1_config = """# This file is autogenerated by os-net-config
DEVICE=em1
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
BOOTPROTO=none
DNS1=1.2.3.4
DOMAIN="openstack.local"
"""
self.assertEqual(em1_config, self.get_interface_config('em1'))
def test_interface_multiple_domains(self):
interface1 = objects.Interface(
'em1',
dns_servers=['1.2.3.4'],
domain=['openstack.local', 'subdomain.openstack.local'])
self.provider.add_interface(interface1)
em1_config = """# This file is autogenerated by os-net-config
DEVICE=em1
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
BOOTPROTO=none
DNS1=1.2.3.4
DOMAIN="openstack.local subdomain.openstack.local"
"""
self.assertEqual(em1_config, self.get_interface_config('em1'))
def test_nm_controlled(self):
interface1 = objects.Interface('em1', nm_controlled=True)
interface2 = objects.Interface('em2', nm_controlled=True)
bond = objects.LinuxBond('bond1', nm_controlled=True,
members=[interface1, interface2])
self.provider.add_linux_bond(bond)
self.provider.add_interface(interface1)
self.provider.add_interface(interface2)
ifcfg_data = self.get_interface_config('em1')
self.assertEqual(_NM_CONTROLLED_INTERFACE, ifcfg_data)
bond_data = self.get_linux_bond_config('bond1')
self.assertEqual(_NM_CONTROLLED_BOND, bond_data)
def test_network_sriov_vf_without_config(self):
nic_mapping = {'nic1': 'eth0', 'nic2': 'eth1', 'nic3': 'eth2'}
self.stubbed_mapped_nics = nic_mapping
addresses = [objects.Address('10.0.0.30/24')]
def test_update_sriov_vf_map(pf_name, vfid, vf_name, vlan_id=None,
qos=None, spoofcheck=None, trust=None,
state=None, macaddr=None, promisc=None):
self.assertEqual(pf_name, 'eth2')
self.assertEqual(vfid, 7)
self.assertEqual(vlan_id, 0)
self.assertEqual(qos, 0)
self.assertEqual(spoofcheck, None)
self.assertEqual(trust, None)
self.assertEqual(state, None)
self.assertEqual(macaddr, None)
self.stub_out('os_net_config.utils.update_sriov_vf_map',
test_update_sriov_vf_map)
def test_get_vf_devname(device, vfid):
return device + '_' + str(vfid)
self.stub_out('os_net_config.utils.get_vf_devname',
test_get_vf_devname)
vf = objects.SriovVF(device='nic3', vfid=7, addresses=addresses)
self.provider.add_sriov_vf(vf)
vf_config = """# This file is autogenerated by os-net-config
DEVICE=eth2_7
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
BOOTPROTO=static
IPADDR=10.0.0.30
NETMASK=255.255.255.0
"""
self.assertEqual(vf_config, self.get_interface_config('eth2_7'))
def test_network_sriov_vf_true(self):
nic_mapping = {'nic1': 'eth0', 'nic2': 'eth1', 'nic3': 'eth2'}
self.stubbed_mapped_nics = nic_mapping
addresses = [objects.Address('10.0.0.30/24')]
def test_update_sriov_vf_map(pf_name, vfid, vf_name, vlan_id=None,
qos=None, spoofcheck=None, trust=None,
state=None, macaddr=None, promisc=None):
self.assertEqual(pf_name, 'eth2')
self.assertEqual(vf_name, 'eth2_7')
self.assertEqual(vfid, 7)
self.assertEqual(vlan_id, 100)
self.assertEqual(qos, 10)
self.assertTrue(spoofcheck)
self.assertTrue(trust)
self.assertEqual(state, "auto")
self.assertEqual(macaddr, "AA:BB:CC:DD:EE:FF")
self.assertTrue(promisc)
self.stub_out('os_net_config.utils.update_sriov_vf_map',
test_update_sriov_vf_map)
def test_get_vf_devname(device, vfid):
return device + '_' + str(vfid)
self.stub_out('os_net_config.utils.get_vf_devname',
test_get_vf_devname)
vf = objects.SriovVF(device='nic3', vfid=7, addresses=addresses,
vlan_id='100', qos='10', spoofcheck=True,
trust=True, state="auto",
macaddr="AA:BB:CC:DD:EE:FF", promisc=True)
self.provider.add_sriov_vf(vf)
vf_config = """# This file is autogenerated by os-net-config
DEVICE=eth2_7
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
BOOTPROTO=static
IPADDR=10.0.0.30
NETMASK=255.255.255.0
"""
self.assertEqual(vf_config, self.get_interface_config('eth2_7'))
def test_network_sriov_vf_config_false(self):
nic_mapping = {'nic1': 'eth0', 'nic2': 'eth1', 'nic3': 'eth2'}
self.stubbed_mapped_nics = nic_mapping
addresses = [objects.Address('10.0.0.30/24')]
def test_update_sriov_vf_map(pf_name, vfid, vf_name, vlan_id=None,
qos=None, spoofcheck=None, trust=None,
state=None, macaddr=None, promisc=None):
self.assertEqual(pf_name, 'eth2')
self.assertEqual(vf_name, 'eth2_7')
self.assertEqual(vfid, 7)
self.assertEqual(vlan_id, 100)
self.assertEqual(qos, 10)
self.assertFalse(spoofcheck)
self.assertFalse(trust)
self.assertEqual(state, "enable")
self.assertEqual(macaddr, "AA:BB:CC:DD:EE:FF")
self.assertFalse(promisc)
self.stub_out('os_net_config.utils.update_sriov_vf_map',
test_update_sriov_vf_map)
def test_get_vf_devname(device, vfid):
return device + '_' + str(vfid)
self.stub_out('os_net_config.utils.get_vf_devname',
test_get_vf_devname)
vf = objects.SriovVF(device='nic3', vfid=7, addresses=addresses,
vlan_id='100', qos='10', spoofcheck=False,
trust=False, state="enable",
macaddr="AA:BB:CC:DD:EE:FF", promisc=False)
self.provider.add_sriov_vf(vf)
vf_config = """# This file is autogenerated by os-net-config
DEVICE=eth2_7
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
BOOTPROTO=static
IPADDR=10.0.0.30
NETMASK=255.255.255.0
"""
self.assertEqual(vf_config, self.get_interface_config('eth2_7'))
def test_network_sriov_pf_without_promisc(self):
nic_mapping = {'nic1': 'eth0', 'nic2': 'eth1', 'nic3': 'eth2'}
self.stubbed_mapped_nics = nic_mapping
pf = objects.SriovPF(name='nic3', numvfs=10)
def test_update_sriov_pf_map(name, numvfs, noop, promisc=None,
link_mode='legacy'):
self.assertEqual(name, 'eth2')
self.assertEqual(numvfs, 10)
self.assertEqual(promisc, None)
self.assertEqual(link_mode, 'legacy')
self.stub_out('os_net_config.utils.update_sriov_pf_map',
test_update_sriov_pf_map)
self.provider.add_sriov_pf(pf)
pf_config = """# This file is autogenerated by os-net-config
DEVICE=eth2
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
BOOTPROTO=none
"""
self.assertEqual(pf_config, self.get_interface_config('eth2'))
def test_network_sriov_pf_with_promisc_on(self):
nic_mapping = {'nic1': 'eth0', 'nic2': 'eth1', 'nic3': 'eth2'}
self.stubbed_mapped_nics = nic_mapping
pf = objects.SriovPF(name='nic3', numvfs=10, promisc=True)
def test_update_sriov_pf_map(name, numvfs, noop, promisc=None,
link_mode='legacy'):
self.assertEqual(name, 'eth2')
self.assertEqual(numvfs, 10)
self.assertTrue(promisc)
self.assertEqual(link_mode, 'legacy')
self.stub_out('os_net_config.utils.update_sriov_pf_map',
test_update_sriov_pf_map)
self.provider.add_sriov_pf(pf)
pf_config = """# This file is autogenerated by os-net-config
DEVICE=eth2
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
BOOTPROTO=none
"""
self.assertEqual(pf_config, self.get_interface_config('eth2'))
def test_network_sriov_pf_with_promisc_off(self):
nic_mapping = {'nic1': 'eth0', 'nic2': 'eth1', 'nic3': 'eth2'}
self.stubbed_mapped_nics = nic_mapping
pf = objects.SriovPF(name='nic3', numvfs=10, promisc=False)
def test_update_sriov_pf_map(name, numvfs, noop, promisc=None,
link_mode='legacy'):
self.assertEqual(name, 'eth2')
self.assertEqual(numvfs, 10)
self.assertFalse(promisc)
self.assertEqual(link_mode, 'legacy')
self.stub_out('os_net_config.utils.update_sriov_pf_map',
test_update_sriov_pf_map)
self.provider.add_sriov_pf(pf)
pf_config = """# This file is autogenerated by os-net-config
DEVICE=eth2
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
BOOTPROTO=none
"""
self.assertEqual(pf_config, self.get_interface_config('eth2'))
def test_network_ovs_dpdk_bridge_and_port(self):
nic_mapping = {'nic1': 'eth0', 'nic2': 'eth1', 'nic3': 'eth2'}
self.stubbed_mapped_nics = nic_mapping
interface = objects.Interface(name='nic3')
dpdk_port = objects.OvsDpdkPort(name='dpdk0', members=[interface])
bridge = objects.OvsUserBridge('br-link', members=[dpdk_port])
def test_bind_dpdk_interfaces(ifname, driver, noop):
self.assertEqual(ifname, 'eth2')
self.assertEqual(driver, 'vfio-pci')
self.stub_out('os_net_config.utils.bind_dpdk_interfaces',
test_bind_dpdk_interfaces)
self.stub_out('os_net_config.utils.get_stored_pci_address',
self.stub_get_stored_pci_address)
self.provider.add_ovs_dpdk_port(dpdk_port)
self.provider.add_ovs_user_bridge(bridge)
br_link_config = """# This file is autogenerated by os-net-config
DEVICE=br-link
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
DEVICETYPE=ovs
TYPE=OVSUserBridge
"""
dpdk0_config = """# This file is autogenerated by os-net-config
DEVICE=dpdk0
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
DEVICETYPE=ovs
TYPE=OVSDPDKPort
OVS_BRIDGE=br-link
OVS_EXTRA="set Interface $DEVICE options:dpdk-devargs=0000:00:09.0"
"""
self.assertEqual(br_link_config,
self.provider.bridge_data['br-link'])
self.assertEqual(dpdk0_config, self.get_interface_config('dpdk0'))
def test_network_ovs_dpdk_bridge_and_port_with_mtu_rxqueue(self):
nic_mapping = {'nic1': 'eth0', 'nic2': 'eth1', 'nic3': 'eth2'}
self.stubbed_mapped_nics = nic_mapping
interface = objects.Interface(name='nic3')
dpdk_port = objects.OvsDpdkPort(name='dpdk0', members=[interface],
mtu=9000, rx_queue=4)
bridge = objects.OvsUserBridge('br-link', members=[dpdk_port])
def test_bind_dpdk_interfaces(ifname, driver, noop):
self.assertEqual(ifname, 'eth2')
self.assertEqual(driver, 'vfio-pci')
self.stub_out('os_net_config.utils.bind_dpdk_interfaces',
test_bind_dpdk_interfaces)
self.stub_out('os_net_config.utils.get_stored_pci_address',
self.stub_get_stored_pci_address)
self.provider.add_ovs_dpdk_port(dpdk_port)
self.provider.add_ovs_user_bridge(bridge)
br_link_config = """# This file is autogenerated by os-net-config
DEVICE=br-link
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
DEVICETYPE=ovs
TYPE=OVSUserBridge
"""
dpdk0_config = """# This file is autogenerated by os-net-config
DEVICE=dpdk0
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
DEVICETYPE=ovs
TYPE=OVSDPDKPort
OVS_BRIDGE=br-link
RX_QUEUE=4
MTU=9000
OVS_EXTRA="set Interface $DEVICE options:dpdk-devargs=0000:00:09.0 \
-- set Interface $DEVICE mtu_request=$MTU \
-- set Interface $DEVICE options:n_rxq=$RX_QUEUE"
"""
self.assertEqual(br_link_config,
self.provider.bridge_data['br-link'])
self.assertEqual(dpdk0_config, self.get_interface_config('dpdk0'))
def test_network_ovs_dpdk_bond(self):
nic_mapping = {'nic1': 'eth0', 'nic2': 'eth1', 'nic3': 'eth2'}
self.stubbed_mapped_nics = nic_mapping
iface0 = objects.Interface(name='nic2')
dpdk0 = objects.OvsDpdkPort(name='dpdk0', members=[iface0])
iface1 = objects.Interface(name='nic3')
dpdk1 = objects.OvsDpdkPort(name='dpdk1', members=[iface1])
bond = objects.OvsDpdkBond('dpdkbond0', members=[dpdk0, dpdk1])
bridge = objects.OvsUserBridge('br-link', members=[bond])
def test_bind_dpdk_interfaces(ifname, driver, noop):
self.assertIn(ifname, ['eth1', 'eth2'])
self.assertEqual(driver, 'vfio-pci')
self.stub_out('os_net_config.utils.bind_dpdk_interfaces',
test_bind_dpdk_interfaces)
self.stub_out('os_net_config.utils.get_stored_pci_address',
self.stub_get_stored_pci_address)
self.provider.add_ovs_dpdk_bond(bond)
self.provider.add_ovs_user_bridge(bridge)
dpdk_bond_config = """# This file is autogenerated by os-net-config
DEVICE=dpdkbond0
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
DEVICETYPE=ovs
TYPE=OVSDPDKBond
OVS_BRIDGE=br-link
BOND_IFACES="dpdk0 dpdk1"
OVS_EXTRA="set Interface dpdk0 options:dpdk-devargs=0000:00:08.0 \
-- set Interface dpdk1 options:dpdk-devargs=0000:00:09.0"
"""
self.assertEqual(dpdk_bond_config,
self.get_interface_config('dpdkbond0'))
def test_network_ovs_dpdk_bond_with_mtu(self):
nic_mapping = {'nic1': 'eth0', 'nic2': 'eth1', 'nic3': 'eth2'}
self.stubbed_mapped_nics = nic_mapping
iface0 = objects.Interface(name='nic2')
dpdk0 = objects.OvsDpdkPort(name='dpdk0', members=[iface0])
iface1 = objects.Interface(name='nic3')
dpdk1 = objects.OvsDpdkPort(name='dpdk1', members=[iface1])
bond = objects.OvsDpdkBond('dpdkbond0', mtu=9000,
members=[dpdk0, dpdk1])
bridge = objects.OvsUserBridge('br-link', members=[bond])
def test_bind_dpdk_interfaces(ifname, driver, noop):
self.assertIn(ifname, ['eth1', 'eth2'])
self.assertEqual(driver, 'vfio-pci')
self.stub_out('os_net_config.utils.bind_dpdk_interfaces',
test_bind_dpdk_interfaces)
self.stub_out('os_net_config.utils.get_stored_pci_address',
self.stub_get_stored_pci_address)
self.provider.add_ovs_dpdk_bond(bond)
self.provider.add_ovs_user_bridge(bridge)
dpdk_bond_config = """# This file is autogenerated by os-net-config
DEVICE=dpdkbond0
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
DEVICETYPE=ovs
TYPE=OVSDPDKBond
OVS_BRIDGE=br-link
BOND_IFACES="dpdk0 dpdk1"
MTU=9000
OVS_EXTRA="set Interface dpdk0 options:dpdk-devargs=0000:00:08.0 \
-- set Interface dpdk1 options:dpdk-devargs=0000:00:09.0 \
-- set Interface dpdk0 mtu_request=$MTU \
-- set Interface dpdk1 mtu_request=$MTU"
"""
self.assertEqual(dpdk_bond_config,
self.get_interface_config('dpdkbond0'))
def test_network_ovs_dpdk_bond_with_rx_queue(self):
nic_mapping = {'nic1': 'eth0', 'nic2': 'eth1', 'nic3': 'eth2'}
self.stubbed_mapped_nics = nic_mapping
iface0 = objects.Interface(name='nic2')
dpdk0 = objects.OvsDpdkPort(name='dpdk0', members=[iface0])
iface1 = objects.Interface(name='nic3')
dpdk1 = objects.OvsDpdkPort(name='dpdk1', members=[iface1])
bond = objects.OvsDpdkBond('dpdkbond0', rx_queue=4,
members=[dpdk0, dpdk1])
bridge = objects.OvsUserBridge('br-link', members=[bond])
def test_bind_dpdk_interfaces(ifname, driver, noop):
self.assertIn(ifname, ['eth1', 'eth2'])
self.assertEqual(driver, 'vfio-pci')
self.stub_out('os_net_config.utils.bind_dpdk_interfaces',
test_bind_dpdk_interfaces)
self.stub_out('os_net_config.utils.get_stored_pci_address',
self.stub_get_stored_pci_address)
self.provider.add_ovs_dpdk_bond(bond)
self.provider.add_ovs_user_bridge(bridge)
dpdk_bond_config = """# This file is autogenerated by os-net-config
DEVICE=dpdkbond0
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
DEVICETYPE=ovs
TYPE=OVSDPDKBond
OVS_BRIDGE=br-link
BOND_IFACES="dpdk0 dpdk1"
RX_QUEUE=4
OVS_EXTRA="set Interface dpdk0 options:dpdk-devargs=0000:00:08.0 \
-- set Interface dpdk1 options:dpdk-devargs=0000:00:09.0 \
-- set Interface dpdk0 options:n_rxq=$RX_QUEUE \
-- set Interface dpdk1 options:n_rxq=$RX_QUEUE"
"""
self.assertEqual(dpdk_bond_config,
self.get_interface_config('dpdkbond0'))
def test_network_ovs_dpdk_bond_with_mtu_and_rx_queue(self):
nic_mapping = {'nic1': 'eth0', 'nic2': 'eth1', 'nic3': 'eth2'}
self.stubbed_mapped_nics = nic_mapping
iface0 = objects.Interface(name='nic2')
dpdk0 = objects.OvsDpdkPort(name='dpdk0', members=[iface0])
iface1 = objects.Interface(name='nic3')
dpdk1 = objects.OvsDpdkPort(name='dpdk1', members=[iface1])
bond = objects.OvsDpdkBond('dpdkbond0', rx_queue=4, mtu=9000,
members=[dpdk0, dpdk1])
bridge = objects.OvsUserBridge('br-link', members=[bond])
def test_bind_dpdk_interfaces(ifname, driver, noop):
self.assertIn(ifname, ['eth1', 'eth2'])
self.assertEqual(driver, 'vfio-pci')
self.stub_out('os_net_config.utils.bind_dpdk_interfaces',
test_bind_dpdk_interfaces)
self.stub_out('os_net_config.utils.get_stored_pci_address',
self.stub_get_stored_pci_address)
self.provider.add_ovs_dpdk_bond(bond)
self.provider.add_ovs_user_bridge(bridge)
dpdk_bond_config = """# This file is autogenerated by os-net-config
DEVICE=dpdkbond0
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
DEVICETYPE=ovs
TYPE=OVSDPDKBond
OVS_BRIDGE=br-link
BOND_IFACES="dpdk0 dpdk1"
RX_QUEUE=4
MTU=9000
OVS_EXTRA="set Interface dpdk0 options:dpdk-devargs=0000:00:08.0 \
-- set Interface dpdk1 options:dpdk-devargs=0000:00:09.0 \
-- set Interface dpdk0 mtu_request=$MTU \
-- set Interface dpdk1 mtu_request=$MTU \
-- set Interface dpdk0 options:n_rxq=$RX_QUEUE \
-- set Interface dpdk1 options:n_rxq=$RX_QUEUE"
"""
self.assertEqual(dpdk_bond_config,
self.get_interface_config('dpdkbond0'))
class TestIfcfgNetConfigApply(base.TestCase):
def setUp(self):
super(TestIfcfgNetConfigApply, self).setUp()
self.temp_ifcfg_file = tempfile.NamedTemporaryFile()
self.temp_bond_file = tempfile.NamedTemporaryFile()
self.temp_route_file = tempfile.NamedTemporaryFile()
self.temp_route6_file = tempfile.NamedTemporaryFile()
self.temp_route_table_file = tempfile.NamedTemporaryFile()
self.temp_rule_file = tempfile.NamedTemporaryFile()
self.temp_bridge_file = tempfile.NamedTemporaryFile()
self.temp_cleanup_file = tempfile.NamedTemporaryFile(delete=False)
self.ifup_interface_names = []
self.ovs_appctl_cmds = []
self.stop_dhclient_interfaces = []
self.ip_reconfigure_commands = []
def test_ifcfg_path(name):
return self.temp_ifcfg_file.name
self.stub_out(
'os_net_config.impl_ifcfg.ifcfg_config_path', test_ifcfg_path)
def test_remove_ifcfg_config(name):
ifcfg_file = self.temp_ifcfg_file.name
if os.path.exists(ifcfg_file):
os.remove(ifcfg_file)
self.stub_out('os_net_config.impl_ifcfg.remove_ifcfg_config',
test_remove_ifcfg_config)
def test_routes_path(name):
return self.temp_route_file.name
self.stub_out(
'os_net_config.impl_ifcfg.route_config_path', test_routes_path)
def test_routes6_path(name):
return self.temp_route6_file.name
self.stub_out(
'os_net_config.impl_ifcfg.route6_config_path', test_routes6_path)
def test_rules_path(name):
return self.temp_rule_file.name
self.stub_out(
'os_net_config.impl_ifcfg.route_rule_config_path', test_rules_path)
def test_route_table_path():
return self.temp_route_table_file.name
self.stub_out(
'os_net_config.impl_ifcfg.route_table_config_path',
test_route_table_path)
utils.write_config(self.temp_route_table_file.name, _RT_CUSTOM)
def test_bridge_path(name):
return self.temp_bridge_file.name
self.stub_out(
'os_net_config.impl_ifcfg.bridge_config_path', test_bridge_path)
def test_cleanup_pattern():
return self.temp_cleanup_file.name
self.stub_out('os_net_config.impl_ifcfg.cleanup_pattern',
test_cleanup_pattern)
def test_stop_dhclient_process(interface):
self.stop_dhclient_interfaces.append(interface)
self.stub_out('os_net_config.impl_ifcfg.stop_dhclient_process',
test_stop_dhclient_process)
def test_execute(*args, **kwargs):
if args[0] == '/sbin/ifup':
self.ifup_interface_names.append(args[1])
elif args[0] == '/bin/ovs-appctl':
self.ovs_appctl_cmds.append(' '.join(args))
elif args[0] == '/sbin/ip' or args[0] == '/usr/sbin/ip':
self.ip_reconfigure_commands.append(' '.join(args[1:]))
pass
self.stub_out('oslo_concurrency.processutils.execute', test_execute)
def stub_is_ovs_installed():
return True
self.stub_out('os_net_config.utils.is_ovs_installed',
stub_is_ovs_installed)
self.provider = impl_ifcfg.IfcfgNetConfig()
def tearDown(self):
self.temp_ifcfg_file.close()
self.temp_route_file.close()
self.temp_route6_file.close()
self.temp_bridge_file.close()
if os.path.exists(self.temp_cleanup_file.name):
self.temp_cleanup_file.close()
super(TestIfcfgNetConfigApply, self).tearDown()
def test_route_table_apply(self):
# Test overriding an existing route table
route_table1 = objects.RouteTable('custom', 10)
self.provider.add_route_table(route_table1)
route_table2 = objects.RouteTable('table1', 200)
self.provider.add_route_table(route_table2)
def test_route_table_path():
return self.temp_route_table_file.name
self.stub_out(
'os_net_config.impl_ifcfg.route_table_config_path',
test_route_table_path)
utils.write_config(self.temp_route_table_file.name, _RT_CUSTOM)
self.provider.apply()
route_table_data = utils.get_file_data(
self.temp_route_table_file.name)
self.assertEqual(_RT_FULL, route_table_data)
def test_reserved_route_table_id(self):
route_table1 = objects.RouteTable('table1', 253)
self.provider.add_route_table(route_table1)
self.assertRaises(os_net_config.ConfigurationError,
self.provider.apply)
def test_reserved_route_table_name(self):
route_table2 = objects.RouteTable('default', 200)
self.provider.add_route_table(route_table2)
self.assertRaises(os_net_config.ConfigurationError,
self.provider.apply)
def test_network_apply(self):
route1 = objects.Route('192.168.1.1', default=True,
route_options="metric 10")
route2 = objects.Route('192.168.1.1', '172.19.0.0/24')
route3 = objects.Route('192.168.1.5', '172.20.0.0/24',
route_options="metric 100")
v4_addr = objects.Address('192.168.1.2/24')
interface = objects.Interface('em1', addresses=[v4_addr],
routes=[route1, route2, route3])
self.provider.add_interface(interface)
self.provider.apply()
ifcfg_data = utils.get_file_data(self.temp_ifcfg_file.name)
self.assertEqual(_V4_IFCFG, ifcfg_data)
route_data = utils.get_file_data(self.temp_route_file.name)
self.assertEqual(_ROUTES, route_data)
def test_sriov_pf_network_apply(self):
def get_numvfs_stub(pf_name):
return 0
self.stub_out('os_net_config.sriov_config.get_numvfs',
get_numvfs_stub)
route1 = objects.Route('192.168.1.1', default=True,
route_options="metric 10")
route2 = objects.Route('192.168.1.1', '172.19.0.0/24')
route3 = objects.Route('192.168.1.5', '172.20.0.0/24',
route_options="metric 100")
v4_addr = objects.Address('192.168.1.2/24')
pf = objects.SriovPF('enp3s0f0', numvfs=10, addresses=[v4_addr],
promisc=False, routes=[route1, route2, route3])
self.provider.add_sriov_pf(pf)
self.provider.apply()
ifcfg_data = utils.get_file_data(self.temp_ifcfg_file.name)
self.assertEqual(_V4_SRIOV_PF_IFCFG, ifcfg_data)
route_data = utils.get_file_data(self.temp_route_file.name)
self.assertEqual(_ROUTES_PF, route_data)
def test_dhcp_ovs_bridge_network_apply(self):
interface = objects.Interface('em1')
bridge = objects.OvsBridge('br-ctlplane', use_dhcp=True,
members=[interface])
self.provider.add_interface(interface)
self.provider.add_bridge(bridge)
self.provider.apply()
ifcfg_data = utils.get_file_data(self.temp_ifcfg_file.name)
self.assertEqual(_OVS_INTERFACE, ifcfg_data)
bridge_data = utils.get_file_data(self.temp_bridge_file.name)
self.assertEqual(_OVS_BRIDGE_DHCP, bridge_data)
route_data = utils.get_file_data(self.temp_route_file.name)
self.assertEqual("", route_data)
def test_dhclient_stop_on_iface_activate(self):
self.stop_dhclient_interfaces = []
v4_addr = objects.Address('192.168.1.2/24')
interface = objects.Interface('em1', addresses=[v4_addr])
interface2 = objects.Interface('em2', use_dhcp=True)
interface3 = objects.Interface('em3', use_dhcp=False)
self.provider.add_interface(interface)
self.provider.add_interface(interface2)
self.provider.add_interface(interface3)
self.provider.apply()
# stop dhclient on em1 due to static IP and em3 due to no IP
self.assertIn('em1', self.stop_dhclient_interfaces)
self.assertIn('em3', self.stop_dhclient_interfaces)
self.assertNotIn('em2', self.stop_dhclient_interfaces)
def test_apply_noactivate(self):
interface = objects.Interface('em1')
bridge = objects.OvsBridge('br-ctlplane', use_dhcp=True,
members=[interface])
self.provider.add_interface(interface)
self.provider.add_bridge(bridge)
self.provider.apply(activate=False)
self.assertEqual([], self.ifup_interface_names)
def test_bond_active_slave(self):
# setup and apply a bond
interface1 = objects.Interface('em1')
interface2 = objects.Interface('em2', primary=True)
bond = objects.OvsBond('bond1', use_dhcp=True,
members=[interface1, interface2])
self.provider.add_interface(interface1)
self.provider.add_interface(interface2)
self.provider.add_bond(bond)
self.provider.apply()
ovs_appctl_cmds = '/bin/ovs-appctl bond/set-active-slave bond1 em2'
self.assertIn(ovs_appctl_cmds, self.ovs_appctl_cmds)
def test_bond_active_ordering(self):
# setup and apply a bond
interface1 = objects.Interface('em1')
interface2 = objects.Interface('em2')
bond = objects.OvsBond('bond1', use_dhcp=True,
members=[interface1, interface2])
self.provider.add_interface(interface1)
self.provider.add_interface(interface2)
self.provider.add_bond(bond)
self.provider.apply()
ovs_appctl_cmds = '/bin/ovs-appctl bond/set-active-slave bond1 em1'
self.assertIn(ovs_appctl_cmds, self.ovs_appctl_cmds)
def test_reconfigure_and_apply(self):
route1 = objects.Route('192.168.1.1', default=True)
route2 = objects.Route('192.168.1.1', '172.19.0.0/24')
v4_addr1 = objects.Address('192.168.1.2/24')
interface1 = objects.Interface('em1', addresses=[v4_addr1],
routes=[route1, route2])
self.provider.add_interface(interface1)
self.provider.apply()
self.assertIn('em1', self.ifup_interface_names)
expected_commands = ['addr add 192.168.0.2/23 dev em1',
'addr del 192.168.1.2/24 dev em1',
'link set dev em1 mtu 9000']
v4_addr2 = objects.Address('192.168.0.2/23')
interface2 = objects.Interface('em1', addresses=[v4_addr2],
routes=[route1, route2], mtu=9000)
self.provider.add_interface(interface2)
self.provider.apply()
self.assertEqual(expected_commands, self.ip_reconfigure_commands)
self.ip_reconfigure_commands = []
expected_commands = ['addr add 192.168.1.2/24 dev em1',
'addr del 192.168.0.2/23 dev em1',
'link set dev em1 mtu 1500',
'route del default via 192.168.1.1 dev em1',
'route del 172.19.0.0/24 via 192.168.1.1 dev em1',
'route add default via 192.168.0.1 dev em1',
'route add 172.19.0.0/24 via 192.168.0.1 dev em1']
route3 = objects.Route('192.168.0.1', default=True)
route4 = objects.Route('192.168.0.1', '172.19.0.0/24')
interface3 = objects.Interface('em1', addresses=[v4_addr1],
routes=[route3, route4])
self.provider.add_interface(interface3)
self.provider.apply()
self.assertEqual(expected_commands, self.ip_reconfigure_commands)
def test_change_restart_required(self):
tmpdir = tempfile.mkdtemp()
interface = "eth0"
interface_filename = tmpdir + '/ifcfg-' + interface
file = open(interface_filename, 'w')
file.write(_IFCFG_DHCP)
file.close()
# Changing a dhcp interface to static should not require restart
self.assertFalse(
self.provider.ifcfg_requires_restart(interface_filename,
_IFCFG_STATIC1))
# Changing a standard interface to ovs should require restart
self.assertTrue(
self.provider.ifcfg_requires_restart(interface_filename,
_IFCFG_OVS))
# Changing a static interface to dhcp should require restart
file = open(interface_filename, 'w')
file.write(_IFCFG_STATIC1)
file.close()
self.assertTrue(self.provider.ifcfg_requires_restart(
interface_filename, _IFCFG_DHCP))
# Configuring a previously unconfigured interface requires restart
self.assertTrue(self.provider.ifcfg_requires_restart('/doesnotexist',
_IFCFG_DHCP))
shutil.rmtree(tmpdir)
def test_ifcfg_route_commands(self):
tmpdir = tempfile.mkdtemp()
interface = "eth0"
interface_filename = tmpdir + '/route-' + interface
file = open(interface_filename, 'w')
file.write(_IFCFG_ROUTES1)
file.close()
# Changing only the routes should delete and add routes
command_list1 = ['route del default via 192.0.2.1 dev eth0',
'route del 192.0.2.1/24 via 192.0.2.1 dev eth0',
'route add default via 192.0.1.1 dev eth0',
'route add 192.0.1.1/24 via 192.0.3.1 dev eth1']
commands = self.provider.iproute2_route_commands(interface_filename,
_IFCFG_ROUTES2)
self.assertTrue(commands == command_list1)
def test_ifcfg_ipmtu_commands(self):
tmpdir = tempfile.mkdtemp()
interface = "eth0"
interface_filename = tmpdir + '/ifcfg-' + interface
file = open(interface_filename, 'w')
file.write(_IFCFG_STATIC1)
file.close()
# Changing only the IP should delete and add the IP
command_list1 = ['addr add 10.0.1.2/23 dev eth0',
'addr del 10.0.0.1/24 dev eth0']
commands = self.provider.iproute2_apply_commands(interface,
interface_filename,
_IFCFG_STATIC2)
self.assertTrue(commands == command_list1)
# Changing only the MTU should just set the interface MTU
command_list2 = ['link set dev eth0 mtu 9000']
commands = self.provider.iproute2_apply_commands(interface,
interface_filename,
_IFCFG_STATIC1_MTU)
self.assertTrue(commands == command_list2)
# Changing both the IP and MTU should delete IP, add IP, and set MTU
command_list3 = ['addr add 10.0.1.2/23 dev eth0',
'addr del 10.0.0.1/24 dev eth0',
'link set dev eth0 mtu 9000']
commands = self.provider.iproute2_apply_commands(interface,
interface_filename,
_IFCFG_STATIC2_MTU)
self.assertTrue(commands == command_list3)
def test_restart_children_on_change(self):
# setup and apply a bridge
interface = objects.Interface('em1')
bridge = objects.OvsBridge('br-ctlplane', use_dhcp=True,
members=[interface])
self.provider.add_interface(interface)
self.provider.add_bridge(bridge)
self.provider.apply()
self.assertIn('em1', self.ifup_interface_names)
self.assertIn('br-ctlplane', self.ifup_interface_names)
# changing the bridge should restart the interface too
self.ifup_interface_names = []
bridge = objects.OvsBridge('br-ctlplane', use_dhcp=False,
members=[interface])
self.provider.add_interface(interface)
self.provider.add_bridge(bridge)
self.provider.apply()
self.assertIn('em1', self.ifup_interface_names)
# test infiniband interfaces act as proper bridge members
ib_interface = objects.IbInterface('ib0')
bridge = objects.OvsBridge('br-ctlplane', use_dhcp=True,
members=[ib_interface])
self.provider.add_interface(ib_interface)
self.provider.add_bridge(bridge)
self.provider.apply()
self.assertIn('ib0', self.ifup_interface_names)
self.assertIn('br-ctlplane', self.ifup_interface_names)
# changing the bridge should restart the interface too
self.ifup_interface_names = []
bridge = objects.OvsBridge('br-ctlplane', use_dhcp=False,
members=[ib_interface])
self.provider.add_interface(interface)
self.provider.add_bridge(bridge)
self.provider.apply()
self.assertIn('ib0', self.ifup_interface_names)
# setup and apply a bond on a bridge
self.ifup_interface_names = []
interface1 = objects.Interface('em1')
interface2 = objects.Interface('em2')
bond = objects.OvsBond('bond0',
members=[interface1, interface2])
bridge = objects.OvsBridge('br-ctlplane', use_dhcp=True,
members=[bond])
self.provider.add_interface(interface1)
self.provider.add_interface(interface2)
self.provider.add_bond(bond)
self.provider.add_bridge(bridge)
self.provider.apply()
# changing the bridge should restart everything
self.ifup_interface_names = []
bridge = objects.OvsBridge('br-ctlplane', use_dhcp=False,
members=[bond])
self.provider.add_interface(interface1)
self.provider.add_interface(interface2)
self.provider.add_bond(bond)
self.provider.add_bridge(bridge)
self.provider.apply()
self.assertIn('br-ctlplane', self.ifup_interface_names)
self.assertIn('bond0', self.ifup_interface_names)
self.assertIn('em1', self.ifup_interface_names)
self.assertIn('em2', self.ifup_interface_names)
def test_restart_vlans_on_bond_change(self):
self.ifup_interface_names = []
interface1 = objects.Interface('em1')
interface2 = objects.Interface('em2')
bond = objects.LinuxBond('bond0', members=[interface1, interface2])
vlan = objects.Vlan('bond0', 10)
self.provider.add_interface(interface1)
self.provider.add_interface(interface2)
self.provider.add_linux_bond(bond)
self.provider.add_vlan(vlan)
self.provider.apply()
self.assertIn('bond0', self.ifup_interface_names)
self.assertIn('em1', self.ifup_interface_names)
self.assertIn('em2', self.ifup_interface_names)
self.assertIn('vlan10', self.ifup_interface_names)
# Change the bond configuration
self.ifup_interface_names = []
bond.bonding_options = 'mode=1 miimon=100'
self.provider.add_linux_bond(bond)
self.provider.apply()
self.assertIn('bond0', self.ifup_interface_names)
self.assertIn('em1', self.ifup_interface_names)
self.assertIn('em2', self.ifup_interface_names)
self.assertIn('vlan10', self.ifup_interface_names)
def test_linux_bond_ifup_on_child_restart(self):
# if a bond slave is restarted then ifup should be called on the bond
self.ifup_interface_names = []
interface1 = objects.Interface('em1')
bond = objects.LinuxBond('bond0', members=[interface1])
self.provider.add_linux_bond(bond)
self.provider.add_interface(interface1)
self.provider.apply()
self.assertIn('em1', self.ifup_interface_names)
self.assertIn('bond0', self.ifup_interface_names)
self.ifup_interface_names = []
# changing mtu should not require a restart of em1 (or the bond)
interface1.mtu = 9000
self.provider.linuxbond_data = {}
self.provider.interface_data = {}
self.provider.add_linux_bond(bond)
self.provider.add_interface(interface1)
self.provider.apply()
self.assertNotIn('em1', self.ifup_interface_names)
self.assertNotIn('bond0', self.ifup_interface_names)
# changing nm_controlled should require a restart of em1 (and the bond)
interface1.nm_controlled = True
self.provider.add_interface(interface1)
self.provider.apply()
self.assertIn('em1', self.ifup_interface_names)
self.assertIn('bond0', self.ifup_interface_names)
def test_restart_interface_counts(self):
interface = objects.Interface('em1')
interface2 = objects.Interface('em2')
bond = objects.LinuxBond('bond0', members=[interface, interface2])
self.provider.add_bond(bond)
self.provider.add_interface(interface)
self.provider.add_interface(interface2)
self.provider.apply()
self.assertEqual(1, self.ifup_interface_names.count("em1"))
self.assertEqual(1, self.ifup_interface_names.count("em2"))
self.assertEqual(1, self.ifup_interface_names.count("bond0"))
def test_vlan_apply(self):
vlan = objects.Vlan('em1', 5)
self.provider.add_vlan(vlan)
self.provider.apply()
ifcfg_data = utils.get_file_data(self.temp_ifcfg_file.name)
self.assertEqual(_VLAN_NO_IP, ifcfg_data)
def test_cleanup(self):
self.provider.apply(cleanup=True)
self.assertTrue(not os.path.exists(self.temp_cleanup_file.name))
def test_cleanup_not_loopback(self):
tmp_lo_file = '%s-lo' % self.temp_cleanup_file.name
utils.write_config(tmp_lo_file, 'foo')
def test_cleanup_pattern():
return '%s-*' % self.temp_cleanup_file.name
self.stub_out('os_net_config.impl_ifcfg.cleanup_pattern',
test_cleanup_pattern)
self.provider.apply(cleanup=True)
self.assertTrue(os.path.exists(tmp_lo_file))
os.remove(tmp_lo_file)
def test_ovs_restart_called(self):
interface = objects.Interface('em1')
dpdk_port = objects.OvsDpdkPort('dpdk0', members=[interface])
execute_strings = []
def test_execute(*args, **kwargs):
execute_strings.append(args[1])
self.stub_out('os_net_config.NetConfig.execute', test_execute)
self.provider.noop = True
self.provider.add_ovs_dpdk_port(dpdk_port)
self.provider.apply()
self.assertIn('Restart openvswitch', execute_strings)
def test_ovs_restart_not_called(self):
interface = objects.Interface('em1')
execute_strings = []
def test_execute(*args, **kwargs):
execute_strings.append(args[1])
self.stub_out('os_net_config.NetConfig.execute', test_execute)
self.provider.noop = True
self.provider.add_interface(interface)
self.provider.apply()
self.assertNotIn('Restart openvswitch', execute_strings)
def _failed_execute(*args, **kwargs):
if kwargs.get('check_exit_code', True):
raise processutils.ProcessExecutionError('Test stderr',
'Test stdout',
str(kwargs))
def test_interface_failure(self):
self.stub_out('oslo_concurrency.processutils.execute',
self._failed_execute)
v4_addr = objects.Address('192.168.1.2/24')
interface = objects.Interface('em1', addresses=[v4_addr])
self.provider.add_interface(interface)
self.assertRaises(os_net_config.ConfigurationError,
self.provider.apply)
self.assertEqual(1, len(self.provider.errors))
def test_interface_failure_multiple(self):
self.stub_out('oslo_concurrency.processutils.execute',
self._failed_execute)
v4_addr = objects.Address('192.168.1.2/24')
interface = objects.Interface('em1', addresses=[v4_addr])
v4_addr2 = objects.Address('192.168.2.2/24')
interface2 = objects.Interface('em2', addresses=[v4_addr2])
self.provider.add_interface(interface)
self.provider.add_interface(interface2)
self.assertRaises(os_net_config.ConfigurationError,
self.provider.apply)
# Even though the first one failed, we should have attempted both
self.assertEqual(2, len(self.provider.errors))