Spark jobs for Cloudera 5.3.0 and 5.4.0 added

Spark jobs are now supported in the Cloudera 5.3.0 and 5.4.0 plugins.
The required unit tests have been added. Merged with the current
master HEAD.

Change-Id: Ic8fde97e424e45c6f31f7794749793b26c844915
Implements: blueprint spark-jobs-for-cdh-5-3-0
Alexander Aleksiyants 2015-07-10 14:13:14 +03:00
parent a1c95bca60
commit 74159dfdd2
14 changed files with 1070 additions and 703 deletions
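For context, a minimal sketch (illustration only) of the spark-submit command shape that the new CDH EdpSparkEngine parameters produce; the parameter values mirror the plugin_params set in the engines below, while the job class, jar and argument names are placeholder assumptions.

# Illustration only: mirrors the non-wrapper command template in
# SparkJobEngine.run_job() with the CDH plugin_params from this change.
# The class, jar and argument names are placeholder assumptions.
plugin_params = {
    "spark-user": "sudo -u spark ",   # CDH submits as the 'spark' user
    "spark-submit": "spark-submit",   # plain spark-submit from $PATH
    "deploy-mode": "cluster",
    "master": "yarn-cluster",
}

cmd = (
    '%(spark-user)s%(spark-submit)s'
    ' --class org.example.MyJob --jars lib1.jar,lib2.jar'
    ' --master %(master)s'
    ' --deploy-mode %(deploy-mode)s'
    ' app.jar input_arg output_arg') % plugin_params

print(cmd)
# sudo -u spark spark-submit --class org.example.MyJob --jars lib1.jar,lib2.jar
#   --master yarn-cluster --deploy-mode cluster app.jar input_arg output_arg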

View File

@@ -142,7 +142,6 @@ SWIFT_LIB_URL = p.Config(
description=("Library that adds Swift support to CDH. The file will be "
"downloaded from VM."))
EXTJS_LIB_URL = p.Config(
"ExtJS library URL", 'general', 'cluster', priority=1,
default_value=DEFAULT_EXTJS_LIB_URL,
@@ -160,12 +159,23 @@ AWAIT_MANAGER_STARTING_TIMEOUT = p.Config(
config_type='int', priority=1, default_value=300, is_optional=True,
description='Timeout for Cloudera Manager starting, in seconds')
_default_executor_classpath = ":".join(
['/usr/lib/hadoop/lib/jackson-core-asl-1.8.8.jar',
'/usr/lib/hadoop/hadoop-swift.jar'])
EXECUTOR_EXTRA_CLASSPATH = p.Config(
'Executor extra classpath', 'Spark', 'cluster', priority=2,
default_value=_default_executor_classpath,
description='Value for spark.executor.extraClassPath in '
'spark-defaults.conf (default: %s)'
% _default_executor_classpath)
def _get_cluster_plugin_configs():
return [CDH5_REPO_URL, CDH5_REPO_KEY_URL, CM5_REPO_URL, CM5_REPO_KEY_URL,
ENABLE_SWIFT, ENABLE_HBASE_COMMON_LIB, SWIFT_LIB_URL,
EXTJS_LIB_URL, AWAIT_AGENTS_TIMEOUT,
AWAIT_MANAGER_STARTING_TIMEOUT]
AWAIT_MANAGER_STARTING_TIMEOUT, EXECUTOR_EXTRA_CLASSPATH]
# ng wide configs
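As a rough illustration (not part of the patch), the default value assembled above is the classpath that, per the config description, ends up as spark.executor.extraClassPath in spark-defaults.conf:

# Sketch: reproduces the default_value computed above so the resulting
# spark-defaults.conf entry is visible at a glance. Paths mirror the diff.
_default_executor_classpath = ":".join(
    ['/usr/lib/hadoop/lib/jackson-core-asl-1.8.8.jar',
     '/usr/lib/hadoop/hadoop-swift.jar'])
print("spark.executor.extraClassPath " + _default_executor_classpath)
# spark.executor.extraClassPath /usr/lib/hadoop/lib/jackson-core-asl-1.8.8.jar:/usr/lib/hadoop/hadoop-swift.jar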

View File

@@ -13,12 +13,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from sahara import exceptions as ex
from sahara.plugins.cdh import confighints_helper as ch_helper
from sahara.plugins.cdh.v5_3_0 import cloudera_utils as cu
from sahara.plugins import exceptions as ex
from sahara.plugins import exceptions as pl_ex
from sahara.plugins import utils as u
from sahara.service.edp import hdfs_helper
from sahara.service.edp.oozie import engine as edp_engine
from sahara.service.edp.spark import engine as edp_spark_engine
from sahara.utils import edp
CU = cu.ClouderaUtilsV530()
@@ -50,7 +52,7 @@ class EdpOozieEngine(edp_engine.OozieJobEngine):
def validate_job_execution(self, cluster, job, data):
oo_count = u.get_instances_count(cluster, 'OOZIE_SERVER')
if oo_count != 1:
raise ex.InvalidComponentCountException(
raise pl_ex.InvalidComponentCountException(
'OOZIE_SERVER', '1', oo_count)
super(EdpOozieEngine, self).validate_job_execution(cluster, job, data)
@@ -69,3 +71,41 @@ class EdpOozieEngine(edp_engine.OozieJobEngine):
return {'job_config': ch_helper.get_possible_pig_config_from(
'plugins/cdh/v5_3_0/resources/mapred-site.xml')}
return edp_engine.OozieJobEngine.get_possible_job_config(job_type)
class EdpSparkEngine(edp_spark_engine.SparkJobEngine):
edp_base_version = "5.3.0"
def __init__(self, cluster):
super(EdpSparkEngine, self).__init__(cluster)
self.master = u.get_instance(cluster, "CLOUDERA_MANAGER")
self.plugin_params["spark-user"] = "sudo -u spark "
self.plugin_params["spark-submit"] = "spark-submit"
self.plugin_params["deploy-mode"] = "cluster"
self.plugin_params["master"] = "yarn-cluster"
driver_cp = u.get_config_value_or_default(
"Spark", "Executor extra classpath", self.cluster)
if driver_cp:
driver_cp = " --driver-class-path " + driver_cp
self.plugin_params["driver-class-path"] = driver_cp
@staticmethod
def edp_supported(version):
return version >= EdpSparkEngine.edp_base_version
def validate_job_execution(self, cluster, job, data):
if not self.edp_supported(cluster.hadoop_version):
raise ex.InvalidDataException(
_('Cloudera {base} or higher required to run {type} '
'jobs').format(
base=EdpSparkEngine.edp_base_version, type=job.type))
shs_count = u.get_instances_count(
cluster, 'SPARK_YARN_HISTORY_SERVER')
if shs_count != 1:
raise pl_ex.InvalidComponentCountException(
'SPARK_YARN_HISTORY_SERVER', '1', shs_count)
super(EdpSparkEngine, self).validate_job_execution(
cluster, job, data)

View File

@@ -1,4 +1,5 @@
# Copyright (c) 2014 Mirantis Inc.
# Copyright (c) 2015 ISPRAS
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -108,10 +109,13 @@ class VersionHandler(avm.AbstractVersionHandler):
def get_edp_engine(self, cluster, job_type):
if job_type in edp_engine.EdpOozieEngine.get_supported_job_types():
return edp_engine.EdpOozieEngine(cluster)
if job_type in edp_engine.EdpSparkEngine.get_supported_job_types():
return edp_engine.EdpSparkEngine(cluster)
return None
def get_edp_job_types(self):
return edp_engine.EdpOozieEngine.get_supported_job_types()
return (edp_engine.EdpOozieEngine.get_supported_job_types() +
edp_engine.EdpSparkEngine.get_supported_job_types())
def get_edp_config_hints(self, job_type):
return edp_engine.EdpOozieEngine.get_possible_job_config(job_type)

View File

@@ -181,13 +181,24 @@ AWAIT_MANAGER_STARTING_TIMEOUT = p.Config(
config_type='int', priority=1, default_value=300, is_optional=True,
description='Timeout for Cloudera Manager starting, in seconds')
_default_executor_classpath = ":".join(
['/usr/lib/hadoop/lib/jackson-core-asl-1.8.8.jar',
'/usr/lib/hadoop/hadoop-swift.jar'])
EXECUTOR_EXTRA_CLASSPATH = p.Config(
'Executor extra classpath', 'Spark', 'cluster', priority=2,
default_value=_default_executor_classpath,
description='Value for spark.executor.extraClassPath in '
'spark-defaults.conf (default: %s)'
% _default_executor_classpath)
def _get_cluster_plugin_configs():
return [CDH5_REPO_URL, CDH5_REPO_KEY_URL, CM5_REPO_URL, CM5_REPO_KEY_URL,
KMS_REPO_URL, KMS_REPO_KEY_URL,
ENABLE_SWIFT, ENABLE_HBASE_COMMON_LIB, SWIFT_LIB_URL,
EXTJS_LIB_URL, AWAIT_AGENTS_TIMEOUT,
AWAIT_MANAGER_STARTING_TIMEOUT]
AWAIT_MANAGER_STARTING_TIMEOUT, EXECUTOR_EXTRA_CLASSPATH]
# ng wide configs

View File

@@ -13,12 +13,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from sahara import exceptions as ex
from sahara.plugins.cdh import confighints_helper as ch_helper
from sahara.plugins.cdh.v5_4_0 import cloudera_utils as cu
from sahara.plugins import exceptions as ex
from sahara.plugins import exceptions as pl_ex
from sahara.plugins import utils as u
from sahara.service.edp import hdfs_helper
from sahara.service.edp.oozie import engine as edp_engine
from sahara.service.edp.spark import engine as edp_spark_engine
from sahara.utils import edp
CU = cu.ClouderaUtilsV540()
@@ -50,7 +52,7 @@ class EdpOozieEngine(edp_engine.OozieJobEngine):
def validate_job_execution(self, cluster, job, data):
oo_count = u.get_instances_count(cluster, 'OOZIE_SERVER')
if oo_count != 1:
raise ex.InvalidComponentCountException(
raise pl_ex.InvalidComponentCountException(
'OOZIE_SERVER', '1', oo_count)
super(EdpOozieEngine, self).validate_job_execution(cluster, job, data)
@@ -69,3 +71,41 @@ class EdpOozieEngine(edp_engine.OozieJobEngine):
return {'job_config': ch_helper.get_possible_pig_config_from(
'plugins/cdh/v5_4_0/resources/mapred-site.xml')}
return edp_engine.OozieJobEngine.get_possible_job_config(job_type)
class EdpSparkEngine(edp_spark_engine.SparkJobEngine):
edp_base_version = "5.4.0"
def __init__(self, cluster):
super(EdpSparkEngine, self).__init__(cluster)
self.master = u.get_instance(cluster, "CLOUDERA_MANAGER")
self.plugin_params["spark-user"] = "sudo -u spark "
self.plugin_params["spark-submit"] = "spark-submit"
self.plugin_params["deploy-mode"] = "cluster"
self.plugin_params["master"] = "yarn-cluster"
driver_cp = u.get_config_value_or_default(
"Spark", "Executor extra classpath", self.cluster)
if driver_cp:
driver_cp = " --driver-class-path " + driver_cp
self.plugin_params["driver-class-path"] = driver_cp
@staticmethod
def edp_supported(version):
return version >= EdpSparkEngine.edp_base_version
def validate_job_execution(self, cluster, job, data):
if not self.edp_supported(cluster.hadoop_version):
raise ex.InvalidDataException(
_('Cloudera {base} or higher required to run {type} '
'jobs').format(
base=EdpSparkEngine.edp_base_version, type=job.type))
shs_count = u.get_instances_count(
cluster, 'SPARK_YARN_HISTORY_SERVER')
if shs_count != 1:
raise pl_ex.InvalidComponentCountException(
'SPARK_YARN_HISTORY_SERVER', '1', shs_count)
super(EdpSparkEngine, self).validate_job_execution(
cluster, job, data)

View File

@@ -1,4 +1,5 @@
# Copyright (c) 2015 Intel Corporation
# Copyright (c) 2015 ISPRAS
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -109,10 +110,13 @@ class VersionHandler(avm.AbstractVersionHandler):
def get_edp_engine(self, cluster, job_type):
if job_type in edp_engine.EdpOozieEngine.get_supported_job_types():
return edp_engine.EdpOozieEngine(cluster)
if job_type in edp_engine.EdpSparkEngine.get_supported_job_types():
return edp_engine.EdpSparkEngine(cluster)
return None
def get_edp_job_types(self):
return edp_engine.EdpOozieEngine.get_supported_job_types()
return (edp_engine.EdpOozieEngine.get_supported_job_types() +
edp_engine.EdpSparkEngine.get_supported_job_types())
def get_edp_config_hints(self, job_type):
return edp_engine.EdpOozieEngine.get_possible_job_config(job_type)

View File

@@ -1,4 +1,5 @@
# Copyright (c) 2014 Mirantis Inc.
# Copyright (c) 2015 ISPRAS
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -13,8 +14,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import six
from sahara import exceptions as ex
from sahara.i18n import _
from sahara.plugins import utils as plugin_utils
from sahara.service.edp.spark import engine as edp_engine
@@ -22,6 +28,25 @@ class EdpEngine(edp_engine.SparkJobEngine):
edp_base_version = "1.0.0"
def __init__(self, cluster):
super(EdpEngine, self).__init__(cluster)
self.master = plugin_utils.get_instance(cluster, "master")
self.plugin_params["spark-user"] = ""
self.plugin_params["spark-submit"] = os.path.join(
plugin_utils.
get_config_value_or_default("Spark", "Spark home", self.cluster),
"bin/spark-submit")
self.plugin_params["deploy-mode"] = "client"
port_str = six.text_type(
plugin_utils.get_config_value_or_default(
"Spark", "Master port", self.cluster))
self.plugin_params["master"] = ('spark://%(host)s:' + port_str)
driver_cp = plugin_utils.get_config_value_or_default(
"Spark", "Executor extra classpath", self.cluster)
if driver_cp:
driver_cp = " --driver-class-path " + driver_cp
self.plugin_params["driver-class-path"] = driver_cp
@staticmethod
def edp_supported(version):
return version >= EdpEngine.edp_base_version

View File

@@ -1,4 +1,5 @@
# Copyright (c) 2014 OpenStack Foundation
# Copyright (c) 2015 ISPRAS
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -23,7 +24,6 @@ from sahara import conductor as c
from sahara import context
from sahara import exceptions as e
from sahara.i18n import _
from sahara.plugins import utils as plugin_utils
from sahara.service.edp import base_engine
from sahara.service.edp.binary_retrievers import dispatch
from sahara.service.edp import hdfs_helper as h
@@ -44,6 +44,14 @@ CONF = cfg.CONF
class SparkJobEngine(base_engine.JobEngine):
def __init__(self, cluster):
self.cluster = cluster
# We'll always run the driver program on the master
self.master = None
# These parameters depend on the engine that is used
self.plugin_params = {"master": "",
"spark-user": "",
"deploy-mode": "",
"spark-submit": ""
}
def _get_pid_and_inst_id(self, job_id):
try:
@@ -182,7 +190,7 @@ class SparkJobEngine(base_engine.JobEngine):
def run_job(self, job_execution):
ctx = context.ctx()
job = conductor.job_get(ctx, job_execution.job_id)
indep_params = {}
data_source_urls = {}
additional_sources, updated_job_configs = (
job_utils.resolve_data_source_references(
@@ -197,16 +205,16 @@
h.configure_cluster_for_hdfs(self.cluster, data_source)
break
# We'll always run the driver program on the master
master = plugin_utils.get_instance(self.cluster, "master")
# It is needed in case we are working with the Spark plugin
self.plugin_params['master'] = (
self.plugin_params['master'] % {'host': self.master.hostname()})
# TODO(tmckay): wf_dir should probably be configurable.
# The only requirement is that the dir is writable by the image user
wf_dir = job_utils.create_workflow_dir(master, '/tmp/spark-edp', job,
job_execution.id, "700")
wf_dir = job_utils.create_workflow_dir(self.master, '/tmp/spark-edp',
job, job_execution.id, "700")
paths, builtin_paths = self._upload_job_files(
master, wf_dir, job, updated_job_configs)
self.master, wf_dir, job, updated_job_configs)
# We can shorten the paths in this case since we'll run out of wf_dir
paths = [os.path.basename(p) for p in paths]
@@ -216,81 +224,63 @@
# jar and we generate paths in order (mains, then libs).
# When we have a Spark job type, we can require a "main" and set
# the app jar explicitly to be "main"
app_jar = paths.pop(0)
job_class = updated_job_configs["configs"]["edp.java.main_class"]
indep_params["app_jar"] = paths.pop(0)
indep_params["job_class"] = (
updated_job_configs["configs"]["edp.java.main_class"])
# If we uploaded builtins then we are using a wrapper jar. It will
# be the first one on the builtin list and the original app_jar needs
# to be added to the 'additional' jars
if builtin_paths:
wrapper_jar = builtin_paths.pop(0)
wrapper_class = 'org.openstack.sahara.edp.SparkWrapper'
wrapper_xml = self._upload_wrapper_xml(master,
indep_params["wrapper_jar"] = builtin_paths.pop(0)
indep_params["wrapper_class"] = (
'org.openstack.sahara.edp.SparkWrapper')
wrapper_xml = self._upload_wrapper_xml(self.master,
wf_dir,
updated_job_configs)
wrapper_args = "%s %s" % (wrapper_xml, job_class)
indep_params["wrapper_args"] = "%s %s" % (
wrapper_xml, indep_params["job_class"])
additional_jars = ",".join([app_jar] + paths + builtin_paths)
indep_params["addnl_jars"] = ",".join(
[indep_params["app_jar"]] + paths + builtin_paths)
else:
wrapper_jar = wrapper_class = wrapper_args = ""
additional_jars = ",".join(paths)
indep_params["addnl_jars"] = ",".join(paths)
# All additional jars are passed with the --jars option
if additional_jars:
additional_jars = " --jars " + additional_jars
if indep_params["addnl_jars"]:
indep_params["addnl_jars"] = (
" --jars " + indep_params["addnl_jars"])
# Launch the spark job using spark-submit and deploy_mode = client
host = master.hostname()
port = plugin_utils.get_config_value_or_default("Spark",
"Master port",
self.cluster)
spark_submit = os.path.join(
plugin_utils.get_config_value_or_default("Spark",
"Spark home",
self.cluster),
"bin/spark-submit")
# TODO(tmckay): we need to clean up wf_dirs on long running clusters
# TODO(tmckay): probably allow for general options to spark-submit
args = updated_job_configs.get('args', [])
args = " ".join([su.inject_swift_url_suffix(arg) for arg in args])
if args:
args = " " + args
indep_params["args"] = updated_job_configs.get('args', [])
indep_params["args"] = " ".join([su.inject_swift_url_suffix(arg)
for arg in indep_params["args"]])
if indep_params.get("args"):
indep_params["args"] = (" " + indep_params["args"])
if wrapper_jar and wrapper_class:
mutual_dict = self.plugin_params.copy()
mutual_dict.update(indep_params)
if mutual_dict.get("wrapper_jar"):
# Substrings which may be empty have spaces
# embedded if they are non-empty
cmd = (
'%(spark_submit)s%(driver_cp)s'
'%(spark-user)s%(spark-submit)s%(driver-class-path)s'
' --class %(wrapper_class)s%(addnl_jars)s'
' --master spark://%(host)s:%(port)s'
' %(wrapper_jar)s %(wrapper_args)s%(args)s') % (
{
"spark_submit": spark_submit,
"driver_cp": self.get_driver_classpath(),
"wrapper_class": wrapper_class,
"addnl_jars": additional_jars,
"host": host,
"port": port,
"wrapper_jar": wrapper_jar,
"wrapper_args": wrapper_args,
"args": args
})
' --master %(master)s'
' --deploy-mode %(deploy-mode)s'
' %(wrapper_jar)s %(wrapper_args)s%(args)s') % dict(
mutual_dict)
else:
cmd = (
'%(spark_submit)s --class %(job_class)s%(addnl_jars)s'
' --master spark://%(host)s:%(port)s %(app_jar)s%(args)s') % (
{
"spark_submit": spark_submit,
"job_class": job_class,
"addnl_jars": additional_jars,
"host": host,
"port": port,
"app_jar": app_jar,
"args": args
})
'%(spark-user)s%(spark-submit)s'
' --class %(job_class)s%(addnl_jars)s'
' --master %(master)s'
' --deploy-mode %(deploy-mode)s'
' %(app_jar)s%(args)s') % dict(
mutual_dict)
job_execution = conductor.job_execution_get(ctx, job_execution.id)
if job_execution.info['status'] == edp.JOB_STATUS_TOBEKILLED:
return (None, edp.JOB_STATUS_KILLED, None)
@@ -298,10 +288,11 @@ class SparkJobEngine(base_engine.JobEngine):
# If an exception is raised here, the job_manager will mark
# the job failed and log the exception
# The redirects of stdout and stderr will preserve output in the wf_dir
with remote.get_remote(master) as r:
with remote.get_remote(self.master) as r:
# Upload the command launch script
launch = os.path.join(wf_dir, "launch_command")
r.write_file_to(launch, self._job_script())
r.execute_command("chmod u+rwx,g+rx,o+rx %s" % wf_dir)
r.execute_command("chmod +x %s" % launch)
ret, stdout = r.execute_command(
"cd %s; ./launch_command %s > /dev/null 2>&1 & echo $!"
@@ -311,7 +302,7 @@
# Success, we'll add the wf_dir in job_execution.extra and store
# pid@instance_id as the job id
# We know the job is running so return "RUNNING"
return (stdout.strip() + "@" + master.id,
return (stdout.strip() + "@" + self.master.id,
edp.JOB_STATUS_RUNNING,
{'spark-path': wf_dir})
@@ -331,10 +322,3 @@
@staticmethod
def get_supported_job_types():
return [edp.JOB_TYPE_SPARK]
def get_driver_classpath(self):
cp = plugin_utils.get_config_value_or_default(
"Spark", "Executor extra classpath", self.cluster)
if cp:
cp = " --driver-class-path " + cp
return cp
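For comparison, a small worked sketch (host, port, class and jar names are assumptions) of how the same command template now resolves for the vanilla Spark plugin, whose EdpEngine fills plugin_params with a spark:// master and client deploy mode:

# Sketch of the mutual_dict substitution for the vanilla Spark plugin.
# Values mirror the plugin's EdpEngine.__init__ in this change; the host,
# port, job class and jar names are illustrative assumptions.
plugin_params = {
    "spark-user": "",
    "spark-submit": "/opt/spark/bin/spark-submit",
    "deploy-mode": "client",
    "master": "spark://%(host)s:7077",
}
# run_job() substitutes the master hostname before building the command
plugin_params["master"] = plugin_params["master"] % {"host": "master-001"}

indep_params = {
    "job_class": "org.example.MyJob",
    "addnl_jars": "",
    "app_jar": "app.jar",
    "args": " input_arg output_arg",
}

mutual_dict = plugin_params.copy()
mutual_dict.update(indep_params)
cmd = (
    '%(spark-user)s%(spark-submit)s'
    ' --class %(job_class)s%(addnl_jars)s'
    ' --master %(master)s'
    ' --deploy-mode %(deploy-mode)s'
    ' %(app_jar)s%(args)s') % mutual_dict

print(cmd)
# /opt/spark/bin/spark-submit --class org.example.MyJob
#   --master spark://master-001:7077 --deploy-mode client app.jar input_arg output_arg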

View File

@@ -0,0 +1,69 @@
# Copyright (c) 2015 ISPRAS
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
import testtools
from sahara import conductor as cond
from sahara import context
from sahara.plugins import base as pb
from sahara.plugins import exceptions as ex
from sahara.service.edp.spark import engine
from sahara.tests.unit import base
from sahara.utils import edp
conductor = cond.API
class SparkPluginTest(base.SaharaWithDbTestCase):
def setUp(self):
super(SparkPluginTest, self).setUp()
self.override_config("plugins", ["cdh"])
pb.setup_plugins()
def test_plugin_edp_engine_no_spark(self):
cluster_dict = {
'name': 'cluster',
'plugin_name': 'cdh',
'hadoop_version': '5.3.0',
'default_image_id': 'image'}
job = mock.Mock()
job.type = edp.JOB_TYPE_SPARK
cluster = conductor.cluster_create(context.ctx(), cluster_dict)
plugin = pb.PLUGINS.get_plugin(cluster.plugin_name)
edp_engine = plugin.get_edp_engine(cluster, edp.JOB_TYPE_SPARK)
with testtools.ExpectedException(
ex.InvalidComponentCountException,
value_re="Hadoop cluster should contain 1 "
"SPARK_YARN_HISTORY_SERVER component\(s\). Actual "
"SPARK_YARN_HISTORY_SERVER count is 0\nError ID: .*"):
edp_engine.validate_job_execution(cluster, job, mock.Mock())
def test_plugin_edp_engine(self):
cluster_dict = {
'name': 'cluster',
'plugin_name': 'cdh',
'hadoop_version': '5.3.0',
'default_image_id': 'image'}
cluster = conductor.cluster_create(context.ctx(), cluster_dict)
plugin = pb.PLUGINS.get_plugin(cluster.plugin_name)
self.assertIsInstance(
plugin.get_edp_engine(cluster, edp.JOB_TYPE_SPARK),
engine.SparkJobEngine)

View File

@@ -0,0 +1,69 @@
# Copyright (c) 2015 ISPRAS
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
import testtools
from sahara import conductor as cond
from sahara import context
from sahara.plugins import base as pb
from sahara.plugins import exceptions as ex
from sahara.service.edp.spark import engine
from sahara.tests.unit import base
from sahara.utils import edp
conductor = cond.API
class SparkPluginTest(base.SaharaWithDbTestCase):
def setUp(self):
super(SparkPluginTest, self).setUp()
self.override_config("plugins", ["cdh"])
pb.setup_plugins()
def test_plugin_edp_engine_no_spark(self):
cluster_dict = {
'name': 'cluster',
'plugin_name': 'cdh',
'hadoop_version': '5.4.0',
'default_image_id': 'image'}
job = mock.Mock()
job.type = edp.JOB_TYPE_SPARK
cluster = conductor.cluster_create(context.ctx(), cluster_dict)
plugin = pb.PLUGINS.get_plugin(cluster.plugin_name)
edp_engine = plugin.get_edp_engine(cluster, edp.JOB_TYPE_SPARK)
with testtools.ExpectedException(
ex.InvalidComponentCountException,
value_re="Hadoop cluster should contain 1 "
"SPARK_YARN_HISTORY_SERVER component\(s\). Actual "
"SPARK_YARN_HISTORY_SERVER count is 0\nError ID: .*"):
edp_engine.validate_job_execution(cluster, job, mock.Mock())
def test_plugin_edp_engine(self):
cluster_dict = {
'name': 'cluster',
'plugin_name': 'cdh',
'hadoop_version': '5.4.0',
'default_image_id': 'image'}
cluster = conductor.cluster_create(context.ctx(), cluster_dict)
plugin = pb.PLUGINS.get_plugin(cluster.plugin_name)
self.assertIsInstance(
plugin.get_edp_engine(cluster, edp.JOB_TYPE_SPARK),
engine.SparkJobEngine)

View File

@@ -0,0 +1,665 @@
# Copyright (c) 2014 OpenStack Foundation
# Copyright (c) 2015 ISPRAS
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import mock
import sahara.exceptions as ex
from sahara.service.edp.spark import engine as se
from sahara.tests.unit import base
from sahara.utils import edp
class TestSpark(base.SaharaTestCase):
def setUp(self):
super(TestSpark, self).setUp()
# These variables are initialized in subclasses because their values
# depend on the plugin
self.master_host = None
self.engine_class = None
self.spark_user = None
self.spark_submit = None
self.master = None
self.deploy_mode = None
self.master_port = 7077
self.master_inst = "6789"
self.spark_pid = "12345"
self.spark_home = "/opt/spark"
self.workflow_dir = "/wfdir"
self.driver_cp = "/usr/lib/hadoop/hadoop-swift.jar"
def test_get_pid_and_inst_id(self):
'''Test parsing of job ids
Test that job ids of the form pid@instance are
split into pid and instance ids by eng._get_pid_and_inst_id()
but anything else returns empty strings
'''
eng = se.SparkJobEngine(None)
for job_id in [None, "", "@", "something", "pid@", "@instance"]:
pid, inst_id = eng._get_pid_and_inst_id(job_id)
self.assertEqual(("", ""), (pid, inst_id))
pid, inst_id = eng._get_pid_and_inst_id("pid@instance")
self.assertEqual(("pid", "instance"), (pid, inst_id))
@mock.patch('sahara.utils.general.get_instances')
def test_get_instance_if_running(self, get_instances):
'''Test retrieval of pid and instance object for running job
If the job id is valid and the job status is non-terminated,
_get_instance_if_running() should retrieve the instance
based on the inst_id and return the pid and instance.
If the job is invalid or the job is terminated, it should
return None, None.
If get_instances() throws an exception or returns an empty list,
the instance returned should be None (pid might still be set)
'''
get_instances.return_value = ["instance"]
job_exec = mock.Mock()
eng = se.SparkJobEngine("cluster")
job_exec.oozie_job_id = "invalid id"
self.assertEqual((None, None),
eng._get_instance_if_running(job_exec))
job_exec.oozie_job_id = "pid@inst_id"
for state in edp.JOB_STATUSES_TERMINATED:
job_exec.info = {'status': state}
self.assertEqual((None, None),
eng._get_instance_if_running(job_exec))
job_exec.info = {'status': edp.JOB_STATUS_RUNNING}
self.assertEqual(("pid", "instance"),
eng._get_instance_if_running(job_exec))
get_instances.assert_called_with("cluster", ["inst_id"])
# Pretend get_instances returns nothing
get_instances.return_value = []
pid, instance = eng._get_instance_if_running(job_exec)
self.assertIsNone(instance)
# Pretend get_instances throws an exception
get_instances.side_effect = Exception("some failure")
pid, instance = eng._get_instance_if_running(job_exec)
self.assertIsNone(instance)
def test_get_result_file(self):
remote = mock.Mock()
remote.execute_command.return_value = 999, "value"
job_exec = mock.Mock()
job_exec.extra = {"spark-path": "/tmp/spark-edp/Job/123"}
eng = se.SparkJobEngine("cluster")
ret, stdout = eng._get_result_file(remote, job_exec)
remote.execute_command.assert_called_with(
"cat /tmp/spark-edp/Job/123/result",
raise_when_error=False)
self.assertEqual((ret, stdout),
remote.execute_command.return_value)
def test_check_pid(self):
remote = mock.Mock()
remote.execute_command.return_value = 999, ""
eng = se.SparkJobEngine("cluster")
ret = eng._check_pid(remote, "pid")
remote.execute_command.assert_called_with("ps hp pid",
raise_when_error=False)
self.assertEqual(999, ret)
@mock.patch.object(se.SparkJobEngine,
'_get_result_file',
autospec=True)
@mock.patch.object(se.SparkJobEngine,
'_check_pid',
autospec=True)
def test_get_job_status_from_remote(self, _check_pid, _get_result_file):
'''Test retrieval of job status from remote instance
If the process is present, status is RUNNING
If the process is not present, status depends on the result file
If the result file is missing, status is DONEWITHERROR
'''
eng = se.SparkJobEngine("cluster")
job_exec = mock.Mock()
remote = mock.Mock()
# Pretend process is running
_check_pid.return_value = 0
status = eng._get_job_status_from_remote(remote, "pid", job_exec)
_check_pid.assert_called_with(eng, remote, "pid")
self.assertEqual({"status": edp.JOB_STATUS_RUNNING}, status)
# Pretend process ended and result file contains 0 (success)
_check_pid.return_value = 1
_get_result_file.return_value = 0, "0"
status = eng._get_job_status_from_remote(remote, "pid", job_exec)
self.assertEqual({"status": edp.JOB_STATUS_SUCCEEDED}, status)
# Pretend process ended and result file contains 1 (failure)
_get_result_file.return_value = 0, "1"
status = eng._get_job_status_from_remote(remote, "pid", job_exec)
self.assertEqual({"status": edp.JOB_STATUS_DONEWITHERROR}, status)
# Pretend process ended and result file contains 130 (killed)
_get_result_file.return_value = 0, "130"
status = eng._get_job_status_from_remote(remote, "pid", job_exec)
self.assertEqual({"status": edp.JOB_STATUS_KILLED}, status)
# Pretend process ended and result file contains -2 (killed)
_get_result_file.return_value = 0, "-2"
status = eng._get_job_status_from_remote(remote, "pid", job_exec)
self.assertEqual({"status": edp.JOB_STATUS_KILLED}, status)
# Pretend process ended and result file is missing
_get_result_file.return_value = 1, ""
status = eng._get_job_status_from_remote(remote, "pid", job_exec)
self.assertEqual({"status": edp.JOB_STATUS_DONEWITHERROR}, status)
@mock.patch.object(se.SparkJobEngine,
'_get_job_status_from_remote',
autospec=True)
@mock.patch.object(se.SparkJobEngine,
'_get_instance_if_running',
autospec=True)
@mock.patch('sahara.utils.remote.get_remote')
def test_get_job_status(self,
get_remote,
_get_instance_if_running,
_get_job_status_from_remote):
# This is to mock "with remote.get_remote(instance) as r"
remote_instance = mock.Mock()
get_remote.return_value.__enter__ = mock.Mock(
return_value=remote_instance)
# Pretend instance is not returned
_get_instance_if_running.return_value = "pid", None
job_exec = mock.Mock()
eng = se.SparkJobEngine("cluster")
status = eng.get_job_status(job_exec)
self.assertIsNone(status)
# Pretend we have an instance
_get_instance_if_running.return_value = "pid", "instance"
_get_job_status_from_remote.return_value = {"status":
edp.JOB_STATUS_RUNNING}
status = eng.get_job_status(job_exec)
_get_job_status_from_remote.assert_called_with(eng,
remote_instance,
"pid", job_exec)
self.assertEqual({"status": edp.JOB_STATUS_RUNNING}, status)
@mock.patch.object(se.SparkJobEngine,
'_get_instance_if_running',
autospec=True,
return_value=(None, None))
@mock.patch('sahara.utils.remote.get_remote')
def test_cancel_job_null_or_done(self,
get_remote,
_get_instance_if_running):
'''Test cancel_job() when instance is None
Test that cancel_job() returns None and does not try to
retrieve a remote instance if _get_instance_if_running() returns None
'''
eng = se.SparkJobEngine("cluster")
job_exec = mock.Mock()
self.assertIsNone(eng.cancel_job(job_exec))
self.assertTrue(_get_instance_if_running.called)
self.assertFalse(get_remote.called)
@mock.patch.object(se.SparkJobEngine,
'_get_job_status_from_remote',
autospec=True,
return_value={"status": edp.JOB_STATUS_KILLED})
@mock.patch.object(se.SparkJobEngine,
'_get_instance_if_running',
autospec=True,
return_value=("pid", "instance"))
@mock.patch('sahara.utils.remote.get_remote')
def test_cancel_job(self,
get_remote,
_get_instance_if_running,
_get_job_status_from_remote):
'''Test cancel_job() with a valid instance
For a valid instance, test that cancel_job:
* retrieves the remote instance
* executes the proper kill command
* retrieves the job status (because the remote command is successful)
'''
# This is to mock "with remote.get_remote(instance) as r" in cancel_job
# and to mock r.execute_command to return success
remote_instance = mock.Mock()
get_remote.return_value.__enter__ = mock.Mock(
return_value=remote_instance)
remote_instance.execute_command.return_value = (0, "standard out")
eng = se.SparkJobEngine("cluster")
job_exec = mock.Mock()
status = eng.cancel_job(job_exec)
# check that remote.get_remote was called with the result of
# eng._get_instance_if_running()
get_remote.assert_called_with("instance")
# check that execute_command was called with the proper arguments
# ("pid" was passed in)
remote_instance.execute_command.assert_called_with(
"kill -SIGINT pid",
raise_when_error=False)
# check that the job status was retrieved since the command succeeded
_get_job_status_from_remote.assert_called_with(eng,
remote_instance,
"pid", job_exec)
self.assertEqual({"status": edp.JOB_STATUS_KILLED}, status)
@mock.patch.object(se.SparkJobEngine,
'_get_job_status_from_remote',
autospec=True)
@mock.patch.object(se.SparkJobEngine,
'_get_instance_if_running',
autospec=True,
return_value=("pid", "instance"))
@mock.patch('sahara.utils.remote.get_remote')
def test_cancel_job_failed(self,
get_remote,
_get_instance_if_running,
_get_job_status_from_remote):
'''Test cancel_job() when remote command fails
For a valid instance and a failed kill command, test that cancel_job:
* retrieves the remote instance
* executes the proper kill command
* does not retrieve the job status (because the remote command failed)
'''
# This is to mock "with remote.get_remote(instance) as r"
# and to mock r.execute_command to return failure
remote_instance = mock.Mock()
get_remote.return_value.__enter__ = mock.Mock(
return_value=remote_instance)
remote_instance.execute_command.return_value = (-1, "some error")
eng = se.SparkJobEngine("cluster")
job_exec = mock.Mock()
status = eng.cancel_job(job_exec)
# check that remote.get_remote was called with the result of
# eng._get_instance_if_running
get_remote.assert_called_with("instance")
# check that execute_command was called with the proper arguments
# ("pid" was passed in)
remote_instance.execute_command.assert_called_with(
"kill -SIGINT pid",
raise_when_error=False)
# check that the job status was not retrieved since the command failed
self.assertEqual(0, _get_job_status_from_remote.called)
# check that we have nothing new to report ...
self.assertIsNone(status)
@mock.patch('sahara.service.edp.binary_retrievers.dispatch.get_raw_binary')
@mock.patch('sahara.utils.remote.get_remote')
def test_upload_job_files(self, get_remote, get_raw_binary):
main_names = ["main1", "main2", "main3"]
lib_names = ["lib1", "lib2", "lib3"]
def make_data_objects(*args):
objs = []
for name in args:
m = mock.Mock()
m.name = name
objs.append(m)
return objs
job = mock.Mock()
job.name = "job"
job.mains = make_data_objects(*main_names)
job.libs = make_data_objects(*lib_names)
# This is to mock "with remote.get_remote(instance) as r"
remote_instance = mock.Mock()
get_remote.return_value.__enter__ = mock.Mock(
return_value=remote_instance)
get_raw_binary.return_value = "data"
eng = se.SparkJobEngine("cluster")
paths, builtins = eng._upload_job_files("where", "/somedir", job, {})
self.assertEqual(["/somedir/" + n for n in main_names + lib_names],
paths)
for path in paths:
remote_instance.write_file_to.assert_any_call(path, "data")
def _make_master_instance(self, return_code=0):
master = mock.Mock()
master.execute_command.return_value = (return_code, self.spark_pid)
master.hostname.return_value = self.master_host
master.id = self.master_inst
return master
def _config_values(self, *key):
return {("Spark", "Master port", "cluster"): self.master_port,
("Spark", "Spark home", "cluster"): self.spark_home,
("Spark", "Executor extra classpath",
"cluster"): self.driver_cp}[key]
@mock.patch('sahara.conductor.API.job_execution_update')
@mock.patch('sahara.conductor.API.job_execution_get')
@mock.patch('sahara.utils.remote.get_remote')
@mock.patch('sahara.plugins.utils.get_config_value_or_default')
@mock.patch('sahara.service.edp.job_utils.create_workflow_dir')
@mock.patch('sahara.plugins.utils.get_instance')
@mock.patch('sahara.conductor.API.job_get')
@mock.patch('sahara.context.ctx', return_value="ctx")
def _setup_run_job(self, master_instance, job_configs, files,
ctx, job_get, get_instance, create_workflow_dir,
get_config_value, get_remote, job_exec_get,
job_exec_update):
def _upload_job_files(where, job_dir, job,
libs_subdir=True, job_configs=None):
paths = [os.path.join(self.workflow_dir, f) for f in files['jars']]
bltns = files.get('bltns', [])
bltns = [os.path.join(self.workflow_dir, f) for f in bltns]
return paths, bltns
job = mock.Mock()
job.name = "MyJob"
job_get.return_value = job
job_exec = mock.Mock()
job_exec.job_configs = job_configs
get_config_value.side_effect = self._config_values
create_workflow_dir.return_value = self.workflow_dir
# This is to mock "with remote.get_remote(master) as r" in run_job
get_remote.return_value.__enter__ = mock.Mock(
return_value=master_instance)
get_instance.return_value = master_instance
eng = self.engine_class("cluster")
eng._upload_job_files = mock.Mock()
eng._upload_job_files.side_effect = _upload_job_files
status = eng.run_job(job_exec)
# Check that we launch on the master node
get_instance.assert_called_with("cluster", self.master_host)
return status
def test_run_job_raise(self):
job_configs = {
'configs': {"edp.java.main_class": "org.me.myclass"},
'args': ['input_arg', 'output_arg']
}
files = {'jars': ["app.jar",
"jar1.jar",
"jar2.jar"]}
# The object representing the spark master node
# The spark-submit command will be run on this instance
master_instance = self._make_master_instance(return_code=1)
# If execute_command returns an error we should get a raise
self.assertRaises(ex.EDPError,
self._setup_run_job,
master_instance, job_configs, files)
def test_run_job_extra_jars_args(self):
job_configs = {
'configs': {"edp.java.main_class": "org.me.myclass"},
'args': ['input_arg', 'output_arg']
}
files = {'jars': ["app.jar",
"jar1.jar",
"jar2.jar"]}
# The object representing the spark master node
# The spark-submit command will be run on this instance
master_instance = self._make_master_instance()
status = self._setup_run_job(master_instance, job_configs, files)
# Check the command
master_instance.execute_command.assert_called_with(
'cd %(workflow_dir)s; '
'./launch_command %(spark_user)s%(spark_submit)s '
'--class org.me.myclass --jars jar1.jar,jar2.jar '
'--master %(master)s '
'--deploy-mode %(deploy_mode)s '
'app.jar input_arg output_arg '
'> /dev/null 2>&1 & echo $!' % {"workflow_dir": self.workflow_dir,
"spark_user": self.spark_user,
"spark_submit": self.spark_submit,
"master": self.master,
"deploy_mode": self.deploy_mode})
# Check result here
self.assertEqual(("%s@%s" % (self.spark_pid, self.master_inst),
edp.JOB_STATUS_RUNNING,
{"spark-path": self.workflow_dir}), status)
def test_run_job_args(self):
job_configs = {
'configs': {"edp.java.main_class": "org.me.myclass"},
'args': ['input_arg', 'output_arg']
}
files = {'jars': ["app.jar"]}
# The object representing the spark master node
# The spark-submit command will be run on this instance
master_instance = self._make_master_instance()
status = self._setup_run_job(master_instance, job_configs, files)
# Check the command
master_instance.execute_command.assert_called_with(
'cd %(workflow_dir)s; '
'./launch_command %(spark_user)s%(spark_submit)s '
'--class org.me.myclass '
'--master %(master)s '
'--deploy-mode %(deploy_mode)s '
'app.jar input_arg output_arg '
'> /dev/null 2>&1 & echo $!' % {"workflow_dir": self.workflow_dir,
"spark_user": self.spark_user,
"spark_submit": self.spark_submit,
"master": self.master,
"deploy_mode": self.deploy_mode})
# Check result here
self.assertEqual(("%s@%s" % (self.spark_pid, self.master_inst),
edp.JOB_STATUS_RUNNING,
{"spark-path": self.workflow_dir}), status)
def test_run_job(self):
job_configs = {
'configs': {"edp.java.main_class": "org.me.myclass"},
}
files = {'jars': ["app.jar"]}
# The object representing the spark master node
# The spark-submit command will be run on this instance
master_instance = self._make_master_instance()
status = self._setup_run_job(master_instance, job_configs, files)
# Check the command
master_instance.execute_command.assert_called_with(
'cd %(workflow_dir)s; '
'./launch_command %(spark_user)s%(spark_submit)s '
'--class org.me.myclass '
'--master %(master)s '
'--deploy-mode %(deploy_mode)s '
'app.jar '
'> /dev/null 2>&1 & echo $!' % {"workflow_dir": self.workflow_dir,
"spark_user": self.spark_user,
"spark_submit": self.spark_submit,
"master": self.master,
"deploy_mode": self.deploy_mode})
# Check result here
self.assertEqual(("%s@%s" % (self.spark_pid, self.master_inst),
edp.JOB_STATUS_RUNNING,
{"spark-path": self.workflow_dir}), status)
def test_run_job_wrapper_extra_jars_args(self):
job_configs = {
'configs': {"edp.java.main_class": "org.me.myclass",
"edp.spark.adapt_for_swift": True},
'args': ['input_arg', 'output_arg']
}
files = {'jars': ["app.jar",
"jar1.jar",
"jar2.jar"],
'bltns': ["wrapper.jar"]}
# The object representing the spark master node
# The spark-submit command will be run on this instance
master_instance = self._make_master_instance()
status = self._setup_run_job(master_instance, job_configs, files)
# Check the command
master_instance.execute_command.assert_called_with(
'cd %(workflow_dir)s; '
'./launch_command %(spark_user)s%(spark_submit)s '
'--driver-class-path %(driver_cp)s '
'--class org.openstack.sahara.edp.SparkWrapper '
'--jars app.jar,jar1.jar,jar2.jar '
'--master %(master)s '
'--deploy-mode %(deploy_mode)s '
'wrapper.jar spark.xml org.me.myclass input_arg output_arg '
'> /dev/null 2>&1 & echo $!' % {"workflow_dir": self.workflow_dir,
"spark_user": self.spark_user,
"spark_submit": self.spark_submit,
"driver_cp": self.driver_cp,
"master": self.master,
"deploy_mode": self.deploy_mode})
# Check result here
self.assertEqual(("%s@%s" % (self.spark_pid, self.master_inst),
edp.JOB_STATUS_RUNNING,
{"spark-path": self.workflow_dir}), status)
def test_run_job_wrapper_args(self):
job_configs = {
'configs': {"edp.java.main_class": "org.me.myclass",
"edp.spark.adapt_for_swift": True},
'args': ['input_arg', 'output_arg']
}
files = {'jars': ["app.jar"],
'bltns': ["wrapper.jar"]}
# The object representing the spark master node
# The spark-submit command will be run on this instance
master_instance = self._make_master_instance()
status = self._setup_run_job(master_instance, job_configs, files)
# Check the command
master_instance.execute_command.assert_called_with(
'cd %(workflow_dir)s; '
'./launch_command %(spark_user)s%(spark_submit)s '
'--driver-class-path %(driver_cp)s '
'--class org.openstack.sahara.edp.SparkWrapper '
'--jars app.jar '
'--master %(master)s '
'--deploy-mode %(deploy_mode)s '
'wrapper.jar spark.xml org.me.myclass input_arg output_arg '
'> /dev/null 2>&1 & echo $!' % {"workflow_dir": self.workflow_dir,
"spark_user": self.spark_user,
"spark_submit": self.spark_submit,
"driver_cp": self.driver_cp,
"master": self.master,
"deploy_mode": self.deploy_mode})
# Check result here
self.assertEqual(("%s@%s" % (self.spark_pid, self.master_inst),
edp.JOB_STATUS_RUNNING,
{"spark-path": self.workflow_dir}), status)
def test_run_job_wrapper(self):
job_configs = {
'configs': {"edp.java.main_class": "org.me.myclass",
"edp.spark.adapt_for_swift": True}
}
files = {'jars': ["app.jar"],
'bltns': ["wrapper.jar"]}
# The object representing the spark master node
# The spark-submit command will be run on this instance
master_instance = self._make_master_instance()
status = self._setup_run_job(master_instance, job_configs, files)
# Check the command
master_instance.execute_command.assert_called_with(
'cd %(workflow_dir)s; '
'./launch_command %(spark_user)s%(spark_submit)s '
'--driver-class-path %(driver_cp)s '
'--class org.openstack.sahara.edp.SparkWrapper '
'--jars app.jar '
'--master %(master)s '
'--deploy-mode %(deploy_mode)s '
'wrapper.jar spark.xml org.me.myclass '
'> /dev/null 2>&1 & echo $!' % {"workflow_dir": self.workflow_dir,
"spark_user": self.spark_user,
"spark_submit": self.spark_submit,
"driver_cp": self.driver_cp,
"master": self.master,
"deploy_mode": self.deploy_mode})
# Check result here
self.assertEqual(("%s@%s" % (self.spark_pid, self.master_inst),
edp.JOB_STATUS_RUNNING,
{"spark-path": self.workflow_dir}), status)
@mock.patch('sahara.service.edp.hdfs_helper.configure_cluster_for_hdfs')
@mock.patch('sahara.service.edp.job_utils.resolve_data_source_references')
def test_external_hdfs_config(self, resolver, configurer):
job_configs = {
'configs': {"edp.java.main_class": "org.me.myclass"},
}
files = {'jars': ["app.jar"]}
data_source = mock.Mock()
data_source.type = 'hdfs'
resolver.return_value = ([data_source], job_configs)
master_instance = self._make_master_instance()
self._setup_run_job(master_instance, job_configs, files)
configurer.assert_called_with("cluster", data_source)
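The parameterized base class above is meant to be subclassed per plugin. A hedged sketch of what a CDH 5.3.0 subclass could look like, with attribute values taken from the CDH EdpSparkEngine parameters in this change (the module path and class name are assumptions for illustration):

# Hypothetical subclass sketch: fills in the attributes left as None in
# TestSpark.setUp(). Values mirror the CDH 5.3.0 EdpSparkEngine; the test
# module layout is an assumption, not part of this change.
from sahara.plugins.cdh.v5_3_0 import edp_engine as cdh_edp
from sahara.tests.unit.service.edp.spark import base as tests


class TestCDH530Spark(tests.TestSpark):
    def setUp(self):
        super(TestCDH530Spark, self).setUp()
        self.master_host = "CLOUDERA_MANAGER"
        self.engine_class = cdh_edp.EdpSparkEngine
        self.spark_user = "sudo -u spark "
        self.spark_submit = "spark-submit"
        self.master = "yarn-cluster"
        self.deploy_mode = "cluster"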

View File

@@ -1,4 +1,5 @@
# Copyright (c) 2014 OpenStack Foundation
# Copyright (c) 2015 ISPRAS
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -13,632 +14,21 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import mock
import sahara.exceptions as ex
from sahara.service.edp.spark import engine as se
from sahara.tests.unit import base
from sahara.utils import edp
from sahara.plugins.spark import edp_engine as spark_edp
from sahara.tests.unit.service.edp.spark import base as tests
class TestSpark(base.SaharaTestCase):
class TestSparkPlugin(tests.TestSpark):
def setUp(self):
super(TestSpark, self).setUp()
super(TestSparkPlugin, self).setUp()
self.master_host = "master"
self.master_port = 7077
self.master_inst = "6789"
self.spark_pid = "12345"
self.spark_home = "/opt/spark"
self.workflow_dir = "/wfdir"
self.driver_cp = "/usr/lib/hadoop/hadoop-swift.jar"
def test_get_pid_and_inst_id(self):
'''Test parsing of job ids
Test that job ids of the form pid@instance are
split into pid and instance ids by eng._get_pid_and_inst_id()
but anything else returns empty strings
'''
eng = se.SparkJobEngine(None)
for job_id in [None, "", "@", "something", "pid@", "@instance"]:
pid, inst_id = eng._get_pid_and_inst_id(job_id)
self.assertEqual(("", ""), (pid, inst_id))
pid, inst_id = eng._get_pid_and_inst_id("pid@instance")
self.assertEqual(("pid", "instance"), (pid, inst_id))
@mock.patch('sahara.utils.general.get_instances')
def test_get_instance_if_running(self, get_instances):
'''Test retrieval of pid and instance object for running job
If the job id is valid and the job status is non-terminated,
_get_instance_if_running() should retrieve the instance
based on the inst_id and return the pid and instance.
If the job is invalid or the job is terminated, it should
return None, None.
If get_instances() throws an exception or returns an empty list,
the instance returned should be None (pid might still be set)
'''
get_instances.return_value = ["instance"]
job_exec = mock.Mock()
eng = se.SparkJobEngine("cluster")
job_exec.oozie_job_id = "invalid id"
self.assertEqual((None, None),
eng._get_instance_if_running(job_exec))
job_exec.oozie_job_id = "pid@inst_id"
for state in edp.JOB_STATUSES_TERMINATED:
job_exec.info = {'status': state}
self.assertEqual((None, None),
eng._get_instance_if_running(job_exec))
job_exec.info = {'status': edp.JOB_STATUS_RUNNING}
self.assertEqual(("pid", "instance"),
eng._get_instance_if_running(job_exec))
get_instances.assert_called_with("cluster", ["inst_id"])
# Pretend get_instances returns nothing
get_instances.return_value = []
pid, instance = eng._get_instance_if_running(job_exec)
self.assertIsNone(instance)
# Pretend get_instances throws an exception
get_instances.side_effect = Exception("some failure")
pid, instance = eng._get_instance_if_running(job_exec)
self.assertIsNone(instance)
def test_get_result_file(self):
remote = mock.Mock()
remote.execute_command.return_value = 999, "value"
job_exec = mock.Mock()
job_exec.extra = {"spark-path": "/tmp/spark-edp/Job/123"}
eng = se.SparkJobEngine("cluster")
ret, stdout = eng._get_result_file(remote, job_exec)
remote.execute_command.assert_called_with(
"cat /tmp/spark-edp/Job/123/result",
raise_when_error=False)
self.assertEqual((ret, stdout),
remote.execute_command.return_value)
def test_check_pid(self):
remote = mock.Mock()
remote.execute_command.return_value = 999, ""
eng = se.SparkJobEngine("cluster")
ret = eng._check_pid(remote, "pid")
remote.execute_command.assert_called_with("ps hp pid",
raise_when_error=False)
self.assertEqual(999, ret)
@mock.patch.object(se.SparkJobEngine,
'_get_result_file',
autospec=True)
@mock.patch.object(se.SparkJobEngine,
'_check_pid',
autospec=True)
def test_get_job_status_from_remote(self, _check_pid, _get_result_file):
'''Test retrieval of job status from remote instance
If the process is present, status is RUNNING
If the process is not present, status depends on the result file
If the result file is missing, status is DONEWITHERROR
'''
eng = se.SparkJobEngine("cluster")
job_exec = mock.Mock()
remote = mock.Mock()
# Pretend process is running
_check_pid.return_value = 0
status = eng._get_job_status_from_remote(remote, "pid", job_exec)
_check_pid.assert_called_with(eng, remote, "pid")
self.assertEqual({"status": edp.JOB_STATUS_RUNNING}, status)
# Pretend process ended and result file contains 0 (success)
_check_pid.return_value = 1
_get_result_file.return_value = 0, "0"
status = eng._get_job_status_from_remote(remote, "pid", job_exec)
self.assertEqual({"status": edp.JOB_STATUS_SUCCEEDED}, status)
# Pretend process ended and result file contains 1 (success)
_get_result_file.return_value = 0, "1"
status = eng._get_job_status_from_remote(remote, "pid", job_exec)
self.assertEqual({"status": edp.JOB_STATUS_DONEWITHERROR}, status)
# Pretend process ended and result file contains 130 (killed)
_get_result_file.return_value = 0, "130"
status = eng._get_job_status_from_remote(remote, "pid", job_exec)
self.assertEqual({"status": edp.JOB_STATUS_KILLED}, status)
# Pretend process ended and result file contains -2 (killed)
_get_result_file.return_value = 0, "-2"
status = eng._get_job_status_from_remote(remote, "pid", job_exec)
self.assertEqual({"status": edp.JOB_STATUS_KILLED}, status)
# Pretend process ended and result file is missing
_get_result_file.return_value = 1, ""
status = eng._get_job_status_from_remote(remote, "pid", job_exec)
self.assertEqual({"status": edp.JOB_STATUS_DONEWITHERROR}, status)
@mock.patch.object(se.SparkJobEngine,
'_get_job_status_from_remote',
autospec=True)
@mock.patch.object(se.SparkJobEngine,
'_get_instance_if_running',
autospec=True)
@mock.patch('sahara.utils.remote.get_remote')
def test_get_job_status(self,
get_remote,
_get_instance_if_running,
_get_job_status_from_remote):
# This is to mock "with remote.get_remote(instance) as r"
remote_instance = mock.Mock()
get_remote.return_value.__enter__ = mock.Mock(
return_value=remote_instance)
# Pretend instance is not returned
_get_instance_if_running.return_value = "pid", None
job_exec = mock.Mock()
eng = se.SparkJobEngine("cluster")
status = eng.get_job_status(job_exec)
self.assertIsNone(status)
# Pretend we have an instance
_get_instance_if_running.return_value = "pid", "instance"
_get_job_status_from_remote.return_value = {"status":
edp.JOB_STATUS_RUNNING}
status = eng.get_job_status(job_exec)
_get_job_status_from_remote.assert_called_with(eng,
remote_instance,
"pid", job_exec)
self.assertEqual({"status": edp.JOB_STATUS_RUNNING}, status)
@mock.patch.object(se.SparkJobEngine,
'_get_instance_if_running',
autospec=True,
return_value=(None, None))
@mock.patch('sahara.utils.remote.get_remote')
def test_cancel_job_null_or_done(self,
get_remote,
_get_instance_if_running):
'''Test cancel_job() when instance is None
Test that cancel_job() returns None and does not try to
retrieve a remote instance if _get_instance_if_running() returns None
'''
eng = se.SparkJobEngine("cluster")
job_exec = mock.Mock()
self.assertIsNone(eng.cancel_job(job_exec))
self.assertTrue(_get_instance_if_running.called)
self.assertFalse(get_remote.called)
@mock.patch.object(se.SparkJobEngine,
'_get_job_status_from_remote',
autospec=True,
return_value={"status": edp.JOB_STATUS_KILLED})
@mock.patch.object(se.SparkJobEngine,
'_get_instance_if_running',
autospec=True,
return_value=("pid", "instance"))
@mock.patch('sahara.utils.remote.get_remote')
def test_cancel_job(self,
get_remote,
_get_instance_if_running,
_get_job_status_from_remote):
'''Test cancel_job() with a valid instance
For a valid instance, test that cancel_job:
* retrieves the remote instance
* executes the proper kill command
* retrieves the job status (because the remote command is successful)
'''
# This is to mock "with remote.get_remote(instance) as r" in cancel_job
# and to mock r.execute_command to return success
remote_instance = mock.Mock()
get_remote.return_value.__enter__ = mock.Mock(
return_value=remote_instance)
remote_instance.execute_command.return_value = (0, "standard out")
eng = se.SparkJobEngine("cluster")
job_exec = mock.Mock()
status = eng.cancel_job(job_exec)
# check that remote.get_remote was called with the result of
# eng._get_instance_if_running()
get_remote.assert_called_with("instance")
# check that execute_command was called with the proper arguments
# ("pid" was passed in)
remote_instance.execute_command.assert_called_with(
"kill -SIGINT pid",
raise_when_error=False)
# check that the job status was retrieved since the command succeeded
_get_job_status_from_remote.assert_called_with(eng,
remote_instance,
"pid", job_exec)
self.assertEqual({"status": edp.JOB_STATUS_KILLED}, status)
@mock.patch.object(se.SparkJobEngine,
'_get_job_status_from_remote',
autospec=True)
@mock.patch.object(se.SparkJobEngine,
'_get_instance_if_running',
autospec=True,
return_value=("pid", "instance"))
@mock.patch('sahara.utils.remote.get_remote')
def test_cancel_job_failed(self,
get_remote,
_get_instance_if_running,
_get_job_status_from_remote):
'''Test cancel_job() when remote command fails
For a valid instance and a failed kill command, test that cancel_job:
* retrieves the remote instance
* executes the proper kill command
* does not retrieve the job status (because the remote command failed)
'''
# This is to mock "with remote.get_remote(instance) as r"
# and to mock r.execute_command to return failure
remote_instance = mock.Mock()
get_remote.return_value.__enter__ = mock.Mock(
return_value=remote_instance)
remote_instance.execute_command.return_value = (-1, "some error")
eng = se.SparkJobEngine("cluster")
job_exec = mock.Mock()
status = eng.cancel_job(job_exec)
# check that remote.get_remote was called with the result of
# eng._get_instance_if_running
get_remote.assert_called_with("instance")
# check that execute_command was called with the proper arguments
# ("pid" was passed in)
remote_instance.execute_command.assert_called_with(
"kill -SIGINT pid",
raise_when_error=False)
# check that the job status was not retrieved since the command failed
self.assertEqual(0, _get_job_status_from_remote.called)
# check that we have nothing new to report ...
self.assertIsNone(status)
@mock.patch('sahara.service.edp.binary_retrievers.dispatch.get_raw_binary')
@mock.patch('sahara.utils.remote.get_remote')
def test_upload_job_files(self, get_remote, get_raw_binary):
main_names = ["main1", "main2", "main3"]
lib_names = ["lib1", "lib2", "lib3"]
def make_data_objects(*args):
objs = []
for name in args:
m = mock.Mock()
m.name = name
objs.append(m)
return objs
job = mock.Mock()
job.name = "job"
job.mains = make_data_objects(*main_names)
job.libs = make_data_objects(*lib_names)
# This is to mock "with remote.get_remote(instance) as r"
remote_instance = mock.Mock()
get_remote.return_value.__enter__ = mock.Mock(
return_value=remote_instance)
get_raw_binary.return_value = "data"
eng = se.SparkJobEngine("cluster")
paths, builtins = eng._upload_job_files("where", "/somedir", job, {})
self.assertEqual(["/somedir/" + n for n in main_names + lib_names],
paths)
for path in paths:
remote_instance.write_file_to.assert_any_call(path, "data")
def _make_master_instance(self, return_code=0):
master = mock.Mock()
master.execute_command.return_value = (return_code, self.spark_pid)
master.hostname.return_value = self.master_host
master.id = self.master_inst
return master
def _config_values(self, *key):
return {("Spark", "Master port", "cluster"): self.master_port,
("Spark", "Spark home", "cluster"): self.spark_home,
("Spark", "Executor extra classpath",
"cluster"): self.driver_cp}[key]
@mock.patch('sahara.conductor.API.job_execution_update')
@mock.patch('sahara.conductor.API.job_execution_get')
@mock.patch('sahara.utils.remote.get_remote')
@mock.patch('sahara.plugins.utils.get_config_value_or_default')
@mock.patch('sahara.service.edp.job_utils.create_workflow_dir')
@mock.patch('sahara.plugins.utils.get_instance')
@mock.patch('sahara.conductor.API.job_get')
@mock.patch('sahara.context.ctx', return_value="ctx")
def _setup_run_job(self, master_instance, job_configs, files,
ctx, job_get, get_instance, create_workflow_dir,
get_config_value, get_remote, job_exec_get,
job_exec_update):
def _upload_job_files(where, job_dir, job,
libs_subdir=True, job_configs=None):
paths = [os.path.join(self.workflow_dir, f) for f in files['jars']]
bltns = files.get('bltns', [])
bltns = [os.path.join(self.workflow_dir, f) for f in bltns]
return paths, bltns
job = mock.Mock()
job.name = "MyJob"
job_get.return_value = job
job_exec = mock.Mock()
job_exec.job_configs = job_configs
get_config_value.side_effect = self._config_values
create_workflow_dir.return_value = self.workflow_dir
# This is to mock "with remote.get_remote(master) as r" in run_job
get_remote.return_value.__enter__ = mock.Mock(
return_value=master_instance)
get_instance.return_value = master_instance
eng = se.SparkJobEngine("cluster")
eng._upload_job_files = mock.Mock()
eng._upload_job_files.side_effect = _upload_job_files
status = eng.run_job(job_exec)
# Check that we launch on the master node
get_instance.assert_called_with("cluster", self.master_host)
return status
def test_run_job_raise(self):
job_configs = {
'configs': {"edp.java.main_class": "org.me.myclass"},
'args': ['input_arg', 'output_arg']
}
files = {'jars': ["app.jar",
"jar1.jar",
"jar2.jar"]}
# The object representing the spark master node
# The spark-submit command will be run on this instance
master_instance = self._make_master_instance(return_code=1)
# If execute_command returns an error we should get a raise
self.assertRaises(ex.EDPError,
self._setup_run_job,
master_instance, job_configs, files)
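
# Supporting jars other than the application jar are expected to appear in
# the --jars option of the spark-submit command.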
def test_run_job_extra_jars_args(self):
job_configs = {
'configs': {"edp.java.main_class": "org.me.myclass"},
'args': ['input_arg', 'output_arg']
}
files = {'jars': ["app.jar",
"jar1.jar",
"jar2.jar"]}
# The object representing the spark master node
# The spark-submit command will be run on this instance
master_instance = self._make_master_instance()
status = self._setup_run_job(master_instance, job_configs, files)
# Check the command
master_instance.execute_command.assert_called_with(
'cd %(workflow_dir)s; '
'./launch_command %(spark_home)s/bin/spark-submit '
'--class org.me.myclass --jars jar1.jar,jar2.jar '
'--master spark://%(master_host)s:%(master_port)s '
'app.jar input_arg output_arg '
'> /dev/null 2>&1 & echo $!' % {"workflow_dir": self.workflow_dir,
"spark_home": self.spark_home,
"master_host": self.master_host,
"master_port": self.master_port})
# Check result here
self.assertEqual(("%s@%s" % (self.spark_pid, self.master_inst),
edp.JOB_STATUS_RUNNING,
{"spark-path": self.workflow_dir}), status)
def test_run_job_args(self):
job_configs = {
'configs': {"edp.java.main_class": "org.me.myclass"},
'args': ['input_arg', 'output_arg']
}
files = {'jars': ["app.jar"]}
# The object representing the spark master node
# The spark-submit command will be run on this instance
master_instance = self._make_master_instance()
status = self._setup_run_job(master_instance, job_configs, files)
# Check the command
master_instance.execute_command.assert_called_with(
'cd %(workflow_dir)s; '
'./launch_command %(spark_home)s/bin/spark-submit '
'--class org.me.myclass '
'--master spark://%(master_host)s:%(master_port)s '
'app.jar input_arg output_arg '
'> /dev/null 2>&1 & echo $!' % {"workflow_dir": self.workflow_dir,
"spark_home": self.spark_home,
"master_host": self.master_host,
"master_port": self.master_port})
# Check result here
self.assertEqual(("%s@%s" % (self.spark_pid, self.master_inst),
edp.JOB_STATUS_RUNNING,
{"spark-path": self.workflow_dir}), status)
def test_run_job(self):
job_configs = {
'configs': {"edp.java.main_class": "org.me.myclass"},
}
files = {'jars': ["app.jar"]}
# The object representing the spark master node
# The spark-submit command will be run on this instance
master_instance = self._make_master_instance()
status = self._setup_run_job(master_instance, job_configs, files)
# Check the command
master_instance.execute_command.assert_called_with(
'cd %(workflow_dir)s; '
'./launch_command %(spark_home)s/bin/spark-submit '
'--class org.me.myclass '
'--master spark://%(master_host)s:%(master_port)s '
'app.jar '
'> /dev/null 2>&1 & echo $!' % {"workflow_dir": self.workflow_dir,
"spark_home": self.spark_home,
"master_host": self.master_host,
"master_port": self.master_port})
# Check result here
self.assertEqual(("%s@%s" % (self.spark_pid, self.master_inst),
edp.JOB_STATUS_RUNNING,
{"spark-path": self.workflow_dir}), status)
def test_run_job_wrapper_extra_jars_args(self):
job_configs = {
'configs': {"edp.java.main_class": "org.me.myclass",
"edp.spark.adapt_for_swift": True},
'args': ['input_arg', 'output_arg']
}
files = {'jars': ["app.jar",
"jar1.jar",
"jar2.jar"],
'bltns': ["wrapper.jar"]}
# The object representing the spark master node
# The spark-submit command will be run on this instance
master_instance = self._make_master_instance()
status = self._setup_run_job(master_instance, job_configs, files)
# Check the command
master_instance.execute_command.assert_called_with(
'cd %(workflow_dir)s; '
'./launch_command %(spark_home)s/bin/spark-submit '
'--driver-class-path %(driver_cp)s '
'--class org.openstack.sahara.edp.SparkWrapper '
'--jars app.jar,jar1.jar,jar2.jar '
'--master spark://%(master_host)s:%(master_port)s '
'wrapper.jar spark.xml org.me.myclass input_arg output_arg '
'> /dev/null 2>&1 & echo $!' % {"workflow_dir": self.workflow_dir,
"spark_home": self.spark_home,
"driver_cp": self.driver_cp,
"master_host": self.master_host,
"master_port": self.master_port})
# Check result here
self.assertEqual(("%s@%s" % (self.spark_pid, self.master_inst),
edp.JOB_STATUS_RUNNING,
{"spark-path": self.workflow_dir}), status)
def test_run_job_wrapper_args(self):
job_configs = {
'configs': {"edp.java.main_class": "org.me.myclass",
"edp.spark.adapt_for_swift": True},
'args': ['input_arg', 'output_arg']
}
files = {'jars': ["app.jar"],
'bltns': ["wrapper.jar"]}
# The object representing the spark master node
# The spark-submit command will be run on this instance
master_instance = self._make_master_instance()
status = self._setup_run_job(master_instance, job_configs, files)
# Check the command
master_instance.execute_command.assert_called_with(
'cd %(workflow_dir)s; '
'./launch_command %(spark_home)s/bin/spark-submit '
'--driver-class-path %(driver_cp)s '
'--class org.openstack.sahara.edp.SparkWrapper '
'--jars app.jar '
'--master spark://%(master_host)s:%(master_port)s '
'wrapper.jar spark.xml org.me.myclass input_arg output_arg '
'> /dev/null 2>&1 & echo $!' % {"workflow_dir": self.workflow_dir,
"spark_home": self.spark_home,
"driver_cp": self.driver_cp,
"master_host": self.master_host,
"master_port": self.master_port})
# Check result here
self.assertEqual(("%s@%s" % (self.spark_pid, self.master_inst),
edp.JOB_STATUS_RUNNING,
{"spark-path": self.workflow_dir}), status)
def test_run_job_wrapper(self):
job_configs = {
'configs': {"edp.java.main_class": "org.me.myclass",
"edp.spark.adapt_for_swift": True}
}
files = {'jars': ["app.jar"],
'bltns': ["wrapper.jar"]}
# The object representing the spark master node
# The spark-submit command will be run on this instance
master_instance = self._make_master_instance()
status = self._setup_run_job(master_instance, job_configs, files)
# Check the command
master_instance.execute_command.assert_called_with(
'cd %(workflow_dir)s; '
'./launch_command %(spark_home)s/bin/spark-submit '
'--driver-class-path %(driver_cp)s '
'--class org.openstack.sahara.edp.SparkWrapper '
'--jars app.jar '
'--master spark://%(master_host)s:%(master_port)s '
'wrapper.jar spark.xml org.me.myclass '
'> /dev/null 2>&1 & echo $!' % {"workflow_dir": self.workflow_dir,
"spark_home": self.spark_home,
"driver_cp": self.driver_cp,
"master_host": self.master_host,
"master_port": self.master_port})
# Check result here
self.assertEqual(("%s@%s" % (self.spark_pid, self.master_inst),
edp.JOB_STATUS_RUNNING,
{"spark-path": self.workflow_dir}), status)
@mock.patch('sahara.service.edp.hdfs_helper.configure_cluster_for_hdfs')
@mock.patch('sahara.service.edp.job_utils.resolve_data_source_references')
def test_external_hdfs_config(self, resolver, configurer):
job_configs = {
'configs': {"edp.java.main_class": "org.me.myclass"},
}
files = {'jars': ["app.jar"]}
data_source = mock.Mock()
data_source.type = 'hdfs'
resolver.return_value = ([data_source], job_configs)
master_instance = self._make_master_instance()
self._setup_run_job(master_instance, job_configs, files)
configurer.assert_called_with("cluster", data_source)
self.engine_class = spark_edp.EdpEngine
self.spark_user = ""
self.spark_submit = (
"%(spark_home)s/bin/spark-submit" %
{"spark_home": self.spark_home})
self.master = (
"spark://%(master_host)s:%(master_port)s" %
{"master_host": self.master_host,
"master_port": self.master_port})
self.deploy_mode = "client"

View File

@ -0,0 +1,28 @@
# Copyright (c) 2015 ISPRAS
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from sahara.plugins.cdh.v5_3_0 import edp_engine
from sahara.tests.unit.service.edp.spark import base as tests
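
# Reruns the shared Spark EDP tests from base.TestSpark against the
# CDH 5.3.0 EdpSparkEngine; setUp() overrides the fixture values so the
# expected command uses plain spark-submit run as the spark user with a
# yarn-cluster master and cluster deploy mode.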
class TestClouderaPlugin(tests.TestSpark):
def setUp(self):
super(TestClouderaPlugin, self).setUp()
self.master_host = "CLOUDERA_MANAGER"
self.engine_class = edp_engine.EdpSparkEngine
self.spark_user = "sudo -u spark "
self.spark_submit = "spark-submit"
self.master = "yarn-cluster"
self.deploy_mode = "cluster"

View File

@ -0,0 +1,28 @@
# Copyright (c) 2015 ISPRAS
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from sahara.plugins.cdh.v5_4_0 import edp_engine
from sahara.tests.unit.service.edp.spark import base as tests
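
# Same as the 5.3.0 case: reruns the shared Spark EDP tests against the
# CDH 5.4.0 EdpSparkEngine with yarn-cluster settings from setUp().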
class TestClouderaPlugin(tests.TestSpark):
def setUp(self):
super(TestClouderaPlugin, self).setUp()
self.master_host = "CLOUDERA_MANAGER"
self.engine_class = edp_engine.EdpSparkEngine
self.spark_user = "sudo -u spark "
self.spark_submit = "spark-submit"
self.master = "yarn-cluster"
self.deploy_mode = "cluster"