diff --git a/neutron/common/constants.py b/neutron/common/constants.py index 016fa94c6f3..dfa8f1fd927 100644 --- a/neutron/common/constants.py +++ b/neutron/common/constants.py @@ -35,11 +35,6 @@ METERING_LABEL_KEY = '_metering_labels' IPv4 = 'IPv4' IPv6 = 'IPv6' -ICMP_PROTOCOL = 1 -TCP_PROTOCOL = 6 -UDP_PROTOCOL = 17 -ICMPv6_PROTOCOL = 58 - DHCP_RESPONSE_PORT = 68 MIN_VLAN_TAG = 1 @@ -87,3 +82,13 @@ PORT_BINDING_EXT_ALIAS = 'binding' L3_AGENT_SCHEDULER_EXT_ALIAS = 'l3_agent_scheduler' DHCP_AGENT_SCHEDULER_EXT_ALIAS = 'dhcp_agent_scheduler' LBAAS_AGENT_SCHEDULER_EXT_ALIAS = 'lbaas_agent_scheduler' + +# Protocol names and numbers for Security Groups/Firewalls +PROTO_NAME_TCP = 'tcp' +PROTO_NAME_ICMP = 'icmp' +PROTO_NAME_ICMP_V6 = 'icmpv6' +PROTO_NAME_UDP = 'udp' +PROTO_NUM_TCP = 6 +PROTO_NUM_ICMP = 1 +PROTO_NUM_ICMP_V6 = 58 +PROTO_NUM_UDP = 17 diff --git a/neutron/db/securitygroups_db.py b/neutron/db/securitygroups_db.py index 89b66d2d3a6..5986190c5d4 100644 --- a/neutron/db/securitygroups_db.py +++ b/neutron/db/securitygroups_db.py @@ -30,10 +30,10 @@ from neutron.extensions import securitygroup as ext_sg from neutron.openstack.common import uuidutils -IP_PROTOCOL_MAP = {'tcp': constants.TCP_PROTOCOL, - 'udp': constants.UDP_PROTOCOL, - 'icmp': constants.ICMP_PROTOCOL, - 'icmpv6': constants.ICMPv6_PROTOCOL} +IP_PROTOCOL_MAP = {constants.PROTO_NAME_TCP: constants.PROTO_NUM_TCP, + constants.PROTO_NAME_UDP: constants.PROTO_NUM_UDP, + constants.PROTO_NAME_ICMP: constants.PROTO_NUM_ICMP, + constants.PROTO_NAME_ICMP_V6: constants.PROTO_NUM_ICMP_V6} class SecurityGroup(model_base.BASEV2, models_v2.HasId, models_v2.HasTenant): @@ -304,13 +304,13 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase): if not rule['protocol']: raise ext_sg.SecurityGroupProtocolRequiredWithPorts() ip_proto = self._get_ip_proto_number(rule['protocol']) - if ip_proto in [constants.TCP_PROTOCOL, constants.UDP_PROTOCOL]: + if ip_proto in [constants.PROTO_NUM_TCP, constants.PROTO_NUM_UDP]: if (rule['port_range_min'] is not None and rule['port_range_min'] <= rule['port_range_max']): pass else: raise ext_sg.SecurityGroupInvalidPortRange() - elif ip_proto == constants.ICMP_PROTOCOL: + elif ip_proto == constants.PROTO_NUM_ICMP: for attr, field in [('port_range_min', 'type'), ('port_range_max', 'code')]: if rule[attr] > 255: diff --git a/neutron/extensions/securitygroup.py b/neutron/extensions/securitygroup.py index ebc1f780bc4..85d499ad59f 100644 --- a/neutron/extensions/securitygroup.py +++ b/neutron/extensions/securitygroup.py @@ -23,6 +23,7 @@ from oslo.config import cfg from neutron.api import extensions from neutron.api.v2 import attributes as attr from neutron.api.v2 import base +from neutron.common import constants as const from neutron.common import exceptions as qexception from neutron import manager from neutron.openstack.common import uuidutils @@ -158,7 +159,8 @@ def _validate_name_not_default(data, valid_values=None): attr.validators['type:name_not_default'] = _validate_name_not_default -sg_supported_protocols = [None, 'tcp', 'udp', 'icmp'] +sg_supported_protocols = [None, const.PROTO_NAME_TCP, + const.PROTO_NAME_UDP, const.PROTO_NAME_ICMP] sg_supported_ethertypes = ['IPv4', 'IPv6'] # Attribute Map diff --git a/neutron/plugins/midonet/common/net_util.py b/neutron/plugins/midonet/common/net_util.py index 26479711c37..884048675f4 100644 --- a/neutron/plugins/midonet/common/net_util.py +++ b/neutron/plugins/midonet/common/net_util.py @@ -61,8 +61,8 @@ def get_protocol_value(protocol): return protocol mapping = { - 'tcp': constants.TCP_PROTOCOL, - 'udp': constants.UDP_PROTOCOL, - 'icmp': constants.ICMP_PROTOCOL + constants.PROTO_NAME_TCP: constants.PROTO_NUM_TCP, + constants.PROTO_NAME_UDP: constants.PROTO_NUM_UDP, + constants.PROTO_NAME_ICMP: constants.PROTO_NUM_ICMP } return mapping.get(protocol.lower()) diff --git a/neutron/plugins/nicira/nvplib.py b/neutron/plugins/nicira/nvplib.py index 0842a244451..37802f1d5b8 100644 --- a/neutron/plugins/nicira/nvplib.py +++ b/neutron/plugins/nicira/nvplib.py @@ -1073,7 +1073,7 @@ def create_security_profile(cluster, tenant_id, security_profile): # Allow all dhcp responses and all ingress traffic hidden_rules = {'logical_port_egress_rules': [{'ethertype': 'IPv4', - 'protocol': constants.UDP_PROTOCOL, + 'protocol': constants.PROTO_NUM_UDP, 'port_range_min': constants.DHCP_RESPONSE_PORT, 'port_range_max': constants.DHCP_RESPONSE_PORT, 'ip_prefix': '0.0.0.0/0'}], @@ -1111,7 +1111,7 @@ def update_security_group_rules(cluster, spid, rules): # Allow all dhcp responses in rules['logical_port_egress_rules'].append( - {'ethertype': 'IPv4', 'protocol': constants.UDP_PROTOCOL, + {'ethertype': 'IPv4', 'protocol': constants.PROTO_NUM_UDP, 'port_range_min': constants.DHCP_RESPONSE_PORT, 'port_range_max': constants.DHCP_RESPONSE_PORT, 'ip_prefix': '0.0.0.0/0'}) diff --git a/neutron/tests/unit/test_extension_security_group.py b/neutron/tests/unit/test_extension_security_group.py index 17c87335122..d0f6ae0bff6 100644 --- a/neutron/tests/unit/test_extension_security_group.py +++ b/neutron/tests/unit/test_extension_security_group.py @@ -20,6 +20,7 @@ import mock import webob.exc from neutron.api.v2 import attributes as attr +from neutron.common import constants as const from neutron.common.test_lib import test_config from neutron import context from neutron.db import db_base_plugin_v2 @@ -75,7 +76,7 @@ class SecurityGroupsTestCase(test_db_plugin.NeutronDbPluginV2TestCase): port_range_min=None, port_range_max=None, remote_ip_prefix=None, remote_group_id=None, tenant_id='test_tenant', - ethertype='IPv4'): + ethertype=const.IPv4): data = {'security_group_rule': {'security_group_id': security_group_id, 'direction': direction, @@ -110,13 +111,13 @@ class SecurityGroupsTestCase(test_db_plugin.NeutronDbPluginV2TestCase): def _make_security_group(self, fmt, name, description, **kwargs): res = self._create_security_group(fmt, name, description, **kwargs) - if res.status_int >= 400: + if res.status_int >= webob.exc.HTTPBadRequest.code: raise webob.exc.HTTPClientError(code=res.status_int) return self.deserialize(fmt, res) def _make_security_group_rule(self, fmt, rules, **kwargs): res = self._create_security_group_rule(self.fmt, rules) - if res.status_int >= 400: + if res.status_int >= webob.exc.HTTPBadRequest.code: raise webob.exc.HTTPClientError(code=res.status_int) return self.deserialize(fmt, res) @@ -136,10 +137,10 @@ class SecurityGroupsTestCase(test_db_plugin.NeutronDbPluginV2TestCase): @contextlib.contextmanager def security_group_rule(self, security_group_id='4cd70774-cc67-4a87-9b39-7' 'd1db38eb087', - direction='ingress', protocol='tcp', + direction='ingress', protocol=const.PROTO_NAME_TCP, port_range_min='22', port_range_max='22', remote_ip_prefix=None, remote_group_id=None, - fmt=None, no_delete=False, ethertype='IPv4'): + fmt=None, no_delete=False, ethertype=const.IPv4): if not fmt: fmt = self.fmt rule = self._build_security_group_rule(security_group_id, @@ -263,11 +264,11 @@ class TestSecurityGroups(SecurityGroupDBTestCase): sg_rules = security_group['security_group']['security_group_rules'] self.assertEqual(len(sg_rules), 2) - v4_rules = filter(lambda x: x['ethertype'] == 'IPv4', sg_rules) + v4_rules = [r for r in sg_rules if r['ethertype'] == const.IPv4] self.assertEqual(len(v4_rules), 1) v4_rule = v4_rules[0] expected = {'direction': 'egress', - 'ethertype': 'IPv4', + 'ethertype': const.IPv4, 'remote_group_id': None, 'remote_ip_prefix': None, 'protocol': None, @@ -275,11 +276,11 @@ class TestSecurityGroups(SecurityGroupDBTestCase): 'port_range_min': None} self._assert_sg_rule_has_kvs(v4_rule, expected) - v6_rules = filter(lambda x: x['ethertype'] == 'IPv6', sg_rules) + v6_rules = [r for r in sg_rules if r['ethertype'] == const.IPv6] self.assertEqual(len(v6_rules), 1) v6_rule = v6_rules[0] expected = {'direction': 'egress', - 'ethertype': 'IPv6', + 'ethertype': const.IPv6, 'remote_group_id': None, 'remote_ip_prefix': None, 'protocol': None, @@ -309,7 +310,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase): sg['security_group']['id']) req.environ['neutron.context'] = context.Context('', 'somebody') res = req.get_response(self.ext_api) - self.assertEqual(res.status_int, 409) + self.assertEqual(res.status_int, webob.exc.HTTPConflict.code) def test_update_default_security_group_name_fail(self): with self.network(): @@ -322,7 +323,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase): sg['security_groups'][0]['id']) req.environ['neutron.context'] = context.Context('', 'somebody') res = req.get_response(self.ext_api) - self.assertEqual(res.status_int, 404) + self.assertEqual(res.status_int, webob.exc.HTTPNotFound.code) def test_update_default_security_group_with_description(self): with self.network(): @@ -347,7 +348,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase): description = 'my webservers' res = self._create_security_group(self.fmt, name, description) self.deserialize(self.fmt, res) - self.assertEqual(res.status_int, 409) + self.assertEqual(res.status_int, webob.exc.HTTPConflict.code) def test_list_security_groups(self): with contextlib.nested(self.security_group(name='sg1', @@ -406,23 +407,23 @@ class TestSecurityGroups(SecurityGroupDBTestCase): security_group_id = sg['security_group']['id'] ethertype = 2 rule = self._build_security_group_rule( - security_group_id, 'ingress', 'tcp', '22', '22', None, None, - ethertype=ethertype) + security_group_id, 'ingress', const.PROTO_NAME_TCP, '22', + '22', None, None, ethertype=ethertype) res = self._create_security_group_rule(self.fmt, rule) self.deserialize(self.fmt, res) - self.assertEqual(res.status_int, 400) + self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code) def test_create_security_group_rule_tcp_protocol_as_number(self): name = 'webservers' description = 'my webservers' with self.security_group(name, description) as sg: security_group_id = sg['security_group']['id'] - protocol = 6 # TCP + protocol = const.PROTO_NUM_TCP # TCP rule = self._build_security_group_rule( security_group_id, 'ingress', protocol, '22', '22') res = self._create_security_group_rule(self.fmt, rule) self.deserialize(self.fmt, res) - self.assertEqual(res.status_int, 201) + self.assertEqual(res.status_int, webob.exc.HTTPCreated.code) def test_create_security_group_rule_protocol_as_number(self): name = 'webservers' @@ -434,7 +435,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase): security_group_id, 'ingress', protocol) res = self._create_security_group_rule(self.fmt, rule) self.deserialize(self.fmt, res) - self.assertEqual(res.status_int, 201) + self.assertEqual(res.status_int, webob.exc.HTTPCreated.code) def test_create_security_group_rule_case_insensitive(self): name = 'webservers' @@ -457,7 +458,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase): self.assertEqual(rule['security_group_rule']['protocol'], protocol.lower()) self.assertEqual(rule['security_group_rule']['ethertype'], - 'IPv4') + const.IPv4) def test_get_security_group(self): name = 'webservers' @@ -468,7 +469,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase): security_group_id = sg['security_group']['id'] direction = "ingress" remote_ip_prefix = "10.0.0.0/24" - protocol = 'tcp' + protocol = const.PROTO_NAME_TCP port_range_min = 22 port_range_max = 22 keys = [('remote_ip_prefix', remote_ip_prefix), @@ -488,8 +489,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase): self.assertEqual(group['security_group']['id'], remote_group_id) self.assertEqual(len(sg_rule), 3) - sg_rule = filter(lambda x: x['direction'] == 'ingress', - sg_rule) + sg_rule = [r for r in sg_rule if r['direction'] == 'ingress'] for k, v, in keys: self.assertEqual(sg_rule[0][k], v) @@ -498,14 +498,15 @@ class TestSecurityGroups(SecurityGroupDBTestCase): description = 'my webservers' with self.security_group(name, description, no_delete=True) as sg: remote_group_id = sg['security_group']['id'] - self._delete('security-groups', remote_group_id, 204) + self._delete('security-groups', remote_group_id, + webob.exc.HTTPNoContent.code) def test_delete_default_security_group_admin(self): with self.network(): res = self.new_list_request('security-groups') sg = self.deserialize(self.fmt, res.get_response(self.ext_api)) self._delete('security-groups', sg['security_groups'][0]['id'], - 204) + webob.exc.HTTPNoContent.code) def test_delete_default_security_group_nonadmin(self): with self.network(): @@ -513,7 +514,8 @@ class TestSecurityGroups(SecurityGroupDBTestCase): sg = self.deserialize(self.fmt, res.get_response(self.ext_api)) neutron_context = context.Context('', 'test-tenant') self._delete('security-groups', sg['security_groups'][0]['id'], - 409, neutron_context=neutron_context) + webob.exc.HTTPConflict.code, + neutron_context=neutron_context) def test_security_group_list_creates_default_security_group(self): neutron_context = context.Context('', 'test-tenant') @@ -533,15 +535,15 @@ class TestSecurityGroups(SecurityGroupDBTestCase): # Verify default rule for v4 egress sg_rules = rules['security_group_rules'] - rules = filter( - lambda x: ( - x['direction'] == 'egress' and x['ethertype'] == 'IPv4'), - sg_rules) + rules = [ + r for r in sg_rules + if r['direction'] == 'egress' and r['ethertype'] == const.IPv4 + ] self.assertEqual(len(rules), 1) v4_egress = rules[0] expected = {'direction': 'egress', - 'ethertype': 'IPv4', + 'ethertype': const.IPv4, 'remote_group_id': None, 'remote_ip_prefix': None, 'protocol': None, @@ -550,15 +552,15 @@ class TestSecurityGroups(SecurityGroupDBTestCase): self._assert_sg_rule_has_kvs(v4_egress, expected) # Verify default rule for v6 egress - rules = filter( - lambda x: ( - x['direction'] == 'egress' and x['ethertype'] == 'IPv6'), - sg_rules) + rules = [ + r for r in sg_rules + if r['direction'] == 'egress' and r['ethertype'] == const.IPv6 + ] self.assertEqual(len(rules), 1) v6_egress = rules[0] expected = {'direction': 'egress', - 'ethertype': 'IPv6', + 'ethertype': const.IPv6, 'remote_group_id': None, 'remote_ip_prefix': None, 'protocol': None, @@ -567,15 +569,15 @@ class TestSecurityGroups(SecurityGroupDBTestCase): self._assert_sg_rule_has_kvs(v6_egress, expected) # Verify default rule for v4 ingress - rules = filter( - lambda x: ( - x['direction'] == 'ingress' and x['ethertype'] == 'IPv4'), - sg_rules) + rules = [ + r for r in sg_rules + if r['direction'] == 'ingress' and r['ethertype'] == const.IPv4 + ] self.assertEqual(len(rules), 1) v4_ingress = rules[0] expected = {'direction': 'ingress', - 'ethertype': 'IPv4', + 'ethertype': const.IPv4, 'remote_group_id': security_group_id, 'remote_ip_prefix': None, 'protocol': None, @@ -584,15 +586,15 @@ class TestSecurityGroups(SecurityGroupDBTestCase): self._assert_sg_rule_has_kvs(v4_ingress, expected) # Verify default rule for v6 ingress - rules = filter( - lambda x: ( - x['direction'] == 'ingress' and x['ethertype'] == 'IPv6'), - sg_rules) + rules = [ + r for r in sg_rules + if r['direction'] == 'ingress' and r['ethertype'] == const.IPv6 + ] self.assertEqual(len(rules), 1) v6_ingress = rules[0] expected = {'direction': 'ingress', - 'ethertype': 'IPv6', + 'ethertype': const.IPv6, 'remote_group_id': security_group_id, 'remote_ip_prefix': None, 'protocol': None, @@ -607,7 +609,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase): security_group_id = sg['security_group']['id'] direction = "ingress" remote_ip_prefix = "10.0.0.0/24" - protocol = 'tcp' + protocol = const.PROTO_NAME_TCP port_range_min = 22 port_range_max = 22 keys = [('remote_ip_prefix', remote_ip_prefix), @@ -631,7 +633,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase): security_group_id = sg['security_group']['id'] direction = "ingress" remote_group_id = sg2['security_group']['id'] - protocol = 'tcp' + protocol = const.PROTO_NAME_TCP port_range_min = 22 port_range_max = 22 keys = [('remote_group_id', remote_group_id), @@ -655,7 +657,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase): security_group_id = sg['security_group']['id'] direction = "ingress" remote_ip_prefix = "10.0.0.0/24" - protocol = 'icmp' + protocol = const.PROTO_NAME_ICMP # port_range_min (ICMP type) is greater than port_range_max # (ICMP code) in order to confirm min <= max port check is # not called for ICMP. @@ -681,7 +683,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase): security_group_id = sg['security_group']['id'] direction = "ingress" remote_ip_prefix = "10.0.0.0/24" - protocol = 'icmp' + protocol = const.PROTO_NAME_ICMP # ICMP type port_range_min = 8 # ICMP code @@ -703,7 +705,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase): security_group_id = "4cd70774-cc67-4a87-9b39-7d1db38eb087" direction = "ingress" remote_ip_prefix = "10.0.0.0/24" - protocol = 'tcp' + protocol = const.PROTO_NAME_TCP port_range_min = 22 port_range_max = 22 remote_group_id = "9cd70774-cc67-4a87-9b39-7d1db38eb087" @@ -714,13 +716,13 @@ class TestSecurityGroups(SecurityGroupDBTestCase): remote_group_id) res = self._create_security_group_rule(self.fmt, rule) self.deserialize(self.fmt, res) - self.assertEqual(res.status_int, 400) + self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code) def test_create_security_group_rule_bad_security_group_id(self): security_group_id = "4cd70774-cc67-4a87-9b39-7d1db38eb087" direction = "ingress" remote_ip_prefix = "10.0.0.0/24" - protocol = 'tcp' + protocol = const.PROTO_NAME_TCP port_range_min = 22 port_range_max = 22 rule = self._build_security_group_rule(security_group_id, direction, @@ -729,21 +731,21 @@ class TestSecurityGroups(SecurityGroupDBTestCase): remote_ip_prefix) res = self._create_security_group_rule(self.fmt, rule) self.deserialize(self.fmt, res) - self.assertEqual(res.status_int, 404) + self.assertEqual(res.status_int, webob.exc.HTTPNotFound.code) def test_create_security_group_rule_bad_tenant(self): with self.security_group() as sg: rule = {'security_group_rule': {'security_group_id': sg['security_group']['id'], 'direction': 'ingress', - 'protocol': 'tcp', + 'protocol': const.PROTO_NAME_TCP, 'port_range_min': '22', 'port_range_max': '22', 'tenant_id': "bad_tenant"}} res = self._create_security_group_rule(self.fmt, rule) self.deserialize(self.fmt, res) - self.assertEqual(res.status_int, 404) + self.assertEqual(res.status_int, webob.exc.HTTPNotFound.code) def test_create_security_group_rule_bad_tenant_remote_group_id(self): with self.security_group() as sg: @@ -754,7 +756,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase): rule = {'security_group_rule': {'security_group_id': sg2['security_group']['id'], 'direction': 'ingress', - 'protocol': 'tcp', + 'protocol': const.PROTO_NAME_TCP, 'port_range_min': '22', 'port_range_max': '22', 'tenant_id': 'bad_tenant', @@ -764,7 +766,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase): tenant_id='bad_tenant', set_context=True) self.deserialize(self.fmt, res) - self.assertEqual(res.status_int, 404) + self.assertEqual(res.status_int, webob.exc.HTTPNotFound.code) def test_create_security_group_rule_bad_tenant_security_group_rule(self): with self.security_group() as sg: @@ -775,7 +777,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase): rule = {'security_group_rule': {'security_group_id': sg['security_group']['id'], 'direction': 'ingress', - 'protocol': 'tcp', + 'protocol': const.PROTO_NAME_TCP, 'port_range_min': '22', 'port_range_max': '22', 'tenant_id': 'bad_tenant'}} @@ -784,7 +786,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase): tenant_id='bad_tenant', set_context=True) self.deserialize(self.fmt, res) - self.assertEqual(res.status_int, 404) + self.assertEqual(res.status_int, webob.exc.HTTPNotFound.code) def test_create_security_group_rule_bad_remote_group_id(self): name = 'webservers' @@ -793,7 +795,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase): security_group_id = sg['security_group']['id'] remote_group_id = "4cd70774-cc67-4a87-9b39-7d1db38eb087" direction = "ingress" - protocol = 'tcp' + protocol = const.PROTO_NAME_TCP port_range_min = 22 port_range_max = 22 rule = self._build_security_group_rule(security_group_id, direction, @@ -802,7 +804,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase): remote_group_id=remote_group_id) res = self._create_security_group_rule(self.fmt, rule) self.deserialize(self.fmt, res) - self.assertEqual(res.status_int, 404) + self.assertEqual(res.status_int, webob.exc.HTTPNotFound.code) def test_create_security_group_rule_duplicate_rules(self): name = 'webservers' @@ -811,11 +813,12 @@ class TestSecurityGroups(SecurityGroupDBTestCase): security_group_id = sg['security_group']['id'] with self.security_group_rule(security_group_id): rule = self._build_security_group_rule( - sg['security_group']['id'], 'ingress', 'tcp', '22', '22') + sg['security_group']['id'], 'ingress', + const.PROTO_NAME_TCP, '22', '22') self._create_security_group_rule(self.fmt, rule) res = self._create_security_group_rule(self.fmt, rule) self.deserialize(self.fmt, res) - self.assertEqual(res.status_int, 409) + self.assertEqual(res.status_int, webob.exc.HTTPConflict.code) def test_create_security_group_rule_min_port_greater_max(self): name = 'webservers' @@ -823,14 +826,16 @@ class TestSecurityGroups(SecurityGroupDBTestCase): with self.security_group(name, description) as sg: security_group_id = sg['security_group']['id'] with self.security_group_rule(security_group_id): - for protocol in ['tcp', 'udp', 6, 17]: + for protocol in [const.PROTO_NAME_TCP, const.PROTO_NAME_UDP, + const.PROTO_NUM_TCP, const.PROTO_NUM_UDP]: rule = self._build_security_group_rule( sg['security_group']['id'], 'ingress', protocol, '50', '22') self._create_security_group_rule(self.fmt, rule) res = self._create_security_group_rule(self.fmt, rule) self.deserialize(self.fmt, res) - self.assertEqual(res.status_int, 400) + self.assertEqual(res.status_int, + webob.exc.HTTPBadRequest.code) def test_create_security_group_rule_ports_but_no_protocol(self): name = 'webservers' @@ -843,7 +848,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase): self._create_security_group_rule(self.fmt, rule) res = self._create_security_group_rule(self.fmt, rule) self.deserialize(self.fmt, res) - self.assertEqual(res.status_int, 400) + self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code) def test_create_security_group_rule_port_range_min_only(self): name = 'webservers' @@ -852,11 +857,12 @@ class TestSecurityGroups(SecurityGroupDBTestCase): security_group_id = sg['security_group']['id'] with self.security_group_rule(security_group_id): rule = self._build_security_group_rule( - sg['security_group']['id'], 'ingress', 'tcp', '22', None) + sg['security_group']['id'], 'ingress', + const.PROTO_NAME_TCP, '22', None) self._create_security_group_rule(self.fmt, rule) res = self._create_security_group_rule(self.fmt, rule) self.deserialize(self.fmt, res) - self.assertEqual(res.status_int, 400) + self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code) def test_create_security_group_rule_port_range_max_only(self): name = 'webservers' @@ -865,11 +871,12 @@ class TestSecurityGroups(SecurityGroupDBTestCase): security_group_id = sg['security_group']['id'] with self.security_group_rule(security_group_id): rule = self._build_security_group_rule( - sg['security_group']['id'], 'ingress', 'tcp', None, '22') + sg['security_group']['id'], 'ingress', + const.PROTO_NAME_TCP, None, '22') self._create_security_group_rule(self.fmt, rule) res = self._create_security_group_rule(self.fmt, rule) self.deserialize(self.fmt, res) - self.assertEqual(res.status_int, 400) + self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code) def test_create_security_group_rule_icmp_type_too_big(self): name = 'webservers' @@ -878,11 +885,12 @@ class TestSecurityGroups(SecurityGroupDBTestCase): security_group_id = sg['security_group']['id'] with self.security_group_rule(security_group_id): rule = self._build_security_group_rule( - sg['security_group']['id'], 'ingress', 'icmp', '256', None) + sg['security_group']['id'], 'ingress', + const.PROTO_NAME_ICMP, '256', None) self._create_security_group_rule(self.fmt, rule) res = self._create_security_group_rule(self.fmt, rule) self.deserialize(self.fmt, res) - self.assertEqual(res.status_int, 400) + self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code) def test_create_security_group_rule_icmp_code_too_big(self): name = 'webservers' @@ -891,11 +899,12 @@ class TestSecurityGroups(SecurityGroupDBTestCase): security_group_id = sg['security_group']['id'] with self.security_group_rule(security_group_id): rule = self._build_security_group_rule( - sg['security_group']['id'], 'ingress', 'icmp', '8', '256') + sg['security_group']['id'], 'ingress', + const.PROTO_NAME_ICMP, '8', '256') self._create_security_group_rule(self.fmt, rule) res = self._create_security_group_rule(self.fmt, rule) self.deserialize(self.fmt, res) - self.assertEqual(res.status_int, 400) + self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code) def test_list_ports_security_group(self): with self.network() as n: @@ -1107,7 +1116,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase): security_groups=['bad_id']) self.deserialize(self.fmt, res) - self.assertEqual(res.status_int, 400) + self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code) def test_create_delete_security_group_port_in_use(self): with self.network() as n: @@ -1121,7 +1130,8 @@ class TestSecurityGroups(SecurityGroupDBTestCase): sg['security_group']['id']) # try to delete security group that's in use res = self._delete('security-groups', - sg['security_group']['id'], 409) + sg['security_group']['id'], + webob.exc.HTTPConflict.code) # delete the blocking port self._delete('ports', port['port']['id']) @@ -1131,16 +1141,18 @@ class TestSecurityGroups(SecurityGroupDBTestCase): "security_group_rule create") with self.security_group() as sg: rule1 = self._build_security_group_rule(sg['security_group']['id'], - 'ingress', 'tcp', '22', + 'ingress', + const.PROTO_NAME_TCP, '22', '22', '10.0.0.1/24') rule2 = self._build_security_group_rule(sg['security_group']['id'], - 'ingress', 'tcp', '23', + 'ingress', + const.PROTO_NAME_TCP, '23', '23', '10.0.0.1/24') rules = {'security_group_rules': [rule1['security_group_rule'], rule2['security_group_rule']]} res = self._create_security_group_rule(self.fmt, rules) self.deserialize(self.fmt, res) - self.assertEqual(res.status_int, 201) + self.assertEqual(res.status_int, webob.exc.HTTPCreated.code) def test_create_security_group_rule_bulk_emulated(self): real_has_attr = hasattr @@ -1155,17 +1167,17 @@ class TestSecurityGroups(SecurityGroupDBTestCase): new=fakehasattr): with self.security_group() as sg: rule1 = self._build_security_group_rule( - sg['security_group']['id'], 'ingress', 'tcp', '22', '22', - '10.0.0.1/24') + sg['security_group']['id'], 'ingress', + const.PROTO_NAME_TCP, '22', '22', '10.0.0.1/24') rule2 = self._build_security_group_rule( - sg['security_group']['id'], 'ingress', 'tcp', '23', '23', - '10.0.0.1/24') + sg['security_group']['id'], 'ingress', + const.PROTO_NAME_TCP, '23', '23', '10.0.0.1/24') rules = {'security_group_rules': [rule1['security_group_rule'], rule2['security_group_rule']] } res = self._create_security_group_rule(self.fmt, rules) self.deserialize(self.fmt, res) - self.assertEqual(res.status_int, 201) + self.assertEqual(res.status_int, webob.exc.HTTPCreated.code) def test_create_security_group_rule_duplicate_rule_in_post(self): if self._skip_native_bulk: @@ -1173,13 +1185,14 @@ class TestSecurityGroups(SecurityGroupDBTestCase): "security_group_rule create") with self.security_group() as sg: rule = self._build_security_group_rule(sg['security_group']['id'], - 'ingress', 'tcp', '22', + 'ingress', + const.PROTO_NAME_TCP, '22', '22', '10.0.0.1/24') rules = {'security_group_rules': [rule['security_group_rule'], rule['security_group_rule']]} res = self._create_security_group_rule(self.fmt, rules) rule = self.deserialize(self.fmt, res) - self.assertEqual(res.status_int, 409) + self.assertEqual(res.status_int, webob.exc.HTTPConflict.code) def test_create_security_group_rule_duplicate_rule_in_post_emulated(self): real_has_attr = hasattr @@ -1195,13 +1208,13 @@ class TestSecurityGroups(SecurityGroupDBTestCase): with self.security_group() as sg: rule = self._build_security_group_rule( - sg['security_group']['id'], 'ingress', 'tcp', '22', '22', - '10.0.0.1/24') + sg['security_group']['id'], 'ingress', + const.PROTO_NAME_TCP, '22', '22', '10.0.0.1/24') rules = {'security_group_rules': [rule['security_group_rule'], rule['security_group_rule']]} res = self._create_security_group_rule(self.fmt, rules) rule = self.deserialize(self.fmt, res) - self.assertEqual(res.status_int, 409) + self.assertEqual(res.status_int, webob.exc.HTTPConflict.code) def test_create_security_group_rule_duplicate_rule_db(self): if self._skip_native_bulk: @@ -1209,13 +1222,14 @@ class TestSecurityGroups(SecurityGroupDBTestCase): "security_group_rule create") with self.security_group() as sg: rule = self._build_security_group_rule(sg['security_group']['id'], - 'ingress', 'tcp', '22', + 'ingress', + const.PROTO_NAME_TCP, '22', '22', '10.0.0.1/24') rules = {'security_group_rules': [rule]} self._create_security_group_rule(self.fmt, rules) res = self._create_security_group_rule(self.fmt, rules) rule = self.deserialize(self.fmt, res) - self.assertEqual(res.status_int, 409) + self.assertEqual(res.status_int, webob.exc.HTTPConflict.code) def test_create_security_group_rule_duplicate_rule_db_emulated(self): real_has_attr = hasattr @@ -1230,13 +1244,13 @@ class TestSecurityGroups(SecurityGroupDBTestCase): new=fakehasattr): with self.security_group() as sg: rule = self._build_security_group_rule( - sg['security_group']['id'], 'ingress', 'tcp', '22', '22', - '10.0.0.1/24') + sg['security_group']['id'], 'ingress', + const.PROTO_NAME_TCP, '22', '22', '10.0.0.1/24') rules = {'security_group_rules': [rule]} self._create_security_group_rule(self.fmt, rules) res = self._create_security_group_rule(self.fmt, rule) self.deserialize(self.fmt, res) - self.assertEqual(res.status_int, 409) + self.assertEqual(res.status_int, webob.exc.HTTPConflict.code) def test_create_security_group_rule_differnt_security_group_ids(self): if self._skip_native_bulk: @@ -1245,24 +1259,24 @@ class TestSecurityGroups(SecurityGroupDBTestCase): with self.security_group() as sg1: with self.security_group() as sg2: rule1 = self._build_security_group_rule( - sg1['security_group']['id'], 'ingress', 'tcp', '22', '22', - '10.0.0.1/24') + sg1['security_group']['id'], 'ingress', + const.PROTO_NAME_TCP, '22', '22', '10.0.0.1/24') rule2 = self._build_security_group_rule( - sg2['security_group']['id'], 'ingress', 'tcp', '23', '23', - '10.0.0.1/24') + sg2['security_group']['id'], 'ingress', + const.PROTO_NAME_TCP, '23', '23', '10.0.0.1/24') rules = {'security_group_rules': [rule1['security_group_rule'], rule2['security_group_rule']] } res = self._create_security_group_rule(self.fmt, rules) self.deserialize(self.fmt, res) - self.assertEqual(res.status_int, 400) + self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code) def test_create_security_group_rule_with_invalid_ethertype(self): security_group_id = "4cd70774-cc67-4a87-9b39-7d1db38eb087" direction = "ingress" remote_ip_prefix = "10.0.0.0/24" - protocol = 'tcp' + protocol = const.PROTO_NAME_TCP port_range_min = 22 port_range_max = 22 remote_group_id = "9cd70774-cc67-4a87-9b39-7d1db38eb087" @@ -1274,7 +1288,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase): ethertype='IPv5') res = self._create_security_group_rule(self.fmt, rule) self.deserialize(self.fmt, res) - self.assertEqual(res.status_int, 400) + self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code) def test_create_security_group_rule_with_invalid_protocol(self): security_group_id = "4cd70774-cc67-4a87-9b39-7d1db38eb087" @@ -1291,7 +1305,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase): remote_group_id) res = self._create_security_group_rule(self.fmt, rule) self.deserialize(self.fmt, res) - self.assertEqual(res.status_int, 400) + self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code) def test_create_port_with_non_uuid(self): with self.network() as n: @@ -1300,7 +1314,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase): security_groups=['not_valid']) self.deserialize(self.fmt, res) - self.assertEqual(res.status_int, 400) + self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code) class TestSecurityGroupsXML(TestSecurityGroups): diff --git a/neutron/tests/unit/test_security_groups_rpc.py b/neutron/tests/unit/test_security_groups_rpc.py index 02f50777a82..ca98a55be91 100644 --- a/neutron/tests/unit/test_security_groups_rpc.py +++ b/neutron/tests/unit/test_security_groups_rpc.py @@ -21,11 +21,13 @@ import mock from mock import call import mox from oslo.config import cfg +import webob.exc from neutron.agent import firewall as firewall_base from neutron.agent.linux import iptables_manager from neutron.agent import rpc as agent_rpc from neutron.agent import securitygroups_rpc as sg_rpc +from neutron.common import constants as const from neutron import context from neutron.db import securitygroups_rpc_base as sg_db_rpc from neutron.extensions import allowedaddresspairs as addr_pair @@ -54,7 +56,7 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase): self.rpc = FakeSGCallback() def test_security_group_rules_for_devices_ipv4_ingress(self): - fake_prefix = test_fw.FAKE_PREFIX['IPv4'] + fake_prefix = test_fw.FAKE_PREFIX[const.IPv4] with self.network() as n: with nested(self.subnet(n), self.security_group()) as (subnet_v4, @@ -62,18 +64,18 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase): sg1_id = sg1['security_group']['id'] rule1 = self._build_security_group_rule( sg1_id, - 'ingress', 'tcp', '22', + 'ingress', const.PROTO_NAME_TCP, '22', '22') rule2 = self._build_security_group_rule( sg1_id, - 'ingress', 'tcp', '23', + 'ingress', const.PROTO_NAME_TCP, '23', '23', fake_prefix) rules = { 'security_group_rules': [rule1['security_group_rule'], rule2['security_group_rule']]} res = self._create_security_group_rule(self.fmt, rules) self.deserialize(self.fmt, res) - self.assertEqual(res.status_int, 201) + self.assertEqual(res.status_int, webob.exc.HTTPCreated.code) res1 = self._create_port( self.fmt, n['network']['id'], @@ -86,17 +88,19 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase): ports_rpc = self.rpc.security_group_rules_for_devices( ctx, devices=devices) port_rpc = ports_rpc[port_id1] - expected = [{'direction': 'egress', 'ethertype': 'IPv4', + expected = [{'direction': 'egress', 'ethertype': const.IPv4, 'security_group_id': sg1_id}, - {'direction': 'egress', 'ethertype': 'IPv6', + {'direction': 'egress', 'ethertype': const.IPv6, 'security_group_id': sg1_id}, {'direction': 'ingress', - 'protocol': 'tcp', 'ethertype': 'IPv4', + 'protocol': const.PROTO_NAME_TCP, + 'ethertype': const.IPv4, 'port_range_max': 22, 'security_group_id': sg1_id, 'port_range_min': 22}, - {'direction': 'ingress', 'protocol': 'tcp', - 'ethertype': 'IPv4', + {'direction': 'ingress', + 'protocol': const.PROTO_NAME_TCP, + 'ethertype': const.IPv4, 'port_range_max': 23, 'security_group_id': sg1_id, 'port_range_min': 23, 'source_ip_prefix': fake_prefix}, @@ -169,7 +173,7 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase): self._delete('ports', port_id1) def test_security_group_rules_for_devices_ipv4_egress(self): - fake_prefix = test_fw.FAKE_PREFIX['IPv4'] + fake_prefix = test_fw.FAKE_PREFIX[const.IPv4] with self.network() as n: with nested(self.subnet(n), self.security_group()) as (subnet_v4, @@ -177,18 +181,18 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase): sg1_id = sg1['security_group']['id'] rule1 = self._build_security_group_rule( sg1_id, - 'egress', 'tcp', '22', + 'egress', const.PROTO_NAME_TCP, '22', '22') rule2 = self._build_security_group_rule( sg1_id, - 'egress', 'udp', '23', + 'egress', const.PROTO_NAME_UDP, '23', '23', fake_prefix) rules = { 'security_group_rules': [rule1['security_group_rule'], rule2['security_group_rule']]} res = self._create_security_group_rule(self.fmt, rules) self.deserialize(self.fmt, res) - self.assertEqual(res.status_int, 201) + self.assertEqual(res.status_int, webob.exc.HTTPCreated.code) res1 = self._create_port( self.fmt, n['network']['id'], @@ -201,17 +205,19 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase): ports_rpc = self.rpc.security_group_rules_for_devices( ctx, devices=devices) port_rpc = ports_rpc[port_id1] - expected = [{'direction': 'egress', 'ethertype': 'IPv4', + expected = [{'direction': 'egress', 'ethertype': const.IPv4, 'security_group_id': sg1_id}, - {'direction': 'egress', 'ethertype': 'IPv6', + {'direction': 'egress', 'ethertype': const.IPv6, 'security_group_id': sg1_id}, {'direction': 'egress', - 'protocol': 'tcp', 'ethertype': 'IPv4', + 'protocol': const.PROTO_NAME_TCP, + 'ethertype': const.IPv4, 'port_range_max': 22, 'security_group_id': sg1_id, 'port_range_min': 22}, - {'direction': 'egress', 'protocol': 'udp', - 'ethertype': 'IPv4', + {'direction': 'egress', + 'protocol': const.PROTO_NAME_UDP, + 'ethertype': const.IPv4, 'port_range_max': 23, 'security_group_id': sg1_id, 'port_range_min': 23, 'dest_ip_prefix': fake_prefix}, @@ -232,13 +238,13 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase): sg2_id = sg2['security_group']['id'] rule1 = self._build_security_group_rule( sg1_id, - 'ingress', 'tcp', '24', + 'ingress', const.PROTO_NAME_TCP, '24', '25', remote_group_id=sg2['security_group']['id']) rules = { 'security_group_rules': [rule1['security_group_rule']]} res = self._create_security_group_rule(self.fmt, rules) self.deserialize(self.fmt, res) - self.assertEqual(res.status_int, 201) + self.assertEqual(res.status_int, webob.exc.HTTPCreated.code) res1 = self._create_port( self.fmt, n['network']['id'], @@ -258,17 +264,18 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase): ports_rpc = self.rpc.security_group_rules_for_devices( ctx, devices=devices) port_rpc = ports_rpc[port_id1] - expected = [{'direction': 'egress', 'ethertype': 'IPv4', + expected = [{'direction': 'egress', 'ethertype': const.IPv4, 'security_group_id': sg1_id}, - {'direction': 'egress', 'ethertype': 'IPv6', + {'direction': 'egress', 'ethertype': const.IPv6, 'security_group_id': sg1_id}, - {'direction': 'egress', 'ethertype': 'IPv4', + {'direction': 'egress', 'ethertype': const.IPv4, 'security_group_id': sg2_id}, - {'direction': 'egress', 'ethertype': 'IPv6', + {'direction': 'egress', 'ethertype': const.IPv6, 'security_group_id': sg2_id}, {'direction': u'ingress', 'source_ip_prefix': u'10.0.0.3/32', - 'protocol': u'tcp', 'ethertype': u'IPv4', + 'protocol': const.PROTO_NAME_TCP, + 'ethertype': const.IPv4, 'port_range_max': 25, 'port_range_min': 24, 'remote_group_id': sg2_id, 'security_group_id': sg1_id}, @@ -279,7 +286,7 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase): self._delete('ports', port_id2) def test_security_group_rules_for_devices_ipv6_ingress(self): - fake_prefix = test_fw.FAKE_PREFIX['IPv6'] + fake_prefix = test_fw.FAKE_PREFIX[const.IPv6] with self.network() as n: with nested(self.subnet(n, cidr=fake_prefix, @@ -289,20 +296,20 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase): sg1_id = sg1['security_group']['id'] rule1 = self._build_security_group_rule( sg1_id, - 'ingress', 'tcp', '22', + 'ingress', const.PROTO_NAME_TCP, '22', '22', - ethertype='IPv6') + ethertype=const.IPv6) rule2 = self._build_security_group_rule( sg1_id, - 'ingress', 'udp', '23', + 'ingress', const.PROTO_NAME_UDP, '23', '23', fake_prefix, - ethertype='IPv6') + ethertype=const.IPv6) rules = { 'security_group_rules': [rule1['security_group_rule'], rule2['security_group_rule']]} res = self._create_security_group_rule(self.fmt, rules) self.deserialize(self.fmt, res) - self.assertEqual(res.status_int, 201) + self.assertEqual(res.status_int, webob.exc.HTTPCreated.code) res1 = self._create_port( self.fmt, n['network']['id'], @@ -316,17 +323,19 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase): ports_rpc = self.rpc.security_group_rules_for_devices( ctx, devices=devices) port_rpc = ports_rpc[port_id1] - expected = [{'direction': 'egress', 'ethertype': 'IPv4', + expected = [{'direction': 'egress', 'ethertype': const.IPv4, 'security_group_id': sg1_id}, - {'direction': 'egress', 'ethertype': 'IPv6', + {'direction': 'egress', 'ethertype': const.IPv6, 'security_group_id': sg1_id}, {'direction': 'ingress', - 'protocol': 'tcp', 'ethertype': 'IPv6', + 'protocol': const.PROTO_NAME_TCP, + 'ethertype': const.IPv6, 'port_range_max': 22, 'security_group_id': sg1_id, 'port_range_min': 22}, - {'direction': 'ingress', 'protocol': 'udp', - 'ethertype': 'IPv6', + {'direction': 'ingress', + 'protocol': const.PROTO_NAME_UDP, + 'ethertype': const.IPv6, 'port_range_max': 23, 'security_group_id': sg1_id, 'port_range_min': 23, 'source_ip_prefix': fake_prefix}, @@ -336,7 +345,7 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase): self._delete('ports', port_id1) def test_security_group_rules_for_devices_ipv6_egress(self): - fake_prefix = test_fw.FAKE_PREFIX['IPv6'] + fake_prefix = test_fw.FAKE_PREFIX[const.IPv6] with self.network() as n: with nested(self.subnet(n, cidr=fake_prefix, @@ -346,20 +355,20 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase): sg1_id = sg1['security_group']['id'] rule1 = self._build_security_group_rule( sg1_id, - 'egress', 'tcp', '22', + 'egress', const.PROTO_NAME_TCP, '22', '22', - ethertype='IPv6') + ethertype=const.IPv6) rule2 = self._build_security_group_rule( sg1_id, - 'egress', 'udp', '23', + 'egress', const.PROTO_NAME_UDP, '23', '23', fake_prefix, - ethertype='IPv6') + ethertype=const.IPv6) rules = { 'security_group_rules': [rule1['security_group_rule'], rule2['security_group_rule']]} res = self._create_security_group_rule(self.fmt, rules) self.deserialize(self.fmt, res) - self.assertEqual(res.status_int, 201) + self.assertEqual(res.status_int, webob.exc.HTTPCreated.code) res1 = self._create_port( self.fmt, n['network']['id'], @@ -374,18 +383,21 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase): ports_rpc = self.rpc.security_group_rules_for_devices( ctx, devices=devices) port_rpc = ports_rpc[port_id1] - expected = [{'direction': 'egress', 'ethertype': 'IPv4', + expected = [{'direction': 'egress', 'ethertype': const.IPv4, 'security_group_id': sg1_id}, - {'direction': 'egress', 'ethertype': 'IPv6', + {'direction': 'egress', 'ethertype': const.IPv6, 'security_group_id': sg1_id}, {'direction': 'egress', - 'protocol': 'tcp', 'ethertype': 'IPv6', + 'protocol': const.PROTO_NAME_TCP, + 'ethertype': const.IPv6, 'port_range_max': 22, 'security_group_id': sg1_id, 'port_range_min': 22}, - {'direction': 'egress', 'protocol': 'udp', - 'ethertype': 'IPv6', - 'port_range_max': 23, 'security_group_id': sg1_id, + {'direction': 'egress', + 'protocol': const.PROTO_NAME_UDP, + 'ethertype': const.IPv6, + 'port_range_max': 23, + 'security_group_id': sg1_id, 'port_range_min': 23, 'dest_ip_prefix': fake_prefix}, ] @@ -394,7 +406,7 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase): self._delete('ports', port_id1) def test_security_group_rules_for_devices_ipv6_source_group(self): - fake_prefix = test_fw.FAKE_PREFIX['IPv6'] + fake_prefix = test_fw.FAKE_PREFIX[const.IPv6] with self.network() as n: with nested(self.subnet(n, cidr=fake_prefix, @@ -407,15 +419,15 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase): sg2_id = sg2['security_group']['id'] rule1 = self._build_security_group_rule( sg1_id, - 'ingress', 'tcp', '24', + 'ingress', const.PROTO_NAME_TCP, '24', '25', - ethertype='IPv6', + ethertype=const.IPv6, remote_group_id=sg2['security_group']['id']) rules = { 'security_group_rules': [rule1['security_group_rule']]} res = self._create_security_group_rule(self.fmt, rules) self.deserialize(self.fmt, res) - self.assertEqual(res.status_int, 201) + self.assertEqual(res.status_int, webob.exc.HTTPCreated.code) res1 = self._create_port( self.fmt, n['network']['id'], @@ -438,17 +450,18 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase): ports_rpc = self.rpc.security_group_rules_for_devices( ctx, devices=devices) port_rpc = ports_rpc[port_id1] - expected = [{'direction': 'egress', 'ethertype': 'IPv4', + expected = [{'direction': 'egress', 'ethertype': const.IPv4, 'security_group_id': sg1_id}, - {'direction': 'egress', 'ethertype': 'IPv6', + {'direction': 'egress', 'ethertype': const.IPv6, 'security_group_id': sg1_id}, - {'direction': 'egress', 'ethertype': 'IPv4', + {'direction': 'egress', 'ethertype': const.IPv4, 'security_group_id': sg2_id}, - {'direction': 'egress', 'ethertype': 'IPv6', + {'direction': 'egress', 'ethertype': const.IPv6, 'security_group_id': sg2_id}, {'direction': 'ingress', 'source_ip_prefix': 'fe80::3/128', - 'protocol': 'tcp', 'ethertype': 'IPv6', + 'protocol': const.PROTO_NAME_TCP, + 'ethertype': const.IPv6, 'port_range_max': 25, 'port_range_min': 24, 'remote_group_id': sg2_id, 'security_group_id': sg1_id}, @@ -1220,36 +1233,36 @@ class TestSecurityGroupAgentWithIptables(base.BaseTestCase): self.rpc = mock.Mock() self.agent.plugin_rpc = self.rpc rule1 = [{'direction': 'ingress', - 'protocol': 'udp', - 'ethertype': 'IPv4', + 'protocol': const.PROTO_NAME_UDP, + 'ethertype': const.IPv4, 'source_ip_prefix': '10.0.0.2', 'source_port_range_min': 67, 'source_port_range_max': 67, 'port_range_min': 68, 'port_range_max': 68}, {'direction': 'ingress', - 'protocol': 'tcp', - 'ethertype': 'IPv4', + 'protocol': const.PROTO_NAME_TCP, + 'ethertype': const.IPv4, 'port_range_min': 22, 'port_range_max': 22}, {'direction': 'egress', - 'ethertype': 'IPv4'}] + 'ethertype': const.IPv4}] rule2 = rule1[:] rule2 += [{'direction': 'ingress', 'source_ip_prefix': '10.0.0.4', - 'ethertype': 'IPv4'}] + 'ethertype': const.IPv4}] rule3 = rule2[:] rule3 += [{'direction': 'ingress', - 'protocol': 'icmp', - 'ethertype': 'IPv4'}] + 'protocol': const.PROTO_NAME_ICMP, + 'ethertype': const.IPv4}] rule4 = rule1[:] rule4 += [{'direction': 'ingress', 'source_ip_prefix': '10.0.0.3', - 'ethertype': 'IPv4'}] + 'ethertype': const.IPv4}] rule5 = rule4[:] rule5 += [{'direction': 'ingress', - 'protocol': 'icmp', - 'ethertype': 'IPv4'}] + 'protocol': const.PROTO_NAME_ICMP, + 'ethertype': const.IPv4}] self.devices1 = {'tap_port1': self._device('tap_port1', '10.0.0.3', '12:34:56:78:9a:bc', @@ -1360,7 +1373,7 @@ class SGNotificationTestMixin(): security_group_id = sg['security_group']['id'] direction = "ingress" remote_group_id = sg2['security_group']['id'] - protocol = 'tcp' + protocol = const.PROTO_NAME_TCP port_range_min = 88 port_range_max = 88 with self.security_group_rule(security_group_id, direction,