diff --git a/neutron_lib/api/converters.py b/neutron_lib/api/converters.py index 803160ba3..bc3efa329 100644 --- a/neutron_lib/api/converters.py +++ b/neutron_lib/api/converters.py @@ -15,6 +15,7 @@ from oslo_utils import strutils import six from neutron_lib._i18n import _ +from neutron_lib.api import validators from neutron_lib import constants from neutron_lib import exceptions as n_exc @@ -182,3 +183,53 @@ def convert_ip_to_canonical_format(value): except netaddr.core.AddrFormatError: pass return value + + +def convert_string_to_case_insensitive(data): + """Convert a string value into a lower case string. + + This effectively makes the string case-insensitive. + + :param data: The value to convert. + :return: The lower-cased string representation of the value, or None is + 'data' is None. + :raises InvalidInput: If the value is not a string. + """ + try: + return data.lower() + except AttributeError: + error_message = _("Input value %s must be string type") % data + raise n_exc.InvalidInput(error_message=error_message) + + +def convert_to_protocol(data): + """Validate that a specified IP protocol is valid. + + For the authoritative list mapping protocol names to numbers, see the IANA: + http://www.iana.org/assignments/protocol-numbers/protocol-numbers.xhtml + + :param data: The value to verify is an IP protocol. + :returns: If data is an int between 0 and 255 or None, return that; if + data is a string then return it lower-cased if it matches one of the + allowed protocol names. + :raises exceptions.InvalidInput: If data is an int < 0, an + int > 255, or a string that does not match one of the allowed protocol + names. + """ + + if data is None: + return + val = convert_string_to_case_insensitive(data) + if val in constants.IPTABLES_PROTOCOL_MAP: + return data + + error_message = _("IP protocol '%s' is not supported. Only protocol " + "names and their integer representation (0 to " + "255) are supported") % data + try: + if validators.validate_range(convert_to_int(data), [0, 255]) is None: + return data + else: + raise n_exc.InvalidInput(error_message=error_message) + except n_exc.InvalidInput: + raise n_exc.InvalidInput(error_message=error_message) diff --git a/neutron_lib/api/validators.py b/neutron_lib/api/validators.py index c6f1ad8de..2d8145133 100644 --- a/neutron_lib/api/validators.py +++ b/neutron_lib/api/validators.py @@ -17,11 +17,12 @@ import functools import inspect import netaddr from oslo_log import log as logging +from oslo_utils import netutils +from oslo_utils import strutils from oslo_utils import uuidutils import six from neutron_lib._i18n import _ -from neutron_lib.api import converters from neutron_lib import constants from neutron_lib import exceptions as n_exc @@ -226,8 +227,8 @@ def validate_boolean(data, valid_values=None): human readable message indicating why data is invalid. """ try: - converters.convert_to_boolean(data) - except n_exc.InvalidInput: + strutils.bool_from_string(data, strict=True) + except ValueError: msg = _("'%s' is not a valid boolean value") % data LOG.debug(msg) return msg @@ -838,6 +839,42 @@ def validate_non_negative(data, valid_values=None): return msg +def validate_port_range_or_none(data, valid_values=None): + """Validate data is a range of TCP/UDP port numbers + + :param data: The data to validate + :param valid_values: Not used! + :returns: None if data is an int between 0 and 65535, or two ints between 0 + and 65535 with a colon between them, otherwise a human readable message as + to why data is invalid. + """ + if data is None: + return + if validate_string_or_none(data): + msg = _("Port range must be a string.") + LOG.debug(msg) + return msg + ports = data.split(':') + if len(ports) > 2: + msg = _("Port range must be two integers separated by a colon.") + LOG.debug(msg) + return msg + for p in ports: + if len(p) == 0: + msg = _("Port range must be two integers separated by a colon.") + LOG.debug(msg) + return msg + if not netutils.is_valid_port(p): + msg = _("Invalid port: %s.") % p + LOG.debug(msg) + return msg + if len(ports) > 1 and ports[0] > ports[1]: + msg = _("First port in a port range must be lower than the second " + "port.") + LOG.debug(msg) + return msg + + def validate_subports(data, valid_values=None): """Validate data is a list of subnet port dicts. @@ -910,6 +947,7 @@ validators = {'type:dict': validate_dict, 'type:mac_address_or_none': validate_mac_address_or_none, 'type:nameservers': validate_nameservers, 'type:non_negative': validate_non_negative, + 'type:port_range': validate_port_range_or_none, 'type:range': validate_range, 'type:regex': validate_regex, 'type:regex_or_none': validate_regex_or_none, diff --git a/neutron_lib/constants.py b/neutron_lib/constants.py index d4633abb1..8916a6407 100644 --- a/neutron_lib/constants.py +++ b/neutron_lib/constants.py @@ -194,6 +194,15 @@ IP_PROTOCOL_MAP = {PROTO_NAME_AH: PROTO_NUM_AH, PROTO_NAME_UDPLITE: PROTO_NUM_UDPLITE, PROTO_NAME_VRRP: PROTO_NUM_VRRP} +# Note that this differs from IP_PROTOCOL_MAP because iptables refers to IPv6 +# ICMP as 'icmp6' whereas it is 'ipv6-icmp' in IP_PROTOCOL_MAP. +IPTABLES_PROTOCOL_MAP = {PROTO_NAME_DCCP: 'dccp', + PROTO_NAME_ICMP: 'icmp', + PROTO_NAME_IPV6_ICMP: 'icmp6', + PROTO_NAME_SCTP: 'sctp', + PROTO_NAME_TCP: 'tcp', + PROTO_NAME_UDP: 'udp'} + # ICMPv6 types: # Destination Unreachable (1) ICMPV6_TYPE_DEST_UNREACH = 1 diff --git a/neutron_lib/tests/unit/api/test_conversions.py b/neutron_lib/tests/unit/api/test_conversions.py index 4ad38dbf8..efe9a8124 100644 --- a/neutron_lib/tests/unit/api/test_conversions.py +++ b/neutron_lib/tests/unit/api/test_conversions.py @@ -13,9 +13,11 @@ # License for the specific language governing permissions and limitations # under the License. +import six import testtools from neutron_lib.api import converters +from neutron_lib import constants from neutron_lib import exceptions as n_exc from neutron_lib.tests import _base as base from neutron_lib.tests import _tools as tools @@ -198,3 +200,56 @@ class TestConvertIPv6CanonicalFormat(base.BaseTestCase): def test_convert_invalid_address(self): result = converters.convert_ip_to_canonical_format("on") self.assertEqual("on", result) + + +class TestConvertStringToCaseInsensitive(base.BaseTestCase): + + def test_convert_string_to_lower(self): + result = converters.convert_string_to_case_insensitive(u"THIS Is tEsT") + self.assertTrue(isinstance(result, six.string_types)) + + def test_assert_error_on_non_string(self): + for invalid in [[], 123]: + with testtools.ExpectedException(n_exc.InvalidInput): + converters.convert_string_to_case_insensitive(invalid) + + +class TestConvertProtocol(base.BaseTestCase): + + def test_tcp_is_valid(self): + result = converters.convert_to_protocol(constants.PROTO_NAME_TCP) + self.assertEqual(constants.PROTO_NAME_TCP, result) + proto_num_str = str(constants.PROTO_NUM_TCP) + result = converters.convert_to_protocol(proto_num_str) + self.assertEqual(proto_num_str, result) + + def test_udp_is_valid(self): + result = converters.convert_to_protocol(constants.PROTO_NAME_UDP) + self.assertEqual(constants.PROTO_NAME_UDP, result) + proto_num_str = str(constants.PROTO_NUM_UDP) + result = converters.convert_to_protocol(proto_num_str) + self.assertEqual(proto_num_str, result) + + def test_icmp_is_valid(self): + result = converters.convert_to_protocol(constants.PROTO_NAME_ICMP) + self.assertEqual(constants.PROTO_NAME_ICMP, result) + proto_num_str = str(constants.PROTO_NUM_ICMP) + result = converters.convert_to_protocol(proto_num_str) + self.assertEqual(proto_num_str, result) + + def test_numeric_is_valid(self): + proto_num_str = str(constants.PROTO_NUM_IGMP) + result = converters.convert_to_protocol(proto_num_str) + self.assertEqual(proto_num_str, result) + + def test_numeric_too_high(self): + with testtools.ExpectedException(n_exc.InvalidInput): + converters.convert_to_protocol("300") + + def test_numeric_too_low(self): + with testtools.ExpectedException(n_exc.InvalidInput): + converters.convert_to_protocol("-1") + + def test_unknown_string(self): + with testtools.ExpectedException(n_exc.InvalidInput): + converters.convert_to_protocol("Invalid") diff --git a/neutron_lib/tests/unit/api/test_validators.py b/neutron_lib/tests/unit/api/test_validators.py index d92f45a4e..7aa0e62da 100644 --- a/neutron_lib/tests/unit/api/test_validators.py +++ b/neutron_lib/tests/unit/api/test_validators.py @@ -1073,3 +1073,53 @@ class TestValidateIPSubnetNone(base.BaseTestCase): self.assertEqual(("'::1/2048' is neither a valid IP address, nor is " "it a valid IP subnet"), validators.validate_ip_or_subnet_or_none(testdata)) + + +class TestPortRangeValidation(base.BaseTestCase): + + def test_valid_port(self): + result = validators.validate_port_range_or_none("80") + self.assertIsNone(result) + + def test_valid_range(self): + result = validators.validate_port_range_or_none("80:8888") + self.assertIsNone(result) + + def test_port_too_high(self): + result = validators.validate_port_range_or_none("99999") + self.assertEqual(u"Invalid port: 99999.", result) + + def test_port_too_low(self): + result = validators.validate_port_range_or_none("-1") + self.assertEqual(u"Invalid port: -1.", result) + + def test_range_too_high(self): + result = validators.validate_port_range_or_none("80:99999") + self.assertEqual(u"Invalid port: 99999.", result) + + def test_range_too_low(self): + result = validators.validate_port_range_or_none("-1:8888") + self.assertEqual(u"Invalid port: -1.", result) + + def test_range_wrong_way(self): + result = validators.validate_port_range_or_none("8888:80") + self.assertEqual(u"First port in a port range must be lower than the " + "second port.", result) + + def test_range_invalid(self): + result = validators.validate_port_range_or_none("DEAD:BEEF") + self.assertEqual(u"Invalid port: DEAD.", result) + + def test_range_bad_input(self): + result = validators.validate_port_range_or_none(['a', 'b', 'c']) + self.assertEqual(u"Port range must be a string.", result) + + def test_range_colon(self): + result = validators.validate_port_range_or_none(":") + self.assertEqual(u"Port range must be two integers separated by a " + "colon.", result) + + def test_too_many_colons(self): + result = validators.validate_port_range_or_none("80:888:8888") + self.assertEqual(u"Port range must be two integers separated by a " + "colon.", result) diff --git a/releasenotes/notes/fwaas_converters_validators-c310900b4386146e.yaml b/releasenotes/notes/fwaas_converters_validators-c310900b4386146e.yaml new file mode 100644 index 000000000..62b82aeab --- /dev/null +++ b/releasenotes/notes/fwaas_converters_validators-c310900b4386146e.yaml @@ -0,0 +1,5 @@ +--- +features: + - Added the converter ``convert_string_to_case_insensitive``. + - Added the converter ``convert_to_protocol``. + - Added the validator ``validate_port_range_or_none``.