1.Created tests package.

2.Moved sanity tests to tests package.
3.Implemented smoke tests, added them to tests package.
4.Implemented base.py, __init__.py modules for smoke tests.
5.Modified config.py, test.conf to enable smoke tests.

Change-Id: Id29f8c74284ba82fb93c4537ae29a3e770de0173
Note: smoke tests might not work while test.conf is not set up properly.
This commit is contained in:
anaboikina 2013-06-19 12:04:38 +03:00
parent 2e064d6ba9
commit b0b53cb2b6
16 changed files with 713 additions and 5 deletions

View File

@ -2,7 +2,7 @@
# This section contains configuration options that a variety of
# test clients use when authenticating with different user/tenant
# combinations
url = http://192.168.56.102/auth/login/
# The type of endpoint for a Identity service. Unless you have a
# custom Keystone service catalog implementation, you probably want to leave
# this value as "identity"

View File

@ -38,6 +38,9 @@ IdentityGroup = [
cfg.StrOpt('uri',
default=None,
help="Full URI of the OpenStack Identity API (Keystone), v2"),
cfg.StrOpt('url',
default='http://10.0.0.1/',
help="Dashboard Openstack url, v2"),
cfg.StrOpt('uri_v3',
help='Full URI of the OpenStack Identity API (Keystone), v3'),
cfg.StrOpt('strategy',
@ -510,11 +513,13 @@ class FuelConfig:
register_network_opts(cfg.CONF)
register_volume_opts(cfg.CONF)
register_compute_admin_opts(cfg.CONF)
register_smoke_opts(cfg.CONF)
self.compute = cfg.CONF.compute
self.identity = cfg.CONF.identity
self.network = cfg.CONF.network
self.volume = cfg.CONF.volume
self.compute_admin = cfg.CONF['compute-admin']
self.smoke = cfg.CONF.smoke
if not self.compute_admin.username:
self.compute_admin.username = self.identity.admin_username
self.compute_admin.password = self.identity.admin_password

View File

@ -22,9 +22,10 @@ import nose.plugins.attrib
import testresources
import testtools
from fuel.common import log as logging
from fuel import config
from fuel import manager
from fuel.common import log as logging
LOG = logging.getLogger(__name__)
@ -94,7 +95,7 @@ def call_until_true(func, duration, sleep_for):
class TestCase(BaseTestCase):
"""Base test case class for all Tempest tests
"""Base test case class for all tests
Contains basic setup and convenience methods
"""
@ -127,6 +128,7 @@ class TestCase(BaseTestCase):
self.os_resources.remove(thing)
del self.resource_keys[key]
def status_timeout(self, things, thing_id, expected_status):
"""
Given a thing and an expected status, do a loop, sleeping
@ -149,7 +151,7 @@ class TestCase(BaseTestCase):
LOG.debug("Waiting for %s to get to %s status. "
"Currently in %s status",
thing, expected_status, new_status)
conf = config.TempestConfig()
conf = config.FuelConfig()
if not call_until_true(check_status,
conf.compute.build_timeout,
conf.compute.build_interval):
@ -161,7 +163,7 @@ class ComputeFuzzClientTest(TestCase):
"""
Base test case class for OpenStack Compute API (Nova)
that uses the Tempest REST fuzz client libs for calling the API.
that uses the REST fuzz client libs for calling the API.
"""
manager_class = manager.ComputeFuzzClientManager

1
fuel/tests/__init__.py Normal file
View File

@ -0,0 +1 @@
__author__ = 'anaboikina'

View File

@ -0,0 +1,58 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 OpenStack, LLC
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from fuel import clients
from fuel.common import log as logging
from fuel import config
from fuel.exceptions import InvalidConfiguration
LOG = logging.getLogger(__name__)
CONFIG = config.FuelConfig()
CREATE_IMAGE_ENABLED = CONFIG.compute.create_image_enabled
RESIZE_AVAILABLE = CONFIG.compute.resize_available
CHANGE_PASSWORD_AVAILABLE = CONFIG.compute.change_password_available
DISK_CONFIG_ENABLED = True
DISK_CONFIG_ENABLED_OVERRIDE = CONFIG.compute.disk_config_enabled_override
FLAVOR_EXTRA_DATA_ENABLED = True
MULTI_USER = True
# All compute tests -- single setup function
def generic_setup_package():
LOG.debug("Entering fuel.setup_package")
global MULTI_USER, DISK_CONFIG_ENABLED, FLAVOR_EXTRA_DATA_ENABLED
os = clients.Manager()
# Determine if there are two regular users that can be
# used in testing. If the test cases are allowed to create
# users (config.compute.allow_tenant_isolation is true,
# then we allow multi-user.
if not CONFIG.compute.allow_tenant_isolation:
user1 = CONFIG.identity.username
user2 = CONFIG.identity.alt_username
if not user2 or user1 == user2:
MULTI_USER = False
else:
user2_password = CONFIG.identity.alt_password
user2_tenant_name = CONFIG.identity.alt_tenant_name
if not user2_password or not user2_tenant_name:
msg = ("Alternate user specified but not alternate "
"tenant or password: alt_tenant_name=%s alt_password=%s"
% (user2_tenant_name, user2_password))
raise InvalidConfiguration(msg)

395
fuel/tests/smoke/base.py Normal file
View File

@ -0,0 +1,395 @@
import netaddr
import time
from fuel import clients
from fuel import exceptions
from fuel.common import log as logging
from fuel.common.utils.data_utils import rand_name
from fuel.tests import smoke
import fuel.test
LOG = logging.getLogger(__name__)
class BaseComputeTest(fuel.test.BaseTestCase):
"""Base test case class for all Compute API tests."""
conclusion = smoke.generic_setup_package()
@classmethod
def setUpClass(cls):
cls.isolated_creds = []
if cls.config.compute.allow_tenant_isolation:
creds = cls._get_isolated_creds()
username, tenant_name, password = creds
os = clients.Manager(username=username,
password=password,
tenant_name=tenant_name,
interface=cls._interface)
else:
os = clients.Manager(interface=cls._interface)
cls.os = os
cls.servers_client = os.servers_client
cls.flavors_client = os.flavors_client
cls.images_client = os.images_client
cls.floating_ips_client = os.floating_ips_client
cls.keypairs_client = os.keypairs_client
cls.security_groups_client = os.security_groups_client
cls.quotas_client = os.quotas_client
cls.limits_client = os.limits_client
cls.volumes_client = os.volumes_client
cls.snapshots_client = os.snapshots_client
cls.interfaces_client = os.interfaces_client
cls.fixed_ips_client = os.fixed_ips_client
cls.services_client = os.services_client
cls.hypervisor_client = os.hypervisor_client
cls.build_interval = cls.config.compute.build_interval
cls.build_timeout = cls.config.compute.build_timeout
cls.ssh_user = cls.config.compute.ssh_user
cls.image_ref = cls.config.smoke.image_ref
cls.image_ref_alt = cls.config.smoke.image_ref_alt
cls.flavor_ref = cls.config.smoke.flavor_ref
cls.flavor_ref_alt = cls.config.smoke.flavor_ref_alt
if os.config.network.quantum_available:
cls.network_client = os.config.network
cls.servers = []
cls.servers_client_v3_auth = os.servers_client_v3_auth
@classmethod
def _get_identity_admin_client(cls):
"""
Returns an instance of the Identity Admin API client
"""
os = clients.AdminManager(interface=cls._interface)
admin_client = os.identity_client
return admin_client
@classmethod
def _get_client_args(cls):
return (
cls.config,
cls.config.identity.admin_username,
cls.config.identity.admin_password,
cls.config.identity.uri
)
@classmethod
def _get_isolated_creds(cls):
"""
Creates a new set of user/tenant/password credentials for a
**regular** user of the Compute API so that a test case can
operate in an isolated tenant container.
"""
admin_client = cls._get_identity_admin_client()
password = "pass"
while True:
try:
rand_name_root = rand_name(cls.__name__)
if cls.isolated_creds:
# Main user already created. Create the alt one...
rand_name_root += '-alt'
tenant_name = rand_name_root + "-tenant"
tenant_desc = tenant_name + "-desc"
resp, tenant = admin_client.create_tenant(
name=tenant_name, description=tenant_desc)
break
except exceptions.Duplicate:
if cls.config.compute.allow_tenant_reuse:
tenant = admin_client.get_tenant_by_name(tenant_name)
LOG.info('Re-using existing tenant %s', tenant)
break
while True:
try:
rand_name_root = rand_name(cls.__name__)
if cls.isolated_creds:
# Main user already created. Create the alt one...
rand_name_root += '-alt'
username = rand_name_root + "-user"
email = rand_name_root + "@example.com"
resp, user = admin_client.create_user(username,
password,
tenant['id'],
email)
break
except exceptions.Duplicate:
if cls.config.compute.allow_tenant_reuse:
user = admin_client.get_user_by_username(tenant['id'],
username)
LOG.info('Re-using existing user %s', user)
break
# Store the complete creds (including UUID ids...) for later
# but return just the username, tenant_name, password tuple
# that the various clients will use.
cls.isolated_creds.append((user, tenant))
return username, tenant_name, password
@classmethod
def clear_isolated_creds(cls):
if not cls.isolated_creds:
return
admin_client = cls._get_identity_admin_client()
for user, tenant in cls.isolated_creds:
admin_client.delete_user(user['id'])
admin_client.delete_tenant(tenant['id'])
@classmethod
def clear_servers(cls):
for server in cls.servers:
try:
cls.servers_client.delete_server(server['id'])
except Exception:
pass
for server in cls.servers:
try:
cls.servers_client.wait_for_server_termination(server['id'])
except Exception:
pass
@classmethod
def create_server(cls, **kwargs):
"""Wrapper utility that returns a test server."""
name = rand_name(cls.__name__ + "-instance")
if 'name' in kwargs:
name = kwargs.pop('name')
flavor = kwargs.get('flavor', cls.flavor_ref)
image_id = kwargs.get('image_id', cls.image_ref)
resp, body = cls.servers_client.create_server(
name, image_id, flavor, **kwargs)
# handle the case of multiple servers
servers = [body]
if 'min_count' in kwargs or 'max_count' in kwargs:
# Get servers created which name match with name param.
r, b = cls.servers_client.list_servers()
servers = [s for s in b['servers'] if s['name'].startswith(name)]
cls.servers.extend(servers)
if 'wait_until' in kwargs:
for server in servers:
cls.servers_client.wait_for_server_status(
server['id'], kwargs['wait_until'])
return resp, body
@classmethod
def tearDownClass(cls):
cls.clear_servers()
cls.clear_isolated_creds()
def wait_for(self, condition):
"""Repeatedly calls condition() until a timeout."""
start_time = int(time.time())
while True:
try:
condition()
except Exception:
pass
else:
return
if int(time.time()) - start_time >= self.build_timeout:
condition()
return
time.sleep(self.build_interval)
class BaseComputeAdminTest(BaseComputeTest):
"""Base test case class for all Compute Admin API tests."""
@classmethod
def setUpClass(cls):
super(BaseComputeAdminTest, cls).setUpClass()
admin_username = cls.config.compute_admin.username
admin_password = cls.config.compute_admin.password
admin_tenant = cls.config.compute_admin.tenant_name
if not (admin_username and admin_password and admin_tenant):
msg = ("Missing Compute Admin API credentials "
"in configuration.")
raise cls.skipException(msg)
cls.os_adm = clients.ComputeAdminManager(interface=cls._interface)
class BaseIdentityAdminTest(fuel.test.BaseTestCase):
@classmethod
def setUpClass(cls):
os = clients.AdminManager(interface=cls._interface)
cls.client = os.identity_client
cls.token_client = os.token_client
cls.service_client = os.services_client
if not cls.client.has_admin_extensions():
raise cls.skipException("Admin extensions disabled")
cls.data = DataGenerator(cls.client)
os = clients.Manager(interface=cls._interface)
cls.non_admin_client = os.identity_client
@classmethod
def tearDownClass(cls):
cls.data.teardown_all()
def disable_user(self, user_name):
user = self.get_user_by_name(user_name)
self.client.enable_disable_user(user['id'], False)
def disable_tenant(self, tenant_name):
tenant = self.get_tenant_by_name(tenant_name)
self.client.update_tenant(tenant['id'], enabled=False)
def get_user_by_name(self, name):
_, users = self.client.get_users()
user = [u for u in users if u['name'] == name]
if len(user) > 0:
return user[0]
def get_tenant_by_name(self, name):
_, tenants = self.client.list_tenants()
tenant = [t for t in tenants if t['name'] == name]
if len(tenant) > 0:
return tenant[0]
def get_role_by_name(self, name):
_, roles = self.client.list_roles()
role = [r for r in roles if r['name'] == name]
if len(role) > 0:
return role[0]
class DataGenerator(object):
def __init__(self, client):
self.client = client
self.users = []
self.tenants = []
self.roles = []
self.role_name = None
def setup_test_user(self):
"""Set up a test user."""
self.setup_test_tenant()
self.test_user = rand_name('test_user_')
self.test_password = rand_name('pass_')
self.test_email = self.test_user + '@testmail.tm'
resp, self.user = self.client.create_user(self.test_user,
self.test_password,
self.tenant['id'],
self.test_email)
self.users.append(self.user)
def setup_test_tenant(self):
"""Set up a test tenant."""
self.test_tenant = rand_name('test_tenant_')
self.test_description = rand_name('desc_')
resp, self.tenant = self.client.create_tenant(
name=self.test_tenant,
description=self.test_description)
self.tenants.append(self.tenant)
def setup_test_role(self):
"""Set up a test role."""
self.test_role = rand_name('role')
resp, self.role = self.client.create_role(self.test_role)
self.roles.append(self.role)
def teardown_all(self):
for user in self.users:
self.client.delete_user(user['id'])
for tenant in self.tenants:
self.client.delete_tenant(tenant['id'])
for role in self.roles:
self.client.delete_role(role['id'])
class BaseNetworkTest(fuel.test.BaseTestCase):
@classmethod
def setUpClass(cls):
os = clients.Manager()
cls.network_cfg = os.config.network
if not cls.network_cfg.quantum_available:
raise cls.skipException("Quantum support is required")
cls.client = os.network_client
cls.networks = []
cls.subnets = []
@classmethod
def tearDownClass(cls):
for subnet in cls.subnets:
cls.client.delete_subnet(subnet['id'])
for network in cls.networks:
cls.client.delete_network(network['id'])
@classmethod
def create_network(cls, network_name=None):
"""Wrapper utility that returns a test network."""
network_name = network_name or rand_name('test-network-')
resp, body = cls.client.create_network(network_name)
network = body['network']
cls.networks.append(network)
return network
@classmethod
def create_subnet(cls, network):
"""Wrapper utility that returns a test subnet."""
cidr = netaddr.IPNetwork(cls.network_cfg.tenant_network_cidr)
mask_bits = cls.network_cfg.tenant_network_mask_bits
# Find a cidr that is not in use yet and create a subnet with it
for subnet_cidr in cidr.subnet(mask_bits):
try:
resp, body = cls.client.create_subnet(network['id'],
str(subnet_cidr))
break
except exceptions.BadRequest as e:
is_overlapping_cidr = 'overlaps with another subnet' in str(e)
if not is_overlapping_cidr:
raise
subnet = body['subnet']
cls.subnets.append(subnet)
return subnet
class FixedIPsBase(BaseComputeAdminTest):
_interface = 'json'
ip = None
@classmethod
def setUpClass(cls):
super(FixedIPsBase, cls).setUpClass()
# NOTE(maurosr): The idea here is: the server creation is just an
# auxiliary element to the ip details or reservation, there was no way
# (at least none in my mind) to get an valid and existing ip except
# by creating a server and using its ip. So the intention is to create
# fewer server possible (one) and use it to both: json and xml tests.
# This decreased time to run both tests, in my test machine, from 53
# secs to 29 (agains 23 secs when running only json tests)
if cls.ip is None:
cls.client = cls.os_adm.fixed_ips_client
cls.non_admin_client = cls.fixed_ips_client
resp, server = cls.create_server(wait_until='ACTIVE')
resp, server = cls.servers_client.get_server(server['id'])
for ip_set in server['addresses']:
for ip in server['addresses'][ip_set]:
if ip['OS-EXT-IPS:type'] == 'fixed':
cls.ip = ip['addr']
break
if cls.ip:
break

View File

@ -0,0 +1,66 @@
from fuel.common.utils import data_utils
from fuel.test import attr
from fuel.tests.smoke import base
""" Test module contains tests for flavor creation/deletion. """
class FlavorsAdminTest(base.BaseComputeAdminTest):
"""
Tests for flavor creation that require admin privileges.
"""
_interface = 'json'
@classmethod
def setUpClass(cls):
super(FlavorsAdminTest, cls).setUpClass()
cls.client = cls.os_adm.flavors_client
cls.user_client = cls.os.flavors_client
cls.flavor_name_prefix = 'test_flavor_'
cls.ram = 256
cls.vcpus = 1
cls.disk = 0
cls.ephemeral = 0
cls.swap = 256
cls.rxtx = 2
@attr(type=["fuel", "smoke"])
def test_create_flavor(self):
"""
Test low requirements flavor creation.
"""
# Create a flavor.
# This operation requires the user to have 'admin' role.
flavor_name = data_utils.rand_name(self.flavor_name_prefix)
new_flavor_id = data_utils.rand_int_id(start=1000)
#Create the flavor
resp, flavor = self.client.create_flavor(flavor_name,
self.ram, self.vcpus,
self.disk,
new_flavor_id,
ephemeral=self.ephemeral,
swap=self.swap,
rxtx=self.rxtx)
self.flavor = flavor
self.assertEqual(200, resp.status)
self.assertEqual(flavor['name'], flavor_name)
self.assertEqual(flavor['vcpus'], self.vcpus)
self.assertEqual(flavor['disk'], self.disk)
self.assertEqual(flavor['ram'], self.ram)
self.assertEqual(int(flavor['id']), new_flavor_id)
self.assertEqual(flavor['swap'], self.swap)
self.assertEqual(flavor['rxtx_factor'], self.rxtx)
self.assertEqual(flavor['OS-FLV-EXT-DATA:ephemeral'],
self.ephemeral)
#Verify flavor is retrieved
resp, flavor = self.client.get_flavor_details(new_flavor_id)
self.assertEqual(resp.status, 200)
# TODO: add teardown for this test.

View File

@ -0,0 +1,32 @@
from fuel.common.utils import data_utils
from fuel.test import attr
from fuel.tests.smoke import base
class KeyPairsTestJSON(base.BaseComputeTest):
_interface = 'json'
@classmethod
def setUpClass(cls):
super(KeyPairsTestJSON, cls).setUpClass()
cls.client = cls.keypairs_client
@attr(type=['fuel', 'smoke'])
def test_keypair_create_delete(self):
""" Test keypair creation and deletion. """
k_name = data_utils.rand_name('keypair-')
resp, keypair = self.client.create_keypair(k_name)
self.assertEqual(200, resp.status)
private_key = keypair['private_key']
key_name = keypair['name']
self.assertEqual(key_name, k_name,
"The created keypair name is not equal "
"to the requested name")
self.assertTrue(private_key is not None,
"Field private_key is empty or not found.")
resp, _ = self.client.delete_keypair(k_name)
self.assertEqual(202, resp.status)
# TODO: add teardown for this test.

View File

@ -0,0 +1,50 @@
import netaddr
from fuel.common.utils import data_utils
from fuel.test import attr
from fuel.tests.smoke import base
"""
Test module for network creation.
Dependencies:
v2.0 of the Quantum API is assumed. It is also assumed that the following
options are defined in the [network] section of etc/tempest.conf:
- tenant_network_cidr with a block of cidr's from which smaller blocks
can be allocated for tenant networks;
- tenant_network_mask_bits with the mask bits to be used to partition the
block defined by tenant-network_cidr.
"""
class NetworksTest(base.BaseComputeTest):
"""
Test class for tenant netwprk creation the Quantum API using the REST client for
Quantum.
"""
_interface = 'json'
@classmethod
def setUpClass(cls):
super(NetworksTest, cls).setUpClass()
@attr(type=["fuel", "smoke"])
def test_create_network(self):
""" Test network creation. """
network_name = data_utils.rand_name('test-network-')
resp, body = self.network_client.create_network(network_name)
network = body['network']
self.assertEqual(body['name'], network_name)
self.assertTrue(network['id'] is not None)
cidr = netaddr.IPNetwork(self.network_cfg.tenant_network_cidr)
#TODO: finish this test (it`s incomplete for now).
#TODO: add teardown for this test.

View File

@ -0,0 +1,40 @@
from fuel.common.utils import data_utils
from fuel.test import attr
from fuel.tests.smoke import base
class SecurityGroupsTest(base.BaseComputeTest):
"""
Test security group creation.
"""
_interface = 'json'
@classmethod
def setUpClass(cls):
super(SecurityGroupsTest, cls).setUpClass()
cls.client = cls.security_groups_client
def _delete_security_group(self, securitygroup_id):
resp, _ = self.client.delete_security_group(securitygroup_id)
self.assertEqual(202, resp.status)
@attr(type=['fuel', 'smoke'])
def test_security_group_create_delete(self):
# Security Group should be created, verified and deleted
s_name = data_utils.rand_name('securitygroup-')
s_description = data_utils.rand_name('description-')
resp, securitygroup = \
self.client.create_security_group(s_name, s_description)
self.assertTrue('id' in securitygroup)
securitygroup_id = securitygroup['id']
self.id = securitygroup_id
self.addCleanup(self._delete_security_group,
securitygroup_id)
self.assertEqual(200, resp.status)
self.assertFalse(securitygroup_id is None)
self.assertTrue('name' in securitygroup)
securitygroup_name = securitygroup['name']
self.assertEqual(securitygroup_name, s_name,
"The created Security Group name is "
"not equal to the requested name")

View File

@ -0,0 +1,59 @@
import requests
from fuel.common.utils import data_utils
from fuel.test import attr
from fuel.tests.smoke import base
class TestUserTenantRole(base.BaseIdentityAdminTest):
"""
Test class verifies the following:
- verify tenant can be created;
- verify user can be created on base of created tenant;
- verify user role can be created.
"""
_interface = 'json'
alt_user = data_utils.rand_name('test_user_')
alt_password = data_utils.rand_name('pass_')
alt_email = alt_user + '@testmail.tm'
alt_tenant = data_utils.rand_name('test_tenant_')
alt_description = data_utils.rand_name('desc_')
alt_role = data_utils.rand_name('role_')
@attr(type=["fuel", "smoke"])
def test_create_user(self):
# Create a tenant:
resp, tenant = self.client.create_tenant(self.alt_tenant)
self.assertEqual('200', resp['status'])
# Create a user:
resp, user = self.client.create_user(self.alt_user, self.alt_password,
tenant['id'],
self.alt_email)
self.assertEqual('200', resp['status'])
self.assertEqual(self.alt_user, user['name'])
# Create a user role:
resp, role = self.client.create_role(user['name'])
self.assertEqual('200', resp['status'])
# Authenticate with created user:
resp, body = self.token_client.auth(user['name'], self.alt_password, tenant['name'])
self.assertEqual('200', resp['status'])
# Auth in horizon with non-admin user
client = requests.session()
url = self.config.identity.url
# Retrieve the CSRF token first
client.get(url) # sets cookie
csrftoken = client.cookies['csrftoken']
login_data = dict(username=user['name'],
password=self.alt_password,
csrfmiddlewaretoken=csrftoken,
next='/')
resp = client.post(url, data=login_data, headers=dict(Referer=url))
self.assertEqual(resp.status_code, 200)