Parallel testing EDP jobs
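
Create test resources (data sources, job binaries, job binary
internals, jobs and job executions) with addCleanup() registration,
so that everything is deleted automatically when a test finishes and
the explicit _delete_job() teardown with its try/except wrappers can
be dropped. edp_testing() no longer waits for a single job to finish:
it returns the job execution id, and the new poll_jobs_status() waits
for a whole batch of ids at once, scaling its timeout by the number
of jobs. This lets the gating tests launch all of their EDP jobs and
run them on the cluster concurrently.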

Implements: blueprint parallel-testing-edp-jobs

Change-Id: I6b9f7d858f0f01e62efae04e11cbf8818ca8df61
Sergey Reshetnyak 2014-07-31 17:56:01 +04:00
parent 547bec78a4
commit 3686b03da1
8 changed files with 284 additions and 267 deletions

View File

@@ -19,7 +19,6 @@ import time
 import uuid
 
 import fixtures
-from oslo.utils import excutils
 import six
 
 from sahara.swift import swift_helper as sw
@@ -103,41 +102,54 @@ class EDPTest(base.ITestCase):
         self.edp_info = EDPJobInfo()
 
     def _create_data_source(self, name, data_type, url, description=''):
-        return self.sahara.data_sources.create(
+        source_id = self.sahara.data_sources.create(
             name, description, data_type, url, self.common_config.OS_USERNAME,
             self.common_config.OS_PASSWORD).id
+        self.addCleanup(self.sahara.data_sources.delete, source_id)
+        return source_id
 
     def _create_job_binary_internals(self, name, data):
-        return self.sahara.job_binary_internals.create(name, data).id
+        job_binary_id = self.sahara.job_binary_internals.create(name, data).id
+        self.addCleanup(self.sahara.job_binary_internals.delete, job_binary_id)
+        return job_binary_id
 
     def _create_job_binary(self, name, url, extra=None, description=None):
-        return self.sahara.job_binaries.create(
+        job_binary_id = self.sahara.job_binaries.create(
             name, url, description or '', extra or {}).id
+        self.addCleanup(self.sahara.job_binaries.delete, job_binary_id)
+        return job_binary_id
 
     def _create_job(self, name, job_type, mains, libs):
-        return self.sahara.jobs.create(name, job_type, mains, libs,
-                                       description='').id
+        job_id = self.sahara.jobs.create(name, job_type, mains, libs,
+                                         description='').id
+        self.addCleanup(self.sahara.jobs.delete, job_id)
+        return job_id
 
-    def _await_job_execution(self, job):
-        timeout = self.common_config.JOB_LAUNCH_TIMEOUT * 60
-        status = self.sahara.job_executions.get(job.id).info['status']
+    def _get_job_status(self, job_id):
+        return self.sahara.job_executions.get(job_id).info['status']
+
+    def poll_jobs_status(self, job_ids):
+        timeout = self.common_config.JOB_LAUNCH_TIMEOUT * 60 * len(job_ids)
         try:
             with fixtures.Timeout(timeout, gentle=True):
-                while status != edp.JOB_STATUS_SUCCEEDED:
-                    if status == edp.JOB_STATUS_KILLED:
-                        self.fail("Job status == '{0}'.".format(
-                            edp.JOB_STATUS_KILLED))
-
-                    time.sleep(10)
-                    status = self.sahara.job_executions.get(
-                        job.id).info['status']
+                success = False
+                while not success:
+                    success = True
+                    for job_id in job_ids:
+                        status = self._get_job_status(job_id)
+                        if status in [edp.JOB_STATUS_FAILED,
+                                      edp.JOB_STATUS_KILLED,
+                                      edp.JOB_STATUS_DONEWITHERROR]:
+                            self.fail(
+                                'Job status "%s" \'%s\'.' % (job_id, status))
+                        if status != edp.JOB_STATUS_SUCCEEDED:
+                            success = False
+
+                    time.sleep(5)
         except fixtures.TimeoutException:
             self.fail(
-                "Job did not return to '{0}' status within {1:d} minute(s)."
-                .format(edp.JOB_STATUS_SUCCEEDED,
-                        self.common_config.JOB_LAUNCH_TIMEOUT)
-            )
+                "Jobs did not return to '{0}' status within {1:d} minute(s)."
+                .format(edp.JOB_STATUS_SUCCEEDED, timeout / 60))
 
     def _create_job_binaries(self, job_data_list, job_binary_internal_list,
                              job_binary_list, swift_connection=None,
@@ -172,23 +184,6 @@ class EDPTest(base.ITestCase):
             )
         )
 
-    def _delete_job(self, execution_job, job_id, job_binary_list,
-                    job_binary_internal_list, input_id, output_id):
-        if execution_job:
-            self.sahara.job_executions.delete(execution_job.id)
-        if job_id:
-            self.sahara.jobs.delete(job_id)
-        if job_binary_list:
-            for job_binary_id in job_binary_list:
-                self.sahara.job_binaries.delete(job_binary_id)
-        if job_binary_internal_list:
-            for internal_id in job_binary_internal_list:
-                self.sahara.job_binary_internals.delete(internal_id)
-        if input_id:
-            self.sahara.data_sources.delete(input_id)
-        if output_id:
-            self.sahara.data_sources.delete(output_id)
-
     def _add_swift_configs(self, configs):
         if "configs" not in configs:
@@ -201,8 +196,7 @@ class EDPTest(base.ITestCase):
         configs["configs"][
             sw.HADOOP_SWIFT_PASSWORD] = self.common_config.OS_PASSWORD
 
-    @base.skip_test('SKIP_EDP_TEST',
-                    'Test for EDP was skipped.')
+    @base.skip_test('SKIP_EDP_TEST', 'Test for EDP was skipped.')
     def edp_testing(self, job_type, job_data_list, lib_data_list=None,
                     configs=None, pass_input_output_args=False,
                     swift_binaries=False, hdfs_local_output=False):
@@ -210,112 +204,94 @@
         lib_data_list = lib_data_list or []
         configs = configs or {}
 
-        try:
-            swift = self.connect_to_swift()
-            container_name = 'Edp-test-%s' % str(uuid.uuid4())[:8]
-            swift.put_container(container_name)
-            swift.put_object(
-                container_name, 'input', ''.join(
-                    random.choice(':' + ' ' + '\n' + string.ascii_lowercase)
-                    for x in six.moves.range(10000)
-                )
-            )
-        except Exception as e:
-            with excutils.save_and_reraise_exception():
-                self.delete_swift_container(swift, container_name)
-                print(str(e))
+        swift = self.connect_to_swift()
+        container_name = 'Edp-test-%s' % str(uuid.uuid4())[:8]
+        swift.put_container(container_name)
+        self.addCleanup(self.delete_swift_container, swift, container_name)
+        swift.put_object(
+            container_name, 'input', ''.join(
+                random.choice(':' + ' ' + '\n' + string.ascii_lowercase)
+                for x in six.moves.range(10000)
+            )
+        )
 
         input_id = None
         output_id = None
         job_id = None
         job_execution = None
-        try:
-            job_binary_list = []
-            lib_binary_list = []
-            job_binary_internal_list = []
-            swift_input_url = 'swift://%s.sahara/input' % container_name
-
-            if hdfs_local_output:
-                # This will create a file in hdfs under the user
-                # executing the job (i.e. /user/hadoop/Edp-test-xxxx-out)
-                output_type = "hdfs"
-                output_url = container_name + "-out"
-            else:
-                output_type = "swift"
-                output_url = 'swift://%s.sahara/output' % container_name
-
-            # Java jobs don't use data sources. Input/output paths must
-            # be passed as args with corresponding username/password configs
-            if not edp.compare_job_type(job_type,
-                                        edp.JOB_TYPE_JAVA,
-                                        edp.JOB_TYPE_SPARK):
-                input_id = self._create_data_source(
-                    'input-%s' % str(uuid.uuid4())[:8], 'swift',
-                    swift_input_url)
-                output_id = self._create_data_source(
-                    'output-%s' % str(uuid.uuid4())[:8], output_type,
-                    output_url)
-
-            if job_data_list:
-                if swift_binaries:
-                    self._create_job_binaries(job_data_list,
-                                              job_binary_internal_list,
-                                              job_binary_list,
-                                              swift_connection=swift,
-                                              container_name=container_name)
-                else:
-                    self._create_job_binaries(job_data_list,
-                                              job_binary_internal_list,
-                                              job_binary_list)
-
-            if lib_data_list:
-                if swift_binaries:
-                    self._create_job_binaries(lib_data_list,
-                                              job_binary_internal_list,
-                                              lib_binary_list,
-                                              swift_connection=swift,
-                                              container_name=container_name)
-                else:
-                    self._create_job_binaries(lib_data_list,
-                                              job_binary_internal_list,
-                                              lib_binary_list)
-
-            job_id = self._create_job(
-                'Edp-test-job-%s' % str(uuid.uuid4())[:8], job_type,
-                job_binary_list, lib_binary_list)
-            if not configs:
-                configs = {}
-
-            # TODO(tmckay): for spark we don't have support for swift
-            # yet. When we do, we'll need something here to set up
-            # swift paths and we can use a spark wordcount job
-
-            # Append the input/output paths with the swift configs
-            # if the caller has requested it...
-            if edp.compare_job_type(
-                    job_type, edp.JOB_TYPE_JAVA) and pass_input_output_args:
-                self._add_swift_configs(configs)
-                if "args" in configs:
-                    configs["args"].extend([swift_input_url,
-                                            output_url])
-                else:
-                    configs["args"] = [swift_input_url,
-                                       output_url]
-
-            job_execution = self.sahara.job_executions.create(
-                job_id, self.cluster_id, input_id, output_id,
-                configs=configs)
-
-            if job_execution:
-                self._await_job_execution(job_execution)
-        except Exception as e:
-            with excutils.save_and_reraise_exception():
-                print(str(e))
-        finally:
-            self.delete_swift_container(swift, container_name)
-            self._delete_job(
-                job_execution, job_id, job_binary_list + lib_binary_list,
-                job_binary_internal_list, input_id, output_id
-            )
+        job_binary_list = []
+        lib_binary_list = []
+        job_binary_internal_list = []
+        swift_input_url = 'swift://%s.sahara/input' % container_name
+
+        if hdfs_local_output:
+            # This will create a file in hdfs under the user
+            # executing the job (i.e. /user/hadoop/Edp-test-xxxx-out)
+            output_type = "hdfs"
+            output_url = container_name + "-out"
+        else:
+            output_type = "swift"
+            output_url = 'swift://%s.sahara/output' % container_name
+
+        # Java jobs don't use data sources. Input/output paths must
+        # be passed as args with corresponding username/password configs
+        if not edp.compare_job_type(job_type,
+                                    edp.JOB_TYPE_JAVA,
+                                    edp.JOB_TYPE_SPARK):
+            input_id = self._create_data_source(
+                'input-%s' % str(uuid.uuid4())[:8], 'swift',
+                swift_input_url)
+            output_id = self._create_data_source(
+                'output-%s' % str(uuid.uuid4())[:8], output_type,
+                output_url)
+
+        if job_data_list:
+            if swift_binaries:
+                self._create_job_binaries(job_data_list,
+                                          job_binary_internal_list,
+                                          job_binary_list,
+                                          swift_connection=swift,
+                                          container_name=container_name)
+            else:
+                self._create_job_binaries(job_data_list,
+                                          job_binary_internal_list,
+                                          job_binary_list)
+
+        if lib_data_list:
+            if swift_binaries:
+                self._create_job_binaries(lib_data_list,
+                                          job_binary_internal_list,
+                                          lib_binary_list,
+                                          swift_connection=swift,
+                                          container_name=container_name)
+            else:
+                self._create_job_binaries(lib_data_list,
+                                          job_binary_internal_list,
+                                          lib_binary_list)
+
+        job_id = self._create_job(
+            'Edp-test-job-%s' % str(uuid.uuid4())[:8], job_type,
+            job_binary_list, lib_binary_list)
+        if not configs:
+            configs = {}
+
+        # TODO(tmckay): for spark we don't have support for swift
+        # yet. When we do, we'll need something here to set up
+        # swift paths and we can use a spark wordcount job
+
+        # Append the input/output paths with the swift configs
+        # if the caller has requested it...
+        if edp.compare_job_type(
+                job_type, edp.JOB_TYPE_JAVA) and pass_input_output_args:
+            self._add_swift_configs(configs)
+            if "args" in configs:
+                configs["args"].extend([swift_input_url, output_url])
+            else:
+                configs["args"] = [swift_input_url, output_url]
+
+        job_execution = self.sahara.job_executions.create(
+            job_id, self.cluster_id, input_id, output_id,
+            configs=configs)
+        self.addCleanup(self.sahara.job_executions.delete, job_execution.id)
+
+        return job_execution.id
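
The deleted _delete_job() encoded an explicit teardown order; the addCleanup() calls reproduce it implicitly, because unittest/testtools run cleanups in LIFO order: the job execution registered last is removed first, before the job, binaries and data sources it references. A minimal, self-contained sketch of that property (plain unittest, with list appends standing in for the Sahara delete calls):

import unittest


class CleanupOrderSketch(unittest.TestCase):
    def test_cleanups_run_in_lifo_order(self):
        deleted = []
        # Registration order mirrors edp_testing(): sources and
        # binaries first, the job execution last.
        self.addCleanup(deleted.append, 'data_source')
        self.addCleanup(deleted.append, 'job_binary')
        self.addCleanup(deleted.append, 'job_execution')
        # Force the cleanups now so the order can be inspected.
        self.doCleanups()
        self.assertEqual(
            ['job_execution', 'job_binary', 'data_source'], deleted)


if __name__ == '__main__':
    unittest.main()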

View File

@@ -199,35 +199,38 @@ class CDHGatingTest(cluster_configs.ClusterConfigTest,
     @b.errormsg("Failure while EDP testing: ")
     def _check_edp(self):
-        self._edp_test()
+        self.poll_jobs_status(list(self._run_edp_test()))
 
-    def _edp_test(self):
+    def _run_edp_test(self):
         # check pig
         pig_job = self.edp_info.read_pig_example_script()
         pig_lib = self.edp_info.read_pig_example_jar()
-        self.edp_testing(job_type=utils_edp.JOB_TYPE_PIG,
-                         job_data_list=[{'pig': pig_job}],
-                         lib_data_list=[{'jar': pig_lib}],
-                         swift_binaries=False,
-                         hdfs_local_output=True)
+        yield self.edp_testing(
+            job_type=utils_edp.JOB_TYPE_PIG,
+            job_data_list=[{'pig': pig_job}],
+            lib_data_list=[{'jar': pig_lib}],
+            swift_binaries=False,
+            hdfs_local_output=True)
 
         # check mapreduce
         mapreduce_jar = self.edp_info.read_mapreduce_example_jar()
         mapreduce_configs = self.edp_info.mapreduce_example_configs()
-        self.edp_testing(job_type=utils_edp.JOB_TYPE_MAPREDUCE,
-                         job_data_list=[],
-                         lib_data_list=[{'jar': mapreduce_jar}],
-                         configs=mapreduce_configs,
-                         swift_binaries=False,
-                         hdfs_local_output=True)
+        yield self.edp_testing(
+            job_type=utils_edp.JOB_TYPE_MAPREDUCE,
+            job_data_list=[],
+            lib_data_list=[{'jar': mapreduce_jar}],
+            configs=mapreduce_configs,
+            swift_binaries=False,
+            hdfs_local_output=True)
 
         # check mapreduce streaming
-        self.edp_testing(job_type=utils_edp.JOB_TYPE_MAPREDUCE_STREAMING,
-                         job_data_list=[],
-                         lib_data_list=[],
-                         configs=self.edp_info.mapreduce_streaming_configs(),
-                         swift_binaries=False,
-                         hdfs_local_output=True)
+        yield self.edp_testing(
+            job_type=utils_edp.JOB_TYPE_MAPREDUCE_STREAMING,
+            job_data_list=[],
+            lib_data_list=[],
+            configs=self.edp_info.mapreduce_streaming_configs(),
+            swift_binaries=False,
+            hdfs_local_output=True)
 
     @b.errormsg("Failure while cluster scaling: ")
     def _check_scaling(self):

@@ -278,7 +281,7 @@ class CDHGatingTest(cluster_configs.ClusterConfigTest,
     @b.errormsg("Failure while EDP testing after cluster scaling: ")
     def _check_edp_after_scaling(self):
-        self._edp_test()
+        self._check_edp()
 
     @testcase.skipIf(
         cfg.ITConfig().cdh_config.SKIP_ALL_TESTS_FOR_PLUGIN,
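
Note that _run_edp_test() is now a generator: none of its body runs until it is iterated, so the list() call in _check_edp() is what actually launches the jobs (each yield evaluates one edp_testing() call) and collects the returned execution ids for poll_jobs_status(). A tiny illustration of that evaluation order (the helper below is a hypothetical stand-in, not part of this change):

def _run_jobs():
    # Nothing in this body executes until the generator is iterated.
    for name in ('pig', 'mapreduce', 'streaming'):
        yield 'job-id-%s' % name  # stands in for self.edp_testing(...)


gen = _run_jobs()    # no job has been launched yet
job_ids = list(gen)  # launches every job and gathers the ids
assert job_ids == ['job-id-pig', 'job-id-mapreduce', 'job-id-streaming']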

View File

@@ -124,42 +124,46 @@ class HDP2GatingTest(swift.SwiftTest, scaling.ScalingTest,
     @b.errormsg("Failure while EDP testing: ")
     def _check_edp(self):
-        self._edp_test()
+        self.poll_jobs_status(list(self._run_edp_test()))
 
-    def _edp_test(self):
+    def _run_edp_test(self):
         # check pig
         pig_job = self.edp_info.read_pig_example_script()
         pig_lib = self.edp_info.read_pig_example_jar()
-        self.edp_testing(job_type=utils_edp.JOB_TYPE_PIG,
-                         job_data_list=[{'pig': pig_job}],
-                         lib_data_list=[{'jar': pig_lib}],
-                         swift_binaries=True,
-                         hdfs_local_output=True)
+        yield self.edp_testing(
+            job_type=utils_edp.JOB_TYPE_PIG,
+            job_data_list=[{'pig': pig_job}],
+            lib_data_list=[{'jar': pig_lib}],
+            swift_binaries=True,
+            hdfs_local_output=True)
 
         # check mapreduce
         mapreduce_jar = self.edp_info.read_mapreduce_example_jar()
         mapreduce_configs = self.edp_info.mapreduce_example_configs()
-        self.edp_testing(job_type=utils_edp.JOB_TYPE_MAPREDUCE,
-                         job_data_list=[],
-                         lib_data_list=[{'jar': mapreduce_jar}],
-                         configs=mapreduce_configs,
-                         swift_binaries=True,
-                         hdfs_local_output=True)
+        yield self.edp_testing(
+            job_type=utils_edp.JOB_TYPE_MAPREDUCE,
+            job_data_list=[],
+            lib_data_list=[{'jar': mapreduce_jar}],
+            configs=mapreduce_configs,
+            swift_binaries=True,
+            hdfs_local_output=True)
 
         # check mapreduce streaming
-        self.edp_testing(job_type=utils_edp.JOB_TYPE_MAPREDUCE_STREAMING,
-                         job_data_list=[],
-                         lib_data_list=[],
-                         configs=self.edp_info.mapreduce_streaming_configs())
+        yield self.edp_testing(
+            job_type=utils_edp.JOB_TYPE_MAPREDUCE_STREAMING,
+            job_data_list=[],
+            lib_data_list=[],
+            configs=self.edp_info.mapreduce_streaming_configs())
 
         # check java
         java_jar = self.edp_info.read_java_example_lib(2)
         java_configs = self.edp_info.java_example_configs(2)
-        self.edp_testing(utils_edp.JOB_TYPE_JAVA,
-                         job_data_list=[],
-                         lib_data_list=[{'jar': java_jar}],
-                         configs=java_configs)
+        yield self.edp_testing(
+            utils_edp.JOB_TYPE_JAVA,
+            job_data_list=[],
+            lib_data_list=[{'jar': java_jar}],
+            configs=java_configs)
 
     @b.errormsg("Failure while cluster scaling: ")
     def _check_scaling(self):

@@ -192,7 +196,7 @@ class HDP2GatingTest(swift.SwiftTest, scaling.ScalingTest,
     @b.errormsg("Failure while EDP testing after cluster scaling: ")
     def _check_edp_after_scaling(self):
-        self._edp_test()
+        self._check_edp()
 
     @testcase.attr('hdp2')
     @testcase.skipIf(config.SKIP_ALL_TESTS_FOR_PLUGIN,

View File

@@ -166,27 +166,39 @@ class HDPGatingTest(cinder.CinderVolumeTest, edp.EDPTest,
         java_lib_data = self.edp_info.read_java_example_lib()
         try:
-            self.edp_testing(job_type=utils_edp.JOB_TYPE_PIG,
-                             job_data_list=[{'pig': pig_job_data}],
-                             lib_data_list=[{'jar': pig_lib_data}],
-                             swift_binaries=True,
-                             hdfs_local_output=True)
-            self.edp_testing(job_type=utils_edp.JOB_TYPE_MAPREDUCE,
-                             job_data_list=[],
-                             lib_data_list=[{'jar': mapreduce_jar_data}],
-                             configs=self.edp_info.mapreduce_example_configs(),
-                             swift_binaries=True,
-                             hdfs_local_output=True)
-            self.edp_testing(job_type=utils_edp.JOB_TYPE_MAPREDUCE_STREAMING,
-                             job_data_list=[],
-                             lib_data_list=[],
-                             configs=(
-                                 self.edp_info.mapreduce_streaming_configs()))
-            self.edp_testing(job_type=utils_edp.JOB_TYPE_JAVA,
-                             job_data_list=[],
-                             lib_data_list=[{'jar': java_lib_data}],
-                             configs=self.edp_info.java_example_configs(),
-                             pass_input_output_args=True)
+            job_ids = []
+            job_id = self.edp_testing(
+                job_type=utils_edp.JOB_TYPE_PIG,
+                job_data_list=[{'pig': pig_job_data}],
+                lib_data_list=[{'jar': pig_lib_data}],
+                swift_binaries=True,
+                hdfs_local_output=True)
+            job_ids.append(job_id)
+
+            job_id = self.edp_testing(
+                job_type=utils_edp.JOB_TYPE_MAPREDUCE,
+                job_data_list=[],
+                lib_data_list=[{'jar': mapreduce_jar_data}],
+                configs=self.edp_info.mapreduce_example_configs(),
+                swift_binaries=True,
+                hdfs_local_output=True)
+            job_ids.append(job_id)
+
+            job_id = self.edp_testing(
+                job_type=utils_edp.JOB_TYPE_MAPREDUCE_STREAMING,
+                job_data_list=[],
+                lib_data_list=[],
+                configs=self.edp_info.mapreduce_streaming_configs())
+            job_ids.append(job_id)
+
+            job_id = self.edp_testing(
+                job_type=utils_edp.JOB_TYPE_JAVA,
+                job_data_list=[],
+                lib_data_list=[{'jar': java_lib_data}],
+                configs=self.edp_info.java_example_configs(),
+                pass_input_output_args=True)
+            job_ids.append(job_id)
+
+            self.poll_jobs_status(job_ids)
         except Exception as e:
             with excutils.save_and_reraise_exception():
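
Unlike the generator-based rewrites above, this test keeps its try/except wrapper around the whole sequence (the excutils.save_and_reraise_exception() handler is unchanged), so the execution ids are collected in a plain job_ids list and handed to a single poll_jobs_status() call.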

View File

@@ -121,10 +121,12 @@ class SparkGatingTest(swift.SwiftTest, scaling.ScalingTest,
         # check spark
         spark_jar = self.edp_info.read_spark_example_jar()
         spark_configs = self.edp_info.spark_example_configs()
-        self.edp_testing(utils_edp.JOB_TYPE_SPARK,
-                         job_data_list=[{'jar': spark_jar}],
-                         lib_data_list=[],
-                         configs=spark_configs)
+        job_id = self.edp_testing(
+            utils_edp.JOB_TYPE_SPARK,
+            job_data_list=[{'jar': spark_jar}],
+            lib_data_list=[],
+            configs=spark_configs)
+        self.poll_jobs_status([job_id])
 
     @b.errormsg("Failure while cluster scaling: ")
     def _check_scaling(self):

View File

@@ -101,9 +101,10 @@ class TransientGatingTest(edp.EDPTest):
     def _check_transient(self):
         pig_job_data = self.edp_info.read_pig_example_script()
         pig_lib_data = self.edp_info.read_pig_example_jar()
-        self.edp_testing(job_type=utils_edp.JOB_TYPE_PIG,
-                         job_data_list=[{'pig': pig_job_data}],
-                         lib_data_list=[{'jar': pig_lib_data}])
+        job_id = self.edp_testing(job_type=utils_edp.JOB_TYPE_PIG,
+                                  job_data_list=[{'pig': pig_job_data}],
+                                  lib_data_list=[{'jar': pig_lib_data}])
+        self.poll_jobs_status([job_id])
 
         # set timeout in seconds
         timeout = self.common_config.TRANSIENT_CLUSTER_TIMEOUT * 60

View File

@@ -255,40 +255,40 @@ class VanillaGatingTest(cinder.CinderVolumeTest,
         # This is a modified version of WordCount that takes swift configs
         java_lib_data = self.edp_info.read_java_example_lib()
-        try:
-            self.edp_testing(
-                job_type=utils_edp.JOB_TYPE_PIG,
-                job_data_list=[{'pig': pig_job_data}],
-                lib_data_list=[{'jar': pig_lib_data}],
-                swift_binaries=True,
-                hdfs_local_output=True)
-            self.edp_testing(
-                job_type=utils_edp.JOB_TYPE_MAPREDUCE,
-                job_data_list=[],
-                lib_data_list=[{'jar': mapreduce_jar_data}],
-                configs=self.edp_info.mapreduce_example_configs(),
-                swift_binaries=True,
-                hdfs_local_output=True)
-            self.edp_testing(
-                job_type=utils_edp.JOB_TYPE_MAPREDUCE_STREAMING,
-                job_data_list=[],
-                lib_data_list=[],
-                configs=self.edp_info.mapreduce_streaming_configs())
-            self.edp_testing(
-                job_type=utils_edp.JOB_TYPE_JAVA,
-                job_data_list=[],
-                lib_data_list=[{'jar': java_lib_data}],
-                configs=self.edp_info.java_example_configs(),
-                pass_input_output_args=True)
-        except Exception as e:
-            with excutils.save_and_reraise_exception():
-                self.delete_objects(
-                    cluster_info['cluster_id'], cluster_template_id,
-                    node_group_template_id_list
-                )
-                message = 'Failure while EDP testing: '
-                self.print_error_log(message, e)
+        job_ids = []
+        job_id = self.edp_testing(
+            job_type=utils_edp.JOB_TYPE_PIG,
+            job_data_list=[{'pig': pig_job_data}],
+            lib_data_list=[{'jar': pig_lib_data}],
+            swift_binaries=True,
+            hdfs_local_output=True)
+        job_ids.append(job_id)
+
+        job_id = self.edp_testing(
+            job_type=utils_edp.JOB_TYPE_MAPREDUCE,
+            job_data_list=[],
+            lib_data_list=[{'jar': mapreduce_jar_data}],
+            configs=self.edp_info.mapreduce_example_configs(),
+            swift_binaries=True,
+            hdfs_local_output=True)
+        job_ids.append(job_id)
+
+        job_id = self.edp_testing(
+            job_type=utils_edp.JOB_TYPE_MAPREDUCE_STREAMING,
+            job_data_list=[],
+            lib_data_list=[],
+            configs=self.edp_info.mapreduce_streaming_configs())
+        job_ids.append(job_id)
+
+        job_id = self.edp_testing(
+            job_type=utils_edp.JOB_TYPE_JAVA,
+            job_data_list=[],
+            lib_data_list=[{'jar': java_lib_data}],
+            configs=self.edp_info.java_example_configs(),
+            pass_input_output_args=True)
+        job_ids.append(job_id)
+        self.poll_jobs_status(job_ids)
 
         edp_test()

View File

@@ -63,6 +63,17 @@ class VanillaTwoGatingTest(cluster_configs.ClusterConfigTest,
         self.volumes_per_node = 2
         self.volume_size = 2
 
+    ng_params = {
+        'MapReduce': {
+            'yarn.app.mapreduce.am.resource.mb': 256,
+            'yarn.app.mapreduce.am.command-opts': '-Xmx256m'
+        },
+        'YARN': {
+            'yarn.scheduler.minimum-allocation-mb': 256,
+            'yarn.scheduler.maximum-allocation-mb': 1024
+        }
+    }
+
     @b.errormsg("Failure while 'nm-dn' node group template creation: ")
     def _create_nm_dn_ng_template(self):
         template = {
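
A plausible reading of the new ng_params values: capping the MapReduce application master at 256 MB and bounding YARN container allocations between 256 MB and 1024 MB lets several small EDP jobs obtain containers at the same time on the modest test flavors, which matters now that the jobs run concurrently instead of one after another.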
@@ -71,7 +82,7 @@ class VanillaTwoGatingTest(cluster_configs.ClusterConfigTest,
             'description': 'test node group template for Vanilla plugin',
             'node_processes': ['nodemanager', 'datanode'],
             'floating_ip_pool': self.floating_ip_pool,
-            'node_configs': {}
+            'node_configs': self.ng_params
         }
         self.ng_tmpl_nm_dn_id = self.create_node_group_template(**template)
         self.ng_template_ids.append(self.ng_tmpl_nm_dn_id)

@@ -86,7 +97,7 @@ class VanillaTwoGatingTest(cluster_configs.ClusterConfigTest,
             'volume_size': self.volume_size,
             'node_processes': ['nodemanager'],
             'floating_ip_pool': self.floating_ip_pool,
-            'node_configs': {}
+            'node_configs': self.ng_params
         }
         self.ng_tmpl_nm_id = self.create_node_group_template(**template)
         self.ng_template_ids.append(self.ng_tmpl_nm_id)

@@ -101,7 +112,7 @@ class VanillaTwoGatingTest(cluster_configs.ClusterConfigTest,
             'volume_size': self.volume_size,
             'node_processes': ['datanode'],
             'floating_ip_pool': self.floating_ip_pool,
-            'node_configs': {}
+            'node_configs': self.ng_params
         }
         self.ng_tmpl_dn_id = self.create_node_group_template(**template)
         self.ng_template_ids.append(self.ng_tmpl_dn_id)

@@ -123,7 +134,8 @@ class VanillaTwoGatingTest(cluster_configs.ClusterConfigTest,
                 'flavor_id': self.flavor_id,
                 'node_processes': ['namenode', 'resourcemanager'],
                 'floating_ip_pool': self.floating_ip_pool,
-                'count': 1
+                'count': 1,
+                'node_configs': self.ng_params
             },
             {
                 'name': 'master-node-oo-hs',

@@ -131,7 +143,8 @@ class VanillaTwoGatingTest(cluster_configs.ClusterConfigTest,
                 'node_processes': ['oozie', 'historyserver',
                                    'secondarynamenode'],
                 'floating_ip_pool': self.floating_ip_pool,
-                'count': 1
+                'count': 1,
+                'node_configs': self.ng_params
             },
             {
                 'name': 'worker-node-nm-dn',

@@ -183,51 +196,57 @@ class VanillaTwoGatingTest(cluster_configs.ClusterConfigTest,
     @b.errormsg("Failure while EDP testing: ")
     def _check_edp(self):
+        self.poll_jobs_status(list(self._run_edp_tests()))
+
+    def _run_edp_tests(self):
         skipped_edp_job_types = self.vanilla_two_config.SKIP_EDP_JOB_TYPES
         if utils_edp.JOB_TYPE_PIG not in skipped_edp_job_types:
-            self._edp_pig_test()
+            yield self._edp_pig_test()
         if utils_edp.JOB_TYPE_MAPREDUCE not in skipped_edp_job_types:
-            self._edp_mapreduce_test()
+            yield self._edp_mapreduce_test()
         if utils_edp.JOB_TYPE_MAPREDUCE_STREAMING not in skipped_edp_job_types:
-            self._edp_mapreduce_streaming_test()
+            yield self._edp_mapreduce_streaming_test()
         if utils_edp.JOB_TYPE_JAVA not in skipped_edp_job_types:
-            self._edp_java_test()
+            yield self._edp_java_test()
 
     def _edp_pig_test(self):
         pig_job = self.edp_info.read_pig_example_script()
         pig_lib = self.edp_info.read_pig_example_jar()
-        self.edp_testing(job_type=utils_edp.JOB_TYPE_PIG,
-                         job_data_list=[{'pig': pig_job}],
-                         lib_data_list=[{'jar': pig_lib}],
-                         swift_binaries=True,
-                         hdfs_local_output=True)
+        return self.edp_testing(
+            job_type=utils_edp.JOB_TYPE_PIG,
+            job_data_list=[{'pig': pig_job}],
+            lib_data_list=[{'jar': pig_lib}],
+            swift_binaries=True,
+            hdfs_local_output=True)
 
     def _edp_mapreduce_test(self):
         mapreduce_jar = self.edp_info.read_mapreduce_example_jar()
         mapreduce_configs = self.edp_info.mapreduce_example_configs()
-        self.edp_testing(job_type=utils_edp.JOB_TYPE_MAPREDUCE,
-                         job_data_list=[],
-                         lib_data_list=[{'jar': mapreduce_jar}],
-                         configs=mapreduce_configs,
-                         swift_binaries=True,
-                         hdfs_local_output=True)
+        return self.edp_testing(
+            job_type=utils_edp.JOB_TYPE_MAPREDUCE,
+            job_data_list=[],
+            lib_data_list=[{'jar': mapreduce_jar}],
+            configs=mapreduce_configs,
+            swift_binaries=True,
+            hdfs_local_output=True)
 
     def _edp_mapreduce_streaming_test(self):
-        self.edp_testing(job_type=utils_edp.JOB_TYPE_MAPREDUCE_STREAMING,
-                         job_data_list=[],
-                         lib_data_list=[],
-                         configs=self.edp_info.mapreduce_streaming_configs())
+        return self.edp_testing(
+            job_type=utils_edp.JOB_TYPE_MAPREDUCE_STREAMING,
+            job_data_list=[],
+            lib_data_list=[],
+            configs=self.edp_info.mapreduce_streaming_configs())
 
     def _edp_java_test(self):
         java_jar = self.edp_info.read_java_example_lib(2)
         java_configs = self.edp_info.java_example_configs(2)
-        self.edp_testing(utils_edp.JOB_TYPE_JAVA,
-                         job_data_list=[],
-                         lib_data_list=[{'jar': java_jar}],
-                         configs=java_configs)
+        return self.edp_testing(
+            utils_edp.JOB_TYPE_JAVA,
+            job_data_list=[],
+            lib_data_list=[{'jar': java_jar}],
+            configs=java_configs)
 
     @b.errormsg("Failure while cluster scaling: ")
     def _check_scaling(self):