Add validators/converters needed by neutron-fwaas

This change adds additional validators and converters needed by the
neutron-fwaas API definition[1].

[1] https://review.openstack.org/389388

Co-Authored-By: ZhaoBo <zhaobo6@huawei.com>

Change-Id: If49391647dfa0cb1eeca4c80abdc7397eb639675
This commit is contained in:
Nate Johnston 2016-12-09 21:57:03 +00:00
parent 701f3f61fc
commit 100bc4e294
6 changed files with 211 additions and 3 deletions

View File

@ -15,6 +15,7 @@ from oslo_utils import strutils
import six import six
from neutron_lib._i18n import _ from neutron_lib._i18n import _
from neutron_lib.api import validators
from neutron_lib import constants from neutron_lib import constants
from neutron_lib import exceptions as n_exc from neutron_lib import exceptions as n_exc
@ -182,3 +183,53 @@ def convert_ip_to_canonical_format(value):
except netaddr.core.AddrFormatError: except netaddr.core.AddrFormatError:
pass pass
return value return value
def convert_string_to_case_insensitive(data):
"""Convert a string value into a lower case string.
This effectively makes the string case-insensitive.
:param data: The value to convert.
:return: The lower-cased string representation of the value, or None is
'data' is None.
:raises InvalidInput: If the value is not a string.
"""
try:
return data.lower()
except AttributeError:
error_message = _("Input value %s must be string type") % data
raise n_exc.InvalidInput(error_message=error_message)
def convert_to_protocol(data):
"""Validate that a specified IP protocol is valid.
For the authoritative list mapping protocol names to numbers, see the IANA:
http://www.iana.org/assignments/protocol-numbers/protocol-numbers.xhtml
:param data: The value to verify is an IP protocol.
:returns: If data is an int between 0 and 255 or None, return that; if
data is a string then return it lower-cased if it matches one of the
allowed protocol names.
:raises exceptions.InvalidInput: If data is an int < 0, an
int > 255, or a string that does not match one of the allowed protocol
names.
"""
if data is None:
return
val = convert_string_to_case_insensitive(data)
if val in constants.IPTABLES_PROTOCOL_MAP:
return data
error_message = _("IP protocol '%s' is not supported. Only protocol "
"names and their integer representation (0 to "
"255) are supported") % data
try:
if validators.validate_range(convert_to_int(data), [0, 255]) is None:
return data
else:
raise n_exc.InvalidInput(error_message=error_message)
except n_exc.InvalidInput:
raise n_exc.InvalidInput(error_message=error_message)

View File

@ -17,11 +17,12 @@ import functools
import inspect import inspect
import netaddr import netaddr
from oslo_log import log as logging from oslo_log import log as logging
from oslo_utils import netutils
from oslo_utils import strutils
from oslo_utils import uuidutils from oslo_utils import uuidutils
import six import six
from neutron_lib._i18n import _ from neutron_lib._i18n import _
from neutron_lib.api import converters
from neutron_lib import constants from neutron_lib import constants
from neutron_lib import exceptions as n_exc from neutron_lib import exceptions as n_exc
@ -226,8 +227,8 @@ def validate_boolean(data, valid_values=None):
human readable message indicating why data is invalid. human readable message indicating why data is invalid.
""" """
try: try:
converters.convert_to_boolean(data) strutils.bool_from_string(data, strict=True)
except n_exc.InvalidInput: except ValueError:
msg = _("'%s' is not a valid boolean value") % data msg = _("'%s' is not a valid boolean value") % data
LOG.debug(msg) LOG.debug(msg)
return msg return msg
@ -838,6 +839,42 @@ def validate_non_negative(data, valid_values=None):
return msg return msg
def validate_port_range_or_none(data, valid_values=None):
"""Validate data is a range of TCP/UDP port numbers
:param data: The data to validate
:param valid_values: Not used!
:returns: None if data is an int between 0 and 65535, or two ints between 0
and 65535 with a colon between them, otherwise a human readable message as
to why data is invalid.
"""
if data is None:
return
if validate_string_or_none(data):
msg = _("Port range must be a string.")
LOG.debug(msg)
return msg
ports = data.split(':')
if len(ports) > 2:
msg = _("Port range must be two integers separated by a colon.")
LOG.debug(msg)
return msg
for p in ports:
if len(p) == 0:
msg = _("Port range must be two integers separated by a colon.")
LOG.debug(msg)
return msg
if not netutils.is_valid_port(p):
msg = _("Invalid port: %s.") % p
LOG.debug(msg)
return msg
if len(ports) > 1 and ports[0] > ports[1]:
msg = _("First port in a port range must be lower than the second "
"port.")
LOG.debug(msg)
return msg
def validate_subports(data, valid_values=None): def validate_subports(data, valid_values=None):
"""Validate data is a list of subnet port dicts. """Validate data is a list of subnet port dicts.
@ -910,6 +947,7 @@ validators = {'type:dict': validate_dict,
'type:mac_address_or_none': validate_mac_address_or_none, 'type:mac_address_or_none': validate_mac_address_or_none,
'type:nameservers': validate_nameservers, 'type:nameservers': validate_nameservers,
'type:non_negative': validate_non_negative, 'type:non_negative': validate_non_negative,
'type:port_range': validate_port_range_or_none,
'type:range': validate_range, 'type:range': validate_range,
'type:regex': validate_regex, 'type:regex': validate_regex,
'type:regex_or_none': validate_regex_or_none, 'type:regex_or_none': validate_regex_or_none,

View File

@ -194,6 +194,15 @@ IP_PROTOCOL_MAP = {PROTO_NAME_AH: PROTO_NUM_AH,
PROTO_NAME_UDPLITE: PROTO_NUM_UDPLITE, PROTO_NAME_UDPLITE: PROTO_NUM_UDPLITE,
PROTO_NAME_VRRP: PROTO_NUM_VRRP} PROTO_NAME_VRRP: PROTO_NUM_VRRP}
# Note that this differs from IP_PROTOCOL_MAP because iptables refers to IPv6
# ICMP as 'icmp6' whereas it is 'ipv6-icmp' in IP_PROTOCOL_MAP.
IPTABLES_PROTOCOL_MAP = {PROTO_NAME_DCCP: 'dccp',
PROTO_NAME_ICMP: 'icmp',
PROTO_NAME_IPV6_ICMP: 'icmp6',
PROTO_NAME_SCTP: 'sctp',
PROTO_NAME_TCP: 'tcp',
PROTO_NAME_UDP: 'udp'}
# ICMPv6 types: # ICMPv6 types:
# Destination Unreachable (1) # Destination Unreachable (1)
ICMPV6_TYPE_DEST_UNREACH = 1 ICMPV6_TYPE_DEST_UNREACH = 1

View File

@ -13,9 +13,11 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import six
import testtools import testtools
from neutron_lib.api import converters from neutron_lib.api import converters
from neutron_lib import constants
from neutron_lib import exceptions as n_exc from neutron_lib import exceptions as n_exc
from neutron_lib.tests import _base as base from neutron_lib.tests import _base as base
from neutron_lib.tests import _tools as tools from neutron_lib.tests import _tools as tools
@ -198,3 +200,56 @@ class TestConvertIPv6CanonicalFormat(base.BaseTestCase):
def test_convert_invalid_address(self): def test_convert_invalid_address(self):
result = converters.convert_ip_to_canonical_format("on") result = converters.convert_ip_to_canonical_format("on")
self.assertEqual("on", result) self.assertEqual("on", result)
class TestConvertStringToCaseInsensitive(base.BaseTestCase):
def test_convert_string_to_lower(self):
result = converters.convert_string_to_case_insensitive(u"THIS Is tEsT")
self.assertTrue(isinstance(result, six.string_types))
def test_assert_error_on_non_string(self):
for invalid in [[], 123]:
with testtools.ExpectedException(n_exc.InvalidInput):
converters.convert_string_to_case_insensitive(invalid)
class TestConvertProtocol(base.BaseTestCase):
def test_tcp_is_valid(self):
result = converters.convert_to_protocol(constants.PROTO_NAME_TCP)
self.assertEqual(constants.PROTO_NAME_TCP, result)
proto_num_str = str(constants.PROTO_NUM_TCP)
result = converters.convert_to_protocol(proto_num_str)
self.assertEqual(proto_num_str, result)
def test_udp_is_valid(self):
result = converters.convert_to_protocol(constants.PROTO_NAME_UDP)
self.assertEqual(constants.PROTO_NAME_UDP, result)
proto_num_str = str(constants.PROTO_NUM_UDP)
result = converters.convert_to_protocol(proto_num_str)
self.assertEqual(proto_num_str, result)
def test_icmp_is_valid(self):
result = converters.convert_to_protocol(constants.PROTO_NAME_ICMP)
self.assertEqual(constants.PROTO_NAME_ICMP, result)
proto_num_str = str(constants.PROTO_NUM_ICMP)
result = converters.convert_to_protocol(proto_num_str)
self.assertEqual(proto_num_str, result)
def test_numeric_is_valid(self):
proto_num_str = str(constants.PROTO_NUM_IGMP)
result = converters.convert_to_protocol(proto_num_str)
self.assertEqual(proto_num_str, result)
def test_numeric_too_high(self):
with testtools.ExpectedException(n_exc.InvalidInput):
converters.convert_to_protocol("300")
def test_numeric_too_low(self):
with testtools.ExpectedException(n_exc.InvalidInput):
converters.convert_to_protocol("-1")
def test_unknown_string(self):
with testtools.ExpectedException(n_exc.InvalidInput):
converters.convert_to_protocol("Invalid")

View File

@ -1073,3 +1073,53 @@ class TestValidateIPSubnetNone(base.BaseTestCase):
self.assertEqual(("'::1/2048' is neither a valid IP address, nor is " self.assertEqual(("'::1/2048' is neither a valid IP address, nor is "
"it a valid IP subnet"), "it a valid IP subnet"),
validators.validate_ip_or_subnet_or_none(testdata)) validators.validate_ip_or_subnet_or_none(testdata))
class TestPortRangeValidation(base.BaseTestCase):
def test_valid_port(self):
result = validators.validate_port_range_or_none("80")
self.assertIsNone(result)
def test_valid_range(self):
result = validators.validate_port_range_or_none("80:8888")
self.assertIsNone(result)
def test_port_too_high(self):
result = validators.validate_port_range_or_none("99999")
self.assertEqual(u"Invalid port: 99999.", result)
def test_port_too_low(self):
result = validators.validate_port_range_or_none("-1")
self.assertEqual(u"Invalid port: -1.", result)
def test_range_too_high(self):
result = validators.validate_port_range_or_none("80:99999")
self.assertEqual(u"Invalid port: 99999.", result)
def test_range_too_low(self):
result = validators.validate_port_range_or_none("-1:8888")
self.assertEqual(u"Invalid port: -1.", result)
def test_range_wrong_way(self):
result = validators.validate_port_range_or_none("8888:80")
self.assertEqual(u"First port in a port range must be lower than the "
"second port.", result)
def test_range_invalid(self):
result = validators.validate_port_range_or_none("DEAD:BEEF")
self.assertEqual(u"Invalid port: DEAD.", result)
def test_range_bad_input(self):
result = validators.validate_port_range_or_none(['a', 'b', 'c'])
self.assertEqual(u"Port range must be a string.", result)
def test_range_colon(self):
result = validators.validate_port_range_or_none(":")
self.assertEqual(u"Port range must be two integers separated by a "
"colon.", result)
def test_too_many_colons(self):
result = validators.validate_port_range_or_none("80:888:8888")
self.assertEqual(u"Port range must be two integers separated by a "
"colon.", result)

View File

@ -0,0 +1,5 @@
---
features:
- Added the converter ``convert_string_to_case_insensitive``.
- Added the converter ``convert_to_protocol``.
- Added the validator ``validate_port_range_or_none``.