Merge "Add rate limiter to icmp handler(DNAT)"
This commit is contained in:
commit
de74522e80
@ -27,6 +27,11 @@ df_dnat_app_opts = [
|
||||
cfg.StrOpt('ex_peer_patch_port', default='patch-int',
|
||||
help=_("Peer patch port in external bridge for integration "
|
||||
"bridge.")),
|
||||
cfg.IntOpt('dnat_ttl_invalid_max_rate', default=3,
|
||||
help=_('Max rate to reply ICMP time exceeded message per '
|
||||
'second.')),
|
||||
cfg.IntOpt('dnat_icmp_error_max_rate', default=3,
|
||||
help=_('Max rate to handle ICMP error message per second.')),
|
||||
]
|
||||
|
||||
|
||||
|
@ -26,6 +26,7 @@ from ryu.lib.packet import packet
|
||||
from ryu.ofproto import ether
|
||||
|
||||
from dragonflow._i18n import _LW
|
||||
from dragonflow.common import utils as df_utils
|
||||
from dragonflow import conf as cfg
|
||||
from dragonflow.controller.common import arp_responder
|
||||
from dragonflow.controller.common import constants as const
|
||||
@ -51,12 +52,25 @@ class DNATApp(df_base_app.DFlowApp):
|
||||
cfg.CONF.df_dnat_app.external_network_bridge
|
||||
self.external_bridge_mac = ""
|
||||
self.integration_bridge = cfg.CONF.df.integration_bridge
|
||||
self.int_peer_patch_port = cfg.CONF.df_dnat_app.int_peer_patch_port
|
||||
self.ex_peer_patch_port = cfg.CONF.df_dnat_app.ex_peer_patch_port
|
||||
self.conf = cfg.CONF.df_dnat_app
|
||||
self.int_peer_patch_port = self.conf.int_peer_patch_port
|
||||
self.ex_peer_patch_port = self.conf.ex_peer_patch_port
|
||||
self.external_networks = collections.defaultdict(int)
|
||||
self.local_floatingips = collections.defaultdict(str)
|
||||
# Map between fixed ip mac to floating ip
|
||||
self.floatingip_rarp_cache = {}
|
||||
self.egress_ttl_invalid_handler_rate_limit = df_utils.RateLimiter(
|
||||
max_rate=self.conf.dnat_ttl_invalid_max_rate,
|
||||
time_unit=1)
|
||||
self.ingress_ttl_invalid_handler_rate_limit = df_utils.RateLimiter(
|
||||
max_rate=self.conf.dnat_ttl_invalid_max_rate,
|
||||
time_unit=1)
|
||||
self.egress_icmp_error_rate_limit = df_utils.RateLimiter(
|
||||
max_rate=self.conf.dnat_icmp_error_max_rate,
|
||||
time_unit=1)
|
||||
self.ingress_icmp_error_rate_limit = df_utils.RateLimiter(
|
||||
max_rate=self.conf.dnat_icmp_error_max_rate,
|
||||
time_unit=1)
|
||||
self.api.register_table_handler(const.INGRESS_NAT_TABLE,
|
||||
self.ingress_packet_in_handler)
|
||||
self.api.register_table_handler(const.EGRESS_NAT_TABLE,
|
||||
@ -76,12 +90,26 @@ class DNATApp(df_base_app.DFlowApp):
|
||||
if msg.reason == ofproto.OFPR_INVALID_TTL:
|
||||
LOG.debug("Get an invalid TTL packet at table %s",
|
||||
const.INGRESS_NAT_TABLE)
|
||||
if self.ingress_ttl_invalid_handler_rate_limit():
|
||||
LOG.warning(_LW("Get more than %(rate)s TTL invalid "
|
||||
"packets per second at table %(table)s"),
|
||||
{'rate': self.conf.dnat_ttl_invalid_max_rate,
|
||||
'table': const.INGRESS_NAT_TABLE})
|
||||
return
|
||||
|
||||
icmp_ttl_pkt = icmp_error_generator.generate(
|
||||
icmp.ICMP_TIME_EXCEEDED, icmp.ICMP_TTL_EXPIRED_CODE, msg.data)
|
||||
in_port = msg.match.get('in_port')
|
||||
self.send_packet(in_port, icmp_ttl_pkt)
|
||||
return
|
||||
|
||||
if self.ingress_icmp_error_rate_limit():
|
||||
LOG.warning(_LW("Get more than %(rate)s ICMP error messages "
|
||||
"per second at table %(table)s"),
|
||||
{'rate': self.conf.dnat_icmp_error_max_rate,
|
||||
'table': const.INGRESS_NAT_TABLE})
|
||||
return
|
||||
|
||||
pkt = packet.Packet(msg.data)
|
||||
reply_pkt = self._revert_nat_for_icmp_embedded_packet(pkt, INGRESS)
|
||||
out_port = msg.match.get('reg7')
|
||||
@ -90,12 +118,19 @@ class DNATApp(df_base_app.DFlowApp):
|
||||
def egress_packet_in_handler(self, event):
|
||||
msg = event.msg
|
||||
ofproto = self.ofproto
|
||||
pkt = packet.Packet(msg.data)
|
||||
e_pkt = pkt.get_protocol(ethernet.ethernet)
|
||||
|
||||
if msg.reason == ofproto.OFPR_INVALID_TTL:
|
||||
LOG.debug("Get an invalid TTL packet at table %s",
|
||||
const.EGRESS_NAT_TABLE)
|
||||
if self.egress_ttl_invalid_handler_rate_limit():
|
||||
LOG.warning(_LW("Get more than %(rate)s TTL invalid "
|
||||
"packets per second at table %(table)s"),
|
||||
{'rate': self.conf.dnat_ttl_invalid_max_rate,
|
||||
'table': const.EGRESS_NAT_TABLE})
|
||||
return
|
||||
|
||||
pkt = packet.Packet(msg.data)
|
||||
e_pkt = pkt.get_protocol(ethernet.ethernet)
|
||||
floatingip = self.floatingip_rarp_cache.get(e_pkt.src)
|
||||
if floatingip:
|
||||
icmp_ttl_pkt = icmp_error_generator.generate(
|
||||
@ -109,6 +144,14 @@ class DNATApp(df_base_app.DFlowApp):
|
||||
return
|
||||
|
||||
if self.external_bridge_mac:
|
||||
if self.ingress_icmp_error_rate_limit():
|
||||
LOG.warning(_LW("Get more than %(rate)s ICMP error messages "
|
||||
"per second at table %(table)s"),
|
||||
{'rate': self.conf.dnat_icmp_error_max_rate,
|
||||
'table': const.INGRESS_NAT_TABLE})
|
||||
return
|
||||
|
||||
pkt = packet.Packet(msg.data)
|
||||
reply_pkt = self._revert_nat_for_icmp_embedded_packet(pkt, EGRESS)
|
||||
self.send_packet(self.external_ofport, reply_pkt)
|
||||
|
||||
|
@ -442,6 +442,12 @@ class Router(object):
|
||||
self.router.close()
|
||||
|
||||
|
||||
class TimeoutException(Exception):
|
||||
|
||||
def __init__(self):
|
||||
super(TimeoutException, self).__init__('Timeout')
|
||||
|
||||
|
||||
class Policy(object):
|
||||
"""Represent a policy, i.e. the expected packets on each port in the
|
||||
topology, and the actions to take in each case.
|
||||
@ -513,7 +519,7 @@ class Policy(object):
|
||||
:param timeout: After this many seconds, throw an exception
|
||||
:type timeout: Number
|
||||
"""
|
||||
exception = Exception('Timeout')
|
||||
exception = TimeoutException()
|
||||
if timeout is not None:
|
||||
entry_time = time.time()
|
||||
for thread in self.threads:
|
||||
|
@ -572,16 +572,15 @@ class TestDHCPApp(test_base.DFTestBase):
|
||||
)
|
||||
)
|
||||
policy.start(self.topology)
|
||||
try:
|
||||
policy.wait(const.DEFAULT_RESOURCE_READY_TIMEOUT)
|
||||
except Exception:
|
||||
# Since there is no dhcp response, we are expecting timeout
|
||||
# exception here.
|
||||
pass
|
||||
finally:
|
||||
policy.stop()
|
||||
if len(policy.exceptions) > 0:
|
||||
raise policy.exceptions[0]
|
||||
# Since there is no dhcp response, we are expecting timeout
|
||||
# exception here.
|
||||
self.assertRaises(
|
||||
app_testing_objects.TimeoutException,
|
||||
policy.wait,
|
||||
const.DEFAULT_RESOURCE_READY_TIMEOUT)
|
||||
policy.stop()
|
||||
if len(policy.exceptions) > 0:
|
||||
raise policy.exceptions[0]
|
||||
|
||||
def _test_enable_dhcp(self):
|
||||
# Create policy
|
||||
@ -852,16 +851,15 @@ class TestL3App(test_base.DFTestBase):
|
||||
)
|
||||
)
|
||||
policy.start(self.topology)
|
||||
try:
|
||||
policy.wait(const.DEFAULT_RESOURCE_READY_TIMEOUT)
|
||||
except Exception:
|
||||
# Since there is no OpenFlow in vswitch, we are expecting timeout
|
||||
# exception here.
|
||||
pass
|
||||
finally:
|
||||
policy.stop()
|
||||
if len(policy.exceptions) > 0:
|
||||
raise policy.exceptions[0]
|
||||
# Since there is no OpenFlow in vswitch, we are expecting timeout
|
||||
# exception here.
|
||||
self.assertRaises(
|
||||
app_testing_objects.TimeoutException,
|
||||
policy.wait,
|
||||
const.DEFAULT_RESOURCE_READY_TIMEOUT)
|
||||
policy.stop()
|
||||
if len(policy.exceptions) > 0:
|
||||
raise policy.exceptions[0]
|
||||
|
||||
cmd[1] = "set-controller"
|
||||
cmd.append(controller)
|
||||
@ -1868,6 +1866,72 @@ class TestDNATApp(test_base.DFTestBase):
|
||||
if len(policy.exceptions) > 0:
|
||||
raise policy.exceptions[0]
|
||||
|
||||
def _create_rate_limit_port_policies(self, rate, icmp_filter):
|
||||
ignore_action = app_testing_objects.IgnoreAction()
|
||||
raise_action = app_testing_objects.RaiseAction("Unexpected packet")
|
||||
# Disable port policy rule, so that any further packets will hit the
|
||||
# default action, which is raise_action in this case.
|
||||
count_action = app_testing_objects.CountAction(
|
||||
rate, app_testing_objects.DisableRuleAction())
|
||||
|
||||
key = (self.subnet.subnet_id, self.port.port_id)
|
||||
rules = [
|
||||
app_testing_objects.PortPolicyRule(
|
||||
# Detect ICMP, end simulation
|
||||
icmp_filter(self._get_ip),
|
||||
actions=[count_action]
|
||||
),
|
||||
app_testing_objects.PortPolicyRule(
|
||||
# Ignore gratuitous ARP packets
|
||||
app_testing_objects.RyuARPGratuitousFilter(),
|
||||
actions=[ignore_action]
|
||||
),
|
||||
app_testing_objects.PortPolicyRule(
|
||||
# Ignore IPv6 packets
|
||||
app_testing_objects.RyuIPv6Filter(),
|
||||
actions=[ignore_action]
|
||||
),
|
||||
]
|
||||
policy = app_testing_objects.PortPolicy(
|
||||
rules=rules,
|
||||
default_action=raise_action
|
||||
)
|
||||
return {key: policy}
|
||||
|
||||
def test_ttl_packet_rate_limit(self):
|
||||
initial_packet = self._create_packet(
|
||||
self.topology.external_network.get_gw_ip(),
|
||||
ryu.lib.packet.ipv4.inet.IPPROTO_ICMP,
|
||||
ttl=1)
|
||||
send_action = app_testing_objects.SendAction(
|
||||
self.subnet.subnet_id,
|
||||
self.port.port_id,
|
||||
str(initial_packet))
|
||||
ignore_action = app_testing_objects.IgnoreAction()
|
||||
policy = self.store(
|
||||
app_testing_objects.Policy(
|
||||
initial_actions=[
|
||||
send_action,
|
||||
send_action,
|
||||
send_action,
|
||||
send_action,
|
||||
],
|
||||
port_policies=self._create_rate_limit_port_policies(
|
||||
cfg.CONF.df_dnat_app.dnat_ttl_invalid_max_rate,
|
||||
app_testing_objects.RyuICMPTimeExceedFilter),
|
||||
unknown_port_action=ignore_action
|
||||
)
|
||||
)
|
||||
policy.start(self.topology)
|
||||
# Since the rate limit, we expect timeout to wait for 4th packet hit
|
||||
# the policy.
|
||||
self.assertRaises(
|
||||
app_testing_objects.TimeoutException,
|
||||
policy.wait,
|
||||
const.DEFAULT_RESOURCE_READY_TIMEOUT)
|
||||
if len(policy.exceptions) > 0:
|
||||
raise policy.exceptions[0]
|
||||
|
||||
def test_nat_embedded_packet(self):
|
||||
ignore_action = app_testing_objects.IgnoreAction()
|
||||
self.port.port.update({"security_groups": []})
|
||||
@ -1893,3 +1957,37 @@ class TestDNATApp(test_base.DFTestBase):
|
||||
policy.wait(const.DEFAULT_RESOURCE_READY_TIMEOUT)
|
||||
if len(policy.exceptions) > 0:
|
||||
raise policy.exceptions[0]
|
||||
|
||||
def test_nat_embedded_rate_limit(self):
|
||||
self.port.port.update({"security_groups": []})
|
||||
initial_packet = self._create_packet(
|
||||
self.topology.external_network.get_gw_ip(),
|
||||
ryu.lib.packet.ipv4.inet.IPPROTO_UDP)
|
||||
send_action = app_testing_objects.SendAction(
|
||||
self.subnet.subnet_id,
|
||||
self.port.port_id,
|
||||
str(initial_packet))
|
||||
ignore_action = app_testing_objects.IgnoreAction()
|
||||
policy = self.store(
|
||||
app_testing_objects.Policy(
|
||||
initial_actions=[
|
||||
send_action,
|
||||
send_action,
|
||||
send_action,
|
||||
send_action,
|
||||
],
|
||||
port_policies=self._create_rate_limit_port_policies(
|
||||
cfg.CONF.df_dnat_app.dnat_icmp_error_max_rate,
|
||||
app_testing_objects.RyuICMPUnreachFilter),
|
||||
unknown_port_action=ignore_action
|
||||
)
|
||||
)
|
||||
policy.start(self.topology)
|
||||
# Since the rate limit, we expect timeout to wait for 4th packet hit
|
||||
# the policy.
|
||||
self.assertRaises(
|
||||
app_testing_objects.TimeoutException,
|
||||
policy.wait,
|
||||
const.DEFAULT_RESOURCE_READY_TIMEOUT)
|
||||
if len(policy.exceptions) > 0:
|
||||
raise policy.exceptions[0]
|
||||
|
Loading…
Reference in New Issue
Block a user