New integration tests - EDP

Added EDP support in the scenario tests
Added a resource for the Java EDP job for the vanilla 2.6.0 plugin

Partially implements blueprint scenario-integration-tests

Change-Id: I6acebd3af5a9ecbe5aa4a80296111b8fba7fc4cd
Sergey Reshetnyak 2015-01-22 15:17:13 +03:00
parent 3557158473
commit 3f88035333
5 changed files with 293 additions and 2 deletions

View File

@@ -12,6 +12,7 @@ clusters:
- plugin_name: vanilla
plugin_version: 2.6.0
image: sahara-juno-vanilla-2.6.0-ubuntu-14.04
edp_jobs_flow: test_flow
- plugin_name: hdp
plugin_version: 2.0.6
image: f3c4a228-9ba4-41f1-b100-a0587689d4dd
@@ -19,3 +20,27 @@ clusters:
- operation: resize
node_group: hdp-worker
size: 5
edp_jobs_flow:
test_flow:
- type: Pig
input_datasource:
type: swift
source: etc/edp-examples/edp-pig/top-todoers/data/input
output_datasource:
type: hdfs
destination: /user/hadoop/edp-output
main_lib:
type: swift
source: etc/edp-examples/edp-pig/top-todoers/example.pig
configs:
dfs.replication: 1
- type: Java
additional_libs:
- type: database
source: etc/edp-examples/hadoop2/edp-java/hadoop-mapreduce-examples-2.6.0.jar
configs:
edp.java.main_class: org.apache.hadoop.examples.QuasiMonteCarlo
args:
- 10
- 10
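The edp_jobs_flow: test_flow key on the cluster entry is only a reference by name; the runner's set_defaults() (changed later in this commit) swaps it for the matching job list from the top-level edp_jobs_flow mapping, or None when no flow is referenced. A minimal standalone sketch of that lookup, using a hypothetical trimmed-down config:

    import yaml

    config = yaml.safe_load("""
    clusters:
      - plugin_name: vanilla
        plugin_version: 2.6.0
        edp_jobs_flow: test_flow
    edp_jobs_flow:
      test_flow:
        - type: Pig
        - type: Java
    """)

    for testcase in config['clusters']:
        # replace the flow name with the matching list of job definitions,
        # or None when the testcase does not reference a flow
        testcase['edp_jobs_flow'] = (
            config.get('edp_jobs_flow', {}).get(
                testcase.get('edp_jobs_flow', None), None))

    print(config['clusters'][0]['edp_jobs_flow'])
    # [{'type': 'Pig'}, {'type': 'Java'}]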

View File

@@ -17,17 +17,22 @@ from __future__ import print_function
import functools
import glob
import json
import logging
import os
import time
import fixtures
from oslo_utils import excutils
import six
from tempest_lib import base
from tempest_lib import exceptions as exc
from sahara.tests.scenario import clients
from sahara.tests.scenario import utils
logger = logging.getLogger('swiftclient')
logger.setLevel(logging.CRITICAL)
DEFAULT_TEMPLATES_PATH = (
'sahara/tests/scenario/templates/%(plugin_name)s/%(hadoop_version)s')
@@ -83,12 +88,145 @@ class BaseTestCase(base.BaseTestCase):
tenant_name=tenant_name,
auth_url=auth_url)
self.swift = clients.SwiftClient(authurl=auth_url,
user=username,
key=password,
tenant_name=tenant_name)
def create_cluster(self):
self.ng_id_map = self._create_node_group_templates()
cl_tmpl_id = self._create_cluster_template()
self.cluster_id = self._create_cluster(cl_tmpl_id)
self._poll_cluster_status(self.cluster_id)
def check_run_jobs(self):
jobs = {}
if self.testcase['edp_jobs_flow']:
jobs = self.testcase['edp_jobs_flow']
else:
jobs = []
pre_exec = []
for job in jobs:
input_id, output_id = self._create_datasources(job)
main_libs, additional_libs = self._create_job_binaries(job)
job_id = self._create_job(job['type'], main_libs, additional_libs)
configs = self._parse_job_configs(job)
pre_exec.append([job_id, input_id, output_id, configs])
job_exec_ids = []
for job_exec in pre_exec:
job_exec_ids.append(self._run_job(*job_exec))
self._poll_jobs_status(job_exec_ids)
def _create_datasources(self, job):
def create(ds, name):
location = ds.get('source', None)
if not location:
location = utils.rand_name(ds['destination'])
if ds['type'] == 'swift':
url = self._create_swift_data(location)
if ds['type'] == 'hdfs':
url = location
return self.__create_datasource(
name=utils.rand_name(name),
description='',
data_source_type=ds['type'], url=url,
credential_user=self.credentials['os_username'],
credential_pass=self.credentials['os_password'])
input_id, output_id = None, None
if job.get('input_datasource'):
ds = job['input_datasource']
input_id = create(ds, 'input')
if job.get('output_datasource'):
ds = job['output_datasource']
output_id = create(ds, 'output')
return input_id, output_id
def _create_job_binaries(self, job):
main_libs = []
additional_libs = []
if job.get('main_lib'):
main_libs.append(self._create_job_binary(job['main_lib']))
for add_lib in job.get('additional_libs', []):
lib_id = self._create_job_binary(add_lib)
additional_libs.append(lib_id)
return main_libs, additional_libs
def _create_job_binary(self, job_binary):
url = None
extra = {}
if job_binary['type'] == 'swift':
url = self._create_swift_data(job_binary['source'])
extra['user'] = self.credentials['os_username']
extra['password'] = self.credentials['os_password']
if job_binary['type'] == 'database':
url = self._create_internal_db_data(job_binary['source'])
job_binary_name = '%s-%s' % (
utils.rand_name('test'), os.path.basename(job_binary['source']))
return self.__create_job_binary(job_binary_name, url, '', extra)
def _create_job(self, type, mains, libs):
return self.__create_job(utils.rand_name('test'), type, mains,
libs, '')
def _parse_job_configs(self, job):
configs = {}
if job.get('configs'):
configs['configs'] = {}
for param, value in six.iteritems(job['configs']):
configs['configs'][param] = str(value)
if job.get('args'):
configs['args'] = map(str, job['args'])
return configs
def _run_job(self, job_id, input_id, output_id, configs):
return self.__run_job(job_id, self.cluster_id, input_id, output_id,
configs)
def _poll_jobs_status(self, exec_ids):
# TODO(sreshetniak): make timeout configurable
with fixtures.Timeout(1800, gentle=True):
success = False
while not success:
success = True
for exec_id in exec_ids:
status = self.sahara.get_job_status(exec_id)
if status in ['FAILED', 'KILLED', 'DONEWITHERROR']:
self.fail("Job %s in %s status" % (exec_id, status))
if status != 'SUCCEEDED':
success = False
time.sleep(5)
def _create_swift_data(self, source=None):
container = self._get_swift_container()
path = utils.rand_name('test')
data = None
if source:
data = open(source).read()
self.__upload_to_container(container, path, data)
return 'swift://%s.sahara/%s' % (container, path)
def _create_internal_db_data(self, source):
data = open(source).read()
id = self.__create_internal_db_data(utils.rand_name('test'), data)
return 'internal-db://%s' % id
def _get_swift_container(self):
if not getattr(self, '__swift_container', None):
self.__swift_container = self.__create_container(
utils.rand_name('sahara-tests'))
return self.__swift_container
@errormsg("Cluster scaling failed")
def check_scale(self):
scale_ops = []
@@ -212,7 +350,7 @@ class BaseTestCase(base.BaseTestCase):
raise exc.TempestException("Cluster in %s state" % status)
time.sleep(3)
# sahara client ops
# client ops
def __create_node_group_template(self, *args, **kwargs):
id = self.sahara.create_node_group_template(*args, **kwargs)
@@ -231,3 +369,46 @@ class BaseTestCase(base.BaseTestCase):
if not self.testcase['retain_resources']:
self.addCleanup(self.sahara.delete_cluster, id)
return id
def __create_datasource(self, *args, **kwargs):
id = self.sahara.create_datasource(*args, **kwargs)
if not self.testcase['retain_resources']:
self.addCleanup(self.sahara.delete_datasource, id)
return id
def __create_internal_db_data(self, *args, **kwargs):
id = self.sahara.create_job_binary_internal(*args, **kwargs)
if not self.testcase['retain_resources']:
self.addCleanup(self.sahara.delete_job_binary_internal, id)
return id
def __create_job_binary(self, *args, **kwargs):
id = self.sahara.create_job_binary(*args, **kwargs)
if not self.testcase['retain_resources']:
self.addCleanup(self.sahara.delete_job_binary, id)
return id
def __create_job(self, *args, **kwargs):
id = self.sahara.create_job(*args, **kwargs)
if not self.testcase['retain_resources']:
self.addCleanup(self.sahara.delete_job, id)
return id
def __run_job(self, *args, **kwargs):
id = self.sahara.run_job(*args, **kwargs)
if not self.testcase['retain_resources']:
self.addCleanup(self.sahara.delete_job_execution, id)
return id
def __create_container(self, container_name):
self.swift.create_container(container_name)
if not self.testcase['retain_resources']:
self.addCleanup(self.swift.delete_container, container_name)
return container_name
def __upload_to_container(self, container_name, object_name, data=None):
if data:
self.swift.upload_data(container_name, object_name, data)
if not self.testcase['retain_resources']:
self.addCleanup(self.swift.delete_object, container_name,
object_name)
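As a side note on _parse_job_configs(): everything under configs and args in the YAML is coerced to a string before the job execution is submitted. A condensed, behaviourally equivalent standalone sketch, fed with the illustrative values from the config file in this change:

    def parse_job_configs(job):
        # same behaviour as BaseTestCase._parse_job_configs:
        # every config value and every arg is coerced to a string
        configs = {}
        if job.get('configs'):
            configs['configs'] = {k: str(v) for k, v in job['configs'].items()}
        if job.get('args'):
            configs['args'] = [str(a) for a in job['args']]
        return configs

    print(parse_job_configs({'configs': {'dfs.replication': 1}, 'args': [10, 10]}))
    # {'configs': {'dfs.replication': '1'}, 'args': ['10', '10']}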

View File

@@ -21,6 +21,8 @@ from novaclient import client as nova_client
from oslo_utils import uuidutils
from saharaclient.api import base as saharaclient_base
from saharaclient import client as sahara_client
from swiftclient import client as swift_client
from swiftclient import exceptions as swift_exc
from tempest_lib import exceptions as exc
@@ -71,10 +73,59 @@ class SaharaClient(Client):
def scale_cluster(self, cluster_id, body):
return self.sahara_client.clusters.scale(cluster_id, body)
def create_datasource(self, *args, **kwargs):
data = self.sahara_client.data_sources.create(*args, **kwargs)
return data.id
def delete_datasource(self, datasource_id):
return self.delete_resource(
self.sahara_client.data_sources.delete,
datasource_id)
def create_job_binary_internal(self, *args, **kwargs):
data = self.sahara_client.job_binary_internals.create(*args, **kwargs)
return data.id
def delete_job_binary_internal(self, job_binary_internal_id):
return self.delete_resource(
self.sahara_client.job_binary_internals.delete,
job_binary_internal_id)
def create_job_binary(self, *args, **kwargs):
data = self.sahara_client.job_binaries.create(*args, **kwargs)
return data.id
def delete_job_binary(self, job_binary_id):
return self.delete_resource(
self.sahara_client.job_binaries.delete,
job_binary_id)
def create_job(self, *args, **kwargs):
data = self.sahara_client.jobs.create(*args, **kwargs)
return data.id
def delete_job(self, job_id):
return self.delete_resource(
self.sahara_client.jobs.delete,
job_id)
def run_job(self, *args, **kwargs):
data = self.sahara_client.job_executions.create(*args, **kwargs)
return data.id
def delete_job_execution(self, job_execution_id):
return self.delete_resource(
self.sahara_client.job_executions.delete,
job_execution_id)
def get_cluster_status(self, cluster_id):
data = self.sahara_client.clusters.get(cluster_id)
return str(data.status)
def get_job_status(self, exec_id):
data = self.sahara_client.job_executions.get(exec_id)
return str(data.info['status'])
def is_resource_deleted(self, method, *args, **kwargs):
try:
method(*args, **kwargs)
@@ -110,3 +161,34 @@ class NeutronClient(Client):
if len(networks) < 1:
raise exc.NotFound(network_name)
return networks[0]['id']
class SwiftClient(Client):
def __init__(self, *args, **kwargs):
self.swift_client = swift_client.Connection(auth_version='2.0',
*args, **kwargs)
def create_container(self, container_name):
return self.swift_client.put_container(container_name)
def delete_container(self, container_name):
return self.delete_resource(
self.swift_client.delete_container,
container_name)
def upload_data(self, container_name, object_name, data):
return self.swift_client.put_object(container_name, object_name, data)
def delete_object(self, container_name, object_name):
return self.delete_resource(
self.swift_client.delete_object,
container_name,
object_name)
def is_resource_deleted(self, method, *args, **kwargs):
try:
method(*args, **kwargs)
except swift_exc.ClientException as ex:
return ex.http_status == 404
return False
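Typical use of the new SwiftClient wrapper looks like the sketch below (endpoint and credentials are assumed placeholders; the wrapper hands them straight to swiftclient.Connection with auth_version='2.0', the same way BaseTestCase wires it up):

    from sahara.tests.scenario import clients

    swift = clients.SwiftClient(authurl='http://keystone.example:5000/v2.0',
                                user='demo',          # assumed credentials
                                key='secret',
                                tenant_name='demo')

    swift.create_container('sahara-tests-data')
    swift.upload_data('sahara-tests-data', 'input', 'sample input data\n')

    # cleanup goes through the shared Client.delete_resource helper;
    # is_resource_deleted() treats a 404 from Swift as "already gone"
    swift.delete_object('sahara-tests-data', 'input')
    swift.delete_container('sahara-tests-data')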

View File

@@ -50,7 +50,7 @@ def set_defaults(config):
False)
net['public_network'] = net.get('public_network', 'public')
default_scenario = ['scale']
default_scenario = ['run_jobs', 'scale', 'run_jobs']
# set up tests parameters
for testcase in config['clusters']:
@@ -59,6 +59,9 @@ def set_defaults(config):
testcase['plugin_version'].replace('.', '_')])
testcase['retain_resources'] = testcase.get('retain_resources', False)
testcase['scenario'] = testcase.get('scenario', default_scenario)
testcase['edp_jobs_flow'] = (
config.get('edp_jobs_flow', {}).get(
testcase.get('edp_jobs_flow', None), None))
def main():