Merge "Change hard coded numbers to constants in security group tests"

This commit is contained in:
Jenkins 2013-09-26 09:15:38 +00:00 committed by Gerrit Code Review
commit 0836aab266
7 changed files with 223 additions and 189 deletions

View File

@ -35,11 +35,6 @@ METERING_LABEL_KEY = '_metering_labels'
IPv4 = 'IPv4'
IPv6 = 'IPv6'
ICMP_PROTOCOL = 1
TCP_PROTOCOL = 6
UDP_PROTOCOL = 17
ICMPv6_PROTOCOL = 58
DHCP_RESPONSE_PORT = 68
MIN_VLAN_TAG = 1
@ -87,3 +82,13 @@ PORT_BINDING_EXT_ALIAS = 'binding'
L3_AGENT_SCHEDULER_EXT_ALIAS = 'l3_agent_scheduler'
DHCP_AGENT_SCHEDULER_EXT_ALIAS = 'dhcp_agent_scheduler'
LBAAS_AGENT_SCHEDULER_EXT_ALIAS = 'lbaas_agent_scheduler'
# Protocol names and numbers for Security Groups/Firewalls
PROTO_NAME_TCP = 'tcp'
PROTO_NAME_ICMP = 'icmp'
PROTO_NAME_ICMP_V6 = 'icmpv6'
PROTO_NAME_UDP = 'udp'
PROTO_NUM_TCP = 6
PROTO_NUM_ICMP = 1
PROTO_NUM_ICMP_V6 = 58
PROTO_NUM_UDP = 17

View File

@ -30,10 +30,10 @@ from neutron.extensions import securitygroup as ext_sg
from neutron.openstack.common import uuidutils
IP_PROTOCOL_MAP = {'tcp': constants.TCP_PROTOCOL,
'udp': constants.UDP_PROTOCOL,
'icmp': constants.ICMP_PROTOCOL,
'icmpv6': constants.ICMPv6_PROTOCOL}
IP_PROTOCOL_MAP = {constants.PROTO_NAME_TCP: constants.PROTO_NUM_TCP,
constants.PROTO_NAME_UDP: constants.PROTO_NUM_UDP,
constants.PROTO_NAME_ICMP: constants.PROTO_NUM_ICMP,
constants.PROTO_NAME_ICMP_V6: constants.PROTO_NUM_ICMP_V6}
class SecurityGroup(model_base.BASEV2, models_v2.HasId, models_v2.HasTenant):
@ -304,13 +304,13 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase):
if not rule['protocol']:
raise ext_sg.SecurityGroupProtocolRequiredWithPorts()
ip_proto = self._get_ip_proto_number(rule['protocol'])
if ip_proto in [constants.TCP_PROTOCOL, constants.UDP_PROTOCOL]:
if ip_proto in [constants.PROTO_NUM_TCP, constants.PROTO_NUM_UDP]:
if (rule['port_range_min'] is not None and
rule['port_range_min'] <= rule['port_range_max']):
pass
else:
raise ext_sg.SecurityGroupInvalidPortRange()
elif ip_proto == constants.ICMP_PROTOCOL:
elif ip_proto == constants.PROTO_NUM_ICMP:
for attr, field in [('port_range_min', 'type'),
('port_range_max', 'code')]:
if rule[attr] > 255:

View File

@ -23,6 +23,7 @@ from oslo.config import cfg
from neutron.api import extensions
from neutron.api.v2 import attributes as attr
from neutron.api.v2 import base
from neutron.common import constants as const
from neutron.common import exceptions as qexception
from neutron import manager
from neutron.openstack.common import uuidutils
@ -158,7 +159,8 @@ def _validate_name_not_default(data, valid_values=None):
attr.validators['type:name_not_default'] = _validate_name_not_default
sg_supported_protocols = [None, 'tcp', 'udp', 'icmp']
sg_supported_protocols = [None, const.PROTO_NAME_TCP,
const.PROTO_NAME_UDP, const.PROTO_NAME_ICMP]
sg_supported_ethertypes = ['IPv4', 'IPv6']
# Attribute Map

View File

@ -61,8 +61,8 @@ def get_protocol_value(protocol):
return protocol
mapping = {
'tcp': constants.TCP_PROTOCOL,
'udp': constants.UDP_PROTOCOL,
'icmp': constants.ICMP_PROTOCOL
constants.PROTO_NAME_TCP: constants.PROTO_NUM_TCP,
constants.PROTO_NAME_UDP: constants.PROTO_NUM_UDP,
constants.PROTO_NAME_ICMP: constants.PROTO_NUM_ICMP
}
return mapping.get(protocol.lower())

View File

@ -1073,7 +1073,7 @@ def create_security_profile(cluster, tenant_id, security_profile):
# Allow all dhcp responses and all ingress traffic
hidden_rules = {'logical_port_egress_rules':
[{'ethertype': 'IPv4',
'protocol': constants.UDP_PROTOCOL,
'protocol': constants.PROTO_NUM_UDP,
'port_range_min': constants.DHCP_RESPONSE_PORT,
'port_range_max': constants.DHCP_RESPONSE_PORT,
'ip_prefix': '0.0.0.0/0'}],
@ -1111,7 +1111,7 @@ def update_security_group_rules(cluster, spid, rules):
# Allow all dhcp responses in
rules['logical_port_egress_rules'].append(
{'ethertype': 'IPv4', 'protocol': constants.UDP_PROTOCOL,
{'ethertype': 'IPv4', 'protocol': constants.PROTO_NUM_UDP,
'port_range_min': constants.DHCP_RESPONSE_PORT,
'port_range_max': constants.DHCP_RESPONSE_PORT,
'ip_prefix': '0.0.0.0/0'})

View File

@ -20,6 +20,7 @@ import mock
import webob.exc
from neutron.api.v2 import attributes as attr
from neutron.common import constants as const
from neutron.common.test_lib import test_config
from neutron import context
from neutron.db import db_base_plugin_v2
@ -75,7 +76,7 @@ class SecurityGroupsTestCase(test_db_plugin.NeutronDbPluginV2TestCase):
port_range_min=None, port_range_max=None,
remote_ip_prefix=None, remote_group_id=None,
tenant_id='test_tenant',
ethertype='IPv4'):
ethertype=const.IPv4):
data = {'security_group_rule': {'security_group_id': security_group_id,
'direction': direction,
@ -110,13 +111,13 @@ class SecurityGroupsTestCase(test_db_plugin.NeutronDbPluginV2TestCase):
def _make_security_group(self, fmt, name, description, **kwargs):
res = self._create_security_group(fmt, name, description, **kwargs)
if res.status_int >= 400:
if res.status_int >= webob.exc.HTTPBadRequest.code:
raise webob.exc.HTTPClientError(code=res.status_int)
return self.deserialize(fmt, res)
def _make_security_group_rule(self, fmt, rules, **kwargs):
res = self._create_security_group_rule(self.fmt, rules)
if res.status_int >= 400:
if res.status_int >= webob.exc.HTTPBadRequest.code:
raise webob.exc.HTTPClientError(code=res.status_int)
return self.deserialize(fmt, res)
@ -136,10 +137,10 @@ class SecurityGroupsTestCase(test_db_plugin.NeutronDbPluginV2TestCase):
@contextlib.contextmanager
def security_group_rule(self, security_group_id='4cd70774-cc67-4a87-9b39-7'
'd1db38eb087',
direction='ingress', protocol='tcp',
direction='ingress', protocol=const.PROTO_NAME_TCP,
port_range_min='22', port_range_max='22',
remote_ip_prefix=None, remote_group_id=None,
fmt=None, no_delete=False, ethertype='IPv4'):
fmt=None, no_delete=False, ethertype=const.IPv4):
if not fmt:
fmt = self.fmt
rule = self._build_security_group_rule(security_group_id,
@ -263,11 +264,11 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
sg_rules = security_group['security_group']['security_group_rules']
self.assertEqual(len(sg_rules), 2)
v4_rules = filter(lambda x: x['ethertype'] == 'IPv4', sg_rules)
v4_rules = [r for r in sg_rules if r['ethertype'] == const.IPv4]
self.assertEqual(len(v4_rules), 1)
v4_rule = v4_rules[0]
expected = {'direction': 'egress',
'ethertype': 'IPv4',
'ethertype': const.IPv4,
'remote_group_id': None,
'remote_ip_prefix': None,
'protocol': None,
@ -275,11 +276,11 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
'port_range_min': None}
self._assert_sg_rule_has_kvs(v4_rule, expected)
v6_rules = filter(lambda x: x['ethertype'] == 'IPv6', sg_rules)
v6_rules = [r for r in sg_rules if r['ethertype'] == const.IPv6]
self.assertEqual(len(v6_rules), 1)
v6_rule = v6_rules[0]
expected = {'direction': 'egress',
'ethertype': 'IPv6',
'ethertype': const.IPv6,
'remote_group_id': None,
'remote_ip_prefix': None,
'protocol': None,
@ -309,7 +310,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
sg['security_group']['id'])
req.environ['neutron.context'] = context.Context('', 'somebody')
res = req.get_response(self.ext_api)
self.assertEqual(res.status_int, 409)
self.assertEqual(res.status_int, webob.exc.HTTPConflict.code)
def test_update_default_security_group_name_fail(self):
with self.network():
@ -322,7 +323,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
sg['security_groups'][0]['id'])
req.environ['neutron.context'] = context.Context('', 'somebody')
res = req.get_response(self.ext_api)
self.assertEqual(res.status_int, 404)
self.assertEqual(res.status_int, webob.exc.HTTPNotFound.code)
def test_update_default_security_group_with_description(self):
with self.network():
@ -347,7 +348,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
description = 'my webservers'
res = self._create_security_group(self.fmt, name, description)
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 409)
self.assertEqual(res.status_int, webob.exc.HTTPConflict.code)
def test_list_security_groups(self):
with contextlib.nested(self.security_group(name='sg1',
@ -406,23 +407,23 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
security_group_id = sg['security_group']['id']
ethertype = 2
rule = self._build_security_group_rule(
security_group_id, 'ingress', 'tcp', '22', '22', None, None,
ethertype=ethertype)
security_group_id, 'ingress', const.PROTO_NAME_TCP, '22',
'22', None, None, ethertype=ethertype)
res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 400)
self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
def test_create_security_group_rule_tcp_protocol_as_number(self):
name = 'webservers'
description = 'my webservers'
with self.security_group(name, description) as sg:
security_group_id = sg['security_group']['id']
protocol = 6 # TCP
protocol = const.PROTO_NUM_TCP # TCP
rule = self._build_security_group_rule(
security_group_id, 'ingress', protocol, '22', '22')
res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 201)
self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
def test_create_security_group_rule_protocol_as_number(self):
name = 'webservers'
@ -434,7 +435,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
security_group_id, 'ingress', protocol)
res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 201)
self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
def test_create_security_group_rule_case_insensitive(self):
name = 'webservers'
@ -457,7 +458,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
self.assertEqual(rule['security_group_rule']['protocol'],
protocol.lower())
self.assertEqual(rule['security_group_rule']['ethertype'],
'IPv4')
const.IPv4)
def test_get_security_group(self):
name = 'webservers'
@ -468,7 +469,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
security_group_id = sg['security_group']['id']
direction = "ingress"
remote_ip_prefix = "10.0.0.0/24"
protocol = 'tcp'
protocol = const.PROTO_NAME_TCP
port_range_min = 22
port_range_max = 22
keys = [('remote_ip_prefix', remote_ip_prefix),
@ -488,8 +489,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
self.assertEqual(group['security_group']['id'],
remote_group_id)
self.assertEqual(len(sg_rule), 3)
sg_rule = filter(lambda x: x['direction'] == 'ingress',
sg_rule)
sg_rule = [r for r in sg_rule if r['direction'] == 'ingress']
for k, v, in keys:
self.assertEqual(sg_rule[0][k], v)
@ -498,14 +498,15 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
description = 'my webservers'
with self.security_group(name, description, no_delete=True) as sg:
remote_group_id = sg['security_group']['id']
self._delete('security-groups', remote_group_id, 204)
self._delete('security-groups', remote_group_id,
webob.exc.HTTPNoContent.code)
def test_delete_default_security_group_admin(self):
with self.network():
res = self.new_list_request('security-groups')
sg = self.deserialize(self.fmt, res.get_response(self.ext_api))
self._delete('security-groups', sg['security_groups'][0]['id'],
204)
webob.exc.HTTPNoContent.code)
def test_delete_default_security_group_nonadmin(self):
with self.network():
@ -513,7 +514,8 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
sg = self.deserialize(self.fmt, res.get_response(self.ext_api))
neutron_context = context.Context('', 'test-tenant')
self._delete('security-groups', sg['security_groups'][0]['id'],
409, neutron_context=neutron_context)
webob.exc.HTTPConflict.code,
neutron_context=neutron_context)
def test_security_group_list_creates_default_security_group(self):
neutron_context = context.Context('', 'test-tenant')
@ -533,15 +535,15 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
# Verify default rule for v4 egress
sg_rules = rules['security_group_rules']
rules = filter(
lambda x: (
x['direction'] == 'egress' and x['ethertype'] == 'IPv4'),
sg_rules)
rules = [
r for r in sg_rules
if r['direction'] == 'egress' and r['ethertype'] == const.IPv4
]
self.assertEqual(len(rules), 1)
v4_egress = rules[0]
expected = {'direction': 'egress',
'ethertype': 'IPv4',
'ethertype': const.IPv4,
'remote_group_id': None,
'remote_ip_prefix': None,
'protocol': None,
@ -550,15 +552,15 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
self._assert_sg_rule_has_kvs(v4_egress, expected)
# Verify default rule for v6 egress
rules = filter(
lambda x: (
x['direction'] == 'egress' and x['ethertype'] == 'IPv6'),
sg_rules)
rules = [
r for r in sg_rules
if r['direction'] == 'egress' and r['ethertype'] == const.IPv6
]
self.assertEqual(len(rules), 1)
v6_egress = rules[0]
expected = {'direction': 'egress',
'ethertype': 'IPv6',
'ethertype': const.IPv6,
'remote_group_id': None,
'remote_ip_prefix': None,
'protocol': None,
@ -567,15 +569,15 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
self._assert_sg_rule_has_kvs(v6_egress, expected)
# Verify default rule for v4 ingress
rules = filter(
lambda x: (
x['direction'] == 'ingress' and x['ethertype'] == 'IPv4'),
sg_rules)
rules = [
r for r in sg_rules
if r['direction'] == 'ingress' and r['ethertype'] == const.IPv4
]
self.assertEqual(len(rules), 1)
v4_ingress = rules[0]
expected = {'direction': 'ingress',
'ethertype': 'IPv4',
'ethertype': const.IPv4,
'remote_group_id': security_group_id,
'remote_ip_prefix': None,
'protocol': None,
@ -584,15 +586,15 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
self._assert_sg_rule_has_kvs(v4_ingress, expected)
# Verify default rule for v6 ingress
rules = filter(
lambda x: (
x['direction'] == 'ingress' and x['ethertype'] == 'IPv6'),
sg_rules)
rules = [
r for r in sg_rules
if r['direction'] == 'ingress' and r['ethertype'] == const.IPv6
]
self.assertEqual(len(rules), 1)
v6_ingress = rules[0]
expected = {'direction': 'ingress',
'ethertype': 'IPv6',
'ethertype': const.IPv6,
'remote_group_id': security_group_id,
'remote_ip_prefix': None,
'protocol': None,
@ -607,7 +609,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
security_group_id = sg['security_group']['id']
direction = "ingress"
remote_ip_prefix = "10.0.0.0/24"
protocol = 'tcp'
protocol = const.PROTO_NAME_TCP
port_range_min = 22
port_range_max = 22
keys = [('remote_ip_prefix', remote_ip_prefix),
@ -631,7 +633,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
security_group_id = sg['security_group']['id']
direction = "ingress"
remote_group_id = sg2['security_group']['id']
protocol = 'tcp'
protocol = const.PROTO_NAME_TCP
port_range_min = 22
port_range_max = 22
keys = [('remote_group_id', remote_group_id),
@ -655,7 +657,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
security_group_id = sg['security_group']['id']
direction = "ingress"
remote_ip_prefix = "10.0.0.0/24"
protocol = 'icmp'
protocol = const.PROTO_NAME_ICMP
# port_range_min (ICMP type) is greater than port_range_max
# (ICMP code) in order to confirm min <= max port check is
# not called for ICMP.
@ -681,7 +683,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
security_group_id = sg['security_group']['id']
direction = "ingress"
remote_ip_prefix = "10.0.0.0/24"
protocol = 'icmp'
protocol = const.PROTO_NAME_ICMP
# ICMP type
port_range_min = 8
# ICMP code
@ -703,7 +705,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
security_group_id = "4cd70774-cc67-4a87-9b39-7d1db38eb087"
direction = "ingress"
remote_ip_prefix = "10.0.0.0/24"
protocol = 'tcp'
protocol = const.PROTO_NAME_TCP
port_range_min = 22
port_range_max = 22
remote_group_id = "9cd70774-cc67-4a87-9b39-7d1db38eb087"
@ -714,13 +716,13 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
remote_group_id)
res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 400)
self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
def test_create_security_group_rule_bad_security_group_id(self):
security_group_id = "4cd70774-cc67-4a87-9b39-7d1db38eb087"
direction = "ingress"
remote_ip_prefix = "10.0.0.0/24"
protocol = 'tcp'
protocol = const.PROTO_NAME_TCP
port_range_min = 22
port_range_max = 22
rule = self._build_security_group_rule(security_group_id, direction,
@ -729,21 +731,21 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
remote_ip_prefix)
res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 404)
self.assertEqual(res.status_int, webob.exc.HTTPNotFound.code)
def test_create_security_group_rule_bad_tenant(self):
with self.security_group() as sg:
rule = {'security_group_rule':
{'security_group_id': sg['security_group']['id'],
'direction': 'ingress',
'protocol': 'tcp',
'protocol': const.PROTO_NAME_TCP,
'port_range_min': '22',
'port_range_max': '22',
'tenant_id': "bad_tenant"}}
res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 404)
self.assertEqual(res.status_int, webob.exc.HTTPNotFound.code)
def test_create_security_group_rule_bad_tenant_remote_group_id(self):
with self.security_group() as sg:
@ -754,7 +756,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
rule = {'security_group_rule':
{'security_group_id': sg2['security_group']['id'],
'direction': 'ingress',
'protocol': 'tcp',
'protocol': const.PROTO_NAME_TCP,
'port_range_min': '22',
'port_range_max': '22',
'tenant_id': 'bad_tenant',
@ -764,7 +766,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
tenant_id='bad_tenant',
set_context=True)
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 404)
self.assertEqual(res.status_int, webob.exc.HTTPNotFound.code)
def test_create_security_group_rule_bad_tenant_security_group_rule(self):
with self.security_group() as sg:
@ -775,7 +777,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
rule = {'security_group_rule':
{'security_group_id': sg['security_group']['id'],
'direction': 'ingress',
'protocol': 'tcp',
'protocol': const.PROTO_NAME_TCP,
'port_range_min': '22',
'port_range_max': '22',
'tenant_id': 'bad_tenant'}}
@ -784,7 +786,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
tenant_id='bad_tenant',
set_context=True)
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 404)
self.assertEqual(res.status_int, webob.exc.HTTPNotFound.code)
def test_create_security_group_rule_bad_remote_group_id(self):
name = 'webservers'
@ -793,7 +795,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
security_group_id = sg['security_group']['id']
remote_group_id = "4cd70774-cc67-4a87-9b39-7d1db38eb087"
direction = "ingress"
protocol = 'tcp'
protocol = const.PROTO_NAME_TCP
port_range_min = 22
port_range_max = 22
rule = self._build_security_group_rule(security_group_id, direction,
@ -802,7 +804,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
remote_group_id=remote_group_id)
res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 404)
self.assertEqual(res.status_int, webob.exc.HTTPNotFound.code)
def test_create_security_group_rule_duplicate_rules(self):
name = 'webservers'
@ -811,11 +813,12 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
security_group_id = sg['security_group']['id']
with self.security_group_rule(security_group_id):
rule = self._build_security_group_rule(
sg['security_group']['id'], 'ingress', 'tcp', '22', '22')
sg['security_group']['id'], 'ingress',
const.PROTO_NAME_TCP, '22', '22')
self._create_security_group_rule(self.fmt, rule)
res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 409)
self.assertEqual(res.status_int, webob.exc.HTTPConflict.code)
def test_create_security_group_rule_min_port_greater_max(self):
name = 'webservers'
@ -823,14 +826,16 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
with self.security_group(name, description) as sg:
security_group_id = sg['security_group']['id']
with self.security_group_rule(security_group_id):
for protocol in ['tcp', 'udp', 6, 17]:
for protocol in [const.PROTO_NAME_TCP, const.PROTO_NAME_UDP,
const.PROTO_NUM_TCP, const.PROTO_NUM_UDP]:
rule = self._build_security_group_rule(
sg['security_group']['id'],
'ingress', protocol, '50', '22')
self._create_security_group_rule(self.fmt, rule)
res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 400)
self.assertEqual(res.status_int,
webob.exc.HTTPBadRequest.code)
def test_create_security_group_rule_ports_but_no_protocol(self):
name = 'webservers'
@ -843,7 +848,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
self._create_security_group_rule(self.fmt, rule)
res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 400)
self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
def test_create_security_group_rule_port_range_min_only(self):
name = 'webservers'
@ -852,11 +857,12 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
security_group_id = sg['security_group']['id']
with self.security_group_rule(security_group_id):
rule = self._build_security_group_rule(
sg['security_group']['id'], 'ingress', 'tcp', '22', None)
sg['security_group']['id'], 'ingress',
const.PROTO_NAME_TCP, '22', None)
self._create_security_group_rule(self.fmt, rule)
res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 400)
self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
def test_create_security_group_rule_port_range_max_only(self):
name = 'webservers'
@ -865,11 +871,12 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
security_group_id = sg['security_group']['id']
with self.security_group_rule(security_group_id):
rule = self._build_security_group_rule(
sg['security_group']['id'], 'ingress', 'tcp', None, '22')
sg['security_group']['id'], 'ingress',
const.PROTO_NAME_TCP, None, '22')
self._create_security_group_rule(self.fmt, rule)
res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 400)
self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
def test_create_security_group_rule_icmp_type_too_big(self):
name = 'webservers'
@ -878,11 +885,12 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
security_group_id = sg['security_group']['id']
with self.security_group_rule(security_group_id):
rule = self._build_security_group_rule(
sg['security_group']['id'], 'ingress', 'icmp', '256', None)
sg['security_group']['id'], 'ingress',
const.PROTO_NAME_ICMP, '256', None)
self._create_security_group_rule(self.fmt, rule)
res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 400)
self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
def test_create_security_group_rule_icmp_code_too_big(self):
name = 'webservers'
@ -891,11 +899,12 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
security_group_id = sg['security_group']['id']
with self.security_group_rule(security_group_id):
rule = self._build_security_group_rule(
sg['security_group']['id'], 'ingress', 'icmp', '8', '256')
sg['security_group']['id'], 'ingress',
const.PROTO_NAME_ICMP, '8', '256')
self._create_security_group_rule(self.fmt, rule)
res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 400)
self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
def test_list_ports_security_group(self):
with self.network() as n:
@ -1107,7 +1116,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
security_groups=['bad_id'])
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 400)
self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
def test_create_delete_security_group_port_in_use(self):
with self.network() as n:
@ -1121,7 +1130,8 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
sg['security_group']['id'])
# try to delete security group that's in use
res = self._delete('security-groups',
sg['security_group']['id'], 409)
sg['security_group']['id'],
webob.exc.HTTPConflict.code)
# delete the blocking port
self._delete('ports', port['port']['id'])
@ -1131,16 +1141,18 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
"security_group_rule create")
with self.security_group() as sg:
rule1 = self._build_security_group_rule(sg['security_group']['id'],
'ingress', 'tcp', '22',
'ingress',
const.PROTO_NAME_TCP, '22',
'22', '10.0.0.1/24')
rule2 = self._build_security_group_rule(sg['security_group']['id'],
'ingress', 'tcp', '23',
'ingress',
const.PROTO_NAME_TCP, '23',
'23', '10.0.0.1/24')
rules = {'security_group_rules': [rule1['security_group_rule'],
rule2['security_group_rule']]}
res = self._create_security_group_rule(self.fmt, rules)
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 201)
self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
def test_create_security_group_rule_bulk_emulated(self):
real_has_attr = hasattr
@ -1155,17 +1167,17 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
new=fakehasattr):
with self.security_group() as sg:
rule1 = self._build_security_group_rule(
sg['security_group']['id'], 'ingress', 'tcp', '22', '22',
'10.0.0.1/24')
sg['security_group']['id'], 'ingress',
const.PROTO_NAME_TCP, '22', '22', '10.0.0.1/24')
rule2 = self._build_security_group_rule(
sg['security_group']['id'], 'ingress', 'tcp', '23', '23',
'10.0.0.1/24')
sg['security_group']['id'], 'ingress',
const.PROTO_NAME_TCP, '23', '23', '10.0.0.1/24')
rules = {'security_group_rules': [rule1['security_group_rule'],
rule2['security_group_rule']]
}
res = self._create_security_group_rule(self.fmt, rules)
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 201)
self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
def test_create_security_group_rule_duplicate_rule_in_post(self):
if self._skip_native_bulk:
@ -1173,13 +1185,14 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
"security_group_rule create")
with self.security_group() as sg:
rule = self._build_security_group_rule(sg['security_group']['id'],
'ingress', 'tcp', '22',
'ingress',
const.PROTO_NAME_TCP, '22',
'22', '10.0.0.1/24')
rules = {'security_group_rules': [rule['security_group_rule'],
rule['security_group_rule']]}
res = self._create_security_group_rule(self.fmt, rules)
rule = self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 409)
self.assertEqual(res.status_int, webob.exc.HTTPConflict.code)
def test_create_security_group_rule_duplicate_rule_in_post_emulated(self):
real_has_attr = hasattr
@ -1195,13 +1208,13 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
with self.security_group() as sg:
rule = self._build_security_group_rule(
sg['security_group']['id'], 'ingress', 'tcp', '22', '22',
'10.0.0.1/24')
sg['security_group']['id'], 'ingress',
const.PROTO_NAME_TCP, '22', '22', '10.0.0.1/24')
rules = {'security_group_rules': [rule['security_group_rule'],
rule['security_group_rule']]}
res = self._create_security_group_rule(self.fmt, rules)
rule = self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 409)
self.assertEqual(res.status_int, webob.exc.HTTPConflict.code)
def test_create_security_group_rule_duplicate_rule_db(self):
if self._skip_native_bulk:
@ -1209,13 +1222,14 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
"security_group_rule create")
with self.security_group() as sg:
rule = self._build_security_group_rule(sg['security_group']['id'],
'ingress', 'tcp', '22',
'ingress',
const.PROTO_NAME_TCP, '22',
'22', '10.0.0.1/24')
rules = {'security_group_rules': [rule]}
self._create_security_group_rule(self.fmt, rules)
res = self._create_security_group_rule(self.fmt, rules)
rule = self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 409)
self.assertEqual(res.status_int, webob.exc.HTTPConflict.code)
def test_create_security_group_rule_duplicate_rule_db_emulated(self):
real_has_attr = hasattr
@ -1230,13 +1244,13 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
new=fakehasattr):
with self.security_group() as sg:
rule = self._build_security_group_rule(
sg['security_group']['id'], 'ingress', 'tcp', '22', '22',
'10.0.0.1/24')
sg['security_group']['id'], 'ingress',
const.PROTO_NAME_TCP, '22', '22', '10.0.0.1/24')
rules = {'security_group_rules': [rule]}
self._create_security_group_rule(self.fmt, rules)
res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 409)
self.assertEqual(res.status_int, webob.exc.HTTPConflict.code)
def test_create_security_group_rule_differnt_security_group_ids(self):
if self._skip_native_bulk:
@ -1245,24 +1259,24 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
with self.security_group() as sg1:
with self.security_group() as sg2:
rule1 = self._build_security_group_rule(
sg1['security_group']['id'], 'ingress', 'tcp', '22', '22',
'10.0.0.1/24')
sg1['security_group']['id'], 'ingress',
const.PROTO_NAME_TCP, '22', '22', '10.0.0.1/24')
rule2 = self._build_security_group_rule(
sg2['security_group']['id'], 'ingress', 'tcp', '23', '23',
'10.0.0.1/24')
sg2['security_group']['id'], 'ingress',
const.PROTO_NAME_TCP, '23', '23', '10.0.0.1/24')
rules = {'security_group_rules': [rule1['security_group_rule'],
rule2['security_group_rule']]
}
res = self._create_security_group_rule(self.fmt, rules)
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 400)
self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
def test_create_security_group_rule_with_invalid_ethertype(self):
security_group_id = "4cd70774-cc67-4a87-9b39-7d1db38eb087"
direction = "ingress"
remote_ip_prefix = "10.0.0.0/24"
protocol = 'tcp'
protocol = const.PROTO_NAME_TCP
port_range_min = 22
port_range_max = 22
remote_group_id = "9cd70774-cc67-4a87-9b39-7d1db38eb087"
@ -1274,7 +1288,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
ethertype='IPv5')
res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 400)
self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
def test_create_security_group_rule_with_invalid_protocol(self):
security_group_id = "4cd70774-cc67-4a87-9b39-7d1db38eb087"
@ -1291,7 +1305,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
remote_group_id)
res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 400)
self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
def test_create_port_with_non_uuid(self):
with self.network() as n:
@ -1300,7 +1314,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
security_groups=['not_valid'])
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 400)
self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
class TestSecurityGroupsXML(TestSecurityGroups):

View File

@ -21,11 +21,13 @@ import mock
from mock import call
import mox
from oslo.config import cfg
import webob.exc
from neutron.agent import firewall as firewall_base
from neutron.agent.linux import iptables_manager
from neutron.agent import rpc as agent_rpc
from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.common import constants as const
from neutron import context
from neutron.db import securitygroups_rpc_base as sg_db_rpc
from neutron.extensions import allowedaddresspairs as addr_pair
@ -54,7 +56,7 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase):
self.rpc = FakeSGCallback()
def test_security_group_rules_for_devices_ipv4_ingress(self):
fake_prefix = test_fw.FAKE_PREFIX['IPv4']
fake_prefix = test_fw.FAKE_PREFIX[const.IPv4]
with self.network() as n:
with nested(self.subnet(n),
self.security_group()) as (subnet_v4,
@ -62,18 +64,18 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase):
sg1_id = sg1['security_group']['id']
rule1 = self._build_security_group_rule(
sg1_id,
'ingress', 'tcp', '22',
'ingress', const.PROTO_NAME_TCP, '22',
'22')
rule2 = self._build_security_group_rule(
sg1_id,
'ingress', 'tcp', '23',
'ingress', const.PROTO_NAME_TCP, '23',
'23', fake_prefix)
rules = {
'security_group_rules': [rule1['security_group_rule'],
rule2['security_group_rule']]}
res = self._create_security_group_rule(self.fmt, rules)
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 201)
self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
res1 = self._create_port(
self.fmt, n['network']['id'],
@ -86,17 +88,19 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase):
ports_rpc = self.rpc.security_group_rules_for_devices(
ctx, devices=devices)
port_rpc = ports_rpc[port_id1]
expected = [{'direction': 'egress', 'ethertype': 'IPv4',
expected = [{'direction': 'egress', 'ethertype': const.IPv4,
'security_group_id': sg1_id},
{'direction': 'egress', 'ethertype': 'IPv6',
{'direction': 'egress', 'ethertype': const.IPv6,
'security_group_id': sg1_id},
{'direction': 'ingress',
'protocol': 'tcp', 'ethertype': 'IPv4',
'protocol': const.PROTO_NAME_TCP,
'ethertype': const.IPv4,
'port_range_max': 22,
'security_group_id': sg1_id,
'port_range_min': 22},
{'direction': 'ingress', 'protocol': 'tcp',
'ethertype': 'IPv4',
{'direction': 'ingress',
'protocol': const.PROTO_NAME_TCP,
'ethertype': const.IPv4,
'port_range_max': 23, 'security_group_id': sg1_id,
'port_range_min': 23,
'source_ip_prefix': fake_prefix},
@ -169,7 +173,7 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase):
self._delete('ports', port_id1)
def test_security_group_rules_for_devices_ipv4_egress(self):
fake_prefix = test_fw.FAKE_PREFIX['IPv4']
fake_prefix = test_fw.FAKE_PREFIX[const.IPv4]
with self.network() as n:
with nested(self.subnet(n),
self.security_group()) as (subnet_v4,
@ -177,18 +181,18 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase):
sg1_id = sg1['security_group']['id']
rule1 = self._build_security_group_rule(
sg1_id,
'egress', 'tcp', '22',
'egress', const.PROTO_NAME_TCP, '22',
'22')
rule2 = self._build_security_group_rule(
sg1_id,
'egress', 'udp', '23',
'egress', const.PROTO_NAME_UDP, '23',
'23', fake_prefix)
rules = {
'security_group_rules': [rule1['security_group_rule'],
rule2['security_group_rule']]}
res = self._create_security_group_rule(self.fmt, rules)
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 201)
self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
res1 = self._create_port(
self.fmt, n['network']['id'],
@ -201,17 +205,19 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase):
ports_rpc = self.rpc.security_group_rules_for_devices(
ctx, devices=devices)
port_rpc = ports_rpc[port_id1]
expected = [{'direction': 'egress', 'ethertype': 'IPv4',
expected = [{'direction': 'egress', 'ethertype': const.IPv4,
'security_group_id': sg1_id},
{'direction': 'egress', 'ethertype': 'IPv6',
{'direction': 'egress', 'ethertype': const.IPv6,
'security_group_id': sg1_id},
{'direction': 'egress',
'protocol': 'tcp', 'ethertype': 'IPv4',
'protocol': const.PROTO_NAME_TCP,
'ethertype': const.IPv4,
'port_range_max': 22,
'security_group_id': sg1_id,
'port_range_min': 22},
{'direction': 'egress', 'protocol': 'udp',
'ethertype': 'IPv4',
{'direction': 'egress',
'protocol': const.PROTO_NAME_UDP,
'ethertype': const.IPv4,
'port_range_max': 23, 'security_group_id': sg1_id,
'port_range_min': 23,
'dest_ip_prefix': fake_prefix},
@ -232,13 +238,13 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase):
sg2_id = sg2['security_group']['id']
rule1 = self._build_security_group_rule(
sg1_id,
'ingress', 'tcp', '24',
'ingress', const.PROTO_NAME_TCP, '24',
'25', remote_group_id=sg2['security_group']['id'])
rules = {
'security_group_rules': [rule1['security_group_rule']]}
res = self._create_security_group_rule(self.fmt, rules)
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 201)
self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
res1 = self._create_port(
self.fmt, n['network']['id'],
@ -258,17 +264,18 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase):
ports_rpc = self.rpc.security_group_rules_for_devices(
ctx, devices=devices)
port_rpc = ports_rpc[port_id1]
expected = [{'direction': 'egress', 'ethertype': 'IPv4',
expected = [{'direction': 'egress', 'ethertype': const.IPv4,
'security_group_id': sg1_id},
{'direction': 'egress', 'ethertype': 'IPv6',
{'direction': 'egress', 'ethertype': const.IPv6,
'security_group_id': sg1_id},
{'direction': 'egress', 'ethertype': 'IPv4',
{'direction': 'egress', 'ethertype': const.IPv4,
'security_group_id': sg2_id},
{'direction': 'egress', 'ethertype': 'IPv6',
{'direction': 'egress', 'ethertype': const.IPv6,
'security_group_id': sg2_id},
{'direction': u'ingress',
'source_ip_prefix': u'10.0.0.3/32',
'protocol': u'tcp', 'ethertype': u'IPv4',
'protocol': const.PROTO_NAME_TCP,
'ethertype': const.IPv4,
'port_range_max': 25, 'port_range_min': 24,
'remote_group_id': sg2_id,
'security_group_id': sg1_id},
@ -279,7 +286,7 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase):
self._delete('ports', port_id2)
def test_security_group_rules_for_devices_ipv6_ingress(self):
fake_prefix = test_fw.FAKE_PREFIX['IPv6']
fake_prefix = test_fw.FAKE_PREFIX[const.IPv6]
with self.network() as n:
with nested(self.subnet(n,
cidr=fake_prefix,
@ -289,20 +296,20 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase):
sg1_id = sg1['security_group']['id']
rule1 = self._build_security_group_rule(
sg1_id,
'ingress', 'tcp', '22',
'ingress', const.PROTO_NAME_TCP, '22',
'22',
ethertype='IPv6')
ethertype=const.IPv6)
rule2 = self._build_security_group_rule(
sg1_id,
'ingress', 'udp', '23',
'ingress', const.PROTO_NAME_UDP, '23',
'23', fake_prefix,
ethertype='IPv6')
ethertype=const.IPv6)
rules = {
'security_group_rules': [rule1['security_group_rule'],
rule2['security_group_rule']]}
res = self._create_security_group_rule(self.fmt, rules)
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 201)
self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
res1 = self._create_port(
self.fmt, n['network']['id'],
@ -316,17 +323,19 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase):
ports_rpc = self.rpc.security_group_rules_for_devices(
ctx, devices=devices)
port_rpc = ports_rpc[port_id1]
expected = [{'direction': 'egress', 'ethertype': 'IPv4',
expected = [{'direction': 'egress', 'ethertype': const.IPv4,
'security_group_id': sg1_id},
{'direction': 'egress', 'ethertype': 'IPv6',
{'direction': 'egress', 'ethertype': const.IPv6,
'security_group_id': sg1_id},
{'direction': 'ingress',
'protocol': 'tcp', 'ethertype': 'IPv6',
'protocol': const.PROTO_NAME_TCP,
'ethertype': const.IPv6,
'port_range_max': 22,
'security_group_id': sg1_id,
'port_range_min': 22},
{'direction': 'ingress', 'protocol': 'udp',
'ethertype': 'IPv6',
{'direction': 'ingress',
'protocol': const.PROTO_NAME_UDP,
'ethertype': const.IPv6,
'port_range_max': 23, 'security_group_id': sg1_id,
'port_range_min': 23,
'source_ip_prefix': fake_prefix},
@ -336,7 +345,7 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase):
self._delete('ports', port_id1)
def test_security_group_rules_for_devices_ipv6_egress(self):
fake_prefix = test_fw.FAKE_PREFIX['IPv6']
fake_prefix = test_fw.FAKE_PREFIX[const.IPv6]
with self.network() as n:
with nested(self.subnet(n,
cidr=fake_prefix,
@ -346,20 +355,20 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase):
sg1_id = sg1['security_group']['id']
rule1 = self._build_security_group_rule(
sg1_id,
'egress', 'tcp', '22',
'egress', const.PROTO_NAME_TCP, '22',
'22',
ethertype='IPv6')
ethertype=const.IPv6)
rule2 = self._build_security_group_rule(
sg1_id,
'egress', 'udp', '23',
'egress', const.PROTO_NAME_UDP, '23',
'23', fake_prefix,
ethertype='IPv6')
ethertype=const.IPv6)
rules = {
'security_group_rules': [rule1['security_group_rule'],
rule2['security_group_rule']]}
res = self._create_security_group_rule(self.fmt, rules)
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 201)
self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
res1 = self._create_port(
self.fmt, n['network']['id'],
@ -374,18 +383,21 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase):
ports_rpc = self.rpc.security_group_rules_for_devices(
ctx, devices=devices)
port_rpc = ports_rpc[port_id1]
expected = [{'direction': 'egress', 'ethertype': 'IPv4',
expected = [{'direction': 'egress', 'ethertype': const.IPv4,
'security_group_id': sg1_id},
{'direction': 'egress', 'ethertype': 'IPv6',
{'direction': 'egress', 'ethertype': const.IPv6,
'security_group_id': sg1_id},
{'direction': 'egress',
'protocol': 'tcp', 'ethertype': 'IPv6',
'protocol': const.PROTO_NAME_TCP,
'ethertype': const.IPv6,
'port_range_max': 22,
'security_group_id': sg1_id,
'port_range_min': 22},
{'direction': 'egress', 'protocol': 'udp',
'ethertype': 'IPv6',
'port_range_max': 23, 'security_group_id': sg1_id,
{'direction': 'egress',
'protocol': const.PROTO_NAME_UDP,
'ethertype': const.IPv6,
'port_range_max': 23,
'security_group_id': sg1_id,
'port_range_min': 23,
'dest_ip_prefix': fake_prefix},
]
@ -394,7 +406,7 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase):
self._delete('ports', port_id1)
def test_security_group_rules_for_devices_ipv6_source_group(self):
fake_prefix = test_fw.FAKE_PREFIX['IPv6']
fake_prefix = test_fw.FAKE_PREFIX[const.IPv6]
with self.network() as n:
with nested(self.subnet(n,
cidr=fake_prefix,
@ -407,15 +419,15 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase):
sg2_id = sg2['security_group']['id']
rule1 = self._build_security_group_rule(
sg1_id,
'ingress', 'tcp', '24',
'ingress', const.PROTO_NAME_TCP, '24',
'25',
ethertype='IPv6',
ethertype=const.IPv6,
remote_group_id=sg2['security_group']['id'])
rules = {
'security_group_rules': [rule1['security_group_rule']]}
res = self._create_security_group_rule(self.fmt, rules)
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 201)
self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
res1 = self._create_port(
self.fmt, n['network']['id'],
@ -438,17 +450,18 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase):
ports_rpc = self.rpc.security_group_rules_for_devices(
ctx, devices=devices)
port_rpc = ports_rpc[port_id1]
expected = [{'direction': 'egress', 'ethertype': 'IPv4',
expected = [{'direction': 'egress', 'ethertype': const.IPv4,
'security_group_id': sg1_id},
{'direction': 'egress', 'ethertype': 'IPv6',
{'direction': 'egress', 'ethertype': const.IPv6,
'security_group_id': sg1_id},
{'direction': 'egress', 'ethertype': 'IPv4',
{'direction': 'egress', 'ethertype': const.IPv4,
'security_group_id': sg2_id},
{'direction': 'egress', 'ethertype': 'IPv6',
{'direction': 'egress', 'ethertype': const.IPv6,
'security_group_id': sg2_id},
{'direction': 'ingress',
'source_ip_prefix': 'fe80::3/128',
'protocol': 'tcp', 'ethertype': 'IPv6',
'protocol': const.PROTO_NAME_TCP,
'ethertype': const.IPv6,
'port_range_max': 25, 'port_range_min': 24,
'remote_group_id': sg2_id,
'security_group_id': sg1_id},
@ -1220,36 +1233,36 @@ class TestSecurityGroupAgentWithIptables(base.BaseTestCase):
self.rpc = mock.Mock()
self.agent.plugin_rpc = self.rpc
rule1 = [{'direction': 'ingress',
'protocol': 'udp',
'ethertype': 'IPv4',
'protocol': const.PROTO_NAME_UDP,
'ethertype': const.IPv4,
'source_ip_prefix': '10.0.0.2',
'source_port_range_min': 67,
'source_port_range_max': 67,
'port_range_min': 68,
'port_range_max': 68},
{'direction': 'ingress',
'protocol': 'tcp',
'ethertype': 'IPv4',
'protocol': const.PROTO_NAME_TCP,
'ethertype': const.IPv4,
'port_range_min': 22,
'port_range_max': 22},
{'direction': 'egress',
'ethertype': 'IPv4'}]
'ethertype': const.IPv4}]
rule2 = rule1[:]
rule2 += [{'direction': 'ingress',
'source_ip_prefix': '10.0.0.4',
'ethertype': 'IPv4'}]
'ethertype': const.IPv4}]
rule3 = rule2[:]
rule3 += [{'direction': 'ingress',
'protocol': 'icmp',
'ethertype': 'IPv4'}]
'protocol': const.PROTO_NAME_ICMP,
'ethertype': const.IPv4}]
rule4 = rule1[:]
rule4 += [{'direction': 'ingress',
'source_ip_prefix': '10.0.0.3',
'ethertype': 'IPv4'}]
'ethertype': const.IPv4}]
rule5 = rule4[:]
rule5 += [{'direction': 'ingress',
'protocol': 'icmp',
'ethertype': 'IPv4'}]
'protocol': const.PROTO_NAME_ICMP,
'ethertype': const.IPv4}]
self.devices1 = {'tap_port1': self._device('tap_port1',
'10.0.0.3',
'12:34:56:78:9a:bc',
@ -1360,7 +1373,7 @@ class SGNotificationTestMixin():
security_group_id = sg['security_group']['id']
direction = "ingress"
remote_group_id = sg2['security_group']['id']
protocol = 'tcp'
protocol = const.PROTO_NAME_TCP
port_range_min = 88
port_range_max = 88
with self.security_group_rule(security_group_id, direction,