Implement "ip route" commands using Pyroute2
Commands implemented: * Add route * List routes Related-Bug: #1492714 Change-Id: I5e5e9f6981024317773979d9d2d77db3f5e7ec98
This commit is contained in:
parent
d35dd9c9c8
commit
0699713609
@ -785,8 +785,8 @@ class RouterInfo(BaseRouterInfo):
|
||||
for ip_version in (lib_constants.IP_VERSION_4,
|
||||
lib_constants.IP_VERSION_6):
|
||||
gateway = device.route.get_gateway(ip_version=ip_version)
|
||||
if gateway and gateway.get('gateway'):
|
||||
current_gateways.add(gateway.get('gateway'))
|
||||
if gateway and gateway.get('via'):
|
||||
current_gateways.add(gateway.get('via'))
|
||||
for ip in current_gateways - set(gateway_ips):
|
||||
device.route.delete_gateway(ip)
|
||||
for ip in gateway_ips:
|
||||
|
@ -49,6 +49,8 @@ IP_RULE_TABLES = {'default': 253,
|
||||
'main': 254,
|
||||
'local': 255}
|
||||
|
||||
IP_RULE_TABLES_NAMES = {v: k for k, v in IP_RULE_TABLES.items()}
|
||||
|
||||
# Rule indexes: pyroute2.netlink.rtnl
|
||||
# Rule names: https://www.systutorials.com/docs/linux/man/8-ip-rule/
|
||||
# NOTE(ralonsoh): 'masquerade' type is printed as 'nat' in 'ip rule' command
|
||||
@ -596,14 +598,9 @@ class IpRouteCommand(IpDeviceCommandBase):
|
||||
def _dev_args(self):
|
||||
return ['dev', self.name] if self.name else []
|
||||
|
||||
def add_gateway(self, gateway, metric=None, table=None):
|
||||
ip_version = common_utils.get_ip_version(gateway)
|
||||
args = ['replace', 'default', 'via', gateway]
|
||||
if metric:
|
||||
args += ['metric', metric]
|
||||
args += self._dev_args()
|
||||
args += self._table_args(table)
|
||||
self._as_root([ip_version], tuple(args))
|
||||
def add_gateway(self, gateway, metric=None, table=None, scope='global'):
|
||||
self.add_route(None, via=gateway, table=table, metric=metric,
|
||||
scope=scope)
|
||||
|
||||
def _run_as_root_detect_device_not_found(self, options, args):
|
||||
try:
|
||||
@ -622,41 +619,15 @@ class IpRouteCommand(IpDeviceCommandBase):
|
||||
args += self._table_args(table)
|
||||
self._run_as_root_detect_device_not_found([ip_version], args)
|
||||
|
||||
def _parse_routes(self, ip_version, output, **kwargs):
|
||||
for line in output.splitlines():
|
||||
parts = line.split()
|
||||
|
||||
# Format of line is: "<cidr>|default [<key> <value>] ..."
|
||||
route = {k: v for k, v in zip(parts[1::2], parts[2::2])}
|
||||
route['cidr'] = parts[0]
|
||||
# Avoids having to explicitly pass around the IP version
|
||||
if route['cidr'] == 'default':
|
||||
route['cidr'] = constants.IP_ANY[ip_version]
|
||||
|
||||
# ip route drops things like scope and dev from the output if it
|
||||
# was specified as a filter. This allows us to add them back.
|
||||
if self.name:
|
||||
route['dev'] = self.name
|
||||
if self._table:
|
||||
route['table'] = self._table
|
||||
# Callers add any filters they use as kwargs
|
||||
route.update(kwargs)
|
||||
|
||||
yield route
|
||||
|
||||
def list_routes(self, ip_version, **kwargs):
|
||||
args = ['list']
|
||||
args += self._dev_args()
|
||||
args += self._table_args()
|
||||
for k, v in kwargs.items():
|
||||
args += [k, v]
|
||||
|
||||
output = self._run([ip_version], tuple(args))
|
||||
return [r for r in self._parse_routes(ip_version, output, **kwargs)]
|
||||
def list_routes(self, ip_version, scope=None, via=None, table=None,
|
||||
**kwargs):
|
||||
table = table or self._table
|
||||
return list_ip_routes(self._parent.namespace, ip_version, scope=scope,
|
||||
via=via, table=table, device=self.name, **kwargs)
|
||||
|
||||
def list_onlink_routes(self, ip_version):
|
||||
routes = self.list_routes(ip_version, scope='link')
|
||||
return [r for r in routes if 'src' not in r]
|
||||
return [r for r in routes if not r['source_prefix']]
|
||||
|
||||
def add_onlink_route(self, cidr):
|
||||
self.add_route(cidr, scope='link')
|
||||
@ -664,34 +635,12 @@ class IpRouteCommand(IpDeviceCommandBase):
|
||||
def delete_onlink_route(self, cidr):
|
||||
self.delete_route(cidr, scope='link')
|
||||
|
||||
def get_gateway(self, scope=None, filters=None, ip_version=None):
|
||||
options = [ip_version] if ip_version else []
|
||||
|
||||
args = ['list']
|
||||
args += self._dev_args()
|
||||
args += self._table_args()
|
||||
if filters:
|
||||
args += filters
|
||||
|
||||
retval = None
|
||||
|
||||
if scope:
|
||||
args += ['scope', scope]
|
||||
|
||||
route_list_lines = self._run(options, tuple(args)).split('\n')
|
||||
default_route_line = next((x.strip() for x in
|
||||
route_list_lines if
|
||||
x.strip().startswith('default')), None)
|
||||
if default_route_line:
|
||||
retval = dict()
|
||||
gateway = DEFAULT_GW_PATTERN.search(default_route_line)
|
||||
if gateway:
|
||||
retval.update(gateway=gateway.group(1))
|
||||
metric = METRIC_PATTERN.search(default_route_line)
|
||||
if metric:
|
||||
retval.update(metric=int(metric.group(1)))
|
||||
|
||||
return retval
|
||||
def get_gateway(self, scope=None, table=None,
|
||||
ip_version=constants.IP_VERSION_4):
|
||||
routes = self.list_routes(ip_version, scope=scope, table=table)
|
||||
for route in routes:
|
||||
if route['via'] and route['cidr'] in constants.IP_ANY.values():
|
||||
return route
|
||||
|
||||
def flush(self, ip_version, table=None, **kwargs):
|
||||
args = ['flush']
|
||||
@ -700,16 +649,11 @@ class IpRouteCommand(IpDeviceCommandBase):
|
||||
args += [k, v]
|
||||
self._as_root([ip_version], tuple(args))
|
||||
|
||||
def add_route(self, cidr, via=None, table=None, **kwargs):
|
||||
ip_version = common_utils.get_ip_version(cidr)
|
||||
args = ['replace', cidr]
|
||||
if via:
|
||||
args += ['via', via]
|
||||
args += self._dev_args()
|
||||
args += self._table_args(table)
|
||||
for k, v in kwargs.items():
|
||||
args += [k, v]
|
||||
self._run_as_root_detect_device_not_found([ip_version], args)
|
||||
def add_route(self, cidr, via=None, table=None, metric=None, scope=None,
|
||||
**kwargs):
|
||||
table = table or self._table
|
||||
add_ip_route(self._parent.namespace, cidr, device=self.name, via=via,
|
||||
table=table, metric=metric, scope=scope, **kwargs)
|
||||
|
||||
def delete_route(self, cidr, via=None, table=None, **kwargs):
|
||||
ip_version = common_utils.get_ip_version(cidr)
|
||||
@ -1536,3 +1480,54 @@ def ip_monitor(namespace, queue, event_stop, event_started):
|
||||
if e.errno == errno.ENOENT:
|
||||
raise privileged.NetworkNamespaceNotFound(netns_name=namespace)
|
||||
raise
|
||||
|
||||
|
||||
def add_ip_route(namespace, cidr, device=None, via=None, table=None,
|
||||
metric=None, scope=None, **kwargs):
|
||||
"""Add an IP route"""
|
||||
if table:
|
||||
table = IP_RULE_TABLES.get(table, table)
|
||||
ip_version = common_utils.get_ip_version(cidr or via)
|
||||
privileged.add_ip_route(namespace, cidr, ip_version,
|
||||
device=device, via=via, table=table,
|
||||
metric=metric, scope=scope, **kwargs)
|
||||
|
||||
|
||||
def list_ip_routes(namespace, ip_version, scope=None, via=None, table=None,
|
||||
device=None, **kwargs):
|
||||
"""List IP routes"""
|
||||
def get_device(index, devices):
|
||||
for device in (d for d in devices if d['index'] == index):
|
||||
return get_attr(device, 'IFLA_IFNAME')
|
||||
|
||||
table = table if table else 'main'
|
||||
table = IP_RULE_TABLES.get(table, table)
|
||||
routes = privileged.list_ip_routes(namespace, ip_version, device=device,
|
||||
table=table, **kwargs)
|
||||
devices = privileged.get_link_devices(namespace)
|
||||
ret = []
|
||||
for route in routes:
|
||||
cidr = get_attr(route, 'RTA_DST')
|
||||
if cidr:
|
||||
cidr = '%s/%s' % (cidr, route['dst_len'])
|
||||
else:
|
||||
cidr = constants.IP_ANY[ip_version]
|
||||
table = int(get_attr(route, 'RTA_TABLE'))
|
||||
value = {
|
||||
'table': IP_RULE_TABLES_NAMES.get(table, table),
|
||||
'source_prefix': get_attr(route, 'RTA_PREFSRC'),
|
||||
'cidr': cidr,
|
||||
'scope': IP_ADDRESS_SCOPE[int(route['scope'])],
|
||||
'device': get_device(int(get_attr(route, 'RTA_OIF')), devices),
|
||||
'via': get_attr(route, 'RTA_GATEWAY'),
|
||||
'priority': get_attr(route, 'RTA_PRIORITY'),
|
||||
}
|
||||
|
||||
ret.append(value)
|
||||
|
||||
if scope:
|
||||
ret = [route for route in ret if route['scope'] == scope]
|
||||
if via:
|
||||
ret = [route for route in ret if route['via'] == via]
|
||||
|
||||
return ret
|
||||
|
@ -364,7 +364,7 @@ def keepalived_ipv6_supported():
|
||||
|
||||
default_gw = gw_dev.route.get_gateway(ip_version=6)
|
||||
if default_gw:
|
||||
default_gw = default_gw['gateway']
|
||||
default_gw = default_gw['via']
|
||||
|
||||
return expected_default_gw == default_gw
|
||||
|
||||
|
@ -648,3 +648,50 @@ def delete_ip_rule(namespace, **kwargs):
|
||||
if e.errno == errno.ENOENT:
|
||||
raise NetworkNamespaceNotFound(netns_name=namespace)
|
||||
raise
|
||||
|
||||
|
||||
@privileged.default.entrypoint
|
||||
@lockutils.synchronized("privileged-ip-lib")
|
||||
def add_ip_route(namespace, cidr, ip_version, device=None, via=None,
|
||||
table=None, metric=None, scope=None, **kwargs):
|
||||
"""Add an IP route"""
|
||||
try:
|
||||
with get_iproute(namespace) as ip:
|
||||
family = _IP_VERSION_FAMILY_MAP[ip_version]
|
||||
if not scope:
|
||||
scope = 'global' if via else 'link'
|
||||
scope = _get_scope_name(scope)
|
||||
if cidr:
|
||||
kwargs['dst'] = cidr
|
||||
if via:
|
||||
kwargs['gateway'] = via
|
||||
if table:
|
||||
kwargs['table'] = int(table)
|
||||
if device:
|
||||
kwargs['oif'] = get_link_id(device, namespace)
|
||||
if metric:
|
||||
kwargs['priority'] = int(metric)
|
||||
ip.route('replace', family=family, scope=scope, proto='static',
|
||||
**kwargs)
|
||||
except OSError as e:
|
||||
if e.errno == errno.ENOENT:
|
||||
raise NetworkNamespaceNotFound(netns_name=namespace)
|
||||
raise
|
||||
|
||||
|
||||
@privileged.default.entrypoint
|
||||
@lockutils.synchronized("privileged-ip-lib")
|
||||
def list_ip_routes(namespace, ip_version, device=None, table=None, **kwargs):
|
||||
"""List IP routes"""
|
||||
try:
|
||||
with get_iproute(namespace) as ip:
|
||||
family = _IP_VERSION_FAMILY_MAP[ip_version]
|
||||
if table:
|
||||
kwargs['table'] = table
|
||||
if device:
|
||||
kwargs['oif'] = get_link_id(device, namespace)
|
||||
return make_serializable(ip.route('show', family=family, **kwargs))
|
||||
except OSError as e:
|
||||
if e.errno == errno.ENOENT:
|
||||
raise NetworkNamespaceNotFound(netns_name=namespace)
|
||||
raise
|
||||
|
@ -180,7 +180,7 @@ class FakeFullstackMachine(machine_fixtures.FakeMachineBase):
|
||||
gateway_info = self.port.route.get_gateway()
|
||||
if not gateway_info:
|
||||
return False
|
||||
return gateway_info.get('gateway') == self.gateway_ip
|
||||
return gateway_info.get('via') == self.gateway_ip
|
||||
|
||||
def block_until_boot(self):
|
||||
utils.wait_until_true(
|
||||
|
@ -196,8 +196,8 @@ class L3AgentTestFramework(base.BaseSudoTestCase):
|
||||
def _gateway_check(self, gateway_ip, external_device):
|
||||
expected_gateway = gateway_ip
|
||||
ip_vers = netaddr.IPAddress(expected_gateway).version
|
||||
existing_gateway = (external_device.route.get_gateway(
|
||||
ip_version=ip_vers).get('gateway'))
|
||||
existing_gateway = external_device.route.get_gateway(
|
||||
ip_version=ip_vers).get('via')
|
||||
self.assertEqual(expected_gateway, existing_gateway)
|
||||
|
||||
def _assert_ha_device(self, router):
|
||||
|
@ -78,6 +78,11 @@ class TestDvrRouter(framework.L3AgentTestFramework):
|
||||
self._dvr_router_lifecycle(enable_ha=True, enable_snat=True,
|
||||
snat_bound_fip=True)
|
||||
|
||||
def _check_routes(self, expected_routes, actual_routes):
|
||||
actual_routes = [{key: route[key] for key in expected_routes[0].keys()}
|
||||
for route in actual_routes]
|
||||
self.assertEqual(expected_routes, actual_routes)
|
||||
|
||||
def _helper_create_dvr_router_fips_for_ext_network(
|
||||
self, agent_mode, **dvr_router_kwargs):
|
||||
self.agent.conf.agent_mode = agent_mode
|
||||
@ -141,9 +146,7 @@ class TestDvrRouter(framework.L3AgentTestFramework):
|
||||
# Now validate if the gateway is properly configured.
|
||||
rtr_2_fip, fip_2_rtr = router.rtr_fip_subnet.get_pair()
|
||||
tbl_index = router._get_snat_idx(fip_2_rtr)
|
||||
tbl_filter = ['table', tbl_index]
|
||||
self.assertIn('gateway', fg_device.route.get_gateway(
|
||||
filters=tbl_filter))
|
||||
self.assertIn('via', fg_device.route.get_gateway(table=tbl_index))
|
||||
self._validate_fips_for_external_network(
|
||||
router, router.fip_ns.get_name())
|
||||
# Now delete the fg- port that was created
|
||||
@ -169,10 +172,10 @@ class TestDvrRouter(framework.L3AgentTestFramework):
|
||||
ip_version=lib_constants.IP_VERSION_4,
|
||||
table=tbl_index)
|
||||
expected_route = [{'cidr': '0.0.0.0/0',
|
||||
'dev': fg_port_name,
|
||||
'device': fg_port_name,
|
||||
'table': tbl_index,
|
||||
u'via': u'19.4.4.2'}]
|
||||
self.assertEqual(expected_route, updated_route)
|
||||
'via': '19.4.4.2'}]
|
||||
self._check_routes(expected_route, updated_route)
|
||||
self._validate_fips_for_external_network(
|
||||
router, router.fip_ns.get_name())
|
||||
self._delete_router(self.agent, router.router_id)
|
||||
@ -194,9 +197,7 @@ class TestDvrRouter(framework.L3AgentTestFramework):
|
||||
# Now validate if the gateway is properly configured.
|
||||
rtr_2_fip, fip_2_rtr = router.rtr_fip_subnet.get_pair()
|
||||
tbl_index = router._get_snat_idx(fip_2_rtr)
|
||||
tbl_filter = ['table', tbl_index]
|
||||
self.assertIn('gateway', fg_device.route.get_gateway(
|
||||
filters=tbl_filter))
|
||||
self.assertIn('via', fg_device.route.get_gateway(table=tbl_index))
|
||||
self._validate_fips_for_external_network(
|
||||
router, router.fip_ns.get_name())
|
||||
# Now delete the fg- port that was created
|
||||
@ -221,10 +222,10 @@ class TestDvrRouter(framework.L3AgentTestFramework):
|
||||
ip_version=lib_constants.IP_VERSION_4,
|
||||
table=tbl_index)
|
||||
expected_route = [{'cidr': '0.0.0.0/0',
|
||||
'dev': fg_port_name,
|
||||
'device': fg_port_name,
|
||||
'table': tbl_index,
|
||||
u'via': u'19.4.4.2'}]
|
||||
self.assertEqual(expected_route, updated_route)
|
||||
'via': '19.4.4.2'}]
|
||||
self._check_routes(expected_route, updated_route)
|
||||
self._validate_fips_for_external_network(
|
||||
router, router.fip_ns.get_name())
|
||||
self._delete_router(self.agent, router.router_id)
|
||||
@ -268,10 +269,8 @@ class TestDvrRouter(framework.L3AgentTestFramework):
|
||||
namespace=router.fip_ns.name)
|
||||
rtr_2_fip, fip_2_rtr = router.rtr_fip_subnet.get_pair()
|
||||
tbl_index = router._get_snat_idx(fip_2_rtr)
|
||||
tbl_filter = ['table', tbl_index]
|
||||
# Now validate if the gateway is properly configured.
|
||||
self.assertIn('gateway', fg_device.route.get_gateway(
|
||||
filters=tbl_filter))
|
||||
self.assertIn('via', fg_device.route.get_gateway(table=tbl_index))
|
||||
self._validate_fips_for_external_network(
|
||||
router, router.fip_ns.get_name())
|
||||
self._delete_router(self.agent, router.router_id)
|
||||
@ -730,7 +729,7 @@ class TestDvrRouter(framework.L3AgentTestFramework):
|
||||
external_device = ip_lib.IPDevice(external_device_name,
|
||||
namespace=namespace)
|
||||
existing_gateway = (
|
||||
external_device.route.get_gateway().get('gateway'))
|
||||
external_device.route.get_gateway().get('via'))
|
||||
expected_gateway = external_port['subnets'][0]['gateway_ip']
|
||||
self.assertEqual(expected_gateway, existing_gateway)
|
||||
|
||||
@ -848,14 +847,15 @@ class TestDvrRouter(framework.L3AgentTestFramework):
|
||||
self.assertEqual(0, fip_rule_count)
|
||||
|
||||
def _assert_default_gateway(self, fip_2_rtr, rfp_device, device_name):
|
||||
expected_gateway = [{'dev': device_name,
|
||||
expected_gateway = [{'device': device_name,
|
||||
'cidr': '0.0.0.0/0',
|
||||
'via': str(fip_2_rtr.ip),
|
||||
'table': dvr_fip_ns.FIP_RT_TBL}]
|
||||
self.assertEqual(expected_gateway, rfp_device.route.list_routes(
|
||||
listed_routes = rfp_device.route.list_routes(
|
||||
ip_version=lib_constants.IP_VERSION_4,
|
||||
table=dvr_fip_ns.FIP_RT_TBL,
|
||||
via=str(fip_2_rtr.ip)))
|
||||
via=str(fip_2_rtr.ip))
|
||||
self._check_routes(expected_gateway, listed_routes)
|
||||
|
||||
def test_dvr_router_rem_fips_on_restarted_agent(self):
|
||||
self.agent.conf.agent_mode = 'dvr_snat'
|
||||
@ -1683,12 +1683,12 @@ class TestDvrRouter(framework.L3AgentTestFramework):
|
||||
fip_ns_int_name = router.fip_ns.get_int_device_name(router.router_id)
|
||||
fg_device = ip_lib.IPDevice(fg_port_name,
|
||||
namespace=fip_ns_name)
|
||||
tbl_filter = ['table', router_fip_table_idx]
|
||||
if not check_fpr_int_rule_delete:
|
||||
self.assertIn('gateway', fg_device.route.get_gateway(
|
||||
filters=tbl_filter))
|
||||
self.assertIn('via', fg_device.route.get_gateway(
|
||||
table=router_fip_table_idx))
|
||||
else:
|
||||
self.assertIsNone(fg_device.route.get_gateway(filters=tbl_filter))
|
||||
self.assertIsNone(fg_device.route.get_gateway(
|
||||
table=router_fip_table_idx))
|
||||
|
||||
ext_net_fw_rules_list = ip_lib.list_ip_rules(
|
||||
fip_ns_name, lib_constants.IP_VERSION_4)
|
||||
@ -1715,10 +1715,10 @@ class TestDvrRouter(framework.L3AgentTestFramework):
|
||||
table=router_fip_table_idx,
|
||||
via=str(next_hop))
|
||||
expected_extra_route = [{'cidr': six.u(destination),
|
||||
'dev': fg_port_name,
|
||||
'device': fg_port_name,
|
||||
'table': router_fip_table_idx,
|
||||
'via': next_hop}]
|
||||
self.assertEqual(expected_extra_route, actual_routes)
|
||||
self._check_routes(expected_extra_route, actual_routes)
|
||||
else:
|
||||
# When floatingip are deleted or disassociated, make sure that the
|
||||
# corresponding rules and routes are cleared from the table
|
||||
@ -1776,17 +1776,17 @@ class TestDvrRouter(framework.L3AgentTestFramework):
|
||||
route_cidr_2 = (
|
||||
str(net_addr_2) + '/' +
|
||||
str(fixed_ips_2[0]['prefixlen']))
|
||||
expected_routes = [{'dev': fpr_device_name,
|
||||
expected_routes = [{'device': fpr_device_name,
|
||||
'cidr': six.u(route_cidr_1),
|
||||
'via': str(rtr_2_fip.ip),
|
||||
'table': 'main'},
|
||||
{'dev': fpr_device_name,
|
||||
{'device': fpr_device_name,
|
||||
'cidr': six.u(route_cidr_2),
|
||||
'via': str(rtr_2_fip.ip),
|
||||
'table': 'main'}]
|
||||
# Comparing the static routes for both internal interfaces on the
|
||||
# main table.
|
||||
self.assertEqual(expected_routes, actual_routes)
|
||||
self._check_routes(expected_routes, actual_routes)
|
||||
else:
|
||||
self.assertEqual([], actual_routes)
|
||||
|
||||
@ -1962,10 +1962,8 @@ class TestDvrRouter(framework.L3AgentTestFramework):
|
||||
rtr_2_fip, fip_2_rtr = router.rtr_fip_subnet.get_pair()
|
||||
tbl_index = router._get_snat_idx(fip_2_rtr)
|
||||
|
||||
self.assertIn('gateway', ex_gw_device.route.get_gateway())
|
||||
tbl_filter = ['table', tbl_index]
|
||||
self.assertIn('gateway', fg_device.route.get_gateway(
|
||||
filters=tbl_filter))
|
||||
self.assertIn('via', ex_gw_device.route.get_gateway())
|
||||
self.assertIn('via', fg_device.route.get_gateway(table=tbl_index))
|
||||
|
||||
# Make this copy to make agent think gw_port changed.
|
||||
router.ex_gw_port = copy.deepcopy(router.ex_gw_port)
|
||||
|
@ -98,7 +98,7 @@ class L3AgentTestCase(framework.L3AgentTestFramework):
|
||||
gw_port = router.get_ex_gw_port()
|
||||
interface_name = router.get_external_device_name(gw_port['id'])
|
||||
device = ip_lib.IPDevice(interface_name, namespace=router.ns_name)
|
||||
self.assertIn('gateway', device.route.get_gateway())
|
||||
self.assertIn('via', device.route.get_gateway())
|
||||
|
||||
# Make this copy, so that the agent will think there is change in
|
||||
# external gateway port.
|
||||
|
@ -14,6 +14,7 @@
|
||||
# under the License.
|
||||
|
||||
import collections
|
||||
import itertools
|
||||
import signal
|
||||
|
||||
import netaddr
|
||||
@ -24,6 +25,7 @@ from oslo_log import log as logging
|
||||
from oslo_serialization import jsonutils
|
||||
from oslo_utils import importutils
|
||||
from oslo_utils import uuidutils
|
||||
from pyroute2.iproute import linux as iproute_linux
|
||||
import testscenarios
|
||||
import testtools
|
||||
|
||||
@ -99,6 +101,11 @@ class IpLibTestFramework(functional_base.BaseSudoTestCase):
|
||||
|
||||
class IpLibTestCase(IpLibTestFramework):
|
||||
|
||||
def _check_routes(self, expected_routes, actual_routes):
|
||||
actual_routes = [{key: route[key] for key in expected_routes[0].keys()}
|
||||
for route in actual_routes]
|
||||
self.assertEqual(expected_routes, actual_routes)
|
||||
|
||||
def test_rules_lifecycle(self):
|
||||
PRIORITY = 32768
|
||||
TABLE = 16
|
||||
@ -316,19 +323,17 @@ class IpLibTestCase(IpLibTestFramework):
|
||||
}
|
||||
expected_gateways = {
|
||||
constants.IP_VERSION_4: {
|
||||
'metric': metric,
|
||||
'gateway': gateways[constants.IP_VERSION_4]},
|
||||
'priority': metric,
|
||||
'via': gateways[constants.IP_VERSION_4]},
|
||||
constants.IP_VERSION_6: {
|
||||
'metric': metric,
|
||||
'gateway': gateways[constants.IP_VERSION_6]}}
|
||||
'priority': metric,
|
||||
'via': gateways[constants.IP_VERSION_6]}}
|
||||
|
||||
for ip_version, gateway_ip in gateways.items():
|
||||
device.route.add_gateway(gateway_ip, metric)
|
||||
|
||||
self.assertEqual(
|
||||
expected_gateways[ip_version],
|
||||
device.route.get_gateway(ip_version=ip_version))
|
||||
|
||||
self._check_routes(
|
||||
[expected_gateways[ip_version]],
|
||||
[device.route.get_gateway(ip_version=ip_version)])
|
||||
device.route.delete_gateway(gateway_ip)
|
||||
self.assertIsNone(
|
||||
device.route.get_gateway(ip_version=ip_version))
|
||||
@ -824,3 +829,119 @@ class IpMonitorTestCase(testscenarios.WithScenarios,
|
||||
|
||||
self._handle_ip_addresses('removed', ip_addresses)
|
||||
self._check_read_file(ip_addresses)
|
||||
|
||||
|
||||
class IpRouteCommandTestCase(functional_base.BaseSudoTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(IpRouteCommandTestCase, self).setUp()
|
||||
self.namespace = self.useFixture(net_helpers.NamespaceFixture()).name
|
||||
ip_lib.IPWrapper(self.namespace).add_dummy('test_device')
|
||||
self.device = ip_lib.IPDevice('test_device', namespace=self.namespace)
|
||||
self.device.link.set_up()
|
||||
self.device_cidr_ipv4 = '192.168.100.1/24'
|
||||
self.device_cidr_ipv6 = '2020::1/64'
|
||||
self.device.addr.add(self.device_cidr_ipv4)
|
||||
self.device.addr.add(self.device_cidr_ipv6)
|
||||
self.cidrs = ['192.168.0.0/24', '10.0.0.0/8', '2001::/64', 'faaa::/96']
|
||||
|
||||
def _assert_route(self, ip_version, table=None, source_prefix=None,
|
||||
cidr=None, scope=None, via=None, metric=None):
|
||||
if cidr:
|
||||
ip_version = utils.get_ip_version(cidr)
|
||||
else:
|
||||
ip_version = utils.get_ip_version(via)
|
||||
cidr = constants.IP_ANY[ip_version]
|
||||
if constants.IP_VERSION_6 == ip_version:
|
||||
scope = ip_lib.IP_ADDRESS_SCOPE[0]
|
||||
elif not scope:
|
||||
scope = 'global' if via else 'link'
|
||||
if ip_version == constants.IP_VERSION_6 and not metric:
|
||||
metric = 1024
|
||||
table = table or iproute_linux.DEFAULT_TABLE
|
||||
table = ip_lib.IP_RULE_TABLES_NAMES.get(table, table)
|
||||
cmp = {'table': table,
|
||||
'cidr': cidr,
|
||||
'source_prefix': source_prefix,
|
||||
'scope': scope,
|
||||
'device': 'test_device',
|
||||
'via': via,
|
||||
'priority': metric}
|
||||
try:
|
||||
utils.wait_until_true(lambda: cmp in self.device.route.list_routes(
|
||||
ip_version, table=table), timeout=5)
|
||||
except utils.WaitTimeout:
|
||||
raise self.fail('Route not found: %s' % cmp)
|
||||
return True
|
||||
|
||||
def test_add_route_table(self):
|
||||
tables = (None, 1, 253, 254, 255)
|
||||
for cidr in self.cidrs:
|
||||
for table in tables:
|
||||
self.device.route.add_route(cidr, table=table)
|
||||
ip_version = utils.get_ip_version(cidr)
|
||||
self._assert_route(ip_version, cidr=cidr, table=table)
|
||||
|
||||
def test_add_route_via(self):
|
||||
gateway_ipv4 = str(netaddr.IPNetwork(self.device_cidr_ipv4).ip)
|
||||
gateway_ipv6 = str(netaddr.IPNetwork(self.device_cidr_ipv6).ip + 1)
|
||||
for cidr in self.cidrs:
|
||||
ip_version = utils.get_ip_version(cidr)
|
||||
gateway = (gateway_ipv4 if ip_version == constants.IP_VERSION_4
|
||||
else gateway_ipv6)
|
||||
self.device.route.add_route(cidr, via=gateway)
|
||||
self._assert_route(ip_version, cidr=cidr, via=gateway)
|
||||
|
||||
def test_add_route_metric(self):
|
||||
metrics = (None, 1, 10, 255)
|
||||
for cidr in self.cidrs:
|
||||
for metric in metrics:
|
||||
self.device.route.add_route(cidr, metric=metric)
|
||||
ip_version = utils.get_ip_version(cidr)
|
||||
self._assert_route(ip_version, cidr=cidr, metric=metric)
|
||||
|
||||
def test_add_route_scope(self):
|
||||
for cidr in self.cidrs:
|
||||
for scope in ip_lib.IP_ADDRESS_SCOPE_NAME:
|
||||
self.device.route.add_route(cidr, scope=scope)
|
||||
ip_version = utils.get_ip_version(cidr)
|
||||
self._assert_route(ip_version, cidr=cidr, scope=scope)
|
||||
|
||||
def test_add_route_gateway(self):
|
||||
gateways = (str(netaddr.IPNetwork(self.device_cidr_ipv4).ip),
|
||||
str(netaddr.IPNetwork(self.device_cidr_ipv6).ip + 1))
|
||||
for gateway in gateways:
|
||||
ip_version = utils.get_ip_version(gateway)
|
||||
self.device.route.add_gateway(gateway)
|
||||
self._assert_route(ip_version, cidr=None, via=gateway,
|
||||
scope='global')
|
||||
|
||||
def test_list_onlink_routes_ipv4(self):
|
||||
cidr_ipv4 = []
|
||||
for cidr in self.cidrs:
|
||||
if utils.get_ip_version(cidr) == constants.IP_VERSION_4:
|
||||
cidr_ipv4.append(cidr)
|
||||
self.device.route.add_onlink_route(cidr)
|
||||
|
||||
for cidr in cidr_ipv4:
|
||||
self._assert_route(constants.IP_VERSION_4, cidr=cidr)
|
||||
|
||||
routes = self.device.route.list_onlink_routes(constants.IP_VERSION_4)
|
||||
self.assertEqual(len(cidr_ipv4), len(routes))
|
||||
|
||||
def test_get_gateway(self):
|
||||
gateways = (str(netaddr.IPNetwork(self.device_cidr_ipv4).ip),
|
||||
str(netaddr.IPNetwork(self.device_cidr_ipv6).ip + 1))
|
||||
scopes = ('global', 'site', 'link')
|
||||
metrics = (None, 1, 255)
|
||||
tables = (None, 1, 254, 255)
|
||||
for gateway, scope, metric, table in itertools.product(
|
||||
gateways, scopes, metrics, tables):
|
||||
ip_version = utils.get_ip_version(gateway)
|
||||
self.device.route.add_gateway(gateway, scope=scope, metric=metric,
|
||||
table=table)
|
||||
self._assert_route(ip_version, cidr=None, via=gateway, scope=scope,
|
||||
metric=metric, table=table)
|
||||
self.assertEqual(gateway, self.device.route.get_gateway(
|
||||
ip_version=ip_version, table=table)['via'])
|
||||
self.device.route.delete_gateway(gateway, table=table)
|
||||
|
@ -12,12 +12,17 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import netaddr
|
||||
from neutron_lib import constants as n_cons
|
||||
from oslo_utils import uuidutils
|
||||
from pyroute2.ipdb import routes as ipdb_routes
|
||||
from pyroute2.iproute import linux as iproute_linux
|
||||
import testtools
|
||||
|
||||
from neutron.agent.linux import ip_lib
|
||||
from neutron.common import utils as common_utils
|
||||
from neutron.privileged.agent.linux import ip_lib as priv_ip_lib
|
||||
from neutron.tests.common import net_helpers
|
||||
from neutron.tests.functional import base as functional_base
|
||||
|
||||
|
||||
@ -450,3 +455,140 @@ class GetIpAddressesTestCase(functional_base.BaseSudoTestCase):
|
||||
self.assertEqual(interfaces[int_name]['cidr'], cidr)
|
||||
self.assertEqual(interfaces[int_name]['scope'],
|
||||
ip_lib.IP_ADDRESS_SCOPE[ip_address['scope']])
|
||||
|
||||
|
||||
class RouteTestCase(functional_base.BaseSudoTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(RouteTestCase, self).setUp()
|
||||
self.namespace = self.useFixture(net_helpers.NamespaceFixture()).name
|
||||
self.device_name = 'test_device'
|
||||
ip_lib.IPWrapper(self.namespace).add_dummy(self.device_name)
|
||||
self.device = ip_lib.IPDevice(self.device_name, self.namespace)
|
||||
self.device.link.set_up()
|
||||
|
||||
def _check_routes(self, cidrs, table=None, gateway=None, metric=None,
|
||||
scope=None):
|
||||
table = table or iproute_linux.DEFAULT_TABLE
|
||||
if not scope:
|
||||
scope = 'universe' if gateway else 'link'
|
||||
scope = priv_ip_lib._get_scope_name(scope)
|
||||
for cidr in cidrs:
|
||||
ip_version = common_utils.get_ip_version(cidr)
|
||||
if ip_version == n_cons.IP_VERSION_6 and not metric:
|
||||
metric = ipdb_routes.IP6_RT_PRIO_USER
|
||||
if ip_version == n_cons.IP_VERSION_6:
|
||||
scope = 0
|
||||
routes = priv_ip_lib.list_ip_routes(self.namespace, ip_version)
|
||||
for route in routes:
|
||||
ip = ip_lib.get_attr(route, 'RTA_DST')
|
||||
mask = route['dst_len']
|
||||
if not (ip == str(netaddr.IPNetwork(cidr).ip) and
|
||||
mask == netaddr.IPNetwork(cidr).cidr.prefixlen):
|
||||
continue
|
||||
self.assertEqual(table, route['table'])
|
||||
self.assertEqual(
|
||||
priv_ip_lib._IP_VERSION_FAMILY_MAP[ip_version],
|
||||
route['family'])
|
||||
self.assertEqual(gateway,
|
||||
ip_lib.get_attr(route, 'RTA_GATEWAY'))
|
||||
self.assertEqual(metric,
|
||||
ip_lib.get_attr(route, 'RTA_PRIORITY'))
|
||||
self.assertEqual(scope, route['scope'])
|
||||
break
|
||||
else:
|
||||
self.fail('CIDR %s not found in the list of routes' % cidr)
|
||||
|
||||
def _check_gateway(self, gateway, table=None, metric=None):
|
||||
table = table or iproute_linux.DEFAULT_TABLE
|
||||
ip_version = common_utils.get_ip_version(gateway)
|
||||
if ip_version == n_cons.IP_VERSION_6 and not metric:
|
||||
metric = ipdb_routes.IP6_RT_PRIO_USER
|
||||
scope = 0
|
||||
routes = priv_ip_lib.list_ip_routes(self.namespace, ip_version)
|
||||
for route in routes:
|
||||
if not (ip_lib.get_attr(route, 'RTA_GATEWAY') == gateway):
|
||||
continue
|
||||
self.assertEqual(table, route['table'])
|
||||
self.assertEqual(
|
||||
priv_ip_lib._IP_VERSION_FAMILY_MAP[ip_version],
|
||||
route['family'])
|
||||
self.assertEqual(gateway,
|
||||
ip_lib.get_attr(route, 'RTA_GATEWAY'))
|
||||
self.assertEqual(scope, route['scope'])
|
||||
self.assertEqual(0, route['dst_len'])
|
||||
self.assertEqual(metric,
|
||||
ip_lib.get_attr(route, 'RTA_PRIORITY'))
|
||||
break
|
||||
else:
|
||||
self.fail('Default gateway %s not found in the list of routes'
|
||||
% gateway)
|
||||
|
||||
def _add_route_device_and_check(self, table=None, metric=None,
|
||||
scope='link'):
|
||||
cidrs = ['192.168.0.0/24', '172.90.0.0/16', '10.0.0.0/8',
|
||||
'2001:db8::/64']
|
||||
for cidr in cidrs:
|
||||
ip_version = common_utils.get_ip_version(cidr)
|
||||
priv_ip_lib.add_ip_route(self.namespace, cidr, ip_version,
|
||||
device=self.device_name, table=table,
|
||||
metric=metric, scope=scope)
|
||||
|
||||
self._check_routes(cidrs, table=table, metric=metric, scope=scope)
|
||||
|
||||
def test_add_route_device(self):
|
||||
self._add_route_device_and_check(table=None)
|
||||
|
||||
def test_add_route_device_table(self):
|
||||
self._add_route_device_and_check(table=100)
|
||||
|
||||
def test_add_route_device_metric(self):
|
||||
self._add_route_device_and_check(metric=50)
|
||||
|
||||
def test_add_route_device_table_metric(self):
|
||||
self._add_route_device_and_check(table=200, metric=30)
|
||||
|
||||
def test_add_route_device_scope_global(self):
|
||||
self._add_route_device_and_check(scope='global')
|
||||
|
||||
def test_add_route_device_scope_site(self):
|
||||
self._add_route_device_and_check(scope='site')
|
||||
|
||||
def test_add_route_device_scope_host(self):
|
||||
self._add_route_device_and_check(scope='host')
|
||||
|
||||
def test_add_route_via_ipv4(self):
|
||||
cidrs = ['192.168.0.0/24', '172.90.0.0/16', '10.0.0.0/8']
|
||||
int_cidr = '192.168.20.1/24'
|
||||
int_ip_address = str(netaddr.IPNetwork(int_cidr).ip)
|
||||
self.device.addr.add(int_cidr)
|
||||
for cidr in cidrs:
|
||||
ip_version = common_utils.get_ip_version(cidr)
|
||||
priv_ip_lib.add_ip_route(self.namespace, cidr, ip_version,
|
||||
via=int_ip_address)
|
||||
self._check_routes(cidrs, gateway=int_ip_address)
|
||||
|
||||
def test_add_route_via_ipv6(self):
|
||||
cidrs = ['2001:db8::/64', 'faaa::/96']
|
||||
int_cidr = 'fd00::1/64'
|
||||
via_ip = 'fd00::2'
|
||||
self.device.addr.add(int_cidr)
|
||||
for cidr in cidrs:
|
||||
ip_version = common_utils.get_ip_version(cidr)
|
||||
priv_ip_lib.add_ip_route(self.namespace, cidr, ip_version,
|
||||
via=via_ip)
|
||||
self._check_routes(cidrs, gateway=via_ip)
|
||||
|
||||
def test_add_default(self):
|
||||
ip_addresses = ['192.168.0.1/24', '172.90.0.1/16', '10.0.0.1/8',
|
||||
'2001:db8::1/64', 'faaa::1/96']
|
||||
for ip_address in ip_addresses:
|
||||
ip_version = common_utils.get_ip_version(ip_address)
|
||||
if ip_version == n_cons.IP_VERSION_4:
|
||||
_ip = str(netaddr.IPNetwork(ip_address).ip)
|
||||
else:
|
||||
_ip = str(netaddr.IPNetwork(ip_address).ip + 1)
|
||||
self.device.addr.add(ip_address)
|
||||
priv_ip_lib.add_ip_route(self.namespace, None, ip_version,
|
||||
device=self.device_name, via=_ip)
|
||||
self._check_gateway(_ip)
|
||||
|
@ -871,40 +871,6 @@ class TestIpRouteCommand(TestIPCmdBase):
|
||||
{'sample': GATEWAY_SAMPLE7,
|
||||
'expected': {'metric': 1}}]
|
||||
|
||||
def test_add_gateway(self):
|
||||
self.route_cmd.add_gateway(self.gateway, self.metric, self.table)
|
||||
self._assert_sudo([self.ip_version],
|
||||
('replace', 'default',
|
||||
'via', self.gateway,
|
||||
'metric', self.metric,
|
||||
'dev', self.parent.name,
|
||||
'table', self.table))
|
||||
|
||||
def test_add_gateway_subtable(self):
|
||||
self.route_cmd.table(self.table).add_gateway(self.gateway, self.metric)
|
||||
self._assert_sudo([self.ip_version],
|
||||
('replace', 'default',
|
||||
'via', self.gateway,
|
||||
'metric', self.metric,
|
||||
'dev', self.parent.name,
|
||||
'table', self.table))
|
||||
|
||||
def test_del_gateway_success(self):
|
||||
self.route_cmd.delete_gateway(self.gateway, table=self.table)
|
||||
self._assert_sudo([self.ip_version],
|
||||
('del', 'default',
|
||||
'via', self.gateway,
|
||||
'dev', self.parent.name,
|
||||
'table', self.table))
|
||||
|
||||
def test_del_gateway_success_subtable(self):
|
||||
self.route_cmd.table(table=self.table).delete_gateway(self.gateway)
|
||||
self._assert_sudo([self.ip_version],
|
||||
('del', 'default',
|
||||
'via', self.gateway,
|
||||
'dev', self.parent.name,
|
||||
'table', self.table))
|
||||
|
||||
def test_del_gateway_cannot_find_device(self):
|
||||
self.parent._as_root.side_effect = RuntimeError("Cannot find device")
|
||||
|
||||
@ -919,44 +885,10 @@ class TestIpRouteCommand(TestIPCmdBase):
|
||||
self.assertRaises(RuntimeError, self.route_cmd.delete_gateway,
|
||||
self.gateway, table=self.table)
|
||||
|
||||
def test_get_gateway(self):
|
||||
for test_case in self.test_cases:
|
||||
self.parent._run = mock.Mock(return_value=test_case['sample'])
|
||||
self.assertEqual(self.route_cmd.get_gateway(),
|
||||
test_case['expected'])
|
||||
|
||||
def test_flush_route_table(self):
|
||||
self.route_cmd.flush(self.ip_version, self.table)
|
||||
self._assert_sudo([self.ip_version], ('flush', 'table', self.table))
|
||||
|
||||
def test_add_route(self):
|
||||
self.route_cmd.add_route(self.cidr, self.ip, self.table)
|
||||
self._assert_sudo([self.ip_version],
|
||||
('replace', self.cidr,
|
||||
'via', self.ip,
|
||||
'dev', self.parent.name,
|
||||
'table', self.table))
|
||||
|
||||
def test_add_route_no_via(self):
|
||||
self.route_cmd.add_route(self.cidr, table=self.table)
|
||||
self._assert_sudo([self.ip_version],
|
||||
('replace', self.cidr,
|
||||
'dev', self.parent.name,
|
||||
'table', self.table))
|
||||
|
||||
def test_add_route_with_scope(self):
|
||||
self.route_cmd.add_route(self.cidr, scope='link')
|
||||
self._assert_sudo([self.ip_version],
|
||||
('replace', self.cidr,
|
||||
'dev', self.parent.name,
|
||||
'scope', 'link'))
|
||||
|
||||
def test_add_route_no_device(self):
|
||||
self.parent._as_root.side_effect = RuntimeError("Cannot find device")
|
||||
self.assertRaises(exceptions.DeviceNotFoundError,
|
||||
self.route_cmd.add_route,
|
||||
self.cidr, self.ip, self.table)
|
||||
|
||||
def test_delete_route(self):
|
||||
self.route_cmd.delete_route(self.cidr, self.ip, self.table)
|
||||
self._assert_sudo([self.ip_version],
|
||||
@ -985,54 +917,6 @@ class TestIpRouteCommand(TestIPCmdBase):
|
||||
self.route_cmd.delete_route,
|
||||
self.cidr, self.ip, self.table)
|
||||
|
||||
def test_list_routes(self):
|
||||
self.parent._run.return_value = (
|
||||
"default via 172.124.4.1 dev eth0 metric 100\n"
|
||||
"10.0.0.0/22 dev eth0 scope link\n"
|
||||
"172.24.4.0/24 dev eth0 proto kernel src 172.24.4.2\n")
|
||||
routes = self.route_cmd.table(self.table).list_routes(self.ip_version)
|
||||
self.assertEqual([{'cidr': '0.0.0.0/0',
|
||||
'dev': 'eth0',
|
||||
'metric': '100',
|
||||
'table': 14,
|
||||
'via': '172.124.4.1'},
|
||||
{'cidr': '10.0.0.0/22',
|
||||
'dev': 'eth0',
|
||||
'scope': 'link',
|
||||
'table': 14},
|
||||
{'cidr': '172.24.4.0/24',
|
||||
'dev': 'eth0',
|
||||
'proto': 'kernel',
|
||||
'src': '172.24.4.2',
|
||||
'table': 14}], routes)
|
||||
|
||||
def test_list_onlink_routes_subtable(self):
|
||||
self.parent._run.return_value = (
|
||||
"10.0.0.0/22\n"
|
||||
"172.24.4.0/24 proto kernel src 172.24.4.2\n")
|
||||
routes = self.route_cmd.table(self.table).list_onlink_routes(
|
||||
self.ip_version)
|
||||
self.assertEqual(['10.0.0.0/22'], [r['cidr'] for r in routes])
|
||||
self._assert_call([self.ip_version],
|
||||
('list', 'dev', self.parent.name,
|
||||
'table', self.table, 'scope', 'link'))
|
||||
|
||||
def test_add_onlink_route_subtable(self):
|
||||
self.route_cmd.table(self.table).add_onlink_route(self.cidr)
|
||||
self._assert_sudo([self.ip_version],
|
||||
('replace', self.cidr,
|
||||
'dev', self.parent.name,
|
||||
'table', self.table,
|
||||
'scope', 'link'))
|
||||
|
||||
def test_delete_onlink_route_subtable(self):
|
||||
self.route_cmd.table(self.table).delete_onlink_route(self.cidr)
|
||||
self._assert_sudo([self.ip_version],
|
||||
('del', self.cidr,
|
||||
'dev', self.parent.name,
|
||||
'table', self.table,
|
||||
'scope', 'link'))
|
||||
|
||||
|
||||
class TestIPv6IpRouteCommand(TestIpRouteCommand):
|
||||
def setUp(self):
|
||||
@ -1059,63 +943,6 @@ class TestIPv6IpRouteCommand(TestIpRouteCommand):
|
||||
{'gateway': '2001:470:9:1224:4508:b885:5fb:740b',
|
||||
'metric': 1024}}]
|
||||
|
||||
def test_list_routes(self):
|
||||
self.parent._run.return_value = (
|
||||
"default via 2001:db8::1 dev eth0 metric 100\n"
|
||||
"2001:db8::/64 dev eth0 proto kernel src 2001:db8::2\n")
|
||||
routes = self.route_cmd.table(self.table).list_routes(self.ip_version)
|
||||
self.assertEqual([{'cidr': '::/0',
|
||||
'dev': 'eth0',
|
||||
'metric': '100',
|
||||
'table': 14,
|
||||
'via': '2001:db8::1'},
|
||||
{'cidr': '2001:db8::/64',
|
||||
'dev': 'eth0',
|
||||
'proto': 'kernel',
|
||||
'src': '2001:db8::2',
|
||||
'table': 14}], routes)
|
||||
|
||||
|
||||
class TestIPRoute(TestIpRouteCommand):
|
||||
"""Leverage existing tests for IpRouteCommand for IPRoute
|
||||
|
||||
This test leverages the tests written for IpRouteCommand. The difference
|
||||
is that the 'dev' argument should not be passed for each of the commands.
|
||||
So, this test removes the dev argument from the expected arguments in each
|
||||
assert.
|
||||
"""
|
||||
def setUp(self):
|
||||
super(TestIPRoute, self).setUp()
|
||||
self.parent = ip_lib.IPRoute()
|
||||
self.parent._run = mock.Mock()
|
||||
self.parent._as_root = mock.Mock()
|
||||
self.route_cmd = self.parent.route
|
||||
self.check_dev_args = False
|
||||
|
||||
def _remove_dev_args(self, args):
|
||||
def args_without_dev():
|
||||
previous = None
|
||||
for arg in args:
|
||||
if 'dev' not in (arg, previous):
|
||||
yield arg
|
||||
previous = arg
|
||||
|
||||
return tuple(arg for arg in args_without_dev())
|
||||
|
||||
def _assert_call(self, options, args):
|
||||
if not self.check_dev_args:
|
||||
args = self._remove_dev_args(args)
|
||||
super(TestIPRoute, self)._assert_call(options, args)
|
||||
|
||||
def _assert_sudo(self, options, args, use_root_namespace=False):
|
||||
if not self.check_dev_args:
|
||||
args = self._remove_dev_args(args)
|
||||
super(TestIPRoute, self)._assert_sudo(options, args)
|
||||
|
||||
def test_del_gateway_cannot_find_device(self):
|
||||
# This test doesn't make sense for this case since dev won't be passed
|
||||
pass
|
||||
|
||||
|
||||
class TestIpNetnsCommand(TestIPCmdBase):
|
||||
def setUp(self):
|
||||
|
Loading…
Reference in New Issue
Block a user