Support pagination for job list operations

1. What is the problem
The job list operations retrieve all the items in
the database, which consumes too much memory and takes a long time
to respond when the result set is considerably large.

2. What is the solution for the problem
To reduce load on the service, list operations will return a maximum
number of items at a time by pagination. To navigate the collection,
the parameters limit and marker can be set in the URI. For example:

/v1.0/jobs?limit=2000&marker=500

The marker parameter is the ID of the last item in the previous list.
The limit parameter sets the page size. These parameters are optional.
If the client requests a limit beyond the maximum limit configured by
the deployment, the server returns the maximum limit number of items.
Pagination and filtering can work together for job list operations.

3. What features need to be implemented in the Tricircle to
realize the solution
Add pagination feature for job list operations.

Change-Id: I95168a547ac88d8a680102acaac1bdda6dde0733
This commit is contained in:
Fangming Liu 2017-07-05 09:59:58 +08:00
parent 300cb93579
commit e690fca534
5 changed files with 237 additions and 61 deletions

88
tricircle/api/controllers/job.py Normal file → Executable file
View File

@ -200,23 +200,81 @@ class AsyncJobController(rest.RestController):
return utils.format_api_error(
403, _('Unauthorized to show all jobs'))
is_valid_filter, filters = self._get_filters(kwargs)
if not is_valid_filter:
msg = (_('Unsupported filter type: %(filters)s') % {
'filters': ', '.join([filter_name for filter_name in filters])
})
return utils.format_api_error(400, msg)
filters = [{'key': key,
'comparator': 'eq',
'value': value} for key, value in six.iteritems(filters)]
# check limit and marker, default value -1 means no pagination
_limit = kwargs.pop('limit', -1)
try:
jobs_in_job_table = db_api.list_jobs(context, filters)
jobs_in_job_log_table = db_api.list_jobs_from_log(context, filters)
jobs = jobs_in_job_table + jobs_in_job_log_table
return {'jobs': [self._get_more_readable_job(job) for job in jobs]}
limit = int(_limit)
limit = utils.get_pagination_limit(limit)
except ValueError as e:
LOG.exception('Failed to convert pagination limit to an integer: '
'%(exception)s ', {'exception': e})
msg = (_("Limit should be an integer or a valid literal "
"for int() rather than '%s'") % _limit)
return utils.format_api_error(400, msg)
marker = kwargs.pop('marker', None)
sorts = [('id', 'asc')]
if kwargs:
is_valid_filter, filters = self._get_filters(kwargs)
if not is_valid_filter:
msg = (_('Unsupported filter type: %(filters)s') % {
'filters': ', '.join(
[filter_name for filter_name in filters])
})
return utils.format_api_error(400, msg)
filters = [{'key': key, 'comparator': 'eq', 'value': value}
for key, value in six.iteritems(filters)]
else:
filters = None
try:
if marker is not None:
try:
# verify whether the marker is effective
db_api.get_job(context, marker)
jobs = db_api.list_jobs(context, filters,
sorts, limit, marker)
jobs_from_log = []
if len(jobs) < limit:
jobs_from_log = db_api.list_jobs_from_log(
context, filters, sorts, limit - len(jobs), None)
job_collection = jobs + jobs_from_log
except t_exc.ResourceNotFound:
try:
db_api.get_job_from_log(context, marker)
jobs_from_log = db_api.list_jobs_from_log(
context, filters, sorts, limit, marker)
job_collection = jobs_from_log
except t_exc.ResourceNotFound:
msg = (_('Invalid marker: %(marker)s')
% {'marker': marker})
return utils.format_api_error(400, msg)
else:
jobs = db_api.list_jobs(context, filters,
sorts, limit, marker)
jobs_from_log = []
if len(jobs) < limit:
jobs_from_log = db_api.list_jobs_from_log(
context, filters, sorts, limit - len(jobs), None)
job_collection = jobs + jobs_from_log
# add link
links = []
if len(job_collection) >= limit:
marker = job_collection[-1]['id']
base = constants.JOB_PATH
link = "%s?limit=%s&marker=%s" % (base, limit, marker)
links.append({"rel": "next",
"href": link})
result = {'jobs': [self._get_more_readable_job(job)
for job in job_collection]}
if links:
result['jobs_links'] = links
return result
except Exception as e:
LOG.exception('Failed to show all asynchronous jobs: '
'%(exception)s ', {'exception': e})

View File

@ -190,5 +190,6 @@ job_primary_resource_map = {
JT_RESOURCE_RECYCLE: (None, "project_id")
}
# resource routing request path
# admin API request path
ROUTING_PATH = '/v1.0/routings'
JOB_PATH = '/v1.0/jobs'

View File

@ -428,33 +428,6 @@ def get_latest_failed_or_new_jobs(context):
return failed_jobs, new_jobs
def list_jobs(context, filters=None, sorts=None):
    """Return every job record in the job table.

    :param context: request context holding the DB session
    :param filters: optional list of filter dicts passed to the query
    :param sorts: optional list of (key, direction) sort tuples
    :return: list of AsyncJob records matching the filters
    """
    with context.session.begin():
        # Query the whole job table; empty filter/sort lists mean
        # "no restriction" to the query helper.
        return core.query_resource(context, models.AsyncJob,
                                   filters or [], sorts or [])
def list_jobs_from_log(context, filters=None, sorts=None):
    """List jobs recorded in the job log table.

    The job log table only stores successful jobs, so this method merely
    returns successful jobs: a 'status' filter for success is stripped
    before querying (log entries carry no status attribute), while any
    other 'status' value short-circuits to an empty result.

    :param context: request context holding the DB session
    :param filters: optional list of filter dicts; mutated in place when
        a success 'status' filter is removed (preserved behavior)
    :param sorts: optional list of (key, direction) sort tuples
    :return: list of AsyncJobLog records, or [] for non-success status
    """
    with context.session.begin():
        if filters is not None:
            # Iterate over a copy: removing an element from the list
            # being iterated would silently skip the following element.
            for job_filter in list(filters):
                if job_filter.get('key') == 'status':
                    job_status = job_filter['value']
                    # Job entries in the log table have no status
                    # attribute; they are successful by definition.
                    if job_status == constants.JS_Success:
                        # Drop the redundant filter in place so the
                        # query below does not reference a missing
                        # column.
                        filters.remove(job_filter)
                    else:
                        return []
        jobs_in_log = core.query_resource(
            context, models.AsyncJobLog, filters or [], sorts or [])
        return jobs_in_log
def get_job(context, job_id):
    """Fetch a single asynchronous job record by its ID.

    :param context: request context holding the DB session
    :param job_id: primary key of the job to look up
    :return: the matching AsyncJob record
    """
    with context.session.begin():
        job = core.get_resource(context, models.AsyncJob, job_id)
        return job
@ -470,6 +443,37 @@ def delete_job(context, job_id):
return core.delete_resource(context, models.AsyncJob, job_id)
def list_jobs(context, filters=None, sorts=None, limit=None, marker=None):
    """Return a page of job records from the job table.

    :param context: request context holding the DB session
    :param filters: optional list of filter dicts passed to the query
    :param sorts: optional list of (key, direction) sort tuples
    :param limit: maximum number of records to return
    :param marker: ID of the last item of the previous page, or None
    :return: list of AsyncJob records for the requested page
    """
    # paginate_query expects the marker as a model instance, not an ID.
    marker_record = models.AsyncJob(id=marker) if marker else None
    with context.session.begin():
        return core.paginate_query(context, models.AsyncJob, limit,
                                   marker_record,
                                   filters or [], sorts or [])
def list_jobs_from_log(context, filters=None, sorts=None,
                       limit=None, marker=None):
    """Return a page of jobs recorded in the job log table.

    The job log table only stores successful jobs: a 'status' filter for
    success is stripped before querying (log entries carry no status
    attribute), while any other 'status' value short-circuits to an
    empty result.

    :param context: request context holding the DB session
    :param filters: optional list of filter dicts; mutated in place when
        a success 'status' filter is removed (preserved behavior)
    :param sorts: optional list of (key, direction) sort tuples
    :param limit: maximum number of records to return
    :param marker: ID of the last item of the previous page, or None
    :return: list of AsyncJobLog records, or [] for non-success status
    """
    with context.session.begin():
        filter_is_success = True
        if filters:
            # Iterate over a copy: removing an element from the list
            # being iterated would silently skip the following element.
            for job_filter in list(filters):
                if job_filter.get('key') == 'status':
                    job_status = job_filter['value']
                    # Job entries in the log table have no status
                    # attribute; they are successful by definition.
                    if job_status == constants.JS_Success:
                        filters.remove(job_filter)
                    else:
                        filter_is_success = False
                        break
        if filter_is_success:
            # paginate_query expects the marker as a model instance.
            return core.paginate_query(
                context, models.AsyncJobLog, limit,
                models.AsyncJobLog(id=marker) if marker else None,
                filters or [], sorts or [])
        return []
def get_latest_job(context, status, _type, resource_id):
jobs = core.query_resource(
context, models.AsyncJob,

27
tricircle/tests/functional/api/controllers/test_job.py Normal file → Executable file
View File

@ -263,24 +263,14 @@ class TestAsyncJobController(API_FunctionalTest):
for job_type in self.all_job_types:
job = self._prepare_job_element(job_type)
jobs = [
{
"job": job,
"expected_error": 200
},
]
job = {"job": job, "expected_error": 200}
self._test_and_check(jobs)
back_jobid = self._test_and_obtain_id(job)
response = self.app.get('/v1.0/jobs')
return_job = response.json
all_job_ids[index] = return_job['jobs'][index]['id']
all_job_project_ids[job_type] = (
return_job['jobs'][index]['project_id'])
all_job_ids[index] = back_jobid
all_job_project_ids[job_type] = job['job']['project_id']
index = index + 1
service_uris = ['jobs', 'jobs/detail']
amount_of_all_jobs = len(self.all_job_types)
# with no filters all jobs are returned
@ -729,6 +719,15 @@ class TestAsyncJobController(API_FunctionalTest):
expect_errors=True)
self.assertEqual(response.status_int, test_job['expected_error'])
def _test_and_obtain_id(self, job):
    """POST the given job and return the ID the server assigned to it.

    Asserts that the response status matches job['expected_error'].
    """
    resp = self.app.post_json('/v1.0/jobs',
                              {'job': job['job']},
                              expect_errors=True)
    self.assertEqual(resp.status_int, job['expected_error'])
    return resp.json['job']['id']
def _prepare_job_element(self, job_type):
# in order to create a job, we need three elements: job type,
# job resource and project id.

122
tricircle/tests/unit/api/controllers/test_job.py Normal file → Executable file
View File

@ -13,13 +13,15 @@
import copy
import mock
from mock import patch
from oslo_config import cfg
from oslo_utils import timeutils
from oslo_utils import uuidutils
import re
from six.moves import xrange
import unittest
import pecan
from tricircle.api import app
from tricircle.api.controllers import job
from tricircle.common import constants
from tricircle.common import context
@ -28,6 +30,7 @@ from tricircle.common import xrpcapi
from tricircle.db import api as db_api
from tricircle.db import core
from tricircle.db import models
from tricircle.tests import base
class FakeRPCAPI(xrpcapi.XJobAPI):
@ -47,8 +50,12 @@ class FakeResponse(object):
return super(FakeResponse, cls).__new__(cls)
class AsyncJobControllerTest(unittest.TestCase):
class AsyncJobControllerTest(base.TestCase):
def setUp(self):
    """Prepare a fresh config, in-memory schema and controller per test."""
    super(AsyncJobControllerTest, self).setUp()
    # Start from a clean config so options registered by earlier tests
    # do not leak into this one, then register the API options.
    cfg.CONF.clear()
    cfg.CONF.register_opts(app.common_opts)
    # Build the DB schema from the models for this test run.
    core.initialize()
    core.ModelBase.metadata.create_all(core.get_engine())
    self.controller = FakeAsyncJobController()
@ -177,7 +184,7 @@ class AsyncJobControllerTest(unittest.TestCase):
@patch.object(pecan, 'response', new=FakeResponse)
@patch.object(context, 'extract_context_from_environ')
def test_get_one(self, mock_context):
def test_get_one_and_get_all(self, mock_context):
mock_context.return_value = self.context
# failure case, only admin can list the job's info
@ -251,15 +258,24 @@ class AsyncJobControllerTest(unittest.TestCase):
job_status_filter_1 = {'status': 'success'}
jobs_3 = self.controller.get_one("detail", **job_status_filter_1)
self.assertEqual(amount_of_succ_jobs, len(jobs_3['jobs']))
# set marker in job log
res = self.controller.get_all(marker=jobs_3['jobs'][0]['id'],
limit=amount_of_succ_jobs)
self.assertEqual(amount_of_succ_jobs - 1, len(res['jobs']))
job_status_filter_2 = {'status': 'new'}
jobs_4 = self.controller.get_one("detail", **job_status_filter_2)
self.assertEqual(amount_of_all_jobs - amount_of_succ_jobs,
len(jobs_4['jobs']))
# set marker in job
res = self.controller.get_all(marker=jobs_4['jobs'][0]['id'],
limit=amount_of_all_jobs)
self.assertEqual(amount_of_all_jobs - 1,
len(res['jobs']))
@patch.object(pecan, 'response', new=FakeResponse)
@patch.object(context, 'extract_context_from_environ')
def test_get_all_jobs(self, mock_context):
def test_get_all_jobs_with_pagination(self, mock_context):
mock_context.return_value = self.context
# map job type to project id for later project id filter validation.
@ -377,6 +393,101 @@ class AsyncJobControllerTest(unittest.TestCase):
self.assertEqual(amount_of_running_jobs,
len(jobs_job_status_filter_3['jobs']))
# test for paginate query
job_paginate_no_filter_1 = self.controller.get_all()
self.assertEqual(amount_of_all_jobs,
len(job_paginate_no_filter_1['jobs']))
# no limit no marker
job_paginate_filter_1 = {'status': 'new'}
jobs_paginate_filter_1 = self.controller.get_all(
**job_paginate_filter_1)
self.assertEqual(amount_of_all_jobs - amount_of_running_jobs,
len(jobs_paginate_filter_1['jobs']))
# failed cases, unsupported limit type
job_paginate_filter_2 = {'limit': '2test'}
res = self.controller.get_all(**job_paginate_filter_2)
self._validate_error_code(res, 400)
# successful cases
job_paginate_filter_4 = {'status': 'new', 'limit': '2'}
res = self.controller.get_all(**job_paginate_filter_4)
self.assertEqual(2, len(res['jobs']))
job_paginate_filter_5 = {'status': 'new', 'limit': 2}
res = self.controller.get_all(**job_paginate_filter_5)
self.assertEqual(2, len(res['jobs']))
job_paginate_filter_6 = {'status': 'running', 'limit': 1}
res1 = self.controller.get_all(**job_paginate_filter_6)
marker = res1['jobs'][0]['id']
job_paginate_filter_7 = {'status': 'running', 'marker': marker}
res2 = self.controller.get_all(**job_paginate_filter_7)
self.assertEqual(amount_of_running_jobs-1, len(res2['jobs']))
job_paginate_filter_8 = {'status': 'new', 'limit': 3}
res = self.controller.get_all(**job_paginate_filter_8)
self.assertLessEqual(res['jobs'][0]['id'], res['jobs'][1]['id'])
self.assertLessEqual(res['jobs'][1]['id'], res['jobs'][2]['id'])
# unsupported marker type
res = self.controller.get_all(marker=None)
self.assertEqual(amount_of_all_jobs, len(res['jobs']))
res = self.controller.get_all(marker='-123')
self._validate_error_code(res, 400)
# marker not in job table and job log table
job_paginate_filter_9 = {'marker': uuidutils.generate_uuid()}
res = self.controller.get_all(**job_paginate_filter_9)
self._validate_error_code(res, 400)
# test marker and limit
limit = 2
assert_count = 0
pt = '/v1.0/jobs\?limit=\w+&marker=([\w-]+)'
job_paginate_filter = {'status': 'new', 'limit': limit}
res = self.controller.get_all(**job_paginate_filter)
assert_count = assert_count + len(res['jobs'])
while 'jobs_links' in res:
m = re.match(pt, res['jobs_links'][0]['href'])
marker = m.group(1)
self.assertEqual(limit, len(res['jobs']))
job_paginate_filter = {'status': 'new', 'limit': limit,
'marker': marker}
res = self.controller.get_all(**job_paginate_filter)
assert_count = assert_count + len(res['jobs'])
self.assertEqual(amount_of_all_jobs - amount_of_running_jobs,
assert_count)
job_paginate_filter_10 = {'status': 'running'}
res = self.controller.get_all(**job_paginate_filter_10)
self.assertEqual(amount_of_running_jobs, len(res['jobs']))
# add some rows to job log table
for i in xrange(amount_of_running_jobs):
db_api.finish_job(self.context, res['jobs'][i]['id'], True,
timeutils.utcnow())
res_success_log = db_api.list_jobs_from_log(self.context, None,
[('id', 'asc')])
self.assertEqual(amount_of_running_jobs, len(res_success_log))
marker_id = res_success_log[0]['id']
res = self.controller.get_all(marker=marker_id)
self.assertEqual(2, len(res['jobs']))
res_in_job = db_api.list_jobs(self.context, None, [('id', 'asc')])
self.assertEqual(amount_of_all_jobs - amount_of_running_jobs,
len(res_in_job))
marker_id = res_in_job[0]['id']
res = self.controller.get_all(marker=marker_id)
self.assertEqual(amount_of_all_jobs - 1, len(res['jobs']))
job_paginate_filter_11 = {'limit': 2}
res = self.controller.get_all(**job_paginate_filter_11)
self.assertIsNotNone(res['jobs_links'][0]['href'])
@patch.object(pecan, 'response', new=FakeResponse)
@patch.object(pecan, 'response', new=mock.Mock)
@patch.object(context, 'extract_context_from_environ')
@ -614,4 +725,7 @@ class AsyncJobControllerTest(unittest.TestCase):
self.assertEqual(res[list(res.keys())[0]]['code'], code)
def tearDown(self):
    """Undo setUp: unregister config options and drop the DB schema."""
    cfg.CONF.unregister_opts(app.common_opts)
    # Drop all tables created in setUp so the next test starts clean.
    core.ModelBase.metadata.drop_all(core.get_engine())
    super(AsyncJobControllerTest, self).tearDown()