Add rate limiter to icmp handler(DNAT)

To support traceroute or its backend network functionality, some icmp
messages are generated or re-processed at local controller. This
provides the possiblity of DDoS attack.

To prevent that, several global rate limters are added. So that the
controller won't be overwhelmed. Global rate limters should be enough,
as icmp works as control message.

Change-Id: I82152afa49857f8fe8bb94e4f79ca1aba761ebeb
Partially-implements: blueprint traceroute-support
This commit is contained in:
Hong Hui Xiao 2017-02-09 18:54:43 +08:00
parent 6c4c47d76d
commit 749616388a
4 changed files with 177 additions and 25 deletions

View File

@ -27,6 +27,11 @@ df_dnat_app_opts = [
cfg.StrOpt('ex_peer_patch_port', default='patch-int',
help=_("Peer patch port in external bridge for integration "
"bridge.")),
cfg.IntOpt('dnat_ttl_invalid_max_rate', default=3,
help=_('Max rate to reply ICMP time exceeded message per '
'second.')),
cfg.IntOpt('dnat_icmp_error_max_rate', default=3,
help=_('Max rate to handle ICMP error message per second.')),
]

View File

@ -26,6 +26,7 @@ from ryu.lib.packet import packet
from ryu.ofproto import ether
from dragonflow._i18n import _LW
from dragonflow.common import utils as df_utils
from dragonflow import conf as cfg
from dragonflow.controller.common import arp_responder
from dragonflow.controller.common import constants as const
@ -51,12 +52,25 @@ class DNATApp(df_base_app.DFlowApp):
cfg.CONF.df_dnat_app.external_network_bridge
self.external_bridge_mac = ""
self.integration_bridge = cfg.CONF.df.integration_bridge
self.int_peer_patch_port = cfg.CONF.df_dnat_app.int_peer_patch_port
self.ex_peer_patch_port = cfg.CONF.df_dnat_app.ex_peer_patch_port
self.conf = cfg.CONF.df_dnat_app
self.int_peer_patch_port = self.conf.int_peer_patch_port
self.ex_peer_patch_port = self.conf.ex_peer_patch_port
self.external_networks = collections.defaultdict(int)
self.local_floatingips = collections.defaultdict(str)
# Map between fixed ip mac to floating ip
self.floatingip_rarp_cache = {}
self.egress_ttl_invalid_handler_rate_limit = df_utils.RateLimiter(
max_rate=self.conf.dnat_ttl_invalid_max_rate,
time_unit=1)
self.ingress_ttl_invalid_handler_rate_limit = df_utils.RateLimiter(
max_rate=self.conf.dnat_ttl_invalid_max_rate,
time_unit=1)
self.egress_icmp_error_rate_limit = df_utils.RateLimiter(
max_rate=self.conf.dnat_icmp_error_max_rate,
time_unit=1)
self.ingress_icmp_error_rate_limit = df_utils.RateLimiter(
max_rate=self.conf.dnat_icmp_error_max_rate,
time_unit=1)
self.api.register_table_handler(const.INGRESS_NAT_TABLE,
self.ingress_packet_in_handler)
self.api.register_table_handler(const.EGRESS_NAT_TABLE,
@ -76,12 +90,26 @@ class DNATApp(df_base_app.DFlowApp):
if msg.reason == ofproto.OFPR_INVALID_TTL:
LOG.debug("Get an invalid TTL packet at table %s",
const.INGRESS_NAT_TABLE)
if self.ingress_ttl_invalid_handler_rate_limit():
LOG.warning(_LW("Get more than %(rate)s TTL invalid "
"packets per second at table %(table)s"),
{'rate': self.conf.dnat_ttl_invalid_max_rate,
'table': const.INGRESS_NAT_TABLE})
return
icmp_ttl_pkt = icmp_error_generator.generate(
icmp.ICMP_TIME_EXCEEDED, icmp.ICMP_TTL_EXPIRED_CODE, msg.data)
in_port = msg.match.get('in_port')
self.send_packet(in_port, icmp_ttl_pkt)
return
if self.ingress_icmp_error_rate_limit():
LOG.warning(_LW("Get more than %(rate)s ICMP error messages "
"per second at table %(table)s"),
{'rate': self.conf.dnat_icmp_error_max_rate,
'table': const.INGRESS_NAT_TABLE})
return
pkt = packet.Packet(msg.data)
reply_pkt = self._revert_nat_for_icmp_embedded_packet(pkt, INGRESS)
out_port = msg.match.get('reg7')
@ -90,12 +118,19 @@ class DNATApp(df_base_app.DFlowApp):
def egress_packet_in_handler(self, event):
msg = event.msg
ofproto = self.ofproto
pkt = packet.Packet(msg.data)
e_pkt = pkt.get_protocol(ethernet.ethernet)
if msg.reason == ofproto.OFPR_INVALID_TTL:
LOG.debug("Get an invalid TTL packet at table %s",
const.EGRESS_NAT_TABLE)
if self.egress_ttl_invalid_handler_rate_limit():
LOG.warning(_LW("Get more than %(rate)s TTL invalid "
"packets per second at table %(table)s"),
{'rate': self.conf.dnat_ttl_invalid_max_rate,
'table': const.EGRESS_NAT_TABLE})
return
pkt = packet.Packet(msg.data)
e_pkt = pkt.get_protocol(ethernet.ethernet)
floatingip = self.floatingip_rarp_cache.get(e_pkt.src)
if floatingip:
icmp_ttl_pkt = icmp_error_generator.generate(
@ -109,6 +144,14 @@ class DNATApp(df_base_app.DFlowApp):
return
if self.external_bridge_mac:
if self.ingress_icmp_error_rate_limit():
LOG.warning(_LW("Get more than %(rate)s ICMP error messages "
"per second at table %(table)s"),
{'rate': self.conf.dnat_icmp_error_max_rate,
'table': const.INGRESS_NAT_TABLE})
return
pkt = packet.Packet(msg.data)
reply_pkt = self._revert_nat_for_icmp_embedded_packet(pkt, EGRESS)
self.send_packet(self.external_ofport, reply_pkt)

View File

@ -442,6 +442,12 @@ class Router(object):
self.router.close()
class TimeoutException(Exception):
def __init__(self):
super(TimeoutException, self).__init__('Timeout')
class Policy(object):
"""Represent a policy, i.e. the expected packets on each port in the
topology, and the actions to take in each case.
@ -513,7 +519,7 @@ class Policy(object):
:param timeout: After this many seconds, throw an exception
:type timeout: Number
"""
exception = Exception('Timeout')
exception = TimeoutException()
if timeout is not None:
entry_time = time.time()
for thread in self.threads:

View File

@ -572,16 +572,15 @@ class TestDHCPApp(test_base.DFTestBase):
)
)
policy.start(self.topology)
try:
policy.wait(const.DEFAULT_RESOURCE_READY_TIMEOUT)
except Exception:
# Since there is no dhcp response, we are expecting timeout
# exception here.
pass
finally:
policy.stop()
if len(policy.exceptions) > 0:
raise policy.exceptions[0]
# Since there is no dhcp response, we are expecting timeout
# exception here.
self.assertRaises(
app_testing_objects.TimeoutException,
policy.wait,
const.DEFAULT_RESOURCE_READY_TIMEOUT)
policy.stop()
if len(policy.exceptions) > 0:
raise policy.exceptions[0]
def _test_enable_dhcp(self):
# Create policy
@ -852,16 +851,15 @@ class TestL3App(test_base.DFTestBase):
)
)
policy.start(self.topology)
try:
policy.wait(const.DEFAULT_RESOURCE_READY_TIMEOUT)
except Exception:
# Since there is no OpenFlow in vswitch, we are expecting timeout
# exception here.
pass
finally:
policy.stop()
if len(policy.exceptions) > 0:
raise policy.exceptions[0]
# Since there is no OpenFlow in vswitch, we are expecting timeout
# exception here.
self.assertRaises(
app_testing_objects.TimeoutException,
policy.wait,
const.DEFAULT_RESOURCE_READY_TIMEOUT)
policy.stop()
if len(policy.exceptions) > 0:
raise policy.exceptions[0]
cmd[1] = "set-controller"
cmd.append(controller)
@ -1868,6 +1866,72 @@ class TestDNATApp(test_base.DFTestBase):
if len(policy.exceptions) > 0:
raise policy.exceptions[0]
def _create_rate_limit_port_policies(self, rate, icmp_filter):
ignore_action = app_testing_objects.IgnoreAction()
raise_action = app_testing_objects.RaiseAction("Unexpected packet")
# Disable port policy rule, so that any further packets will hit the
# default action, which is raise_action in this case.
count_action = app_testing_objects.CountAction(
rate, app_testing_objects.DisableRuleAction())
key = (self.subnet.subnet_id, self.port.port_id)
rules = [
app_testing_objects.PortPolicyRule(
# Detect ICMP, end simulation
icmp_filter(self._get_ip),
actions=[count_action]
),
app_testing_objects.PortPolicyRule(
# Ignore gratuitous ARP packets
app_testing_objects.RyuARPGratuitousFilter(),
actions=[ignore_action]
),
app_testing_objects.PortPolicyRule(
# Ignore IPv6 packets
app_testing_objects.RyuIPv6Filter(),
actions=[ignore_action]
),
]
policy = app_testing_objects.PortPolicy(
rules=rules,
default_action=raise_action
)
return {key: policy}
def test_ttl_packet_rate_limit(self):
initial_packet = self._create_packet(
self.topology.external_network.get_gw_ip(),
ryu.lib.packet.ipv4.inet.IPPROTO_ICMP,
ttl=1)
send_action = app_testing_objects.SendAction(
self.subnet.subnet_id,
self.port.port_id,
str(initial_packet))
ignore_action = app_testing_objects.IgnoreAction()
policy = self.store(
app_testing_objects.Policy(
initial_actions=[
send_action,
send_action,
send_action,
send_action,
],
port_policies=self._create_rate_limit_port_policies(
cfg.CONF.df_dnat_app.dnat_ttl_invalid_max_rate,
app_testing_objects.RyuICMPTimeExceedFilter),
unknown_port_action=ignore_action
)
)
policy.start(self.topology)
# Since the rate limit, we expect timeout to wait for 4th packet hit
# the policy.
self.assertRaises(
app_testing_objects.TimeoutException,
policy.wait,
const.DEFAULT_RESOURCE_READY_TIMEOUT)
if len(policy.exceptions) > 0:
raise policy.exceptions[0]
def test_nat_embedded_packet(self):
ignore_action = app_testing_objects.IgnoreAction()
self.port.port.update({"security_groups": []})
@ -1893,3 +1957,37 @@ class TestDNATApp(test_base.DFTestBase):
policy.wait(const.DEFAULT_RESOURCE_READY_TIMEOUT)
if len(policy.exceptions) > 0:
raise policy.exceptions[0]
def test_nat_embedded_rate_limit(self):
self.port.port.update({"security_groups": []})
initial_packet = self._create_packet(
self.topology.external_network.get_gw_ip(),
ryu.lib.packet.ipv4.inet.IPPROTO_UDP)
send_action = app_testing_objects.SendAction(
self.subnet.subnet_id,
self.port.port_id,
str(initial_packet))
ignore_action = app_testing_objects.IgnoreAction()
policy = self.store(
app_testing_objects.Policy(
initial_actions=[
send_action,
send_action,
send_action,
send_action,
],
port_policies=self._create_rate_limit_port_policies(
cfg.CONF.df_dnat_app.dnat_icmp_error_max_rate,
app_testing_objects.RyuICMPUnreachFilter),
unknown_port_action=ignore_action
)
)
policy.start(self.topology)
# Since the rate limit, we expect timeout to wait for 4th packet hit
# the policy.
self.assertRaises(
app_testing_objects.TimeoutException,
policy.wait,
const.DEFAULT_RESOURCE_READY_TIMEOUT)
if len(policy.exceptions) > 0:
raise policy.exceptions[0]