Adding support for the Spark Shell job

Implements: blueprint edp-add-spark-shell-action
Change-Id: I6d2ec02f854ab2eeeab2413bb56f1a359a3837c1
Sergey Gotliv 2015-06-28 01:46:46 +03:00
parent 32d8be795f
commit e7d6799155
6 changed files with 323 additions and 61 deletions

View File

@@ -49,6 +49,10 @@ class EdpEngine(edp_engine.SparkJobEngine):
    def edp_supported(version):
        return version >= EdpEngine.edp_base_version

+    @staticmethod
+    def job_type_supported(job_type):
+        return job_type in edp_engine.SparkJobEngine.get_supported_job_types()
+
    def validate_job_execution(self, cluster, job, data):
        if not self.edp_supported(cluster.hadoop_version):
            raise ex.InvalidDataException(

View File

@@ -30,6 +30,7 @@ from sahara.plugins.spark import config_helper as c_helper
from sahara.plugins.spark import edp_engine
from sahara.plugins.spark import run_scripts as run
from sahara.plugins.spark import scaling as sc
+from sahara.plugins.spark import shell_engine
from sahara.plugins import utils
from sahara.topology import topology_helper as th
from sahara.utils import cluster_progress_ops as cpo
@@ -491,22 +492,34 @@ class SparkProvider(p.ProvisioningPluginBase):
                        rep_factor)

    def get_edp_engine(self, cluster, job_type):
-        if job_type in edp_engine.EdpEngine.get_supported_job_types():
+        if edp_engine.EdpEngine.job_type_supported(job_type):
            return edp_engine.EdpEngine(cluster)

+        if shell_engine.ShellEngine.job_type_supported(job_type):
+            return shell_engine.ShellEngine(cluster)
+
        return None

    def get_edp_job_types(self, versions=[]):
        res = {}
        for vers in self.get_versions():
            if not versions or vers in versions:
+                res[vers] = shell_engine.ShellEngine.get_supported_job_types()
                if edp_engine.EdpEngine.edp_supported(vers):
-                    res[vers] = edp_engine.EdpEngine.get_supported_job_types()
+                    res[vers].extend(
+                        edp_engine.EdpEngine.get_supported_job_types())
        return res

    def get_edp_config_hints(self, job_type, version):
-        if edp_engine.EdpEngine.edp_supported(version):
+        if (edp_engine.EdpEngine.edp_supported(version) and
+                edp_engine.EdpEngine.job_type_supported(job_type)):
            return edp_engine.EdpEngine.get_possible_job_config(job_type)
+
+        if shell_engine.ShellEngine.job_type_supported(job_type):
+            return shell_engine.ShellEngine.get_possible_job_config(job_type)
+
        return {}

    def get_open_ports(self, node_group):
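
The dispatch above is the heart of the plugin-side change: Shell jobs are offered for every Spark plugin version, while Spark jobs stay gated on edp_supported(). A minimal standalone sketch of that ordering follows; the 'Shell'/'Spark' literals, the '1.0.0' base version and the string comparison mirror edp.JOB_TYPE_SHELL, edp.JOB_TYPE_SPARK and EdpEngine.edp_supported(), and are assumptions for illustration, not code from this commit.

    # Illustrative only -- a standalone mirror of SparkProvider's new logic.
    SHELL_TYPES = ['Shell']      # assumed value of edp.JOB_TYPE_SHELL
    SPARK_TYPES = ['Spark']      # assumed value of edp.JOB_TYPE_SPARK
    EDP_BASE_VERSION = '1.0.0'   # assumed EdpEngine.edp_base_version

    def get_edp_job_types(versions):
        res = {}
        for vers in versions:
            # Shell jobs are always offered; Spark jobs only where the
            # version-gated EDP engine supports them.
            res[vers] = list(SHELL_TYPES)
            if vers >= EDP_BASE_VERSION:
                res[vers].extend(SPARK_TYPES)
        return res

    print(get_edp_job_types(['0.9.1', '1.0.0']))
    # {'0.9.1': ['Shell'], '1.0.0': ['Shell', 'Spark']}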

View File

@@ -0,0 +1,28 @@
# Copyright (c) 2015 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from sahara.plugins import utils as plugin_utils
from sahara.service.edp.spark import engine as shell_engine


class ShellEngine(shell_engine.SparkShellJobEngine):
    def __init__(self, cluster):
        super(ShellEngine, self).__init__(cluster)
        self.master = plugin_utils.get_instance(cluster, "master")

    @staticmethod
    def job_type_supported(job_type):
        return (job_type in shell_engine.SparkShellJobEngine.
                get_supported_job_types())
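
ShellEngine itself is deliberately thin: it only binds the cluster's "master" instance and inherits job-type reporting, config hints and the whole run/status/cancel cycle from the service-side SparkShellJobEngine added below. A quick sanity check of what it advertises, assuming a Sahara tree containing this change is importable:

    from sahara.plugins.spark import shell_engine
    from sahara.utils import edp

    print(shell_engine.ShellEngine.get_supported_job_types())
    # ['Shell']  -- i.e. [edp.JOB_TYPE_SHELL]
    print(shell_engine.ShellEngine.get_possible_job_config(edp.JOB_TYPE_SHELL))
    # {'job_config': {'configs': {}, 'args': [], 'params': {}}}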

View File

@@ -198,51 +198,10 @@ class SparkJobEngine(base_engine.JobEngine):
        with remote.get_remote(instance) as r:
            return self._get_job_status_from_remote(r, pid, job_execution)

-    def run_job(self, job_execution):
-        ctx = context.ctx()
-        job = conductor.job_get(ctx, job_execution.job_id)
-
+    def _build_command(self, wf_dir, paths, builtin_paths,
+                       updated_job_configs):
        indep_params = {}
-        # This will be a dictionary of tuples, (native_url, runtime_url)
-        # keyed by data_source id
-        data_source_urls = {}
-        additional_sources, updated_job_configs = (
-            job_utils.resolve_data_source_references(job_execution.job_configs,
-                                                     job_execution.id,
-                                                     data_source_urls,
-                                                     self.cluster)
-        )
-
-        job_execution = conductor.job_execution_update(
-            ctx, job_execution,
-            {"data_source_urls": job_utils.to_url_dict(data_source_urls)})
-
-        # Now that we've recorded the native urls, we can switch to the
-        # runtime urls
-        data_source_urls = job_utils.to_url_dict(data_source_urls,
-                                                 runtime=True)
-
-        for data_source in additional_sources:
-            if data_source and data_source.type == 'hdfs':
-                h.configure_cluster_for_hdfs(self.cluster, data_source)
-                break
-
-        # It is needed in case we are working with Spark plugin
-        self.plugin_params['master'] = (
-            self.plugin_params['master'] % {'host': self.master.hostname()})
-
-        # TODO(tmckay): wf_dir should probably be configurable.
-        # The only requirement is that the dir is writable by the image user
-        wf_dir = job_utils.create_workflow_dir(self.master, '/tmp/spark-edp',
-                                               job, job_execution.id, "700")
-        paths, builtin_paths = self._upload_job_files(
-            self.master, wf_dir, job, updated_job_configs)
-
-        # We can shorten the paths in this case since we'll run out of wf_dir
-        paths = [os.path.basename(p) if p.startswith(wf_dir) else p
-                 for p in paths]
-        builtin_paths = [os.path.basename(p) for p in builtin_paths]
-
        # TODO(tmckay): for now, paths[0] is always assumed to be the app
        # jar and we generate paths in order (mains, then libs).
        # When we have a Spark job type, we can require a "main" and set
@@ -313,6 +272,55 @@ class SparkJobEngine(base_engine.JobEngine):
               ' --deploy-mode %(deploy-mode)s'
               ' %(app_jar)s%(args)s') % dict(
                   mutual_dict)
+        return cmd
+
+    def run_job(self, job_execution):
+        ctx = context.ctx()
+        job = conductor.job_get(ctx, job_execution.job_id)
+
+        # This will be a dictionary of tuples, (native_url, runtime_url)
+        # keyed by data_source id
+        data_source_urls = {}
+        additional_sources, updated_job_configs = (
+            job_utils.resolve_data_source_references(job_execution.job_configs,
+                                                     job_execution.id,
+                                                     data_source_urls,
+                                                     self.cluster)
+        )
+
+        job_execution = conductor.job_execution_update(
+            ctx, job_execution,
+            {"data_source_urls": job_utils.to_url_dict(data_source_urls)})
+
+        # Now that we've recorded the native urls, we can switch to the
+        # runtime urls
+        data_source_urls = job_utils.to_url_dict(data_source_urls,
+                                                 runtime=True)
+
+        for data_source in additional_sources:
+            if data_source and data_source.type == 'hdfs':
+                h.configure_cluster_for_hdfs(self.cluster, data_source)
+                break
+
+        # It is needed in case we are working with Spark plugin
+        self.plugin_params['master'] = (
+            self.plugin_params['master'] % {'host': self.master.hostname()})
+
+        # TODO(tmckay): wf_dir should probably be configurable.
+        # The only requirement is that the dir is writable by the image user
+        wf_dir = job_utils.create_workflow_dir(self.master, '/tmp/spark-edp',
+                                               job, job_execution.id, "700")
+        paths, builtin_paths = self._upload_job_files(
+            self.master, wf_dir, job, updated_job_configs)
+
+        # We can shorten the paths in this case since we'll run out of wf_dir
+        paths = [os.path.basename(p) if p.startswith(wf_dir) else p
+                 for p in paths]
+        builtin_paths = [os.path.basename(p) for p in builtin_paths]
+
+        cmd = self._build_command(wf_dir, paths, builtin_paths,
+                                  updated_job_configs)
+
        job_execution = conductor.job_execution_get(ctx, job_execution.id)
        if job_execution.info['status'] == edp.JOB_STATUS_TOBEKILLED:
            return (None, edp.JOB_STATUS_KILLED, None)
@@ -355,3 +363,33 @@ class SparkJobEngine(base_engine.JobEngine):
    @staticmethod
    def get_supported_job_types():
        return [edp.JOB_TYPE_SPARK]
+
+
+class SparkShellJobEngine(SparkJobEngine):
+    def _build_command(self, wf_dir, paths, builtin_paths,
+                       updated_job_configs):
+        main_script = paths.pop(0)
+        args = " ".join(updated_job_configs.get('args', []))
+
+        env_params = ""
+        params = updated_job_configs.get('params', {})
+        for key, value in params.items():
+            env_params += "{key}={value} ".format(key=key, value=value)
+
+        cmd = ("{env_params}{cmd} {main_script} {args}".format(
+            cmd='/bin/sh', main_script=main_script, env_params=env_params,
+            args=args))
+
+        return cmd
+
+    def validate_job_execution(self, cluster, job, data):
+        # Shell job doesn't require any special validation
+        pass
+
+    @staticmethod
+    def get_possible_job_config(job_type):
+        return {'job_config': {'configs': {}, 'args': [], 'params': {}}}
+
+    @staticmethod
+    def get_supported_job_types():
+        return [edp.JOB_TYPE_SHELL]
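
For clarity, this is the command string that SparkShellJobEngine._build_command() produces, reproduced here as a standalone sketch (not Sahara code). The base class then wraps it in its usual launch pipeline and runs it in the job's workflow directory on the master node.

    # Standalone mirror of SparkShellJobEngine._build_command(), for illustration.
    def build_shell_command(main_script, job_configs):
        args = " ".join(job_configs.get('args', []))
        env_params = ""
        for key, value in job_configs.get('params', {}).items():
            env_params += "{key}={value} ".format(key=key, value=value)
        return "{env_params}{cmd} {main_script} {args}".format(
            cmd='/bin/sh', main_script=main_script,
            env_params=env_params, args=args)

    print(build_shell_command("main_script.sh",
                              {'args': ['arg1', 'arg2'], 'params': {'A': 'a'}}))
    # A=a /bin/sh main_script.sh arg1 arg2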

View File

@@ -20,6 +20,7 @@ from sahara import conductor as cond
from sahara import context
from sahara import exceptions as ex
from sahara.plugins import base as pb
+from sahara.plugins.spark import plugin as pl
from sahara.service.edp.spark import engine
from sahara.tests.unit import base
from sahara.utils import edp
@@ -34,12 +35,16 @@ class SparkPluginTest(base.SaharaWithDbTestCase):
        self.override_config("plugins", ["spark"])
        pb.setup_plugins()

-    def test_plugin09_edp_engine_validation(self):
+    def _init_cluster_dict(self, version):
        cluster_dict = {
            'name': 'cluster',
            'plugin_name': 'spark',
-            'hadoop_version': '0.9.1',
+            'hadoop_version': version,
            'default_image_id': 'image'}
+        return cluster_dict
+
+    def test_plugin09_edp_engine_validation(self):
+        cluster_dict = self._init_cluster_dict('0.9.1')

        job = mock.Mock()
        job.type = edp.JOB_TYPE_SPARK
@@ -54,17 +59,23 @@ class SparkPluginTest(base.SaharaWithDbTestCase):
            edp_engine.validate_job_execution(cluster, job, mock.Mock())

    def test_plugin10_edp_engine(self):
-        cluster_dict = {
-            'name': 'cluster',
-            'plugin_name': 'spark',
-            'hadoop_version': '1.0.0',
-            'default_image_id': 'image'}
+        self._test_engine('1.0.0', edp.JOB_TYPE_SPARK,
+                          engine.SparkJobEngine)
+
+    def test_plugin09_shell_engine(self):
+        self._test_engine('0.9.1', edp.JOB_TYPE_SHELL,
+                          engine.SparkShellJobEngine)
+
+    def test_plugin10_shell_engine(self):
+        self._test_engine('1.0.0', edp.JOB_TYPE_SHELL,
+                          engine.SparkShellJobEngine)
+
+    def _test_engine(self, version, job_type, eng):
+        cluster_dict = self._init_cluster_dict(version)

        cluster = conductor.cluster_create(context.ctx(), cluster_dict)
        plugin = pb.PLUGINS.get_plugin(cluster.plugin_name)
-        self.assertIsInstance(
-            plugin.get_edp_engine(cluster, edp.JOB_TYPE_SPARK),
-            engine.SparkJobEngine)
+        self.assertIsInstance(plugin.get_edp_engine(cluster, job_type), eng)

    def test_plugin13_edp_engine(self):
        cluster_dict = {
@@ -89,11 +100,7 @@ class SparkPluginTest(base.SaharaWithDbTestCase):
                              'cron': 'cron_text'}}
        instance.node_group.node_processes = ["master"]
        instance.node_group.id = id
-        cluster_dict = {
-            'name': 'cluster',
-            'plugin_name': 'spark',
-            'hadoop_version': '1.0.0',
-            'default_image_id': 'image'}
+        cluster_dict = self._init_cluster_dict('1.0.0')

        cluster = conductor.cluster_create(context.ctx(), cluster_dict)
        plugin = pb.PLUGINS.get_plugin(cluster.plugin_name)
@@ -115,3 +122,36 @@ class SparkPluginTest(base.SaharaWithDbTestCase):
            plugin._push_cleanup_job(remote, cluster, extra_conf, instance)
            remote.execute_command.assert_called_with(
                'sudo rm -f /etc/crond.d/spark-cleanup')
+
+
+class SparkProviderTest(base.SaharaTestCase):
+    def setUp(self):
+        super(SparkProviderTest, self).setUp()
+
+    def test_supported_job_types(self):
+        provider = pl.SparkProvider()
+
+        res = provider.get_edp_job_types()
+        self.assertEqual([edp.JOB_TYPE_SHELL, edp.JOB_TYPE_SPARK],
+                         res['1.0.0'])
+        self.assertEqual([edp.JOB_TYPE_SHELL, edp.JOB_TYPE_SPARK],
+                         res['1.3.1'])
+
+    def test_edp_config_hints(self):
+        provider = pl.SparkProvider()
+
+        res = provider.get_edp_config_hints(edp.JOB_TYPE_SHELL, "1.0.0")
+        self.assertEqual({'configs': {}, 'args': [], 'params': {}},
+                         res['job_config'])
+
+        res = provider.get_edp_config_hints(edp.JOB_TYPE_SHELL, "1.3.1")
+        self.assertEqual({'configs': {}, 'args': [], 'params': {}},
+                         res['job_config'])
+
+        res = provider.get_edp_config_hints(edp.JOB_TYPE_SPARK, "1.0.0")
+        self.assertEqual({'args': [], 'configs': []},
+                         res['job_config'])
+
+        res = provider.get_edp_config_hints(edp.JOB_TYPE_SPARK, "1.3.1")
+        self.assertEqual({'args': [], 'configs': []},
+                         res['job_config'])

View File

@@ -0,0 +1,139 @@
# Copyright (c) 2015 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from sahara.plugins.spark import shell_engine as shell_engine
from sahara.tests.unit import base
from sahara.utils import edp
class TestSparkShellEngine(base.SaharaTestCase):
def setUp(self):
super(TestSparkShellEngine, self).setUp()
self.master_host = "master"
self.master_port = 7077
self.master_instance_id = "6789"
self.spark_pid = "12345"
self.spark_home = "/opt/spark"
self.workflow_dir = "/wfdir"
def _create_master_instance(self, return_code=0):
master = mock.Mock()
master.execute_command.return_value = (return_code, self.spark_pid)
master.hostname.return_value = self.master_host
master.id = self.master_instance_id
return master
def _build_cmd(self, params='', args=''):
cmd = ('%(env_params)s%(cmd)s %(main_script)s %(args)s' % (
{'cmd': '/bin/sh', 'main_script': 'main_script.sh',
'env_params': params, 'args': args})
)
return ("cd %s; ./launch_command %s > /dev/null 2>&1 & echo $!" %
(self.workflow_dir, cmd))
def _check_status(self, status):
self.assertEqual(("%s@%s" % (self.spark_pid, self.master_instance_id),
edp.JOB_STATUS_RUNNING,
{"spark-path": self.workflow_dir}), status)
@mock.patch('sahara.conductor.API.job_execution_update')
@mock.patch('sahara.conductor.API.job_execution_get')
@mock.patch('sahara.utils.remote.get_remote')
@mock.patch('sahara.service.edp.job_utils.create_workflow_dir')
@mock.patch('sahara.plugins.utils.get_instance')
@mock.patch('sahara.conductor.API.job_get')
@mock.patch('sahara.context.ctx', return_value="ctx")
def _setup_run_job(self, master_instance, job_configs,
ctx, job_get, get_instance, create_workflow_dir,
get_remote, job_exec_get, job_exec_update):
job = mock.Mock()
job.name = "Spark shell job"
job_get.return_value = job
create_workflow_dir.return_value = self.workflow_dir
# This is to mock "with remote.get_remote(master) as r" in run_job
get_remote.return_value.__enter__ = mock.Mock(
return_value=master_instance)
get_instance.return_value = master_instance
eng = shell_engine.ShellEngine("cluster")
eng._upload_job_files = mock.Mock()
eng._upload_job_files.return_value = ['main_script.sh'], []
job_exec = mock.Mock()
job_exec.job_configs = job_configs
status = eng.run_job(job_exec)
# Check that we launch command on the master node
get_instance.assert_called_with("cluster", self.master_host)
return status
def test_run_job_without_args_and_params(self):
job_configs = {
'configs': {},
'args': [],
'params': {}
}
master_instance = self._create_master_instance()
status = self._setup_run_job(master_instance, job_configs)
# Check the command
master_instance.execute_command.assert_called_with(
self._build_cmd())
# Check execution status
self._check_status(status)
def test_run_job_with_args(self):
job_configs = {
'configs': {},
'args': ['arg1', 'arg2'],
'params': {}
}
master_instance = self._create_master_instance()
status = self._setup_run_job(master_instance, job_configs)
# Check the command
master_instance.execute_command.assert_called_with(
self._build_cmd(args='arg1 arg2')
)
# Check execution status
self._check_status(status)
def test_run_job_with_params(self):
job_configs = {
'configs': {},
'args': [],
'params': {'A': 'a'}
}
master_instance = self._create_master_instance()
status = self._setup_run_job(master_instance, job_configs)
# Check the command
master_instance.execute_command.assert_called_with(
self._build_cmd(params='A=a ')
)
# Check execution status
self._check_status(status)
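
Putting the pieces together, the full string these tests expect execute_command() to receive is the shell command from _build_command() wrapped in the background-launch pattern that _build_cmd() above spells out, run inside the job's workflow directory. A standalone sketch for the args-only case (no env params):

    workflow_dir = "/wfdir"
    shell_cmd = "/bin/sh main_script.sh arg1 arg2"   # no env params prepended
    launch = ("cd %s; ./launch_command %s > /dev/null 2>&1 & echo $!"
              % (workflow_dir, shell_cmd))
    print(launch)
    # cd /wfdir; ./launch_command /bin/sh main_script.sh arg1 arg2 > /dev/null 2>&1 & echo $!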