vmware-nsx-tempest-plugin/vmware_nsx_tempest_plugin/tests/dvs/api/base_dvs.py

155 lines
6.0 KiB
Python

# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import netaddr
from tempest.api.network import base
from tempest import config
from tempest import exceptions
from tempest.lib.common.utils import data_utils
import tempest.test
CONF = config.CONF
class BaseDvsAdminNetworkTest(base.BaseAdminNetworkTest):
@classmethod
def resource_cleanup(cls):
for port in cls.ports:
cls.admin_ports_client.delete_port(port['id'])
for subnet in cls.subnets:
cls.admin_subnets_client.delete_subnet(subnet['id'])
# clean up ports, subnets and networks
cls.ports = []
cls.subnets = []
super(BaseDvsAdminNetworkTest, cls).resource_cleanup()
@classmethod
def create_network(cls, **kwargs):
"""Wrapper utility that returns a test admin provider network."""
network_name = (kwargs.get('net_name') or
data_utils.rand_name('test-adm-net-'))
net_type = kwargs.get('net_type', "flat")
if tempest.test.is_extension_enabled('provider', 'network'):
body = {'name': network_name}
body.update({'provider:network_type': net_type,
'provider:physical_network': 'dvs'})
if net_type == 'vlan':
_vlanid = kwargs.get('seg_id')
body.update({'provider:segmentation_id': _vlanid})
body = cls.admin_networks_client.create_network(**body)
network = body['network']
return network
@classmethod
def create_subnet(cls, network):
"""Wrapper utility that returns a test subnet."""
# The cidr and mask_bits depend on the ip version.
if cls._ip_version == 4:
cidr = netaddr.IPNetwork(CONF.network.project_network_cidr or
"192.168.101.0/24")
mask_bits = CONF.network.project_network_mask_bits or 24
elif cls._ip_version == 6:
cidr = netaddr.IPNetwork(CONF.network.project_network_v6_cidr)
mask_bits = CONF.network.project_network_v6_mask_bits
# Find a cidr that is not in use yet and create a subnet with it
for subnet_cidr in cidr.subnet(mask_bits):
try:
body = cls.admin_subnets_client.create_subnet(
network_id=network['id'],
cidr=str(subnet_cidr),
ip_version=cls._ip_version)
break
except exceptions.BadRequest as e:
is_overlapping_cidr = 'overlaps with another subnet' in str(e)
if not is_overlapping_cidr:
raise
else:
message = 'Available CIDR for subnet creation could not be found'
raise exceptions.BuildErrorException(message)
subnet = body['subnet']
cls.subnets.append(subnet)
return subnet
@classmethod
def create_port(cls, network_id, **kwargs):
"""Wrapper utility that returns a test port."""
body = cls.admin_ports_client.create_port(network_id=network_id,
**kwargs)
port = body['port']
cls.ports.append(port)
return port
@classmethod
def update_network(cls, network_id, client=None, **kwargs):
net_client = client if client else cls.admin_networks_client
return net_client.update_network(network_id, **kwargs)
@classmethod
def delete_network(cls, network_id, client=None):
net_client = client if client else cls.admin_networks_client
return net_client.delete_network(network_id)
@classmethod
def show_network(cls, network_id, client=None, **kwargs):
net_client = client if client else cls.admin_networks_client
return net_client.show_network(network_id, **kwargs)
@classmethod
def list_networks(cls, client=None, **kwargs):
net_client = client if client else cls.admin_networks_client
return net_client.list_networks(**kwargs)
@classmethod
def update_subnet(cls, subnet_id, client=None, **kwargs):
net_client = client if client else cls.admin_subnets_client
return net_client.update_subnet(subnet_id, **kwargs)
@classmethod
def delete_subnet(cls, subnet_id, client=None):
net_client = client if client else cls.admin_subnets_client
return net_client.delete_subnet(subnet_id)
@classmethod
def show_subnet(cls, subnet_id, client=None, **kwargs):
net_client = client if client else cls.admin_subnets_client
return net_client.show_subnet(subnet_id, **kwargs)
@classmethod
def list_subnets(cls, client=None, **kwargs):
net_client = client if client else cls.admin_subnets_client
return net_client.list_subnets(**kwargs)
@classmethod
def delete_port(cls, port_id, client=None):
net_client = client if client else cls.admin_ports_client
return net_client.delete_port(port_id)
@classmethod
def show_port(cls, port_id, client=None, **kwargs):
net_client = client if client else cls.admin_ports_client
return net_client.show_port(port_id, **kwargs)
@classmethod
def list_ports(cls, client=None, **kwargs):
net_client = client if client else cls.admin_ports_client
return net_client.list_ports(**kwargs)
@classmethod
def update_port(cls, port_id, client=None, **kwargs):
net_client = client if client else cls.admin_ports_client
return net_client.update_port(port_id, **kwargs)