323 lines
13 KiB
Python
323 lines
13 KiB
Python
# Copyright (c) 2019 Red Hat
|
|
# All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
from __future__ import absolute_import
|
|
|
|
from oslo_log import log
|
|
import testtools
|
|
|
|
import tobiko
|
|
from tobiko import config
|
|
from tobiko.shell import files
|
|
from tobiko.shell import ping
|
|
from tobiko.shell import sh
|
|
from tobiko.openstack import neutron
|
|
from tobiko.openstack import stacks
|
|
from tobiko.openstack import topology
|
|
|
|
|
|
# Module-level shortcut to the tobiko configuration object (config.CONF)
CONF = config.CONF
|
|
# Logger for this module, obtained from oslo_log and named after the module
LOG = log.getLogger(__name__)
|
|
|
|
|
|
class FloatingIPTest(testtools.TestCase):
    """Connectivity and port attribute checks for a floating IP server.

    Subclasses in this module override ``stack`` to run the same checks
    against differently configured server stacks.
    """

    #: Heat stack providing the Nova server and its floating IP address
    stack = tobiko.required_setup_fixture(stacks.CirrosServerStackFixture)

    def test_stack_create_complete(self):
        """Wait for the key pair, network and server stacks creation"""
        key_pair_stack = self.stack.key_pair_stack
        key_pair_stack.wait_for_create_complete()

        network_stack = self.stack.network_stack
        network_stack.wait_for_create_complete()

        self.stack.wait_for_create_complete()

    def test_ssh(self):
        """Check the hostname read via SSH matches the server name"""
        observed_hostname = sh.get_hostname(ssh_client=self.stack.ssh_client)
        expected_hostname = self.stack.server_name.lower()
        self.assertEqual(expected_hostname, observed_hostname)

    def test_ping(self):
        """Ping the floating IP address until a reply is received"""
        result = ping.ping_until_received(self.stack.floating_ip_address)
        result.assert_replied()

    # --- port-security extension ---------------------------------------------

    @neutron.skip_if_missing_networking_extensions('port-security')
    def test_port_security_enabled_port_attribute(self):
        """Compare requested and reported port security enabled values"""
        expected = self.expected_port_security_enabled
        observed = self.observed_port_security_enabled
        self.assertEqual(expected, observed)

    @property
    def expected_port_security_enabled(self):
        """Port security enabled value configured on the stack fixture"""
        return self.stack.port_security_enabled

    @property
    def observed_port_security_enabled(self):
        """Port security enabled value read from the stack outputs"""
        return self.stack.outputs.port_security_enabled

    # --- security-group extension --------------------------------------------

    @neutron.skip_if_missing_networking_extensions('security-group')
    def test_security_groups_port_attribute(self):
        """Compare requested and reported port security groups"""
        expected = self.expected_security_groups
        observed = self.observed_security_groups
        self.assertEqual(expected, observed)

    @property
    def expected_security_groups(self):
        """Set of security groups configured on the stack fixture"""
        configured_groups = self.stack.security_groups
        return set(configured_groups)

    @property
    def observed_security_groups(self):
        """Set of security groups read from the stack outputs"""
        reported_groups = self.stack.outputs.security_groups
        return set(reported_groups)

    # --- net-mtu and net-mtu-writable extensions -----------------------------

    @ping.skip_if_missing_fragment_ping_option
    @neutron.skip_if_missing_networking_extensions('net-mtu')
    def test_ping_with_net_mtu(self):
        """Ping the floating IP with MTU-sized and over-sized packets"""
        # Packets as big as the MTU must eventually get a reply when
        # fragmentation is disabled
        max_sized = ping.ping(self.stack.floating_ip_address,
                              until=ping.RECEIVED,
                              packet_size=self.observed_net_mtu,
                              fragmentation=False)
        max_sized.assert_replied()

        # Packets one byte bigger than the MTU must never get a reply
        over_sized = ping.ping(self.stack.floating_ip_address,
                               packet_size=self.observed_net_mtu + 1,
                               fragmentation=False,
                               count=5,
                               check=False)
        over_sized.assert_not_replied()

    @property
    def observed_net_mtu(self):
        """MTU value read from the network stack outputs"""
        return self.stack.network_stack.outputs.mtu

    # --- l3-ha extension -----------------------------------------------------

    @neutron.skip_if_missing_networking_extensions('l3-ha')
    def test_l3_ha(self):
        """Check 'ha' in the gateway details matches the network stack"""
        gateway_details = self.stack.network_stack.gateway_details
        expected_ha = self.stack.network_stack.ha
        self.assertEqual(expected_ha, gateway_details['ha'])
|
|
|
|
|
|
# --- Test with port security enabled -----------------------------------------
|
|
|
|
@neutron.skip_if_missing_networking_extensions(
    'port-security', 'security-group')
class FloatingIPWithPortSecurityFixture(stacks.CirrosServerStackFixture):
    """Cirros server stack requesting port security to be enabled.

    Only the SSH security group is listed in ``security_groups``.
    """

    #: Stack providing the security groups referenced by this fixture
    security_groups_stack = tobiko.required_setup_fixture(
        stacks.SecurityGroupsFixture)

    #: Request port security to be enabled
    port_security_enabled = True

    @property
    def security_groups(self):
        """List holding only the ID of the SSH security group"""
        ssh_group_id = self.security_groups_stack.ssh_security_group_id
        return [ssh_group_id]
|
|
|
|
|
|
@neutron.skip_if_missing_networking_extensions(
    'port-security', 'security-group')
class FloatingIPWithPortSecurityTest(FloatingIPTest):
    """Floating IP checks against a server with port security enabled.

    The stack used here lists only the SSH security group, so the ping
    tests are overridden to expect no ICMP reply.
    """

    #: Heat stack with floating IP and Nova server with port security
    stack = tobiko.required_setup_fixture(FloatingIPWithPortSecurityFixture)

    def test_ping(self):
        """Check the floating IP address doesn't reply to ping"""
        # Logging in first ensures the server instance is ready
        self.stack.ssh_client.connect()

        # The secured port must not be reachable via the floating IP
        blocked = ping.ping(self.stack.floating_ip_address,
                            count=5,
                            check=False)
        blocked.assert_not_replied()

    @ping.skip_if_missing_fragment_ping_option
    @neutron.skip_if_missing_networking_extensions('net-mtu')
    def test_ping_with_net_mtu(self):
        """Check neither MTU-sized nor over-sized pings get a reply"""
        # Logging in first ensures the server instance is ready
        tobiko.setup_fixture(self.stack.ssh_client)
        self.stack.ssh_client.connect()

        # Maximum-sized packets must not reach the secured port
        max_sized = ping.ping(self.stack.floating_ip_address,
                              packet_size=self.observed_net_mtu,
                              fragmentation=False,
                              count=5,
                              check=False)
        max_sized.assert_not_replied()

        # Over-sized packets must not reach the secured port either
        over_sized = ping.ping(self.stack.floating_ip_address,
                               packet_size=self.observed_net_mtu + 1,
                               fragmentation=False,
                               count=5,
                               check=False)
        over_sized.assert_not_replied()
|
|
|
|
|
|
# --- Test with ICMP security group -------------------------------------------
|
|
|
|
class FloatingIPWithICMPSecurityGroupFixture(
        FloatingIPWithPortSecurityFixture):
    """Port security server stack that also uses the ICMP security group"""

    @property
    def security_groups(self):
        """List with the IDs of the SSH and ICMP security groups"""
        ssh_group_id = self.security_groups_stack.ssh_security_group_id
        icmp_group_id = self.security_groups_stack.icmp_security_group_id
        return [ssh_group_id, icmp_group_id]
|
|
|
|
|
|
@neutron.skip_if_missing_networking_extensions(
    'port-security', 'security-group')
class FloatingIPWithICMPSecurityGroupTest(FloatingIPTest):
    """Floating IP checks with SSH and ICMP security groups on the port.

    Runs the tests inherited from ``FloatingIPTest`` unchanged against
    ``FloatingIPWithICMPSecurityGroupFixture``.
    """

    #: Heat stack with floating IP and Nova server to ping
    stack = tobiko.required_setup_fixture(
        FloatingIPWithICMPSecurityGroupFixture)
|
|
|
|
|
|
# --- Test net-mtu-writable extension -----------------------------------------
|
|
|
|
@neutron.skip_if_missing_networking_extensions('net-mtu-writable')
class FloatingIPWithNetMtuWritableFixture(stacks.CirrosServerStackFixture):
    """Cirros server stack using a network stack with a custom MTU value"""

    #: Network stack set up from ``NetworkWithNetMtuWriteStackFixture``
    network_stack = tobiko.required_setup_fixture(
        stacks.NetworkWithNetMtuWriteStackFixture)
|
|
|
|
|
|
@neutron.skip_if_missing_networking_extensions('net-mtu-writable')
class FloatingIpWithMtuWritableTest(FloatingIPTest):
    """Floating IP checks against a network with a custom MTU value"""

    #: Heat stack with floating IP and Nova server on the custom MTU network
    stack = tobiko.required_setup_fixture(FloatingIPWithNetMtuWritableFixture)

    def test_net_mtu_write(self):
        """Check the reported network MTU equals the custom one"""
        expected_mtu = self.expected_net_mtu
        observed_mtu = self.observed_net_mtu
        self.assertEqual(expected_mtu, observed_mtu)

    @property
    def expected_net_mtu(self):
        """Custom MTU size configured on the network stack fixture"""
        return self.stack.network_stack.custom_mtu_size
|
|
|
|
|
|
# --- Test l3-ha extension ----------------------------------------------------
|
|
|
|
@neutron.skip_if_missing_networking_extensions('l3-ha')
@neutron.skip_if_missing_networking_agents(
    binary='neutron-l3-agent', count=2)
class FloatingIpWithL3HATest(FloatingIPTest):
    """Floating IP checks against the L3 HA server stack.

    Skipped when the 'l3-ha' extension is missing or when the
    ``neutron-l3-agent`` agents requirement (``count=2``) is not met.
    """

    #: Heat stack with floating IP and Nova server
    stack = tobiko.required_setup_fixture(stacks.L3haServerStackFixture)
|
|
|
|
|
|
@topology.skip_unless_osp_version('16.1')
class TestFloatingIPLogging(testtools.TestCase):
    """Check floating IP (dis)association is logged by Neutron server.

    Every test gets a fresh port and floating IP, performs an operation
    expected to associate or disassociate them, and then requires exactly
    one new matching line in the Neutron server log files read from the
    controller nodes.
    """

    #: Heat stack providing the network where the test port is created
    stack = tobiko.required_setup_fixture(stacks.NetworkStackFixture)

    def setUp(self):
        """Create the port, the floating IP and the log file digger"""
        super().setUp()

        # Register each cleanup right after the resource is created, so it
        # still runs if a later set up step fails
        self.port = neutron.create_port(network_id=self.stack.network_id)
        self.addCleanup(self.cleanup_port)
        self.fip = neutron.create_floating_ip()
        self.addCleanup(self.cleanup_floatingip)

        log_filename = '/var/log/containers/neutron/server.log'
        self.log_digger = files.MultihostLogFileDigger(filename=log_filename,
                                                       sudo=True)
        for node in topology.list_openstack_nodes(group='controller'):
            self.log_digger.add_host(hostname=node.hostname,
                                     ssh_client=node.ssh_client)

    def cleanup_port(self):
        """Delete the test port, ignoring the case it is already gone"""
        try:
            neutron.delete_port(self.port['id'])
        except neutron.NoSuchPort:
            # Some tests delete the port themselves
            pass

    def cleanup_floatingip(self):
        """Delete the test floating IP, ignoring it being already gone"""
        try:
            neutron.delete_floating_ip(self.fip['id'])
        except neutron.NoSuchFIP:
            # Some tests delete the floating IP themselves
            pass

    def _attach_fip_to_port(self):
        """Associate the test floating IP with the test port"""
        neutron.update_floating_ip(self.fip['id'], port_id=self.port['id'])

    def _assert_one_new_log_line(self, pattern):
        """Require exactly one new log line matching pattern.

        The line must mention both the test port ID and the floating IP
        address.

        :param pattern: regular expression passed to the log digger
        """
        new_logs = self.log_digger.find_new_lines(pattern=pattern)
        # Expected value goes first, as in the other assertEqual calls of
        # this module, so that failure messages are labelled correctly
        self.assertEqual(1, len(new_logs))
        # NOTE(review): each entry is presumably a (hostname, line) pair;
        # confirm against MultihostLogFileDigger.find_new_lines
        log_line = new_logs[0][1]
        self.assertIn(self.port['id'], log_line)
        self.assertIn(self.fip['floating_ip_address'], log_line)

    def test_fip_attach_log(self):
        """Test log that FIP is attached to the port"""
        pattern = f'{self.fip["id"]}.*associated'
        # Presumably records already matching lines, so that only later
        # ones are reported as new
        self.log_digger.find_lines(pattern=pattern)
        self._attach_fip_to_port()
        self._assert_one_new_log_line(pattern)

    def test_fip_detach_log(self):
        """Test log that FIP is detached from the port"""
        self._attach_fip_to_port()
        pattern = f'{self.fip["id"]}.*disassociated'
        self.log_digger.find_lines(pattern=pattern)
        neutron.update_floating_ip(self.fip['id'], port_id=None)
        self._assert_one_new_log_line(pattern)

    def test_fip_delete_detach_log(self):
        """Test log that FIP is detached from the port if FIP is deleted"""
        self._attach_fip_to_port()
        pattern = f'{self.fip["id"]}.*disassociated'
        self.log_digger.find_lines(pattern=pattern)
        neutron.delete_floating_ip(self.fip['id'])
        self._assert_one_new_log_line(pattern)

    def test_port_delete_fip_detach_log(self):
        """Test log that FIP is detached from the port if port is deleted"""
        self._attach_fip_to_port()
        pattern = f'{self.fip["id"]}.*disassociated'
        self.log_digger.find_lines(pattern=pattern)
        neutron.delete_port(self.port['id'])
        self._assert_one_new_log_line(pattern)
|