diff --git a/savanna/api/v11.py b/savanna/api/v11.py index fef9fce6..76f01133 100644 --- a/savanna/api/v11.py +++ b/savanna/api/v11.py @@ -35,13 +35,7 @@ rest = u.Rest('v11', __name__) @v.check_exists(api.get_job, id='job_id') @v.validate(v_j_e.JOB_EXEC_SCHEMA, v_j_e.check_job_executor) def job_execute(job_id, data): - input = data['input_id'] - output = data['output_id'] - cluster = data['cluster_id'] - configs = data.get('job_configs', {}) - return u.render(job_execution=api.execute_job(job_id, input, - output, cluster, - configs).to_dict()) + return u.render(job_execution=api.execute_job(job_id, data).to_dict()) @rest.get('/jobs/config-hints/') diff --git a/savanna/db/sqlalchemy/models.py b/savanna/db/sqlalchemy/models.py index e82f9675..779219de 100644 --- a/savanna/db/sqlalchemy/models.py +++ b/savanna/db/sqlalchemy/models.py @@ -265,6 +265,8 @@ class JobExecution(mb.SavannaBase): oozie_job_id = sa.Column(sa.String(100)) return_code = sa.Column(sa.String(80)) job_configs = sa.Column(st.JsonDictType()) + main_class = sa.Column(sa.String) + java_opts = sa.Column(sa.String) mains_association = sa.Table("mains_association", diff --git a/savanna/service/edp/api.py b/savanna/service/edp/api.py index be87f419..18c22d2e 100644 --- a/savanna/service/edp/api.py +++ b/savanna/service/edp/api.py @@ -29,10 +29,28 @@ def get_job_config_hints(job_type): return w_f.get_possible_job_config(job_type) -def execute_job(job_id, input_id, output_id, cluster_id, configs): - job_ex_dict = {'input_id': input_id, 'output_id': output_id, +def execute_job(job_id, data): + + # Elements common to all job types + cluster_id = data['cluster_id'] + configs = data.get('job_configs', {}) + + # Not in Java job types but present for all others + input_id = data.get('input_id', None) + output_id = data.get('output_id', None) + + # Present for Java job types + main_class = data.get('main_class', '') + java_opts = data.get('java_opts', '') + + # Since we will use a unified class in the database, we pass + # a superset for all job types + job_ex_dict = {'main_class': main_class, + 'java_opts': java_opts, + 'input_id': input_id, 'output_id': output_id, 'job_id': job_id, 'cluster_id': cluster_id, 'info': {'status': 'Pending'}, 'job_configs': configs} + job_execution = conductor.job_execution_create(context.ctx(), job_ex_dict) context.spawn("Starting Job Execution %s" % job_execution.id, diff --git a/savanna/service/edp/job_manager.py b/savanna/service/edp/job_manager.py index 41aec7d5..bbd8b557 100644 --- a/savanna/service/edp/job_manager.py +++ b/savanna/service/edp/job_manager.py @@ -107,9 +107,12 @@ def run_job(job_execution): return job_execution job = conductor.job_get(ctx, job_execution.job_id) - - input_source = conductor.data_source_get(ctx, job_execution.input_id) - output_source = conductor.data_source_get(ctx, job_execution.output_id) + if job.type != 'Java': + input_source = conductor.data_source_get(ctx, job_execution.input_id) + output_source = conductor.data_source_get(ctx, job_execution.output_id) + else: + input_source = None + output_source = None #TODO(nprivalova): should be removed after all features implemented validate(input_source, output_source, job) @@ -124,8 +127,9 @@ def run_job(job_execution): # uploading hive configuration creator.configure_workflow_if_needed(cluster, wf_dir) - wf_xml = creator.get_workflow_xml(job_execution.job_configs, - input_source, output_source) + wf_xml = creator.get_workflow_xml(job_execution, + input_source, + output_source) path_to_workflow = 
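# Illustration, not part of the patch: with job_execute() now passing the whole
# request body through, the two payload shapes accepted by execute_job() look
# roughly like this (field names match MR_EXEC_SCHEMA / JAVA_EXEC_SCHEMA later
# in this patch; the concrete values are made up):
pig_request = {
    "cluster_id": "<cluster-uuid>",
    "input_id": "<data-source-uuid>",
    "output_id": "<data-source-uuid>",
    "job_configs": {"configs": {}, "params": {}, "args": []},
}
java_request = {
    "cluster_id": "<cluster-uuid>",
    "main_class": "org.apache.hadoop.examples.WordCount",
    "java_opts": "-Dparam1=val1 -Dparam2=val2",
    "job_configs": {"configs": {}, "args": ["in_path", "out_path"]},
}
# execute_job() folds either shape into one superset job_ex_dict, defaulting the
# fields a given job type does not use, so the single JobExecution model (with
# its new main_class / java_opts columns) covers every job type.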
upload_workflow_file(u.get_jobtracker(cluster), wf_dir, wf_xml, hdfs_user) @@ -205,7 +209,8 @@ def _append_slash_if_needed(path): #TODO(nprivalova): this validation should be removed after implementing # all features def validate(input_data, output_data, job): - if input_data.type != 'swift' or output_data.type != 'swift': + if (input_data and input_data.type != 'swift') or\ + (output_data and output_data.type != 'swift'): raise RuntimeError - if job.type not in ['Pig', 'MapReduce', 'Hive', 'Jar']: + if job.type not in ['Pig', 'MapReduce', 'Hive', 'Java', 'Jar']: raise RuntimeError diff --git a/savanna/service/edp/workflow_creator/java_workflow.py b/savanna/service/edp/workflow_creator/java_workflow.py new file mode 100644 index 00000000..9e87b792 --- /dev/null +++ b/savanna/service/edp/workflow_creator/java_workflow.py @@ -0,0 +1,51 @@ +# Copyright (c) 2013 RedHat Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from savanna.service.edp.workflow_creator import base_workflow +from savanna.utils import xmlutils as x + + +class JavaWorkflowCreator(base_workflow.OozieWorkflowCreator): + + def __init__(self): + super(JavaWorkflowCreator, self).__init__('java') + + def build_workflow_xml(self, main_class, + prepare={}, + job_xml=None, + configuration=None, + java_opts=None, + arguments=[], + files=[], archives=[]): + + for k, v in prepare.items(): + self._add_to_prepare_element(k, v) + + self._add_job_xml_element(job_xml) + + self._add_configuration_elements(configuration) + + x.add_text_element_to_tag(self.doc, self.tag_name, + 'main-class', main_class) + + if java_opts: + x.add_text_element_to_tag(self.doc, self.tag_name, + 'java-opts', java_opts) + + for arg in arguments: + x.add_text_element_to_tag(self.doc, self.tag_name, + 'arg', arg) + + self._add_files_and_archives(files, archives) diff --git a/savanna/service/edp/workflow_creator/workflow_factory.py b/savanna/service/edp/workflow_creator/workflow_factory.py index ed99e42e..4d626310 100644 --- a/savanna/service/edp/workflow_creator/workflow_factory.py +++ b/savanna/service/edp/workflow_creator/workflow_factory.py @@ -21,6 +21,7 @@ from savanna.plugins import base as plugin_base from savanna.plugins.general import utils as u from savanna.service.edp import hdfs_helper as h from savanna.service.edp.workflow_creator import hive_workflow +from savanna.service.edp.workflow_creator import java_workflow from savanna.service.edp.workflow_creator import mapreduce_workflow from savanna.service.edp.workflow_creator import pig_workflow from savanna.utils import remote @@ -69,11 +70,11 @@ class PigFactory(BaseFactory): def get_script_name(self, job): return conductor.job_main_name(context.ctx(), job) - def get_workflow_xml(self, execution_configs, input_data, output_data): + def get_workflow_xml(self, execution, input_data, output_data): configs = {'configs': self.get_configs(input_data, output_data), 'params': self.get_params(input_data, output_data), 'args': self.get_args()} - self.update_configs(configs, execution_configs) + 
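# Illustration, not part of the patch: roughly how the new JavaWorkflowCreator
# is driven by the JavaFactory added further down in workflow_factory.py.  The
# swift keys match the tests in this patch; the paths are example values only.
from savanna.service.edp.workflow_creator import java_workflow

creator = java_workflow.JavaWorkflowCreator()
creator.build_workflow_xml(
    'org.apache.hadoop.examples.WordCount',               # main_class
    configuration={'fs.swift.service.savanna.username': 'admin',
                   'fs.swift.service.savanna.password': 'admin1'},
    java_opts='-Dparam1=val1 -Dparam2=val2',
    arguments=['swift://container.savanna/input',
               'swift://container.savanna/output'])
workflow_xml = creator.get_built_workflow_xml()
# The result is an Oozie <java> action containing <main-class>, an optional
# <java-opts> element, and one <arg> element per argument -- the element names
# written by build_workflow_xml() above.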
self.update_configs(configs, execution.job_configs) creator = pig_workflow.PigWorkflowCreator() creator.build_workflow_xml(self.name, configuration=configs['configs'], @@ -92,10 +93,10 @@ class HiveFactory(BaseFactory): def get_script_name(self, job): return conductor.job_main_name(context.ctx(), job) - def get_workflow_xml(self, execution_configs, input_data, output_data): + def get_workflow_xml(self, execution, input_data, output_data): configs = {'configs': self.get_configs(input_data, output_data), 'params': self.get_params(input_data, output_data)} - self.update_configs(configs, execution_configs) + self.update_configs(configs, execution.job_configs) creator = hive_workflow.HiveWorkflowCreator() creator.build_workflow_xml(self.name, self.job_xml, @@ -120,14 +121,40 @@ class MapReduceFactory(BaseFactory): configs['mapred.output.dir'] = output_data.url return configs - def get_workflow_xml(self, execution_configs, input_data, output_data): + def get_workflow_xml(self, execution, input_data, output_data): configs = {'configs': self.get_configs(input_data, output_data)} - self.update_configs(configs, execution_configs) + self.update_configs(configs, execution.job_configs) creator = mapreduce_workflow.MapReduceWorkFlowCreator() creator.build_workflow_xml(configuration=configs['configs']) return creator.get_built_workflow_xml() +class JavaFactory(BaseFactory): + + def get_workflow_xml(self, execution, *args, **kwargs): + # input and output will be handled as args, so we don't really + # know whether or not to include the swift configs. Hmmm. + configs = {'configs': {}} + self.update_configs(configs, execution.job_configs) + + # Update is not supported for list types, and besides + # since args are listed (not named) update doesn't make + # sense, just replacement of any default args + configs['args'] = execution.job_configs.get('args', []) + + if hasattr(execution, 'java_opts'): + java_opts = execution.java_opts + else: + java_opts = "" + + creator = java_workflow.JavaWorkflowCreator() + creator.build_workflow_xml(execution.main_class, + configuration=configs['configs'], + java_opts=java_opts, + arguments=configs['args']) + return creator.get_built_workflow_xml() + + def get_creator(job): def make_PigFactory(): @@ -140,6 +167,7 @@ def get_creator(job): MapReduceFactory, make_HiveFactory, make_PigFactory, + JavaFactory, # Keep 'Jar' as a synonym for 'MapReduce' MapReduceFactory, ] @@ -151,6 +179,10 @@ def get_creator(job): def get_possible_job_config(job_type): if job_type not in get_possible_job_types(): return None + + if job_type == "Java": + return {'job_config': {'configs': [], 'args': []}} + if job_type in ['MapReduce', 'Pig', 'Jar']: #TODO(nmakhotkin) Savanna should return config based on specific plugin cfg = xmlutils.load_hadoop_xml_defaults( @@ -163,7 +195,7 @@ def get_possible_job_config(job_type): cfg = xmlutils.load_hadoop_xml_defaults( 'plugins/vanilla/resources/hive-default.xml') config = {'configs': cfg, "args": {}} - if job_type not in ['MapReduce', 'Jar']: + if job_type not in ['MapReduce', 'Jar', 'Java']: config.update({'params': {}}) return {'job_config': config} @@ -173,5 +205,6 @@ def get_possible_job_types(): 'MapReduce', 'Hive', 'Pig', + 'Java', 'Jar', ] diff --git a/savanna/service/validations/edp/base.py b/savanna/service/validations/edp/base.py index 1c1aca53..766021be 100644 --- a/savanna/service/validations/edp/base.py +++ b/savanna/service/validations/edp/base.py @@ -42,6 +42,23 @@ job_configs = { } +java_job_configs = { + "type": "object", + "properties": { + 
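# Illustration, not part of the patch: the new entries keep the factory list in
# get_creator() aligned, position for position, with the list returned by
# get_possible_job_types() (the lookup code itself is outside this hunk):
#
#     'MapReduce' -> MapReduceFactory
#     'Hive'      -> make_HiveFactory
#     'Pig'       -> make_PigFactory
#     'Java'      -> JavaFactory          (new)
#     'Jar'       -> MapReduceFactory     (kept as a synonym for MapReduce)
#
# get_possible_job_config('Java') also short-circuits to a bare
# {'configs': [], 'args': []} hint, since a plain Java action has no
# plugin-provided default XML to load.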
"configs": { + "type": "simple_config", + }, + "args": { + "type": "array", + "items": { + "type": "string", + } + } + }, + "additionalProperties": False, +} + + def check_data_source_unique_name(name): if name in [ds.name for ds in api.get_data_sources()]: raise ex.NameAlreadyExistsException("Data source with name '%s' " diff --git a/savanna/service/validations/edp/job.py b/savanna/service/validations/edp/job.py index f9bd7b43..ed336b62 100644 --- a/savanna/service/validations/edp/job.py +++ b/savanna/service/validations/edp/job.py @@ -35,6 +35,7 @@ JOB_SCHEMA = { "Pig", "Hive", "MapReduce", + "Java", # Leave this here for validation of create_job, # but it will be changed to MapReduce on creation "Jar", diff --git a/savanna/service/validations/edp/job_executor.py b/savanna/service/validations/edp/job_executor.py index d84053b4..c3ac9081 100644 --- a/savanna/service/validations/edp/job_executor.py +++ b/savanna/service/validations/edp/job_executor.py @@ -13,11 +13,13 @@ # See the License for the specific language governing permissions and # limitations under the License. +import savanna.exceptions as ex +from savanna.service.edp import api import savanna.service.validations.base as main_base import savanna.service.validations.edp.base as b -JOB_EXEC_SCHEMA = { +MR_EXEC_SCHEMA = { "type": "object", "properties": { "input_id": { @@ -38,12 +40,50 @@ JOB_EXEC_SCHEMA = { "required": [ "input_id", "output_id", - "cluster_id", + "cluster_id" ] } -def check_job_executor(data, **kwargs): - b.check_data_source_exists(data['input_id']) - b.check_data_source_exists(data['output_id']) +JAVA_EXEC_SCHEMA = { + "type": "object", + "properties": { + "main_class": { + "type": "string", + }, + "java_opts": { + "type": "string", + }, + "cluster_id": { + "type": "string", + "format": "uuid", + }, + "job_configs": b.java_job_configs, + }, + "additionalProperties": False, + "required": [ + "cluster_id", + "main_class", + ] +} + + +JOB_EXEC_SCHEMA = { + "oneOf": [MR_EXEC_SCHEMA, JAVA_EXEC_SCHEMA] +} + + +def check_job_executor(data, job_id): + job = api.get_job(job_id) + + # Make sure we have the right schema for the job type + # We can identify the Java action schema by looking for 'main_class' + if ('main_class' in data) ^ (job.type == 'Java'): + raise ex.InvalidException("Schema is not valid for job type %s" + % job.type) + + if 'input_id' in data: + b.check_data_source_exists(data['input_id']) + b.check_data_source_exists(data['output_id']) + main_base.check_cluster_exists(data['cluster_id']) diff --git a/savanna/tests/unit/service/edp/test_job_manager.py b/savanna/tests/unit/service/edp/test_job_manager.py index 7c020cc9..a5ca2d7c 100644 --- a/savanna/tests/unit/service/edp/test_job_manager.py +++ b/savanna/tests/unit/service/edp/test_job_manager.py @@ -25,6 +25,9 @@ from savanna.utils import patches as p conductor = cond.API +_java_main_class = "org.apache.hadoop.examples.WordCount" +_java_opts = "-Dparam1=val1 -Dparam2=val2" + def _resource_passthrough(*args, **kwargs): return True @@ -105,7 +108,7 @@ class TestJobManager(models_test_base.DbTestCase): creator = workflow_factory.get_creator(job) - res = creator.get_workflow_xml(job_exec.job_configs, + res = creator.get_workflow_xml(job_exec, input_data, output_data) self.assertIn(""" @@ -134,7 +137,7 @@ class TestJobManager(models_test_base.DbTestCase): creator = workflow_factory.get_creator(job) - res = creator.get_workflow_xml(job_exec.job_configs, + res = creator.get_workflow_xml(job_exec, input_data, output_data) self.assertIn(""" @@ -161,19 +164,45 @@ 
class TestJobManager(models_test_base.DbTestCase): admin """, res) - def test_jar_creator_is_mapreduce(self): - # Ensure that we get the MapReduce workflow factory for 'Jar' jobs - job, _ = _create_all_stack('Jar') - - creator = workflow_factory.get_creator(job) - self.assertEqual(type(creator), workflow_factory.MapReduceFactory) - def test_build_workflow_for_job_mapreduce(self): self._build_workflow_common('MapReduce') def test_build_workflow_for_job_jar(self): self._build_workflow_common('Jar') + def test_build_workflow_for_job_java(self): + # If args include swift paths, user and password values + # will have to be supplied via configs instead of being + # lifted from input or output data sources + configs = {workflow_factory.swift_username: 'admin', + workflow_factory.swift_password: 'admin1'} + + configs = { + 'configs': configs, + 'args': ['input_path', + 'output_path'] + } + + job, job_exec = _create_all_stack('Java', configs) + creator = workflow_factory.get_creator(job) + res = creator.get_workflow_xml(job_exec) + + self.assertIn(""" + + + fs.swift.service.savanna.password + admin1 + + + fs.swift.service.savanna.username + admin + + + %s + %s + input_path + output_path""" % (_java_main_class, _java_opts), res) + @mock.patch('savanna.conductor.API.job_binary_get') def test_build_workflow_for_job_hive(self, job_binary): @@ -185,7 +214,7 @@ class TestJobManager(models_test_base.DbTestCase): creator = workflow_factory.get_creator(job) - res = creator.get_workflow_xml(job_exec.job_configs, + res = creator.get_workflow_xml(job_exec, input_data, output_data) self.assertIn(""" @@ -210,11 +239,12 @@ class TestJobManager(models_test_base.DbTestCase): input_data = _create_data_source('swift://ex.savanna/i') output_data = _create_data_source('swift://ex.savanna/o') - job_exec = _create_job_exec(job.id, configs={"configs": {'c': 'f'}}) + job_exec = _create_job_exec(job.id, + job_type, configs={"configs": {'c': 'f'}}) creator = workflow_factory.get_creator(job) - res = creator.get_workflow_xml(job_exec.job_configs, + res = creator.get_workflow_xml(job_exec, input_data, output_data) self.assertIn(""" @@ -241,11 +271,18 @@ class TestJobManager(models_test_base.DbTestCase): def test_build_workflow_for_job_jar_with_conf(self): self._build_workflow_with_conf_common('Jar') + def test_jar_creator_is_mapreduce(self): + # Ensure that we get the MapReduce workflow factory for 'Jar' jobs + job, _ = _create_all_stack('Jar') -def _create_all_stack(type): + creator = workflow_factory.get_creator(job) + self.assertEqual(type(creator), workflow_factory.MapReduceFactory) + + +def _create_all_stack(type, configs=None): b = _create_job_binary('1', type) j = _create_job('2', b, type) - e = _create_job_exec(j.id) + e = _create_job_exec(j.id, type, configs) return j, e @@ -257,7 +294,7 @@ def _create_job(id, job_binary, type): if type == 'Pig' or type == 'Hive': job.mains = [job_binary] job.libs = None - if type in ['MapReduce', 'Jar']: + if type in ['MapReduce', 'Jar', 'Java']: job.libs = [job_binary] job.mains = None return job @@ -269,7 +306,7 @@ def _create_job_binary(id, type): binary.url = "savanna-db://42" if type == "Pig": binary.name = "script.pig" - if type in ['MapReduce', 'Jar']: + if type in ['MapReduce', 'Jar', 'Java']: binary.name = "main.jar" if type == "Hive": binary.name = "script.q" @@ -286,8 +323,11 @@ def _create_data_source(url): return data_source -def _create_job_exec(job_id, configs=None): +def _create_job_exec(job_id, type, configs=None): j_exec = mock.Mock() j_exec.job_id = job_id 
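# Illustration, not part of the patch: a Java job has no input/output
# DataSource rows to lift swift credentials from, so (as the comment in
# test_build_workflow_for_job_java notes) the caller supplies them through
# job_configs whenever the args point at swift paths.  The config keys and the
# dummy credentials mirror this test; the swift paths are illustrative.
job_configs = {
    'configs': {
        'fs.swift.service.savanna.username': 'admin',
        'fs.swift.service.savanna.password': 'admin1',
    },
    'args': ['swift://ex.savanna/i', 'swift://ex.savanna/o'],
}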
j_exec.job_configs = configs + if type == "Java": + j_exec.main_class = _java_main_class + j_exec.java_opts = _java_opts return j_exec diff --git a/savanna/tests/unit/service/edp/workflow_creator/test_create_workflow.py b/savanna/tests/unit/service/edp/workflow_creator/test_create_workflow.py index 30882066..22b9762f 100644 --- a/savanna/tests/unit/service/edp/workflow_creator/test_create_workflow.py +++ b/savanna/tests/unit/service/edp/workflow_creator/test_create_workflow.py @@ -16,6 +16,7 @@ import unittest2 from savanna.service.edp.workflow_creator import hive_workflow as hw +from savanna.service.edp.workflow_creator import java_workflow as jw from savanna.service.edp.workflow_creator import mapreduce_workflow as mrw from savanna.service.edp.workflow_creator import pig_workflow as pw from savanna.utils import patches as p @@ -143,3 +144,46 @@ class TestPigWorkflowCreator(unittest2.TestCase): """ self.assertIn(hive_action, res) + + def test_create_java_workflow(self): + java_workflow = jw.JavaWorkflowCreator() + main_class = 'org.apache.hadoop.examples.SomeClass' + args = ['/user/hadoop/input', + '/user/hadoop/output'] + java_opts = '-Dparam1=val1 -Dparam2=val2' + + java_workflow.build_workflow_xml(main_class, + self.prepare, + self.job_xml, self.configuration, + java_opts, args, + self.files, self.archives) + res = java_workflow.get_built_workflow_xml() + java_action = """ + ${jobTracker} + ${nameNode} + + + + + + job_xml.xml + + + conf_param_1 + conf_value_1 + + + conf_param_2 + conf_value_3 + + + org.apache.hadoop.examples.SomeClass + -Dparam1=val1 -Dparam2=val2 + /user/hadoop/input + /user/hadoop/output + file1 + file2 + arch1 + """ + + self.assertIn(java_action, res) diff --git a/savanna/tests/unit/service/validation/edp/test_job.py b/savanna/tests/unit/service/validation/edp/test_job.py index cecff990..f2ed65f8 100644 --- a/savanna/tests/unit/service/validation/edp/test_job.py +++ b/savanna/tests/unit/service/validation/edp/test_job.py @@ -24,7 +24,7 @@ class TestJobValidation(u.ValidationTestCase): self.scheme = j.JOB_SCHEMA def test_empty_mains_and_libs(self): - for job_type in ['MapReduce', 'Jar']: + for job_type in ['MapReduce', 'Java', 'Jar']: self._assert_create_object_validation( data={ "name": "jar.jar", @@ -51,7 +51,7 @@ class TestJobValidation(u.ValidationTestCase): "Hive flow requires main script")) def test_overlap_libs(self): - for job_type in ['MapReduce', 'Jar']: + for job_type in ['MapReduce', 'Java', 'Jar']: self._assert_create_object_validation( data={ "name": "jar.jar",
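# Illustration, not part of the patch: the end-to-end path for a Java job after
# this change, stitched together from the hunks above (the hand-off from
# context.spawn() to run_job() is implied rather than shown in these hunks):
#
#   job_execute() in api/v11.py
#     -> api.execute_job(job_id, data)         payload validated via JAVA_EXEC_SCHEMA
#     -> conductor.job_execution_create(...)   stores main_class / java_opts
#     -> context.spawn(...) ... run_job()      skips data sources for 'Java'
#     -> workflow_factory.get_creator(job)     returns a JavaFactory
#     -> JavaFactory.get_workflow_xml(exec)    args replace defaults, configs merge
#     -> JavaWorkflowCreator.build_workflow_xml(main_class, ...)
#        builds the Oozie <java> action that is uploaded to HDFS and executed.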