Merge "Minor EDP refactoring"

Jenkins 2014-06-16 06:56:24 +00:00 committed by Gerrit Code Review
commit 8d2c8f96ee
3 changed files with 130 additions and 83 deletions

View File

@@ -82,23 +82,16 @@ def update_job_statuses():
                           (je.id, e))
 
 
-def _get_hdfs_user(cluster):
-    plugin = plugin_base.PLUGINS.get_plugin(cluster.plugin_name)
-    hdfs_user = plugin.get_hdfs_user()
-    return hdfs_user
+def _get_plugin(cluster):
+    return plugin_base.PLUGINS.get_plugin(cluster.plugin_name)
 
 
 def _create_oozie_client(cluster):
-    plugin = plugin_base.PLUGINS.get_plugin(cluster.plugin_name)
+    plugin = _get_plugin(cluster)
     return o.OozieClient(plugin.get_oozie_server_uri(cluster),
                          plugin.get_oozie_server(cluster))
 
 
-def _get_oozie_server(cluster):
-    plugin = plugin_base.PLUGINS.get_plugin(cluster.plugin_name)
-    return plugin.get_oozie_server(cluster)
-
-
 def cancel_job(job_execution_id):
     ctx = context.ctx()
     job_execution = conductor.job_execution_get(ctx, job_execution_id)
@@ -116,6 +109,41 @@ def cancel_job(job_execution_id):
     return job_execution
 
 
+def _update_job_execution_extra(job_execution, cluster):
+    if CONF.use_namespaces and not CONF.use_floating_ips:
+        oozie = _get_plugin(cluster).get_oozie_server(cluster)
+        info = oozie.remote().get_neutron_info()
+        extra = job_execution.extra.copy()
+        extra['neutron'] = info
+
+        job_execution = conductor.job_execution_update(
+            context.ctx(), job_execution.id, {'extra': extra})
+    return job_execution
+
+
+def _get_data_sources(job_execution, job):
+    if edp.compare_job_type(job.type, edp.JOB_TYPE_JAVA):
+        return None, None
+
+    ctx = context.ctx()
+    input_source = conductor.data_source_get(ctx, job_execution.input_id)
+    output_source = conductor.data_source_get(ctx, job_execution.output_id)
+    return input_source, output_source
+
+
+def _get_oozie_job_params(cluster, hdfs_user, path_to_workflow):
+    plugin = _get_plugin(cluster)
+    rm_path = plugin.get_resource_manager_uri(cluster)
+    nn_path = plugin.get_name_node_uri(cluster)
+    job_parameters = {
+        "jobTracker": rm_path,
+        "nameNode": nn_path,
+        "user.name": hdfs_user,
+        "oozie.wf.application.path": "%s%s" % (nn_path, path_to_workflow),
+        "oozie.use.system.libpath": "true"}
+    return job_parameters
+
+
 def run_job(job_execution_id):
     try:
         _run_job(job_execution_id)
@@ -133,74 +161,45 @@ def run_job(job_execution_id):
 def _run_job(job_execution_id):
     ctx = context.ctx()
-    job_execution = conductor.job_execution_get(ctx,
-                                                job_execution_id)
+    job_execution = conductor.job_execution_get(ctx, job_execution_id)
     cluster = conductor.cluster_get(ctx, job_execution.cluster_id)
     if cluster.status != 'Active':
         return
 
-    if CONF.use_namespaces and not CONF.use_floating_ips:
-        plugin = plugin_base.PLUGINS.get_plugin(cluster.plugin_name)
-        oozie = plugin.get_oozie_server(cluster)
-        info = oozie.remote().get_neutron_info()
-        extra = job_execution.extra.copy()
-        extra['neutron'] = info
-
-        job_execution = conductor.job_execution_update(ctx,
-                                                       job_execution_id,
-                                                       {'extra': extra})
+    job_execution = _update_job_execution_extra(job_execution, cluster)
 
     job = conductor.job_get(ctx, job_execution.job_id)
-    if not edp.compare_job_type(job.type, edp.JOB_TYPE_JAVA):
-        input_source = conductor.data_source_get(ctx, job_execution.input_id)
-        output_source = conductor.data_source_get(ctx, job_execution.output_id)
-    else:
-        input_source = None
-        output_source = None
+    input_source, output_source = _get_data_sources(job_execution, job)
 
     for data_source in [input_source, output_source]:
         if data_source and data_source.type == 'hdfs':
             h.configure_cluster_for_hdfs(cluster, data_source)
 
-    hdfs_user = _get_hdfs_user(cluster)
-    oozie_server = _get_oozie_server(cluster)
+    plugin = _get_plugin(cluster)
+    hdfs_user = plugin.get_hdfs_user()
+    oozie_server = plugin.get_oozie_server(cluster)
 
     wf_dir = create_workflow_dir(oozie_server, job, hdfs_user)
     upload_job_files(oozie_server, wf_dir, job, hdfs_user)
 
-    creator = workflow_factory.get_creator(job)
-
-    wf_xml = creator.get_workflow_xml(cluster, job_execution,
-                                      input_source, output_source)
+    wf_xml = workflow_factory.get_workflow_xml(
+        job, cluster, job_execution, input_source, output_source)
 
     path_to_workflow = upload_workflow_file(oozie_server,
                                             wf_dir, wf_xml, hdfs_user)
 
-    plugin = plugin_base.PLUGINS.get_plugin(cluster.plugin_name)
-    rm_path = plugin.get_resource_manager_uri(cluster)
-    nn_path = plugin.get_name_node_uri(cluster)
-
     client = _create_oozie_client(cluster)
-    job_parameters = {"jobTracker": rm_path,
-                      "nameNode": nn_path,
-                      "user.name": hdfs_user,
-                      "oozie.wf.application.path":
-                      "%s%s" % (nn_path, path_to_workflow),
-                      "oozie.use.system.libpath": "true"}
-
-    oozie_job_id = client.add_job(x.create_hadoop_xml(job_parameters),
+    job_params = _get_oozie_job_params(cluster, hdfs_user, path_to_workflow)
+    oozie_job_id = client.add_job(x.create_hadoop_xml(job_params),
                                   job_execution)
-    job_execution = conductor.job_execution_update(ctx, job_execution,
-                                                   {'oozie_job_id':
-                                                    oozie_job_id,
-                                                    'start_time':
-                                                    datetime.datetime.now()})
+    job_execution = conductor.job_execution_update(
+        ctx, job_execution, {'oozie_job_id': oozie_job_id,
+                             'start_time': datetime.datetime.now()})
     client.run_job(job_execution, oozie_job_id)
 
 
 def upload_job_files(where, job_dir, job, hdfs_user):
     mains = job.mains or []
     libs = job.libs or []
     uploaded_paths = []

View File

@@ -208,7 +208,7 @@ class JavaFactory(BaseFactory):
         return creator.get_built_workflow_xml()
 
 
-def get_creator(job):
+def _get_creator(job):
     def make_PigFactory():
         return PigFactory(job)
@@ -227,6 +227,11 @@ def get_creator(job):
     return type_map[job.type]()
 
 
+def get_workflow_xml(job, cluster, execution, *args, **kwargs):
+    return _get_creator(job).get_workflow_xml(
+        cluster, execution, *args, **kwargs)
+
+
 def get_possible_job_config(job_type):
     if not edp.compare_job_type(job_type, *edp.JOB_TYPES_ALL):
         return None

View File

@@ -101,10 +101,8 @@ class TestJobManager(base.SaharaWithDbTestCase):
         input_data = _create_data_source('swift://ex/i')
         output_data = _create_data_source('swift://ex/o')
 
-        creator = workflow_factory.get_creator(job)
-
-        res = creator.get_workflow_xml(_create_cluster(), job_exec,
-                                       input_data, output_data)
+        res = workflow_factory.get_workflow_xml(
+            job, _create_cluster(), job_exec, input_data, output_data)
 
         self.assertIn("""
 <param>INPUT=swift://ex.sahara/i</param>
@@ -134,9 +132,8 @@ class TestJobManager(base.SaharaWithDbTestCase):
         input_data = _create_data_source('swift://ex/i')
         output_data = _create_data_source('hdfs://user/hadoop/out')
 
-        creator = workflow_factory.get_creator(job)
-        res = creator.get_workflow_xml(_create_cluster(), job_exec,
-                                       input_data, output_data)
+        res = workflow_factory.get_workflow_xml(
+            job, _create_cluster(), job_exec, input_data, output_data)
 
         self.assertIn("""
 <configuration>
@@ -153,10 +150,8 @@ class TestJobManager(base.SaharaWithDbTestCase):
         input_data = _create_data_source('hdfs://user/hadoop/in')
         output_data = _create_data_source('swift://ex/o')
 
-        creator = workflow_factory.get_creator(job)
-
-        res = creator.get_workflow_xml(_create_cluster(), job_exec,
-                                       input_data, output_data)
+        res = workflow_factory.get_workflow_xml(
+            job, _create_cluster(), job_exec, input_data, output_data)
 
         self.assertIn("""
 <configuration>
@@ -175,10 +170,8 @@ class TestJobManager(base.SaharaWithDbTestCase):
         input_data = _create_data_source('hdfs://user/hadoop/in')
         output_data = _create_data_source('hdfs://user/hadoop/out')
 
-        creator = workflow_factory.get_creator(job)
-
-        res = creator.get_workflow_xml(_create_cluster(), job_exec,
-                                       input_data, output_data)
+        res = workflow_factory.get_workflow_xml(
+            job, _create_cluster(), job_exec, input_data, output_data)
 
         self.assertIn("""
 <configuration>
@@ -201,10 +194,8 @@ class TestJobManager(base.SaharaWithDbTestCase):
         input_data = _create_data_source('swift://ex/i')
         output_data = _create_data_source('swift://ex/o')
 
-        creator = workflow_factory.get_creator(job)
-
-        res = creator.get_workflow_xml(_create_cluster(), job_exec,
-                                       input_data, output_data)
+        res = workflow_factory.get_workflow_xml(
+            job, _create_cluster(), job_exec, input_data, output_data)
 
         if streaming:
             self.assertIn("""
@ -255,8 +246,8 @@ class TestJobManager(base.SaharaWithDbTestCase):
} }
job, job_exec = _create_all_stack(edp.JOB_TYPE_JAVA, configs) job, job_exec = _create_all_stack(edp.JOB_TYPE_JAVA, configs)
creator = workflow_factory.get_creator(job) res = workflow_factory.get_workflow_xml(
res = creator.get_workflow_xml(_create_cluster(), job_exec) job, _create_cluster(), job_exec)
self.assertIn(""" self.assertIn("""
<configuration> <configuration>
@@ -283,10 +274,8 @@ class TestJobManager(base.SaharaWithDbTestCase):
         input_data = _create_data_source('swift://ex/i')
         output_data = _create_data_source('swift://ex/o')
 
-        creator = workflow_factory.get_creator(job)
-
-        res = creator.get_workflow_xml(_create_cluster(), job_exec,
-                                       input_data, output_data)
+        res = workflow_factory.get_workflow_xml(
+            job, _create_cluster(), job_exec, input_data, output_data)
 
         self.assertIn("""
 <job-xml>/user/hadoop/conf/hive-site.xml</job-xml>
@@ -313,10 +302,8 @@ class TestJobManager(base.SaharaWithDbTestCase):
         job_exec = _create_job_exec(job.id,
                                     job_type, configs={"configs": {'c': 'f'}})
 
-        creator = workflow_factory.get_creator(job)
-
-        res = creator.get_workflow_xml(_create_cluster(), job_exec,
-                                       input_data, output_data)
+        res = workflow_factory.get_workflow_xml(
+            job, _create_cluster(), job_exec, input_data, output_data)
 
         self.assertIn("""
 <property>
@@ -392,6 +379,57 @@ class TestJobManager(base.SaharaWithDbTestCase):
         new_status = job_ex_upd.call_args[0][2]["info"]["status"]
         self.assertEqual('FAILED', new_status)
 
+    def test_get_plugin(self):
+        plugin = job_manager._get_plugin(_create_cluster())
+        self.assertEqual("vanilla", plugin.name)
+
+    @mock.patch('sahara.conductor.API.data_source_get')
+    def test_get_data_sources(self, ds):
+        job, job_exec = _create_all_stack(edp.JOB_TYPE_PIG)
+
+        job_exec.input_id = 's1'
+        job_exec.output_id = 's2'
+
+        ds.side_effect = _conductor_data_source_get
+        input_source, output_source = (
+            job_manager._get_data_sources(job_exec, job))
+
+        self.assertEqual('obj_s1', input_source)
+        self.assertEqual('obj_s2', output_source)
+
+    def test_get_data_sources_java(self):
+        configs = {sw.HADOOP_SWIFT_USERNAME: 'admin',
+                   sw.HADOOP_SWIFT_PASSWORD: 'admin1'}
+
+        configs = {
+            'configs': configs,
+            'args': ['swift://ex/i',
+                     'output_path']
+        }
+
+        job, job_exec = _create_all_stack(edp.JOB_TYPE_JAVA, configs)
+
+        input_source, output_source = (
+            job_manager._get_data_sources(job_exec, job))
+
+        self.assertEqual(None, input_source)
+        self.assertEqual(None, output_source)
+
+    @mock.patch('sahara.service.edp.job_manager._get_plugin')
+    def test_get_oozie_job_params(self, getplugin):
+        plugin = mock.Mock()
+        getplugin.return_value = plugin
+
+        plugin.get_resource_manager_uri.return_value = 'http://localhost:50030'
+        plugin.get_name_node_uri.return_value = 'hdfs://localhost:8020'
+
+        cluster = _create_cluster()
+        job_params = job_manager._get_oozie_job_params(cluster, 'hadoop',
+                                                       '/tmp')
+        self.assertEqual('http://localhost:50030', job_params["jobTracker"])
+        self.assertEqual('hdfs://localhost:8020', job_params["nameNode"])
+        self.assertEqual('hadoop', job_params["user.name"])
+
 
 def _create_all_stack(type, configs=None):
     b = _create_job_binary('1', type)
@@ -430,6 +468,7 @@ def _create_job_binary(id, type):
 def _create_cluster():
     cluster = mock.Mock()
     cluster.plugin_name = 'vanilla'
+    cluster.plugin_version = '1.2.1'
     return cluster
 
@@ -453,3 +492,7 @@ def _create_job_exec(job_id, type, configs=None):
         j_exec.job_configs['configs']['edp.java.main_class'] = _java_main_class
         j_exec.job_configs['configs']['edp.java.java_opts'] = _java_opts
     return j_exec
+
+
+def _conductor_data_source_get(ctx, id):
+    return "obj_" + id