Add Spark integration test
This integration test includes spinning up a cluster, running an EDP job, and deleting the cluster. Cluster scaling is not included currently. The Spark example included is SparkPi. Implements: blueprint edp-spark-integration-tests Change-Id: I9cdb3ab29d3364024f5cbe42b3cf4bae398cd547
This commit is contained in:
parent
90187b0322
commit
bab13bc26d
2
etc/edp-examples/edp-spark/NOTICE.txt
Normal file
2
etc/edp-examples/edp-spark/NOTICE.txt
Normal file
@ -0,0 +1,2 @@
|
||||
This example includes software developed by The Apache Software
|
||||
Foundation (http://www.apache.org/).
|
8
etc/edp-examples/edp-spark/README.rst
Normal file
8
etc/edp-examples/edp-spark/README.rst
Normal file
@ -0,0 +1,8 @@
|
||||
Example Spark Job
|
||||
=================
|
||||
|
||||
This example contains the compiled classes for SparkPi extracted from
|
||||
the example jar distributed with Apache Spark version 1.0.0.
|
||||
|
||||
SparkPi example estimates Pi. It can take a single optional integer
|
||||
argument specifying the number of slices (tasks) to use.
|
BIN
etc/edp-examples/edp-spark/spark-example.jar
Normal file
BIN
etc/edp-examples/edp-spark/spark-example.jar
Normal file
Binary file not shown.
@ -547,6 +547,71 @@ HDP2_CONFIG_OPTS = [
|
||||
]
|
||||
|
||||
|
||||
SPARK_CONFIG_GROUP = cfg.OptGroup(name='SPARK')
|
||||
SPARK_CONFIG_OPTS = [
|
||||
cfg.StrOpt('PLUGIN_NAME',
|
||||
default='spark',
|
||||
help='Name of plugin.'),
|
||||
cfg.StrOpt('IMAGE_ID',
|
||||
default=None,
|
||||
help='ID for image which is used for cluster creation. Also '
|
||||
'you can specify image name or tag of image instead of '
|
||||
'image ID. If you do not specify image related '
|
||||
'parameters, then image for cluster creation will be '
|
||||
'chosen by tag "sahara_i_tests".'),
|
||||
cfg.StrOpt('IMAGE_NAME',
|
||||
default=None,
|
||||
help='Name for image which is used for cluster creation. Also '
|
||||
'you can specify image ID or tag of image instead of '
|
||||
'image name. If you do not specify image related '
|
||||
'parameters, then image for cluster creation will be '
|
||||
'chosen by tag "sahara_i_tests".'),
|
||||
cfg.StrOpt('IMAGE_TAG',
|
||||
default=None,
|
||||
help='Tag for image which is used for cluster creation. Also '
|
||||
'you can specify image ID or image name instead of tag of '
|
||||
'image. If you do not specify image related parameters, '
|
||||
'then image for cluster creation will be chosen by '
|
||||
'tag "sahara_i_tests".'),
|
||||
cfg.ListOpt('MASTER_NODE_PROCESSES',
|
||||
default=['namenode', 'master'],
|
||||
help='A list of processes that will be launched '
|
||||
'on master node'),
|
||||
cfg.ListOpt('WORKER_NODE_PROCESSES',
|
||||
default=['datanode', 'slave'],
|
||||
help='A list of processes that will be launched '
|
||||
'on worker nodes'),
|
||||
cfg.StrOpt('HADOOP_VERSION',
|
||||
default='1.0.0',
|
||||
help='Version of Spark (even though it says "HADOOP".'),
|
||||
cfg.StrOpt('HADOOP_USER',
|
||||
default='hdfs',
|
||||
help='Username which is used for access to Hadoop services.'),
|
||||
cfg.DictOpt('HADOOP_PROCESSES_WITH_PORTS',
|
||||
default={
|
||||
'master': 7077,
|
||||
'namenode': 8020,
|
||||
'datanode': 50075
|
||||
},
|
||||
help='Spark process map with ports for spark plugin.'
|
||||
),
|
||||
cfg.DictOpt('PROCESS_NAMES',
|
||||
default={
|
||||
'nn': 'namenode',
|
||||
'tt': 'tasktracker',
|
||||
'dn': 'datanode'
|
||||
},
|
||||
help='Names for namenode, tasktracker and datanode '
|
||||
'processes.'),
|
||||
cfg.BoolOpt('SKIP_ALL_TESTS_FOR_PLUGIN',
|
||||
default=True,
|
||||
help='If this flag is True, then all tests for Spark plugin '
|
||||
'will be skipped.'),
|
||||
cfg.BoolOpt('SKIP_EDP_TEST', default=False),
|
||||
cfg.BoolOpt('SKIP_SCALING_TEST', default=False)
|
||||
]
|
||||
|
||||
|
||||
def register_config(config, config_group, config_opts):
|
||||
config.register_group(config_group)
|
||||
config.register_opts(config_opts, config_group)
|
||||
@ -578,6 +643,7 @@ class ITConfig:
|
||||
register_config(cfg.CONF, HDP2_CONFIG_GROUP, HDP2_CONFIG_OPTS)
|
||||
register_config(
|
||||
cfg.CONF, VANILLA_TWO_CONFIG_GROUP, VANILLA_TWO_CONFIG_OPTS)
|
||||
register_config(cfg.CONF, SPARK_CONFIG_GROUP, SPARK_CONFIG_OPTS)
|
||||
|
||||
cfg.CONF(
|
||||
[], project='Sahara_integration_tests',
|
||||
@ -590,3 +656,4 @@ class ITConfig:
|
||||
self.cdh_config = cfg.CONF.CDH
|
||||
self.hdp_config = cfg.CONF.HDP
|
||||
self.hdp2_config = cfg.CONF.HDP2
|
||||
self.spark_config = cfg.CONF.SPARK
|
||||
|
@ -30,6 +30,8 @@ class EDPJobInfo(object):
|
||||
PIG_PATH = 'etc/edp-examples/pig-job/'
|
||||
JAVA_PATH = 'etc/edp-examples/edp-java/'
|
||||
MAPREDUCE_PATH = 'etc/edp-examples/edp-mapreduce/'
|
||||
SPARK_PATH = 'etc/edp-examples/edp-spark/'
|
||||
|
||||
HADOOP2_JAVA_PATH = 'etc/edp-examples/hadoop2/edp-java/'
|
||||
|
||||
def read_pig_example_script(self):
|
||||
@ -81,6 +83,18 @@ class EDPJobInfo(object):
|
||||
}
|
||||
}
|
||||
|
||||
def read_spark_example_jar(self):
|
||||
return open(self.SPARK_PATH + 'spark-example.jar').read()
|
||||
|
||||
def spark_example_configs(self):
|
||||
return {
|
||||
'configs': {
|
||||
'edp.java.main_class':
|
||||
'org.apache.spark.examples.SparkPi'
|
||||
},
|
||||
'args': ['4']
|
||||
}
|
||||
|
||||
|
||||
class EDPTest(base.ITestCase):
|
||||
def setUp(self):
|
||||
@ -227,7 +241,9 @@ class EDPTest(base.ITestCase):
|
||||
|
||||
# Java jobs don't use data sources. Input/output paths must
|
||||
# be passed as args with corresponding username/password configs
|
||||
if not edp.compare_job_type(job_type, edp.JOB_TYPE_JAVA):
|
||||
if not edp.compare_job_type(job_type,
|
||||
edp.JOB_TYPE_JAVA,
|
||||
edp.JOB_TYPE_SPARK):
|
||||
input_id = self._create_data_source(
|
||||
'input-%s' % str(uuid.uuid4())[:8], 'swift',
|
||||
swift_input_url)
|
||||
@ -265,6 +281,10 @@ class EDPTest(base.ITestCase):
|
||||
if not configs:
|
||||
configs = {}
|
||||
|
||||
# TODO(tmckay): for spark we don't have support for swift
|
||||
# yet. When we do, we'll need something to here to set up
|
||||
# swift paths and we can use a spark wordcount job
|
||||
|
||||
# Append the input/output paths with the swift configs
|
||||
# if the caller has requested it...
|
||||
if edp.compare_job_type(
|
||||
|
@ -195,6 +195,8 @@ class HDP2GatingTest(swift.SwiftTest, scaling.ScalingTest,
|
||||
self._edp_test()
|
||||
|
||||
@testcase.attr('hdp2')
|
||||
@testcase.skipIf(config.SKIP_ALL_TESTS_FOR_PLUGIN,
|
||||
'All tests for HDP2 plugin were skipped')
|
||||
def test_hdp2_plugin_gating(self):
|
||||
self._prepare_test()
|
||||
self._create_rm_nn_ng_template()
|
||||
|
158
sahara/tests/integration/tests/gating/test_spark_gating.py
Normal file
158
sahara/tests/integration/tests/gating/test_spark_gating.py
Normal file
@ -0,0 +1,158 @@
|
||||
# Copyright 2014 OpenStack Foundation.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from testtools import testcase
|
||||
|
||||
from sahara.tests.integration.configs import config as cfg
|
||||
from sahara.tests.integration.tests import base as b
|
||||
from sahara.tests.integration.tests import edp
|
||||
from sahara.tests.integration.tests import scaling
|
||||
from sahara.tests.integration.tests import swift
|
||||
from sahara.utils import edp as utils_edp
|
||||
|
||||
|
||||
class SparkGatingTest(swift.SwiftTest, scaling.ScalingTest,
|
||||
edp.EDPTest):
|
||||
|
||||
config = cfg.ITConfig().spark_config
|
||||
SKIP_EDP_TEST = config.SKIP_EDP_TEST
|
||||
|
||||
def setUp(self):
|
||||
super(SparkGatingTest, self).setUp()
|
||||
self.cluster_id = None
|
||||
self.cluster_template_id = None
|
||||
self.ng_template_ids = []
|
||||
|
||||
def _prepare_test(self):
|
||||
self.spark_config = cfg.ITConfig().spark_config
|
||||
self.floating_ip_pool = self.common_config.FLOATING_IP_POOL
|
||||
self.internal_neutron_net = None
|
||||
if self.common_config.NEUTRON_ENABLED:
|
||||
self.internal_neutron_net = self.get_internal_neutron_net_id()
|
||||
self.floating_ip_pool = (
|
||||
self.get_floating_ip_pool_id_for_neutron_net())
|
||||
|
||||
self.spark_config.IMAGE_ID, self.spark_config.SSH_USERNAME = (
|
||||
self.get_image_id_and_ssh_username(self.spark_config))
|
||||
|
||||
@b.errormsg("Failure while 'm-nn' node group template creation: ")
|
||||
def _create_m_nn_ng_template(self):
|
||||
template = {
|
||||
'name': 'test-node-group-template-spark-m-nn',
|
||||
'plugin_config': self.spark_config,
|
||||
'description': 'test node group template for Spark plugin',
|
||||
'node_processes': self.spark_config.MASTER_NODE_PROCESSES,
|
||||
'floating_ip_pool': self.floating_ip_pool,
|
||||
'node_configs': {}
|
||||
}
|
||||
self.ng_tmpl_m_nn_id = self.create_node_group_template(**template)
|
||||
self.ng_template_ids.append(self.ng_tmpl_m_nn_id)
|
||||
|
||||
@b.errormsg("Failure while 's-dn' node group template creation: ")
|
||||
def _create_s_dn_ng_template(self):
|
||||
template = {
|
||||
'name': 'test-node-group-template-spark-s-dn',
|
||||
'plugin_config': self.spark_config,
|
||||
'description': 'test node group template for Spark plugin',
|
||||
'node_processes': self.spark_config.WORKER_NODE_PROCESSES,
|
||||
'floating_ip_pool': self.floating_ip_pool,
|
||||
'node_configs': {}
|
||||
}
|
||||
self.ng_tmpl_s_dn_id = self.create_node_group_template(**template)
|
||||
self.ng_template_ids.append(self.ng_tmpl_s_dn_id)
|
||||
|
||||
@b.errormsg("Failure while cluster template creation: ")
|
||||
def _create_cluster_template(self):
|
||||
template = {
|
||||
'name': 'test-cluster-template-spark',
|
||||
'plugin_config': self.spark_config,
|
||||
'description': 'test cluster template for Spark plugin',
|
||||
'cluster_configs': {
|
||||
},
|
||||
'node_groups': [
|
||||
{
|
||||
'name': 'master-node',
|
||||
'node_group_template_id': self.ng_tmpl_m_nn_id,
|
||||
'count': 1
|
||||
},
|
||||
{
|
||||
'name': 'worker-node',
|
||||
'node_group_template_id': self.ng_tmpl_s_dn_id,
|
||||
'count': 1
|
||||
}
|
||||
],
|
||||
'net_id': self.internal_neutron_net
|
||||
}
|
||||
self.cluster_template_id = self.create_cluster_template(**template)
|
||||
|
||||
@b.errormsg("Failure while cluster creation: ")
|
||||
def _create_cluster(self):
|
||||
cluster_name = '%s-%s' % (self.common_config.CLUSTER_NAME,
|
||||
self.spark_config.PLUGIN_NAME)
|
||||
cluster = {
|
||||
'name': cluster_name,
|
||||
'plugin_config': self.spark_config,
|
||||
'cluster_template_id': self.cluster_template_id,
|
||||
'description': 'test cluster',
|
||||
'cluster_configs': {}
|
||||
}
|
||||
self.create_cluster(**cluster)
|
||||
self.cluster_info = self.get_cluster_info(self.spark_config)
|
||||
self.await_active_workers_for_namenode(self.cluster_info['node_info'],
|
||||
self.spark_config)
|
||||
|
||||
@b.errormsg("Failure while EDP testing: ")
|
||||
def _check_edp(self):
|
||||
self._edp_test()
|
||||
|
||||
def _edp_test(self):
|
||||
# check spark
|
||||
spark_jar = self.edp_info.read_spark_example_jar()
|
||||
spark_configs = self.edp_info.spark_example_configs()
|
||||
self.edp_testing(utils_edp.JOB_TYPE_SPARK,
|
||||
job_data_list=[{'jar': spark_jar}],
|
||||
lib_data_list=[],
|
||||
configs=spark_configs)
|
||||
|
||||
@b.errormsg("Failure while cluster scaling: ")
|
||||
def _check_scaling(self):
|
||||
pass
|
||||
|
||||
@b.errormsg("Failure while EDP testing after cluster scaling: ")
|
||||
def _check_edp_after_scaling(self):
|
||||
# Leave this blank until scaling is implemented
|
||||
pass
|
||||
|
||||
@testcase.attr('spark')
|
||||
@testcase.skipIf(config.SKIP_ALL_TESTS_FOR_PLUGIN,
|
||||
'All tests for Spark plugin were skipped')
|
||||
def test_spark_plugin_gating(self):
|
||||
|
||||
self._prepare_test()
|
||||
self._create_m_nn_ng_template()
|
||||
self._create_s_dn_ng_template()
|
||||
self._create_cluster_template()
|
||||
self._create_cluster()
|
||||
|
||||
self._check_edp()
|
||||
|
||||
if not self.spark_config.SKIP_SCALING_TEST:
|
||||
self._check_scaling()
|
||||
self._check_edp_after_scaling()
|
||||
|
||||
def tearDown(self):
|
||||
self.delete_objects(self.cluster_id, self.cluster_template_id,
|
||||
self.ng_template_ids)
|
||||
super(SparkGatingTest, self).tearDown()
|
Loading…
Reference in New Issue
Block a user