[Policy] Converting port security tests to policy
Change-Id: Ib7b35c1bb898203ed2ff5937c8652338dfdc9b47
This commit is contained in:
parent
3d98e38f99
commit
de71fe7334
@ -402,15 +402,21 @@ class NSXV3Client(object):
|
|||||||
"""
|
"""
|
||||||
return self.get_logical_resources("/ns-groups")
|
return self.get_logical_resources("/ns-groups")
|
||||||
|
|
||||||
def get_neutron_ns_group_id(self):
|
def get_neutron_ns_group_id(self, nsxp=False):
|
||||||
"""
|
"""
|
||||||
Retrieve NSGroup Id
|
Retrieve NSGroup Id
|
||||||
"""
|
"""
|
||||||
nsx_nsgroup = self.get_ns_groups()
|
nsx_nsgroup = self.get_ns_groups()
|
||||||
for group in nsx_nsgroup:
|
if nsxp:
|
||||||
if group['display_name'] == 'neutron_excluded_port_nsgroup':
|
for group in nsx_nsgroup:
|
||||||
nsgroup_id = group['id']
|
if group['display_name'] == (
|
||||||
return nsgroup_id
|
'default.neutron_excluded_ports_group'):
|
||||||
|
nsgroup_id = group['id']
|
||||||
|
else:
|
||||||
|
for group in nsx_nsgroup:
|
||||||
|
if group['display_name'] == 'neutron_excluded_port_nsgroup':
|
||||||
|
nsgroup_id = group['id']
|
||||||
|
return nsgroup_id
|
||||||
|
|
||||||
def get_ns_group_port_members(self, ns_group_id):
|
def get_ns_group_port_members(self, ns_group_id):
|
||||||
"""
|
"""
|
||||||
|
@ -23,6 +23,7 @@ from tempest.lib import exceptions
|
|||||||
|
|
||||||
from tempest import test
|
from tempest import test
|
||||||
from vmware_nsx_tempest_plugin.common import constants
|
from vmware_nsx_tempest_plugin.common import constants
|
||||||
|
from vmware_nsx_tempest_plugin.services import nsxp_client
|
||||||
from vmware_nsx_tempest_plugin.services import nsxv3_client
|
from vmware_nsx_tempest_plugin.services import nsxv3_client
|
||||||
|
|
||||||
CONF = config.CONF
|
CONF = config.CONF
|
||||||
@ -57,6 +58,9 @@ class NSXv3PortSecurity(base.BaseAdminNetworkTest):
|
|||||||
cls.nsx = nsxv3_client.NSXV3Client(CONF.nsxv3.nsx_manager,
|
cls.nsx = nsxv3_client.NSXV3Client(CONF.nsxv3.nsx_manager,
|
||||||
CONF.nsxv3.nsx_user,
|
CONF.nsxv3.nsx_user,
|
||||||
CONF.nsxv3.nsx_password)
|
CONF.nsxv3.nsx_password)
|
||||||
|
cls.nsxp = nsxp_client.NSXPClient(CONF.nsxv3.nsx_manager,
|
||||||
|
CONF.nsxv3.nsx_user,
|
||||||
|
CONF.nsxv3.nsx_password)
|
||||||
cls.network = cls.create_network()
|
cls.network = cls.create_network()
|
||||||
|
|
||||||
def get_tag_port_id(self, nsxgroup_data, org_port_id):
|
def get_tag_port_id(self, nsxgroup_data, org_port_id):
|
||||||
@ -136,6 +140,17 @@ class NSXv3PortSecurity(base.BaseAdminNetworkTest):
|
|||||||
secgroup_id = secgroup['id']
|
secgroup_id = secgroup['id']
|
||||||
return secgroup_id
|
return secgroup_id
|
||||||
|
|
||||||
|
def _create_security_group(self, client):
|
||||||
|
"""
|
||||||
|
Method to create security group and return id
|
||||||
|
"""
|
||||||
|
security_client = client.security_groups_client
|
||||||
|
create_body = security_client.create_security_group(name='sec-group')
|
||||||
|
secgroup = create_body['security_group']
|
||||||
|
# Sleep for 5 sec
|
||||||
|
time.sleep(constants.NSX_BACKEND_VERY_SMALL_TIME_INTERVAL)
|
||||||
|
return secgroup
|
||||||
|
|
||||||
@decorators.attr(type='nsxv3')
|
@decorators.attr(type='nsxv3')
|
||||||
@decorators.idempotent_id('50203701-1cda-4f31-806d-7a51514b9664')
|
@decorators.idempotent_id('50203701-1cda-4f31-806d-7a51514b9664')
|
||||||
def test_create_port_with_security_enabled_check_in_neutron_database(self):
|
def test_create_port_with_security_enabled_check_in_neutron_database(self):
|
||||||
@ -181,7 +196,11 @@ class NSXv3PortSecurity(base.BaseAdminNetworkTest):
|
|||||||
port_detail = port_client.show_port(port_id['port']['id'])
|
port_detail = port_client.show_port(port_id['port']['id'])
|
||||||
self.assertEqual(False, port_detail['port']["port_security_enabled"])
|
self.assertEqual(False, port_detail['port']["port_security_enabled"])
|
||||||
org_port_id = port_id['port']['id']
|
org_port_id = port_id['port']['id']
|
||||||
nsgroup_id = self.nsx.get_neutron_ns_group_id()
|
if CONF.network.backend == 'nsxp':
|
||||||
|
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
|
||||||
|
nsgroup_id = self.nsx.get_neutron_ns_group_id(nsxp=True)
|
||||||
|
else:
|
||||||
|
nsgroup_id = self.nsx.get_neutron_ns_group_id()
|
||||||
nsxgroup_data = self.nsx.get_ns_group_port_members(nsgroup_id)
|
nsxgroup_data = self.nsx.get_ns_group_port_members(nsgroup_id)
|
||||||
# Sleep for 10 sec
|
# Sleep for 10 sec
|
||||||
time.sleep(constants.NSX_BACKEND_SMALL_TIME_INTERVAL)
|
time.sleep(constants.NSX_BACKEND_SMALL_TIME_INTERVAL)
|
||||||
@ -214,16 +233,24 @@ class NSXv3PortSecurity(base.BaseAdminNetworkTest):
|
|||||||
@decorators.attr(type='nsxv3')
|
@decorators.attr(type='nsxv3')
|
||||||
@decorators.idempotent_id('ee6213ac-dfcd-401b-bbc6-03afd26f203a')
|
@decorators.idempotent_id('ee6213ac-dfcd-401b-bbc6-03afd26f203a')
|
||||||
def test_tenant_port_security_at_beckend_after_enable_disable(self):
|
def test_tenant_port_security_at_beckend_after_enable_disable(self):
|
||||||
secgroup_id = self._create_security_group_and_return_id(self.cmgr_alt)
|
secgroup = self._create_security_group(self.cmgr_alt)
|
||||||
network_topo = self._create_network_topo(self.cmgr_alt)
|
network_topo = self._create_network_topo(self.cmgr_alt)
|
||||||
port_client = self.cmgr_alt.ports_client
|
port_client = self.cmgr_alt.ports_client
|
||||||
kwargs = {"port_security_enabled": "false", "security_groups": []}
|
kwargs = {"port_security_enabled": "false", "security_groups": []}
|
||||||
org_port_id = network_topo['port']['port']['id']
|
org_port_id = network_topo['port']['port']['id']
|
||||||
port_client.update_port(org_port_id,
|
port_client.update_port(org_port_id,
|
||||||
**kwargs)
|
**kwargs)
|
||||||
# Sleep for 10 sec
|
if CONF.network.backend == 'nsxp':
|
||||||
time.sleep(constants.NSX_BACKEND_SMALL_TIME_INTERVAL)
|
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
|
||||||
nsgroup_id = self.nsx.get_neutron_ns_group_id()
|
nsx_nsgroup_policy = self.nsxp.get_ns_group(
|
||||||
|
secgroup['name'], secgroup['id'],
|
||||||
|
os_tenant_id=secgroup['tenant_id'])
|
||||||
|
self.assertIsNotNone(nsx_nsgroup_policy)
|
||||||
|
nsgroup_id = self.nsx.get_neutron_ns_group_id(nsxp=True)
|
||||||
|
else:
|
||||||
|
# Sleep for 10 sec
|
||||||
|
time.sleep(constants.NSX_BACKEND_SMALL_TIME_INTERVAL)
|
||||||
|
nsgroup_id = self.nsx.get_neutron_ns_group_id()
|
||||||
nsxgroup_data = self.nsx.get_ns_group_port_members(nsgroup_id)
|
nsxgroup_data = self.nsx.get_ns_group_port_members(nsgroup_id)
|
||||||
corresponding_port_id = self.get_tag_port_id(nsxgroup_data,
|
corresponding_port_id = self.get_tag_port_id(nsxgroup_data,
|
||||||
org_port_id)
|
org_port_id)
|
||||||
@ -233,11 +260,14 @@ class NSXv3PortSecurity(base.BaseAdminNetworkTest):
|
|||||||
corresponding_port_id)
|
corresponding_port_id)
|
||||||
self.assertEqual(True, status)
|
self.assertEqual(True, status)
|
||||||
kwargs = {"port_security_enabled": "true",
|
kwargs = {"port_security_enabled": "true",
|
||||||
"security_groups": [secgroup_id]}
|
"security_groups": [secgroup['id']]}
|
||||||
port_client.update_port(org_port_id,
|
port_client.update_port(org_port_id,
|
||||||
**kwargs)
|
**kwargs)
|
||||||
time.sleep(constants.NSX_BACKEND_TIME_INTERVAL)
|
if CONF.network.backend == 'nsxp':
|
||||||
nsgroup_id = self.nsx.get_neutron_ns_group_id()
|
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
|
||||||
|
else:
|
||||||
|
time.sleep(constants.NSX_BACKEND_TIME_INTERVAL)
|
||||||
|
nsgroup_id = self.nsx.get_neutron_ns_group_id()
|
||||||
nsxgroup_data = self.nsx.get_ns_group_port_members(nsgroup_id)
|
nsxgroup_data = self.nsx.get_ns_group_port_members(nsgroup_id)
|
||||||
status = self.check_port_not_exists_in_os_group(nsxgroup_data,
|
status = self.check_port_not_exists_in_os_group(nsxgroup_data,
|
||||||
corresponding_port_id)
|
corresponding_port_id)
|
||||||
@ -246,7 +276,7 @@ class NSXv3PortSecurity(base.BaseAdminNetworkTest):
|
|||||||
@decorators.attr(type='nsxv3')
|
@decorators.attr(type='nsxv3')
|
||||||
@decorators.idempotent_id('c6f4c2f2-3fc9-4983-a05a-bb3a3dc35ad8')
|
@decorators.idempotent_id('c6f4c2f2-3fc9-4983-a05a-bb3a3dc35ad8')
|
||||||
def test_admin_port_security_at_beckend_after_enable_disable(self):
|
def test_admin_port_security_at_beckend_after_enable_disable(self):
|
||||||
secgroup_id = self._create_security_group_and_return_id(self.cmgr_adm)
|
secgroup = self._create_security_group(self.cmgr_adm)
|
||||||
network_topo = self._create_network_topo(self.cmgr_adm)
|
network_topo = self._create_network_topo(self.cmgr_adm)
|
||||||
port_client = self.cmgr_adm.ports_client
|
port_client = self.cmgr_adm.ports_client
|
||||||
kwargs = {"port_security_enabled": "false",
|
kwargs = {"port_security_enabled": "false",
|
||||||
@ -254,9 +284,17 @@ class NSXv3PortSecurity(base.BaseAdminNetworkTest):
|
|||||||
org_port_id = network_topo['port']['port']['id']
|
org_port_id = network_topo['port']['port']['id']
|
||||||
port_client.update_port(org_port_id,
|
port_client.update_port(org_port_id,
|
||||||
**kwargs)
|
**kwargs)
|
||||||
# Sleep for 10 sec
|
if CONF.network.backend == 'nsxp':
|
||||||
time.sleep(constants.NSX_BACKEND_SMALL_TIME_INTERVAL)
|
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
|
||||||
nsgroup_id = self.nsx.get_neutron_ns_group_id()
|
nsx_nsgroup_policy = self.nsxp.get_ns_group(
|
||||||
|
secgroup['name'], secgroup['id'],
|
||||||
|
os_tenant_id=secgroup['tenant_id'])
|
||||||
|
self.assertIsNotNone(nsx_nsgroup_policy)
|
||||||
|
nsgroup_id = self.nsx.get_neutron_ns_group_id(nsxp=True)
|
||||||
|
else:
|
||||||
|
# Sleep for 10 sec
|
||||||
|
time.sleep(constants.NSX_BACKEND_SMALL_TIME_INTERVAL)
|
||||||
|
nsgroup_id = self.nsx.get_neutron_ns_group_id()
|
||||||
nsxgroup_data = self.nsx.get_ns_group_port_members(nsgroup_id)
|
nsxgroup_data = self.nsx.get_ns_group_port_members(nsgroup_id)
|
||||||
corresponding_port_id = self.get_tag_port_id(nsxgroup_data,
|
corresponding_port_id = self.get_tag_port_id(nsxgroup_data,
|
||||||
org_port_id)
|
org_port_id)
|
||||||
@ -266,10 +304,14 @@ class NSXv3PortSecurity(base.BaseAdminNetworkTest):
|
|||||||
corresponding_port_id)
|
corresponding_port_id)
|
||||||
self.assertEqual(True, status)
|
self.assertEqual(True, status)
|
||||||
kwargs = {"port_security_enabled": "true",
|
kwargs = {"port_security_enabled": "true",
|
||||||
"security_groups": [secgroup_id]}
|
"security_groups": [secgroup['id']]}
|
||||||
port_client.update_port(org_port_id, **kwargs)
|
port_client.update_port(org_port_id, **kwargs)
|
||||||
time.sleep(constants.NSX_BACKEND_TIME_INTERVAL)
|
if CONF.network.backend == 'nsxp':
|
||||||
nsgroup_id = self.nsx.get_neutron_ns_group_id()
|
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
|
||||||
|
nsxgroup_data = self.nsx.get_ns_group_port_members(nsgroup_id)
|
||||||
|
else:
|
||||||
|
time.sleep(constants.NSX_BACKEND_TIME_INTERVAL)
|
||||||
|
nsgroup_id = self.nsx.get_neutron_ns_group_id()
|
||||||
nsxgroup_data = self.nsx.get_ns_group_port_members(nsgroup_id)
|
nsxgroup_data = self.nsx.get_ns_group_port_members(nsgroup_id)
|
||||||
status = self.check_port_not_exists_in_os_group(nsxgroup_data,
|
status = self.check_port_not_exists_in_os_group(nsxgroup_data,
|
||||||
corresponding_port_id)
|
corresponding_port_id)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user