# sahara-tests/sahara_tests/scenario/base.py
# Copyright (c) 2015 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
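
"""Base class and helpers for the Sahara scenario tests.

BaseTestCase drives a full scenario run against a Sahara deployment:
cluster creation, EDP job execution, scaling, Cinder volume checks and
cluster verification.
"""
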
from __future__ import print_function
import functools
import logging
import os
import sys
import time
import traceback
import fixtures
from oslo_utils import timeutils
import prettytable
import six
from tempest.lib import base
from tempest.lib.common import ssh as connection
from tempest.lib import exceptions as exc
from sahara_tests.scenario import clients
from sahara_tests.scenario import timeouts
from sahara_tests.scenario import utils
from sahara_tests.utils import crypto as ssh
from sahara_tests.utils import url as utils_url
logger = logging.getLogger('swiftclient')
logger.setLevel(logging.CRITICAL)
CHECK_OK_STATUS = "OK"
CHECK_FAILED_STATUS = "FAILED"
CLUSTER_STATUS_ACTIVE = "Active"
CLUSTER_STATUS_ERROR = "Error"
HEALTH_CHECKS = ["RED", "YELLOW", "GREEN"]
def track_result(check_name, exit_with_error=True):
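    """Decorator recording the outcome of a named scenario check.

    Each decorated check appends an entry (name, status, start time,
    duration, traceback on failure) to self._results. When
    exit_with_error is False, failures are recorded without re-raising.
    """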
def decorator(fct):
@functools.wraps(fct)
def wrapper(self, *args, **kwargs):
started_at = timeutils.utcnow()
test_info = {
'check_name': check_name,
'status': CHECK_OK_STATUS,
'start_time': started_at,
'duration': None,
'traceback': None,
'exception_time': None
}
self._results.append(test_info)
try:
return fct(self, *args, **kwargs)
except Exception:
test_info['exception_time'] = timeutils.utcnow().strftime(
'%Y%m%d_%H%M%S')
test_info['status'] = CHECK_FAILED_STATUS
test_info['traceback'] = traceback.format_exception(
*sys.exc_info())
if exit_with_error:
raise
finally:
test_time = timeutils.utcnow() - started_at
test_info['duration'] = test_time.seconds
return wrapper
return decorator
class BaseTestCase(base.BaseTestCase):
@classmethod
def setUpClass(cls):
super(BaseTestCase, cls).setUpClass()
cls.network = None
cls.credentials = None
cls.testcase = None
cls._results = []
cls.report = False
cls.results_dir = '.'
cls.default_templ_dir = '.'
cls.use_api_v2 = False
def setUp(self):
super(BaseTestCase, self).setUp()
self._init_clients()
timeouts.Defaults.init_defaults(self.testcase)
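        # registering the image with Sahara returns, among other things,
        # the username to use for SSH connections to the cluster nodes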
self.testcase['ssh_username'] = self.sahara.register_image(
self.glance.get_image_id(self.testcase['image']),
self.testcase).username
self.key = self.testcase.get('key_name')
if self.key is None:
self.private_key, self.public_key = ssh.generate_key_pair()
self.key_name = self.__create_keypair()
# save the private key if retain_resources is specified
# (useful for debugging purposes)
        if self.testcase['retain_resources'] and self.key is None:
private_key_file_name = os.path.join(self.results_dir,
self.key_name + '.key')
with open(private_key_file_name, 'w+') as private_key_file:
private_key_file.write(self.private_key)
os.chmod(private_key_file_name, 0o600)
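        # APIv2 renamed the 'hadoop_version' field to 'plugin_version';
        # pick the name matching the API in use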
self.plugin_version_option = 'plugin_version'
if not self.use_api_v2:
self.plugin_version_option = 'hadoop_version'
self.plugin_opts = {
'plugin_name': self.testcase['plugin_name'],
self.plugin_version_option: self.testcase['plugin_version']
}
self.cinder = True
self.proxy = False
def _init_clients(self):
username = self.credentials['os_username']
password = self.credentials['os_password']
tenant_name = self.credentials['os_tenant']
auth_url = self.credentials['os_auth_url']
sahara_service_type = self.credentials['sahara_service_type']
sahara_url = self.credentials['sahara_url']
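        # infer the keystone auth version from the auth URL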
auth_version = '3.0' if 'v3' in auth_url else '2.0'
session = clients.get_session(auth_url, username, password,
tenant_name,
self.credentials.get('ssl_verify',
False),
self._get_file_with_defaults(
self.credentials.get('ssl_cert')))
api_version = '2' if self.use_api_v2 else '1.1'
self.sahara = clients.SaharaClient(session=session,
service_type=sahara_service_type,
sahara_url=sahara_url,
api_version=api_version)
self.nova = clients.NovaClient(session=session)
self.neutron = clients.NeutronClient(session=session)
# swiftclient doesn't support keystone sessions
self.swift = clients.SwiftClient(
auth_version=auth_version,
authurl=auth_url,
user=username,
key=password,
insecure=not self.credentials.get('ssl_verify', False),
cacert=self.credentials.get('ssl_cert'),
tenant_name=tenant_name)
self.glance = clients.GlanceClient(session=session)
# boto is not an OpenStack client, but we can handle it as well
self.boto = None
if self.credentials.get("s3_endpoint", None):
self.boto = clients.BotoClient(
endpoint=self.credentials["s3_endpoint"],
accesskey=self.credentials["s3_accesskey"],
secretkey=self.credentials["s3_secretkey"])
def create_cluster(self):
self.cluster_id = self.sahara.get_cluster_id(
self.testcase.get('existing_cluster'))
self.ng_id_map = {}
if self.cluster_id is None:
self.ng_id_map = self._create_node_group_templates()
cl_tmpl_id = self._create_cluster_template()
self.cluster_id = self._create_cluster(cl_tmpl_id)
elif self.key is None:
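            # reusing an existing cluster: its nodes were not booted with
            # our generated keypair, so skip the SSH-based Cinder checks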
self.cinder = False
self._poll_cluster_status_tracked(self.cluster_id)
cluster = self.sahara.get_cluster(self.cluster_id, show_progress=True)
self._get_proxy(cluster)
self.check_cinder()
if self.check_feature_available("provision_progress"):
self._check_event_logs(cluster)
def _get_proxy(self, cluster):
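        """Record the management IP of the cluster's proxy gateway, if any."""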
for ng in cluster.node_groups:
if ng['is_proxy_gateway']:
for instance in ng['instances']:
if instance['management_ip'] != (
instance['internal_ip']):
self.proxy = instance['management_ip']
@track_result("Check transient")
def check_transient(self):
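        """Wait for the transient cluster to be automatically deleted."""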
with fixtures.Timeout(
timeouts.Defaults.instance.timeout_check_transient,
gentle=True):
while True:
if self.sahara.is_resource_deleted(
self.sahara.get_cluster_status, self.cluster_id):
break
time.sleep(5)
def _inject_datasources_data(self, arg, input_url, output_url):
return arg.format(
input_datasource=input_url, output_datasource=output_url)
def _put_io_data_to_configs(self, configs, input_id, output_id):
input_url, output_url = None, None
if input_id is not None:
input_url = self.sahara.get_datasource(
data_source_id=input_id).url
if output_id is not None:
output_url = self.sahara.get_datasource(
data_source_id=output_id).url
pl = lambda x: ( # noqa: E731
self._inject_datasources_data(x, input_url, output_url))
args = list(map(pl, configs.get('args', [])))
configs['args'] = args
return configs
def _prepare_job_running(self, job):
input_id, output_id = self._create_datasources(job)
main_libs, additional_libs = self._create_job_binaries(job)
job_id = self._create_job(job['type'], main_libs, additional_libs)
configs = self._parse_job_configs(job)
configs = self._put_io_data_to_configs(
configs, input_id, output_id)
return [job_id, input_id, output_id, configs]
@track_result("Check EDP jobs", False)
def check_run_jobs(self):
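        """Run the EDP jobs defined in the testcase in batches.

        'edp_batching' caps how many jobs are submitted at once; by
        default all jobs run in a single batch.
        """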
batching = self.testcase.get('edp_batching',
len(self.testcase['edp_jobs_flow']))
batching_size = batching
jobs = self.testcase.get('edp_jobs_flow', [])
pre_exec = []
for job in jobs:
pre_exec.append(self._prepare_job_running(job))
batching -= 1
if not batching:
self._job_batching(pre_exec)
pre_exec = []
batching = batching_size
self.check_verification(self.cluster_id)
def _job_batching(self, pre_exec):
job_exec_ids = []
for job_exec in pre_exec:
job_exec_ids.append(self._run_job(*job_exec))
self._poll_jobs_status(job_exec_ids)
def _create_datasources(self, job):
def create(ds, name):
credential_vars = {}
source = ds.get('source', None)
destination = None if source else utils.rand_name(
ds['destination'])
if ds['type'] == 'swift':
url = self._create_swift_data(source, destination)
credential_vars = {
'credential_user': self.credentials['os_username'],
'credential_pass': self.credentials['os_password']
}
elif ds['type'] == 's3':
url = self._create_s3_data(source, destination)
credential_vars = {
's3_credentials': {
'accesskey': self.credentials['s3_accesskey'],
'secretkey': self.credentials['s3_secretkey'],
'endpoint': utils_url.url_schema_remover(
self.credentials['s3_endpoint']),
'ssl': self.credentials['s3_endpoint_ssl'],
'bucket_in_path': self.credentials['s3_bucket_path']
}
}
elif ds['type'] == 'hdfs':
url = self._create_dfs_data(source, destination,
self.testcase.get('hdfs_username',
'hadoop'),
ds['type'])
elif ds['type'] == 'maprfs':
url = self._create_dfs_data(source, destination,
ds.get('maprfs_username', 'mapr'),
ds['type'])
return self.__create_datasource(
name=utils.rand_name(name),
description='',
data_source_type=ds['type'], url=url,
**credential_vars)
input_id, output_id = None, None
if job.get('input_datasource'):
ds = job['input_datasource']
input_id = create(ds, 'input')
if job.get('output_datasource'):
ds = job['output_datasource']
output_id = create(ds, 'output')
return input_id, output_id
def _create_job_binaries(self, job):
main_libs = []
additional_libs = []
if job.get('main_lib'):
main_libs.append(self._create_job_binary(job['main_lib']))
for add_lib in job.get('additional_libs', []):
lib_id = self._create_job_binary(add_lib)
additional_libs.append(lib_id)
return main_libs, additional_libs
def _create_job_binary(self, job_binary):
url = None
extra = {}
if job_binary['type'] == 'swift':
url = self._create_swift_data(job_binary['source'])
extra['user'] = self.credentials['os_username']
extra['password'] = self.credentials['os_password']
elif job_binary['type'] == 's3':
url = self._create_s3_data(job_binary['source'])
extra['accesskey'] = self.credentials['s3_accesskey']
extra['secretkey'] = self.credentials['s3_secretkey']
extra['endpoint'] = self.credentials['s3_endpoint']
elif job_binary['type'] == 'database':
url = self._create_internal_db_data(job_binary['source'])
job_binary_name = '%s-%s' % (
utils.rand_name('test'), os.path.basename(job_binary['source']))
return self.__create_job_binary(job_binary_name, url, '', extra)
    def _create_job(self, job_type, mains, libs):
        return self.__create_job(utils.rand_name('test'), job_type, mains,
                                 libs, '')
def _parse_job_configs(self, job):
configs = {}
if job.get('configs'):
configs['configs'] = {}
for param, value in six.iteritems(job['configs']):
configs['configs'][param] = str(value)
if job.get('args'):
configs['args'] = list(map(str, job['args']))
return configs
def _run_job(self, job_id, input_id, output_id, configs):
return self.__run_job(job_id, self.cluster_id, input_id, output_id,
configs)
def _poll_jobs_status(self, exec_ids):
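        """Poll job executions until all reach a terminal state.

        Any execution that did not end in SUCCEEDED is reported as a
        test failure.
        """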
try:
with fixtures.Timeout(
timeouts.Defaults.instance.timeout_poll_jobs_status,
gentle=True):
success = False
polling_ids = list(exec_ids)
while not success:
current_ids = list(polling_ids)
success = True
for exec_id in polling_ids:
status = self.sahara.get_job_status(exec_id)
if status not in ['FAILED', 'KILLED', 'DONEWITHERROR',
"SUCCEEDED"]:
success = False
else:
current_ids.remove(exec_id)
polling_ids = list(current_ids)
time.sleep(5)
finally:
report = []
for exec_id in exec_ids:
status = self.sahara.get_job_status(exec_id)
if status != "SUCCEEDED":
info = self.sahara.get_job_info(exec_id)
report.append("Job with id={id}, name={name}, "
"type={type} has status "
"{status}".format(id=exec_id,
name=info.name,
type=info.type,
status=status))
if report:
self.fail("\n".join(report))
def _get_file_with_defaults(self, file_path):
""" Check if the file exists; if it is a relative path, check also
among the default files.
"""
if not file_path:
return ''
all_files = [file_path]
if not os.path.isabs(file_path):
# relative path: look into default templates too, if defined
default_file = os.path.join(self.default_templ_dir, file_path)
if os.path.abspath(default_file) != os.path.abspath(file_path):
all_files.append(default_file)
for checked_file in all_files:
if os.path.isfile(checked_file):
return checked_file
        raise Exception('File %s not found (checked: %s)' %
                        (file_path, all_files))
def _read_source_file(self, source):
if not source:
return None
with open(self._get_file_with_defaults(source), 'rb') as source_fd:
data = source_fd.read()
return data
def _create_swift_data(self, source=None, destination=None):
container = self._get_swift_container()
path = utils.rand_name(destination if destination else 'test')
data = self._read_source_file(source)
self.__upload_to_container(container, path, data)
return 'swift://%s.sahara/%s' % (container, path)
def _create_s3_data(self, source=None, destination=None):
bucket = self._get_s3_bucket()
path = utils.rand_name(destination if destination else 'test')
data = self._read_source_file(source)
self.__upload_to_bucket(bucket, path, data)
return 's3://%s/%s' % (bucket, path)
def _create_dfs_data(self, source, destination, hdfs_username, fs):
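        # escape every byte as \xNN so that 'echo -e' reproduces the
        # exact payload on the remote node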
def to_hex_present(string):
return "".join(map(lambda x: hex(ord(x)).replace("0x", "\\x"),
string.decode('utf-8')))
if destination:
return destination
command_prefixes = {'hdfs': 'hdfs dfs',
'maprfs': 'hadoop fs'}
hdfs_dir = utils.rand_name("/user/%s/data" % hdfs_username)
instances = self._get_nodes_with_process('namenode')
if len(instances) == 0:
instances = self._get_nodes_with_process('CLDB')
inst_ip = instances[0]["management_ip"]
self._run_command_on_node(
inst_ip,
"sudo su - -c \"%(prefix)s -mkdir -p %(path)s \" %(user)s" % {
"prefix": command_prefixes[fs],
"path": hdfs_dir,
"user": hdfs_username})
hdfs_filepath = utils.rand_name(hdfs_dir + "/file")
data = self._read_source_file(source)
if not data:
            # bytes, not str: to_hex_present() calls .decode() on it
            data = b''
self._run_command_on_node(
inst_ip,
("echo -e \"%(data)s\" | sudo su - -c \"%(prefix)s"
" -put - %(path)s\" %(user)s") % {
"data": to_hex_present(data),
"prefix": command_prefixes[fs],
"path": hdfs_filepath,
"user": hdfs_username})
return hdfs_filepath
def _create_internal_db_data(self, source):
data = self._read_source_file(source)
id = self.__create_internal_db_data(utils.rand_name('test'), data)
return 'internal-db://%s' % id
    def _get_swift_container(self):
        # cache under a non-mangled attribute name: getattr() with the
        # literal string '__swift_container' bypasses Python's name
        # mangling, so such a check misses on every call and creates a
        # fresh container each time
        if not getattr(self, '_swift_container', None):
            self._swift_container = self.__create_container(
                utils.rand_name('sahara-tests'))
        return self._swift_container
    def _get_s3_bucket(self):
        # same caching caveat as _get_swift_container: use a non-mangled
        # attribute name for the getattr() check
        if not getattr(self, '_s3_bucket', None):
            self._s3_bucket = self.__create_bucket(
                utils.rand_name('sahara-tests'))
        return self._s3_bucket
@track_result("Cluster scaling", False)
def check_scale(self):
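        """Apply the scaling operations from the testcase and verify the
        resulting node group counts.
        """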
scale_ops = []
ng_before_scale = self.sahara.get_cluster(self.cluster_id).node_groups
scale_ops = self.testcase['scaling']
body = {}
for op in scale_ops:
node_scale = op['node_group']
if op['operation'] == 'add':
if 'add_node_groups' not in body:
body['add_node_groups'] = []
body['add_node_groups'].append({
'node_group_template_id':
self.ng_id_map.get(node_scale,
self.sahara.get_node_group_template_id(
node_scale)),
'count': op['size'],
'name': utils.rand_name(node_scale)
})
if op['operation'] == 'resize':
if 'resize_node_groups' not in body:
body['resize_node_groups'] = []
body['resize_node_groups'].append({
'name': self.ng_name_map.get(
node_scale,
self.sahara.get_node_group_template_id(node_scale)),
'count': op['size']
})
if body:
self.sahara.scale_cluster(self.cluster_id, body)
self._poll_cluster_status(self.cluster_id)
ng_after_scale = self.sahara.get_cluster(
self.cluster_id).node_groups
self._validate_scaling(ng_after_scale,
self._get_expected_count_of_nodes(
ng_before_scale, body))
def _validate_scaling(self, after, expected_count):
for (key, value) in six.iteritems(expected_count):
ng = {}
for after_ng in after:
if after_ng['name'] == key:
ng = after_ng
break
self.assertEqual(value, ng.get('count', 0))
def _get_expected_count_of_nodes(self, before, body):
expected_mapper = {}
for ng in before:
expected_mapper[ng['name']] = ng['count']
for ng in body.get('add_node_groups', []):
expected_mapper[ng['name']] = ng['count']
for ng in body.get('resize_node_groups', []):
expected_mapper[ng['name']] = ng['count']
return expected_mapper
@track_result("Check cinder volumes")
def check_cinder(self):
        nodes_with_volumes = self._get_node_list_with_volumes()
        if not nodes_with_volumes or not self.cinder:
            print("All tests for Cinder were skipped")
            return
        for node_with_volumes in nodes_with_volumes:
volume_count_on_node = int(self._run_command_on_node(
node_with_volumes['node_ip'],
'mount | grep %s | wc -l' %
node_with_volumes['volume_mount_prefix']
))
self.assertEqual(
node_with_volumes['volume_count'], volume_count_on_node,
            'Some volumes were not mounted on the node.\n'
            'Expected number of mounted volumes: %s.\n'
            'Actual number of mounted volumes: %s.'
% (node_with_volumes['volume_count'], volume_count_on_node)
)
def _get_node_list_with_volumes(self):
node_groups = self.sahara.get_cluster(self.cluster_id).node_groups
node_list_with_volumes = []
for node_group in node_groups:
if node_group['volumes_per_node'] != 0:
for instance in node_group['instances']:
node_list_with_volumes.append({
'node_ip': instance['management_ip'],
'volume_count': node_group['volumes_per_node'],
'volume_mount_prefix':
node_group['volume_mount_prefix']
})
return node_list_with_volumes
@track_result("Create node group templates")
def _create_node_group_templates(self):
ng_id_map = {}
floating_ip_pool = None
security_group = None
proxy_exist = False
if self.network['public_network']:
floating_ip_pool = self.neutron.get_network_id(
self.network['public_network'])
node_groups = []
for ng in self.testcase['node_group_templates']:
node_groups.append(ng)
if ng.get('is_proxy_gateway', False):
proxy_exist = True
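        # when a proxy gateway is defined, only the gateway node group
        # receives a floating IP; otherwise every node group does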
for ng in node_groups:
kwargs = dict(ng)
kwargs.update(self.plugin_opts)
kwargs['flavor_id'] = self._get_flavor_id(kwargs['flavor'])
del kwargs['flavor']
kwargs['name'] = utils.rand_name(kwargs['name'])
            if not proxy_exist or kwargs.get('is_proxy_gateway', False):
kwargs['floating_ip_pool'] = floating_ip_pool
if not kwargs.get('auto_security_group', True):
if security_group is None:
sg_name = utils.rand_name('scenario')
security_group = self.__create_security_group(sg_name)
self.neutron.add_security_group_rule_for_neutron(
security_group)
kwargs['security_groups'] = [security_group]
# boot_from_volume requires APIv2
if kwargs.get('boot_from_volume', False) and not self.use_api_v2:
raise Exception('boot_from_volume is set for %s but it '
'requires APIv2' % (kwargs['name']))
ng_id = self.__create_node_group_template(**kwargs)
ng_id_map[ng['name']] = ng_id
return ng_id_map
@track_result("Set flavor")
def _get_flavor_id(self, flavor):
if isinstance(flavor, six.string_types):
return self.nova.get_flavor_id(flavor)
else:
# if the name already exists, use it
if flavor.get('name'):
try:
return self.nova.get_flavor_id(flavor['name'])
except exc.NotFound:
print("Custom flavor %s not found, it will be created" %
(flavor['name']))
flavor_id = self.nova.create_flavor(flavor).id
self.addCleanup(self.nova.delete_flavor, flavor_id)
return flavor_id
@track_result("Create cluster template")
def _create_cluster_template(self):
self.ng_name_map = {}
template = self.testcase['cluster_template']
kwargs = dict(template)
ngs = kwargs['node_group_templates']
del kwargs['node_group_templates']
kwargs['node_groups'] = []
for ng, count in ngs.items():
ng_name = utils.rand_name(ng)
self.ng_name_map[ng] = ng_name
kwargs['node_groups'].append({
'name': ng_name,
'node_group_template_id': self.ng_id_map[ng],
'count': count})
kwargs.update(self.plugin_opts)
kwargs['name'] = utils.rand_name(kwargs.get('name', 'ct'))
kwargs['net_id'] = self.neutron.get_network_id(
self.network['private_network'])
return self.__create_cluster_template(**kwargs)
@track_result("Check event logs")
def _check_event_logs(self, cluster):
invalid_steps = []
if cluster.is_transient:
# skip event log testing
return
for step in cluster.provision_progress:
if not step['successful']:
invalid_steps.append(step)
if len(invalid_steps) > 0:
invalid_steps_info = "\n".join(six.text_type(e)
for e in invalid_steps)
steps_info = "\n".join(six.text_type(e)
for e in cluster.provision_progress)
raise exc.TempestException(
"Issues with event log work: "
"\n Incomplete steps: \n\n {invalid_steps}"
"\n All steps: \n\n {steps}".format(
steps=steps_info,
invalid_steps=invalid_steps_info))
@track_result("Create cluster")
def _create_cluster(self, cluster_template_id):
if self.testcase.get('cluster'):
kwargs = dict(self.testcase['cluster'])
else:
kwargs = {} # default template
kwargs.update(self.plugin_opts)
kwargs['name'] = utils.rand_name(kwargs.get('name', 'test'))
kwargs['cluster_template_id'] = cluster_template_id
kwargs['default_image_id'] = self.glance.get_image_id(
self.testcase['image'])
kwargs['user_keypair_id'] = self.key_name
return self.__create_cluster(**kwargs)
@track_result("Check cluster state")
def _poll_cluster_status_tracked(self, cluster_id):
self._poll_cluster_status(cluster_id)
def _poll_cluster_status(self, cluster_id):
with fixtures.Timeout(
timeouts.Defaults.instance.timeout_poll_cluster_status,
gentle=True):
while True:
status = self.sahara.get_cluster_status(cluster_id)
if status == CLUSTER_STATUS_ACTIVE:
break
if status == CLUSTER_STATUS_ERROR:
cluster = self.sahara.get_cluster(cluster_id)
failure_desc = cluster.status_description
message = ("Cluster in %s state with"
" a message below:\n%s") % (status,
failure_desc)
raise exc.TempestException(message)
time.sleep(3)
def _run_command_on_node(self, node_ip, command):
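        """Execute a shell command on a cluster node over SSH, hopping
        through the proxy gateway when one is configured.
        """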
host_ip = node_ip
if self.proxy:
host_ip = self.proxy
command = ("echo '{pkey}' > {filename} && chmod 600 {filename} && "
"ssh -o StrictHostKeyChecking=no {ip} -i {filename} "
"'{cmd}' && rm {filename}".format(
pkey=self.private_key, filename='scenario.pem',
ip=node_ip, cmd=command))
ssh_session = connection.Client(host_ip, self.testcase['ssh_username'],
pkey=self.private_key)
return ssh_session.exec_command(command)
def _get_nodes_with_process(self, process=None):
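        """Return the instances of every node group that runs the given
        process (case-insensitive substring match).
        """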
if process is not None:
process = process.lower()
nodegroups = self.sahara.get_cluster(self.cluster_id).node_groups
nodes_with_process = []
for nodegroup in nodegroups:
for node_process in nodegroup['node_processes']:
if not process or process in node_process.lower():
nodes_with_process.extend(nodegroup['instances'])
return nodes_with_process
def _get_health_status(self, cluster):
try:
return cluster.verification['status']
except (AttributeError, KeyError):
return 'UNKNOWN'
def _poll_verification_status(self, cluster_id):
with fixtures.Timeout(
timeouts.Defaults.instance.timeout_poll_cluster_status,
gentle=True):
while True:
cluster = self.sahara.get_cluster(cluster_id)
status = self._get_health_status(cluster)
if status == 'UNKNOWN':
print("Cluster verification did not start")
break
if status in HEALTH_CHECKS:
break
time.sleep(3)
@track_result("Check cluster verification")
def check_verification(self, cluster_id):
if self.check_feature_available("verification"):
self._poll_cluster_status(cluster_id)
            # make sure any previous verification has left the CHECKING
            # state before starting a new one
self._poll_verification_status(cluster_id)
self.sahara.start_cluster_verification(cluster_id)
# check if this verification check finished without errors
self._poll_verification_status(cluster_id)
else:
print("All tests for cluster verification were skipped")
# client ops
def __create_node_group_template(self, *args, **kwargs):
id = self.sahara.create_node_group_template(*args, **kwargs)
if not self.testcase['retain_resources']:
self.addCleanup(self.sahara.delete_node_group_template, id)
return id
def __create_security_group(self, sg_name):
id = self.neutron.create_security_group_for_neutron(sg_name)
if not self.testcase['retain_resources']:
self.addCleanup(self.neutron.delete_security_group_for_neutron, id)
return id
def __create_cluster_template(self, *args, **kwargs):
id = self.sahara.create_cluster_template(*args, **kwargs)
if not self.testcase['retain_resources']:
self.addCleanup(self.sahara.delete_cluster_template, id)
return id
def __create_cluster(self, *args, **kwargs):
id = self.sahara.create_cluster(*args, **kwargs)
if not self.testcase['retain_resources']:
self.addCleanup(self.sahara.delete_cluster, id)
return id
def __create_datasource(self, *args, **kwargs):
id = self.sahara.create_datasource(*args, **kwargs)
if not self.testcase['retain_resources']:
self.addCleanup(self.sahara.delete_datasource, id)
return id
def __create_internal_db_data(self, *args, **kwargs):
id = self.sahara.create_job_binary_internal(*args, **kwargs)
if not self.testcase['retain_resources']:
self.addCleanup(self.sahara.delete_job_binary_internal, id)
return id
def __create_job_binary(self, *args, **kwargs):
id = self.sahara.create_job_binary(*args, **kwargs)
if not self.testcase['retain_resources']:
self.addCleanup(self.sahara.delete_job_binary, id)
return id
def __create_job(self, *args, **kwargs):
id = self.sahara.create_job_template(*args, **kwargs)
if not self.testcase['retain_resources']:
self.addCleanup(self.sahara.delete_job_template, id)
return id
def __run_job(self, *args, **kwargs):
id = self.sahara.run_job(*args, **kwargs)
if not self.testcase['retain_resources']:
self.addCleanup(self.sahara.delete_job_execution, id)
return id
def __create_container(self, container_name):
self.swift.create_container(container_name)
if not self.testcase['retain_resources']:
self.addCleanup(self.swift.delete_container, container_name)
return container_name
def __upload_to_container(self, container_name, object_name, data=None):
if data:
self.swift.upload_data(container_name, object_name, data)
if not self.testcase['retain_resources']:
self.addCleanup(self.swift.delete_object, container_name,
object_name)
def __create_bucket(self, bucket_name):
self.boto.create_bucket(bucket_name)
if not self.testcase['retain_resources']:
self.addCleanup(self.boto.delete_bucket, bucket_name)
return bucket_name
def __upload_to_bucket(self, bucket_name, object_name, data=None):
if data:
self.boto.upload_data(bucket_name, object_name, data)
if not self.testcase['retain_resources']:
self.addCleanup(self.boto.delete_object, bucket_name,
object_name)
def __create_keypair(self):
key = utils.rand_name('scenario_key')
self.nova.nova_client.keypairs.create(key,
public_key=self.public_key)
if not self.testcase['retain_resources']:
self.addCleanup(self.nova.delete_keypair, key)
return key
    def check_feature_available(self, feature_name):
        return bool(getattr(self.sahara.get_cluster(self.cluster_id),
                            feature_name, None))
def tearDown(self):
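        """Print a summary table of all checks, dump failure tracebacks
        to stderr, optionally write a report file, and fail the test if
        any check failed.
        """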
tbs = []
table = prettytable.PrettyTable(["Check", "Status", "Duration, s",
"Start time"])
table.align["Check"] = "l"
for check in self._results:
table.add_row(
[check['check_name'], check['status'], check['duration'],
check['start_time']])
if check['status'] == CHECK_FAILED_STATUS:
tbs.append(check['exception_time'])
tbs.extend(check['traceback'])
tbs.append("")
print("Results of testing plugin", self.plugin_opts['plugin_name'],
self.plugin_opts[self.plugin_version_option])
print(table)
print("\n".join(tbs), file=sys.stderr)
super(BaseTestCase, self).tearDown()
        test_failed = any(c['status'] == CHECK_FAILED_STATUS
                          for c in self._results)
if self.report:
filename = {"time": time.strftime('%Y%m%d%H%M%S',
time.localtime())}
filename.update(self.plugin_opts)
            # normalize the version key so that a stable name can be used
            # in the report file name format below
if 'hadoop_version' in filename:
filename['plugin_version'] = filename['hadoop_version']
del filename['hadoop_version']
report_file_name = os.path.join(
self.results_dir,
'{plugin_name}_{plugin_version}-{time}'.format(**filename))
with open(report_file_name, 'w+') as report_file:
report_file.write(str(self._results))
print("Results can be found in %s" % report_file_name)
if test_failed:
self.fail("Scenario tests failed")