[EDP] Unified Map to Define Job Interface
This change adds an "interface" map to the API for job creation, such that the operator registering a job can define a unified, human-readable way to pass in all arguments, parameters, and configurations that the execution of that job may require or accept. This will allow platform-agnostic wizarding at the job execution phase and allows users to document use of their own jobs once in a persistent, standardized format. Change-Id: I59b9b679a650361ddcd30975891496fdfbabb93c Partially Implements: blueprint unified-job-interface-map
This commit is contained in:
parent
853ce1187a
commit
b2c01f5d13
@ -116,7 +116,7 @@ def job_list():
|
||||
|
||||
@rest.post('/jobs')
|
||||
@acl.enforce("data-processing:jobs:create")
|
||||
@v.validate(v_j.JOB_SCHEMA, v_j.check_mains_libs)
|
||||
@v.validate(v_j.JOB_SCHEMA, v_j.check_mains_libs, v_j.check_interface)
|
||||
def job_create(data):
|
||||
return u.render(api.create_job(data).to_wrapped_dict())
|
||||
|
||||
|
@ -242,6 +242,7 @@ class JobExecution(object):
|
||||
oozie_job_id
|
||||
return_code
|
||||
job_configs
|
||||
interface
|
||||
extra
|
||||
data_source_urls
|
||||
"""
|
||||
@ -257,6 +258,7 @@ class Job(object):
|
||||
type
|
||||
mains
|
||||
libs
|
||||
interface
|
||||
"""
|
||||
|
||||
|
||||
|
@ -258,6 +258,7 @@ class JobExecution(Resource, objects.JobExecution):
|
||||
_filter_fields = ['extra']
|
||||
_sanitize_fields = {'job_configs': sanitize_job_configs,
|
||||
'info': sanitize_info}
|
||||
# TODO(egafford): Sanitize interface ("secret" bool field on job args?)
|
||||
|
||||
|
||||
class JobBinary(Resource, objects.JobBinary):
|
||||
|
@ -0,0 +1,67 @@
|
||||
# Copyright 2015 Red Hat, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""add_job_interface
|
||||
|
||||
Revision ID: 022
|
||||
Revises: 021
|
||||
Create Date: 2015-01-27 15:53:22.128263
|
||||
|
||||
"""
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = '022'
|
||||
down_revision = '021'
|
||||
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
MYSQL_ENGINE = 'InnoDB'
|
||||
MYSQL_CHARSET = 'utf8'
|
||||
|
||||
|
||||
def upgrade():
    """Create the job_interface_arguments table (migration 022).

    Each row describes one argument of a job's unified interface.  Rows
    belong to a job via a foreign key, and both an argument's name and its
    position ('order') must be unique within a single job.
    """
    op.create_table('job_interface_arguments',
                    sa.Column('created_at', sa.DateTime(),
                              nullable=True),
                    sa.Column('updated_at', sa.DateTime(),
                              nullable=True),
                    sa.Column('id', sa.String(length=36),
                              nullable=False),
                    sa.Column('job_id', sa.String(length=36),
                              nullable=False),
                    sa.Column('tenant_id', sa.String(length=36),
                              nullable=True),
                    sa.Column('name', sa.String(80),
                              nullable=False),
                    sa.Column('description', sa.Text()),
                    # One of "args", "configs", "params" (enforced by the
                    # validation schema, not by the database).
                    sa.Column('mapping_type', sa.String(80),
                              nullable=False),
                    sa.Column('location', sa.Text(),
                              nullable=False),
                    sa.Column('value_type', sa.String(80),
                              nullable=False),
                    sa.Column('required', sa.Boolean(),
                              nullable=False),
                    sa.Column('order', sa.SmallInteger(),
                              nullable=False),
                    sa.Column('default', sa.Text()),
                    sa.ForeignKeyConstraint(['job_id'],
                                            ['jobs.id']),
                    sa.PrimaryKeyConstraint('id'),
                    sa.UniqueConstraint('job_id', 'order'),
                    sa.UniqueConstraint('job_id', 'name'),
                    mysql_engine=MYSQL_ENGINE,
                    mysql_charset=MYSQL_CHARSET)
|
@ -720,13 +720,56 @@ def job_execution_count(context, **kwargs):
|
||||
return query.filter_by(**kwargs).first()[0]
|
||||
|
||||
|
||||
def _get_config_section(configs, mapping_type):
|
||||
if mapping_type not in configs:
|
||||
configs[mapping_type] = [] if mapping_type == "args" else {}
|
||||
return configs[mapping_type]
|
||||
|
||||
|
||||
def _merge_execution_interface(job_ex, job, execution_interface):
    """Merges the interface for a job execution with that of its job.

    Values supplied via the execution's interface map are folded into
    job_ex.job_configs at the location each interface argument declares.
    Precedence per argument: explicit interface value, then a value already
    present in job_configs at the same location, then the argument's
    default.  Positional ("args") values are ordered by their numeric
    location and prepended to any pre-existing args list.  Mutates job_ex
    (and its job_configs dict) in place.
    """
    configs = job_ex.job_configs or {}
    # Sentinel distinguishing "no value found" from legitimate falsy values.
    nonexistent = object()
    positional_args = {}
    for arg in job.interface:
        value = nonexistent
        typed_configs = _get_config_section(configs, arg.mapping_type)
        # Interface args are our first choice for the value.
        if arg.name in execution_interface:
            value = execution_interface[arg.name]
        else:
            # If a default exists, we can use that, but...
            if arg.default is not None:
                value = arg.default
            # We should prefer an argument passed through the
            # job_configs that maps to the same location.
            if arg.mapping_type != "args":
                value = typed_configs.get(arg.location, value)
        if value is not nonexistent:
            if arg.mapping_type != "args":
                typed_configs[arg.location] = value
            else:
                # Collected by integer position; validation guarantees an
                # unbroken 0..n-1 sequence of locations.
                positional_args[int(arg.location)] = value
    if positional_args:
        positional_args = [positional_args[i] for i
                           in range(len(positional_args))]
        # Interface-driven positional values go ahead of any args the
        # caller passed directly through job_configs.
        configs["args"] = positional_args + configs["args"]
    if configs and not job_ex.job_configs:
        job_ex.job_configs = configs
|
||||
|
||||
|
||||
def job_execution_create(context, values):
|
||||
session = get_session()
|
||||
execution_interface = values.pop('interface', {})
|
||||
job_ex = m.JobExecution()
|
||||
job_ex.update(values)
|
||||
|
||||
try:
|
||||
with session.begin():
|
||||
job_ex.interface = []
|
||||
job = _job_get(context, session, job_ex.job_id)
|
||||
if job.interface:
|
||||
_merge_execution_interface(job_ex, job, execution_interface)
|
||||
session.add(job_ex)
|
||||
except db_exc.DBDuplicateEntry as e:
|
||||
raise ex.DBDuplicateEntry(
|
||||
@ -784,23 +827,36 @@ def _append_job_binaries(context, session, from_list, to_list):
|
||||
to_list.append(job_binary)
|
||||
|
||||
|
||||
def _append_interface(context, from_list, to_list):
    """Build JobInterfaceArgument models from raw dicts and append them.

    Each dict in from_list is annotated in place with the current tenant
    and its positional order before being copied into a new model object.
    """
    tenant = context.tenant_id
    for position, raw_values in enumerate(from_list):
        raw_values['tenant_id'] = tenant
        raw_values['order'] = position
        model = m.JobInterfaceArgument()
        model.update(raw_values)
        to_list.append(model)
|
||||
|
||||
|
||||
def job_create(context, values):
|
||||
mains = values.pop("mains", [])
|
||||
libs = values.pop("libs", [])
|
||||
interface = values.pop("interface", [])
|
||||
|
||||
session = get_session()
|
||||
try:
|
||||
with session.begin():
|
||||
job = m.Job()
|
||||
job.update(values)
|
||||
# libs and mains are 'lazy' objects. The initialization below
|
||||
# is needed here because it provides libs and mains to be
|
||||
# initialized within a session even if the lists are empty
|
||||
# These are 'lazy' objects. The initialization below
|
||||
# is needed here because it provides libs, mains, and
|
||||
# interface to be initialized within a session even if
|
||||
# the lists are empty
|
||||
job.mains = []
|
||||
job.libs = []
|
||||
job.interface = []
|
||||
|
||||
_append_job_binaries(context, session, mains, job.mains)
|
||||
_append_job_binaries(context, session, libs, job.libs)
|
||||
_append_interface(context, interface, job.interface)
|
||||
|
||||
session.add(job)
|
||||
|
||||
|
@ -301,6 +301,7 @@ class JobExecution(mb.SaharaBase):
|
||||
extra = sa.Column(st.JsonDictType())
|
||||
data_source_urls = sa.Column(st.JsonDictType())
|
||||
|
||||
|
||||
mains_association = sa.Table("mains_association",
|
||||
mb.SaharaBase.metadata,
|
||||
sa.Column("Job_id",
|
||||
@ -344,13 +345,44 @@ class Job(mb.SaharaBase):
|
||||
libs = relationship("JobBinary",
|
||||
secondary=libs_association, lazy="joined")
|
||||
|
||||
interface = relationship('JobInterfaceArgument',
|
||||
cascade="all,delete",
|
||||
order_by="JobInterfaceArgument.order",
|
||||
backref='job',
|
||||
lazy='joined')
|
||||
|
||||
def to_dict(self):
    """Serialize the job, expanding mains, libs, and interface to dicts."""
    d = super(Job, self).to_dict()
    for attr in ('mains', 'libs', 'interface'):
        d[attr] = [item.to_dict() for item in getattr(self, attr)]
    return d
|
||||
|
||||
|
||||
class JobInterfaceArgument(mb.SaharaBase):
    """JobInterfaceArgument - Configuration setting for a specific job."""

    __tablename__ = 'job_interface_arguments'

    # An argument's name and its position must each be unique per job.
    __table_args__ = (
        sa.UniqueConstraint('job_id', 'name'),
        sa.UniqueConstraint('job_id', 'order')
    )

    id = _id_column()
    # Parent job owning this interface argument.
    job_id = sa.Column(sa.String(36), sa.ForeignKey('jobs.id'),
                       nullable=False)
    tenant_id = sa.Column(sa.String(36))
    # Human-readable argument name, unique within the job's interface.
    name = sa.Column(sa.String(80), nullable=False)
    description = sa.Column(sa.Text())
    # One of "args", "configs", or "params" (see the validation schema).
    mapping_type = sa.Column(sa.String(80), nullable=False)
    # Target key (or numeric position, for "args") within mapping_type.
    location = sa.Column(sa.Text(), nullable=False)
    # Declared data type: "string", "number", or "data_source".
    value_type = sa.Column(sa.String(80), nullable=False)
    required = sa.Column(sa.Boolean(), nullable=False)
    # Ordering of the argument within the interface listing.
    order = sa.Column(sa.SmallInteger(), nullable=False)
    # Optional default used when no value is supplied at execution time.
    default = sa.Column(sa.Text())
|
||||
|
||||
|
||||
class JobBinaryInternal(mb.SaharaBase):
|
||||
"""JobBinaryInternal - raw binary storage for executable jobs."""
|
||||
|
||||
|
@ -111,6 +111,7 @@ def execute_job(job_id, data):
|
||||
# Elements common to all job types
|
||||
cluster_id = data['cluster_id']
|
||||
configs = data.get('job_configs', {})
|
||||
interface = data.get('interface', {})
|
||||
|
||||
# Not in Java job types but present for all others
|
||||
input_id = data.get('input_id', None)
|
||||
@ -121,7 +122,8 @@ def execute_job(job_id, data):
|
||||
job_ex_dict = {'input_id': input_id, 'output_id': output_id,
|
||||
'job_id': job_id, 'cluster_id': cluster_id,
|
||||
'info': {'status': edp.JOB_STATUS_PENDING},
|
||||
'job_configs': configs, 'extra': {}}
|
||||
'job_configs': configs, 'extra': {},
|
||||
'interface': interface}
|
||||
job_execution = conductor.job_execution_create(context.ctx(), job_ex_dict)
|
||||
context.set_current_job_execution_id(job_execution.id)
|
||||
|
||||
|
@ -16,8 +16,10 @@
|
||||
import sahara.exceptions as e
|
||||
from sahara.i18n import _
|
||||
from sahara.service.edp import api
|
||||
from sahara.service.validations.edp import job_interface as j_i
|
||||
from sahara.utils import edp
|
||||
|
||||
|
||||
JOB_SCHEMA = {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
@ -52,7 +54,8 @@ JOB_SCHEMA = {
|
||||
},
|
||||
"streaming": {
|
||||
"type": "boolean"
|
||||
}
|
||||
},
|
||||
"interface": j_i.INTERFACE_ARGUMENT_SCHEMA
|
||||
},
|
||||
"additionalProperties": False,
|
||||
"required": [
|
||||
@ -104,3 +107,7 @@ def check_mains_libs(data, **kwargs):
|
||||
# Make sure that all referenced binaries exist
|
||||
_check_binaries(mains)
|
||||
_check_binaries(libs)
|
||||
|
||||
|
||||
def check_interface(data, **kwargs):
    """Validation hook for job creation: check the optional interface list."""
    j_i.check_job_interface(data, **kwargs)
|
||||
|
@ -19,6 +19,7 @@ from sahara import exceptions as ex
|
||||
from sahara.i18n import _
|
||||
from sahara.plugins import base as plugin_base
|
||||
import sahara.service.validations.edp.base as b
|
||||
import sahara.service.validations.edp.job_interface as j_i
|
||||
|
||||
JOB_EXEC_SCHEMA = {
|
||||
"type": "object",
|
||||
@ -35,6 +36,9 @@ JOB_EXEC_SCHEMA = {
|
||||
"type": "string",
|
||||
"format": "uuid",
|
||||
},
|
||||
"interface": {
|
||||
"type": "simple_config",
|
||||
},
|
||||
"job_configs": b.job_configs,
|
||||
},
|
||||
"additionalProperties": False,
|
||||
@ -92,6 +96,7 @@ def check_job_execution(data, job_id):
|
||||
"'%(job_type)s'") % {"cluster_id": cluster.id,
|
||||
"job_type": job.type})
|
||||
|
||||
j_i.check_execution_interface(data, job)
|
||||
edp_engine.validate_job_execution(cluster, job, data)
|
||||
|
||||
|
||||
|
204
sahara/service/validations/edp/job_interface.py
Normal file
204
sahara/service/validations/edp/job_interface.py
Normal file
@ -0,0 +1,204 @@
|
||||
# Copyright (c) 2015 Red Hat, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from oslo_utils import uuidutils
|
||||
import six
|
||||
from six.moves.urllib import parse as urlparse
|
||||
|
||||
import sahara.exceptions as e
|
||||
from sahara.i18n import _
|
||||
from sahara.service.validations.edp import base as b
|
||||
from sahara.utils import edp
|
||||
|
||||
|
||||
# Value types an interface argument may declare; each is validated by the
# matching _validate_* helper when a value is supplied.
DATA_TYPE_STRING = "string"
DATA_TYPE_NUMBER = "number"
DATA_TYPE_DATA_SOURCE = "data_source"

DATA_TYPES = [DATA_TYPE_STRING,
              DATA_TYPE_NUMBER,
              DATA_TYPE_DATA_SOURCE]
# Arguments that omit value_type fall back to plain strings.
DEFAULT_DATA_TYPE = DATA_TYPE_STRING


# JSON schema for the "interface" list supplied at job creation time.
INTERFACE_ARGUMENT_SCHEMA = {
    "type": ["array", "null"],
    "uniqueItems": True,
    "items": {
        "type": "object",
        "properties": {
            "name": {
                "type": "string",
                "minLength": 1
            },
            "description": {
                "type": ["string", "null"]
            },
            "mapping_type": {
                "type": "string",
                "enum": ["args", "configs", "params"]
            },
            "location": {
                "type": "string",
                "minLength": 1
            },
            "value_type": {
                "type": "string",
                "enum": DATA_TYPES,
                # Reference the shared constant instead of repeating the
                # literal "string", so the schema cannot drift from
                # DEFAULT_DATA_TYPE.
                "default": DEFAULT_DATA_TYPE
            },
            "required": {
                "type": "boolean"
            },
            "default": {
                "type": ["string", "null"]
            }
        },
        "additionalProperties": False,
        "required": ["name", "mapping_type", "location", "required"]
    }
}
|
||||
|
||||
|
||||
def _check_job_interface(data, interface):
    """Validate a non-empty interface definition for a job template.

    Raises InvalidDataException on any structural problem.  Also backfills
    the default value_type for arguments that omit it (mutates the
    interface dicts in place).
    """
    names = set(arg["name"] for arg in interface)
    if len(names) != len(interface):
        raise e.InvalidDataException(
            _("Name must be unique within the interface for any job."))

    # Each job type only supports a subset of mapping sections.
    mapping_types = set(arg["mapping_type"] for arg in interface)
    acceptable_types = edp.JOB_TYPES_ACCEPTABLE_CONFIGS[data["type"]]
    if any(m_type not in acceptable_types for m_type in mapping_types):
        args = {"mapping_types": str(list(acceptable_types)),
                "job_type": data["type"]}
        raise e.InvalidDataException(
            _("Only mapping types %(mapping_types)s are allowed for job type "
              "%(job_type)s.") % args)

    # Positional ("args") locations must form 0, 1, 2, ... with no gaps.
    positional_args = [arg for arg in interface
                       if arg["mapping_type"] == "args"]
    if not all(six.text_type(arg["location"]).isnumeric()
               for arg in positional_args):
        raise e.InvalidDataException(
            _("Locations of positional arguments must be an unbroken integer "
              "sequence ascending from 0."))
    locations = set(int(arg["location"]) for arg in positional_args)
    if not all(i in locations for i in range(len(locations))):
        raise e.InvalidDataException(
            _("Locations of positional arguments must be an unbroken integer "
              "sequence ascending from 0."))

    # Optional positional arguments need a default so the argument list can
    # always be fully populated at execution time.
    # NOTE(review): the truthiness test below also rejects an empty-string
    # default -- confirm that is intended.
    not_required = (arg for arg in positional_args if not arg["required"])
    if not all(arg.get("default", None) for arg in not_required):
        raise e.InvalidDataException(
            _("Positional arguments must be given default values if they are "
              "not required."))

    mappings = ((arg["mapping_type"], arg["location"]) for arg in interface)
    if len(set(mappings)) != len(interface):
        raise e.InvalidDataException(
            _("The combination of mapping type and location must be unique "
              "within the interface for any job."))

    # Backfill the default value_type, then check any declared defaults
    # actually conform to their (possibly backfilled) type.
    for arg in interface:
        if "value_type" not in arg:
            arg["value_type"] = DEFAULT_DATA_TYPE
        default = arg.get("default", None)
        if default is not None:
            _validate_value(arg["value_type"], default)
|
||||
|
||||
|
||||
def check_job_interface(data, **kwargs):
    """Validate the job's interface definition, when one was supplied.

    A missing, null, or empty interface is acceptable and checked no
    further; anything non-empty gets the full structural validation.
    """
    candidate = data.get("interface", [])
    if candidate:
        _check_job_interface(data, candidate)
|
||||
|
||||
|
||||
def _validate_data_source(value):
    """Accept either an existing data source ID or a URL with a scheme.

    Raises InvalidDataException when the value is neither.
    """
    if uuidutils.is_uuid_like(value):
        # Looks like an ID: it must reference a registered data source.
        b.check_data_source_exists(value)
    else:
        # Otherwise it must at least parse as a URL with a scheme.
        if not urlparse.urlparse(value).scheme:
            raise e.InvalidDataException(
                _("Data source value '%s' is neither a valid data source ID "
                  "nor a valid URL.") % value)
|
||||
|
||||
|
||||
def _validate_number(value):
    """Raise InvalidDataException unless value is numeric text.

    NOTE(review): str.isnumeric rejects negative numbers and decimal
    points -- confirm only non-negative integers are intended here.
    """
    if not six.text_type(value).isnumeric():
        raise e.InvalidDataException(
            _("Value '%s' is not a valid number.") % value)
|
||||
|
||||
|
||||
def _validate_string(value):
    """Raise InvalidDataException unless value is a string (per six)."""
    if not isinstance(value, six.string_types):
        raise e.InvalidDataException(
            _("Value '%s' is not a valid string.") % value)
|
||||
|
||||
|
||||
# Dispatch table: declared value_type -> validation function.
_value_type_validators = {
    DATA_TYPE_STRING: _validate_string,
    DATA_TYPE_NUMBER: _validate_number,
    DATA_TYPE_DATA_SOURCE: _validate_data_source
}


def _validate_value(type, value):
    """Validate value against the named value_type; raises on mismatch.

    Raises KeyError for an unknown type (callers only pass schema-checked
    types).  NOTE(review): the parameter name shadows the ``type`` builtin.
    """
    _value_type_validators[type](value)
|
||||
|
||||
|
||||
def check_execution_interface(data, job):
    """Validate the interface map supplied with a job execution request.

    Checks that the supplied names exist in the job's declared interface,
    that every required argument is present, that each value matches its
    declared type, and that no value is also passed through job_configs at
    the same mapping location with a conflicting value.
    """
    job_int = {arg.name: arg for arg in job.interface}
    execution_int = data.get("interface", None)

    if not (job_int or execution_int):
        return
    # A job with a declared interface demands an interface map (possibly
    # empty) on every execution.
    if job_int and execution_int is None:
        raise e.InvalidDataException(
            _("An interface was specified with the template for this job. "
              "Please pass an interface map with this job (even if empty)."))

    execution_names = set(execution_int.keys())

    definition_names = set(job_int.keys())
    not_found_names = execution_names - definition_names
    if not_found_names:
        raise e.InvalidDataException(
            _("Argument names: %s were not found in the interface for this "
              "job.") % str(list(not_found_names)))

    required_names = {arg.name for arg in job.interface if arg.required}
    unset_names = required_names - execution_names
    if unset_names:
        raise e.InvalidDataException(_("Argument names: %s are required for "
                                       "this job.") % str(list(unset_names)))

    # Sentinel distinguishing "absent" from legitimate falsy config values.
    nonexistent = object()
    for name, value in six.iteritems(execution_int):
        arg = job_int[name]
        _validate_value(arg.value_type, value)
        if arg.mapping_type == "args":
            # Positional arguments have no job_configs key to collide with.
            continue
        typed_configs = data.get("job_configs", {}).get(arg.mapping_type, {})
        config_value = typed_configs.get(arg.location, nonexistent)
        if config_value is not nonexistent and config_value != value:
            args = {"name": name,
                    "mapping_type": arg.mapping_type,
                    "location": arg.location}
            raise e.InvalidDataException(
                _("Argument '%(name)s' was passed both through the interface "
                  "and in location '%(mapping_type)s'.'%(location)s'. Please "
                  "pass this through either the interface or the "
                  "configuration maps, not both.") % args)
|
@ -27,7 +27,7 @@ from sahara.utils import edp
|
||||
|
||||
|
||||
SAMPLE_DATA_SOURCE = {
|
||||
"tenant_id": "test_tenant",
|
||||
"tenant_id": "tenant_1",
|
||||
"name": "ngt_test",
|
||||
"description": "test_desc",
|
||||
"type": "Cassandra",
|
||||
@ -39,7 +39,7 @@ SAMPLE_DATA_SOURCE = {
|
||||
}
|
||||
|
||||
SAMPLE_JOB = {
|
||||
"tenant_id": "test_tenant",
|
||||
"tenant_id": "tenant_1",
|
||||
"name": "job_test",
|
||||
"description": "test_desc",
|
||||
"type": edp.JOB_TYPE_PIG,
|
||||
@ -47,7 +47,7 @@ SAMPLE_JOB = {
|
||||
}
|
||||
|
||||
SAMPLE_JOB_EXECUTION = {
|
||||
"tenant_id": "tenant_id",
|
||||
"tenant_id": "tenant_1",
|
||||
"return_code": "1",
|
||||
"job_id": "undefined",
|
||||
"input_id": "undefined",
|
||||
@ -57,7 +57,7 @@ SAMPLE_JOB_EXECUTION = {
|
||||
}
|
||||
|
||||
SAMPLE_CONF_JOB_EXECUTION = {
|
||||
"tenant_id": "tenant_id",
|
||||
"tenant_id": "tenant_1",
|
||||
"progress": "0.1",
|
||||
"return_code": "1",
|
||||
"job_id": "undefined",
|
||||
|
115
sahara/tests/unit/conductor/manager/test_edp_interface.py
Normal file
115
sahara/tests/unit/conductor/manager/test_edp_interface.py
Normal file
@ -0,0 +1,115 @@
|
||||
# Copyright (c) 2015 Red Hat, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import copy
|
||||
|
||||
from sahara import context
|
||||
import sahara.tests.unit.conductor.base as test_base
|
||||
from sahara.tests.unit.conductor.manager import test_edp
|
||||
|
||||
|
||||
def _merge_dict(original, update):
|
||||
new = copy.deepcopy(original)
|
||||
new.update(update)
|
||||
return new
|
||||
|
||||
|
||||
# A job template whose interface covers all three mapping types, with two
# positional arguments deliberately declared out of location order.
SAMPLE_JOB = _merge_dict(test_edp.SAMPLE_JOB, {
    "interface": [
        {
            "name": "Reducer Count",
            "mapping_type": "configs",
            "location": "mapred.reduce.tasks",
            "value_type": "number",
            "required": True,
            "default": "1"
        },
        {
            "name": "Input Path",
            "mapping_type": "params",
            "location": "INPUT",
            "value_type": "data_source",
            "required": False,
            "default": "hdfs://path"
        },
        {
            "name": "Positional Argument 2",
            "mapping_type": "args",
            "location": "1",
            "value_type": "string",
            "required": False,
            "default": "default"
        },
        {
            "name": "Positional Argument 1",
            "mapping_type": "args",
            "location": "0",
            "value_type": "string",
            "required": False,
            "default": "arg_1"
        },

    ]
})

# NOTE(review): this merges onto test_edp.SAMPLE_JOB rather than
# test_edp.SAMPLE_JOB_EXECUTION -- possibly a typo; confirm which base
# dict a job execution payload should start from.
SAMPLE_JOB_EXECUTION = _merge_dict(test_edp.SAMPLE_JOB, {
    # Overrides "Reducer Count" (default "1") and supplies the second
    # positional argument; "Positional Argument 1" and "Input Path" are
    # expected to fall back to their defaults.
    "interface": {
        "Reducer Count": "2",
        "Positional Argument 2": "arg_2"
    },
    "job_configs": {"args": ["arg_3"], "configs": {"mapred.map.tasks": "3"}}
})
|
||||
|
||||
|
||||
class JobExecutionTest(test_base.ConductorManagerTestCase):
    def test_interface_flows(self):
        """End-to-end check of interface merging at execution creation.

        Creates a job with a full interface, then creates executions with
        and without explicit job_configs and verifies the merged configs.
        """
        ctx = context.ctx()
        job = self.api.job_create(ctx, SAMPLE_JOB)
        # Arguments must come back in declared (order-column) order.
        arg_names = [arg['name'] for arg in job['interface']]
        self.assertEqual(arg_names, ["Reducer Count", "Input Path",
                                     "Positional Argument 2",
                                     "Positional Argument 1"])

        job_ex_input = copy.deepcopy(SAMPLE_JOB_EXECUTION)
        job_ex_input['job_id'] = job['id']

        self.api.job_execution_create(ctx, job_ex_input)

        lst = self.api.job_execution_get_all(ctx)
        self.assertEqual(1, len(lst))

        # Interface values override defaults, pre-existing job_configs
        # entries survive, and interface-driven positional args land ahead
        # of the caller-supplied "arg_3".
        job_ex_result = lst[0]
        configs = {
            'configs': {'mapred.reduce.tasks': '2',
                        'mapred.map.tasks': '3'},
            'args': ['arg_1', 'arg_2', 'arg_3'],
            'params': {'INPUT': 'hdfs://path'}
        }
        self.assertEqual(configs, job_ex_result['job_configs'])
        self.api.job_execution_destroy(ctx, job_ex_result['id'])

        # Without explicit job_configs the merged result comes purely from
        # interface values plus declared defaults.
        del job_ex_input['job_configs']
        self.api.job_execution_create(ctx, job_ex_input)

        lst = self.api.job_execution_get_all(ctx)
        self.assertEqual(1, len(lst))

        job_ex_result = lst[0]
        configs = {
            'configs': {'mapred.reduce.tasks': '2'},
            'args': ['arg_1', 'arg_2'],
            'params': {'INPUT': 'hdfs://path'}
        }
        self.assertEqual(configs, job_ex_result['job_configs'])
|
@ -43,7 +43,7 @@ class SaharaMigrationsCheckers(object):
|
||||
t = db_utils.get_table(engine, table)
|
||||
self.assertIn(column, t.c)
|
||||
|
||||
def assertColumnsExist(self, engine, table, columns):
    """Assert that every column named in *columns* exists on *table*."""
    for column in columns:
        self.assertColumnExists(engine, table, column)
|
||||
|
||||
@ -90,7 +90,7 @@ class SaharaMigrationsCheckers(object):
|
||||
'data',
|
||||
'datasize'
|
||||
]
|
||||
self.assertColumnsExists(
|
||||
self.assertColumnsExist(
|
||||
engine, 'job_binary_internal', job_binary_internal_columns)
|
||||
self.assertColumnCount(
|
||||
engine, 'job_binary_internal', job_binary_internal_columns)
|
||||
@ -113,7 +113,7 @@ class SaharaMigrationsCheckers(object):
|
||||
'volume_mount_prefix',
|
||||
'floating_ip_pool'
|
||||
]
|
||||
self.assertColumnsExists(
|
||||
self.assertColumnsExist(
|
||||
engine, 'node_group_templates', node_group_templates_columns)
|
||||
self.assertColumnCount(
|
||||
engine, 'node_group_templates', node_group_templates_columns)
|
||||
@ -129,7 +129,7 @@ class SaharaMigrationsCheckers(object):
|
||||
'url',
|
||||
'credentials'
|
||||
]
|
||||
self.assertColumnsExists(
|
||||
self.assertColumnsExist(
|
||||
engine, 'data_sources', data_sources_columns)
|
||||
self.assertColumnCount(
|
||||
engine, 'data_sources', data_sources_columns)
|
||||
@ -148,7 +148,7 @@ class SaharaMigrationsCheckers(object):
|
||||
'plugin_name',
|
||||
'hadoop_version'
|
||||
]
|
||||
self.assertColumnsExists(
|
||||
self.assertColumnsExist(
|
||||
engine, 'cluster_templates', cluster_templates_columns)
|
||||
self.assertColumnCount(
|
||||
engine, 'cluster_templates', cluster_templates_columns)
|
||||
@ -163,7 +163,7 @@ class SaharaMigrationsCheckers(object):
|
||||
'url',
|
||||
'extra'
|
||||
]
|
||||
self.assertColumnsExists(
|
||||
self.assertColumnsExist(
|
||||
engine, 'job_binaries', job_binaries_columns)
|
||||
self.assertColumnCount(
|
||||
engine, 'job_binaries', job_binaries_columns)
|
||||
@ -177,7 +177,7 @@ class SaharaMigrationsCheckers(object):
|
||||
'description',
|
||||
'type'
|
||||
]
|
||||
self.assertColumnsExists(engine, 'jobs', jobs_columns)
|
||||
self.assertColumnsExist(engine, 'jobs', jobs_columns)
|
||||
self.assertColumnCount(engine, 'jobs', jobs_columns)
|
||||
|
||||
templates_relations_columns = [
|
||||
@ -198,7 +198,7 @@ class SaharaMigrationsCheckers(object):
|
||||
'node_group_template_id',
|
||||
'floating_ip_pool'
|
||||
]
|
||||
self.assertColumnsExists(
|
||||
self.assertColumnsExist(
|
||||
engine, 'templates_relations', templates_relations_columns)
|
||||
self.assertColumnCount(
|
||||
engine, 'templates_relations', templates_relations_columns)
|
||||
@ -207,7 +207,7 @@ class SaharaMigrationsCheckers(object):
|
||||
'Job_id',
|
||||
'JobBinary_id'
|
||||
]
|
||||
self.assertColumnsExists(
|
||||
self.assertColumnsExist(
|
||||
engine, 'mains_association', mains_association_columns)
|
||||
self.assertColumnCount(
|
||||
engine, 'mains_association', mains_association_columns)
|
||||
@ -216,7 +216,7 @@ class SaharaMigrationsCheckers(object):
|
||||
'Job_id',
|
||||
'JobBinary_id'
|
||||
]
|
||||
self.assertColumnsExists(
|
||||
self.assertColumnsExist(
|
||||
engine, 'libs_association', libs_association_columns)
|
||||
self.assertColumnCount(
|
||||
engine, 'libs_association', libs_association_columns)
|
||||
@ -245,7 +245,7 @@ class SaharaMigrationsCheckers(object):
|
||||
'extra',
|
||||
'cluster_template_id'
|
||||
]
|
||||
self.assertColumnsExists(engine, 'clusters', clusters_columns)
|
||||
self.assertColumnsExist(engine, 'clusters', clusters_columns)
|
||||
self.assertColumnCount(engine, 'clusters', clusters_columns)
|
||||
|
||||
node_groups_columns = [
|
||||
@ -267,7 +267,7 @@ class SaharaMigrationsCheckers(object):
|
||||
'node_group_template_id',
|
||||
'floating_ip_pool'
|
||||
]
|
||||
self.assertColumnsExists(engine, 'node_groups', node_groups_columns)
|
||||
self.assertColumnsExist(engine, 'node_groups', node_groups_columns)
|
||||
self.assertColumnCount(engine, 'node_groups', node_groups_columns)
|
||||
|
||||
job_executions_columns = [
|
||||
@ -288,7 +288,7 @@ class SaharaMigrationsCheckers(object):
|
||||
'job_configs',
|
||||
'extra'
|
||||
]
|
||||
self.assertColumnsExists(
|
||||
self.assertColumnsExist(
|
||||
engine, 'job_executions', job_executions_columns)
|
||||
self.assertColumnCount(
|
||||
engine, 'job_executions', job_executions_columns)
|
||||
@ -305,7 +305,7 @@ class SaharaMigrationsCheckers(object):
|
||||
'management_ip',
|
||||
'volumes'
|
||||
]
|
||||
self.assertColumnsExists(engine, 'instances', instances_columns)
|
||||
self.assertColumnsExist(engine, 'instances', instances_columns)
|
||||
self.assertColumnCount(engine, 'instances', instances_columns)
|
||||
|
||||
self._data_001(engine, data)
|
||||
@ -425,11 +425,11 @@ class SaharaMigrationsCheckers(object):
|
||||
|
||||
self.assertColumnCount(engine, 'cluster_provision_steps',
|
||||
provision_steps_columns)
|
||||
self.assertColumnsExists(engine, 'cluster_provision_steps',
|
||||
provision_steps_columns)
|
||||
self.assertColumnsExist(engine, 'cluster_provision_steps',
|
||||
provision_steps_columns)
|
||||
|
||||
self.assertColumnCount(engine, 'cluster_events', events_columns)
|
||||
self.assertColumnsExists(engine, 'cluster_events', events_columns)
|
||||
self.assertColumnsExist(engine, 'cluster_events', events_columns)
|
||||
|
||||
def _check_016(self, engine, data):
|
||||
self.assertColumnExists(engine, 'node_group_templates',
|
||||
@ -464,6 +464,26 @@ class SaharaMigrationsCheckers(object):
|
||||
def _check_021(self, engine, data):
|
||||
self.assertColumnExists(engine, 'job_executions', 'data_source_urls')
|
||||
|
||||
def _check_022(self, engine, data):
    """Migration 022 created job_interface_arguments with these columns."""
    columns = [
        'created_at',
        'updated_at',
        'id',
        'job_id',
        'tenant_id',
        'name',
        'description',
        'mapping_type',
        'location',
        'value_type',
        'required',
        'order',
        'default'
    ]

    # Both exact count and presence: no extra or missing columns.
    self.assertColumnCount(engine, 'job_interface_arguments', columns)
    self.assertColumnsExist(engine, 'job_interface_arguments', columns)
|
||||
|
||||
|
||||
class TestMigrationsMySQL(SaharaMigrationsCheckers,
|
||||
base.BaseWalkMigrationTestCase,
|
||||
|
@ -41,6 +41,7 @@ def _create_job(id, job_binary, type):
|
||||
job.id = id
|
||||
job.type = type
|
||||
job.name = 'special_name'
|
||||
job.interface = []
|
||||
if edp.compare_job_type(type, edp.JOB_TYPE_PIG, edp.JOB_TYPE_HIVE):
|
||||
job.mains = [job_binary]
|
||||
job.libs = None
|
||||
@ -91,6 +92,8 @@ def _create_job_exec(job_id, type, configs=None):
|
||||
j_exec.id = six.text_type(uuid.uuid4())
|
||||
j_exec.job_id = job_id
|
||||
j_exec.job_configs = configs
|
||||
if not j_exec.job_configs:
|
||||
j_exec.job_configs = {}
|
||||
if edp.compare_job_type(type, edp.JOB_TYPE_JAVA):
|
||||
j_exec.job_configs['configs']['edp.java.main_class'] = _java_main_class
|
||||
j_exec.job_configs['configs']['edp.java.java_opts'] = _java_opts
|
||||
@ -100,8 +103,6 @@ def _create_job_exec(job_id, type, configs=None):
|
||||
def _create_job_exec_with_proxy(job_id, type, configs=None):
|
||||
j_exec = _create_job_exec(job_id, type, configs)
|
||||
j_exec.id = '00000000-1111-2222-3333-4444444444444444'
|
||||
if not j_exec.job_configs:
|
||||
j_exec.job_configs = {}
|
||||
j_exec.job_configs['proxy_configs'] = {
|
||||
'proxy_username': 'job_' + j_exec.id,
|
||||
'proxy_password': '55555555-6666-7777-8888-999999999999',
|
||||
|
@ -46,7 +46,7 @@ class TestJobExecValidation(u.ValidationTestCase):
|
||||
@mock.patch('sahara.conductor.api.LocalApi.job_get')
|
||||
def test_streaming(self, get_job, get_data_source, get_cluster):
|
||||
get_job.return_value = mock.Mock(
|
||||
type=edp.JOB_TYPE_MAPREDUCE_STREAMING, libs=[])
|
||||
type=edp.JOB_TYPE_MAPREDUCE_STREAMING, libs=[], interface=[])
|
||||
|
||||
ds1_id = six.text_type(uuid.uuid4())
|
||||
ds2_id = six.text_type(uuid.uuid4())
|
||||
@ -95,7 +95,7 @@ class TestJobExecValidation(u.ValidationTestCase):
|
||||
@mock.patch('sahara.conductor.api.LocalApi.job_get')
|
||||
def test_data_sources_differ(self, get_job, get_data_source, get_cluster):
|
||||
get_job.return_value = mock.Mock(
|
||||
type=edp.JOB_TYPE_MAPREDUCE_STREAMING, libs=[])
|
||||
type=edp.JOB_TYPE_MAPREDUCE_STREAMING, libs=[], interface=[])
|
||||
|
||||
ds1_id = six.text_type(uuid.uuid4())
|
||||
ds2_id = six.text_type(uuid.uuid4())
|
||||
@ -147,7 +147,8 @@ class TestJobExecValidation(u.ValidationTestCase):
|
||||
@mock.patch('sahara.conductor.api.LocalApi.cluster_get')
|
||||
@mock.patch('sahara.conductor.api.LocalApi.job_get')
|
||||
def test_check_edp_no_oozie(self, get_job, get_cluster):
|
||||
get_job.return_value = mock.Mock(type=edp.JOB_TYPE_PIG, libs=[])
|
||||
get_job.return_value = mock.Mock(type=edp.JOB_TYPE_PIG, libs=[],
|
||||
interface=[])
|
||||
|
||||
ng = tu.make_ng_dict('master', 42, ['namenode'], 1,
|
||||
instances=[tu.make_inst_dict('id', 'name')])
|
||||
@ -173,7 +174,7 @@ class TestJobExecValidation(u.ValidationTestCase):
|
||||
|
||||
# Note that this means we cannot use assert_create_object_validation()
|
||||
# because it calls start_patch() and will override our setting
|
||||
job = mock.Mock(type=edp.JOB_TYPE_SPARK, mains=["main"])
|
||||
job = mock.Mock(type=edp.JOB_TYPE_SPARK, mains=["main"], interface=[])
|
||||
get_job.return_value = job
|
||||
ng = tu.make_ng_dict('master', 42, [], 1,
|
||||
instances=[tu.make_inst_dict('id', 'name')])
|
||||
@ -190,8 +191,8 @@ class TestJobExecValidation(u.ValidationTestCase):
|
||||
@mock.patch('sahara.conductor.api.LocalApi.cluster_get')
|
||||
@mock.patch('sahara.conductor.api.LocalApi.job_get')
|
||||
def test_edp_main_class_java(self, job_get, cluster_get):
|
||||
job_get.return_value = mock.Mock()
|
||||
job_get.return_value.type = edp.JOB_TYPE_JAVA
|
||||
job_get.return_value = mock.Mock(type=edp.JOB_TYPE_JAVA,
|
||||
interface=[])
|
||||
ng = tu.make_ng_dict('master', 42, ['namenode', 'oozie'], 1,
|
||||
instances=[tu.make_inst_dict('id', 'name')])
|
||||
cluster_get.return_value = tu.create_cluster("cluster", "tenant1",
|
||||
@ -221,7 +222,8 @@ class TestJobExecValidation(u.ValidationTestCase):
|
||||
@mock.patch('sahara.conductor.api.LocalApi.cluster_get')
|
||||
@mock.patch('sahara.conductor.api.LocalApi.job_get')
|
||||
def test_edp_main_class_spark(self, job_get, cluster_get):
|
||||
job_get.return_value = mock.Mock(type=edp.JOB_TYPE_SPARK)
|
||||
job_get.return_value = mock.Mock(type=edp.JOB_TYPE_SPARK,
|
||||
interface=[])
|
||||
ng = tu.make_ng_dict('master', 42, ['namenode'], 1,
|
||||
instances=[tu.make_inst_dict('id', 'name')])
|
||||
cluster_get.return_value = tu.create_cluster("cluster", "tenant1",
|
||||
|
299
sahara/tests/unit/service/validation/edp/test_job_interface.py
Normal file
299
sahara/tests/unit/service/validation/edp/test_job_interface.py
Normal file
@ -0,0 +1,299 @@
|
||||
# Copyright (c) 2015 Red Hat, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import collections
|
||||
import itertools
|
||||
|
||||
import mock
|
||||
|
||||
from sahara.service.validations.edp import job as j
|
||||
from sahara.service.validations.edp import job_execution as j_e
|
||||
from sahara.service.validations.edp import job_interface as j_i
|
||||
from sahara.tests.unit.service.validation import utils as u
|
||||
from sahara.utils import edp
|
||||
|
||||
|
||||
def _configs(**kwargs):
|
||||
arg = {
|
||||
"name": "Reducer Count",
|
||||
"mapping_type": "configs",
|
||||
"location": "mapred.reduce.tasks",
|
||||
"value_type": "number",
|
||||
"required": True,
|
||||
"default": "1"
|
||||
}
|
||||
arg.update(kwargs)
|
||||
return arg
|
||||
|
||||
|
||||
def _params(**kwargs):
|
||||
arg = {
|
||||
"name": "Input Path",
|
||||
"mapping_type": "params",
|
||||
"location": "INPUT",
|
||||
"value_type": "data_source",
|
||||
"required": False,
|
||||
"default": "hdfs://path"
|
||||
}
|
||||
arg.update(kwargs)
|
||||
return arg
|
||||
|
||||
|
||||
def _args(**kwargs):
|
||||
arg = {
|
||||
"name": "Positional Argument",
|
||||
"mapping_type": "args",
|
||||
"location": "0",
|
||||
"value_type": "string",
|
||||
"required": False,
|
||||
"default": "arg_value"
|
||||
}
|
||||
arg.update(kwargs)
|
||||
return arg
|
||||
|
||||
|
||||
_mapping_types = {"configs", "args", "params"}
|
||||
|
||||
|
||||
def _job(job_type, interface):
|
||||
return {"name": "job", "type": job_type, "interface": interface}
|
||||
|
||||
|
||||
_job_types = [
|
||||
_job(edp.JOB_TYPE_HIVE, [_configs(), _params()]),
|
||||
_job(edp.JOB_TYPE_PIG, [_configs(), _params(), _args()]),
|
||||
_job(edp.JOB_TYPE_MAPREDUCE, [_configs()]),
|
||||
_job(edp.JOB_TYPE_MAPREDUCE_STREAMING, [_configs()]),
|
||||
_job(edp.JOB_TYPE_JAVA, [_configs(), _args()]),
|
||||
_job(edp.JOB_TYPE_SHELL, [_configs(), _params(), _args()]),
|
||||
_job(edp.JOB_TYPE_SPARK, [_configs(), _args()]),
|
||||
_job(edp.JOB_TYPE_STORM, [_args()])
|
||||
]
|
||||
|
||||
|
||||
class TestJobInterfaceValidation(u.ValidationTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestJobInterfaceValidation, self).setUp()
|
||||
self._create_object_fun = j.check_interface
|
||||
self.scheme = j.JOB_SCHEMA
|
||||
|
||||
def test_interface(self):
|
||||
for job in _job_types:
|
||||
self._assert_create_object_validation(data=job)
|
||||
|
||||
def test_no_interface(self):
|
||||
job = _job(edp.JOB_TYPE_PIG, None)
|
||||
self._assert_create_object_validation(data=job)
|
||||
|
||||
def test_overlapping_names(self):
|
||||
job = _job(edp.JOB_TYPE_PIG,
|
||||
[_configs(), _configs(location="mapred.map.tasks")])
|
||||
self._assert_create_object_validation(
|
||||
data=job, bad_req_i=(
|
||||
1, "INVALID_DATA",
|
||||
"Name must be unique within the interface for any job."))
|
||||
|
||||
def test_unacceptable_types(self):
|
||||
for job in _job_types:
|
||||
acceptable_types = [arg['mapping_type']
|
||||
for arg in job['interface']]
|
||||
unacceptable_types = _mapping_types - set(acceptable_types)
|
||||
unacceptable_args = (globals()["_" + m_type]()
|
||||
for m_type in unacceptable_types)
|
||||
bad_job = job.copy()
|
||||
for arg in unacceptable_args:
|
||||
bad_job['interface'] = [arg]
|
||||
permutations = itertools.permutations(acceptable_types)
|
||||
msgs = ["Only mapping types %s are allowed for job type %s." %
|
||||
(list(permutation), bad_job['type'])
|
||||
for permutation in permutations]
|
||||
self._assert_create_object_validation(
|
||||
data=bad_job, bad_req_i=(1, "INVALID_DATA", msgs))
|
||||
|
||||
def test_bad_positional_arg_locations(self):
|
||||
job = _job(edp.JOB_TYPE_PIG, [_args(location="1")])
|
||||
self._assert_create_object_validation(
|
||||
data=job, bad_req_i=(
|
||||
1, "INVALID_DATA",
|
||||
"Locations of positional arguments must be an unbroken "
|
||||
"integer sequence ascending from 0."))
|
||||
|
||||
job = _job(edp.JOB_TYPE_PIG, [_args(location="fish")])
|
||||
self._assert_create_object_validation(
|
||||
data=job, bad_req_i=(
|
||||
1, "INVALID_DATA",
|
||||
"Locations of positional arguments must be an unbroken "
|
||||
"integer sequence ascending from 0."))
|
||||
|
||||
job = _job(edp.JOB_TYPE_PIG,
|
||||
[_args(), _args(location="2", name="Argument 2")])
|
||||
self._assert_create_object_validation(
|
||||
data=job, bad_req_i=(
|
||||
1, "INVALID_DATA",
|
||||
"Locations of positional arguments must be an unbroken "
|
||||
"integer sequence ascending from 0."))
|
||||
|
||||
def test_required_positional_arg_without_default(self):
|
||||
arg = _args(required=False)
|
||||
del arg['default']
|
||||
job = _job(edp.JOB_TYPE_PIG, [arg])
|
||||
self._assert_create_object_validation(
|
||||
data=job, bad_req_i=(
|
||||
1, "INVALID_DATA",
|
||||
"Positional arguments must be given default values if they "
|
||||
"are not required."))
|
||||
|
||||
def test_overlapping_locations(self):
|
||||
job = _job(edp.JOB_TYPE_PIG,
|
||||
[_configs(), _configs(name="Mapper Count")])
|
||||
self._assert_create_object_validation(
|
||||
data=job, bad_req_i=(
|
||||
1, "INVALID_DATA",
|
||||
"The combination of mapping type and location must be unique "
|
||||
"within the interface for any job."))
|
||||
|
||||
def test_number_values(self):
|
||||
job = _job(edp.JOB_TYPE_PIG, [_configs(default="fish")])
|
||||
self._assert_create_object_validation(
|
||||
data=job, bad_req_i=(
|
||||
1, "INVALID_DATA",
|
||||
"Value 'fish' is not a valid number."))
|
||||
|
||||
@mock.patch('sahara.conductor.API.data_source_get')
|
||||
def test_data_source_values(self, data_source):
|
||||
data_source.return_value = True
|
||||
job = _job(edp.JOB_TYPE_PIG,
|
||||
[_params(default="DEADBEEF-DEAD-BEEF-DEAD-BEEFDEADBEEF")])
|
||||
self._assert_create_object_validation(data=job)
|
||||
|
||||
data_source.return_value = False
|
||||
self._assert_create_object_validation(
|
||||
data=job, bad_req_i=(
|
||||
1, "INVALID_REFERENCE",
|
||||
"DataSource with id 'DEADBEEF-DEAD-BEEF-DEAD-BEEFDEADBEEF' "
|
||||
"doesn't exist"))
|
||||
|
||||
job = _job(edp.JOB_TYPE_PIG, [_params(default="Filial Piety")])
|
||||
self._assert_create_object_validation(
|
||||
data=job, bad_req_i=(
|
||||
1, "INVALID_DATA",
|
||||
"Data source value 'Filial Piety' is neither a valid data "
|
||||
"source ID nor a valid URL."))
|
||||
|
||||
def test_default_data_type(self):
|
||||
param = _params()
|
||||
del param['value_type']
|
||||
job = _job(edp.JOB_TYPE_PIG, [param])
|
||||
self._assert_create_object_validation(data=job)
|
||||
assert param['value_type'] == j_i.DEFAULT_DATA_TYPE
|
||||
|
||||
int_arg = collections.namedtuple("int_arg",
|
||||
["name", "mapping_type", "location",
|
||||
"value_type", "required", "default"])
|
||||
|
||||
|
||||
def j_e_i_wrapper(data):
|
||||
job = mock.Mock(
|
||||
interface=[int_arg(**_configs()),
|
||||
int_arg(**_args()),
|
||||
int_arg(**_params())]
|
||||
)
|
||||
j_i.check_execution_interface(data, job)
|
||||
|
||||
|
||||
class TestJobExecutionInterfaceValidation(u.ValidationTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestJobExecutionInterfaceValidation, self).setUp()
|
||||
self._create_object_fun = j_e_i_wrapper
|
||||
self.scheme = j_e.JOB_EXEC_SCHEMA
|
||||
|
||||
def test_valid_execution(self):
|
||||
data = {"cluster_id": "DEADBEEF-DEAD-BEEF-DEAD-BEEFDEADBEEF",
|
||||
"interface": {"Reducer Count": "2",
|
||||
"Input Path": "swift://path",
|
||||
"Positional Argument": "value"}}
|
||||
self._assert_create_object_validation(data=data)
|
||||
|
||||
def test_no_execution_interface(self):
|
||||
data = {"cluster_id": "DEADBEEF-DEAD-BEEF-DEAD-BEEFDEADBEEF"}
|
||||
self._assert_create_object_validation(data=data, bad_req_i=(
|
||||
1, "INVALID_DATA",
|
||||
"An interface was specified with the template for this job. "
|
||||
"Please pass an interface map with this job (even if empty)."))
|
||||
|
||||
def test_bad_argument_name(self):
|
||||
data = {"cluster_id": "DEADBEEF-DEAD-BEEF-DEAD-BEEFDEADBEEF",
|
||||
"interface": {"Fish": "Rainbow Trout",
|
||||
"Reducer Count": "2",
|
||||
"Input Path": "swift://path"}}
|
||||
self._assert_create_object_validation(data=data, bad_req_i=(
|
||||
1, "INVALID_DATA", "Argument names: ['Fish'] were not found in "
|
||||
"the interface for this job."))
|
||||
|
||||
def test_required_argument_missing(self):
|
||||
data = {"cluster_id": "DEADBEEF-DEAD-BEEF-DEAD-BEEFDEADBEEF",
|
||||
"interface": {"Positional Argument": "Value"}}
|
||||
self._assert_create_object_validation(data=data, bad_req_i=(
|
||||
1, "INVALID_DATA", "Argument names: ['Reducer Count'] are "
|
||||
"required for this job."))
|
||||
|
||||
@mock.patch('sahara.conductor.API.data_source_get')
|
||||
def test_bad_values(self, data_source):
|
||||
data = {"cluster_id": "DEADBEEF-DEAD-BEEF-DEAD-BEEFDEADBEEF",
|
||||
"interface": {"Reducer Count": "Two"}}
|
||||
self._assert_create_object_validation(data=data, bad_req_i=(
|
||||
1, "INVALID_DATA", "Value 'Two' is not a valid number."))
|
||||
|
||||
data = {"cluster_id": "DEADBEEF-DEAD-BEEF-DEAD-BEEFDEADBEEF",
|
||||
"interface": {"Reducer Count": "2",
|
||||
"Input Path": "not_a_url"}}
|
||||
self._assert_create_object_validation(data=data, bad_req_i=(
|
||||
1, "INVALID_DATA", "Data source value 'not_a_url' is neither a "
|
||||
"valid data source ID nor a valid URL."))
|
||||
|
||||
data = {"cluster_id": "DEADBEEF-DEAD-BEEF-DEAD-BEEFDEADBEEF",
|
||||
"interface": {"Reducer Count": "2",
|
||||
"Positional Argument": 2}}
|
||||
self._assert_create_object_validation(data=data, bad_req_i=(
|
||||
1, "INVALID_DATA", "Value '2' is not a valid string."))
|
||||
|
||||
data_source.return_value = False
|
||||
data = {"cluster_id": "DEADBEEF-DEAD-BEEF-DEAD-BEEFDEADBEEF",
|
||||
"interface":
|
||||
{"Reducer Count": "2",
|
||||
"Input Path": "DEADBEEF-DEAD-BEEF-DEAD-BEEFDEADBEEF"}}
|
||||
self._assert_create_object_validation(
|
||||
data=data, bad_req_i=(
|
||||
1, "INVALID_REFERENCE",
|
||||
"DataSource with id 'DEADBEEF-DEAD-BEEF-DEAD-BEEFDEADBEEF' "
|
||||
"doesn't exist"))
|
||||
|
||||
def test_overlapping_data(self):
|
||||
data = {"cluster_id": "DEADBEEF-DEAD-BEEF-DEAD-BEEFDEADBEEF",
|
||||
"interface": {"Reducer Count": "2"},
|
||||
"job_configs": {"configs": {"mapred.reduce.tasks": "2"}}}
|
||||
self._assert_create_object_validation(data=data)
|
||||
|
||||
data = {"cluster_id": "DEADBEEF-DEAD-BEEF-DEAD-BEEFDEADBEEF",
|
||||
"interface": {"Reducer Count": "2"},
|
||||
"job_configs": {"configs": {"mapred.reduce.tasks": "3"}}}
|
||||
self._assert_create_object_validation(data=data, bad_req_i=(
|
||||
1, "INVALID_DATA",
|
||||
"Argument 'Reducer Count' was passed both through the interface "
|
||||
"and in location 'configs'.'mapred.reduce.tasks'. Please pass "
|
||||
"this through either the interface or the configuration maps, "
|
||||
"not both."))
|
@ -252,7 +252,8 @@ class ValidationTestCase(base.SaharaTestCase):
|
||||
|
||||
def _assert_calls(self, mock, call_info):
|
||||
if not call_info:
|
||||
self.assertEqual(0, mock.call_count)
|
||||
self.assertEqual(0, mock.call_count, "Unexpected call to %s: %s"
|
||||
% (mock.name, str(mock.call_args)))
|
||||
else:
|
||||
self.assertEqual(call_info[0], mock.call_count)
|
||||
self.assertEqual(call_info[1], mock.call_args[0][0].code)
|
||||
|
@ -62,6 +62,17 @@ JOB_TYPES_ALL = [
|
||||
JOB_TYPE_STORM
|
||||
]
|
||||
|
||||
JOB_TYPES_ACCEPTABLE_CONFIGS = {
|
||||
JOB_TYPE_HIVE: {"configs", "params"},
|
||||
JOB_TYPE_PIG: {"configs", "params", "args"},
|
||||
JOB_TYPE_MAPREDUCE: {"configs"},
|
||||
JOB_TYPE_MAPREDUCE_STREAMING: {"configs"},
|
||||
JOB_TYPE_JAVA: {"configs", "args"},
|
||||
JOB_TYPE_SHELL: {"configs", "params", "args"},
|
||||
JOB_TYPE_SPARK: {"configs", "args"},
|
||||
JOB_TYPE_STORM: {"args"}
|
||||
}
|
||||
|
||||
ADAPT_FOR_OOZIE = 'edp.java.adapt_for_oozie'
|
||||
|
||||
ADAPT_SPARK_FOR_SWIFT = 'edp.spark.adapt_for_swift'
|
||||
|
Loading…
Reference in New Issue
Block a user