Quantum Security Groups API

Implements blueprint quantum-security-groups API

In addition the the convention that if a URI has a '-' in it, it's
replaced with a '_'. For example: POST security-groups will convert
the body to {'security_group':  ..}

Change-Id: I2c5219ed1d44a43ce1bf03d49df9f5c1af23352b
This commit is contained in:
Aaron Rosen 2012-10-09 17:10:27 -07:00
parent f843263da3
commit 6a6c2e3cd5
5 changed files with 1615 additions and 2 deletions

View File

@ -111,7 +111,7 @@ class Controller(object):
if member_actions is None:
member_actions = []
self._plugin = plugin
self._collection = collection
self._collection = collection.replace('-', '_')
self._resource = resource
self._attr_info = attr_info
self._allow_bulk = allow_bulk

View File

@ -0,0 +1,466 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright 2012 Nicira Networks, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Aaron Rosen, Nicira, Inc
#
import re
import sqlalchemy as sa
from sqlalchemy import orm
from sqlalchemy.orm import exc
from sqlalchemy.orm import scoped_session
from quantum.api.v2 import attributes
from quantum.openstack.common import cfg
from quantum.common import utils
from quantum.db import model_base
from quantum.db import models_v2
from quantum.extensions import securitygroup as ext_sg
class SecurityGroup(model_base.BASEV2, models_v2.HasId, models_v2.HasTenant):
"""Represents a v2 quantum security group."""
name = sa.Column(sa.String(255))
description = sa.Column(sa.String(255))
external_id = sa.Column(sa.Integer, unique=True)
class SecurityGroupPortBinding(model_base.BASEV2):
"""Represents binding between quantum ports and security profiles"""
port_id = sa.Column(sa.String(36), sa.ForeignKey("ports.id"),
primary_key=True)
security_group_id = sa.Column(sa.String(36),
sa.ForeignKey("securitygroups.id"),
primary_key=True)
class SecurityGroupRule(model_base.BASEV2, models_v2.HasId,
models_v2.HasTenant):
"""Represents a v2 quantum security group rule."""
external_id = sa.Column(sa.Integer)
security_group_id = sa.Column(sa.String(36),
sa.ForeignKey("securitygroups.id",
ondelete="CASCADE"),
nullable=False)
source_group_id = sa.Column(sa.String(36),
sa.ForeignKey("securitygroups.id",
ondelete="CASCADE"),
nullable=True)
direction = sa.Column(sa.Enum('ingress', 'egress'))
ethertype = sa.Column(sa.String(40))
protocol = sa.Column(sa.String(40))
port_range_min = sa.Column(sa.Integer)
port_range_max = sa.Column(sa.Integer)
source_ip_prefix = sa.Column(sa.String(255))
security_group = orm.relationship(
SecurityGroup,
backref=orm.backref('rules', cascade='all,delete'),
primaryjoin="SecurityGroup.id==SecurityGroupRule.security_group_id")
source_group = orm.relationship(
SecurityGroup,
backref=orm.backref('source_rules', cascade='all,delete'),
primaryjoin="SecurityGroup.id==SecurityGroupRule.source_group_id")
class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase):
"""Mixin class to add security group to db_plugin_base_v2."""
__native_bulk_support = True
sg_supported_protocols = ['tcp', 'udp', 'icmp']
sg_supported_ethertypes = ['IPv4', 'IPv6']
def create_security_group_bulk(self, context, security_group_rule):
return self._create_bulk('security_group', context,
security_group_rule)
def create_security_group(self, context, security_group, default_sg=False):
"""Create security group.
If default_sg is true that means we are a default security group for
a given tenant if it does not exist.
"""
s = security_group['security_group']
if (cfg.CONF.SECURITYGROUP.proxy_mode and not context.is_admin):
raise ext_sg.SecurityGroupProxyModeNotAdmin()
if (cfg.CONF.SECURITYGROUP.proxy_mode and not s.get('external_id')):
raise ext_sg.SecurityGroupProxyMode()
if not cfg.CONF.SECURITYGROUP.proxy_mode and s.get('external_id'):
raise ext_sg.SecurityGroupNotProxyMode()
tenant_id = self._get_tenant_id_for_create(context, s)
# if in proxy mode a default security group will be created by source
if not default_sg and not cfg.CONF.SECURITYGROUP.proxy_mode:
self._ensure_default_security_group(context, tenant_id,
security_group)
if s.get('external_id'):
try:
# Check if security group already exists
sg = self.get_security_group(context, s.get('external_id'))
if sg:
raise ext_sg.SecurityGroupAlreadyExists(
name=sg.get('name', ''),
external_id=s.get('external_id'))
except ext_sg.SecurityGroupNotFound:
pass
with context.session.begin(subtransactions=True):
security_group_db = SecurityGroup(id=s.get('id') or (
utils.str_uuid()),
description=s['description'],
tenant_id=tenant_id,
name=s['name'],
external_id=s.get('external_id'))
context.session.add(security_group_db)
if s.get('name') == 'default':
for ethertype in self.sg_supported_ethertypes:
# Allow all egress traffic
db = SecurityGroupRule(
id=utils.str_uuid(), tenant_id=tenant_id,
security_group=security_group_db,
direction='egress',
ethertype=ethertype)
context.session.add(db)
# Allow intercommunication
db = SecurityGroupRule(
id=utils.str_uuid(), tenant_id=tenant_id,
security_group=security_group_db,
direction='ingress',
source_group=security_group_db,
ethertype=ethertype)
context.session.add(db)
return self._make_security_group_dict(security_group_db)
def get_security_groups(self, context, filters=None, fields=None):
return self._get_collection(context, SecurityGroup,
self._make_security_group_dict,
filters=filters, fields=fields)
def get_security_group(self, context, id, fields=None, tenant_id=None):
"""Tenant id is given to handle the case when we
are creating a security group or security group rule on behalf of
another use.
"""
if tenant_id:
tmp_context_tenant_id = context.tenant_id
context.tenant_id = tenant_id
try:
ret = self._make_security_group_dict(self._get_security_group(
context, id), fields)
finally:
if tenant_id:
context.tenant_id = tmp_context_tenant_id
return ret
def _get_security_group(self, context, id):
try:
query = self._model_query(context, SecurityGroup)
if not re.match(attributes.UUID_PATTERN, str(id)):
sg = query.filter(SecurityGroup.external_id == id).one()
else:
sg = query.filter(SecurityGroup.id == id).one()
except exc.NoResultFound:
raise ext_sg.SecurityGroupNotFound(id=id)
return sg
def delete_security_group(self, context, id):
if (cfg.CONF.SECURITYGROUP.proxy_mode and not context.is_admin):
raise ext_sg.SecurityGroupProxyModeNotAdmin()
filters = {'security_group_id': [id]}
ports = self._get_port_security_group_bindings(context, filters)
if ports:
raise ext_sg.SecurityGroupInUse(id=id)
# confirm security group exists
sg = self._get_security_group(context, id)
if sg['name'] == 'default':
raise ext_sg.SecurityGroupCannotRemoveDefault()
with context.session.begin(subtransactions=True):
context.session.delete(sg)
def _make_security_group_dict(self, security_group, fields=None):
res = {'id': security_group['id'],
'name': security_group['name'],
'tenant_id': security_group['tenant_id'],
'description': security_group['description']}
if security_group.get('external_id'):
res['external_id'] = security_group['external_id']
return self._fields(res, fields)
def _make_security_group_binding_dict(self, security_group, fields=None):
res = {'port_id': security_group['port_id'],
'security_group_id': security_group['security_group_id']}
return self._fields(res, fields)
def _create_port_security_group_binding(self, context, port_id,
security_group_id):
with context.session.begin(subtransactions=True):
db = SecurityGroupPortBinding(port_id=port_id,
security_group_id=security_group_id)
context.session.add(db)
def _get_port_security_group_bindings(self, context,
filters=None, fields=None):
return self._get_collection(context, SecurityGroupPortBinding,
self._make_security_group_binding_dict,
filters=filters, fields=fields)
def _delete_port_security_group_bindings(self, context, port_id):
query = self._model_query(context, SecurityGroupPortBinding)
bindings = query.filter(
SecurityGroupPortBinding.port_id == port_id)
with context.session.begin(subtransactions=True):
for binding in bindings:
context.session.delete(binding)
def create_security_group_rule_bulk(self, context, security_group_rule):
return self._create_bulk('security_group_rule', context,
security_group_rule)
def create_security_group_rule_bulk_native(self, context,
security_group_rule):
r = security_group_rule['security_group_rules']
scoped_session(context.session)
security_group_id = self._validate_security_group_rules(
context, security_group_rule)
with context.session.begin(subtransactions=True):
if not self.get_security_group(context, security_group_id):
raise ext_sg.SecurityGroupNotFound(id=security_group_id)
self._check_for_duplicate_rules(context, r)
ret = []
for rule_dict in r:
rule = rule_dict['security_group_rule']
tenant_id = self._get_tenant_id_for_create(context, rule)
db = SecurityGroupRule(
id=utils.str_uuid(), tenant_id=tenant_id,
security_group_id=rule['security_group_id'],
direction=rule['direction'],
external_id=rule.get('external_id'),
source_group_id=rule.get('source_group_id'),
ethertype=rule['ethertype'],
protocol=rule['protocol'],
port_range_min=rule['port_range_min'],
port_range_max=rule['port_range_max'],
source_ip_prefix=rule.get('source_ip_prefix'))
context.session.add(db)
ret.append(self._make_security_group_rule_dict(db))
return ret
def create_security_group_rule(self, context, security_group_rule):
bulk_rule = {'security_group_rules': [security_group_rule]}
return self.create_security_group_rule_bulk_native(context,
bulk_rule)[0]
def _validate_security_group_rules(self, context, security_group_rule):
"""Check that rules being installed all belong to the same security
group, source_group_id/security_group_id belong to the same tenant,
and rules are valid.
"""
if (cfg.CONF.SECURITYGROUP.proxy_mode and not context.is_admin):
raise ext_sg.SecurityGroupProxyModeNotAdmin()
new_rules = set()
tenant_ids = set()
for rules in security_group_rule['security_group_rules']:
rule = rules.get('security_group_rule')
new_rules.add(rule['security_group_id'])
if (cfg.CONF.SECURITYGROUP.proxy_mode and
not rule.get('external_id')):
raise ext_sg.SecurityGroupProxyMode()
if (not cfg.CONF.SECURITYGROUP.proxy_mode and
rule.get('external_id')):
raise ext_sg.SecurityGroupNotProxyMode()
# Check that protocol/ethertype are valid
protocol = rule.get('protocol')
if protocol and protocol not in self.sg_supported_protocols:
raise ext_sg.SecurityGroupInvalidProtocolType(value=protocol)
ethertype = rule.get('ethertype')
if ethertype and ethertype not in self.sg_supported_ethertypes:
raise ext_sg.SecurityGroupInvalidEtherType(value=ethertype)
# Check that port_range's are valid
if (rule['port_range_min'] is None and
rule['port_range_max'] is None):
pass
elif (rule['port_range_min'] is not None and
rule['port_range_min'] <= rule['port_range_max']):
if not rule['protocol']:
raise ext_sg.SecurityGroupProtocolRequiredWithPorts()
else:
raise ext_sg.SecurityGroupInvalidPortRange()
if rule['source_ip_prefix'] and rule['source_group_id']:
raise ext_sg.SecurityGroupSourceGroupAndIpPrefix()
if rule['tenant_id'] not in tenant_ids:
tenant_ids.add(rule['tenant_id'])
source_group_id = rule.get('source_group_id')
# Check that source_group_id exists for tenant
if source_group_id:
self.get_security_group(context, source_group_id,
tenant_id=rule['tenant_id'])
if len(new_rules) > 1:
raise ext_sg.SecurityGroupNotSingleGroupRules()
security_group_id = new_rules.pop()
# Confirm single tenant and that the tenant has permission
# to add rules to this security group.
if len(tenant_ids) > 1:
raise ext_sg.SecurityGroupRulesNotSingleTenant()
for tenant_id in tenant_ids:
self.get_security_group(context, security_group_id,
tenant_id=tenant_id)
return security_group_id
def _make_security_group_rule_dict(self, security_group_rule, fields=None):
res = {'id': security_group_rule['id'],
'tenant_id': security_group_rule['tenant_id'],
'security_group_id': security_group_rule['security_group_id'],
'ethertype': security_group_rule['ethertype'],
'direction': security_group_rule['direction'],
'protocol': security_group_rule['protocol'],
'port_range_min': security_group_rule['port_range_min'],
'port_range_max': security_group_rule['port_range_max'],
'source_ip_prefix': security_group_rule['source_ip_prefix'],
'source_group_id': security_group_rule['source_group_id'],
'external_id': security_group_rule['external_id']}
return self._fields(res, fields)
def _make_security_group_rule_filter_dict(self, security_group_rule):
sgr = security_group_rule['security_group_rule']
res = {'tenant_id': [sgr['tenant_id']],
'security_group_id': [sgr['security_group_id']],
'direction': [sgr['direction']]}
include_if_present = ['protocol', 'port_range_max', 'port_range_min',
'ethertype', 'source_ip_prefix',
'source_group_id', 'external_id']
for key in include_if_present:
value = sgr.get(key)
if value:
res[key] = [value]
return res
def _check_for_duplicate_rules(self, context, security_group_rules):
for i in security_group_rules:
found_self = False
for j in security_group_rules:
if i['security_group_rule'] == j['security_group_rule']:
if found_self:
raise ext_sg.DuplicateSecurityGroupRuleInPost(rule=i)
found_self = True
# Check in database if rule exists
filters = self._make_security_group_rule_filter_dict(i)
if self.get_security_group_rules(context, filters):
raise ext_sg.SecurityGroupRuleExists(rule=i)
def get_security_group_rules(self, context, filters=None, fields=None):
return self._get_collection(context, SecurityGroupRule,
self._make_security_group_rule_dict,
filters=filters, fields=fields)
def get_security_group_rule(self, context, id, fields=None):
security_group_rule = self._get_security_group_rule(context, id)
return self._make_security_group_rule_dict(security_group_rule, fields)
def _get_security_group_rule(self, context, id):
try:
if not re.match(attributes.UUID_PATTERN, id):
query = self._model_query(context, SecurityGroupRule)
sgr = query.filter(SecurityGroupRule.external_id == id).one()
else:
query = self._model_query(context, SecurityGroupRule)
sgr = query.filter(SecurityGroupRule.id == id).one()
except exc.NoResultFound:
raise ext_sg.SecurityGroupRuleNotFound(id=id)
return sgr
def delete_security_group_rule(self, context, sgrid):
if (cfg.CONF.SECURITYGROUP.proxy_mode and not context.is_admin):
raise ext_sg.SecurityGroupProxyModeNotAdmin()
with context.session.begin(subtransactions=True):
rule = self._get_security_group_rule(context, sgrid)
context.session.delete(rule)
def _extend_port_dict_security_group(self, context, port):
filters = {'port_id': [port['id']]}
fields = {'security_group_id': None}
port[ext_sg.SECURITYGROUP] = []
security_group_id = self._get_port_security_group_bindings(
context, filters, fields)
for security_group_id in security_group_id:
port[ext_sg.SECURITYGROUP].append(
security_group_id['security_group_id'])
return port
def _process_port_create_security_group(self, context, port_id,
security_group_id):
if not security_group_id:
return
for security_group_id in security_group_id:
self._create_port_security_group_binding(context, port_id,
security_group_id)
def _ensure_default_security_group(self, context, tenant_id,
security_group=None):
"""Create a default security group if one doesn't exist.
:returns: the default security group id.
"""
# if in proxy mode a default security group will be created by source
if not security_group and cfg.CONF.SECURITYGROUP.proxy_mode:
return
filters = {'name': ['default'], 'tenant_id': [tenant_id]}
default_group = self.get_security_groups(context, filters)
if not default_group:
security_group = {'security_group': {'name': 'default',
'tenant_id': tenant_id,
'description': 'default'}}
if security_group:
security_group['security_group']['external_id'] = (
security_group['security_group'].get('external_id'))
ret = self.create_security_group(context, security_group, True)
return ret['id']
else:
return default_group[0]['id']
def _validate_security_groups_on_port(self, context, port):
p = port['port']
if not p.get(ext_sg.SECURITYGROUP):
return
valid_groups = self.get_security_groups(context, fields={'id': None})
valid_groups_set = set([x['id'] for x in valid_groups])
req_sg_set = set(p[ext_sg.SECURITYGROUP])
invalid_sg_set = req_sg_set - valid_groups_set
if invalid_sg_set:
msg = ' '.join(str(x) for x in invalid_sg_set)
raise ext_sg.SecurityGroupNotFound(id=msg)

View File

@ -0,0 +1,318 @@
# Copyright (c) 2012 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from abc import abstractmethod
from quantum.api.v2 import attributes as attr
from quantum.api.v2 import base
from quantum.common import exceptions as qexception
from quantum.extensions import extensions
from quantum import manager
from quantum.openstack.common import cfg
from quantum import quota
# Security group Exceptions
class SecurityGroupAlreadyExists(qexception.InUse):
# This can only happen if the external_id database is cleared
message = _("Security group %(name)s id %(external_id)s already exists")
class SecurityGroupInvalidProtocolType(qexception.InvalidInput):
message = _("Invalid protocol type %(value)s")
class SecurityGroupInvalidEtherType(qexception.InvalidInput):
message = _("Invalid/Unsupported ethertype %(value)s")
class SecurityGroupInvalidPortRange(qexception.InvalidInput):
message = _("For TCP/UDP protocols, port_range_min must be "
"<= port_range_max")
class SecurityGroupInvalidPortValue(qexception.InvalidInput):
message = _("Invalid value for port %(port)s")
class SecurityGroupInUse(qexception.InUse):
message = _("Security Group %(id)s in use.")
class SecurityGroupCannotRemoveDefault(qexception.InUse):
message = _("Removing default security group not allowed.")
class SecurityGroupDefaultAlreadyExists(qexception.InUse):
message = _("Default security group already exists.")
class SecurityGroupRuleInvalidProtocol(qexception.InUse):
message = _("Security group rule protocol %(protocol)s not supported "
"only protocol values %(values)s supported.")
class SecurityGroupRulesNotSingleTenant(qexception.InvalidInput):
message = _("Multiple tenant_ids in bulk security group rule create"
" not allowed")
class SecurityGroupSourceGroupAndIpPrefix(qexception.InvalidInput):
message = _("Only source_ip_prefix or source_group_id may "
"be provided.")
class SecurityGroupProtocolRequiredWithPorts(qexception.InvalidInput):
message = _("Must also specifiy protocol if port range is given.")
class SecurityGroupNotSingleGroupRules(qexception.InvalidInput):
message = _("Only allowed to update rules for "
"one security profile at a time")
class SecurityGroupSourceGroupNotFound(qexception.NotFound):
message = _("source group id %(id)s does not exist")
class SecurityGroupNotFound(qexception.NotFound):
message = _("Security group %(id)s does not exist")
class SecurityGroupRuleNotFound(qexception.NotFound):
message = _("Security group rule %(id)s does not exist")
class DuplicateSecurityGroupRuleInPost(qexception.InUse):
message = _("Duplicate Security Group Rule in POST.")
class SecurityGroupRuleExists(qexception.InUse):
message = _("Security group rule exists %(rule)s")
class SecurityGroupProxyMode(qexception.InUse):
message = _("Did not recieve external id and in proxy mode")
class SecurityGroupNotProxyMode(qexception.InUse):
message = _("Recieve external id and not in proxy mode")
class SecurityGroupProxyModeNotAdmin(qexception.InvalidExtenstionEnv):
message = _("In Proxy Mode and not from admin")
class SecurityGroupInvalidExternalID(qexception.InvalidInput):
message = _("external_id wrong type %(data)s")
def convert_validate_port_value(port):
if port is None:
return port
try:
val = int(port)
except (ValueError, TypeError):
raise SecurityGroupInvalidPortValue(port=port)
if val >= 0 and val <= 65535:
return val
else:
raise SecurityGroupInvalidPortValue(port=port)
def _validate_name_not_default(data, valid_values=None):
if not cfg.CONF.SECURITYGROUP.proxy_mode and data == "default":
raise SecurityGroupDefaultAlreadyExists()
def _validate_external_id_and_mode(external_id, valid_values=None):
if not cfg.CONF.SECURITYGROUP.proxy_mode and not external_id:
return
elif not cfg.CONF.SECURITYGROUP.proxy_mode and external_id:
raise SecurityGroupNotProxyMode()
try:
int(external_id)
except (ValueError, TypeError):
raise SecurityGroupInvalidExternalID(data=external_id)
if cfg.CONF.SECURITYGROUP.proxy_mode and not external_id:
raise SecurityGroupProxyMode()
attr.validators['type:name_not_default'] = _validate_name_not_default
attr.validators['type:external_id_and_mode'] = _validate_external_id_and_mode
# Attribute Map
RESOURCE_ATTRIBUTE_MAP = {
'security_groups': {
'id': {'allow_post': False, 'allow_put': False,
'validate': {'type:regex': attr.UUID_PATTERN},
'is_visible': True},
'name': {'allow_post': True, 'allow_put': False,
'is_visible': True, 'default': '',
'validate': {'type:name_not_default': None}},
'description': {'allow_post': True, 'allow_put': False,
'is_visible': True, 'default': ''},
'external_id': {'allow_post': True, 'allow_put': False,
'is_visible': True, 'default': None,
'validate': {'type:external_id_and_mode': None}},
'tenant_id': {'allow_post': True, 'allow_put': False,
'required_by_policy': True,
'is_visible': True},
},
'security_group_rules': {
'id': {'allow_post': False, 'allow_put': False,
'validate': {'type:regex': attr.UUID_PATTERN},
'is_visible': True},
# external_id can be used to be backwards compatible with nova
'external_id': {'allow_post': True, 'allow_put': False,
'is_visible': True, 'default': None,
'validate': {'type:external_id_and_mode': None}},
'security_group_id': {'allow_post': True, 'allow_put': False,
'is_visible': True, 'required_by_policy': True},
'source_group_id': {'allow_post': True, 'allow_put': False,
'default': None, 'is_visible': True},
'direction': {'allow_post': True, 'allow_put': True,
'is_visible': True,
'validate': {'type:values': ['ingress', 'egress']}},
'protocol': {'allow_post': True, 'allow_put': False,
'is_visible': True, 'default': None},
'port_range_min': {'allow_post': True, 'allow_put': False,
'convert_to': convert_validate_port_value,
'default': None, 'is_visible': True},
'port_range_max': {'allow_post': True, 'allow_put': False,
'convert_to': convert_validate_port_value,
'default': None, 'is_visible': True},
'ethertype': {'allow_post': True, 'allow_put': False,
'is_visible': True, 'default': 'IPv4'},
'source_ip_prefix': {'allow_post': True, 'allow_put': False,
'default': None, 'is_visible': True},
'tenant_id': {'allow_post': True, 'allow_put': False,
'required_by_policy': True,
'is_visible': True},
}
}
SECURITYGROUP = 'security_groups'
EXTENDED_ATTRIBUTES_2_0 = {
'ports': {SECURITYGROUP: {'allow_post': True,
'allow_put': True,
'is_visible': True,
'default': None}}}
security_group_quota_opts = [
cfg.IntOpt('quota_security_group',
default=10,
help='number of security groups allowed per tenant,'
'-1 for unlimited'),
cfg.IntOpt('quota_security_group_rule',
default=100,
help='number of security rules allowed per tenant, '
'-1 for unlimited'),
]
cfg.CONF.register_opts(security_group_quota_opts, 'QUOTAS')
security_group_opts = [
cfg.StrOpt('proxy_mode', default=False)
]
cfg.CONF.register_opts(security_group_opts, 'SECURITYGROUP')
class Securitygroup(object):
""" Security group extension"""
@classmethod
def get_name(cls):
return "security-group"
@classmethod
def get_alias(cls):
return "security-group"
@classmethod
def get_description(cls):
return "The security groups extension."
@classmethod
def get_namespace(cls):
# todo
return "http://docs.openstack.org/ext/securitygroups/api/v2.0"
@classmethod
def get_updated(cls):
return "2012-10-05T10:00:00-00:00"
@classmethod
def get_resources(cls):
""" Returns Ext Resources """
exts = []
plugin = manager.QuantumManager.get_plugin()
for resource_name in ['security_group', 'security_group_rule']:
collection_name = resource_name.replace('_', '-') + "s"
params = RESOURCE_ATTRIBUTE_MAP.get(resource_name + "s", dict())
quota.QUOTAS.register_resource_by_name(resource_name)
controller = base.create_resource(collection_name,
resource_name,
plugin, params, allow_bulk=True)
ex = extensions.ResourceExtension(collection_name,
controller)
exts.append(ex)
return exts
def get_extended_resources(self, version):
if version == "2.0":
return EXTENDED_ATTRIBUTES_2_0
else:
return {}
class SecurityGroupPluginBase(object):
@abstractmethod
def create_security_group(self, context, security_group):
pass
@abstractmethod
def delete_security_group(self, context, security_group):
pass
@abstractmethod
def update_security_group(self, context, security_group):
pass
@abstractmethod
def get_security_groups(self, context, filters=None, fields=None):
pass
@abstractmethod
def get_security_group(self, context, id, fields=None):
pass
@abstractmethod
def create_security_group_rule(self, context, security_group_rule):
pass
@abstractmethod
def delete_security_group_rule(self, context, sgrid):
pass
@abstractmethod
def get_security_group_rules(self, context, filters=None, fields=None):
pass
@abstractmethod
def get_security_group_rule(self, context, id, fields=None):
pass

View File

@ -258,7 +258,7 @@ class QuantumDbPluginV2TestCase(unittest2.TestCase):
for arg in ('admin_state_up', 'device_id',
'mac_address', 'name', 'fixed_ips',
'tenant_id', 'device_owner'):
'tenant_id', 'device_owner', 'security_groups'):
# Arg must be present and not empty
if arg in kwargs and kwargs[arg]:
data['port'][arg] = kwargs[arg]

View File

@ -0,0 +1,829 @@
# Copyright (c) 2012 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import contextlib
import os
import mock
import unittest2
import webob.exc
from quantum.api.v2 import attributes
from quantum.api.v2.router import APIRouter
from quantum import context
from quantum.common.test_lib import test_config
from quantum.common import config
from quantum.db import api as db
from quantum.db import db_base_plugin_v2
from quantum.db import securitygroups_db
from quantum.extensions.extensions import PluginAwareExtensionManager
from quantum.extensions import securitygroup as ext_sg
from quantum.manager import QuantumManager
from quantum.openstack.common import cfg
from quantum.tests.unit import test_db_plugin
from quantum.tests.unit import test_extensions
from quantum.wsgi import JSONDeserializer
DB_PLUGIN_KLASS = ('quantum.tests.unit.test_extension_security_group.'
'SecurityGroupTestPlugin')
ROOTDIR = os.path.dirname(os.path.dirname(__file__))
ETCDIR = os.path.join(ROOTDIR, 'etc')
def etcdir(*p):
return os.path.join(ETCDIR, *p)
class SecurityGroupTestExtensionManager(object):
def get_resources(self):
return ext_sg.Securitygroup.get_resources()
def get_actions(self):
return []
def get_request_extensions(self):
return []
class SecurityGroupsTestCase(test_db_plugin.QuantumDbPluginV2TestCase,
unittest2.TestCase):
def setUp(self, plugin=None):
super(SecurityGroupsTestCase, self).setUp()
db._ENGINE = None
db._MAKER = None
# Make sure at each test a new instance of the plugin is returned
QuantumManager._instance = None
# Make sure at each test according extensions for the plugin is loaded
PluginAwareExtensionManager._instance = None
# Save the attributes map in case the plugin will alter it
# loading extensions
# Note(salvatore-orlando): shallow copy is not good enough in
# this case, but copy.deepcopy does not seem to work, since it
# causes test failures
self._attribute_map_bk = {}
for item in attributes.RESOURCE_ATTRIBUTE_MAP:
self._attribute_map_bk[item] = (attributes.
RESOURCE_ATTRIBUTE_MAP[item].
copy())
json_deserializer = JSONDeserializer()
self._deserializers = {
'application/json': json_deserializer,
}
if not plugin:
plugin = test_config.get('plugin_name_v2', DB_PLUGIN_KLASS)
# Create the default configurations
args = ['--config-file', etcdir('quantum.conf.test')]
# If test_config specifies some config-file, use it, as well
for config_file in test_config.get('config_files', []):
args.extend(['--config-file', config_file])
config.parse(args=args)
# Update the plugin
cfg.CONF.set_override('core_plugin', plugin)
self.api = APIRouter()
def _is_native_bulk_supported():
plugin_obj = QuantumManager.get_plugin()
native_bulk_attr_name = ("_%s__native_bulk_support"
% plugin_obj.__class__.__name__)
return getattr(plugin_obj, native_bulk_attr_name, False)
self._skip_native_bulk = not _is_native_bulk_supported()
QuantumManager.get_plugin().supported_extension_aliases = (
["security-groups"])
ext_mgr = SecurityGroupTestExtensionManager()
if ext_mgr:
self.ext_api = test_extensions.setup_extensions_middleware(ext_mgr)
def tearDown(self):
super(SecurityGroupsTestCase, self).tearDown()
db._ENGINE = None
db._MAKER = None
cfg.CONF.reset()
# Restore the original attribute map
attributes.RESOURCE_ATTRIBUTE_MAP = self._attribute_map_bk
def _create_security_group(self, fmt, name, description, external_id=None,
**kwargs):
data = {'security_group': {'name': name,
'tenant_id': kwargs.get('tenant_id',
'test_tenant'),
'description': description}}
if external_id:
data['security_group']['external_id'] = external_id
security_group_req = self.new_create_request('security-groups', data,
fmt)
if (kwargs.get('set_context') and 'tenant_id' in kwargs):
# create a specific auth context for this request
security_group_req.environ['quantum.context'] = (
context.Context('', kwargs['tenant_id']))
return security_group_req.get_response(self.ext_api)
def _build_security_group_rule(self, security_group_id, direction,
protocol, port_range_min, port_range_max,
source_ip_prefix=None, source_group_id=None,
external_id=None, tenant_id='test_tenant'):
data = {'security_group_rule': {'security_group_id': security_group_id,
'direction': direction,
'protocol': protocol,
'port_range_min': port_range_min,
'port_range_max': port_range_max,
'tenant_id': tenant_id}}
if external_id:
data['security_group_rule']['external_id'] = external_id
if source_ip_prefix:
data['security_group_rule']['source_ip_prefix'] = source_ip_prefix
if source_group_id:
data['security_group_rule']['source_group_id'] = source_group_id
return data
def _create_security_group_rule(self, fmt, rules, **kwargs):
security_group_rule_req = self.new_create_request(
'security-group-rules', rules, fmt)
if (kwargs.get('set_context') and 'tenant_id' in kwargs):
# create a specific auth context for this request
security_group_rule_req.environ['quantum.context'] = (
context.Context('', kwargs['tenant_id']))
return security_group_rule_req.get_response(self.ext_api)
@contextlib.contextmanager
def security_group(self, name='webservers', description='webservers',
external_id=None, fmt='json', no_delete=False):
res = self._create_security_group(fmt, name, description,
external_id)
security_group = self.deserialize(fmt, res)
if res.status_int >= 400:
raise webob.exc.HTTPClientError(code=res.status_int)
yield security_group
if not no_delete:
self._delete('security-groups',
security_group['security_group']['id'])
@contextlib.contextmanager
def security_group_rule(self, security_group_id='4cd70774-cc67-4a87-9b39-7'
'd1db38eb087',
direction='ingress', protocol='tcp',
port_range_min='22', port_range_max='22',
source_ip_prefix=None, source_group_id=None,
external_id=None, fmt='json', no_delete=False):
rule = self._build_security_group_rule(security_group_id, direction,
protocol, port_range_min,
port_range_max,
source_ip_prefix,
source_group_id, external_id)
res = self._create_security_group_rule('json', rule)
security_group_rule = self.deserialize(fmt, res)
if res.status_int >= 400:
raise webob.exc.HTTPClientError(code=res.status_int)
yield security_group_rule
if not no_delete:
self._delete('security-group-rules',
security_group_rule['security_group_rule']['id'])
class SecurityGroupTestPlugin(db_base_plugin_v2.QuantumDbPluginV2,
securitygroups_db.SecurityGroupDbMixin):
""" Test plugin that implements necessary calls on create/delete port for
associating ports with security groups.
"""
supported_extension_aliases = ["security-group"]
def create_port(self, context, port):
tenant_id = self._get_tenant_id_for_create(context, port['port'])
default_sg = self._ensure_default_security_group(context, tenant_id)
if not port['port'].get(ext_sg.SECURITYGROUP):
port['port'][ext_sg.SECURITYGROUP] = [default_sg]
self._validate_security_groups_on_port(context, port)
session = context.session
with session.begin(subtransactions=True):
sgids = port['port'].get(ext_sg.SECURITYGROUP)
port = super(SecurityGroupTestPlugin, self).create_port(context,
port)
self._process_port_create_security_group(context, port['id'],
sgids)
self._extend_port_dict_security_group(context, port)
return port
def update_port(self, context, id, port):
session = context.session
with session.begin(subtransactions=True):
self._validate_security_groups_on_port(context, port)
# delete the port binding and read it with the new rules
self._delete_port_security_group_bindings(context, id)
self._process_port_create_security_group(context, id,
port['port'].get(
ext_sg.SECURITYGROUP))
port = super(SecurityGroupTestPlugin, self).update_port(
context, id, port)
self._extend_port_dict_security_group(context, port)
return port
def delete_port(self, context, id):
session = context.session
with session.begin(subtransactions=True):
super(SecurityGroupTestPlugin, self).delete_port(context, id)
self._delete_port_security_group_bindings(context, id)
def create_network(self, context, network):
tenant_id = self._get_tenant_id_for_create(context, network['network'])
self._ensure_default_security_group(context, tenant_id)
return super(SecurityGroupTestPlugin, self).create_network(context,
network)
class SecurityGroupDBTestCase(SecurityGroupsTestCase):
def setUp(self, plugin=None):
test_config['plugin_name_v2'] = DB_PLUGIN_KLASS
ext_mgr = SecurityGroupTestExtensionManager()
test_config['extension_manager'] = ext_mgr
super(SecurityGroupDBTestCase, self).setUp()
class TestSecurityGroups(SecurityGroupDBTestCase):
def test_create_security_group(self):
name = 'webservers'
description = 'my webservers'
keys = [('name', name,), ('description', description)]
with self.security_group(name, description) as security_group:
for k, v, in keys:
self.assertEquals(security_group['security_group'][k], v)
def test_create_security_group_external_id(self):
cfg.CONF.SECURITYGROUP.proxy_mode = True
name = 'webservers'
description = 'my webservers'
external_id = 10
keys = [('name', name,), ('description', description),
('external_id', external_id)]
with self.security_group(name, description, external_id) as sg:
for k, v, in keys:
self.assertEquals(sg['security_group'][k], v)
def test_default_security_group(self):
with self.network():
res = self.new_list_request('security-groups')
groups = self.deserialize('json', res.get_response(self.ext_api))
self.assertEquals(len(groups['security_groups']), 1)
def test_create_security_group_proxy_mode_not_admin(self):
cfg.CONF.SECURITYGROUP.proxy_mode = True
res = self._create_security_group('json', 'webservers',
'webservers', '1',
tenant_id='bad_tenant',
set_context=True)
self.deserialize('json', res)
self.assertEquals(res.status_int, 500)
def test_create_security_group_no_external_id_proxy_mode(self):
cfg.CONF.SECURITYGROUP.proxy_mode = True
res = self._create_security_group('json', 'webservers',
'webservers')
self.deserialize('json', res)
self.assertEquals(res.status_int, 400)
def test_create_security_group_no_external_id_not_proxy_mode(self):
res = self._create_security_group('json', 'webservers',
'webservers', '1')
self.deserialize('json', res)
self.assertEquals(res.status_int, 409)
def test_create_default_security_group_fail(self):
name = 'default'
description = 'my webservers'
res = self._create_security_group('json', name, description)
self.deserialize('json', res)
self.assertEquals(res.status_int, 409)
def test_create_security_group_duplicate_external_id(self):
cfg.CONF.SECURITYGROUP.proxy_mode = True
name = 'webservers'
description = 'my webservers'
external_id = 1
with self.security_group(name, description, external_id):
res = self._create_security_group('json', name, description,
external_id)
self.deserialize('json', res)
self.assertEquals(res.status_int, 409)
def test_list_security_groups(self):
name = 'webservers'
description = 'my webservers'
with self.security_group(name, description):
res = self.new_list_request('security-groups')
groups = self.deserialize('json', res.get_response(self.ext_api))
self.assertEquals(len(groups['security_groups']), 2)
def test_get_security_group(self):
name = 'webservers'
description = 'my webservers'
with self.security_group(name, description) as sg:
source_group_id = sg['security_group']['id']
res = self.new_show_request('security-groups', source_group_id)
group = self.deserialize('json', res.get_response(self.ext_api))
self.assertEquals(group['security_group']['id'], source_group_id)
def test_delete_security_group(self):
name = 'webservers'
description = 'my webservers'
with self.security_group(name, description, no_delete=True) as sg:
source_group_id = sg['security_group']['id']
self._delete('security-groups', source_group_id, 204)
def test_delete_default_security_group_fail(self):
with self.network():
res = self.new_list_request('security-groups')
sg = self.deserialize('json', res.get_response(self.ext_api))
self._delete('security-groups', sg['security_groups'][0]['id'],
409)
def test_default_security_group_rules(self):
with self.network():
res = self.new_list_request('security-groups')
groups = self.deserialize('json', res.get_response(self.ext_api))
self.assertEquals(len(groups['security_groups']), 1)
res = self.new_list_request('security-group-rules')
rules = self.deserialize('json', res.get_response(self.ext_api))
self.assertEquals(len(rules['security_group_rules']), 4)
# just generic rules to allow default egress and
# intergroup communicartion
for rule in rules['security_group_rules']:
self.assertEquals(rule['port_range_max'], None)
self.assertEquals(rule['port_range_min'], None)
self.assertEquals(rule['protocol'], None)
def test_create_security_group_rule_source_ip_prefix(self):
name = 'webservers'
description = 'my webservers'
with self.security_group(name, description) as sg:
security_group_id = sg['security_group']['id']
direction = "ingress"
source_ip_prefix = "10.0.0.0/24"
protocol = 'tcp'
port_range_min = 22
port_range_max = 22
keys = [('source_ip_prefix', source_ip_prefix),
('security_group_id', security_group_id),
('direction', direction),
('protocol', protocol),
('port_range_min', port_range_min),
('port_range_max', port_range_max)]
with self.security_group_rule(security_group_id, direction,
protocol, port_range_min,
port_range_max,
source_ip_prefix) as rule:
for k, v, in keys:
self.assertEquals(rule['security_group_rule'][k], v)
def test_create_security_group_rule_group_id(self):
name = 'webservers'
description = 'my webservers'
with self.security_group(name, description) as sg:
with self.security_group(name, description) as sg2:
security_group_id = sg['security_group']['id']
direction = "ingress"
source_group_id = sg2['security_group']['id']
protocol = 'tcp'
port_range_min = 22
port_range_max = 22
keys = [('source_group_id', source_group_id),
('security_group_id', security_group_id),
('direction', direction),
('protocol', protocol),
('port_range_min', port_range_min),
('port_range_max', port_range_max)]
with self.security_group_rule(security_group_id, direction,
protocol, port_range_min,
port_range_max,
source_group_id=source_group_id
) as rule:
for k, v, in keys:
self.assertEquals(rule['security_group_rule'][k], v)
def test_create_security_group_source_group_ip_and_ip_prefix(self):
security_group_id = "4cd70774-cc67-4a87-9b39-7d1db38eb087"
direction = "ingress"
source_ip_prefix = "10.0.0.0/24"
protocol = 'tcp'
port_range_min = 22
port_range_max = 22
source_group_id = "9cd70774-cc67-4a87-9b39-7d1db38eb087"
rule = self._build_security_group_rule(security_group_id, direction,
protocol, port_range_min,
port_range_max,
source_ip_prefix,
source_group_id)
res = self._create_security_group_rule('json', rule)
self.deserialize('json', res)
self.assertEquals(res.status_int, 400)
def test_create_security_group_rule_bad_security_group_id(self):
security_group_id = "4cd70774-cc67-4a87-9b39-7d1db38eb087"
direction = "ingress"
source_ip_prefix = "10.0.0.0/24"
protocol = 'tcp'
port_range_min = 22
port_range_max = 22
rule = self._build_security_group_rule(security_group_id, direction,
protocol, port_range_min,
port_range_max,
source_ip_prefix)
res = self._create_security_group_rule('json', rule)
self.deserialize('json', res)
self.assertEquals(res.status_int, 404)
def test_create_security_group_rule_bad_tenant(self):
with self.security_group() as sg:
rule = {'security_group_rule':
{'security_group_id': sg['security_group']['id'],
'direction': 'ingress',
'protocol': 'tcp',
'port_range_min': '22',
'port_range_max': '22',
'tenant_id': "bad_tenant"}}
res = self._create_security_group_rule('json', rule)
self.deserialize('json', res)
self.assertEquals(res.status_int, 404)
def test_create_security_group_rule_exteral_id_proxy_mode(self):
cfg.CONF.SECURITYGROUP.proxy_mode = True
with self.security_group(external_id=1) as sg:
rule = {'security_group_rule':
{'security_group_id': sg['security_group']['id'],
'direction': 'ingress',
'protocol': 'tcp',
'port_range_min': '22',
'port_range_max': '22',
'external_id': '1',
'tenant_id': 'test_tenant',
'source_group_id': sg['security_group']['id']}}
res = self._create_security_group_rule('json', rule)
self.deserialize('json', res)
self.assertEquals(res.status_int, 201)
def test_create_security_group_rule_exteral_id_not_proxy_mode(self):
with self.security_group() as sg:
rule = {'security_group_rule':
{'security_group_id': sg['security_group']['id'],
'direction': 'ingress',
'protocol': 'tcp',
'port_range_min': '22',
'port_range_max': '22',
'external_id': 1,
'tenant_id': 'test_tenant',
'source_group_id': sg['security_group']['id']}}
res = self._create_security_group_rule('json', rule)
self.deserialize('json', res)
self.assertEquals(res.status_int, 409)
def test_create_security_group_rule_not_admin(self):
cfg.CONF.SECURITYGROUP.proxy_mode = True
with self.security_group(external_id='1') as sg:
rule = {'security_group_rule':
{'security_group_id': sg['security_group']['id'],
'direction': 'ingress',
'protocol': 'tcp',
'port_range_min': '22',
'port_range_max': '22',
'tenant_id': 'bad_tenant',
'external_id': 1,
'source_group_id': sg['security_group']['id']}}
res = self._create_security_group_rule('json', rule,
tenant_id='bad_tenant',
set_context=True)
self.deserialize('json', res)
self.assertEquals(res.status_int, 500)
def test_create_security_group_rule_bad_tenant_source_group_id(self):
with self.security_group() as sg:
res = self._create_security_group('json', 'webservers',
'webservers',
tenant_id='bad_tenant')
sg2 = self.deserialize('json', res)
rule = {'security_group_rule':
{'security_group_id': sg2['security_group']['id'],
'direction': 'ingress',
'protocol': 'tcp',
'port_range_min': '22',
'port_range_max': '22',
'tenant_id': 'bad_tenant',
'source_group_id': sg['security_group']['id']}}
res = self._create_security_group_rule('json', rule,
tenant_id='bad_tenant',
set_context=True)
self.deserialize('json', res)
self.assertEquals(res.status_int, 404)
def test_create_security_group_rule_bad_tenant_security_group_rule(self):
with self.security_group() as sg:
res = self._create_security_group('json', 'webservers',
'webservers',
tenant_id='bad_tenant')
self.deserialize('json', res)
rule = {'security_group_rule':
{'security_group_id': sg['security_group']['id'],
'direction': 'ingress',
'protocol': 'tcp',
'port_range_min': '22',
'port_range_max': '22',
'tenant_id': 'bad_tenant'}}
res = self._create_security_group_rule('json', rule,
tenant_id='bad_tenant',
set_context=True)
self.deserialize('json', res)
self.assertEquals(res.status_int, 404)
def test_create_security_group_rule_bad_source_group_id(self):
name = 'webservers'
description = 'my webservers'
with self.security_group(name, description) as sg:
security_group_id = sg['security_group']['id']
source_group_id = "4cd70774-cc67-4a87-9b39-7d1db38eb087"
direction = "ingress"
protocol = 'tcp'
port_range_min = 22
port_range_max = 22
rule = self._build_security_group_rule(security_group_id, direction,
protocol, port_range_min,
port_range_max,
source_group_id=source_group_id)
res = self._create_security_group_rule('json', rule)
self.deserialize('json', res)
self.assertEquals(res.status_int, 404)
def test_create_security_group_rule_duplicate_rules(self):
name = 'webservers'
description = 'my webservers'
with self.security_group(name, description) as sg:
security_group_id = sg['security_group']['id']
with self.security_group_rule(security_group_id):
rule = self._build_security_group_rule(
sg['security_group']['id'], 'ingress', 'tcp', '22', '22')
self._create_security_group_rule('json', rule)
res = self._create_security_group_rule('json', rule)
self.deserialize('json', res)
self.assertEquals(res.status_int, 409)
def test_create_security_group_rule_min_port_greater_max(self):
name = 'webservers'
description = 'my webservers'
with self.security_group(name, description) as sg:
security_group_id = sg['security_group']['id']
with self.security_group_rule(security_group_id):
rule = self._build_security_group_rule(
sg['security_group']['id'], 'ingress', 'tcp', '50', '22')
self._create_security_group_rule('json', rule)
res = self._create_security_group_rule('json', rule)
self.deserialize('json', res)
self.assertEquals(res.status_int, 400)
def test_create_security_group_rule_ports_but_no_protocol(self):
name = 'webservers'
description = 'my webservers'
with self.security_group(name, description) as sg:
security_group_id = sg['security_group']['id']
with self.security_group_rule(security_group_id):
rule = self._build_security_group_rule(
sg['security_group']['id'], 'ingress', None, '22', '22')
self._create_security_group_rule('json', rule)
res = self._create_security_group_rule('json', rule)
self.deserialize('json', res)
self.assertEquals(res.status_int, 400)
def test_update_port_with_security_group(self):
with self.network() as n:
with self.subnet(n):
with self.security_group() as sg:
res = self._create_port('json', n['network']['id'])
port = self.deserialize('json', res)
data = {'port': {'fixed_ips': port['port']['fixed_ips'],
'name': port['port']['name'],
ext_sg.SECURITYGROUP:
[sg['security_group']['id']]}}
req = self.new_update_request('ports', data,
port['port']['id'])
res = self.deserialize('json', req.get_response(self.api))
self.assertEquals(res['port'][ext_sg.SECURITYGROUP][0],
sg['security_group']['id'])
self._delete('ports', port['port']['id'])
def test_update_port_with_multiple_security_groups(self):
with self.network() as n:
with self.subnet(n):
with self.security_group() as sg1:
with self.security_group() as sg2:
res = self._create_port(
'json', n['network']['id'],
security_groups=[sg1['security_group']['id'],
sg2['security_group']['id']])
port = self.deserialize('json', res)
self.assertEquals(len(
port['port'][ext_sg.SECURITYGROUP]), 2)
self._delete('ports', port['port']['id'])
def test_update_port_remove_security_group(self):
with self.network() as n:
with self.subnet(n):
with self.security_group() as sg:
res = self._create_port('json', n['network']['id'],
security_groups=(
[sg['security_group']['id']]))
port = self.deserialize('json', res)
data = {'port': {'fixed_ips': port['port']['fixed_ips'],
'name': port['port']['name']}}
req = self.new_update_request('ports', data,
port['port']['id'])
res = self.deserialize('json', req.get_response(self.api))
self.assertEquals(res['port'][ext_sg.SECURITYGROUP], [])
self._delete('ports', port['port']['id'])
def test_create_port_with_bad_security_group(self):
with self.network() as n:
with self.subnet(n):
res = self._create_port('json', n['network']['id'],
security_groups=['bad_id'])
self.deserialize('json', res)
self.assertEquals(res.status_int, 404)
def test_create_delete_security_group_port_in_use(self):
with self.network() as n:
with self.subnet(n):
with self.security_group() as sg:
res = self._create_port('json', n['network']['id'],
security_groups=(
[sg['security_group']['id']]))
port = self.deserialize('json', res)
self.assertEquals(port['port'][ext_sg.SECURITYGROUP][0],
sg['security_group']['id'])
# try to delete security group that's in use
res = self._delete('security-groups',
sg['security_group']['id'], 409)
# delete the blocking port
self._delete('ports', port['port']['id'])
def test_create_security_group_rule_bulk_native(self):
if self._skip_native_bulk:
self.skipTest("Plugin does not support native bulk "
"security_group_rule create")
with self.security_group() as sg:
rule1 = self._build_security_group_rule(sg['security_group']['id'],
'ingress', 'tcp', '22',
'22', '10.0.0.1/24')
rule2 = self._build_security_group_rule(sg['security_group']['id'],
'ingress', 'tcp', '23',
'23', '10.0.0.1/24')
rules = {'security_group_rules': [rule1['security_group_rule'],
rule2['security_group_rule']]}
res = self._create_security_group_rule('json', rules)
self.deserialize('json', res)
self.assertEquals(res.status_int, 201)
def test_create_security_group_rule_bulk_emulated(self):
real_has_attr = hasattr
#ensures the API choose the emulation code path
def fakehasattr(item, attr):
if attr.endswith('__native_bulk_support'):
return False
return real_has_attr(item, attr)
with mock.patch('__builtin__.hasattr',
new=fakehasattr):
with self.security_group() as sg:
rule1 = self._build_security_group_rule(
sg['security_group']['id'], 'ingress', 'tcp', '22', '22',
'10.0.0.1/24')
rule2 = self._build_security_group_rule(
sg['security_group']['id'], 'ingress', 'tcp', '23', '23',
'10.0.0.1/24')
rules = {'security_group_rules': [rule1['security_group_rule'],
rule2['security_group_rule']]
}
res = self._create_security_group_rule('json', rules)
self.deserialize('json', res)
self.assertEquals(res.status_int, 201)
def test_create_security_group_rule_duplicate_rule_in_post(self):
if self._skip_native_bulk:
self.skipTest("Plugin does not support native bulk "
"security_group_rule create")
with self.security_group() as sg:
rule = self._build_security_group_rule(sg['security_group']['id'],
'ingress', 'tcp', '22',
'22', '10.0.0.1/24')
rules = {'security_group_rules': [rule['security_group_rule'],
rule['security_group_rule']]}
res = self._create_security_group_rule('json', rules)
rule = self.deserialize('json', res)
self.assertEquals(res.status_int, 409)
def test_create_security_group_rule_duplicate_rule_in_post_emulated(self):
real_has_attr = hasattr
#ensures the API choose the emulation code path
def fakehasattr(item, attr):
if attr.endswith('__native_bulk_support'):
return False
return real_has_attr(item, attr)
with mock.patch('__builtin__.hasattr',
new=fakehasattr):
with self.security_group() as sg:
rule = self._build_security_group_rule(
sg['security_group']['id'], 'ingress', 'tcp', '22', '22',
'10.0.0.1/24')
rules = {'security_group_rules': [rule['security_group_rule'],
rule['security_group_rule']]}
res = self._create_security_group_rule('json', rules)
rule = self.deserialize('json', res)
self.assertEquals(res.status_int, 409)
def test_create_security_group_rule_duplicate_rule_db(self):
if self._skip_native_bulk:
self.skipTest("Plugin does not support native bulk "
"security_group_rule create")
with self.security_group() as sg:
rule = self._build_security_group_rule(sg['security_group']['id'],
'ingress', 'tcp', '22',
'22', '10.0.0.1/24')
rules = {'security_group_rules': [rule]}
self._create_security_group_rule('json', rules)
res = self._create_security_group_rule('json', rules)
rule = self.deserialize('json', res)
self.assertEquals(res.status_int, 409)
def test_create_security_group_rule_duplicate_rule_db_emulated(self):
real_has_attr = hasattr
#ensures the API choose the emulation code path
def fakehasattr(item, attr):
if attr.endswith('__native_bulk_support'):
return False
return real_has_attr(item, attr)
with mock.patch('__builtin__.hasattr',
new=fakehasattr):
with self.security_group() as sg:
rule = self._build_security_group_rule(
sg['security_group']['id'], 'ingress', 'tcp', '22', '22',
'10.0.0.1/24')
rules = {'security_group_rules': [rule]}
self._create_security_group_rule('json', rules)
res = self._create_security_group_rule('json', rule)
self.deserialize('json', res)
self.assertEquals(res.status_int, 409)
def test_create_security_group_rule_differnt_security_group_ids(self):
if self._skip_native_bulk:
self.skipTest("Plugin does not support native bulk "
"security_group_rule create")
with self.security_group() as sg1:
with self.security_group() as sg2:
rule1 = self._build_security_group_rule(
sg1['security_group']['id'], 'ingress', 'tcp', '22', '22',
'10.0.0.1/24')
rule2 = self._build_security_group_rule(
sg2['security_group']['id'], 'ingress', 'tcp', '23', '23',
'10.0.0.1/24')
rules = {'security_group_rules': [rule1['security_group_rule'],
rule2['security_group_rule']]
}
res = self._create_security_group_rule('json', rules)
self.deserialize('json', res)
self.assertEquals(res.status_int, 400)