From f40128b437e58e39efc9f02af7135c02f7826b42 Mon Sep 17 00:00:00 2001 From: LIU Yulong Date: Wed, 5 Apr 2017 09:38:55 +0800 Subject: [PATCH] [L3][QoS] Adding L3 rate limit TC lib This is the TC lib utils for L3 IP QoS implementation. For more detail please see [1]: L3 agent side TC rules. [1] https://review.openstack.org/#/c/374506/ Partially-Implements blueprint: floating-ip-rate-limit Related-Bug: #1596611 Change-Id: Icfec83ca6dc31d7283d9c6c6ef0997d5e60daae6 --- etc/neutron/rootwrap.d/l3.filters | 9 + neutron/agent/linux/l3_tc_lib.py | 194 +++++++++ neutron/common/exceptions.py | 13 + .../functional/agent/linux/test_l3_tc_lib.py | 152 +++++++ .../tests/unit/agent/linux/test_l3_tc_lib.py | 396 ++++++++++++++++++ 5 files changed, 764 insertions(+) create mode 100644 neutron/agent/linux/l3_tc_lib.py create mode 100644 neutron/tests/functional/agent/linux/test_l3_tc_lib.py create mode 100644 neutron/tests/unit/agent/linux/test_l3_tc_lib.py diff --git a/etc/neutron/rootwrap.d/l3.filters b/etc/neutron/rootwrap.d/l3.filters index a0a86e600c4..ea18b1ca43d 100644 --- a/etc/neutron/rootwrap.d/l3.filters +++ b/etc/neutron/rootwrap.d/l3.filters @@ -34,6 +34,15 @@ ip: IpFilter, ip, root find: RegExpFilter, find, root, find, /sys/class/net, -maxdepth, 1, -type, l, -printf, %.* ip_exec: IpNetnsExecFilter, ip, root +# l3_tc_lib +l3_tc_show_qdisc: RegExpFilter, tc, root, tc, qdisc, show, dev, .+ +l3_tc_add_qdisc_ingress: RegExpFilter, tc, root, tc, qdisc, add, dev, .+, ingress +l3_tc_add_qdisc_egress: RegExpFilter, tc, root, tc, qdisc, add, dev, .+, root, handle, 1:, htb +l3_tc_show_filters: RegExpFilter, tc, root, tc, -p, -s, -d, filter, show, dev, .+, parent, .+, prio, 1 +l3_tc_delete_filters: RegExpFilter, tc, root, tc, filter, del, dev, .+, parent, .+, prio, 1, handle, .+, u32 +l3_tc_add_filter_ingress: RegExpFilter, tc, root, tc, filter, add, dev, .+, parent, .+, protocol, ip, prio, 1, u32, match, ip, dst, .+, police, rate, .+, burst, .+, drop, flowid, :1 +l3_tc_add_filter_egress: RegExpFilter, tc, root, tc, filter, add, dev, .+, parent, .+, protocol, ip, prio, 1, u32, match, ip, src, .+, police, rate, .+, burst, .+, drop, flowid, :1 + # For ip monitor kill_ip_monitor: KillFilter, root, ip, -9 diff --git a/neutron/agent/linux/l3_tc_lib.py b/neutron/agent/linux/l3_tc_lib.py new file mode 100644 index 00000000000..19a15c2ba9c --- /dev/null +++ b/neutron/agent/linux/l3_tc_lib.py @@ -0,0 +1,194 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import re + +from neutron_lib import constants +from oslo_log import log as logging + +from neutron.agent.linux import ip_lib +from neutron.agent.linux import tc_lib +from neutron.common import exceptions + +LOG = logging.getLogger(__name__) + +QDISC_IN_REGEX = re.compile(r"qdisc ingress (\w+:) *") +QDISC_OUT_REGEX = re.compile(r"qdisc htb (\w+:) *") +FILTER_ID_REGEX = re.compile(r"filter protocol ip u32 fh (\w+::\w+) *") +FILTER_STATS_REGEX = re.compile(r"Sent (\w+) bytes (\w+) pkts *") + + +class FloatingIPTcCommandBase(ip_lib.IPDevice): + + def _execute_tc_cmd(self, cmd, **kwargs): + cmd = ['tc'] + cmd + ip_wrapper = ip_lib.IPWrapper(self.namespace) + return ip_wrapper.netns.execute(cmd, run_as_root=True, **kwargs) + + def _get_qdiscs(self): + cmd = ['qdisc', 'show', 'dev', self.name] + return self._execute_tc_cmd(cmd) + + def _get_qdisc_id_for_filter(self, direction): + qdisc_results = self._get_qdiscs().split('\n') + for qdisc in qdisc_results: + pattern = (QDISC_OUT_REGEX + if direction == constants.EGRESS_DIRECTION + else QDISC_IN_REGEX) + m = pattern.match(qdisc) + if m: + # No chance to get multiple qdiscs + return m.group(1) + + def _add_qdisc(self, direction): + if direction == constants.EGRESS_DIRECTION: + args = ['root', 'handle', '1:', 'htb'] + else: + args = ['ingress'] + cmd = ['qdisc', 'add', 'dev', self.name] + args + self._execute_tc_cmd(cmd) + + def _get_filters(self, qdisc_id): + cmd = ['-p', '-s', '-d', 'filter', 'show', 'dev', self.name, + 'parent', qdisc_id, 'prio', 1] + return self._execute_tc_cmd(cmd) + + def _get_filterid_for_ip(self, qdisc_id, ip): + filterids_for_ip = [] + filters_output = self._get_filters(qdisc_id) + if not filters_output: + raise exceptions.FilterIDForIPNotFound(ip=ip) + filter_lines = filters_output.split('\n') + for line in filter_lines: + line = line.strip() + m = FILTER_ID_REGEX.match(line) + if m: + filter_id = m.group(1) + # It matched, so ip/32 is not here. continue + continue + elif not line.startswith('match'): + continue + parts = line.split(" ") + if ip + '/32' in parts: + filterids_for_ip.append(filter_id) + if len(filterids_for_ip) > 1: + raise exceptions.MultipleFilterIDForIPFound(ip=ip) + elif len(filterids_for_ip) == 0: + raise exceptions.FilterIDForIPNotFound(ip=ip) + return filterids_for_ip[0] + + def _del_filter_by_id(self, qdisc_id, filter_id): + cmd = ['filter', 'del', 'dev', self.name, + 'parent', qdisc_id, + 'prio', 1, 'handle', filter_id, 'u32'] + self._execute_tc_cmd(cmd) + + def _get_qdisc_filters(self, qdisc_id): + filterids = [] + filters_output = self._get_filters(qdisc_id) + if not filters_output: + return filterids + filter_lines = filters_output.split('\n') + for line in filter_lines: + line = line.strip() + m = FILTER_ID_REGEX.match(line) + if m: + filter_id = m.group(1) + filterids.append(filter_id) + return filterids + + def _add_filter(self, qdisc_id, direction, ip, rate, burst): + rate_value = "%s%s" % (rate, tc_lib.BW_LIMIT_UNIT) + burst_value = "%s%s" % ( + tc_lib.TcCommand.get_ingress_qdisc_burst_value(rate, burst), + tc_lib.BURST_UNIT + ) + protocol = ['protocol', 'ip'] + prio = ['prio', 1] + _match = 'src' if direction == constants.EGRESS_DIRECTION else 'dst' + match = ['u32', 'match', 'ip', _match, ip] + police = ['police', 'rate', rate_value, 'burst', burst_value, + 'drop', 'flowid', ':1'] + args = protocol + prio + match + police + cmd = ['filter', 'add', 'dev', self.name, + 'parent', qdisc_id] + args + self._execute_tc_cmd(cmd) + + def _get_or_create_qdisc(self, direction): + qdisc_id = self._get_qdisc_id_for_filter(direction) + if not qdisc_id: + self._add_qdisc(direction) + qdisc_id = self._get_qdisc_id_for_filter(direction) + if not qdisc_id: + raise exceptions.FailedToAddQdiscToDevice(direction=direction, + device=self.name) + return qdisc_id + + +class FloatingIPTcCommand(FloatingIPTcCommandBase): + + def clear_all_filters(self, direction): + qdisc_id = self._get_qdisc_id_for_filter(direction) + if not qdisc_id: + return + filterids = self._get_qdisc_filters(qdisc_id) + for filter_id in filterids: + self._del_filter_by_id(qdisc_id, filter_id) + + def get_filter_id_for_ip(self, direction, ip): + qdisc_id = self._get_qdisc_id_for_filter(direction) + if not qdisc_id: + return + return self._get_filterid_for_ip(qdisc_id, ip) + + def get_existing_filter_ids(self, direction): + qdisc_id = self._get_qdisc_id_for_filter(direction) + if not qdisc_id: + return + return self._get_qdisc_filters(qdisc_id) + + def delete_filter_ids(self, direction, filterids): + qdisc_id = self._get_qdisc_id_for_filter(direction) + if not qdisc_id: + return + for filter_id in filterids: + self._del_filter_by_id(qdisc_id, filter_id) + + def set_ip_rate_limit(self, direction, ip, rate, burst): + qdisc_id = self._get_or_create_qdisc(direction) + try: + filter_id = self._get_filterid_for_ip(qdisc_id, ip) + LOG.debug("Filter %(filter)s for IP %(ip)s in %(direction)s " + "qdisc already existed, removing.", + {'filter': filter_id, + 'ip': ip, + 'direction': direction}) + self._del_filter_by_id(qdisc_id, filter_id) + except exceptions.FilterIDForIPNotFound: + pass + LOG.debug("Adding filter for IP %(ip)s in %(direction)s.", + {'ip': ip, + 'direction': direction}) + self._add_filter(qdisc_id, direction, ip, rate, burst) + + def clear_ip_rate_limit(self, direction, ip): + qdisc_id = self._get_qdisc_id_for_filter(direction) + if not qdisc_id: + return + try: + filter_id = self._get_filterid_for_ip(qdisc_id, ip) + self._del_filter_by_id(qdisc_id, filter_id) + except exceptions.FilterIDForIPNotFound: + LOG.debug("No filter found for IP %(ip)s in %(direction)s, " + "skipping deletion.", + {'ip': ip, + 'direction': direction}) diff --git a/neutron/common/exceptions.py b/neutron/common/exceptions.py index 9b8e5f36afe..7ecfc5da883 100644 --- a/neutron/common/exceptions.py +++ b/neutron/common/exceptions.py @@ -352,3 +352,16 @@ class TenantQuotaNotFound(e.NotFound): class TenantIdProjectIdFilterConflict(e.BadRequest): message = _("Both tenant_id and project_id passed as filters.") + + +class MultipleFilterIDForIPFound(e.Conflict): + message = _("Multiple filter IDs for IP %(ip)s found.") + + +class FilterIDForIPNotFound(e.NotFound): + message = _("Filter ID for IP %(ip)s could not be found.") + + +class FailedToAddQdiscToDevice(e.NeutronException): + message = _("Failed to add %(direction)s qdisc " + "to device %(device)s.") diff --git a/neutron/tests/functional/agent/linux/test_l3_tc_lib.py b/neutron/tests/functional/agent/linux/test_l3_tc_lib.py new file mode 100644 index 00000000000..70e8fa827f7 --- /dev/null +++ b/neutron/tests/functional/agent/linux/test_l3_tc_lib.py @@ -0,0 +1,152 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock +from neutron_lib import constants as common_constants +from oslo_utils import uuidutils + +from neutron.agent.l3 import namespaces +from neutron.agent.linux import ip_lib +from neutron.agent.linux import l3_tc_lib +from neutron.common import exceptions +from neutron.tests.functional import base as functional_base + +RATE_LIMIT = 1024 +BURST_LIMIT = 512 +DEV_NAME = "test_device" + + +class TcLibTestCase(functional_base.BaseSudoTestCase): + + def create_tc_wrapper_with_namespace_and_device(self): + ns_name = uuidutils.generate_uuid() + namespace = namespaces.Namespace( + ns_name, None, + mock.Mock(), False) + namespace.create() + self.addCleanup(namespace.delete) + ip_wrapper = ip_lib.IPWrapper(namespace=ns_name) + tc_device = ip_wrapper.add_tuntap(DEV_NAME) + tc_device.link.set_up() + return l3_tc_lib.FloatingIPTcCommand( + DEV_NAME, + namespace=ns_name) + + def test_clear_all_filters(self): + ip_addr = "2.2.2.2" + l3_tc = self.create_tc_wrapper_with_namespace_and_device() + l3_tc.set_ip_rate_limit(common_constants.INGRESS_DIRECTION, ip_addr, + RATE_LIMIT, BURST_LIMIT) + l3_tc.set_ip_rate_limit(common_constants.EGRESS_DIRECTION, ip_addr, + RATE_LIMIT, BURST_LIMIT) + + l3_tc.clear_all_filters(common_constants.INGRESS_DIRECTION) + self.assertRaises(exceptions.FilterIDForIPNotFound, + l3_tc.get_filter_id_for_ip, + common_constants.INGRESS_DIRECTION, + ip_addr) + + l3_tc.clear_all_filters(common_constants.EGRESS_DIRECTION) + self.assertRaises(exceptions.FilterIDForIPNotFound, + l3_tc.get_filter_id_for_ip, + common_constants.EGRESS_DIRECTION, + ip_addr) + + def test_get_filter_id_for_ip(self): + ip_addr = "3.3.3.3" + l3_tc = self.create_tc_wrapper_with_namespace_and_device() + l3_tc.set_ip_rate_limit(common_constants.INGRESS_DIRECTION, ip_addr, + RATE_LIMIT, BURST_LIMIT) + l3_tc.set_ip_rate_limit(common_constants.EGRESS_DIRECTION, ip_addr, + RATE_LIMIT, BURST_LIMIT) + + self.assertIsNotNone( + l3_tc.get_filter_id_for_ip(common_constants.INGRESS_DIRECTION, + ip_addr)) + self.assertIsNotNone( + l3_tc.get_filter_id_for_ip(common_constants.EGRESS_DIRECTION, + ip_addr)) + + # testing: IP filter does not exist + self.assertRaises(exceptions.FilterIDForIPNotFound, + l3_tc.get_filter_id_for_ip, + common_constants.EGRESS_DIRECTION, + '33.33.33.33') + + def test_get_existing_filter_ids(self): + ip_addr = "4.4.4.4" + l3_tc = self.create_tc_wrapper_with_namespace_and_device() + l3_tc.set_ip_rate_limit(common_constants.INGRESS_DIRECTION, ip_addr, + RATE_LIMIT, BURST_LIMIT) + l3_tc.set_ip_rate_limit(common_constants.EGRESS_DIRECTION, ip_addr, + RATE_LIMIT, BURST_LIMIT) + + filter_ids = l3_tc.get_existing_filter_ids( + common_constants.INGRESS_DIRECTION) + self.assertNotEqual(0, len(filter_ids)) + filter_ids = l3_tc.get_existing_filter_ids( + common_constants.EGRESS_DIRECTION) + self.assertNotEqual(0, len(filter_ids)) + + def test_delete_filter_ids(self): + ip_addr1 = "5.5.5.5" + ip_addr2 = "6.6.6.6" + l3_tc = self.create_tc_wrapper_with_namespace_and_device() + l3_tc.set_ip_rate_limit(common_constants.INGRESS_DIRECTION, ip_addr1, + RATE_LIMIT, BURST_LIMIT) + l3_tc.set_ip_rate_limit(common_constants.INGRESS_DIRECTION, ip_addr2, + RATE_LIMIT, BURST_LIMIT) + + filter_ids = l3_tc.get_existing_filter_ids( + common_constants.INGRESS_DIRECTION) + self.assertEqual(2, len(filter_ids)) + l3_tc.delete_filter_ids(common_constants.INGRESS_DIRECTION, + filter_ids) + filter_ids = l3_tc.get_existing_filter_ids( + common_constants.INGRESS_DIRECTION) + self.assertEqual(0, len(filter_ids)) + + def test_set_ip_rate_limit(self): + ip_addr = "7.7.7.7" + l3_tc = self.create_tc_wrapper_with_namespace_and_device() + # Set it multiple times + l3_tc.set_ip_rate_limit(common_constants.INGRESS_DIRECTION, ip_addr, + RATE_LIMIT, BURST_LIMIT) + l3_tc.set_ip_rate_limit(common_constants.INGRESS_DIRECTION, ip_addr, + RATE_LIMIT, BURST_LIMIT) + l3_tc.set_ip_rate_limit(common_constants.INGRESS_DIRECTION, ip_addr, + RATE_LIMIT, BURST_LIMIT) + # Get only one and no exception + filter_id = l3_tc.get_filter_id_for_ip( + common_constants.INGRESS_DIRECTION, + ip_addr) + self.assertIsNotNone(filter_id) + + def test_clear_ip_rate_limit(self): + ip_addr = "8.8.8.8" + l3_tc = self.create_tc_wrapper_with_namespace_and_device() + l3_tc.set_ip_rate_limit(common_constants.INGRESS_DIRECTION, + ip_addr, + RATE_LIMIT, BURST_LIMIT) + filter_id = l3_tc.get_filter_id_for_ip( + common_constants.INGRESS_DIRECTION, + ip_addr) + self.assertIsNotNone(filter_id) + filter_id = l3_tc.clear_ip_rate_limit( + common_constants.INGRESS_DIRECTION, + ip_addr) + self.assertIsNone(filter_id) + + # testing: IP filter does not exist + l3_tc.clear_ip_rate_limit( + common_constants.INGRESS_DIRECTION, + "88.88.88.88") diff --git a/neutron/tests/unit/agent/linux/test_l3_tc_lib.py b/neutron/tests/unit/agent/linux/test_l3_tc_lib.py new file mode 100644 index 00000000000..684b117a649 --- /dev/null +++ b/neutron/tests/unit/agent/linux/test_l3_tc_lib.py @@ -0,0 +1,396 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock +from neutron_lib import constants + +from neutron.agent.linux import l3_tc_lib as tc_lib +from neutron.common import exceptions +from neutron.tests import base + +FLOATING_IP_DEVICE_NAME = "qg-device_rfp" +FLOATING_IP_ROUTER_NAMESPACE = "qrouter-namespace_snat-namespace" + +FLOATING_IP_1 = "172.16.5.146" +FLOATING_IP_2 = "172.16.10.105" +FILETER_ID_1 = "800::800" +FILETER_ID_2 = "800::801" + +TC_INGRESS_FILTERS = ( + 'filter protocol ip u32 \n' + 'filter protocol ip u32 fh 800: ht divisor 1 \n' + 'filter protocol ip u32 fh %(filter_id1)s order 2048 key ' + 'ht 800 bkt 0 ' + 'flowid :1 (rule hit 0 success 0)\n' + ' match IP dst %(fip1)s/32 (success 0 ) \n' + ' police 0x3 rate 3000Kbit burst 3Mb mtu 64Kb action drop overhead 0b \n' + 'ref 1 bind 1\n' + '\n' + ' Sent 111 bytes 222 pkts (dropped 0, overlimits 0) \n' + 'filter protocol ip u32 fh %(filter_id2)s order 2049 key ' + 'ht 800 bkt 0 ' + 'flowid :1 (rule hit 0 success 0)\n' + ' match IP dst %(fip2)s/32 (success 0 ) \n' + ' police 0x1b rate 22000Kbit burst 22Mb mtu 64Kb action drop ' + 'overhead 0b \n' + 'ref 1 bind 1\n' + '\n' + ' Sent 111 bytes 222 pkts (dropped 0, overlimits 0)\n') % { + "filter_id1": FILETER_ID_1, + "fip1": FLOATING_IP_1, + "filter_id2": FILETER_ID_2, + "fip2": FLOATING_IP_2} + +TC_INGRESS_FILTERS_DUP = TC_INGRESS_FILTERS + ( + 'filter protocol ip u32 fh %(filter_id2)s order 2049 key ' + 'ht 800 bkt 0 ' + 'flowid :1 (rule hit 0 success 0)\n' + ' match IP dst %(fip2)s/32 (success 0 ) \n' + ' police 0x1b rate 22000Kbit burst 22Mb mtu 64Kb action drop ' + 'overhead 0b \n' + 'ref 1 bind 1\n' + '\n' + ' Sent 111 bytes 222 pkts (dropped 0, overlimits 0)\n') % { + "filter_id2": FILETER_ID_2, + "fip2": FLOATING_IP_2} + +TC_EGRESS_FILTERS = ( + 'filter protocol ip u32 \n' + 'filter protocol ip u32 fh 800: ht divisor 1 \n' + 'filter protocol ip u32 fh %(filter_id1)s order 2048 key ' + 'ht 800 bkt 0 ' + 'flowid :1 (rule hit 0 success 0)\n' + ' match IP src %(fip1)s/32 (success 0 ) \n' + ' police 0x4 rate 3000Kbit burst 3Mb mtu 64Kb action drop overhead 0b \n' + 'ref 1 bind 1\n' + '\n' + ' Sent 111 bytes 222 pkts (dropped 0, overlimits 0) \n' + 'filter protocol ip u32 fh %(filter_id2)s order 2049 key ' + 'ht 800 bkt 0 ' + 'flowid :1 (rule hit 0 success 0)\n' + ' match IP src %(fip2)s/32 (success 0 ) \n' + ' police 0x1c rate 22000Kbit burst 22Mb mtu 64Kb action drop ' + 'overhead 0b \n' + 'ref 1 bind 1\n' + '\n' + ' Sent 111 bytes 222 pkts (dropped 0, overlimits 0)\n') % { + "filter_id1": FILETER_ID_1, + "fip1": FLOATING_IP_1, + "filter_id2": FILETER_ID_2, + "fip2": FLOATING_IP_2} +FILTERS_IDS = {constants.INGRESS_DIRECTION: TC_INGRESS_FILTERS, + constants.EGRESS_DIRECTION: TC_EGRESS_FILTERS} + +INGRESS_QSIC_ID = "ffff:" +EGRESS_QDISC_ID = "1:" +QDISC_IDS = {constants.INGRESS_DIRECTION: INGRESS_QSIC_ID, + constants.EGRESS_DIRECTION: EGRESS_QDISC_ID} +TC_QDISCS = ( + 'qdisc htb %(egress)s root refcnt 2 r2q 10 default 0 ' + 'direct_packets_stat 6\n' + 'qdisc ingress %(ingress)s parent ffff:fff1 ----------------\n') % { + "egress": EGRESS_QDISC_ID, + "ingress": INGRESS_QSIC_ID} + + +class TestFloatingIPTcCommandBase(base.BaseTestCase): + def setUp(self): + super(TestFloatingIPTcCommandBase, self).setUp() + self.tc = tc_lib.FloatingIPTcCommandBase( + FLOATING_IP_DEVICE_NAME, + namespace=FLOATING_IP_ROUTER_NAMESPACE) + self.execute = mock.patch('neutron.agent.common.utils.execute').start() + + def test__get_qdiscs(self): + self.tc._get_qdiscs() + self.execute.assert_called_once_with( + ['ip', 'netns', 'exec', FLOATING_IP_ROUTER_NAMESPACE, + 'tc', 'qdisc', 'show', 'dev', FLOATING_IP_DEVICE_NAME], + run_as_root=True, + check_exit_code=True, + log_fail_as_error=True, + extra_ok_codes=None + ) + + def test__get_qdisc_id_for_filter(self): + with mock.patch.object(tc_lib.FloatingIPTcCommandBase, + '_get_qdiscs') as get_qdiscs: + get_qdiscs.return_value = TC_QDISCS + q1 = self.tc._get_qdisc_id_for_filter(constants.INGRESS_DIRECTION) + self.assertEqual(INGRESS_QSIC_ID, q1) + q2 = self.tc._get_qdisc_id_for_filter(constants.EGRESS_DIRECTION) + self.assertEqual(EGRESS_QDISC_ID, q2) + + def test__add_qdisc(self): + self.tc._add_qdisc(constants.INGRESS_DIRECTION) + self.execute.assert_called_with( + ['ip', 'netns', 'exec', FLOATING_IP_ROUTER_NAMESPACE, + 'tc', 'qdisc', 'add', 'dev', FLOATING_IP_DEVICE_NAME, 'ingress'], + run_as_root=True, + check_exit_code=True, + log_fail_as_error=True, + extra_ok_codes=None + ) + self.tc._add_qdisc(constants.EGRESS_DIRECTION) + self.execute.assert_called_with( + ['ip', 'netns', 'exec', FLOATING_IP_ROUTER_NAMESPACE, + 'tc', 'qdisc', 'add', 'dev', + FLOATING_IP_DEVICE_NAME] + ['root', 'handle', '1:', 'htb'], + run_as_root=True, + check_exit_code=True, + log_fail_as_error=True, + extra_ok_codes=None + ) + + def test__get_filters(self): + self.tc._get_filters(INGRESS_QSIC_ID) + self.execute.assert_called_with( + ['ip', 'netns', 'exec', FLOATING_IP_ROUTER_NAMESPACE, + 'tc', '-p', '-s', '-d', 'filter', 'show', 'dev', + FLOATING_IP_DEVICE_NAME, + 'parent', INGRESS_QSIC_ID, 'prio', 1], + run_as_root=True, + check_exit_code=True, + log_fail_as_error=True, + extra_ok_codes=None + ) + + def test__get_filterid_for_ip(self): + with mock.patch.object(tc_lib.FloatingIPTcCommandBase, + '_get_filters') as get_filters: + get_filters.return_value = TC_EGRESS_FILTERS + f_id = self.tc._get_filterid_for_ip(INGRESS_QSIC_ID, FLOATING_IP_1) + self.assertEqual(FILETER_ID_1, f_id) + + def test__get_filterid_for_ip_no_output(self): + with mock.patch.object(tc_lib.FloatingIPTcCommandBase, + '_get_filters') as get_filters: + get_filters.return_value = "" + self.assertRaises(exceptions.FilterIDForIPNotFound, + self.tc._get_filterid_for_ip, + INGRESS_QSIC_ID, FLOATING_IP_1) + + def test__get_filterid_for_ip_duplicated(self): + with mock.patch.object(tc_lib.FloatingIPTcCommandBase, + '_get_filters') as get_filters: + get_filters.return_value = TC_INGRESS_FILTERS_DUP + self.assertRaises(exceptions.MultipleFilterIDForIPFound, + self.tc._get_filterid_for_ip, + INGRESS_QSIC_ID, FLOATING_IP_2) + + def test__get_filterid_for_ip_not_found(self): + with mock.patch.object(tc_lib.FloatingIPTcCommandBase, + '_get_filters') as get_filters: + get_filters.return_value = TC_EGRESS_FILTERS + self.assertRaises(exceptions.FilterIDForIPNotFound, + self.tc._get_filterid_for_ip, + INGRESS_QSIC_ID, "1.1.1.1") + + def test__del_filter_by_id(self): + self.tc._del_filter_by_id(INGRESS_QSIC_ID, FLOATING_IP_1) + self.execute.assert_called_once_with( + ['ip', 'netns', 'exec', FLOATING_IP_ROUTER_NAMESPACE, + 'tc', 'filter', 'del', 'dev', FLOATING_IP_DEVICE_NAME, + 'parent', INGRESS_QSIC_ID, + 'prio', 1, 'handle', FLOATING_IP_1, 'u32'], + run_as_root=True, + check_exit_code=True, + log_fail_as_error=True, + extra_ok_codes=None + ) + + def test__get_qdisc_filters(self): + with mock.patch.object(tc_lib.FloatingIPTcCommandBase, + '_get_filters') as get_filters: + get_filters.return_value = TC_EGRESS_FILTERS + f_ids = self.tc._get_qdisc_filters(INGRESS_QSIC_ID) + self.assertEqual([FILETER_ID_1, FILETER_ID_2], f_ids) + + def test__get_qdisc_filters_no_output(self): + with mock.patch.object(tc_lib.FloatingIPTcCommandBase, + '_get_filters') as get_filters: + get_filters.return_value = "" + f_ids = self.tc._get_qdisc_filters(INGRESS_QSIC_ID) + self.assertEqual(0, len(f_ids)) + + def test__add_filter(self): + protocol = ['protocol', 'ip'] + prio = ['prio', 1] + match = ['u32', 'match', 'ip', 'dst', FLOATING_IP_1] + police = ['police', 'rate', '1kbit', 'burst', '1kbit', + 'drop', 'flowid', ':1'] + args = protocol + prio + match + police + cmd = ['tc', 'filter', 'add', 'dev', FLOATING_IP_DEVICE_NAME, + 'parent', INGRESS_QSIC_ID] + args + + self.tc._add_filter(INGRESS_QSIC_ID, + constants.INGRESS_DIRECTION, + FLOATING_IP_1, 1, 1) + self.execute.assert_called_once_with( + ['ip', 'netns', 'exec', FLOATING_IP_ROUTER_NAMESPACE] + cmd, + run_as_root=True, + check_exit_code=True, + log_fail_as_error=True, + extra_ok_codes=None + ) + + def test__get_or_create_qdisc(self): + with mock.patch.object(tc_lib.FloatingIPTcCommandBase, + '_get_qdisc_id_for_filter') as get_disc1: + get_disc1.return_value = None + with mock.patch.object(tc_lib.FloatingIPTcCommandBase, + '_add_qdisc'): + with mock.patch.object( + tc_lib.FloatingIPTcCommandBase, + '_get_qdisc_id_for_filter') as get_disc2: + get_disc2.return_value = INGRESS_QSIC_ID + qdisc_id = self.tc._get_or_create_qdisc( + constants.INGRESS_DIRECTION) + self.assertEqual(INGRESS_QSIC_ID, qdisc_id) + + def test__get_or_create_qdisc_failed(self): + with mock.patch.object(tc_lib.FloatingIPTcCommandBase, + '_get_qdisc_id_for_filter') as get_disc1: + get_disc1.return_value = None + with mock.patch.object(tc_lib.FloatingIPTcCommandBase, + '_add_qdisc'): + with mock.patch.object( + tc_lib.FloatingIPTcCommandBase, + '_get_qdisc_id_for_filter') as get_disc2: + get_disc2.return_value = None + self.assertRaises(exceptions.FailedToAddQdiscToDevice, + self.tc._get_or_create_qdisc, + constants.INGRESS_DIRECTION) + + +class TestFloatingIPTcCommand(base.BaseTestCase): + def setUp(self): + super(TestFloatingIPTcCommand, self).setUp() + self.tc = tc_lib.FloatingIPTcCommand( + FLOATING_IP_DEVICE_NAME, + namespace=FLOATING_IP_ROUTER_NAMESPACE) + self.execute = mock.patch('neutron.agent.common.utils.execute').start() + + def test_clear_all_filters(self): + with mock.patch.object(tc_lib.FloatingIPTcCommandBase, + '_get_qdisc_id_for_filter') as get_disc: + get_disc.return_value = EGRESS_QDISC_ID + with mock.patch.object(tc_lib.FloatingIPTcCommandBase, + '_get_filters') as get_filters: + get_filters.return_value = TC_EGRESS_FILTERS + self.tc.clear_all_filters(constants.EGRESS_DIRECTION) + self.assertEqual(2, self.execute.call_count) + + def test_set_ip_rate_limit_filter_existed(self): + with mock.patch.object(tc_lib.FloatingIPTcCommandBase, + '_get_qdisc_id_for_filter') as get_disc: + get_disc.return_value = EGRESS_QDISC_ID + with mock.patch.object(tc_lib.FloatingIPTcCommandBase, + '_get_filterid_for_ip') as get_filter: + get_filter.return_value = FILETER_ID_1 + with mock.patch.object(tc_lib.FloatingIPTcCommandBase, + '_del_filter_by_id') as del_filter: + with mock.patch.object(tc_lib.FloatingIPTcCommandBase, + '_add_filter') as add_filter: + ip = "111.111.111.111" + self.tc.set_ip_rate_limit(constants.EGRESS_DIRECTION, + ip, 1, 1) + del_filter.assert_called_once_with( + EGRESS_QDISC_ID, FILETER_ID_1) + add_filter.assert_called_once_with( + EGRESS_QDISC_ID, constants.EGRESS_DIRECTION, + ip, 1, 1) + + def test_set_ip_rate_limit_no_qdisc(self): + with mock.patch.object(tc_lib.FloatingIPTcCommandBase, + '_get_qdisc_id_for_filter') as get_disc: + get_disc.return_value = None + with mock.patch.object(tc_lib.FloatingIPTcCommandBase, + '_add_qdisc'): + with mock.patch.object(tc_lib.FloatingIPTcCommandBase, + '_get_filters') as get_filters: + get_filters.return_value = TC_INGRESS_FILTERS + get_disc.return_value = INGRESS_QSIC_ID + ip = "111.111.111.111" + self.tc.set_ip_rate_limit(constants.INGRESS_DIRECTION, + ip, 1, 1) + + protocol = ['protocol', 'ip'] + prio = ['prio', 1] + _match = 'dst' + match = ['u32', 'match', 'ip', _match, ip] + police = ['police', 'rate', '1kbit', 'burst', '1kbit', + 'drop', 'flowid', ':1'] + args = protocol + prio + match + police + + self.execute.assert_called_once_with( + ['ip', 'netns', 'exec', FLOATING_IP_ROUTER_NAMESPACE, + 'tc', 'filter', 'add', 'dev', FLOATING_IP_DEVICE_NAME, + 'parent', INGRESS_QSIC_ID] + args, + run_as_root=True, + check_exit_code=True, + log_fail_as_error=True, + extra_ok_codes=None + ) + + def test_clear_ip_rate_limit(self): + with mock.patch.object(tc_lib.FloatingIPTcCommandBase, + '_get_qdisc_id_for_filter') as get_disc: + get_disc.return_value = EGRESS_QDISC_ID + with mock.patch.object(tc_lib.FloatingIPTcCommandBase, + '_get_filterid_for_ip') as get_filter_id: + get_filter_id.return_value = FILETER_ID_1 + self.tc.clear_ip_rate_limit(constants.EGRESS_DIRECTION, + FLOATING_IP_1) + + self.execute.assert_called_once_with( + ['ip', 'netns', 'exec', FLOATING_IP_ROUTER_NAMESPACE, + 'tc', 'filter', 'del', 'dev', FLOATING_IP_DEVICE_NAME, + 'parent', EGRESS_QDISC_ID, + 'prio', 1, 'handle', FILETER_ID_1, 'u32'], + run_as_root=True, + check_exit_code=True, + log_fail_as_error=True, + extra_ok_codes=None + ) + + def test_get_filter_id_for_ip(self): + with mock.patch.object(tc_lib.FloatingIPTcCommandBase, + '_get_qdisc_id_for_filter') as get_disc: + get_disc.return_value = EGRESS_QDISC_ID + with mock.patch.object(tc_lib.FloatingIPTcCommandBase, + '_get_filterid_for_ip') as get_filter_id: + self.tc.get_filter_id_for_ip(constants.EGRESS_DIRECTION, + '8.8.8.8') + get_filter_id.assert_called_once_with(EGRESS_QDISC_ID, + '8.8.8.8') + + def test_get_existing_filter_ids(self): + with mock.patch.object(tc_lib.FloatingIPTcCommandBase, + '_get_qdisc_id_for_filter') as get_disc: + get_disc.return_value = EGRESS_QDISC_ID + with mock.patch.object(tc_lib.FloatingIPTcCommandBase, + '_get_qdisc_filters') as get_filter_ids: + self.tc.get_existing_filter_ids(constants.EGRESS_DIRECTION) + get_filter_ids.assert_called_once_with(EGRESS_QDISC_ID) + + def test_delete_filter_ids(self): + with mock.patch.object(tc_lib.FloatingIPTcCommandBase, + '_get_qdisc_id_for_filter') as get_disc: + get_disc.return_value = EGRESS_QDISC_ID + with mock.patch.object(tc_lib.FloatingIPTcCommandBase, + '_del_filter_by_id') as del_filter_id: + self.tc.delete_filter_ids(constants.EGRESS_DIRECTION, + [FILETER_ID_1, FILETER_ID_2]) + del_filter_id.assert_has_calls( + [mock.call(EGRESS_QDISC_ID, FILETER_ID_1), + mock.call(EGRESS_QDISC_ID, FILETER_ID_2)])