Merge "Add ``devlink.get_port`` method for devlink ports"

This commit is contained in:
Zuul 2023-01-16 15:12:44 +00:00 committed by Gerrit Code Review
commit 294e392587
14 changed files with 429 additions and 178 deletions

View File

@ -24,6 +24,7 @@ from pyroute2.netlink import exceptions \
as netlink_exceptions # pylint: disable=no-name-in-module
from neutron.agent.linux import ip_lib
from neutron.agent.linux import utils as linux_utils
from neutron.privileged.agent.linux import ip_lib as priv_ip_lib
# NOTE(toabctl): Don't use /sys/devices/virtual/net here because not all tap
@ -184,12 +185,12 @@ class FdbInterface(object):
if dev and dev != name:
continue
master = find_device_name(ip_lib.get_attr(fdb, 'NDA_MASTER'),
master = find_device_name(linux_utils.get_attr(fdb, 'NDA_MASTER'),
devices)
fdb_info = {'mac': ip_lib.get_attr(fdb, 'NDA_LLADDR'),
fdb_info = {'mac': linux_utils.get_attr(fdb, 'NDA_LLADDR'),
'master': master,
'vlan': ip_lib.get_attr(fdb, 'NDA_VLAN'),
'dst_ip': ip_lib.get_attr(fdb, 'NDA_DST')}
'vlan': linux_utils.get_attr(fdb, 'NDA_VLAN'),
'dst_ip': linux_utils.get_attr(fdb, 'NDA_DST')}
ret[name].append(fdb_info)
return ret

View File

@ -0,0 +1,54 @@
# Copyright 2022 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
from neutron.agent.linux import utils as linux_utils
from neutron.privileged.agent.linux import devlink as priv_devlink
PortIndex = collections.namedtuple(
'PortIndex', ['name', 'pf_pci', 'pf_num', 'vf_num', 'is_parent'])
def get_port(port_name):
"""Retrieves the devlink port information, including the PF name."""
ports = priv_devlink.get_port_list()
# Build port index, with PF reference and VF index.
port_indexes = []
ret = None
for port in ports:
pf_pci = linux_utils.get_attr(port, 'DEVLINK_ATTR_DEV_NAME')
name = linux_utils.get_attr(port, 'DEVLINK_ATTR_PORT_NETDEV_NAME')
index = linux_utils.get_attr(port, 'DEVLINK_ATTR_PORT_INDEX')
pf_num = index >> 16
is_parent = index & 0xFFFF == 0xFFFF
vf_num = linux_utils.get_attr(port, 'DEVLINK_ATTR_PORT_PCI_VF_NUMBER')
port_indexes.append(PortIndex(name, pf_pci, pf_num, vf_num, is_parent))
if name == port_name:
ret = {'pf_pci': pf_pci,
'pf_num': pf_num,
'pf_name': None,
'vf_num': vf_num,
'vf_name': name,
}
if ret:
for port in port_indexes:
if port.pf_num == ret['pf_num'] and port.is_parent:
ret['pf_name'] = port.name
return ret

View File

@ -36,6 +36,7 @@ from pyroute2 import netns # pylint: disable=no-name-in-module
from neutron._i18n import _
from neutron.agent.common import utils
from neutron.agent.linux import utils as linux_utils
from neutron.common import utils as common_utils
from neutron.privileged.agent.linux import ip_lib as privileged
from neutron.privileged.agent.linux import utils as priv_utils
@ -1369,19 +1370,12 @@ def delete_ip_rule(namespace, ip, iif=None, table=None, priority=None,
privileged.delete_ip_rule(namespace, **cmd_args)
def get_attr(pyroute2_obj, attr_name):
"""Get an attribute from a PyRoute2 object"""
rule_attrs = pyroute2_obj.get('attrs', [])
for attr in (attr for attr in rule_attrs if attr[0] == attr_name):
return attr[1]
def _parse_ip_address(pyroute2_address, device_name):
ip = get_attr(pyroute2_address, 'IFA_ADDRESS')
ip = linux_utils.get_attr(pyroute2_address, 'IFA_ADDRESS')
ip_length = pyroute2_address['prefixlen']
event = IP_ADDRESS_EVENTS.get(pyroute2_address.get('event'))
cidr = common_utils.ip_to_cidr(ip, prefix=ip_length)
flags = get_attr(pyroute2_address, 'IFA_FLAGS')
flags = linux_utils.get_attr(pyroute2_address, 'IFA_FLAGS')
dynamic = not bool(flags & ifaddrmsg.IFA_F_PERMANENT)
tentative = bool(flags & ifaddrmsg.IFA_F_TENTATIVE)
dadfailed = bool(flags & ifaddrmsg.IFA_F_DADFAILED)
@ -1389,7 +1383,8 @@ def _parse_ip_address(pyroute2_address, device_name):
return {'name': device_name,
'cidr': cidr,
'scope': scope,
'broadcast': get_attr(pyroute2_address, 'IFA_BROADCAST'),
'broadcast': linux_utils.get_attr(pyroute2_address,
'IFA_BROADCAST'),
'dynamic': dynamic,
'tentative': tentative,
'dadfailed': dadfailed,
@ -1417,7 +1412,8 @@ def get_devices_with_ip(namespace, name=None, **kwargs):
devices = {} # {device index: name}
for ip_address in ip_addresses:
index = ip_address['index']
name = get_attr(ip_address, 'IFA_LABEL') or devices.get(index)
name = (linux_utils.get_attr(ip_address, 'IFA_LABEL') or
devices.get(index))
if not name:
device = get_devices_info(namespace, index=index)
if not device:
@ -1435,31 +1431,35 @@ def get_devices_info(namespace, **kwargs):
retval = {}
for device in devices:
ret = {'index': device['index'],
'name': get_attr(device, 'IFLA_IFNAME'),
'operstate': get_attr(device, 'IFLA_OPERSTATE'),
'linkmode': get_attr(device, 'IFLA_LINKMODE'),
'mtu': get_attr(device, 'IFLA_MTU'),
'promiscuity': get_attr(device, 'IFLA_PROMISCUITY'),
'mac': get_attr(device, 'IFLA_ADDRESS'),
'broadcast': get_attr(device, 'IFLA_BROADCAST')}
ifla_link = get_attr(device, 'IFLA_LINK')
'name': linux_utils.get_attr(device, 'IFLA_IFNAME'),
'operstate': linux_utils.get_attr(device, 'IFLA_OPERSTATE'),
'linkmode': linux_utils.get_attr(device, 'IFLA_LINKMODE'),
'mtu': linux_utils.get_attr(device, 'IFLA_MTU'),
'promiscuity': linux_utils.get_attr(device, 'IFLA_PROMISCUITY'),
'mac': linux_utils.get_attr(device, 'IFLA_ADDRESS'),
'broadcast': linux_utils.get_attr(device, 'IFLA_BROADCAST')}
ifla_link = linux_utils.get_attr(device, 'IFLA_LINK')
if ifla_link:
ret['parent_index'] = ifla_link
ifla_linkinfo = get_attr(device, 'IFLA_LINKINFO')
ifla_linkinfo = linux_utils.get_attr(device, 'IFLA_LINKINFO')
if ifla_linkinfo:
ret['kind'] = get_attr(ifla_linkinfo, 'IFLA_INFO_KIND')
ifla_data = get_attr(ifla_linkinfo, 'IFLA_INFO_DATA')
ret['kind'] = linux_utils.get_attr(ifla_linkinfo, 'IFLA_INFO_KIND')
ifla_data = linux_utils.get_attr(ifla_linkinfo, 'IFLA_INFO_DATA')
if ret['kind'] == 'vxlan':
ret['vxlan_id'] = get_attr(ifla_data, 'IFLA_VXLAN_ID')
ret['vxlan_group'] = get_attr(ifla_data, 'IFLA_VXLAN_GROUP')
ret['vxlan_link_index'] = get_attr(ifla_data,
'IFLA_VXLAN_LINK')
ret['vxlan_id'] = linux_utils.get_attr(ifla_data,
'IFLA_VXLAN_ID')
ret['vxlan_group'] = linux_utils.get_attr(ifla_data,
'IFLA_VXLAN_GROUP')
ret['vxlan_link_index'] = linux_utils.get_attr(
ifla_data, 'IFLA_VXLAN_LINK')
elif ret['kind'] == 'vlan':
ret['vlan_id'] = get_attr(ifla_data, 'IFLA_VLAN_ID')
ret['vlan_id'] = linux_utils.get_attr(ifla_data,
'IFLA_VLAN_ID')
elif ret['kind'] == 'bridge':
ret['stp'] = get_attr(ifla_data, 'IFLA_BR_STP_STATE')
ret['forward_delay'] = get_attr(ifla_data,
'IFLA_BR_FORWARD_DELAY')
ret['stp'] = linux_utils.get_attr(ifla_data,
'IFLA_BR_STP_STATE')
ret['forward_delay'] = linux_utils.get_attr(
ifla_data, 'IFLA_BR_FORWARD_DELAY')
retval[device['index']] = ret
for device in retval.values():
@ -1519,8 +1519,8 @@ def ip_monitor(namespace, queue, event_stop, event_started):
cache_devices = {}
with privileged.get_iproute(namespace) as ip:
for device in ip.get_links():
cache_devices[device['index']] = get_attr(device,
'IFLA_IFNAME')
cache_devices[device['index']] = linux_utils.get_attr(
device, 'IFLA_IFNAME')
_ip = privileged.get_iproute(namespace)
ip_updates_thread = threading.Thread(target=read_ip_updates,
args=(_ip, _queue))
@ -1570,7 +1570,7 @@ def list_ip_routes(namespace, ip_version, scope=None, via=None, table=None,
"""List IP routes"""
def get_device(index, devices):
for device in (d for d in devices if d['index'] == index):
return get_attr(device, 'IFLA_IFNAME')
return linux_utils.get_attr(device, 'IFLA_IFNAME')
def get_proto(proto_number):
if isinstance(proto_number, int) and proto_number in rtnl.rt_proto:
@ -1587,37 +1587,38 @@ def list_ip_routes(namespace, ip_version, scope=None, via=None, table=None,
devices = privileged.get_link_devices(namespace)
ret = []
for route in routes:
cidr = get_attr(route, 'RTA_DST')
cidr = linux_utils.get_attr(route, 'RTA_DST')
if cidr:
cidr = '%s/%s' % (cidr, route['dst_len'])
else:
cidr = constants.IP_ANY[ip_version]
table = int(get_attr(route, 'RTA_TABLE'))
metric = (get_attr(route, 'RTA_PRIORITY') or
table = int(linux_utils.get_attr(route, 'RTA_TABLE'))
metric = (linux_utils.get_attr(route, 'RTA_PRIORITY') or
IP_ROUTE_METRIC_DEFAULT[ip_version])
proto = get_proto(route['proto'])
value = {
'table': IP_RULE_TABLES_NAMES.get(table, table),
'source_prefix': get_attr(route, 'RTA_PREFSRC'),
'source_prefix': linux_utils.get_attr(route, 'RTA_PREFSRC'),
'cidr': cidr,
'scope': IP_ADDRESS_SCOPE[int(route['scope'])],
'metric': metric,
'proto': proto,
}
multipath = get_attr(route, 'RTA_MULTIPATH')
multipath = linux_utils.get_attr(route, 'RTA_MULTIPATH')
if multipath:
value['device'] = None
mp_via = []
for mp in multipath:
mp_via.append({'device': get_device(int(mp['oif']), devices),
'via': get_attr(mp, 'RTA_GATEWAY'),
'via': linux_utils.get_attr(mp, 'RTA_GATEWAY'),
'weight': int(mp['hops']) + 1})
value['via'] = mp_via
else:
value['device'] = get_device(int(get_attr(route, 'RTA_OIF')),
devices)
value['via'] = get_attr(route, 'RTA_GATEWAY')
value['device'] = get_device(
int(linux_utils.get_attr(route, 'RTA_OIF')),
devices)
value['via'] = linux_utils.get_attr(route, 'RTA_GATEWAY')
ret.append(value)

View File

@ -29,6 +29,7 @@ from pyroute2.netlink.rtnl.tcmsg import common \
from neutron._i18n import _
from neutron.agent.linux import ip_lib
from neutron.agent.linux import utils as linux_utils
from neutron.common import utils
from neutron.privileged.agent.linux import tc_lib as priv_tc_lib
@ -111,25 +112,6 @@ def convert_to_kilobits(value, base):
return utils.bits_to_kilobits(bits_value, base)
def _get_attr(pyroute2_obj, attr_name):
"""Get an attribute in a pyroute object
pyroute2 object attributes are stored under a key called 'attrs'. This key
contains a tuple of tuples. E.g.:
pyroute2_obj = {'attrs': (('TCA_KIND': 'htb'),
('TCA_OPTIONS': {...}))}
:param pyroute2_obj: (dict) pyroute2 object
:param attr_name: (string) first value of the tuple we are looking for
:return: (object) second value of the tuple, None if the tuple doesn't
exist
"""
rule_attrs = pyroute2_obj.get('attrs', [])
for attr in (attr for attr in rule_attrs if attr[0] == attr_name):
return attr[1]
return
def _get_tbf_burst_value(rate, burst_limit, kernel_hz):
min_burst_value = float(rate) / float(kernel_hz)
return max(min_burst_value, burst_limit)
@ -366,13 +348,13 @@ def list_tc_qdiscs(device, namespace=None):
retval = []
for qdisc in qdiscs:
qdisc_attrs = {
'qdisc_type': _get_attr(qdisc, 'TCA_KIND'),
'qdisc_type': linux_utils.get_attr(qdisc, 'TCA_KIND'),
'parent': TC_QDISC_PARENT_NAME.get(
qdisc['parent'], _handle_from_hex_to_string(qdisc['parent'])),
'handle': _handle_from_hex_to_string(qdisc['handle'])}
if qdisc_attrs['qdisc_type'] == 'tbf':
tca_options = _get_attr(qdisc, 'TCA_OPTIONS')
tca_tbf_parms = _get_attr(tca_options, 'TCA_TBF_PARMS')
tca_options = linux_utils.get_attr(qdisc, 'TCA_OPTIONS')
tca_tbf_parms = linux_utils.get_attr(tca_options, 'TCA_TBF_PARMS')
qdisc_attrs['max_kbps'] = int(tca_tbf_parms['rate'] * 8 / 1000)
burst_bytes = _calc_burst(tca_tbf_parms['rate'],
tca_tbf_parms['buffer'])
@ -459,8 +441,8 @@ def list_tc_policy_class(device, namespace=None):
if qdisc_type not in TC_QDISC_TYPES:
return None, None, None
tca_params = _get_attr(tca_options,
'TCA_' + qdisc_type.upper() + '_PARMS')
tca_params = linux_utils.get_attr(
tca_options, 'TCA_' + qdisc_type.upper() + '_PARMS')
burst_kb = int(
_calc_burst(tca_params['rate'], tca_params['buffer']) * 8 / 1000)
max_kbps = int(tca_params['ceil'] * 8 / 1000)
@ -475,8 +457,8 @@ def list_tc_policy_class(device, namespace=None):
parent = TC_QDISC_PARENT_NAME.get(
tc_class['parent'], _handle_from_hex_to_string(tc_class['parent']))
classid = _handle_from_hex_to_string(tc_class['handle'])
qdisc_type = _get_attr(tc_class, 'TCA_KIND')
tca_options = _get_attr(tc_class, 'TCA_OPTIONS')
qdisc_type = linux_utils.get_attr(tc_class, 'TCA_KIND')
tca_options = linux_utils.get_attr(tc_class, 'TCA_OPTIONS')
max_kbps, min_kbps, burst_kb = get_params(tca_options, qdisc_type)
tc_class_data = {'device': device,
'index': index,
@ -487,7 +469,7 @@ def list_tc_policy_class(device, namespace=None):
'min_kbps': min_kbps,
'max_kbps': max_kbps,
'burst_kb': burst_kb}
tca_stats = _get_attr(tc_class, 'TCA_STATS')
tca_stats = linux_utils.get_attr(tc_class, 'TCA_STATS')
if tca_stats:
tc_class_data['stats'] = tca_stats
classes.append(tc_class_data)
@ -583,10 +565,10 @@ def list_tc_filters(device, parent, namespace=None):
filters = priv_tc_lib.list_tc_filters(device, parent, namespace=namespace)
retval = []
for filter in filters:
tca_options = _get_attr(filter, 'TCA_OPTIONS')
tca_options = linux_utils.get_attr(filter, 'TCA_OPTIONS')
if not tca_options:
continue
tca_u32_sel = _get_attr(tca_options, 'TCA_U32_SEL')
tca_u32_sel = linux_utils.get_attr(tca_options, 'TCA_U32_SEL')
if not tca_u32_sel:
continue
keys = []
@ -602,9 +584,10 @@ def list_tc_filters(device, parent, namespace=None):
value = {'keys': keys}
tca_u32_police = _get_attr(tca_options, 'TCA_U32_POLICE')
tca_u32_police = linux_utils.get_attr(tca_options, 'TCA_U32_POLICE')
if tca_u32_police:
tca_police_tbf = _get_attr(tca_u32_police, 'TCA_POLICE_TBF')
tca_police_tbf = linux_utils.get_attr(tca_u32_police,
'TCA_POLICE_TBF')
if tca_police_tbf:
value['rate_kbps'] = int(tca_police_tbf['rate'] * 8 / 1000)
value['burst_kb'] = int(

View File

@ -474,3 +474,21 @@ class UnixDomainWSGIServer(wsgi.Server):
protocol=UnixDomainHttpProtocol,
log=logger,
log_format=cfg.CONF.wsgi_log_format)
def get_attr(pyroute2_obj, attr_name):
"""Get an attribute in a pyroute object
pyroute2 object attributes are stored under a key called 'attrs'. This key
contains a tuple of tuples. E.g.:
pyroute2_obj = {'attrs': (('TCA_KIND': 'htb'),
('TCA_OPTIONS': {...}))}
:param pyroute2_obj: (dict) pyroute2 object
:param attr_name: (string) first value of the tuple we are looking for
:return: (object) second value of the tuple, None if the tuple doesn't
exist
"""
rule_attrs = pyroute2_obj.get('attrs', [])
for attr in (attr for attr in rule_attrs if attr[0] == attr_name):
return attr[1]

View File

@ -15,6 +15,8 @@
import ctypes
from ctypes import util as ctypes_util
from pyroute2 import netlink
_CDLL = None
@ -30,3 +32,26 @@ def get_cdll():
# Check https://bugs.launchpad.net/neutron/+bug/1870352
_CDLL = ctypes.PyDLL(ctypes_util.find_library('c'), use_errno=True)
return _CDLL
def make_serializable(value):
"""Make a pyroute2 object serializable
This function converts 'netlink.nla_slot' object (key, value) in a list
of two elements.
"""
def _ensure_string(value):
return value.decode() if isinstance(value, bytes) else value
if isinstance(value, list):
return [make_serializable(item) for item in value]
elif isinstance(value, netlink.nla_slot):
return [_ensure_string(value[0]), make_serializable(value[1])]
elif isinstance(value, netlink.nla_base):
return make_serializable(value.dump())
elif isinstance(value, dict):
return {_ensure_string(key): make_serializable(data)
for key, data in value.items()}
elif isinstance(value, tuple):
return tuple(make_serializable(item) for item in value)
return _ensure_string(value)

View File

@ -0,0 +1,30 @@
# Copyright 2022 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from pyroute2 import devlink
from neutron import privileged
from neutron.privileged.agent import linux as priv_linux
@privileged.default.entrypoint
def get_port_list():
dl = None
try:
dl = devlink.DL()
return priv_linux.make_serializable(dl.port_list())
finally:
if dl:
dl.close()

View File

@ -608,29 +608,6 @@ def list_netns(**kwargs):
return netns.listnetns(**kwargs)
def make_serializable(value):
"""Make a pyroute2 object serializable
This function converts 'netlink.nla_slot' object (key, value) in a list
of two elements.
"""
def _ensure_string(value):
return value.decode() if isinstance(value, bytes) else value
if isinstance(value, list):
return [make_serializable(item) for item in value]
elif isinstance(value, netlink.nla_slot):
return [_ensure_string(value[0]), make_serializable(value[1])]
elif isinstance(value, netlink.nla_base):
return make_serializable(value.dump())
elif isinstance(value, dict):
return {_ensure_string(key): make_serializable(data)
for key, data in value.items()}
elif isinstance(value, tuple):
return tuple(make_serializable(item) for item in value)
return _ensure_string(value)
@tenacity.retry(
retry=tenacity.retry_if_exception_type(
netlink_exceptions.NetlinkDumpInterrupted),
@ -645,7 +622,7 @@ def get_link_devices(namespace, **kwargs):
"""
try:
with get_iproute(namespace) as ip:
return make_serializable(ip.get_links(**kwargs))
return priv_linux.make_serializable(ip.get_links(**kwargs))
except OSError as e:
if e.errno == errno.ENOENT:
raise NetworkNamespaceNotFound(netns_name=namespace)
@ -681,7 +658,7 @@ def get_ip_addresses(namespace, **kwargs):
"""
try:
with get_iproute(namespace) as ip:
return make_serializable(ip.get_addr(**kwargs))
return priv_linux.make_serializable(ip.get_addr(**kwargs))
except OSError as e:
if e.errno == errno.ENOENT:
raise NetworkNamespaceNotFound(netns_name=namespace)
@ -699,7 +676,7 @@ def list_ip_rules(namespace, ip_version, match=None, **kwargs):
"""List all IP rules"""
try:
with get_iproute(namespace) as ip:
rules = make_serializable(ip.get_rules(
rules = priv_linux.make_serializable(ip.get_rules(
family=_IP_VERSION_FAMILY_MAP[ip_version],
match=match, **kwargs))
for rule in rules:
@ -824,7 +801,7 @@ def list_ip_routes(namespace, ip_version, device=None, table=None, **kwargs):
None))
try:
with get_iproute(namespace) as ip:
return make_serializable(ip.route('show', **kwargs))
return priv_linux.make_serializable(ip.route('show', **kwargs))
except OSError as e:
if e.errno == errno.ENOENT:
raise NetworkNamespaceNotFound(netns_name=namespace)
@ -858,7 +835,7 @@ def list_bridge_fdb(namespace=None, **kwargs):
# NOTE(ralonsoh): fbd does not support ifindex filtering in pyroute2 0.5.14
try:
with get_iproute(namespace) as ip:
return make_serializable(ip.fdb('dump', **kwargs))
return priv_linux.make_serializable(ip.fdb('dump', **kwargs))
except OSError as e:
if e.errno == errno.ENOENT:
raise NetworkNamespaceNotFound(netns_name=namespace)
@ -873,7 +850,7 @@ def _command_bridge_fdb(command, mac, device, dst_ip=None, namespace=None,
if dst_ip:
kwargs['dst'] = dst_ip
with get_iproute(namespace) as ip:
return make_serializable(ip.fdb(command, **kwargs))
return priv_linux.make_serializable(ip.fdb(command, **kwargs))
except OSError as e:
if e.errno == errno.ENOENT:
raise NetworkNamespaceNotFound(netns_name=namespace)

View File

@ -22,6 +22,7 @@ from pyroute2 import protocols \
from neutron._i18n import _
from neutron import privileged
from neutron.privileged.agent import linux as priv_linux
from neutron.privileged.agent.linux import ip_lib
@ -58,7 +59,7 @@ def list_tc_qdiscs(device, namespace=None):
index = ip_lib.get_link_id(device, namespace)
try:
with ip_lib.get_iproute(namespace) as ip:
return ip_lib.make_serializable(ip.get_qdiscs(index=index))
return priv_linux.make_serializable(ip.get_qdiscs(index=index))
except OSError as e:
if e.errno == errno.ENOENT:
raise ip_lib.NetworkNamespaceNotFound(netns_name=namespace)
@ -119,7 +120,7 @@ def list_tc_policy_classes(device, namespace=None):
try:
index = ip_lib.get_link_id(device, namespace)
with ip_lib.get_iproute(namespace) as ip:
return ip_lib.make_serializable(ip.get_classes(index=index))
return priv_linux.make_serializable(ip.get_classes(index=index))
except OSError as e:
if e.errno == errno.ENOENT:
raise ip_lib.NetworkNamespaceNotFound(netns_name=namespace)
@ -196,7 +197,7 @@ def list_tc_filters(device, parent, namespace=None, **kwargs):
try:
index = ip_lib.get_link_id(device, namespace)
with ip_lib.get_iproute(namespace) as ip:
return ip_lib.make_serializable(
return priv_linux.make_serializable(
ip.get_filters(index=index, parent=parent, **kwargs))
except OSError as e:
if e.errno == errno.ENOENT:

View File

@ -26,6 +26,7 @@ from pyroute2.netlink import rtnl
import testtools
from neutron.agent.linux import ip_lib
from neutron.agent.linux import utils as linux_utils
from neutron.common import utils as common_utils
from neutron.privileged.agent.linux import ip_lib as priv_ip_lib
from neutron.tests.common import net_helpers
@ -79,9 +80,9 @@ class GetDevicesInfoTestCase(functional_base.BaseSudoTestCase):
devices = priv_ip_lib.get_link_devices(self.namespace)
self.assertGreater(len(devices), 0)
for device in devices:
if ip_lib.get_attr(device, 'IFLA_IFNAME') != 'lo':
if linux_utils.get_attr(device, 'IFLA_IFNAME') != 'lo':
continue
self.assertIsNone(ip_lib.get_attr(device, 'IFLA_LINKINFO'))
self.assertIsNone(linux_utils.get_attr(device, 'IFLA_LINKINFO'))
break
else:
self.fail('Device "lo" not found')
@ -94,13 +95,14 @@ class GetDevicesInfoTestCase(functional_base.BaseSudoTestCase):
devices = priv_ip_lib.get_link_devices(self.namespace)
self.assertGreater(len(devices), 0)
for device in devices:
name = ip_lib.get_attr(device, 'IFLA_IFNAME')
name = linux_utils.get_attr(device, 'IFLA_IFNAME')
if name in self.interfaces_to_exclude:
continue
self.assertIn(name, self.interfaces)
ifla_linkinfo = ip_lib.get_attr(device, 'IFLA_LINKINFO')
self.assertEqual(ip_lib.get_attr(ifla_linkinfo, 'IFLA_INFO_KIND'),
'dummy')
ifla_linkinfo = linux_utils.get_attr(device, 'IFLA_LINKINFO')
self.assertEqual(
linux_utils.get_attr(ifla_linkinfo, 'IFLA_INFO_KIND'),
'dummy')
interfaces_tested.append(name)
self.assertEqual(sorted(interfaces_tested), sorted(self.interfaces))
@ -121,27 +123,29 @@ class GetDevicesInfoTestCase(functional_base.BaseSudoTestCase):
self.assertGreater(len(devices), 0)
device_name_index = {}
for device in devices:
name = ip_lib.get_attr(device, 'IFLA_IFNAME')
name = linux_utils.get_attr(device, 'IFLA_IFNAME')
device_name_index[name] = device['index']
for device in devices:
name = ip_lib.get_attr(device, 'IFLA_IFNAME')
name = linux_utils.get_attr(device, 'IFLA_IFNAME')
if name in self.interfaces_to_exclude:
continue
self.assertIn(name, self.interfaces + vlan_interfaces)
ifla_linkinfo = ip_lib.get_attr(device, 'IFLA_LINKINFO')
ifla_linkinfo = linux_utils.get_attr(device, 'IFLA_LINKINFO')
if name in vlan_interfaces:
self.assertEqual(
ip_lib.get_attr(ifla_linkinfo, 'IFLA_INFO_KIND'), 'vlan')
ifla_infodata = ip_lib.get_attr(ifla_linkinfo,
'IFLA_INFO_DATA')
linux_utils.get_attr(ifla_linkinfo, 'IFLA_INFO_KIND'),
'vlan')
ifla_infodata = linux_utils.get_attr(ifla_linkinfo,
'IFLA_INFO_DATA')
vlan_id = int(name.split('_')[-1])
self.assertEqual(
ip_lib.get_attr(ifla_infodata, 'IFLA_VLAN_ID'), vlan_id)
linux_utils.get_attr(ifla_infodata, 'IFLA_VLAN_ID'),
vlan_id)
vlan_link_name = self.interfaces[vlan_interfaces.index(name)]
vlan_link_index = device_name_index[vlan_link_name]
self.assertEqual(vlan_link_index, ip_lib.get_attr(device,
'IFLA_LINK'))
self.assertEqual(vlan_link_index,
linux_utils.get_attr(device, 'IFLA_LINK'))
interfaces_tested.append(name)
self.assertEqual(sorted(interfaces_tested),
sorted(self.interfaces + vlan_interfaces))
@ -164,39 +168,40 @@ class GetDevicesInfoTestCase(functional_base.BaseSudoTestCase):
self.assertGreater(len(devices), 0)
device_name_index = {}
for device in devices:
name = ip_lib.get_attr(device, 'IFLA_IFNAME')
name = linux_utils.get_attr(device, 'IFLA_IFNAME')
device_name_index[name] = device['index']
for device in devices:
name = ip_lib.get_attr(device, 'IFLA_IFNAME')
name = linux_utils.get_attr(device, 'IFLA_IFNAME')
if name in self.interfaces_to_exclude:
continue
self.assertIn(name, self.interfaces + vxlan_interfaces)
ifla_linkinfo = ip_lib.get_attr(device, 'IFLA_LINKINFO')
ifla_linkinfo = linux_utils.get_attr(device, 'IFLA_LINKINFO')
if name in vxlan_interfaces:
self.assertEqual(
ip_lib.get_attr(ifla_linkinfo, 'IFLA_INFO_KIND'),
linux_utils.get_attr(ifla_linkinfo, 'IFLA_INFO_KIND'),
'vxlan')
ifla_infodata = ip_lib.get_attr(ifla_linkinfo,
'IFLA_INFO_DATA')
ifla_infodata = linux_utils.get_attr(ifla_linkinfo,
'IFLA_INFO_DATA')
vxlan_id = int(name.split('_')[-1])
self.assertEqual(
ip_lib.get_attr(ifla_infodata, 'IFLA_VXLAN_ID'), vxlan_id)
linux_utils.get_attr(ifla_infodata, 'IFLA_VXLAN_ID'),
vxlan_id)
self.assertEqual(
ip_lib.get_attr(ifla_infodata, 'IFLA_VXLAN_GROUP'),
linux_utils.get_attr(ifla_infodata, 'IFLA_VXLAN_GROUP'),
'239.1.1.1')
vxlan_link_name = self.interfaces[vxlan_interfaces.index(name)]
vxlan_link_index = device_name_index[vxlan_link_name]
self.assertEqual(
vxlan_link_index,
ip_lib.get_attr(ifla_infodata, 'IFLA_VXLAN_LINK'))
linux_utils.get_attr(ifla_infodata, 'IFLA_VXLAN_LINK'))
interfaces_tested.append(name)
self.assertEqual(sorted(interfaces_tested),
sorted(self.interfaces + vxlan_interfaces))
def _retrieve_interface(self, interface_name, namespace):
for device in priv_ip_lib.get_link_devices(namespace):
if interface_name == ip_lib.get_attr(device, 'IFLA_IFNAME'):
if interface_name == linux_utils.get_attr(device, 'IFLA_IFNAME'):
return device
else:
self.fail('Interface "%s" not found' % interface_name)
@ -216,14 +221,14 @@ class GetDevicesInfoTestCase(functional_base.BaseSudoTestCase):
veth1_1 = self._retrieve_interface('veth1_1', self.namespace)
veth1_2 = self._retrieve_interface('veth1_2', namespace2)
ifla_linkinfo = ip_lib.get_attr(veth1_1, 'IFLA_LINKINFO')
self.assertEqual(ip_lib.get_attr(ifla_linkinfo, 'IFLA_INFO_KIND'),
ifla_linkinfo = linux_utils.get_attr(veth1_1, 'IFLA_LINKINFO')
self.assertEqual(linux_utils.get_attr(ifla_linkinfo, 'IFLA_INFO_KIND'),
'veth')
# NOTE(ralonsoh): since kernel_version=4.15.0-60-generic, iproute2
# provides the veth pair index, even if the pair interface is in other
# namespace. In previous versions, the parameter 'IFLA_LINK' was not
# present. We need to handle both cases.
self.assertIn(ip_lib.get_attr(veth1_1, 'IFLA_LINK'),
self.assertIn(linux_utils.get_attr(veth1_1, 'IFLA_LINK'),
[None, veth1_2['index']])
def test_get_devices_info_veth_same_namespaces(self):
@ -233,8 +238,8 @@ class GetDevicesInfoTestCase(functional_base.BaseSudoTestCase):
veth1_1 = self._retrieve_interface('veth1_1', self.namespace)
veth1_2 = self._retrieve_interface('veth1_2', self.namespace)
veth1_1_link = ip_lib.get_attr(veth1_1, 'IFLA_LINK')
veth1_2_link = ip_lib.get_attr(veth1_2, 'IFLA_LINK')
veth1_1_link = linux_utils.get_attr(veth1_1, 'IFLA_LINK')
veth1_2_link = linux_utils.get_attr(veth1_2, 'IFLA_LINK')
self.assertEqual(veth1_1['index'], veth1_2_link)
self.assertEqual(veth1_2['index'], veth1_1_link)
@ -464,7 +469,7 @@ class GetIpAddressesTestCase(functional_base.BaseSudoTestCase):
ip_addresses = priv_ip_lib.get_ip_addresses(namespace)
for ip_address in ip_addresses:
int_name = str(ip_address['index'])
ip = ip_lib.get_attr(ip_address, 'IFA_ADDRESS')
ip = linux_utils.get_attr(ip_address, 'IFA_ADDRESS')
mask = ip_address['prefixlen']
cidr = common_utils.ip_to_cidr(ip, mask)
self.assertEqual(interfaces[int_name]['cidr'], cidr)
@ -484,10 +489,11 @@ class RouteTestCase(functional_base.BaseSudoTestCase):
def _check_gateway_or_multipath(self, route, gateway):
if gateway is None or isinstance(gateway, str):
self.assertEqual(gateway, ip_lib.get_attr(route, 'RTA_GATEWAY'))
self.assertEqual(gateway,
linux_utils.get_attr(route, 'RTA_GATEWAY'))
return
rta_multipath = ip_lib.get_attr(route, 'RTA_MULTIPATH')
rta_multipath = linux_utils.get_attr(route, 'RTA_MULTIPATH')
self.assertEqual(len(gateway), len(rta_multipath))
for nexthop in gateway:
to_check = {'hops': 0}
@ -500,7 +506,7 @@ class RouteTestCase(functional_base.BaseSudoTestCase):
to_check['gateway'] = nexthop['via']
for mp in rta_multipath:
mp['gateway'] = ip_lib.get_attr(mp, 'RTA_GATEWAY')
mp['gateway'] = linux_utils.get_attr(mp, 'RTA_GATEWAY')
for key in to_check:
if to_check[key] != mp[key]:
break
@ -524,7 +530,7 @@ class RouteTestCase(functional_base.BaseSudoTestCase):
scope = 0
routes = priv_ip_lib.list_ip_routes(self.namespace, ip_version)
for route in routes:
ip = ip_lib.get_attr(route, 'RTA_DST')
ip = linux_utils.get_attr(route, 'RTA_DST')
mask = route['dst_len']
if not (ip == str(netaddr.IPNetwork(cidr).ip) and
mask == netaddr.IPNetwork(cidr).cidr.prefixlen):
@ -535,7 +541,7 @@ class RouteTestCase(functional_base.BaseSudoTestCase):
route['family'])
self._check_gateway_or_multipath(route, gateway)
self.assertEqual(metric,
ip_lib.get_attr(route, 'RTA_PRIORITY'))
linux_utils.get_attr(route, 'RTA_PRIORITY'))
self.assertEqual(scope, route['scope'])
self.assertEqual(rtnl.rt_proto[proto], route['proto'])
break
@ -550,18 +556,18 @@ class RouteTestCase(functional_base.BaseSudoTestCase):
scope = 0
routes = priv_ip_lib.list_ip_routes(self.namespace, ip_version)
for route in routes:
if not (ip_lib.get_attr(route, 'RTA_GATEWAY') == gateway):
if not (linux_utils.get_attr(route, 'RTA_GATEWAY') == gateway):
continue
self.assertEqual(table, route['table'])
self.assertEqual(
priv_ip_lib._IP_VERSION_FAMILY_MAP[ip_version],
route['family'])
self.assertEqual(gateway,
ip_lib.get_attr(route, 'RTA_GATEWAY'))
linux_utils.get_attr(route, 'RTA_GATEWAY'))
self.assertEqual(scope, route['scope'])
self.assertEqual(0, route['dst_len'])
self.assertEqual(metric,
ip_lib.get_attr(route, 'RTA_PRIORITY'))
linux_utils.get_attr(route, 'RTA_PRIORITY'))
break
else:
self.fail('Default gateway %s not found in the list of routes'
@ -680,8 +686,8 @@ class GetLinkAttributesTestCase(functional_base.BaseSudoTestCase):
self.pyroute_dev = self.pyroute_dev[0]
def test_get_link_attribute_kind(self):
ifla_linkinfo = ip_lib.get_attr(self.pyroute_dev, 'IFLA_LINKINFO')
ifla_link_kind = ip_lib.get_attr(ifla_linkinfo, 'IFLA_INFO_KIND')
ifla_linkinfo = linux_utils.get_attr(self.pyroute_dev, 'IFLA_LINKINFO')
ifla_link_kind = linux_utils.get_attr(ifla_linkinfo, 'IFLA_INFO_KIND')
self.assertEqual('dummy', ifla_link_kind)
self.assertEqual(ifla_link_kind, self.device.link.link_kind)

View File

@ -19,6 +19,7 @@ import pyroute2
from pyroute2.netlink import rtnl
from neutron.agent.linux import tc_lib
from neutron.agent.linux import utils as linux_utils
from neutron.privileged.agent.linux import ip_lib as priv_ip_lib
from neutron.privileged.agent.linux import tc_lib as priv_tc_lib
from neutron.tests.functional import base as functional_base
@ -46,7 +47,7 @@ class TcQdiscTestCase(functional_base.BaseSudoTestCase):
self.assertEqual(1, len(qdiscs))
self.assertEqual(rtnl.TC_H_ROOT, qdiscs[0]['parent'])
self.assertEqual(0x50000, qdiscs[0]['handle'])
self.assertEqual('htb', tc_lib._get_attr(qdiscs[0], 'TCA_KIND'))
self.assertEqual('htb', linux_utils.get_attr(qdiscs[0], 'TCA_KIND'))
priv_tc_lib.delete_tc_qdisc(self.device, rtnl.TC_H_ROOT,
namespace=self.namespace)
@ -63,7 +64,7 @@ class TcQdiscTestCase(functional_base.BaseSudoTestCase):
self.assertEqual(1, len(qdiscs))
self.assertEqual(rtnl.TC_H_ROOT, qdiscs[0]['parent'])
self.assertEqual(0, qdiscs[0]['handle'] & 0xFFFF)
self.assertEqual('htb', tc_lib._get_attr(qdiscs[0], 'TCA_KIND'))
self.assertEqual('htb', linux_utils.get_attr(qdiscs[0], 'TCA_KIND'))
priv_tc_lib.delete_tc_qdisc(self.device, parent=rtnl.TC_H_ROOT,
namespace=self.namespace)
@ -82,9 +83,9 @@ class TcQdiscTestCase(functional_base.BaseSudoTestCase):
namespace=self.namespace)
self.assertEqual(1, len(qdiscs))
self.assertEqual(rtnl.TC_H_ROOT, qdiscs[0]['parent'])
self.assertEqual('tbf', tc_lib._get_attr(qdiscs[0], 'TCA_KIND'))
tca_options = tc_lib._get_attr(qdiscs[0], 'TCA_OPTIONS')
tca_tbf_parms = tc_lib._get_attr(tca_options, 'TCA_TBF_PARMS')
self.assertEqual('tbf', linux_utils.get_attr(qdiscs[0], 'TCA_KIND'))
tca_options = linux_utils.get_attr(qdiscs[0], 'TCA_OPTIONS')
tca_tbf_parms = linux_utils.get_attr(tca_options, 'TCA_TBF_PARMS')
self.assertEqual(rate, tca_tbf_parms['rate'])
self.assertEqual(burst, tc_lib._calc_burst(tca_tbf_parms['rate'],
tca_tbf_parms['buffer']))
@ -103,7 +104,8 @@ class TcQdiscTestCase(functional_base.BaseSudoTestCase):
qdiscs = priv_tc_lib.list_tc_qdiscs(self.device,
namespace=self.namespace)
self.assertEqual(1, len(qdiscs))
self.assertEqual('ingress', tc_lib._get_attr(qdiscs[0], 'TCA_KIND'))
self.assertEqual('ingress', linux_utils.get_attr(qdiscs[0],
'TCA_KIND'))
self.assertEqual(rtnl.TC_H_INGRESS, qdiscs[0]['parent'])
self.assertEqual(0xffff0000, qdiscs[0]['handle'])
@ -139,7 +141,8 @@ class TcQdiscTestCase(functional_base.BaseSudoTestCase):
qdiscs = priv_tc_lib.list_tc_qdiscs(self.device,
namespace=self.namespace)
self.assertEqual(1, len(qdiscs))
self.assertEqual('ingress', tc_lib._get_attr(qdiscs[0], 'TCA_KIND'))
self.assertEqual('ingress', linux_utils.get_attr(qdiscs[0],
'TCA_KIND'))
self.assertIsNone(
priv_tc_lib.delete_tc_qdisc(self.device, kind='ingress',
namespace=self.namespace))
@ -185,8 +188,8 @@ class TcPolicyClassTestCase(functional_base.BaseSudoTestCase):
self.assertEqual(len(self.CLASSES), len(tc_classes))
for tc_class in tc_classes:
handle = tc_lib._handle_from_hex_to_string(tc_class['handle'])
tca_options = tc_lib._get_attr(tc_class, 'TCA_OPTIONS')
tca_htb_params = tc_lib._get_attr(tca_options, 'TCA_HTB_PARMS')
tca_options = linux_utils.get_attr(tc_class, 'TCA_OPTIONS')
tca_htb_params = linux_utils.get_attr(tca_options, 'TCA_HTB_PARMS')
self.assertEqual(self.CLASSES[handle]['rate'],
tca_htb_params['rate'])
self.assertEqual(self.CLASSES[handle]['ceil'],

View File

@ -0,0 +1,134 @@
# Copyright 2022 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
from pyroute2.netlink import nlsocket
from neutron.agent.linux import devlink
from neutron.privileged.agent import linux as priv_linux
from neutron.privileged.agent.linux import devlink as priv_devlink
from neutron.tests import base
GET_PORT_LIST = (
{'cmd': 3,
'version': 1,
'reserved': 0,
'attrs': [
('DEVLINK_ATTR_BUS_NAME', 'pci'),
('DEVLINK_ATTR_DEV_NAME', '0000:04:00.0'),
('DEVLINK_ATTR_PORT_INDEX', 65535),
('DEVLINK_ATTR_PORT_TYPE', 2),
('DEVLINK_ATTR_PORT_NETDEV_IFINDEX', 6),
('DEVLINK_ATTR_PORT_NETDEV_NAME', 'enp4s0f0np0'),
('DEVLINK_ATTR_PORT_SPLITTABLE', 0),
('DEVLINK_ATTR_PORT_FLAVOUR', 0),
('DEVLINK_ATTR_PORT_NUMBER', 0)
],
'header': {
'length': 112,
'type': 20,
'flags': 2,
'sequence_number': 258,
'pid': 448943,
'error': None,
'target': 'localhost',
'stats': nlsocket.Stats(qsize=0, delta=0, delay=0)
},
'event': 'DEVLINK_CMD_NEW'},
{'cmd': 3,
'version': 1,
'reserved': 0,
'attrs': [
('DEVLINK_ATTR_BUS_NAME', 'pci'),
('DEVLINK_ATTR_DEV_NAME', '0000:04:00.0'),
('DEVLINK_ATTR_PORT_INDEX', 1),
('DEVLINK_ATTR_PORT_TYPE', 2),
('DEVLINK_ATTR_PORT_NETDEV_IFINDEX', 14),
('DEVLINK_ATTR_PORT_NETDEV_NAME', 'enp4s0f0np0_0'),
('DEVLINK_ATTR_PORT_SPLITTABLE', 0),
('DEVLINK_ATTR_PORT_FLAVOUR', 4),
('UNKNOWN', {'header': {'length': 8, 'type': 150}}),
('DEVLINK_ATTR_PORT_PCI_PF_NUMBER', 0),
('DEVLINK_ATTR_PORT_PCI_VF_NUMBER', 0),
('UNKNOWN', {'header': {'length': 5, 'type': 149}}),
('DEVLINK_ATTR_PORT_FUNCTION', {
'attrs': [('DEVLINK_ATTR_BUS_NAME', '')]
})
],
'header': {
'length': 156,
'type': 20,
'flags': 2,
'sequence_number': 258,
'pid': 448943,
'error': None,
'target': 'localhost',
'stats': nlsocket.Stats(qsize=0, delta=0, delay=0)
},
'event': 'DEVLINK_CMD_NEW'},
{'cmd': 3,
'version': 1,
'reserved': 0,
'attrs': [
('DEVLINK_ATTR_BUS_NAME', 'pci'),
('DEVLINK_ATTR_DEV_NAME', '0000:04:00.0'),
('DEVLINK_ATTR_PORT_INDEX', 2),
('DEVLINK_ATTR_PORT_TYPE', 2),
('DEVLINK_ATTR_PORT_NETDEV_IFINDEX', 15),
('DEVLINK_ATTR_PORT_NETDEV_NAME', 'enp4s0f0np0_1'),
('DEVLINK_ATTR_PORT_SPLITTABLE', 0),
('DEVLINK_ATTR_PORT_FLAVOUR', 4),
('UNKNOWN', {'header': {'length': 8, 'type': 150}}),
('DEVLINK_ATTR_PORT_PCI_PF_NUMBER', 0),
('DEVLINK_ATTR_PORT_PCI_VF_NUMBER', 1),
('UNKNOWN', {'header': {'length': 5, 'type': 149}}),
('DEVLINK_ATTR_PORT_FUNCTION', {
'attrs': [('DEVLINK_ATTR_BUS_NAME', '')]
})
],
'header': {
'length': 156,
'type': 20,
'flags': 2,
'sequence_number': 258,
'pid': 448943,
'error': None,
'target': 'localhost',
'stats': nlsocket.Stats(qsize=0, delta=0, delay=0)
},
'event': 'DEVLINK_CMD_NEW'}
)
class TestDevlink(base.BaseTestCase):
@mock.patch.object(priv_devlink, 'get_port_list')
def test_get_port(self, mock_get_port_list):
mock_get_port_list.return_value = priv_linux.make_serializable(
GET_PORT_LIST)
expected_port1 = {'pf_pci': '0000:04:00.0', 'pf_num': 0,
'pf_name': 'enp4s0f0np0', 'vf_num': 0,
'vf_name': 'enp4s0f0np0_0'}
expected_port2 = {'pf_pci': '0000:04:00.0', 'pf_num': 0,
'pf_name': 'enp4s0f0np0', 'vf_num': 1,
'vf_name': 'enp4s0f0np0_1'}
port1 = devlink.get_port('enp4s0f0np0_0')
port2 = devlink.get_port('enp4s0f0np0_1')
port3 = devlink.get_port('enp4s0f0np0_not_present')
self.assertEqual(expected_port1, port1)
self.assertEqual(expected_port2, port2)
self.assertIsNone(port3)

View File

@ -0,0 +1,37 @@
# Copyright 2022 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from pyroute2 import netlink
from pyroute2.netlink.rtnl import ifinfmsg
from neutron.privileged.agent import linux as priv_linux
from neutron.tests import base
class MakeSerializableTestCase(base.BaseTestCase):
NLA_DATA1 = ifinfmsg.ifinfbase.state(data=b'54321')
NLA_DATA2 = ifinfmsg.ifinfbase.state(data=b'abcdef')
INPUT_1 = {'key1': 'value1', b'key2': b'value2', 'key3': ('a', 2),
'key4': [1, 2, 'c'],
b'key5': netlink.nla_slot('nla_name1', NLA_DATA1),
'key6': netlink.nla_slot(b'nla_name2', NLA_DATA2)}
OUTPUT_1 = {'key1': 'value1', 'key2': 'value2', 'key3': ('a', 2),
'key4': [1, 2, 'c'],
'key5': ['nla_name1', '54321'],
'key6': ['nla_name2', 'abcdef']}
def test_make_serializable(self):
self.assertEqual(self.OUTPUT_1,
priv_linux.make_serializable(self.INPUT_1))

View File

@ -18,7 +18,6 @@ from unittest import mock
import pyroute2
from pyroute2 import netlink
from pyroute2.netlink import exceptions as netlink_exceptions
from pyroute2.netlink.rtnl import ifinfmsg
from neutron.privileged.agent.linux import ip_lib as priv_lib
from neutron.tests import base
@ -270,21 +269,3 @@ class IpLibTestCase(base.BaseTestCase):
[mock.call('get', 'device', namespace='namespace', ext_mask=1),
mock.call('get', 'device', namespace='namespace', ext_mask=1)]
)
class MakeSerializableTestCase(base.BaseTestCase):
NLA_DATA1 = ifinfmsg.ifinfbase.state(data=b'54321')
NLA_DATA2 = ifinfmsg.ifinfbase.state(data=b'abcdef')
INPUT_1 = {'key1': 'value1', b'key2': b'value2', 'key3': ('a', 2),
'key4': [1, 2, 'c'],
b'key5': netlink.nla_slot('nla_name1', NLA_DATA1),
'key6': netlink.nla_slot(b'nla_name2', NLA_DATA2)}
OUTPUT_1 = {'key1': 'value1', 'key2': 'value2', 'key3': ('a', 2),
'key4': [1, 2, 'c'],
'key5': ['nla_name1', '54321'],
'key6': ['nla_name2', 'abcdef']}
def test_make_serializable(self):
self.assertEqual(self.OUTPUT_1,
priv_lib.make_serializable(self.INPUT_1))