# Copyright 2023 Red Hat, Inc.
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

from neutron_tempest_plugin.common import ssh
from neutron_tempest_plugin import config
from tempest.lib import decorators

from whitebox_neutron_tempest_plugin.tests.scenario import base

CONF = config.CONF


class PortSecurityTest(base.BaseTempestWhiteboxTestCase):
    """Scenario tests for networks created with port security disabled."""

    credentials = ['admin', 'primary']
    required_extensions = ['port-security']

    def _create_server(self, ports, scheduler_hints=None):
        """Boot a server on the given ports and prepare ssh access to it.

        A floating IP is created for the first port in ``ports`` and an ssh
        client targeting that floating IP is built using ``self.keypair``,
        which therefore has to be set before calling this helper.

        :param ports: non-empty list of port dicts; every port is attached
                      to the server, the first one receives the floating IP.
        :param scheduler_hints: optional dict passed to the server create
                                request as ``scheduler_hints``.
        :returns: dict with keys ``server`` (result of ``create_server``),
                  ``client`` (ssh client) and ``fip`` (floating IP dict).
        """
        params = {
            'flavor_ref': CONF.compute.flavor_ref,
            'image_ref': CONF.compute.image_ref,
            'key_name': self.keypair['name'],
            'networks': [{'port': port['id']} for port in ports]}
        if scheduler_hints:
            params['scheduler_hints'] = scheduler_hints
        server = self.create_server(**params)
        fip = self.create_floatingip(port=ports[0])
        client = ssh.Client(fip['floating_ip_address'],
                            CONF.validation.image_ssh_user,
                            pkey=self.keypair['private_key'])
        return {'server': server, 'client': client, 'fip': fip}

    @decorators.idempotent_id('f9523922-9e71-4fa1-9cca-303bedf230bc')
    def test_port_security_disabled(self):
        """Verify that traffic is not blocked when port security is disabled

        Scenario:
        1. Create keypair and router.
        2. Create 2 internal networks connected to the router, one is for
           management and the other one is for testing.
        3. Create 2 ports in each network, 2 ports for 2 servers.
           Ports in the test_network will have specific static ip addresses.
        4. Create 2 servers with 2 ports each.
        5. Make sure that interfaces connected to the test network are up
           and running with corresponding ip address.
        6. Test that one server (src_server) can reach the other (dst_server)
           by the ip address configured on the interface connected to the
           test network.
        7. On dst_server change MAC and ip address of the interface connected
           to the test network.
        8. Verify that src_server is still able to access dst_server through
           the port connected to the test_network even when MAC and
           ip address do not match the ones configured for the port.
        """
        self.keypair = self.create_keypair()
        router = self.create_router_by_client()
        mgmt_network = self.create_network(port_security_enabled=False)
        test_network = self.create_network(port_security_enabled=False)
        mgmt_subnet = self.create_subnet(
            network=mgmt_network, cidr="192.168.0.0/24")
        self.create_router_interface(router['id'], mgmt_subnet['id'])
        # test_subnet should have no gateway, otherwise, VM routes with two
        # ports will be messed up
        test_subnet = self.create_subnet(
            network=test_network, cidr="192.168.1.0/24", gateway=None)
        ips = {'src_server': '192.168.1.11',
               'dst_server': '192.168.1.12',
               'dst_server_alt': '192.168.1.13'}
        src_server_mgmt_port = self.create_port(mgmt_network)
        dst_server_mgmt_port = self.create_port(mgmt_network)
        src_server_test_port = self.create_port(
            test_network,
            fixed_ips=[{"subnet_id": test_subnet['id'],
                        "ip_address": ips['src_server']}])
        dst_server_test_port = self.create_port(
            test_network,
            fixed_ips=[{"subnet_id": test_subnet['id'],
                        "ip_address": ips['dst_server']}])
        src_server = self._create_server(
            ports=[src_server_mgmt_port, src_server_test_port])
        dst_server = self._create_server(
            ports=[dst_server_mgmt_port, dst_server_test_port])
        servers = {'src_server': src_server, 'dst_server': dst_server}

        # As both servers use same image we can get interface from any of them
        shell_path = "PATH=$PATH:/sbin"
        test_interface = dst_server['client'].exec_command(
            "{};ip addr | grep {} -B 1 | head -1 | "
            r"cut -d ':' -f 2 | sed 's/\ //g'".format(
                shell_path, dst_server_test_port['mac_address'])).rstrip()
        # An empty name means the MAC was not found inside the guest; fail
        # here with a clear message instead of running the commands below
        # with a blank interface name.
        self.assertTrue(
            test_interface,
            "No guest interface found with MAC address %s" %
            dst_server_test_port['mac_address'])

        # In case second interface is down we need to switch it on
        # and configure ip address
        for name, server in servers.items():
            if CONF.neutron_plugin_options.default_image_is_advanced:
                cmd = ("ip addr show {interface} | grep {ip} || "
                       "sudo dhclient {interface}").format(
                           ip=ips[name], interface=test_interface)
            else:
                cmd = ("cat /sys/class/net/{interface}/operstate | "
                       "grep -q -v down && true || "
                       "({path}; sudo ip link set {interface} up && "
                       "sudo ip addr add {ip}/24 dev {interface})").format(
                           path=shell_path, ip=ips[name],
                           interface=test_interface)
            server['client'].exec_command(cmd)

        self.check_remote_connectivity(src_server['client'],
                                       ips['dst_server'])

        # Now configure another MAC and IP address on the dst_server
        # test interface and check connectivity again (in both directions)
        dst_server['client'].exec_command(
            "{path}; sudo ip addr del {ip}/24 dev {interface} && "
            "sudo ip addr add {ip_alt}/24 dev {interface} && "
            "sudo ip link set {interface} address 02:02:02:03:03:03".format(
                path=shell_path, interface=test_interface,
                ip=ips['dst_server'], ip_alt=ips['dst_server_alt']))
        self.check_remote_connectivity(src_server['client'],
                                       ips['dst_server_alt'])
        self.check_remote_connectivity(dst_server['client'],
                                       ips['src_server'])

    @decorators.idempotent_id('b99a014c-45e4-438d-8827-1063b44d611c')
    def test_two_vms_same_compute_when_no_port_security(self):
        """Check access to two VMs on one host without port security

        Scenario:
        1. Create keypair, router and a network with port security
           disabled; connect the network's subnet to the router.
        2. Create one port per VM in that network.
        3. Boot the first VM, then boot the second one with the
           ``same_host`` scheduler hint pointing at the first VM.
        4. Skip the test if the two VMs are reported on different hosts.
        5. Run the connectivity check against the floating IP of each VM.
        """
        self.keypair = self.create_keypair()
        router = self.create_router_by_client()
        test_network = self.create_network(port_security_enabled=False)
        test_subnet = self.create_subnet(network=test_network)
        self.create_router_interface(router['id'], test_subnet['id'])
        vm1_port = self.create_port(test_network)
        vm2_port = self.create_port(test_network)

        vm1 = self._create_server(ports=[vm1_port])
        vm1_id = vm1['server']['server']['id']
        vm2 = self._create_server(
            ports=[vm2_port],
            scheduler_hints={'same_host': vm1_id})
        vm2_id = vm2['server']['server']['id']

        # The scenario is only meaningful when both VMs share a host, so
        # skip when the reported hosts differ.
        if (self.get_host_for_server(vm1_id) !=
                self.get_host_for_server(vm2_id)):
            raise self.skipException("VMs are running on different hosts")

        for vm in (vm1, vm2):
            self.check_connectivity(
                host=vm['fip']['floating_ip_address'],
                ssh_user=vm['client'].username,
                ssh_key=vm['client'].pkey)