oswin-tempest-plugin/oswin_tempest_plugin/tests/test_base.py

275 lines
10 KiB
Python

# Copyright 2017 Cloudbase Solutions SRL
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
from oslo_log import log as logging
from tempest.common import compute
from tempest.common.utils.linux import remote_client
from tempest.common import waiters
from tempest.lib.common.utils import data_utils
from tempest.lib.common.utils import test_utils
import tempest.test
from oswin_tempest_plugin import config
CONF = config.CONF
LOG = logging.getLogger(__name__)
Server_tuple = collections.namedtuple(
'Server_tuple',
['server', 'floating_ip', 'keypair', 'security_groups'])
class TestBase(tempest.test.BaseTestCase):
"""Base class for tests."""
credentials = ['primary', 'admin']
# Inheriting TestCases should change this version if needed.
_MIN_HYPERV_VERSION = 6002
# Inheriting TestCases should change this image ref if needed.
_IMAGE_REF = CONF.compute.image_ref
# suffix to use for the newly created flavors.
_FLAVOR_SUFFIX = ''
@classmethod
def skip_checks(cls):
super(TestBase, cls).skip_checks()
# check if the configured hypervisor_version is higher than
# the test's required minimum Hyper-V version.
# TODO(claudiub): instead of expecting a config option specifying
# the hypervisor version, we could check nova's compute nodes for
# their hypervisor versions.
config_vers = CONF.hyperv.hypervisor_version
if config_vers < cls._MIN_HYPERV_VERSION:
msg = ('Configured hypervisor_version (%(config_vers)s) is not '
'supported. It must be higher than %(req_vers)s.' % {
'config_vers': config_vers,
'req_vers': cls._MIN_HYPERV_VERSION})
raise cls.skipException(msg)
@classmethod
def setup_clients(cls):
super(TestBase, cls).setup_clients()
# Compute client
cls.compute_fips_client = (
cls.os_primary.compute_floating_ips_client)
cls.keypairs_client = cls.os_primary.keypairs_client
cls.servers_client = cls.os_primary.servers_client
cls.admin_servers_client = cls.os_admin.servers_client
cls.admin_flavors_client = cls.os_admin.flavors_client
cls.admin_migrations_client = cls.os_admin.migrations_client
cls.admin_hypervisor_client = cls.os_admin.hypervisor_client
# Neutron network client
cls.security_groups_client = (
cls.os_primary.security_groups_client)
cls.security_group_rules_client = (
cls.os_primary.security_group_rules_client)
def create_floating_ip(self, server):
"""Create a floating IP and associates to a server on Nova"""
pool_name = CONF.network.floating_network_name
floating_ip = (
self.compute_fips_client.create_floating_ip(pool=pool_name))
floating_ip = floating_ip['floating_ip']
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.compute_fips_client.delete_floating_ip,
floating_ip['id'])
self.compute_fips_client.associate_floating_ip_to_server(
floating_ip['ip'], server['id'])
return floating_ip
def create_keypair(self):
name = data_utils.rand_name(self.__class__.__name__)
body = self.keypairs_client.create_keypair(name=name)
self.addCleanup(self.keypairs_client.delete_keypair, name)
return body['keypair']
def _get_image_ref(self):
return self._IMAGE_REF
def _flavor_cleanup(self, flavor_id):
try:
self.admin_flavors_client.delete_flavor(flavor_id)
self.admin_flavors_client.wait_for_resource_deletion(flavor_id)
except exceptions.NotFound:
pass
def _create_new_flavor(self, flavor_ref, flavor_updates):
"""Creates a new flavor based on the given flavor and flavor updates.
:returns: the newly created flavor's ID.
"""
flavor = self.admin_flavors_client.show_flavor(flavor_ref)['flavor']
flavor_name = 'test_resize'
if self._FLAVOR_SUFFIX:
flavor_name += '_%s' % self._FLAVOR_SUFFIX
new_flavor = self.admin_flavors_client.create_flavor(
name=data_utils.rand_name(flavor_name),
ram=flavor['ram'] + flavor_updates.get('ram', 0),
disk=flavor['disk'] + flavor_updates.get('disk', 0),
vcpus=flavor['vcpus'] + flavor_updates.get('vcpus', 0),
)['flavor']
self.addCleanup(self._flavor_cleanup, new_flavor['id'])
# Add flavor extra_specs, if needed.
extra_specs = flavor_updates.get('extra_specs')
if extra_specs:
self.admin_flavors_client.set_flavor_extra_spec(
new_flavor['id'], **extra_specs)
return new_flavor
def _get_flavor_ref(self):
return CONF.compute.flavor_ref
def _create_server(self, flavor=None):
"""Wrapper utility that returns a test server.
This wrapper utility calls the common create test server and
returns a test server.
"""
clients = self.os_primary
name = data_utils.rand_name(self.__class__.__name__ + "-server")
image_id = self._get_image_ref()
flavor = flavor or self._get_flavor_ref()
keypair = self.create_keypair()
tenant_network = self.get_tenant_network()
security_group = self._create_security_group()
# we need to pass the security group's name to the instance.
sg_group_names = [{'name': security_group['name']}]
body, _ = compute.create_test_server(
clients, name=name,
flavor=flavor,
image_id=image_id,
key_name=keypair['name'],
tenant_network=tenant_network,
security_groups=sg_group_names,
wait_until='ACTIVE')
self.addCleanup(waiters.wait_for_server_termination,
self.servers_client, body['id'])
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.servers_client.delete_server, body['id'])
server = clients.servers_client.show_server(body['id'])['server']
floating_ip = self.create_floating_ip(server)
server_tuple = Server_tuple(
server=server,
keypair=keypair,
floating_ip=floating_ip,
security_groups=[security_group])
return server_tuple
def _create_security_group(self):
sg_name = data_utils.rand_name(self.__class__.__name__)
sg_desc = sg_name + " description"
secgroup = self.security_groups_client.create_security_group(
name=sg_name, description=sg_desc)['security_group']
self.addCleanup(
test_utils.call_and_ignore_notfound_exc,
self.security_groups_client.delete_security_group,
secgroup['id'])
# Add rules to the security group
self._create_loginable_secgroup_rule(secgroup)
return secgroup
def _create_loginable_secgroup_rule(self, secgroup):
"""Create loginable security group rule
This function will create:
1. egress and ingress tcp port 22 allow rule in order to allow ssh
access for ipv4.
3. egress and ingress ipv4 icmp allow rule, in order to allow icmpv4.
"""
rulesets = [
# ssh
dict(protocol='tcp',
port_range_min=22,
port_range_max=22),
# ping
dict(protocol='icmp'),
]
for ruleset in rulesets:
for r_direction in ['ingress', 'egress']:
ruleset['direction'] = r_direction
sg_rule = self._create_security_group_rule(
secgroup, **ruleset)
def _create_security_group_rule(self, secgroup, **kwargs):
"""Create a rule from a dictionary of rule parameters.
Creates a rule in a secgroup.
:param secgroup: the security group.
:param kwargs: a dictionary containing rule parameters:
for example, to allow incoming ssh:
rule = {
direction: 'ingress'
protocol:'tcp',
port_range_min: 22,
port_range_max: 22
}
"""
ruleset = dict(security_group_id=secgroup['id'],
tenant_id=secgroup['tenant_id'])
ruleset.update(kwargs)
sec_group_rules_client = self.security_group_rules_client
sg_rule = sec_group_rules_client.create_security_group_rule(**ruleset)
sg_rule = sg_rule['security_group_rule']
return sg_rule
def _wait_for_server_status(self, server, status='ACTIVE'):
waiters.wait_for_server_status(self.servers_client, server['id'],
status)
def _get_server_client(self, server_tuple):
"""Get a SSH client to a remote server
:returns: RemoteClient object
"""
server = server_tuple.server
ip_address = server_tuple.floating_ip['ip']
private_key = server_tuple.keypair['private_key']
# ssh into the VM.
username = CONF.validation.image_ssh_user
linux_client = remote_client.RemoteClient(
ip_address, username, pkey=private_key, password=None,
server=server, servers_client=self.servers_client)
linux_client.validate_authentication()
return linux_client
def _check_server_connectivity(self, server_tuple):
# if server connectivity works, an SSH client can be opened.
self._get_server_client(server_tuple)