Merge "[EDP] Add scheduling EDP jobs in sahara(oozie engine implementation)"

This commit is contained in:
Jenkins 2016-01-20 16:34:03 +00:00 committed by Gerrit Code Review
commit 11ffd666e2
13 changed files with 248 additions and 16 deletions

View File

@ -358,3 +358,13 @@ class MaxRetriesExceeded(SaharaException):
'attempts': attempts}
super(MaxRetriesExceeded, self).__init__(formatted_message)
class InvalidJobExecutionInfoException(SaharaException):
message = _("Job execution information is invalid")
def __init__(self, message=None):
if message:
self.message = message
self.code = "INVALID_JOB_EXECUTION_INFO"
super(InvalidJobExecutionInfoException, self).__init__()
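For illustration, the new exception is meant to be raised either with its default message or with an override, as the scheduling validation later in this change does; the helper below is a hypothetical, self-contained sketch of that pattern, not part of the change itself.

# Hypothetical usage sketch of the new exception.
from oslo_utils import timeutils

def check_start_in_future(start_time):
    # Raise when the requested start time already lies in the past.
    if timeutils.delta_seconds(timeutils.utcnow(), start_time) < 0:
        raise InvalidJobExecutionInfoException(
            "Job start time should be later than now")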

View File

@ -27,6 +27,9 @@ class FakeJobEngine(base_engine.JobEngine):
def run_job(self, job_execution):
return 'engine_job_id', edp.JOB_STATUS_SUCCEEDED, None
def run_scheduled_job(self, job_execution):
pass
def validate_job_execution(self, cluster, job, data):
pass

View File

@ -117,6 +117,13 @@ def execute_job(job_id, data):
# Since we will use a unified class in the database, we pass
# a superset of configs for all job types.
# For example, configs['start'] = '2015-05-12T08:55Z' with frequency = 5
# means the job will start at 2015-05-12T08:55Z and run every 5 minutes.
job_execution_info = data.get('job_execution_info', {})
configs['job_execution_info'] = job_execution_info
job_ex_dict = {'input_id': input_id, 'output_id': output_id,
'job_id': job_id, 'cluster_id': cluster_id,
'info': {'status': edp.JOB_STATUS_PENDING},
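To make the comment above concrete, a scheduled execution might arrive with a job_execution_info block along these lines (a hypothetical sketch: 'job_execution_type' selects the scheduled code path, while 'start', 'end' and 'frequency' are the keys the Oozie engine below consumes):

# Hypothetical payload; the values are illustrative only.
job_execution_info = {
    'job_execution_type': 'scheduled',
    'start': '2015-05-12T08:55Z',
    'end': '2015-05-12T10:55Z',   # assumed end of the schedule window
    'frequency': 5,               # run every 5 minutes
}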

View File

@ -37,6 +37,10 @@ class JobEngine(object):
def run_job(self, job_execution):
pass
@abc.abstractmethod
def run_scheduled_job(self, job_execution):
pass
@abc.abstractmethod
def validate_job_execution(self, cluster, job, data):
pass

View File

@ -105,7 +105,11 @@ def _run_job(job_execution_id):
# Job id is a string
# Status is a string
# Extra is a dictionary to add to extra in the job_execution
jid, status, extra = eng.run_job(job_execution)
if job_execution.job_configs.job_execution_info.get('job_execution_type'
) == 'scheduled':
jid, status, extra = eng.run_scheduled_job(job_execution)
else:
jid, status, extra = eng.run_job(job_execution)
# Set the job id and the start time
# Optionally, update the status and the 'extra' field

View File

@ -16,6 +16,7 @@
import abc
import os
import uuid
import xml.dom.minidom as xml
from oslo_config import cfg
import six
@ -50,9 +51,10 @@ class OozieJobEngine(base_engine.JobEngine):
return o.OozieClient(self.get_oozie_server_uri(self.cluster),
self.get_oozie_server(self.cluster))
def _get_oozie_job_params(self, hdfs_user, path_to_workflow, oozie_params,
use_hbase_lib):
app_path = "oozie.wf.application.path"
def _get_oozie_job_params(self, hdfs_user, path_to_workflow,
oozie_params, use_hbase_lib,
scheduled_params=None, job_dir=None,
job_execution_type=None):
oozie_libpath_key = "oozie.libpath"
oozie_libpath = ""
rm_path = self.get_resource_manager_uri(self.cluster)
@ -66,13 +68,24 @@ class OozieJobEngine(base_engine.JobEngine):
else:
oozie_libpath = hbase_common_lib_path
job_parameters = {
"jobTracker": rm_path,
"nameNode": nn_path,
"user.name": hdfs_user,
oozie_libpath_key: oozie_libpath,
app_path: "%s%s" % (nn_path, path_to_workflow),
"oozie.use.system.libpath": "true"}
if job_execution_type == "scheduled":
app_path = "oozie.coord.application.path"
job_parameters = {
"start": scheduled_params.get('start'),
"end": scheduled_params.get('end'),
"frequency": scheduled_params.get('frequency'),
"workflowAppUri": "%s%s" % (nn_path, job_dir),
app_path: "%s%s" % (nn_path, job_dir)}
else:
app_path = "oozie.wf.application.path"
job_parameters = {
app_path: "%s%s" % (nn_path, path_to_workflow)}
job_parameters["nameNode"] = nn_path
job_parameters["user.name"] = hdfs_user
job_parameters["jobTracker"] = rm_path
job_parameters[oozie_libpath_key] = oozie_libpath
job_parameters["oozie.use.system.libpath"] = "true"
# Don't let the application path be overwritten; that can't
# possibly make any sense
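To make the two branches concrete, a scheduled execution ends up with coordinator-style parameters roughly like the sketch below; the values are illustrative, partly borrowed from the unit test further down, and a plain workflow run would instead carry "oozie.wf.application.path" pointing at workflow.xml.

# Illustrative result of _get_oozie_job_params for a scheduled run.
job_parameters = {
    "oozie.coord.application.path": "hdfs://localhost:8020/job_dir",
    "workflowAppUri": "hdfs://localhost:8020/job_dir",
    "start": "2015-06-10T06:05Z",
    "end": "2015-06-10T06:50Z",
    "frequency": "10",
    "nameNode": "hdfs://localhost:8020",
    "jobTracker": "http://localhost:50030",
    "user.name": "hadoop",
    "oozie.libpath": "",   # set to the HBase common lib path when use_hbase_lib is True
    "oozie.use.system.libpath": "true",
}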
@ -89,6 +102,12 @@ class OozieJobEngine(base_engine.JobEngine):
h.put_file_to_hdfs(r, wf_xml, "workflow.xml", job_dir, hdfs_user)
return "%s/workflow.xml" % job_dir
def _upload_coordinator_file(self, where, job_dir, wf_xml, hdfs_user):
with remote.get_remote(where) as r:
h.put_file_to_hdfs(r, wf_xml, "coordinator.xml", job_dir,
hdfs_user)
return "%s/coordinator.xml" % job_dir
def cancel_job(self, job_execution):
if job_execution.engine_job_id is not None:
client = self._get_client()
@ -99,13 +118,15 @@ class OozieJobEngine(base_engine.JobEngine):
if job_execution.engine_job_id is not None:
return self._get_client().get_job_info(job_execution)
def run_job(self, job_execution):
def _prepare_run_job(self, job_execution):
ctx = context.ctx()
# This will be a dictionary of tuples, (native_url, runtime_url)
# keyed by data_source id
data_source_urls = {}
prepared_job_params = {}
job = conductor.job_get(ctx, job_execution.job_id)
input_source, output_source = job_utils.get_data_sources(
job_execution, job, data_source_urls, self.cluster)
@ -173,6 +194,26 @@ class OozieJobEngine(base_engine.JobEngine):
path_to_workflow = self._upload_workflow_file(oozie_server, wf_dir,
wf_xml, hdfs_user)
prepared_job_params['context'] = ctx
prepared_job_params['hdfs_user'] = hdfs_user
prepared_job_params['path_to_workflow'] = path_to_workflow
prepared_job_params['use_hbase_lib'] = use_hbase_lib
prepared_job_params['job_execution'] = job_execution
prepared_job_params['oozie_params'] = oozie_params
prepared_job_params['wf_dir'] = wf_dir
prepared_job_params['oozie_server'] = oozie_server
return prepared_job_params
def run_job(self, job_execution):
prepared_job_params = self._prepare_run_job(job_execution)
path_to_workflow = prepared_job_params['path_to_workflow']
hdfs_user = prepared_job_params['hdfs_user']
oozie_params = prepared_job_params['oozie_params']
use_hbase_lib = prepared_job_params['use_hbase_lib']
ctx = prepared_job_params['context']
job_execution = prepared_job_params['job_execution']
job_params = self._get_oozie_job_params(hdfs_user,
path_to_workflow,
oozie_params,
@ -197,6 +238,43 @@ class OozieJobEngine(base_engine.JobEngine):
status = None
return (oozie_job_id, status, None)
def run_scheduled_job(self, job_execution):
prepared_job_params = self._prepare_run_job(job_execution)
oozie_server = prepared_job_params['oozie_server']
wf_dir = prepared_job_params['wf_dir']
hdfs_user = prepared_job_params['hdfs_user']
oozie_params = prepared_job_params['oozie_params']
use_hbase_lib = prepared_job_params['use_hbase_lib']
ctx = prepared_job_params['context']
job_execution = prepared_job_params['job_execution']
coord_configs = {"jobTracker": "${jobTracker}",
"nameNode": "${nameNode}"}
coord_xml = self._create_coordinator_xml(coord_configs)
self._upload_coordinator_file(oozie_server, wf_dir, coord_xml,
hdfs_user)
job_params = self._get_oozie_job_params(
hdfs_user, None, oozie_params, use_hbase_lib,
job_execution.job_configs.job_execution_info, wf_dir,
"scheduled")
client = self._get_client()
oozie_job_id = client.add_job(x.create_hadoop_xml(job_params),
job_execution)
job_execution = conductor.job_execution_get(ctx, job_execution.id)
if job_execution.info['status'] == edp.JOB_STATUS_TOBEKILLED:
return (None, edp.JOB_STATUS_KILLED, None)
try:
status = client.get_job_status(job_execution,
oozie_job_id)['status']
except Exception:
status = None
return (oozie_job_id, status, None)
@abc.abstractmethod
def get_hdfs_user(self):
pass
@ -299,6 +377,41 @@ class OozieJobEngine(base_engine.JobEngine):
return constructed_dir
def _create_coordinator_xml(self, coord_configs, config_filter=None,
appname='coord'):
doc = xml.Document()
# Create the <coordinator-app> base element
coord = doc.createElement('coordinator-app')
coord.attributes['name'] = appname
coord.attributes['start'] = "${start}"
coord.attributes['end'] = "${end}"
coord.attributes['frequency'] = "${frequency}"
coord.attributes['timezone'] = 'UTC'
coord.attributes['xmlns'] = 'uri:oozie:coordinator:0.2'
doc.appendChild(coord)
action = doc.createElement('action')
workflow = doc.createElement('workflow')
coord.appendChild(action)
action.appendChild(workflow)
x.add_text_element_to_tag(doc, "workflow", 'app-path',
"${workflowAppUri}")
configuration = doc.createElement('configuration')
workflow.appendChild(configuration)
default_configs = []
if config_filter is not None:
default_configs = [cfg['name'] for cfg in config_filter]
for name in sorted(coord_configs):
if name in default_configs or config_filter is None:
x.add_property_to_configuration(doc, name, coord_configs[name])
# Return newly created XML
return doc.toprettyxml(indent=" ")
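For the coord_configs passed by run_scheduled_job above ({"jobTracker": "${jobTracker}", "nameNode": "${nameNode}"}), the generated coordinator.xml comes out roughly as follows (approximate output; attribute order and whitespace may differ):

<coordinator-app name="coord" start="${start}" end="${end}"
                 frequency="${frequency}" timezone="UTC"
                 xmlns="uri:oozie:coordinator:0.2">
  <action>
    <workflow>
      <app-path>${workflowAppUri}</app-path>
      <configuration>
        <property>
          <name>jobTracker</name>
          <value>${jobTracker}</value>
        </property>
        <property>
          <name>nameNode</name>
          <value>${nameNode}</value>
        </property>
      </configuration>
    </workflow>
  </action>
</coordinator-app>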
def _add_postfix(self, constructed_dir):
def _append_slash_if_needed(path):
if path[-1] != '/':

View File

@ -365,6 +365,10 @@ class SparkJobEngine(base_engine.JobEngine):
"%(status)s, stdout = %(stdout)s") %
{'status': ret, 'stdout': stdout})
def run_scheduled_job(self, job_execution):
raise e.NotImplementedException(_("Currently Spark engine does not"
" support scheduled EDP jobs"))
def validate_job_execution(self, cluster, job, data):
j.check_main_class_present(data, job)

View File

@ -257,6 +257,10 @@ class StormJobEngine(base_engine.JobEngine):
"%(status)s, stdout = %(stdout)s") %
{'status': ret, 'stdout': stdout})
def run_scheduled_job(self, job_execution):
raise e.NotImplementedException(_("Currently Storm engine does not"
" support scheduled EDP jobs"))
def validate_job_execution(self, cluster, job, data):
j.check_main_class_present(data, job)

View File

@ -41,6 +41,9 @@ job_configs = {
"items": {
"type": "string"
}
},
"job_execution_info": {
"type": "simple_config",
}
},
"additionalProperties": False,

View File

@ -13,6 +13,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import time
from oslo_utils import timeutils
from sahara import conductor as c
from sahara import context
from sahara import exceptions as ex
@ -53,8 +57,27 @@ def check_streaming_present(data, job):
_("%s job must specify streaming mapper and reducer") % job.type)
def check_scheduled_job_execution_info(job_execution_info):
start = job_execution_info.get('start', None)
if start is None:
raise ex.InvalidDataException(_(
"Scheduled job must specify start time"))
try:
start = time.strptime(start, "%Y-%m-%d %H:%M:%S")
start = timeutils.datetime.datetime.fromtimestamp(time.mktime(start))
except Exception:
raise ex.InvalidDataException(_("Invalid Time Format"))
now_time = timeutils.utcnow()
if timeutils.delta_seconds(now_time, start) < 0:
raise ex.InvalidJobExecutionInfoException(_(
"Job start time should be later than now"))
def check_job_execution(data, job_id):
ctx = context.ctx()
job_execution_info = data.get('job_execution_info', {})
cluster = conductor.cluster_get(ctx, data['cluster_id'])
if not cluster:
@ -74,6 +97,11 @@ def check_job_execution(data, job_id):
j_i.check_execution_interface(data, job)
edp_engine.validate_job_execution(cluster, job, data)
if 'job_execution_type' in job_execution_info:
j_type = job_execution_info.get('job_execution_type', 'workflow')
if j_type == 'scheduled':
check_scheduled_job_execution_info(job_execution_info)
def check_data_sources(data, job):
if not ('input_id' in data and 'output_id' in data):

View File

@ -42,8 +42,15 @@ class TestOozieEngine(base.SaharaTestCase):
oje = FakeOozieJobEngine(u.create_cluster())
oozie_params = {'oozie.libpath': '/mylibpath',
'oozie.wf.application.path': '/wrong'}
scheduled_params = {'start': '2015-06-10T06:05Z',
'end': '2015-06-10T06:50Z',
'frequency': '10'}
job_dir = '/job_dir'
job_execution_type = 'workflow'
job_params = oje._get_oozie_job_params('hadoop',
'/tmp', oozie_params, True)
'/tmp', oozie_params, True,
scheduled_params, job_dir,
job_execution_type)
self.assertEqual('http://localhost:50030', job_params["jobTracker"])
self.assertEqual('hdfs://localhost:8020', job_params["nameNode"])
self.assertEqual('hadoop', job_params["user.name"])

View File

@ -624,3 +624,28 @@ class TestJobManager(base.SaharaWithDbTestCase):
with testtools.ExpectedException(ex.CancelingFailed):
job_manager.cancel_job(job_exec.id)
@mock.patch('sahara.conductor.API.job_execution_get')
@mock.patch('sahara.conductor.API.cluster_get')
@mock.patch('sahara.conductor.API.job_get')
@mock.patch(
'sahara.service.edp.oozie.engine.OozieJobEngine.run_scheduled_job')
def test_scheduled_edp_job_run(self, run_scheduled_job, job_get,
cluster_get, job_exec_get):
configs = {
'job_execution_info': {
'job_execution_type': 'scheduled',
'start': '2015-5-15T01:00Z'
}
}
job, job_exec = u.create_job_exec(edp.JOB_TYPE_PIG, configs)
job_exec_get.return_value = job_exec
job_get.return_value = job
cluster = u.create_cluster()
cluster.status = "Active"
cluster_get.return_value = cluster
job_manager._run_job(job_exec.id)
self.assertEqual(1, run_scheduled_job.call_count)

View File

@ -13,9 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import time
import uuid
import mock
from oslo_utils import timeutils
import six
import testtools
@ -74,7 +76,8 @@ class TestJobExecCreateValidation(u.ValidationTestCase):
"output_id": ds2_id,
"job_configs": {"configs": {},
"params": {},
"args": []}
"args": [],
"job_execution_info": {}}
},
bad_req_i=(1, "INVALID_DATA",
"MapReduce.Streaming job "
@ -91,6 +94,7 @@ class TestJobExecCreateValidation(u.ValidationTestCase):
"edp.streaming.mapper": "/bin/cat",
"edp.streaming.reducer": "/usr/bin/wc"},
"params": {},
"job_execution_info": {},
"args": []}
})
@ -126,6 +130,7 @@ class TestJobExecCreateValidation(u.ValidationTestCase):
"edp.streaming.mapper": "/bin/cat",
"edp.streaming.reducer": "/usr/bin/wc"},
"params": {},
"job_execution_info": {},
"args": []}
})
@ -144,6 +149,7 @@ class TestJobExecCreateValidation(u.ValidationTestCase):
"edp.streaming.mapper": "/bin/cat",
"edp.streaming.reducer": "/usr/bin/wc"},
"params": {},
"job_execution_info": {},
"args": []}
},
bad_req_i=(1, "INVALID_DATA", err_msg))
@ -207,7 +213,8 @@ class TestJobExecCreateValidation(u.ValidationTestCase):
"cluster_id": six.text_type(uuid.uuid4()),
"job_configs": {"configs": {},
"params": {},
"args": []}
"args": [],
"job_execution_info": {}}
},
bad_req_i=(1, "INVALID_DATA",
"%s job must "
@ -220,6 +227,7 @@ class TestJobExecCreateValidation(u.ValidationTestCase):
"configs": {
"edp.java.main_class": "org.me.myclass"},
"params": {},
"job_execution_info": {},
"args": []}
})
@ -238,7 +246,8 @@ class TestJobExecCreateValidation(u.ValidationTestCase):
"cluster_id": six.text_type(uuid.uuid4()),
"job_configs": {"configs": {},
"params": {},
"args": []}
"args": [],
"job_execution_info": {}}
},
bad_req_i=(1, "INVALID_DATA",
"%s job must "
@ -251,9 +260,20 @@ class TestJobExecCreateValidation(u.ValidationTestCase):
"configs": {
"edp.java.main_class": "org.me.myclass"},
"params": {},
"job_execution_info": {},
"args": []}
})
@mock.patch('oslo_utils.timeutils.utcnow')
def test_invalid_start_time_in_job_execution_info(self, now_get):
configs = {"start": "2015-07-21 14:32:52"}
now = time.strptime("2015-07-22 14:39:14", "%Y-%m-%d %H:%M:%S")
now = timeutils.datetime.datetime.fromtimestamp(time.mktime(now))
now_get.return_value = now
with testtools.ExpectedException(ex.InvalidJobExecutionInfoException):
je.check_scheduled_job_execution_info(configs)
class TestJobExecUpdateValidation(u.ValidationTestCase):