ICMPv6 is not an available protocol when creating firewall rule

When creating IPv6 firewall rule, the network protocol that can be
selected is ICMP TCP UDP or null.
But in fact, ICMPv6 is the message control protocol we actually need
for the firewall rule whose ip-version = 6.

This patch fixes this bug with the following logic:
When creating firewall rule whose "ip-version = 6, protocol = ipv6-icmp"
, we should consider that the "icmp" refers to "ipv6-icmp".

Closes-Bug: #1799904
Change-Id: I27cff5ba9986f30fa4c7ddb12db920300edd521b
This commit is contained in:
quyue 2018-11-16 10:35:04 +08:00
parent f2c2a51a9f
commit fa48d16d69
2 changed files with 110 additions and 13 deletions

View File

@ -244,7 +244,7 @@ class IptablesFwaasDriver(fwaas_base_v2.FwaasDriverBase):
if not rule['enabled']:
continue
iptbl_rule = self._convert_fwaas_to_iptables_rule(rule)
if rule['ip_version'] == 4:
if rule['ip_version'] == constants.IP_VERSION_4:
ver = IPV4
table = ipt_mgr.ipv4['filter']
else:
@ -258,7 +258,7 @@ class IptablesFwaasDriver(fwaas_base_v2.FwaasDriverBase):
if not rule['enabled']:
continue
iptbl_rule = self._convert_fwaas_to_iptables_rule(rule)
if rule['ip_version'] == 4:
if rule['ip_version'] == constants.IP_VERSION_4:
ver = IPV4
table = ipt_mgr.ipv4['filter']
else:
@ -350,39 +350,45 @@ class IptablesFwaasDriver(fwaas_base_v2.FwaasDriverBase):
def _add_accepted_chain_v4v6(self, ipt_mgr):
v4rules_in_chain = \
ipt_mgr.get_chain("filter", ACCEPTED_CHAIN, ip_version=4)
ipt_mgr.get_chain("filter", ACCEPTED_CHAIN,
ip_version=constants.IP_VERSION_4)
if not v4rules_in_chain:
ipt_mgr.ipv4['filter'].add_chain(ACCEPTED_CHAIN)
ipt_mgr.ipv4['filter'].add_rule(ACCEPTED_CHAIN, '-j ACCEPT')
v6rules_in_chain = \
ipt_mgr.get_chain("filter", ACCEPTED_CHAIN, ip_version=6)
ipt_mgr.get_chain("filter", ACCEPTED_CHAIN,
ip_version=constants.IP_VERSION_6)
if not v6rules_in_chain:
ipt_mgr.ipv6['filter'].add_chain(ACCEPTED_CHAIN)
ipt_mgr.ipv6['filter'].add_rule(ACCEPTED_CHAIN, '-j ACCEPT')
def _add_dropped_chain_v4v6(self, ipt_mgr):
v4rules_in_chain = \
ipt_mgr.get_chain("filter", DROPPED_CHAIN, ip_version=4)
ipt_mgr.get_chain("filter", DROPPED_CHAIN,
ip_version=constants.IP_VERSION_4)
if not v4rules_in_chain:
ipt_mgr.ipv4['filter'].add_chain(DROPPED_CHAIN)
ipt_mgr.ipv4['filter'].add_rule(DROPPED_CHAIN, '-j DROP')
v6rules_in_chain = \
ipt_mgr.get_chain("filter", DROPPED_CHAIN, ip_version=6)
ipt_mgr.get_chain("filter", DROPPED_CHAIN,
ip_version=constants.IP_VERSION_6)
if not v6rules_in_chain:
ipt_mgr.ipv6['filter'].add_chain(DROPPED_CHAIN)
ipt_mgr.ipv6['filter'].add_rule(DROPPED_CHAIN, '-j DROP')
def _add_rejected_chain_v4v6(self, ipt_mgr):
v4rules_in_chain = \
ipt_mgr.get_chain("filter", REJECTED_CHAIN, ip_version=4)
ipt_mgr.get_chain("filter", REJECTED_CHAIN,
ip_version=constants.IP_VERSION_4)
if not v4rules_in_chain:
ipt_mgr.ipv4['filter'].add_chain(REJECTED_CHAIN)
ipt_mgr.ipv4['filter'].add_rule(REJECTED_CHAIN, '-j REJECT')
v6rules_in_chain = \
ipt_mgr.get_chain("filter", REJECTED_CHAIN, ip_version=6)
ipt_mgr.get_chain("filter", REJECTED_CHAIN,
ip_version=constants.IP_VERSION_6)
if not v6rules_in_chain:
ipt_mgr.ipv6['filter'].add_chain(REJECTED_CHAIN)
ipt_mgr.ipv6['filter'].add_rule(REJECTED_CHAIN, '-j REJECT')
@ -457,7 +463,8 @@ class IptablesFwaasDriver(fwaas_base_v2.FwaasDriverBase):
# and readding rules.
args = []
args += self._protocol_arg(rule.get('protocol'))
args += self._protocol_arg(rule.get('protocol'),
rule.get('ip_version'))
args += self._ip_prefix_arg('s', rule.get('source_ip_address'))
args += self._ip_prefix_arg('d', rule.get('destination_ip_address'))
@ -496,10 +503,14 @@ class IptablesFwaasDriver(fwaas_base_v2.FwaasDriverBase):
return args
def _protocol_arg(self, protocol):
def _protocol_arg(self, protocol, ip_version):
if not protocol:
return []
if (protocol == constants.PROTO_NAME_ICMP and
ip_version == constants.IP_VERSION_6):
protocol = constants.PROTO_NAME_IPV6_ICMP
args = ['-p', protocol]
return args
@ -508,15 +519,18 @@ class IptablesFwaasDriver(fwaas_base_v2.FwaasDriverBase):
if not protocol:
return []
protocol_modules = {'udp': 'udp', 'tcp': 'tcp',
'icmp': 'icmp', 'ipv6-icmp': 'icmp6'}
protocol_modules = {constants.PROTO_NAME_UDP: 'udp',
constants.PROTO_NAME_TCP: 'tcp',
constants.PROTO_NAME_ICMP: 'icmp',
constants.PROTO_NAME_IPV6_ICMP: 'icmp6'}
# iptables adds '-m protocol' when the port number is specified
args = ['-m', protocol_modules[protocol]]
return args
def _port_arg(self, direction, protocol, port):
if protocol not in ['udp', 'tcp'] or port is None:
if protocol not in [constants.PROTO_NAME_UDP,
constants.PROTO_NAME_TCP] or port is None:
return []
args = ['--%s' % direction, '%s' % port]

View File

@ -77,6 +77,23 @@ class IptablesFwaasTestCase(base.BaseTestCase):
rule_list.append(rule3)
return rule_list
def _fake_rules_v6(self, fwid, apply_list):
rule_list = []
rule1 = {'enabled': True,
'action': 'allow',
'ip_version': 6,
'protocol': 'icmp',
'destination_ip_address': '2001:db8::2',
'id': 'fake-fw-rule1'}
ingress_chain = ('iv6%s' % fwid)[:11]
egress_chain = ('ov6%s' % fwid)[:11]
for router_info_inst, port_ids in apply_list:
v6filter_inst = router_info_inst.iptables_manager.ipv6['filter']
v6filter_inst.chains.append(ingress_chain)
v6filter_inst.chains.append(egress_chain)
rule_list.append(rule1)
return rule_list
def _fake_firewall_no_rule(self):
rule_list = []
fw_inst = {'id': FAKE_FW_ID,
@ -201,6 +218,66 @@ class IptablesFwaasTestCase(base.BaseTestCase):
intf_name, binary_name)))
v4filter_inst.assert_has_calls(calls)
def _setup_firewall_with_rules_v6(self, func, router_count=1,
distributed=False, distributed_mode=None):
apply_list = self._fake_apply_list(router_count=router_count,
distributed=distributed, distributed_mode=distributed_mode)
rule_list = self._fake_rules_v6(FAKE_FW_ID, apply_list)
firewall = self._fake_firewall(rule_list)
if distributed:
if distributed_mode == 'dvr_snat':
if_prefix = 'sg-'
if distributed_mode == 'dvr':
if_prefix = 'rfp-'
else:
if_prefix = 'qr-'
distributed_mode = 'legacy'
func(distributed_mode, apply_list, firewall)
binary_name = fwaas.iptables_manager.binary_name
dropped = '%s-dropped' % binary_name
accepted = '%s-accepted' % binary_name
invalid_rule = '-m state --state INVALID -j %s' % dropped
est_rule = '-m state --state RELATED,ESTABLISHED -j ACCEPT'
rule1 = '-p ipv6-icmp -d 2001:db8::2/128 -j %s' % accepted
ingress_chain = 'iv6%s' % firewall['id']
egress_chain = 'ov6%s' % firewall['id']
ipt_mgr_ichain = '%s-%s' % (binary_name, ingress_chain[:11])
ipt_mgr_echain = '%s-%s' % (binary_name, egress_chain[:11])
for router_info_inst, port_ids in apply_list:
v6filter_inst = router_info_inst.iptables_manager.ipv6['filter']
calls = [mock.call.remove_chain('iv6fake-fw-uuid'),
mock.call.remove_chain('ov6fake-fw-uuid'),
mock.call.remove_chain('fwaas-default-policy'),
mock.call.add_chain('fwaas-default-policy'),
mock.call.add_rule(
'fwaas-default-policy', '-j %s' % dropped),
mock.call.add_chain(ingress_chain),
mock.call.add_rule(ingress_chain, invalid_rule),
mock.call.add_rule(ingress_chain, est_rule),
mock.call.add_chain(egress_chain),
mock.call.add_rule(egress_chain, invalid_rule),
mock.call.add_rule(egress_chain, est_rule),
mock.call.add_rule(ingress_chain, rule1),
mock.call.add_rule(egress_chain, rule1)
]
for port in FAKE_PORT_IDS:
intf_name = self._get_intf_name(if_prefix, port)
calls.append(mock.call.add_rule('FORWARD',
'-o %s -j %s' % (intf_name, ipt_mgr_ichain)))
for port in FAKE_PORT_IDS:
intf_name = self._get_intf_name(if_prefix, port)
calls.append(mock.call.add_rule('FORWARD',
'-i %s -j %s' % (intf_name, ipt_mgr_echain)))
for direction in ['o', 'i']:
for port_id in FAKE_PORT_IDS:
intf_name = self._get_intf_name(if_prefix, port_id)
calls.append(mock.call.add_rule('FORWARD',
'-%s %s -j %s-fwaas-defau' % (direction,
intf_name, binary_name)))
v6filter_inst.assert_has_calls(calls)
def test_create_firewall_group_no_rules(self):
apply_list = self._fake_apply_list()
first_ri = apply_list[0][0]
@ -244,6 +321,9 @@ class IptablesFwaasTestCase(base.BaseTestCase):
def test_create_firewall_group_with_rules(self):
self._setup_firewall_with_rules(self.firewall.create_firewall_group)
def test_create_firewall_group_with_rules_v6(self):
self._setup_firewall_with_rules_v6(self.firewall.create_firewall_group)
def test_create_firewall_group_with_rules_without_distributed_attr(self):
self._setup_firewall_with_rules(self.firewall.create_firewall_group,
distributed=None)
@ -255,6 +335,9 @@ class IptablesFwaasTestCase(base.BaseTestCase):
def test_update_firewall_group_with_rules(self):
self._setup_firewall_with_rules(self.firewall.update_firewall_group)
def test_update_firewall_group_with_rules_v6(self):
self._setup_firewall_with_rules_v6(self.firewall.update_firewall_group)
def test_update_firewall_group_with_rules_without_distributed_attr(self):
self._setup_firewall_with_rules(self.firewall.update_firewall_group,
distributed=None)