Merge "Add Swift integration with Spark"

Jenkins
2015-02-04 09:17:27 +00:00
committed by Gerrit Code Review
13 changed files with 571 additions and 173 deletions

View File

@@ -15,8 +15,13 @@
import mock
import copy
import xml.dom.minidom as xml
from sahara.plugins.spark import config_helper as c_helper
from sahara.swift import swift_helper as swift
from sahara.tests.unit import base as test_base
from sahara.utils import xmlutils
class ConfigHelperUtilsTest(test_base.SaharaTestCase):
@@ -56,3 +61,39 @@ class ConfigHelperUtilsTest(test_base.SaharaTestCase):
self.assertFalse(configs['valid'])
self.assertNotIn('script', configs)
self.assertNotIn('cron', configs)
@mock.patch("sahara.swift.utils.retrieve_auth_url")
def test_generate_xml_configs(self, auth_url):
auth_url.return_value = "http://localhost:5000/v2/"
# Make a dict of swift configs to verify generated values
swift_vals = c_helper.extract_name_values(swift.get_swift_configs())
# Make sure that all the swift configs are in core-site
c = c_helper.generate_xml_configs({}, ['/mnt/one'], 'localhost', None)
doc = xml.parseString(c['core-site'])
configuration = doc.getElementsByTagName('configuration')
properties = xmlutils.get_property_dict(configuration[0])
self.assertDictContainsSubset(swift_vals, properties)
# Make sure that user values have precedence over defaults
c = c_helper.generate_xml_configs(
{'HDFS': {'fs.swift.service.sahara.tenant': 'fred'}},
['/mnt/one'], 'localhost', None)
doc = xml.parseString(c['core-site'])
configuration = doc.getElementsByTagName('configuration')
properties = xmlutils.get_property_dict(configuration[0])
mod_swift_vals = copy.copy(swift_vals)
mod_swift_vals['fs.swift.service.sahara.tenant'] = 'fred'
self.assertDictContainsSubset(mod_swift_vals, properties)
# Make sure that swift configs are left out if not enabled
c = c_helper.generate_xml_configs(
{'HDFS': {'fs.swift.service.sahara.tenant': 'fred'},
'general': {'Enable Swift': False}},
['/mnt/one'], 'localhost', None)
doc = xml.parseString(c['core-site'])
configuration = doc.getElementsByTagName('configuration')
properties = xmlutils.get_property_dict(configuration[0])
for key in mod_swift_vals.keys():
self.assertNotIn(key, properties)
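For context, core-site here is Hadoop-style configuration XML. The standalone sketch below (not part of the change) shows the shape being parsed; property_dict only approximates what xmlutils.get_property_dict is assumed to return.

import xml.dom.minidom as xml

CORE_SITE = """<?xml version="1.0"?>
<configuration>
  <property>
    <name>fs.swift.service.sahara.tenant</name>
    <value>fred</value>
  </property>
</configuration>"""

def property_dict(configuration):
    # Collect <name>/<value> text from each <property> element
    props = {}
    for prop in configuration.getElementsByTagName('property'):
        name = prop.getElementsByTagName('name')[0].firstChild.nodeValue
        value = prop.getElementsByTagName('value')[0].firstChild.nodeValue
        props[name] = value
    return props

doc = xml.parseString(CORE_SITE)
conf = doc.getElementsByTagName('configuration')[0]
assert property_dict(conf) == {'fs.swift.service.sahara.tenant': 'fred'}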

View File

@@ -14,6 +14,7 @@
# limitations under the License.
import mock
import os
import sahara.exceptions as ex
from sahara.service.edp.spark import engine as se
@@ -25,6 +26,14 @@ class TestSpark(base.SaharaTestCase):
def setUp(self):
super(TestSpark, self).setUp()
self.master_host = "master"
self.master_port = 7077
self.master_inst = "6789"
self.spark_pid = "12345"
self.spark_home = "/opt/spark"
self.workflow_dir = "/wfdir"
self.driver_cp = "/usr/lib/hadoop/hadoop-swift.jar"
def test_get_pid_and_inst_id(self):
'''Test parsing of job ids
@@ -307,83 +316,308 @@ class TestSpark(base.SaharaTestCase):
# check that we have nothing new to report ...
self.assertEqual(status, None)
@mock.patch('sahara.service.edp.binary_retrievers.dispatch.get_raw_binary')
@mock.patch('sahara.utils.remote.get_remote')
def test_upload_job_files(self, get_remote, get_raw_binary):
main_names = ["main1", "main2", "main3"]
lib_names = ["lib1", "lib2", "lib3"]
def make_data_objects(*args):
objs = []
for name in args:
m = mock.Mock()
m.name = name
objs.append(m)
return objs
job = mock.Mock()
job.name = "job"
job.mains = make_data_objects(*main_names)
job.libs = make_data_objects(*lib_names)
# This is to mock "with remote.get_remote(instance) as r"
remote_instance = mock.Mock()
get_remote.return_value.__enter__ = mock.Mock(
return_value=remote_instance)
get_raw_binary.return_value = "data"
eng = se.SparkJobEngine("cluster")
paths, builtins = eng._upload_job_files("where", "/somedir", job, {})
self.assertEqual(paths,
["/somedir/" + n for n in main_names + lib_names])
for path in paths:
remote_instance.write_file_to.assert_any_call(path, "data")
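The __enter__ stubbing above is the usual way to fake "with remote.get_remote(instance) as r" on a mock. A self-contained sketch of the pattern; note mock.patch hands back a MagicMock, which already supports __exit__, so only __enter__ needs configuring.

import mock  # unittest.mock on Python 3

get_remote = mock.MagicMock()
remote_instance = mock.Mock()
# "with get_remote(instance) as r" binds r to whatever __enter__ returns
get_remote.return_value.__enter__.return_value = remote_instance

with get_remote("instance") as r:
    r.write_file_to("/somedir/main1", "data")

remote_instance.write_file_to.assert_called_once_with(
    "/somedir/main1", "data")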
def _make_master_instance(self, return_code=0):
master = mock.Mock()
master.execute_command.return_value = (return_code, self.spark_pid)
master.hostname.return_value = self.master_host
master.id = self.master_inst
return master
def _config_values(self, *key):
return {("Spark", "Master port", "cluster"): self.master_port,
("Spark", "Spark home", "cluster"): self.spark_home,
("Spark", "Executor extra classpath",
"cluster"): self.driver_cp}[key]
@mock.patch('sahara.conductor.API.job_execution_get')
@mock.patch('sahara.utils.remote.get_remote')
@mock.patch('sahara.plugins.spark.config_helper.get_config_value')
@mock.patch('sahara.service.edp.job_utils.upload_job_files',
return_value=["/wfdir/app.jar",
"/wfdir/jar1.jar",
"/wfdir/jar2.jar"])
@mock.patch('sahara.service.edp.job_utils.create_workflow_dir',
return_value="/wfdir")
@mock.patch('sahara.service.edp.job_utils.create_workflow_dir')
@mock.patch('sahara.plugins.utils.get_instance')
@mock.patch('sahara.conductor.API.job_get')
@mock.patch('sahara.context.ctx', return_value="ctx")
def test_run_job(self, ctx, job_get, get_instance, create_workflow_dir,
upload_job_files, get_config_value, get_remote,
job_exec_get):
def _setup_run_job(self, master_instance, job_configs, files,
ctx, job_get, get_instance, create_workflow_dir,
get_config_value, get_remote, job_exec_get):
eng = se.SparkJobEngine("cluster")
def _upload_job_files(where, job_dir, job,
libs_subdir=True, job_configs=None):
paths = [os.path.join(self.workflow_dir, f) for f in files['jars']]
bltns = files.get('bltns', [])
bltns = [os.path.join(self.workflow_dir, f) for f in bltns]
return paths, bltns
job = mock.Mock()
job.name = "MyJob"
job_get.return_value = job
job_exec = mock.Mock()
job_exec.job_configs = {'configs': {"edp.java.main_class":
"org.me.myclass"},
'args': ['input_arg', 'output_arg']}
job_exec.job_configs = job_configs
master = mock.Mock()
get_instance.return_value = master
master.hostname.return_value = "master"
master.id = "6789"
get_config_value.side_effect = self._config_values
get_config_value.side_effect = lambda *x: {
("Spark", "Master port", "cluster"): 7077,
("Spark", "Spark home", "cluster"): "/opt/spark"}[x]
create_workflow_dir.return_value = self.workflow_dir
# This is to mock "with remote.get_remote(master) as r" in run_job
remote_instance = mock.Mock()
get_remote.return_value.__enter__ = mock.Mock(
return_value=remote_instance)
remote_instance.execute_command.return_value = (0, "12345")
return_value=master_instance)
get_instance.return_value = master_instance
eng = se.SparkJobEngine("cluster")
eng._upload_job_files = mock.Mock()
eng._upload_job_files.side_effect = _upload_job_files
status = eng.run_job(job_exec)
# Check that we launch on the master node
get_instance.assert_called_with("cluster", "master")
get_instance.assert_called_with("cluster", self.master_host)
return status
def test_run_job_raise(self):
job_configs = {
'configs': {"edp.java.main_class": "org.me.myclass"},
'args': ['input_arg', 'output_arg']
}
files = {'jars': ["app.jar",
"jar1.jar",
"jar2.jar"]}
# The object representing the spark master node
# The spark-submit command will be run on this instance
master_instance = self._make_master_instance(return_code=1)
# If execute_command returns a non-zero code, run_job should raise EDPError
self.assertRaises(ex.EDPError,
self._setup_run_job,
master_instance, job_configs, files)
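assertRaises is used here in its callable form, where extra positional arguments are forwarded to the callable under test; a one-line illustration:

import unittest

class Example(unittest.TestCase):
    def test_raises(self):
        # divmod(1, 0) is invoked inside assertRaises
        self.assertRaises(ZeroDivisionError, divmod, 1, 0)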
def test_run_job_extra_jars_args(self):
job_configs = {
'configs': {"edp.java.main_class": "org.me.myclass"},
'args': ['input_arg', 'output_arg']
}
files = {'jars': ["app.jar",
"jar1.jar",
"jar2.jar"]}
# The object representing the spark master node
# The spark-submit command will be run on this instance
master_instance = self._make_master_instance()
status = self._setup_run_job(master_instance, job_configs, files)
# Check the command
remote_instance.execute_command.assert_called_with(
'cd /wfdir; ./launch_command /opt/spark/bin/spark-submit '
master_instance.execute_command.assert_called_with(
'cd %(workflow_dir)s; '
'./launch_command %(spark_home)s/bin/spark-submit '
'--class org.me.myclass --jars jar1.jar,jar2.jar '
'--master spark://master:7077 app.jar input_arg output_arg '
'> /dev/null 2>&1 & echo $!')
'--master spark://%(master_host)s:%(master_port)s '
'app.jar input_arg output_arg '
'> /dev/null 2>&1 & echo $!' % {"workflow_dir": self.workflow_dir,
"spark_home": self.spark_home,
"master_host": self.master_host,
"master_port": self.master_port})
# Check result here
self.assertEqual(status, ("12345@6789",
self.assertEqual(status, ("%s@%s" % (self.spark_pid, self.master_inst),
edp.JOB_STATUS_RUNNING,
{"spark-path": "/wfdir"}))
{"spark-path": self.workflow_dir}))
# Run again without arguments
job_exec.job_configs['args'] = []
status = eng.run_job(job_exec)
remote_instance.execute_command.assert_called_with(
'cd /wfdir; ./launch_command /opt/spark/bin/spark-submit '
'--class org.me.myclass --jars jar1.jar,jar2.jar '
'--master spark://master:7077 app.jar '
'> /dev/null 2>&1 & echo $!')
def test_run_job_args(self):
job_configs = {
'configs': {"edp.java.main_class": "org.me.myclass"},
'args': ['input_arg', 'output_arg']
}
# Run again without support jars.
upload_job_files.return_value = ["/wfdir/app.jar"]
status = eng.run_job(job_exec)
remote_instance.execute_command.assert_called_with(
'cd /wfdir; ./launch_command /opt/spark/bin/spark-submit '
files = {'jars': ["app.jar"]}
# The object representing the spark master node
# The spark-submit command will be run on this instance
master_instance = self._make_master_instance()
status = self._setup_run_job(master_instance, job_configs, files)
# Check the command
master_instance.execute_command.assert_called_with(
'cd %(workflow_dir)s; '
'./launch_command %(spark_home)s/bin/spark-submit '
'--class org.me.myclass '
'--master spark://master:7077 app.jar '
'> /dev/null 2>&1 & echo $!')
'--master spark://%(master_host)s:%(master_port)s '
'app.jar input_arg output_arg '
'> /dev/null 2>&1 & echo $!' % {"workflow_dir": self.workflow_dir,
"spark_home": self.spark_home,
"master_host": self.master_host,
"master_port": self.master_port})
# run again with non-zero result, should raise EDPError
remote_instance.execute_command.return_value = (1, "some_error")
self.assertRaises(ex.EDPError, eng.run_job, job_exec)
# Check result here
self.assertEqual(status, ("%s@%s" % (self.spark_pid, self.master_inst),
edp.JOB_STATUS_RUNNING,
{"spark-path": self.workflow_dir}))
def test_run_job(self):
job_configs = {
'configs': {"edp.java.main_class": "org.me.myclass"},
}
files = {'jars': ["app.jar"]}
# The object representing the spark master node
# The spark-submit command will be run on this instance
master_instance = self._make_master_instance()
status = self._setup_run_job(master_instance, job_configs, files)
# Check the command
master_instance.execute_command.assert_called_with(
'cd %(workflow_dir)s; '
'./launch_command %(spark_home)s/bin/spark-submit '
'--class org.me.myclass '
'--master spark://%(master_host)s:%(master_port)s '
'app.jar '
'> /dev/null 2>&1 & echo $!' % {"workflow_dir": self.workflow_dir,
"spark_home": self.spark_home,
"master_host": self.master_host,
"master_port": self.master_port})
# Check result here
self.assertEqual(status, ("%s@%s" % (self.spark_pid, self.master_inst),
edp.JOB_STATUS_RUNNING,
{"spark-path": self.workflow_dir}))
def test_run_job_wrapper_extra_jars_args(self):
job_configs = {
'configs': {"edp.java.main_class": "org.me.myclass",
"edp.spark.adapt_for_swift": True},
'args': ['input_arg', 'output_arg']
}
files = {'jars': ["app.jar",
"jar1.jar",
"jar2.jar"],
'bltns': ["wrapper.jar"]}
# The object representing the spark master node
# The spark-submit command will be run on this instance
master_instance = self._make_master_instance()
status = self._setup_run_job(master_instance, job_configs, files)
# Check the command
master_instance.execute_command.assert_called_with(
'cd %(workflow_dir)s; '
'./launch_command %(spark_home)s/bin/spark-submit '
'--driver-class-path %(driver_cp)s '
'--class org.openstack.sahara.edp.SparkWrapper '
'--jars app.jar,jar1.jar,jar2.jar '
'--master spark://%(master_host)s:%(master_port)s '
'wrapper.jar spark.xml org.me.myclass input_arg output_arg '
'> /dev/null 2>&1 & echo $!' % {"workflow_dir": self.workflow_dir,
"spark_home": self.spark_home,
"driver_cp": self.driver_cp,
"master_host": self.master_host,
"master_port": self.master_port})
# Check result here
self.assertEqual(status, ("%s@%s" % (self.spark_pid, self.master_inst),
edp.JOB_STATUS_RUNNING,
{"spark-path": self.workflow_dir}))
def test_run_job_wrapper_args(self):
job_configs = {
'configs': {"edp.java.main_class": "org.me.myclass",
"edp.spark.adapt_for_swift": True},
'args': ['input_arg', 'output_arg']
}
files = {'jars': ["app.jar"],
'bltns': ["wrapper.jar"]}
# The object representing the spark master node
# The spark-submit command will be run on this instance
master_instance = self._make_master_instance()
status = self._setup_run_job(master_instance, job_configs, files)
# Check the command
master_instance.execute_command.assert_called_with(
'cd %(workflow_dir)s; '
'./launch_command %(spark_home)s/bin/spark-submit '
'--driver-class-path %(driver_cp)s '
'--class org.openstack.sahara.edp.SparkWrapper '
'--jars app.jar '
'--master spark://%(master_host)s:%(master_port)s '
'wrapper.jar spark.xml org.me.myclass input_arg output_arg '
'> /dev/null 2>&1 & echo $!' % {"workflow_dir": self.workflow_dir,
"spark_home": self.spark_home,
"driver_cp": self.driver_cp,
"master_host": self.master_host,
"master_port": self.master_port})
# Check result here
self.assertEqual(status, ("%s@%s" % (self.spark_pid, self.master_inst),
edp.JOB_STATUS_RUNNING,
{"spark-path": self.workflow_dir}))
def test_run_job_wrapper(self):
job_configs = {
'configs': {"edp.java.main_class": "org.me.myclass",
"edp.spark.adapt_for_swift": True}
}
files = {'jars': ["app.jar"],
'bltns': ["wrapper.jar"]}
# The object representing the spark master node
# The spark-submit command will be run on this instance
master_instance = self._make_master_instance()
status = self._setup_run_job(master_instance, job_configs, files)
# Check the command
master_instance.execute_command.assert_called_with(
'cd %(workflow_dir)s; '
'./launch_command %(spark_home)s/bin/spark-submit '
'--driver-class-path %(driver_cp)s '
'--class org.openstack.sahara.edp.SparkWrapper '
'--jars app.jar '
'--master spark://%(master_host)s:%(master_port)s '
'wrapper.jar spark.xml org.me.myclass '
'> /dev/null 2>&1 & echo $!' % {"workflow_dir": self.workflow_dir,
"spark_home": self.spark_home,
"driver_cp": self.driver_cp,
"master_host": self.master_host,
"master_port": self.master_port})
# Check result here
self.assertEqual(status, ("%s@%s" % (self.spark_pid, self.master_inst),
edp.JOB_STATUS_RUNNING,
{"spark-path": self.workflow_dir}))

View File

@@ -26,6 +26,7 @@ from sahara.service.edp import job_manager
from sahara.service.edp import job_utils
from sahara.service.edp.oozie.workflow_creator import workflow_factory
from sahara.swift import swift_helper as sw
from sahara.swift import utils as su
from sahara.tests.unit import base
from sahara.tests.unit.service.edp import edp_test_utils as u
from sahara.utils import edp
@@ -67,49 +68,6 @@ class TestJobManager(base.SaharaWithDbTestCase):
remote_instance.execute_command.assert_called_with(
"mkdir -p /tmp/somewhere/job/generated_uuid")
@mock.patch('sahara.service.edp.binary_retrievers.dispatch.get_raw_binary')
@mock.patch('sahara.utils.remote.get_remote')
def test_upload_job_files(self, get_remote, get_raw_binary):
main_names = ["main1", "main2", "main3"]
lib_names = ["lib1", "lib2", "lib3"]
def make_data_objects(*args):
objs = []
for name in args:
m = mock.Mock()
m.name = name
objs.append(m)
return objs
job = mock.Mock()
job.name = "job"
job.mains = make_data_objects(*main_names)
job.libs = make_data_objects(*lib_names)
# This is to mock "with remote.get_remote(instance) as r"
remote_instance = mock.Mock()
get_remote.return_value.__enter__ = mock.Mock(
return_value=remote_instance)
get_raw_binary.return_value = "data"
paths = job_utils.upload_job_files(
"where", "/somedir", job, libs_subdir=False)
self.assertEqual(paths,
["/somedir/" + n for n in main_names + lib_names])
for path in paths:
remote_instance.write_file_to.assert_any_call(path, "data")
remote_instance.write_file_to.reset_mock()
paths = job_utils.upload_job_files(
"where", "/somedir", job, libs_subdir=True)
remote_instance.execute_command.assert_called_with(
"mkdir -p /somedir/libs")
expected = ["/somedir/" + n for n in main_names]
expected += ["/somedir/libs/" + n for n in lib_names]
self.assertEqual(paths, expected)
for path in paths:
remote_instance.write_file_to.assert_any_call(path, "data")
@mock.patch('sahara.conductor.API.job_binary_get')
def test_build_workflow_for_job_pig(self, job_binary):
@@ -478,15 +436,14 @@ class TestJobManager(base.SaharaWithDbTestCase):
self.assertEqual(orig_exec_job_dict, exec_job_dict)
def test_inject_swift_url_suffix(self):
w = workflow_factory.BaseFactory()
self.assertEqual(w.inject_swift_url_suffix("swift://ex/o"),
self.assertEqual(su.inject_swift_url_suffix("swift://ex/o"),
"swift://ex.sahara/o")
self.assertEqual(w.inject_swift_url_suffix("swift://ex.sahara/o"),
self.assertEqual(su.inject_swift_url_suffix("swift://ex.sahara/o"),
"swift://ex.sahara/o")
self.assertEqual(w.inject_swift_url_suffix("hdfs://my/path"),
self.assertEqual(su.inject_swift_url_suffix("hdfs://my/path"),
"hdfs://my/path")
self.assertEqual(w.inject_swift_url_suffix(12345), 12345)
self.assertEqual(w.inject_swift_url_suffix(['test']), ['test'])
self.assertEqual(su.inject_swift_url_suffix(12345), 12345)
self.assertEqual(su.inject_swift_url_suffix(['test']), ['test'])
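These assertions pin down su.inject_swift_url_suffix end to end. A sketch of behavior consistent with them; the real helper may differ in details (for instance, it likely guards with six.string_types and a urlparse on this Python 2 era codebase):

SWIFT_URL_SUFFIX = '.sahara'

def inject_swift_url_suffix(url):
    # Append the ".sahara" service suffix to a bare swift container;
    # pass non-strings and non-swift URLs through untouched
    if isinstance(url, str) and url.startswith('swift://'):
        container = url.split('/')[2]
        if not container.endswith(SWIFT_URL_SUFFIX):
            return url.replace(container,
                               container + SWIFT_URL_SUFFIX, 1)
    return url

assert inject_swift_url_suffix('swift://ex/o') == 'swift://ex.sahara/o'
assert inject_swift_url_suffix('swift://ex.sahara/o') == 'swift://ex.sahara/o'
assert inject_swift_url_suffix('hdfs://my/path') == 'hdfs://my/path'
assert inject_swift_url_suffix(12345) == 12345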
@mock.patch('sahara.conductor.API.job_execution_update')
@mock.patch('sahara.service.edp.job_manager._run_job')