Merge "Add validation for security_groups parameter in Port"
This commit is contained in:
commit
7c22311b61
|
@ -16,6 +16,7 @@
|
|||
import abc
|
||||
|
||||
import netaddr
|
||||
from neutron_lib.api import converters
|
||||
from neutron_lib.api import extensions as api_extensions
|
||||
from neutron_lib.api import validators
|
||||
from neutron_lib import constants as const
|
||||
|
@ -23,7 +24,6 @@ from neutron_lib.db import constants as db_const
|
|||
from neutron_lib import exceptions as nexception
|
||||
from neutron_lib.plugins import directory
|
||||
from oslo_utils import netutils
|
||||
from oslo_utils import uuidutils
|
||||
import six
|
||||
|
||||
from neutron._i18n import _
|
||||
|
@ -184,16 +184,6 @@ def convert_validate_port_value(port):
|
|||
raise SecurityGroupInvalidPortValue(port=port)
|
||||
|
||||
|
||||
def convert_to_uuid_list_or_none(value_list):
|
||||
if value_list is None:
|
||||
return
|
||||
for sg_id in value_list:
|
||||
if not uuidutils.is_uuid_like(sg_id):
|
||||
msg = _("'%s' is not an integer or uuid") % sg_id
|
||||
raise nexception.InvalidInput(error_message=msg)
|
||||
return value_list
|
||||
|
||||
|
||||
def convert_ip_prefix_to_cidr(ip_prefix):
|
||||
if not ip_prefix:
|
||||
return
|
||||
|
@ -279,7 +269,9 @@ EXTENDED_ATTRIBUTES_2_0 = {
|
|||
'ports': {SECURITYGROUPS: {'allow_post': True,
|
||||
'allow_put': True,
|
||||
'is_visible': True,
|
||||
'convert_to': convert_to_uuid_list_or_none,
|
||||
'convert_to':
|
||||
converters.convert_none_to_empty_list,
|
||||
'validate': {'type:uuid_list': None},
|
||||
'default': const.ATTR_NOT_SPECIFIED}}}
|
||||
|
||||
# Register the configuration options
|
||||
|
|
|
@ -1380,6 +1380,25 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
|
|||
res['port'].get(ext_sg.SECURITYGROUPS))
|
||||
self._delete('ports', port['port']['id'])
|
||||
|
||||
def test_update_port_with_invalid_type_in_security_groups_param(self):
|
||||
with self.network() as n:
|
||||
with self.subnet(n):
|
||||
with self.security_group() as sg:
|
||||
res = self._create_port(self.fmt, n['network']['id'],
|
||||
security_groups=(
|
||||
[sg['security_group']['id']]))
|
||||
port = self.deserialize(self.fmt, res)
|
||||
|
||||
data = {'port': {'fixed_ips': port['port']['fixed_ips'],
|
||||
'name': port['port']['name'],
|
||||
'security_groups': True}}
|
||||
|
||||
req = self.new_update_request('ports', data,
|
||||
port['port']['id'])
|
||||
res = req.get_response(self.api)
|
||||
self.assertEqual(webob.exc.HTTPBadRequest.code,
|
||||
res.status_int)
|
||||
|
||||
def test_create_port_with_bad_security_group(self):
|
||||
with self.network() as n:
|
||||
with self.subnet(n):
|
||||
|
@ -1389,6 +1408,14 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
|
|||
self.deserialize(self.fmt, res)
|
||||
self.assertEqual(webob.exc.HTTPBadRequest.code, res.status_int)
|
||||
|
||||
def test_create_port_with_invalid_type_in_security_groups_param(self):
|
||||
with self.network() as n:
|
||||
with self.subnet(n):
|
||||
res = self._create_port(self.fmt, n['network']['id'],
|
||||
security_groups=True)
|
||||
|
||||
self.assertEqual(webob.exc.HTTPBadRequest.code, res.status_int)
|
||||
|
||||
def test_create_delete_security_group_port_in_use(self):
|
||||
with self.network() as n:
|
||||
with self.subnet(n):
|
||||
|
|
Loading…
Reference in New Issue