From 4adb174d6a8415657a2104fcb8252959850ef299 Mon Sep 17 00:00:00 2001 From: Sergey Reshetnyak Date: Tue, 2 Dec 2014 22:17:57 +0300 Subject: [PATCH] Add integration test for Hive EDP job Change-Id: Ia6472d0ba9c92793aa53c7294233442286d561ad Closes-bug: #1277239 --- etc/edp-examples/edp-hive/expected_output.csv | 2 + etc/edp-examples/edp-hive/input.csv | 4 ++ etc/edp-examples/edp-hive/script.q | 4 ++ sahara/tests/integration/tests/base.py | 10 +++++ sahara/tests/integration/tests/edp.py | 30 ++++++++++++++ .../tests/gating/test_cdh_gating.py | 41 ++++++++++--------- 6 files changed, 72 insertions(+), 19 deletions(-) create mode 100644 etc/edp-examples/edp-hive/expected_output.csv create mode 100644 etc/edp-examples/edp-hive/input.csv create mode 100644 etc/edp-examples/edp-hive/script.q diff --git a/etc/edp-examples/edp-hive/expected_output.csv b/etc/edp-examples/edp-hive/expected_output.csv new file mode 100644 index 0000000000..be6bf27cb3 --- /dev/null +++ b/etc/edp-examples/edp-hive/expected_output.csv @@ -0,0 +1,2 @@ +Boris +Homer diff --git a/etc/edp-examples/edp-hive/input.csv b/etc/edp-examples/edp-hive/input.csv new file mode 100644 index 0000000000..d682e1304b --- /dev/null +++ b/etc/edp-examples/edp-hive/input.csv @@ -0,0 +1,4 @@ +Mike,20 +Boris,42 +Bart,12 +Homer,40 diff --git a/etc/edp-examples/edp-hive/script.q b/etc/edp-examples/edp-hive/script.q new file mode 100644 index 0000000000..64bc0868e6 --- /dev/null +++ b/etc/edp-examples/edp-hive/script.q @@ -0,0 +1,4 @@ +CREATE DATABASE IF NOT EXISTS tests LOCATION '/user/hive/warehouse/tests.db'; +CREATE EXTERNAL TABLE IF NOT EXISTS tests.students (name STRING, age INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE; +LOAD DATA INPATH '${INPUT}' INTO TABLE tests.students; +INSERT OVERWRITE DIRECTORY '${OUTPUT}' SELECT name FROM tests.students WHERE age > 30; diff --git a/sahara/tests/integration/tests/base.py b/sahara/tests/integration/tests/base.py index faaf7600f7..c6e0b6eed1 100644 --- a/sahara/tests/integration/tests/base.py +++ b/sahara/tests/integration/tests/base.py @@ -267,6 +267,16 @@ class ITestCase(testcase.WithAttributes, base.BaseTestCase): # } return node_ip_list_with_node_processes + def put_file_to_hdfs(self, namenode_ip, remote_path, data): + tmp_file_path = '/tmp/%s' % six.text_type(uuid.uuid4()) + self.open_ssh_connection(namenode_ip, self.plugin_config.SSH_USERNAME) + self.write_file_to(tmp_file_path, data) + self.execute_command( + 'sudo su - -c "hadoop dfs -copyFromLocal %s %s" %s' % ( + tmp_file_path, remote_path, self.plugin_config.HADOOP_USER)) + self.execute_command('rm -fr %s' % tmp_file_path) + self.close_ssh_connection() + def try_telnet(self, host, port): try: telnetlib.Telnet(host, port) diff --git a/sahara/tests/integration/tests/edp.py b/sahara/tests/integration/tests/edp.py index 3fba4b5989..416a33148c 100644 --- a/sahara/tests/integration/tests/edp.py +++ b/sahara/tests/integration/tests/edp.py @@ -31,9 +31,16 @@ class EDPJobInfo(object): JAVA_PATH = 'etc/edp-examples/edp-java/' MAPREDUCE_PATH = 'etc/edp-examples/edp-mapreduce/' SPARK_PATH = 'etc/edp-examples/edp-spark/' + HIVE_PATH = 'etc/edp-examples/edp-hive/' HADOOP2_JAVA_PATH = 'etc/edp-examples/hadoop2/edp-java/' + def read_hive_example_script(self): + return open(self.HIVE_PATH + 'script.q').read() + + def read_hive_example_input(self): + return open(self.HIVE_PATH + 'input.csv').read() + def read_pig_example_script(self): return open(self.PIG_PATH + 'example.pig').read() @@ -204,6 +211,29 @@ class EDPTest(base.ITestCase): configs["configs"][ sw.HADOOP_SWIFT_PASSWORD] = self.common_config.OS_PASSWORD + @base.skip_test('SKIP_EDP_TEST', 'Test for EDP was skipped.') + def check_edp_hive(self): + hdfs_input_path = '/user/hive/warehouse/input.csv' + # put input data to HDFS + self.put_file_to_hdfs( + self.cluster_info['node_info']['namenode_ip'], + hdfs_input_path, self.edp_info.read_hive_example_input()) + + input_id = self._create_data_source('hive-input', 'hdfs', + hdfs_input_path) + output_id = self._create_data_source('hive-output', 'hdfs', + '/user/hive/warehouse/output') + script_id = self._create_job_binary_internals( + 'hive-script', self.edp_info.read_hive_example_script()) + job_binary_id = self._create_job_binary('hive-edp', + 'internal-db://%s' % script_id) + job_id = self._create_job('edp-test-hive', edp.JOB_TYPE_HIVE, + [job_binary_id], []) + job_execution_id = self.sahara.job_executions.create( + job_id, self.cluster_id, input_id, output_id, {}).id + self.addCleanup(self.sahara.job_executions.delete, job_execution_id) + return job_execution_id + @base.skip_test('SKIP_EDP_TEST', 'Test for EDP was skipped.') def edp_testing(self, job_type, job_data_list, lib_data_list=None, configs=None, pass_input_output_args=False, diff --git a/sahara/tests/integration/tests/gating/test_cdh_gating.py b/sahara/tests/integration/tests/gating/test_cdh_gating.py index 963df3d1d5..46120bd1dc 100644 --- a/sahara/tests/integration/tests/gating/test_cdh_gating.py +++ b/sahara/tests/integration/tests/gating/test_cdh_gating.py @@ -44,7 +44,7 @@ class CDHGatingTest(cluster_configs.ClusterConfigTest, self.ng_template_ids = [] def _prepare_test(self): - self.cdh_config = cfg.ITConfig().cdh_config + self.plugin_config = cfg.ITConfig().cdh_config self.floating_ip_pool = self.common_config.FLOATING_IP_POOL self.internal_neutron_net = None if self.common_config.NEUTRON_ENABLED: @@ -52,8 +52,8 @@ class CDHGatingTest(cluster_configs.ClusterConfigTest, self.floating_ip_pool = ( self.get_floating_ip_pool_id_for_neutron_net()) - self.cdh_config.IMAGE_ID, self.cdh_config.SSH_USERNAME = ( - self.get_image_id_and_ssh_username(self.cdh_config)) + self.plugin_config.IMAGE_ID, self.plugin_config.SSH_USERNAME = ( + self.get_image_id_and_ssh_username(self.plugin_config)) self.volumes_per_node = 0 self.volumes_size = 0 @@ -65,7 +65,7 @@ class CDHGatingTest(cluster_configs.ClusterConfigTest, def _create_nm_dn_ng_template(self): template = { 'name': 'test-node-group-template-cdh-nm-dn', - 'plugin_config': self.cdh_config, + 'plugin_config': self.plugin_config, 'description': 'test node group template for CDH plugin', 'node_processes': ['NODEMANAGER', 'DATANODE'], 'floating_ip_pool': self.floating_ip_pool, @@ -79,7 +79,7 @@ class CDHGatingTest(cluster_configs.ClusterConfigTest, def _create_nm_ng_template(self): template = { 'name': 'test-node-group-template-cdh-nm', - 'plugin_config': self.cdh_config, + 'plugin_config': self.plugin_config, 'description': 'test node group template for CDH plugin', 'volumes_per_node': self.volumes_per_node, 'volumes_size': self.volumes_size, @@ -95,7 +95,7 @@ class CDHGatingTest(cluster_configs.ClusterConfigTest, def _create_dn_ng_template(self): template = { 'name': 'test-node-group-template-cdh-dn', - 'plugin_config': self.cdh_config, + 'plugin_config': self.plugin_config, 'description': 'test node group template for CDH plugin', 'volumes_per_node': self.volumes_per_node, 'volumes_size': self.volumes_size, @@ -111,24 +111,24 @@ class CDHGatingTest(cluster_configs.ClusterConfigTest, def _create_cluster_template(self): cl_config = { 'general': { - 'CDH5 repo list URL': self.cdh_config.CDH_REPO_LIST_URL, - 'CM5 repo list URL': self.cdh_config.CM_REPO_LIST_URL, + 'CDH5 repo list URL': self.plugin_config.CDH_REPO_LIST_URL, + 'CM5 repo list URL': self.plugin_config.CM_REPO_LIST_URL, 'CDH5 repo key URL (for debian-based only)': - self.cdh_config.CDH_APT_KEY_URL, + self.plugin_config.CDH_APT_KEY_URL, 'CM5 repo key URL (for debian-based only)': - self.cdh_config.CM_APT_KEY_URL, + self.plugin_config.CM_APT_KEY_URL, 'Enable Swift': True } } template = { 'name': 'test-cluster-template-cdh', - 'plugin_config': self.cdh_config, + 'plugin_config': self.plugin_config, 'description': 'test cluster template for CDH plugin', 'cluster_configs': cl_config, 'node_groups': [ { 'name': 'manager-node', - 'flavor_id': self.cdh_config.MANAGERNODE_FLAVOR, + 'flavor_id': self.plugin_config.MANAGERNODE_FLAVOR, 'node_processes': ['MANAGER'], 'floating_ip_pool': self.floating_ip_pool, 'auto_security_group': True, @@ -175,10 +175,10 @@ class CDHGatingTest(cluster_configs.ClusterConfigTest, @b.errormsg("Failure while cluster creation: ") def _create_cluster(self): cluster_name = '%s-%s' % (self.common_config.CLUSTER_NAME, - self.cdh_config.PLUGIN_NAME) + self.plugin_config.PLUGIN_NAME) cluster = { 'name': cluster_name, - 'plugin_config': self.cdh_config, + 'plugin_config': self.plugin_config, 'cluster_template_id': self.cluster_template_id, 'description': 'test cluster', 'cluster_configs': { @@ -189,9 +189,9 @@ class CDHGatingTest(cluster_configs.ClusterConfigTest, } self.cluster_id = self.create_cluster(**cluster) self.poll_cluster_state(self.cluster_id) - self.cluster_info = self.get_cluster_info(self.cdh_config) + self.cluster_info = self.get_cluster_info(self.plugin_config) self.await_active_workers_for_namenode(self.cluster_info['node_info'], - self.cdh_config) + self.plugin_config) @b.errormsg("Failure while Cinder testing: ") def _check_cinder(self): @@ -240,6 +240,9 @@ class CDHGatingTest(cluster_configs.ClusterConfigTest, swift_binaries=False, hdfs_local_output=True) + # check hive + yield self.check_edp_hive() + @b.errormsg("Failure while cluster scaling: ") def _check_scaling(self): change_list = [ @@ -272,7 +275,7 @@ class CDHGatingTest(cluster_configs.ClusterConfigTest, self.cluster_info = self.cluster_scaling(self.cluster_info, change_list) self.await_active_workers_for_namenode(self.cluster_info['node_info'], - self.cdh_config) + self.plugin_config) @b.errormsg("Failure while Cinder testing after cluster scaling: ") def _check_cinder_after_scaling(self): @@ -309,7 +312,7 @@ class CDHGatingTest(cluster_configs.ClusterConfigTest, self._check_swift() self._check_edp() - if not self.cdh_config.SKIP_SCALING_TEST: + if not self.plugin_config.SKIP_SCALING_TEST: self._check_scaling() self._check_cinder_after_scaling() self._check_edp_after_scaling() @@ -330,7 +333,7 @@ class CDHGatingTest(cluster_configs.ClusterConfigTest, print("Cloudera Manager node not found") return - self.open_ssh_connection(manager_node, self.cdh_config.SSH_USERNAME) + self.open_ssh_connection(manager_node, self.plugin_config.SSH_USERNAME) try: log = self.execute_command('sudo cat /var/log/cloudera-scm-server/' 'cloudera-scm-server.log')[1]