910 lines
37 KiB
Python
910 lines
37 KiB
Python
# Copyright (c) 2015 Mirantis Inc.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
|
# implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
from __future__ import print_function
|
|
import functools
|
|
import logging
|
|
import os
|
|
import sys
|
|
import time
|
|
import traceback
|
|
|
|
import fixtures
|
|
from oslo_utils import timeutils
|
|
import prettytable
|
|
import six
|
|
from tempest.lib import base
|
|
from tempest.lib.common import ssh as connection
|
|
from tempest.lib import exceptions as exc
|
|
|
|
from sahara_tests.scenario import clients
|
|
from sahara_tests.scenario import timeouts
|
|
from sahara_tests.scenario import utils
|
|
from sahara_tests.utils import crypto as ssh
|
|
from sahara_tests.utils import url as utils_url
|
|
|
|
logger = logging.getLogger('swiftclient')
|
|
logger.setLevel(logging.CRITICAL)
|
|
|
|
CHECK_OK_STATUS = "OK"
|
|
CHECK_FAILED_STATUS = "FAILED"
|
|
CLUSTER_STATUS_ACTIVE = "Active"
|
|
CLUSTER_STATUS_ERROR = "Error"
|
|
HEALTH_CHECKS = ["RED", "YELLOW", "GREEN"]
|
|
|
|
|
|
def track_result(check_name, exit_with_error=True):
|
|
def decorator(fct):
|
|
@functools.wraps(fct)
|
|
def wrapper(self, *args, **kwargs):
|
|
started_at = timeutils.utcnow()
|
|
test_info = {
|
|
'check_name': check_name,
|
|
'status': CHECK_OK_STATUS,
|
|
'start_time': started_at,
|
|
'duration': None,
|
|
'traceback': None,
|
|
'exception_time': None
|
|
}
|
|
self._results.append(test_info)
|
|
try:
|
|
return fct(self, *args, **kwargs)
|
|
except Exception:
|
|
test_info['exception_time'] = timeutils.utcnow().strftime(
|
|
'%Y%m%d_%H%M%S')
|
|
test_info['status'] = CHECK_FAILED_STATUS
|
|
test_info['traceback'] = traceback.format_exception(
|
|
*sys.exc_info())
|
|
if exit_with_error:
|
|
raise
|
|
finally:
|
|
test_time = timeutils.utcnow() - started_at
|
|
test_info['duration'] = test_time.seconds
|
|
return wrapper
|
|
return decorator
|
|
|
|
|
|
class BaseTestCase(base.BaseTestCase):
|
|
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
super(BaseTestCase, cls).setUpClass()
|
|
cls.network = None
|
|
cls.credentials = None
|
|
cls.testcase = None
|
|
cls._results = []
|
|
cls.report = False
|
|
cls.results_dir = '.'
|
|
cls.default_templ_dir = '.'
|
|
cls.use_api_v2 = False
|
|
|
|
def setUp(self):
|
|
super(BaseTestCase, self).setUp()
|
|
self._init_clients()
|
|
timeouts.Defaults.init_defaults(self.testcase)
|
|
self.testcase['ssh_username'] = self.sahara.register_image(
|
|
self.glance.get_image_id(self.testcase['image']),
|
|
self.testcase).username
|
|
self.key = self.testcase.get('key_name')
|
|
if self.key is None:
|
|
self.private_key, self.public_key = ssh.generate_key_pair()
|
|
self.key_name = self.__create_keypair()
|
|
# save the private key if retain_resources is specified
|
|
# (useful for debugging purposes)
|
|
if self.testcase['retain_resources'] or self.key is None:
|
|
private_key_file_name = os.path.join(self.results_dir,
|
|
self.key_name + '.key')
|
|
with open(private_key_file_name, 'w+') as private_key_file:
|
|
private_key_file.write(self.private_key)
|
|
os.chmod(private_key_file_name, 0o600)
|
|
self.plugin_version_option = 'plugin_version'
|
|
if not self.use_api_v2:
|
|
self.plugin_version_option = 'hadoop_version'
|
|
self.plugin_opts = {
|
|
'plugin_name': self.testcase['plugin_name'],
|
|
self.plugin_version_option: self.testcase['plugin_version']
|
|
}
|
|
self.cinder = True
|
|
self.proxy = False
|
|
|
|
def _init_clients(self):
|
|
username = self.credentials['os_username']
|
|
password = self.credentials['os_password']
|
|
tenant_name = self.credentials['os_tenant']
|
|
auth_url = self.credentials['os_auth_url']
|
|
sahara_service_type = self.credentials['sahara_service_type']
|
|
sahara_url = self.credentials['sahara_url']
|
|
auth_version = '3.0' if 'v3' in auth_url else '2.0'
|
|
session = clients.get_session(auth_url, username, password,
|
|
tenant_name,
|
|
self.credentials.get('ssl_verify',
|
|
False),
|
|
self._get_file_with_defaults(
|
|
self.credentials.get('ssl_cert')))
|
|
|
|
api_version = '2' if self.use_api_v2 else '1.1'
|
|
self.sahara = clients.SaharaClient(session=session,
|
|
service_type=sahara_service_type,
|
|
sahara_url=sahara_url,
|
|
api_version=api_version)
|
|
self.nova = clients.NovaClient(session=session)
|
|
self.neutron = clients.NeutronClient(session=session)
|
|
# swiftclient doesn't support keystone sessions
|
|
self.swift = clients.SwiftClient(
|
|
auth_version=auth_version,
|
|
authurl=auth_url,
|
|
user=username,
|
|
key=password,
|
|
insecure=not self.credentials.get('ssl_verify', False),
|
|
cacert=self.credentials.get('ssl_cert'),
|
|
tenant_name=tenant_name)
|
|
self.glance = clients.GlanceClient(session=session)
|
|
# boto is not an OpenStack client, but we can handle it as well
|
|
self.boto = None
|
|
if self.credentials.get("s3_endpoint", None):
|
|
self.boto = clients.BotoClient(
|
|
endpoint=self.credentials["s3_endpoint"],
|
|
accesskey=self.credentials["s3_accesskey"],
|
|
secretkey=self.credentials["s3_secretkey"])
|
|
|
|
def create_cluster(self):
|
|
self.cluster_id = self.sahara.get_cluster_id(
|
|
self.testcase.get('existing_cluster'))
|
|
self.ng_id_map = {}
|
|
if self.cluster_id is None:
|
|
self.ng_id_map = self._create_node_group_templates()
|
|
cl_tmpl_id = self._create_cluster_template()
|
|
self.cluster_id = self._create_cluster(cl_tmpl_id)
|
|
elif self.key is None:
|
|
self.cinder = False
|
|
self._poll_cluster_status_tracked(self.cluster_id)
|
|
cluster = self.sahara.get_cluster(self.cluster_id, show_progress=True)
|
|
self._get_proxy(cluster)
|
|
self.check_cinder()
|
|
if self.check_feature_available("provision_progress"):
|
|
self._check_event_logs(cluster)
|
|
|
|
def _get_proxy(self, cluster):
|
|
for ng in cluster.node_groups:
|
|
if ng['is_proxy_gateway']:
|
|
for instance in ng['instances']:
|
|
if instance['management_ip'] != (
|
|
instance['internal_ip']):
|
|
self.proxy = instance['management_ip']
|
|
|
|
@track_result("Check transient")
|
|
def check_transient(self):
|
|
with fixtures.Timeout(
|
|
timeouts.Defaults.instance.timeout_check_transient,
|
|
gentle=True):
|
|
while True:
|
|
if self.sahara.is_resource_deleted(
|
|
self.sahara.get_cluster_status, self.cluster_id):
|
|
break
|
|
time.sleep(5)
|
|
|
|
def _inject_datasources_data(self, arg, input_url, output_url):
|
|
return arg.format(
|
|
input_datasource=input_url, output_datasource=output_url)
|
|
|
|
def _put_io_data_to_configs(self, configs, input_id, output_id):
|
|
input_url, output_url = None, None
|
|
if input_id is not None:
|
|
input_url = self.sahara.get_datasource(
|
|
data_source_id=input_id).url
|
|
if output_id is not None:
|
|
output_url = self.sahara.get_datasource(
|
|
data_source_id=output_id).url
|
|
pl = lambda x: ( # noqa: E731
|
|
self._inject_datasources_data(x, input_url, output_url))
|
|
args = list(map(pl, configs.get('args', [])))
|
|
configs['args'] = args
|
|
return configs
|
|
|
|
def _prepare_job_running(self, job):
|
|
input_id, output_id = self._create_datasources(job)
|
|
main_libs, additional_libs = self._create_job_binaries(job)
|
|
job_id = self._create_job(job['type'], main_libs, additional_libs)
|
|
configs = self._parse_job_configs(job)
|
|
configs = self._put_io_data_to_configs(
|
|
configs, input_id, output_id)
|
|
return [job_id, input_id, output_id, configs]
|
|
|
|
@track_result("Check EDP jobs", False)
|
|
def check_run_jobs(self):
|
|
batching = self.testcase.get('edp_batching',
|
|
len(self.testcase['edp_jobs_flow']))
|
|
batching_size = batching
|
|
jobs = self.testcase.get('edp_jobs_flow', [])
|
|
|
|
pre_exec = []
|
|
for job in jobs:
|
|
pre_exec.append(self._prepare_job_running(job))
|
|
batching -= 1
|
|
if not batching:
|
|
self._job_batching(pre_exec)
|
|
pre_exec = []
|
|
batching = batching_size
|
|
self.check_verification(self.cluster_id)
|
|
|
|
def _job_batching(self, pre_exec):
|
|
job_exec_ids = []
|
|
for job_exec in pre_exec:
|
|
job_exec_ids.append(self._run_job(*job_exec))
|
|
|
|
self._poll_jobs_status(job_exec_ids)
|
|
|
|
def _create_datasources(self, job):
|
|
def create(ds, name):
|
|
credential_vars = {}
|
|
source = ds.get('source', None)
|
|
destination = None if source else utils.rand_name(
|
|
ds['destination'])
|
|
if ds['type'] == 'swift':
|
|
url = self._create_swift_data(source, destination)
|
|
credential_vars = {
|
|
'credential_user': self.credentials['os_username'],
|
|
'credential_pass': self.credentials['os_password']
|
|
}
|
|
elif ds['type'] == 's3':
|
|
url = self._create_s3_data(source, destination)
|
|
credential_vars = {
|
|
's3_credentials': {
|
|
'accesskey': self.credentials['s3_accesskey'],
|
|
'secretkey': self.credentials['s3_secretkey'],
|
|
'endpoint': utils_url.url_schema_remover(
|
|
self.credentials['s3_endpoint']),
|
|
'ssl': self.credentials['s3_endpoint_ssl'],
|
|
'bucket_in_path': self.credentials['s3_bucket_path']
|
|
}
|
|
}
|
|
elif ds['type'] == 'hdfs':
|
|
url = self._create_dfs_data(source, destination,
|
|
self.testcase.get('hdfs_username',
|
|
'hadoop'),
|
|
ds['type'])
|
|
elif ds['type'] == 'maprfs':
|
|
url = self._create_dfs_data(source, destination,
|
|
ds.get('maprfs_username', 'mapr'),
|
|
ds['type'])
|
|
return self.__create_datasource(
|
|
name=utils.rand_name(name),
|
|
description='',
|
|
data_source_type=ds['type'], url=url,
|
|
**credential_vars)
|
|
|
|
input_id, output_id = None, None
|
|
if job.get('input_datasource'):
|
|
ds = job['input_datasource']
|
|
input_id = create(ds, 'input')
|
|
|
|
if job.get('output_datasource'):
|
|
ds = job['output_datasource']
|
|
output_id = create(ds, 'output')
|
|
|
|
return input_id, output_id
|
|
|
|
def _create_job_binaries(self, job):
|
|
main_libs = []
|
|
additional_libs = []
|
|
if job.get('main_lib'):
|
|
main_libs.append(self._create_job_binary(job['main_lib']))
|
|
for add_lib in job.get('additional_libs', []):
|
|
lib_id = self._create_job_binary(add_lib)
|
|
additional_libs.append(lib_id)
|
|
|
|
return main_libs, additional_libs
|
|
|
|
def _create_job_binary(self, job_binary):
|
|
url = None
|
|
extra = {}
|
|
if job_binary['type'] == 'swift':
|
|
url = self._create_swift_data(job_binary['source'])
|
|
extra['user'] = self.credentials['os_username']
|
|
extra['password'] = self.credentials['os_password']
|
|
elif job_binary['type'] == 's3':
|
|
url = self._create_s3_data(job_binary['source'])
|
|
extra['accesskey'] = self.credentials['s3_accesskey']
|
|
extra['secretkey'] = self.credentials['s3_secretkey']
|
|
extra['endpoint'] = self.credentials['s3_endpoint']
|
|
elif job_binary['type'] == 'database':
|
|
url = self._create_internal_db_data(job_binary['source'])
|
|
|
|
job_binary_name = '%s-%s' % (
|
|
utils.rand_name('test'), os.path.basename(job_binary['source']))
|
|
return self.__create_job_binary(job_binary_name, url, '', extra)
|
|
|
|
def _create_job(self, type, mains, libs):
|
|
return self.__create_job(utils.rand_name('test'), type, mains,
|
|
libs, '')
|
|
|
|
def _parse_job_configs(self, job):
|
|
configs = {}
|
|
if job.get('configs'):
|
|
configs['configs'] = {}
|
|
for param, value in six.iteritems(job['configs']):
|
|
configs['configs'][param] = str(value)
|
|
if job.get('args'):
|
|
configs['args'] = list(map(str, job['args']))
|
|
return configs
|
|
|
|
def _run_job(self, job_id, input_id, output_id, configs):
|
|
return self.__run_job(job_id, self.cluster_id, input_id, output_id,
|
|
configs)
|
|
|
|
def _poll_jobs_status(self, exec_ids):
|
|
try:
|
|
with fixtures.Timeout(
|
|
timeouts.Defaults.instance.timeout_poll_jobs_status,
|
|
gentle=True):
|
|
success = False
|
|
polling_ids = list(exec_ids)
|
|
while not success:
|
|
current_ids = list(polling_ids)
|
|
success = True
|
|
for exec_id in polling_ids:
|
|
status = self.sahara.get_job_status(exec_id)
|
|
if status not in ['FAILED', 'KILLED', 'DONEWITHERROR',
|
|
"SUCCEEDED"]:
|
|
success = False
|
|
else:
|
|
current_ids.remove(exec_id)
|
|
polling_ids = list(current_ids)
|
|
time.sleep(5)
|
|
finally:
|
|
report = []
|
|
for exec_id in exec_ids:
|
|
status = self.sahara.get_job_status(exec_id)
|
|
if status != "SUCCEEDED":
|
|
info = self.sahara.get_job_info(exec_id)
|
|
report.append("Job with id={id}, name={name}, "
|
|
"type={type} has status "
|
|
"{status}".format(id=exec_id,
|
|
name=info.name,
|
|
type=info.type,
|
|
status=status))
|
|
if report:
|
|
self.fail("\n".join(report))
|
|
|
|
def _get_file_with_defaults(self, file_path):
|
|
""" Check if the file exists; if it is a relative path, check also
|
|
among the default files.
|
|
"""
|
|
if not file_path:
|
|
return ''
|
|
|
|
all_files = [file_path]
|
|
if not os.path.isabs(file_path):
|
|
# relative path: look into default templates too, if defined
|
|
default_file = os.path.join(self.default_templ_dir, file_path)
|
|
if os.path.abspath(default_file) != os.path.abspath(file_path):
|
|
all_files.append(default_file)
|
|
for checked_file in all_files:
|
|
if os.path.isfile(checked_file):
|
|
return checked_file
|
|
raise Exception('File %s not found while looking into %s' %
|
|
(file_path, all_files))
|
|
|
|
def _read_source_file(self, source):
|
|
if not source:
|
|
return None
|
|
with open(self._get_file_with_defaults(source), 'rb') as source_fd:
|
|
data = source_fd.read()
|
|
return data
|
|
|
|
def _create_swift_data(self, source=None, destination=None):
|
|
container = self._get_swift_container()
|
|
path = utils.rand_name(destination if destination else 'test')
|
|
data = self._read_source_file(source)
|
|
|
|
self.__upload_to_container(container, path, data)
|
|
|
|
return 'swift://%s.sahara/%s' % (container, path)
|
|
|
|
def _create_s3_data(self, source=None, destination=None):
|
|
bucket = self._get_s3_bucket()
|
|
path = utils.rand_name(destination if destination else 'test')
|
|
data = self._read_source_file(source)
|
|
|
|
self.__upload_to_bucket(bucket, path, data)
|
|
|
|
return 's3://%s/%s' % (bucket, path)
|
|
|
|
def _create_dfs_data(self, source, destination, hdfs_username, fs):
|
|
|
|
def to_hex_present(string):
|
|
return "".join(map(lambda x: hex(ord(x)).replace("0x", "\\x"),
|
|
string.decode('utf-8')))
|
|
if destination:
|
|
return destination
|
|
|
|
command_prefixes = {'hdfs': 'hdfs dfs',
|
|
'maprfs': 'hadoop fs'}
|
|
|
|
hdfs_dir = utils.rand_name("/user/%s/data" % hdfs_username)
|
|
instances = self._get_nodes_with_process('namenode')
|
|
if len(instances) == 0:
|
|
instances = self._get_nodes_with_process('CLDB')
|
|
inst_ip = instances[0]["management_ip"]
|
|
self._run_command_on_node(
|
|
inst_ip,
|
|
"sudo su - -c \"%(prefix)s -mkdir -p %(path)s \" %(user)s" % {
|
|
"prefix": command_prefixes[fs],
|
|
"path": hdfs_dir,
|
|
"user": hdfs_username})
|
|
hdfs_filepath = utils.rand_name(hdfs_dir + "/file")
|
|
data = self._read_source_file(source)
|
|
if not data:
|
|
data = ''
|
|
self._run_command_on_node(
|
|
inst_ip,
|
|
("echo -e \"%(data)s\" | sudo su - -c \"%(prefix)s"
|
|
" -put - %(path)s\" %(user)s") % {
|
|
"data": to_hex_present(data),
|
|
"prefix": command_prefixes[fs],
|
|
"path": hdfs_filepath,
|
|
"user": hdfs_username})
|
|
return hdfs_filepath
|
|
|
|
def _create_internal_db_data(self, source):
|
|
data = self._read_source_file(source)
|
|
id = self.__create_internal_db_data(utils.rand_name('test'), data)
|
|
return 'internal-db://%s' % id
|
|
|
|
def _get_swift_container(self):
|
|
if not getattr(self, '__swift_container', None):
|
|
self.__swift_container = self.__create_container(
|
|
utils.rand_name('sahara-tests'))
|
|
return self.__swift_container
|
|
|
|
def _get_s3_bucket(self):
|
|
if not getattr(self, '__s3_bucket', None):
|
|
self.__s3_bucket = self.__create_bucket(
|
|
utils.rand_name('sahara-tests'))
|
|
return self.__s3_bucket
|
|
|
|
@track_result("Cluster scaling", False)
|
|
def check_scale(self):
|
|
scale_ops = []
|
|
ng_before_scale = self.sahara.get_cluster(self.cluster_id).node_groups
|
|
scale_ops = self.testcase['scaling']
|
|
|
|
body = {}
|
|
for op in scale_ops:
|
|
node_scale = op['node_group']
|
|
if op['operation'] == 'add':
|
|
if 'add_node_groups' not in body:
|
|
body['add_node_groups'] = []
|
|
body['add_node_groups'].append({
|
|
'node_group_template_id':
|
|
self.ng_id_map.get(node_scale,
|
|
self.sahara.get_node_group_template_id(
|
|
node_scale)),
|
|
'count': op['size'],
|
|
'name': utils.rand_name(node_scale)
|
|
})
|
|
if op['operation'] == 'resize':
|
|
if 'resize_node_groups' not in body:
|
|
body['resize_node_groups'] = []
|
|
body['resize_node_groups'].append({
|
|
'name': self.ng_name_map.get(
|
|
node_scale,
|
|
self.sahara.get_node_group_template_id(node_scale)),
|
|
'count': op['size']
|
|
})
|
|
|
|
if body:
|
|
self.sahara.scale_cluster(self.cluster_id, body)
|
|
self._poll_cluster_status(self.cluster_id)
|
|
ng_after_scale = self.sahara.get_cluster(
|
|
self.cluster_id).node_groups
|
|
self._validate_scaling(ng_after_scale,
|
|
self._get_expected_count_of_nodes(
|
|
ng_before_scale, body))
|
|
|
|
def _validate_scaling(self, after, expected_count):
|
|
for (key, value) in six.iteritems(expected_count):
|
|
ng = {}
|
|
for after_ng in after:
|
|
if after_ng['name'] == key:
|
|
ng = after_ng
|
|
break
|
|
self.assertEqual(value, ng.get('count', 0))
|
|
|
|
def _get_expected_count_of_nodes(self, before, body):
|
|
expected_mapper = {}
|
|
for ng in before:
|
|
expected_mapper[ng['name']] = ng['count']
|
|
for ng in body.get('add_node_groups', []):
|
|
expected_mapper[ng['name']] = ng['count']
|
|
for ng in body.get('resize_node_groups', []):
|
|
expected_mapper[ng['name']] = ng['count']
|
|
return expected_mapper
|
|
|
|
@track_result("Check cinder volumes")
|
|
def check_cinder(self):
|
|
if not self._get_node_list_with_volumes() or not self.cinder:
|
|
print("All tests for Cinder were skipped")
|
|
return
|
|
for node_with_volumes in self._get_node_list_with_volumes():
|
|
volume_count_on_node = int(self._run_command_on_node(
|
|
node_with_volumes['node_ip'],
|
|
'mount | grep %s | wc -l' %
|
|
node_with_volumes['volume_mount_prefix']
|
|
))
|
|
self.assertEqual(
|
|
node_with_volumes['volume_count'], volume_count_on_node,
|
|
'Some volumes were not mounted to node.\n'
|
|
'Expected count of mounted volumes to node is %s.\n'
|
|
'Actual count of mounted volumes to node is %s.'
|
|
% (node_with_volumes['volume_count'], volume_count_on_node)
|
|
)
|
|
|
|
def _get_node_list_with_volumes(self):
|
|
node_groups = self.sahara.get_cluster(self.cluster_id).node_groups
|
|
node_list_with_volumes = []
|
|
for node_group in node_groups:
|
|
if node_group['volumes_per_node'] != 0:
|
|
for instance in node_group['instances']:
|
|
node_list_with_volumes.append({
|
|
'node_ip': instance['management_ip'],
|
|
'volume_count': node_group['volumes_per_node'],
|
|
'volume_mount_prefix':
|
|
node_group['volume_mount_prefix']
|
|
})
|
|
return node_list_with_volumes
|
|
|
|
@track_result("Create node group templates")
|
|
def _create_node_group_templates(self):
|
|
ng_id_map = {}
|
|
floating_ip_pool = None
|
|
security_group = None
|
|
proxy_exist = False
|
|
|
|
if self.network['public_network']:
|
|
floating_ip_pool = self.neutron.get_network_id(
|
|
self.network['public_network'])
|
|
|
|
node_groups = []
|
|
for ng in self.testcase['node_group_templates']:
|
|
node_groups.append(ng)
|
|
if ng.get('is_proxy_gateway', False):
|
|
proxy_exist = True
|
|
|
|
for ng in node_groups:
|
|
kwargs = dict(ng)
|
|
kwargs.update(self.plugin_opts)
|
|
kwargs['flavor_id'] = self._get_flavor_id(kwargs['flavor'])
|
|
del kwargs['flavor']
|
|
kwargs['name'] = utils.rand_name(kwargs['name'])
|
|
if (not proxy_exist) or (proxy_exist and kwargs.get(
|
|
'is_proxy_gateway', False)):
|
|
kwargs['floating_ip_pool'] = floating_ip_pool
|
|
if not kwargs.get('auto_security_group', True):
|
|
if security_group is None:
|
|
sg_name = utils.rand_name('scenario')
|
|
security_group = self.__create_security_group(sg_name)
|
|
self.neutron.add_security_group_rule_for_neutron(
|
|
security_group)
|
|
kwargs['security_groups'] = [security_group]
|
|
|
|
# boot_from_volume requires APIv2
|
|
if kwargs.get('boot_from_volume', False) and not self.use_api_v2:
|
|
raise Exception('boot_from_volume is set for %s but it '
|
|
'requires APIv2' % (kwargs['name']))
|
|
|
|
ng_id = self.__create_node_group_template(**kwargs)
|
|
ng_id_map[ng['name']] = ng_id
|
|
return ng_id_map
|
|
|
|
@track_result("Set flavor")
|
|
def _get_flavor_id(self, flavor):
|
|
if isinstance(flavor, six.string_types):
|
|
return self.nova.get_flavor_id(flavor)
|
|
else:
|
|
# if the name already exists, use it
|
|
if flavor.get('name'):
|
|
try:
|
|
return self.nova.get_flavor_id(flavor['name'])
|
|
except exc.NotFound:
|
|
print("Custom flavor %s not found, it will be created" %
|
|
(flavor['name']))
|
|
|
|
flavor_id = self.nova.create_flavor(flavor).id
|
|
self.addCleanup(self.nova.delete_flavor, flavor_id)
|
|
return flavor_id
|
|
|
|
@track_result("Create cluster template")
|
|
def _create_cluster_template(self):
|
|
self.ng_name_map = {}
|
|
template = self.testcase['cluster_template']
|
|
|
|
kwargs = dict(template)
|
|
ngs = kwargs['node_group_templates']
|
|
del kwargs['node_group_templates']
|
|
kwargs['node_groups'] = []
|
|
for ng, count in ngs.items():
|
|
ng_name = utils.rand_name(ng)
|
|
self.ng_name_map[ng] = ng_name
|
|
kwargs['node_groups'].append({
|
|
'name': ng_name,
|
|
'node_group_template_id': self.ng_id_map[ng],
|
|
'count': count})
|
|
|
|
kwargs.update(self.plugin_opts)
|
|
kwargs['name'] = utils.rand_name(kwargs.get('name', 'ct'))
|
|
kwargs['net_id'] = self.neutron.get_network_id(
|
|
self.network['private_network'])
|
|
|
|
return self.__create_cluster_template(**kwargs)
|
|
|
|
@track_result("Check event logs")
|
|
def _check_event_logs(self, cluster):
|
|
invalid_steps = []
|
|
if cluster.is_transient:
|
|
# skip event log testing
|
|
return
|
|
|
|
for step in cluster.provision_progress:
|
|
if not step['successful']:
|
|
invalid_steps.append(step)
|
|
|
|
if len(invalid_steps) > 0:
|
|
invalid_steps_info = "\n".join(six.text_type(e)
|
|
for e in invalid_steps)
|
|
steps_info = "\n".join(six.text_type(e)
|
|
for e in cluster.provision_progress)
|
|
raise exc.TempestException(
|
|
"Issues with event log work: "
|
|
"\n Incomplete steps: \n\n {invalid_steps}"
|
|
"\n All steps: \n\n {steps}".format(
|
|
steps=steps_info,
|
|
invalid_steps=invalid_steps_info))
|
|
|
|
@track_result("Create cluster")
|
|
def _create_cluster(self, cluster_template_id):
|
|
if self.testcase.get('cluster'):
|
|
kwargs = dict(self.testcase['cluster'])
|
|
else:
|
|
kwargs = {} # default template
|
|
|
|
kwargs.update(self.plugin_opts)
|
|
kwargs['name'] = utils.rand_name(kwargs.get('name', 'test'))
|
|
kwargs['cluster_template_id'] = cluster_template_id
|
|
kwargs['default_image_id'] = self.glance.get_image_id(
|
|
self.testcase['image'])
|
|
kwargs['user_keypair_id'] = self.key_name
|
|
|
|
return self.__create_cluster(**kwargs)
|
|
|
|
@track_result("Check cluster state")
|
|
def _poll_cluster_status_tracked(self, cluster_id):
|
|
self._poll_cluster_status(cluster_id)
|
|
|
|
def _poll_cluster_status(self, cluster_id):
|
|
with fixtures.Timeout(
|
|
timeouts.Defaults.instance.timeout_poll_cluster_status,
|
|
gentle=True):
|
|
while True:
|
|
status = self.sahara.get_cluster_status(cluster_id)
|
|
if status == CLUSTER_STATUS_ACTIVE:
|
|
break
|
|
if status == CLUSTER_STATUS_ERROR:
|
|
cluster = self.sahara.get_cluster(cluster_id)
|
|
failure_desc = cluster.status_description
|
|
message = ("Cluster in %s state with"
|
|
" a message below:\n%s") % (status,
|
|
failure_desc)
|
|
|
|
raise exc.TempestException(message)
|
|
time.sleep(3)
|
|
|
|
def _run_command_on_node(self, node_ip, command):
|
|
host_ip = node_ip
|
|
if self.proxy:
|
|
host_ip = self.proxy
|
|
command = ("echo '{pkey}' > {filename} && chmod 600 {filename} && "
|
|
"ssh -o StrictHostKeyChecking=no {ip} -i {filename} "
|
|
"'{cmd}' && rm {filename}".format(
|
|
pkey=self.private_key, filename='scenario.pem',
|
|
ip=node_ip, cmd=command))
|
|
ssh_session = connection.Client(host_ip, self.testcase['ssh_username'],
|
|
pkey=self.private_key)
|
|
return ssh_session.exec_command(command)
|
|
|
|
def _get_nodes_with_process(self, process=None):
|
|
if process is not None:
|
|
process = process.lower()
|
|
nodegroups = self.sahara.get_cluster(self.cluster_id).node_groups
|
|
nodes_with_process = []
|
|
for nodegroup in nodegroups:
|
|
for node_process in nodegroup['node_processes']:
|
|
if not process or process in node_process.lower():
|
|
nodes_with_process.extend(nodegroup['instances'])
|
|
return nodes_with_process
|
|
|
|
def _get_health_status(self, cluster):
|
|
try:
|
|
return cluster.verification['status']
|
|
except (AttributeError, KeyError):
|
|
return 'UNKNOWN'
|
|
|
|
def _poll_verification_status(self, cluster_id):
|
|
with fixtures.Timeout(
|
|
timeouts.Defaults.instance.timeout_poll_cluster_status,
|
|
gentle=True):
|
|
while True:
|
|
cluster = self.sahara.get_cluster(cluster_id)
|
|
status = self._get_health_status(cluster)
|
|
if status == 'UNKNOWN':
|
|
print("Cluster verification did not start")
|
|
break
|
|
if status in HEALTH_CHECKS:
|
|
break
|
|
time.sleep(3)
|
|
|
|
@track_result("Check cluster verification")
|
|
def check_verification(self, cluster_id):
|
|
if self.check_feature_available("verification"):
|
|
self._poll_cluster_status(cluster_id)
|
|
|
|
# need to check if previous verification check is not
|
|
# in the status CHECKING
|
|
self._poll_verification_status(cluster_id)
|
|
self.sahara.start_cluster_verification(cluster_id)
|
|
|
|
# check if this verification check finished without errors
|
|
self._poll_verification_status(cluster_id)
|
|
else:
|
|
print("All tests for cluster verification were skipped")
|
|
|
|
# client ops
|
|
|
|
def __create_node_group_template(self, *args, **kwargs):
|
|
id = self.sahara.create_node_group_template(*args, **kwargs)
|
|
if not self.testcase['retain_resources']:
|
|
self.addCleanup(self.sahara.delete_node_group_template, id)
|
|
return id
|
|
|
|
def __create_security_group(self, sg_name):
|
|
id = self.neutron.create_security_group_for_neutron(sg_name)
|
|
if not self.testcase['retain_resources']:
|
|
self.addCleanup(self.neutron.delete_security_group_for_neutron, id)
|
|
return id
|
|
|
|
def __create_cluster_template(self, *args, **kwargs):
|
|
id = self.sahara.create_cluster_template(*args, **kwargs)
|
|
if not self.testcase['retain_resources']:
|
|
self.addCleanup(self.sahara.delete_cluster_template, id)
|
|
return id
|
|
|
|
def __create_cluster(self, *args, **kwargs):
|
|
id = self.sahara.create_cluster(*args, **kwargs)
|
|
if not self.testcase['retain_resources']:
|
|
self.addCleanup(self.sahara.delete_cluster, id)
|
|
return id
|
|
|
|
def __create_datasource(self, *args, **kwargs):
|
|
id = self.sahara.create_datasource(*args, **kwargs)
|
|
if not self.testcase['retain_resources']:
|
|
self.addCleanup(self.sahara.delete_datasource, id)
|
|
return id
|
|
|
|
def __create_internal_db_data(self, *args, **kwargs):
|
|
id = self.sahara.create_job_binary_internal(*args, **kwargs)
|
|
if not self.testcase['retain_resources']:
|
|
self.addCleanup(self.sahara.delete_job_binary_internal, id)
|
|
return id
|
|
|
|
def __create_job_binary(self, *args, **kwargs):
|
|
id = self.sahara.create_job_binary(*args, **kwargs)
|
|
if not self.testcase['retain_resources']:
|
|
self.addCleanup(self.sahara.delete_job_binary, id)
|
|
return id
|
|
|
|
def __create_job(self, *args, **kwargs):
|
|
id = self.sahara.create_job_template(*args, **kwargs)
|
|
if not self.testcase['retain_resources']:
|
|
self.addCleanup(self.sahara.delete_job_template, id)
|
|
return id
|
|
|
|
def __run_job(self, *args, **kwargs):
|
|
id = self.sahara.run_job(*args, **kwargs)
|
|
if not self.testcase['retain_resources']:
|
|
self.addCleanup(self.sahara.delete_job_execution, id)
|
|
return id
|
|
|
|
def __create_container(self, container_name):
|
|
self.swift.create_container(container_name)
|
|
if not self.testcase['retain_resources']:
|
|
self.addCleanup(self.swift.delete_container, container_name)
|
|
return container_name
|
|
|
|
def __upload_to_container(self, container_name, object_name, data=None):
|
|
if data:
|
|
self.swift.upload_data(container_name, object_name, data)
|
|
if not self.testcase['retain_resources']:
|
|
self.addCleanup(self.swift.delete_object, container_name,
|
|
object_name)
|
|
|
|
def __create_bucket(self, bucket_name):
|
|
self.boto.create_bucket(bucket_name)
|
|
if not self.testcase['retain_resources']:
|
|
self.addCleanup(self.boto.delete_bucket, bucket_name)
|
|
return bucket_name
|
|
|
|
def __upload_to_bucket(self, bucket_name, object_name, data=None):
|
|
if data:
|
|
self.boto.upload_data(bucket_name, object_name, data)
|
|
if not self.testcase['retain_resources']:
|
|
self.addCleanup(self.boto.delete_object, bucket_name,
|
|
object_name)
|
|
|
|
def __create_keypair(self):
|
|
key = utils.rand_name('scenario_key')
|
|
self.nova.nova_client.keypairs.create(key,
|
|
public_key=self.public_key)
|
|
if not self.testcase['retain_resources']:
|
|
self.addCleanup(self.nova.delete_keypair, key)
|
|
return key
|
|
|
|
def check_feature_available(self, feature_name):
|
|
if not getattr(self.sahara.get_cluster(self.cluster_id),
|
|
feature_name, None):
|
|
return False
|
|
return True
|
|
|
|
def tearDown(self):
|
|
tbs = []
|
|
table = prettytable.PrettyTable(["Check", "Status", "Duration, s",
|
|
"Start time"])
|
|
table.align["Check"] = "l"
|
|
for check in self._results:
|
|
table.add_row(
|
|
[check['check_name'], check['status'], check['duration'],
|
|
check['start_time']])
|
|
if check['status'] == CHECK_FAILED_STATUS:
|
|
tbs.append(check['exception_time'])
|
|
tbs.extend(check['traceback'])
|
|
tbs.append("")
|
|
print("Results of testing plugin", self.plugin_opts['plugin_name'],
|
|
self.plugin_opts[self.plugin_version_option])
|
|
print(table)
|
|
print("\n".join(tbs), file=sys.stderr)
|
|
|
|
super(BaseTestCase, self).tearDown()
|
|
|
|
test_failed = any([c['status'] == CHECK_FAILED_STATUS
|
|
for c in self._results])
|
|
|
|
if self.report:
|
|
filename = {"time": time.strftime('%Y%m%d%H%M%S',
|
|
time.localtime())}
|
|
filename.update(self.plugin_opts)
|
|
# let's normalize this variable so that we can use
|
|
# a stable name as formatter later.
|
|
if 'hadoop_version' in filename:
|
|
filename['plugin_version'] = filename['hadoop_version']
|
|
del filename['hadoop_version']
|
|
report_file_name = os.path.join(
|
|
self.results_dir,
|
|
'{plugin_name}_{plugin_version}-{time}'.format(**filename))
|
|
time.strftime('%Y%m%d%H%M%S', time.localtime())
|
|
with open(report_file_name, 'w+') as report_file:
|
|
report_file.write(str(self._results))
|
|
print("Results can be found in %s" % report_file_name)
|
|
if test_failed:
|
|
self.fail("Scenario tests failed")
|