Implement ip_lib get_devices using pyroute2
IPWrapper.get_devices() now uses pyroute2 and priv_sep. Related-Bug: #1492714 Change-Id: Idb847bf16fe8898735266d93d39430da1f5410f9
This commit is contained in:
parent
094095b3d7
commit
aa19fa1c3f
neutron
agent/linux
privileged/agent/linux
tests
functional/privileged
unit/agent/linux
@ -14,7 +14,6 @@
|
||||
# under the License.
|
||||
|
||||
import errno
|
||||
import os
|
||||
import re
|
||||
import time
|
||||
|
||||
@ -126,37 +125,16 @@ class IPWrapper(SubProcessBase):
|
||||
|
||||
def get_devices(self, exclude_loopback=True, exclude_fb_tun_devices=True):
|
||||
retval = []
|
||||
if self.namespace:
|
||||
# we call out manually because in order to avoid screen scraping
|
||||
# iproute2 we use find to see what is in the sysfs directory, as
|
||||
# suggested by Stephen Hemminger (iproute2 dev).
|
||||
try:
|
||||
cmd = ['ip', 'netns', 'exec', self.namespace,
|
||||
'find', SYS_NET_PATH, '-maxdepth', '1',
|
||||
'-type', 'l', '-printf', '%f ']
|
||||
output = utils.execute(
|
||||
cmd,
|
||||
run_as_root=True,
|
||||
log_fail_as_error=self.log_fail_as_error).split()
|
||||
except RuntimeError:
|
||||
# We could be racing with a cron job deleting namespaces.
|
||||
# Just return a empty list if the namespace is deleted.
|
||||
with excutils.save_and_reraise_exception() as ctx:
|
||||
if not self.netns.exists(self.namespace):
|
||||
ctx.reraise = False
|
||||
return []
|
||||
else:
|
||||
output = (
|
||||
i for i in os.listdir(SYS_NET_PATH)
|
||||
if os.path.islink(os.path.join(SYS_NET_PATH, i))
|
||||
)
|
||||
try:
|
||||
devices = privileged.get_devices(self.namespace)
|
||||
except privileged.NetworkNamespaceNotFound:
|
||||
return retval
|
||||
|
||||
for name in output:
|
||||
for name in devices:
|
||||
if (exclude_loopback and name == LOOPBACK_DEVNAME or
|
||||
exclude_fb_tun_devices and name in FB_TUNNEL_DEVICE_NAMES):
|
||||
continue
|
||||
retval.append(IPDevice(name, namespace=self.namespace))
|
||||
|
||||
return retval
|
||||
|
||||
def get_device_by_ip(self, ip):
|
||||
|
@ -436,3 +436,19 @@ def list_netns(**kwargs):
|
||||
Caller requires raised priveleges to list namespaces
|
||||
"""
|
||||
return netns.listnetns(**kwargs)
|
||||
|
||||
|
||||
@privileged.default.entrypoint
|
||||
def get_devices(namespace, **kwargs):
|
||||
"""List all interfaces in a namespace
|
||||
|
||||
:return: a list of strings with the names of the interfaces in a namespace
|
||||
"""
|
||||
try:
|
||||
with _get_iproute(namespace) as ip:
|
||||
return [link.get_attr('IFLA_IFNAME')
|
||||
for link in ip.get_links(**kwargs)]
|
||||
except OSError as e:
|
||||
if e.errno == errno.ENOENT:
|
||||
raise NetworkNamespaceNotFound(netns_name=namespace)
|
||||
raise
|
||||
|
0
neutron/tests/functional/privileged/__init__.py
Normal file
0
neutron/tests/functional/privileged/__init__.py
Normal file
0
neutron/tests/functional/privileged/agent/__init__.py
Normal file
0
neutron/tests/functional/privileged/agent/__init__.py
Normal file
@ -0,0 +1,46 @@
|
||||
# Copyright 2018 Red Hat, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo_utils import uuidutils
|
||||
|
||||
from neutron.agent.linux import ip_lib
|
||||
from neutron.privileged.agent.linux import ip_lib as priv_ip_lib
|
||||
from neutron.tests import base
|
||||
|
||||
|
||||
class GetDevicesTestCase(base.BaseTestCase):
|
||||
|
||||
def _remove_ns(self, namespace):
|
||||
priv_ip_lib.remove_netns(namespace)
|
||||
|
||||
def test_get_devices(self):
|
||||
namespace = 'ns_test-' + uuidutils.generate_uuid()
|
||||
priv_ip_lib.create_netns(namespace)
|
||||
self.addCleanup(self._remove_ns, namespace)
|
||||
interfaces = ['int_01', 'int_02', 'int_03', 'int_04', 'int_05']
|
||||
interfaces_to_check = (interfaces + ip_lib.FB_TUNNEL_DEVICE_NAMES +
|
||||
[ip_lib.LOOPBACK_DEVNAME])
|
||||
for interface in interfaces:
|
||||
priv_ip_lib.create_interface(interface, namespace, 'dummy')
|
||||
|
||||
device_names = priv_ip_lib.get_devices(namespace)
|
||||
for name in device_names:
|
||||
self.assertIn(name, interfaces_to_check)
|
||||
|
||||
for interface in interfaces:
|
||||
priv_ip_lib.delete_interface(interface, namespace)
|
||||
|
||||
device_names = priv_ip_lib.get_devices(namespace)
|
||||
for name in device_names:
|
||||
self.assertNotIn(name, interfaces)
|
@ -222,53 +222,31 @@ class TestIpWrapper(base.BaseTestCase):
|
||||
self.execute_p = mock.patch.object(ip_lib.IPWrapper, '_execute')
|
||||
self.execute = self.execute_p.start()
|
||||
|
||||
@mock.patch('os.path.islink')
|
||||
@mock.patch('os.listdir', return_value=['lo'])
|
||||
def test_get_devices(self, mocked_listdir, mocked_islink):
|
||||
retval = ip_lib.IPWrapper().get_devices()
|
||||
mocked_islink.assert_called_once_with('/sys/class/net/lo')
|
||||
self.assertEqual([], retval)
|
||||
@mock.patch.object(priv_lib, 'get_devices')
|
||||
def test_get_devices(self, mock_get_devices):
|
||||
interfaces = ['br01', 'lo', 'gre0']
|
||||
mock_get_devices.return_value = interfaces
|
||||
devices = ip_lib.IPWrapper(namespace='foo').get_devices()
|
||||
for device in devices:
|
||||
self.assertEqual('br01', device.name)
|
||||
interfaces.remove(device.name)
|
||||
|
||||
@mock.patch('neutron.agent.common.utils.execute')
|
||||
def test_get_devices_namespaces(self, mocked_execute):
|
||||
fake_str = mock.Mock()
|
||||
fake_str.split.return_value = ['lo']
|
||||
mocked_execute.return_value = fake_str
|
||||
retval = ip_lib.IPWrapper(namespace='foo').get_devices()
|
||||
mocked_execute.assert_called_once_with(
|
||||
['ip', 'netns', 'exec', 'foo', 'find', '/sys/class/net',
|
||||
'-maxdepth', '1', '-type', 'l', '-printf', '%f '],
|
||||
run_as_root=True, log_fail_as_error=True)
|
||||
self.assertTrue(fake_str.split.called)
|
||||
self.assertEqual([], retval)
|
||||
|
||||
@mock.patch('neutron.agent.common.utils.execute')
|
||||
def test_get_devices_namespaces_ns_not_exists(self, mocked_execute):
|
||||
mocked_execute.side_effect = RuntimeError(
|
||||
"Cannot open network namespace")
|
||||
with mock.patch.object(ip_lib.IpNetnsCommand, 'exists',
|
||||
return_value=False):
|
||||
retval = ip_lib.IPWrapper(namespace='foo').get_devices()
|
||||
self.assertEqual([], retval)
|
||||
|
||||
@mock.patch('neutron.agent.common.utils.execute')
|
||||
def test_get_devices_namespaces_ns_exists(self, mocked_execute):
|
||||
mocked_execute.side_effect = RuntimeError(
|
||||
"Cannot open network namespace")
|
||||
with mock.patch.object(ip_lib.IpNetnsCommand, 'exists',
|
||||
return_value=True):
|
||||
self.assertRaises(RuntimeError,
|
||||
ip_lib.IPWrapper(namespace='foo').get_devices)
|
||||
|
||||
@mock.patch('neutron.agent.common.utils.execute')
|
||||
def test_get_devices_exclude_loopback_and_gre(self, mocked_execute):
|
||||
device_name = 'somedevice'
|
||||
mocked_execute.return_value = 'lo gre0 sit0 ip6gre0 ' + device_name
|
||||
@mock.patch.object(priv_lib, 'get_devices')
|
||||
def test_get_devices_include_loopback_and_gre(self, mock_get_devices):
|
||||
interfaces = ['br01', 'lo', 'gre0']
|
||||
mock_get_devices.return_value = interfaces
|
||||
devices = ip_lib.IPWrapper(namespace='foo').get_devices(
|
||||
exclude_loopback=True, exclude_fb_tun_devices=True)
|
||||
somedevice = devices.pop()
|
||||
self.assertEqual(device_name, somedevice.name)
|
||||
self.assertFalse(devices)
|
||||
exclude_loopback=False, exclude_fb_tun_devices=False)
|
||||
for device in devices:
|
||||
self.assertIn(device.name, interfaces)
|
||||
interfaces.remove(device.name)
|
||||
self.assertEqual(0, len(interfaces))
|
||||
|
||||
@mock.patch.object(priv_lib, 'get_devices')
|
||||
def test_get_devices_no_netspace(self, mock_get_devices):
|
||||
mock_get_devices.side_effect = priv_lib.NetworkNamespaceNotFound(
|
||||
netns_name='foo')
|
||||
self.assertEqual([], ip_lib.IPWrapper(namespace='foo').get_devices())
|
||||
|
||||
@mock.patch.object(pyroute2.netns, 'listnetns')
|
||||
@mock.patch.object(priv_lib, 'list_netns')
|
||||
|
Loading…
x
Reference in New Issue
Block a user