From b542be80ea4c3af83413c6061c5e3707859d6e3d Mon Sep 17 00:00:00 2001
From: Artem Osadchyi
Date: Tue, 10 Mar 2015 12:30:09 +0200
Subject: [PATCH] Add Spark support for MapR plugin

Change-Id: Ic15af84f4d20f5a8e168f723ff2678a8f5f5c448
Implements: blueprint mapr-spark
---
 .../plugins/mapr/domain/configuration_file.py |  35 ++++
 sahara/plugins/mapr/services/mysql/mysql.py   |   3 +-
 .../plugins/mapr/services/spark/__init__.py   |   0
 .../spark/resources/spark-default.json        |  20 ++
 sahara/plugins/mapr/services/spark/spark.py   | 175 ++++++++++++++++++
 sahara/plugins/mapr/util/general.py           |  85 ++++++++-
 sahara/plugins/mapr/util/maprfs_helper.py     |  15 ++
 .../mapr/versions/mapr_spark/__init__.py      |   0
 .../mapr/versions/mapr_spark/context.py       |  49 +++++
 .../mapr/versions/mapr_spark/spark_engine.py  | 154 +++++++++++++++
 .../versions/mapr_spark/spark_node_manager.py |  27 +++
 .../versions/mapr_spark/version_handler.py    |  50 +++++
 .../unit/plugins/mapr/test_config_files.py    |  57 ++++++
 13 files changed, 663 insertions(+), 7 deletions(-)
 create mode 100644 sahara/plugins/mapr/services/spark/__init__.py
 create mode 100644 sahara/plugins/mapr/services/spark/resources/spark-default.json
 create mode 100755 sahara/plugins/mapr/services/spark/spark.py
 create mode 100755 sahara/plugins/mapr/versions/mapr_spark/__init__.py
 create mode 100644 sahara/plugins/mapr/versions/mapr_spark/context.py
 create mode 100755 sahara/plugins/mapr/versions/mapr_spark/spark_engine.py
 create mode 100644 sahara/plugins/mapr/versions/mapr_spark/spark_node_manager.py
 create mode 100755 sahara/plugins/mapr/versions/mapr_spark/version_handler.py

diff --git a/sahara/plugins/mapr/domain/configuration_file.py b/sahara/plugins/mapr/domain/configuration_file.py
index 40981a23..b259c6dc 100644
--- a/sahara/plugins/mapr/domain/configuration_file.py
+++ b/sahara/plugins/mapr/domain/configuration_file.py
@@ -15,6 +15,7 @@
 
 import abc
 import os
+import re
 
 import jinja2 as j2
 import six
@@ -130,3 +131,37 @@ class TemplateFile(BaseConfigurationFile):
 
     def parse(self, content):
         self._template = j2.Template(content)
+
+
+class EnvironmentConfig(BaseConfigurationFile):
+    def __init__(self, file_name):
+        super(EnvironmentConfig, self).__init__(file_name)
+        self._lines = []
+        self._regex = re.compile(r'export\s+(\w+)=(.+)')
+        self._tmpl = 'export %s="%s"'
+
+    def parse(self, content):
+        for line in content.splitlines():
+            line = line.strip().decode("utf-8")
+            match = self._regex.match(line)
+            if match:
+                name, value = match.groups()
+                value = value.replace("\"", '')
+                self._lines.append((name, value))
+                self.add_property(name, value)
+            else:
+                self._lines.append(line)
+
+    def render(self):
+        result = []
+        for line in self._lines:
+            if isinstance(line, tuple):
+                name, value = line
+                args = (name, self._config_dict.get(name) or value)
+                result.append(self._tmpl % args)
+                if name in self._config_dict:
+                    del self._config_dict[name]
+            else:
+                result.append(line)
+        extra_ops = [self._tmpl % i for i in six.iteritems(self._config_dict)]
+        return '\n'.join(result + extra_ops) + '\n'
diff --git a/sahara/plugins/mapr/services/mysql/mysql.py b/sahara/plugins/mapr/services/mysql/mysql.py
index 804e137e..6e270518 100644
--- a/sahara/plugins/mapr/services/mysql/mysql.py
+++ b/sahara/plugins/mapr/services/mysql/mysql.py
@@ -21,6 +21,7 @@ import six
 
 import sahara.plugins.mapr.domain.configuration_file as cf
 import sahara.plugins.mapr.domain.service as s
 import sahara.plugins.mapr.services.hive.hive as hive
+from sahara.plugins.mapr.services.spark import spark
 import sahara.plugins.mapr.util.general as g
 import sahara.utils.files as f
@@ -170,7 +171,7 @@ class MySQL(s.Service):
 
     @staticmethod
     def get_db_instance(context):
-        return context.oozie_server
+        return context.oozie_server or context.get_instance(spark.SPARK_MASTER)
 
     @staticmethod
     def create_databases(cluster_context, instances):
diff --git a/sahara/plugins/mapr/services/spark/__init__.py b/sahara/plugins/mapr/services/spark/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/sahara/plugins/mapr/services/spark/resources/spark-default.json b/sahara/plugins/mapr/services/spark/resources/spark-default.json
new file mode 100644
index 00000000..b5ecc813
--- /dev/null
+++ b/sahara/plugins/mapr/services/spark/resources/spark-default.json
@@ -0,0 +1,20 @@
+[
+    {
+        "name": "SPARK_WORKER_CORES",
+        "config_type": "int",
+        "value": 1,
+        "description": "The number of cores to use on this machine."
+    },
+    {
+        "name": "SPARK_WORKER_MEMORY",
+        "config_type": "string",
+        "value": "16g",
+        "description": "How much total memory workers have to give executors (e.g. 1000m, 2g)."
+    },
+    {
+        "name": "SPARK_WORKER_INSTANCES",
+        "config_type": "int",
+        "value": 1,
+        "description": "The number of worker processes per node."
+    }
+]
\ No newline at end of file
diff --git a/sahara/plugins/mapr/services/spark/spark.py b/sahara/plugins/mapr/services/spark/spark.py
new file mode 100755
index 00000000..12a3256d
--- /dev/null
+++ b/sahara/plugins/mapr/services/spark/spark.py
@@ -0,0 +1,175 @@
+# Copyright (c) 2015, MapR Technologies
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import six
+
+import sahara.plugins.mapr.domain.configuration_file as bcf
+import sahara.plugins.mapr.domain.node_process as np
+import sahara.plugins.mapr.domain.service as s
+import sahara.plugins.mapr.util.general as g
+import sahara.plugins.mapr.util.maprfs_helper as mfs
+import sahara.plugins.mapr.util.validation_utils as vu
+
+
+SPARK_MASTER_PORT = 7077
+
+
+class SparkNodeProcess(np.NodeProcess):
+    pass
+
+
+class SparkMaster(np.NodeProcess):
+    _submit_port = SPARK_MASTER_PORT
+
+    def submit_url(self, cluster_context):
+        host = cluster_context.get_instance(self).fqdn()
+        args = {'host': host, 'port': self.submit_port(cluster_context)}
+        return 'spark://%(host)s:%(port)s' % args
+
+    def submit_port(self, cluster_context):
+        return self._submit_port
+
+
+class SparkWorker(SparkNodeProcess):
+    _start_script = 'sbin/start-slave.sh'
+
+    def start(self, cluster_context, instances=None):
+        master_url = SPARK_MASTER.submit_url(cluster_context)
+        args = {
+            'spark_home': Spark().home_dir(cluster_context),
+            'start_slave': self._start_script,
+            'master_url': master_url,
+        }
+        command = g._run_as('mapr', '%(start_slave)s 1 %(master_url)s')
+        command = ('cd %(spark_home)s && ' + command) % args
+        g.execute_command(instances, command)
+
+
+SPARK_MASTER = SparkMaster(
+    name='spark-master',
+    ui_name='Spark Master',
+    package='mapr-spark-master',
+    open_ports=[SPARK_MASTER_PORT],
+)
+SPARK_HISTORY_SERVER = SparkNodeProcess(
+    name='spark-historyserver',
+    ui_name='Spark HistoryServer',
+    package='mapr-spark-historyserver',
+)
+SPARK_SLAVE = SparkWorker(
+    name='spark-slave',
+    ui_name='Spark Slave',
+    package='mapr-spark',
+)
+
+
+@six.add_metaclass(s.Single)
+class Spark(s.Service):
+    def __init__(self):
+        super(Spark, self).__init__()
+        self._name = 'spark'
+        self._ui_name = 'Spark'
+        self._version = '1.2.1'
+        self._node_processes = [
+            SPARK_HISTORY_SERVER,
+            SPARK_MASTER,
+            SPARK_SLAVE,
+        ]
+        self._dependencies = [('mapr-spark', self.version)]
+        self._ui_info = [('SPARK', SPARK_MASTER, 'http://%s:8080')]
+        self._validation_rules = [
+            vu.exactly(1, SPARK_MASTER),
+            vu.exactly(1, SPARK_HISTORY_SERVER),
+            vu.at_least(1, SPARK_SLAVE),
+        ]
+        self._node_defaults = ['spark-default.json']
+
+    def _get_packages(self, node_processes):
+        result = []
+        result += self.dependencies
+        result += [(np.package, self.version)
+                   for np in node_processes
+                   if np != SPARK_HISTORY_SERVER]
+        return g.unique_list(result)
+
+    def get_config_files(self, cluster_context, configs, instance=None):
+        env = bcf.EnvironmentConfig('spark-env.sh')
+        env.remote_path = self.conf_dir(cluster_context)
+        if instance:
+            env.fetch(instance)
+        env.load_properties(configs)
+        env.add_properties(self._get_spark_ha_props(cluster_context))
+        env.add_property('SPARK_WORKER_DIR', '/tmp/spark')
+        return [env]
+
+    def configure(self, cluster_context, instances=None):
+        self._write_slaves_list(cluster_context)
+
+    def update(self, cluster_context, instances=None):
+        if cluster_context.changed_instances(SPARK_SLAVE):
+            self._write_slaves_list(cluster_context)
+
+    def post_install(self, cluster_context, instances):
+        self._install_ssh_keys(cluster_context, instances)
+
+    def post_start(self, cluster_context, instances):
+        self._create_hadoop_spark_dirs(cluster_context)
+        if cluster_context.filter_instances(instances, SPARK_HISTORY_SERVER):
+            self._install_spark_history_server(cluster_context, instances)
+
+    def _install_ssh_keys(self, cluster_context, instances):
+        slaves = cluster_context.filter_instances(instances, SPARK_SLAVE)
+        masters = cluster_context.filter_instances(instances, SPARK_MASTER)
+        instances = g.unique_list(masters + slaves)
+        private_key = cluster_context.cluster.management_private_key
+        public_key = cluster_context.cluster.management_public_key
+        g.execute_on_instances(
+            instances, g.install_ssh_key, 'mapr', private_key, public_key)
+        g.execute_on_instances(instances, g.authorize_key, 'mapr', public_key)
+
+    def _get_spark_ha_props(self, cluster_context):
+        zookeepers = cluster_context.get_zookeeper_nodes_ip_with_port()
+        login_conf = '%s/conf/mapr.login.conf' % cluster_context.mapr_home
+        props = {
+            'spark.deploy.recoveryMode': 'ZOOKEEPER',
+            'spark.deploy.zookeeper.url': zookeepers,
+            'zookeeper.sasl.client': 'false',
+            'java.security.auth.login.config': login_conf,
+        }
+        props = ' '.join(map(lambda i: '-D%s=%s' % i, six.iteritems(props)))
+        return {'SPARK_DAEMON_JAVA_OPTS': props}
+
+    def _write_slaves_list(self, cluster_context):
+        path = '%s/slaves' % self.conf_dir(cluster_context)
+        data = self._generate_slaves_file(cluster_context)
+        master = cluster_context.get_instance(SPARK_MASTER)
+        g.write_file(master, path, data, owner='root')
+
+    def _generate_slaves_file(self, cluster_context):
+        slaves = cluster_context.get_instances(SPARK_SLAVE)
+        return '\n'.join(map(lambda i: i.fqdn(), slaves))
+
+    def _create_hadoop_spark_dirs(self, cluster_context):
+        path = '/apps/spark'
+        run_as_user = 'mapr'
+        with cluster_context.get_instance(SPARK_MASTER).remote() as r:
+            mfs.mkdir(r, path, run_as=run_as_user)
+            mfs.chmod(r, path, 777, run_as=run_as_user)
+
+    def _install_spark_history_server(self, cluster_context, instances):
+        h_servers = cluster_context.filter_instances(
+            instances, SPARK_HISTORY_SERVER)
+        package = [(SPARK_HISTORY_SERVER.package, self.version)]
+        command = cluster_context.distro.create_install_cmd(package)
+        g.execute_command(h_servers, command, run_as='root')
diff --git a/sahara/plugins/mapr/util/general.py b/sahara/plugins/mapr/util/general.py
index 09997eb4..caf9499d 100644
--- a/sahara/plugins/mapr/util/general.py
+++ b/sahara/plugins/mapr/util/general.py
@@ -15,6 +15,7 @@
 
 import uuid
 
+from sahara.conductor import objects
 from sahara import context
 import sahara.utils.files as files
 
@@ -65,12 +66,6 @@ def unpack_archive(instance, src, dest, cleanup=False, run_as=None):
             r.execute_command(_run_as(run_as, 'rm -r %s' % src))
 
 
-def is_directory(instance, path):
-    with instance.remote() as r:
-        ec, out = r.execute_command('[ -d %s ]' % path, raise_when_error=False)
-        return not ec
-
-
 def copy_file(s_path, s_instance, d_path, d_instance, run_as=None):
     with s_instance.remote() as sr:
         data = sr.read_file_from(s_path, run_as_root=(run_as == 'root'))
@@ -109,3 +104,81 @@ def execute_on_instances(instances, function, *args, **kwargs):
         for instance in instances:
             t_name = '%s-execution' % function.__name__
             tg.spawn(t_name, function, instance, *args, **kwargs)
+
+
+def _replace(args, position, value):
+    return args[:position] + (value,) + args[position + 1:]
+
+
+def remote_command(position):
+    def wrap(func):
+        def wrapped(*args, **kwargs):
+            target = args[position]
+            if isinstance(target, objects.Instance):
+                with target.remote() as remote:
+                    return func(*_replace(args, position, remote), **kwargs)
+            return func(*args, **kwargs)
+
+        return wrapped
+
+    return wrap
+
+
+def execute_command(instances, command, run_as=None):
+    def _execute_command(instance):
+        with instance.remote() as remote:
+            remote.execute_command(_run_as(run_as, command), timeout=1800)
+
+    execute_on_instances(instances, _execute_command)
+
+
+@remote_command(0)
+def is_directory(remote, path):
+    command = '[ -d %s ]' % path
+    ec = remote.execute_command(command, True, raise_when_error=False)[0]
+    return not ec
+
+
+@remote_command(0)
+def chown(remote, owner, path):
+    args = {'owner': owner, 'path': path}
+    remote.execute_command('chown -R %(owner)s %(path)s' % args, True)
+
+
+@remote_command(0)
+def chmod(remote, mode, path):
+    args = {'mode': mode, 'path': path}
+    remote.execute_command('chmod -R %(mode)s %(path)s' % args, True)
+
+
+@remote_command(0)
+def mkdir(remote, path, mode=None, owner=''):
+    args = {'mode': '-m %s' % mode if mode else '', 'path': path}
+    remote.execute_command('mkdir -p %(mode)s %(path)s' % args, bool(owner))
+    if owner:
+        chown(remote, owner, path)
+
+
+@remote_command(0)
+def write_file(remote, path, data, mode=None, owner=''):
+    remote.write_file_to(path, data, run_as_root=bool(owner))
+    if mode:
+        chmod(remote, mode, path)
+    if owner:
+        chown(remote, owner, path)
+
+
+@remote_command(0)
+def install_ssh_key(remote, user, private_key, public_key):
+    ssh_dir = '/home/%s/.ssh' % user
+    owner = '%s:%s' % (user, user)
+    if not is_directory(remote, ssh_dir):
+        mkdir(remote, ssh_dir, 700, owner)
+    write_file(remote, '%s/id_rsa.pub' % ssh_dir, public_key, 644, owner)
+    write_file(remote, '%s/id_rsa' % ssh_dir, private_key, 600, owner)
+
+
+@remote_command(0)
+def authorize_key(remote, user, public_key):
+    authorized_keys = '/home/%s/.ssh/authorized_keys' % user
+    remote.append_to_file(authorized_keys, public_key, run_as_root=True)
diff --git a/sahara/plugins/mapr/util/maprfs_helper.py b/sahara/plugins/mapr/util/maprfs_helper.py
index 00949f16..f03e47c7 100644
--- a/sahara/plugins/mapr/util/maprfs_helper.py
+++ b/sahara/plugins/mapr/util/maprfs_helper.py
@@ -18,6 +18,9 @@ import uuid
 
 import six
 
+import sahara.plugins.mapr.util.general as g
+
+
 MV_TO_MAPRFS_CMD = ('sudo -u %(user)s'
                     ' hadoop fs -copyFromLocal %(source)s %(target)s'
                     ' && sudo rm -f %(source)s')
@@ -45,3 +48,15 @@ def create_maprfs4_dir(remote, dir_name, hdfs_user):
 def create_maprfs3_dir(remote, dir_name, hdfs_user):
     remote.execute_command(MKDIR_CMD_MAPR3 % {'user': hdfs_user,
                                               'path': dir_name})
+
+
+def mkdir(remote, path, recursive=True, run_as=None):
+    command = 'hadoop fs -mkdir %(recursive)s %(path)s'
+    args = {'recursive': '-p' if recursive else '', 'path': path}
+    remote.execute_command(g._run_as(run_as, command % args))
+
+
+def chmod(remote, path, mode, recursive=True, run_as=None):
+    command = 'hadoop fs -chmod %(recursive)s %(mode)s %(path)s'
+    args = {'recursive': '-R' if recursive else '', 'path': path, 'mode': mode}
+    remote.execute_command(g._run_as(run_as, command % args))
diff --git a/sahara/plugins/mapr/versions/mapr_spark/__init__.py b/sahara/plugins/mapr/versions/mapr_spark/__init__.py
new file mode 100755
index 00000000..e69de29b
diff --git a/sahara/plugins/mapr/versions/mapr_spark/context.py b/sahara/plugins/mapr/versions/mapr_spark/context.py
new file mode 100644
index 00000000..a593e7f5
--- /dev/null
+++ b/sahara/plugins/mapr/versions/mapr_spark/context.py
@@ -0,0 +1,49 @@
+# Copyright (c) 2015, MapR Technologies
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+
+from sahara.plugins.mapr.base import base_cluster_context as bc
+from sahara.plugins.mapr.services.yarn import yarn
+
+
+class Context(bc.BaseClusterContext):
+    def __init__(self, cluster, version_handler, added=None, removed=None):
+        super(Context, self).__init__(cluster, version_handler, added, removed)
+        self._hadoop_version = yarn.YARNv241().version
+        self._hadoop_lib = None
+        self._hadoop_conf = None
+        self._resource_manager_uri = None
+        self._cluster_mode = None
+        self._node_aware = True
+        self._mapr_version = '4.0.1'
+        self._ubuntu_ecosystem_repo = (
+            'http://package.mapr.com/releases/ecosystem-4.x/ubuntu binary/')
+        self._centos_ecosystem_repo = (
+            'http://package.mapr.com/releases/ecosystem-4.x/redhat')
+
+    @property
+    def hadoop_lib(self):
+        if not self._hadoop_lib:
+            self._hadoop_lib = '%s/share/hadoop/common' % self.hadoop_home
+        return self._hadoop_lib
+
+    @property
+    def hadoop_conf(self):
+        if not self._hadoop_conf:
+            self._hadoop_conf = '%s/etc/hadoop' % self.hadoop_home
+        return self._hadoop_conf
+
+    @property
+    def resource_manager_uri(self):
+        return self._resource_manager_uri
diff --git a/sahara/plugins/mapr/versions/mapr_spark/spark_engine.py b/sahara/plugins/mapr/versions/mapr_spark/spark_engine.py
new file mode 100755
index 00000000..05d0d9c5
--- /dev/null
+++ b/sahara/plugins/mapr/versions/mapr_spark/spark_engine.py
@@ -0,0 +1,154 @@
+# Copyright (c) 2015, MapR Technologies
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+
+import os
+
+from sahara import conductor as c
+from sahara import context
+from sahara import exceptions as e
+from sahara.i18n import _
+import sahara.plugins.mapr.services.spark.spark as spark
+import sahara.plugins.mapr.util.general as g
+import sahara.plugins.mapr.versions.version_handler_factory as vhf
+import sahara.plugins.utils as plugin_utils
+from sahara.service.edp import job_utils
+from sahara.service.edp.spark import engine as base_engine
+from sahara.swift import utils as su
+from sahara.utils import edp
+
+conductor = c.API
+
+
+class MapRSparkEngine(base_engine.SparkJobEngine):
+    def run_job(self, job_execution):
+        ctx = context.ctx()
+        job = conductor.job_get(ctx, job_execution.job_id)
+
+        additional_sources, updated_job_configs = (
+            job_utils.resolve_data_source_references(job_execution.job_configs)
+        )
+
+        # We'll always run the driver program on the master
+        master = plugin_utils.get_instance(
+            self.cluster, spark.SPARK_MASTER.ui_name)
+
+        # TODO(tmckay): wf_dir should probably be configurable.
+        # The only requirement is that the dir is writable by the image user
+        wf_dir = job_utils.create_workflow_dir(
+            master, '/tmp/spark-edp', job, job_execution.id, "700")
+        paths, builtin_paths = self._upload_job_files(
+            master, wf_dir, job, updated_job_configs)
+
+        # We can shorten the paths in this case since we'll run out of wf_dir
+        paths = [os.path.basename(p) for p in paths]
+        builtin_paths = [os.path.basename(p) for p in builtin_paths]
+
+        # TODO(tmckay): for now, paths[0] is always assumed to be the app
+        # jar and we generate paths in order (mains, then libs).
+        # When we have a Spark job type, we can require a "main" and set
+        # the app jar explicitly to be "main"
+        app_jar = paths.pop(0)
+        job_class = updated_job_configs["configs"]["edp.java.main_class"]
+
+        # If we uploaded builtins then we are using a wrapper jar. It will
+        # be the first one on the builtin list and the original app_jar needs
+        # to be added to the 'additional' jars
+        if builtin_paths:
+            wrapper_jar = builtin_paths.pop(0)
+            wrapper_class = 'org.openstack.sahara.edp.SparkWrapper'
+            wrapper_xml = self._upload_wrapper_xml(
+                master, wf_dir, updated_job_configs)
+            wrapper_args = "%s %s" % (wrapper_xml, job_class)
+            additional_jars = ",".join([app_jar] + paths + builtin_paths)
+        else:
+            wrapper_jar = wrapper_class = wrapper_args = ""
+            additional_jars = ",".join(paths)
+
+        # All additional jars are passed with the --jars option
+        if additional_jars:
+            additional_jars = " --jars " + additional_jars
+
+        # Launch the spark job using spark-submit and deploy_mode = client
+        cluster_context = self._get_cluster_context(self.cluster)
+        spark_home_dir = spark.Spark().home_dir(cluster_context)
+
+        # TODO(tmckay): we need to clean up wf_dirs on long running clusters
+        # TODO(tmckay): probably allow for general options to spark-submit
+        args = updated_job_configs.get('args', [])
+        args = " ".join([su.inject_swift_url_suffix(arg) for arg in args])
+
+        submit_args = {
+            "spark_submit": "%s/bin/spark-submit" % spark_home_dir,
+            "addnl_jars": additional_jars,
+            "master_url": spark.SPARK_MASTER.submit_url(cluster_context),
+            "args": args
+        }
+        if wrapper_jar and wrapper_class:
+            # Substrings which may be empty have spaces
+            # embedded if they are non-empty
+            submit_args.update({
+                "driver_cp": self.get_driver_classpath(),
+                "wrapper_class": wrapper_class,
+                "wrapper_jar": wrapper_jar,
+                "wrapper_args": wrapper_args,
+            })
+            submit_cmd = ('%(spark_submit)s%(driver_cp)s'
+                          ' --class %(wrapper_class)s%(addnl_jars)s'
+                          ' --master %(master_url)s'
+                          ' %(wrapper_jar)s %(wrapper_args)s %(args)s')
+        else:
+            submit_args.update({
+                "job_class": job_class,
+                "app_jar": app_jar,
+            })
+            submit_cmd = ('%(spark_submit)s --class %(job_class)s'
+                          '%(addnl_jars)s --master %(master_url)s'
+                          ' %(app_jar)s %(args)s')
+        submit_cmd = g._run_as('mapr', submit_cmd % submit_args)
+
+        job_execution = conductor.job_execution_get(ctx, job_execution.id)
+        if job_execution.info['status'] == edp.JOB_STATUS_TOBEKILLED:
+            return (None, edp.JOB_STATUS_KILLED, None)
+
+        # If an exception is raised here, the job_manager will mark
+        # the job failed and log the exception
+        # The redirects of stdout and stderr will preserve output in the wf_dir
+        with master.remote() as r:
+            # Upload the command launch script
+            launch = os.path.join(wf_dir, "launch_command")
+            r.write_file_to(launch, self._job_script())
+            r.execute_command("chmod +x %s" % launch)
+            ret, stdout = r.execute_command(
+                "cd %s && ./launch_command %s > /dev/null 2>&1 & echo $!"
+                % (wf_dir, submit_cmd), raise_when_error=False)
+
+        if ret == 0:
+            # Success, we'll add the wf_dir in job_execution.extra and store
+            # pid@instance_id as the job id
+            # We know the job is running so return "RUNNING"
+            return (stdout.strip() + "@" + master.id,
+                    edp.JOB_STATUS_RUNNING,
+                    {'spark-path': wf_dir})
+
+        # Hmm, no exception but something failed.
+        # Since we're using backgrounding with redirect, this is unlikely.
+        raise e.EDPError(_("Spark job execution failed. Exit status = "
+                           "%(status)s, stdout = %(stdout)s") %
+                         {'status': ret, 'stdout': stdout})
+
+    def _get_cluster_context(self, cluster):
+        version = cluster.hadoop_version
+        handler = vhf.VersionHandlerFactory.get().get_handler(version)
+        return handler.get_context(cluster)
diff --git a/sahara/plugins/mapr/versions/mapr_spark/spark_node_manager.py b/sahara/plugins/mapr/versions/mapr_spark/spark_node_manager.py
new file mode 100644
index 00000000..ed356e0e
--- /dev/null
+++ b/sahara/plugins/mapr/versions/mapr_spark/spark_node_manager.py
@@ -0,0 +1,27 @@
+# Copyright (c) 2015, MapR Technologies
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+
+from sahara.plugins.mapr.base import base_node_manager
+from sahara.plugins.mapr.services.spark import spark
+
+
+class SparkNodeManager(base_node_manager.BaseNodeManager):
+    def start(self, cluster_context, instances=None):
+        super(SparkNodeManager, self).start(cluster_context, instances)
+
+        instances = instances or cluster_context.added_instances()
+        slaves = cluster_context.filter_instances(instances, spark.SPARK_SLAVE)
+        if slaves:
+            spark.SPARK_SLAVE.start(cluster_context, slaves)
diff --git a/sahara/plugins/mapr/versions/mapr_spark/version_handler.py b/sahara/plugins/mapr/versions/mapr_spark/version_handler.py
new file mode 100755
index 00000000..37a36c37
--- /dev/null
+++ b/sahara/plugins/mapr/versions/mapr_spark/version_handler.py
@@ -0,0 +1,50 @@
+# Copyright (c) 2015, MapR Technologies
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+
+from sahara.plugins.mapr.base import base_version_handler as bvh
+from sahara.plugins.mapr.services.management import management
+from sahara.plugins.mapr.services.maprfs import maprfs
+from sahara.plugins.mapr.services.spark import spark
+from sahara.plugins.mapr.versions.mapr_spark import context as c
+from sahara.plugins.mapr.versions.mapr_spark import spark_engine as edp_engine
+from sahara.plugins.mapr.versions.mapr_spark import spark_node_manager
+
+
+version = 'spark'
+
+
+class VersionHandler(bvh.BaseVersionHandler):
+    def __init__(self):
+        super(VersionHandler, self).__init__()
+        self._node_manager = spark_node_manager.SparkNodeManager()
+        self._version = version
+        self._required_services = [
+            management.Management(),
+            maprfs.MapRFS(),
+            spark.Spark(),
+        ]
+        self._services = [
+            management.Management(),
+            maprfs.MapRFS(),
+            spark.Spark(),
+        ]
+
+    def get_context(self, cluster, added=None, removed=None):
+        return c.Context(cluster, self, added, removed)
+
+    def get_edp_engine(self, cluster, job_type):
+        if job_type in edp_engine.MapRSparkEngine.get_supported_job_types():
+            return edp_engine.MapRSparkEngine(cluster)
+        return None
diff --git a/sahara/tests/unit/plugins/mapr/test_config_files.py b/sahara/tests/unit/plugins/mapr/test_config_files.py
index 840189ce..f158bd46 100644
--- a/sahara/tests/unit/plugins/mapr/test_config_files.py
+++ b/sahara/tests/unit/plugins/mapr/test_config_files.py
@@ -12,6 +12,7 @@
 # License for the specific language governing permissions and limitations
 # under the License.
+
 
 import sahara.plugins.mapr.domain.configuration_file as conf_f
 import sahara.tests.unit.base as b
 
@@ -182,3 +183,59 @@ key2=value2'''
         foo.add_property('key2', 'value2')
         expected = {'key1': 'value1', 'key2': 'value2'}
         self.assertDictEqual(expected, foo._config_dict)
+
+
+class TestEnvironmentConfig(b.SaharaTestCase):
+    def __init__(self, *args, **kwds):
+        super(TestEnvironmentConfig, self).__init__(*args, **kwds)
+        self.content = '''
+non export line
+export key1=value1
+export key2=value2
+export key
+'''
+
+    def test_remote_path(self):
+        foo = conf_f.EnvironmentConfig('foo')
+        foo.remote_path = '/bar'
+        self.assertEqual('/bar/foo', foo.remote_path)
+
+    def test_parse(self):
+        foo = conf_f.EnvironmentConfig('foo')
+        foo.parse(self.content)
+        expected = {'key1': 'value1', 'key2': 'value2'}
+        self.assertDictEqual(expected, foo._config_dict)
+
+    def test_render(self):
+        foo = conf_f.EnvironmentConfig('foo')
+        expected = {'key1': 'value1', 'key2': 'value2'}
+        foo._config_dict = expected
+        actual = foo.render()
+        bar = conf_f.EnvironmentConfig('bar')
+        bar.parse(actual)
+        self.assertDictEqual(expected, bar._config_dict)
+
+    def test_render_extra_properties(self):
+        foo = conf_f.EnvironmentConfig('foo')
+        foo.parse(self.content)
+        foo.add_property('key3', 'value3')
+        foo_content = foo.render()
+        bar = conf_f.EnvironmentConfig('bar')
+        bar.parse(foo_content)
+        expected = {'key1': 'value1', 'key2': 'value2', 'key3': 'value3'}
+        self.assertDictEqual(expected, bar._config_dict)
+
+    def test_add_property(self):
+        foo = conf_f.EnvironmentConfig('foo')
+        self.assertDictEqual({}, foo._config_dict)
+        foo.add_property('key1', 'value1')
+        self.assertDictEqual({'key1': 'value1'}, foo._config_dict)
+        foo.add_property('key2', 'value2')
+        expected = {'key1': 'value1', 'key2': 'value2'}
+        self.assertDictEqual(expected, foo._config_dict)
+
+    def test_get_config_value(self):
+        foo = conf_f.EnvironmentConfig('foo')
+        foo._config_dict = {'foo': 'bar'}
+        self.assertEqual('bar', foo._get_config_value('foo'))
+        self.assertIsNone(foo._get_config_value('bar'))
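
Usage sketch (not part of the patch): a minimal illustration of how the new EnvironmentConfig is meant to round-trip spark-env.sh content, mirroring the TestEnvironmentConfig cases above. It assumes Python 2 and a sahara tree with this change applied; the spark-env.sh content and the /opt/mapr/... conf path below are made-up example values, not anything taken from the change itself.

import sahara.plugins.mapr.domain.configuration_file as conf_f

# Hypothetical spark-env.sh content; surrounding quotes are stripped on parse().
content = ('# managed by the MapR plugin\n'
           'export SPARK_WORKER_CORES=1\n'
           'export SPARK_WORKER_MEMORY="16g"\n')

env = conf_f.EnvironmentConfig('spark-env.sh')
env.remote_path = '/opt/mapr/spark/spark-1.2.1/conf'  # joined with the file name
env.parse(content)                             # export lines become properties,
                                               # other lines are kept verbatim
env.add_property('SPARK_WORKER_INSTANCES', 2)  # extra keys are appended on render()

print(env.render())
# # managed by the MapR plugin
# export SPARK_WORKER_CORES="1"
# export SPARK_WORKER_MEMORY="16g"
# export SPARK_WORKER_INSTANCES="2"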