Allowed address pairs with cidr for CI

Change-Id: I6234c999ac643b5dbba5b8eec2d4e5b3540178f2
This commit is contained in:
pbharathbhu 2020-07-16 12:40:28 +00:00
parent 2b9a91f426
commit 85051cada8
1 changed files with 125 additions and 0 deletions

View File

@ -296,6 +296,56 @@ class TestAllowedAddressPair(manager.NetworkScenarioTest):
(ssh_source, ip_address_vm2, 'True'),
'Destination is reachable')
def _test_connectivity_between_allowed_adddress_pair_ports_cidr(
self, network_topo):
server_name_default = data_utils.rand_name('server-default')
network = network_topo['network']
server_default = self._create_server(server_name_default, network)
server_name_default1 = \
data_utils.rand_name('server-default1-sec-group')
server_default1 = self._create_server(server_name_default1, network)
floating_ip_default = self.create_floating_ip(server_default)
floating_ip_default1 = self.create_floating_ip(server_default1)
ip_address_default_vm = floating_ip_default['floating_ip_address']
ip_address_default1_vm = floating_ip_default1['floating_ip_address']
private_key_default_vm = self._get_server_key(server_default)
private_key_default1_vm = self._get_server_key(server_default1)
port_client = self.ports_client
# Allowed Address pair
ip_address_vm1 = '87.0.0.3/32'
ip_address_vm2 = '87.0.0.4/32'
port_id = self.get_port_id(network['id'],
network_topo['subnet']['id'],
server_default,
network_name=network['name'])
# Update allowed address pair attribute of port
allowed_address_pairs = [{'ip_address': ip_address_vm1}]
port_client.update_port(
port_id, allowed_address_pairs=allowed_address_pairs)
port1_id = self.get_port_id(network['id'],
network_topo['subnet']['id'],
server_default1,
network_name=network['name'])
# Update allowed address pair attribute of port
allowed_address_pairs = [{'ip_address': ip_address_vm2}]
port_client.update_port(
port1_id, allowed_address_pairs=allowed_address_pairs)
ssh_source = self.get_remote_client(
ip_address_default_vm,
private_key=private_key_default_vm)
ssh_source1 = self.get_remote_client(
ip_address_default1_vm,
private_key=private_key_default1_vm)
# Assign Allowed pair ip to vm's
self._assign_ip_address(ssh_source, 'eth0:1', ip_address_vm1)
self._assign_ip_address(ssh_source1, 'eth0:1', ip_address_vm2)
self.assertTrue(self._check_remote_connectivity
(ssh_source1, ip_address_vm1, 'True'),
'Destination is reachable')
self.assertTrue(self._check_remote_connectivity
(ssh_source, ip_address_vm2, 'True'),
'Destination is reachable')
def _test_v6_connectivity_between_allowed_adddress_pair_ports(
self, network_topo):
server_name_default = data_utils.rand_name('server-default')
@ -671,6 +721,67 @@ class TestAllowedAddressPair(manager.NetworkScenarioTest):
(ssh_source1, ip_address_vm1_2, 'True'),
'Destination is reachable')
def _test_v6_allowed_address_pair_on_vms_with_multiple_cidr_ips(
self, network_topo):
server_name_default = data_utils.rand_name('server-default')
network = network_topo['network']
server_default = self._create_server(server_name_default, network)
server_name_default1 = \
data_utils.rand_name('server-default1-sec-group')
server_default1 = self._create_server(server_name_default1, network)
floating_ip_default = self.create_floating_ip(server_default)
floating_ip_default1 = self.create_floating_ip(server_default1)
ip_address_default_vm = floating_ip_default['floating_ip_address']
ip_address_default1_vm = floating_ip_default1['floating_ip_address']
private_key_default_vm = self._get_server_key(server_default)
private_key_default1_vm = self._get_server_key(server_default1)
port_client = self.ports_client
# Allowed Address pair
ip_address_vm1_1 = '3000:10:10::2/128'
ip_address_vm1_2 = '4000:10:10::2/128'
ip_address_vm2_1 = '3000:10:10::3/128'
ip_address_vm2_2 = '4000:10:10::3/128'
port_id = self.get_port_ipv4v6_id(network['id'],
network_topo['subnet']['id'],
server_default,
network_name=network['name'])
# Update allowed address pair attribute of port
allowed_address_pairs = [{'ip_address': ip_address_vm1_1},
{'ip_address': ip_address_vm1_2}]
port_client.update_port(
port_id, allowed_address_pairs=allowed_address_pairs)
port1_id = self.get_port_ipv4v6_id(network['id'],
network_topo['subnet']['id'],
server_default1,
network_name=network['name'])
# Update allowed address pair attribute of port
allowed_address_pairs = [{'ip_address': ip_address_vm2_1},
{'ip_address': ip_address_vm2_2}]
port_client.update_port(
port1_id, allowed_address_pairs=allowed_address_pairs)
ssh_source = self.get_remote_client(ip_address_default_vm,
private_key=private_key_default_vm)
ssh_source1 = self.get_remote_client(
ip_address_default1_vm,
private_key=private_key_default1_vm)
# Attach allowed pair ip's to vm's
self._assign_ipv6_address(ssh_source, 'eth0', ip_address_vm1_1)
self._assign_ipv6_address(ssh_source, 'eth0', ip_address_vm1_2)
self._assign_ipv6_address(ssh_source1, 'eth0', ip_address_vm2_1)
self._assign_ipv6_address(ssh_source1, 'eth0', ip_address_vm2_2)
self.assertTrue(self._check_remote_connectivity
(ssh_source, ip_address_vm2_1, 'True'),
'Destination is reachable')
self.assertTrue(self._check_remote_connectivity
(ssh_source, ip_address_vm2_2, 'True'),
'Destination is reachable')
self.assertTrue(self._check_remote_connectivity
(ssh_source1, ip_address_vm1_1, 'True'),
'Destination is reachable')
self.assertTrue(self._check_remote_connectivity
(ssh_source1, ip_address_vm1_2, 'True'),
'Destination is reachable')
def _test_v4v6_allowed_address_pair_on_vms_with_multiple_ips(
self, network_topo):
server_name_default = data_utils.rand_name('server-default')
@ -867,3 +978,17 @@ class TestAllowedAddressPair(manager.NetworkScenarioTest):
self.network_topo = self.create_ipv6_network_topo()
self._test_v4v6_allowed_address_pair_on_vms_with_multiple_ips(
self.network_topo)
@decorators.attr(type='nsxv3')
@decorators.idempotent_id('c0f0f446-65f5-40fa-8b05-b5798e8dd677')
def test_allowed_adddress_pair_on_vms_with_single_cidr_ip(self):
self.network_topo = self.create_network_topo()
self._test_connectivity_between_allowed_adddress_pair_ports_cidr(
self.network_topo)
@decorators.attr(type='nsxv3')
@decorators.idempotent_id('513ea6d5-4ec6-430e-a501-a6ffbab624f8')
def test_ipv6_allowed_address_pair_on_vms_with_multiple_cidr_ips(self):
self.network_topo = self.create_ipv6_network_topo()
self._test_v6_allowed_address_pair_on_vms_with_multiple_cidr_ips(
self.network_topo)