Implement EDP for a Spark standalone cluster

This change adds an EDP engine for a Spark standalone cluster.
The engine uses the spark-submit script and various Linux
commands via ssh to run, monitor, and terminate Spark jobs.
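
In rough outline (a condensed sketch of the engine code further down, where
r is a Sahara remote object and wf_dir/cmd are the job directory and the
assembled spark-submit command line), the remote commands look like:

    # run: start spark-submit in the background through a wrapper script
    # and capture its pid
    ret, pid = r.execute_command(
        "cd %s; ./launch_command %s > /dev/null 2>&1 & echo $!" % (wf_dir, cmd))

    # monitor: a live pid means RUNNING; otherwise the wrapper's "result"
    # file holds the exit status of spark-submit
    ret, stdout = r.execute_command("ps hp %s" % pid, raise_when_error=False)
    ret, stdout = r.execute_command("cat %s/result" % wf_dir,
                                    raise_when_error=False)

    # terminate: SIGINT is propagated to spark-submit by the wrapper
    r.execute_command("kill -SIGINT %s" % pid, raise_when_error=False)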

Currently, the Spark engine can launch "Java" job types (the same type
used to submit an Oozie Java action on Hadoop clusters).
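
A Java job execution carries its main class and arguments in job_configs;
a minimal example (values are illustrative, taken from the unit tests below):

    job_configs = {
        "configs": {"edp.java.main_class": "org.me.myclass"},
        "args": ["input_arg", "output_arg"],
    }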

A directory is created on the master node for each Spark job. It holds
the jar files, the script used to launch the job, the job's stderr and
stdout, and a result file containing the exit status of spark-submit.
The directory is named after the Sahara job and the job execution id so
it is easy to locate, and preserving these files is a big help when
debugging jobs.
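
For example, a job named "MyJob" gets a directory laid out roughly as
follows (the /tmp/spark-edp prefix is the current hard-coded default, see
the engine code below; the jar names depend on the uploaded job binaries):

    /tmp/spark-edp/MyJob/<job execution id>/
        app.jar           main application jar (plus any supporting jars)
        launch_command    wrapper script that runs spark-submit
        childpid          pid of the spark-submit process
        stdout, stderr    output of spark-submit
        result            exit status of spark-submit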

A few general improvements to the EDP engine interface are included
(sketched below):
* engine.cancel_job() may return an updated job status
* engine.run_job() may return a job status and fields for
  job_execution.extra in addition to the job id
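
A minimal sketch of the updated contract, using a hypothetical engine
subclass (the Spark engine below follows this pattern):

    class SomeJobEngine(base_engine.JobEngine):
        def run_job(self, job_execution):
            # launch the job, then report what is known immediately
            return (job_id,                  # required job id
                    "RUNNING",               # optional status, may be None
                    {"spark-path": wf_dir})  # optional extra fields, may be None

        def cancel_job(self, job_execution):
            # may return an updated status dict, or None if nothing new is known
            return {"status": "KILLED"}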

Still to do:
* create a proper Spark job type (new CR)
* make the job dir location on the master node configurable (new CR)
* add something to clean up job directories on the master node (new CR)
* allow users to pass some general options to spark-submit itself (new CR)

Partial implements: blueprint edp-spark-standalone

Change-Id: I2c84e9cdb75e846754896d7c435e94bc6cc397ff
Author: Trevor McKay
Date: 2014-07-17 20:41:09 -04:00
Parent: e23efe5471
Commit: 5698799ee3
10 changed files with 739 additions and 31 deletions


@@ -414,3 +414,36 @@ class SparkProvider(p.ProvisioningPluginBase):
"there would be not enough nodes for HDFS "
"replicas (replication factor is %s)" %
rep_factor)
def get_edp_engine(self, cluster, job_type, default_engines):
'''Select an EDP engine for Spark standalone deployment
The default_engines parameter is a list of default EDP implementations.
Each item in the list is a dictionary, and each dictionary has the
following elements:
name (a simple name for the implementation)
job_types (a list of EDP job types supported by the implementation)
engine (a class derived from sahara.service.edp.base_engine.JobEngine)
This method will choose the first engine from the default list that
meets the following criteria:
eng['name'] == 'spark'
job_type in eng['job_types']
An instance of that engine will be allocated and returned.
:param cluster: a Sahara cluster object
:param job_type: an EDP job type string
:param default_engines: a list of dictionaries describing the default
implementations.
:returns: an instance of a class derived from
sahara.service.edp.base_engine.JobEngine or None
'''
# We know that spark EDP requires at least spark 1.0.0
# to have spark-submit. Reject anything else.
if cluster.hadoop_version >= "1.0.0":
for eng in default_engines:
if self.name == eng['name'] and job_type in eng["job_types"]:
return eng["engine"](cluster)
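
A hedged usage sketch (variable names are illustrative; the job manager's
_get_job_engine() below resolves an engine along these lines):

    # default_engines is defined in sahara.service.edp.job_manager
    engine = plugin.get_edp_engine(cluster, edp.JOB_TYPE_JAVA, default_engines)
    if engine is not None:
        job_id, status, extra = engine.run_job(job_execution)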


@@ -35,8 +35,6 @@ CONF = cfg.CONF
conductor = c.API
terminated_job_states = ['DONEWITHERROR', 'FAILED', 'KILLED', 'SUCCEEDED']
def _make_engine(name, job_types, engine_class):
return {"name": name,
@@ -51,7 +49,7 @@ default_engines = [_make_engine("oozie",
edp.JOB_TYPE_PIG],
oozie_engine.OozieJobEngine),
_make_engine("spark",
[],
[edp.JOB_TYPE_JAVA],
spark_engine.SparkJobEngine)
]
@@ -67,15 +65,19 @@ def _get_job_engine(cluster, job_execution):
default_engines)
def _write_job_status(job_execution, job_info):
update = {"info": job_info}
if job_info['status'] in job_utils.terminated_job_states:
update['end_time'] = datetime.datetime.now()
return conductor.job_execution_update(context.ctx(),
job_execution,
update)
def _update_job_status(engine, job_execution):
job_info = engine.get_job_status(job_execution)
if job_info is not None:
update = {"info": job_info}
if job_info['status'] in terminated_job_states:
update['end_time'] = datetime.datetime.now()
job_execution = conductor.job_execution_update(context.ctx(),
job_execution,
update)
job_execution = _write_job_status(job_execution, job_info)
return job_execution
@@ -102,11 +104,25 @@ def _run_job(job_execution_id):
raise e.EDPError(_("Cluster does not support job type %s")
% _get_job_type(job_execution))
job_execution = _update_job_execution_extra(cluster, job_execution)
jid = eng.run_job(job_execution)
# Job id is a string
# Status is a string
# Extra is a dictionary to add to extra in the job_execution
jid, status, extra = eng.run_job(job_execution)
# Set the job id and the start time
# Optionally, update the status and the 'extra' field
update_dict = {'oozie_job_id': jid,
'start_time': datetime.datetime.now()}
if status:
update_dict['info'] = {'status': status}
if extra:
curr_extra = job_execution.extra.copy()
curr_extra.update(extra)
update_dict['extra'] = curr_extra
job_execution = conductor.job_execution_update(
ctx, job_execution, {'oozie_job_id': jid,
'start_time': datetime.datetime.now()})
ctx, job_execution, update_dict)
def run_job(job_execution_id):
@@ -132,12 +148,14 @@ def cancel_job(job_execution_id):
engine = _get_job_engine(cluster, job_execution)
if engine is not None:
try:
engine.cancel_job(job_execution)
job_info = engine.cancel_job(job_execution)
except Exception as e:
job_info = None
LOG.exception(
_LE("Error during cancel of job execution %(job)s: "
"%(error)s"), {'job': job_execution.id, 'error': e})
job_execution = _update_job_status(engine, job_execution)
if job_info is not None:
job_execution = _write_job_status(job_execution, job_info)
return job_execution


@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import uuid
from oslo.config import cfg
@@ -39,12 +40,14 @@ CONF.register_opts(opts)
conductor = c.API
terminated_job_states = ['DONEWITHERROR', 'FAILED', 'KILLED', 'SUCCEEDED']
def get_plugin(cluster):
return plugin_base.PLUGINS.get_plugin(cluster.plugin_name)
def upload_job_files(where, job_dir, job, hdfs_user):
def upload_job_files_to_hdfs(where, job_dir, job, hdfs_user):
mains = job.mains or []
libs = job.libs or []
uploaded_paths = []
@@ -64,7 +67,34 @@ def upload_job_files(where, job_dir, job, hdfs_user):
return uploaded_paths
def create_workflow_dir(where, job, hdfs_user):
def upload_job_files(where, job_dir, job, libs_subdir=True):
mains = job.mains or []
libs = job.libs or []
uploaded_paths = []
# Include libs files in the main dir if libs_subdir is False
if not libs_subdir:
mains += libs
with remote.get_remote(where) as r:
for job_file in mains:
dst = os.path.join(job_dir, job_file.name)
raw_data = dispatch.get_raw_binary(job_file)
r.write_file_to(dst, raw_data)
uploaded_paths.append(dst)
if libs_subdir and libs:
libs_dir = os.path.join(job_dir, "libs")
r.execute_command("mkdir -p %s" % libs_dir)
for job_file in libs:
dst = os.path.join(libs_dir, job_file.name)
raw_data = dispatch.get_raw_binary(job_file)
r.write_file_to(dst, raw_data)
uploaded_paths.append(dst)
return uploaded_paths
def create_hdfs_workflow_dir(where, job, hdfs_user):
constructed_dir = '/user/%s/' % hdfs_user
constructed_dir = _add_postfix(constructed_dir)
@@ -75,6 +105,18 @@ def create_workflow_dir(where, job, hdfs_user):
return constructed_dir
def create_workflow_dir(where, path, job, use_uuid=None):
# Keep the parameter name distinct from the uuid module to avoid shadowing
if use_uuid is None:
use_uuid = six.text_type(uuid.uuid4())
constructed_dir = _add_postfix(path)
constructed_dir += '%s/%s' % (job.name, use_uuid)
with remote.get_remote(where) as r:
ret, stdout = r.execute_command("mkdir -p %s" % constructed_dir)
return constructed_dir
def get_data_sources(job_execution, job):
if edp.compare_job_type(job.type, edp.JOB_TYPE_JAVA):
return None, None
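
A hedged sketch contrasting the two paths that result from the split above
(the calls mirror the Oozie and Spark engine code in this change):

    # Oozie jobs: workflow dir and job files go to HDFS, as before
    wf_dir = job_utils.create_hdfs_workflow_dir(oozie_server, job, hdfs_user)
    job_utils.upload_job_files_to_hdfs(oozie_server, wf_dir, job, hdfs_user)

    # Spark jobs: workflow dir and job files go to the master node's
    # local filesystem
    wf_dir = job_utils.create_workflow_dir(master, '/tmp/spark-edp', job,
                                           job_execution.id)
    paths = job_utils.upload_job_files(master, wf_dir, job, libs_subdir=False)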


@@ -58,7 +58,10 @@ class OozieJobEngine(base_engine.JobEngine):
return "%s/workflow.xml" % job_dir
def cancel_job(self, job_execution):
self._get_client().kill_job(job_execution)
if job_execution.oozie_job_id is not None:
client = self._get_client()
client.kill_job(job_execution)
return client.get_job_status(job_execution)
def get_job_status(self, job_execution):
if job_execution.oozie_job_id is not None:
@@ -83,8 +86,10 @@ class OozieJobEngine(base_engine.JobEngine):
# However, other engines may need it.
oozie_server = self.plugin.get_oozie_server(self.cluster)
wf_dir = job_utils.create_workflow_dir(oozie_server, job, hdfs_user)
job_utils.upload_job_files(oozie_server, wf_dir, job, hdfs_user)
wf_dir = job_utils.create_hdfs_workflow_dir(oozie_server,
job, hdfs_user)
job_utils.upload_job_files_to_hdfs(oozie_server, wf_dir,
job, hdfs_user)
wf_xml = workflow_factory.get_workflow_xml(
job, self.cluster, job_execution, input_source, output_source)
@@ -99,7 +104,12 @@ class OozieJobEngine(base_engine.JobEngine):
oozie_job_id = client.add_job(x.create_hadoop_xml(job_params),
job_execution)
client.run_job(job_execution, oozie_job_id)
return oozie_job_id
try:
status = client.get_job_status(job_execution,
oozie_job_id)['status']
except Exception:
status = None
return (oozie_job_id, status, None)
def get_possible_job_config(job_type):


@@ -50,10 +50,11 @@ class OozieClient(object):
"?action=kill")
_check_status_code(resp, 200)
def get_job_status(self, job_execution):
def get_job_status(self, job_execution, job_id=None):
if job_id is None:
job_id = job_execution.oozie_job_id
session = self._get_http_session(job_execution.extra.get('neutron'))
resp = session.get(self.job_url % job_execution.oozie_job_id +
"?show=info")
resp = session.get(self.job_url % job_id + "?show=info")
_check_status_code(resp, 200)
return get_json(resp)


@@ -0,0 +1,66 @@
#!/usr/bin/env python
# Copyright (c) 2014 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import signal
import subprocess
import sys
log = logging.getLogger()
hdlr = logging.FileHandler(sys.argv[0]+".log")
formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
hdlr.setFormatter(formatter)
log.addHandler(hdlr)
log.setLevel(logging.DEBUG)
def make_handler(a):
def handle_signal(signum, stack):
a.send_signal(signum)
log.info("Sent SIGINT to subprocess")
return handle_signal
log.info("Running %s" % ' '.join(sys.argv[1:]))
try:
# "Unignore" SIGINT before the subprocess is launched
# in case this process is running in the background
# (background processes ignore SIGINT)
signal.signal(signal.SIGINT, signal.SIG_DFL)
# Interpret all command line args as the command to run
a = subprocess.Popen(sys.argv[1:],
stdout=open("stdout", "w"),
stderr=open("stderr", "w"))
# Set our handler to trap SIGINT and propagate to the child
# The expectation is that the child process handles SIGINT
# and exits.
signal.signal(signal.SIGINT, make_handler(a))
# Write out the childpid just in case there is a
# need to send special signals directly to the child process
open("childpid", "w").write("%s\n" % a.pid)
# Wait for child to exit and write result file
log.info("Waiting for subprocess %s" % a.pid)
ret = a.wait()
log.info("Subprocess exit status %s" % ret)
open("result", "w").write("%s\n" % ret)
except Exception as e:
log.exception(e)


@@ -13,18 +13,173 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from sahara import conductor as c
from sahara import context
from sahara import exceptions as e
from sahara.plugins.general import utils as plugin_utils
from sahara.plugins.spark import config_helper as c_helper
from sahara.service.edp import base_engine
from sahara.service.edp import job_utils
from sahara.utils import files
from sahara.utils import general
from sahara.utils import remote
conductor = c.API
class SparkJobEngine(base_engine.JobEngine):
def __init__(self, cluster):
self.cluster = cluster
def _get_pid_and_inst_id(self, job_id):
try:
pid, inst_id = job_id.split("@", 1)
if pid and inst_id:
return (pid, inst_id)
except Exception:
pass
return "", ""
def _get_instance_if_running(self, job_execution):
pid, inst_id = self._get_pid_and_inst_id(job_execution.oozie_job_id)
if not pid or not inst_id or (
job_execution.info['status'] in job_utils.terminated_job_states):
return None, None
# TODO(tmckay): well, if there is a list index out of range
# error here it probably means that the instance is gone. If we
# have a job execution that is not terminated, and the instance
# is gone, we should probably change the status somehow.
# For now, do nothing.
try:
instance = general.get_instances(self.cluster, [inst_id])[0]
except Exception:
instance = None
return pid, instance
def _get_result_file(self, r, job_execution):
result = os.path.join(job_execution.extra['spark-path'], "result")
return r.execute_command("cat %s" % result,
raise_when_error=False)
def _check_pid(self, r, pid):
ret, stdout = r.execute_command("ps hp %s" % pid,
raise_when_error=False)
return ret
def _get_job_status_from_remote(self, r, pid, job_execution):
# If the pid is there, it's still running
if self._check_pid(r, pid) == 0:
return {"status": "RUNNING"}
# The process ended. Look in the result file to get the exit status
ret, stdout = self._get_result_file(r, job_execution)
if ret == 0:
exit_status = stdout.strip()
if exit_status == "0":
return {"status": "SUCCEEDED"}
# SIGINT will yield either -2 or 130
elif exit_status in ["-2", "130"]:
return {"status": "KILLED"}
# Well, process is done and result is missing or unexpected
return {"status": "DONEWITHERROR"}
def cancel_job(self, job_execution):
return job_execution
pid, instance = self._get_instance_if_running(job_execution)
if instance is not None:
with remote.get_remote(instance) as r:
ret, stdout = r.execute_command("kill -SIGINT %s" % pid,
raise_when_error=False)
if ret == 0:
# We had some effect, check the status
return self._get_job_status_from_remote(r,
pid, job_execution)
def get_job_status(self, job_execution):
return {"status": "FAILED"}
pid, instance = self._get_instance_if_running(job_execution)
if instance is not None:
with remote.get_remote(instance) as r:
return self._get_job_status_from_remote(r, pid, job_execution)
def _job_script(self):
path = "service/edp/resources/launch_command.py"
return files.get_file_text(path)
def run_job(self, job_execution):
return 0
ctx = context.ctx()
job = conductor.job_get(ctx, job_execution.job_id)
# We'll always run the driver program on the master
master = plugin_utils.get_instance(self.cluster, "master")
# TODO(tmckay): wf_dir should probably be configurable.
# The only requirement is that the dir is writable by the image user
wf_dir = job_utils.create_workflow_dir(master, '/tmp/spark-edp', job,
job_execution.id)
paths = job_utils.upload_job_files(master, wf_dir, job,
libs_subdir=False)
# We can shorten the paths in this case since the job is run from wf_dir
paths = [os.path.basename(p) for p in paths]
# TODO(tmckay): for now, paths[0] is always assumed to be the app
# jar and we generate paths in order (mains, then libs).
# When we have a Spark job type, we can require a "main" and set
# the app jar explicitly to be "main"
app_jar = paths.pop(0)
# The rest of the paths will be passed with --jars
additional_jars = ",".join(paths)
if additional_jars:
additional_jars = "--jars " + additional_jars
# Launch the spark job using spark-submit and deploy_mode = client
host = master.hostname()
port = c_helper.get_config_value("Spark", "Master port", self.cluster)
spark_submit = os.path.join(
c_helper.get_config_value("Spark",
"Spark home",
self.cluster),
"bin/spark-submit")
job_class = job_execution.job_configs.configs["edp.java.main_class"]
# TODO(tmckay): we need to clean up wf_dirs on long running clusters
# TODO(tmckay): probably allow for general options to spark-submit
args = " ".join(job_execution.job_configs.get('args', []))
# The redirects of stdout and stderr will preserve output in the wf_dir
cmd = "%s %s --class %s %s --master spark://%s:%s %s" % (
spark_submit,
app_jar,
job_class,
additional_jars,
host,
port,
args)
# If an exception is raised here, the job_manager will mark
# the job failed and log the exception
with remote.get_remote(master) as r:
# Upload the command launch script
launch = os.path.join(wf_dir, "launch_command")
r.write_file_to(launch, self._job_script())
r.execute_command("chmod +x %s" % launch)
ret, stdout = r.execute_command(
"cd %s; ./launch_command %s > /dev/null 2>&1 & echo $!"
% (wf_dir, cmd))
if ret == 0:
# Success, we'll add the wf_dir in job_execution.extra and store
# pid@instance_id as the job id
# We know the job is running so return "RUNNING"
return (stdout.strip() + "@" + master.id,
"RUNNING",
{'spark-path': wf_dir})
# Hmm, no exception but something failed.
# Since we're using backgrounding with redirect, this is unlikely.
raise e.EDPError("Spark job execution failed. Exit status = %s, "
"stdout = %s" % (ret, stdout))


@@ -0,0 +1,383 @@
# Copyright (c) 2014 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
import sahara.exceptions as ex
from sahara.service.edp import job_utils
from sahara.service.edp.spark import engine as se
from sahara.tests.unit import base
class TestSpark(base.SaharaTestCase):
def setUp(self):
super(TestSpark, self).setUp()
def test_get_pid_and_inst_id(self):
'''Test parsing of job ids
Test that job ids of the form pid@instance are
split into pid and instance ids by eng._get_pid_and_inst_id()
but anything else returns empty strings
'''
eng = se.SparkJobEngine(None)
for job_id in [None, "", "@", "something", "pid@", "@instance"]:
pid, inst_id = eng._get_pid_and_inst_id(job_id)
self.assertEqual((pid, inst_id), ("", ""))
pid, inst_id = eng._get_pid_and_inst_id("pid@instance")
self.assertEqual(("pid", "instance"), (pid, inst_id))
@mock.patch('sahara.utils.general.get_instances')
def test_get_instance_if_running(self, get_instances):
'''Test retrieval of pid and instance object for running job
If the job id is valid and the job status is non-terminated,
_get_instance_if_running() should retrieve the instance
based on the inst_id and return the pid and instance.
If the job id is invalid or the job is terminated, it should
return None, None.
If get_instances() throws an exception or returns an empty list,
the instance returned should be None (pid might still be set)
'''
get_instances.return_value = ["instance"]
job_exec = mock.Mock()
eng = se.SparkJobEngine("cluster")
job_exec.oozie_job_id = "invalid id"
self.assertEqual((None, None),
eng._get_instance_if_running(job_exec))
job_exec.oozie_job_id = "pid@inst_id"
for state in job_utils.terminated_job_states:
job_exec.info = {'status': state}
self.assertEqual((None, None),
eng._get_instance_if_running(job_exec))
job_exec.info = {'status': 'RUNNING'}
self.assertEqual(("pid", "instance"),
eng._get_instance_if_running(job_exec))
get_instances.assert_called_with("cluster", ["inst_id"])
# Pretend get_instances returns nothing
get_instances.return_value = []
pid, instance = eng._get_instance_if_running(job_exec)
self.assertEqual(instance, None)
# Pretend get_instances throws an exception
get_instances.side_effect = Exception("some failure")
pid, instance = eng._get_instance_if_running(job_exec)
self.assertEqual(instance, None)
def test_get_result_file(self):
remote = mock.Mock()
remote.execute_command.return_value = 999, "value"
job_exec = mock.Mock()
job_exec.extra = {"spark-path": "/tmp/spark-edp/Job/123"}
eng = se.SparkJobEngine("cluster")
ret, stdout = eng._get_result_file(remote, job_exec)
remote.execute_command.assert_called_with(
"cat /tmp/spark-edp/Job/123/result",
raise_when_error=False)
self.assertEqual((ret, stdout),
remote.execute_command.return_value)
def test_check_pid(self):
remote = mock.Mock()
remote.execute_command.return_value = 999, ""
eng = se.SparkJobEngine("cluster")
ret = eng._check_pid(remote, "pid")
remote.execute_command.assert_called_with("ps hp pid",
raise_when_error=False)
self.assertEqual(ret, 999)
@mock.patch.object(se.SparkJobEngine,
'_get_result_file',
autospec=True)
@mock.patch.object(se.SparkJobEngine,
'_check_pid',
autospec=True)
def test_get_job_status_from_remote(self, _check_pid, _get_result_file):
'''Test retrieval of job status from remote instance
If the process is present, status is RUNNING
If the process is not present, status depends on the result file
If the result file is missing, status is DONEWITHERROR
'''
eng = se.SparkJobEngine("cluster")
job_exec = mock.Mock()
remote = mock.Mock()
# Pretend process is running
_check_pid.return_value = 0
status = eng._get_job_status_from_remote(remote, "pid", job_exec)
_check_pid.assert_called_with(eng, remote, "pid")
self.assertEqual({"status": "RUNNING"}, status)
# Pretend process ended and result file contains 0 (success)
_check_pid.return_value = 1
_get_result_file.return_value = 0, "0"
status = eng._get_job_status_from_remote(remote, "pid", job_exec)
self.assertEqual({"status": "SUCCEEDED"}, status)
# Pretend process ended and result file contains 1 (error)
_get_result_file.return_value = 0, "1"
status = eng._get_job_status_from_remote(remote, "pid", job_exec)
self.assertEqual({"status": "DONEWITHERROR"}, status)
# Pretend process ended and result file contains 130 (killed)
_get_result_file.return_value = 0, "130"
status = eng._get_job_status_from_remote(remote, "pid", job_exec)
self.assertEqual({"status": "KILLED"}, status)
# Pretend process ended and result file contains -2 (killed)
_get_result_file.return_value = 0, "-2"
status = eng._get_job_status_from_remote(remote, "pid", job_exec)
self.assertEqual({"status": "KILLED"}, status)
# Pretend process ended and result file is missing
_get_result_file.return_value = 1, ""
status = eng._get_job_status_from_remote(remote, "pid", job_exec)
self.assertEqual({"status": "DONEWITHERROR"}, status)
@mock.patch.object(se.SparkJobEngine,
'_get_job_status_from_remote',
autospec=True)
@mock.patch.object(se.SparkJobEngine,
'_get_instance_if_running',
autospec=True)
@mock.patch('sahara.utils.remote.get_remote')
def test_get_job_status(self,
get_remote,
_get_instance_if_running,
_get_job_status_from_remote):
# This is to mock "with remote.get_remote(instance) as r"
remote_instance = mock.Mock()
get_remote.return_value.__enter__ = mock.Mock(
return_value=remote_instance)
# Pretend instance is not returned
_get_instance_if_running.return_value = "pid", None
job_exec = mock.Mock()
eng = se.SparkJobEngine("cluster")
status = eng.get_job_status(job_exec)
self.assertEqual(status, None)
# Pretend we have an instance
_get_instance_if_running.return_value = "pid", "instance"
_get_job_status_from_remote.return_value = {"status": "RUNNING"}
status = eng.get_job_status(job_exec)
_get_job_status_from_remote.assert_called_with(eng,
remote_instance,
"pid", job_exec)
self.assertEqual(status, {"status": "RUNNING"})
@mock.patch.object(se.SparkJobEngine,
'_get_instance_if_running',
autospec=True,
return_value=(None, None))
@mock.patch('sahara.utils.remote.get_remote')
def test_cancel_job_null_or_done(self,
get_remote,
_get_instance_if_running):
'''Test cancel_job() when instance is None
Test that cancel_job() returns None and does not try to
retrieve a remote instance if _get_instance_if_running() returns None
'''
eng = se.SparkJobEngine("cluster")
job_exec = mock.Mock()
self.assertIsNone(eng.cancel_job(job_exec))
self.assertTrue(_get_instance_if_running.called)
self.assertFalse(get_remote.called)
@mock.patch.object(se.SparkJobEngine,
'_get_job_status_from_remote',
autospec=True,
return_value={"status": "KILLED"})
@mock.patch.object(se.SparkJobEngine,
'_get_instance_if_running',
autospec=True,
return_value=("pid", "instance"))
@mock.patch('sahara.utils.remote.get_remote')
def test_cancel_job(self,
get_remote,
_get_instance_if_running,
_get_job_status_from_remote):
'''Test cancel_job() with a valid instance
For a valid instance, test that cancel_job:
* retrieves the remote instance
* executes the proper kill command
* retrieves the job status (because the remote command is successful)
'''
# This is to mock "with remote.get_remote(instance) as r" in cancel_job
# and to mock r.execute_command to return success
remote_instance = mock.Mock()
get_remote.return_value.__enter__ = mock.Mock(
return_value=remote_instance)
remote_instance.execute_command.return_value = (0, "standard out")
eng = se.SparkJobEngine("cluster")
job_exec = mock.Mock()
status = eng.cancel_job(job_exec)
# check that remote.get_remote was called with the result of
# eng._get_instance_if_running()
get_remote.assert_called_with("instance")
# check that execute_command was called with the proper arguments
# ("pid" was passed in)
remote_instance.execute_command.assert_called_with(
"kill -SIGINT pid",
raise_when_error=False)
# check that the job status was retrieved since the command succeeded
_get_job_status_from_remote.assert_called_with(eng,
remote_instance,
"pid", job_exec)
self.assertEqual(status, {"status": "KILLED"})
@mock.patch.object(se.SparkJobEngine,
'_get_job_status_from_remote',
autospec=True)
@mock.patch.object(se.SparkJobEngine,
'_get_instance_if_running',
autospec=True,
return_value=("pid", "instance"))
@mock.patch('sahara.utils.remote.get_remote')
def test_cancel_job_failed(self,
get_remote,
_get_instance_if_running,
_get_job_status_from_remote):
'''Test cancel_job() when remote command fails
For a valid instance and a failed kill command, test that cancel_job:
* retrieves the remote instance
* executes the proper kill command
* does not retrieve the job status (because the remote command failed)
'''
# This is to mock "with remote.get_remote(instance) as r"
# and to mock r.execute_command to return failure
remote_instance = mock.Mock()
get_remote.return_value.__enter__ = mock.Mock(
return_value=remote_instance)
remote_instance.execute_command.return_value = (-1, "some error")
eng = se.SparkJobEngine("cluster")
job_exec = mock.Mock()
status = eng.cancel_job(job_exec)
# check that remote.get_remote was called with the result of
# eng._get_instance_if_running
get_remote.assert_called_with("instance")
# check that execute_command was called with the proper arguments
# ("pid" was passed in)
remote_instance.execute_command.assert_called_with(
"kill -SIGINT pid",
raise_when_error=False)
# check that the job status was not retrieved since the command failed
self.assertEqual(_get_job_status_from_remote.called, 0)
# check that we have nothing new to report ...
self.assertEqual(status, None)
@mock.patch('sahara.utils.remote.get_remote')
@mock.patch('sahara.plugins.spark.config_helper.get_config_value')
@mock.patch('sahara.service.edp.job_utils.upload_job_files',
return_value=["/wfdir/app.jar",
"/wfdir/jar1.jar",
"/wfdir/jar2.jar"])
@mock.patch('sahara.service.edp.job_utils.create_workflow_dir',
return_value="/wfdir")
@mock.patch('sahara.plugins.general.utils.get_instance')
@mock.patch('sahara.conductor.API.job_get')
@mock.patch('sahara.context.ctx', return_value="ctx")
def test_run_job(self, ctx, job_get, get_instance, create_workflow_dir,
upload_job_files, get_config_value, get_remote):
def fix_get(field, default):
if field == "args":
return ["input_arg", "output_arg"]
return default
eng = se.SparkJobEngine("cluster")
job = mock.Mock()
job.name = "MyJob"
job_get.return_value = job
job_exec = mock.Mock()
job_exec.job_configs.configs = {"edp.java.main_class":
"org.me.myclass"}
job_exec.job_configs.get = fix_get
master = mock.Mock()
get_instance.return_value = master
master.hostname.return_value = "master"
master.id = "6789"
get_config_value.side_effect = lambda *x: {
("Spark", "Master port", "cluster"): 7077,
("Spark", "Spark home", "cluster"): "/opt/spark"}[x]
# This is to mock "with remote.get_remote(master) as r" in run_job
remote_instance = mock.Mock()
get_remote.return_value.__enter__ = mock.Mock(
return_value=remote_instance)
remote_instance.execute_command.return_value = (0, "12345")
status = eng.run_job(job_exec)
# Check that we launch on the master node
get_instance.assert_called_with("cluster", "master")
# Check the command
remote_instance.execute_command.assert_called_with(
'cd /wfdir; ./launch_command /opt/spark/bin/spark-submit app.jar '
'--class org.me.myclass --jars jar1.jar,jar2.jar '
'--master spark://master:7077 input_arg output_arg '
'> /dev/null 2>&1 & echo $!')
# Check result here
self.assertEqual(status, ("12345@6789",
"RUNNING",
{"spark-path": "/wfdir"}))
# Run again without support jars. Note the extra space
# after 'myclass', this is from a %s with empty string
upload_job_files.return_value = ["/wfdir/app.jar"]
status = eng.run_job(job_exec)
remote_instance.execute_command.assert_called_with(
'cd /wfdir; ./launch_command /opt/spark/bin/spark-submit app.jar '
'--class org.me.myclass '
'--master spark://master:7077 input_arg output_arg '
'> /dev/null 2>&1 & echo $!')
# run again with non-zero result, should raise EDPError
remote_instance.execute_command.return_value = (1, "some_error")
self.assertRaises(ex.EDPError, eng.run_job, job_exec)


@@ -52,7 +52,7 @@ class TestJobManager(base.SaharaWithDbTestCase):
helper.return_value = 'ok'
job, _ = _create_all_stack(edp.JOB_TYPE_PIG)
res = job_utils.create_workflow_dir(mock.Mock(), job, 'hadoop')
res = job_utils.create_hdfs_workflow_dir(mock.Mock(), job, 'hadoop')
self.assertIn('/user/hadoop/special_name/', res)
remote.reset_mock()
@@ -73,13 +73,13 @@ class TestJobManager(base.SaharaWithDbTestCase):
conductor_raw_data.return_value = 'ok'
job, _ = _create_all_stack(edp.JOB_TYPE_PIG)
res = job_utils.upload_job_files(mock.Mock(), 'job_prefix',
job, 'hadoop')
res = job_utils.upload_job_files_to_hdfs(mock.Mock(), 'job_prefix',
job, 'hadoop')
self.assertEqual(['job_prefix/script.pig'], res)
job, _ = _create_all_stack(edp.JOB_TYPE_MAPREDUCE)
res = job_utils.upload_job_files(mock.Mock(), 'job_prefix',
job, 'hadoop')
res = job_utils.upload_job_files_to_hdfs(mock.Mock(), 'job_prefix',
job, 'hadoop')
self.assertEqual(['job_prefix/lib/main.jar'], res)
remote.reset_mock()