# All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. import netaddr from neutron_lib import constants from oslo_log import log from paramiko import ssh_exception as ssh_exc from tempest.common import utils from tempest.lib.common.utils import data_utils from tempest.lib import decorators from tempest.lib import exceptions as lib_exc import testtools from neutron_tempest_plugin.common import ssh from neutron_tempest_plugin.common import utils as neutron_utils from neutron_tempest_plugin import config from neutron_tempest_plugin.scenario import base CONF = config.CONF LOG = log.getLogger(__name__) class DHCPTest(base.BaseTempestTestCase): credentials = ['primary', 'admin'] force_tenant_isolation = False @classmethod def resource_setup(cls): super(DHCPTest, cls).resource_setup() cls.rand_name = data_utils.rand_name( cls.__name__.rsplit('.', 1)[-1]) cls.network = cls.create_network(name=cls.rand_name) cls.subnet = cls.create_subnet( network=cls.network, name=cls.rand_name) cls.router = cls.create_router_by_client() cls.create_router_interface(cls.router['id'], cls.subnet['id']) cls.keypair = cls.create_keypair(name=cls.rand_name) cls.security_group = cls.create_security_group(name=cls.rand_name) cls.create_loginable_secgroup_rule(cls.security_group['id']) @utils.requires_ext(extension='extra_dhcp_opt', service='network') @decorators.idempotent_id('58f7c094-1980-4e03-b0d3-6c4dd27217b1') def test_extra_dhcp_opts(self): """This test case tests DHCP extra options configured for Neutron port. Test is checking just extra option "15" which is domain-name according to the RFC 2132: https://tools.ietf.org/html/rfc2132#section-5.3 To test that option, there is spawned VM connected to the port with configured extra_dhcp_opts and test asserts that search domain name is configured inside VM in /etc/resolv.conf file """ test_domain = "test.domain" extra_dhcp_opts = [ {'opt_name': 'domain-name', 'opt_value': '"%s"' % test_domain}] port = self.create_port( network=self.network, name=self.rand_name, security_groups=[self.security_group['id']], extra_dhcp_opts=extra_dhcp_opts) floating_ip = self.create_floatingip(port=port) server = self.create_server( flavor_ref=CONF.compute.flavor_ref, image_ref=CONF.compute.image_ref, key_name=self.keypair['name'], networks=[{'port': port['id']}]) self.wait_for_server_active(server['server']) self.wait_for_guest_os_ready(server['server']) try: ssh_client = ssh.Client( floating_ip['floating_ip_address'], CONF.validation.image_ssh_user, pkey=self.keypair['private_key']) vm_resolv_conf = ssh_client.exec_command( "cat /etc/resolv.conf") self.assertIn(test_domain, vm_resolv_conf) except (lib_exc.SSHTimeout, ssh_exc.AuthenticationException, AssertionError) as error: LOG.debug(error) self._log_console_output([server]) self._log_local_network_status() raise class DHCPPortUpdateTest(base.BaseTempestTestCase): credentials = ['primary', 'admin'] @classmethod def resource_setup(cls): super(DHCPPortUpdateTest, cls).resource_setup() cls.rand_name = data_utils.rand_name( cls.__name__.rsplit('.', 1)[-1]) cls.network = cls.create_network(name=cls.rand_name) cls.router = cls.create_router_by_client() cls.keypair = cls.create_keypair(name=cls.rand_name) cls.security_group = cls.create_security_group(name=cls.rand_name) cls.create_loginable_secgroup_rule(cls.security_group['id']) cls.create_pingable_secgroup_rule(cls.security_group['id']) @testtools.skipUnless( CONF.neutron_plugin_options.firewall_driver == 'ovn', "OVN driver is required to run this test - " "LP#1942794 solution only applied to OVN") @decorators.idempotent_id('8171cc68-9dbb-46ca-b065-17b5b2e26094') def test_modify_dhcp_port_ip_address(self): """Test Scenario 1) Create a network and a subnet with DHCP enabled 2) Modify the default IP address from the subnet DHCP port 3) Create a server in this network and check ssh connectivity For the step 3), the server needs to obtain ssh keys from the metadata Related bug: LP#1942794 """ # create subnet (dhcp is enabled by default) subnet = self.create_subnet(network=self.network, name=self.rand_name) def _get_dhcp_ports(): # in some cases, like ML2/OVS, the subnet port associated to DHCP # is created with device_owner='network:dhcp' dhcp_ports = self.client.list_ports( network_id=self.network['id'], device_owner=constants.DEVICE_OWNER_DHCP)['ports'] # in other cases, like ML2/OVN, the subnet port used for metadata # is created with device_owner='network:distributed' distributed_ports = self.client.list_ports( network_id=self.network['id'], device_owner=constants.DEVICE_OWNER_DISTRIBUTED)['ports'] self.dhcp_ports = dhcp_ports + distributed_ports self.assertLessEqual( len(self.dhcp_ports), 1, msg='Only one port was expected') return len(self.dhcp_ports) == 1 # obtain the dhcp port # in some cases this port is not created together with the subnet, but # immediately after it, so some delay may be needed and that is the # reason why a waiter function is used here self.dhcp_ports = [] neutron_utils.wait_until_true( lambda: _get_dhcp_ports(), timeout=10) dhcp_port = self.dhcp_ports[0] # modify DHCP port IP address old_dhcp_port_ip = netaddr.IPAddress( dhcp_port['fixed_ips'][0]['ip_address']) if str(old_dhcp_port_ip) != subnet['allocation_pools'][0]['end']: new_dhcp_port_ip = str(old_dhcp_port_ip + 1) else: new_dhcp_port_ip = str(old_dhcp_port_ip - 1) self.update_port(port=dhcp_port, fixed_ips=[{'subnet_id': subnet['id'], 'ip_address': new_dhcp_port_ip}]) # create server server = self.create_server( flavor_ref=CONF.compute.flavor_ref, image_ref=CONF.compute.image_ref, key_name=self.keypair['name'], security_groups=[{'name': self.security_group['name']}], networks=[{'uuid': self.network['id']}]) # attach fip to the server self.create_router_interface(self.router['id'], subnet['id']) server_port = self.client.list_ports( network_id=self.network['id'], device_id=server['server']['id'])['ports'][0] fip = self.create_floatingip(port_id=server_port['id']) # check connectivity self.check_connectivity(fip['floating_ip_address'], CONF.validation.image_ssh_user, self.keypair['private_key'])