Change ip_lib network namespace code to use pyroute2

Change network namespace add/delete/list code to use
pyroute2 library instead of calling /sbin/ip.

Also changed all in-tree callers to use the new calls.

Closes-bug: #1717582
Related-bug: #1492714

Change-Id: Id802e77543177fbb95ff15c2c7361172e8824633
This commit is contained in:
Brian Haley 2017-09-20 16:09:04 -04:00 committed by Slawek Kaplonski
parent d2bf790a44
commit 4f627b4e8d
31 changed files with 228 additions and 153 deletions

View File

@ -34,12 +34,14 @@ def register_options(conf):
dhcp_config.register_agent_dhcp_opts(conf) dhcp_config.register_agent_dhcp_opts(conf)
meta_conf.register_meta_conf_opts(meta_conf.SHARED_OPTS, conf) meta_conf.register_meta_conf_opts(meta_conf.SHARED_OPTS, conf)
config.register_interface_opts(conf) config.register_interface_opts(conf)
config.register_root_helper(conf)
def main(): def main():
register_options(cfg.CONF) register_options(cfg.CONF)
common_config.init(sys.argv[1:]) common_config.init(sys.argv[1:])
config.setup_logging() config.setup_logging()
config.setup_privsep()
server = neutron_service.Service.create( server = neutron_service.Service.create(
binary='neutron-dhcp-agent', binary='neutron-dhcp-agent',
topic=topics.DHCP_AGENT, topic=topics.DHCP_AGENT,

View File

@ -596,8 +596,7 @@ class DvrLocalRouter(dvr_router_base.DvrRouterBase):
def _delete_interface_route_in_fip_ns(self, router_port): def _delete_interface_route_in_fip_ns(self, router_port):
rtr_2_fip_ip, fip_2_rtr_name = self.get_rtr_fip_ip_and_interface_name() rtr_2_fip_ip, fip_2_rtr_name = self.get_rtr_fip_ip_and_interface_name()
fip_ns_name = self.fip_ns.get_name() fip_ns_name = self.fip_ns.get_name()
ip_wrapper = ip_lib.IPWrapper(namespace=fip_ns_name) if ip_lib.network_namespace_exists(fip_ns_name):
if ip_wrapper.netns.exists(fip_ns_name):
device = ip_lib.IPDevice(fip_2_rtr_name, namespace=fip_ns_name) device = ip_lib.IPDevice(fip_2_rtr_name, namespace=fip_ns_name)
if not device.exists(): if not device.exists():
return return
@ -608,8 +607,7 @@ class DvrLocalRouter(dvr_router_base.DvrRouterBase):
def _add_interface_route_to_fip_ns(self, router_port): def _add_interface_route_to_fip_ns(self, router_port):
rtr_2_fip_ip, fip_2_rtr_name = self.get_rtr_fip_ip_and_interface_name() rtr_2_fip_ip, fip_2_rtr_name = self.get_rtr_fip_ip_and_interface_name()
fip_ns_name = self.fip_ns.get_name() fip_ns_name = self.fip_ns.get_name()
ip_wrapper = ip_lib.IPWrapper(namespace=fip_ns_name) if ip_lib.network_namespace_exists(fip_ns_name):
if ip_wrapper.netns.exists(fip_ns_name):
device = ip_lib.IPDevice(fip_2_rtr_name, namespace=fip_ns_name) device = ip_lib.IPDevice(fip_2_rtr_name, namespace=fip_ns_name)
if not device.exists(): if not device.exists():
return return

View File

@ -30,8 +30,7 @@ class L3AgentExtensionAPI(object):
self._router_info = router_info self._router_info = router_info
def _local_namespaces(self): def _local_namespaces(self):
root_ip = ip_lib.IPWrapper() local_ns_list = ip_lib.list_network_namespaces()
local_ns_list = root_ip.get_namespaces()
return set(local_ns_list) return set(local_ns_list)
def get_router_hosting_port(self, port_id): def get_router_hosting_port(self, port_id):

View File

@ -114,8 +114,7 @@ class NamespaceManager(object):
def list_all(self): def list_all(self):
"""Get a set of all namespaces on host managed by this manager.""" """Get a set of all namespaces on host managed by this manager."""
try: try:
root_ip = ip_lib.IPWrapper() namespaces = ip_lib.list_network_namespaces()
namespaces = root_ip.get_namespaces()
return set(ns for ns in namespaces if self.is_managed(ns)) return set(ns for ns in namespaces if self.is_managed(ns))
except RuntimeError: except RuntimeError:
LOG.exception('RuntimeError in obtaining namespace list for ' LOG.exception('RuntimeError in obtaining namespace list for '

View File

@ -241,12 +241,11 @@ class DhcpLocalProcess(DhcpBase):
LOG.warning('Failed trying to delete interface: %s', LOG.warning('Failed trying to delete interface: %s',
self.interface_name) self.interface_name)
ns_ip = ip_lib.IPWrapper(namespace=self.network.namespace) if not ip_lib.network_namespace_exists(self.network.namespace):
if not ns_ip.netns.exists(self.network.namespace):
LOG.debug("Namespace already deleted: %s", self.network.namespace) LOG.debug("Namespace already deleted: %s", self.network.namespace)
return return
try: try:
ns_ip.netns.delete(self.network.namespace) ip_lib.delete_network_namespace(self.network.namespace)
except RuntimeError: except RuntimeError:
LOG.warning('Failed trying to delete namespace: %s', LOG.warning('Failed trying to delete namespace: %s',
self.network.namespace) self.network.namespace)

View File

@ -17,6 +17,7 @@ import os
import re import re
import time import time
from debtcollector import removals
import eventlet import eventlet
import netaddr import netaddr
from neutron_lib import constants from neutron_lib import constants
@ -24,6 +25,7 @@ from neutron_lib import exceptions
from oslo_config import cfg from oslo_config import cfg
from oslo_log import log as logging from oslo_log import log as logging
from oslo_utils import excutils from oslo_utils import excutils
from pyroute2 import netns
import six import six
from neutron._i18n import _ from neutron._i18n import _
@ -256,12 +258,13 @@ class IPWrapper(SubProcessBase):
self._as_root([], 'link', cmd) self._as_root([], 'link', cmd)
return (IPDevice(name, namespace=self.namespace)) return (IPDevice(name, namespace=self.namespace))
@removals.remove(version='Queens', removal_version='Rocky',
message="This will be removed in the future. Please use "
"'neutron.agent.linux.ip_lib."
"list_network_namespaces' instead.")
@classmethod @classmethod
def get_namespaces(cls): def get_namespaces(cls):
output = cls._execute( return list_network_namespaces()
[], 'netns', ('list',),
run_as_root=cfg.CONF.AGENT.use_helper_for_ns_read)
return [l.split()[0] for l in output.splitlines()]
class IPDevice(SubProcessBase): class IPDevice(SubProcessBase):
@ -863,14 +866,14 @@ class IpNetnsCommand(IpCommandBase):
COMMAND = 'netns' COMMAND = 'netns'
def add(self, name): def add(self, name):
self._as_root([], ('add', name), use_root_namespace=True) create_network_namespace(name)
wrapper = IPWrapper(namespace=name) wrapper = IPWrapper(namespace=name)
wrapper.netns.execute(['sysctl', '-w', wrapper.netns.execute(['sysctl', '-w',
'net.ipv4.conf.all.promote_secondaries=1']) 'net.ipv4.conf.all.promote_secondaries=1'])
return wrapper return wrapper
def delete(self, name): def delete(self, name):
self._as_root([], ('delete', name), use_root_namespace=True) delete_network_namespace(name)
def execute(self, cmds, addl_env=None, check_exit_code=True, def execute(self, cmds, addl_env=None, check_exit_code=True,
log_fail_as_error=True, extra_ok_codes=None, log_fail_as_error=True, extra_ok_codes=None,
@ -891,13 +894,7 @@ class IpNetnsCommand(IpCommandBase):
log_fail_as_error=log_fail_as_error, **kwargs) log_fail_as_error=log_fail_as_error, **kwargs)
def exists(self, name): def exists(self, name):
output = self._parent._execute( return network_namespace_exists(name)
['o'], 'netns', ['list'],
run_as_root=cfg.CONF.AGENT.use_helper_for_ns_read)
for line in [l.split()[0] for l in output.splitlines()]:
if name == line:
return True
return False
def vlan_in_use(segmentation_id, namespace=None): def vlan_in_use(segmentation_id, namespace=None):
@ -1018,6 +1015,45 @@ def dump_neigh_entries(ip_version, device=None, namespace=None, **kwargs):
**kwargs)) **kwargs))
def create_network_namespace(namespace, **kwargs):
"""Create a network namespace.
:param namespace: The name of the namespace to create
:param kwargs: Callers add any filters they use as kwargs
"""
privileged.create_netns(namespace, **kwargs)
def delete_network_namespace(namespace, **kwargs):
"""Delete a network namespace.
:param namespace: The name of the namespace to delete
:param kwargs: Callers add any filters they use as kwargs
"""
privileged.remove_netns(namespace, **kwargs)
def list_network_namespaces(**kwargs):
"""List all network namespace entries.
:param kwargs: Callers add any filters they use as kwargs
"""
if cfg.CONF.AGENT.use_helper_for_ns_read:
return privileged.list_netns(**kwargs)
else:
return netns.listnetns(**kwargs)
def network_namespace_exists(namespace, **kwargs):
"""Check if a network namespace exists.
:param namespace: The name of the namespace to check
:param kwargs: Callers add any filters they use as kwargs
"""
output = list_network_namespaces(**kwargs)
return namespace in output
def ensure_device_is_ready(device_name, namespace=None): def ensure_device_is_ready(device_name, namespace=None):
dev = IPDevice(device_name, namespace=namespace) dev = IPDevice(device_name, namespace=namespace)
dev.set_log_fail_as_error(False) dev.set_log_fail_as_error(False)

View File

@ -523,7 +523,7 @@ class IptablesManager(object):
# exist. # exist.
with excutils.save_and_reraise_exception() as ctx: with excutils.save_and_reraise_exception() as ctx:
if (self.namespace and not if (self.namespace and not
ip_lib.IPWrapper().netns.exists(self.namespace)): ip_lib.network_namespace_exists(self.namespace)):
ctx.reraise = False ctx.reraise = False
LOG.error("Namespace %s was deleted during IPTables " LOG.error("Namespace %s was deleted during IPTables "
"operations.", self.namespace) "operations.", self.namespace)

View File

@ -256,7 +256,7 @@ def destroy_namespace(conf, namespace, force=False):
def cleanup_network_namespaces(conf): def cleanup_network_namespaces(conf):
# Identify namespaces that are candidates for deletion. # Identify namespaces that are candidates for deletion.
candidates = [ns for ns in candidates = [ns for ns in
ip_lib.IPWrapper.get_namespaces() ip_lib.list_network_namespaces()
if eligible_for_deletion(conf, ns, conf.force)] if eligible_for_deletion(conf, ns, conf.force)]
if candidates: if candidates:

View File

@ -181,15 +181,13 @@ def vf_extended_management_supported():
def netns_read_requires_helper(): def netns_read_requires_helper():
ipw = ip_lib.IPWrapper()
nsname = "netnsreadtest-" + uuidutils.generate_uuid() nsname = "netnsreadtest-" + uuidutils.generate_uuid()
ipw.netns.add(nsname) ip_lib.create_network_namespace(nsname)
try: try:
# read without root_helper. if exists, not required. # read without root_helper. if exists, not required.
ipw_nohelp = ip_lib.IPWrapper() exists = ip_lib.network_namespace_exists(nsname)
exists = ipw_nohelp.netns.exists(nsname)
finally: finally:
ipw.netns.delete(nsname) ip_lib.delete_network_namespace(nsname)
return not exists return not exists
@ -294,7 +292,7 @@ class KeepalivedIPv6Test(object):
common_utils.wait_until_true(_gw_vip_assigned) common_utils.wait_until_true(_gw_vip_assigned)
def __enter__(self): def __enter__(self):
ip_lib.IPWrapper().netns.add(self.nsname) ip_lib.create_network_namespace(self.nsname)
return self return self
def __exit__(self, exc_type, exc_value, exc_tb): def __exit__(self, exc_type, exc_value, exc_tb):
@ -304,7 +302,7 @@ class KeepalivedIPv6Test(object):
self.manager.disable() self.manager.disable()
if self.config_path: if self.config_path:
shutil.rmtree(self.config_path, ignore_errors=True) shutil.rmtree(self.config_path, ignore_errors=True)
ip_lib.IPWrapper().netns.delete(self.nsname) ip_lib.delete_network_namespace(self.nsname)
cfg.CONF.set_override('check_child_processes_interval', cfg.CONF.set_override('check_child_processes_interval',
self.orig_interval, 'AGENT') self.orig_interval, 'AGENT')
@ -450,13 +448,12 @@ def _fix_ip_nonlocal_bind_root_value(original_value):
def ip_nonlocal_bind(): def ip_nonlocal_bind():
ipw = ip_lib.IPWrapper()
nsname1 = "ipnonlocalbind1-" + uuidutils.generate_uuid() nsname1 = "ipnonlocalbind1-" + uuidutils.generate_uuid()
nsname2 = "ipnonlocalbind2-" + uuidutils.generate_uuid() nsname2 = "ipnonlocalbind2-" + uuidutils.generate_uuid()
ipw.netns.add(nsname1) ip_lib.create_network_namespace(nsname1)
try: try:
ipw.netns.add(nsname2) ip_lib.create_network_namespace(nsname2)
try: try:
original_value = ip_lib.get_ip_nonlocal_bind(namespace=None) original_value = ip_lib.get_ip_nonlocal_bind(namespace=None)
try: try:
@ -470,7 +467,7 @@ def ip_nonlocal_bind():
"Exception: %s", e) "Exception: %s", e)
return False return False
finally: finally:
ipw.netns.delete(nsname2) ip_lib.delete_network_namespace(nsname2)
finally: finally:
ipw.netns.delete(nsname1) ip_lib.delete_network_namespace(nsname1)
return ns1_value == 0 return ns1_value == 0

View File

@ -98,14 +98,13 @@ class NeutronDebugAgent(object):
bridge = None bridge = None
if network.external: if network.external:
bridge = self.conf.external_network_bridge bridge = self.conf.external_network_bridge
ip = ip_lib.IPWrapper()
namespace = self._get_namespace(port) namespace = self._get_namespace(port)
if ip.netns.exists(namespace): if ip_lib.network_namespace_exists(namespace):
self.driver.unplug(self.driver.get_device_name(port), self.driver.unplug(self.driver.get_device_name(port),
bridge=bridge, bridge=bridge,
namespace=namespace) namespace=namespace)
try: try:
ip.netns.delete(namespace) ip_lib.delete_network_namespace(namespace)
except Exception: except Exception:
LOG.warning('Failed to delete namespace %s', namespace) LOG.warning('Failed to delete namespace %s', namespace)
else: else:

View File

@ -17,6 +17,7 @@ import pyroute2
from pyroute2.netlink import rtnl from pyroute2.netlink import rtnl
from pyroute2.netlink.rtnl import ndmsg from pyroute2.netlink.rtnl import ndmsg
from pyroute2 import NetlinkError from pyroute2 import NetlinkError
from pyroute2 import netns
from neutron._i18n import _ from neutron._i18n import _
from neutron import privileged from neutron import privileged
@ -175,3 +176,34 @@ def dump_neigh_entries(ip_version, device, namespace, **kwargs):
'lladdr': attrs.get('NDA_LLADDR'), 'lladdr': attrs.get('NDA_LLADDR'),
'device': device}] 'device': device}]
return entries return entries
@privileged.default.entrypoint
def create_netns(name, **kwargs):
"""Create a network namespace.
:param name: The name of the namespace to create
"""
try:
netns.create(name, **kwargs)
except OSError as e:
if e.errno != errno.EEXIST:
raise
@privileged.default.entrypoint
def remove_netns(name, **kwargs):
"""Remove a network namespace.
:param name: The name of the namespace to remove
"""
netns.remove(name, **kwargs)
@privileged.default.entrypoint
def list_netns(**kwargs):
"""List network namespaces.
Caller requires raised priveleges to list namespaces
"""
return netns.listnetns(**kwargs)

View File

@ -81,7 +81,7 @@ class RouterWithMetering(object):
self.id) self.id)
# Check for namespace existence before we assign the # Check for namespace existence before we assign the
# snat_iptables_manager # snat_iptables_manager
if ip_lib.IPWrapper().netns.exists(snat_ns_name): if ip_lib.network_namespace_exists(snat_ns_name):
self.snat_iptables_manager = iptables_manager.IptablesManager( self.snat_iptables_manager = iptables_manager.IptablesManager(
namespace=snat_ns_name, namespace=snat_ns_name,
binary_name=WRAP_NAME, binary_name=WRAP_NAME,
@ -91,8 +91,7 @@ class RouterWithMetering(object):
# NOTE(Swami): If distributed routers, all external traffic on a # NOTE(Swami): If distributed routers, all external traffic on a
# compute node will flow through the rfp interface in the router # compute node will flow through the rfp interface in the router
# namespace. # namespace.
ip_wrapper = ip_lib.IPWrapper(namespace=self.ns_name) if ip_lib.network_namespace_exists(self.ns_name):
if ip_wrapper.netns.exists(self.ns_name):
self.iptables_manager = iptables_manager.IptablesManager( self.iptables_manager = iptables_manager.IptablesManager(
namespace=self.ns_name, namespace=self.ns_name,
binary_name=WRAP_NAME, binary_name=WRAP_NAME,

View File

@ -658,8 +658,7 @@ class MacvtapFixture(fixtures.Fixture):
self.addCleanup(self.destroy) self.addCleanup(self.destroy)
def destroy(self): def destroy(self):
ip_wrapper = ip_lib.IPWrapper(self.ip_dev.namespace) if (ip_lib.network_namespace_exists(self.ip_dev.namespace) or
if (ip_wrapper.netns.exists(self.ip_dev.namespace) or
self.ip_dev.namespace is None): self.ip_dev.namespace is None):
try: try:
self.ip_dev.link.delete() self.ip_dev.link.delete()

View File

@ -16,6 +16,7 @@ import os
from oslo_config import cfg from oslo_config import cfg
from neutron.conf.agent import common as config
from neutron.tests import base as tests_base from neutron.tests import base as tests_base
from neutron.tests.common import helpers from neutron.tests.common import helpers
from neutron.tests.fullstack.resources import client as client_resource from neutron.tests.fullstack.resources import client as client_resource
@ -60,6 +61,7 @@ class BaseFullStackTestCase(testlib_api.MySQLTestCaseMixin,
# configure test runner to use rootwrap # configure test runner to use rootwrap
self.setup_rootwrap() self.setup_rootwrap()
config.setup_privsep()
self.environment = environment self.environment = environment
self.environment.test_name = self.get_name() self.environment.test_name = self.get_name()

View File

@ -200,8 +200,7 @@ class FakeFullstackMachine(machine_fixtures.FakeMachineBase):
# All associated vlan interfaces are deleted too # All associated vlan interfaces are deleted too
self.bridge.delete_port(self.port.name) self.bridge.delete_port(self.port.name)
ip_wrap = ip_lib.IPWrapper(self.namespace) ip_lib.delete_network_namespace(self.namespace)
ip_wrap.netns.delete(self.namespace)
class FakeFullstackTrunkMachine(FakeFullstackMachine): class FakeFullstackTrunkMachine(FakeFullstackMachine):

View File

@ -348,11 +348,10 @@ class DhcpAgentFixture(fixtures.Fixture):
namespace suffix. namespace suffix.
""" """
ip_wrapper = ip_lib.IPWrapper() for namespace in ip_lib.list_network_namespaces():
for namespace in ip_wrapper.get_namespaces():
if self.dhcp_namespace_pattern.match(namespace): if self.dhcp_namespace_pattern.match(namespace):
try: try:
ip_wrapper.netns.delete(namespace) ip_lib.delete_network_namespace(namespace)
except RuntimeError: except RuntimeError:
# Continue cleaning even if namespace deletions fails # Continue cleaning even if namespace deletions fails
pass pass

View File

@ -93,8 +93,8 @@ class TestLegacyL3Agent(TestL3Agent):
return namespaces.build_ns_name(namespaces.NS_PREFIX, router_id) return namespaces.build_ns_name(namespaces.NS_PREFIX, router_id)
def _assert_namespace_exists(self, ns_name): def _assert_namespace_exists(self, ns_name):
ip = ip_lib.IPWrapper(ns_name) common_utils.wait_until_true(
common_utils.wait_until_true(lambda: ip.netns.exists(ns_name)) lambda: ip_lib.network_namespace_exists(ns_name))
def test_namespace_exists(self): def test_namespace_exists(self):
tenant_id = uuidutils.generate_uuid() tenant_id = uuidutils.generate_uuid()

View File

@ -340,8 +340,7 @@ class L3AgentTestFramework(base.BaseSudoTestCase):
ip_version, ipv6_subnet_modes, interface_id) ip_version, ipv6_subnet_modes, interface_id)
def _namespace_exists(self, namespace): def _namespace_exists(self, namespace):
ip = ip_lib.IPWrapper(namespace=namespace) return ip_lib.network_namespace_exists(namespace)
return ip.netns.exists(namespace)
def _metadata_proxy_exists(self, conf, router): def _metadata_proxy_exists(self, conf, router):
pm = external_process.ProcessManager( pm = external_process.ProcessManager(

View File

@ -53,8 +53,7 @@ class NamespaceManagerTestFramework(base.BaseSudoTestCase):
raise e raise e
def _namespace_exists(self, namespace): def _namespace_exists(self, namespace):
ip = ip_lib.IPWrapper(namespace=namespace) return ip_lib.network_namespace_exists(namespace)
return ip.netns.exists(namespace)
class NamespaceManagerTestCase(NamespaceManagerTestFramework): class NamespaceManagerTestCase(NamespaceManagerTestFramework):

View File

@ -173,8 +173,8 @@ class DHCPAgentOVSTestFramework(base.BaseSudoTestCase):
self.assert_dhcp_device(network.namespace, iface_name, dhcp_enabled) self.assert_dhcp_device(network.namespace, iface_name, dhcp_enabled)
def assert_dhcp_namespace(self, namespace, dhcp_enabled): def assert_dhcp_namespace(self, namespace, dhcp_enabled):
ip = ip_lib.IPWrapper() self.assertEqual(dhcp_enabled,
self.assertEqual(dhcp_enabled, ip.netns.exists(namespace)) ip_lib.network_namespace_exists(namespace))
def assert_accept_ra_disabled(self, namespace): def assert_accept_ra_disabled(self, namespace):
actual = ip_lib.IPWrapper(namespace=namespace).netns.execute( actual = ip_lib.IPWrapper(namespace=namespace).netns.execute(

View File

@ -30,7 +30,7 @@ from neutron.tests.common import net_helpers
from neutron.tests.functional import base from neutron.tests.functional import base
from neutron.tests.functional.cmd import process_spawn from neutron.tests.functional.cmd import process_spawn
GET_NAMESPACES = 'neutron.agent.linux.ip_lib.IPWrapper.get_namespaces' GET_NAMESPACES = 'neutron.agent.linux.ip_lib.list_network_namespaces'
TEST_INTERFACE_DRIVER = 'neutron.agent.linux.interface.OVSInterfaceDriver' TEST_INTERFACE_DRIVER = 'neutron.agent.linux.interface.OVSInterfaceDriver'
NUM_SUBPROCESSES = 6 NUM_SUBPROCESSES = 6

View File

@ -89,6 +89,10 @@ class BasicRouterOperationsFramework(base.BaseTestCase):
'neutron.agent.linux.ip_lib.device_exists') 'neutron.agent.linux.ip_lib.device_exists')
self.device_exists = self.device_exists_p.start() self.device_exists = self.device_exists_p.start()
self.list_network_namespaces_p = mock.patch(
'neutron.agent.linux.ip_lib.list_network_namespaces')
self.list_network_namespaces = self.list_network_namespaces_p.start()
self.ensure_dir = mock.patch( self.ensure_dir = mock.patch(
'oslo_utils.fileutils.ensure_tree').start() 'oslo_utils.fileutils.ensure_tree').start()
@ -373,7 +377,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
for r_id in stale_router_ids] for r_id in stale_router_ids]
namespace_list += [namespaces.NS_PREFIX + r['id'] namespace_list += [namespaces.NS_PREFIX + r['id']
for r in active_routers] for r in active_routers]
self.mock_ip.get_namespaces.return_value = namespace_list self.list_network_namespaces.return_value = namespace_list
driver = metadata_driver.MetadataDriver driver = metadata_driver.MetadataDriver
with mock.patch.object( with mock.patch.object(
driver, 'destroy_monitored_metadata_proxy') as destroy_proxy: driver, 'destroy_monitored_metadata_proxy') as destroy_proxy:
@ -2327,7 +2331,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
def test_destroy_namespace(self): def test_destroy_namespace(self):
namespace = 'qrouter-bar' namespace = 'qrouter-bar'
self.mock_ip.get_namespaces.return_value = [namespace] self.list_network_namespaces.return_value = [namespace]
self.mock_ip.get_devices.return_value = [ self.mock_ip.get_devices.return_value = [
l3_test_common.FakeDev('qr-aaaa'), l3_test_common.FakeDev('qr-aaaa'),
l3_test_common.FakeDev('rfp-aaaa')] l3_test_common.FakeDev('rfp-aaaa')]
@ -2356,7 +2360,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
def test_destroy_snat_namespace(self): def test_destroy_snat_namespace(self):
namespace = 'snat-bar' namespace = 'snat-bar'
self.mock_ip.get_namespaces.return_value = [namespace] self.list_network_namespaces.return_value = [namespace]
self.mock_ip.get_devices.return_value = [ self.mock_ip.get_devices.return_value = [
l3_test_common.FakeDev('qg-aaaa'), l3_test_common.FakeDev('qg-aaaa'),
l3_test_common.FakeDev('sg-aaaa')] l3_test_common.FakeDev('sg-aaaa')]
@ -2614,9 +2618,9 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
for r in router_list] for r in router_list]
good_namespace_list += [dvr_snat_ns.SNAT_NS_PREFIX + r['id'] good_namespace_list += [dvr_snat_ns.SNAT_NS_PREFIX + r['id']
for r in router_list] for r in router_list]
self.mock_ip.get_namespaces.return_value = (stale_namespace_list + self.list_network_namespaces.return_value = (stale_namespace_list +
good_namespace_list + good_namespace_list +
other_namespaces) other_namespaces)
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf) agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)

View File

@ -18,6 +18,7 @@ from oslo_utils import uuidutils
from neutron.agent.common import utils from neutron.agent.common import utils
from neutron.agent.l3 import dvr_snat_ns from neutron.agent.l3 import dvr_snat_ns
from neutron.agent.linux import ip_lib
from neutron.tests import base from neutron.tests import base
_uuid = uuidutils.generate_uuid _uuid = uuidutils.generate_uuid
@ -37,7 +38,10 @@ class TestDvrSnatNs(base.BaseTestCase):
use_ipv6=False) use_ipv6=False)
@mock.patch.object(utils, 'execute') @mock.patch.object(utils, 'execute')
def test_create(self, execute): @mock.patch.object(ip_lib, 'create_network_namespace')
@mock.patch.object(ip_lib, 'network_namespace_exists')
def test_create(self, exists, create, execute):
exists.return_value = False
self.snat_ns.create() self.snat_ns.create()
netns_cmd = ['ip', 'netns', 'exec', self.snat_ns.name] netns_cmd = ['ip', 'netns', 'exec', self.snat_ns.name]
@ -46,4 +50,5 @@ class TestDvrSnatNs(base.BaseTestCase):
check_exit_code=True, extra_ok_codes=None, check_exit_code=True, extra_ok_codes=None,
log_fail_as_error=True, run_as_root=True)] log_fail_as_error=True, run_as_root=True)]
create.assert_called_once_with(self.snat_ns.name)
execute.assert_has_calls(expected) execute.assert_has_calls(expected)

View File

@ -43,14 +43,14 @@ class TestL3AgentExtensionApi(base.BaseTestCase):
ports = [{'id': pid} for pid in port_ids] ports = [{'id': pid} for pid in port_ids]
router_info, ri = self._prepare_router_data(ports) router_info, ri = self._prepare_router_data(ports)
with mock.patch.object(ip_lib.IPWrapper, with mock.patch.object(ip_lib,
'get_namespaces') as mock_get_namespaces: 'list_network_namespaces') as mock_list_netns:
mock_get_namespaces.return_value = [] mock_list_netns.return_value = []
api_object = l3_agent_api.L3AgentExtensionAPI(router_info) api_object = l3_agent_api.L3AgentExtensionAPI(router_info)
router = api_object.get_router_hosting_port(port_ids[0]) router = api_object.get_router_hosting_port(port_ids[0])
mock_get_namespaces.assert_called_once_with() mock_list_netns.assert_called_once_with()
self.assertFalse(router) self.assertFalse(router)
def test_get_router_hosting_port_for_router_in_ns(self): def test_get_router_hosting_port_for_router_in_ns(self):
@ -58,9 +58,9 @@ class TestL3AgentExtensionApi(base.BaseTestCase):
ports = [{'id': pid} for pid in port_ids] ports = [{'id': pid} for pid in port_ids]
router_info, ri = self._prepare_router_data(ports) router_info, ri = self._prepare_router_data(ports)
with mock.patch.object(ip_lib.IPWrapper, with mock.patch.object(ip_lib,
'get_namespaces') as mock_get_namespaces: 'list_network_namespaces') as mock_list_netns:
mock_get_namespaces.return_value = [ri.ns_name] mock_list_netns.return_value = [ri.ns_name]
api_object = l3_agent_api.L3AgentExtensionAPI(router_info) api_object = l3_agent_api.L3AgentExtensionAPI(router_info)
router = api_object.get_router_hosting_port(port_ids[0]) router = api_object.get_router_hosting_port(port_ids[0])
self.assertEqual(ri, router) self.assertEqual(ri, router)
@ -68,9 +68,9 @@ class TestL3AgentExtensionApi(base.BaseTestCase):
def test_get_routers_in_project(self): def test_get_routers_in_project(self):
router_info, ri = self._prepare_router_data() router_info, ri = self._prepare_router_data()
with mock.patch.object(ip_lib.IPWrapper, with mock.patch.object(ip_lib,
'get_namespaces') as mock_get_namespaces: 'list_network_namespaces') as mock_list_netns:
mock_get_namespaces.return_value = [ri.ns_name] mock_list_netns.return_value = [ri.ns_name]
api_object = l3_agent_api.L3AgentExtensionAPI(router_info) api_object = l3_agent_api.L3AgentExtensionAPI(router_info)
routers = api_object.get_routers_in_project(self.project_id) routers = api_object.get_routers_in_project(self.project_id)
self.assertEqual([ri], routers) self.assertEqual([ri], routers)
@ -78,9 +78,9 @@ class TestL3AgentExtensionApi(base.BaseTestCase):
def test_is_router_in_namespace_for_in_ns(self): def test_is_router_in_namespace_for_in_ns(self):
router_info, ri = self._prepare_router_data() router_info, ri = self._prepare_router_data()
with mock.patch.object(ip_lib.IPWrapper, with mock.patch.object(ip_lib,
'get_namespaces') as mock_get_namespaces: 'list_network_namespaces') as mock_list_netns:
mock_get_namespaces.return_value = [ri.ns_name] mock_list_netns.return_value = [ri.ns_name]
api_object = l3_agent_api.L3AgentExtensionAPI(router_info) api_object = l3_agent_api.L3AgentExtensionAPI(router_info)
router_in_ns = api_object.is_router_in_namespace(ri.router_id) router_in_ns = api_object.is_router_in_namespace(ri.router_id)
self.assertTrue(router_in_ns) self.assertTrue(router_in_ns)
@ -88,9 +88,9 @@ class TestL3AgentExtensionApi(base.BaseTestCase):
def test_is_router_in_namespace_for_not_in_ns(self): def test_is_router_in_namespace_for_not_in_ns(self):
router_info, ri = self._prepare_router_data() router_info, ri = self._prepare_router_data()
with mock.patch.object(ip_lib.IPWrapper, with mock.patch.object(ip_lib,
'get_namespaces') as mock_get_namespaces: 'list_network_namespaces') as mock_list_netns:
mock_get_namespaces.return_value = [uuidutils.generate_uuid()] mock_list_netns.return_value = [uuidutils.generate_uuid()]
api_object = l3_agent_api.L3AgentExtensionAPI(router_info) api_object = l3_agent_api.L3AgentExtensionAPI(router_info)
router_in_ns = api_object.is_router_in_namespace(ri.router_id) router_in_ns = api_object.is_router_in_namespace(ri.router_id)
self.assertFalse(router_in_ns) self.assertFalse(router_in_ns)

View File

@ -77,7 +77,7 @@ class TestNamespaceManager(NamespaceManagerTestCaseFramework):
'dhcp-' + _uuid(), ] 'dhcp-' + _uuid(), ]
# Test the normal path # Test the normal path
with mock.patch.object(ip_lib.IPWrapper, 'get_namespaces', with mock.patch.object(ip_lib, 'list_network_namespaces',
return_value=ns_names): return_value=ns_names):
retrieved_ns_names = self.ns_manager.list_all() retrieved_ns_names = self.ns_manager.list_all()
self.assertEqual(len(ns_names) - 1, len(retrieved_ns_names)) self.assertEqual(len(ns_names) - 1, len(retrieved_ns_names))
@ -85,8 +85,8 @@ class TestNamespaceManager(NamespaceManagerTestCaseFramework):
self.assertIn(ns_names[i], retrieved_ns_names) self.assertIn(ns_names[i], retrieved_ns_names)
self.assertNotIn(ns_names[-1], retrieved_ns_names) self.assertNotIn(ns_names[-1], retrieved_ns_names)
# Test path where IPWrapper raises exception # Test path where list_network_namespaces() raises exception
with mock.patch.object(ip_lib.IPWrapper, 'get_namespaces', with mock.patch.object(ip_lib, 'list_network_namespaces',
side_effect=RuntimeError): side_effect=RuntimeError):
retrieved_ns_names = self.ns_manager.list_all() retrieved_ns_names = self.ns_manager.list_all()
self.assertFalse(retrieved_ns_names) self.assertFalse(retrieved_ns_names)
@ -104,7 +104,7 @@ class TestNamespaceManager(NamespaceManagerTestCaseFramework):
ns_names += [dvr_snat_ns.SNAT_NS_PREFIX + _uuid() for _ in range(5)] ns_names += [dvr_snat_ns.SNAT_NS_PREFIX + _uuid() for _ in range(5)]
ns_names += [namespaces.NS_PREFIX + router_id, ns_names += [namespaces.NS_PREFIX + router_id,
dvr_snat_ns.SNAT_NS_PREFIX + router_id] dvr_snat_ns.SNAT_NS_PREFIX + router_id]
with mock.patch.object(ip_lib.IPWrapper, 'get_namespaces', with mock.patch.object(ip_lib, 'list_network_namespaces',
return_value=ns_names), \ return_value=ns_names), \
mock.patch.object(self.ns_manager, '_cleanup') as mock_cleanup: mock.patch.object(self.ns_manager, '_cleanup') as mock_cleanup:
self.ns_manager.ensure_router_cleanup(router_id) self.ns_manager.ensure_router_cleanup(router_id)

View File

@ -1096,7 +1096,9 @@ class TestDhcpLocalProcess(TestBase):
self.assertTrue(lp.process_monitor.unregister.called) self.assertTrue(lp.process_monitor.unregister.called)
self.assertTrue(self.external_process().disable.called) self.assertTrue(self.external_process().disable.called)
def test_disable_not_active(self): @mock.patch('neutron.agent.linux.ip_lib.network_namespace_exists')
def test_disable_not_active(self, namespace_exists):
namespace_exists.return_value = False
attrs_to_mock = dict([(a, mock.DEFAULT) for a in attrs_to_mock = dict([(a, mock.DEFAULT) for a in
['active', 'interface_name']]) ['active', 'interface_name']])
with mock.patch.multiple(LocalChild, **attrs_to_mock) as mocks: with mock.patch.multiple(LocalChild, **attrs_to_mock) as mocks:
@ -1121,30 +1123,38 @@ class TestDhcpLocalProcess(TestBase):
lp.disable(retain_port=True) lp.disable(retain_port=True)
self._assert_disabled(lp) self._assert_disabled(lp)
def test_disable(self): @mock.patch('neutron.agent.linux.ip_lib.network_namespace_exists')
def test_disable(self, namespace_exists):
namespace_exists.return_value = True
attrs_to_mock = {'active': mock.DEFAULT} attrs_to_mock = {'active': mock.DEFAULT}
with mock.patch.multiple(LocalChild, **attrs_to_mock) as mocks: with mock.patch.multiple(LocalChild, **attrs_to_mock) as mocks:
mocks['active'].__get__ = mock.Mock(return_value=False) mocks['active'].__get__ = mock.Mock(return_value=False)
lp = LocalChild(self.conf, FakeDualNetwork()) lp = LocalChild(self.conf, FakeDualNetwork())
with mock.patch('neutron.agent.linux.ip_lib.IPWrapper') as ip: with mock.patch('neutron.agent.linux.ip_lib.'
'delete_network_namespace') as delete_ns:
lp.disable() lp.disable()
self._assert_disabled(lp) self._assert_disabled(lp)
ip.return_value.netns.delete.assert_called_with('qdhcp-ns') delete_ns.assert_called_with('qdhcp-ns')
def test_disable_config_dir_removed_after_destroy(self): @mock.patch('neutron.agent.linux.ip_lib.network_namespace_exists')
def test_disable_config_dir_removed_after_destroy(self, namespace_exists):
namespace_exists.return_value = True
parent = mock.MagicMock() parent = mock.MagicMock()
parent.attach_mock(self.rmtree, 'rmtree') parent.attach_mock(self.rmtree, 'rmtree')
parent.attach_mock(self.mock_mgr, 'DeviceManager') parent.attach_mock(self.mock_mgr, 'DeviceManager')
lp = LocalChild(self.conf, FakeDualNetwork()) lp = LocalChild(self.conf, FakeDualNetwork())
lp.disable(retain_port=False) with mock.patch('neutron.agent.linux.ip_lib.'
'delete_network_namespace') as delete_ns:
lp.disable(retain_port=False)
expected = [mock.call.DeviceManager().destroy(mock.ANY, mock.ANY), expected = [mock.call.DeviceManager().destroy(mock.ANY, mock.ANY),
mock.call.rmtree(mock.ANY, ignore_errors=True)] mock.call.rmtree(mock.ANY, ignore_errors=True)]
parent.assert_has_calls(expected) parent.assert_has_calls(expected)
delete_ns.assert_called_with('qdhcp-ns')
def test_get_interface_name(self): def test_get_interface_name(self):
net = FakeDualNetwork() net = FakeDualNetwork()

View File

@ -36,11 +36,6 @@ NETNS_SAMPLE = [
'bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb', 'bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb',
'cccccccc-cccc-cccc-cccc-cccccccccccc'] 'cccccccc-cccc-cccc-cccc-cccccccccccc']
NETNS_SAMPLE_IPROUTE2_4 = [
'12345678-1234-5678-abcd-1234567890ab (id: 1)',
'bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb (id: 0)',
'cccccccc-cccc-cccc-cccc-cccccccccccc (id: 2)']
LINK_SAMPLE = [ LINK_SAMPLE = [
'1: lo: <LOOPBACK,UP,LOWER_UP> mtu 16436 qdisc noqueue state UNKNOWN \\' '1: lo: <LOOPBACK,UP,LOWER_UP> mtu 16436 qdisc noqueue state UNKNOWN \\'
'link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00 promiscuity 0', 'link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00 promiscuity 0',
@ -318,29 +313,31 @@ class TestIpWrapper(base.BaseTestCase):
self.assertEqual(device_name, somedevice.name) self.assertEqual(device_name, somedevice.name)
self.assertFalse(devices) self.assertFalse(devices)
def test_get_namespaces_non_root(self): @mock.patch.object(pyroute2.netns, 'listnetns')
@mock.patch.object(priv_lib, 'list_netns')
def test_get_namespaces_non_root(self, priv_listnetns, listnetns):
self.config(group='AGENT', use_helper_for_ns_read=False) self.config(group='AGENT', use_helper_for_ns_read=False)
self.execute.return_value = '\n'.join(NETNS_SAMPLE) listnetns.return_value = NETNS_SAMPLE
retval = ip_lib.IPWrapper.get_namespaces() retval = ip_lib.list_network_namespaces()
self.assertEqual(retval, self.assertEqual(retval,
['12345678-1234-5678-abcd-1234567890ab', ['12345678-1234-5678-abcd-1234567890ab',
'bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb', 'bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb',
'cccccccc-cccc-cccc-cccc-cccccccccccc']) 'cccccccc-cccc-cccc-cccc-cccccccccccc'])
self.assertEqual(1, listnetns.call_count)
self.assertFalse(priv_listnetns.called)
self.execute.assert_called_once_with([], 'netns', ('list',), @mock.patch.object(pyroute2.netns, 'listnetns')
run_as_root=False) @mock.patch.object(priv_lib, 'list_netns')
def test_get_namespaces_root(self, priv_listnetns, listnetns):
def test_get_namespaces_iproute2_4_root(self):
self.config(group='AGENT', use_helper_for_ns_read=True) self.config(group='AGENT', use_helper_for_ns_read=True)
self.execute.return_value = '\n'.join(NETNS_SAMPLE_IPROUTE2_4) priv_listnetns.return_value = NETNS_SAMPLE
retval = ip_lib.IPWrapper.get_namespaces() retval = ip_lib.list_network_namespaces()
self.assertEqual(retval, self.assertEqual(retval,
['12345678-1234-5678-abcd-1234567890ab', ['12345678-1234-5678-abcd-1234567890ab',
'bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb', 'bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb',
'cccccccc-cccc-cccc-cccc-cccccccccccc']) 'cccccccc-cccc-cccc-cccc-cccccccccccc'])
self.assertEqual(1, priv_listnetns.call_count)
self.execute.assert_called_once_with([], 'netns', ('list',), self.assertFalse(listnetns.called)
run_as_root=True)
def test_add_tuntap(self): def test_add_tuntap(self):
ip_lib.IPWrapper().add_tuntap('tap0') ip_lib.IPWrapper().add_tuntap('tap0')
@ -398,17 +395,16 @@ class TestIpWrapper(base.BaseTestCase):
self.assertEqual(dev.namespace, 'ns') self.assertEqual(dev.namespace, 'ns')
self.assertEqual(dev.name, 'eth0') self.assertEqual(dev.name, 'eth0')
def test_ensure_namespace(self): @mock.patch.object(priv_lib, 'create_netns')
def test_ensure_namespace(self, create):
with mock.patch.object(ip_lib, 'IPDevice') as ip_dev: with mock.patch.object(ip_lib, 'IPDevice') as ip_dev:
ip = ip_lib.IPWrapper() ip = ip_lib.IPWrapper()
with mock.patch.object(ip.netns, 'exists') as ns_exists: with mock.patch.object(ip.netns, 'exists') as ns_exists:
with mock.patch('neutron.agent.common.utils.execute'): with mock.patch('neutron.agent.common.utils.execute'):
ns_exists.return_value = False ns_exists.return_value = False
ip.ensure_namespace('ns') ip.ensure_namespace('ns')
self.execute.assert_has_calls( create.assert_called_once_with('ns')
[mock.call([], 'netns', ('add', 'ns'), ns_exists.assert_called_once_with('ns')
run_as_root=True, namespace=None,
log_fail_as_error=True)])
ip_dev.assert_has_calls([mock.call('lo', namespace='ns'), ip_dev.assert_has_calls([mock.call('lo', namespace='ns'),
mock.call().link.set_up()]) mock.call().link.set_up()])
@ -1235,10 +1231,11 @@ class TestIpNetnsCommand(TestIPCmdBase):
self.command = 'netns' self.command = 'netns'
self.netns_cmd = ip_lib.IpNetnsCommand(self.parent) self.netns_cmd = ip_lib.IpNetnsCommand(self.parent)
def test_add_namespace(self): @mock.patch.object(priv_lib, 'create_netns')
def test_add_namespace(self, create):
with mock.patch('neutron.agent.common.utils.execute') as execute: with mock.patch('neutron.agent.common.utils.execute') as execute:
ns = self.netns_cmd.add('ns') ns = self.netns_cmd.add('ns')
self._assert_sudo([], ('add', 'ns'), use_root_namespace=True) create.assert_called_once_with('ns')
self.assertEqual(ns.namespace, 'ns') self.assertEqual(ns.namespace, 'ns')
execute.assert_called_once_with( execute.assert_called_once_with(
['ip', 'netns', 'exec', 'ns', ['ip', 'netns', 'exec', 'ns',
@ -1246,36 +1243,35 @@ class TestIpNetnsCommand(TestIPCmdBase):
run_as_root=True, check_exit_code=True, extra_ok_codes=None, run_as_root=True, check_exit_code=True, extra_ok_codes=None,
log_fail_as_error=True) log_fail_as_error=True)
def test_delete_namespace(self): @mock.patch.object(priv_lib, 'remove_netns')
with mock.patch('neutron.agent.common.utils.execute'): def test_delete_namespace(self, remove):
self.netns_cmd.delete('ns') self.netns_cmd.delete('ns')
self._assert_sudo([], ('delete', 'ns'), use_root_namespace=True) remove.assert_called_once_with('ns')
def test_namespace_exists_use_helper(self): @mock.patch.object(pyroute2.netns, 'listnetns')
@mock.patch.object(priv_lib, 'list_netns')
def test_namespace_exists_use_helper(self, priv_listnetns, listnetns):
self.config(group='AGENT', use_helper_for_ns_read=True) self.config(group='AGENT', use_helper_for_ns_read=True)
retval = '\n'.join(NETNS_SAMPLE) priv_listnetns.return_value = NETNS_SAMPLE
# need another instance to avoid mocking # need another instance to avoid mocking
netns_cmd = ip_lib.IpNetnsCommand(ip_lib.SubProcessBase()) netns_cmd = ip_lib.IpNetnsCommand(ip_lib.SubProcessBase())
with mock.patch('neutron.agent.common.utils.execute') as execute: self.assertTrue(
execute.return_value = retval netns_cmd.exists('bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb'))
self.assertTrue( self.assertEqual(1, priv_listnetns.call_count)
netns_cmd.exists('bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb')) self.assertFalse(listnetns.called)
execute.assert_called_once_with(['ip', '-o', 'netns', 'list'],
run_as_root=True,
log_fail_as_error=True)
def test_namespace_doest_not_exist_no_helper(self): @mock.patch.object(pyroute2.netns, 'listnetns')
@mock.patch.object(priv_lib, 'list_netns')
def test_namespace_does_not_exist_no_helper(self, priv_listnetns,
listnetns):
self.config(group='AGENT', use_helper_for_ns_read=False) self.config(group='AGENT', use_helper_for_ns_read=False)
retval = '\n'.join(NETNS_SAMPLE) listnetns.return_value = NETNS_SAMPLE
# need another instance to avoid mocking # need another instance to avoid mocking
netns_cmd = ip_lib.IpNetnsCommand(ip_lib.SubProcessBase()) netns_cmd = ip_lib.IpNetnsCommand(ip_lib.SubProcessBase())
with mock.patch('neutron.agent.common.utils.execute') as execute: self.assertFalse(
execute.return_value = retval netns_cmd.exists('bbbbbbbb-1111-2222-3333-bbbbbbbbbbbb'))
self.assertFalse( self.assertEqual(1, listnetns.call_count)
netns_cmd.exists('bbbbbbbb-1111-2222-3333-bbbbbbbbbbbb')) self.assertFalse(priv_listnetns.called)
execute.assert_called_once_with(['ip', '-o', 'netns', 'list'],
run_as_root=False,
log_fail_as_error=True)
def test_execute(self): def test_execute(self):
self.parent.namespace = 'ns' self.parent.namespace = 'ns'

View File

@ -990,11 +990,11 @@ class IptablesManagerStateFulTestCase(base.BaseTestCase):
self.assertRaises(RuntimeError, self.assertRaises(RuntimeError,
self.iptables._apply_synchronized) self.iptables._apply_synchronized)
self.iptables.namespace = 'test' self.iptables.namespace = 'test'
with mock.patch('neutron.agent.linux.ip_lib.IpNetnsCommand.exists', with mock.patch('neutron.agent.linux.ip_lib.network_namespace_exists',
return_value=True): return_value=True):
self.assertRaises(RuntimeError, self.assertRaises(RuntimeError,
self.iptables._apply_synchronized) self.iptables._apply_synchronized)
with mock.patch('neutron.agent.linux.ip_lib.IpNetnsCommand.exists', with mock.patch('neutron.agent.linux.ip_lib.network_namespace_exists',
return_value=False): return_value=False):
self.assertEqual([], self.iptables._apply_synchronized()) self.assertEqual([], self.iptables._apply_synchronized())

View File

@ -363,8 +363,9 @@ class TestNetnsCleanup(base.BaseTestCase):
def test_main(self): def test_main(self):
namespaces = ['ns1', 'ns2'] namespaces = ['ns1', 'ns2']
with mock.patch('neutron.agent.linux.ip_lib.IPWrapper') as ip_wrap: with mock.patch('neutron.agent.linux.ip_lib.'
ip_wrap.get_namespaces.return_value = namespaces 'list_network_namespaces') as listnetns:
listnetns.return_value = namespaces
with mock.patch('time.sleep') as time_sleep: with mock.patch('time.sleep') as time_sleep:
conf = mock.Mock() conf = mock.Mock()
@ -388,15 +389,15 @@ class TestNetnsCleanup(base.BaseTestCase):
[mock.call(conf, 'ns1', False), [mock.call(conf, 'ns1', False),
mock.call(conf, 'ns2', False)]) mock.call(conf, 'ns2', False)])
ip_wrap.assert_has_calls( self.assertEqual(1, listnetns.call_count)
[mock.call.get_namespaces()])
time_sleep.assert_called_once_with(2) time_sleep.assert_called_once_with(2)
def test_main_no_candidates(self): def test_main_no_candidates(self):
namespaces = ['ns1', 'ns2'] namespaces = ['ns1', 'ns2']
with mock.patch('neutron.agent.linux.ip_lib.IPWrapper') as ip_wrap: with mock.patch('neutron.agent.linux.ip_lib.'
ip_wrap.get_namespaces.return_value = namespaces 'list_network_namespaces') as listnetns:
listnetns.return_value = namespaces
with mock.patch('time.sleep') as time_sleep: with mock.patch('time.sleep') as time_sleep:
conf = mock.Mock() conf = mock.Mock()
@ -412,8 +413,7 @@ class TestNetnsCleanup(base.BaseTestCase):
with mock.patch('neutron.common.config.setup_logging'): with mock.patch('neutron.common.config.setup_logging'):
util.main() util.main()
ip_wrap.assert_has_calls( self.assertEqual(1, listnetns.call_count)
[mock.call.get_namespaces()])
mocks['eligible_for_deletion'].assert_has_calls( mocks['eligible_for_deletion'].assert_has_calls(
[mock.call(conf, 'ns1', False), [mock.call(conf, 'ns1', False),

View File

@ -43,9 +43,12 @@ class TestDebugCommands(base.BaseTestCase):
device_exists_p = mock.patch( device_exists_p = mock.patch(
'neutron.agent.linux.ip_lib.device_exists', return_value=False) 'neutron.agent.linux.ip_lib.device_exists', return_value=False)
device_exists_p.start() device_exists_p.start()
namespace_p = mock.patch( namespace_e_p = mock.patch(
'neutron.agent.linux.ip_lib.IpNetnsCommand') 'neutron.agent.linux.ip_lib.network_namespace_exists')
namespace_p.start() namespace_e_p.start()
namespace_d_p = mock.patch(
'neutron.agent.linux.ip_lib.delete_network_namespace')
namespace_d_p.start()
ensure_namespace_p = mock.patch( ensure_namespace_p = mock.patch(
'neutron.agent.linux.ip_lib.IPWrapper.ensure_namespace') 'neutron.agent.linux.ip_lib.IPWrapper.ensure_namespace')
ensure_namespace_p.start() ensure_namespace_p.start()

View File

@ -134,7 +134,7 @@ class IptablesDriverTestCase(base.BaseTestCase):
self.v4filter_inst = mock.Mock() self.v4filter_inst = mock.Mock()
self.v6filter_inst = mock.Mock() self.v6filter_inst = mock.Mock()
self.namespace_exists_p = mock.patch( self.namespace_exists_p = mock.patch(
'neutron.agent.linux.ip_lib.IpNetnsCommand.exists') 'neutron.agent.linux.ip_lib.network_namespace_exists')
self.namespace_exists = self.namespace_exists_p.start() self.namespace_exists = self.namespace_exists_p.start()
self.snat_ns_name_p = mock.patch( self.snat_ns_name_p = mock.patch(
'neutron.agent.l3.dvr_snat_ns.SnatNamespace.get_snat_ns_name') 'neutron.agent.l3.dvr_snat_ns.SnatNamespace.get_snat_ns_name')