Config parameters beginning with "oozie." should be in the job properties file
In the Oozie EDP engine, look for configs beginning with 'oozie.' and pass them to _get_oozie_job_params() as additional values to be written to the job properties file. Do not allow the workflow application path (oozie.wf.application.path) to be overwritten, since it is generated by EDP. Prevent configs beginning with 'oozie.' from being written to the workflow.xml file in workflow_factory.py. Closes-Bug: 1419923 Change-Id: I75b60e5bc3d1afadac7c2b209e3ea68e4ba9e88b
This commit is contained in:
@@ -49,15 +49,23 @@ class OozieJobEngine(base_engine.JobEngine):
|
||||
return o.OozieClient(self.get_oozie_server_uri(self.cluster),
|
||||
self.get_oozie_server(self.cluster))
|
||||
|
||||
def _get_oozie_job_params(self, hdfs_user, path_to_workflow):
|
||||
def _get_oozie_job_params(self, hdfs_user, path_to_workflow, oozie_params):
|
||||
app_path = "oozie.wf.application.path"
|
||||
rm_path = self.get_resource_manager_uri(self.cluster)
|
||||
nn_path = self.get_name_node_uri(self.cluster)
|
||||
job_parameters = {
|
||||
"jobTracker": rm_path,
|
||||
"nameNode": nn_path,
|
||||
"user.name": hdfs_user,
|
||||
"oozie.wf.application.path": "%s%s" % (nn_path, path_to_workflow),
|
||||
app_path: "%s%s" % (nn_path, path_to_workflow),
|
||||
"oozie.use.system.libpath": "true"}
|
||||
|
||||
# Don't let the application path be overwritten, that can't
|
||||
# possibly make any sense
|
||||
if app_path in oozie_params:
|
||||
del oozie_params[app_path]
|
||||
|
||||
job_parameters.update(oozie_params)
|
||||
return job_parameters
|
||||
|
||||
def _upload_workflow_file(self, where, job_dir, wf_xml, hdfs_user):
|
||||
@@ -95,6 +103,14 @@ class OozieJobEngine(base_engine.JobEngine):
|
||||
proxy_configs = updated_job_configs.get('proxy_configs')
|
||||
configs = updated_job_configs.get('configs', {})
|
||||
|
||||
# Extract all the 'oozie.' configs so that they can be set in the
|
||||
# job properties file. These are config values for Oozie itself,
|
||||
# not the job code
|
||||
oozie_params = {}
|
||||
for k in list(configs):
|
||||
if k.startswith('oozie.'):
|
||||
oozie_params[k] = configs[k]
|
||||
|
||||
for data_source in [input_source, output_source] + additional_sources:
|
||||
if data_source and data_source.type == 'hdfs':
|
||||
h.configure_cluster_for_hdfs(self.cluster, data_source)
|
||||
@@ -119,7 +135,8 @@ class OozieJobEngine(base_engine.JobEngine):
|
||||
wf_xml, hdfs_user)
|
||||
|
||||
job_params = self._get_oozie_job_params(hdfs_user,
|
||||
path_to_workflow)
|
||||
path_to_workflow,
|
||||
oozie_params)
|
||||
|
||||
client = self._get_client()
|
||||
oozie_job_id = client.add_job(x.create_hadoop_xml(job_params),
|
||||
|
||||
@@ -40,7 +40,8 @@ class BaseFactory(object):
|
||||
for k, v in six.iteritems(job_dict['configs']):
|
||||
if k.startswith('edp.'):
|
||||
edp_configs[k] = v
|
||||
else:
|
||||
elif not k.startswith('oozie.'):
|
||||
# 'oozie.' configs have been written to the properties file
|
||||
configs[k] = v
|
||||
return configs, edp_configs
|
||||
|
||||
|
||||
@@ -40,10 +40,21 @@ class TestOozieEngine(base.SaharaTestCase):
|
||||
|
||||
def test_get_oozie_job_params(self):
|
||||
oje = FakeOozieJobEngine(u.create_cluster())
|
||||
job_params = oje._get_oozie_job_params('hadoop', '/tmp')
|
||||
oozie_params = {'oozie.libpath': '/mylibpath',
|
||||
'oozie.wf.application.path': '/wrong'}
|
||||
job_params = oje._get_oozie_job_params('hadoop',
|
||||
'/tmp', oozie_params)
|
||||
self.assertEqual('http://localhost:50030', job_params["jobTracker"])
|
||||
self.assertEqual('hdfs://localhost:8020', job_params["nameNode"])
|
||||
self.assertEqual('hadoop', job_params["user.name"])
|
||||
self.assertEqual('hdfs://localhost:8020/tmp',
|
||||
job_params['oozie.wf.application.path'])
|
||||
self.assertEqual('/mylibpath', job_params['oozie.libpath'])
|
||||
|
||||
# Make sure this doesn't raise an exception
|
||||
job_params = oje._get_oozie_job_params('hadoop',
|
||||
'/tmp', {})
|
||||
self.assertNotIn('oozie.libpath', job_params)
|
||||
|
||||
@mock.patch('sahara.utils.remote.get_remote')
|
||||
@mock.patch('sahara.utils.ssh_remote.InstanceInteropHelper')
|
||||
|
||||
Reference in New Issue
Block a user