Merge "Parallel testing EDP jobs"

This commit is contained in:
Jenkins 2014-10-07 03:52:01 +00:00 committed by Gerrit Code Review
commit 4d3d9cfa9f
8 changed files with 284 additions and 267 deletions

View File

@ -19,7 +19,6 @@ import time
import uuid
import fixtures
from oslo.utils import excutils
import six
from sahara.swift import swift_helper as sw
@ -103,41 +102,54 @@ class EDPTest(base.ITestCase):
self.edp_info = EDPJobInfo()
def _create_data_source(self, name, data_type, url, description=''):
return self.sahara.data_sources.create(
source_id = self.sahara.data_sources.create(
name, description, data_type, url, self.common_config.OS_USERNAME,
self.common_config.OS_PASSWORD).id
self.addCleanup(self.sahara.data_sources.delete, source_id)
return source_id
def _create_job_binary_internals(self, name, data):
return self.sahara.job_binary_internals.create(name, data).id
job_binary_id = self.sahara.job_binary_internals.create(name, data).id
self.addCleanup(self.sahara.job_binary_internals.delete, job_binary_id)
return job_binary_id
def _create_job_binary(self, name, url, extra=None, description=None):
return self.sahara.job_binaries.create(
job_binary_id = self.sahara.job_binaries.create(
name, url, description or '', extra or {}).id
self.addCleanup(self.sahara.job_binaries.delete, job_binary_id)
return job_binary_id
def _create_job(self, name, job_type, mains, libs):
return self.sahara.jobs.create(name, job_type, mains, libs,
description='').id
job_id = self.sahara.jobs.create(name, job_type, mains, libs,
description='').id
self.addCleanup(self.sahara.jobs.delete, job_id)
return job_id
def _await_job_execution(self, job):
timeout = self.common_config.JOB_LAUNCH_TIMEOUT * 60
status = self.sahara.job_executions.get(job.id).info['status']
def _get_job_status(self, job_id):
return self.sahara.job_executions.get(job_id).info['status']
def poll_jobs_status(self, job_ids):
timeout = self.common_config.JOB_LAUNCH_TIMEOUT * 60 * len(job_ids)
try:
with fixtures.Timeout(timeout, gentle=True):
while status != edp.JOB_STATUS_SUCCEEDED:
if status == edp.JOB_STATUS_KILLED:
self.fail("Job status == '{0}'.".format(
edp.JOB_STATUS_KILLED))
time.sleep(10)
status = self.sahara.job_executions.get(
job.id).info['status']
success = False
while not success:
success = True
for job_id in job_ids:
status = self._get_job_status(job_id)
if status in [edp.JOB_STATUS_FAILED,
edp.JOB_STATUS_KILLED,
edp.JOB_STATUS_DONEWITHERROR]:
self.fail(
'Job status "%s" \'%s\'.' % (job_id, status))
if status != edp.JOB_STATUS_SUCCEEDED:
success = False
time.sleep(5)
except fixtures.TimeoutException:
self.fail(
"Job did not return to '{0}' status within {1:d} minute(s)."
.format(edp.JOB_STATUS_SUCCEEDED,
self.common_config.JOB_LAUNCH_TIMEOUT)
)
"Jobs did not return to '{0}' status within {1:d} minute(s)."
.format(edp.JOB_STATUS_SUCCEEDED, timeout / 60))
def _create_job_binaries(self, job_data_list, job_binary_internal_list,
job_binary_list, swift_connection=None,
@ -172,23 +184,6 @@ class EDPTest(base.ITestCase):
)
)
def _delete_job(self, execution_job, job_id, job_binary_list,
job_binary_internal_list, input_id, output_id):
if execution_job:
self.sahara.job_executions.delete(execution_job.id)
if job_id:
self.sahara.jobs.delete(job_id)
if job_binary_list:
for job_binary_id in job_binary_list:
self.sahara.job_binaries.delete(job_binary_id)
if job_binary_internal_list:
for internal_id in job_binary_internal_list:
self.sahara.job_binary_internals.delete(internal_id)
if input_id:
self.sahara.data_sources.delete(input_id)
if output_id:
self.sahara.data_sources.delete(output_id)
def _add_swift_configs(self, configs):
if "configs" not in configs:
@ -201,8 +196,7 @@ class EDPTest(base.ITestCase):
configs["configs"][
sw.HADOOP_SWIFT_PASSWORD] = self.common_config.OS_PASSWORD
@base.skip_test('SKIP_EDP_TEST',
'Test for EDP was skipped.')
@base.skip_test('SKIP_EDP_TEST', 'Test for EDP was skipped.')
def edp_testing(self, job_type, job_data_list, lib_data_list=None,
configs=None, pass_input_output_args=False,
swift_binaries=False, hdfs_local_output=False):
@ -210,112 +204,94 @@ class EDPTest(base.ITestCase):
lib_data_list = lib_data_list or []
configs = configs or {}
try:
swift = self.connect_to_swift()
container_name = 'Edp-test-%s' % str(uuid.uuid4())[:8]
swift.put_container(container_name)
swift.put_object(
container_name, 'input', ''.join(
random.choice(':' + ' ' + '\n' + string.ascii_lowercase)
for x in six.moves.range(10000)
)
swift = self.connect_to_swift()
container_name = 'Edp-test-%s' % str(uuid.uuid4())[:8]
swift.put_container(container_name)
self.addCleanup(self.delete_swift_container, swift, container_name)
swift.put_object(
container_name, 'input', ''.join(
random.choice(':' + ' ' + '\n' + string.ascii_lowercase)
for x in six.moves.range(10000)
)
)
except Exception as e:
with excutils.save_and_reraise_exception():
self.delete_swift_container(swift, container_name)
print(str(e))
input_id = None
output_id = None
job_id = None
job_execution = None
try:
job_binary_list = []
lib_binary_list = []
job_binary_internal_list = []
job_binary_list = []
lib_binary_list = []
job_binary_internal_list = []
swift_input_url = 'swift://%s.sahara/input' % container_name
if hdfs_local_output:
# This will create a file in hdfs under the user
# executing the job (i.e. /usr/hadoop/Edp-test-xxxx-out)
output_type = "hdfs"
output_url = container_name + "-out"
swift_input_url = 'swift://%s.sahara/input' % container_name
if hdfs_local_output:
# This will create a file in hdfs under the user
# executing the job (i.e. /usr/hadoop/Edp-test-xxxx-out)
output_type = "hdfs"
output_url = container_name + "-out"
else:
output_type = "swift"
output_url = 'swift://%s.sahara/output' % container_name
# Java jobs don't use data sources. Input/output paths must
# be passed as args with corresponding username/password configs
if not edp.compare_job_type(job_type,
edp.JOB_TYPE_JAVA,
edp.JOB_TYPE_SPARK):
input_id = self._create_data_source(
'input-%s' % str(uuid.uuid4())[:8], 'swift',
swift_input_url)
output_id = self._create_data_source(
'output-%s' % str(uuid.uuid4())[:8], output_type,
output_url)
if job_data_list:
if swift_binaries:
self._create_job_binaries(job_data_list,
job_binary_internal_list,
job_binary_list,
swift_connection=swift,
container_name=container_name)
else:
output_type = "swift"
output_url = 'swift://%s.sahara/output' % container_name
self._create_job_binaries(job_data_list,
job_binary_internal_list,
job_binary_list)
# Java jobs don't use data sources. Input/output paths must
# be passed as args with corresponding username/password configs
if not edp.compare_job_type(job_type,
edp.JOB_TYPE_JAVA,
edp.JOB_TYPE_SPARK):
input_id = self._create_data_source(
'input-%s' % str(uuid.uuid4())[:8], 'swift',
swift_input_url)
output_id = self._create_data_source(
'output-%s' % str(uuid.uuid4())[:8], output_type,
output_url)
if lib_data_list:
if swift_binaries:
self._create_job_binaries(lib_data_list,
job_binary_internal_list,
lib_binary_list,
swift_connection=swift,
container_name=container_name)
else:
self._create_job_binaries(lib_data_list,
job_binary_internal_list,
lib_binary_list)
if job_data_list:
if swift_binaries:
self._create_job_binaries(job_data_list,
job_binary_internal_list,
job_binary_list,
swift_connection=swift,
container_name=container_name)
else:
self._create_job_binaries(job_data_list,
job_binary_internal_list,
job_binary_list)
job_id = self._create_job(
'Edp-test-job-%s' % str(uuid.uuid4())[:8], job_type,
job_binary_list, lib_binary_list)
if not configs:
configs = {}
if lib_data_list:
if swift_binaries:
self._create_job_binaries(lib_data_list,
job_binary_internal_list,
lib_binary_list,
swift_connection=swift,
container_name=container_name)
else:
self._create_job_binaries(lib_data_list,
job_binary_internal_list,
lib_binary_list)
# TODO(tmckay): for spark we don't have support for swift
# yet. When we do, we'll need something to here to set up
# swift paths and we can use a spark wordcount job
job_id = self._create_job(
'Edp-test-job-%s' % str(uuid.uuid4())[:8], job_type,
job_binary_list, lib_binary_list)
if not configs:
configs = {}
# Append the input/output paths with the swift configs
# if the caller has requested it...
if edp.compare_job_type(
job_type, edp.JOB_TYPE_JAVA) and pass_input_output_args:
self._add_swift_configs(configs)
if "args" in configs:
configs["args"].extend([swift_input_url, output_url])
else:
configs["args"] = [swift_input_url, output_url]
# TODO(tmckay): for spark we don't have support for swift
# yet. When we do, we'll need something to here to set up
# swift paths and we can use a spark wordcount job
job_execution = self.sahara.job_executions.create(
job_id, self.cluster_id, input_id, output_id,
configs=configs)
self.addCleanup(self.sahara.job_executions.delete, job_execution.id)
# Append the input/output paths with the swift configs
# if the caller has requested it...
if edp.compare_job_type(
job_type, edp.JOB_TYPE_JAVA) and pass_input_output_args:
self._add_swift_configs(configs)
if "args" in configs:
configs["args"].extend([swift_input_url,
output_url])
else:
configs["args"] = [swift_input_url,
output_url]
job_execution = self.sahara.job_executions.create(
job_id, self.cluster_id, input_id, output_id,
configs=configs)
if job_execution:
self._await_job_execution(job_execution)
except Exception as e:
with excutils.save_and_reraise_exception():
print(str(e))
finally:
self.delete_swift_container(swift, container_name)
self._delete_job(
job_execution, job_id, job_binary_list + lib_binary_list,
job_binary_internal_list, input_id, output_id
)
return job_execution.id

View File

@ -199,35 +199,38 @@ class CDHGatingTest(cluster_configs.ClusterConfigTest,
@b.errormsg("Failure while EDP testing: ")
def _check_edp(self):
self._edp_test()
self.poll_jobs_status(list(self._run_edp_test()))
def _edp_test(self):
def _run_edp_test(self):
# check pig
pig_job = self.edp_info.read_pig_example_script()
pig_lib = self.edp_info.read_pig_example_jar()
self.edp_testing(job_type=utils_edp.JOB_TYPE_PIG,
job_data_list=[{'pig': pig_job}],
lib_data_list=[{'jar': pig_lib}],
swift_binaries=False,
hdfs_local_output=True)
yield self.edp_testing(
job_type=utils_edp.JOB_TYPE_PIG,
job_data_list=[{'pig': pig_job}],
lib_data_list=[{'jar': pig_lib}],
swift_binaries=False,
hdfs_local_output=True)
# check mapreduce
mapreduce_jar = self.edp_info.read_mapreduce_example_jar()
mapreduce_configs = self.edp_info.mapreduce_example_configs()
self.edp_testing(job_type=utils_edp.JOB_TYPE_MAPREDUCE,
job_data_list=[],
lib_data_list=[{'jar': mapreduce_jar}],
configs=mapreduce_configs,
swift_binaries=False,
hdfs_local_output=True)
yield self.edp_testing(
job_type=utils_edp.JOB_TYPE_MAPREDUCE,
job_data_list=[],
lib_data_list=[{'jar': mapreduce_jar}],
configs=mapreduce_configs,
swift_binaries=False,
hdfs_local_output=True)
# check mapreduce streaming
self.edp_testing(job_type=utils_edp.JOB_TYPE_MAPREDUCE_STREAMING,
job_data_list=[],
lib_data_list=[],
configs=self.edp_info.mapreduce_streaming_configs(),
swift_binaries=False,
hdfs_local_output=True)
yield self.edp_testing(
job_type=utils_edp.JOB_TYPE_MAPREDUCE_STREAMING,
job_data_list=[],
lib_data_list=[],
configs=self.edp_info.mapreduce_streaming_configs(),
swift_binaries=False,
hdfs_local_output=True)
@b.errormsg("Failure while cluster scaling: ")
def _check_scaling(self):
@ -278,7 +281,7 @@ class CDHGatingTest(cluster_configs.ClusterConfigTest,
@b.errormsg("Failure while EDP testing after cluster scaling: ")
def _check_edp_after_scaling(self):
self._edp_test()
self._check_edp()
@testcase.skipIf(
cfg.ITConfig().cdh_config.SKIP_ALL_TESTS_FOR_PLUGIN,

View File

@ -124,42 +124,46 @@ class HDP2GatingTest(swift.SwiftTest, scaling.ScalingTest,
@b.errormsg("Failure while EDP testing: ")
def _check_edp(self):
self._edp_test()
self.poll_jobs_status(list(self._run_edp_test()))
def _edp_test(self):
def _run_edp_test(self):
# check pig
pig_job = self.edp_info.read_pig_example_script()
pig_lib = self.edp_info.read_pig_example_jar()
self.edp_testing(job_type=utils_edp.JOB_TYPE_PIG,
job_data_list=[{'pig': pig_job}],
lib_data_list=[{'jar': pig_lib}],
swift_binaries=True,
hdfs_local_output=True)
yield self.edp_testing(
job_type=utils_edp.JOB_TYPE_PIG,
job_data_list=[{'pig': pig_job}],
lib_data_list=[{'jar': pig_lib}],
swift_binaries=True,
hdfs_local_output=True)
# check mapreduce
mapreduce_jar = self.edp_info.read_mapreduce_example_jar()
mapreduce_configs = self.edp_info.mapreduce_example_configs()
self.edp_testing(job_type=utils_edp.JOB_TYPE_MAPREDUCE,
job_data_list=[],
lib_data_list=[{'jar': mapreduce_jar}],
configs=mapreduce_configs,
swift_binaries=True,
hdfs_local_output=True)
yield self.edp_testing(
job_type=utils_edp.JOB_TYPE_MAPREDUCE,
job_data_list=[],
lib_data_list=[{'jar': mapreduce_jar}],
configs=mapreduce_configs,
swift_binaries=True,
hdfs_local_output=True)
# check mapreduce streaming
self.edp_testing(job_type=utils_edp.JOB_TYPE_MAPREDUCE_STREAMING,
job_data_list=[],
lib_data_list=[],
configs=self.edp_info.mapreduce_streaming_configs())
yield self.edp_testing(
job_type=utils_edp.JOB_TYPE_MAPREDUCE_STREAMING,
job_data_list=[],
lib_data_list=[],
configs=self.edp_info.mapreduce_streaming_configs())
# check java
java_jar = self.edp_info.read_java_example_lib(2)
java_configs = self.edp_info.java_example_configs(2)
self.edp_testing(utils_edp.JOB_TYPE_JAVA,
job_data_list=[],
lib_data_list=[{'jar': java_jar}],
configs=java_configs)
yield self.edp_testing(
utils_edp.JOB_TYPE_JAVA,
job_data_list=[],
lib_data_list=[{'jar': java_jar}],
configs=java_configs)
@b.errormsg("Failure while cluster scaling: ")
def _check_scaling(self):
@ -192,7 +196,7 @@ class HDP2GatingTest(swift.SwiftTest, scaling.ScalingTest,
@b.errormsg("Failure while EDP testing after cluster scaling: ")
def _check_edp_after_scaling(self):
self._edp_test()
self._check_edp()
@testcase.attr('hdp2')
@testcase.skipIf(config.SKIP_ALL_TESTS_FOR_PLUGIN,

View File

@ -166,27 +166,39 @@ class HDPGatingTest(cinder.CinderVolumeTest, edp.EDPTest,
java_lib_data = self.edp_info.read_java_example_lib()
try:
self.edp_testing(job_type=utils_edp.JOB_TYPE_PIG,
job_data_list=[{'pig': pig_job_data}],
lib_data_list=[{'jar': pig_lib_data}],
swift_binaries=True,
hdfs_local_output=True)
self.edp_testing(job_type=utils_edp.JOB_TYPE_MAPREDUCE,
job_data_list=[],
lib_data_list=[{'jar': mapreduce_jar_data}],
configs=self.edp_info.mapreduce_example_configs(),
swift_binaries=True,
hdfs_local_output=True)
self.edp_testing(job_type=utils_edp.JOB_TYPE_MAPREDUCE_STREAMING,
job_data_list=[],
lib_data_list=[],
configs=(
self.edp_info.mapreduce_streaming_configs()))
self.edp_testing(job_type=utils_edp.JOB_TYPE_JAVA,
job_data_list=[],
lib_data_list=[{'jar': java_lib_data}],
configs=self.edp_info.java_example_configs(),
pass_input_output_args=True)
job_ids = []
job_id = self.edp_testing(
job_type=utils_edp.JOB_TYPE_PIG,
job_data_list=[{'pig': pig_job_data}],
lib_data_list=[{'jar': pig_lib_data}],
swift_binaries=True,
hdfs_local_output=True)
job_ids.append(job_id)
job_id = self.edp_testing(
job_type=utils_edp.JOB_TYPE_MAPREDUCE,
job_data_list=[],
lib_data_list=[{'jar': mapreduce_jar_data}],
configs=self.edp_info.mapreduce_example_configs(),
swift_binaries=True,
hdfs_local_output=True)
job_ids.append(job_id)
job_id = self.edp_testing(
job_type=utils_edp.JOB_TYPE_MAPREDUCE_STREAMING,
job_data_list=[],
lib_data_list=[],
configs=self.edp_info.mapreduce_streaming_configs())
job_ids.append(job_id)
job_id = self.edp_testing(
job_type=utils_edp.JOB_TYPE_JAVA,
job_data_list=[],
lib_data_list=[{'jar': java_lib_data}],
configs=self.edp_info.java_example_configs(),
pass_input_output_args=True)
job_ids.append(job_id)
self.poll_jobs_status(job_ids)
except Exception as e:
with excutils.save_and_reraise_exception():

View File

@ -121,10 +121,12 @@ class SparkGatingTest(swift.SwiftTest, scaling.ScalingTest,
# check spark
spark_jar = self.edp_info.read_spark_example_jar()
spark_configs = self.edp_info.spark_example_configs()
self.edp_testing(utils_edp.JOB_TYPE_SPARK,
job_data_list=[{'jar': spark_jar}],
lib_data_list=[],
configs=spark_configs)
job_id = self.edp_testing(
utils_edp.JOB_TYPE_SPARK,
job_data_list=[{'jar': spark_jar}],
lib_data_list=[],
configs=spark_configs)
self.poll_jobs_status([job_id])
@b.errormsg("Failure while cluster scaling: ")
def _check_scaling(self):

View File

@ -101,9 +101,10 @@ class TransientGatingTest(edp.EDPTest):
def _check_transient(self):
pig_job_data = self.edp_info.read_pig_example_script()
pig_lib_data = self.edp_info.read_pig_example_jar()
self.edp_testing(job_type=utils_edp.JOB_TYPE_PIG,
job_data_list=[{'pig': pig_job_data}],
lib_data_list=[{'jar': pig_lib_data}])
job_id = self.edp_testing(job_type=utils_edp.JOB_TYPE_PIG,
job_data_list=[{'pig': pig_job_data}],
lib_data_list=[{'jar': pig_lib_data}])
self.poll_jobs_status([job_id])
# set timeout in seconds
timeout = self.common_config.TRANSIENT_CLUSTER_TIMEOUT * 60

View File

@ -255,40 +255,40 @@ class VanillaGatingTest(cinder.CinderVolumeTest,
# This is a modified version of WordCount that takes swift configs
java_lib_data = self.edp_info.read_java_example_lib()
try:
self.edp_testing(
job_type=utils_edp.JOB_TYPE_PIG,
job_data_list=[{'pig': pig_job_data}],
lib_data_list=[{'jar': pig_lib_data}],
swift_binaries=True,
hdfs_local_output=True)
self.edp_testing(
job_type=utils_edp.JOB_TYPE_MAPREDUCE,
job_data_list=[],
lib_data_list=[{'jar': mapreduce_jar_data}],
configs=self.edp_info.mapreduce_example_configs(),
swift_binaries=True,
hdfs_local_output=True)
self.edp_testing(
job_type=utils_edp.JOB_TYPE_MAPREDUCE_STREAMING,
job_data_list=[],
lib_data_list=[],
configs=self.edp_info.mapreduce_streaming_configs())
self.edp_testing(
job_type=utils_edp.JOB_TYPE_JAVA,
job_data_list=[],
lib_data_list=[{'jar': java_lib_data}],
configs=self.edp_info.java_example_configs(),
pass_input_output_args=True)
job_ids = []
job_id = self.edp_testing(
job_type=utils_edp.JOB_TYPE_PIG,
job_data_list=[{'pig': pig_job_data}],
lib_data_list=[{'jar': pig_lib_data}],
swift_binaries=True,
hdfs_local_output=True)
job_ids.append(job_id)
except Exception as e:
with excutils.save_and_reraise_exception():
self.delete_objects(
cluster_info['cluster_id'], cluster_template_id,
node_group_template_id_list
)
message = 'Failure while EDP testing: '
self.print_error_log(message, e)
job_id = self.edp_testing(
job_type=utils_edp.JOB_TYPE_MAPREDUCE,
job_data_list=[],
lib_data_list=[{'jar': mapreduce_jar_data}],
configs=self.edp_info.mapreduce_example_configs(),
swift_binaries=True,
hdfs_local_output=True)
job_ids.append(job_id)
job_id = self.edp_testing(
job_type=utils_edp.JOB_TYPE_MAPREDUCE_STREAMING,
job_data_list=[],
lib_data_list=[],
configs=self.edp_info.mapreduce_streaming_configs())
job_ids.append(job_id)
job_id = self.edp_testing(
job_type=utils_edp.JOB_TYPE_JAVA,
job_data_list=[],
lib_data_list=[{'jar': java_lib_data}],
configs=self.edp_info.java_example_configs(),
pass_input_output_args=True)
job_ids.append(job_id)
self.poll_jobs_status(job_ids)
edp_test()

View File

@ -63,6 +63,17 @@ class VanillaTwoGatingTest(cluster_configs.ClusterConfigTest,
self.volumes_per_node = 2
self.volume_size = 2
ng_params = {
'MapReduce': {
'yarn.app.mapreduce.am.resource.mb': 256,
'yarn.app.mapreduce.am.command-opts': '-Xmx256m'
},
'YARN': {
'yarn.scheduler.minimum-allocation-mb': 256,
'yarn.scheduler.maximum-allocation-mb': 1024
}
}
@b.errormsg("Failure while 'nm-dn' node group template creation: ")
def _create_nm_dn_ng_template(self):
template = {
@ -71,7 +82,7 @@ class VanillaTwoGatingTest(cluster_configs.ClusterConfigTest,
'description': 'test node group template for Vanilla plugin',
'node_processes': ['nodemanager', 'datanode'],
'floating_ip_pool': self.floating_ip_pool,
'node_configs': {}
'node_configs': self.ng_params
}
self.ng_tmpl_nm_dn_id = self.create_node_group_template(**template)
self.ng_template_ids.append(self.ng_tmpl_nm_dn_id)
@ -86,7 +97,7 @@ class VanillaTwoGatingTest(cluster_configs.ClusterConfigTest,
'volume_size': self.volume_size,
'node_processes': ['nodemanager'],
'floating_ip_pool': self.floating_ip_pool,
'node_configs': {}
'node_configs': self.ng_params
}
self.ng_tmpl_nm_id = self.create_node_group_template(**template)
self.ng_template_ids.append(self.ng_tmpl_nm_id)
@ -101,7 +112,7 @@ class VanillaTwoGatingTest(cluster_configs.ClusterConfigTest,
'volume_size': self.volume_size,
'node_processes': ['datanode'],
'floating_ip_pool': self.floating_ip_pool,
'node_configs': {}
'node_configs': self.ng_params
}
self.ng_tmpl_dn_id = self.create_node_group_template(**template)
self.ng_template_ids.append(self.ng_tmpl_dn_id)
@ -123,7 +134,8 @@ class VanillaTwoGatingTest(cluster_configs.ClusterConfigTest,
'flavor_id': self.flavor_id,
'node_processes': ['namenode', 'resourcemanager'],
'floating_ip_pool': self.floating_ip_pool,
'count': 1
'count': 1,
'node_configs': self.ng_params
},
{
'name': 'master-node-oo-hs',
@ -131,7 +143,8 @@ class VanillaTwoGatingTest(cluster_configs.ClusterConfigTest,
'node_processes': ['oozie', 'historyserver',
'secondarynamenode'],
'floating_ip_pool': self.floating_ip_pool,
'count': 1
'count': 1,
'node_configs': self.ng_params
},
{
'name': 'worker-node-nm-dn',
@ -183,51 +196,57 @@ class VanillaTwoGatingTest(cluster_configs.ClusterConfigTest,
@b.errormsg("Failure while EDP testing: ")
def _check_edp(self):
self.poll_jobs_status(list(self._run_edp_tests()))
def _run_edp_tests(self):
skipped_edp_job_types = self.vanilla_two_config.SKIP_EDP_JOB_TYPES
if utils_edp.JOB_TYPE_PIG not in skipped_edp_job_types:
self._edp_pig_test()
yield self._edp_pig_test()
if utils_edp.JOB_TYPE_MAPREDUCE not in skipped_edp_job_types:
self._edp_mapreduce_test()
yield self._edp_mapreduce_test()
if utils_edp.JOB_TYPE_MAPREDUCE_STREAMING not in skipped_edp_job_types:
self._edp_mapreduce_streaming_test()
yield self._edp_mapreduce_streaming_test()
if utils_edp.JOB_TYPE_JAVA not in skipped_edp_job_types:
self._edp_java_test()
yield self._edp_java_test()
def _edp_pig_test(self):
pig_job = self.edp_info.read_pig_example_script()
pig_lib = self.edp_info.read_pig_example_jar()
self.edp_testing(job_type=utils_edp.JOB_TYPE_PIG,
job_data_list=[{'pig': pig_job}],
lib_data_list=[{'jar': pig_lib}],
swift_binaries=True,
hdfs_local_output=True)
return self.edp_testing(
job_type=utils_edp.JOB_TYPE_PIG,
job_data_list=[{'pig': pig_job}],
lib_data_list=[{'jar': pig_lib}],
swift_binaries=True,
hdfs_local_output=True)
def _edp_mapreduce_test(self):
mapreduce_jar = self.edp_info.read_mapreduce_example_jar()
mapreduce_configs = self.edp_info.mapreduce_example_configs()
self.edp_testing(job_type=utils_edp.JOB_TYPE_MAPREDUCE,
job_data_list=[],
lib_data_list=[{'jar': mapreduce_jar}],
configs=mapreduce_configs,
swift_binaries=True,
hdfs_local_output=True)
return self.edp_testing(
job_type=utils_edp.JOB_TYPE_MAPREDUCE,
job_data_list=[],
lib_data_list=[{'jar': mapreduce_jar}],
configs=mapreduce_configs,
swift_binaries=True,
hdfs_local_output=True)
def _edp_mapreduce_streaming_test(self):
self.edp_testing(job_type=utils_edp.JOB_TYPE_MAPREDUCE_STREAMING,
job_data_list=[],
lib_data_list=[],
configs=self.edp_info.mapreduce_streaming_configs())
return self.edp_testing(
job_type=utils_edp.JOB_TYPE_MAPREDUCE_STREAMING,
job_data_list=[],
lib_data_list=[],
configs=self.edp_info.mapreduce_streaming_configs())
def _edp_java_test(self):
java_jar = self.edp_info.read_java_example_lib(2)
java_configs = self.edp_info.java_example_configs(2)
self.edp_testing(utils_edp.JOB_TYPE_JAVA,
job_data_list=[],
lib_data_list=[{'jar': java_jar}],
configs=java_configs)
return self.edp_testing(
utils_edp.JOB_TYPE_JAVA,
job_data_list=[],
lib_data_list=[{'jar': java_jar}],
configs=java_configs)
@b.errormsg("Failure while cluster scaling: ")
def _check_scaling(self):