Add Spark support for MapR plugin
Change-Id: Ic15af84f4d20f5a8e168f723ff2678a8f5f5c448
Implements: blueprint mapr-spark

parent e7bfc1de3a
commit b542be80ea
@@ -15,6 +15,7 @@
import abc
import os
import re

import jinja2 as j2
import six
@@ -130,3 +131,37 @@ class TemplateFile(BaseConfigurationFile):

    def parse(self, content):
        self._template = j2.Template(content)


class EnvironmentConfig(BaseConfigurationFile):
    def __init__(self, file_name):
        super(EnvironmentConfig, self).__init__(file_name)
        self._lines = []
        self._regex = re.compile(r'export\s+(\w+)=(.+)')
        self._tmpl = 'export %s="%s"'

    def parse(self, content):
        for line in content.splitlines():
            line = line.strip().decode("utf-8")
            match = self._regex.match(line)
            if match:
                name, value = match.groups()
                value = value.replace("\"", '')
                self._lines.append((name, value))
                self.add_property(name, value)
            else:
                self._lines.append(line)

    def render(self):
        result = []
        for line in self._lines:
            if isinstance(line, tuple):
                name, value = line
                args = (name, self._config_dict.get(name) or value)
                result.append(self._tmpl % args)
                if name in self._config_dict:
                    del self._config_dict[name]
            else:
                result.append(line)
        extra_ops = [self._tmpl % i for i in six.iteritems(self._config_dict)]
        return '\n'.join(result + extra_ops) + '\n'
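For reference, a minimal usage sketch of the new EnvironmentConfig class, mirroring the unit tests added at the bottom of this change. The file name and the variables below are only illustrative, and the snippet assumes Python 2 (this code uses str.decode and six):

    import sahara.plugins.mapr.domain.configuration_file as cf

    env = cf.EnvironmentConfig('spark-env.sh')
    env.parse('export SPARK_WORKER_CORES=1\n# comment line\n')
    env.add_property('SPARK_WORKER_MEMORY', '16g')
    # render() keeps non-export lines as-is and appends any properties that
    # were not present in the parsed content:
    print(env.render())
    # export SPARK_WORKER_CORES="1"
    # # comment line
    # export SPARK_WORKER_MEMORY="16g"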
@@ -21,6 +21,7 @@ import six
import sahara.plugins.mapr.domain.configuration_file as cf
import sahara.plugins.mapr.domain.service as s
import sahara.plugins.mapr.services.hive.hive as hive
from sahara.plugins.mapr.services.spark import spark
import sahara.plugins.mapr.util.general as g
import sahara.utils.files as f

@@ -170,7 +171,7 @@ class MySQL(s.Service):

    @staticmethod
    def get_db_instance(context):
-        return context.oozie_server
+        return context.oozie_server or context.get_instance(spark.SPARK_MASTER)

    @staticmethod
    def create_databases(cluster_context, instances):
sahara/plugins/mapr/services/spark/__init__.py (new file, 0 lines)
@@ -0,0 +1,20 @@
[
    {
        "name": "SPARK_WORKER_CORES",
        "config_type": "int",
        "value": 1,
        "description": "The number of cores to use on this machine."
    },
    {
        "name": "SPARK_WORKER_MEMORY",
        "config_type": "string",
        "value": "16g",
        "description": "How much total memory workers have to give executors (e.g. 1000m, 2g)."
    },
    {
        "name": "SPARK_WORKER_INSTANCES",
        "config_type": "int",
        "value": 1,
        "description": "The number of worker processes per node."
    }
]
sahara/plugins/mapr/services/spark/spark.py (new executable file, 175 lines)
@@ -0,0 +1,175 @@
# Copyright (c) 2015, MapR Technologies
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import six

import sahara.plugins.mapr.domain.configuration_file as bcf
import sahara.plugins.mapr.domain.node_process as np
import sahara.plugins.mapr.domain.service as s
import sahara.plugins.mapr.util.general as g
import sahara.plugins.mapr.util.maprfs_helper as mfs
import sahara.plugins.mapr.util.validation_utils as vu


SPARK_MASTER_PORT = 7077


class SparkNodeProcess(np.NodeProcess):
    pass


class SparkMaster(np.NodeProcess):
    _submit_port = SPARK_MASTER_PORT

    def submit_url(self, cluster_context):
        host = cluster_context.get_instance(self).fqdn()
        args = {'host': host, 'port': self.submit_port(cluster_context)}
        return 'spark://%(host)s:%(port)s' % args

    def submit_port(self, cluster_context):
        return self._submit_port


class SparkWorker(SparkNodeProcess):
    _start_script = 'sbin/start-slave.sh'

    def start(self, cluster_context, instances=None):
        master_url = SPARK_MASTER.submit_url(cluster_context)
        args = {
            'spark_home': Spark().home_dir(cluster_context),
            'start_slave': self._start_script,
            'master_url': master_url,
        }
        command = g._run_as('mapr', '%(start_slave)s 1 %(master_url)s')
        command = ('cd %(spark_home)s && ' + command) % args
        g.execute_command(instances, command)


SPARK_MASTER = SparkMaster(
    name='spark-master',
    ui_name='Spark Master',
    package='mapr-spark-master',
    open_ports=[SPARK_MASTER_PORT],
)
SPARK_HISTORY_SERVER = SparkNodeProcess(
    name='spark-historyserver',
    ui_name='Spark HistoryServer',
    package='mapr-spark-historyserver',
)
SPARK_SLAVE = SparkWorker(
    name='spark-master',
    ui_name='Spark Slave',
    package='mapr-spark',
)


@six.add_metaclass(s.Single)
class Spark(s.Service):
    def __init__(self):
        super(Spark, self).__init__()
        self._name = 'spark'
        self._ui_name = 'Spark'
        self._version = '1.2.1'
        self._node_processes = [
            SPARK_HISTORY_SERVER,
            SPARK_MASTER,
            SPARK_SLAVE,
        ]
        self._dependencies = [('mapr-spark', self.version)]
        self._ui_info = [('SPARK', SPARK_MASTER, 'http://%s:8080')]
        self._validation_rules = [
            vu.exactly(1, SPARK_MASTER),
            vu.exactly(1, SPARK_HISTORY_SERVER),
            vu.at_least(1, SPARK_SLAVE),
        ]
        self._node_defaults = ['spark-default.json']

    def _get_packages(self, node_processes):
        result = []
        result += self.dependencies
        result += [(np.package, self.version)
                   for np in node_processes
                   if np != SPARK_HISTORY_SERVER]
        return g.unique_list(result)

    def get_config_files(self, cluster_context, configs, instance=None):
        env = bcf.EnvironmentConfig('spark-env.sh')
        env.remote_path = self.conf_dir(cluster_context)
        if instance:
            env.fetch(instance)
        env.load_properties(configs)
        env.add_properties(self._get_spark_ha_props(cluster_context))
        env.add_property('SPARK_WORKER_DIR', '/tmp/spark')
        return [env]

    def configure(self, cluster_context, instances=None):
        self._write_slaves_list(cluster_context)

    def update(self, cluster_context, instances=None):
        if cluster_context.changed_instances(SPARK_SLAVE):
            self._write_slaves_list(cluster_context)

    def post_install(self, cluster_context, instances):
        self._install_ssh_keys(cluster_context, instances)

    def post_start(self, cluster_context, instances):
        self._create_hadoop_spark_dirs(cluster_context)
        if cluster_context.filter_instances(instances, SPARK_HISTORY_SERVER):
            self._install_spark_history_server(cluster_context, instances)

    def _install_ssh_keys(self, cluster_context, instances):
        slaves = cluster_context.filter_instances(instances, SPARK_SLAVE)
        masters = cluster_context.filter_instances(instances, SPARK_MASTER)
        instances = g.unique_list(masters + slaves)
        private_key = cluster_context.cluster.management_private_key
        public_key = cluster_context.cluster.management_public_key
        g.execute_on_instances(
            instances, g.install_ssh_key, 'mapr', private_key, public_key)
        g.execute_on_instances(instances, g.authorize_key, 'mapr', public_key)

    def _get_spark_ha_props(self, cluster_context):
        zookeepers = cluster_context.get_zookeeper_nodes_ip_with_port()
        login_conf = '%s/conf/mapr.login.conf' % cluster_context.mapr_home
        props = {
            'spark.deploy.recoveryMode': 'ZOOKEEPER',
            'spark.deploy.zookeeper.url': zookeepers,
            'zookeeper.sasl.client': 'false',
            'java.security.auth.login.config': login_conf,
        }
        props = ' '.join(map(lambda i: '-D%s=%s' % i, six.iteritems(props)))
        return {'SPARK_DAEMON_JAVA_OPTS': props}

    def _write_slaves_list(self, cluster_context):
        path = '%s/slaves' % self.conf_dir(cluster_context)
        data = self._generate_slaves_file(cluster_context)
        master = cluster_context.get_instance(SPARK_MASTER)
        g.write_file(master, path, data, owner='root')

    def _generate_slaves_file(self, cluster_context):
        slaves = cluster_context.get_instances(SPARK_SLAVE)
        return '\n'.join(map(lambda i: i.fqdn(), slaves))

    def _create_hadoop_spark_dirs(self, cluster_context):
        path = '/apps/spark'
        run_as_user = 'mapr'
        with cluster_context.get_instance(SPARK_MASTER).remote() as r:
            mfs.mkdir(r, path, run_as=run_as_user)
            mfs.chmod(r, path, 777, run_as=run_as_user)

    def _install_spark_history_server(self, cluster_context, instances):
        h_servers = cluster_context.filter_instances(
            instances, SPARK_HISTORY_SERVER)
        package = [(SPARK_HISTORY_SERVER.package, self.version)]
        command = cluster_context.distro.create_install_cmd(package)
        g.execute_command(h_servers, command, run_as='root')
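To illustrate what _get_spark_ha_props contributes to the generated spark-env.sh, here is a standalone sketch of the same join logic with hypothetical ZooKeeper endpoints and MapR home (the real values come from the cluster context):

    import six

    # Hypothetical values; the engine reads these from the cluster context.
    zookeepers = 'zk1.example.com:5181,zk2.example.com:5181'
    login_conf = '/opt/mapr/conf/mapr.login.conf'
    props = {
        'spark.deploy.recoveryMode': 'ZOOKEEPER',
        'spark.deploy.zookeeper.url': zookeepers,
        'zookeeper.sasl.client': 'false',
        'java.security.auth.login.config': login_conf,
    }
    opts = ' '.join(map(lambda i: '-D%s=%s' % i, six.iteritems(props)))
    # get_config_files() stores this under SPARK_DAEMON_JAVA_OPTS, so the
    # rendered spark-env.sh contains a line like (property order may vary):
    # export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER ..."
    print(opts)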
@@ -15,6 +15,7 @@
import uuid

from sahara.conductor import objects
from sahara import context
import sahara.utils.files as files
@@ -65,12 +66,6 @@ def unpack_archive(instance, src, dest, cleanup=False, run_as=None):
        r.execute_command(_run_as(run_as, 'rm -r %s' % src))


def is_directory(instance, path):
    with instance.remote() as r:
        ec, out = r.execute_command('[ -d %s ]' % path, raise_when_error=False)
        return not ec


def copy_file(s_path, s_instance, d_path, d_instance, run_as=None):
    with s_instance.remote() as sr:
        data = sr.read_file_from(s_path, run_as_root=(run_as == 'root'))
@@ -109,3 +104,81 @@ def execute_on_instances(instances, function, *args, **kwargs):
        for instance in instances:
            t_name = '%s-execution' % function.__name__
            tg.spawn(t_name, function, instance, *args, **kwargs)


def _replace(args, position, value):
    return args[:position] + (value,) + args[position + 1:]


def remote_command(position):
    def wrap(func):
        def wrapped(*args, **kwargs):
            target = args[position]
            if isinstance(target, objects.Instance):
                with target.remote() as remote:
                    return func(*_replace(args, position, remote), **kwargs)
            return func(*args, **kwargs)

        return wrapped

    return wrap


def execute_command(instances, command, run_as=None):
    def _execute_command(instance):
        with instance.remote() as remote:
            remote.execute_command(_run_as(run_as, command), timeout=1800)

    execute_on_instances(instances, _execute_command)


@remote_command(0)
def is_directory(remote, path):
    command = '[ -d %s ]' % path
    ec = remote.execute_command(command, True, raise_when_error=False)[0]
    return not ec


@remote_command(0)
def chown(remote, owner, path):
    args = {'owner': owner, 'path': path}
    remote.execute_command('chown -R %(owner)s %(path)s' % args, True)


@remote_command(0)
def chmod(remote, mode, path):
    args = {'mode': mode, 'path': path}
    remote.execute_command('chmod -R %(mode)s %(path)s' % args, True)


@remote_command(0)
def mkdir(remote, path, mode=None, owner=''):
    args = {'mode': '-m %s' % mode if mode else '', 'path': path}
    remote.execute_command('mkdir -p %(mode)s %(path)s' % args, bool(owner))
    if owner:
        chown(remote, owner, path)


@remote_command(0)
def write_file(remote, path, data, mode=None, owner=''):
    remote.write_file_to(path, data, run_as_root=bool(owner))
    if mode:
        chmod(remote, mode, path)
    if owner:
        chown(remote, owner, path)


@remote_command(0)
def install_ssh_key(remote, user, private_key, public_key):
    ssh_dir = '/home/%s/.ssh' % user
    owner = '%s:%s' % (user, user)
    if not is_directory(remote, ssh_dir):
        mkdir(remote, ssh_dir, 700, owner)
    write_file(remote, '%s/id_rsa.pub' % ssh_dir, public_key, 644, owner)
    write_file(remote, '%s/id_rsa' % ssh_dir, private_key, 600, owner)


@remote_command(0)
def authorize_key(remote, user, public_key):
    authorized_keys = '/home/%s/.ssh/authorized_keys' % user
    remote.append_to_file(authorized_keys, public_key, run_as_root=True)
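The remote_command decorator lets these helpers accept either a conductor Instance (a remote is opened on the fly and substituted at the given argument position) or an already-open remote. A minimal sketch of that dispatch, using a hypothetical stub in place of a real instance remote:

    import sahara.plugins.mapr.util.general as g

    # Hypothetical stand-in for an instance remote; the real object runs
    # shell commands over SSH on a cluster node.
    class FakeRemote(object):
        def execute_command(self, cmd, run_as_root=False,
                            raise_when_error=True):
            print('run_as_root=%s: %s' % (run_as_root, cmd))
            return 0, ''

    remote = FakeRemote()
    # Called with an open remote, the decorator passes it straight through.
    g.chmod(remote, 755, '/opt/mapr/spark')          # path is illustrative
    # Called with an objects.Instance instead, remote_command(0) opens
    # instance.remote() itself and substitutes it for argument 0, which is
    # how spark.py can pass instances to install_ssh_key/authorize_key.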
@@ -18,6 +18,9 @@ import uuid

import six

import sahara.plugins.mapr.util.general as g


MV_TO_MAPRFS_CMD = ('sudo -u %(user)s'
                    ' hadoop fs -copyFromLocal %(source)s %(target)s'
                    ' && sudo rm -f %(source)s')

@@ -45,3 +48,15 @@ def create_maprfs4_dir(remote, dir_name, hdfs_user):
def create_maprfs3_dir(remote, dir_name, hdfs_user):
    remote.execute_command(MKDIR_CMD_MAPR3 % {'user': hdfs_user,
                                              'path': dir_name})


def mkdir(remote, path, recursive=True, run_as=None):
    command = 'hadoop fs -mkdir %(recursive)s %(path)s'
    args = {'recursive': '-p' if recursive else '', 'path': path}
    remote.execute_command(g._run_as(run_as, command % args))


def chmod(remote, path, mode, recursive=True, run_as=None):
    command = 'hadoop fs -chmod %(recursive)s %(mode)s %(path)s'
    args = {'recursive': '-R' if recursive else '', 'path': path, 'mode': mode}
    remote.execute_command(g._run_as(run_as, command % args))
sahara/plugins/mapr/versions/mapr_spark/__init__.py (new executable file, 0 lines)
sahara/plugins/mapr/versions/mapr_spark/context.py (new file, 49 lines)
@@ -0,0 +1,49 @@
# Copyright (c) 2015, MapR Technologies
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.


from sahara.plugins.mapr.base import base_cluster_context as bc
from sahara.plugins.mapr.services.yarn import yarn


class Context(bc.BaseClusterContext):
    def __init__(self, cluster, version_handler, added=None, removed=None):
        super(Context, self).__init__(cluster, version_handler, added, removed)
        self._hadoop_version = yarn.YARNv241().version
        self._hadoop_lib = None
        self._hadoop_conf = None
        self._resource_manager_uri = None
        self._cluster_mode = None
        self._node_aware = True
        self._mapr_version = '4.0.1'
        self._ubuntu_ecosystem_repo = (
            'http://package.mapr.com/releases/ecosystem-4.x/ubuntu binary/')
        self._centos_ecosystem_repo = (
            'http://package.mapr.com/releases/ecosystem-4.x/redhat')

    @property
    def hadoop_lib(self):
        if not self._hadoop_lib:
            self._hadoop_lib = '%s/share/hadoop/common' % self.hadoop_home
        return self._hadoop_lib

    @property
    def hadoop_conf(self):
        if not self._hadoop_conf:
            self._hadoop_conf = '%s/etc/hadoop' % self.hadoop_home
        return self._hadoop_conf

    @property
    def resource_manager_uri(self):
        return self._resource_manager_uri
sahara/plugins/mapr/versions/mapr_spark/spark_engine.py (new executable file, 154 lines)
@@ -0,0 +1,154 @@
# Copyright (c) 2015, MapR Technologies
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.


import os

from sahara import conductor as c
from sahara import context
from sahara import exceptions as e
from sahara.i18n import _
import sahara.plugins.mapr.services.spark.spark as spark
import sahara.plugins.mapr.util.general as g
import sahara.plugins.mapr.versions.version_handler_factory as vhf
import sahara.plugins.utils as plugin_utils
from sahara.service.edp import job_utils
from sahara.service.edp.spark import engine as base_engine
from sahara.swift import utils as su
from sahara.utils import edp

conductor = c.API


class MapRSparkEngine(base_engine.SparkJobEngine):
    def run_job(self, job_execution):
        ctx = context.ctx()
        job = conductor.job_get(ctx, job_execution.job_id)

        additional_sources, updated_job_configs = (
            job_utils.resolve_data_source_references(job_execution.job_configs)
        )

        # We'll always run the driver program on the master
        master = plugin_utils.get_instance(
            self.cluster, spark.SPARK_MASTER.ui_name)

        # TODO(tmckay): wf_dir should probably be configurable.
        # The only requirement is that the dir is writable by the image user
        wf_dir = job_utils.create_workflow_dir(
            master, '/tmp/spark-edp', job, job_execution.id, "700")
        paths, builtin_paths = self._upload_job_files(
            master, wf_dir, job, updated_job_configs)

        # We can shorten the paths in this case since we'll run out of wf_dir
        paths = [os.path.basename(p) for p in paths]
        builtin_paths = [os.path.basename(p) for p in builtin_paths]

        # TODO(tmckay): for now, paths[0] is always assumed to be the app
        # jar and we generate paths in order (mains, then libs).
        # When we have a Spark job type, we can require a "main" and set
        # the app jar explicitly to be "main"
        app_jar = paths.pop(0)
        job_class = updated_job_configs["configs"]["edp.java.main_class"]

        # If we uploaded builtins then we are using a wrapper jar. It will
        # be the first one on the builtin list and the original app_jar needs
        # to be added to the 'additional' jars
        if builtin_paths:
            wrapper_jar = builtin_paths.pop(0)
            wrapper_class = 'org.openstack.sahara.edp.SparkWrapper'
            wrapper_xml = self._upload_wrapper_xml(
                master, wf_dir, updated_job_configs)
            wrapper_args = "%s %s" % (wrapper_xml, job_class)
            additional_jars = ",".join([app_jar] + paths + builtin_paths)
        else:
            wrapper_jar = wrapper_class = wrapper_args = ""
            additional_jars = ",".join(paths)

        # All additional jars are passed with the --jars option
        if additional_jars:
            additional_jars = " --jars " + additional_jars

        # Launch the spark job using spark-submit and deploy_mode = client
        cluster_context = self._get_cluster_context(self.cluster)
        spark_home_dir = spark.Spark().home_dir(cluster_context)

        # TODO(tmckay): we need to clean up wf_dirs on long running clusters
        # TODO(tmckay): probably allow for general options to spark-submit
        args = updated_job_configs.get('args', [])
        args = " ".join([su.inject_swift_url_suffix(arg) for arg in args])

        submit_args = {
            "spark_submit": "%s/bin/spark-submit" % spark_home_dir,
            "addnl_jars": additional_jars,
            "master_url": spark.SPARK_MASTER.submit_url(cluster_context),
            "args": args
        }
        if wrapper_jar and wrapper_class:
            # Substrings which may be empty have spaces
            # embedded if they are non-empty
            submit_args.update({
                "driver_cp": self.get_driver_classpath(),
                "wrapper_class": wrapper_class,
                "wrapper_jar": wrapper_jar,
                "wrapper_args": wrapper_args,
            })
            submit_cmd = ('%(spark_submit)s%(driver_cp)s'
                          ' --class %(wrapper_class)s%(addnl_jars)s'
                          ' --master %(master_url)s'
                          ' %(wrapper_jar)s %(wrapper_args)s %(args)s')
        else:
            submit_args.update({
                "job_class": job_class,
                "app_jar": app_jar,
            })
            submit_cmd = ('%(spark_submit)s --class %(job_class)s'
                          '%(addnl_jars)s --master %(master_url)s'
                          ' %(app_jar)s %(args)s')
        submit_cmd = g._run_as('mapr', submit_cmd % submit_args)

        job_execution = conductor.job_execution_get(ctx, job_execution.id)
        if job_execution.info['status'] == edp.JOB_STATUS_TOBEKILLED:
            return (None, edp.JOB_STATUS_KILLED, None)

        # If an exception is raised here, the job_manager will mark
        # the job failed and log the exception
        # The redirects of stdout and stderr will preserve output in the wf_dir
        with master.remote() as r:
            # Upload the command launch script
            launch = os.path.join(wf_dir, "launch_command")
            r.write_file_to(launch, self._job_script())
            r.execute_command("chmod +x %s" % launch)
            ret, stdout = r.execute_command(
                "cd %s && ./launch_command %s > /dev/null 2>&1 & echo $!"
                % (wf_dir, submit_cmd), raise_when_error=False)

        if ret == 0:
            # Success, we'll add the wf_dir in job_execution.extra and store
            # pid@instance_id as the job id
            # We know the job is running so return "RUNNING"
            return (stdout.strip() + "@" + master.id,
                    edp.JOB_STATUS_RUNNING,
                    {'spark-path': wf_dir})

        # Hmm, no exception but something failed.
        # Since we're using backgrounding with redirect, this is unlikely.
        raise e.EDPError(_("Spark job execution failed. Exit status = "
                           "%(status)s, stdout = %(stdout)s") %
                         {'status': ret, 'stdout': stdout})

    def _get_cluster_context(self, cluster):
        version = cluster.hadoop_version
        handler = vhf.VersionHandlerFactory.get().get_handler(version)
        return handler.get_context(cluster)
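For orientation, a sketch of the command the non-wrapper branch of run_job produces. Every value below (spark home path, master FQDN, jar names, job class, args) is illustrative only; in the engine they come from the cluster context and the job configuration:

    # Hypothetical values standing in for what run_job computes.
    submit_args = {
        "spark_submit": "/opt/mapr/spark/spark-1.2.1/bin/spark-submit",
        "addnl_jars": " --jars lib1.jar",
        "master_url": "spark://master-node.example.com:7077",
        "args": "arg1 arg2",
        "job_class": "org.example.MySparkJob",
        "app_jar": "my-app.jar",
    }
    submit_cmd = ('%(spark_submit)s --class %(job_class)s'
                  '%(addnl_jars)s --master %(master_url)s'
                  ' %(app_jar)s %(args)s') % submit_args
    print(submit_cmd)
    # /opt/mapr/spark/spark-1.2.1/bin/spark-submit --class org.example.MySparkJob
    #   --jars lib1.jar --master spark://master-node.example.com:7077
    #   my-app.jar arg1 arg2
    # g._run_as('mapr', ...) then wraps this so it runs as the mapr user, and
    # the launch_command script backgrounds it and echoes the pid.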
@@ -0,0 +1,27 @@
# Copyright (c) 2015, MapR Technologies
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.


from sahara.plugins.mapr.base import base_node_manager
from sahara.plugins.mapr.services.spark import spark


class SparkNodeManager(base_node_manager.BaseNodeManager):
    def start(self, cluster_context, instances=None):
        super(SparkNodeManager, self).start(cluster_context, instances)

        instances = instances or cluster_context.added_instances()
        slaves = cluster_context.filter_instances(instances, spark.SPARK_SLAVE)
        if slaves:
            spark.SPARK_SLAVE.start(cluster_context, slaves)
sahara/plugins/mapr/versions/mapr_spark/version_handler.py (new executable file, 50 lines)
@@ -0,0 +1,50 @@
# Copyright (c) 2015, MapR Technologies
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.


from sahara.plugins.mapr.base import base_version_handler as bvh
from sahara.plugins.mapr.services.management import management
from sahara.plugins.mapr.services.maprfs import maprfs
from sahara.plugins.mapr.services.spark import spark
from sahara.plugins.mapr.versions.mapr_spark import context as c
from sahara.plugins.mapr.versions.mapr_spark import spark_engine as edp_engine
from sahara.plugins.mapr.versions.mapr_spark import spark_node_manager


version = 'spark'


class VersionHandler(bvh.BaseVersionHandler):
    def __init__(self):
        super(VersionHandler, self).__init__()
        self._node_manager = spark_node_manager.SparkNodeManager()
        self._version = version
        self._required_services = [
            management.Management(),
            maprfs.MapRFS(),
            spark.Spark(),
        ]
        self._services = [
            management.Management(),
            maprfs.MapRFS(),
            spark.Spark(),
        ]

    def get_context(self, cluster, added=None, removed=None):
        return c.Context(cluster, self, added, removed)

    def get_edp_engine(self, cluster, job_type):
        if job_type in edp_engine.MapRSparkEngine.get_supported_job_types():
            return edp_engine.MapRSparkEngine(cluster)
        return None
@@ -12,6 +12,7 @@
# License for the specific language governing permissions and limitations
# under the License.


import sahara.plugins.mapr.domain.configuration_file as conf_f
import sahara.tests.unit.base as b

@@ -182,3 +183,59 @@ key2=value2'''
        foo.add_property('key2', 'value2')
        expected = {'key1': 'value1', 'key2': 'value2'}
        self.assertDictEqual(expected, foo._config_dict)


class TestEnvironmentConfig(b.SaharaTestCase):
    def __init__(self, *args, **kwds):
        super(TestEnvironmentConfig, self).__init__(*args, **kwds)
        self.content = '''
non export line
export key1=value1
export key2=value2
export key
'''

    def test_remote_path(self):
        foo = conf_f.EnvironmentConfig('foo')
        foo.remote_path = '/bar'
        self.assertEqual('/bar/foo', foo.remote_path)

    def test_parse(self):
        foo = conf_f.EnvironmentConfig('foo')
        foo.parse(self.content)
        expected = {'key1': 'value1', 'key2': 'value2'}
        self.assertDictEqual(expected, foo._config_dict)

    def test_render(self):
        foo = conf_f.EnvironmentConfig('foo')
        expected = {'ke1': 'value1', 'key2': 'value2'}
        foo._config_dict = expected
        actual = foo.render()
        bar = conf_f.EnvironmentConfig('bar')
        bar.parse(actual)
        self.assertDictEqual(expected, bar._config_dict)

    def test_render_extra_properties(self):
        foo = conf_f.EnvironmentConfig('foo')
        foo.parse(self.content)
        foo.add_property('key3', 'value3')
        foo_content = foo.render()
        bar = conf_f.EnvironmentConfig('bar')
        bar.parse(foo_content)
        expected = {'key1': 'value1', 'key2': 'value2', 'key3': 'value3'}
        self.assertDictEqual(expected, bar._config_dict)

    def test_add_property(self):
        foo = conf_f.EnvironmentConfig('foo')
        self.assertDictEqual({}, foo._config_dict)
        foo.add_property('key1', 'value1')
        self.assertDictEqual({'key1': 'value1'}, foo._config_dict)
        foo.add_property('key2', 'value2')
        expected = {'key1': 'value1', 'key2': 'value2'}
        self.assertDictEqual(expected, foo._config_dict)

    def test_get_config_value(self):
        foo = conf_f.EnvironmentConfig('foo')
        foo._config_dict = {'foo': 'bar'}
        self.assertEqual('bar', foo._get_config_value('foo'))
        self.assertIsNone(foo._get_config_value('bar'))