diff --git a/nova/api/openstack/contrib/security_groups.py b/nova/api/openstack/contrib/security_groups.py
new file mode 100644
index 000000000000..6c57fbb5181d
--- /dev/null
+++ b/nova/api/openstack/contrib/security_groups.py
@@ -0,0 +1,466 @@
+# Copyright 2011 OpenStack LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""The security groups extension."""
+
+import netaddr
+import urllib
+from webob import exc
+import webob
+
+from nova import compute
+from nova import db
+from nova import exception
+from nova import flags
+from nova import log as logging
+from nova.api.openstack import common
+from nova.api.openstack import extensions
+from nova.api.openstack import wsgi
+
+
+from xml.dom import minidom
+
+
+LOG = logging.getLogger("nova.api.contrib.security_groups")
+FLAGS = flags.FLAGS
+
+
+class SecurityGroupController(object):
+ """The Security group API controller for the OpenStack API."""
+
+ def __init__(self):
+ self.compute_api = compute.API()
+ super(SecurityGroupController, self).__init__()
+
+ def _format_security_group_rule(self, context, rule):
+ sg_rule = {}
+ sg_rule['id'] = rule.id
+ sg_rule['parent_group_id'] = rule.parent_group_id
+ sg_rule['ip_protocol'] = rule.protocol
+ sg_rule['from_port'] = rule.from_port
+ sg_rule['to_port'] = rule.to_port
+ sg_rule['group'] = {}
+ sg_rule['ip_range'] = {}
+ if rule.group_id:
+ source_group = db.security_group_get(context, rule.group_id)
+ sg_rule['group'] = {'name': source_group.name,
+ 'tenant_id': source_group.project_id}
+ else:
+ sg_rule['ip_range'] = {'cidr': rule.cidr}
+ return sg_rule
+
+ def _format_security_group(self, context, group):
+ security_group = {}
+ security_group['id'] = group.id
+ security_group['description'] = group.description
+ security_group['name'] = group.name
+ security_group['tenant_id'] = group.project_id
+ security_group['rules'] = []
+ for rule in group.rules:
+ security_group['rules'] += [self._format_security_group_rule(
+ context, rule)]
+ return security_group
+
+ def show(self, req, id):
+ """Return data about the given security group."""
+ context = req.environ['nova.context']
+ try:
+ id = int(id)
+ security_group = db.security_group_get(context, id)
+ except ValueError:
+ msg = _("Security group id is not integer")
+ return exc.HTTPBadRequest(explanation=msg)
+ except exception.NotFound as exp:
+ return exc.HTTPNotFound(explanation=unicode(exp))
+
+ return {'security_group': self._format_security_group(context,
+ security_group)}
+
+ def delete(self, req, id):
+ """Delete a security group."""
+ context = req.environ['nova.context']
+ try:
+ id = int(id)
+ security_group = db.security_group_get(context, id)
+ except ValueError:
+ msg = _("Security group id is not integer")
+ return exc.HTTPBadRequest(explanation=msg)
+ except exception.SecurityGroupNotFound as exp:
+ return exc.HTTPNotFound(explanation=unicode(exp))
+
+ LOG.audit(_("Delete security group %s"), id, context=context)
+ db.security_group_destroy(context, security_group.id)
+
+ return exc.HTTPAccepted()
+
+ def index(self, req):
+ """Returns a list of security groups"""
+ context = req.environ['nova.context']
+
+ self.compute_api.ensure_default_security_group(context)
+ groups = db.security_group_get_by_project(context,
+ context.project_id)
+ limited_list = common.limited(groups, req)
+ result = [self._format_security_group(context, group)
+ for group in limited_list]
+
+ return {'security_groups':
+ list(sorted(result,
+ key=lambda k: (k['tenant_id'], k['name'])))}
+
+ def create(self, req, body):
+ """Creates a new security group."""
+ context = req.environ['nova.context']
+ if not body:
+ return exc.HTTPUnprocessableEntity()
+
+ security_group = body.get('security_group', None)
+
+ if security_group is None:
+ return exc.HTTPUnprocessableEntity()
+
+ group_name = security_group.get('name', None)
+ group_description = security_group.get('description', None)
+
+ self._validate_security_group_property(group_name, "name")
+ self._validate_security_group_property(group_description,
+ "description")
+ group_name = group_name.strip()
+ group_description = group_description.strip()
+
+ LOG.audit(_("Create Security Group %s"), group_name, context=context)
+ self.compute_api.ensure_default_security_group(context)
+ if db.security_group_exists(context, context.project_id, group_name):
+ msg = _('Security group %s already exists') % group_name
+ raise exc.HTTPBadRequest(explanation=msg)
+
+ group = {'user_id': context.user_id,
+ 'project_id': context.project_id,
+ 'name': group_name,
+ 'description': group_description}
+ group_ref = db.security_group_create(context, group)
+
+ return {'security_group': self._format_security_group(context,
+ group_ref)}
+
+ def _validate_security_group_property(self, value, typ):
+ """ typ will be either 'name' or 'description',
+ depending on the caller
+ """
+ try:
+ val = value.strip()
+ except AttributeError:
+ msg = _("Security group %s is not a string or unicode") % typ
+ raise exc.HTTPBadRequest(explanation=msg)
+ if not val:
+ msg = _("Security group %s cannot be empty.") % typ
+ raise exc.HTTPBadRequest(explanation=msg)
+ if len(val) > 255:
+ msg = _("Security group %s should not be greater "
+ "than 255 characters.") % typ
+ raise exc.HTTPBadRequest(explanation=msg)
+
+
+class SecurityGroupRulesController(SecurityGroupController):
+
+ def create(self, req, body):
+ context = req.environ['nova.context']
+
+ if not body:
+ raise exc.HTTPUnprocessableEntity()
+
+ if not 'security_group_rule' in body:
+ raise exc.HTTPUnprocessableEntity()
+
+ self.compute_api.ensure_default_security_group(context)
+
+ sg_rule = body['security_group_rule']
+ parent_group_id = sg_rule.get('parent_group_id', None)
+ try:
+ parent_group_id = int(parent_group_id)
+ security_group = db.security_group_get(context, parent_group_id)
+ except ValueError:
+ msg = _("Parent group id is not integer")
+ return exc.HTTPBadRequest(explanation=msg)
+ except exception.NotFound as exp:
+ msg = _("Security group (%s) not found") % parent_group_id
+ return exc.HTTPNotFound(explanation=msg)
+
+ msg = _("Authorize security group ingress %s")
+ LOG.audit(msg, security_group['name'], context=context)
+
+ try:
+ values = self._rule_args_to_dict(context,
+ to_port=sg_rule.get('to_port'),
+ from_port=sg_rule.get('from_port'),
+ parent_group_id=sg_rule.get('parent_group_id'),
+ ip_protocol=sg_rule.get('ip_protocol'),
+ cidr=sg_rule.get('cidr'),
+ group_id=sg_rule.get('group_id'))
+ except Exception as exp:
+ raise exc.HTTPBadRequest(explanation=unicode(exp))
+
+ if values is None:
+ msg = _("Not enough parameters to build a "
+ "valid rule.")
+ raise exc.HTTPBadRequest(explanation=msg)
+
+ values['parent_group_id'] = security_group.id
+
+ if self._security_group_rule_exists(security_group, values):
+ msg = _('This rule already exists in group %s') % parent_group_id
+ raise exc.HTTPBadRequest(explanation=msg)
+
+ security_group_rule = db.security_group_rule_create(context, values)
+
+ self.compute_api.trigger_security_group_rules_refresh(context,
+ security_group_id=security_group['id'])
+
+ return {'security_group_rule': self._format_security_group_rule(
+ context,
+ security_group_rule)}
+
+ def _security_group_rule_exists(self, security_group, values):
+ """Indicates whether the specified rule values are already
+ defined in the given security group.
+ """
+ for rule in security_group.rules:
+ if 'group_id' in values:
+ if rule['group_id'] == values['group_id']:
+ return True
+ else:
+ is_duplicate = True
+ for key in ('cidr', 'from_port', 'to_port', 'protocol'):
+ if rule[key] != values[key]:
+ is_duplicate = False
+ break
+ if is_duplicate:
+ return True
+ return False
+
+ def _rule_args_to_dict(self, context, to_port=None, from_port=None,
+ parent_group_id=None, ip_protocol=None,
+ cidr=None, group_id=None):
+ values = {}
+
+ if group_id:
+ try:
+ parent_group_id = int(parent_group_id)
+ group_id = int(group_id)
+ except ValueError:
+ msg = _("Parent or group id is not integer")
+ raise exception.InvalidInput(reason=msg)
+
+ if parent_group_id == group_id:
+ msg = _("Parent group id and group id cannot be same")
+ raise exception.InvalidInput(reason=msg)
+
+ values['group_id'] = group_id
+ #check if groupId exists
+ db.security_group_get(context, group_id)
+ elif cidr:
+ # If this fails, it throws an exception. This is what we want.
+ try:
+ cidr = urllib.unquote(cidr).decode()
+ netaddr.IPNetwork(cidr)
+ except Exception:
+ raise exception.InvalidCidr(cidr=cidr)
+ values['cidr'] = cidr
+ else:
+ values['cidr'] = '0.0.0.0/0'
+
+ if ip_protocol and from_port and to_port:
+
+ try:
+ from_port = int(from_port)
+ to_port = int(to_port)
+ except ValueError:
+ raise exception.InvalidPortRange(from_port=from_port,
+ to_port=to_port)
+ ip_protocol = str(ip_protocol)
+ if ip_protocol.upper() not in ['TCP', 'UDP', 'ICMP']:
+ raise exception.InvalidIpProtocol(protocol=ip_protocol)
+ if ((min(from_port, to_port) < -1) or
+ (max(from_port, to_port) > 65535)):
+ raise exception.InvalidPortRange(from_port=from_port,
+ to_port=to_port)
+
+ values['protocol'] = ip_protocol
+ values['from_port'] = from_port
+ values['to_port'] = to_port
+ else:
+ # If cidr based filtering, protocol and ports are mandatory
+ if 'cidr' in values:
+ return None
+
+ return values
+
+ def delete(self, req, id):
+ context = req.environ['nova.context']
+
+ self.compute_api.ensure_default_security_group(context)
+ try:
+ id = int(id)
+ rule = db.security_group_rule_get(context, id)
+ except ValueError:
+ msg = _("Rule id is not integer")
+ return exc.HTTPBadRequest(explanation=msg)
+ except exception.NotFound as exp:
+ msg = _("Rule (%s) not found") % id
+ return exc.HTTPNotFound(explanation=msg)
+
+ group_id = rule.parent_group_id
+ self.compute_api.ensure_default_security_group(context)
+ security_group = db.security_group_get(context, group_id)
+
+ msg = _("Revoke security group ingress %s")
+ LOG.audit(msg, security_group['name'], context=context)
+
+ db.security_group_rule_destroy(context, rule['id'])
+ self.compute_api.trigger_security_group_rules_refresh(context,
+ security_group_id=security_group['id'])
+
+ return exc.HTTPAccepted()
+
+
+class Security_groups(extensions.ExtensionDescriptor):
+ def get_name(self):
+ return "SecurityGroups"
+
+ def get_alias(self):
+ return "security_groups"
+
+ def get_description(self):
+ return "Security group support"
+
+ def get_namespace(self):
+ return "http://docs.openstack.org/ext/securitygroups/api/v1.1"
+
+ def get_updated(self):
+ return "2011-07-21T00:00:00+00:00"
+
+ def get_resources(self):
+ resources = []
+
+ metadata = _get_metadata()
+ body_serializers = {
+ 'application/xml': wsgi.XMLDictSerializer(metadata=metadata,
+ xmlns=wsgi.XMLNS_V11),
+ }
+ serializer = wsgi.ResponseSerializer(body_serializers, None)
+
+ body_deserializers = {
+ 'application/xml': SecurityGroupXMLDeserializer(),
+ }
+ deserializer = wsgi.RequestDeserializer(body_deserializers)
+
+ res = extensions.ResourceExtension('os-security-groups',
+ controller=SecurityGroupController(),
+ deserializer=deserializer,
+ serializer=serializer)
+
+ resources.append(res)
+
+ body_deserializers = {
+ 'application/xml': SecurityGroupRulesXMLDeserializer(),
+ }
+ deserializer = wsgi.RequestDeserializer(body_deserializers)
+
+ res = extensions.ResourceExtension('os-security-group-rules',
+ controller=SecurityGroupRulesController(),
+ deserializer=deserializer,
+ serializer=serializer)
+ resources.append(res)
+ return resources
+
+
+class SecurityGroupXMLDeserializer(wsgi.MetadataXMLDeserializer):
+ """
+ Deserializer to handle xml-formatted security group requests.
+ """
+ def create(self, string):
+ """Deserialize an xml-formatted security group create request"""
+ dom = minidom.parseString(string)
+ security_group = {}
+ sg_node = self.find_first_child_named(dom,
+ 'security_group')
+ if sg_node is not None:
+ if sg_node.hasAttribute('name'):
+ security_group['name'] = sg_node.getAttribute('name')
+ desc_node = self.find_first_child_named(sg_node,
+ "description")
+ if desc_node:
+ security_group['description'] = self.extract_text(desc_node)
+ return {'body': {'security_group': security_group}}
+
+
+class SecurityGroupRulesXMLDeserializer(wsgi.MetadataXMLDeserializer):
+ """
+ Deserializer to handle xml-formatted security group requests.
+ """
+
+ def create(self, string):
+ """Deserialize an xml-formatted security group create request"""
+ dom = minidom.parseString(string)
+ security_group_rule = self._extract_security_group_rule(dom)
+ return {'body': {'security_group_rule': security_group_rule}}
+
+ def _extract_security_group_rule(self, node):
+ """Marshal the security group rule attribute of a parsed request"""
+ sg_rule = {}
+ sg_rule_node = self.find_first_child_named(node,
+ 'security_group_rule')
+ if sg_rule_node is not None:
+ ip_protocol_node = self.find_first_child_named(sg_rule_node,
+ "ip_protocol")
+ if ip_protocol_node is not None:
+ sg_rule['ip_protocol'] = self.extract_text(ip_protocol_node)
+
+ from_port_node = self.find_first_child_named(sg_rule_node,
+ "from_port")
+ if from_port_node is not None:
+ sg_rule['from_port'] = self.extract_text(from_port_node)
+
+ to_port_node = self.find_first_child_named(sg_rule_node, "to_port")
+ if to_port_node is not None:
+ sg_rule['to_port'] = self.extract_text(to_port_node)
+
+ parent_group_id_node = self.find_first_child_named(sg_rule_node,
+ "parent_group_id")
+ if parent_group_id_node is not None:
+ sg_rule['parent_group_id'] = self.extract_text(
+ parent_group_id_node)
+
+ group_id_node = self.find_first_child_named(sg_rule_node,
+ "group_id")
+ if group_id_node is not None:
+ sg_rule['group_id'] = self.extract_text(group_id_node)
+
+ cidr_node = self.find_first_child_named(sg_rule_node, "cidr")
+ if cidr_node is not None:
+ sg_rule['cidr'] = self.extract_text(cidr_node)
+
+ return sg_rule
+
+
+def _get_metadata():
+ metadata = {
+ "attributes": {
+ "security_group": ["id", "tenant_id", "name"],
+ "rule": ["id", "parent_group_id"],
+ "security_group_rule": ["id", "parent_group_id"],
+ }
+ }
+ return metadata
diff --git a/nova/api/openstack/extensions.py b/nova/api/openstack/extensions.py
index b1538950fc29..bb407a04541b 100644
--- a/nova/api/openstack/extensions.py
+++ b/nova/api/openstack/extensions.py
@@ -266,9 +266,13 @@ class ExtensionMiddleware(base_wsgi.Middleware):
for resource in ext_mgr.get_resources():
LOG.debug(_('Extended resource: %s'),
resource.collection)
+ if resource.serializer is None:
+ resource.serializer = serializer
+
mapper.resource(resource.collection, resource.collection,
controller=wsgi.Resource(
- resource.controller, serializer=serializer),
+ resource.controller, resource.deserializer,
+ resource.serializer),
collection=resource.collection_actions,
member=resource.member_actions,
parent_resource=resource.parent)
@@ -461,7 +465,8 @@ class ResourceExtension(object):
"""Add top level resources to the OpenStack API in nova."""
def __init__(self, collection, controller, parent=None,
- collection_actions=None, member_actions=None):
+ collection_actions=None, member_actions=None,
+ deserializer=None, serializer=None):
if not collection_actions:
collection_actions = {}
if not member_actions:
@@ -471,6 +476,8 @@ class ResourceExtension(object):
self.parent = parent
self.collection_actions = collection_actions
self.member_actions = member_actions
+ self.deserializer = deserializer
+ self.serializer = serializer
class ExtensionsXMLSerializer(wsgi.XMLDictSerializer):
diff --git a/nova/db/api.py b/nova/db/api.py
index 3d0727f8b9c4..0f22187523c3 100644
--- a/nova/db/api.py
+++ b/nova/db/api.py
@@ -1102,6 +1102,11 @@ def security_group_rule_destroy(context, security_group_rule_id):
return IMPL.security_group_rule_destroy(context, security_group_rule_id)
+def security_group_rule_get(context, security_group_rule_id):
+ """Gets a security group rule."""
+ return IMPL.security_group_rule_get(context, security_group_rule_id)
+
+
###################
diff --git a/nova/exception.py b/nova/exception.py
index 0d60cb0bfbab..a5a25086ea9e 100644
--- a/nova/exception.py
+++ b/nova/exception.py
@@ -209,6 +209,10 @@ class InvalidContentType(Invalid):
message = _("Invalid content type %(content_type)s.")
+class InvalidCidr(Invalid):
+ message = _("Invalid cidr %(cidr)s.")
+
+
# Cannot be templated as the error syntax varies.
# msg needs to be constructed when raised.
class InvalidParameterValue(Invalid):
diff --git a/nova/tests/api/openstack/contrib/test_security_groups.py b/nova/tests/api/openstack/contrib/test_security_groups.py
new file mode 100644
index 000000000000..4317880cac64
--- /dev/null
+++ b/nova/tests/api/openstack/contrib/test_security_groups.py
@@ -0,0 +1,761 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 OpenStack LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import json
+import unittest
+import webob
+from xml.dom import minidom
+
+from nova import test
+from nova.api.openstack.contrib import security_groups
+from nova.tests.api.openstack import fakes
+
+
+def _get_create_request_json(body_dict):
+ req = webob.Request.blank('/v1.1/os-security-groups')
+ req.headers['Content-Type'] = 'application/json'
+ req.method = 'POST'
+ req.body = json.dumps(body_dict)
+ return req
+
+
+def _create_security_group_json(security_group):
+ body_dict = _create_security_group_request_dict(security_group)
+ request = _get_create_request_json(body_dict)
+ response = request.get_response(fakes.wsgi_app())
+ return response
+
+
+def _create_security_group_request_dict(security_group):
+ sg = {}
+ if security_group is not None:
+ name = security_group.get('name', None)
+ description = security_group.get('description', None)
+ if name:
+ sg['name'] = security_group['name']
+ if description:
+ sg['description'] = security_group['description']
+ return {'security_group': sg}
+
+
+class TestSecurityGroups(test.TestCase):
+ def setUp(self):
+ super(TestSecurityGroups, self).setUp()
+
+ def tearDown(self):
+ super(TestSecurityGroups, self).tearDown()
+
+ def _create_security_group_request_dict(self, security_group):
+ sg = {}
+ if security_group is not None:
+ name = security_group.get('name', None)
+ description = security_group.get('description', None)
+ if name:
+ sg['name'] = security_group['name']
+ if description:
+ sg['description'] = security_group['description']
+ return {'security_group': sg}
+
+ def _format_create_xml_request_body(self, body_dict):
+ sg = body_dict['security_group']
+ body_parts = []
+ body_parts.extend([
+ '',
+ '' % (sg['name'])])
+ if 'description' in sg:
+ body_parts.append('%s'
+ % sg['description'])
+ body_parts.append('')
+ return ''.join(body_parts)
+
+ def _get_create_request_xml(self, body_dict):
+ req = webob.Request.blank('/v1.1/os-security-groups')
+ req.headers['Content-Type'] = 'application/xml'
+ req.content_type = 'application/xml'
+ req.accept = 'application/xml'
+ req.method = 'POST'
+ req.body = self._format_create_xml_request_body(body_dict)
+ return req
+
+ def _create_security_group_xml(self, security_group):
+ body_dict = self._create_security_group_request_dict(security_group)
+ request = self._get_create_request_xml(body_dict)
+ response = request.get_response(fakes.wsgi_app())
+ return response
+
+ def _delete_security_group(self, id):
+ request = webob.Request.blank('/v1.1/os-security-groups/%s'
+ % id)
+ request.method = 'DELETE'
+ response = request.get_response(fakes.wsgi_app())
+ return response
+
+ def test_create_security_group_json(self):
+ security_group = {}
+ security_group['name'] = "test"
+ security_group['description'] = "group-description"
+ response = _create_security_group_json(security_group)
+ res_dict = json.loads(response.body)
+ self.assertEqual(res_dict['security_group']['name'], "test")
+ self.assertEqual(res_dict['security_group']['description'],
+ "group-description")
+ self.assertEquals(response.status_int, 200)
+
+ def test_create_security_group_xml(self):
+ security_group = {}
+ security_group['name'] = "test"
+ security_group['description'] = "group-description"
+ response = \
+ self._create_security_group_xml(security_group)
+
+ self.assertEquals(response.status_int, 200)
+ dom = minidom.parseString(response.body)
+ sg = dom.childNodes[0]
+ self.assertEquals(sg.nodeName, 'security_group')
+ self.assertEqual(security_group['name'], sg.getAttribute('name'))
+
+ def test_create_security_group_with_no_name_json(self):
+ security_group = {}
+ security_group['description'] = "group-description"
+ response = _create_security_group_json(security_group)
+ self.assertEquals(response.status_int, 400)
+
+ def test_create_security_group_with_no_description_json(self):
+ security_group = {}
+ security_group['name'] = "test"
+ response = _create_security_group_json(security_group)
+ self.assertEquals(response.status_int, 400)
+
+ def test_create_security_group_with_blank_name_json(self):
+ security_group = {}
+ security_group['name'] = ""
+ security_group['description'] = "group-description"
+ response = _create_security_group_json(security_group)
+ self.assertEquals(response.status_int, 400)
+
+ def test_create_security_group_with_whitespace_name_json(self):
+ security_group = {}
+ security_group['name'] = " "
+ security_group['description'] = "group-description"
+ response = _create_security_group_json(security_group)
+ self.assertEquals(response.status_int, 400)
+
+ def test_create_security_group_with_blank_description_json(self):
+ security_group = {}
+ security_group['name'] = "test"
+ security_group['description'] = ""
+ response = _create_security_group_json(security_group)
+ self.assertEquals(response.status_int, 400)
+
+ def test_create_security_group_with_whitespace_description_json(self):
+ security_group = {}
+ security_group['name'] = "name"
+ security_group['description'] = " "
+ response = _create_security_group_json(security_group)
+ self.assertEquals(response.status_int, 400)
+
+ def test_create_security_group_with_duplicate_name_json(self):
+ security_group = {}
+ security_group['name'] = "test"
+ security_group['description'] = "group-description"
+ response = _create_security_group_json(security_group)
+
+ self.assertEquals(response.status_int, 200)
+ response = _create_security_group_json(security_group)
+ self.assertEquals(response.status_int, 400)
+
+ def test_create_security_group_with_no_body_json(self):
+ request = _get_create_request_json(body_dict=None)
+ response = request.get_response(fakes.wsgi_app())
+ self.assertEquals(response.status_int, 422)
+
+ def test_create_security_group_with_no_security_group(self):
+ body_dict = {}
+ body_dict['no-securityGroup'] = None
+ request = _get_create_request_json(body_dict)
+ response = request.get_response(fakes.wsgi_app())
+ self.assertEquals(response.status_int, 422)
+
+ def test_create_security_group_above_255_characters_name_json(self):
+ security_group = {}
+ security_group['name'] = ("1234567890123456"
+ "1234567890123456789012345678901234567890"
+ "1234567890123456789012345678901234567890"
+ "1234567890123456789012345678901234567890"
+ "1234567890123456789012345678901234567890"
+ "1234567890123456789012345678901234567890"
+ "1234567890123456789012345678901234567890")
+ security_group['description'] = "group-description"
+ response = _create_security_group_json(security_group)
+
+ self.assertEquals(response.status_int, 400)
+
+ def test_create_security_group_above_255_characters_description_json(self):
+ security_group = {}
+ security_group['name'] = "test"
+ security_group['description'] = ("1234567890123456"
+ "1234567890123456789012345678901234567890"
+ "1234567890123456789012345678901234567890"
+ "1234567890123456789012345678901234567890"
+ "1234567890123456789012345678901234567890"
+ "1234567890123456789012345678901234567890"
+ "1234567890123456789012345678901234567890")
+ response = _create_security_group_json(security_group)
+ self.assertEquals(response.status_int, 400)
+
+ def test_create_security_group_non_string_name_json(self):
+ security_group = {}
+ security_group['name'] = 12
+ security_group['description'] = "group-description"
+ response = _create_security_group_json(security_group)
+ self.assertEquals(response.status_int, 400)
+
+ def test_create_security_group_non_string_description_json(self):
+ security_group = {}
+ security_group['name'] = "test"
+ security_group['description'] = 12
+ response = _create_security_group_json(security_group)
+ self.assertEquals(response.status_int, 400)
+
+ def test_get_security_group_list(self):
+ security_group = {}
+ security_group['name'] = "test"
+ security_group['description'] = "group-description"
+ response = _create_security_group_json(security_group)
+
+ req = webob.Request.blank('/v1.1/os-security-groups')
+ req.headers['Content-Type'] = 'application/json'
+ req.method = 'GET'
+ response = req.get_response(fakes.wsgi_app())
+ res_dict = json.loads(response.body)
+
+ expected = {'security_groups': [
+ {'id': 1,
+ 'name':"default",
+ 'tenant_id': "fake",
+ "description":"default",
+ "rules": []
+ },
+ ]
+ }
+ expected['security_groups'].append(
+ {
+ 'id': 2,
+ 'name': "test",
+ 'tenant_id': "fake",
+ "description": "group-description",
+ "rules": []
+ }
+ )
+ self.assertEquals(response.status_int, 200)
+ self.assertEquals(res_dict, expected)
+
+ def test_get_security_group_by_id(self):
+ security_group = {}
+ security_group['name'] = "test"
+ security_group['description'] = "group-description"
+ response = _create_security_group_json(security_group)
+
+ res_dict = json.loads(response.body)
+ req = webob.Request.blank('/v1.1/os-security-groups/%s' %
+ res_dict['security_group']['id'])
+ req.headers['Content-Type'] = 'application/json'
+ req.method = 'GET'
+ response = req.get_response(fakes.wsgi_app())
+ res_dict = json.loads(response.body)
+
+ expected = {
+ 'security_group': {
+ 'id': 2,
+ 'name': "test",
+ 'tenant_id': "fake",
+ 'description': "group-description",
+ 'rules': []
+ }
+ }
+ self.assertEquals(response.status_int, 200)
+ self.assertEquals(res_dict, expected)
+
+ def test_get_security_group_by_invalid_id(self):
+ req = webob.Request.blank('/v1.1/os-security-groups/invalid')
+ req.headers['Content-Type'] = 'application/json'
+ req.method = 'GET'
+ response = req.get_response(fakes.wsgi_app())
+ self.assertEquals(response.status_int, 400)
+
+ def test_get_security_group_by_non_existing_id(self):
+ req = webob.Request.blank('/v1.1/os-security-groups/111111111')
+ req.headers['Content-Type'] = 'application/json'
+ req.method = 'GET'
+ response = req.get_response(fakes.wsgi_app())
+ self.assertEquals(response.status_int, 404)
+
+ def test_delete_security_group_by_id(self):
+ security_group = {}
+ security_group['name'] = "test"
+ security_group['description'] = "group-description"
+ response = _create_security_group_json(security_group)
+ security_group = json.loads(response.body)['security_group']
+ response = self._delete_security_group(security_group['id'])
+ self.assertEquals(response.status_int, 202)
+
+ response = self._delete_security_group(security_group['id'])
+ self.assertEquals(response.status_int, 404)
+
+ def test_delete_security_group_by_invalid_id(self):
+ response = self._delete_security_group('invalid')
+ self.assertEquals(response.status_int, 400)
+
+ def test_delete_security_group_by_non_existing_id(self):
+ response = self._delete_security_group(11111111)
+ self.assertEquals(response.status_int, 404)
+
+
+class TestSecurityGroupRules(test.TestCase):
+ def setUp(self):
+ super(TestSecurityGroupRules, self).setUp()
+ security_group = {}
+ security_group['name'] = "authorize-revoke"
+ security_group['description'] = ("Security group created for "
+ " authorize-revoke testing")
+ response = _create_security_group_json(security_group)
+ security_group = json.loads(response.body)
+ self.parent_security_group = security_group['security_group']
+
+ rules = {
+ "security_group_rule": {
+ "ip_protocol": "tcp",
+ "from_port": "22",
+ "to_port": "22",
+ "parent_group_id": self.parent_security_group['id'],
+ "cidr": "10.0.0.0/24"
+ }
+ }
+ res = self._create_security_group_rule_json(rules)
+ self.assertEquals(res.status_int, 200)
+ self.security_group_rule = json.loads(res.body)['security_group_rule']
+
+ def tearDown(self):
+ super(TestSecurityGroupRules, self).tearDown()
+
+ def _create_security_group_rule_json(self, rules):
+ request = webob.Request.blank('/v1.1/os-security-group-rules')
+ request.headers['Content-Type'] = 'application/json'
+ request.method = 'POST'
+ request.body = json.dumps(rules)
+ response = request.get_response(fakes.wsgi_app())
+ return response
+
+ def _delete_security_group_rule(self, id):
+ request = webob.Request.blank('/v1.1/os-security-group-rules/%s'
+ % id)
+ request.method = 'DELETE'
+ response = request.get_response(fakes.wsgi_app())
+ return response
+
+ def test_create_by_cidr_json(self):
+ rules = {
+ "security_group_rule": {
+ "ip_protocol": "tcp",
+ "from_port": "22",
+ "to_port": "22",
+ "parent_group_id": 2,
+ "cidr": "10.2.3.124/24"
+ }
+ }
+
+ response = self._create_security_group_rule_json(rules)
+ security_group_rule = json.loads(response.body)['security_group_rule']
+ self.assertEquals(response.status_int, 200)
+ self.assertNotEquals(security_group_rule['id'], 0)
+ self.assertEquals(security_group_rule['parent_group_id'], 2)
+ self.assertEquals(security_group_rule['ip_range']['cidr'],
+ "10.2.3.124/24")
+
+ def test_create_by_group_id_json(self):
+ rules = {
+ "security_group_rule": {
+ "ip_protocol": "tcp",
+ "from_port": "22",
+ "to_port": "22",
+ "group_id": "1",
+ "parent_group_id": "%s"
+ % self.parent_security_group['id'],
+ }
+ }
+
+ response = self._create_security_group_rule_json(rules)
+ self.assertEquals(response.status_int, 200)
+ security_group_rule = json.loads(response.body)['security_group_rule']
+ self.assertNotEquals(security_group_rule['id'], 0)
+ self.assertEquals(security_group_rule['parent_group_id'], 2)
+
+ def test_create_add_existing_rules_json(self):
+ rules = {
+ "security_group_rule": {
+ "ip_protocol": "tcp",
+ "from_port": "22",
+ "to_port": "22",
+ "cidr": "10.0.0.0/24",
+ "parent_group_id": "%s" % self.parent_security_group['id'],
+ }
+ }
+
+ response = self._create_security_group_rule_json(rules)
+ self.assertEquals(response.status_int, 400)
+
+ def test_create_with_no_body_json(self):
+ request = webob.Request.blank('/v1.1/os-security-group-rules')
+ request.headers['Content-Type'] = 'application/json'
+ request.method = 'POST'
+ request.body = json.dumps(None)
+ response = request.get_response(fakes.wsgi_app())
+ self.assertEquals(response.status_int, 422)
+
+ def test_create_with_no_security_group_rule_in_body_json(self):
+ request = webob.Request.blank('/v1.1/os-security-group-rules')
+ request.headers['Content-Type'] = 'application/json'
+ request.method = 'POST'
+ body_dict = {'test': "test"}
+ request.body = json.dumps(body_dict)
+ response = request.get_response(fakes.wsgi_app())
+ self.assertEquals(response.status_int, 422)
+
+ def test_create_with_invalid_parent_group_id_json(self):
+ rules = {
+ "security_group_rule": {
+ "ip_protocol": "tcp",
+ "from_port": "22",
+ "to_port": "22",
+ "parent_group_id": "invalid"
+ }
+ }
+
+ response = self._create_security_group_rule_json(rules)
+ self.assertEquals(response.status_int, 400)
+
+ def test_create_with_non_existing_parent_group_id_json(self):
+ rules = {
+ "security_group_rule": {
+ "ip_protocol": "tcp",
+ "from_port": "22",
+ "to_port": "22",
+ "group_id": "invalid",
+ "parent_group_id": "1111111111111"
+ }
+ }
+
+ response = self._create_security_group_rule_json(rules)
+ self.assertEquals(response.status_int, 404)
+
+ def test_create_with_invalid_protocol_json(self):
+ rules = {
+ "security_group_rule": {
+ "ip_protocol": "invalid-protocol",
+ "from_port": "22",
+ "to_port": "22",
+ "cidr": "10.2.2.0/24",
+ "parent_group_id": "%s" % self.parent_security_group['id'],
+ }
+ }
+
+ response = self._create_security_group_rule_json(rules)
+ self.assertEquals(response.status_int, 400)
+
+ def test_create_with_no_protocol_json(self):
+ rules = {
+ "security_group_rule": {
+ "from_port": "22",
+ "to_port": "22",
+ "cidr": "10.2.2.0/24",
+ "parent_group_id": "%s" % self.parent_security_group['id'],
+ }
+ }
+
+ response = self._create_security_group_rule_json(rules)
+ self.assertEquals(response.status_int, 400)
+
+ def test_create_with_invalid_from_port_json(self):
+ rules = {
+ "security_group_rule": {
+ "ip_protocol": "tcp",
+ "from_port": "666666",
+ "to_port": "22",
+ "cidr": "10.2.2.0/24",
+ "parent_group_id": "%s" % self.parent_security_group['id'],
+ }
+ }
+
+ response = self._create_security_group_rule_json(rules)
+ self.assertEquals(response.status_int, 400)
+
+ def test_create_with_invalid_to_port_json(self):
+ rules = {
+ "security_group_rule": {
+ "ip_protocol": "tcp",
+ "from_port": "22",
+ "to_port": "666666",
+ "cidr": "10.2.2.0/24",
+ "parent_group_id": "%s" % self.parent_security_group['id'],
+ }
+ }
+
+ response = self._create_security_group_rule_json(rules)
+ self.assertEquals(response.status_int, 400)
+
+ def test_create_with_non_numerical_from_port_json(self):
+ rules = {
+ "security_group_rule": {
+ "ip_protocol": "tcp",
+ "from_port": "invalid",
+ "to_port": "22",
+ "cidr": "10.2.2.0/24",
+ "parent_group_id": "%s" % self.parent_security_group['id'],
+ }
+ }
+
+ response = self._create_security_group_rule_json(rules)
+ self.assertEquals(response.status_int, 400)
+
+ def test_create_with_non_numerical_to_port_json(self):
+ rules = {
+ "security_group_rule": {
+ "ip_protocol": "tcp",
+ "from_port": "22",
+ "to_port": "invalid",
+ "cidr": "10.2.2.0/24",
+ "parent_group_id": "%s" % self.parent_security_group['id'],
+ }
+ }
+
+ response = self._create_security_group_rule_json(rules)
+ self.assertEquals(response.status_int, 400)
+
+ def test_create_with_no_to_port_json(self):
+ rules = {
+ "security_group_rule": {
+ "ip_protocol": "tcp",
+ "from_port": "22",
+ "cidr": "10.2.2.0/24",
+ "parent_group_id": "%s" % self.parent_security_group['id'],
+ }
+ }
+
+ response = self._create_security_group_rule_json(rules)
+ self.assertEquals(response.status_int, 400)
+
+ def test_create_with_invalid_cidr_json(self):
+ rules = {
+ "security_group_rule": {
+ "ip_protocol": "tcp",
+ "from_port": "22",
+ "to_port": "22",
+ "cidr": "10.2.22222.0/24",
+ "parent_group_id": "%s" % self.parent_security_group['id'],
+ }
+ }
+
+ response = self._create_security_group_rule_json(rules)
+ self.assertEquals(response.status_int, 400)
+
+ def test_create_with_no_cidr_group_json(self):
+ rules = {
+ "security_group_rule": {
+ "ip_protocol": "tcp",
+ "from_port": "22",
+ "to_port": "22",
+ "parent_group_id": "%s" % self.parent_security_group['id'],
+ }
+ }
+
+ response = self._create_security_group_rule_json(rules)
+ security_group_rule = json.loads(response.body)['security_group_rule']
+ self.assertEquals(response.status_int, 200)
+ self.assertNotEquals(security_group_rule['id'], 0)
+ self.assertEquals(security_group_rule['parent_group_id'],
+ self.parent_security_group['id'])
+ self.assertEquals(security_group_rule['ip_range']['cidr'],
+ "0.0.0.0/0")
+
+ def test_create_with_invalid_group_id_json(self):
+ rules = {
+ "security_group_rule": {
+ "ip_protocol": "tcp",
+ "from_port": "22",
+ "to_port": "22",
+ "group_id": "invalid",
+ "parent_group_id": "%s" % self.parent_security_group['id'],
+ }
+ }
+
+ response = self._create_security_group_rule_json(rules)
+ self.assertEquals(response.status_int, 400)
+
+ def test_create_with_empty_group_id_json(self):
+ rules = {
+ "security_group_rule": {
+ "ip_protocol": "tcp",
+ "from_port": "22",
+ "to_port": "22",
+ "group_id": "invalid",
+ "parent_group_id": "%s" % self.parent_security_group['id'],
+ }
+ }
+
+ response = self._create_security_group_rule_json(rules)
+ self.assertEquals(response.status_int, 400)
+
+ def test_create_with_invalid_group_id_json(self):
+ rules = {
+ "security_group_rule": {
+ "ip_protocol": "tcp",
+ "from_port": "22",
+ "to_port": "22",
+ "group_id": "222222",
+ "parent_group_id": "%s" % self.parent_security_group['id'],
+ }
+ }
+
+ response = self._create_security_group_rule_json(rules)
+ self.assertEquals(response.status_int, 400)
+
+ def test_create_rule_with_same_group_parent_id_json(self):
+ rules = {
+ "security_group_rule": {
+ "ip_protocol": "tcp",
+ "from_port": "22",
+ "to_port": "22",
+ "group_id": "%s" % self.parent_security_group['id'],
+ "parent_group_id": "%s" % self.parent_security_group['id'],
+ }
+ }
+
+ response = self._create_security_group_rule_json(rules)
+ self.assertEquals(response.status_int, 400)
+
+ def test_delete(self):
+ response = self._delete_security_group_rule(
+ self.security_group_rule['id'])
+ self.assertEquals(response.status_int, 202)
+
+ response = self._delete_security_group_rule(
+ self.security_group_rule['id'])
+ self.assertEquals(response.status_int, 404)
+
+ def test_delete_invalid_rule_id(self):
+ response = self._delete_security_group_rule('invalid')
+ self.assertEquals(response.status_int, 400)
+
+ def test_delete_non_existing_rule_id(self):
+ response = self._delete_security_group_rule(22222222222222)
+ self.assertEquals(response.status_int, 404)
+
+
+class TestSecurityGroupRulesXMLDeserializer(unittest.TestCase):
+
+ def setUp(self):
+ self.deserializer = security_groups.SecurityGroupRulesXMLDeserializer()
+
+ def test_create_request(self):
+ serial_request = """
+
+ 12
+ 22
+ 22
+
+ tcp
+ 10.0.0.0/24
+"""
+ request = self.deserializer.deserialize(serial_request, 'create')
+ expected = {
+ "security_group_rule": {
+ "parent_group_id": "12",
+ "from_port": "22",
+ "to_port": "22",
+ "ip_protocol": "tcp",
+ "group_id": "",
+ "cidr": "10.0.0.0/24",
+ },
+ }
+ self.assertEquals(request['body'], expected)
+
+ def test_create_no_protocol_request(self):
+ serial_request = """
+
+ 12
+ 22
+ 22
+
+ 10.0.0.0/24
+"""
+ request = self.deserializer.deserialize(serial_request, 'create')
+ expected = {
+ "security_group_rule": {
+ "parent_group_id": "12",
+ "from_port": "22",
+ "to_port": "22",
+ "group_id": "",
+ "cidr": "10.0.0.0/24",
+ },
+ }
+ self.assertEquals(request['body'], expected)
+
+
+class TestSecurityGroupXMLDeserializer(unittest.TestCase):
+
+ def setUp(self):
+ self.deserializer = security_groups.SecurityGroupXMLDeserializer()
+
+ def test_create_request(self):
+ serial_request = """
+
+ test
+"""
+ request = self.deserializer.deserialize(serial_request, 'create')
+ expected = {
+ "security_group": {
+ "name": "test",
+ "description": "test",
+ },
+ }
+ self.assertEquals(request['body'], expected)
+
+ def test_create_no_description_request(self):
+ serial_request = """
+
+"""
+ request = self.deserializer.deserialize(serial_request, 'create')
+ expected = {
+ "security_group": {
+ "name": "test",
+ },
+ }
+ self.assertEquals(request['body'], expected)
+
+ def test_create_no_name_request(self):
+ serial_request = """
+
+test
+"""
+ request = self.deserializer.deserialize(serial_request, 'create')
+ expected = {
+ "security_group": {
+ "description": "test",
+ },
+ }
+ self.assertEquals(request['body'], expected)
diff --git a/nova/tests/api/openstack/test_extensions.py b/nova/tests/api/openstack/test_extensions.py
index 8b7e11a5b48c..ea8fe68a7964 100644
--- a/nova/tests/api/openstack/test_extensions.py
+++ b/nova/tests/api/openstack/test_extensions.py
@@ -97,7 +97,8 @@ class ExtensionControllerTest(test.TestCase):
names = [x['name'] for x in data['extensions']]
names.sort()
self.assertEqual(names, ["FlavorExtraSpecs", "Floating_ips",
- "Fox In Socks", "Hosts", "Keypairs", "Multinic", "Volumes"])
+ "Fox In Socks", "Hosts", "Keypairs", "Multinic", "SecurityGroups",
+ "Volumes"])
# Make sure that at least Fox in Sox is correct.
(fox_ext,) = [
@@ -108,7 +109,7 @@ class ExtensionControllerTest(test.TestCase):
'updated': '2011-01-22T13:25:27-06:00',
'description': 'The Fox In Socks Extension',
'alias': 'FOXNSOX',
- 'links': [],
+ 'links': []
},
)
@@ -144,7 +145,7 @@ class ExtensionControllerTest(test.TestCase):
# Make sure we have all the extensions.
exts = root.findall('{0}extension'.format(NS))
- self.assertEqual(len(exts), 7)
+ self.assertEqual(len(exts), 8)
# Make sure that at least Fox in Sox is correct.
(fox_ext,) = [x for x in exts if x.get('alias') == 'FOXNSOX']