Merge "Implement IpRuleCommand.delete() using pyroute2"
This commit is contained in:
commit
902740ff52
@ -404,7 +404,6 @@ class FipNamespace(namespaces.Namespace):
|
||||
# Remove the fip namespace rules and routes associated to
|
||||
# fpr interface route table.
|
||||
tbl_index = ri._get_snat_idx(fip_2_rtr)
|
||||
fip_rt_rule = ip_lib.IPRule(namespace=fip_ns_name)
|
||||
# Flush the table
|
||||
fg_device.route.flush(lib_constants.IP_VERSION_4,
|
||||
table=tbl_index)
|
||||
@ -416,10 +415,9 @@ class FipNamespace(namespaces.Namespace):
|
||||
# link-local address IP version. Using any of those
|
||||
# is equivalent to using 'from all' for iproute2.
|
||||
rule_ip = lib_constants.IP_ANY[fip_2_rtr.ip.version]
|
||||
fip_rt_rule.rule.delete(ip=rule_ip,
|
||||
iif=fip_2_rtr_name,
|
||||
table=tbl_index,
|
||||
priority=tbl_index)
|
||||
ip_lib.delete_ip_rule(fip_ns_name, ip=rule_ip,
|
||||
iif=fip_2_rtr_name, table=tbl_index,
|
||||
priority=tbl_index)
|
||||
self.local_subnets.release(ri.router_id)
|
||||
ri.rtr_fip_subnet = None
|
||||
|
||||
|
@ -157,10 +157,9 @@ class DvrLocalRouter(dvr_router_base.DvrRouterBase):
|
||||
def _remove_floating_ip_rule(self, floating_ip):
|
||||
if floating_ip in self.floating_ips_dict:
|
||||
fixed_ip, rule_pr = self.floating_ips_dict[floating_ip]
|
||||
ip_rule = ip_lib.IPRule(namespace=self.ns_name)
|
||||
ip_rule.rule.delete(ip=fixed_ip,
|
||||
table=dvr_fip_ns.FIP_RT_TBL,
|
||||
priority=int(str(rule_pr)))
|
||||
ip_lib.delete_ip_rule(self.ns_name, ip=fixed_ip,
|
||||
table=dvr_fip_ns.FIP_RT_TBL,
|
||||
priority=int(str(rule_pr)))
|
||||
self.fip_ns.deallocate_rule_priority(floating_ip)
|
||||
# TODO(rajeev): Handle else case - exception/log?
|
||||
|
||||
@ -325,8 +324,8 @@ class DvrLocalRouter(dvr_router_base.DvrRouterBase):
|
||||
except exceptions.DeviceNotFoundError:
|
||||
pass
|
||||
|
||||
def _stale_ip_rule_cleanup(self, ns_ipr, ns_ipd, ip_version):
|
||||
ip_rules_list = ip_lib.list_ip_rules(ns_ipr.namespace, ip_version)
|
||||
def _stale_ip_rule_cleanup(self, namespace, ns_ipd, ip_version):
|
||||
ip_rules_list = ip_lib.list_ip_rules(namespace, ip_version)
|
||||
snat_table_list = []
|
||||
for ip_rule in ip_rules_list:
|
||||
snat_table = ip_rule['table']
|
||||
@ -338,24 +337,23 @@ class DvrLocalRouter(dvr_router_base.DvrRouterBase):
|
||||
dvr_fip_ns.FIP_PR_END)):
|
||||
continue
|
||||
gateway_cidr = ip_rule['from']
|
||||
ns_ipr.rule.delete(ip=gateway_cidr,
|
||||
table=snat_table,
|
||||
priority=priority)
|
||||
ip_lib.delete_ip_rule(namespace, ip=gateway_cidr, table=snat_table,
|
||||
priority=priority)
|
||||
snat_table_list.append(snat_table)
|
||||
for tb in snat_table_list:
|
||||
ns_ipd.route.flush(ip_version, table=tb)
|
||||
|
||||
def gateway_redirect_cleanup(self, rtr_interface):
|
||||
ns_ipr = ip_lib.IPRule(namespace=self.ns_name)
|
||||
ns_ipd = ip_lib.IPDevice(rtr_interface, namespace=self.ns_name)
|
||||
self._stale_ip_rule_cleanup(ns_ipr, ns_ipd, lib_constants.IP_VERSION_4)
|
||||
self._stale_ip_rule_cleanup(ns_ipr, ns_ipd, lib_constants.IP_VERSION_6)
|
||||
self._stale_ip_rule_cleanup(self.ns_name, ns_ipd,
|
||||
lib_constants.IP_VERSION_4)
|
||||
self._stale_ip_rule_cleanup(self.ns_name, ns_ipd,
|
||||
lib_constants.IP_VERSION_6)
|
||||
|
||||
def _snat_redirect_modify(self, gateway, sn_port, sn_int, is_add):
|
||||
"""Adds or removes rules and routes for SNAT redirection."""
|
||||
cmd = ['net.ipv4.conf.%s.send_redirects=0' % sn_int]
|
||||
try:
|
||||
ns_ipr = ip_lib.IPRule(namespace=self.ns_name)
|
||||
ns_ipd = ip_lib.IPDevice(sn_int, namespace=self.ns_name)
|
||||
for port_fixed_ip in sn_port['fixed_ips']:
|
||||
# Iterate and find the gateway IP address matching
|
||||
@ -380,9 +378,10 @@ class DvrLocalRouter(dvr_router_base.DvrRouterBase):
|
||||
self._delete_gateway_device_if_exists(ns_ipd,
|
||||
gw_ip_addr,
|
||||
snat_idx)
|
||||
ns_ipr.rule.delete(ip=sn_port_cidr,
|
||||
table=snat_idx,
|
||||
priority=snat_idx)
|
||||
ip_lib.delete_ip_rule(self.ns_name,
|
||||
ip=sn_port_cidr,
|
||||
table=snat_idx,
|
||||
priority=snat_idx)
|
||||
except Exception:
|
||||
if is_add:
|
||||
exc = 'DVR: error adding redirection logic'
|
||||
@ -696,12 +695,11 @@ class DvrLocalRouter(dvr_router_base.DvrRouterBase):
|
||||
priority=dvr_fip_ns.FAST_PATH_EXIT_PR)
|
||||
|
||||
def _delete_interface_routing_rule_in_router_ns(self, router_port):
|
||||
ip_rule = ip_lib.IPRule(namespace=self.ns_name)
|
||||
for subnet in router_port['subnets']:
|
||||
rtr_port_cidr = subnet['cidr']
|
||||
ip_rule.rule.delete(ip=rtr_port_cidr,
|
||||
table=dvr_fip_ns.FIP_RT_TBL,
|
||||
priority=dvr_fip_ns.FAST_PATH_EXIT_PR)
|
||||
ip_lib.delete_ip_rule(self.ns_name, ip=rtr_port_cidr,
|
||||
table=dvr_fip_ns.FIP_RT_TBL,
|
||||
priority=dvr_fip_ns.FAST_PATH_EXIT_PR)
|
||||
|
||||
def get_rtr_fip_ip_and_interface_name(self):
|
||||
"""Function that returns the router to fip interface name and ip."""
|
||||
|
@ -27,7 +27,6 @@ from oslo_utils import excutils
|
||||
from pyroute2.netlink.rtnl import ifinfmsg
|
||||
from pyroute2 import NetlinkError
|
||||
from pyroute2 import netns
|
||||
import six
|
||||
|
||||
from neutron._i18n import _
|
||||
from neutron.agent.common import utils
|
||||
@ -370,97 +369,6 @@ class IpCommandBase(object):
|
||||
use_root_namespace=use_root_namespace)
|
||||
|
||||
|
||||
class IPRule(SubProcessBase):
|
||||
def __init__(self, namespace=None):
|
||||
super(IPRule, self).__init__(namespace=namespace)
|
||||
self.rule = IpRuleCommand(self)
|
||||
|
||||
|
||||
class IpRuleCommand(IpCommandBase):
|
||||
COMMAND = 'rule'
|
||||
|
||||
@staticmethod
|
||||
def _make_canonical(ip_version, settings):
|
||||
"""Converts settings to a canonical representation to compare easily"""
|
||||
def canonicalize_fwmark_string(fwmark_mask):
|
||||
"""Reformats fwmark/mask in to a canonical form
|
||||
|
||||
Examples, these are all equivalent:
|
||||
"0x1"
|
||||
0x1
|
||||
"0x1/0xfffffffff"
|
||||
(0x1, 0xfffffffff)
|
||||
|
||||
:param fwmark_mask: The firewall and mask (default 0xffffffff)
|
||||
:type fwmark_mask: A string with / as delimiter, an iterable, or a
|
||||
single value.
|
||||
"""
|
||||
# Turn the value we were passed in to an iterable: fwmark[, mask]
|
||||
if isinstance(fwmark_mask, six.string_types):
|
||||
# A / separates the optional mask in a string
|
||||
iterable = fwmark_mask.split('/')
|
||||
else:
|
||||
try:
|
||||
iterable = iter(fwmark_mask)
|
||||
except TypeError:
|
||||
# At this point, it must be a single integer
|
||||
iterable = [fwmark_mask]
|
||||
|
||||
def to_i(s):
|
||||
if isinstance(s, six.string_types):
|
||||
# Passing 0 as "base" arg to "int" causes it to determine
|
||||
# the base automatically.
|
||||
return int(s, 0)
|
||||
# s isn't a string, can't specify base argument
|
||||
return int(s)
|
||||
|
||||
integers = [to_i(x) for x in iterable]
|
||||
|
||||
# The default mask is all ones, the mask is 32 bits.
|
||||
if len(integers) == 1:
|
||||
integers.append(0xffffffff)
|
||||
|
||||
# We now have two integers in a list. Convert to canonical string.
|
||||
return '{0:#x}/{1:#x}'.format(*integers)
|
||||
|
||||
def canonicalize(item):
|
||||
k, v = item
|
||||
# ip rule shows these as 'any'
|
||||
if k == 'from' and v == 'all':
|
||||
return k, constants.IP_ANY[ip_version]
|
||||
# lookup and table are interchangeable. Use table every time.
|
||||
if k == 'lookup':
|
||||
return 'table', v
|
||||
if k == 'fwmark':
|
||||
return k, canonicalize_fwmark_string(v)
|
||||
return k, v
|
||||
|
||||
if 'type' not in settings:
|
||||
settings['type'] = 'unicast'
|
||||
|
||||
return {k: str(v) for k, v in map(canonicalize, settings.items())}
|
||||
|
||||
def _make__flat_args_tuple(self, *args, **kwargs):
|
||||
for kwargs_item in sorted(kwargs.items(), key=lambda i: i[0]):
|
||||
args += kwargs_item
|
||||
return tuple(args)
|
||||
|
||||
def delete(self, ip, **kwargs):
|
||||
ip_version = common_utils.get_ip_version(ip)
|
||||
|
||||
# In case we need to delete a rule based on an incoming
|
||||
# interface, pass the "any" IP address, for example, 0.0.0.0/0,
|
||||
# else pass the given IP.
|
||||
if kwargs.get('iif'):
|
||||
kwargs.update({'from': constants.IP_ANY[ip_version]})
|
||||
else:
|
||||
kwargs.update({'from': ip})
|
||||
canonical_kwargs = self._make_canonical(ip_version, kwargs)
|
||||
|
||||
args_tuple = self._make__flat_args_tuple('del', **canonical_kwargs)
|
||||
self._as_root([ip_version], args_tuple)
|
||||
|
||||
|
||||
class IpDeviceCommandBase(IpCommandBase):
|
||||
@property
|
||||
def name(self):
|
||||
@ -1414,3 +1322,18 @@ def add_ip_rule(namespace, ip, iif=None, table=None, priority=None, to=None):
|
||||
return
|
||||
cmd_args = _make_pyroute2_args(ip, iif, table, priority, to)
|
||||
privileged.add_ip_rule(namespace, **cmd_args)
|
||||
|
||||
|
||||
def delete_ip_rule(namespace, ip, iif=None, table=None, priority=None,
|
||||
to=None):
|
||||
"""Delete an IP rule in a namespace
|
||||
|
||||
:param namespace: (string) namespace name
|
||||
:param ip: (string) source IP or CIDR address (IPv4, IPv6)
|
||||
:param iif: (Optional) (string) input interface name
|
||||
:param table: (Optional) (string, int) table number
|
||||
:param priority: (Optional) (string, int) rule priority
|
||||
:param to: (Optional) (string) destination IP or CIDR address (IPv4, IPv6)
|
||||
"""
|
||||
cmd_args = _make_pyroute2_args(ip, iif, table, priority, to)
|
||||
privileged.delete_ip_rule(namespace, **cmd_args)
|
||||
|
@ -505,3 +505,15 @@ def add_ip_rule(namespace, **kwargs):
|
||||
if e.errno == errno.ENOENT:
|
||||
raise NetworkNamespaceNotFound(netns_name=namespace)
|
||||
raise
|
||||
|
||||
|
||||
@privileged.default.entrypoint
|
||||
def delete_ip_rule(namespace, **kwargs):
|
||||
"""Delete an IP rule"""
|
||||
try:
|
||||
with _get_iproute(namespace) as ip:
|
||||
ip.rule('del', **kwargs)
|
||||
except OSError as e:
|
||||
if e.errno == errno.ENOENT:
|
||||
raise NetworkNamespaceNotFound(netns_name=namespace)
|
||||
raise
|
||||
|
@ -107,7 +107,7 @@ class TestDvrRouter(framework.L3AgentTestFramework):
|
||||
ip4_rules_list = ip_lib.list_ip_rules(router1.ns_name,
|
||||
lib_constants.IP_VERSION_4)
|
||||
self.assertEqual(6, len(ip4_rules_list))
|
||||
# IPRule list should have 6 entries.
|
||||
# list_ip_rules list should have 6 entries.
|
||||
# Three entries from 'default', 'main' and 'local' table.
|
||||
# One rule for the floatingip.
|
||||
# The remaining 2 is for the two router interfaces(csnat ports).
|
||||
|
@ -159,20 +159,20 @@ class IpLibTestCase(IpLibTestFramework):
|
||||
]
|
||||
}
|
||||
|
||||
ip_rule = ip_lib.IPRule(namespace=device.namespace)
|
||||
for ip_version, test_case in test_cases.items():
|
||||
for rule in test_case:
|
||||
ip_lib.add_ip_rule(namespace=device.namespace, table=TABLE,
|
||||
priority=PRIORITY, **rule)
|
||||
|
||||
rules = ip_lib.list_ip_rules(ip_rule.namespace, ip_version)
|
||||
rules = ip_lib.list_ip_rules(device.namespace, ip_version)
|
||||
for expected_rule in expected_rules[ip_version]:
|
||||
self.assertIn(expected_rule, rules)
|
||||
|
||||
for rule in test_case:
|
||||
ip_rule.rule.delete(table=TABLE, priority=PRIORITY, **rule)
|
||||
ip_lib.delete_ip_rule(device.namespace, table=TABLE,
|
||||
priority=PRIORITY, **rule)
|
||||
|
||||
rules = priv_ip_lib.list_ip_rules(ip_rule.namespace, ip_version)
|
||||
rules = priv_ip_lib.list_ip_rules(device.namespace, ip_version)
|
||||
for expected_rule in expected_rules[ip_version]:
|
||||
self.assertNotIn(expected_rule, rules)
|
||||
|
||||
|
@ -56,7 +56,6 @@ class ListIpRulesTestCase(functional_base.BaseSudoTestCase):
|
||||
super(ListIpRulesTestCase, self).setUp()
|
||||
self.namespace = 'ns_test-' + uuidutils.generate_uuid()
|
||||
self.ns = priv_ip_lib.create_netns(self.namespace)
|
||||
self.ip_rule = ip_lib.IPRule(namespace=self.namespace)
|
||||
self.addCleanup(self._remove_ns)
|
||||
|
||||
def _remove_ns(self):
|
||||
@ -112,13 +111,17 @@ class RuleTestCase(functional_base.BaseSudoTestCase):
|
||||
def _remove_ns(self):
|
||||
priv_ip_lib.remove_netns(self.namespace)
|
||||
|
||||
def _check_rules(self, rules, parameters, values, exception_string):
|
||||
def _check_rules(self, rules, parameters, values, exception_string=None,
|
||||
raise_exception=True):
|
||||
for rule in rules:
|
||||
if all(rule.get(parameter) == value
|
||||
for parameter, value in zip(parameters, values)):
|
||||
break
|
||||
return True
|
||||
else:
|
||||
self.fail('Rule with %s was expected' % exception_string)
|
||||
if raise_exception:
|
||||
self.fail('Rule with %s was expected' % exception_string)
|
||||
else:
|
||||
return False
|
||||
|
||||
def test_add_rule_ip(self):
|
||||
ip_addresses = ['192.168.200.250', '2001::250']
|
||||
@ -132,6 +135,13 @@ class RuleTestCase(functional_base.BaseSudoTestCase):
|
||||
self._check_rules(rules, ['from'], [ip_address],
|
||||
'"from" IP address %s' % ip_address)
|
||||
|
||||
priv_ip_lib.delete_ip_rule(self.namespace, family=ip_family,
|
||||
src=ip_address, src_len=ip_lenght)
|
||||
rules = ip_lib.list_ip_rules(self.namespace, ip_version)
|
||||
self.assertFalse(
|
||||
self._check_rules(rules, ['from'], [ip_address],
|
||||
raise_exception=False))
|
||||
|
||||
def test_add_rule_iif(self):
|
||||
iif = 'iif_device_1'
|
||||
priv_ip_lib.create_interface(iif, self.namespace, 'dummy')
|
||||
@ -139,6 +149,11 @@ class RuleTestCase(functional_base.BaseSudoTestCase):
|
||||
rules = ip_lib.list_ip_rules(self.namespace, 4)
|
||||
self._check_rules(rules, ['iif'], [iif], 'iif name %s' % iif)
|
||||
|
||||
priv_ip_lib.delete_ip_rule(self.namespace, iifname=iif)
|
||||
rules = ip_lib.list_ip_rules(self.namespace, 4)
|
||||
self.assertFalse(
|
||||
self._check_rules(rules, ['iif'], [iif], raise_exception=False))
|
||||
|
||||
def test_add_rule_table(self):
|
||||
table = 212
|
||||
ip_addresses = ['192.168.200.251', '2001::251']
|
||||
@ -154,6 +169,15 @@ class RuleTestCase(functional_base.BaseSudoTestCase):
|
||||
rules, ['table', 'from'], [str(table), ip_address],
|
||||
'table %s and "from" IP address %s' % (table, ip_address))
|
||||
|
||||
priv_ip_lib.delete_ip_rule(self.namespace, table=table,
|
||||
src=ip_address, src_len=ip_lenght,
|
||||
family=ip_family)
|
||||
rules = ip_lib.list_ip_rules(self.namespace, ip_version)
|
||||
self.assertFalse(
|
||||
self._check_rules(rules, ['table', 'from'],
|
||||
[str(table), ip_address],
|
||||
raise_exception=False))
|
||||
|
||||
def test_add_rule_priority(self):
|
||||
priority = 12345
|
||||
ip_addresses = ['192.168.200.252', '2001::252']
|
||||
@ -170,6 +194,15 @@ class RuleTestCase(functional_base.BaseSudoTestCase):
|
||||
'priority %s and "from" IP address %s' %
|
||||
(priority, ip_address))
|
||||
|
||||
priv_ip_lib.delete_ip_rule(self.namespace, priority=priority,
|
||||
src=ip_address, src_len=ip_lenght,
|
||||
family=ip_family)
|
||||
rules = ip_lib.list_ip_rules(self.namespace, ip_version)
|
||||
self.assertFalse(
|
||||
self._check_rules(rules, ['priority', 'from'],
|
||||
[str(priority), ip_address],
|
||||
raise_exception=False))
|
||||
|
||||
def test_add_rule_priority_table_iif(self):
|
||||
table = 213
|
||||
priority = 12346
|
||||
@ -184,6 +217,14 @@ class RuleTestCase(functional_base.BaseSudoTestCase):
|
||||
[str(priority), iif, str(table)],
|
||||
'priority %s, table %s and iif name %s' % (priority, table, iif))
|
||||
|
||||
priv_ip_lib.delete_ip_rule(self.namespace, priority=priority,
|
||||
iifname=iif, table=table)
|
||||
rules = ip_lib.list_ip_rules(self.namespace, 4)
|
||||
self.assertFalse(
|
||||
self._check_rules(rules, ['priority', 'iif', 'table'],
|
||||
[str(priority), iif, str(table)],
|
||||
raise_exception=False))
|
||||
|
||||
@testtools.skip('https://github.com/svinota/pyroute2/issues/566')
|
||||
def test_add_rule_exists(self):
|
||||
iif = 'iif_device_1'
|
||||
|
@ -134,12 +134,10 @@ class BasicRouterOperationsFramework(base.BaseTestCase):
|
||||
self.mock_ip = mock.MagicMock()
|
||||
ip_cls.return_value = self.mock_ip
|
||||
|
||||
ip_rule = mock.patch('neutron.agent.linux.ip_lib.IPRule').start()
|
||||
self.mock_rule = mock.MagicMock()
|
||||
ip_rule.return_value = self.mock_rule
|
||||
|
||||
self.mock_add_ip_rule = mock.patch.object(ip_lib,
|
||||
'add_ip_rule').start()
|
||||
self.mock_add_ip_rule = mock.patch.object(ip_lib,
|
||||
'delete_ip_rule').start()
|
||||
|
||||
ip_dev = mock.patch('neutron.agent.linux.ip_lib.IPDevice').start()
|
||||
self.mock_ip_dev = mock.MagicMock()
|
||||
|
@ -100,9 +100,8 @@ class TestDvrRouterOperations(base.BaseTestCase):
|
||||
self.mock_ip = mock.MagicMock()
|
||||
ip_cls.return_value = self.mock_ip
|
||||
|
||||
ip_rule = mock.patch('neutron.agent.linux.ip_lib.IPRule').start()
|
||||
self.mock_rule = mock.MagicMock()
|
||||
ip_rule.return_value = self.mock_rule
|
||||
self.mock_delete_ip_rule = mock.patch.object(ip_lib,
|
||||
'delete_ip_rule').start()
|
||||
|
||||
ip_dev = mock.patch('neutron.agent.linux.ip_lib.IPDevice').start()
|
||||
self.mock_ip_dev = mock.MagicMock()
|
||||
@ -358,8 +357,7 @@ class TestDvrRouterOperations(base.BaseTestCase):
|
||||
|
||||
@mock.patch.object(ip_lib, 'IPWrapper')
|
||||
@mock.patch.object(ip_lib, 'IPDevice')
|
||||
@mock.patch.object(ip_lib, 'IPRule')
|
||||
def test_floating_ip_removed_dist(self, mIPRule, mIPDevice, mIPWrapper):
|
||||
def test_floating_ip_removed_dist(self, mIPDevice, mIPWrapper):
|
||||
router = mock.MagicMock()
|
||||
ri = self._create_router(router)
|
||||
ri.ex_gw_port = ri.router['gw_port']
|
||||
@ -385,14 +383,13 @@ class TestDvrRouterOperations(base.BaseTestCase):
|
||||
ri.rtr_fip_subnet = s
|
||||
ri.fip_ns.local_subnets = mock.Mock()
|
||||
ri.floating_ip_removed_dist(fip_cidr)
|
||||
mIPRule().rule.delete.assert_called_with(
|
||||
ip=fixed_ip, table=16, priority=FIP_PRI)
|
||||
self.mock_delete_ip_rule.assert_called_with(
|
||||
ri.router_namespace.name, ip=fixed_ip, table=16, priority=FIP_PRI)
|
||||
mIPDevice().route.delete_route.assert_called_with(fip_cidr, str(s.ip))
|
||||
ri.fip_ns.local_subnets.allocate.assert_not_called()
|
||||
|
||||
@mock.patch.object(ip_lib, 'IPRule')
|
||||
@mock.patch.object(ip_lib, 'add_ip_rule')
|
||||
def test_floating_ip_moved_dist(self, mock_add_ip_rule, mIPRule):
|
||||
def test_floating_ip_moved_dist(self, mock_add_ip_rule):
|
||||
router = mock.MagicMock()
|
||||
ri = self._create_router(router)
|
||||
floating_ip_address = '15.1.2.3'
|
||||
@ -404,8 +401,8 @@ class TestDvrRouterOperations(base.BaseTestCase):
|
||||
ri.fip_ns.allocate_rule_priority.return_value = FIP_PRI
|
||||
ri.floating_ip_moved_dist(fip)
|
||||
|
||||
mIPRule().rule.delete.assert_called_once_with(
|
||||
ip=fixed_ip, table=16, priority=FIP_PRI)
|
||||
self.mock_delete_ip_rule.assert_called_once_with(
|
||||
ri.router_namespace.name, ip=fixed_ip, table=16, priority=FIP_PRI)
|
||||
ri.fip_ns.deallocate_rule_priority.assert_called_once_with(
|
||||
floating_ip_address)
|
||||
ri.fip_ns.allocate_rule_priority.assert_called_once_with(
|
||||
|
@ -606,9 +606,9 @@ class TestIpRuleCommand(TestIPCmdBase):
|
||||
def setUp(self):
|
||||
super(TestIpRuleCommand, self).setUp()
|
||||
self.parent._as_root.return_value = ''
|
||||
self.parent.namespace = uuidutils.generate_uuid()
|
||||
self.ns = uuidutils.generate_uuid()
|
||||
self.parent.namespace = self.ns
|
||||
self.command = 'rule'
|
||||
self.rule_cmd = ip_lib.IpRuleCommand(self.parent)
|
||||
self._mock_priv_list_ip_rules = mock.patch.object(priv_lib,
|
||||
'list_ip_rules')
|
||||
self.mock_priv_list_ip_rules = self._mock_priv_list_ip_rules.start()
|
||||
@ -637,54 +637,19 @@ class TestIpRuleCommand(TestIPCmdBase):
|
||||
|
||||
def _test_add_rule_exists(self, ip, table, priority, output):
|
||||
self.parent._as_root.return_value = output
|
||||
with mock.patch.object(self.rule_cmd, '_exists', return_value=True) \
|
||||
with mock.patch.object(ip_lib, '_exist_ip_rule', return_value=True) \
|
||||
as mock_exists:
|
||||
self.rule_cmd.add(ip, table=table, priority=priority)
|
||||
ip_lib.add_ip_rule(self.ns, ip, table=table, priority=priority)
|
||||
kwargs = {'from': ip, 'priority': str(priority),
|
||||
'table': str(table), 'type': 'unicast'}
|
||||
mock_exists.assert_called_once_with(
|
||||
common_utils.get_ip_version(ip), **kwargs)
|
||||
|
||||
def _test_delete_rule(self, ip, table, priority):
|
||||
ip_version = netaddr.IPNetwork(ip).version
|
||||
self.rule_cmd.delete(ip, table=table, priority=priority)
|
||||
self._assert_sudo([ip_version],
|
||||
('del', 'from', ip, 'priority', str(priority),
|
||||
'table', str(table), 'type', 'unicast'))
|
||||
|
||||
def test__make_canonical_all_v4(self):
|
||||
actual = self.rule_cmd._make_canonical(4, {'from': 'all'})
|
||||
self.assertEqual({'from': '0.0.0.0/0', 'type': 'unicast'}, actual)
|
||||
|
||||
def test__make_canonical_all_v6(self):
|
||||
actual = self.rule_cmd._make_canonical(6, {'from': 'all'})
|
||||
self.assertEqual({'from': '::/0', 'type': 'unicast'}, actual)
|
||||
|
||||
def test__make_canonical_lookup(self):
|
||||
actual = self.rule_cmd._make_canonical(6, {'lookup': 'table'})
|
||||
self.assertEqual({'table': 'table', 'type': 'unicast'}, actual)
|
||||
|
||||
def test__make_canonical_iif(self):
|
||||
actual = self.rule_cmd._make_canonical(6, {'iif': 'iface_name'})
|
||||
self.assertEqual({'iif': 'iface_name', 'type': 'unicast'}, actual)
|
||||
|
||||
def test__make_canonical_fwmark(self):
|
||||
actual = self.rule_cmd._make_canonical(6, {'fwmark': '0x400'})
|
||||
self.assertEqual({'fwmark': '0x400/0xffffffff',
|
||||
'type': 'unicast'}, actual)
|
||||
|
||||
def test__make_canonical_fwmark_with_mask(self):
|
||||
actual = self.rule_cmd._make_canonical(6, {'fwmark': '0x400/0x00ff'})
|
||||
self.assertEqual({'fwmark': '0x400/0xff', 'type': 'unicast'}, actual)
|
||||
|
||||
def test__make_canonical_fwmark_integer(self):
|
||||
actual = self.rule_cmd._make_canonical(6, {'fwmark': 0x400})
|
||||
self.assertEqual({'fwmark': '0x400/0xffffffff',
|
||||
'type': 'unicast'}, actual)
|
||||
|
||||
def test__make_canonical_fwmark_iterable(self):
|
||||
actual = self.rule_cmd._make_canonical(6, {'fwmark': (0x400, 0xffff)})
|
||||
self.assertEqual({'fwmark': '0x400/0xffff', 'type': 'unicast'}, actual)
|
||||
with mock.patch.object(priv_lib, 'delete_ip_rule') as mock_delete:
|
||||
ip_lib.delete_ip_rule(self.ns, ip, table=table, priority=priority)
|
||||
args = ip_lib._make_pyroute2_args(ip, None, table, priority, None)
|
||||
mock_delete.assert_called_with(self.ns, **args)
|
||||
|
||||
def test_add_rule_v4(self):
|
||||
self._test_add_rule('192.168.45.100', None, 2, 100)
|
||||
|
Loading…
x
Reference in New Issue
Block a user