Input scenario capability for tempest

Adding the ability to generate multiple tests from
a template one based on input scenarios.
The implementation is based on testscenarios (added
to requirements.txt), and the correct behaviour
could be verified using unittest or testr + subunit,
both serial and parallel runs.

The generation of the array / matrix of tests relies
on the load_test protocol. nosetests does not implement
the protocol by default.

Static scenarios can be added to any tests with very
little effort. This commit introduces a couple of
dynamic scenarios:
- all flavours matching a configurable regex
- all images matching a configurable regex

Dynamic scenarios are coded in test_utils, and provided
by InputScenarioUtils. The class includes in the doc
an example on how to use input scenarios.

Additional scenarios could be AZs, or credentials.

This commit adapts existing test_server_basic_ops
to use input scenarios, and include server
ssh verification.

This change implements blueprint input-scenarios-for-scenario.
Implements: blueprint input-scenarios-for-scenario

Change-Id: Ia86f48772ac02d67faa1c7d764cb9dc0938f6452
This commit is contained in:
Andrea Frittoli 2013-12-06 07:08:07 +00:00 committed by Gerrit Code Review
parent 651a30dba7
commit f5da28bdf5
5 changed files with 241 additions and 3 deletions

View File

@ -471,6 +471,29 @@
#api_v1=true
[input-scenario]
#
# Options defined in tempest.config
#
# Matching images become parameters for scenario tests (string
# value)
#image_regex=^cirros-0.3.1-x86_64-uec$
# Matching flavors become parameters for scenario tests
# (string value)
#flavor_regex=^m1.(micro|nano|tiny)$
# SSH verification in tests is skippedfor matching images
# (string value)
#non_ssh_image_regex=^.*[Ww]in.*$
# List of user mapped to regex to matching image names.
# (string value)
#ssh_user_regex=[["^.*[Cc]irros.*$", "root"]]
[network]
#

View File

@ -22,3 +22,4 @@ oslo.config>=1.2.0
six>=1.4.1
iso8601>=0.1.8
fixtures>=0.3.14
testscenarios>=0.4

View File

@ -0,0 +1,136 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 Hewlett-Packard, Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest.common.utils import misc
from tempest import config
from tempest.scenario import manager
import json
import re
import string
import unicodedata
CONF = config.CONF
@misc.singleton
class ImageUtils(object):
default_ssh_user = 'root'
def __init__(self):
# Load configuration items
self.ssh_users = json.loads(CONF.input_scenario.ssh_user_regex)
self.non_ssh_image_pattern = \
CONF.input_scenario.non_ssh_image_regex
# Setup clients
ocm = manager.OfficialClientManager(CONF.identity.username,
CONF.identity.password,
CONF.identity.tenant_name)
self.client = ocm.compute_client
def ssh_user(self, image_id):
_image = self.client.images.get(image_id)
for regex, user in self.ssh_users:
# First match wins
if re.match(regex, _image.name) is not None:
return user
else:
return self.default_ssh_user
def _is_sshable_image(self, image):
return not re.search(pattern=self.non_ssh_image_pattern,
string=str(image.name))
def is_sshable_image(self, image_id):
_image = self.client.images.get(image_id)
return self._is_sshable_image(_image)
def _is_flavor_enough(self, flavor, image):
return image.minDisk <= flavor.disk
def is_flavor_enough(self, flavor_id, image_id):
_image = self.client.images.get(image_id)
_flavor = self.client.flavors.get(flavor_id)
return self._is_flavor_enough(_flavor, _image)
@misc.singleton
class InputScenarioUtils(object):
"""
Example usage:
import testscenarios
(...)
load_tests = testscenarios.load_tests_apply_scenarios
class TestInputScenario(manager.OfficialClientTest):
scenario_utils = test_utils.InputScenarioUtils()
scenario_flavor = scenario_utils.scenario_flavors
scenario_image = scenario_utils.scenario_images
scenarios = testscenarios.multiply_scenarios(scenario_image,
scenario_flavor)
def test_create_server_metadata(self):
name = rand_name('instance')
_ = self.compute_client.servers.create(name=name,
flavor=self.flavor_ref,
image=self.image_ref)
"""
validchars = "-_.{ascii}{digit}".format(ascii=string.ascii_letters,
digit=string.digits)
def __init__(self):
ocm = manager.OfficialClientManager(CONF.identity.username,
CONF.identity.password,
CONF.identity.tenant_name)
self.client = ocm.compute_client
self.image_pattern = CONF.input_scenario.image_regex
self.flavor_pattern = CONF.input_scenario.flavor_regex
def _normalize_name(self, name):
nname = unicodedata.normalize('NFKD', name).encode('ASCII', 'ignore')
nname = ''.join(c for c in nname if c in self.validchars)
return nname
@property
def scenario_images(self):
"""
:return: a scenario with name and uuid of images
"""
if not hasattr(self, '_scenario_images'):
images = self.client.images.list(detailed=False)
self._scenario_images = [
(self._normalize_name(i.name), dict(image_ref=i.id))
for i in images if re.search(self.image_pattern, str(i.name))
]
return self._scenario_images
@property
def scenario_flavors(self):
"""
:return: a scenario with name and uuid of flavors
"""
if not hasattr(self, '_scenario_flavors'):
flavors = self.client.flavors.list(detailed=False)
self._scenario_flavors = [
(self._normalize_name(f.name), dict(flavor_ref=f.id))
for f in flavors if re.search(self.flavor_pattern, str(f.name))
]
return self._scenario_flavors

View File

@ -658,6 +658,27 @@ DebugGroup = [
help="Enable diagnostic commands"),
]
input_scenario_group = cfg.OptGroup(name="input-scenario",
title="Filters and values for"
" input scenarios")
InputScenarioGroup = [
cfg.StrOpt('image_regex',
default='^cirros-0.3.1-x86_64-uec$',
help="Matching images become parameters for scenario tests"),
cfg.StrOpt('flavor_regex',
default='^m1.(micro|nano|tiny)$',
help="Matching flavors become parameters for scenario tests"),
cfg.StrOpt('non_ssh_image_regex',
default='^.*[Ww]in.*$',
help="SSH verification in tests is skipped"
"for matching images"),
cfg.StrOpt('ssh_user_regex',
default="[[\"^.*[Cc]irros.*$\", \"root\"]]",
help="List of user mapped to regex "
"to matching image names."),
]
baremetal_group = cfg.OptGroup(name='baremetal',
title='Baremetal provisioning service options')
@ -735,7 +756,7 @@ class TempestConfigPrivate(object):
ServiceAvailableGroup)
register_opt_group(cfg.CONF, debug_group, DebugGroup)
register_opt_group(cfg.CONF, baremetal_group, BaremetalGroup)
register_opt_group(cfg.CONF, input_scenario_group, InputScenarioGroup)
self.compute = cfg.CONF.compute
self.compute_feature_enabled = cfg.CONF['compute-feature-enabled']
self.identity = cfg.CONF.identity
@ -759,7 +780,7 @@ class TempestConfigPrivate(object):
self.service_available = cfg.CONF.service_available
self.debug = cfg.CONF.debug
self.baremetal = cfg.CONF.baremetal
self.input_scenario = cfg.CONF['input-scenario']
if not self.compute_admin.username:
self.compute_admin.username = self.identity.admin_username
self.compute_admin.password = self.identity.admin_password

View File

@ -16,12 +16,21 @@
# under the License.
from tempest.common.utils import data_utils
from tempest.common.utils import test_utils
from tempest.openstack.common import log as logging
from tempest.scenario import manager
from tempest.test import services
import testscenarios
LOG = logging.getLogger(__name__)
# NOTE(andreaf) - nose does not honour the load_tests protocol
# however it's test discovery regex will match anything
# which includes _tests. So nose would require some further
# investigation to be supported with this
load_tests = testscenarios.load_tests_apply_scenarios
class TestServerBasicOps(manager.OfficialClientTest):
@ -37,6 +46,37 @@ class TestServerBasicOps(manager.OfficialClientTest):
* Terminate the instance
"""
scenario_utils = test_utils.InputScenarioUtils()
scenario_flavor = scenario_utils.scenario_flavors
scenario_image = scenario_utils.scenario_images
scenarios = testscenarios.multiply_scenarios(scenario_image,
scenario_flavor)
def setUp(self):
super(TestServerBasicOps, self).setUp()
# Setup image and flavor the test instance
# Support both configured and injected values
if not hasattr(self, 'image_ref'):
self.image_ref = self.config.compute.image_ref
if not hasattr(self, 'flavor_ref'):
self.flavor_ref = self.config.compute.flavor_ref
self.image_utils = test_utils.ImageUtils()
if not self.image_utils.is_flavor_enough(self.flavor_ref,
self.image_ref):
raise self.skipException(
'{image} does not fit in {flavor}'.format(
image=self.image_ref, flavor=self.flavor_ref
)
)
self.run_ssh = self.config.compute.run_ssh and \
self.image_utils.is_sshable_image(self.image_ref)
self.ssh_user = self.image_utils.ssh_user(self.image_ref)
LOG.debug('Starting test for i:{image}, f:{flavor}. '
'Run ssh: {ssh}, user: {ssh_user}'.format(
image=self.image_ref, flavor=self.flavor_ref,
ssh=self.run_ssh, ssh_user=self.ssh_user))
def add_keypair(self):
self.keypair = self.create_keypair()
@ -53,10 +93,13 @@ class TestServerBasicOps(manager.OfficialClientTest):
self._create_loginable_secgroup_rule_nova(secgroup_id=self.secgroup.id)
def boot_instance(self):
# Create server with image and flavor from input scenario
create_kwargs = {
'key_name': self.keypair.id
}
instance = self.create_server(create_kwargs=create_kwargs)
instance = self.create_server(image=self.image_ref,
flavor=self.flavor_ref,
create_kwargs=create_kwargs)
self.set_resource('instance', instance)
def pause_server(self):
@ -100,6 +143,19 @@ class TestServerBasicOps(manager.OfficialClientTest):
instance.delete()
self.remove_resource('instance')
def verify_ssh(self):
if self.run_ssh:
# Obtain a floating IP
floating_ip = self.compute_client.floating_ips.create()
# Attach a floating IP
instance = self.get_resource('instance')
instance.add_floating_ip(floating_ip)
# Check ssh
self.get_remote_client(
server_or_ip=floating_ip.ip,
username=self.image_utils.ssh_user(self.image_ref),
private_key=self.keypair.private)
@services('compute', 'network')
def test_server_basicops(self):
self.add_keypair()
@ -109,4 +165,5 @@ class TestServerBasicOps(manager.OfficialClientTest):
self.unpause_server()
self.suspend_server()
self.resume_server()
self.verify_ssh()
self.terminate_instance()