Merge "Implement IpAddrCommand.get_devices_with_ip using pyroute2"

This commit is contained in:
Zuul 2018-12-18 23:37:37 +00:00 committed by Gerrit Code Review
commit 23da72833f
7 changed files with 347 additions and 217 deletions

View File

@ -24,6 +24,8 @@ from neutron_lib import exceptions
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import excutils
from pyroute2.netlink import rtnl
from pyroute2.netlink.rtnl import ifaddrmsg
from pyroute2.netlink.rtnl import ifinfmsg
from pyroute2 import NetlinkError
from pyroute2 import netns
@ -43,19 +45,26 @@ IP_NONLOCAL_BIND = 'net.ipv4.ip_nonlocal_bind'
LOOPBACK_DEVNAME = 'lo'
FB_TUNNEL_DEVICE_NAMES = ['gre0', 'gretap0', 'tunl0', 'erspan0', 'sit0',
'ip6tnl0', 'ip6gre0']
RULE_TABLES = {'default': 253,
'main': 254,
'local': 255}
IP_RULE_TABLES = {'default': 253,
'main': 254,
'local': 255}
# Rule indexes: pyroute2.netlink.rtnl
# Rule names: https://www.systutorials.com/docs/linux/man/8-ip-rule/
# NOTE(ralonsoh): 'masquerade' type is printed as 'nat' in 'ip rule' command
RULE_TYPES = {0: 'unspecified',
1: 'unicast',
6: 'blackhole',
7: 'unreachable',
8: 'prohibit',
10: 'nat'}
IP_RULE_TYPES = {0: 'unspecified',
1: 'unicast',
6: 'blackhole',
7: 'unreachable',
8: 'prohibit',
10: 'nat'}
IP_ADDRESS_SCOPE = {rtnl.rtscopes['RT_SCOPE_UNIVERSE']: 'global',
rtnl.rtscopes['RT_SCOPE_SITE']: 'site',
rtnl.rtscopes['RT_SCOPE_LINK']: 'link',
rtnl.rtscopes['RT_SCOPE_HOST']: 'host'}
IP_ADDRESS_SCOPE_NAME = {v: k for k, v in IP_ADDRESS_SCOPE.items()}
SYS_NET_PATH = '/sys/class/net'
DEFAULT_GW_PATTERN = re.compile(r"via (\S+)")
@ -138,7 +147,7 @@ class IPWrapper(SubProcessBase):
def get_devices(self, exclude_loopback=True, exclude_fb_tun_devices=True):
retval = []
try:
devices = privileged.get_devices(self.namespace)
devices = privileged.get_device_names(self.namespace)
except privileged.NetworkNamespaceNotFound:
return retval
@ -159,8 +168,18 @@ class IPWrapper(SubProcessBase):
if not ip:
return None
addr = IpAddrCommand(self)
devices = addr.get_devices_with_ip(to=ip)
cidr = common_utils.ip_to_cidr(ip)
kwargs = {'address': common_utils.cidr_to_ip(cidr)}
if not common_utils.is_cidr_host(cidr):
kwargs['mask'] = common_utils.cidr_mask_length(cidr)
devices = get_devices_with_ip(self.namespace, **kwargs)
if not devices:
# Search by broadcast address.
broadcast = common_utils.cidr_broadcast_address(cidr)
if broadcast:
devices = get_devices_with_ip(self.namespace,
broadcast=broadcast)
if devices:
return IPDevice(devices[0]['name'], namespace=self.namespace)
@ -475,68 +494,40 @@ class IpAddrCommand(IpDeviceCommandBase):
def flush(self, ip_version):
flush_ip_addresses(ip_version, self.name, self._parent.namespace)
def get_devices_with_ip(self, name=None, scope=None, to=None,
filters=None, ip_version=None):
"""Get a list of all the devices with an IP attached in the namespace.
:param name: if it's not None, only a device with that matching name
will be returned.
:param scope: address scope, for example, global, link, or host
:param to: IP address or cidr to match. If cidr then it will match
any IP within the specified subnet
:param filters: list of any other filters supported by /sbin/ip
:param ip_version: 4 or 6
"""
options = [ip_version] if ip_version else []
args = ['show']
if name:
args += [name]
if filters:
args += filters
if scope:
args += ['scope', scope]
if to:
args += ['to', to]
retval = []
for line in self._run(options, tuple(args)).split('\n'):
line = line.strip()
match = DEVICE_NAME_PATTERN.search(line)
if match:
# Found a match for a device name, but its' addresses will
# only appear in following lines, so we may as well continue.
device_name = remove_interface_suffix(match.group(2))
continue
elif not line.startswith('inet'):
continue
parts = line.split(" ")
broadcast = None
if parts[0] == 'inet6':
scope = parts[3]
else:
if parts[2] == 'brd':
broadcast = parts[3]
scope = parts[5]
else:
scope = parts[3]
retval.append(dict(name=device_name,
cidr=parts[1],
scope=scope,
broadcast=broadcast,
dynamic=('dynamic' == parts[-1]),
tentative=('tentative' in line),
dadfailed=('dadfailed' == parts[-1])))
return retval
def list(self, scope=None, to=None, filters=None, ip_version=None):
"""Get device details of a device named <self.name>."""
return self.get_devices_with_ip(
self.name, scope, to, filters, ip_version)
def filter_device(device, filters):
# Accepted filters: dynamic, permanent, tentative, dadfailed.
for filter in filters:
if filter == 'permanent' and device['dynamic']:
return False
elif not device[filter]:
return False
return True
kwargs = {}
if to:
cidr = common_utils.ip_to_cidr(to)
kwargs = {'address': common_utils.cidr_to_ip(cidr)}
if not common_utils.is_cidr_host(cidr):
kwargs['mask'] = common_utils.cidr_mask_length(cidr)
if scope:
kwargs['scope'] = IP_ADDRESS_SCOPE_NAME[scope]
if ip_version:
kwargs['family'] = common_utils.get_socket_address_family(
ip_version)
devices = get_devices_with_ip(self._parent.namespace, name=self.name,
**kwargs)
if not filters:
return devices
filtered_devices = []
for device in (device for device in devices
if filter_device(device, filters)):
filtered_devices.append(device)
return filtered_devices
def wait_until_address_ready(self, address, wait_time=30):
"""Wait until an address is no longer marked 'tentative'
@ -1223,10 +1214,10 @@ def _parse_ip_rule(rule, ip_version):
to_ip, prefix=rule['dst_len'])
if common_utils.is_cidr_host(parsed_rule['to']):
parsed_rule['to'] = common_utils.cidr_to_ip(parsed_rule['to'])
parsed_rule['type'] = RULE_TYPES[rule['action']]
parsed_rule['type'] = IP_RULE_TYPES[rule['action']]
table_num = rule['attrs']['FRA_TABLE']
for table_name in (name for (name, index) in
RULE_TABLES.items() if index == table_num):
IP_RULE_TABLES.items() if index == table_num):
parsed_rule['table'] = table_name
break
else:
@ -1281,7 +1272,7 @@ def _make_pyroute2_args(ip, iif, table, priority, to):
cmd_args['dst'] = common_utils.cidr_to_ip(to)
cmd_args['dst_len'] = common_utils.cidr_mask(to)
if table:
cmd_args['table'] = RULE_TABLES.get(table) or int(table)
cmd_args['table'] = IP_RULE_TABLES.get(table) or int(table)
if priority:
cmd_args['priority'] = int(priority)
return cmd_args
@ -1337,3 +1328,52 @@ def delete_ip_rule(namespace, ip, iif=None, table=None, priority=None,
"""
cmd_args = _make_pyroute2_args(ip, iif, table, priority, to)
privileged.delete_ip_rule(namespace, **cmd_args)
def _parse_link_device(namespace, device, **kwargs):
"""Parse pytoute2 link device information
For each link device, the IP address information is retrieved and returned
in a dictionary.
IP address scope: http://linux-ip.net/html/tools-ip-address.html
"""
def get_attr(pyroute2_obj, attr_name):
rule_attrs = pyroute2_obj.get('attrs', [])
for attr in (attr for attr in rule_attrs if attr[0] == attr_name):
return attr[1]
return
retval = []
name = get_attr(device, 'IFLA_IFNAME')
ip_addresses = privileged.get_ip_addresses(namespace,
index=device['index'],
**kwargs)
for ip_address in ip_addresses:
ip = get_attr(ip_address, 'IFA_ADDRESS')
ip_length = ip_address['prefixlen']
cidr = common_utils.ip_to_cidr(ip, prefix=ip_length)
flags = get_attr(ip_address, 'IFA_FLAGS')
dynamic = not bool(flags & ifaddrmsg.IFA_F_PERMANENT)
tentative = bool(flags & ifaddrmsg.IFA_F_TENTATIVE)
dadfailed = bool(flags & ifaddrmsg.IFA_F_DADFAILED)
scope = IP_ADDRESS_SCOPE[ip_address['scope']]
retval.append({'name': name,
'cidr': cidr,
'scope': scope,
'broadcast': get_attr(ip_address, 'IFA_BROADCAST'),
'dynamic': dynamic,
'tentative': tentative,
'dadfailed': dadfailed})
return retval
def get_devices_with_ip(namespace, name=None, **kwargs):
link_args = {}
if name:
link_args['ifname'] = name
devices = privileged.get_link_devices(namespace, **link_args)
retval = []
for parsed_ips in (_parse_link_device(namespace, device, **kwargs)
for device in devices):
retval += parsed_ips
return retval

View File

@ -266,6 +266,29 @@ def is_cidr_host(cidr):
return net.prefixlen == n_const.IPv6_BITS
def cidr_mask_length(cidr):
"""Returns the mask length of a cidr
:param cidr: (string) either an ipv4 or ipv6 cidr or a host IP.
:returns: (int) mask length of a cidr; in case of host IP, the mask length
will be 32 (IPv4) or 128 (IPv6)
"""
return netaddr.IPNetwork(cidr).netmask.netmask_bits()
def cidr_broadcast_address(cidr):
"""Returns the broadcast address of a cidr
:param cidr: (string, netaddr.IPNetwork, netaddr.IPAddress) either an ipv4
or ipv6 cidr or a host IP.
:returns: (string) broadcast address of the cidr, None if the cidr has no
broadcast domain
"""
broadcast = netaddr.IPNetwork(cidr).broadcast
if broadcast:
return str(broadcast)
def get_ip_version(ip_or_cidr):
return netaddr.IPNetwork(ip_or_cidr).version

View File

@ -57,6 +57,7 @@ from neutron.api.rpc.handlers import securitygroups_rpc as sg_rpc
from neutron.common import config
from neutron.common import constants as c_const
from neutron.common import utils as n_utils
from neutron.conf.agent import common as agent_config
from neutron.conf.agent import xenapi_conf
from neutron.plugins.ml2.drivers.agent import capabilities
from neutron.plugins.ml2.drivers.l2pop.rpc_manager import l2population_rpc
@ -2322,6 +2323,7 @@ def main(bridge_classes):
prepare_xen_compute()
ovs_capabilities.register()
ext_manager.register_opts(cfg.CONF)
agent_config.setup_privsep()
ext_mgr = ext_manager.L2AgentExtensionsManager(cfg.CONF)

View File

@ -15,6 +15,7 @@ import socket
from neutron_lib import constants
import pyroute2
from pyroute2 import netlink
from pyroute2.netlink import exceptions as netlink_exceptions
from pyroute2.netlink import rtnl
from pyroute2.netlink.rtnl import ifinfmsg
@ -456,16 +457,62 @@ def list_netns(**kwargs):
return netns.listnetns(**kwargs)
@privileged.default.entrypoint
def get_devices(namespace, **kwargs):
"""List all interfaces in a namespace
def _make_serializable(value):
"""Make a pyroute2 object serializable
:return: a list of strings with the names of the interfaces in a namespace
This function converts 'netlink.nla_slot' object (key, value) in a list
of two elements.
"""
if isinstance(value, list):
return [_make_serializable(item) for item in value]
elif isinstance(value, dict):
return {key: _make_serializable(data) for key, data in value.items()}
elif isinstance(value, netlink.nla_slot):
return [value[0], _make_serializable(value[1])]
elif isinstance(value, tuple):
return tuple(_make_serializable(item) for item in value)
return value
@privileged.default.entrypoint
def get_link_devices(namespace, **kwargs):
"""List interfaces in a namespace
:return: (list) interfaces in a namespace
"""
try:
with _get_iproute(namespace) as ip:
return [link.get_attr('IFLA_IFNAME')
for link in ip.get_links(**kwargs)]
return _make_serializable(ip.get_links(**kwargs))
except OSError as e:
if e.errno == errno.ENOENT:
raise NetworkNamespaceNotFound(netns_name=namespace)
raise
def get_device_names(namespace, **kwargs):
"""List interface names in a namespace
:return: a list of strings with the names of the interfaces in a namespace
"""
devices_attrs = [link['attrs'] for link
in get_link_devices(namespace, **kwargs)]
device_names = []
for device_attrs in devices_attrs:
for link_name in (link_attr[1] for link_attr in device_attrs
if link_attr[0] == 'IFLA_IFNAME'):
device_names.append(link_name)
return device_names
@privileged.default.entrypoint
def get_ip_addresses(namespace, **kwargs):
"""List of IP addresses in a namespace
:return: (tuple) IP addresses in a namespace
"""
try:
with _get_iproute(namespace) as ip:
return _make_serializable(ip.get_addr(**kwargs))
except OSError as e:
if e.errno == errno.ENOENT:
raise NetworkNamespaceNotFound(netns_name=namespace)

View File

@ -21,12 +21,19 @@ from neutron.privileged.agent.linux import ip_lib as priv_ip_lib
from neutron.tests.functional import base as functional_base
class GetDevicesTestCase(functional_base.BaseLoggingTestCase):
def _get_attr(pyroute2_obj, attr_name):
rule_attrs = pyroute2_obj.get('attrs', [])
for attr in (attr for attr in rule_attrs if attr[0] == attr_name):
return attr[1]
return
class GetDeviceNamesTestCase(functional_base.BaseSudoTestCase):
def _remove_ns(self, namespace):
priv_ip_lib.remove_netns(namespace)
def test_get_devices(self):
def test_get_device_names(self):
namespace = 'ns_test-' + uuidutils.generate_uuid()
priv_ip_lib.create_netns(namespace)
self.addCleanup(self._remove_ns, namespace)
@ -36,14 +43,16 @@ class GetDevicesTestCase(functional_base.BaseLoggingTestCase):
for interface in interfaces:
priv_ip_lib.create_interface(interface, namespace, 'dummy')
device_names = priv_ip_lib.get_devices(namespace)
device_names = priv_ip_lib.get_device_names(namespace)
self.assertGreater(len(device_names), 0)
for name in device_names:
self.assertIn(name, interfaces_to_check)
for interface in interfaces:
priv_ip_lib.delete_interface(interface, namespace)
device_names = priv_ip_lib.get_devices(namespace)
device_names = priv_ip_lib.get_device_names(namespace)
self.assertGreater(len(device_names), 0)
for name in device_names:
self.assertNotIn(name, interfaces)
@ -240,3 +249,36 @@ class RuleTestCase(functional_base.BaseSudoTestCase):
rules = ip_lib.list_ip_rules(self.namespace, 4)
self._check_rules(rules, ['iif'], [iif], 'iif name %s' % iif)
self.assertEqual(4, len(rules))
class GetIpAddressesTestCase(functional_base.BaseSudoTestCase):
def _remove_ns(self, namespace):
priv_ip_lib.remove_netns(namespace)
def test_get_ip_addresses(self):
namespace = 'ns_test-' + uuidutils.generate_uuid()
priv_ip_lib.create_netns(namespace)
self.addCleanup(self._remove_ns, namespace)
interfaces = {
'20': {'cidr': '192.168.10.20/24', 'scope': 'link',
'add_broadcast': True},
'30': {'cidr': '2001::1/64', 'scope': 'global',
'add_broadcast': False}}
for int_name, int_parameters in interfaces.items():
priv_ip_lib.create_interface(int_name, namespace, 'dummy',
index=int(int_name))
ip_lib.add_ip_address(
int_parameters['cidr'], int_name, namespace,
int_parameters['scope'], int_parameters['add_broadcast'])
ip_addresses = priv_ip_lib.get_ip_addresses(namespace)
for ip_address in ip_addresses:
int_name = str(ip_address['index'])
ip = _get_attr(ip_address, 'IFA_ADDRESS')
mask = ip_address['prefixlen']
cidr = common_utils.ip_to_cidr(ip, mask)
self.assertEqual(interfaces[int_name]['cidr'], cidr)
self.assertEqual(interfaces[int_name]['scope'],
ip_lib.IP_ADDRESS_SCOPE[ip_address['scope']])

View File

@ -25,6 +25,7 @@ from oslo_utils import fileutils
import testtools
from neutron.agent.linux import dhcp
from neutron.agent.linux import ip_lib
from neutron.conf.agent import common as config
from neutron.conf.agent import dhcp as dhcp_config
from neutron.conf import common as base_config
@ -1233,6 +1234,16 @@ class TestDhcpLocalProcess(TestBase):
class TestDnsmasq(TestBase):
def setUp(self):
super(TestDnsmasq, self).setUp()
self._mock_get_devices_with_ip = mock.patch.object(
ip_lib, 'get_devices_with_ip')
self.mock_get_devices_with_ip = self._mock_get_devices_with_ip.start()
self.addCleanup(self._stop_mocks)
def _stop_mocks(self):
self._mock_get_devices_with_ip.stop()
def _get_dnsmasq(self, network, process_monitor=None):
process_monitor = process_monitor or mock.Mock()
return dhcp.Dnsmasq(self.conf, network,

View File

@ -23,6 +23,7 @@ from neutron_lib import constants
from neutron_lib import exceptions
from oslo_utils import uuidutils
import pyroute2
from pyroute2.netlink.rtnl import ifaddrmsg
from pyroute2.netlink.rtnl import ifinfmsg
from pyroute2.netlink.rtnl import ndmsg
from pyroute2 import NetlinkError
@ -41,58 +42,6 @@ NETNS_SAMPLE = [
'bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb',
'cccccccc-cccc-cccc-cccc-cccccccccccc']
ADDR_SAMPLE = ("""
2: eth0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc mq state UP qlen 1000
link/ether dd:cc:aa:b9:76:ce brd ff:ff:ff:ff:ff:ff
inet 172.16.77.240/24 brd 172.16.77.255 scope global eth0
inet6 2001:470:9:1224:5595:dd51:6ba2:e788/64 scope global temporary dynamic
valid_lft 14187sec preferred_lft 3387sec
inet6 fe80::3023:39ff:febc:22ae/64 scope link tentative
valid_lft forever preferred_lft forever
inet6 fe80::3023:39ff:febc:22af/64 scope link tentative dadfailed
valid_lft forever preferred_lft forever
inet6 2001:470:9:1224:fd91:272:581e:3a32/64 scope global temporary """
"""deprecated dynamic
valid_lft 14187sec preferred_lft 0sec
inet6 2001:470:9:1224:4508:b885:5fb:740b/64 scope global temporary """
"""deprecated dynamic
valid_lft 14187sec preferred_lft 0sec
inet6 2001:470:9:1224:dfcc:aaff:feb9:76ce/64 scope global dynamic
valid_lft 14187sec preferred_lft 3387sec
inet6 fe80::dfcc:aaff:feb9:76ce/64 scope link
valid_lft forever preferred_lft forever
""")
ADDR_SAMPLE2 = ("""
2: eth0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc mq state UP qlen 1000
link/ether dd:cc:aa:b9:76:ce brd ff:ff:ff:ff:ff:ff
inet 172.16.77.240/24 scope global eth0
inet6 2001:470:9:1224:5595:dd51:6ba2:e788/64 scope global temporary dynamic
valid_lft 14187sec preferred_lft 3387sec
inet6 fe80::3023:39ff:febc:22ae/64 scope link tentative
valid_lft forever preferred_lft forever
inet6 fe80::3023:39ff:febc:22af/64 scope link tentative dadfailed
valid_lft forever preferred_lft forever
inet6 2001:470:9:1224:fd91:272:581e:3a32/64 scope global temporary """
"""deprecated dynamic
valid_lft 14187sec preferred_lft 0sec
inet6 2001:470:9:1224:4508:b885:5fb:740b/64 scope global temporary """
"""deprecated dynamic
valid_lft 14187sec preferred_lft 0sec
inet6 2001:470:9:1224:dfcc:aaff:feb9:76ce/64 scope global dynamic
valid_lft 14187sec preferred_lft 3387sec
inet6 fe80::dfcc:aaff:feb9:76ce/64 scope link
valid_lft forever preferred_lft forever
""")
ADDR_SAMPLE3 = ("""
2: eth0@NONE: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc mq state UP
link/ether dd:cc:aa:b9:76:ce brd ff:ff:ff:ff:ff:ff
inet 172.16.77.240/24 brd 172.16.77.255 scope global eth0
""")
GATEWAY_SAMPLE1 = ("""
default via 10.35.19.254 metric 100
10.35.16.0/22 proto kernel scope link src 10.35.17.97
@ -211,7 +160,7 @@ class TestIpWrapper(base.BaseTestCase):
self.execute_p = mock.patch.object(ip_lib.IPWrapper, '_execute')
self.execute = self.execute_p.start()
@mock.patch.object(priv_lib, 'get_devices')
@mock.patch.object(priv_lib, 'get_device_names')
def test_get_devices(self, mock_get_devices):
interfaces = ['br01', 'lo', 'gre0']
mock_get_devices.return_value = interfaces
@ -220,7 +169,7 @@ class TestIpWrapper(base.BaseTestCase):
self.assertEqual('br01', device.name)
interfaces.remove(device.name)
@mock.patch.object(priv_lib, 'get_devices')
@mock.patch.object(priv_lib, 'get_device_names')
def test_get_devices_include_loopback_and_gre(self, mock_get_devices):
interfaces = ['br01', 'lo', 'gre0']
mock_get_devices.return_value = interfaces
@ -231,7 +180,7 @@ class TestIpWrapper(base.BaseTestCase):
interfaces.remove(device.name)
self.assertEqual(0, len(interfaces))
@mock.patch.object(priv_lib, 'get_devices')
@mock.patch.object(priv_lib, 'get_device_names')
def test_get_devices_no_netspace(self, mock_get_devices):
mock_get_devices.side_effect = priv_lib.NetworkNamespaceNotFound(
netns_name='foo')
@ -620,7 +569,7 @@ class TestIpRuleCommand(TestIPCmdBase):
def _test_add_rule(self, ip, iif, table, priority):
ip_version = netaddr.IPNetwork(ip).version
ip_family = common_utils.get_socket_address_family(ip_version)
table_num = ip_lib.RULE_TABLES.get(table) or int(table)
table_num = ip_lib.IP_RULE_TABLES.get(table) or int(table)
cmd_args = {'table': table_num,
'priority': priority,
'family': ip_family}
@ -809,48 +758,8 @@ class TestIpAddrCommand(TestIPCmdBase):
flush.assert_called_once_with(
6, self.parent.name, self.addr_cmd._parent.namespace)
def test_list(self):
expected_brd = [
dict(name='eth0', scope='global', tentative=False, dadfailed=False,
dynamic=False, cidr='172.16.77.240/24',
broadcast='172.16.77.255')]
expected_no_brd = [
dict(name='eth0', scope='global', tentative=False, dadfailed=False,
dynamic=False, cidr='172.16.77.240/24', broadcast=None)]
expected_ipv6 = [
dict(name='eth0', scope='global', dadfailed=False, tentative=False,
dynamic=True, cidr='2001:470:9:1224:5595:dd51:6ba2:e788/64',
broadcast=None),
dict(name='eth0', scope='link', dadfailed=False, tentative=True,
dynamic=False, cidr='fe80::3023:39ff:febc:22ae/64',
broadcast=None),
dict(name='eth0', scope='link', dadfailed=True, tentative=True,
dynamic=False, cidr='fe80::3023:39ff:febc:22af/64',
broadcast=None),
dict(name='eth0', scope='global', dadfailed=False, tentative=False,
dynamic=True, cidr='2001:470:9:1224:fd91:272:581e:3a32/64',
broadcast=None),
dict(name='eth0', scope='global', dadfailed=False, tentative=False,
dynamic=True, cidr='2001:470:9:1224:4508:b885:5fb:740b/64',
broadcast=None),
dict(name='eth0', scope='global', dadfailed=False, tentative=False,
dynamic=True, cidr='2001:470:9:1224:dfcc:aaff:feb9:76ce/64',
broadcast=None),
dict(name='eth0', scope='link', dadfailed=False, tentative=False,
dynamic=False, cidr='fe80::dfcc:aaff:feb9:76ce/64',
broadcast=None)]
cases = [
(ADDR_SAMPLE, expected_brd + expected_ipv6),
(ADDR_SAMPLE2, expected_no_brd + expected_ipv6)]
for test_case, expected in cases:
self.parent._run = mock.Mock(return_value=test_case)
self.assertEqual(expected, self.addr_cmd.list())
self._assert_call([], ('show', 'tap0'))
def test_wait_until_address_ready(self):
self.parent._run.return_value = ADDR_SAMPLE
self.addr_cmd.list = mock.Mock(return_value=[{'tentative': False}])
# this address is not tentative or failed so it should return
self.assertIsNone(self.addr_cmd.wait_until_address_ready(
'2001:470:9:1224:fd91:272:581e:3a32'))
@ -869,44 +778,68 @@ class TestIpAddrCommand(TestIPCmdBase):
self.addr_cmd.wait_until_address_ready(tentative_address,
wait_time=1)
def test_list_filtered(self):
expected_brd = [
dict(name='eth0', scope='global', tentative=False, dadfailed=False,
dynamic=False, cidr='172.16.77.240/24',
broadcast='172.16.77.255')]
expected_no_brd = [
dict(name='eth0', scope='global', tentative=False, dadfailed=False,
dynamic=False, cidr='172.16.77.240/24', broadcast=None)]
@mock.patch.object(ip_lib, 'get_devices_with_ip')
def test_list(self, mock_get_dev_ip):
self.addr_cmd._parent.namespace = 'test_ns'
self.addr_cmd.list()
mock_get_dev_ip.assert_called_once_with('test_ns',
name=self.addr_cmd.name)
cases = [
(ADDR_SAMPLE, expected_brd), (ADDR_SAMPLE2, expected_no_brd)]
@mock.patch.object(ip_lib, 'get_devices_with_ip')
def test_list_scope(self, mock_get_dev_ip):
self.addr_cmd._parent.namespace = 'test_ns'
self.addr_cmd.list(scope='link')
mock_get_dev_ip.assert_called_once_with('test_ns',
name=self.addr_cmd.name,
scope=253)
for test_case, expected in cases:
output = '\n'.join(test_case.split('\n')[0:4])
self.parent._run.return_value = output
self.assertEqual(
expected,
self.addr_cmd.list(
'global', filters=['permanent']))
self._assert_call([], ('show', 'tap0', 'permanent', 'scope',
'global'))
@mock.patch.object(ip_lib, 'get_devices_with_ip')
def test_list_to(self, mock_get_dev_ip):
self.addr_cmd._parent.namespace = 'test_ns'
cidrs = [{'cidr': '1.2.3.4', 'mask': None},
{'cidr': '1.2.3.4/24', 'mask': 24},
{'cidr': '2001:db8::1', 'mask': None},
{'cidr': '2001:db8::1/64', 'mask': 64}]
for cidr in cidrs:
self.addr_cmd.list(to=cidr['cidr'])
args = {'name': self.addr_cmd.name,
'address': common_utils.cidr_to_ip(cidr['cidr'])}
if cidr['mask']:
args['mask'] = cidr['mask']
mock_get_dev_ip.assert_called_once_with('test_ns', **args)
mock_get_dev_ip.reset_mock()
def test_get_devices_with_ip(self):
# This can only verify that get_devices_with_ip() returns a dict
# with the correct entry, it doesn't actually test that it only
# returns items filtered by the arguments since it isn't calling
# /sbin/ip at all.
self.parent._run.return_value = ADDR_SAMPLE3
devices = self.addr_cmd.get_devices_with_ip(to='172.16.77.240/24')
self.assertEqual(1, len(devices))
expected = {'cidr': '172.16.77.240/24',
'broadcast': '172.16.77.255',
'dadfailed': False,
'dynamic': False,
'name': 'eth0',
'scope': 'global',
'tentative': False}
self.assertEqual(expected, devices[0])
@mock.patch.object(ip_lib, 'get_devices_with_ip')
def test_list_ip_version(self, mock_get_dev_ip):
self.addr_cmd._parent.namespace = 'test_ns'
ip_versions = [
{'ip_version': constants.IP_VERSION_4, 'family': socket.AF_INET},
{'ip_version': constants.IP_VERSION_6, 'family': socket.AF_INET6}]
for ip_version in ip_versions:
self.addr_cmd.list(ip_version=ip_version['ip_version'])
mock_get_dev_ip.assert_called_once_with(
'test_ns', name=self.addr_cmd.name,
family=ip_version['family'])
mock_get_dev_ip.reset_mock()
@mock.patch.object(ip_lib, 'get_devices_with_ip')
def test_list_filters_dynamic_permanent(self, mock_get_dev_ip):
self.addr_cmd._parent.namespace = 'test_ns'
mock_get_dev_ip.return_value = [{'dynamic': True}]
retval = self.addr_cmd.list(filters=['dynamic'])
self.assertEqual(1, len(retval))
retval = self.addr_cmd.list(filters=['permanent'])
self.assertEqual(0, len(retval))
@mock.patch.object(ip_lib, 'get_devices_with_ip')
def test_list_filters_tentative_dadfailed(self, mock_get_dev_ip):
self.addr_cmd._parent.namespace = 'test_ns'
mock_get_dev_ip.return_value = [{'tentative': True,
'dadfailed': False}]
retval = self.addr_cmd.list(filters=['tentative'])
self.assertEqual(1, len(retval))
retval = self.addr_cmd.list(filters=['tentative', 'dadfailed'])
self.assertEqual(0, len(retval))
class TestIpRouteCommand(TestIPCmdBase):
@ -1941,3 +1874,35 @@ class ListIpRulesTestCase(base.BaseTestCase):
{'type': 'blackhole', 'from': '0.0.0.0/0', 'priority': '0',
'table': 'local'}]
self.assertEqual(reference, retval)
class ParseLinkDeviceTestCase(base.BaseTestCase):
def setUp(self):
super(ParseLinkDeviceTestCase, self).setUp()
self._mock_get_ip_addresses = mock.patch.object(priv_lib,
'get_ip_addresses')
self.mock_get_ip_addresses = self._mock_get_ip_addresses.start()
self.addCleanup(self._stop_mock)
def _stop_mock(self):
self._mock_get_ip_addresses.stop()
def test_parse_link_devices(self):
device = ({'index': 1, 'attrs': [['IFLA_IFNAME', 'int_name']]})
self.mock_get_ip_addresses.return_value = [
{'prefixlen': 24, 'scope': 200, 'attrs': [
['IFA_ADDRESS', '192.168.10.20'],
['IFA_FLAGS', ifaddrmsg.IFA_F_PERMANENT]]},
{'prefixlen': 64, 'scope': 200, 'attrs': [
['IFA_ADDRESS', '2001:db8::1'],
['IFA_FLAGS', ifaddrmsg.IFA_F_PERMANENT]]}]
retval = ip_lib._parse_link_device('namespace', device)
expected = [{'scope': 'site', 'cidr': '192.168.10.20/24',
'dynamic': False, 'dadfailed': False, 'name': 'int_name',
'broadcast': None, 'tentative': False},
{'scope': 'site', 'cidr': '2001:db8::1/64',
'dynamic': False, 'dadfailed': False, 'name': 'int_name',
'broadcast': None, 'tentative': False}]
self.assertEqual(expected, retval)