From 056d4cef502ab293a8f4ef5b89408003b2bc378f Mon Sep 17 00:00:00 2001 From: Shubhamk Kadam Date: Fri, 29 Mar 2019 17:08:18 +0000 Subject: [PATCH] Added policy support for router_nonat , cert cases - All scenario nonot cases - one cert cases (test_prevention_modification_openstack_network) Change-Id: I66422ab5f39bfcc21c08bb996f2852826cab3745 --- .../services/nsxp_client.py | 47 ++++++- .../tests/nsxv3/api/test_nsx_networks.py | 24 ++++ .../scenario/test_client_cert_mgmt_ops.py | 96 ++++++++------ .../nsxv3/scenario/test_router_nonat_ops.py | 120 +++++++++++++++--- 4 files changed, 232 insertions(+), 55 deletions(-) diff --git a/vmware_nsx_tempest_plugin/services/nsxp_client.py b/vmware_nsx_tempest_plugin/services/nsxp_client.py index ec030db..47670b1 100644 --- a/vmware_nsx_tempest_plugin/services/nsxp_client.py +++ b/vmware_nsx_tempest_plugin/services/nsxp_client.py @@ -125,7 +125,7 @@ class NSXPClient(object): """ NSX-T API Put request for certificate Management """ - endpoint = ("/%s/%s" % (component, comp_id)) + endpoint = ("%s/%s" % (component, comp_id)) response = self.put(endpoint=endpoint, body=body) return response @@ -142,7 +142,7 @@ class NSXPClient(object): """ NSX-T API delete request for certificate Management """ - endpoint = ("/%s/%s" % (component, comp_id)) + endpoint = ("%s/%s" % (component, comp_id)) response = self.delete(endpoint=endpoint) return response @@ -313,3 +313,46 @@ class NSXPClient(object): nsx_name = os_name + "_" + os_uuid[:5] + "..." + os_uuid[-5:] nsgroups = self.get_ns_groups(tenant_id=os_tenant_id) return self.get_nsx_resource_by_name(nsgroups, nsx_name) + + def get_logical_switches(self): + """ + Retrieve all logical switches on NSX backend + """ + return self.get_logical_resources("segments") + + def get_logical_switch(self, os_name, os_uuid): + """ + Get the logical switch based on the name and uuid provided. + + The name of the logical switch should follow + _... + Return logical switch if found, otherwise return None + """ + if not os_name or not os_uuid: + LOG.error("Name and uuid of OpenStack L2 network need to be " + "present in order to query backend logical switch!") + return None + nsx_name = os_name + "_" + os_uuid[:5] + "..." + os_uuid[-5:] + lswitches = self.get_logical_switches() + return self.get_nsx_resource_by_name(lswitches, nsx_name) + + def get_logical_router_nat_rules(self, lrouter): + """ + Get all user defined NAT rules of the specific logical router + """ + if not lrouter: + LOG.error("Logical router needs to be present in order " + "to get the NAT rules") + return None + endpoint = "tier-1s/%s/nat/USER/nat-rules" % lrouter['id'] + return self.get_logical_resources(endpoint) + + def get_logical_router_advertisement(self, lrouter): + """Get logical router advertisement""" + if not lrouter: + LOG.error("Logical router needs to be present in order " + "to get router advertisement!") + return None + endpoint = "/logical-routers/%s/routing/advertisement" % lrouter['id'] + response = self.get(endpoint) + return response.json() diff --git a/vmware_nsx_tempest_plugin/tests/nsxv3/api/test_nsx_networks.py b/vmware_nsx_tempest_plugin/tests/nsxv3/api/test_nsx_networks.py index 75d88f2..4e127f9 100644 --- a/vmware_nsx_tempest_plugin/tests/nsxv3/api/test_nsx_networks.py +++ b/vmware_nsx_tempest_plugin/tests/nsxv3/api/test_nsx_networks.py @@ -10,11 +10,15 @@ # License for the specific language governing permissions and limitations # under the License. +import time + from tempest.api.network import base from tempest import config from tempest.lib.common.utils import data_utils from tempest.lib import decorators +from vmware_nsx_tempest_plugin.common import constants +from vmware_nsx_tempest_plugin.services import nsxp_client from vmware_nsx_tempest_plugin.services import nsxv3_client CONF = config.CONF @@ -35,6 +39,9 @@ class NSXv3NetworksTest(base.BaseNetworkTest): cls.nsx = nsxv3_client.NSXV3Client(CONF.nsxv3.nsx_manager, CONF.nsxv3.nsx_user, CONF.nsxv3.nsx_password) + cls.nsxp = nsxp_client.NSXPClient(CONF.nsxv3.nsx_manager, + CONF.nsxv3.nsx_user, + CONF.nsxv3.nsx_password) @decorators.attr(type='nsxv3') @decorators.idempotent_id('63085723-23ae-4109-ac86-69f895097957') @@ -43,6 +50,12 @@ class NSXv3NetworksTest(base.BaseNetworkTest): name = data_utils.rand_name('network-') network = self.create_network(network_name=name) net_id = network['id'] + if CONF.network.backend == 'nsxp': + time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL) + nsxp_network = self.nsxp.get_logical_switch(network['name'], + network['id']) + self.assertEqual('ACTIVE', network['status']) + self.assertIsNotNone(nsxp_network) nsx_network = self.nsx.get_logical_switch(network['name'], network['id']) self.assertEqual('ACTIVE', network['status']) @@ -51,12 +64,23 @@ class NSXv3NetworksTest(base.BaseNetworkTest): new_name = "New_network" body = self.networks_client.update_network(net_id, name=new_name) updated_net = body['network'] + if CONF.network.backend == 'nsxp': + time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL) + nsxp_network = self.nsxp.get_logical_switch(updated_net['name'], + updated_net['id']) + self.assertEqual(updated_net['name'], new_name) + self.assertIsNotNone(nsxp_network) nsx_network = self.nsx.get_logical_switch(updated_net['name'], updated_net['id']) self.assertEqual(updated_net['name'], new_name) self.assertIsNotNone(nsx_network) # Verify delete network self.networks_client.delete_network(updated_net['id']) + if CONF.network.backend == 'nsxp': + time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL) + nsxp_network = self.nsxp.get_logical_switch(updated_net['name'], + updated_net['id']) + self.assertIsNone(nsxp_network) nsx_network = self.nsx.get_logical_switch(updated_net['name'], updated_net['id']) self.assertIsNone(nsx_network) diff --git a/vmware_nsx_tempest_plugin/tests/nsxv3/scenario/test_client_cert_mgmt_ops.py b/vmware_nsx_tempest_plugin/tests/nsxv3/scenario/test_client_cert_mgmt_ops.py index 3ac8057..bffdc9a 100644 --- a/vmware_nsx_tempest_plugin/tests/nsxv3/scenario/test_client_cert_mgmt_ops.py +++ b/vmware_nsx_tempest_plugin/tests/nsxv3/scenario/test_client_cert_mgmt_ops.py @@ -13,17 +13,18 @@ # License for the specific language governing permissions and limitations # under the License. +import time from oslo_log import log as logging - from tempest.common import utils from tempest import config - from tempest.lib import decorators from tempest.lib.common.utils import data_utils from tempest.lib.common.utils import test_utils +from vmware_nsx_tempest_plugin.common import constants +from vmware_nsx_tempest_plugin.services import nsxp_client from vmware_nsx_tempest_plugin.services import nsxv3_client from vmware_nsx_tempest_plugin.services.qos import base_qos from vmware_nsx_tempest_plugin.tests.scenario import manager @@ -41,8 +42,8 @@ class TestCertificateMgmt(manager.NetworkScenarioTest): @classmethod def skip_checks(cls): super(TestCertificateMgmt, cls).skip_checks() - if not (CONF.network.project_networks_reachable - or CONF.network.public_network_id): + if not (CONF.network.project_networks_reachable or + CONF.network.public_network_id): msg = ('Either project_networks_reachable must be true, or\ public_network_id must be defined.') raise cls.skipException(msg) @@ -55,7 +56,11 @@ class TestCertificateMgmt(manager.NetworkScenarioTest): cls.set_network_resources() super(TestCertificateMgmt, cls).setup_credentials() cls.nsx = nsxv3_client.NSXV3Client(CONF.nsxv3.nsx_manager, - CONF.nsxv3.nsx_user, CONF.nsxv3.nsx_password) + CONF.nsxv3.nsx_user, + CONF.nsxv3.nsx_password) + cls.nsxp = nsxp_client.NSXPClient(CONF.nsxv3.nsx_manager, + CONF.nsxv3.nsx_user, + CONF.nsxv3.nsx_password) @classmethod def resource_setup(cls): @@ -121,7 +126,7 @@ class TestCertificateMgmt(manager.NetworkScenarioTest): msg = 'Error: NSX admin is able to modify/delete' if all(x in response.json()['error_message'] for x in self.error_msg): LOG.info('NSX admin is unable to modify/delete ' - 'the openstack object') + 'the openstack object') else: raise Exception(msg) @@ -131,13 +136,13 @@ class TestCertificateMgmt(manager.NetworkScenarioTest): and a logical port attached to the network """ self.network = self._create_network(namestart="net-ca") - self.subnet = self._create_subnet(self.network, - cidr=CONF.network.project_network_cidr) + self.subnet = self._create_subnet( + self.network, cidr=CONF.network.project_network_cidr) self.port = self._create_port(network_id=self.network['id'], - namestart='ca') + namestart='ca') msg = 'Logical Port %s not found' % self.port['name'] - self.assertIsNotNone(self.nsx.get_logical_port( - self.port['name']), msg) + self.assertIsNotNone(self.nsx.get_logical_port(self.port['name']), + msg) data = self.nsx.get_logical_port(self.port['name']) return data @@ -163,24 +168,40 @@ class TestCertificateMgmtOps(TestCertificateMgmt): Verify if NSX admin is unable to modify this network """ self.network = self._create_network() - self.subnet = self._create_subnet(self.network, - cidr=CONF.network.project_network_cidr) + self.subnet = self._create_subnet( + self.network, cidr=CONF.network.project_network_cidr) #check backend if the network was created msg = 'network %s not found' % self.network['name'] + if CONF.network.backend == 'nsxp': + time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL) + self.assertIsNotNone(self.nsxp.get_logical_switch( + self.network['name'], self.network['id']), msg) self.assertIsNotNone(self.nsx.get_logical_switch( self.network['name'], self.network['id']), msg) + if CONF.network.backend == 'nsxp': + data_policy = self.nsxp.get_logical_switch(self.network['name'], + self.network['id']) + self.assertEqual(data_policy['_create_user'], self.openstack_tag, + 'Incorrect tag for the create user') data = self.nsx.get_logical_switch(self.network['name'], - self.network['id']) + self.network['id']) """ Check if backend shows openstack as the create user for the object """ self.assertEqual(data['_create_user'], self.openstack_tag, - 'Incorrect tag for the create user') + 'Incorrect tag for the create user') #try to update network name as NSX admin - data.update({"display_name": "nsx_modified_switch"}) - response = self.nsx.ca_put_request(component='logical-switches', - comp_id=data['id'], body=data) + if CONF.network.backend == 'nsxp': + data_policy.update({"display_name": "nsx_modified_switch"}) + response = self.nsxp.ca_put_request(component='segments', + comp_id=data_policy['id'], + body=data_policy) + else: + data.update({"display_name": "nsx_modified_switch"}) + response = self.nsx.ca_put_request(component='segments', + comp_id=data['id'], + body=data) self.parse_response(response) @decorators.attr(type='nsxv3') @@ -193,8 +214,8 @@ class TestCertificateMgmtOps(TestCertificateMgmt): Verify if NSX admin can not delete this router """ self.network = self._create_network() - self.subnet = self._create_subnet(self.network, - cidr=CONF.network.project_network_cidr) + self.subnet = self._create_subnet( + self.network, cidr=CONF.network.project_network_cidr) #create router and add an interface self.router = self._create_router( router_name=data_utils.rand_name('router-cert-mgmt'), @@ -208,19 +229,19 @@ class TestCertificateMgmtOps(TestCertificateMgmt): self.assertIsNotNone(self.nsx.get_logical_router( self.router['name'], self.router['id']), msg) data = self.nsx.get_logical_router(self.router['name'], - self.router['id']) + self.router['id']) """ Check if backend shows openstack as the create user for the object """ self.assertEqual(data['_create_user'], self.openstack_tag, - 'Incorrect tag for the create user') + 'Incorrect tag for the create user') #Obtain any router port corresponding to the logical router rtr_ports = self.nsx.get_logical_router_ports(data) #try to update router name as NSX admin data.update({"display_name": "nsx_modified_router"}) response = self.nsx.ca_put_request(component='logical-routers', - comp_id=data['id'], body=data) + comp_id=data['id'], body=data) self.parse_response(response) #try to delete logical router port as NSX admin if len(rtr_ports) != 0: @@ -246,7 +267,7 @@ class TestCertificateMgmtOps(TestCertificateMgmt): #obtain all switching profiles at the backend qos_policies = self.nsx.get_switching_profiles() nsx_policy = self.nsx.get_nsx_resource_by_name(qos_policies, - policy['name']) + policy['name']) #check backend if the qos policy was created msg = 'Qos policy %s not found' % policy['name'] self.assertIsNotNone(self.nsx.get_switching_profile( @@ -257,15 +278,15 @@ class TestCertificateMgmtOps(TestCertificateMgmt): as the create user for the object """ self.assertEqual(data['_create_user'], self.openstack_tag, - 'Incorrect tag for the create user') + 'Incorrect tag for the create user') #try to update qos policy as NSX admin data.update({"display_name": "nsx_modified_qos-policy"}) response = self.nsx.ca_put_request(component='switching-profiles', - comp_id=data['id'], body=data) + comp_id=data['id'], body=data) self.parse_response(response) #try to delete qos policy as NSX admin response = self.nsx.ca_delete_request(component='switching-profiles', - comp_id=data['id']) + comp_id=data['id']) self.parse_response(response) @decorators.attr(type='nsxv3') @@ -283,25 +304,25 @@ class TestCertificateMgmtOps(TestCertificateMgmt): self.assertIsNotNone(self.nsx.get_firewall_section( self.security_group['name'], self.security_group['id']), msg) data = self.nsx.get_firewall_section(self.security_group['name'], - self.security_group['id']) + self.security_group['id']) """ Check if backend shows openstack as the create user for the object """ self.assertEqual(data['_create_user'], self.openstack_tag, - 'Incorrect tag for the create user') + 'Incorrect tag for the create user') #obtain firewall rules related to the security group fw_rules = self.nsx.get_firewall_section_rules(data) #try to update security group as NSX admin data.update({"display_name": "nsx_modified_security_group"}) response = self.nsx.ca_put_request(component='firewall/sections', - comp_id=data['id'], body=data) + comp_id=data['id'], body=data) self.parse_response(response) #try to delete logical firewall rule as NSX admin if len(fw_rules) != 0: component = 'firewall/sections/' + data['id'] + '/rules' response = self.nsx.ca_delete_request(component=component, - comp_id=fw_rules[0]['id']) + comp_id=fw_rules[0]['id']) self.parse_response(response) @decorators.attr(type='nsxv3') @@ -317,15 +338,15 @@ class TestCertificateMgmtOps(TestCertificateMgmt): """ data = self.ca_topo() self.assertEqual(data['_create_user'], self.openstack_tag, - 'Incorrect tag for the create user') + 'Incorrect tag for the create user') #try to update logical port as NSX admin data.update({"display_name": "nsx_modified_logical_port"}) response = self.nsx.ca_put_request(component='logical-ports', - comp_id=data['id'], body=data) + comp_id=data['id'], body=data) self.parse_response(response) #try to delete logical port as NSX admin response = self.nsx.ca_delete_request(component='logical-ports', - comp_id=data['id']) + comp_id=data['id']) self.parse_response(response) @decorators.attr(type='nsxv3') @@ -344,7 +365,7 @@ class TestCertificateMgmtOps(TestCertificateMgmt): #obtain all switching profiles at the backend qos_policies = self.nsx.get_switching_profiles() nsx_policy = self.nsx.get_nsx_resource_by_name(qos_policies, - policy['name']) + policy['name']) #check backend if the qos policy was created msg = 'Qos policy %s not found' % policy['name'] self.assertIsNotNone(self.nsx.get_switching_profile( @@ -352,10 +373,11 @@ class TestCertificateMgmtOps(TestCertificateMgmt): data = self.nsx.get_switching_profile(nsx_policy['id']) #try to delete qos policy as NSX admin endpoint = ("/%s/%s" % ('switching-profiles', - data['id'])) + data['id'])) response = self.nsx.delete_super_admin(endpoint) self.assertEqual(response.status_code, 200, - "Superadmin unable to delete the qos switching profile") + "Superadmin unable to " + "delete the qos switching profile") @decorators.attr(type='nsxv3') @decorators.idempotent_id('a874d78b-eb7a-4df6-a01b-dc0a22422dc2') diff --git a/vmware_nsx_tempest_plugin/tests/nsxv3/scenario/test_router_nonat_ops.py b/vmware_nsx_tempest_plugin/tests/nsxv3/scenario/test_router_nonat_ops.py index 5102f9d..565cb54 100644 --- a/vmware_nsx_tempest_plugin/tests/nsxv3/scenario/test_router_nonat_ops.py +++ b/vmware_nsx_tempest_plugin/tests/nsxv3/scenario/test_router_nonat_ops.py @@ -14,6 +14,7 @@ # under the License. import collections +import time from oslo_log import log as logging @@ -24,6 +25,8 @@ from tempest.lib.common.utils import test_utils from tempest.lib import decorators from tempest.lib import exceptions +from vmware_nsx_tempest_plugin.common import constants +from vmware_nsx_tempest_plugin.services import nsxp_client from vmware_nsx_tempest_plugin.services import nsxv3_client from vmware_nsx_tempest_plugin.tests.scenario import manager @@ -70,6 +73,9 @@ class TestRouterNoNATOps(manager.NetworkScenarioTest): cls.nsx = nsxv3_client.NSXV3Client(CONF.nsxv3.nsx_manager, CONF.nsxv3.nsx_user, CONF.nsxv3.nsx_password) + cls.nsxp = nsxp_client.NSXPClient(CONF.nsxv3.nsx_manager, + CONF.nsxv3.nsx_user, + CONF.nsxv3.nsx_password) def setUp(self): super(TestRouterNoNATOps, self).setUp() @@ -245,6 +251,12 @@ class TestRouterNoNATOps(manager.NetworkScenarioTest): """ snat = True self._setup_network_topo(enable_snat=snat) + if CONF.network.backend == 'nsxp': + time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL) + nsx_router_policy = self.nsxp.get_logical_router( + self.router['name'], self.router['id']) + self.assertNotEqual(nsx_router_policy, None) + self.assertEqual(nsx_router_policy['resource_type'], 'Tier1') nsx_router = self.nsx.get_logical_router( self.router['name'], self.router['id']) self.assertNotEqual(nsx_router, None) @@ -265,22 +277,41 @@ class TestRouterNoNATOps(manager.NetworkScenarioTest): """Test update router from NATed to NoNAT scenario""" snat = True self._setup_network_topo(enable_snat=snat) + if CONF.network.backend == 'nsxp': + time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL) + nsx_router_policy = self.nsxp.get_logical_router( + self.router['name'], self.router['id']) + self.assertNotEqual(nsx_router_policy, None) + self.assertEqual(nsx_router_policy['resource_type'], 'Tier1') nsx_router = self.nsx.get_logical_router( self.router['name'], self.router['id']) self.assertNotEqual(nsx_router, None) self.assertEqual(nsx_router['router_type'], 'TIER1') # Check nat rules created correctly - nat_rules = self.nsx.get_logical_router_nat_rules(nsx_router) + if CONF.network.backend == 'nsxp': + nat_rules = self.nsxp.get_logical_router_nat_rules( + nsx_router_policy) + else: + nat_rules = self.nsx.get_logical_router_nat_rules(nsx_router) + router_adv = self.nsx.get_logical_router_advertisement(nsx_router) # Check router advertisement is correctly set - router_adv = self.nsx.get_logical_router_advertisement(nsx_router) adv_msg = "Tier1 router's advertise_nsx_connected_routes is not True" nat_msg = "Tier1 router's advertise_nat_routes is not False" if any(d['action'] == 'NO_DNAT' for d in nat_rules): self.assertTrue(len(nat_rules) == 4) else: self.assertTrue(len(nat_rules) == 3) - self.assertTrue(router_adv['advertise_nat_routes'], nat_msg) - self.assertFalse(router_adv['advertise_nsx_connected_routes'], adv_msg) + if CONF.network.backend == 'nsxp': + self.assertTrue( + 'TIER1_NAT' in nsx_router_policy['route_advertisement_types'], + nat_msg) + self.assertFalse( + 'TIER1_CONNECTED' in nsx_router_policy[ + 'route_advertisement_types'], adv_msg) + else: + self.assertTrue(router_adv['advertise_nat_routes'], nat_msg) + self.assertFalse( + router_adv['advertise_nsx_connected_routes'], adv_msg) self._check_network_internal_connectivity(network=self.network) self._check_network_vm_connectivity(network=self.network) self._check_nonat_network_connectivity(should_connect=False) @@ -293,42 +324,80 @@ class TestRouterNoNATOps(manager.NetworkScenarioTest): 'enable_snat': (not snat)} self._update_router(self.router['id'], self.cmgr_adm.routers_client, external_gateway_info) + if CONF.network.backend == 'nsxp': + time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL) + nsx_router_policy = self.nsxp.get_logical_router( + self.router['name'], self.router['id']) + self.assertNotEqual(nsx_router_policy, None) + self.assertEqual(nsx_router_policy['resource_type'], 'Tier1') nsx_router = self.nsx.get_logical_router( self.router['name'], self.router['id']) self.assertNotEqual(nsx_router, None) self.assertEqual(nsx_router['router_type'], 'TIER1') # Check nat rules created correctly - nat_rules = self.nsx.get_logical_router_nat_rules(nsx_router) + if CONF.network.backend == 'nsxp': + nat_rules = self.nsxp.get_logical_router_nat_rules( + nsx_router_policy) + else: + nat_rules = self.nsx.get_logical_router_nat_rules(nsx_router) + router_adv = self.nsx.get_logical_router_advertisement(nsx_router) # Check router advertisement is correctly set - router_adv = self.nsx.get_logical_router_advertisement(nsx_router) if len(nat_rules) == 1: self.assertTrue(any(d['action'] == 'NO_DNAT' for d in nat_rules)) else: self.assertTrue(len(nat_rules) == 0) - self.assertFalse(router_adv['advertise_nat_routes'], nat_msg) - self.assertTrue(router_adv['advertise_nsx_connected_routes'], adv_msg) + if CONF.network.backend == 'nsxp': + self.assertFalse( + 'TIER1_NAT' in nsx_router_policy[ + 'route_advertisement_types'], nat_msg) + self.assertTrue( + 'TIER1_CONNECTED' in nsx_router_policy[ + 'route_advertisement_types'], adv_msg) + else: + self.assertFalse(router_adv['advertise_nat_routes'], nat_msg) + self.assertTrue( + router_adv['advertise_nsx_connected_routes'], adv_msg) self._check_nonat_network_connectivity() def _test_router_nat_update_when_no_snat(self): """Test update router from NATed to NoNAT scenario""" snat = False self._setup_network_topo(enable_snat=snat) + if CONF.network.backend == 'nsxp': + time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL) + nsx_router_policy = self.nsxp.get_logical_router( + self.router['name'], self.router['id']) + self.assertNotEqual(nsx_router_policy, None) + self.assertEqual(nsx_router_policy['resource_type'], 'Tier1') nsx_router = self.nsx.get_logical_router( self.router['name'], self.router['id']) self.assertNotEqual(nsx_router, None) self.assertEqual(nsx_router['router_type'], 'TIER1') # Check nat rules created correctly - nat_rules = self.nsx.get_logical_router_nat_rules(nsx_router) + if CONF.network.backend == 'nsxp': + nat_rules = self.nsxp.get_logical_router_nat_rules( + nsx_router_policy) + else: + nat_rules = self.nsx.get_logical_router_nat_rules(nsx_router) + router_adv = self.nsx.get_logical_router_advertisement(nsx_router) # Check router advertisement is correctly set - router_adv = self.nsx.get_logical_router_advertisement(nsx_router) adv_msg = "Tier1 router's advertise_nsx_connected_routes is not True" nat_msg = "Tier1 router's advertise_nat_routes is not False" if len(nat_rules) == 1: self.assertTrue(any(d['action'] == 'NO_DNAT' for d in nat_rules)) else: self.assertTrue(len(nat_rules) == 0) - self.assertFalse(router_adv['advertise_nat_routes'], nat_msg) - self.assertTrue(router_adv['advertise_nsx_connected_routes'], adv_msg) + if CONF.network.backend == 'nsxp': + self.assertFalse( + 'TIER1_NAT' in nsx_router_policy[ + 'route_advertisement_types'], nat_msg) + self.assertTrue( + 'TIER1_CONNECTED' in nsx_router_policy[ + 'route_advertisement_types'], adv_msg) + else: + self.assertFalse(router_adv['advertise_nat_routes'], nat_msg) + self.assertTrue( + router_adv['advertise_nsx_connected_routes'], adv_msg) self._check_nonat_network_connectivity() # Update router to Enable snat and associate floating ip external_gateway_info = { @@ -338,20 +407,39 @@ class TestRouterNoNATOps(manager.NetworkScenarioTest): external_gateway_info) floating_ip = self.create_floating_ip(self.server) self.floating_ip_tuple = Floating_IP_tuple(floating_ip, self.server) + if CONF.network.backend == 'nsxp': + time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL) + nsx_router_policy = self.nsxp.get_logical_router( + self.router['name'], self.router['id']) + self.assertNotEqual(nsx_router_policy, None) + self.assertEqual(nsx_router_policy['resource_type'], 'Tier1') nsx_router = self.nsx.get_logical_router( self.router['name'], self.router['id']) self.assertNotEqual(nsx_router, None) self.assertEqual(nsx_router['router_type'], 'TIER1') # Check nat rules created correctly - nat_rules = self.nsx.get_logical_router_nat_rules(nsx_router) + if CONF.network.backend == 'nsxp': + nat_rules = self.nsxp.get_logical_router_nat_rules( + nsx_router_policy) + else: + nat_rules = self.nsx.get_logical_router_nat_rules(nsx_router) + router_adv = self.nsx.get_logical_router_advertisement(nsx_router) # Check router advertisement is correctly set - router_adv = self.nsx.get_logical_router_advertisement(nsx_router) if any(d['action'] == 'NO_DNAT' for d in nat_rules): self.assertTrue(len(nat_rules) == 4) else: self.assertTrue(len(nat_rules) == 3) - self.assertTrue(router_adv['advertise_nat_routes'], nat_msg) - self.assertFalse(router_adv['advertise_nsx_connected_routes'], adv_msg) + if CONF.network.backend == 'nsxp': + self.assertTrue( + 'TIER1_NAT' in nsx_router_policy[ + 'route_advertisement_types'], nat_msg) + self.assertFalse( + 'TIER1_CONNECTED' in nsx_router_policy[ + 'route_advertisement_types'], adv_msg) + else: + self.assertTrue(router_adv['advertise_nat_routes'], nat_msg) + self.assertFalse( + router_adv['advertise_nsx_connected_routes'], adv_msg) self._check_network_internal_connectivity(network=self.network) self._check_network_vm_connectivity(network=self.network)