Add special treatment for 'any' in SG rule API

The openstack client changes the protocol to None in
the case that 'any' is given as an argument when creating
a security group rule. But using 'any' in a POST call
will return an error saying it is invalid.

Add special treatment for 'any' as a protocol value in
the API by treating it the same as None, but do not
use the 'any' string when creating the DB entry, it is
only treated as an alias.

Closes-bug: #2074056
Change-Id: Ic88ae2c249eb2cd1af1ebbf6707c707f51a52638
This commit is contained in:
Brian Haley 2024-08-17 19:37:36 -04:00
parent f17cc24e8a
commit 81375f0b2b
6 changed files with 37 additions and 4 deletions

View File

@ -45,6 +45,13 @@ IPTABLES_MULTIPORT_ONLY_PROTOCOLS = [
IPV6_ICMP_LEGACY_PROTO_LIST = [constants.PROTO_NAME_ICMP,
constants.PROTO_NAME_IPV6_ICMP_LEGACY]
# Protocol 'any', which is treated the same way as 'no protocol' in the API.
# The openstack client changes 'any' to None for the API call, but we want
# to accept both as being the same, for example, if it was passed directly
# via the curl command.
PROTO_NAME_ANY = 'any'
SG_RULE_PROTO_ANY = (None, PROTO_NAME_ANY)
# Number of resources for neutron agent side functions to deal
# with large sets.
# Setting this value does not count on special conditions, it is just a human

View File

@ -686,7 +686,7 @@ class SecurityGroupDbMixin(
rule_obj, fields=fields)
def _get_ip_proto_number(self, protocol):
if protocol is None:
if protocol in const.SG_RULE_PROTO_ANY:
return
# According to bug 1381379, protocol is always set to string to avoid
# problems with comparing int and string in PostgreSQL. Here this
@ -697,7 +697,7 @@ class SecurityGroupDbMixin(
return int(constants.IP_PROTOCOL_MAP.get(protocol, protocol))
def _get_ip_proto_name_and_num(self, protocol, ethertype=None):
if protocol is None:
if protocol in const.SG_RULE_PROTO_ANY:
return
protocol = str(protocol)
# Force all legacy IPv6 ICMP protocol names to be 'ipv6-icmp', and

View File

@ -28,6 +28,7 @@ from oslo_utils import netutils
from neutron._i18n import _
from neutron.api import extensions
from neutron.api.v2 import base
from neutron.common import _constants
from neutron.conf import quota
from neutron.extensions import standardattrdescription as stdattr_ext
from neutron.quota import resource_registry
@ -148,7 +149,7 @@ class SecurityGroupRuleInvalidEtherType(exceptions.InvalidInput):
def convert_protocol(value):
if value is None:
if value in _constants.SG_RULE_PROTO_ANY:
return
try:
val = int(value)
@ -208,7 +209,8 @@ def _validate_name_not_default(data, max_len=db_const.NAME_FIELD_SIZE):
validators.add_validator('name_not_default', _validate_name_not_default)
sg_supported_protocols = ([None] + list(const.IP_PROTOCOL_MAP.keys()))
sg_supported_protocols = (_constants.SG_RULE_PROTO_ANY +
tuple(const.IP_PROTOCOL_MAP.keys()))
sg_supported_ethertypes = ['IPv4', 'IPv6']
SECURITYGROUPS = 'security_groups'
SECURITYGROUPRULES = 'security_group_rules'

View File

@ -24,6 +24,7 @@ from neutron_lib.objects import exceptions as obj_exc
import sqlalchemy
import testtools
from neutron.common import _constants as const
from neutron.db import securitygroups_db
from neutron.extensions import security_groups_default_rules as \
ext_sg_default_rules
@ -499,10 +500,13 @@ class SecurityGroupDbMixinTestCase(testlib_api.SqlTestCase):
def test_get_ip_proto_name_and_num(self):
protocols = [constants.PROTO_NAME_UDP, str(constants.PROTO_NUM_TCP),
constants.PROTO_NAME_IP, None, const.PROTO_NAME_ANY,
'blah', '111']
protocol_names_nums = (
[[constants.PROTO_NAME_UDP, str(constants.PROTO_NUM_UDP)],
[constants.PROTO_NAME_TCP, str(constants.PROTO_NUM_TCP)],
[constants.PROTO_NAME_IP, str(constants.PROTO_NUM_IP)],
None, None,
['blah', 'blah'], ['111', '111']])
for i, protocol in enumerate(protocols):

View File

@ -703,6 +703,18 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
self.deserialize(self.fmt, res)
self.assertEqual(webob.exc.HTTPCreated.code, res.status_int)
def test_create_security_group_rule_protocol_as_any(self):
name = 'webservers'
description = 'my webservers'
with self.security_group(name, description) as sg:
security_group_id = sg['security_group']['id']
protocol = 'any'
rule = self._build_security_group_rule(
security_group_id, 'ingress', protocol)
res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
self.assertEqual(webob.exc.HTTPCreated.code, res.status_int)
def test_create_security_group_rule_protocol_as_number_with_port_bad(self):
# When specifying ports, neither can be None
name = 'webservers'

View File

@ -0,0 +1,8 @@
---
fixes:
- |
Add special treatment for the keyword ``any`` in the security group rule
API protocol field to match what is documented in the api-ref itself.
It is already supported in the client, where ``any`` is simply changed
to ``None``, so do the same in the API itself. For more information,
see bug `2074056 <https://bugs.launchpad.net/neutron/+bug/2074056>`_.