Implement IpRuleCommand.list_rules() using pyroute2
Change-Id: I55d5dd756940e5a92f472c9309d49f427e907928 Related-Bug: #1492714
This commit is contained in:
parent
08bf770055
commit
c68ebd661b
@ -262,8 +262,7 @@ class FipNamespace(namespaces.Namespace):
|
||||
return new_gw_ips != old_gw_ips
|
||||
|
||||
def get_fip_table_indexes(self, ip_version):
|
||||
ns_ipr = ip_lib.IPRule(namespace=self.get_name())
|
||||
ip_rules_list = ns_ipr.rule.list_rules(ip_version)
|
||||
ip_rules_list = ip_lib.list_ip_rules(self.get_name(), ip_version)
|
||||
tbl_index_list = []
|
||||
for ip_rule in ip_rules_list:
|
||||
tbl_index = ip_rule['table']
|
||||
|
@ -326,7 +326,7 @@ class DvrLocalRouter(dvr_router_base.DvrRouterBase):
|
||||
pass
|
||||
|
||||
def _stale_ip_rule_cleanup(self, ns_ipr, ns_ipd, ip_version):
|
||||
ip_rules_list = ns_ipr.rule.list_rules(ip_version)
|
||||
ip_rules_list = ip_lib.list_ip_rules(ns_ipr.namespace, ip_version)
|
||||
snat_table_list = []
|
||||
for ip_rule in ip_rules_list:
|
||||
snat_table = ip_rule['table']
|
||||
|
@ -44,6 +44,18 @@ IP_NONLOCAL_BIND = 'net.ipv4.ip_nonlocal_bind'
|
||||
LOOPBACK_DEVNAME = 'lo'
|
||||
FB_TUNNEL_DEVICE_NAMES = ['gre0', 'gretap0', 'tunl0', 'erspan0', 'sit0',
|
||||
'ip6tnl0', 'ip6gre0']
|
||||
RULE_TABLES = {'default': 253,
|
||||
'main': 254,
|
||||
'local': 255}
|
||||
|
||||
# Rule indexes: pyroute2.netlink.rtnl
|
||||
# Rule names: https://www.systutorials.com/docs/linux/man/8-ip-rule/
|
||||
# NOTE(ralonsoh): 'masquerade' type is printed as 'nat' in 'ip rule' command
|
||||
RULE_TYPES = {1: 'unicast',
|
||||
6: 'blackhole',
|
||||
7: 'unreachable',
|
||||
8: 'prohibit',
|
||||
10: 'nat'}
|
||||
|
||||
SYS_NET_PATH = '/sys/class/net'
|
||||
DEFAULT_GW_PATTERN = re.compile(r"via (\S+)")
|
||||
@ -427,30 +439,8 @@ class IpRuleCommand(IpCommandBase):
|
||||
|
||||
return {k: str(v) for k, v in map(canonicalize, settings.items())}
|
||||
|
||||
def _parse_line(self, ip_version, line):
|
||||
# Typical rules from 'ip rule show':
|
||||
# 4030201: from 1.2.3.4/24 lookup 10203040
|
||||
# 1024: from all iif qg-c43b1928-48 lookup noscope
|
||||
|
||||
parts = line.split()
|
||||
if not parts:
|
||||
return {}
|
||||
|
||||
# Format of line is: "priority: <key> <value> ... [<type>]"
|
||||
settings = {k: v for k, v in zip(parts[1::2], parts[2::2])}
|
||||
settings['priority'] = parts[0][:-1]
|
||||
if len(parts) % 2 == 0:
|
||||
# When line has an even number of columns, last one is the type.
|
||||
settings['type'] = parts[-1]
|
||||
|
||||
return self._make_canonical(ip_version, settings)
|
||||
|
||||
def list_rules(self, ip_version):
|
||||
lines = self._as_root([ip_version], ['show']).splitlines()
|
||||
return [self._parse_line(ip_version, line) for line in lines]
|
||||
|
||||
def _exists(self, ip_version, **kwargs):
|
||||
return kwargs in self.list_rules(ip_version)
|
||||
return kwargs in list_ip_rules(self._parent.namespace, ip_version)
|
||||
|
||||
def _make__flat_args_tuple(self, *args, **kwargs):
|
||||
for kwargs_item in sorted(kwargs.items(), key=lambda i: i[0]):
|
||||
@ -1309,3 +1299,68 @@ def get_ipv6_forwarding(device, namespace=None):
|
||||
cmd = ['sysctl', '-b', "net.ipv6.conf.%s.forwarding" % device]
|
||||
ip_wrapper = IPWrapper(namespace)
|
||||
return int(ip_wrapper.netns.execute(cmd, run_as_root=True))
|
||||
|
||||
|
||||
def _parse_ip_rule(rule, ip_version):
|
||||
"""Parse a pyroute2 rule and returns a dictionary
|
||||
|
||||
Parameters contained in the returned dictionary:
|
||||
- priority: rule priority
|
||||
- from: source IP address
|
||||
- to: (optional) destination IP address
|
||||
- type: rule type (see RULE_TYPES)
|
||||
- table: table name or number (see RULE_TABLES)
|
||||
- fwmark: (optional) FW mark
|
||||
- iif: (optional) input interface name
|
||||
- oif: (optional) output interface name
|
||||
|
||||
:param rule: pyroute2 rule dictionary
|
||||
:param ip_version: IP version (4, 6)
|
||||
:return: dictionary with IP rule information
|
||||
"""
|
||||
parsed_rule = {'priority': str(rule['attrs'].get('FRA_PRIORITY', 0))}
|
||||
from_ip = rule['attrs'].get('FRA_SRC')
|
||||
if from_ip:
|
||||
parsed_rule['from'] = common_utils.ip_to_cidr(
|
||||
from_ip, prefix=rule['src_len'])
|
||||
if common_utils.is_cidr_host(parsed_rule['from']):
|
||||
parsed_rule['from'] = common_utils.cidr_to_ip(parsed_rule['from'])
|
||||
else:
|
||||
parsed_rule['from'] = constants.IP_ANY[ip_version]
|
||||
to_ip = rule['attrs'].get('FRA_DST')
|
||||
if to_ip:
|
||||
parsed_rule['to'] = common_utils.ip_to_cidr(
|
||||
to_ip, prefix=rule['dst_len'])
|
||||
if common_utils.is_cidr_host(parsed_rule['to']):
|
||||
parsed_rule['to'] = common_utils.cidr_to_ip(parsed_rule['to'])
|
||||
parsed_rule['type'] = RULE_TYPES[rule['action']]
|
||||
table_num = rule['attrs']['FRA_TABLE']
|
||||
for table_name in (name for (name, index) in
|
||||
RULE_TABLES.items() if index == table_num):
|
||||
parsed_rule['table'] = table_name
|
||||
break
|
||||
else:
|
||||
parsed_rule['table'] = str(table_num)
|
||||
fwmark = rule['attrs'].get('FRA_FWMARK')
|
||||
if fwmark:
|
||||
fwmask = rule['attrs'].get('FRA_FWMASK')
|
||||
parsed_rule['fwmark'] = '{0:#x}/{1:#x}'.format(fwmark, fwmask)
|
||||
iifname = rule['attrs'].get('FRA_IIFNAME')
|
||||
if iifname:
|
||||
parsed_rule['iif'] = iifname
|
||||
oifname = rule['attrs'].get('FRA_OIFNAME')
|
||||
if oifname:
|
||||
parsed_rule['oif'] = oifname
|
||||
|
||||
return parsed_rule
|
||||
|
||||
|
||||
def list_ip_rules(namespace, ip_version):
|
||||
"""List all IP rules in a namespace
|
||||
|
||||
:param namespace: namespace name
|
||||
:param ip_version: IP version (4, 6)
|
||||
:return: list of dictionaries with the rules information
|
||||
"""
|
||||
rules = privileged.list_ip_rules(namespace, ip_version)
|
||||
return [_parse_ip_rule(rule, ip_version) for rule in rules]
|
||||
|
@ -469,3 +469,22 @@ def get_devices(namespace, **kwargs):
|
||||
if e.errno == errno.ENOENT:
|
||||
raise NetworkNamespaceNotFound(netns_name=namespace)
|
||||
raise
|
||||
|
||||
|
||||
@privileged.default.entrypoint
|
||||
def list_ip_rules(namespace, ip_version, match=None, **kwargs):
|
||||
"""List all IP rules"""
|
||||
try:
|
||||
with _get_iproute(namespace) as ip:
|
||||
rules = ip.get_rules(family=_IP_VERSION_FAMILY_MAP[ip_version],
|
||||
match=match, **kwargs)
|
||||
for rule in rules:
|
||||
rule['attrs'] = {
|
||||
key: value for key, value
|
||||
in ((item[0], item[1]) for item in rule['attrs'])}
|
||||
return rules
|
||||
|
||||
except OSError as e:
|
||||
if e.errno == errno.ENOENT:
|
||||
raise NetworkNamespaceNotFound(netns_name=namespace)
|
||||
raise
|
||||
|
@ -104,8 +104,8 @@ class TestDvrRouter(framework.L3AgentTestFramework):
|
||||
self.agent._process_updated_router(router1.router)
|
||||
router_updated = self.agent.router_info[router1.router['id']]
|
||||
self.assertTrue(self._namespace_exists(router_updated.ns_name))
|
||||
ns_ipr = ip_lib.IPRule(namespace=router1.ns_name)
|
||||
ip4_rules_list = ns_ipr.rule.list_rules(lib_constants.IP_VERSION_4)
|
||||
ip4_rules_list = ip_lib.list_ip_rules(router1.ns_name,
|
||||
lib_constants.IP_VERSION_4)
|
||||
self.assertEqual(6, len(ip4_rules_list))
|
||||
# IPRule list should have 6 entries.
|
||||
# Three entries from 'default', 'main' and 'local' table.
|
||||
@ -364,9 +364,10 @@ class TestDvrRouter(framework.L3AgentTestFramework):
|
||||
router1.router['external_gateway_info'] = ""
|
||||
restarted_router = self.manage_router(restarted_agent, router1.router)
|
||||
self.assertTrue(self._namespace_exists(restarted_router.ns_name))
|
||||
ns_ipr = ip_lib.IPRule(namespace=router1.ns_name)
|
||||
ip4_rules_list = ns_ipr.rule.list_rules(lib_constants.IP_VERSION_4)
|
||||
ip6_rules_list = ns_ipr.rule.list_rules(lib_constants.IP_VERSION_6)
|
||||
ip4_rules_list = ip_lib.list_ip_rules(router1.ns_name,
|
||||
lib_constants.IP_VERSION_4)
|
||||
ip6_rules_list = ip_lib.list_ip_rules(router1.ns_name,
|
||||
lib_constants.IP_VERSION_6)
|
||||
# Just make sure the basic set of rules are there in the router
|
||||
# namespace
|
||||
self.assertEqual(3, len(ip4_rules_list))
|
||||
@ -808,9 +809,8 @@ class TestDvrRouter(framework.L3AgentTestFramework):
|
||||
self.assertTrue(self._namespace_exists(router1.ns_name))
|
||||
self.assertTrue(self._namespace_exists(fip_ns_name))
|
||||
self._assert_snat_namespace_exists(router1)
|
||||
ns_ipr = ip_lib.IPRule(namespace=router1.ns_name)
|
||||
ip4_rules_list_with_fip = ns_ipr.rule.list_rules(
|
||||
lib_constants.IP_VERSION_4)
|
||||
ip4_rules_list_with_fip = ip_lib.list_ip_rules(
|
||||
router1.ns_name, lib_constants.IP_VERSION_4)
|
||||
# The rules_list should have 6 entries:
|
||||
# 3 default rules (local, main and default)
|
||||
# 1 Fip forward rule
|
||||
@ -829,7 +829,8 @@ class TestDvrRouter(framework.L3AgentTestFramework):
|
||||
router_updated = self.agent.router_info[router1.router['id']]
|
||||
self.assertTrue(self._namespace_exists(router_updated.ns_name))
|
||||
self._assert_snat_namespace_exists(router1)
|
||||
ip4_rules_list = ns_ipr.rule.list_rules(lib_constants.IP_VERSION_4)
|
||||
ip4_rules_list = ip_lib.list_ip_rules(router1.ns_name,
|
||||
lib_constants.IP_VERSION_4)
|
||||
self.assertEqual(5, len(ip4_rules_list))
|
||||
interface_rules_list_count = 0
|
||||
fip_rule_count = 0
|
||||
@ -946,22 +947,16 @@ class TestDvrRouter(framework.L3AgentTestFramework):
|
||||
restarted_router.iptables_manager, 'nat', [prevent_snat_rule])
|
||||
|
||||
def _get_fixed_ip_rule_priority(self, namespace, fip):
|
||||
iprule = ip_lib.IPRule(namespace)
|
||||
lines = iprule.rule._as_root([4], ['show']).splitlines()
|
||||
for line in lines:
|
||||
if fip in line:
|
||||
info = iprule.rule._parse_line(4, line)
|
||||
return info['priority']
|
||||
ipv4_rules = ip_lib.list_ip_rules(namespace, 4)
|
||||
for rule in (rule for rule in ipv4_rules
|
||||
if utils.cidr_to_ip(rule['from']) == fip):
|
||||
return rule['priority']
|
||||
|
||||
def _fixed_ip_rule_exists(self, namespace, ip):
|
||||
iprule = ip_lib.IPRule(namespace)
|
||||
lines = iprule.rule._as_root([4], ['show']).splitlines()
|
||||
for line in lines:
|
||||
if ip in line:
|
||||
info = iprule.rule._parse_line(4, line)
|
||||
if info['from'] == ip:
|
||||
ipv4_rules = ip_lib.list_ip_rules(namespace, 4)
|
||||
for _ in (rule for rule in ipv4_rules
|
||||
if utils.cidr_to_ip(rule['from']) == ip):
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def test_dvr_router_add_internal_network_set_arp_cache(self):
|
||||
@ -1582,9 +1577,8 @@ class TestDvrRouter(framework.L3AgentTestFramework):
|
||||
else:
|
||||
self.assertIsNone(fg_device.route.get_gateway(filters=tbl_filter))
|
||||
|
||||
ip_rule = ip_lib.IPRule(namespace=fip_ns_name)
|
||||
ext_net_fw_rules_list = ip_rule.rule.list_rules(
|
||||
lib_constants.IP_VERSION_4)
|
||||
ext_net_fw_rules_list = ip_lib.list_ip_rules(
|
||||
fip_ns_name, lib_constants.IP_VERSION_4)
|
||||
if not check_fpr_int_rule_delete:
|
||||
# When floatingip are associated, make sure that the
|
||||
# corresponding rules and routes in route table are created
|
||||
@ -1699,9 +1693,10 @@ class TestDvrRouter(framework.L3AgentTestFramework):
|
||||
self._assert_fip_namespace_deleted(
|
||||
agent_gw_port, assert_ovs_interface=False)
|
||||
if not address_scopes or no_external:
|
||||
ns_ipr = ip_lib.IPRule(namespace=router_updated.ns_name)
|
||||
ip4_rules_list = ns_ipr.rule.list_rules(lib_constants.IP_VERSION_4)
|
||||
ip6_rules_list = ns_ipr.rule.list_rules(lib_constants.IP_VERSION_6)
|
||||
ip4_rules_list = ip_lib.list_ip_rules(router_updated.ns_name,
|
||||
lib_constants.IP_VERSION_4)
|
||||
ip6_rules_list = ip_lib.list_ip_rules(router_updated.ns_name,
|
||||
lib_constants.IP_VERSION_6)
|
||||
self.assertEqual(3, len(ip4_rules_list))
|
||||
self.assertEqual(2, len(ip6_rules_list))
|
||||
|
||||
@ -1759,9 +1754,10 @@ class TestDvrRouter(framework.L3AgentTestFramework):
|
||||
fip_2_rtr, rfp_device, rfp_device_name)
|
||||
|
||||
# Check if any snat redirect rules in the router namespace exist.
|
||||
ns_ipr = ip_lib.IPRule(namespace=router1.ns_name)
|
||||
ip4_rules_list = ns_ipr.rule.list_rules(lib_constants.IP_VERSION_4)
|
||||
ip6_rules_list = ns_ipr.rule.list_rules(lib_constants.IP_VERSION_6)
|
||||
ip4_rules_list = ip_lib.list_ip_rules(router1.ns_name,
|
||||
lib_constants.IP_VERSION_4)
|
||||
ip6_rules_list = ip_lib.list_ip_rules(router1.ns_name,
|
||||
lib_constants.IP_VERSION_6)
|
||||
# Just make sure the basic set of rules are there in the router
|
||||
# namespace
|
||||
self.assertEqual(5, len(ip4_rules_list))
|
||||
@ -1822,9 +1818,10 @@ class TestDvrRouter(framework.L3AgentTestFramework):
|
||||
self.assertFalse(mock_add_interface_route_rule.called)
|
||||
self.assertFalse(mock_add_fip_interface_route_rule.called)
|
||||
# Check if any snat redirect rules in the router namespace exist.
|
||||
ns_ipr = ip_lib.IPRule(namespace=router1.ns_name)
|
||||
ip4_rules_list = ns_ipr.rule.list_rules(lib_constants.IP_VERSION_4)
|
||||
ip6_rules_list = ns_ipr.rule.list_rules(lib_constants.IP_VERSION_6)
|
||||
ip4_rules_list = ip_lib.list_ip_rules(router1.ns_name,
|
||||
lib_constants.IP_VERSION_4)
|
||||
ip6_rules_list = ip_lib.list_ip_rules(router1.ns_name,
|
||||
lib_constants.IP_VERSION_6)
|
||||
# Just make sure the basic set of rules are there in the router
|
||||
# namespace
|
||||
self.assertEqual(5, len(ip4_rules_list))
|
||||
|
@ -26,6 +26,7 @@ import testtools
|
||||
from neutron.agent.linux import ip_lib
|
||||
from neutron.common import utils
|
||||
from neutron.conf.agent import common as config
|
||||
from neutron.privileged.agent.linux import ip_lib as priv_ip_lib
|
||||
from neutron.tests.common import net_helpers
|
||||
from neutron.tests.functional import base as functional_base
|
||||
|
||||
@ -163,14 +164,14 @@ class IpLibTestCase(IpLibTestFramework):
|
||||
for rule in test_case:
|
||||
ip_rule.rule.add(table=TABLE, priority=PRIORITY, **rule)
|
||||
|
||||
rules = ip_rule.rule.list_rules(ip_version)
|
||||
rules = ip_lib.list_ip_rules(ip_rule.namespace, ip_version)
|
||||
for expected_rule in expected_rules[ip_version]:
|
||||
self.assertIn(expected_rule, rules)
|
||||
|
||||
for rule in test_case:
|
||||
ip_rule.rule.delete(table=TABLE, priority=PRIORITY, **rule)
|
||||
|
||||
rules = ip_rule.rule.list_rules(ip_version)
|
||||
rules = priv_ip_lib.list_ip_rules(ip_rule.namespace, ip_version)
|
||||
for expected_rule in expected_rules[ip_version]:
|
||||
self.assertNotIn(expected_rule, rules)
|
||||
|
||||
|
@ -16,10 +16,10 @@ from oslo_utils import uuidutils
|
||||
|
||||
from neutron.agent.linux import ip_lib
|
||||
from neutron.privileged.agent.linux import ip_lib as priv_ip_lib
|
||||
from neutron.tests.functional import base
|
||||
from neutron.tests.functional import base as functional_base
|
||||
|
||||
|
||||
class GetDevicesTestCase(base.BaseLoggingTestCase):
|
||||
class GetDevicesTestCase(functional_base.BaseLoggingTestCase):
|
||||
|
||||
def _remove_ns(self, namespace):
|
||||
priv_ip_lib.remove_netns(namespace)
|
||||
@ -44,3 +44,56 @@ class GetDevicesTestCase(base.BaseLoggingTestCase):
|
||||
device_names = priv_ip_lib.get_devices(namespace)
|
||||
for name in device_names:
|
||||
self.assertNotIn(name, interfaces)
|
||||
|
||||
|
||||
class ListIpRulesTestCase(functional_base.BaseSudoTestCase):
|
||||
|
||||
RULE_TABLES = {'default': 253, 'main': 254, 'local': 255}
|
||||
|
||||
def setUp(self):
|
||||
super(ListIpRulesTestCase, self).setUp()
|
||||
self.namespace = 'ns_test-' + uuidutils.generate_uuid()
|
||||
self.ns = priv_ip_lib.create_netns(self.namespace)
|
||||
self.ip_rule = ip_lib.IPRule(namespace=self.namespace)
|
||||
self.addCleanup(self._remove_ns)
|
||||
|
||||
def _remove_ns(self):
|
||||
priv_ip_lib.remove_netns(self.namespace)
|
||||
|
||||
def test_list_default_rules_ipv4(self):
|
||||
rules_ipv4 = priv_ip_lib.list_ip_rules(self.namespace, 4)
|
||||
self.assertEqual(3, len(rules_ipv4))
|
||||
rule_tables = list(self.RULE_TABLES.values())
|
||||
for rule in rules_ipv4:
|
||||
rule_tables.remove(rule['table'])
|
||||
self.assertEqual(0, len(rule_tables))
|
||||
|
||||
def test_list_rules_ipv4(self):
|
||||
self.ip_rule.rule.add('192.168.0.1/24', table=10)
|
||||
rules_ipv4 = priv_ip_lib.list_ip_rules(self.namespace, 4)
|
||||
for rule in rules_ipv4:
|
||||
if rule['table'] == 10:
|
||||
self.assertEqual('192.168.0.1', rule['attrs']['FRA_SRC'])
|
||||
self.assertEqual(24, rule['src_len'])
|
||||
break
|
||||
else:
|
||||
self.fail('Rule added (192.168.0.1/24, table 10) not found')
|
||||
|
||||
def test_list_default_rules_ipv6(self):
|
||||
rules_ipv6 = priv_ip_lib.list_ip_rules(self.namespace, 6)
|
||||
self.assertEqual(2, len(rules_ipv6))
|
||||
rule_tables = [255, 254]
|
||||
for rule in rules_ipv6:
|
||||
rule_tables.remove(rule['table'])
|
||||
self.assertEqual(0, len(rule_tables))
|
||||
|
||||
def test_list_rules_ipv6(self):
|
||||
self.ip_rule.rule.add('2001:db8::1/64', table=20)
|
||||
rules_ipv6 = priv_ip_lib.list_ip_rules(self.namespace, 6)
|
||||
for rule in rules_ipv6:
|
||||
if rule['table'] == 20:
|
||||
self.assertEqual('2001:db8::1', rule['attrs']['FRA_SRC'])
|
||||
self.assertEqual(64, rule['src_len'])
|
||||
break
|
||||
else:
|
||||
self.fail('Rule added (2001:db8::1/64, table 20) not found')
|
||||
|
@ -13,6 +13,7 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import copy
|
||||
import errno
|
||||
import socket
|
||||
|
||||
@ -20,6 +21,7 @@ import mock
|
||||
import netaddr
|
||||
from neutron_lib import constants
|
||||
from neutron_lib import exceptions
|
||||
from oslo_utils import uuidutils
|
||||
import pyroute2
|
||||
from pyroute2.netlink.rtnl import ifinfmsg
|
||||
from pyroute2.netlink.rtnl import ndmsg
|
||||
@ -29,6 +31,7 @@ import testtools
|
||||
from neutron.agent.common import utils # noqa
|
||||
from neutron.agent.linux import ip_lib
|
||||
from neutron.common import exceptions as n_exc
|
||||
from neutron.common import utils as n_utils
|
||||
from neutron import privileged
|
||||
from neutron.privileged.agent.linux import ip_lib as priv_lib
|
||||
from neutron.tests import base
|
||||
@ -617,13 +620,23 @@ class TestIpRuleCommand(TestIPCmdBase):
|
||||
def setUp(self):
|
||||
super(TestIpRuleCommand, self).setUp()
|
||||
self.parent._as_root.return_value = ''
|
||||
self.parent.namespace = uuidutils.generate_uuid()
|
||||
self.command = 'rule'
|
||||
self.rule_cmd = ip_lib.IpRuleCommand(self.parent)
|
||||
self._mock_priv_list_ip_rules = mock.patch.object(priv_lib,
|
||||
'list_ip_rules')
|
||||
self.mock_priv_list_ip_rules = self._mock_priv_list_ip_rules.start()
|
||||
self.addCleanup(self._stop_mock)
|
||||
|
||||
def _stop_mock(self):
|
||||
self._mock_priv_list_ip_rules.stop()
|
||||
|
||||
def _test_add_rule(self, ip, table, priority):
|
||||
ip_version = netaddr.IPNetwork(ip).version
|
||||
with mock.patch.object(ip_lib, '_parse_ip_rule'):
|
||||
self.rule_cmd.add(ip, table=table, priority=priority)
|
||||
self._assert_sudo([ip_version], (['show']))
|
||||
self.mock_priv_list_ip_rules.assert_called_once_with(
|
||||
self.parent.namespace, n_utils.get_ip_version(ip))
|
||||
self._assert_sudo([ip_version], ('add', 'from', ip,
|
||||
'priority', str(priority),
|
||||
'table', str(table),
|
||||
@ -631,9 +644,13 @@ class TestIpRuleCommand(TestIPCmdBase):
|
||||
|
||||
def _test_add_rule_exists(self, ip, table, priority, output):
|
||||
self.parent._as_root.return_value = output
|
||||
ip_version = netaddr.IPNetwork(ip).version
|
||||
with mock.patch.object(self.rule_cmd, '_exists', return_value=True) \
|
||||
as mock_exists:
|
||||
self.rule_cmd.add(ip, table=table, priority=priority)
|
||||
self._assert_sudo([ip_version], (['show']))
|
||||
kwargs = {'from': ip, 'priority': str(priority),
|
||||
'table': str(table), 'type': 'unicast'}
|
||||
mock_exists.assert_called_once_with(n_utils.get_ip_version(ip),
|
||||
**kwargs)
|
||||
|
||||
def _test_delete_rule(self, ip, table, priority):
|
||||
ip_version = netaddr.IPNetwork(ip).version
|
||||
@ -642,23 +659,6 @@ class TestIpRuleCommand(TestIPCmdBase):
|
||||
('del', 'from', ip, 'priority', str(priority),
|
||||
'table', str(table), 'type', 'unicast'))
|
||||
|
||||
def test__parse_line(self):
|
||||
def test(ip_version, line, expected):
|
||||
actual = self.rule_cmd._parse_line(ip_version, line)
|
||||
self.assertEqual(expected, actual)
|
||||
|
||||
test(4, "4030201:\tfrom 1.2.3.4/24 lookup 10203040",
|
||||
{'from': '1.2.3.4/24',
|
||||
'table': '10203040',
|
||||
'type': 'unicast',
|
||||
'priority': '4030201'})
|
||||
test(6, "1024: from all iif qg-c43b1928-48 lookup noscope",
|
||||
{'priority': '1024',
|
||||
'from': '::/0',
|
||||
'type': 'unicast',
|
||||
'iif': 'qg-c43b1928-48',
|
||||
'table': 'noscope'})
|
||||
|
||||
def test__make_canonical_all_v4(self):
|
||||
actual = self.rule_cmd._make_canonical(4, {'from': 'all'})
|
||||
self.assertEqual({'from': '0.0.0.0/0', 'type': 'unicast'}, actual)
|
||||
@ -1863,3 +1863,120 @@ class TestConntrack(base.BaseTestCase):
|
||||
device.delete_socket_conntrack_state(ip_str, dport, protocol)
|
||||
self.execute.assert_called_once_with(expect_cmd, check_exit_code=True,
|
||||
extra_ok_codes=[1])
|
||||
|
||||
|
||||
class ParseIpRuleTestCase(base.BaseTestCase):
|
||||
|
||||
BASE_RULE = {
|
||||
'family': 2, 'dst_len': 0, 'res2': 0, 'tos': 0, 'res1': 0, 'flags': 0,
|
||||
'header': {
|
||||
'pid': 18152, 'length': 44, 'flags': 2, 'error': None, 'type': 32,
|
||||
'sequence_number': 281},
|
||||
'attrs': {'FRA_TABLE': 255, 'FRA_SUPPRESS_PREFIXLEN': 4294967295},
|
||||
'table': 255, 'action': 1, 'src_len': 0, 'event': 'RTM_NEWRULE'}
|
||||
|
||||
def setUp(self):
|
||||
super(ParseIpRuleTestCase, self).setUp()
|
||||
self.rule = copy.deepcopy(self.BASE_RULE)
|
||||
|
||||
def test_parse_priority(self):
|
||||
self.rule['attrs']['FRA_PRIORITY'] = 1000
|
||||
parsed_rule = ip_lib._parse_ip_rule(self.rule, 4)
|
||||
self.assertEqual('1000', parsed_rule['priority'])
|
||||
|
||||
def test_parse_from_ipv4(self):
|
||||
self.rule['attrs']['FRA_SRC'] = '192.168.0.1'
|
||||
self.rule['src_len'] = 24
|
||||
parsed_rule = ip_lib._parse_ip_rule(self.rule, 4)
|
||||
self.assertEqual('192.168.0.1/24', parsed_rule['from'])
|
||||
|
||||
def test_parse_from_ipv6(self):
|
||||
self.rule['attrs']['FRA_SRC'] = '2001:db8::1'
|
||||
self.rule['src_len'] = 64
|
||||
parsed_rule = ip_lib._parse_ip_rule(self.rule, 6)
|
||||
self.assertEqual('2001:db8::1/64', parsed_rule['from'])
|
||||
|
||||
def test_parse_from_any_ipv4(self):
|
||||
parsed_rule = ip_lib._parse_ip_rule(self.rule, 4)
|
||||
self.assertEqual('0.0.0.0/0', parsed_rule['from'])
|
||||
|
||||
def test_parse_from_any_ipv6(self):
|
||||
parsed_rule = ip_lib._parse_ip_rule(self.rule, 6)
|
||||
self.assertEqual('::/0', parsed_rule['from'])
|
||||
|
||||
def test_parse_to_ipv4(self):
|
||||
self.rule['attrs']['FRA_DST'] = '192.168.10.1'
|
||||
self.rule['dst_len'] = 24
|
||||
parsed_rule = ip_lib._parse_ip_rule(self.rule, 4)
|
||||
self.assertEqual('192.168.10.1/24', parsed_rule['to'])
|
||||
|
||||
def test_parse_to_ipv6(self):
|
||||
self.rule['attrs']['FRA_DST'] = '2001:db8::1'
|
||||
self.rule['dst_len'] = 64
|
||||
parsed_rule = ip_lib._parse_ip_rule(self.rule, 6)
|
||||
self.assertEqual('2001:db8::1/64', parsed_rule['to'])
|
||||
|
||||
def test_parse_to_none(self):
|
||||
parsed_rule = ip_lib._parse_ip_rule(self.rule, 4)
|
||||
self.assertIsNone(parsed_rule.get('to'))
|
||||
|
||||
def test_parse_table(self):
|
||||
self.rule['attrs']['FRA_TABLE'] = 255
|
||||
parsed_rule = ip_lib._parse_ip_rule(self.rule, 4)
|
||||
self.assertEqual('local', parsed_rule['table'])
|
||||
self.rule['attrs']['FRA_TABLE'] = 254
|
||||
parsed_rule = ip_lib._parse_ip_rule(self.rule, 4)
|
||||
self.assertEqual('main', parsed_rule['table'])
|
||||
self.rule['attrs']['FRA_TABLE'] = 253
|
||||
parsed_rule = ip_lib._parse_ip_rule(self.rule, 4)
|
||||
self.assertEqual('default', parsed_rule['table'])
|
||||
self.rule['attrs']['FRA_TABLE'] = 1000
|
||||
parsed_rule = ip_lib._parse_ip_rule(self.rule, 4)
|
||||
self.assertEqual('1000', parsed_rule['table'])
|
||||
|
||||
def test_parse_fwmark(self):
|
||||
self.rule['attrs']['FRA_FWMARK'] = 1000
|
||||
self.rule['attrs']['FRA_FWMASK'] = 10
|
||||
parsed_rule = ip_lib._parse_ip_rule(self.rule, 4)
|
||||
self.assertEqual('0x3e8/0xa', parsed_rule['fwmark'])
|
||||
|
||||
def test_parse_fwmark_none(self):
|
||||
parsed_rule = ip_lib._parse_ip_rule(self.rule, 4)
|
||||
self.assertIsNone(parsed_rule.get('fwmark'))
|
||||
|
||||
def test_parse_iif(self):
|
||||
self.rule['attrs']['FRA_IIFNAME'] = 'input_interface_name'
|
||||
parsed_rule = ip_lib._parse_ip_rule(self.rule, 4)
|
||||
self.assertEqual('input_interface_name', parsed_rule['iif'])
|
||||
|
||||
def test_parse_iif_none(self):
|
||||
parsed_rule = ip_lib._parse_ip_rule(self.rule, 4)
|
||||
self.assertIsNone(parsed_rule.get('iif'))
|
||||
|
||||
def test_parse_oif(self):
|
||||
self.rule['attrs']['FRA_OIFNAME'] = 'output_interface_name'
|
||||
parsed_rule = ip_lib._parse_ip_rule(self.rule, 4)
|
||||
self.assertEqual('output_interface_name', parsed_rule['oif'])
|
||||
|
||||
def test_parse_oif_none(self):
|
||||
parsed_rule = ip_lib._parse_ip_rule(self.rule, 4)
|
||||
self.assertIsNone(parsed_rule.get('oif'))
|
||||
|
||||
|
||||
class ListIpRulesTestCase(base.BaseTestCase):
|
||||
|
||||
def test_list_ip_rules(self):
|
||||
rule1 = {'family': 2, 'src_len': 24, 'action': 1,
|
||||
'attrs': {'FRA_SRC': '10.0.0.1', 'FRA_TABLE': 100}}
|
||||
rule2 = {'family': 2, 'src_len': 0, 'action': 6,
|
||||
'attrs': {'FRA_TABLE': 255}}
|
||||
rules = [rule1, rule2]
|
||||
with mock.patch.object(priv_lib, 'list_ip_rules') as mock_list_rules:
|
||||
mock_list_rules.return_value = rules
|
||||
retval = ip_lib.list_ip_rules(mock.ANY, 4)
|
||||
reference = [
|
||||
{'type': 'unicast', 'from': '10.0.0.1/24', 'priority': '0',
|
||||
'table': '100'},
|
||||
{'type': 'blackhole', 'from': '0.0.0.0/0', 'priority': '0',
|
||||
'table': 'local'}]
|
||||
self.assertEqual(reference, retval)
|
||||
|
Loading…
Reference in New Issue
Block a user