New basic API tests for the default SG rules templates CRUDs

This patch adds some basic API tests for the new API for default SG
rules templates. Those new tests are checking if by default SG rules are
set in the same way as legacy rules which were there since "forever".
Second test checks basic lifecycle of the SG rule template.

Depends-On: https://review.opendev.org/c/openstack/neutron/+/883246/

Related-Bug: #1983053
Change-Id: I458f54ff6b73e277fe9506e90fa6af44d9c51101
This commit is contained in:
Slawek Kaplonski 2023-05-18 18:59:26 +02:00
parent 97409c6992
commit aa22c9e1bb
4 changed files with 314 additions and 7 deletions

View File

@ -0,0 +1,260 @@
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
import random
from neutron_lib import constants
from tempest.lib import decorators
from tempest.lib import exceptions as lib_exc
from neutron_tempest_plugin.api import base
RULE_KEYWORDS_TO_CHECK = [
'direction', 'remote_group_id', 'remote_address_group_id', 'description',
'protocol', 'port_range_min', 'port_range_max', 'ethertype',
'remote_ip_prefix', 'used_in_default_sg', 'used_in_non_default_sg'
]
class DefaultSecurityGroupRuleTest(base.BaseNetworkTest):
required_extensions = ['security-groups-default-rules']
credentials = ['primary', 'admin']
@classmethod
def setup_clients(cls):
super(DefaultSecurityGroupRuleTest, cls).setup_clients()
cls.admin_client = cls.os_admin.network_client
def _filter_not_relevant_rule_keys(self, rule):
new_rule = {}
rule_keys = list(rule.keys())
for k in rule_keys:
if k in RULE_KEYWORDS_TO_CHECK:
new_rule[k] = rule[k]
return new_rule
def _filter_not_relevant_rules_keys(self, rules):
return [self._filter_not_relevant_rule_keys(r) for r in rules]
def _assert_rules_exists(self, expected_rules, actual_rules):
actual_rules = self._filter_not_relevant_rules_keys(actual_rules)
for expected_rule in expected_rules:
self.assertIn(expected_rule, actual_rules)
@decorators.idempotent_id('2f3d3070-e9fa-4127-a33f-f1532fd89108')
def test_legacy_default_sg_rules_created_by_default(self):
expected_legacy_template_rules = [
{
'direction': 'egress',
'ethertype': 'IPv4',
'remote_group_id': None,
'protocol': None,
'remote_ip_prefix': None,
'remote_address_group_id': None,
'port_range_max': None,
'port_range_min': None,
'used_in_default_sg': True,
'used_in_non_default_sg': True,
'description': 'Legacy default SG rule for egress traffic'
}, {
'remote_group_id': 'PARENT',
'direction': 'ingress',
'ethertype': 'IPv6',
'protocol': None,
'remote_ip_prefix': None,
'remote_address_group_id': None,
'port_range_max': None,
'port_range_min': None,
'used_in_default_sg': True,
'used_in_non_default_sg': False,
'description': 'Legacy default SG rule for ingress traffic'
}, {
'remote_group_id': 'PARENT',
'direction': 'ingress',
'ethertype': 'IPv4',
'protocol': None,
'remote_ip_prefix': None,
'remote_address_group_id': None,
'port_range_max': None,
'port_range_min': None,
'used_in_default_sg': True,
'used_in_non_default_sg': False,
'description': 'Legacy default SG rule for ingress traffic'
}, {
'direction': 'egress',
'ethertype': 'IPv6',
'remote_group_id': None,
'protocol': None,
'remote_ip_prefix': None,
'remote_address_group_id': None,
'port_range_max': None,
'port_range_min': None,
'used_in_default_sg': True,
'used_in_non_default_sg': True,
'description': 'Legacy default SG rule for egress traffic'
}
]
sg_rules_template = (
self.admin_client.list_default_security_group_rules()[
'default_security_group_rules'
])
self._assert_rules_exists(expected_legacy_template_rules,
sg_rules_template)
@decorators.idempotent_id('df98f969-ff2d-4597-9765-f5d4f81f775f')
def test_default_security_group_rule_lifecycle(self):
tcp_port = random.randint(constants.PORT_RANGE_MIN,
constants.PORT_RANGE_MAX)
rule_args = {
'direction': 'ingress',
'ethertype': 'IPv4',
'protocol': 'tcp',
'port_range_max': tcp_port,
'port_range_min': tcp_port,
'used_in_default_sg': False,
'used_in_non_default_sg': True,
'description': (
'Allow tcp connections over IPv4 on port %s' % tcp_port)
}
expected_rule = {
'remote_group_id': None,
'direction': 'ingress',
'ethertype': 'IPv4',
'protocol': 'tcp',
'port_range_min': tcp_port,
'port_range_max': tcp_port,
'remote_ip_prefix': None,
'remote_address_group_id': None,
'used_in_default_sg': False,
'used_in_non_default_sg': True,
'description': (
'Allow tcp connections over IPv4 on port %s' % tcp_port)
}
created_rule_template = self.create_default_security_group_rule(
**rule_args)
self.assertDictEqual(
expected_rule,
self._filter_not_relevant_rule_keys(created_rule_template)
)
observed_rule_template = (
self.admin_client.get_default_security_group_rule(
created_rule_template['id'])
)['default_security_group_rule']
self.assertDictEqual(
expected_rule,
self._filter_not_relevant_rule_keys(observed_rule_template)
)
self.admin_client.delete_default_security_group_rule(
created_rule_template['id']
)
self.assertRaises(
lib_exc.NotFound,
self.admin_client.get_default_security_group_rule,
created_rule_template['id']
)
@decorators.idempotent_id('6c5a2f41-5899-47f4-9daf-4f8ddbbd3ad5')
def test_create_duplicate_default_security_group_rule_different_templates(
self):
tcp_port = random.randint(constants.PORT_RANGE_MIN,
constants.PORT_RANGE_MAX)
rule_args = {
'direction': 'ingress',
'ethertype': 'IPv4',
'protocol': 'tcp',
'port_range_max': tcp_port,
'port_range_min': tcp_port,
'used_in_default_sg': True,
'used_in_non_default_sg': True}
self.create_default_security_group_rule(**rule_args)
# Now, even if 'used_in_non_default_sg' will be different error should
# be returned as 'used_in_default_sg' is the same
new_rule_args = copy.copy(rule_args)
new_rule_args['used_in_non_default_sg'] = False
self.assertRaises(
lib_exc.Conflict,
self.admin_client.create_default_security_group_rule,
**new_rule_args)
# Same in the opposite way: even if 'used_in_default_sg' will be
# different error should be returned as 'used_in_non_default_sg'
# is the same
new_rule_args = copy.copy(rule_args)
new_rule_args['used_in_default_sg'] = False
self.assertRaises(
lib_exc.Conflict,
self.admin_client.create_default_security_group_rule,
**new_rule_args)
@decorators.idempotent_id('e4696607-1a13-48eb-8912-ee1e742d9471')
def test_create_same_default_security_group_rule_for_different_templates(
self):
tcp_port = random.randint(constants.PORT_RANGE_MIN,
constants.PORT_RANGE_MAX)
expected_rules = [{
'remote_group_id': None,
'direction': 'ingress',
'ethertype': 'IPv4',
'protocol': 'tcp',
'remote_ip_prefix': None,
'remote_address_group_id': None,
'port_range_max': tcp_port,
'port_range_min': tcp_port,
'used_in_default_sg': True,
'used_in_non_default_sg': False,
'description': ''
}, {
'remote_group_id': None,
'direction': 'ingress',
'ethertype': 'IPv4',
'protocol': 'tcp',
'remote_ip_prefix': None,
'remote_address_group_id': None,
'port_range_max': tcp_port,
'port_range_min': tcp_port,
'used_in_default_sg': False,
'used_in_non_default_sg': True,
'description': ''
}]
default_sg_rule_args = {
'direction': 'ingress',
'ethertype': 'IPv4',
'protocol': 'tcp',
'port_range_max': tcp_port,
'port_range_min': tcp_port,
'used_in_default_sg': True,
'used_in_non_default_sg': False}
self.create_default_security_group_rule(**default_sg_rule_args)
custom_sg_rule_args = {
'direction': 'ingress',
'ethertype': 'IPv4',
'protocol': 'tcp',
'port_range_max': tcp_port,
'port_range_min': tcp_port,
'used_in_default_sg': False,
'used_in_non_default_sg': True}
self.create_default_security_group_rule(**custom_sg_rule_args)
sg_rules_template = (
self.admin_client.list_default_security_group_rules()[
'default_security_group_rules'
])
self._assert_rules_exists(expected_rules,
sg_rules_template)

View File

@ -135,6 +135,7 @@ class BaseNetworkTest(test.BaseTestCase):
cls.admin_subnetpools = []
cls.security_groups = []
cls.admin_security_groups = []
cls.sg_rule_templates = []
cls.projects = []
cls.log_objects = []
cls.reserved_subnet_cidrs = set()
@ -243,6 +244,12 @@ class BaseNetworkTest(test.BaseTestCase):
security_group,
client=cls.admin_client)
# Clean up security group rule templates
for sg_rule_template in cls.sg_rule_templates:
cls._try_delete_resource(
cls.admin_client.delete_default_security_group_rule,
sg_rule_template['id'])
for subnetpool in cls.subnetpools:
cls._try_delete_resource(cls.client.delete_subnetpool,
subnetpool['id'])
@ -970,6 +977,15 @@ class BaseNetworkTest(test.BaseTestCase):
client = client or security_group.get('client') or cls.client
client.delete_security_group(security_group['id'])
@classmethod
def get_security_group(cls, name='default', client=None):
client = client or cls.client
security_groups = client.list_security_groups()['security_groups']
for security_group in security_groups:
if security_group['name'] == name:
return security_group
raise ValueError("No such security group named {!r}".format(name))
@classmethod
def create_security_group_rule(cls, security_group=None, project=None,
client=None, ip_version=None, **kwargs):
@ -1006,13 +1022,11 @@ class BaseNetworkTest(test.BaseTestCase):
'security_group_rule']
@classmethod
def get_security_group(cls, name='default', client=None):
client = client or cls.client
security_groups = client.list_security_groups()['security_groups']
for security_group in security_groups:
if security_group['name'] == name:
return security_group
raise ValueError("No such security group named {!r}".format(name))
def create_default_security_group_rule(cls, **kwargs):
body = cls.admin_client.create_default_security_group_rule(**kwargs)
default_sg_rule = body['default_security_group_rule']
cls.sg_rule_templates.append(default_sg_rule)
return default_sg_rule
@classmethod
def create_keypair(cls, client=None, name=None, **kwargs):

View File

@ -846,6 +846,38 @@ class NetworkClientJSON(service_client.RestClient):
self.expected_success(204, resp.status)
return service_client.ResponseBody(resp, body)
def list_default_security_group_rules(self, **kwargs):
uri = '%s/default-security-group-rules' % self.uri_prefix
if kwargs:
uri += '?' + urlparse.urlencode(kwargs, doseq=1)
resp, body = self.get(uri)
self.expected_success(200, resp.status)
body = jsonutils.loads(body)
return service_client.ResponseBody(resp, body)
def get_default_security_group_rule(self, rule_id):
uri = '%s/default-security-group-rules/%s' % (self.uri_prefix,
rule_id)
get_resp, get_resp_body = self.get(uri)
self.expected_success(200, get_resp.status)
body = jsonutils.loads(get_resp_body)
return service_client.ResponseBody(get_resp, body)
def create_default_security_group_rule(self, **kwargs):
post_body = {'default_security_group_rule': kwargs}
body = jsonutils.dumps(post_body)
uri = '%s/default-security-group-rules' % self.uri_prefix
resp, body = self.post(uri, body)
self.expected_success(201, resp.status)
body = jsonutils.loads(body)
return service_client.ResponseBody(resp, body)
def delete_default_security_group_rule(self, rule_id):
uri = '%s/default-security-group-rules/%s' % (self.uri_prefix, rule_id)
resp, body = self.delete(uri)
self.expected_success(204, resp.status)
return service_client.ResponseBody(resp, body)
def list_ports(self, **kwargs):
uri = '%s/ports' % self.uri_prefix
if kwargs:

View File

@ -114,6 +114,7 @@
- router
- router_availability_zone
- security-group
- security-groups-default-rules
- security-groups-remote-address-group
- segment
- service-type