diff --git a/vmware_nsx_tempest_plugin/lib/appliance_manager.py b/vmware_nsx_tempest_plugin/lib/appliance_manager.py index 2ac5338..140c704 100644 --- a/vmware_nsx_tempest_plugin/lib/appliance_manager.py +++ b/vmware_nsx_tempest_plugin/lib/appliance_manager.py @@ -211,13 +211,13 @@ class ApplianceManager(manager.NetworkScenarioTest): name = network_name # Neutron disables port security by default so we have to check the # config before trying to create the network with port_security_enabled - if CONF.network_feature_enabled.port_security: - port_security_enabled = True - else: - port_security_enabled = False + if 'port_security_enabled' not in kwargs.keys(): + if CONF.network_feature_enabled.port_security: + kwargs['port_security_enabled'] = True + else: + kwargs['port_security_enabled'] = False result = networks_client.create_network( - name=name, tenant_id=tenant_id, - port_security_enabled=port_security_enabled, **kwargs) + name=name, tenant_id=tenant_id, **kwargs) network = result['network'] self.assertEqual(network['name'], name) self.addCleanup(test_utils.call_and_ignore_notfound_exc, @@ -478,7 +478,8 @@ class ApplianceManager(manager.NetworkScenarioTest): clients=None, create_floating_ip=True, port=None, **kwargs): # Define security group for server. if security_groups: - kwargs["security_groups"] = security_groups + if security_groups != 'undef': + kwargs["security_groups"] = security_groups else: _sg = self.create_topology_security_group() _security_groups = [{'name': _sg['name']}] diff --git a/vmware_nsx_tempest_plugin/services/nsxv_client.py b/vmware_nsx_tempest_plugin/services/nsxv_client.py index b311f51..76595e9 100644 --- a/vmware_nsx_tempest_plugin/services/nsxv_client.py +++ b/vmware_nsx_tempest_plugin/services/nsxv_client.py @@ -94,7 +94,7 @@ class VSMClient(object): content_type = self.content_type if content is None else content accept_type = self.accept_type if accept is None else accept auth_cred = self.username + ":" + self.password - auth = base64.b64encode(auth_cred) + auth = base64.b64encode(auth_cred.encode()).decode() headers = {} headers['Authorization'] = "Basic %s" % auth headers['Content-Type'] = content_type @@ -361,3 +361,17 @@ class VSMClient(object): return True else: return False + + def verify_spoofgaurd_policy_nw(self, network_id): + """Retrieve existing scppfguard policy""" + self.__set_api_version('4.0') + self.__set_endpoint("/services/spoofguard/policies/") + response = self.get() + spoofgaurd_policies = response.json()['policies'] + sps = [sp['name'] for sp in spoofgaurd_policies] + if network_id in sps: + LOG.debug('Found spoofgurad policy for network: %s' % network_id) + return True + else: + LOG.debug('spoofguard policy of nw is %s NOT found!' % network_id) + return False diff --git a/vmware_nsx_tempest_plugin/tests/scenario/test_multiple_allow_address_pair.py b/vmware_nsx_tempest_plugin/tests/scenario/test_multiple_allow_address_pair.py new file mode 100644 index 0000000..9433b75 --- /dev/null +++ b/vmware_nsx_tempest_plugin/tests/scenario/test_multiple_allow_address_pair.py @@ -0,0 +1,670 @@ +# Copyright 2017 VMware Inc +# All Rights Reserved +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import re + +from tempest import config +from tempest.lib.common.utils import data_utils + +from tempest.lib import decorators +from tempest.lib import exceptions + +from vmware_nsx_tempest_plugin.lib import feature_manager +from vmware_nsx_tempest_plugin.services import fwaas_client as FWAASC +from vmware_nsx_tempest_plugin.services import nsxv3_client +from vmware_nsx_tempest_plugin.services import nsxv_client + +CONF = config.CONF + + +class MultipleAllowAddress(feature_manager.FeatureManager): + + """Allowing address on port when network port security is + disabled Scenario + + """ + @classmethod + def setup_clients(cls): + super(MultipleAllowAddress, cls).setup_clients() + cls.cmgr_adm = cls.get_client_manager('admin') + cls.routers_client = cls.cmgr_adm.routers_client + cls.networks_client = cls.cmgr_adm.networks_client + cls.subnets_client = cls.cmgr_adm.subnets_client + cls.ports_client = cls.cmgr_adm.ports_client + cls.sec_rule_client = cls.cmgr_adm.security_group_rules_client + cls.sec_client = cls.cmgr_adm.security_groups_client + cls.fwaasv1_client = FWAASC.get_client(cls.cmgr_adm) + + @classmethod + def resource_setup(cls): + super(MultipleAllowAddress, cls).resource_setup() + if CONF.network.backend == "nsxv3" \ + or CONF.network.backend == "nsxp": + cls.nsx = nsxv3_client.NSXV3Client(CONF.nsxv3.nsx_manager, + CONF.nsxv3.nsx_user, + CONF.nsxv3.nsx_password) + elif CONF.network.backend == "nsxv": + manager_ip = re.search(r"(\d{1,3}\.){3}\d{1,3}", + CONF.nsxv.manager_uri).group(0) + cls.vsm = nsxv_client.VSMClient( + manager_ip, CONF.nsxv.user, CONF.nsxv.password) + + def _assign_ip_address(self, ssh_source, interface_name, ip_address): + int_face = interface_name.split('.')[0] + vlan_id = interface_name.split('.')[1] + ssh_source.exec_command("sudo ip link add link %s name %s type vlan\ + id %s" % (int_face, interface_name, vlan_id)) + + ssh_source.exec_command("sudo ifconfig %s %s/24 \ + up" % (interface_name, ip_address)) + + def verify_ping_to_fip_from_ext_vm(self, server_details): + self.using_floating_ip_check_server_and_project_network_connectivity( + server_details) + + def verify_ping_own_fip(self, server): + fip = server["floating_ips"][0]["floating_ip_address"] + client = self.verify_server_ssh(server, floating_ip=fip) + ping_cmd = "ping -c 1 %s " % fip + self.exec_cmd_on_server_using_fip(ping_cmd, ssh_client=client) + + def _conv_switch_prof_to_dict(self, switch_profiles): + switch_prof_dict = {} + for i in range(len(switch_profiles)): + switch_prof_dict.update( + {switch_profiles[i]['key']: switch_profiles[i]['value']}) + return switch_prof_dict + + def _verify_if_ipaddress_in_allowed_adress(self, port_id, + ipaddressORcidr): + port_detail = self.ports_client.show_port(port_id) + port_details = port_detail['port']['allowed_address_pairs'] + + ipList = [ip_mac['ip_address'] for ip_mac in port_details] + msg = ipaddressORcidr + 'is not in allowed address' + self.assertTrue(ipaddressORcidr in ipList, msg) + + @decorators.attr(type='negative') + @decorators.attr(type='nsxv') + @decorators.idempotent_id('4346016b-93cc-5099-b217-12345caa24a1') + def test_allow_invalid_ip_when_network_security_disabled(self): + """ + Verify port allow ipv4,ipv6,cidr when + VM has network port security is disabled, + """ + router_name = data_utils.rand_name(name='tempest-rtr') + net_name = data_utils.rand_name(name='tempest-net') + router = self.\ + create_topology_router(router_name, + routers_client=self.routers_client) + body = {"port_security_enabled": 'False', + "admin_state_up": 'True'} + network = self.\ + create_topology_network(net_name, + networks_client=self.networks_client, + **body) + msg = 'spoofgurad policy should not get create' + self.assertFalse(self.vsm.verify_spoofgaurd_policy_nw(network['id']), + msg) + subnet_name = network['name'] + 'sub' + self.create_topology_subnet(subnet_name, network, + router_id=router["id"], + routers_client=self.routers_client, + subnets_client=self.subnets_client) + vm1 = self.\ + create_topology_instance("server1", [network], + security_groups='undef', + clients=self.cmgr_adm) + # Update vm1 port to ipv4 address + ipv4 = '20.0.0.20.5' + port_body = {'port_security_enabled': 'True', + 'allowed_address_pairs': [{'ip_address': ipv4}] + } + vm1_port_id = self.ports_client.list_ports(device_id=vm1['id']) + vm1_port_id = vm1_port_id['ports'][0]['id'] + self.assertRaises(exceptions.BadRequest, + self.ports_client.update_port, + vm1_port_id, + **port_body) + + @decorators.attr(type='nsxv') + @decorators.idempotent_id('4446016b-93cc-5099-b217-12345caa24a1') + def test_allow_cidr_ipv4_ipv6_on_port_when_network_security_disabled(self): + """ + Verify port allow ipv4,ipv6,cidr when + VM has network port security is disabled, + """ + router_name = data_utils.rand_name(name='tempest-rtr') + net_name = data_utils.rand_name(name='tempest-net') + router = self.\ + create_topology_router(router_name, + routers_client=self.routers_client) + body = {"port_security_enabled": 'False', + "admin_state_up": 'True'} + network = self.\ + create_topology_network(net_name, + networks_client=self.networks_client, + **body) + msg = 'spoofgurad policy should not get create' + self.assertFalse(self.vsm.verify_spoofgaurd_policy_nw(network['id']), + msg) + subnet_name = network['name'] + 'sub' + self.create_topology_subnet(subnet_name, network, + router_id=router["id"], + routers_client=self.routers_client, + subnets_client=self.subnets_client) + vm1 = self.\ + create_topology_instance("server1", [network], + security_groups='undef', + clients=self.cmgr_adm) + vm2 = self.\ + create_topology_instance("server2", [network], + security_groups='undef', + clients=self.cmgr_adm) + vm3 = self.\ + create_topology_instance("server3", [network], + security_groups='undef', + clients=self.cmgr_adm) + + # Update vm1 port to ipv4 address + ipv4 = '20.0.0.20' + port_body = {'port_security_enabled': 'True', + 'allowed_address_pairs': [{'ip_address': ipv4}] + } + vm1_port_id = self.ports_client.list_ports(device_id=vm1['id']) + vm1_port_id = vm1_port_id['ports'][0]['id'] + self.ports_client.update_port(vm1_port_id, + **port_body) + self._verify_if_ipaddress_in_allowed_adress(vm1_port_id, ipv4) + + # Update vm2 port to cidr + cidr = '20.0.0.0/24' + port_body = {'port_security_enabled': 'True', + 'allowed_address_pairs': [{'ip_address': cidr}] + } + vm2_port_id = self.ports_client.list_ports(device_id=vm2['id']) + vm2_port_id = vm2_port_id['ports'][0]['id'] + self.ports_client.update_port(vm2_port_id, + **port_body) + self._verify_if_ipaddress_in_allowed_adress(vm2_port_id, cidr) + + # Update vm3 port to ipv6 address + ipv6 = '2607:f0d0:1002:51::4' + port_body = {'port_security_enabled': 'True', + 'allowed_address_pairs': [{'ip_address': ipv6}] + } + vm3_port_id = self.ports_client.list_ports(device_id=vm3['id']) + vm3_port_id = vm3_port_id['ports'][0]['id'] + self.ports_client.update_port(vm3_port_id, + **port_body) + self._verify_if_ipaddress_in_allowed_adress(vm3_port_id, ipv6) + + # Enabling security on network is not allowed when port has cidr + body['port_security_enabled'] = 'True' + self.assertRaises(exceptions.BadRequest, + self.networks_client.update_network, + network['id'], + **body) + + @decorators.attr(type='nsxv') + @decorators.idempotent_id('4441016b-93cc-5099-b217-12345caa24a1') + def test_deploy_vm_from_port_when_nw_security_disabled(self): + """ + Verify deploying a vm from port with allow address pair is successfull + """ + router_name = data_utils.rand_name(name='tempest-rtr') + net_name = data_utils.rand_name(name='tempest-net') + router = self.\ + create_topology_router(router_name, + routers_client=self.routers_client) + body = {"port_security_enabled": 'False', + "admin_state_up": 'True'} + network = self.\ + create_topology_network(net_name, + networks_client=self.networks_client, + **body) + msg = 'spoofgurad policy should not get create' + self.assertFalse(self.vsm.verify_spoofgaurd_policy_nw(network['id']), + msg) + subnet_name = network['name'] + 'sub' + self.create_topology_subnet(subnet_name, network, + router_id=router["id"], + routers_client=self.routers_client, + subnets_client=self.subnets_client) + kwargs = dict(tenant_id=network['tenant_id'], + security_group_rules_client=self.sec_rule_client, + security_groups_client=self.sec_client) + self.sg = self.create_topology_security_group(**kwargs) + ipv4 = '20.0.0.20' + create_port_body = {"security_groups": [self.sg['id']], + "port_security_enabled": 'True', + "allowed_address_pairs": [{'ip_address': ipv4}]} + vm1_port = self.create_topology_port(network=network, + ports_client=self.ports_client, + **create_port_body) + vm2_port = self.create_topology_port(network=network, + ports_client=self.ports_client, + **create_port_body) + self.create_topology_instance("server1", + port=vm1_port['port'], + security_groups='undef', + clients=self.cmgr_adm) + self._verify_if_ipaddress_in_allowed_adress(vm1_port['port']['id'], + ipv4) + self._verify_if_ipaddress_in_allowed_adress(vm2_port['port']['id'], + ipv4) + + @decorators.attr(type='nsxv') + @decorators.idempotent_id('4442016b-93cc-5099-b217-12345caa24a1') + def test_allow_same_ip_to_two_diff_port_nw_port_security_disabled(self): + """ + Verify same ip is allowed to 2 ports whwn + network port security is disabled + """ + router_name = data_utils.rand_name(name='tempest-rtr') + net_name = data_utils.rand_name(name='tempest-net') + router = self.\ + create_topology_router(router_name, + routers_client=self.routers_client) + body = {"port_security_enabled": 'False', + "admin_state_up": 'True'} + network = self.\ + create_topology_network(net_name, + networks_client=self.networks_client, + **body) + msg = 'spoofgurad policy should not get create' + self.assertFalse(self.vsm.verify_spoofgaurd_policy_nw(network['id']), + msg) + subnet_name = network['name'] + 'sub' + self.create_topology_subnet(subnet_name, network, + router_id=router["id"], + routers_client=self.routers_client, + subnets_client=self.subnets_client) + vm1 = self.\ + create_topology_instance("server1", [network], + security_groups='undef', + clients=self.cmgr_adm) + + vm2 = self.\ + create_topology_instance("server2", [network], + security_groups='undef', + clients=self.cmgr_adm) + # Update vm2 port to ipv4 address + ipv4 = '20.0.0.20' + port_body = {'port_security_enabled': 'True', + 'allowed_address_pairs': [{'ip_address': ipv4}] + } + vm1_port_id = self.ports_client.list_ports(device_id=vm1['id']) + vm1_port_id = vm1_port_id['ports'][0]['id'] + vm2_port_id = self.ports_client.list_ports(device_id=vm2['id']) + vm2_port_id = vm2_port_id['ports'][0]['id'] + self.ports_client.update_port(vm1_port_id, + **port_body) + self.ports_client.update_port(vm2_port_id, + **port_body) + self._verify_if_ipaddress_in_allowed_adress(vm1_port_id, ipv4) + self._verify_if_ipaddress_in_allowed_adress(vm2_port_id, ipv4) + + @decorators.attr(type='nsxv') + @decorators.idempotent_id('4443016b-93cc-5099-b217-12345caa24a1') + def test_allow_same_ip_to_two_diff_port_nw_port_security_enabled(self): + """ + Verify same ip is not allowed to 2 ports when + network port security is enabled with duplicate ip error + """ + router_name = data_utils.rand_name(name='tempest-rtr') + net_name = data_utils.rand_name(name='tempest-net') + router = self.\ + create_topology_router(router_name, + routers_client=self.routers_client) + body = {"port_security_enabled": 'True', + "admin_state_up": 'True'} + network = self.\ + create_topology_network(net_name, + networks_client=self.networks_client, + **body) + msg = 'spoofgurad policy should get create' + self.assertTrue(self.vsm.verify_spoofgaurd_policy_nw(network['id']), + msg) + subnet_name = network['name'] + 'sub' + self.create_topology_subnet(subnet_name, network, + router_id=router["id"], + routers_client=self.routers_client, + subnets_client=self.subnets_client) + vm1 = self.\ + create_topology_instance("server1", [network], + security_groups='undef', + clients=self.cmgr_adm) + + vm2 = self.\ + create_topology_instance("server2", [network], + security_groups='undef', + clients=self.cmgr_adm) + # Update vm2 port to ipv4 address + ipv4 = '20.0.0.20' + port_body = {'port_security_enabled': 'True', + 'allowed_address_pairs': [{'ip_address': ipv4}] + } + vm1_port_id = self.ports_client.list_ports(device_id=vm1['id']) + vm1_port_id = vm1_port_id['ports'][0]['id'] + vm2_port_id = self.ports_client.list_ports(device_id=vm2['id']) + vm2_port_id = vm2_port_id['ports'][0]['id'] + self.ports_client.update_port(vm1_port_id, + **port_body) + self.assertRaises(exceptions.BadRequest, + self.ports_client.update_port, + vm2_port_id, + **port_body) + self._verify_if_ipaddress_in_allowed_adress(vm1_port_id, ipv4) + + @decorators.attr(type='nsxv') + @decorators.idempotent_id('4444016b-93cc-5099-b217-12345caa24a1') + def test_update_ipv4_to_cidr_on_port_and_update_nw_port_security(self): + """ + Verify ipv4 to cidr cange is allowed when nw security disabled + VM has network port security is disabled, + """ + router_name = data_utils.rand_name(name='tempest-rtr') + net_name = data_utils.rand_name(name='tempest-net') + router = self.\ + create_topology_router(router_name, + routers_client=self.routers_client) + body = {"port_security_enabled": 'False', + "admin_state_up": 'True'} + network = self.\ + create_topology_network(net_name, + networks_client=self.networks_client, + **body) + msg = 'spoofgurad policy should not get create' + self.assertFalse(self.vsm.verify_spoofgaurd_policy_nw(network['id']), + msg) + subnet_name = network['name'] + 'sub' + self.create_topology_subnet(subnet_name, network, + router_id=router["id"], + routers_client=self.routers_client, + subnets_client=self.subnets_client) + vm1 = self.\ + create_topology_instance("server1", [network], + security_groups='undef', + clients=self.cmgr_adm) + # Update vm2 port to ipv4 address + ipv4 = '20.0.0.20' + port_body = {'port_security_enabled': 'True', + 'allowed_address_pairs': [{'ip_address': ipv4}] + } + vm1_port_id = self.ports_client.list_ports(device_id=vm1['id']) + vm1_port_id = vm1_port_id['ports'][0]['id'] + self.ports_client.update_port(vm1_port_id, + **port_body) + self._verify_if_ipaddress_in_allowed_adress(vm1_port_id, ipv4) + + # Updati to cidr from ipv4 is allowed when nw disabled + cidr = '20.0.0.0/24' + port_body = {'port_security_enabled': 'True', + 'allowed_address_pairs': [{'ip_address': cidr}] + } + self.ports_client.update_port(vm1_port_id, + **port_body) + self._verify_if_ipaddress_in_allowed_adress(vm1_port_id, cidr) + body['port_security_enabled'] = 'True' + self.assertRaises(exceptions.BadRequest, + self.networks_client.update_network, + network['id'], + **body) + ipv4 = '20.0.0.21' + port_body = {'port_security_enabled': 'True', + 'allowed_address_pairs': [{'ip_address': ipv4}] + } + self.ports_client.update_port(vm1_port_id, + **port_body) + self._verify_if_ipaddress_in_allowed_adress(vm1_port_id, ipv4) + # Enable network + self.networks_client.update_network(network['id'], **body) + port_body = {'port_security_enabled': 'True', + 'allowed_address_pairs': [{'ip_address': cidr}] + } + self.assertRaises(exceptions.BadRequest, + self.ports_client.update_port, + vm1_port_id, + **port_body) + self._verify_if_ipaddress_in_allowed_adress(vm1_port_id, ipv4) + + @decorators.attr(type='nsxv') + @decorators.idempotent_id('4447016b-93cc-5099-b217-12345caa24a1') + def test_allow_cidr_on_port_when_network_port_security_disabled(self): + """ + Verify port allow cidr when + VM has network port security is disabled, + """ + router_name = data_utils.rand_name(name='tempest-rtr') + net_name = data_utils.rand_name(name='tempest-net') + router = self.\ + create_topology_router(router_name, + routers_client=self.routers_client) + body = {"port_security_enabled": 'False', + "admin_state_up": 'True'} + network = self.\ + create_topology_network(net_name, + networks_client=self.networks_client, + **body) + msg = 'spoofgurad policy should not get create' + self.assertFalse(self.vsm.verify_spoofgaurd_policy_nw(network['id']), + msg) + subnet_name = network['name'] + 'sub' + self.create_topology_subnet(subnet_name, network, + router_id=router["id"], + routers_client=self.routers_client, + subnets_client=self.subnets_client) + vm1 = self.\ + create_topology_instance("server1", [network], + security_groups='undef', + clients=self.cmgr_adm) + # Update vm2 port to ipv4 address + cidr = '20.0.0.0/24' + port_body = {'port_security_enabled': 'True', + 'allowed_address_pairs': [{'ip_address': cidr}] + } + vm1_port_id = self.ports_client.list_ports(device_id=vm1['id']) + vm1_port_id = vm1_port_id['ports'][0]['id'] + self.ports_client.update_port(vm1_port_id, + **port_body) + self._verify_if_ipaddress_in_allowed_adress(vm1_port_id, cidr) + # Enabling port security on nw is not allowed when any port has cidr + body['port_security_enabled'] = 'True' + self.assertRaises(exceptions.BadRequest, + self.networks_client.update_network, + network['id'], + **body) + + @decorators.attr(type='nsxv') + @decorators.idempotent_id('4447016b-93cc-5099-b217-12345caa24a1') + def test_allow_cidr_on_port_when_network_port_security_enabled(self): + """ + Verify port allow cidr when + VM has network port security is enabled, + """ + router_name = data_utils.rand_name(name='tempest-rtr') + net_name = data_utils.rand_name(name='tempest-net') + router = self.\ + create_topology_router(router_name, + routers_client=self.routers_client) + body = {"port_security_enabled": 'True', + "admin_state_up": 'True'} + network = self.\ + create_topology_network(net_name, + networks_client=self.networks_client, + **body) + msg = 'spoofgurad policy should get create' + self.assertTrue(self.vsm.verify_spoofgaurd_policy_nw(network['id']), + msg) + subnet_name = network['name'] + 'sub' + self.create_topology_subnet(subnet_name, network, + router_id=router["id"], + routers_client=self.routers_client, + subnets_client=self.subnets_client) + vm1 = self.\ + create_topology_instance("server1", [network], + security_groups='undef', + clients=self.cmgr_adm) + # Update vm2 port to ipv4 address + cidr = '20.0.0.0/24' + vm1_port_id = self.ports_client.list_ports(device_id=vm1['id']) + vm1_port_id = vm1_port_id['ports'][0]['id'] + port_body = {'port_security_enabled': 'True', + 'allowed_address_pairs': [{'ip_address': cidr}] + } + self.assertRaises(exceptions.BadRequest, + self.ports_client.update_port, + vm1_port_id, + **port_body) + port_detail = self.ports_client.show_port(vm1_port_id) + self.assertFalse(port_detail['port']['allowed_address_pairs']) + + @decorators.attr(type='nsxv') + @decorators.idempotent_id('3336016b-93cc-5099-b217-12345caa24a1') + def test_allow_ivp4_addr_on_port_when_network_portsecurity_disabled(self): + """ + Verify port allow ipv4 address when + VM has network port security is disabled, + """ + router_name = data_utils.rand_name(name='tempest-rtr') + net_name = data_utils.rand_name(name='tempest-net') + router = self.\ + create_topology_router(router_name, + routers_client=self.routers_client) + body = {"port_security_enabled": 'False', + "admin_state_up": 'True'} + network = self.\ + create_topology_network(net_name, + networks_client=self.networks_client, + **body) + msg = 'spoofgurad policy should not get create' + self.assertFalse(self.vsm.verify_spoofgaurd_policy_nw(network['id']), + msg) + subnet_name = network['name'] + 'sub' + self.create_topology_subnet(subnet_name, network, + router_id=router["id"], + routers_client=self.routers_client, + subnets_client=self.subnets_client) + vm1 = self.\ + create_topology_instance("server1", [network], + security_groups='undef', + clients=self.cmgr_adm) + vm2 = self.\ + create_topology_instance("server2", [network], + security_groups='undef', + clients=self.cmgr_adm) + # Update vm2 port to ipv4 address + vm1_ip = '20.0.0.20' + vm2_ip = '20.0.0.22' + vm1_port_id = self.ports_client.list_ports(device_id=vm1['id']) + vm1_port_id = vm1_port_id['ports'][0]['id'] + vm2_port_id = self.ports_client.list_ports(device_id=vm2['id']) + vm2_port_id = vm2_port_id['ports'][0]['id'] + port_body = {'port_security_enabled': 'True', + 'allowed_address_pairs': [{'ip_address': vm1_ip}] + } + self.ports_client.update_port(vm1_port_id, + **port_body) + port_body = {'port_security_enabled': 'True', + 'allowed_address_pairs': [{'ip_address': vm2_ip}] + } + self.ports_client.update_port(vm2_port_id, + **port_body) + self._verify_if_ipaddress_in_allowed_adress(vm1_port_id, vm1_ip) + self._verify_if_ipaddress_in_allowed_adress(vm2_port_id, vm2_ip) + # Enabling port security on network is allowed wen port has ipv4 + body['port_security_enabled'] = 'True' + self.networks_client.update_network(network['id'], **body) + msg = 'spoofgurad policy should get create' + self.assertTrue(self.vsm.verify_spoofgaurd_policy_nw(network['id']), + msg) + + @decorators.attr(type='nsxv') + @decorators.idempotent_id('3336016b-93cc-5099-b217-12345caa24a1') + def test_allow_ip4_address_on_port_when_nw_port_security_enabled(self): + """ + Verify port allow ipv4 address when + VM has network port security is enabled, + """ + router_name = data_utils.rand_name(name='tempest-rtr-daku') + net_name = data_utils.rand_name(name='tempest-net') + router = self.\ + create_topology_router(router_name, + routers_client=self.routers_client) + body = {"port_security_enabled": 'True', + "admin_state_up": 'True'} + network = self.\ + create_topology_network(net_name, + networks_client=self.networks_client, + **body) + msg = 'spoofgurad policy should get create' + self.assertTrue(self.vsm.verify_spoofgaurd_policy_nw(network['id']), + msg) + subnet_name = network['name'] + 'sub' + self.create_topology_subnet(subnet_name, network, + router_id=router["id"], + routers_client=self.routers_client, + subnets_client=self.subnets_client) + kwargs = dict(tenant_id=network['tenant_id'], + security_group_rules_client=self.sec_rule_client, + security_groups_client=self.sec_client) + self.sg = self.create_topology_security_group(**kwargs) + vm1 = self.\ + create_topology_instance("server1", [network], + security_groups=[ + {'name': self.sg['name']}], + clients=self.cmgr_adm) + vm2 = self.\ + create_topology_instance("server2", [network], + security_groups=[ + {'name': self.sg['name']}], + clients=self.cmgr_adm) + # Update vm2 port to ipv4 address + vm1_ip = '20.0.0.20' + vm2_ip = '20.0.0.22' + vm1_port_id = self.ports_client.list_ports(device_id=vm1['id']) + vm1_port_id = vm1_port_id['ports'][0]['id'] + vm2_port_id = self.ports_client.list_ports(device_id=vm2['id']) + vm2_port_id = vm2_port_id['ports'][0]['id'] + port_body = {'port_security_enabled': 'True', + 'allowed_address_pairs': [{'ip_address': vm1_ip}] + } + self.ports_client.update_port(vm1_port_id, + **port_body) + port_body = {'port_security_enabled': 'True', + 'allowed_address_pairs': [{'ip_address': vm2_ip}] + } + self.ports_client.update_port(vm2_port_id, + **port_body) + self._verify_if_ipaddress_in_allowed_adress(vm1_port_id, vm1_ip) + self._verify_if_ipaddress_in_allowed_adress(vm2_port_id, vm2_ip) + vm1_ip_address = vm1['floating_ips'][0]['floating_ip_address'] + ssh_source1 = self._get_remote_client(vm1_ip_address, + use_password=True) + vm2_ip_address = vm2['floating_ips'][0]['floating_ip_address'] + ssh_source2 = self._get_remote_client(vm2_ip_address, + use_password=True) + self._assign_ip_address(ssh_source1, 'eth0.1', vm1_ip) + self._assign_ip_address(ssh_source2, 'eth0.1', vm2_ip) + # Verify Connectivity between vms + self.check_remote_connectivity(ssh_source1, vm2_ip, + should_succeed=True) + # Disabling port security on network is allowed when any port has ipv4i + body['port_security_enabled'] = 'False' + self.networks_client.update_network(network['id'], **body) + msg = 'spoofgurad policy should not get create' + self.assertFalse(self.vsm.verify_spoofgaurd_policy_nw(network['id']), + msg)