tricircle/tricircle/api/controllers/job.py

433 lines
18 KiB
Python
Executable File

# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import pecan
from pecan import expose
from pecan import rest
import six
from oslo_log import log as logging
from oslo_utils import timeutils
from tricircle.common import constants
import tricircle.common.context as t_context
import tricircle.common.exceptions as t_exc
from tricircle.common.i18n import _
from tricircle.common import policy
from tricircle.common import utils
from tricircle.common import xrpcapi
from tricircle.db import api as db_api
LOG = logging.getLogger(__name__)
class AsyncJobController(rest.RestController):
# with AsyncJobController, admin can create, show, delete and
# redo asynchronous jobs
def __init__(self):
self.xjob_handler = xrpcapi.XJobAPI()
@expose(generic=True, template='json')
def post(self, **kw):
context = t_context.extract_context_from_environ()
job_resource_map = constants.job_resource_map
if not policy.enforce(context, policy.ADMIN_API_JOB_CREATE):
return utils.format_api_error(
403, _("Unauthorized to create a job"))
if 'job' not in kw:
return utils.format_api_error(
400, _("Request body not found"))
job = kw['job']
for field in ('type', 'project_id'):
value = job.get(field)
if value is None:
return utils.format_api_error(
400, _("%(field)s isn't provided in request body") % {
'field': field})
elif len(value.strip()) == 0:
return utils.format_api_error(
400, _("%(field)s can't be empty") % {'field': field})
if job['type'] not in job_resource_map.keys():
return utils.format_api_error(
400, _('There is no such job type: %(job_type)s') % {
'job_type': job['type']})
job_type = job['type']
project_id = job['project_id']
if 'resource' not in job:
return utils.format_api_error(
400, _('Failed to create job, because the resource is not'
' specified'))
# verify that all given resources are exactly needed
request_fields = set(job['resource'].keys())
require_fields = set([resource_id
for resource_type, resource_id in
job_resource_map[job_type]])
missing_fields = require_fields - request_fields
redundant_fields = request_fields - require_fields
if missing_fields:
return utils.format_api_error(
400, _('Some required fields are not specified:'
' %(field)s') % {'field': missing_fields})
if redundant_fields:
return utils.format_api_error(
400, _('Some fields are redundant: %(field)s') % {
'field': redundant_fields})
# validate whether the project id is legal
resource_type_1, resource_id_1 = (
constants.job_primary_resource_map[job_type])
if resource_type_1 is not None:
filter = [{'key': 'project_id', 'comparator': 'eq',
'value': project_id},
{'key': 'resource_type', 'comparator': 'eq',
'value': resource_type_1},
{'key': 'top_id', 'comparator': 'eq',
'value': job['resource'][resource_id_1]}]
routings = db_api.list_resource_routings(context, filter)
if not routings:
msg = (_("%(resource)s %(resource_id)s doesn't belong to the"
" project %(project_id)s") %
{'resource': resource_type_1,
'resource_id': job['resource'][resource_id_1],
'project_id': project_id})
return utils.format_api_error(400, msg)
# if job_type = seg_rule_setup, we should ensure the project id
# is equal to the one from resource.
if job_type in (constants.JT_SEG_RULE_SETUP,
constants.JT_RESOURCE_RECYCLE):
if job['project_id'] != job['resource']['project_id']:
msg = (_("Specified project_id %(project_id_1)s and resource's"
" project_id %(project_id_2)s are different") %
{'project_id_1': job['project_id'],
'project_id_2': job['resource']['project_id']})
return utils.format_api_error(400, msg)
# combine uuid into target resource id
resource_id = '#'.join([job['resource'][resource_id]
for resource_type, resource_id
in job_resource_map[job_type]])
try:
# create a job and put it into execution immediately
self.xjob_handler.invoke_method(context, project_id,
constants.job_handles[job_type],
job_type, resource_id)
except Exception as e:
LOG.exception('Failed to create job: '
'%(exception)s ', {'exception': e})
return utils.format_api_error(
500, _('Failed to create a job'))
new_job = db_api.get_latest_job(context, constants.JS_New, job_type,
resource_id)
return {'job': self._get_more_readable_job(new_job)}
@expose(generic=True, template='json')
def get_one(self, id, **kwargs):
"""the return value may vary according to the value of id
:param id: 1) if id = 'schemas', return job schemas
2) if id = 'detail', return all jobs
3) if id = $job_id, return detailed single job info
:return: return value is decided by id parameter
"""
context = t_context.extract_context_from_environ()
job_resource_map = constants.job_resource_map
if not policy.enforce(context, policy.ADMIN_API_JOB_SCHEMA_LIST):
return utils.format_api_error(
403, _('Unauthorized to show job information'))
if id == 'schemas':
job_schemas = []
for job_type in job_resource_map.keys():
job = {}
resource = []
for resource_type, resource_id in job_resource_map[job_type]:
resource.append(resource_id)
job['resource'] = resource
job['type'] = job_type
job_schemas.append(job)
return {'schemas': job_schemas}
if id == 'detail':
return self.get_all(**kwargs)
try:
job = db_api.get_job(context, id)
return {'job': self._get_more_readable_job(job)}
except Exception:
try:
job = db_api.get_job_from_log(context, id)
return {'job': self._get_more_readable_job(job)}
except t_exc.ResourceNotFound:
return utils.format_api_error(
404, _('Resource not found'))
@expose(generic=True, template='json')
def get_all(self, **kwargs):
"""Get all the jobs. Using filters, only get a subset of jobs.
:param kwargs: job filters
:return: a list of jobs
"""
context = t_context.extract_context_from_environ()
if not policy.enforce(context, policy.ADMIN_API_JOB_LIST):
return utils.format_api_error(
403, _('Unauthorized to show all jobs'))
# check limit and marker, default value -1 means no pagination
_limit = kwargs.pop('limit', -1)
try:
limit = int(_limit)
limit = utils.get_pagination_limit(limit)
except ValueError as e:
LOG.exception('Failed to convert pagination limit to an integer: '
'%(exception)s ', {'exception': e})
msg = (_("Limit should be an integer or a valid literal "
"for int() rather than '%s'") % _limit)
return utils.format_api_error(400, msg)
marker = kwargs.pop('marker', None)
sorts = [('timestamp', 'desc'), ('id', 'desc')]
is_valid_filter, filters = self._get_filters(kwargs)
if not is_valid_filter:
msg = (_('Unsupported filter type: %(filters)s') % {
'filters': ', '.join(
[filter_name for filter_name in filters])
})
return utils.format_api_error(400, msg)
# project ID from client should be equal to the one from
# context, since only the project ID in which the user
# is authorized will be used as the filter.
filters['project_id'] = context.project_id
filters = [{'key': key, 'comparator': 'eq', 'value': value}
for key, value in six.iteritems(filters)]
try:
if marker is not None:
try:
# verify whether the marker is effective
db_api.get_job(context, marker)
jobs = db_api.list_jobs(context, filters,
sorts, limit, marker)
jobs_from_log = []
if len(jobs) < limit:
jobs_from_log = db_api.list_jobs_from_log(
context, filters, sorts, limit - len(jobs), None)
job_collection = jobs + jobs_from_log
except t_exc.ResourceNotFound:
try:
db_api.get_job_from_log(context, marker)
jobs_from_log = db_api.list_jobs_from_log(
context, filters, sorts, limit, marker)
job_collection = jobs_from_log
except t_exc.ResourceNotFound:
msg = (_('Invalid marker: %(marker)s')
% {'marker': marker})
return utils.format_api_error(400, msg)
else:
jobs = db_api.list_jobs(context, filters,
sorts, limit, marker)
jobs_from_log = []
if len(jobs) < limit:
jobs_from_log = db_api.list_jobs_from_log(
context, filters, sorts, limit - len(jobs), None)
job_collection = jobs + jobs_from_log
# add link
links = []
if len(job_collection) >= limit:
marker = job_collection[-1]['id']
base = constants.JOB_PATH
link = "%s?limit=%s&marker=%s" % (base, limit, marker)
links.append({"rel": "next",
"href": link})
result = {'jobs': [self._get_more_readable_job(job)
for job in job_collection]}
if links:
result['jobs_links'] = links
return result
except Exception as e:
LOG.exception('Failed to show all asynchronous jobs: '
'%(exception)s ', {'exception': e})
return utils.format_api_error(
500, _('Failed to show all asynchronous jobs'))
# make the job status and resource id more human readable. Split
# resource id into several member uuid(s) to provide more detailed resource
# information. If job entry is from job table, then remove resource id
# and extra id from job attributes. If job entry is from job log table,
# only remove resource id from job attributes.
def _get_more_readable_job(self, job):
job_resource_map = constants.job_resource_map
if 'status' in job:
job['status'] = constants.job_status_map[job['status']]
else:
job['status'] = constants.job_status_map[constants.JS_Success]
job['resource'] = dict(zip([resource_id
for resource_type, resource_id
in job_resource_map[job['type']]],
job['resource_id'].split('#')))
job.pop('resource_id')
if "extra_id" in job:
job.pop('extra_id')
return job
def _get_filters(self, params):
"""Return a dictionary of query param filters from the request.
:param params: the URI params coming from the wsgi layer
:return: (flag, filters), flag indicates whether the filters are valid,
and the filters denote a list of key-value pairs.
"""
filters = {}
unsupported_filters = {}
for filter_name in params:
if filter_name in constants.JOB_LIST_SUPPORTED_FILTERS:
# map filter name
if filter_name == 'status':
job_status_in_db = self._get_job_status_in_db(
params.get(filter_name))
filters[filter_name] = job_status_in_db
continue
filters[filter_name] = params.get(filter_name)
else:
unsupported_filters[filter_name] = params.get(filter_name)
if unsupported_filters:
return False, unsupported_filters
return True, filters
# map user input job status to job status stored in database
def _get_job_status_in_db(self, job_status):
job_status_map = {
'fail': constants.JS_Fail,
'success': constants.JS_Success,
'running': constants.JS_Running,
'new': constants.JS_New
}
job_status_lower = job_status.lower()
if job_status_lower in job_status_map:
return job_status_map[job_status_lower]
return job_status
@expose(generic=True, template='json')
def delete(self, job_id):
# delete a job from the database. If the job is running, the delete
# operation will fail. In other cases, job will be deleted directly.
context = t_context.extract_context_from_environ()
if not policy.enforce(context, policy.ADMIN_API_JOB_DELETE):
return utils.format_api_error(
403, _('Unauthorized to delete a job'))
try:
db_api.get_job_from_log(context, job_id)
return utils.format_api_error(
400, _('Job %(job_id)s is from job log') % {'job_id': job_id})
except Exception:
try:
job = db_api.get_job(context, job_id)
except t_exc.ResourceNotFound:
return utils.format_api_error(
404, _('Job %(job_id)s not found') % {'job_id': job_id})
try:
# if job status = RUNNING, notify user this new one, delete
# operation fails.
if job['status'] == constants.JS_Running:
return utils.format_api_error(
400, (_('Failed to delete the running job %(job_id)s') %
{"job_id": job_id}))
# if job status = SUCCESS, move the job entry to job log table,
# then delete it from job table.
elif job['status'] == constants.JS_Success:
db_api.finish_job(context, job_id, True, timeutils.utcnow())
pecan.response.status = 200
return {}
db_api.delete_job(context, job_id)
pecan.response.status = 200
return {}
except Exception as e:
LOG.exception('Failed to delete the job: '
'%(exception)s ', {'exception': e})
return utils.format_api_error(
500, _('Failed to delete the job'))
@expose(generic=True, template='json')
def put(self, job_id):
# we use HTTP/HTTPS PUT method to redo a job. Regularly PUT method
# requires a request body, but considering the job redo operation
# doesn't need more information other than job id, we will issue
# this request without a request body.
context = t_context.extract_context_from_environ()
if not policy.enforce(context, policy.ADMIN_API_JOB_REDO):
return utils.format_api_error(
403, _('Unauthorized to redo a job'))
try:
db_api.get_job_from_log(context, job_id)
return utils.format_api_error(
400, _('Job %(job_id)s is from job log') % {'job_id': job_id})
except Exception:
try:
job = db_api.get_job(context, job_id)
except t_exc.ResourceNotFound:
return utils.format_api_error(
404, _('Job %(job_id)s not found') % {'job_id': job_id})
try:
# if status = RUNNING, notify user this new one and then exit
if job['status'] == constants.JS_Running:
return utils.format_api_error(
400, (_("Can't redo job %(job_id)s which is running") %
{'job_id': job['id']}))
# if status = SUCCESS, notify user this new one and then exit
elif job['status'] == constants.JS_Success:
msg = (_("Can't redo job %(job_id)s which had run successfully"
) % {'job_id': job['id']})
return utils.format_api_error(400, msg)
# if job status = FAIL or job status = NEW, redo it immediately
self.xjob_handler.invoke_method(context, job['project_id'],
constants.job_handles[job['type']],
job['type'], job['resource_id'])
except Exception as e:
LOG.exception('Failed to redo the job: '
'%(exception)s ', {'exception': e})
return utils.format_api_error(
500, _('Failed to redo the job'))