From 749616388a73b3d965500f305b487ba45cee4079 Mon Sep 17 00:00:00 2001
From: Hong Hui Xiao
Date: Thu, 9 Feb 2017 18:54:43 +0800
Subject: [PATCH] Add rate limiter to ICMP handler (DNAT)

To support traceroute and its underlying network functionality, some
ICMP messages are generated or re-processed at the local controller.
This opens the possibility of a DDoS attack. To prevent that, several
global rate limiters are added so that the controller won't be
overwhelmed. Global rate limiters should be enough, as ICMP is only
used as a control message.

Change-Id: I82152afa49857f8fe8bb94e4f79ca1aba761ebeb
Partially-implements: blueprint traceroute-support
---
 dragonflow/conf/df_dnat.py                   |   5 +
 dragonflow/controller/dnat_app.py            |  51 ++++++-
 .../tests/common/app_testing_objects.py      |   8 +-
 dragonflow/tests/fullstack/test_apps.py      | 138 +++++++++++++++---
 4 files changed, 177 insertions(+), 25 deletions(-)

diff --git a/dragonflow/conf/df_dnat.py b/dragonflow/conf/df_dnat.py
index 5799fc8ad..181ba2267 100644
--- a/dragonflow/conf/df_dnat.py
+++ b/dragonflow/conf/df_dnat.py
@@ -27,6 +27,11 @@ df_dnat_app_opts = [
     cfg.StrOpt('ex_peer_patch_port', default='patch-int',
                help=_("Peer patch port in external bridge for integration "
                       "bridge.")),
+    cfg.IntOpt('dnat_ttl_invalid_max_rate', default=3,
+               help=_('Max rate to reply ICMP time exceeded message per '
+                      'second.')),
+    cfg.IntOpt('dnat_icmp_error_max_rate', default=3,
+               help=_('Max rate to handle ICMP error message per second.')),
 ]
 
 
diff --git a/dragonflow/controller/dnat_app.py b/dragonflow/controller/dnat_app.py
index 69bda5b2f..1d7ada5cd 100644
--- a/dragonflow/controller/dnat_app.py
+++ b/dragonflow/controller/dnat_app.py
@@ -26,6 +26,7 @@ from ryu.lib.packet import packet
 from ryu.ofproto import ether
 
 from dragonflow._i18n import _LW
+from dragonflow.common import utils as df_utils
 from dragonflow import conf as cfg
 from dragonflow.controller.common import arp_responder
 from dragonflow.controller.common import constants as const
@@ -51,12 +52,25 @@ class DNATApp(df_base_app.DFlowApp):
             cfg.CONF.df_dnat_app.external_network_bridge
         self.external_bridge_mac = ""
         self.integration_bridge = cfg.CONF.df.integration_bridge
-        self.int_peer_patch_port = cfg.CONF.df_dnat_app.int_peer_patch_port
-        self.ex_peer_patch_port = cfg.CONF.df_dnat_app.ex_peer_patch_port
+        self.conf = cfg.CONF.df_dnat_app
+        self.int_peer_patch_port = self.conf.int_peer_patch_port
+        self.ex_peer_patch_port = self.conf.ex_peer_patch_port
         self.external_networks = collections.defaultdict(int)
         self.local_floatingips = collections.defaultdict(str)
         # Map between fixed ip mac to floating ip
         self.floatingip_rarp_cache = {}
+        self.egress_ttl_invalid_handler_rate_limit = df_utils.RateLimiter(
+            max_rate=self.conf.dnat_ttl_invalid_max_rate,
+            time_unit=1)
+        self.ingress_ttl_invalid_handler_rate_limit = df_utils.RateLimiter(
+            max_rate=self.conf.dnat_ttl_invalid_max_rate,
+            time_unit=1)
+        self.egress_icmp_error_rate_limit = df_utils.RateLimiter(
+            max_rate=self.conf.dnat_icmp_error_max_rate,
+            time_unit=1)
+        self.ingress_icmp_error_rate_limit = df_utils.RateLimiter(
+            max_rate=self.conf.dnat_icmp_error_max_rate,
+            time_unit=1)
         self.api.register_table_handler(const.INGRESS_NAT_TABLE,
                                         self.ingress_packet_in_handler)
         self.api.register_table_handler(const.EGRESS_NAT_TABLE,
@@ -76,12 +90,26 @@ class DNATApp(df_base_app.DFlowApp):
         if msg.reason == ofproto.OFPR_INVALID_TTL:
             LOG.debug("Get an invalid TTL packet at table %s",
                       const.INGRESS_NAT_TABLE)
+            if self.ingress_ttl_invalid_handler_rate_limit():
+                LOG.warning(_LW("Get more than %(rate)s TTL invalid "
+                                "packets per second at table %(table)s"),
+                            {'rate': self.conf.dnat_ttl_invalid_max_rate,
+                             'table': const.INGRESS_NAT_TABLE})
+                return
+
             icmp_ttl_pkt = icmp_error_generator.generate(
                 icmp.ICMP_TIME_EXCEEDED, icmp.ICMP_TTL_EXPIRED_CODE, msg.data)
             in_port = msg.match.get('in_port')
             self.send_packet(in_port, icmp_ttl_pkt)
             return
 
+        if self.ingress_icmp_error_rate_limit():
+            LOG.warning(_LW("Get more than %(rate)s ICMP error messages "
+                            "per second at table %(table)s"),
+                        {'rate': self.conf.dnat_icmp_error_max_rate,
+                         'table': const.INGRESS_NAT_TABLE})
+            return
+
         pkt = packet.Packet(msg.data)
         reply_pkt = self._revert_nat_for_icmp_embedded_packet(pkt, INGRESS)
         out_port = msg.match.get('reg7')
@@ -90,12 +118,19 @@ class DNATApp(df_base_app.DFlowApp):
     def egress_packet_in_handler(self, event):
         msg = event.msg
         ofproto = self.ofproto
-        pkt = packet.Packet(msg.data)
-        e_pkt = pkt.get_protocol(ethernet.ethernet)
 
         if msg.reason == ofproto.OFPR_INVALID_TTL:
             LOG.debug("Get an invalid TTL packet at table %s",
                       const.EGRESS_NAT_TABLE)
+            if self.egress_ttl_invalid_handler_rate_limit():
+                LOG.warning(_LW("Get more than %(rate)s TTL invalid "
+                                "packets per second at table %(table)s"),
+                            {'rate': self.conf.dnat_ttl_invalid_max_rate,
+                             'table': const.EGRESS_NAT_TABLE})
+                return
+
+            pkt = packet.Packet(msg.data)
+            e_pkt = pkt.get_protocol(ethernet.ethernet)
             floatingip = self.floatingip_rarp_cache.get(e_pkt.src)
             if floatingip:
                 icmp_ttl_pkt = icmp_error_generator.generate(
@@ -109,6 +144,14 @@
                 return
 
         if self.external_bridge_mac:
+            if self.ingress_icmp_error_rate_limit():
+                LOG.warning(_LW("Get more than %(rate)s ICMP error messages "
+                                "per second at table %(table)s"),
+                            {'rate': self.conf.dnat_icmp_error_max_rate,
+                             'table': const.INGRESS_NAT_TABLE})
+                return
+
+            pkt = packet.Packet(msg.data)
             reply_pkt = self._revert_nat_for_icmp_embedded_packet(pkt, EGRESS)
             self.send_packet(self.external_ofport, reply_pkt)
 
diff --git a/dragonflow/tests/common/app_testing_objects.py b/dragonflow/tests/common/app_testing_objects.py
index 825fca860..544b1c87e 100644
--- a/dragonflow/tests/common/app_testing_objects.py
+++ b/dragonflow/tests/common/app_testing_objects.py
@@ -442,6 +442,12 @@ class Router(object):
         self.router.close()
 
 
+class TimeoutException(Exception):
+
+    def __init__(self):
+        super(TimeoutException, self).__init__('Timeout')
+
+
 class Policy(object):
     """Represent a policy, i.e. the expected packets on each port in the
     topology, and the actions to take in each case.
@@ -513,7 +519,7 @@
         :param timeout: After this many seconds, throw an exception
         :type timeout: Number
         """
-        exception = Exception('Timeout')
+        exception = TimeoutException()
         if timeout is not None:
             entry_time = time.time()
             for thread in self.threads:
diff --git a/dragonflow/tests/fullstack/test_apps.py b/dragonflow/tests/fullstack/test_apps.py
index 43983e167..4ec0cb48b 100644
--- a/dragonflow/tests/fullstack/test_apps.py
+++ b/dragonflow/tests/fullstack/test_apps.py
@@ -572,16 +572,15 @@ class TestDHCPApp(test_base.DFTestBase):
             )
         )
         policy.start(self.topology)
-        try:
-            policy.wait(const.DEFAULT_RESOURCE_READY_TIMEOUT)
-        except Exception:
-            # Since there is no dhcp response, we are expecting timeout
-            # exception here.
-            pass
-        finally:
-            policy.stop()
-            if len(policy.exceptions) > 0:
-                raise policy.exceptions[0]
+        # Since there is no DHCP response, we are expecting a timeout
+        # exception here.
+        self.assertRaises(
+            app_testing_objects.TimeoutException,
+            policy.wait,
+            const.DEFAULT_RESOURCE_READY_TIMEOUT)
+        policy.stop()
+        if len(policy.exceptions) > 0:
+            raise policy.exceptions[0]
 
     def _test_enable_dhcp(self):
         # Create policy
@@ -852,16 +851,15 @@ class TestL3App(test_base.DFTestBase):
             )
         )
         policy.start(self.topology)
-        try:
-            policy.wait(const.DEFAULT_RESOURCE_READY_TIMEOUT)
-        except Exception:
-            # Since there is no OpenFlow in vswitch, we are expecting timeout
-            # exception here.
-            pass
-        finally:
-            policy.stop()
-            if len(policy.exceptions) > 0:
-                raise policy.exceptions[0]
+        # Since there is no OpenFlow in the vswitch, we are expecting a
+        # timeout exception here.
+        self.assertRaises(
+            app_testing_objects.TimeoutException,
+            policy.wait,
+            const.DEFAULT_RESOURCE_READY_TIMEOUT)
+        policy.stop()
+        if len(policy.exceptions) > 0:
+            raise policy.exceptions[0]
 
         cmd[1] = "set-controller"
         cmd.append(controller)
@@ -1868,6 +1866,72 @@ class TestDNATApp(test_base.DFTestBase):
         if len(policy.exceptions) > 0:
             raise policy.exceptions[0]
 
+    def _create_rate_limit_port_policies(self, rate, icmp_filter):
+        ignore_action = app_testing_objects.IgnoreAction()
+        raise_action = app_testing_objects.RaiseAction("Unexpected packet")
+        # Disable the port policy rule, so that any further packets will hit
+        # the default action, which is raise_action in this case.
+        count_action = app_testing_objects.CountAction(
+            rate, app_testing_objects.DisableRuleAction())
+
+        key = (self.subnet.subnet_id, self.port.port_id)
+        rules = [
+            app_testing_objects.PortPolicyRule(
+                # Detect ICMP, end simulation
+                icmp_filter(self._get_ip),
+                actions=[count_action]
+            ),
+            app_testing_objects.PortPolicyRule(
+                # Ignore gratuitous ARP packets
+                app_testing_objects.RyuARPGratuitousFilter(),
+                actions=[ignore_action]
+            ),
+            app_testing_objects.PortPolicyRule(
+                # Ignore IPv6 packets
+                app_testing_objects.RyuIPv6Filter(),
+                actions=[ignore_action]
+            ),
+        ]
+        policy = app_testing_objects.PortPolicy(
+            rules=rules,
+            default_action=raise_action
+        )
+        return {key: policy}
+
+    def test_ttl_packet_rate_limit(self):
+        initial_packet = self._create_packet(
+            self.topology.external_network.get_gw_ip(),
+            ryu.lib.packet.ipv4.inet.IPPROTO_ICMP,
+            ttl=1)
+        send_action = app_testing_objects.SendAction(
+            self.subnet.subnet_id,
+            self.port.port_id,
+            str(initial_packet))
+        ignore_action = app_testing_objects.IgnoreAction()
+        policy = self.store(
+            app_testing_objects.Policy(
+                initial_actions=[
+                    send_action,
+                    send_action,
+                    send_action,
+                    send_action,
+                ],
+                port_policies=self._create_rate_limit_port_policies(
+                    cfg.CONF.df_dnat_app.dnat_ttl_invalid_max_rate,
+                    app_testing_objects.RyuICMPTimeExceedFilter),
+                unknown_port_action=ignore_action
+            )
+        )
+        policy.start(self.topology)
+        # Because of the rate limit, we expect a timeout while waiting for
+        # the 4th packet to hit the policy.
+        self.assertRaises(
+            app_testing_objects.TimeoutException,
+            policy.wait,
+            const.DEFAULT_RESOURCE_READY_TIMEOUT)
+        if len(policy.exceptions) > 0:
+            raise policy.exceptions[0]
+
     def test_nat_embedded_packet(self):
         ignore_action = app_testing_objects.IgnoreAction()
         self.port.port.update({"security_groups": []})
@@ -1893,3 +1957,37 @@
         policy.wait(const.DEFAULT_RESOURCE_READY_TIMEOUT)
         if len(policy.exceptions) > 0:
             raise policy.exceptions[0]
+
+    def test_nat_embedded_rate_limit(self):
+        self.port.port.update({"security_groups": []})
+        initial_packet = self._create_packet(
+            self.topology.external_network.get_gw_ip(),
+            ryu.lib.packet.ipv4.inet.IPPROTO_UDP)
+        send_action = app_testing_objects.SendAction(
+            self.subnet.subnet_id,
+            self.port.port_id,
+            str(initial_packet))
+        ignore_action = app_testing_objects.IgnoreAction()
+        policy = self.store(
+            app_testing_objects.Policy(
+                initial_actions=[
+                    send_action,
+                    send_action,
+                    send_action,
+                    send_action,
+                ],
+                port_policies=self._create_rate_limit_port_policies(
+                    cfg.CONF.df_dnat_app.dnat_icmp_error_max_rate,
+                    app_testing_objects.RyuICMPUnreachFilter),
+                unknown_port_action=ignore_action
+            )
+        )
+        policy.start(self.topology)
+        # Because of the rate limit, we expect a timeout while waiting for
+        # the 4th packet to hit the policy.
+        self.assertRaises(
+            app_testing_objects.TimeoutException,
+            policy.wait,
+            const.DEFAULT_RESOURCE_READY_TIMEOUT)
+        if len(policy.exceptions) > 0:
+            raise policy.exceptions[0]
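
Note (not part of the patch): the handlers above depend on df_utils.RateLimiter
from dragonflow.common.utils, whose implementation is not shown in this diff.
From the call sites, the limiter is constructed with max_rate and time_unit and
is invoked as a callable; a truthy return means "the rate was exceeded, drop
this packet". The snippet below is only an illustrative sketch of that observed
interface under a simple fixed-window assumption; it is not dragonflow's actual
RateLimiter code.

    import time


    class RateLimiter(object):
        """Illustrative fixed-window limiter matching the call sites above.

        Assumption-based sketch, not dragonflow's df_utils.RateLimiter.
        Calling the instance returns True once more than max_rate calls
        have been seen within one time_unit window.
        """

        def __init__(self, max_rate, time_unit):
            self.max_rate = max_rate
            self.time_unit = time_unit
            self._window_start = time.time()
            self._count = 0

        def __call__(self):
            now = time.time()
            if now - self._window_start >= self.time_unit:
                # Start a new counting window.
                self._window_start = now
                self._count = 0
            self._count += 1
            return self._count > self.max_rate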
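
With the defaults added in df_dnat.py (dnat_ttl_invalid_max_rate = 3 and
dnat_icmp_error_max_rate = 3, combined with time_unit=1 in dnat_app.py), the
guard-and-return pattern in the handlers behaves the way the fullstack tests
expect: three packets within a one-second window are processed and the fourth
is dropped. A quick demonstration against the sketch above (again, just an
illustration, not the real implementation):

    limiter = RateLimiter(max_rate=3, time_unit=1)
    for i in range(1, 5):
        if limiter():
            print('packet %d dropped, rate exceeded' % i)
        else:
            print('packet %d handled' % i)
    # Expected: packets 1-3 handled, packet 4 dropped. This is why the tests
    # send four packets and assert a TimeoutException while waiting for the
    # 4th reply to show up at the port.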