Add common base class for agent functional tests

Refactor the functional tests a bit to add a common base class for agent
tests. This is needed an upcoming commit which adds a functional test
for VXLAN version checking.

Closes-Bug: #1301363

Change-Id: I438e401cbfb40e5557e5125f2a409bac5c1fd1c2
This commit is contained in:
Kyle Mestery 2014-04-02 07:17:37 +00:00
parent 9009a2296b
commit 83a11d406e
3 changed files with 68 additions and 45 deletions

View File

@ -80,3 +80,6 @@ TYPE_LOCAL = 'local'
TYPE_VXLAN = 'vxlan' TYPE_VXLAN = 'vxlan'
TYPE_VLAN = 'vlan' TYPE_VLAN = 'vlan'
TYPE_NONE = 'none' TYPE_NONE = 'none'
# The maximum length of an interface name (in Linux)
MAX_DEV_NAME_LEN = 16

View File

@ -0,0 +1,54 @@
# Copyright 2014 Cisco Systems, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import random
from neutron.agent.linux import utils
from neutron.plugins.common import constants as q_const
from neutron.tests import base
class BaseLinuxTestCase(base.BaseTestCase):
def check_command(self, cmd, error_text, skip_msg):
try:
utils.execute(cmd)
except RuntimeError as e:
if error_text in str(e):
self.skipTest(skip_msg)
raise
def check_sudo_enabled(self):
if os.environ.get('OS_SUDO_TESTING') not in base.TRUE_STRING:
self.skipTest('testing with sudo is not enabled')
def get_rand_name(self, max_length, prefix='test'):
name = prefix + str(random.randint(1, 0x7fffffff))
return name[:max_length]
def create_ovs_resource(self, name_prefix, creation_func):
"""Create a new ovs resource that does not already exist.
:param name_prefix: The prefix for a randomly generated name
:param creation_func: A function taking the name of the resource
to be created. An error is assumed to indicate a name
collision.
"""
while True:
name = self.get_rand_name(q_const.MAX_DEV_NAME_LEN, name_prefix)
try:
return creation_func(name)
except RuntimeError:
continue

View File

@ -30,39 +30,14 @@ runs, but configuring OS_SUDO_TESTING ensures that developers are
still able to execute tests that require the capability. still able to execute tests that require the capability.
""" """
import os
import random
import eventlet import eventlet
from neutron.agent.linux import ovs_lib from neutron.agent.linux import ovs_lib
from neutron.agent.linux import ovsdb_monitor from neutron.agent.linux import ovsdb_monitor
from neutron.agent.linux import utils from neutron.tests.functional.agent.linux import base as base_agent
from neutron.tests import base
def get_rand_name(name='test'): class BaseMonitorTest(base_agent.BaseLinuxTestCase):
return name + str(random.randint(1, 0x7fffffff))
def create_ovs_resource(name_prefix, creation_func):
"""Create a new ovs resource that does not already exist.
:param name_prefix: The prefix for a randomly generated name
:param creation_func: A function taking the name of the resource
to be created. An error is assumed to indicate a name
collision.
"""
while True:
name = get_rand_name(name_prefix)
try:
return creation_func(name)
except RuntimeError:
continue
break
class BaseMonitorTest(base.BaseTestCase):
def setUp(self): def setUp(self):
super(BaseMonitorTest, self).setUp() super(BaseMonitorTest, self).setUp()
@ -72,29 +47,20 @@ class BaseMonitorTest(base.BaseTestCase):
# Emulate using a rootwrap script with sudo # Emulate using a rootwrap script with sudo
self.root_helper = 'sudo sudo' self.root_helper = 'sudo sudo'
self.ovs = ovs_lib.BaseOVS(self.root_helper) self.ovs = ovs_lib.BaseOVS(self.root_helper)
self.bridge = create_ovs_resource('test-br-', self.ovs.add_bridge) self.bridge = self.create_ovs_resource('test-br-', self.ovs.add_bridge)
def cleanup_bridge(): def cleanup_bridge():
self.bridge.destroy() self.bridge.destroy()
self.addCleanup(cleanup_bridge) self.addCleanup(cleanup_bridge)
def _check_command(self, cmd, error_text, skip_msg):
try:
utils.execute(cmd)
except RuntimeError as e:
if error_text in str(e):
self.skipTest(skip_msg)
raise
def _check_test_requirements(self): def _check_test_requirements(self):
if os.environ.get('OS_SUDO_TESTING') not in base.TRUE_STRING: self.check_sudo_enabled()
self.skipTest('testing with sudo is not enabled') self.check_command(['which', 'ovsdb-client'],
self._check_command(['which', 'ovsdb-client'], 'Exit code: 1',
'Exit code: 1', 'ovsdb-client is not installed')
'ovsdb-client is not installed') self.check_command(['sudo', '-n', 'ovsdb-client', 'list-dbs'],
self._check_command(['sudo', '-n', 'ovsdb-client', 'list-dbs'], 'Exit code: 1',
'Exit code: 1', 'password-less sudo not granted for ovsdb-client')
'password-less sudo not granted for ovsdb-client')
class TestOvsdbMonitor(BaseMonitorTest): class TestOvsdbMonitor(BaseMonitorTest):
@ -144,7 +110,7 @@ class TestSimpleInterfaceMonitor(BaseMonitorTest):
'Initial call should always be true') 'Initial call should always be true')
self.assertFalse(self.monitor.has_updates, self.assertFalse(self.monitor.has_updates,
'has_updates without port addition should be False') 'has_updates without port addition should be False')
create_ovs_resource('test-port-', self.bridge.add_port) self.create_ovs_resource('test-port-', self.bridge.add_port)
with self.assert_max_execution_time(): with self.assert_max_execution_time():
# has_updates after port addition should become True # has_updates after port addition should become True
while not self.monitor.has_updates: while not self.monitor.has_updates: