remove unnecessary neutron files under neutron/tests

Change-Id: Id1f4ed4a5752a6622cb59a4836cd4189d7ee615c
changes/07/104807/1
Isaku Yamahata 9 years ago
parent a310e46077
commit ff34dae544

@ -1,74 +0,0 @@
# Copyright 2014 Cisco Systems, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import random
from neutron.agent.linux import ovs_lib
from neutron.agent.linux import utils
from neutron.plugins.common import constants as q_const
from neutron.tests import base
BR_PREFIX = 'test-br'
class BaseLinuxTestCase(base.BaseTestCase):
def setUp(self, root_helper='sudo'):
super(BaseLinuxTestCase, self).setUp()
self.root_helper = root_helper
def check_command(self, cmd, error_text, skip_msg):
try:
utils.execute(cmd)
except RuntimeError as e:
if error_text in str(e):
self.skipTest(skip_msg)
raise
def check_sudo_enabled(self):
if os.environ.get('OS_SUDO_TESTING') not in base.TRUE_STRING:
self.skipTest('testing with sudo is not enabled')
def get_rand_name(self, max_length, prefix='test'):
name = prefix + str(random.randint(1, 0x7fffffff))
return name[:max_length]
def create_resource(self, name_prefix, creation_func, *args, **kwargs):
"""Create a new resource that does not already exist.
:param name_prefix: The prefix for a randomly generated name
:param creation_func: A function taking the name of the resource
to be created as it's first argument. An error is assumed
to indicate a name collision.
:param *args *kwargs: These will be passed to the create function.
"""
while True:
name = self.get_rand_name(q_const.MAX_DEV_NAME_LEN, name_prefix)
try:
return creation_func(name, *args, **kwargs)
except RuntimeError:
continue
class BaseOVSLinuxTestCase(BaseLinuxTestCase):
def setUp(self, root_helper='sudo'):
super(BaseOVSLinuxTestCase, self).setUp(root_helper)
self.ovs = ovs_lib.BaseOVS(self.root_helper)
def create_ovs_bridge(self, br_prefix=BR_PREFIX):
br = self.create_resource(br_prefix, self.ovs.add_bridge)
self.addCleanup(br.destroy)
return br

@ -1,71 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import eventlet
import fixtures
from six import moves
from neutron.agent.linux import async_process
from neutron.tests import base
class TestAsyncProcess(base.BaseTestCase):
def setUp(self):
super(TestAsyncProcess, self).setUp()
self.test_file_path = self.useFixture(
fixtures.TempDir()).join("test_async_process.tmp")
self.data = [str(x) for x in moves.xrange(4)]
with file(self.test_file_path, 'w') as f:
f.writelines('%s\n' % item for item in self.data)
def _check_stdout(self, proc):
# Ensure that all the output from the file is read
output = []
while output != self.data:
new_output = list(proc.iter_stdout())
if new_output:
output += new_output
eventlet.sleep(0.01)
def test_stopping_async_process_lifecycle(self):
with self.assert_max_execution_time():
proc = async_process.AsyncProcess(['tail', '-f',
self.test_file_path])
proc.start()
self._check_stdout(proc)
proc.stop()
# Ensure that the process and greenthreads have stopped
proc._process.wait()
self.assertEqual(proc._process.returncode, -9)
for watcher in proc._watchers:
watcher.wait()
def test_async_process_respawns(self):
with self.assert_max_execution_time():
proc = async_process.AsyncProcess(['tail', '-f',
self.test_file_path],
respawn_interval=0)
proc.start()
# Ensure that the same output is read twice
self._check_stdout(proc)
pid = proc._get_pid_to_kill()
proc._kill_process(pid)
self._check_stdout(proc)
proc.stop()

@ -1,108 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Tests in this module will be skipped unless:
- ovsdb-client is installed
- ovsdb-client can be invoked via password-less sudo
- OS_SUDO_TESTING is set to '1' or 'True' in the test execution
environment
The jenkins gate does not allow direct sudo invocation during test
runs, but configuring OS_SUDO_TESTING ensures that developers are
still able to execute tests that require the capability.
"""
import eventlet
from neutron.agent.linux import ovsdb_monitor
from neutron.tests.functional.agent.linux import base as base_agent
class BaseMonitorTest(base_agent.BaseOVSLinuxTestCase):
def setUp(self):
# Emulate using a rootwrap script with sudo
super(BaseMonitorTest, self).setUp(root_helper='sudo sudo')
self._check_test_requirements()
self.bridge = self.create_ovs_bridge()
def _check_test_requirements(self):
self.check_sudo_enabled()
self.check_command(['which', 'ovsdb-client'],
'Exit code: 1', 'ovsdb-client is not installed')
self.check_command(['sudo', '-n', 'ovsdb-client', 'list-dbs'],
'Exit code: 1',
'password-less sudo not granted for ovsdb-client')
class TestOvsdbMonitor(BaseMonitorTest):
def setUp(self):
super(TestOvsdbMonitor, self).setUp()
self.monitor = ovsdb_monitor.OvsdbMonitor('Bridge',
root_helper=self.root_helper)
self.addCleanup(self.monitor.stop)
self.monitor.start()
def collect_initial_output(self):
while True:
output = list(self.monitor.iter_stdout())
if output:
return output[0]
eventlet.sleep(0.01)
def test_killed_monitor_respawns(self):
with self.assert_max_execution_time():
self.monitor.respawn_interval = 0
old_pid = self.monitor._process.pid
output1 = self.collect_initial_output()
pid = self.monitor._get_pid_to_kill()
self.monitor._kill_process(pid)
self.monitor._reset_queues()
while (self.monitor._process.pid == old_pid):
eventlet.sleep(0.01)
output2 = self.collect_initial_output()
# Initial output should appear twice
self.assertEqual(output1, output2)
class TestSimpleInterfaceMonitor(BaseMonitorTest):
def setUp(self):
super(TestSimpleInterfaceMonitor, self).setUp()
self.monitor = ovsdb_monitor.SimpleInterfaceMonitor(
root_helper=self.root_helper)
self.addCleanup(self.monitor.stop)
self.monitor.start(block=True)
def test_has_updates(self):
self.assertTrue(self.monitor.has_updates,
'Initial call should always be true')
self.assertFalse(self.monitor.has_updates,
'has_updates without port addition should be False')
self.create_resource('test-port-', self.bridge.add_port)
with self.assert_max_execution_time():
# has_updates after port addition should become True
while not self.monitor.has_updates:
eventlet.sleep(0.01)

@ -1,46 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2014 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
from neutron.cmd.sanity import checks
from neutron.tests import base
class OVSSanityTestCase(base.BaseTestCase):
def setUp(self):
super(OVSSanityTestCase, self).setUp()
self.root_helper = 'sudo'
def check_sudo_enabled(self):
if os.environ.get('OS_SUDO_TESTING') not in base.TRUE_STRING:
self.skipTest('testing with sudo is not enabled')
def test_ovs_vxlan_support_runs(self):
"""This test just ensures that the test in neutron-sanity-check
can run through without error, without mocking anything out
"""
self.check_sudo_enabled()
checks.vxlan_supported(self.root_helper)
def test_ovs_patch_support_runs(self):
"""This test just ensures that the test in neutron-sanity-check
can run through without error, without mocking anything out
"""
self.check_sudo_enabled()
checks.patch_supported(self.root_helper)

@ -1,377 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 NEC Corporation
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Akihiro Motoki, NEC Corporation
#
import contextlib
import httplib
from oslo.config import cfg
from webob import exc
from neutron import context
from neutron.extensions import portbindings
from neutron import manager
from neutron.tests.unit import test_db_plugin
class PortBindingsTestCase(test_db_plugin.NeutronDbPluginV2TestCase):
# VIF_TYPE must be overridden according to plugin vif_type
VIF_TYPE = portbindings.VIF_TYPE_OTHER
# The plugin supports the port security feature such as
# security groups and anti spoofing.
HAS_PORT_FILTER = False
def _check_response_portbindings(self, port):
self.assertEqual(port[portbindings.VIF_TYPE], self.VIF_TYPE)
vif_details = port[portbindings.VIF_DETAILS]
# REVISIT(rkukura): Consider reworking tests to enable ML2 to bind
if self.VIF_TYPE not in [portbindings.VIF_TYPE_UNBOUND,
portbindings.VIF_TYPE_BINDING_FAILED]:
# TODO(rkukura): Replace with new VIF security details
self.assertEqual(vif_details[portbindings.CAP_PORT_FILTER],
self.HAS_PORT_FILTER)
def _check_response_no_portbindings(self, port):
self.assertIn('status', port)
self.assertNotIn(portbindings.VIF_TYPE, port)
self.assertNotIn(portbindings.VIF_DETAILS, port)
def _get_non_admin_context(self):
return context.Context(user_id=None,
tenant_id=self._tenant_id,
is_admin=False,
read_deleted="no")
def test_port_vif_details(self):
with self.port(name='name') as port:
port_id = port['port']['id']
# Check a response of create_port
self._check_response_portbindings(port['port'])
# Check a response of get_port
ctx = context.get_admin_context()
port = self._show('ports', port_id, neutron_context=ctx)['port']
self._check_response_portbindings(port)
# By default user is admin - now test non admin user
ctx = self._get_non_admin_context()
non_admin_port = self._show(
'ports', port_id, neutron_context=ctx)['port']
self._check_response_no_portbindings(non_admin_port)
def test_ports_vif_details(self):
plugin = manager.NeutronManager.get_plugin()
cfg.CONF.set_default('allow_overlapping_ips', True)
with contextlib.nested(self.port(), self.port()):
ctx = context.get_admin_context()
ports = plugin.get_ports(ctx)
self.assertEqual(len(ports), 2)
for port in ports:
self._check_response_portbindings(port)
# By default user is admin - now test non admin user
ctx = self._get_non_admin_context()
ports = self._list('ports', neutron_context=ctx)['ports']
self.assertEqual(len(ports), 2)
for non_admin_port in ports:
self._check_response_no_portbindings(non_admin_port)
def _check_port_binding_profile(self, port, profile=None):
# For plugins which does not use binding:profile attr
# we just check an operation for the port succeed.
self.assertIn('id', port)
def _test_create_port_binding_profile(self, profile):
profile_arg = {portbindings.PROFILE: profile}
with self.port(arg_list=(portbindings.PROFILE,),
**profile_arg) as port:
port_id = port['port']['id']
self._check_port_binding_profile(port['port'], profile)
port = self._show('ports', port_id)
self._check_port_binding_profile(port['port'], profile)
def test_create_port_binding_profile_none(self):
self._test_create_port_binding_profile(None)
def test_create_port_binding_profile_with_empty_dict(self):
self._test_create_port_binding_profile({})
def _test_update_port_binding_profile(self, profile):
profile_arg = {portbindings.PROFILE: profile}
with self.port() as port:
# print "(1) %s" % port
self._check_port_binding_profile(port['port'])
port_id = port['port']['id']
ctx = context.get_admin_context()
port = self._update('ports', port_id, {'port': profile_arg},
neutron_context=ctx)['port']
self._check_port_binding_profile(port, profile)
port = self._show('ports', port_id)['port']
self._check_port_binding_profile(port, profile)
def test_update_port_binding_profile_none(self):
self._test_update_port_binding_profile(None)
def test_update_port_binding_profile_with_empty_dict(self):
self._test_update_port_binding_profile({})
def test_port_create_portinfo_non_admin(self):
profile_arg = {portbindings.PROFILE: {'dummy': 'dummy'}}
with self.network(set_context=True, tenant_id='test') as net1:
with self.subnet(network=net1) as subnet1:
# succeed without binding:profile
with self.port(subnet=subnet1,
set_context=True, tenant_id='test'):
pass
# fail with binding:profile
try:
with self.port(subnet=subnet1,
expected_res_status=403,
arg_list=(portbindings.PROFILE,),
set_context=True, tenant_id='test',
**profile_arg):
pass
except exc.HTTPClientError:
pass
def test_port_update_portinfo_non_admin(self):
profile_arg = {portbindings.PROFILE: {'dummy': 'dummy'}}
with self.network() as net1:
with self.subnet(network=net1) as subnet1:
with self.port(subnet=subnet1) as port:
# By default user is admin - now test non admin user
# Note that 404 is returned when prohibit by policy.
# See comment for PolicyNotAuthorized except clause
# in update() in neutron.api.v2.base.Controller.
port_id = port['port']['id']
ctx = self._get_non_admin_context()
port = self._update('ports', port_id,
{'port': profile_arg},
expected_code=404,
neutron_context=ctx)
class PortBindingsHostTestCaseMixin(object):
fmt = 'json'
hostname = 'testhost'
def _check_response_portbindings_host(self, port):
self.assertEqual(port[portbindings.HOST_ID], self.hostname)
def _check_response_no_portbindings_host(self, port):
self.assertIn('status', port)
self.assertNotIn(portbindings.HOST_ID, port)
def test_port_vif_non_admin(self):
with self.network(set_context=True,
tenant_id='test') as net1:
with self.subnet(network=net1) as subnet1:
host_arg = {portbindings.HOST_ID: self.hostname}
try:
with self.port(subnet=subnet1,
expected_res_status=403,
arg_list=(portbindings.HOST_ID,),
set_context=True,
tenant_id='test',
**host_arg):
pass
except exc.HTTPClientError:
pass
def test_port_vif_host(self):
host_arg = {portbindings.HOST_ID: self.hostname}
with self.port(name='name', arg_list=(portbindings.HOST_ID,),
**host_arg) as port:
port_id = port['port']['id']
# Check a response of create_port
self._check_response_portbindings_host(port['port'])
# Check a response of get_port
ctx = context.get_admin_context()
port = self._show('ports', port_id, neutron_context=ctx)['port']
self._check_response_portbindings_host(port)
# By default user is admin - now test non admin user
ctx = context.Context(user_id=None,
tenant_id=self._tenant_id,
is_admin=False,
read_deleted="no")
non_admin_port = self._show(
'ports', port_id, neutron_context=ctx)['port']
self._check_response_no_portbindings_host(non_admin_port)
def test_ports_vif_host(self):
cfg.CONF.set_default('allow_overlapping_ips', True)
host_arg = {portbindings.HOST_ID: self.hostname}
with contextlib.nested(
self.port(name='name1',
arg_list=(portbindings.HOST_ID,),
**host_arg),
self.port(name='name2')):
ctx = context.get_admin_context()
ports = self._list('ports', neutron_context=ctx)['ports']
self.assertEqual(2, len(ports))
for port in ports:
if port['name'] == 'name1':
self._check_response_portbindings_host(port)
else:
self.assertFalse(port[portbindings.HOST_ID])
# By default user is admin - now test non admin user
ctx = context.Context(user_id=None,
tenant_id=self._tenant_id,
is_admin=False,
read_deleted="no")
ports = self._list('ports', neutron_context=ctx)['ports']
self.assertEqual(2, len(ports))
for non_admin_port in ports:
self._check_response_no_portbindings_host(non_admin_port)
def test_ports_vif_host_update(self):
cfg.CONF.set_default('allow_overlapping_ips', True)
host_arg = {portbindings.HOST_ID: self.hostname}
with contextlib.nested(
self.port(name='name1',
arg_list=(portbindings.HOST_ID,),
**host_arg),
self.port(name='name2')) as (port1, port2):
data = {'port': {portbindings.HOST_ID: 'testhosttemp'}}
req = self.new_update_request('ports', data, port1['port']['id'])
req.get_response(self.api)
req = self.new_update_request('ports', data, port2['port']['id'])
ctx = context.get_admin_context()
req.get_response(self.api)
ports = self._list('ports', neutron_context=ctx)['ports']
self.assertEqual(2, len(ports))
for port in ports:
self.assertEqual('testhosttemp', port[portbindings.HOST_ID])
def test_ports_vif_non_host_update(self):
host_arg = {portbindings.HOST_ID: self.hostname}
with self.port(name='name', arg_list=(portbindings.HOST_ID,),
**host_arg) as port:
data = {'port': {'admin_state_up': False}}
req = self.new_update_request('ports', data, port['port']['id'])
res = self.deserialize(self.fmt, req.get_response(self.api))
self.assertEqual(port['port'][portbindings.HOST_ID],
res['port'][portbindings.HOST_ID])
def test_ports_vif_non_host_update_when_host_null(self):
with self.port() as port:
data = {'port': {'admin_state_up': False}}
req = self.new_update_request('ports', data, port['port']['id'])
res = self.deserialize(self.fmt, req.get_response(self.api))
self.assertEqual(port['port'][portbindings.HOST_ID],
res['port'][portbindings.HOST_ID])
def test_ports_vif_host_list(self):
cfg.CONF.set_default('allow_overlapping_ips', True)
host_arg = {portbindings.HOST_ID: self.hostname}
with contextlib.nested(
self.port(name='name1',
arg_list=(portbindings.HOST_ID,),
**host_arg),
self.port(name='name2'),
self.port(name='name3',
arg_list=(portbindings.HOST_ID,),
**host_arg),) as (port1, _port2, port3):
self._test_list_resources(
'port', (port1, port3),
query_params='%s=%s' % (portbindings.HOST_ID, self.hostname))
class PortBindingsVnicTestCaseMixin(object):
fmt = 'json'
vnic_type = portbindings.VNIC_NORMAL
def _check_response_portbindings_vnic_type(self, port):
self.assertIn('status', port)
self.assertEqual(port[portbindings.VNIC_TYPE], self.vnic_type)
def test_port_vnic_type_non_admin(self):
with self.network(set_context=True,
tenant_id='test') as net1:
with self.subnet(network=net1) as subnet1:
vnic_arg = {portbindings.VNIC_TYPE: self.vnic_type}
with self.port(subnet=subnet1,
expected_res_status=httplib.CREATED,
arg_list=(portbindings.VNIC_TYPE,),
set_context=True,
tenant_id='test',
**vnic_arg) as port:
# Check a response of create_port
self._check_response_portbindings_vnic_type(port['port'])
def test_port_vnic_type(self):
vnic_arg = {portbindings.VNIC_TYPE: self.vnic_type}
with self.port(name='name', arg_list=(portbindings.VNIC_TYPE,),
**vnic_arg) as port:
port_id = port['port']['id']
# Check a response of create_port
self._check_response_portbindings_vnic_type(port['port'])
# Check a response of get_port
ctx = context.get_admin_context()
port = self._show('ports', port_id, neutron_context=ctx)['port']
self._check_response_portbindings_vnic_type(port)
# By default user is admin - now test non admin user
ctx = context.Context(user_id=None,
tenant_id=self._tenant_id,
is_admin=False,
read_deleted="no")
non_admin_port = self._show(
'ports', port_id, neutron_context=ctx)['port']
self._check_response_portbindings_vnic_type(non_admin_port)
def test_ports_vnic_type(self):
cfg.CONF.set_default('allow_overlapping_ips', True)
vnic_arg = {portbindings.VNIC_TYPE: self.vnic_type}
with contextlib.nested(
self.port(name='name1',
arg_list=(portbindings.VNIC_TYPE,),
**vnic_arg),
self.port(name='name2')):
ctx = context.get_admin_context()
ports = self._list('ports', neutron_context=ctx)['ports']
self.assertEqual(2, len(ports))
for port in ports:
if port['name'] == 'name1':
self._check_response_portbindings_vnic_type(port)
else:
self.assertEqual(portbindings.VNIC_NORMAL,
port[portbindings.VNIC_TYPE])
# By default user is admin - now test non admin user
ctx = context.Context(user_id=None,
tenant_id=self._tenant_id,
is_admin=False,
read_deleted="no")
ports = self._list('ports', neutron_context=ctx)['ports']
self.assertEqual(2, len(ports))
for non_admin_port in ports:
self._check_response_portbindings_vnic_type(non_admin_port)
def test_ports_vnic_type_list(self):
cfg.CONF.set_default('allow_overlapping_ips', True)
vnic_arg = {portbindings.VNIC_TYPE: self.vnic_type}
with contextlib.nested(
self.port(name='name1',
arg_list=(portbindings.VNIC_TYPE,),
**vnic_arg),
self.port(name='name2'),
self.port(name='name3',
arg_list=(portbindings.VNIC_TYPE,),
**vnic_arg),) as (port1, port2, port3):
self._test_list_resources(
'port', (port1, port2, port3),
query_params='%s=%s' % (portbindings.VNIC_TYPE,
self.vnic_type))

@ -1,967 +0,0 @@
# Copyright 2012, VMware, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import mock
from oslo.config import cfg
import testtools
from neutron.agent.linux import ovs_lib
from neutron.agent.linux import utils
from neutron.common import exceptions
from neutron.openstack.common import jsonutils
from neutron.openstack.common import uuidutils
from neutron.plugins.common import constants as p_const
from neutron.plugins.openvswitch.common import constants as const
from neutron.tests import base
from neutron.tests import tools
try:
OrderedDict = collections.OrderedDict
except AttributeError:
import ordereddict
OrderedDict = ordereddict.OrderedDict
OVS_LINUX_KERN_VERS_WITHOUT_VXLAN = "3.12.0"
class TestBaseOVS(base.BaseTestCase):
def setUp(self):
super(TestBaseOVS, self).setUp()
self.root_helper = 'sudo'
self.ovs = ovs_lib.BaseOVS(self.root_helper)
self.br_name = 'bridge1'
def test_add_bridge(self):
with mock.patch.object(self.ovs, 'run_vsctl') as mock_vsctl:
bridge = self.ovs.add_bridge(self.br_name)
mock_vsctl.assert_called_with(["--", "--may-exist",
"add-br", self.br_name])
self.assertEqual(bridge.br_name, self.br_name)
self.assertEqual(bridge.root_helper, self.ovs.root_helper)
def test_delete_bridge(self):
with mock.patch.object(self.ovs, 'run_vsctl') as mock_vsctl:
self.ovs.delete_bridge(self.br_name)
mock_vsctl.assert_called_with(["--", "--if-exists", "del-br",
self.br_name])
def test_bridge_exists_returns_true(self):
with mock.patch.object(self.ovs, 'run_vsctl') as mock_vsctl:
self.assertTrue(self.ovs.bridge_exists(self.br_name))
mock_vsctl.assert_called_with(['br-exists', self.br_name],
check_error=True)
def test_bridge_exists_returns_false_for_exit_code_2(self):
with mock.patch.object(self.ovs, 'run_vsctl',
side_effect=RuntimeError('Exit code: 2\n')):
self.assertFalse(self.ovs.bridge_exists('bridge1'))
def test_bridge_exists_raises_unknown_exception(self):
with mock.patch.object(self.ovs, 'run_vsctl',
side_effect=RuntimeError()):
with testtools.ExpectedException(RuntimeError):
self.ovs.bridge_exists('bridge1')
def test_get_bridge_name_for_port_name_returns_bridge_for_valid_port(self):
port_name = 'bar'
with mock.patch.object(self.ovs, 'run_vsctl',
return_value=self.br_name) as mock_vsctl:
bridge = self.ovs.get_bridge_name_for_port_name(port_name)
self.assertEqual(bridge, self.br_name)
mock_vsctl.assert_called_with(['port-to-br', port_name],
check_error=True)
def test_get_bridge_name_for_port_name_returns_none_for_exit_code_1(self):
with mock.patch.object(self.ovs, 'run_vsctl',
side_effect=RuntimeError('Exit code: 1\n')):
self.assertFalse(self.ovs.get_bridge_name_for_port_name('bridge1'))
def test_get_bridge_name_for_port_name_raises_unknown_exception(self):
with mock.patch.object(self.ovs, 'run_vsctl',
side_effect=RuntimeError()):
with testtools.ExpectedException(RuntimeError):
self.ovs.get_bridge_name_for_port_name('bridge1')
def _test_port_exists(self, br_name, result):
with mock.patch.object(self.ovs,
'get_bridge_name_for_port_name',
return_value=br_name):
self.assertEqual(self.ovs.port_exists('bar'), result)
def test_port_exists_returns_true_for_bridge_name(self):
self._test_port_exists(self.br_name, True)
def test_port_exists_returns_false_for_none(self):
self._test_port_exists(None, False)
class OVS_Lib_Test(base.BaseTestCase):
"""A test suite to exercise the OVS libraries shared by Neutron agents.
Note: these tests do not actually execute ovs-* utilities, and thus
can run on any system. That does, however, limit their scope.
"""
def setUp(self):
super(OVS_Lib_Test, self).setUp()
self.BR_NAME = "br-int"
self.TO = "--timeout=10"
self.root_helper = 'sudo'
self.br = ovs_lib.OVSBridge(self.BR_NAME, self.root_helper)
self.execute = mock.patch.object(
utils, "execute", spec=utils.execute).start()
def test_vifport(self):
"""Create and stringify vif port, confirm no exceptions."""
pname = "vif1.0"
ofport = 5
vif_id = uuidutils.generate_uuid()
mac = "ca:fe:de:ad:be:ef"
# test __init__
port = ovs_lib.VifPort(pname, ofport, vif_id, mac, self.br)
self.assertEqual(port.port_name, pname)
self.assertEqual(port.ofport, ofport)
self.assertEqual(port.vif_id, vif_id)
self.assertEqual(port.vif_mac, mac)
self.assertEqual(port.switch.br_name, self.BR_NAME)
# test __str__
str(port)
def test_set_controller(self):
controller_names = ['tcp:127.0.0.1:6633', 'tcp:172.17.16.10:5555']
self.br.set_controller(controller_names)
self.execute.assert_called_once_with(
['ovs-vsctl', self.TO, '--', 'set-controller', self.BR_NAME,
'tcp:127.0.0.1:6633', 'tcp:172.17.16.10:5555'],
root_helper=self.root_helper)
def test_del_controller(self):
self.br.del_controller()
self.execute.assert_called_once_with(
['ovs-vsctl', self.TO, '--', 'del-controller', self.BR_NAME],
root_helper=self.root_helper)
def test_get_controller(self):
self.execute.return_value = 'tcp:127.0.0.1:6633\ntcp:172.17.16.10:5555'
names = self.br.get_controller()
self.assertEqual(names,
['tcp:127.0.0.1:6633', 'tcp:172.17.16.10:5555'])
self.execute.assert_called_once_with(
['ovs-vsctl', self.TO, '--', 'get-controller', self.BR_NAME],
root_helper=self.root_helper)
def test_set_secure_mode(self):
self.br.set_secure_mode()
self.execute.assert_called_once_with(
['ovs-vsctl', self.TO, '--', 'set-fail-mode', self.BR_NAME,
'secure'], root_helper=self.root_helper)
def test_set_protocols(self):
protocols = 'OpenFlow13'
self.br.set_protocols(protocols)
self.execute.assert_called_once_with(
['ovs-vsctl', self.TO, '--', 'set', 'bridge', self.BR_NAME,
"protocols=%s" % protocols],
root_helper=self.root_helper)
def test_create(self):
self.br.add_bridge(self.BR_NAME)
self.br.create()
def test_destroy(self):
self.br.delete_bridge(self.BR_NAME)
self.br.destroy()
def test_reset_bridge(self):
self.br.destroy()
self.br.create()
self.br.reset_bridge()
def _build_timeout_opt(self, exp_timeout):
return "--timeout=%d" % exp_timeout if exp_timeout else self.TO
def _test_delete_port(self, exp_timeout=None):
exp_timeout_str = self._build_timeout_opt(exp_timeout)
pname = "tap5"
self.br.delete_port(pname)
self.execute.assert_called_once_with(
["ovs-vsctl", exp_timeout_str, "--", "--if-exists",
"del-port", self.BR_NAME, pname],
root_helper=self.root_helper)
def test_delete_port(self):
self._test_delete_port()
def test_call_command_non_default_timeput(self):
# This test is only for verifying a non-default timeout
# is correctly applied. Does not need to be repeated for
# every ovs_lib method
new_timeout = 5
self.br.vsctl_timeout = new_timeout
self._test_delete_port(new_timeout)
def test_add_flow(self):
ofport = "99"
vid = 4000
lsw_id = 18
cidr = '192.168.1.0/24'
flow_dict_1 = OrderedDict([('priority', 2),
('dl_src', 'ca:fe:de:ad:be:ef'),
('actions', 'strip_vlan,output:0')])
flow_dict_2 = OrderedDict([('priority', 1),
('actions', 'normal')])
flow_dict_3 = OrderedDict([('priority', 2),
('actions', 'drop')])
flow_dict_4 = OrderedDict([('priority', 2),
('in_port', ofport),
('actions', 'drop')])
flow_dict_5 = OrderedDict([
('priority', 4),
('in_port', ofport),
('dl_vlan', vid),
('actions', "strip_vlan,set_tunnel:%s,normal" % (lsw_id))])
flow_dict_6 = OrderedDict([
('priority', 3),
('tun_id', lsw_id),
('actions', "mod_vlan_vid:%s,output:%s" % (vid, ofport))])
flow_dict_7 = OrderedDict([
('priority', 4),
('nw_src', cidr),
('proto', 'arp'),
('actions', 'drop')])
self.br.add_flow(**flow_dict_1)
self.br.add_flow(**flow_dict_2)
self.br.add_flow(**flow_dict_3)
self.br.add_flow(**flow_dict_4)
self.br.add_flow(**flow_dict_5)
self.br.add_flow(**flow_dict_6)
self.br.add_flow(**flow_dict_7)
expected_calls = [
mock.call(["ovs-ofctl", "add-flow", self.BR_NAME,
"hard_timeout=0,idle_timeout=0,"
"priority=2,dl_src=ca:fe:de:ad:be:ef"
",actions=strip_vlan,output:0"],
process_input=None, root_helper=self.root_helper),
mock.call(["ovs-ofctl", "add-flow", self.BR_NAME,
"hard_timeout=0,idle_timeout=0,"
"priority=1,actions=normal"],
process_input=None, root_helper=self.root_helper),
mock.call(["ovs-ofctl", "add-flow", self.BR_NAME,
"hard_timeout=0,idle_timeout=0,"
"priority=2,actions=drop"],
process_input=None, root_helper=self.root_helper),
mock.call(["ovs-ofctl", "add-flow", self.BR_NAME,
"hard_timeout=0,idle_timeout=0,"
"priority=2,in_port=%s,actions=drop" % ofport],
process_input=None, root_helper=self.root_helper),
mock.call(["ovs-ofctl", "add-flow", self.BR_NAME,
"hard_timeout=0,idle_timeout=0,"
"priority=4,dl_vlan=%s,in_port=%s,"
"actions=strip_vlan,set_tunnel:%s,normal"
% (vid, ofport, lsw_id)],
process_input=None, root_helper=self.root_helper),
mock.call(["ovs-ofctl", "add-flow", self.BR_NAME,
"hard_timeout=0,idle_timeout=0,"
"priority=3,tun_id=%s,actions="
"mod_vlan_vid:%s,output:%s"
% (lsw_id, vid, ofport)],
process_input=None, root_helper=self.root_helper),
mock.call(["ovs-ofctl", "add-flow", self.BR_NAME,
"hard_timeout=0,idle_timeout=0,"
"priority=4,nw_src=%s,arp,actions=drop" % cidr],
process_input=None, root_helper=self.root_helper),
]
self.execute.assert_has_calls(expected_calls)
def test_add_flow_timeout_set(self):
flow_dict = OrderedDict([('priority', 1),
('hard_timeout', 1000),
('idle_timeout', 2000),
('actions', 'normal')])
self.br.add_flow(**flow_dict)
self.execute.assert_called_once_with(
["ovs-ofctl", "add-flow", self.BR_NAME,
"hard_timeout=1000,idle_timeout=2000,priority=1,actions=normal"],
process_input=None,
root_helper=self.root_helper)
def test_add_flow_default_priority(self):
flow_dict = OrderedDict([('actions', 'normal')])
self.br.add_flow(**flow_dict)
self.execute.assert_called_once_with(
["ovs-ofctl", "add-flow", self.BR_NAME,
"hard_timeout=0,idle_timeout=0,priority=1,actions=normal"],
process_input=None,
root_helper=self.root_helper)
def test_get_port_ofport(self):
pname = "tap99"
ofport = "6"
self.execute.return_value = ofport
self.assertEqual(self.br.get_port_ofport(pname), ofport)
self.execute.assert_called_once_with(
["ovs-vsctl", self.TO, "get", "Interface", pname, "ofport"],
root_helper=self.root_helper)
def test_get_port_ofport_non_int(self):
pname = "tap99"
ofport = "[]"
self.execute.return_value = ofport
self.assertEqual(self.br.get_port_ofport(pname), const.INVALID_OFPORT)
self.execute.assert_called_once_with(
["ovs-vsctl", self.TO, "get", "Interface", pname, "ofport"],
root_helper=self.root_helper)
def test_get_datapath_id(self):
datapath_id = '"0000b67f4fbcc149"'
self.execute.return_value = datapath_id
self.assertEqual(self.br.get_datapath_id(), datapath_id.strip('"'))
self.execute.assert_called_once_with(
["ovs-vsctl", self.TO, "get",
"Bridge", self.BR_NAME, "datapath_id"],
root_helper=self.root_helper)
def test_count_flows(self):
self.execute.return_value = 'ignore\nflow-1\n'
# counts the number of flows as total lines of output - 2
self.assertEqual(self.br.count_flows(), 1)
self.execute.assert_called_once_with(
["ovs-ofctl", "dump-flows", self.BR_NAME],
root_helper=self.root_helper,
process_input=None)
def test_delete_flow(self):
ofport = "5"
lsw_id = 40
vid = 39
self.br.delete_flows(in_port=ofport)
self.br.delete_flows(tun_id=lsw_id)
self.br.delete_flows(dl_vlan=vid)
expected_calls = [
mock.call(["ovs-ofctl", "del-flows", self.BR_NAME,
"in_port=" + ofport],
process_input=None, root_helper=self.root_helper),
mock.call(["ovs-ofctl", "del-flows", self.BR_NAME,
"tun_id=%s" % lsw_id],
process_input=None, root_helper=self.root_helper),
mock.call(["ovs-ofctl", "del-flows", self.BR_NAME,
"dl_vlan=%s" % vid],
process_input=None, root_helper=self.root_helper),
]
self.execute.assert_has_calls(expected_calls)
def test_delete_flow_with_priority_set(self):
params = {'in_port': '1',
'priority': '1'}
self.assertRaises(exceptions.InvalidInput,
self.br.delete_flows,
**params)
def test_dump_flows(self):
table = 23
nxst_flow = "NXST_FLOW reply (xid=0x4):"
flows = "\n".join([" cookie=0x0, duration=18042.514s, table=0, "
"n_packets=6, n_bytes=468, "
"priority=2,in_port=1 actions=drop",
" cookie=0x0, duration=18027.562s, table=0, "
"n_packets=0, n_bytes=0, "
"priority=3,in_port=1,dl_vlan=100 "
"actions=mod_vlan_vid:1,NORMAL",
" cookie=0x0, duration=18044.351s, table=0, "
"n_packets=9, n_bytes=594, priority=1 "
"actions=NORMAL", " cookie=0x0, "
"duration=18044.211s, table=23, n_packets=0, "
"n_bytes=0, priority=0 actions=drop"])
flow_args = '\n'.join([nxst_flow, flows])
run_ofctl = mock.patch.object(self.br, 'run_ofctl').start()
run_ofctl.side_effect = [flow_args]
retflows = self.br.dump_flows_for_table(table)
self.assertEqual(flows, retflows)
def test_dump_flows_ovs_dead(self):
table = 23
run_ofctl = mock.patch.object(self.br, 'run_ofctl').start()
run_ofctl.side_effect = ['']
retflows = self.br.dump_flows_for_table(table)
self.assertEqual(None, retflows)
def test_mod_flow_with_priority_set(self):
params = {'in_port': '1',
'priority': '1'}
self.assertRaises(exceptions.InvalidInput,
self.br.mod_flow,
**params)
def test_mod_flow_no_actions_set(self):
params = {'in_port': '1'}
self.assertRaises(exceptions.InvalidInput,
self.br.mod_flow,
**params)
def test_defer_apply_flows(self):
flow_expr = mock.patch.object(ovs_lib, '_build_flow_expr_str').start()
flow_expr.side_effect = ['added_flow_1', 'added_flow_2',
'deleted_flow_1']
run_ofctl = mock.patch.object(self.br, 'run_ofctl').start()
self.br.defer_apply_on()
self.br.add_flow(flow='add_flow_1')
self.br.defer_apply_on()
self.br.add_flow(flow='add_flow_2')
self.br.delete_flows(flow='delete_flow_1')
self.br.defer_apply_off()
flow_expr.assert_has_calls([
mock.call({'flow': 'add_flow_1'}, 'add'),
mock.call({'flow': 'add_flow_2'}, 'add'),
mock.call({'flow': 'delete_flow_1'}, 'del')
])
run_ofctl.assert_has_calls([
mock.call('add-flows', ['-'], 'added_flow_1\nadded_flow_2\n'),
mock.call('del-flows', ['-'], 'deleted_flow_1\n')
])
def test_defer_apply_flows_concurrently(self):
flow_expr = mock.patch.object(ovs_lib, '_build_flow_expr_str').start()
flow_expr.side_effect = ['added_flow_1', 'deleted_flow_1',
'modified_flow_1', 'added_flow_2',
'deleted_flow_2', 'modified_flow_2']
run_ofctl = mock.patch.object(self.br, 'run_ofctl').start()
def run_ofctl_fake(cmd, args, process_input=None):
self.br.defer_apply_on()
if cmd == 'add-flows':
self.br.add_flow(flow='added_flow_2')
elif cmd == 'del-flows':
self.br.delete_flows(flow='deleted_flow_2')
elif cmd == 'mod-flows':
self.br.mod_flow(flow='modified_flow_2')
run_ofctl.side_effect = run_ofctl_fake
self.br.defer_apply_on()
self.br.add_flow(flow='added_flow_1')
self.br.delete_flows(flow='deleted_flow_1')
self.br.mod_flow(flow='modified_flow_1')
self.br.defer_apply_off()
run_ofctl.side_effect = None
self.br.defer_apply_off()
flow_expr.assert_has_calls([
mock.call({'flow': 'added_flow_1'}, 'add'),
mock.call({'flow': 'deleted_flow_1'}, 'del'),
mock.call({'flow': 'modified_flow_1'}, 'mod'),
mock.call({'flow': 'added_flow_2'}, 'add'),
mock.call({'flow': 'deleted_flow_2'}, 'del'),
mock.call({'flow': 'modified_flow_2'}, 'mod')
])
run_ofctl.assert_has_calls([
mock.call('add-flows', ['-'], 'added_flow_1\n'),
mock.call('del-flows', ['-'], 'deleted_flow_1\n'),
mock.call('mod-flows', ['-'], 'modified_flow_1\n'),
mock.call('add-flows', ['-'], 'added_flow_2\n'),
mock.call('del-flows', ['-'], 'deleted_flow_2\n'),
mock.call('mod-flows', ['-'], 'modified_flow_2\n')
])
def test_add_tunnel_port(self):
pname = "tap99"
local_ip = "1.1.1.1"
remote_ip = "9.9.9.9"
ofport = "6"
command = ["ovs-vsctl", self.TO, '--', "--may-exist", "add-port",
self.BR_NAME, pname]
command.extend(["--", "set", "Interface", pname])
command.extend(["type=gre", "options:df_default=true",
"options:remote_ip=" + remote_ip,
"options:local_ip=" + local_ip,
"options:in_key=flow",
"options:out_key=flow"])
# Each element is a tuple of (expected mock call, return_value)
expected_calls_and_values = [
(mock.call(command, root_helper=self.root_helper), None),
(mock.call(["ovs-vsctl", self.TO, "get",
"Interface", pname, "ofport"],
root_helper=self.root_helper),
ofport),
]
tools.setup_mock_calls(self.execute, expected_calls_and_values)
self.assertEqual(
self.br.add_tunnel_port(pname, remote_ip, local_ip),
ofport)
tools.verify_mock_calls(self.execute, expected_calls_and_values)
def test_add_vxlan_fragmented_tunnel_port(self):
pname = "tap99"
local_ip = "1.1.1.1"
remote_ip = "9.9.9.9"
ofport = "6"
vxlan_udp_port = "9999"
dont_fragment = False
command = ["ovs-vsctl", self.TO, '--', "--may-exist", "add-port",
self.BR_NAME, pname]
command.extend(["--", "set", "Interface", pname])
command.extend(["type=" + p_const.TYPE_VXLAN,
"options:dst_port=" + vxlan_udp_port,
"options:df_default=false",
"options:remote_ip=" + remote_ip,
"options:local_ip=" + local_ip,
"options:in_key=flow",
"options:out_key=flow"])
# Each element is a tuple of (expected mock call, return_value)
expected_calls_and_values = [
(mock.call(command, root_helper=self.root_helper), None),
(mock.call(["ovs-vsctl", self.TO, "get",
"Interface", pname, "ofport"],
root_helper=self.root_helper),
ofport),
]
tools.setup_mock_calls(self.execute, expected_calls_and_values)
self.assertEqual(
self.br.add_tunnel_port(pname, remote_ip, local_ip,
p_const.TYPE_VXLAN, vxlan_udp_port,
dont_fragment),
ofport)
tools.verify_mock_calls(self.execute, expected_calls_and_values)
def test_add_patch_port(self):
pname = "tap99"
peer = "bar10"
ofport = "6"
# Each element is a tuple of (expected mock call, return_value)
command = ["ovs-vsctl", self.TO, "add-port", self.BR_NAME, pname]
command.extend(["--", "set", "Interface", pname])
command.extend(["type=patch", "options:peer=" + peer])
expected_calls_and_values = [
(mock.call(command, root_helper=self.root_helper),
None),
(mock.call(["ovs-vsctl", self.TO, "get",
"Interface", pname, "ofport"],
root_helper=self.root_helper),
ofport)
]
tools.setup_mock_calls(self.execute, expected_calls_and_values)
self.assertEqual(self.br.add_patch_port(pname, peer), ofport)
tools.verify_mock_calls(self.execute, expected_calls_and_values)
def _test_get_vif_ports(self, is_xen=False):
pname = "tap99"
ofport = "6"
vif_id = uuidutils.generate_uuid()
mac = "ca:fe:de:ad:be:ef"
if is_xen:
external_ids = ('{xs-vif-uuid="%s", attached-mac="%s"}'
% (vif_id, mac))
else:
external_ids = ('{iface-id="%s", attached-mac="%s"}'
% (vif_id, mac))
# Each element is a tuple of (expected mock call, return_value)
expected_calls_and_values = [
(mock.call(["ovs-vsctl", self.TO, "list-ports", self.BR_NAME],
root_helper=self.root_helper),
"%s\n" % pname),
(mock.call(["ovs-vsctl", self.TO, "get",
"Interface", pname, "external_ids"],
root_helper=self.root_helper),
external_ids),
(mock.call(["ovs-vsctl", self.TO, "get",
"Interface", pname, "ofport"],
root_helper=self.root_helper),
ofport),
]
if is_xen:
expected_calls_and_values.append(
(mock.call(["xe", "vif-param-get", "param-name=other-config",
"param-key=nicira-iface-id", "uuid=" + vif_id],
root_helper=self.root_helper),
vif_id)
)
tools.setup_mock_calls(self.execute, expected_calls_and_values)
ports = self.br.get_vif_ports()
self.assertEqual(1, len(ports))
self.assertEqual(ports[0].port_name, pname)
self.assertEqual(ports[0].ofport, ofport)
self.assertEqual(ports[0].vif_id, vif_id)
self.assertEqual(ports[0].vif_mac, mac)
self.assertEqual(ports[0].switch.br_name, self.BR_NAME)
tools.verify_mock_calls(self.execute, expected_calls_and_values)
def _encode_ovs_json(self, headings, data):
# See man ovs-vsctl(8) for the encoding details.
r = {"data": [],
"headings": headings}
for row in data:
ovs_row = []
r["data"].append(ovs_row)
for cell in row:
if isinstance(cell, (str, int, list)):
ovs_row.append(cell)
elif isinstance(cell, dict):
ovs_row.append(["map", cell.items()])
elif isinstance(cell, set):
ovs_row.append(["set", cell])
else:
raise TypeError('%r not int, str, list, set or dict' %
type(cell))
return jsonutils.dumps(r)
def _test_get_vif_port_set(self, is_xen):
if is_xen:
id_key = 'xs-vif-uuid'
else:
id_key = 'iface-id'
headings = ['name', 'external_ids']
data = [
# A vif port on this bridge:
['tap99', {id_key: 'tap99id', 'attached-mac': 'tap99mac'}, 1],
# A vif port on this bridge not yet configured
['tap98', {id_key: 'tap98id', 'attached-mac': 'tap98mac'}, []],
# Another vif port on this bridge not yet configured
['tap97', {id_key: 'tap97id', 'attached-mac': 'tap97mac'},
['set', []]],
# A vif port on another bridge:
['tap88', {id_key: 'tap88id', 'attached-mac': 'tap88id'}, 1],
# Non-vif port on this bridge:
['tun22', {}, 2],
]