Patch for python3 change in tempest

Change-Id: I470b5a23f64b14e92bcb29962e44546a0e2d826f
This commit is contained in:
shubhamk 2020-03-16 13:52:09 +00:00 committed by Shubham Kadam
parent cb68cce000
commit 486303bbae
11 changed files with 147 additions and 80 deletions

View File

@ -799,6 +799,7 @@ class FeatureManager(traffic_manager.IperfManager,
break
def count_response(self, response):
response = response.decode('utf-8')
if response in self.http_cnt:
self.http_cnt[response] += 1
else:

View File

@ -183,7 +183,7 @@ class TrafficManager(appliance_manager.ApplianceManager):
'file1': src_file,
'dst_folder':
dst_folder}
args = shlex.split(cmd.encode('utf-8'))
args = shlex.split(cmd)
subprocess_args = {'stdout': subprocess.PIPE,
'stderr': subprocess.STDOUT}
proc = subprocess.Popen(args, **subprocess_args)
@ -257,10 +257,11 @@ class TrafficManager(appliance_manager.ApplianceManager):
'Connection: close\r\nContent-Type: text/html; '
'charset=UTF-8\r\n\r\n%s"; cat >/dev/null')
with tempfile.NamedTemporaryFile() as script:
script.write(resp % (len(server_name), server_name))
script.write((resp % (len(server_name), server_name)).
encode('utf-8'))
script.flush()
with tempfile.NamedTemporaryFile() as key:
key.write(private_key)
key.write(private_key.encode('utf-8'))
key.flush()
self.scp_file_to_instance_using_fip(script.name,
"/tmp/script",

View File

@ -91,7 +91,7 @@ class NSXPClient(object):
content_type = self.content_type if content is None else content
accept_type = self.accept_type if accept is None else accept
auth_cred = self.username + ":" + self.password
auth = base64.b64encode(auth_cred)
auth = base64.b64encode(auth_cred.encode()).decode()
headers = {}
headers['Authorization'] = "Basic %s" % auth
headers['Content-Type'] = content_type

View File

@ -90,7 +90,7 @@ class NSXV3Client(object):
content_type = self.content_type if content is None else content
accept_type = self.accept_type if accept is None else accept
auth_cred = self.username + ":" + self.password
auth = base64.b64encode(auth_cred)
auth = base64.b64encode(auth_cred.encode()).decode()
headers = {}
headers['Authorization'] = "Basic %s" % auth
headers['Content-Type'] = content_type

View File

@ -79,7 +79,8 @@ class TestQos(feature_manager.FeatureManager):
'start': str(address_cidr).split('/')[0] + '2',
'end':str(address_cidr).split('/')[0] + '70'}]}
if slaac:
subnet = self.create_topology_subnet(subnet_name, network,
subnet = self.create_topology_subnet(
subnet_name, network,
subnets_client=subnet_client,
routers_client=self.cmgr_adm.routers_client,
router_id=router['id'],
@ -87,7 +88,8 @@ class TestQos(feature_manager.FeatureManager):
ipv6_address_mode='slaac',
**allocation_pools)
else:
subnet = self.create_topology_subnet(subnet_name, network,
subnet = self.create_topology_subnet(
subnet_name, network,
subnets_client=subnet_client,
ip_version=6, enable_dhcp=False,
**allocation_pools)
@ -129,14 +131,14 @@ class TestQos(feature_manager.FeatureManager):
"""Create network with IPv6 static assignment based subnet
Associate qos policy to the network
"""
network, _ = self._create_single_ipv6_rtr_topology()
policy = self. _create_qos_policy()
network, _ = self._create_single_ipv6_rtr_topology()
self.cmgr_adm.networks_client.update_network(
network['id'], qos_policy_id=policy['id'])
updated_network = self.cmgr_adm.networks_client.show_network(
network['id'])
self.assertEqual(updated_network['network']['qos_policy_id'],
policy['id'])
policy['id'])
@decorators.attr(type=['nsxv3', 'positive'])
@decorators.idempotent_id('a121d38e-d977-4bd0-b7db-6f7d9e206f7a')
@ -151,7 +153,7 @@ class TestQos(feature_manager.FeatureManager):
updated_network = self.cmgr_adm.networks_client.show_network(
network['id'])
self.assertEqual(updated_network['network']['qos_policy_id'],
policy['id'])
policy['id'])
@decorators.attr(type=['nsxv3', 'positive'])
@decorators.idempotent_id('7a6a2088-612f-4fc6-87be-d58c5d04b946')
@ -164,13 +166,14 @@ class TestQos(feature_manager.FeatureManager):
policy = self. _create_qos_policy()
port_client = self.cmgr_adm.ports_client
body = self.create_topology_port(network=network,
ports_client=port_client, qos_policy_id=policy['id'])
ports_client=port_client,
qos_policy_id=policy['id'])
port = body['port']
body = self.show_topology_port(port['id'],
ports_client=port_client)
port = body['port']
self.assertEqual(port['qos_policy_id'],
policy['id'])
policy['id'])
@decorators.attr(type=['nsxv3', 'positive'])
@decorators.idempotent_id('f6297a98-f487-4403-852b-61fd0fce329c')
@ -189,4 +192,4 @@ class TestQos(feature_manager.FeatureManager):
updated_network = self.cmgr_adm.networks_client.show_network(
network['id'])
self.assertEqual(updated_network['network']['qos_policy_id'],
policy['id'])
policy['id'])

View File

@ -13,12 +13,15 @@
# License for the specific language governing permissions and limitations
# under the License.
import time
from tempest.api.network import base
from tempest import config
from tempest.lib.common.utils import data_utils
from tempest.lib.common.utils import test_utils
from tempest.lib import decorators
from vmware_nsx_tempest_plugin.common import constants
from vmware_nsx_tempest_plugin.services import nsxv3_client
CONF = config.CONF
@ -57,6 +60,8 @@ class NSXv3NativeDHCPNegative(base.BaseNetworkTest):
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.networks_client.delete_network, net_id)
self.assertTrue('ACTIVE', network['status'])
if CONF.network.backend == 'nsxp':
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
nsx_switch = self.nsx.get_logical_switch(network['name'],
network['id'])
dhcp_server = self.nsx.get_logical_dhcp_server(network['name'],
@ -75,6 +80,8 @@ class NSXv3NativeDHCPNegative(base.BaseNetworkTest):
self.networks_client.delete_network, net_id)
self.create_subnet(network, enable_dhcp=False)
self.assertTrue('ACTIVE', network['status'])
if CONF.network.backend == 'nsxp':
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
nsx_switch = self.nsx.get_logical_switch(network['name'],
network['id'])
dhcp_server = self.nsx.get_logical_dhcp_server(network['name'],
@ -93,6 +100,8 @@ class NSXv3NativeDHCPNegative(base.BaseNetworkTest):
self.networks_client.delete_network, net_id)
subnet = self.create_subnet(network)
self.assertTrue('ACTIVE', network['status'])
if CONF.network.backend == 'nsxp':
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
nsx_switch = self.nsx.get_logical_switch(network['name'],
network['id'])
dhcp_server = self.nsx.get_logical_dhcp_server(network['name'],
@ -101,6 +110,8 @@ class NSXv3NativeDHCPNegative(base.BaseNetworkTest):
self.assertIsNotNone(dhcp_server)
# Update subnet to disable DHCP
self.subnets_client.update_subnet(subnet['id'], enable_dhcp=False)
if CONF.network.backend == 'nsxp':
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
dhcp_server = self.nsx.get_logical_dhcp_server(network['name'],
network['id'])
self.assertIsNone(dhcp_server)

View File

@ -258,6 +258,8 @@ class NSXv3MacLearningTest(base.BaseNetworkTest):
self.assertEqual(nsx_port['display_name'], test_port['name'],
"OS port and NSX port name do not match")
self._delete_port(test_port)
if CONF.network.backend == 'nsxp':
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
self.assertIsNone(self.nsx.get_logical_port(test_port['name']),
"Port %s is not None" % test_port['name'])

View File

@ -117,8 +117,9 @@ class TestAllowedAddressPair(manager.NetworkScenarioTest):
*args, **kwargs)
return ports_list['ports']
def get_port_id(self, network_id, subnet_id, instance):
_, instance_addr = instance["addresses"].items()[0]
def get_port_id(self, network_id, subnet_id, instance,
network_name=None):
instance_addr = instance["addresses"][network_name]
instance_fixed_ip = instance_addr[0]["addr"]
for port in self._list_ports(device_id=instance['id']):
port_fixed_ip = port["fixed_ips"][0]["ip_address"]
@ -129,8 +130,9 @@ class TestAllowedAddressPair(manager.NetworkScenarioTest):
self.assertIsNotNone(port_id, "Failed to find Instance's port id!!!")
return port_id
def get_port_ipv4v6_id(self, network_id, subnet_id, instance):
_, instance_addr = instance["addresses"].items()[0]
def get_port_ipv4v6_id(self, network_id, subnet_id, instance,
network_name=None):
instance_addr = instance["addresses"][network_name]
for addr in instance_addr:
if addr['version'] == 4:
instance_fixed_ip = addr["addr"]
@ -216,7 +218,7 @@ class TestAllowedAddressPair(manager.NetworkScenarioTest):
self.subnet = self._create_subnet(self.network,
cidr='14.168.1.0/24')
self.subnet_v6 = self._create_subnet_v6(self.network,
cidr='3010::/64')
cidr='3010::/64')
self.router = self._create_router(
router_name=data_utils.rand_name('router-default1'),
external_network_id=CONF.network.public_network_id)
@ -284,14 +286,16 @@ class TestAllowedAddressPair(manager.NetworkScenarioTest):
ip_address_vm2 = '87.0.0.4'
port_id = self.get_port_id(network['id'],
network_topo['subnet']['id'],
server_default)
server_default,
network_name=network['name'])
# Update allowed address pair attribute of port
allowed_address_pairs = [{'ip_address': ip_address_vm1}]
port_client.update_port(
port_id, allowed_address_pairs=allowed_address_pairs)
port1_id = self.get_port_id(network['id'],
network_topo['subnet']['id'],
server_default1)
server_default1,
network_name=network['name'])
# Update allowed address pair attribute of port
allowed_address_pairs = [{'ip_address': ip_address_vm2}]
port_client.update_port(
@ -312,8 +316,8 @@ class TestAllowedAddressPair(manager.NetworkScenarioTest):
(ssh_source, ip_address_vm2, 'True'),
'Destination is reachable')
def _test_v6_connectivity_between_allowed_adddress_pair_ports(self,
network_topo):
def _test_v6_connectivity_between_allowed_adddress_pair_ports(
self, network_topo):
server_name_default = data_utils.rand_name('server-default')
network = network_topo['network']
server_default = self._create_server(server_name_default, network)
@ -331,15 +335,17 @@ class TestAllowedAddressPair(manager.NetworkScenarioTest):
ip_address_vm1 = '3000:10:10::2'
ip_address_vm2 = '3000:10:10::3'
port_id = self.get_port_ipv4v6_id(network['id'],
network_topo['subnet']['id'],
server_default)
network_topo['subnet']['id'],
server_default,
network_name=network['name'])
# Update allowed address pair attribute of port
allowed_address_pairs = [{'ip_address': ip_address_vm1}]
port_client.update_port(
port_id, allowed_address_pairs=allowed_address_pairs)
port1_id = self.get_port_ipv4v6_id(network['id'],
network_topo['subnet']['id'],
server_default1)
network_topo['subnet']['id'],
server_default1,
network_name=network['name'])
# Update allowed address pair attribute of port
allowed_address_pairs = [{'ip_address': ip_address_vm2}]
port_client.update_port(
@ -360,8 +366,8 @@ class TestAllowedAddressPair(manager.NetworkScenarioTest):
(ssh_source, ip_address_vm2, 'True'),
'Destination is reachable')
def _test_v4_v6_connectivity_between_allowed_adddress_pair_ports(self,
network_topo):
def _test_v4_v6_connectivity_between_allowed_adddress_pair_ports(
self, network_topo):
server_name_default = data_utils.rand_name('server-default')
network = network_topo['network']
server_default = self._create_server(server_name_default, network)
@ -381,16 +387,18 @@ class TestAllowedAddressPair(manager.NetworkScenarioTest):
ipv6_address_vm1 = '3000:10:10::2'
ipv6_address_vm2 = '3000:10:10::3'
port_id = self.get_port_ipv4v6_id(network['id'],
network_topo['subnet']['id'],
server_default)
network_topo['subnet']['id'],
server_default,
network_name=network['name'])
# Update allowed address pair attribute of port
allowed_address_pairs = [{'ip_address': ipv4_address_vm1},
{'ip_address': ipv6_address_vm1}]
port_client.update_port(
port_id, allowed_address_pairs=allowed_address_pairs)
port1_id = self.get_port_ipv4v6_id(network['id'],
network_topo['subnet']['id'],
server_default1)
network_topo['subnet']['id'],
server_default1,
network_name=network['name'])
# Update allowed address pair attribute of port
allowed_address_pairs = [{'ip_address': ipv4_address_vm2},
{'ip_address': ipv6_address_vm2}]
@ -544,8 +552,8 @@ class TestAllowedAddressPair(manager.NetworkScenarioTest):
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.interface_client.delete_interface,
server_default1['id'], port1_id['port']['id'])
ssh_source1 = self.get_remote_client(ip_address_default_vm,
private_key=private_key_default_vm)
ssh_source1 = self.get_remote_client(
ip_address_default_vm, private_key=private_key_default_vm)
ssh_source2 = self.get_remote_client(
ip_address_default1_vm,
private_key=private_key_default1_vm)
@ -583,7 +591,8 @@ class TestAllowedAddressPair(manager.NetworkScenarioTest):
ip_address_vm2_2 = '78.0.0.4'
port_id = self.get_port_id(network['id'],
network_topo['subnet']['id'],
server_default)
server_default,
network_name=network['name'])
# Update allowed address pair attribute of port
allowed_address_pairs = [{'ip_address': ip_address_vm1_1},
{'ip_address': ip_address_vm1_2}]
@ -591,7 +600,8 @@ class TestAllowedAddressPair(manager.NetworkScenarioTest):
port_id, allowed_address_pairs=allowed_address_pairs)
port1_id = self.get_port_id(network['id'],
network_topo['subnet']['id'],
server_default1)
server_default1,
network_name=network['name'])
# Update allowed address pair attribute of port
allowed_address_pairs = [{'ip_address': ip_address_vm2_1},
{'ip_address': ip_address_vm2_2}]
@ -641,16 +651,18 @@ class TestAllowedAddressPair(manager.NetworkScenarioTest):
ip_address_vm2_1 = '3000:10:10::3'
ip_address_vm2_2 = '4000:10:10::3'
port_id = self.get_port_ipv4v6_id(network['id'],
network_topo['subnet']['id'],
server_default)
network_topo['subnet']['id'],
server_default,
network_name=network['name'])
# Update allowed address pair attribute of port
allowed_address_pairs = [{'ip_address': ip_address_vm1_1},
{'ip_address': ip_address_vm1_2}]
port_client.update_port(
port_id, allowed_address_pairs=allowed_address_pairs)
port1_id = self.get_port_ipv4v6_id(network['id'],
network_topo['subnet']['id'],
server_default1)
network_topo['subnet']['id'],
server_default1,
network_name=network['name'])
# Update allowed address pair attribute of port
allowed_address_pairs = [{'ip_address': ip_address_vm2_1},
{'ip_address': ip_address_vm2_2}]
@ -704,8 +716,9 @@ class TestAllowedAddressPair(manager.NetworkScenarioTest):
ipv6_address_vm2_1 = '3000:10:10::3'
ipv6_address_vm2_2 = '4000:10:10::3'
port_id = self.get_port_ipv4v6_id(network['id'],
network_topo['subnet']['id'],
server_default)
network_topo['subnet']['id'],
server_default,
network_name=network['name'])
# Update allowed address pair attribute of port
allowed_address_pairs = [{'ip_address': ipv4_address_vm1_1},
{'ip_address': ipv4_address_vm1_2},
@ -714,8 +727,9 @@ class TestAllowedAddressPair(manager.NetworkScenarioTest):
port_client.update_port(
port_id, allowed_address_pairs=allowed_address_pairs)
port1_id = self.get_port_ipv4v6_id(network['id'],
network_topo['subnet']['id'],
server_default1)
network_topo['subnet']['id'],
server_default1,
network_name=network['name'])
# Update allowed address pair attribute of port
allowed_address_pairs = [{'ip_address': ipv4_address_vm2_1},
{'ip_address': ipv4_address_vm2_2},
@ -789,7 +803,8 @@ class TestAllowedAddressPair(manager.NetworkScenarioTest):
# Allowed Address pair
port_id = self.get_port_id(network['id'],
network_topo['subnet']['id'],
server_default)
server_default,
network_name=network['name'])
# Update allowed address pair attribute of port
allowed_address_pairs = [{'ip_address': ip_address_vm1}]
port_client.update_port(port_id,

View File

@ -65,10 +65,12 @@ class ProviderNetworks(feature_manager.FeatureManager):
for tz in out:
if "transport_type" in tz.keys() and (vlan_flag == 0 or
vxlan_flag == 0):
if vxlan_flag == 0 and tz['transport_type'] == "OVERLAY":
if vxlan_flag == 0 and tz['transport_type'] == "OVERLAY" \
and tz['_create_user'] == 'admin':
cls.overlay_id = tz['id']
vxlan_flag = 1
if vlan_flag == 0 and tz['transport_type'] == "VLAN":
if vlan_flag == 0 and tz['transport_type'] == "VLAN" \
and tz['_create_user'] == 'admin':
cls.vlan_id = tz['id']
vlan_flag = 1

View File

@ -136,8 +136,9 @@ class TestNSXv3PortSecurityScenario(manager.NetworkScenarioTest):
*args, **kwargs)
return ports_list['ports']
def get_port_id(self, network_id, subnet_id, instance):
_, instance_addr = instance["addresses"].items()[0]
def get_port_id(self, network_id, subnet_id, instance,
network_name=None):
instance_addr = instance["addresses"][network_name]
instance_fixed_ip = instance_addr[0]["addr"]
port_id = None
for port in self._list_ports():
@ -150,6 +151,21 @@ class TestNSXv3PortSecurityScenario(manager.NetworkScenarioTest):
self.assertIsNotNone(port_id, "Failed to find Instance's port id!!!")
return port_id
def get_port_ipv4v6_id(self, network_id, subnet_id, instance,
network_name=None):
instance_addr = instance["addresses"][network_name]
for addr in instance_addr:
if addr['version'] == 4:
instance_fixed_ip = addr["addr"]
for port in self._list_ports():
port_fixed_ip = port["fixed_ips"][0]["ip_address"]
if port["network_id"] == network_id and port["fixed_ips"][0][
"subnet_id"] == subnet_id and instance["id"] == port[
"device_id"] and port_fixed_ip == instance_fixed_ip:
port_id = port["id"]
self.assertIsNotNone(port_id, "Failed to find Instance's port id!!!")
return port_id
def _create_server(self, name, network, port_id=None, image_id=None):
keypair = self.create_keypair()
self.keypairs[keypair['name']] = keypair
@ -263,7 +279,7 @@ class TestNSXv3PortSecurityScenario(manager.NetworkScenarioTest):
self.subnet = self._create_subnet(self.network,
cidr='10.168.1.0/24')
self.subnet_v6 = self._create_subnet_v6(self.network,
cidr='2020::/64')
cidr='2020::/64')
self.router = self._create_router(
router_name=data_utils.rand_name('router-port-sec'),
external_network_id=CONF.network.public_network_id)
@ -315,7 +331,7 @@ class TestNSXv3PortSecurityScenario(manager.NetworkScenarioTest):
self.subnet = self._create_subnet(self.network,
cidr='10.18.1.0/24')
self.subnet_v6 = self._create_subnet_v6(self.network,
cidr='3020::/64')
cidr='3020::/64')
self.router = self._create_router(
router_name=data_utils.rand_name('router-port-sec'),
external_network_id=CONF.network.public_network_id)
@ -451,9 +467,11 @@ class TestNSXv3PortSecurityScenario(manager.NetworkScenarioTest):
self._check_server_connectivity(public_ip_address_server_1,
private_ip_address_server_2,
private_key_server_1)
port_id_server_1 = self.get_port_id(network_topo['network']['id'],
network_topo['subnet']['id'],
server_default_1)
port_id_server_1 = self.get_port_id(
network_topo['network']['id'],
network_topo['subnet']['id'],
server_default_1,
network_name=network_topo['network']['name'])
port_id_server_2 = port_id['port']['id']
sec_grp_port = port_client.show_port(port_id_server_1)
sec_group = sec_grp_port['port']['security_groups'][0]
@ -521,9 +539,11 @@ class TestNSXv3PortSecurityScenario(manager.NetworkScenarioTest):
self._check_server_connectivity(public_ip_address_server_1,
private_ip_address_server_2,
private_key_server_1)
port_id_server_1 = self.get_port_id(network_topo['network']['id'],
network_topo['subnet']['id'],
server_default_1)
port_id_server_1 = self.get_port_id(
network_topo['network']['id'],
network_topo['subnet']['id'],
server_default_1,
network_name=network_topo['network']['name'])
port_id_server_2 = port_id['port']['id']
sec_grp_port = port_client.show_port(port_id_server_1)
sec_group = sec_grp_port['port']['security_groups'][0]
@ -573,7 +593,8 @@ class TestNSXv3PortSecurityScenario(manager.NetworkScenarioTest):
network = network_topo['network']
server_1 = self._create_server(server_name_1, network)
port_id = self.get_port_id(network['id'],
network_topo['subnet']['id'], server_1)
network_topo['subnet']['id'], server_1,
network_name=network['name'])
kwargs = {"port_security_enabled": "false", "security_groups": []}
port_client = self.cmgr_adm.ports_client
sec_grp_port = port_client.show_port(port_id)
@ -641,7 +662,8 @@ class TestNSXv3PortSecurityScenario(manager.NetworkScenarioTest):
private_ip_address_server_1,
private_key_server_2)
port_id1 = self.get_port_id(network['id'],
network_topo['subnet']['id'], server_2)
network_topo['subnet']['id'], server_2,
network_name=network['name'])
kwargs = {"port_security_enabled": "false", "security_groups": []}
port_client = self.cmgr_adm.ports_client
sec_grp_port = port_client.show_port(port_id1)
@ -678,8 +700,10 @@ class TestNSXv3PortSecurityScenario(manager.NetworkScenarioTest):
self._check_server_connectivity(public_ip_address_server_2,
private_ip_address_server_1,
private_key_server_2)
port_id1 = self.get_port_id(network['id'],
network_topo['subnet']['id'], server_2)
port_id1 = self.get_port_ipv4v6_id(network['id'],
network_topo['subnet']['id'],
server_2,
network_name=network['name'])
kwargs = {"port_security_enabled": "false", "security_groups": []}
port_client = self.cmgr_adm.ports_client
sec_grp_port = port_client.show_port(port_id1)
@ -727,7 +751,8 @@ class TestNSXv3PortSecurityScenario(manager.NetworkScenarioTest):
kwargs = {"port_security_enabled": "false",
"security_groups": []}
port_id = self.get_port_id(network2['id'],
subnet2['id'], server_2)
subnet2['id'], server_2,
network_name=network2['name'])
sec_grp_port = port_client.show_port(port_id)
sec_group = sec_grp_port['port']['security_groups'][0]
port_client.update_port(port_id, **kwargs)
@ -753,8 +778,8 @@ class TestNSXv3PortSecurityScenario(manager.NetworkScenarioTest):
private_address_server_1,
private_key_server_2)
def _test_ipv6_connectivity_between_servers_with_router(self,
network_topo):
def _test_ipv6_connectivity_between_servers_with_router(
self, network_topo):
server_name_default_1 =\
data_utils.rand_name('server-port-sec-1')
server_name_default_2 =\
@ -780,7 +805,8 @@ class TestNSXv3PortSecurityScenario(manager.NetworkScenarioTest):
kwargs = {"port_security_enabled": "false",
"security_groups": []}
port_id = self.get_port_id(network['id'],
subnet['id'], server_1)
subnet['id'], server_1,
network_name=network['name'])
sec_grp_port = port_client.show_port(port_id)
sec_group = sec_grp_port['port']['security_groups'][0]
port_client.update_port(port_id, **kwargs)

View File

@ -143,8 +143,9 @@ class TestProviderSecurityGroup(manager.NetworkScenarioTest):
*args, **kwargs)
return ports_list['ports']
def get_port_id(self, network_id, subnet_id, instance):
_, instance_addr = instance["addresses"].items()[0]
def get_port_id(self, network_id, subnet_id, instance,
network_name=None):
instance_addr = instance["addresses"][network_name]
instance_fixed_ip = instance_addr[0]["addr"]
port_id = None
for port in self._list_ports():
@ -193,8 +194,8 @@ class TestProviderSecurityGroup(manager.NetworkScenarioTest):
return sgr.get('security_group_rule', sgr)
def create_security_group_ipv6_rule(self, security_group_id,
cmgr=None, project_id=None,
protocol=None):
cmgr=None, project_id=None,
protocol=None):
cmgr = cmgr or self.cmgr_adm
sgr_client = cmgr.security_group_rules_client
sgr_dict = dict(security_group_id=security_group_id,
@ -247,7 +248,7 @@ class TestProviderSecurityGroup(manager.NetworkScenarioTest):
self.subnet = self._create_subnet(self.network,
cidr='10.168.1.0/24')
self.subnet_v6 = self._create_subnet_v6(self.network,
cidr='2020::/64')
cidr='2020::/64')
self.router = self._create_router(
router_name=data_utils.rand_name('router-psg'),
external_network_id=CONF.network.public_network_id)
@ -291,7 +292,7 @@ class TestProviderSecurityGroup(manager.NetworkScenarioTest):
self.subnet = self._create_subnet(self.network,
cidr='10.168.1.0/24')
self.subnet_v6 = self._create_subnet_v6(self.network,
cidr='2020::/64')
cidr='2020::/64')
self.network_2 = self._create_network()
self.subnet_2 = self._create_subnet(self.network_2,
cidr='10.168.2.0/24')
@ -405,12 +406,16 @@ class TestProviderSecurityGroup(manager.NetworkScenarioTest):
time.sleep(constants.NSX_BACKEND_TIME_INTERVAL)
p_client = self.ports_client
kwargs = {"provider_security_groups": ["%s" % sg_id]}
port_id_psg = self.get_port_id(network_topo['network']['id'],
network_topo['subnet']['id'],
servers['server_psg'])
port_id_default = self.get_port_id(network_topo['network']['id'],
network_topo['subnet']['id'],
servers['server_default'])
port_id_psg = self.get_port_id(
network_topo['network']['id'],
network_topo['subnet']['id'],
servers['server_psg'],
network_name=network_topo['network']['name'])
port_id_default = self.get_port_id(
network_topo['network']['id'],
network_topo['subnet']['id'],
servers['server_default'],
network_name=network_topo['network']['name'])
p_client.update_port(port_id_psg, **kwargs)
time.sleep(constants.NSX_BACKEND_TIME_INTERVAL)
p_client.update_port(port_id_default, **kwargs)
@ -477,7 +482,7 @@ class TestProviderSecurityGroup(manager.NetworkScenarioTest):
private_ip_address_psg_vm,
private_key_default_vm)
self.create_security_group_ipv6_rule(sg_id, cmgr=self.cmgr_adm,
protocol='icmpv6')
protocol='icmpv6')
time.sleep(constants.NSX_BACKEND_TIME_INTERVAL)
self._check_server_connectivity(ip_address_default_vm,
private_ip_address_psg_vm,
@ -511,7 +516,8 @@ class TestProviderSecurityGroup(manager.NetworkScenarioTest):
should_connect=False)
kwargs = {"provider_security_groups": []}
port_id = self.get_port_id(network['id'],
network_topo['subnet']['id'], server_psg)
network_topo['subnet']['id'], server_psg,
network_name=network['name'])
self.cmgr_adm.ports_client.update_port(port_id, **kwargs)
time.sleep(constants.NSX_BACKEND_TIME_INTERVAL)
self._check_server_connectivity(ip_address_default_vm,
@ -679,7 +685,7 @@ class TestProviderSecurityGroup(manager.NetworkScenarioTest):
@decorators.attr(type='nsxv3')
@decorators.idempotent_id('8426ae72-c6ce-4af5-9ac5-b3c74686353f')
def test_ipv6_connectivity_between_default_psg_server_with_multi_networks(
self):
self):
self.network_topo = self.create_multi_network_ipv6_topo()
self._test_build_up_topology_and_check_v6_connectivity(
self.network_topo)