From 4bf0bfb86c7eb79ffd57ad1dfea8ec7be5b52c58 Mon Sep 17 00:00:00 2001 From: Matt Dietz Date: Wed, 17 Sep 2014 03:30:38 +0000 Subject: [PATCH] RM8880 Adds translation methods for making ethertype and protocol human readable. Additionally, adds a new migration that includes a new foreign key from remote_group_id back to the reference security group and converts ethertype to an integer. --- ...6e984b48a0d_revise_security_group_rules.py | 64 +++++++++ quark/db/migration/alembic/versions/HEAD | 2 +- quark/db/models.py | 10 +- quark/exceptions.py | 4 + quark/plugin.py | 1 + quark/plugin_modules/security_groups.py | 58 ++------ quark/plugin_views.py | 10 +- quark/protocols.py | 132 ++++++++++++++++++ .../plugin_modules/test_security_groups.py | 129 ++++++++++------- 9 files changed, 306 insertions(+), 104 deletions(-) create mode 100644 quark/db/migration/alembic/versions/26e984b48a0d_revise_security_group_rules.py create mode 100644 quark/protocols.py diff --git a/quark/db/migration/alembic/versions/26e984b48a0d_revise_security_group_rules.py b/quark/db/migration/alembic/versions/26e984b48a0d_revise_security_group_rules.py new file mode 100644 index 0000000..17e4f55 --- /dev/null +++ b/quark/db/migration/alembic/versions/26e984b48a0d_revise_security_group_rules.py @@ -0,0 +1,64 @@ +"""Revise security group rules + +Revision ID: 26e984b48a0d +Revises: 1664300cb03a +Create Date: 2014-09-16 22:01:07.329380 + +""" + +# revision identifiers, used by Alembic. +revision = '26e984b48a0d' +down_revision = '1664300cb03a' + +from alembic import op +import sqlalchemy as sa + + +OLD_TABLE = "quark_security_group_rule" +NEW_TABLE = "quark_security_group_rules" + + +def upgrade(): + # NOTE(mdietz): You can't change the datatype or remove columns, + # in SQLite, please see + # http://sqlite.org/lang_altertable.html + op.drop_table(OLD_TABLE) + op.create_table( + NEW_TABLE, + sa.Column('tenant_id', sa.String(length=255), nullable=True), + sa.Column('created_at', sa.DateTime(), nullable=True), + sa.Column('id', sa.String(length=36), nullable=False), + sa.Column('group_id', sa.String(length=36), nullable=False), + sa.Column('direction', sa.String(length=10), nullable=False), + sa.Column('port_range_max', sa.Integer(), nullable=True), + sa.Column('port_range_min', sa.Integer(), nullable=True), + sa.Column('protocol', sa.Integer(), nullable=True), + sa.Column("ethertype", type_=sa.Integer(), nullable=False), + sa.Column('remote_group_id', sa.String(length=36), nullable=True), + sa.Column("remote_ip_prefix", type_=sa.String(255)), + sa.ForeignKeyConstraint(["remote_group_id"], + ["quark_security_groups.id"], + "fk_remote_group_id"), + sa.ForeignKeyConstraint(['group_id'], ['quark_security_groups.id'], ), + sa.PrimaryKeyConstraint('id'), + mysql_engine="InnoDB") + + +def downgrade(): + op.drop_table(NEW_TABLE) + op.create_table( + OLD_TABLE, + sa.Column('tenant_id', sa.String(length=255), nullable=True), + sa.Column('created_at', sa.DateTime(), nullable=True), + sa.Column('id', sa.String(length=36), nullable=False), + sa.Column('group_id', sa.String(length=36), nullable=False), + sa.Column('direction', sa.String(length=10), nullable=False), + sa.Column('ethertype', sa.String(length=4), nullable=False), + sa.Column('port_range_max', sa.Integer(), nullable=True), + sa.Column('port_range_min', sa.Integer(), nullable=True), + sa.Column('protocol', sa.Integer(), nullable=True), + sa.Column('remote_ip_prefix', sa.String(length=22), nullable=True), + sa.Column('remote_group_id', sa.String(length=36), nullable=True), + sa.ForeignKeyConstraint(['group_id'], ['quark_security_groups.id'], ), + sa.PrimaryKeyConstraint('id'), + mysql_engine='InnoDB') diff --git a/quark/db/migration/alembic/versions/HEAD b/quark/db/migration/alembic/versions/HEAD index f4f0906..0d5a20b 100644 --- a/quark/db/migration/alembic/versions/HEAD +++ b/quark/db/migration/alembic/versions/HEAD @@ -1 +1 @@ -1664300cb03a \ No newline at end of file +26e984b48a0d \ No newline at end of file diff --git a/quark/db/models.py b/quark/db/models.py index 1b37c38..30f18e4 100644 --- a/quark/db/models.py +++ b/quark/db/models.py @@ -265,18 +265,20 @@ port_group_association_table = sa.Table( class SecurityGroupRule(BASEV2, models.HasId, models.HasTenant): - __tablename__ = "quark_security_group_rule" + __tablename__ = "quark_security_group_rules" id = sa.Column(sa.String(36), primary_key=True) group_id = sa.Column(sa.String(36), sa.ForeignKey("quark_security_groups.id"), nullable=False) direction = sa.Column(sa.String(10), nullable=False) - ethertype = sa.Column(sa.String(4), nullable=False) + ethertype = sa.Column(sa.Integer(), nullable=False) port_range_max = sa.Column(sa.Integer(), nullable=True) port_range_min = sa.Column(sa.Integer(), nullable=True) protocol = sa.Column(sa.Integer(), nullable=True) - remote_ip_prefix = sa.Column(sa.String(22), nullable=True) - remote_group_id = sa.Column(sa.String(36), nullable=True) + remote_ip_prefix = sa.Column(sa.String(255), nullable=True) + remote_group_id = sa.Column(sa.String(36), + sa.ForeignKey("quark_security_groups.id"), + nullable=True) class SecurityGroup(BASEV2, models.HasId): diff --git a/quark/exceptions.py b/quark/exceptions.py index 419ab31..95236d0 100644 --- a/quark/exceptions.py +++ b/quark/exceptions.py @@ -9,6 +9,10 @@ class InvalidMacAddressRange(exceptions.NeutronException): message = _("Invalid MAC address range %(cidr)s.") +class InvalidEthertype(exceptions.NeutronException): + message = _("Invalid Ethertype %(ethertype)s.") + + class MacAddressRangeNotFound(exceptions.NotFound): message = _("MAC address range %(mac_address_range_id) not found.") diff --git a/quark/plugin.py b/quark/plugin.py index 46eb151..e5c9494 100644 --- a/quark/plugin.py +++ b/quark/plugin.py @@ -68,6 +68,7 @@ def append_quark_extensions(conf): append_quark_extensions(CONF) + CONF.register_opts(quark_quota_opts, "QUOTAS") quota.QUOTAS.register_resources(quark_resources) diff --git a/quark/plugin_modules/security_groups.py b/quark/plugin_modules/security_groups.py index 5678b38..da4cf18 100644 --- a/quark/plugin_modules/security_groups.py +++ b/quark/plugin_modules/security_groups.py @@ -13,7 +13,6 @@ # License for the specific language governing permissions and limitations # under the License. -from neutron.common import exceptions from neutron.extensions import securitygroup as sg_ext from neutron.openstack.common import log as logging from neutron.openstack.common import uuidutils @@ -22,7 +21,7 @@ from oslo.config import cfg from quark.db import api as db_api from quark import plugin_views as v - +from quark import protocols CONF = cfg.CONF LOG = logging.getLogger(__name__) @@ -30,8 +29,6 @@ DEFAULT_SG_UUID = "00000000-0000-0000-0000-000000000000" def _validate_security_group_rule(context, rule): - PROTOCOLS = {"icmp": 1, "tcp": 6, "udp": 17} - ALLOWED_WITH_RANGE = [6, 17] if rule.get("remote_ip_prefix") and rule.get("remote_group_id"): raise sg_ext.SecurityGroupRemoteGroupAndRemoteIpPrefix() @@ -41,32 +38,18 @@ def _validate_security_group_rule(context, rule): port_range_max = rule['port_range_max'] if protocol: - try: - proto = int(protocol) - except ValueError: - proto = str(protocol).lower() - proto = PROTOCOLS.get(proto, -1) - - # Please see http://en.wikipedia.org/wiki/List_of_IP_protocol_numbers - # The field is always 8 bits, and 255 is a reserved value - if not (0 <= proto <= 254): - raise sg_ext.SecurityGroupRuleInvalidProtocol( - protocol=protocol, values=PROTOCOLS.keys()) - - if protocol in ALLOWED_WITH_RANGE: - if (port_range_min is None) != (port_range_max is None): - raise exceptions.InvalidInput( - error_message="For TCP/UDP rules, cannot wildcard " - "only one end of port range.") - if port_range_min is not None and port_range_max is not None: - if port_range_min > port_range_max: - raise sg_ext.SecurityGroupInvalidPortRange() - + protocol = protocols.translate_protocol(protocol, rule["ethertype"]) + protocols.validate_protocol_with_port_ranges(protocol, + port_range_min, + port_range_max) rule['protocol'] = protocol else: if port_range_min is not None or port_range_max is not None: raise sg_ext.SecurityGroupProtocolRequiredWithPorts() + ethertype = protocols.translate_ethertype(rule["ethertype"]) + rule["ethertype"] = ethertype + return rule @@ -87,34 +70,9 @@ def create_security_group(context, security_group): return v._make_security_group_dict(dbgroup) -def _create_default_security_group(context): - default_group = { - "name": "default", "description": "", - "group_id": DEFAULT_SG_UUID, - "port_egress_rules": [], - "port_ingress_rules": [ - {"ethertype": "IPv4", "protocol": 1}, - {"ethertype": "IPv4", "protocol": 6}, - {"ethertype": "IPv4", "protocol": 17}, - {"ethertype": "IPv6", "protocol": 1}, - {"ethertype": "IPv6", "protocol": 6}, - {"ethertype": "IPv6", "protocol": 17}, - ]} - - default_group["id"] = DEFAULT_SG_UUID - default_group["tenant_id"] = context.tenant_id - for rule in default_group.pop("port_ingress_rules"): - db_api.security_group_rule_create( - context, security_group_id=default_group["id"], - tenant_id=context.tenant_id, direction="ingress", - **rule) - db_api.security_group_create(context, **default_group) - - def create_security_group_rule(context, security_group_rule): LOG.info("create_security_group for tenant %s" % (context.tenant_id)) - with context.session.begin(): rule = _validate_security_group_rule( context, security_group_rule["security_group_rule"]) diff --git a/quark/plugin_views.py b/quark/plugin_views.py index 3756b6c..2aad39c 100644 --- a/quark/plugin_views.py +++ b/quark/plugin_views.py @@ -25,6 +25,7 @@ from oslo.config import cfg from quark.db import api as db_api from quark.db import models from quark import network_strategy +from quark import protocols from quark import utils CONF = cfg.CONF @@ -151,13 +152,18 @@ def _make_security_group_dict(security_group, fields=None): def _make_security_group_rule_dict(security_rule, fields=None): + ethertype = protocols.human_readable_ethertype( + security_rule.get("ethertype")) + protocol = protocols.human_readable_protocol( + security_rule.get("protocol"), ethertype) + res = {"id": security_rule.get("id"), - "ethertype": security_rule.get("ethertype"), + "ethertype": ethertype, "direction": security_rule.get("direction"), "tenant_id": security_rule.get("tenant_id"), "port_range_max": security_rule.get("port_range_max"), "port_range_min": security_rule.get("port_range_min"), - "protocol": security_rule.get("protocol"), + "protocol": protocol, "remote_ip_prefix": security_rule.get("remote_ip_prefix"), "security_group_id": security_rule.get("group_id"), "remote_group_id": security_rule.get("remote_group_id")} diff --git a/quark/protocols.py b/quark/protocols.py new file mode 100644 index 0000000..57fc1ee --- /dev/null +++ b/quark/protocols.py @@ -0,0 +1,132 @@ +# Copyright 2014 Openstack Foundation +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from neutron.common import exceptions +from neutron.extensions import securitygroup as sg_ext +from neutron.openstack.common import log as logging +from oslo.config import cfg + +from quark import exceptions as q_exc + + +LOG = logging.getLogger(__name__) +CONF = cfg.CONF + +# Neutron doesn't officially support any other ethertype +ETHERTYPES = { + "IPv4": 0x0800, + "IPv6": 0x86DD +} + +# Neutron only officially supports TCP, ICMP and UDP, +# with ethertypes IPv4 and IPv6 +PROTOCOLS = { + ETHERTYPES["IPv4"]: { + "icmp": 1, + "tcp": 6, + "udp": 17, + }, + ETHERTYPES["IPv6"]: { + "icmp": 1, + "tcp": 6, + "udp": 17 + } +} + + +ALLOWED_PROTOCOLS = None +ALLOWED_WITH_RANGE = [6, 17] +MIN_PROTOCOL = 0 +MAX_PROTOCOL = 255 +REVERSE_PROTOCOLS = {} +REVERSE_ETHERTYPES = {} + + +def _is_allowed(protocol, ethertype): + # Please see http://en.wikipedia.org/wiki/List_of_IP_protocol_numbers + # The field is always 8 bits wide. + if not (MIN_PROTOCOL <= protocol <= MAX_PROTOCOL): + return False + + return (protocol in PROTOCOLS[ethertype] or + protocol in REVERSE_PROTOCOLS) + + +def translate_ethertype(ethertype): + if ethertype not in ETHERTYPES: + raise q_exc.InvalidEthertype(ethertype=ethertype) + return ETHERTYPES[ethertype] + + +def translate_protocol(protocol, ethertype): + ether = translate_ethertype(ethertype) + try: + proto = int(protocol) + except ValueError: + proto = str(protocol).lower() + proto = PROTOCOLS[ether].get(proto, -1) + + if not _is_allowed(proto, ether): + # TODO(mdietz) This will change as neutron supports new protocols + value_list = PROTOCOLS[ETHERTYPES["IPv4"]].keys() + raise sg_ext.SecurityGroupRuleInvalidProtocol( + protocol=protocol, values=value_list) + return proto + + +def human_readable_ethertype(ethertype): + return REVERSE_ETHERTYPES[ethertype] + + +def human_readable_protocol(protocol, ethertype): + if protocol is None: + return + proto = translate_protocol(protocol, ethertype) + return REVERSE_PROTOCOLS[proto] + + +def validate_protocol_with_port_ranges(protocol, port_range_min, + port_range_max): + if protocol in ALLOWED_WITH_RANGE: + # TODO(mdietz) Allowed with range makes little sense. TCP without + # a port range means what, exactly? + if (port_range_min is None) != (port_range_max is None): + raise exceptions.InvalidInput( + error_message="For TCP/UDP rules, port_range_min and" + "port_range_max must either both be supplied, " + "or neither of them") + if port_range_min is not None and port_range_max is not None: + if port_range_min > port_range_max: + raise sg_ext.SecurityGroupInvalidPortRange() + else: + raise exceptions.InvalidInput( + error_message=("You may not supply ports for the requested " + "protocol")) + + +def _init_protocols(): + if not REVERSE_PROTOCOLS: + # Protocols don't change between ethertypes, but we want to get + # them all, from all ethertypes + for ether_str, ethertype in ETHERTYPES.iteritems(): + for proto, proto_int in PROTOCOLS[ethertype].iteritems(): + REVERSE_PROTOCOLS[proto_int] = proto.upper() + + if not REVERSE_ETHERTYPES: + for ether_str, ethertype in ETHERTYPES.iteritems(): + REVERSE_ETHERTYPES[ethertype] = ether_str + + +_init_protocols() diff --git a/quark/tests/plugin_modules/test_security_groups.py b/quark/tests/plugin_modules/test_security_groups.py index 5b3fbdd..e0de776 100644 --- a/quark/tests/plugin_modules/test_security_groups.py +++ b/quark/tests/plugin_modules/test_security_groups.py @@ -21,7 +21,9 @@ from neutron.extensions import securitygroup as sg_ext from oslo.config import cfg from quark.db import models +from quark import exceptions as q_exc from quark.plugin_modules import security_groups +from quark import protocols from quark.tests import test_quark_plugin @@ -94,10 +96,12 @@ class TestQuarkGetSecurityGroupRules(test_quark_plugin.TestQuarkPlugin): def test_get_security_group_rules(self): rule = {"id": 1, "remote_group_id": 2, "direction": "ingress", "port_range_min": 80, "port_range_max": 100, - "remote_ip_prefix": None, "ethertype": "IPv4", - "tenant_id": "foo", "protocol": "udp", "group_id": 1} + "remote_ip_prefix": None, + "ethertype": protocols.translate_ethertype("IPv4"), + "tenant_id": "foo", "protocol": "UDP", "group_id": 1} expected = rule.copy() expected["security_group_id"] = expected.pop("group_id") + expected["ethertype"] = "IPv4" with self._stubs([rule]): resp = self.plugin.get_security_group_rules(self.context, {}) @@ -108,10 +112,12 @@ class TestQuarkGetSecurityGroupRules(test_quark_plugin.TestQuarkPlugin): def test_get_security_group_rule(self): rule = {"id": 1, "remote_group_id": 2, "direction": "ingress", "port_range_min": 80, "port_range_max": 100, - "remote_ip_prefix": None, "ethertype": "IPv4", - "tenant_id": "foo", "protocol": "udp", "group_id": 1} + "remote_ip_prefix": None, + "ethertype": protocols.translate_ethertype("IPv4"), + "tenant_id": "foo", "protocol": "UDP", "group_id": 1} expected = rule.copy() expected["security_group_id"] = expected.pop("group_id") + expected["ethertype"] = "IPv4" with self._stubs(rule): resp = self.plugin.get_security_group_rule(self.context, 1) @@ -246,7 +252,6 @@ class TestQuarkCreateSecurityGroupRule(test_quark_plugin.TestQuarkPlugin): 'protocol': None, 'port_range_min': None, 'port_range_max': None} self.expected = { - 'id': 1, 'remote_group_id': None, 'direction': None, 'port_range_min': None, @@ -258,34 +263,43 @@ class TestQuarkCreateSecurityGroupRule(test_quark_plugin.TestQuarkPlugin): 'security_group_id': 1} @contextlib.contextmanager - def _stubs(self, rule, group): - dbrule = models.SecurityGroupRule() - dbrule.update(rule) - dbrule.group_id = rule['security_group_id'] + def _stubs(self, rule, group, limit_raise=False): dbgroup = None if group: dbgroup = models.SecurityGroup() dbgroup.update(group) + def _create_rule(context, **rule): + dbrule = models.SecurityGroupRule() + dbrule.update(rule) + dbrule["group_id"] = rule['security_group_id'] + return dbrule + with contextlib.nested( - mock.patch("quark.db.api.security_group_find"), - mock.patch("quark.db.api.security_group_rule_find"), - mock.patch("quark.db.api.security_group_rule_create") - ) as (group_find, rule_find, rule_create): + mock.patch("quark.db.api.security_group_find"), + mock.patch("quark.db.api.security_group_rule_find"), + mock.patch("quark.db.api.security_group_rule_create"), + mock.patch("quark.protocols.human_readable_protocol"), + mock.patch("neutron.quota.QuotaEngine.limit_check") + ) as (group_find, rule_find, rule_create, human, limit_check): group_find.return_value = dbgroup rule_find.return_value.count.return_value = group.get( 'port_rules', None) if group else 0 - rule_create.return_value = dbrule + + rule_create.side_effect = _create_rule + human.return_value = rule["protocol"] + if limit_raise: + limit_check.side_effect = exceptions.OverQuota yield rule_create - def _test_create_security_rule(self, **ruleset): + def _test_create_security_rule(self, limit_raise=False, **ruleset): ruleset['tenant_id'] = self.context.tenant_id rule = dict(self.rule, **ruleset) group = rule.pop('group') expected = dict(self.expected, **ruleset) expected.pop('group', None) hax = {'security_group_rule': rule} - with self._stubs(rule, group) as rule_create: + with self._stubs(rule, group, limit_raise) as rule_create: result = self.plugin.create_security_group_rule(self.context, hax) self.assertTrue(rule_create.called) for key in expected.keys(): @@ -294,24 +308,6 @@ class TestQuarkCreateSecurityGroupRule(test_quark_plugin.TestQuarkPlugin): def test_create_security_rule_IPv6(self): self._test_create_security_rule(ethertype='IPv6') - def test_create_security_rule_UDP(self): - self._test_create_security_rule(protocol=17) - - def test_create_security_rule_UDP_string(self): - self._test_create_security_rule(protocol="UDP") - - def test_create_security_rule_bad_string_fail(self): - self.assertRaises(sg_ext.SecurityGroupRuleInvalidProtocol, - self._test_create_security_rule, protocol="DERP") - - def test_create_security_rule_protocol_under_range_fails(self): - self.assertRaises(sg_ext.SecurityGroupRuleInvalidProtocol, - self._test_create_security_rule, protocol=-1) - - def test_create_security_rule_protocol_over_range_fails(self): - self.assertRaises(sg_ext.SecurityGroupRuleInvalidProtocol, - self._test_create_security_rule, protocol=255) - def test_create_security_rule_TCP(self): self._test_create_security_rule(protocol=6) @@ -325,23 +321,11 @@ class TestQuarkCreateSecurityGroupRule(test_quark_plugin.TestQuarkPlugin): with self.assertRaises(exceptions.InvalidInput): self._test_create_security_rule(protocol=6, port_range_min=0) - def test_create_security_group_no_proto_with_ranges_fails(self): - with self.assertRaises(sg_ext.SecurityGroupProtocolRequiredWithPorts): - self._test_create_security_rule(protocol=None, port_range_min=0) - with self.assertRaises(Exception): # noqa - self._test_create_security_rule( - protocol=6, port_range_min=1, port_range_max=0) - def test_create_security_rule_remote_conflicts(self): with self.assertRaises(Exception): # noqa self._test_create_security_rule(remote_ip_prefix='192.168.0.1', remote_group_id='0') - def test_create_security_rule_min_greater_than_max_fails(self): - with self.assertRaises(sg_ext.SecurityGroupInvalidPortRange): - self._test_create_security_rule(protocol=6, port_range_min=10, - port_range_max=9) - def test_create_security_rule_no_group(self): with self.assertRaises(sg_ext.SecurityGroupNotFound): self._test_create_security_rule(group=None) @@ -349,7 +333,15 @@ class TestQuarkCreateSecurityGroupRule(test_quark_plugin.TestQuarkPlugin): def test_create_security_rule_group_at_max(self): with self.assertRaises(exceptions.OverQuota): self._test_create_security_rule( - group={'id': 1, 'rules': [models.SecurityGroupRule()]}) + group={'id': 1, 'rules': [models.SecurityGroupRule()]}, + limit_raise=True) + + def test_create_security_group_no_proto_with_ranges_fails(self): + with self.assertRaises(sg_ext.SecurityGroupProtocolRequiredWithPorts): + self._test_create_security_rule(protocol=None, port_range_min=0) + with self.assertRaises(Exception): # noqa + self._test_create_security_rule( + protocol=6, port_range_min=1, port_range_max=0) class TestQuarkDeleteSecurityGroupRule(test_quark_plugin.TestQuarkPlugin): @@ -393,3 +385,46 @@ class TestQuarkDeleteSecurityGroupRule(test_quark_plugin.TestQuarkPlugin): None): with self.assertRaises(sg_ext.SecurityGroupNotFound): self.plugin.delete_security_group_rule(self.context, 1) + + +class TestQuarkProtocolHandling(test_quark_plugin.TestQuarkPlugin): + def test_create_security_rule_min_greater_than_max_fails(self): + with self.assertRaises(sg_ext.SecurityGroupInvalidPortRange): + protocols.validate_protocol_with_port_ranges( + protocol=6, port_range_min=10, port_range_max=9) + + def test_translate_protocol_string(self): + proto = protocols.translate_protocol("udp", "IPv4") + self.assertEqual(proto, 17) + + def test_translate_protocol_int(self): + proto = protocols.translate_protocol(17, "IPv4") + self.assertEqual(proto, 17) + + def test_human_readable_protocol_string(self): + proto = protocols.human_readable_protocol("UDP", "IPv4") + self.assertEqual(proto, "UDP") + + def test_human_readable_protocol_int(self): + proto = protocols.human_readable_protocol(17, "IPv4") + self.assertEqual(proto, "UDP") + + def test_human_readable_protocol_string_as_int(self): + proto = protocols.human_readable_protocol("17", "IPv4") + self.assertEqual(proto, "UDP") + + def test_invalid_protocol_string_fail(self): + with self.assertRaises(sg_ext.SecurityGroupRuleInvalidProtocol): + protocols.translate_protocol("DERP", "IPv4") + + def test_translate_protocol_under_range(self): + with self.assertRaises(sg_ext.SecurityGroupRuleInvalidProtocol): + protocols.translate_protocol(-1, "IPv4") + + def test_translate_protocol_over_range(self): + with self.assertRaises(sg_ext.SecurityGroupRuleInvalidProtocol): + protocols.translate_protocol(256, "IPv4") + + def test_translate_protocol_invalid_ethertype(self): + with self.assertRaises(q_exc.InvalidEthertype): + protocols.translate_protocol(256, "IPv7")