From e4dc6fd5bd7bbff06828df41c7bd32d684174bea Mon Sep 17 00:00:00 2001 From: Shu Yingya Date: Thu, 2 Mar 2017 19:45:01 +0800 Subject: [PATCH] [APIv2] Further rename endpoint of jobs & job_executions Patch [1] has already changed endpoint from jobs to job_templates and job_executions to jobs. However, the filename is still confusing. Follow changes included in this patch: * rename filenames: jobs.py -> job_templates.py job_executions.py -> jobs.py * rename function name in "api/v2": job_list() -> job_template_list() * rename parameter in both "api/v11.py" and "api/v2": job_id -> job_templates_id job_execution_id -> job_id ... I want to correct the parameter name in APIv2, but that leads to validation error. So I also changed the parameter name in "api/v11.py" [1]: https://review.openstack.org/#/c/309395/ Partial-Implements: bp v2-api-experimental-impl Change-Id: Ia7a7ca811c82ddea13c21521a3abd62165ebd513 --- sahara/api/v11.py | 73 ++++++------ sahara/api/v2/__init__.py | 6 +- sahara/api/v2/job_executions.py | 77 ------------- sahara/api/v2/job_templates.py | 83 +++++++++++++ sahara/api/v2/jobs.py | 87 +++++++------- sahara/service/api/v2/job_executions.py | 109 ------------------ sahara/service/api/v2/job_templates.py | 45 ++++++++ sahara/service/api/v2/jobs.py | 88 ++++++++++++-- .../service/validations/edp/job_execution.py | 28 ++--- 9 files changed, 299 insertions(+), 297 deletions(-) delete mode 100644 sahara/api/v2/job_executions.py create mode 100644 sahara/api/v2/job_templates.py delete mode 100644 sahara/service/api/v2/job_executions.py create mode 100644 sahara/service/api/v2/job_templates.py diff --git a/sahara/api/v11.py b/sahara/api/v11.py index ece16a28..2b71a3e9 100644 --- a/sahara/api/v11.py +++ b/sahara/api/v11.py @@ -46,44 +46,44 @@ def job_executions_list(): return u.render(res=result, name='job_executions') -@rest.get('/job-executions/') +@rest.get('/job-executions/') @acl.enforce("data-processing:job-executions:get") -@v.check_exists(api.get_job_execution, id='job_execution_id') -def job_executions(job_execution_id): - return u.to_wrapped_dict(api.get_job_execution, job_execution_id) +@v.check_exists(api.get_job_execution, id='job_id') +def job_executions(job_id): + return u.to_wrapped_dict(api.get_job_execution, job_id) -@rest.get('/job-executions//refresh-status') +@rest.get('/job-executions//refresh-status') @acl.enforce("data-processing:job-executions:refresh_status") -@v.check_exists(api.get_job_execution, id='job_execution_id') -def job_executions_status(job_execution_id): - return u.to_wrapped_dict(api.get_job_execution_status, job_execution_id) +@v.check_exists(api.get_job_execution, id='job_id') +def job_executions_status(job_id): + return u.to_wrapped_dict(api.get_job_execution_status, job_id) -@rest.get('/job-executions//cancel') +@rest.get('/job-executions//cancel') @acl.enforce("data-processing:job-executions:cancel") -@v.check_exists(api.get_job_execution, id='job_execution_id') +@v.check_exists(api.get_job_execution, id='job_id') @v.validate(None, v_j_e.check_job_execution_cancel) -def job_executions_cancel(job_execution_id): - return u.to_wrapped_dict(api.cancel_job_execution, job_execution_id) +def job_executions_cancel(job_id): + return u.to_wrapped_dict(api.cancel_job_execution, job_id) -@rest.patch('/job-executions/') +@rest.patch('/job-executions/') @acl.enforce("data-processing:job-executions:modify") -@v.check_exists(api.get_job_execution, id='job_execution_id') +@v.check_exists(api.get_job_execution, id='job_id') @v.validate(v_j_e_schema.JOB_EXEC_UPDATE_SCHEMA, v_j_e.check_job_execution_update, v_j_e.check_job_status_update) -def job_executions_update(job_execution_id, data): - return u.to_wrapped_dict(api.update_job_execution, job_execution_id, data) +def job_executions_update(job_id, data): + return u.to_wrapped_dict(api.update_job_execution, job_id, data) -@rest.delete('/job-executions/') +@rest.delete('/job-executions/') @acl.enforce("data-processing:job-executions:delete") -@v.check_exists(api.get_job_execution, id='job_execution_id') +@v.check_exists(api.get_job_execution, id='job_id') @v.validate(None, v_j_e.check_job_execution_delete) -def job_executions_delete(job_execution_id): - api.delete_job_execution(job_execution_id) +def job_executions_delete(job_id): + api.delete_job_execution(job_id) return u.render() @@ -151,35 +151,36 @@ def job_create(data): return u.render(api.create_job(data).to_wrapped_dict()) -@rest.get('/jobs/') +@rest.get('/jobs/') @acl.enforce("data-processing:jobs:get") -@v.check_exists(api.get_job, id='job_id') -def job_get(job_id): - return u.to_wrapped_dict(api.get_job, job_id) +@v.check_exists(api.get_job, id='job_templates_id') +def job_get(job_templates_id): + return u.to_wrapped_dict(api.get_job, job_templates_id) -@rest.patch('/jobs/') +@rest.patch('/jobs/') @acl.enforce("data-processing:jobs:modify") -@v.check_exists(api.get_job, id='job_id') +@v.check_exists(api.get_job, id='job_templates_id') @v.validate(v_j_schema.JOB_UPDATE_SCHEMA) -def job_update(job_id, data): - return u.to_wrapped_dict(api.update_job, job_id, data) +def job_update(job_templates_id, data): + return u.to_wrapped_dict(api.update_job, job_templates_id, data) -@rest.delete('/jobs/') +@rest.delete('/jobs/') @acl.enforce("data-processing:jobs:delete") -@v.check_exists(api.get_job, id='job_id') -def job_delete(job_id): - api.delete_job(job_id) +@v.check_exists(api.get_job, id='job_templates_id') +def job_delete(job_templates_id): + api.delete_job(job_templates_id) return u.render() -@rest.post('/jobs//execute') +@rest.post('/jobs//execute') @acl.enforce("data-processing:jobs:execute") -@v.check_exists(api.get_job, id='job_id') +@v.check_exists(api.get_job, id='job_templates_id') @v.validate(v_j_e_schema.JOB_EXEC_SCHEMA, v_j_e.check_job_execution) -def job_execute(job_id, data): - return u.render(job_execution=api.execute_job(job_id, data).to_dict()) +def job_execute(job_templates_id, data): + return u.render(job_execution=api.execute_job( + job_templates_id, data).to_dict()) @rest.get('/jobs/config-hints/') diff --git a/sahara/api/v2/__init__.py b/sahara/api/v2/__init__.py index 3aae2908..8dac28c1 100644 --- a/sahara/api/v2/__init__.py +++ b/sahara/api/v2/__init__.py @@ -37,7 +37,7 @@ from sahara.api.v2 import clusters from sahara.api.v2 import data_sources from sahara.api.v2 import images from sahara.api.v2 import job_binaries -from sahara.api.v2 import job_executions +from sahara.api.v2 import job_templates from sahara.api.v2 import job_types from sahara.api.v2 import jobs from sahara.api.v2 import node_group_templates @@ -59,8 +59,8 @@ def register_blueprints(app, url_prefix): app.register_blueprint(data_sources.rest, url_prefix=url_prefix) app.register_blueprint(images.rest, url_prefix=url_prefix) app.register_blueprint(job_binaries.rest, url_prefix=url_prefix) - app.register_blueprint(job_executions.rest, url_prefix=url_prefix) - app.register_blueprint(job_types.rest, url_prefix=url_prefix) app.register_blueprint(jobs.rest, url_prefix=url_prefix) + app.register_blueprint(job_types.rest, url_prefix=url_prefix) + app.register_blueprint(job_templates.rest, url_prefix=url_prefix) app.register_blueprint(node_group_templates.rest, url_prefix=url_prefix) app.register_blueprint(plugins.rest, url_prefix=url_prefix) diff --git a/sahara/api/v2/job_executions.py b/sahara/api/v2/job_executions.py deleted file mode 100644 index b9686765..00000000 --- a/sahara/api/v2/job_executions.py +++ /dev/null @@ -1,77 +0,0 @@ -# Copyright (c) 2016 Red Hat, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -from sahara.api import acl -from sahara.service.api.v2 import job_executions as api -from sahara.service import validation as v -from sahara.service.validations.edp import job_execution as v_j_e -from sahara.service.validations.edp import job_execution_schema as v_j_e_schema -import sahara.utils.api as u - - -rest = u.RestV2('job-executions', __name__) - - -@rest.get('/jobs') -@acl.enforce("data-processing:job-executions:get_all") -@v.check_exists(api.get_job_execution, 'marker') -@v.validate(None, v.validate_pagination_limit, - v.validate_sorting_job_executions) -def job_executions_list(): - result = api.job_execution_list(**u.get_request_args().to_dict()) - return u.render(res=result, name='jobs') - - -@rest.get('/jobs/') -@acl.enforce("data-processing:job-executions:get") -@v.check_exists(api.get_job_execution, id='job_execution_id') -def job_executions(job_execution_id): - return u.to_wrapped_dict(api.get_job_execution, job_execution_id) - - -@rest.get('/jobs//refresh-status') -@acl.enforce("data-processing:job-executions:refresh_status") -@v.check_exists(api.get_job_execution, id='job_execution_id') -def job_executions_status(job_execution_id): - return u.to_wrapped_dict( - api.get_job_execution_status, job_execution_id) - - -@rest.get('/jobs//cancel') -@acl.enforce("data-processing:job-executions:cancel") -@v.check_exists(api.get_job_execution, id='job_execution_id') -@v.validate(None, v_j_e.check_job_execution_cancel) -def job_executions_cancel(job_execution_id): - return u.to_wrapped_dict(api.cancel_job_execution, job_execution_id) - - -@rest.patch('/jobs/') -@acl.enforce("data-processing:job-executions:modify") -@v.check_exists(api.get_job_execution, id='job_execution_id') -@v.validate( - v_j_e_schema.JOB_EXEC_UPDATE_SCHEMA, v_j_e.check_job_execution_update) -def job_executions_update(job_execution_id, data): - return u.to_wrapped_dict( - api.update_job_execution, job_execution_id, data) - - -@rest.delete('/jobs/') -@acl.enforce("data-processing:job-executions:delete") -@v.check_exists(api.get_job_execution, id='job_execution_id') -@v.validate(None, v_j_e.check_job_execution_delete) -def job_executions_delete(job_execution_id): - api.delete_job_execution(job_execution_id) - return u.render() diff --git a/sahara/api/v2/job_templates.py b/sahara/api/v2/job_templates.py new file mode 100644 index 00000000..a0c0c5c3 --- /dev/null +++ b/sahara/api/v2/job_templates.py @@ -0,0 +1,83 @@ +# Copyright (c) 2016 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from sahara.api import acl +from sahara.service.api.v2 import job_templates as api +from sahara.service.api.v2 import jobs as j_e_api +from sahara.service import validation as v +from sahara.service.validations.edp import job as v_j +from sahara.service.validations.edp import job_execution as v_j_e +from sahara.service.validations.edp import job_execution_schema as v_j_e_schema +from sahara.service.validations.edp import job_schema as v_j_schema +import sahara.utils.api as u + + +rest = u.RestV2('job-templates', __name__) + + +@rest.get('/job-templates') +@acl.enforce("data-processing:job-templates:get_all") +@v.check_exists(api.get_job_templates, 'marker') +@v.validate(None, v.validate_pagination_limit, + v.validate_sorting_jobs) +def job_templates_list(): + result = api.get_job_templates(**u.get_request_args().to_dict()) + return u.render(res=result, name='job_templates') + + +@rest.post('/job-templates') +@acl.enforce("data-processing:job-templates:create") +@v.validate(v_j_schema.JOB_SCHEMA, v_j.check_mains_libs, v_j.check_interface) +def job_templates_create(data): + return u.render(api.create_job_template(data).to_wrapped_dict()) + + +@rest.get('/job-templates/') +@acl.enforce("data-processing:job-templates:get") +@v.check_exists(api.get_job_templates, id='job_templates_id') +def job_templates_get(job_templates_id): + return u.to_wrapped_dict(api.get_job_template, job_templates_id) + + +@rest.patch('/job-templates/') +@acl.enforce("data-processing:jobs:modify") +@v.check_exists(api.get_job_templates, id='job_templates_id') +@v.validate(v_j_schema.JOB_UPDATE_SCHEMA) +def job_templates_update(job_templates_id, data): + return u.to_wrapped_dict(api.update_job_template, job_templates_id, data) + + +@rest.delete('/job-templates/') +@acl.enforce("data-processing:jobs:delete") +@v.check_exists(api.get_job_templates, id='job_templates_id') +def job_templates_delete(job_templates_id): + api.delete_job_template(job_templates_id) + return u.render() + + +@rest.post('/job-templates//execute') +@acl.enforce("data-processing:jobs:execute") +@v.check_exists(api.get_job_templates, id='job_templates_id') +@v.validate(v_j_e_schema.JOB_EXEC_SCHEMA, v_j_e.check_job_execution) +def job_templates_execute(job_templates_id, data): + return u.render(job_execution=j_e_api.execute_job( + job_templates_id, data).to_dict()) + + +@rest.get('/job-templates/config-hints/') +@acl.enforce("data-processing:jobs:get_config_hints") +@v.check_exists(api.get_job_config_hints, job_type='job_type') +def job_config_hints_get(job_type): + return u.render(api.get_job_config_hints(job_type)) diff --git a/sahara/api/v2/jobs.py b/sahara/api/v2/jobs.py index df31b1e2..e60ca675 100644 --- a/sahara/api/v2/jobs.py +++ b/sahara/api/v2/jobs.py @@ -13,70 +13,65 @@ # See the License for the specific language governing permissions and # limitations under the License. + from sahara.api import acl -from sahara.service.api.v2 import job_executions as j_e_api from sahara.service.api.v2 import jobs as api from sahara.service import validation as v -from sahara.service.validations.edp import job as v_j from sahara.service.validations.edp import job_execution as v_j_e from sahara.service.validations.edp import job_execution_schema as v_j_e_schema -from sahara.service.validations.edp import job_schema as v_j_schema import sahara.utils.api as u rest = u.RestV2('jobs', __name__) -@rest.get('/job-templates') -@acl.enforce("data-processing:jobs:get_all") -@v.check_exists(api.get_job, 'marker') +@rest.get('/jobs') +@acl.enforce("data-processing:job-executions:get_all") +@v.check_exists(api.get_job_execution, 'marker') @v.validate(None, v.validate_pagination_limit, - v.validate_sorting_jobs) -def job_list(): - result = api.get_jobs(**u.get_request_args().to_dict()) - return u.render(res=result, name='job_templates') + v.validate_sorting_job_executions) +def job_executions_list(): + result = api.job_execution_list(**u.get_request_args().to_dict()) + return u.render(res=result, name='jobs') -@rest.post('/job-templates') -@acl.enforce("data-processing:jobs:create") -@v.validate(v_j_schema.JOB_SCHEMA, v_j.check_mains_libs, v_j.check_interface) -def job_create(data): - return u.render(api.create_job(data).to_wrapped_dict()) +@rest.get('/jobs/') +@acl.enforce("data-processing:job-executions:get") +@v.check_exists(api.get_job_execution, id='job_id') +def job_executions(job_id): + return u.to_wrapped_dict(api.get_job_execution, job_id) -@rest.get('/job-templates/') -@acl.enforce("data-processing:jobs:get") -@v.check_exists(api.get_job, id='job_id') -def job_get(job_id): - return u.to_wrapped_dict(api.get_job, job_id) +@rest.get('/jobs//refresh-status') +@acl.enforce("data-processing:job-executions:refresh_status") +@v.check_exists(api.get_job_execution, id='job_id') +def job_executions_status(job_id): + return u.to_wrapped_dict( + api.get_job_execution_status, job_id) -@rest.patch('/job-templates/') -@acl.enforce("data-processing:jobs:modify") -@v.check_exists(api.get_job, id='job_id') -@v.validate(v_j_schema.JOB_UPDATE_SCHEMA) -def job_update(job_id, data): - return u.to_wrapped_dict(api.update_job, job_id, data) +@rest.get('/jobs//cancel') +@acl.enforce("data-processing:job-executions:cancel") +@v.check_exists(api.get_job_execution, id='job_id') +@v.validate(None, v_j_e.check_job_execution_cancel) +def job_executions_cancel(job_id): + return u.to_wrapped_dict(api.cancel_job_execution, job_id) -@rest.delete('/job-templates/') -@acl.enforce("data-processing:jobs:delete") -@v.check_exists(api.get_job, id='job_id') -def job_delete(job_id): - api.delete_job(job_id) +@rest.patch('/jobs/') +@acl.enforce("data-processing:job-executions:modify") +@v.check_exists(api.get_job_execution, id='job_id') +@v.validate( + v_j_e_schema.JOB_EXEC_UPDATE_SCHEMA, v_j_e.check_job_execution_update) +def job_executions_update(job_id, data): + return u.to_wrapped_dict( + api.update_job_execution, job_id, data) + + +@rest.delete('/jobs/') +@acl.enforce("data-processing:job-executions:delete") +@v.check_exists(api.get_job_execution, id='job_id') +@v.validate(None, v_j_e.check_job_execution_delete) +def job_executions_delete(job_id): + api.delete_job_execution(job_id) return u.render() - - -@rest.post('/job-templates//execute') -@acl.enforce("data-processing:jobs:execute") -@v.check_exists(api.get_job, id='job_id') -@v.validate(v_j_e_schema.JOB_EXEC_SCHEMA, v_j_e.check_job_execution) -def job_execute(job_id, data): - return u.render(job_execution=j_e_api.execute_job(job_id, data).to_dict()) - - -@rest.get('/job-templates/config-hints/') -@acl.enforce("data-processing:jobs:get_config_hints") -@v.check_exists(api.get_job_config_hints, job_type='job_type') -def job_config_hints_get(job_type): - return u.render(api.get_job_config_hints(job_type)) diff --git a/sahara/service/api/v2/job_executions.py b/sahara/service/api/v2/job_executions.py deleted file mode 100644 index 613b6b1f..00000000 --- a/sahara/service/api/v2/job_executions.py +++ /dev/null @@ -1,109 +0,0 @@ -# Copyright (c) 2016 Red Hat, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from oslo_log import log as logging - -from sahara import conductor as c -from sahara import context -from sahara import exceptions as ex -from sahara.i18n import _LE -from sahara.service import api -from sahara.service.edp import job_manager as manager -from sahara.utils import edp -from sahara.utils import proxy as p - - -conductor = c.API -LOG = logging.getLogger(__name__) - - -def execute_job(job_id, data): - # Elements common to all job types - cluster_id = data['cluster_id'] - configs = data.get('job_configs', {}) - interface = data.get('interface', {}) - - # Not in Java job types but present for all others - input_id = data.get('input_id', None) - output_id = data.get('output_id', None) - - # Since we will use a unified class in the database, we pass - # a superset for all job types - # example configs['start'] = '2015-05-12T08:55Z' frequency = 5 mins - # the job will starts from 2015-05-12T08:55Z, runs every 5 mins - - job_execution_info = data.get('job_execution_info', {}) - - configs['job_execution_info'] = job_execution_info - - job_ex_dict = {'input_id': input_id, 'output_id': output_id, - 'job_id': job_id, 'cluster_id': cluster_id, - 'info': {'status': edp.JOB_STATUS_PENDING}, - 'job_configs': configs, 'extra': {}, - 'interface': interface} - job_execution = conductor.job_execution_create(context.ctx(), job_ex_dict) - context.set_current_job_execution_id(job_execution.id) - - # check to use proxy user - if p.job_execution_requires_proxy_user(job_execution): - try: - p.create_proxy_user_for_job_execution(job_execution) - except ex.SaharaException as e: - LOG.error(_LE("Can't run job execution. " - "(Reasons: {reason})").format(reason=e)) - conductor.job_execution_destroy(context.ctx(), job_execution) - raise e - - api.OPS.run_edp_job(job_execution.id) - - return job_execution - - -def get_job_execution_status(id): - return manager.get_job_status(id) - - -def job_execution_list(**kwargs): - return conductor.job_execution_get_all(context.ctx(), - regex_search=True, **kwargs) - - -def get_job_execution(id): - return conductor.job_execution_get(context.ctx(), id) - - -def cancel_job_execution(id): - context.set_current_job_execution_id(id) - job_execution = conductor.job_execution_get(context.ctx(), id) - api.OPS.cancel_job_execution(id) - - return job_execution - - -def update_job_execution(id, values): - _update_status(values.pop("info", None)) - return conductor.job_execution_update(context.ctx(), id, values) - - -def _update_status(info): - if info: - status = info.get("status", None) - if status == edp.JOB_ACTION_SUSPEND: - api.OPS.job_execution_suspend(id) - - -def delete_job_execution(id): - context.set_current_job_execution_id(id) - api.OPS.delete_job_execution(id) diff --git a/sahara/service/api/v2/job_templates.py b/sahara/service/api/v2/job_templates.py new file mode 100644 index 00000000..945f3c67 --- /dev/null +++ b/sahara/service/api/v2/job_templates.py @@ -0,0 +1,45 @@ +# Copyright (c) 2016 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from sahara import conductor as c +from sahara import context +from sahara.service.edp import job_manager as manager + + +conductor = c.API + + +def get_job_templates(**kwargs): + return conductor.job_get_all(context.ctx(), regex_search=True, **kwargs) + + +def get_job_template(id): + return conductor.job_get(context.ctx(), id) + + +def create_job_template(values): + return conductor.job_create(context.ctx(), values) + + +def update_job_template(id, values): + return conductor.job_update(context.ctx(), id, values) + + +def delete_job_template(job_id): + return conductor.job_destroy(context.ctx(), job_id) + + +def get_job_config_hints(job_type): + return manager.get_job_config_hints(job_type) diff --git a/sahara/service/api/v2/jobs.py b/sahara/service/api/v2/jobs.py index 57e2cf6f..613b6b1f 100644 --- a/sahara/service/api/v2/jobs.py +++ b/sahara/service/api/v2/jobs.py @@ -13,33 +13,97 @@ # See the License for the specific language governing permissions and # limitations under the License. +from oslo_log import log as logging + from sahara import conductor as c from sahara import context +from sahara import exceptions as ex +from sahara.i18n import _LE +from sahara.service import api from sahara.service.edp import job_manager as manager +from sahara.utils import edp +from sahara.utils import proxy as p conductor = c.API +LOG = logging.getLogger(__name__) -def get_jobs(**kwargs): - return conductor.job_get_all(context.ctx(), regex_search=True, **kwargs) +def execute_job(job_id, data): + # Elements common to all job types + cluster_id = data['cluster_id'] + configs = data.get('job_configs', {}) + interface = data.get('interface', {}) + + # Not in Java job types but present for all others + input_id = data.get('input_id', None) + output_id = data.get('output_id', None) + + # Since we will use a unified class in the database, we pass + # a superset for all job types + # example configs['start'] = '2015-05-12T08:55Z' frequency = 5 mins + # the job will starts from 2015-05-12T08:55Z, runs every 5 mins + + job_execution_info = data.get('job_execution_info', {}) + + configs['job_execution_info'] = job_execution_info + + job_ex_dict = {'input_id': input_id, 'output_id': output_id, + 'job_id': job_id, 'cluster_id': cluster_id, + 'info': {'status': edp.JOB_STATUS_PENDING}, + 'job_configs': configs, 'extra': {}, + 'interface': interface} + job_execution = conductor.job_execution_create(context.ctx(), job_ex_dict) + context.set_current_job_execution_id(job_execution.id) + + # check to use proxy user + if p.job_execution_requires_proxy_user(job_execution): + try: + p.create_proxy_user_for_job_execution(job_execution) + except ex.SaharaException as e: + LOG.error(_LE("Can't run job execution. " + "(Reasons: {reason})").format(reason=e)) + conductor.job_execution_destroy(context.ctx(), job_execution) + raise e + + api.OPS.run_edp_job(job_execution.id) + + return job_execution -def get_job(id): - return conductor.job_get(context.ctx(), id) +def get_job_execution_status(id): + return manager.get_job_status(id) -def create_job(values): - return conductor.job_create(context.ctx(), values) +def job_execution_list(**kwargs): + return conductor.job_execution_get_all(context.ctx(), + regex_search=True, **kwargs) -def update_job(id, values): - return conductor.job_update(context.ctx(), id, values) +def get_job_execution(id): + return conductor.job_execution_get(context.ctx(), id) -def delete_job(job_id): - return conductor.job_destroy(context.ctx(), job_id) +def cancel_job_execution(id): + context.set_current_job_execution_id(id) + job_execution = conductor.job_execution_get(context.ctx(), id) + api.OPS.cancel_job_execution(id) + + return job_execution -def get_job_config_hints(job_type): - return manager.get_job_config_hints(job_type) +def update_job_execution(id, values): + _update_status(values.pop("info", None)) + return conductor.job_execution_update(context.ctx(), id, values) + + +def _update_status(info): + if info: + status = info.get("status", None) + if status == edp.JOB_ACTION_SUSPEND: + api.OPS.job_execution_suspend(id) + + +def delete_job_execution(id): + context.set_current_job_execution_id(id) + api.OPS.delete_job_execution(id) diff --git a/sahara/service/validations/edp/job_execution.py b/sahara/service/validations/edp/job_execution.py index 4b357400..5ee15033 100644 --- a/sahara/service/validations/edp/job_execution.py +++ b/sahara/service/validations/edp/job_execution.py @@ -100,7 +100,7 @@ def check_scheduled_job_execution_info(job_execution_info): "Job start time should be later than now")) -def check_job_execution(data, job_id): +def check_job_execution(data, job_templates_id): ctx = context.ctx() job_execution_info = data.get('job_execution_info', {}) @@ -110,7 +110,7 @@ def check_job_execution(data, job_id): _("Cluster with id '%s' doesn't exist") % data['cluster_id']) val_base.check_plugin_labels(cluster.plugin_name, cluster.hadoop_version) - job = conductor.job_get(ctx, job_id) + job = conductor.job_get(ctx, job_templates_id) plugin = plugin_base.PLUGINS.get_plugin(cluster.plugin_name) edp_engine = plugin.get_edp_engine(cluster, job.type) @@ -140,41 +140,41 @@ def check_data_sources(data, job): b.check_data_sources_are_different(data['input_id'], data['output_id']) -def check_job_execution_cancel(job_execution_id, **kwargs): +def check_job_execution_cancel(job_id, **kwargs): ctx = context.current() - je = conductor.job_execution_get(ctx, job_execution_id) + je = conductor.job_execution_get(ctx, job_id) if je.tenant_id != ctx.tenant_id: raise ex.CancelingFailed( _("Job execution with id '%s' cannot be canceled " "because it wasn't created in this tenant") - % job_execution_id) + % job_id) if je.is_protected: raise ex.CancelingFailed( _("Job Execution with id '%s' cannot be canceled " - "because it's marked as protected") % job_execution_id) + "because it's marked as protected") % job_id) -def check_job_execution_delete(job_execution_id, **kwargs): +def check_job_execution_delete(job_id, **kwargs): ctx = context.current() - je = conductor.job_execution_get(ctx, job_execution_id) + je = conductor.job_execution_get(ctx, job_id) acl.check_tenant_for_delete(ctx, je) acl.check_protected_from_delete(je) -def check_job_execution_update(job_execution_id, data, **kwargs): +def check_job_execution_update(job_id, data, **kwargs): ctx = context.current() - je = conductor.job_execution_get(ctx, job_execution_id) + je = conductor.job_execution_get(ctx, job_id) acl.check_tenant_for_update(ctx, je) acl.check_protected_from_update(je, data) -def check_job_status_update(job_execution_id, data): +def check_job_status_update(job_id, data): ctx = context.ctx() - job_execution = conductor.job_execution_get(ctx, job_execution_id) + job_execution = conductor.job_execution_get(ctx, job_id) # check we are updating status if 'info' in data: if len(data) != 1: @@ -184,8 +184,8 @@ def check_job_status_update(job_execution_id, data): raise ex.InvalidDataException( _("Suspending operation can not be performed on an inactive or " "non-existent cluster")) - job_id = conductor.job_execution_get(ctx, job_execution_id).job_id - job_type = conductor.job_get(ctx, job_id).type + job_templates_id = conductor.job_execution_get(ctx, job_id).job_id + job_type = conductor.job_get(ctx, job_templates_id).type engine = j_u.get_plugin(cluster).get_edp_engine(cluster, job_type) if 'info' in data: if data.info['status'] == edp.JOB_ACTION_SUSPEND: