From b2c01f5d132be2543dd6014f63cfaa0676a376ce Mon Sep 17 00:00:00 2001
From: Ethan Gafford
Date: Tue, 27 Jan 2015 12:48:52 -0500
Subject: [PATCH] [EDP] Unified Map to Define Job Interface

This change adds an "interface" map to the API for job creation, such
that the operator registering a job can define a unified,
human-readable way to pass in all arguments, parameters, and
configurations that the execution of that job may require or accept.
This allows platform-agnostic wizarding at the job execution phase,
and it lets users document the use of their own jobs once, in a
persistent, standardized format.

Change-Id: I59b9b679a650361ddcd30975891496fdfbabb93c
Partially Implements: blueprint unified-job-interface-map
---
 sahara/api/v11.py                              |   2 +-
 sahara/conductor/objects.py                    |   2 +
 sahara/conductor/resource.py                   |   1 +
 .../versions/022_add_job_interface.py          |  67 ++++
 sahara/db/sqlalchemy/api.py                    |  62 +++-
 sahara/db/sqlalchemy/models.py                 |  32 ++
 sahara/service/edp/api.py                      |   4 +-
 sahara/service/validations/edp/job.py          |   9 +-
 .../service/validations/edp/job_execution.py   |   5 +
 .../service/validations/edp/job_interface.py   | 204 ++++++++++++
 .../tests/unit/conductor/manager/test_edp.py   |   8 +-
 .../conductor/manager/test_edp_interface.py    | 115 +++++++
 .../unit/db/migration/test_migrations.py       |  54 +++-
 .../tests/unit/service/edp/edp_test_utils.py   |   5 +-
 .../validation/edp/test_job_executor.py        |  16 +-
 .../validation/edp/test_job_interface.py       | 299 ++++++++++++++++++
 sahara/tests/unit/service/validation/utils.py  |   3 +-
 sahara/utils/edp.py                            |  11 +
 18 files changed, 862 insertions(+), 37 deletions(-)
 create mode 100644 sahara/db/migration/alembic_migrations/versions/022_add_job_interface.py
 create mode 100644 sahara/service/validations/edp/job_interface.py
 create mode 100644 sahara/tests/unit/conductor/manager/test_edp_interface.py
 create mode 100644 sahara/tests/unit/service/validation/edp/test_job_interface.py

diff --git a/sahara/api/v11.py b/sahara/api/v11.py
index e118ebb0..17d15d7c 100644
--- a/sahara/api/v11.py
+++ b/sahara/api/v11.py
@@ -116,7 +116,7 @@ def job_list():
 
 @rest.post('/jobs')
 @acl.enforce("data-processing:jobs:create")
-@v.validate(v_j.JOB_SCHEMA, v_j.check_mains_libs)
+@v.validate(v_j.JOB_SCHEMA, v_j.check_mains_libs, v_j.check_interface)
 def job_create(data):
     return u.render(api.create_job(data).to_wrapped_dict())
 
diff --git a/sahara/conductor/objects.py b/sahara/conductor/objects.py
index 85c13ad7..12bacf9a 100644
--- a/sahara/conductor/objects.py
+++ b/sahara/conductor/objects.py
@@ -242,6 +242,7 @@ class JobExecution(object):
         oozie_job_id
         return_code
         job_configs
+        interface
         extra
         data_source_urls
     """
@@ -257,6 +258,7 @@ class Job(object):
         type
         mains
         libs
+        interface
     """
 
diff --git a/sahara/conductor/resource.py b/sahara/conductor/resource.py
index 5080cd8a..2d09421c 100644
--- a/sahara/conductor/resource.py
+++ b/sahara/conductor/resource.py
@@ -258,6 +258,7 @@ class JobExecution(Resource, objects.JobExecution):
     _filter_fields = ['extra']
     _sanitize_fields = {'job_configs': sanitize_job_configs,
                         'info': sanitize_info}
+    # TODO(egafford): Sanitize interface ("secret" bool field on job args?)
 
 
 class JobBinary(Resource, objects.JobBinary):
 
diff --git a/sahara/db/migration/alembic_migrations/versions/022_add_job_interface.py b/sahara/db/migration/alembic_migrations/versions/022_add_job_interface.py
new file mode 100644
index 00000000..b14453e5
--- /dev/null
+++ b/sahara/db/migration/alembic_migrations/versions/022_add_job_interface.py
@@ -0,0 +1,67 @@
+# Copyright 2015 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""add_job_interface
+
+Revision ID: 022
+Revises: 021
+Create Date: 2015-01-27 15:53:22.128263
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = '022'
+down_revision = '021'
+
+from alembic import op
+import sqlalchemy as sa
+
+MYSQL_ENGINE = 'InnoDB'
+MYSQL_CHARSET = 'utf8'
+
+
+def upgrade():
+    op.create_table('job_interface_arguments',
+                    sa.Column('created_at', sa.DateTime(),
+                              nullable=True),
+                    sa.Column('updated_at', sa.DateTime(),
+                              nullable=True),
+                    sa.Column('id', sa.String(length=36),
+                              nullable=False),
+                    sa.Column('job_id', sa.String(length=36),
+                              nullable=False),
+                    sa.Column('tenant_id', sa.String(length=36),
+                              nullable=True),
+                    sa.Column('name', sa.String(80),
+                              nullable=False),
+                    sa.Column('description', sa.Text()),
+                    sa.Column('mapping_type', sa.String(80),
+                              nullable=False),
+                    sa.Column('location', sa.Text(),
+                              nullable=False),
+                    sa.Column('value_type', sa.String(80),
+                              nullable=False),
+                    sa.Column('required', sa.Boolean(),
+                              nullable=False),
+                    sa.Column('order', sa.SmallInteger(),
+                              nullable=False),
+                    sa.Column('default', sa.Text()),
+                    sa.ForeignKeyConstraint(['job_id'],
+                                            ['jobs.id']),
+                    sa.PrimaryKeyConstraint('id'),
+                    sa.UniqueConstraint('job_id', 'order'),
+                    sa.UniqueConstraint('job_id', 'name'),
+                    mysql_engine=MYSQL_ENGINE,
+                    mysql_charset=MYSQL_CHARSET)
diff --git a/sahara/db/sqlalchemy/api.py b/sahara/db/sqlalchemy/api.py
index b829e74e..9a0ebd89 100644
--- a/sahara/db/sqlalchemy/api.py
+++ b/sahara/db/sqlalchemy/api.py
@@ -720,13 +720,56 @@ def job_execution_count(context, **kwargs):
     return query.filter_by(**kwargs).first()[0]
 
 
+def _get_config_section(configs, mapping_type):
+    if mapping_type not in configs:
+        configs[mapping_type] = [] if mapping_type == "args" else {}
+    return configs[mapping_type]
+
+
+def _merge_execution_interface(job_ex, job, execution_interface):
+    """Merges the interface for a job execution with that of its job."""
+    configs = job_ex.job_configs or {}
+    nonexistent = object()
+    positional_args = {}
+    for arg in job.interface:
+        value = nonexistent
+        typed_configs = _get_config_section(configs, arg.mapping_type)
+        # Interface args are our first choice for the value.
+        if arg.name in execution_interface:
+            value = execution_interface[arg.name]
+        else:
+            # If a default exists, we can use that, but...
+            if arg.default is not None:
+                value = arg.default
+            # We should prefer an argument passed through the
+            # job_configs that maps to the same location.
+            if arg.mapping_type != "args":
+                value = typed_configs.get(arg.location, value)
+        if value is not nonexistent:
+            if arg.mapping_type != "args":
+                typed_configs[arg.location] = value
+            else:
+                positional_args[int(arg.location)] = value
+    if positional_args:
+        positional_args = [positional_args[i] for i
+                           in range(len(positional_args))]
+        configs["args"] = positional_args + configs["args"]
+    if configs and not job_ex.job_configs:
+        job_ex.job_configs = configs
+
+
 def job_execution_create(context, values):
     session = get_session()
+    execution_interface = values.pop('interface', {})
     job_ex = m.JobExecution()
     job_ex.update(values)
     try:
         with session.begin():
+            job_ex.interface = []
+            job = _job_get(context, session, job_ex.job_id)
+            if job.interface:
+                _merge_execution_interface(job_ex, job, execution_interface)
             session.add(job_ex)
     except db_exc.DBDuplicateEntry as e:
         raise ex.DBDuplicateEntry(
@@ -784,23 +827,36 @@ def _append_job_binaries(context, session, from_list, to_list):
             to_list.append(job_binary)
 
 
+def _append_interface(context, from_list, to_list):
+    for order, argument_values in enumerate(from_list):
+        argument_values['tenant_id'] = context.tenant_id
+        argument_values['order'] = order
+        argument = m.JobInterfaceArgument()
+        argument.update(argument_values)
+        to_list.append(argument)
+
+
 def job_create(context, values):
     mains = values.pop("mains", [])
     libs = values.pop("libs", [])
+    interface = values.pop("interface", [])
     session = get_session()
    try:
         with session.begin():
             job = m.Job()
             job.update(values)
-            # libs and mains are 'lazy' objects. The initialization below
-            # is needed here because it provides libs and mains to be
-            # initialized within a session even if the lists are empty
+            # mains, libs, and interface are 'lazy' objects. The
+            # initialization below is needed here so that they are
+            # created within a session even if the incoming lists
+            # are empty.
             job.mains = []
             job.libs = []
+            job.interface = []
             _append_job_binaries(context, session, mains, job.mains)
             _append_job_binaries(context, session, libs, job.libs)
+            _append_interface(context, interface, job.interface)
 
             session.add(job)
diff --git a/sahara/db/sqlalchemy/models.py b/sahara/db/sqlalchemy/models.py
index cbd2f3d8..10aa8858 100644
--- a/sahara/db/sqlalchemy/models.py
+++ b/sahara/db/sqlalchemy/models.py
@@ -301,6 +301,7 @@ class JobExecution(mb.SaharaBase):
     extra = sa.Column(st.JsonDictType())
     data_source_urls = sa.Column(st.JsonDictType())
 
+
 mains_association = sa.Table("mains_association",
                              mb.SaharaBase.metadata,
                              sa.Column("Job_id",
@@ -344,13 +345,44 @@ class Job(mb.SaharaBase):
     libs = relationship("JobBinary",
                         secondary=libs_association,
                         lazy="joined")
 
+    interface = relationship('JobInterfaceArgument',
+                             cascade="all,delete",
+                             order_by="JobInterfaceArgument.order",
+                             backref='job',
+                             lazy='joined')
+
     def to_dict(self):
         d = super(Job, self).to_dict()
         d['mains'] = [jb.to_dict() for jb in self.mains]
         d['libs'] = [jb.to_dict() for jb in self.libs]
+        d['interface'] = [arg.to_dict() for arg in self.interface]
         return d
 
 
+class JobInterfaceArgument(mb.SaharaBase):
+    """JobInterfaceArgument - Configuration setting for a specific job."""
+
+    __tablename__ = 'job_interface_arguments'
+
+    __table_args__ = (
+        sa.UniqueConstraint('job_id', 'name'),
+        sa.UniqueConstraint('job_id', 'order')
+    )
+
+    id = _id_column()
+    job_id = sa.Column(sa.String(36), sa.ForeignKey('jobs.id'),
+                       nullable=False)
+    tenant_id = sa.Column(sa.String(36))
+    name = sa.Column(sa.String(80), nullable=False)
+    description = sa.Column(sa.Text())
+    mapping_type = sa.Column(sa.String(80), nullable=False)
+    location = sa.Column(sa.Text(), nullable=False)
+    value_type = sa.Column(sa.String(80), nullable=False)
+    required = sa.Column(sa.Boolean(), nullable=False)
+    order = sa.Column(sa.SmallInteger(), nullable=False)
+    default = sa.Column(sa.Text())
+
+
 class JobBinaryInternal(mb.SaharaBase):
     """JobBinaryInternal - raw binary storage for executable jobs."""
 
diff --git a/sahara/service/edp/api.py b/sahara/service/edp/api.py
index 7f0e89fb..dc4b77f0 100644
--- a/sahara/service/edp/api.py
+++ b/sahara/service/edp/api.py
@@ -111,6 +111,7 @@ def execute_job(job_id, data):
     # Elements common to all job types
     cluster_id = data['cluster_id']
     configs = data.get('job_configs', {})
+    interface = data.get('interface', {})
 
     # Not in Java job types but present for all others
     input_id = data.get('input_id', None)
@@ -121,7 +122,8 @@ def execute_job(job_id, data):
     job_ex_dict = {'input_id': input_id, 'output_id': output_id,
                    'job_id': job_id, 'cluster_id': cluster_id,
                    'info': {'status': edp.JOB_STATUS_PENDING},
-                   'job_configs': configs, 'extra': {}}
+                   'job_configs': configs, 'extra': {},
+                   'interface': interface}
     job_execution = conductor.job_execution_create(context.ctx(), job_ex_dict)
 
     context.set_current_job_execution_id(job_execution.id)
diff --git a/sahara/service/validations/edp/job.py b/sahara/service/validations/edp/job.py
index 8f658501..e015a82b 100644
--- a/sahara/service/validations/edp/job.py
+++ b/sahara/service/validations/edp/job.py
@@ -16,8 +16,10 @@
 import sahara.exceptions as e
 from sahara.i18n import _
 from sahara.service.edp import api
+from sahara.service.validations.edp import job_interface as j_i
 from sahara.utils import edp
 
+
 JOB_SCHEMA = {
     "type": "object",
     "properties": {
@@ -52,7 +54,8 @@ JOB_SCHEMA = {
         },
         "streaming": {
             "type": "boolean"
-        }
+        },
+        "interface": j_i.INTERFACE_ARGUMENT_SCHEMA
     },
     "additionalProperties": False,
     "required": [
@@ -104,3 +107,7 @@ def check_mains_libs(data, **kwargs):
     # Make sure that all referenced binaries exist
     _check_binaries(mains)
     _check_binaries(libs)
+
+
+def check_interface(data, **kwargs):
+    j_i.check_job_interface(data, **kwargs)
diff --git a/sahara/service/validations/edp/job_execution.py b/sahara/service/validations/edp/job_execution.py
index c87efc38..7fdd5ae2 100644
--- a/sahara/service/validations/edp/job_execution.py
+++ b/sahara/service/validations/edp/job_execution.py
@@ -19,6 +19,7 @@
 from sahara import exceptions as ex
 from sahara.i18n import _
 from sahara.plugins import base as plugin_base
 import sahara.service.validations.edp.base as b
+import sahara.service.validations.edp.job_interface as j_i
 
 JOB_EXEC_SCHEMA = {
     "type": "object",
@@ -35,6 +36,9 @@ JOB_EXEC_SCHEMA = {
             "type": "string",
             "format": "uuid",
         },
+        "interface": {
+            "type": "simple_config",
+        },
         "job_configs": b.job_configs,
     },
     "additionalProperties": False,
@@ -92,6 +96,7 @@ def check_job_execution(data, job_id):
                 "'%(job_type)s'") % {"cluster_id": cluster.id,
                                      "job_type": job.type})
 
+    j_i.check_execution_interface(data, job)
     edp_engine.validate_job_execution(cluster, job, data)
 
diff --git a/sahara/service/validations/edp/job_interface.py b/sahara/service/validations/edp/job_interface.py
new file mode 100644
index 00000000..50db702b
--- /dev/null
+++ b/sahara/service/validations/edp/job_interface.py
@@ -0,0 +1,204 @@
+# Copyright (c) 2015 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from oslo_utils import uuidutils
+import six
+from six.moves.urllib import parse as urlparse
+
+import sahara.exceptions as e
+from sahara.i18n import _
+from sahara.service.validations.edp import base as b
+from sahara.utils import edp
+
+
+DATA_TYPE_STRING = "string"
+DATA_TYPE_NUMBER = "number"
+DATA_TYPE_DATA_SOURCE = "data_source"
+
+DATA_TYPES = [DATA_TYPE_STRING,
+              DATA_TYPE_NUMBER,
+              DATA_TYPE_DATA_SOURCE]
+DEFAULT_DATA_TYPE = DATA_TYPE_STRING
+
+
+INTERFACE_ARGUMENT_SCHEMA = {
+    "type": ["array", "null"],
+    "uniqueItems": True,
+    "items": {
+        "type": "object",
+        "properties": {
+            "name": {
+                "type": "string",
+                "minLength": 1
+            },
+            "description": {
+                "type": ["string", "null"]
+            },
+            "mapping_type": {
+                "type": "string",
+                "enum": ["args", "configs", "params"]
+            },
+            "location": {
+                "type": "string",
+                "minLength": 1
+            },
+            "value_type": {
+                "type": "string",
+                "enum": DATA_TYPES,
+                "default": "string"
+            },
+            "required": {
+                "type": "boolean"
+            },
+            "default": {
+                "type": ["string", "null"]
+            }
+        },
+        "additionalProperties": False,
+        "required": ["name", "mapping_type", "location", "required"]
+    }
+}
+
+
+def _check_job_interface(data, interface):
+    names = set(arg["name"] for arg in interface)
+    if len(names) != len(interface):
+        raise e.InvalidDataException(
+            _("Name must be unique within the interface for any job."))
+
+    mapping_types = set(arg["mapping_type"] for arg in interface)
+    acceptable_types = edp.JOB_TYPES_ACCEPTABLE_CONFIGS[data["type"]]
+    if any(m_type not in acceptable_types for m_type in mapping_types):
+        args = {"mapping_types": str(list(acceptable_types)),
+                "job_type": data["type"]}
+        raise e.InvalidDataException(
+            _("Only mapping types %(mapping_types)s are allowed for job type "
+              "%(job_type)s.") % args)
+
+    positional_args = [arg for arg in interface
+                       if arg["mapping_type"] == "args"]
+    if not all(six.text_type(arg["location"]).isnumeric()
+               for arg in positional_args):
+        raise e.InvalidDataException(
+            _("Locations of positional arguments must be an unbroken integer "
+              "sequence ascending from 0."))
+    locations = set(int(arg["location"]) for arg in positional_args)
+    if not all(i in locations for i in range(len(locations))):
+        raise e.InvalidDataException(
+            _("Locations of positional arguments must be an unbroken integer "
+              "sequence ascending from 0."))
+
+    not_required = (arg for arg in positional_args if not arg["required"])
+    if not all(arg.get("default", None) for arg in not_required):
+        raise e.InvalidDataException(
+            _("Positional arguments must be given default values if they are "
+              "not required."))
+
+    mappings = ((arg["mapping_type"], arg["location"]) for arg in interface)
+    if len(set(mappings)) != len(interface):
+        raise e.InvalidDataException(
+            _("The combination of mapping type and location must be unique "
+              "within the interface for any job."))
+
+    for arg in interface:
+        if "value_type" not in arg:
+            arg["value_type"] = DEFAULT_DATA_TYPE
+        default = arg.get("default", None)
+        if default is not None:
+            _validate_value(arg["value_type"], default)
+
+
+def check_job_interface(data, **kwargs):
+    interface = data.get("interface", [])
+    if interface:
+        _check_job_interface(data, interface)
+
+
+def _validate_data_source(value):
+    if uuidutils.is_uuid_like(value):
+        b.check_data_source_exists(value)
+    else:
+        if not urlparse.urlparse(value).scheme:
+            raise e.InvalidDataException(
+                _("Data source value '%s' is neither a valid data source ID "
+                  "nor a valid URL.") % value)
+
+
+def _validate_number(value):
+    if not six.text_type(value).isnumeric():
+        raise e.InvalidDataException(
+            _("Value '%s' is not a valid number.") % value)
+
+
+def _validate_string(value):
+    if not isinstance(value, six.string_types):
+        raise e.InvalidDataException(
+            _("Value '%s' is not a valid string.") % value)
+
+
+_value_type_validators = {
+    DATA_TYPE_STRING: _validate_string,
+    DATA_TYPE_NUMBER: _validate_number,
+    DATA_TYPE_DATA_SOURCE: _validate_data_source
+}
+
+
+def _validate_value(type, value):
+    _value_type_validators[type](value)
+
+
+def check_execution_interface(data, job):
+    job_int = {arg.name: arg for arg in job.interface}
+    execution_int = data.get("interface", None)
+
+    if not (job_int or execution_int):
+        return
+    if job_int and execution_int is None:
+        raise e.InvalidDataException(
+            _("An interface was specified with the template for this job. "
+              "Please pass an interface map with this job (even if empty)."))
+
+    execution_names = set(execution_int.keys())
+
+    definition_names = set(job_int.keys())
+    not_found_names = execution_names - definition_names
+    if not_found_names:
+        raise e.InvalidDataException(
+            _("Argument names: %s were not found in the interface for this "
+              "job.") % str(list(not_found_names)))
+
+    required_names = {arg.name for arg in job.interface if arg.required}
+    unset_names = required_names - execution_names
+    if unset_names:
+        raise e.InvalidDataException(_("Argument names: %s are required for "
+                                       "this job.") % str(list(unset_names)))
+
+    nonexistent = object()
+    for name, value in six.iteritems(execution_int):
+        arg = job_int[name]
+        _validate_value(arg.value_type, value)
+        if arg.mapping_type == "args":
+            continue
+        typed_configs = data.get("job_configs", {}).get(arg.mapping_type, {})
+        config_value = typed_configs.get(arg.location, nonexistent)
+        if config_value is not nonexistent and config_value != value:
+            args = {"name": name,
+                    "mapping_type": arg.mapping_type,
+                    "location": arg.location}
+            raise e.InvalidDataException(
+                _("Argument '%(name)s' was passed both through the interface "
+                  "and in location '%(mapping_type)s'.'%(location)s'. Please "
+                  "pass this through either the interface or the "
+                  "configuration maps, not both.") % args)
diff --git a/sahara/tests/unit/conductor/manager/test_edp.py b/sahara/tests/unit/conductor/manager/test_edp.py
index a763230d..a872febd 100644
--- a/sahara/tests/unit/conductor/manager/test_edp.py
+++ b/sahara/tests/unit/conductor/manager/test_edp.py
@@ -27,7 +27,7 @@ from sahara.utils import edp
 
 
 SAMPLE_DATA_SOURCE = {
-    "tenant_id": "test_tenant",
+    "tenant_id": "tenant_1",
     "name": "ngt_test",
     "description": "test_desc",
     "type": "Cassandra",
@@ -39,7 +39,7 @@
 }
 
 SAMPLE_JOB = {
-    "tenant_id": "test_tenant",
+    "tenant_id": "tenant_1",
     "name": "job_test",
     "description": "test_desc",
     "type": edp.JOB_TYPE_PIG,
@@ -47,7 +47,7 @@
 }
 
 SAMPLE_JOB_EXECUTION = {
-    "tenant_id": "tenant_id",
+    "tenant_id": "tenant_1",
     "return_code": "1",
     "job_id": "undefined",
     "input_id": "undefined",
@@ -57,7 +57,7 @@
 }
 
 SAMPLE_CONF_JOB_EXECUTION = {
-    "tenant_id": "tenant_id",
+    "tenant_id": "tenant_1",
     "progress": "0.1",
     "return_code": "1",
     "job_id": "undefined",
diff --git a/sahara/tests/unit/conductor/manager/test_edp_interface.py b/sahara/tests/unit/conductor/manager/test_edp_interface.py
new file mode 100644
index 00000000..d16f600a
--- /dev/null
+++ b/sahara/tests/unit/conductor/manager/test_edp_interface.py
@@ -0,0 +1,115 @@
+# Copyright (c) 2015 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import copy
+
+from sahara import context
+import sahara.tests.unit.conductor.base as test_base
+from sahara.tests.unit.conductor.manager import test_edp
+
+
+def _merge_dict(original, update):
+    new = copy.deepcopy(original)
+    new.update(update)
+    return new
+
+
+SAMPLE_JOB = _merge_dict(test_edp.SAMPLE_JOB, {
+    "interface": [
+        {
+            "name": "Reducer Count",
+            "mapping_type": "configs",
+            "location": "mapred.reduce.tasks",
+            "value_type": "number",
+            "required": True,
+            "default": "1"
+        },
+        {
+            "name": "Input Path",
+            "mapping_type": "params",
+            "location": "INPUT",
+            "value_type": "data_source",
+            "required": False,
+            "default": "hdfs://path"
+        },
+        {
+            "name": "Positional Argument 2",
+            "mapping_type": "args",
+            "location": "1",
+            "value_type": "string",
+            "required": False,
+            "default": "default"
+        },
+        {
+            "name": "Positional Argument 1",
+            "mapping_type": "args",
+            "location": "0",
+            "value_type": "string",
+            "required": False,
+            "default": "arg_1"
+        }
+    ]
+})
+
+SAMPLE_JOB_EXECUTION = _merge_dict(test_edp.SAMPLE_JOB_EXECUTION, {
+    "interface": {
+        "Reducer Count": "2",
+        "Positional Argument 2": "arg_2"
+    },
+    "job_configs": {"args": ["arg_3"],
+                    "configs": {"mapred.map.tasks": "3"}}
+})
+
+
+class JobExecutionTest(test_base.ConductorManagerTestCase):
+    def test_interface_flows(self):
+        ctx = context.ctx()
+        job = self.api.job_create(ctx, SAMPLE_JOB)
+        arg_names = [arg['name'] for arg in job['interface']]
+        self.assertEqual(["Reducer Count", "Input Path",
+                          "Positional Argument 2",
+                          "Positional Argument 1"], arg_names)
+
+        job_ex_input = copy.deepcopy(SAMPLE_JOB_EXECUTION)
+        job_ex_input['job_id'] = job['id']
+
+        self.api.job_execution_create(ctx, job_ex_input)
+
+        lst = self.api.job_execution_get_all(ctx)
+        self.assertEqual(1, len(lst))
+
+        job_ex_result = lst[0]
+        configs = {
+            'configs': {'mapred.reduce.tasks': '2',
+                        'mapred.map.tasks': '3'},
+            'args': ['arg_1', 'arg_2', 'arg_3'],
+            'params': {'INPUT': 'hdfs://path'}
+        }
+        self.assertEqual(configs, job_ex_result['job_configs'])
+        self.api.job_execution_destroy(ctx, job_ex_result['id'])
+
+        del job_ex_input['job_configs']
+        self.api.job_execution_create(ctx, job_ex_input)
+
+        lst = self.api.job_execution_get_all(ctx)
+        self.assertEqual(1, len(lst))
+
+        job_ex_result = lst[0]
+        configs = {
+            'configs': {'mapred.reduce.tasks': '2'},
+            'args': ['arg_1', 'arg_2'],
+            'params': {'INPUT': 'hdfs://path'}
+        }
+        self.assertEqual(configs, job_ex_result['job_configs'])
diff --git a/sahara/tests/unit/db/migration/test_migrations.py b/sahara/tests/unit/db/migration/test_migrations.py
index d042b641..4e1eb0b1 100644
--- a/sahara/tests/unit/db/migration/test_migrations.py
+++ b/sahara/tests/unit/db/migration/test_migrations.py
@@ -43,7 +43,7 @@ class SaharaMigrationsCheckers(object):
         t = db_utils.get_table(engine, table)
         self.assertIn(column, t.c)
 
-    def assertColumnsExists(self, engine, table, columns):
+    def assertColumnsExist(self, engine, table, columns):
         for column in columns:
             self.assertColumnExists(engine, table, column)
 
@@ -90,7 +90,7 @@ class SaharaMigrationsCheckers(object):
             'data',
             'datasize'
         ]
-        self.assertColumnsExists(
+        self.assertColumnsExist(
             engine, 'job_binary_internal', job_binary_internal_columns)
         self.assertColumnCount(
             engine, 'job_binary_internal', job_binary_internal_columns)
@@ -113,7 +113,7 @@ class SaharaMigrationsCheckers(object):
             'volume_mount_prefix',
             'floating_ip_pool'
         ]
-        self.assertColumnsExists(
+        self.assertColumnsExist(
             engine, 'node_group_templates', node_group_templates_columns)
         self.assertColumnCount(
             engine, 'node_group_templates', node_group_templates_columns)
@@ -129,7 +129,7 @@ class SaharaMigrationsCheckers(object):
             'url',
             'credentials'
         ]
-        self.assertColumnsExists(
+        self.assertColumnsExist(
             engine, 'data_sources', data_sources_columns)
         self.assertColumnCount(
             engine, 'data_sources', data_sources_columns)
@@ -148,7 +148,7 @@ class SaharaMigrationsCheckers(object):
             'plugin_name',
             'hadoop_version'
         ]
-        self.assertColumnsExists(
+        self.assertColumnsExist(
             engine, 'cluster_templates', cluster_templates_columns)
         self.assertColumnCount(
             engine, 'cluster_templates', cluster_templates_columns)
@@ -163,7 +163,7 @@ class SaharaMigrationsCheckers(object):
             'url',
             'extra'
         ]
-        self.assertColumnsExists(
+        self.assertColumnsExist(
             engine, 'job_binaries', job_binaries_columns)
         self.assertColumnCount(
             engine, 'job_binaries', job_binaries_columns)
@@ -177,7 +177,7 @@ class SaharaMigrationsCheckers(object):
             'description',
             'type'
         ]
-        self.assertColumnsExists(engine, 'jobs', jobs_columns)
+        self.assertColumnsExist(engine, 'jobs', jobs_columns)
         self.assertColumnCount(engine, 'jobs', jobs_columns)
 
         templates_relations_columns = [
@@ -198,7 +198,7 @@ class SaharaMigrationsCheckers(object):
             'node_group_template_id',
             'floating_ip_pool'
         ]
-        self.assertColumnsExists(
+        self.assertColumnsExist(
             engine, 'templates_relations', templates_relations_columns)
         self.assertColumnCount(
             engine, 'templates_relations', templates_relations_columns)
@@ -207,7 +207,7 @@ class SaharaMigrationsCheckers(object):
             'Job_id',
             'JobBinary_id'
         ]
-        self.assertColumnsExists(
+        self.assertColumnsExist(
             engine, 'mains_association', mains_association_columns)
         self.assertColumnCount(
             engine, 'mains_association', mains_association_columns)
@@ -216,7 +216,7 @@ class SaharaMigrationsCheckers(object):
             'Job_id',
             'JobBinary_id'
         ]
-        self.assertColumnsExists(
+        self.assertColumnsExist(
             engine, 'libs_association', libs_association_columns)
         self.assertColumnCount(
             engine, 'libs_association', libs_association_columns)
@@ -245,7 +245,7 @@ class SaharaMigrationsCheckers(object):
             'extra',
             'cluster_template_id'
         ]
-        self.assertColumnsExists(engine, 'clusters', clusters_columns)
+        self.assertColumnsExist(engine, 'clusters', clusters_columns)
         self.assertColumnCount(engine, 'clusters', clusters_columns)
 
         node_groups_columns = [
@@ -267,7 +267,7 @@ class SaharaMigrationsCheckers(object):
             'node_group_template_id',
             'floating_ip_pool'
         ]
-        self.assertColumnsExists(engine, 'node_groups', node_groups_columns)
+        self.assertColumnsExist(engine, 'node_groups', node_groups_columns)
         self.assertColumnCount(engine, 'node_groups', node_groups_columns)
 
         job_executions_columns = [
@@ -288,7 +288,7 @@ class SaharaMigrationsCheckers(object):
             'job_configs',
             'extra'
         ]
-        self.assertColumnsExists(
+        self.assertColumnsExist(
             engine, 'job_executions', job_executions_columns)
         self.assertColumnCount(
             engine, 'job_executions', job_executions_columns)
@@ -305,7 +305,7 @@ class SaharaMigrationsCheckers(object):
             'management_ip',
             'volumes'
         ]
-        self.assertColumnsExists(engine, 'instances', instances_columns)
+        self.assertColumnsExist(engine, 'instances', instances_columns)
         self.assertColumnCount(engine, 'instances', instances_columns)
 
         self._data_001(engine, data)
@@ -425,11 +425,11 @@ class SaharaMigrationsCheckers(object):
         self.assertColumnCount(engine, 'cluster_provision_steps',
                                provision_steps_columns)
-        self.assertColumnsExists(engine, 'cluster_provision_steps',
-                                 provision_steps_columns)
+        self.assertColumnsExist(engine, 'cluster_provision_steps',
+                                provision_steps_columns)
 
         self.assertColumnCount(engine, 'cluster_events', events_columns)
-        self.assertColumnsExists(engine, 'cluster_events', events_columns)
+        self.assertColumnsExist(engine, 'cluster_events', events_columns)
 
     def _check_016(self, engine, data):
         self.assertColumnExists(engine, 'node_group_templates',
@@ -464,6 +464,26 @@ class SaharaMigrationsCheckers(object):
     def _check_021(self, engine, data):
         self.assertColumnExists(engine, 'job_executions', 'data_source_urls')
 
+    def _check_022(self, engine, data):
+        columns = [
+            'created_at',
+            'updated_at',
+            'id',
+            'job_id',
+            'tenant_id',
+            'name',
+            'description',
+            'mapping_type',
+            'location',
+            'value_type',
+            'required',
+            'order',
+            'default'
+        ]
+
+        self.assertColumnCount(engine, 'job_interface_arguments', columns)
+        self.assertColumnsExist(engine, 'job_interface_arguments', columns)
+
 
 class TestMigrationsMySQL(SaharaMigrationsCheckers,
                           base.BaseWalkMigrationTestCase,
diff --git a/sahara/tests/unit/service/edp/edp_test_utils.py b/sahara/tests/unit/service/edp/edp_test_utils.py
index 7ff82108..57ecc4d7 100644
--- a/sahara/tests/unit/service/edp/edp_test_utils.py
+++ b/sahara/tests/unit/service/edp/edp_test_utils.py
@@ -41,6 +41,7 @@ def _create_job(id, job_binary, type):
     job.id = id
     job.type = type
     job.name = 'special_name'
+    job.interface = []
     if edp.compare_job_type(type, edp.JOB_TYPE_PIG, edp.JOB_TYPE_HIVE):
         job.mains = [job_binary]
         job.libs = None
@@ -91,6 +92,8 @@ def _create_job_exec(job_id, type, configs=None):
     j_exec.id = six.text_type(uuid.uuid4())
     j_exec.job_id = job_id
     j_exec.job_configs = configs
+    if not j_exec.job_configs:
+        j_exec.job_configs = {}
     if edp.compare_job_type(type, edp.JOB_TYPE_JAVA):
         j_exec.job_configs['configs']['edp.java.main_class'] = _java_main_class
         j_exec.job_configs['configs']['edp.java.java_opts'] = _java_opts
@@ -100,8 +103,6 @@ def _create_job_exec_with_proxy(job_id, type, configs=None):
     j_exec = _create_job_exec(job_id, type, configs)
     j_exec.id = '00000000-1111-2222-3333-4444444444444444'
-    if not j_exec.job_configs:
-        j_exec.job_configs = {}
     j_exec.job_configs['proxy_configs'] = {
         'proxy_username': 'job_' + j_exec.id,
         'proxy_password': '55555555-6666-7777-8888-999999999999',
diff --git a/sahara/tests/unit/service/validation/edp/test_job_executor.py b/sahara/tests/unit/service/validation/edp/test_job_executor.py
index 51bdaa69..561ee621 100644
--- a/sahara/tests/unit/service/validation/edp/test_job_executor.py
+++ b/sahara/tests/unit/service/validation/edp/test_job_executor.py
@@ -46,7 +46,7 @@ class TestJobExecValidation(u.ValidationTestCase):
     @mock.patch('sahara.conductor.api.LocalApi.job_get')
     def test_streaming(self, get_job, get_data_source, get_cluster):
         get_job.return_value = mock.Mock(
-            type=edp.JOB_TYPE_MAPREDUCE_STREAMING, libs=[])
+            type=edp.JOB_TYPE_MAPREDUCE_STREAMING, libs=[], interface=[])
 
         ds1_id = six.text_type(uuid.uuid4())
         ds2_id = six.text_type(uuid.uuid4())
@@ -95,7 +95,7 @@ class TestJobExecValidation(u.ValidationTestCase):
     @mock.patch('sahara.conductor.api.LocalApi.job_get')
     def test_data_sources_differ(self, get_job, get_data_source, get_cluster):
         get_job.return_value = mock.Mock(
-            type=edp.JOB_TYPE_MAPREDUCE_STREAMING, libs=[])
+            type=edp.JOB_TYPE_MAPREDUCE_STREAMING, libs=[], interface=[])
 
         ds1_id = six.text_type(uuid.uuid4())
         ds2_id = six.text_type(uuid.uuid4())
@@ -147,7 +147,8 @@ class TestJobExecValidation(u.ValidationTestCase):
     @mock.patch('sahara.conductor.api.LocalApi.cluster_get')
     @mock.patch('sahara.conductor.api.LocalApi.job_get')
     def test_check_edp_no_oozie(self, get_job, get_cluster):
-        get_job.return_value = mock.Mock(type=edp.JOB_TYPE_PIG, libs=[])
+        get_job.return_value = mock.Mock(type=edp.JOB_TYPE_PIG, libs=[],
+                                         interface=[])
 
         ng = tu.make_ng_dict('master', 42, ['namenode'], 1,
                              instances=[tu.make_inst_dict('id', 'name')])
@@ -173,7 +174,7 @@ class TestJobExecValidation(u.ValidationTestCase):
 
         # Note that this means we cannot use assert_create_object_validation()
         # because it calls start_patch() and will override our setting
-        job = mock.Mock(type=edp.JOB_TYPE_SPARK, mains=["main"])
+        job = mock.Mock(type=edp.JOB_TYPE_SPARK, mains=["main"], interface=[])
         get_job.return_value = job
         ng = tu.make_ng_dict('master', 42, [], 1,
                              instances=[tu.make_inst_dict('id', 'name')])
@@ -190,8 +191,8 @@ class TestJobExecValidation(u.ValidationTestCase):
     @mock.patch('sahara.conductor.api.LocalApi.cluster_get')
     @mock.patch('sahara.conductor.api.LocalApi.job_get')
     def test_edp_main_class_java(self, job_get, cluster_get):
-        job_get.return_value = mock.Mock()
-        job_get.return_value.type = edp.JOB_TYPE_JAVA
+        job_get.return_value = mock.Mock(type=edp.JOB_TYPE_JAVA,
+                                         interface=[])
         ng = tu.make_ng_dict('master', 42, ['namenode', 'oozie'], 1,
                              instances=[tu.make_inst_dict('id', 'name')])
         cluster_get.return_value = tu.create_cluster("cluster", "tenant1",
@@ -221,7 +222,8 @@ class TestJobExecValidation(u.ValidationTestCase):
     @mock.patch('sahara.conductor.api.LocalApi.cluster_get')
     @mock.patch('sahara.conductor.api.LocalApi.job_get')
     def test_edp_main_class_spark(self, job_get, cluster_get):
-        job_get.return_value = mock.Mock(type=edp.JOB_TYPE_SPARK)
+        job_get.return_value = mock.Mock(type=edp.JOB_TYPE_SPARK,
+                                         interface=[])
         ng = tu.make_ng_dict('master', 42, ['namenode'], 1,
                              instances=[tu.make_inst_dict('id', 'name')])
         cluster_get.return_value = tu.create_cluster("cluster", "tenant1",
diff --git a/sahara/tests/unit/service/validation/edp/test_job_interface.py b/sahara/tests/unit/service/validation/edp/test_job_interface.py
new file mode 100644
index 00000000..8eb6c911
--- /dev/null
+++ b/sahara/tests/unit/service/validation/edp/test_job_interface.py
@@ -0,0 +1,299 @@
+# Copyright (c) 2015 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import collections
+import itertools
+
+import mock
+
+from sahara.service.validations.edp import job as j
+from sahara.service.validations.edp import job_execution as j_e
+from sahara.service.validations.edp import job_interface as j_i
+from sahara.tests.unit.service.validation import utils as u
+from sahara.utils import edp
+
+
+def _configs(**kwargs):
+    arg = {
+        "name": "Reducer Count",
+        "mapping_type": "configs",
+        "location": "mapred.reduce.tasks",
+        "value_type": "number",
+        "required": True,
+        "default": "1"
+    }
+    arg.update(kwargs)
+    return arg
+
+
+def _params(**kwargs):
+    arg = {
+        "name": "Input Path",
+        "mapping_type": "params",
+        "location": "INPUT",
+        "value_type": "data_source",
+        "required": False,
+        "default": "hdfs://path"
+    }
+    arg.update(kwargs)
+    return arg
+
+
+def _args(**kwargs):
+    arg = {
+        "name": "Positional Argument",
+        "mapping_type": "args",
+        "location": "0",
+        "value_type": "string",
+        "required": False,
+        "default": "arg_value"
+    }
+    arg.update(kwargs)
+    return arg
+
+
+_mapping_types = {"configs", "args", "params"}
+
+
+def _job(job_type, interface):
+    return {"name": "job", "type": job_type, "interface": interface}
+
+
+_job_types = [
+    _job(edp.JOB_TYPE_HIVE, [_configs(), _params()]),
+    _job(edp.JOB_TYPE_PIG, [_configs(), _params(), _args()]),
+    _job(edp.JOB_TYPE_MAPREDUCE, [_configs()]),
+    _job(edp.JOB_TYPE_MAPREDUCE_STREAMING, [_configs()]),
+    _job(edp.JOB_TYPE_JAVA, [_configs(), _args()]),
+    _job(edp.JOB_TYPE_SHELL, [_configs(), _params(), _args()]),
+    _job(edp.JOB_TYPE_SPARK, [_configs(), _args()]),
+    _job(edp.JOB_TYPE_STORM, [_args()])
+]
+
+
+class TestJobInterfaceValidation(u.ValidationTestCase):
+
+    def setUp(self):
+        super(TestJobInterfaceValidation, self).setUp()
+        self._create_object_fun = j.check_interface
+        self.scheme = j.JOB_SCHEMA
+
+    def test_interface(self):
+        for job in _job_types:
+            self._assert_create_object_validation(data=job)
+
+    def test_no_interface(self):
+        job = _job(edp.JOB_TYPE_PIG, None)
+        self._assert_create_object_validation(data=job)
+
+    def test_overlapping_names(self):
+        job = _job(edp.JOB_TYPE_PIG,
+                   [_configs(), _configs(location="mapred.map.tasks")])
+        self._assert_create_object_validation(
+            data=job, bad_req_i=(
+                1, "INVALID_DATA",
+                "Name must be unique within the interface for any job."))
+
+    def test_unacceptable_types(self):
+        for job in _job_types:
+            acceptable_types = [arg['mapping_type']
+                                for arg in job['interface']]
+            unacceptable_types = _mapping_types - set(acceptable_types)
+            unacceptable_args = (globals()["_" + m_type]()
+                                 for m_type in unacceptable_types)
+            bad_job = job.copy()
+            for arg in unacceptable_args:
+                bad_job['interface'] = [arg]
+                permutations = itertools.permutations(acceptable_types)
+                msgs = ["Only mapping types %s are allowed for job type %s." %
+                        (list(permutation), bad_job['type'])
+                        for permutation in permutations]
+                self._assert_create_object_validation(
+                    data=bad_job, bad_req_i=(1, "INVALID_DATA", msgs))
+
+    def test_bad_positional_arg_locations(self):
+        job = _job(edp.JOB_TYPE_PIG, [_args(location="1")])
+        self._assert_create_object_validation(
+            data=job, bad_req_i=(
+                1, "INVALID_DATA",
+                "Locations of positional arguments must be an unbroken "
+                "integer sequence ascending from 0."))
+
+        job = _job(edp.JOB_TYPE_PIG, [_args(location="fish")])
+        self._assert_create_object_validation(
+            data=job, bad_req_i=(
+                1, "INVALID_DATA",
+                "Locations of positional arguments must be an unbroken "
+                "integer sequence ascending from 0."))
+
+        job = _job(edp.JOB_TYPE_PIG,
+                   [_args(), _args(location="2", name="Argument 2")])
+        self._assert_create_object_validation(
+            data=job, bad_req_i=(
+                1, "INVALID_DATA",
+                "Locations of positional arguments must be an unbroken "
+                "integer sequence ascending from 0."))
+
+    def test_required_positional_arg_without_default(self):
+        arg = _args(required=False)
+        del arg['default']
+        job = _job(edp.JOB_TYPE_PIG, [arg])
+        self._assert_create_object_validation(
+            data=job, bad_req_i=(
+                1, "INVALID_DATA",
+                "Positional arguments must be given default values if they "
+                "are not required."))
+
+    def test_overlapping_locations(self):
+        job = _job(edp.JOB_TYPE_PIG,
+                   [_configs(), _configs(name="Mapper Count")])
+        self._assert_create_object_validation(
+            data=job, bad_req_i=(
+                1, "INVALID_DATA",
+                "The combination of mapping type and location must be unique "
+                "within the interface for any job."))
+
+    def test_number_values(self):
+        job = _job(edp.JOB_TYPE_PIG, [_configs(default="fish")])
+        self._assert_create_object_validation(
+            data=job, bad_req_i=(
+                1, "INVALID_DATA",
+                "Value 'fish' is not a valid number."))
+
+    @mock.patch('sahara.conductor.API.data_source_get')
+    def test_data_source_values(self, data_source):
+        data_source.return_value = True
+        job = _job(edp.JOB_TYPE_PIG,
+                   [_params(default="DEADBEEF-DEAD-BEEF-DEAD-BEEFDEADBEEF")])
+        self._assert_create_object_validation(data=job)
+
+        data_source.return_value = False
+        self._assert_create_object_validation(
+            data=job, bad_req_i=(
+                1, "INVALID_REFERENCE",
+                "DataSource with id 'DEADBEEF-DEAD-BEEF-DEAD-BEEFDEADBEEF' "
+                "doesn't exist"))
+
+        job = _job(edp.JOB_TYPE_PIG, [_params(default="Filial Piety")])
+        self._assert_create_object_validation(
+            data=job, bad_req_i=(
+                1, "INVALID_DATA",
+                "Data source value 'Filial Piety' is neither a valid data "
+                "source ID nor a valid URL."))
+
+    def test_default_data_type(self):
+        param = _params()
+        del param['value_type']
+        job = _job(edp.JOB_TYPE_PIG, [param])
+        self._assert_create_object_validation(data=job)
+        self.assertEqual(j_i.DEFAULT_DATA_TYPE, param['value_type'])
+
+
+int_arg = collections.namedtuple("int_arg",
+                                 ["name", "mapping_type", "location",
+                                  "value_type", "required", "default"])
+
+
+def j_e_i_wrapper(data):
+    job = mock.Mock(
+        interface=[int_arg(**_configs()),
+                   int_arg(**_args()),
+                   int_arg(**_params())]
+    )
+    j_i.check_execution_interface(data, job)
+
+
+class TestJobExecutionInterfaceValidation(u.ValidationTestCase):
+
+    def setUp(self):
+        super(TestJobExecutionInterfaceValidation, self).setUp()
+        self._create_object_fun = j_e_i_wrapper
+        self.scheme = j_e.JOB_EXEC_SCHEMA
+
+    def test_valid_execution(self):
+        data = {"cluster_id": "DEADBEEF-DEAD-BEEF-DEAD-BEEFDEADBEEF",
+                "interface": {"Reducer Count": "2",
+                              "Input Path": "swift://path",
+                              "Positional Argument": "value"}}
+        self._assert_create_object_validation(data=data)
+
+    def test_no_execution_interface(self):
+        data = {"cluster_id": "DEADBEEF-DEAD-BEEF-DEAD-BEEFDEADBEEF"}
+        self._assert_create_object_validation(data=data, bad_req_i=(
+            1, "INVALID_DATA",
+            "An interface was specified with the template for this job. "
+            "Please pass an interface map with this job (even if empty)."))
+
+    def test_bad_argument_name(self):
+        data = {"cluster_id": "DEADBEEF-DEAD-BEEF-DEAD-BEEFDEADBEEF",
+                "interface": {"Fish": "Rainbow Trout",
+                              "Reducer Count": "2",
+                              "Input Path": "swift://path"}}
+        self._assert_create_object_validation(data=data, bad_req_i=(
+            1, "INVALID_DATA", "Argument names: ['Fish'] were not found in "
+                               "the interface for this job."))
+
+    def test_required_argument_missing(self):
+        data = {"cluster_id": "DEADBEEF-DEAD-BEEF-DEAD-BEEFDEADBEEF",
+                "interface": {"Positional Argument": "Value"}}
+        self._assert_create_object_validation(data=data, bad_req_i=(
+            1, "INVALID_DATA", "Argument names: ['Reducer Count'] are "
+                               "required for this job."))
+
+    @mock.patch('sahara.conductor.API.data_source_get')
+    def test_bad_values(self, data_source):
+        data = {"cluster_id": "DEADBEEF-DEAD-BEEF-DEAD-BEEFDEADBEEF",
+                "interface": {"Reducer Count": "Two"}}
+        self._assert_create_object_validation(data=data, bad_req_i=(
+            1, "INVALID_DATA", "Value 'Two' is not a valid number."))
+
+        data = {"cluster_id": "DEADBEEF-DEAD-BEEF-DEAD-BEEFDEADBEEF",
+                "interface": {"Reducer Count": "2",
+                              "Input Path": "not_a_url"}}
+        self._assert_create_object_validation(data=data, bad_req_i=(
+            1, "INVALID_DATA", "Data source value 'not_a_url' is neither a "
+                               "valid data source ID nor a valid URL."))
+
+        data = {"cluster_id": "DEADBEEF-DEAD-BEEF-DEAD-BEEFDEADBEEF",
+                "interface": {"Reducer Count": "2",
+                              "Positional Argument": 2}}
+        self._assert_create_object_validation(data=data, bad_req_i=(
+            1, "INVALID_DATA", "Value '2' is not a valid string."))
+
+        data_source.return_value = False
+        data = {"cluster_id": "DEADBEEF-DEAD-BEEF-DEAD-BEEFDEADBEEF",
+                "interface":
+                    {"Reducer Count": "2",
+                     "Input Path": "DEADBEEF-DEAD-BEEF-DEAD-BEEFDEADBEEF"}}
+        self._assert_create_object_validation(
+            data=data, bad_req_i=(
+                1, "INVALID_REFERENCE",
+                "DataSource with id 'DEADBEEF-DEAD-BEEF-DEAD-BEEFDEADBEEF' "
+                "doesn't exist"))
+
+    def test_overlapping_data(self):
+        data = {"cluster_id": "DEADBEEF-DEAD-BEEF-DEAD-BEEFDEADBEEF",
+                "interface": {"Reducer Count": "2"},
+                "job_configs": {"configs": {"mapred.reduce.tasks": "2"}}}
+        self._assert_create_object_validation(data=data)
+
+        data = {"cluster_id": "DEADBEEF-DEAD-BEEF-DEAD-BEEFDEADBEEF",
+                "interface": {"Reducer Count": "2"},
+                "job_configs": {"configs": {"mapred.reduce.tasks": "3"}}}
+        self._assert_create_object_validation(data=data, bad_req_i=(
+            1, "INVALID_DATA",
+            "Argument 'Reducer Count' was passed both through the interface "
+            "and in location 'configs'.'mapred.reduce.tasks'. Please pass "
+            "this through either the interface or the configuration maps, "
+            "not both."))
diff --git a/sahara/tests/unit/service/validation/utils.py b/sahara/tests/unit/service/validation/utils.py
index 9b2becf1..e80e744d 100644
--- a/sahara/tests/unit/service/validation/utils.py
+++ b/sahara/tests/unit/service/validation/utils.py
@@ -252,7 +252,8 @@ class ValidationTestCase(base.SaharaTestCase):
 
     def _assert_calls(self, mock, call_info):
         if not call_info:
-            self.assertEqual(0, mock.call_count)
+            self.assertEqual(0, mock.call_count, "Unexpected call to %s: %s"
+                             % (mock.name, str(mock.call_args)))
         else:
             self.assertEqual(call_info[0], mock.call_count)
             self.assertEqual(call_info[1], mock.call_args[0][0].code)
diff --git a/sahara/utils/edp.py b/sahara/utils/edp.py
index 7db636d3..b99eae0c 100644
--- a/sahara/utils/edp.py
+++ b/sahara/utils/edp.py
@@ -62,6 +62,17 @@ JOB_TYPES_ALL = [
     JOB_TYPE_STORM
 ]
 
+JOB_TYPES_ACCEPTABLE_CONFIGS = {
+    JOB_TYPE_HIVE: {"configs", "params"},
+    JOB_TYPE_PIG: {"configs", "params", "args"},
+    JOB_TYPE_MAPREDUCE: {"configs"},
+    JOB_TYPE_MAPREDUCE_STREAMING: {"configs"},
+    JOB_TYPE_JAVA: {"configs", "args"},
+    JOB_TYPE_SHELL: {"configs", "params", "args"},
+    JOB_TYPE_SPARK: {"configs", "args"},
+    JOB_TYPE_STORM: {"args"}
+}
+
 ADAPT_FOR_OOZIE = 'edp.java.adapt_for_oozie'
 ADAPT_SPARK_FOR_SWIFT = 'edp.spark.adapt_for_swift'
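
Usage sketch: the payloads below are illustrative only (job names and the
binary and cluster IDs are placeholders), distilled from the unit tests
above; the comments restate the merge behavior implemented by
_merge_execution_interface in this change.

# Hypothetical request bodies for the new "interface" map. Field names
# follow INTERFACE_ARGUMENT_SCHEMA; the endpoints are Sahara's existing
# job-create and job-execute calls.

job_request = {
    "name": "wordcount",                       # illustrative name
    "type": "Pig",
    "mains": ["<job-binary-id>"],              # placeholder ID
    "interface": [
        {"name": "Reducer Count",
         "mapping_type": "configs",            # one of: args, configs, params
         "location": "mapred.reduce.tasks",
         "value_type": "number",
         "required": True,
         "default": "1"},
        {"name": "Input Path",
         "mapping_type": "params",
         "location": "INPUT",
         "value_type": "data_source",
         "required": False,
         "default": "hdfs://path"},
    ],
}

# At execution time the caller passes values by argument name; the
# conductor resolves each argument as: interface value, else the
# job_configs entry at the same location, else the declared default.
execution_request = {
    "cluster_id": "<cluster-id>",              # placeholder ID
    "interface": {"Reducer Count": "2"},
    "job_configs": {"configs": {"mapred.map.tasks": "3"}},
}

# job_configs stored on the resulting job execution:
# {"configs": {"mapred.reduce.tasks": "2", "mapred.map.tasks": "3"},
#  "params": {"INPUT": "hdfs://path"}}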