Implement job-types endpoint support methods for Spark plugin

This change implements the optional methods in the Plugins SPI
to support the job-types endpoint for the Spark plugin.

Config hints at this point are unchanged. Additional work may be
needed to provide config-hints specific to Spark plugin versions.

Partial-Implements: blueprint edp-job-types-endpoint
Change-Id: I1cd318da11c997119b192e7396969f89d8f0f216
This commit is contained in:
Trevor McKay 2015-03-17 17:57:34 -04:00
parent 456c559c7d
commit a7adef4708
3 changed files with 24 additions and 4 deletions

View File

@ -19,10 +19,17 @@ from sahara.service.edp.spark import engine as edp_engine
class EdpEngine(edp_engine.SparkJobEngine):
edp_base_version = "1.0.0"
@staticmethod
def edp_supported(version):
return version >= EdpEngine.edp_base_version
def validate_job_execution(self, cluster, job, data):
if cluster.hadoop_version < "1.0.0":
if not self.edp_supported(cluster.hadoop_version):
raise ex.InvalidDataException(
_('Spark 1.0.0 or higher required to run spark %s jobs')
% job.type)
_('Spark {base} or higher required to run {type} jobs').format(
base=EdpEngine.edp_base_version, type=job.type))
super(EdpEngine, self).validate_job_execution(cluster, job, data)

View File

@ -489,6 +489,19 @@ class SparkProvider(p.ProvisioningPluginBase):
return None
def get_edp_job_types(self, versions=[]):
res = {}
for vers in self.get_versions():
if not versions or vers in versions:
if edp_engine.EdpEngine.edp_supported(vers):
res[vers] = edp_engine.EdpEngine.get_supported_job_types()
return res
def get_edp_config_hints(self, job_type, version):
if edp_engine.EdpEngine.edp_supported(version):
return edp_engine.EdpEngine.get_possible_job_config(job_type)
return {}
def get_open_ports(self, node_group):
cluster = node_group.cluster
ports_map = {

View File

@ -50,7 +50,7 @@ class SparkPluginTest(base.SaharaWithDbTestCase):
with testtools.ExpectedException(
ex.InvalidDataException,
value_re="Spark 1.0.0 or higher required to run "
"spark Spark jobs\nError ID: .*"):
"Spark jobs\nError ID: .*"):
edp_engine.validate_job_execution(cluster, job, mock.Mock())
def test_plugin10_edp_engine(self):