diff --git a/devstack/files/monasca-transform/monasca-transform.conf b/devstack/files/monasca-transform/monasca-transform.conf
index e907cf1..0584879 100644
--- a/devstack/files/monasca-transform/monasca-transform.conf
+++ b/devstack/files/monasca-transform/monasca-transform.conf
@@ -63,7 +63,7 @@ service_log_filename=monasca-transform.log
 spark_event_logging_enabled = true
 
 # A list of jars which Spark should use
-spark_jars_list = /opt/spark/current/lib/spark-streaming-kafka_2.10-1.6.3.jar,/opt/spark/current/lib/scala-library-2.10.1.jar,/opt/spark/current/lib/kafka_2.10-0.8.1.1.jar,/opt/spark/current/lib/metrics-core-2.2.0.jar,/opt/spark/current/lib/drizzle-jdbc-1.3.jar
+spark_jars_list = /opt/spark/current/assembly/target/scala-2.10/jars/spark-streaming-kafka-0-8_2.10-2.2.0.jar,/opt/spark/current/assembly/target/scala-2.10/jars/scala-library-2.10.6.jar,/opt/spark/current/assembly/target/scala-2.10/jars/kafka_2.10-0.8.1.1.jar,/opt/spark/current/assembly/target/scala-2.10/jars/metrics-core-2.2.0.jar,/opt/spark/current/assembly/target/scala-2.10/jars/drizzle-jdbc-1.3.jar
 
 # A list of where the Spark master(s) should run
 spark_master_list = spark://localhost:7077
diff --git a/devstack/files/spark/spark-defaults.conf b/devstack/files/spark/spark-defaults.conf
index 32c5071..23ee2f9 100644
--- a/devstack/files/spark/spark-defaults.conf
+++ b/devstack/files/spark/spark-defaults.conf
@@ -1,5 +1,5 @@
-spark.driver.extraClassPath /opt/spark/current/lib/drizzle-jdbc-1.3.jar
-spark.executor.extraClassPath /opt/spark/current/lib/drizzle-jdbc-1.3.jar
+spark.driver.extraClassPath /opt/spark/current/assembly/target/scala-2.10/jars/drizzle-jdbc-1.3.jar
+spark.executor.extraClassPath /opt/spark/current/assembly/target/scala-2.10/jars/drizzle-jdbc-1.3.jar
 
 spark.blockManager.port 7100
 spark.broadcast.port 7105
diff --git a/devstack/files/spark/spark-worker-env.sh b/devstack/files/spark/spark-worker-env.sh
index 6938bbd..effc202 100644
--- a/devstack/files/spark/spark-worker-env.sh
+++ b/devstack/files/spark/spark-worker-env.sh
@@ -11,7 +11,7 @@ export SPARK_WORKER_WEBUI_PORT=18081
 export SPARK_WORKER_DIR=/var/run/spark/work
 export SPARK_WORKER_MEMORY=2g
-export SPARK_WORKER_CORES=2
+export SPARK_WORKER_CORES=1
 export SPARK_WORKER_OPTS="$SPARK_WORKER_OPTS -Dspark.worker.cleanup.enabled=true -Dspark.worker.cleanup.interval=900 -Dspark.worker.cleanup.appDataTtl=1*24*3600"
 export SPARK_HISTORY_OPTS="$SPARK_HISTORY_OPTS -Dspark.history.fs.logDirectory=file://var/log/spark/events -Dspark.history.ui.port=18082"
 export SPARK_LOG_DIR=/var/log/spark
diff --git a/devstack/files/spark/start-spark-master.sh b/devstack/files/spark/start-spark-master.sh
index 0073dab..fca4e99 100644
--- a/devstack/files/spark/start-spark-master.sh
+++ b/devstack/files/spark/start-spark-master.sh
@@ -2,8 +2,13 @@
 . /opt/spark/current/conf/spark-env.sh
 export EXEC_CLASS=org.apache.spark.deploy.master.Master
 export INSTANCE_ID=1
-export SPARK_CLASSPATH=/opt/spark/current/conf/:/opt/spark/current/lib/spark-assembly-1.6.3-hadoop2.6.0.jar:/opt/spark/current/lib/datanucleus-core-3.2.10.jar:/opt/spark/current/lib/datanucleus-rdbms-3.2.9.jar:/opt/spark/current/lib/datanucleus-api-jdo-3.2.6.jar
+export SPARK_CLASSPATH=/etc/spark/conf/:/opt/spark/current/assembly/target/scala-2.10/jars/*
 export log="$SPARK_LOG_DIR/spark-spark-"$EXEC_CLASS"-"$INSTANCE_ID"-127.0.0.1.out"
 export SPARK_HOME=/opt/spark/current
 
-/usr/bin/java -cp $SPARK_CLASSPATH $SPARK_DAEMON_JAVA_OPTS -Xms1g -Xmx1g -XX:MaxPermSize=256m "$EXEC_CLASS" --ip "$SPARK_MASTER_IP" --port "$SPARK_MASTER_PORT" --webui-port "$SPARK_MASTER_WEBUI_PORT"
+# added for spark 2
+export PYTHONPATH="${SPARK_HOME}/python:${PYTHONPATH}"
+export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.10.4-src.zip:${PYTHONPATH}"
+export SPARK_SCALA_VERSION="2.10"
+
+/usr/bin/java -cp "$SPARK_CLASSPATH" $SPARK_DAEMON_JAVA_OPTS -Xms1g -Xmx1g "$EXEC_CLASS" --ip "$SPARK_MASTER_IP" --port "$SPARK_MASTER_PORT" --webui-port "$SPARK_MASTER_WEBUI_PORT" --properties-file "/etc/spark/conf/spark-defaults.conf"
diff --git a/devstack/files/spark/start-spark-worker.sh b/devstack/files/spark/start-spark-worker.sh
index 9111b4f..ea8514b 100644
--- a/devstack/files/spark/start-spark-worker.sh
+++ b/devstack/files/spark/start-spark-worker.sh
@@ -1,9 +1,17 @@
 #!/usr/bin/env bash
-. /opt/spark/current/conf/spark-env.sh
+. /opt/spark/current/conf/spark-worker-env.sh
 export EXEC_CLASS=org.apache.spark.deploy.worker.Worker
 export INSTANCE_ID=1
-export SPARK_CLASSPATH=/opt/spark/current/conf/:/opt/spark/current/lib/spark-assembly-1.6.3-hadoop2.6.0.jar:/opt/spark/current/lib/datanucleus-core-3.2.10.jar:/opt/spark/current/lib/datanucleus-rdbms-3.2.9.jar:/opt/spark/current/lib/datanucleus-api-jdo-3.2.6.jar
+export SPARK_CLASSPATH=/etc/spark/conf/:/opt/spark/current/assembly/target/scala-2.10/jars/*
 export log="$SPARK_LOG_DIR/spark-spark-"$EXEC_CLASS"-"$INSTANCE_ID"-127.0.0.1.out"
 export SPARK_HOME=/opt/spark/current
 
-/usr/bin/java -cp $SPARK_CLASSPATH $SPARK_DAEMON_JAVA_OPTS -Xms1g -Xmx1g -XX:MaxPermSize=256m "$EXEC_CLASS" --webui-port "$SPARK_WORKER_WEBUI_PORT" --port $SPARK_WORKER_PORT $SPARK_MASTERS
+# added for spark 2.1.1
+export PYTHONPATH="${SPARK_HOME}/python:${PYTHONPATH}"
+export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.10.4-src.zip:${PYTHONPATH}"
+export SPARK_SCALA_VERSION="2.10"
+
+/usr/bin/java -cp "$SPARK_CLASSPATH" $SPARK_DAEMON_JAVA_OPTS -Xms1g -Xmx1g "$EXEC_CLASS" --host $SPARK_LOCAL_IP --cores $SPARK_WORKER_CORES --memory $SPARK_WORKER_MEMORY --port "$SPARK_WORKER_PORT" -d "$SPARK_WORKER_DIR" --webui-port "$SPARK_WORKER_WEBUI_PORT" --properties-file "/etc/spark/conf/spark-defaults.conf" spark://$SPARK_MASTERS
+
+
+
diff --git a/devstack/plugin.sh b/devstack/plugin.sh
index 8ab5276..a002718 100755
--- a/devstack/plugin.sh
+++ b/devstack/plugin.sh
@@ -1,4 +1,3 @@
-
 # (C) Copyright 2015 Hewlett Packard Enterprise Development Company LP
 # Copyright 2016 FUJITSU LIMITED
 #
@@ -60,6 +59,13 @@ function pre_install_spark {
         SPARK_LIB_NAME=`echo ${SPARK_JAVA_LIB} | sed 's/.*\///'`
         download_through_cache ${MAVEN_REPO}/${SPARK_JAVA_LIB} ${SPARK_LIB_NAME}
     done
+
+    for SPARK_JAR in "${SPARK_JARS[@]}"
+    do
+        SPARK_JAR_NAME=`echo ${SPARK_JAR} | sed 's/.*\///'`
+        download_through_cache ${MAVEN_REPO}/${SPARK_JAR} ${SPARK_JAR_NAME}
+    done
+
     download_through_cache ${APACHE_MIRROR}/spark/spark-${SPARK_VERSION}/${SPARK_TARBALL_NAME} ${SPARK_TARBALL_NAME} 1000
@@ -67,7 +73,7 @@ function install_java_libs {
-    pushd /opt/spark/current/lib
+    pushd /opt/spark/current/assembly/target/scala-2.10/jars/
     for SPARK_JAVA_LIB in "${SPARK_JAVA_LIBS[@]}"
     do
         SPARK_LIB_NAME=`echo ${SPARK_JAVA_LIB} | sed 's/.*\///'`
@@ -76,15 +82,27 @@ function install_java_libs {
     popd
 }
 
-function link_spark_streaming_lib {
+function install_spark_jars {
 
-    pushd /opt/spark/current/lib
-    ln -sf spark-streaming-kafka.jar spark-streaming-kafka_2.10-1.6.3.jar
+    # create a directory for jars
+    mkdir -p /opt/spark/current/assembly/target/scala-2.10/jars
+
+    # copy jars to new location
+    pushd /opt/spark/current/assembly/target/scala-2.10/jars
+    for SPARK_JAR in "${SPARK_JARS[@]}"
+    do
+        SPARK_JAR_NAME=`echo ${SPARK_JAR} | sed 's/.*\///'`
+        copy_from_cache ${SPARK_JAR_NAME}
+    done
+
+    # copy all jars except spark and scala to assembly/target/scala_2.10/jars
+    find /opt/spark/current/jars/ -type f ! \( -iname 'spark*' -o -iname 'scala*' -o -iname 'jackson-module-scala*' -o -iname 'json4s-*' -o -iname 'breeze*' -o -iname 'spire*' -o -iname 'macro-compat*' -o -iname 'shapeless*' -o -iname 'machinist*' -o -iname 'chill*' \) -exec cp {} . \;
+
+    # rename jars directory
+    mv /opt/spark/current/jars/ /opt/spark/current/jars_original
     popd
-
 }
-
 function copy_from_cache {
     resource_name=$1
     target_directory=${2:-"./."}
@@ -341,6 +359,8 @@ function install_spark {
 
     ln -sf /opt/spark/${SPARK_HADOOP_VERSION} /opt/spark/current
 
+    install_spark_jars
+
     install_java_libs
 
     create_spark_directories
diff --git a/devstack/settings b/devstack/settings
index 87e53b6..4dce43e 100644
--- a/devstack/settings
+++ b/devstack/settings
@@ -37,8 +37,8 @@ enable_service spark-worker
 
 # spark vars
 SPARK_DIRECTORIES=("/var/spark" "/var/log/spark" "/var/log/spark/events" "/var/run/spark" "/var/run/spark/work" "/etc/spark/conf" "/etc/spark/init" )
-SPARK_VERSION=${SPARK_VERSION:-1.6.3}
-HADOOP_VERSION=${HADOOP_VERSION:-2.6}
+SPARK_VERSION=${SPARK_VERSION:-2.2.0}
+HADOOP_VERSION=${HADOOP_VERSION:-2.7}
 SPARK_HADOOP_VERSION=spark-$SPARK_VERSION-bin-hadoop$HADOOP_VERSION
 SPARK_TARBALL_NAME=${SPARK_HADOOP_VERSION}.tgz
 MAVEN_REPO=${MAVEN_REPO:-https://repo1.maven.org/maven2}
@@ -48,7 +48,10 @@ APACHE_MIRROR=${APACHE_MIRROR:-http://archive.apache.org/dist/}
 BASE_KAFKA_VERSION=${BASE_KAFKA_VERSION:-0.8.1.1}
 SCALA_VERSION=${SCALA_VERSION:-2.10}
 KAFKA_VERSION=${KAFKA_VERSION:-${SCALA_VERSION}-${BASE_KAFKA_VERSION}}
-SPARK_JAVA_LIBS=("org/apache/kafka/kafka_2.10/0.8.1.1/kafka_2.10-0.8.1.1.jar" "org/scala-lang/scala-library/2.10.1/scala-library-2.10.1.jar" "com/yammer/metrics/metrics-core/2.2.0/metrics-core-2.2.0.jar" "org/apache/spark/spark-streaming-kafka_2.10/${SPARK_VERSION}/spark-streaming-kafka_2.10-${SPARK_VERSION}.jar" "org/drizzle/jdbc/drizzle-jdbc/1.3/drizzle-jdbc-1.3.jar")
"com/fasterxml/jackson/module/jackson-module-scala_2.10/2.6.5/jackson-module-scala_2.10-2.6.5.jar" "org/json4s/json4s-jackson_2.10/3.2.11/json4s-jackson_2.10-3.2.11.jar" "org/json4s/json4s-core_2.10/3.2.11/json4s-core_2.10-3.2.11.jar" "org/json4s/json4s-ast_2.10/3.2.11/json4s-ast_2.10-3.2.11.jar" "org/scalanlp/breeze-macros_2.10/0.13.1/breeze-macros_2.10-0.13.1.jar" "org/spire-math/spire_2.10/0.13.0/spire_2.10-0.13.0.jar" "org/typelevel/macro-compat_2.10/1.1.1/macro-compat_2.10-1.1.1.jar" "com/chuusai/shapeless_2.10/2.3.2/shapeless_2.10-2.3.2.jar" "org/spire-math/spire-macros_2.10/0.13.0/spire-macros_2.10-0.13.0.jar" "org/typelevel/machinist_2.10/0.6.1/machinist_2.10-0.6.1.jar" "org/scalanlp/breeze_2.10/0.13.1/breeze_2.10-0.13.1.jar" "com/twitter/chill_2.10/0.8.0/chill_2.10-0.8.0.jar" "com/twitter/chill-java/0.8.0/chill-java-0.8.0.jar") + +# Get Spark 2.2 jars compiled with Scala 2.10 from mvn +SPARK_JARS=("org/apache/spark/spark-catalyst_2.10/${SPARK_VERSION}/spark-catalyst_2.10-2.2.0.jar" "org/apache/spark/spark-core_2.10/${SPARK_VERSION}/spark-core_2.10-2.2.0.jar" "org/apache/spark/spark-graphx_2.10/${SPARK_VERSION}/spark-graphx_2.10-2.2.0.jar" "org/apache/spark/spark-launcher_2.10/${SPARK_VERSION}/spark-launcher_2.10-2.2.0.jar" "org/apache/spark/spark-mllib_2.10/${SPARK_VERSION}/spark-mllib_2.10-2.2.0.jar" "org/apache/spark/spark-mllib-local_2.10/${SPARK_VERSION}/spark-mllib-local_2.10-2.2.0.jar" "org/apache/spark/spark-network-common_2.10/${SPARK_VERSION}/spark-network-common_2.10-2.2.0.jar" "org/apache/spark/spark-network-shuffle_2.10/${SPARK_VERSION}/spark-network-shuffle_2.10-2.2.0.jar" "org/apache/spark/spark-repl_2.10/${SPARK_VERSION}/spark-repl_2.10-2.2.0.jar" "org/apache/spark/spark-sketch_2.10/${SPARK_VERSION}/spark-sketch_2.10-2.2.0.jar" "org/apache/spark/spark-sql_2.10/${SPARK_VERSION}/spark-sql_2.10-2.2.0.jar" "org/apache/spark/spark-streaming_2.10/${SPARK_VERSION}/spark-streaming_2.10-2.2.0.jar" "org/apache/spark/spark-tags_2.10/${SPARK_VERSION}/spark-tags_2.10-2.2.0.jar" "org/apache/spark/spark-unsafe_2.10/${SPARK_VERSION}/spark-unsafe_2.10-2.2.0.jar" "org/apache/spark/spark-yarn_2.10/${SPARK_VERSION}/spark-yarn_2.10-2.2.0.jar") # monasca-api stuff diff --git a/etc/monasca-transform.conf b/etc/monasca-transform.conf index 111ff37..d4aac15 100644 --- a/etc/monasca-transform.conf +++ b/etc/monasca-transform.conf @@ -67,10 +67,10 @@ service_log_filename=monasca-transform.log spark_event_logging_enabled = true # A list of jars which Spark should use -spark_jars_list = /opt/spark/current/lib/spark-streaming-kafka.jar,/opt/spark/current/lib/scala-library-2.10.1.jar,/opt/spark/current/lib/kafka_2.10-0.8.1.1.jar,/opt/spark/current/lib/metrics-core-2.2.0.jar,/usr/share/java/mysql.jar +spark_jars_list = /opt/spark/current/assembly/target/scala-2.10/jars/spark-streaming-kafka-0-8_2.10-2.1.1.jar,/opt/spark/current/assembly/target/scala-2.10/jars/scala-library-2.10.6.jar,/opt/spark/current/assembly/target/scala-2.10/jars/kafka_2.10-0.8.1.1.jar,/opt/spark/current/assembly/target/scala-2.10/jars/metrics-core-2.2.0.jar,/opt/spark/current/assembly/target/scala-2.10/jars/drizzle-jdbc-1.3.jar # A list of where the Spark master(s) should run -spark_master_list = spark://192.168.10.4:7077,192.168.10.5:7077 +spark_master_list = spark://localhost:7077 # spark_home for the environment spark_home = /opt/spark/current diff --git a/monasca_transform/component/usage/fetch_quantity.py b/monasca_transform/component/usage/fetch_quantity.py index 303aea7..2622268 100644 --- 
diff --git a/monasca_transform/component/usage/fetch_quantity.py b/monasca_transform/component/usage/fetch_quantity.py
index 303aea7..2622268 100644
--- a/monasca_transform/component/usage/fetch_quantity.py
+++ b/monasca_transform/component/usage/fetch_quantity.py
@@ -486,7 +486,7 @@ class FetchQuantity(UsageComponent):
         grouped_data = record_store_df_int.groupBy(*group_by_columns_list)
         grouped_record_store_df = grouped_data.agg(agg_operations_map)
 
-        grouped_data_rdd_with_operation = grouped_record_store_df.map(
+        grouped_data_rdd_with_operation = grouped_record_store_df.rdd.map(
             lambda x: GroupedDataWithOperation(x, str(usage_fetch_operation)))
 
diff --git a/monasca_transform/data_driven_specs/mysql_data_driven_specs_repo.py b/monasca_transform/data_driven_specs/mysql_data_driven_specs_repo.py
index da1b747..e6c75c3 100644
--- a/monasca_transform/data_driven_specs/mysql_data_driven_specs_repo.py
+++ b/monasca_transform/data_driven_specs/mysql_data_driven_specs_repo.py
@@ -56,7 +56,7 @@ class MySQLDataDrivenSpecsRepo(DataDrivenSpecsRepo):
             spec = json.loads(item['transform_spec'])
             data.append(json.dumps(spec))
 
-        data_frame = sql_context.jsonRDD(spark_context.parallelize(data))
+        data_frame = sql_context.read.json(spark_context.parallelize(data))
         self.transform_specs_data_frame = data_frame
 
     def generate_pre_transform_specs_data_frame(self, spark_context=None,
@@ -72,5 +72,5 @@ class MySQLDataDrivenSpecsRepo(DataDrivenSpecsRepo):
             spec = json.loads(item['pre_transform_spec'])
             data.append(json.dumps(spec))
 
-        data_frame = sql_context.jsonRDD(spark_context.parallelize(data))
+        data_frame = sql_context.read.json(spark_context.parallelize(data))
         self.pre_transform_specs_data_frame = data_frame
diff --git a/monasca_transform/driver/mon_metrics_kafka.py b/monasca_transform/driver/mon_metrics_kafka.py
index abc83af..e2a0724 100644
--- a/monasca_transform/driver/mon_metrics_kafka.py
+++ b/monasca_transform/driver/mon_metrics_kafka.py
@@ -324,7 +324,7 @@ class MonMetricsKafkaProcessor(object):
         # filter out unwanted metrics and keep metrics we are interested in
         #
         cond = [
-            raw_mon_metrics_df.metric.name ==
+            raw_mon_metrics_df.metric["name"] ==
             pre_transform_specs_df.event_type]
         filtered_metrics_df = raw_mon_metrics_df.join(
             pre_transform_specs_df, cond)
diff --git a/monasca_transform/transform/transform_utils.py b/monasca_transform/transform/transform_utils.py
index f4aafc1..10527e5 100644
--- a/monasca_transform/transform/transform_utils.py
+++ b/monasca_transform/transform/transform_utils.py
@@ -78,7 +78,7 @@ class InstanceUsageUtils(TransformUtils):
     def create_df_from_json_rdd(sql_context, jsonrdd):
         """create instance usage df from json rdd."""
         schema = InstanceUsageUtils._get_instance_usage_schema()
-        instance_usage_schema_df = sql_context.jsonRDD(jsonrdd, schema)
+        instance_usage_schema_df = sql_context.read.json(jsonrdd, schema)
         return instance_usage_schema_df
 
 
@@ -269,7 +269,7 @@ class MonMetricUtils(TransformUtils):
     def create_mon_metrics_df_from_json_rdd(sql_context, jsonrdd):
         """create mon metrics df from json rdd."""
         schema = MonMetricUtils._get_mon_metric_json_schema()
-        mon_metrics_df = sql_context.jsonRDD(jsonrdd, schema)
+        mon_metrics_df = sql_context.read.json(jsonrdd, schema)
         return mon_metrics_df
 
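The four Python hunks above swap Spark 1.6 idioms for their Spark 2.x equivalents: SQLContext.jsonRDD is gone in favour of the read.json reader, a DataFrame no longer exposes .map directly, and nested struct fields are addressed with bracket notation. The sketch below is not part of the patch; it only illustrates those substitutions with made-up data, and the names sc, sql_context and json_rdd are placeholders rather than monasca-transform code.

# Not part of the patch: a minimal PySpark 2.x sketch of the API substitutions
# made in the hunks above. All names here are illustrative placeholders.
from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext(appName="spark2_api_sketch")
sql_context = SQLContext(sc)

# An RDD of JSON strings, standing in for the spec/metric payloads.
json_rdd = sc.parallelize(
    ['{"metric": {"name": "cpu.idle_perc", "value": 97.2}}',
     '{"metric": {"name": "cpu.idle_perc", "value": 88.4}}'])

# Spark 1.6: sql_context.jsonRDD(json_rdd); Spark 2.x: the DataFrameReader
# also accepts an RDD of JSON strings.
metrics_df = sql_context.read.json(json_rdd)

# Spark 1.6 allowed df.map(...); in Spark 2.x a DataFrame is not an RDD,
# so drop down to the underlying RDD of Row objects first.
values = metrics_df.rdd.map(lambda row: row.metric["value"]).collect()

# Bracket notation reaches nested struct fields without colliding with
# Column attributes such as name().
cpu_df = metrics_df.where(metrics_df.metric["name"] == "cpu.idle_perc")

print(values, cpu_df.count())
sc.stop()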
location to the path") # TODO(someone) not sure what action is appropriate diff --git a/tests/functional/data_driven_specs/__init__.py b/tests/functional/data_driven_specs/__init__.py index 008647e..233d9ec 100644 --- a/tests/functional/data_driven_specs/__init__.py +++ b/tests/functional/data_driven_specs/__init__.py @@ -18,7 +18,7 @@ import sys try: sys.path.append(os.path.join("/opt/spark/current", "python")) sys.path.append(os.path.join("/opt/spark/current", - "python", "lib", "py4j-0.9-src.zip")) + "python", "lib", "py4j-0.10.4-src.zip")) except KeyError: print("Error adding Spark location to the path") sys.exit(1) diff --git a/tests/functional/driver/__init__.py b/tests/functional/driver/__init__.py index 008647e..233d9ec 100644 --- a/tests/functional/driver/__init__.py +++ b/tests/functional/driver/__init__.py @@ -18,7 +18,7 @@ import sys try: sys.path.append(os.path.join("/opt/spark/current", "python")) sys.path.append(os.path.join("/opt/spark/current", - "python", "lib", "py4j-0.9-src.zip")) + "python", "lib", "py4j-0.10.4-src.zip")) except KeyError: print("Error adding Spark location to the path") sys.exit(1) diff --git a/tox.ini b/tox.ini index 1855f33..c302633 100644 --- a/tox.ini +++ b/tox.ini @@ -46,6 +46,7 @@ basepython = python2.7 install_command = {[testenv]install_command} setenv = {[testenv]setenv} SPARK_HOME=/opt/spark/current + SPARK_SCALA_VERSION=2.10 OS_TEST_PATH=tests/functional whitelist_externals = {[testenv]whitelist_externals}