Merge "[EDP] Unified Map to Define Job Interface"

Jenkins 2015-07-07 12:04:05 +00:00 committed by Gerrit Code Review
commit c6dc37c3fb
18 changed files with 862 additions and 37 deletions

View File

@ -127,7 +127,7 @@ def job_list():
@rest.post('/jobs')
@acl.enforce("data-processing:jobs:create")
@v.validate(v_j.JOB_SCHEMA, v_j.check_mains_libs)
@v.validate(v_j.JOB_SCHEMA, v_j.check_mains_libs, v_j.check_interface)
def job_create(data):
return u.render(api.create_job(data).to_wrapped_dict())

View File

@ -242,6 +242,7 @@ class JobExecution(object):
oozie_job_id
return_code
job_configs
interface
extra
data_source_urls
"""
@ -257,6 +258,7 @@ class Job(object):
type
mains
libs
interface
"""

View File

@ -259,6 +259,7 @@ class JobExecution(Resource, objects.JobExecution):
_filter_fields = ['extra']
_sanitize_fields = {'job_configs': sanitize_job_configs,
'info': sanitize_info}
# TODO(egafford): Sanitize interface ("secret" bool field on job args?)
class JobBinary(Resource, objects.JobBinary):

View File

@ -0,0 +1,67 @@
# Copyright 2015 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""add_job_interface
Revision ID: 022
Revises: 021
Create Date: 2015-01-27 15:53:22.128263
"""
# revision identifiers, used by Alembic.
revision = '022'
down_revision = '021'
from alembic import op
import sqlalchemy as sa
MYSQL_ENGINE = 'InnoDB'
MYSQL_CHARSET = 'utf8'
def upgrade():
op.create_table('job_interface_arguments',
sa.Column('created_at', sa.DateTime(),
nullable=True),
sa.Column('updated_at', sa.DateTime(),
nullable=True),
sa.Column('id', sa.String(length=36),
nullable=False),
sa.Column('job_id', sa.String(length=36),
nullable=False),
sa.Column('tenant_id', sa.String(length=36),
nullable=True),
sa.Column('name', sa.String(80),
nullable=False),
sa.Column('description', sa.Text()),
sa.Column('mapping_type', sa.String(80),
nullable=False),
sa.Column('location', sa.Text(),
nullable=False),
sa.Column('value_type', sa.String(80),
nullable=False),
sa.Column('required', sa.Boolean(),
nullable=False),
sa.Column('order', sa.SmallInteger(),
nullable=False),
sa.Column('default', sa.Text()),
sa.ForeignKeyConstraint(['job_id'],
['jobs.id']),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('job_id', 'order'),
sa.UniqueConstraint('job_id', 'name'),
mysql_engine=MYSQL_ENGINE,
mysql_charset=MYSQL_CHARSET)

View File

@ -747,13 +747,56 @@ def job_execution_count(context, **kwargs):
return query.filter_by(**kwargs).first()[0]
def _get_config_section(configs, mapping_type):
if mapping_type not in configs:
configs[mapping_type] = [] if mapping_type == "args" else {}
return configs[mapping_type]
def _merge_execution_interface(job_ex, job, execution_interface):
"""Merges the interface for a job execution with that of its job."""
configs = job_ex.job_configs or {}
nonexistent = object()
positional_args = {}
for arg in job.interface:
value = nonexistent
typed_configs = _get_config_section(configs, arg.mapping_type)
# Interface args are our first choice for the value.
if arg.name in execution_interface:
value = execution_interface[arg.name]
else:
# If a default exists, we can use that, but...
if arg.default is not None:
value = arg.default
# We should prefer an argument passed through the
# job_configs that maps to the same location.
if arg.mapping_type != "args":
value = typed_configs.get(arg.location, value)
if value is not nonexistent:
if arg.mapping_type != "args":
typed_configs[arg.location] = value
else:
positional_args[int(arg.location)] = value
if positional_args:
positional_args = [positional_args[i] for i
in range(len(positional_args))]
configs["args"] = positional_args + configs["args"]
if configs and not job_ex.job_configs:
job_ex.job_configs = configs
def job_execution_create(context, values):
session = get_session()
execution_interface = values.pop('interface', {})
job_ex = m.JobExecution()
job_ex.update(values)
try:
with session.begin():
job_ex.interface = []
job = _job_get(context, session, job_ex.job_id)
if job.interface:
_merge_execution_interface(job_ex, job, execution_interface)
session.add(job_ex)
except db_exc.DBDuplicateEntry as e:
raise ex.DBDuplicateEntry(
@ -811,23 +854,36 @@ def _append_job_binaries(context, session, from_list, to_list):
to_list.append(job_binary)
def _append_interface(context, from_list, to_list):
for order, argument_values in enumerate(from_list):
argument_values['tenant_id'] = context.tenant_id
argument_values['order'] = order
argument = m.JobInterfaceArgument()
argument.update(argument_values)
to_list.append(argument)
def job_create(context, values):
mains = values.pop("mains", [])
libs = values.pop("libs", [])
interface = values.pop("interface", [])
session = get_session()
try:
with session.begin():
job = m.Job()
job.update(values)
# libs and mains are 'lazy' objects. The initialization below
# is needed here because it provides libs and mains to be
# initialized within a session even if the lists are empty
# These are 'lazy' objects. The initialization below
# is needed here because it ensures that libs, mains, and
# interface are initialized within a session even if
# the lists are empty
job.mains = []
job.libs = []
job.interface = []
_append_job_binaries(context, session, mains, job.mains)
_append_job_binaries(context, session, libs, job.libs)
_append_interface(context, interface, job.interface)
session.add(job)
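
A hand-worked sketch (not part of the commit) of the precedence _merge_execution_interface implements above: a value passed through the execution interface wins; otherwise, for non-positional arguments, a value already present in job_configs at the mapped location is kept; otherwise the interface default is used. Positional ("args") values are prepended, in location order, to any args already in job_configs. The arguments are shown as plain dicts rather than model objects, and the names and values are hypothetical; the conductor unit test later in this change exercises the same flow.

# Hypothetical inputs: one config-mapped argument and one positional argument.
job_interface = [
    {"name": "Reducer Count", "mapping_type": "configs",
     "location": "mapred.reduce.tasks", "default": "1"},
    {"name": "Positional Argument", "mapping_type": "args",
     "location": "0", "default": "arg_default"},
]
execution_interface = {"Reducer Count": "2"}
job_configs = {"args": ["trailing_arg"]}

# Expected merged job_configs, hand-worked from the logic above:
#   {"configs": {"mapred.reduce.tasks": "2"},
#    "args": ["arg_default", "trailing_arg"]}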

View File

@ -301,6 +301,7 @@ class JobExecution(mb.SaharaBase):
extra = sa.Column(st.JsonDictType())
data_source_urls = sa.Column(st.JsonDictType())
mains_association = sa.Table("mains_association",
mb.SaharaBase.metadata,
sa.Column("Job_id",
@ -344,13 +345,44 @@ class Job(mb.SaharaBase):
libs = relationship("JobBinary",
secondary=libs_association, lazy="joined")
interface = relationship('JobInterfaceArgument',
cascade="all,delete",
order_by="JobInterfaceArgument.order",
backref='job',
lazy='joined')
def to_dict(self):
d = super(Job, self).to_dict()
d['mains'] = [jb.to_dict() for jb in self.mains]
d['libs'] = [jb.to_dict() for jb in self.libs]
d['interface'] = [arg.to_dict() for arg in self.interface]
return d
class JobInterfaceArgument(mb.SaharaBase):
"""JobInterfaceArgument - Configuration setting for a specific job."""
__tablename__ = 'job_interface_arguments'
__table_args__ = (
sa.UniqueConstraint('job_id', 'name'),
sa.UniqueConstraint('job_id', 'order')
)
id = _id_column()
job_id = sa.Column(sa.String(36), sa.ForeignKey('jobs.id'),
nullable=False)
tenant_id = sa.Column(sa.String(36))
name = sa.Column(sa.String(80), nullable=False)
description = sa.Column(sa.Text())
mapping_type = sa.Column(sa.String(80), nullable=False)
location = sa.Column(sa.Text(), nullable=False)
value_type = sa.Column(sa.String(80), nullable=False)
required = sa.Column(sa.Boolean(), nullable=False)
order = sa.Column(sa.SmallInteger(), nullable=False)
default = sa.Column(sa.Text())
class JobBinaryInternal(mb.SaharaBase):
"""JobBinaryInternal - raw binary storage for executable jobs."""

View File

@ -111,6 +111,7 @@ def execute_job(job_id, data):
# Elements common to all job types
cluster_id = data['cluster_id']
configs = data.get('job_configs', {})
interface = data.get('interface', {})
# Not in Java job types but present for all others
input_id = data.get('input_id', None)
@ -121,7 +122,8 @@ def execute_job(job_id, data):
job_ex_dict = {'input_id': input_id, 'output_id': output_id,
'job_id': job_id, 'cluster_id': cluster_id,
'info': {'status': edp.JOB_STATUS_PENDING},
'job_configs': configs, 'extra': {}}
'job_configs': configs, 'extra': {},
'interface': interface}
job_execution = conductor.job_execution_create(context.ctx(), job_ex_dict)
context.set_current_job_execution_id(job_execution.id)

View File

@ -16,8 +16,10 @@
import sahara.exceptions as e
from sahara.i18n import _
from sahara.service.edp import api
from sahara.service.validations.edp import job_interface as j_i
from sahara.utils import edp
JOB_SCHEMA = {
"type": "object",
"properties": {
@ -52,7 +54,8 @@ JOB_SCHEMA = {
},
"streaming": {
"type": "boolean"
}
},
"interface": j_i.INTERFACE_ARGUMENT_SCHEMA
},
"additionalProperties": False,
"required": [
@ -104,3 +107,7 @@ def check_mains_libs(data, **kwargs):
# Make sure that all referenced binaries exist
_check_binaries(mains)
_check_binaries(libs)
def check_interface(data, **kwargs):
j_i.check_job_interface(data, **kwargs)
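
For illustration only (not part of this commit): a minimal sketch of a job-creation body that the new "interface" property and check_interface hook would validate, assuming a Pig job; the argument name, location, and default are hypothetical. The INTERFACE_ARGUMENT_SCHEMA it must satisfy appears in the new job_interface module later in this change.

job_data = {
    "name": "interface-example",
    "type": "Pig",
    "mains": [],
    "libs": [],
    "interface": [
        {
            "name": "Reducer Count",            # must be unique per job
            "mapping_type": "configs",          # "args", "configs", or "params"
            "location": "mapred.reduce.tasks",  # where the value is written
            "value_type": "number",             # string, number, or data_source
            "required": False,
            "default": "1",
        },
    ],
}
# check_interface(job_data) delegates to job_interface.check_job_interface,
# which enforces the schema plus the uniqueness and mapping-type rules.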

View File

@ -19,6 +19,7 @@ from sahara import exceptions as ex
from sahara.i18n import _
from sahara.plugins import base as plugin_base
import sahara.service.validations.edp.base as b
import sahara.service.validations.edp.job_interface as j_i
JOB_EXEC_SCHEMA = {
"type": "object",
@ -35,6 +36,9 @@ JOB_EXEC_SCHEMA = {
"type": "string",
"format": "uuid",
},
"interface": {
"type": "simple_config",
},
"job_configs": b.job_configs,
},
"additionalProperties": False,
@ -92,6 +96,7 @@ def check_job_execution(data, job_id):
"'%(job_type)s'") % {"cluster_id": cluster.id,
"job_type": job.type})
j_i.check_execution_interface(data, job)
edp_engine.validate_job_execution(cluster, job, data)

View File

@ -0,0 +1,204 @@
# Copyright (c) 2015 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_utils import uuidutils
import six
from six.moves.urllib import parse as urlparse
import sahara.exceptions as e
from sahara.i18n import _
from sahara.service.validations.edp import base as b
from sahara.utils import edp
DATA_TYPE_STRING = "string"
DATA_TYPE_NUMBER = "number"
DATA_TYPE_DATA_SOURCE = "data_source"
DATA_TYPES = [DATA_TYPE_STRING,
DATA_TYPE_NUMBER,
DATA_TYPE_DATA_SOURCE]
DEFAULT_DATA_TYPE = DATA_TYPE_STRING
INTERFACE_ARGUMENT_SCHEMA = {
"type": ["array", "null"],
"uniqueItems": True,
"items": {
"type": "object",
"properties": {
"name": {
"type": "string",
"minLength": 1
},
"description": {
"type": ["string", "null"]
},
"mapping_type": {
"type": "string",
"enum": ["args", "configs", "params"]
},
"location": {
"type": "string",
"minLength": 1
},
"value_type": {
"type": "string",
"enum": DATA_TYPES,
"default": "string"
},
"required": {
"type": "boolean"
},
"default": {
"type": ["string", "null"]
}
},
"additionalProperties": False,
"required": ["name", "mapping_type", "location", "required"]
}
}
def _check_job_interface(data, interface):
names = set(arg["name"] for arg in interface)
if len(names) != len(interface):
raise e.InvalidDataException(
_("Name must be unique within the interface for any job."))
mapping_types = set(arg["mapping_type"] for arg in interface)
acceptable_types = edp.JOB_TYPES_ACCEPTABLE_CONFIGS[data["type"]]
if any(m_type not in acceptable_types for m_type in mapping_types):
args = {"mapping_types": str(list(acceptable_types)),
"job_type": data["type"]}
raise e.InvalidDataException(
_("Only mapping types %(mapping_types)s are allowed for job type "
"%(job_type)s.") % args)
positional_args = [arg for arg in interface
if arg["mapping_type"] == "args"]
if not all(six.text_type(arg["location"]).isnumeric()
for arg in positional_args):
raise e.InvalidDataException(
_("Locations of positional arguments must be an unbroken integer "
"sequence ascending from 0."))
locations = set(int(arg["location"]) for arg in positional_args)
if not all(i in locations for i in range(len(locations))):
raise e.InvalidDataException(
_("Locations of positional arguments must be an unbroken integer "
"sequence ascending from 0."))
not_required = (arg for arg in positional_args if not arg["required"])
if not all(arg.get("default", None) for arg in not_required):
raise e.InvalidDataException(
_("Positional arguments must be given default values if they are "
"not required."))
mappings = ((arg["mapping_type"], arg["location"]) for arg in interface)
if len(set(mappings)) != len(interface):
raise e.InvalidDataException(
_("The combination of mapping type and location must be unique "
"within the interface for any job."))
for arg in interface:
if "value_type" not in arg:
arg["value_type"] = DEFAULT_DATA_TYPE
default = arg.get("default", None)
if default is not None:
_validate_value(arg["value_type"], default)
def check_job_interface(data, **kwargs):
interface = data.get("interface", [])
if interface:
_check_job_interface(data, interface)
def _validate_data_source(value):
if uuidutils.is_uuid_like(value):
b.check_data_source_exists(value)
else:
if not urlparse.urlparse(value).scheme:
raise e.InvalidDataException(
_("Data source value '%s' is neither a valid data source ID "
"nor a valid URL.") % value)
def _validate_number(value):
if not six.text_type(value).isnumeric():
raise e.InvalidDataException(
_("Value '%s' is not a valid number.") % value)
def _validate_string(value):
if not isinstance(value, six.string_types):
raise e.InvalidDataException(
_("Value '%s' is not a valid string.") % value)
_value_type_validators = {
DATA_TYPE_STRING: _validate_string,
DATA_TYPE_NUMBER: _validate_number,
DATA_TYPE_DATA_SOURCE: _validate_data_source
}
def _validate_value(type, value):
_value_type_validators[type](value)
def check_execution_interface(data, job):
job_int = {arg.name: arg for arg in job.interface}
execution_int = data.get("interface", None)
if not (job_int or execution_int):
return
if job_int and execution_int is None:
raise e.InvalidDataException(
_("An interface was specified with the template for this job. "
"Please pass an interface map with this job (even if empty)."))
execution_names = set(execution_int.keys())
definition_names = set(job_int.keys())
not_found_names = execution_names - definition_names
if not_found_names:
raise e.InvalidDataException(
_("Argument names: %s were not found in the interface for this "
"job.") % str(list(not_found_names)))
required_names = {arg.name for arg in job.interface if arg.required}
unset_names = required_names - execution_names
if unset_names:
raise e.InvalidDataException(_("Argument names: %s are required for "
"this job.") % str(list(unset_names)))
nonexistent = object()
for name, value in six.iteritems(execution_int):
arg = job_int[name]
_validate_value(arg.value_type, value)
if arg.mapping_type == "args":
continue
typed_configs = data.get("job_configs", {}).get(arg.mapping_type, {})
config_value = typed_configs.get(arg.location, nonexistent)
if config_value is not nonexistent and config_value != value:
args = {"name": name,
"mapping_type": arg.mapping_type,
"location": arg.location}
raise e.InvalidDataException(
_("Argument '%(name)s' was passed both through the interface "
"and in location '%(mapping_type)s'.'%(location)s'. Please "
"pass this through either the interface or the "
"configuration maps, not both.") % args)

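A sketch (not part of the commit) of the positional-argument rules _check_job_interface enforces above: "args" locations must form an unbroken integer sequence starting at 0, and positional arguments that are not required must carry defaults. Argument names and values are hypothetical.

# Accepted: locations 0 and 1 are contiguous, and the optional argument
# has a default value.
valid_args = [
    {"name": "First Arg", "mapping_type": "args", "location": "0",
     "value_type": "string", "required": True},
    {"name": "Second Arg", "mapping_type": "args", "location": "1",
     "value_type": "string", "required": False, "default": "fallback"},
]

# Rejected: locations {0, 2} leave a gap, so _check_job_interface raises
# InvalidDataException ("... unbroken integer sequence ascending from 0.").
gapped_args = [
    {"name": "First Arg", "mapping_type": "args", "location": "0",
     "value_type": "string", "required": True},
    {"name": "Third Arg", "mapping_type": "args", "location": "2",
     "value_type": "string", "required": True},
]
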
View File

@ -27,7 +27,7 @@ from sahara.utils import edp
SAMPLE_DATA_SOURCE = {
"tenant_id": "test_tenant",
"tenant_id": "tenant_1",
"name": "ngt_test",
"description": "test_desc",
"type": "Cassandra",
@ -39,7 +39,7 @@ SAMPLE_DATA_SOURCE = {
}
SAMPLE_JOB = {
"tenant_id": "test_tenant",
"tenant_id": "tenant_1",
"name": "job_test",
"description": "test_desc",
"type": edp.JOB_TYPE_PIG,
@ -47,7 +47,7 @@ SAMPLE_JOB = {
}
SAMPLE_JOB_EXECUTION = {
"tenant_id": "tenant_id",
"tenant_id": "tenant_1",
"return_code": "1",
"job_id": "undefined",
"input_id": "undefined",
@ -57,7 +57,7 @@ SAMPLE_JOB_EXECUTION = {
}
SAMPLE_CONF_JOB_EXECUTION = {
"tenant_id": "tenant_id",
"tenant_id": "tenant_1",
"progress": "0.1",
"return_code": "1",
"job_id": "undefined",

View File

@ -0,0 +1,115 @@
# Copyright (c) 2015 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
from sahara import context
import sahara.tests.unit.conductor.base as test_base
from sahara.tests.unit.conductor.manager import test_edp
def _merge_dict(original, update):
new = copy.deepcopy(original)
new.update(update)
return new
SAMPLE_JOB = _merge_dict(test_edp.SAMPLE_JOB, {
"interface": [
{
"name": "Reducer Count",
"mapping_type": "configs",
"location": "mapred.reduce.tasks",
"value_type": "number",
"required": True,
"default": "1"
},
{
"name": "Input Path",
"mapping_type": "params",
"location": "INPUT",
"value_type": "data_source",
"required": False,
"default": "hdfs://path"
},
{
"name": "Positional Argument 2",
"mapping_type": "args",
"location": "1",
"value_type": "string",
"required": False,
"default": "default"
},
{
"name": "Positional Argument 1",
"mapping_type": "args",
"location": "0",
"value_type": "string",
"required": False,
"default": "arg_1"
},
]
})
SAMPLE_JOB_EXECUTION = _merge_dict(test_edp.SAMPLE_JOB, {
"interface": {
"Reducer Count": "2",
"Positional Argument 2": "arg_2"
},
"job_configs": {"args": ["arg_3"], "configs": {"mapred.map.tasks": "3"}}
})
class JobExecutionTest(test_base.ConductorManagerTestCase):
def test_interface_flows(self):
ctx = context.ctx()
job = self.api.job_create(ctx, SAMPLE_JOB)
arg_names = [arg['name'] for arg in job['interface']]
self.assertEqual(arg_names, ["Reducer Count", "Input Path",
"Positional Argument 2",
"Positional Argument 1"])
job_ex_input = copy.deepcopy(SAMPLE_JOB_EXECUTION)
job_ex_input['job_id'] = job['id']
self.api.job_execution_create(ctx, job_ex_input)
lst = self.api.job_execution_get_all(ctx)
self.assertEqual(1, len(lst))
job_ex_result = lst[0]
configs = {
'configs': {'mapred.reduce.tasks': '2',
'mapred.map.tasks': '3'},
'args': ['arg_1', 'arg_2', 'arg_3'],
'params': {'INPUT': 'hdfs://path'}
}
self.assertEqual(configs, job_ex_result['job_configs'])
self.api.job_execution_destroy(ctx, job_ex_result['id'])
del job_ex_input['job_configs']
self.api.job_execution_create(ctx, job_ex_input)
lst = self.api.job_execution_get_all(ctx)
self.assertEqual(1, len(lst))
job_ex_result = lst[0]
configs = {
'configs': {'mapred.reduce.tasks': '2'},
'args': ['arg_1', 'arg_2'],
'params': {'INPUT': 'hdfs://path'}
}
self.assertEqual(configs, job_ex_result['job_configs'])

View File

@ -43,7 +43,7 @@ class SaharaMigrationsCheckers(object):
t = db_utils.get_table(engine, table)
self.assertIn(column, t.c)
def assertColumnsExists(self, engine, table, columns):
def assertColumnsExist(self, engine, table, columns):
for column in columns:
self.assertColumnExists(engine, table, column)
@ -90,7 +90,7 @@ class SaharaMigrationsCheckers(object):
'data',
'datasize'
]
self.assertColumnsExists(
self.assertColumnsExist(
engine, 'job_binary_internal', job_binary_internal_columns)
self.assertColumnCount(
engine, 'job_binary_internal', job_binary_internal_columns)
@ -113,7 +113,7 @@ class SaharaMigrationsCheckers(object):
'volume_mount_prefix',
'floating_ip_pool'
]
self.assertColumnsExists(
self.assertColumnsExist(
engine, 'node_group_templates', node_group_templates_columns)
self.assertColumnCount(
engine, 'node_group_templates', node_group_templates_columns)
@ -129,7 +129,7 @@ class SaharaMigrationsCheckers(object):
'url',
'credentials'
]
self.assertColumnsExists(
self.assertColumnsExist(
engine, 'data_sources', data_sources_columns)
self.assertColumnCount(
engine, 'data_sources', data_sources_columns)
@ -148,7 +148,7 @@ class SaharaMigrationsCheckers(object):
'plugin_name',
'hadoop_version'
]
self.assertColumnsExists(
self.assertColumnsExist(
engine, 'cluster_templates', cluster_templates_columns)
self.assertColumnCount(
engine, 'cluster_templates', cluster_templates_columns)
@ -163,7 +163,7 @@ class SaharaMigrationsCheckers(object):
'url',
'extra'
]
self.assertColumnsExists(
self.assertColumnsExist(
engine, 'job_binaries', job_binaries_columns)
self.assertColumnCount(
engine, 'job_binaries', job_binaries_columns)
@ -177,7 +177,7 @@ class SaharaMigrationsCheckers(object):
'description',
'type'
]
self.assertColumnsExists(engine, 'jobs', jobs_columns)
self.assertColumnsExist(engine, 'jobs', jobs_columns)
self.assertColumnCount(engine, 'jobs', jobs_columns)
templates_relations_columns = [
@ -198,7 +198,7 @@ class SaharaMigrationsCheckers(object):
'node_group_template_id',
'floating_ip_pool'
]
self.assertColumnsExists(
self.assertColumnsExist(
engine, 'templates_relations', templates_relations_columns)
self.assertColumnCount(
engine, 'templates_relations', templates_relations_columns)
@ -207,7 +207,7 @@ class SaharaMigrationsCheckers(object):
'Job_id',
'JobBinary_id'
]
self.assertColumnsExists(
self.assertColumnsExist(
engine, 'mains_association', mains_association_columns)
self.assertColumnCount(
engine, 'mains_association', mains_association_columns)
@ -216,7 +216,7 @@ class SaharaMigrationsCheckers(object):
'Job_id',
'JobBinary_id'
]
self.assertColumnsExists(
self.assertColumnsExist(
engine, 'libs_association', libs_association_columns)
self.assertColumnCount(
engine, 'libs_association', libs_association_columns)
@ -245,7 +245,7 @@ class SaharaMigrationsCheckers(object):
'extra',
'cluster_template_id'
]
self.assertColumnsExists(engine, 'clusters', clusters_columns)
self.assertColumnsExist(engine, 'clusters', clusters_columns)
self.assertColumnCount(engine, 'clusters', clusters_columns)
node_groups_columns = [
@ -267,7 +267,7 @@ class SaharaMigrationsCheckers(object):
'node_group_template_id',
'floating_ip_pool'
]
self.assertColumnsExists(engine, 'node_groups', node_groups_columns)
self.assertColumnsExist(engine, 'node_groups', node_groups_columns)
self.assertColumnCount(engine, 'node_groups', node_groups_columns)
job_executions_columns = [
@ -288,7 +288,7 @@ class SaharaMigrationsCheckers(object):
'job_configs',
'extra'
]
self.assertColumnsExists(
self.assertColumnsExist(
engine, 'job_executions', job_executions_columns)
self.assertColumnCount(
engine, 'job_executions', job_executions_columns)
@ -305,7 +305,7 @@ class SaharaMigrationsCheckers(object):
'management_ip',
'volumes'
]
self.assertColumnsExists(engine, 'instances', instances_columns)
self.assertColumnsExist(engine, 'instances', instances_columns)
self.assertColumnCount(engine, 'instances', instances_columns)
self._data_001(engine, data)
@ -425,11 +425,11 @@ class SaharaMigrationsCheckers(object):
self.assertColumnCount(engine, 'cluster_provision_steps',
provision_steps_columns)
self.assertColumnsExists(engine, 'cluster_provision_steps',
provision_steps_columns)
self.assertColumnsExist(engine, 'cluster_provision_steps',
provision_steps_columns)
self.assertColumnCount(engine, 'cluster_events', events_columns)
self.assertColumnsExists(engine, 'cluster_events', events_columns)
self.assertColumnsExist(engine, 'cluster_events', events_columns)
def _check_016(self, engine, data):
self.assertColumnExists(engine, 'node_group_templates',
@ -464,6 +464,26 @@ class SaharaMigrationsCheckers(object):
def _check_021(self, engine, data):
self.assertColumnExists(engine, 'job_executions', 'data_source_urls')
def _check_022(self, engine, data):
columns = [
'created_at',
'updated_at',
'id',
'job_id',
'tenant_id',
'name',
'description',
'mapping_type',
'location',
'value_type',
'required',
'order',
'default'
]
self.assertColumnCount(engine, 'job_interface_arguments', columns)
self.assertColumnsExist(engine, 'job_interface_arguments', columns)
class TestMigrationsMySQL(SaharaMigrationsCheckers,
base.BaseWalkMigrationTestCase,

View File

@ -41,6 +41,7 @@ def _create_job(id, job_binary, type):
job.id = id
job.type = type
job.name = 'special_name'
job.interface = []
if edp.compare_job_type(type, edp.JOB_TYPE_PIG, edp.JOB_TYPE_HIVE):
job.mains = [job_binary]
job.libs = None
@ -91,6 +92,8 @@ def _create_job_exec(job_id, type, configs=None):
j_exec.id = six.text_type(uuid.uuid4())
j_exec.job_id = job_id
j_exec.job_configs = configs
if not j_exec.job_configs:
j_exec.job_configs = {}
if edp.compare_job_type(type, edp.JOB_TYPE_JAVA):
j_exec.job_configs['configs']['edp.java.main_class'] = _java_main_class
j_exec.job_configs['configs']['edp.java.java_opts'] = _java_opts
@ -100,8 +103,6 @@ def _create_job_exec(job_id, type, configs=None):
def _create_job_exec_with_proxy(job_id, type, configs=None):
j_exec = _create_job_exec(job_id, type, configs)
j_exec.id = '00000000-1111-2222-3333-4444444444444444'
if not j_exec.job_configs:
j_exec.job_configs = {}
j_exec.job_configs['proxy_configs'] = {
'proxy_username': 'job_' + j_exec.id,
'proxy_password': '55555555-6666-7777-8888-999999999999',

View File

@ -46,7 +46,7 @@ class TestJobExecValidation(u.ValidationTestCase):
@mock.patch('sahara.conductor.api.LocalApi.job_get')
def test_streaming(self, get_job, get_data_source, get_cluster):
get_job.return_value = mock.Mock(
type=edp.JOB_TYPE_MAPREDUCE_STREAMING, libs=[])
type=edp.JOB_TYPE_MAPREDUCE_STREAMING, libs=[], interface=[])
ds1_id = six.text_type(uuid.uuid4())
ds2_id = six.text_type(uuid.uuid4())
@ -95,7 +95,7 @@ class TestJobExecValidation(u.ValidationTestCase):
@mock.patch('sahara.conductor.api.LocalApi.job_get')
def test_data_sources_differ(self, get_job, get_data_source, get_cluster):
get_job.return_value = mock.Mock(
type=edp.JOB_TYPE_MAPREDUCE_STREAMING, libs=[])
type=edp.JOB_TYPE_MAPREDUCE_STREAMING, libs=[], interface=[])
ds1_id = six.text_type(uuid.uuid4())
ds2_id = six.text_type(uuid.uuid4())
@ -147,7 +147,8 @@ class TestJobExecValidation(u.ValidationTestCase):
@mock.patch('sahara.conductor.api.LocalApi.cluster_get')
@mock.patch('sahara.conductor.api.LocalApi.job_get')
def test_check_edp_no_oozie(self, get_job, get_cluster):
get_job.return_value = mock.Mock(type=edp.JOB_TYPE_PIG, libs=[])
get_job.return_value = mock.Mock(type=edp.JOB_TYPE_PIG, libs=[],
interface=[])
ng = tu.make_ng_dict('master', 42, ['namenode'], 1,
instances=[tu.make_inst_dict('id', 'name')])
@ -173,7 +174,7 @@ class TestJobExecValidation(u.ValidationTestCase):
# Note that this means we cannot use assert_create_object_validation()
# because it calls start_patch() and will override our setting
job = mock.Mock(type=edp.JOB_TYPE_SPARK, mains=["main"])
job = mock.Mock(type=edp.JOB_TYPE_SPARK, mains=["main"], interface=[])
get_job.return_value = job
ng = tu.make_ng_dict('master', 42, [], 1,
instances=[tu.make_inst_dict('id', 'name')])
@ -190,8 +191,8 @@ class TestJobExecValidation(u.ValidationTestCase):
@mock.patch('sahara.conductor.api.LocalApi.cluster_get')
@mock.patch('sahara.conductor.api.LocalApi.job_get')
def test_edp_main_class_java(self, job_get, cluster_get):
job_get.return_value = mock.Mock()
job_get.return_value.type = edp.JOB_TYPE_JAVA
job_get.return_value = mock.Mock(type=edp.JOB_TYPE_JAVA,
interface=[])
ng = tu.make_ng_dict('master', 42, ['namenode', 'oozie'], 1,
instances=[tu.make_inst_dict('id', 'name')])
cluster_get.return_value = tu.create_cluster("cluster", "tenant1",
@ -221,7 +222,8 @@ class TestJobExecValidation(u.ValidationTestCase):
@mock.patch('sahara.conductor.api.LocalApi.cluster_get')
@mock.patch('sahara.conductor.api.LocalApi.job_get')
def test_edp_main_class_spark(self, job_get, cluster_get):
job_get.return_value = mock.Mock(type=edp.JOB_TYPE_SPARK)
job_get.return_value = mock.Mock(type=edp.JOB_TYPE_SPARK,
interface=[])
ng = tu.make_ng_dict('master', 42, ['namenode'], 1,
instances=[tu.make_inst_dict('id', 'name')])
cluster_get.return_value = tu.create_cluster("cluster", "tenant1",

View File

@ -0,0 +1,299 @@
# Copyright (c) 2015 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
import itertools
import mock
from sahara.service.validations.edp import job as j
from sahara.service.validations.edp import job_execution as j_e
from sahara.service.validations.edp import job_interface as j_i
from sahara.tests.unit.service.validation import utils as u
from sahara.utils import edp
def _configs(**kwargs):
arg = {
"name": "Reducer Count",
"mapping_type": "configs",
"location": "mapred.reduce.tasks",
"value_type": "number",
"required": True,
"default": "1"
}
arg.update(kwargs)
return arg
def _params(**kwargs):
arg = {
"name": "Input Path",
"mapping_type": "params",
"location": "INPUT",
"value_type": "data_source",
"required": False,
"default": "hdfs://path"
}
arg.update(kwargs)
return arg
def _args(**kwargs):
arg = {
"name": "Positional Argument",
"mapping_type": "args",
"location": "0",
"value_type": "string",
"required": False,
"default": "arg_value"
}
arg.update(kwargs)
return arg
_mapping_types = {"configs", "args", "params"}
def _job(job_type, interface):
return {"name": "job", "type": job_type, "interface": interface}
_job_types = [
_job(edp.JOB_TYPE_HIVE, [_configs(), _params()]),
_job(edp.JOB_TYPE_PIG, [_configs(), _params(), _args()]),
_job(edp.JOB_TYPE_MAPREDUCE, [_configs()]),
_job(edp.JOB_TYPE_MAPREDUCE_STREAMING, [_configs()]),
_job(edp.JOB_TYPE_JAVA, [_configs(), _args()]),
_job(edp.JOB_TYPE_SHELL, [_configs(), _params(), _args()]),
_job(edp.JOB_TYPE_SPARK, [_configs(), _args()]),
_job(edp.JOB_TYPE_STORM, [_args()])
]
class TestJobInterfaceValidation(u.ValidationTestCase):
def setUp(self):
super(TestJobInterfaceValidation, self).setUp()
self._create_object_fun = j.check_interface
self.scheme = j.JOB_SCHEMA
def test_interface(self):
for job in _job_types:
self._assert_create_object_validation(data=job)
def test_no_interface(self):
job = _job(edp.JOB_TYPE_PIG, None)
self._assert_create_object_validation(data=job)
def test_overlapping_names(self):
job = _job(edp.JOB_TYPE_PIG,
[_configs(), _configs(location="mapred.map.tasks")])
self._assert_create_object_validation(
data=job, bad_req_i=(
1, "INVALID_DATA",
"Name must be unique within the interface for any job."))
def test_unacceptable_types(self):
for job in _job_types:
acceptable_types = [arg['mapping_type']
for arg in job['interface']]
unacceptable_types = _mapping_types - set(acceptable_types)
unacceptable_args = (globals()["_" + m_type]()
for m_type in unacceptable_types)
bad_job = job.copy()
for arg in unacceptable_args:
bad_job['interface'] = [arg]
permutations = itertools.permutations(acceptable_types)
msgs = ["Only mapping types %s are allowed for job type %s." %
(list(permutation), bad_job['type'])
for permutation in permutations]
self._assert_create_object_validation(
data=bad_job, bad_req_i=(1, "INVALID_DATA", msgs))
def test_bad_positional_arg_locations(self):
job = _job(edp.JOB_TYPE_PIG, [_args(location="1")])
self._assert_create_object_validation(
data=job, bad_req_i=(
1, "INVALID_DATA",
"Locations of positional arguments must be an unbroken "
"integer sequence ascending from 0."))
job = _job(edp.JOB_TYPE_PIG, [_args(location="fish")])
self._assert_create_object_validation(
data=job, bad_req_i=(
1, "INVALID_DATA",
"Locations of positional arguments must be an unbroken "
"integer sequence ascending from 0."))
job = _job(edp.JOB_TYPE_PIG,
[_args(), _args(location="2", name="Argument 2")])
self._assert_create_object_validation(
data=job, bad_req_i=(
1, "INVALID_DATA",
"Locations of positional arguments must be an unbroken "
"integer sequence ascending from 0."))
def test_required_positional_arg_without_default(self):
arg = _args(required=False)
del arg['default']
job = _job(edp.JOB_TYPE_PIG, [arg])
self._assert_create_object_validation(
data=job, bad_req_i=(
1, "INVALID_DATA",
"Positional arguments must be given default values if they "
"are not required."))
def test_overlapping_locations(self):
job = _job(edp.JOB_TYPE_PIG,
[_configs(), _configs(name="Mapper Count")])
self._assert_create_object_validation(
data=job, bad_req_i=(
1, "INVALID_DATA",
"The combination of mapping type and location must be unique "
"within the interface for any job."))
def test_number_values(self):
job = _job(edp.JOB_TYPE_PIG, [_configs(default="fish")])
self._assert_create_object_validation(
data=job, bad_req_i=(
1, "INVALID_DATA",
"Value 'fish' is not a valid number."))
@mock.patch('sahara.conductor.API.data_source_get')
def test_data_source_values(self, data_source):
data_source.return_value = True
job = _job(edp.JOB_TYPE_PIG,
[_params(default="DEADBEEF-DEAD-BEEF-DEAD-BEEFDEADBEEF")])
self._assert_create_object_validation(data=job)
data_source.return_value = False
self._assert_create_object_validation(
data=job, bad_req_i=(
1, "INVALID_REFERENCE",
"DataSource with id 'DEADBEEF-DEAD-BEEF-DEAD-BEEFDEADBEEF' "
"doesn't exist"))
job = _job(edp.JOB_TYPE_PIG, [_params(default="Filial Piety")])
self._assert_create_object_validation(
data=job, bad_req_i=(
1, "INVALID_DATA",
"Data source value 'Filial Piety' is neither a valid data "
"source ID nor a valid URL."))
def test_default_data_type(self):
param = _params()
del param['value_type']
job = _job(edp.JOB_TYPE_PIG, [param])
self._assert_create_object_validation(data=job)
assert param['value_type'] == j_i.DEFAULT_DATA_TYPE
int_arg = collections.namedtuple("int_arg",
["name", "mapping_type", "location",
"value_type", "required", "default"])
def j_e_i_wrapper(data):
job = mock.Mock(
interface=[int_arg(**_configs()),
int_arg(**_args()),
int_arg(**_params())]
)
j_i.check_execution_interface(data, job)
class TestJobExecutionInterfaceValidation(u.ValidationTestCase):
def setUp(self):
super(TestJobExecutionInterfaceValidation, self).setUp()
self._create_object_fun = j_e_i_wrapper
self.scheme = j_e.JOB_EXEC_SCHEMA
def test_valid_execution(self):
data = {"cluster_id": "DEADBEEF-DEAD-BEEF-DEAD-BEEFDEADBEEF",
"interface": {"Reducer Count": "2",
"Input Path": "swift://path",
"Positional Argument": "value"}}
self._assert_create_object_validation(data=data)
def test_no_execution_interface(self):
data = {"cluster_id": "DEADBEEF-DEAD-BEEF-DEAD-BEEFDEADBEEF"}
self._assert_create_object_validation(data=data, bad_req_i=(
1, "INVALID_DATA",
"An interface was specified with the template for this job. "
"Please pass an interface map with this job (even if empty)."))
def test_bad_argument_name(self):
data = {"cluster_id": "DEADBEEF-DEAD-BEEF-DEAD-BEEFDEADBEEF",
"interface": {"Fish": "Rainbow Trout",
"Reducer Count": "2",
"Input Path": "swift://path"}}
self._assert_create_object_validation(data=data, bad_req_i=(
1, "INVALID_DATA", "Argument names: ['Fish'] were not found in "
"the interface for this job."))
def test_required_argument_missing(self):
data = {"cluster_id": "DEADBEEF-DEAD-BEEF-DEAD-BEEFDEADBEEF",
"interface": {"Positional Argument": "Value"}}
self._assert_create_object_validation(data=data, bad_req_i=(
1, "INVALID_DATA", "Argument names: ['Reducer Count'] are "
"required for this job."))
@mock.patch('sahara.conductor.API.data_source_get')
def test_bad_values(self, data_source):
data = {"cluster_id": "DEADBEEF-DEAD-BEEF-DEAD-BEEFDEADBEEF",
"interface": {"Reducer Count": "Two"}}
self._assert_create_object_validation(data=data, bad_req_i=(
1, "INVALID_DATA", "Value 'Two' is not a valid number."))
data = {"cluster_id": "DEADBEEF-DEAD-BEEF-DEAD-BEEFDEADBEEF",
"interface": {"Reducer Count": "2",
"Input Path": "not_a_url"}}
self._assert_create_object_validation(data=data, bad_req_i=(
1, "INVALID_DATA", "Data source value 'not_a_url' is neither a "
"valid data source ID nor a valid URL."))
data = {"cluster_id": "DEADBEEF-DEAD-BEEF-DEAD-BEEFDEADBEEF",
"interface": {"Reducer Count": "2",
"Positional Argument": 2}}
self._assert_create_object_validation(data=data, bad_req_i=(
1, "INVALID_DATA", "Value '2' is not a valid string."))
data_source.return_value = False
data = {"cluster_id": "DEADBEEF-DEAD-BEEF-DEAD-BEEFDEADBEEF",
"interface":
{"Reducer Count": "2",
"Input Path": "DEADBEEF-DEAD-BEEF-DEAD-BEEFDEADBEEF"}}
self._assert_create_object_validation(
data=data, bad_req_i=(
1, "INVALID_REFERENCE",
"DataSource with id 'DEADBEEF-DEAD-BEEF-DEAD-BEEFDEADBEEF' "
"doesn't exist"))
def test_overlapping_data(self):
data = {"cluster_id": "DEADBEEF-DEAD-BEEF-DEAD-BEEFDEADBEEF",
"interface": {"Reducer Count": "2"},
"job_configs": {"configs": {"mapred.reduce.tasks": "2"}}}
self._assert_create_object_validation(data=data)
data = {"cluster_id": "DEADBEEF-DEAD-BEEF-DEAD-BEEFDEADBEEF",
"interface": {"Reducer Count": "2"},
"job_configs": {"configs": {"mapred.reduce.tasks": "3"}}}
self._assert_create_object_validation(data=data, bad_req_i=(
1, "INVALID_DATA",
"Argument 'Reducer Count' was passed both through the interface "
"and in location 'configs'.'mapred.reduce.tasks'. Please pass "
"this through either the interface or the configuration maps, "
"not both."))

View File

@ -255,7 +255,8 @@ class ValidationTestCase(base.SaharaTestCase):
def _assert_calls(self, mock, call_info):
if not call_info:
self.assertEqual(0, mock.call_count)
self.assertEqual(0, mock.call_count, "Unexpected call to %s: %s"
% (mock.name, str(mock.call_args)))
else:
self.assertEqual(call_info[0], mock.call_count)
self.assertEqual(call_info[1], mock.call_args[0][0].code)

View File

@ -62,6 +62,17 @@ JOB_TYPES_ALL = [
JOB_TYPE_STORM
]
JOB_TYPES_ACCEPTABLE_CONFIGS = {
JOB_TYPE_HIVE: {"configs", "params"},
JOB_TYPE_PIG: {"configs", "params", "args"},
JOB_TYPE_MAPREDUCE: {"configs"},
JOB_TYPE_MAPREDUCE_STREAMING: {"configs"},
JOB_TYPE_JAVA: {"configs", "args"},
JOB_TYPE_SHELL: {"configs", "params", "args"},
JOB_TYPE_SPARK: {"configs", "args"},
JOB_TYPE_STORM: {"args"}
}
ADAPT_FOR_OOZIE = 'edp.java.adapt_for_oozie'
ADAPT_SPARK_FOR_SWIFT = 'edp.spark.adapt_for_swift'