Merge "Moved validate_edp from plugin SPI to edp_engine"
This commit is contained in:
commit
ed4e658522
@ -53,14 +53,6 @@ Validates a given cluster object. Raises *SaharaException* with meaningful messa
|
||||
|
||||
*Example exception*: <NotSingleNameNodeException {code='NOT_SINGLE_NAME_NODE', message='Hadoop cluster should contain only 1 NameNode instance. Actual NN count is 2' }>
|
||||
|
||||
validate_edp(cluster)
|
||||
~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
Validates that given cluster can be used to run EDP jobs. In case of
|
||||
incompatibility raises *SaharaException* with meaningful message.
|
||||
|
||||
*Returns*: None
|
||||
|
||||
validate_scaling(cluster, existing, additional)
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
|
@ -20,7 +20,7 @@ from sahara.service.validations.edp import data_source as v_d_s
|
||||
from sahara.service.validations.edp import job as v_j
|
||||
from sahara.service.validations.edp import job_binary as v_j_b
|
||||
from sahara.service.validations.edp import job_binary_internal as v_j_b_i
|
||||
from sahara.service.validations.edp import job_executor as v_j_e
|
||||
from sahara.service.validations.edp import job_execution as v_j_e
|
||||
import sahara.utils.api as u
|
||||
|
||||
|
||||
@ -33,7 +33,7 @@ rest = u.Rest('v11', __name__)
|
||||
|
||||
@rest.post('/jobs/<job_id>/execute')
|
||||
@v.check_exists(api.get_job, id='job_id')
|
||||
@v.validate(v_j_e.JOB_EXEC_SCHEMA, v_j_e.check_job_executor)
|
||||
@v.validate(v_j_e.JOB_EXEC_SCHEMA, v_j_e.check_job_execution)
|
||||
def job_execute(job_id, data):
|
||||
return u.render(job_execution=api.execute_job(job_id, data).to_dict())
|
||||
|
||||
|
@ -14,6 +14,8 @@
|
||||
# limitations under the License.
|
||||
|
||||
from sahara.plugins.cdh import utils as cu
|
||||
from sahara.plugins.general import exceptions as ex
|
||||
from sahara.plugins.general import utils as u
|
||||
from sahara.service.edp import hdfs_helper
|
||||
from sahara.service.edp.oozie import engine as edp_engine
|
||||
|
||||
@ -40,3 +42,11 @@ class EdpOozieEngine(edp_engine.OozieJobEngine):
|
||||
|
||||
def get_oozie_server(self, cluster):
|
||||
return cu.get_oozie(cluster)
|
||||
|
||||
def validate_job_execution(self, cluster, job, data):
    """Validate the cluster topology, then run the generic Oozie checks.

    :param cluster: cluster the job is going to run on
    :param job: job object being executed
    :param data: job execution request body
    :raises InvalidComponentCountException: when the cluster does not
        report exactly one 'OOZIE_SERVER' instance
    """
    component = 'OOZIE_SERVER'
    found = u.get_instances_count(cluster, component)
    if found != 1:
        raise ex.InvalidComponentCountException(component, '1', found)

    # Topology is fine; let the parent engine check the request itself.
    parent = super(EdpOozieEngine, self)
    parent.validate_job_execution(cluster, job, data)
|
||||
|
@ -21,7 +21,6 @@ from sahara.i18n import _
|
||||
from sahara.i18n import _LI
|
||||
from sahara.openstack.common import log as logging
|
||||
from sahara.plugins.general import exceptions as ex
|
||||
from sahara.plugins.general import utils as u
|
||||
from sahara.plugins.hdp import hadoopserver as h
|
||||
from sahara.plugins.hdp import saharautils as utils
|
||||
from sahara.plugins.hdp.versions import versionhandlerfactory as vhf
|
||||
@ -142,12 +141,6 @@ class AmbariPlugin(p.ProvisioningPluginBase):
|
||||
self.version_factory.get_version_handler(cluster.hadoop_version))
|
||||
return version_handler.get_edp_engine(cluster, job_type)
|
||||
|
||||
def validate_edp(self, cluster):
|
||||
oo_count = u.get_instances_count(cluster, 'OOZIE_SERVER')
|
||||
if oo_count != 1:
|
||||
raise ex.InvalidComponentCountException(
|
||||
'OOZIE_SERVER', '1', oo_count)
|
||||
|
||||
def update_infra(self, cluster):
|
||||
pass
|
||||
|
||||
|
@ -13,6 +13,7 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from sahara.plugins.general import exceptions as ex
|
||||
from sahara.plugins.general import utils as u
|
||||
from sahara.service.edp.oozie import engine as edp_engine
|
||||
|
||||
@ -30,3 +31,11 @@ class EdpOozieEngine(edp_engine.OozieJobEngine):
|
||||
|
||||
def get_oozie_server(self, cluster):
|
||||
return u.get_instance(cluster, "OOZIE_SERVER")
|
||||
|
||||
def validate_job_execution(self, cluster, job, data):
    """Require a single Oozie server before validating the request.

    :param cluster: cluster the job is going to run on
    :param job: job object being executed
    :param data: job execution request body
    :raises InvalidComponentCountException: when the number of
        'OOZIE_SERVER' instances in the cluster differs from one
    """
    oozie_servers = u.get_instances_count(cluster, 'OOZIE_SERVER')
    single_server = oozie_servers == 1
    if not single_server:
        raise ex.InvalidComponentCountException(
            'OOZIE_SERVER', '1', oozie_servers)

    # Remaining (job type specific) checks live in the parent engine.
    super(EdpOozieEngine, self).validate_job_execution(
        cluster, job, data)
|
||||
|
@ -61,10 +61,6 @@ class ProvisioningPluginBase(plugins_base.PluginInterface):
|
||||
def scale_cluster(self, cluster, instances):
|
||||
pass
|
||||
|
||||
@plugins_base.optional
|
||||
def validate_edp(self, cluster):
|
||||
pass
|
||||
|
||||
@plugins_base.optional
|
||||
def get_edp_engine(self, cluster, job_type):
|
||||
pass
|
||||
|
@ -13,8 +13,16 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from sahara import exceptions as ex
|
||||
from sahara.i18n import _
|
||||
from sahara.service.edp.spark import engine as edp_engine
|
||||
|
||||
|
||||
class EdpEngine(edp_engine.SparkJobEngine):
    """Spark plugin EDP engine that gates job execution on the version."""

    def validate_job_execution(self, cluster, job, data):
        """Reject clusters older than Spark 1.0.0, then defer to the parent.

        :param cluster: cluster the job is going to run on
        :param job: job object being executed
        :param data: job execution request body
        :raises InvalidDataException: when ``cluster.hadoop_version``
            compares lower than "1.0.0"
        """
        # NOTE(review): this compares against a str literal, so for string
        # versions the ordering is lexicographic rather than numeric.
        too_old = cluster.hadoop_version < "1.0.0"
        if too_old:
            message = _('Spark 1.0.0 or higher required to run spark %s jobs')
            raise ex.InvalidDataException(message % job.type)

        super(EdpEngine, self).validate_job_execution(cluster, job, data)
|
||||
|
@ -420,9 +420,6 @@ class SparkProvider(p.ProvisioningPluginBase):
|
||||
rep_factor)
|
||||
|
||||
def get_edp_engine(self, cluster, job_type):
|
||||
if cluster.hadoop_version < "1.0.0":
|
||||
return
|
||||
|
||||
if job_type in edp_engine.EdpEngine.get_supported_job_types():
|
||||
return edp_engine.EdpEngine(cluster)
|
||||
|
||||
|
@ -13,6 +13,8 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from sahara.plugins.general import exceptions as ex
|
||||
from sahara.plugins.general import utils as u
|
||||
from sahara.plugins.vanilla import utils as vu
|
||||
from sahara.service.edp.oozie import engine as edp_engine
|
||||
|
||||
@ -29,3 +31,10 @@ class EdpOozieEngine(edp_engine.OozieJobEngine):
|
||||
|
||||
def get_oozie_server(self, cluster):
|
||||
return vu.get_oozie(cluster)
|
||||
|
||||
def validate_job_execution(self, cluster, job, data):
    """Ensure exactly one 'oozie' process exists, then validate the job.

    :param cluster: cluster the job is going to run on
    :param job: job object being executed
    :param data: job execution request body
    :raises InvalidComponentCountException: when the cluster's 'oozie'
        instance count is anything other than one
    """
    process = 'oozie'
    instance_total = u.get_instances_count(cluster, process)
    if instance_total != 1:
        raise ex.InvalidComponentCountException(
            process, '1', instance_total)

    # Cluster can host Oozie jobs; run the request-level checks.
    base = super(EdpOozieEngine, self)
    base.validate_job_execution(cluster, job, data)
|
||||
|
@ -14,8 +14,6 @@
|
||||
# limitations under the License.
|
||||
|
||||
from sahara.i18n import _
|
||||
from sahara.plugins.general import exceptions as ex
|
||||
from sahara.plugins.general import utils as u
|
||||
from sahara.plugins import provisioning as p
|
||||
from sahara.plugins.vanilla import versionfactory as vhf
|
||||
|
||||
@ -70,11 +68,6 @@ class VanillaProvider(p.ProvisioningPluginBase):
|
||||
cluster.hadoop_version).validate_scaling(cluster, existing,
|
||||
additional)
|
||||
|
||||
def validate_edp(self, cluster):
|
||||
oo_count = u.get_instances_count(cluster, 'oozie')
|
||||
if oo_count != 1:
|
||||
raise ex.InvalidComponentCountException('oozie', '1', oo_count)
|
||||
|
||||
def get_edp_engine(self, cluster, job_type):
|
||||
return self._get_version_handler(
|
||||
cluster.hadoop_version).get_edp_engine(cluster, job_type)
|
||||
|
@ -37,6 +37,10 @@ class JobEngine(object):
|
||||
def run_job(self, job_execution):
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
def validate_job_execution(self, cluster, job, data):
    """Validate that *job* can be executed on *cluster* with *data*.

    Abstract hook with no behaviour of its own: every concrete engine
    must override it.

    :param cluster: cluster the job is going to run on
    :param job: job object being executed
    :param data: job execution request body
    """
    pass
|
||||
|
||||
@staticmethod
|
||||
@abc.abstractmethod
|
||||
def get_possible_job_config(job_type):
|
||||
|
@ -27,6 +27,7 @@ from sahara.service.edp import hdfs_helper as h
|
||||
from sahara.service.edp import job_utils
|
||||
from sahara.service.edp.oozie import oozie as o
|
||||
from sahara.service.edp.oozie.workflow_creator import workflow_factory
|
||||
from sahara.service.validations.edp import job_execution as j
|
||||
from sahara.utils import edp
|
||||
from sahara.utils import remote
|
||||
from sahara.utils import xmlutils as x
|
||||
@ -142,6 +143,19 @@ class OozieJobEngine(base_engine.JobEngine):
|
||||
def get_resource_manager_uri(self, cluster):
|
||||
pass
|
||||
|
||||
def validate_job_execution(self, cluster, job, data):
    """Run the job type specific checks for an Oozie job execution.

    :param cluster: cluster the job is going to run on (not inspected
        by the checks below)
    :param job: job object being executed; only ``job.type`` is read
    :param data: job execution request body
    """
    # A Java job requires a main class; every other job type requires
    # input and output data sources.
    if job.type == edp.JOB_TYPE_JAVA:
        j.check_main_class_present(data, job)
    else:
        j.check_data_sources(data, job)

    # Streaming MapReduce additionally needs its streaming settings.
    base_type, sub_type = edp.split_job_type(job.type)
    is_mapreduce = base_type == edp.JOB_TYPE_MAPREDUCE
    if is_mapreduce and sub_type == edp.JOB_SUBTYPE_STREAMING:
        j.check_streaming_present(data, job)
|
||||
|
||||
@staticmethod
|
||||
def get_possible_job_config(job_type):
|
||||
return workflow_factory.get_possible_job_config(job_type)
|
||||
|
@ -23,6 +23,7 @@ from sahara.plugins.general import utils as plugin_utils
|
||||
from sahara.plugins.spark import config_helper as c_helper
|
||||
from sahara.service.edp import base_engine
|
||||
from sahara.service.edp import job_utils
|
||||
from sahara.service.validations.edp import job_execution as j
|
||||
from sahara.utils import edp
|
||||
from sahara.utils import files
|
||||
from sahara.utils import general
|
||||
@ -190,6 +191,9 @@ class SparkJobEngine(base_engine.JobEngine):
|
||||
"%(status)s, stdout = %(stdout)s") %
|
||||
{'status': ret, 'stdout': stdout})
|
||||
|
||||
def validate_job_execution(self, cluster, job, data):
    """Validate a Spark job execution request.

    Delegates to ``check_main_class_present``, so the only requirement
    enforced here is the one that helper checks on ``data``. ``cluster``
    is accepted to match the engine interface and is not inspected.
    """
    j.check_main_class_present(data, job)
|
||||
|
||||
@staticmethod
|
||||
def get_possible_job_config(job_type):
|
||||
return {'job_config': {'configs': [], 'args': []}}
|
||||
|
@ -220,12 +220,6 @@ def check_heat_stack_name(cluster_name):
|
||||
% cluster_name)
|
||||
|
||||
|
||||
def check_cluster_exists(id):
|
||||
if not api.get_cluster(id):
|
||||
raise ex.InvalidException(
|
||||
_("Cluster with id '%s' doesn't exist") % id)
|
||||
|
||||
|
||||
def check_cluster_hostnames_lengths(cluster_name, node_groups):
|
||||
for ng in node_groups:
|
||||
longest_hostname = g.generate_instance_name(cluster_name,
|
||||
@ -370,11 +364,3 @@ def check_required_image_tags(plugin_name, hadoop_version, image_id):
|
||||
" tags ['%(name)s', '%(version)s']")
|
||||
% {'image': image_id, 'name': plugin_name,
|
||||
'version': hadoop_version})
|
||||
|
||||
|
||||
# EDP
|
||||
|
||||
|
||||
def check_edp_job_support(cluster_id):
|
||||
cluster = api.get_cluster(cluster_id)
|
||||
plugin_base.PLUGINS.get_plugin(cluster.plugin_name).validate_edp(cluster)
|
||||
|
@ -15,9 +15,11 @@
|
||||
|
||||
"""Cluster creation related checks"""
|
||||
|
||||
import sahara.exceptions as ex
|
||||
import sahara.service.edp.api as api
|
||||
from sahara import conductor as c
|
||||
from sahara import context
|
||||
from sahara import exceptions as ex
|
||||
|
||||
conductor = c.API
|
||||
|
||||
data_source_type = {
|
||||
"type": "string",
|
||||
@ -45,32 +47,33 @@ job_configs = {
|
||||
|
||||
|
||||
def check_data_source_unique_name(name):
|
||||
if name in [ds.name for ds in api.get_data_sources()]:
|
||||
if name in [ds.name for ds in conductor.data_source_get_all(
|
||||
context.ctx())]:
|
||||
raise ex.NameAlreadyExistsException("Data source with name '%s' "
|
||||
"already exists" % name)
|
||||
|
||||
|
||||
def check_data_source_exists(data_source_id):
|
||||
if not api.get_data_source(data_source_id):
|
||||
if not conductor.data_source_get(context.ctx(), data_source_id):
|
||||
raise ex.InvalidException("DataSource with id '%s'"
|
||||
" doesn't exist" % data_source_id)
|
||||
|
||||
|
||||
def check_job_unique_name(name):
|
||||
if name in [j.name for j in api.get_jobs()]:
|
||||
if name in [j.name for j in conductor.job_get_all(context.ctx())]:
|
||||
raise ex.NameAlreadyExistsException("Job with name '%s' "
|
||||
"already exists" % name)
|
||||
|
||||
|
||||
def check_job_binary_internal_exists(jbi_id):
|
||||
if not api.get_job_binary_internal(jbi_id):
|
||||
if not conductor.job_binary_internal_get(context.ctx(), jbi_id):
|
||||
raise ex.InvalidException("JobBinaryInternal with id '%s'"
|
||||
" doesn't exist" % jbi_id)
|
||||
|
||||
|
||||
def check_data_sources_are_different(data_source_1_id, data_source_2_id):
|
||||
ds1 = api.get_data_source(data_source_1_id)
|
||||
ds2 = api.get_data_source(data_source_2_id)
|
||||
ds1 = conductor.data_source_get(context.ctx(), data_source_1_id)
|
||||
ds2 = conductor.data_source_get(context.ctx(), data_source_2_id)
|
||||
|
||||
if ds1.type == ds2.type and ds1.url == ds2.url:
|
||||
raise ex.InvalidDataException('Provided input and output '
|
||||
|
106
sahara/service/validations/edp/job_execution.py
Normal file
106
sahara/service/validations/edp/job_execution.py
Normal file
@ -0,0 +1,106 @@
|
||||
# Copyright (c) 2013 Mirantis Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from sahara import conductor as c
|
||||
from sahara import context
|
||||
from sahara import exceptions as ex
|
||||
from sahara.i18n import _
|
||||
from sahara.plugins import base as plugin_base
|
||||
import sahara.service.validations.edp.base as b
|
||||
|
||||
# Schema of the job execution request body. Only "cluster_id" is
# mandatory, and properties not listed here are rejected
# ("additionalProperties" is False).
JOB_EXEC_SCHEMA = {
    "type": "object",
    "properties": {
        # Input data source id; optional at schema level.
        "input_id": {
            "type": "string",
            "format": "uuid",
        },
        # Output data source id; optional at schema level.
        "output_id": {
            "type": "string",
            "format": "uuid",
        },
        # Cluster the job should run on.
        "cluster_id": {
            "type": "string",
            "format": "uuid",
        },
        # Shared job_configs sub-schema from the EDP validation base module.
        "job_configs": b.job_configs,
    },
    "additionalProperties": False,
    "required": [
        "cluster_id"
    ]
}
|
||||
|
||||
|
||||
conductor = c.API
|
||||
|
||||
|
||||
def _is_main_class_present(data):
|
||||
return data and 'edp.java.main_class' in data.get('job_configs',
|
||||
{}).get('configs', {})
|
||||
|
||||
|
||||
def check_main_class_present(data, job):
    """Raise unless the request names a main class for the job.

    :param data: job execution request body
    :param job: job object; ``job.type`` is used in the error message
    :raises InvalidDataException: when 'edp.java.main_class' is missing
        from the request's job configs
    """
    if _is_main_class_present(data):
        return

    raise ex.InvalidDataException(
        _('%s job must specify edp.java.main_class') % job.type)
|
||||
|
||||
|
||||
def _streaming_present(data):
|
||||
try:
|
||||
streaming = set(('edp.streaming.mapper',
|
||||
'edp.streaming.reducer'))
|
||||
configs = set(data['job_configs']['configs'])
|
||||
return streaming.intersection(configs) == streaming
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
def check_streaming_present(data, job):
    """Raise unless the request configures a streaming mapper and reducer.

    :param data: job execution request body
    :param job: job object; ``job.type`` is used in the error message
    :raises InvalidDataException: when either streaming setting is
        missing from the request's job configs
    """
    if _streaming_present(data):
        return

    raise ex.InvalidDataException(
        _("%s job must specify streaming mapper and reducer") % job.type)
|
||||
|
||||
|
||||
def check_job_execution(data, job_id):
    """Validate a job execution request against its target cluster.

    Looks up the cluster and the job, asks the cluster's plugin for an
    EDP engine matching the job type and lets that engine validate the
    request.

    :param data: job execution request body; must contain 'cluster_id'
    :param job_id: id of the job to execute
    :raises InvalidException: when the cluster does not exist or its
        plugin provides no engine for the job type
    """
    ctx = context.ctx()
    cluster_id = data['cluster_id']

    cluster = conductor.cluster_get(ctx, cluster_id)
    if not cluster:
        raise ex.InvalidException(
            _("Cluster with id '%s' doesn't exist") % cluster_id)

    # NOTE(review): the job is not checked for None here; presumably the
    # caller guarantees it exists -- confirm.
    job = conductor.job_get(ctx, job_id)

    provider = plugin_base.PLUGINS.get_plugin(cluster.plugin_name)
    engine = provider.get_edp_engine(cluster, job.type)
    if not engine:
        details = {"cluster_id": cluster.id, "job_type": job.type}
        raise ex.InvalidException(
            _("Cluster with id '%(cluster_id)s' doesn't support job type "
              "'%(job_type)s'") % details)

    engine.validate_job_execution(cluster, job, data)
|
||||
|
||||
|
||||
def check_data_sources(data, job):
    """Check that the request names two existing, distinct data sources.

    :param data: job execution request body
    :param job: job object; ``job.type`` is used in the error message
    :raises InvalidDataException: when 'input_id' or 'output_id' is
        missing from the request
    """
    if 'input_id' not in data or 'output_id' not in data:
        raise ex.InvalidDataException(_("%s job requires 'input_id' "
                                        "and 'output_id'") % job.type)

    input_id = data['input_id']
    output_id = data['output_id']

    # Existence is checked for each source before the pair is compared.
    b.check_data_source_exists(input_id)
    b.check_data_source_exists(output_id)

    b.check_data_sources_are_different(input_id, output_id)
|
@ -1,92 +0,0 @@
|
||||
# Copyright (c) 2013 Mirantis Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import sahara.exceptions as ex
|
||||
from sahara.service.edp import api
|
||||
import sahara.service.validations.base as main_base
|
||||
import sahara.service.validations.edp.base as b
|
||||
from sahara.utils import edp
|
||||
|
||||
JOB_EXEC_SCHEMA = {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"input_id": {
|
||||
"type": "string",
|
||||
"format": "uuid",
|
||||
},
|
||||
"output_id": {
|
||||
"type": "string",
|
||||
"format": "uuid",
|
||||
},
|
||||
"cluster_id": {
|
||||
"type": "string",
|
||||
"format": "uuid",
|
||||
},
|
||||
"job_configs": b.job_configs,
|
||||
},
|
||||
"additionalProperties": False,
|
||||
"required": [
|
||||
"cluster_id"
|
||||
]
|
||||
}
|
||||
|
||||
|
||||
def _is_main_class_present(data):
|
||||
return data and 'edp.java.main_class' in data.get('job_configs',
|
||||
{}).get('configs', {})
|
||||
|
||||
|
||||
def _streaming_present(data):
|
||||
try:
|
||||
streaming = set(('edp.streaming.mapper',
|
||||
'edp.streaming.reducer'))
|
||||
configs = set(data['job_configs']['configs'])
|
||||
return streaming.intersection(configs) == streaming
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
def check_job_executor(data, job_id):
|
||||
job = api.get_job(job_id)
|
||||
job_type, subtype = edp.split_job_type(job.type)
|
||||
|
||||
# Check if cluster contains Oozie service to run job
|
||||
main_base.check_edp_job_support(data['cluster_id'])
|
||||
|
||||
# All types except Java/Spark require input and output objects
|
||||
# and Java/Spark require main class
|
||||
if job_type in [edp.JOB_TYPE_JAVA, edp.JOB_TYPE_SPARK]:
|
||||
if not _is_main_class_present(data):
|
||||
raise ex.InvalidDataException('%s job must '
|
||||
'specify edp.java.main_class'
|
||||
% job.type)
|
||||
else:
|
||||
if not ('input_id' in data and 'output_id' in data):
|
||||
raise ex.InvalidDataException("%s job requires 'input_id' "
|
||||
"and 'output_id'" % job.type)
|
||||
|
||||
b.check_data_source_exists(data['input_id'])
|
||||
b.check_data_source_exists(data['output_id'])
|
||||
|
||||
b.check_data_sources_are_different(data['input_id'], data['output_id'])
|
||||
|
||||
if job_type == edp.JOB_TYPE_MAPREDUCE and (
|
||||
subtype == edp.JOB_SUBTYPE_STREAMING
|
||||
and not _streaming_present(data)):
|
||||
raise ex.InvalidDataException("%s job "
|
||||
"must specify streaming mapper "
|
||||
"and reducer" % job.type)
|
||||
|
||||
main_base.check_cluster_exists(data['cluster_id'])
|
@ -13,8 +13,12 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import mock
|
||||
import testtools
|
||||
|
||||
from sahara import conductor as cond
|
||||
from sahara import context
|
||||
from sahara import exceptions as ex
|
||||
from sahara.plugins import base as pb
|
||||
from sahara.service.edp.spark import engine
|
||||
from sahara.tests.unit import base
|
||||
@ -30,16 +34,24 @@ class SparkPluginTest(base.SaharaWithDbTestCase):
|
||||
self.override_config("plugins", ["spark"])
|
||||
pb.setup_plugins()
|
||||
|
||||
def test_plugin09_no_edp_engine(self):
|
||||
def test_plugin09_edp_engine_validation(self):
|
||||
cluster_dict = {
|
||||
'name': 'cluster',
|
||||
'plugin_name': 'spark',
|
||||
'hadoop_version': '0.9.1',
|
||||
'default_image_id': 'image'}
|
||||
|
||||
job = mock.Mock()
|
||||
job.type = edp.JOB_TYPE_SPARK
|
||||
|
||||
cluster = conductor.cluster_create(context.ctx(), cluster_dict)
|
||||
plugin = pb.PLUGINS.get_plugin(cluster.plugin_name)
|
||||
self.assertIsNone(plugin.get_edp_engine(cluster, edp.JOB_TYPE_SPARK))
|
||||
edp_engine = plugin.get_edp_engine(cluster, edp.JOB_TYPE_SPARK)
|
||||
with testtools.ExpectedException(
|
||||
ex.InvalidDataException,
|
||||
value_re="Spark 1.0.0 or higher required to run "
|
||||
"spark Spark jobs"):
|
||||
edp_engine.validate_job_execution(cluster, job, mock.Mock())
|
||||
|
||||
def test_plugin10_edp_engine(self):
|
||||
cluster_dict = {
|
||||
|
@ -20,20 +20,14 @@ import six
|
||||
|
||||
from sahara import main
|
||||
from sahara.service import api
|
||||
from sahara.service.validations import base as validation_base
|
||||
from sahara.service.validations.edp import job_executor as je
|
||||
from sahara.service.validations.edp import job_execution as je
|
||||
from sahara.tests.unit.service.validation import utils as u
|
||||
from sahara.tests.unit import testutils as tu
|
||||
from sahara.utils import edp
|
||||
|
||||
|
||||
def wrap_it(data):
|
||||
je.check_job_executor(data, 0)
|
||||
|
||||
|
||||
class FakeJob(object):
|
||||
type = edp.JOB_TYPE_MAPREDUCE_STREAMING
|
||||
libs = []
|
||||
je.check_job_execution(data, 0)
|
||||
|
||||
|
||||
class TestJobExecValidation(u.ValidationTestCase):
|
||||
@ -47,24 +41,33 @@ class TestJobExecValidation(u.ValidationTestCase):
|
||||
self.override_config('plugins', main.CONF['plugins'] + ['spark'])
|
||||
api.plugin_base.setup_plugins()
|
||||
|
||||
@mock.patch('sahara.service.validations.edp.base.'
|
||||
'check_data_sources_are_different', lambda x, y: None)
|
||||
@mock.patch('sahara.service.validations.base.check_cluster_exists',
|
||||
lambda x: None)
|
||||
@mock.patch('sahara.service.validations.base.check_edp_job_support')
|
||||
@mock.patch('sahara.service.validations'
|
||||
'.edp.base.check_data_source_exists')
|
||||
@mock.patch('sahara.service.edp.api.get_job')
|
||||
def test_streaming(self, get_job, check_data, check_cluster):
|
||||
check_cluster.return_value = True
|
||||
check_data.return_value = True
|
||||
get_job.return_value = FakeJob()
|
||||
@mock.patch('sahara.conductor.api.LocalApi.cluster_get')
|
||||
@mock.patch('sahara.conductor.api.LocalApi.data_source_get')
|
||||
@mock.patch('sahara.conductor.api.LocalApi.job_get')
|
||||
def test_streaming(self, get_job, get_data_source, get_cluster):
|
||||
get_job.return_value = mock.Mock(
|
||||
type=edp.JOB_TYPE_MAPREDUCE_STREAMING, libs=[])
|
||||
|
||||
ds1_id = six.text_type(uuid.uuid4())
|
||||
ds2_id = six.text_type(uuid.uuid4())
|
||||
|
||||
data_sources = {
|
||||
ds1_id: mock.Mock(type="swift", url="http://swift/test"),
|
||||
ds2_id: mock.Mock(type="swift", url="http://swift/test2"),
|
||||
}
|
||||
|
||||
get_data_source.side_effect = lambda ctx, x: data_sources[x]
|
||||
|
||||
ng = tu.make_ng_dict('master', 42, ['oozie'], 1,
|
||||
instances=[tu.make_inst_dict('id', 'name')])
|
||||
get_cluster.return_value = tu.create_cluster("cluster", "tenant1",
|
||||
"vanilla", "1.2.1", [ng])
|
||||
|
||||
self._assert_create_object_validation(
|
||||
data={
|
||||
"cluster_id": six.text_type(uuid.uuid4()),
|
||||
"input_id": six.text_type(uuid.uuid4()),
|
||||
"output_id": six.text_type(uuid.uuid4()),
|
||||
"input_id": ds1_id,
|
||||
"output_id": ds2_id,
|
||||
"job_configs": {"configs": {},
|
||||
"params": {},
|
||||
"args": []}
|
||||
@ -77,8 +80,8 @@ class TestJobExecValidation(u.ValidationTestCase):
|
||||
self._assert_create_object_validation(
|
||||
data={
|
||||
"cluster_id": six.text_type(uuid.uuid4()),
|
||||
"input_id": six.text_type(uuid.uuid4()),
|
||||
"output_id": six.text_type(uuid.uuid4()),
|
||||
"input_id": ds1_id,
|
||||
"output_id": ds2_id,
|
||||
"job_configs": {
|
||||
"configs": {
|
||||
"edp.streaming.mapper": "/bin/cat",
|
||||
@ -87,14 +90,12 @@ class TestJobExecValidation(u.ValidationTestCase):
|
||||
"args": []}
|
||||
})
|
||||
|
||||
@mock.patch('sahara.service.validations.base.check_cluster_exists',
|
||||
lambda x: None)
|
||||
@mock.patch('sahara.service.validations.base.check_edp_job_support',
|
||||
lambda x: None)
|
||||
@mock.patch('sahara.service.edp.api.get_data_source')
|
||||
@mock.patch('sahara.service.edp.api.get_job')
|
||||
def test_data_sources_differ(self, get_job, get_data_source):
|
||||
get_job.return_value = FakeJob()
|
||||
@mock.patch('sahara.conductor.api.LocalApi.cluster_get')
|
||||
@mock.patch('sahara.conductor.api.LocalApi.data_source_get')
|
||||
@mock.patch('sahara.conductor.api.LocalApi.job_get')
|
||||
def test_data_sources_differ(self, get_job, get_data_source, get_cluster):
|
||||
get_job.return_value = mock.Mock(
|
||||
type=edp.JOB_TYPE_MAPREDUCE_STREAMING, libs=[])
|
||||
|
||||
ds1_id = six.text_type(uuid.uuid4())
|
||||
ds2_id = six.text_type(uuid.uuid4())
|
||||
@ -104,7 +105,12 @@ class TestJobExecValidation(u.ValidationTestCase):
|
||||
ds2_id: mock.Mock(type="swift", url="http://swift/test2"),
|
||||
}
|
||||
|
||||
get_data_source.side_effect = lambda x: data_sources[x]
|
||||
get_data_source.side_effect = lambda ctx, x: data_sources[x]
|
||||
|
||||
ng = tu.make_ng_dict('master', 42, ['oozie'], 1,
|
||||
instances=[tu.make_inst_dict('id', 'name')])
|
||||
get_cluster.return_value = tu.create_cluster("cluster", "tenant1",
|
||||
"vanilla", "1.2.1", [ng])
|
||||
|
||||
self._assert_create_object_validation(
|
||||
data={
|
||||
@ -138,10 +144,16 @@ class TestJobExecValidation(u.ValidationTestCase):
|
||||
},
|
||||
bad_req_i=(1, "INVALID_DATA", err_msg))
|
||||
|
||||
@mock.patch('sahara.service.api.get_cluster')
|
||||
@mock.patch('sahara.service.edp.api.get_job')
|
||||
def test_check_edp_job_support(self, get_job, get_cluster):
|
||||
get_job.return_value = FakeJob()
|
||||
@mock.patch('sahara.conductor.api.LocalApi.cluster_get')
|
||||
@mock.patch('sahara.conductor.api.LocalApi.job_get')
|
||||
def test_check_edp_no_oozie(self, get_job, get_cluster):
|
||||
get_job.return_value = mock.Mock(type=edp.JOB_TYPE_PIG, libs=[])
|
||||
|
||||
ng = tu.make_ng_dict('master', 42, ['namenode'], 1,
|
||||
instances=[tu.make_inst_dict('id', 'name')])
|
||||
get_cluster.return_value = tu.create_cluster("cluster", "tenant1",
|
||||
"vanilla", "1.2.1", [ng])
|
||||
|
||||
self._assert_create_object_validation(
|
||||
data={
|
||||
"cluster_id": six.text_type(uuid.uuid4()),
|
||||
@ -152,14 +164,8 @@ class TestJobExecValidation(u.ValidationTestCase):
|
||||
"Hadoop cluster should contain 1 oozie component(s). "
|
||||
"Actual oozie count is 0"))
|
||||
|
||||
ng = tu.make_ng_dict('master', 42, ['oozie'], 1,
|
||||
instances=[tu.make_inst_dict('id', 'name')])
|
||||
get_cluster.return_value = tu.create_cluster("cluster", "tenant1",
|
||||
"vanilla", "1.2.1", [ng])
|
||||
validation_base.check_edp_job_support('some_id')
|
||||
|
||||
@mock.patch('sahara.service.api.get_cluster')
|
||||
@mock.patch('sahara.service.edp.api.get_job')
|
||||
@mock.patch('sahara.conductor.api.LocalApi.cluster_get')
|
||||
@mock.patch('sahara.conductor.api.LocalApi.job_get')
|
||||
def test_check_edp_job_support_spark(self, get_job, get_cluster):
|
||||
# utils.start_patch will construct a vanilla cluster as a
|
||||
# default for get_cluster, but we want a Spark cluster.
|
||||
@ -167,9 +173,7 @@ class TestJobExecValidation(u.ValidationTestCase):
|
||||
|
||||
# Note that this means we cannot use assert_create_object_validation()
|
||||
# because it calls start_patch() and will override our setting
|
||||
job = mock.Mock()
|
||||
job.type = edp.JOB_TYPE_SPARK
|
||||
job.mains = ["main"]
|
||||
job = mock.Mock(type=edp.JOB_TYPE_SPARK, mains=["main"])
|
||||
get_job.return_value = job
|
||||
ng = tu.make_ng_dict('master', 42, [], 1,
|
||||
instances=[tu.make_inst_dict('id', 'name')])
|
||||
@ -183,17 +187,15 @@ class TestJobExecValidation(u.ValidationTestCase):
|
||||
"configs": {
|
||||
"edp.java.main_class": "org.me.class"}}})
|
||||
|
||||
@mock.patch('sahara.service.validations.base.check_edp_job_support')
|
||||
@mock.patch('sahara.service.validations.base.check_cluster_exists')
|
||||
@mock.patch('sahara.service.edp.api.get_job')
|
||||
def _test_edp_main_class(self, job_type,
|
||||
get_job,
|
||||
check_cluster_exists,
|
||||
check_edp_job_support):
|
||||
check_cluster_exists.return_value = True
|
||||
check_edp_job_support.return_value = None
|
||||
get_job.return_value = mock.Mock()
|
||||
get_job.return_value.type = job_type
|
||||
@mock.patch('sahara.conductor.api.LocalApi.cluster_get')
|
||||
@mock.patch('sahara.conductor.api.LocalApi.job_get')
|
||||
def test_edp_main_class_java(self, job_get, cluster_get):
|
||||
job_get.return_value = mock.Mock()
|
||||
job_get.return_value.type = edp.JOB_TYPE_JAVA
|
||||
ng = tu.make_ng_dict('master', 42, ['namenode', 'oozie'], 1,
|
||||
instances=[tu.make_inst_dict('id', 'name')])
|
||||
cluster_get.return_value = tu.create_cluster("cluster", "tenant1",
|
||||
"vanilla", "1.2.1", [ng])
|
||||
|
||||
self._assert_create_object_validation(
|
||||
data={
|
||||
@ -204,7 +206,7 @@ class TestJobExecValidation(u.ValidationTestCase):
|
||||
},
|
||||
bad_req_i=(1, "INVALID_DATA",
|
||||
"%s job must "
|
||||
"specify edp.java.main_class" % job_type))
|
||||
"specify edp.java.main_class" % edp.JOB_TYPE_JAVA))
|
||||
|
||||
self._assert_create_object_validation(
|
||||
data={
|
||||
@ -216,6 +218,32 @@ class TestJobExecValidation(u.ValidationTestCase):
|
||||
"args": []}
|
||||
})
|
||||
|
||||
def test_edp_main_class(self):
|
||||
for job_type in (edp.JOB_TYPE_JAVA, edp.JOB_TYPE_SPARK):
|
||||
self._test_edp_main_class(job_type)
|
||||
@mock.patch('sahara.conductor.api.LocalApi.cluster_get')
|
||||
@mock.patch('sahara.conductor.api.LocalApi.job_get')
|
||||
def test_edp_main_class_spark(self, job_get, cluster_get):
|
||||
job_get.return_value = mock.Mock(type=edp.JOB_TYPE_SPARK)
|
||||
ng = tu.make_ng_dict('master', 42, ['namenode'], 1,
|
||||
instances=[tu.make_inst_dict('id', 'name')])
|
||||
cluster_get.return_value = tu.create_cluster("cluster", "tenant1",
|
||||
"spark", "1.0.0", [ng])
|
||||
|
||||
self._assert_create_object_validation(
|
||||
data={
|
||||
"cluster_id": six.text_type(uuid.uuid4()),
|
||||
"job_configs": {"configs": {},
|
||||
"params": {},
|
||||
"args": []}
|
||||
},
|
||||
bad_req_i=(1, "INVALID_DATA",
|
||||
"%s job must "
|
||||
"specify edp.java.main_class" % edp.JOB_TYPE_SPARK))
|
||||
|
||||
self._assert_create_object_validation(
|
||||
data={
|
||||
"cluster_id": six.text_type(uuid.uuid4()),
|
||||
"job_configs": {
|
||||
"configs": {
|
||||
"edp.java.main_class": "org.me.myclass"},
|
||||
"params": {},
|
||||
"args": []}
|
||||
})
|
||||
|
@ -115,9 +115,6 @@ def start_patch(patch_templates=True):
|
||||
"sahara.service.api.get_node_group_templates")
|
||||
get_ng_template_p = mock.patch(
|
||||
"sahara.service.api.get_node_group_template")
|
||||
get_plugins_p = mock.patch("sahara.service.api.get_plugins")
|
||||
get_plugin_p = mock.patch(
|
||||
"sahara.plugins.base.PluginManager.get_plugin")
|
||||
if patch_templates:
|
||||
get_cl_templates_p = mock.patch(
|
||||
"sahara.service.api.get_cluster_templates")
|
||||
@ -134,8 +131,6 @@ def start_patch(patch_templates=True):
|
||||
if patch_templates:
|
||||
get_ng_templates = get_ng_templates_p.start()
|
||||
get_ng_template = get_ng_template_p.start()
|
||||
get_plugins = get_plugins_p.start()
|
||||
get_plugin = get_plugin_p.start()
|
||||
if patch_templates:
|
||||
get_cl_templates = get_cl_templates_p.start()
|
||||
get_cl_template_p.start()
|
||||
@ -213,26 +208,16 @@ def start_patch(patch_templates=True):
|
||||
|
||||
get_cl_templates.return_value = [r.ClusterTemplateResource(ct_dict)]
|
||||
|
||||
vanilla = plugin.VanillaProvider()
|
||||
vanilla.name = 'vanilla'
|
||||
get_plugins.return_value = [vanilla]
|
||||
|
||||
def _get_ng_template(id):
|
||||
for template in get_ng_templates():
|
||||
if template.id == id:
|
||||
return template
|
||||
return None
|
||||
|
||||
def _get_plugin(name):
|
||||
if name == 'vanilla':
|
||||
return vanilla
|
||||
return None
|
||||
|
||||
get_plugin.side_effect = _get_plugin
|
||||
if patch_templates:
|
||||
get_ng_template.side_effect = _get_ng_template
|
||||
# request data to validate
|
||||
patchers = [get_clusters_p, get_cluster_p, get_plugins_p, get_plugin_p,
|
||||
patchers = [get_clusters_p, get_cluster_p,
|
||||
nova_p, keystone_p, get_image_p, heat_p]
|
||||
if patch_templates:
|
||||
patchers.extend([get_ng_template_p, get_ng_templates_p,
|
||||
|
@ -14,11 +14,16 @@
|
||||
# limitations under the License.
|
||||
|
||||
|
||||
import uuid
|
||||
|
||||
import six
|
||||
|
||||
from sahara.conductor import resource as r
|
||||
|
||||
|
||||
def create_cluster(name, tenant, plugin, version, node_groups, **kwargs):
|
||||
dct = {'name': name, 'tenant_id': tenant, 'plugin_name': plugin,
|
||||
dct = {'id': six.text_type(uuid.uuid4()), 'name': name,
|
||||
'tenant_id': tenant, 'plugin_name': plugin,
|
||||
'hadoop_version': version, 'node_groups': node_groups,
|
||||
'cluster_configs': {}, "sahara_info": {}}
|
||||
dct.update(kwargs)
|
||||
|
Loading…
Reference in New Issue
Block a user