NSX-P Automation Patch

Added nsxp client
support for router and security group api cases

Change-Id: I88cf258557a0fed54a4c7954e362c4c05cd51b5e
This commit is contained in:
Shubhamk Kadam 2019-02-08 13:20:28 +00:00 committed by Deepthi Kandavara Jayarama
parent 64daec3673
commit e9e66c2531
4 changed files with 475 additions and 23 deletions

View File

@ -0,0 +1,315 @@
# Copyright 2019 VMware Inc
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import base64
from copy import deepcopy
import time
import requests
import six.moves.urllib.parse as urlparse
from oslo_log import log as logging
from oslo_serialization import jsonutils
from vmware_nsx_tempest_plugin.common import constants
requests.packages.urllib3.disable_warnings()
LOG = logging.getLogger(__name__)
class NSXPClient(object):
"""Base NSXP REST client"""
API_VERSION = "v1"
def __init__(self, host, username, password, *args, **kwargs):
self.host = host
self.username = username
self.password = password
self.version = None
self.endpoint = None
self.content_type = "application/json"
self.accept_type = "application/json"
self.verify = False
self.secure = True
self.interface = "json"
self.url = None
self.headers_non_super_admin = self.__set_headers()
self.headers = deepcopy(self.headers_non_super_admin)
self.headers_super_admin = self.__set_headers(super_admin=True)
self.api_version = NSXPClient.API_VERSION
def __set_endpoint(self, endpoint):
self.endpoint = endpoint
def get_endpoint(self):
return self.endpoint
def __set_content_type(self, content_type):
self.content_type = content_type
def get_content_type(self):
return self.content_type
def __set_accept_type(self, accept_type):
self.accept_type = accept_type
def get_accept_type(self):
return self.accept_type
def __set_api_version(self, api_version):
self.api_version = api_version
def get_api_version(self):
return self.api_version
def __set_url(self, api=None, secure=None, host=None, endpoint=None):
api = self.api_version if api is None else api
secure = self.secure if secure is None else secure
host = self.host if host is None else host
endpoint = self.endpoint if endpoint is None else endpoint
http_type = 'https' if secure else 'http'
self.url = '%s://%s/policy/api/%s/infra/%s' % \
(http_type, host, api, endpoint)
def get_url(self):
return self.url
def __set_headers(self, content=None, accept=None, super_admin=False):
content_type = self.content_type if content is None else content
accept_type = self.accept_type if accept is None else accept
auth_cred = self.username + ":" + self.password
auth = base64.b64encode(auth_cred)
headers = {}
headers['Authorization'] = "Basic %s" % auth
headers['Content-Type'] = content_type
headers['Accept'] = accept_type
if super_admin:
headers['X-Allow-Overwrite'] = 'true'
return headers
def get(self, endpoint=None, params=None, cursor=None):
"""
Basic query method for json API request
"""
self.__set_url(endpoint=endpoint)
if cursor:
op = "&" if urlparse.urlparse(self.url).query else "?"
self.url += op + "cursor=" + cursor
response = requests.get(self.url, headers=self.headers,
verify=self.verify, params=params)
return response
def put(self, endpoint=None, body=None):
"""
Basic put API method on endpoint
"""
self.__set_url(endpoint=endpoint)
response = requests.put(self.url, headers=self.headers,
verify=self.verify, data=jsonutils.dumps(body))
return response
def ca_put_request(self, component, comp_id, body):
"""
NSX-T API Put request for certificate Management
"""
endpoint = ("/%s/%s" % (component, comp_id))
response = self.put(endpoint=endpoint, body=body)
return response
def delete(self, endpoint=None, params=None):
"""
Basic delete API method on endpoint
"""
self.__set_url(endpoint=endpoint)
response = requests.delete(self.url, headers=self.headers,
verify=self.verify, params=params)
return response
def ca_delete_request(self, component=None, comp_id=None):
"""
NSX-T API delete request for certificate Management
"""
endpoint = ("/%s/%s" % (component, comp_id))
response = self.delete(endpoint=endpoint)
return response
def delete_super_admin(self, endpoint=None, params=None):
"""
Basic delete API method for NSX super admin on endpoint
"""
self.__set_url(endpoint=endpoint)
response = requests.delete(self.url, headers=self.headers_super_admin,
verify=self.verify, params=params)
return response
def post(self, endpoint=None, body=None):
"""
Basic post API method on endpoint
"""
self.__set_url(endpoint=endpoint)
response = requests.post(self.url, headers=self.headers,
verify=self.verify,
data=jsonutils.dumps(body))
return response
def get_logical_resources(self, endpoint):
"""
Get logical resources based on the endpoint
Getting the logical resource based on the end point. Parse the response
for the cursor. If cursor is present, query url for multiple pages to
get all the logical resources.
"""
results = []
response = self.get(endpoint=endpoint)
res_json = response.json()
cursor = res_json.get("cursor")
if res_json.get("results"):
results.extend(res_json["results"])
while cursor:
page = self.get(endpoint=endpoint, cursor=cursor).json()
results.extend(page.get("results", []))
cursor = page.get("cursor")
return results
def get_os_resources(self, resources):
"""
Get all logical resources created by OpenStack
"""
os_resources = [r for r in resources if 'tags' in r
for tag in r['tags']
if 'os-project-id' in tag.values()]
return os_resources
def get_nsx_resource_by_name(self, nsx_resources, nsx_name):
"""
Get the NSX component created from OpenStack by name.
The name should be converted from os_name to nsx_name.
If found exact one match return it, otherwise report error.
"""
nsx_resource = [n for n in nsx_resources if
n['display_name'] == nsx_name]
if len(nsx_resource) == 0:
LOG.warning("Backend nsx resource %s NOT found!", nsx_name)
return None
if len(nsx_resource) > 1:
LOG.error("More than 1 nsx resources found: %s!",
nsx_resource)
return None
else:
LOG.info("Found nsgroup: %s", nsx_resource[0])
return nsx_resource[0]
def get_transport_zones(self):
"""
Retrieve all transport zones
"""
return self.get_logical_resources("/transport-zones")
def get_logical_routers(self, tier=None):
"""
Retrieve all the logical routers based on router type. If tier
is None, it will return all logical routers.
"""
if tier:
endpoint = "tier-%ss" % tier
else:
endpoint = "tier-1s"
return self.get_logical_resources(endpoint)
def get_logical_router(self, os_name, os_uuid):
"""
Get the logical router based on the os_name and os_uuid provided.
The name of the logical router shoud follow
<os_router_name>_<starting_5_uuid>...<trailing_5_uuid>
Return the logical router if found, otherwise return None.
"""
if not os_name or not os_uuid:
LOG.error("Name and uuid of OS router should be present "
"in order to query backend logical router created")
return None
nsx_name = os_name + "_" + os_uuid[:5] + "..." + os_uuid[-5:]
lrouters = self.get_logical_routers()
return self.get_nsx_resource_by_name(lrouters, nsx_name)
def get_ns_groups(self, tenant_id):
"""
Retrieve all NSGroups on NSX backend
"""
return self.get_logical_resources("domains/%s/groups" % tenant_id)
def get_firewall_sections(self, tenant_id=None):
"""
Retrieve all firewall sections
"""
return self.get_logical_resources("domains/%s/security-policies" %
tenant_id)
def get_firewall_section(self, os_name, os_uuid, os_tenant_id=None):
"""
Get the firewall section by os_name and os_uuid
"""
if not os_name or not os_uuid:
LOG.error("Name and uuid of OS security group should be "
"present in order to query backend FW section "
"created")
return None
nsx_name = os_name + "_" + os_uuid[:5] + "..." + os_uuid[-5:]
nsx_firewall_time_counter = 0
nsx_dfw_section = None
# wait till timeout or till dfw section
while nsx_firewall_time_counter < \
constants.NSX_FIREWALL_REALIZED_TIMEOUT and \
not nsx_dfw_section:
nsx_firewall_time_counter += 1
fw_sections = self.get_firewall_sections(tenant_id=os_tenant_id)
nsx_dfw_section = self.get_nsx_resource_by_name(fw_sections,
nsx_name)
time.sleep(constants.ONE_SEC)
return nsx_dfw_section
def get_firewall_section_rules(self, fw_section, tenant_id=None):
"""
Retrieve all fw rules for a given fw section
"""
endpoint = "domains/%s/security-policies/%s/rules" % \
(tenant_id, fw_section['id'])
return self.get_logical_resources(endpoint)
def get_firewall_section_rule(self, fw_section, os_uuid,
os_tenant_id=None):
"""
Get the firewall section rule based on the name
"""
fw_rules = self.get_firewall_section_rules(fw_section, os_tenant_id)
nsx_name = os_uuid
return self.get_nsx_resource_by_name(fw_rules, nsx_name)
def get_ns_group(self, os_name, os_uuid, os_tenant_id=None):
"""
Get the NSGroup based on the name provided.
The name of the nsgroup should follow
<os_sg_name> - <os_sg_uuid>
Return nsgroup if found, otherwise return None
"""
if not os_name or not os_uuid:
LOG.error("Name and uuid of OS security group should be "
"present in order to query backend nsgroup created")
return None
nsx_name = os_name + "_" + os_uuid[:5] + "..." + os_uuid[-5:]
nsgroups = self.get_ns_groups(tenant_id=os_tenant_id)
return self.get_nsx_resource_by_name(nsgroups, nsx_name)

View File

@ -353,7 +353,7 @@ class NSXV3Client(object):
"""
return self.get_logical_resources("/firewall/sections")
def get_firewall_section(self, os_name, os_uuid):
def get_firewall_section(self, os_name, os_uuid, nsxp=False):
"""
Get the firewall section by os_name and os_uuid
"""
@ -362,7 +362,10 @@ class NSXV3Client(object):
"present in order to query backend FW section "
"created")
return None
nsx_name = os_name + " - " + os_uuid
if nsxp:
nsx_name = os_name + "_" + os_uuid[:5] + "..." + os_uuid[-5:]
else:
nsx_name = os_name + " - " + os_uuid
nsx_firewall_time_counter = 0
nsx_dfw_section = None
# wait till timeout or till dfw section
@ -416,7 +419,7 @@ class NSXV3Client(object):
res_json = response.json()
return res_json
def get_ns_group(self, os_name, os_uuid):
def get_ns_group(self, os_name, os_uuid, nsxp=False, os_tenant_id=None):
"""
Get the NSGroup based on the name provided.
The name of the nsgroup should follow
@ -427,7 +430,11 @@ class NSXV3Client(object):
LOG.error("Name and uuid of OS security group should be "
"present in order to query backend nsgroup created")
return None
nsx_name = os_name + " - " + os_uuid
if nsxp:
nsx_name = os_name + "_" + os_uuid[:5] + "..." + os_uuid[-5:]
nsx_name = os_tenant_id + '.' + nsx_name
else:
nsx_name = os_name + " - " + os_uuid
nsgroups = self.get_ns_groups()
return self.get_nsx_resource_by_name(nsgroups, nsx_name)

View File

@ -20,6 +20,7 @@ from tempest.lib import decorators
from tempest import test
from vmware_nsx_tempest_plugin.common import constants
from vmware_nsx_tempest_plugin.services import nsxp_client
from vmware_nsx_tempest_plugin.services import nsxv3_client
CONF = config.CONF
@ -49,6 +50,9 @@ class NSXv3RoutersTest(base.BaseAdminNetworkTest):
cls.nsx = nsxv3_client.NSXV3Client(CONF.nsxv3.nsx_manager,
CONF.nsxv3.nsx_user,
CONF.nsxv3.nsx_password)
cls.nsxp = nsxp_client.NSXPClient(CONF.nsxv3.nsx_manager,
CONF.nsxv3.nsx_user,
CONF.nsxv3.nsx_password)
@decorators.attr(type='nsxv3')
@decorators.idempotent_id('0e9938bc-d2a3-4a9a-a4f9-7a93ee8bb344')
@ -59,6 +63,9 @@ class NSXv3RoutersTest(base.BaseAdminNetworkTest):
self.addCleanup(self._delete_router, router['id'])
if CONF.network.backend == 'nsxp':
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
nsx_router = self.nsxp.get_logical_router(router['name'],
router['id'])
self.assertIsNotNone(nsx_router)
nsx_router = self.nsx.get_logical_router(router['name'],
router['id'])
self.assertEqual(router['name'], router_name)
@ -68,10 +75,13 @@ class NSXv3RoutersTest(base.BaseAdminNetworkTest):
# neutron and nsx backend
updated_name = 'updated ' + router_name
update_body = self.routers_client.update_router(router['id'],
name=updated_name)
name=updated_name)
updated_router = update_body['router']
if CONF.network.backend == 'nsxp':
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
nsx_router = self.nsxp.get_logical_router(updated_router['name'],
updated_router['id'])
self.assertIsNotNone(nsx_router)
nsx_router = self.nsx.get_logical_router(updated_router['name'],
updated_router['id'])
self.assertEqual(updated_router['name'], updated_name)
@ -85,6 +95,9 @@ class NSXv3RoutersTest(base.BaseAdminNetworkTest):
router = self.create_router(router_name, admin_state_up=True)
if CONF.network.backend == 'nsxp':
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
nsx_router = self.nsxp.get_logical_router(router['name'],
router['id'])
self.assertIsNotNone(nsx_router)
nsx_router = self.nsx.get_logical_router(router['name'],
router['id'])
self.assertEqual(router['name'], router_name)
@ -93,6 +106,9 @@ class NSXv3RoutersTest(base.BaseAdminNetworkTest):
self.routers_client.delete_router(router['id'])
if CONF.network.backend == 'nsxp':
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
nsx_router = self.nsxp.get_logical_router(router['name'],
router['id'])
self.assertIsNone(nsx_router)
nsx_router = self.nsx.get_logical_router(router['name'],
router['id'])
self.assertIsNone(nsx_router)

View File

@ -25,6 +25,7 @@ from tempest.lib import decorators
from tempest import test
from vmware_nsx_tempest_plugin.common import constants
from vmware_nsx_tempest_plugin.services import nsxp_client
from vmware_nsx_tempest_plugin.services import nsxv3_client
LOG = logging.getLogger(__name__)
@ -48,6 +49,9 @@ class NSXv3SecGroupTest(base.BaseSecGroupTest):
cls.nsx = nsxv3_client.NSXV3Client(CONF.nsxv3.nsx_manager,
CONF.nsxv3.nsx_user,
CONF.nsxv3.nsx_password)
cls.nsxp = nsxp_client.NSXPClient(CONF.nsxv3.nsx_manager,
CONF.nsxv3.nsx_user,
CONF.nsxv3.nsx_password)
def _create_verify_security_group_rule(self, sg_id, direction,
ethertype, protocol,
@ -84,7 +88,8 @@ class NSXv3SecGroupTest(base.BaseSecGroupTest):
"rule does not match with %s." %
(key, value))
def _wait_till_firewall_gets_realize(self, secgroup, dfw_error_msg=""):
def _wait_till_firewall_gets_realize(self, secgroup,
dfw_error_msg="", tenant_id=None):
nsx_firewall_time_counter = 0
nsx_dfw_section = None
# wait till timeout or till dfw section
@ -92,14 +97,34 @@ class NSXv3SecGroupTest(base.BaseSecGroupTest):
constants.NSX_FIREWALL_REALIZED_TIMEOUT and \
not nsx_dfw_section:
nsx_firewall_time_counter += 1
nsx_nsgroup = self.nsx.get_ns_group(secgroup['name'],
secgroup['id'])
nsx_dfw_section = self.nsx.get_firewall_section(secgroup['name'],
secgroup['id'])
if CONF.network.backend == 'nsxp':
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
nsx_nsgroup_policy = self.nsxp.get_ns_group(
secgroup['name'], secgroup['id'],
os_tenant_id=tenant_id)
self.assertIsNotNone(nsx_nsgroup_policy)
nsx_dfw_section_policy = self.nsxp.get_firewall_section(
secgroup['name'], secgroup['id'],
os_tenant_id=tenant_id)
self.assertIsNotNone(nsx_dfw_section_policy, dfw_error_msg)
nsx_nsgroup = self.nsx.get_ns_group(
secgroup['name'], secgroup['id'], nsxp=True,
os_tenant_id=tenant_id)
nsx_dfw_section = self.nsx.get_firewall_section(
secgroup['name'], secgroup['id'], nsxp=True)
else:
nsx_nsgroup = self.nsx.get_ns_group(secgroup['name'],
secgroup['id'])
nsx_dfw_section = self.nsx.get_firewall_section(
secgroup['name'], secgroup['id'])
time.sleep(constants.ONE_SEC)
self.assertIsNotNone(nsx_nsgroup)
self.assertIsNotNone(nsx_dfw_section, dfw_error_msg)
return nsx_nsgroup, nsx_dfw_section
if CONF.network.backend == 'nsxp':
return nsx_nsgroup_policy, nsx_dfw_section_policy,\
nsx_nsgroup, nsx_dfw_section
else:
return nsx_nsgroup, nsx_dfw_section
@decorators.attr(type='nsxv3')
@decorators.idempotent_id('105ca2c6-a14e-448b-b227-a7366e611bf2')
@ -108,8 +133,15 @@ class NSXv3SecGroupTest(base.BaseSecGroupTest):
group_create_body, name = self._create_security_group()
secgroup = group_create_body['security_group']
dfw_error_msg = "Firewall section not found for %s!" % secgroup['name']
nsx_nsgroup, nsx_dfw_section = \
self._wait_till_firewall_gets_realize(secgroup, dfw_error_msg)
if CONF.network.backend == 'nsxp':
nsx_nsgroup_policy, nsx_dfw_section_policy,\
nsx_nsgroup, nsx_dfw_section = \
self._wait_till_firewall_gets_realize(secgroup,
dfw_error_msg,
secgroup['tenant_id'])
else:
nsx_nsgroup, nsx_dfw_section = \
self._wait_till_firewall_gets_realize(secgroup, dfw_error_msg)
client = self.security_group_rules_client
rule_create_body = client.create_security_group_rule(
security_group_id=secgroup['id'],
@ -121,12 +153,24 @@ class NSXv3SecGroupTest(base.BaseSecGroupTest):
remote_ip_prefix='0.0.0.0/0',
)
secgroup_rule = rule_create_body['security_group_rule']
if CONF.network.backend == 'nsxp':
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
nsx_dfw_rule = self.nsxp.get_firewall_section_rule(
nsx_dfw_section_policy,
secgroup_rule['id'], secgroup_rule['tenant_id'])
self.assertIsNotNone(nsx_dfw_rule)
nsx_dfw_rule = self.nsx.get_firewall_section_rule(
nsx_dfw_section,
secgroup_rule['id'])
self.assertIsNotNone(nsx_dfw_rule)
# Delete the security group rule
client.delete_security_group_rule(secgroup_rule['id'])
if CONF.network.backend == 'nsxp':
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
nsx_dfw_rule = self.nsxp.get_firewall_section_rule(
nsx_dfw_section_policy,
secgroup_rule['id'], secgroup_rule['tenant_id'])
self.assertIsNone(nsx_dfw_rule)
nsx_dfw_rule = self.nsx.get_firewall_section_rule(
nsx_dfw_section,
secgroup_rule['id'])
@ -139,7 +183,15 @@ class NSXv3SecGroupTest(base.BaseSecGroupTest):
group_create_body, name = self._create_security_group()
secgroup = group_create_body['security_group']
dfw_error_msg = "Firewall section not found for %s!" % secgroup['name']
self._wait_till_firewall_gets_realize(secgroup, dfw_error_msg)
if CONF.network.backend == 'nsxp':
nsx_nsgroup_policy, nsx_dfw_section_policy,\
nsx_nsgroup, nsx_dfw_section = \
self._wait_till_firewall_gets_realize(
secgroup, dfw_error_msg,
secgroup['tenant_id'])
else:
nsx_nsgroup, nsx_dfw_section = \
self._wait_till_firewall_gets_realize(secgroup, dfw_error_msg)
# List security groups and verify if created group is there in response
list_body = self.security_groups_client.list_security_groups()
secgroup_list = list()
@ -157,7 +209,13 @@ class NSXv3SecGroupTest(base.BaseSecGroupTest):
self.assertEqual(updated_secgroup['description'], new_description)
dfw_error_msg = "Firewall section is not updated for %s!" % \
updated_secgroup['name']
self._wait_till_firewall_gets_realize(updated_secgroup, dfw_error_msg)
if CONF.network.backend == 'nsxp':
self._wait_till_firewall_gets_realize(
updated_secgroup, dfw_error_msg,
updated_secgroup['tenant_id'])
else:
self._wait_till_firewall_gets_realize(updated_secgroup,
dfw_error_msg)
@decorators.attr(type='nsxv3')
@decorators.idempotent_id('e637cc59-c5e6-49b5-a539-e517e780656e')
@ -168,13 +226,35 @@ class NSXv3SecGroupTest(base.BaseSecGroupTest):
name=name)
secgroup = create_body['security_group']
dfw_error_msg = "Firewall section not found for %s!" % secgroup['name']
self._wait_till_firewall_gets_realize(secgroup, dfw_error_msg)
if CONF.network.backend == 'nsxp':
self._wait_till_firewall_gets_realize(secgroup,
dfw_error_msg,
secgroup['tenant_id'])
else:
self._wait_till_firewall_gets_realize(secgroup, dfw_error_msg)
self.assertEqual(secgroup['name'], name)
# Delete the security group
self._delete_security_group(secgroup['id'])
nsx_nsgroup = self.nsx.get_ns_group(secgroup['name'],
secgroup['id'])
nsx_dfw_section = self.nsx.get_firewall_section(name, secgroup['id'])
if CONF.network.backend == 'nsxp':
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
nsx_nsgroup_policy = self.nsxp.get_ns_group(
secgroup['name'], secgroup['id'],
os_tenant_id=secgroup['tenant_id'])
self.assertIsNone(nsx_nsgroup_policy)
nsx_dfw_section_policy = self.nsxp.get_firewall_section(
secgroup['name'], secgroup['id'],
os_tenant_id=secgroup['tenant_id'])
self.assertIsNone(nsx_dfw_section_policy, dfw_error_msg)
nsx_nsgroup = self.nsx.get_ns_group(
secgroup['name'], secgroup['id'], nsxp=True,
os_tenant_id=secgroup['tenant_id'])
nsx_dfw_section = self.nsx.get_firewall_section(
secgroup['name'], secgroup['id'], nsxp=True)
else:
nsx_nsgroup = self.nsx.get_ns_group(secgroup['name'],
secgroup['id'])
nsx_dfw_section = self.nsx.get_firewall_section(name,
secgroup['id'])
self.assertIsNone(nsx_nsgroup)
self.assertIsNone(nsx_dfw_section)
@ -185,8 +265,16 @@ class NSXv3SecGroupTest(base.BaseSecGroupTest):
create_body, _ = self._create_security_group()
secgroup = create_body['security_group']
dfw_error_msg = "Firewall section not found for %s!" % secgroup['name']
nsx_nsgroup, nsx_dfw_section = self._wait_till_firewall_gets_realize(
secgroup, dfw_error_msg)
if CONF.network.backend == 'nsxp':
nsx_nsgroup_policy, nsx_dfw_section_policy,\
nsx_nsgroup, nsx_dfw_section = \
self._wait_till_firewall_gets_realize(
secgroup, dfw_error_msg,
secgroup['tenant_id'])
else:
nsx_nsgroup, nsx_dfw_section = \
self._wait_till_firewall_gets_realize(secgroup,
dfw_error_msg)
# Create rules for each protocol
protocols = ['tcp', 'udp', 'icmp']
client = self.security_group_rules_client
@ -215,6 +303,12 @@ class NSXv3SecGroupTest(base.BaseSecGroupTest):
for rule in rule_list_body['security_group_rules']]
self.assertIn(rule_create_body['security_group_rule']['id'],
rule_list)
if CONF.network.backend == 'nsxp':
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
nsx_dfw_rule = self.nsxp.get_firewall_section_rule(
nsx_dfw_section_policy,
secgroup_rule['id'], secgroup_rule['tenant_id'])
self.assertIsNotNone(nsx_dfw_rule)
nsx_dfw_rule = self.nsx.get_firewall_section_rule(
nsx_dfw_section,
secgroup_rule['id'])
@ -243,8 +337,16 @@ class NSXv3SecGroupTest(base.BaseSecGroupTest):
create_body, _ = self._create_security_group()
secgroup = create_body['security_group']
dfw_error_msg = "Firewall section not found for %s!" % secgroup['name']
nsx_nsgroup, nsx_dfw_section = self._wait_till_firewall_gets_realize(
secgroup, dfw_error_msg)
if CONF.network.backend == 'nsxp':
nsx_nsgroup_policy, nsx_dfw_section_policy,\
nsx_nsgroup, nsx_dfw_section = \
self._wait_till_firewall_gets_realize(secgroup,
dfw_error_msg,
secgroup['tenant_id'])
else:
nsx_nsgroup, nsx_dfw_section = \
self._wait_till_firewall_gets_realize(secgroup,
dfw_error_msg)
# Create a security group rule
client = self.security_group_rules_client
rule_create_body = client.create_security_group_rule(
@ -256,12 +358,24 @@ class NSXv3SecGroupTest(base.BaseSecGroupTest):
ethertype=self.ethertype
)
secgroup_rule = rule_create_body['security_group_rule']
if CONF.network.backend == 'nsxp':
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
nsx_dfw_rule = self.nsxp.get_firewall_section_rule(
nsx_dfw_section_policy,
secgroup_rule['id'], secgroup_rule['tenant_id'])
self.assertIsNotNone(nsx_dfw_rule)
nsx_dfw_rule = self.nsx.get_firewall_section_rule(
nsx_dfw_section,
secgroup_rule['id'])
self.assertIsNotNone(nsx_dfw_rule)
# Delete the security group rule
client.delete_security_group_rule(secgroup_rule['id'])
if CONF.network.backend == 'nsxp':
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
nsx_dfw_rule = self.nsxp.get_firewall_section_rule(
nsx_dfw_section_policy,
secgroup_rule['id'], secgroup_rule['tenant_id'])
self.assertIsNone(nsx_dfw_rule)
nsx_dfw_rule = self.nsx.get_firewall_section_rule(
nsx_dfw_section,
secgroup_rule['id'])