From c7e8c7bb71f477b20a1d21c6e31cfd6e1dbb6549 Mon Sep 17 00:00:00 2001 From: luhuichun Date: Tue, 14 Jul 2015 15:31:33 +0800 Subject: [PATCH] [EDP] Add suspend_job() for sahara edp engine(oozie implementation) add suspend_job in base_engine,and implement this in oozie engine. Change-Id: I38d32c501d04c3fd0526343c841a55832a902a2c Implements bp: add-suspend-resume-ability-for-edp-jobs --- sahara/api/v11.py | 5 ++- sahara/exceptions.py | 10 +++++ sahara/service/edp/api.py | 8 ++++ sahara/service/edp/base_engine.py | 16 +++++++- sahara/service/edp/job_manager.py | 41 +++++++++++++++++-- sahara/service/edp/oozie/engine.py | 10 +++++ sahara/service/edp/oozie/oozie.py | 6 +++ sahara/service/ops.py | 15 +++++++ .../service/validations/edp/job_execution.py | 23 ++++++++++- .../validations/edp/job_execution_schema.py | 9 ++++ .../unit/service/edp/test_job_manager.py | 27 ++++++++++-- .../validation/edp/test_job_executor.py | 10 ++++- sahara/utils/edp.py | 20 ++++++++- 13 files changed, 185 insertions(+), 15 deletions(-) diff --git a/sahara/api/v11.py b/sahara/api/v11.py index 0cc76b15..779f660f 100644 --- a/sahara/api/v11.py +++ b/sahara/api/v11.py @@ -71,8 +71,9 @@ def job_executions_cancel(job_execution_id): @rest.patch('/job-executions/') @acl.enforce("data-processing:job-executions:modify") @v.check_exists(api.get_job_execution, id='job_execution_id') -@v.validate( - v_j_e_schema.JOB_EXEC_UPDATE_SCHEMA, v_j_e.check_job_execution_update) +@v.validate(v_j_e_schema.JOB_EXEC_UPDATE_SCHEMA, + v_j_e.check_job_execution_update, + v_j_e.check_job_status_update) def job_executions_update(job_execution_id, data): return u.to_wrapped_dict(api.update_job_execution, job_execution_id, data) diff --git a/sahara/exceptions.py b/sahara/exceptions.py index ed08099d..cf183f0e 100644 --- a/sahara/exceptions.py +++ b/sahara/exceptions.py @@ -147,6 +147,16 @@ class CancelingFailed(SaharaException): code = "CANCELING_FAILED" +class SuspendingFailed(SaharaException): + message = _("Operation was not suspended") + code = "SUSPENDING_FAILED" + + +class InvalidJobStatus(SaharaException): + message = _("Invalid Job Status") + code = "INVALID_JOB_STATUS" + + class DeletionFailed(SaharaException): code = "DELETION_FAILED" message = _("Object was not deleted") diff --git a/sahara/service/edp/api.py b/sahara/service/edp/api.py index c3096934..4f1a820f 100644 --- a/sahara/service/edp/api.py +++ b/sahara/service/edp/api.py @@ -168,9 +168,17 @@ def cancel_job_execution(id): def update_job_execution(id, values): + _update_status(values.pop("info", None)) return conductor.job_execution_update(context.ctx(), id, values) +def _update_status(info): + if info: + status = info.get("status", None) + if status == edp.JOB_ACTION_SUSPEND: + OPS.job_execution_suspend(id) + + def delete_job_execution(id): context.set_current_job_execution_id(id) OPS.delete_job_execution(id) diff --git a/sahara/service/edp/base_engine.py b/sahara/service/edp/base_engine.py index a6f86be1..938e87f1 100644 --- a/sahara/service/edp/base_engine.py +++ b/sahara/service/edp/base_engine.py @@ -19,10 +19,14 @@ import six from sahara import conductor as c - conductor = c.API +def optional(fun): + fun.__not_implemented__ = True + return fun + + @six.add_metaclass(abc.ABCMeta) class JobEngine(object): @abc.abstractmethod @@ -54,3 +58,13 @@ class JobEngine(object): @abc.abstractmethod def get_supported_job_types(): return None + + @optional + def suspend_job(self, job_execution): + pass + + def does_engine_implement(self, fun_name): + fun = getattr(self, fun_name) + if not (fun and callable(fun)): + return False + return not hasattr(fun, '__not_implemented__') diff --git a/sahara/service/edp/job_manager.py b/sahara/service/edp/job_manager.py index e3ff52a9..ee7bf57e 100644 --- a/sahara/service/edp/job_manager.py +++ b/sahara/service/edp/job_manager.py @@ -50,7 +50,7 @@ def _get_job_type(job_execution): return conductor.job_get(context.ctx(), job_execution.job_id).type -def _get_job_engine(cluster, job_execution): +def get_job_engine(cluster, job_execution): return job_utils.get_plugin(cluster).get_edp_engine(cluster, _get_job_type( job_execution)) @@ -96,7 +96,7 @@ def _run_job(job_execution_id): " inactive cluster.")) return - eng = _get_job_engine(cluster, job_execution) + eng = get_job_engine(cluster, job_execution) if eng is None: raise e.EDPError(_("Cluster does not support job type %s") % _get_job_type(job_execution)) @@ -158,7 +158,7 @@ def cancel_job(job_execution_id): if cluster is None: LOG.info(_LI("Can not cancel this job on a non-existant cluster.")) return job_execution - engine = _get_job_engine(cluster, job_execution) + engine = get_job_engine(cluster, job_execution) if engine is not None: job_execution = conductor.job_execution_update( ctx, job_execution_id, @@ -201,7 +201,7 @@ def get_job_status(job_execution_id): cluster = conductor.cluster_get(ctx, job_execution.cluster_id) if (cluster is not None and cluster.status == c_u.CLUSTER_STATUS_ACTIVE): - engine = _get_job_engine(cluster, job_execution) + engine = get_job_engine(cluster, job_execution) if engine is not None: job_execution = _update_job_status(engine, job_execution) @@ -225,3 +225,36 @@ def get_job_config_hints(job_type): for eng in ENGINES: if job_type in eng.get_supported_job_types(): return eng.get_possible_job_config(job_type) + + +def suspend_job(job_execution_id): + ctx = context.ctx() + job_execution = conductor.job_execution_get(ctx, job_execution_id) + if job_execution.info['status'] not in edp.JOB_STATUSES_SUSPENDIBLE: + raise e.SuspendingFailed(_("Suspending operation can not be performed" + " on status: {status}")).format( + status=job_execution.info['status']) + cluster = conductor.cluster_get(ctx, job_execution.cluster_id) + engine = get_job_engine(cluster, job_execution) + job_execution = conductor.job_execution_update( + ctx, job_execution_id, { + 'info': {'status': edp.JOB_STATUS_TOBESUSPENDED}}) + try: + job_info = engine.suspend_job(job_execution) + except Exception as ex: + job_info = None + conductor.job_execution_update( + ctx, job_execution_id, {'info': { + 'status': edp.JOB_STATUS_SUSPEND_FAILED}}) + raise e.SuspendingFailed(_("Error during suspending of job execution: " + "{error}")).format(error=ex) + if job_info is not None: + job_execution = _write_job_status(job_execution, job_info) + LOG.info(_LI("Job execution was suspended successfully")) + return job_execution + + conductor.job_execution_update( + ctx, job_execution_id, {'info': { + 'status': edp.JOB_STATUS_SUSPEND_FAILED}}) + raise e.SuspendingFailed(_("Failed to suspend job execution" + "{jid}")).format(jid=job_execution_id) diff --git a/sahara/service/edp/oozie/engine.py b/sahara/service/edp/oozie/engine.py index 54f08036..89a8b27b 100644 --- a/sahara/service/edp/oozie/engine.py +++ b/sahara/service/edp/oozie/engine.py @@ -436,3 +436,13 @@ class OozieJobEngine(base_engine.JobEngine): if isinstance(v, six.string_types) and v.startswith("hdfs://"): external_hdfs_urls.append(v) return external_hdfs_urls + + def suspend_job(self, job_execution): + return self._manage_job(job_execution, edp.JOB_ACTION_SUSPEND) + + def _manage_job(self, job_execution, action): + if job_execution.oozie_job_id is not None: + client = self._get_client() + if action == edp.JOB_ACTION_SUSPEND: + client.suspend_job(job_execution) + return client.get_job_status(job_execution) diff --git a/sahara/service/edp/oozie/oozie.py b/sahara/service/edp/oozie/oozie.py index 81f42756..33a391ff 100644 --- a/sahara/service/edp/oozie/oozie.py +++ b/sahara/service/edp/oozie/oozie.py @@ -50,6 +50,12 @@ class OozieClient(object): "?action=kill") _check_status_code(resp, 200) + def manage_job(self, job_execution, action): + session = self._get_http_session(job_execution.extra.get('neutron')) + resp = session.put(self.job_url % job_execution.oozie_job_id + + "?action=" + action) + _check_status_code(resp, 200) + def get_job_info(self, job_execution, job_id=None): if job_id is None: job_id = job_execution.engine_job_id diff --git a/sahara/service/ops.py b/sahara/service/ops.py index f5b7262f..d5ebdc37 100644 --- a/sahara/service/ops.py +++ b/sahara/service/ops.py @@ -78,6 +78,10 @@ class LocalOps(object): def get_engine_type_and_version(self): return INFRA.get_type_and_version() + def job_execution_suspend(self, job_execution_id): + context.spawn("Suspend Job Execution %s" % job_execution_id, + _suspend_job_execution, job_execution_id) + class RemoteOps(rpc_utils.RPCClient): def __init__(self): @@ -108,6 +112,9 @@ class RemoteOps(rpc_utils.RPCClient): def get_engine_type_and_version(self): return self.call('get_engine_type_and_version') + def job_execution_suspend(self, job_execution_id): + self.cast('job_execution_suspend', job_execution_id=job_execution_id) + def request_context(func): @functools.wraps(func) @@ -152,6 +159,10 @@ class OpsServer(rpc_utils.RPCServer): def get_engine_type_and_version(self): return INFRA.get_type_and_version() + @request_context + def job_execution_suspend(self, job_execution_id): + _suspend_job_execution(job_execution_id) + def _setup_trust_for_cluster(cluster): cluster = conductor.cluster_get(context.ctx(), cluster) @@ -351,6 +362,10 @@ def _run_edp_job(job_execution_id): job_manager.run_job(job_execution_id) +def _suspend_job_execution(job_execution_id): + job_manager.suspend_job(job_execution_id) + + def _cancel_job_execution(job_execution_id): job_manager.cancel_job(job_execution_id) diff --git a/sahara/service/validations/edp/job_execution.py b/sahara/service/validations/edp/job_execution.py index 798cb156..f5858d2c 100644 --- a/sahara/service/validations/edp/job_execution.py +++ b/sahara/service/validations/edp/job_execution.py @@ -22,10 +22,11 @@ from sahara import context from sahara import exceptions as ex from sahara.i18n import _ from sahara.plugins import base as plugin_base +from sahara.service.edp import job_utils as j_u from sahara.service.validations import acl import sahara.service.validations.edp.base as b import sahara.service.validations.edp.job_interface as j_i - +from sahara.utils import cluster as c_u conductor = c.API @@ -144,3 +145,23 @@ def check_job_execution_update(job_execution_id, data, **kwargs): acl.check_tenant_for_update(ctx, je) acl.check_protected_from_update(je, data) + + +def check_job_status_update(job_execution_id, data): + ctx = context.ctx() + job_execution = conductor.job_execution_get(ctx, job_execution_id) + # check we are updating status + if 'info' in data: + if 'status' in data['info']: + if len(data) != 1: + raise ex.InvalidJobStatus(_("Invalid status parameter")) + cluster = conductor.cluster_get(ctx, job_execution.cluster_id) + engine = j_u.get_plugin(cluster).get_edp_engine( + cluster, conductor.job_get(ctx, job_execution_id).type) + if cluster is None or cluster.status != c_u.CLUSTER_STATUS_ACTIVE: + raise ex.InvalidDataException( + _("Suspending operation can not be performed on an inactive or " + "non-existent cluster")) + if not (engine.does_engine_implement('suspend_job')): + raise ex.InvalidReferenceException( + _("Engine doesn't support suspending job feature")) diff --git a/sahara/service/validations/edp/job_execution_schema.py b/sahara/service/validations/edp/job_execution_schema.py index 26c34a0d..b4af708a 100644 --- a/sahara/service/validations/edp/job_execution_schema.py +++ b/sahara/service/validations/edp/job_execution_schema.py @@ -58,6 +58,15 @@ JOB_EXEC_UPDATE_SCHEMA = { }, "is_protected": { "type": ["boolean", "null"], + }, + "info": { + "type": "simple_config", + "properties": { + "status": { + "enum": ["suspend"] + } + }, + "additionalProperties": False } }, "additionalProperties": False, diff --git a/sahara/tests/unit/service/edp/test_job_manager.py b/sahara/tests/unit/service/edp/test_job_manager.py index 6d699aa6..dbd074d2 100644 --- a/sahara/tests/unit/service/edp/test_job_manager.py +++ b/sahara/tests/unit/service/edp/test_job_manager.py @@ -539,12 +539,12 @@ class TestJobManager(base.SaharaWithDbTestCase): def test_job_type_supported(self, job_get): job, job_exec = u.create_job_exec(edp.JOB_TYPE_PIG) job_get.return_value = job - self.assertIsNotNone(job_manager._get_job_engine(u.create_cluster(), - job_exec)) + self.assertIsNotNone(job_manager.get_job_engine(u.create_cluster(), + job_exec)) job.type = "unsupported_type" - self.assertIsNone(job_manager._get_job_engine(u.create_cluster(), - job_exec)) + self.assertIsNone(job_manager.get_job_engine(u.create_cluster(), + job_exec)) @mock.patch('sahara.conductor.API.job_get') @mock.patch('sahara.conductor.API.job_execution_get') @@ -649,3 +649,22 @@ class TestJobManager(base.SaharaWithDbTestCase): job_manager._run_job(job_exec.id) self.assertEqual(1, run_scheduled_job.call_count) + + @mock.patch('sahara.conductor.API.job_get') + @mock.patch('sahara.conductor.API.job_execution_get') + @mock.patch('sahara.conductor.API.cluster_get') + @mock.patch('sahara.service.edp.base_engine.JobEngine.suspend_job') + def test_suspend_unsuspendible_job(self, suspend_job_get, + cluster_get, job_exec_get, job_get): + info = { + 'status': edp.JOB_STATUS_SUCCEEDED + } + job, job_exec = u.create_job_exec(edp.JOB_TYPE_PIG, None, False, info) + job_exec_get.return_value = job_exec + job_get.return_value = job + + cluster = u.create_cluster() + cluster.status = "Active" + cluster_get.return_value = cluster + + self.assertEqual(0, suspend_job_get.call_count) diff --git a/sahara/tests/unit/service/validation/edp/test_job_executor.py b/sahara/tests/unit/service/validation/edp/test_job_executor.py index 27899aa5..7b45d47d 100644 --- a/sahara/tests/unit/service/validation/edp/test_job_executor.py +++ b/sahara/tests/unit/service/validation/edp/test_job_executor.py @@ -285,7 +285,10 @@ class TestJobExecUpdateValidation(u.ValidationTestCase): def test_job_execution_update_types(self): data = { 'is_public': False, - 'is_protected': False + 'is_protected': False, + 'info': { + 'status': 'suspend' + } } self._assert_types(data) @@ -293,7 +296,10 @@ class TestJobExecUpdateValidation(u.ValidationTestCase): self._assert_create_object_validation( data={ 'is_public': False, - 'is_protected': False + 'is_protected': False, + 'info': { + 'status': 'suspend' + } } ) diff --git a/sahara/utils/edp.py b/sahara/utils/edp.py index 14fd7878..746d32f4 100644 --- a/sahara/utils/edp.py +++ b/sahara/utils/edp.py @@ -28,12 +28,23 @@ JOB_STATUS_READYTORUN = 'READYTORUN' JOB_STATUS_RUNNING = 'RUNNING' JOB_STATUS_SUCCEEDED = 'SUCCEEDED' JOB_STATUS_TOBEKILLED = 'TOBEKILLED' +JOB_STATUS_TOBESUSPENDED = 'TOBESUSPENDED' +JOB_STATUS_PREP = 'PREP' +JOB_STATUS_PREPSUSPENDED = 'PREPSUSPENDED' +JOB_STATUS_SUSPENDED = 'SUSPENDED' +JOB_STATUS_SUSPEND_FAILED = 'SUSPENDFAILED' +# statuses for suspended jobs +JOB_STATUSES_SUSPENDIBLE = [ + JOB_STATUS_PREP, + JOB_STATUS_RUNNING +] # statuses for terminated jobs JOB_STATUSES_TERMINATED = [ JOB_STATUS_DONEWITHERROR, JOB_STATUS_FAILED, JOB_STATUS_KILLED, - JOB_STATUS_SUCCEEDED + JOB_STATUS_SUCCEEDED, + JOB_STATUS_SUSPEND_FAILED ] # job type separator character JOB_TYPE_SEP = '.' @@ -74,6 +85,13 @@ JOB_TYPES_ACCEPTABLE_CONFIGS = { JOB_TYPE_STORM: {"args"} } +# job actions +JOB_ACTION_SUSPEND = 'suspend' + +JOB_ACTION_TYPES_ACCEPTABLE = [ + JOB_ACTION_SUSPEND +] + ADAPT_FOR_OOZIE = 'edp.java.adapt_for_oozie' SPARK_DRIVER_CLASSPATH = 'edp.spark.driver.classpath' ADAPT_SPARK_FOR_SWIFT = 'edp.spark.adapt_for_swift'