vmware-nsx-tempest-plugin/vmware_nsx_tempest_plugin/tests/nsxv3/scenario/test_router_nonat_ops.py

487 lines
23 KiB
Python

# Copyright 2017 VMware Inc
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import time
from oslo_log import log as logging
from tempest.common import utils
from tempest import config
from tempest.lib.common.utils import data_utils
from tempest.lib.common.utils import test_utils
from tempest.lib import decorators
from tempest.lib import exceptions
from vmware_nsx_tempest_plugin.common import constants
from vmware_nsx_tempest_plugin.services import nsxp_client
from vmware_nsx_tempest_plugin.services import nsxv3_client
from vmware_nsx_tempest_plugin.tests.scenario import manager
# Tempest configuration handle used throughout this module.
CONF = config.CONF
# Module-level logger.
LOG = logging.getLogger(__name__)
# Pairs a floating IP with the server it was created for.
Floating_IP_tuple = collections.namedtuple('Floating_IP_tuple',
['floating_ip', 'server'])
class TestRouterNoNATOps(manager.NetworkScenarioTest):
"""Test l3 router NoNAT scenario

    The following two scenarios are covered:
    - Create a NoNAT topology and check end-to-end traffic.
    - Create a NATed topology and check end-to-end traffic, then update
      the router to NoNAT and check end-to-end traffic again.

    Note: for the NoNAT use case, enable
    CONF.network.project_networks_reachable and add a static route on the
    external VM so that the NSX-connected network is reachable from the
    external network, for example:

        route add -net 192.168.1.0 netmask 255.255.255.0 gw 172.20.1.60 eth1
"""
@classmethod
def skip_checks(cls):
    """Skip the class when the deployment cannot run these tests.

    The tests need either ``CONF.network.project_networks_reachable`` or
    ``CONF.network.public_network_id`` to be set, and both the ``router``
    and ``security-group`` network extensions to be enabled.

    :raises cls.skipException: when one of the requirements is not met.
    """
    super(TestRouterNoNATOps, cls).skip_checks()
    network_conf = CONF.network
    has_connectivity = (network_conf.project_networks_reachable or
                        network_conf.public_network_id)
    if not has_connectivity:
        raise cls.skipException(
            'Either project_networks_reachable must be "true", or '
            'public_network_id must be defined.')
    for ext in ('router', 'security-group'):
        if utils.is_extension_enabled(ext, 'network'):
            continue
        raise cls.skipException("%s extension not enabled." % ext)
@classmethod
def setup_credentials(cls):
    """Set up credentials and build the NSX backend clients.

    After the parent credential setup, an ``NSXV3Client`` is stored on
    ``cls.nsx`` and an ``NSXPClient`` on ``cls.nsxp``; both are built from
    the manager address, user and password in ``CONF.nsxv3``.
    """
    cls.set_network_resources()
    super(TestRouterNoNATOps, cls).setup_credentials()
    nsx_conf = CONF.nsxv3
    backend_args = (nsx_conf.nsx_manager,
                    nsx_conf.nsx_user,
                    nsx_conf.nsx_password)
    cls.nsx = nsxv3_client.NSXV3Client(*backend_args)
    cls.nsxp = nsxp_client.NSXPClient(*backend_args)
def setUp(self):
    """Reset per-test bookkeeping and obtain the admin client manager."""
    super(TestRouterNoNATOps, self).setUp()
    # Servers and keypairs created by this test, filled in _create_server.
    self.servers = []
    self.keypairs = {}
    self.config_drive = CONF.compute_feature_enabled.config_drive
    self.cmgr_adm = self.get_client_manager('admin')
def _setup_network_topo(self, enable_snat=None):
    """Build the network, router and server used by a test.

    Creates a security group, a network with one subnet, a router whose
    gateway is ``CONF.network.public_network_id`` (configured through the
    admin routers client), attaches the subnet to the router and boots one
    server.  A floating IP is created for the server only when
    ``enable_snat`` is truthy.

    :param enable_snat: SNAT flag passed on to ``_create_router``.
    """
    self.security_group = self._create_security_group()
    self.network = self._create_network()
    self.subnet = self._create_subnet(self.network)
    # Pause before creating the router; presumably this gives the backend
    # time to realise the new network (see NSX_NETWORK_REALISE_TIMEOUT).
    time.sleep(constants.NSX_NETWORK_REALISE_TIMEOUT)
    router_kwargs = dict(
        router_name=data_utils.rand_name('router-smoke'),
        external_network_id=CONF.network.public_network_id,
        enable_snat=enable_snat,
        routers_client=self.cmgr_adm.routers_client)
    self.router = self._create_router(**router_kwargs)
    router_id = self.router['id']
    subnet_id = self.subnet['id']
    self.routers_client.add_router_interface(router_id,
                                             subnet_id=subnet_id)
    self.addCleanup(self.routers_client.remove_router_interface,
                    router_id, subnet_id=subnet_id)
    self.server = self._create_server(
        data_utils.rand_name('server-smoke'), self.network)
    if not enable_snat:
        return
    fip = self.create_floating_ip(self.server)
    self.floating_ip_tuple = Floating_IP_tuple(fip, self.server)
def _cleanup_router(self, router):
    """Cleanup hook for a router; delegates to ``_delete_router``.

    :param router: router dict to remove.
    """
    self._delete_router(router)
def _delete_router(self, router):
    """Detach the router's interfaces and then delete the router.

    Every port listed for the router is detached by the subnet of its
    first fixed IP; "not found" errors during detach are ignored.

    :param router: router dict; only ``router['id']`` is used.
    """
    router_id = router['id']
    listing = self.ports_client.list_ports(device_id=router_id)
    for port in listing['ports']:
        subnet_id = port['fixed_ips'][0]['subnet_id']
        test_utils.call_and_ignore_notfound_exc(
            self.routers_client.remove_router_interface, router_id,
            subnet_id=subnet_id)
    self.routers_client.delete_router(router_id)
def _update_router(self, router_id, router_client, ext_gw_info):
    """Set the router's ``external_gateway_info`` using ``router_client``.

    :param router_id: id of the router to update.
    :param router_client: routers client that performs the update.
    :param ext_gw_info: value sent as ``external_gateway_info``.
    """
    router_client.update_router(
        router_id=router_id, external_gateway_info=ext_gw_info)
def _create_router(self, router_name=None, admin_state_up=True,
                   external_network_id=None, enable_snat=None,
                   routers_client=None,
                   **kwargs):
    """Create a router and then configure its external gateway.

    The router itself is always created with ``self.routers_client``.  The
    gateway (including the SNAT flag) is applied afterwards with
    ``routers_client``; callers pass an admin client there because only
    admin can configure SNAT.

    :param router_name: name of the new router.
    :param admin_state_up: administrative state passed to create_router.
    :param external_network_id: gateway network; left out of the gateway
        info when falsy.
    :param enable_snat: SNAT flag; left out of the gateway info when None.
    :param routers_client: client used for the gateway update, defaults
        to ``self.routers_client``.
    :param kwargs: extra attributes forwarded to create_router.
    :returns: the ``router`` dict from the create response.
    """
    ext_gw_info = {}
    if external_network_id:
        ext_gw_info['network_id'] = external_network_id
    if enable_snat is not None:
        ext_gw_info['enable_snat'] = enable_snat
    gateway_client = routers_client or self.routers_client
    body = self.routers_client.create_router(
        name=router_name,
        admin_state_up=admin_state_up, **kwargs)
    router = body['router']
    # Register the cleanup before touching the gateway: the update below is
    # expected to fail in some tests (a non-admin setting SNAT), and the
    # router must still be removed in that case.
    self.addCleanup(self._cleanup_router, router)
    # Only admin can configure SNAT parameters
    self._update_router(router['id'], gateway_client, ext_gw_info)
    return router
def _create_subnet(self, network, subnets_client=None, **kwargs):
    """Create the 192.168.1.0/24 IPv4 subnet on ``network``.

    Deletion of the subnet is registered as a cleanup that ignores
    "not found" errors.

    :param network: network dict; ``id`` and ``tenant_id`` are used.
    :param subnets_client: client to use, defaults to
        ``self.subnets_client``.
    :param kwargs: extra attributes forwarded to create_subnet.
    :returns: the ``subnet`` dict from the create response.
    """
    client = subnets_client if subnets_client else self.subnets_client
    response = client.create_subnet(
        name=data_utils.rand_name('subnet-smoke'),
        network_id=network['id'],
        tenant_id=network['tenant_id'],
        cidr='192.168.1.0/24',
        ip_version=4,
        **kwargs)
    created = response['subnet']
    self.addCleanup(test_utils.call_and_ignore_notfound_exc,
                    client.delete_subnet, created['id'])
    return created
def _create_server(self, name, network, image_id=None):
    """Boot a server on ``network`` with a fresh keypair.

    The keypair is remembered in ``self.keypairs`` under its name and the
    server is appended to ``self.servers``.  The server uses
    ``self.security_group`` and is awaited until it is ACTIVE.

    :param name: server name.
    :param network: network dict; only ``network['id']`` is used.
    :param image_id: optional image id passed to ``create_server``.
    :returns: the created server.
    """
    keypair = self.create_keypair()
    key_name = keypair['name']
    self.keypairs[key_name] = keypair
    server = self.create_server(
        name=name,
        networks=[{'uuid': network['id']}],
        key_name=key_name,
        image_id=image_id,
        wait_until='ACTIVE',
        security_groups=[{'name': self.security_group['name']}])
    self.servers.append(server)
    return server
def _get_server_key(self, server):
    """Return the private key of the keypair recorded for ``server``.

    :param server: server dict; ``server['key_name']`` selects the entry
        in ``self.keypairs``.
    """
    keypair = self.keypairs[server['key_name']]
    return keypair['private_key']
def _get_server_ip(self, server):
    """Return the server's address on ``self.network`` used for SSH.

    The first address whose ``version`` equals
    ``CONF.validation.ip_version_for_ssh`` is returned; ``None`` when no
    address matches.
    """
    candidates = server['addresses'][self.network['name']]
    matching = (entry['addr'] for entry in candidates
                if entry['version'] == CONF.validation.ip_version_for_ssh)
    return next(matching, None)
def _list_ports(self, *args, **kwargs):
    """List ports using admin creds.

    Arguments are forwarded unchanged to the admin ports client.

    :returns: the ``ports`` list from the response.
    """
    admin_ports = self.admin_manager.ports_client
    response = admin_ports.list_ports(*args, **kwargs)
    return response['ports']
def _check_network_internal_connectivity(self, network,
                                         should_connect=True):
    """Check reachability of the network-owned ports from the server.

    Collects the first fixed IP of every port on ``network`` (in the
    server's tenant) whose ``device_owner`` starts with ``network`` and
    checks them through the floating IP in ``self.floating_ip_tuple``.

    :param network: network dict; only ``network['id']`` is used.
    :param should_connect: expected reachability.
    """
    floating_ip, server = self.floating_ip_tuple
    ports = self._list_ports(tenant_id=server['tenant_id'],
                             network_id=network['id'])
    network_ips = []
    for port in ports:
        if port['device_owner'].startswith('network'):
            network_ips.append(port['fixed_ips'][0]['ip_address'])
    self._check_server_connectivity(floating_ip,
                                    network_ips,
                                    should_connect)
def _check_network_vm_connectivity(self, network,
                                   should_connect=True):
    """Check reachability of the compute-owned ports from the server.

    Collects the first fixed IP of every port on ``network`` (in the
    server's tenant) whose ``device_owner`` starts with ``compute`` and
    checks them through the floating IP in ``self.floating_ip_tuple``.

    :param network: network dict; only ``network['id']`` is used.
    :param should_connect: expected reachability.
    """
    floating_ip, server = self.floating_ip_tuple
    ports = self._list_ports(tenant_id=server['tenant_id'],
                             network_id=network['id'])
    compute_ips = []
    for port in ports:
        if port['device_owner'].startswith('compute'):
            compute_ips.append(port['fixed_ips'][0]['ip_address'])
    self._check_server_connectivity(floating_ip,
                                    compute_ips,
                                    should_connect)
def _check_nonat_network_connectivity(self, should_connect=True):
    """Check reachability of the tenant network's fixed IPs.

    The targets are the first fixed IP of every port on ``self.network``
    whose ``device_owner`` starts with ``network``, plus the fixed IP of
    ``self.server``.

    :param should_connect: expected reachability.
    """
    ports = self._list_ports(tenant_id=self.server['tenant_id'],
                             network_id=self.network['id'])
    targets = []
    for port in ports:
        if port['device_owner'].startswith('network'):
            targets.append(port['fixed_ips'][0]['ip_address'])
    targets.append(self._get_server_ip(self.server))
    self._check_fixed_ip_connectivity_from_ext_vm(
        targets, should_connect=should_connect)
def _check_fixed_ip_connectivity_from_ext_vm(self, fixed_ips,
                                             should_connect=True):
    """Ping every fixed IP and assert the expected outcome.

    A positive check is skipped entirely when
    ``CONF.network.project_networks_reachable`` is false; negative checks
    always run.

    :param fixed_ips: iterable of IP addresses to ping.
    :param should_connect: whether the addresses are expected to answer.
    :raises AssertionError: when a ping outcome does not match
        ``should_connect``.
    """
    if not CONF.network.project_networks_reachable and should_connect:
        return
    for ip in fixed_ips:
        if should_connect:
            msg = "Timed out waiting for %s to become reachable" % ip
        else:
            msg = "ip address %s is reachable" % ip
        # ping_ip_address reports the outcome through its return value
        # (as tempest's scenario manager does) instead of raising, so
        # the result has to be asserted or the check can never fail.
        self.assertTrue(
            self.ping_ip_address(ip, should_succeed=should_connect), msg)
def _check_server_connectivity(self, floating_ip, address_list,
                               should_connect=True):
    """Check reachability of addresses from ``self.server`` over SSH.

    A remote client is opened to the floating IP address with the private
    key of ``self.server``; every address in ``address_list`` is then
    checked from that client.  Any failure is logged and re-raised.

    :param floating_ip: floating IP dict; ``floating_ip_address`` is the
        SSH target.
    :param address_list: addresses to check from the server.
    :param should_connect: expected reachability.
    """
    fip_address = floating_ip['floating_ip_address']
    ssh_client = self.get_remote_client(
        fip_address, private_key=self._get_server_key(self.server))
    failure_template = ("Timed out waiting for %s to become reachable"
                        if should_connect
                        else "ip address %s is reachable")
    for target in address_list:
        msg = failure_template % target
        try:
            outcome = self._check_remote_connectivity(
                ssh_client, target, should_connect)
            self.assertTrue(outcome, msg)
        except Exception:
            LOG.exception("Unable to access %(dest)s via ssh to "
                          "floating-ip %(src)s",
                          {'dest': target, 'src': floating_ip})
            raise
def _test_router_nat_when_floating_ips_active_on_network(self):
    """Disabling SNAT is rejected while a floating IP is in use.

    Builds the SNAT topology (which creates a floating IP for the
    server), verifies the backend router and connectivity, then expects
    ``BadRequest`` when the admin client tries to set ``enable_snat`` to
    False on the router gateway.
    """
    snat = True
    self._setup_network_topo(enable_snat=snat)
    router_name = self.router['name']
    router_id = self.router['id']
    if CONF.network.backend == 'nsxp':
        time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
        nsx_router_policy = self.nsxp.get_logical_router(
            router_name, router_id)
        self.assertIsNotNone(nsx_router_policy)
        self.assertEqual(nsx_router_policy['resource_type'], 'Tier1')
    nsx_router = self.nsx.get_logical_router(router_name, router_id)
    self.assertIsNotNone(nsx_router)
    self.assertEqual(nsx_router['router_type'], 'TIER1')
    self._check_network_internal_connectivity(network=self.network)
    self._check_network_vm_connectivity(network=self.network)
    self._check_nonat_network_connectivity(should_connect=False)
    # Try to disable SNAT while the floating IP is still associated.
    external_gateway_info = {
        'network_id': CONF.network.public_network_id,
        'enable_snat': (not snat)}
    with self.assertRaises(exceptions.BadRequest):
        self._update_router(router_id,
                            self.cmgr_adm.routers_client,
                            external_gateway_info)
def _test_router_nat_update_when_snat(self):
    """Test update router from NATed to NoNAT scenario.

    Builds the topology with SNAT enabled and verifies the backend
    router, its NAT rules, its route advertisement and traffic through
    the floating IP.  The floating IP is then disassociated, SNAT is
    disabled through the admin routers client, and the backend state and
    NoNAT connectivity are verified again.
    """
    use_policy = CONF.network.backend == 'nsxp'

    def backend_state():
        # Fetch and sanity-check the backend router, then return
        # (policy router or None, NAT rules, manager advertisement).
        policy_router = None
        if use_policy:
            time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
            policy_router = self.nsxp.get_logical_router(
                self.router['name'], self.router['id'])
            self.assertIsNotNone(policy_router)
            self.assertEqual(policy_router['resource_type'], 'Tier1')
        nsx_router = self.nsx.get_logical_router(
            self.router['name'], self.router['id'])
        self.assertIsNotNone(nsx_router)
        self.assertEqual(nsx_router['router_type'], 'TIER1')
        if use_policy:
            nat_rules = self.nsxp.get_logical_router_nat_rules(
                policy_router)
        else:
            nat_rules = self.nsx.get_logical_router_nat_rules(nsx_router)
        router_adv = self.nsx.get_logical_router_advertisement(nsx_router)
        return policy_router, nat_rules, router_adv

    def check_advertisement(policy_router, router_adv, snat_enabled):
        # NAT routes must be advertised exactly when SNAT is enabled and
        # connected routes exactly when it is disabled.
        nat_msg = ("Tier1 router's advertise_nat_routes is not %s"
                   % snat_enabled)
        adv_msg = ("Tier1 router's advertise_nsx_connected_routes is not %s"
                   % (not snat_enabled))
        if use_policy:
            adv_types = policy_router['route_advertisement_types']
            nat_advertised = 'TIER1_NAT' in adv_types
            connected_advertised = 'TIER1_CONNECTED' in adv_types
        else:
            nat_advertised = router_adv['advertise_nat_routes']
            connected_advertised = router_adv[
                'advertise_nsx_connected_routes']
        self.assertEqual(snat_enabled, bool(nat_advertised), nat_msg)
        self.assertEqual(not snat_enabled, bool(connected_advertised),
                         adv_msg)

    snat = True
    self._setup_network_topo(enable_snat=snat)

    # --- SNAT enabled -------------------------------------------------
    policy_router, nat_rules, router_adv = backend_state()
    # Check nat rules created correctly: one more rule is expected when a
    # NO_DNAT rule is present.
    has_no_dnat = any(d['action'] == 'NO_DNAT' for d in nat_rules)
    self.assertEqual(4 if has_no_dnat else 3, len(nat_rules))
    check_advertisement(policy_router, router_adv, snat_enabled=True)
    self._check_network_internal_connectivity(network=self.network)
    self._check_network_vm_connectivity(network=self.network)
    self._check_nonat_network_connectivity(should_connect=False)

    # To configure SNAT=False, needs to release all the floating ips
    self._disassociate_floating_ip(self.floating_ip_tuple.floating_ip)
    external_gateway_info = {
        'network_id': CONF.network.public_network_id,
        'enable_snat': (not snat)}
    self._update_router(self.router['id'], self.cmgr_adm.routers_client,
                        external_gateway_info)

    # --- SNAT disabled ------------------------------------------------
    policy_router, nat_rules, router_adv = backend_state()
    if len(nat_rules) == 1:
        self.assertTrue(any(d['action'] == 'NO_DNAT' for d in nat_rules))
    elif len(nat_rules) != 0:
        # NOTE(review): presumably the client can hand back the raw
        # response dict here, whose 'results' must be empty -- confirm
        # against get_logical_router_nat_rules.
        self.assertEqual(0, len(nat_rules['results']))
    check_advertisement(policy_router, router_adv, snat_enabled=False)
    self._check_nonat_network_connectivity()
def _test_router_nat_update_when_no_snat(self):
    """Test update router from NoNAT to NATed scenario.

    Builds the topology with SNAT disabled and verifies the backend
    router, its NAT rules, its route advertisement and NoNAT
    connectivity.  SNAT is then enabled through the admin routers client,
    a floating IP is created for the server, and the backend state and
    traffic through the floating IP are verified.
    """
    use_policy = CONF.network.backend == 'nsxp'

    def backend_state():
        # Fetch and sanity-check the backend router, then return
        # (policy router or None, NAT rules, manager advertisement).
        policy_router = None
        if use_policy:
            time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
            policy_router = self.nsxp.get_logical_router(
                self.router['name'], self.router['id'])
            self.assertIsNotNone(policy_router)
            self.assertEqual(policy_router['resource_type'], 'Tier1')
        nsx_router = self.nsx.get_logical_router(
            self.router['name'], self.router['id'])
        self.assertIsNotNone(nsx_router)
        self.assertEqual(nsx_router['router_type'], 'TIER1')
        if use_policy:
            nat_rules = self.nsxp.get_logical_router_nat_rules(
                policy_router)
        else:
            nat_rules = self.nsx.get_logical_router_nat_rules(nsx_router)
        router_adv = self.nsx.get_logical_router_advertisement(nsx_router)
        return policy_router, nat_rules, router_adv

    def check_advertisement(policy_router, router_adv, snat_enabled):
        # NAT routes must be advertised exactly when SNAT is enabled and
        # connected routes exactly when it is disabled.
        nat_msg = ("Tier1 router's advertise_nat_routes is not %s"
                   % snat_enabled)
        adv_msg = ("Tier1 router's advertise_nsx_connected_routes is not %s"
                   % (not snat_enabled))
        if use_policy:
            adv_types = policy_router['route_advertisement_types']
            nat_advertised = 'TIER1_NAT' in adv_types
            connected_advertised = 'TIER1_CONNECTED' in adv_types
        else:
            nat_advertised = router_adv['advertise_nat_routes']
            connected_advertised = router_adv[
                'advertise_nsx_connected_routes']
        self.assertEqual(snat_enabled, bool(nat_advertised), nat_msg)
        self.assertEqual(not snat_enabled, bool(connected_advertised),
                         adv_msg)

    snat = False
    self._setup_network_topo(enable_snat=snat)

    # --- SNAT disabled ------------------------------------------------
    policy_router, nat_rules, router_adv = backend_state()
    # Without SNAT at most a single NO_DNAT rule may exist.
    if len(nat_rules) == 1:
        self.assertTrue(any(d['action'] == 'NO_DNAT' for d in nat_rules))
    else:
        self.assertEqual(0, len(nat_rules))
    # The policy backend is held to the same expectation as the manager
    # backend here: no NAT advertisement, connected routes advertised.
    check_advertisement(policy_router, router_adv, snat_enabled=False)
    self._check_nonat_network_connectivity()

    # Update router to Enable snat and associate floating ip
    external_gateway_info = {
        'network_id': CONF.network.public_network_id,
        'enable_snat': (not snat)}
    self._update_router(self.router['id'], self.cmgr_adm.routers_client,
                        external_gateway_info)
    floating_ip = self.create_floating_ip(self.server)
    self.floating_ip_tuple = Floating_IP_tuple(floating_ip, self.server)

    # --- SNAT enabled -------------------------------------------------
    policy_router, nat_rules, router_adv = backend_state()
    # Check nat rules created correctly: one more rule is expected when a
    # NO_DNAT rule is present.
    has_no_dnat = any(d['action'] == 'NO_DNAT' for d in nat_rules)
    self.assertEqual(4 if has_no_dnat else 3, len(nat_rules))
    check_advertisement(policy_router, router_adv, snat_enabled=True)
    self._check_network_internal_connectivity(network=self.network)
    self._check_network_vm_connectivity(network=self.network)
@decorators.attr(type='nsxv3')
@decorators.idempotent_id('5e5bfdd4-0962-47d3-a89b-7ce64322b53e')
def test_router_nat_to_nonat_ops(self):
    """Test update router from NATed to NoNAT scenario.

    The scenario itself lives in ``_test_router_nat_update_when_snat``.
    """
    self._test_router_nat_update_when_snat()
@decorators.attr(type='nsxv3')
@decorators.idempotent_id('b951f7fb-f2b2-40eb-8bbd-b54bd76ffbe8')
def test_disable_nat_when_floating_ips_active_on_network(self):
    """Expect an error when NAT is disabled with active floating IPs.

    The scenario itself lives in
    ``_test_router_nat_when_floating_ips_active_on_network``.
    """
    self._test_router_nat_when_floating_ips_active_on_network()
@decorators.attr(type='nsxv3')
@decorators.idempotent_id('a0274738-d3e7-49db-bf10-a5563610940d')
def test_router_nonat_to_nat_ops(self):
    """Test update router from NoNAT to NATed scenario.

    The scenario itself lives in ``_test_router_nat_update_when_no_snat``.
    """
    self._test_router_nat_update_when_no_snat()
@decorators.attr(type='nsxv3')
@decorators.idempotent_id('971e8e8b-3cf2-47a9-ac24-5b19f586731c')
def test_only_admin_can_configure_snat(self):
    """Only admin can configure the SNAT.

    Creates a router without passing an admin routers client and expects
    ``Forbidden`` when its gateway is configured with ``enable_snat``
    set to False.
    """
    self.security_group = self._create_security_group()
    self.network = self._create_network()
    self.subnet = self._create_subnet(self.network)
    router_args = {
        'router_name': data_utils.rand_name('router-smoke'),
        'external_network_id': CONF.network.public_network_id,
        'enable_snat': False,
    }
    with self.assertRaises(exceptions.Forbidden):
        self._create_router(**router_args)