api: Add request body schemas for SG APIs

These are deprecated but there's value in having a proper - if loose -
schema in place for API documentation purposes. Also, doing things this
way allows us to remove a whole load of hand-rolled stuff.

Change-Id: I4106cfa2a09d135f12892ed6d1f42f4151dc72e4
Signed-off-by: Stephen Finucane <stephenfin@redhat.com>
This commit is contained in:
Stephen Finucane 2024-02-16 20:07:09 +00:00
parent 7dc4b1ea62
commit 847608e75a
7 changed files with 239 additions and 161 deletions

View File

@ -0,0 +1,16 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# NOTE(stephenfin): These schemas are intentionally empty since these APIs have
# been removed
create = {}

View File

@ -14,6 +14,69 @@
from nova.api.validation import parameter_types
create = {
'type': 'object',
'properties': {
'security_group': {
'type': 'object',
'properties': {
'name': {
'type': 'string',
'minLength': 0,
'maxLength': 255,
},
'description': {
'type': 'string',
'minLength': 0,
'maxLength': 255,
},
},
'required': ['name', 'description'],
# NOTE(stephenfin): Per gmann's note below
'additionalProperties': True,
},
},
'required': ['security_group'],
# NOTE(stephenfin): Per gmann's note below
'additionalProperties': True,
}
update = create
create_rules = {
'type': 'object',
'properties': {
'security_group_rule': {
'type': 'object',
'properties': {
'group_id': {
'oneOf': [
{'type': 'null'},
{'type': 'string', 'format': 'uuid'},
],
},
'parent_group_id': {
'type': 'string',
'format': 'uuid',
},
# NOTE(stephenfin): We never validated these and we're not
# going to add that validation now.
'to_port': {},
'from_port': {},
'ip_protocol': {},
'cidr': {},
},
'required': ['parent_group_id'],
# NOTE(stephenfin): Per gmann's note below
'additionalProperties': True,
},
},
'required': ['security_group_rule'],
# NOTE(stephenfin): Per gmann's note below
'additionalProperties': True,
}
index_query = {
'type': 'object',
'properties': {

View File

@ -14,13 +14,16 @@
from webob import exc
from nova.api.openstack.compute.schemas import security_group_default_rules as schema # noqa: E501
from nova.api.openstack import wsgi
from nova.api import validation
class SecurityGroupDefaultRulesController(wsgi.Controller):
"""(Removed) Controller for default project security groups."""
@wsgi.expected_errors(410)
@validation.schema(schema.create)
def create(self, req, body):
raise exc.HTTPGone()

View File

@ -21,8 +21,7 @@ from webob import exc
from nova.api.openstack.api_version_request \
import MAX_PROXY_API_SUPPORT_VERSION
from nova.api.openstack import common
from nova.api.openstack.compute.schemas import security_groups as \
schema_security_groups
from nova.api.openstack.compute.schemas import security_groups as schema
from nova.api.openstack import wsgi
from nova.api import validation
from nova.compute import api as compute
@ -131,16 +130,6 @@ class SecurityGroupControllerBase(object):
SG_NOT_FOUND)
return group_rule_data_by_rule_group_id
def _from_body(self, body, key):
if not body:
raise exc.HTTPBadRequest(
explanation=_("The request body can't be empty"))
value = body.get(key, None)
if value is None:
raise exc.HTTPBadRequest(
explanation=_("Missing parameter %s") % key)
return value
class SecurityGroupController(SecurityGroupControllerBase, wsgi.Controller):
"""The Security group API controller for the OpenStack API."""
@ -183,7 +172,7 @@ class SecurityGroupController(SecurityGroupControllerBase, wsgi.Controller):
raise exc.HTTPBadRequest(explanation=exp.format_message())
@wsgi.Controller.api_version("2.1", MAX_PROXY_API_SUPPORT_VERSION)
@validation.query_schema(schema_security_groups.index_query)
@validation.query_schema(schema.index_query)
@wsgi.expected_errors(404)
def index(self, req):
"""Returns a list of security groups."""
@ -208,21 +197,17 @@ class SecurityGroupController(SecurityGroupControllerBase, wsgi.Controller):
@wsgi.Controller.api_version("2.1", MAX_PROXY_API_SUPPORT_VERSION)
@wsgi.expected_errors((400, 403))
@validation.schema(schema.create)
def create(self, req, body):
"""Creates a new security group."""
context = req.environ['nova.context']
context.can(sg_policies.POLICY_NAME % 'create',
target={'project_id': context.project_id})
security_group = self._from_body(body, 'security_group')
group_name = security_group.get('name', None)
group_description = security_group.get('description', None)
group_name = body['security_group']['name']
group_description = body['security_group']['description']
try:
security_group_api.validate_property(group_name, 'name', None)
security_group_api.validate_property(group_description,
'description', None)
group_ref = security_group_api.create_security_group(
context, group_name, group_description)
except exception.Invalid as exp:
@ -235,6 +220,7 @@ class SecurityGroupController(SecurityGroupControllerBase, wsgi.Controller):
@wsgi.Controller.api_version("2.1", MAX_PROXY_API_SUPPORT_VERSION)
@wsgi.expected_errors((400, 404))
@validation.schema(schema.update)
def update(self, req, id, body):
"""Update a security group."""
context = req.environ['nova.context']
@ -249,14 +235,10 @@ class SecurityGroupController(SecurityGroupControllerBase, wsgi.Controller):
except exception.Invalid as exp:
raise exc.HTTPBadRequest(explanation=exp.format_message())
security_group_data = self._from_body(body, 'security_group')
group_name = security_group_data.get('name', None)
group_description = security_group_data.get('description', None)
group_name = body['security_group']['name']
group_description = body['security_group']['description']
try:
security_group_api.validate_property(group_name, 'name', None)
security_group_api.validate_property(
group_description, 'description', None)
group_ref = security_group_api.update_security_group(
context, security_group, group_name, group_description)
except exception.SecurityGroupNotFound as exp:
@ -273,22 +255,20 @@ class SecurityGroupRulesController(SecurityGroupControllerBase,
@wsgi.Controller.api_version("2.1", MAX_PROXY_API_SUPPORT_VERSION)
@wsgi.expected_errors((400, 403, 404))
@validation.schema(schema.create_rules)
def create(self, req, body):
context = req.environ['nova.context']
context.can(sg_policies.POLICY_NAME % 'rule:create',
target={'project_id': context.project_id})
sg_rule = self._from_body(body, 'security_group_rule')
sg_rule = body['security_group_rule']
group_id = sg_rule.get('group_id')
parent_group_id = sg_rule['parent_group_id']
source_group = {}
try:
parent_group_id = security_group_api.validate_id(
sg_rule.get('parent_group_id'))
security_group = security_group_api.get(
context, parent_group_id)
if group_id is not None:
group_id = security_group_api.validate_id(group_id)
source_group = security_group_api.get(
context, id=group_id)
new_rule = self._rule_args_to_dict(context,

View File

@ -34,7 +34,6 @@ from nova import exception
from nova.i18n import _
from nova.network import neutron as neutronapi
from nova.objects import security_group as security_group_obj
from nova import utils
LOG = logging.getLogger(__name__)
@ -204,18 +203,6 @@ def _rule_exists(security_group, new_rule):
return False
def validate_property(value, property, allowed):
"""Validate given security group property.
:param value: the value to validate, as a string or unicode
:param property: the property, either 'name' or 'description'
:param allowed: the range of characters allowed, but not used because
Neutron is allowing any characters.
"""
utils.check_string_length(value, name=property, min_length=0,
max_length=255)
def populate_security_groups(security_groups):
"""Build and return a SecurityGroupList.

View File

@ -77,7 +77,7 @@ def security_group_rule_template(**kwargs):
rule.setdefault('ip_protocol', 'tcp')
rule.setdefault('from_port', 22)
rule.setdefault('to_port', 22)
rule.setdefault('parent_group_id', 2)
rule.setdefault('parent_group_id', uuids.parent_group_id)
return rule
@ -454,7 +454,8 @@ class TestSecurityGroupsV21(test.TestCase):
def test_create_security_group(self):
sg = security_group_request_template()
res_dict = self.controller.create(self.req, {'security_group': sg})
res_dict = self.controller.create(
self.req, body={'security_group': sg})
self.assertEqual(res_dict['security_group']['name'], 'test')
self.assertEqual(res_dict['security_group']['description'],
'test-description')
@ -463,31 +464,36 @@ class TestSecurityGroupsV21(test.TestCase):
sg = security_group_request_template()
del sg['name']
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.create, self.req,
{'security_group': sg})
self.assertRaises(
exception.ValidationError,
self.controller.create, self.req,
body={'security_group': sg})
def test_create_security_group_with_no_body(self):
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.create, self.req, None)
self.assertRaises(
exception.ValidationError,
self.controller.create, self.req, body=None)
def test_create_security_group_with_no_security_group(self):
body = {'no-securityGroup': None}
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.create, self.req, body)
self.assertRaises(
exception.ValidationError,
self.controller.create, self.req, body=body)
def test_create_security_group_above_255_characters_name(self):
sg = security_group_request_template(name='1234567890' * 26)
self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
self.req, {'security_group': sg})
self.assertRaises(
exception.ValidationError, self.controller.create,
self.req, body={'security_group': sg})
def test_create_security_group_above_255_characters_description(self):
sg = security_group_request_template(description='1234567890' * 26)
self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
self.req, {'security_group': sg})
self.assertRaises(
nova.exception.ValidationError, self.controller.create,
self.req, body={'security_group': sg})
def test_get_security_group_list(self):
self._create_sg_template().get('security_group')
@ -506,14 +512,16 @@ class TestSecurityGroupsV21(test.TestCase):
def test_get_security_group_list_missing_group_id_rule(self):
groups = []
rule1 = security_group_rule_template(cidr='10.2.3.124/24',
parent_group_id=1,
group_id={}, id=88,
protocol='TCP')
rule2 = security_group_rule_template(cidr='10.2.3.125/24',
parent_group_id=1,
id=99, protocol=88,
group_id='HAS_BEEN_DELETED')
rule1 = security_group_rule_template(
cidr='10.2.3.124/24',
parent_group_id=uuids.parent_group_id,
group_id={}, id=88,
protocol='TCP')
rule2 = security_group_rule_template(
cidr='10.2.3.125/24',
parent_group_id=uuids.parent_group_id,
id=99, protocol=88,
group_id='HAS_BEEN_DELETED')
sg = security_group_template(id=1,
name='test',
description='test-desc',
@ -525,7 +533,8 @@ class TestSecurityGroupsV21(test.TestCase):
# passed in. For example:
# "cidr": "0.0.0.0/0" ->"ip_range": {"cidr": "0.0.0.0/0"}
expected_rule = security_group_rule_template(
ip_range={'cidr': '10.2.3.124/24'}, parent_group_id=1,
ip_range={'cidr': '10.2.3.124/24'},
parent_group_id=uuids.parent_group_id,
group={}, id=88, ip_protocol='TCP')
expected = security_group_template(id=1,
name='test',
@ -603,7 +612,7 @@ class TestSecurityGroupsV21(test.TestCase):
sg = security_group_template()
self.assertRaises(webob.exc.HTTPBadRequest, self.controller.update,
self.req, '1', {'security_group': sg})
self.req, '1', body={'security_group': sg})
def test_delete_security_group_by_id(self):
sg = self._create_sg_template().get('security_group')
@ -973,8 +982,8 @@ class TestSecurityGroupRulesV21(test.TestCase):
rule = security_group_rule_template(cidr='10.2.3.124/24',
parent_group_id=self.sg2['id'])
res_dict = self.controller.create(self.req,
{'security_group_rule': rule})
res_dict = self.controller.create(
self.req, body={'security_group_rule': rule})
security_group_rule = res_dict['security_group_rule']
self.assertNotEqual(security_group_rule['id'], 0)
self.assertEqual(security_group_rule['parent_group_id'],
@ -986,8 +995,8 @@ class TestSecurityGroupRulesV21(test.TestCase):
rule = security_group_rule_template(group_id=self.sg1['id'],
parent_group_id=self.sg2['id'])
res_dict = self.controller.create(self.req,
{'security_group_rule': rule})
res_dict = self.controller.create(
self.req, body={'security_group_rule': rule})
security_group_rule = res_dict['security_group_rule']
self.assertNotEqual(security_group_rule['id'], 0)
@ -1004,8 +1013,8 @@ class TestSecurityGroupRulesV21(test.TestCase):
from_port=81, to_port=81,
parent_group_id=self.sg2['id'])
res_dict = self.controller.create(self.req,
{'security_group_rule': rule2})
res_dict = self.controller.create(
self.req, body={'security_group_rule': rule2})
security_group_rule = res_dict['security_group_rule']
self.assertNotEqual(security_group_rule['id'], 0)
@ -1017,8 +1026,8 @@ class TestSecurityGroupRulesV21(test.TestCase):
def test_create_none_value_from_to_port(self):
rule = {'parent_group_id': self.sg1['id'],
'group_id': self.sg1['id']}
res_dict = self.controller.create(self.req,
{'security_group_rule': rule})
res_dict = self.controller.create(
self.req, body={'security_group_rule': rule})
security_group_rule = res_dict['security_group_rule']
self.assertIsNone(security_group_rule['from_port'])
self.assertIsNone(security_group_rule['to_port'])
@ -1030,8 +1039,8 @@ class TestSecurityGroupRulesV21(test.TestCase):
rule = {'parent_group_id': self.sg1['id'],
'group_id': self.sg1['id'],
'ip_protocol': 'ICMP'}
res_dict = self.controller.create(self.req,
{'security_group_rule': rule})
res_dict = self.controller.create(
self.req, body={'security_group_rule': rule})
security_group_rule = res_dict['security_group_rule']
self.assertEqual(security_group_rule['ip_protocol'], 'ICMP')
self.assertEqual(security_group_rule['from_port'], -1)
@ -1044,8 +1053,8 @@ class TestSecurityGroupRulesV21(test.TestCase):
rule = {'parent_group_id': self.sg1['id'],
'group_id': self.sg1['id'],
'ip_protocol': 'TCP'}
res_dict = self.controller.create(self.req,
{'security_group_rule': rule})
res_dict = self.controller.create(
self.req, body={'security_group_rule': rule})
security_group_rule = res_dict['security_group_rule']
self.assertEqual(security_group_rule['ip_protocol'], 'TCP')
self.assertEqual(security_group_rule['from_port'], 1)
@ -1061,8 +1070,9 @@ class TestSecurityGroupRulesV21(test.TestCase):
to_port=22,
parent_group_id=self.sg2['id'],
cidr="10.2.3.124/2433")
self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
self.req, {'security_group_rule': rule})
self.assertRaises(
webob.exc.HTTPBadRequest, self.controller.create,
self.req, body={'security_group_rule': rule})
def test_create_by_invalid_tcp_port_json(self):
rule = security_group_rule_template(
@ -1072,8 +1082,9 @@ class TestSecurityGroupRulesV21(test.TestCase):
parent_group_id=self.sg2['id'],
cidr="10.2.3.124/24")
self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
self.req, {'security_group_rule': rule})
self.assertRaises(
webob.exc.HTTPBadRequest, self.controller.create,
self.req, body={'security_group_rule': rule})
def test_create_by_invalid_icmp_port_json(self):
rule = security_group_rule_template(
@ -1082,140 +1093,158 @@ class TestSecurityGroupRulesV21(test.TestCase):
to_port=256,
parent_group_id=self.sg2['id'],
cidr="10.2.3.124/24")
self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
self.req, {'security_group_rule': rule})
self.assertRaises(
webob.exc.HTTPBadRequest, self.controller.create,
self.req, body={'security_group_rule': rule})
def test_create_add_existing_rules_by_cidr(self):
sg = security_group_template()
req = fakes.HTTPRequest.blank(
'/v2/%s/os-security-groups' % fakes.FAKE_PROJECT_ID)
self.controller_sg.create(req, {'security_group': sg})
self.controller_sg.create(req, body={'security_group': sg})
rule = security_group_rule_template(
cidr='15.0.0.0/8', parent_group_id=self.sg2['id'])
req = fakes.HTTPRequest.blank(
'/v2/%s/os-security-group-rules' % fakes.FAKE_PROJECT_ID)
self.controller.create(req, {'security_group_rule': rule})
self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
req, {'security_group_rule': rule})
self.controller.create(req, body={'security_group_rule': rule})
self.assertRaises(
webob.exc.HTTPBadRequest, self.controller.create,
req, body={'security_group_rule': rule})
def test_create_add_existing_rules_by_group_id(self):
sg = security_group_template()
req = fakes.HTTPRequest.blank(
'/v2/%s/os-security-groups' % fakes.FAKE_PROJECT_ID)
self.controller_sg.create(req, {'security_group': sg})
self.controller_sg.create(req, body={'security_group': sg})
rule = security_group_rule_template(
group=self.sg1['id'], parent_group_id=self.sg2['id'])
req = fakes.HTTPRequest.blank(
'/v2/%s/os-security-group-rules' % fakes.FAKE_PROJECT_ID)
self.controller.create(req, {'security_group_rule': rule})
self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
req, {'security_group_rule': rule})
self.controller.create(req, body={'security_group_rule': rule})
self.assertRaises(
webob.exc.HTTPBadRequest, self.controller.create,
req, body={'security_group_rule': rule})
def test_create_with_no_body(self):
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.create, self.req, None)
self.assertRaises(
exception.ValidationError,
self.controller.create, self.req, body=None)
def test_create_with_no_security_group_rule_in_body(self):
rules = {'test': 'test'}
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.create, self.req, rules)
self.assertRaises(
exception.ValidationError,
self.controller.create, self.req, body=rules)
def test_create_with_invalid_parent_group_id(self):
rule = security_group_rule_template(parent_group_id='invalid')
self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
self.req, {'security_group_rule': rule})
self.assertRaises(
exception.ValidationError,
self.controller.create,
self.req, body={'security_group_rule': rule})
def test_create_with_non_existing_parent_group_id(self):
rule = security_group_rule_template(group_id=None,
parent_group_id=self.invalid_id)
self.assertRaises(webob.exc.HTTPNotFound, self.controller.create,
self.req, {'security_group_rule': rule})
self.assertRaises(
webob.exc.HTTPNotFound, self.controller.create,
self.req, body={'security_group_rule': rule})
def test_create_with_non_existing_group_id(self):
rule = security_group_rule_template(group_id='invalid',
rule = security_group_rule_template(group_id=uuids.doesnotexist,
parent_group_id=self.sg2['id'])
self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
self.req, {'security_group_rule': rule})
self.assertRaises(
webob.exc.HTTPNotFound, self.controller.create,
self.req, body={'security_group_rule': rule})
def test_create_with_invalid_protocol(self):
rule = security_group_rule_template(ip_protocol='invalid-protocol',
cidr='10.2.2.0/24',
parent_group_id=self.sg2['id'])
self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
self.req, {'security_group_rule': rule})
self.assertRaises(
webob.exc.HTTPBadRequest, self.controller.create,
self.req, body={'security_group_rule': rule})
def test_create_with_no_protocol(self):
rule = security_group_rule_template(cidr='10.2.2.0/24',
parent_group_id=self.sg2['id'])
del rule['ip_protocol']
self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
self.req, {'security_group_rule': rule})
self.assertRaises(
webob.exc.HTTPBadRequest, self.controller.create,
self.req, body={'security_group_rule': rule})
def test_create_with_invalid_from_port(self):
rule = security_group_rule_template(from_port='666666',
cidr='10.2.2.0/24',
parent_group_id=self.sg2['id'])
self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
self.req, {'security_group_rule': rule})
self.assertRaises(
webob.exc.HTTPBadRequest, self.controller.create,
self.req, body={'security_group_rule': rule})
def test_create_with_invalid_to_port(self):
rule = security_group_rule_template(to_port='666666',
cidr='10.2.2.0/24',
parent_group_id=self.sg2['id'])
self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
self.req, {'security_group_rule': rule})
self.assertRaises(
webob.exc.HTTPBadRequest, self.controller.create,
self.req, body={'security_group_rule': rule})
def test_create_with_non_numerical_from_port(self):
rule = security_group_rule_template(from_port='invalid',
cidr='10.2.2.0/24',
parent_group_id=self.sg2['id'])
self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
self.req, {'security_group_rule': rule})
self.assertRaises(
webob.exc.HTTPBadRequest, self.controller.create,
self.req, body={'security_group_rule': rule})
def test_create_with_non_numerical_to_port(self):
rule = security_group_rule_template(to_port='invalid',
cidr='10.2.2.0/24',
parent_group_id=self.sg2['id'])
self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
self.req, {'security_group_rule': rule})
self.assertRaises(
webob.exc.HTTPBadRequest, self.controller.create,
self.req, body={'security_group_rule': rule})
def test_create_with_no_from_port(self):
rule = security_group_rule_template(cidr='10.2.2.0/24',
parent_group_id=self.sg2['id'])
del rule['from_port']
self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
self.req, {'security_group_rule': rule})
self.assertRaises(
webob.exc.HTTPBadRequest, self.controller.create,
self.req, body={'security_group_rule': rule})
def test_create_with_no_to_port(self):
rule = security_group_rule_template(cidr='10.2.2.0/24',
parent_group_id=self.sg2['id'])
del rule['to_port']
self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
self.req, {'security_group_rule': rule})
self.assertRaises(
webob.exc.HTTPBadRequest, self.controller.create,
self.req, body={'security_group_rule': rule})
def test_create_with_invalid_cidr(self):
rule = security_group_rule_template(cidr='10.2.2222.0/24',
parent_group_id=self.sg2['id'])
self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
self.req, {'security_group_rule': rule})
self.assertRaises(
webob.exc.HTTPBadRequest, self.controller.create,
self.req, body={'security_group_rule': rule})
def test_create_with_no_cidr_group(self):
rule = security_group_rule_template(parent_group_id=self.sg2['id'])
res_dict = self.controller.create(self.req,
{'security_group_rule': rule})
res_dict = self.controller.create(
self.req, body={'security_group_rule': rule})
security_group_rule = res_dict['security_group_rule']
self.assertNotEqual(security_group_rule['id'], 0)
@ -1228,28 +1257,31 @@ class TestSecurityGroupRulesV21(test.TestCase):
rule = security_group_rule_template(group_id='invalid',
parent_group_id=self.sg2['id'])
self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
self.req, {'security_group_rule': rule})
self.assertRaises(
exception.ValidationError, self.controller.create,
self.req, body={'security_group_rule': rule})
def test_create_with_empty_group_id(self):
rule = security_group_rule_template(group_id='',
parent_group_id=self.sg2['id'])
self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
self.req, {'security_group_rule': rule})
self.assertRaises(
exception.ValidationError, self.controller.create,
self.req, body={'security_group_rule': rule})
def test_create_with_nonexist_group_id(self):
rule = security_group_rule_template(group_id=self.invalid_id,
parent_group_id=self.sg2['id'])
self.assertRaises(webob.exc.HTTPNotFound, self.controller.create,
self.req, {'security_group_rule': rule})
self.assertRaises(
webob.exc.HTTPNotFound, self.controller.create,
self.req, body={'security_group_rule': rule})
def test_create_with_same_group_parent_id_and_group_id(self):
rule = security_group_rule_template(group_id=self.sg1['id'],
parent_group_id=self.sg1['id'])
res_dict = self.controller.create(self.req,
{'security_group_rule': rule})
res_dict = self.controller.create(
self.req, body={'security_group_rule': rule})
security_group_rule = res_dict['security_group_rule']
self.assertNotEqual(security_group_rule['id'], 0)
self.assertEqual(security_group_rule['parent_group_id'],
@ -1260,15 +1292,16 @@ class TestSecurityGroupRulesV21(test.TestCase):
def _test_create_with_no_ports_and_no_group(self, proto):
rule = {'ip_protocol': proto, 'parent_group_id': self.sg2['id']}
self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
self.req, {'security_group_rule': rule})
self.assertRaises(
webob.exc.HTTPBadRequest, self.controller.create,
self.req, body={'security_group_rule': rule})
def _test_create_with_no_ports(self, proto):
rule = {'ip_protocol': proto, 'parent_group_id': self.sg2['id'],
'group_id': self.sg1['id']}
res_dict = self.controller.create(self.req,
{'security_group_rule': rule})
res_dict = self.controller.create(
self.req, body={'security_group_rule': rule})
security_group_rule = res_dict['security_group_rule']
expected_rule = {
'from_port': 1, 'group': {'tenant_id': '123', 'name': 'test'},
@ -1297,8 +1330,8 @@ class TestSecurityGroupRulesV21(test.TestCase):
'ip_protocol': proto, 'from_port': from_port, 'to_port': to_port,
'parent_group_id': self.sg2['id'], 'group_id': self.sg1['id']
}
res_dict = self.controller.create(self.req,
{'security_group_rule': rule})
res_dict = self.controller.create(
self.req, body={'security_group_rule': rule})
security_group_rule = res_dict['security_group_rule']
expected_rule = {
@ -1332,7 +1365,8 @@ class TestSecurityGroupRulesV21(test.TestCase):
req = fakes.HTTPRequest.blank(
'/v2/%s/os-security-group-rules' % fakes.FAKE_PROJECT_ID)
res_dict = self.controller.create(req, {'security_group_rule': rule})
res_dict = self.controller.create(
req, body={'security_group_rule': rule})
security_group_rule = res_dict['security_group_rule']
req = fakes.HTTPRequest.blank('/v2/%s/os-security-group-rules/%s' % (
fakes.FAKE_PROJECT_ID, security_group_rule['id']))
@ -1350,8 +1384,8 @@ class TestSecurityGroupRulesV21(test.TestCase):
rule = security_group_rule_template(cidr='0.0.0.0/0',
parent_group_id=self.sg2['id'])
res_dict = self.controller.create(self.req,
{'security_group_rule': rule})
res_dict = self.controller.create(
self.req, body={'security_group_rule': rule})
security_group_rule = res_dict['security_group_rule']
self.assertNotEqual(security_group_rule['id'], 0)
@ -1364,8 +1398,8 @@ class TestSecurityGroupRulesV21(test.TestCase):
rule = security_group_rule_template(cidr='::/0',
parent_group_id=self.sg2['id'])
res_dict = self.controller.create(self.req,
{'security_group_rule': rule})
res_dict = self.controller.create(
self.req, body={'security_group_rule': rule})
security_group_rule = res_dict['security_group_rule']
self.assertNotEqual(security_group_rule['id'], 0)
@ -1378,8 +1412,8 @@ class TestSecurityGroupRulesV21(test.TestCase):
rule = security_group_rule_template(cidr='15.0.0.0/8',
parent_group_id=self.sg2['id'])
res_dict = self.controller.create(self.req,
{'security_group_rule': rule})
res_dict = self.controller.create(
self.req, body={'security_group_rule': rule})
security_group_rule = res_dict['security_group_rule']
self.assertNotEqual(security_group_rule['id'], 0)
@ -1389,9 +1423,12 @@ class TestSecurityGroupRulesV21(test.TestCase):
"15.0.0.0/8")
def test_create_rule_cidr_bad_netmask(self):
rule = security_group_rule_template(cidr='15.0.0.0/0')
self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
self.req, {'security_group_rule': rule})
rule = security_group_rule_template(cidr='15.0.0.0/0',
parent_group_id=self.sg2['id'])
exc = self.assertRaises(
webob.exc.HTTPBadRequest, self.controller.create,
self.req, body={'security_group_rule': rule})
self.assertIn('Bad prefix for network ', str(exc))
UUID1 = '00000000-0000-0000-0000-000000000001'
@ -1502,7 +1539,7 @@ class SecurityGroupsOutputTest(test.TestCase):
security_groups = [{'name': 'fake-2-0'}, {'name': 'fake-2-1'}]
for security_group in security_groups:
sg = security_group_template(name=security_group['name'])
self.controller.create(req, {'security_group': sg})
self.controller.create(req, body={'security_group': sg})
server = dict(name='server_test', imageRef=image_uuid, flavorRef=2,
security_groups=security_groups)
@ -1536,7 +1573,7 @@ class SecurityGroupsOutputTest(test.TestCase):
security_groups = [{'name': 'fake-2-0'}, {'name': 'fake-2-1'}]
for security_group in security_groups:
sg = security_group_template(name=security_group['name'])
self.controller.create(req, {'security_group': sg})
self.controller.create(req, body={'security_group': sg})
server = dict(name='server_test', imageRef=image_uuid, flavorRef=2,
security_groups=security_groups)
@ -1592,9 +1629,9 @@ class TestSecurityGroupsDeprecation(test.NoDBTestCase):
self.assertRaises(exception.VersionNotFoundForAPIMethod,
self.controller.index, self.req)
self.assertRaises(exception.VersionNotFoundForAPIMethod,
self.controller.update, self.req, fakes.FAKE_UUID, {})
self.controller.update, self.req, fakes.FAKE_UUID, body={})
self.assertRaises(exception.VersionNotFoundForAPIMethod,
self.controller.create, self.req, {})
self.controller.create, self.req, body={})
class TestSecurityGroupRulesDeprecation(test.NoDBTestCase):
@ -1606,6 +1643,6 @@ class TestSecurityGroupRulesDeprecation(test.NoDBTestCase):
def test_all_apis_return_not_found(self):
self.assertRaises(exception.VersionNotFoundForAPIMethod,
self.controller.create, self.req, {})
self.controller.create, self.req, body={})
self.assertRaises(exception.VersionNotFoundForAPIMethod,
self.controller.delete, self.req, fakes.FAKE_UUID)

View File

@ -392,14 +392,6 @@ class TestNeutronDriver(test.NoDBTestCase):
class TestNeutronDriverWithoutMock(test.NoDBTestCase):
def test_validate_property(self):
sg_api.validate_property('foo', 'name', None)
sg_api.validate_property('', 'name', None)
self.assertRaises(exception.Invalid, sg_api.validate_property,
'a' * 256, 'name', None)
self.assertRaises(exception.Invalid, sg_api.validate_property,
None, 'name', None)
def test_populate_security_groups(self):
r = sg_api.populate_security_groups(['default', uuids.secgroup_uuid])
self.assertIsInstance(r, objects.SecurityGroupList)