From 4824ca3dc650021039ad17e3b7e229cecebf9c8c Mon Sep 17 00:00:00 2001 From: Vitaly Gridnev Date: Fri, 4 Sep 2015 16:20:17 +0300 Subject: [PATCH] [CDH] Fix problem with launching Spark jobs Sahara EDP should select SPARK_YARN_HISTORY_SERVER node as the master node for running Spark jobs. Also uploaded spark job to cdh_flow on CI. Closes-bug: 1490012 Change-Id: I09a745b8c84e6cca2fb2924e85d845aafc8e1134 --- etc/scenario/sahara-ci/cdh-5.4.0.yaml.mako | 3 +- etc/scenario/sahara-ci/edp.yaml.mako | 60 +++++++++++++++++++ sahara/plugins/cdh/v5_3_0/edp_engine.py | 2 +- sahara/plugins/cdh/v5_4_0/edp_engine.py | 2 +- .../edp/spark/test_spark_cloudera_v5_3_0.py | 2 +- .../edp/spark/test_spark_cloudera_v5_4_0.py | 2 +- 6 files changed, 66 insertions(+), 5 deletions(-) diff --git a/etc/scenario/sahara-ci/cdh-5.4.0.yaml.mako b/etc/scenario/sahara-ci/cdh-5.4.0.yaml.mako index 26e59d45..4bf0c937 100644 --- a/etc/scenario/sahara-ci/cdh-5.4.0.yaml.mako +++ b/etc/scenario/sahara-ci/cdh-5.4.0.yaml.mako @@ -53,6 +53,7 @@ clusters: - HDFS_SECONDARYNAMENODE - HIVE_METASTORE - HIVE_SERVER2 + - SPARK_YARN_HISTORY_SERVER auto_security_group: true cluster_template: name: cdh540 @@ -71,4 +72,4 @@ clusters: scenario: - run_jobs - sentry - edp_jobs_flow: hadoop_2 + edp_jobs_flow: cdh_flow diff --git a/etc/scenario/sahara-ci/edp.yaml.mako b/etc/scenario/sahara-ci/edp.yaml.mako index 284e8179..2a0c5b8c 100644 --- a/etc/scenario/sahara-ci/edp.yaml.mako +++ b/etc/scenario/sahara-ci/edp.yaml.mako @@ -169,3 +169,63 @@ edp_jobs_flow: args: - 10 - 10 + cdh_flow: + - type: Pig + input_datasource: + type: swift + source: etc/edp-examples/edp-pig/trim-spaces/data/input + output_datasource: + type: hdfs + destination: /user/hadoop/edp-output + main_lib: + type: swift + source: etc/edp-examples/edp-pig/trim-spaces/example.pig + additional_libs: + - type: swift + source: etc/edp-examples/edp-pig/trim-spaces/udf.jar + - type: MapReduce + input_datasource: + type: swift + source: etc/edp-examples/edp-pig/trim-spaces/data/input + output_datasource: + type: hdfs + destination: /user/hadoop/edp-output + additional_libs: + - type: database + source: etc/edp-examples/edp-mapreduce/edp-mapreduce.jar + configs: + mapred.mapper.class: org.apache.oozie.example.SampleMapper + mapred.reducer.class: org.apache.oozie.example.SampleReducer + - type: MapReduce.Streaming + input_datasource: + type: swift + source: etc/edp-examples/edp-pig/trim-spaces/data/input + output_datasource: + type: hdfs + destination: /user/hadoop/edp-output + configs: + edp.streaming.mapper: /bin/cat + edp.streaming.reducer: /usr/bin/wc + - type: Java + additional_libs: + - type: database + source: etc/edp-examples/hadoop2/edp-java/hadoop-mapreduce-examples-2.6.0.jar + configs: + edp.java.main_class: org.apache.hadoop.examples.QuasiMonteCarlo + args: + - 10 + - 10 + - type: Spark + input_datasource: + type: swift + source: etc/edp-examples/edp-spark/sample_input.txt + main_lib: + type: database + source: etc/edp-examples/edp-spark/spark-wordcount.jar + configs: + edp.java.main_class: sahara.edp.spark.SparkWordCount + edp.spark.adapt_for_swift: true + fs.swift.service.sahara.username: ${OS_USERNAME} + fs.swift.service.sahara.password: ${OS_PASSWORD} + args: + - '{input_datasource}' diff --git a/sahara/plugins/cdh/v5_3_0/edp_engine.py b/sahara/plugins/cdh/v5_3_0/edp_engine.py index b793a310..000d6477 100644 --- a/sahara/plugins/cdh/v5_3_0/edp_engine.py +++ b/sahara/plugins/cdh/v5_3_0/edp_engine.py @@ -79,7 +79,7 @@ class EdpSparkEngine(edp_spark_engine.SparkJobEngine): def __init__(self, cluster): super(EdpSparkEngine, self).__init__(cluster) - self.master = u.get_instance(cluster, "CLOUDERA_MANAGER") + self.master = u.get_instance(cluster, "SPARK_YARN_HISTORY_SERVER") self.plugin_params["spark-user"] = "sudo -u spark " self.plugin_params["spark-submit"] = "spark-submit" self.plugin_params["deploy-mode"] = "cluster" diff --git a/sahara/plugins/cdh/v5_4_0/edp_engine.py b/sahara/plugins/cdh/v5_4_0/edp_engine.py index d12f7901..0ee48637 100644 --- a/sahara/plugins/cdh/v5_4_0/edp_engine.py +++ b/sahara/plugins/cdh/v5_4_0/edp_engine.py @@ -82,7 +82,7 @@ class EdpSparkEngine(edp_spark_engine.SparkJobEngine): def __init__(self, cluster): super(EdpSparkEngine, self).__init__(cluster) - self.master = u.get_instance(cluster, "CLOUDERA_MANAGER") + self.master = u.get_instance(cluster, "SPARK_YARN_HISTORY_SERVER") self.plugin_params["spark-user"] = "sudo -u spark " self.plugin_params["spark-submit"] = "spark-submit" self.plugin_params["deploy-mode"] = "cluster" diff --git a/sahara/tests/unit/service/edp/spark/test_spark_cloudera_v5_3_0.py b/sahara/tests/unit/service/edp/spark/test_spark_cloudera_v5_3_0.py index f27b88fc..7d17f427 100644 --- a/sahara/tests/unit/service/edp/spark/test_spark_cloudera_v5_3_0.py +++ b/sahara/tests/unit/service/edp/spark/test_spark_cloudera_v5_3_0.py @@ -20,7 +20,7 @@ from sahara.tests.unit.service.edp.spark import base as tests class TestClouderaPlugin(tests.TestSpark): def setUp(self): super(TestClouderaPlugin, self).setUp() - self.master_host = "CLOUDERA_MANAGER" + self.master_host = "SPARK_YARN_HISTORY_SERVER" self.engine_class = edp_engine.EdpSparkEngine self.spark_user = "sudo -u spark " self.spark_submit = "spark-submit" diff --git a/sahara/tests/unit/service/edp/spark/test_spark_cloudera_v5_4_0.py b/sahara/tests/unit/service/edp/spark/test_spark_cloudera_v5_4_0.py index a421b59f..bcca7bb3 100644 --- a/sahara/tests/unit/service/edp/spark/test_spark_cloudera_v5_4_0.py +++ b/sahara/tests/unit/service/edp/spark/test_spark_cloudera_v5_4_0.py @@ -20,7 +20,7 @@ from sahara.tests.unit.service.edp.spark import base as tests class TestClouderaPlugin(tests.TestSpark): def setUp(self): super(TestClouderaPlugin, self).setUp() - self.master_host = "CLOUDERA_MANAGER" + self.master_host = "SPARK_YARN_HISTORY_SERVER" self.engine_class = edp_engine.EdpSparkEngine self.spark_user = "sudo -u spark " self.spark_submit = "spark-submit"