Merge "Add Spark integration test"

This commit is contained in:
Jenkins 2014-08-22 01:51:00 +00:00 committed by Gerrit Code Review
commit 44f37ef189
7 changed files with 258 additions and 1 deletions

View File

@ -0,0 +1,2 @@
This example includes software developed by The Apache Software
Foundation (http://www.apache.org/).

View File

@ -0,0 +1,8 @@
Example Spark Job
=================
This example contains the compiled classes for SparkPi extracted from
the example jar distributed with Apache Spark version 1.0.0.
SparkPi example estimates Pi. It can take a single optional integer
argument specifying the number of slices (tasks) to use.

Binary file not shown.

View File

@ -544,6 +544,71 @@ HDP2_CONFIG_OPTS = [
]
SPARK_CONFIG_GROUP = cfg.OptGroup(name='SPARK')
SPARK_CONFIG_OPTS = [
cfg.StrOpt('PLUGIN_NAME',
default='spark',
help='Name of plugin.'),
cfg.StrOpt('IMAGE_ID',
default=None,
help='ID for image which is used for cluster creation. Also '
'you can specify image name or tag of image instead of '
'image ID. If you do not specify image related '
'parameters, then image for cluster creation will be '
'chosen by tag "sahara_i_tests".'),
cfg.StrOpt('IMAGE_NAME',
default=None,
help='Name for image which is used for cluster creation. Also '
'you can specify image ID or tag of image instead of '
'image name. If you do not specify image related '
'parameters, then image for cluster creation will be '
'chosen by tag "sahara_i_tests".'),
cfg.StrOpt('IMAGE_TAG',
default=None,
help='Tag for image which is used for cluster creation. Also '
'you can specify image ID or image name instead of tag of '
'image. If you do not specify image related parameters, '
'then image for cluster creation will be chosen by '
'tag "sahara_i_tests".'),
cfg.ListOpt('MASTER_NODE_PROCESSES',
default=['namenode', 'master'],
help='A list of processes that will be launched '
'on master node'),
cfg.ListOpt('WORKER_NODE_PROCESSES',
default=['datanode', 'slave'],
help='A list of processes that will be launched '
'on worker nodes'),
cfg.StrOpt('HADOOP_VERSION',
default='1.0.0',
help='Version of Spark (even though it says "HADOOP".'),
cfg.StrOpt('HADOOP_USER',
default='hdfs',
help='Username which is used for access to Hadoop services.'),
cfg.DictOpt('HADOOP_PROCESSES_WITH_PORTS',
default={
'master': 7077,
'namenode': 8020,
'datanode': 50075
},
help='Spark process map with ports for spark plugin.'
),
cfg.DictOpt('PROCESS_NAMES',
default={
'nn': 'namenode',
'tt': 'tasktracker',
'dn': 'datanode'
},
help='Names for namenode, tasktracker and datanode '
'processes.'),
cfg.BoolOpt('SKIP_ALL_TESTS_FOR_PLUGIN',
default=True,
help='If this flag is True, then all tests for Spark plugin '
'will be skipped.'),
cfg.BoolOpt('SKIP_EDP_TEST', default=False),
cfg.BoolOpt('SKIP_SCALING_TEST', default=False)
]
def register_config(config, config_group, config_opts):
config.register_group(config_group)
config.register_opts(config_opts, config_group)
@ -575,6 +640,7 @@ class ITConfig:
register_config(cfg.CONF, HDP2_CONFIG_GROUP, HDP2_CONFIG_OPTS)
register_config(
cfg.CONF, VANILLA_TWO_CONFIG_GROUP, VANILLA_TWO_CONFIG_OPTS)
register_config(cfg.CONF, SPARK_CONFIG_GROUP, SPARK_CONFIG_OPTS)
cfg.CONF(
[], project='Sahara_integration_tests',
@ -587,3 +653,4 @@ class ITConfig:
self.cdh_config = cfg.CONF.CDH
self.hdp_config = cfg.CONF.HDP
self.hdp2_config = cfg.CONF.HDP2
self.spark_config = cfg.CONF.SPARK

View File

@ -30,6 +30,8 @@ class EDPJobInfo(object):
PIG_PATH = 'etc/edp-examples/pig-job/'
JAVA_PATH = 'etc/edp-examples/edp-java/'
MAPREDUCE_PATH = 'etc/edp-examples/edp-mapreduce/'
SPARK_PATH = 'etc/edp-examples/edp-spark/'
HADOOP2_JAVA_PATH = 'etc/edp-examples/hadoop2/edp-java/'
def read_pig_example_script(self):
@ -81,6 +83,18 @@ class EDPJobInfo(object):
}
}
def read_spark_example_jar(self):
return open(self.SPARK_PATH + 'spark-example.jar').read()
def spark_example_configs(self):
return {
'configs': {
'edp.java.main_class':
'org.apache.spark.examples.SparkPi'
},
'args': ['4']
}
class EDPTest(base.ITestCase):
def setUp(self):
@ -227,7 +241,9 @@ class EDPTest(base.ITestCase):
# Java jobs don't use data sources. Input/output paths must
# be passed as args with corresponding username/password configs
if not edp.compare_job_type(job_type, edp.JOB_TYPE_JAVA):
if not edp.compare_job_type(job_type,
edp.JOB_TYPE_JAVA,
edp.JOB_TYPE_SPARK):
input_id = self._create_data_source(
'input-%s' % str(uuid.uuid4())[:8], 'swift',
swift_input_url)
@ -265,6 +281,10 @@ class EDPTest(base.ITestCase):
if not configs:
configs = {}
# TODO(tmckay): for spark we don't have support for swift
# yet. When we do, we'll need something to here to set up
# swift paths and we can use a spark wordcount job
# Append the input/output paths with the swift configs
# if the caller has requested it...
if edp.compare_job_type(

View File

@ -195,6 +195,8 @@ class HDP2GatingTest(swift.SwiftTest, scaling.ScalingTest,
self._edp_test()
@testcase.attr('hdp2')
@testcase.skipIf(config.SKIP_ALL_TESTS_FOR_PLUGIN,
'All tests for HDP2 plugin were skipped')
def test_hdp2_plugin_gating(self):
self._prepare_test()
self._create_rm_nn_ng_template()

View File

@ -0,0 +1,158 @@
# Copyright 2014 OpenStack Foundation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from testtools import testcase
from sahara.tests.integration.configs import config as cfg
from sahara.tests.integration.tests import base as b
from sahara.tests.integration.tests import edp
from sahara.tests.integration.tests import scaling
from sahara.tests.integration.tests import swift
from sahara.utils import edp as utils_edp
class SparkGatingTest(swift.SwiftTest, scaling.ScalingTest,
edp.EDPTest):
config = cfg.ITConfig().spark_config
SKIP_EDP_TEST = config.SKIP_EDP_TEST
def setUp(self):
super(SparkGatingTest, self).setUp()
self.cluster_id = None
self.cluster_template_id = None
self.ng_template_ids = []
def _prepare_test(self):
self.spark_config = cfg.ITConfig().spark_config
self.floating_ip_pool = self.common_config.FLOATING_IP_POOL
self.internal_neutron_net = None
if self.common_config.NEUTRON_ENABLED:
self.internal_neutron_net = self.get_internal_neutron_net_id()
self.floating_ip_pool = (
self.get_floating_ip_pool_id_for_neutron_net())
self.spark_config.IMAGE_ID, self.spark_config.SSH_USERNAME = (
self.get_image_id_and_ssh_username(self.spark_config))
@b.errormsg("Failure while 'm-nn' node group template creation: ")
def _create_m_nn_ng_template(self):
template = {
'name': 'test-node-group-template-spark-m-nn',
'plugin_config': self.spark_config,
'description': 'test node group template for Spark plugin',
'node_processes': self.spark_config.MASTER_NODE_PROCESSES,
'floating_ip_pool': self.floating_ip_pool,
'node_configs': {}
}
self.ng_tmpl_m_nn_id = self.create_node_group_template(**template)
self.ng_template_ids.append(self.ng_tmpl_m_nn_id)
@b.errormsg("Failure while 's-dn' node group template creation: ")
def _create_s_dn_ng_template(self):
template = {
'name': 'test-node-group-template-spark-s-dn',
'plugin_config': self.spark_config,
'description': 'test node group template for Spark plugin',
'node_processes': self.spark_config.WORKER_NODE_PROCESSES,
'floating_ip_pool': self.floating_ip_pool,
'node_configs': {}
}
self.ng_tmpl_s_dn_id = self.create_node_group_template(**template)
self.ng_template_ids.append(self.ng_tmpl_s_dn_id)
@b.errormsg("Failure while cluster template creation: ")
def _create_cluster_template(self):
template = {
'name': 'test-cluster-template-spark',
'plugin_config': self.spark_config,
'description': 'test cluster template for Spark plugin',
'cluster_configs': {
},
'node_groups': [
{
'name': 'master-node',
'node_group_template_id': self.ng_tmpl_m_nn_id,
'count': 1
},
{
'name': 'worker-node',
'node_group_template_id': self.ng_tmpl_s_dn_id,
'count': 1
}
],
'net_id': self.internal_neutron_net
}
self.cluster_template_id = self.create_cluster_template(**template)
@b.errormsg("Failure while cluster creation: ")
def _create_cluster(self):
cluster_name = '%s-%s' % (self.common_config.CLUSTER_NAME,
self.spark_config.PLUGIN_NAME)
cluster = {
'name': cluster_name,
'plugin_config': self.spark_config,
'cluster_template_id': self.cluster_template_id,
'description': 'test cluster',
'cluster_configs': {}
}
self.create_cluster(**cluster)
self.cluster_info = self.get_cluster_info(self.spark_config)
self.await_active_workers_for_namenode(self.cluster_info['node_info'],
self.spark_config)
@b.errormsg("Failure while EDP testing: ")
def _check_edp(self):
self._edp_test()
def _edp_test(self):
# check spark
spark_jar = self.edp_info.read_spark_example_jar()
spark_configs = self.edp_info.spark_example_configs()
self.edp_testing(utils_edp.JOB_TYPE_SPARK,
job_data_list=[{'jar': spark_jar}],
lib_data_list=[],
configs=spark_configs)
@b.errormsg("Failure while cluster scaling: ")
def _check_scaling(self):
pass
@b.errormsg("Failure while EDP testing after cluster scaling: ")
def _check_edp_after_scaling(self):
# Leave this blank until scaling is implemented
pass
@testcase.attr('spark')
@testcase.skipIf(config.SKIP_ALL_TESTS_FOR_PLUGIN,
'All tests for Spark plugin were skipped')
def test_spark_plugin_gating(self):
self._prepare_test()
self._create_m_nn_ng_template()
self._create_s_dn_ng_template()
self._create_cluster_template()
self._create_cluster()
self._check_edp()
if not self.spark_config.SKIP_SCALING_TEST:
self._check_scaling()
self._check_edp_after_scaling()
def tearDown(self):
self.delete_objects(self.cluster_id, self.cluster_template_id,
self.ng_template_ids)
super(SparkGatingTest, self).tearDown()