Files
os-net-config/os_net_config/tests/test_impl_ifcfg.py
Dan Sneddon c9bf347a23 Apply IP/netmask/route/MTU changes without bouncing interfaces.
Modifications to impl_ifcfg.py to support making changes to IP
address, MTU, and routes live, rather than running ifdown/ifup on the
interface. Bouncing the interface is very disruptive, especially with
bridges where all the child interfaces are also bounced.

If only the IP address, netmask, MTU, routes, and/or boot status are
changed, the file will be written and 'ip' or 'ip route' commands
will be run to apply the new config on the fly. If any commands fail,
the interface will be restarted instead.

Closes-bug: 1795129
Change-Id: I75239fd3f273a88feb80f09a67aa5a947d64ac30
2018-11-06 16:05:43 -08:00

2071 lines
74 KiB
Python

# -*- coding: utf-8 -*-
# Copyright 2014 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os.path
import random
import shutil
import tempfile
from oslo_concurrency import processutils
import os_net_config
from os_net_config import impl_ifcfg
from os_net_config import objects
from os_net_config import sriov_config
from os_net_config.tests import base
from os_net_config import utils
# Expected ifcfg text for a plain 'em1' device. Several fixtures below are
# built by appending extra KEY=value lines to this string.
_BASE_IFCFG = """# This file is autogenerated by os-net-config
DEVICE=em1
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
"""
# Same lines as _BASE_IFCFG except NM_CONTROLLED=yes.
_BASE_IFCFG_NETWORKMANAGER = """# This file is autogenerated by os-net-config
DEVICE=em1
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=yes
PEERDNS=no
"""
# Expected ifcfg text for 'em1' with HOTPLUG=yes and no IP configuration.
_HOTPLUG = """# This file is autogenerated by os-net-config
DEVICE=em1
ONBOOT=yes
HOTPLUG=yes
NM_CONTROLLED=no
PEERDNS=no
BOOTPROTO=none
"""
# Expected ifcfg text for 'em1' with ONBOOT=yes and no IP configuration.
_ONBOOT = """# This file is autogenerated by os-net-config
DEVICE=em1
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
BOOTPROTO=none
"""
# 'em1' without any address: base lines plus BOOTPROTO=none.
_NO_IP = _BASE_IFCFG + "BOOTPROTO=none\n"
# 'em1' with a single static IPv4 address (192.168.1.2/24 as IPADDR/NETMASK).
_V4_IFCFG = _BASE_IFCFG + """BOOTPROTO=static
IPADDR=192.168.1.2
NETMASK=255.255.255.0
"""
# 'em1' carrying both a static IPv4 address and a static IPv6 address.
_V4_V6_IFCFG = _BASE_IFCFG + """IPV6INIT=yes
BOOTPROTO=static
IPADDR=192.168.1.2
NETMASK=255.255.255.0
IPV6_AUTOCONF=no
IPV6ADDR=2001:abc:a::/64
"""
# Expected ifcfg text for a device named 'em1.120': VLAN=yes, no address.
_IFCFG_VLAN = """# This file is autogenerated by os-net-config
DEVICE=em1.120
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
VLAN=yes
BOOTPROTO=none
"""
# 'eth0' ifcfg sample using BOOTPROTO="dhcp"; every value is double-quoted.
_IFCFG_DHCP = """# This file is autogenerated by os-net-config
DEVICE="eth0"
BOOTPROTO="dhcp"
ONBOOT="yes"
TYPE="Ethernet"
"""
# 'eth0' ifcfg sample with static address 10.0.0.1/255.255.255.0.
_IFCFG_STATIC1 = """# This file is autogenerated by os-net-config
DEVICE=eth0
BOOTPROTO=static
IPADDR=10.0.0.1
NETMASK=255.255.255.0
TYPE=Ethernet
ONBOOT=yes
"""
# _IFCFG_STATIC1 followed by an empty line and MTU=9000 (no trailing newline).
_IFCFG_STATIC1_MTU = _IFCFG_STATIC1 + "\nMTU=9000"
# 'eth0' ifcfg sample with a different static address/netmask; unlike
# _IFCFG_STATIC1 it has no "autogenerated" header line.
_IFCFG_STATIC2 = """DEVICE=eth0
BOOTPROTO=static
IPADDR=10.0.1.2
NETMASK=255.255.254.0
TYPE=Ethernet
ONBOOT=yes
"""
# _IFCFG_STATIC2 followed by an empty line and MTU=9000 (no trailing newline).
_IFCFG_STATIC2_MTU = _IFCFG_STATIC2 + "\nMTU=9000"
# 'eth0' ifcfg sample for an OVS port attached to bridge 'brctlplane'.
_IFCFG_OVS = """DEVICE=eth0
ONBOOT=yes
DEVICETYPE=ovs
TYPE=OVSPort
OVS_BRIDGE=brctlplane
BOOTPROTO=none
HOTPLUG=no
"""
# Route-file sample: a default route and one network route, both on eth0.
_IFCFG_ROUTES1 = """default via 192.0.2.1 dev eth0
192.0.2.1/24 via 192.0.2.1 dev eth0
"""
# Route-file sample with a different default gateway and a route on eth1.
_IFCFG_ROUTES2 = """default via 192.0.1.1 dev eth0
192.0.1.1/24 via 192.0.3.1 dev eth1
"""
# _V4_IFCFG with the device renamed from 'em1' to 'nic1' and an HWADDR line
# appended.
_V4_IFCFG_MAPPED = _V4_IFCFG.replace('em1', 'nic1') + "HWADDR=a1:b2:c3:d4:e5\n"
# Base ifcfg text for the InfiniBand device 'ib0' (TYPE=Infiniband).
_BASE_IB_IFCFG = """# This file is autogenerated by os-net-config
DEVICE=ib0
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
TYPE=Infiniband
"""
# 'ib0' with a single static IPv4 address.
_V4_IB_IFCFG = _BASE_IB_IFCFG + """BOOTPROTO=static
IPADDR=192.168.1.2
NETMASK=255.255.255.0
"""
# 'em1' with three IPv4 addresses: the extra two use the numbered
# IPADDR1/NETMASK1 and IPADDR2/NETMASK2 keys.
_V4_IFCFG_MULTIPLE = _V4_IFCFG + """IPADDR1=192.168.1.3
NETMASK1=255.255.255.255
IPADDR2=10.0.0.2
NETMASK2=255.0.0.0
"""
# 'ib0' counterpart of _V4_IFCFG_MULTIPLE.
_IB_V4_IFCFG_MULTIPLE = _V4_IB_IFCFG + """IPADDR1=192.168.1.3
NETMASK1=255.255.255.255
IPADDR2=10.0.0.2
NETMASK2=255.0.0.0
"""
# 'em1' with a single static IPv6 address and autoconf disabled.
_V6_IFCFG = _BASE_IFCFG + """IPV6INIT=yes
IPV6_AUTOCONF=no
IPV6ADDR=2001:abc:a::/64
"""
# 'em1' with three IPv6 addresses: the extra two are listed, space separated,
# in a quoted IPV6ADDR_SECONDARIES value.
_V6_IFCFG_MULTIPLE = (_V6_IFCFG + "IPV6ADDR_SECONDARIES=\"2001:abc:b::1/64 " +
"2001:abc:c::2/96\"\n")
# 'em1' flagged as an OVS port but without a bridge assignment.
_OVS_IFCFG = _BASE_IFCFG + "DEVICETYPE=ovs\nBOOTPROTO=none\n"
# GRE tunnel 'tun0' on OVS bridge 'br-ctlplane' with a remote_ip option.
_OVS_IFCFG_TUNNEL = """# This file is autogenerated by os-net-config
DEVICE=tun0
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
DEVICETYPE=ovs
TYPE=OVSTunnel
OVS_BRIDGE=br-ctlplane
OVS_TUNNEL_TYPE=gre
OVS_TUNNEL_OPTIONS="options:remote_ip=192.168.1.1"
"""
# Base lines plus DEVICETYPE=ovs only.
_OVS_BRIDGE_IFCFG = _BASE_IFCFG + "DEVICETYPE=ovs\n"
# 'em1' as a member of the Linux bridge 'br-ctlplane'.
_LINUX_BRIDGE_IFCFG = _BASE_IFCFG + "BRIDGE=br-ctlplane\nBOOTPROTO=none\n"
# Expected IPv4 route file for 'em1': the default route comes first, and
# route options such as "metric 10" are appended to the route line.
_ROUTES = """default via 192.168.1.1 dev em1 metric 10
172.19.0.0/24 via 192.168.1.1 dev em1
172.20.0.0/24 via 192.168.1.5 dev em1 metric 100
"""
# Expected IPv6 route file for 'em1'.
_ROUTES_V6 = """default via 2001:db8::1 dev em1
2001:db8:dead:beef:cafe::/56 via fd00:fd00:2000::1 dev em1
2001:db8:dead:beff::/64 via fd00:fd00:2000::1 dev em1 metric 100
"""
# 'em1' as an OVS port on bridge 'br-ctlplane'.
_OVS_INTERFACE = _BASE_IFCFG + """DEVICETYPE=ovs
TYPE=OVSPort
OVS_BRIDGE=br-ctlplane
BOOTPROTO=none
"""
# OVS bridge 'br-ctlplane' using DHCP via member 'em1'. Note there is no
# PEERDNS line in this expected text.
_OVS_BRIDGE_DHCP = """# This file is autogenerated by os-net-config
DEVICE=br-ctlplane
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
DEVICETYPE=ovs
TYPE=OVSBridge
OVSBOOTPROTO=dhcp
OVSDHCPINTERFACES="em1"
"""
# 'em1' managed by NetworkManager and enslaved to 'bond1'.
_NM_CONTROLLED_INTERFACE = _BASE_IFCFG_NETWORKMANAGER + """MASTER=bond1
SLAVE=yes
BOOTPROTO=none
"""
# 'bond1' with NM_CONTROLLED=yes.
_NM_CONTROLLED_BOND = """# This file is autogenerated by os-net-config
DEVICE=bond1
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=yes
PEERDNS=no
"""
# DHCP OVS bridge with fail_mode=standalone; the OVS_EXTRA value also
# contains a del-controller command.
_OVS_BRIDGE_DHCP_STANDALONE = _OVS_BRIDGE_DHCP + (
"OVS_EXTRA=\"set bridge br-ctlplane fail_mode=standalone "
"-- del-controller br-ctlplane\"\n")
# DHCP OVS bridge with fail_mode=secure.
_OVS_BRIDGE_DHCP_SECURE = _OVS_BRIDGE_DHCP + \
"OVS_EXTRA=\"set bridge br-ctlplane fail_mode=secure\"\n"
# Linux bridge 'br-ctlplane' using DHCP.
_LINUX_BRIDGE_DHCP = """# This file is autogenerated by os-net-config
DEVICE=br-ctlplane
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
TYPE=Bridge
DELAY=0
BOOTPROTO=dhcp
"""
# OVS bridge 'br-ctlplane' with a static IPv4 address.
_OVS_BRIDGE_STATIC = """# This file is autogenerated by os-net-config
DEVICE=br-ctlplane
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
DEVICETYPE=ovs
TYPE=OVSBridge
BOOTPROTO=static
IPADDR=192.168.1.2
NETMASK=255.255.255.0
"""
# Linux bridge 'br-ctlplane' with a static IPv4 address.
_LINUX_BRIDGE_STATIC = """# This file is autogenerated by os-net-config
DEVICE=br-ctlplane
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
TYPE=Bridge
DELAY=0
BOOTPROTO=static
IPADDR=192.168.1.2
NETMASK=255.255.255.0
"""
# DHCP OVS bridge whose OVS_EXTRA sets other-config:hwaddr on the bridge.
_OVS_BRIDGE_DHCP_PRIMARY_INTERFACE = _OVS_BRIDGE_DHCP + \
"OVS_EXTRA=\"set bridge br-ctlplane other-config:hwaddr=a1:b2:c3:d4:e5\"\n"
# As above, with an additional br-set-external-id command joined by " -- ".
_OVS_BRIDGE_DHCP_OVS_EXTRA = _OVS_BRIDGE_DHCP + \
"OVS_EXTRA=\"set bridge br-ctlplane other-config:hwaddr=a1:b2:c3:d4:e5" + \
" -- br-set-external-id br-ctlplane bridge-id br-ctlplane\"\n"
# Base ifcfg text for VLAN device 'vlan5' on physical device 'em1'.
_BASE_VLAN = """# This file is autogenerated by os-net-config
DEVICE=vlan5
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
VLAN=yes
PHYSDEV=em1
"""
# vlans on an OVS bridge do not set VLAN=yes or PHYSDEV
_BASE_VLAN_OVS = """# This file is autogenerated by os-net-config
DEVICE=vlan5
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
"""
# 'vlan5' without an address.
_VLAN_NO_IP = _BASE_VLAN + "BOOTPROTO=none\n"
# 'vlan5' flagged as an OVS port but without a bridge assignment.
_VLAN_OVS = _BASE_VLAN_OVS + "DEVICETYPE=ovs\nBOOTPROTO=none\n"
# 'vlan5' as an OVS internal port on 'br-ctlplane'; the VLAN id is carried
# in OVS_OPTIONS as tag=5.
_VLAN_OVS_BRIDGE = _BASE_VLAN_OVS + """DEVICETYPE=ovs
TYPE=OVSIntPort
OVS_BRIDGE=br-ctlplane
OVS_OPTIONS="tag=5"
BOOTPROTO=none
"""
# 'vlan5' as a member of the Linux bridge 'br-ctlplane'.
_VLAN_LINUX_BRIDGE = _BASE_VLAN_OVS + """VLAN=yes
PHYSDEV=em1
BRIDGE=br-ctlplane
BOOTPROTO=none
"""
# OVS bond 'bond0' over em1 and em2 using DHCP.
_OVS_BOND_DHCP = """# This file is autogenerated by os-net-config
DEVICE=bond0
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
DEVICETYPE=ovs
TYPE=OVSBond
OVSBOOTPROTO=dhcp
BOND_IFACES="em1 em2"
"""
# Linux bond 'bond0' using DHCP.
_LINUX_BOND_DHCP = """# This file is autogenerated by os-net-config
DEVICE=bond0
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
BOOTPROTO=dhcp
"""
# Linux team 'team0' using DHCP.
_LINUX_TEAM_DHCP = """# This file is autogenerated by os-net-config
DEVICE=team0
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
BOOTPROTO=dhcp
DEVICETYPE=Team
"""
# 'em1' enslaved to the Linux bond 'bond0'.
_LINUX_BOND_INTERFACE = _BASE_IFCFG + """MASTER=bond0
SLAVE=yes
BOOTPROTO=none
"""
# 'em1' as a port of the Linux team 'team0'.
_LINUX_TEAM_INTERFACE = _BASE_IFCFG + """TEAM_MASTER=team0
BOOTPROTO=none
"""
# 'em1' as an uplink of the IVS bridge 'ivs'.
_IVS_UPLINK = """# This file is autogenerated by os-net-config
DEVICE=em1
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
DEVICETYPE=ivs
IVS_BRIDGE=ivs
BOOTPROTO=none
"""
# IVS internal port 'storage5' with a static IPv4 address and MTU=1500.
_IVS_INTERFACE = """# This file is autogenerated by os-net-config
DEVICE=storage5
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
TYPE=IVSIntPort
DEVICETYPE=ivs
IVS_BRIDGE=ivs
MTU=1500
BOOTPROTO=static
IPADDR=172.16.2.7
NETMASK=255.255.255.0
"""
# Expected IVS daemon configuration line for uplink em1 and internal port
# storage5.
_IVS_CONFIG = ('DAEMON_ARGS=\"--hitless --certificate /etc/ivs '
'--inband-vlan 4092 -u em1 --internal-port=storage5\"')
# 'em1' attached to the NFVswitch bridge 'nfvswitch'.
_NFVSWITCH_INTERFACE = """# This file is autogenerated by os-net-config
DEVICE=em1
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
DEVICETYPE=nfvswitch
NFVSWITCH_BRIDGE=nfvswitch
BOOTPROTO=none
"""
# NFVswitch internal port 'storage5' with a static IPv4 address and MTU=1500.
_NFVSWITCH_INTERNAL = """# This file is autogenerated by os-net-config
DEVICE=storage5
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
TYPE=NFVSWITCHIntPort
DEVICETYPE=nfvswitch
NFVSWITCH_BRIDGE=nfvswitch
MTU=1500
BOOTPROTO=static
IPADDR=172.16.2.7
NETMASK=255.255.255.0
"""
# Expected NFVswitch configuration line: bridge options followed by the
# uplink (-u) and internal port (-m) names.
_NFVSWITCH_CONFIG = ('SETUP_ARGS=\"-c 2,3,4,5 -u em1 -m storage5\"')
# OVS patch port 'br-pub-patch' on bridge 'br-ex', peered with 'br-ex-patch'.
_OVS_IFCFG_PATCH_PORT = """# This file is autogenerated by os-net-config
DEVICE=br-pub-patch
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
DEVICETYPE=ovs
TYPE=OVSPatchPort
OVS_BRIDGE=br-ex
OVS_PATCH_PEER=br-ex-patch
"""
# 'em1' as a port of team 'team1' with a JSON TEAM_PORT_CONFIG (prio 100).
_LINUX_TEAM_PRIMARY_IFACE = """# This file is autogenerated by os-net-config
DEVICE=em1
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
TEAM_MASTER=team1
TEAM_PORT_CONFIG='{"prio": 100}'
BOOTPROTO=none
"""
# Contrail vrouter 'vhost0' in kernel mode, bound to interface 'em3'.
_CONTRAIL_VROUTER_IFACE = """# This file is autogenerated by os-net-config
DEVICE=vhost0
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
BOOTPROTO=static
IPADDR=10.0.0.30
NETMASK=255.255.255.0
DEVICETYPE=vhost
TYPE=kernel_mode
BIND_INT=em3
"""
# Contrail vrouter 'vhost0' in kernel mode, bound to 'vlan100'.
_CONTRAIL_VROUTER_VLAN_IFACE = """# This file is autogenerated by os-net-config
DEVICE=vhost0
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
BOOTPROTO=static
IPADDR=10.0.0.30
NETMASK=255.255.255.0
DEVICETYPE=vhost
TYPE=kernel_mode
BIND_INT=vlan100
"""
# Contrail vrouter 'vhost0' in DPDK mode: BIND_INT is a PCI address and the
# DRIVER / CPU_LIST lines are present.
_CONTRAIL_VROUTER_DPDK_IFACE = """# This file is autogenerated by os-net-config
DEVICE=vhost0
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
BOOTPROTO=static
IPADDR=10.0.0.30
NETMASK=255.255.255.0
DEVICETYPE=vhost
TYPE=dpdk
BIND_INT=0000:00:03.0
DRIVER=uio_pci_generic
CPU_LIST=0-31
"""
# DPDK-mode vrouter with DRIVER=vfio. The backslash inside the literal joins
# the first two source lines into a single header line.
_CONTRAIL_VROUTER_DPDK_IFACE_CUST_DRIVER = """# This file is autogenerated by \
os-net-config
DEVICE=vhost0
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
BOOTPROTO=static
IPADDR=10.0.0.30
NETMASK=255.255.255.0
DEVICETYPE=vhost
TYPE=dpdk
BIND_INT=0000:00:03.0
DRIVER=vfio
CPU_LIST=0-31
"""
# DPDK-mode vrouter bound to two PCI addresses (comma separated) with
# BOND_MODE / BOND_POLICY lines. The backslash inside the literal joins the
# first two source lines into a single header line.
_CONTRAIL_VROUTER_DPDK_BOND_IFACE = """# This file is autogenerated by \
os-net-config
DEVICE=vhost0
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
BOOTPROTO=static
IPADDR=10.0.0.30
NETMASK=255.255.255.0
DEVICETYPE=vhost
TYPE=dpdk
BIND_INT=0000:00:03.0,0000:00:01.0
BOND_MODE=2
BOND_POLICY=802.3ad
DRIVER=uio_pci_generic
CPU_LIST=2,3
"""
class TestIfcfgNetConfig(base.TestCase):
def setUp(self):
    """Create a fresh ifcfg provider and isolate the SR-IOV config path.

    The module-level ``sriov_config._SRIOV_CONFIG_FILE`` is pointed at a
    file inside a private temporary directory, and
    ``os_net_config.utils.is_ovs_installed`` is stubbed to return True.
    """
    super(TestIfcfgNetConfig, self).setUp()
    # A directory from mkdtemp() is unique and only accessible to this
    # user, unlike a guessable name placed directly in a hard-coded /tmp
    # that may collide between parallel test workers. The config file
    # itself is deliberately not created here.
    sriov_dir = tempfile.mkdtemp(prefix='sriov_config_')
    self.addCleanup(shutil.rmtree, sriov_dir, ignore_errors=True)
    sriov_config._SRIOV_CONFIG_FILE = os.path.join(sriov_dir,
                                                   'sriov_config.yaml')
    self.provider = impl_ifcfg.IfcfgNetConfig()

    def stub_is_ovs_installed():
        return True
    self.stub_out('os_net_config.utils.is_ovs_installed',
                  stub_is_ovs_installed)
def tearDown(self):
    """Run the base teardown, then delete the SR-IOV config file if any."""
    super(TestIfcfgNetConfig, self).tearDown()
    config_path = sriov_config._SRIOV_CONFIG_FILE
    if not os.path.isfile(config_path):
        return
    os.remove(config_path)
def get_interface_config(self, name='em1'):
    """Return the provider's generated interface data for *name*.

    Raises KeyError when no interface with that name was added.
    """
    generated = self.provider.interface_data
    return generated[name]
def get_vlan_config(self, name='vlan1'):
    """Return the provider's generated VLAN data for *name*.

    Raises KeyError when no VLAN with that name was added.
    """
    generated = self.provider.vlan_data
    return generated[name]
def get_linux_bond_config(self, name='bond0'):
    """Return the provider's generated Linux bond data for *name*.

    Raises KeyError when no Linux bond with that name was added.
    """
    generated = self.provider.linuxbond_data
    return generated[name]
def get_linux_team_config(self, name='team0'):
    """Return the provider's generated Linux team data for *name*.

    Raises KeyError when no Linux team with that name was added.
    """
    generated = self.provider.linuxteam_data
    return generated[name]
def get_route_config(self, name='em1'):
    """Return the provider's route data for *name*, or '' if there is none."""
    routes_by_device = self.provider.route_data
    return routes_by_device.get(name, '')
def get_route6_config(self, name='em1'):
    """Return the provider's route6 data for *name*, or '' if there is none."""
    routes_by_device = self.provider.route6_data
    return routes_by_device.get(name, '')
def stub_get_stored_pci_address(self, ifname, noop):
    """Return a canned PCI address for a known interface name.

    Used as a replacement for ``os_net_config.utils.get_stored_pci_address``.
    Matching is by substring, in the order listed below; the first hit
    wins. *noop* is accepted but not used. Returns None when nothing
    matches.
    """
    canned_addresses = (
        ('eth0', "0000:00:07.0"),
        ('eth1', "0000:00:08.0"),
        ('eth2', "0000:00:09.0"),
        ('em3', "0000:00:03.0"),
        ('em1', "0000:00:01.0"),
    )
    for fragment, pci_address in canned_addresses:
        if fragment in ifname:
            return pci_address
    return None
def test_add_base_interface(self):
    """A bare 'em1' interface yields the no-IP ifcfg text."""
    self.provider.add_interface(objects.Interface('em1'))
    generated = self.get_interface_config()
    self.assertEqual(_NO_IP, generated)
def test_add_interface_with_hotplug(self):
    """An interface created with hotplug=True yields the _HOTPLUG text."""
    self.provider.add_interface(objects.Interface('em1', hotplug=True))
    generated = self.get_interface_config()
    self.assertEqual(_HOTPLUG, generated)
def test_add_interface_with_onboot(self):
    """An interface created with onboot=True yields the _ONBOOT text."""
    self.provider.add_interface(objects.Interface('em1', onboot=True))
    generated = self.get_interface_config()
    self.assertEqual(_ONBOOT, generated)
def test_add_base_interface_vlan(self):
    """An interface named 'em1.120' yields the _IFCFG_VLAN text."""
    device = 'em1.120'
    self.provider.add_interface(objects.Interface(device))
    self.assertEqual(_IFCFG_VLAN, self.get_interface_config(device))
def test_add_ovs_interface(self):
    """An interface with ovs_port set yields the _OVS_IFCFG text."""
    port = objects.Interface('em1')
    port.ovs_port = True
    self.provider.add_interface(port)
    generated = self.get_interface_config()
    self.assertEqual(_OVS_IFCFG, generated)
def test_add_ovs_tunnel(self):
    """A GRE OvsTunnel 'tun0' yields the _OVS_IFCFG_TUNNEL text."""
    tunnel = objects.OvsTunnel('tun0')
    tunnel.type = 'ovs_tunnel'
    tunnel.tunnel_type = 'gre'
    tunnel.ovs_options = ['options:remote_ip=192.168.1.1']
    tunnel.bridge_name = 'br-ctlplane'
    self.provider.add_interface(tunnel)
    generated = self.get_interface_config('tun0')
    self.assertEqual(_OVS_IFCFG_TUNNEL, generated)
def test_add_ovs_patch_port(self):
    """An OvsPatchPort on 'br-ex' yields the _OVS_IFCFG_PATCH_PORT text."""
    port_name = "br-pub-patch"
    port = objects.OvsPatchPort(port_name)
    port.type = 'ovs_patch_port'
    port.bridge_name = 'br-ex'
    port.peer = 'br-ex-patch'
    self.provider.add_interface(port)
    generated = self.get_interface_config('br-pub-patch')
    self.assertEqual(_OVS_IFCFG_PATCH_PORT, generated)
def test_add_interface_with_v4(self):
    """One IPv4 address yields _V4_IFCFG and no route data."""
    address = objects.Address('192.168.1.2/24')
    iface = objects.Interface('em1', addresses=[address])
    self.provider.add_interface(iface)
    generated = self.get_interface_config()
    self.assertEqual(_V4_IFCFG, generated)
    routes = self.get_route_config()
    self.assertEqual('', routes)
def test_add_interface_with_v4_multiple(self):
    """Three IPv4 addresses yield _V4_IFCFG_MULTIPLE and no route data."""
    cidrs = ('192.168.1.2/24', '192.168.1.3/32', '10.0.0.2/8')
    addresses = [objects.Address(cidr) for cidr in cidrs]
    iface = objects.Interface('em1', addresses=addresses)
    self.provider.add_interface(iface)
    generated = self.get_interface_config()
    self.assertEqual(_V4_IFCFG_MULTIPLE, generated)
    routes = self.get_route_config()
    self.assertEqual('', routes)
def test_add_interface_map_persisted(self):
    """A persisted nic mapping exposes the MAC and yields _V4_IFCFG_MAPPED.

    'nic1' is mapped to 'em1'; utils.interface_mac is stubbed so that
    only 'em1' has a known MAC address.
    """
    def fake_interface_mac(name):
        known_macs = {'em1': 'a1:b2:c3:d4:e5'}
        return known_macs[name]
    self.stub_out('os_net_config.utils.interface_mac', fake_interface_mac)

    mapping = {'nic1': 'em1'}
    self.stubbed_mapped_nics = mapping
    address = objects.Address('192.168.1.2/24')
    iface = objects.Interface('nic1',
                              addresses=[address],
                              nic_mapping=mapping,
                              persist_mapping=True)
    self.assertEqual('a1:b2:c3:d4:e5', iface.hwaddr)

    self.provider.add_interface(iface)
    generated = self.get_interface_config('nic1')
    self.assertEqual(_V4_IFCFG_MAPPED, generated)
    routes = self.get_route_config('nic1')
    self.assertEqual('', routes)
def test_add_interface_with_v6(self):
    """One IPv6 address yields the _V6_IFCFG text."""
    address = objects.Address('2001:abc:a::/64')
    iface = objects.Interface('em1', addresses=[address])
    self.provider.add_interface(iface)
    generated = self.get_interface_config()
    self.assertEqual(_V6_IFCFG, generated)
def test_add_interface_with_v6_multiple(self):
    """Three IPv6 addresses yield the _V6_IFCFG_MULTIPLE text."""
    cidrs = ('2001:abc:a::/64', '2001:abc:b::1/64', '2001:abc:c::2/96')
    addresses = [objects.Address(cidr) for cidr in cidrs]
    iface = objects.Interface('em1', addresses=addresses)
    self.provider.add_interface(iface)
    generated = self.get_interface_config()
    self.assertEqual(_V6_IFCFG_MULTIPLE, generated)
def test_network_with_routes(self):
    """IPv4 routes on 'em1' yield _V4_IFCFG plus the _ROUTES route data."""
    routes = [
        objects.Route('192.168.1.1', default=True,
                      route_options="metric 10"),
        objects.Route('192.168.1.1', '172.19.0.0/24'),
        objects.Route('192.168.1.5', '172.20.0.0/24',
                      route_options="metric 100"),
    ]
    address = objects.Address('192.168.1.2/24')
    iface = objects.Interface('em1', addresses=[address], routes=routes)
    self.provider.add_interface(iface)
    generated = self.get_interface_config()
    self.assertEqual(_V4_IFCFG, generated)
    generated_routes = self.get_route_config()
    self.assertEqual(_ROUTES, generated_routes)
def test_network_with_ipv6_routes(self):
    """Mixed v4/v6 routes yield _V4_V6_IFCFG and the _ROUTES_V6 route6 data.

    Only the IPv6 route data is asserted here.
    """
    v4_routes = [
        objects.Route('192.168.1.1', default=True,
                      route_options="metric 10"),
        objects.Route('192.168.1.1', '172.19.0.0/24'),
        objects.Route('192.168.1.5', '172.20.0.0/24',
                      route_options="metric 100"),
    ]
    v6_routes = [
        objects.Route('2001:db8::1', default=True),
        objects.Route('fd00:fd00:2000::1',
                      '2001:db8:dead:beef:cafe::/56'),
        objects.Route('fd00:fd00:2000::1',
                      '2001:db8:dead:beff::/64',
                      route_options="metric 100"),
    ]
    addresses = [objects.Address('192.168.1.2/24'),
                 objects.Address('2001:abc:a::/64')]
    iface = objects.Interface('em1', addresses=addresses,
                              routes=v4_routes + v6_routes)
    self.provider.add_interface(iface)
    generated = self.get_interface_config()
    self.assertEqual(_V4_V6_IFCFG, generated)
    generated_routes6 = self.get_route6_config()
    self.assertEqual(_ROUTES_V6, generated_routes6)
def test_network_ovs_bridge_with_dhcp(self):
    """A DHCP OVS bridge over 'em1' yields the port and bridge ifcfgs."""
    member = objects.Interface('em1')
    ovs_bridge = objects.OvsBridge('br-ctlplane',
                                   use_dhcp=True,
                                   members=[member])
    self.provider.add_interface(member)
    self.provider.add_bridge(ovs_bridge)
    member_cfg = self.get_interface_config()
    self.assertEqual(_OVS_INTERFACE, member_cfg)
    bridge_cfg = self.provider.bridge_data['br-ctlplane']
    self.assertEqual(_OVS_BRIDGE_DHCP, bridge_cfg)
def test_network_ovs_bridge_with_standalone_fail_mode(self):
    """fail_mode='standalone' yields the _OVS_BRIDGE_DHCP_STANDALONE text."""
    member = objects.Interface('em1')
    ovs_bridge = objects.OvsBridge('br-ctlplane',
                                   use_dhcp=True,
                                   members=[member],
                                   fail_mode='standalone')
    self.provider.add_interface(member)
    self.provider.add_bridge(ovs_bridge)
    member_cfg = self.get_interface_config()
    self.assertEqual(_OVS_INTERFACE, member_cfg)
    bridge_cfg = self.provider.bridge_data['br-ctlplane']
    self.assertEqual(_OVS_BRIDGE_DHCP_STANDALONE, bridge_cfg)
def test_network_ovs_bridge_with_secure_fail_mode(self):
    """fail_mode='secure' yields the _OVS_BRIDGE_DHCP_SECURE text."""
    member = objects.Interface('em1')
    ovs_bridge = objects.OvsBridge('br-ctlplane',
                                   use_dhcp=True,
                                   members=[member],
                                   fail_mode='secure')
    self.provider.add_interface(member)
    self.provider.add_bridge(ovs_bridge)
    member_cfg = self.get_interface_config()
    self.assertEqual(_OVS_INTERFACE, member_cfg)
    bridge_cfg = self.provider.bridge_data['br-ctlplane']
    self.assertEqual(_OVS_BRIDGE_DHCP_SECURE, bridge_cfg)
def test_network_linux_bridge_with_dhcp(self):
    """A DHCP Linux bridge over 'em1' yields the member and bridge ifcfgs.

    The bridge is registered with the provider before its member.
    """
    member = objects.Interface('em1')
    linux_bridge = objects.LinuxBridge('br-ctlplane',
                                       use_dhcp=True,
                                       members=[member])
    self.provider.add_linux_bridge(linux_bridge)
    self.provider.add_interface(member)
    member_cfg = self.get_interface_config()
    self.assertEqual(_LINUX_BRIDGE_IFCFG, member_cfg)
    bridge_cfg = self.provider.linuxbridge_data['br-ctlplane']
    self.assertEqual(_LINUX_BRIDGE_DHCP, bridge_cfg)
def test_network_ovs_bridge_static(self):
    """A statically addressed OVS bridge yields the _OVS_BRIDGE_STATIC text."""
    address = objects.Address('192.168.1.2/24')
    member = objects.Interface('em1')
    ovs_bridge = objects.OvsBridge('br-ctlplane',
                                   members=[member],
                                   addresses=[address])
    self.provider.add_interface(member)
    self.provider.add_bridge(ovs_bridge)
    member_cfg = self.get_interface_config()
    self.assertEqual(_OVS_INTERFACE, member_cfg)
    bridge_cfg = self.provider.bridge_data['br-ctlplane']
    self.assertEqual(_OVS_BRIDGE_STATIC, bridge_cfg)
def test_network_ovs_bridge_with_tunnel(self):
    """A GRE tunnel as OVS bridge member yields tunnel and bridge ifcfgs."""
    tunnel = objects.OvsTunnel('tun0')
    tunnel.type = 'ovs_tunnel'
    tunnel.tunnel_type = 'gre'
    tunnel.ovs_options = ['options:remote_ip=192.168.1.1']
    tunnel.bridge_name = 'br-ctlplane'
    self.provider.add_interface(tunnel)

    address = objects.Address('192.168.1.2/24')
    ovs_bridge = objects.OvsBridge('br-ctlplane',
                                   members=[tunnel],
                                   addresses=[address])
    self.provider.add_bridge(ovs_bridge)
    # NOTE(review): second registration of the same tunnel; looks
    # redundant with the call above - confirm before removing.
    self.provider.add_interface(tunnel)

    tunnel_cfg = self.get_interface_config('tun0')
    self.assertEqual(_OVS_IFCFG_TUNNEL, tunnel_cfg)
    bridge_cfg = self.provider.bridge_data['br-ctlplane']
    self.assertEqual(_OVS_BRIDGE_STATIC, bridge_cfg)
def test_network_linux_bridge_static(self):
    """A static Linux bridge added via add_bridge() yields both ifcfgs.

    The bridge text is read back from the provider's bridge_data.
    """
    address = objects.Address('192.168.1.2/24')
    member = objects.Interface('em1')
    linux_bridge = objects.LinuxBridge('br-ctlplane',
                                       members=[member],
                                       addresses=[address])
    self.provider.add_interface(member)
    self.provider.add_bridge(linux_bridge)
    member_cfg = self.get_interface_config()
    self.assertEqual(_LINUX_BRIDGE_IFCFG, member_cfg)
    bridge_cfg = self.provider.bridge_data['br-ctlplane']
    self.assertEqual(_LINUX_BRIDGE_STATIC, bridge_cfg)
def test_network_ovs_bridge_with_dhcp_primary_interface(self):
    """A primary member puts its (stubbed) MAC into the bridge OVS_EXTRA."""
    def fake_interface_mac(name):
        return "a1:b2:c3:d4:e5"
    self.stub_out('os_net_config.utils.interface_mac', fake_interface_mac)

    member = objects.Interface('em1', primary=True)
    ovs_bridge = objects.OvsBridge('br-ctlplane',
                                   use_dhcp=True,
                                   members=[member])
    self.provider.add_interface(member)
    self.provider.add_bridge(ovs_bridge)
    member_cfg = self.get_interface_config()
    self.assertEqual(_OVS_INTERFACE, member_cfg)
    bridge_cfg = self.provider.bridge_data['br-ctlplane']
    self.assertEqual(_OVS_BRIDGE_DHCP_PRIMARY_INTERFACE, bridge_cfg)
def test_network_ovs_bridge_with_dhcp_primary_interface_with_extra(self):
    """A user ovs_extra entry is appended after the hwaddr setting."""
    def fake_interface_mac(name):
        return "a1:b2:c3:d4:e5"
    self.stub_out('os_net_config.utils.interface_mac', fake_interface_mac)

    member = objects.Interface('em1', primary=True)
    extra_command = "br-set-external-id br-ctlplane bridge-id br-ctlplane"
    ovs_bridge = objects.OvsBridge('br-ctlplane',
                                   use_dhcp=True,
                                   members=[member],
                                   ovs_extra=[extra_command])
    self.provider.add_interface(member)
    self.provider.add_bridge(ovs_bridge)
    member_cfg = self.get_interface_config()
    self.assertEqual(_OVS_INTERFACE, member_cfg)
    bridge_cfg = self.provider.bridge_data['br-ctlplane']
    self.assertEqual(_OVS_BRIDGE_DHCP_OVS_EXTRA, bridge_cfg)
def test_network_ovs_bridge_with_dhcp_primary_interface_with_format(self):
    """A '{name}' placeholder in ovs_extra renders as the bridge name.

    The expected text is the same as for the literal-name variant.
    """
    def fake_interface_mac(name):
        return "a1:b2:c3:d4:e5"
    self.stub_out('os_net_config.utils.interface_mac', fake_interface_mac)

    member = objects.Interface('em1', primary=True)
    extra_template = "br-set-external-id {name} bridge-id {name}"
    ovs_bridge = objects.OvsBridge('br-ctlplane',
                                   use_dhcp=True,
                                   members=[member],
                                   ovs_extra=[extra_template])
    self.provider.add_interface(member)
    self.provider.add_bridge(ovs_bridge)
    member_cfg = self.get_interface_config()
    self.assertEqual(_OVS_INTERFACE, member_cfg)
    bridge_cfg = self.provider.bridge_data['br-ctlplane']
    self.assertEqual(_OVS_BRIDGE_DHCP_OVS_EXTRA, bridge_cfg)
def test_network_ivs_with_uplink_and_interface(self):
    """An IVS bridge yields uplink, internal port and daemon config text."""
    uplink = objects.Interface('em1')
    address = objects.Address('172.16.2.7/24')
    internal_port = objects.IvsInterface(vlan_id=5,
                                         name='storage',
                                         addresses=[address])
    ivs_bridge = objects.IvsBridge(members=[uplink, internal_port])
    self.provider.add_interface(uplink)
    self.provider.add_ivs_interface(internal_port)
    self.provider.add_bridge(ivs_bridge)

    uplink_cfg = self.get_interface_config()
    self.assertEqual(_IVS_UPLINK, uplink_cfg)
    port_cfg = self.provider.ivsinterface_data[internal_port.name]
    self.assertEqual(_IVS_INTERFACE, port_cfg)
    daemon_cfg = self.provider.generate_ivs_config(['em1'], ['storage5'])
    self.assertEqual(_IVS_CONFIG, daemon_cfg)
def test_network_nfvswitch_with_interfaces_and_internal_interfaces(self):
    """An NFVswitch bridge yields interface, internal port and config text."""
    uplink = objects.Interface('em1')
    address = objects.Address('172.16.2.7/24')
    internal_port = objects.NfvswitchInternal(vlan_id=5,
                                              name='storage',
                                              addresses=[address])
    port_name = internal_port.name
    nfv_bridge = objects.NfvswitchBridge(
        members=[uplink, internal_port],
        options="-c 2,3,4,5")
    self.provider.add_interface(uplink)
    self.provider.add_nfvswitch_internal(internal_port)
    self.provider.add_nfvswitch_bridge(nfv_bridge)

    uplink_cfg = self.get_interface_config()
    self.assertEqual(_NFVSWITCH_INTERFACE, uplink_cfg)
    port_cfg = self.provider.nfvswitch_intiface_data[port_name]
    self.assertEqual(_NFVSWITCH_INTERNAL, port_cfg)
    switch_cfg = self.provider.generate_nfvswitch_config(['em1'],
                                                         ['storage5'])
    self.assertEqual(_NFVSWITCH_CONFIG, switch_cfg)
def test_add_ib_interface_with_v4_multiple(self):
    """Three IPv4 addresses on 'ib0' yield _IB_V4_IFCFG_MULTIPLE, no routes."""
    addresses = [objects.Address('192.168.1.2/24'),
                 objects.Address('192.168.1.3/32'),
                 objects.Address('10.0.0.2/8')]
    ib_interface = objects.IbInterface('ib0', addresses=addresses)
    self.provider.add_interface(ib_interface)
    self.assertEqual(_IB_V4_IFCFG_MULTIPLE,
                     self.get_interface_config('ib0'))
    # Ask for the routes of the interface under test. The helper's default
    # name is 'em1', which this test never adds, so relying on the default
    # could not detect route data generated for 'ib0'.
    self.assertEqual('', self.get_route_config('ib0'))
def test_add_contrail_vrouter(self):
    """A kernel-mode vrouter bound to 'em3' yields _CONTRAIL_VROUTER_IFACE."""
    address = objects.Address('10.0.0.30/24')
    bound_iface = objects.Interface('em3')
    vrouter = objects.ContrailVrouter('vhost0',
                                      addresses=[address],
                                      members=[bound_iface])
    self.provider.add_contrail_vrouter(vrouter)
    vhost_cfg = self.provider.interface_data['vhost0']
    self.assertEqual(_CONTRAIL_VROUTER_IFACE, vhost_cfg)
    self.assertEqual('', self.get_route_config('vhost0'))
def test_add_contrail_vrouter_vlan(self):
    """A vrouter whose member is named 'vlan100' binds to that name."""
    address = objects.Address('10.0.0.30/24')
    bound_iface = objects.Interface('vlan100')
    vrouter = objects.ContrailVrouter('vhost0',
                                      addresses=[address],
                                      members=[bound_iface])
    self.provider.add_contrail_vrouter(vrouter)
    vhost_cfg = self.provider.interface_data['vhost0']
    self.assertEqual(_CONTRAIL_VROUTER_VLAN_IFACE, vhost_cfg)
    self.assertEqual('', self.get_route_config('vhost0'))
def test_add_contrail_vrouter_with_nic_mapping(self):
    """With 'nic1' mapped to 'em3' the vrouter still binds to 'em3'."""
    mapping = {'nic1': 'em3'}
    self.stubbed_mapped_nics = mapping
    address = objects.Address('10.0.0.30/24')
    bound_iface = objects.Interface('nic1')
    vrouter = objects.ContrailVrouter('vhost0',
                                      addresses=[address],
                                      members=[bound_iface])
    self.provider.add_contrail_vrouter(vrouter)
    vhost_cfg = self.provider.interface_data['vhost0']
    self.assertEqual(_CONTRAIL_VROUTER_IFACE, vhost_cfg)
    self.assertEqual('', self.get_route_config('vhost0'))
def test_add_contrail_vrouter_dpdk_interface(self):
    """A DPDK vrouter over 'em3' yields _CONTRAIL_VROUTER_DPDK_IFACE.

    PCI address lookup is stubbed and the provider runs in noop mode.
    """
    address = objects.Address('10.0.0.30/24')
    bound_iface = objects.Interface('em3')
    self.stub_out('os_net_config.utils.get_stored_pci_address',
                  self.stub_get_stored_pci_address)
    vrouter = objects.ContrailVrouterDpdk('vhost0',
                                          addresses=[address],
                                          members=[bound_iface])
    self.provider.noop = True
    self.provider.add_contrail_vrouter_dpdk(vrouter)
    vhost_cfg = self.provider.interface_data['vhost0']
    self.assertEqual(_CONTRAIL_VROUTER_DPDK_IFACE, vhost_cfg)
    self.assertEqual('', self.get_route_config('vhost0'))
def test_add_contrail_vrouter_dpdk_interface_cust_driver(self):
    """driver='vfio' on a DPDK vrouter is rendered as DRIVER=vfio.

    PCI address lookup is stubbed and the provider runs in noop mode.
    """
    address = objects.Address('10.0.0.30/24')
    bound_iface = objects.Interface('em3')
    self.stub_out('os_net_config.utils.get_stored_pci_address',
                  self.stub_get_stored_pci_address)
    vrouter = objects.ContrailVrouterDpdk('vhost0',
                                          addresses=[address],
                                          members=[bound_iface],
                                          driver='vfio')
    self.provider.noop = True
    self.provider.add_contrail_vrouter_dpdk(vrouter)
    vhost_cfg = self.provider.interface_data['vhost0']
    self.assertEqual(_CONTRAIL_VROUTER_DPDK_IFACE_CUST_DRIVER, vhost_cfg)
    self.assertEqual('', self.get_route_config('vhost0'))
def test_add_contrail_vrouter_dpdk_interface_nic_mapping(self):
    """With 'nic1' mapped to 'em3' the DPDK vrouter text is unchanged.

    PCI address lookup is stubbed and the provider runs in noop mode.
    """
    mapping = {'nic1': 'em3'}
    self.stubbed_mapped_nics = mapping
    address = objects.Address('10.0.0.30/24')
    bound_iface = objects.Interface('nic1')
    self.stub_out('os_net_config.utils.get_stored_pci_address',
                  self.stub_get_stored_pci_address)
    vrouter = objects.ContrailVrouterDpdk('vhost0',
                                          addresses=[address],
                                          members=[bound_iface])
    self.provider.noop = True
    self.provider.add_contrail_vrouter_dpdk(vrouter)
    vhost_cfg = self.provider.interface_data['vhost0']
    self.assertEqual(_CONTRAIL_VROUTER_DPDK_IFACE, vhost_cfg)
    self.assertEqual('', self.get_route_config('vhost0'))
def test_add_contrail_vrouter_dpdk_bond_interface(self):
    """Two members plus bond settings yield the DPDK bond vrouter text.

    PCI address lookup is stubbed and the provider runs in noop mode.
    """
    address = objects.Address('10.0.0.30/24')
    bond_members = [objects.Interface('em3'), objects.Interface('em1')]
    self.stub_out('os_net_config.utils.get_stored_pci_address',
                  self.stub_get_stored_pci_address)
    vrouter = objects.ContrailVrouterDpdk('vhost0',
                                          addresses=[address],
                                          members=bond_members,
                                          bond_mode="2",
                                          bond_policy="802.3ad",
                                          cpu_list="2,3")
    self.provider.noop = True
    self.provider.add_contrail_vrouter_dpdk(vrouter)
    vhost_cfg = self.provider.interface_data['vhost0']
    self.assertEqual(_CONTRAIL_VROUTER_DPDK_BOND_IFACE, vhost_cfg)
    self.assertEqual('', self.get_route_config('vhost0'))
def test_add_contrail_vrouter_dpdk_bond_interface_nic_mapping(self):
    """With nic1/nic2 mapped to em3/em1 the DPDK bond text is unchanged.

    PCI address lookup is stubbed and the provider runs in noop mode.
    """
    mapping = {'nic1': 'em3', 'nic2': 'em1'}
    self.stubbed_mapped_nics = mapping
    address = objects.Address('10.0.0.30/24')
    bond_members = [objects.Interface('nic1'), objects.Interface('nic2')]
    self.stub_out('os_net_config.utils.get_stored_pci_address',
                  self.stub_get_stored_pci_address)
    vrouter = objects.ContrailVrouterDpdk('vhost0',
                                          addresses=[address],
                                          members=bond_members,
                                          bond_mode="2",
                                          bond_policy="802.3ad",
                                          cpu_list="2,3")
    self.provider.noop = True
    self.provider.add_contrail_vrouter_dpdk(vrouter)
    vhost_cfg = self.provider.interface_data['vhost0']
    self.assertEqual(_CONTRAIL_VROUTER_DPDK_BOND_IFACE, vhost_cfg)
    self.assertEqual('', self.get_route_config('vhost0'))
def test_add_vlan(self):
    """VLAN 5 on 'em1' is stored as 'vlan5' with the _VLAN_NO_IP text."""
    self.provider.add_vlan(objects.Vlan('em1', 5))
    generated = self.get_vlan_config('vlan5')
    self.assertEqual(_VLAN_NO_IP, generated)
def test_add_vlan_ovs(self):
    """A VLAN with ovs_port set yields the _VLAN_OVS text."""
    ovs_vlan = objects.Vlan('em1', 5)
    ovs_vlan.ovs_port = True
    self.provider.add_vlan(ovs_vlan)
    generated = self.get_vlan_config('vlan5')
    self.assertEqual(_VLAN_OVS, generated)
def test_add_vlan_mtu_1500(self):
    """mtu=1500 on a VLAN appends an MTU=1500 line to the no-IP text."""
    self.provider.add_vlan(objects.Vlan('em1', 5, mtu=1500))
    wanted = _VLAN_NO_IP + 'MTU=1500\n'
    generated = self.get_vlan_config('vlan5')
    self.assertEqual(wanted, generated)
def test_add_ovs_bridge_with_vlan(self):
    """A VLAN member of an OVS bridge yields the _VLAN_OVS_BRIDGE text."""
    member_vlan = objects.Vlan('em1', 5)
    ovs_bridge = objects.OvsBridge('br-ctlplane',
                                   use_dhcp=True,
                                   members=[member_vlan])
    self.provider.add_vlan(member_vlan)
    self.provider.add_bridge(ovs_bridge)
    generated = self.get_vlan_config('vlan5')
    self.assertEqual(_VLAN_OVS_BRIDGE, generated)
def test_add_linux_bridge_with_vlan(self):
    """A VLAN member of a Linux bridge yields the _VLAN_LINUX_BRIDGE text."""
    member_vlan = objects.Vlan('em1', 5)
    linux_bridge = objects.LinuxBridge('br-ctlplane',
                                       use_dhcp=True,
                                       members=[member_vlan])
    self.provider.add_vlan(member_vlan)
    self.provider.add_bridge(linux_bridge)
    generated = self.get_vlan_config('vlan5')
    self.assertEqual(_VLAN_LINUX_BRIDGE, generated)
def test_ovs_bond(self):
    """A DHCP OVS bond over em1/em2 yields member and bond ifcfg text.

    Both members render as plain no-IP interfaces; the bond text is read
    from the provider's interface data under 'bond0'.
    """
    first = objects.Interface('em1')
    second = objects.Interface('em2')
    ovs_bond = objects.OvsBond('bond0',
                               use_dhcp=True,
                               members=[first, second])
    self.provider.add_interface(first)
    self.provider.add_interface(second)
    self.provider.add_bond(ovs_bond)

    self.assertEqual(_NO_IP, self.get_interface_config('em1'))
    wanted_em2 = """# This file is autogenerated by os-net-config
DEVICE=em2
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
BOOTPROTO=none
"""
    self.assertEqual(wanted_em2, self.get_interface_config('em2'))
    bond_cfg = self.get_interface_config('bond0')
    self.assertEqual(_OVS_BOND_DHCP, bond_cfg)
def test_linux_bond(self):
    """A DHCP Linux bond over em1/em2 yields bond and member ifcfg text.

    The bond is registered with the provider before its members.
    """
    first = objects.Interface('em1')
    second = objects.Interface('em2')
    linux_bond = objects.LinuxBond('bond0',
                                   use_dhcp=True,
                                   members=[first, second])
    self.provider.add_linux_bond(linux_bond)
    self.provider.add_interface(first)
    self.provider.add_interface(second)

    bond_cfg = self.get_linux_bond_config('bond0')
    self.assertEqual(_LINUX_BOND_DHCP, bond_cfg)
    member_cfg = self.get_interface_config('em1')
    self.assertEqual(_LINUX_BOND_INTERFACE, member_cfg)
def test_linux_team(self):
    """A DHCP Linux team over em1/em2 yields team and member ifcfg text.

    The team is registered with the provider before its members.
    """
    first = objects.Interface('em1')
    second = objects.Interface('em2')
    linux_team = objects.LinuxTeam('team0',
                                   use_dhcp=True,
                                   members=[first, second])
    self.provider.add_linux_team(linux_team)
    self.provider.add_interface(first)
    self.provider.add_interface(second)

    team_cfg = self.get_linux_team_config('team0')
    self.assertEqual(_LINUX_TEAM_DHCP, team_cfg)
    member_cfg = self.get_interface_config('em1')
    self.assertEqual(_LINUX_TEAM_INTERFACE, member_cfg)
def test_interface_defroute(self):
    """defroute=False adds DEFROUTE=no; the default adds no such line."""
    with_default = objects.Interface('em1')
    without_defroute = objects.Interface('em2', defroute=False)
    self.provider.add_interface(with_default)
    self.provider.add_interface(without_defroute)

    wanted_em1 = """# This file is autogenerated by os-net-config
DEVICE=em1
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
BOOTPROTO=none
"""
    wanted_em2 = """# This file is autogenerated by os-net-config
DEVICE=em2
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
BOOTPROTO=none
DEFROUTE=no
"""
    self.assertEqual(wanted_em1, self.get_interface_config('em1'))
    self.assertEqual(wanted_em2, self.get_interface_config('em2'))
def test_interface_dhclient_opts(self):
    """dhclient_args is rendered as an unquoted DHCLIENTARGS line."""
    iface = objects.Interface('em1', dhclient_args='--foobar')
    self.provider.add_interface(iface)
    wanted = """# This file is autogenerated by os-net-config
DEVICE=em1
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
BOOTPROTO=none
DHCLIENTARGS=--foobar
"""
    generated = self.get_interface_config('em1')
    self.assertEqual(wanted, generated)
def test_interface_ethtool_opts(self):
    # ethtool_opts given to the interface must be rendered as a
    # double-quoted ETHTOOL_OPTS value.
    expected_em1 = """# This file is autogenerated by os-net-config
DEVICE=em1
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
BOOTPROTO=none
ETHTOOL_OPTS=\"speed 1000 duplex full\"
"""
    nic = objects.Interface('em1',
                            ethtool_opts='speed 1000 duplex full')
    self.provider.add_interface(nic)
    rendered = self.get_interface_config('em1')
    self.assertEqual(expected_em1, rendered)
def test_interface_single_dns_server(self):
    # One DNS server yields a single DNS1 entry; note the expected file
    # carries no PEERDNS=no line.
    expected_em1 = """# This file is autogenerated by os-net-config
DEVICE=em1
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
BOOTPROTO=none
DNS1=1.2.3.4
"""
    nic = objects.Interface('em1', dns_servers=['1.2.3.4'])
    self.provider.add_interface(nic)
    rendered = self.get_interface_config('em1')
    self.assertEqual(expected_em1, rendered)
def test_interface_dns_servers(self):
    # Two DNS servers are expected as DNS1 and DNS2, in the order given.
    expected_em1 = """# This file is autogenerated by os-net-config
DEVICE=em1
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
BOOTPROTO=none
DNS1=1.2.3.4
DNS2=5.6.7.8
"""
    resolvers = ['1.2.3.4', '5.6.7.8']
    nic = objects.Interface('em1', dns_servers=resolvers)
    self.provider.add_interface(nic)
    rendered = self.get_interface_config('em1')
    self.assertEqual(expected_em1, rendered)
def test_interface_more_dns_servers(self):
    # Three DNS servers are supplied, but the expected file only lists
    # the first two (DNS1 and DNS2).
    expected_em1 = """# This file is autogenerated by os-net-config
DEVICE=em1
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
BOOTPROTO=none
DNS1=1.2.3.4
DNS2=5.6.7.8
"""
    resolvers = ['1.2.3.4', '5.6.7.8', '9.10.11.12']
    nic = objects.Interface('em1', dns_servers=resolvers)
    self.provider.add_interface(nic)
    rendered = self.get_interface_config('em1')
    self.assertEqual(expected_em1, rendered)
def test_nm_controlled(self):
    # Both members and the bond are created with nm_controlled=True; the
    # em1 file and the bond1 file are compared with the module-level
    # NetworkManager expectations.
    nm_nic1 = objects.Interface('em1', nm_controlled=True)
    nm_nic2 = objects.Interface('em2', nm_controlled=True)
    nm_bond = objects.LinuxBond('bond1', nm_controlled=True,
                                members=[nm_nic1, nm_nic2])
    self.provider.add_linux_bond(nm_bond)
    self.provider.add_interface(nm_nic1)
    self.provider.add_interface(nm_nic2)
    self.assertEqual(_NM_CONTROLLED_INTERFACE,
                     self.get_interface_config('em1'))
    self.assertEqual(_NM_CONTROLLED_BOND,
                     self.get_linux_bond_config('bond1'))
def test_network_sriov_vf_without_config(self):
    # A VF created with only device, vfid and addresses: the stubbed
    # update_sriov_vf_map checks the values it receives, and the rendered
    # file for eth2_7 must be a plain static-IP config.
    expected_vf = """# This file is autogenerated by os-net-config
DEVICE=eth2_7
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
BOOTPROTO=static
IPADDR=10.0.0.30
NETMASK=255.255.255.0
"""
    self.stubbed_mapped_nics = {'nic1': 'eth0', 'nic2': 'eth1',
                                'nic3': 'eth2'}
    vf_addresses = [objects.Address('10.0.0.30/24')]

    def check_vf_map_update(pf_name, vfid, vf_name, vlan_id=None,
                            qos=None, spoofcheck=None, trust=None,
                            state=None, macaddr=None, promisc=None):
        self.assertEqual(pf_name, 'eth2')
        self.assertEqual(vfid, 7)
        self.assertEqual(vlan_id, 0)
        self.assertEqual(qos, 0)
        self.assertEqual(spoofcheck, None)
        self.assertEqual(trust, None)
        self.assertEqual(state, None)
        self.assertEqual(macaddr, None)

    self.stub_out('os_net_config.utils.update_sriov_vf_map',
                  check_vf_map_update)

    def fake_vf_devname(device, vfid):
        # Deterministic VF name: "<device>_<vfid>".
        return device + '_' + str(vfid)

    self.stub_out('os_net_config.utils.get_vf_devname',
                  fake_vf_devname)
    sriov_vf = objects.SriovVF(device='nic3', vfid=7,
                               addresses=vf_addresses)
    self.provider.add_sriov_vf(sriov_vf)
    self.assertEqual(expected_vf, self.get_interface_config('eth2_7'))
def test_network_sriov_vf_true(self):
    # A VF created with vlan/qos/state/macaddr and all boolean options
    # set to True: the stubbed update_sriov_vf_map checks each value it
    # receives, and the rendered file for eth2_7 is a static-IP config.
    expected_vf = """# This file is autogenerated by os-net-config
DEVICE=eth2_7
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
BOOTPROTO=static
IPADDR=10.0.0.30
NETMASK=255.255.255.0
"""
    self.stubbed_mapped_nics = {'nic1': 'eth0', 'nic2': 'eth1',
                                'nic3': 'eth2'}
    vf_addresses = [objects.Address('10.0.0.30/24')]

    def check_vf_map_update(pf_name, vfid, vf_name, vlan_id=None,
                            qos=None, spoofcheck=None, trust=None,
                            state=None, macaddr=None, promisc=None):
        self.assertEqual(pf_name, 'eth2')
        self.assertEqual(vf_name, 'eth2_7')
        self.assertEqual(vfid, 7)
        # vlan_id and qos are passed to SriovVF as strings but are
        # expected here as integers.
        self.assertEqual(vlan_id, 100)
        self.assertEqual(qos, 10)
        self.assertTrue(spoofcheck)
        self.assertTrue(trust)
        self.assertEqual(state, "auto")
        self.assertEqual(macaddr, "AA:BB:CC:DD:EE:FF")
        self.assertTrue(promisc)

    self.stub_out('os_net_config.utils.update_sriov_vf_map',
                  check_vf_map_update)

    def fake_vf_devname(device, vfid):
        # Deterministic VF name: "<device>_<vfid>".
        return device + '_' + str(vfid)

    self.stub_out('os_net_config.utils.get_vf_devname',
                  fake_vf_devname)
    sriov_vf = objects.SriovVF(device='nic3', vfid=7,
                               addresses=vf_addresses,
                               vlan_id='100', qos='10', spoofcheck=True,
                               trust=True, state="auto",
                               macaddr="AA:BB:CC:DD:EE:FF", promisc=True)
    self.provider.add_sriov_vf(sriov_vf)
    self.assertEqual(expected_vf, self.get_interface_config('eth2_7'))
def test_network_sriov_vf_config_false(self):
    # Same as the "true" VF scenario but with every boolean option set to
    # False and state="enable"; the stub asserts the falsy values.
    expected_vf = """# This file is autogenerated by os-net-config
DEVICE=eth2_7
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
BOOTPROTO=static
IPADDR=10.0.0.30
NETMASK=255.255.255.0
"""
    self.stubbed_mapped_nics = {'nic1': 'eth0', 'nic2': 'eth1',
                                'nic3': 'eth2'}
    vf_addresses = [objects.Address('10.0.0.30/24')]

    def check_vf_map_update(pf_name, vfid, vf_name, vlan_id=None,
                            qos=None, spoofcheck=None, trust=None,
                            state=None, macaddr=None, promisc=None):
        self.assertEqual(pf_name, 'eth2')
        self.assertEqual(vf_name, 'eth2_7')
        self.assertEqual(vfid, 7)
        # vlan_id and qos are passed to SriovVF as strings but are
        # expected here as integers.
        self.assertEqual(vlan_id, 100)
        self.assertEqual(qos, 10)
        self.assertFalse(spoofcheck)
        self.assertFalse(trust)
        self.assertEqual(state, "enable")
        self.assertEqual(macaddr, "AA:BB:CC:DD:EE:FF")
        self.assertFalse(promisc)

    self.stub_out('os_net_config.utils.update_sriov_vf_map',
                  check_vf_map_update)

    def fake_vf_devname(device, vfid):
        # Deterministic VF name: "<device>_<vfid>".
        return device + '_' + str(vfid)

    self.stub_out('os_net_config.utils.get_vf_devname',
                  fake_vf_devname)
    sriov_vf = objects.SriovVF(device='nic3', vfid=7,
                               addresses=vf_addresses,
                               vlan_id='100', qos='10', spoofcheck=False,
                               trust=False, state="enable",
                               macaddr="AA:BB:CC:DD:EE:FF", promisc=False)
    self.provider.add_sriov_vf(sriov_vf)
    self.assertEqual(expected_vf, self.get_interface_config('eth2_7'))
def test_network_sriov_pf_without_promisc(self):
    # A PF created without a promisc argument: the stubbed
    # update_sriov_pf_map must receive promisc equal to None, and the
    # rendered eth2 file is a plain BOOTPROTO=none config.
    expected_pf = """# This file is autogenerated by os-net-config
DEVICE=eth2
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
BOOTPROTO=none
"""
    self.stubbed_mapped_nics = {'nic1': 'eth0', 'nic2': 'eth1',
                                'nic3': 'eth2'}
    sriov_pf = objects.SriovPF(name='nic3', numvfs=10)

    def check_pf_map_update(name, numvfs, noop, promisc=None):
        self.assertEqual(name, 'eth2')
        self.assertEqual(numvfs, 10)
        self.assertEqual(promisc, None)

    self.stub_out('os_net_config.utils.update_sriov_pf_map',
                  check_pf_map_update)
    self.provider.add_sriov_pf(sriov_pf)
    self.assertEqual(expected_pf, self.get_interface_config('eth2'))
def test_network_sriov_pf_with_promisc_on(self):
    # A PF created with promisc=True: the stubbed update_sriov_pf_map
    # must receive a truthy promisc, and the rendered eth2 file is a
    # plain BOOTPROTO=none config.
    expected_pf = """# This file is autogenerated by os-net-config
DEVICE=eth2
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
BOOTPROTO=none
"""
    self.stubbed_mapped_nics = {'nic1': 'eth0', 'nic2': 'eth1',
                                'nic3': 'eth2'}
    sriov_pf = objects.SriovPF(name='nic3', numvfs=10, promisc=True)

    def check_pf_map_update(name, numvfs, noop, promisc=None):
        self.assertEqual(name, 'eth2')
        self.assertEqual(numvfs, 10)
        self.assertTrue(promisc)

    self.stub_out('os_net_config.utils.update_sriov_pf_map',
                  check_pf_map_update)
    self.provider.add_sriov_pf(sriov_pf)
    self.assertEqual(expected_pf, self.get_interface_config('eth2'))
def test_network_sriov_pf_with_promisc_off(self):
    # A PF created with promisc=False: the stubbed update_sriov_pf_map
    # must receive a falsy promisc, and the rendered eth2 file is a
    # plain BOOTPROTO=none config.
    expected_pf = """# This file is autogenerated by os-net-config
DEVICE=eth2
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
BOOTPROTO=none
"""
    self.stubbed_mapped_nics = {'nic1': 'eth0', 'nic2': 'eth1',
                                'nic3': 'eth2'}
    sriov_pf = objects.SriovPF(name='nic3', numvfs=10, promisc=False)

    def check_pf_map_update(name, numvfs, noop, promisc=None):
        self.assertEqual(name, 'eth2')
        self.assertEqual(numvfs, 10)
        self.assertFalse(promisc)

    self.stub_out('os_net_config.utils.update_sriov_pf_map',
                  check_pf_map_update)
    self.provider.add_sriov_pf(sriov_pf)
    self.assertEqual(expected_pf, self.get_interface_config('eth2'))
def test_network_ovs_dpdk_bridge_and_port(self):
    # A single DPDK port (backed by nic3 -> eth2) on an OVS user bridge:
    # the bind stub checks the interface and driver it is handed, then
    # the bridge data and the dpdk0 file are compared with the expected
    # contents.
    expected_bridge = """# This file is autogenerated by os-net-config
DEVICE=br-link
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
DEVICETYPE=ovs
TYPE=OVSUserBridge
"""
    expected_port = """# This file is autogenerated by os-net-config
DEVICE=dpdk0
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
DEVICETYPE=ovs
TYPE=OVSDPDKPort
OVS_BRIDGE=br-link
OVS_EXTRA="set Interface $DEVICE options:dpdk-devargs=0000:00:09.0"
"""
    self.stubbed_mapped_nics = {'nic1': 'eth0', 'nic2': 'eth1',
                                'nic3': 'eth2'}
    backing_nic = objects.Interface(name='nic3')
    port = objects.OvsDpdkPort(name='dpdk0', members=[backing_nic])
    user_bridge = objects.OvsUserBridge('br-link', members=[port])

    def check_bind(ifname, driver, noop):
        self.assertEqual(ifname, 'eth2')
        self.assertEqual(driver, 'vfio-pci')

    self.stub_out('os_net_config.utils.bind_dpdk_interfaces',
                  check_bind)
    self.stub_out('os_net_config.utils.get_stored_pci_address',
                  self.stub_get_stored_pci_address)
    self.provider.add_ovs_dpdk_port(port)
    self.provider.add_ovs_user_bridge(user_bridge)
    self.assertEqual(expected_bridge,
                     self.provider.bridge_data['br-link'])
    self.assertEqual(expected_port, self.get_interface_config('dpdk0'))
def test_network_ovs_dpdk_bridge_and_port_with_mtu_rxqueue(self):
    # Same topology as the plain DPDK port test, but the port is created
    # with mtu=9000 and rx_queue=4; the expected dpdk0 file gains
    # RX_QUEUE/MTU lines and matching OVS_EXTRA clauses.
    expected_bridge = """# This file is autogenerated by os-net-config
DEVICE=br-link
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
DEVICETYPE=ovs
TYPE=OVSUserBridge
"""
    expected_port = """# This file is autogenerated by os-net-config
DEVICE=dpdk0
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
DEVICETYPE=ovs
TYPE=OVSDPDKPort
OVS_BRIDGE=br-link
RX_QUEUE=4
MTU=9000
OVS_EXTRA="set Interface $DEVICE options:dpdk-devargs=0000:00:09.0 \
-- set Interface $DEVICE mtu_request=$MTU \
-- set Interface $DEVICE options:n_rxq=$RX_QUEUE"
"""
    self.stubbed_mapped_nics = {'nic1': 'eth0', 'nic2': 'eth1',
                                'nic3': 'eth2'}
    backing_nic = objects.Interface(name='nic3')
    port = objects.OvsDpdkPort(name='dpdk0', members=[backing_nic],
                               mtu=9000, rx_queue=4)
    user_bridge = objects.OvsUserBridge('br-link', members=[port])

    def check_bind(ifname, driver, noop):
        self.assertEqual(ifname, 'eth2')
        self.assertEqual(driver, 'vfio-pci')

    self.stub_out('os_net_config.utils.bind_dpdk_interfaces',
                  check_bind)
    self.stub_out('os_net_config.utils.get_stored_pci_address',
                  self.stub_get_stored_pci_address)
    self.provider.add_ovs_dpdk_port(port)
    self.provider.add_ovs_user_bridge(user_bridge)
    self.assertEqual(expected_bridge,
                     self.provider.bridge_data['br-link'])
    self.assertEqual(expected_port, self.get_interface_config('dpdk0'))
def test_network_ovs_dpdk_bond(self):
    # Two DPDK ports (nic2 -> eth1, nic3 -> eth2) bonded on an OVS user
    # bridge, without MTU or RX queue settings; only the dpdkbond0 file
    # is checked.
    expected_bond = """# This file is autogenerated by os-net-config
DEVICE=dpdkbond0
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
DEVICETYPE=ovs
TYPE=OVSDPDKBond
OVS_BRIDGE=br-link
BOND_IFACES="dpdk0 dpdk1"
OVS_EXTRA="set Interface dpdk0 options:dpdk-devargs=0000:00:08.0 \
-- set Interface dpdk1 options:dpdk-devargs=0000:00:09.0"
"""
    self.stubbed_mapped_nics = {'nic1': 'eth0', 'nic2': 'eth1',
                                'nic3': 'eth2'}
    nic_a = objects.Interface(name='nic2')
    port_a = objects.OvsDpdkPort(name='dpdk0', members=[nic_a])
    nic_b = objects.Interface(name='nic3')
    port_b = objects.OvsDpdkPort(name='dpdk1', members=[nic_b])
    dpdk_bond = objects.OvsDpdkBond('dpdkbond0', members=[port_a, port_b])
    user_bridge = objects.OvsUserBridge('br-link', members=[dpdk_bond])

    def check_bind(ifname, driver, noop):
        self.assertIn(ifname, ['eth1', 'eth2'])
        self.assertEqual(driver, 'vfio-pci')

    self.stub_out('os_net_config.utils.bind_dpdk_interfaces',
                  check_bind)
    self.stub_out('os_net_config.utils.get_stored_pci_address',
                  self.stub_get_stored_pci_address)
    self.provider.add_ovs_dpdk_bond(dpdk_bond)
    self.provider.add_ovs_user_bridge(user_bridge)
    self.assertEqual(expected_bond,
                     self.get_interface_config('dpdkbond0'))
def test_network_ovs_dpdk_bond_with_mtu(self):
    # DPDK bond created with mtu=9000: the expected file gains an MTU
    # line and an mtu_request clause for each member port.
    expected_bond = """# This file is autogenerated by os-net-config
DEVICE=dpdkbond0
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
DEVICETYPE=ovs
TYPE=OVSDPDKBond
OVS_BRIDGE=br-link
BOND_IFACES="dpdk0 dpdk1"
MTU=9000
OVS_EXTRA="set Interface dpdk0 options:dpdk-devargs=0000:00:08.0 \
-- set Interface dpdk1 options:dpdk-devargs=0000:00:09.0 \
-- set Interface dpdk0 mtu_request=$MTU \
-- set Interface dpdk1 mtu_request=$MTU"
"""
    self.stubbed_mapped_nics = {'nic1': 'eth0', 'nic2': 'eth1',
                                'nic3': 'eth2'}
    nic_a = objects.Interface(name='nic2')
    port_a = objects.OvsDpdkPort(name='dpdk0', members=[nic_a])
    nic_b = objects.Interface(name='nic3')
    port_b = objects.OvsDpdkPort(name='dpdk1', members=[nic_b])
    dpdk_bond = objects.OvsDpdkBond('dpdkbond0', mtu=9000,
                                    members=[port_a, port_b])
    user_bridge = objects.OvsUserBridge('br-link', members=[dpdk_bond])

    def check_bind(ifname, driver, noop):
        self.assertIn(ifname, ['eth1', 'eth2'])
        self.assertEqual(driver, 'vfio-pci')

    self.stub_out('os_net_config.utils.bind_dpdk_interfaces',
                  check_bind)
    self.stub_out('os_net_config.utils.get_stored_pci_address',
                  self.stub_get_stored_pci_address)
    self.provider.add_ovs_dpdk_bond(dpdk_bond)
    self.provider.add_ovs_user_bridge(user_bridge)
    self.assertEqual(expected_bond,
                     self.get_interface_config('dpdkbond0'))
def test_network_ovs_dpdk_bond_with_rx_queue(self):
    # DPDK bond created with rx_queue=4: the expected file gains an
    # RX_QUEUE line and an n_rxq clause for each member port.
    expected_bond = """# This file is autogenerated by os-net-config
DEVICE=dpdkbond0
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
DEVICETYPE=ovs
TYPE=OVSDPDKBond
OVS_BRIDGE=br-link
BOND_IFACES="dpdk0 dpdk1"
RX_QUEUE=4
OVS_EXTRA="set Interface dpdk0 options:dpdk-devargs=0000:00:08.0 \
-- set Interface dpdk1 options:dpdk-devargs=0000:00:09.0 \
-- set Interface dpdk0 options:n_rxq=$RX_QUEUE \
-- set Interface dpdk1 options:n_rxq=$RX_QUEUE"
"""
    self.stubbed_mapped_nics = {'nic1': 'eth0', 'nic2': 'eth1',
                                'nic3': 'eth2'}
    nic_a = objects.Interface(name='nic2')
    port_a = objects.OvsDpdkPort(name='dpdk0', members=[nic_a])
    nic_b = objects.Interface(name='nic3')
    port_b = objects.OvsDpdkPort(name='dpdk1', members=[nic_b])
    dpdk_bond = objects.OvsDpdkBond('dpdkbond0', rx_queue=4,
                                    members=[port_a, port_b])
    user_bridge = objects.OvsUserBridge('br-link', members=[dpdk_bond])

    def check_bind(ifname, driver, noop):
        self.assertIn(ifname, ['eth1', 'eth2'])
        self.assertEqual(driver, 'vfio-pci')

    self.stub_out('os_net_config.utils.bind_dpdk_interfaces',
                  check_bind)
    self.stub_out('os_net_config.utils.get_stored_pci_address',
                  self.stub_get_stored_pci_address)
    self.provider.add_ovs_dpdk_bond(dpdk_bond)
    self.provider.add_ovs_user_bridge(user_bridge)
    self.assertEqual(expected_bond,
                     self.get_interface_config('dpdkbond0'))
def test_network_ovs_dpdk_bond_with_mtu_and_rx_queue(self):
    # DPDK bond created with both rx_queue=4 and mtu=9000: the expected
    # OVS_EXTRA lists the mtu_request clauses before the n_rxq clauses.
    expected_bond = """# This file is autogenerated by os-net-config
DEVICE=dpdkbond0
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
DEVICETYPE=ovs
TYPE=OVSDPDKBond
OVS_BRIDGE=br-link
BOND_IFACES="dpdk0 dpdk1"
RX_QUEUE=4
MTU=9000
OVS_EXTRA="set Interface dpdk0 options:dpdk-devargs=0000:00:08.0 \
-- set Interface dpdk1 options:dpdk-devargs=0000:00:09.0 \
-- set Interface dpdk0 mtu_request=$MTU \
-- set Interface dpdk1 mtu_request=$MTU \
-- set Interface dpdk0 options:n_rxq=$RX_QUEUE \
-- set Interface dpdk1 options:n_rxq=$RX_QUEUE"
"""
    self.stubbed_mapped_nics = {'nic1': 'eth0', 'nic2': 'eth1',
                                'nic3': 'eth2'}
    nic_a = objects.Interface(name='nic2')
    port_a = objects.OvsDpdkPort(name='dpdk0', members=[nic_a])
    nic_b = objects.Interface(name='nic3')
    port_b = objects.OvsDpdkPort(name='dpdk1', members=[nic_b])
    dpdk_bond = objects.OvsDpdkBond('dpdkbond0', rx_queue=4, mtu=9000,
                                    members=[port_a, port_b])
    user_bridge = objects.OvsUserBridge('br-link', members=[dpdk_bond])

    def check_bind(ifname, driver, noop):
        self.assertIn(ifname, ['eth1', 'eth2'])
        self.assertEqual(driver, 'vfio-pci')

    self.stub_out('os_net_config.utils.bind_dpdk_interfaces',
                  check_bind)
    self.stub_out('os_net_config.utils.get_stored_pci_address',
                  self.stub_get_stored_pci_address)
    self.provider.add_ovs_dpdk_bond(dpdk_bond)
    self.provider.add_ovs_user_bridge(user_bridge)
    self.assertEqual(expected_bond,
                     self.get_interface_config('dpdkbond0'))
class TestIfcfgNetConfigApply(base.TestCase):
def setUp(self):
    # Build the fixture for the apply() tests: scratch files standing in
    # for the ifcfg/route/bridge files, recorders for the commands the
    # provider runs, and stubs redirecting impl_ifcfg path helpers and
    # process execution to them.
    super(TestIfcfgNetConfigApply, self).setUp()
    self.temp_ifcfg_file = tempfile.NamedTemporaryFile()
    self.temp_bond_file = tempfile.NamedTemporaryFile()
    self.temp_route_file = tempfile.NamedTemporaryFile()
    self.temp_route6_file = tempfile.NamedTemporaryFile()
    self.temp_bridge_file = tempfile.NamedTemporaryFile()
    # delete=False: closing this file does not unlink it.
    self.temp_cleanup_file = tempfile.NamedTemporaryFile(delete=False)
    # Recorders filled in by the stubs below.
    self.ifup_interface_names = []
    self.ovs_appctl_cmds = []
    self.stop_dhclient_interfaces = []
    self.ip_reconfigure_commands = []

    def fake_ifcfg_path(name):
        # Every interface name maps to the same scratch ifcfg file.
        return self.temp_ifcfg_file.name

    self.stub_out(
        'os_net_config.impl_ifcfg.ifcfg_config_path', fake_ifcfg_path)

    def fake_remove_ifcfg_config(name):
        scratch = self.temp_ifcfg_file.name
        if os.path.exists(scratch):
            os.remove(scratch)

    self.stub_out('os_net_config.impl_ifcfg.remove_ifcfg_config',
                  fake_remove_ifcfg_config)

    def fake_routes_path(name):
        return self.temp_route_file.name

    self.stub_out(
        'os_net_config.impl_ifcfg.route_config_path', fake_routes_path)

    def fake_routes6_path(name):
        return self.temp_route6_file.name

    self.stub_out(
        'os_net_config.impl_ifcfg.route6_config_path', fake_routes6_path)

    def fake_bridge_path(name):
        return self.temp_bridge_file.name

    self.stub_out(
        'os_net_config.impl_ifcfg.bridge_config_path', fake_bridge_path)

    def fake_cleanup_pattern():
        return self.temp_cleanup_file.name

    self.stub_out('os_net_config.impl_ifcfg.cleanup_pattern',
                  fake_cleanup_pattern)

    def record_stop_dhclient(interface):
        self.stop_dhclient_interfaces.append(interface)

    self.stub_out('os_net_config.impl_ifcfg.stop_dhclient_process',
                  record_stop_dhclient)

    def record_execute(*args, **kwargs):
        # Record the commands of interest instead of running anything;
        # any other command is silently ignored.
        program = args[0]
        if program == '/sbin/ifup':
            self.ifup_interface_names.append(args[1])
        elif program == '/bin/ovs-appctl':
            self.ovs_appctl_cmds.append(' '.join(args))
        elif program in ('/sbin/ip', '/usr/sbin/ip'):
            self.ip_reconfigure_commands.append(' '.join(args[1:]))

    self.stub_out('oslo_concurrency.processutils.execute', record_execute)

    def ovs_always_installed():
        return True

    self.stub_out('os_net_config.utils.is_ovs_installed',
                  ovs_always_installed)
    self.provider = impl_ifcfg.IfcfgNetConfig()
def tearDown(self):
    # Release every temporary file created in setUp.
    self.temp_ifcfg_file.close()
    # temp_bond_file is created in setUp as well and must be closed too,
    # otherwise its handle and on-disk file outlive the test.
    self.temp_bond_file.close()
    self.temp_route_file.close()
    self.temp_route6_file.close()
    self.temp_bridge_file.close()
    # The cleanup file is created with delete=False, so close() only
    # releases the handle. Always close it (even when a test already
    # removed the file from disk) and unlink whatever is still there so
    # no stray file is left in the temp directory.
    self.temp_cleanup_file.close()
    if os.path.exists(self.temp_cleanup_file.name):
        os.remove(self.temp_cleanup_file.name)
    super(TestIfcfgNetConfigApply, self).tearDown()
def test_network_apply(self):
    # Apply a static IPv4 interface with three routes (two of them with
    # route_options) and compare the written ifcfg and route files with
    # the module-level expectations.
    default_route = objects.Route('192.168.1.1', default=True,
                                  route_options="metric 10")
    plain_route = objects.Route('192.168.1.1', '172.19.0.0/24')
    metric_route = objects.Route('192.168.1.5', '172.20.0.0/24',
                                 route_options="metric 100")
    address = objects.Address('192.168.1.2/24')
    nic = objects.Interface(
        'em1', addresses=[address],
        routes=[default_route, plain_route, metric_route])
    self.provider.add_interface(nic)
    self.provider.apply()
    written_ifcfg = utils.get_file_data(self.temp_ifcfg_file.name)
    self.assertEqual(_V4_IFCFG, written_ifcfg)
    written_routes = utils.get_file_data(self.temp_route_file.name)
    self.assertEqual(_ROUTES, written_routes)
def test_dhcp_ovs_bridge_network_apply(self):
    # Apply a DHCP OVS bridge with em1 as its only member; the ifcfg and
    # bridge files must match the expectations and the route file must
    # stay empty.
    port_nic = objects.Interface('em1')
    dhcp_bridge = objects.OvsBridge('br-ctlplane', use_dhcp=True,
                                    members=[port_nic])
    self.provider.add_interface(port_nic)
    self.provider.add_bridge(dhcp_bridge)
    self.provider.apply()
    written_ifcfg = utils.get_file_data(self.temp_ifcfg_file.name)
    self.assertEqual(_OVS_INTERFACE, written_ifcfg)
    written_bridge = utils.get_file_data(self.temp_bridge_file.name)
    self.assertEqual(_OVS_BRIDGE_DHCP, written_bridge)
    written_routes = utils.get_file_data(self.temp_route_file.name)
    self.assertEqual("", written_routes)
def test_dhclient_stop_on_iface_activate(self):
    # After apply(), dhclient must have been stopped for the static-IP
    # interface (em1) and the address-less non-DHCP interface (em3), but
    # not for the DHCP interface (em2).
    self.stop_dhclient_interfaces = []
    static_address = objects.Address('192.168.1.2/24')
    static_nic = objects.Interface('em1', addresses=[static_address])
    dhcp_nic = objects.Interface('em2', use_dhcp=True)
    bare_nic = objects.Interface('em3', use_dhcp=False)
    for nic in (static_nic, dhcp_nic, bare_nic):
        self.provider.add_interface(nic)
    self.provider.apply()
    stopped = self.stop_dhclient_interfaces
    self.assertIn('em1', stopped)
    self.assertIn('em3', stopped)
    self.assertNotIn('em2', stopped)
def test_apply_noactivate(self):
    # apply(activate=False) must not invoke ifup for any interface.
    port_nic = objects.Interface('em1')
    dhcp_bridge = objects.OvsBridge('br-ctlplane', use_dhcp=True,
                                    members=[port_nic])
    self.provider.add_interface(port_nic)
    self.provider.add_bridge(dhcp_bridge)
    self.provider.apply(activate=False)
    brought_up = self.ifup_interface_names
    self.assertEqual([], brought_up)
def test_bond_active_slave(self):
    # With em2 flagged primary, applying the OVS bond must issue an
    # ovs-appctl command selecting em2 as the active slave.
    secondary_nic = objects.Interface('em1')
    primary_nic = objects.Interface('em2', primary=True)
    ovs_bond = objects.OvsBond('bond1', use_dhcp=True,
                               members=[secondary_nic, primary_nic])
    self.provider.add_interface(secondary_nic)
    self.provider.add_interface(primary_nic)
    self.provider.add_bond(ovs_bond)
    self.provider.apply()
    expected_cmd = '/bin/ovs-appctl bond/set-active-slave bond1 em2'
    self.assertIn(expected_cmd, self.ovs_appctl_cmds)
def test_bond_active_ordering(self):
    # With no member flagged primary, applying the OVS bond must issue
    # an ovs-appctl command selecting em1 as the active slave.
    first_nic = objects.Interface('em1')
    second_nic = objects.Interface('em2')
    ovs_bond = objects.OvsBond('bond1', use_dhcp=True,
                               members=[first_nic, second_nic])
    self.provider.add_interface(first_nic)
    self.provider.add_interface(second_nic)
    self.provider.add_bond(ovs_bond)
    self.provider.apply()
    expected_cmd = '/bin/ovs-appctl bond/set-active-slave bond1 em1'
    self.assertIn(expected_cmd, self.ovs_appctl_cmds)
def test_reconfigure_and_apply(self):
    # Three consecutive apply() calls on em1. The first brings the
    # interface up with ifup; the second and third change address, MTU
    # and routes and are expected to be applied through the recorded
    # "ip" commands listed below.
    gw1_default = objects.Route('192.168.1.1', default=True)
    gw1_subnet = objects.Route('192.168.1.1', '172.19.0.0/24')
    original_addr = objects.Address('192.168.1.2/24')
    initial_nic = objects.Interface('em1', addresses=[original_addr],
                                    routes=[gw1_default, gw1_subnet])
    self.provider.add_interface(initial_nic)
    self.provider.apply()
    self.assertIn('em1', self.ifup_interface_names)

    # Step 2: new address and jumbo MTU, routes unchanged.
    step2_commands = ['addr add 192.168.0.2/23 dev em1',
                      'addr del 192.168.1.2/24 dev em1',
                      'link set dev em1 mtu 9000']
    moved_addr = objects.Address('192.168.0.2/23')
    jumbo_nic = objects.Interface('em1', addresses=[moved_addr],
                                  routes=[gw1_default, gw1_subnet],
                                  mtu=9000)
    self.provider.add_interface(jumbo_nic)
    self.provider.apply()
    self.assertEqual(step2_commands, self.ip_reconfigure_commands)

    # Step 3: back to the original address and no explicit MTU, with
    # both routes moved to a different gateway.
    self.ip_reconfigure_commands = []
    step3_commands = ['addr add 192.168.1.2/24 dev em1',
                      'addr del 192.168.0.2/23 dev em1',
                      'link set dev em1 mtu 1500',
                      'route add default via 192.168.0.1 dev em1',
                      'route add 172.19.0.0/24 via 192.168.0.1 dev em1']
    gw0_default = objects.Route('192.168.0.1', default=True)
    gw0_subnet = objects.Route('192.168.0.1', '172.19.0.0/24')
    rerouted_nic = objects.Interface('em1', addresses=[original_addr],
                                     routes=[gw0_default, gw0_subnet])
    self.provider.add_interface(rerouted_nic)
    self.provider.apply()
    self.assertEqual(step3_commands, self.ip_reconfigure_commands)
def test_change_restart_required(self):
    # Exercise ifcfg_requires_restart() against an on-disk ifcfg file
    # for several old-config/new-config combinations.
    tmpdir = tempfile.mkdtemp()
    # try/finally guarantees the scratch directory is removed even when
    # one of the assertions below fails.
    try:
        interface = "eth0"
        interface_filename = os.path.join(tmpdir, 'ifcfg-' + interface)
        with open(interface_filename, 'w') as ifcfg_file:
            ifcfg_file.write(_IFCFG_DHCP)
        # Changing a dhcp interface to static should not require restart
        self.assertFalse(
            self.provider.ifcfg_requires_restart(interface_filename,
                                                 _IFCFG_STATIC1))
        # Changing a standard interface to ovs should require restart
        self.assertTrue(
            self.provider.ifcfg_requires_restart(interface_filename,
                                                 _IFCFG_OVS))
        # Changing a static interface to dhcp should require restart
        with open(interface_filename, 'w') as ifcfg_file:
            ifcfg_file.write(_IFCFG_STATIC1)
        self.assertTrue(self.provider.ifcfg_requires_restart(
            interface_filename, _IFCFG_DHCP))
        # Configuring a previously unconfigured interface requires
        # restart
        self.assertTrue(
            self.provider.ifcfg_requires_restart('/doesnotexist',
                                                 _IFCFG_DHCP))
    finally:
        shutil.rmtree(tmpdir)
def test_ifcfg_route_commands(self):
    # Write _IFCFG_ROUTES1 as the current route file and check the
    # commands iproute2_route_commands() produces to move to
    # _IFCFG_ROUTES2.
    tmpdir = tempfile.mkdtemp()
    # The scratch directory was previously never removed; clean it up
    # whether or not the assertion passes.
    try:
        interface = "eth0"
        interface_filename = os.path.join(tmpdir, 'route-' + interface)
        with open(interface_filename, 'w') as route_file:
            route_file.write(_IFCFG_ROUTES1)
        # Changing only the routes should delete and add routes
        command_list1 = ['route del default via 192.0.2.1 dev eth0',
                         'route del 192.0.2.1/24 via 192.0.2.1 dev eth0',
                         'route add default via 192.0.1.1 dev eth0',
                         'route add 192.0.1.1/24 via 192.0.3.1 dev eth1']
        commands = self.provider.iproute2_route_commands(
            interface_filename, _IFCFG_ROUTES2)
        # assertEqual reports the differing elements on failure.
        self.assertEqual(command_list1, commands)
    finally:
        shutil.rmtree(tmpdir)
def test_ifcfg_ipmtu_commands(self):
    # Write _IFCFG_STATIC1 as the current ifcfg file and check the
    # commands iproute2_apply_commands() produces for an IP change, an
    # MTU change, and both together.
    tmpdir = tempfile.mkdtemp()
    # The scratch directory was previously never removed; clean it up
    # whether or not the assertions pass.
    try:
        interface = "eth0"
        interface_filename = os.path.join(tmpdir, 'ifcfg-' + interface)
        with open(interface_filename, 'w') as ifcfg_file:
            ifcfg_file.write(_IFCFG_STATIC1)
        # Changing only the IP should delete and add the IP
        command_list1 = ['addr add 10.0.1.2/23 dev eth0',
                         'addr del 10.0.0.1/24 dev eth0']
        commands = self.provider.iproute2_apply_commands(
            interface, interface_filename, _IFCFG_STATIC2)
        self.assertEqual(command_list1, commands)
        # Changing only the MTU should just set the interface MTU
        command_list2 = ['link set dev eth0 mtu 9000']
        commands = self.provider.iproute2_apply_commands(
            interface, interface_filename, _IFCFG_STATIC1_MTU)
        self.assertEqual(command_list2, commands)
        # Changing both the IP and MTU should delete IP, add IP, and
        # set MTU
        command_list3 = ['addr add 10.0.1.2/23 dev eth0',
                         'addr del 10.0.0.1/24 dev eth0',
                         'link set dev eth0 mtu 9000']
        commands = self.provider.iproute2_apply_commands(
            interface, interface_filename, _IFCFG_STATIC2_MTU)
        self.assertEqual(command_list3, commands)
    finally:
        shutil.rmtree(tmpdir)
def test_restart_children_on_change(self):
    # Changing an OVS bridge must bring its members up again. Covered in
    # turn: a plain interface member, an InfiniBand interface member,
    # and a bond (with two interfaces) as member.

    # setup and apply a bridge
    em1_nic = objects.Interface('em1')
    ctlplane = objects.OvsBridge('br-ctlplane', use_dhcp=True,
                                 members=[em1_nic])
    self.provider.add_interface(em1_nic)
    self.provider.add_bridge(ctlplane)
    self.provider.apply()
    self.assertIn('em1', self.ifup_interface_names)
    self.assertIn('br-ctlplane', self.ifup_interface_names)

    # changing the bridge should restart the interface too
    self.ifup_interface_names = []
    ctlplane = objects.OvsBridge('br-ctlplane', use_dhcp=False,
                                 members=[em1_nic])
    self.provider.add_interface(em1_nic)
    self.provider.add_bridge(ctlplane)
    self.provider.apply()
    self.assertIn('em1', self.ifup_interface_names)

    # test infiniband interfaces act as proper bridge members
    ib_nic = objects.IbInterface('ib0')
    ctlplane = objects.OvsBridge('br-ctlplane', use_dhcp=True,
                                 members=[ib_nic])
    self.provider.add_interface(ib_nic)
    self.provider.add_bridge(ctlplane)
    self.provider.apply()
    self.assertIn('ib0', self.ifup_interface_names)
    self.assertIn('br-ctlplane', self.ifup_interface_names)

    # changing the bridge should restart the interface too
    self.ifup_interface_names = []
    ctlplane = objects.OvsBridge('br-ctlplane', use_dhcp=False,
                                 members=[ib_nic])
    # NOTE(review): this re-adds the em1 interface although the bridge
    # member is ib0 -- possibly a copy/paste slip; kept as-is, confirm
    # whether the InfiniBand interface was intended here.
    self.provider.add_interface(em1_nic)
    self.provider.add_bridge(ctlplane)
    self.provider.apply()
    self.assertIn('ib0', self.ifup_interface_names)

    # setup and apply a bond on a bridge
    self.ifup_interface_names = []
    bond_nic1 = objects.Interface('em1')
    bond_nic2 = objects.Interface('em2')
    ovs_bond = objects.OvsBond('bond0',
                               members=[bond_nic1, bond_nic2])
    ctlplane = objects.OvsBridge('br-ctlplane', use_dhcp=True,
                                 members=[ovs_bond])
    self.provider.add_interface(bond_nic1)
    self.provider.add_interface(bond_nic2)
    self.provider.add_bond(ovs_bond)
    self.provider.add_bridge(ctlplane)
    self.provider.apply()

    # changing the bridge should restart everything
    self.ifup_interface_names = []
    ctlplane = objects.OvsBridge('br-ctlplane', use_dhcp=False,
                                 members=[ovs_bond])
    self.provider.add_interface(bond_nic1)
    self.provider.add_interface(bond_nic2)
    self.provider.add_bond(ovs_bond)
    self.provider.add_bridge(ctlplane)
    self.provider.apply()
    self.assertIn('br-ctlplane', self.ifup_interface_names)
    self.assertIn('bond0', self.ifup_interface_names)
    self.assertIn('em1', self.ifup_interface_names)
    self.assertIn('em2', self.ifup_interface_names)
def test_restart_interface_counts(self):
    # A single apply() must call ifup exactly once per interface.
    first_nic = objects.Interface('em1')
    self.provider.add_interface(first_nic)
    second_nic = objects.Interface('em2')
    self.provider.add_interface(second_nic)
    self.provider.apply()
    em1_ifups = self.ifup_interface_names.count("em1")
    self.assertEqual(1, em1_ifups)
    em2_ifups = self.ifup_interface_names.count("em2")
    self.assertEqual(1, em2_ifups)
def test_vlan_apply(self):
    # Applying VLAN 5 on em1 must write the expected address-less VLAN
    # config to the ifcfg file.
    tagged = objects.Vlan('em1', 5)
    self.provider.add_vlan(tagged)
    self.provider.apply()
    written_ifcfg = utils.get_file_data(self.temp_ifcfg_file.name)
    self.assertEqual(_VLAN_NO_IP, written_ifcfg)
def test_cleanup(self):
    # With nothing configured, apply(cleanup=True) must remove the file
    # matched by the (stubbed) cleanup pattern.
    self.provider.apply(cleanup=True)
    still_present = os.path.exists(self.temp_cleanup_file.name)
    self.assertTrue(not still_present)
def test_cleanup_not_loopback(self):
    # A file whose name ends in "-lo" matches the widened cleanup
    # pattern but must survive apply(cleanup=True).
    loopback_file = '%s-lo' % self.temp_cleanup_file.name
    utils.write_config(loopback_file, 'foo')

    def wildcard_cleanup_pattern():
        return '%s-*' % self.temp_cleanup_file.name

    self.stub_out('os_net_config.impl_ifcfg.cleanup_pattern',
                  wildcard_cleanup_pattern)
    self.provider.apply(cleanup=True)
    survived = os.path.exists(loopback_file)
    self.assertTrue(survived)
    os.remove(loopback_file)
def test_ovs_restart_called(self):
    # In noop mode, applying a DPDK port must route a
    # 'Restart openvswitch' message through NetConfig.execute.
    backing_nic = objects.Interface('em1')
    port = objects.OvsDpdkPort('dpdk0', members=[backing_nic])
    seen_messages = []

    def record_message(*args, **kwargs):
        # args[0] is the provider instance; args[1] is recorded.
        seen_messages.append(args[1])

    self.stub_out('os_net_config.NetConfig.execute', record_message)
    self.provider.noop = True
    self.provider.add_ovs_dpdk_port(port)
    self.provider.apply()
    self.assertIn('Restart openvswitch', seen_messages)
def test_ovs_restart_not_called(self):
    # In noop mode, applying a plain interface must not route a
    # 'Restart openvswitch' message through NetConfig.execute.
    plain_nic = objects.Interface('em1')
    seen_messages = []

    def record_message(*args, **kwargs):
        # args[0] is the provider instance; args[1] is recorded.
        seen_messages.append(args[1])

    self.stub_out('os_net_config.NetConfig.execute', record_message)
    self.provider.noop = True
    self.provider.add_interface(plain_nic)
    self.provider.apply()
    self.assertNotIn('Restart openvswitch', seen_messages)
def _failed_execute(*args, **kwargs):
    # Replacement for processutils.execute that fails every command,
    # unless the caller passed a falsy check_exit_code (then it does
    # nothing). Accessed as self._failed_execute, so the instance
    # arrives as args[0].
    if not kwargs.get('check_exit_code', True):
        return
    raise processutils.ProcessExecutionError('Test stderr',
                                             'Test stdout',
                                             str(kwargs))
def test_interface_failure(self):
    # With every executed command failing, apply() must raise
    # ConfigurationError and record exactly one error for the single
    # configured interface.
    self.stub_out('oslo_concurrency.processutils.execute',
                  self._failed_execute)
    address = objects.Address('192.168.1.2/24')
    nic = objects.Interface('em1', addresses=[address])
    self.provider.add_interface(nic)
    self.assertRaises(os_net_config.ConfigurationError,
                      self.provider.apply)
    error_count = len(self.provider.errors)
    self.assertEqual(1, error_count)
def test_interface_failure_multiple(self):
    # With every executed command failing and two interfaces configured,
    # apply() must raise ConfigurationError and record one error per
    # interface.
    self.stub_out('oslo_concurrency.processutils.execute',
                  self._failed_execute)
    em1_address = objects.Address('192.168.1.2/24')
    em1_nic = objects.Interface('em1', addresses=[em1_address])
    em2_address = objects.Address('192.168.2.2/24')
    em2_nic = objects.Interface('em2', addresses=[em2_address])
    self.provider.add_interface(em1_nic)
    self.provider.add_interface(em2_nic)
    self.assertRaises(os_net_config.ConfigurationError,
                      self.provider.apply)
    # Even though the first one failed, we should have attempted both
    error_count = len(self.provider.errors)
    self.assertEqual(2, error_count)