vmware-nsx/quantum/tests/unit/test_extension_security_gro...

1034 lines
48 KiB
Python

# Copyright (c) 2012 OpenStack, LLC.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import contextlib
import os
import mock
import webob.exc
from quantum.api.v2 import attributes as attr
from quantum.common.test_lib import test_config
from quantum import context
from quantum.db import db_base_plugin_v2
from quantum.db import securitygroups_db
from quantum.extensions import securitygroup as ext_sg
from quantum.openstack.common import cfg
from quantum.tests.unit import test_db_plugin
DB_PLUGIN_KLASS = ('quantum.tests.unit.test_extension_security_group.'
'SecurityGroupTestPlugin')
ROOTDIR = os.path.dirname(os.path.dirname(__file__))
ETCDIR = os.path.join(ROOTDIR, 'etc')
def etcdir(*p):
return os.path.join(ETCDIR, *p)
class SecurityGroupTestExtensionManager(object):
def get_resources(self):
return ext_sg.Securitygroup.get_resources()
def get_actions(self):
return []
def get_request_extensions(self):
return []
class SecurityGroupsTestCase(test_db_plugin.QuantumDbPluginV2TestCase):
def _create_security_group(self, fmt, name, description, external_id=None,
**kwargs):
data = {'security_group': {'name': name,
'tenant_id': kwargs.get('tenant_id',
'test_tenant'),
'description': description}}
if external_id:
data['security_group']['external_id'] = external_id
security_group_req = self.new_create_request('security-groups', data,
fmt)
if (kwargs.get('set_context') and 'tenant_id' in kwargs):
# create a specific auth context for this request
security_group_req.environ['quantum.context'] = (
context.Context('', kwargs['tenant_id']))
return security_group_req.get_response(self.ext_api)
def _build_security_group_rule(self, security_group_id, direction,
protocol, port_range_min, port_range_max,
source_ip_prefix=None, source_group_id=None,
external_id=None, tenant_id='test_tenant',
ethertype='IPv4'):
data = {'security_group_rule': {'security_group_id': security_group_id,
'direction': direction,
'protocol': protocol,
'ethertype': ethertype,
'port_range_min': port_range_min,
'port_range_max': port_range_max,
'tenant_id': tenant_id,
'ethertype': ethertype}}
if external_id:
data['security_group_rule']['external_id'] = external_id
if source_ip_prefix:
data['security_group_rule']['source_ip_prefix'] = source_ip_prefix
if source_group_id:
data['security_group_rule']['source_group_id'] = source_group_id
return data
def _create_security_group_rule(self, fmt, rules, **kwargs):
security_group_rule_req = self.new_create_request(
'security-group-rules', rules, fmt)
if (kwargs.get('set_context') and 'tenant_id' in kwargs):
# create a specific auth context for this request
security_group_rule_req.environ['quantum.context'] = (
context.Context('', kwargs['tenant_id']))
return security_group_rule_req.get_response(self.ext_api)
def _make_security_group(self, fmt, name, description, external_id=None,
**kwargs):
res = self._create_security_group(fmt, name, description,
external_id, **kwargs)
if res.status_int >= 400:
raise webob.exc.HTTPClientError(code=res.status_int)
return self.deserialize(fmt, res)
def _make_security_group_rule(self, fmt, rules, **kwargs):
res = self._create_security_group_rule(self.fmt, rules)
if res.status_int >= 400:
raise webob.exc.HTTPClientError(code=res.status_int)
return self.deserialize(fmt, res)
@contextlib.contextmanager
def security_group(self, name='webservers', description='webservers',
external_id=None, fmt=None, no_delete=False):
if not fmt:
fmt = self.fmt
security_group = self._make_security_group(fmt, name, description,
external_id)
try:
yield security_group
finally:
if not no_delete:
self._delete('security-groups',
security_group['security_group']['id'])
@contextlib.contextmanager
def security_group_rule(self, security_group_id='4cd70774-cc67-4a87-9b39-7'
'd1db38eb087',
direction='ingress', protocol='tcp',
port_range_min='22', port_range_max='22',
source_ip_prefix=None, source_group_id=None,
external_id=None, fmt=None, no_delete=False,
ethertype='IPv4'):
if not fmt:
fmt = self.fmt
rule = self._build_security_group_rule(security_group_id,
direction,
protocol, port_range_min,
port_range_max,
source_ip_prefix,
source_group_id,
external_id,
ethertype=ethertype)
security_group_rule = self._make_security_group_rule(self.fmt, rule)
try:
yield security_group_rule
finally:
if not no_delete:
self._delete('security-group-rules',
security_group_rule['security_group_rule']['id'])
class SecurityGroupsTestCaseXML(SecurityGroupsTestCase):
fmt = 'xml'
class SecurityGroupTestPlugin(db_base_plugin_v2.QuantumDbPluginV2,
securitygroups_db.SecurityGroupDbMixin):
""" Test plugin that implements necessary calls on create/delete port for
associating ports with security groups.
"""
supported_extension_aliases = ["security-group"]
def create_port(self, context, port):
tenant_id = self._get_tenant_id_for_create(context, port['port'])
default_sg = self._ensure_default_security_group(context, tenant_id)
if not attr.is_attr_set(port['port'].get(ext_sg.SECURITYGROUPS)):
port['port'][ext_sg.SECURITYGROUPS] = [default_sg]
session = context.session
with session.begin(subtransactions=True):
sgids = self._get_security_groups_on_port(context, port)
port = super(SecurityGroupTestPlugin, self).create_port(context,
port)
self._process_port_create_security_group(context, port['id'],
sgids)
self._extend_port_dict_security_group(context, port)
return port
def update_port(self, context, id, port):
session = context.session
with session.begin(subtransactions=True):
if ext_sg.SECURITYGROUPS in port['port']:
port['port'][ext_sg.SECURITYGROUPS] = (
self._get_security_groups_on_port(context, port))
# delete the port binding and read it with the new rules
self._delete_port_security_group_bindings(context, id)
self._process_port_create_security_group(
context, id, port['port'].get(ext_sg.SECURITYGROUPS))
port = super(SecurityGroupTestPlugin, self).update_port(
context, id, port)
self._extend_port_dict_security_group(context, port)
return port
def create_network(self, context, network):
tenant_id = self._get_tenant_id_for_create(context, network['network'])
self._ensure_default_security_group(context, tenant_id)
return super(SecurityGroupTestPlugin, self).create_network(context,
network)
def get_ports(self, context, filters=None, fields=None):
quantum_lports = super(SecurityGroupTestPlugin, self).get_ports(
context, filters)
for quantum_lport in quantum_lports:
self._extend_port_dict_security_group(context, quantum_lport)
return quantum_lports
class SecurityGroupDBTestCase(SecurityGroupsTestCase):
def setUp(self, plugin=None):
test_config['plugin_name_v2'] = DB_PLUGIN_KLASS
ext_mgr = SecurityGroupTestExtensionManager()
test_config['extension_manager'] = ext_mgr
super(SecurityGroupDBTestCase, self).setUp(plugin)
def tearDown(self):
del test_config['plugin_name_v2']
super(SecurityGroupDBTestCase, self).tearDown()
class TestSecurityGroups(SecurityGroupDBTestCase):
def test_create_security_group(self):
name = 'webservers'
description = 'my webservers'
keys = [('name', name,), ('description', description)]
with self.security_group(name, description) as security_group:
for k, v, in keys:
self.assertEqual(security_group['security_group'][k], v)
def test_create_security_group_external_id(self):
cfg.CONF.set_override('proxy_mode', True, 'SECURITYGROUP')
name = 'webservers'
description = 'my webservers'
external_id = 10
keys = [('name', name,), ('description', description),
('external_id', external_id)]
with self.security_group(name, description, external_id) as sg:
for k, v, in keys:
self.assertEqual(sg['security_group'][k], v)
def test_default_security_group(self):
with self.network():
res = self.new_list_request('security-groups')
groups = self.deserialize(self.fmt, res.get_response(self.ext_api))
self.assertEqual(len(groups['security_groups']), 1)
def test_create_security_group_proxy_mode_not_admin(self):
cfg.CONF.set_override('proxy_mode', True, 'SECURITYGROUP')
res = self._create_security_group(self.fmt, 'webservers',
'webservers', '1',
tenant_id='bad_tenant',
set_context=True)
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 403)
def test_create_security_group_no_external_id_proxy_mode(self):
cfg.CONF.set_override('proxy_mode', True, 'SECURITYGROUP')
res = self._create_security_group(self.fmt, 'webservers',
'webservers')
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 400)
def test_create_security_group_no_external_id_not_proxy_mode(self):
res = self._create_security_group(self.fmt, 'webservers',
'webservers', '1')
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 409)
def test_create_default_security_group_fail(self):
name = 'default'
description = 'my webservers'
res = self._create_security_group(self.fmt, name, description)
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 409)
def test_create_security_group_duplicate_external_id(self):
cfg.CONF.set_override('proxy_mode', True, 'SECURITYGROUP')
name = 'webservers'
description = 'my webservers'
external_id = 1
with self.security_group(name, description, external_id):
res = self._create_security_group(self.fmt, name, description,
external_id)
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 409)
def test_list_security_groups(self):
name = 'webservers'
description = 'my webservers'
with self.security_group(name, description):
res = self.new_list_request('security-groups')
groups = self.deserialize(self.fmt, res.get_response(self.ext_api))
self.assertEqual(len(groups['security_groups']), 2)
for group in groups['security_groups']:
if group['name'] == 'default':
self.assertEquals(len(group['security_group_rules']), 2)
else:
self.assertEquals(len(group['security_group_rules']), 0)
def test_create_security_group_rule_ethertype_invalid_as_number(self):
name = 'webservers'
description = 'my webservers'
with self.security_group(name, description) as sg:
security_group_id = sg['security_group']['id']
ethertype = 2
rule = self._build_security_group_rule(
security_group_id, 'ingress', 'tcp', '22', '22', None, None,
ethertype=ethertype)
res = self._create_security_group_rule('json', rule)
self.deserialize('json', res)
self.assertEqual(res.status_int, 400)
def test_create_security_group_rule_protocol_invalid_as_number(self):
name = 'webservers'
description = 'my webservers'
with self.security_group(name, description) as sg:
security_group_id = sg['security_group']['id']
protocol = 2
rule = self._build_security_group_rule(
security_group_id, 'ingress', protocol, '22', '22',
None, None)
res = self._create_security_group_rule('json', rule)
self.deserialize('json', res)
self.assertEqual(res.status_int, 400)
def test_create_security_group_rule_case_insensitive(self):
name = 'webservers'
description = 'my webservers'
with self.security_group(name, description) as sg:
security_group_id = sg['security_group']['id']
direction = "ingress"
source_ip_prefix = "10.0.0.0/24"
protocol = 'TCP'
port_range_min = 22
port_range_max = 22
ethertype = 'ipV4'
with self.security_group_rule(security_group_id, direction,
protocol, port_range_min,
port_range_max,
source_ip_prefix,
ethertype=ethertype) as rule:
# the lower case value will be return
self.assertEquals(rule['security_group_rule']['protocol'],
protocol.lower())
self.assertEquals(rule['security_group_rule']['ethertype'],
'IPv4')
def test_get_security_group(self):
name = 'webservers'
description = 'my webservers'
with self.security_group(name, description) as sg:
source_group_id = sg['security_group']['id']
res = self.new_show_request('security-groups', source_group_id)
security_group_id = sg['security_group']['id']
direction = "ingress"
source_ip_prefix = "10.0.0.0/24"
protocol = 'tcp'
port_range_min = 22
port_range_max = 22
keys = [('source_ip_prefix', source_ip_prefix),
('security_group_id', security_group_id),
('direction', direction),
('protocol', protocol),
('port_range_min', port_range_min),
('port_range_max', port_range_max)]
with self.security_group_rule(security_group_id, direction,
protocol, port_range_min,
port_range_max,
source_ip_prefix):
group = self.deserialize(
self.fmt, res.get_response(self.ext_api))
sg_rule = group['security_group']['security_group_rules']
self.assertEqual(group['security_group']['id'],
source_group_id)
self.assertEqual(len(sg_rule), 1)
for k, v, in keys:
self.assertEqual(sg_rule[0][k], v)
def test_delete_security_group(self):
name = 'webservers'
description = 'my webservers'
with self.security_group(name, description, no_delete=True) as sg:
source_group_id = sg['security_group']['id']
self._delete('security-groups', source_group_id, 204)
def test_delete_default_security_group_fail(self):
with self.network():
res = self.new_list_request('security-groups')
sg = self.deserialize(self.fmt, res.get_response(self.ext_api))
self._delete('security-groups', sg['security_groups'][0]['id'],
409)
def test_default_security_group_rules(self):
with self.network():
res = self.new_list_request('security-groups')
groups = self.deserialize(self.fmt, res.get_response(self.ext_api))
self.assertEqual(len(groups['security_groups']), 1)
res = self.new_list_request('security-group-rules')
rules = self.deserialize(self.fmt, res.get_response(self.ext_api))
self.assertEqual(len(rules['security_group_rules']), 2)
# just generic rules to allow default egress and
# intergroup communicartion
for rule in rules['security_group_rules']:
self.assertEqual(rule['port_range_max'], None)
self.assertEqual(rule['port_range_min'], None)
self.assertEqual(rule['protocol'], None)
def test_create_security_group_rule_source_ip_prefix(self):
name = 'webservers'
description = 'my webservers'
with self.security_group(name, description) as sg:
security_group_id = sg['security_group']['id']
direction = "ingress"
source_ip_prefix = "10.0.0.0/24"
protocol = 'tcp'
port_range_min = 22
port_range_max = 22
keys = [('source_ip_prefix', source_ip_prefix),
('security_group_id', security_group_id),
('direction', direction),
('protocol', protocol),
('port_range_min', port_range_min),
('port_range_max', port_range_max)]
with self.security_group_rule(security_group_id, direction,
protocol, port_range_min,
port_range_max,
source_ip_prefix) as rule:
for k, v, in keys:
self.assertEqual(rule['security_group_rule'][k], v)
def test_create_security_group_rule_group_id(self):
name = 'webservers'
description = 'my webservers'
with self.security_group(name, description) as sg:
with self.security_group(name, description) as sg2:
security_group_id = sg['security_group']['id']
direction = "ingress"
source_group_id = sg2['security_group']['id']
protocol = 'tcp'
port_range_min = 22
port_range_max = 22
keys = [('source_group_id', source_group_id),
('security_group_id', security_group_id),
('direction', direction),
('protocol', protocol),
('port_range_min', port_range_min),
('port_range_max', port_range_max)]
with self.security_group_rule(security_group_id, direction,
protocol, port_range_min,
port_range_max,
source_group_id=source_group_id
) as rule:
for k, v, in keys:
self.assertEqual(rule['security_group_rule'][k], v)
def test_create_security_group_source_group_ip_and_ip_prefix(self):
security_group_id = "4cd70774-cc67-4a87-9b39-7d1db38eb087"
direction = "ingress"
source_ip_prefix = "10.0.0.0/24"
protocol = 'tcp'
port_range_min = 22
port_range_max = 22
source_group_id = "9cd70774-cc67-4a87-9b39-7d1db38eb087"
rule = self._build_security_group_rule(security_group_id, direction,
protocol, port_range_min,
port_range_max,
source_ip_prefix,
source_group_id)
res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 400)
def test_create_security_group_rule_bad_security_group_id(self):
security_group_id = "4cd70774-cc67-4a87-9b39-7d1db38eb087"
direction = "ingress"
source_ip_prefix = "10.0.0.0/24"
protocol = 'tcp'
port_range_min = 22
port_range_max = 22
rule = self._build_security_group_rule(security_group_id, direction,
protocol, port_range_min,
port_range_max,
source_ip_prefix)
res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 404)
def test_create_security_group_rule_bad_tenant(self):
with self.security_group() as sg:
rule = {'security_group_rule':
{'security_group_id': sg['security_group']['id'],
'direction': 'ingress',
'protocol': 'tcp',
'port_range_min': '22',
'port_range_max': '22',
'tenant_id': "bad_tenant"}}
res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 404)
def test_create_security_group_rule_exteral_id_proxy_mode(self):
cfg.CONF.set_override('proxy_mode', True, 'SECURITYGROUP')
with self.security_group(external_id=1) as sg:
rule = {'security_group_rule':
{'security_group_id': sg['security_group']['id'],
'direction': 'ingress',
'protocol': 'tcp',
'port_range_min': '22',
'port_range_max': '22',
'external_id': '1',
'tenant_id': 'test_tenant',
'source_group_id': sg['security_group']['id']}}
res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 201)
def test_create_security_group_rule_exteral_id_not_proxy_mode(self):
with self.security_group() as sg:
rule = {'security_group_rule':
{'security_group_id': sg['security_group']['id'],
'direction': 'ingress',
'protocol': 'tcp',
'port_range_min': '22',
'port_range_max': '22',
'external_id': 1,
'tenant_id': 'test_tenant',
'source_group_id': sg['security_group']['id']}}
res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 409)
def test_create_security_group_rule_not_admin(self):
cfg.CONF.set_override('proxy_mode', True, 'SECURITYGROUP')
with self.security_group(external_id='1') as sg:
rule = {'security_group_rule':
{'security_group_id': sg['security_group']['id'],
'direction': 'ingress',
'protocol': 'tcp',
'port_range_min': '22',
'port_range_max': '22',
'tenant_id': 'bad_tenant',
'external_id': 1,
'source_group_id': sg['security_group']['id']}}
res = self._create_security_group_rule(self.fmt, rule,
tenant_id='bad_tenant',
set_context=True)
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 403)
def test_create_security_group_rule_bad_tenant_source_group_id(self):
with self.security_group() as sg:
res = self._create_security_group(self.fmt, 'webservers',
'webservers',
tenant_id='bad_tenant')
sg2 = self.deserialize(self.fmt, res)
rule = {'security_group_rule':
{'security_group_id': sg2['security_group']['id'],
'direction': 'ingress',
'protocol': 'tcp',
'port_range_min': '22',
'port_range_max': '22',
'tenant_id': 'bad_tenant',
'source_group_id': sg['security_group']['id']}}
res = self._create_security_group_rule(self.fmt, rule,
tenant_id='bad_tenant',
set_context=True)
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 404)
def test_create_security_group_rule_bad_tenant_security_group_rule(self):
with self.security_group() as sg:
res = self._create_security_group(self.fmt, 'webservers',
'webservers',
tenant_id='bad_tenant')
self.deserialize(self.fmt, res)
rule = {'security_group_rule':
{'security_group_id': sg['security_group']['id'],
'direction': 'ingress',
'protocol': 'tcp',
'port_range_min': '22',
'port_range_max': '22',
'tenant_id': 'bad_tenant'}}
res = self._create_security_group_rule(self.fmt, rule,
tenant_id='bad_tenant',
set_context=True)
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 404)
def test_create_security_group_rule_bad_source_group_id(self):
name = 'webservers'
description = 'my webservers'
with self.security_group(name, description) as sg:
security_group_id = sg['security_group']['id']
source_group_id = "4cd70774-cc67-4a87-9b39-7d1db38eb087"
direction = "ingress"
protocol = 'tcp'
port_range_min = 22
port_range_max = 22
rule = self._build_security_group_rule(security_group_id, direction,
protocol, port_range_min,
port_range_max,
source_group_id=source_group_id)
res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 404)
def test_create_security_group_rule_duplicate_rules(self):
name = 'webservers'
description = 'my webservers'
with self.security_group(name, description) as sg:
security_group_id = sg['security_group']['id']
with self.security_group_rule(security_group_id):
rule = self._build_security_group_rule(
sg['security_group']['id'], 'ingress', 'tcp', '22', '22')
self._create_security_group_rule(self.fmt, rule)
res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 409)
def test_create_security_group_rule_min_port_greater_max(self):
name = 'webservers'
description = 'my webservers'
with self.security_group(name, description) as sg:
security_group_id = sg['security_group']['id']
with self.security_group_rule(security_group_id):
rule = self._build_security_group_rule(
sg['security_group']['id'], 'ingress', 'tcp', '50', '22')
self._create_security_group_rule(self.fmt, rule)
res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 400)
def test_create_security_group_rule_ports_but_no_protocol(self):
name = 'webservers'
description = 'my webservers'
with self.security_group(name, description) as sg:
security_group_id = sg['security_group']['id']
with self.security_group_rule(security_group_id):
rule = self._build_security_group_rule(
sg['security_group']['id'], 'ingress', None, '22', '22')
self._create_security_group_rule(self.fmt, rule)
res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 400)
def test_list_ports_security_group(self):
with self.network() as n:
with self.subnet(n):
res = self._create_port(self.fmt, n['network']['id'])
self.deserialize(self.fmt, res)
res = self.new_list_request('ports')
ports = self.deserialize(self.fmt,
res.get_response(self.api))
port = ports['ports'][0]
self.assertEquals(len(port[ext_sg.SECURITYGROUPS]), 1)
self._delete('ports', port['id'])
def test_update_port_with_security_group(self):
with self.network() as n:
with self.subnet(n):
with self.security_group() as sg:
res = self._create_port(self.fmt, n['network']['id'])
port = self.deserialize(self.fmt, res)
data = {'port': {'fixed_ips': port['port']['fixed_ips'],
'name': port['port']['name'],
ext_sg.SECURITYGROUPS:
[sg['security_group']['id']]}}
req = self.new_update_request('ports', data,
port['port']['id'])
res = self.deserialize(self.fmt,
req.get_response(self.api))
self.assertEqual(res['port'][ext_sg.SECURITYGROUPS][0],
sg['security_group']['id'])
# Test update port without security group
data = {'port': {'fixed_ips': port['port']['fixed_ips'],
'name': port['port']['name']}}
req = self.new_update_request('ports', data,
port['port']['id'])
res = self.deserialize(self.fmt,
req.get_response(self.api))
self.assertEqual(res['port'][ext_sg.SECURITYGROUPS][0],
sg['security_group']['id'])
self._delete('ports', port['port']['id'])
def test_update_port_with_multiple_security_groups(self):
with self.network() as n:
with self.subnet(n):
with self.security_group() as sg1:
with self.security_group() as sg2:
res = self._create_port(
self.fmt, n['network']['id'],
security_groups=[sg1['security_group']['id'],
sg2['security_group']['id']])
port = self.deserialize(self.fmt, res)
self.assertEqual(len(
port['port'][ext_sg.SECURITYGROUPS]), 2)
self._delete('ports', port['port']['id'])
def test_update_port_remove_security_group_empty_list(self):
with self.network() as n:
with self.subnet(n):
with self.security_group() as sg:
res = self._create_port(self.fmt, n['network']['id'],
security_groups=(
[sg['security_group']['id']]))
port = self.deserialize(self.fmt, res)
data = {'port': {'fixed_ips': port['port']['fixed_ips'],
'name': port['port']['name'],
'security_groups': []}}
req = self.new_update_request('ports', data,
port['port']['id'])
res = self.deserialize(self.fmt,
req.get_response(self.api))
self.assertEqual(res['port'].get(ext_sg.SECURITYGROUPS),
[])
self._delete('ports', port['port']['id'])
def test_update_port_remove_security_group_none(self):
with self.network() as n:
with self.subnet(n):
with self.security_group() as sg:
res = self._create_port(self.fmt, n['network']['id'],
security_groups=(
[sg['security_group']['id']]))
port = self.deserialize(self.fmt, res)
data = {'port': {'fixed_ips': port['port']['fixed_ips'],
'name': port['port']['name'],
'security_groups': None}}
req = self.new_update_request('ports', data,
port['port']['id'])
res = self.deserialize(self.fmt,
req.get_response(self.api))
self.assertEqual(res['port'].get(ext_sg.SECURITYGROUPS),
[])
self._delete('ports', port['port']['id'])
def test_create_port_with_bad_security_group(self):
with self.network() as n:
with self.subnet(n):
res = self._create_port(self.fmt, n['network']['id'],
security_groups=['bad_id'])
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 400)
def test_create_delete_security_group_port_in_use(self):
with self.network() as n:
with self.subnet(n):
with self.security_group() as sg:
res = self._create_port(self.fmt, n['network']['id'],
security_groups=(
[sg['security_group']['id']]))
port = self.deserialize(self.fmt, res)
self.assertEqual(port['port'][ext_sg.SECURITYGROUPS][0],
sg['security_group']['id'])
# try to delete security group that's in use
res = self._delete('security-groups',
sg['security_group']['id'], 409)
# delete the blocking port
self._delete('ports', port['port']['id'])
def test_create_security_group_rule_bulk_native(self):
if self._skip_native_bulk:
self.skipTest("Plugin does not support native bulk "
"security_group_rule create")
with self.security_group() as sg:
rule1 = self._build_security_group_rule(sg['security_group']['id'],
'ingress', 'tcp', '22',
'22', '10.0.0.1/24')
rule2 = self._build_security_group_rule(sg['security_group']['id'],
'ingress', 'tcp', '23',
'23', '10.0.0.1/24')
rules = {'security_group_rules': [rule1['security_group_rule'],
rule2['security_group_rule']]}
res = self._create_security_group_rule(self.fmt, rules)
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 201)
def test_create_security_group_rule_bulk_emulated(self):
real_has_attr = hasattr
#ensures the API choose the emulation code path
def fakehasattr(item, attr):
if attr.endswith('__native_bulk_support'):
return False
return real_has_attr(item, attr)
with mock.patch('__builtin__.hasattr',
new=fakehasattr):
with self.security_group() as sg:
rule1 = self._build_security_group_rule(
sg['security_group']['id'], 'ingress', 'tcp', '22', '22',
'10.0.0.1/24')
rule2 = self._build_security_group_rule(
sg['security_group']['id'], 'ingress', 'tcp', '23', '23',
'10.0.0.1/24')
rules = {'security_group_rules': [rule1['security_group_rule'],
rule2['security_group_rule']]
}
res = self._create_security_group_rule(self.fmt, rules)
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 201)
def test_create_security_group_rule_duplicate_rule_in_post(self):
if self._skip_native_bulk:
self.skipTest("Plugin does not support native bulk "
"security_group_rule create")
with self.security_group() as sg:
rule = self._build_security_group_rule(sg['security_group']['id'],
'ingress', 'tcp', '22',
'22', '10.0.0.1/24')
rules = {'security_group_rules': [rule['security_group_rule'],
rule['security_group_rule']]}
res = self._create_security_group_rule(self.fmt, rules)
rule = self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 409)
def test_create_security_group_rule_duplicate_rule_in_post_emulated(self):
real_has_attr = hasattr
#ensures the API choose the emulation code path
def fakehasattr(item, attr):
if attr.endswith('__native_bulk_support'):
return False
return real_has_attr(item, attr)
with mock.patch('__builtin__.hasattr',
new=fakehasattr):
with self.security_group() as sg:
rule = self._build_security_group_rule(
sg['security_group']['id'], 'ingress', 'tcp', '22', '22',
'10.0.0.1/24')
rules = {'security_group_rules': [rule['security_group_rule'],
rule['security_group_rule']]}
res = self._create_security_group_rule(self.fmt, rules)
rule = self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 409)
def test_create_security_group_rule_duplicate_rule_db(self):
if self._skip_native_bulk:
self.skipTest("Plugin does not support native bulk "
"security_group_rule create")
with self.security_group() as sg:
rule = self._build_security_group_rule(sg['security_group']['id'],
'ingress', 'tcp', '22',
'22', '10.0.0.1/24')
rules = {'security_group_rules': [rule]}
self._create_security_group_rule(self.fmt, rules)
res = self._create_security_group_rule(self.fmt, rules)
rule = self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 409)
def test_create_security_group_rule_duplicate_rule_db_emulated(self):
real_has_attr = hasattr
#ensures the API choose the emulation code path
def fakehasattr(item, attr):
if attr.endswith('__native_bulk_support'):
return False
return real_has_attr(item, attr)
with mock.patch('__builtin__.hasattr',
new=fakehasattr):
with self.security_group() as sg:
rule = self._build_security_group_rule(
sg['security_group']['id'], 'ingress', 'tcp', '22', '22',
'10.0.0.1/24')
rules = {'security_group_rules': [rule]}
self._create_security_group_rule(self.fmt, rules)
res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 409)
def test_create_security_group_rule_differnt_security_group_ids(self):
if self._skip_native_bulk:
self.skipTest("Plugin does not support native bulk "
"security_group_rule create")
with self.security_group() as sg1:
with self.security_group() as sg2:
rule1 = self._build_security_group_rule(
sg1['security_group']['id'], 'ingress', 'tcp', '22', '22',
'10.0.0.1/24')
rule2 = self._build_security_group_rule(
sg2['security_group']['id'], 'ingress', 'tcp', '23', '23',
'10.0.0.1/24')
rules = {'security_group_rules': [rule1['security_group_rule'],
rule2['security_group_rule']]
}
res = self._create_security_group_rule(self.fmt, rules)
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 400)
def test_create_security_group_rule_with_invalid_ethertype(self):
security_group_id = "4cd70774-cc67-4a87-9b39-7d1db38eb087"
direction = "ingress"
source_ip_prefix = "10.0.0.0/24"
protocol = 'tcp'
port_range_min = 22
port_range_max = 22
source_group_id = "9cd70774-cc67-4a87-9b39-7d1db38eb087"
rule = self._build_security_group_rule(security_group_id, direction,
protocol, port_range_min,
port_range_max,
source_ip_prefix,
source_group_id,
ethertype='IPv5')
res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 400)
def test_create_security_group_rule_with_invalid_protocol(self):
security_group_id = "4cd70774-cc67-4a87-9b39-7d1db38eb087"
direction = "ingress"
source_ip_prefix = "10.0.0.0/24"
protocol = 'tcp/ip'
port_range_min = 22
port_range_max = 22
source_group_id = "9cd70774-cc67-4a87-9b39-7d1db38eb087"
rule = self._build_security_group_rule(security_group_id, direction,
protocol, port_range_min,
port_range_max,
source_ip_prefix,
source_group_id)
res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 400)
def test_validate_port_external_id_quantum_id(self):
cfg.CONF.set_override('proxy_mode', True, 'SECURITYGROUP')
with self.network() as n:
with self.subnet(n):
sg1 = (self.deserialize(self.fmt,
self._create_security_group(self.fmt,
'foo', 'bar', '1')))
sg2 = (self.deserialize(self.fmt,
self._create_security_group(self.fmt,
'foo', 'bar', '2')))
res = self._create_port(
self.fmt, n['network']['id'],
security_groups=[sg1['security_group']['id']])
port = self.deserialize(self.fmt, res)
# This request updates the port sending the quantum security
# group id in and a nova security group id.
data = {'port': {'fixed_ips': port['port']['fixed_ips'],
'name': port['port']['name'],
ext_sg.SECURITYGROUPS:
[sg1['security_group']['external_id'],
sg2['security_group']['id']]}}
req = self.new_update_request('ports', data,
port['port']['id'])
res = self.deserialize(self.fmt, req.get_response(self.api))
self.assertEquals(len(res['port'][ext_sg.SECURITYGROUPS]), 2)
for sg_id in res['port'][ext_sg.SECURITYGROUPS]:
# only security group id's should be
# returned and not external_ids
self.assertEquals(len(sg_id), 36)
self._delete('ports', port['port']['id'])
def test_validate_port_external_id_string_or_int(self):
cfg.CONF.set_override('proxy_mode', True, 'SECURITYGROUP')
with self.network() as n:
with self.subnet(n):
string_id = '1'
int_id = 2
self.deserialize(
self.fmt, self._create_security_group(self.fmt,
'foo', 'bar',
string_id))
self.deserialize(
self.fmt, self._create_security_group(self.fmt,
'foo', 'bar',
int_id))
res = self._create_port(
self.fmt, n['network']['id'],
security_groups=[string_id, int_id])
port = self.deserialize(self.fmt, res)
self._delete('ports', port['port']['id'])
def test_create_port_with_non_uuid_or_int(self):
with self.network() as n:
with self.subnet(n):
res = self._create_port(self.fmt, n['network']['id'],
security_groups=['not_valid'])
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 400)
def test_validate_port_external_id_fail(self):
cfg.CONF.set_override('proxy_mode', True, 'SECURITYGROUP')
with self.network() as n:
with self.subnet(n):
bad_id = 1
res = self._create_port(
self.fmt, n['network']['id'],
security_groups=[bad_id])
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 404)
class TestSecurityGroupsXML(TestSecurityGroups):
fmt = 'xml'