Implementing constants for the job types used by EDP
Changes:
* adding job type variables to sahara.utils.edp
  * JOB_TYPE_HIVE
  * JOB_TYPE_JAVA
  * JOB_TYPE_MAPREDUCE
  * JOB_TYPE_MAPREDUCE_STREAMING
  * JOB_TYPE_PIG
* adding job sub type variables to sahara.utils.edp
  * JOB_SUBTYPE_STREAMING
* adding job types list variables to sahara.utils.edp
  * JOB_TYPES_ALL
* replacing string literals with edp job type variables
* removing get_possible_job_types from
  sahara/service/edp/workflow_creator/workflow_factory

Implements: blueprint edp-job-type-constants

Change-Id: I8624dbdcd53814dcabdf3692ed3aa9547413226a
parent a1538b6f23
commit debac37821
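The semantics the new constants rely on are pinned down by the updated unit tests near the end of this diff: split_job_type() breaks a dotted type on JOB_TYPE_SEP, and compare_job_type() matches on the base type alone unless strict=True. A minimal usage sketch (standalone script, assuming the sahara tree is importable):

from sahara.utils import edp

# JOB_TYPE_MAPREDUCE_STREAMING is built from the base type, the
# separator and the subtype, i.e. 'MapReduce.Streaming'.
print(edp.split_job_type(edp.JOB_TYPE_MAPREDUCE_STREAMING))
# -> ('MapReduce', 'Streaming')

# Non-strict comparison: the dotted type matches its base type.
print(edp.compare_job_type(edp.JOB_TYPE_MAPREDUCE_STREAMING,
                           edp.JOB_TYPE_JAVA, edp.JOB_TYPE_MAPREDUCE))
# -> True

# Strict comparison: the subtype must match as well.
print(edp.compare_job_type(edp.JOB_TYPE_MAPREDUCE_STREAMING,
                           edp.JOB_TYPE_JAVA, edp.JOB_TYPE_MAPREDUCE,
                           strict=True))
# -> False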
@@ -125,7 +125,7 @@ def run_job(job_execution):
        return job_execution

    job = conductor.job_get(ctx, job_execution.job_id)
-    if not edp.compare_job_type(job.type, 'Java'):
+    if not edp.compare_job_type(job.type, edp.JOB_TYPE_JAVA):
        input_source = conductor.data_source_get(ctx, job_execution.input_id)
        output_source = conductor.data_source_get(ctx, job_execution.output_id)
    else:

@@ -230,6 +230,7 @@ def _append_slash_if_needed(path):
 #TODO(nprivalova): this validation should be removed after implementing
 # all features
 def validate(input_data, output_data, job):
-    if not edp.compare_job_type(job.type, 'Pig', 'MapReduce',
-                                'Hive', 'Java'):
+    if not edp.compare_job_type(job.type, edp.JOB_TYPE_PIG,
+                                edp.JOB_TYPE_MAPREDUCE, edp.JOB_TYPE_HIVE,
+                                edp.JOB_TYPE_JAVA):
        raise RuntimeError
@@ -192,33 +192,33 @@ def get_creator(job):
    def make_HiveFactory():
        return HiveFactory(job)

-    factories = [
-        MapReduceFactory,
-        MapReduceFactory,
-        make_HiveFactory,
-        make_PigFactory,
-        JavaFactory
-    ]
-    type_map = dict(zip(get_possible_job_types(), factories))
+    type_map = {
+        edp.JOB_TYPE_HIVE: make_HiveFactory,
+        edp.JOB_TYPE_JAVA: JavaFactory,
+        edp.JOB_TYPE_MAPREDUCE: MapReduceFactory,
+        edp.JOB_TYPE_MAPREDUCE_STREAMING: MapReduceFactory,
+        edp.JOB_TYPE_PIG: make_PigFactory
+    }

    return type_map[job.type]()


 def get_possible_job_config(job_type):
-    if not edp.compare_job_type(job_type, *get_possible_job_types()):
+    if not edp.compare_job_type(job_type, *edp.JOB_TYPES_ALL):
        return None

-    if edp.compare_job_type(job_type, 'Java'):
+    if edp.compare_job_type(job_type, edp.JOB_TYPE_JAVA):
        return {'job_config': {'configs': [], 'args': []}}

-    if edp.compare_job_type(job_type, 'MapReduce', 'Pig'):
+    if edp.compare_job_type(job_type,
+                            edp.JOB_TYPE_MAPREDUCE, edp.JOB_TYPE_PIG):
        #TODO(nmakhotkin) Here we should return config based on specific plugin
        cfg = xmlutils.load_hadoop_xml_defaults(
            'plugins/vanilla/v1_2_1/resources/mapred-default.xml')
-        if edp.compare_job_type(job_type, 'MapReduce'):
+        if edp.compare_job_type(job_type, edp.JOB_TYPE_MAPREDUCE):
            cfg += xmlutils.load_hadoop_xml_defaults(
                'service/edp/resources/mapred-job-config.xml')
-    elif edp.compare_job_type(job_type, 'Hive'):
+    elif edp.compare_job_type(job_type, edp.JOB_TYPE_HIVE):
        #TODO(nmakhotkin) Here we should return config based on specific plugin
        cfg = xmlutils.load_hadoop_xml_defaults(
            'plugins/vanilla/v1_2_1/resources/hive-default.xml')

@@ -226,16 +226,6 @@ def get_possible_job_config(job_type):
    # TODO(tmckay): args should be a list when bug #269968
    # is fixed on the UI side
    config = {'configs': cfg, "args": {}}
-    if not edp.compare_job_type('MapReduce', 'Java'):
+    if not edp.compare_job_type(edp.JOB_TYPE_MAPREDUCE, edp.JOB_TYPE_JAVA):
        config.update({'params': {}})
    return {'job_config': config}
-
-
-def get_possible_job_types():
-    return [
-        'MapReduce',
-        'MapReduce.Streaming',
-        'Hive',
-        'Pig',
-        'Java'
-    ]
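The dict(zip(...)) construction removed above paired get_possible_job_types() with a parallel factories list, so the mapping silently depended on the two sequences staying in the same order; the literal dict makes each pairing explicit. A self-contained sketch of the resulting dispatch pattern (stub factories and inlined constants for illustration only, not the real workflow_factory classes):

JOB_TYPE_JAVA = 'Java'
JOB_TYPE_MAPREDUCE = 'MapReduce'
JOB_TYPE_MAPREDUCE_STREAMING = 'MapReduce.Streaming'


class JavaFactory(object):
    """Stub standing in for the real workflow factory."""


class MapReduceFactory(object):
    """Stub standing in for the real workflow factory."""


type_map = {
    JOB_TYPE_JAVA: JavaFactory,
    JOB_TYPE_MAPREDUCE: MapReduceFactory,
    # Streaming jobs reuse the MapReduce factory, as in the real code.
    JOB_TYPE_MAPREDUCE_STREAMING: MapReduceFactory,
}


def get_creator(job_type):
    # An unknown type now raises KeyError instead of relying on a
    # correctly ordered zip of two parallel lists.
    return type_map[job_type]()


print(type(get_creator(JOB_TYPE_MAPREDUCE_STREAMING)).__name__)
# -> MapReduceFactory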
@@ -31,13 +31,7 @@ JOB_SCHEMA = {
        },
        "type": {
            "type": "string",
-            "enum": [
-                "Pig",
-                "Hive",
-                "MapReduce",
-                "MapReduce.Streaming",
-                "Java"
-            ],
+            "enum": edp.JOB_TYPES_ALL,
        },
        "mains": {
            "type": "array",

@@ -78,10 +72,11 @@ def check_mains_libs(data, **kwargs):
    mains = data.get("mains", [])
    libs = data.get("libs", [])
    job_type, subtype = edp.split_job_type(data.get("type"))
-    streaming = job_type == "MapReduce" and subtype == "Streaming"
+    streaming = (job_type == edp.JOB_TYPE_MAPREDUCE and
+                 subtype == edp.JOB_SUBTYPE_STREAMING)

    # Pig or Hive flow has to contain script in mains, may also use libs
-    if job_type in ['Pig', 'Hive']:
+    if job_type in [edp.JOB_TYPE_PIG, edp.JOB_TYPE_HIVE]:
        if not mains:
            raise e.InvalidDataException("%s flow requires main script" %
                                         data.get("type"))
@@ -66,7 +66,7 @@ def check_job_executor(data, job_id):
    main_base.check_cluster_contains_oozie(data['cluster_id'])

    # All types except Java require input and output objects
-    if job_type == 'Java':
+    if job_type == edp.JOB_TYPE_JAVA:
        if not _is_main_class_present(data):
            raise ex.InvalidDataException('Java job must '
                                          'specify edp.java.main_class')

@@ -80,8 +80,9 @@ def check_job_executor(data, job_id):

    b.check_data_sources_are_different(data['input_id'], data['output_id'])

-    if job_type == 'MapReduce' and (
-            subtype == 'Streaming' and not _streaming_present(data)):
+    if job_type == edp.JOB_TYPE_MAPREDUCE and (
+            subtype == edp.JOB_SUBTYPE_STREAMING
+            and not _streaming_present(data)):
        raise ex.InvalidDataException("%s job "
                                      "must specify streaming mapper "
                                      "and reducer" % job.type)
@@ -136,7 +136,7 @@ class EDPTest(base.ITestCase):

        # Java jobs don't use data sources. Input/output paths must
        # be passed as args with corresponding username/password configs
-        if not edp.compare_job_type(job_type, "Java"):
+        if not edp.compare_job_type(job_type, edp.JOB_TYPE_JAVA):
            input_id = self._create_data_source(
                'input-%s' % str(uuid.uuid4())[:8], 'swift',
                swift_input_url)

@@ -160,8 +160,8 @@ class EDPTest(base.ITestCase):

        # Append the input/output paths with the swift configs
        # if the caller has requested it...
-        if edp.compare_job_type(job_type,
-                                "Java") and pass_input_output_args:
+        if edp.compare_job_type(
+                job_type, edp.JOB_TYPE_JAVA) and pass_input_output_args:
            self._add_swift_configs(configs)
            if "args" in configs:
                configs["args"].extend([swift_input_url,
@@ -22,6 +22,7 @@ from sahara.tests.integration.tests import edp
 from sahara.tests.integration.tests import map_reduce
 from sahara.tests.integration.tests import scaling
 from sahara.tests.integration.tests import swift
+from sahara.utils import edp as utils_edp


 class HDPGatingTest(cinder.CinderVolumeTest, edp.EDPTest,
@@ -182,16 +183,13 @@ class HDPGatingTest(cinder.CinderVolumeTest, edp.EDPTest,
            }
        }
        try:
-            self.edp_testing('Pig', [{'pig': pig_job_data}],
+            self.edp_testing(utils_edp.JOB_TYPE_PIG, [{'pig': pig_job_data}],
                             [{'jar': pig_lib_data}])
-            self.edp_testing(
-                'MapReduce', [], [{'jar': mapreduce_jar_data}],
-                mapreduce_configs
-            )
-            self.edp_testing(
-                'MapReduce.Streaming', [], [], mapreduce_streaming_configs
-            )
-            self.edp_testing('Java', [],
+            self.edp_testing(utils_edp.JOB_TYPE_MAPREDUCE, [],
+                             [{'jar': mapreduce_jar_data}], mapreduce_configs)
+            self.edp_testing(utils_edp.JOB_TYPE_MAPREDUCE_STREAMING, [], [],
+                             mapreduce_streaming_configs)
+            self.edp_testing(utils_edp.JOB_TYPE_JAVA, [],
                             lib_data_list=[{'jar': java_lib_data}],
                             configs=java_configs,
                             pass_input_output_args=True)
@@ -24,6 +24,7 @@ from sahara.tests.integration.tests import map_reduce
 from sahara.tests.integration.tests import scaling
 from sahara.tests.integration.tests import swift
 from sahara.tests.integration.tests import vanilla_transient_cluster
+from sahara.utils import edp as utils_edp


 class VanillaGatingTest(cinder.CinderVolumeTest,
@@ -290,16 +291,13 @@ class VanillaGatingTest(cinder.CinderVolumeTest,
            }
        }
        try:
-            self.edp_testing('Pig', [{'pig': pig_job_data}],
+            self.edp_testing(utils_edp.JOB_TYPE_PIG, [{'pig': pig_job_data}],
                             [{'jar': pig_lib_data}])
-            self.edp_testing(
-                'MapReduce', [], [{'jar': mapreduce_jar_data}],
-                mapreduce_configs
-            )
-            self.edp_testing(
-                'MapReduce.Streaming', [], [], mapreduce_streaming_configs
-            )
-            self.edp_testing('Java', [],
+            self.edp_testing(utils_edp.JOB_TYPE_MAPREDUCE, [],
+                             [{'jar': mapreduce_jar_data}], mapreduce_configs)
+            self.edp_testing(utils_edp.JOB_TYPE_MAPREDUCE_STREAMING, [], [],
+                             mapreduce_streaming_configs)
+            self.edp_testing(utils_edp.JOB_TYPE_JAVA, [],
                             lib_data_list=[{'jar': java_lib_data}],
                             configs=java_configs,
                             pass_input_output_args=True)
@@ -23,6 +23,7 @@ from sahara.tests.integration.tests import edp
 from sahara.tests.integration.tests import map_reduce
 from sahara.tests.integration.tests import scaling
 from sahara.tests.integration.tests import swift
+from sahara.utils import edp as utils_edp
 from sahara.utils import files as f


@@ -189,7 +190,8 @@ class VanillaTwoGatingTest(cluster_configs.ClusterConfigTest,
        # check pig
        pig_job = f.get_file_text(path + 'edp-job.pig')
        pig_lib = f.get_file_text(path + 'edp-lib.jar')
-        self.edp_testing('Pig', [{'pig': pig_job}], [{'jar': pig_lib}])
+        self.edp_testing(utils_edp.JOB_TYPE_PIG,
+                         [{'pig': pig_job}], [{'jar': pig_lib}])

        # check mapreduce
        mapreduce_jar = f.get_file_text(path + 'edp-mapreduce.jar')

@@ -200,7 +202,8 @@ class VanillaTwoGatingTest(cluster_configs.ClusterConfigTest,
                'org.apache.oozie.example.SampleReducer'
            }
        }
-        self.edp_testing('MapReduce', [], [{'jar': mapreduce_jar}],
+        self.edp_testing(utils_edp.JOB_TYPE_MAPREDUCE,
+                         [], [{'jar': mapreduce_jar}],
                         mapreduce_configs)

        # check mapreduce streaming

@@ -211,7 +214,7 @@ class VanillaTwoGatingTest(cluster_configs.ClusterConfigTest,
            }
        }

-        self.edp_testing('MapReduce.Streaming', [], [],
+        self.edp_testing(utils_edp.JOB_TYPE_MAPREDUCE_STREAMING, [], [],
                         mapreduce_streaming_configs)

        # check java

@@ -224,7 +227,8 @@ class VanillaTwoGatingTest(cluster_configs.ClusterConfigTest,
            },
            'args': ['10', '10']
        }
-        self.edp_testing('Java', [], lib_data_list=[{'jar': java_jar}],
+        self.edp_testing(utils_edp.JOB_TYPE_JAVA,
+                         [], lib_data_list=[{'jar': java_jar}],
                         configs=java_configs)

    @b.errormsg("Failure while cluster scaling: ")
@@ -19,6 +19,7 @@ import datetime
 from sahara import context
 from sahara import exceptions as ex
 import sahara.tests.unit.conductor.base as test_base
+from sahara.utils import edp


 SAMPLE_DATA_SOURCE = {

@@ -37,7 +38,7 @@ SAMPLE_JOB = {
    "tenant_id": "test_tenant",
    "name": "job_test",
    "description": "test_desc",
-    "type": "Pig",
+    "type": edp.JOB_TYPE_PIG,
    "mains": []
 }

@@ -46,7 +46,7 @@ class TestJobManager(base.SaharaWithDbTestCase):
        remote.return_value = remote_class
        helper.return_value = 'ok'

-        job, _ = _create_all_stack('Pig')
+        job, _ = _create_all_stack(edp.JOB_TYPE_PIG)
        res = job_manager.create_workflow_dir(mock.Mock(), job, 'hadoop')
        self.assertIn('/user/hadoop/special_name/', res)

@@ -67,12 +67,12 @@ class TestJobManager(base.SaharaWithDbTestCase):
        dir_missing.return_value = False
        conductor_raw_data.return_value = 'ok'

-        job, _ = _create_all_stack('Pig')
+        job, _ = _create_all_stack(edp.JOB_TYPE_PIG)
        res = job_manager.upload_job_files(mock.Mock(), 'job_prefix',
                                           job, 'hadoop')
        self.assertEqual(['job_prefix/script.pig'], res)

-        job, _ = _create_all_stack('MapReduce')
+        job, _ = _create_all_stack(edp.JOB_TYPE_MAPREDUCE)
        res = job_manager.upload_job_files(mock.Mock(), 'job_prefix',
                                           job, 'hadoop')
        self.assertEqual(['job_prefix/lib/main.jar'], res)

@@ -93,7 +93,7 @@ class TestJobManager(base.SaharaWithDbTestCase):
    @mock.patch('sahara.conductor.API.job_binary_get')
    def test_build_workflow_for_job_pig(self, job_binary):

-        job, job_exec = _create_all_stack('Pig')
+        job, job_exec = _create_all_stack(edp.JOB_TYPE_PIG)
        job_binary.return_value = {"name": "script.pig"}

        input_data = _create_data_source('swift://ex.sahara/i')

@@ -126,7 +126,7 @@ class TestJobManager(base.SaharaWithDbTestCase):
    def test_build_workflow_swift_configs(self, job_binary):

        # Test that swift configs come from either input or output data sources
-        job, job_exec = _create_all_stack('Pig')
+        job, job_exec = _create_all_stack(edp.JOB_TYPE_PIG)
        job_binary.return_value = {"name": "script.pig"}

        input_data = _create_data_source('swift://ex.sahara/i')

@@ -168,8 +168,8 @@ class TestJobManager(base.SaharaWithDbTestCase):
            </property>
            </configuration>""", res)

-        job, job_exec = _create_all_stack('Pig', configs={'configs':
-                                                          {'dummy': 'value'}})
+        job, job_exec = _create_all_stack(
+            edp.JOB_TYPE_PIG, configs={'configs': {'dummy': 'value'}})
        input_data = _create_data_source('hdfs://user/hadoop/in')
        output_data = _create_data_source('hdfs://user/hadoop/out')

@@ -236,8 +236,8 @@ class TestJobManager(base.SaharaWithDbTestCase):
            </property>""", res)

    def test_build_workflow_for_job_mapreduce(self):
-        self._build_workflow_common('MapReduce')
-        self._build_workflow_common('MapReduce', streaming=True)
+        self._build_workflow_common(edp.JOB_TYPE_MAPREDUCE)
+        self._build_workflow_common(edp.JOB_TYPE_MAPREDUCE, streaming=True)

    def test_build_workflow_for_job_java(self):
        # If args include swift paths, user and password values

@@ -252,7 +252,7 @@ class TestJobManager(base.SaharaWithDbTestCase):
                            'output_path']
                   }

-        job, job_exec = _create_all_stack('Java', configs)
+        job, job_exec = _create_all_stack(edp.JOB_TYPE_JAVA, configs)
        creator = workflow_factory.get_creator(job)
        res = creator.get_workflow_xml(_create_cluster(), job_exec)

@@ -275,7 +275,7 @@ class TestJobManager(base.SaharaWithDbTestCase):
    @mock.patch('sahara.conductor.API.job_binary_get')
    def test_build_workflow_for_job_hive(self, job_binary):

-        job, job_exec = _create_all_stack('Hive')
+        job, job_exec = _create_all_stack(edp.JOB_TYPE_HIVE)
        job_binary.return_value = {"name": "script.q"}

        input_data = _create_data_source('swift://ex.sahara/i')

@@ -335,7 +335,7 @@ class TestJobManager(base.SaharaWithDbTestCase):
            </property>""", res)

    def test_build_workflow_for_job_mapreduce_with_conf(self):
-        self._build_workflow_with_conf_common('MapReduce')
+        self._build_workflow_with_conf_common(edp.JOB_TYPE_MAPREDUCE)

    def test_update_job_dict(self):
        w = workflow_factory.BaseFactory()

@@ -382,7 +382,7 @@ def _create_job(id, job_binary, type):
    job.id = id
    job.type = type
    job.name = 'special_name'
-    if edp.compare_job_type(type, 'Pig', 'Hive'):
+    if edp.compare_job_type(type, edp.JOB_TYPE_PIG, edp.JOB_TYPE_HIVE):
        job.mains = [job_binary]
        job.libs = None
    else:

@@ -395,9 +395,9 @@ def _create_job_binary(id, type):
    binary = mock.Mock()
    binary.id = id
    binary.url = "internal-db://42"
-    if edp.compare_job_type(type, 'Pig'):
+    if edp.compare_job_type(type, edp.JOB_TYPE_PIG):
        binary.name = "script.pig"
-    elif edp.compare_job_type(type, 'MapReduce', 'Java'):
+    elif edp.compare_job_type(type, edp.JOB_TYPE_MAPREDUCE, edp.JOB_TYPE_JAVA):
        binary.name = "main.jar"
    else:
        binary.name = "script.q"

@@ -426,7 +426,7 @@ def _create_job_exec(job_id, type, configs=None):
    j_exec = mock.Mock()
    j_exec.job_id = job_id
    j_exec.job_configs = configs
-    if edp.compare_job_type(type, "Java"):
+    if edp.compare_job_type(type, edp.JOB_TYPE_JAVA):
        j_exec.job_configs['configs']['edp.java.main_class'] = _java_main_class
        j_exec.job_configs['configs']['edp.java.java_opts'] = _java_opts
    return j_exec
@@ -16,12 +16,13 @@
 import unittest2

 from sahara.service.edp.workflow_creator import workflow_factory as w_f
+from sahara.utils import edp


 class TestJobPossibleConfigs(unittest2.TestCase):

    def test_possible_configs(self):
-        res = w_f.get_possible_job_config("MapReduce")
+        res = w_f.get_possible_job_config(edp.JOB_TYPE_MAPREDUCE)
        sample_config_property = {
            'name': 'mapred.map.tasks',
            'value': '2',

@@ -30,7 +31,7 @@ class TestJobPossibleConfigs(unittest2.TestCase):
        }
        self.assertIn(sample_config_property, res['job_config']["configs"])

-        res = w_f.get_possible_job_config("Hive")
+        res = w_f.get_possible_job_config(edp.JOB_TYPE_HIVE)
        sample_config_property = {
            "description": "The serde used by FetchTask to serialize the "
                           "fetch output.",
@@ -15,6 +15,7 @@

 from sahara.service.validations.edp import job as j
 from sahara.tests.unit.service.validation import utils as u
+from sahara.utils import edp


 class TestJobValidation(u.ValidationTestCase):

@@ -24,7 +25,7 @@ class TestJobValidation(u.ValidationTestCase):
        self.scheme = j.JOB_SCHEMA

    def test_empty_libs(self):
-        for job_type in ['MapReduce', 'Java']:
+        for job_type in [edp.JOB_TYPE_MAPREDUCE, edp.JOB_TYPE_JAVA]:
            self._assert_create_object_validation(
                data={
                    "name": "jar.jar",

@@ -36,11 +37,11 @@ class TestJobValidation(u.ValidationTestCase):
        self._assert_create_object_validation(
            data={
                "name": "jar.jar",
-                "type": "MapReduce.Streaming",
+                "type": edp.JOB_TYPE_MAPREDUCE_STREAMING,
            })

    def test_mains_unused(self):
-        for job_type in ['MapReduce', 'Java']:
+        for job_type in [edp.JOB_TYPE_MAPREDUCE, edp.JOB_TYPE_JAVA]:
            self._assert_create_object_validation(
                data={
                    "name": "jar.jar",

@@ -54,7 +55,7 @@ class TestJobValidation(u.ValidationTestCase):
    def test_empty_pig_mains(self):
        data = {
            "name": "pig.pig",
-            "type": "Pig",
+            "type": edp.JOB_TYPE_PIG,
            "libs": ['lib-uuid']
        }

@@ -62,14 +63,14 @@ class TestJobValidation(u.ValidationTestCase):
            data=data, bad_req_i=(1, "INVALID_DATA",
                                  "Pig flow requires main script"))

-        data.update({"type": "Hive"})
+        data.update({"type": edp.JOB_TYPE_HIVE})

        self._assert_create_object_validation(
            data=data, bad_req_i=(1, "INVALID_DATA",
                                  "Hive flow requires main script"))

    def test_overlap_libs(self):
-        for job_type in ['Hive', 'Pig']:
+        for job_type in [edp.JOB_TYPE_HIVE, edp.JOB_TYPE_PIG]:
            self._assert_create_object_validation(
                data={
                    "name": "jar.jar",

@@ -86,6 +87,4 @@ class TestJobValidation(u.ValidationTestCase):
                "type": "Jar",
            },
            bad_req_i=(1, "VALIDATION_ERROR",
-                       "'Jar' is not one of "
-                       "['Pig', 'Hive', 'MapReduce', "
-                       "'MapReduce.Streaming', 'Java']"))
+                       "'Jar' is not one of " + str(edp.JOB_TYPES_ALL)))
@@ -23,6 +23,7 @@ from sahara.service.validations import base as validation_base
 from sahara.service.validations.edp import job_executor as je
 from sahara.tests.unit.service.validation import utils as u
 from sahara.tests.unit import testutils as tu
+from sahara.utils import edp


 def wrap_it(data):
@@ -30,7 +31,7 @@ def wrap_it(data):


 class FakeJob(object):
-    type = "MapReduce.Streaming"
+    type = edp.JOB_TYPE_MAPREDUCE_STREAMING
    libs = []


@@ -20,6 +20,7 @@ import six

 from sahara.service.validations.edp import job_executor as je
 from sahara.tests.unit.service.validation import utils as u
+from sahara.utils import edp


 def wrap_it(data):

@@ -27,7 +28,7 @@ def wrap_it(data):


 class FakeJob(object):
-    type = "Java"
+    type = edp.JOB_TYPE_JAVA
    libs = []


@@ -18,27 +18,32 @@ import unittest2
 from sahara.utils import edp


-MAPRED_STREAMING = "MapReduce" + edp.JOB_TYPE_SEP + "Streaming"
-
-
 class SplitJobTypeTest(unittest2.TestCase):
    def test_split_job_type(self):
-        jtype, stype = edp.split_job_type("MapReduce")
-        self.assertEqual(jtype, "MapReduce")
-        self.assertEqual(stype, "")
+        jtype, stype = edp.split_job_type(edp.JOB_TYPE_MAPREDUCE)
+        self.assertEqual(jtype, edp.JOB_TYPE_MAPREDUCE)
+        self.assertEqual(stype, edp.JOB_SUBTYPE_NONE)

-        jtype, stype = edp.split_job_type(MAPRED_STREAMING)
-        self.assertEqual(jtype, "MapReduce")
-        self.assertEqual(stype, "Streaming")
+        jtype, stype = edp.split_job_type(edp.JOB_TYPE_MAPREDUCE_STREAMING)
+        self.assertEqual(jtype, edp.JOB_TYPE_MAPREDUCE)
+        self.assertEqual(stype, edp.JOB_SUBTYPE_STREAMING)

    def test_compare_job_type(self):
-        self.assertTrue(edp.compare_job_type("Java",
-                                             "Java", "MapReduce",
-                                             strict=True))
-        self.assertFalse(edp.compare_job_type(MAPRED_STREAMING,
-                                              "Java", "MapReduce",
-                                              strict=True))
-        self.assertTrue(edp.compare_job_type(MAPRED_STREAMING,
-                                             "Java", "MapReduce"))
-        self.assertFalse(edp.compare_job_type("MapReduce",
-                                              "Java", MAPRED_STREAMING))
+        self.assertTrue(edp.compare_job_type(
+            edp.JOB_TYPE_JAVA,
+            edp.JOB_TYPE_JAVA,
+            edp.JOB_TYPE_MAPREDUCE,
+            strict=True))
+        self.assertFalse(edp.compare_job_type(
+            edp.JOB_TYPE_MAPREDUCE_STREAMING,
+            edp.JOB_TYPE_JAVA,
+            edp.JOB_TYPE_MAPREDUCE,
+            strict=True))
+        self.assertTrue(edp.compare_job_type(
+            edp.JOB_TYPE_MAPREDUCE_STREAMING,
+            edp.JOB_TYPE_JAVA,
+            edp.JOB_TYPE_MAPREDUCE))
+        self.assertFalse(edp.compare_job_type(
+            edp.JOB_TYPE_MAPREDUCE,
+            edp.JOB_TYPE_JAVA,
+            edp.JOB_TYPE_MAPREDUCE_STREAMING))
@@ -13,7 +13,26 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

+# job type separator character
 JOB_TYPE_SEP = '.'
+# job sub types available
+JOB_SUBTYPE_STREAMING = 'Streaming'
+JOB_SUBTYPE_NONE = ''
+# job types available
+JOB_TYPE_HIVE = 'Hive'
+JOB_TYPE_JAVA = 'Java'
+JOB_TYPE_MAPREDUCE = 'MapReduce'
+JOB_TYPE_MAPREDUCE_STREAMING = (JOB_TYPE_MAPREDUCE + JOB_TYPE_SEP +
+                                JOB_SUBTYPE_STREAMING)
+JOB_TYPE_PIG = 'Pig'
+# job type groupings available
+JOB_TYPES_ALL = [
+    JOB_TYPE_HIVE,
+    JOB_TYPE_JAVA,
+    JOB_TYPE_MAPREDUCE,
+    JOB_TYPE_MAPREDUCE_STREAMING,
+    JOB_TYPE_PIG
+]


 def split_job_type(job_type):
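With the grouping list in place, validation code no longer keeps private copies of the type names: the JOB_SCHEMA hunk above plugs edp.JOB_TYPES_ALL straight into its "enum", and other callers can use a plain membership test. A minimal sketch (the helper name here is hypothetical; the 'Jar' rejection mirrors the negative case in the validation tests above):

from sahara.utils import edp


def is_known_job_type(job_type):
    # Hypothetical helper: JOB_TYPES_ALL already contains the dotted
    # 'MapReduce.Streaming' entry, so subtyped jobs pass a plain
    # membership test.
    return job_type in edp.JOB_TYPES_ALL


assert is_known_job_type(edp.JOB_TYPE_PIG)
assert not is_known_job_type('Jar')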