diff --git a/sahara/tests/integration/tests/edp.py b/sahara/tests/integration/tests/edp.py index 8a4eb02f..40933f23 100644 --- a/sahara/tests/integration/tests/edp.py +++ b/sahara/tests/integration/tests/edp.py @@ -19,7 +19,6 @@ import time import uuid import fixtures -from oslo.utils import excutils import six from sahara.swift import swift_helper as sw @@ -103,41 +102,54 @@ class EDPTest(base.ITestCase): self.edp_info = EDPJobInfo() def _create_data_source(self, name, data_type, url, description=''): - return self.sahara.data_sources.create( + source_id = self.sahara.data_sources.create( name, description, data_type, url, self.common_config.OS_USERNAME, self.common_config.OS_PASSWORD).id + self.addCleanup(self.sahara.data_sources.delete, source_id) + return source_id def _create_job_binary_internals(self, name, data): - return self.sahara.job_binary_internals.create(name, data).id + job_binary_id = self.sahara.job_binary_internals.create(name, data).id + self.addCleanup(self.sahara.job_binary_internals.delete, job_binary_id) + return job_binary_id def _create_job_binary(self, name, url, extra=None, description=None): - return self.sahara.job_binaries.create( + job_binary_id = self.sahara.job_binaries.create( name, url, description or '', extra or {}).id + self.addCleanup(self.sahara.job_binaries.delete, job_binary_id) + return job_binary_id def _create_job(self, name, job_type, mains, libs): - return self.sahara.jobs.create(name, job_type, mains, libs, - description='').id + job_id = self.sahara.jobs.create(name, job_type, mains, libs, + description='').id + self.addCleanup(self.sahara.jobs.delete, job_id) + return job_id - def _await_job_execution(self, job): - timeout = self.common_config.JOB_LAUNCH_TIMEOUT * 60 - status = self.sahara.job_executions.get(job.id).info['status'] + def _get_job_status(self, job_id): + return self.sahara.job_executions.get(job_id).info['status'] + + def poll_jobs_status(self, job_ids): + timeout = self.common_config.JOB_LAUNCH_TIMEOUT * 60 * len(job_ids) try: with fixtures.Timeout(timeout, gentle=True): - while status != edp.JOB_STATUS_SUCCEEDED: - if status == edp.JOB_STATUS_KILLED: - self.fail("Job status == '{0}'.".format( - edp.JOB_STATUS_KILLED)) - - time.sleep(10) - status = self.sahara.job_executions.get( - job.id).info['status'] + success = False + while not success: + success = True + for job_id in job_ids: + status = self._get_job_status(job_id) + if status in [edp.JOB_STATUS_FAILED, + edp.JOB_STATUS_KILLED, + edp.JOB_STATUS_DONEWITHERROR]: + self.fail( + 'Job status "%s" \'%s\'.' % (job_id, status)) + if status != edp.JOB_STATUS_SUCCEEDED: + success = False + time.sleep(5) except fixtures.TimeoutException: self.fail( - "Job did not return to '{0}' status within {1:d} minute(s)." - .format(edp.JOB_STATUS_SUCCEEDED, - self.common_config.JOB_LAUNCH_TIMEOUT) - ) + "Jobs did not return to '{0}' status within {1:d} minute(s)." + .format(edp.JOB_STATUS_SUCCEEDED, timeout / 60)) def _create_job_binaries(self, job_data_list, job_binary_internal_list, job_binary_list, swift_connection=None, @@ -172,23 +184,6 @@ class EDPTest(base.ITestCase): ) ) - def _delete_job(self, execution_job, job_id, job_binary_list, - job_binary_internal_list, input_id, output_id): - if execution_job: - self.sahara.job_executions.delete(execution_job.id) - if job_id: - self.sahara.jobs.delete(job_id) - if job_binary_list: - for job_binary_id in job_binary_list: - self.sahara.job_binaries.delete(job_binary_id) - if job_binary_internal_list: - for internal_id in job_binary_internal_list: - self.sahara.job_binary_internals.delete(internal_id) - if input_id: - self.sahara.data_sources.delete(input_id) - if output_id: - self.sahara.data_sources.delete(output_id) - def _add_swift_configs(self, configs): if "configs" not in configs: @@ -201,8 +196,7 @@ class EDPTest(base.ITestCase): configs["configs"][ sw.HADOOP_SWIFT_PASSWORD] = self.common_config.OS_PASSWORD - @base.skip_test('SKIP_EDP_TEST', - 'Test for EDP was skipped.') + @base.skip_test('SKIP_EDP_TEST', 'Test for EDP was skipped.') def edp_testing(self, job_type, job_data_list, lib_data_list=None, configs=None, pass_input_output_args=False, swift_binaries=False, hdfs_local_output=False): @@ -210,112 +204,94 @@ class EDPTest(base.ITestCase): lib_data_list = lib_data_list or [] configs = configs or {} - try: - swift = self.connect_to_swift() - container_name = 'Edp-test-%s' % str(uuid.uuid4())[:8] - swift.put_container(container_name) - swift.put_object( - container_name, 'input', ''.join( - random.choice(':' + ' ' + '\n' + string.ascii_lowercase) - for x in six.moves.range(10000) - ) + swift = self.connect_to_swift() + container_name = 'Edp-test-%s' % str(uuid.uuid4())[:8] + swift.put_container(container_name) + self.addCleanup(self.delete_swift_container, swift, container_name) + swift.put_object( + container_name, 'input', ''.join( + random.choice(':' + ' ' + '\n' + string.ascii_lowercase) + for x in six.moves.range(10000) ) + ) - except Exception as e: - with excutils.save_and_reraise_exception(): - self.delete_swift_container(swift, container_name) - print(str(e)) input_id = None output_id = None job_id = None job_execution = None - try: - job_binary_list = [] - lib_binary_list = [] - job_binary_internal_list = [] + job_binary_list = [] + lib_binary_list = [] + job_binary_internal_list = [] - swift_input_url = 'swift://%s.sahara/input' % container_name - if hdfs_local_output: - # This will create a file in hdfs under the user - # executing the job (i.e. /usr/hadoop/Edp-test-xxxx-out) - output_type = "hdfs" - output_url = container_name + "-out" + swift_input_url = 'swift://%s.sahara/input' % container_name + if hdfs_local_output: + # This will create a file in hdfs under the user + # executing the job (i.e. /usr/hadoop/Edp-test-xxxx-out) + output_type = "hdfs" + output_url = container_name + "-out" + else: + output_type = "swift" + output_url = 'swift://%s.sahara/output' % container_name + + # Java jobs don't use data sources. Input/output paths must + # be passed as args with corresponding username/password configs + if not edp.compare_job_type(job_type, + edp.JOB_TYPE_JAVA, + edp.JOB_TYPE_SPARK): + input_id = self._create_data_source( + 'input-%s' % str(uuid.uuid4())[:8], 'swift', + swift_input_url) + output_id = self._create_data_source( + 'output-%s' % str(uuid.uuid4())[:8], output_type, + output_url) + + if job_data_list: + if swift_binaries: + self._create_job_binaries(job_data_list, + job_binary_internal_list, + job_binary_list, + swift_connection=swift, + container_name=container_name) else: - output_type = "swift" - output_url = 'swift://%s.sahara/output' % container_name + self._create_job_binaries(job_data_list, + job_binary_internal_list, + job_binary_list) - # Java jobs don't use data sources. Input/output paths must - # be passed as args with corresponding username/password configs - if not edp.compare_job_type(job_type, - edp.JOB_TYPE_JAVA, - edp.JOB_TYPE_SPARK): - input_id = self._create_data_source( - 'input-%s' % str(uuid.uuid4())[:8], 'swift', - swift_input_url) - output_id = self._create_data_source( - 'output-%s' % str(uuid.uuid4())[:8], output_type, - output_url) + if lib_data_list: + if swift_binaries: + self._create_job_binaries(lib_data_list, + job_binary_internal_list, + lib_binary_list, + swift_connection=swift, + container_name=container_name) + else: + self._create_job_binaries(lib_data_list, + job_binary_internal_list, + lib_binary_list) - if job_data_list: - if swift_binaries: - self._create_job_binaries(job_data_list, - job_binary_internal_list, - job_binary_list, - swift_connection=swift, - container_name=container_name) - else: - self._create_job_binaries(job_data_list, - job_binary_internal_list, - job_binary_list) + job_id = self._create_job( + 'Edp-test-job-%s' % str(uuid.uuid4())[:8], job_type, + job_binary_list, lib_binary_list) + if not configs: + configs = {} - if lib_data_list: - if swift_binaries: - self._create_job_binaries(lib_data_list, - job_binary_internal_list, - lib_binary_list, - swift_connection=swift, - container_name=container_name) - else: - self._create_job_binaries(lib_data_list, - job_binary_internal_list, - lib_binary_list) + # TODO(tmckay): for spark we don't have support for swift + # yet. When we do, we'll need something to here to set up + # swift paths and we can use a spark wordcount job - job_id = self._create_job( - 'Edp-test-job-%s' % str(uuid.uuid4())[:8], job_type, - job_binary_list, lib_binary_list) - if not configs: - configs = {} + # Append the input/output paths with the swift configs + # if the caller has requested it... + if edp.compare_job_type( + job_type, edp.JOB_TYPE_JAVA) and pass_input_output_args: + self._add_swift_configs(configs) + if "args" in configs: + configs["args"].extend([swift_input_url, output_url]) + else: + configs["args"] = [swift_input_url, output_url] - # TODO(tmckay): for spark we don't have support for swift - # yet. When we do, we'll need something to here to set up - # swift paths and we can use a spark wordcount job + job_execution = self.sahara.job_executions.create( + job_id, self.cluster_id, input_id, output_id, + configs=configs) + self.addCleanup(self.sahara.job_executions.delete, job_execution.id) - # Append the input/output paths with the swift configs - # if the caller has requested it... - if edp.compare_job_type( - job_type, edp.JOB_TYPE_JAVA) and pass_input_output_args: - self._add_swift_configs(configs) - if "args" in configs: - configs["args"].extend([swift_input_url, - output_url]) - else: - configs["args"] = [swift_input_url, - output_url] - - job_execution = self.sahara.job_executions.create( - job_id, self.cluster_id, input_id, output_id, - configs=configs) - - if job_execution: - self._await_job_execution(job_execution) - - except Exception as e: - with excutils.save_and_reraise_exception(): - print(str(e)) - - finally: - self.delete_swift_container(swift, container_name) - self._delete_job( - job_execution, job_id, job_binary_list + lib_binary_list, - job_binary_internal_list, input_id, output_id - ) + return job_execution.id diff --git a/sahara/tests/integration/tests/gating/test_cdh_gating.py b/sahara/tests/integration/tests/gating/test_cdh_gating.py index c4a4d316..89e89c95 100644 --- a/sahara/tests/integration/tests/gating/test_cdh_gating.py +++ b/sahara/tests/integration/tests/gating/test_cdh_gating.py @@ -199,35 +199,38 @@ class CDHGatingTest(cluster_configs.ClusterConfigTest, @b.errormsg("Failure while EDP testing: ") def _check_edp(self): - self._edp_test() + self.poll_jobs_status(list(self._run_edp_test())) - def _edp_test(self): + def _run_edp_test(self): # check pig pig_job = self.edp_info.read_pig_example_script() pig_lib = self.edp_info.read_pig_example_jar() - self.edp_testing(job_type=utils_edp.JOB_TYPE_PIG, - job_data_list=[{'pig': pig_job}], - lib_data_list=[{'jar': pig_lib}], - swift_binaries=False, - hdfs_local_output=True) + yield self.edp_testing( + job_type=utils_edp.JOB_TYPE_PIG, + job_data_list=[{'pig': pig_job}], + lib_data_list=[{'jar': pig_lib}], + swift_binaries=False, + hdfs_local_output=True) # check mapreduce mapreduce_jar = self.edp_info.read_mapreduce_example_jar() mapreduce_configs = self.edp_info.mapreduce_example_configs() - self.edp_testing(job_type=utils_edp.JOB_TYPE_MAPREDUCE, - job_data_list=[], - lib_data_list=[{'jar': mapreduce_jar}], - configs=mapreduce_configs, - swift_binaries=False, - hdfs_local_output=True) + yield self.edp_testing( + job_type=utils_edp.JOB_TYPE_MAPREDUCE, + job_data_list=[], + lib_data_list=[{'jar': mapreduce_jar}], + configs=mapreduce_configs, + swift_binaries=False, + hdfs_local_output=True) # check mapreduce streaming - self.edp_testing(job_type=utils_edp.JOB_TYPE_MAPREDUCE_STREAMING, - job_data_list=[], - lib_data_list=[], - configs=self.edp_info.mapreduce_streaming_configs(), - swift_binaries=False, - hdfs_local_output=True) + yield self.edp_testing( + job_type=utils_edp.JOB_TYPE_MAPREDUCE_STREAMING, + job_data_list=[], + lib_data_list=[], + configs=self.edp_info.mapreduce_streaming_configs(), + swift_binaries=False, + hdfs_local_output=True) @b.errormsg("Failure while cluster scaling: ") def _check_scaling(self): @@ -278,7 +281,7 @@ class CDHGatingTest(cluster_configs.ClusterConfigTest, @b.errormsg("Failure while EDP testing after cluster scaling: ") def _check_edp_after_scaling(self): - self._edp_test() + self._check_edp() @testcase.skipIf( cfg.ITConfig().cdh_config.SKIP_ALL_TESTS_FOR_PLUGIN, diff --git a/sahara/tests/integration/tests/gating/test_hdp2_gating.py b/sahara/tests/integration/tests/gating/test_hdp2_gating.py index 4885f65e..5f9162e7 100644 --- a/sahara/tests/integration/tests/gating/test_hdp2_gating.py +++ b/sahara/tests/integration/tests/gating/test_hdp2_gating.py @@ -124,42 +124,46 @@ class HDP2GatingTest(swift.SwiftTest, scaling.ScalingTest, @b.errormsg("Failure while EDP testing: ") def _check_edp(self): - self._edp_test() + self.poll_jobs_status(list(self._run_edp_test())) - def _edp_test(self): + def _run_edp_test(self): # check pig pig_job = self.edp_info.read_pig_example_script() pig_lib = self.edp_info.read_pig_example_jar() - self.edp_testing(job_type=utils_edp.JOB_TYPE_PIG, - job_data_list=[{'pig': pig_job}], - lib_data_list=[{'jar': pig_lib}], - swift_binaries=True, - hdfs_local_output=True) + yield self.edp_testing( + job_type=utils_edp.JOB_TYPE_PIG, + job_data_list=[{'pig': pig_job}], + lib_data_list=[{'jar': pig_lib}], + swift_binaries=True, + hdfs_local_output=True) # check mapreduce mapreduce_jar = self.edp_info.read_mapreduce_example_jar() mapreduce_configs = self.edp_info.mapreduce_example_configs() - self.edp_testing(job_type=utils_edp.JOB_TYPE_MAPREDUCE, - job_data_list=[], - lib_data_list=[{'jar': mapreduce_jar}], - configs=mapreduce_configs, - swift_binaries=True, - hdfs_local_output=True) + yield self.edp_testing( + job_type=utils_edp.JOB_TYPE_MAPREDUCE, + job_data_list=[], + lib_data_list=[{'jar': mapreduce_jar}], + configs=mapreduce_configs, + swift_binaries=True, + hdfs_local_output=True) # check mapreduce streaming - self.edp_testing(job_type=utils_edp.JOB_TYPE_MAPREDUCE_STREAMING, - job_data_list=[], - lib_data_list=[], - configs=self.edp_info.mapreduce_streaming_configs()) + yield self.edp_testing( + job_type=utils_edp.JOB_TYPE_MAPREDUCE_STREAMING, + job_data_list=[], + lib_data_list=[], + configs=self.edp_info.mapreduce_streaming_configs()) # check java java_jar = self.edp_info.read_java_example_lib(2) java_configs = self.edp_info.java_example_configs(2) - self.edp_testing(utils_edp.JOB_TYPE_JAVA, - job_data_list=[], - lib_data_list=[{'jar': java_jar}], - configs=java_configs) + yield self.edp_testing( + utils_edp.JOB_TYPE_JAVA, + job_data_list=[], + lib_data_list=[{'jar': java_jar}], + configs=java_configs) @b.errormsg("Failure while cluster scaling: ") def _check_scaling(self): @@ -192,7 +196,7 @@ class HDP2GatingTest(swift.SwiftTest, scaling.ScalingTest, @b.errormsg("Failure while EDP testing after cluster scaling: ") def _check_edp_after_scaling(self): - self._edp_test() + self._check_edp() @testcase.attr('hdp2') @testcase.skipIf(config.SKIP_ALL_TESTS_FOR_PLUGIN, diff --git a/sahara/tests/integration/tests/gating/test_hdp_gating.py b/sahara/tests/integration/tests/gating/test_hdp_gating.py index 12ecd4b3..75c7b41d 100644 --- a/sahara/tests/integration/tests/gating/test_hdp_gating.py +++ b/sahara/tests/integration/tests/gating/test_hdp_gating.py @@ -166,27 +166,39 @@ class HDPGatingTest(cinder.CinderVolumeTest, edp.EDPTest, java_lib_data = self.edp_info.read_java_example_lib() try: - self.edp_testing(job_type=utils_edp.JOB_TYPE_PIG, - job_data_list=[{'pig': pig_job_data}], - lib_data_list=[{'jar': pig_lib_data}], - swift_binaries=True, - hdfs_local_output=True) - self.edp_testing(job_type=utils_edp.JOB_TYPE_MAPREDUCE, - job_data_list=[], - lib_data_list=[{'jar': mapreduce_jar_data}], - configs=self.edp_info.mapreduce_example_configs(), - swift_binaries=True, - hdfs_local_output=True) - self.edp_testing(job_type=utils_edp.JOB_TYPE_MAPREDUCE_STREAMING, - job_data_list=[], - lib_data_list=[], - configs=( - self.edp_info.mapreduce_streaming_configs())) - self.edp_testing(job_type=utils_edp.JOB_TYPE_JAVA, - job_data_list=[], - lib_data_list=[{'jar': java_lib_data}], - configs=self.edp_info.java_example_configs(), - pass_input_output_args=True) + job_ids = [] + job_id = self.edp_testing( + job_type=utils_edp.JOB_TYPE_PIG, + job_data_list=[{'pig': pig_job_data}], + lib_data_list=[{'jar': pig_lib_data}], + swift_binaries=True, + hdfs_local_output=True) + job_ids.append(job_id) + + job_id = self.edp_testing( + job_type=utils_edp.JOB_TYPE_MAPREDUCE, + job_data_list=[], + lib_data_list=[{'jar': mapreduce_jar_data}], + configs=self.edp_info.mapreduce_example_configs(), + swift_binaries=True, + hdfs_local_output=True) + job_ids.append(job_id) + + job_id = self.edp_testing( + job_type=utils_edp.JOB_TYPE_MAPREDUCE_STREAMING, + job_data_list=[], + lib_data_list=[], + configs=self.edp_info.mapreduce_streaming_configs()) + job_ids.append(job_id) + + job_id = self.edp_testing( + job_type=utils_edp.JOB_TYPE_JAVA, + job_data_list=[], + lib_data_list=[{'jar': java_lib_data}], + configs=self.edp_info.java_example_configs(), + pass_input_output_args=True) + job_ids.append(job_id) + self.poll_jobs_status(job_ids) except Exception as e: with excutils.save_and_reraise_exception(): diff --git a/sahara/tests/integration/tests/gating/test_spark_gating.py b/sahara/tests/integration/tests/gating/test_spark_gating.py index 26000175..2874c8c7 100644 --- a/sahara/tests/integration/tests/gating/test_spark_gating.py +++ b/sahara/tests/integration/tests/gating/test_spark_gating.py @@ -121,10 +121,12 @@ class SparkGatingTest(swift.SwiftTest, scaling.ScalingTest, # check spark spark_jar = self.edp_info.read_spark_example_jar() spark_configs = self.edp_info.spark_example_configs() - self.edp_testing(utils_edp.JOB_TYPE_SPARK, - job_data_list=[{'jar': spark_jar}], - lib_data_list=[], - configs=spark_configs) + job_id = self.edp_testing( + utils_edp.JOB_TYPE_SPARK, + job_data_list=[{'jar': spark_jar}], + lib_data_list=[], + configs=spark_configs) + self.poll_jobs_status([job_id]) @b.errormsg("Failure while cluster scaling: ") def _check_scaling(self): diff --git a/sahara/tests/integration/tests/gating/test_transient_gating.py b/sahara/tests/integration/tests/gating/test_transient_gating.py index 21e703b6..9017427b 100644 --- a/sahara/tests/integration/tests/gating/test_transient_gating.py +++ b/sahara/tests/integration/tests/gating/test_transient_gating.py @@ -101,9 +101,10 @@ class TransientGatingTest(edp.EDPTest): def _check_transient(self): pig_job_data = self.edp_info.read_pig_example_script() pig_lib_data = self.edp_info.read_pig_example_jar() - self.edp_testing(job_type=utils_edp.JOB_TYPE_PIG, - job_data_list=[{'pig': pig_job_data}], - lib_data_list=[{'jar': pig_lib_data}]) + job_id = self.edp_testing(job_type=utils_edp.JOB_TYPE_PIG, + job_data_list=[{'pig': pig_job_data}], + lib_data_list=[{'jar': pig_lib_data}]) + self.poll_jobs_status([job_id]) # set timeout in seconds timeout = self.common_config.TRANSIENT_CLUSTER_TIMEOUT * 60 diff --git a/sahara/tests/integration/tests/gating/test_vanilla_gating.py b/sahara/tests/integration/tests/gating/test_vanilla_gating.py index 6d8a24a6..df9a6420 100644 --- a/sahara/tests/integration/tests/gating/test_vanilla_gating.py +++ b/sahara/tests/integration/tests/gating/test_vanilla_gating.py @@ -255,40 +255,40 @@ class VanillaGatingTest(cinder.CinderVolumeTest, # This is a modified version of WordCount that takes swift configs java_lib_data = self.edp_info.read_java_example_lib() - try: - self.edp_testing( - job_type=utils_edp.JOB_TYPE_PIG, - job_data_list=[{'pig': pig_job_data}], - lib_data_list=[{'jar': pig_lib_data}], - swift_binaries=True, - hdfs_local_output=True) - self.edp_testing( - job_type=utils_edp.JOB_TYPE_MAPREDUCE, - job_data_list=[], - lib_data_list=[{'jar': mapreduce_jar_data}], - configs=self.edp_info.mapreduce_example_configs(), - swift_binaries=True, - hdfs_local_output=True) - self.edp_testing( - job_type=utils_edp.JOB_TYPE_MAPREDUCE_STREAMING, - job_data_list=[], - lib_data_list=[], - configs=self.edp_info.mapreduce_streaming_configs()) - self.edp_testing( - job_type=utils_edp.JOB_TYPE_JAVA, - job_data_list=[], - lib_data_list=[{'jar': java_lib_data}], - configs=self.edp_info.java_example_configs(), - pass_input_output_args=True) + job_ids = [] + job_id = self.edp_testing( + job_type=utils_edp.JOB_TYPE_PIG, + job_data_list=[{'pig': pig_job_data}], + lib_data_list=[{'jar': pig_lib_data}], + swift_binaries=True, + hdfs_local_output=True) + job_ids.append(job_id) - except Exception as e: - with excutils.save_and_reraise_exception(): - self.delete_objects( - cluster_info['cluster_id'], cluster_template_id, - node_group_template_id_list - ) - message = 'Failure while EDP testing: ' - self.print_error_log(message, e) + job_id = self.edp_testing( + job_type=utils_edp.JOB_TYPE_MAPREDUCE, + job_data_list=[], + lib_data_list=[{'jar': mapreduce_jar_data}], + configs=self.edp_info.mapreduce_example_configs(), + swift_binaries=True, + hdfs_local_output=True) + job_ids.append(job_id) + + job_id = self.edp_testing( + job_type=utils_edp.JOB_TYPE_MAPREDUCE_STREAMING, + job_data_list=[], + lib_data_list=[], + configs=self.edp_info.mapreduce_streaming_configs()) + job_ids.append(job_id) + + job_id = self.edp_testing( + job_type=utils_edp.JOB_TYPE_JAVA, + job_data_list=[], + lib_data_list=[{'jar': java_lib_data}], + configs=self.edp_info.java_example_configs(), + pass_input_output_args=True) + job_ids.append(job_id) + + self.poll_jobs_status(job_ids) edp_test() diff --git a/sahara/tests/integration/tests/gating/test_vanilla_two_gating.py b/sahara/tests/integration/tests/gating/test_vanilla_two_gating.py index 27b7980d..4e500eb7 100644 --- a/sahara/tests/integration/tests/gating/test_vanilla_two_gating.py +++ b/sahara/tests/integration/tests/gating/test_vanilla_two_gating.py @@ -63,6 +63,17 @@ class VanillaTwoGatingTest(cluster_configs.ClusterConfigTest, self.volumes_per_node = 2 self.volume_size = 2 + ng_params = { + 'MapReduce': { + 'yarn.app.mapreduce.am.resource.mb': 256, + 'yarn.app.mapreduce.am.command-opts': '-Xmx256m' + }, + 'YARN': { + 'yarn.scheduler.minimum-allocation-mb': 256, + 'yarn.scheduler.maximum-allocation-mb': 1024 + } + } + @b.errormsg("Failure while 'nm-dn' node group template creation: ") def _create_nm_dn_ng_template(self): template = { @@ -71,7 +82,7 @@ class VanillaTwoGatingTest(cluster_configs.ClusterConfigTest, 'description': 'test node group template for Vanilla plugin', 'node_processes': ['nodemanager', 'datanode'], 'floating_ip_pool': self.floating_ip_pool, - 'node_configs': {} + 'node_configs': self.ng_params } self.ng_tmpl_nm_dn_id = self.create_node_group_template(**template) self.ng_template_ids.append(self.ng_tmpl_nm_dn_id) @@ -86,7 +97,7 @@ class VanillaTwoGatingTest(cluster_configs.ClusterConfigTest, 'volume_size': self.volume_size, 'node_processes': ['nodemanager'], 'floating_ip_pool': self.floating_ip_pool, - 'node_configs': {} + 'node_configs': self.ng_params } self.ng_tmpl_nm_id = self.create_node_group_template(**template) self.ng_template_ids.append(self.ng_tmpl_nm_id) @@ -101,7 +112,7 @@ class VanillaTwoGatingTest(cluster_configs.ClusterConfigTest, 'volume_size': self.volume_size, 'node_processes': ['datanode'], 'floating_ip_pool': self.floating_ip_pool, - 'node_configs': {} + 'node_configs': self.ng_params } self.ng_tmpl_dn_id = self.create_node_group_template(**template) self.ng_template_ids.append(self.ng_tmpl_dn_id) @@ -123,7 +134,8 @@ class VanillaTwoGatingTest(cluster_configs.ClusterConfigTest, 'flavor_id': self.flavor_id, 'node_processes': ['namenode', 'resourcemanager'], 'floating_ip_pool': self.floating_ip_pool, - 'count': 1 + 'count': 1, + 'node_configs': self.ng_params }, { 'name': 'master-node-oo-hs', @@ -131,7 +143,8 @@ class VanillaTwoGatingTest(cluster_configs.ClusterConfigTest, 'node_processes': ['oozie', 'historyserver', 'secondarynamenode'], 'floating_ip_pool': self.floating_ip_pool, - 'count': 1 + 'count': 1, + 'node_configs': self.ng_params }, { 'name': 'worker-node-nm-dn', @@ -183,51 +196,57 @@ class VanillaTwoGatingTest(cluster_configs.ClusterConfigTest, @b.errormsg("Failure while EDP testing: ") def _check_edp(self): + self.poll_jobs_status(list(self._run_edp_tests())) + + def _run_edp_tests(self): skipped_edp_job_types = self.vanilla_two_config.SKIP_EDP_JOB_TYPES if utils_edp.JOB_TYPE_PIG not in skipped_edp_job_types: - self._edp_pig_test() + yield self._edp_pig_test() if utils_edp.JOB_TYPE_MAPREDUCE not in skipped_edp_job_types: - self._edp_mapreduce_test() + yield self._edp_mapreduce_test() if utils_edp.JOB_TYPE_MAPREDUCE_STREAMING not in skipped_edp_job_types: - self._edp_mapreduce_streaming_test() + yield self._edp_mapreduce_streaming_test() if utils_edp.JOB_TYPE_JAVA not in skipped_edp_job_types: - self._edp_java_test() + yield self._edp_java_test() def _edp_pig_test(self): - pig_job = self.edp_info.read_pig_example_script() pig_lib = self.edp_info.read_pig_example_jar() - self.edp_testing(job_type=utils_edp.JOB_TYPE_PIG, - job_data_list=[{'pig': pig_job}], - lib_data_list=[{'jar': pig_lib}], - swift_binaries=True, - hdfs_local_output=True) + return self.edp_testing( + job_type=utils_edp.JOB_TYPE_PIG, + job_data_list=[{'pig': pig_job}], + lib_data_list=[{'jar': pig_lib}], + swift_binaries=True, + hdfs_local_output=True) def _edp_mapreduce_test(self): mapreduce_jar = self.edp_info.read_mapreduce_example_jar() mapreduce_configs = self.edp_info.mapreduce_example_configs() - self.edp_testing(job_type=utils_edp.JOB_TYPE_MAPREDUCE, - job_data_list=[], - lib_data_list=[{'jar': mapreduce_jar}], - configs=mapreduce_configs, - swift_binaries=True, - hdfs_local_output=True) + return self.edp_testing( + job_type=utils_edp.JOB_TYPE_MAPREDUCE, + job_data_list=[], + lib_data_list=[{'jar': mapreduce_jar}], + configs=mapreduce_configs, + swift_binaries=True, + hdfs_local_output=True) def _edp_mapreduce_streaming_test(self): - self.edp_testing(job_type=utils_edp.JOB_TYPE_MAPREDUCE_STREAMING, - job_data_list=[], - lib_data_list=[], - configs=self.edp_info.mapreduce_streaming_configs()) + return self.edp_testing( + job_type=utils_edp.JOB_TYPE_MAPREDUCE_STREAMING, + job_data_list=[], + lib_data_list=[], + configs=self.edp_info.mapreduce_streaming_configs()) def _edp_java_test(self): java_jar = self.edp_info.read_java_example_lib(2) java_configs = self.edp_info.java_example_configs(2) - self.edp_testing(utils_edp.JOB_TYPE_JAVA, - job_data_list=[], - lib_data_list=[{'jar': java_jar}], - configs=java_configs) + return self.edp_testing( + utils_edp.JOB_TYPE_JAVA, + job_data_list=[], + lib_data_list=[{'jar': java_jar}], + configs=java_configs) @b.errormsg("Failure while cluster scaling: ") def _check_scaling(self):