diff --git a/quantum/api/v2/attributes.py b/quantum/api/v2/attributes.py index c6784084ca1..80a5dae8400 100644 --- a/quantum/api/v2/attributes.py +++ b/quantum/api/v2/attributes.py @@ -36,7 +36,7 @@ def _validate_boolean(data, valid_values=None): if data in [True, False]: return else: - msg = _("%s is not boolean") % data + msg = _("'%s' is not boolean") % data LOG.debug("validate_boolean: %s", msg) return msg @@ -51,6 +51,19 @@ def _validate_values(data, valid_values=None): return msg +def _validate_string(data, max_len=None): + if not isinstance(data, basestring): + msg = _("'%s' is not a valid string") % data + LOG.debug("validate_string: %s", msg) + return msg + if max_len is not None: + if len(data) > max_len: + msg = _("'%(data)s' exceeds maximum length of " + "%(max_len)s.") % locals() + LOG.debug("validate_string: %s", msg) + return msg + + def _validate_range(data, valid_values=None): min_value = valid_values[0] max_value = valid_values[1] @@ -69,7 +82,7 @@ def _validate_mac_address(data, valid_values=None): netaddr.EUI(data) return except Exception: - msg = _("%s is not a valid MAC address") % data + msg = _("'%s' is not a valid MAC address") % data LOG.debug("validate_mac_address: %s", msg) return msg @@ -79,11 +92,131 @@ def _validate_ip_address(data, valid_values=None): netaddr.IPAddress(data) return except Exception: - msg = _("%s is not a valid IP address") % data + msg = _("'%s' is not a valid IP address") % data LOG.debug("validate_ip_address: %s", msg) return msg +def _validate_ip_pools(data, valid_values=None): + """Validate that start and end IP addresses are present + + In addition to this the IP addresses will also be validated""" + if not isinstance(data, list): + msg = _("'%s' in not a valid IP pool") % data + LOG.debug("validate_ip_pools: %s", msg) + return msg + + expected_keys = set(['start', 'end']) + try: + for ip_pool in data: + if set(ip_pool.keys()) != expected_keys: + msg = _("Expected keys not found. Expected: %s " + "Provided: %s") % (expected_keys, ip_pool.keys()) + LOG.debug("validate_ip_pools: %s", msg) + return msg + for k in expected_keys: + msg = _validate_ip_address(ip_pool[k]) + if msg: + LOG.debug("validate_ip_pools: %s", msg) + return msg + except KeyError, e: + args = {'key_name': e.message, 'ip_pool': ip_pool} + msg = _("Invalid input. Required key: '%(key_name)s' " + "missing from %(ip_pool)s.") % args + LOG.debug("validate_ip_pools: %s", msg) + return msg + except TypeError, e: + msg = _("Invalid input. Pool %s must be a dictionary.") % ip_pool + LOG.debug("validate_ip_pools: %s", msg) + return msg + except Exception: + msg = _("'%s' in not a valid IP pool") % data + LOG.debug("validate_ip_pools: %s", msg) + return msg + + +def _validate_fixed_ips(data, valid_values=None): + if not isinstance(data, list): + msg = _("'%s' in not a valid fixed IP") % data + LOG.debug("validate_fixed_ips: %s", msg) + return msg + ips = [] + try: + for fixed_ip in data: + if 'ip_address' in fixed_ip: + msg = _validate_ip_address(fixed_ip['ip_address']) + if msg: + LOG.debug("validate_fixed_ips: %s", msg) + return msg + if 'subnet_id' in fixed_ip: + msg = _validate_regex(fixed_ip['subnet_id'], UUID_PATTERN) + if msg: + LOG.debug("validate_fixed_ips: %s", msg) + return msg + # Ensure that duplicate entries are not set - just checking IP + # suffices. Duplicate subnet_id's are legitimate. + if 'ip_address' in fixed_ip: + if fixed_ip['ip_address'] in ips: + msg = _("Duplicate entry %s") % fixed_ip + LOG.debug("validate_fixed_ips: %s", msg) + return msg + ips.append(fixed_ip['ip_address']) + except Exception: + msg = _("'%s' in not a valid fixed IP") % data + LOG.debug("validate_fixed_ips: %s", msg) + return msg + + +def _validate_nameservers(data, valid_values=None): + if not hasattr(data, '__iter__'): + msg = _("'%s' in not a valid nameserver") % data + LOG.debug("validate_nameservers: %s", msg) + return msg + ips = set() + for ip in data: + msg = _validate_ip_address(ip) + if msg: + # This may be a hostname + msg = _validate_regex(ip, HOSTNAME_PATTERN) + if msg: + msg = _("'%s' in not a valid nameserver") % ip + LOG.debug("validate_nameservers: %s", msg) + return msg + if ip in ips: + msg = _("Duplicate nameserver %s") % ip + LOG.debug("validate_nameservers: %s", msg) + return msg + ips.add(ip) + + +def _validate_hostroutes(data, valid_values=None): + if not isinstance(data, list): + msg = _("'%s' in not a valid hostroute") % data + LOG.debug("validate_hostroutes: %s", msg) + return msg + hostroutes = [] + try: + for hostroute in data: + msg = _validate_subnet(hostroute['destination']) + if msg: + LOG.debug("validate_hostroutes: %s", msg) + return msg + msg = _validate_ip_address(hostroute['nexthop']) + if msg: + LOG.debug("validate_hostroutes: %s", msg) + return msg + if hostroute in hostroutes: + msg = _("Duplicate hostroute %s") % hostroute + LOG.debug("validate_hostroutes: %s", msg) + if msg: + return msg + hostroutes.append(hostroute) + except: + msg = _("'%s' in not a valid hostroute") % data + LOG.debug("validate_hostroutes: %s", msg) + return msg + + def _validate_ip_address_or_none(data, valid_values=None): if data is None: return None @@ -98,19 +231,21 @@ def _validate_subnet(data, valid_values=None): except Exception: pass - msg = _("%s is not a valid IP subnet") % data + msg = _("'%s' is not a valid IP subnet") % data LOG.debug("validate_subnet: %s", msg) return msg def _validate_regex(data, valid_values=None): - match = re.match(valid_values, data) - if match: - return - else: - msg = _("%s is not valid") % data - LOG.debug("validate_regex: %s", msg) - return msg + try: + if re.match(valid_values, data): + return + except TypeError: + pass + + msg = _("'%s' is not valid input") % data + LOG.debug("validate_regex: %s", msg) + return msg def _validate_uuid(data, valid_values=None): @@ -131,15 +266,23 @@ def convert_to_boolean(data): return True else: return False - except ValueError, TypeError: + except (ValueError, TypeError): if (data == "True" or data == "true"): return True if (data == "False" or data == "false"): return False - msg = _("%s is not boolean") % data + msg = _("'%s' is not boolean") % data raise q_exc.InvalidInput(error_message=msg) +def convert_to_int(data): + try: + return int(data) + except (ValueError, TypeError): + msg = _("'%s' is not a integer") % data + raise q_exc.InvalidInput(error_message=msg) + + def convert_kvp_str_to_list(data): """Convert a value of the form 'key=value' to ['key', 'value']. @@ -170,6 +313,8 @@ def convert_kvp_list_to_dict(kvp_list): kvp_map[key].add(value) return dict((x, list(y)) for x, y in kvp_map.iteritems()) +HOSTNAME_PATTERN = ("(?=^.{1,254}$)(^(?:(?!\d+\.|-)[a-zA-Z0-9_\-]" + "{1,63}(? cfg.CONF.max_dns_nameservers: raise q_exc.DNSNameServersExhausted( subnet_id=id, @@ -969,8 +969,8 @@ class QuantumDbPluginV2(quantum_plugin_base_v2.QuantumPluginBaseV2): error_message=("error parsing dns address %s" % dns)) self._validate_ip_version(ip_ver, dns, 'dns_nameserver') - if 'host_routes' in s and \ - s['host_routes'] != attributes.ATTR_NOT_SPECIFIED: + if ('host_routes' in s and + s['host_routes'] != attributes.ATTR_NOT_SPECIFIED): if len(s['host_routes']) > cfg.CONF.max_subnet_host_routes: raise q_exc.HostRoutesExhausted( subnet_id=id, diff --git a/quantum/tests/unit/test_attributes.py b/quantum/tests/unit/test_attributes.py index a140e53985c..08654c6d1aa 100644 --- a/quantum/tests/unit/test_attributes.py +++ b/quantum/tests/unit/test_attributes.py @@ -23,6 +23,90 @@ from quantum.common import exceptions as q_exc class TestAttributes(unittest2.TestCase): + def test_strings(self): + msg = attributes._validate_string(None, None) + self.assertEquals(msg, "'None' is not a valid string") + + msg = attributes._validate_string("OK", None) + self.assertEquals(msg, None) + + msg = attributes._validate_string("123456789", 9) + self.assertIsNone(msg) + + msg = attributes._validate_string("1234567890", 9) + self.assertIsNotNone(msg) + + def test_ip_pools(self): + pools = [[{'end': '10.0.0.254'}], + [{'start': '10.0.0.254'}], + [{'start': '1000.0.0.254', + 'end': '1.1.1.1'}], + [{'start': '10.0.0.2', 'end': '10.0.0.254', + 'forza': 'juve'}], + [{'start': '10.0.0.2', 'end': '10.0.0.254'}, + {'end': '10.0.0.254'}], + None] + for pool in pools: + msg = attributes._validate_ip_pools(pool, None) + self.assertIsNotNone(msg) + + pools = [[{'end': '10.0.0.254', 'start': '10.0.0.2'}, + {'start': '11.0.0.2', 'end': '11.1.1.1'}], + [{'start': '11.0.0.2', 'end': '11.0.0.100'}]] + for pool in pools: + msg = attributes._validate_ip_pools(pool, None) + self.assertIsNone(msg) + + def test_fixed_ips(self): + fixed_ips = [[{'subnet_id': '00000000-ffff-ffff-ffff-000000000000', + 'ip_address': '1111.1.1.1'}], + [{'subnet_id': 'invalid'}], + None, + [{'subnet_id': '00000000-0fff-ffff-ffff-000000000000', + 'ip_address': '1.1.1.1'}, + {'subnet_id': '00000000-ffff-ffff-ffff-000000000000', + 'ip_address': '1.1.1.1'}], + [{'subnet_id': '00000000-ffff-ffff-ffff-000000000000', + 'ip_address': '1.1.1.1'}, + {'subnet_id': '00000000-ffff-ffff-ffff-000000000000', + 'ip_address': '1.1.1.1'}]] + for fixed in fixed_ips: + msg = attributes._validate_fixed_ips(fixed, None) + self.assertIsNotNone(msg) + + def test_nameservers(self): + ns_pools = [['1.1.1.2', '1.1.1.2'], + ['www.hostname.com', 'www.hostname.com'], + ['77.hostname.com'], + ['1000.0.0.1'], + None] + + for ns in ns_pools: + msg = attributes._validate_nameservers(ns, None) + self.assertIsNotNone(msg) + + ns_pools = [['100.0.0.2'], + ['www.hostname.com'], + ['www.great.marathons.to.travel'], + ['valid'], + ['www.internal.hostname.com']] + + for ns in ns_pools: + msg = attributes._validate_nameservers(ns, None) + self.assertIsNone(msg) + + def test_hostroutes(self): + hostroute_pools = [[{'destination': '100.0.0.0/24'}], + [{'nexthop': '10.0.2.20'}], + [{'nexthop': '10.0.2.20', + 'destination': '100.0.0.0/8'}, + {'nexthop': '10.0.2.20', + 'destination': '100.0.0.0/8'}], + None] + for host_routes in hostroute_pools: + msg = attributes._validate_hostroutes(host_routes, None) + self.assertIsNotNone(msg) + def test_mac_addresses(self): # Valid - 3 octets base_mac = "fa:16:3e:00:00:00" @@ -40,50 +124,43 @@ class TestAttributes(unittest2.TestCase): base_mac = "01:16:3e:4f:00:00" msg = attributes._validate_regex(base_mac, attributes.MAC_PATTERN) - error = '%s is not valid' % base_mac - self.assertEquals(msg, error) + self.assertIsNotNone(msg) # Invalid - invalid format base_mac = "a:16:3e:4f:00:00" msg = attributes._validate_regex(base_mac, attributes.MAC_PATTERN) - error = '%s is not valid' % base_mac - self.assertEquals(msg, error) + self.assertIsNotNone(msg) # Invalid - invalid format base_mac = "ffa:16:3e:4f:00:00" msg = attributes._validate_regex(base_mac, attributes.MAC_PATTERN) - error = '%s is not valid' % base_mac - self.assertEquals(msg, error) + self.assertIsNotNone(msg) # Invalid - invalid format base_mac = "01163e4f0000" msg = attributes._validate_regex(base_mac, attributes.MAC_PATTERN) - error = '%s is not valid' % base_mac - self.assertEquals(msg, error) + self.assertIsNotNone(msg) # Invalid - invalid format base_mac = "01-16-3e-4f-00-00" msg = attributes._validate_regex(base_mac, attributes.MAC_PATTERN) - error = '%s is not valid' % base_mac - self.assertEquals(msg, error) + self.assertIsNotNone(msg) # Invalid - invalid format base_mac = "00:16:3:f:00:00" msg = attributes._validate_regex(base_mac, attributes.MAC_PATTERN) - error = '%s is not valid' % base_mac - self.assertEquals(msg, error) + self.assertIsNotNone(msg) # Invalid - invalid format base_mac = "12:3:4:5:67:89ab" msg = attributes._validate_regex(base_mac, attributes.MAC_PATTERN) - error = '%s is not valid' % base_mac - self.assertEquals(msg, error) + self.assertIsNotNone(msg) def test_cidr(self): # Valid - IPv4 @@ -108,21 +185,21 @@ class TestAttributes(unittest2.TestCase): cidr = "10.0.2.0" msg = attributes._validate_subnet(cidr, None) - error = "%s is not a valid IP subnet" % cidr + error = "'%s' is not a valid IP subnet" % cidr self.assertEquals(msg, error) # Invalid - IPv6 without final octets, missing mask cidr = "fe80::" msg = attributes._validate_subnet(cidr, None) - error = "%s is not a valid IP subnet" % cidr + error = "'%s' is not a valid IP subnet" % cidr self.assertEquals(msg, error) # Invalid - IPv6 with final octets, missing mask cidr = "fe80::0" msg = attributes._validate_subnet(cidr, None) - error = "%s is not a valid IP subnet" % cidr + error = "'%s' is not a valid IP subnet" % cidr self.assertEquals(msg, error) diff --git a/quantum/tests/unit/test_db_plugin.py b/quantum/tests/unit/test_db_plugin.py index 14ac1594d43..b6fcc5c3533 100644 --- a/quantum/tests/unit/test_db_plugin.py +++ b/quantum/tests/unit/test_db_plugin.py @@ -352,12 +352,12 @@ class QuantumDbPluginV2TestCase(unittest2.TestCase): def _do_side_effect(self, patched_plugin, orig, *args, **kwargs): """ Invoked by test cases for injecting failures in plugin """ def second_call(*args, **kwargs): - raise Exception('boom') + raise AttributeError patched_plugin.side_effect = second_call return orig(*args, **kwargs) def _validate_behavior_on_bulk_failure(self, res, collection): - self.assertEqual(res.status_int, 500) + self.assertEqual(res.status_int, 400) req = self.new_list_request(collection) res = req.get_response(self.api) self.assertEquals(res.status_int, 200) @@ -559,6 +559,14 @@ class TestV2HTTPResponse(QuantumDbPluginV2TestCase): res = req.get_response(self.api) self.assertEquals(res.status_int, 200) + def test_update_invalid_json_400(self): + with self.network() as net: + req = self.new_update_request('networks', + '{{"name": "aaa"}}', + net['network']['id']) + res = req.get_response(self.api) + self.assertEquals(res.status_int, 400) + def test_bad_route_404(self): req = self.new_list_request('doohickeys') res = req.get_response(self.api) @@ -804,6 +812,13 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s self.assertEqual(res['port']['admin_state_up'], data['port']['admin_state_up']) + def test_update_device_id_null(self): + with self.port() as port: + data = {'port': {'device_id': None}} + req = self.new_update_request('ports', data, port['port']['id']) + res = req.get_response(self.api) + self.assertEquals(res.status_int, 400) + def test_delete_network_if_port_exists(self): fmt = 'json' with self.port() as port: @@ -1024,6 +1039,19 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s res = self._create_port(fmt, net_id=net_id, **kwargs) self.assertEquals(res.status_int, 400) + def test_overlapping_subnets(self): + fmt = 'json' + with self.subnet() as subnet: + tenant_id = subnet['subnet']['tenant_id'] + net_id = subnet['subnet']['network_id'] + res = self._create_subnet(fmt, + tenant_id=tenant_id, + net_id=net_id, + cidr='10.0.0.225/28', + ip_version=4, + gateway_ip=ATTR_NOT_SPECIFIED) + self.assertEquals(res.status_int, 400) + def test_requested_subnet_id_v4_and_v6(self): fmt = 'json' with self.subnet() as subnet: @@ -1154,6 +1182,17 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s self.assertEquals(ips[0]['subnet_id'], subnet['subnet']['id']) self._delete('ports', port2['port']['id']) + def test_invalid_ip(self): + fmt = 'json' + with self.subnet() as subnet: + # Allocate specific IP + kwargs = {"fixed_ips": [{'subnet_id': subnet['subnet']['id'], + 'ip_address': '1011.0.0.5'}]} + net_id = subnet['subnet']['network_id'] + res = self._create_port(fmt, net_id=net_id, **kwargs) + port = self.deserialize(fmt, res) + self.assertEquals(res.status_int, 400) + def test_requested_split(self): fmt = 'json' with self.subnet() as subnet: @@ -1201,7 +1240,29 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s net_id = subnet['subnet']['network_id'] res = self._create_port(fmt, net_id=net_id, **kwargs) port2 = self.deserialize(fmt, res) - self.assertEquals(res.status_int, 500) + self.assertEquals(res.status_int, 400) + + def test_fixed_ip_invalid_subnet_id(self): + fmt = 'json' + with self.subnet() as subnet: + # Allocate specific IP + kwargs = {"fixed_ips": [{'subnet_id': 'i am invalid', + 'ip_address': '10.0.0.5'}]} + net_id = subnet['subnet']['network_id'] + res = self._create_port(fmt, net_id=net_id, **kwargs) + port2 = self.deserialize(fmt, res) + self.assertEquals(res.status_int, 400) + + def test_fixed_ip_invalid_ip(self): + fmt = 'json' + with self.subnet() as subnet: + # Allocate specific IP + kwargs = {"fixed_ips": [{'subnet_id': subnet['subnet']['id'], + 'ip_address': '10.0.0.55555'}]} + net_id = subnet['subnet']['network_id'] + res = self._create_port(fmt, net_id=net_id, **kwargs) + port2 = self.deserialize(fmt, res) + self.assertEquals(res.status_int, 400) def test_requested_ips_only(self): fmt = 'json' @@ -1956,6 +2017,119 @@ class TestSubnetsV2(QuantumDbPluginV2TestCase): res = subnet_req.get_response(self.api) self.assertEquals(res.status_int, 403) + def test_create_subnet_bad_ip_version(self): + with self.network() as network: + # Check bad IP version + data = {'subnet': {'network_id': network['network']['id'], + 'cidr': '10.0.2.0/24', + 'ip_version': 'abc', + 'tenant_id': network['network']['tenant_id'], + 'gateway_ip': '10.0.2.1'}} + subnet_req = self.new_create_request('subnets', data) + res = subnet_req.get_response(self.api) + self.assertEquals(res.status_int, 400) + + def test_create_subnet_bad_ip_version_null(self): + with self.network() as network: + # Check bad IP version + data = {'subnet': {'network_id': network['network']['id'], + 'cidr': '10.0.2.0/24', + 'ip_version': None, + 'tenant_id': network['network']['tenant_id'], + 'gateway_ip': '10.0.2.1'}} + subnet_req = self.new_create_request('subnets', data) + res = subnet_req.get_response(self.api) + self.assertEquals(res.status_int, 400) + + def test_create_subnet_bad_uuid(self): + with self.network() as network: + # Check invalid UUID + data = {'subnet': {'network_id': None, + 'cidr': '10.0.2.0/24', + 'ip_version': 4, + 'tenant_id': network['network']['tenant_id'], + 'gateway_ip': '10.0.2.1'}} + subnet_req = self.new_create_request('subnets', data) + res = subnet_req.get_response(self.api) + self.assertEquals(res.status_int, 400) + + def test_create_subnet_bad_boolean(self): + with self.network() as network: + # Check invalid boolean + data = {'subnet': {'network_id': network['network']['id'], + 'cidr': '10.0.2.0/24', + 'ip_version': '4', + 'enable_dhcp': None, + 'tenant_id': network['network']['tenant_id'], + 'gateway_ip': '10.0.2.1'}} + subnet_req = self.new_create_request('subnets', data) + res = subnet_req.get_response(self.api) + self.assertEquals(res.status_int, 400) + + def test_create_subnet_bad_pools(self): + with self.network() as network: + # Check allocation pools + allocation_pools = [[{'end': '10.0.0.254'}], + [{'start': '10.0.0.254'}], + [{'start': '1000.0.0.254'}], + [{'start': '10.0.0.2', 'end': '10.0.0.254'}, + {'end': '10.0.0.254'}], + None, + [{'start': '10.0.0.2', 'end': '10.0.0.3'}, + {'start': '10.0.0.2', 'end': '10.0.0.3'}]] + tenant_id = network['network']['tenant_id'] + for pool in allocation_pools: + data = {'subnet': {'network_id': network['network']['id'], + 'cidr': '10.0.2.0/24', + 'ip_version': '4', + 'tenant_id': tenant_id, + 'gateway_ip': '10.0.2.1', + 'allocation_pools': pool}} + subnet_req = self.new_create_request('subnets', data) + res = subnet_req.get_response(self.api) + self.assertEquals(res.status_int, 400) + + def test_create_subnet_bad_nameserver(self): + with self.network() as network: + # Check nameservers + nameserver_pools = [['1100.0.0.2'], + ['1.1.1.2', '1.1000.1.3'], + ['1.1.1.2', '1.1.1.2'], + None] + tenant_id = network['network']['tenant_id'] + for nameservers in nameserver_pools: + data = {'subnet': {'network_id': network['network']['id'], + 'cidr': '10.0.2.0/24', + 'ip_version': '4', + 'tenant_id': tenant_id, + 'gateway_ip': '10.0.2.1', + 'dns_nameservers': nameservers}} + subnet_req = self.new_create_request('subnets', data) + res = subnet_req.get_response(self.api) + self.assertEquals(res.status_int, 400) + + def test_create_subnet_bad_hostroutes(self): + with self.network() as network: + # Check hostroutes + hostroute_pools = [[{'destination': '100.0.0.0/24'}], + [{'nexthop': '10.0.2.20'}], + [{'nexthop': '10.0.2.20', + 'destination': '100.0.0.0/8'}, + {'nexthop': '10.0.2.20', + 'destination': '100.0.0.0/8'}], + None] + tenant_id = network['network']['tenant_id'] + for hostroutes in hostroute_pools: + data = {'subnet': {'network_id': network['network']['id'], + 'cidr': '10.0.2.0/24', + 'ip_version': '4', + 'tenant_id': tenant_id, + 'gateway_ip': '10.0.2.1', + 'host_routes': hostroutes}} + subnet_req = self.new_create_request('subnets', data) + res = subnet_req.get_response(self.api) + self.assertEquals(res.status_int, 400) + def test_create_subnet_defaults(self): gateway = '10.0.0.1' cidr = '10.0.0.0/24'