diff --git a/neutron/tests/functional/agent/linux/base.py b/neutron/tests/functional/agent/linux/base.py index 44ba6b2fabb..c5ea717f7f7 100644 --- a/neutron/tests/functional/agent/linux/base.py +++ b/neutron/tests/functional/agent/linux/base.py @@ -15,12 +15,20 @@ import os import random +from neutron.agent.linux import ovs_lib from neutron.agent.linux import utils from neutron.plugins.common import constants as q_const from neutron.tests import base +BR_PREFIX = 'test-br' + + class BaseLinuxTestCase(base.BaseTestCase): + def setUp(self, root_helper='sudo'): + super(BaseLinuxTestCase, self).setUp() + + self.root_helper = root_helper def check_command(self, cmd, error_text, skip_msg): try: @@ -38,17 +46,29 @@ class BaseLinuxTestCase(base.BaseTestCase): name = prefix + str(random.randint(1, 0x7fffffff)) return name[:max_length] - def create_ovs_resource(self, name_prefix, creation_func): - """Create a new ovs resource that does not already exist. + def create_resource(self, name_prefix, creation_func, *args, **kwargs): + """Create a new resource that does not already exist. :param name_prefix: The prefix for a randomly generated name :param creation_func: A function taking the name of the resource - to be created. An error is assumed to indicate a name - collision. + to be created as it's first argument. An error is assumed + to indicate a name collision. + :param *args *kwargs: These will be passed to the create function. """ while True: name = self.get_rand_name(q_const.MAX_DEV_NAME_LEN, name_prefix) try: - return creation_func(name) + return creation_func(name, *args, **kwargs) except RuntimeError: continue + + +class BaseOVSLinuxTestCase(BaseLinuxTestCase): + def setUp(self, root_helper='sudo'): + super(BaseOVSLinuxTestCase, self).setUp(root_helper) + self.ovs = ovs_lib.BaseOVS(self.root_helper) + + def create_ovs_bridge(self, br_prefix=BR_PREFIX): + br = self.create_resource(br_prefix, self.ovs.add_bridge) + self.addCleanup(br.destroy) + return br diff --git a/neutron/tests/functional/agent/linux/test_ovs_vxlan.py b/neutron/tests/functional/agent/linux/test_ovs_vxlan.py new file mode 100644 index 00000000000..fbff6a5a2a1 --- /dev/null +++ b/neutron/tests/functional/agent/linux/test_ovs_vxlan.py @@ -0,0 +1,67 @@ +# Copyright 2014 Cisco Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from neutron.agent.linux import ovs_lib +from neutron.plugins.common import constants as p_const +from neutron.tests.functional.agent.linux import base as base_agent + + +PORT_PREFIX = 'testp-' +INVALID_OFPORT_ID = '-1' + + +class TestOVSAgentVXLAN(base_agent.BaseOVSLinuxTestCase): + + def setUp(self): + super(TestOVSAgentVXLAN, self).setUp() + + self._check_test_requirements() + + def _check_test_requirements(self): + self.check_sudo_enabled() + self.check_command(['which', 'ovs-vsctl'], + 'Exit code: 1', 'ovs-vsctl is not installed') + self.check_command(['sudo', '-n', 'ovs-vsctl', 'show'], + 'Exit code: 1', + 'password-less sudo not granted for ovs-vsctl') + + def test_ovs_lib_vxlan_version_check(self): + """Verify VXLAN versions match + + This function compares the return values of functionally checking if + VXLAN is supported with the ovs_lib programmatic check. It will fail + if the two do not align. + """ + expected = self.is_vxlan_supported() + actual = self.is_ovs_lib_vxlan_supported() + self.assertEqual(actual, expected) + + def is_ovs_lib_vxlan_supported(self): + try: + ovs_lib.check_ovs_vxlan_version(self.root_helper) + except SystemError: + return False + else: + return True + + def is_vxlan_supported(self): + bridge = self.create_ovs_bridge() + vxlan_port = self.create_resource( + PORT_PREFIX, + bridge.add_tunnel_port, + "10.10.10.10", + "10.10.10.20", + p_const.TYPE_VXLAN) + + return vxlan_port != INVALID_OFPORT_ID diff --git a/neutron/tests/functional/agent/linux/test_ovsdb_monitor.py b/neutron/tests/functional/agent/linux/test_ovsdb_monitor.py index 816df21401f..3ef5f94110b 100644 --- a/neutron/tests/functional/agent/linux/test_ovsdb_monitor.py +++ b/neutron/tests/functional/agent/linux/test_ovsdb_monitor.py @@ -32,32 +32,23 @@ still able to execute tests that require the capability. import eventlet -from neutron.agent.linux import ovs_lib from neutron.agent.linux import ovsdb_monitor from neutron.tests.functional.agent.linux import base as base_agent -class BaseMonitorTest(base_agent.BaseLinuxTestCase): +class BaseMonitorTest(base_agent.BaseOVSLinuxTestCase): def setUp(self): - super(BaseMonitorTest, self).setUp() + # Emulate using a rootwrap script with sudo + super(BaseMonitorTest, self).setUp(root_helper='sudo sudo') self._check_test_requirements() - - # Emulate using a rootwrap script with sudo - self.root_helper = 'sudo sudo' - self.ovs = ovs_lib.BaseOVS(self.root_helper) - self.bridge = self.create_ovs_resource('test-br-', self.ovs.add_bridge) - - def cleanup_bridge(): - self.bridge.destroy() - self.addCleanup(cleanup_bridge) + self.bridge = self.create_ovs_bridge() def _check_test_requirements(self): self.check_sudo_enabled() self.check_command(['which', 'ovsdb-client'], - 'Exit code: 1', - 'ovsdb-client is not installed') + 'Exit code: 1', 'ovsdb-client is not installed') self.check_command(['sudo', '-n', 'ovsdb-client', 'list-dbs'], 'Exit code: 1', 'password-less sudo not granted for ovsdb-client') @@ -110,7 +101,7 @@ class TestSimpleInterfaceMonitor(BaseMonitorTest): 'Initial call should always be true') self.assertFalse(self.monitor.has_updates, 'has_updates without port addition should be False') - self.create_ovs_resource('test-port-', self.bridge.add_port) + self.create_resource('test-port-', self.bridge.add_port) with self.assert_max_execution_time(): # has_updates after port addition should become True while not self.monitor.has_updates: