Moved validate_edp from plugin SPI to edp_engine

Now the EDP engine is fully responsible for validating data for
job execution.

Other changes:
* Removed API calls from validation to break a circular dependency
* Removed plugin patching in validation to allow testing of
  non-vanilla plugins
* Renamed job_executor to job_execution

Change-Id: I14c86f33b355cb4317e96a70109d8d72d52d3c00
Closes-Bug: #1357512
Andrew Lazarev 2014-08-20 16:51:11 -07:00
parent eeb3e34407
commit e55238a881
21 changed files with 290 additions and 228 deletions
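
A minimal sketch (not part of the diff below) of how validation now flows: check_job_execution loads the cluster and the job, asks the cluster's plugin for an EDP engine via get_edp_engine(), and calls validate_job_execution() on that engine. A plugin-side engine layers its own checks on top of the base engine; the class name MyOozieEngine here is illustrative, mirroring the vanilla plugin's engine in the diff:

from sahara.plugins.general import exceptions as ex
from sahara.plugins.general import utils as u
from sahara.service.edp.oozie import engine as edp_engine


class MyOozieEngine(edp_engine.OozieJobEngine):
    def validate_job_execution(self, cluster, job, data):
        # Plugin-specific check: the cluster needs exactly one Oozie
        # server to be able to run EDP jobs.
        oo_count = u.get_instances_count(cluster, 'oozie')
        if oo_count != 1:
            raise ex.InvalidComponentCountException('oozie', '1', oo_count)
        # Generic EDP checks (main class, data sources, streaming
        # configs) are delegated to the base Oozie engine.
        super(MyOozieEngine, self).validate_job_execution(cluster, job, data)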

@@ -53,14 +53,6 @@ Validates a given cluster object. Raises *SaharaException* with meaningful messa
*Example exception*: <NotSingleNameNodeException {code='NOT_SINGLE_NAME_NODE', message='Hadoop cluster should contain only 1 NameNode instance. Actual NN count is 2' }>
validate_edp(cluster)
~~~~~~~~~~~~~~~~~~~~~
Validates that given cluster can be used to run EDP jobs. In case of
incompatibility raises *SaharaException* with meaningful message.
*Returns*: None
validate_scaling(cluster, existing, additional)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

@@ -20,7 +20,7 @@ from sahara.service.validations.edp import data_source as v_d_s
from sahara.service.validations.edp import job as v_j
from sahara.service.validations.edp import job_binary as v_j_b
from sahara.service.validations.edp import job_binary_internal as v_j_b_i
from sahara.service.validations.edp import job_executor as v_j_e
from sahara.service.validations.edp import job_execution as v_j_e
import sahara.utils.api as u
@@ -33,7 +33,7 @@ rest = u.Rest('v11', __name__)
@rest.post('/jobs/<job_id>/execute')
@v.check_exists(api.get_job, id='job_id')
@v.validate(v_j_e.JOB_EXEC_SCHEMA, v_j_e.check_job_executor)
@v.validate(v_j_e.JOB_EXEC_SCHEMA, v_j_e.check_job_execution)
def job_execute(job_id, data):
return u.render(job_execution=api.execute_job(job_id, data).to_dict())

@@ -14,6 +14,8 @@
# limitations under the License.
from sahara.plugins.cdh import utils as cu
from sahara.plugins.general import exceptions as ex
from sahara.plugins.general import utils as u
from sahara.service.edp import hdfs_helper
from sahara.service.edp.oozie import engine as edp_engine
@@ -40,3 +42,11 @@ class EdpOozieEngine(edp_engine.OozieJobEngine):
def get_oozie_server(self, cluster):
return cu.get_oozie(cluster)
def validate_job_execution(self, cluster, job, data):
oo_count = u.get_instances_count(cluster, 'OOZIE_SERVER')
if oo_count != 1:
raise ex.InvalidComponentCountException(
'OOZIE_SERVER', '1', oo_count)
super(EdpOozieEngine, self).validate_job_execution(cluster, job, data)

@@ -21,7 +21,6 @@ from sahara.i18n import _
from sahara.i18n import _LI
from sahara.openstack.common import log as logging
from sahara.plugins.general import exceptions as ex
from sahara.plugins.general import utils as u
from sahara.plugins.hdp import hadoopserver as h
from sahara.plugins.hdp import saharautils as utils
from sahara.plugins.hdp.versions import versionhandlerfactory as vhf
@@ -142,12 +141,6 @@ class AmbariPlugin(p.ProvisioningPluginBase):
self.version_factory.get_version_handler(cluster.hadoop_version))
return version_handler.get_edp_engine(cluster, job_type)
def validate_edp(self, cluster):
oo_count = u.get_instances_count(cluster, 'OOZIE_SERVER')
if oo_count != 1:
raise ex.InvalidComponentCountException(
'OOZIE_SERVER', '1', oo_count)
def update_infra(self, cluster):
pass

@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from sahara.plugins.general import exceptions as ex
from sahara.plugins.general import utils as u
from sahara.service.edp.oozie import engine as edp_engine
@@ -30,3 +31,11 @@ class EdpOozieEngine(edp_engine.OozieJobEngine):
def get_oozie_server(self, cluster):
return u.get_instance(cluster, "OOZIE_SERVER")
def validate_job_execution(self, cluster, job, data):
oo_count = u.get_instances_count(cluster, 'OOZIE_SERVER')
if oo_count != 1:
raise ex.InvalidComponentCountException(
'OOZIE_SERVER', '1', oo_count)
super(EdpOozieEngine, self).validate_job_execution(cluster, job, data)

@@ -61,10 +61,6 @@ class ProvisioningPluginBase(plugins_base.PluginInterface):
def scale_cluster(self, cluster, instances):
pass
@plugins_base.optional
def validate_edp(self, cluster):
pass
@plugins_base.optional
def get_edp_engine(self, cluster, job_type):
pass

@@ -13,8 +13,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from sahara import exceptions as ex
from sahara.i18n import _
from sahara.service.edp.spark import engine as edp_engine
class EdpEngine(edp_engine.SparkJobEngine):
pass
def validate_job_execution(self, cluster, job, data):
if cluster.hadoop_version < "1.0.0":
raise ex.InvalidDataException(
_('Spark 1.0.0 or higher required to run spark %s jobs')
% job.type)
super(EdpEngine, self).validate_job_execution(cluster, job, data)

@@ -420,9 +420,6 @@ class SparkProvider(p.ProvisioningPluginBase):
rep_factor)
def get_edp_engine(self, cluster, job_type):
if cluster.hadoop_version < "1.0.0":
return
if job_type in edp_engine.EdpEngine.get_supported_job_types():
return edp_engine.EdpEngine(cluster)

@@ -13,6 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from sahara.plugins.general import exceptions as ex
from sahara.plugins.general import utils as u
from sahara.plugins.vanilla import utils as vu
from sahara.service.edp.oozie import engine as edp_engine
@@ -29,3 +31,10 @@ class EdpOozieEngine(edp_engine.OozieJobEngine):
def get_oozie_server(self, cluster):
return vu.get_oozie(cluster)
def validate_job_execution(self, cluster, job, data):
oo_count = u.get_instances_count(cluster, 'oozie')
if oo_count != 1:
raise ex.InvalidComponentCountException('oozie', '1', oo_count)
super(EdpOozieEngine, self).validate_job_execution(cluster, job, data)

@@ -14,8 +14,6 @@
# limitations under the License.
from sahara.i18n import _
from sahara.plugins.general import exceptions as ex
from sahara.plugins.general import utils as u
from sahara.plugins import provisioning as p
from sahara.plugins.vanilla import versionfactory as vhf
@@ -70,11 +68,6 @@ class VanillaProvider(p.ProvisioningPluginBase):
cluster.hadoop_version).validate_scaling(cluster, existing,
additional)
def validate_edp(self, cluster):
oo_count = u.get_instances_count(cluster, 'oozie')
if oo_count != 1:
raise ex.InvalidComponentCountException('oozie', '1', oo_count)
def get_edp_engine(self, cluster, job_type):
return self._get_version_handler(
cluster.hadoop_version).get_edp_engine(cluster, job_type)

@@ -37,6 +37,10 @@ class JobEngine(object):
def run_job(self, job_execution):
pass
@abc.abstractmethod
def validate_job_execution(self, cluster, job, data):
pass
@staticmethod
@abc.abstractmethod
def get_possible_job_config(job_type):

@@ -27,6 +27,7 @@ from sahara.service.edp import hdfs_helper as h
from sahara.service.edp import job_utils
from sahara.service.edp.oozie import oozie as o
from sahara.service.edp.oozie.workflow_creator import workflow_factory
from sahara.service.validations.edp import job_execution as j
from sahara.utils import edp
from sahara.utils import remote
from sahara.utils import xmlutils as x
@@ -140,6 +141,19 @@ class OozieJobEngine(base_engine.JobEngine):
def get_resource_manager_uri(self, cluster):
pass
def validate_job_execution(self, cluster, job, data):
# All types except Java require input and output objects
# and Java require main class
if job.type in [edp.JOB_TYPE_JAVA]:
j.check_main_class_present(data, job)
else:
j.check_data_sources(data, job)
job_type, subtype = edp.split_job_type(job.type)
if job_type == edp.JOB_TYPE_MAPREDUCE and (
subtype == edp.JOB_SUBTYPE_STREAMING):
j.check_streaming_present(data, job)
@staticmethod
def get_possible_job_config(job_type):
return workflow_factory.get_possible_job_config(job_type)

@@ -22,6 +22,7 @@ from sahara.plugins.general import utils as plugin_utils
from sahara.plugins.spark import config_helper as c_helper
from sahara.service.edp import base_engine
from sahara.service.edp import job_utils
from sahara.service.validations.edp import job_execution as j
from sahara.utils import edp
from sahara.utils import files
from sahara.utils import general
@@ -185,6 +186,9 @@ class SparkJobEngine(base_engine.JobEngine):
raise e.EDPError("Spark job execution failed. Exit status = %s, "
"stdout = %s" % (ret, stdout))
def validate_job_execution(self, cluster, job, data):
j.check_main_class_present(data, job)
@staticmethod
def get_possible_job_config(job_type):
return {'job_config': {'configs': [], 'args': []}}

@@ -217,12 +217,6 @@ def check_heat_stack_name(cluster_name):
% cluster_name)
def check_cluster_exists(id):
if not api.get_cluster(id):
raise ex.InvalidException(
_("Cluster with id '%s' doesn't exist") % id)
def check_cluster_hostnames_lengths(cluster_name, node_groups):
for ng in node_groups:
longest_hostname = g.generate_instance_name(cluster_name,
@@ -367,11 +361,3 @@ def check_required_image_tags(plugin_name, hadoop_version, image_id):
" tags ['%(name)s', '%(version)s']")
% {'image': image_id, 'name': plugin_name,
'version': hadoop_version})
# EDP
def check_edp_job_support(cluster_id):
cluster = api.get_cluster(cluster_id)
plugin_base.PLUGINS.get_plugin(cluster.plugin_name).validate_edp(cluster)

@@ -15,9 +15,11 @@
"""Cluster creation related checks"""
import sahara.exceptions as ex
import sahara.service.edp.api as api
from sahara import conductor as c
from sahara import context
from sahara import exceptions as ex
conductor = c.API
data_source_type = {
"type": "string",
@@ -45,32 +47,33 @@ job_configs = {
def check_data_source_unique_name(name):
if name in [ds.name for ds in api.get_data_sources()]:
if name in [ds.name for ds in conductor.data_source_get_all(
context.ctx())]:
raise ex.NameAlreadyExistsException("Data source with name '%s' "
"already exists" % name)
def check_data_source_exists(data_source_id):
if not api.get_data_source(data_source_id):
if not conductor.data_source_get(context.ctx(), data_source_id):
raise ex.InvalidException("DataSource with id '%s'"
" doesn't exist" % data_source_id)
def check_job_unique_name(name):
if name in [j.name for j in api.get_jobs()]:
if name in [j.name for j in conductor.job_get_all(context.ctx())]:
raise ex.NameAlreadyExistsException("Job with name '%s' "
"already exists" % name)
def check_job_binary_internal_exists(jbi_id):
if not api.get_job_binary_internal(jbi_id):
if not conductor.job_binary_internal_get(context.ctx(), jbi_id):
raise ex.InvalidException("JobBinaryInternal with id '%s'"
" doesn't exist" % jbi_id)
def check_data_sources_are_different(data_source_1_id, data_source_2_id):
ds1 = api.get_data_source(data_source_1_id)
ds2 = api.get_data_source(data_source_2_id)
ds1 = conductor.data_source_get(context.ctx(), data_source_1_id)
ds2 = conductor.data_source_get(context.ctx(), data_source_2_id)
if ds1.type == ds2.type and ds1.url == ds2.url:
raise ex.InvalidDataException('Provided input and output '

@@ -0,0 +1,106 @@
# Copyright (c) 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from sahara import conductor as c
from sahara import context
from sahara import exceptions as ex
from sahara.i18n import _
from sahara.plugins import base as plugin_base
import sahara.service.validations.edp.base as b
JOB_EXEC_SCHEMA = {
"type": "object",
"properties": {
"input_id": {
"type": "string",
"format": "uuid",
},
"output_id": {
"type": "string",
"format": "uuid",
},
"cluster_id": {
"type": "string",
"format": "uuid",
},
"job_configs": b.job_configs,
},
"additionalProperties": False,
"required": [
"cluster_id"
]
}
conductor = c.API
def _is_main_class_present(data):
return data and 'edp.java.main_class' in data.get('job_configs',
{}).get('configs', {})
def check_main_class_present(data, job):
if not _is_main_class_present(data):
raise ex.InvalidDataException(
_('%s job must specify edp.java.main_class') % job.type)
def _streaming_present(data):
try:
streaming = set(('edp.streaming.mapper',
'edp.streaming.reducer'))
configs = set(data['job_configs']['configs'])
return streaming.intersection(configs) == streaming
except Exception:
return False
def check_streaming_present(data, job):
if not _streaming_present(data):
raise ex.InvalidDataException(
_("%s job must specify streaming mapper and reducer") % job.type)
def check_job_execution(data, job_id):
ctx = context.ctx()
cluster = conductor.cluster_get(ctx, data['cluster_id'])
if not cluster:
raise ex.InvalidException(
_("Cluster with id '%s' doesn't exist") % data['cluster_id'])
job = conductor.job_get(ctx, job_id)
plugin = plugin_base.PLUGINS.get_plugin(cluster.plugin_name)
edp_engine = plugin.get_edp_engine(cluster, job.type)
if not edp_engine:
raise ex.InvalidException(
_("Cluster with id '%(cluster_id)s' doesn't support job type "
"'%(job_type)s'") % {"cluster_id": cluster.id,
"job_type": job.type})
edp_engine.validate_job_execution(cluster, job, data)
def check_data_sources(data, job):
if not ('input_id' in data and 'output_id' in data):
raise ex.InvalidDataException(_("%s job requires 'input_id' "
"and 'output_id'") % job.type)
b.check_data_source_exists(data['input_id'])
b.check_data_source_exists(data['output_id'])
b.check_data_sources_are_different(data['input_id'], data['output_id'])

@@ -1,92 +0,0 @@
# Copyright (c) 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sahara.exceptions as ex
from sahara.service.edp import api
import sahara.service.validations.base as main_base
import sahara.service.validations.edp.base as b
from sahara.utils import edp
JOB_EXEC_SCHEMA = {
"type": "object",
"properties": {
"input_id": {
"type": "string",
"format": "uuid",
},
"output_id": {
"type": "string",
"format": "uuid",
},
"cluster_id": {
"type": "string",
"format": "uuid",
},
"job_configs": b.job_configs,
},
"additionalProperties": False,
"required": [
"cluster_id"
]
}
def _is_main_class_present(data):
return data and 'edp.java.main_class' in data.get('job_configs',
{}).get('configs', {})
def _streaming_present(data):
try:
streaming = set(('edp.streaming.mapper',
'edp.streaming.reducer'))
configs = set(data['job_configs']['configs'])
return streaming.intersection(configs) == streaming
except Exception:
return False
def check_job_executor(data, job_id):
job = api.get_job(job_id)
job_type, subtype = edp.split_job_type(job.type)
# Check if cluster contains Oozie service to run job
main_base.check_edp_job_support(data['cluster_id'])
# All types except Java/Spark require input and output objects
# and Java/Spark require main class
if job_type in [edp.JOB_TYPE_JAVA, edp.JOB_TYPE_SPARK]:
if not _is_main_class_present(data):
raise ex.InvalidDataException('%s job must '
'specify edp.java.main_class'
% job.type)
else:
if not ('input_id' in data and 'output_id' in data):
raise ex.InvalidDataException("%s job requires 'input_id' "
"and 'output_id'" % job.type)
b.check_data_source_exists(data['input_id'])
b.check_data_source_exists(data['output_id'])
b.check_data_sources_are_different(data['input_id'], data['output_id'])
if job_type == edp.JOB_TYPE_MAPREDUCE and (
subtype == edp.JOB_SUBTYPE_STREAMING
and not _streaming_present(data)):
raise ex.InvalidDataException("%s job "
"must specify streaming mapper "
"and reducer" % job.type)
main_base.check_cluster_exists(data['cluster_id'])

@@ -13,8 +13,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
import testtools
from sahara import conductor as cond
from sahara import context
from sahara import exceptions as ex
from sahara.plugins import base as pb
from sahara.service.edp.spark import engine
from sahara.tests.unit import base
@@ -30,16 +34,24 @@ class SparkPluginTest(base.SaharaWithDbTestCase):
self.override_config("plugins", ["spark"])
pb.setup_plugins()
def test_plugin09_no_edp_engine(self):
def test_plugin09_edp_engine_validation(self):
cluster_dict = {
'name': 'cluster',
'plugin_name': 'spark',
'hadoop_version': '0.9.1',
'default_image_id': 'image'}
job = mock.Mock()
job.type = edp.JOB_TYPE_SPARK
cluster = conductor.cluster_create(context.ctx(), cluster_dict)
plugin = pb.PLUGINS.get_plugin(cluster.plugin_name)
self.assertIsNone(plugin.get_edp_engine(cluster, edp.JOB_TYPE_SPARK))
edp_engine = plugin.get_edp_engine(cluster, edp.JOB_TYPE_SPARK)
with testtools.ExpectedException(
ex.InvalidDataException,
value_re="Spark 1.0.0 or higher required to run "
"spark Spark jobs"):
edp_engine.validate_job_execution(cluster, job, mock.Mock())
def test_plugin10_edp_engine(self):
cluster_dict = {

@@ -20,20 +20,14 @@ import six
from sahara import main
from sahara.service import api
from sahara.service.validations import base as validation_base
from sahara.service.validations.edp import job_executor as je
from sahara.service.validations.edp import job_execution as je
from sahara.tests.unit.service.validation import utils as u
from sahara.tests.unit import testutils as tu
from sahara.utils import edp
def wrap_it(data):
je.check_job_executor(data, 0)
class FakeJob(object):
type = edp.JOB_TYPE_MAPREDUCE_STREAMING
libs = []
je.check_job_execution(data, 0)
class TestJobExecValidation(u.ValidationTestCase):
@@ -47,24 +41,33 @@ class TestJobExecValidation(u.ValidationTestCase):
self.override_config('plugins', main.CONF['plugins'] + ['spark'])
api.plugin_base.setup_plugins()
@mock.patch('sahara.service.validations.edp.base.'
'check_data_sources_are_different', lambda x, y: None)
@mock.patch('sahara.service.validations.base.check_cluster_exists',
lambda x: None)
@mock.patch('sahara.service.validations.base.check_edp_job_support')
@mock.patch('sahara.service.validations'
'.edp.base.check_data_source_exists')
@mock.patch('sahara.service.edp.api.get_job')
def test_streaming(self, get_job, check_data, check_cluster):
check_cluster.return_value = True
check_data.return_value = True
get_job.return_value = FakeJob()
@mock.patch('sahara.conductor.api.LocalApi.cluster_get')
@mock.patch('sahara.conductor.api.LocalApi.data_source_get')
@mock.patch('sahara.conductor.api.LocalApi.job_get')
def test_streaming(self, get_job, get_data_source, get_cluster):
get_job.return_value = mock.Mock(
type=edp.JOB_TYPE_MAPREDUCE_STREAMING, libs=[])
ds1_id = six.text_type(uuid.uuid4())
ds2_id = six.text_type(uuid.uuid4())
data_sources = {
ds1_id: mock.Mock(type="swift", url="http://swift/test"),
ds2_id: mock.Mock(type="swift", url="http://swift/test2"),
}
get_data_source.side_effect = lambda ctx, x: data_sources[x]
ng = tu.make_ng_dict('master', 42, ['oozie'], 1,
instances=[tu.make_inst_dict('id', 'name')])
get_cluster.return_value = tu.create_cluster("cluster", "tenant1",
"vanilla", "1.2.1", [ng])
self._assert_create_object_validation(
data={
"cluster_id": six.text_type(uuid.uuid4()),
"input_id": six.text_type(uuid.uuid4()),
"output_id": six.text_type(uuid.uuid4()),
"input_id": ds1_id,
"output_id": ds2_id,
"job_configs": {"configs": {},
"params": {},
"args": []}
@@ -77,8 +80,8 @@ class TestJobExecValidation(u.ValidationTestCase):
self._assert_create_object_validation(
data={
"cluster_id": six.text_type(uuid.uuid4()),
"input_id": six.text_type(uuid.uuid4()),
"output_id": six.text_type(uuid.uuid4()),
"input_id": ds1_id,
"output_id": ds2_id,
"job_configs": {
"configs": {
"edp.streaming.mapper": "/bin/cat",
@@ -87,14 +90,12 @@ class TestJobExecValidation(u.ValidationTestCase):
"args": []}
})
@mock.patch('sahara.service.validations.base.check_cluster_exists',
lambda x: None)
@mock.patch('sahara.service.validations.base.check_edp_job_support',
lambda x: None)
@mock.patch('sahara.service.edp.api.get_data_source')
@mock.patch('sahara.service.edp.api.get_job')
def test_data_sources_differ(self, get_job, get_data_source):
get_job.return_value = FakeJob()
@mock.patch('sahara.conductor.api.LocalApi.cluster_get')
@mock.patch('sahara.conductor.api.LocalApi.data_source_get')
@mock.patch('sahara.conductor.api.LocalApi.job_get')
def test_data_sources_differ(self, get_job, get_data_source, get_cluster):
get_job.return_value = mock.Mock(
type=edp.JOB_TYPE_MAPREDUCE_STREAMING, libs=[])
ds1_id = six.text_type(uuid.uuid4())
ds2_id = six.text_type(uuid.uuid4())
@@ -104,7 +105,12 @@ class TestJobExecValidation(u.ValidationTestCase):
ds2_id: mock.Mock(type="swift", url="http://swift/test2"),
}
get_data_source.side_effect = lambda x: data_sources[x]
get_data_source.side_effect = lambda ctx, x: data_sources[x]
ng = tu.make_ng_dict('master', 42, ['oozie'], 1,
instances=[tu.make_inst_dict('id', 'name')])
get_cluster.return_value = tu.create_cluster("cluster", "tenant1",
"vanilla", "1.2.1", [ng])
self._assert_create_object_validation(
data={
@@ -138,10 +144,16 @@ class TestJobExecValidation(u.ValidationTestCase):
},
bad_req_i=(1, "INVALID_DATA", err_msg))
@mock.patch('sahara.service.api.get_cluster')
@mock.patch('sahara.service.edp.api.get_job')
def test_check_edp_job_support(self, get_job, get_cluster):
get_job.return_value = FakeJob()
@mock.patch('sahara.conductor.api.LocalApi.cluster_get')
@mock.patch('sahara.conductor.api.LocalApi.job_get')
def test_check_edp_no_oozie(self, get_job, get_cluster):
get_job.return_value = mock.Mock(type=edp.JOB_TYPE_PIG, libs=[])
ng = tu.make_ng_dict('master', 42, ['namenode'], 1,
instances=[tu.make_inst_dict('id', 'name')])
get_cluster.return_value = tu.create_cluster("cluster", "tenant1",
"vanilla", "1.2.1", [ng])
self._assert_create_object_validation(
data={
"cluster_id": six.text_type(uuid.uuid4()),
@@ -152,14 +164,8 @@ class TestJobExecValidation(u.ValidationTestCase):
"Hadoop cluster should contain 1 oozie component(s). "
"Actual oozie count is 0"))
ng = tu.make_ng_dict('master', 42, ['oozie'], 1,
instances=[tu.make_inst_dict('id', 'name')])
get_cluster.return_value = tu.create_cluster("cluster", "tenant1",
"vanilla", "1.2.1", [ng])
validation_base.check_edp_job_support('some_id')
@mock.patch('sahara.service.api.get_cluster')
@mock.patch('sahara.service.edp.api.get_job')
@mock.patch('sahara.conductor.api.LocalApi.cluster_get')
@mock.patch('sahara.conductor.api.LocalApi.job_get')
def test_check_edp_job_support_spark(self, get_job, get_cluster):
# utils.start_patch will construct a vanilla cluster as a
# default for get_cluster, but we want a Spark cluster.
@@ -167,9 +173,7 @@ class TestJobExecValidation(u.ValidationTestCase):
# Note that this means we cannot use assert_create_object_validation()
# because it calls start_patch() and will override our setting
job = mock.Mock()
job.type = edp.JOB_TYPE_SPARK
job.mains = ["main"]
job = mock.Mock(type=edp.JOB_TYPE_SPARK, mains=["main"])
get_job.return_value = job
ng = tu.make_ng_dict('master', 42, [], 1,
instances=[tu.make_inst_dict('id', 'name')])
@@ -183,17 +187,15 @@ class TestJobExecValidation(u.ValidationTestCase):
"configs": {
"edp.java.main_class": "org.me.class"}}})
@mock.patch('sahara.service.validations.base.check_edp_job_support')
@mock.patch('sahara.service.validations.base.check_cluster_exists')
@mock.patch('sahara.service.edp.api.get_job')
def _test_edp_main_class(self, job_type,
get_job,
check_cluster_exists,
check_edp_job_support):
check_cluster_exists.return_value = True
check_edp_job_support.return_value = None
get_job.return_value = mock.Mock()
get_job.return_value.type = job_type
@mock.patch('sahara.conductor.api.LocalApi.cluster_get')
@mock.patch('sahara.conductor.api.LocalApi.job_get')
def test_edp_main_class_java(self, job_get, cluster_get):
job_get.return_value = mock.Mock()
job_get.return_value.type = edp.JOB_TYPE_JAVA
ng = tu.make_ng_dict('master', 42, ['namenode', 'oozie'], 1,
instances=[tu.make_inst_dict('id', 'name')])
cluster_get.return_value = tu.create_cluster("cluster", "tenant1",
"vanilla", "1.2.1", [ng])
self._assert_create_object_validation(
data={
@@ -204,7 +206,7 @@ class TestJobExecValidation(u.ValidationTestCase):
},
bad_req_i=(1, "INVALID_DATA",
"%s job must "
"specify edp.java.main_class" % job_type))
"specify edp.java.main_class" % edp.JOB_TYPE_JAVA))
self._assert_create_object_validation(
data={
@@ -216,6 +218,32 @@ class TestJobExecValidation(u.ValidationTestCase):
"args": []}
})
def test_edp_main_class(self):
for job_type in (edp.JOB_TYPE_JAVA, edp.JOB_TYPE_SPARK):
self._test_edp_main_class(job_type)
@mock.patch('sahara.conductor.api.LocalApi.cluster_get')
@mock.patch('sahara.conductor.api.LocalApi.job_get')
def test_edp_main_class_spark(self, job_get, cluster_get):
job_get.return_value = mock.Mock(type=edp.JOB_TYPE_SPARK)
ng = tu.make_ng_dict('master', 42, ['namenode'], 1,
instances=[tu.make_inst_dict('id', 'name')])
cluster_get.return_value = tu.create_cluster("cluster", "tenant1",
"spark", "1.0.0", [ng])
self._assert_create_object_validation(
data={
"cluster_id": six.text_type(uuid.uuid4()),
"job_configs": {"configs": {},
"params": {},
"args": []}
},
bad_req_i=(1, "INVALID_DATA",
"%s job must "
"specify edp.java.main_class" % edp.JOB_TYPE_SPARK))
self._assert_create_object_validation(
data={
"cluster_id": six.text_type(uuid.uuid4()),
"job_configs": {
"configs": {
"edp.java.main_class": "org.me.myclass"},
"params": {},
"args": []}
})

@@ -113,9 +113,6 @@ def start_patch(patch_templates=True):
"sahara.service.api.get_node_group_templates")
get_ng_template_p = mock.patch(
"sahara.service.api.get_node_group_template")
get_plugins_p = mock.patch("sahara.service.api.get_plugins")
get_plugin_p = mock.patch(
"sahara.plugins.base.PluginManager.get_plugin")
if patch_templates:
get_cl_templates_p = mock.patch(
"sahara.service.api.get_cluster_templates")
@@ -132,8 +129,6 @@ def start_patch(patch_templates=True):
if patch_templates:
get_ng_templates = get_ng_templates_p.start()
get_ng_template = get_ng_template_p.start()
get_plugins = get_plugins_p.start()
get_plugin = get_plugin_p.start()
if patch_templates:
get_cl_templates = get_cl_templates_p.start()
get_cl_template_p.start()
@@ -211,26 +206,16 @@ def start_patch(patch_templates=True):
get_cl_templates.return_value = [r.ClusterTemplateResource(ct_dict)]
vanilla = plugin.VanillaProvider()
vanilla.name = 'vanilla'
get_plugins.return_value = [vanilla]
def _get_ng_template(id):
for template in get_ng_templates():
if template.id == id:
return template
return None
def _get_plugin(name):
if name == 'vanilla':
return vanilla
return None
get_plugin.side_effect = _get_plugin
if patch_templates:
get_ng_template.side_effect = _get_ng_template
# request data to validate
patchers = [get_clusters_p, get_cluster_p, get_plugins_p, get_plugin_p,
patchers = [get_clusters_p, get_cluster_p,
nova_p, keystone_p, get_image_p, heat_p]
if patch_templates:
patchers.extend([get_ng_template_p, get_ng_templates_p,

@@ -14,11 +14,16 @@
# limitations under the License.
import uuid
import six
from sahara.conductor import resource as r
def create_cluster(name, tenant, plugin, version, node_groups, **kwargs):
dct = {'name': name, 'tenant_id': tenant, 'plugin_name': plugin,
dct = {'id': six.text_type(uuid.uuid4()), 'name': name,
'tenant_id': tenant, 'plugin_name': plugin,
'hadoop_version': version, 'node_groups': node_groups,
'cluster_configs': {}}
dct.update(kwargs)