From e55238a881d5a1b5fc566cc5bebc05622297c098 Mon Sep 17 00:00:00 2001
From: Andrew Lazarev
Date: Wed, 20 Aug 2014 16:51:11 -0700
Subject: [PATCH] Moved validate_edp from plugin SPI to edp_engine

Now the EDP engine is fully responsible for validating data for job
execution.

Other changes:
* Removed API calls from validation to remove a circular dependency
* Removed plugin patching in validation to allow testing of
  non-vanilla plugins
* Renamed job_executor to job_execution

Change-Id: I14c86f33b355cb4317e96a70109d8d72d52d3c00
Closes-Bug: #1357512
---
 doc/source/devref/plugin.spi.rst              |   8 -
 sahara/api/v11.py                             |   4 +-
 sahara/plugins/cdh/edp_engine.py              |  10 ++
 sahara/plugins/hdp/ambariplugin.py            |   7 -
 sahara/plugins/hdp/edp_engine.py              |   9 +
 sahara/plugins/provisioning.py                |   4 -
 sahara/plugins/spark/edp_engine.py            |  10 +-
 sahara/plugins/spark/plugin.py                |   3 -
 sahara/plugins/vanilla/edp_engine.py          |   9 +
 sahara/plugins/vanilla/plugin.py              |   7 -
 sahara/service/edp/base_engine.py             |   4 +
 sahara/service/edp/oozie/engine.py            |  14 ++
 sahara/service/edp/spark/engine.py            |   4 +
 sahara/service/validations/base.py            |  14 --
 sahara/service/validations/edp/base.py        |  19 ++-
 .../service/validations/edp/job_execution.py  | 106 ++++++++++++
 .../service/validations/edp/job_executor.py   |  92 -----------
 .../tests/unit/plugins/spark/test_plugin.py   |  16 +-
 .../validation/edp/test_job_executor.py       | 154 +++++++++++-------
 sahara/tests/unit/service/validation/utils.py |  17 +-
 sahara/tests/unit/testutils.py                |   7 +-
 21 files changed, 290 insertions(+), 228 deletions(-)
 create mode 100644 sahara/service/validations/edp/job_execution.py
 delete mode 100644 sahara/service/validations/edp/job_executor.py

diff --git a/doc/source/devref/plugin.spi.rst b/doc/source/devref/plugin.spi.rst
index 72cc4f11..2820fd6d 100644
--- a/doc/source/devref/plugin.spi.rst
+++ b/doc/source/devref/plugin.spi.rst
@@ -53,14 +53,6 @@ Validates a given cluster object. Raises *SaharaException* with meaningful messa

 *Example exception*:

-validate_edp(cluster)
-~~~~~~~~~~~~~~~~~~~~~
-
-Validates that given cluster can be used to run EDP jobs. In case of
-incompatibility raises *SaharaException* with meaningful message.
-
-*Returns*: None
-
 validate_scaling(cluster, existing, additional)
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

diff --git a/sahara/api/v11.py b/sahara/api/v11.py
index 3843e36f..1b0cb038 100644
--- a/sahara/api/v11.py
+++ b/sahara/api/v11.py
@@ -20,7 +20,7 @@ from sahara.service.validations.edp import data_source as v_d_s
 from sahara.service.validations.edp import job as v_j
 from sahara.service.validations.edp import job_binary as v_j_b
 from sahara.service.validations.edp import job_binary_internal as v_j_b_i
-from sahara.service.validations.edp import job_executor as v_j_e
+from sahara.service.validations.edp import job_execution as v_j_e
 import sahara.utils.api as u


@@ -33,7 +33,7 @@ rest = u.Rest('v11', __name__)

 @rest.post('/jobs/<job_id>/execute')
 @v.check_exists(api.get_job, id='job_id')
-@v.validate(v_j_e.JOB_EXEC_SCHEMA, v_j_e.check_job_executor)
+@v.validate(v_j_e.JOB_EXEC_SCHEMA, v_j_e.check_job_execution)
 def job_execute(job_id, data):
     return u.render(job_execution=api.execute_job(job_id, data).to_dict())

diff --git a/sahara/plugins/cdh/edp_engine.py b/sahara/plugins/cdh/edp_engine.py
index d995b88b..029edbe7 100644
--- a/sahara/plugins/cdh/edp_engine.py
+++ b/sahara/plugins/cdh/edp_engine.py
@@ -14,6 +14,8 @@
 # limitations under the License.
from sahara.plugins.cdh import utils as cu +from sahara.plugins.general import exceptions as ex +from sahara.plugins.general import utils as u from sahara.service.edp import hdfs_helper from sahara.service.edp.oozie import engine as edp_engine @@ -40,3 +42,11 @@ class EdpOozieEngine(edp_engine.OozieJobEngine): def get_oozie_server(self, cluster): return cu.get_oozie(cluster) + + def validate_job_execution(self, cluster, job, data): + oo_count = u.get_instances_count(cluster, 'OOZIE_SERVER') + if oo_count != 1: + raise ex.InvalidComponentCountException( + 'OOZIE_SERVER', '1', oo_count) + + super(EdpOozieEngine, self).validate_job_execution(cluster, job, data) diff --git a/sahara/plugins/hdp/ambariplugin.py b/sahara/plugins/hdp/ambariplugin.py index 019da7bb..e7351e4a 100644 --- a/sahara/plugins/hdp/ambariplugin.py +++ b/sahara/plugins/hdp/ambariplugin.py @@ -21,7 +21,6 @@ from sahara.i18n import _ from sahara.i18n import _LI from sahara.openstack.common import log as logging from sahara.plugins.general import exceptions as ex -from sahara.plugins.general import utils as u from sahara.plugins.hdp import hadoopserver as h from sahara.plugins.hdp import saharautils as utils from sahara.plugins.hdp.versions import versionhandlerfactory as vhf @@ -142,12 +141,6 @@ class AmbariPlugin(p.ProvisioningPluginBase): self.version_factory.get_version_handler(cluster.hadoop_version)) return version_handler.get_edp_engine(cluster, job_type) - def validate_edp(self, cluster): - oo_count = u.get_instances_count(cluster, 'OOZIE_SERVER') - if oo_count != 1: - raise ex.InvalidComponentCountException( - 'OOZIE_SERVER', '1', oo_count) - def update_infra(self, cluster): pass diff --git a/sahara/plugins/hdp/edp_engine.py b/sahara/plugins/hdp/edp_engine.py index 230159a0..8de7b3b1 100644 --- a/sahara/plugins/hdp/edp_engine.py +++ b/sahara/plugins/hdp/edp_engine.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from sahara.plugins.general import exceptions as ex from sahara.plugins.general import utils as u from sahara.service.edp.oozie import engine as edp_engine @@ -30,3 +31,11 @@ class EdpOozieEngine(edp_engine.OozieJobEngine): def get_oozie_server(self, cluster): return u.get_instance(cluster, "OOZIE_SERVER") + + def validate_job_execution(self, cluster, job, data): + oo_count = u.get_instances_count(cluster, 'OOZIE_SERVER') + if oo_count != 1: + raise ex.InvalidComponentCountException( + 'OOZIE_SERVER', '1', oo_count) + + super(EdpOozieEngine, self).validate_job_execution(cluster, job, data) diff --git a/sahara/plugins/provisioning.py b/sahara/plugins/provisioning.py index 62a52280..ca1fb667 100644 --- a/sahara/plugins/provisioning.py +++ b/sahara/plugins/provisioning.py @@ -61,10 +61,6 @@ class ProvisioningPluginBase(plugins_base.PluginInterface): def scale_cluster(self, cluster, instances): pass - @plugins_base.optional - def validate_edp(self, cluster): - pass - @plugins_base.optional def get_edp_engine(self, cluster, job_type): pass diff --git a/sahara/plugins/spark/edp_engine.py b/sahara/plugins/spark/edp_engine.py index 779d6aee..172a93bf 100644 --- a/sahara/plugins/spark/edp_engine.py +++ b/sahara/plugins/spark/edp_engine.py @@ -13,8 +13,16 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+from sahara import exceptions as ex +from sahara.i18n import _ from sahara.service.edp.spark import engine as edp_engine class EdpEngine(edp_engine.SparkJobEngine): - pass + def validate_job_execution(self, cluster, job, data): + if cluster.hadoop_version < "1.0.0": + raise ex.InvalidDataException( + _('Spark 1.0.0 or higher required to run spark %s jobs') + % job.type) + + super(EdpEngine, self).validate_job_execution(cluster, job, data) diff --git a/sahara/plugins/spark/plugin.py b/sahara/plugins/spark/plugin.py index fdf8be56..4d91c7f1 100644 --- a/sahara/plugins/spark/plugin.py +++ b/sahara/plugins/spark/plugin.py @@ -420,9 +420,6 @@ class SparkProvider(p.ProvisioningPluginBase): rep_factor) def get_edp_engine(self, cluster, job_type): - if cluster.hadoop_version < "1.0.0": - return - if job_type in edp_engine.EdpEngine.get_supported_job_types(): return edp_engine.EdpEngine(cluster) diff --git a/sahara/plugins/vanilla/edp_engine.py b/sahara/plugins/vanilla/edp_engine.py index bf7ec715..2b1939d9 100644 --- a/sahara/plugins/vanilla/edp_engine.py +++ b/sahara/plugins/vanilla/edp_engine.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +from sahara.plugins.general import exceptions as ex +from sahara.plugins.general import utils as u from sahara.plugins.vanilla import utils as vu from sahara.service.edp.oozie import engine as edp_engine @@ -29,3 +31,10 @@ class EdpOozieEngine(edp_engine.OozieJobEngine): def get_oozie_server(self, cluster): return vu.get_oozie(cluster) + + def validate_job_execution(self, cluster, job, data): + oo_count = u.get_instances_count(cluster, 'oozie') + if oo_count != 1: + raise ex.InvalidComponentCountException('oozie', '1', oo_count) + + super(EdpOozieEngine, self).validate_job_execution(cluster, job, data) diff --git a/sahara/plugins/vanilla/plugin.py b/sahara/plugins/vanilla/plugin.py index 1c8f8686..03a6b3b9 100644 --- a/sahara/plugins/vanilla/plugin.py +++ b/sahara/plugins/vanilla/plugin.py @@ -14,8 +14,6 @@ # limitations under the License. 
from sahara.i18n import _ -from sahara.plugins.general import exceptions as ex -from sahara.plugins.general import utils as u from sahara.plugins import provisioning as p from sahara.plugins.vanilla import versionfactory as vhf @@ -70,11 +68,6 @@ class VanillaProvider(p.ProvisioningPluginBase): cluster.hadoop_version).validate_scaling(cluster, existing, additional) - def validate_edp(self, cluster): - oo_count = u.get_instances_count(cluster, 'oozie') - if oo_count != 1: - raise ex.InvalidComponentCountException('oozie', '1', oo_count) - def get_edp_engine(self, cluster, job_type): return self._get_version_handler( cluster.hadoop_version).get_edp_engine(cluster, job_type) diff --git a/sahara/service/edp/base_engine.py b/sahara/service/edp/base_engine.py index f183177f..09d5f06c 100644 --- a/sahara/service/edp/base_engine.py +++ b/sahara/service/edp/base_engine.py @@ -37,6 +37,10 @@ class JobEngine(object): def run_job(self, job_execution): pass + @abc.abstractmethod + def validate_job_execution(self, cluster, job, data): + pass + @staticmethod @abc.abstractmethod def get_possible_job_config(job_type): diff --git a/sahara/service/edp/oozie/engine.py b/sahara/service/edp/oozie/engine.py index a47c4174..38d46493 100644 --- a/sahara/service/edp/oozie/engine.py +++ b/sahara/service/edp/oozie/engine.py @@ -27,6 +27,7 @@ from sahara.service.edp import hdfs_helper as h from sahara.service.edp import job_utils from sahara.service.edp.oozie import oozie as o from sahara.service.edp.oozie.workflow_creator import workflow_factory +from sahara.service.validations.edp import job_execution as j from sahara.utils import edp from sahara.utils import remote from sahara.utils import xmlutils as x @@ -140,6 +141,19 @@ class OozieJobEngine(base_engine.JobEngine): def get_resource_manager_uri(self, cluster): pass + def validate_job_execution(self, cluster, job, data): + # All types except Java require input and output objects + # and Java require main class + if job.type in [edp.JOB_TYPE_JAVA]: + j.check_main_class_present(data, job) + else: + j.check_data_sources(data, job) + + job_type, subtype = edp.split_job_type(job.type) + if job_type == edp.JOB_TYPE_MAPREDUCE and ( + subtype == edp.JOB_SUBTYPE_STREAMING): + j.check_streaming_present(data, job) + @staticmethod def get_possible_job_config(job_type): return workflow_factory.get_possible_job_config(job_type) diff --git a/sahara/service/edp/spark/engine.py b/sahara/service/edp/spark/engine.py index 04255c6a..2d8e5fc9 100644 --- a/sahara/service/edp/spark/engine.py +++ b/sahara/service/edp/spark/engine.py @@ -22,6 +22,7 @@ from sahara.plugins.general import utils as plugin_utils from sahara.plugins.spark import config_helper as c_helper from sahara.service.edp import base_engine from sahara.service.edp import job_utils +from sahara.service.validations.edp import job_execution as j from sahara.utils import edp from sahara.utils import files from sahara.utils import general @@ -185,6 +186,9 @@ class SparkJobEngine(base_engine.JobEngine): raise e.EDPError("Spark job execution failed. 
Exit status = %s, " "stdout = %s" % (ret, stdout)) + def validate_job_execution(self, cluster, job, data): + j.check_main_class_present(data, job) + @staticmethod def get_possible_job_config(job_type): return {'job_config': {'configs': [], 'args': []}} diff --git a/sahara/service/validations/base.py b/sahara/service/validations/base.py index 4d49a00c..ab2fa65c 100644 --- a/sahara/service/validations/base.py +++ b/sahara/service/validations/base.py @@ -217,12 +217,6 @@ def check_heat_stack_name(cluster_name): % cluster_name) -def check_cluster_exists(id): - if not api.get_cluster(id): - raise ex.InvalidException( - _("Cluster with id '%s' doesn't exist") % id) - - def check_cluster_hostnames_lengths(cluster_name, node_groups): for ng in node_groups: longest_hostname = g.generate_instance_name(cluster_name, @@ -367,11 +361,3 @@ def check_required_image_tags(plugin_name, hadoop_version, image_id): " tags ['%(name)s', '%(version)s']") % {'image': image_id, 'name': plugin_name, 'version': hadoop_version}) - - -# EDP - - -def check_edp_job_support(cluster_id): - cluster = api.get_cluster(cluster_id) - plugin_base.PLUGINS.get_plugin(cluster.plugin_name).validate_edp(cluster) diff --git a/sahara/service/validations/edp/base.py b/sahara/service/validations/edp/base.py index fafb08d8..1c000ad5 100644 --- a/sahara/service/validations/edp/base.py +++ b/sahara/service/validations/edp/base.py @@ -15,9 +15,11 @@ """Cluster creation related checks""" -import sahara.exceptions as ex -import sahara.service.edp.api as api +from sahara import conductor as c +from sahara import context +from sahara import exceptions as ex +conductor = c.API data_source_type = { "type": "string", @@ -45,32 +47,33 @@ job_configs = { def check_data_source_unique_name(name): - if name in [ds.name for ds in api.get_data_sources()]: + if name in [ds.name for ds in conductor.data_source_get_all( + context.ctx())]: raise ex.NameAlreadyExistsException("Data source with name '%s' " "already exists" % name) def check_data_source_exists(data_source_id): - if not api.get_data_source(data_source_id): + if not conductor.data_source_get(context.ctx(), data_source_id): raise ex.InvalidException("DataSource with id '%s'" " doesn't exist" % data_source_id) def check_job_unique_name(name): - if name in [j.name for j in api.get_jobs()]: + if name in [j.name for j in conductor.job_get_all(context.ctx())]: raise ex.NameAlreadyExistsException("Job with name '%s' " "already exists" % name) def check_job_binary_internal_exists(jbi_id): - if not api.get_job_binary_internal(jbi_id): + if not conductor.job_binary_internal_get(context.ctx(), jbi_id): raise ex.InvalidException("JobBinaryInternal with id '%s'" " doesn't exist" % jbi_id) def check_data_sources_are_different(data_source_1_id, data_source_2_id): - ds1 = api.get_data_source(data_source_1_id) - ds2 = api.get_data_source(data_source_2_id) + ds1 = conductor.data_source_get(context.ctx(), data_source_1_id) + ds2 = conductor.data_source_get(context.ctx(), data_source_2_id) if ds1.type == ds2.type and ds1.url == ds2.url: raise ex.InvalidDataException('Provided input and output ' diff --git a/sahara/service/validations/edp/job_execution.py b/sahara/service/validations/edp/job_execution.py new file mode 100644 index 00000000..4f4081c5 --- /dev/null +++ b/sahara/service/validations/edp/job_execution.py @@ -0,0 +1,106 @@ +# Copyright (c) 2013 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from sahara import conductor as c +from sahara import context +from sahara import exceptions as ex +from sahara.i18n import _ +from sahara.plugins import base as plugin_base +import sahara.service.validations.edp.base as b + +JOB_EXEC_SCHEMA = { + "type": "object", + "properties": { + "input_id": { + "type": "string", + "format": "uuid", + }, + "output_id": { + "type": "string", + "format": "uuid", + }, + "cluster_id": { + "type": "string", + "format": "uuid", + }, + "job_configs": b.job_configs, + }, + "additionalProperties": False, + "required": [ + "cluster_id" + ] +} + + +conductor = c.API + + +def _is_main_class_present(data): + return data and 'edp.java.main_class' in data.get('job_configs', + {}).get('configs', {}) + + +def check_main_class_present(data, job): + if not _is_main_class_present(data): + raise ex.InvalidDataException( + _('%s job must specify edp.java.main_class') % job.type) + + +def _streaming_present(data): + try: + streaming = set(('edp.streaming.mapper', + 'edp.streaming.reducer')) + configs = set(data['job_configs']['configs']) + return streaming.intersection(configs) == streaming + except Exception: + return False + + +def check_streaming_present(data, job): + if not _streaming_present(data): + raise ex.InvalidDataException( + _("%s job must specify streaming mapper and reducer") % job.type) + + +def check_job_execution(data, job_id): + ctx = context.ctx() + + cluster = conductor.cluster_get(ctx, data['cluster_id']) + if not cluster: + raise ex.InvalidException( + _("Cluster with id '%s' doesn't exist") % data['cluster_id']) + + job = conductor.job_get(ctx, job_id) + + plugin = plugin_base.PLUGINS.get_plugin(cluster.plugin_name) + edp_engine = plugin.get_edp_engine(cluster, job.type) + if not edp_engine: + raise ex.InvalidException( + _("Cluster with id '%(cluster_id)s' doesn't support job type " + "'%(job_type)s'") % {"cluster_id": cluster.id, + "job_type": job.type}) + + edp_engine.validate_job_execution(cluster, job, data) + + +def check_data_sources(data, job): + if not ('input_id' in data and 'output_id' in data): + raise ex.InvalidDataException(_("%s job requires 'input_id' " + "and 'output_id'") % job.type) + + b.check_data_source_exists(data['input_id']) + b.check_data_source_exists(data['output_id']) + + b.check_data_sources_are_different(data['input_id'], data['output_id']) diff --git a/sahara/service/validations/edp/job_executor.py b/sahara/service/validations/edp/job_executor.py deleted file mode 100644 index 4f2c641e..00000000 --- a/sahara/service/validations/edp/job_executor.py +++ /dev/null @@ -1,92 +0,0 @@ -# Copyright (c) 2013 Mirantis Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. 
-# See the License for the specific language governing permissions and -# limitations under the License. - -import sahara.exceptions as ex -from sahara.service.edp import api -import sahara.service.validations.base as main_base -import sahara.service.validations.edp.base as b -from sahara.utils import edp - -JOB_EXEC_SCHEMA = { - "type": "object", - "properties": { - "input_id": { - "type": "string", - "format": "uuid", - }, - "output_id": { - "type": "string", - "format": "uuid", - }, - "cluster_id": { - "type": "string", - "format": "uuid", - }, - "job_configs": b.job_configs, - }, - "additionalProperties": False, - "required": [ - "cluster_id" - ] -} - - -def _is_main_class_present(data): - return data and 'edp.java.main_class' in data.get('job_configs', - {}).get('configs', {}) - - -def _streaming_present(data): - try: - streaming = set(('edp.streaming.mapper', - 'edp.streaming.reducer')) - configs = set(data['job_configs']['configs']) - return streaming.intersection(configs) == streaming - except Exception: - return False - - -def check_job_executor(data, job_id): - job = api.get_job(job_id) - job_type, subtype = edp.split_job_type(job.type) - - # Check if cluster contains Oozie service to run job - main_base.check_edp_job_support(data['cluster_id']) - - # All types except Java/Spark require input and output objects - # and Java/Spark require main class - if job_type in [edp.JOB_TYPE_JAVA, edp.JOB_TYPE_SPARK]: - if not _is_main_class_present(data): - raise ex.InvalidDataException('%s job must ' - 'specify edp.java.main_class' - % job.type) - else: - if not ('input_id' in data and 'output_id' in data): - raise ex.InvalidDataException("%s job requires 'input_id' " - "and 'output_id'" % job.type) - - b.check_data_source_exists(data['input_id']) - b.check_data_source_exists(data['output_id']) - - b.check_data_sources_are_different(data['input_id'], data['output_id']) - - if job_type == edp.JOB_TYPE_MAPREDUCE and ( - subtype == edp.JOB_SUBTYPE_STREAMING - and not _streaming_present(data)): - raise ex.InvalidDataException("%s job " - "must specify streaming mapper " - "and reducer" % job.type) - - main_base.check_cluster_exists(data['cluster_id']) diff --git a/sahara/tests/unit/plugins/spark/test_plugin.py b/sahara/tests/unit/plugins/spark/test_plugin.py index d2c3ef48..417c6cf9 100644 --- a/sahara/tests/unit/plugins/spark/test_plugin.py +++ b/sahara/tests/unit/plugins/spark/test_plugin.py @@ -13,8 +13,12 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+import mock +import testtools + from sahara import conductor as cond from sahara import context +from sahara import exceptions as ex from sahara.plugins import base as pb from sahara.service.edp.spark import engine from sahara.tests.unit import base @@ -30,16 +34,24 @@ class SparkPluginTest(base.SaharaWithDbTestCase): self.override_config("plugins", ["spark"]) pb.setup_plugins() - def test_plugin09_no_edp_engine(self): + def test_plugin09_edp_engine_validation(self): cluster_dict = { 'name': 'cluster', 'plugin_name': 'spark', 'hadoop_version': '0.9.1', 'default_image_id': 'image'} + job = mock.Mock() + job.type = edp.JOB_TYPE_SPARK + cluster = conductor.cluster_create(context.ctx(), cluster_dict) plugin = pb.PLUGINS.get_plugin(cluster.plugin_name) - self.assertIsNone(plugin.get_edp_engine(cluster, edp.JOB_TYPE_SPARK)) + edp_engine = plugin.get_edp_engine(cluster, edp.JOB_TYPE_SPARK) + with testtools.ExpectedException( + ex.InvalidDataException, + value_re="Spark 1.0.0 or higher required to run " + "spark Spark jobs"): + edp_engine.validate_job_execution(cluster, job, mock.Mock()) def test_plugin10_edp_engine(self): cluster_dict = { diff --git a/sahara/tests/unit/service/validation/edp/test_job_executor.py b/sahara/tests/unit/service/validation/edp/test_job_executor.py index 11cb6ad3..51bdaa69 100644 --- a/sahara/tests/unit/service/validation/edp/test_job_executor.py +++ b/sahara/tests/unit/service/validation/edp/test_job_executor.py @@ -20,20 +20,14 @@ import six from sahara import main from sahara.service import api -from sahara.service.validations import base as validation_base -from sahara.service.validations.edp import job_executor as je +from sahara.service.validations.edp import job_execution as je from sahara.tests.unit.service.validation import utils as u from sahara.tests.unit import testutils as tu from sahara.utils import edp def wrap_it(data): - je.check_job_executor(data, 0) - - -class FakeJob(object): - type = edp.JOB_TYPE_MAPREDUCE_STREAMING - libs = [] + je.check_job_execution(data, 0) class TestJobExecValidation(u.ValidationTestCase): @@ -47,24 +41,33 @@ class TestJobExecValidation(u.ValidationTestCase): self.override_config('plugins', main.CONF['plugins'] + ['spark']) api.plugin_base.setup_plugins() - @mock.patch('sahara.service.validations.edp.base.' 
- 'check_data_sources_are_different', lambda x, y: None) - @mock.patch('sahara.service.validations.base.check_cluster_exists', - lambda x: None) - @mock.patch('sahara.service.validations.base.check_edp_job_support') - @mock.patch('sahara.service.validations' - '.edp.base.check_data_source_exists') - @mock.patch('sahara.service.edp.api.get_job') - def test_streaming(self, get_job, check_data, check_cluster): - check_cluster.return_value = True - check_data.return_value = True - get_job.return_value = FakeJob() + @mock.patch('sahara.conductor.api.LocalApi.cluster_get') + @mock.patch('sahara.conductor.api.LocalApi.data_source_get') + @mock.patch('sahara.conductor.api.LocalApi.job_get') + def test_streaming(self, get_job, get_data_source, get_cluster): + get_job.return_value = mock.Mock( + type=edp.JOB_TYPE_MAPREDUCE_STREAMING, libs=[]) + + ds1_id = six.text_type(uuid.uuid4()) + ds2_id = six.text_type(uuid.uuid4()) + + data_sources = { + ds1_id: mock.Mock(type="swift", url="http://swift/test"), + ds2_id: mock.Mock(type="swift", url="http://swift/test2"), + } + + get_data_source.side_effect = lambda ctx, x: data_sources[x] + + ng = tu.make_ng_dict('master', 42, ['oozie'], 1, + instances=[tu.make_inst_dict('id', 'name')]) + get_cluster.return_value = tu.create_cluster("cluster", "tenant1", + "vanilla", "1.2.1", [ng]) self._assert_create_object_validation( data={ "cluster_id": six.text_type(uuid.uuid4()), - "input_id": six.text_type(uuid.uuid4()), - "output_id": six.text_type(uuid.uuid4()), + "input_id": ds1_id, + "output_id": ds2_id, "job_configs": {"configs": {}, "params": {}, "args": []} @@ -77,8 +80,8 @@ class TestJobExecValidation(u.ValidationTestCase): self._assert_create_object_validation( data={ "cluster_id": six.text_type(uuid.uuid4()), - "input_id": six.text_type(uuid.uuid4()), - "output_id": six.text_type(uuid.uuid4()), + "input_id": ds1_id, + "output_id": ds2_id, "job_configs": { "configs": { "edp.streaming.mapper": "/bin/cat", @@ -87,14 +90,12 @@ class TestJobExecValidation(u.ValidationTestCase): "args": []} }) - @mock.patch('sahara.service.validations.base.check_cluster_exists', - lambda x: None) - @mock.patch('sahara.service.validations.base.check_edp_job_support', - lambda x: None) - @mock.patch('sahara.service.edp.api.get_data_source') - @mock.patch('sahara.service.edp.api.get_job') - def test_data_sources_differ(self, get_job, get_data_source): - get_job.return_value = FakeJob() + @mock.patch('sahara.conductor.api.LocalApi.cluster_get') + @mock.patch('sahara.conductor.api.LocalApi.data_source_get') + @mock.patch('sahara.conductor.api.LocalApi.job_get') + def test_data_sources_differ(self, get_job, get_data_source, get_cluster): + get_job.return_value = mock.Mock( + type=edp.JOB_TYPE_MAPREDUCE_STREAMING, libs=[]) ds1_id = six.text_type(uuid.uuid4()) ds2_id = six.text_type(uuid.uuid4()) @@ -104,7 +105,12 @@ class TestJobExecValidation(u.ValidationTestCase): ds2_id: mock.Mock(type="swift", url="http://swift/test2"), } - get_data_source.side_effect = lambda x: data_sources[x] + get_data_source.side_effect = lambda ctx, x: data_sources[x] + + ng = tu.make_ng_dict('master', 42, ['oozie'], 1, + instances=[tu.make_inst_dict('id', 'name')]) + get_cluster.return_value = tu.create_cluster("cluster", "tenant1", + "vanilla", "1.2.1", [ng]) self._assert_create_object_validation( data={ @@ -138,10 +144,16 @@ class TestJobExecValidation(u.ValidationTestCase): }, bad_req_i=(1, "INVALID_DATA", err_msg)) - @mock.patch('sahara.service.api.get_cluster') - 
@mock.patch('sahara.service.edp.api.get_job') - def test_check_edp_job_support(self, get_job, get_cluster): - get_job.return_value = FakeJob() + @mock.patch('sahara.conductor.api.LocalApi.cluster_get') + @mock.patch('sahara.conductor.api.LocalApi.job_get') + def test_check_edp_no_oozie(self, get_job, get_cluster): + get_job.return_value = mock.Mock(type=edp.JOB_TYPE_PIG, libs=[]) + + ng = tu.make_ng_dict('master', 42, ['namenode'], 1, + instances=[tu.make_inst_dict('id', 'name')]) + get_cluster.return_value = tu.create_cluster("cluster", "tenant1", + "vanilla", "1.2.1", [ng]) + self._assert_create_object_validation( data={ "cluster_id": six.text_type(uuid.uuid4()), @@ -152,14 +164,8 @@ class TestJobExecValidation(u.ValidationTestCase): "Hadoop cluster should contain 1 oozie component(s). " "Actual oozie count is 0")) - ng = tu.make_ng_dict('master', 42, ['oozie'], 1, - instances=[tu.make_inst_dict('id', 'name')]) - get_cluster.return_value = tu.create_cluster("cluster", "tenant1", - "vanilla", "1.2.1", [ng]) - validation_base.check_edp_job_support('some_id') - - @mock.patch('sahara.service.api.get_cluster') - @mock.patch('sahara.service.edp.api.get_job') + @mock.patch('sahara.conductor.api.LocalApi.cluster_get') + @mock.patch('sahara.conductor.api.LocalApi.job_get') def test_check_edp_job_support_spark(self, get_job, get_cluster): # utils.start_patch will construct a vanilla cluster as a # default for get_cluster, but we want a Spark cluster. @@ -167,9 +173,7 @@ class TestJobExecValidation(u.ValidationTestCase): # Note that this means we cannot use assert_create_object_validation() # because it calls start_patch() and will override our setting - job = mock.Mock() - job.type = edp.JOB_TYPE_SPARK - job.mains = ["main"] + job = mock.Mock(type=edp.JOB_TYPE_SPARK, mains=["main"]) get_job.return_value = job ng = tu.make_ng_dict('master', 42, [], 1, instances=[tu.make_inst_dict('id', 'name')]) @@ -183,17 +187,15 @@ class TestJobExecValidation(u.ValidationTestCase): "configs": { "edp.java.main_class": "org.me.class"}}}) - @mock.patch('sahara.service.validations.base.check_edp_job_support') - @mock.patch('sahara.service.validations.base.check_cluster_exists') - @mock.patch('sahara.service.edp.api.get_job') - def _test_edp_main_class(self, job_type, - get_job, - check_cluster_exists, - check_edp_job_support): - check_cluster_exists.return_value = True - check_edp_job_support.return_value = None - get_job.return_value = mock.Mock() - get_job.return_value.type = job_type + @mock.patch('sahara.conductor.api.LocalApi.cluster_get') + @mock.patch('sahara.conductor.api.LocalApi.job_get') + def test_edp_main_class_java(self, job_get, cluster_get): + job_get.return_value = mock.Mock() + job_get.return_value.type = edp.JOB_TYPE_JAVA + ng = tu.make_ng_dict('master', 42, ['namenode', 'oozie'], 1, + instances=[tu.make_inst_dict('id', 'name')]) + cluster_get.return_value = tu.create_cluster("cluster", "tenant1", + "vanilla", "1.2.1", [ng]) self._assert_create_object_validation( data={ @@ -204,7 +206,7 @@ class TestJobExecValidation(u.ValidationTestCase): }, bad_req_i=(1, "INVALID_DATA", "%s job must " - "specify edp.java.main_class" % job_type)) + "specify edp.java.main_class" % edp.JOB_TYPE_JAVA)) self._assert_create_object_validation( data={ @@ -216,6 +218,32 @@ class TestJobExecValidation(u.ValidationTestCase): "args": []} }) - def test_edp_main_class(self): - for job_type in (edp.JOB_TYPE_JAVA, edp.JOB_TYPE_SPARK): - self._test_edp_main_class(job_type) + 
@mock.patch('sahara.conductor.api.LocalApi.cluster_get') + @mock.patch('sahara.conductor.api.LocalApi.job_get') + def test_edp_main_class_spark(self, job_get, cluster_get): + job_get.return_value = mock.Mock(type=edp.JOB_TYPE_SPARK) + ng = tu.make_ng_dict('master', 42, ['namenode'], 1, + instances=[tu.make_inst_dict('id', 'name')]) + cluster_get.return_value = tu.create_cluster("cluster", "tenant1", + "spark", "1.0.0", [ng]) + + self._assert_create_object_validation( + data={ + "cluster_id": six.text_type(uuid.uuid4()), + "job_configs": {"configs": {}, + "params": {}, + "args": []} + }, + bad_req_i=(1, "INVALID_DATA", + "%s job must " + "specify edp.java.main_class" % edp.JOB_TYPE_SPARK)) + + self._assert_create_object_validation( + data={ + "cluster_id": six.text_type(uuid.uuid4()), + "job_configs": { + "configs": { + "edp.java.main_class": "org.me.myclass"}, + "params": {}, + "args": []} + }) diff --git a/sahara/tests/unit/service/validation/utils.py b/sahara/tests/unit/service/validation/utils.py index 62a5aa9e..49d112c0 100644 --- a/sahara/tests/unit/service/validation/utils.py +++ b/sahara/tests/unit/service/validation/utils.py @@ -113,9 +113,6 @@ def start_patch(patch_templates=True): "sahara.service.api.get_node_group_templates") get_ng_template_p = mock.patch( "sahara.service.api.get_node_group_template") - get_plugins_p = mock.patch("sahara.service.api.get_plugins") - get_plugin_p = mock.patch( - "sahara.plugins.base.PluginManager.get_plugin") if patch_templates: get_cl_templates_p = mock.patch( "sahara.service.api.get_cluster_templates") @@ -132,8 +129,6 @@ def start_patch(patch_templates=True): if patch_templates: get_ng_templates = get_ng_templates_p.start() get_ng_template = get_ng_template_p.start() - get_plugins = get_plugins_p.start() - get_plugin = get_plugin_p.start() if patch_templates: get_cl_templates = get_cl_templates_p.start() get_cl_template_p.start() @@ -211,26 +206,16 @@ def start_patch(patch_templates=True): get_cl_templates.return_value = [r.ClusterTemplateResource(ct_dict)] - vanilla = plugin.VanillaProvider() - vanilla.name = 'vanilla' - get_plugins.return_value = [vanilla] - def _get_ng_template(id): for template in get_ng_templates(): if template.id == id: return template return None - def _get_plugin(name): - if name == 'vanilla': - return vanilla - return None - - get_plugin.side_effect = _get_plugin if patch_templates: get_ng_template.side_effect = _get_ng_template # request data to validate - patchers = [get_clusters_p, get_cluster_p, get_plugins_p, get_plugin_p, + patchers = [get_clusters_p, get_cluster_p, nova_p, keystone_p, get_image_p, heat_p] if patch_templates: patchers.extend([get_ng_template_p, get_ng_templates_p, diff --git a/sahara/tests/unit/testutils.py b/sahara/tests/unit/testutils.py index 13539131..b001680f 100644 --- a/sahara/tests/unit/testutils.py +++ b/sahara/tests/unit/testutils.py @@ -14,11 +14,16 @@ # limitations under the License. +import uuid + +import six + from sahara.conductor import resource as r def create_cluster(name, tenant, plugin, version, node_groups, **kwargs): - dct = {'name': name, 'tenant_id': tenant, 'plugin_name': plugin, + dct = {'id': six.text_type(uuid.uuid4()), 'name': name, + 'tenant_id': tenant, 'plugin_name': plugin, 'hadoop_version': version, 'node_groups': node_groups, 'cluster_configs': {}} dct.update(kwargs)
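
For plugin authors outside the tree, here is a minimal sketch of what the new
contract looks like after this change. The MyJobEngine class and the
'my-oozie-server' process name are hypothetical, invented for illustration;
the imports and the InvalidComponentCountException pattern mirror the in-tree
vanilla/HDP/CDH engines changed above.

    # Hypothetical out-of-tree EDP engine showing the new
    # validate_job_execution() hook; not part of this patch.
    from sahara.plugins.general import exceptions as ex
    from sahara.plugins.general import utils as u
    from sahara.service.edp.oozie import engine as edp_engine


    class MyJobEngine(edp_engine.OozieJobEngine):
        def get_oozie_server(self, cluster):
            return u.get_instance(cluster, 'my-oozie-server')

        def validate_job_execution(self, cluster, job, data):
            # Plugin-specific topology check, replacing the old
            # validate_edp() SPI method.
            count = u.get_instances_count(cluster, 'my-oozie-server')
            if count != 1:
                raise ex.InvalidComponentCountException(
                    'my-oozie-server', '1', count)
            # Generic per-job-type checks (main class, data sources,
            # streaming mapper/reducer) are inherited from OozieJobEngine.
            super(MyJobEngine, self).validate_job_execution(
                cluster, job, data)

check_job_execution() reaches this method through
plugin.get_edp_engine(cluster, job.type), which is why validation no longer
needs direct API calls and the tests no longer need to patch plugins.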