neutron/neutron/tests/functional/agent/linux/test_bridge_lib.py

197 lines
8.0 KiB
Python

# Copyright (c) 2015 Thales Services SAS
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import re
import netaddr
from neutron_lib import constants
from oslo_utils import uuidutils
import testscenarios
from neutron.agent.linux import bridge_lib
from neutron.agent.linux import ip_lib
from neutron.privileged.agent.linux import ip_lib as priv_ip_lib
from neutron.tests.common import net_helpers
from neutron.tests.functional import base
class BridgeLibTestCase(base.BaseSudoTestCase):
    """Functional checks of ``bridge_lib`` helpers and ``BridgeDevice``.

    ``setUp`` installs a Linux bridge fixture and a bridge-port fixture
    attached to it; the tests then query them through ``bridge_lib``.
    """

    def setUp(self):
        super(BridgeLibTestCase, self).setUp()
        fixtures = self.create_bridge_port_fixture()
        self.bridge, self.port_fixture = fixtures

    def create_bridge_port_fixture(self):
        """Create a bridge plus one port fixture plugged into it.

        Both fixtures are registered through ``useFixture``.

        :returns: tuple ``(bridge, port_fixture)``
        """
        bridge_fixture = self.useFixture(
            net_helpers.LinuxBridgeFixture(namespace=None))
        bridge = bridge_fixture.bridge
        port_id = uuidutils.generate_uuid()
        port_fixture = self.useFixture(
            net_helpers.LinuxBridgePortFixture(bridge, port_id=port_id))
        return bridge, port_fixture

    @staticmethod
    def _read_file(path):
        """Return the full text content of ``path``."""
        with open(path, 'r') as handle:
            return handle.read()

    # The bridge-side port of the fixture is reported as bridged.
    def test_is_bridged_interface(self):
        bridged_name = self.port_fixture.br_port.name
        self.assertTrue(bridge_lib.is_bridged_interface(bridged_name))

    # The other port of the fixture is not reported as bridged.
    def test_is_not_bridged_interface(self):
        plain_name = self.port_fixture.port.name
        self.assertFalse(bridge_lib.is_bridged_interface(plain_name))

    def test_get_bridge_names(self):
        known_bridges = bridge_lib.get_bridge_names()
        self.assertIn(self.bridge.name, known_bridges)

    # Read the ifindex, recreate the veth pair, read it again: both values
    # must exist and the second must not be lower than the first.
    def test_get_interface_ifindex(self):
        br_port = self.port_fixture.br_port
        index_before = bridge_lib.get_interface_ifindex(str(br_port))
        veth = self.port_fixture.veth_fixture
        veth.destroy()
        veth._setUp()
        index_after = bridge_lib.get_interface_ifindex(str(br_port))
        self.assertIsNotNone(index_before)
        self.assertIsNotNone(index_after)
        self.assertGreaterEqual(index_after, index_before)

    def test_get_interface_bridge(self):
        bridged_name = self.port_fixture.br_port.name
        found = bridge_lib.BridgeDevice.get_interface_bridge(bridged_name)
        self.assertEqual(self.bridge.name, found.name)

    def test_get_interface_no_bridge(self):
        plain_name = self.port_fixture.port.name
        found = bridge_lib.BridgeDevice.get_interface_bridge(plain_name)
        self.assertIsNone(found)

    def test_get_interfaces(self):
        expected = [self.port_fixture.br_port.name]
        self.assertEqual(expected, self.bridge.get_interfaces())

    # A BridgeDevice built for a made-up name lists no interfaces.
    def test_get_interfaces_no_bridge(self):
        missing_bridge = bridge_lib.BridgeDevice('--fake--')
        self.assertEqual([], missing_bridge.get_interfaces())

    def test_disable_ipv6(self):
        sysfs_path = ("/proc/sys/net/ipv6/conf/%s/disable_ipv6" %
                      self.bridge.name)

        # IPv6 must be enabled on the bridge before the call ...
        self.assertEqual("0\n", self._read_file(sysfs_path))

        self.assertEqual(0, self.bridge.disable_ipv6())

        # ... and reported as disabled afterwards.
        self.assertEqual("1\n", self._read_file(sysfs_path))
class FdbInterfaceTestCase(testscenarios.WithScenarios, base.BaseSudoTestCase):
    """Functional tests for ``bridge_lib.FdbInterface``.

    Every test runs once per entry in ``scenarios``: with a dedicated
    network namespace and with ``namespace=None``. ``setUp`` creates a dummy
    device and a VXLAN device bound to it, brings both up and assigns each
    an address; ``_cleanup`` removes them again.
    """

    MAC1 = 'ca:fe:ca:fe:ca:fe'
    MAC2 = 'ca:fe:ca:fe:ca:01'
    # Applied line by line (re.MULTILINE) to the text returned by
    # FdbInterface.show(): a MAC at the start of the line, optionally
    # followed by "dst <IPv4 address>".
    RULE_PATTERN = (r"^(?P<mac>([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2})) "
                    r"(dst (?P<ip>\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b))*")

    # NOTE: the namespace name is generated once, when the class body is
    # evaluated, so every test of the 'namespace' scenario uses the same name.
    scenarios = [
        ('namespace', {'namespace': 'ns_' + uuidutils.generate_uuid()}),
        ('no_namespace', {'namespace': None})
    ]

    def setUp(self):
        super(FdbInterfaceTestCase, self).setUp()
        _uuid = uuidutils.generate_uuid()
        # Device names are cut to the maximum length allowed for a device.
        self.device = ('int_' + _uuid)[:constants.DEVICE_NAME_MAX_LEN]
        self.device_vxlan = ('vxlan_' + _uuid)[:constants.DEVICE_NAME_MAX_LEN]
        self.ip = '10.220.0.1/24'
        self.ip_vxlan = '10.221.0.1/24'
        if self.namespace:
            priv_ip_lib.create_netns(self.namespace)
        else:
            # Without a namespace, first delete any device already using
            # these names.
            self._cleanup()
        self.addCleanup(self._cleanup)

        ip_wrapper = ip_lib.IPWrapper(self.namespace)
        ip_wrapper.add_dummy(self.device)
        # 100 is presumably the VXLAN network identifier -- TODO confirm
        # against ip_lib.IPWrapper.add_vxlan.
        ip_wrapper.add_vxlan(self.device_vxlan, 100, dev=self.device)

        ip_device = ip_lib.IPDevice(self.device, self.namespace)
        ip_device.link.set_up()
        ip_device.addr.add(self.ip)

        ip_device_vxlan = ip_lib.IPDevice(self.device_vxlan, self.namespace)
        ip_device_vxlan.link.set_up()
        ip_device_vxlan.addr.add(self.ip_vxlan)

    def _cleanup(self):
        """Remove what ``setUp`` created.

        With a namespace the whole namespace is removed; otherwise both test
        devices are deleted, ignoring those that do not exist.
        """
        if self.namespace:
            priv_ip_lib.remove_netns(self.namespace)
            return
        for device in (self.device_vxlan, self.device):
            try:
                priv_ip_lib.delete_interface(device, None)
            except priv_ip_lib.NetworkInterfaceNotFound:
                # Best effort: the device may never have been created.
                pass

    def _list_fdb_rules(self, device):
        """Return the FDB entries of ``device`` as ``{mac: ip_or_None}``.

        The value is the address following ``dst`` on the matched line, or
        ``None`` when the line has no ``dst`` part. If a MAC is matched on
        several lines, the last match wins.
        """
        output = bridge_lib.FdbInterface.show(dev=device,
                                              namespace=self.namespace)
        return {
            match.group('mac'): match.group('ip')
            for match in re.finditer(self.RULE_PATTERN, output,
                                     flags=re.MULTILINE)
        }

    def test_add_delete(self):
        self.assertNotIn(self.MAC1, self._list_fdb_rules(self.device))
        bridge_lib.FdbInterface.add(self.MAC1, self.device,
                                    namespace=self.namespace)
        self.assertIn(self.MAC1, self._list_fdb_rules(self.device))
        bridge_lib.FdbInterface.delete(self.MAC1, self.device,
                                       namespace=self.namespace)
        self.assertNotIn(self.MAC1, self._list_fdb_rules(self.device))

    def test_add_delete_dst(self):
        ip_dst = str(netaddr.IPNetwork(self.ip).ip)
        self.assertNotIn(self.MAC1, self._list_fdb_rules(self.device_vxlan))
        bridge_lib.FdbInterface.add(
            self.MAC1, self.device_vxlan, namespace=self.namespace,
            ip_dst=ip_dst)
        rules = self._list_fdb_rules(self.device_vxlan)
        self.assertEqual(ip_dst, rules[self.MAC1])
        bridge_lib.FdbInterface.delete(
            self.MAC1, self.device_vxlan, namespace=self.namespace,
            ip_dst=ip_dst)
        self.assertNotIn(self.MAC1, self._list_fdb_rules(self.device_vxlan))

    def test_append(self):
        self.assertNotIn(self.MAC1, self._list_fdb_rules(self.device))
        bridge_lib.FdbInterface.append(self.MAC1, self.device,
                                       namespace=self.namespace)
        self.assertIn(self.MAC1, self._list_fdb_rules(self.device))

    def test_append_dst(self):
        ip_dst = str(netaddr.IPNetwork(self.ip).ip)
        self.assertNotIn(self.MAC1, self._list_fdb_rules(self.device_vxlan))
        bridge_lib.FdbInterface.append(
            self.MAC1, self.device_vxlan, namespace=self.namespace,
            ip_dst=ip_dst)
        rules = self._list_fdb_rules(self.device_vxlan)
        self.assertEqual(ip_dst, rules[self.MAC1])

    def test_replace(self):
        ip_dst = str(netaddr.IPNetwork(self.ip).ip)
        # The precondition must look at the VXLAN device, since that is the
        # device every following step of this test operates on.
        self.assertNotIn(self.MAC1, self._list_fdb_rules(self.device_vxlan))
        bridge_lib.FdbInterface.add(
            self.MAC1, self.device_vxlan, namespace=self.namespace,
            ip_dst=ip_dst)
        rules = self._list_fdb_rules(self.device_vxlan)
        self.assertEqual(ip_dst, rules[self.MAC1])
        bridge_lib.FdbInterface.replace(
            self.MAC1, self.device_vxlan, namespace=self.namespace,
            ip_dst='1.1.1.1')
        rules = self._list_fdb_rules(self.device_vxlan)
        self.assertEqual('1.1.1.1', rules[self.MAC1])