Add Oozie Java action workflows

Change-Id: I6559f75deb7d238c8ab4e2df2f0483f100143b93
Implements: blueprint edp-oozie-java-action
Trevor McKay 2014-01-02 14:28:19 -05:00
parent fc3cf8bcf0
commit afe8da21f9
12 changed files with 292 additions and 47 deletions


@@ -35,13 +35,7 @@ rest = u.Rest('v11', __name__)
@v.check_exists(api.get_job, id='job_id')
@v.validate(v_j_e.JOB_EXEC_SCHEMA, v_j_e.check_job_executor)
def job_execute(job_id, data):
input = data['input_id']
output = data['output_id']
cluster = data['cluster_id']
configs = data.get('job_configs', {})
return u.render(job_execution=api.execute_job(job_id, input,
output, cluster,
configs).to_dict())
return u.render(job_execution=api.execute_job(job_id, data).to_dict())
@rest.get('/jobs/config-hints/<job_type>')


@@ -265,6 +265,8 @@ class JobExecution(mb.SavannaBase):
oozie_job_id = sa.Column(sa.String(100))
return_code = sa.Column(sa.String(80))
job_configs = sa.Column(st.JsonDictType())
main_class = sa.Column(sa.String)
java_opts = sa.Column(sa.String)
mains_association = sa.Table("mains_association",


@@ -29,10 +29,28 @@ def get_job_config_hints(job_type):
return w_f.get_possible_job_config(job_type)
def execute_job(job_id, input_id, output_id, cluster_id, configs):
job_ex_dict = {'input_id': input_id, 'output_id': output_id,
def execute_job(job_id, data):
# Elements common to all job types
cluster_id = data['cluster_id']
configs = data.get('job_configs', {})
# Not in Java job types but present for all others
input_id = data.get('input_id', None)
output_id = data.get('output_id', None)
# Present for Java job types
main_class = data.get('main_class', '')
java_opts = data.get('java_opts', '')
# Since we will use a unified class in the database, we pass
# a superset for all job types
job_ex_dict = {'main_class': main_class,
'java_opts': java_opts,
'input_id': input_id, 'output_id': output_id,
'job_id': job_id, 'cluster_id': cluster_id,
'info': {'status': 'Pending'}, 'job_configs': configs}
job_execution = conductor.job_execution_create(context.ctx(), job_ex_dict)
context.spawn("Starting Job Execution %s" % job_execution.id,
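To make the unified record concrete, here is a hedged sketch of the superset dict a Java execution produces: the data-source ids stay None while the Java-only fields carry the class and JVM options. The class and option strings below match the constants used in this commit's tests; the id values are placeholders, not real UUIDs.

# Illustrative only -- field names follow the code above.
java_job_ex_dict = {'main_class': 'org.apache.hadoop.examples.WordCount',
                    'java_opts': '-Dparam1=val1 -Dparam2=val2',
                    'input_id': None, 'output_id': None,
                    'job_id': '<job-uuid>', 'cluster_id': '<cluster-uuid>',
                    'info': {'status': 'Pending'},
                    'job_configs': {'configs': {}, 'args': []}}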


@@ -107,9 +107,12 @@ def run_job(job_execution):
return job_execution
job = conductor.job_get(ctx, job_execution.job_id)
if job.type != 'Java':
input_source = conductor.data_source_get(ctx, job_execution.input_id)
output_source = conductor.data_source_get(ctx, job_execution.output_id)
else:
input_source = None
output_source = None
#TODO(nprivalova): should be removed after all features implemented
validate(input_source, output_source, job)
@@ -124,8 +127,9 @@ def run_job(job_execution):
# uploading hive configuration
creator.configure_workflow_if_needed(cluster, wf_dir)
wf_xml = creator.get_workflow_xml(job_execution.job_configs,
input_source, output_source)
wf_xml = creator.get_workflow_xml(job_execution,
input_source,
output_source)
path_to_workflow = upload_workflow_file(u.get_jobtracker(cluster),
wf_dir, wf_xml, hdfs_user)
@@ -205,7 +209,8 @@ def _append_slash_if_needed(path):
#TODO(nprivalova): this validation should be removed after implementing
# all features
def validate(input_data, output_data, job):
if input_data.type != 'swift' or output_data.type != 'swift':
if (input_data and input_data.type != 'swift') or\
(output_data and output_data.type != 'swift'):
raise RuntimeError
if job.type not in ['Pig', 'MapReduce', 'Hive', 'Jar']:
if job.type not in ['Pig', 'MapReduce', 'Hive', 'Java', 'Jar']:
raise RuntimeError


@@ -0,0 +1,51 @@
# Copyright (c) 2013 RedHat Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from savanna.service.edp.workflow_creator import base_workflow
from savanna.utils import xmlutils as x
class JavaWorkflowCreator(base_workflow.OozieWorkflowCreator):
def __init__(self):
super(JavaWorkflowCreator, self).__init__('java')
def build_workflow_xml(self, main_class,
prepare={},
job_xml=None,
configuration=None,
java_opts=None,
arguments=[],
files=[], archives=[]):
for k, v in prepare.items():
self._add_to_prepare_element(k, v)
self._add_job_xml_element(job_xml)
self._add_configuration_elements(configuration)
x.add_text_element_to_tag(self.doc, self.tag_name,
'main-class', main_class)
if java_opts:
x.add_text_element_to_tag(self.doc, self.tag_name,
'java-opts', java_opts)
for arg in arguments:
x.add_text_element_to_tag(self.doc, self.tag_name,
'arg', arg)
self._add_files_and_archives(files, archives)
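A minimal usage sketch for this creator, mirroring the unit test added later in this commit. The main class, swift config key, options and paths are illustrative values, not part of the module.

from savanna.service.edp.workflow_creator import java_workflow

creator = java_workflow.JavaWorkflowCreator()
creator.build_workflow_xml('org.apache.hadoop.examples.WordCount',
                           configuration={'fs.swift.service.savanna.username':
                                          'admin'},
                           java_opts='-Dparam1=val1 -Dparam2=val2',
                           arguments=['swift://container.savanna/input',
                                      'swift://container.savanna/output'])
# Produces a <java> action with <main-class>, <java-opts> and <arg> elements.
print(creator.get_built_workflow_xml())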


@@ -21,6 +21,7 @@ from savanna.plugins import base as plugin_base
from savanna.plugins.general import utils as u
from savanna.service.edp import hdfs_helper as h
from savanna.service.edp.workflow_creator import hive_workflow
from savanna.service.edp.workflow_creator import java_workflow
from savanna.service.edp.workflow_creator import mapreduce_workflow
from savanna.service.edp.workflow_creator import pig_workflow
from savanna.utils import remote
@@ -69,11 +70,11 @@ class PigFactory(BaseFactory):
def get_script_name(self, job):
return conductor.job_main_name(context.ctx(), job)
def get_workflow_xml(self, execution_configs, input_data, output_data):
def get_workflow_xml(self, execution, input_data, output_data):
configs = {'configs': self.get_configs(input_data, output_data),
'params': self.get_params(input_data, output_data),
'args': self.get_args()}
self.update_configs(configs, execution_configs)
self.update_configs(configs, execution.job_configs)
creator = pig_workflow.PigWorkflowCreator()
creator.build_workflow_xml(self.name,
configuration=configs['configs'],
@@ -92,10 +93,10 @@ class HiveFactory(BaseFactory):
def get_script_name(self, job):
return conductor.job_main_name(context.ctx(), job)
def get_workflow_xml(self, execution_configs, input_data, output_data):
def get_workflow_xml(self, execution, input_data, output_data):
configs = {'configs': self.get_configs(input_data, output_data),
'params': self.get_params(input_data, output_data)}
self.update_configs(configs, execution_configs)
self.update_configs(configs, execution.job_configs)
creator = hive_workflow.HiveWorkflowCreator()
creator.build_workflow_xml(self.name,
self.job_xml,
@@ -120,14 +121,40 @@ class MapReduceFactory(BaseFactory):
configs['mapred.output.dir'] = output_data.url
return configs
def get_workflow_xml(self, execution_configs, input_data, output_data):
def get_workflow_xml(self, execution, input_data, output_data):
configs = {'configs': self.get_configs(input_data, output_data)}
self.update_configs(configs, execution_configs)
self.update_configs(configs, execution.job_configs)
creator = mapreduce_workflow.MapReduceWorkFlowCreator()
creator.build_workflow_xml(configuration=configs['configs'])
return creator.get_built_workflow_xml()
class JavaFactory(BaseFactory):
def get_workflow_xml(self, execution, *args, **kwargs):
# Input and output paths are passed as plain args for Java actions,
# so we cannot tell here whether the swift configs should be included.
configs = {'configs': {}}
self.update_configs(configs, execution.job_configs)
# update() is not supported for list types, and since args are
# positional (not named) merging does not make sense anyway;
# any default args are simply replaced
configs['args'] = execution.job_configs.get('args', [])
if hasattr(execution, 'java_opts'):
java_opts = execution.java_opts
else:
java_opts = ""
creator = java_workflow.JavaWorkflowCreator()
creator.build_workflow_xml(execution.main_class,
configuration=configs['configs'],
java_opts=java_opts,
arguments=configs['args'])
return creator.get_built_workflow_xml()
def get_creator(job):
def make_PigFactory():
@@ -140,6 +167,7 @@ def get_creator(job):
MapReduceFactory,
make_HiveFactory,
make_PigFactory,
JavaFactory,
# Keep 'Jar' as a synonym for 'MapReduce'
MapReduceFactory,
]
@@ -151,6 +179,10 @@ def get_creator(job):
def get_possible_job_config(job_type):
if job_type not in get_possible_job_types():
return None
if job_type == "Java":
return {'job_config': {'configs': [], 'args': []}}
if job_type in ['MapReduce', 'Pig', 'Jar']:
#TODO(nmakhotkin) Savanna should return config based on specific plugin
cfg = xmlutils.load_hadoop_xml_defaults(
@@ -163,7 +195,7 @@ def get_possible_job_config(job_type):
cfg = xmlutils.load_hadoop_xml_defaults(
'plugins/vanilla/resources/hive-default.xml')
config = {'configs': cfg, "args": {}}
if job_type not in ['MapReduce', 'Jar']:
if job_type not in ['MapReduce', 'Jar', 'Java']:
config.update({'params': {}})
return {'job_config': config}
@@ -173,5 +205,6 @@ def get_possible_job_types():
'MapReduce',
'Hive',
'Pig',
'Java',
'Jar',
]
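For context, a hedged, test-style sketch of the factory path for a Java job. It leans on mock the same way the unit tests in this commit do; the exact attributes set on the mocks are assumptions drawn from the diff, and the call pattern is illustrative rather than the module's documented API.

import mock

from savanna.service.edp import workflow_factory

job = mock.Mock()
job.type = 'Java'
creator = workflow_factory.get_creator(job)   # expected: a JavaFactory

job_exec = mock.Mock()
job_exec.main_class = 'org.apache.hadoop.examples.WordCount'
job_exec.java_opts = ''
job_exec.job_configs = {'configs': {}, 'args': ['input_path', 'output_path']}

# Java jobs take no input/output data sources
print(creator.get_workflow_xml(job_exec))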


@@ -42,6 +42,23 @@ job_configs = {
}
java_job_configs = {
"type": "object",
"properties": {
"configs": {
"type": "simple_config",
},
"args": {
"type": "array",
"items": {
"type": "string",
}
}
},
"additionalProperties": False,
}
def check_data_source_unique_name(name):
if name in [ds.name for ds in api.get_data_sources()]:
raise ex.NameAlreadyExistsException("Data source with name '%s' "
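The java_job_configs schema above allows a flat dict of configs plus a positional argument list. A hedged example of a payload that fits it follows; the swift key names are taken from the tests in this commit, and the paths are placeholders. Note that for Java actions any swift credentials must be supplied here, since there are no data sources to lift them from.

java_job_configs_example = {
    "configs": {
        "fs.swift.service.savanna.username": "admin",
        "fs.swift.service.savanna.password": "admin1",
    },
    "args": ["swift://container.savanna/input",
             "swift://container.savanna/output"],
}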


@@ -35,6 +35,7 @@ JOB_SCHEMA = {
"Pig",
"Hive",
"MapReduce",
"Java",
# Leave this here for validation of create_job,
# but it will be changed to MapReduce on creation
"Jar",


@@ -13,11 +13,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import savanna.exceptions as ex
from savanna.service.edp import api
import savanna.service.validations.base as main_base
import savanna.service.validations.edp.base as b
JOB_EXEC_SCHEMA = {
MR_EXEC_SCHEMA = {
"type": "object",
"properties": {
"input_id": {
@@ -38,12 +40,50 @@ JOB_EXEC_SCHEMA = {
"required": [
"input_id",
"output_id",
"cluster_id",
"cluster_id"
]
}
def check_job_executor(data, **kwargs):
JAVA_EXEC_SCHEMA = {
"type": "object",
"properties": {
"main_class": {
"type": "string",
},
"java_opts": {
"type": "string",
},
"cluster_id": {
"type": "string",
"format": "uuid",
},
"job_configs": b.java_job_configs,
},
"additionalProperties": False,
"required": [
"cluster_id",
"main_class",
]
}
JOB_EXEC_SCHEMA = {
"oneOf": [MR_EXEC_SCHEMA, JAVA_EXEC_SCHEMA]
}
def check_job_executor(data, job_id):
job = api.get_job(job_id)
# Make sure we have the right schema for the job type
# We can identify the Java action schema by looking for 'main_class'
if ('main_class' in data) ^ (job.type == 'Java'):
raise ex.InvalidException("Schema is not valid for job type %s"
% job.type)
if 'input_id' in data:
b.check_data_source_exists(data['input_id'])
b.check_data_source_exists(data['output_id'])
main_base.check_cluster_exists(data['cluster_id'])
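To show how the oneOf splits in practice, here are hedged examples of request bodies matching each branch; the ids, config key and argument values are placeholders. The MapReduce/Pig/Hive form carries data-source ids, while the Java form carries main_class and, optionally, java_opts.

mr_exec_request = {
    "cluster_id": "<cluster-uuid>",
    "input_id": "<data-source-uuid>",
    "output_id": "<data-source-uuid>",
    "job_configs": {"configs": {"mapred.reduce.tasks": "1"}},
}

java_exec_request = {
    "cluster_id": "<cluster-uuid>",
    "main_class": "org.apache.hadoop.examples.WordCount",
    "java_opts": "-Dparam1=val1 -Dparam2=val2",
    "job_configs": {"configs": {}, "args": ["input_path", "output_path"]},
}

check_job_executor then uses the presence of main_class to confirm that the payload shape matches the stored job's type.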


@@ -25,6 +25,9 @@ from savanna.utils import patches as p
conductor = cond.API
_java_main_class = "org.apache.hadoop.examples.WordCount"
_java_opts = "-Dparam1=val1 -Dparam2=val2"
def _resource_passthrough(*args, **kwargs):
return True
@@ -105,7 +108,7 @@ class TestJobManager(models_test_base.DbTestCase):
creator = workflow_factory.get_creator(job)
res = creator.get_workflow_xml(job_exec.job_configs,
res = creator.get_workflow_xml(job_exec,
input_data, output_data)
self.assertIn("""
@@ -134,7 +137,7 @@ class TestJobManager(models_test_base.DbTestCase):
creator = workflow_factory.get_creator(job)
res = creator.get_workflow_xml(job_exec.job_configs,
res = creator.get_workflow_xml(job_exec,
input_data, output_data)
self.assertIn("""
@@ -161,19 +164,45 @@ class TestJobManager(models_test_base.DbTestCase):
<value>admin</value>
</property>""", res)
def test_jar_creator_is_mapreduce(self):
# Ensure that we get the MapReduce workflow factory for 'Jar' jobs
job, _ = _create_all_stack('Jar')
creator = workflow_factory.get_creator(job)
self.assertEqual(type(creator), workflow_factory.MapReduceFactory)
def test_build_workflow_for_job_mapreduce(self):
self._build_workflow_common('MapReduce')
def test_build_workflow_for_job_jar(self):
self._build_workflow_common('Jar')
def test_build_workflow_for_job_java(self):
# If args include swift paths, user and password values
# will have to be supplied via configs instead of being
# lifted from input or output data sources
configs = {workflow_factory.swift_username: 'admin',
workflow_factory.swift_password: 'admin1'}
configs = {
'configs': configs,
'args': ['input_path',
'output_path']
}
job, job_exec = _create_all_stack('Java', configs)
creator = workflow_factory.get_creator(job)
res = creator.get_workflow_xml(job_exec)
self.assertIn("""
<configuration>
<property>
<name>fs.swift.service.savanna.password</name>
<value>admin1</value>
</property>
<property>
<name>fs.swift.service.savanna.username</name>
<value>admin</value>
</property>
</configuration>
<main-class>%s</main-class>
<java-opts>%s</java-opts>
<arg>input_path</arg>
<arg>output_path</arg>""" % (_java_main_class, _java_opts), res)
@mock.patch('savanna.conductor.API.job_binary_get')
def test_build_workflow_for_job_hive(self, job_binary):
@@ -185,7 +214,7 @@ class TestJobManager(models_test_base.DbTestCase):
creator = workflow_factory.get_creator(job)
res = creator.get_workflow_xml(job_exec.job_configs,
res = creator.get_workflow_xml(job_exec,
input_data, output_data)
self.assertIn("""
@@ -210,11 +239,12 @@ class TestJobManager(models_test_base.DbTestCase):
input_data = _create_data_source('swift://ex.savanna/i')
output_data = _create_data_source('swift://ex.savanna/o')
job_exec = _create_job_exec(job.id, configs={"configs": {'c': 'f'}})
job_exec = _create_job_exec(job.id,
job_type, configs={"configs": {'c': 'f'}})
creator = workflow_factory.get_creator(job)
res = creator.get_workflow_xml(job_exec.job_configs,
res = creator.get_workflow_xml(job_exec,
input_data, output_data)
self.assertIn("""
@@ -241,11 +271,18 @@ class TestJobManager(models_test_base.DbTestCase):
def test_build_workflow_for_job_jar_with_conf(self):
self._build_workflow_with_conf_common('Jar')
def test_jar_creator_is_mapreduce(self):
# Ensure that we get the MapReduce workflow factory for 'Jar' jobs
job, _ = _create_all_stack('Jar')
creator = workflow_factory.get_creator(job)
self.assertEqual(type(creator), workflow_factory.MapReduceFactory)
def _create_all_stack(type):
def _create_all_stack(type, configs=None):
b = _create_job_binary('1', type)
j = _create_job('2', b, type)
e = _create_job_exec(j.id)
e = _create_job_exec(j.id, type, configs)
return j, e
@@ -257,7 +294,7 @@ def _create_job(id, job_binary, type):
if type == 'Pig' or type == 'Hive':
job.mains = [job_binary]
job.libs = None
if type in ['MapReduce', 'Jar']:
if type in ['MapReduce', 'Jar', 'Java']:
job.libs = [job_binary]
job.mains = None
return job
@@ -269,7 +306,7 @@ def _create_job_binary(id, type):
binary.url = "savanna-db://42"
if type == "Pig":
binary.name = "script.pig"
if type in ['MapReduce', 'Jar']:
if type in ['MapReduce', 'Jar', 'Java']:
binary.name = "main.jar"
if type == "Hive":
binary.name = "script.q"
@@ -286,8 +323,11 @@ def _create_data_source(url):
return data_source
def _create_job_exec(job_id, configs=None):
def _create_job_exec(job_id, type, configs=None):
j_exec = mock.Mock()
j_exec.job_id = job_id
j_exec.job_configs = configs
if type == "Java":
j_exec.main_class = _java_main_class
j_exec.java_opts = _java_opts
return j_exec


@@ -16,6 +16,7 @@
import unittest2
from savanna.service.edp.workflow_creator import hive_workflow as hw
from savanna.service.edp.workflow_creator import java_workflow as jw
from savanna.service.edp.workflow_creator import mapreduce_workflow as mrw
from savanna.service.edp.workflow_creator import pig_workflow as pw
from savanna.utils import patches as p
@@ -143,3 +144,46 @@ class TestPigWorkflowCreator(unittest2.TestCase):
</hive>"""
self.assertIn(hive_action, res)
def test_create_java_workflow(self):
java_workflow = jw.JavaWorkflowCreator()
main_class = 'org.apache.hadoop.examples.SomeClass'
args = ['/user/hadoop/input',
'/user/hadoop/output']
java_opts = '-Dparam1=val1 -Dparam2=val2'
java_workflow.build_workflow_xml(main_class,
self.prepare,
self.job_xml, self.configuration,
java_opts, args,
self.files, self.archives)
res = java_workflow.get_built_workflow_xml()
java_action = """
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<prepare>
<mkdir path="mkdir_1"/>
<delete path="delete_dir_1"/>
<delete path="delete_dir_2"/>
</prepare>
<job-xml>job_xml.xml</job-xml>
<configuration>
<property>
<name>conf_param_1</name>
<value>conf_value_1</value>
</property>
<property>
<name>conf_param_2</name>
<value>conf_value_3</value>
</property>
</configuration>
<main-class>org.apache.hadoop.examples.SomeClass</main-class>
<java-opts>-Dparam1=val1 -Dparam2=val2</java-opts>
<arg>/user/hadoop/input</arg>
<arg>/user/hadoop/output</arg>
<file>file1</file>
<file>file2</file>
<archive>arch1</archive>
</java>"""
self.assertIn(java_action, res)


@@ -24,7 +24,7 @@ class TestJobValidation(u.ValidationTestCase):
self.scheme = j.JOB_SCHEMA
def test_empty_mains_and_libs(self):
for job_type in ['MapReduce', 'Jar']:
for job_type in ['MapReduce', 'Java', 'Jar']:
self._assert_create_object_validation(
data={
"name": "jar.jar",
@@ -51,7 +51,7 @@ class TestJobValidation(u.ValidationTestCase):
"Hive flow requires main script"))
def test_overlap_libs(self):
for job_type in ['MapReduce', 'Jar']:
for job_type in ['MapReduce', 'Java', 'Jar']:
self._assert_create_object_validation(
data={
"name": "jar.jar",