Support of Spark EDP in Ambari plugin
Start supporting Spark EDP jobs in the Ambari plugin. Also added a sparkPi job to test this support. Current issues: * Spark with Swift doesn't work Change-Id: I2374d387054efa20876fbcee46ede038e0f3d520 Partially-implements-blueprint: hdp-22-support
This commit is contained in:
parent
63561a9747
commit
b194572aa5
@ -8,6 +8,7 @@ clusters:
|
||||
node_processes:
|
||||
- Ambari
|
||||
- MapReduce History Server
|
||||
- Spark History Server
|
||||
- NameNode
|
||||
- ResourceManager
|
||||
- SecondaryNameNode
|
||||
@ -44,3 +45,4 @@ clusters:
|
||||
- run_jobs
|
||||
edp_jobs_flow:
|
||||
- java_job
|
||||
- spark_pi
|
@ -13,11 +13,32 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from sahara import exceptions as exc
|
||||
from sahara.i18n import _
|
||||
from sahara.plugins.ambari import common as p_common
|
||||
from sahara.plugins import exceptions as pex
|
||||
from sahara.plugins import utils as plugin_utils
|
||||
from sahara.service.edp import hdfs_helper
|
||||
from sahara.service.edp.oozie import engine as oozie_engine
|
||||
from sahara.service.edp.spark import engine as spark_engine
|
||||
|
||||
|
||||
def _get_lib_location(instance, lib_name):
|
||||
with instance.remote() as r:
|
||||
code, jar_path = r.execute_command(
|
||||
('find /usr/hdp -name "{lib_name}" 2>/dev/null '
|
||||
'-print | head -n 1'.format(lib_name=lib_name)),
|
||||
run_as_root=True)
|
||||
# drop last whitespace character
|
||||
return jar_path.rstrip()
|
||||
|
||||
|
||||
def _get_hadoop_openstack_jar_location(instance):
    """Return the path of the hadoop-openstack (Swift support) jar on *instance*."""
    pattern = "hadoop-openstack*.jar"
    return _get_lib_location(instance, pattern)
|
||||
|
||||
|
||||
def _get_jackson_core(instance):
    """Return the path of the jackson-core-asl 1.9.x jar on *instance*."""
    pattern = "jackson-core-asl-1.9*.jar"
    return _get_lib_location(instance, pattern)
|
||||
|
||||
|
||||
class EDPOozieEngine(oozie_engine.OozieJobEngine):
|
||||
@ -54,3 +75,45 @@ class EDPOozieEngine(oozie_engine.OozieJobEngine):
|
||||
@staticmethod
def get_possible_job_config(job_type):
    """Return the configurable-options hint for *job_type*.

    The Ambari Oozie engine exposes no per-job configuration hints,
    so the list is always empty.
    """
    return {"job_config": []}
|
||||
|
||||
|
||||
class EDPSparkEngine(spark_engine.SparkJobEngine):
    """EDP engine that submits Spark jobs on an Ambari-provisioned cluster.

    Jobs are submitted via ``spark-submit`` in yarn-cluster mode from the
    node that runs the Spark History Server.
    """

    # minimal plugin (HDP stack) version that ships Spark
    edp_base_version = "2.2"

    def __init__(self, cluster):
        super(EDPSparkEngine, self).__init__(cluster)
        # Spark jobs are driven from the Spark History Server node
        self.master = plugin_utils.get_instance(
            cluster, p_common.SPARK_JOBHISTORYSERVER)
        self.plugin_params["spark-user"] = "sudo -u spark "
        self.plugin_params["spark-submit"] = "spark-submit"
        self.plugin_params["deploy-mode"] = "cluster"
        self.plugin_params["master"] = "yarn-cluster"

    @staticmethod
    def edp_supported(version):
        """Return True if plugin *version* supports Spark EDP.

        Versions are compared component-wise as integers because a plain
        string comparison orders them wrongly (e.g. "2.10" < "2.2").
        Falls back to the string comparison when a component is not
        numeric.
        """
        def _as_tuple(ver):
            return tuple(int(part) for part in ver.split('.'))

        base = EDPSparkEngine.edp_base_version
        try:
            return _as_tuple(version) >= _as_tuple(base)
        except ValueError:
            # non-numeric version component; best-effort string compare
            return version >= base

    def run_job(self, job_execution):
        """Submit the job, resolving the driver class-path first.

        The hadoop-openstack and jackson jars live in version-dependent
        locations under /usr/hdp, so they are located on the master node
        at submission time.
        """
        driver_classpath = [
            _get_hadoop_openstack_jar_location(self.master),
            _get_jackson_core(self.master)]
        self.plugin_params['driver-class-path'] = ":".join(driver_classpath)
        return super(EDPSparkEngine, self).run_job(job_execution)

    def validate_job_execution(self, cluster, job, data):
        """Validate that *cluster* can run the Spark job.

        Raises:
            exc.InvalidDataException: plugin version too old for Spark EDP.
            pex.InvalidComponentCountException: cluster does not have
                exactly one Spark History Server.
        """
        if not self.edp_supported(cluster.hadoop_version):
            raise exc.InvalidDataException(
                _('Ambari plugin of {base} or higher required to run {type} '
                  'jobs').format(
                    base=EDPSparkEngine.edp_base_version, type=job.type))

        # spark-submit is driven from the (single) Spark History Server node
        spark_nodes_count = plugin_utils.get_instances_count(
            cluster, p_common.SPARK_JOBHISTORYSERVER)
        if spark_nodes_count != 1:
            raise pex.InvalidComponentCountException(
                p_common.SPARK_JOBHISTORYSERVER, '1', spark_nodes_count)

        super(EDPSparkEngine, self).validate_job_execution(
            cluster, job, data)
|
||||
|
@ -176,6 +176,8 @@ class AmbariPluginProvider(p.ProvisioningPluginBase):
|
||||
pass
|
||||
|
||||
def get_edp_engine(self, cluster, job_type):
    """Return an EDP engine instance able to run *job_type*, or None.

    Spark is checked before Oozie so Spark job types are always routed
    to the Spark engine.
    """
    spark_types = edp_engine.EDPSparkEngine.get_supported_job_types()
    if job_type in spark_types:
        return edp_engine.EDPSparkEngine(cluster)
    oozie_types = edp_engine.EDPOozieEngine.get_supported_job_types()
    if job_type in oozie_types:
        return edp_engine.EDPOozieEngine(cluster)
    return None
|
||||
@ -185,10 +187,14 @@ class AmbariPluginProvider(p.ProvisioningPluginBase):
|
||||
for version in self.get_versions():
|
||||
if not versions or version in versions:
|
||||
oozie_engine = edp_engine.EDPOozieEngine
|
||||
res[version] = oozie_engine.get_supported_job_types()
|
||||
spark_engine = edp_engine.EDPSparkEngine
|
||||
res[version] = (oozie_engine.get_supported_job_types() +
|
||||
spark_engine.get_supported_job_types())
|
||||
return res
|
||||
|
||||
def get_edp_config_hints(self, job_type, version):
    """Return the job-config hints for *job_type*, or None if unsupported.

    Mirrors the dispatch order of get_edp_engine: Spark types first,
    then Oozie types. The implicit fallthrough of the original is made
    an explicit ``return None`` for consistency with get_edp_engine.
    """
    if job_type in edp_engine.EDPSparkEngine.get_supported_job_types():
        return edp_engine.EDPSparkEngine.get_possible_job_config(job_type)
    if job_type in edp_engine.EDPOozieEngine.get_supported_job_types():
        return edp_engine.EDPOozieEngine.get_possible_job_config(job_type)
    return None
|
||||
|
||||
|
@ -26,14 +26,14 @@ class TestPlugin(test_base.SaharaTestCase):
|
||||
self.assertEqual({
|
||||
'2.2': [
|
||||
'Hive', 'Java', 'MapReduce', 'MapReduce.Streaming',
|
||||
'Pig', 'Shell'],
|
||||
'Pig', 'Shell', 'Spark'],
|
||||
'2.3': [
|
||||
'Hive', 'Java', 'MapReduce', 'MapReduce.Streaming',
|
||||
'Pig', 'Shell']
|
||||
'Pig', 'Shell', 'Spark']
|
||||
}, self.plugin.get_edp_job_types())
|
||||
|
||||
self.assertEqual({
|
||||
'2.3': [
|
||||
'Hive', 'Java', 'MapReduce', 'MapReduce.Streaming',
|
||||
'Pig', 'Shell'],
|
||||
'Pig', 'Shell', 'Spark'],
|
||||
}, self.plugin.get_edp_job_types(versions=['2.3']))
|
||||
|
Loading…
Reference in New Issue
Block a user