diff --git a/etc/nova/policy.json b/etc/nova/policy.json
index b9e684d509eb..4ffb55052c4d 100644
--- a/etc/nova/policy.json
+++ b/etc/nova/policy.json
@@ -123,6 +123,7 @@
"compute_extension:v3:os-rescue": "",
"compute_extension:security_group_default_rules": "rule:admin_api",
"compute_extension:security_groups": "",
+ "compute_extension:v3:os-security-groups": "",
"compute_extension:server_diagnostics": "rule:admin_api",
"compute_extension:v3:os-server-diagnostics": "rule:admin_api",
"compute_extension:server_password": "",
diff --git a/nova/api/openstack/compute/plugins/v3/security_groups.py b/nova/api/openstack/compute/plugins/v3/security_groups.py
index e2862be4e743..80f7389f10b4 100644
--- a/nova/api/openstack/compute/plugins/v3/security_groups.py
+++ b/nova/api/openstack/compute/plugins/v3/security_groups.py
@@ -22,7 +22,6 @@ import webob
from webob import exc
from xml.dom import minidom
-from nova.api.openstack import common
from nova.api.openstack import extensions
from nova.api.openstack import wsgi
from nova.api.openstack import xmlutil
@@ -31,49 +30,11 @@ from nova.compute import api as compute_api
from nova import exception
from nova.network.security_group import openstack_driver
from nova.network.security_group import quantum_driver
-from nova.virt import netutils
-authorize = extensions.extension_authorizer('compute', 'security_groups')
-softauth = extensions.soft_extension_authorizer('compute', 'security_groups')
-
-
-def make_rule(elem):
- elem.set('id')
- elem.set('parent_group_id')
-
- proto = xmlutil.SubTemplateElement(elem, 'ip_protocol')
- proto.text = 'ip_protocol'
-
- from_port = xmlutil.SubTemplateElement(elem, 'from_port')
- from_port.text = 'from_port'
-
- to_port = xmlutil.SubTemplateElement(elem, 'to_port')
- to_port.text = 'to_port'
-
- group = xmlutil.SubTemplateElement(elem, 'group', selector='group')
- name = xmlutil.SubTemplateElement(group, 'name')
- name.text = 'name'
- tenant_id = xmlutil.SubTemplateElement(group, 'tenant_id')
- tenant_id.text = 'tenant_id'
-
- ip_range = xmlutil.SubTemplateElement(elem, 'ip_range',
- selector='ip_range')
- cidr = xmlutil.SubTemplateElement(ip_range, 'cidr')
- cidr.text = 'cidr'
-
-
-def make_sg(elem):
- elem.set('id')
- elem.set('tenant_id')
- elem.set('name')
-
- desc = xmlutil.SubTemplateElement(elem, 'description')
- desc.text = 'description'
-
- rules = xmlutil.SubTemplateElement(elem, 'rules')
- rule = xmlutil.SubTemplateElement(rules, 'rule', selector='rules')
- make_rule(rule)
+ALIAS = 'os-security-groups'
+authorize = extensions.extension_authorizer('compute', 'v3:' + ALIAS)
+softauth = extensions.soft_extension_authorizer('compute', 'v3:' + ALIAS)
def _authorize_context(req):
@@ -81,102 +42,6 @@ def _authorize_context(req):
authorize(context)
return context
-sg_nsmap = {None: wsgi.XMLNS_V11}
-
-
-class SecurityGroupRuleTemplate(xmlutil.TemplateBuilder):
- def construct(self):
- root = xmlutil.TemplateElement('security_group_rule',
- selector='security_group_rule')
- make_rule(root)
- return xmlutil.MasterTemplate(root, 1, nsmap=sg_nsmap)
-
-
-class SecurityGroupTemplate(xmlutil.TemplateBuilder):
- def construct(self):
- root = xmlutil.TemplateElement('security_group',
- selector='security_group')
- make_sg(root)
- return xmlutil.MasterTemplate(root, 1, nsmap=sg_nsmap)
-
-
-class SecurityGroupsTemplate(xmlutil.TemplateBuilder):
- def construct(self):
- root = xmlutil.TemplateElement('security_groups')
- elem = xmlutil.SubTemplateElement(root, 'security_group',
- selector='security_groups')
- make_sg(elem)
- return xmlutil.MasterTemplate(root, 1, nsmap=sg_nsmap)
-
-
-class SecurityGroupXMLDeserializer(wsgi.MetadataXMLDeserializer):
- """
- Deserializer to handle xml-formatted security group requests.
- """
- def default(self, string):
- """Deserialize an xml-formatted security group create request."""
- dom = xmlutil.safe_minidom_parse_string(string)
- security_group = {}
- sg_node = self.find_first_child_named(dom,
- 'security_group')
- if sg_node is not None:
- if sg_node.hasAttribute('name'):
- security_group['name'] = sg_node.getAttribute('name')
- desc_node = self.find_first_child_named(sg_node,
- "description")
- if desc_node:
- security_group['description'] = self.extract_text(desc_node)
- return {'body': {'security_group': security_group}}
-
-
-class SecurityGroupRulesXMLDeserializer(wsgi.MetadataXMLDeserializer):
- """
- Deserializer to handle xml-formatted security group requests.
- """
-
- def default(self, string):
- """Deserialize an xml-formatted security group create request."""
- dom = xmlutil.safe_minidom_parse_string(string)
- security_group_rule = self._extract_security_group_rule(dom)
- return {'body': {'security_group_rule': security_group_rule}}
-
- def _extract_security_group_rule(self, node):
- """Marshal the security group rule attribute of a parsed request."""
- sg_rule = {}
- sg_rule_node = self.find_first_child_named(node,
- 'security_group_rule')
- if sg_rule_node is not None:
- ip_protocol_node = self.find_first_child_named(sg_rule_node,
- "ip_protocol")
- if ip_protocol_node is not None:
- sg_rule['ip_protocol'] = self.extract_text(ip_protocol_node)
-
- from_port_node = self.find_first_child_named(sg_rule_node,
- "from_port")
- if from_port_node is not None:
- sg_rule['from_port'] = self.extract_text(from_port_node)
-
- to_port_node = self.find_first_child_named(sg_rule_node, "to_port")
- if to_port_node is not None:
- sg_rule['to_port'] = self.extract_text(to_port_node)
-
- parent_group_id_node = self.find_first_child_named(sg_rule_node,
- "parent_group_id")
- if parent_group_id_node is not None:
- sg_rule['parent_group_id'] = self.extract_text(
- parent_group_id_node)
-
- group_id_node = self.find_first_child_named(sg_rule_node,
- "group_id")
- if group_id_node is not None:
- sg_rule['group_id'] = self.extract_text(group_id_node)
-
- cidr_node = self.find_first_child_named(sg_rule_node, "cidr")
- if cidr_node is not None:
- sg_rule['cidr'] = self.extract_text(cidr_node)
-
- return sg_rule
-
@contextlib.contextmanager
def translate_exceptions():
@@ -197,250 +62,6 @@ def translate_exceptions():
raise exc.HTTPRequestEntityTooLarge(explanation=msg)
-class SecurityGroupControllerBase(object):
- """Base class for Security Group controllers."""
-
- def __init__(self):
- self.security_group_api = (
- openstack_driver.get_openstack_security_group_driver())
- self.compute_api = compute.API(
- security_group_api=self.security_group_api)
-
- def _format_security_group_rule(self, context, rule):
- sg_rule = {}
- sg_rule['id'] = rule['id']
- sg_rule['parent_group_id'] = rule['parent_group_id']
- sg_rule['ip_protocol'] = rule['protocol']
- sg_rule['from_port'] = rule['from_port']
- sg_rule['to_port'] = rule['to_port']
- sg_rule['group'] = {}
- sg_rule['ip_range'] = {}
- if rule['group_id']:
- with translate_exceptions():
- source_group = self.security_group_api.get(context,
- id=rule['group_id'])
- sg_rule['group'] = {'name': source_group.get('name'),
- 'tenant_id': source_group.get('project_id')}
- else:
- sg_rule['ip_range'] = {'cidr': rule['cidr']}
- return sg_rule
-
- def _format_security_group(self, context, group):
- security_group = {}
- security_group['id'] = group['id']
- security_group['description'] = group['description']
- security_group['name'] = group['name']
- security_group['tenant_id'] = group['project_id']
- security_group['rules'] = []
- for rule in group['rules']:
- security_group['rules'] += [self._format_security_group_rule(
- context, rule)]
- return security_group
-
- def _from_body(self, body, key):
- if not body:
- raise exc.HTTPUnprocessableEntity()
- value = body.get(key, None)
- if value is None:
- raise exc.HTTPUnprocessableEntity()
- return value
-
-
-class SecurityGroupController(SecurityGroupControllerBase):
- """The Security group API controller for the OpenStack API."""
-
- @wsgi.serializers(xml=SecurityGroupTemplate)
- def show(self, req, id):
- """Return data about the given security group."""
- context = _authorize_context(req)
-
- with translate_exceptions():
- id = self.security_group_api.validate_id(id)
- security_group = self.security_group_api.get(context, None, id,
- map_exception=True)
-
- return {'security_group': self._format_security_group(context,
- security_group)}
-
- def delete(self, req, id):
- """Delete a security group."""
- context = _authorize_context(req)
-
- with translate_exceptions():
- id = self.security_group_api.validate_id(id)
- security_group = self.security_group_api.get(context, None, id,
- map_exception=True)
- self.security_group_api.destroy(context, security_group)
-
- return webob.Response(status_int=202)
-
- @wsgi.serializers(xml=SecurityGroupsTemplate)
- def index(self, req):
- """Returns a list of security groups."""
- context = _authorize_context(req)
-
- search_opts = {}
- search_opts.update(req.GET)
-
- with translate_exceptions():
- project_id = context.project_id
- raw_groups = self.security_group_api.list(context,
- project=project_id,
- search_opts=search_opts)
-
- limited_list = common.limited(raw_groups, req)
- result = [self._format_security_group(context, group)
- for group in limited_list]
-
- return {'security_groups':
- list(sorted(result,
- key=lambda k: (k['tenant_id'], k['name'])))}
-
- @wsgi.serializers(xml=SecurityGroupTemplate)
- @wsgi.deserializers(xml=SecurityGroupXMLDeserializer)
- def create(self, req, body):
- """Creates a new security group."""
- context = _authorize_context(req)
-
- security_group = self._from_body(body, 'security_group')
-
- group_name = security_group.get('name', None)
- group_description = security_group.get('description', None)
-
- with translate_exceptions():
- self.security_group_api.validate_property(group_name, 'name', None)
- self.security_group_api.validate_property(group_description,
- 'description', None)
- group_ref = self.security_group_api.create_security_group(
- context, group_name, group_description)
-
- return {'security_group': self._format_security_group(context,
- group_ref)}
-
- @wsgi.serializers(xml=SecurityGroupTemplate)
- def update(self, req, id, body):
- """Update a security group."""
- context = _authorize_context(req)
-
- with translate_exceptions():
- id = self.security_group_api.validate_id(id)
- security_group = self.security_group_api.get(context, None, id,
- map_exception=True)
-
- security_group_data = self._from_body(body, 'security_group')
- group_name = security_group_data.get('name', None)
- group_description = security_group_data.get('description', None)
-
- with translate_exceptions():
- self.security_group_api.validate_property(group_name, 'name', None)
- self.security_group_api.validate_property(group_description,
- 'description', None)
- group_ref = self.security_group_api.update_security_group(
- context, security_group, group_name, group_description)
-
- return {'security_group': self._format_security_group(context,
- group_ref)}
-
-
-class SecurityGroupRulesController(SecurityGroupControllerBase):
-
- @wsgi.serializers(xml=SecurityGroupRuleTemplate)
- @wsgi.deserializers(xml=SecurityGroupRulesXMLDeserializer)
- def create(self, req, body):
- context = _authorize_context(req)
-
- sg_rule = self._from_body(body, 'security_group_rule')
-
- with translate_exceptions():
- parent_group_id = self.security_group_api.validate_id(
- sg_rule.get('parent_group_id', None))
- security_group = self.security_group_api.get(context, None,
- parent_group_id,
- map_exception=True)
- try:
- new_rule = self._rule_args_to_dict(context,
- to_port=sg_rule.get('to_port'),
- from_port=sg_rule.get('from_port'),
- ip_protocol=sg_rule.get('ip_protocol'),
- cidr=sg_rule.get('cidr'),
- group_id=sg_rule.get('group_id'))
- except Exception as exp:
- raise exc.HTTPBadRequest(explanation=unicode(exp))
-
- if new_rule is None:
- msg = _("Not enough parameters to build a valid rule.")
- raise exc.HTTPBadRequest(explanation=msg)
-
- new_rule['parent_group_id'] = security_group['id']
-
- if 'cidr' in new_rule:
- net, prefixlen = netutils.get_net_and_prefixlen(new_rule['cidr'])
- if net != '0.0.0.0' and prefixlen == '0':
- msg = _("Bad prefix for network in cidr %s") % new_rule['cidr']
- raise exc.HTTPBadRequest(explanation=msg)
-
- with translate_exceptions():
- security_group_rule = (
- self.security_group_api.create_security_group_rule(
- context, security_group, new_rule))
-
- return {"security_group_rule": self._format_security_group_rule(
- context,
- security_group_rule)}
-
- def _rule_args_to_dict(self, context, to_port=None, from_port=None,
- ip_protocol=None, cidr=None, group_id=None):
-
- if group_id is not None:
- group_id = self.security_group_api.validate_id(group_id)
-
- # check if groupId exists
- self.security_group_api.get(context, id=group_id)
- return self.security_group_api.new_group_ingress_rule(
- group_id, ip_protocol, from_port, to_port)
- else:
- cidr = self.security_group_api.parse_cidr(cidr)
- return self.security_group_api.new_cidr_ingress_rule(
- cidr, ip_protocol, from_port, to_port)
-
- def delete(self, req, id):
- context = _authorize_context(req)
-
- with translate_exceptions():
- id = self.security_group_api.validate_id(id)
- rule = self.security_group_api.get_rule(context, id)
- group_id = rule['parent_group_id']
- security_group = self.security_group_api.get(context, None,
- group_id,
- map_exception=True)
- self.security_group_api.remove_rules(context, security_group,
- [rule['id']])
-
- return webob.Response(status_int=202)
-
-
-class ServerSecurityGroupController(SecurityGroupControllerBase):
-
- @wsgi.serializers(xml=SecurityGroupsTemplate)
- def index(self, req, server_id):
- """Returns a list of security groups for the given instance."""
- context = _authorize_context(req)
-
- self.security_group_api.ensure_default(context)
-
- with translate_exceptions():
- instance = self.compute_api.get(context, server_id)
- groups = self.security_group_api.get_instance_security_groups(
- context, instance['uuid'], True)
-
- result = [self._format_security_group(context, group)
- for group in groups]
-
- return {'security_groups':
- list(sorted(result,
- key=lambda k: (k['tenant_id'], k['name'])))}
-
-
class SecurityGroupActionController(wsgi.Controller):
def __init__(self, *args, **kwargs):
super(SecurityGroupActionController, self).__init__(*args, **kwargs)
@@ -610,12 +231,12 @@ class SecurityGroupServersTemplate(xmlutil.TemplateBuilder):
return xmlutil.SlaveTemplate(root, 1)
-class Security_groups(extensions.ExtensionDescriptor):
+class SecurityGroups(extensions.V3APIExtensionBase):
"""Security group support."""
name = "SecurityGroups"
- alias = "os-security-groups"
- namespace = "http://docs.openstack.org/compute/ext/securitygroups/api/v1.1"
- updated = "2013-05-28T00:00:00+00:00"
+ alias = ALIAS
+ namespace = "http://docs.openstack.org/compute/ext/securitygroups/api/v3"
+ version = 1
def get_controller_extensions(self):
controller = SecurityGroupActionController()
@@ -625,24 +246,7 @@ class Security_groups(extensions.ExtensionDescriptor):
return [actions, output]
def get_resources(self):
- resources = []
-
- res = extensions.ResourceExtension('os-security-groups',
- controller=SecurityGroupController())
-
- resources.append(res)
-
- res = extensions.ResourceExtension('os-security-group-rules',
- controller=SecurityGroupRulesController())
- resources.append(res)
-
- res = extensions.ResourceExtension(
- 'os-security-groups',
- controller=ServerSecurityGroupController(),
- parent=dict(member_name='server', collection_name='servers'))
- resources.append(res)
-
- return resources
+ return []
class NativeSecurityGroupExceptions(object):
diff --git a/nova/tests/api/openstack/compute/plugins/v3/test_security_groups.py b/nova/tests/api/openstack/compute/plugins/v3/test_security_groups.py
index df45f0d7db22..9b68d037919b 100644
--- a/nova/tests/api/openstack/compute/plugins/v3/test_security_groups.py
+++ b/nova/tests/api/openstack/compute/plugins/v3/test_security_groups.py
@@ -20,63 +20,19 @@ import mox
from oslo.config import cfg
import webob
-from nova.api.openstack.compute.contrib import security_groups
-from nova.api.openstack import wsgi
+from nova.api.openstack.compute.plugins.v3 import security_groups
from nova.api.openstack import xmlutil
from nova import compute
from nova.compute import power_state
import nova.db
from nova import exception
-from nova.objects import instance as instance_obj
from nova.openstack.common import jsonutils
-from nova import quota
from nova import test
from nova.tests.api.openstack import fakes
-from nova.tests import utils
+
CONF = cfg.CONF
FAKE_UUID1 = 'a47ae74e-ab08-447f-8eee-ffd43fc46c16'
-FAKE_UUID2 = 'c6e6430a-6563-4efa-9542-5e93c9e97d18'
-
-
-class AttrDict(dict):
- def __getattr__(self, k):
- return self[k]
-
-
-def security_group_template(**kwargs):
- sg = kwargs.copy()
- sg.setdefault('tenant_id', '123')
- sg.setdefault('name', 'test')
- sg.setdefault('description', 'test-description')
- return sg
-
-
-def security_group_db(security_group, id=None):
- attrs = security_group.copy()
- if 'tenant_id' in attrs:
- attrs['project_id'] = attrs.pop('tenant_id')
- if id is not None:
- attrs['id'] = id
- attrs.setdefault('rules', [])
- attrs.setdefault('instances', [])
- return AttrDict(attrs)
-
-
-def security_group_rule_template(**kwargs):
- rule = kwargs.copy()
- rule.setdefault('ip_protocol', 'tcp')
- rule.setdefault('from_port', 22)
- rule.setdefault('to_port', 22)
- rule.setdefault('parent_group_id', 2)
- return rule
-
-
-def security_group_rule_db(rule, id=None):
- attrs = rule.copy()
- if 'ip_protocol' in attrs:
- attrs['protocol'] = attrs.pop('ip_protocol')
- return AttrDict(attrs)
def return_server(context, server_id):
@@ -116,413 +72,22 @@ def return_server_nonexistent(context, server_id):
class TestSecurityGroups(test.TestCase):
def setUp(self):
super(TestSecurityGroups, self).setUp()
-
- self.controller = security_groups.SecurityGroupController()
- self.server_controller = (
- security_groups.ServerSecurityGroupController())
self.manager = security_groups.SecurityGroupActionController()
- # This needs to be done here to set fake_id because the derived
- # class needs to be called first if it wants to set
- # 'security_group_api' and this setUp method needs to be called.
- if self.controller.security_group_api.id_is_uuid:
- self.fake_id = '11111111-1111-1111-1111-111111111111'
- else:
- self.fake_id = '11111111'
-
- def _assert_no_security_groups_reserved(self, context):
- """Check that no reservations are leaked during tests."""
- result = quota.QUOTAS.get_project_quotas(context, context.project_id)
- self.assertEqual(result['security_groups']['reserved'], 0)
-
- def test_create_security_group(self):
- sg = security_group_template()
-
- req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups')
- res_dict = self.controller.create(req, {'security_group': sg})
- self.assertEqual(res_dict['security_group']['name'], 'test')
- self.assertEqual(res_dict['security_group']['description'],
- 'test-description')
-
- def test_create_security_group_with_no_name(self):
- sg = security_group_template()
- del sg['name']
-
- req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups')
- self.assertRaises(webob.exc.HTTPUnprocessableEntity,
- self.controller.create, req, sg)
-
- self._assert_no_security_groups_reserved(req.environ['nova.context'])
-
- def test_create_security_group_with_no_description(self):
- sg = security_group_template()
- del sg['description']
-
- req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups')
- self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
- req, {'security_group': sg})
-
- self._assert_no_security_groups_reserved(req.environ['nova.context'])
-
- def test_create_security_group_with_blank_name(self):
- sg = security_group_template(name='')
-
- req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups')
- self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
- req, {'security_group': sg})
-
- self._assert_no_security_groups_reserved(req.environ['nova.context'])
-
- def test_create_security_group_with_whitespace_name(self):
- sg = security_group_template(name=' ')
-
- req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups')
- self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
- req, {'security_group': sg})
-
- self._assert_no_security_groups_reserved(req.environ['nova.context'])
-
- def test_create_security_group_with_blank_description(self):
- sg = security_group_template(description='')
-
- req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups')
- self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
- req, {'security_group': sg})
-
- self._assert_no_security_groups_reserved(req.environ['nova.context'])
-
- def test_create_security_group_with_whitespace_description(self):
- sg = security_group_template(description=' ')
-
- req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups')
- self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
- req, {'security_group': sg})
-
- self._assert_no_security_groups_reserved(req.environ['nova.context'])
-
- def test_create_security_group_with_duplicate_name(self):
- sg = security_group_template()
-
- # FIXME: Stub out _get instead of creating twice
- req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups')
- self.controller.create(req, {'security_group': sg})
-
- req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups')
- self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
- req, {'security_group': sg})
-
- self._assert_no_security_groups_reserved(req.environ['nova.context'])
-
- def test_create_security_group_with_no_body(self):
- req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups')
- self.assertRaises(webob.exc.HTTPUnprocessableEntity,
- self.controller.create, req, None)
-
- self._assert_no_security_groups_reserved(req.environ['nova.context'])
-
- def test_create_security_group_with_no_security_group(self):
- body = {'no-securityGroup': None}
-
- req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups')
- self.assertRaises(webob.exc.HTTPUnprocessableEntity,
- self.controller.create, req, body)
-
- self._assert_no_security_groups_reserved(req.environ['nova.context'])
-
- def test_create_security_group_above_255_characters_name(self):
- sg = security_group_template(name='1234567890' * 26)
-
- req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups')
- self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
- req, {'security_group': sg})
-
- self._assert_no_security_groups_reserved(req.environ['nova.context'])
-
- def test_create_security_group_above_255_characters_description(self):
- sg = security_group_template(description='1234567890' * 26)
-
- req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups')
- self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
- req, {'security_group': sg})
-
- self._assert_no_security_groups_reserved(req.environ['nova.context'])
-
- def test_create_security_group_non_string_name(self):
- sg = security_group_template(name=12)
-
- req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups')
- self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
- req, {'security_group': sg})
-
- self._assert_no_security_groups_reserved(req.environ['nova.context'])
-
- def test_create_security_group_non_string_description(self):
- sg = security_group_template(description=12)
-
- req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups')
- self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
- req, {'security_group': sg})
-
- self._assert_no_security_groups_reserved(req.environ['nova.context'])
-
- def test_create_security_group_quota_limit(self):
- req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups')
- for num in range(1, CONF.quota_security_groups + 1):
- name = 'test%s' % num
- sg = security_group_template(name=name)
- res_dict = self.controller.create(req, {'security_group': sg})
- self.assertEqual(res_dict['security_group']['name'], name)
-
- sg = security_group_template()
- self.assertRaises(webob.exc.HTTPRequestEntityTooLarge,
- self.controller.create,
- req, {'security_group': sg})
-
- def test_get_security_group_list(self):
- groups = []
- for i, name in enumerate(['default', 'test']):
- sg = security_group_template(id=i + 1,
- name=name,
- description=name + '-desc',
- rules=[])
- groups.append(sg)
- expected = {'security_groups': groups}
-
- def return_security_groups(context, project_id):
- return [security_group_db(sg) for sg in groups]
-
- self.stubs.Set(nova.db, 'security_group_get_by_project',
- return_security_groups)
-
- req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups')
- res_dict = self.controller.index(req)
-
- self.assertEquals(res_dict, expected)
-
- def test_get_security_group_list_all_tenants(self):
- all_groups = []
- tenant_groups = []
-
- for i, name in enumerate(['default', 'test']):
- sg = security_group_template(id=i + 1,
- name=name,
- description=name + '-desc',
- rules=[])
- all_groups.append(sg)
- if name == 'default':
- tenant_groups.append(sg)
-
- all = {'security_groups': all_groups}
- tenant_specific = {'security_groups': tenant_groups}
-
- def return_all_security_groups(context):
- return [security_group_db(sg) for sg in all_groups]
-
- self.stubs.Set(nova.db, 'security_group_get_all',
- return_all_security_groups)
-
- def return_tenant_security_groups(context, project_id):
- return [security_group_db(sg) for sg in tenant_groups]
-
- self.stubs.Set(nova.db, 'security_group_get_by_project',
- return_tenant_security_groups)
-
- path = '/v2/fake/os-security-groups'
-
- req = fakes.HTTPRequest.blank(path, use_admin_context=True)
- res_dict = self.controller.index(req)
- self.assertEquals(res_dict, tenant_specific)
-
- req = fakes.HTTPRequest.blank('%s?all_tenants=1' % path,
- use_admin_context=True)
- res_dict = self.controller.index(req)
- self.assertEquals(res_dict, all)
-
- def test_get_security_group_by_instance(self):
- groups = []
- for i, name in enumerate(['default', 'test']):
- sg = security_group_template(id=i + 1,
- name=name,
- description=name + '-desc',
- rules=[])
- groups.append(sg)
- expected = {'security_groups': groups}
-
- def return_instance(context, server_id):
- self.assertEquals(server_id, FAKE_UUID1)
- return return_server_by_uuid(context, server_id)
-
- self.stubs.Set(nova.db, 'instance_get_by_uuid',
- return_instance)
-
- def return_security_groups(context, instance_uuid):
- self.assertEquals(instance_uuid, FAKE_UUID1)
- return [security_group_db(sg) for sg in groups]
-
- self.stubs.Set(nova.db, 'security_group_get_by_instance',
- return_security_groups)
-
- req = fakes.HTTPRequest.blank('/v2/%s/servers/%s/os-security-groups' %
- ('fake', FAKE_UUID1))
- res_dict = self.server_controller.index(req, FAKE_UUID1)
-
- self.assertEquals(res_dict, expected)
-
- def test_get_security_group_by_instance_non_existing(self):
- self.stubs.Set(nova.db, 'instance_get', return_server_nonexistent)
- self.stubs.Set(nova.db, 'instance_get_by_uuid',
- return_server_nonexistent)
- req = fakes.HTTPRequest.blank('/v2/fake/servers/1/os-security-groups')
- self.assertRaises(webob.exc.HTTPNotFound,
- self.server_controller.index, req, '1')
-
- def test_get_security_group_by_instance_invalid_id(self):
- req = fakes.HTTPRequest.blank(
- '/v2/fake/servers/invalid/os-security-groups')
- self.assertRaises(webob.exc.HTTPNotFound,
- self.server_controller.index, req, 'invalid')
-
- def test_get_security_group_by_id(self):
- sg = security_group_template(id=2, rules=[])
-
- def return_security_group(context, group_id):
- self.assertEquals(sg['id'], group_id)
- return security_group_db(sg)
-
- self.stubs.Set(nova.db, 'security_group_get',
- return_security_group)
-
- req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups/2')
- res_dict = self.controller.show(req, '2')
-
- expected = {'security_group': sg}
- self.assertEquals(res_dict, expected)
-
- def test_get_security_group_by_invalid_id(self):
- req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups/invalid')
- self.assertRaises(webob.exc.HTTPBadRequest, self.controller.delete,
- req, 'invalid')
-
- def test_get_security_group_by_non_existing_id(self):
- req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups/%s' %
- self.fake_id)
- self.assertRaises(webob.exc.HTTPNotFound, self.controller.delete,
- req, self.fake_id)
-
- def test_update_security_group(self):
- sg = security_group_template(id=2, rules=[])
- sg_update = security_group_template(id=2, rules=[],
- name='update_name', description='update_desc')
-
- def return_security_group(context, group_id):
- self.assertEquals(sg['id'], group_id)
- return security_group_db(sg)
-
- def return_update_security_group(context, group_id, values):
- self.assertEquals(sg_update['id'], group_id)
- self.assertEquals(sg_update['name'], values['name'])
- self.assertEquals(sg_update['description'], values['description'])
- return security_group_db(sg_update)
-
- self.stubs.Set(nova.db, 'security_group_update',
- return_update_security_group)
- self.stubs.Set(nova.db, 'security_group_get',
- return_security_group)
-
- req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups/2')
- res_dict = self.controller.update(req, '2',
- {'security_group': sg_update})
-
- expected = {'security_group': sg_update}
- self.assertEquals(res_dict, expected)
-
- def test_update_security_group_name_to_default(self):
- sg = security_group_template(id=2, rules=[], name='default')
-
- def return_security_group(context, group_id):
- self.assertEquals(sg['id'], group_id)
- return security_group_db(sg)
-
- self.stubs.Set(nova.db, 'security_group_get',
- return_security_group)
-
- req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups/2')
- self.assertRaises(webob.exc.HTTPBadRequest, self.controller.update,
- req, '2', {'security_group': sg})
-
- def test_update_default_security_group_fail(self):
- sg = security_group_template()
-
- req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups/1')
- self.assertRaises(webob.exc.HTTPBadRequest, self.controller.update,
- req, '1', {'security_group': sg})
-
- def test_delete_security_group_by_id(self):
- sg = security_group_template(id=1, rules=[])
-
- self.called = False
-
- def security_group_destroy(context, id):
- self.called = True
-
- def return_security_group(context, group_id):
- self.assertEquals(sg['id'], group_id)
- return security_group_db(sg)
-
- self.stubs.Set(nova.db, 'security_group_destroy',
- security_group_destroy)
- self.stubs.Set(nova.db, 'security_group_get',
- return_security_group)
-
- req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups/1')
- self.controller.delete(req, '1')
-
- self.assertTrue(self.called)
-
- def test_delete_security_group_by_invalid_id(self):
- req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups/invalid')
- self.assertRaises(webob.exc.HTTPBadRequest, self.controller.delete,
- req, 'invalid')
-
- def test_delete_security_group_by_non_existing_id(self):
- req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups/%s'
- % self.fake_id)
- self.assertRaises(webob.exc.HTTPNotFound, self.controller.delete,
- req, self.fake_id)
-
- def test_delete_security_group_in_use(self):
- sg = security_group_template(id=1, rules=[])
-
- def security_group_in_use(context, id):
- return True
-
- def return_security_group(context, group_id):
- self.assertEquals(sg['id'], group_id)
- return security_group_db(sg)
-
- self.stubs.Set(nova.db, 'security_group_in_use',
- security_group_in_use)
- self.stubs.Set(nova.db, 'security_group_get',
- return_security_group)
-
- req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups/1')
- self.assertRaises(webob.exc.HTTPBadRequest, self.controller.delete,
- req, '1')
-
def test_associate_by_non_existing_security_group_name(self):
self.stubs.Set(nova.db, 'instance_get', return_server)
self.assertEquals(return_server(None, '1'),
nova.db.instance_get(None, '1'))
body = dict(addSecurityGroup=dict(name='non-existing'))
- req = fakes.HTTPRequest.blank('/v2/fake/servers/1/action')
+ req = fakes.HTTPRequestV3.blank('/servers/1/action')
self.assertRaises(webob.exc.HTTPNotFound,
self.manager._addSecurityGroup, req, '1', body)
def test_associate_by_invalid_server_id(self):
body = dict(addSecurityGroup=dict(name='test'))
- req = fakes.HTTPRequest.blank('/v2/fake/servers/invalid/action')
+ req = fakes.HTTPRequestV3.blank('/servers/invalid/action')
self.assertRaises(webob.exc.HTTPNotFound,
self.manager._addSecurityGroup, req, 'invalid', body)
@@ -530,7 +95,7 @@ class TestSecurityGroups(test.TestCase):
self.stubs.Set(nova.db, 'instance_get', return_server)
body = dict(addSecurityGroup=None)
- req = fakes.HTTPRequest.blank('/v2/fake/servers/1/action')
+ req = fakes.HTTPRequestV3.blank('/servers/1/action')
self.assertRaises(webob.exc.HTTPBadRequest,
self.manager._addSecurityGroup, req, '1', body)
@@ -538,7 +103,7 @@ class TestSecurityGroups(test.TestCase):
self.stubs.Set(nova.db, 'instance_get', return_server)
body = dict(addSecurityGroup=dict())
- req = fakes.HTTPRequest.blank('/v2/fake/servers/1/action')
+ req = fakes.HTTPRequestV3.blank('/servers/1/action')
self.assertRaises(webob.exc.HTTPBadRequest,
self.manager._addSecurityGroup, req, '1', body)
@@ -546,7 +111,7 @@ class TestSecurityGroups(test.TestCase):
self.stubs.Set(nova.db, 'instance_get', return_server)
body = dict(addSecurityGroup=dict(name=" "))
- req = fakes.HTTPRequest.blank('/v2/fake/servers/1/action')
+ req = fakes.HTTPRequestV3.blank('/servers/1/action')
self.assertRaises(webob.exc.HTTPBadRequest,
self.manager._addSecurityGroup, req, '1', body)
@@ -556,7 +121,7 @@ class TestSecurityGroups(test.TestCase):
return_server_nonexistent)
body = dict(addSecurityGroup=dict(name="test"))
- req = fakes.HTTPRequest.blank('/v2/fake/servers/1/action')
+ req = fakes.HTTPRequestV3.blank('/servers/1/action')
self.assertRaises(webob.exc.HTTPNotFound,
self.manager._addSecurityGroup, req, '1', body)
@@ -568,7 +133,7 @@ class TestSecurityGroups(test.TestCase):
return_security_group_without_instances)
body = dict(addSecurityGroup=dict(name="test"))
- req = fakes.HTTPRequest.blank('/v2/fake/servers/1/action')
+ req = fakes.HTTPRequestV3.blank('/servers/1/action')
self.manager._addSecurityGroup(req, '1', body)
def test_associate_already_associated_security_group_to_instance(self):
@@ -579,7 +144,7 @@ class TestSecurityGroups(test.TestCase):
return_security_group_by_name)
body = dict(addSecurityGroup=dict(name="test"))
- req = fakes.HTTPRequest.blank('/v2/fake/servers/1/action')
+ req = fakes.HTTPRequestV3.blank('/servers/1/action')
self.assertRaises(webob.exc.HTTPBadRequest,
self.manager._addSecurityGroup, req, '1', body)
@@ -597,7 +162,7 @@ class TestSecurityGroups(test.TestCase):
body = dict(addSecurityGroup=dict(name="test"))
- req = fakes.HTTPRequest.blank('/v2/fake/servers/1/action')
+ req = fakes.HTTPRequestV3.blank('/servers/1/action')
self.manager._addSecurityGroup(req, '1', body)
def test_disassociate_by_non_existing_security_group_name(self):
@@ -606,7 +171,7 @@ class TestSecurityGroups(test.TestCase):
nova.db.instance_get(None, '1'))
body = dict(removeSecurityGroup=dict(name='non-existing'))
- req = fakes.HTTPRequest.blank('/v2/fake/servers/1/action')
+ req = fakes.HTTPRequestV3.blank('/servers/1/action')
self.assertRaises(webob.exc.HTTPNotFound,
self.manager._removeSecurityGroup, req, '1', body)
@@ -615,7 +180,7 @@ class TestSecurityGroups(test.TestCase):
return_security_group_by_name)
body = dict(removeSecurityGroup=dict(name='test'))
- req = fakes.HTTPRequest.blank('/v2/fake/servers/invalid/action')
+ req = fakes.HTTPRequestV3.blank('/servers/invalid/action')
self.assertRaises(webob.exc.HTTPNotFound,
self.manager._removeSecurityGroup, req, 'invalid',
body)
@@ -624,7 +189,7 @@ class TestSecurityGroups(test.TestCase):
self.stubs.Set(nova.db, 'instance_get', return_server)
body = dict(removeSecurityGroup=None)
- req = fakes.HTTPRequest.blank('/v2/fake/servers/1/action')
+ req = fakes.HTTPRequestV3.blank('/servers/1/action')
self.assertRaises(webob.exc.HTTPBadRequest,
self.manager._removeSecurityGroup, req, '1', body)
@@ -632,7 +197,7 @@ class TestSecurityGroups(test.TestCase):
self.stubs.Set(nova.db, 'instance_get', return_server)
body = dict(removeSecurityGroup=dict())
- req = fakes.HTTPRequest.blank('/v2/fake/servers/1/action')
+ req = fakes.HTTPRequestV3.blank('/servers/1/action')
self.assertRaises(webob.exc.HTTPBadRequest,
self.manager._removeSecurityGroup, req, '1', body)
@@ -640,7 +205,7 @@ class TestSecurityGroups(test.TestCase):
self.stubs.Set(nova.db, 'instance_get', return_server)
body = dict(removeSecurityGroup=dict(name=" "))
- req = fakes.HTTPRequest.blank('/v2/fake/servers/1/action')
+ req = fakes.HTTPRequestV3.blank('/servers/1/action')
self.assertRaises(webob.exc.HTTPBadRequest,
self.manager._removeSecurityGroup, req, '1', body)
@@ -650,7 +215,7 @@ class TestSecurityGroups(test.TestCase):
return_security_group_by_name)
body = dict(removeSecurityGroup=dict(name="test"))
- req = fakes.HTTPRequest.blank('/v2/fake/servers/1/action')
+ req = fakes.HTTPRequestV3.blank('/servers/1/action')
self.assertRaises(webob.exc.HTTPNotFound,
self.manager._removeSecurityGroup, req, '1', body)
@@ -662,7 +227,7 @@ class TestSecurityGroups(test.TestCase):
return_security_group_by_name)
body = dict(removeSecurityGroup=dict(name="test"))
- req = fakes.HTTPRequest.blank('/v2/fake/servers/1/action')
+ req = fakes.HTTPRequestV3.blank('/servers/1/action')
self.manager._removeSecurityGroup(req, '1', body)
def test_disassociate_already_associated_security_group_to_instance(self):
@@ -673,7 +238,7 @@ class TestSecurityGroups(test.TestCase):
return_security_group_without_instances)
body = dict(removeSecurityGroup=dict(name="test"))
- req = fakes.HTTPRequest.blank('/v2/fake/servers/1/action')
+ req = fakes.HTTPRequestV3.blank('/servers/1/action')
self.assertRaises(webob.exc.HTTPBadRequest,
self.manager._removeSecurityGroup, req, '1', body)
@@ -691,793 +256,25 @@ class TestSecurityGroups(test.TestCase):
body = dict(removeSecurityGroup=dict(name="test"))
- req = fakes.HTTPRequest.blank('/v2/fake/servers/1/action')
+ req = fakes.HTTPRequestV3.blank('/servers/1/action')
self.manager._removeSecurityGroup(req, '1', body)
-class TestSecurityGroupRules(test.TestCase):
- def setUp(self):
- super(TestSecurityGroupRules, self).setUp()
-
- self.controller = security_groups.SecurityGroupController()
- if self.controller.security_group_api.id_is_uuid:
- id1 = '11111111-1111-1111-1111-111111111111'
- id2 = '22222222-2222-2222-2222-222222222222'
- self.invalid_id = '33333333-3333-3333-3333-333333333333'
- else:
- id1 = 1
- id2 = 2
- self.invalid_id = '33333333'
-
- self.sg1 = security_group_template(id=id1)
- self.sg2 = security_group_template(
- id=id2, name='authorize_revoke',
- description='authorize-revoke testing')
-
- db1 = security_group_db(self.sg1)
- db2 = security_group_db(self.sg2)
-
- def return_security_group(context, group_id, columns_to_join=None):
- if group_id == db1['id']:
- return db1
- if group_id == db2['id']:
- return db2
- raise exception.NotFound()
-
- self.stubs.Set(nova.db, 'security_group_get',
- return_security_group)
-
- self.parent_security_group = db2
-
- self.controller = security_groups.SecurityGroupRulesController()
-
- def test_create_by_cidr(self):
- rule = security_group_rule_template(cidr='10.2.3.124/24',
- parent_group_id=self.sg2['id'])
-
- req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
- res_dict = self.controller.create(req, {'security_group_rule': rule})
- security_group_rule = res_dict['security_group_rule']
- self.assertNotEquals(security_group_rule['id'], 0)
- self.assertEquals(security_group_rule['parent_group_id'],
- self.sg2['id'])
- self.assertEquals(security_group_rule['ip_range']['cidr'],
- "10.2.3.124/24")
-
- def test_create_by_group_id(self):
- rule = security_group_rule_template(group_id=self.sg1['id'],
- parent_group_id=self.sg2['id'])
-
- req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
- res_dict = self.controller.create(req, {'security_group_rule': rule})
-
- security_group_rule = res_dict['security_group_rule']
- self.assertNotEquals(security_group_rule['id'], 0)
- self.assertEquals(security_group_rule['parent_group_id'],
- self.sg2['id'])
-
- def test_create_by_same_group_id(self):
- rule1 = security_group_rule_template(group_id=self.sg1['id'],
- from_port=80, to_port=80,
- parent_group_id=self.sg2['id'])
- self.parent_security_group['rules'] = [security_group_rule_db(rule1)]
-
- rule2 = security_group_rule_template(group_id=self.sg1['id'],
- from_port=81, to_port=81,
- parent_group_id=self.sg2['id'])
-
- req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
- res_dict = self.controller.create(req, {'security_group_rule': rule2})
-
- security_group_rule = res_dict['security_group_rule']
- self.assertNotEquals(security_group_rule['id'], 0)
- self.assertEquals(security_group_rule['parent_group_id'],
- self.sg2['id'])
- self.assertEquals(security_group_rule['from_port'], 81)
- self.assertEquals(security_group_rule['to_port'], 81)
-
- def test_create_none_value_from_to_port(self):
- rule = {'parent_group_id': self.sg1['id'],
- 'group_id': self.sg1['id']}
- req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
- res_dict = self.controller.create(req, {'security_group_rule': rule})
- security_group_rule = res_dict['security_group_rule']
- self.assertEquals(security_group_rule['from_port'], None)
- self.assertEquals(security_group_rule['to_port'], None)
- self.assertEquals(security_group_rule['group']['name'], 'test')
- self.assertEquals(security_group_rule['parent_group_id'],
- self.sg1['id'])
-
- def test_create_none_value_from_to_port_icmp(self):
- rule = {'parent_group_id': self.sg1['id'],
- 'group_id': self.sg1['id'],
- 'ip_protocol': 'ICMP'}
- req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
- res_dict = self.controller.create(req, {'security_group_rule': rule})
- security_group_rule = res_dict['security_group_rule']
- self.assertEquals(security_group_rule['ip_protocol'], 'ICMP')
- self.assertEquals(security_group_rule['from_port'], -1)
- self.assertEquals(security_group_rule['to_port'], -1)
- self.assertEquals(security_group_rule['group']['name'], 'test')
- self.assertEquals(security_group_rule['parent_group_id'],
- self.sg1['id'])
-
- def test_create_none_value_from_to_port_tcp(self):
- rule = {'parent_group_id': self.sg1['id'],
- 'group_id': self.sg1['id'],
- 'ip_protocol': 'TCP'}
- req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
- res_dict = self.controller.create(req, {'security_group_rule': rule})
- security_group_rule = res_dict['security_group_rule']
- self.assertEquals(security_group_rule['ip_protocol'], 'TCP')
- self.assertEquals(security_group_rule['from_port'], 1)
- self.assertEquals(security_group_rule['to_port'], 65535)
- self.assertEquals(security_group_rule['group']['name'], 'test')
- self.assertEquals(security_group_rule['parent_group_id'],
- self.sg1['id'])
-
- def test_create_by_invalid_cidr_json(self):
- rule = security_group_rule_template(
- ip_protocol="tcp",
- from_port=22,
- to_port=22,
- parent_group_id=self.sg2['id'],
- cidr="10.2.3.124/2433")
- req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
- self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
- req, {'security_group_rule': rule})
-
- def test_create_by_invalid_tcp_port_json(self):
- rule = security_group_rule_template(
- ip_protocol="tcp",
- from_port=75534,
- to_port=22,
- parent_group_id=self.sg2['id'],
- cidr="10.2.3.124/24")
-
- req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
- self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
- req, {'security_group_rule': rule})
-
- def test_create_by_invalid_icmp_port_json(self):
- rule = security_group_rule_template(
- ip_protocol="icmp",
- from_port=1,
- to_port=256,
- parent_group_id=self.sg2['id'],
- cidr="10.2.3.124/24")
- req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
- self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
- req, {'security_group_rule': rule})
-
- def test_create_add_existing_rules_by_cidr(self):
- rule = security_group_rule_template(cidr='10.0.0.0/24',
- parent_group_id=self.sg2['id'])
-
- self.parent_security_group['rules'] = [security_group_rule_db(rule)]
-
- req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
- self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
- req, {'security_group_rule': rule})
-
- def test_create_add_existing_rules_by_group_id(self):
- rule = security_group_rule_template(group_id=1)
-
- self.parent_security_group['rules'] = [security_group_rule_db(rule)]
-
- req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
- self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
- req, {'security_group_rule': rule})
-
- def test_create_with_no_body(self):
- req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
- self.assertRaises(webob.exc.HTTPUnprocessableEntity,
- self.controller.create, req, None)
-
- def test_create_with_no_security_group_rule_in_body(self):
- rules = {'test': 'test'}
- req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
- self.assertRaises(webob.exc.HTTPUnprocessableEntity,
- self.controller.create, req, rules)
-
- def test_create_with_invalid_parent_group_id(self):
- rule = security_group_rule_template(parent_group_id='invalid')
-
- req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
- self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
- req, {'security_group_rule': rule})
-
- def test_create_with_non_existing_parent_group_id(self):
- rule = security_group_rule_template(group_id='invalid',
- parent_group_id=self.invalid_id)
-
- req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
- self.assertRaises(webob.exc.HTTPNotFound, self.controller.create,
- req, {'security_group_rule': rule})
-
- def test_create_with_invalid_protocol(self):
- rule = security_group_rule_template(ip_protocol='invalid-protocol',
- cidr='10.2.2.0/24',
- parent_group_id=self.sg2['id'])
-
- req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
- self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
- req, {'security_group_rule': rule})
-
- def test_create_with_no_protocol(self):
- rule = security_group_rule_template(cidr='10.2.2.0/24',
- parent_group_id=self.sg2['id'])
- del rule['ip_protocol']
-
- req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
- self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
- req, {'security_group_rule': rule})
-
- def test_create_with_invalid_from_port(self):
- rule = security_group_rule_template(from_port='666666',
- cidr='10.2.2.0/24',
- parent_group_id=self.sg2['id'])
-
- req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
- self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
- req, {'security_group_rule': rule})
-
- def test_create_with_invalid_to_port(self):
- rule = security_group_rule_template(to_port='666666',
- cidr='10.2.2.0/24',
- parent_group_id=self.sg2['id'])
-
- req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
- self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
- req, {'security_group_rule': rule})
-
- def test_create_with_non_numerical_from_port(self):
- rule = security_group_rule_template(from_port='invalid',
- cidr='10.2.2.0/24',
- parent_group_id=self.sg2['id'])
-
- req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
- self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
- req, {'security_group_rule': rule})
-
- def test_create_with_non_numerical_to_port(self):
- rule = security_group_rule_template(to_port='invalid',
- cidr='10.2.2.0/24',
- parent_group_id=self.sg2['id'])
-
- req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
- self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
- req, {'security_group_rule': rule})
-
- def test_create_with_no_from_port(self):
- rule = security_group_rule_template(cidr='10.2.2.0/24',
- parent_group_id=self.sg2['id'])
- del rule['from_port']
-
- req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
- self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
- req, {'security_group_rule': rule})
-
- def test_create_with_no_to_port(self):
- rule = security_group_rule_template(cidr='10.2.2.0/24',
- parent_group_id=self.sg2['id'])
- del rule['to_port']
-
- req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
- self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
- req, {'security_group_rule': rule})
-
- def test_create_with_invalid_cidr(self):
- rule = security_group_rule_template(cidr='10.2.2222.0/24',
- parent_group_id=self.sg2['id'])
-
- req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
- self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
- req, {'security_group_rule': rule})
-
- def test_create_with_no_cidr_group(self):
- rule = security_group_rule_template(parent_group_id=self.sg2['id'])
-
- req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
- res_dict = self.controller.create(req, {'security_group_rule': rule})
-
- security_group_rule = res_dict['security_group_rule']
- self.assertNotEquals(security_group_rule['id'], 0)
- self.assertEquals(security_group_rule['parent_group_id'],
- self.parent_security_group['id'])
- self.assertEquals(security_group_rule['ip_range']['cidr'],
- "0.0.0.0/0")
-
- def test_create_with_invalid_group_id(self):
- rule = security_group_rule_template(group_id='invalid',
- parent_group_id=self.sg2['id'])
-
- req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
- self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
- req, {'security_group_rule': rule})
-
- def test_create_with_empty_group_id(self):
- rule = security_group_rule_template(group_id='',
- parent_group_id=self.sg2['id'])
-
- req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
- self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
- req, {'security_group_rule': rule})
-
- def test_create_with_nonexist_group_id(self):
- rule = security_group_rule_template(group_id=self.invalid_id,
- parent_group_id=self.sg2['id'])
-
- req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
- self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
- req, {'security_group_rule': rule})
-
- def test_create_with_same_group_parent_id_and_group_id(self):
- rule = security_group_rule_template(group_id=self.sg1['id'],
- parent_group_id=self.sg1['id'])
- req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
- res_dict = self.controller.create(req, {'security_group_rule': rule})
- security_group_rule = res_dict['security_group_rule']
- self.assertNotEquals(security_group_rule['id'], 0)
- self.assertEquals(security_group_rule['parent_group_id'],
- self.sg1['id'])
- self.assertEquals(security_group_rule['group']['name'],
- self.sg1['name'])
-
- def _test_create_with_no_ports_and_no_group(self, proto):
- rule = {'ip_protocol': proto, 'parent_group_id': self.sg2['id']}
-
- req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
- self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
- req, {'security_group_rule': rule})
-
- def _test_create_with_no_ports(self, proto):
- rule = {'ip_protocol': proto, 'parent_group_id': self.sg2['id'],
- 'group_id': self.sg1['id']}
-
- req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
- res_dict = self.controller.create(req, {'security_group_rule': rule})
- security_group_rule = res_dict['security_group_rule']
- expected_rule = {
- 'from_port': 1, 'group': {'tenant_id': '123', 'name': 'test'},
- 'ip_protocol': proto, 'to_port': 65535, 'parent_group_id':
- self.sg2['id'], 'ip_range': {}, 'id': security_group_rule['id']
- }
- if proto == 'icmp':
- expected_rule['to_port'] = -1
- expected_rule['from_port'] = -1
- self.assertTrue(security_group_rule == expected_rule)
-
- def test_create_with_no_ports_icmp(self):
- self._test_create_with_no_ports_and_no_group('icmp')
- self._test_create_with_no_ports('icmp')
-
- def test_create_with_no_ports_tcp(self):
- self._test_create_with_no_ports_and_no_group('tcp')
- self._test_create_with_no_ports('tcp')
-
- def test_create_with_no_ports_udp(self):
- self._test_create_with_no_ports_and_no_group('udp')
- self._test_create_with_no_ports('udp')
-
- def _test_create_with_ports(self, proto, from_port, to_port):
- rule = {
- 'ip_protocol': proto, 'from_port': from_port, 'to_port': to_port,
- 'parent_group_id': self.sg2['id'], 'group_id': self.sg1['id']
- }
- req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
- res_dict = self.controller.create(req, {'security_group_rule': rule})
-
- security_group_rule = res_dict['security_group_rule']
- expected_rule = {
- 'from_port': from_port,
- 'group': {'tenant_id': '123', 'name': 'test'},
- 'ip_protocol': proto, 'to_port': to_port, 'parent_group_id':
- self.sg2['id'], 'ip_range': {}, 'id': security_group_rule['id']
- }
- self.assertTrue(security_group_rule['ip_protocol'] == proto)
- self.assertTrue(security_group_rule['from_port'] == from_port)
- self.assertTrue(security_group_rule['to_port'] == to_port)
- self.assertTrue(security_group_rule == expected_rule)
-
- def test_create_with_ports_icmp(self):
- self._test_create_with_ports('icmp', 0, 1)
- self._test_create_with_ports('icmp', 0, 0)
- self._test_create_with_ports('icmp', 1, 0)
-
- def test_create_with_ports_tcp(self):
- self._test_create_with_ports('tcp', 1, 1)
- self._test_create_with_ports('tcp', 1, 65535)
- self._test_create_with_ports('tcp', 65535, 65535)
-
- def test_create_with_ports_udp(self):
- self._test_create_with_ports('udp', 1, 1)
- self._test_create_with_ports('udp', 1, 65535)
- self._test_create_with_ports('udp', 65535, 65535)
-
- def test_delete(self):
- rule = security_group_rule_template(id=self.sg2['id'],
- parent_group_id=self.sg2['id'])
-
- def security_group_rule_get(context, id):
- return security_group_rule_db(rule)
-
- def security_group_rule_destroy(context, id):
- pass
-
- self.stubs.Set(nova.db, 'security_group_rule_get',
- security_group_rule_get)
- self.stubs.Set(nova.db, 'security_group_rule_destroy',
- security_group_rule_destroy)
-
- req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules/%s'
- % self.sg2['id'])
- self.controller.delete(req, self.sg2['id'])
-
- def test_delete_invalid_rule_id(self):
- req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules' +
- '/invalid')
- self.assertRaises(webob.exc.HTTPBadRequest, self.controller.delete,
- req, 'invalid')
-
- def test_delete_non_existing_rule_id(self):
- req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules/%s'
- % self.invalid_id)
- self.assertRaises(webob.exc.HTTPNotFound, self.controller.delete,
- req, self.invalid_id)
-
- def test_create_rule_quota_limit(self):
- req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
- for num in range(100, 100 + CONF.quota_security_group_rules):
- rule = {
- 'ip_protocol': 'tcp', 'from_port': num,
- 'to_port': num, 'parent_group_id': self.sg2['id'],
- 'group_id': self.sg1['id']
- }
- self.controller.create(req, {'security_group_rule': rule})
-
- rule = {
- 'ip_protocol': 'tcp', 'from_port': '121', 'to_port': '121',
- 'parent_group_id': self.sg2['id'], 'group_id': self.sg1['id']
- }
- self.assertRaises(webob.exc.HTTPRequestEntityTooLarge,
- self.controller.create,
- req, {'security_group_rule': rule})
-
- def test_create_rule_cidr_allow_all(self):
- rule = security_group_rule_template(cidr='0.0.0.0/0',
- parent_group_id=self.sg2['id'])
-
- req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
- res_dict = self.controller.create(req, {'security_group_rule': rule})
-
- security_group_rule = res_dict['security_group_rule']
- self.assertNotEquals(security_group_rule['id'], 0)
- self.assertEquals(security_group_rule['parent_group_id'],
- self.parent_security_group['id'])
- self.assertEquals(security_group_rule['ip_range']['cidr'],
- "0.0.0.0/0")
-
- def test_create_rule_cidr_allow_some(self):
- rule = security_group_rule_template(cidr='15.0.0.0/8',
- parent_group_id=self.sg2['id'])
-
- req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
- res_dict = self.controller.create(req, {'security_group_rule': rule})
-
- security_group_rule = res_dict['security_group_rule']
- self.assertNotEquals(security_group_rule['id'], 0)
- self.assertEquals(security_group_rule['parent_group_id'],
- self.parent_security_group['id'])
- self.assertEquals(security_group_rule['ip_range']['cidr'],
- "15.0.0.0/8")
-
- def test_create_rule_cidr_bad_netmask(self):
- rule = security_group_rule_template(cidr='15.0.0.0/0')
- req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
- self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
- req, {'security_group_rule': rule})
-
-
-class TestSecurityGroupRulesXMLDeserializer(test.TestCase):
-
- def setUp(self):
- super(TestSecurityGroupRulesXMLDeserializer, self).setUp()
- self.deserializer = security_groups.SecurityGroupRulesXMLDeserializer()
-
- def test_create_request(self):
- serial_request = """
-
- 12
- 22
- 22
-
- tcp
- 10.0.0.0/24
-"""
- request = self.deserializer.deserialize(serial_request)
- expected = {
- "security_group_rule": {
- "parent_group_id": "12",
- "from_port": "22",
- "to_port": "22",
- "ip_protocol": "tcp",
- "group_id": "",
- "cidr": "10.0.0.0/24",
- },
- }
- self.assertEquals(request['body'], expected)
-
- def test_create_no_protocol_request(self):
- serial_request = """
-
- 12
- 22
- 22
-
- 10.0.0.0/24
-"""
- request = self.deserializer.deserialize(serial_request)
- expected = {
- "security_group_rule": {
- "parent_group_id": "12",
- "from_port": "22",
- "to_port": "22",
- "group_id": "",
- "cidr": "10.0.0.0/24",
- },
- }
- self.assertEquals(request['body'], expected)
-
- def test_corrupt_xml(self):
- """Should throw a 400 error on corrupt xml."""
- self.assertRaises(
- exception.MalformedRequestBody,
- self.deserializer.deserialize,
- utils.killer_xml_body())
-
-
-class TestSecurityGroupXMLDeserializer(test.TestCase):
-
- def setUp(self):
- super(TestSecurityGroupXMLDeserializer, self).setUp()
- self.deserializer = security_groups.SecurityGroupXMLDeserializer()
-
- def test_create_request(self):
- serial_request = """
-
- test
-"""
- request = self.deserializer.deserialize(serial_request)
- expected = {
- "security_group": {
- "name": "test",
- "description": "test",
- },
- }
- self.assertEquals(request['body'], expected)
-
- def test_create_no_description_request(self):
- serial_request = """
-
-"""
- request = self.deserializer.deserialize(serial_request)
- expected = {
- "security_group": {
- "name": "test",
- },
- }
- self.assertEquals(request['body'], expected)
-
- def test_create_no_name_request(self):
- serial_request = """
-
-test
-"""
- request = self.deserializer.deserialize(serial_request)
- expected = {
- "security_group": {
- "description": "test",
- },
- }
- self.assertEquals(request['body'], expected)
-
- def test_corrupt_xml(self):
- """Should throw a 400 error on corrupt xml."""
- self.assertRaises(
- exception.MalformedRequestBody,
- self.deserializer.deserialize,
- utils.killer_xml_body())
-
-
-class TestSecurityGroupXMLSerializer(test.TestCase):
- def setUp(self):
- super(TestSecurityGroupXMLSerializer, self).setUp()
- self.namespace = wsgi.XMLNS_V11
- self.rule_serializer = security_groups.SecurityGroupRuleTemplate()
- self.index_serializer = security_groups.SecurityGroupsTemplate()
- self.default_serializer = security_groups.SecurityGroupTemplate()
-
- def _tag(self, elem):
- tagname = elem.tag
- self.assertEqual(tagname[0], '{')
- tmp = tagname.partition('}')
- namespace = tmp[0][1:]
- self.assertEqual(namespace, self.namespace)
- return tmp[2]
-
- def _verify_security_group_rule(self, raw_rule, tree):
- self.assertEqual(raw_rule['id'], tree.get('id'))
- self.assertEqual(raw_rule['parent_group_id'],
- tree.get('parent_group_id'))
-
- seen = set()
- expected = set(['ip_protocol', 'from_port', 'to_port',
- 'group', 'group/name', 'group/tenant_id',
- 'ip_range', 'ip_range/cidr'])
-
- for child in tree:
- child_tag = self._tag(child)
- self.assertTrue(child_tag in raw_rule)
- seen.add(child_tag)
- if child_tag in ('group', 'ip_range'):
- for gr_child in child:
- gr_child_tag = self._tag(gr_child)
- self.assertTrue(gr_child_tag in raw_rule[child_tag])
- seen.add('%s/%s' % (child_tag, gr_child_tag))
- self.assertEqual(gr_child.text,
- raw_rule[child_tag][gr_child_tag])
- else:
- self.assertEqual(child.text, raw_rule[child_tag])
- self.assertEqual(seen, expected)
-
- def _verify_security_group(self, raw_group, tree):
- rules = raw_group['rules']
- self.assertEqual('security_group', self._tag(tree))
- self.assertEqual(raw_group['id'], tree.get('id'))
- self.assertEqual(raw_group['tenant_id'], tree.get('tenant_id'))
- self.assertEqual(raw_group['name'], tree.get('name'))
- self.assertEqual(2, len(tree))
- for child in tree:
- child_tag = self._tag(child)
- if child_tag == 'rules':
- self.assertEqual(2, len(child))
- for idx, gr_child in enumerate(child):
- self.assertEqual(self._tag(gr_child), 'rule')
- self._verify_security_group_rule(rules[idx], gr_child)
- else:
- self.assertEqual('description', child_tag)
- self.assertEqual(raw_group['description'], child.text)
-
- def test_rule_serializer(self):
- raw_rule = dict(
- id='123',
- parent_group_id='456',
- ip_protocol='tcp',
- from_port='789',
- to_port='987',
- group=dict(name='group', tenant_id='tenant'),
- ip_range=dict(cidr='10.0.0.0/8'))
- rule = dict(security_group_rule=raw_rule)
- text = self.rule_serializer.serialize(rule)
-
- tree = etree.fromstring(text)
-
- self.assertEqual('security_group_rule', self._tag(tree))
- self._verify_security_group_rule(raw_rule, tree)
-
- def test_group_serializer(self):
- rules = [dict(
- id='123',
- parent_group_id='456',
- ip_protocol='tcp',
- from_port='789',
- to_port='987',
- group=dict(name='group1', tenant_id='tenant1'),
- ip_range=dict(cidr='10.55.44.0/24')),
- dict(
- id='654',
- parent_group_id='321',
- ip_protocol='udp',
- from_port='234',
- to_port='567',
- group=dict(name='group2', tenant_id='tenant2'),
- ip_range=dict(cidr='10.44.55.0/24'))]
- raw_group = dict(
- id='890',
- description='description',
- name='name',
- tenant_id='tenant',
- rules=rules)
- sg_group = dict(security_group=raw_group)
- text = self.default_serializer.serialize(sg_group)
-
- tree = etree.fromstring(text)
-
- self._verify_security_group(raw_group, tree)
-
- def test_groups_serializer(self):
- rules = [dict(
- id='123',
- parent_group_id='1234',
- ip_protocol='tcp',
- from_port='12345',
- to_port='123456',
- group=dict(name='group1', tenant_id='tenant1'),
- ip_range=dict(cidr='10.123.0.0/24')),
- dict(
- id='234',
- parent_group_id='2345',
- ip_protocol='udp',
- from_port='23456',
- to_port='234567',
- group=dict(name='group2', tenant_id='tenant2'),
- ip_range=dict(cidr='10.234.0.0/24')),
- dict(
- id='345',
- parent_group_id='3456',
- ip_protocol='tcp',
- from_port='34567',
- to_port='345678',
- group=dict(name='group3', tenant_id='tenant3'),
- ip_range=dict(cidr='10.345.0.0/24')),
- dict(
- id='456',
- parent_group_id='4567',
- ip_protocol='udp',
- from_port='45678',
- to_port='456789',
- group=dict(name='group4', tenant_id='tenant4'),
- ip_range=dict(cidr='10.456.0.0/24'))]
- groups = [dict(
- id='567',
- description='description1',
- name='name1',
- tenant_id='tenant1',
- rules=rules[0:2]),
- dict(
- id='678',
- description='description2',
- name='name2',
- tenant_id='tenant2',
- rules=rules[2:4])]
- sg_groups = dict(security_groups=groups)
- text = self.index_serializer.serialize(sg_groups)
-
- tree = etree.fromstring(text)
-
- self.assertEqual('security_groups', self._tag(tree))
- self.assertEqual(len(groups), len(tree))
- for idx, child in enumerate(tree):
- self._verify_security_group(groups[idx], child)
-
-
UUID1 = '00000000-0000-0000-0000-000000000001'
UUID2 = '00000000-0000-0000-0000-000000000002'
UUID3 = '00000000-0000-0000-0000-000000000003'
def fake_compute_get_all(*args, **kwargs):
- base = {'id': 1, 'description': 'foo', 'user_id': 'bar',
- 'project_id': 'baz', 'deleted': False, 'deleted_at': None,
- 'updated_at': None, 'created_at': None}
- db_list = [
- fakes.stub_instance(
- 1, uuid=UUID1,
- security_groups=[dict(base, **{'name': 'fake-0-0'}),
- dict(base, **{'name': 'fake-0-1'})]),
- fakes.stub_instance(
- 2, uuid=UUID2,
- security_groups=[dict(base, **{'name': 'fake-1-0'}),
- dict(base, **{'name': 'fake-1-1'})])
+ return [
+ fakes.stub_instance(1, uuid=UUID1,
+ security_groups=[{'name': 'fake-0-0'},
+ {'name': 'fake-0-1'}]),
+ fakes.stub_instance(2, uuid=UUID2,
+ security_groups=[{'name': 'fake-1-0'},
+ {'name': 'fake-1-1'}])
]
- return instance_obj._make_instance_list(args[1],
- instance_obj.InstanceList(),
- db_list,
- ['metadata', 'system_metadata',
- 'security_groups', 'info_cache'])
-
def fake_compute_get(*args, **kwargs):
return fakes.stub_instance(1, uuid=UUID3,
@@ -1499,15 +296,12 @@ class SecurityGroupsOutputTest(test.TestCase):
def setUp(self):
super(SecurityGroupsOutputTest, self).setUp()
- self.controller = security_groups.SecurityGroupController()
fakes.stub_out_nw_api(self.stubs)
self.stubs.Set(compute.api.API, 'get', fake_compute_get)
self.stubs.Set(compute.api.API, 'get_all', fake_compute_get_all)
self.stubs.Set(compute.api.API, 'create', fake_compute_create)
- self.flags(
- osapi_compute_extension=[
- 'nova.api.openstack.compute.contrib.select_extensions'],
- osapi_compute_ext_list=['Security_groups'])
+ self.app = fakes.wsgi_app_v3(init_only=('servers',
+ 'os-security-groups'))
def _make_request(self, url, body=None):
req = webob.Request.blank(url)
@@ -1516,7 +310,7 @@ class SecurityGroupsOutputTest(test.TestCase):
req.body = self._encode_body(body)
req.content_type = self.content_type
req.headers['Accept'] = self.content_type
- res = req.get_response(fakes.wsgi_app(init_only=('servers',)))
+ res = req.get_response(self.app)
return res
def _encode_body(self, body):
@@ -1532,7 +326,7 @@ class SecurityGroupsOutputTest(test.TestCase):
return server.get('security_groups')
def test_create(self):
- url = '/v2/fake/servers'
+ url = '/v3/servers'
image_uuid = 'c905cedb-7281-47e4-8a62-f26bc5fc4c77'
server = dict(name='server_test', imageRef=image_uuid, flavorRef=2)
res = self._make_request(url, {'server': server})
@@ -1543,7 +337,7 @@ class SecurityGroupsOutputTest(test.TestCase):
self.assertEqual(group.get('name'), name)
def test_show(self):
- url = '/v2/fake/servers/%s' % UUID3
+ url = '/v3/servers/%s' % UUID3
res = self._make_request(url)
self.assertEqual(res.status_int, 200)
@@ -1553,7 +347,7 @@ class SecurityGroupsOutputTest(test.TestCase):
self.assertEqual(group.get('name'), name)
def test_detail(self):
- url = '/v2/fake/servers/detail'
+ url = '/v3/servers/detail'
res = self._make_request(url)
self.assertEqual(res.status_int, 200)
@@ -1568,7 +362,7 @@ class SecurityGroupsOutputTest(test.TestCase):
raise exception.InstanceNotFound(instance_id='fake')
self.stubs.Set(compute.api.API, 'get', fake_compute_get)
- url = '/v2/fake/servers/70f6db34-de8d-4fbd-aafb-4065bdfa6115'
+ url = '/v3/servers/70f6db34-de8d-4fbd-aafb-4065bdfa6115'
res = self._make_request(url)
self.assertEqual(res.status_int, 404)
diff --git a/nova/tests/fake_policy.py b/nova/tests/fake_policy.py
index 0f315b0b9b3a..1b70c25d9db2 100644
--- a/nova/tests/fake_policy.py
+++ b/nova/tests/fake_policy.py
@@ -200,6 +200,7 @@ policy_data = """
"compute_extension:v3:os-rescue": "",
"compute_extension:security_group_default_rules": "",
"compute_extension:security_groups": "",
+ "compute_extension:v3:os-security-groups": "",
"compute_extension:server_diagnostics": "",
"compute_extension:v3:os-server-diagnostics": "",
"compute_extension:server_password": "",
diff --git a/setup.cfg b/setup.cfg
index 9b109bc7fb96..c04a7bbb16fd 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -81,6 +81,7 @@ nova.api.v3.extensions =
rescue = nova.api.openstack.compute.plugins.v3.rescue:Rescue
scheduler_hints = nova.api.openstack.compute.plugins.v3.scheduler_hints:SchedulerHints
server_diagnostics = nova.api.openstack.compute.plugins.v3.server_diagnostics:ServerDiagnostics
+ security_groups = nova.api.openstack.compute.plugins.v3.security_groups:SecurityGroups
servers = nova.api.openstack.compute.plugins.v3.servers:Servers
simple_tenant_usage = nova.api.openstack.compute.plugins.v3.simple_tenant_usage:SimpleTenantUsage
@@ -99,7 +100,7 @@ build-dir = doc/build
source-dir = doc/source
[egg_info]
-tag_build =
+tag_build =
tag_date = 0
tag_svn_revision = 0