Complete move of neutron-vpnaas tempest tests to tempest plugin

As discussed in the neutron_ci meeting [1] the QA team would like to
move the tempest tests for the stadium projects from their repos to
repos specific to being tempest plugins. This is the second part of a two
stage move, by removing the tempest tests that were copied to the
neutron-tempest-plugin repo [2].

[1] http://eavesdrop.openstack.org/meetings/neutron_ci/2019/neutron_ci.2019-03-12-16.01.log.html#l-94
[2] https://etherpad.openstack.org/p/neutron_stadium_move_to_tempest_plugin_repo

Depends-On: https://review.opendev.org/#/c/649373
Change-Id: Iafc2cd1790175d58b3016624b8451c8bfceafa32
This commit is contained in:
Miguel Lavalle
2019-11-24 19:42:24 -06:00
parent e0ed58c46c
commit 4077093c10
13 changed files with 11 additions and 1504 deletions

View File

@@ -11,43 +11,25 @@
check:
jobs:
- neutron-vpnaas-dsvm-functional-sswan
- neutron-vpnaas-tempest
- neutron-vpnaas-tempest-libreswan-centos
- neutron-tempest-plugin-vpnaas
- neutron-tempest-plugin-vpnaas-libreswan-centos:
# TODO(mlavalle) switch to voting when this job is moved to Centos
# 8
voting: false
gate:
jobs:
- neutron-vpnaas-dsvm-functional-sswan
- neutron-vpnaas-tempest
- neutron-vpnaas-tempest-libreswan-centos
- neutron-tempest-plugin-vpnaas
# TODO(mlavalle) uncomment following line when the job is moved to
# Centos 8
# - neutron-tempest-plugin-vpnaas-libreswan-centos
experimental:
jobs:
- neutron-vpnaas-dsvm-rally
- job:
name: neutron-vpnaas-tempest
parent: devstack-tempest
timeout: 3900
required-projects:
- openstack/neutron
- openstack/tempest
- openstack/neutron-tempest-plugin
vars:
tempest_test_regex: ^neutron_vpnaas\.
tox_envlist: all-plugin
devstack_plugins:
neutron-vpnaas: https://opendev.org/openstack/neutron-vpnaas
neutron-tempest-plugin: https://opendev.org/openstack/neutron-tempest-plugin
devstack_localrc:
IPSEC_PACKAGE: strongswan
USE_PYTHON3: true
irrelevant-files:
- ^.*\.rst$
- ^doc/.*$
- ^neutron_vpnaas/tests/unit/.*$
- ^releasenotes/.*$
- job:
name: neutron-vpnaas-tempest-libreswan-centos
parent: neutron-vpnaas-tempest
name: neutron-tempest-plugin-vpnaas-libreswan-centos
parent: neutron-tempest-plugin-vpnaas
nodeset: devstack-single-node-centos-7
vars:
devstack_localrc:

View File

@@ -1,6 +0,0 @@
===============================================
Tempest Integration of neutron-vpnaas
===============================================
This directory contains Tempest tests to cover the neutron-vpnaas project.

View File

@@ -1,161 +0,0 @@
# Copyright 2012 OpenStack Foundation
# Copyright 2016 Hewlett Packard Enterprise Development Company LP
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest.lib.common.utils import data_utils
from neutron_tempest_plugin.api import base
from neutron_tempest_plugin import config
from neutron_vpnaas.tests.tempest.api import clients
CONF = config.CONF
class BaseNetworkTest(base.BaseNetworkTest):
@classmethod
def resource_setup(cls):
super(BaseNetworkTest, cls).resource_setup()
cls.vpnservices = []
cls.ikepolicies = []
cls.ipsecpolicies = []
cls.ipsec_site_connections = []
cls.endpoint_groups = []
@classmethod
def get_client_manager(cls, credential_type=None, roles=None,
force_new=None):
manager = super(BaseNetworkTest, cls).get_client_manager(
credential_type=credential_type,
roles=roles,
force_new=force_new)
# Neutron uses a different clients manager than the one in the Tempest
return clients.Manager(manager.credentials)
@classmethod
def resource_cleanup(cls):
if CONF.service_available.neutron:
# Clean up ipsec connections
for ipsec_site_connection in cls.ipsec_site_connections:
cls._try_delete_resource(
cls.client.delete_ipsec_site_connection,
ipsec_site_connection['id'])
# Clean up ipsec endpoint group
for endpoint_group in cls.endpoint_groups:
cls._try_delete_resource(cls.client.delete_endpoint_group,
endpoint_group['id'])
# Clean up ipsec policies
for ipsecpolicy in cls.ipsecpolicies:
cls._try_delete_resource(cls.client.delete_ipsecpolicy,
ipsecpolicy['id'])
# Clean up ike policies
for ikepolicy in cls.ikepolicies:
cls._try_delete_resource(cls.client.delete_ikepolicy,
ikepolicy['id'])
# Clean up vpn services
for vpnservice in cls.vpnservices:
cls._try_delete_resource(cls.client.delete_vpnservice,
vpnservice['id'])
super(BaseNetworkTest, cls).resource_cleanup()
@classmethod
def create_vpnservice(cls, subnet_id, router_id, name=None):
"""Wrapper utility that returns a test vpn service."""
if name is None:
name = data_utils.rand_name("vpnservice-")
body = cls.client.create_vpnservice(
subnet_id=subnet_id, router_id=router_id, admin_state_up=True,
name=name)
vpnservice = body['vpnservice']
cls.vpnservices.append(vpnservice)
return vpnservice
@classmethod
def create_vpnservice_no_subnet(cls, router_id):
"""Wrapper utility that returns a test vpn service."""
body = cls.client.create_vpnservice(
router_id=router_id, admin_state_up=True,
name=data_utils.rand_name("vpnservice-"))
vpnservice = body['vpnservice']
cls.vpnservices.append(vpnservice)
return vpnservice
@classmethod
def create_ikepolicy(cls, name):
"""Wrapper utility that returns a test ike policy."""
body = cls.client.create_ikepolicy(name=name)
ikepolicy = body['ikepolicy']
cls.ikepolicies.append(ikepolicy)
return ikepolicy
@classmethod
def create_ipsecpolicy(cls, name):
"""Wrapper utility that returns a test ipsec policy."""
body = cls.client.create_ipsecpolicy(name=name)
ipsecpolicy = body['ipsecpolicy']
cls.ipsecpolicies.append(ipsecpolicy)
return ipsecpolicy
@classmethod
def create_ipsec_site_connection(cls, ikepolicy_id, ipsecpolicy_id,
vpnservice_id, psk="secret",
peer_address="172.24.4.233",
peer_id="172.24.4.233",
peer_cidrs=None,
name=None):
"""Wrapper utility that returns a test vpn connection."""
if peer_cidrs is None:
peer_cidrs = ['1.1.1.0/24', '2.2.2.0/24']
if name is None:
name = data_utils.rand_name("ipsec_site_connection-")
body = cls.client.create_ipsec_site_connection(
psk=psk,
initiator="bi-directional",
ipsecpolicy_id=ipsecpolicy_id,
admin_state_up=True,
mtu=1500,
ikepolicy_id=ikepolicy_id,
vpnservice_id=vpnservice_id,
peer_address=peer_address,
peer_id=peer_id,
peer_cidrs=peer_cidrs,
name=name)
ipsec_site_connection = body['ipsec_site_connection']
cls.ipsec_site_connections.append(ipsec_site_connection)
return ipsec_site_connection
@classmethod
def create_endpoint_group(cls, name, type, endpoints):
"""Wrapper utility that returns a test ipsec policy."""
body = cls.client.create_endpoint_group(
endpoints=endpoints,
type=type,
description='endpoint type:' + type,
name=name)
endpoint_group = body['endpoint_group']
cls.endpoint_groups.append(endpoint_group)
return endpoint_group
class BaseAdminNetworkTest(BaseNetworkTest):
credentials = ['primary', 'admin']
@classmethod
def setup_clients(cls):
super(BaseAdminNetworkTest, cls).setup_clients()
cls.admin_client = cls.os_admin.network_client
cls.identity_admin_client = cls.os_admin.tenants_client

View File

@@ -1,70 +0,0 @@
# Copyright 2012 OpenStack Foundation
# Copyright 2016 Hewlett Packard Enterprise Development Company
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron_tempest_plugin.api import clients as manager
from neutron_tempest_plugin import config
from neutron_tempest_plugin.services.network.json import network_client
CONF = config.CONF
class NetworkClient(network_client.NetworkClientJSON):
def pluralize(self, resource_name):
resource_plural_map = {
'ikepolicy': 'ikepolicies',
'ipsecpolicy': 'ipsecpolicies'
}
if resource_name in resource_plural_map:
return resource_plural_map.get(resource_name)
return super(NetworkClient, self).pluralize(resource_name)
def get_uri(self, plural_name):
# get service prefix from resource name
service_resource_prefix_list = [
'vpnservices',
'ikepolicies',
'ipsecpolicies',
'ipsec_site_connections',
'endpoint_groups',
]
if plural_name in service_resource_prefix_list:
plural_name = plural_name.replace("_", "-")
service_prefix = 'vpn'
uri = '%s/%s/%s' % (self.uri_prefix, service_prefix,
plural_name)
return uri
return super(NetworkClient, self).get_uri(plural_name)
class Manager(manager.Manager):
def __init__(self, credentials=None, service=None):
super(Manager, self).__init__(credentials=credentials)
self.network_client = NetworkClient(
self.auth_provider,
CONF.network.catalog_type,
CONF.network.region or CONF.identity.region,
endpoint_type=CONF.network.endpoint_type,
build_interval=CONF.network.build_interval,
build_timeout=CONF.network.build_timeout,
**self.default_params)

View File

@@ -1,879 +0,0 @@
# Copyright 2012,2016 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib.db import constants as db_const
from tempest.lib.common.utils import data_utils
from tempest.lib import decorators
from tempest.lib import exceptions as lib_exc
from tempest import test
from neutron_tempest_plugin import config
from neutron_vpnaas.tests.tempest.api import base
CONF = config.CONF
_LONG_NAME = 'x' * (db_const.NAME_FIELD_SIZE + 1)
_LONG_DESCRIPTION = 'y' * (db_const.DESCRIPTION_FIELD_SIZE + 1)
class VPNaaSTestJSON(base.BaseAdminNetworkTest):
"""
Tests the following operations in the Neutron API using the REST client for
Neutron:
List, Show, Create, Delete, and Update VPN Service
List, Show, Create, Delete, and Update IKE policy
List, Show, Create, Delete, and Update IPSec policy
"""
@classmethod
def resource_setup(cls):
if not test.is_extension_enabled('vpnaas', 'network'):
msg = "vpnaas extension not enabled."
raise cls.skipException(msg)
super(VPNaaSTestJSON, cls).resource_setup()
cls.ext_net_id = CONF.network.public_network_id
network_name = data_utils.rand_name('network-')
cls.network = cls.create_network(network_name)
cls.subnet = cls.create_subnet(cls.network)
cls.router = cls.create_router(
data_utils.rand_name("router"),
external_network_id=CONF.network.public_network_id)
cls.create_router_interface(cls.router['id'], cls.subnet['id'])
cls.vpnservice = cls.create_vpnservice(cls.subnet['id'],
cls.router['id'])
vpnservice2 = cls.create_vpnservice_no_subnet(cls.router['id'])
cls.vpnservice_no_subnet = vpnservice2
cls.ikepolicy = cls.create_ikepolicy(
data_utils.rand_name("ike-policy-"))
cls.ipsecpolicy = cls.create_ipsecpolicy(
data_utils.rand_name("ipsec-policy-"))
cls.endpoint_group_local = cls.create_endpoint_group(
data_utils.rand_name("endpoint-group-local-"),
'subnet',
cls.subnet['id'])
cls.endpoint_group_remote = cls.create_endpoint_group(
data_utils.rand_name("endpoint-group-remote-"),
'cidr',
["10.101.0.0/24", "10.102.0.0/24"])
cls.ipsec_site_connection = cls.create_ipsec_site_connection(
cls.ikepolicy['id'],
cls.ipsecpolicy['id'],
cls.vpnservice['id'])
def _delete_ike_policy(self, ike_policy_id):
# Deletes a ike policy and verifies if it is deleted or not
ike_list = list()
all_ike = self.client.list_ikepolicies()
for ike in all_ike['ikepolicies']:
ike_list.append(ike['id'])
if ike_policy_id in ike_list:
self.client.delete_ikepolicy(ike_policy_id)
# Asserting that the policy is not found in list after deletion
ikepolicies = self.client.list_ikepolicies()
ike_id_list = list()
for i in ikepolicies['ikepolicies']:
ike_id_list.append(i['id'])
self.assertNotIn(ike_policy_id, ike_id_list)
def _delete_ipsec_policy(self, ipsec_policy_id):
# Deletes an ike policy if it exists
try:
self.client.delete_ipsecpolicy(ipsec_policy_id)
except lib_exc.NotFound:
pass
def _delete_ipsec_site_connection(self, conn_id):
# Deletes an ipsec site connection if it exists
try:
self.client.delete_ipsec_site_connection(conn_id)
except lib_exc.NotFound:
pass
def _assertExpected(self, expected, actual):
# Check if not expected keys/values exists in actual response body
for key, value in expected.items():
self.assertIn(key, actual)
self.assertEqual(value, actual[key])
def _delete_vpn_service(self, vpn_service_id):
self.client.delete_vpnservice(vpn_service_id)
# Asserting if vpn service is found in the list after deletion
body = self.client.list_vpnservices()
vpn_services = [vs['id'] for vs in body['vpnservices']]
self.assertNotIn(vpn_service_id, vpn_services)
def _delete_endpoint_group(self, endpoint_group_id):
# Delete a endpoint-group and verifies if it is deleted or not
endpoint_group_list = list()
all_endpoint = self.client.list_endpoint_groups()
for endpoint in all_endpoint['endpoint_groups']:
endpoint_group_list.append(endpoint['id'])
if endpoint_group_id in endpoint_group_list:
self.client.delete_endpoint_group(endpoint_group_id)
# Asserting that the endpoint is not found in list after deletion
endpoint_group = self.client.list_endpoint_groups()
for e in endpoint_group['endpoint_groups']:
endpoint_group_list.append(e['id'])
self.assertNotIn(endpoint_group_list, endpoint_group_id)
def _get_tenant_id(self):
"""
Returns the tenant_id of the client current user
"""
return self.client.tenant_id
@decorators.attr(type='smoke')
def test_admin_create_ipsec_policy_for_tenant(self):
tenant_id = self._get_tenant_id()
# Create IPSec policy for the newly created tenant
name = data_utils.rand_name('ipsec-policy')
body = (self.admin_client.
create_ipsecpolicy(name=name, tenant_id=tenant_id))
ipsecpolicy = body['ipsecpolicy']
self.assertIsNotNone(ipsecpolicy['id'])
self.addCleanup(self.admin_client.delete_ipsecpolicy,
ipsecpolicy['id'])
# Assert that created ipsec policy is found in API list call
body = self.client.list_ipsecpolicies()
ipsecpolicies = [policy['id'] for policy in body['ipsecpolicies']]
self.assertIn(ipsecpolicy['id'], ipsecpolicies)
@decorators.attr(type='smoke')
def test_admin_create_vpn_service_for_tenant(self):
tenant_id = self._get_tenant_id()
# Create vpn service for the newly created tenant
network2 = self.create_network()
subnet2 = self.create_subnet(network2)
router2 = self.create_router(data_utils.rand_name('router-'),
external_network_id=self.ext_net_id)
self.create_router_interface(router2['id'], subnet2['id'])
name = data_utils.rand_name('vpn-service')
body = self.admin_client.create_vpnservice(
subnet_id=subnet2['id'],
router_id=router2['id'],
name=name,
admin_state_up=True,
tenant_id=tenant_id)
vpnservice = body['vpnservice']
self.assertIsNotNone(vpnservice['id'])
self.addCleanup(self.admin_client.delete_vpnservice, vpnservice['id'])
# Assert that created vpnservice is found in API list call
body = self.client.list_vpnservices()
vpn_services = [vs['id'] for vs in body['vpnservices']]
self.assertIn(vpnservice['id'], vpn_services)
@decorators.attr(type='smoke')
def test_admin_create_ike_policy_for_tenant(self):
tenant_id = self._get_tenant_id()
# Create IKE policy for the newly created tenant
name = data_utils.rand_name('ike-policy')
body = (self.admin_client.
create_ikepolicy(name=name, ike_version="v1",
encryption_algorithm="aes-128",
auth_algorithm="sha1",
tenant_id=tenant_id))
ikepolicy = body['ikepolicy']
self.assertIsNotNone(ikepolicy['id'])
self.addCleanup(self.admin_client.delete_ikepolicy, ikepolicy['id'])
# Assert that created ike policy is found in API list call
body = self.client.list_ikepolicies()
ikepolicies = [ikp['id'] for ikp in body['ikepolicies']]
self.assertIn(ikepolicy['id'], ikepolicies)
@decorators.attr(type='smoke')
def test_list_vpn_services(self):
# Verify the VPN service exists in the list of all VPN services
body = self.client.list_vpnservices()
vpnservices = body['vpnservices']
self.assertIn(self.vpnservice['id'], [v['id'] for v in vpnservices])
@decorators.attr(type='smoke')
def test_create_update_delete_vpn_service(self):
# Creates a VPN service and sets up deletion
network1 = self.create_network()
subnet1 = self.create_subnet(network1)
router1 = self.create_router(data_utils.rand_name('router-'),
external_network_id=self.ext_net_id)
self.create_router_interface(router1['id'], subnet1['id'])
name = data_utils.rand_name('vpn-service1')
body = self.client.create_vpnservice(subnet_id=subnet1['id'],
router_id=router1['id'],
name=name,
admin_state_up=True)
vpnservice = body['vpnservice']
self.addCleanup(self._delete_vpn_service, vpnservice['id'])
# Assert if created vpnservices are not found in vpnservices list
body = self.client.list_vpnservices()
vpn_services = [vs['id'] for vs in body['vpnservices']]
self.assertIsNotNone(vpnservice['id'])
self.assertIn(vpnservice['id'], vpn_services)
# TODO(raies): implement logic to update vpnservice
# VPNaaS client function to update is implemented.
# But precondition is that current state of vpnservice
# should be "ACTIVE" not "PENDING*"
@decorators.attr(type='smoke')
def test_show_vpn_service(self):
# Verifies the details of a vpn service
body = self.client.show_vpnservice(self.vpnservice['id'])
vpnservice = body['vpnservice']
self.assertEqual(self.vpnservice['id'], vpnservice['id'])
self.assertEqual(self.vpnservice['name'], vpnservice['name'])
self.assertEqual(self.vpnservice['description'],
vpnservice['description'])
self.assertEqual(self.vpnservice['router_id'], vpnservice['router_id'])
self.assertEqual(self.vpnservice['subnet_id'], vpnservice['subnet_id'])
self.assertEqual(self.vpnservice['tenant_id'], vpnservice['tenant_id'])
valid_status = ["ACTIVE", "DOWN", "BUILD", "ERROR", "PENDING_CREATE",
"PENDING_UPDATE", "PENDING_DELETE"]
self.assertIn(vpnservice['status'], valid_status)
@decorators.attr(type='smoke')
def test_list_ike_policies(self):
# Verify the ike policy exists in the list of all IKE policies
body = self.client.list_ikepolicies()
ikepolicies = body['ikepolicies']
self.assertIn(self.ikepolicy['id'], [i['id'] for i in ikepolicies])
@decorators.attr(type='smoke')
def test_create_update_delete_ike_policy(self):
# Creates a IKE policy
name = data_utils.rand_name('ike-policy')
body = (self.client.create_ikepolicy(
name=name,
ike_version="v1",
encryption_algorithm="aes-128",
auth_algorithm="sha1"))
ikepolicy = body['ikepolicy']
self.assertIsNotNone(ikepolicy['id'])
self.addCleanup(self._delete_ike_policy, ikepolicy['id'])
# Update IKE Policy
new_ike = {'name': data_utils.rand_name("New-IKE"),
'description': "Updated ike policy",
'encryption_algorithm': "aes-256",
'ike_version': "v2",
'pfs': "group14",
'lifetime': {'units': "seconds", 'value': 2000}}
self.client.update_ikepolicy(ikepolicy['id'], **new_ike)
# Confirm that update was successful by verifying using 'show'
body = self.client.show_ikepolicy(ikepolicy['id'])
ike_policy = body['ikepolicy']
for key, value in new_ike.items():
self.assertIn(key, ike_policy)
self.assertEqual(value, ike_policy[key])
# Verification of ike policy delete
self.client.delete_ikepolicy(ikepolicy['id'])
body = self.client.list_ikepolicies()
ikepolicies = [ikp['id'] for ikp in body['ikepolicies']]
self.assertNotIn(ike_policy['id'], ikepolicies)
@decorators.attr(type='smoke')
def test_show_ike_policy(self):
# Verifies the details of a ike policy
body = self.client.show_ikepolicy(self.ikepolicy['id'])
ikepolicy = body['ikepolicy']
self.assertEqual(self.ikepolicy['id'], ikepolicy['id'])
self.assertEqual(self.ikepolicy['name'], ikepolicy['name'])
self.assertEqual(self.ikepolicy['description'],
ikepolicy['description'])
self.assertEqual(self.ikepolicy['encryption_algorithm'],
ikepolicy['encryption_algorithm'])
self.assertEqual(self.ikepolicy['auth_algorithm'],
ikepolicy['auth_algorithm'])
self.assertEqual(self.ikepolicy['tenant_id'],
ikepolicy['tenant_id'])
self.assertEqual(self.ikepolicy['pfs'],
ikepolicy['pfs'])
self.assertEqual(self.ikepolicy['phase1_negotiation_mode'],
ikepolicy['phase1_negotiation_mode'])
self.assertEqual(self.ikepolicy['ike_version'],
ikepolicy['ike_version'])
@decorators.attr(type='smoke')
def test_list_ipsec_policies(self):
# Verify the ipsec policy exists in the list of all ipsec policies
body = self.client.list_ipsecpolicies()
ipsecpolicies = body['ipsecpolicies']
self.assertIn(self.ipsecpolicy['id'], [i['id'] for i in ipsecpolicies])
@decorators.attr(type='smoke')
def test_create_update_delete_ipsec_policy(self):
# Creates an ipsec policy
ipsec_policy_body = {'name': data_utils.rand_name('ipsec-policy'),
'pfs': 'group5',
'encryption_algorithm': "aes-128",
'auth_algorithm': 'sha1'}
resp_body = self.client.create_ipsecpolicy(**ipsec_policy_body)
ipsecpolicy = resp_body['ipsecpolicy']
self.addCleanup(self._delete_ipsec_policy, ipsecpolicy['id'])
self._assertExpected(ipsec_policy_body, ipsecpolicy)
# Verification of ipsec policy update
new_ipsec = {'description': 'Updated ipsec policy',
'pfs': 'group2',
'name': data_utils.rand_name("New-IPSec"),
'encryption_algorithm': "aes-256",
'lifetime': {'units': "seconds", 'value': '2000'}}
body = self.client.update_ipsecpolicy(ipsecpolicy['id'],
**new_ipsec)
updated_ipsec_policy = body['ipsecpolicy']
self._assertExpected(new_ipsec, updated_ipsec_policy)
# Verification of ipsec policy delete
self.client.delete_ipsecpolicy(ipsecpolicy['id'])
self.assertRaises(lib_exc.NotFound,
self.client.delete_ipsecpolicy, ipsecpolicy['id'])
@decorators.attr(type='smoke')
def test_show_ipsec_policy(self):
# Verifies the details of an ipsec policy
body = self.client.show_ipsecpolicy(self.ipsecpolicy['id'])
ipsecpolicy = body['ipsecpolicy']
self._assertExpected(self.ipsecpolicy, ipsecpolicy)
@decorators.attr(type=['negative', 'smoke'])
def test_create_vpnservice_long_name(self):
"""
Test excessively long name.
Without REST checks, this call would return 500 INTERNAL SERVER
error on internal db failure instead.
"""
name = _LONG_NAME
self.assertRaises(
lib_exc.BadRequest, self.client.create_vpnservice,
subnet_id=self.subnet['id'], router_id=self.router['id'],
name=name, admin_state_up=True)
@decorators.attr(type=['negative', 'smoke'])
def test_create_vpnservice_long_description(self):
name = data_utils.rand_name('vpn-service1')
description = _LONG_DESCRIPTION
self.assertRaises(
lib_exc.BadRequest, self.client.create_vpnservice,
subnet_id=self.subnet['id'], router_id=self.router['id'],
name=name, description=description, admin_state_up=True)
@decorators.attr(type='smoke')
def test_list_vpn_connections(self):
# Verify the VPN service exists in the list of all VPN services
body = self.client.list_ipsec_site_connections()
ipsec_site_connections = body['ipsec_site_connections']
self.assertIn(self.ipsec_site_connection['id'],
[v['id'] for v in ipsec_site_connections])
@decorators.attr(type='smoke')
def test_create_delete_vpn_connection_with_legacy_mode(self):
# Verify create VPN connection
name = data_utils.rand_name("ipsec_site_connection-")
body = self.client.create_ipsec_site_connection(
ipsecpolicy_id=self.ipsecpolicy['id'],
ikepolicy_id=self.ikepolicy['id'],
vpnservice_id=self.vpnservice['id'],
peer_address="172.24.4.233",
peer_id="172.24.4.233",
peer_cidrs=['10.1.1.0/24', '10.2.2.0/24'],
name=name,
mtu=1500,
admin_state_up=True,
initiator="bi-directional",
psk="secret")
ipsec_site_connection = body['ipsec_site_connection']
self.assertEqual(ipsec_site_connection['name'], name)
self.assertEqual(ipsec_site_connection['mtu'], 1500)
self.addCleanup(self._delete_ipsec_site_connection,
ipsec_site_connection['id'])
# Verification of IPsec connection delete
self.client.delete_ipsec_site_connection(ipsec_site_connection['id'])
body = self.client.list_ipsec_site_connections()
ipsec_site_connections = body['ipsec_site_connections']
self.assertNotIn(ipsec_site_connection['id'],
[v['id'] for v in ipsec_site_connections])
@decorators.attr(type=['negative', 'smoke'])
def test_create_vpn_connection_missing_peer_cidr(self):
# Verify create VPN connection with JSON missing peer cidr
# in legacy mode
name = data_utils.rand_name("ipsec_site_connection-")
self.assertRaises(
lib_exc.BadRequest,
self.client.create_ipsec_site_connection,
ipsecpolicy_id=self.ipsecpolicy['id'],
ikepolicy_id=self.ikepolicy['id'],
vpnservice_id=self.vpnservice['id'],
peer_address="172.24.4.233",
peer_id="172.24.4.233",
name=name,
mtu=1500,
admin_state_up=True,
initiator="bi-directional",
psk="secret")
@decorators.attr(type=['negative', 'smoke'])
def test_create_vpn_service_subnet_not_on_router(self):
# Verify create VPN service with a subnet not on router
tenant_id = self._get_tenant_id()
# Create vpn service for the newly created tenant
network2 = self.create_network()
subnet2 = self.create_subnet(network2)
router2 = self.create_router(data_utils.rand_name('router-'),
external_network_id=self.ext_net_id)
self.addCleanup(self.admin_client.delete_router, router2['id'])
self.addCleanup(self.admin_client.delete_network, network2['id'])
name = data_utils.rand_name('vpn-service')
self.assertRaises(
lib_exc.BadRequest,
self.admin_client.create_vpnservice,
subnet_id=subnet2['id'],
router_id=router2['id'],
name=name,
admin_state_up=True,
tenant_id=tenant_id)
@decorators.attr(type=['negative', 'smoke'])
def test_create_vpn_connection_small_MTU(self):
# Verify create VPN connection with small MTU
name = data_utils.rand_name("ipsec_site_connection-")
self.assertRaises(
lib_exc.BadRequest,
self.client.create_ipsec_site_connection,
ipsecpolicy_id=self.ipsecpolicy['id'],
ikepolicy_id=self.ikepolicy['id'],
vpnservice_id=self.vpnservice['id'],
peer_address="172.24.4.233",
peer_id="172.24.4.233",
peer_cidrs=['10.1.1.0/24', '10.2.2.0/24'],
name=name,
mtu=63,
admin_state_up=True,
initiator="bi-directional",
psk="secret")
@decorators.attr(type=['negative', 'smoke'])
def test_create_vpn_connection_small_dpd(self):
# Verify create VPN connection with small dpd
name = data_utils.rand_name("ipsec_site_connection-")
self.assertRaises(
lib_exc.BadRequest,
self.client.create_ipsec_site_connection,
ipsecpolicy_id=self.ipsecpolicy['id'],
ikepolicy_id=self.ikepolicy['id'],
vpnservice_id=self.vpnservice['id'],
peer_address="172.24.4.233",
peer_id="172.24.4.233",
peer_cidrs=['10.1.1.0/24', '10.2.2.0/24'],
name=name,
dpd=59,
admin_state_up=True,
initiator="bi-directional",
psk="secret")
@decorators.attr(type=['negative', 'smoke'])
def test_create_vpn_connection_wrong_peer_cidr(self):
# Verify create VPN connection with wrong peer cidr
name = data_utils.rand_name("ipsec_site_connection-")
self.assertRaises(
lib_exc.BadRequest,
self.client.create_ipsec_site_connection,
ipsecpolicy_id=self.ipsecpolicy['id'],
ikepolicy_id=self.ikepolicy['id'],
vpnservice_id=self.vpnservice['id'],
peer_address="172.24.4.233",
peer_id="172.24.4.233",
peer_cidrs=['1.0.0.0/33'],
name=name,
mtu=1500,
admin_state_up=True,
initiator="bi-directional",
psk="secret")
@decorators.attr(type=['negative', 'smoke'])
def test_create_connection_with_cidr_and_endpoint_group(self):
tenant_id = self._get_tenant_id()
# Create endpoint group for the newly created tenant
name = data_utils.rand_name('endpoint_group')
subnet_id = self.subnet['id']
body = self.client.create_endpoint_group(
tenant_id=tenant_id,
name=name,
type='subnet',
endpoints=subnet_id)
endpoint_group_local = body['endpoint_group']
self.addCleanup(self._delete_endpoint_group,
endpoint_group_local['id'])
name = data_utils.rand_name('endpoint_group')
body = self.client.create_endpoint_group(
tenant_id=tenant_id,
name=name,
type='cidr',
endpoints=["10.103.0.0/24", "10.104.0.0/24"])
endpoint_group_remote = body['endpoint_group']
self.addCleanup(self._delete_endpoint_group,
endpoint_group_remote['id'])
# Create connections
name = data_utils.rand_name("ipsec_site_connection-")
self.assertRaises(
lib_exc.BadRequest,
self.client.create_ipsec_site_connection,
ipsecpolicy_id=self.ipsecpolicy['id'],
ikepolicy_id=self.ikepolicy['id'],
vpnservice_id=self.vpnservice_no_subnet['id'],
peer_address="172.24.4.233",
peer_id="172.24.4.233",
peer_cidr="10.1.0.0/24",
peer_ep_group_id=endpoint_group_local['id'],
local_ep_group_id=endpoint_group_remote['id'],
name=name,
admin_state_up=True,
initiator="bi-directional",
psk="secret")
@decorators.attr(type=['negative', 'smoke'])
def test_create_vpn_connection_with_missing_remote_endpoint_group(self):
# Verify create VPN connection without subnet in vpnservice
# and has only local endpoint group
tenant_id = self._get_tenant_id()
# Create endpoint group for the newly created tenant
tenant_id = self._get_tenant_id()
name = data_utils.rand_name('endpoint_group')
subnet_id = self.subnet['id']
body = self.client.create_endpoint_group(
tenant_id=tenant_id,
name=name,
type='subnet',
endpoints=subnet_id)
endpoint_group = body['endpoint_group']
self.addCleanup(self._delete_endpoint_group, endpoint_group['id'])
# Create connections
name = data_utils.rand_name("ipsec_site_connection-")
self.assertRaises(
lib_exc.BadRequest,
self.client.create_ipsec_site_connection,
ipsecpolicy_id=self.ipsecpolicy['id'],
ikepolicy_id=self.ikepolicy['id'],
vpnservice_id=self.vpnservice_no_subnet['id'],
peer_address="172.24.4.233",
peer_id="172.24.4.233",
local_ep_group_id=endpoint_group['id'],
name=name,
admin_state_up=True,
initiator="bi-directional",
psk="secret")
@decorators.attr(type=['negative', 'smoke'])
def test_create_vpn_connection_with_missing_local_endpoint_group(self):
# Verify create VPN connection without subnet in vpnservice
# and only have only local endpoint group
tenant_id = self._get_tenant_id()
# Create endpoint group for the newly created tenant
tenant_id = self._get_tenant_id()
name = data_utils.rand_name('endpoint_group')
body = self.client.create_endpoint_group(
tenant_id=tenant_id,
name=name,
type='cidr',
endpoints=["10.101.0.0/24", "10.102.0.0/24"])
endpoint_group = body['endpoint_group']
self.addCleanup(self._delete_endpoint_group, endpoint_group['id'])
# Create connections
name = data_utils.rand_name("ipsec_site_connection-")
self.assertRaises(
lib_exc.BadRequest,
self.client.create_ipsec_site_connection,
ipsecpolicy_id=self.ipsecpolicy['id'],
ikepolicy_id=self.ikepolicy['id'],
vpnservice_id=self.vpnservice_no_subnet['id'],
peer_address="172.24.4.233",
peer_id="172.24.4.233",
peer_ep_group_id=endpoint_group['id'],
name=name,
admin_state_up=True,
initiator="bi-directional",
psk="secret")
@decorators.attr(type=['negative', 'smoke'])
def test_create_connection_with_mix_ip_endpoint_group(self):
tenant_id = self._get_tenant_id()
# Create endpoint group for the newly created tenant
name = data_utils.rand_name('endpoint_group')
subnet_id = self.subnet['id']
body = self.client.create_endpoint_group(
tenant_id=tenant_id,
name=name,
type='subnet',
endpoints=subnet_id)
endpoint_group_local = body['endpoint_group']
self.addCleanup(self._delete_endpoint_group,
endpoint_group_local['id'])
name_v6 = data_utils.rand_name('endpoint_group')
body_v6 = self.client.create_endpoint_group(
tenant_id=tenant_id,
name=name_v6,
type='cidr',
endpoints=["fec0:101::/64", "fec0:102::/64"])
endpoint_group_remote = body_v6['endpoint_group']
self.addCleanup(self._delete_endpoint_group,
endpoint_group_remote['id'])
# Create connections
name = data_utils.rand_name("ipsec_site_connection-")
self.assertEqual(endpoint_group_local['type'], 'subnet')
self.assertEqual(endpoint_group_remote['type'], 'cidr')
self.assertRaises(
lib_exc.BadRequest,
self.client.create_ipsec_site_connection,
ipsecpolicy_id=self.ipsecpolicy['id'],
ikepolicy_id=self.ikepolicy['id'],
vpnservice_id=self.vpnservice_no_subnet['id'],
peer_address="172.24.4.233",
peer_id="172.24.4.233",
peer_ep_group_id=endpoint_group_local['id'],
local_ep_group_id=endpoint_group_remote['id'],
name=name,
admin_state_up=True,
initiator="bi-directional",
psk="secret")
@decorators.attr(type=['negative', 'smoke'])
def test_create_connection_with_subnet_and_remote_endpoint_group(self):
tenant_id = self._get_tenant_id()
# Create endpoint group for the newly created tenant
name = data_utils.rand_name('endpoint_group')
body = self.client.create_endpoint_group(
tenant_id=tenant_id,
name=name,
type='cidr',
endpoints=["10.101.0.0/24", "10.102.0.0/24"])
endpoint_group = body['endpoint_group']
self.addCleanup(self._delete_endpoint_group, endpoint_group['id'])
# Create connections
name = data_utils.rand_name("ipsec_site_connection-")
self.assertRaises(
lib_exc.BadRequest,
self.client.create_ipsec_site_connection,
ipsecpolicy_id=self.ipsecpolicy['id'],
ikepolicy_id=self.ikepolicy['id'],
vpnservice_id=self.vpnservice['id'],
peer_address="172.24.4.233",
peer_ep_group_id=endpoint_group['id'],
name=name,
admin_state_up=True,
initiator="bi-directional",
psk="secret")
@decorators.attr(type=['negative', 'smoke'])
def test_create_connection_with_subnet_and_local_endpoint_group(self):
tenant_id = self._get_tenant_id()
# Create endpoint group for the newly created tenant
name = data_utils.rand_name('endpoint_group')
subnet_id = self.subnet['id']
body = self.client.create_endpoint_group(
tenant_id=tenant_id,
name=name,
type='subnet',
endpoints=subnet_id)
endpoint_group = body['endpoint_group']
self.addCleanup(self._delete_endpoint_group, endpoint_group['id'])
# Create connections
name = data_utils.rand_name("ipsec_site_connection-")
self.assertRaises(
lib_exc.BadRequest,
self.client.create_ipsec_site_connection,
ipsecpolicy_id=self.ipsecpolicy['id'],
ikepolicy_id=self.ikepolicy['id'],
vpnservice_id=self.vpnservice['id'],
peer_address="172.24.4.233",
local_ep_group_id=endpoint_group['id'],
name=name,
admin_state_up=True,
initiator="bi-directional",
psk="secret")
@decorators.attr(type='smoke')
def test_create_update_delete_endpoint_group(self):
# Creates a endpoint-group
name = data_utils.rand_name('endpoint_group')
body = (self.client.create_endpoint_group(
name=name,
type='cidr',
endpoints=["10.2.0.0/24", "10.3.0.0/24"]))
endpoint_group = body['endpoint_group']
self.assertIsNotNone(endpoint_group['id'])
self.addCleanup(self._delete_endpoint_group, endpoint_group['id'])
# Update endpoint-group
body = {'name': data_utils.rand_name("new_endpoint_group")}
self.client.update_endpoint_group(endpoint_group['id'],
name=name)
# Confirm that update was successful by verifying using 'show'
body = self.client.show_endpoint_group(endpoint_group['id'])
endpoint_group = body['endpoint_group']
self.assertEqual(name, endpoint_group['name'])
# Verification of endpoint-group delete
endpoint_group_id = endpoint_group['id']
self.client.delete_endpoint_group(endpoint_group['id'])
body = self.client.list_endpoint_groups()
endpoint_group = [enp['id'] for enp in body['endpoint_groups']]
self.assertNotIn(endpoint_group_id, endpoint_group)
@decorators.attr(type='smoke')
def test_admin_create_endpoint_group_for_tenant(self):
# Create endpoint group for the newly created tenant
tenant_id = self._get_tenant_id()
name = data_utils.rand_name('endpoint_group')
body = (self.client.
create_endpoint_group(
name=name,
type='cidr',
endpoints=["10.2.0.0/24", "10.3.0.0/24"],
tenant_id=tenant_id))
endpoint_group = body['endpoint_group']
self.assertIsNotNone(endpoint_group['id'])
self.addCleanup(self._delete_endpoint_group, endpoint_group['id'])
# Assert that created endpoint group is found in API list call
endpoint_group_id = endpoint_group['id']
self.client.delete_endpoint_group(endpoint_group['id'])
body = self.client.list_endpoint_groups()
endpoint_group = [enp['id'] for enp in body['endpoint_groups']]
self.assertNotIn(endpoint_group_id, endpoint_group)
@decorators.attr(type='smoke')
def test_show_endpoint_group(self):
# Verifies the details of an endpoint group
body = self.client.show_endpoint_group(self.endpoint_group_local['id'])
endpoint_group = body['endpoint_group']
self.assertEqual(self.endpoint_group_local['id'], endpoint_group['id'])
self.assertEqual(self.endpoint_group_local['name'],
endpoint_group['name'])
self.assertEqual(self.endpoint_group_local['description'],
endpoint_group['description'])
self.assertEqual(self.endpoint_group_local['tenant_id'],
endpoint_group['tenant_id'])
self.assertEqual(self.endpoint_group_local['type'],
endpoint_group['type'])
self.assertEqual(self.endpoint_group_local['endpoints'],
endpoint_group['endpoints'])
# Verifies the details of an endpoint group
body = self.client.show_endpoint_group(
self.endpoint_group_remote['id'])
endpoint_group = body['endpoint_group']
#endpoint_group_remote = endpoint_group['id']
self.assertEqual(self.endpoint_group_remote['id'],
endpoint_group['id'])
self.assertEqual(self.endpoint_group_remote['name'],
endpoint_group['name'])
self.assertEqual(self.endpoint_group_remote['description'],
endpoint_group['description'])
self.assertEqual(self.endpoint_group_remote['tenant_id'],
endpoint_group['tenant_id'])
self.assertEqual(self.endpoint_group_remote['type'],
endpoint_group['type'])
self.assertEqual(self.endpoint_group_remote['endpoints'],
endpoint_group['endpoints'])
@decorators.attr(type='smoke')
def test_create_delete_vpn_connection_with_ep_group(self):
# Creates a endpoint-group with type cidr
name = data_utils.rand_name('endpoint_group')
body = self.client.create_endpoint_group(
name=name,
type='cidr',
endpoints=["10.2.0.0/24", "10.3.0.0/24"])
endpoint_group_remote = body['endpoint_group']
self.addCleanup(self._delete_endpoint_group,
endpoint_group_remote['id'])
# Creates a endpoint-group with type subnet
name = data_utils.rand_name('endpoint_group')
subnet_id = self.subnet['id']
body2 = self.client.create_endpoint_group(
name=name,
type='subnet',
endpoints=subnet_id)
endpoint_group_local = body2['endpoint_group']
self.addCleanup(self._delete_endpoint_group,
endpoint_group_local['id'])
# Verify create VPN connection
name = data_utils.rand_name("ipsec_site_connection-")
body = self.client.create_ipsec_site_connection(
ipsecpolicy_id=self.ipsecpolicy['id'],
ikepolicy_id=self.ikepolicy['id'],
vpnservice_id=self.vpnservice_no_subnet['id'],
peer_ep_group_id=endpoint_group_remote['id'],
local_ep_group_id=endpoint_group_local['id'],
name=name,
mtu=1500,
admin_state_up=True,
initiator="bi-directional",
peer_address="172.24.4.233",
peer_id="172.24.4.233",
psk="secret")
ipsec_site_connection = body['ipsec_site_connection']
self.assertEqual(ipsec_site_connection['name'], name)
self.assertEqual(ipsec_site_connection['mtu'], 1500)
self.addCleanup(self._delete_ipsec_site_connection,
ipsec_site_connection['id'])
# Verification of IPsec connection delete
self.client.delete_ipsec_site_connection(ipsec_site_connection['id'])
body = self.client.list_ipsec_site_connections()
ipsec_site_connections = body['ipsec_site_connections']
self.assertNotIn(ipsec_site_connection['id'],
[v['id'] for v in ipsec_site_connections])
@decorators.attr(type=['negative', 'smoke'])
def test_fail_create_endpoint_group_when_wrong_type(self):
# Creates a endpoint-group with wrong type
name = data_utils.rand_name('endpoint_group')
self.assertRaises(
lib_exc.BadRequest,
self.client.create_endpoint_group,
name=name,
type='subnet',
endpoints=["10.2.0.0/24", "10.3.0.0/24"])
@decorators.attr(type=['negative', 'smoke'])
def test_fail_create_endpoint_group_when_provide_subnet_id_with_cidr(self):
# Creates a endpoint-group when provide subnet id with type cidr
name = data_utils.rand_name('endpoint_group')
subnet_id = self.subnet['id']
self.assertRaises(
lib_exc.BadRequest,
self.client.create_endpoint_group,
name=name,
type='cidr',
endpoints=subnet_id)
@decorators.attr(type=['negative', 'smoke'])
def test_fail_create_endpoint_group_with_mixed_IP_version(self):
# Creates a endpoint-group with mixed IP version
name = data_utils.rand_name('endpoint_group')
self.assertRaises(
lib_exc.BadRequest,
self.client.create_endpoint_group,
name=name,
type='cidr',
endpoints=["10.2.0.0/24", "2000::1"])

View File

@@ -1,36 +0,0 @@
# Copyright 2015
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
from tempest.test_discover import plugins
import neutron_vpnaas
class VPNTempestPlugin(plugins.TempestPlugin):
def load_tests(self):
base_path = os.path.split(os.path.dirname(
os.path.abspath(neutron_vpnaas.__file__)))[0]
test_dir = "neutron_vpnaas/tests/tempest"
full_test_dir = os.path.join(base_path, test_dir)
return full_test_dir, base_path
def register_opts(self, conf):
pass
def get_opt_lists(self):
pass

View File

@@ -1,22 +0,0 @@
# Copyright (c) 2017 Midokura SARL
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron_tempest_plugin.scenario import base
from neutron_vpnaas.tests.tempest.api import base as base_api
class BaseTempestTestCase(base_api.BaseNetworkTest, base.BaseTempestTestCase):
pass

View File

@@ -1,299 +0,0 @@
# Copyright (c) 2017 Midokura SARL
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import netaddr
from oslo_config import cfg
import testtools
from tempest.common import utils
from tempest.common import waiters
from tempest.lib.common import ssh
from tempest.lib.common.utils import data_utils
from tempest.lib import decorators
from neutron_tempest_plugin import config
from neutron_tempest_plugin.scenario import constants
from neutron_vpnaas.tests.tempest.scenario import base
CONF = config.CONF
# NOTE(huntxu): This is a workaround due to a upstream bug [1].
# VPNaaS 4in6 and 6in4 is not working properly with LibreSwan 3.19+.
# In OpenStack zuul checks the base CentOS 7 node is using Libreswan 3.20 on
# CentOS 7.4. So we need to provide a way to skip the 4in6 and 6in4 test cases
# for zuul.
#
# Once the upstream bug gets fixed and the base node uses a newer version of
# Libreswan with that fix, we can remove this.
#
# [1] https://github.com/libreswan/libreswan/issues/175
CONF.register_opt(
cfg.BoolOpt('skip_4in6_6in4_tests',
default=False,
help='Whether to skip 4in6 and 6in4 test cases.'),
'neutron_vpnaas_plugin_options'
)
class Vpnaas(base.BaseTempestTestCase):
"""Test the following topology
+-------------------+
| public |
| network |
| |
+-+---------------+-+
| |
| |
+-------+-+ +-+-------+
| LEFT | | RIGHT |
| router | <--VPN--> | router |
| | | |
+----+----+ +----+----+
| |
+----+----+ +----+----+
| LEFT | | RIGHT |
| network | | network |
| | | |
+---------+ +---------+
"""
credentials = ['primary', 'admin']
inner_ipv6 = False
outer_ipv6 = False
@classmethod
@utils.requires_ext(extension="vpnaas", service="network")
def resource_setup(cls):
super(Vpnaas, cls).resource_setup()
# common
cls.keypair = cls.create_keypair()
cls.secgroup = cls.os_primary.network_client.create_security_group(
name=data_utils.rand_name('secgroup-'))['security_group']
cls.security_groups.append(cls.secgroup)
cls.create_loginable_secgroup_rule(secgroup_id=cls.secgroup['id'])
cls.create_pingable_secgroup_rule(secgroup_id=cls.secgroup['id'])
cls.ikepolicy = cls.create_ikepolicy(
data_utils.rand_name("ike-policy-"))
cls.ipsecpolicy = cls.create_ipsecpolicy(
data_utils.rand_name("ipsec-policy-"))
cls.extra_subnet_attributes = {}
if cls.inner_ipv6:
cls.create_v6_pingable_secgroup_rule(
secgroup_id=cls.secgroup['id'])
cls.extra_subnet_attributes['ipv6_address_mode'] = 'slaac'
cls.extra_subnet_attributes['ipv6_ra_mode'] = 'slaac'
# LEFT
cls.router = cls.create_router(
data_utils.rand_name('left-router'),
admin_state_up=True,
external_network_id=CONF.network.public_network_id)
cls.network = cls.create_network(network_name='left-network')
ip_version = 6 if cls.inner_ipv6 else 4
v4_cidr = netaddr.IPNetwork('10.20.0.0/24')
v6_cidr = netaddr.IPNetwork('2001:db8:0:2::/64')
cidr = v6_cidr if cls.inner_ipv6 else v4_cidr
cls.subnet = cls.create_subnet(
cls.network, ip_version=ip_version, cidr=cidr, name='left-subnet',
**cls.extra_subnet_attributes)
cls.create_router_interface(cls.router['id'], cls.subnet['id'])
# Gives an internal IPv4 subnet for floating IP to the left server,
# we use it to ssh into the left server.
if cls.inner_ipv6:
v4_subnet = cls.create_subnet(
cls.network, ip_version=4, name='left-v4-subnet')
cls.create_router_interface(cls.router['id'], v4_subnet['id'])
# RIGHT
cls._right_network, cls._right_subnet, cls._right_router = \
cls._create_right_network()
@classmethod
def create_v6_pingable_secgroup_rule(cls, secgroup_id=None, client=None):
# NOTE(huntxu): This method should be moved into the base class, along
# with the v4 version.
"""This rule is intended to permit inbound ping6
"""
rule_list = [{'protocol': 'ipv6-icmp',
'direction': 'ingress',
'port_range_min': 128, # type
'port_range_max': 0, # code
'ethertype': 'IPv6',
'remote_ip_prefix': '::/0'}]
client = client or cls.os_primary.network_client
cls.create_secgroup_rules(rule_list, client=client,
secgroup_id=secgroup_id)
@classmethod
def _create_right_network(cls):
router = cls.create_router(
data_utils.rand_name('right-router'),
admin_state_up=True,
external_network_id=CONF.network.public_network_id)
network = cls.create_network(network_name='right-network')
v4_cidr = netaddr.IPNetwork('10.10.0.0/24')
v6_cidr = netaddr.IPNetwork('2001:db8:0:1::/64')
cidr = v6_cidr if cls.inner_ipv6 else v4_cidr
ip_version = 6 if cls.inner_ipv6 else 4
subnet = cls.create_subnet(
network, ip_version=ip_version, cidr=cidr, name='right-subnet',
**cls.extra_subnet_attributes)
cls.create_router_interface(router['id'], subnet['id'])
return network, subnet, router
def _create_server(self, create_floating_ip=True, network=None):
if network is None:
network = self.network
port = self.create_port(network, security_groups=[self.secgroup['id']])
if create_floating_ip:
fip = self.create_and_associate_floatingip(port['id'])
else:
fip = None
server = self.create_server(
flavor_ref=CONF.compute.flavor_ref,
image_ref=CONF.compute.image_ref,
key_name=self.keypair['name'],
networks=[{'port': port['id']}])['server']
waiters.wait_for_server_status(self.os_primary.servers_client,
server['id'],
constants.SERVER_STATUS_ACTIVE)
return {'port': port, 'fip': fip, 'server': server}
def _setup_vpn(self):
sites = [
dict(name="left", network=self.network, subnet=self.subnet,
router=self.router),
dict(name="right", network=self._right_network,
subnet=self._right_subnet, router=self._right_router),
]
psk = data_utils.rand_name('mysecret')
for i in range(0, 2):
site = sites[i]
site['vpnservice'] = self.create_vpnservice(
site['subnet']['id'], site['router']['id'],
name=data_utils.rand_name('%s-vpnservice' % site['name']))
for i in range(0, 2):
site = sites[i]
vpnservice = site['vpnservice']
peer = sites[1 - i]
if self.outer_ipv6:
peer_address = peer['vpnservice']['external_v6_ip']
if not peer_address:
msg = "Public network must have an IPv6 subnet."
raise self.skipException(msg)
else:
peer_address = peer['vpnservice']['external_v4_ip']
self.create_ipsec_site_connection(
self.ikepolicy['id'],
self.ipsecpolicy['id'],
vpnservice['id'],
peer_address=peer_address,
peer_id=peer_address,
peer_cidrs=[peer['subnet']['cidr']],
psk=psk,
name=data_utils.rand_name(
'%s-ipsec-site-connection' % site['name']))
def _get_ip_on_subnet_for_port(self, port, subnet_id):
for fixed_ip in port['fixed_ips']:
if fixed_ip['subnet_id'] == subnet_id:
return fixed_ip['ip_address']
msg = "Cannot get IP address on specified subnet %s for port %r." % (
subnet_id, port)
raise self.fail(msg)
def _test_vpnaas(self):
# RIGHT
right_server = self._create_server(network=self._right_network,
create_floating_ip=False)
right_ip = self._get_ip_on_subnet_for_port(
right_server['port'], self._right_subnet['id'])
# LEFT
left_server = self._create_server()
ssh_client = ssh.Client(left_server['fip']['floating_ip_address'],
CONF.validation.image_ssh_user,
pkey=self.keypair['private_key'])
# check LEFT -> RIGHT connectivity via VPN
self.check_remote_connectivity(ssh_client, right_ip,
should_succeed=False)
self._setup_vpn()
self.check_remote_connectivity(ssh_client, right_ip)
# Test VPN traffic and floating IP traffic don't interfere each other.
if not self.inner_ipv6:
# Assign a floating-ip and check connectivity.
# This is NOT via VPN.
fip = self.create_and_associate_floatingip(
right_server['port']['id'])
self.check_remote_connectivity(ssh_client,
fip['floating_ip_address'])
# check LEFT -> RIGHT connectivity via VPN again, to ensure
# the above floating-ip doesn't interfere the traffic.
self.check_remote_connectivity(ssh_client, right_ip)
class Vpnaas4in4(Vpnaas):
@decorators.idempotent_id('aa932ab2-63aa-49cf-a2a0-8ae71ac2bc24')
def test_vpnaas(self):
self._test_vpnaas()
class Vpnaas4in6(Vpnaas):
outer_ipv6 = True
@decorators.idempotent_id('2d5f18dc-6186-4deb-842b-051325bd0466')
@testtools.skipUnless(CONF.network_feature_enabled.ipv6,
'IPv6 tests are disabled.')
@testtools.skipIf(
CONF.neutron_vpnaas_plugin_options.skip_4in6_6in4_tests,
'VPNaaS 4in6 test is skipped.')
def test_vpnaas_4in6(self):
self._test_vpnaas()
class Vpnaas6in4(Vpnaas):
inner_ipv6 = True
@decorators.idempotent_id('10febf33-c5b7-48af-aa13-94b4fb585a55')
@testtools.skipUnless(CONF.network_feature_enabled.ipv6,
'IPv6 tests are disabled.')
@testtools.skipIf(
CONF.neutron_vpnaas_plugin_options.skip_4in6_6in4_tests,
'VPNaaS 6in4 test is skipped.')
def test_vpnaas_6in4(self):
self._test_vpnaas()
class Vpnaas6in6(Vpnaas):
inner_ipv6 = True
outer_ipv6 = True
@decorators.idempotent_id('8b503ffc-aeb0-4938-8dba-73c7323e276d')
@testtools.skipUnless(CONF.network_feature_enabled.ipv6,
'IPv6 tests are disabled.')
def test_vpnaas_6in6(self):
self._test_vpnaas()

View File

@@ -49,8 +49,6 @@ oslo.policy.policies =
neutron-vpnaas = neutron_vpnaas.policies:list_rules
neutron.policies =
neutron-vpnaas = neutron_vpnaas.policies:list_rules
tempest.test_plugins =
neutron_vpnaas_tests = neutron_vpnaas.tests.tempest.plugin:VPNTempestPlugin
[build_sphinx]
all_files = 1