From 16876649e5ffb86be4bdfb40e484ec7a760872c3 Mon Sep 17 00:00:00 2001 From: Lingxian Kong Date: Tue, 1 Aug 2017 16:18:01 +1200 Subject: [PATCH] Add job update api By using update api, users could pause/cancel/resume jobs, or change job execution pattern. Change-Id: I05103316a91ad79c4d0593c4a02a400aac45a762 --- qinling/api/controllers/v1/job.py | 70 +++++++ qinling/api/controllers/v1/resources.py | 2 +- qinling/db/api.py | 4 + qinling/db/sqlalchemy/api.py | 10 +- qinling/db/sqlalchemy/models.py | 7 +- qinling/status.py | 2 + .../tests/unit/api/controllers/v1/test_job.py | 179 +++++++++++++++++- qinling/tests/unit/base.py | 25 +-- qinling/utils/jobs.py | 51 +++-- 9 files changed, 305 insertions(+), 45 deletions(-) diff --git a/qinling/api/controllers/v1/job.py b/qinling/api/controllers/v1/job.py index a213fad0..90c74bfc 100644 --- a/qinling/api/controllers/v1/job.py +++ b/qinling/api/controllers/v1/job.py @@ -12,8 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. +import datetime + +import croniter from oslo_config import cfg from oslo_log import log as logging +from oslo_utils import timeutils from pecan import rest import wsmeext.pecan as wsme_pecan @@ -30,6 +34,8 @@ LOG = logging.getLogger(__name__) CONF = cfg.CONF POST_REQUIRED = set(['function_id']) +UPDATE_ALLOWED = set(['name', 'function_input', 'status', 'pattern', + 'next_execution_time']) class JobsController(rest.RestController): @@ -49,6 +55,7 @@ class JobsController(rest.RestController): 'Required param is missing. Required: %s' % POST_REQUIRED ) + # Check the input params. first_time, next_time, count = jobs.validate_job(params) LOG.info("Creating %s, params: %s", self.type, params) @@ -101,3 +108,66 @@ class JobsController(rest.RestController): for db_model in db_api.get_jobs()] return resources.Jobs(jobs=jobs) + + @rest_utils.wrap_wsme_controller_exception + @wsme_pecan.wsexpose( + resources.Job, + types.uuid, + body=resources.Job + ) + def put(self, id, job): + """Update job definition. + + 1. Can not update a finished job. + 2. Can not change job type. + 3. Allow to pause a one-shot job and resume before its first execution + time. + """ + values = {} + for key in UPDATE_ALLOWED: + if key in job.to_dict(): + values.update({key: job.to_dict().get(key)}) + + LOG.info('Update resource, params: %s', values, + resource={'type': self.type, 'id': id}) + + new_status = values.get('status') + pattern = values.get('pattern') + next_execution_time = values.get('next_execution_time') + + job_db = db_api.get_job(id) + + if job_db.status in [status.DONE, status.CANCELLED]: + raise exc.InputException('Can not update a finished job.') + + if pattern: + if not job_db.pattern: + raise exc.InputException('Can not change job type.') + jobs.validate_pattern(pattern) + elif pattern == '' and job_db.pattern: + raise exc.InputException('Can not change job type.') + + valid_states = [status.RUNNING, status.CANCELLED, status.PAUSED] + if new_status and new_status not in valid_states: + raise exc.InputException('Invalid status.') + + if next_execution_time: + values['next_execution_time'] = jobs.validate_next_time( + next_execution_time + ) + elif (job_db.status == status.PAUSED and + new_status == status.RUNNING): + p = job_db.pattern or pattern + + if not p: + # Check if the next execution time for one-shot job is still + # valid. + jobs.validate_next_time(job_db.next_execution_time) + else: + # Update next_execution_time for recurring job. + values['next_execution_time'] = croniter.croniter( + p, timeutils.utcnow() + ).get_next(datetime.datetime) + + updated_job = db_api.update_job(id, values) + return resources.Job.from_dict(updated_job.to_dict()) diff --git a/qinling/api/controllers/v1/resources.py b/qinling/api/controllers/v1/resources.py index b075c5b9..4154b3e5 100644 --- a/qinling/api/controllers/v1/resources.py +++ b/qinling/api/controllers/v1/resources.py @@ -312,7 +312,7 @@ class Job(Resource): name = wtypes.text function_id = types.uuid function_input = types.jsontype - status = wsme.wsattr(wtypes.text, readonly=True) + status = wtypes.text pattern = wtypes.text count = int first_execution_time = wtypes.text diff --git a/qinling/db/api.py b/qinling/db/api.py index 3646cb08..d70125cd 100644 --- a/qinling/db/api.py +++ b/qinling/db/api.py @@ -178,6 +178,10 @@ def delete_job(id): return IMPL.delete_job(id) +def update_job(id, values): + return IMPL.update_job(id, values) + + def get_jobs(): return IMPL.get_jobs() diff --git a/qinling/db/sqlalchemy/api.py b/qinling/db/sqlalchemy/api.py index f8d89d7a..6a36cd7a 100644 --- a/qinling/db/sqlalchemy/api.py +++ b/qinling/db/sqlalchemy/api.py @@ -430,12 +430,20 @@ def delete_job(id, session=None): return result.rowcount +@db_base.session_aware() +def update_job(id, values, session=None): + job = get_job(id) + job.update(values.copy()) + + return job + + @db_base.session_aware() def get_next_jobs(before, session=None): return _get_collection( models.Job, insecure=True, sort_keys=['next_execution_time'], sort_dirs=['asc'], next_execution_time={'lt': before}, - status={'neq': status.DONE} + status=status.RUNNING ) diff --git a/qinling/db/sqlalchemy/models.py b/qinling/db/sqlalchemy/models.py index 5c3a6165..8e037b41 100644 --- a/qinling/db/sqlalchemy/models.py +++ b/qinling/db/sqlalchemy/models.py @@ -83,12 +83,7 @@ class Job(model_base.QinlingSecureModelBase): __tablename__ = 'job' name = sa.Column(sa.String(255), nullable=True) - pattern = sa.Column( - sa.String(32), - nullable=True, - # Set default to 'never'. - default='0 0 30 2 0' - ) + pattern = sa.Column(sa.String(32), nullable=True) status = sa.Column(sa.String(32), nullable=False) first_execution_time = sa.Column(sa.DateTime, nullable=True) next_execution_time = sa.Column(sa.DateTime, nullable=False) diff --git a/qinling/status.py b/qinling/status.py index ae80204b..1d09a228 100644 --- a/qinling/status.py +++ b/qinling/status.py @@ -19,4 +19,6 @@ ERROR = 'error' DELETING = 'deleting' RUNNING = 'running' DONE = 'done' +PAUSED = 'paused' +CANCELLED = 'cancelled' SUCCESS = 'success' diff --git a/qinling/tests/unit/api/controllers/v1/test_job.py b/qinling/tests/unit/api/controllers/v1/test_job.py index 3974135b..261073c7 100644 --- a/qinling/tests/unit/api/controllers/v1/test_job.py +++ b/qinling/tests/unit/api/controllers/v1/test_job.py @@ -15,6 +15,9 @@ from datetime import datetime from datetime import timedelta +from qinling import context as auth_context +from qinling.db import api as db_api +from qinling import status from qinling.tests.unit.api import base @@ -40,9 +43,183 @@ class TestJobController(base.APITest): def test_delete(self): job_id = self.create_job( - self.function_id, prefix='TestJobController' + self.function_id, prefix='TestJobController', + first_execution_time=datetime.utcnow(), + next_execution_time=datetime.utcnow() + timedelta(hours=1), + status=status.RUNNING, + count=1 ).id resp = self.app.delete('/v1/jobs/%s' % job_id) self.assertEqual(204, resp.status_int) + + def test_update_one_shot_job(self): + job_id = self.create_job( + self.function_id, + prefix='TestJobController', + first_execution_time=datetime.utcnow(), + next_execution_time=datetime.utcnow() + timedelta(hours=1), + status=status.RUNNING, + count=1 + ).id + + req_body = { + 'name': 'new_name', + 'status': status.PAUSED + } + resp = self.app.put_json('/v1/jobs/%s' % job_id, req_body) + + self.assertEqual(200, resp.status_int) + self._assertDictContainsSubset(resp.json, req_body) + + req_body = { + 'status': status.RUNNING + } + resp = self.app.put_json('/v1/jobs/%s' % job_id, req_body) + + self.assertEqual(200, resp.status_int) + self._assertDictContainsSubset(resp.json, req_body) + + def test_update_one_shot_job_failed(self): + job_id = self.create_job( + self.function_id, + prefix='TestJobController', + first_execution_time=datetime.utcnow(), + next_execution_time=datetime.utcnow() + timedelta(hours=1), + status=status.RUNNING, + count=1 + ).id + url = '/v1/jobs/%s' % job_id + + # Try to change job type + resp = self.app.put_json( + url, + {'pattern': '*/1 * * * *'}, + expect_errors=True + ) + + self.assertEqual(400, resp.status_int) + self.assertIn('Can not change job type.', resp.json['faultstring']) + + # Try to resume job but the execution time is invalid + auth_context.set_ctx(self.ctx) + self.addCleanup(auth_context.set_ctx, None) + db_api.update_job( + job_id, + { + 'next_execution_time': datetime.utcnow() - timedelta(hours=1), + 'status': status.PAUSED + } + ) + resp = self.app.put_json( + url, + {'status': status.RUNNING}, + expect_errors=True + ) + + self.assertEqual(400, resp.status_int) + self.assertIn( + 'Execution time must be at least 1 minute in the future', + resp.json['faultstring'] + ) + + def test_update_recurring_job(self): + job_id = self.create_job( + self.function_id, + prefix='TestJobController', + first_execution_time=datetime.utcnow() + timedelta(hours=1), + next_execution_time=datetime.utcnow() + timedelta(hours=1), + pattern='0 */1 * * *', + status=status.RUNNING, + count=10 + ).id + + req_body = { + 'next_execution_time': str( + datetime.utcnow() + timedelta(hours=1.5) + ), + 'pattern': '1 */1 * * *' + } + resp = self.app.put_json('/v1/jobs/%s' % job_id, req_body) + + self.assertEqual(200, resp.status_int) + self._assertDictContainsSubset(resp.json, req_body) + + # Pause the job and resume with a valid next_execution_time + req_body = { + 'status': status.PAUSED + } + resp = self.app.put_json('/v1/jobs/%s' % job_id, req_body) + + self.assertEqual(200, resp.status_int) + self._assertDictContainsSubset(resp.json, req_body) + + req_body = { + 'status': status.RUNNING, + 'next_execution_time': str(datetime.utcnow() + timedelta(hours=2)), + } + resp = self.app.put_json('/v1/jobs/%s' % job_id, req_body) + + self.assertEqual(200, resp.status_int) + self._assertDictContainsSubset(resp.json, req_body) + + # Pause the job and resume without specifying next_execution_time + auth_context.set_ctx(self.ctx) + self.addCleanup(auth_context.set_ctx, None) + db_api.update_job( + job_id, + { + 'next_execution_time': datetime.utcnow() - timedelta(hours=1), + 'status': status.PAUSED + } + ) + + req_body = {'status': status.RUNNING} + resp = self.app.put_json('/v1/jobs/%s' % job_id, req_body) + + self.assertEqual(200, resp.status_int) + self._assertDictContainsSubset(resp.json, req_body) + + def test_update_recurring_job_failed(self): + job_id = self.create_job( + self.function_id, + prefix='TestJobController', + first_execution_time=datetime.utcnow() + timedelta(hours=1), + next_execution_time=datetime.utcnow() + timedelta(hours=1), + pattern='0 */1 * * *', + status=status.RUNNING, + count=10 + ).id + url = '/v1/jobs/%s' % job_id + + # Try to change job type + resp = self.app.put_json( + url, + {'pattern': ''}, + expect_errors=True + ) + + self.assertEqual(400, resp.status_int) + self.assertIn('Can not change job type.', resp.json['faultstring']) + + # Pause the job and try to resume with an invalid next_execution_time + auth_context.set_ctx(self.ctx) + self.addCleanup(auth_context.set_ctx, None) + db_api.update_job(job_id, {'status': status.PAUSED}) + resp = self.app.put_json( + url, + { + 'status': status.RUNNING, + 'next_execution_time': str( + datetime.utcnow() - timedelta(hours=1) + ) + }, + expect_errors=True + ) + + self.assertEqual(400, resp.status_int) + self.assertIn( + 'Execution time must be at least 1 minute in the future', + resp.json['faultstring'] + ) diff --git a/qinling/tests/unit/base.py b/qinling/tests/unit/base.py index 385516a7..1dc4452f 100644 --- a/qinling/tests/unit/base.py +++ b/qinling/tests/unit/base.py @@ -14,7 +14,6 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. -from datetime import datetime import random from oslo_config import cfg @@ -186,22 +185,18 @@ class DbTestCase(BaseTest): return function - def create_job(self, function_id=None, prefix=None): + def create_job(self, function_id=None, prefix=None, **kwargs): if not function_id: function_id = self.create_function(prefix=prefix).id - job = db_api.create_job( - { - 'name': self.rand_name('job', prefix=prefix), - 'function_id': function_id, - 'first_execution_time': datetime.utcnow(), - 'next_execution_time': datetime.utcnow(), - 'count': 1, - # 'auth_enable' is disabled by default, we create runtime for - # default tenant. - 'project_id': DEFAULT_PROJECT_ID, - 'status': status.RUNNING - } - ) + job_params = { + 'name': self.rand_name('job', prefix=prefix), + 'function_id': function_id, + # 'auth_enable' is disabled by default, we create runtime for + # default tenant. + 'project_id': DEFAULT_PROJECT_ID, + } + job_params.update(kwargs) + job = db_api.create_job(job_params) return job diff --git a/qinling/utils/jobs.py b/qinling/utils/jobs.py index 731a7214..c46648d9 100644 --- a/qinling/utils/jobs.py +++ b/qinling/utils/jobs.py @@ -23,32 +23,46 @@ from qinling import exceptions as exc from qinling.utils.openstack import keystone as keystone_utils +def validate_next_time(next_execution_time): + next_time = next_execution_time + if isinstance(next_execution_time, six.string_types): + try: + # We need naive datetime object. + next_time = parser.parse(next_execution_time, ignoretz=True) + except ValueError as e: + raise exc.InputException(str(e)) + + valid_min_time = timeutils.utcnow() + datetime.timedelta(0, 60) + if valid_min_time > next_time: + raise exc.InputException( + 'Execution time must be at least 1 minute in the future.' + ) + + return next_time + + +def validate_pattern(pattern): + try: + croniter.croniter(pattern) + except (ValueError, KeyError): + raise exc.InputException( + 'The specified pattern is not valid: {}'.format(pattern) + ) + + def validate_job(params): first_time = params.get('first_execution_time') pattern = params.get('pattern') count = params.get('count') start_time = timeutils.utcnow() - if isinstance(first_time, six.string_types): - try: - # We need naive datetime object. - first_time = parser.parse(first_time, ignoretz=True) - except ValueError as e: - raise exc.InputException(e.message) - if not (first_time or pattern): raise exc.InputException( 'Pattern or first_execution_time must be specified.' ) + first_time = validate_next_time(first_time) if first_time: - # first_time is assumed to be UTC time. - valid_min_time = timeutils.utcnow() + datetime.timedelta(0, 60) - if valid_min_time > first_time: - raise exc.InputException( - 'first_execution_time must be at least 1 minute in the ' - 'future.' - ) if not pattern and count and count > 1: raise exc.InputException( 'Pattern must be provided if count is greater than 1.' @@ -57,14 +71,9 @@ def validate_job(params): next_time = first_time if not (pattern or count): count = 1 - if pattern: - try: - croniter.croniter(pattern) - except (ValueError, KeyError): - raise exc.InputException( - 'The specified pattern is not valid: {}'.format(pattern) - ) + if pattern: + validate_pattern(pattern) if not first_time: next_time = croniter.croniter(pattern, start_time).get_next( datetime.datetime