Refactor Tempest to be parallel-test friendly

* Allows a tenant/user to be created by the base test
  class, allowing isolation of the tests. The base
  compute test case class now controls what users
  and tenants get created for the test case
* Moves identity admin stuff into appropriate directory
* Removes _multiprocess_shared_ to force setUpClass to
  run for every test case.
* Adds setup.cfg for use with openstack.nose_plugin
* run_test.sh refactoring to make running smoketests easier

Change-Id: I3ea4c44a8c194d7ae2833b5443df7678924bcd5d
This commit is contained in:
Jay Pipes 2012-06-21 13:37:35 -04:00
parent cf0040ca02
commit f38eaace9c
41 changed files with 724 additions and 469 deletions

View File

@ -27,20 +27,26 @@ strategy = keystone
# This section contains configuration options used when executing tests
# against the OpenStack Compute API.
# Allows test cases to create/destroy tenants and users. This option
# enables isolated test cases and better parallel execution,
# but also requires that OpenStack Identity API admin credentials
# are known.
allow_tenant_isolation = true
# This should be the username of a user WITHOUT administrative privileges
username = {$USERNAME}
username = demo
# The above non-administrative user's password
password = {$PASSWORD}
password = pass
# The above non-administrative user's tenant name
tenant_name = {$TENANT_NAME}
tenant_name = demo
# This should be the username of an alternate user WITHOUT
# administrative privileges
alt_username = {$ALT_USERNAME}
alt_username = alt_demo
# The above non-administrative user's password
alt_password = {$ALT_PASSWORD}
alt_password = pass
# The above non-administrative user's tenant name
alt_tenant_name = {$ALT_TENANT_NAME}
alt_tenant_name = alt_demo
# Reference data for tests. The ref and ref_alt should be
# distinct images/flavors.
@ -109,11 +115,11 @@ host = 127.0.0.1
port = 9292
# This should be the username of a user WITHOUT administrative privileges
username = {$USERNAME}
username = demo
# The above non-administrative user's password
password = {$PASSWORD}
password = pass
# The above non-administrative user's tenant name
tenant_name = {$TENANT_NAME}
tenant_name = demo
[compute-admin]
# This section contains configuration options for an administrative
@ -121,14 +127,26 @@ tenant_name = {$TENANT_NAME}
# the admin-only parts of the Compute API
# This should be the username of a user WITH administrative privileges
username = {$ADMIN_USERNAME}
username = admin
# The above administrative user's password
password = {$ADMIN_PASSWORD}
password = pass
# The above administrative user's tenant name
tenant_name = {$ADMIN_TENANT_NAME}
tenant_name = admin
[network]
# This section contains configuration options used when executing tests
# against the OpenStack Network API.
api_version = v1.1
catalog_type = network
[identity-admin]
# This section contains configuration options for an administrative
# user of the Compute API. These options are used in tests that stress
# the admin-only parts of the Compute API
# This should be the username of a user WITH administrative privileges
username = admin
# The above administrative user's password
password = pass
# The above administrative user's tenant name
tenant_name = admin

View File

@ -23,6 +23,12 @@ strategy = %IDENTITY_STRATEGY%
# This section contains configuration options used when executing tests
# against the OpenStack Compute API.
# Allows test cases to create/destroy tenants and users. This option
# enables isolated test cases and better parallel execution,
# but also requires that OpenStack Identity API admin credentials
# are known.
allow_tenant_isolation = %COMPUTE_ALLOW_TENANT_ISOLATION%
# This should be the username of a user WITHOUT administrative privileges
username = %USERNAME%
# The above non-administrative user's password
@ -100,8 +106,20 @@ tenant_name = %TENANT_NAME%
# the admin-only parts of the Compute API
# This should be the username of a user WITH administrative privileges
username = %ADMIN_USERNAME%
username = %COMPUTE_ADMIN_USERNAME%
# The above administrative user's password
password = %ADMIN_PASSWORD%
password = %COMPUTE_ADMIN_PASSWORD%
# The above administrative user's tenant name
tenant_name = %ADMIN_TENANT_NAME%
tenant_name = %COMPUTE_ADMIN_TENANT_NAME%
[identity-admin]
# This section contains configuration options for an administrative
# user of the Compute API. These options are used in tests that stress
# the admin-only parts of the Compute API
# This should be the username of a user WITH administrative privileges
username = %IDENTITY_ADMIN_USERNAME%
# The above administrative user's password
password = %IDENTITY_ADMIN_PASSWORD%
# The above administrative user's tenant name
tenant_name = %IDENTITY_ADMIN_TENANT_NAME%

View File

@ -1,41 +1,61 @@
#!/bin/bash
function usage {
echo "Usage: [OPTIONS] [SUITES]"
echo "Run all of the test suites"
echo "Usage: $0 [OPTION]..."
echo "Run Tempest test suite"
echo ""
echo " -s, --smoke Only run smoke tests"
echo " -p, --pep8 Just run pep8"
echo " -h, --help Print this usage message"
echo ""
echo " The suites should be listed by the name of their directory."
echo " All other options are passed directly to the suites."
echo " -d. --debug Debug this script -- set -o xtrace"
exit
}
function process_option {
case "$1" in
-h|--help) usage;;
-*|--*) test_opts="$test_opts $1";;
*) tests="$tests $1"
-d|--debug) set -o xtrace;;
-p|--pep8) let just_pep8=1;;
-s|--smoke) noseargs="$noseargs --attr=type=smoke";;
*) noseargs="$noseargs $1"
esac
}
noseargs="tempest"
just_pep8=0
export NOSE_WITH_OPENSTACK=1
export NOSE_OPENSTACK_COLOR=1
export NOSE_OPENSTACK_RED=15.00
export NOSE_OPENSTACK_YELLOW=3.00
export NOSE_OPENSTACK_SHOW_ELAPSED=1
export NOSE_OPENSTACK_STDOUT=1
for arg in "$@"; do
process_option $arg
done
echo $test_opts
function run_tests {
base_dir=$(dirname $0)
for test_dir in $tests
do
test_cmd="${base_dir}/${test_dir}/run_tests.sh ${test_opts}"
echo "~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~"
echo $test_cmd
echo "~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~"
$test_cmd
done
$NOSETESTS
}
function run_pep8 {
echo "Running pep8 ..."
PEP8_EXCLUDE="kong,etc,include,tools"
PEP8_OPTIONS="--exclude=$PEP8_EXCLUDE --repeat"
PEP8_INCLUDE="."
pep8 $PEP8_OPTIONS $PEP8_INCLUDE
}
NOSETESTS="nosetests $noseargs"
if [ $just_pep8 -eq 1 ]; then
run_pep8
exit
fi
run_tests || exit
if [ -z "$noseargs" ]; then
run_pep8
fi

7
setup.cfg Normal file
View File

@ -0,0 +1,7 @@
[nosetests]
# NOTE(jkoelker) To run the test suite under nose install the following
# coverage http://pypi.python.org/pypi/coverage
# tissue http://pypi.python.org/pypi/tissue (pep8 checker)
# openstack-nose https://github.com/openstack-dev/openstack-nose
verbosity=2
detailed-errors=1

View File

@ -90,10 +90,40 @@ class IdentityConfig(BaseConfig):
return self.get("strategy", 'keystone')
class IdentityAdminConfig(BaseConfig):
SECTION_NAME = "identity-admin"
@property
def username(self):
"""Username to use for Identity Admin API requests"""
return self.get("username", "admin")
@property
def tenant_name(self):
"""Tenant name to use for Identity Admin API requests"""
return self.get("tenant_name", "admin")
@property
def password(self):
"""API key to use for Identity Admin API requests"""
return self.get("password", "pass")
class ComputeConfig(BaseConfig):
SECTION_NAME = "compute"
@property
def allow_tenant_isolation(self):
"""
Allows test cases to create/destroy tenants and users. This option
enables isolated test cases and better parallel execution,
but also requires that OpenStack Identity API admin credentials
are known.
"""
return self.get("allow_tenant_isolation", 'false').lower() != 'false'
@property
def username(self):
"""Username to use for Nova API requests."""
@ -323,6 +353,7 @@ class TempestConfig:
self.compute = ComputeConfig(self._conf)
self.compute_admin = ComputeAdminConfig(self._conf)
self.identity = IdentityConfig(self._conf)
self.identity_admin = IdentityAdminConfig(self._conf)
self.images = ImagesConfig(self._conf)
self.network = NetworkConfig(self._conf)

View File

@ -17,7 +17,7 @@
import logging
import tempest.config
from tempest import config
from tempest import exceptions
from tempest.services.image import service as image_service
from tempest.services.network.json.network_client import NetworkClient
@ -33,8 +33,6 @@ from tempest.services.nova.json.keypairs_client import KeyPairsClient
from tempest.services.nova.json.volumes_client import VolumesClient
from tempest.services.nova.json.console_output_client \
import ConsoleOutputsClient
from tempest.services.identity.json.admin_client import AdminClient
from tempest.services.identity.json.admin_client import TokenClient
LOG = logging.getLogger(__name__)
@ -55,8 +53,10 @@ class Manager(object):
:param password: Override of the password
:param tenant_name: Override of the tenant name
"""
self.config = tempest.config.TempestConfig()
self.config = config.TempestConfig()
# If no creds are provided, we fall back on the defaults
# in the config file for the Compute API.
username = username or self.config.compute.username
password = password or self.config.compute.password
tenant_name = tenant_name or self.config.compute.tenant_name
@ -85,8 +85,6 @@ class Manager(object):
self.floating_ips_client = FloatingIPsClient(*client_args)
self.volumes_client = VolumesClient(*client_args)
self.console_outputs_client = ConsoleOutputsClient(*client_args)
self.admin_client = AdminClient(*client_args)
self.token_client = TokenClient(self.config)
self.network_client = NetworkClient(*client_args)
@ -98,7 +96,7 @@ class AltManager(Manager):
"""
def __init__(self):
conf = tempest.config.TempestConfig()
conf = config.TempestConfig()
super(AltManager, self).__init__(conf.compute.alt_username,
conf.compute.alt_password,
conf.compute.alt_tenant_name)
@ -112,11 +110,10 @@ class AdminManager(Manager):
"""
def __init__(self):
conf = tempest.config.TempestConfig()
conf = config.TempestConfig()
super(AdminManager, self).__init__(conf.compute_admin.username,
conf.compute_admin.password,
conf.compute_admin.tenant_name)
# TODO(jaypipes): Add Admin-Only client class construction below...
class ServiceManager(object):
@ -126,7 +123,7 @@ class ServiceManager(object):
"""
def __init__(self):
self.config = tempest.config.TempestConfig()
self.config = config.TempestConfig()
self.services = {}
self.services['image'] = image_service.Service(self.config)
self.images = self.services['image']

View File

@ -0,0 +1,79 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 OpenStack, LLC
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import nose
from tempest import config
from tempest import openstack
LOG = logging.getLogger(__name__)
CONFIG = config.TempestConfig()
CREATE_IMAGE_ENABLED = CONFIG.compute.create_image_enabled
RESIZE_AVAILABLE = CONFIG.compute.resize_available
DISK_CONFIG_ENABLED = False
FLAVOR_EXTRA_DATA_ENABLED = False
MULTI_USER = False
# All compute tests -- single setup function
def setup_package():
LOG.debug("Entering tempest.tests.compute.setup_package")
global MULTI_USER, DISK_CONFIG_ENABLED, FLAVOR_EXTRA_DATA_ENABLED
os = openstack.Manager()
images_client = os.images_client
flavors_client = os.flavors_client
extensions_client = os.extensions_client
DISK_CONFIG_ENABLED = extensions_client.is_enabled('DiskConfig')
FLAVOR_EXTRA_DATA_ENABLED = extensions_client.is_enabled('FlavorExtraData')
# Validate reference data exists
# If not, we raise the exception here and prevent
# going forward...
try:
image_ref = CONFIG.compute.image_ref
image_ref_alt = CONFIG.compute.image_ref_alt
images_client.get_image(image_ref)
images_client.get_image(image_ref_alt)
flavor_ref = CONFIG.compute.flavor_ref
flavor_ref_alt = CONFIG.compute.flavor_ref_alt
flavors_client.get_flavor_details(flavor_ref)
flavors_client.get_flavor_details(flavor_ref_alt)
except Exception as e:
msg = "Failed basic configuration: %s" % e
raise nose.SkipTest(msg)
# Determine if there are two regular users that can be
# used in testing. If the test cases are allowed to create
# users (config.compute.allow_tenant_isolation is true,
# then we allow multi-user.
if CONFIG.compute.allow_tenant_isolation:
MULTI_USER = True
else:
user1 = CONFIG.compute.username
user2 = CONFIG.compute.alt_username
if user2 and user1 != user2:
user2_password = CONFIG.compute.alt_password
user2_tenant_name = CONFIG.compute.alt_tenant_name
if not user2_password or user2_tenant_name:
msg = ("Alternate user specified but not alternate "
"tenant or password") % e
raise nose.SkipTest(msg)
MULTI_USER = True

View File

@ -15,16 +15,15 @@
# License for the specific language governing permissions and limitations
# under the License.
import nose
from nose.plugins.attrib import attr
from nose import SkipTest
import tempest.config
from tempest import exceptions
from tempest import openstack
from tempest.tests.compute.base import BaseComputeTest
from tempest.tests.compute.base import BaseComputeAdminTest
from tempest.tests import compute
class FlavorsAdminTest(BaseComputeTest):
class FlavorsAdminTest(BaseComputeAdminTest):
"""
Tests Flavors API Create and Delete that require admin privileges
@ -32,37 +31,32 @@ class FlavorsAdminTest(BaseComputeTest):
@classmethod
def setUpClass(cls):
cls.config = tempest.config.TempestConfig()
cls.admin_username = cls.config.compute_admin.username
cls.admin_password = cls.config.compute_admin.password
cls.admin_tenant = cls.config.compute_admin.tenant_name
if not compute.FLAVOR_EXTRA_DATA_ENABLED:
msg = "FlavorExtraData extension not enabled."
raise nose.SkipTest(msg)
if not cls.admin_username and cls.admin_password and cls.admin_tenant:
raise SkipTest("Missing Admin credentials in configuration")
else:
cls.admin_os = openstack.AdminManager()
cls.admin_client = cls.admin_os.flavors_client
cls.flavor_name = 'test_flavor'
cls.ram = 512
cls.vcpus = 1
cls.disk = 10
cls.ephemeral = 10
cls.new_flavor_id = 1234
cls.swap = 1024
cls.rxtx = 1
super(FlavorsAdminTest, cls).setUpClass()
cls.client = cls.os.flavors_client
cls.flavor_name = 'test_flavor'
cls.ram = 512
cls.vcpus = 1
cls.disk = 10
cls.ephemeral = 10
cls.new_flavor_id = 1234
cls.swap = 1024
cls.rxtx = 1
@attr(type='positive')
def test_create_flavor(self):
"""Create a flavor and ensure it is listed
This operation requires the user to have 'admin' role"""
#Create the flavor
resp, flavor = self.admin_client.create_flavor(self.flavor_name,
self.ram, self.vcpus,
self.disk,
self.ephemeral,
self.new_flavor_id,
self.swap, self.rxtx)
resp, flavor = self.client.create_flavor(self.flavor_name,
self.ram, self.vcpus,
self.disk,
self.ephemeral,
self.new_flavor_id,
self.swap, self.rxtx)
self.assertEqual(200, resp.status)
self.assertEqual(flavor['name'], self.flavor_name)
self.assertEqual(flavor['vcpus'], self.vcpus)
@ -74,29 +68,28 @@ class FlavorsAdminTest(BaseComputeTest):
self.assertEqual(flavor['OS-FLV-EXT-DATA:ephemeral'], self.ephemeral)
#Verify flavor is retrieved
resp, flavor = self.admin_client.get_flavor_details(self.new_flavor_id)
resp, flavor = self.client.get_flavor_details(self.new_flavor_id)
self.assertEqual(resp.status, 200)
self.assertEqual(flavor['name'], self.flavor_name)
#Delete the flavor
resp, body = self.admin_client.delete_flavor(flavor['id'])
resp, body = self.client.delete_flavor(flavor['id'])
self.assertEqual(resp.status, 202)
@attr(type='positive')
def test_create_flavor_verify_entry_in_list_details(self):
"""Create a flavor and ensure it's details are listed
This operation requires the user to have 'admin' role"""
#Create the flavor
resp, flavor = self.admin_client.create_flavor(self.flavor_name,
self.ram, self.vcpus,
self.disk,
self.ephemeral,
self.new_flavor_id,
self.swap, self.rxtx)
resp, flavor = self.client.create_flavor(self.flavor_name,
self.ram, self.vcpus,
self.disk,
self.ephemeral,
self.new_flavor_id,
self.swap, self.rxtx)
flag = False
#Verify flavor is retrieved
resp, flavors = self.admin_client.list_flavors_with_detail()
resp, flavors = self.client.list_flavors_with_detail()
self.assertEqual(resp.status, 200)
for flavor in flavors:
if flavor['name'] == self.flavor_name:
@ -104,33 +97,32 @@ class FlavorsAdminTest(BaseComputeTest):
self.assertTrue(flag)
#Delete the flavor
resp, body = self.admin_client.delete_flavor(self.new_flavor_id)
resp, body = self.client.delete_flavor(self.new_flavor_id)
self.assertEqual(resp.status, 202)
@attr(type='negative')
def test_get_flavor_details_for_deleted_flavor(self):
"""Delete a flavor and ensure it is not listed"""
# Create a test flavor
resp, flavor = self.admin_client.create_flavor(self.flavor_name,
self.ram,
self.vcpus, self.disk,
self.ephemeral,
self.new_flavor_id,
self.swap, self.rxtx)
resp, flavor = self.client.create_flavor(self.flavor_name,
self.ram,
self.vcpus, self.disk,
self.ephemeral,
self.new_flavor_id,
self.swap, self.rxtx)
self.assertEquals(200, resp.status)
# Delete the flavor
resp, _ = self.admin_client.delete_flavor(self.new_flavor_id)
resp, _ = self.client.delete_flavor(self.new_flavor_id)
self.assertEqual(resp.status, 202)
# Deleted flavors can be seen via detailed GET
resp, flavor = self.admin_client.get_flavor_details(self.new_flavor_id)
resp, flavor = self.client.get_flavor_details(self.new_flavor_id)
self.assertEqual(resp.status, 200)
self.assertEqual(flavor['name'], self.flavor_name)
# Deleted flavors should not show up in a list however
resp, flavors = self.admin_client.list_flavors_with_detail()
resp, flavors = self.client.list_flavors_with_detail()
self.assertEqual(resp.status, 200)
flag = True
for flavor in flavors:

View File

@ -15,98 +15,120 @@
# License for the specific language governing permissions and limitations
# under the License.
import logging
import time
import unittest2 as unittest
from tempest import config
from tempest import exceptions
from tempest import openstack
from tempest.common.utils.data_utils import rand_name
from tempest.services.identity.json.admin_client import AdminClient
LOG = logging.getLogger(__name__)
class BaseComputeTest(unittest.TestCase):
os = openstack.Manager()
servers_client = os.servers_client
flavors_client = os.flavors_client
images_client = os.images_client
extensions_client = os.extensions_client
floating_ips_client = os.floating_ips_client
keypairs_client = os.keypairs_client
floating_ips_client = os.floating_ips_client
security_groups_client = os.security_groups_client
console_outputs_client = os.console_outputs_client
limits_client = os.limits_client
volumes_client = os.volumes_client
config = os.config
build_interval = config.compute.build_interval
build_timeout = config.compute.build_timeout
ssh_user = config.compute.ssh_user
servers = []
"""Base test case class for all Compute API tests"""
# Validate reference data exists
# If not, attempt to auto-configure
try:
image_ref = config.compute.image_ref
image_ref_alt = config.compute.image_ref_alt
images_client.get_image(image_ref)
images_client.get_image(image_ref_alt)
except:
# Make a reasonable attempt to get usable images
params = {'status': 'ACTIVE'}
_, images = images_client.list_images_with_detail(params)
if len(images) is 0:
message = "No usable image exists. Upload an image to Glance."
raise exceptions.InvalidConfiguration(message=message)
if len(images) is 1:
image_ref = images[0]['id']
image_ref_alt = images[0]['id']
@classmethod
def setUpClass(cls):
cls.config = config.TempestConfig()
cls.isolated_creds = []
if cls.config.compute.allow_tenant_isolation:
creds = cls._get_isolated_creds()
username, tenant_name, password = creds
os = openstack.Manager(username=username,
password=password,
tenant_name=tenant_name)
else:
# Try to determine if this is a devstack environment.
# If so, some of the images are not usable
os = openstack.Manager()
# For now, the useable image in devstack has this property
usable = [i for i in images if 'ramdisk_id' in i['metadata']]
if len(usable) > 0:
# We've found at least one image we can use
image_ref = usable[0]['id']
image_ref_alt = usable[0]['id']
else:
# We've done our due dillegence, take the first two images
image_ref = images[0]['id']
image_ref_alt = images[1]['id']
cls.os = os
cls.servers_client = os.servers_client
cls.flavors_client = os.flavors_client
cls.images_client = os.images_client
cls.extensions_client = os.extensions_client
cls.floating_ips_client = os.floating_ips_client
cls.keypairs_client = os.keypairs_client
cls.floating_ips_client = os.floating_ips_client
cls.security_groups_client = os.security_groups_client
cls.console_outputs_client = os.console_outputs_client
cls.limits_client = os.limits_client
cls.volumes_client = os.volumes_client
cls.build_interval = cls.config.compute.build_interval
cls.build_timeout = cls.config.compute.build_timeout
cls.ssh_user = cls.config.compute.ssh_user
cls.image_ref = cls.config.compute.image_ref
cls.image_ref_alt = cls.config.compute.image_ref_alt
cls.flavor_ref = cls.config.compute.flavor_ref
cls.flavor_ref_alt = cls.config.compute.flavor_ref_alt
cls.servers = []
try:
flavor_ref = config.compute.flavor_ref
flavor_ref_alt = config.compute.flavor_ref_alt
flavors_client.get_flavor_details(flavor_ref)
flavors_client.get_flavor_details(flavor_ref_alt)
except:
# Reload both with new values
# Sort so the smallest flavors are used. This is for efficiency.
_, flavors = flavors_client.list_flavors_with_detail()
flavors = sorted(flavors, key=lambda k: k['ram'])
@classmethod
def _get_identity_admin_client(cls):
"""
Returns an instance of the Identity Admin API client
"""
client_args = (cls.config,
cls.config.identity_admin.username,
cls.config.identity_admin.password,
cls.config.identity.auth_url)
tenant_name = cls.config.identity_admin.tenant_name
admin_client = AdminClient(*client_args, tenant_name=tenant_name)
return admin_client
if len(flavors) is 0:
message = "No flavors exists. Add flavors via the admin API."
raise exceptions.InvalidConfiguration(message=message)
if len(flavors) is 1:
flavor_ref = flavors[0]['id']
flavor_ref_alt = flavors[0]['id']
else:
flavor_ref = flavors[0]['id']
# Make sure the second flavor does not have the same RAM
for i in range(1, len(flavors)):
if flavors[i] == flavors[-1]:
# We've tried. Take the last flavor
flavor_ref_alt = flavors[i]['id']
else:
if flavors[i]['ram'] > flavors[0]['ram']:
flavor_ref_alt = flavors[i]['id']
break
@classmethod
def _get_isolated_creds(cls):
"""
Creates a new set of user/tenant/password credentials for a
**regular** user of the Compute API so that a test case can
operate in an isolated tenant container.
"""
admin_client = cls._get_identity_admin_client()
rand_name_root = cls.__name__
if cls.isolated_creds:
# Main user already created. Create the alt one...
rand_name_root += '-alt'
username = rand_name_root + "-user"
email = rand_name_root + "@example.com"
tenant_name = rand_name_root + "-tenant"
tenant_desc = tenant_name + "-desc"
password = "pass"
resp, tenant = admin_client.create_tenant(name=tenant_name,
description=tenant_desc)
resp, user = admin_client.create_user(username,
password,
tenant['id'],
email)
# Store the complete creds (including UUID ids...) for later
# but return just the username, tenant_name, password tuple
# that the various clients will use.
cls.isolated_creds.append((user, tenant))
return username, tenant_name, password
@classmethod
def clear_isolated_creds(cls):
if not cls.isolated_creds:
pass
admin_client = cls._get_identity_admin_client()
for user, tenant in cls.isolated_creds:
admin_client.delete_user(user['id'])
admin_client.delete_tenant(tenant['id'])
@classmethod
def tearDownClass(cls):
cls.clear_isolated_creds()
def create_server(self, image_id=None):
"""Wrapper utility that returns a test server"""
server_name = rand_name('test-vm-')
server_name = rand_name(self.__class__.__name__ + "-instance")
flavor = self.flavor_ref
if not image_id:
image_id = self.image_ref
@ -131,3 +153,22 @@ class BaseComputeTest(unittest.TestCase):
condition()
return
time.sleep(self.build_interval)
class BaseComputeAdminTest(unittest.TestCase):
"""Base test case class for all Compute Admin API tests"""
@classmethod
def setUpClass(cls):
cls.config = config.TempestConfig()
cls.admin_username = cls.config.compute_admin.username
cls.admin_password = cls.config.compute_admin.password
cls.admin_tenant = cls.config.compute_admin.tenant_name
if not cls.admin_username and cls.admin_password and cls.admin_tenant:
msg = ("Missing Compute Admin API credentials "
"in configuration.")
raise nose.SkipTest(msg)
cls.os = openstack.AdminManager()

View File

@ -22,249 +22,223 @@ import unittest2 as unittest
from tempest import openstack
from tempest.common.utils.data_utils import rand_name, parse_image_id
from tempest import exceptions
from tempest.tests import utils
from tempest.tests.compute.base import BaseComputeTest
from tempest.tests import compute
class AuthorizationTest(unittest.TestCase):
class AuthorizationTest(BaseComputeTest):
@classmethod
def setUpClass(cls):
cls.os = openstack.Manager()
if not compute.MULTI_USER:
msg = "Need >1 user"
raise nose.SkipTest(msg)
super(AuthorizationTest, cls).setUpClass()
cls.client = cls.os.servers_client
cls.images_client = cls.os.images_client
cls.keypairs_client = cls.os.keypairs_client
cls.security_client = cls.os.security_groups_client
cls.console_outputs_client = cls.os.console_outputs_client
cls.config = cls.os.config
cls.image_ref = cls.config.compute.image_ref
cls.flavor_ref = cls.config.compute.flavor_ref
cls.image_ref_alt = cls.config.compute.image_ref_alt
cls.flavor_ref_alt = cls.config.compute.flavor_ref_alt
# Verify the second user is not the same as the first and is configured
cls.user1 = cls.config.compute.username
cls.user2 = cls.config.compute.alt_username
cls.user2_password = cls.config.compute.alt_password
cls.user2_tenant_name = cls.config.compute.alt_tenant_name
cls.multi_user = False
if cls.config.compute.allow_tenant_isolation:
creds = cls._get_isolated_creds()
username, tenant_name, password = creds
cls.alt_manager = openstack.Manager(username=username,
password=password,
tenant_name=tenant_name)
else:
# Use the alt_XXX credentials in the config file
cls.alt_manager = openstack.AltManager()
if (cls.user2 != None and cls.user1 != cls.user2
and cls.user2_password != None
and cls.user2_tenant_name != None):
cls.alt_client = cls.alt_manager.servers_client
cls.alt_images_client = cls.alt_manager.images_client
cls.alt_keypairs_client = cls.alt_manager.keypairs_client
cls.alt_security_client = cls.alt_manager.security_groups_client
cls.alt_console_outputs_client = cls.alt_manager.console_outputs_client
try:
cls.other_manager = openstack.AltManager()
cls.other_client = cls.other_manager.servers_client
cls.other_images_client = cls.other_manager.images_client
cls.other_keypairs_client = cls.other_manager.keypairs_client
cls.other_security_client = \
cls.other_manager.security_groups_client
cls.other_console_outputs_client = \
cls.other_manager.console_outputs_client
except exceptions.AuthenticationFailure:
# multi_user is already set to false, just fall through
pass
else:
cls.multi_user = True
cls.alt_security_client._set_auth()
name = rand_name('server')
resp, server = cls.client.create_server(name, cls.image_ref,
cls.flavor_ref)
cls.client.wait_for_server_status(server['id'], 'ACTIVE')
resp, cls.server = cls.client.get_server(server['id'])
cls.other_security_client._set_auth()
name = rand_name('server')
resp, server = cls.client.create_server(name, cls.image_ref,
cls.flavor_ref)
cls.client.wait_for_server_status(server['id'], 'ACTIVE')
resp, cls.server = cls.client.get_server(server['id'])
name = rand_name('image')
resp, body = cls.client.create_image(server['id'], name)
image_id = parse_image_id(resp['location'])
cls.images_client.wait_for_image_resp_code(image_id, 200)
cls.images_client.wait_for_image_status(image_id, 'ACTIVE')
resp, cls.image = cls.images_client.get_image(image_id)
name = rand_name('image')
resp, body = cls.client.create_image(server['id'], name)
image_id = parse_image_id(resp['location'])
cls.images_client.wait_for_image_resp_code(image_id, 200)
cls.images_client.wait_for_image_status(image_id, 'ACTIVE')
resp, cls.image = cls.images_client.get_image(image_id)
cls.keypairname = rand_name('keypair')
resp, keypair = \
cls.keypairs_client.create_keypair(cls.keypairname)
cls.keypairname = rand_name('keypair')
resp, keypair = \
cls.keypairs_client.create_keypair(cls.keypairname)
name = rand_name('security')
description = rand_name('description')
resp, cls.security_group = \
cls.security_client.create_security_group(name, description)
name = rand_name('security')
description = rand_name('description')
resp, cls.security_group = \
cls.security_client.create_security_group(name, description)
parent_group_id = cls.security_group['id']
ip_protocol = 'tcp'
from_port = 22
to_port = 22
resp, cls.rule =\
cls.security_client.create_security_group_rule(\
parent_group_id,
ip_protocol, from_port,
to_port)
parent_group_id = cls.security_group['id']
ip_protocol = 'tcp'
from_port = 22
to_port = 22
resp, cls.rule =\
cls.security_client.create_security_group_rule(\
parent_group_id,
ip_protocol, from_port,
to_port)
@classmethod
def tearDownClass(cls):
if cls.multi_user:
if compute.MULTI_USER:
cls.client.delete_server(cls.server['id'])
cls.images_client.delete_image(cls.image['id'])
cls.keypairs_client.delete_keypair(cls.keypairname)
cls.security_client.delete_security_group(cls.security_group['id'])
super(AuthorizationTest, cls).tearDownClass()
@raises(exceptions.NotFound)
@attr(type='negative')
@utils.skip_unless_attr('multi_user', 'Second user not configured')
def test_get_server_for_other_account_fails(self):
def test_get_server_for_alt_account_fails(self):
"""A GET request for a server on another user's account should fail"""
self.other_client.get_server(self.server['id'])
self.alt_client.get_server(self.server['id'])
@raises(exceptions.NotFound)
@attr(type='negative')
@utils.skip_unless_attr('multi_user', 'Second user not configured')
def test_delete_server_for_other_account_fails(self):
def test_delete_server_for_alt_account_fails(self):
"""A DELETE request for another user's server should fail"""
self.other_client.delete_server(self.server['id'])
self.alt_client.delete_server(self.server['id'])
@raises(exceptions.NotFound)
@attr(type='negative')
@utils.skip_unless_attr('multi_user', 'Second user not configured')
def test_update_server_for_other_account_fails(self):
def test_update_server_for_alt_account_fails(self):
"""An update server request for another user's server should fail"""
self.other_client.update_server(self.server['id'], name='test')
self.alt_client.update_server(self.server['id'], name='test')
@raises(exceptions.NotFound)
@attr(type='negative')
@utils.skip_unless_attr('multi_user', 'Second user not configured')
def test_list_server_addresses_for_other_account_fails(self):
def test_list_server_addresses_for_alt_account_fails(self):
"""A list addresses request for another user's server should fail"""
self.other_client.list_addresses(self.server['id'])
self.alt_client.list_addresses(self.server['id'])
@raises(exceptions.NotFound)
@attr(type='negative')
@utils.skip_unless_attr('multi_user', 'Second user not configured')
def test_list_server_addresses_by_network_for_other_account_fails(self):
def test_list_server_addresses_by_network_for_alt_account_fails(self):
"""
A list address/network request for another user's server should fail
"""
server_id = self.server['id']
self.other_client.list_addresses_by_network(server_id, 'public')
self.alt_client.list_addresses_by_network(server_id, 'public')
@raises(exceptions.NotFound)
@attr(type='negative')
@utils.skip_unless_attr('multi_user', 'Second user not configured')
def test_change_password_for_other_account_fails(self):
def test_change_password_for_alt_account_fails(self):
"""A change password request for another user's server should fail"""
self.other_client.change_password(self.server['id'], 'newpass')
self.alt_client.change_password(self.server['id'], 'newpass')
@raises(exceptions.NotFound)
@attr(type='negative')
@utils.skip_unless_attr('multi_user', 'Second user not configured')
def test_reboot_server_for_other_account_fails(self):
def test_reboot_server_for_alt_account_fails(self):
"""A reboot request for another user's server should fail"""
self.other_client.reboot(self.server['id'], 'HARD')
self.alt_client.reboot(self.server['id'], 'HARD')
@raises(exceptions.NotFound)
@attr(type='negative')
@utils.skip_unless_attr('multi_user', 'Second user not configured')
def test_rebuild_server_for_other_account_fails(self):
def test_rebuild_server_for_alt_account_fails(self):
"""A rebuild request for another user's server should fail"""
self.other_client.rebuild(self.server['id'], self.image_ref_alt)
self.alt_client.rebuild(self.server['id'], self.image_ref_alt)
@raises(exceptions.NotFound)
@attr(type='negative')
@utils.skip_unless_attr('multi_user', 'Second user not configured')
def test_resize_server_for_other_account_fails(self):
def test_resize_server_for_alt_account_fails(self):
"""A resize request for another user's server should fail"""
self.other_client.resize(self.server['id'], self.flavor_ref_alt)
self.alt_client.resize(self.server['id'], self.flavor_ref_alt)
@raises(exceptions.NotFound)
@attr(type='negative')
@utils.skip_unless_attr('multi_user', 'Second user not configured')
def test_create_image_for_other_account_fails(self):
def test_create_image_for_alt_account_fails(self):
"""A create image request for another user's server should fail"""
self.other_images_client.create_image(self.server['id'], 'testImage')
self.alt_images_client.create_image(self.server['id'], 'testImage')
@raises(exceptions.BadRequest)
@attr(type='negative')
@utils.skip_unless_attr('multi_user', 'Second user not configured')
def test_create_server_with_unauthorized_image(self):
"""Server creation with another user's image should fail"""
self.other_client.create_server('test', self.image['id'],
self.alt_client.create_server('test', self.image['id'],
self.flavor_ref)
@raises(exceptions.BadRequest)
@attr(type='negative')
@utils.skip_unless_attr('multi_user', 'Second user not configured')
def test_create_server_fails_when_tenant_incorrect(self):
"""
A create server request should fail if the tenant id does not match
the current user
"""
saved_base_url = self.other_client.base_url
saved_base_url = self.alt_client.base_url
try:
# Change the base URL to impersonate another user
self.other_client.base_url = self.client.base_url
self.other_client.create_server('test', self.image['id'],
self.alt_client.base_url = self.client.base_url
self.alt_client.create_server('test', self.image['id'],
self.flavor_ref)
finally:
# Reset the base_url...
self.other_client.base_url = saved_base_url
self.alt_client.base_url = saved_base_url
@raises(exceptions.BadRequest)
@attr(type='negative')
@utils.skip_unless_attr('multi_user', 'Second user not configured')
def test_create_keypair_in_another_user_tenant(self):
def test_create_keypair_in_analt_user_tenant(self):
"""
A create keypair request should fail if the tenant id does not match
the current user
"""
#POST keypair with other user tenant
k_name = rand_name('keypair-')
self.other_keypairs_client._set_auth()
self.saved_base_url = self.other_keypairs_client.base_url
self.alt_keypairs_client._set_auth()
self.saved_base_url = self.alt_keypairs_client.base_url
try:
# Change the base URL to impersonate another user
self.other_keypairs_client.base_url = self.keypairs_client.base_url
self.alt_keypairs_client.base_url = self.keypairs_client.base_url
resp = {}
resp['status'] = None
resp, _ = self.other_keypairs_client.create_keypair(k_name)
resp, _ = self.alt_keypairs_client.create_keypair(k_name)
finally:
# Reset the base_url...
self.other_keypairs_client.base_url = self.saved_base_url
self.alt_keypairs_client.base_url = self.saved_base_url
if (resp['status'] != None):
resp, _ = self.other_keypairs_client.delete_keypair(k_name)
resp, _ = self.alt_keypairs_client.delete_keypair(k_name)
self.fail("Create keypair request should not happen if the"
" tenant id does not match the current user")
@raises(exceptions.NotFound)
@attr(type='negative')
@utils.skip_unless_attr('multi_user', 'Second user not configured')
def test_get_keypair_of_other_account_fails(self):
def test_get_keypair_of_alt_account_fails(self):
"""A GET request for another user's keypair should fail"""
self.other_keypairs_client.get_keypair(self.keypairname)
self.alt_keypairs_client.get_keypair(self.keypairname)
@raises(exceptions.NotFound)
@attr(type='negative')
@utils.skip_unless_attr('multi_user', 'Second user not configured')
def test_delete_keypair_of_other_account_fails(self):
def test_delete_keypair_of_alt_account_fails(self):
"""A DELETE request for another user's keypair should fail"""
self.other_keypairs_client.delete_keypair(self.keypairname)
self.alt_keypairs_client.delete_keypair(self.keypairname)
@raises(exceptions.NotFound)
@attr(type='negative')
@utils.skip_unless_attr('multi_user', 'Second user not configured')
def test_get_image_for_other_account_fails(self):
def test_get_image_for_alt_account_fails(self):
"""A GET request for an image on another user's account should fail"""
self.other_images_client.get_image(self.image['id'])
self.alt_images_client.get_image(self.image['id'])
@raises(exceptions.NotFound)
@attr(type='negative')
@utils.skip_unless_attr('multi_user', 'Second user not configured')
def test_delete_image_for_other_account_fails(self):
def test_delete_image_for_alt_account_fails(self):
"""A DELETE request for another user's image should fail"""
self.other_images_client.delete_image(self.image['id'])
self.alt_images_client.delete_image(self.image['id'])
@raises(exceptions.BadRequest)
@attr(type='negative')
@utils.skip_unless_attr('multi_user', 'Second user not configured')
def test_create_security_group_in_another_user_tenant(self):
def test_create_security_group_in_analt_user_tenant(self):
"""
A create security group request should fail if the tenant id does not
match the current user
@ -272,44 +246,41 @@ class AuthorizationTest(unittest.TestCase):
#POST security group with other user tenant
s_name = rand_name('security-')
s_description = rand_name('security')
self.saved_base_url = self.other_security_client.base_url
self.saved_base_url = self.alt_security_client.base_url
try:
# Change the base URL to impersonate another user
self.other_security_client.base_url = self.security_client.base_url
self.alt_security_client.base_url = self.security_client.base_url
resp = {}
resp['status'] = None
resp, body = self.other_security_client.create_security_group(\
resp, body = self.alt_security_client.create_security_group(\
s_name,
s_description)
finally:
# Reset the base_url...
self.other_security_client.base_url = self.saved_base_url
if (resp['status'] != None):
self.alt_security_client.base_url = self.saved_base_url
if resp['status'] != None:
resp, _ = \
self.other_security_client.delete_security_group(body['id'])
self.alt_security_client.delete_security_group(body['id'])
self.fail("Create Security Group request should not happen if"
"the tenant id does not match the current user")
@raises(exceptions.NotFound)
@attr(type='negative')
@utils.skip_unless_attr('multi_user', 'Second user not configured')
def test_get_security_group_of_other_account_fails(self):
def test_get_security_group_of_alt_account_fails(self):
"""A GET request for another user's security group should fail"""
self.other_security_client.get_security_group(\
self.alt_security_client.get_security_group(\
self.security_group['id'])
@raises(exceptions.NotFound)
@attr(type='negative')
@utils.skip_unless_attr('multi_user', 'Second user not configured')
def test_delete_security_group_of_other_account_fails(self):
def test_delete_security_group_of_alt_account_fails(self):
"""A DELETE request for another user's security group should fail"""
self.other_security_client.delete_security_group(\
self.alt_security_client.delete_security_group(\
self.security_group['id'])
@raises(exceptions.BadRequest)
@attr(type='negative')
@utils.skip_unless_attr('multi_user', 'Second user not configured')
def test_create_security_group_rule_in_another_user_tenant(self):
def test_create_security_group_rule_in_analt_user_tenant(self):
"""
A create security group rule request should fail if the tenant id
does not match the current user
@ -319,23 +290,23 @@ class AuthorizationTest(unittest.TestCase):
ip_protocol = 'icmp'
from_port = -1
to_port = -1
self.saved_base_url = self.other_security_client.base_url
self.saved_base_url = self.alt_security_client.base_url
try:
# Change the base URL to impersonate another user
self.other_security_client.base_url = self.security_client.base_url
self.alt_security_client.base_url = self.security_client.base_url
resp = {}
resp['status'] = None
resp, body = \
self.other_security_client.create_security_group_rule(\
self.alt_security_client.create_security_group_rule(\
parent_group_id,
ip_protocol, from_port,
to_port)
finally:
# Reset the base_url...
self.other_security_client.base_url = self.saved_base_url
if (resp['status'] != None):
self.alt_security_client.base_url = self.saved_base_url
if resp['status'] != None:
resp, _ = \
self.other_security_client.delete_security_group_rule(\
self.alt_security_client.delete_security_group_rule(\
body['id'])
self.fail("Create security group rule request should not "
"happen if the tenant id does not match the"
@ -344,43 +315,39 @@ class AuthorizationTest(unittest.TestCase):
@unittest.skip("Skipped until the Bug #1001118 is resolved")
@raises(exceptions.NotFound)
@attr(type='negative')
@utils.skip_unless_attr('multi_user', 'Second user not configured')
def test_delete_security_group_rule_of_other_account_fails(self):
def test_delete_security_group_rule_of_alt_account_fails(self):
"""
A DELETE request for another user's security group rule
should fail
"""
self.other_security_client.delete_security_group_rule(\
self.alt_security_client.delete_security_group_rule(\
self.rule['id'])
@raises(exceptions.NotFound)
@attr(type='negative')
@utils.skip_unless_attr('multi_user', 'Second user not configured')
def test_set_metadata_of_other_account_server_fails(self):
def test_set_metadata_of_alt_account_server_fails(self):
""" A set metadata for another user's server should fail """
req_metadata = {'meta1': 'data1', 'meta2': 'data2'}
self.other_client.set_server_metadata(self.server['id'],
self.alt_client.set_server_metadata(self.server['id'],
req_metadata)
@raises(exceptions.NotFound)
@attr(type='negative')
@utils.skip_unless_attr('multi_user', 'Second user not configured')
def test_set_metadata_of_other_account_image_fails(self):
def test_set_metadata_of_alt_account_image_fails(self):
""" A set metadata for another user's image should fail """
req_metadata = {'meta1': 'value1', 'meta2': 'value2'}
self.other_images_client.set_image_metadata(self.image['id'],
self.alt_images_client.set_image_metadata(self.image['id'],
req_metadata)
@attr(type='negative')
@utils.skip_unless_attr('multi_user', 'Second user not configured')
def test_get_metadata_of_other_account_server_fails(self):
def test_get_metadata_of_alt_account_server_fails(self):
""" A get metadata for another user's server should fail """
req_metadata = {'meta1': 'data1'}
self.client.set_server_metadata(self.server['id'],
req_metadata)
try:
resp, meta = \
self.other_client.get_server_metadata_item(self.server['id'],
self.alt_client.get_server_metadata_item(self.server['id'],
'meta1')
except exceptions.NotFound:
pass
@ -389,15 +356,14 @@ class AuthorizationTest(unittest.TestCase):
self.client.delete_server_metadata_item(self.server['id'], 'meta1')
@attr(type='negative')
@utils.skip_unless_attr('multi_user', 'Second user not configured')
def test_get_metadata_of_other_account_image_fails(self):
def test_get_metadata_of_alt_account_image_fails(self):
""" A get metadata for another user's image should fail """
req_metadata = {'meta1': 'value1'}
self.images_client.set_image_metadata(self.image['id'],
req_metadata)
try:
resp, meta = \
self.other_images_client.get_image_metadata_item(self.image['id'],
self.alt_images_client.get_image_metadata_item(self.image['id'],
'meta1')
except exceptions.NotFound:
pass
@ -406,15 +372,14 @@ class AuthorizationTest(unittest.TestCase):
self.image['id'], 'meta1')
@attr(type='negative')
@utils.skip_unless_attr('multi_user', 'Second user not configured')
def test_delete_metadata_of_other_account_server_fails(self):
def test_delete_metadata_of_alt_account_server_fails(self):
""" A delete metadata for another user's server should fail """
req_metadata = {'meta1': 'data1'}
self.client.set_server_metadata(self.server['id'],
req_metadata)
try:
resp, body = \
self.other_client.delete_server_metadata_item(\
self.alt_client.delete_server_metadata_item(\
self.server['id'], 'meta1')
except exceptions.NotFound:
pass
@ -423,15 +388,14 @@ class AuthorizationTest(unittest.TestCase):
self.client.delete_server_metadata_item(self.server['id'], 'meta1')
@attr(type='negative')
@utils.skip_unless_attr('multi_user', 'Second user not configured')
def test_delete_metadata_of_other_account_image_fails(self):
def test_delete_metadata_of_alt_account_image_fails(self):
""" A delete metadata for another user's image should fail """
req_metadata = {'meta1': 'data1'}
self.images_client.set_image_metadata(self.image['id'],
req_metadata)
try:
resp, body = \
self.other_images_client.delete_image_metadata_item(\
self.alt_images_client.delete_image_metadata_item(\
self.image['id'], 'meta1')
except exceptions.NotFound:
pass
@ -442,10 +406,9 @@ class AuthorizationTest(unittest.TestCase):
@raises(exceptions.NotFound)
@attr(type='negative')
@utils.skip_unless_attr('multi_user', 'Second user not configured')
def test_get_console_output_of_other_account_server_fails(self):
def test_get_console_output_of_alt_account_server_fails(self):
"""
A Get Console Output for another user's server should fail
"""
self.other_console_outputs_client.get_console_output(self.server['id'],
self.alt_console_outputs_client.get_console_output(self.server['id'],
10)

View File

@ -27,6 +27,7 @@ class ConsoleOutputTest(BaseComputeTest):
@classmethod
def setUpClass(cls):
super(ConsoleOutputTest, cls).setUpClass()
cls.client = cls.console_outputs_client
cls.servers_client = cls.servers_client
cls.name = rand_name('server')
@ -40,6 +41,7 @@ class ConsoleOutputTest(BaseComputeTest):
@classmethod
def tearDownClass(cls):
cls.servers_client.delete_server(cls.server_id)
super(ConsoleOutputTest, cls).tearDownClass()
@attr(type='positive')
def test_get_console_output(self):

View File

@ -32,6 +32,7 @@ class ServersTest(BaseComputeTest):
@classmethod
def setUpClass(cls):
super(ServersTest, cls).setUpClass()
cls.meta = {'hello': 'world'}
cls.accessIPv4 = '1.1.1.1'
cls.accessIPv6 = '::babe:220.12.22.2'
@ -54,6 +55,7 @@ class ServersTest(BaseComputeTest):
@classmethod
def tearDownClass(cls):
cls.client.delete_server(cls.server_initial['id'])
super(ServersTest, cls).tearDownClass()
@attr(type='smoke')
def test_create_server_response(self):

View File

@ -15,34 +15,27 @@
# License for the specific language governing permissions and limitations
# under the License.
import nose
from nose.plugins.attrib import attr
import unittest2 as unittest
from tempest.common.utils.data_utils import rand_name
from tempest import exceptions
from tempest import openstack
import tempest.config
from tempest.tests import utils
from tempest.common.utils.data_utils import rand_name
from tempest.tests.compute.base import BaseComputeTest
from tempest.tests import compute
class TestServerDiskConfig(unittest.TestCase):
resize_available = tempest.config.TempestConfig().compute.resize_available
class TestServerDiskConfig(BaseComputeTest):
@classmethod
def setUpClass(cls):
cls.os = openstack.Manager()
if not compute.DISK_CONFIG_ENABLED:
msg = "DiskConfig extension not enabled."
raise nose.SkipTest(msg)
super(TestServerDiskConfig, cls).setUpClass()
cls.client = cls.os.servers_client
extensions_client = cls.os.extensions_client
cls.config = cls.os.config
cls.image_ref = cls.config.compute.image_ref
cls.image_ref_alt = cls.config.compute.image_ref_alt
cls.flavor_ref = cls.config.compute.flavor_ref
cls.flavor_ref_alt = cls.config.compute.flavor_ref_alt
cls.disk_config = extensions_client.is_enabled('DiskConfig')
@attr(type='positive')
@utils.skip_unless_attr('disk_config', 'Disk config extension not enabled')
def test_create_server_with_manual_disk_config(self):
"""A server should be created with manual disk config"""
name = rand_name('server')
@ -62,7 +55,6 @@ class TestServerDiskConfig(unittest.TestCase):
resp, body = self.client.delete_server(server['id'])
@attr(type='positive')
@utils.skip_unless_attr('disk_config', 'Disk config extension not enabled')
def test_create_server_with_auto_disk_config(self):
"""A server should be created with auto disk config"""
name = rand_name('server')
@ -82,7 +74,6 @@ class TestServerDiskConfig(unittest.TestCase):
resp, body = self.client.delete_server(server['id'])
@attr(type='positive')
@utils.skip_unless_attr('disk_config', 'Disk config extension not enabled')
def test_rebuild_server_with_manual_disk_config(self):
"""A server should be rebuilt using the manual disk config option"""
name = rand_name('server')
@ -113,7 +104,6 @@ class TestServerDiskConfig(unittest.TestCase):
resp, body = self.client.delete_server(server['id'])
@attr(type='positive')
@utils.skip_unless_attr('disk_config', 'Disk config extension not enabled')
def test_rebuild_server_with_auto_disk_config(self):
"""A server should be rebuilt using the auto disk config option"""
name = rand_name('server')
@ -144,8 +134,7 @@ class TestServerDiskConfig(unittest.TestCase):
resp, body = self.client.delete_server(server['id'])
@attr(type='positive')
@utils.skip_unless_attr('disk_config', 'Disk config extension not enabled')
@unittest.skipIf(not resize_available, 'Resize not available.')
@unittest.skipUnless(compute.RESIZE_AVAILABLE, 'Resize not available.')
def test_resize_server_from_manual_to_auto(self):
"""A server should be resized from manual to auto disk config"""
name = rand_name('server')
@ -171,8 +160,7 @@ class TestServerDiskConfig(unittest.TestCase):
resp, body = self.client.delete_server(server['id'])
@attr(type='positive')
@utils.skip_unless_attr('disk_config', 'Disk config extension not enabled')
@unittest.skipIf(not resize_available, 'Resize not available.')
@unittest.skipUnless(compute.RESIZE_AVAILABLE, 'Resize not available.')
def test_resize_server_from_auto_to_manual(self):
"""A server should be resized from auto to manual disk config"""
name = rand_name('server')

View File

@ -24,6 +24,7 @@ class ExtensionsTest(BaseComputeTest):
@classmethod
def setUpClass(cls):
super(ExtensionsTest, cls).setUpClass()
cls.client = cls.extensions_client
@attr(type='smoke')

View File

@ -22,10 +22,10 @@ from tempest.tests.compute.base import BaseComputeTest
class FlavorsTest(BaseComputeTest):
_multiprocess_shared_ = True
@classmethod
def setUpClass(cls):
super(FlavorsTest, cls).setUpClass()
cls.client = cls.flavors_client
@attr(type='smoke')

View File

@ -30,6 +30,7 @@ class FloatingIPsTest(BaseComputeTest):
@classmethod
def setUpClass(cls):
super(FloatingIPsTest, cls).setUpClass()
cls.client = cls.floating_ips_client
cls.servers_client = cls.servers_client
@ -60,6 +61,7 @@ class FloatingIPsTest(BaseComputeTest):
resp, body = cls.servers_client.delete_server(cls.server_id)
#Deleting the floating IP which is created in this method
resp, body = cls.client.delete_floating_ip(cls.floating_ip_id)
super(FloatingIPsTest, cls).tearDownClass()
@attr(type='positive')
def test_allocate_floating_ip(self):

View File

@ -26,6 +26,7 @@ class ImagesMetadataTest(BaseComputeTest):
@classmethod
def setUpClass(cls):
super(ImagesMetadataTest, cls).setUpClass()
cls.servers_client = cls.servers_client
cls.client = cls.images_client
@ -49,6 +50,7 @@ class ImagesMetadataTest(BaseComputeTest):
def tearDownClass(cls):
cls.client.delete_image(cls.image_id)
cls.servers_client.delete_server(cls.server_id)
super(ImagesMetadataTest, cls).tearDownClass()
def setUp(self):
meta = {'key1': 'value1', 'key2': 'value2'}

View File

@ -18,43 +18,35 @@
from nose.plugins.attrib import attr
import unittest2 as unittest
from tempest.common.utils.data_utils import rand_name
from tempest.common.utils.data_utils import rand_name, parse_image_id
import tempest.config
from tempest import exceptions
from tempest import openstack
from tempest.common.utils import data_utils
from tempest.tests.compute.base import BaseComputeTest
from tempest.tests import utils
from tempest.tests import compute
class ImagesTest(BaseComputeTest):
create_image_enabled = tempest.config.TempestConfig().\
compute.create_image_enabled
@classmethod
def setUpClass(cls):
super(ImagesTest, cls).setUpClass()
cls.client = cls.images_client
cls.servers_client = cls.servers_client
cls.user1 = cls.config.compute.username
cls.user2 = cls.config.compute.alt_username
cls.user2_password = cls.config.compute.alt_password
cls.user2_tenant_name = cls.config.compute.alt_tenant_name
cls.multi_user = False
cls.image_ids = []
if (cls.user2 and cls.user1 != cls.user2 and cls.user2_password \
and cls.user2_tenant_name):
try:
cls.alt_manager = openstack.AltManager()
cls.alt_client = cls.alt_manager.images_client
except exceptions.AuthenticationFailure:
# multi_user is already set to false, just fall through
pass
if compute.MULTI_USER:
if cls.config.compute.allow_tenant_isolation:
creds = cls._get_isolated_creds()
username, tenant_name, password = creds
cls.alt_manager = openstack.Manager(username=username,
password=password,
tenant_name=tenant_name)
else:
cls.multi_user = True
# Use the alt_XXX credentials in the config file
cls.alt_manager = openstack.AltManager()
cls.alt_client = cls.alt_manager.images_client
def tearDown(self):
"""Terminate test instances created after a test is executed"""
@ -69,7 +61,7 @@ class ImagesTest(BaseComputeTest):
self.image_ids.remove(image_id)
@attr(type='smoke')
@unittest.skipUnless(create_image_enabled,
@unittest.skipUnless(compute.CREATE_IMAGE_ENABLED,
'Environment unable to create images.')
def test_create_delete_image(self):
"""An image for the provided server should be created"""
@ -83,7 +75,7 @@ class ImagesTest(BaseComputeTest):
name = rand_name('image')
meta = {'image_type': 'test'}
resp, body = self.client.create_image(server['id'], name, meta)
image_id = data_utils.parse_image_id(resp['location'])
image_id = parse_image_id(resp['location'])
self.client.wait_for_image_resp_code(image_id, 200)
self.client.wait_for_image_status(image_id, 'ACTIVE')
@ -123,7 +115,7 @@ class ImagesTest(BaseComputeTest):
pass
else:
image_id = data_utils.parse_image_id(resp['location'])
image_id = parse_image_id(resp['location'])
self.client.wait_for_image_resp_code(image_id, 200)
self.client.wait_for_image_status(image_id, 'ACTIVE')
self.client.delete_image(image_id)
@ -145,13 +137,13 @@ class ImagesTest(BaseComputeTest):
finally:
if (resp['status'] != None):
image_id = data_utils.parse_image_id(resp['location'])
image_id = parse_image_id(resp['location'])
resp, _ = self.client.delete_image(image_id)
self.fail("An image should not be created"
" with invalid server id")
@attr(type='negative')
@utils.skip_unless_attr('multi_user', 'Second user not configured')
@unittest.skipUnless(compute.MULTI_USER, 'Second user not configured')
def test_create_image_for_server_in_another_tenant(self):
"""Creating image of another tenant's server should be return error"""
server = self.create_server()
@ -201,7 +193,7 @@ class ImagesTest(BaseComputeTest):
# Create first snapshot
snapshot_name = rand_name('test-snap-')
resp, body = self.client.create_image(server['id'], snapshot_name)
image_id = data_utils.parse_image_id(resp['location'])
image_id = parse_image_id(resp['location'])
self.image_ids.append(image_id)
# Create second snapshot
@ -369,7 +361,7 @@ class ImagesTest(BaseComputeTest):
"exceeds 35 character ID length limit")
@attr(type='negative')
@utils.skip_unless_attr('multi_user', 'Second user not configured')
@unittest.skipUnless(compute.MULTI_USER, 'Second user not configured')
def test_delete_image_of_another_tenant(self):
"""Return an error while trying to delete another tenant's image"""
@ -377,7 +369,7 @@ class ImagesTest(BaseComputeTest):
snapshot_name = rand_name('test-snap-')
resp, body = self.client.create_image(server['id'], snapshot_name)
image_id = data_utils.parse_image_id(resp['location'])
image_id = parse_image_id(resp['location'])
self.image_ids.append(image_id)
self.client.wait_for_image_resp_code(image_id, 200)
self.client.wait_for_image_status(image_id, 'ACTIVE')
@ -394,7 +386,7 @@ class ImagesTest(BaseComputeTest):
snapshot_name = rand_name('test-snap-')
resp, body = self.client.create_image(server['id'], snapshot_name)
image_id = data_utils.parse_image_id(resp['location'])
image_id = parse_image_id(resp['location'])
self.image_ids.append(image_id)
# Do not wait, attempt to delete the image, ensure it's successful

View File

@ -24,10 +24,10 @@ from tempest.tests.compute.base import BaseComputeTest
class KeyPairsTest(BaseComputeTest):
_multiprocess_shared_ = True
@classmethod
def setUpClass(cls):
super(KeyPairsTest, cls).setUpClass()
cls.client = cls.keypairs_client
@attr(type='smoke')

View File

@ -27,6 +27,7 @@ class FloatingIPDetailsTest(BaseComputeTest):
@classmethod
def setUpClass(cls):
super(FloatingIPDetailsTest, cls).setUpClass()
cls.client = cls.floating_ips_client
cls.floating_ip = []
cls.floating_ip_id = []
@ -40,6 +41,7 @@ class FloatingIPDetailsTest(BaseComputeTest):
def tearDownClass(cls):
for i in range(3):
cls.client.delete_floating_ip(cls.floating_ip_id[i])
super(FloatingIPDetailsTest, cls).tearDownClass()
@attr(type='positive')
def test_list_floating_ips(self):

View File

@ -23,10 +23,10 @@ from tempest.tests.compute.base import BaseComputeTest
class ListImagesTest(BaseComputeTest):
_multiprocess_shared_ = True
@classmethod
def setUpClass(cls):
super(ListImagesTest, cls).setUpClass()
cls.client = cls.images_client
name = rand_name('server')
@ -72,6 +72,7 @@ class ListImagesTest(BaseComputeTest):
cls.client.delete_image(cls.image3_id)
cls.servers_client.delete_server(cls.server1['id'])
cls.servers_client.delete_server(cls.server2['id'])
super(ListImagesTest, cls).tearDownClass()
@attr(type='smoke')
def test_get_image(self):

View File

@ -25,14 +25,14 @@ from tempest.tests import utils
class ServerDetailsTest(BaseComputeTest):
_multiprocess_shared_ = True
@classmethod
def setUpClass(cls):
super(ServerDetailsTest, cls).setUpClass()
cls.client = cls.servers_client
# Check to see if the alternate image ref actually exists...
images_client = cls.os.images_client
images_client = cls.images_client
resp, images = images_client.list_images()
if cls.image_ref != cls.image_ref_alt and \
@ -84,6 +84,7 @@ class ServerDetailsTest(BaseComputeTest):
cls.client.delete_server(cls.s1['id'])
cls.client.delete_server(cls.s2['id'])
cls.client.delete_server(cls.s3['id'])
super(ServerDetailsTest, cls).tearDownClass()
def test_list_servers(self):
"""Return a list of all servers"""

View File

@ -24,43 +24,28 @@ from tempest import exceptions
from tempest import openstack
from tempest.common.utils.data_utils import rand_name
from tempest.tests.compute.base import BaseComputeTest
from tempest.tests import compute
class ServerDetailsNegativeTest(BaseComputeTest):
@classmethod
def setUpClass(cls):
super(ServerDetailsNegativeTest, cls).setUpClass()
cls.client = cls.servers_client
cls.servers = []
# Verify the alternate user is configured and not the same as the first
cls.user1 = cls.config.compute.username
cls.user2 = cls.config.compute.alt_username
cls.user2_password = cls.config.compute.alt_password
cls.user2_tenant_name = cls.config.compute.alt_tenant_name
cls.multi_user = False
if (not None in (cls.user2, cls.user2_password, cls.user2_tenant_name)
and cls.user1 != cls.user2):
try:
cls.alt_manager = openstack.AltManager()
cls.alt_client = cls.alt_manager.servers_client
except exceptions.AuthenticationFailure:
# multi_user is already set to false, just fall through
pass
if compute.MULTI_USER:
if cls.config.compute.allow_tenant_isolation:
creds = cls._get_isolated_creds()
username, tenant_name, password = creds
cls.alt_manager = openstack.Manager(username=username,
password=password,
tenant_name=tenant_name)
else:
cls.multi_user = True
@classmethod
def tearDownClass(cls):
"""Terminate all running instances in nova"""
try:
resp, body = cls.client.list_servers()
for server in body['servers']:
resp, body = cls.client.delete_server(server)
except exceptions.NotFound:
pass
# Use the alt_XXX credentials in the config file
cls.alt_manager = openstack.AltManager()
cls.alt_client = cls.alt_manager.servers_client
def tearDown(self):
"""Terminate instances created by tests"""

View File

@ -26,6 +26,7 @@ class SecurityGroupRulesTest(BaseComputeTest):
@classmethod
def setUpClass(cls):
super(SecurityGroupRulesTest, cls).setUpClass()
cls.client = cls.security_groups_client
@attr(type='positive')

View File

@ -26,6 +26,7 @@ class SecurityGroupsTest(BaseComputeTest):
@classmethod
def setUpClass(cls):
super(SecurityGroupsTest, cls).setUpClass()
cls.client = cls.security_groups_client
@attr(type='positive')

View File

@ -33,6 +33,7 @@ class ServerActionsTest(BaseComputeTest):
@classmethod
def setUpClass(cls):
super(ServerActionsTest, cls).setUpClass()
cls.client = cls.servers_client
def setUp(self):

View File

@ -26,6 +26,7 @@ class ServerAddressesTest(BaseComputeTest):
@classmethod
def setUpClass(cls):
super(ServerAddressesTest, cls).setUpClass()
cls.client = cls.servers_client
cls.name = rand_name('server')
@ -37,6 +38,7 @@ class ServerAddressesTest(BaseComputeTest):
@classmethod
def tearDownClass(cls):
cls.client.delete_server(cls.server['id'])
super(ServerAddressesTest, cls).tearDownClass()
@attr(type='negative', category='server-addresses')
def test_list_server_addresses_invalid_server_id(self):

View File

@ -25,6 +25,7 @@ class ServerMetadataTest(BaseComputeTest):
@classmethod
def setUpClass(cls):
super(ServerMetadataTest, cls).setUpClass()
cls.client = cls.servers_client
#Create a server to be used for all read only tests
@ -39,6 +40,7 @@ class ServerMetadataTest(BaseComputeTest):
@classmethod
def tearDownClass(cls):
cls.client.delete_server(cls.server_id)
super(ServerMetadataTest, cls).tearDownClass()
def setUp(self):
meta = {'key1': 'value1', 'key2': 'value2'}

View File

@ -25,10 +25,10 @@ from tempest.tests.compute.base import BaseComputeTest
class ServerPersonalityTest(BaseComputeTest):
_multiprocess_shared_ = True
@classmethod
def setUpClass(cls):
super(ServerPersonalityTest, cls).setUpClass()
cls.client = cls.servers_client
cls.user_client = cls.limits_client

View File

@ -22,10 +22,10 @@ from tempest.tests.compute.base import BaseComputeTest
class ServersTest(BaseComputeTest):
_multiprocess_shared_ = True
@classmethod
def setUpClass(cls):
super(ServersTest, cls).setUpClass()
cls.client = cls.servers_client
@attr(type='smoke')

View File

@ -30,19 +30,12 @@ class ServersNegativeTest(BaseComputeTest):
@classmethod
def setUpClass(cls):
super(ServersNegativeTest, cls).setUpClass()
cls.client = cls.servers_client
cls.img_client = cls.images_client
cls.alt_os = openstack.AltManager()
cls.alt_client = cls.alt_os.servers_client
@classmethod
def tearDownClass(cls):
for server in cls.servers:
try:
cls.client.delete_server(server['id'])
except exceptions.NotFound:
continue
@attr(type='negative')
def test_server_name_blank(self):
"""Create a server with name parameter empty"""

View File

@ -26,7 +26,8 @@ class VolumesGetTest(BaseComputeTest):
@classmethod
def setUpClass(cls):
cls.client = cls.os.volumes_client
super(VolumesGetTest, cls).setUpClass()
cls.client = cls.volumes_client
@attr(type='smoke')
def test_volume_create_get_delete(self):

View File

@ -37,6 +37,7 @@ class VolumesTest(BaseComputeTest):
@classmethod
def setUpClass(cls):
super(VolumesTest, cls).setUpClass()
cls.client = cls.volumes_client
# Create 3 Volumes
cls.volume_list = list()
@ -74,6 +75,7 @@ class VolumesTest(BaseComputeTest):
# Delete the created Volumes
for volume in cls.volume_list:
resp, _ = cls.client.delete_volume(volume['id'])
super(VolumesTest, cls).tearDownClass()
def test_volume_list(self):
"""Should return the list of Volumes"""

View File

@ -24,10 +24,11 @@ from tempest.common.utils.data_utils import rand_name
from tempest.tests.compute.base import BaseComputeTest
class VolumesTest(BaseComputeTest):
class VolumesNegativeTest(BaseComputeTest):
@classmethod
def setUpClass(cls):
super(VolumesNegativeTest, cls).setUpClass()
cls.client = cls.volumes_client
@attr(type='negative')

View File

@ -0,0 +1,25 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 OpenStack, LLC
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
LOG = logging.getLogger(__name__)
# All identity tests -- single setup function
def setup_package():
LOG.debug("Entering tempest.tests.identity.setup_package")

View File

View File

@ -1,10 +1,28 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 OpenStack, LLC
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import unittest2 as unittest
from tempest import exceptions
from tempest.common.utils.data_utils import rand_name
from base_admin_test import BaseAdminTest
from tempest.tests.identity.base import BaseIdentityAdminTest
class RolesTest(BaseAdminTest):
class RolesTest(BaseIdentityAdminTest):
@classmethod
def setUpClass(cls):

View File

@ -1,10 +1,28 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 OpenStack, LLC
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import unittest2 as unittest
from tempest import exceptions
from tempest.common.utils.data_utils import rand_name
from base_admin_test import BaseAdminTest
from tempest.tests.identity.base import BaseIdentityAdminTest
class TenantsTest(BaseAdminTest):
class TenantsTest(BaseIdentityAdminTest):
@classmethod
def setUpClass(cls):
@ -14,10 +32,6 @@ class TenantsTest(BaseAdminTest):
resp, tenant = cls.client.create_tenant(rand_name('tenant-'))
cls.data.tenants.append(tenant)
@classmethod
def tearDownClass(cls):
super(TenantsTest, cls).tearDownClass()
def test_list_tenants(self):
"""Return a list of all tenants"""
resp, body = self.client.list_tenants()

View File

@ -1,11 +1,29 @@
import unittest2 as unittest
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 OpenStack, LLC
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from nose.plugins.attrib import attr
from base_admin_test import BaseAdminTest
import unittest2 as unittest
from tempest import exceptions
from tempest.common.utils.data_utils import rand_name
from tempest.tests.identity.base import BaseIdentityAdminTest
class UsersTest(BaseAdminTest):
class UsersTest(BaseIdentityAdminTest):
alt_user = rand_name('test_user_')
alt_password = rand_name('pass_')

View File

@ -1,34 +1,68 @@
import unittest2 as unittest
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 OpenStack, LLC
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import nose
import unittest2 as unittest
import tempest.config
from tempest import openstack
from tempest.common.utils.data_utils import rand_name
from tempest.services.identity.json.admin_client import AdminClient
from tempest.services.identity.json.admin_client import TokenClient
class BaseAdminTest(unittest.TestCase):
"""Base class for Identity Admin Tests"""
class BaseIdentityAdminTest(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.config = tempest.config.TempestConfig()
cls.admin_username = cls.config.compute_admin.username
cls.admin_password = cls.config.compute_admin.password
cls.admin_tenant = cls.config.compute_admin.tenant_name
cls.username = cls.config.identity_admin.username
cls.password = cls.config.identity_admin.password
cls.tenant_name = cls.config.identity_admin.tenant_name
if not(cls.admin_username and cls.admin_password and cls.admin_tenant):
if not (cls.username
and cls.password
and cls.tenant_name):
raise nose.SkipTest("Missing Admin credentials in configuration")
cls.admin_os = openstack.AdminManager()
cls.client = cls.admin_os.admin_client
cls.token_client = cls.admin_os.token_client
client_args = (cls.config,
cls.username,
cls.password,
cls.config.identity.auth_url)
cls.client = AdminClient(*client_args, tenant_name=cls.tenant_name)
cls.token_client = TokenClient(cls.config)
if not cls.client.has_admin_extensions():
raise nose.SkipTest("Admin extensions disabled")
cls.os = openstack.Manager()
cls.non_admin_client = cls.os.admin_client
cls.data = DataGenerator(cls.client)
# Create an admin client with regular Compute API credentials. This
# client is used in tests to validate Unauthorized is returned
# for non-admin users accessing Identity Admin API commands
cls.na_username = cls.config.compute.username
cls.na_password = cls.config.compute.password
cls.na_tenant_name = cls.config.compute.tenant_name
na_client_args = (cls.config,
cls.na_username,
cls.na_password,
cls.config.identity.auth_url)
cls.non_admin_client = AdminClient(*na_client_args,
tenant_name=cls.na_tenant_name)
@classmethod
def tearDownClass(cls):
cls.data.teardown_all()

View File

@ -25,24 +25,22 @@ from tempest.common.utils.data_utils import rand_name
class BaseNetworkTest(unittest.TestCase):
os = openstack.Manager()
client = os.network_client
config = os.config
networks = []
enabled = True
# Validate that there is even an endpoint configured
# for networks, and mark the attr for skipping if not
try:
client.list_networks()
except exceptions.EndpointNotFound:
enabled = False
skip_msg = "No network endpoint"
@classmethod
def setUpClass(cls):
if not cls.enabled:
raise nose.SkipTest(cls.skip_msg)
os = openstack.Manager()
client = os.network_client
config = os.config
networks = []
enabled = True
# Validate that there is even an endpoint configured
# for networks, and mark the attr for skipping if not
try:
client.list_networks()
except exceptions.EndpointNotFound:
enabled = False
skip_msg = "No OpenStack Network API endpoint"
raise nose.SkipTest(skip_msg)
@classmethod
def tearDownClass(cls):