vmware-nsx-tempest-plugin/vmware_nsx_tempest_plugin/tests/api/test_vpn.py

940 lines
43 KiB
Python

# Copyright 2018 VMware Inc
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import time
from tempest import config
from tempest.lib.common.utils import data_utils
from tempest.lib.common.utils import test_utils
from tempest.lib import decorators
from tempest.lib import exceptions
from tempest.lib import exceptions as lib_exc
from tempest import test
from vmware_nsx_tempest_plugin.common import constants
from vmware_nsx_tempest_plugin.lib import feature_manager
from vmware_nsx_tempest_plugin.services import nsxp_client
from vmware_nsx_tempest_plugin.services import nsxv3_client
from oslo_log import log as logging
CONF = config.CONF
CONF.validation.auth_method = 'None'
LOG = logging.getLogger(__name__)
class TestVpnOps(feature_manager.FeatureManager):
@classmethod
def skip_checks(cls):
super(TestVpnOps, cls).skip_checks()
if not test.is_extension_enabled('vpnaas', 'network'):
msg = "Extension provider-security-group is not enabled."
raise cls.skipException(msg)
@classmethod
def setup_credentials(cls):
cls.set_network_resources()
cls.admin_mgr = cls.get_client_manager('admin')
super(TestVpnOps, cls).setup_credentials()
@classmethod
def setup_clients(cls):
"""
Create various client connections. Such as NSX.
"""
super(TestVpnOps, cls).setup_clients()
cls.nsx_client = nsxv3_client.NSXV3Client(CONF.nsxv3.nsx_manager,
CONF.nsxv3.nsx_user,
CONF.nsxv3.nsx_password)
cls.nsxp_client = nsxp_client.NSXPClient(CONF.nsxv3.nsx_manager,
CONF.nsxv3.nsx_user,
CONF.nsxv3.nsx_password)
def create_network_topo(self, enable_snat="False", cidr=None):
kwargs = {}
network = \
self.create_topology_network(
network_name="vpn-network",
networks_client=self.admin_mgr.networks_client)
router_name = 'vpn-router'
# Create router topo
kwargs["enable_snat"] = enable_snat
routers_client = self.admin_mgr.routers_client
router = self.create_topology_router(
router_name, routers_client=routers_client, **kwargs)
subnet_name = 'vpn-subnet'
# Create subnet topo
subnets_client = self.admin_mgr.subnets_client
if cidr is None:
subnet = self.create_topology_subnet(subnet_name, network,
router_id=router['id'],
subnets_client=subnets_client,
routers_client=routers_client
)
else:
subnet = self.create_topology_subnet(subnet_name, network,
router_id=router['id'],
subnets_client=subnets_client,
routers_client=routers_client,
cidr=cidr
)
return dict(network=network, subnet=subnet, router=router)
def create_vpn_basic_topo_endpoint_group(
self, network_topology, name=None, ike=None, pfs=constants.PFS,
encryption_algorithm=constants.ENCRYPTION_ALGO,
lifetime=constants.LIFETIME,
peer_address=constants.PEER_ADDRESS,
peer_id=constants.PEER_ID,
site_connection_state=constants.SITE_CONNECTION_STATE):
# Create network topo
kwargs = {}
subnet = network_topology['subnet']
router = network_topology['router']
kwargs['vpnservice'] = dict(router_id=router['id'],
admin_state_up=site_connection_state,
name="vpn")
vpn_service = self.vpnaas_client.create_vpnservice(**kwargs)
self.addCleanup(
test_utils.call_and_ignore_notfound_exc,
self.vpnaas_client.delete_vpnservice,
vpn_service.get('vpnservice')['id'])
self.vpnaas_client.list_vpnservices()
if ike is None:
kwargs = {}
if lifetime is not None:
kwargs[
'ikepolicy'] = \
dict(name=data_utils.rand_name("ike-policy-"), pfs=pfs,
encryption_algorithm=encryption_algorithm,
lifetime=lifetime)
ike = self.vpnaas_client.create_ikepolicy(**kwargs)
self.addCleanup(
test_utils.call_and_ignore_notfound_exc,
self.vpnaas_client.delete_ikepolicy,
ike.get('ikepolicy')['id'])
kwargs[
'ipsecpolicy'] = dict(name=data_utils.rand_name("ipsec-policy-"),
pfs=pfs)
ipsec = self.vpnaas_client.create_ipsecpolicy(**kwargs)
self.addCleanup(
test_utils.call_and_ignore_notfound_exc,
self.vpnaas_client.delete_ipsecpolicy,
ipsec.get('ipsecpolicy')['id'])
kwargs = {}
name = data_utils.rand_name("local-endpoint-")
kwargs['endpoint_group'] = dict(endpoints=[subnet['id']],
type="subnet",
name=name)
local_endpoint = self.vpnaas_client.create_endpoint_group(**kwargs)
self.addCleanup(
test_utils.call_and_ignore_notfound_exc,
self.vpnaas_client.delete_endpoint_group,
local_endpoint['endpoint_group']['id'])
kwargs = {}
name = data_utils.rand_name("remote_endpoint-")
kwargs['endpoint_group'] = dict(endpoints=["40.20.1.0/24"],
type="cidr",
name=name)
remote_endpoint = self.vpnaas_client.create_endpoint_group(**kwargs)
self.addCleanup(
test_utils.call_and_ignore_notfound_exc,
self.vpnaas_client.delete_endpoint_group,
remote_endpoint['endpoint_group']['id'])
kwargs = {}
if name is not None:
name = "site-conn-" + name
else:
name = "site-conn"
kwargs[
"ipsec_site_connection"] = \
dict(vpnservice_id=vpn_service.get('vpnservice')['id'],
psk="secret",
admin_state_up=site_connection_state,
peer_ep_group_id=remote_endpoint['endpoint_group']['id'],
local_ep_group_id=local_endpoint['endpoint_group']['id'],
ikepolicy_id=ike.get(
'ikepolicy')['id'],
ipsecpolicy_id=ipsec.get(
'ipsecpolicy')['id'],
peer_address=peer_address,
peer_id=peer_id, name=name)
endpoint = self.vpnaas_client.create_ipsec_site_connection(**kwargs)
self.addCleanup(
test_utils.call_and_ignore_notfound_exc,
self.vpnaas_client.delete_ipsec_site_connection,
endpoint.get("ipsec_site_connection")['id'])
return dict(endpoint=endpoint, vpn_service=vpn_service,
ike=ike, ipsec=ipsec)
def create_vpn_basic_topo(
self, network_topology, name=None, ike=None, pfs=constants.PFS,
encryption_algorithm=constants.ENCRYPTION_ALGO,
lifetime=constants.LIFETIME,
peer_address=constants.PEER_ADDRESS,
peer_id=constants.PEER_ID,
site_connection_state=constants.SITE_CONNECTION_STATE):
# Create network topo
kwargs = {}
subnet = network_topology['subnet']
router = network_topology['router']
kwargs['vpnservice'] = dict(subnet_id=subnet['id'],
router_id=router['id'],
admin_state_up=site_connection_state,
name="vpn")
vpn_service = self.vpnaas_client.create_vpnservice(**kwargs)
self.addCleanup(
test_utils.call_and_ignore_notfound_exc,
self.vpnaas_client.delete_vpnservice,
vpn_service.get('vpnservice')['id'])
self.vpnaas_client.list_vpnservices()
if ike is None:
kwargs = {}
if lifetime is not None:
kwargs[
'ikepolicy'] = \
dict(name=data_utils.rand_name("ike-policy-"), pfs=pfs,
encryption_algorithm=encryption_algorithm,
lifetime=lifetime)
ike = self.vpnaas_client.create_ikepolicy(**kwargs)
self.addCleanup(
test_utils.call_and_ignore_notfound_exc,
self.vpnaas_client.delete_ikepolicy,
ike.get('ikepolicy')['id'])
kwargs = {}
kwargs[
'ipsecpolicy'] = dict(name=data_utils.rand_name("ipsec-policy-"),
pfs=pfs)
ipsec = self.vpnaas_client.create_ipsecpolicy(**kwargs)
self.addCleanup(
test_utils.call_and_ignore_notfound_exc,
self.vpnaas_client.delete_ipsecpolicy,
ipsec.get('ipsecpolicy')['id'])
kwargs = {}
if name is not None:
name = "site-conn-" + name
else:
name = "site-conn"
kwargs[
"ipsec_site_connection"] = \
dict(vpnservice_id=vpn_service.get('vpnservice')['id'],
psk="secret",
admin_state_up=site_connection_state, peer_cidrs=[
"10.0.1.0/24"],
ikepolicy_id=ike.get(
'ikepolicy')['id'],
ipsecpolicy_id=ipsec.get(
'ipsecpolicy')['id'],
peer_address=peer_address,
peer_id=peer_id, name=name)
endpoint = self.vpnaas_client.create_ipsec_site_connection(**kwargs)
self.addCleanup(
test_utils.call_and_ignore_notfound_exc,
self.vpnaas_client.delete_ipsec_site_connection,
endpoint.get("ipsec_site_connection")['id'])
return dict(endpoint=endpoint, vpn_service=vpn_service,
ike=ike, ipsec=ipsec)
@decorators.idempotent_id('7022b98f-f006-43c0-a1f7-5926035eb2b9')
def test_create_vpnservice_long_description(self):
description = 'x' * 256
network_topology = self.create_network_topo()
subnet = network_topology['subnet']
router = network_topology['router']
kwargs = {}
kwargs['vpnservice'] = dict(subnet_id=subnet['id'],
router_id=router['id'],
admin_state_up="True",
description=description,
name="vpn")
self.assertRaises(
lib_exc.BadRequest, self.vpnaas_client.create_vpnservice, **kwargs)
@decorators.idempotent_id('a4b0112d-2ab5-4b02-b0ab-562ae2cd4078')
def test_create_vpnservice_with_router_enable_snat(self):
network_topology = self.create_network_topo(enable_snat="True")
subnet = network_topology['subnet']
router = network_topology['router']
kwargs = {}
kwargs['vpnservice'] = dict(subnet_id=subnet['id'],
router_id=router['id'],
admin_state_up="True",
name="vpn")
vpn_service = self.vpnaas_client.create_vpnservice(**kwargs)
self.addCleanup(
test_utils.call_and_ignore_notfound_exc,
self.vpnaas_client.delete_vpnservice,
vpn_service.get('vpnservice')['id'])
if CONF.network.backend == 'nsxp':
nsx_router = self.nsxp_client.get_logical_router(router["name"],
router["id"])
vpn_service = self.nsxp_client.get_vpn_service(router["name"],
router["id"])
self.assertEqual(vpn_service[0].get('tags')[0].get("tag"),
nsx_router["id"])
self.assertEqual(vpn_service[0].get('resource_type'),
'IPSecVpnService')
else:
routers = self.nsx_client.get_logical_routers()
vpn_services = self.nsx_client.get_vpn_services()
for rtr in routers:
for vpn in vpn_services:
if vpn['logical_router_id'] == rtr["id"]:
self.assertEqual((vpn.get('tags')[0]).get('tag'),
rtr["id"])
self.assertEqual(vpn['resource_type'],
'IPSecVPNService')
break
break
@decorators.idempotent_id('a68cd562-1df1-44e6-bb8b-f1ed7a1f0e2e')
def test_vpn_basic_ops(self):
"""
Test vpnaasv2 api to create icmp rule/policy/group and update it and
verifying its values
"""
network_topology = self.create_network_topo()
self.create_vpn_basic_topo(network_topology)
@decorators.idempotent_id('5802b98f-f006-43c0-a1f7-5926035eb2b9')
def test_try_to_delete_vpn_service_when_site_connection_active(self):
network_topology = self.create_network_topo(cidr="37.5.0.0/24")
vpn_topo = self.create_vpn_basic_topo(network_topology, "test")
vpn_service = vpn_topo['vpn_service']
self.assertRaises(
lib_exc.Conflict, self.vpnaas_client.delete_vpnservice,
vpn_service.get('vpnservice')['id'])
@decorators.idempotent_id('4602b98f-f006-43c0-a1f7-5926035eb2b9')
def test_try_to_delete_ike_when_site_connection_active(self):
network_topology = self.create_network_topo(cidr="37.6.0.0/24")
vpn_topo = self.create_vpn_basic_topo(network_topology, "test")
ike = vpn_topo['ike']
self.assertRaises(
lib_exc.Conflict, self.vpnaas_client.delete_ikepolicy,
ike.get('ikepolicy')['id'])
@decorators.idempotent_id('4502b98f-f006-43c0-a1f7-5926035eb2b9')
def test_try_to_delete_ipsec_when_site_connection_active(self):
network_topology = self.create_network_topo(cidr="37.9.0.0/24")
vpn_topo = self.create_vpn_basic_topo(network_topology, "test")
ipsec = vpn_topo['ipsec']
self.assertRaises(
lib_exc.Conflict, self.vpnaas_client.delete_ipsecpolicy,
ipsec.get('ipsecpolicy')['id'])
@decorators.idempotent_id('1902b98f-f006-43c0-a1f7-5926035eb2b9')
def test_delete_vpn_ops(self):
network_topology = self.create_network_topo(cidr="37.10.0.0/24")
vpn_topo = self.create_vpn_basic_topo(network_topology, "test")
ipsecpolicy = vpn_topo['ipsec']
vpnservice = vpn_topo['vpn_service']
ikepolicy = vpn_topo['ike']
endpoint = vpn_topo['endpoint']
self.vpnaas_client.delete_ipsec_site_connection(
endpoint.get("ipsec_site_connection")['id'])
self.vpnaas_client.delete_ikepolicy(ikepolicy.get('ikepolicy')['id'])
self.vpnaas_client.delete_ipsecpolicy(
ipsecpolicy.get('ipsecpolicy')['id'])
self.vpnaas_client.delete_vpnservice(
vpnservice.get('vpnservice')['id'])
@decorators.idempotent_id('2022b98f-f006-43c0-a1f7-5926035eb2b9')
def test_peer_endpoint_delete_at_the_backend(self):
flag = 0
network_topology = self.create_network_topo(cidr="37.2.0.0/24")
vpn_topo = self.create_vpn_basic_topo(network_topology)
site = vpn_topo["endpoint"]
dpd_info = self.nsx_client.get_dpd_profiles()
peer_endpoints = self.nsx_client.get_peer_endpoints()
for dpd in dpd_info:
if dpd is not None and dpd.get("tags"):
if dpd.get("tags")[0]["tag"] == \
site.get('ipsec_site_connection')['id']:
dpd_profile = dpd["id"]
break
continue
for end in peer_endpoints:
if end.get("tags")[0]["tag"] == \
site.get('ipsec_site_connection')['id']:
if end['dpd_profile_id'] == dpd_profile:
flag = 1
self.assertEqual(end['peer_id'], '172.24.4.12')
self.assertEqual(
"IPSecVPNPeerEndpoint",
end.get("resource_type"))
break
if flag == 0:
raise Exception('dpd_profile_id doesnt match with endpoint_id')
if flag == 1:
break
flag = 0
endpoint = vpn_topo['endpoint']
self.vpnaas_client.delete_ipsec_site_connection(
endpoint.get("ipsec_site_connection")['id'])
peer_endpoints = self.nsx_client.get_peer_endpoints()
if 'result_count' in peer_endpoints.keys() and \
peer_endpoints.get('result_count') == 0:
pass
else:
for end in peer_endpoints:
if end.get("tags")[0]["tag"] == \
site.get('ipsec_site_connection')['id']:
if end['dpd_profile_id'] == dpd_profile:
flag = 1
self.assertEqual(end['peer_id'], '172.24.4.12')
self.assertEqual(
"IPSecVPNPeerEndpoint",
end.get("resource_type"))
if flag == 1:
raise Exception('rtr_id doesnt match with endpoint_id')
@decorators.idempotent_id('1092b98f-f006-43c0-a1f7-5926035eb2b9')
def test_local_endpoint_delete_at_the_backend(self):
flag = 0
network_topology = self.create_network_topo(cidr="37.14.0.0/24")
router = network_topology["router"]
vpn_topo = self.create_vpn_basic_topo(network_topology)
if CONF.network.backend == 'nsxp':
local_endpoint = self.nsxp_client.get_local_endpoint(
router["name"], router["id"])
if local_endpoint[0].get('tags')[0].get("tag") == \
router["id"]:
self.assertIsNotNone(local_endpoint[0].get("local_address"))
self.assertIsNotNone(local_endpoint[0].get("local_id"))
flag = 1
else:
local_endpoints = self.nsx_client.get_local_endpoints()
for local in local_endpoints:
if local is not None and local.get("tags"):
if local.get("tags")[0]["tag"] == \
router["id"]:
self.assertIsNotNone(local["local_address"])
self.assertIsNotNone(local["local_id"])
flag = 1
break
if flag == 1:
pass
else:
raise Exception('rtr_id doesnt match with endpoint_id')
endpoint = vpn_topo['endpoint']
vpnservice = vpn_topo['vpn_service']
self.vpnaas_client.delete_ipsec_site_connection(
endpoint.get("ipsec_site_connection")['id'])
self.vpnaas_client.delete_vpnservice(
vpnservice.get('vpnservice')['id'])
flag = 0
if CONF.network.backend == 'nsxp':
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
try:
local_endpoints = self.nsxp_client.get_local_endpoint(
router["name"], router["id"])
self.assertNone(local_endpoints)
except IndexError:
raise Exception('local endpoint not deleted from backend')
else:
local_endpoints = self.nsx_client.get_local_endpoints()
if 'result_count' in local_endpoints.keys() and \
local_endpoints.get('result_count') == 0:
pass
else:
for local in local_endpoints:
if local is not None and local.get("tags"):
if local.get("tags")[0]["tag"] == \
router["id"]:
self.assertIsNotNone(local["local_address"])
self.assertIsNotNone(local["local_id"])
flag = 1
break
if flag == 1:
raise Exception('local endpoint not deleted from backend')
@decorators.idempotent_id('7022b98f-f006-43c0-a1f7-5926035eb212')
def test_vpn_service_delete_at_the_backend(self):
flag = 0
network_topology = self.create_network_topo(cidr="37.12.0.0/24")
vpn_topo = self.create_vpn_basic_topo(network_topology, "test-delete")
router = network_topology["router"]
if CONF.network.backend == 'nsxp':
nsx_router = self.nsxp_client.get_logical_router(router["name"],
router["id"])
vpn_service = self.nsxp_client.get_vpn_service(router["name"],
router["id"])
self.assertEqual(vpn_service[0].get('tags')[0].get("tag"),
nsx_router["id"])
self.assertEqual(vpn_service[0].get('resource_type'),
'IPSecVpnService')
else:
routers = self.nsx_client.get_logical_routers()
vpn_services = self.nsx_client.get_vpn_services()
for rtr in routers:
for vpn in vpn_services:
if vpn['logical_router_id'] == rtr["id"]:
self.assertEqual(vpn['logical_router_id'], rtr["id"])
self.assertEqual(vpn['resource_type'],
'IPSecVPNService')
break
break
vpnservice = vpn_topo['vpn_service']
endpoint = vpn_topo['endpoint']
self.vpnaas_client.delete_ipsec_site_connection(
endpoint.get("ipsec_site_connection")['id'])
self.vpnaas_client.delete_vpnservice(
vpnservice.get('vpnservice')['id'])
if CONF.network.backend == 'nsxp':
rtr_name = network_topology["router"]["name"]
rtr_id = network_topology["router"]["id"]
router = self.nsxp_client.get_logical_router(rtr_name, rtr_id)
vpn_service = self.nsxp_client.get_vpn_service(rtr_name, rtr_id)
if len(vpn_service) != 0:
flag = 1
else:
vpn_services = self.nsx_client.get_vpn_services()
if 'result_count' in vpn_services.keys() and \
vpn_services['result_count'] == 0:
pass
else:
for rtr in routers:
for vpn in vpn_services:
if vpn['logical_router_id'] == rtr["id"]:
self.assertEqual(vpn['logical_router_id'],
rtr["id"])
self.assertEqual(vpn['resource_type'],
'IPSecVPNService')
flag = 1
break
if flag == 1:
break
if flag == 1:
raise Exception('vpn service not deleted from backend')
@decorators.idempotent_id('747c5864-409f-4ac4-bdbb-b74d7c618504')
def test_vpn_dpd_ike_ipsec_check_at_the_backend(self):
network_topology = self.create_network_topo(cidr="39.0.0.0/24")
vpn_topo = self.create_vpn_basic_topo(network_topology, "test")
site = vpn_topo["endpoint"]
if CONF.network.backend == 'nsxp':
dpd_info = self.nsxp_client.get_dpd_profiles()
else:
dpd_info = self.nsx_client.get_dpd_profiles()
for dpd in dpd_info:
if dpd is not None and dpd.get("tags"):
if dpd.get("tags")[0]["tag"] == \
site.get('ipsec_site_connection')['id']:
self.assertIn(
"site-conn-test-dpd-profile",
dpd["display_name"])
self.assertEqual(
"os-vpn-connection-id",
dpd.get("tags")[0]["scope"])
break
if CONF.network.backend == 'nsxp':
ike_info = self.nsxp_client.get_ike_profiles()
else:
ike_info = self.nsx_client.get_ike_profiles()
for ike in ike_info:
if ike is not None and ike.get("tags"):
if ike.get("tags")[0]["tag"] == \
site.get('ipsec_site_connection')['id']:
self.assertEqual(
ike.get('resource_type'),
"IPSecVPNIKEProfile")
self.assertEqual(
ike.get('encryption_algorithms'),
[u'AES_128'])
self.assertEqual(ike.get('ike_version'), 'IKE_V1')
self.assertEqual(ike.get('dh_groups'), [u'GROUP14'])
break
if CONF.network.backend == 'nsxp':
ipsec_info = self.nsxp_client.get_ipsec_profiles()
else:
ipsec_info = self.nsx_client.get_ipsec_profiles()
for ipsec in ipsec_info:
if ipsec is not None and ipsec.get("tags"):
if ipsec.get("tags")[0]["tag"] == \
site.get('ipsec_site_connection')['id']:
self.assertEqual(
ipsec.get('resource_type'),
"PolicyBasedIPSecVPNSession")
cidr = vpn_topo['endpoint'].get(
'ipsec_site_connection')['peer_cidrs']
peer_cidr = [{u'subnet': u'%s' % cidr[0]}]
self.assertEqual(
ipsec.get('policy_rules')[0]['destinations'],
peer_cidr)
self.assertEqual(
"os-vpn-connection-id",
ipsec.get("tags")[0]["scope"])
break
@decorators.idempotent_id('cdb7333a-94c0-487f-9602-3bd990128a0f')
def test_vpn_dpd_ike_ipsec_update_at_the_backend(self):
kwargs = {}
kwargs[
'ikepolicy'] = dict(name=data_utils.rand_name("ike-policy-"),
pfs=constants.PFS,
encryption_algorithm=constants.ENCRYPTION_ALGO,
lifetime=constants.LIFETIME)
ike = self.vpnaas_client.create_ikepolicy(**kwargs)
kwargs['ikepolicy'] = \
dict(name="ike-new", ike_version="v2",
encryption_algorithm=constants.ENCRYPTION_ALGO_256,
auth_algorithm=constants.AUTH_ALGO_256)
self.vpnaas_client.update_ikepolicy(ike['ikepolicy']['id'],
**kwargs)
network_topology = self.create_network_topo(cidr="37.1.0.0/24")
vpn_topo = self.create_vpn_basic_topo(
network_topology, "test-2", ike=ike)
if CONF.network.backend == 'nsxp':
ike_info = self.nsxp_client.get_ike_profiles()
else:
ike_info = self.nsx_client.get_ike_profiles()
site = vpn_topo["endpoint"]
for ike_p in ike_info:
if ike_p is not None and ike_p.get("tags"):
if ike_p.get("tags")[0]["tag"] == \
site.get('ipsec_site_connection')['id']:
self.assertEqual(ike_p.get('display_name'), "ike-new")
self.assertEqual(ike_p.get('ike_version'), "IKE_V2")
self.assertEqual(
ike_p.get('encryption_algorithms'),
[u'AES_256'])
self.assertEqual(
ike_p.get('digest_algorithms'),
[u'SHA2_256'])
break
@decorators.idempotent_id('a0a87543-fb0a-4c7a-897f-b5cd835de843')
def test_vpn_service_update_at_the_backend(self):
flag = 0
network_topology = self.create_network_topo(cidr="37.1.0.0/24")
vpn_topo = self.create_vpn_basic_topo(network_topology, "test-4")
router = network_topology['router']
kwargs = {}
kwargs['vpnservice'] = dict(name="vpn-new", admin_state_up='false')
self.vpnaas_client.update_vpnservice(
vpn_topo['vpn_service'].get('vpnservice')['id'],
**kwargs)
if CONF.network.backend == 'nsxp':
ipsec_session = self.nsxp_client.get_ipsec_session(router["name"],
router["id"])
self.assertEqual(ipsec_session[0].get('resource_type'),
'PolicyBasedIPSecVpnSession')
self.assertEqual(ipsec_session[0].get('enabled'), False)
else:
routers = self.nsx_client.get_logical_routers()
vpn_services = self.nsx_client.get_vpn_service()
for rtr in routers:
for vpn in vpn_services:
if vpn['logical_router_id'] == rtr["id"]:
self.assertEqual(vpn['logical_router_id'], rtr["id"])
self.assertEqual(vpn['resource_type'],
'IPSecVPNService')
self.assertEqual(vpn['enabled'], True)
flag = 1
break
if flag == 1:
break
# ToDO testcase need to add
# def test_vpn_site_update_at_the_backend
@decorators.idempotent_id('00c8679d-68cd-49a9-b8b7-0dba1b675298')
def test_vpn_service_check_at_the_backend(self):
flag = 0
network_topology = self.create_network_topo(cidr="37.1.0.0/24")
self.create_vpn_basic_topo(network_topology, "test-2")
router = network_topology["router"]
if CONF.network.backend == 'nsxp':
nsx_router = self.nsxp_client.get_logical_router(router["name"],
router["id"])
vpn_service = self.nsxp_client.get_vpn_service(router["name"],
router["id"])
self.assertEqual(vpn_service[0].get('tags')[0].get("tag"),
nsx_router["id"])
self.assertEqual(vpn_service[0].get('resource_type'),
'IPSecVpnService')
else:
routers = self.nsx_client.get_logical_routers()
vpn_services = self.nsx_client.get_vpn_services()
for rtr in routers:
for vpn in vpn_services:
if vpn['logical_router_id'] == rtr["id"]:
self.assertEqual(vpn['logical_router_id'], rtr["id"])
self.assertEqual(vpn['resource_type'],
'IPSecVPNService')
flag = 1
break
if flag == 1:
break
tunnel_profiles = self.nsx_client.get_tunnel_profiles()
for tunnel in tunnel_profiles:
if tunnel is not None and tunnel.get("tags"):
if tunnel.get("tags")[0]["tag"] == tunnel['id']:
self.assertEqual(
"IPSecVPNTunnelProfile",
tunnel.get("resource_type"))
self.assertEqual("ESP",
tunnel.get("transform_protocol"))
self.assertEqual(
[u'AES_128'],
tunnel.get("encryption_algorithms"))
self.assertEqual(
"TUNNEL_MODE",
tunnel.get("encapsulation_mode"))
self.assertEqual(tunnel.get('dh_groups'),
[u'GROUP14'])
break
@decorators.idempotent_id('f446a67a-4d09-4d5f-adff-cc497882d866')
def test_vpn_site_connection_at_the_backend(self):
flag = 1
network_topology = self.create_network_topo(cidr="37.2.0.0/24")
router = network_topology["router"]
vpn_topo = self.create_vpn_basic_topo(network_topology)
site = vpn_topo["endpoint"]
if CONF.network.backend == 'nsxp':
dpd_info = self.nsxp_client.get_dpd_profiles()
else:
dpd_info = self.nsx_client.get_dpd_profiles()
for dpd in dpd_info:
if dpd is not None and dpd.get("tags"):
if dpd.get("tags")[0]["tag"] == \
site.get('ipsec_site_connection')['id']:
dpd_profile = dpd["id"]
break
continue
if CONF.network.backend == 'nsxv3':
peer_endpoints = self.nsx_client.get_peer_endpoints()
for end in peer_endpoints:
if end.get("tags")[0]["tag"] == \
site.get('ipsec_site_connection')['id']:
if end['dpd_profile_id'] == dpd_profile:
flag = 1
self.assertEqual(end['peer_id'], '172.24.4.12')
self.assertEqual(end.get("resource_type"),
"IPSecVPNPeerEndpoint")
break
if flag == 0:
raise Exception('dpd_profile_id doesnt match \
with endpoint_id')
if flag == 1:
break
flag = 0
if CONF.network.backend == 'nsxp':
local_endpoint = self.nsxp_client.get_local_endpoint(
router["name"], router["id"])
if local_endpoint[0].get('tags')[0].get("tag") == \
router["id"]:
self.assertIsNotNone(local_endpoint[0].get("local_address"))
self.assertIsNotNone(local_endpoint[0].get("local_id"))
flag = 1
else:
local_endpoints = self.nsx_client.get_local_endpoints()
for local in local_endpoints:
if local is not None and local.get("tags"):
if local.get("tags")[0]["tag"] == \
network_topology["router"]["id"]:
self.assertIsNotNone(local["local_address"])
self.assertIsNotNone(local["local_id"])
flag = 1
break
if flag == 1:
pass
else:
raise Exception('rtr_id doesnt match with endpoint_id')
@decorators.idempotent_id('eb953c67-8d8a-4ac5-b7c3-3c18270b50ce')
def test_vpn_basic_invalid_pfs(self):
network_topology = self.create_network_topo()
try:
self.create_vpn_basic_topo(network_topology, pfs="group5")
except exceptions.ServerFault:
LOG.info(
"Invalid VPN configuration: Unsupported pfs: "
" group5 in IKE policy.")
pass
@decorators.idempotent_id('1c034ea4-d8e6-41d0-b963-33dd5053476b')
def test_vpn_basic_algo_aes256(self):
network_topology = self.create_network_topo()
try:
self.create_vpn_basic_topo(
network_topology,
encryption_algorithm="aes-256")
except exceptions.ServerFault:
LOG.info(
"Invalid VPN configuration: Unsupported algo: aes-256 "
" is not supported.")
pass
@decorators.idempotent_id('885e1cda-b21d-4dec-8751-2a1f4e91773e')
def test_vpn_basic_invalid_algo(self):
kwargs = {}
try:
kwargs[
'ipsecpolicy'] = \
dict(name=data_utils.rand_name("ipsec-policy-"),
pfs="group14", encryption_algorithm="aes-512")
self.vpnaas_client.create_ipsecpolicy(**kwargs)
except exceptions.BadRequest:
pass
@decorators.idempotent_id('1e6f2f25-de83-4ee2-a30f-0f833da0c741')
def test_vpn_basic_invalid_pfs_value(self):
kwargs = {}
try:
kwargs[
'ipsecpolicy'] = \
dict(name=data_utils.rand_name("ipsec-policy-"),
pfs="group-14")
self.vpnaas_client.create_ipsecpolicy(**kwargs)
except exceptions.BadRequest:
pass
@decorators.idempotent_id('f4bea30b-76df-4dc2-a624-20621b8e0ef7')
def test_vpn_site_conenction_update_ops(self):
network_topo = self.create_network_topo(cidr="34.0.0.0/24")
vpn_topo = self.create_vpn_basic_topo(network_topo)
site = vpn_topo['endpoint']
kwargs = {}
kwargs["ipsec_site_connection"] = dict(psk="new-secret")
self.vpnaas_client.update_ipsec_site_connections(
site.get('ipsec_site_connection')['id'],
**kwargs)
site_data = self.vpnaas_client.show_ipsec_site_connections(
endpoint_id=site.get('ipsec_site_connection')['id'])
self.assertEqual(
"new-secret",
site_data['ipsec_site_connection']['psk'])
kwargs = {}
kwargs["ipsec_site_connection"] = dict(
admin_state_up='False',
description="New Vpn site")
self.vpnaas_client.update_ipsec_site_connections(
site.get('ipsec_site_connection')['id'],
**kwargs)
site_data = self.vpnaas_client.show_ipsec_site_connections(
endpoint_id=site.get('ipsec_site_connection')['id'])
self.assertEqual(
False,
site_data[
'ipsec_site_connection'][
'admin_state_up'])
self.assertEqual(
"New Vpn site",
site_data[
'ipsec_site_connection'][
'description'])
@decorators.idempotent_id('8fbf9280-d154-425f-ad26-0a1250e0dd91')
def test_vpn_site_conenction_wrong_dpd_info(self):
network_topo = self.create_network_topo(cidr="35.0.0.0/24")
vpn_topo = self.create_vpn_basic_topo(network_topo)
site = vpn_topo['endpoint']
kwargs = {}
kwargs["ipsec_site_connection"] = dict(action="disabled", timeout=1)
self.assertRaises(
lib_exc.BadRequest,
self.vpnaas_client.update_ipsec_site_connections,
site.get('ipsec_site_connection')['id'], **kwargs
)
kwargs["ipsec_site_connection"] = dict(action="restart-by-peer")
self.assertRaises(
lib_exc.BadRequest,
self.vpnaas_client.update_ipsec_site_connections,
site.get('ipsec_site_connection')['id'], **kwargs
)
kwargs["ipsec_site_connection"] = \
{"dpd": {"action": "hold", "timeout": 300}}
site_data = self.vpnaas_client.update_ipsec_site_connections(
site.get('ipsec_site_connection')['id'], **kwargs)
self.assertEqual(
"hold",
site_data['ipsec_site_connection'].get("dpd")["action"])
@decorators.idempotent_id('ea4fedae-4727-4524-a74a-0078d7fbfdd9')
def test_vpn_basic_update_ops(self):
network_topo = self.create_network_topo()
vpn_topo = self.create_vpn_basic_topo(network_topo)
vpn_service = vpn_topo['vpn_service']
kwargs = {}
kwargs['vpnservice'] = dict(admin_state_up='false')
self.vpnaas_client.update_vpnservice(
vpnservice_id=vpn_service.get('vpnservice')['id'],
**kwargs)
kwargs['vpnservice'] = dict(admin_state_up='true', description="vpn")
self.vpnaas_client.update_vpnservice(
vpnservice_id=vpn_service.get('vpnservice')['id'],
**kwargs)
@decorators.idempotent_id('d576c487-e7d5-4698-8a17-ea5521607675')
def test_vpn_ike_policy_update(self):
network_topo = self.create_network_topo(cidr="36.0.0.0/24")
vpn_topo = self.create_vpn_basic_topo(network_topo)
try:
kwargs = {}
kwargs['ikepolicy'] = dict(pfs="group5")
self.vpnaas_client.update_ikepolicy(
vpn_topo['ike'].get('ikepolicy')['id'],
**kwargs)
except exceptions.Conflict:
LOG.info(
"IKEPolicy is in use by existing IPsecSiteConnection and "
" can't be updated or deleted")
@decorators.idempotent_id('d576c487-e7d5-4698-8a17-ea4521907675')
def test_vpn_endpoint_group(self):
network_topo = self.create_network_topo(cidr="37.0.0.0/24")
router = network_topo['router']
self.create_vpn_basic_topo_endpoint_group(network_topo)
ipsec_session = self.nsxp_client.get_ipsec_session(router["name"],
router["id"])
self.assertEqual(ipsec_session[0].get('resource_type'),
'PolicyBasedIPSecVpnSession')
nsx_router = self.nsxp_client.get_logical_router(router["name"],
router["id"])
vpn_service = self.nsxp_client.get_vpn_service(router["name"],
router["id"])
self.assertEqual(vpn_service[0].get('tags')[0].get("tag"),
nsx_router["id"])
self.assertEqual(vpn_service[0].get('resource_type'),
'IPSecVpnService')
@decorators.idempotent_id('d576c487-e7d5-4698-8a17-fa4521907675')
def test_vpn_endpoint_group_snat(self):
network_topo = self.create_network_topo(cidr="37.0.0.0/24",
enable_snat="True")
router = network_topo['router']
self.create_vpn_basic_topo_endpoint_group(network_topo)
ipsec_session = self.nsxp_client.get_ipsec_session(router["name"],
router["id"])
self.assertEqual(ipsec_session[0].get('resource_type'),
'PolicyBasedIPSecVpnSession')
nsx_router = self.nsxp_client.get_logical_router(router["name"],
router["id"])
vpn_service = self.nsxp_client.get_vpn_service(router["name"],
router["id"])
self.assertEqual(vpn_service[0].get('tags')[0].get("tag"),
nsx_router["id"])
self.assertEqual(vpn_service[0].get('resource_type'),
'IPSecVpnService')