Validation for classification attributes
This patch validate classification attributes for security-group and firewall rules create. Co-Authored-By: Mohankumar <mohankumar.n@huawei.com> Change-Id: Iad3bb9792dbb08d21b5072f6740c2f622f5b706d
This commit is contained in:
parent
fb482eb6d5
commit
10b2eb3127
@ -13,8 +13,20 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
|
||||
CLASSIFIER_TYPES = ['ip_classifier', 'ipv4_classifier', 'ipv6_classifier',
|
||||
'transport_classifier', 'ethernet_classifier',
|
||||
'encapsulation_classifier', 'neutron_port_classifier']
|
||||
|
||||
# Protocol names and numbers
|
||||
PROTO_NAME_ICMP = 'icmp'
|
||||
PROTO_NAME_ICMP_V6 = 'icmpv6'
|
||||
PROTO_NAME_TCP = 'tcp'
|
||||
PROTO_NAME_UDP = 'udp'
|
||||
|
||||
# TODO(sc68cal) add more protocols`
|
||||
PROTOCOLS = ['tcp', 'udp', 'icmp', 'icmpv6']
|
||||
PROTOCOLS = [PROTO_NAME_ICMP, PROTO_NAME_ICMP_V6,
|
||||
PROTO_NAME_TCP, PROTO_NAME_UDP]
|
||||
|
||||
ENCAPSULATION_TYPES = ['vxlan', 'gre']
|
||||
|
||||
|
39
neutron_classifier/common/exceptions.py
Normal file
39
neutron_classifier/common/exceptions.py
Normal file
@ -0,0 +1,39 @@
|
||||
# Copyright (c) 2016 Huawei Technologies India Pvt Ltd.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""
|
||||
neutron-classifier exception handling.
|
||||
"""
|
||||
|
||||
from neutron_lib import exceptions as nexceptions
|
||||
|
||||
|
||||
class InvalidEthernetClassifier(nexceptions.NeutronException):
|
||||
message = ('Invalid ethernet classifier value for %(eth_type)s.')
|
||||
|
||||
|
||||
class EthertypeConflictWithProtocol(nexceptions.NeutronException):
|
||||
message = ("Invalid ethertype %(ethertype)s for protocol %(protocol)s.")
|
||||
|
||||
|
||||
class IpAddressConflict(nexceptions.NeutronException):
|
||||
message = ("IP address do not agree with the given IP Version.")
|
||||
|
||||
|
||||
class InvalidICMPParameter(nexceptions.NeutronException):
|
||||
message = ("%(param)s are not allowed when protocol is set to ICMP.")
|
||||
|
||||
|
||||
class InvalidPortRange(nexceptions.NeutronException):
|
||||
message = ("Invalid port range %(port_range).")
|
@ -15,6 +15,7 @@
|
||||
|
||||
from neutron_classifier.common import constants
|
||||
from neutron_classifier.db import models
|
||||
from neutron_classifier.db import validators
|
||||
|
||||
|
||||
def security_group_ethertype_to_ethertype_value(ethertype):
|
||||
@ -60,30 +61,37 @@ def convert_security_group_to_classifier(context, security_group):
|
||||
|
||||
|
||||
def convert_security_group_rule_to_classifier(context, sgr, group):
|
||||
# Pull the source from the SG rule
|
||||
cl1 = models.IpClassifier()
|
||||
cl1.source_ip_prefix = sgr['remote_ip_prefix']
|
||||
|
||||
# Ports
|
||||
cl2 = models.TransportClassifier(
|
||||
destination_port_range_min=sgr['port_range_min'],
|
||||
destination_port_range_max=sgr['port_range_max'])
|
||||
|
||||
# Direction
|
||||
cl3 = models.DirectionClassifier(
|
||||
direction=sgr['direction'])
|
||||
cl1 = cl2 = cl3 = cl4 = cl5 = None
|
||||
|
||||
# Ethertype
|
||||
cl4 = models.EthernetClassifier()
|
||||
cl4.ethertype = security_group_ethertype_to_ethertype_value(
|
||||
sgr['ethertype'])
|
||||
if validators.is_ethernetclassifier_valid(sgr, validators.SG_RULE_TYPE):
|
||||
cl1 = models.EthernetClassifier()
|
||||
cl1.ethertype = security_group_ethertype_to_ethertype_value(
|
||||
sgr['ethertype'])
|
||||
|
||||
if cl4.ethertype == constants.ETHERTYPE_IPV6:
|
||||
cl5 = models.Ipv6Classifier()
|
||||
cl5.next_header = sgr['protocol']
|
||||
else:
|
||||
cl5 = models.Ipv4Classifier()
|
||||
cl5.protocol = sgr['protocol']
|
||||
# protocol
|
||||
if validators.is_protocolclassifier_valid(sgr, validators.SG_RULE_TYPE):
|
||||
if cl1 and cl1.ethertype == constants.ETHERTYPE_IPV6:
|
||||
cl2 = models.Ipv6Classifier()
|
||||
cl2.next_header = sgr['protocol']
|
||||
else:
|
||||
cl2 = models.Ipv4Classifier()
|
||||
cl2.protocol = sgr['protocol']
|
||||
|
||||
# remote ip
|
||||
if validators.is_ipclassifier_valid(sgr, validators.SG_RULE_TYPE):
|
||||
cl3 = models.IpClassifier()
|
||||
cl3.source_ip_prefix = sgr['remote_ip_prefix']
|
||||
|
||||
# Ports
|
||||
if validators.is_transportclassifier_valid(sgr, validators.SG_RULE_TYPE):
|
||||
cl4 = models.TransportClassifier(
|
||||
destination_port_range_min=sgr['port_range_min'],
|
||||
destination_port_range_max=sgr['port_range_max'])
|
||||
|
||||
# Direction
|
||||
if validators.is_directionclassifier_valid(sgr, validators.SG_RULE_TYPE):
|
||||
cl5 = models.DirectionClassifier(direction=sgr['direction'])
|
||||
|
||||
classifiers = [cl1, cl2, cl3, cl4, cl5]
|
||||
create_classifier_chain(group, classifiers)
|
||||
@ -128,30 +136,36 @@ def convert_firewall_policy_to_classifier(context, firewall):
|
||||
return cgroup
|
||||
|
||||
|
||||
def convert_firewall_rule_to_classifier(context, fw_rule, group):
|
||||
def convert_firewall_rule_to_classifier(context, fwr, group):
|
||||
cl1 = cl2 = cl3 = cl4 = None
|
||||
|
||||
# ip_version
|
||||
cl1 = models.EthernetClassifier()
|
||||
cl1.ethertype = fw_rule['ip_version']
|
||||
if validators.is_ethernetclassifier_valid(fwr, validators.FW_RULE_TYPE):
|
||||
cl1 = models.EthernetClassifier()
|
||||
cl1.ethertype = fwr['ip_version']
|
||||
|
||||
# protocol
|
||||
if cl1.ethertype == constants.IP_VERSION_6:
|
||||
cl2 = models.Ipv6Classifier()
|
||||
cl2.next_header = fw_rule['protocol']
|
||||
else:
|
||||
cl2 = models.Ipv4Classifier()
|
||||
cl2.protocol = fw_rule['protocol']
|
||||
if validators.is_protocolclassifier_valid(fwr, validators.FW_RULE_TYPE):
|
||||
if cl1.ethertype == constants.IP_VERSION_6:
|
||||
cl2 = models.Ipv6Classifier()
|
||||
cl2.next_header = fwr['protocol']
|
||||
else:
|
||||
cl2 = models.Ipv4Classifier()
|
||||
cl2.protocol = fwr['protocol']
|
||||
|
||||
# Source and destination ip
|
||||
cl3 = models.IpClassifier()
|
||||
cl3.source_ip_prefix = fw_rule['source_ip_address']
|
||||
cl3.destination_ip_prefix = fw_rule['destination_ip_address']
|
||||
if validators.is_ipclassifier_valid(fwr, validators.FW_RULE_TYPE):
|
||||
cl3 = models.IpClassifier()
|
||||
cl3.source_ip_prefix = fwr['source_ip_address']
|
||||
cl3.destination_ip_prefix = fwr['destination_ip_address']
|
||||
|
||||
# Ports
|
||||
cl4 = models.TransportClassifier(
|
||||
source_port_range_min=fw_rule['source_port_range_min'],
|
||||
source_port_range_max=fw_rule['source_port_range_max'],
|
||||
destination_port_range_min=fw_rule['destination_port_range_min'],
|
||||
destination_port_range_max=fw_rule['destination_port_range_max'])
|
||||
if validators.is_transportclassifier_valid(fwr, validators.FW_RULE_TYPE):
|
||||
cl4 = models.TransportClassifier(
|
||||
source_port_range_min=fwr['source_port_range_min'],
|
||||
source_port_range_max=fwr['source_port_range_max'],
|
||||
destination_port_range_min=fwr['destination_port_range_min'],
|
||||
destination_port_range_max=fwr['destination_port_range_max'])
|
||||
|
||||
classifiers = [cl1, cl2, cl3, cl4]
|
||||
create_classifier_chain(group, classifiers)
|
||||
|
141
neutron_classifier/db/validators.py
Normal file
141
neutron_classifier/db/validators.py
Normal file
@ -0,0 +1,141 @@
|
||||
# Copyright (c) 2016 Huawei Technologies India Pvt Ltd.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from neutron_classifier.common import constants as const
|
||||
from neutron_classifier.common import exceptions as exc
|
||||
|
||||
import netaddr
|
||||
|
||||
SG_RULE_TYPE = 1
|
||||
FW_RULE_TYPE = 2
|
||||
|
||||
|
||||
def get_attr_value(dict, key):
|
||||
return dict.get(key, None)
|
||||
|
||||
|
||||
def _validate_fwr_protocol_parameters(fwr, protocol):
|
||||
"""Check if given port values and protocol is valid."""
|
||||
if protocol in (const.PROTO_NAME_ICMP, const.PROTO_NAME_ICMP_V6):
|
||||
source_port_range_min = get_attr_value(fwr, 'source_port_range_min')
|
||||
source_port_range_max = get_attr_value(fwr, 'source_port_range_max')
|
||||
destination_port_range_min = get_attr_value(
|
||||
fwr, 'destination_port_range_min')
|
||||
destination_port_range_max = get_attr_value(
|
||||
fwr, 'destination_port_range_max')
|
||||
if (source_port_range_min or source_port_range_max or
|
||||
destination_port_range_min or destination_port_range_max):
|
||||
raise exc.InvalidICMPParameter(param="Source, destination port")
|
||||
|
||||
|
||||
def _validate_sg_ethertype_and_protocol(rule, protocol):
|
||||
"""Check if given ethertype and protocol is valid."""
|
||||
eth_value = get_attr_value(rule, 'ethertype')
|
||||
if protocol == const.PROTO_NAME_ICMP_V6:
|
||||
if eth_value == const.SECURITYGROUP_ETHERTYPE_IPV4:
|
||||
raise exc.EthertypeConflictWithProtocol(ethertype=eth_value,
|
||||
protocol=protocol)
|
||||
|
||||
|
||||
def validate_port_range(min_port, max_port):
|
||||
"""Check that port_range is valid."""
|
||||
port_range = '%s:%s' % (min_port, max_port)
|
||||
if(min_port is None and
|
||||
max_port is None):
|
||||
return
|
||||
if (int(min_port) <= 0 or int(max_port) <= 0):
|
||||
raise exc.InvalidPortRange(port_range=port_range)
|
||||
if int(min_port) > int(max_port):
|
||||
raise exc.InvalidPortRange(port_range=port_range)
|
||||
|
||||
|
||||
def is_ethernetclassifier_valid(rule, type):
|
||||
"""Check ethertype or ip_version in rule dict."""
|
||||
if type == SG_RULE_TYPE:
|
||||
attr_type = 'ethertype'
|
||||
attr_list = [const.SECURITYGROUP_ETHERTYPE_IPV4,
|
||||
const.SECURITYGROUP_ETHERTYPE_IPV6]
|
||||
else:
|
||||
attr_type = 'ip_version'
|
||||
attr_list = [const.IP_VERSION_4, const.IP_VERSION_6]
|
||||
eth_value = get_attr_value(rule, attr_type)
|
||||
if not eth_value:
|
||||
return False
|
||||
elif eth_value not in attr_list:
|
||||
raise exc.InvalidEthernetClassifier(eth_type=attr_type)
|
||||
return True
|
||||
|
||||
|
||||
def is_protocolclassifier_valid(rule, type):
|
||||
"""Check protocol in rule dict and validate dependent params"""
|
||||
protocol = get_attr_value(rule, 'protocol')
|
||||
if not protocol:
|
||||
return False
|
||||
if type == SG_RULE_TYPE:
|
||||
_validate_sg_ethertype_and_protocol(rule, protocol)
|
||||
else:
|
||||
_validate_fwr_protocol_parameters(rule, protocol)
|
||||
return True
|
||||
|
||||
|
||||
def is_ipclassifier_valid(rule, type):
|
||||
"""validate the ip address received in rule dict"""
|
||||
src_ip_version = dst_ip_version = None
|
||||
src_ip_address = dst_ip_address = None
|
||||
if type == SG_RULE_TYPE:
|
||||
dst_ip_address = get_attr_value(rule, 'remote_ip_prefix')
|
||||
attr_type = 'ethertype'
|
||||
else:
|
||||
src_ip_address = get_attr_value(rule, 'source_ip_address')
|
||||
dst_ip_address = get_attr_value(rule, 'destination_ip_address')
|
||||
attr_type = 'ip_version'
|
||||
if src_ip_address:
|
||||
src_ip_version = netaddr.IPNetwork(src_ip_address).version
|
||||
if dst_ip_address:
|
||||
dst_ip_version = netaddr.IPNetwork(dst_ip_address).version
|
||||
rule_ip_version = get_attr_value(rule, attr_type)
|
||||
if type == SG_RULE_TYPE:
|
||||
if rule_ip_version != "IPv%d" % dst_ip_version:
|
||||
raise exc.IpAddressConflict()
|
||||
elif ((src_ip_version and src_ip_version != rule_ip_version) or
|
||||
(dst_ip_version and dst_ip_version != rule_ip_version)):
|
||||
raise exc.IpAddressConflict()
|
||||
return True
|
||||
|
||||
|
||||
def is_directionclassifier_valid(rule, type):
|
||||
"""Check direction param in rule dict"""
|
||||
direction = get_attr_value(rule, 'direction')
|
||||
if not direction:
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def is_transportclassifier_valid(rule, type):
|
||||
"""Verify port range values"""
|
||||
if type == SG_RULE_TYPE:
|
||||
port_range_min = get_attr_value(rule, 'port_range_min')
|
||||
port_range_max = get_attr_value(rule, 'port_range_max')
|
||||
validate_port_range(port_range_min, port_range_max)
|
||||
else:
|
||||
source_port_range_min = get_attr_value(rule, 'source_port_range_min')
|
||||
source_port_range_max = get_attr_value(rule, 'source_port_range_max')
|
||||
destination_port_range_min = get_attr_value(
|
||||
rule, 'destination_port_range_min')
|
||||
destination_port_range_max = get_attr_value(
|
||||
rule, 'destination_port_range_max')
|
||||
validate_port_range(source_port_range_min, source_port_range_max)
|
||||
validate_port_range(destination_port_range_min,
|
||||
destination_port_range_max)
|
||||
return True
|
@ -12,6 +12,8 @@
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import copy as cp
|
||||
from neutron_classifier.db import api
|
||||
from neutron_classifier.db import models
|
||||
import sqlalchemy as sa
|
||||
@ -129,6 +131,54 @@ class DbApiTestCase(base.BaseTestCase):
|
||||
result['tenant_id'] = FAKE_SG_RULE_V6['tenant_id']
|
||||
self.assertEqual(FAKE_SG_RULE_V6, result)
|
||||
|
||||
def _test_convert_sg_rule_to_classifier_exception(self, sg_rule):
|
||||
try:
|
||||
self._test_convert_security_group_rule_to_classifier(sg_rule)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def test_convert_sg_rule_to_classifier_with_no_ethertype(self):
|
||||
FAKE_SG_RULE = cp.copy(FAKE_SG_RULE_V4)
|
||||
del FAKE_SG_RULE['ethertype']
|
||||
self._test_convert_sg_rule_to_classifier_exception(FAKE_SG_RULE)
|
||||
|
||||
# test case for invalid ip-version
|
||||
def test_convert_sg_rule_to_classifier_with_invalid_ethertype(self):
|
||||
FAKE_SG_RULE = cp.copy(FAKE_SG_RULE_V4)
|
||||
FAKE_SG_RULE['ethertype'] = 'IPvx'
|
||||
self._test_convert_sg_rule_to_classifier_exception(FAKE_SG_RULE)
|
||||
|
||||
# test case for protocol none
|
||||
def test_convert_sg_rule_to_classifier_with_None_protocol(self):
|
||||
FAKE_SG_RULE = cp.copy(FAKE_SG_RULE_V4)
|
||||
del FAKE_SG_RULE['protocol']
|
||||
self._test_convert_sg_rule_to_classifier_exception(FAKE_SG_RULE)
|
||||
|
||||
# can not allow icmpv6 protocol with IPv4 version
|
||||
def test_convert_sg_rule_to_classifier_with_icmpv6_protocol(self):
|
||||
FAKE_SG_RULE = cp.copy(FAKE_SG_RULE_V4)
|
||||
FAKE_SG_RULE['protocol'] = 'icmpv6'
|
||||
self._test_convert_sg_rule_to_classifier_exception(FAKE_SG_RULE)
|
||||
|
||||
# ip-version is 4 and remote ip as v6 address
|
||||
def test_convert_sg_rule_to_classifier_with_invalid_remote_ipv6(self):
|
||||
FAKE_SG_RULE = cp.copy(FAKE_SG_RULE_V4)
|
||||
FAKE_SG_RULE['remote_ip_prefix'] = 'fddf:cb3b:bc4::/48'
|
||||
self._test_convert_sg_rule_to_classifier_exception(FAKE_SG_RULE)
|
||||
|
||||
# ip-version is 6 and remote ip as v4 address
|
||||
def test_convert_sg_rule_to_classifier_with_invalid_dest_ipv4(self):
|
||||
FAKE_SG_RULE = cp.copy(FAKE_SG_RULE_V6)
|
||||
FAKE_SG_RULE['remote_ip_prefix'] = '1.2.3.4/24'
|
||||
self._test_convert_sg_rule_to_classifier_exception(FAKE_SG_RULE)
|
||||
|
||||
# invalid port-range
|
||||
def test_convert_sg_rule_to_classifier_with_invalid_port_range(self):
|
||||
FAKE_SG_RULE = cp.copy(FAKE_SG_RULE_V4)
|
||||
FAKE_SG_RULE['port_range_min'] = 200
|
||||
FAKE_SG_RULE['port_range_max'] = 10
|
||||
self._test_convert_sg_rule_to_classifier_exception(FAKE_SG_RULE)
|
||||
|
||||
# Firewall testcases
|
||||
def _test_convert_firewall_rule_to_classifier(self, fw_rule):
|
||||
cg = self._create_classifier_group('neutron-fwaas')
|
||||
@ -168,3 +218,48 @@ class DbApiTestCase(base.BaseTestCase):
|
||||
result['action'] = FAKE_FW_RULE_V6['action']
|
||||
result['enabled'] = FAKE_FW_RULE_V6['enabled']
|
||||
self.assertEqual(FAKE_FW_RULE_V6, result)
|
||||
|
||||
def _test_convert_firewall_rule_to_classifier_exception(self, fw_rule):
|
||||
try:
|
||||
self._test_convert_firewall_rule_to_classifier(fw_rule)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# test case for invalid ip-version
|
||||
def test_convert_firewall_rule_to_classifier_with_invalid_ip_version(self):
|
||||
FAKE_FW_RULE = cp.copy(FAKE_FW_RULE_V4)
|
||||
FAKE_FW_RULE['ip_version'] = 5
|
||||
self._test_convert_firewall_rule_to_classifier_exception(FAKE_FW_RULE)
|
||||
|
||||
# test case for protocol none
|
||||
def test_convert_firewall_rule_to_classifier_with_None_protocol(self):
|
||||
FAKE_FW_RULE = cp.copy(FAKE_FW_RULE_V4)
|
||||
del FAKE_FW_RULE['protocol']
|
||||
self._test_convert_firewall_rule_to_classifier_exception(FAKE_FW_RULE)
|
||||
|
||||
# icmp protocol with valid port range
|
||||
def test_convert_firewall_rule_to_classifier_with_icmp_protocol(self):
|
||||
FAKE_FW_RULE = cp.copy(FAKE_FW_RULE_V4)
|
||||
FAKE_FW_RULE['protocol'] = 'icmp'
|
||||
self._test_convert_firewall_rule_to_classifier_exception(FAKE_FW_RULE)
|
||||
|
||||
# ip-version is 4 and source ip as v6 address
|
||||
def test_convert_firewall_rule_to_classifier_with_invalid_source_ip(self):
|
||||
FAKE_FW_RULE = cp.copy(FAKE_FW_RULE_V4)
|
||||
FAKE_FW_RULE['source_ip_address'] = 'fddf:cb3b:bc4::/48'
|
||||
self._test_convert_firewall_rule_to_classifier_exception(FAKE_FW_RULE)
|
||||
|
||||
# ip-version is 6 and dest ip as v4 address
|
||||
def test_convert_firewall_rule_to_classifier_with_invalid_dest_ip(self):
|
||||
FAKE_FW_RULE = cp.copy(FAKE_FW_RULE_V6)
|
||||
FAKE_FW_RULE['destination_ip_address'] = '1.2.3.4/24'
|
||||
self._test_convert_firewall_rule_to_classifier_exception(FAKE_FW_RULE)
|
||||
|
||||
# invalid port-range
|
||||
def test_convert_firewall_rule_to_classifier_with_invalid_port_range(self):
|
||||
FAKE_FW_RULE = cp.copy(FAKE_FW_RULE_V4)
|
||||
FAKE_FW_RULE['source_port_range_min'] = 200
|
||||
FAKE_FW_RULE['source_port_range_max'] = 10
|
||||
FAKE_FW_RULE['destination_port_range_min'] = 100
|
||||
FAKE_FW_RULE['destination_port_range_max'] = 10
|
||||
self._test_convert_firewall_rule_to_classifier_exception(FAKE_FW_RULE)
|
||||
|
Loading…
x
Reference in New Issue
Block a user