From e7d6799155040a01becbac0cc24329530823cb30 Mon Sep 17 00:00:00 2001
From: Sergey Gotliv
Date: Sun, 28 Jun 2015 01:46:46 +0300
Subject: [PATCH] Adding support for the Spark Shell job

Implements: blueprint edp-add-spark-shell-action

Change-Id: I6d2ec02f854ab2eeeab2413bb56f1a359a3837c1
---
 sahara/plugins/spark/edp_engine.py            |   4 +
 sahara/plugins/spark/plugin.py                |  19 ++-
 sahara/plugins/spark/shell_engine.py          |  28 ++++
 sahara/service/edp/spark/engine.py            | 124 ++++++++++------
 .../tests/unit/plugins/spark/test_plugin.py   |  70 +++++++--
 .../unit/service/edp/spark/test_shell.py      | 139 ++++++++++++++++++
 6 files changed, 323 insertions(+), 61 deletions(-)
 create mode 100644 sahara/plugins/spark/shell_engine.py
 create mode 100644 sahara/tests/unit/service/edp/spark/test_shell.py

diff --git a/sahara/plugins/spark/edp_engine.py b/sahara/plugins/spark/edp_engine.py
index 2908ab21..31302103 100644
--- a/sahara/plugins/spark/edp_engine.py
+++ b/sahara/plugins/spark/edp_engine.py
@@ -49,6 +49,10 @@ class EdpEngine(edp_engine.SparkJobEngine):
     def edp_supported(version):
         return version >= EdpEngine.edp_base_version
 
+    @staticmethod
+    def job_type_supported(job_type):
+        return job_type in edp_engine.SparkJobEngine.get_supported_job_types()
+
     def validate_job_execution(self, cluster, job, data):
         if not self.edp_supported(cluster.hadoop_version):
             raise ex.InvalidDataException(
diff --git a/sahara/plugins/spark/plugin.py b/sahara/plugins/spark/plugin.py
index 349f7f86..4565d5de 100644
--- a/sahara/plugins/spark/plugin.py
+++ b/sahara/plugins/spark/plugin.py
@@ -30,6 +30,7 @@ from sahara.plugins.spark import config_helper as c_helper
 from sahara.plugins.spark import edp_engine
 from sahara.plugins.spark import run_scripts as run
 from sahara.plugins.spark import scaling as sc
+from sahara.plugins.spark import shell_engine
 from sahara.plugins import utils
 from sahara.topology import topology_helper as th
 from sahara.utils import cluster_progress_ops as cpo
@@ -491,22 +492,34 @@ class SparkProvider(p.ProvisioningPluginBase):
             rep_factor)
 
     def get_edp_engine(self, cluster, job_type):
-        if job_type in edp_engine.EdpEngine.get_supported_job_types():
+        if edp_engine.EdpEngine.job_type_supported(job_type):
             return edp_engine.EdpEngine(cluster)
 
+        if shell_engine.ShellEngine.job_type_supported(job_type):
+            return shell_engine.ShellEngine(cluster)
+
         return None
 
     def get_edp_job_types(self, versions=[]):
         res = {}
         for vers in self.get_versions():
             if not versions or vers in versions:
+                res[vers] = shell_engine.ShellEngine.get_supported_job_types()
+
                 if edp_engine.EdpEngine.edp_supported(vers):
-                    res[vers] = edp_engine.EdpEngine.get_supported_job_types()
+                    res[vers].extend(
+                        edp_engine.EdpEngine.get_supported_job_types())
+
         return res
 
     def get_edp_config_hints(self, job_type, version):
-        if edp_engine.EdpEngine.edp_supported(version):
+        if (edp_engine.EdpEngine.edp_supported(version) and
+                edp_engine.EdpEngine.job_type_supported(job_type)):
             return edp_engine.EdpEngine.get_possible_job_config(job_type)
+
+        if shell_engine.ShellEngine.job_type_supported(job_type):
+            return shell_engine.ShellEngine.get_possible_job_config(job_type)
+
         return {}
 
     def get_open_ports(self, node_group):
diff --git a/sahara/plugins/spark/shell_engine.py b/sahara/plugins/spark/shell_engine.py
new file mode 100644
index 00000000..77484271
--- /dev/null
+++ b/sahara/plugins/spark/shell_engine.py
@@ -0,0 +1,28 @@
+# Copyright (c) 2015 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from sahara.plugins import utils as plugin_utils
+from sahara.service.edp.spark import engine as shell_engine
+
+
+class ShellEngine(shell_engine.SparkShellJobEngine):
+    def __init__(self, cluster):
+        super(ShellEngine, self).__init__(cluster)
+        self.master = plugin_utils.get_instance(cluster, "master")
+
+    @staticmethod
+    def job_type_supported(job_type):
+        return (job_type in shell_engine.SparkShellJobEngine.
+                get_supported_job_types())
diff --git a/sahara/service/edp/spark/engine.py b/sahara/service/edp/spark/engine.py
index 7b62c16a..b9da4c0c 100644
--- a/sahara/service/edp/spark/engine.py
+++ b/sahara/service/edp/spark/engine.py
@@ -198,51 +198,10 @@ class SparkJobEngine(base_engine.JobEngine):
         with remote.get_remote(instance) as r:
             return self._get_job_status_from_remote(r, pid, job_execution)
 
-    def run_job(self, job_execution):
-        ctx = context.ctx()
-        job = conductor.job_get(ctx, job_execution.job_id)
+    def _build_command(self, wf_dir, paths, builtin_paths,
+                       updated_job_configs):
         indep_params = {}
 
-        # This will be a dictionary of tuples, (native_url, runtime_url)
-        # keyed by data_source id
-        data_source_urls = {}
-        additional_sources, updated_job_configs = (
-            job_utils.resolve_data_source_references(job_execution.job_configs,
-                                                     job_execution.id,
-                                                     data_source_urls,
-                                                     self.cluster)
-        )
-
-        job_execution = conductor.job_execution_update(
-            ctx, job_execution,
-            {"data_source_urls": job_utils.to_url_dict(data_source_urls)})
-
-        # Now that we've recorded the native urls, we can switch to the
-        # runtime urls
-        data_source_urls = job_utils.to_url_dict(data_source_urls,
-                                                 runtime=True)
-
-        for data_source in additional_sources:
-            if data_source and data_source.type == 'hdfs':
-                h.configure_cluster_for_hdfs(self.cluster, data_source)
-                break
-
-        # It is needed in case we are working with Spark plugin
-        self.plugin_params['master'] = (
-            self.plugin_params['master'] % {'host': self.master.hostname()})
-
-        # TODO(tmckay): wf_dir should probably be configurable.
-        # The only requirement is that the dir is writable by the image user
-        wf_dir = job_utils.create_workflow_dir(self.master, '/tmp/spark-edp',
-                                               job, job_execution.id, "700")
-        paths, builtin_paths = self._upload_job_files(
-            self.master, wf_dir, job, updated_job_configs)
-
-        # We can shorten the paths in this case since we'll run out of wf_dir
-        paths = [os.path.basename(p) if p.startswith(wf_dir) else p
-                 for p in paths]
-        builtin_paths = [os.path.basename(p) for p in builtin_paths]
-
         # TODO(tmckay): for now, paths[0] is always assumed to be the app
         # jar and we generate paths in order (mains, then libs).
         # When we have a Spark job type, we can require a "main" and set
@@ -313,6 +272,55 @@ class SparkJobEngine(base_engine.JobEngine):
                ' --deploy-mode %(deploy-mode)s'
                ' %(app_jar)s%(args)s') % dict(
             mutual_dict)
+
+        return cmd
+
+    def run_job(self, job_execution):
+        ctx = context.ctx()
+        job = conductor.job_get(ctx, job_execution.job_id)
+        # This will be a dictionary of tuples, (native_url, runtime_url)
+        # keyed by data_source id
+        data_source_urls = {}
+        additional_sources, updated_job_configs = (
+            job_utils.resolve_data_source_references(job_execution.job_configs,
+                                                     job_execution.id,
+                                                     data_source_urls,
+                                                     self.cluster)
+        )
+
+        job_execution = conductor.job_execution_update(
+            ctx, job_execution,
+            {"data_source_urls": job_utils.to_url_dict(data_source_urls)})
+
+        # Now that we've recorded the native urls, we can switch to the
+        # runtime urls
+        data_source_urls = job_utils.to_url_dict(data_source_urls,
+                                                 runtime=True)
+
+        for data_source in additional_sources:
+            if data_source and data_source.type == 'hdfs':
+                h.configure_cluster_for_hdfs(self.cluster, data_source)
+                break
+
+        # It is needed in case we are working with Spark plugin
+        self.plugin_params['master'] = (
+            self.plugin_params['master'] % {'host': self.master.hostname()})
+
+        # TODO(tmckay): wf_dir should probably be configurable.
+        # The only requirement is that the dir is writable by the image user
+        wf_dir = job_utils.create_workflow_dir(self.master, '/tmp/spark-edp',
+                                               job, job_execution.id, "700")
+        paths, builtin_paths = self._upload_job_files(
+            self.master, wf_dir, job, updated_job_configs)
+
+        # We can shorten the paths in this case since we'll run out of wf_dir
+        paths = [os.path.basename(p) if p.startswith(wf_dir) else p
+                 for p in paths]
+        builtin_paths = [os.path.basename(p) for p in builtin_paths]
+
+        cmd = self._build_command(wf_dir, paths, builtin_paths,
+                                  updated_job_configs)
+
         job_execution = conductor.job_execution_get(ctx, job_execution.id)
         if job_execution.info['status'] == edp.JOB_STATUS_TOBEKILLED:
             return (None, edp.JOB_STATUS_KILLED, None)
@@ -355,3 +363,33 @@ class SparkJobEngine(base_engine.JobEngine):
     @staticmethod
     def get_supported_job_types():
         return [edp.JOB_TYPE_SPARK]
+
+
+class SparkShellJobEngine(SparkJobEngine):
+    def _build_command(self, wf_dir, paths, builtin_paths,
+                       updated_job_configs):
+        main_script = paths.pop(0)
+        args = " ".join(updated_job_configs.get('args', []))
+
+        env_params = ""
+        params = updated_job_configs.get('params', {})
+        for key, value in params.items():
+            env_params += "{key}={value} ".format(key=key, value=value)
+
+        cmd = ("{env_params}{cmd} {main_script} {args}".format(
+            cmd='/bin/sh', main_script=main_script, env_params=env_params,
+            args=args))
+
+        return cmd
+
+    def validate_job_execution(self, cluster, job, data):
+        # Shell job doesn't require any special validation
+        pass
+
+    @staticmethod
+    def get_possible_job_config(job_type):
+        return {'job_config': {'configs': {}, 'args': [], 'params': {}}}
+
+    @staticmethod
+    def get_supported_job_types():
+        return [edp.JOB_TYPE_SHELL]
diff --git a/sahara/tests/unit/plugins/spark/test_plugin.py b/sahara/tests/unit/plugins/spark/test_plugin.py
index 0c23068c..4b4c2488 100644
--- a/sahara/tests/unit/plugins/spark/test_plugin.py
+++ b/sahara/tests/unit/plugins/spark/test_plugin.py
@@ -20,6 +20,7 @@ from sahara import conductor as cond
 from sahara import context
 from sahara import exceptions as ex
 from sahara.plugins import base as pb
+from sahara.plugins.spark import plugin as pl
 from sahara.service.edp.spark import engine
 from sahara.tests.unit import base
 from sahara.utils import edp
@@ -34,12 +35,16 @@ class SparkPluginTest(base.SaharaWithDbTestCase):
         self.override_config("plugins", ["spark"])
         pb.setup_plugins()
 
-    def test_plugin09_edp_engine_validation(self):
+    def _init_cluster_dict(self, version):
         cluster_dict = {
             'name': 'cluster',
             'plugin_name': 'spark',
-            'hadoop_version': '0.9.1',
+            'hadoop_version': version,
             'default_image_id': 'image'}
+        return cluster_dict
+
+    def test_plugin09_edp_engine_validation(self):
+        cluster_dict = self._init_cluster_dict('0.9.1')
 
         job = mock.Mock()
         job.type = edp.JOB_TYPE_SPARK
@@ -54,17 +59,23 @@ class SparkPluginTest(base.SaharaWithDbTestCase):
             edp_engine.validate_job_execution(cluster, job, mock.Mock())
 
     def test_plugin10_edp_engine(self):
-        cluster_dict = {
-            'name': 'cluster',
-            'plugin_name': 'spark',
-            'hadoop_version': '1.0.0',
-            'default_image_id': 'image'}
+        self._test_engine('1.0.0', edp.JOB_TYPE_SPARK,
+                          engine.SparkJobEngine)
+
+    def test_plugin09_shell_engine(self):
+        self._test_engine('0.9.1', edp.JOB_TYPE_SHELL,
+                          engine.SparkShellJobEngine)
+
+    def test_plugin10_shell_engine(self):
+        self._test_engine('1.0.0', edp.JOB_TYPE_SHELL,
+                          engine.SparkShellJobEngine)
+
+    def _test_engine(self, version, job_type, eng):
+        cluster_dict = self._init_cluster_dict(version)
 
         cluster = conductor.cluster_create(context.ctx(), cluster_dict)
         plugin = pb.PLUGINS.get_plugin(cluster.plugin_name)
-        self.assertIsInstance(
-            plugin.get_edp_engine(cluster, edp.JOB_TYPE_SPARK),
-            engine.SparkJobEngine)
+        self.assertIsInstance(plugin.get_edp_engine(cluster, job_type), eng)
 
     def test_plugin13_edp_engine(self):
         cluster_dict = {
@@ -89,11 +100,7 @@ class SparkPluginTest(base.SaharaWithDbTestCase):
             'cron': 'cron_text'}}
         instance.node_group.node_processes = ["master"]
         instance.node_group.id = id
-        cluster_dict = {
-            'name': 'cluster',
-            'plugin_name': 'spark',
-            'hadoop_version': '1.0.0',
-            'default_image_id': 'image'}
+        cluster_dict = self._init_cluster_dict('1.0.0')
 
         cluster = conductor.cluster_create(context.ctx(), cluster_dict)
         plugin = pb.PLUGINS.get_plugin(cluster.plugin_name)
@@ -115,3 +122,36 @@ class SparkPluginTest(base.SaharaWithDbTestCase):
         plugin._push_cleanup_job(remote, cluster, extra_conf, instance)
         remote.execute_command.assert_called_with(
             'sudo rm -f /etc/crond.d/spark-cleanup')
+
+
+class SparkProviderTest(base.SaharaTestCase):
+    def setUp(self):
+        super(SparkProviderTest, self).setUp()
+
+    def test_supported_job_types(self):
+        provider = pl.SparkProvider()
+
+        res = provider.get_edp_job_types()
+        self.assertEqual([edp.JOB_TYPE_SHELL, edp.JOB_TYPE_SPARK],
+                         res['1.0.0'])
+        self.assertEqual([edp.JOB_TYPE_SHELL, edp.JOB_TYPE_SPARK],
+                         res['1.3.1'])
+
+    def test_edp_config_hints(self):
+        provider = pl.SparkProvider()
+
+        res = provider.get_edp_config_hints(edp.JOB_TYPE_SHELL, "1.0.0")
+        self.assertEqual({'configs': {}, 'args': [], 'params': {}},
+                         res['job_config'])
+
+        res = provider.get_edp_config_hints(edp.JOB_TYPE_SHELL, "1.3.1")
+        self.assertEqual({'configs': {}, 'args': [], 'params': {}},
+                         res['job_config'])
+
+        res = provider.get_edp_config_hints(edp.JOB_TYPE_SPARK, "1.0.0")
+        self.assertEqual({'args': [], 'configs': []},
+                         res['job_config'])
+
+        res = provider.get_edp_config_hints(edp.JOB_TYPE_SPARK, "1.3.1")
+        self.assertEqual({'args': [], 'configs': []},
+                         res['job_config'])
diff --git a/sahara/tests/unit/service/edp/spark/test_shell.py b/sahara/tests/unit/service/edp/spark/test_shell.py
new file mode 100644
index 00000000..5c7fb558
--- /dev/null
+++ b/sahara/tests/unit/service/edp/spark/test_shell.py
@@ -0,0 +1,139 @@
+# Copyright (c) 2015 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import mock
+
+from sahara.plugins.spark import shell_engine as shell_engine
+from sahara.tests.unit import base
+from sahara.utils import edp
+
+
+class TestSparkShellEngine(base.SaharaTestCase):
+    def setUp(self):
+        super(TestSparkShellEngine, self).setUp()
+        self.master_host = "master"
+        self.master_port = 7077
+        self.master_instance_id = "6789"
+        self.spark_pid = "12345"
+        self.spark_home = "/opt/spark"
+        self.workflow_dir = "/wfdir"
+
+    def _create_master_instance(self, return_code=0):
+        master = mock.Mock()
+        master.execute_command.return_value = (return_code, self.spark_pid)
+        master.hostname.return_value = self.master_host
+        master.id = self.master_instance_id
+        return master
+
+    def _build_cmd(self, params='', args=''):
+        cmd = ('%(env_params)s%(cmd)s %(main_script)s %(args)s' % (
+            {'cmd': '/bin/sh', 'main_script': 'main_script.sh',
+             'env_params': params, 'args': args})
+        )
+
+        return ("cd %s; ./launch_command %s > /dev/null 2>&1 & echo $!" %
+                (self.workflow_dir, cmd))
+
+    def _check_status(self, status):
+        self.assertEqual(("%s@%s" % (self.spark_pid, self.master_instance_id),
+                          edp.JOB_STATUS_RUNNING,
+                          {"spark-path": self.workflow_dir}), status)
+
+    @mock.patch('sahara.conductor.API.job_execution_update')
+    @mock.patch('sahara.conductor.API.job_execution_get')
+    @mock.patch('sahara.utils.remote.get_remote')
+    @mock.patch('sahara.service.edp.job_utils.create_workflow_dir')
+    @mock.patch('sahara.plugins.utils.get_instance')
+    @mock.patch('sahara.conductor.API.job_get')
+    @mock.patch('sahara.context.ctx', return_value="ctx")
+    def _setup_run_job(self, master_instance, job_configs,
+                       ctx, job_get, get_instance, create_workflow_dir,
+                       get_remote, job_exec_get, job_exec_update):
+        job = mock.Mock()
+        job.name = "Spark shell job"
+        job_get.return_value = job
+
+        create_workflow_dir.return_value = self.workflow_dir
+
+        # This is to mock "with remote.get_remote(master) as r" in run_job
+        get_remote.return_value.__enter__ = mock.Mock(
+            return_value=master_instance)
+        get_instance.return_value = master_instance
+
+        eng = shell_engine.ShellEngine("cluster")
+        eng._upload_job_files = mock.Mock()
+        eng._upload_job_files.return_value = ['main_script.sh'], []
+
+        job_exec = mock.Mock()
+        job_exec.job_configs = job_configs
+        status = eng.run_job(job_exec)
+
+        # Check that we launch command on the master node
+        get_instance.assert_called_with("cluster", self.master_host)
+
+        return status
+
+    def test_run_job_without_args_and_params(self):
+        job_configs = {
+            'configs': {},
+            'args': [],
+            'params': {}
+        }
+
+        master_instance = self._create_master_instance()
+        status = self._setup_run_job(master_instance, job_configs)
+
+        # Check the command
+        master_instance.execute_command.assert_called_with(
+            self._build_cmd())
+
+        # Check execution status
+        self._check_status(status)
+
+    def test_run_job_with_args(self):
+        job_configs = {
+            'configs': {},
+            'args': ['arg1', 'arg2'],
+            'params': {}
+        }
+
+        master_instance = self._create_master_instance()
+        status = self._setup_run_job(master_instance, job_configs)
+
+        # Check the command
+        master_instance.execute_command.assert_called_with(
+            self._build_cmd(args='arg1 arg2')
+        )
+
+        # Check execution status
+        self._check_status(status)
+
+    def test_run_job_with_params(self):
+        job_configs = {
+            'configs': {},
+            'args': [],
+            'params': {'A': 'a'}
+        }
+
+        master_instance = self._create_master_instance()
+        status = self._setup_run_job(master_instance, job_configs)
+
+        # Check the command
+        master_instance.execute_command.assert_called_with(
+            self._build_cmd(params='A=a ')
+        )
+
+        # Check execution status
+        self._check_status(status)
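
Illustrative note (not part of the patch; combines the params and args cases
from test_shell.py, using its assumed main script 'main_script.sh' and
workflow dir '/wfdir'): for a Shell job with params={'A': 'a'} and
args=['arg1', 'arg2'], SparkShellJobEngine._build_command returns the
env-prefixed invocation

    A=a /bin/sh main_script.sh arg1 arg2

and run_job then launches it on the master node in the form the test helper
_build_cmd expects:

    cd /wfdir; ./launch_command A=a /bin/sh main_script.sh arg1 arg2 > /dev/null 2>&1 & echo $!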