[APIv2] Further rename endpoint of jobs & job_executions

Patch [1] already changed the endpoint from jobs to
job_templates and from job_executions to jobs.
However, the filenames are still confusing.
The following changes are included in this patch:
 * rename filenames: jobs.py -> job_templates.py
                     job_executions.py -> jobs.py
 * rename function name in "api/v2":
                     job_list() -> job_template_list()
 * rename parameter in both "api/v11.py" and "api/v2":
                     job_id -> job_templates_id
                     job_execution_id -> job_id
                     ...
I wanted to correct the parameter name in APIv2,
but that led to a validation error, so I also changed
the parameter name in "api/v11.py".

[1]: https://review.openstack.org/#/c/309395/

Partial-Implements: bp v2-api-experimental-impl
Change-Id: Ia7a7ca811c82ddea13c21521a3abd62165ebd513
This commit is contained in:
Shu Yingya 2017-03-02 19:45:01 +08:00
parent e8084f140c
commit e4dc6fd5bd
9 changed files with 299 additions and 297 deletions

View File

@ -46,44 +46,44 @@ def job_executions_list():
return u.render(res=result, name='job_executions')
@rest.get('/job-executions/<job_execution_id>')
@rest.get('/job-executions/<job_id>')
@acl.enforce("data-processing:job-executions:get")
@v.check_exists(api.get_job_execution, id='job_execution_id')
def job_executions(job_execution_id):
return u.to_wrapped_dict(api.get_job_execution, job_execution_id)
@v.check_exists(api.get_job_execution, id='job_id')
def job_executions(job_id):
return u.to_wrapped_dict(api.get_job_execution, job_id)
@rest.get('/job-executions/<job_execution_id>/refresh-status')
@rest.get('/job-executions/<job_id>/refresh-status')
@acl.enforce("data-processing:job-executions:refresh_status")
@v.check_exists(api.get_job_execution, id='job_execution_id')
def job_executions_status(job_execution_id):
return u.to_wrapped_dict(api.get_job_execution_status, job_execution_id)
@v.check_exists(api.get_job_execution, id='job_id')
def job_executions_status(job_id):
return u.to_wrapped_dict(api.get_job_execution_status, job_id)
@rest.get('/job-executions/<job_execution_id>/cancel')
@rest.get('/job-executions/<job_id>/cancel')
@acl.enforce("data-processing:job-executions:cancel")
@v.check_exists(api.get_job_execution, id='job_execution_id')
@v.check_exists(api.get_job_execution, id='job_id')
@v.validate(None, v_j_e.check_job_execution_cancel)
def job_executions_cancel(job_execution_id):
return u.to_wrapped_dict(api.cancel_job_execution, job_execution_id)
def job_executions_cancel(job_id):
return u.to_wrapped_dict(api.cancel_job_execution, job_id)
@rest.patch('/job-executions/<job_execution_id>')
@rest.patch('/job-executions/<job_id>')
@acl.enforce("data-processing:job-executions:modify")
@v.check_exists(api.get_job_execution, id='job_execution_id')
@v.check_exists(api.get_job_execution, id='job_id')
@v.validate(v_j_e_schema.JOB_EXEC_UPDATE_SCHEMA,
v_j_e.check_job_execution_update,
v_j_e.check_job_status_update)
def job_executions_update(job_execution_id, data):
return u.to_wrapped_dict(api.update_job_execution, job_execution_id, data)
def job_executions_update(job_id, data):
return u.to_wrapped_dict(api.update_job_execution, job_id, data)
@rest.delete('/job-executions/<job_execution_id>')
@rest.delete('/job-executions/<job_id>')
@acl.enforce("data-processing:job-executions:delete")
@v.check_exists(api.get_job_execution, id='job_execution_id')
@v.check_exists(api.get_job_execution, id='job_id')
@v.validate(None, v_j_e.check_job_execution_delete)
def job_executions_delete(job_execution_id):
api.delete_job_execution(job_execution_id)
def job_executions_delete(job_id):
api.delete_job_execution(job_id)
return u.render()
@ -151,35 +151,36 @@ def job_create(data):
return u.render(api.create_job(data).to_wrapped_dict())
@rest.get('/jobs/<job_id>')
@rest.get('/jobs/<job_templates_id>')
@acl.enforce("data-processing:jobs:get")
@v.check_exists(api.get_job, id='job_id')
def job_get(job_id):
return u.to_wrapped_dict(api.get_job, job_id)
@v.check_exists(api.get_job, id='job_templates_id')
def job_get(job_templates_id):
return u.to_wrapped_dict(api.get_job, job_templates_id)
@rest.patch('/jobs/<job_id>')
@rest.patch('/jobs/<job_templates_id>')
@acl.enforce("data-processing:jobs:modify")
@v.check_exists(api.get_job, id='job_id')
@v.check_exists(api.get_job, id='job_templates_id')
@v.validate(v_j_schema.JOB_UPDATE_SCHEMA)
def job_update(job_id, data):
return u.to_wrapped_dict(api.update_job, job_id, data)
def job_update(job_templates_id, data):
return u.to_wrapped_dict(api.update_job, job_templates_id, data)
@rest.delete('/jobs/<job_id>')
@rest.delete('/jobs/<job_templates_id>')
@acl.enforce("data-processing:jobs:delete")
@v.check_exists(api.get_job, id='job_id')
def job_delete(job_id):
api.delete_job(job_id)
@v.check_exists(api.get_job, id='job_templates_id')
def job_delete(job_templates_id):
api.delete_job(job_templates_id)
return u.render()
@rest.post('/jobs/<job_id>/execute')
@rest.post('/jobs/<job_templates_id>/execute')
@acl.enforce("data-processing:jobs:execute")
@v.check_exists(api.get_job, id='job_id')
@v.check_exists(api.get_job, id='job_templates_id')
@v.validate(v_j_e_schema.JOB_EXEC_SCHEMA, v_j_e.check_job_execution)
def job_execute(job_id, data):
return u.render(job_execution=api.execute_job(job_id, data).to_dict())
def job_execute(job_templates_id, data):
return u.render(job_execution=api.execute_job(
job_templates_id, data).to_dict())
@rest.get('/jobs/config-hints/<job_type>')

View File

@ -37,7 +37,7 @@ from sahara.api.v2 import clusters
from sahara.api.v2 import data_sources
from sahara.api.v2 import images
from sahara.api.v2 import job_binaries
from sahara.api.v2 import job_executions
from sahara.api.v2 import job_templates
from sahara.api.v2 import job_types
from sahara.api.v2 import jobs
from sahara.api.v2 import node_group_templates
@ -59,8 +59,8 @@ def register_blueprints(app, url_prefix):
app.register_blueprint(data_sources.rest, url_prefix=url_prefix)
app.register_blueprint(images.rest, url_prefix=url_prefix)
app.register_blueprint(job_binaries.rest, url_prefix=url_prefix)
app.register_blueprint(job_executions.rest, url_prefix=url_prefix)
app.register_blueprint(job_types.rest, url_prefix=url_prefix)
app.register_blueprint(jobs.rest, url_prefix=url_prefix)
app.register_blueprint(job_types.rest, url_prefix=url_prefix)
app.register_blueprint(job_templates.rest, url_prefix=url_prefix)
app.register_blueprint(node_group_templates.rest, url_prefix=url_prefix)
app.register_blueprint(plugins.rest, url_prefix=url_prefix)

View File

@ -1,77 +0,0 @@
# Copyright (c) 2016 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from sahara.api import acl
from sahara.service.api.v2 import job_executions as api
from sahara.service import validation as v
from sahara.service.validations.edp import job_execution as v_j_e
from sahara.service.validations.edp import job_execution_schema as v_j_e_schema
import sahara.utils.api as u
rest = u.RestV2('job-executions', __name__)
@rest.get('/jobs')
@acl.enforce("data-processing:job-executions:get_all")
@v.check_exists(api.get_job_execution, 'marker')
@v.validate(None, v.validate_pagination_limit,
v.validate_sorting_job_executions)
def job_executions_list():
result = api.job_execution_list(**u.get_request_args().to_dict())
return u.render(res=result, name='jobs')
@rest.get('/jobs/<job_execution_id>')
@acl.enforce("data-processing:job-executions:get")
@v.check_exists(api.get_job_execution, id='job_execution_id')
def job_executions(job_execution_id):
return u.to_wrapped_dict(api.get_job_execution, job_execution_id)
@rest.get('/jobs/<job_execution_id>/refresh-status')
@acl.enforce("data-processing:job-executions:refresh_status")
@v.check_exists(api.get_job_execution, id='job_execution_id')
def job_executions_status(job_execution_id):
return u.to_wrapped_dict(
api.get_job_execution_status, job_execution_id)
@rest.get('/jobs/<job_execution_id>/cancel')
@acl.enforce("data-processing:job-executions:cancel")
@v.check_exists(api.get_job_execution, id='job_execution_id')
@v.validate(None, v_j_e.check_job_execution_cancel)
def job_executions_cancel(job_execution_id):
return u.to_wrapped_dict(api.cancel_job_execution, job_execution_id)
@rest.patch('/jobs/<job_execution_id>')
@acl.enforce("data-processing:job-executions:modify")
@v.check_exists(api.get_job_execution, id='job_execution_id')
@v.validate(
v_j_e_schema.JOB_EXEC_UPDATE_SCHEMA, v_j_e.check_job_execution_update)
def job_executions_update(job_execution_id, data):
return u.to_wrapped_dict(
api.update_job_execution, job_execution_id, data)
@rest.delete('/jobs/<job_execution_id>')
@acl.enforce("data-processing:job-executions:delete")
@v.check_exists(api.get_job_execution, id='job_execution_id')
@v.validate(None, v_j_e.check_job_execution_delete)
def job_executions_delete(job_execution_id):
api.delete_job_execution(job_execution_id)
return u.render()

View File

@ -0,0 +1,83 @@
# Copyright (c) 2016 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from sahara.api import acl
from sahara.service.api.v2 import job_templates as api
from sahara.service.api.v2 import jobs as j_e_api
from sahara.service import validation as v
from sahara.service.validations.edp import job as v_j
from sahara.service.validations.edp import job_execution as v_j_e
from sahara.service.validations.edp import job_execution_schema as v_j_e_schema
from sahara.service.validations.edp import job_schema as v_j_schema
import sahara.utils.api as u
rest = u.RestV2('job-templates', __name__)
@rest.get('/job-templates')
@acl.enforce("data-processing:job-templates:get_all")
@v.check_exists(api.get_job_templates, 'marker')
@v.validate(None, v.validate_pagination_limit,
            v.validate_sorting_jobs)
def job_templates_list():
    """List all job templates, applying pagination/sorting request args."""
    result = api.get_job_templates(**u.get_request_args().to_dict())
    return u.render(res=result, name='job_templates')
@rest.post('/job-templates')
@acl.enforce("data-processing:job-templates:create")
@v.validate(v_j_schema.JOB_SCHEMA, v_j.check_mains_libs, v_j.check_interface)
def job_templates_create(data):
    """Create a job template from the schema-validated request body."""
    return u.render(api.create_job_template(data).to_wrapped_dict())
@rest.get('/job-templates/<job_templates_id>')
@acl.enforce("data-processing:job-templates:get")
# NOTE(review): the existence check is done via the list-style call
# get_job_templates while the handler fetches via get_job_template —
# confirm this asymmetry is intentional.
@v.check_exists(api.get_job_templates, id='job_templates_id')
def job_templates_get(job_templates_id):
    """Return a single job template by id."""
    return u.to_wrapped_dict(api.get_job_template, job_templates_id)
@rest.patch('/job-templates/<job_templates_id>')
# NOTE(review): the policy name still uses the old "jobs" prefix —
# presumably should be "data-processing:job-templates:modify"; verify.
@acl.enforce("data-processing:jobs:modify")
@v.check_exists(api.get_job_templates, id='job_templates_id')
@v.validate(v_j_schema.JOB_UPDATE_SCHEMA)
def job_templates_update(job_templates_id, data):
    """Partially update the job template with the given id."""
    return u.to_wrapped_dict(api.update_job_template, job_templates_id, data)
@rest.delete('/job-templates/<job_templates_id>')
# NOTE(review): policy name still uses the old "jobs" prefix — verify.
@acl.enforce("data-processing:jobs:delete")
@v.check_exists(api.get_job_templates, id='job_templates_id')
def job_templates_delete(job_templates_id):
    """Delete the job template with the given id."""
    api.delete_job_template(job_templates_id)
    return u.render()
@rest.post('/job-templates/<job_templates_id>/execute')
# NOTE(review): policy name still uses the old "jobs" prefix — verify.
@acl.enforce("data-processing:jobs:execute")
@v.check_exists(api.get_job_templates, id='job_templates_id')
@v.validate(v_j_e_schema.JOB_EXEC_SCHEMA, v_j_e.check_job_execution)
def job_templates_execute(job_templates_id, data):
    """Launch a job execution from this template via the jobs service API."""
    return u.render(job_execution=j_e_api.execute_job(
        job_templates_id, data).to_dict())
@rest.get('/job-templates/config-hints/<job_type>')
# NOTE(review): policy name still uses the old "jobs" prefix — verify.
@acl.enforce("data-processing:jobs:get_config_hints")
@v.check_exists(api.get_job_config_hints, job_type='job_type')
def job_config_hints_get(job_type):
    """Return configuration hints for the given EDP job type."""
    return u.render(api.get_job_config_hints(job_type))

View File

@ -13,70 +13,65 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from sahara.api import acl
from sahara.service.api.v2 import job_executions as j_e_api
from sahara.service.api.v2 import jobs as api
from sahara.service import validation as v
from sahara.service.validations.edp import job as v_j
from sahara.service.validations.edp import job_execution as v_j_e
from sahara.service.validations.edp import job_execution_schema as v_j_e_schema
from sahara.service.validations.edp import job_schema as v_j_schema
import sahara.utils.api as u
rest = u.RestV2('jobs', __name__)
@rest.get('/job-templates')
@acl.enforce("data-processing:jobs:get_all")
@v.check_exists(api.get_job, 'marker')
@rest.get('/jobs')
@acl.enforce("data-processing:job-executions:get_all")
@v.check_exists(api.get_job_execution, 'marker')
@v.validate(None, v.validate_pagination_limit,
v.validate_sorting_jobs)
def job_list():
result = api.get_jobs(**u.get_request_args().to_dict())
return u.render(res=result, name='job_templates')
v.validate_sorting_job_executions)
def job_executions_list():
result = api.job_execution_list(**u.get_request_args().to_dict())
return u.render(res=result, name='jobs')
@rest.post('/job-templates')
@acl.enforce("data-processing:jobs:create")
@v.validate(v_j_schema.JOB_SCHEMA, v_j.check_mains_libs, v_j.check_interface)
def job_create(data):
return u.render(api.create_job(data).to_wrapped_dict())
@rest.get('/jobs/<job_id>')
@acl.enforce("data-processing:job-executions:get")
@v.check_exists(api.get_job_execution, id='job_id')
def job_executions(job_id):
return u.to_wrapped_dict(api.get_job_execution, job_id)
@rest.get('/job-templates/<job_id>')
@acl.enforce("data-processing:jobs:get")
@v.check_exists(api.get_job, id='job_id')
def job_get(job_id):
return u.to_wrapped_dict(api.get_job, job_id)
@rest.get('/jobs/<job_id>/refresh-status')
@acl.enforce("data-processing:job-executions:refresh_status")
@v.check_exists(api.get_job_execution, id='job_id')
def job_executions_status(job_id):
return u.to_wrapped_dict(
api.get_job_execution_status, job_id)
@rest.patch('/job-templates/<job_id>')
@acl.enforce("data-processing:jobs:modify")
@v.check_exists(api.get_job, id='job_id')
@v.validate(v_j_schema.JOB_UPDATE_SCHEMA)
def job_update(job_id, data):
return u.to_wrapped_dict(api.update_job, job_id, data)
@rest.get('/jobs/<job_id>/cancel')
@acl.enforce("data-processing:job-executions:cancel")
@v.check_exists(api.get_job_execution, id='job_id')
@v.validate(None, v_j_e.check_job_execution_cancel)
def job_executions_cancel(job_id):
return u.to_wrapped_dict(api.cancel_job_execution, job_id)
@rest.delete('/job-templates/<job_id>')
@acl.enforce("data-processing:jobs:delete")
@v.check_exists(api.get_job, id='job_id')
def job_delete(job_id):
api.delete_job(job_id)
@rest.patch('/jobs/<job_id>')
@acl.enforce("data-processing:job-executions:modify")
@v.check_exists(api.get_job_execution, id='job_id')
@v.validate(
v_j_e_schema.JOB_EXEC_UPDATE_SCHEMA, v_j_e.check_job_execution_update)
def job_executions_update(job_id, data):
return u.to_wrapped_dict(
api.update_job_execution, job_id, data)
@rest.delete('/jobs/<job_id>')
@acl.enforce("data-processing:job-executions:delete")
@v.check_exists(api.get_job_execution, id='job_id')
@v.validate(None, v_j_e.check_job_execution_delete)
def job_executions_delete(job_id):
api.delete_job_execution(job_id)
return u.render()
@rest.post('/job-templates/<job_id>/execute')
@acl.enforce("data-processing:jobs:execute")
@v.check_exists(api.get_job, id='job_id')
@v.validate(v_j_e_schema.JOB_EXEC_SCHEMA, v_j_e.check_job_execution)
def job_execute(job_id, data):
return u.render(job_execution=j_e_api.execute_job(job_id, data).to_dict())
@rest.get('/job-templates/config-hints/<job_type>')
@acl.enforce("data-processing:jobs:get_config_hints")
@v.check_exists(api.get_job_config_hints, job_type='job_type')
def job_config_hints_get(job_type):
return u.render(api.get_job_config_hints(job_type))

View File

@ -1,109 +0,0 @@
# Copyright (c) 2016 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_log import log as logging
from sahara import conductor as c
from sahara import context
from sahara import exceptions as ex
from sahara.i18n import _LE
from sahara.service import api
from sahara.service.edp import job_manager as manager
from sahara.utils import edp
from sahara.utils import proxy as p
conductor = c.API
LOG = logging.getLogger(__name__)
def execute_job(job_id, data):
# Elements common to all job types
cluster_id = data['cluster_id']
configs = data.get('job_configs', {})
interface = data.get('interface', {})
# Not in Java job types but present for all others
input_id = data.get('input_id', None)
output_id = data.get('output_id', None)
# Since we will use a unified class in the database, we pass
# a superset for all job types
# example configs['start'] = '2015-05-12T08:55Z' frequency = 5 mins
# the job will starts from 2015-05-12T08:55Z, runs every 5 mins
job_execution_info = data.get('job_execution_info', {})
configs['job_execution_info'] = job_execution_info
job_ex_dict = {'input_id': input_id, 'output_id': output_id,
'job_id': job_id, 'cluster_id': cluster_id,
'info': {'status': edp.JOB_STATUS_PENDING},
'job_configs': configs, 'extra': {},
'interface': interface}
job_execution = conductor.job_execution_create(context.ctx(), job_ex_dict)
context.set_current_job_execution_id(job_execution.id)
# check to use proxy user
if p.job_execution_requires_proxy_user(job_execution):
try:
p.create_proxy_user_for_job_execution(job_execution)
except ex.SaharaException as e:
LOG.error(_LE("Can't run job execution. "
"(Reasons: {reason})").format(reason=e))
conductor.job_execution_destroy(context.ctx(), job_execution)
raise e
api.OPS.run_edp_job(job_execution.id)
return job_execution
def get_job_execution_status(id):
return manager.get_job_status(id)
def job_execution_list(**kwargs):
return conductor.job_execution_get_all(context.ctx(),
regex_search=True, **kwargs)
def get_job_execution(id):
return conductor.job_execution_get(context.ctx(), id)
def cancel_job_execution(id):
context.set_current_job_execution_id(id)
job_execution = conductor.job_execution_get(context.ctx(), id)
api.OPS.cancel_job_execution(id)
return job_execution
def update_job_execution(id, values):
_update_status(values.pop("info", None))
return conductor.job_execution_update(context.ctx(), id, values)
def _update_status(info):
if info:
status = info.get("status", None)
if status == edp.JOB_ACTION_SUSPEND:
api.OPS.job_execution_suspend(id)
def delete_job_execution(id):
context.set_current_job_execution_id(id)
api.OPS.delete_job_execution(id)

View File

@ -0,0 +1,45 @@
# Copyright (c) 2016 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from sahara import conductor as c
from sahara import context
from sahara.service.edp import job_manager as manager
conductor = c.API
def get_job_templates(**kwargs):
    """List job templates; kwargs are conductor filters (regex search on)."""
    return conductor.job_get_all(context.ctx(), regex_search=True, **kwargs)
def get_job_template(id):
    """Fetch a single job template by id."""
    return conductor.job_get(context.ctx(), id)
def create_job_template(values):
    """Create a job template from a dict of field values."""
    return conductor.job_create(context.ctx(), values)
def update_job_template(id, values):
    """Apply the given field values to an existing job template."""
    return conductor.job_update(context.ctx(), id, values)
def delete_job_template(job_id):
    """Destroy the job template with the given id."""
    return conductor.job_destroy(context.ctx(), job_id)
def get_job_config_hints(job_type):
    """Delegate to the EDP job manager for per-job-type config hints."""
    return manager.get_job_config_hints(job_type)

View File

@ -13,33 +13,97 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_log import log as logging
from sahara import conductor as c
from sahara import context
from sahara import exceptions as ex
from sahara.i18n import _LE
from sahara.service import api
from sahara.service.edp import job_manager as manager
from sahara.utils import edp
from sahara.utils import proxy as p
conductor = c.API
LOG = logging.getLogger(__name__)
def get_jobs(**kwargs):
return conductor.job_get_all(context.ctx(), regex_search=True, **kwargs)
def execute_job(job_id, data):
# Elements common to all job types
cluster_id = data['cluster_id']
configs = data.get('job_configs', {})
interface = data.get('interface', {})
# Not in Java job types but present for all others
input_id = data.get('input_id', None)
output_id = data.get('output_id', None)
# Since we will use a unified class in the database, we pass
# a superset for all job types
# example configs['start'] = '2015-05-12T08:55Z' frequency = 5 mins
# the job will starts from 2015-05-12T08:55Z, runs every 5 mins
job_execution_info = data.get('job_execution_info', {})
configs['job_execution_info'] = job_execution_info
job_ex_dict = {'input_id': input_id, 'output_id': output_id,
'job_id': job_id, 'cluster_id': cluster_id,
'info': {'status': edp.JOB_STATUS_PENDING},
'job_configs': configs, 'extra': {},
'interface': interface}
job_execution = conductor.job_execution_create(context.ctx(), job_ex_dict)
context.set_current_job_execution_id(job_execution.id)
# check to use proxy user
if p.job_execution_requires_proxy_user(job_execution):
try:
p.create_proxy_user_for_job_execution(job_execution)
except ex.SaharaException as e:
LOG.error(_LE("Can't run job execution. "
"(Reasons: {reason})").format(reason=e))
conductor.job_execution_destroy(context.ctx(), job_execution)
raise e
api.OPS.run_edp_job(job_execution.id)
return job_execution
def get_job(id):
return conductor.job_get(context.ctx(), id)
def get_job_execution_status(id):
return manager.get_job_status(id)
def create_job(values):
return conductor.job_create(context.ctx(), values)
def job_execution_list(**kwargs):
return conductor.job_execution_get_all(context.ctx(),
regex_search=True, **kwargs)
def update_job(id, values):
return conductor.job_update(context.ctx(), id, values)
def get_job_execution(id):
return conductor.job_execution_get(context.ctx(), id)
def delete_job(job_id):
return conductor.job_destroy(context.ctx(), job_id)
def cancel_job_execution(id):
context.set_current_job_execution_id(id)
job_execution = conductor.job_execution_get(context.ctx(), id)
api.OPS.cancel_job_execution(id)
return job_execution
def get_job_config_hints(job_type):
return manager.get_job_config_hints(job_type)
def update_job_execution(id, values):
_update_status(values.pop("info", None))
return conductor.job_execution_update(context.ctx(), id, values)
def _update_status(info):
if info:
status = info.get("status", None)
if status == edp.JOB_ACTION_SUSPEND:
api.OPS.job_execution_suspend(id)
def delete_job_execution(id):
context.set_current_job_execution_id(id)
api.OPS.delete_job_execution(id)

View File

@ -100,7 +100,7 @@ def check_scheduled_job_execution_info(job_execution_info):
"Job start time should be later than now"))
def check_job_execution(data, job_id):
def check_job_execution(data, job_templates_id):
ctx = context.ctx()
job_execution_info = data.get('job_execution_info', {})
@ -110,7 +110,7 @@ def check_job_execution(data, job_id):
_("Cluster with id '%s' doesn't exist") % data['cluster_id'])
val_base.check_plugin_labels(cluster.plugin_name, cluster.hadoop_version)
job = conductor.job_get(ctx, job_id)
job = conductor.job_get(ctx, job_templates_id)
plugin = plugin_base.PLUGINS.get_plugin(cluster.plugin_name)
edp_engine = plugin.get_edp_engine(cluster, job.type)
@ -140,41 +140,41 @@ def check_data_sources(data, job):
b.check_data_sources_are_different(data['input_id'], data['output_id'])
def check_job_execution_cancel(job_execution_id, **kwargs):
def check_job_execution_cancel(job_id, **kwargs):
ctx = context.current()
je = conductor.job_execution_get(ctx, job_execution_id)
je = conductor.job_execution_get(ctx, job_id)
if je.tenant_id != ctx.tenant_id:
raise ex.CancelingFailed(
_("Job execution with id '%s' cannot be canceled "
"because it wasn't created in this tenant")
% job_execution_id)
% job_id)
if je.is_protected:
raise ex.CancelingFailed(
_("Job Execution with id '%s' cannot be canceled "
"because it's marked as protected") % job_execution_id)
"because it's marked as protected") % job_id)
def check_job_execution_delete(job_execution_id, **kwargs):
def check_job_execution_delete(job_id, **kwargs):
ctx = context.current()
je = conductor.job_execution_get(ctx, job_execution_id)
je = conductor.job_execution_get(ctx, job_id)
acl.check_tenant_for_delete(ctx, je)
acl.check_protected_from_delete(je)
def check_job_execution_update(job_execution_id, data, **kwargs):
def check_job_execution_update(job_id, data, **kwargs):
ctx = context.current()
je = conductor.job_execution_get(ctx, job_execution_id)
je = conductor.job_execution_get(ctx, job_id)
acl.check_tenant_for_update(ctx, je)
acl.check_protected_from_update(je, data)
def check_job_status_update(job_execution_id, data):
def check_job_status_update(job_id, data):
ctx = context.ctx()
job_execution = conductor.job_execution_get(ctx, job_execution_id)
job_execution = conductor.job_execution_get(ctx, job_id)
# check we are updating status
if 'info' in data:
if len(data) != 1:
@ -184,8 +184,8 @@ def check_job_status_update(job_execution_id, data):
raise ex.InvalidDataException(
_("Suspending operation can not be performed on an inactive or "
"non-existent cluster"))
job_id = conductor.job_execution_get(ctx, job_execution_id).job_id
job_type = conductor.job_get(ctx, job_id).type
job_templates_id = conductor.job_execution_get(ctx, job_id).job_id
job_type = conductor.job_get(ctx, job_templates_id).type
engine = j_u.get_plugin(cluster).get_edp_engine(cluster, job_type)
if 'info' in data:
if data.info['status'] == edp.JOB_ACTION_SUSPEND: