Merged with trunk

This commit is contained in:
Tushar Patil 2011-08-11 18:23:49 -07:00
commit 651e5f91a5
6 changed files with 1249 additions and 5 deletions

View File

@ -0,0 +1,466 @@
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""The security groups extension."""
import netaddr
import urllib
from webob import exc
import webob
from nova import compute
from nova import db
from nova import exception
from nova import flags
from nova import log as logging
from nova.api.openstack import common
from nova.api.openstack import extensions
from nova.api.openstack import wsgi
from xml.dom import minidom
LOG = logging.getLogger("nova.api.contrib.security_groups")
FLAGS = flags.FLAGS
class SecurityGroupController(object):
"""The Security group API controller for the OpenStack API."""
def __init__(self):
self.compute_api = compute.API()
super(SecurityGroupController, self).__init__()
def _format_security_group_rule(self, context, rule):
sg_rule = {}
sg_rule['id'] = rule.id
sg_rule['parent_group_id'] = rule.parent_group_id
sg_rule['ip_protocol'] = rule.protocol
sg_rule['from_port'] = rule.from_port
sg_rule['to_port'] = rule.to_port
sg_rule['group'] = {}
sg_rule['ip_range'] = {}
if rule.group_id:
source_group = db.security_group_get(context, rule.group_id)
sg_rule['group'] = {'name': source_group.name,
'tenant_id': source_group.project_id}
else:
sg_rule['ip_range'] = {'cidr': rule.cidr}
return sg_rule
def _format_security_group(self, context, group):
security_group = {}
security_group['id'] = group.id
security_group['description'] = group.description
security_group['name'] = group.name
security_group['tenant_id'] = group.project_id
security_group['rules'] = []
for rule in group.rules:
security_group['rules'] += [self._format_security_group_rule(
context, rule)]
return security_group
def show(self, req, id):
"""Return data about the given security group."""
context = req.environ['nova.context']
try:
id = int(id)
security_group = db.security_group_get(context, id)
except ValueError:
msg = _("Security group id is not integer")
return exc.HTTPBadRequest(explanation=msg)
except exception.NotFound as exp:
return exc.HTTPNotFound(explanation=unicode(exp))
return {'security_group': self._format_security_group(context,
security_group)}
def delete(self, req, id):
"""Delete a security group."""
context = req.environ['nova.context']
try:
id = int(id)
security_group = db.security_group_get(context, id)
except ValueError:
msg = _("Security group id is not integer")
return exc.HTTPBadRequest(explanation=msg)
except exception.SecurityGroupNotFound as exp:
return exc.HTTPNotFound(explanation=unicode(exp))
LOG.audit(_("Delete security group %s"), id, context=context)
db.security_group_destroy(context, security_group.id)
return exc.HTTPAccepted()
def index(self, req):
"""Returns a list of security groups"""
context = req.environ['nova.context']
self.compute_api.ensure_default_security_group(context)
groups = db.security_group_get_by_project(context,
context.project_id)
limited_list = common.limited(groups, req)
result = [self._format_security_group(context, group)
for group in limited_list]
return {'security_groups':
list(sorted(result,
key=lambda k: (k['tenant_id'], k['name'])))}
def create(self, req, body):
"""Creates a new security group."""
context = req.environ['nova.context']
if not body:
return exc.HTTPUnprocessableEntity()
security_group = body.get('security_group', None)
if security_group is None:
return exc.HTTPUnprocessableEntity()
group_name = security_group.get('name', None)
group_description = security_group.get('description', None)
self._validate_security_group_property(group_name, "name")
self._validate_security_group_property(group_description,
"description")
group_name = group_name.strip()
group_description = group_description.strip()
LOG.audit(_("Create Security Group %s"), group_name, context=context)
self.compute_api.ensure_default_security_group(context)
if db.security_group_exists(context, context.project_id, group_name):
msg = _('Security group %s already exists') % group_name
raise exc.HTTPBadRequest(explanation=msg)
group = {'user_id': context.user_id,
'project_id': context.project_id,
'name': group_name,
'description': group_description}
group_ref = db.security_group_create(context, group)
return {'security_group': self._format_security_group(context,
group_ref)}
def _validate_security_group_property(self, value, typ):
""" typ will be either 'name' or 'description',
depending on the caller
"""
try:
val = value.strip()
except AttributeError:
msg = _("Security group %s is not a string or unicode") % typ
raise exc.HTTPBadRequest(explanation=msg)
if not val:
msg = _("Security group %s cannot be empty.") % typ
raise exc.HTTPBadRequest(explanation=msg)
if len(val) > 255:
msg = _("Security group %s should not be greater "
"than 255 characters.") % typ
raise exc.HTTPBadRequest(explanation=msg)
class SecurityGroupRulesController(SecurityGroupController):
def create(self, req, body):
context = req.environ['nova.context']
if not body:
raise exc.HTTPUnprocessableEntity()
if not 'security_group_rule' in body:
raise exc.HTTPUnprocessableEntity()
self.compute_api.ensure_default_security_group(context)
sg_rule = body['security_group_rule']
parent_group_id = sg_rule.get('parent_group_id', None)
try:
parent_group_id = int(parent_group_id)
security_group = db.security_group_get(context, parent_group_id)
except ValueError:
msg = _("Parent group id is not integer")
return exc.HTTPBadRequest(explanation=msg)
except exception.NotFound as exp:
msg = _("Security group (%s) not found") % parent_group_id
return exc.HTTPNotFound(explanation=msg)
msg = _("Authorize security group ingress %s")
LOG.audit(msg, security_group['name'], context=context)
try:
values = self._rule_args_to_dict(context,
to_port=sg_rule.get('to_port'),
from_port=sg_rule.get('from_port'),
parent_group_id=sg_rule.get('parent_group_id'),
ip_protocol=sg_rule.get('ip_protocol'),
cidr=sg_rule.get('cidr'),
group_id=sg_rule.get('group_id'))
except Exception as exp:
raise exc.HTTPBadRequest(explanation=unicode(exp))
if values is None:
msg = _("Not enough parameters to build a "
"valid rule.")
raise exc.HTTPBadRequest(explanation=msg)
values['parent_group_id'] = security_group.id
if self._security_group_rule_exists(security_group, values):
msg = _('This rule already exists in group %s') % parent_group_id
raise exc.HTTPBadRequest(explanation=msg)
security_group_rule = db.security_group_rule_create(context, values)
self.compute_api.trigger_security_group_rules_refresh(context,
security_group_id=security_group['id'])
return {'security_group_rule': self._format_security_group_rule(
context,
security_group_rule)}
def _security_group_rule_exists(self, security_group, values):
"""Indicates whether the specified rule values are already
defined in the given security group.
"""
for rule in security_group.rules:
if 'group_id' in values:
if rule['group_id'] == values['group_id']:
return True
else:
is_duplicate = True
for key in ('cidr', 'from_port', 'to_port', 'protocol'):
if rule[key] != values[key]:
is_duplicate = False
break
if is_duplicate:
return True
return False
def _rule_args_to_dict(self, context, to_port=None, from_port=None,
parent_group_id=None, ip_protocol=None,
cidr=None, group_id=None):
values = {}
if group_id:
try:
parent_group_id = int(parent_group_id)
group_id = int(group_id)
except ValueError:
msg = _("Parent or group id is not integer")
raise exception.InvalidInput(reason=msg)
if parent_group_id == group_id:
msg = _("Parent group id and group id cannot be same")
raise exception.InvalidInput(reason=msg)
values['group_id'] = group_id
#check if groupId exists
db.security_group_get(context, group_id)
elif cidr:
# If this fails, it throws an exception. This is what we want.
try:
cidr = urllib.unquote(cidr).decode()
netaddr.IPNetwork(cidr)
except Exception:
raise exception.InvalidCidr(cidr=cidr)
values['cidr'] = cidr
else:
values['cidr'] = '0.0.0.0/0'
if ip_protocol and from_port and to_port:
try:
from_port = int(from_port)
to_port = int(to_port)
except ValueError:
raise exception.InvalidPortRange(from_port=from_port,
to_port=to_port)
ip_protocol = str(ip_protocol)
if ip_protocol.upper() not in ['TCP', 'UDP', 'ICMP']:
raise exception.InvalidIpProtocol(protocol=ip_protocol)
if ((min(from_port, to_port) < -1) or
(max(from_port, to_port) > 65535)):
raise exception.InvalidPortRange(from_port=from_port,
to_port=to_port)
values['protocol'] = ip_protocol
values['from_port'] = from_port
values['to_port'] = to_port
else:
# If cidr based filtering, protocol and ports are mandatory
if 'cidr' in values:
return None
return values
def delete(self, req, id):
context = req.environ['nova.context']
self.compute_api.ensure_default_security_group(context)
try:
id = int(id)
rule = db.security_group_rule_get(context, id)
except ValueError:
msg = _("Rule id is not integer")
return exc.HTTPBadRequest(explanation=msg)
except exception.NotFound as exp:
msg = _("Rule (%s) not found") % id
return exc.HTTPNotFound(explanation=msg)
group_id = rule.parent_group_id
self.compute_api.ensure_default_security_group(context)
security_group = db.security_group_get(context, group_id)
msg = _("Revoke security group ingress %s")
LOG.audit(msg, security_group['name'], context=context)
db.security_group_rule_destroy(context, rule['id'])
self.compute_api.trigger_security_group_rules_refresh(context,
security_group_id=security_group['id'])
return exc.HTTPAccepted()
class Security_groups(extensions.ExtensionDescriptor):
def get_name(self):
return "SecurityGroups"
def get_alias(self):
return "security_groups"
def get_description(self):
return "Security group support"
def get_namespace(self):
return "http://docs.openstack.org/ext/securitygroups/api/v1.1"
def get_updated(self):
return "2011-07-21T00:00:00+00:00"
def get_resources(self):
resources = []
metadata = _get_metadata()
body_serializers = {
'application/xml': wsgi.XMLDictSerializer(metadata=metadata,
xmlns=wsgi.XMLNS_V11),
}
serializer = wsgi.ResponseSerializer(body_serializers, None)
body_deserializers = {
'application/xml': SecurityGroupXMLDeserializer(),
}
deserializer = wsgi.RequestDeserializer(body_deserializers)
res = extensions.ResourceExtension('os-security-groups',
controller=SecurityGroupController(),
deserializer=deserializer,
serializer=serializer)
resources.append(res)
body_deserializers = {
'application/xml': SecurityGroupRulesXMLDeserializer(),
}
deserializer = wsgi.RequestDeserializer(body_deserializers)
res = extensions.ResourceExtension('os-security-group-rules',
controller=SecurityGroupRulesController(),
deserializer=deserializer,
serializer=serializer)
resources.append(res)
return resources
class SecurityGroupXMLDeserializer(wsgi.MetadataXMLDeserializer):
"""
Deserializer to handle xml-formatted security group requests.
"""
def create(self, string):
"""Deserialize an xml-formatted security group create request"""
dom = minidom.parseString(string)
security_group = {}
sg_node = self.find_first_child_named(dom,
'security_group')
if sg_node is not None:
if sg_node.hasAttribute('name'):
security_group['name'] = sg_node.getAttribute('name')
desc_node = self.find_first_child_named(sg_node,
"description")
if desc_node:
security_group['description'] = self.extract_text(desc_node)
return {'body': {'security_group': security_group}}
class SecurityGroupRulesXMLDeserializer(wsgi.MetadataXMLDeserializer):
"""
Deserializer to handle xml-formatted security group requests.
"""
def create(self, string):
"""Deserialize an xml-formatted security group create request"""
dom = minidom.parseString(string)
security_group_rule = self._extract_security_group_rule(dom)
return {'body': {'security_group_rule': security_group_rule}}
def _extract_security_group_rule(self, node):
"""Marshal the security group rule attribute of a parsed request"""
sg_rule = {}
sg_rule_node = self.find_first_child_named(node,
'security_group_rule')
if sg_rule_node is not None:
ip_protocol_node = self.find_first_child_named(sg_rule_node,
"ip_protocol")
if ip_protocol_node is not None:
sg_rule['ip_protocol'] = self.extract_text(ip_protocol_node)
from_port_node = self.find_first_child_named(sg_rule_node,
"from_port")
if from_port_node is not None:
sg_rule['from_port'] = self.extract_text(from_port_node)
to_port_node = self.find_first_child_named(sg_rule_node, "to_port")
if to_port_node is not None:
sg_rule['to_port'] = self.extract_text(to_port_node)
parent_group_id_node = self.find_first_child_named(sg_rule_node,
"parent_group_id")
if parent_group_id_node is not None:
sg_rule['parent_group_id'] = self.extract_text(
parent_group_id_node)
group_id_node = self.find_first_child_named(sg_rule_node,
"group_id")
if group_id_node is not None:
sg_rule['group_id'] = self.extract_text(group_id_node)
cidr_node = self.find_first_child_named(sg_rule_node, "cidr")
if cidr_node is not None:
sg_rule['cidr'] = self.extract_text(cidr_node)
return sg_rule
def _get_metadata():
metadata = {
"attributes": {
"security_group": ["id", "tenant_id", "name"],
"rule": ["id", "parent_group_id"],
"security_group_rule": ["id", "parent_group_id"],
}
}
return metadata

View File

@ -266,9 +266,13 @@ class ExtensionMiddleware(base_wsgi.Middleware):
for resource in ext_mgr.get_resources():
LOG.debug(_('Extended resource: %s'),
resource.collection)
if resource.serializer is None:
resource.serializer = serializer
mapper.resource(resource.collection, resource.collection,
controller=wsgi.Resource(
resource.controller, serializer=serializer),
resource.controller, resource.deserializer,
resource.serializer),
collection=resource.collection_actions,
member=resource.member_actions,
parent_resource=resource.parent)
@ -461,7 +465,8 @@ class ResourceExtension(object):
"""Add top level resources to the OpenStack API in nova."""
def __init__(self, collection, controller, parent=None,
collection_actions=None, member_actions=None):
collection_actions=None, member_actions=None,
deserializer=None, serializer=None):
if not collection_actions:
collection_actions = {}
if not member_actions:
@ -471,6 +476,8 @@ class ResourceExtension(object):
self.parent = parent
self.collection_actions = collection_actions
self.member_actions = member_actions
self.deserializer = deserializer
self.serializer = serializer
class ExtensionsXMLSerializer(wsgi.XMLDictSerializer):

View File

@ -1102,6 +1102,11 @@ def security_group_rule_destroy(context, security_group_rule_id):
return IMPL.security_group_rule_destroy(context, security_group_rule_id)
def security_group_rule_get(context, security_group_rule_id):
"""Gets a security group rule."""
return IMPL.security_group_rule_get(context, security_group_rule_id)
###################

View File

@ -209,6 +209,10 @@ class InvalidContentType(Invalid):
message = _("Invalid content type %(content_type)s.")
class InvalidCidr(Invalid):
message = _("Invalid cidr %(cidr)s.")
# Cannot be templated as the error syntax varies.
# msg needs to be constructed when raised.
class InvalidParameterValue(Invalid):

View File

@ -0,0 +1,761 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import unittest
import webob
from xml.dom import minidom
from nova import test
from nova.api.openstack.contrib import security_groups
from nova.tests.api.openstack import fakes
def _get_create_request_json(body_dict):
req = webob.Request.blank('/v1.1/os-security-groups')
req.headers['Content-Type'] = 'application/json'
req.method = 'POST'
req.body = json.dumps(body_dict)
return req
def _create_security_group_json(security_group):
body_dict = _create_security_group_request_dict(security_group)
request = _get_create_request_json(body_dict)
response = request.get_response(fakes.wsgi_app())
return response
def _create_security_group_request_dict(security_group):
sg = {}
if security_group is not None:
name = security_group.get('name', None)
description = security_group.get('description', None)
if name:
sg['name'] = security_group['name']
if description:
sg['description'] = security_group['description']
return {'security_group': sg}
class TestSecurityGroups(test.TestCase):
def setUp(self):
super(TestSecurityGroups, self).setUp()
def tearDown(self):
super(TestSecurityGroups, self).tearDown()
def _create_security_group_request_dict(self, security_group):
sg = {}
if security_group is not None:
name = security_group.get('name', None)
description = security_group.get('description', None)
if name:
sg['name'] = security_group['name']
if description:
sg['description'] = security_group['description']
return {'security_group': sg}
def _format_create_xml_request_body(self, body_dict):
sg = body_dict['security_group']
body_parts = []
body_parts.extend([
'<?xml version="1.0" encoding="UTF-8"?>',
'<security_group xmlns="http://docs.openstack.org/ext/'
'securitygroups/api/v1.1"',
' name="%s">' % (sg['name'])])
if 'description' in sg:
body_parts.append('<description>%s</description>'
% sg['description'])
body_parts.append('</security_group>')
return ''.join(body_parts)
def _get_create_request_xml(self, body_dict):
req = webob.Request.blank('/v1.1/os-security-groups')
req.headers['Content-Type'] = 'application/xml'
req.content_type = 'application/xml'
req.accept = 'application/xml'
req.method = 'POST'
req.body = self._format_create_xml_request_body(body_dict)
return req
def _create_security_group_xml(self, security_group):
body_dict = self._create_security_group_request_dict(security_group)
request = self._get_create_request_xml(body_dict)
response = request.get_response(fakes.wsgi_app())
return response
def _delete_security_group(self, id):
request = webob.Request.blank('/v1.1/os-security-groups/%s'
% id)
request.method = 'DELETE'
response = request.get_response(fakes.wsgi_app())
return response
def test_create_security_group_json(self):
security_group = {}
security_group['name'] = "test"
security_group['description'] = "group-description"
response = _create_security_group_json(security_group)
res_dict = json.loads(response.body)
self.assertEqual(res_dict['security_group']['name'], "test")
self.assertEqual(res_dict['security_group']['description'],
"group-description")
self.assertEquals(response.status_int, 200)
def test_create_security_group_xml(self):
security_group = {}
security_group['name'] = "test"
security_group['description'] = "group-description"
response = \
self._create_security_group_xml(security_group)
self.assertEquals(response.status_int, 200)
dom = minidom.parseString(response.body)
sg = dom.childNodes[0]
self.assertEquals(sg.nodeName, 'security_group')
self.assertEqual(security_group['name'], sg.getAttribute('name'))
def test_create_security_group_with_no_name_json(self):
security_group = {}
security_group['description'] = "group-description"
response = _create_security_group_json(security_group)
self.assertEquals(response.status_int, 400)
def test_create_security_group_with_no_description_json(self):
security_group = {}
security_group['name'] = "test"
response = _create_security_group_json(security_group)
self.assertEquals(response.status_int, 400)
def test_create_security_group_with_blank_name_json(self):
security_group = {}
security_group['name'] = ""
security_group['description'] = "group-description"
response = _create_security_group_json(security_group)
self.assertEquals(response.status_int, 400)
def test_create_security_group_with_whitespace_name_json(self):
security_group = {}
security_group['name'] = " "
security_group['description'] = "group-description"
response = _create_security_group_json(security_group)
self.assertEquals(response.status_int, 400)
def test_create_security_group_with_blank_description_json(self):
security_group = {}
security_group['name'] = "test"
security_group['description'] = ""
response = _create_security_group_json(security_group)
self.assertEquals(response.status_int, 400)
def test_create_security_group_with_whitespace_description_json(self):
security_group = {}
security_group['name'] = "name"
security_group['description'] = " "
response = _create_security_group_json(security_group)
self.assertEquals(response.status_int, 400)
def test_create_security_group_with_duplicate_name_json(self):
security_group = {}
security_group['name'] = "test"
security_group['description'] = "group-description"
response = _create_security_group_json(security_group)
self.assertEquals(response.status_int, 200)
response = _create_security_group_json(security_group)
self.assertEquals(response.status_int, 400)
def test_create_security_group_with_no_body_json(self):
request = _get_create_request_json(body_dict=None)
response = request.get_response(fakes.wsgi_app())
self.assertEquals(response.status_int, 422)
def test_create_security_group_with_no_security_group(self):
body_dict = {}
body_dict['no-securityGroup'] = None
request = _get_create_request_json(body_dict)
response = request.get_response(fakes.wsgi_app())
self.assertEquals(response.status_int, 422)
def test_create_security_group_above_255_characters_name_json(self):
security_group = {}
security_group['name'] = ("1234567890123456"
"1234567890123456789012345678901234567890"
"1234567890123456789012345678901234567890"
"1234567890123456789012345678901234567890"
"1234567890123456789012345678901234567890"
"1234567890123456789012345678901234567890"
"1234567890123456789012345678901234567890")
security_group['description'] = "group-description"
response = _create_security_group_json(security_group)
self.assertEquals(response.status_int, 400)
def test_create_security_group_above_255_characters_description_json(self):
security_group = {}
security_group['name'] = "test"
security_group['description'] = ("1234567890123456"
"1234567890123456789012345678901234567890"
"1234567890123456789012345678901234567890"
"1234567890123456789012345678901234567890"
"1234567890123456789012345678901234567890"
"1234567890123456789012345678901234567890"
"1234567890123456789012345678901234567890")
response = _create_security_group_json(security_group)
self.assertEquals(response.status_int, 400)
def test_create_security_group_non_string_name_json(self):
security_group = {}
security_group['name'] = 12
security_group['description'] = "group-description"
response = _create_security_group_json(security_group)
self.assertEquals(response.status_int, 400)
def test_create_security_group_non_string_description_json(self):
security_group = {}
security_group['name'] = "test"
security_group['description'] = 12
response = _create_security_group_json(security_group)
self.assertEquals(response.status_int, 400)
def test_get_security_group_list(self):
security_group = {}
security_group['name'] = "test"
security_group['description'] = "group-description"
response = _create_security_group_json(security_group)
req = webob.Request.blank('/v1.1/os-security-groups')
req.headers['Content-Type'] = 'application/json'
req.method = 'GET'
response = req.get_response(fakes.wsgi_app())
res_dict = json.loads(response.body)
expected = {'security_groups': [
{'id': 1,
'name':"default",
'tenant_id': "fake",
"description":"default",
"rules": []
},
]
}
expected['security_groups'].append(
{
'id': 2,
'name': "test",
'tenant_id': "fake",
"description": "group-description",
"rules": []
}
)
self.assertEquals(response.status_int, 200)
self.assertEquals(res_dict, expected)
def test_get_security_group_by_id(self):
security_group = {}
security_group['name'] = "test"
security_group['description'] = "group-description"
response = _create_security_group_json(security_group)
res_dict = json.loads(response.body)
req = webob.Request.blank('/v1.1/os-security-groups/%s' %
res_dict['security_group']['id'])
req.headers['Content-Type'] = 'application/json'
req.method = 'GET'
response = req.get_response(fakes.wsgi_app())
res_dict = json.loads(response.body)
expected = {
'security_group': {
'id': 2,
'name': "test",
'tenant_id': "fake",
'description': "group-description",
'rules': []
}
}
self.assertEquals(response.status_int, 200)
self.assertEquals(res_dict, expected)
def test_get_security_group_by_invalid_id(self):
req = webob.Request.blank('/v1.1/os-security-groups/invalid')
req.headers['Content-Type'] = 'application/json'
req.method = 'GET'
response = req.get_response(fakes.wsgi_app())
self.assertEquals(response.status_int, 400)
def test_get_security_group_by_non_existing_id(self):
req = webob.Request.blank('/v1.1/os-security-groups/111111111')
req.headers['Content-Type'] = 'application/json'
req.method = 'GET'
response = req.get_response(fakes.wsgi_app())
self.assertEquals(response.status_int, 404)
def test_delete_security_group_by_id(self):
security_group = {}
security_group['name'] = "test"
security_group['description'] = "group-description"
response = _create_security_group_json(security_group)
security_group = json.loads(response.body)['security_group']
response = self._delete_security_group(security_group['id'])
self.assertEquals(response.status_int, 202)
response = self._delete_security_group(security_group['id'])
self.assertEquals(response.status_int, 404)
def test_delete_security_group_by_invalid_id(self):
response = self._delete_security_group('invalid')
self.assertEquals(response.status_int, 400)
def test_delete_security_group_by_non_existing_id(self):
response = self._delete_security_group(11111111)
self.assertEquals(response.status_int, 404)
class TestSecurityGroupRules(test.TestCase):
def setUp(self):
super(TestSecurityGroupRules, self).setUp()
security_group = {}
security_group['name'] = "authorize-revoke"
security_group['description'] = ("Security group created for "
" authorize-revoke testing")
response = _create_security_group_json(security_group)
security_group = json.loads(response.body)
self.parent_security_group = security_group['security_group']
rules = {
"security_group_rule": {
"ip_protocol": "tcp",
"from_port": "22",
"to_port": "22",
"parent_group_id": self.parent_security_group['id'],
"cidr": "10.0.0.0/24"
}
}
res = self._create_security_group_rule_json(rules)
self.assertEquals(res.status_int, 200)
self.security_group_rule = json.loads(res.body)['security_group_rule']
def tearDown(self):
super(TestSecurityGroupRules, self).tearDown()
def _create_security_group_rule_json(self, rules):
request = webob.Request.blank('/v1.1/os-security-group-rules')
request.headers['Content-Type'] = 'application/json'
request.method = 'POST'
request.body = json.dumps(rules)
response = request.get_response(fakes.wsgi_app())
return response
def _delete_security_group_rule(self, id):
request = webob.Request.blank('/v1.1/os-security-group-rules/%s'
% id)
request.method = 'DELETE'
response = request.get_response(fakes.wsgi_app())
return response
def test_create_by_cidr_json(self):
rules = {
"security_group_rule": {
"ip_protocol": "tcp",
"from_port": "22",
"to_port": "22",
"parent_group_id": 2,
"cidr": "10.2.3.124/24"
}
}
response = self._create_security_group_rule_json(rules)
security_group_rule = json.loads(response.body)['security_group_rule']
self.assertEquals(response.status_int, 200)
self.assertNotEquals(security_group_rule['id'], 0)
self.assertEquals(security_group_rule['parent_group_id'], 2)
self.assertEquals(security_group_rule['ip_range']['cidr'],
"10.2.3.124/24")
def test_create_by_group_id_json(self):
rules = {
"security_group_rule": {
"ip_protocol": "tcp",
"from_port": "22",
"to_port": "22",
"group_id": "1",
"parent_group_id": "%s"
% self.parent_security_group['id'],
}
}
response = self._create_security_group_rule_json(rules)
self.assertEquals(response.status_int, 200)
security_group_rule = json.loads(response.body)['security_group_rule']
self.assertNotEquals(security_group_rule['id'], 0)
self.assertEquals(security_group_rule['parent_group_id'], 2)
def test_create_add_existing_rules_json(self):
rules = {
"security_group_rule": {
"ip_protocol": "tcp",
"from_port": "22",
"to_port": "22",
"cidr": "10.0.0.0/24",
"parent_group_id": "%s" % self.parent_security_group['id'],
}
}
response = self._create_security_group_rule_json(rules)
self.assertEquals(response.status_int, 400)
def test_create_with_no_body_json(self):
request = webob.Request.blank('/v1.1/os-security-group-rules')
request.headers['Content-Type'] = 'application/json'
request.method = 'POST'
request.body = json.dumps(None)
response = request.get_response(fakes.wsgi_app())
self.assertEquals(response.status_int, 422)
def test_create_with_no_security_group_rule_in_body_json(self):
request = webob.Request.blank('/v1.1/os-security-group-rules')
request.headers['Content-Type'] = 'application/json'
request.method = 'POST'
body_dict = {'test': "test"}
request.body = json.dumps(body_dict)
response = request.get_response(fakes.wsgi_app())
self.assertEquals(response.status_int, 422)
def test_create_with_invalid_parent_group_id_json(self):
rules = {
"security_group_rule": {
"ip_protocol": "tcp",
"from_port": "22",
"to_port": "22",
"parent_group_id": "invalid"
}
}
response = self._create_security_group_rule_json(rules)
self.assertEquals(response.status_int, 400)
def test_create_with_non_existing_parent_group_id_json(self):
rules = {
"security_group_rule": {
"ip_protocol": "tcp",
"from_port": "22",
"to_port": "22",
"group_id": "invalid",
"parent_group_id": "1111111111111"
}
}
response = self._create_security_group_rule_json(rules)
self.assertEquals(response.status_int, 404)
def test_create_with_invalid_protocol_json(self):
rules = {
"security_group_rule": {
"ip_protocol": "invalid-protocol",
"from_port": "22",
"to_port": "22",
"cidr": "10.2.2.0/24",
"parent_group_id": "%s" % self.parent_security_group['id'],
}
}
response = self._create_security_group_rule_json(rules)
self.assertEquals(response.status_int, 400)
def test_create_with_no_protocol_json(self):
rules = {
"security_group_rule": {
"from_port": "22",
"to_port": "22",
"cidr": "10.2.2.0/24",
"parent_group_id": "%s" % self.parent_security_group['id'],
}
}
response = self._create_security_group_rule_json(rules)
self.assertEquals(response.status_int, 400)
def test_create_with_invalid_from_port_json(self):
rules = {
"security_group_rule": {
"ip_protocol": "tcp",
"from_port": "666666",
"to_port": "22",
"cidr": "10.2.2.0/24",
"parent_group_id": "%s" % self.parent_security_group['id'],
}
}
response = self._create_security_group_rule_json(rules)
self.assertEquals(response.status_int, 400)
def test_create_with_invalid_to_port_json(self):
rules = {
"security_group_rule": {
"ip_protocol": "tcp",
"from_port": "22",
"to_port": "666666",
"cidr": "10.2.2.0/24",
"parent_group_id": "%s" % self.parent_security_group['id'],
}
}
response = self._create_security_group_rule_json(rules)
self.assertEquals(response.status_int, 400)
def test_create_with_non_numerical_from_port_json(self):
rules = {
"security_group_rule": {
"ip_protocol": "tcp",
"from_port": "invalid",
"to_port": "22",
"cidr": "10.2.2.0/24",
"parent_group_id": "%s" % self.parent_security_group['id'],
}
}
response = self._create_security_group_rule_json(rules)
self.assertEquals(response.status_int, 400)
def test_create_with_non_numerical_to_port_json(self):
rules = {
"security_group_rule": {
"ip_protocol": "tcp",
"from_port": "22",
"to_port": "invalid",
"cidr": "10.2.2.0/24",
"parent_group_id": "%s" % self.parent_security_group['id'],
}
}
response = self._create_security_group_rule_json(rules)
self.assertEquals(response.status_int, 400)
def test_create_with_no_to_port_json(self):
rules = {
"security_group_rule": {
"ip_protocol": "tcp",
"from_port": "22",
"cidr": "10.2.2.0/24",
"parent_group_id": "%s" % self.parent_security_group['id'],
}
}
response = self._create_security_group_rule_json(rules)
self.assertEquals(response.status_int, 400)
def test_create_with_invalid_cidr_json(self):
rules = {
"security_group_rule": {
"ip_protocol": "tcp",
"from_port": "22",
"to_port": "22",
"cidr": "10.2.22222.0/24",
"parent_group_id": "%s" % self.parent_security_group['id'],
}
}
response = self._create_security_group_rule_json(rules)
self.assertEquals(response.status_int, 400)
def test_create_with_no_cidr_group_json(self):
rules = {
"security_group_rule": {
"ip_protocol": "tcp",
"from_port": "22",
"to_port": "22",
"parent_group_id": "%s" % self.parent_security_group['id'],
}
}
response = self._create_security_group_rule_json(rules)
security_group_rule = json.loads(response.body)['security_group_rule']
self.assertEquals(response.status_int, 200)
self.assertNotEquals(security_group_rule['id'], 0)
self.assertEquals(security_group_rule['parent_group_id'],
self.parent_security_group['id'])
self.assertEquals(security_group_rule['ip_range']['cidr'],
"0.0.0.0/0")
def test_create_with_invalid_group_id_json(self):
rules = {
"security_group_rule": {
"ip_protocol": "tcp",
"from_port": "22",
"to_port": "22",
"group_id": "invalid",
"parent_group_id": "%s" % self.parent_security_group['id'],
}
}
response = self._create_security_group_rule_json(rules)
self.assertEquals(response.status_int, 400)
def test_create_with_empty_group_id_json(self):
rules = {
"security_group_rule": {
"ip_protocol": "tcp",
"from_port": "22",
"to_port": "22",
"group_id": "invalid",
"parent_group_id": "%s" % self.parent_security_group['id'],
}
}
response = self._create_security_group_rule_json(rules)
self.assertEquals(response.status_int, 400)
def test_create_with_invalid_group_id_json(self):
rules = {
"security_group_rule": {
"ip_protocol": "tcp",
"from_port": "22",
"to_port": "22",
"group_id": "222222",
"parent_group_id": "%s" % self.parent_security_group['id'],
}
}
response = self._create_security_group_rule_json(rules)
self.assertEquals(response.status_int, 400)
def test_create_rule_with_same_group_parent_id_json(self):
rules = {
"security_group_rule": {
"ip_protocol": "tcp",
"from_port": "22",
"to_port": "22",
"group_id": "%s" % self.parent_security_group['id'],
"parent_group_id": "%s" % self.parent_security_group['id'],
}
}
response = self._create_security_group_rule_json(rules)
self.assertEquals(response.status_int, 400)
def test_delete(self):
response = self._delete_security_group_rule(
self.security_group_rule['id'])
self.assertEquals(response.status_int, 202)
response = self._delete_security_group_rule(
self.security_group_rule['id'])
self.assertEquals(response.status_int, 404)
def test_delete_invalid_rule_id(self):
response = self._delete_security_group_rule('invalid')
self.assertEquals(response.status_int, 400)
def test_delete_non_existing_rule_id(self):
response = self._delete_security_group_rule(22222222222222)
self.assertEquals(response.status_int, 404)
class TestSecurityGroupRulesXMLDeserializer(unittest.TestCase):
def setUp(self):
self.deserializer = security_groups.SecurityGroupRulesXMLDeserializer()
def test_create_request(self):
serial_request = """
<security_group_rule>
<parent_group_id>12</parent_group_id>
<from_port>22</from_port>
<to_port>22</to_port>
<group_id></group_id>
<ip_protocol>tcp</ip_protocol>
<cidr>10.0.0.0/24</cidr>
</security_group_rule>"""
request = self.deserializer.deserialize(serial_request, 'create')
expected = {
"security_group_rule": {
"parent_group_id": "12",
"from_port": "22",
"to_port": "22",
"ip_protocol": "tcp",
"group_id": "",
"cidr": "10.0.0.0/24",
},
}
self.assertEquals(request['body'], expected)
def test_create_no_protocol_request(self):
serial_request = """
<security_group_rule>
<parent_group_id>12</parent_group_id>
<from_port>22</from_port>
<to_port>22</to_port>
<group_id></group_id>
<cidr>10.0.0.0/24</cidr>
</security_group_rule>"""
request = self.deserializer.deserialize(serial_request, 'create')
expected = {
"security_group_rule": {
"parent_group_id": "12",
"from_port": "22",
"to_port": "22",
"group_id": "",
"cidr": "10.0.0.0/24",
},
}
self.assertEquals(request['body'], expected)
class TestSecurityGroupXMLDeserializer(unittest.TestCase):
def setUp(self):
self.deserializer = security_groups.SecurityGroupXMLDeserializer()
def test_create_request(self):
serial_request = """
<security_group name="test">
<description>test</description>
</security_group>"""
request = self.deserializer.deserialize(serial_request, 'create')
expected = {
"security_group": {
"name": "test",
"description": "test",
},
}
self.assertEquals(request['body'], expected)
def test_create_no_description_request(self):
serial_request = """
<security_group name="test">
</security_group>"""
request = self.deserializer.deserialize(serial_request, 'create')
expected = {
"security_group": {
"name": "test",
},
}
self.assertEquals(request['body'], expected)
def test_create_no_name_request(self):
serial_request = """
<security_group>
<description>test</description>
</security_group>"""
request = self.deserializer.deserialize(serial_request, 'create')
expected = {
"security_group": {
"description": "test",
},
}
self.assertEquals(request['body'], expected)

View File

@ -97,7 +97,8 @@ class ExtensionControllerTest(test.TestCase):
names = [x['name'] for x in data['extensions']]
names.sort()
self.assertEqual(names, ["FlavorExtraSpecs", "Floating_ips",
"Fox In Socks", "Hosts", "Keypairs", "Multinic", "Volumes"])
"Fox In Socks", "Hosts", "Keypairs", "Multinic", "SecurityGroups",
"Volumes"])
# Make sure that at least Fox in Sox is correct.
(fox_ext,) = [
@ -108,7 +109,7 @@ class ExtensionControllerTest(test.TestCase):
'updated': '2011-01-22T13:25:27-06:00',
'description': 'The Fox In Socks Extension',
'alias': 'FOXNSOX',
'links': [],
'links': []
},
)
@ -144,7 +145,7 @@ class ExtensionControllerTest(test.TestCase):
# Make sure we have all the extensions.
exts = root.findall('{0}extension'.format(NS))
self.assertEqual(len(exts), 7)
self.assertEqual(len(exts), 8)
# Make sure that at least Fox in Sox is correct.
(fox_ext,) = [x for x in exts if x.get('alias') == 'FOXNSOX']