Tempest: Added new design for tempest test cases

- Added appliance manager to deploy, maintain, update and clean topology appliances for test modules
- Added traffic manager to define network related traffic functions
- Added feature manager to CRUD feature related tasks
- Rewrote test_micro_segmentation_ops.py based on new design
- Rewrote L2gaeway nsxv3 tests based on new design
- Added L2gateway nsxv3 scenario tests based on new design

Change-Id: I571a9dede56266204efd36ad2720340e7128fd79
This commit is contained in:
Devang Doshi 2017-05-10 22:27:48 -07:00
parent 3486e23a23
commit 2f7850f3c0
14 changed files with 1435 additions and 1016 deletions

View File

@ -1,4 +1,4 @@
# Copyright 2016 VMware, Inc.
# Copyright 2017 VMware, Inc.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -54,3 +54,8 @@ EXCLUSIVE_ROUTER = 'exclusive'
DISTRIBUTED_ROUTER = 'distributed'
TCP_PROTOCOL = 'tcp'
ICMP_PROTOCOL = 'icmp'
# NSXV3 Firewall
NSX_FIREWALL_REALIZED_DELAY = 2
APPLIANCE_NAME_STARTS_WITH = "vmw_"

View File

@ -166,6 +166,18 @@ L2gwGroup = [
default="192.168.1.0/24",
help="Subnet 1 network cidr."
"Example: 1.1.1.0/24"),
cfg.StrOpt('vm_on_vds_tz1_vlan16_ip',
default="192.168.1.203",
help="IPv4 IP address of VM3"),
cfg.StrOpt('vm_on_switch_vlan16',
default="192.168.1.204",
help="IPv4 IP address of VM4"),
cfg.StrOpt('vm_on_vds_tz2_vlan16_ip',
default="192.168.1.205",
help="IPv4 IP address of VM5"),
cfg.StrOpt('vm_on_vds_tz2_vlan17_ip',
default="192.168.1.206",
help="IPv4 IP address of VM6"),
]
nsxv3_group = cfg.OptGroup(name='nsxv3',

View File

@ -0,0 +1,234 @@
# Copyright 2017 VMware Inc
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import netaddr
from oslo_log import log as logging
from tempest import config
from tempest.lib.common.utils import data_utils
from tempest.lib.common.utils import test_utils
from tempest.lib import exceptions as lib_exc
from tempest.scenario import manager
from vmware_nsx_tempest.common import constants
CONF = config.CONF
LOG = logging.getLogger(__name__)
class ApplianceManager(manager.NetworkScenarioTest):
server_details = collections.namedtuple('server_details',
['server', 'floating_ip',
'networks'])
def setUp(self):
super(ApplianceManager, self).setUp()
self.topology_routers = {}
self.topology_networks = {}
self.topology_subnets = {}
self.topology_servers = {}
self.topology_servers_floating_ip = []
self.topology_public_network_id = CONF.network.public_network_id
self.topology_config_drive = CONF.compute_feature_enabled.config_drive
self.topology_keypairs = {}
self.servers_details = {}
def get_internal_ips(self, server, network, device="network"):
internal_ips = [p['fixed_ips'][0]['ip_address'] for p in
self.os_admin.ports_client.list_ports(
tenant_id=server['tenant_id'],
network_id=network['id'])['ports'] if
p['device_owner'].startswith(device)]
return internal_ips
def _verify_empty_security_group_status(self, security_group):
ip_protocols = ["IPV6", "IPV4"]
nsx_fw_section, nsx_fw_section_rules = \
self.nsx_client.get_firewall_section_and_rules(
security_group['name'], security_group['id'])
msg = "Newly created empty security group does not meet criteria !!!"
self.assertEqual(nsx_fw_section["rule_count"], 2, msg)
self.assertEqual(nsx_fw_section_rules[0]["action"], "ALLOW", msg)
self.assertEqual(nsx_fw_section_rules[1]["action"], "ALLOW", msg)
self.assertEqual(nsx_fw_section_rules[0]["direction"], "OUT", msg)
self.assertEqual(nsx_fw_section_rules[1]["direction"], "OUT", msg)
self.assertIn(nsx_fw_section_rules[0]["ip_protocol"], ip_protocols,
msg)
self.assertIn(nsx_fw_section_rules[1]["ip_protocol"], ip_protocols,
msg)
def create_topology_empty_security_group(self, namestart="vmw_"):
security_group = self._create_empty_security_group(namestart=namestart)
self._verify_empty_security_group_status(security_group)
return security_group
def add_security_group_rule(self, security_group, rule):
return self._create_security_group_rule(secgroup=security_group,
**rule)
def get_server_key(self, server):
return self.topology_keypairs[server['key_name']]['private_key']
def create_topology_router(self, router_name, routers_client=None,
**kwargs):
if not routers_client:
routers_client = self.routers_client
router_name_ = constants.APPLIANCE_NAME_STARTS_WITH + router_name
router = self._create_router(namestart=router_name_, **kwargs)
public_network_info = {"external_gateway_info": dict(
network_id=self.topology_public_network_id)}
routers_client.update_router(router['id'], **public_network_info)
self.topology_routers[router_name] = router
return router
def create_topology_network(self, network_name, networks_client=None,
tenant_id=None, port_security_enabled=True, **kwargs):
if not networks_client:
networks_client = self.networks_client
if not tenant_id:
tenant_id = networks_client.tenant_id
network_name_ = constants.APPLIANCE_NAME_STARTS_WITH + network_name
name = data_utils.rand_name(network_name_)
# Neutron disables port security by default so we have to check the
# config before trying to create the network with port_security_enabled
if CONF.network_feature_enabled.port_security:
port_security_enabled = True
result = networks_client.create_network(
name=name, tenant_id=tenant_id,
port_security_enabled=port_security_enabled, **kwargs)
network = result['network']
self.assertEqual(network['name'], name)
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
networks_client.delete_network, network['id'])
self.topology_networks[network_name] = network
return network
def create_topology_subnet(
self, subnet_name, network, routers_client=None,
subnets_client=None, router_id=None, ip_version=4, cidr=None,
mask_bits=None, **kwargs):
subnet_name_ = constants.APPLIANCE_NAME_STARTS_WITH + subnet_name
if not subnets_client:
subnets_client = self.subnets_client
if not routers_client:
routers_client = self.routers_client
def cidr_in_use(cidr, tenant_id):
"""Check cidr existence
:returns: True if subnet with cidr already exist in tenant
False else
"""
cidr_in_use = \
self.os_admin.subnets_client.list_subnets(tenant_id=tenant_id,
cidr=cidr)['subnets']
return len(cidr_in_use) != 0
if ip_version == 6:
tenant_cidr = (cidr or netaddr.IPNetwork(
CONF.network.project_network_v6_cidr))
mask_bits = mask_bits or CONF.network.project_network_v6_mask_bits
else:
tenant_cidr = cidr or netaddr.IPNetwork(
CONF.network.project_network_cidr)
mask_bits = mask_bits or CONF.network.project_network_mask_bits
str_cidr = str(tenant_cidr)
if not cidr:
# Repeatedly attempt subnet creation with sequential cidr
# blocks until an unallocated block is found.
for subnet_cidr in tenant_cidr.subnet(mask_bits):
str_cidr = str(subnet_cidr)
if not cidr_in_use(str_cidr, tenant_id=network['tenant_id']):
break
else:
if cidr_in_use(str_cidr, tenant_id=network['tenant_id']):
LOG.error("Specified subnet %r is in use" % str_cidr)
raise
subnet = dict(name=data_utils.rand_name(subnet_name_),
network_id=network['id'], tenant_id=network['tenant_id'],
cidr=str_cidr, ip_version=ip_version, **kwargs)
try:
result = None
result = subnets_client.create_subnet(**subnet)
except lib_exc.Conflict as e:
is_overlapping_cidr = 'overlaps with another subnet' in str(e)
if not is_overlapping_cidr:
raise
self.assertIsNotNone(result, 'Unable to allocate tenant network')
subnet = result['subnet']
self.assertEqual(subnet['cidr'], str_cidr)
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
subnets_client.delete_subnet, subnet['id'])
self.topology_subnets[subnet_name] = subnet
if router_id:
if not routers_client:
routers_client = self.routers_client
routers_client.add_router_interface(
router_id, subnet_id=subnet["id"])
self.addCleanup(
test_utils.call_and_ignore_notfound_exc,
routers_client.remove_router_interface, router_id,
subnet_id=subnet["id"])
return subnet
def create_topology_security_group(self, **kwargs):
return self._create_security_group(**kwargs)
def create_topology_instance(
self, server_name, networks, security_groups=None,
config_drive=None, keypair=None, image_id=None,
clients=None, create_floating_ip=True, **kwargs):
# Define security group for server.
if security_groups:
kwargs["security_groups"] = security_groups
else:
_sg = self.create_topology_security_group()
_security_groups = [{'name': _sg['name']}]
kwargs["security_groups"] = _security_groups
# Define config drive for server.
if not config_drive:
kwargs["config_drive"] = self.topology_config_drive
else:
kwargs["config_drive"] = config_drive
if not keypair:
keypair = self.create_keypair()
self.topology_keypairs[keypair['name']] = keypair
kwargs["key_name"] = keypair['name']
else:
kwargs["key_name"] = keypair['name']
# Define image id for server.
if image_id:
kwargs["image_id"] = image_id
server_name_ = constants.APPLIANCE_NAME_STARTS_WITH + server_name
# Collect all the networks for server.
networks_ = []
for net in networks:
net_ = {"uuid": net["id"]}
networks_.append(net_)
# Deploy server with all teh args.
server = self.create_server(
name=server_name_, networks=networks_, clients=clients, **kwargs)
if create_floating_ip:
floating_ip = self.create_floating_ip(server)
server["floating_ip"] = floating_ip
self.topology_servers_floating_ip.append(floating_ip)
else:
floating_ip = None
server_details = self.server_details(server=server,
floating_ip=floating_ip,
networks=networks)
self.servers_details[server_name] = server_details
self.topology_servers[server_name] = server
return server

View File

@ -0,0 +1,172 @@
# Copyright 2017 VMware Inc
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest import config
from tempest.lib.common.utils import test_utils
from vmware_nsx_tempest._i18n import _
from vmware_nsx_tempest.common import constants
from vmware_nsx_tempest.lib import traffic_manager
from vmware_nsx_tempest.services import nsx_client
from vmware_nsx_tempest.services import openstack_network_clients
LOG = constants.log.getLogger(__name__)
CONF = config.CONF
# It includes feature related function such CRUD Mdproxy, L2GW or QoS
class FeatureManager(traffic_manager.TrafficManager):
@classmethod
def setup_clients(cls):
"""
Create various client connections. Such as NSXv3 and L2 Gateway.
"""
super(FeatureManager, cls).setup_clients()
try:
manager = getattr(cls.os_admin, "manager", cls.os_admin)
net_client = getattr(manager, "networks_client")
_params = manager.default_params_withy_timeout_values.copy()
except AttributeError as attribute_err:
LOG.warning(
"Failed to locate the attribute, Error: %(err_msg)s",
{"err_msg": attribute_err.__str__()})
_params = {}
cls.l2gw_client = openstack_network_clients.L2GatewayClient(
net_client.auth_provider,
net_client.service,
net_client.region,
net_client.endpoint_type,
**_params)
cls.nsx_client = nsx_client.NSXClient(
CONF.network.backend,
CONF.nsxv3.nsx_manager,
CONF.nsxv3.nsx_user,
CONF.nsxv3.nsx_password)
cls.l2gwc_client = openstack_network_clients.L2GatewayConnectionClient(
net_client.auth_provider,
net_client.service,
net_client.region,
net_client.endpoint_type,
**_params)
#
# L2Gateway base class. To get basics of L2GW.
#
def create_l2gw(self, l2gw_name, l2gw_param):
"""
Creates L2GW and returns the response.
:param l2gw_name: name of the L2GW
:param l2gw_param: L2GW parameters
:return: response of L2GW create API
"""
LOG.info("l2gw name: %(name)s, l2gw_param: %(devices)s ",
{"name": l2gw_name, "devices": l2gw_param})
devices = []
for device_dict in l2gw_param:
interface = [{"name": device_dict["iname"],
"segmentation_id": device_dict[
"vlans"]}] if "vlans" in device_dict else [
{"name": device_dict["iname"]}]
device = {"device_name": device_dict["dname"],
"interfaces": interface}
devices.append(device)
l2gw_request_body = {"devices": devices}
LOG.info(" l2gw_request_body: %s", l2gw_request_body)
rsp = self.l2gw_client.create_l2_gateway(
name=l2gw_name, **l2gw_request_body)
LOG.info(" l2gw response: %s", rsp)
self.addCleanup(
test_utils.call_and_ignore_notfound_exc,
self.l2gw_client.delete_l2_gateway, rsp[constants.L2GW]["id"])
return rsp, devices
def delete_l2gw(self, l2gw_id):
"""
Delete L2gw.
:param l2gw_id: L2GW id to delete l2gw.
:return: response of the l2gw delete API.
"""
LOG.info("L2GW id: %(id)s to be deleted.", {"id": l2gw_id})
rsp = self.l2gw_client.delete_l2_gateway(l2gw_id)
LOG.info("response : %(rsp)s", {"rsp": rsp})
return rsp
def update_l2gw(self, l2gw_id, l2gw_new_name, devices):
"""
Update existing L2GW.
:param l2gw_id: L2GW id to update its parameters.
:param l2gw_new_name: name of the L2GW.
:param devices: L2GW parameters.
:return: Response of the L2GW update API.
"""
rsp = self.l2gw_client.update_l2_gateway(l2gw_id,
name=l2gw_new_name, **devices)
return rsp
def nsx_bridge_cluster_info(self):
"""
Collect the device and interface name of the nsx brdige cluster.
:return: nsx bridge id and display name.
"""
response = self.nsx_client.get_bridge_cluster_info()
if len(response) == 0:
raise RuntimeError(_("NSX bridge cluster information is null"))
return [(x.get("id"), x.get("display_name")) for x in response]
def create_l2gw_connection(self, l2gwc_param):
"""
Creates L2GWC and return the response.
:param l2gwc_param: L2GWC parameters.
:return: response of L2GWC create API.
"""
LOG.info("l2gwc param: %(param)s ", {"param": l2gwc_param})
l2gwc_request_body = {"l2_gateway_id": l2gwc_param["l2_gateway_id"],
"network_id": l2gwc_param["network_id"]}
if "segmentation_id" in l2gwc_param:
l2gwc_request_body["segmentation_id"] = l2gwc_param[
"segmentation_id"]
LOG.info("l2gwc_request_body: %s", l2gwc_request_body)
rsp = self.l2gwc_client.create_l2_gateway_connection(
**l2gwc_request_body)
LOG.info("l2gwc response: %s", rsp)
self.addCleanup(
test_utils.call_and_ignore_notfound_exc,
self.l2gwc_client.delete_l2_gateway_connection,
rsp[constants.L2GWC]["id"])
return rsp
def delete_l2gw_connection(self, l2gwc_id):
"""
Delete L2GWC and returns the response.
:param l2gwc_id: L2GWC id to delete L2GWC.
:return: response of the l2gwc delete API.
"""
LOG.info("L2GW connection id: %(id)s to be deleted",
{"id": l2gwc_id})
rsp = self.l2gwc_client.delete_l2_gateway_connection(l2gwc_id)
LOG.info("response : %(rsp)s", {"rsp": rsp})
return rsp

View File

@ -0,0 +1,65 @@
# Copyright 2017 VMware Inc
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from vmware_nsx_tempest.lib import appliance_manager
class TrafficManager(appliance_manager.ApplianceManager):
def check_server_internal_ips_using_floating_ip(self, floating_ip, server,
address_list, should_connect=True):
ip_address = floating_ip['floating_ip_address']
private_key = self.get_server_key(server)
ssh_source = self.get_remote_client(ip_address,
private_key=private_key)
for remote_ip in address_list:
self.check_remote_connectivity(ssh_source, remote_ip,
should_succeed=should_connect)
def check_network_internal_connectivity(self, network, floating_ip, server,
should_connect=True):
"""via ssh check VM internal connectivity:
- ping internal gateway and DHCP port, implying in-tenant connectivity
pinging both, because L3 and DHCP agents might be on different nodes
"""
# get internal ports' ips:
# get all network ports in the new network
internal_ips = self.get_internal_ips(server, network, device="network")
self.check_server_internal_ips_using_floating_ip(floating_ip, server,
internal_ips, should_connect)
def check_vm_internal_connectivity(self, network, floating_ip, server,
should_connect=True):
# test internal connectivity to the other VM on the same network
compute_ips = self.get_internal_ips(server, network, device="compute")
self.check_server_internal_ips_using_floating_ip(floating_ip, server,
compute_ips, should_connect)
def using_floating_ip_check_server_and_project_network_connectivity(self,
server_details, network=None):
if not network:
network = server_details.networks[0]
floating_ip = server_details.floating_ip
server = server_details.server
self.check_network_internal_connectivity(network, floating_ip, server)
self.check_vm_internal_connectivity(network, floating_ip, server)
def check_cross_network_connectivity(self, network1,
floating_ip_on_network2, server_on_network2, should_connect=False):
# test internal connectivity to the other VM on the same network
remote_ips = self.get_internal_ips(server_on_network2, network1,
device="compute")
self.check_server_internal_ips_using_floating_ip(
floating_ip_on_network2, server_on_network2, remote_ips,
should_connect)

View File

@ -16,15 +16,9 @@
# License for the specific language governing permissions and limitations
# under the License.
from tempest.api.network import base
from tempest import config
from tempest import test
from vmware_nsx_tempest._i18n import _
from vmware_nsx_tempest.common import constants
from vmware_nsx_tempest.services import l2_gateway_client
from vmware_nsx_tempest.services import l2_gateway_connection_client
from vmware_nsx_tempest.services import nsxv3_client
LOG = constants.log.getLogger(__name__)
@ -107,176 +101,3 @@ def form_dict_devices(devices):
devices1.setdefault(device_name, []).append(int_seg)
int_seg = []
return devices1
class BaseL2GatewayTest(base.BaseAdminNetworkTest):
"""
L2Gateway base class. Extend this class to get basics of L2GW.
"""
credentials = ["primary", "admin"]
@classmethod
def skip_checks(cls):
"""
Skip running test if we do not meet criteria to run the tests.
"""
super(BaseL2GatewayTest, cls).skip_checks()
if not test.is_extension_enabled("l2-gateway", "network"):
raise cls.skipException("l2-gateway extension not enabled.")
@classmethod
def setup_clients(cls):
"""
Create various client connections. Such as NSXv3 and L2 Gateway.
"""
super(BaseL2GatewayTest, cls).setup_clients()
cls.l2gw_created = {}
cls.l2gwc_created = {}
try:
manager = getattr(cls.os_adm, "manager", cls.os_adm)
net_client = getattr(manager, "networks_client")
_params = manager.default_params_withy_timeout_values.copy()
except AttributeError as attribute_err:
LOG.warning(
"Failed to locate the attribute, Error: %(err_msg)s",
{"err_msg": attribute_err.__str__()})
_params = {}
cls.l2gw_client = l2_gateway_client.L2GatewayClient(
net_client.auth_provider,
net_client.service,
net_client.region,
net_client.endpoint_type,
**_params)
cls.nsxv3_client_obj = nsxv3_client.NSXV3Client(
CONF.nsxv3.nsx_manager,
CONF.nsxv3.nsx_user,
CONF.nsxv3.nsx_password)
cls.l2gwc_client = \
l2_gateway_connection_client.L2GatewayConnectionClient(
net_client.auth_provider,
net_client.service,
net_client.region,
net_client.endpoint_type,
**_params)
@classmethod
def resource_setup(cls):
"""
Setting up the resources for the test.
"""
super(BaseL2GatewayTest, cls).resource_setup()
cls.VLAN_1 = CONF.l2gw.vlan_1
cls.VLAN_2 = CONF.l2gw.vlan_2
@classmethod
def resource_cleanup(cls):
"""
Clean all the resources used during the test.
"""
for l2gw_id in cls.l2gw_created.keys():
cls.l2gw_client.delete_l2_gateway(l2gw_id)
cls.l2gw_created.pop(l2gw_id)
for l2gwc_id in cls.l2gwc_created.keys():
cls.l2gwc_client.delete_l2_gateway_connection(l2gwc_id)
cls.l2gwc_created.pop(l2gwc_id)
def create_l2gw(self, l2gw_name, l2gw_param):
"""
Creates L2GW and return the response.
:param l2gw_name: name of the L2GW
:param l2gw_param: L2GW parameters
:return: response of L2GW create API
"""
LOG.info("l2gw name: %(name)s, l2gw_param: %(devices)s ",
{"name": l2gw_name, "devices": l2gw_param})
devices = []
for device_dict in l2gw_param:
interface = [{"name": device_dict["iname"],
"segmentation_id": device_dict[
"vlans"]}] if "vlans" in device_dict else [
{"name": device_dict["iname"]}]
device = {"device_name": device_dict["dname"],
"interfaces": interface}
devices.append(device)
l2gw_request_body = {"devices": devices}
LOG.info(" l2gw_request_body: %s", l2gw_request_body)
rsp = self.l2gw_client.create_l2_gateway(
name=l2gw_name, **l2gw_request_body)
LOG.info(" l2gw response: %s", rsp)
self.l2gw_created[rsp[constants.L2GW]["id"]] = rsp[constants.L2GW]
return rsp, devices
def delete_l2gw(self, l2gw_id):
"""
Delete L2gw.
:param l2gw_id: L2GW id to delete l2gw.
:return: response of the l2gw delete API.
"""
LOG.info("L2GW id: %(id)s to be deleted.", {"id": l2gw_id})
rsp = self.l2gw_client.delete_l2_gateway(l2gw_id)
LOG.info("response : %(rsp)s", {"rsp": rsp})
return rsp
def update_l2gw(self, l2gw_id, l2gw_new_name, devices):
"""
Update existing L2GW.
:param l2gw_id: L2GW id to update its parameters.
:param l2gw_new_name: name of the L2GW.
:param devices: L2GW parameters.
:return: Response of the L2GW update API.
"""
rsp = self.l2gw_client.update_l2_gateway(l2gw_id,
name=l2gw_new_name, **devices)
return rsp
def nsx_bridge_cluster_info(self):
"""
Collect the device and interface name of the nsx brdige cluster.
:return: nsx bridge id and display name.
"""
response = self.nsxv3_client_obj.get_bridge_cluster_info()
if len(response) == 0:
raise RuntimeError(_("NSX bridge cluster information is null"))
return [(x.get("id"), x.get("display_name")) for x in response]
def create_l2gw_connection(self, l2gwc_param):
"""
Creates L2GWC and return the response.
:param l2gwc_param: L2GWC parameters.
:return: response of L2GWC create API.
"""
LOG.info("l2gwc param: %(param)s ", {"param": l2gwc_param})
l2gwc_request_body = {"l2_gateway_id": l2gwc_param["l2_gateway_id"],
"network_id": l2gwc_param["network_id"]}
if "segmentation_id" in l2gwc_param:
l2gwc_request_body["segmentation_id"] = l2gwc_param[
"segmentation_id"]
LOG.info("l2gwc_request_body: %s", l2gwc_request_body)
rsp = self.l2gwc_client.create_l2_gateway_connection(
**l2gwc_request_body)
LOG.info("l2gwc response: %s", rsp)
self.l2gwc_created[rsp[constants.L2GWC]["id"]] = rsp[constants.L2GWC]
return rsp
def delete_l2gw_connection(self, l2gwc_id):
"""
Delete L2GWC and returns the response.
:param l2gwc_id: L2GWC id to delete L2GWC.
:return: response of the l2gwc delete API.
"""
LOG.info("L2GW connection id: %(id)s to be deleted",
{"id": l2gwc_id})
rsp = self.l2gwc_client.delete_l2_gateway_connection(l2gwc_id)
LOG.info("response : %(rsp)s", {"rsp": rsp})
return rsp

View File

@ -0,0 +1,46 @@
# Copyright 2017 VMware Inc
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log as logging
from vmware_nsx_tempest.services import nsxv3_client
LOG = logging.getLogger(__name__)
class NSXClient(object):
"""Base NSX REST client"""
def __init__(self, backend, host, username, password, *args, **kwargs):
self.backend = backend.lower()
self.host = host
self.username = username
self.password = password
if backend.lower() == "nsxv3":
self.nsx = nsxv3_client.NSXV3Client(host, username, password)
def get_firewall_section_and_rules(self, *args, **kwargs):
if self.backend == "nsxv3":
firewall_section = self.nsx.get_firewall_section(
*args, **kwargs)
firewall_section_rules = self.nsx.get_firewall_section_rules(
firewall_section)
return firewall_section, firewall_section_rules
else:
#TODO(ddoshi) define else for nsxv
pass
def get_bridge_cluster_info(self, *args, **kwargs):
if self.backend == "nsxv3":
return self.nsx.get_bridge_cluster_info(
*args, **kwargs)

View File

@ -15,6 +15,7 @@
import base64
from copy import deepcopy
import time
import requests
import six.moves.urllib.parse as urlparse
@ -22,6 +23,8 @@ import six.moves.urllib.parse as urlparse
from oslo_log import log as logging
from oslo_serialization import jsonutils
from vmware_nsx_tempest.common import constants
requests.packages.urllib3.disable_warnings()
LOG = logging.getLogger(__name__)
@ -70,7 +73,7 @@ class NSXV3Client(object):
self.api_version = api_version
def get_api_version(self):
return self.api
return self.api_version
def __set_url(self, api=None, secure=None, host=None, endpoint=None):
api = self.api_version if api is None else api
@ -357,8 +360,18 @@ class NSXV3Client(object):
"created")
return None
nsx_name = os_name + " - " + os_uuid
fw_sections = self.get_firewall_sections()
return self.get_nsx_resource_by_name(fw_sections, nsx_name)
nsx_firewall_time_counter = 0
nsx_dfw_section = None
# wait till timeout or till dfw section
while nsx_firewall_time_counter < \
constants.NSX_FIREWALL_REALIZED_TIMEOUT and \
not nsx_dfw_section:
nsx_firewall_time_counter += 1
fw_sections = self.get_firewall_sections()
nsx_dfw_section = self.get_nsx_resource_by_name(fw_sections,
nsx_name)
time.sleep(constants.ONE_SEC)
return nsx_dfw_section
def get_firewall_section_rules(self, fw_section):
"""
@ -521,10 +534,10 @@ class NSXV3Client(object):
"""
cert_response = self.get_nsx_certificate()
for cert in cert_response['results']:
if (cert["_create_user"] == "admin" and
cert["resource_type"] == "certificate_self_signed" and
cert["display_name"]
!= "NSX MP Client Certificate for Key Manager"):
if (cert["_create_user"] == "admin" and cert[
"resource_type"] == "certificate_self_signed" and cert[
"display_name"] != "NSX MP Client Certificate for Key "
"Manager"):
LOG.info('Client certificate created')
return cert
LOG.error("Client Certificate not created")

View File

@ -0,0 +1,100 @@
# Copyright 2017 VMware, Inc.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log
from tempest.lib.services.network import base
from vmware_nsx_tempest.common import constants
LOG = log.getLogger(__name__)
class L2GatewayClient(base.BaseNetworkClient):
"""
Request resources via API for L2GatewayClient
l2 gateway create request
l2 gateway update request
l2 gateway show request
l2 gateway delete request
l2 gateway list all request
"""
def create_l2_gateway(self, **kwargs):
uri = constants.L2_GWS_BASE_URI
post_data = {constants.L2GW: kwargs}
LOG.info("URI : %(uri)s, posting data : %(post_data)s",
{"uri": uri, "post_data": post_data})
return self.create_resource(uri, post_data)
def update_l2_gateway(self, l2_gateway_id, **kwargs):
uri = constants.L2_GWS_BASE_URI + "/" + l2_gateway_id
post_data = {constants.L2GW: kwargs}
constants.LOG.info(
"URI : %(uri)s, posting data : %(post_data)s",
{"uri": uri, "post_data": post_data})
return self.update_resource(uri, post_data)
def show_l2_gateway(self, l2_gateway_id, **fields):
uri = constants.L2_GWS_BASE_URI + "/" + l2_gateway_id
LOG.info("URI : %(uri)s", {"uri": uri})
return self.show_resource(uri, **fields)
def delete_l2_gateway(self, l2_gateway_id):
uri = constants.L2_GWS_BASE_URI + "/" + l2_gateway_id
LOG.info("URI : %(uri)s", {"uri": uri})
return self.delete_resource(uri)
def list_l2_gateways(self, **filters):
uri = constants.L2_GWS_BASE_URI
LOG.info("URI : %(uri)s", {"uri": uri})
return self.list_resources(uri, **filters)
class L2GatewayConnectionClient(base.BaseNetworkClient):
"""
Request resources via API for L2GatewayClient
l2 gateway connection create request
l2 gateway connection update request
l2 gateway connection show request
l2 gateway connection delete request
l2 gateway connection list all request
"""
resource = 'l2_gateway_connection'
resource_plural = 'l2_gateway_connections'
path = 'l2-gateway-connections'
resource_base_path = '/%s' % path
resource_object_path = '/%s/%%s' % path
def create_l2_gateway_connection(self, **kwargs):
uri = self.resource_base_path
post_data = {self.resource: kwargs}
return self.create_resource(uri, post_data)
def update_l2_gateway_connection(self, l2_gateway_id, **kwargs):
uri = self.resource_object_path % l2_gateway_id
post_data = {self.resource: kwargs}
return self.update_resource(uri, post_data)
def show_l2_gateway_connection(self, l2_gateway_id, **fields):
uri = self.resource_object_path % l2_gateway_id
return self.show_resource(uri, **fields)
def delete_l2_gateway_connection(self, l2_gateway_id):
uri = self.resource_object_path % l2_gateway_id
return self.delete_resource(uri)
def list_l2_gateway_connections(self, **filters):
uri = self.resource_base_path
return self.list_resources(uri, **filters)

View File

@ -14,20 +14,55 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import netaddr
from tempest import config
from tempest.lib.common.utils import data_utils
from tempest.lib import decorators
from tempest.lib import exceptions as lib_exc
from tempest import test
from vmware_nsx_tempest.common import constants
from vmware_nsx_tempest.services import base_l2gw
from vmware_nsx_tempest.lib import feature_manager
LOG = constants.log.getLogger(__name__)
CONF = config.CONF
NON_EXIST_UUID = "12341234-0000-1111-2222-000000000000"
class L2GatewayTest(base_l2gw.BaseL2GatewayTest):
class L2GatewayBase(feature_manager.FeatureManager):
@classmethod
def skip_checks(cls):
"""
Skip running test if we do not meet criteria to run the tests.
"""
super(L2GatewayBase, cls).skip_checks()
if not test.is_extension_enabled("l2-gateway", "network"):
raise cls.skipException("l2-gateway extension not enabled.")
@classmethod
def resource_setup(cls):
"""
Setting up the resources for the test.
"""
super(L2GatewayBase, cls).resource_setup()
cls.VLAN_1 = CONF.l2gw.vlan_1
cls.VLAN_2 = CONF.l2gw.vlan_2
# Create subnet on the network just created.
cls.SUBNET_1_NETWORK_CIDR = CONF.l2gw.subnet_1_cidr
cls.SUBNET_1_MASK = cls.SUBNET_1_NETWORK_CIDR.split("/")[1]
def deploy_l2gateway_topology(self):
network_l2gateway = self.create_topology_network("network_l2gateway")
# cidr must be presented & in IPNetwork structure.
self.CIDR = netaddr.IPNetwork(self.SUBNET_1_NETWORK_CIDR)
self.create_topology_subnet(
"subnet1_l2gateway", network_l2gateway, cidr=self.CIDR,
mask_bits=int(self.SUBNET_1_MASK))
class L2GatewayTest(L2GatewayBase):
"""
Test l2 gateway operations.
"""
@ -55,7 +90,6 @@ class L2GatewayTest(base_l2gw.BaseL2GatewayTest):
self.assertEqual(requested_devices[0]["device_name"],
rsp[constants.L2GW]["devices"][0]["device_name"],
"Device name is not the same as expected")
self.resource_cleanup()
@decorators.attr(type="nsxv3")
@decorators.idempotent_id("9968a529-e785-472f-8705-9b394a912e43")
@ -92,7 +126,6 @@ class L2GatewayTest(base_l2gw.BaseL2GatewayTest):
"segmentation_id"]
for id in requested_vlans:
self.assertIn(id, response_vlans)
self.resource_cleanup()
@decorators.attr(type="nsxv3")
@decorators.idempotent_id("3861aab0-4f76-4472-ad0e-a255e6e42193")
@ -129,7 +162,6 @@ class L2GatewayTest(base_l2gw.BaseL2GatewayTest):
"segmentation_id"]
for id in requested_vlans:
self.assertIn(id, response_vlans)
self.resource_cleanup()
@decorators.attr(type="nsxv3")
@decorators.idempotent_id("670cbcb5-134e-467d-ba41-0d7cdbcf3903")
@ -159,8 +191,6 @@ class L2GatewayTest(base_l2gw.BaseL2GatewayTest):
rsp.response["status"],
"Response code is not %(code)s" % {
"code": constants.EXPECTED_HTTP_RESPONSE_204})
self.l2gw_created.pop(l2gw_id)
self.resource_cleanup()
@decorators.attr(type="nsxv3")
@decorators.idempotent_id("fa76f6e6-8aa7-46d8-9af4-2206d0773dc3")
@ -202,7 +232,6 @@ class L2GatewayTest(base_l2gw.BaseL2GatewayTest):
"l2gw name=%(rsp_name)s is not the same as "
"requested=%(name)s" % {"rsp_name": rsp_l2gw["name"],
"name": l2gw_new_name})
self.resource_cleanup()
@decorators.attr(type="nsxv3")
@decorators.idempotent_id("c4977df8-8e3a-4b7e-a8d2-5aa757117658")
@ -240,7 +269,6 @@ class L2GatewayTest(base_l2gw.BaseL2GatewayTest):
"code":
constants.EXPECTED_HTTP_RESPONSE_200})
rsp_l2gw = update_rsp[constants.L2GW]
self.l2gw_created[rsp_l2gw["id"]] = rsp_l2gw
LOG.info("response : %(rsp_l2gw)s", {"rsp_l2gw": rsp_l2gw})
if "segmentation_id" in devices["devices"][0]["interfaces"][0]:
self.assertEqual(devices["devices"][0]["interfaces"][0][
@ -248,7 +276,6 @@ class L2GatewayTest(base_l2gw.BaseL2GatewayTest):
rsp_l2gw["devices"][0]["interfaces"][0][
"segmentation_id"][0],
"L2GW segmentation id update failed!!!")
self.resource_cleanup()
@decorators.attr(type="nsxv3")
@decorators.idempotent_id("5a3cef97-c91c-4e03-92c8-d180f9269f27")
@ -291,7 +318,6 @@ class L2GatewayTest(base_l2gw.BaseL2GatewayTest):
response_vlans = show_rsp[0]["interfaces"][0]["segmentation_id"]
for id in requested_vlans:
self.assertIn(id, response_vlans)
self.resource_cleanup()
@decorators.attr(type="nsxv3")
@decorators.idempotent_id("d4a7d3af-e637-45c5-a967-d179153a6e58")
@ -325,4 +351,454 @@ class L2GatewayTest(base_l2gw.BaseL2GatewayTest):
break
self.assertEqual(l2gw_rsp, list_rsp, "L2GW create response and L2GW "
"list response does not match.")
self.resource_cleanup()
class L2GatewayConnectionTest(L2GatewayBase):
"""
Test l2 gateway connection operations.
"""
@decorators.attr(type="nsxv3")
@decorators.idempotent_id("81edfb9e-4722-4565-939c-6593b8405ff4")
def test_l2_gateway_connection_create(self):
"""
Create l2 gateway connection using one vlan. Vlan parameter is
passed into L2GW create.
"""
LOG.info("Testing test_l2_gateway_connection_create api")
self.deploy_l2gateway_topology()
cluster_info = self.nsx_bridge_cluster_info()
device_name, interface_name = cluster_info[0][0], cluster_info[0][1]
l2gw_name = data_utils.rand_name(constants.L2GW)
device_1 = {"dname": device_name, "iname": interface_name,
"vlans": [self.VLAN_1]}
l2gw_param = [device_1]
l2gw_rsp, _ = self.create_l2gw(l2gw_name, l2gw_param)
l2gwc_param = {"l2_gateway_id": l2gw_rsp[constants.L2GW]["id"],
"network_id":
self.topology_networks["network_l2gateway"]["id"]}
l2gwc_rsp = self.create_l2gw_connection(l2gwc_param)
# Assert if create fails.
self.assertEqual(constants.EXPECTED_HTTP_RESPONSE_201,
l2gwc_rsp.response["status"],
"Response code is not %(code)s" % {
"code": constants.EXPECTED_HTTP_RESPONSE_201})
self.assertEqual(l2gwc_param["l2_gateway_id"],
l2gwc_rsp[constants.L2GWC]["l2_gateway_id"],
"l2gw id is not same as expected in "
"create l2gw connection response")
self.assertEqual(l2gwc_param["network_id"],
l2gwc_rsp[constants.L2GWC]["network_id"],
"network id is not same as expected in "
"create l2gw connection response")
@decorators.attr(type="nsxv3")
@decorators.idempotent_id("7db4f6c9-18c5-4a99-93c1-68bc2ecb48a7")
def test_l2_gateway_connection_create_with_multiple_vlans(self):
"""
Create l2 gateway connection using multiple vlans. Vlan parameter is
passed into L2GW create.
"""
LOG.info("Testing test_l2_gateway_connection_create api")
self.deploy_l2gateway_topology()
cluster_info = self.nsx_bridge_cluster_info()
device_name, interface_name = cluster_info[0][0], cluster_info[0][1]
l2gw_name = data_utils.rand_name(constants.L2GW)
device_1 = {"dname": device_name, "iname": interface_name,
"vlans": [self.VLAN_1, self.VLAN_2]}
l2gw_param = [device_1]
l2gw_rsp, _ = self.create_l2gw(l2gw_name, l2gw_param)
l2gwc_param = {"l2_gateway_id": l2gw_rsp[constants.L2GW]["id"],
"network_id":
self.topology_networks["network_l2gateway"]["id"]}
l2gwc_rsp = self.create_l2gw_connection(l2gwc_param)
# Assert if create fails.
self.assertEqual(constants.EXPECTED_HTTP_RESPONSE_201,
l2gwc_rsp.response["status"],
"Response code is not %(code)s" % {
"code": constants.EXPECTED_HTTP_RESPONSE_201})
self.assertEqual(l2gwc_param["l2_gateway_id"],
l2gwc_rsp[constants.L2GWC]["l2_gateway_id"],
"l2gw id is not same as expected in "
"create l2gw connection response")
self.assertEqual(l2gwc_param["network_id"],
l2gwc_rsp[constants.L2GWC]["network_id"],
"network id is not same as expected in "
"create l2gw connection response")
@decorators.attr(type="nsxv3")
@decorators.idempotent_id("de70d6a2-d454-4a09-b06b-8f39be67b635")
def test_l2_gateway_connection_with_seg_id_create(self):
"""
Create l2 gateway connection using one vlan. Vlan parameter is
passed into L2GW connection create.
"""
LOG.info("Testing test_l2_gateway_connection_create api")
self.deploy_l2gateway_topology()
cluster_info = self.nsx_bridge_cluster_info()
device_name, interface_name = cluster_info[0][0], cluster_info[0][1]
l2gw_name = data_utils.rand_name(constants.L2GW)
device_1 = {"dname": device_name, "iname": interface_name}
l2gw_param = [device_1]
l2gw_rsp, _ = self.create_l2gw(l2gw_name, l2gw_param)
l2gwc_param = {"l2_gateway_id": l2gw_rsp[constants.L2GW]["id"],
"network_id":
self.topology_networks["network_l2gateway"]["id"],
"segmentation_id": self.VLAN_1}
l2gwc_rsp = self.create_l2gw_connection(l2gwc_param)
# Assert if create fails.
self.assertEqual(constants.EXPECTED_HTTP_RESPONSE_201,
l2gwc_rsp.response["status"],
"Response code is not %(code)s" % {
"code": constants.EXPECTED_HTTP_RESPONSE_201})
self.assertEqual(l2gwc_param["l2_gateway_id"],
l2gwc_rsp[constants.L2GWC]["l2_gateway_id"],
"l2gw id is not same as expected in "
"create l2gw connection response")
self.assertEqual(l2gwc_param["network_id"],
l2gwc_rsp[constants.L2GWC]["network_id"],
"network id is not same as expected in "
"create l2gw connection response")
self.assertEqual(l2gwc_param["segmentation_id"],
l2gwc_rsp[constants.L2GWC]["segmentation_id"],
"segmentation id is not same as expected in "
"create l2gw connection response")
@decorators.attr(type="nsxv3")
@decorators.idempotent_id("819d9b50-9159-48d0-be2a-493ec686534c")
def test_l2_gateway_connection_show(self):
"""
Create l2 gateway connection using one vlan and tes l2 gateway
connection show api
"""
LOG.info("Testing test_l2_gateway_connection_create api")
self.deploy_l2gateway_topology()
cluster_info = self.nsx_bridge_cluster_info()
device_name, interface_name = cluster_info[0][0], cluster_info[0][1]
l2gw_name = data_utils.rand_name(constants.L2GW)
device_1 = {"dname": device_name, "iname": interface_name}
l2gw_param = [device_1]
l2gw_rsp, _ = self.create_l2gw(l2gw_name, l2gw_param)
l2gwc_param = {"l2_gateway_id": l2gw_rsp[constants.L2GW]["id"],
"network_id":
self.topology_networks["network_l2gateway"]["id"],
"segmentation_id": self.VLAN_1}
l2gwc_rsp = self.create_l2gw_connection(l2gwc_param)
# Assert if create fails.
self.assertEqual(constants.EXPECTED_HTTP_RESPONSE_201,
l2gwc_rsp.response["status"],
"Response code is not %(code)s" % {
"code": constants.EXPECTED_HTTP_RESPONSE_201})
l2gwc_id = l2gwc_rsp[constants.L2GWC]["id"]
show_rsp = self.l2gwc_client.show_l2_gateway_connection(l2gwc_id)
self.assertEqual(constants.EXPECTED_HTTP_RESPONSE_200,
show_rsp.response["status"],
"Response code is not %(code)s" % {
"code": constants.EXPECTED_HTTP_RESPONSE_200})
self.assertEqual(l2gwc_param["l2_gateway_id"],
show_rsp[constants.L2GWC]["l2_gateway_id"],
"l2gw id is not same as expected in "
"show l2gw connection response")
self.assertEqual(l2gwc_param["network_id"],
show_rsp[constants.L2GWC]["network_id"],
"network id is not same as expected in "
"show l2gw connection response")
show_rsp_seg_id = str(show_rsp[constants.L2GWC][
"segmentation_id"])
self.assertEqual(l2gwc_param["segmentation_id"],
show_rsp_seg_id,
"segmentation id is not same as expected in "
"show l2gw connection response")
@decorators.attr(type="nsxv3")
@decorators.idempotent_id("4188f8e7-cd65-427e-92b8-2a9e0492ab21")
def test_l2_gateway_connection_list(self):
"""
Create l2 gateway connection using one vlan and test l2 gateway
connection list api.
"""
LOG.info("Testing test_l2_gateway_connection_create api")
self.deploy_l2gateway_topology()
cluster_info = self.nsx_bridge_cluster_info()
device_name, interface_name = cluster_info[0][0], cluster_info[0][1]
l2gw_name = data_utils.rand_name(constants.L2GW)
device_1 = {"dname": device_name, "iname": interface_name}
l2gw_param = [device_1]
# Create 2 l2 gateways.
l2gw_rsp, _ = self.create_l2gw(l2gw_name, l2gw_param)
# Create 2 l2 gateway connections.
l2gwc_param = {"l2_gateway_id": l2gw_rsp[constants.L2GW]["id"],
"network_id":
self.topology_networks["network_l2gateway"]["id"],
"segmentation_id": self.VLAN_1}
l2gwc_rsp = self.create_l2gw_connection(l2gwc_param)
# Assert if create fails.
self.assertEqual(constants.EXPECTED_HTTP_RESPONSE_201,
l2gwc_rsp.response["status"],
"Response code is not %(code)s" % {
"code": constants.EXPECTED_HTTP_RESPONSE_201})
list_rsp = self.l2gwc_client.list_l2_gateway_connections()
LOG.info("l2gw connection list response: %s", list_rsp)
# Assert in case of failure.
self.assertEqual(constants.EXPECTED_HTTP_RESPONSE_200,
list_rsp.response["status"],
"Response code is not %(code)s" % {
"code": constants.EXPECTED_HTTP_RESPONSE_200})
self.assertEqual(l2gwc_rsp["l2_gateway_connection"]["id"],
list_rsp["l2_gateway_connections"][0]["id"],
"l2gw connection list does not show proper id")
self.assertEqual(l2gwc_rsp["l2_gateway_connection"]["l2_gateway_id"],
list_rsp["l2_gateway_connections"][0][
"l2_gateway_id"],
"l2gw connection list does not show proper "
"l2_gateway_id")
self.assertEqual(l2gwc_rsp["l2_gateway_connection"]["network_id"],
list_rsp["l2_gateway_connections"][0]["network_id"],
"l2gw connection list does not show proper "
"network_id")
self.assertEqual(l2gwc_rsp["l2_gateway_connection"]["tenant_id"],
list_rsp["l2_gateway_connections"][0]["tenant_id"],
"l2gw connection list does not show proper tenant_id")
self.assertEqual(l2gwc_rsp["l2_gateway_connection"]["segmentation_id"],
str(list_rsp["l2_gateway_connections"][0][
"segmentation_id"]),
"l2gw connection list does not show proper "
"segmentation_id")
@decorators.attr(type="nsxv3")
@decorators.idempotent_id("4d71111f-3d2b-4557-97c7-2e149a6f41fb")
def test_l2_gateway_connection_recreate(self):
"""
Recreate l2 gateway connection.
- Create l2GW.
- Create l2gw connection.
- delete l2gw connection.
- Recreate l2gw connection
- verify with l2gw connection list API.
"""
LOG.info("Testing test_l2_gateway_connection_create api")
self.deploy_l2gateway_topology()
# List all the L2GW connection.
list_rsp = self.l2gwc_client.list_l2_gateway_connections()
LOG.info("l2gw connection list response: %s", list_rsp)
# Assert in case of failure.
self.assertEqual(constants.EXPECTED_HTTP_RESPONSE_200,
list_rsp.response["status"],
"Response code is not %(code)s" % {
"code": constants.EXPECTED_HTTP_RESPONSE_200})
list_rsp = list_rsp["l2_gateway_connections"]
l2gwc_ids = [item.get("id") for item in list_rsp if "id"
in item]
cluster_info = self.nsx_bridge_cluster_info()
device_name, interface_name = cluster_info[0][0], cluster_info[0][1]
l2gw_name = data_utils.rand_name(constants.L2GW)
device_1 = {"dname": device_name, "iname": interface_name}
l2gw_param = [device_1]
l2gw_rsp, _ = self.create_l2gw(l2gw_name, l2gw_param)
l2gwc_param = {"l2_gateway_id": l2gw_rsp[constants.L2GW]["id"],
"network_id":
self.topology_networks["network_l2gateway"]["id"],
"segmentation_id": self.VLAN_1}
l2gwc_rsp = self.create_l2gw_connection(l2gwc_param)
# Assert if create fails.
self.assertEqual(constants.EXPECTED_HTTP_RESPONSE_201,
l2gwc_rsp.response["status"],
"Response code is not %(code)s" % {
"code": constants.EXPECTED_HTTP_RESPONSE_201})
l2gwc_id = l2gwc_rsp[constants.L2GWC]["id"]
# Delete l2gw.
rsp = self.delete_l2gw_connection(l2gwc_id)
self.assertEqual(constants.EXPECTED_HTTP_RESPONSE_204,
rsp.response["status"],
"Response code is not %(code)s" % {
"code": constants.EXPECTED_HTTP_RESPONSE_204})
l2gwc_rsp = self.create_l2gw_connection(l2gwc_param)
# Assert if create fails.
self.assertEqual(constants.EXPECTED_HTTP_RESPONSE_201,
l2gwc_rsp.response["status"],
"Response code is not %(code)s" % {
"code": constants.EXPECTED_HTTP_RESPONSE_201})
# List all the L2GW connection.
list_rsp = self.l2gwc_client.list_l2_gateway_connections()
LOG.info("l2gw connection list response: %s", list_rsp)
# Assert in case of failure.
self.assertEqual(constants.EXPECTED_HTTP_RESPONSE_200,
list_rsp.response["status"],
"Response code is not %(code)s" % {
"code": constants.EXPECTED_HTTP_RESPONSE_200})
list_rsp = list_rsp["l2_gateway_connections"]
l2gwc_ids = l2gwc_ids + [item.get("id") for item in list_rsp if
"id" in item]
self.assertNotIn(l2gwc_id, l2gwc_ids, "l2gwc list api shows hanging "
"l2gwc id")
@decorators.attr(type="nsxv3")
@decorators.idempotent_id("671cacb5-134e-467d-ba41-0d7cdbcf3903")
def test_l2_gateway_connection_delete(self):
"""
Delete l2gw will create l2gw and delete recently created l2gw. To
delete l2gw we need l2gw id.
"""
LOG.info("Testing l2_gateway_connection_delete api")
self.deploy_l2gateway_topology()
cluster_info = self.nsx_bridge_cluster_info()
device_name, interface_name = cluster_info[0][0], cluster_info[0][1]
l2gw_name = data_utils.rand_name(constants.L2GW)
device_1 = {"dname": device_name, "iname": interface_name}
l2gw_param = [device_1]
l2gw_rsp, _ = self.create_l2gw(l2gw_name, l2gw_param)
l2gwc_param = {"l2_gateway_id": l2gw_rsp[constants.L2GW]["id"],
"network_id":
self.topology_networks["network_l2gateway"]["id"],
"segmentation_id": self.VLAN_1}
l2gwc_rsp = self.create_l2gw_connection(l2gwc_param)
# Assert if create fails.
self.assertEqual(constants.EXPECTED_HTTP_RESPONSE_201,
l2gwc_rsp.response["status"],
"Response code is not %(code)s" % {
"code": constants.EXPECTED_HTTP_RESPONSE_201})
l2gwc_id = l2gwc_rsp[constants.L2GWC]["id"]
# Delete l2gw.
rsp = self.delete_l2gw_connection(l2gwc_id)
self.assertEqual(constants.EXPECTED_HTTP_RESPONSE_204,
rsp.response["status"],
"Response code is not %(code)s" % {
"code": constants.EXPECTED_HTTP_RESPONSE_204})
class L2GatewayConnectionNegative(L2GatewayBase):
"""
Negative L2GW tests.
"""
@decorators.attr(type="nsxv3")
@decorators.idempotent_id("e86bd8e9-b32b-425d-86fa-cd866138d028")
def test_active_l2_gateway_delete(self):
"""
Delete l2 gateway with active mapping.
"""
LOG.info("Testing test_l2_gateway_create api")
self.deploy_l2gateway_topology()
cluster_info = self.nsx_bridge_cluster_info()
device_name, interface_name = cluster_info[0][0], cluster_info[0][1]
l2gw_name = data_utils.rand_name(constants.L2GW)
device_1 = {"dname": device_name, "iname": interface_name}
l2gw_param = [device_1]
l2gw_rsp, _ = self.create_l2gw(l2gw_name, l2gw_param)
l2gwc_param = {"l2_gateway_id": l2gw_rsp[constants.L2GW]["id"],
"network_id":
self.topology_networks["network_l2gateway"]["id"],
"segmentation_id": self.VLAN_1}
l2gwc_rsp = self.create_l2gw_connection(l2gwc_param)
# Assert if create fails.
self.assertEqual(constants.EXPECTED_HTTP_RESPONSE_201,
l2gwc_rsp.response["status"],
"Response code is not %(code)s" % {
"code": constants.EXPECTED_HTTP_RESPONSE_201})
l2gw_id = l2gw_rsp[constants.L2GW]["id"]
# Delete l2gw must raise Conflict exception.
self.assertRaises(lib_exc.Conflict, self.delete_l2gw, l2gw_id)
@decorators.attr(type="nsxv3")
@decorators.idempotent_id("488faaae-180a-4c48-8b7a-44c3a243369f")
def test_recreate_l2_gateway_connection(self):
"""
Recreate l2 gateway connection using same parameters.
"""
LOG.info("Testing test_l2_gateway_connection_create api")
self.deploy_l2gateway_topology()
cluster_info = self.nsx_bridge_cluster_info()
device_name, interface_name = cluster_info[0][0], cluster_info[0][1]
l2gw_name = data_utils.rand_name(constants.L2GW)
device_1 = {"dname": device_name, "iname": interface_name,
"vlans": [self.VLAN_1]}
l2gw_param = [device_1]
l2gw_rsp, _ = self.create_l2gw(l2gw_name, l2gw_param)
l2gwc_param = {"l2_gateway_id": l2gw_rsp[constants.L2GW]["id"],
"network_id":
self.topology_networks["network_l2gateway"]["id"]}
l2gwc_rsp = self.create_l2gw_connection(l2gwc_param)
# Assert if create fails.
self.assertEqual(constants.EXPECTED_HTTP_RESPONSE_201,
l2gwc_rsp.response["status"],
"Response code is not %(code)s" % {
"code": constants.EXPECTED_HTTP_RESPONSE_201})
self.assertRaises(lib_exc.Conflict, self.create_l2gw_connection,
l2gwc_param)
@decorators.attr(type="nsxv3")
@decorators.idempotent_id("14606e74-4f65-402e-ae50-a0adcd877a83")
def test_create_l2gwc_with_nonexist_l2gw(self):
"""
Create l2 gateway connection using non exist l2gw uuid.
"""
LOG.info("Testing test_l2_gateway_connection_create api")
self.deploy_l2gateway_topology()
non_exist_l2gw_uuid = NON_EXIST_UUID
cluster_info = self.nsx_bridge_cluster_info()
device_name, interface_name = cluster_info[0][0], cluster_info[0][1]
l2gw_name = data_utils.rand_name(constants.L2GW)
device_1 = {"dname": device_name, "iname": interface_name}
l2gw_param = [device_1]
l2gw_rsp, _ = self.create_l2gw(l2gw_name, l2gw_param)
l2gwc_param = {"l2_gateway_id": non_exist_l2gw_uuid,
"network_id":
self.topology_networks["network_l2gateway"]["id"],
"segmentation_id": self.VLAN_1}
# Delete l2gw must raise Conflict exception.
self.assertRaises(lib_exc.NotFound, self.create_l2gw_connection,
l2gwc_param)
@decorators.attr(type="nsxv3")
@decorators.idempotent_id("e6cb8973-fcbc-443e-a3cb-c6a82ae58b63")
def test_create_l2gwc_with_nonexist_network(self):
"""
Create l2 gateway connection using non exist l2gw uuid.
"""
LOG.info("Testing test_l2_gateway_connection_create api")
non_exist_network_uuid = NON_EXIST_UUID
cluster_info = self.nsx_bridge_cluster_info()
device_name, interface_name = cluster_info[0][0], cluster_info[0][1]
l2gw_name = data_utils.rand_name(constants.L2GW)
device_1 = {"dname": device_name, "iname": interface_name}
l2gw_param = [device_1]
l2gw_rsp, _ = self.create_l2gw(l2gw_name, l2gw_param)
l2gwc_param = {"l2_gateway_id": l2gw_rsp[constants.L2GW]["id"],
"network_id": non_exist_network_uuid,
"segmentation_id": self.VLAN_1}
# Delete l2gw must raise Conflict exception.
self.assertRaises(lib_exc.NotFound, self.create_l2gw_connection,
l2gwc_param)
@decorators.attr(type="nsxv3")
@decorators.idempotent_id("27c7c64f-511f-421e-8b62-dfed143fc00b")
def test_create_l2gw_with_invalid_seg_id(self):
"""
Create l2 gateway connection using invalid seg id.
"""
LOG.info("Testing l2_gateway_create api with segmentation ID")
invalid_seg_id = 20000
cluster_info = self.nsx_bridge_cluster_info()
device_name, interface_name = cluster_info[0][0], cluster_info[0][1]
l2gw_name = data_utils.rand_name(constants.L2GW)
device_1 = {"dname": device_name, "iname": interface_name,
"vlans": [invalid_seg_id]}
l2gw_param = [device_1]
self.assertRaises(lib_exc.BadRequest, self.create_l2gw, l2gw_name,
l2gw_param)
@decorators.skip_because(bug="1640033")
@decorators.attr(type="nsxv3")
@decorators.idempotent_id("000cc597-bcea-4539-af07-bd70357e8d82")
def test_create_l2gw_with_non_int_seg_id(self):
"""
Create l2 gateway connection using invalid seg id.
"""
LOG.info("Testing l2_gateway_create api with segmentation ID")
invalid_seg_id = 2.45
cluster_info = self.nsx_bridge_cluster_info()
device_name, interface_name = cluster_info[0][0], cluster_info[0][1]
l2gw_name = data_utils.rand_name(constants.L2GW)
device_1 = {"dname": device_name, "iname": interface_name,
"vlans": [invalid_seg_id]}
l2gw_param = [device_1]
self.assertRaises(lib_exc.BadRequest, self.create_l2gw, l2gw_name,
l2gw_param)

View File

@ -1,379 +0,0 @@
# Copyright 2015 OpenStack Foundation
# Copyright 2016 VMware Inc
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import netaddr
from tempest import config
from tempest.lib.common.utils import data_utils
from tempest.lib.common.utils import test_utils
from tempest.lib import decorators
from vmware_nsx_tempest.common import constants
from vmware_nsx_tempest.services import base_l2gw
CONF = config.CONF
LOG = constants.log.getLogger(__name__)
class L2GatewayConnectionTest(base_l2gw.BaseL2GatewayTest):
"""
Test l2 gateway connection operations.
"""
@classmethod
def resource_setup(cls):
"""
Setting up the resources for the test.
"""
super(L2GatewayConnectionTest, cls).resource_setup()
# Create a network.
cls.network = cls.create_network()
# Create subnet on the network just created.
cls.SUBNET_1_NETWORK_CIDR = CONF.l2gw.subnet_1_cidr
network_cidr = cls.SUBNET_1_NETWORK_CIDR.split("/")
cls.SUBNET_1_MASK = network_cidr[1]
subnet_info = {}
# cidr must be presented & in IPNetwork structure.
cls.CIDR = netaddr.IPNetwork(cls.SUBNET_1_NETWORK_CIDR)
cls.subnet = cls.create_subnet(cls.network, cidr=cls.CIDR,
mask_bits=int(cls.SUBNET_1_MASK),
**subnet_info)
@classmethod
def resource_cleanup(cls):
"""
Clean all the resources used during the test.
"""
super(L2GatewayConnectionTest, cls).resource_cleanup()
test_utils.call_and_ignore_notfound_exc(
cls.networks_client.delete_network, cls.network["id"])
@classmethod
def l2gw_cleanup(cls):
"""
Delete created L2GWs and L2GWCs.
"""
for l2gwc_id in cls.l2gwc_created.keys():
cls.l2gwc_client.delete_l2_gateway_connection(l2gwc_id)
cls.l2gwc_created.pop(l2gwc_id)
for l2gw_id in cls.l2gw_created.keys():
cls.l2gw_client.delete_l2_gateway(l2gw_id)
cls.l2gw_created.pop(l2gw_id)
@decorators.attr(type="nsxv3")
@decorators.idempotent_id("81edfb9e-4722-4565-939c-6593b8405ff4")
def test_l2_gateway_connection_create(self):
"""
Create l2 gateway connection using one vlan. Vlan parameter is
passed into L2GW create.
"""
LOG.info("Testing test_l2_gateway_connection_create api")
cluster_info = self.nsx_bridge_cluster_info()
device_name, interface_name = cluster_info[0][0], cluster_info[0][1]
l2gw_name = data_utils.rand_name(constants.L2GW)
device_1 = {"dname": device_name, "iname": interface_name,
"vlans": [self.VLAN_1]}
l2gw_param = [device_1]
l2gw_rsp, _ = self.create_l2gw(l2gw_name, l2gw_param)
l2gwc_param = {"l2_gateway_id": l2gw_rsp[constants.L2GW]["id"],
"network_id": self.network["id"]}
l2gwc_rsp = self.create_l2gw_connection(l2gwc_param)
# Assert if create fails.
self.assertEqual(constants.EXPECTED_HTTP_RESPONSE_201,
l2gwc_rsp.response["status"],
"Response code is not %(code)s" % {
"code": constants.EXPECTED_HTTP_RESPONSE_201})
self.assertEqual(l2gwc_param["l2_gateway_id"],
l2gwc_rsp[constants.L2GWC]["l2_gateway_id"],
"l2gw id is not same as expected in "
"create l2gw connection response")
self.assertEqual(l2gwc_param["network_id"],
l2gwc_rsp[constants.L2GWC]["network_id"],
"network id is not same as expected in "
"create l2gw connection response")
self.addCleanup(self.l2gw_cleanup)
@decorators.attr(type="nsxv3")
@decorators.idempotent_id("7db4f6c9-18c5-4a99-93c1-68bc2ecb48a7")
def test_l2_gateway_connection_create_with_multiple_vlans(self):
"""
Create l2 gateway connection using multiple vlans. Vlan parameter is
passed into L2GW create.
"""
LOG.info("Testing test_l2_gateway_connection_create api")
cluster_info = self.nsx_bridge_cluster_info()
device_name, interface_name = cluster_info[0][0], cluster_info[0][1]
l2gw_name = data_utils.rand_name(constants.L2GW)
device_1 = {"dname": device_name, "iname": interface_name,
"vlans": [self.VLAN_1, self.VLAN_2]}
l2gw_param = [device_1]
l2gw_rsp, _ = self.create_l2gw(l2gw_name, l2gw_param)
l2gwc_param = {"l2_gateway_id": l2gw_rsp[constants.L2GW]["id"],
"network_id": self.network["id"]}
l2gwc_rsp = self.create_l2gw_connection(l2gwc_param)
# Assert if create fails.
self.assertEqual(constants.EXPECTED_HTTP_RESPONSE_201,
l2gwc_rsp.response["status"],
"Response code is not %(code)s" % {
"code": constants.EXPECTED_HTTP_RESPONSE_201})
self.assertEqual(l2gwc_param["l2_gateway_id"],
l2gwc_rsp[constants.L2GWC]["l2_gateway_id"],
"l2gw id is not same as expected in "
"create l2gw connection response")
self.assertEqual(l2gwc_param["network_id"],
l2gwc_rsp[constants.L2GWC]["network_id"],
"network id is not same as expected in "
"create l2gw connection response")
self.addCleanup(self.l2gw_cleanup)
@decorators.attr(type="nsxv3")
@decorators.idempotent_id("de70d6a2-d454-4a09-b06b-8f39be67b635")
def test_l2_gateway_connection_with_seg_id_create(self):
"""
Create l2 gateway connection using one vlan. Vlan parameter is
passed into L2GW connection create.
"""
LOG.info("Testing test_l2_gateway_connection_create api")
cluster_info = self.nsx_bridge_cluster_info()
device_name, interface_name = cluster_info[0][0], cluster_info[0][1]
l2gw_name = data_utils.rand_name(constants.L2GW)
device_1 = {"dname": device_name, "iname": interface_name}
l2gw_param = [device_1]
l2gw_rsp, _ = self.create_l2gw(l2gw_name, l2gw_param)
l2gwc_param = {"l2_gateway_id": l2gw_rsp[constants.L2GW]["id"],
"network_id": self.network["id"],
"segmentation_id": self.VLAN_1}
l2gwc_rsp = self.create_l2gw_connection(l2gwc_param)
# Assert if create fails.
self.assertEqual(constants.EXPECTED_HTTP_RESPONSE_201,
l2gwc_rsp.response["status"],
"Response code is not %(code)s" % {
"code": constants.EXPECTED_HTTP_RESPONSE_201})
self.assertEqual(l2gwc_param["l2_gateway_id"],
l2gwc_rsp[constants.L2GWC]["l2_gateway_id"],
"l2gw id is not same as expected in "
"create l2gw connection response")
self.assertEqual(l2gwc_param["network_id"],
l2gwc_rsp[constants.L2GWC]["network_id"],
"network id is not same as expected in "
"create l2gw connection response")
self.assertEqual(l2gwc_param["segmentation_id"],
l2gwc_rsp[constants.L2GWC]["segmentation_id"],
"segmentation id is not same as expected in "
"create l2gw connection response")
self.addCleanup(self.l2gw_cleanup)
@decorators.attr(type="nsxv3")
@decorators.idempotent_id("819d9b50-9159-48d0-be2a-493ec686534c")
def test_l2_gateway_connection_show(self):
"""
Create l2 gateway connection using one vlan and tes l2 gateway
connection show api
"""
LOG.info("Testing test_l2_gateway_connection_create api")
cluster_info = self.nsx_bridge_cluster_info()
device_name, interface_name = cluster_info[0][0], cluster_info[0][1]
l2gw_name = data_utils.rand_name(constants.L2GW)
device_1 = {"dname": device_name, "iname": interface_name}
l2gw_param = [device_1]
l2gw_rsp, _ = self.create_l2gw(l2gw_name, l2gw_param)
l2gwc_param = {"l2_gateway_id": l2gw_rsp[constants.L2GW]["id"],
"network_id": self.network["id"],
"segmentation_id": self.VLAN_1}
l2gwc_rsp = self.create_l2gw_connection(l2gwc_param)
# Assert if create fails.
self.assertEqual(constants.EXPECTED_HTTP_RESPONSE_201,
l2gwc_rsp.response["status"],
"Response code is not %(code)s" % {
"code": constants.EXPECTED_HTTP_RESPONSE_201})
l2gwc_id = l2gwc_rsp[constants.L2GWC]["id"]
show_rsp = self.l2gwc_client.show_l2_gateway_connection(l2gwc_id)
self.assertEqual(constants.EXPECTED_HTTP_RESPONSE_200,
show_rsp.response["status"],
"Response code is not %(code)s" % {
"code": constants.EXPECTED_HTTP_RESPONSE_200})
self.assertEqual(l2gwc_param["l2_gateway_id"],
show_rsp[constants.L2GWC]["l2_gateway_id"],
"l2gw id is not same as expected in "
"show l2gw connection response")
self.assertEqual(l2gwc_param["network_id"],
show_rsp[constants.L2GWC]["network_id"],
"network id is not same as expected in "
"show l2gw connection response")
show_rsp_seg_id = str(show_rsp[constants.L2GWC][
"segmentation_id"])
self.assertEqual(l2gwc_param["segmentation_id"],
show_rsp_seg_id,
"segmentation id is not same as expected in "
"show l2gw connection response")
self.addCleanup(self.l2gw_cleanup)
@decorators.attr(type="nsxv3")
@decorators.idempotent_id("4188f8e7-cd65-427e-92b8-2a9e0492ab21")
def test_l2_gateway_connection_list(self):
"""
Create l2 gateway connection using one vlan and test l2 gateway
connection list api.
"""
LOG.info("Testing test_l2_gateway_connection_create api")
cluster_info = self.nsx_bridge_cluster_info()
device_name, interface_name = cluster_info[0][0], cluster_info[0][1]
l2gw_name = data_utils.rand_name(constants.L2GW)
device_1 = {"dname": device_name, "iname": interface_name}
l2gw_param = [device_1]
# Create 2 l2 gateways.
l2gw_rsp, _ = self.create_l2gw(l2gw_name, l2gw_param)
# Create 2 l2 gateway connections.
l2gwc_param = {"l2_gateway_id": l2gw_rsp[constants.L2GW]["id"],
"network_id": self.network["id"],
"segmentation_id": self.VLAN_1}
l2gwc_rsp = self.create_l2gw_connection(l2gwc_param)
# Assert if create fails.
self.assertEqual(constants.EXPECTED_HTTP_RESPONSE_201,
l2gwc_rsp.response["status"],
"Response code is not %(code)s" % {
"code": constants.EXPECTED_HTTP_RESPONSE_201})
list_rsp = self.l2gwc_client.list_l2_gateway_connections()
LOG.info("l2gw connection list response: %s", list_rsp)
# Assert in case of failure.
self.assertEqual(constants.EXPECTED_HTTP_RESPONSE_200,
list_rsp.response["status"],
"Response code is not %(code)s" % {
"code": constants.EXPECTED_HTTP_RESPONSE_200})
self.assertEqual(l2gwc_rsp["l2_gateway_connection"]["id"],
list_rsp["l2_gateway_connections"][0]["id"],
"l2gw connection list does not show proper id")
self.assertEqual(l2gwc_rsp["l2_gateway_connection"]["l2_gateway_id"],
list_rsp["l2_gateway_connections"][0][
"l2_gateway_id"],
"l2gw connection list does not show proper "
"l2_gateway_id")
self.assertEqual(l2gwc_rsp["l2_gateway_connection"]["network_id"],
list_rsp["l2_gateway_connections"][0]["network_id"],
"l2gw connection list does not show proper "
"network_id")
self.assertEqual(l2gwc_rsp["l2_gateway_connection"]["tenant_id"],
list_rsp["l2_gateway_connections"][0]["tenant_id"],
"l2gw connection list does not show proper tenant_id")
self.assertEqual(l2gwc_rsp["l2_gateway_connection"]["segmentation_id"],
str(list_rsp["l2_gateway_connections"][0][
"segmentation_id"]),
"l2gw connection list does not show proper "
"segmentation_id")
self.addCleanup(self.l2gw_cleanup)
@decorators.attr(type="nsxv3")
@decorators.idempotent_id("4d71111f-3d2b-4557-97c7-2e149a6f41fb")
def test_l2_gateway_connection_recreate(self):
"""
Recreate l2 gateway connection.
- Create l2GW.
- Create l2gw connection.
- delete l2gw connection.
- Recreate l2gw connection
- verify with l2gw connection list API.
"""
LOG.info("Testing test_l2_gateway_connection_create api")
# List all the L2GW connection.
list_rsp = self.l2gwc_client.list_l2_gateway_connections()
LOG.info("l2gw connection list response: %s", list_rsp)
# Assert in case of failure.
self.assertEqual(constants.EXPECTED_HTTP_RESPONSE_200,
list_rsp.response["status"],
"Response code is not %(code)s" % {
"code": constants.EXPECTED_HTTP_RESPONSE_200})
list_rsp = list_rsp["l2_gateway_connections"]
l2gwc_ids = [item.get("id") for item in list_rsp if "id"
in item]
cluster_info = self.nsx_bridge_cluster_info()
device_name, interface_name = cluster_info[0][0], cluster_info[0][1]
l2gw_name = data_utils.rand_name(constants.L2GW)
device_1 = {"dname": device_name, "iname": interface_name}
l2gw_param = [device_1]
l2gw_rsp, _ = self.create_l2gw(l2gw_name, l2gw_param)
l2gwc_param = {"l2_gateway_id": l2gw_rsp[constants.L2GW]["id"],
"network_id": self.network["id"],
"segmentation_id": self.VLAN_1}
l2gwc_rsp = self.create_l2gw_connection(l2gwc_param)
# Assert if create fails.
self.assertEqual(constants.EXPECTED_HTTP_RESPONSE_201,
l2gwc_rsp.response["status"],
"Response code is not %(code)s" % {
"code": constants.EXPECTED_HTTP_RESPONSE_201})
l2gwc_id = l2gwc_rsp[constants.L2GWC]["id"]
# Delete l2gw.
rsp = self.delete_l2gw_connection(l2gwc_id)
self.assertEqual(constants.EXPECTED_HTTP_RESPONSE_204,
rsp.response["status"],
"Response code is not %(code)s" % {
"code": constants.EXPECTED_HTTP_RESPONSE_204})
# Since we delete l2gwc pop that id from list.
self.l2gwc_created.pop(l2gwc_id)
l2gwc_rsp = self.create_l2gw_connection(l2gwc_param)
# Assert if create fails.
self.assertEqual(constants.EXPECTED_HTTP_RESPONSE_201,
l2gwc_rsp.response["status"],
"Response code is not %(code)s" % {
"code": constants.EXPECTED_HTTP_RESPONSE_201})
# List all the L2GW connection.
list_rsp = self.l2gwc_client.list_l2_gateway_connections()
LOG.info("l2gw connection list response: %s", list_rsp)
# Assert in case of failure.
self.assertEqual(constants.EXPECTED_HTTP_RESPONSE_200,
list_rsp.response["status"],
"Response code is not %(code)s" % {
"code": constants.EXPECTED_HTTP_RESPONSE_200})
list_rsp = list_rsp["l2_gateway_connections"]
l2gwc_ids = l2gwc_ids + [item.get("id") for item in list_rsp if
"id" in item]
self.assertNotIn(l2gwc_id, l2gwc_ids, "l2gwc list api shows hanging "
"l2gwc id")
self.addCleanup(self.l2gw_cleanup)
@decorators.attr(type="nsxv3")
@decorators.idempotent_id("671cacb5-134e-467d-ba41-0d7cdbcf3903")
def test_l2_gateway_connection_delete(self):
"""
Delete l2gw will create l2gw and delete recently created l2gw. To
delete l2gw we need l2gw id.
"""
LOG.info("Testing l2_gateway_connection_delete api")
cluster_info = self.nsx_bridge_cluster_info()
device_name, interface_name = cluster_info[0][0], cluster_info[0][1]
l2gw_name = data_utils.rand_name(constants.L2GW)
device_1 = {"dname": device_name, "iname": interface_name}
l2gw_param = [device_1]
l2gw_rsp, _ = self.create_l2gw(l2gw_name, l2gw_param)
l2gwc_param = {"l2_gateway_id": l2gw_rsp[constants.L2GW]["id"],
"network_id": self.network["id"],
"segmentation_id": self.VLAN_1}
l2gwc_rsp = self.create_l2gw_connection(l2gwc_param)
# Assert if create fails.
self.assertEqual(constants.EXPECTED_HTTP_RESPONSE_201,
l2gwc_rsp.response["status"],
"Response code is not %(code)s" % {
"code": constants.EXPECTED_HTTP_RESPONSE_201})
l2gwc_id = l2gwc_rsp[constants.L2GWC]["id"]
# Delete l2gw.
rsp = self.delete_l2gw_connection(l2gwc_id)
self.assertEqual(constants.EXPECTED_HTTP_RESPONSE_204,
rsp.response["status"],
"Response code is not %(code)s" % {
"code": constants.EXPECTED_HTTP_RESPONSE_204})
# Since we delete l2gwc pop that id from list.
self.l2gwc_created.pop(l2gwc_id)
self.addCleanup(self.l2gw_cleanup)

View File

@ -1,212 +0,0 @@
# Copyright 2015 OpenStack Foundation
# Copyright 2016 VMware Inc
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import netaddr
from tempest import config
from tempest.lib.common.utils import data_utils
from tempest.lib.common.utils import test_utils
from tempest.lib import decorators
from tempest.lib import exceptions as lib_exc
from vmware_nsx_tempest.common import constants
from vmware_nsx_tempest.services import base_l2gw
CONF = config.CONF
NON_EXIST_UUID = "12341234-0000-1111-2222-000000000000"
LOG = constants.log.getLogger(__name__)
class L2GatewayConnectionNegative(base_l2gw.BaseL2GatewayTest):
"""
Negative L2GW tests.
"""
@classmethod
def resource_setup(cls):
"""
Setting up the resources for the test.
"""
super(L2GatewayConnectionNegative, cls).resource_setup()
# Create a network.
cls.network = cls.create_network()
# Create subnet on the network just created.
cls.SUBNET_1_NETWORK_CIDR = CONF.l2gw.subnet_1_cidr
network_cidr = cls.SUBNET_1_NETWORK_CIDR.split("/")
cls.SUBNET_1_MASK = network_cidr[1]
subnet_info = {}
# cidr must be presented & in IPNetwork structure.
cls.CIDR = netaddr.IPNetwork(cls.SUBNET_1_NETWORK_CIDR)
cls.subnet = cls.create_subnet(cls.network, cidr=cls.CIDR,
mask_bits=int(cls.SUBNET_1_MASK),
**subnet_info)
@classmethod
def resource_cleanup(cls):
"""
Clean all the resources used during the test.
"""
super(L2GatewayConnectionNegative, cls).resource_cleanup()
test_utils.call_and_ignore_notfound_exc(
cls.networks_client.delete_network, cls.network["id"])
@classmethod
def l2gw_cleanup(cls):
"""
Delete created L2GWs and L2GWCs.
"""
for l2gwc_id in cls.l2gwc_created.keys():
cls.l2gwc_client.delete_l2_gateway_connection(l2gwc_id)
cls.l2gwc_created.pop(l2gwc_id)
for l2gw_id in cls.l2gw_created.keys():
cls.l2gw_client.delete_l2_gateway(l2gw_id)
cls.l2gw_created.pop(l2gw_id)
@decorators.attr(type="nsxv3")
@decorators.idempotent_id("e86bd8e9-b32b-425d-86fa-cd866138d028")
def test_active_l2_gateway_delete(self):
"""
Delete l2 gateway with active mapping.
"""
LOG.info("Testing test_l2_gateway_create api")
cluster_info = self.nsx_bridge_cluster_info()
device_name, interface_name = cluster_info[0][0], cluster_info[0][1]
l2gw_name = data_utils.rand_name(constants.L2GW)
device_1 = {"dname": device_name, "iname": interface_name}
l2gw_param = [device_1]
l2gw_rsp, _ = self.create_l2gw(l2gw_name, l2gw_param)
l2gwc_param = {"l2_gateway_id": l2gw_rsp[constants.L2GW]["id"],
"network_id": self.network["id"],
"segmentation_id": self.VLAN_1}
l2gwc_rsp = self.create_l2gw_connection(l2gwc_param)
# Assert if create fails.
self.assertEqual(constants.EXPECTED_HTTP_RESPONSE_201,
l2gwc_rsp.response["status"],
"Response code is not %(code)s" % {
"code": constants.EXPECTED_HTTP_RESPONSE_201})
l2gw_id = l2gw_rsp[constants.L2GW]["id"]
# Delete l2gw must raise Conflict exception.
self.assertRaises(lib_exc.Conflict, self.delete_l2gw, l2gw_id)
self.addCleanup(self.l2gw_cleanup)
@decorators.attr(type="nsxv3")
@decorators.idempotent_id("488faaae-180a-4c48-8b7a-44c3a243369f")
def test_recreate_l2_gateway_connection(self):
"""
Recreate l2 gateway connection using same parameters.
"""
LOG.info("Testing test_l2_gateway_connection_create api")
cluster_info = self.nsx_bridge_cluster_info()
device_name, interface_name = cluster_info[0][0], cluster_info[0][1]
l2gw_name = data_utils.rand_name(constants.L2GW)
device_1 = {"dname": device_name, "iname": interface_name,
"vlans": [self.VLAN_1]}
l2gw_param = [device_1]
l2gw_rsp, _ = self.create_l2gw(l2gw_name, l2gw_param)
l2gwc_param = {"l2_gateway_id": l2gw_rsp[constants.L2GW]["id"],
"network_id": self.network["id"]}
l2gwc_rsp = self.create_l2gw_connection(l2gwc_param)
# Assert if create fails.
self.assertEqual(constants.EXPECTED_HTTP_RESPONSE_201,
l2gwc_rsp.response["status"],
"Response code is not %(code)s" % {
"code": constants.EXPECTED_HTTP_RESPONSE_201})
self.assertRaises(lib_exc.Conflict, self.create_l2gw_connection,
l2gwc_param)
self.addCleanup(self.l2gw_cleanup)
@decorators.attr(type="nsxv3")
@decorators.idempotent_id("14606e74-4f65-402e-ae50-a0adcd877a83")
def test_create_l2gwc_with_nonexist_l2gw(self):
"""
Create l2 gateway connection using non exist l2gw uuid.
"""
LOG.info("Testing test_l2_gateway_connection_create api")
non_exist_l2gw_uuid = NON_EXIST_UUID
cluster_info = self.nsx_bridge_cluster_info()
device_name, interface_name = cluster_info[0][0], cluster_info[0][1]
l2gw_name = data_utils.rand_name(constants.L2GW)
device_1 = {"dname": device_name, "iname": interface_name}
l2gw_param = [device_1]
l2gw_rsp, _ = self.create_l2gw(l2gw_name, l2gw_param)
l2gwc_param = {"l2_gateway_id": non_exist_l2gw_uuid,
"network_id": self.network["id"],
"segmentation_id": self.VLAN_1}
# Delete l2gw must raise Conflict exception.
self.assertRaises(lib_exc.NotFound, self.create_l2gw_connection,
l2gwc_param)
self.addCleanup(self.l2gw_cleanup)
@decorators.attr(type="nsxv3")
@decorators.idempotent_id("e6cb8973-fcbc-443e-a3cb-c6a82ae58b63")
def test_create_l2gwc_with_nonexist_network(self):
"""
Create l2 gateway connection using non exist l2gw uuid.
"""
LOG.info("Testing test_l2_gateway_connection_create api")
non_exist_network_uuid = NON_EXIST_UUID
cluster_info = self.nsx_bridge_cluster_info()
device_name, interface_name = cluster_info[0][0], cluster_info[0][1]
l2gw_name = data_utils.rand_name(constants.L2GW)
device_1 = {"dname": device_name, "iname": interface_name}
l2gw_param = [device_1]
l2gw_rsp, _ = self.create_l2gw(l2gw_name, l2gw_param)
l2gwc_param = {"l2_gateway_id": l2gw_rsp[constants.L2GW]["id"],
"network_id": non_exist_network_uuid,
"segmentation_id": self.VLAN_1}
# Delete l2gw must raise Conflict exception.
self.assertRaises(lib_exc.NotFound, self.create_l2gw_connection,
l2gwc_param)
self.addCleanup(self.l2gw_cleanup)
@decorators.attr(type="nsxv3")
@decorators.idempotent_id("27c7c64f-511f-421e-8b62-dfed143fc00b")
def test_create_l2gw_with_invalid_seg_id(self):
"""
Create l2 gateway connection using invalid seg id.
"""
LOG.info("Testing l2_gateway_create api with segmentation ID")
invalid_seg_id = 20000
cluster_info = self.nsx_bridge_cluster_info()
device_name, interface_name = cluster_info[0][0], cluster_info[0][1]
l2gw_name = data_utils.rand_name(constants.L2GW)
device_1 = {"dname": device_name, "iname": interface_name,
"vlans": [invalid_seg_id]}
l2gw_param = [device_1]
self.assertRaises(lib_exc.BadRequest, self.create_l2gw, l2gw_name,
l2gw_param)
self.addCleanup(self.l2gw_cleanup)
@decorators.skip_because(bug="1640033")
@decorators.attr(type="nsxv3")
@decorators.idempotent_id("000cc597-bcea-4539-af07-bd70357e8d82")
def test_create_l2gw_with_non_int_seg_id(self):
"""
Create l2 gateway connection using invalid seg id.
"""
LOG.info("Testing l2_gateway_create api with segmentation ID")
invalid_seg_id = 2.45
cluster_info = self.nsx_bridge_cluster_info()
device_name, interface_name = cluster_info[0][0], cluster_info[0][1]
l2gw_name = data_utils.rand_name(constants.L2GW)
device_1 = {"dname": device_name, "iname": interface_name,
"vlans": [invalid_seg_id]}
l2gw_param = [device_1]
self.assertRaises(lib_exc.BadRequest, self.create_l2gw, l2gw_name,
l2gw_param)
self.addCleanup(self.l2gw_cleanup)

View File

@ -0,0 +1,205 @@
# Copyright 2015 OpenStack Foundation
# Copyright 2016 VMware Inc
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import netaddr
from tempest import config
from tempest.lib.common.utils import data_utils
from tempest.lib import decorators
from tempest import test
from vmware_nsx_tempest.common import constants
from vmware_nsx_tempest.lib import feature_manager
CONF = config.CONF
LOG = constants.log.getLogger(__name__)
class L2GatewayScenarioTest(feature_manager.FeatureManager):
"""
Test l2 gateway connection operations.
"""
@classmethod
def skip_checks(cls):
"""
Skip running test if we do not meet criteria to run the tests.
"""
super(L2GatewayScenarioTest, cls).skip_checks()
if not test.is_extension_enabled("l2-gateway", "network"):
raise cls.skipException("l2-gateway extension not enabled.")
@classmethod
def resource_setup(cls):
"""
Setting up the resources for the test.
"""
super(L2GatewayScenarioTest, cls).resource_setup()
# Create subnet on the network just created.
cls.SUBNET_1_NETWORK_CIDR = CONF.l2gw.subnet_1_cidr
# VLAN id used in setups
cls.VLAN_1 = CONF.l2gw.vlan_1
cls.VLAN_2 = CONF.l2gw.vlan_2
# IPs of predeployed vms.
cls.VM_ON_VDS_TZ1_VLAN16_IP = CONF.l2gw.vm_on_vds_tz1_vlan16_ip
cls.VM1_ON_SWITCH_VLAN16 = CONF.l2gw.vm_on_switch_vlan16
cls.VM1_ON_VDS_TZ2_VLAN16_IP = CONF.l2gw.vm_on_vds_tz2_vlan16_ip
cls.VM1_ON_VDS_TZ2_VLAN17_IP = CONF.l2gw.vm_on_vds_tz2_vlan17_ip
cls.SUBNET_1_MASK = cls.SUBNET_1_NETWORK_CIDR.split("/")[1]
cls.CIDR = netaddr.IPNetwork(cls.SUBNET_1_NETWORK_CIDR)
@classmethod
def resource_cleanup(cls):
"""
Clean all the resources used during the test.
"""
super(L2GatewayScenarioTest, cls).resource_cleanup()
def deploy_l2gateway_topology(self):
router_l2gateway = self.create_topology_router("router_l2gateway")
# L2gateway network with router
network_l2gateway = self.create_topology_network("network_l2gateway")
# cidr must be presented & in IPNetwork structure.
self.CIDR = netaddr.IPNetwork(self.SUBNET_1_NETWORK_CIDR)
self.create_topology_subnet(
"subnet1_l2gateway", network_l2gateway, cidr=self.CIDR,
router_id=router_l2gateway["id"],
mask_bits=int(self.SUBNET_1_MASK))
secgroup = self.create_topology_security_group()
secgroups = [{'name': secgroup['name']}]
self.create_topology_instance(
"server1_l2gateway", [network_l2gateway],
security_groups=secgroups)
self.create_topology_instance(
"server2_l2gateway", [network_l2gateway],
security_groups=secgroups)
def deploy_topology_and_create_l2gateway(self, vlan_id):
self.deploy_l2gateway_topology()
cluster_info = self.nsx_bridge_cluster_info()
device_name, interface_name = cluster_info[0][0], cluster_info[0][1]
l2gw_name = data_utils.rand_name(constants.L2GW)
device_1 = {"dname": device_name, "iname": interface_name,
"vlans": [vlan_id]}
l2gw_param = [device_1]
l2gw_rsp, _ = self.create_l2gw(l2gw_name, l2gw_param)
l2gwc_param = {"l2_gateway_id": l2gw_rsp[constants.L2GW]["id"],
"network_id":
self.topology_networks["network_l2gateway"]["id"]}
l2gwc_rsp = self.create_l2gw_connection(l2gwc_param)
# Assert if create fails.
self.assertEqual(constants.EXPECTED_HTTP_RESPONSE_201,
l2gwc_rsp.response["status"],
"Response code is not %(code)s" % {
"code": constants.EXPECTED_HTTP_RESPONSE_201})
self.assertEqual(l2gwc_param["l2_gateway_id"],
l2gwc_rsp[constants.L2GWC]["l2_gateway_id"],
"l2gw id is not same as expected in "
"create l2gw connection response")
self.assertEqual(l2gwc_param["network_id"],
l2gwc_rsp[constants.L2GWC]["network_id"],
"network id is not same as expected in "
"create l2gw connection response")
@decorators.attr(type="nsxv3")
@decorators.idempotent_id("b62a7452-f2c1-4f2b-9403-f121f5201516")
def test_l2_gateway_ping_servers_on_overlays(self):
"""
Create l2 gateway connection using one vlan. Vlan parameter is
passed into L2GW create.
"""
LOG.info("Testing test_l2_gateway_ping_servers_on_overlays")
self.deploy_topology_and_create_l2gateway(self.VLAN_1)
server1_floatingip = self.topology_servers["server1_l2gateway"][
"floating_ip"]
server1 = self.topology_servers["server1_l2gateway"]
address_list = [server1_floatingip["fixed_ip_address"]]
address_list.append(self.topology_servers["server2_l2gateway"][
"floating_ip"]["fixed_ip_address"])
self.check_server_internal_ips_using_floating_ip(
server1_floatingip, server1, address_list)
@decorators.attr(type="nsxv3")
@decorators.idempotent_id("74e67d5f-0319-45e8-9731-d2c245c05beb")
def test_l2_gateway_ping_servers_overlay_to_vds_with_same_tz(self):
"""
Create l2 gateway connection using one vlan. Vlan parameter is
passed into L2GW create. ping from server on OS ls to NSX ls
"""
LOG.info("Testing test_l2_gateway_ping_servers_overlay_to_nsx_ls")
self.deploy_topology_and_create_l2gateway(self.VLAN_1)
server1_floatingip = self.topology_servers["server1_l2gateway"][
"floating_ip"]
server1 = self.topology_servers["server1_l2gateway"]
address_list = [server1_floatingip["fixed_ip_address"]]
address_list.append(self.VM_ON_VDS_TZ1_VLAN16_IP)
self.check_server_internal_ips_using_floating_ip(
server1_floatingip, server1, address_list)
@decorators.attr(type="nsxv3")
@decorators.idempotent_id("4e66584f-f61b-465d-952c-795a285d7c55")
def test_l2_gateway_ping_servers_overlay_to_vds_with_diff_tz(self):
"""
Create l2 gateway connection using one vlan. Vlan parameter is
passed into L2GW create. ping from server on OS ls to NSX ls
"""
LOG.info("Testing test_l2_gateway_ping_servers_overlay_to_nsx_ls")
self.deploy_topology_and_create_l2gateway(self.VLAN_1)
server1_floatingip = self.topology_servers["server1_l2gateway"][
"floating_ip"]
server1 = self.topology_servers["server1_l2gateway"]
address_list = [server1_floatingip["fixed_ip_address"]]
address_list.append(self.VM1_ON_VDS_TZ2_VLAN16_IP)
self.check_server_internal_ips_using_floating_ip(
server1_floatingip, server1, address_list)
@decorators.attr(type="nsxv3")
@decorators.idempotent_id("aef2a142-0b49-48a9-8881-f47897c09745")
def test_l2_gateway_ping_servers_overlay_to_physical_vlan(self):
"""
Create l2 gateway connection using one vlan. Vlan parameter is
passed into L2GW create. ping from server on OS ls to NSX ls
"""
LOG.info("Testing test_l2_gateway_ping_servers_overlay_to_nsx_ls")
self.deploy_topology_and_create_l2gateway(self.VLAN_1)
server1_floatingip = self.topology_servers["server1_l2gateway"][
"floating_ip"]
server1 = self.topology_servers["server1_l2gateway"]
address_list = [server1_floatingip["fixed_ip_address"]]
address_list.append(self.VM1_ON_SWITCH_VLAN16)
self.check_server_internal_ips_using_floating_ip(
server1_floatingip, server1, address_list)
@decorators.attr(type="nsxv3")
@decorators.idempotent_id("00036e1d-69e0-4faf-a62f-602600bc5631")
def test_l2_gateway_reconfig_ping_servers_overlay_to_vds_with_diff_tz(
self):
"""
Create l2 gateway connection using one vlan. Vlan parameter is
passed into L2GW create. ping from server on OS ls to NSX ls
"""
LOG.info(
"Testing test_l2_gateway_reconfig_ping_servers_overlay_to_vds_"
"with_diff_tz")
self.deploy_topology_and_create_l2gateway(self.VLAN_2)
server1_floatingip = self.topology_servers["server1_l2gateway"][
"floating_ip"]
server1 = self.topology_servers["server1_l2gateway"]
address_list = [server1_floatingip["fixed_ip_address"]]
address_list.append(self.VM1_ON_VDS_TZ2_VLAN17_IP)
self.check_server_internal_ips_using_floating_ip(
server1_floatingip, server1, address_list)

View File

@ -1,4 +1,4 @@
# Copyright 2016 VMware Inc
# Copyright 2017 VMware Inc
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -12,36 +12,21 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
from oslo_log import log as logging
from tempest import config
from tempest.lib.common.utils import data_utils
from tempest.lib.common.utils import test_utils
from tempest.lib import decorators
from tempest.scenario import manager
from tempest import test
from vmware_nsx_tempest.lib import feature_manager
from vmware_nsx_tempest.services import nsx_client
CONF = config.CONF
LOG = logging.getLogger(__name__)
Floating_IP_tuple = collections.namedtuple('Floating_IP_tuple',
['floating_ip', 'server'])
class TestMicroSegmentationOps(manager.NetworkScenarioTest):
"""Test suite for micro-segmentation scenario test
The purpose of this scenario test is to test micro-segmentation use
case which is one of the most important features of NSX. In the test,
two-tier application web and app networks are created, and security
group rules are defined based on this use case. Verify that VMs on
these networks have the correct behaviors as expected.
"""
class TestMicroSegmentationOps(feature_manager.FeatureManager):
@classmethod
def skip_checks(cls):
@ -55,7 +40,7 @@ class TestMicroSegmentationOps(manager.NetworkScenarioTest):
if not test.is_extension_enabled(ext, 'network'):
msg = "%s extension not enabled." % ext
raise cls.skipException(msg)
if not (CONF.network.public_network_cidr):
if not CONF.network.public_network_cidr:
msg = "public_network_cidr must be defined in network section."
raise cls.skipException(msg)
@ -64,213 +49,91 @@ class TestMicroSegmentationOps(manager.NetworkScenarioTest):
cls.set_network_resources()
super(TestMicroSegmentationOps, cls).setup_credentials()
def setUp(self):
super(TestMicroSegmentationOps, self).setUp()
self.keypairs = {}
self.config_drive = CONF.compute_feature_enabled.config_drive
@classmethod
def setup_clients(cls):
"""
Create various client connections. Such as NSX.
"""
super(TestMicroSegmentationOps, cls).setup_clients()
cls.nsx_client = nsx_client.NSXClient(
CONF.network.backend,
CONF.nsxv3.nsx_manager,
CONF.nsxv3.nsx_user,
CONF.nsxv3.nsx_password)
def _create_security_groups(self):
web_sg = self._create_empty_security_group(namestart="secgroup-web")
app_sg = self._create_empty_security_group(namestart="secgroup-app")
def define_security_groups(self):
self.web_sg = self.create_topology_empty_security_group(
namestart="web_sg_")
self.app_sg = self.create_topology_empty_security_group(
namestart="app_sg_")
# Common rules to allow the following traffic
# 1. Egress ICMP IPv4 any any
# 2. Egress ICMP IPv6 any any
# 3. Ingress ICMP IPv4 from public network
# 4. Ingress TCP 22 (SSH) from public network
common_ruleset = [
dict(
direction='egress',
protocol='icmp'
),
dict(
direction='egress',
protocol='icmp',
ethertype='IPv6'
),
dict(
direction='ingress',
protocol='tcp',
port_range_min=22,
port_range_max=22,
remote_ip_prefix=CONF.network.public_network_cidr
),
dict(
direction='ingress',
protocol='icmp',
remote_ip_prefix=CONF.network.public_network_cidr
)
]
common_ruleset = [dict(direction='egress', protocol='icmp'),
dict(direction='egress', protocol='icmp',
ethertype='IPv6'),
dict(direction='ingress', protocol='tcp',
port_range_min=22, port_range_max=22,
remote_ip_prefix=CONF.network
.public_network_cidr),
dict(direction='ingress', protocol='icmp',
remote_ip_prefix=CONF.network
.public_network_cidr)]
# Rules that are specific to web tier network
# 1. Ingress ICMP IPv4 from web_sg
# 2. Ingress TCP 80 (HTTP) any any
# 3. Ingress TCP 443 (HTTPS) any any
web_ruleset = [
dict(
direction='ingress',
protocol='icmp',
remote_group_id=web_sg['id']
),
dict(
direction='ingress',
protocol='tcp',
port_range_min=80,
port_range_max=80,
),
dict(
direction='ingress',
protocol='tcp',
port_range_min=443,
port_range_max=443,
)
]
web_rulesets = common_ruleset + web_ruleset
web_rules = [dict(direction='ingress', protocol='icmp',
remote_group_id=self.web_sg['id']),
dict(direction='ingress', protocol='tcp',
port_range_min=80, port_range_max=80, ),
dict(direction='ingress', protocol='tcp',
port_range_min=443, port_range_max=443, )]
web_rules = common_ruleset + web_rules
# Rules that are specific to app tier network
# 1. Ingress ICMP IPv4 from app_sg
# 2. Ingress TCP 22 (SSH) from web_sg
app_ruleset = [
dict(
direction='ingress',
protocol='icmp',
remote_group_id=app_sg['id']
),
dict(
direction='ingress',
protocol='tcp',
port_range_min=22,
port_range_max=22,
remote_group_id=web_sg['id']
)
]
app_rulesets = common_ruleset + app_ruleset
for ruleset in web_rulesets:
self._create_security_group_rule(secgroup=web_sg, **ruleset)
for ruleset in app_rulesets:
self._create_security_group_rule(secgroup=app_sg, **ruleset)
return (web_sg, app_sg)
app_rules = [dict(direction='ingress', protocol='icmp',
remote_group_id=self.app_sg['id']),
dict(direction='ingress', protocol='tcp',
port_range_min=22, port_range_max=22,
remote_group_id=self.web_sg['id'])]
app_rules = common_ruleset + app_rules
for rule in web_rules:
self.add_security_group_rule(self.web_sg, rule)
for rule in app_rules:
self.add_security_group_rule(self.app_sg, rule)
def _create_network_topo(self, **kwargs):
self.web_net, self.web_subnet, self.router = self.create_networks(
**kwargs)
self.app_net = self._create_network()
self.app_subnet = self._create_subnet(network=self.app_net)
router_id = self.router['id']
self.routers_client.add_router_interface(
router_id, subnet_id=self.app_subnet['id'])
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.routers_client.remove_router_interface,
router_id, subnet_id=self.app_subnet['id'])
def deploy_micro_segmentation_topology(self):
router_microseg = self.create_topology_router("router_microseg")
# Web network
network_web = self.create_topology_network("network_web")
self.create_topology_subnet("subnet_web", network_web,
router_id=router_microseg["id"])
self.create_topology_instance(
"server_web_1", [network_web],
security_groups=[{'name': self.web_sg['name']}])
self.create_topology_instance(
"server_web_2", [network_web],
security_groups=[{'name': self.web_sg['name']}])
# App network
network_app = self.create_topology_network("network_app")
self.create_topology_subnet("subnet_app", network_app,
router_id=router_microseg["id"])
self.create_topology_instance(
"server_app_1", [network_app],
security_groups=[{'name': self.app_sg['name']}])
self.create_topology_instance(
"server_app_2", [network_app],
security_groups=[{'name': self.app_sg['name']}])
def _create_servers(self):
web_server1_name = data_utils.rand_name('web-vm1')
web_server2_name = data_utils.rand_name('web-vm2')
app_server1_name = data_utils.rand_name('app-vm1')
app_server2_name = data_utils.rand_name('app-vm2')
# Create two VMs on web-tier network
self.web_server1, self.web_server1_fip_tuple = self._create_server(
web_server1_name, self.web_net, self.web_sg)
self.web_server2, self.web_server2_fip_tuple = self._create_server(
web_server2_name, self.web_net, self.web_sg)
# Create two VMs on app-tier network
self.app_server1, self.app_server1_fip_tuple = self._create_server(
app_server1_name, self.app_net, self.app_sg)
self.app_server2, self.app_server2_fip_tuple = self._create_server(
app_server2_name, self.app_net, self.app_sg)
def check_server_project_connectivity(self, server_details):
self.using_floating_ip_check_server_and_project_network_connectivity(
server_details)
def _setup_micro_seg_topo(self, **kwargs):
self.web_sg, self.app_sg = self._create_security_groups()
self._create_network_topo(**kwargs)
self._create_servers()
def _create_server(self, name, network, secgroup, image_id=None):
keypair = self.create_keypair()
self.keypairs[keypair['name']] = keypair
security_groups = [{'name': secgroup['name']}]
network = {'uuid': network['id']}
server = self.create_server(name=name, networks=[network],
key_name=keypair['name'],
config_drive=self.config_drive,
security_groups=security_groups,
image_id=image_id,
wait_until='ACTIVE')
floating_ip = self.create_floating_ip(server)
fip_tuple = Floating_IP_tuple(floating_ip, server)
return (server, fip_tuple)
def _get_server_key(self, server):
return self.keypairs[server['key_name']]['private_key']
def _list_ports(self, *args, **kwargs):
"""List ports using admin creds """
ports_list = self.admin_manager.ports_client.list_ports(
*args, **kwargs)
return ports_list['ports']
def _check_network_internal_connectivity(self, network, fip_tuple,
should_connect=True):
floating_ip, server = fip_tuple
# test internal connectivity to the network ports on the network
network_ips = (p['fixed_ips'][0]['ip_address'] for p in
self._list_ports(tenant_id=server['tenant_id'],
network_id=network['id'])
if p['device_owner'].startswith('network'))
self._check_server_connectivity(floating_ip,
server,
network_ips,
should_connect)
def _check_network_vm_connectivity(self, network, fip_tuple,
should_connect=True):
floating_ip, server = fip_tuple
# test internal connectivity to the other VM on the same network
compute_ips = (p['fixed_ips'][0]['ip_address'] for p in
self._list_ports(tenant_id=server['tenant_id'],
network_id=network['id'])
if p['device_owner'].startswith('compute'))
self._check_server_connectivity(floating_ip,
server,
compute_ips,
should_connect)
def _check_server_connectivity(self, floating_ip, server, address_list,
should_connect=True):
ip_address = floating_ip['floating_ip_address']
private_key = self._get_server_key(server)
ssh_source = self.get_remote_client(ip_address,
private_key=private_key)
for remote_ip in address_list:
if should_connect:
msg = ("Timed out waiting for %s to become "
"reachable") % remote_ip
else:
msg = "ip address %s is reachable" % remote_ip
try:
self.assertTrue(self._check_remote_connectivity
(ssh_source, remote_ip, should_connect),
msg)
except Exception:
LOG.exception("Unable to access %{dest}s via ssh to "
"floating-ip %{src}s",
{'dest': remote_ip, 'src': floating_ip})
raise
def _check_cross_network_connectivity(self, network, should_connect=False):
if network['id'] == self.web_net['id']:
net_id = self.app_net['id']
floating_ip, server = self.web_server1_fip_tuple
else:
net_id = self.web_net['id']
floating_ip, server = self.app_server1_fip_tuple
# test internal connectivity to the other VM on the same network
remote_ips = (p['fixed_ips'][0]['ip_address'] for p in
self._list_ports(tenant_id=server['tenant_id'],
network_id=net_id)
if p['device_owner'].startswith('compute'))
self._check_server_connectivity(floating_ip,
server,
remote_ips,
should_connect)
@decorators.attr(type='common')
@decorators.attr(type=["nsxv3", "nsxv"])
@decorators.idempotent_id('91e1ee1f-10d9-4b19-8350-804aea7e57b4')
def test_micro_segmentation_ops(self):
"""Test micro-segmentation use case
@ -280,17 +143,15 @@ class TestMicroSegmentationOps(manager.NetworkScenarioTest):
on the network, and verify the connectivity based on the rule.
"""
self._setup_micro_seg_topo()
network_server_list = [
(self.web_net, self.web_server1_fip_tuple),
(self.web_net, self.web_server2_fip_tuple),
(self.app_net, self.app_server1_fip_tuple),
(self.app_net, self.app_server2_fip_tuple)
]
for net, fip_tuple in network_server_list:
self._check_network_internal_connectivity(network=net,
fip_tuple=fip_tuple)
self._check_network_vm_connectivity(network=net,
fip_tuple=fip_tuple)
for net in [self.web_net, self.app_net]:
self._check_cross_network_connectivity(network=net)
self.define_security_groups()
self.deploy_micro_segmentation_topology()
for server, details in self.servers_details.items():
self.check_server_project_connectivity(details)
self.check_cross_network_connectivity(
self.topology_networks["network_web"],
self.servers_details["server_app_1"].floating_ip,
self.servers_details["server_app_1"].server)
self.check_cross_network_connectivity(
self.topology_networks["network_app"],
self.servers_details["server_web_1"].floating_ip,
self.servers_details["server_web_1"].server)