Add integration test for Hive EDP job

Change-Id: Ia6472d0ba9c92793aa53c7294233442286d561ad
Closes-bug: #1277239
This commit is contained in:
Sergey Reshetnyak 2014-12-02 22:17:57 +03:00
parent 8bd8a9934a
commit 4adb174d6a
6 changed files with 72 additions and 19 deletions

View File

@ -0,0 +1,2 @@
Boris
Homer
1 Boris
2 Homer

View File

@ -0,0 +1,4 @@
Mike,20
Boris,42
Bart,12
Homer,40
1 Mike 20
2 Boris 42
3 Bart 12
4 Homer 40

View File

@ -0,0 +1,4 @@
CREATE DATABASE IF NOT EXISTS tests LOCATION '/user/hive/warehouse/tests.db';
CREATE EXTERNAL TABLE IF NOT EXISTS tests.students (name STRING, age INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE;
LOAD DATA INPATH '${INPUT}' INTO TABLE tests.students;
INSERT OVERWRITE DIRECTORY '${OUTPUT}' SELECT name FROM tests.students WHERE age > 30;

View File

@ -267,6 +267,16 @@ class ITestCase(testcase.WithAttributes, base.BaseTestCase):
# } # }
return node_ip_list_with_node_processes return node_ip_list_with_node_processes
def put_file_to_hdfs(self, namenode_ip, remote_path, data):
tmp_file_path = '/tmp/%s' % six.text_type(uuid.uuid4())
self.open_ssh_connection(namenode_ip, self.plugin_config.SSH_USERNAME)
self.write_file_to(tmp_file_path, data)
self.execute_command(
'sudo su - -c "hadoop dfs -copyFromLocal %s %s" %s' % (
tmp_file_path, remote_path, self.plugin_config.HADOOP_USER))
self.execute_command('rm -fr %s' % tmp_file_path)
self.close_ssh_connection()
def try_telnet(self, host, port): def try_telnet(self, host, port):
try: try:
telnetlib.Telnet(host, port) telnetlib.Telnet(host, port)

View File

@ -31,9 +31,16 @@ class EDPJobInfo(object):
JAVA_PATH = 'etc/edp-examples/edp-java/' JAVA_PATH = 'etc/edp-examples/edp-java/'
MAPREDUCE_PATH = 'etc/edp-examples/edp-mapreduce/' MAPREDUCE_PATH = 'etc/edp-examples/edp-mapreduce/'
SPARK_PATH = 'etc/edp-examples/edp-spark/' SPARK_PATH = 'etc/edp-examples/edp-spark/'
HIVE_PATH = 'etc/edp-examples/edp-hive/'
HADOOP2_JAVA_PATH = 'etc/edp-examples/hadoop2/edp-java/' HADOOP2_JAVA_PATH = 'etc/edp-examples/hadoop2/edp-java/'
def read_hive_example_script(self):
return open(self.HIVE_PATH + 'script.q').read()
def read_hive_example_input(self):
return open(self.HIVE_PATH + 'input.csv').read()
def read_pig_example_script(self): def read_pig_example_script(self):
return open(self.PIG_PATH + 'example.pig').read() return open(self.PIG_PATH + 'example.pig').read()
@ -204,6 +211,29 @@ class EDPTest(base.ITestCase):
configs["configs"][ configs["configs"][
sw.HADOOP_SWIFT_PASSWORD] = self.common_config.OS_PASSWORD sw.HADOOP_SWIFT_PASSWORD] = self.common_config.OS_PASSWORD
@base.skip_test('SKIP_EDP_TEST', 'Test for EDP was skipped.')
def check_edp_hive(self):
hdfs_input_path = '/user/hive/warehouse/input.csv'
# put input data to HDFS
self.put_file_to_hdfs(
self.cluster_info['node_info']['namenode_ip'],
hdfs_input_path, self.edp_info.read_hive_example_input())
input_id = self._create_data_source('hive-input', 'hdfs',
hdfs_input_path)
output_id = self._create_data_source('hive-output', 'hdfs',
'/user/hive/warehouse/output')
script_id = self._create_job_binary_internals(
'hive-script', self.edp_info.read_hive_example_script())
job_binary_id = self._create_job_binary('hive-edp',
'internal-db://%s' % script_id)
job_id = self._create_job('edp-test-hive', edp.JOB_TYPE_HIVE,
[job_binary_id], [])
job_execution_id = self.sahara.job_executions.create(
job_id, self.cluster_id, input_id, output_id, {}).id
self.addCleanup(self.sahara.job_executions.delete, job_execution_id)
return job_execution_id
@base.skip_test('SKIP_EDP_TEST', 'Test for EDP was skipped.') @base.skip_test('SKIP_EDP_TEST', 'Test for EDP was skipped.')
def edp_testing(self, job_type, job_data_list, lib_data_list=None, def edp_testing(self, job_type, job_data_list, lib_data_list=None,
configs=None, pass_input_output_args=False, configs=None, pass_input_output_args=False,

View File

@ -44,7 +44,7 @@ class CDHGatingTest(cluster_configs.ClusterConfigTest,
self.ng_template_ids = [] self.ng_template_ids = []
def _prepare_test(self): def _prepare_test(self):
self.cdh_config = cfg.ITConfig().cdh_config self.plugin_config = cfg.ITConfig().cdh_config
self.floating_ip_pool = self.common_config.FLOATING_IP_POOL self.floating_ip_pool = self.common_config.FLOATING_IP_POOL
self.internal_neutron_net = None self.internal_neutron_net = None
if self.common_config.NEUTRON_ENABLED: if self.common_config.NEUTRON_ENABLED:
@ -52,8 +52,8 @@ class CDHGatingTest(cluster_configs.ClusterConfigTest,
self.floating_ip_pool = ( self.floating_ip_pool = (
self.get_floating_ip_pool_id_for_neutron_net()) self.get_floating_ip_pool_id_for_neutron_net())
self.cdh_config.IMAGE_ID, self.cdh_config.SSH_USERNAME = ( self.plugin_config.IMAGE_ID, self.plugin_config.SSH_USERNAME = (
self.get_image_id_and_ssh_username(self.cdh_config)) self.get_image_id_and_ssh_username(self.plugin_config))
self.volumes_per_node = 0 self.volumes_per_node = 0
self.volumes_size = 0 self.volumes_size = 0
@ -65,7 +65,7 @@ class CDHGatingTest(cluster_configs.ClusterConfigTest,
def _create_nm_dn_ng_template(self): def _create_nm_dn_ng_template(self):
template = { template = {
'name': 'test-node-group-template-cdh-nm-dn', 'name': 'test-node-group-template-cdh-nm-dn',
'plugin_config': self.cdh_config, 'plugin_config': self.plugin_config,
'description': 'test node group template for CDH plugin', 'description': 'test node group template for CDH plugin',
'node_processes': ['NODEMANAGER', 'DATANODE'], 'node_processes': ['NODEMANAGER', 'DATANODE'],
'floating_ip_pool': self.floating_ip_pool, 'floating_ip_pool': self.floating_ip_pool,
@ -79,7 +79,7 @@ class CDHGatingTest(cluster_configs.ClusterConfigTest,
def _create_nm_ng_template(self): def _create_nm_ng_template(self):
template = { template = {
'name': 'test-node-group-template-cdh-nm', 'name': 'test-node-group-template-cdh-nm',
'plugin_config': self.cdh_config, 'plugin_config': self.plugin_config,
'description': 'test node group template for CDH plugin', 'description': 'test node group template for CDH plugin',
'volumes_per_node': self.volumes_per_node, 'volumes_per_node': self.volumes_per_node,
'volumes_size': self.volumes_size, 'volumes_size': self.volumes_size,
@ -95,7 +95,7 @@ class CDHGatingTest(cluster_configs.ClusterConfigTest,
def _create_dn_ng_template(self): def _create_dn_ng_template(self):
template = { template = {
'name': 'test-node-group-template-cdh-dn', 'name': 'test-node-group-template-cdh-dn',
'plugin_config': self.cdh_config, 'plugin_config': self.plugin_config,
'description': 'test node group template for CDH plugin', 'description': 'test node group template for CDH plugin',
'volumes_per_node': self.volumes_per_node, 'volumes_per_node': self.volumes_per_node,
'volumes_size': self.volumes_size, 'volumes_size': self.volumes_size,
@ -111,24 +111,24 @@ class CDHGatingTest(cluster_configs.ClusterConfigTest,
def _create_cluster_template(self): def _create_cluster_template(self):
cl_config = { cl_config = {
'general': { 'general': {
'CDH5 repo list URL': self.cdh_config.CDH_REPO_LIST_URL, 'CDH5 repo list URL': self.plugin_config.CDH_REPO_LIST_URL,
'CM5 repo list URL': self.cdh_config.CM_REPO_LIST_URL, 'CM5 repo list URL': self.plugin_config.CM_REPO_LIST_URL,
'CDH5 repo key URL (for debian-based only)': 'CDH5 repo key URL (for debian-based only)':
self.cdh_config.CDH_APT_KEY_URL, self.plugin_config.CDH_APT_KEY_URL,
'CM5 repo key URL (for debian-based only)': 'CM5 repo key URL (for debian-based only)':
self.cdh_config.CM_APT_KEY_URL, self.plugin_config.CM_APT_KEY_URL,
'Enable Swift': True 'Enable Swift': True
} }
} }
template = { template = {
'name': 'test-cluster-template-cdh', 'name': 'test-cluster-template-cdh',
'plugin_config': self.cdh_config, 'plugin_config': self.plugin_config,
'description': 'test cluster template for CDH plugin', 'description': 'test cluster template for CDH plugin',
'cluster_configs': cl_config, 'cluster_configs': cl_config,
'node_groups': [ 'node_groups': [
{ {
'name': 'manager-node', 'name': 'manager-node',
'flavor_id': self.cdh_config.MANAGERNODE_FLAVOR, 'flavor_id': self.plugin_config.MANAGERNODE_FLAVOR,
'node_processes': ['MANAGER'], 'node_processes': ['MANAGER'],
'floating_ip_pool': self.floating_ip_pool, 'floating_ip_pool': self.floating_ip_pool,
'auto_security_group': True, 'auto_security_group': True,
@ -175,10 +175,10 @@ class CDHGatingTest(cluster_configs.ClusterConfigTest,
@b.errormsg("Failure while cluster creation: ") @b.errormsg("Failure while cluster creation: ")
def _create_cluster(self): def _create_cluster(self):
cluster_name = '%s-%s' % (self.common_config.CLUSTER_NAME, cluster_name = '%s-%s' % (self.common_config.CLUSTER_NAME,
self.cdh_config.PLUGIN_NAME) self.plugin_config.PLUGIN_NAME)
cluster = { cluster = {
'name': cluster_name, 'name': cluster_name,
'plugin_config': self.cdh_config, 'plugin_config': self.plugin_config,
'cluster_template_id': self.cluster_template_id, 'cluster_template_id': self.cluster_template_id,
'description': 'test cluster', 'description': 'test cluster',
'cluster_configs': { 'cluster_configs': {
@ -189,9 +189,9 @@ class CDHGatingTest(cluster_configs.ClusterConfigTest,
} }
self.cluster_id = self.create_cluster(**cluster) self.cluster_id = self.create_cluster(**cluster)
self.poll_cluster_state(self.cluster_id) self.poll_cluster_state(self.cluster_id)
self.cluster_info = self.get_cluster_info(self.cdh_config) self.cluster_info = self.get_cluster_info(self.plugin_config)
self.await_active_workers_for_namenode(self.cluster_info['node_info'], self.await_active_workers_for_namenode(self.cluster_info['node_info'],
self.cdh_config) self.plugin_config)
@b.errormsg("Failure while Cinder testing: ") @b.errormsg("Failure while Cinder testing: ")
def _check_cinder(self): def _check_cinder(self):
@ -240,6 +240,9 @@ class CDHGatingTest(cluster_configs.ClusterConfigTest,
swift_binaries=False, swift_binaries=False,
hdfs_local_output=True) hdfs_local_output=True)
# check hive
yield self.check_edp_hive()
@b.errormsg("Failure while cluster scaling: ") @b.errormsg("Failure while cluster scaling: ")
def _check_scaling(self): def _check_scaling(self):
change_list = [ change_list = [
@ -272,7 +275,7 @@ class CDHGatingTest(cluster_configs.ClusterConfigTest,
self.cluster_info = self.cluster_scaling(self.cluster_info, self.cluster_info = self.cluster_scaling(self.cluster_info,
change_list) change_list)
self.await_active_workers_for_namenode(self.cluster_info['node_info'], self.await_active_workers_for_namenode(self.cluster_info['node_info'],
self.cdh_config) self.plugin_config)
@b.errormsg("Failure while Cinder testing after cluster scaling: ") @b.errormsg("Failure while Cinder testing after cluster scaling: ")
def _check_cinder_after_scaling(self): def _check_cinder_after_scaling(self):
@ -309,7 +312,7 @@ class CDHGatingTest(cluster_configs.ClusterConfigTest,
self._check_swift() self._check_swift()
self._check_edp() self._check_edp()
if not self.cdh_config.SKIP_SCALING_TEST: if not self.plugin_config.SKIP_SCALING_TEST:
self._check_scaling() self._check_scaling()
self._check_cinder_after_scaling() self._check_cinder_after_scaling()
self._check_edp_after_scaling() self._check_edp_after_scaling()
@ -330,7 +333,7 @@ class CDHGatingTest(cluster_configs.ClusterConfigTest,
print("Cloudera Manager node not found") print("Cloudera Manager node not found")
return return
self.open_ssh_connection(manager_node, self.cdh_config.SSH_USERNAME) self.open_ssh_connection(manager_node, self.plugin_config.SSH_USERNAME)
try: try:
log = self.execute_command('sudo cat /var/log/cloudera-scm-server/' log = self.execute_command('sudo cat /var/log/cloudera-scm-server/'
'cloudera-scm-server.log')[1] 'cloudera-scm-server.log')[1]