Add test to verify DHCP port IP address modification

When the IP address from a subnet DHCP port is changed, the route to the
metadata provisioned for that port should be modified accordingly
In case this route is not correctly updated, the server will not be
able to obtain metadata information - hence, it will not get the
ssh authorized keys

This new test only applies to OVN

Related-Bug: #1942794

Change-Id: I76e75db469e2518ed90561430aa9c8c68846dae5
This commit is contained in:
Eduardo Olivares 2022-02-04 10:22:42 +01:00
parent 77052674d9
commit 32a7fbeb0f

View File

@ -11,14 +11,18 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import netaddr
from neutron_lib import constants
from oslo_log import log
from paramiko import ssh_exception as ssh_exc
from tempest.common import utils
from tempest.lib.common.utils import data_utils
from tempest.lib import decorators
from tempest.lib import exceptions as lib_exc
import testtools
from neutron_tempest_plugin.common import ssh
from neutron_tempest_plugin.common import utils as neutron_utils
from neutron_tempest_plugin import config
from neutron_tempest_plugin.scenario import base
@ -92,3 +96,96 @@ class DHCPTest(base.BaseTempestTestCase):
self._log_console_output([server])
self._log_local_network_status()
raise
class DHCPPortUpdateTest(base.BaseTempestTestCase):
credentials = ['primary', 'admin']
@classmethod
def resource_setup(cls):
super(DHCPPortUpdateTest, cls).resource_setup()
cls.rand_name = data_utils.rand_name(
cls.__name__.rsplit('.', 1)[-1])
cls.network = cls.create_network(name=cls.rand_name)
cls.router = cls.create_router_by_client()
cls.keypair = cls.create_keypair(name=cls.rand_name)
cls.security_group = cls.create_security_group(name=cls.rand_name)
cls.create_loginable_secgroup_rule(cls.security_group['id'])
cls.create_pingable_secgroup_rule(cls.security_group['id'])
@testtools.skipUnless(
CONF.neutron_plugin_options.firewall_driver == 'ovn',
"OVN driver is required to run this test - "
"LP#1942794 solution only applied to OVN")
@decorators.idempotent_id('8171cc68-9dbb-46ca-b065-17b5b2e26094')
def test_modify_dhcp_port_ip_address(self):
"""Test Scenario
1) Create a network and a subnet with DHCP enabled
2) Modify the default IP address from the subnet DHCP port
3) Create a server in this network and check ssh connectivity
For the step 3), the server needs to obtain ssh keys from the metadata
Related bug: LP#1942794
"""
# create subnet (dhcp is enabled by default)
subnet = self.create_subnet(network=self.network, name=self.rand_name)
def _get_dhcp_ports():
# in some cases, like ML2/OVS, the subnet port associated to DHCP
# is created with device_owner='network:dhcp'
dhcp_ports = self.client.list_ports(
network_id=self.network['id'],
device_owner=constants.DEVICE_OWNER_DHCP)['ports']
# in other cases, like ML2/OVN, the subnet port used for metadata
# is created with device_owner='network:distributed'
distributed_ports = self.client.list_ports(
network_id=self.network['id'],
device_owner=constants.DEVICE_OWNER_DISTRIBUTED)['ports']
self.dhcp_ports = dhcp_ports + distributed_ports
self.assertLessEqual(
len(self.dhcp_ports), 1, msg='Only one port was expected')
return len(self.dhcp_ports) == 1
# obtain the dhcp port
# in some cases this port is not created together with the subnet, but
# immediately after it, so some delay may be needed and that is the
# reason why a waiter function is used here
self.dhcp_ports = []
neutron_utils.wait_until_true(
lambda: _get_dhcp_ports(),
timeout=10)
dhcp_port = self.dhcp_ports[0]
# modify DHCP port IP address
old_dhcp_port_ip = netaddr.IPAddress(
dhcp_port['fixed_ips'][0]['ip_address'])
if str(old_dhcp_port_ip) != subnet['allocation_pools'][0]['end']:
new_dhcp_port_ip = str(old_dhcp_port_ip + 1)
else:
new_dhcp_port_ip = str(old_dhcp_port_ip - 1)
self.update_port(port=dhcp_port,
fixed_ips=[{'subnet_id': subnet['id'],
'ip_address': new_dhcp_port_ip}])
# create server
server = self.create_server(
flavor_ref=CONF.compute.flavor_ref,
image_ref=CONF.compute.image_ref,
key_name=self.keypair['name'],
security_groups=[{'name': self.security_group['name']}],
networks=[{'uuid': self.network['id']}])
# attach fip to the server
self.create_router_interface(self.router['id'], subnet['id'])
server_port = self.client.list_ports(
network_id=self.network['id'],
device_id=server['server']['id'])['ports'][0]
fip = self.create_floatingip(port_id=server_port['id'])
# check connectivity
self.check_connectivity(fip['floating_ip_address'],
CONF.validation.image_ssh_user,
self.keypair['private_key'])