Merge "Switch ip addr add/del/flush commands to use pyroute2" into stable/queens

This commit is contained in:
Zuul 2020-04-23 10:32:19 +00:00 committed by Gerrit Code Review
commit 447a05f0bf
4 changed files with 274 additions and 56 deletions

View File

@ -587,22 +587,15 @@ class IpAddrCommand(IpDeviceCommandBase):
COMMAND = 'addr' COMMAND = 'addr'
def add(self, cidr, scope='global', add_broadcast=True): def add(self, cidr, scope='global', add_broadcast=True):
net = netaddr.IPNetwork(cidr) add_ip_address(self.name, self._parent.namespace, cidr, scope,
args = ['add', cidr, add_broadcast)
'scope', scope,
'dev', self.name]
if add_broadcast and net.version == 4:
args += ['brd', str(net[-1])]
self._as_root([net.version], tuple(args))
def delete(self, cidr): def delete(self, cidr):
ip_version = common_utils.get_ip_version(cidr) delete_ip_address(self.name, self._parent.namespace, cidr)
self._as_root([ip_version],
('del', cidr,
'dev', self.name))
def flush(self, ip_version): def flush(self, ip_version):
self._as_root([ip_version], ('flush', self.name)) flush_ip_addresses(
self.name, self._parent.namespace, ip_version)
def get_devices_with_ip(self, name=None, scope=None, to=None, def get_devices_with_ip(self, name=None, scope=None, to=None,
filters=None, ip_version=None): filters=None, ip_version=None):
@ -960,6 +953,30 @@ NetworkNamespaceNotFound = privileged.NetworkNamespaceNotFound
NetworkInterfaceNotFound = privileged.NetworkInterfaceNotFound NetworkInterfaceNotFound = privileged.NetworkInterfaceNotFound
def add_ip_address(device, namespace, cidr, scope='global',
add_broadcast=True):
net = netaddr.IPNetwork(cidr)
broadcast = None
if add_broadcast and net.version == 4:
# NOTE(slaweq): in case if cidr is /32 net.broadcast is None so
# same IP address as cidr should be set as broadcast
broadcast = str(net.broadcast or net.ip)
privileged.add_ip_address(
net.version, str(net.ip), net.prefixlen,
device, namespace, scope, broadcast)
def delete_ip_address(device, namespace, cidr):
net = netaddr.IPNetwork(cidr)
privileged.delete_ip_address(
net.version, str(net.ip), net.prefixlen, device, namespace)
def flush_ip_addresses(device, namespace, ip_version):
privileged.flush_ip_addresses(
ip_version, device, namespace)
def get_routing_table(ip_version, namespace=None): def get_routing_table(ip_version, namespace=None):
"""Return a list of dictionaries, each representing a route. """Return a list of dictionaries, each representing a route.

View File

@ -29,7 +29,11 @@ _IP_VERSION_FAMILY_MAP = {4: socket.AF_INET, 6: socket.AF_INET6}
def _get_scope_name(scope): def _get_scope_name(scope):
"""Return the name of the scope (given as a number), or the scope number """Return the name of the scope (given as a number), or the scope number
if the name is unknown. if the name is unknown.
For backward compatibility (with "ip" tool) "global" scope is converted to
"universe" before converting to number
""" """
scope = 'universe' if scope == 'global' else scope
return rtnl.rt_scope.get(scope, scope) return rtnl.rt_scope.get(scope, scope)
@ -86,7 +90,7 @@ def _get_iproute(namespace):
return pyroute2.IPRoute() return pyroute2.IPRoute()
def _run_iproute(command, device, namespace, **kwargs): def _run_iproute_neigh(command, device, namespace, **kwargs):
try: try:
with _get_iproute(namespace) as ip: with _get_iproute(namespace) as ip:
idx = ip.link_lookup(ifname=device)[0] idx = ip.link_lookup(ifname=device)[0]
@ -102,6 +106,65 @@ def _run_iproute(command, device, namespace, **kwargs):
raise raise
def _run_iproute_addr(command, device, namespace, **kwargs):
try:
with _get_iproute(namespace) as ip:
idx = ip.link_lookup(ifname=device)[0]
return ip.addr(command, index=idx, **kwargs)
except IndexError:
msg = _("Network interface %(device)s not found in namespace "
"%(namespace)s.") % {'device': device,
'namespace': namespace}
raise NetworkInterfaceNotFound(msg)
except OSError as e:
if e.errno == errno.ENOENT:
raise NetworkNamespaceNotFound(netns_name=namespace)
raise
@privileged.default.entrypoint
def add_ip_address(ip_version, ip, prefixlen, device, namespace, scope,
broadcast=None):
family = _IP_VERSION_FAMILY_MAP[ip_version]
_run_iproute_addr('add',
device,
namespace,
address=ip,
mask=prefixlen,
family=family,
broadcast=broadcast,
scope=_get_scope_name(scope))
@privileged.default.entrypoint
def delete_ip_address(ip_version, ip, prefixlen, device, namespace):
family = _IP_VERSION_FAMILY_MAP[ip_version]
_run_iproute_addr("delete",
device,
namespace,
address=ip,
mask=prefixlen,
family=family)
@privileged.default.entrypoint
def flush_ip_addresses(ip_version, device, namespace):
family = _IP_VERSION_FAMILY_MAP[ip_version]
try:
with _get_iproute(namespace) as ip:
idx = ip.link_lookup(ifname=device)[0]
ip.flush_addr(index=idx, family=family)
except IndexError:
msg = _("Network interface %(device)s not found in namespace "
"%(namespace)s.") % {'device': device,
'namespace': namespace}
raise NetworkInterfaceNotFound(msg)
except OSError as e:
if e.errno == errno.ENOENT:
raise NetworkNamespaceNotFound(netns_name=namespace)
raise
@privileged.default.entrypoint @privileged.default.entrypoint
def open_namespace(namespace): def open_namespace(namespace):
"""Open namespace to test if the namespace is ready to be manipulated""" """Open namespace to test if the namespace is ready to be manipulated"""
@ -120,14 +183,14 @@ def add_neigh_entry(ip_version, ip_address, mac_address, device, namespace,
:param namespace: The name of the namespace in which to add the entry :param namespace: The name of the namespace in which to add the entry
""" """
family = _IP_VERSION_FAMILY_MAP[ip_version] family = _IP_VERSION_FAMILY_MAP[ip_version]
_run_iproute('replace', _run_iproute_neigh('replace',
device, device,
namespace, namespace,
dst=ip_address, dst=ip_address,
lladdr=mac_address, lladdr=mac_address,
family=family, family=family,
state=ndmsg.states['permanent'], state=ndmsg.states['permanent'],
**kwargs) **kwargs)
@privileged.default.entrypoint @privileged.default.entrypoint
@ -142,13 +205,13 @@ def delete_neigh_entry(ip_version, ip_address, mac_address, device, namespace,
""" """
family = _IP_VERSION_FAMILY_MAP[ip_version] family = _IP_VERSION_FAMILY_MAP[ip_version]
try: try:
_run_iproute('delete', _run_iproute_neigh('delete',
device, device,
namespace, namespace,
dst=ip_address, dst=ip_address,
lladdr=mac_address, lladdr=mac_address,
family=family, family=family,
**kwargs) **kwargs)
except NetlinkError as e: except NetlinkError as e:
# trying to delete a non-existent entry shouldn't raise an error # trying to delete a non-existent entry shouldn't raise an error
if e.code == errno.ENOENT: if e.code == errno.ENOENT:
@ -171,11 +234,11 @@ def dump_neigh_entries(ip_version, device, namespace, **kwargs):
""" """
family = _IP_VERSION_FAMILY_MAP[ip_version] family = _IP_VERSION_FAMILY_MAP[ip_version]
entries = [] entries = []
dump = _run_iproute('dump', dump = _run_iproute_neigh('dump',
device, device,
namespace, namespace,
family=family, family=family,
**kwargs) **kwargs)
for entry in dump: for entry in dump:
attrs = dict(entry['attrs']) attrs = dict(entry['attrs'])

View File

@ -842,37 +842,57 @@ class TestIpAddrCommand(TestIPCmdBase):
self.command = 'addr' self.command = 'addr'
self.addr_cmd = ip_lib.IpAddrCommand(self.parent) self.addr_cmd = ip_lib.IpAddrCommand(self.parent)
def test_add_address(self): @mock.patch.object(priv_lib, 'add_ip_address')
def test_add_address(self, add):
self.addr_cmd.add('192.168.45.100/24') self.addr_cmd.add('192.168.45.100/24')
self._assert_sudo([4], add.assert_called_once_with(
('add', '192.168.45.100/24', 4,
'scope', 'global', '192.168.45.100',
'dev', 'tap0', 24,
'brd', '192.168.45.255')) self.parent.name,
self.addr_cmd._parent.namespace,
'global',
'192.168.45.255')
def test_add_address_scoped(self): @mock.patch.object(priv_lib, 'add_ip_address')
def test_add_address_scoped(self, add):
self.addr_cmd.add('192.168.45.100/24', scope='link') self.addr_cmd.add('192.168.45.100/24', scope='link')
self._assert_sudo([4], add.assert_called_once_with(
('add', '192.168.45.100/24', 4,
'scope', 'link', '192.168.45.100',
'dev', 'tap0', 24,
'brd', '192.168.45.255')) self.parent.name,
self.addr_cmd._parent.namespace,
'link',
'192.168.45.255')
def test_add_address_no_broadcast(self): @mock.patch.object(priv_lib, 'add_ip_address')
def test_add_address_no_broadcast(self, add):
self.addr_cmd.add('192.168.45.100/24', add_broadcast=False) self.addr_cmd.add('192.168.45.100/24', add_broadcast=False)
self._assert_sudo([4], add.assert_called_once_with(
('add', '192.168.45.100/24', 4,
'scope', 'global', '192.168.45.100',
'dev', 'tap0')) 24,
self.parent.name,
self.addr_cmd._parent.namespace,
'global',
None)
def test_del_address(self): @mock.patch.object(priv_lib, 'delete_ip_address')
def test_del_address(self, delete):
self.addr_cmd.delete('192.168.45.100/24') self.addr_cmd.delete('192.168.45.100/24')
self._assert_sudo([4], delete.assert_called_once_with(
('del', '192.168.45.100/24', 'dev', 'tap0')) 4,
'192.168.45.100',
24,
self.parent.name,
self.addr_cmd._parent.namespace)
def test_flush(self): @mock.patch.object(priv_lib, 'flush_ip_addresses')
def test_flush(self, flush):
self.addr_cmd.flush(6) self.addr_cmd.flush(6)
self._assert_sudo([6], ('flush', 'tap0')) flush.assert_called_once_with(
6, self.parent.name, self.addr_cmd._parent.namespace)
def test_list(self): def test_list(self):
expected = [ expected = [
@ -1666,7 +1686,7 @@ class TestIpNeighCommand(TestIPCmdBase):
family=2, family=2,
ifindex=1) ifindex=1)
@mock.patch.object(priv_lib, '_run_iproute') @mock.patch.object(priv_lib, '_run_iproute_neigh')
def test_delete_entry_not_exist(self, mock_run_iproute): def test_delete_entry_not_exist(self, mock_run_iproute):
# trying to delete a non-existent entry shouldn't raise an error # trying to delete a non-existent entry shouldn't raise an error
mock_run_iproute.side_effect = NetlinkError(errno.ENOENT, None) mock_run_iproute.side_effect = NetlinkError(errno.ENOENT, None)

View File

@ -0,0 +1,118 @@
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import errno
import mock
import pyroute2
from neutron.privileged.agent.linux import ip_lib as priv_lib
from neutron.tests import base
class IpLibTestCase(base.BaseTestCase):
def _test_run_iproute_neigh(self, namespace=None):
ip_obj = "NetNS" if namespace else "IPRoute"
with mock.patch.object(pyroute2, ip_obj) as ip_mock_cls:
ip_mock = ip_mock_cls()
ip_mock.__enter__().link_lookup.return_value = [2]
priv_lib._run_iproute_neigh("test_cmd", "eth0", namespace,
test_param="test_value")
ip_mock.assert_has_calls([
mock.call.__enter__().link_lookup(ifname="eth0"),
mock.call.__enter__().neigh("test_cmd", ifindex=2,
test_param="test_value")])
def test_run_iproute_neigh_no_namespace(self):
self._test_run_iproute_neigh()
def test_run_iproute_neigh_in_namespace(self):
self._test_run_iproute_neigh(namespace="testns")
def test_run_iproute_neigh_interface_not_exists(self):
with mock.patch.object(pyroute2, "IPRoute") as iproute_mock:
ip_mock = iproute_mock()
ip_mock.__enter__().link_lookup.return_value = []
self.assertRaises(
priv_lib.NetworkInterfaceNotFound,
priv_lib._run_iproute_neigh,
"test_cmd", "eth0", None, test_param="test_value")
def test_run_iproute_neigh_namespace_not_exists(self):
with mock.patch.object(pyroute2, "IPRoute") as iproute_mock:
iproute_mock.side_effect = OSError(
errno.ENOENT, "Test no netns exception")
self.assertRaises(
priv_lib.NetworkNamespaceNotFound,
priv_lib._run_iproute_neigh,
"test_cmd", "eth0", None, test_param="test_value")
def test_run_iproute_neigh_error(self):
with mock.patch.object(pyroute2, "IPRoute") as iproute_mock:
iproute_mock.side_effect = OSError(
errno.EINVAL, "Test invalid argument exception")
try:
priv_lib._run_iproute_neigh(
"test_cmd", "eth0", None, test_param="test_value")
self.fail("OSError exception not raised")
except OSError as e:
self.assertEqual(errno.EINVAL, e.errno)
def _test_run_iproute_addr(self, namespace=None):
ip_obj = "NetNS" if namespace else "IPRoute"
with mock.patch.object(pyroute2, ip_obj) as ip_mock_cls:
ip_mock = ip_mock_cls()
ip_mock.__enter__().link_lookup.return_value = [2]
priv_lib._run_iproute_addr("test_cmd", "eth0", namespace,
test_param="test_value")
ip_mock.assert_has_calls([
mock.call.__enter__().link_lookup(ifname="eth0"),
mock.call.__enter__().addr("test_cmd", index=2,
test_param="test_value")])
def test_run_iproute_addr_no_namespace(self):
self._test_run_iproute_addr()
def test_run_iproute_addr_in_namespace(self):
self._test_run_iproute_addr(namespace="testns")
def test_run_iproute_addr_interface_not_exists(self):
with mock.patch.object(pyroute2, "IPRoute") as iproute_mock:
ip_mock = iproute_mock()
ip_mock.__enter__().link_lookup.return_value = []
self.assertRaises(
priv_lib.NetworkInterfaceNotFound,
priv_lib._run_iproute_addr,
"test_cmd", "eth0", None, test_param="test_value")
def test_run_iproute_addr_namespace_not_exists(self):
with mock.patch.object(pyroute2, "IPRoute") as iproute_mock:
iproute_mock.side_effect = OSError(
errno.ENOENT, "Test no netns exception")
self.assertRaises(
priv_lib.NetworkNamespaceNotFound,
priv_lib._run_iproute_addr,
"test_cmd", "eth0", None, test_param="test_value")
def test_run_iproute_addr_error(self):
with mock.patch.object(pyroute2, "IPRoute") as iproute_mock:
iproute_mock.side_effect = OSError(
errno.EINVAL, "Test invalid argument exception")
try:
priv_lib._run_iproute_addr(
"test_cmd", "eth0", None, test_param="test_value")
self.fail("OSError exception not raised")
except OSError as e:
self.assertEqual(errno.EINVAL, e.errno)