tempest/tempest/scenario/utils.py

185 lines
6.5 KiB
Python

# Copyright 2013 Hewlett-Packard, Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import re
import string
import unicodedata
from oslo_serialization import jsonutils as json
import testscenarios
import testtools
from tempest import clients
from tempest.common import credentials_factory as credentials
from tempest import config
from tempest.lib.common.utils import misc
from tempest.lib import exceptions as exc_lib
CONF = config.CONF
class ImageUtils(object):
default_ssh_user = 'root'
def __init__(self, os):
# Load configuration items
self.ssh_users = json.loads(CONF.input_scenario.ssh_user_regex)
self.non_ssh_image_pattern = \
CONF.input_scenario.non_ssh_image_regex
# Setup clients
self.compute_images_client = os.compute_images_client
self.flavors_client = os.flavors_client
def ssh_user(self, image_id):
_image = self.compute_images_client.show_image(image_id)['image']
for regex, user in self.ssh_users:
# First match wins
if re.match(regex, _image['name']) is not None:
return user
else:
return self.default_ssh_user
def _is_sshable_image(self, image):
return not re.search(pattern=self.non_ssh_image_pattern,
string=str(image['name']))
def is_sshable_image(self, image_id):
_image = self.compute_images_client.show_image(image_id)['image']
return self._is_sshable_image(_image)
def _is_flavor_enough(self, flavor, image):
return image['minDisk'] <= flavor['disk']
def is_flavor_enough(self, flavor_id, image_id):
_image = self.compute_images_client.show_image(image_id)['image']
_flavor = self.flavors_client.show_flavor(flavor_id)['flavor']
return self._is_flavor_enough(_flavor, _image)
@misc.singleton
class InputScenarioUtils(object):
"""Example usage:
import testscenarios
(...)
load_tests = testscenarios.load_tests_apply_scenarios
class TestInputScenario(manager.ScenarioTest):
scenario_utils = utils.InputScenarioUtils()
scenario_flavor = scenario_utils.scenario_flavors
scenario_image = scenario_utils.scenario_images
scenarios = testscenarios.multiply_scenarios(scenario_image,
scenario_flavor)
def test_create_server_metadata(self):
name = rand_name('instance')
self.servers_client.create_server(name=name,
flavorRef=self.flavor_ref,
imageRef=self.image_ref)
"""
validchars = "-_.{ascii}{digit}".format(ascii=string.ascii_letters,
digit=string.digits)
def __init__(self):
network_resources = {
'network': False,
'router': False,
'subnet': False,
'dhcp': False,
}
self.cred_provider = credentials.get_credentials_provider(
name='InputScenarioUtils',
identity_version=CONF.identity.auth_version,
network_resources=network_resources)
os = clients.Manager(self.cred_provider.get_primary_creds())
self.compute_images_client = os.compute_images_client
self.flavors_client = os.flavors_client
self.image_pattern = CONF.input_scenario.image_regex
self.flavor_pattern = CONF.input_scenario.flavor_regex
def _normalize_name(self, name):
nname = unicodedata.normalize('NFKD', name).encode('ASCII', 'ignore')
nname = ''.join(c for c in nname if c in self.validchars)
return nname
def clear_creds(self):
self.cred_provider.clear_creds()
@property
def scenario_images(self):
""":return: a scenario with name and uuid of images"""
if not CONF.service_available.glance:
return []
if not hasattr(self, '_scenario_images'):
try:
images = self.compute_images_client.list_images()['images']
self._scenario_images = [
(self._normalize_name(i['name']), dict(image_ref=i['id']))
for i in images if re.search(self.image_pattern,
str(i['name']))
]
except Exception:
self._scenario_images = []
return self._scenario_images
@property
def scenario_flavors(self):
""":return: a scenario with name and uuid of flavors"""
if not hasattr(self, '_scenario_flavors'):
try:
flavors = self.flavors_client.list_flavors()['flavors']
self._scenario_flavors = [
(self._normalize_name(f['name']), dict(flavor_ref=f['id']))
for f in flavors if re.search(self.flavor_pattern,
str(f['name']))
]
except Exception:
self._scenario_flavors = []
return self._scenario_flavors
def load_tests_input_scenario_utils(*args):
"""Wrapper for testscenarios to set the scenarios
The purpose is to avoid running a getattr on the CONF object at import.
"""
if getattr(args[0], 'suiteClass', None) is not None:
loader, standard_tests, pattern = args
else:
standard_tests, module, loader = args
output = None
scenario_utils = None
try:
scenario_utils = InputScenarioUtils()
scenario_flavor = scenario_utils.scenario_flavors
scenario_image = scenario_utils.scenario_images
except (exc_lib.InvalidCredentials, TypeError):
output = standard_tests
finally:
if scenario_utils:
scenario_utils.clear_creds()
if output is not None:
return output
for test in testtools.iterate_tests(standard_tests):
setattr(test, 'scenarios', testscenarios.multiply_scenarios(
scenario_image,
scenario_flavor))
return testscenarios.load_tests_apply_scenarios(*args)