[EDP] Add suspend_job() for sahara edp engine(oozie implementation)
add suspend_job in base_engine,and implement this in oozie engine. Change-Id: I38d32c501d04c3fd0526343c841a55832a902a2c Implements bp: add-suspend-resume-ability-for-edp-jobs
This commit is contained in:
parent
10ab690d8b
commit
c7e8c7bb71
@ -71,8 +71,9 @@ def job_executions_cancel(job_execution_id):
|
||||
@rest.patch('/job-executions/<job_execution_id>')
|
||||
@acl.enforce("data-processing:job-executions:modify")
|
||||
@v.check_exists(api.get_job_execution, id='job_execution_id')
|
||||
@v.validate(
|
||||
v_j_e_schema.JOB_EXEC_UPDATE_SCHEMA, v_j_e.check_job_execution_update)
|
||||
@v.validate(v_j_e_schema.JOB_EXEC_UPDATE_SCHEMA,
|
||||
v_j_e.check_job_execution_update,
|
||||
v_j_e.check_job_status_update)
|
||||
def job_executions_update(job_execution_id, data):
|
||||
return u.to_wrapped_dict(api.update_job_execution, job_execution_id, data)
|
||||
|
||||
|
@ -147,6 +147,16 @@ class CancelingFailed(SaharaException):
|
||||
code = "CANCELING_FAILED"
|
||||
|
||||
|
||||
class SuspendingFailed(SaharaException):
|
||||
message = _("Operation was not suspended")
|
||||
code = "SUSPENDING_FAILED"
|
||||
|
||||
|
||||
class InvalidJobStatus(SaharaException):
|
||||
message = _("Invalid Job Status")
|
||||
code = "INVALID_JOB_STATUS"
|
||||
|
||||
|
||||
class DeletionFailed(SaharaException):
|
||||
code = "DELETION_FAILED"
|
||||
message = _("Object was not deleted")
|
||||
|
@ -168,9 +168,17 @@ def cancel_job_execution(id):
|
||||
|
||||
|
||||
def update_job_execution(id, values):
|
||||
_update_status(values.pop("info", None))
|
||||
return conductor.job_execution_update(context.ctx(), id, values)
|
||||
|
||||
|
||||
def _update_status(info):
|
||||
if info:
|
||||
status = info.get("status", None)
|
||||
if status == edp.JOB_ACTION_SUSPEND:
|
||||
OPS.job_execution_suspend(id)
|
||||
|
||||
|
||||
def delete_job_execution(id):
|
||||
context.set_current_job_execution_id(id)
|
||||
OPS.delete_job_execution(id)
|
||||
|
@ -19,10 +19,14 @@ import six
|
||||
|
||||
from sahara import conductor as c
|
||||
|
||||
|
||||
conductor = c.API
|
||||
|
||||
|
||||
def optional(fun):
|
||||
fun.__not_implemented__ = True
|
||||
return fun
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class JobEngine(object):
|
||||
@abc.abstractmethod
|
||||
@ -54,3 +58,13 @@ class JobEngine(object):
|
||||
@abc.abstractmethod
|
||||
def get_supported_job_types():
|
||||
return None
|
||||
|
||||
@optional
|
||||
def suspend_job(self, job_execution):
|
||||
pass
|
||||
|
||||
def does_engine_implement(self, fun_name):
|
||||
fun = getattr(self, fun_name)
|
||||
if not (fun and callable(fun)):
|
||||
return False
|
||||
return not hasattr(fun, '__not_implemented__')
|
||||
|
@ -50,7 +50,7 @@ def _get_job_type(job_execution):
|
||||
return conductor.job_get(context.ctx(), job_execution.job_id).type
|
||||
|
||||
|
||||
def _get_job_engine(cluster, job_execution):
|
||||
def get_job_engine(cluster, job_execution):
|
||||
return job_utils.get_plugin(cluster).get_edp_engine(cluster,
|
||||
_get_job_type(
|
||||
job_execution))
|
||||
@ -96,7 +96,7 @@ def _run_job(job_execution_id):
|
||||
" inactive cluster."))
|
||||
return
|
||||
|
||||
eng = _get_job_engine(cluster, job_execution)
|
||||
eng = get_job_engine(cluster, job_execution)
|
||||
if eng is None:
|
||||
raise e.EDPError(_("Cluster does not support job type %s")
|
||||
% _get_job_type(job_execution))
|
||||
@ -158,7 +158,7 @@ def cancel_job(job_execution_id):
|
||||
if cluster is None:
|
||||
LOG.info(_LI("Can not cancel this job on a non-existant cluster."))
|
||||
return job_execution
|
||||
engine = _get_job_engine(cluster, job_execution)
|
||||
engine = get_job_engine(cluster, job_execution)
|
||||
if engine is not None:
|
||||
job_execution = conductor.job_execution_update(
|
||||
ctx, job_execution_id,
|
||||
@ -201,7 +201,7 @@ def get_job_status(job_execution_id):
|
||||
cluster = conductor.cluster_get(ctx, job_execution.cluster_id)
|
||||
if (cluster is not None and
|
||||
cluster.status == c_u.CLUSTER_STATUS_ACTIVE):
|
||||
engine = _get_job_engine(cluster, job_execution)
|
||||
engine = get_job_engine(cluster, job_execution)
|
||||
if engine is not None:
|
||||
job_execution = _update_job_status(engine,
|
||||
job_execution)
|
||||
@ -225,3 +225,36 @@ def get_job_config_hints(job_type):
|
||||
for eng in ENGINES:
|
||||
if job_type in eng.get_supported_job_types():
|
||||
return eng.get_possible_job_config(job_type)
|
||||
|
||||
|
||||
def suspend_job(job_execution_id):
|
||||
ctx = context.ctx()
|
||||
job_execution = conductor.job_execution_get(ctx, job_execution_id)
|
||||
if job_execution.info['status'] not in edp.JOB_STATUSES_SUSPENDIBLE:
|
||||
raise e.SuspendingFailed(_("Suspending operation can not be performed"
|
||||
" on status: {status}")).format(
|
||||
status=job_execution.info['status'])
|
||||
cluster = conductor.cluster_get(ctx, job_execution.cluster_id)
|
||||
engine = get_job_engine(cluster, job_execution)
|
||||
job_execution = conductor.job_execution_update(
|
||||
ctx, job_execution_id, {
|
||||
'info': {'status': edp.JOB_STATUS_TOBESUSPENDED}})
|
||||
try:
|
||||
job_info = engine.suspend_job(job_execution)
|
||||
except Exception as ex:
|
||||
job_info = None
|
||||
conductor.job_execution_update(
|
||||
ctx, job_execution_id, {'info': {
|
||||
'status': edp.JOB_STATUS_SUSPEND_FAILED}})
|
||||
raise e.SuspendingFailed(_("Error during suspending of job execution: "
|
||||
"{error}")).format(error=ex)
|
||||
if job_info is not None:
|
||||
job_execution = _write_job_status(job_execution, job_info)
|
||||
LOG.info(_LI("Job execution was suspended successfully"))
|
||||
return job_execution
|
||||
|
||||
conductor.job_execution_update(
|
||||
ctx, job_execution_id, {'info': {
|
||||
'status': edp.JOB_STATUS_SUSPEND_FAILED}})
|
||||
raise e.SuspendingFailed(_("Failed to suspend job execution"
|
||||
"{jid}")).format(jid=job_execution_id)
|
||||
|
@ -436,3 +436,13 @@ class OozieJobEngine(base_engine.JobEngine):
|
||||
if isinstance(v, six.string_types) and v.startswith("hdfs://"):
|
||||
external_hdfs_urls.append(v)
|
||||
return external_hdfs_urls
|
||||
|
||||
def suspend_job(self, job_execution):
|
||||
return self._manage_job(job_execution, edp.JOB_ACTION_SUSPEND)
|
||||
|
||||
def _manage_job(self, job_execution, action):
|
||||
if job_execution.oozie_job_id is not None:
|
||||
client = self._get_client()
|
||||
if action == edp.JOB_ACTION_SUSPEND:
|
||||
client.suspend_job(job_execution)
|
||||
return client.get_job_status(job_execution)
|
||||
|
@ -50,6 +50,12 @@ class OozieClient(object):
|
||||
"?action=kill")
|
||||
_check_status_code(resp, 200)
|
||||
|
||||
def manage_job(self, job_execution, action):
|
||||
session = self._get_http_session(job_execution.extra.get('neutron'))
|
||||
resp = session.put(self.job_url % job_execution.oozie_job_id +
|
||||
"?action=" + action)
|
||||
_check_status_code(resp, 200)
|
||||
|
||||
def get_job_info(self, job_execution, job_id=None):
|
||||
if job_id is None:
|
||||
job_id = job_execution.engine_job_id
|
||||
|
@ -78,6 +78,10 @@ class LocalOps(object):
|
||||
def get_engine_type_and_version(self):
|
||||
return INFRA.get_type_and_version()
|
||||
|
||||
def job_execution_suspend(self, job_execution_id):
|
||||
context.spawn("Suspend Job Execution %s" % job_execution_id,
|
||||
_suspend_job_execution, job_execution_id)
|
||||
|
||||
|
||||
class RemoteOps(rpc_utils.RPCClient):
|
||||
def __init__(self):
|
||||
@ -108,6 +112,9 @@ class RemoteOps(rpc_utils.RPCClient):
|
||||
def get_engine_type_and_version(self):
|
||||
return self.call('get_engine_type_and_version')
|
||||
|
||||
def job_execution_suspend(self, job_execution_id):
|
||||
self.cast('job_execution_suspend', job_execution_id=job_execution_id)
|
||||
|
||||
|
||||
def request_context(func):
|
||||
@functools.wraps(func)
|
||||
@ -152,6 +159,10 @@ class OpsServer(rpc_utils.RPCServer):
|
||||
def get_engine_type_and_version(self):
|
||||
return INFRA.get_type_and_version()
|
||||
|
||||
@request_context
|
||||
def job_execution_suspend(self, job_execution_id):
|
||||
_suspend_job_execution(job_execution_id)
|
||||
|
||||
|
||||
def _setup_trust_for_cluster(cluster):
|
||||
cluster = conductor.cluster_get(context.ctx(), cluster)
|
||||
@ -351,6 +362,10 @@ def _run_edp_job(job_execution_id):
|
||||
job_manager.run_job(job_execution_id)
|
||||
|
||||
|
||||
def _suspend_job_execution(job_execution_id):
|
||||
job_manager.suspend_job(job_execution_id)
|
||||
|
||||
|
||||
def _cancel_job_execution(job_execution_id):
|
||||
job_manager.cancel_job(job_execution_id)
|
||||
|
||||
|
@ -22,10 +22,11 @@ from sahara import context
|
||||
from sahara import exceptions as ex
|
||||
from sahara.i18n import _
|
||||
from sahara.plugins import base as plugin_base
|
||||
from sahara.service.edp import job_utils as j_u
|
||||
from sahara.service.validations import acl
|
||||
import sahara.service.validations.edp.base as b
|
||||
import sahara.service.validations.edp.job_interface as j_i
|
||||
|
||||
from sahara.utils import cluster as c_u
|
||||
|
||||
conductor = c.API
|
||||
|
||||
@ -144,3 +145,23 @@ def check_job_execution_update(job_execution_id, data, **kwargs):
|
||||
|
||||
acl.check_tenant_for_update(ctx, je)
|
||||
acl.check_protected_from_update(je, data)
|
||||
|
||||
|
||||
def check_job_status_update(job_execution_id, data):
|
||||
ctx = context.ctx()
|
||||
job_execution = conductor.job_execution_get(ctx, job_execution_id)
|
||||
# check we are updating status
|
||||
if 'info' in data:
|
||||
if 'status' in data['info']:
|
||||
if len(data) != 1:
|
||||
raise ex.InvalidJobStatus(_("Invalid status parameter"))
|
||||
cluster = conductor.cluster_get(ctx, job_execution.cluster_id)
|
||||
engine = j_u.get_plugin(cluster).get_edp_engine(
|
||||
cluster, conductor.job_get(ctx, job_execution_id).type)
|
||||
if cluster is None or cluster.status != c_u.CLUSTER_STATUS_ACTIVE:
|
||||
raise ex.InvalidDataException(
|
||||
_("Suspending operation can not be performed on an inactive or "
|
||||
"non-existent cluster"))
|
||||
if not (engine.does_engine_implement('suspend_job')):
|
||||
raise ex.InvalidReferenceException(
|
||||
_("Engine doesn't support suspending job feature"))
|
||||
|
@ -58,6 +58,15 @@ JOB_EXEC_UPDATE_SCHEMA = {
|
||||
},
|
||||
"is_protected": {
|
||||
"type": ["boolean", "null"],
|
||||
},
|
||||
"info": {
|
||||
"type": "simple_config",
|
||||
"properties": {
|
||||
"status": {
|
||||
"enum": ["suspend"]
|
||||
}
|
||||
},
|
||||
"additionalProperties": False
|
||||
}
|
||||
},
|
||||
"additionalProperties": False,
|
||||
|
@ -539,12 +539,12 @@ class TestJobManager(base.SaharaWithDbTestCase):
|
||||
def test_job_type_supported(self, job_get):
|
||||
job, job_exec = u.create_job_exec(edp.JOB_TYPE_PIG)
|
||||
job_get.return_value = job
|
||||
self.assertIsNotNone(job_manager._get_job_engine(u.create_cluster(),
|
||||
job_exec))
|
||||
self.assertIsNotNone(job_manager.get_job_engine(u.create_cluster(),
|
||||
job_exec))
|
||||
|
||||
job.type = "unsupported_type"
|
||||
self.assertIsNone(job_manager._get_job_engine(u.create_cluster(),
|
||||
job_exec))
|
||||
self.assertIsNone(job_manager.get_job_engine(u.create_cluster(),
|
||||
job_exec))
|
||||
|
||||
@mock.patch('sahara.conductor.API.job_get')
|
||||
@mock.patch('sahara.conductor.API.job_execution_get')
|
||||
@ -649,3 +649,22 @@ class TestJobManager(base.SaharaWithDbTestCase):
|
||||
job_manager._run_job(job_exec.id)
|
||||
|
||||
self.assertEqual(1, run_scheduled_job.call_count)
|
||||
|
||||
@mock.patch('sahara.conductor.API.job_get')
|
||||
@mock.patch('sahara.conductor.API.job_execution_get')
|
||||
@mock.patch('sahara.conductor.API.cluster_get')
|
||||
@mock.patch('sahara.service.edp.base_engine.JobEngine.suspend_job')
|
||||
def test_suspend_unsuspendible_job(self, suspend_job_get,
|
||||
cluster_get, job_exec_get, job_get):
|
||||
info = {
|
||||
'status': edp.JOB_STATUS_SUCCEEDED
|
||||
}
|
||||
job, job_exec = u.create_job_exec(edp.JOB_TYPE_PIG, None, False, info)
|
||||
job_exec_get.return_value = job_exec
|
||||
job_get.return_value = job
|
||||
|
||||
cluster = u.create_cluster()
|
||||
cluster.status = "Active"
|
||||
cluster_get.return_value = cluster
|
||||
|
||||
self.assertEqual(0, suspend_job_get.call_count)
|
||||
|
@ -285,7 +285,10 @@ class TestJobExecUpdateValidation(u.ValidationTestCase):
|
||||
def test_job_execution_update_types(self):
|
||||
data = {
|
||||
'is_public': False,
|
||||
'is_protected': False
|
||||
'is_protected': False,
|
||||
'info': {
|
||||
'status': 'suspend'
|
||||
}
|
||||
}
|
||||
self._assert_types(data)
|
||||
|
||||
@ -293,7 +296,10 @@ class TestJobExecUpdateValidation(u.ValidationTestCase):
|
||||
self._assert_create_object_validation(
|
||||
data={
|
||||
'is_public': False,
|
||||
'is_protected': False
|
||||
'is_protected': False,
|
||||
'info': {
|
||||
'status': 'suspend'
|
||||
}
|
||||
}
|
||||
)
|
||||
|
||||
|
@ -28,12 +28,23 @@ JOB_STATUS_READYTORUN = 'READYTORUN'
|
||||
JOB_STATUS_RUNNING = 'RUNNING'
|
||||
JOB_STATUS_SUCCEEDED = 'SUCCEEDED'
|
||||
JOB_STATUS_TOBEKILLED = 'TOBEKILLED'
|
||||
JOB_STATUS_TOBESUSPENDED = 'TOBESUSPENDED'
|
||||
JOB_STATUS_PREP = 'PREP'
|
||||
JOB_STATUS_PREPSUSPENDED = 'PREPSUSPENDED'
|
||||
JOB_STATUS_SUSPENDED = 'SUSPENDED'
|
||||
JOB_STATUS_SUSPEND_FAILED = 'SUSPENDFAILED'
|
||||
# statuses for suspended jobs
|
||||
JOB_STATUSES_SUSPENDIBLE = [
|
||||
JOB_STATUS_PREP,
|
||||
JOB_STATUS_RUNNING
|
||||
]
|
||||
# statuses for terminated jobs
|
||||
JOB_STATUSES_TERMINATED = [
|
||||
JOB_STATUS_DONEWITHERROR,
|
||||
JOB_STATUS_FAILED,
|
||||
JOB_STATUS_KILLED,
|
||||
JOB_STATUS_SUCCEEDED
|
||||
JOB_STATUS_SUCCEEDED,
|
||||
JOB_STATUS_SUSPEND_FAILED
|
||||
]
|
||||
# job type separator character
|
||||
JOB_TYPE_SEP = '.'
|
||||
@ -74,6 +85,13 @@ JOB_TYPES_ACCEPTABLE_CONFIGS = {
|
||||
JOB_TYPE_STORM: {"args"}
|
||||
}
|
||||
|
||||
# job actions
|
||||
JOB_ACTION_SUSPEND = 'suspend'
|
||||
|
||||
JOB_ACTION_TYPES_ACCEPTABLE = [
|
||||
JOB_ACTION_SUSPEND
|
||||
]
|
||||
|
||||
ADAPT_FOR_OOZIE = 'edp.java.adapt_for_oozie'
|
||||
SPARK_DRIVER_CLASSPATH = 'edp.spark.driver.classpath'
|
||||
ADAPT_SPARK_FOR_SWIFT = 'edp.spark.adapt_for_swift'
|
||||
|
Loading…
Reference in New Issue
Block a user